airavata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From lah...@apache.org
Subject svn commit: r1479970 - /airavata/trunk/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/interpretor/WorkflowInterpreter.java
Date Tue, 07 May 2013 16:24:15 GMT
Author: lahiru
Date: Tue May  7 16:24:14 2013
New Revision: 1479970

URL: http://svn.apache.org/r1479970
Log:
minor fix for parallel node execution in workflow engine.

Modified:
    airavata/trunk/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/interpretor/WorkflowInterpreter.java

Modified: airavata/trunk/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/interpretor/WorkflowInterpreter.java
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/interpretor/WorkflowInterpreter.java?rev=1479970&r1=1479969&r2=1479970&view=diff
==============================================================================
--- airavata/trunk/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/interpretor/WorkflowInterpreter.java (original)
+++ airavata/trunk/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/interpretor/WorkflowInterpreter.java Tue May  7 16:24:14 2013
@@ -78,6 +78,7 @@ import javax.xml.xpath.XPath;
 import javax.xml.xpath.XPathConstants;
 import javax.xml.xpath.XPathExpressionException;
 import javax.xml.xpath.XPathFactory;
+import java.awt.image.VolatileImage;
 import java.net.URISyntaxException;
 import java.net.URL;
 import java.util.*;
@@ -101,6 +102,7 @@ public class WorkflowInterpreter {
 
 	private WorkflowInterpreterInteractor interactor;
 
+
     public static ThreadLocal<WorkflowInterpreterConfiguration> workflowInterpreterConfigurationThreadLocal =
             new ThreadLocal<WorkflowInterpreterConfiguration>();
 
@@ -163,30 +165,21 @@ public class WorkflowInterpreter {
 			this.config.getNotifier().workflowStarted(values, keywords);
 			this.config.getConfiguration().setContextHeader(WorkflowContextHeaderBuilder.getCurrentContextHeader());
 
-            int lastReadNodeSize = -1;
 			while (this.getWorkflow().getExecutionState() != WorkflowExecutionState.STOPPED) {
-				if (getRemainNodesDynamically() == 0) {
-					notifyViaInteractor(WorkflowExecutionMessage.EXECUTION_STATE_CHANGED, WorkflowExecutionState.PAUSED);
-				}
-				// ok we have paused sleep
-				while (this.getWorkflow().getExecutionState() == WorkflowExecutionState.PAUSED) {
-					try {
-						Thread.sleep(400);
-					} catch (InterruptedException e) {
-						e.printStackTrace();
-					}
-				}
-				// get task list and execute them
-				ArrayList<Node> readyNodes = this.getReadyNodesDynamically();
-                while(lastReadNodeSize != 0  && lastReadNodeSize == this.getReadyNodesDynamically().size() && !(readyNodes.get(0) instanceof OutputNode) ){
+                ArrayList<Node> readyNodes = this.getReadyNodesDynamically();
+                ArrayList<Thread> threadList = new ArrayList<Thread>();
+                if (getRemainNodesDynamically() == 0) {
+                    notifyViaInteractor(WorkflowExecutionMessage.EXECUTION_STATE_CHANGED, WorkflowExecutionState.PAUSED);
+                }
+                // ok we have paused sleep
+                while (this.getWorkflow().getExecutionState() == WorkflowExecutionState.PAUSED) {
                     try {
-                        System.out.println(lastReadNodeSize);
-						Thread.sleep(400);
-					} catch (InterruptedException e) {
-						e.printStackTrace();
-					}
+                        Thread.sleep(400);
+                    } catch (InterruptedException e) {
+                        e.printStackTrace();
+                    }
                 }
-                lastReadNodeSize = readyNodes.size();
+                // get task list and execute them
 				for (final Node node : readyNodes) {
 					if (node.isBreak()) {
 						this.notifyPause();
@@ -198,23 +191,33 @@ public class WorkflowInterpreter {
 					}
 
                     // Since this is an independent node execution we can run these nodes in separate threads.
-//                    Thread th = new Thread() {
-//
-//                        public synchronized void run() {
-//                            try {
+                    Thread th = new Thread() {
+                        public synchronized void run() {
+                            try {
                                 executeDynamically(node);
-//                            } catch (WorkflowException e) {
-//                                log.error("Error execution workflow Node : " + node.getID());
-//                                return;
-//                            }
-//                        }
-//                    };
-//                    th.start();
+                            } catch (WorkflowException e) {
+                                log.error("Error execution workflow Node : " + node.getID());
+                                return;
+                            }
+                        }
+                    };
+                    threadList.add(th);
+                    th.start();
 					if (this.getWorkflow().getExecutionState() == WorkflowExecutionState.STEP) {
 						this.getWorkflow().setExecutionState(WorkflowExecutionState.PAUSED);
 						break;
 					}
 				}
+                //This thread waits until parallel nodes get finished to send the outputs dynamically.
+               for(Thread th:threadList){
+                   try {
+                       th.join();
+                   } catch (InterruptedException e) {
+                       e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
+                   }
+               }
+                // Above statement set the nodeCount back to 0.
+
 				// TODO commented this for foreach, fix this.
 				sendOutputsDynamically();
 				// Dry run sleep a lil bit to release load
@@ -543,9 +546,8 @@ public class WorkflowInterpreter {
 		} else if (component instanceof TerminateInstanceComponent) {
 			handleAmazonTerminateInstance(node);
 		} else {
-			throw new WorkFlowInterpreterException("Encountered Node that cannot be executed:" + node);
+            throw new WorkFlowInterpreterException("Encountered Node that cannot be executed:" + node);
 		}
-
 	}
 
 	private void handleAmazonTerminateInstance(final Node node)



Mime
View raw message