/*
 * Author:        Mike McCarthy
 * File:          MulticastRelayServer.java
 * Modified By:   Julian Williams, January 1998
 * Modified By:   Don Brutzman
 * Last Modified: 4 October 1999
 *
 * Invocation: command line entry, "java mil.navy.nps.bridge.MulticastRelayServer"
 *
 * This class defines a server that creates a multicast group connection and
 * relays multicast datagrams to unicast clients. The server waits for
 * clients to request multicast data via a threaded method. Once a client
 * subscribes, multicast datagrams are translated into unicast datagrams
 * and sent to the client. The server queries each client periodically,
 * and clients that do not respond are effectively unsubscribed from
 * the server and are not sent any future multicast datagrams.
 */
package mil.navy.nps.bridge; // package to which we belong

import java.io.*;
import java.net.*;
import java.util.*;

/**
 * Relays datagrams from a multicast group to subscribed unicast clients
 * (MulticastRelayClient).
 *
 * <p>Three threads run against one instance (see {@link #main}):
 * <ul>
 *   <li>the main thread runs {@link #mcastListen()} and forwards every
 *       multicast datagram to each registered client;</li>
 *   <li>{@code ClientListenThread} runs {@link #clientListen()} and handles the
 *       client commands {@code "connect"}, {@code "disconnect"} and
 *       {@code "iAmHere"};</li>
 *   <li>{@code ClientChecksThread} runs {@link #mcastCheckClients()} and sends a
 *       {@code "hello"} probe to every client each {@link #delayTime}
 *       milliseconds.</li>
 * </ul>
 *
 * <p>A client is the pair (address, port). The pair is stored at the same
 * index of {@link #client_address_vec} and {@link #client_port_vec}; every
 * compound read or update of the client tables happens while holding one
 * private lock so the two vectors can never drift apart.
 */
class MulticastRelayServer extends Object
{
    /** Size of the buffer used for client command datagrams. */
    private static final int CLIENT_REQUEST_BUFSIZE = 256;

    /** Size of the buffer used for multicast datagrams; longer datagrams are truncated. */
    private static final int DATAGRAM_BUFSIZE = 1500;

    /** Number of consecutive unanswered hello probes after which a client is dropped. */
    private static final int MAX_MISSED_HELLOS = 3;

    /** Charset of the command strings exchanged with clients (all commands are plain ASCII). */
    private static final String WIRE_CHARSET = "US-ASCII";

    protected static int defaultPort = 8010;   // default server port
    protected static int delayTime = 10000;    // pause 10 seconds between client checks

    // Parallel tables: index i of both vectors describes one client.
    protected Vector client_address_vec = new Vector();  // InetAddress of each client
    protected Vector client_port_vec = new Vector();     // Integer port of each client

    /** Keys (see addressKey) of clients that have an unanswered hello outstanding. */
    protected Vector client_reply_vec = new Vector();

    /** Guards client_address_vec, client_port_vec, client_reply_vec and missedHellos. */
    private final Object clientLock = new Object();

    /** Maps a client key (String) to its count (Integer) of consecutive unanswered hellos. */
    private final Hashtable missedHellos = new Hashtable();

    protected int unicastPort;
    protected DatagramSocket socket = null;     // unicast socket shared by all three threads
    protected MulticastSocket multicastsocket;  // socket joined to the multicast group

    private int multicastport = 62040;
    private String multicastgroup = "224.2.181.145";

    // Legacy scratch fields. They are kept only so that code written against the
    // earlier version of this class still compiles. The methods below work on
    // local variables instead, because these fields used to be written by
    // several threads at once (e.g. address/port by both the relay loop and the
    // client listener), which could send a datagram to the wrong client.
    protected DatagramPacket connectDatagramPacket;
    protected DatagramPacket packet;
    protected InetAddress address;
    protected int port;
    protected int count = 0;
    protected DatagramPacket outDgram;
    protected DatagramPacket responsepacket;
    protected DatagramPacket hellopacket;
    protected InetAddress check_address;
    protected InetAddress thisAddress;
    protected int check_port;

    /**
     * Creates the server plus its client-listen and client-check threads, then
     * runs the multicast relay loop on the calling thread.
     *
     * @param args command line arguments (unused)
     */
    public static void main(String args[])
    {
        MulticastRelayServer mcastServer = new MulticastRelayServer();
        ClientListenThread clThread = new ClientListenThread(mcastServer);
        ClientChecksThread c2Thread = new ClientChecksThread(mcastServer);
        clThread.start();
        c2Thread.start();
        mcastServer.mcastListen();
    }

