servicemix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jstrac...@apache.org
Subject svn commit: r436701 - in /incubator/servicemix/trunk/servicemix-beanflow/src: main/java/org/apache/servicemix/beanflow/ test/java/org/apache/servicemix/beanflow/
Date Fri, 25 Aug 2006 07:49:17 GMT
Author: jstrachan
Date: Fri Aug 25 00:49:16 2006
New Revision: 436701

URL: http://svn.apache.org/viewvc?rev=436701&view=rev
Log:
simplified the workflow code to make it alittle easier to follow and reduce the possibility
of timing issues together with fixing of a few timing related issues in the join support &
activity classes to ensure we don't start things twice or set things to be started after they've
stopped

Modified:
    incubator/servicemix/trunk/servicemix-beanflow/src/main/java/org/apache/servicemix/beanflow/AbstractActivity.java
    incubator/servicemix/trunk/servicemix-beanflow/src/main/java/org/apache/servicemix/beanflow/JoinSupport.java
    incubator/servicemix/trunk/servicemix-beanflow/src/main/java/org/apache/servicemix/beanflow/Workflow.java
    incubator/servicemix/trunk/servicemix-beanflow/src/test/java/org/apache/servicemix/beanflow/ParallelActivityTest.java

Modified: incubator/servicemix/trunk/servicemix-beanflow/src/main/java/org/apache/servicemix/beanflow/AbstractActivity.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/servicemix-beanflow/src/main/java/org/apache/servicemix/beanflow/AbstractActivity.java?rev=436701&r1=436700&r2=436701&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-beanflow/src/main/java/org/apache/servicemix/beanflow/AbstractActivity.java
(original)
+++ incubator/servicemix/trunk/servicemix-beanflow/src/main/java/org/apache/servicemix/beanflow/AbstractActivity.java
Fri Aug 25 00:49:16 2006
@@ -45,7 +45,7 @@
     public void start() {
         if (state.compareAndSet(Transitions.Initialised, Transitions.Starting)) {
             doStart();
-            state.set(Transitions.Started);
+            state.compareAndSet(Transitions.Starting, Transitions.Started);
         }
     }
 

