// (C) Copyright Jack Culpepper 1997 // This code may not be distributed without permission. package gestalt; import java.net.*; import java.io.*; import java.util.*; import gestalt.Connection; import gestalt.msg.Abort; import gestalt.TaskRunner; public class Slave extends Thread { private String hostname; private Vector pending; private Connection connection; private TaskRunner runner; public boolean connected = false; // exit with an error message when an exception occurs public static void fail( String msg ) { report( msg ); System.exit( 1 ); } // this must be static because fail calls it private static void report( String str ) { System.out.println( "Slave: " + str ); } // construct a slave to the host hostname public Slave( String hostname ) { // name thread super( "Slave" ); // init vars this.hostname = hostname; pending = new Vector(); // start it up! this.start(); } public void run() { // loop forever while ( true ) { // try to open a socket; sleep; try again Socket s = null; while ( s == null ) { report( "Attempting to open a socket to the server." ); try { s = new Socket( hostname, Constants.slavePort ); } catch ( UnknownHostException e ) { report( "Exception: " + e ); return; } catch ( IOException e2 ) { report( "Exception: " + e2 ); try { report( "Sleeping for 3 seconds." ); sleep( 3000 ); } catch (InterruptedException e) { report( "What!!?!? My sleep was interrupted: " + e ); } } } try { report( "Creating a new connection." ); if ( connection == null ) { connection = new Connection( s, getThreadGroup() ); } else { connection = new Connection( s, getThreadGroup(), connection ); } while ( connection.isAlive() ) { connected = true; // Check to see if the current TaskRunner has finished; if so, // send the finished Task back to the Server if ( runner != null ) { if ( ! runner.isAlive() ) { report( runner.task + " finished." ); connection.send( runner.task ); runner = null; } } // pull objects off the incoming queue and look at them. while ( connection.avail() ) { Object o = connection.recv(); // if it's a task, put it on the pending queue if ( o instanceof Task ) { Task t = ( Task )o; pending.addElement( t ); } // if it's an abort, abort it's target else if ( o instanceof Abort ) { Abort a = ( Abort )o; // hunt down the target of the abort: first look at runner, to // see if it's the one currently being worked on; else check the // pending vector boolean found = false; if ( runner != null && runner.task.id.equals( a.target ) ) { found = true; report( "Aborting running task: " + runner.task ); runner.stop(); runner = null; } else { for ( int i = 0 ; i < pending.size() ; i++ ) { Task test = ( Task )pending.elementAt( i ); if ( test.id.equals( a.target ) ) { found = true; report( "Aborting pending task: " + test ); pending.removeElement( test ); break; } } } if ( ! found ) report( "Could not find target of abort: " + a ); } // we don't recognize the object; junk it else report( "Found unknown object." ); } // if there isn't a TaskRunner working, pull a Task off the pending // queue and start one if ( runner == null && ! pending.isEmpty() ) { Task t = ( Task )pending.firstElement(); pending.removeElement( t ); report( "Starting task: " + t ); runner = new TaskRunner( t ); } try { sleep( 500 ); } catch (InterruptedException e) { report( "What!!?!? My sleep was interrupted: " + e ); } } connected = false; report( "The connection just died." ); // if a task is running, stop it if ( runner != null ) { runner.stop(); runner = null; } // sleep for a few seconds before trying again try { sleep( 5000 ); } catch (InterruptedException e) { report( "What!!?!? My sleep was interrupted: " + e ); } } catch ( IOException e ) { report( "IOException making connection: " + e ); } try { sleep( 500 ); } catch (InterruptedException e) { report( "What!!?!? My sleep was interrupted: " + e ); } } } public void close() { connection.close(); } public static void main( String[] args ) { if ( args.length != 1 || args[0].equals( "" ) ) { System.out.println( "Usage: java Slave " ); } else { Slave s = new Slave( args[0] ); } } }