servicemix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jstrac...@apache.org
Subject svn commit: r405803 - in /incubator/servicemix/trunk/servicemix-beanflow/src: main/java/org/apache/servicemix/beanflow/ test/java/org/apache/servicemix/beanflow/
Date Wed, 10 May 2006 17:31:59 GMT
Author: jstrachan
Date: Wed May 10 10:31:47 2006
New Revision: 405803

URL: http://svn.apache.org/viewcvs?rev=405803&view=rev
Log:
fixed up the lifecycle a little and added support for JoinAll and JoinQuorum flows

Added:
    incubator/servicemix/trunk/servicemix-beanflow/src/main/java/org/apache/servicemix/beanflow/JoinAll.java
  (with props)
    incubator/servicemix/trunk/servicemix-beanflow/src/main/java/org/apache/servicemix/beanflow/JoinQuorum.java
  (with props)
    incubator/servicemix/trunk/servicemix-beanflow/src/test/java/org/apache/servicemix/beanflow/FlowTestSupport.java
  (with props)
    incubator/servicemix/trunk/servicemix-beanflow/src/test/java/org/apache/servicemix/beanflow/JoinAllTest.java
  (with props)
Modified:
    incubator/servicemix/trunk/servicemix-beanflow/src/main/java/org/apache/servicemix/beanflow/Flow.java
    incubator/servicemix/trunk/servicemix-beanflow/src/main/java/org/apache/servicemix/beanflow/FlowSupport.java
    incubator/servicemix/trunk/servicemix-beanflow/src/main/java/org/apache/servicemix/beanflow/TimeoutFlow.java
    incubator/servicemix/trunk/servicemix-beanflow/src/test/java/org/apache/servicemix/beanflow/TimeoutFlowTest.java

Modified: incubator/servicemix/trunk/servicemix-beanflow/src/main/java/org/apache/servicemix/beanflow/Flow.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-beanflow/src/main/java/org/apache/servicemix/beanflow/Flow.java?rev=405803&r1=405802&r2=405803&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-beanflow/src/main/java/org/apache/servicemix/beanflow/Flow.java
(original)
+++ incubator/servicemix/trunk/servicemix-beanflow/src/main/java/org/apache/servicemix/beanflow/Flow.java
Wed May 10 10:31:47 2006
@@ -16,6 +16,8 @@
 
 package org.apache.servicemix.beanflow;
 
+import java.util.Timer;
+
 /**
  * Represents a flow instance which is a bean based workflow written using Java code.
  * A flow monitors various {@link State} objects and takes action when things change.
@@ -44,6 +46,12 @@
     public void start();
 
     /**
+     * For flows that support timeout based operation this helper method
+     * starts the flow and registers the timeout 
+     */
+    public void startWithTimeout(Timer timer, long timeout);
+    
+    /**
      * Stops the flow
      */
     public void stop();
@@ -63,4 +71,10 @@
      * Returns true if the flow has failed to complete succesfully
      */
     public boolean isFailed();
+    
+    /**
+     * If this flow has failed then return a reason for the failure
+     */
+    public String getFailedReason();
+
 }

