Internet Programming with Java Course

1.10 Ïðèìåð: Ðàçðàáîòêà íà Forward ñúðâúð


Multithreaded Forward Server with Load Balancing and Failover Features


Multithreaded Forward Server Implementation – Nakov Forward Server



 * Nakov TCP Socket Forward Server - freeware

 * Version 1.0 - March, 2002

 * (c) 2001 by Svetlin Nakov -


 * Short decription: Nakov Forward Server is designed to forward (redirect) TCP

 * connections from a client to a server choosen from a servers list. When a client

 * is connected to Nakov Forward Server, a new connection is opened to some of the

 * specified destination servers and all the traffic from destination server to

 * Nakov Forward Server is redirected to the client and also all the traffic from

 * the client to Nakov Forward Server is redirected to destination server. That

 * way Nakov Forward Server makes transparent redirection of TCP connections.

 * The data transfer schema is the following:




 * Clients and Destination Servers communicate only with Nakov Forward Server.


 * Nakov Forward Server supports failt tolerance. When some of the servers in the

 * list fail to respond to TCP connect request (dead server), Nakov Forward Server

 * tries the next server in the list until it finds alive server. All dead servers

 * are checked if they are alive through some time interval and when some server

 * becomes available, it is added to alive list. When no server is alive, no

 * connection will be established.


 * Nakov Forward Server supports also load balancing features. If load balancing

 * is enabled, when a client connection is accepted, Nakov Forward Server will

 * redirect the client to the least loaded server from the servers list. We consider

 * the server which hast minimal alive connections established by Nakov Forward

 * Server is least loaded.


 * What we gain when we use Nakov Proxy Server?

 *     - Destination server does not know the real IP of the client. It thinks

 * that the IP of Nakov Forward Server is its client. Thus we can use a server

 * licensed for one IP address on several machines simultaneously.

 *     - Nakov Forward Server can run on a port number that is allowed by the

 * firewall and forward to a port number that is not allowed by firewall. Thus,

 * started on a server in a local network, it can give access to some disabled

 * by the firewall services.

 *     - Nakov Forward Server can give access to multiple clients to some service

 * that is allowed only for some fixed IP address when started on the machine

 * with this IP.

 *     - Fault Tolerance (failover) of Nakov Forward Server helps to avoid faults

 * when some of the servers die. Of course there is special hardware for this, but

 * it has high price. Instead you can use Nakov Forward Server (that is free).

 * If you setup several Nakov Forward Servers configured to use the same set of

 * destination servers and if you configure your routers to use redirect traffic

 * to both servers, you will obtain a stable fault tolerance system. In such a

 * system you have guarantee that crash of any of the servers (including some of

 * the Nakov Forward Servers) will not stop the service that these servers provide.

 * Of course the destination servers should run in a cluster and replicate their

 * sessions.

 *     - Load balancing helps to avoid overloading of the servers by distributing

 * the clients between them. Of course this should be done by special hardware

 * called "load balancer", but if we don't have such hardware, we can still use

 * this technology. When we use load balancing, all the servers in the list should

 * be running in a cluster and there should be no matter what of the servers the

 * client is connected to. The servers should communicate each other and replicate

 * their session data.


 * configuration file contains all the settings of

 * Nakov Forward Server. The only mandatory field is "Servers"

 * Destination servers should be in following format:

 *     Servers = server1:port1, server2:port2, server3:port3, ...

 * For example:

 *     Servers =, rakiya:80,,

 * Nakov Forward Server listening port should be in format:

 *     ListeningPort = some_port (in range 1-65535)

 * Using load balancing algorithm is specified by following line:

 *     LoadBalancing = Yes/No

 * Check alive interval through which all dead threads should be re-checked if

 * they are alive is specified by following line:

 *     CheckAliveInterval = time_interval (in milliseconds)



import java.util.ArrayList;






import java.util.Properties;

import java.util.StringTokenizer;


