Internet Programming with Java Course

1.10 : Forward

 

Multithreaded Forward Server with Load Balancing and Failover Features

 

Multithreaded Forward Server Implementation Nakov Forward Server

 

/**

* Nakov TCP Socket Forward Server - freeware

* Version 1.0 - March, 2002

* (c) 2001 by Svetlin Nakov - http://www.nakov.com

*

* Short decription: Nakov Forward Server is designed to forward (redirect) TCP

* connections from a client to a server choosen from a servers list. When a client

* is connected to Nakov Forward Server, a new connection is opened to some of the

* specified destination servers and all the traffic from destination server to

* Nakov Forward Server is redirected to the client and also all the traffic from

* the client to Nakov Forward Server is redirected to destination server. That

* way Nakov Forward Server makes transparent redirection of TCP connections.

* The data transfer schema is the following:

*

* CLIENT <--> NAKOV_FORWARD_SERVER <--> DESTINATION_SERVER

*

* Clients and Destination Servers communicate only with Nakov Forward Server.

*

* Nakov Forward Server supports failt tolerance. When some of the servers in the

* list fail to respond to TCP connect request (dead server), Nakov Forward Server

* tries the next server in the list until it finds alive server. All dead servers

* are checked if they are alive through some time interval and when some server

* becomes available, it is added to alive list. When no server is alive, no

* connection will be established.

*

* Nakov Forward Server supports also load balancing features. If load balancing

* is enabled, when a client connection is accepted, Nakov Forward Server will

* redirect the client to the least loaded server from the servers list. We consider

* the server which hast minimal alive connections established by Nakov Forward

* Server is least loaded.

*

* What we gain when we use Nakov Proxy Server?

* - Destination server does not know the real IP of the client. It thinks

* that the IP of Nakov Forward Server is its client. Thus we can use a server

* licensed for one IP address on several machines simultaneously.

* - Nakov Forward Server can run on a port number that is allowed by the

* firewall and forward to a port number that is not allowed by firewall. Thus,

* started on a server in a local network, it can give access to some disabled

* by the firewall services.

* - Nakov Forward Server can give access to multiple clients to some service

* that is allowed only for some fixed IP address when started on the machine

* with this IP.

* - Fault Tolerance (failover) of Nakov Forward Server helps to avoid faults

* when some of the servers die. Of course there is special hardware for this, but

* it has high price. Instead you can use Nakov Forward Server (that is free).

* If you setup several Nakov Forward Servers configured to use the same set of

* destination servers and if you configure your routers to use redirect traffic

* to both servers, you will obtain a stable fault tolerance system. In such a

* system you have guarantee that crash of any of the servers (including some of

* the Nakov Forward Servers) will not stop the service that these servers provide.

* Of course the destination servers should run in a cluster and replicate their

* sessions.

* - Load balancing helps to avoid overloading of the servers by distributing

* the clients between them. Of course this should be done by special hardware

* called "load balancer", but if we don't have such hardware, we can still use

* this technology. When we use load balancing, all the servers in the list should

* be running in a cluster and there should be no matter what of the servers the

* client is connected to. The servers should communicate each other and replicate

* their session data.

*

* NakovForwardServer.properties configuration file contains all the settings of

* Nakov Forward Server. The only mandatory field is "Servers"

* Destination servers should be in following format:

* Servers = server1:port1, server2:port2, server3:port3, ...

* For example:

* Servers = 192.168.0.22:80, rakiya:80, 192.168.0.23:80, www.nakov.com:80

* Nakov Forward Server listening port should be in format:

* ListeningPort = some_port (in range 1-65535)

* Using load balancing algorithm is specified by following line:

* LoadBalancing = Yes/No

* Check alive interval through which all dead threads should be re-checked if

* they are alive is specified by following line:

* CheckAliveInterval = time_interval (in milliseconds)

*/

 

import java.util.ArrayList;

import java.net.ServerSocket;

import java.net.Socket;

import java.net.InetAddress;

import java.io.IOException;

import java.io.FileInputStream;

import java.util.Properties;

import java.util.StringTokenizer;

 

public class NakovForwardServer

