servicemix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gno...@apache.org
Subject svn commit: r680712 - in /servicemix/components/engines/servicemix-osworkflow/trunk: ./ src/main/java/org/apache/servicemix/osworkflow/ src/main/resources/ src/main/resources/META-INF/ src/main/resources/META-INF/spring/
Date Tue, 29 Jul 2008 14:32:56 GMT
Author: gnodet
Date: Tue Jul 29 07:32:55 2008
New Revision: 680712

URL: http://svn.apache.org/viewvc?rev=680712&view=rev
Log:
SM-1484, SM-1387
  * endpoint should inherit ProviderEndpoint
  * When an endpoint is stopped, the thread pool for all endpoints is stopped
  * Use servicemix thread pool service
  * Make the component OSGi friendly
  

Added:
    servicemix/components/engines/servicemix-osworkflow/trunk/src/main/resources/
    servicemix/components/engines/servicemix-osworkflow/trunk/src/main/resources/META-INF/
    servicemix/components/engines/servicemix-osworkflow/trunk/src/main/resources/META-INF/spring/
    servicemix/components/engines/servicemix-osworkflow/trunk/src/main/resources/META-INF/spring/servicemix-osworkflow.xml
Removed:
    servicemix/components/engines/servicemix-osworkflow/trunk/src/main/java/org/apache/servicemix/osworkflow/OSWorkflowBootstrap.java
    servicemix/components/engines/servicemix-osworkflow/trunk/src/main/java/org/apache/servicemix/osworkflow/WorkflowManager.java
Modified:
    servicemix/components/engines/servicemix-osworkflow/trunk/pom.xml
    servicemix/components/engines/servicemix-osworkflow/trunk/src/main/java/org/apache/servicemix/osworkflow/OSWorkflow.java
    servicemix/components/engines/servicemix-osworkflow/trunk/src/main/java/org/apache/servicemix/osworkflow/OSWorkflowEndpoint.java

Modified: servicemix/components/engines/servicemix-osworkflow/trunk/pom.xml
URL: http://svn.apache.org/viewvc/servicemix/components/engines/servicemix-osworkflow/trunk/pom.xml?rev=680712&r1=680711&r2=680712&view=diff
==============================================================================
--- servicemix/components/engines/servicemix-osworkflow/trunk/pom.xml (original)
+++ servicemix/components/engines/servicemix-osworkflow/trunk/pom.xml Tue Jul 29 07:32:55 2008
@@ -42,6 +42,19 @@
     <previous.releases>3.1.2,3.2,3.2.1</previous.releases>
     <servicemix-version>3.2.1</servicemix-version>
     <servicemix-shared-version>4.0-SNAPSHOT</servicemix-shared-version>
+
+    <servicemix.osgi.import>
+      org.apache.servicemix.common,
+      org.apache.servicemix.common.osgi,
+      org.apache.servicemix.executors.impl,
+      org.apache.xbean.spring.context.v2,
+      org.springframework.beans.factory.xml,
+      *
+    </servicemix.osgi.import>
+    <servicemix.osgi.export>
+      org.apache.servicemix.osworkflow*;version=${project.version},
+      META-INF.services.org.apache.xbean.spring.http.servicemix.apache.org.osworkflow
+    </servicemix.osgi.export>
   </properties>
 
   <dependencies>
@@ -104,7 +117,6 @@
         <extensions>true</extensions>
         <configuration>
           <type>service-engine</type>
-          <bootstrap>org.apache.servicemix.osworkflow.OSWorkflowBootstrap</bootstrap>
           <component>org.apache.servicemix.osworkflow.OSWorkflowComponent</component>
         </configuration>
       </plugin>

