// (C) Copyright Jack Culpepper 1997 // This code may not be distributed without permission. package gestalt; import java.net.*; import java.io.*; import java.util.*; // This class is the thread that handles all communication with a peer public class Connection extends Thread { static int number = 0; private Socket peer; protected ThreadGroup threadgroup; private Vector incoming; private Vector outgoing; private Reader reader; private Writer writer; public Connection( Socket peer, ThreadGroup threadgroup ) throws IOException { super( threadgroup, "Connection-" + number++ + ":" + peer.getInetAddress().toString() + ":" + peer.getPort() ); this.peer = peer; this.threadgroup = threadgroup; // create the vectors incoming = new Vector(); outgoing = new Vector(); makeStreams(); this.start(); } public Connection( Socket peer, ThreadGroup threadgroup, Connection c ) throws IOException { super( threadgroup, "Connection-" + number++ + ":" + peer.getInetAddress().toString() + ":" + peer.getPort() ); this.peer = peer; this.incoming = c.incoming; this.outgoing = c.outgoing; makeStreams(); this.start(); } public InetAddress getLocalAddress() { return peer.getLocalAddress(); } public int getLocalPort() { return peer.getLocalPort(); } public void send( Object o ) { outgoing.addElement( o ); } public Object recv() { if ( incoming.isEmpty() ) return null; Object o = incoming.firstElement(); incoming.removeElement( o ); return o; } public boolean avail() { return ! incoming.isEmpty(); } private void makeStreams() throws IOException { writer = new Writer( this, threadgroup, peer.getOutputStream(), outgoing ); reader = new Reader( this, threadgroup, peer.getInputStream(), incoming ); } public String status() { return reader.status() + "\n" + writer.status() + "\n" + "Objects in incoming queue: " + incoming.size() + "\n" + "Objects in outgoing queue: " + outgoing.size(); } // loop forever, reading objects and putting them in the queue if they // are available, and writing objects if the queue is not empty public void run() { while ( true ) { // make sure the reader and writer are still alive if ( ! reader.isAlive() ) { // report( "My Reader died!" ); close(); stop(); } if ( ! writer.isAlive() ) { // report( "My Writer died!" ); close(); stop(); } // sleep so that this run method is not a tight loop try { sleep( 50 ); } catch ( InterruptedException e ) { report( "What??!? I was rudely interrupted from my slumber: " + e ); } } } protected void report( String s ) { System.out.println( this + ": " + s ); } public String toString() { return this.getName(); } protected void close() { // report( "Stopping reader." ); reader.stop(); // report( "Stopping writer." ); writer.stop(); try { reader.close(); writer.close(); peer.close(); } catch ( IOException e ) { report( "IOException occurred while trying to close a stream: " + e ); } } }