source: branches/MetisMQI/src/main/java/weka/gui/beans/FlowRunner.java @ 32

Last change on this file since 32 was 29, checked in by gnappo, 14 years ago

Taggata versione per la demo e aggiunto branch.

File size: 14.5 KB
Line 
1/*
2 *    This program is free software; you can redistribute it and/or modify
3 *    it under the terms of the GNU General Public License as published by
4 *    the Free Software Foundation; either version 2 of the License, or
5 *    (at your option) any later version.
6 *
7 *    This program is distributed in the hope that it will be useful,
8 *    but WITHOUT ANY WARRANTY; without even the implied warranty of
9 *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
10 *    GNU General Public License for more details.
11 *
12 *    You should have received a copy of the GNU General Public License
13 *    along with this program; if not, write to the Free Software
14 *    Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
15 */
16
17/*
18 *    FlowRunner.java
19 *    Copyright (C) 2008 University of Waikato, Hamilton, New Zealand
20 *
21 */
22
23package weka.gui.beans;
24
25import java.text.SimpleDateFormat;
26import java.util.ArrayList;
27import java.util.Date;
28import java.util.Set;
29import java.util.TreeMap;
30import java.util.Vector;
31import java.io.File;
32import java.io.FileInputStream;
33import java.io.FileOutputStream;
34import java.io.IOException;
35import java.io.InputStream;
36import java.io.InputStreamReader;
37import java.io.ObjectInputStream;
38import java.io.ObjectOutputStream;
39import java.io.OutputStream;
40
41import weka.core.Environment;
42import weka.core.EnvironmentHandler;
43import weka.core.RevisionHandler;
44import weka.core.RevisionUtils;
45import weka.gui.Logger;
46import weka.gui.beans.xml.*;
47
48/**
49 * Small utility class for executing KnowledgeFlow
50 * flows outside of the KnowledgeFlow application
51 *
52 * @author Mark Hall (mhall{[at]}pentaho{[dot]}org)
53 * @version $Revision: 5928 $
54 */
55public class FlowRunner implements RevisionHandler {
56
57  /** The potential flow(s) to execute */
58  protected Vector m_beans;
59
60  protected int m_runningCount = 0;
61
62  protected transient Logger m_log = null;
63 
64  protected transient Environment m_env;
65 
66  /** run each Startable bean sequentially? (default in parallel) */
67  protected boolean m_startSequentially = false;
68 
69  public static class SimpleLogger implements weka.gui.Logger {
70    SimpleDateFormat m_DateFormat = 
71      new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
72   
73    public void logMessage(String lm) {
74      System.out.println(m_DateFormat.format(new Date()) + ": " + lm);
75    }
76   
77    public void statusMessage(String lm) {
78      System.out.println(m_DateFormat.format(new Date()) + ": " + lm); 
79    }
80  }
81
82  /**
83   * Constructor
84   */
85  public FlowRunner() {
86    // make sure that properties and plugins are loaded
87    KnowledgeFlowApp.loadProperties();
88  }
89
90  public void setLog(Logger log) {
91    m_log = log;
92  }
93 
94  protected void runSequentially(TreeMap<Integer, Startable> startables) {
95    Set<Integer> s = startables.keySet();
96    for (Integer i : s) {
97      try {
98        Startable startPoint = startables.get(i);
99        startPoint.start();
100        Thread.sleep(200);
101        waitUntilFinished();
102      } catch (Exception ex) {
103        ex.printStackTrace();
104        if (m_log != null) {
105          m_log.logMessage(ex.getMessage());
106          m_log.logMessage("Aborting...");
107        } else {
108          System.err.println(ex.getMessage());
109          System.err.println("Aborting...");
110        }
111        break;
112      }
113    }
114  }
115
116  protected synchronized void launchThread(final Startable s, final int flowNum) {
117    Thread t = new Thread() {
118        private int m_num = flowNum;
119        public void run() {
120          try {
121            s.start();
122          } catch (Exception ex) {
123            ex.printStackTrace();
124            if (m_log != null) {
125              m_log.logMessage(ex.getMessage());
126            } else {
127              System.err.println(ex.getMessage());
128            }
129          } finally {
130            /*
131            if (m_log != null) {
132              m_log.logMessage("[FlowRunner] flow " + m_num + " finished.");
133            } else {
134              System.out.println("[FlowRunner] Flow " + m_num + " finished.");
135            }
136            */
137            decreaseCount();
138          }
139        }
140      };
141    m_runningCount++;
142    t.setPriority(Thread.MIN_PRIORITY);
143    t.start();
144  }
145
146  protected synchronized void decreaseCount() {
147    m_runningCount--;
148  }
149
150  public synchronized void stopAllFlows() {
151    for (int i = 0; i < m_beans.size(); i++) {
152      BeanInstance temp = (BeanInstance)m_beans.elementAt(i);
153      if (temp.getBean() instanceof BeanCommon) {
154        // try to stop any execution
155        ((BeanCommon)temp.getBean()).stop();
156      }
157    }
158  }
159
160  /**
161   * Waits until all flows have finished executing before returning
162   *
163   */
164  public void waitUntilFinished() {
165    try {
166      while (m_runningCount > 0) {
167        Thread.sleep(200);
168      }
169     
170      // now poll beans to see if there are any that are still busy
171      // (i.e. any multi-threaded ones that queue data instead of blocking)
172      while (true) {
173        boolean busy = false;
174        for (int i = 0; i < m_beans.size(); i++) {
175          BeanInstance temp = (BeanInstance)m_beans.elementAt(i);
176          if (temp.getBean() instanceof BeanCommon) {
177            if (((BeanCommon)temp.getBean()).isBusy()) {
178              busy = true;
179              break; // for
180            }           
181          }
182        }
183        if (busy) {
184          Thread.sleep(3000);
185        } else {
186          break; // while
187        }
188      }
189    } catch (Exception ex) {
190      if (m_log != null) {
191        m_log.logMessage("[FlowRunner] Attempting to stop all flows...");
192      } else {
193        System.err.println("[FlowRunner] Attempting to stop all flows...");
194      }
195      stopAllFlows();
196      //      ex.printStackTrace();
197    }
198  }
199
200  /**
201   * Load a serialized KnowledgeFlow (either binary or xml)
202   *
203   * @param fileName the name of the file to load from
204   * @throws Exception if something goes wrong
205   */
206  public void load(String fileName) throws Exception {
207    if (!fileName.endsWith(".kf") && !fileName.endsWith(".kfml")) {
208      throw new Exception("Can only load and run binary or xml serialized KnowledgeFlows "
209                          + "(*.kf | *.kfml)");
210    }
211   
212    if (fileName.endsWith(".kf")) {
213      loadBinary(fileName);
214    } else if (fileName.endsWith(".kfml")) {
215      loadXML(fileName);
216    }
217  }
218
219  /**
220   * Load a binary serialized KnowledgeFlow
221   *
222   * @param fileName the name of the file to load from
223   * @throws Exception if something goes wrong
224   */
225  public void loadBinary(String fileName) throws Exception {
226    if (!fileName.endsWith(".kf")) {
227      throw new Exception("File must be a binary flow (*.kf)");
228    }
229
230    InputStream is = new FileInputStream(fileName);
231    ObjectInputStream ois = new ObjectInputStream(is);
232    m_beans = (Vector)ois.readObject();
233   
234    // don't need the graphical connections
235    ois.close();
236   
237    if (m_env != null) {
238      String parentDir = (new File(fileName)).getParent();
239      if (parentDir == null) {
240        parentDir = "./";
241      }
242      m_env.addVariable("Internal.knowledgeflow.directory", 
243          parentDir);
244    }
245  }
246
247  /**
248   * Load an XML serialized KnowledgeFlow
249   *
250   * @param fileName the name of the file to load from
251   * @throws Exception if something goes wrong
252   */
253  public void loadXML(String fileName) throws Exception {
254    if (!fileName.endsWith(".kfml")) {
255      throw new Exception("File must be an XML flow (*.kfml)");
256    }
257
258    XMLBeans xml = new XMLBeans(null, null);
259    Vector v = (Vector) xml.read(new File(fileName));
260    m_beans = (Vector) v.get(XMLBeans.INDEX_BEANINSTANCES);
261
262    if (m_env != null) {
263      String parentDir = (new File(fileName)).getParent();
264      if (parentDir == null) {
265        parentDir = "./";
266      }
267      m_env.addVariable("Internal.knowledgeflow.directory", 
268          parentDir);
269    } else {
270      System.err.println("++++++++++++ Environment variables null!!...");
271    }
272  }
273 
274  /**
275   * Get the vector holding the flow(s)
276   *
277   * @return the Vector holding the flow(s)
278   */
279  public Vector getFlows() {
280    return m_beans;
281  }
282
283  /**
284   * Set the vector holding the flows(s) to run
285   *
286   * @param beans the Vector holding the flows to run
287   */
288  public void setFlows(Vector beans) {
289    m_beans = beans;
290  }
291 
292  /**
293   * Set the environment variables to use. NOTE: this needs
294   * to be called BEFORE a load method is invoked to ensure
295   * that the ${Internal.knowledgeflow.directory} variable get
296   * set in the supplied Environment object.
297   *
298   * @param env the environment variables to use.
299   */
300  public void setEnvironment(Environment env) {
301    m_env = env;
302  }
303 
304  /**
305   * Get the environment variables that are in use.
306   *
307   * @return the environment variables that are in ues.
308   */
309  public Environment getEnvironment() {
310    return m_env;
311  }
312 
313  /**
314   * Set whether to launch Startable beans one after the other
315   * or all in parallel.
316   *
317   * @param s true if Startable beans are to be launched sequentially
318   */
319  public void setStartSequentially(boolean s) {
320    m_startSequentially = s;
321  }
322 
323  /**
324   * Gets whether Startable beans will be launched sequentially
325   * or all in parallel.
326   *
327   * @return true if Startable beans will be launched sequentially
328   */
329  public boolean getStartSequentially() {
330    return m_startSequentially;
331  }
332
333  /**
334   * Launch all loaded KnowledgeFlow
335   *
336   * @throws Exception if something goes wrong during execution
337   */
338  public void run() throws Exception {
339    if (m_beans == null) {
340      throw new Exception("Don't seem to have any beans I can execute.");
341    }
342   
343    // register the log (if set) with the beans
344    for (int i = 0; i < m_beans.size(); i++) {
345      BeanInstance tempB = (BeanInstance)m_beans.elementAt(i);
346      if (m_log != null) {
347        if (tempB.getBean() instanceof BeanCommon) {
348          ((BeanCommon)tempB.getBean()).setLog(m_log);
349        }
350      }
351       
352      if (tempB.getBean() instanceof EnvironmentHandler) {
353        ((EnvironmentHandler)tempB.getBean()).setEnvironment(m_env);
354      }
355    }
356   
357    int numFlows = 1;
358
359    if (m_log != null) {
360      if (m_startSequentially) {
361        m_log.logMessage("[FlowRunner] launching flow start points sequentially...");
362      } else {
363        m_log.logMessage("[FlowRunner] launching flow start points in parallel...");
364      }
365    }
366    TreeMap<Integer, Startable> startables = new TreeMap<Integer, Startable>();
367    // look for a Startable bean...
368    for (int i = 0; i < m_beans.size(); i++) {
369      BeanInstance tempB = (BeanInstance)m_beans.elementAt(i);
370      if (tempB.getBean() instanceof Startable) {
371        Startable s = (Startable)tempB.getBean();
372        // start that sucker (if it's happy to be started)...
373        if (!m_startSequentially) {
374          if (s.getStartMessage().charAt(0) != '$') {
375            if (m_log != null) {
376              m_log.logMessage("[FlowRunner] Launching flow "+numFlows+"...");
377            } else {
378              System.out.println("[FlowRunner] Launching flow "+numFlows+"...");
379            }
380            launchThread(s, numFlows);
381            numFlows++;
382          } else {
383            String beanName = s.getClass().getName();
384            if (s instanceof BeanCommon) {
385              String customName = ((BeanCommon)s).getCustomName();
386              beanName = customName;
387            }
388            if (m_log != null) {
389              m_log.logMessage("[FlowRunner] WARNING: Can't start " + beanName + " at this time.");
390            } else {
391              System.out.println("[FlowRunner] WARNING: Can't start " + beanName + " at this time.");
392            }
393          }
394        } else {
395          boolean ok = false;
396          Integer position = null;
397          String beanName = s.getClass().getName();
398          if (s instanceof BeanCommon) {
399            String customName = ((BeanCommon)s).getCustomName();
400            beanName = customName;
401            // see if we have a parseable integer at the start of the name
402            if (customName.indexOf(':') > 0) {
403              String startPos = customName.substring(0, customName.indexOf(':'));
404              try {
405                position = new Integer(startPos);
406                ok = true;
407              } catch (NumberFormatException n) {
408              }
409            }           
410          }
411         
412          if (!ok) {
413            if (startables.size() == 0) {
414              position = new Integer(0);
415            } else {
416              int newPos = startables.lastKey().intValue();
417              newPos++;
418              position = new Integer(newPos);
419            }
420          }
421         
422          if (s.getStartMessage().charAt(0) != '$') {
423            if (m_log != null) {
424              m_log.logMessage("[FlowRunner] adding start point " + beanName
425                  + " to the execution list (position " + position + ")");
426            } else {
427              System.out.println("[FlowRunner] adding start point " + beanName
428                  + " to the execution list (position " + position + ")");
429            }
430            startables.put(position, s);
431          } else {
432            if (m_log != null) {
433              m_log.logMessage("[FlowRunner] WARNING: Can't start " + beanName + " at this time.");
434            } else {
435              System.out.println("[FlowRunner] WARNING: Can't start " + beanName + " at this time.");
436            }
437          }
438        }
439      }
440    }
441   
442    if (m_startSequentially) {
443      runSequentially(startables);
444    }
445  }
446
447  /**
448   * Main method for testing this class. <p>
449   * <br>Usage:<br><br>
450   * <pre>Usage:\n\nFlowRunner <serialized kf file></pre>
451   *
452   * @param args command line arguments
453   */
454  public static void main(String[] args) {
455    weka.core.logging.Logger.log(weka.core.logging.Logger.Level.INFO, "Logging started");
456    if (args.length < 1) {
457      System.err.println("Usage:\n\nFlowRunner <serialized kf file> [-s]\n\n" 
458          + "\tUse -s to launch start points sequentially (default launches "
459          + "in parallel).");
460    } else {
461      try {
462        FlowRunner fr = new FlowRunner();
463        FlowRunner.SimpleLogger sl = new FlowRunner.SimpleLogger();
464        String fileName = args[0];
465       
466        if (args.length == 2 && args[1].equals("-s")) {
467          fr.setStartSequentially(true);
468        }
469       
470        // start with the system-wide vars
471        Environment env = Environment.getSystemWide();
472
473        fr.setLog(sl);
474        fr.setEnvironment(env);
475       
476        fr.load(fileName);
477        fr.run();
478        fr.waitUntilFinished();
479        System.out.println("Finished all flows.");
480        System.exit(1);
481      } catch (Exception ex) {
482        ex.printStackTrace();
483        System.err.println(ex.getMessage());
484      }
485    }                         
486  }
487
488  public String getRevision() {
489    return "$Revision: 5928 $";
490  }
491}
Note: See TracBrowser for help on using the repository browser.