Author: lahiru
Date: Tue May 7 16:24:14 2013
New Revision: 1479970
URL: http://svn.apache.org/r1479970
Log:
minor fix for parallel node execution in workflow engine.
Modified:
airavata/trunk/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/interpretor/WorkflowInterpreter.java
Modified: airavata/trunk/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/interpretor/WorkflowInterpreter.java
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/interpretor/WorkflowInterpreter.java?rev=1479970&r1=1479969&r2=1479970&view=diff
==============================================================================
--- airavata/trunk/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/interpretor/WorkflowInterpreter.java
(original)
+++ airavata/trunk/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/interpretor/WorkflowInterpreter.java
Tue May 7 16:24:14 2013
@@ -78,6 +78,7 @@ import javax.xml.xpath.XPath;
import javax.xml.xpath.XPathConstants;
import javax.xml.xpath.XPathExpressionException;
import javax.xml.xpath.XPathFactory;
+import java.awt.image.VolatileImage;
import java.net.URISyntaxException;
import java.net.URL;
import java.util.*;
@@ -101,6 +102,7 @@ public class WorkflowInterpreter {
private WorkflowInterpreterInteractor interactor;
+
public static ThreadLocal<WorkflowInterpreterConfiguration> workflowInterpreterConfigurationThreadLocal
=
new ThreadLocal<WorkflowInterpreterConfiguration>();
@@ -163,30 +165,21 @@ public class WorkflowInterpreter {
this.config.getNotifier().workflowStarted(values, keywords);
this.config.getConfiguration().setContextHeader(WorkflowContextHeaderBuilder.getCurrentContextHeader());
- int lastReadNodeSize = -1;
while (this.getWorkflow().getExecutionState() != WorkflowExecutionState.STOPPED) {
- if (getRemainNodesDynamically() == 0) {
- notifyViaInteractor(WorkflowExecutionMessage.EXECUTION_STATE_CHANGED, WorkflowExecutionState.PAUSED);
- }
- // ok we have paused sleep
- while (this.getWorkflow().getExecutionState() == WorkflowExecutionState.PAUSED) {
- try {
- Thread.sleep(400);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- // get task list and execute them
- ArrayList<Node> readyNodes = this.getReadyNodesDynamically();
- while(lastReadNodeSize != 0 && lastReadNodeSize == this.getReadyNodesDynamically().size()
&& !(readyNodes.get(0) instanceof OutputNode) ){
+ ArrayList<Node> readyNodes = this.getReadyNodesDynamically();
+ ArrayList<Thread> threadList = new ArrayList<Thread>();
+ if (getRemainNodesDynamically() == 0) {
+ notifyViaInteractor(WorkflowExecutionMessage.EXECUTION_STATE_CHANGED,
WorkflowExecutionState.PAUSED);
+ }
+ // ok we have paused sleep
+ while (this.getWorkflow().getExecutionState() == WorkflowExecutionState.PAUSED)
{
try {
- System.out.println(lastReadNodeSize);
- Thread.sleep(400);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
+ Thread.sleep(400);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
}
- lastReadNodeSize = readyNodes.size();
+ // get task list and execute them
for (final Node node : readyNodes) {
if (node.isBreak()) {
this.notifyPause();
@@ -198,23 +191,33 @@ public class WorkflowInterpreter {
}
// Since this is an independent node execution we can run these nodes
in separate threads.
-// Thread th = new Thread() {
-//
-// public synchronized void run() {
-// try {
+ Thread th = new Thread() {
+ public synchronized void run() {
+ try {
executeDynamically(node);
-// } catch (WorkflowException e) {
-// log.error("Error execution workflow Node : " + node.getID());
-// return;
-// }
-// }
-// };
-// th.start();
+ } catch (WorkflowException e) {
+ log.error("Error execution workflow Node : " + node.getID());
+ return;
+ }
+ }
+ };
+ threadList.add(th);
+ th.start();
if (this.getWorkflow().getExecutionState() == WorkflowExecutionState.STEP) {
this.getWorkflow().setExecutionState(WorkflowExecutionState.PAUSED);
break;
}
}
+ //This thread waits until parallel nodes get finished to send the outputs
dynamically.
+ for(Thread th:threadList){
+ try {
+ th.join();
+ } catch (InterruptedException e) {
+ e.printStackTrace(); //To change body of catch statement use File
| Settings | File Templates.
+ }
+ }
+ // Above statement set the nodeCount back to 0.
+
// TODO commented this for foreach, fix this.
sendOutputsDynamically();
// Dry run sleep a lil bit to release load
@@ -543,9 +546,8 @@ public class WorkflowInterpreter {
} else if (component instanceof TerminateInstanceComponent) {
handleAmazonTerminateInstance(node);
} else {
- throw new WorkFlowInterpreterException("Encountered Node that cannot be executed:" + node);
+ throw new WorkFlowInterpreterException("Encountered Node that cannot be executed:"
+ node);
}
-
}
private void handleAmazonTerminateInstance(final Node node)
|