public class NakovForwardServer


    private static final boolean ENABLE_LOGGING = true;

    public static final String SETTINGS_FILE_NAME = "";


    private ServerDescription[] mServersList = null;

    private int mListeningTcpPort = 2001;

    private boolean mUseLoadBalancingAlgorithm = true;

    private long mCheckAliveIntervalMs = 5*1000;



     * ServerDescription descripts a server (server hostname/IP, server port,

     * is the server alive at last check, how many clients are connected to it, etc.)


    class ServerDescription


        public String host;

        public int port;

        public int clientsConectedCount = 0;

        public boolean isAlive = true;

        public ServerDescription(String host, int port)


  = host;

           this.port = port;





     * @return an array of ServerDescription - all destination servers.


    public ServerDescription[] getServersList()


        return mServersList;




     * @return the time interval (in milliseconds) through which all dead servers

     * should be re-checked if they are alive (a server is alive if accepts

     * client connections on the specified port, otherwise is dead).


    public long getCheckAliveIntervalMs()


        return mCheckAliveIntervalMs;




     * @return true if load balancing algorithm is enabled.


    public boolean isLoadBalancingEnabled()


        return mUseLoadBalancingAlgorithm;




     * Reads the Nakov Forward Server configuration file ""

     * and load user preferences. This method is called once during the server startup.


    public void readSettings()

    throws Exception


        // Read properties file in a Property object

        Properties props = new Properties();

        props.load(new FileInputStream(SETTINGS_FILE_NAME));


        // Read and parse the server list

        String serversProperty = props.getProperty("Servers");

        if (serversProperty == null )

           throw new Exception("The server list can not be empty.");

        try {

           ArrayList servers = new ArrayList();

           StringTokenizer stServers = new StringTokenizer(serversProperty,",");

           while (stServers.hasMoreTokens()) {

               String serverAndPort = stServers.nextToken().trim();

               StringTokenizer stServerPort = new StringTokenizer(serverAndPort,": ");

               String host = stServerPort.nextToken();

               int port = Integer.parseInt(stServerPort.nextToken());

               servers.add(new ServerDescription(host,port));


           mServersList =

(ServerDescription[]) servers.toArray(new ServerDescription[] {});

        } catch (Exception e) {

           throw new Exception("Invalid server list format : " + serversProperty);


        if (mServersList.length == 0)

           throw new Exception("The server list can not be empty.");


        // Read server's listening port number

        try {

           mListeningTcpPort = Integer.parseInt(props.getProperty("ListeningPort"));

        } catch (Exception e) {

           log("Server listening port not specified. Using default port : " +




        // Read load balancing property

        try {

           String loadBalancing = props.getProperty("LoadBalancing").toLowerCase();

            mUseLoadBalancingAlgorithm = (loadBalancing.equals("yes") ||

loadBalancing.equals("true") || loadBalancing.equals("1") || loadBalancing.equals("enable") || loadBalancing.equals("enabled"));

        } catch (Exception e) {

           log("LoadBalancing property is not specified. Using default value : " +




        // Read the check alive interval

        try {

           mCheckAliveIntervalMs =


        } catch (Exception e) {

           log("Check alive interval is not specified. Using default value : " +

mCheckAliveIntervalMs + " ms.");






     * Starts a thread that re-checks all dead threads if they are alive

     * through mCheckAliveIntervalMs millisoconds


    private void startCheckAliveThread()


        CheckAliveThread checkAliveThread = new CheckAliveThread(this);






     * Starts the forward server - binds on a given port and starts serving


    public void startForwardServer()

    throws Exception


        // Bind server on given TCP port

        ServerSocket serverSocket;

        try {

           serverSocket = new ServerSocket(mListeningTcpPort);

        } catch (IOException ioe) {

           throw new IOException("Unable to bind to port " + mListeningTcpPort);



        log("Nakov Forward Server started on TCP port " + mListeningTcpPort + ".");

        log("All TCP connections to " + InetAddress.getLocalHost().getHostAddress() +

":" + mListeningTcpPort + " will be forwarded to the following servers:");

        for (int i=0; i<mServersList.length; i++) {

           log("  " + mServersList[i].host +  ":" + mServersList[i].port);


        log("Load balancing algorithm is " +

(mUseLoadBalancingAlgorithm ? "ENABLED." : "DISABLED."));


        // Accept client connections and process them until stopped

        while(true) {

           try {

               Socket clientSocket = serverSocket.accept();

               String clientHostPort = clientSocket.getInetAddress().getHostAddress() +

":" + clientSocket.getPort();

               log("Accepted client from " + clientHostPort);

               ForwardServerClientThread forwardThread =

new ForwardServerClientThread(this, clientSocket);


           } catch (Exception e) {

               throw new Exception("Unexpected error.\n" + e.toString());






     * Prints given log message on the standart output if logging is enabled,

     * otherwise ignores it


    public void log(String aMessage)


        if (ENABLE_LOGGING)





     * Program entry point. Reads settings, starts check-alive thread and

     * the forward server


    public static void main(String[] aArgs)


        NakovForwardServer srv = new NakovForwardServer();

        try {




        } catch (Exception e) {









 * ForwardServerClientThread handles the clients of Nakov Forward Server. It

 * finds suitable server from the server pool, connects to it and starts

 * the TCP forwarding between given client and its assigned server. After

 * the forwarding is failed and the two threads are stopped, closes the sockets.









public class ForwardServerClientThread extends Thread


    private NakovForwardServer mNakovForwardServer = null;

    private NakovForwardServer.ServerDescription mServer = null;

    private Socket mClientSocket = null;

    private Socket mServerSocket = null;

    private boolean mBothConnectionsAreAlive = false;

    private String mClientHostPort;

    private String mServerHostPort;



     * Creates a client thread for handling clients of NakovForwardServer.

     * A client socket should be connected and passed to this constructor.

     * A server socket is created later by run() method.


    public ForwardServerClientThread(NakovForwardServer aNakovForwardServer,

Socket aClientSocket)


        mNakovForwardServer = aNakovForwardServer;

        mClientSocket = aClientSocket;




     * Obtains a destination server socket to some of the servers in the list.

     * Starts two threads for forwarding : "client in <--> dest server out" and

     * "dest server in <--> client out", waits until one of these threads stop

     * due to read/write failure or connection closure. Closes opened connections.


    public void run()


        try {

           mClientHostPort = mClientSocket.getInetAddress().getHostAddress() +

":" + mClientSocket.getPort();


           // Create a new socket connection to one of the servers from the list

           mServerSocket = createServerSocket();

           if (mServerSocket == null) {  // If all the servers are down

               System.out.println("Can not establish connection for client " +

                   mClientHostPort + ". All the servers are down.");

               try { mClientSocket.close(); } catch (IOException e) {}




           // Obtain input and output streams of server and client

           InputStream clientIn = mClientSocket.getInputStream();

           OutputStream clientOut = mClientSocket.getOutputStream();

           InputStream serverIn = mServerSocket.getInputStream();

           OutputStream serverOut = mServerSocket.getOutputStream();


           mServerHostPort = + ":" + mServer.port;

           mNakovForwardServer.log("TCP Forwarding  " + mClientHostPort +

" <--> " + mServerHostPort + "  started.");


           // Start forwarding of socket data between server and client

           ForwardThread clientForward = new ForwardThread(this, clientIn, serverOut);

           ForwardThread serverForward = new ForwardThread(this, serverIn, clientOut);

           mBothConnectionsAreAlive = true;




        } catch (IOException ioe) {






     * connectionBroken() method is called by forwarding child threads to notify

     * this thread (their parent thread) that one of the connections (server or client)

     * is broken (a read/write failure occured). This method disconnects both server

     * and client sockets causing both threads to stop forwarding.


    public synchronized void connectionBroken()


        if (mBothConnectionsAreAlive) {

           // One of the connections is broken. Close the other connection

// and stop forwarding

           // Closing these socket connections will close their input/output streams

           // and that way will stop the threads that read from these streams

           try { mServerSocket.close(); } catch (IOException e) {}

           try { mClientSocket.close(); } catch (IOException e) {}


           mBothConnectionsAreAlive = false;



           mNakovForwardServer.log("TCP Forwarding  " + mClientHostPort +

" <--> " + mServerHostPort + "  stopped.");





     * @return a new socket connected to some of the servers in the destination

     * servers list. Sequentially a connection to the least loaded server from

     * the list is tried to be established. If connecting to some alive server

     * fail, this server it marked as dead and next alive server is tried. If all

     * the servers are dead, null is returned. Thus if at least one server is alive,

     * a connection will be established (of course after some delay) and the system

     * will not fail (it is fault tolerant). Dead servers can be marked as alive if

     * revived, but this is done later by check alive thread.


    private Socket createServerSocket()

    throws IOException


        while (true) {

           mServer = getServerWithMinimalLoad();

           if (mServer == null)  // All the servers are down

               return null;

           try {

               Socket socket = new Socket(, mServer.port);


               return socket;

           } catch (IOException ioe) {

               mServer.isAlive = false;






     * @return the least loaded alive server from the server list if load balancing

     * is enabled or first alive server from the list if load balancing algorithm is

     * disabled or null if all the servers in the list are dead.


    private NakovForwardServer.ServerDescription getServerWithMinimalLoad()


        NakovForwardServer.ServerDescription minLoadServer = null;

        NakovForwardServer.ServerDescription[] servers =


        for (int i=0; i<servers.length; i++) {

           if (servers[i].isAlive) {

               if ((minLoadServer==null) ||

(servers[i].clientsConectedCount < minLoadServer.clientsConectedCount))

                   minLoadServer = servers[i];

               // If load balancing is disabled, return first alive server

               if (!mNakovForwardServer.isLoadBalancingEnabled())




        return minLoadServer;







 * ForwardThread handles the TCP forwarding between a socket input stream (source)

 * and a socket output stream (destination). It reads the input stream and forwards

 * everything to the output stream. If some of the streams fails, the forwarding

 * is stopped and the parent thread is notified to close all its connections.







public class ForwardThread extends Thread


private static final int READ_BUFFER_SIZE = 8192;


    InputStream mInputStream = null;

    OutputStream mOutputStream = null;


    ForwardServerClientThread mParent = null;



     * Creates a new traffic forward thread specifying its input stream,

     * output stream and parent thread


    public ForwardThread(ForwardServerClientThread aParent, InputStream aInputStream,

OutputStream aOutputStream)


        mInputStream = aInputStream;

        mOutputStream = aOutputStream;

        mParent = aParent;




     * Runs the thread. Until it is possible, reads the input stream and puts read

     * data in the output stream. If reading can not be done (due to exception or

     * when the stream is at his end) or writing is failed, exits the thread.


    public void run()


        byte[] buffer = new byte[READ_BUFFER_SIZE];

        try {

           while (true) {

               int bytesRead =;

               if (bytesRead == -1)

break; // End of stream is reached --> exit the thread

               mOutputStream.write(buffer, 0, bytesRead);


        } catch (IOException e) {

           // Read/write failed --> connection is broken --> exit the thread



        // Notify parent thread that the connection is broken and forwarding should stop








 * CheckAliveThread checks all dead servers in the server list and updates the

 * list when some dead server becomes alive. Checking is done on a beforehand

 * specified time itrervals. A server is considered alive if it accepts client

 * connections on the specified port.







public class CheckAliveThread extends Thread


private NakovForwardServer mNakovForwardServer = null;



     * Creates a check alive thread. NakovForwardServer object is needed

     * for obtaining the servers list.


    public CheckAliveThread(NakovForwardServer aNakovForwardServer)


        mNakovForwardServer = aNakovForwardServer;




     * Until stopped checks all dead servers if they are alive and waits

     * specified time interval


    public void run()


        while (!interrupted()) {

            try {


           } catch (InterruptedException ie)     {








     * Checks all dead servers if they are alive and updates their state if needed.


    private void checkAllDeadServers()


        NakovForwardServer.ServerDescription[] servers =


        for (int i=0; i<servers.length; i++) {

           if (!servers[i].isAlive)

               if (alive(servers[i].host, servers[i].port))     {

                   servers[i].isAlive = true;






     * Checks if given server is alive (if accepts client connections on specified port)


    private boolean alive(String host, int port)


        boolean result = false;

        try {

           Socket s = new Socket(host, port);

           result = true;


        } catch (IOException ioe) {


           // Ignore unsuccessfull connect attempts


        return result;