Modified: incubator/servicemix/trunk/servicemix-beanflow/src/main/java/org/apache/servicemix/beanflow/FlowSupport.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-beanflow/src/main/java/org/apache/servicemix/beanflow/FlowSupport.java?rev=405803&r1=405802&r2=405803&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-beanflow/src/main/java/org/apache/servicemix/beanflow/FlowSupport.java
(original)
+++ incubator/servicemix/trunk/servicemix-beanflow/src/main/java/org/apache/servicemix/beanflow/FlowSupport.java
Wed May 10 10:31:47 2006
@@ -31,7 +31,7 @@
 public abstract class FlowSupport implements Runnable, Flow {
 
     private State<Transitions> state = new DefaultState<Transitions>(Transitions.Initialised);
-    private State<Boolean> failed = new DefaultState<Boolean>(Boolean.FALSE);
+    private State<String> failed = new DefaultState<String>();
     private Introspector introspector = new FieldIntrospector();
 
     /**
@@ -49,17 +49,17 @@
      */
     public void stop() {
         if (state.compareAndSet(Transitions.Started, Transitions.Stopping)) {
-            doStop();
             state.set(Transitions.Stopped);
+            doStop();
         }
     }
 
     /**
-     * Stops the flow with a failed state
+     * Stops the flow with a failed state, giving the reason for the failure
      */
-    public void fail() {
+    public void fail(String reason) {
         stop();
-        failed.set(true);
+        failed.set(reason);
     }
     
     /**
@@ -74,6 +74,10 @@
     }
 
     public boolean isFailed() {
+        return getFailedReason() != null;
+    }
+    
+    public String getFailedReason() {
         return failed.get();
     }
     

Added: incubator/servicemix/trunk/servicemix-beanflow/src/main/java/org/apache/servicemix/beanflow/JoinAll.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-beanflow/src/main/java/org/apache/servicemix/beanflow/JoinAll.java?rev=405803&view=auto
==============================================================================
--- incubator/servicemix/trunk/servicemix-beanflow/src/main/java/org/apache/servicemix/beanflow/JoinAll.java
(added)
+++ incubator/servicemix/trunk/servicemix-beanflow/src/main/java/org/apache/servicemix/beanflow/JoinAll.java
Wed May 10 10:31:47 2006
@@ -0,0 +1,108 @@
+/*
+ * Copyright 2005-2006 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.beanflow;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Represents a flow which joins on the success of a number of child flows.
+ * 
+ * @version $Revision: $
+ */
+public class JoinAll extends TimeoutFlow {
+
+    private List<Flow> children = new ArrayList<Flow>();
+
+    public JoinAll() {
+    }
+
+    public JoinAll(List<Flow> flows) {
+        for (Flow flow : flows) {
+            addChildFlow(flow);
+        }
+    }
+
+    public JoinAll(Flow... flows) {
+        for (Flow flow : flows) {
+            addChildFlow(flow);
+        }
+    }
+
+    public void addChildFlow(Flow child) {
+        synchronized (children) {
+            child.getState().addRunnable(this);
+            children.add(child);
+        }
+    }
+
+    public void removeChildFlow(Flow child) {
+        synchronized (children) {
+            child.getState().removeRunnable(this);
+            children.remove(child);
+        }
+    }
+
+    @Override
+    public void run() {
+        if (isStopped()) {
+            return;
+        }
+        int childCount = 0;
+        int stoppedCount = 0;
+        int failedCount = 0;
+        synchronized (children) {
+            childCount = children.size();
+            for (Flow child : children) {
+                if (child.isStopped()) {
+                    stoppedCount++;
+                    if (child.isFailed()) {
+                        failedCount++;
+                    }
+                }
+            }
+        }
+        onChildStateChange(childCount, stoppedCount, failedCount);
+    }
+
+    @Override
+    protected void doStart() {
+        super.doStart();
+        
+        // lets make sure that the child flows are started properly
+        synchronized (children) {
+            for (Flow child : children) {
+                child.start();
+            }
+        }
+    }
+
+    /**
+     * Decide whether or not we are done based on the number of children, the
+     * number of child flows stopped and the number of failed flows
+     */
+    protected void onChildStateChange(int childCount, int stoppedCount, int failedCount)
{
+        if (childCount <= stoppedCount) {
+            if (failedCount > 0) {
+                fail("" + failedCount + " child workflows have failed");
+            }
+            else {
+                stop();
+            }
+        }
+    }
+
+}

Propchange: incubator/servicemix/trunk/servicemix-beanflow/src/main/java/org/apache/servicemix/beanflow/JoinAll.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/servicemix/trunk/servicemix-beanflow/src/main/java/org/apache/servicemix/beanflow/JoinQuorum.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-beanflow/src/main/java/org/apache/servicemix/beanflow/JoinQuorum.java?rev=405803&view=auto
==============================================================================
--- incubator/servicemix/trunk/servicemix-beanflow/src/main/java/org/apache/servicemix/beanflow/JoinQuorum.java
(added)
+++ incubator/servicemix/trunk/servicemix-beanflow/src/main/java/org/apache/servicemix/beanflow/JoinQuorum.java
Wed May 10 10:31:47 2006
@@ -0,0 +1,57 @@
+/*
+ * Copyright 2005-2006 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.beanflow;
+
+import java.util.List;
+
+/**
+ * A flow which completes when quorum of the child flows are completed
+ * successfully (or there are too many child flows failed to achieve quorum).
+ * 
+ * @version $Revision: $
+ */
+public class JoinQuorum extends JoinAll {
+
+    public JoinQuorum() {
+        super();
+
+    }
+
+    public JoinQuorum(Flow... flows) {
+        super(flows);
+    }
+
+    public JoinQuorum(List<Flow> flows) {
+        super(flows);
+    }
+
+    protected void onChildStateChange(int childCount, int stoppedCount, int failedCount)
{
+        int quorum = calculateQuorum(childCount);
+        int successes = stoppedCount - failedCount;
+        if (successes >= quorum) {
+            stop();
+        }
+        else {
+            if (failedCount >= quorum) {
+                fail("Too many child flows failed: " + failedCount);
+            }
+        }
+    }
+
+    protected int calculateQuorum(int childCount) {
+        return (childCount / 2) + 1;
+    }
+}

Propchange: incubator/servicemix/trunk/servicemix-beanflow/src/main/java/org/apache/servicemix/beanflow/JoinQuorum.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/servicemix/trunk/servicemix-beanflow/src/main/java/org/apache/servicemix/beanflow/TimeoutFlow.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-beanflow/src/main/java/org/apache/servicemix/beanflow/TimeoutFlow.java?rev=405803&r1=405802&r2=405803&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-beanflow/src/main/java/org/apache/servicemix/beanflow/TimeoutFlow.java
(original)
+++ incubator/servicemix/trunk/servicemix-beanflow/src/main/java/org/apache/servicemix/beanflow/TimeoutFlow.java
Wed May 10 10:31:47 2006
@@ -15,6 +15,7 @@
  */
 package org.apache.servicemix.beanflow;
 
+import java.util.Timer;
 import java.util.TimerTask;
 
 /**
@@ -41,7 +42,7 @@
     public void run() {
         if (!isStopped()) {
             if (timedOut.get().booleanValue()) {
-                fail();
+                fail("Timed out");
             }
         }
     }
@@ -51,6 +52,12 @@
      */
     public boolean isTimedOut() {
         return timedOut.get();
+    }
+
+
+    public void startWithTimeout(Timer timer, long timeout) {
+        timer.schedule(getTimeoutTask(), timeout);
+        start();
     }
 
     /**

Added: incubator/servicemix/trunk/servicemix-beanflow/src/test/java/org/apache/servicemix/beanflow/FlowTestSupport.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-beanflow/src/test/java/org/apache/servicemix/beanflow/FlowTestSupport.java?rev=405803&view=auto
==============================================================================
--- incubator/servicemix/trunk/servicemix-beanflow/src/test/java/org/apache/servicemix/beanflow/FlowTestSupport.java
(added)
+++ incubator/servicemix/trunk/servicemix-beanflow/src/test/java/org/apache/servicemix/beanflow/FlowTestSupport.java
Wed May 10 10:31:47 2006
@@ -0,0 +1,61 @@
+/*
+ * Copyright 2005-2006 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.beanflow;
+
+import org.apache.servicemix.beanflow.Flow.Transitions;
+
+import java.util.Timer;
+
+import junit.framework.TestCase;
+
+/**
+ * 
+ * @version $Revision: $
+ */
+public class FlowTestSupport extends TestCase {
+
+    protected Timer timer = new Timer();
+
+    protected void assertFlowStopped(Flow flow) {
+        assertEquals("Transition", Transitions.Stopped, flow.getState().get());
+
+        assertTrue("Flow should be stopped but is: " + flow.getState().get(), flow.isStopped());
+        assertTrue("Flow should not have failed", !flow.isFailed());
+    }
+
+    protected void assertFlowFailed(Flow flow) {
+        assertEquals("Transition", Transitions.Stopped, flow.getState().get());
+
+        assertTrue("Flow should be stopped but is: " + flow.getState().get(), flow.isStopped());
+        assertTrue("Flow should have failed", flow.isFailed());
+
+        System.out.println("The flow failed due to: " + flow.getFailedReason());
+    }
+
+    protected void startFlow(Flow flow, long timeout) {
+        assertTrue("flow should not be stopped", !flow.isStopped());
+        assertTrue("flow should not have failed", !flow.isFailed());
+        assertEquals("Transition", Transitions.Initialised, flow.getState().get());
+
+        flow.startWithTimeout(timer, timeout);
+        assertFlowStarted(flow);
+    }
+
+    protected void assertFlowStarted(Flow flow) {
+        assertEquals("Transition", Transitions.Started, flow.getState().get());
+    }
+
+}

Propchange: incubator/servicemix/trunk/servicemix-beanflow/src/test/java/org/apache/servicemix/beanflow/FlowTestSupport.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/servicemix/trunk/servicemix-beanflow/src/test/java/org/apache/servicemix/beanflow/JoinAllTest.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-beanflow/src/test/java/org/apache/servicemix/beanflow/JoinAllTest.java?rev=405803&view=auto
==============================================================================
--- incubator/servicemix/trunk/servicemix-beanflow/src/test/java/org/apache/servicemix/beanflow/JoinAllTest.java
(added)
+++ incubator/servicemix/trunk/servicemix-beanflow/src/test/java/org/apache/servicemix/beanflow/JoinAllTest.java
Wed May 10 10:31:47 2006
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2005-2006 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.beanflow;
+
+/**
+ * 
+ * @version $Revision: $
+ */
+public class JoinAllTest extends FlowTestSupport {
+
+    protected Flow child1 = new TimeoutFlow();
+    protected Flow child2 = new TimeoutFlow();
+    protected Flow child3 = new TimeoutFlow();
+    protected long timeout = 1000L;
+
+    public void testJoinAll() throws Exception {
+        JoinAll flow = new JoinAll(child1, child2, child3);
+        startFlow(flow, timeout);
+        
+        child1.stop();
+        assertFlowStarted(flow);
+        
+        child2.stop();
+        assertFlowStarted(flow);
+        
+        child3.stop();
+        assertFlowStopped(flow);
+    }
+}

Propchange: incubator/servicemix/trunk/servicemix-beanflow/src/test/java/org/apache/servicemix/beanflow/JoinAllTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/servicemix/trunk/servicemix-beanflow/src/test/java/org/apache/servicemix/beanflow/TimeoutFlowTest.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-beanflow/src/test/java/org/apache/servicemix/beanflow/TimeoutFlowTest.java?rev=405803&r1=405802&r2=405803&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-beanflow/src/test/java/org/apache/servicemix/beanflow/TimeoutFlowTest.java
(original)
+++ incubator/servicemix/trunk/servicemix-beanflow/src/test/java/org/apache/servicemix/beanflow/TimeoutFlowTest.java
Wed May 10 10:31:47 2006
@@ -17,54 +17,30 @@
 
 import org.apache.servicemix.beanflow.Flow.Transitions;
 
-import java.util.Timer;
-
-import junit.framework.TestCase;
-
 /**
  * 
  * @version $Revision: $
  */
-public class TimeoutFlowTest extends TestCase {
+public class TimeoutFlowTest extends FlowTestSupport {
     protected TimeoutFlow flow = new TimeoutFlow();
-    protected Timer timer = new Timer();
     protected long timeout = 500;
     
     public void testFlowStopsSuccessfully() throws Exception {
         flow.getState().set(Transitions.Stopped);
-        
-        assertEquals("Transition", Transitions.Stopped, flow.getState().get());
-        
-        assertTrue("Flow should be stopped but is: " + flow.getState().get(), flow.isStopped());
-        assertTrue("Flow should not have failed", !flow.isFailed());
+        assertFlowStopped(flow);
         
         // lets sleep so that the timer can go off now to check we don't fail after we've
stopped
         Thread.sleep(timeout  * 4);
-        
-        assertEquals("Transition", Transitions.Stopped, flow.getState().get());
-        
-        assertTrue("Flow should be stopped but is: " + flow.getState().get(), flow.isStopped());
-        assertTrue("Flow should not have failed", !flow.isFailed());
+        assertFlowStopped(flow);
     }
 
     public void testFlowTimesOutAndFails() throws Exception {
         Thread.sleep(timeout  * 4);
-        
-        assertEquals("Transition", Transitions.Stopped, flow.getState().get());
-        
-        assertTrue("Flow should be stopped but is: " + flow.getState().get(), flow.isStopped());
-        assertTrue("Flow should have failed", flow.isFailed());
-        
+        assertFlowFailed(flow);
     }
 
     protected void setUp() throws Exception {
         timer.schedule(flow.getTimeoutTask(), timeout);
-        
-        assertTrue("flow should not be stopped", !flow.isStopped());
-        assertTrue("flow should not have failed", !flow.isFailed());
-        assertEquals("Transition", Transitions.Initialised, flow.getState().get());
-        
-        flow.start();
-        assertEquals("Transition", Transitions.Started, flow.getState().get());
+        startFlow(flow, timeout);
     }
 }



Mime
View raw message