source: src/main/java/weka/experiment/RemoteEngine.java @ 28

Last change on this file since 28 was 4, checked in by gnappo, 14 years ago

Import di weka.

File size: 11.8 KB
Line 
1/*
2 *    This program is free software; you can redistribute it and/or modify
3 *    it under the terms of the GNU General Public License as published by
4 *    the Free Software Foundation; either version 2 of the License, or
5 *    (at your option) any later version.
6 *
7 *    This program is distributed in the hope that it will be useful,
8 *    but WITHOUT ANY WARRANTY; without even the implied warranty of
9 *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
10 *    GNU General Public License for more details.
11 *
12 *    You should have received a copy of the GNU General Public License
13 *    along with this program; if not, write to the Free Software
14 *    Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
15 */
16
17/*
18 *    RemoteEngine.java
19 *    Copyright (C) 2000 University of Waikato, Hamilton, New Zealand
20 *
21 */
22
23
24package weka.experiment;
25
26import weka.core.Queue;
27import weka.core.RevisionHandler;
28import weka.core.RevisionUtils;
29import weka.core.Utils;
30
31import java.net.InetAddress;
32import java.net.URL;
33import java.net.URLClassLoader;
34import java.rmi.Naming;
35import java.rmi.RMISecurityManager;
36import java.rmi.RemoteException;
37import java.rmi.server.UnicastRemoteObject;
38import java.util.Enumeration;
39import java.util.Hashtable;
40
41/**
42 * A general purpose server for executing Task objects sent via RMI.
43 *
44 * @author Mark Hall (mhall@cs.waikato.ac.nz)
45 * @version $Revision: 1.12 $
46 */
47public class RemoteEngine
48  extends UnicastRemoteObject
49  implements Compute, RevisionHandler {
50
51  /** for serialization */
52  private static final long serialVersionUID = -1021538162895448259L;
53
54  /** The name of the host that this engine is started on */
55  private String m_HostName = "local";
56
57  /** A queue of waiting tasks */
58  private Queue m_TaskQueue = new Queue();
59
60  /** A queue of corresponding ID's for tasks */
61  private Queue m_TaskIdQueue = new Queue();
62
63  /** A hashtable of experiment status */
64  private Hashtable m_TaskStatus = new Hashtable();
65
66  /** Is there a task running */
67  private boolean m_TaskRunning = false;
68 
69  /** Clean up interval (in ms) */
70  protected static long CLEANUPTIMEOUT = 3600000;
71
72  /**
73   * Constructor
74   * @param hostName name of the host
75   * @exception RemoteException if something goes wrong
76   */
77  public RemoteEngine(String hostName) throws RemoteException {
78    super();
79    m_HostName = hostName;
80
81    /* launch a clean-up thread. Will purge any failed or finished
82       tasks still in the TaskStatus hashtable after an hour */
83       
84    Thread cleanUpThread;
85    cleanUpThread = new Thread() {
86        public void run() {
87          while (true) {
88            try {
89              // sleep for a while
90              Thread.sleep(CLEANUPTIMEOUT);
91            } catch (InterruptedException ie) {}
92
93            if (m_TaskStatus.size() > 0) {
94              purge();
95            } else {
96              System.err.println("RemoteEngine : purge - no tasks to check.");
97            }
98          }
99        }
100      };
101    cleanUpThread.setPriority(Thread.MIN_PRIORITY);
102    cleanUpThread.setDaemon(true);
103    cleanUpThread.start();
104  }
105 
106  /**
107   * Takes a task object and queues it for execution
108   * @param t the Task object to execute
109   * @return an identifier for the Task that can be used when querying
110   * Task status
111   */
112  public synchronized Object executeTask(Task t) throws RemoteException {
113    String taskId = ""+System.currentTimeMillis()+":";
114    taskId += t.hashCode();
115    addTaskToQueue(t, taskId);
116
117    return taskId;
118    //    return t.execute();
119  }
120
121  /**
122   * Returns status information on a particular task
123   *
124   * @param taskId the ID of the task to check
125   * @return a <code>TaskStatusInfo</code> encapsulating task status info
126   * @exception Exception if an error occurs
127   */
128  public Object checkStatus(Object taskId) throws Exception {
129   
130    TaskStatusInfo inf = (TaskStatusInfo)m_TaskStatus.get(taskId);
131
132    if (inf == null) {
133      throw new Exception("RemoteEngine ("+m_HostName+") : Task not found.");
134    }
135   
136    TaskStatusInfo result = new TaskStatusInfo();
137    result.setExecutionStatus(inf.getExecutionStatus());
138    result.setStatusMessage(inf.getStatusMessage());
139    result.setTaskResult(inf.getTaskResult());
140
141    if (inf.getExecutionStatus() == TaskStatusInfo.FINISHED ||
142        inf.getExecutionStatus() == TaskStatusInfo.FAILED) {
143      System.err.println("Finished/failed Task id : " 
144                         + taskId + " checked by client. Removing.");
145      inf.setTaskResult(null);
146      inf = null;
147      m_TaskStatus.remove(taskId);
148    }
149    inf = null;
150    return result;
151  }
152
153  /**
154   * Adds a new task to the queue.
155   *
156   * @param t a <code>Task</code> value to be added
157   * @param taskId the id of the task to be added
158   */
159  private synchronized void addTaskToQueue(Task t, String taskId) {
160    TaskStatusInfo newTask = t.getTaskStatus();
161    if (newTask == null) {
162      newTask = new TaskStatusInfo();
163    }
164    m_TaskQueue.push(t);
165    m_TaskIdQueue.push(taskId);
166    newTask.setStatusMessage("RemoteEngine ("
167                             +m_HostName
168                             +") : task " + taskId + " queued at postion: "
169                             +m_TaskQueue.size());
170    // add task status to HashTable
171    m_TaskStatus.put(taskId, newTask);
172    System.err.println("Task id : " + taskId + " Queued.");
173    if (m_TaskRunning == false) {
174      startTask();
175    }
176  }
177
178  /**
179   * Checks to see if there are any waiting tasks, and if no task is
180   * currently running starts a waiting task.
181   */
182  private void startTask() {
183
184    if (m_TaskRunning == false && m_TaskQueue.size() > 0) {
185      Thread activeTaskThread;
186      activeTaskThread = new Thread() {
187          public void run() {
188            m_TaskRunning = true;
189            Task currentTask = (Task)m_TaskQueue.pop();
190            String taskId = (String)m_TaskIdQueue.pop();
191            TaskStatusInfo tsi = (TaskStatusInfo)m_TaskStatus.get(taskId);
192            tsi.setExecutionStatus(TaskStatusInfo.PROCESSING);
193            tsi.setStatusMessage("RemoteEngine ("
194                                 +m_HostName
195                                 +") : task " + taskId + " running...");
196            try {
197              System.err.println("Launching task id : "
198                                 + taskId + "...");
199              currentTask.execute();
200              TaskStatusInfo runStatus = currentTask.getTaskStatus();
201              tsi.setExecutionStatus(runStatus.getExecutionStatus());
202              tsi.setStatusMessage("RemoteExperiment ("
203                                   +m_HostName+") "
204                                   +runStatus.getStatusMessage());
205              tsi.setTaskResult(runStatus.getTaskResult());
206            } catch (Error er) {
207              // Object initialization can raise Error, which are not subclass of Exception
208              tsi.setExecutionStatus(TaskStatusInfo.FAILED);
209              if (er.getCause() instanceof java.security.AccessControlException) {
210                tsi.setStatusMessage("RemoteEngine ("
211                                     +m_HostName
212                                     +") : security error, check remote policy file.");
213                System.err.println("Task id " + taskId + " Failed! Check remote policy file");
214              }
215              else {
216                tsi.setStatusMessage("RemoteEngine ("
217                                     +m_HostName
218                                     +") : unknown initialization error.");
219                System.err.println("Task id " + taskId + " Unknown initialization error");
220              }
221            } catch (Exception ex) {
222              tsi.setExecutionStatus(TaskStatusInfo.FAILED);
223              if (ex instanceof java.io.FileNotFoundException) {
224                tsi.setStatusMessage("RemoteEngine ("
225                                     +m_HostName
226                                     +") : " + ex.getMessage());
227                System.err.println("Task id " + taskId + " Failed, " + ex.getMessage());
228              }
229              else {
230                tsi.setStatusMessage("RemoteEngine ("
231                                     +m_HostName
232                                     +") : task " + taskId + " failed.");
233                System.err.println("Task id " + taskId + " Failed!");
234              }
235            } finally {
236              if (m_TaskStatus.size() == 0) {
237                purgeClasses();
238              }
239              m_TaskRunning = false;
240              // start any waiting tasks
241              startTask();
242            }
243          }
244        };
245      activeTaskThread.setPriority(Thread.MIN_PRIORITY);
246      activeTaskThread.start();
247    }
248  }
249
250  /**
251   * Attempts to purge class types from the virtual machine. May take some
252   * time as it relies on garbage collection
253   */
254  private void purgeClasses() {
255    try {
256      // see if we can purge classes
257      ClassLoader prevCl = 
258        Thread.currentThread().getContextClassLoader();
259      ClassLoader urlCl = 
260        URLClassLoader.newInstance(new URL[] {new URL("file:.")}, prevCl);
261      Thread.currentThread().setContextClassLoader(urlCl);
262    } catch (Exception ex) {
263      ex.printStackTrace();
264    }
265  }
266 
267  /**
268   * Checks the hash table for failed/finished tasks. Any that have been
269   * around for an @seeCLEANUPTIMEOUT or more are removed. Clients are expected to check
270   * on the status of their remote tasks. Checking on the status of a
271   * finished/failed task will remove it from the hash table, therefore
272   * any failed/finished tasks left lying around for more than an hour
273   * suggest that their client has died..
274   *
275   */
276  private void purge() {
277    Enumeration keys = m_TaskStatus.keys();
278    long currentTime = System.currentTimeMillis();
279    System.err.println("RemoteEngine purge. Current time : " + currentTime);
280    while (keys.hasMoreElements()) {
281      String taskId = (String)keys.nextElement();
282      System.err.print("Examining task id : " + taskId + "... ");
283      String timeString = taskId.substring(0, taskId.indexOf(':'));
284      long ts = Long.valueOf(timeString).longValue();
285      if (currentTime - ts > CLEANUPTIMEOUT) {
286        TaskStatusInfo tsi = (TaskStatusInfo)m_TaskStatus.get(taskId);
287        if ((tsi != null) 
288            && (tsi.getExecutionStatus() == TaskStatusInfo.FINISHED ||
289                tsi.getExecutionStatus() == TaskStatusInfo.FAILED)) {
290          System.err.println("\nTask id : " 
291                             + taskId + " has gone stale. Removing.");
292          m_TaskStatus.remove(taskId);
293          tsi.setTaskResult(null);
294          tsi = null;
295        }
296      } else {
297        System.err.println("ok.");
298      }
299    }
300    if (m_TaskStatus.size() == 0) {
301      purgeClasses();
302    }
303  }
304 
305  /**
306   * Returns the revision string.
307   *
308   * @return            the revision
309   */
310  public String getRevision() {
311    return RevisionUtils.extract("$Revision: 1.12 $");
312  }
313
314  /**
315   * Main method. Gets address of the local host, creates a remote engine
316   * object and binds it in the RMI registry. If there is no RMI registry,
317   * then it tries to create one with default port 1099.
318   *
319   * @param args
320   */
321  public static void main(String[] args) {
322    if (System.getSecurityManager() == null) {
323      System.setSecurityManager(new RMISecurityManager());
324    }
325   
326    int port = 1099;
327    InetAddress localhost = null;
328    try {
329      localhost = InetAddress.getLocalHost();
330      System.err.println("Host name : "+localhost.getHostName());
331    } catch (Exception ex) {
332      ex.printStackTrace();
333    }
334    String name;
335    if (localhost != null) {
336      name = localhost.getHostName();
337    } else {
338      name = "localhost";
339    }
340   
341    // get optional port
342    try {
343      String portOption = Utils.getOption("p", args);
344      if (!portOption.equals("")) 
345        port = Integer.parseInt(portOption);
346    } catch (Exception ex) {
347      System.err.println("Usage : -p <port>");
348    }
349
350    if (port != 1099) {
351      name = name + ":" + port;
352    }
353    name = "//"+name+"/RemoteEngine";
354   
355    try {
356      Compute engine = new RemoteEngine(name);
357     
358      try {     
359        Naming.rebind(name, engine);
360        System.out.println("RemoteEngine bound in RMI registry");
361      } catch (RemoteException ex) {
362        // try to bootstrap a new registry
363        System.err.println("Attempting to start RMI registry on port " + port + "...");
364        java.rmi.registry.LocateRegistry.createRegistry(port);
365        Naming.bind(name, engine);
366        System.out.println("RemoteEngine bound in RMI registry");
367      }
368     
369    } catch (Exception e) {
370      System.err.println("RemoteEngine exception: " + 
371                         e.getMessage());
372      e.printStackTrace();
373    }
374  }
375}
Note: See TracBrowser for help on using the repository browser.