{

private static final boolean ENABLE_LOGGING = true;

public static final String SETTINGS_FILE_NAME = "NakovForwardServer.properties";

 

private ServerDescription[] mServersList = null;

private int mListeningTcpPort = 2001;

private boolean mUseLoadBalancingAlgorithm = true;

private long mCheckAliveIntervalMs = 5*1000;

 

/**

* ServerDescription descripts a server (server hostname/IP, server port,

* is the server alive at last check, how many clients are connected to it, etc.)

*/

class ServerDescription

{

public String host;

public int port;

public int clientsConectedCount = 0;

public boolean isAlive = true;

public ServerDescription(String host, int port)

{

this.host = host;

this.port = port;

}

}

 

/**

* @return an array of ServerDescription - all destination servers.

*/

public ServerDescription[] getServersList()

{

return mServersList;

}

 

/**

* @return the time interval (in milliseconds) through which all dead servers

* should be re-checked if they are alive (a server is alive if accepts

* client connections on the specified port, otherwise is dead).

*/

public long getCheckAliveIntervalMs()

{

return mCheckAliveIntervalMs;

}

 

/**

* @return true if load balancing algorithm is enabled.

*/

public boolean isLoadBalancingEnabled()

{

return mUseLoadBalancingAlgorithm;

}

 

/**

* Reads the Nakov Forward Server configuration file "NakovForwardServer.properties"

* and load user preferences. This method is called once during the server startup.

*/

public void readSettings()

throws Exception

{

// Read properties file in a Property object

Properties props = new Properties();

props.load(new FileInputStream(SETTINGS_FILE_NAME));

 

// Read and parse the server list

String serversProperty = props.getProperty("Servers");

if (serversProperty == null )

throw new Exception("The server list can not be empty.");

try {

ArrayList servers = new ArrayList();

StringTokenizer stServers = new StringTokenizer(serversProperty,",");

while (stServers.hasMoreTokens()) {

String serverAndPort = stServers.nextToken().trim();

StringTokenizer stServerPort = new StringTokenizer(serverAndPort,": ");

String host = stServerPort.nextToken();

int port = Integer.parseInt(stServerPort.nextToken());

servers.add(new ServerDescription(host,port));

}

mServersList =

(ServerDescription[]) servers.toArray(new ServerDescription[] {});

} catch (Exception e) {

throw new Exception("Invalid server list format : " + serversProperty);

}

if (mServersList.length == 0)

throw new Exception("The server list can not be empty.");

 

// Read server's listening port number

try {

mListeningTcpPort = Integer.parseInt(props.getProperty("ListeningPort"));

} catch (Exception e) {

log("Server listening port not specified. Using default port : " +

mListeningTcpPort);

}

 

// Read load balancing property

try {

String loadBalancing = props.getProperty("LoadBalancing").toLowerCase();

mUseLoadBalancingAlgorithm = (loadBalancing.equals("yes") ||

loadBalancing.equals("true") || loadBalancing.equals("1") || loadBalancing.equals("enable") || loadBalancing.equals("enabled"));

} catch (Exception e) {

log("LoadBalancing property is not specified. Using default value : " +

mUseLoadBalancingAlgorithm);

}

 

// Read the check alive interval

try {

mCheckAliveIntervalMs =

Integer.parseInt(props.getProperty("CheckAliveInterval"));

} catch (Exception e) {

log("Check alive interval is not specified. Using default value : " +

mCheckAliveIntervalMs + " ms.");

}

 

}

 

/**

* Starts a thread that re-checks all dead threads if they are alive

* through mCheckAliveIntervalMs millisoconds

*/

private void startCheckAliveThread()

{

CheckAliveThread checkAliveThread = new CheckAliveThread(this);

checkAliveThread.setDaemon(true);

checkAliveThread.start();

}

 

/**

* Starts the forward server - binds on a given port and starts serving

*/

public void startForwardServer()

throws Exception

{

// Bind server on given TCP port

ServerSocket serverSocket;

try {

serverSocket = new ServerSocket(mListeningTcpPort);

} catch (IOException ioe) {

throw new IOException("Unable to bind to port " + mListeningTcpPort);

}

 

log("Nakov Forward Server started on TCP port " + mListeningTcpPort + ".");

log("All TCP connections to " + InetAddress.getLocalHost().getHostAddress() +

":" + mListeningTcpPort + " will be forwarded to the following servers:");

for (int i=0; i<mServersList.length; i++) {

log(" " + mServersList[i].host + ":" + mServersList[i].port);

}

log("Load balancing algorithm is " +

(mUseLoadBalancingAlgorithm ? "ENABLED." : "DISABLED."));

 

// Accept client connections and process them until stopped

while(true) {

try {

Socket clientSocket = serverSocket.accept();

String clientHostPort = clientSocket.getInetAddress().getHostAddress() +

":" + clientSocket.getPort();

log("Accepted client from " + clientHostPort);

ForwardServerClientThread forwardThread =

new ForwardServerClientThread(this, clientSocket);

forwardThread.start();

} catch (Exception e) {

throw new Exception("Unexpected error.\n" + e.toString());

}

}

}

 

/**

* Prints given log message on the standart output if logging is enabled,

* otherwise ignores it

*/

public void log(String aMessage)

{

if (ENABLE_LOGGING)

System.out.println(aMessage);

}

 

/**

* Program entry point. Reads settings, starts check-alive thread and

* the forward server

*/

public static void main(String[] aArgs)

{

NakovForwardServer srv = new NakovForwardServer();

try {

srv.readSettings();

srv.startCheckAliveThread();

srv.startForwardServer();

} catch (Exception e) {

e.printStackTrace();

}

}

 

}

 

 