    /**
     * Joins the multicast group and relays every datagram received from it to
     * each registered client. Does not return normally.
     *
     * <p>Only the bytes actually received are forwarded. A failure to send to
     * one client is reported on stderr and does not stop the relay for the
     * other clients.
     *
     * @throws RuntimeException if the multicast or unicast socket cannot be
     *         set up, or if receiving from the multicast socket fails
     */
    public void mcastListen()
    {
        byte[] multicastbuff = new byte[DATAGRAM_BUFSIZE];
        DatagramPacket mpacket = new DatagramPacket(multicastbuff, multicastbuff.length);

        try
        {
            multicastsocket = new MulticastSocket(multicastport);
            multicastsocket.joinGroup(InetAddress.getByName(multicastgroup));
        }
        catch (UnknownHostException unkHostException)
        {
            throw new RuntimeException("problem joining mcast group " + multicastgroup, unkHostException);
        }
        catch (SocketException sockEx)
        {
            throw new RuntimeException("couldn't create inputSocket", sockEx);
        }
        catch (IOException ioEx)
        {
            throw new RuntimeException("problem with input multicast socket", ioEx);
        }

        DatagramSocket out;
        try
        {
            out = unicastSocket();
        }
        catch (SocketException sockEx)
        {
            throw new RuntimeException("could not create unicast socket on port " + defaultPort, sockEx);
        }

        // infinite loop to relay multicast to unicast clients
        while (true)
        {
            try
            {
                // receive() sets the packet length to the size of the datagram it
                // read; restore the full capacity so a short datagram cannot limit
                // the next one.
                mpacket.setLength(multicastbuff.length);
                multicastsocket.receive(mpacket);
            }
            catch (IOException ioEx)
            {
                throw new RuntimeException("problem with input multicast socket", ioEx);
            }

            int length = mpacket.getLength();
            System.out.println("Received multicast data, " + mpacket.getAddress().toString()
                + ":" + mpacket.getPort() + ", length " + length);

            // Work on a snapshot so the lock is not held during network I/O.
            InetSocketAddress[] clients = snapshotClients();
            for (int ix = 0; ix < clients.length; ix++)
            {
                InetAddress clientAddress = clients[ix].getAddress();
                int clientPort = clients[ix].getPort();
                System.out.println("Sending to unicast port: " + clientPort);
                try
                {
                    out.send(new DatagramPacket(multicastbuff, length, clientAddress, clientPort));
                }
                catch (IOException ioEx)
                {
                    // One unreachable client must not stop the relay for everyone else.
                    System.err.println("problem with output unicast socket, client "
                        + addressKey(clientAddress, clientPort) + ": " + ioEx);
                }
            }
        }
    }

    /**
     * Loops reading client requests from the unicast socket and updates the
     * client tables. Does not return normally.
     *
     * <ul>
     *   <li>{@code "connect"} registers the sender, unless it is already registered;</li>
     *   <li>{@code "disconnect"} unregisters the sender;</li>
     *   <li>{@code "iAmHere"} records that the sender answered a hello probe.</li>
     * </ul>
     * Any other payload is ignored. An I/O error on a single request is
     * reported on stderr and the loop continues.
     *
     * @throws RuntimeException if the unicast socket cannot be created
     */
    public void clientListen()
    {
        DatagramSocket in;
        try
        {
            in = unicastSocket();
        }
        catch (SocketException e)
        {
            System.err.println("Could not create datagram socket.");
            throw new RuntimeException("could not create unicast socket on port " + defaultPort, e);
        }

        byte[] requestBuf = new byte[CLIENT_REQUEST_BUFSIZE];
        DatagramPacket request = new DatagramPacket(requestBuf, requestBuf.length);

        // loop infinite so we don't miss client request
        while (true)
        {
            try
            {
                request.setLength(requestBuf.length);   // restore capacity before each receive
                in.receive(request);

                String command = new String(request.getData(), 0, request.getLength(), WIRE_CHARSET);
                InetAddress clientAddress = request.getAddress();
                int clientPort = request.getPort();

                if (command.equals("connect"))
                {
                    System.out.println("Connect request from: " + clientPort + "\n");
                    addClient(clientAddress, clientPort);
                }
                else if (command.equals("disconnect"))
                {
                    System.out.println("disconnect request from: " + clientPort + "\n");
                    removeClient(clientAddress, clientPort);
                }
                else if (command.equals("iAmHere"))
                {
                    System.out.println("\"iAmHere\" from: " + clientPort + ": "
                        + clientAddress.toString() + "\n");
                    noteReply(addressKey(clientAddress, clientPort));
                }
            }
            catch (IOException e)
            {
                System.err.println("IOException: " + e);
                e.printStackTrace();
            }
        }
    }