Modified: incubator/servicemix/trunk/servicemix-beanflow/src/main/java/org/apache/servicemix/beanflow/JoinSupport.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/servicemix-beanflow/src/main/java/org/apache/servicemix/beanflow/JoinSupport.java?rev=436701&r1=436700&r2=436701&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-beanflow/src/main/java/org/apache/servicemix/beanflow/JoinSupport.java
(original)
+++ incubator/servicemix/trunk/servicemix-beanflow/src/main/java/org/apache/servicemix/beanflow/JoinSupport.java
Fri Aug 25 00:49:16 2006
@@ -17,7 +17,9 @@
 package org.apache.servicemix.beanflow;
 
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 
 /**
  * A useful base class for a activity which joins on the success of a collection
@@ -28,6 +30,7 @@
 public abstract class JoinSupport extends TimeoutActivity {
 
     private List<Activity> children = new ArrayList<Activity>();
+    private Set<Activity> toBeStarted = new HashSet();
 
     public JoinSupport() {
     }
@@ -37,6 +40,7 @@
             for (Activity activity : activities) {
                 activity.getState().addRunnable(this);
                 children.add(activity);
+                toBeStarted.add(activity);
             }
         }
     }
@@ -46,6 +50,7 @@
             for (Activity activity : activities) {
                 activity.getState().addRunnable(this);
                 children.add(activity);
+                toBeStarted.add(activity);
             }
         }
     }
@@ -91,9 +96,10 @@
 
         // lets make sure that the child activities are started properly
         synchronized (children) {
-            for (Activity child : children) {
+            for (Activity child : toBeStarted) {
                 child.start();
             }
+            toBeStarted.clear();
         }
     }
 

Modified: incubator/servicemix/trunk/servicemix-beanflow/src/main/java/org/apache/servicemix/beanflow/Workflow.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/servicemix-beanflow/src/main/java/org/apache/servicemix/beanflow/Workflow.java?rev=436701&r1=436700&r2=436701&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-beanflow/src/main/java/org/apache/servicemix/beanflow/Workflow.java
(original)
+++ incubator/servicemix/trunk/servicemix-beanflow/src/main/java/org/apache/servicemix/beanflow/Workflow.java
Fri Aug 25 00:49:16 2006
@@ -23,8 +23,10 @@
 import org.apache.servicemix.beanflow.support.ReflectionInterpreter;
 
 import java.util.Timer;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Executor;
 import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
@@ -38,10 +40,9 @@
 
     private Executor executor;
     private Interpreter interpreter;
-    private State<T> step;
-    private T nextStep;
     private Timer timer = new Timer();
     private AtomicBoolean suspended = new AtomicBoolean();
+    private BlockingQueue<T> queue = new LinkedBlockingQueue<T>();
 
     /**
      * TODO is there a way to reference the parameter type of this class?
@@ -57,65 +58,53 @@
     public Workflow(T firstStep) {
         this(Executors.newSingleThreadExecutor(), firstStep);
     }
-
+    
     public Workflow(Executor executor, T firstStep) {
-        this(executor, new ReflectionInterpreter(), new DefaultState<T>(firstStep));
+        this(executor, new ReflectionInterpreter(), firstStep);
     }
-
-    public Workflow(Executor executor, Interpreter interpreter, State<T> step) {
+    
+    public Workflow(Executor executor, Interpreter interpreter, T firstStep) {
         this.executor = executor;
         this.interpreter = interpreter;
-        this.step = step;
-
-        T firstStep = step.get();
+        
         if (firstStep instanceof Enum) {
             validateStepsExist(firstStep.getClass());
         }
+        setNextStep(firstStep);
     }
 
     /**
      * Returns the next step which will be executed asynchronously
      */
     public T getNextStep() {
-        return nextStep;
+        return queue.peek();
     }
 
     /**
      * Sets the next step to be executed when the current step completes
      */
     public void setNextStep(T stepName) {
-        this.nextStep = stepName;
-        suspended.set(false);
-        nextStep();
+        queue.add(stepName);
+        executor.execute(this);
     }
 
     public void run() {
-        if (!isSuspended() && !isStopped()) {
-            T stepToExecute = step.get();
-            if (stepToExecute != null) {
-                if (log.isDebugEnabled()) {
-                    log.debug("About to execute step: " + stepToExecute);
+        while (!isStopped()) {
+            try {
+                T stepToExecute = queue.poll();
+                if (stepToExecute != null) {
+                    if (log.isDebugEnabled()) {
+                        log.debug("About to execute step: " + stepToExecute);
+                    }
+                    interpreter.executeStep(stepToExecute, this);
+                }
+                else { 
+                    break;
                 }
-                
-                interpreter.executeStep(stepToExecute, this);
-                nextStep();
             }
-        }
-    }
-
-    public void nextStep() {
-        if (nextStep != null) {
-            T stepToExecute = nextStep;
-            nextStep = null;
-            // lets fire any conditions
-            // This very function is a listener of step, so setting the step
-            // will trigger ourself.  We just need to return now.
-            step.set(stepToExecute);
-        }
-
-        // if we are not stopped lets add a task to re-evaluate ourself
-        if (!isStopped() && !isSuspended()) {
-            executor.execute(this);
+            catch (RuntimeException e) {
+                log.warn("Caught: " + e, e);
+            }
         }
     }
 
@@ -180,7 +169,7 @@
      * Returns true if this workflow has a next step to execute
      */
     public boolean isNextStepAvailable() {
-        return nextStep != null;
+        return !queue.isEmpty();
     }
 
     /**
@@ -188,10 +177,8 @@
      */
     public Runnable createGoToStepTask(final T joinedStep) {
         return new Runnable() {
-
             public void run() {
                 setNextStep(joinedStep);
-                //nextStep();
             }
         };
     }

Modified: incubator/servicemix/trunk/servicemix-beanflow/src/test/java/org/apache/servicemix/beanflow/ParallelActivityTest.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/servicemix-beanflow/src/test/java/org/apache/servicemix/beanflow/ParallelActivityTest.java?rev=436701&r1=436700&r2=436701&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-beanflow/src/test/java/org/apache/servicemix/beanflow/ParallelActivityTest.java
(original)
+++ incubator/servicemix/trunk/servicemix-beanflow/src/test/java/org/apache/servicemix/beanflow/ParallelActivityTest.java
Fri Aug 25 00:49:16 2006
@@ -36,7 +36,7 @@
         // START SNIPPET: example
         ExampleParallelBean parallelBean = new ExampleParallelBean();
         ParallelActivity activity = ParallelActivity.newParallelMethodActivity(executor,
parallelBean);
-        activity.startWithTimeout(timer, 20000);
+        activity.startWithTimeout(timer, 2000);
         // END SNIPPET: example
 
         activity.join(10, TimeUnit.SECONDS);



Mime
View raw message