/**

* ForwardServerClientThread handles the clients of Nakov Forward Server. It

* finds suitable server from the server pool, connects to it and starts

* the TCP forwarding between given client and its assigned server. After

* the forwarding is failed and the two threads are stopped, closes the sockets.

*/

 

import java.net.Socket;

import java.net.SocketException;

import java.io.IOException;

import java.io.InputStream;

import java.io.OutputStream;

 

public class ForwardServerClientThread extends Thread

{

private NakovForwardServer mNakovForwardServer = null;

private NakovForwardServer.ServerDescription mServer = null;

private Socket mClientSocket = null;

private Socket mServerSocket = null;

private boolean mBothConnectionsAreAlive = false;

private String mClientHostPort;

private String mServerHostPort;

 

/**

* Creates a client thread for handling clients of NakovForwardServer.

* A client socket should be connected and passed to this constructor.

* A server socket is created later by run() method.

*/

public ForwardServerClientThread(NakovForwardServer aNakovForwardServer,

Socket aClientSocket)

{

mNakovForwardServer = aNakovForwardServer;

mClientSocket = aClientSocket;

}

 

/**

* Obtains a destination server socket to some of the servers in the list.

* Starts two threads for forwarding : "client in <--> dest server out" and

* "dest server in <--> client out", waits until one of these threads stop

* due to read/write failure or connection closure. Closes opened connections.

*/

public void run()

{

try {

mClientHostPort = mClientSocket.getInetAddress().getHostAddress() +

":" + mClientSocket.getPort();

 

// Create a new socket connection to one of the servers from the list

mServerSocket = createServerSocket();

if (mServerSocket == null) { // If all the servers are down

System.out.println("Can not establish connection for client " +

mClientHostPort + ". All the servers are down.");

try { mClientSocket.close(); } catch (IOException e) {}

return;

}

 

// Obtain input and output streams of server and client

InputStream clientIn = mClientSocket.getInputStream();

OutputStream clientOut = mClientSocket.getOutputStream();

InputStream serverIn = mServerSocket.getInputStream();

OutputStream serverOut = mServerSocket.getOutputStream();

 

mServerHostPort = mServer.host + ":" + mServer.port;

mNakovForwardServer.log("TCP Forwarding " + mClientHostPort +

" <--> " + mServerHostPort + " started.");

 

// Start forwarding of socket data between server and client

ForwardThread clientForward = new ForwardThread(this, clientIn, serverOut);

ForwardThread serverForward = new ForwardThread(this, serverIn, clientOut);

mBothConnectionsAreAlive = true;

clientForward.start();

serverForward.start();

 

} catch (IOException ioe) {

ioe.printStackTrace();

}

}

 

/**

* connectionBroken() method is called by forwarding child threads to notify

* this thread (their parent thread) that one of the connections (server or client)

* is broken (a read/write failure occured). This method disconnects both server

* and client sockets causing both threads to stop forwarding.

*/

public synchronized void connectionBroken()

{

if (mBothConnectionsAreAlive) {

// One of the connections is broken. Close the other connection

// and stop forwarding

// Closing these socket connections will close their input/output streams

// and that way will stop the threads that read from these streams

try { mServerSocket.close(); } catch (IOException e) {}

try { mClientSocket.close(); } catch (IOException e) {}

 

mBothConnectionsAreAlive = false;

mServer.clientsConectedCount--;

 

mNakovForwardServer.log("TCP Forwarding " + mClientHostPort +

" <--> " + mServerHostPort + " stopped.");

}

}

 

/**

* @return a new socket connected to some of the servers in the destination

* servers list. Sequentially a connection to the least loaded server from

* the list is tried to be established. If connecting to some alive server

* fail, this server it marked as dead and next alive server is tried. If all

* the servers are dead, null is returned. Thus if at least one server is alive,

* a connection will be established (of course after some delay) and the system

* will not fail (it is fault tolerant). Dead servers can be marked as alive if

* revived, but this is done later by check alive thread.

*/

private Socket createServerSocket()

throws IOException

{

while (true) {

mServer = getServerWithMinimalLoad();

if (mServer == null) // All the servers are down

return null;

try {

Socket socket = new Socket(mServer.host, mServer.port);

mServer.clientsConectedCount++;

return socket;

} catch (IOException ioe) {

mServer.isAlive = false;

}

}

}

 

/**

* @return the least loaded alive server from the server list if load balancing

* is enabled or first alive server from the list if load balancing algorithm is

* disabled or null if all the servers in the list are dead.

*/

private NakovForwardServer.ServerDescription getServerWithMinimalLoad()

{

NakovForwardServer.ServerDescription minLoadServer = null;

NakovForwardServer.ServerDescription[] servers =

mNakovForwardServer.getServersList();

for (int i=0; i<servers.length; i++) {

if (servers[i].isAlive) {

if ((minLoadServer==null) ||

(servers[i].clientsConectedCount < minLoadServer.clientsConectedCount))

minLoadServer = servers[i];

// If load balancing is disabled, return first alive server

if (!mNakovForwardServer.isLoadBalancingEnabled())

break;

}

}

return minLoadServer;

}

 

}

 

 