    /**
     * Sends a hello message to each client every {@link #delayTime}
     * milliseconds and, after each pause, drops the clients that have left
     * {@code MAX_MISSED_HELLOS} consecutive hellos unanswered.
     *
     * <p>Runs until the calling thread is interrupted; on interruption the
     * interrupt flag is restored and the method returns. A failure to send to
     * one client is reported on stderr; the probe still counts as unanswered.
     *
     * @throws RuntimeException if the unicast socket cannot be created
     */
    public void mcastCheckClients()
    {
        byte[] hellobuf = asciiBytes("hello");

        DatagramSocket out;
        try
        {
            out = unicastSocket();
        }
        catch (SocketException sockEx)
        {
            throw new RuntimeException("could not create unicast socket on port " + defaultPort, sockEx);
        }

        // loop to continually check client status
        while (true)
        {
            InetSocketAddress[] clients = snapshotClients();
            for (int ix = 0; ix < clients.length; ix++)
            {
                InetAddress clientAddress = clients[ix].getAddress();
                int clientPort = clients[ix].getPort();
                System.out.println("Hello port: " + clientPort);

                // Record the probe before sending it, so a fast reply cannot be
                // processed ahead of the bookkeeping.
                noteHelloSent(addressKey(clientAddress, clientPort));
                try
                {
                    out.send(new DatagramPacket(hellobuf, hellobuf.length, clientAddress, clientPort));
                }
                catch (IOException ioEx)
                {
                    System.err.println("problem with output unicast socket, client "
                        + addressKey(clientAddress, clientPort) + ": " + ioEx);
                }
            }

            try
            {
                // give the clients time to answer before the next check
                Thread.sleep(delayTime);
            }
            catch (InterruptedException ie)
            {
                System.out.println("Threw interrupted exception.");
                Thread.currentThread().interrupt();   // keep the interrupt visible to the caller
                return;
            }

            checkClientReplyList();
        }
    }

    /**
     * Walks the reply-waiting list and unregisters every client that has
     * {@code MAX_MISSED_HELLOS} or more consecutive unanswered hellos. Keys
     * that no longer match a registered client are discarded from the list.
     * Clients below the limit stay on the list.
     */
    public void checkClientReplyList()
    {
        synchronized (clientLock)
        {
            // Walk backwards: removing entry dx does not shift the entries still to visit.
            for (int dx = client_reply_vec.size() - 1; dx >= 0; dx--)
            {
                String key = (String) client_reply_vec.elementAt(dx);
                int ix = indexOfClientKey(key);

                if (ix < 0)
                {
                    // Client already gone (e.g. it disconnected after the hello was sent).
                    client_reply_vec.removeElementAt(dx);
                    missedHellos.remove(key);
                    continue;
                }

                Integer missed = (Integer) missedHellos.get(key);
                if (missed != null && missed.intValue() >= MAX_MISSED_HELLOS)
                {
                    InetAddress deadAddress = (InetAddress) client_address_vec.elementAt(ix);
                    Integer deadPort = (Integer) client_port_vec.elementAt(ix);

                    // Remove by index from both tables so the pairing stays intact.
                    client_address_vec.removeElementAt(ix);
                    client_port_vec.removeElementAt(ix);
                    client_reply_vec.removeElementAt(dx);
                    missedHellos.remove(key);

                    System.out.println("Just removed: " + deadPort.toString() + "/"
                        + deadAddress.getHostAddress());
                }
            }
        }
    }

    /**
     * Returns the shared unicast socket, creating it on {@link #defaultPort}
     * the first time any thread asks for it.
     */
    private synchronized DatagramSocket unicastSocket() throws SocketException
    {
        if (socket == null)
        {
            unicastPort = defaultPort;
            socket = new DatagramSocket(unicastPort);
            System.out.println("MulticastRelayServer listening on unicastPort: " + socket.getLocalPort());
        }
        return socket;
    }

    /** Builds the reply-list key for a client; the ':' keeps host and port from running together. */
    private static String addressKey(InetAddress clientAddress, int clientPort)
    {
        return clientAddress.getHostAddress() + ":" + clientPort;
    }

    /** Encodes a protocol string as ASCII bytes. */
    private static byte[] asciiBytes(String text)
    {
        try
        {
            return text.getBytes(WIRE_CHARSET);
        }
        catch (UnsupportedEncodingException e)
        {
            throw new RuntimeException("charset " + WIRE_CHARSET + " is not available", e);
        }
    }

