/** * * MulticastTunnelServer * * This is a server that listens on a given (via command line) port * for commands from clients. When it recieves a connect command, * it subscribes to the specified multicast group (if it isn't already) * and forwards all packets it hears on that group to the client, via * unicast. This avoids the need for multicast routers between the * client and the server. * * Two lists are maintained, one of them a list of clients, specified * by unicast address and port number, and one a list of Channels, * specified by multicast address and port number. An instance of a * channel listens on a multicast address and forwards packets to all * the clients who have subscribed. * * The valid commands to this server are: * * connect clientMachineAddress clientMachinePort multicastAddress multicastPort * disconnect * keepAlive * * Note that the commands should have a trailing space; this separates the * end of the command string from any stray data that may exist at the * end of the datagram packet we recieve. * * The connect command creates a channel, if it doesn't already exist, * and subscribes the client. * * The disconnect command removes the client from all channels. * * The keepAlive command asserts that we really do have someone * listening on the other side. If no keepAlive command is recieved * after five minutes, the client will be disconnected from all channels. * * This code requires the use of JDK 1.2, for some collection classes. * * Invocation: java mil.navy.nps.bridge.MulticastTunnelServer * * Author: Don McGregor * Date: 2/19/99 */ package mil.navy.nps.bridge; import java.io.*; import java.net.*; import java.util.*; public class MulticastTunnelServer { HashSet clients; // Set of people subscribed to us HashSet channels; // multicast addresses we listen on DatagramSocket serverSocket; // Socket we use for server communications private static MulticastTunnelServer singleInstance = null; /** * implementation of the singleton pattern. Uses a private * constructor to build a single instance of MulticastTunnelServer. */ public static MulticastTunnelServer getInstance(int pPort) { if(singleInstance == null) { singleInstance = new MulticastTunnelServer(pPort); } return singleInstance; } /** * private Constructor; listen on the given socket for commands. * Object should be accessed through the getinstance() method. */ private MulticastTunnelServer(int pServerPort) { clients = new HashSet(); channels = new HashSet(); try { serverSocket = new DatagramSocket(pServerPort); } catch(SocketException socke) { System.out.println("Socket exception: " + socke); } catch(SecurityException se) { System.out.println("can't run this from an applet" + se); } } /** * Implementation of the Runnable interface. This listens for commands * and forwards them to the appropriate handler. It also kicks of a * "reaper" thread that handles the keepAlive timeout housekeeping. */ public void run() { Thread reaperThread; // Handles keep-alive housekeeping Reaper reaper; // keep-alive handler System.out.println("starting server...."); // Start up a thread that handles keepAlive housekeeping by checking // the timestamps of all clients periodically. Clients that have not // been heard from lately are culled. Note the this.new Reaper() // line; this is an inner class, which means that this instance of // MulticastTunnelServer will be the "enclosing object" for that instance. // reaper will have access to our private variables. reaper = this.new Reaper(); reaperThread = new Thread(reaper); reaperThread.start(); while(true) { DatagramPacket packet; // Received datagram byte buff[] = new byte[1500]; String commandString; // Command we get StringTokenizer tokenizer; // Easy way to parse out command String token; // Get a new packet from the network..... packet = new DatagramPacket(buff, buff.length); try { serverSocket.receive(packet); } catch(IOException ioe) { System.out.println("can't read from socket " + ioe); } // Find out what kind of command this is commandString = new String(packet.getData()); System.out.println("Got command " + commandString); tokenizer = new StringTokenizer(commandString); token = tokenizer.nextToken(); // It was a connect command? Forward to appropriate method. if(token.compareToIgnoreCase("connect") == 0) { this.connect(commandString); } // A disconnect command; forward to method. We can determine // the machine and port it came from via the packet itself, // rather than commands embedded in the packet. if(token.compareToIgnoreCase("disconnect") == 0) { InetAddress clientAddress; int clientPort; clientAddress = packet.getAddress(); clientPort = packet.getPort(); this.disconnect(clientAddress, clientPort); } // A keepAlive method has been recieved; forward it to the // appropriate method. We can determine the machine and port // it came from from the datagram packet, rather than from // information contained in the command. if(token.compareToIgnoreCase("keepAlive") == 0) { InetAddress clientAddress; int clientPort; clientAddress = packet.getAddress(); clientPort = packet.getPort(); this.keepAlive(clientAddress, clientPort); } } // end of while loop } /** * Handles connect commands. Connect commands are of the form * * connect clientMachineAddress clientMachinePort multicastAddress multicastPort * * We parse this out via a tokenizer, which just takes the items one by * one. Note that the command string should be followed by a space, or * the parser will be unable to tell when the command quits and when * gibberish at the end of the packet begins. */ protected void connect(String pConnectCommand) { // The command, and how to break it up into constituent parts StringTokenizer tokenizer = new StringTokenizer(pConnectCommand); String token; String clientMachine, clientPort; int aPort = 0; String multicastAddress, multicastPort; Client newClient; // Client to forward to Channel newChannel; // channel to subscribe to //System.out.println("Connecting with command " + pConnectCommand); // Break up command into pieces token = tokenizer.nextToken(); // throw away "connect" System.out.println("Token: " + token); clientMachine = tokenizer.nextToken(); clientPort = tokenizer.nextToken(); multicastAddress = tokenizer.nextToken(); multicastPort = tokenizer.nextToken(); //System.out.println("token:" + token + " address:" + multicastAddress + " port:" + multicastPort); try { aPort = Integer.parseInt(multicastPort); } catch(NumberFormatException nfe) { System.out.println("format exception on " + multicastPort + " " + nfe); } // Create a new client from the command data. Add it to the // set of clients; if the client already exists in the set, // this is a null op, so we don't get two clients with the // same identity in the set. newClient = new Client(pConnectCommand); clients.add(newClient); // null op if already present // Create a new (provisional) channel, see if already present newChannel = new Channel(multicastAddress, aPort); // Use the same trick. Add it to the set of channels; if true is returned // by the add operation, that means it wasn't there before. If it was there // before; no loss, it's just not added twice. if(channels.add(newChannel) == true) // added succesfully; not already present in set { Thread channelThread = new Thread(newChannel); System.out.println("Adding new channel that was not present before"); newChannel.subscribeClient(newClient); // Subscribe client channelThread.start(); // start off thread reading } else // this channel already exists { Iterator it; //System.out.println("Adding client to existing channel"); // Find the reference to the channel, and subscribe the client. it = channels.iterator(); while(it.hasNext()) { Channel aChannel = (Channel)it.next(); if(aChannel.equals(newChannel)) aChannel.subscribeClient(newClient); } } } // End of connect /** * Removes the client from all channels. */ protected void disconnect(InetAddress pClientAddress, int pClientPort) { Client disconnectClient = new Client(pClientAddress, pClientPort); Iterator it; System.out.println("Disconnecting address " + pClientAddress.getHostAddress() + " port " + pClientPort); // Loop through all the channels, removing the client from each. // This is a no-op if the client is not subscribed to the channel. it = channels.iterator(); while(it.hasNext()) { Channel aChannel = (Channel)it.next(); aChannel.unsubscribeClient(disconnectClient); } // Remove the client from our client set. // Doesn't have to be the same reference, as long as it satisfies equals() clients.remove(disconnectClient); } /** * removeChannel. This should not be called unless you're certain that * all clients have been removed first. */ public void removeChannel(Channel pChannel) { Iterator it = channels.iterator(); while(it.hasNext()) { Channel aChannel = (Channel)it.next(); if(aChannel.equals(pChannel) ) // This one? { it.remove(); System.out.println("removed channel " + aChannel.mcastAddress.getHostAddress()); break; } } } /** * Handles the keepAlive server command. Updates the timestamp on * the client so the reaper process doesn't close it down. */ protected void keepAlive(InetAddress pClientAddress, int pPort) { Client dummyClient = new Client(pClientAddress, pPort); Iterator it; //System.out.println("keepalive message from " + pClientAddress.getHostAddress() + " port " + pPort); // Loop through all the clients, finding the correct client, and timestamp // that client. There should be a faster way to do this; if there are // a whole lot of clients this will be inefficient. it = clients.iterator(); while(it.hasNext()) { Client aClient = (Client)it.next(); if(aClient.equals(dummyClient)) { aClient.setTimestamp(); break; } } return; } /** * Main entry point. */ public static void main(String args[]) { int serverPort; // port we listen on DatagramSocket serverSocket; MulticastTunnelServer server; // Rudimentary args checking if((args.length == 0) || (args.length > 1)) { System.out.println("Usage: MulticastTunnelServer "); System.exit(0); } serverPort = Integer.parseInt(args[0]); // Singleton pattern server = MulticastTunnelServer.getInstance(serverPort); server.run(); } /** * An inner class that implements the Runnable interface. this * class runs in a thread, periodically going through the list * of clients looking for clients that we haven't heard from * lately. Any that are old are removed from the client list. */ class Reaper implements Runnable { /** * implemenation of the Runnable interface. Periodically wakes up * and loops through the list of clients, looking for old timestamps. * Any it finds that are old enough cause the client to be removed * from the list */ public void run() { final int TIMEOUT_PERIOD = 300000; // Throw away five minute old connections final int REAP_INTERVAL = 60000; // Check up on connections every minute while(true) { long currentTime = System.currentTimeMillis(); // Time, please Vector deadConnections = new Vector(); // List of dead connections //System.out.println("...Reaping old connections...."); synchronized(clients) // Prevents multiple access while we're looking { Iterator it = clients.iterator(); // Loop through list, find old connections while(it.hasNext()) { Client aClient = (Client)it.next(); //System.out.println("timestamp = " + aClient.getTimestamp() + // " current time = " + currentTime); if(currentTime - aClient.getTimestamp() > TIMEOUT_PERIOD) { deadConnections.add(aClient); } } // end while hasNext() // Actually remove the client. (We didn't want to do this in the // iteration above because changing the collection under an // iterator is risky sometimes.) for(int idx = 0; idx < deadConnections.size(); idx++) { Client aClient = (Client)deadConnections.elementAt(idx); System.out.println("reaping client " + aClient.clientAddress.getHostAddress()+ " Port:" + aClient.port); MulticastTunnelServer.this.disconnect(aClient.clientAddress, aClient.port); } } // end synchronized try { Thread.sleep(REAP_INTERVAL); // go to sleep for a while } catch(InterruptedException ie) { System.out.println("Troubled Sleep, and no, not the novel by Sarte"); } } // end while true } // end run } // end reaper class }