/* * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. */ /* * RemoteEngine.java * Copyright (C) 2000 University of Waikato, Hamilton, New Zealand * */ package weka.experiment; import weka.core.Queue; import weka.core.RevisionHandler; import weka.core.RevisionUtils; import weka.core.Utils; import java.net.InetAddress; import java.net.URL; import java.net.URLClassLoader; import java.rmi.Naming; import java.rmi.RMISecurityManager; import java.rmi.RemoteException; import java.rmi.server.UnicastRemoteObject; import java.util.Enumeration; import java.util.Hashtable; /** * A general purpose server for executing Task objects sent via RMI. * * @author Mark Hall (mhall@cs.waikato.ac.nz) * @version $Revision: 1.12 $ */ public class RemoteEngine extends UnicastRemoteObject implements Compute, RevisionHandler { /** for serialization */ private static final long serialVersionUID = -1021538162895448259L; /** The name of the host that this engine is started on */ private String m_HostName = "local"; /** A queue of waiting tasks */ private Queue m_TaskQueue = new Queue(); /** A queue of corresponding ID's for tasks */ private Queue m_TaskIdQueue = new Queue(); /** A hashtable of experiment status */ private Hashtable m_TaskStatus = new Hashtable(); /** Is there a task running */ private boolean m_TaskRunning = false; /** Clean up interval (in ms) */ protected static long CLEANUPTIMEOUT = 3600000; /** * Constructor * @param hostName name of the host * @exception RemoteException if something goes wrong */ public RemoteEngine(String hostName) throws RemoteException { super(); m_HostName = hostName; /* launch a clean-up thread. Will purge any failed or finished tasks still in the TaskStatus hashtable after an hour */ Thread cleanUpThread; cleanUpThread = new Thread() { public void run() { while (true) { try { // sleep for a while Thread.sleep(CLEANUPTIMEOUT); } catch (InterruptedException ie) {} if (m_TaskStatus.size() > 0) { purge(); } else { System.err.println("RemoteEngine : purge - no tasks to check."); } } } }; cleanUpThread.setPriority(Thread.MIN_PRIORITY); cleanUpThread.setDaemon(true); cleanUpThread.start(); } /** * Takes a task object and queues it for execution * @param t the Task object to execute * @return an identifier for the Task that can be used when querying * Task status */ public synchronized Object executeTask(Task t) throws RemoteException { String taskId = ""+System.currentTimeMillis()+":"; taskId += t.hashCode(); addTaskToQueue(t, taskId); return taskId; // return t.execute(); } /** * Returns status information on a particular task * * @param taskId the ID of the task to check * @return a TaskStatusInfo encapsulating task status info * @exception Exception if an error occurs */ public Object checkStatus(Object taskId) throws Exception { TaskStatusInfo inf = (TaskStatusInfo)m_TaskStatus.get(taskId); if (inf == null) { throw new Exception("RemoteEngine ("+m_HostName+") : Task not found."); } TaskStatusInfo result = new TaskStatusInfo(); result.setExecutionStatus(inf.getExecutionStatus()); result.setStatusMessage(inf.getStatusMessage()); result.setTaskResult(inf.getTaskResult()); if (inf.getExecutionStatus() == TaskStatusInfo.FINISHED || inf.getExecutionStatus() == TaskStatusInfo.FAILED) { System.err.println("Finished/failed Task id : " + taskId + " checked by client. Removing."); inf.setTaskResult(null); inf = null; m_TaskStatus.remove(taskId); } inf = null; return result; } /** * Adds a new task to the queue. * * @param t a Task value to be added * @param taskId the id of the task to be added */ private synchronized void addTaskToQueue(Task t, String taskId) { TaskStatusInfo newTask = t.getTaskStatus(); if (newTask == null) { newTask = new TaskStatusInfo(); } m_TaskQueue.push(t); m_TaskIdQueue.push(taskId); newTask.setStatusMessage("RemoteEngine (" +m_HostName +") : task " + taskId + " queued at postion: " +m_TaskQueue.size()); // add task status to HashTable m_TaskStatus.put(taskId, newTask); System.err.println("Task id : " + taskId + " Queued."); if (m_TaskRunning == false) { startTask(); } } /** * Checks to see if there are any waiting tasks, and if no task is * currently running starts a waiting task. */ private void startTask() { if (m_TaskRunning == false && m_TaskQueue.size() > 0) { Thread activeTaskThread; activeTaskThread = new Thread() { public void run() { m_TaskRunning = true; Task currentTask = (Task)m_TaskQueue.pop(); String taskId = (String)m_TaskIdQueue.pop(); TaskStatusInfo tsi = (TaskStatusInfo)m_TaskStatus.get(taskId); tsi.setExecutionStatus(TaskStatusInfo.PROCESSING); tsi.setStatusMessage("RemoteEngine (" +m_HostName +") : task " + taskId + " running..."); try { System.err.println("Launching task id : " + taskId + "..."); currentTask.execute(); TaskStatusInfo runStatus = currentTask.getTaskStatus(); tsi.setExecutionStatus(runStatus.getExecutionStatus()); tsi.setStatusMessage("RemoteExperiment (" +m_HostName+") " +runStatus.getStatusMessage()); tsi.setTaskResult(runStatus.getTaskResult()); } catch (Error er) { // Object initialization can raise Error, which are not subclass of Exception tsi.setExecutionStatus(TaskStatusInfo.FAILED); if (er.getCause() instanceof java.security.AccessControlException) { tsi.setStatusMessage("RemoteEngine (" +m_HostName +") : security error, check remote policy file."); System.err.println("Task id " + taskId + " Failed! Check remote policy file"); } else { tsi.setStatusMessage("RemoteEngine (" +m_HostName +") : unknown initialization error."); System.err.println("Task id " + taskId + " Unknown initialization error"); } } catch (Exception ex) { tsi.setExecutionStatus(TaskStatusInfo.FAILED); if (ex instanceof java.io.FileNotFoundException) { tsi.setStatusMessage("RemoteEngine (" +m_HostName +") : " + ex.getMessage()); System.err.println("Task id " + taskId + " Failed, " + ex.getMessage()); } else { tsi.setStatusMessage("RemoteEngine (" +m_HostName +") : task " + taskId + " failed."); System.err.println("Task id " + taskId + " Failed!"); } } finally { if (m_TaskStatus.size() == 0) { purgeClasses(); } m_TaskRunning = false; // start any waiting tasks startTask(); } } }; activeTaskThread.setPriority(Thread.MIN_PRIORITY); activeTaskThread.start(); } } /** * Attempts to purge class types from the virtual machine. May take some * time as it relies on garbage collection */ private void purgeClasses() { try { // see if we can purge classes ClassLoader prevCl = Thread.currentThread().getContextClassLoader(); ClassLoader urlCl = URLClassLoader.newInstance(new URL[] {new URL("file:.")}, prevCl); Thread.currentThread().setContextClassLoader(urlCl); } catch (Exception ex) { ex.printStackTrace(); } } /** * Checks the hash table for failed/finished tasks. Any that have been * around for an @seeCLEANUPTIMEOUT or more are removed. Clients are expected to check * on the status of their remote tasks. Checking on the status of a * finished/failed task will remove it from the hash table, therefore * any failed/finished tasks left lying around for more than an hour * suggest that their client has died.. * */ private void purge() { Enumeration keys = m_TaskStatus.keys(); long currentTime = System.currentTimeMillis(); System.err.println("RemoteEngine purge. Current time : " + currentTime); while (keys.hasMoreElements()) { String taskId = (String)keys.nextElement(); System.err.print("Examining task id : " + taskId + "... "); String timeString = taskId.substring(0, taskId.indexOf(':')); long ts = Long.valueOf(timeString).longValue(); if (currentTime - ts > CLEANUPTIMEOUT) { TaskStatusInfo tsi = (TaskStatusInfo)m_TaskStatus.get(taskId); if ((tsi != null) && (tsi.getExecutionStatus() == TaskStatusInfo.FINISHED || tsi.getExecutionStatus() == TaskStatusInfo.FAILED)) { System.err.println("\nTask id : " + taskId + " has gone stale. Removing."); m_TaskStatus.remove(taskId); tsi.setTaskResult(null); tsi = null; } } else { System.err.println("ok."); } } if (m_TaskStatus.size() == 0) { purgeClasses(); } } /** * Returns the revision string. * * @return the revision */ public String getRevision() { return RevisionUtils.extract("$Revision: 1.12 $"); } /** * Main method. Gets address of the local host, creates a remote engine * object and binds it in the RMI registry. If there is no RMI registry, * then it tries to create one with default port 1099. * * @param args */ public static void main(String[] args) { if (System.getSecurityManager() == null) { System.setSecurityManager(new RMISecurityManager()); } int port = 1099; InetAddress localhost = null; try { localhost = InetAddress.getLocalHost(); System.err.println("Host name : "+localhost.getHostName()); } catch (Exception ex) { ex.printStackTrace(); } String name; if (localhost != null) { name = localhost.getHostName(); } else { name = "localhost"; } // get optional port try { String portOption = Utils.getOption("p", args); if (!portOption.equals("")) port = Integer.parseInt(portOption); } catch (Exception ex) { System.err.println("Usage : -p "); } if (port != 1099) { name = name + ":" + port; } name = "//"+name+"/RemoteEngine"; try { Compute engine = new RemoteEngine(name); try { Naming.rebind(name, engine); System.out.println("RemoteEngine bound in RMI registry"); } catch (RemoteException ex) { // try to bootstrap a new registry System.err.println("Attempting to start RMI registry on port " + port + "..."); java.rmi.registry.LocateRegistry.createRegistry(port); Naming.bind(name, engine); System.out.println("RemoteEngine bound in RMI registry"); } } catch (Exception e) { System.err.println("RemoteEngine exception: " + e.getMessage()); e.printStackTrace(); } } }