/**

* ForwardThread handles the TCP forwarding between a socket input stream (source)

* and a socket output stream (destination). It reads the input stream and forwards

* everything to the output stream. If some of the streams fails, the forwarding

* is stopped and the parent thread is notified to close all its connections.

*/

 

import java.io.IOException;

import java.io.InputStream;

import java.io.OutputStream;

 

public class ForwardThread extends Thread

{

private static final int READ_BUFFER_SIZE = 8192;

 

InputStream mInputStream = null;

OutputStream mOutputStream = null;

 

ForwardServerClientThread mParent = null;

 

/**

* Creates a new traffic forward thread specifying its input stream,

* output stream and parent thread

*/

public ForwardThread(ForwardServerClientThread aParent, InputStream aInputStream,

OutputStream aOutputStream)

{

mInputStream = aInputStream;

mOutputStream = aOutputStream;

mParent = aParent;

}

 

/**

* Runs the thread. Until it is possible, reads the input stream and puts read

* data in the output stream. If reading can not be done (due to exception or

* when the stream is at his end) or writing is failed, exits the thread.

*/

public void run()

{

byte[] buffer = new byte[READ_BUFFER_SIZE];

try {

while (true) {

int bytesRead = mInputStream.read(buffer);

if (bytesRead == -1)

break; // End of stream is reached --> exit the thread

mOutputStream.write(buffer, 0, bytesRead);

}

} catch (IOException e) {

// Read/write failed --> connection is broken --> exit the thread

}

 

// Notify parent thread that the connection is broken and forwarding should stop

mParent.connectionBroken();

}

 

}

 

 

/**

* CheckAliveThread checks all dead servers in the server list and updates the

* list when some dead server becomes alive. Checking is done on a beforehand

* specified time itrervals. A server is considered alive if it accepts client

* connections on the specified port.

*/

 

import java.io.IOException;

import java.net.Socket;

 

 

public class CheckAliveThread extends Thread

{

private NakovForwardServer mNakovForwardServer = null;

 

/**

* Creates a check alive thread. NakovForwardServer object is needed

* for obtaining the servers list.

*/

public CheckAliveThread(NakovForwardServer aNakovForwardServer)

{

mNakovForwardServer = aNakovForwardServer;

}

 

/**

* Until stopped checks all dead servers if they are alive and waits

* specified time interval

*/

public void run()

{

while (!interrupted()) {

try {

Thread.sleep(mNakovForwardServer.getCheckAliveIntervalMs());

} catch (InterruptedException ie) {

ie.printStackTrace();

}

checkAllDeadServers();

}

}

 

/**

* Checks all dead servers if they are alive and updates their state if needed.

*/

private void checkAllDeadServers()

{

NakovForwardServer.ServerDescription[] servers =

mNakovForwardServer.getServersList();

for (int i=0; i<servers.length; i++) {

if (!servers[i].isAlive)

if (alive(servers[i].host, servers[i].port)) {

servers[i].isAlive = true;

}

}

}

 

/**

* Checks if given server is alive (if accepts client connections on specified port)

*/

private boolean alive(String host, int port)

{

boolean result = false;

try {

Socket s = new Socket(host, port);

result = true;

s.close();

} catch (IOException ioe) {

ioe.printStackTrace();

// Ignore unsuccessfull connect attempts

}

return result;

}

 

}