source: src/main/java/weka/experiment/RemoteExperiment.java @ 7

Last change on this file since 7 was 4, checked in by gnappo, 14 years ago

Import di weka.

File size: 29.3 KB
Line 
1/*
2 *    This program is free software; you can redistribute it and/or modify
3 *    it under the terms of the GNU General Public License as published by
4 *    the Free Software Foundation; either version 2 of the License, or
5 *    (at your option) any later version.
6 *
7 *    This program is distributed in the hope that it will be useful,
8 *    but WITHOUT ANY WARRANTY; without even the implied warranty of
9 *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
10 *    GNU General Public License for more details.
11 *
12 *    You should have received a copy of the GNU General Public License
13 *    along with this program; if not, write to the Free Software
14 *    Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
15 */
16
17/*
18 *    RemoteExperiment.java
19 *    Copyright (C) 2000 University of Waikato, Hamilton, New Zealand
20 *
21 */
22
23
24package weka.experiment;
25
26import weka.core.FastVector;
27import weka.core.Option;
28import weka.core.OptionHandler;
29import weka.core.Queue;
30import weka.core.RevisionUtils;
31import weka.core.SerializedObject;
32import weka.core.Utils;
33import weka.core.xml.KOML;
34import weka.core.xml.XMLOptions;
35import weka.experiment.xml.XMLExperiment;
36
37import java.io.BufferedInputStream;
38import java.io.BufferedOutputStream;
39import java.io.File;
40import java.io.FileInputStream;
41import java.io.FileOutputStream;
42import java.io.ObjectInputStream;
43import java.io.ObjectOutputStream;
44import java.rmi.Naming;
45import java.util.Enumeration;
46
47import javax.swing.DefaultListModel;
48
49/**
50 * Holds all the necessary configuration information for a distributed
51 * experiment. This object is able to be serialized for storage on disk.<p>
52 *
53 * This class is experimental at present. Has been tested using
54 * CSVResultListener (sending results to standard out) and
55 * DatabaseResultListener (InstantDB + RmiJdbc bridge). <p>
56 *
57 * Getting started:<p>
58 *
59 * Start InstantDB (with the RMI bridge) on some machine. If using java2
60 * then specify -Djava.security.policy=db.policy to the
61 * virtual machine. Where db.policy is as follows: <br>
62 * <pre>
63 * grant {
64 *   permission java.security.AllPermission;
65 * };
66 * </pre><p>
67 *
68 * Start RemoteEngine servers on x machines as per the instructons in the
69 * README_Experiment_Gui file. There must be a
70 * DatabaseUtils.props in either the HOME or current directory of each
71 * machine, listing all necessary jdbc drivers.<p>
72 *
73 * The machine where a RemoteExperiment is started must also have a copy
74 * of DatabaseUtils.props listing the URL to the machine where the
75 * database server is running (RmiJdbc + InstantDB). <p>
76 *
77 * Here is an example of starting a RemoteExperiment: <p>
78 *
79 * <pre>
80 *
81 * java -Djava.rmi.server.codebase=file:/path to weka classes/ \
82 * weka.experiment.RemoteExperiment -L 1 -U 10 \
83 * -T /home/ml/datasets/UCI/iris.arff \
84 * -D "weka.experiment.DatabaseResultListener" \
85 * -P "weka.experiment.RandomSplitResultProducer" \
86 * -h rosebud.cs.waikato.ac.nz -h blackbird.cs.waikato.ac.nz -r -- \
87 * -W weka.experiment.ClassifierSplitEvaluator -- \
88 * -W weka.classifiers.bayes.NaiveBayes
89 *
90 * </pre> <p>
91 * The "codebase" property tells rmi where to serve up weka classes from.
92 * This can either be a file url (as long as a shared file system is being
93 * used that is accessable by the remoteEngine servers), or http url (which
94 * of course supposes that a web server is running and you have put your
95 * weka classes somewhere that is web accessable). If using a file url the
96 * trailing "/" is *most* important unless the weka classes are in a jar
97 * file. <p>
98 *
99 <!-- options-start -->
100 * Valid options are: <p/>
101 *
102 * <pre> -L &lt;num&gt;
103 *  The lower run number to start the experiment from.
104 *  (default 1)</pre>
105 *
106 * <pre> -U &lt;num&gt;
107 *  The upper run number to end the experiment at (inclusive).
108 *  (default 10)</pre>
109 *
110 * <pre> -T &lt;arff file&gt;
111 *  The dataset to run the experiment on.
112 *  (required, may be specified multiple times)</pre>
113 *
114 * <pre> -P &lt;class name&gt;
115 *  The full class name of a ResultProducer (required).
116 *  eg: weka.experiment.RandomSplitResultProducer</pre>
117 *
118 * <pre> -D &lt;class name&gt;
119 *  The full class name of a ResultListener (required).
120 *  eg: weka.experiment.CSVResultListener</pre>
121 *
122 * <pre> -N &lt;string&gt;
123 *  A string containing any notes about the experiment.
124 *  (default none)</pre>
125 *
126 * <pre>
127 * Options specific to result producer weka.experiment.RandomSplitResultProducer:
128 * </pre>
129 *
130 * <pre> -P &lt;percent&gt;
131 *  The percentage of instances to use for training.
132 *  (default 66)</pre>
133 *
134 * <pre> -D
135 * Save raw split evaluator output.</pre>
136 *
137 * <pre> -O &lt;file/directory name/path&gt;
138 *  The filename where raw output will be stored.
139 *  If a directory name is specified then then individual
140 *  outputs will be gzipped, otherwise all output will be
141 *  zipped to the named file. Use in conjuction with -D. (default splitEvalutorOut.zip)</pre>
142 *
143 * <pre> -W &lt;class name&gt;
144 *  The full class name of a SplitEvaluator.
145 *  eg: weka.experiment.ClassifierSplitEvaluator</pre>
146 *
147 * <pre> -R
148 *  Set when data is not to be randomized and the data sets' size.
149 *  Is not to be determined via probabilistic rounding.</pre>
150 *
151 * <pre>
152 * Options specific to split evaluator weka.experiment.ClassifierSplitEvaluator:
153 * </pre>
154 *
155 * <pre> -W &lt;class name&gt;
156 *  The full class name of the classifier.
157 *  eg: weka.classifiers.bayes.NaiveBayes</pre>
158 *
159 * <pre> -C &lt;index&gt;
160 *  The index of the class for which IR statistics
161 *  are to be output. (default 1)</pre>
162 *
163 * <pre> -I &lt;index&gt;
164 *  The index of an attribute to output in the
165 *  results. This attribute should identify an
166 *  instance in order to know which instances are
167 *  in the test set of a cross validation. if 0
168 *  no output (default 0).</pre>
169 *
170 * <pre> -P
171 *  Add target and prediction columns to the result
172 *  for each fold.</pre>
173 *
174 * <pre>
175 * Options specific to classifier weka.classifiers.rules.ZeroR:
176 * </pre>
177 *
178 * <pre> -D
179 *  If set, classifier is run in debug mode and
180 *  may output additional info to the console</pre>
181 *
182 <!-- options-end -->
183 *
184 * @author Mark Hall (mhall@cs.waikato.ac.nz)
185 * @version $Revision: 1.16 $
186 */
187public class RemoteExperiment 
188  extends Experiment {
189 
190  /** for serialization */
191  static final long serialVersionUID = -7357668825635314937L;
192
193  /** The list of objects listening for remote experiment events */
194  private FastVector m_listeners = new FastVector();
195
196  /** Holds the names of machines with remoteEngine servers running */
197  protected DefaultListModel m_remoteHosts = new DefaultListModel();
198 
199  /** The queue of available hosts */
200  private Queue m_remoteHostsQueue = new Queue();
201
202  /** The status of each of the remote hosts */
203  private int [] m_remoteHostsStatus;
204
205  /** The number of times tasks have failed on each remote host */
206  private int [] m_remoteHostFailureCounts;
207
208  /** status of the remote host: available */
209  protected static final int AVAILABLE=0;
210  /** status of the remote host: in use */
211  protected static final int IN_USE=1;
212  /** status of the remote host: connection failed */
213  protected static final int CONNECTION_FAILED=2;
214  /** status of the remote host: some other failure */
215  protected static final int SOME_OTHER_FAILURE=3;
216
217//    protected static final int TO_BE_RUN=0;
218//    protected static final int PROCESSING=1;
219//    protected static final int FAILED=2;
220//    protected static final int FINISHED=3;
221
222  /** allow at most 3 failures on a host before it is removed from the list
223      of usable hosts */
224  protected static final int MAX_FAILURES=3;
225
226  /** Set to true if MAX_FAILURES exceeded on all hosts or connections fail
227      on all hosts or user aborts experiment (via gui) */
228  private boolean m_experimentAborted = false;
229
230  /** The number of hosts removed due to exceeding max failures */
231  private int m_removedHosts;
232
233  /** The count of failed sub-experiments */
234  private int m_failedCount;
235
236  /** The count of successfully completed sub-experiments */
237  private int m_finishedCount;
238
239  /** The base experiment to split up into sub experiments for remote
240      execution */
241  private Experiment m_baseExperiment = null;
242
243  /** The sub experiments */
244  protected Experiment [] m_subExperiments;
245
246  /** The queue of sub experiments waiting to be processed */
247  private Queue m_subExpQueue = new Queue();
248
249  /** The status of each of the sub-experiments */
250  protected int [] m_subExpComplete;
251
252  /**
253   * If true, then sub experiments are created on the basis of data sets
254   * rather than run number.
255   */
256  protected boolean m_splitByDataSet = true;
257
258
259  /**
260   * Returns true if sub experiments are to be created on the basis of
261   * data set..
262   *
263   * @return a <code>boolean</code> value indicating whether sub
264   * experiments are to be created on the basis of data set (true) or
265   * run number (false).
266   */
267  public boolean getSplitByDataSet() {
268    return m_splitByDataSet;
269  }
270
271  /**
272   * Set whether sub experiments are to be created on the basis of
273   * data set.
274   *
275   * @param sd true if sub experiments are to be created on the basis
276   * of data set. Otherwise sub experiments are created on the basis of
277   * run number.
278   */
279  public void setSplitByDataSet(boolean sd) {
280    m_splitByDataSet = sd;
281  }
282 
283  /**
284   * Construct a new RemoteExperiment using an empty Experiment as base
285   * Experiment
286   * @throws Exception if the base experiment is null
287   */
288  public RemoteExperiment() throws Exception {
289     this(new Experiment());
290  }
291 
292  /**
293   * Construct a new RemoteExperiment using a base Experiment
294   * @param base the base experiment to use
295   * @throws Exception if the base experiment is null
296   */
297  public RemoteExperiment(Experiment base) throws Exception {
298    setBaseExperiment(base);
299  }
300
301  /**
302   * Add an object to the list of those interested in recieving update
303   * information from the RemoteExperiment
304   * @param r a listener
305   */
306  public void addRemoteExperimentListener(RemoteExperimentListener r) {
307    m_listeners.addElement(r);
308  }
309
310  /**
311   * Get the base experiment used by this remote experiment
312   * @return the base experiment
313   */
314  public Experiment getBaseExperiment() {
315    return m_baseExperiment;
316  }
317
318  /**
319   * Set the base experiment. A sub experiment will be created for each
320   * run in the base experiment.
321   * @param base the base experiment to use.
322   * @throws Exception if supplied base experiment is null
323   */
324  public void setBaseExperiment(Experiment base) throws Exception {
325    if (base == null) {
326      throw new Exception("Base experiment is null!");
327    }
328    m_baseExperiment = base;
329    setRunLower(m_baseExperiment.getRunLower());
330    setRunUpper(m_baseExperiment.getRunUpper());
331    setResultListener(m_baseExperiment.getResultListener());
332    setResultProducer(m_baseExperiment.getResultProducer());
333    setDatasets(m_baseExperiment.getDatasets());
334    setUsePropertyIterator(m_baseExperiment.getUsePropertyIterator());
335    setPropertyPath(m_baseExperiment.getPropertyPath());
336    setPropertyArray(m_baseExperiment.getPropertyArray());
337    setNotes(m_baseExperiment.getNotes());
338    m_ClassFirst = m_baseExperiment.m_ClassFirst;
339    m_AdvanceDataSetFirst = m_baseExperiment.m_AdvanceDataSetFirst;
340  }
341 
342  /**
343   * Set the user notes.
344   *
345   * @param newNotes New user notes.
346   */
347  public void setNotes(String newNotes) {
348   
349    super.setNotes(newNotes);
350    m_baseExperiment.setNotes(newNotes);
351  }
352
353  /**
354   * Set the lower run number for the experiment.
355   *
356   * @param newRunLower the lower run number for the experiment.
357   */
358  public void setRunLower(int newRunLower) {
359   
360    super.setRunLower(newRunLower);
361    m_baseExperiment.setRunLower(newRunLower);
362  }
363
364  /**
365   * Set the upper run number for the experiment.
366   *
367   * @param newRunUpper the upper run number for the experiment.
368   */
369  public void setRunUpper(int newRunUpper) {
370   
371    super.setRunUpper(newRunUpper);
372    m_baseExperiment.setRunUpper(newRunUpper);
373  }
374
375  /**
376   * Sets the result listener where results will be sent.
377   *
378   * @param newResultListener the result listener where results will be sent.
379   */
380  public void setResultListener(ResultListener newResultListener) {
381   
382    super.setResultListener(newResultListener);
383    m_baseExperiment.setResultListener(newResultListener);
384  }
385
386  /**
387   * Set the result producer used for the current experiment.
388   *
389   * @param newResultProducer result producer to use for the current
390   * experiment.
391   */
392  public void setResultProducer(ResultProducer newResultProducer) {
393   
394    super.setResultProducer(newResultProducer);
395    m_baseExperiment.setResultProducer(newResultProducer);
396  }
397
398  /**
399   * Set the datasets to use in the experiment
400   * @param ds the list of datasets to use
401   */
402  public void setDatasets(DefaultListModel ds) {
403    super.setDatasets(ds);
404    m_baseExperiment.setDatasets(ds);
405  }
406
407  /**
408   * Sets whether the custom property iterator should be used.
409   *
410   * @param newUsePropertyIterator true if so
411   */
412  public void setUsePropertyIterator(boolean newUsePropertyIterator) {
413   
414    super.setUsePropertyIterator(newUsePropertyIterator);
415    m_baseExperiment.setUsePropertyIterator(newUsePropertyIterator);
416  }
417
418  /**
419   * Sets the path of properties taken to get to the custom property
420   * to iterate over.
421   *
422   * @param newPropertyPath an array of PropertyNodes
423   */
424  public void setPropertyPath(PropertyNode [] newPropertyPath) {
425   
426    super.setPropertyPath(newPropertyPath);
427    m_baseExperiment.setPropertyPath(newPropertyPath);
428  }
429
430  /**
431   * Sets the array of values to set the custom property to.
432   *
433   * @param newPropArray a value of type Object which should be an
434   * array of the appropriate values.
435   */
436  public void setPropertyArray(Object newPropArray) {
437    super.setPropertyArray(newPropArray);
438    m_baseExperiment.setPropertyArray(newPropArray);
439  }
440
441   
442  /**
443   * Prepares a remote experiment for running, creates sub experiments
444   *
445   * @throws Exception if an error occurs
446   */
447  public void initialize() throws Exception {
448    if (m_baseExperiment == null) {
449      throw new Exception("No base experiment specified!");
450    }
451
452    m_experimentAborted = false;
453    m_finishedCount = 0;
454    m_failedCount = 0;
455    m_RunNumber = getRunLower();
456    m_DatasetNumber = 0;
457    m_PropertyNumber = 0;
458    m_CurrentProperty = -1;
459    m_CurrentInstances = null;
460    m_Finished = false;
461
462    if (m_remoteHosts.size() == 0) {
463      throw new Exception("No hosts specified!");
464    }
465    // initialize all remote hosts to available
466    m_remoteHostsStatus = new int [m_remoteHosts.size()];   
467    m_remoteHostFailureCounts = new int [m_remoteHosts.size()];
468
469    m_remoteHostsQueue = new Queue();
470    // prime the hosts queue
471    for (int i=0;i<m_remoteHosts.size();i++) {
472      m_remoteHostsQueue.push(new Integer(i));
473    }
474
475    // set up sub experiments
476    m_subExpQueue = new Queue();
477    int numExps;
478    if (getSplitByDataSet()) {
479      numExps = m_baseExperiment.getDatasets().size();
480    } else {
481      numExps = getRunUpper() - getRunLower() + 1;
482    }
483    m_subExperiments = new Experiment[numExps];
484    m_subExpComplete = new int[numExps];
485    // create copy of base experiment
486    SerializedObject so = new SerializedObject(m_baseExperiment);
487
488    if (getSplitByDataSet()) {
489      for (int i = 0; i < m_baseExperiment.getDatasets().size(); i++) {
490        m_subExperiments[i] = (Experiment)so.getObject();
491        // one for each data set
492        DefaultListModel temp = new DefaultListModel();
493        temp.addElement(m_baseExperiment.getDatasets().elementAt(i));
494        m_subExperiments[i].setDatasets(temp);
495        m_subExpQueue.push(new Integer(i));
496      }
497    } else {
498      for (int i = getRunLower(); i <= getRunUpper(); i++) {
499        m_subExperiments[i-getRunLower()] = (Experiment)so.getObject();
500        // one run for each sub experiment
501        m_subExperiments[i-getRunLower()].setRunLower(i);
502        m_subExperiments[i-getRunLower()].setRunUpper(i);
503       
504        m_subExpQueue.push(new Integer(i-getRunLower()));
505      }   
506    }
507  }
508
509  /**
510   * Inform all listeners of progress
511   * @param status true if this is a status type of message
512   * @param log true if this is a log type of message
513   * @param finished true if the remote experiment has finished
514   * @param message the message.
515   */
516  private synchronized void notifyListeners(boolean status, 
517                                            boolean log, 
518                                            boolean finished,
519                                            String message) {
520    if (m_listeners.size() > 0) {
521      for (int i=0;i<m_listeners.size();i++) {
522        RemoteExperimentListener r = 
523          (RemoteExperimentListener)(m_listeners.elementAt(i));
524        r.remoteExperimentStatus(new RemoteExperimentEvent(status,
525                                                           log,
526                                                           finished,
527                                                           message));
528      }
529    } else {
530      System.err.println(message);
531    }
532  }
533
534  /**
535   * Set the abort flag
536   */
537  public void abortExperiment() {
538    m_experimentAborted = true;
539  }
540
541  /**
542   * Increment the number of successfully completed sub experiments
543   */
544  protected synchronized void incrementFinished() {
545    m_finishedCount++;
546  }
547
548  /**
549   * Increment the overall number of failures and the number of failures for
550   * a particular host
551   * @param hostNum the index of the host to increment failure count
552   */
553  protected synchronized void incrementFailed(int hostNum) {
554    m_failedCount++;
555    m_remoteHostFailureCounts[hostNum]++;
556  }
557
558  /**
559   * Push an experiment back on the queue of waiting experiments
560   * @param expNum the index of the experiment to push onto the queue
561   */
562  protected synchronized void waitingExperiment(int expNum) {
563    m_subExpQueue.push(new Integer(expNum));
564  }
565
566  /**
567   * Check to see if we have failed to connect to all hosts
568   *
569   * @return true if failed to connect to all hosts
570   */
571  private boolean checkForAllFailedHosts() {
572    boolean allbad = true;
573    for (int i = 0; i < m_remoteHostsStatus.length; i++) {
574      if (m_remoteHostsStatus[i] != CONNECTION_FAILED) {
575        allbad = false;
576        break;
577      }
578    }
579    if (allbad) {
580      abortExperiment();
581      notifyListeners(false,true,true,"Experiment aborted! All connections "
582                      +"to remote hosts failed.");
583    }
584    return allbad;
585  }
586
587  /**
588   * Returns some post experiment information.
589   * @return a String containing some post experiment info
590   */
591  private String postExperimentInfo() {
592    StringBuffer text = new StringBuffer();
593    text.append(m_finishedCount+(m_splitByDataSet
594                                 ? " data sets" 
595                                 : " runs") + " completed successfully. "
596                +m_failedCount+" failures during running.\n");
597    System.err.print(text.toString());
598    return text.toString();
599  }
600
601  /**
602   * Pushes a host back onto the queue of available hosts and attempts to
603   * launch a waiting experiment (if any).
604   * @param hostNum the index of the host to push back onto the queue of
605   * available hosts
606   */
607  protected synchronized void availableHost(int hostNum) {
608    if (hostNum >= 0) { 
609      if (m_remoteHostFailureCounts[hostNum] < MAX_FAILURES) {
610        m_remoteHostsQueue.push(new Integer(hostNum));
611      } else {
612        notifyListeners(false,true,false,"Max failures exceeded for host "
613                        +((String)m_remoteHosts.elementAt(hostNum))
614                        +". Removed from host list.");
615        m_removedHosts++;
616      }
617    }
618
619    // check for all sub exp complete or all hosts failed or failed count
620    // exceeded
621    if (m_failedCount == (MAX_FAILURES * m_remoteHosts.size())) {
622      abortExperiment();
623      notifyListeners(false,true,true,"Experiment aborted! Max failures "
624                      +"exceeded on all remote hosts.");
625      return;
626    }
627
628    if ((getSplitByDataSet() && 
629         (m_baseExperiment.getDatasets().size() == m_finishedCount)) ||
630        (!getSplitByDataSet() && 
631         ((getRunUpper() - getRunLower() + 1) == m_finishedCount))) {
632      notifyListeners(false,true,false,"Experiment completed successfully.");
633      notifyListeners(false,true,true,postExperimentInfo());
634      return;
635    }
636   
637    if (checkForAllFailedHosts()) {
638      return;
639    }
640
641    if (m_experimentAborted && 
642        (m_remoteHostsQueue.size() + m_removedHosts) == m_remoteHosts.size()) {
643      notifyListeners(false,true,true,"Experiment aborted. All remote tasks "
644                      +"finished.");
645    }
646       
647    if (!m_subExpQueue.empty() && !m_experimentAborted) {
648      if (!m_remoteHostsQueue.empty()) {
649        int availHost, waitingExp;
650        try {
651          availHost = ((Integer)m_remoteHostsQueue.pop()).intValue();
652          waitingExp = ((Integer)m_subExpQueue.pop()).intValue();
653          launchNext(waitingExp, availHost);
654        } catch (Exception ex) {
655          ex.printStackTrace();
656        }
657      }
658    }   
659  }
660
661  /**
662   * Launch a sub experiment on a remote host
663   * @param wexp the index of the sub experiment to launch
664   * @param ah the index of the available host to launch on
665   */
666  public void launchNext(final int wexp, final int ah) {
667   
668    Thread subExpThread;
669    subExpThread = new Thread() {
670        public void run() {           
671          m_remoteHostsStatus[ah] = IN_USE;
672          m_subExpComplete[wexp] = TaskStatusInfo.PROCESSING;
673          RemoteExperimentSubTask expSubTsk = new RemoteExperimentSubTask();
674          expSubTsk.setExperiment(m_subExperiments[wexp]);
675          String subTaskType = (getSplitByDataSet())
676            ? "dataset :" + ((File)m_subExperiments[wexp].getDatasets().
677                             elementAt(0)).getName()
678            : "run :" + m_subExperiments[wexp].getRunLower();
679          try {
680            String name = "//"
681              +((String)m_remoteHosts.elementAt(ah))
682              +"/RemoteEngine";
683            Compute comp = (Compute) Naming.lookup(name);
684            // assess the status of the sub-exp
685            notifyListeners(false,true,false,"Starting "
686                            +subTaskType
687                            +" on host "
688                            +((String)m_remoteHosts.elementAt(ah)));
689            Object subTaskId = comp.executeTask(expSubTsk);
690            boolean finished = false;
691            TaskStatusInfo is = null;
692            while (!finished) {
693              try {
694                Thread.sleep(2000);
695               
696                TaskStatusInfo cs = (TaskStatusInfo)comp.
697                  checkStatus(subTaskId);
698                if (cs.getExecutionStatus() == TaskStatusInfo.FINISHED) {
699                  // push host back onto queue and try launching any waiting
700                  // sub-experiments
701                  notifyListeners(false, true, false,  cs.getStatusMessage());
702                  m_remoteHostsStatus[ah] = AVAILABLE;
703                  incrementFinished();
704                  availableHost(ah);
705                  finished = true;
706                } else if (cs.getExecutionStatus() == TaskStatusInfo.FAILED) {
707                  // a non connection related error---possibly host doesn't have
708                  // access to data sets or security policy is not set up
709                  // correctly or classifier(s) failed for some reason
710                  notifyListeners(false, true, false,  cs.getStatusMessage());
711                  m_remoteHostsStatus[ah] = SOME_OTHER_FAILURE;
712                  m_subExpComplete[wexp] = TaskStatusInfo.FAILED;
713                  notifyListeners(false,true,false,subTaskType
714                                  +" "+cs.getStatusMessage()
715                                  +". Scheduling for execution on another host.");
716                  incrementFailed(ah);
717                  // push experiment back onto queue
718                  waitingExperiment(wexp);     
719                  // push host back onto queue and try launching any waiting
720                  // sub-experiments. Host is pushed back on the queue as the
721                  // failure may be temporary---eg. with InstantDB using the
722                  // RMI bridge, two or more threads may try to create the
723                  // experiment index or results table simultaneously; all but
724                  // one will throw an exception. These hosts are still usable
725                  // however.
726                  availableHost(ah);
727                  finished = true;
728                } else {
729                  if (is == null) {
730                    is = cs;
731                    notifyListeners(false, true, false, cs.getStatusMessage());
732                  } else {
733                    if (cs.getStatusMessage().
734                        compareTo(is.getStatusMessage()) != 0) {
735                     
736                      notifyListeners(false, true, false, 
737                                      cs.getStatusMessage());
738                    }
739                    is = cs;
740                  } 
741                }
742              } catch (InterruptedException ie) {
743              }
744            }         
745
746          } catch (Exception ce) {
747            m_remoteHostsStatus[ah] = CONNECTION_FAILED;
748            m_subExpComplete[wexp] = TaskStatusInfo.TO_BE_RUN;
749            System.err.println(ce);
750            ce.printStackTrace();
751            notifyListeners(false,true,false,"Connection to "
752                            +((String)m_remoteHosts.elementAt(ah))
753                            +" failed. Scheduling "
754                            +subTaskType
755                            +" for execution on another host.");
756            checkForAllFailedHosts();
757            waitingExperiment(wexp);
758          } finally {
759            if (isInterrupted()) {
760              System.err.println("Sub exp Interupted!");
761            }
762          }
763        }         
764      };
765    subExpThread.setPriority(Thread.MIN_PRIORITY);
766    subExpThread.start();
767  }
768
769  /**
770   * Overides the one in Experiment
771   * @throws Exception never throws an exception
772   */
773  public void nextIteration() throws Exception {
774
775  }
776
777  /**
778   * overides the one in Experiment
779   */
780  public void advanceCounters() {
781
782  }
783
784  /**
785   * overides the one in Experiment
786   */
787  public void postProcess() {
788   
789  }
790
791  /**
792   * Add a host name to the list of remote hosts
793   * @param hostname the host name to add to the list
794   */
795  public void addRemoteHost(String hostname) {
796    m_remoteHosts.addElement(hostname);
797  }
798
799  /**
800   * Get the list of remote host names
801   * @return the list of remote host names
802   */
803  public DefaultListModel getRemoteHosts() {
804    return m_remoteHosts;
805  }
806
807  /**
808   * Set the list of remote host names
809   * @param list the list of remote host names
810   */
811  public void setRemoteHosts(DefaultListModel list) {
812    m_remoteHosts = list;
813  }
814
815  /**
816   * Overides toString in Experiment
817   * @return a description of this remote experiment
818   */
819  public String toString() {
820    String result = m_baseExperiment.toString();
821
822    result += "\nRemote Hosts:\n";
823    for (int i=0;i<m_remoteHosts.size();i++) {
824      result += ((String)m_remoteHosts.elementAt(i)) +'\n';
825    }
826    return result;
827  }
828
829  /**
830   * Overides runExperiment in Experiment
831   */
832  public void runExperiment() {
833    int totalHosts = m_remoteHostsQueue.size();
834    // Try to launch sub experiments on all available hosts
835    for (int i = 0; i < totalHosts; i++) {
836      availableHost(-1);
837    }
838  }
839 
840  /**
841   * Returns the revision string.
842   *
843   * @return            the revision
844   */
845  public String getRevision() {
846    return RevisionUtils.extract("$Revision: 1.16 $");
847  }
848
849  /**
850   * Configures/Runs the Experiment from the command line.
851   *
852   * @param args command line arguments to the Experiment.
853   */
854  public static void main(String[] args) {
855
856    try {
857      RemoteExperiment exp = null;
858
859      // get options from XML?
860      String xmlOption = Utils.getOption("xml", args);
861      if (!xmlOption.equals(""))
862         args = new XMLOptions(xmlOption).toArray();
863     
864      Experiment base = null;
865      String expFile = Utils.getOption('l', args);
866      String saveFile = Utils.getOption('s', args);
867      boolean runExp = Utils.getFlag('r', args);
868      FastVector remoteHosts = new FastVector();
869      String runHost = " ";
870      while (runHost.length() != 0) {
871        runHost = Utils.getOption('h', args);
872        if (runHost.length() != 0) {
873          remoteHosts.addElement(runHost);
874        }
875      }
876      if (expFile.length() == 0) {
877        base = new Experiment();
878        try {
879          base.setOptions(args);
880          Utils.checkForRemainingOptions(args);
881        } catch (Exception ex) {
882          ex.printStackTrace();
883          String result = "Usage:\n\n"
884            + "-l <exp file>\n"
885            + "\tLoad experiment from file (default use cli options)\n"
886            + "-s <exp file>\n"
887            + "\tSave experiment to file after setting other options\n"
888            + "\t(default don't save)\n"
889            + "-h <remote host name>\n"
890            + "\tHost to run experiment on (may be specified more than once\n"
891            + "\tfor multiple remote hosts)\n"
892            + "-r \n"
893            + "\tRun experiment on (default don't run)\n"
894       + "-xml <filename | xml-string>\n"
895       + "\tget options from XML-Data instead from parameters\n"
896       + "\n";
897          Enumeration enm = ((OptionHandler)base).listOptions();
898          while (enm.hasMoreElements()) {
899            Option option = (Option) enm.nextElement();
900            result += option.synopsis() + "\n";
901            result += option.description() + "\n";
902          }
903          throw new Exception(result + "\n" + ex.getMessage());
904        }
905      } else {
906         Object tmp;
907         
908         // KOML?
909         if ( (KOML.isPresent()) && (expFile.toLowerCase().endsWith(KOML.FILE_EXTENSION)) ) {
910            tmp = KOML.read(expFile);
911         }
912         else
913         // XML?
914         if (expFile.toLowerCase().endsWith(".xml")) {
915            XMLExperiment xml = new XMLExperiment(); 
916            tmp = xml.read(expFile);
917         }
918         // binary
919         else {
920            FileInputStream fi = new FileInputStream(expFile);
921            ObjectInputStream oi = new ObjectInputStream(
922                                   new BufferedInputStream(fi));
923            tmp = oi.readObject();
924            oi.close();
925         }
926        if (tmp instanceof RemoteExperiment) {
927          exp = (RemoteExperiment)tmp;
928        } else {
929          base = (Experiment)tmp;
930        }
931      }
932      if (base != null) {
933        exp = new RemoteExperiment(base);
934      }
935      for (int i=0;i<remoteHosts.size();i++) {
936        exp.addRemoteHost((String)remoteHosts.elementAt(i));
937      }
938      System.err.println("Experiment:\n" + exp.toString());
939
940      if (saveFile.length() != 0) {
941         // KOML?
942         if ( (KOML.isPresent()) && (saveFile.toLowerCase().endsWith(KOML.FILE_EXTENSION)) ) {
943            KOML.write(saveFile, exp);
944         }
945         else
946         // XML?
947         if (saveFile.toLowerCase().endsWith(".xml")) {
948            XMLExperiment xml = new XMLExperiment(); 
949            xml.write(saveFile, exp);
950         }
951         // binary
952         else {
953            FileOutputStream fo = new FileOutputStream(saveFile);
954            ObjectOutputStream oo = new ObjectOutputStream(
955                                    new BufferedOutputStream(fo));
956            oo.writeObject(exp);
957            oo.close();
958         }
959      }
960     
961      if (runExp) {
962        System.err.println("Initializing...");
963        exp.initialize();
964        System.err.println("Iterating...");
965        exp.runExperiment();
966        System.err.println("Postprocessing...");
967        exp.postProcess();
968      }     
969    } catch (Exception ex) {
970      ex.printStackTrace();
971      System.err.println(ex.getMessage());
972    }
973  }
974}
Note: See TracBrowser for help on using the repository browser.