Modified: servicemix/components/engines/servicemix-osworkflow/trunk/src/main/java/org/apache/servicemix/osworkflow/OSWorkflow.java
URL: http://svn.apache.org/viewvc/servicemix/components/engines/servicemix-osworkflow/trunk/src/main/java/org/apache/servicemix/osworkflow/OSWorkflow.java?rev=680712&r1=680711&r2=680712&view=diff
==============================================================================
--- servicemix/components/engines/servicemix-osworkflow/trunk/src/main/java/org/apache/servicemix/osworkflow/OSWorkflow.java (original)
+++ servicemix/components/engines/servicemix-osworkflow/trunk/src/main/java/org/apache/servicemix/osworkflow/OSWorkflow.java Tue Jul 29 07:32:55 2008
@@ -36,7 +36,7 @@
 /**
  * @author lhe
  */
-public class OSWorkflow extends Thread {
+public class OSWorkflow implements Runnable {
     
 
     public static final String KEY_EXCHANGE = "exchange";
@@ -89,8 +89,6 @@
      */
     public OSWorkflow(OSWorkflowEndpoint ep, String workflowName, int action,
             Map map, String caller, MessageExchange exchange) {
-        super(workflowName);
-        setDaemon(true);
 
         this.endpoint = ep; // remember the endpoint which called the osworkflow
         this.osWorkflowName = workflowName;
@@ -105,10 +103,7 @@
         this.map.put(KEY_CALLER, this.caller);
         this.map.put(KEY_IN_MESSAGE, this.exchange.getMessage("in"));
         this.map.put(KEY_EXCHANGE, this.exchange);
-        this.map
-                .put(
-                        KEY_ASYNC_PROCESSING,
-                        this.exchange instanceof InOnly || this.exchange instanceof RobustInOnly);
+        this.map.put(KEY_ASYNC_PROCESSING, this.exchange instanceof InOnly || this.exchange instanceof RobustInOnly);
     }
 
     /**
@@ -121,26 +116,26 @@
         this.osWorkflowInstance = new BasicWorkflow(this.caller);
         DefaultConfiguration config = new DefaultConfiguration();
         this.osWorkflowInstance.setConfiguration(config);
-        long wfId = this.osWorkflowInstance.initialize(
-                this.osWorkflowName, this.action, this.map);
+        long wfId = this.osWorkflowInstance.initialize(this.osWorkflowName, this.action, this.map);
         return wfId;
     }
 
     /*
      * (non-Javadoc)
      * 
-     * @see java.lang.Thread#run()
+     * @see java.lang.Runnable#run()
      */
-    @Override
     public void run() {
         // call the endpoint method for init actions
         this.endpoint.preWorkflow();
 
-        log.debug("Starting workflow...");
-        log.debug("Name:       " + this.osWorkflowName);
-        log.debug("Action:     " + this.action);
-        log.debug("Caller:     " + this.caller);
-        log.debug("Map:        " + this.map);
+        if (log.isDebugEnabled()) {
+            log.debug("Starting workflow...");
+            log.debug("Name:       " + this.osWorkflowName);
+            log.debug("Action:     " + this.action);
+            log.debug("Caller:     " + this.caller);
+            log.debug("Map:        " + this.map);
+        }
 
         // loop as long as there are more actions to do and the workflow is not
         // finished or aborted
@@ -157,8 +152,7 @@
             }
 
             // determine the available actions
-            int[] availableActions = this.osWorkflowInstance
-                    .getAvailableActions(this.workflowId, this.map);
+            int[] availableActions = this.osWorkflowInstance.getAvailableActions(this.workflowId, this.map);
 
             // check if there are more actions available
             if (availableActions.length == 0) {
@@ -172,8 +166,7 @@
                 log.debug("call action " + nextAction);
                 try {
                     // call the action
-                    this.osWorkflowInstance.doAction(this.workflowId,
-                            nextAction, this.map);
+                    this.osWorkflowInstance.doAction(this.workflowId,nextAction, this.map);
                 } catch (InvalidInputException iiex) {
                     log.error(iiex);
                     aborted = true;
@@ -184,13 +177,15 @@
             }
         }
 
-        log.debug("Stopping workflow...");
-        log.debug("Name:       " + this.osWorkflowName);
-        log.debug("Action:     " + this.action);
-        log.debug("Caller:     " + this.caller);
-        log.debug("Map:        " + this.map);
-        log.debug("WorkflowId: " + this.workflowId);
-        log.debug("End state:  " + (finished ? "Finished" : "Aborted"));
+        if (log.isDebugEnabled()) {
+            log.debug("Stopping workflow...");
+            log.debug("Name:       " + this.osWorkflowName);
+            log.debug("Action:     " + this.action);
+            log.debug("Caller:     " + this.caller);
+            log.debug("Map:        " + this.map);
+            log.debug("WorkflowId: " + this.workflowId);
+            log.debug("End state:  " + (finished ? "Finished" : "Aborted"));
+        }
 
         // call the endpoint method for cleanup actions or message exchange
         this.endpoint.postWorkflow();

Modified: servicemix/components/engines/servicemix-osworkflow/trunk/src/main/java/org/apache/servicemix/osworkflow/OSWorkflowEndpoint.java
URL: http://svn.apache.org/viewvc/servicemix/components/engines/servicemix-osworkflow/trunk/src/main/java/org/apache/servicemix/osworkflow/OSWorkflowEndpoint.java?rev=680712&r1=680711&r2=680712&view=diff
==============================================================================
--- servicemix/components/engines/servicemix-osworkflow/trunk/src/main/java/org/apache/servicemix/osworkflow/OSWorkflowEndpoint.java (original)
+++ servicemix/components/engines/servicemix-osworkflow/trunk/src/main/java/org/apache/servicemix/osworkflow/OSWorkflowEndpoint.java Tue Jul 29 07:32:55 2008
@@ -18,39 +18,30 @@
 
 import java.util.HashMap;
 
-import javax.jbi.component.ComponentContext;
-import javax.jbi.management.DeploymentException;
-import javax.jbi.messaging.DeliveryChannel;
 import javax.jbi.messaging.ExchangeStatus;
 import javax.jbi.messaging.InOnly;
 import javax.jbi.messaging.InOut;
 import javax.jbi.messaging.MessageExchange;
 import javax.jbi.messaging.MessageExchange.Role;
-import javax.jbi.messaging.MessageExchangeFactory;
 import javax.jbi.messaging.MessagingException;
 import javax.jbi.messaging.NormalizedMessage;
 import javax.jbi.messaging.RobustInOnly;
-import javax.jbi.servicedesc.ServiceEndpoint;
 import javax.xml.namespace.QName;
 import javax.xml.transform.Source;
 
-import org.apache.servicemix.common.Endpoint;
-import org.apache.servicemix.common.ExchangeProcessor;
+import org.apache.servicemix.common.endpoints.ProviderEndpoint;
+import org.apache.servicemix.common.EndpointSupport;
 import org.apache.servicemix.jbi.jaxp.SourceTransformer;
+import org.apache.servicemix.executors.Executor;
 
 /**
  * @org.apache.xbean.XBean element="endpoint"
  * 
  * @author lhe
  */
-public class OSWorkflowEndpoint extends Endpoint implements ExchangeProcessor {
-    private static final long TIME_OUT = 30000;
-
-    private ServiceEndpoint activated;
+public class OSWorkflowEndpoint extends ProviderEndpoint {
 
-    private DeliveryChannel channel;
-
-    private MessageExchangeFactory exchangeFactory;
+    private static final long TIME_OUT = 30000;
 
     private String workflowName;
 
@@ -58,79 +49,19 @@
 
     private int action;
 
-    /*
-     * (non-Javadoc)
-     * 
-     * @see org.apache.servicemix.common.Endpoint#getRole()
-     */
-    public Role getRole() {
-        return Role.PROVIDER;
-    }
-
-    /*
-     * (non-Javadoc)
-     * 
-     * @see org.apache.servicemix.common.Endpoint#activate()
-     */
-    public void activate() throws Exception {
-        logger = this.serviceUnit.getComponent().getLogger();
-        ComponentContext ctx = getServiceUnit().getComponent()
-                .getComponentContext();
-        channel = ctx.getDeliveryChannel();
-        exchangeFactory = channel.createExchangeFactory();
-        activated = ctx.activateEndpoint(service, endpoint);
-        start();
-    }
-
-    /*
-     * (non-Javadoc)
-     * 
-     * @see org.apache.servicemix.common.Endpoint#deactivate()
-     */
-    public void deactivate() throws Exception {
-        stop();
-        ServiceEndpoint ep = activated;
-        activated = null;
-        ComponentContext ctx = getServiceUnit().getComponent()
-                .getComponentContext();
-        ctx.deactivateEndpoint(ep);
-    }
+    private Executor executor;
 
-    /*
-     * (non-Javadoc)
-     * 
-     * @see org.apache.servicemix.common.Endpoint#getProcessor()
-     */
-    public ExchangeProcessor getProcessor() {
-        return this;
-    }
+    private SourceTransformer sourceTransformer = new SourceTransformer();
 
-    /*
-     * (non-Javadoc)
-     * 
-     * @see org.apache.servicemix.common.Endpoint#validate()
-     */
-    public void validate() throws DeploymentException {
-    }
-
-    /*
-     * (non-Javadoc)
-     * 
-     * @see org.apache.servicemix.common.ExchangeProcessor#start()
-     */
     public void start() throws Exception {
-        // initialize the workflow manager
-        WorkflowManager.getInstance();
+        super.start();
+        OSWorkflowComponent component = (OSWorkflowComponent) getServiceUnit().getComponent();
+        executor = component.getExecutorFactory().createExecutor("component." + component.getComponentName() + "." + EndpointSupport.getKey(this));
     }
 
-    /*
-     * (non-Javadoc)
-     * 
-     * @see org.apache.servicemix.common.ExchangeProcessor#stop()
-     */
-    public void stop() {
-        // shut down first finishing running threads
-        WorkflowManager.getInstance().prepareShutdown(true);
+    public void stop() throws Exception {
+        executor.shutdown();
+        super.stop();
     }
 
     /*
@@ -162,8 +93,7 @@
             onProviderExchange(exchange);
         } else {
             // Unknown role
-            throw new MessagingException(
-                    "OSWorkflowEndpoint.onMessageExchange(): Unknown role: "
+            throw new MessagingException("OSWorkflowEndpoint.onMessageExchange(): Unknown role: "
                             + exchange.getRole());
         }
     }
@@ -178,12 +108,10 @@
         throws MessagingException {
         // Out message
         if (exchange.getMessage("out") != null) {
-            exchange.setStatus(ExchangeStatus.DONE);
-            channel.send(exchange);
+            done(exchange);
         } else if (exchange.getFault() != null) {
             //Fault message
-            exchange.setStatus(ExchangeStatus.DONE);
-            channel.send(exchange);
+            done(exchange);
         } else {
             //This is not compliant with the default MEPs
             throw new MessagingException(
@@ -208,8 +136,7 @@
             return;
         } else if (exchange.getFault() != null) {
             //Fault message
-            exchange.setStatus(ExchangeStatus.DONE);
-            channel.send(exchange);
+            done(exchange);
         } else {
             NormalizedMessage in = exchange.getMessage("in");
 
@@ -222,49 +149,12 @@
                 OSWorkflow osWorkflow = new OSWorkflow(this, this.workflowName,
                         this.action, new HashMap(), this.caller, exchange);
 
-                if (exchange instanceof InOnly
-                        || exchange instanceof RobustInOnly) {
-                    // do start the workflow in separate thread
-                    try {
-                        WorkflowManager.getInstance().executeWorkflow(
-                                osWorkflow);
-                    } catch (Exception ex) {
-                        logger.error(ex);
-                    }
-                } else {
-                    // synchronous processing, keep state ACTIVE
-                    // do start the workflow and join the thread
-                    try {
-                        osWorkflow.start();
-                        osWorkflow.join();
-                    } catch (Exception ex) {
-                        logger.error(ex);
-                    }
-                }
+                executor.execute(osWorkflow);
             }
         }
     }
 
     /**
-     * returns the delivery channel for the endpoint
-     * 
-     * @return the delivery channel
-     */
-    public DeliveryChannel getChannel() {
-
-        return this.channel;
-    }
-
-    /**
-     * returns the message exchange factory
-     * 
-     * @return the message exchange factory
-     */
-    public MessageExchangeFactory getMessageExchangeFactory() {
-        return this.exchangeFactory;
-    }
-
-    /**
      * sends the given DOMSource as message to the given service (inOnly)
      * 
      * @param service
@@ -277,12 +167,11 @@
      */
     public boolean sendMessage(QName service, Source source)
         throws MessagingException {
-        InOnly inOnly = channel.createExchangeFactoryForService(service)
-                .createInOnlyExchange();
+        InOnly inOnly = getChannel().createExchangeFactoryForService(service).createInOnlyExchange();
         NormalizedMessage msg = inOnly.createMessage();
         msg.setContent(source);
         inOnly.setInMessage(msg);
-        if (channel.sendSync(inOnly)) {
+        if (getChannel().sendSync(inOnly)) {
             return inOnly.getStatus() == ExchangeStatus.DONE;
         } else {
             return false;
@@ -302,22 +191,15 @@
      */
     public Source sendRequest(QName service, Source source)
         throws MessagingException {
-        InOut inOut = channel.createExchangeFactoryForService(service)
-                .createInOutExchange();
+        InOut inOut = getChannel().createExchangeFactoryForService(service).createInOutExchange();
         NormalizedMessage msg = inOut.createMessage();
         msg.setContent(source);
         inOut.setInMessage(msg);
 
-        if (channel.sendSync(inOut)) {
-            SourceTransformer sourceTransformer = new SourceTransformer();
-
+        if (getChannel().sendSync(inOut)) {
             try {
-                Source result = sourceTransformer.toDOMSource(inOut
-                        .getOutMessage().getContent());
-
-                inOut.setStatus(ExchangeStatus.DONE);
-                channel.send(inOut);
-
+                Source result = sourceTransformer.toDOMSource(inOut.getOutMessage().getContent());
+                done(inOut);
                 return result;
             } catch (Exception ex) {
                 ex.printStackTrace();
@@ -341,12 +223,11 @@
      */
     public MessageExchange sendRawInOutRequest(QName service, Source source)
         throws MessagingException {
-        InOut inOut = channel.createExchangeFactoryForService(service)
-                .createInOutExchange();
+        InOut inOut = getChannel().createExchangeFactoryForService(service).createInOutExchange();
         NormalizedMessage msg = inOut.createMessage();
         msg.setContent(source);
         inOut.setInMessage(msg);
-        if (channel.sendSync(inOut)) {
+        if (getChannel().sendSync(inOut)) {
             return inOut;
         } else {
             return null;
@@ -368,11 +249,9 @@
         MessageExchange exchange = null;
 
         if (inOut) {
-            exchange = channel.createExchangeFactoryForService(qname)
-                    .createInOutExchange();
+            exchange = getChannel().createExchangeFactoryForService(qname).createInOutExchange();
         } else {
-            exchange = channel.createExchangeFactoryForService(qname)
-                    .createInOnlyExchange();
+            exchange = getChannel().createExchangeFactoryForService(qname).createInOnlyExchange();
         }
 
         return exchange;
@@ -380,18 +259,17 @@
 
     /**
      * sends a done to the channel
-     * 
+     *
      * @param ex
      * @throws MessagingException
      */
     public void done(MessageExchange ex) throws MessagingException {
-        ex.setStatus(ExchangeStatus.DONE);
-        channel.send(ex);
+        super.done(ex);
     }
 
     /**
      * sends a msg to the channel
-     * 
+     *
      * @param ex
      * @param sync
      * @throws MessagingException
@@ -399,21 +277,20 @@
     public void send(MessageExchange ex, boolean sync)
         throws MessagingException {
         if (sync) {
-            channel.sendSync(ex, TIME_OUT);
+            getChannel().sendSync(ex, TIME_OUT);
         } else {
-            channel.send(ex);
+            getChannel().send(ex);
         }
     }
 
     /**
      * sends a error to the channel
-     * 
+     *
      * @param ex
      * @throws MessagingException
      */
     public void fail(MessageExchange ex) throws MessagingException {
-        ex.setStatus(ExchangeStatus.ERROR);
-        channel.send(ex);
+        super.fail(ex, new Exception("Failure"));
     }
 
     /**
@@ -474,4 +351,5 @@
     public void postWorkflow() {
         // nothing for now
     }
+
 }

Added: servicemix/components/engines/servicemix-osworkflow/trunk/src/main/resources/META-INF/spring/servicemix-osworkflow.xml
URL: http://svn.apache.org/viewvc/servicemix/components/engines/servicemix-osworkflow/trunk/src/main/resources/META-INF/spring/servicemix-osworkflow.xml?rev=680712&view=auto
==============================================================================
--- servicemix/components/engines/servicemix-osworkflow/trunk/src/main/resources/META-INF/spring/servicemix-osworkflow.xml (added)
+++ servicemix/components/engines/servicemix-osworkflow/trunk/src/main/resources/META-INF/spring/servicemix-osworkflow.xml Tue Jul 29 07:32:55 2008
@@ -0,0 +1,74 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+
+-->
+<beans xmlns="http://www.springframework.org/schema/beans"
+       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xmlns:osgi="http://www.springframework.org/schema/osgi"
+       xmlns:osgix="http://www.springframework.org/schema/osgi-compendium"
+       xmlns:util="http://www.springframework.org/schema/util"
+       xsi:schemaLocation="
+  http://www.springframework.org/schema/beans
+  http://www.springframework.org/schema/beans/spring-beans.xsd
+  http://www.springframework.org/schema/util
+  http://www.springframework.org/schema/util/spring-util.xsd
+  http://www.springframework.org/schema/osgi
+  http://www.springframework.org/schema/osgi/spring-osgi.xsd
+  http://www.springframework.org/schema/osgi-compendium
+  http://www.springframework.org/schema/osgi-compendium/spring-osgi-compendium.xsd">
+
+    <bean id="servicemix-osworkflow" class="org.apache.servicemix.osworkflow.OSWorkflowComponent">
+        <property name="executorFactory" ref="executorFactory" />
+    </bean>
+
+    <bean id="executorFactory" class="org.apache.servicemix.executors.impl.ExecutorFactoryImpl">
+        <property name="defaultConfig">
+            <bean class="org.apache.servicemix.executors.impl.ExecutorConfig">
+                <property name="corePoolSize" value="${threadPoolCorePoolSize}"/>
+                <property name="maximumPoolSize" value="${threadPoolMaximumPoolSize}"/>
+                <property name="queueSize" value="${threadPoolQueueSize}"/>
+            </bean>
+        </property>
+    </bean>
+
+    <bean id="endpoint-tracker" class="org.apache.servicemix.common.osgi.EndpointTracker">
+        <property name="component" ref="servicemix-osworkflow" />
+    </bean>
+
+    <osgi:list id="endpoints"
+               interface="org.apache.servicemix.common.osgi.EndpointWrapper"
+               cardinality="0..N">
+        <osgi:listener ref="endpoint-tracker" bind-method="register" unbind-method="unregister" />
+    </osgi:list>
+
+    <osgi:service ref="servicemix-osworkflow" interface="javax.jbi.component.Component">
+        <osgi:service-properties>
+            <entry key="NAME" value="servicemix-osworkflow" />
+            <entry key="TYPE" value="service-engine" />
+        </osgi:service-properties>
+    </osgi:service>
+
+    <osgix:property-placeholder persistent-id="servicemix-osworkflow">
+        <osgix:default-properties>
+            <prop key="threadPoolCorePoolSize">8</prop>
+            <prop key="threadPoolMaximumPoolSize">32</prop>
+            <prop key="threadPoolQueueSize">256</prop>
+        </osgix:default-properties>
+    </osgix:property-placeholder>
+
+</beans>



Mime
View raw message