    /** Copies the current (address, port) pairs so callers can do I/O without holding the lock. */
    private InetSocketAddress[] snapshotClients()
    {
        synchronized (clientLock)
        {
            int n = Math.min(client_address_vec.size(), client_port_vec.size());
            InetSocketAddress[] snapshot = new InetSocketAddress[n];
            for (int ix = 0; ix < n; ix++)
            {
                InetAddress clientAddress = (InetAddress) client_address_vec.elementAt(ix);
                Integer clientPort = (Integer) client_port_vec.elementAt(ix);
                snapshot[ix] = new InetSocketAddress(clientAddress, clientPort.intValue());
            }
            return snapshot;
        }
    }

    /** Index of the given client in the parallel tables, or -1. Caller must hold clientLock. */
    private int indexOfClient(InetAddress clientAddress, int clientPort)
    {
        int n = Math.min(client_address_vec.size(), client_port_vec.size());
        for (int ix = 0; ix < n; ix++)
        {
            Integer storedPort = (Integer) client_port_vec.elementAt(ix);
            if (storedPort.intValue() == clientPort
                && clientAddress.equals(client_address_vec.elementAt(ix)))
            {
                return ix;
            }
        }
        return -1;
    }

    /** Index of the client whose key equals the argument, or -1. Caller must hold clientLock. */
    private int indexOfClientKey(String key)
    {
        int n = Math.min(client_address_vec.size(), client_port_vec.size());
        for (int ix = 0; ix < n; ix++)
        {
            InetAddress storedAddress = (InetAddress) client_address_vec.elementAt(ix);
            Integer storedPort = (Integer) client_port_vec.elementAt(ix);
            if (key.equals(addressKey(storedAddress, storedPort.intValue())))
            {
                return ix;
            }
        }
        return -1;
    }

    /** Registers a client unless the same (address, port) pair is already registered. */
    private void addClient(InetAddress clientAddress, int clientPort)
    {
        synchronized (clientLock)
        {
            if (indexOfClient(clientAddress, clientPort) < 0)
            {
                client_address_vec.addElement(clientAddress);
                client_port_vec.addElement(Integer.valueOf(clientPort));
            }
        }
    }

    /** Unregisters a client and forgets any hello it still had outstanding. */
    private void removeClient(InetAddress clientAddress, int clientPort)
    {
        synchronized (clientLock)
        {
            int ix = indexOfClient(clientAddress, clientPort);
            if (ix >= 0)
            {
                client_address_vec.removeElementAt(ix);
                client_port_vec.removeElementAt(ix);
            }
            String key = addressKey(clientAddress, clientPort);
            client_reply_vec.removeElement(key);
            missedHellos.remove(key);
        }
    }

    /** Records that one more hello is being sent to the client identified by key. */
    private void noteHelloSent(String key)
    {
        synchronized (clientLock)
        {
            Integer previous = (Integer) missedHellos.get(key);
            int missed = (previous == null) ? 1 : previous.intValue() + 1;
            missedHellos.put(key, Integer.valueOf(missed));
            if (!client_reply_vec.contains(key))
            {
                client_reply_vec.addElement(key);
            }
        }
    }

    /** Records that the client identified by key answered; its missed-hello count starts over. */
    private void noteReply(String key)
    {
        synchronized (clientLock)
        {
            client_reply_vec.removeElement(key);
            missedHellos.remove(key);
        }
    }
}//endclass MulticastRelayServer

/*
 * ClientListenThread class
 * Runs the server's client listen method on its own thread.
 */
class ClientListenThread extends Thread
{
    MulticastRelayServer mcastServerInstance;

    /** @param mcastServer the server whose clientListen() this thread runs */
    public ClientListenThread(MulticastRelayServer mcastServer)
    {
        mcastServerInstance = mcastServer;
    }

    /** Calls clientListen() on the server passed to the constructor. */
    public void run()
    {
        mcastServerInstance.clientListen();
    }
}//end ClientListenThread class

/*
 * ClientChecksThread class
 * Runs the server's client check method on its own thread.
 */
class ClientChecksThread extends Thread
{
    MulticastRelayServer mcastServerInstance;

    /** @param mcastServer the server whose mcastCheckClients() this thread runs */
    public ClientChecksThread(MulticastRelayServer mcastServer)
    {
        mcastServerInstance = mcastServer;
    }

    /** Calls mcastCheckClients() on the server passed to the constructor. */
    public void run()
    {
        mcastServerInstance.mcastCheckClients();
    }
}//end ClientChecksThread class