servicemix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From lh...@apache.org
Subject svn commit: r1030007 - in /servicemix/components: bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/endpoints/ bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/multiplexing/ bindings/servicemix-jms/trunk/src/mai...
Date Tue, 02 Nov 2010 11:47:42 GMT
Author: lhein
Date: Tue Nov  2 11:47:41 2010
New Revision: 1030007

URL: http://svn.apache.org/viewvc?rev=1030007&view=rev
Log:
now different executors are used for consumer and provider endpoints (see SM-2007)

Modified:
    servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/endpoints/JmsJcaConsumerEndpoint.java
    servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/multiplexing/MultiplexingConsumerProcessor.java
    servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/multiplexing/MultiplexingProviderProcessor.java
    servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/standard/StandardConsumerProcessor.java
    servicemix/components/engines/servicemix-eip/trunk/src/main/java/org/apache/servicemix/eip/patterns/Resequencer.java
    servicemix/components/engines/servicemix-wsn2005/trunk/src/main/java/org/apache/servicemix/wsn/component/WSNComponent.java
    servicemix/components/shared-libraries/trunk/pom.xml
    servicemix/components/shared-libraries/trunk/servicemix-common/src/main/java/org/apache/servicemix/common/AsyncBaseLifeCycle.java
    servicemix/components/shared-libraries/trunk/servicemix-common/src/main/java/org/apache/servicemix/common/BaseComponent.java
    servicemix/components/shared-libraries/trunk/servicemix-common/src/main/java/org/apache/servicemix/common/ServiceMixComponent.java
    servicemix/components/shared-libraries/trunk/servicemix-common/src/main/java/org/apache/servicemix/common/endpoints/PollingEndpoint.java

Modified: servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/endpoints/JmsJcaConsumerEndpoint.java
URL: http://svn.apache.org/viewvc/servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/endpoints/JmsJcaConsumerEndpoint.java?rev=1030007&r1=1030006&r2=1030007&view=diff
==============================================================================
--- servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/endpoints/JmsJcaConsumerEndpoint.java
(original)
+++ servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/endpoints/JmsJcaConsumerEndpoint.java
Tue Nov  2 11:47:41 2010
@@ -16,26 +16,22 @@
  */
 package org.apache.servicemix.jms.endpoints;
 
-import java.util.Timer;
+import org.apache.servicemix.executors.Executor;
+import org.apache.servicemix.executors.WorkManagerWrapper;
+import org.apache.servicemix.jms.JmsEndpointType;
+import org.jencks.SingletonEndpointFactory;
+import org.springframework.jms.listener.adapter.ListenerExecutionFailedException;
 
 import javax.jbi.management.DeploymentException;
+import javax.jbi.messaging.MessageExchange;
 import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageListener;
-import javax.resource.spi.ActivationSpec;
-import javax.resource.spi.BootstrapContext;
-import javax.resource.spi.ResourceAdapter;
-import javax.resource.spi.UnavailableException;
-import javax.resource.spi.XATerminator;
+import javax.resource.spi.*;
 import javax.resource.spi.endpoint.MessageEndpointFactory;
 import javax.resource.spi.work.WorkManager;
 import javax.transaction.TransactionManager;
-
-import org.apache.servicemix.executors.Executor;
-import org.apache.servicemix.executors.WorkManagerWrapper;
-import org.apache.servicemix.jms.JmsEndpointType;
-import org.jencks.SingletonEndpointFactory;
-import org.springframework.jms.listener.adapter.ListenerExecutionFailedException;
+import java.util.Timer;
 
 /**
  *  A Spring-based JMS consumer that uses JCA to connect to the JMS provider
@@ -107,7 +103,7 @@ public class JmsJcaConsumerEndpoint exte
 
     public synchronized void start() throws Exception {
         if (bootstrapContext == null) {
-            Executor executor = getServiceUnit().getComponent().getExecutor();
+            Executor executor = getServiceUnit().getComponent().getExecutor(MessageExchange.Role.CONSUMER);
             WorkManager wm = new WorkManagerWrapper(executor); 
             bootstrapContext = new SimpleBootstrapContext(wm);
         }

Modified: servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/multiplexing/MultiplexingConsumerProcessor.java
URL: http://svn.apache.org/viewvc/servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/multiplexing/MultiplexingConsumerProcessor.java?rev=1030007&r1=1030006&r2=1030007&view=diff
==============================================================================
--- servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/multiplexing/MultiplexingConsumerProcessor.java
(original)
+++ servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/multiplexing/MultiplexingConsumerProcessor.java
Tue Nov  2 11:47:41 2010
@@ -16,23 +16,18 @@
  */
 package org.apache.servicemix.jms.multiplexing;
 
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
+import org.apache.servicemix.jms.AbstractJmsProcessor;
+import org.apache.servicemix.jms.JmsEndpoint;
+import org.apache.servicemix.soap.Context;
 
 import javax.jbi.messaging.ExchangeStatus;
 import javax.jbi.messaging.InOnly;
 import javax.jbi.messaging.MessageExchange;
-import javax.jms.Destination;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
+import javax.jms.*;
 import javax.naming.InitialContext;
-
-import org.apache.servicemix.jms.AbstractJmsProcessor;
-import org.apache.servicemix.jms.JmsEndpoint;
-import org.apache.servicemix.soap.Context;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.lang.IllegalStateException;
 
 public class MultiplexingConsumerProcessor extends AbstractJmsProcessor implements MessageListener
{
 
@@ -85,7 +80,7 @@ public class MultiplexingConsumerProcess
         if (log.isDebugEnabled()) {
             log.debug("Received jms message " + message);
         }
-        endpoint.getServiceUnit().getComponent().getExecutor().execute(new Runnable() {
+        endpoint.getServiceUnit().getComponent().getExecutor(MessageExchange.Role.CONSUMER).execute(new
Runnable() {
             public void run() {
                 try {
                     if (log.isDebugEnabled()) {

Modified: servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/multiplexing/MultiplexingProviderProcessor.java
URL: http://svn.apache.org/viewvc/servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/multiplexing/MultiplexingProviderProcessor.java?rev=1030007&r1=1030006&r2=1030007&view=diff
==============================================================================
--- servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/multiplexing/MultiplexingProviderProcessor.java
(original)
+++ servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/multiplexing/MultiplexingProviderProcessor.java
Tue Nov  2 11:47:41 2010
@@ -16,28 +16,16 @@
  */
 package org.apache.servicemix.jms.multiplexing;
 
-import java.io.IOException;
-
-import javax.jbi.messaging.ExchangeStatus;
-import javax.jbi.messaging.Fault;
-import javax.jbi.messaging.InOnly;
-import javax.jbi.messaging.InOut;
-import javax.jbi.messaging.MessageExchange;
-import javax.jbi.messaging.MessagingException;
-import javax.jbi.messaging.NormalizedMessage;
-import javax.jbi.messaging.RobustInOnly;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.naming.InitialContext;
-
 import org.apache.servicemix.jms.AbstractJmsProcessor;
 import org.apache.servicemix.jms.JmsEndpoint;
 import org.apache.servicemix.soap.SoapFault;
 import org.apache.servicemix.soap.marshalers.SoapMessage;
 
+import javax.jbi.messaging.*;
+import javax.jms.*;
+import javax.naming.InitialContext;
+import java.lang.IllegalStateException;
+
 public class MultiplexingProviderProcessor extends AbstractJmsProcessor implements MessageListener
{
 
     
@@ -77,7 +65,7 @@ public class MultiplexingProviderProcess
         if (log.isDebugEnabled()) {
             log.debug("Received jms message " + message);
         }
-        endpoint.getServiceUnit().getComponent().getExecutor().execute(new Runnable() {
+        endpoint.getServiceUnit().getComponent().getExecutor(MessageExchange.Role.PROVIDER).execute(new
Runnable() {
             public void run() {
                 InOut exchange = null;
                 if (log.isDebugEnabled()) {

Modified: servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/standard/StandardConsumerProcessor.java
URL: http://svn.apache.org/viewvc/servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/standard/StandardConsumerProcessor.java?rev=1030007&r1=1030006&r2=1030007&view=diff
==============================================================================
--- servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/standard/StandardConsumerProcessor.java
(original)
+++ servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/standard/StandardConsumerProcessor.java
Tue Nov  2 11:47:41 2010
@@ -16,20 +16,17 @@
  */
 package org.apache.servicemix.jms.standard;
 
-import java.util.concurrent.atomic.AtomicBoolean;
-import javax.jbi.messaging.ExchangeStatus;
-import javax.jbi.messaging.MessageExchange;
-import javax.jms.Destination;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.naming.InitialContext;
-
 import org.apache.servicemix.jms.AbstractJmsProcessor;
 import org.apache.servicemix.jms.JmsEndpoint;
 import org.apache.servicemix.soap.Context;
 
+import javax.jbi.messaging.ExchangeStatus;
+import javax.jbi.messaging.MessageExchange;
+import javax.jms.*;
+import javax.naming.InitialContext;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.lang.IllegalStateException;
+
 public class StandardConsumerProcessor extends AbstractJmsProcessor {
 
     protected Session session;
@@ -53,7 +50,7 @@ public class StandardConsumerProcessor e
 
     protected void doStart() throws Exception {
         synchronized (running) {
-            endpoint.getServiceUnit().getComponent().getExecutor().execute(new Runnable()
{
+            endpoint.getServiceUnit().getComponent().getExecutor(MessageExchange.Role.CONSUMER).execute(new
Runnable() {
                 public void run() {
                     StandardConsumerProcessor.this.poll();
                 }

Modified: servicemix/components/engines/servicemix-eip/trunk/src/main/java/org/apache/servicemix/eip/patterns/Resequencer.java
URL: http://svn.apache.org/viewvc/servicemix/components/engines/servicemix-eip/trunk/src/main/java/org/apache/servicemix/eip/patterns/Resequencer.java?rev=1030007&r1=1030006&r2=1030007&view=diff
==============================================================================
--- servicemix/components/engines/servicemix-eip/trunk/src/main/java/org/apache/servicemix/eip/patterns/Resequencer.java
(original)
+++ servicemix/components/engines/servicemix-eip/trunk/src/main/java/org/apache/servicemix/eip/patterns/Resequencer.java
Tue Nov  2 11:47:41 2010
@@ -16,22 +16,16 @@
  */
 package org.apache.servicemix.eip.patterns;
 
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
+import org.apache.servicemix.eip.support.resequence.*;
+import org.apache.servicemix.executors.Executor;
 
 import javax.jbi.messaging.ExchangeStatus;
 import javax.jbi.messaging.MessageExchange;
 import javax.jbi.messaging.MessagingException;
 import javax.jbi.messaging.NormalizedMessage;
-
-import org.apache.servicemix.eip.support.resequence.DefaultComparator;
-import org.apache.servicemix.eip.support.resequence.ResequencerBase;
-import org.apache.servicemix.eip.support.resequence.ResequencerEngine;
-import org.apache.servicemix.eip.support.resequence.SequenceElementComparator;
-import org.apache.servicemix.eip.support.resequence.SequenceReader;
-import org.apache.servicemix.eip.support.resequence.SequenceSender;
-import org.apache.servicemix.executors.Executor;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
 
 /**
  * This pattern implements the <a href="http://www.enterpriseintegrationpatterns.com/Resequencer.html">Resequencer</a>
EIP
@@ -93,7 +87,7 @@ public class Resequencer extends Reseque
     public void start() throws Exception {
         super.start();
         if (executor == null) {
-            executor = getServiceUnit().getComponent().getExecutor();
+            executor = getServiceUnit().getComponent().getExecutor(MessageExchange.Role.CONSUMER);
         }
         BlockingQueue<MessageExchange> queue = new LinkedBlockingQueue<MessageExchange>();
         reseq = new ResequencerEngine<MessageExchange>(comparator, capacity);

Modified: servicemix/components/engines/servicemix-wsn2005/trunk/src/main/java/org/apache/servicemix/wsn/component/WSNComponent.java
URL: http://svn.apache.org/viewvc/servicemix/components/engines/servicemix-wsn2005/trunk/src/main/java/org/apache/servicemix/wsn/component/WSNComponent.java?rev=1030007&r1=1030006&r2=1030007&view=diff
==============================================================================
--- servicemix/components/engines/servicemix-wsn2005/trunk/src/main/java/org/apache/servicemix/wsn/component/WSNComponent.java
(original)
+++ servicemix/components/engines/servicemix-wsn2005/trunk/src/main/java/org/apache/servicemix/wsn/component/WSNComponent.java
Tue Nov  2 11:47:41 2010
@@ -16,13 +16,18 @@
  */
 package org.apache.servicemix.wsn.component;
 
-import java.net.URL;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
+import com.ibm.wsdl.Constants;
+import org.apache.servicemix.common.*;
+import org.apache.servicemix.common.tools.wsdl.WSDLFlattener;
+import org.apache.servicemix.wsn.EndpointManager;
+import org.apache.servicemix.wsn.EndpointRegistrationException;
+import org.apache.servicemix.wsn.jbi.JbiNotificationBroker;
+import org.apache.servicemix.wsn.jms.JmsCreatePullPoint;
+import org.springframework.core.io.Resource;
+import org.w3c.dom.Document;
+import org.xml.sax.InputSource;
 
+import javax.jbi.messaging.MessageExchange;
 import javax.jbi.servicedesc.ServiceEndpoint;
 import javax.jms.ConnectionFactory;
 import javax.naming.Context;
@@ -30,28 +35,11 @@ import javax.naming.InitialContext;
 import javax.naming.NamingException;
 import javax.wsdl.Definition;
 import javax.wsdl.factory.WSDLFactory;
-import javax.wsdl.xml.WSDLReader;
 import javax.wsdl.xml.WSDLLocator;
+import javax.wsdl.xml.WSDLReader;
 import javax.xml.namespace.QName;
-
-import org.w3c.dom.Document;
-
-import com.ibm.wsdl.Constants;
-
-import org.apache.servicemix.common.BaseServiceUnitManager;
-import org.apache.servicemix.common.DefaultComponent;
-import org.apache.servicemix.common.Deployer;
-import org.apache.servicemix.common.Endpoint;
-import org.apache.servicemix.common.EndpointSupport;
-import org.apache.servicemix.common.ServiceUnit;
-import org.apache.servicemix.common.DefaultServiceUnit;
-import org.apache.servicemix.common.tools.wsdl.WSDLFlattener;
-import org.apache.servicemix.wsn.EndpointManager;
-import org.apache.servicemix.wsn.EndpointRegistrationException;
-import org.apache.servicemix.wsn.jbi.JbiNotificationBroker;
-import org.apache.servicemix.wsn.jms.JmsCreatePullPoint;
-import org.springframework.core.io.Resource;
-import org.xml.sax.InputSource;
+import java.net.URL;
+import java.util.*;
 
 public class WSNComponent extends DefaultComponent {
 
@@ -288,7 +276,7 @@ public class WSNComponent extends Defaul
         }
 
         public void unregister(final Object endpoint) throws EndpointRegistrationException
{
-            WSNComponent.this.getExecutor().execute(new Runnable() {
+            WSNComponent.this.getExecutor(MessageExchange.Role.CONSUMER).execute(new Runnable()
{
                 public void run() {
                     try {
                         Endpoint ep = (Endpoint) endpoint;

Modified: servicemix/components/shared-libraries/trunk/pom.xml
URL: http://svn.apache.org/viewvc/servicemix/components/shared-libraries/trunk/pom.xml?rev=1030007&r1=1030006&r2=1030007&view=diff
==============================================================================
--- servicemix/components/shared-libraries/trunk/pom.xml (original)
+++ servicemix/components/shared-libraries/trunk/pom.xml Tue Nov  2 11:47:41 2010
@@ -41,7 +41,7 @@
   </scm>
 
   <properties>
-    <servicemix-version>3.3</servicemix-version>
+    <servicemix-version>3.3.3-SNAPSHOT</servicemix-version>
     <activemq-version>5.4.1</activemq-version>
     <servicemix.osgi.export>${servicemix.osgi.export.pkg}*;version=${project.version}</servicemix.osgi.export>
     <servicemix.osgi.export.pkg />

Modified: servicemix/components/shared-libraries/trunk/servicemix-common/src/main/java/org/apache/servicemix/common/AsyncBaseLifeCycle.java
URL: http://svn.apache.org/viewvc/servicemix/components/shared-libraries/trunk/servicemix-common/src/main/java/org/apache/servicemix/common/AsyncBaseLifeCycle.java?rev=1030007&r1=1030006&r2=1030007&view=diff
==============================================================================
--- servicemix/components/shared-libraries/trunk/servicemix-common/src/main/java/org/apache/servicemix/common/AsyncBaseLifeCycle.java
(original)
+++ servicemix/components/shared-libraries/trunk/servicemix-common/src/main/java/org/apache/servicemix/common/AsyncBaseLifeCycle.java
Tue Nov  2 11:47:41 2010
@@ -43,6 +43,7 @@ import org.apache.commons.logging.Log;
 import org.apache.servicemix.executors.Executor;
 import org.apache.servicemix.executors.ExecutorFactory;
 import org.apache.servicemix.executors.impl.ExecutorFactoryImpl;
+import org.apache.servicemix.jbi.container.JBIContainer;
 
 /**
  * Base class for life cycle management of components. This class may be used as
@@ -66,7 +67,9 @@ public class AsyncBaseLifeCycle implemen
 
     protected ExecutorFactory executorFactory;
 
-    protected Executor executor;
+    protected Executor consumerExecutor;
+
+    protected Executor providerExecutor;
 
     protected AtomicBoolean running;
 
@@ -241,7 +244,8 @@ public class AsyncBaseLifeCycle implemen
         if (this.executorFactory == null) {
             this.executorFactory = createExecutorFactory();
         }
-        this.executor = this.executorFactory.createExecutor("component." + getContext().getComponentName());
+        this.consumerExecutor = this.executorFactory.createExecutor("component." + getContext().getComponentName()
+ ".consumer");
+        this.providerExecutor = this.executorFactory.createExecutor("component." + getContext().getComponentName()
+ ".provider");
     }
 
     /*
@@ -281,8 +285,10 @@ public class AsyncBaseLifeCycle implemen
             }
         }
         // Destroy excutor
-        executor.shutdown();
-        executor = null;
+        consumerExecutor.shutdown();
+        providerExecutor.shutdown();
+        consumerExecutor = null;
+        providerExecutor = null;
     }
 
     /*
@@ -312,9 +318,20 @@ public class AsyncBaseLifeCycle implemen
     }
 
     protected void doStart() throws Exception {
+        boolean doPoll = false;
+
         if (container.getType() != Container.Type.ServiceMix3) {
+            doPoll = true;
+        } else {
+            Object smx3container = ((Container.Smx3Container)container).getSmx3Container();
+            if (smx3container instanceof JBIContainer) {
+                doPoll = ((JBIContainer)smx3container).isOptimizedDelivery() == false;
+            }
+        }
+
+        if (doPoll) {
             synchronized (this.polling) {
-                executor.execute(new Runnable() {
+                consumerExecutor.execute(new Runnable() {
                     public void run() {
                         poller = Thread.currentThread();
                         pollDeliveryChannel();
@@ -330,10 +347,12 @@ public class AsyncBaseLifeCycle implemen
             polling.set(true);
             polling.notify();
         }
+        Executor executor = null;
         while (running.get()) {
             try {
                 final MessageExchange exchange = channel.accept(1000L);
                 if (exchange != null) {
+                    executor = exchange.getRole().equals(Role.CONSUMER) ? consumerExecutor
: providerExecutor;
                     final Transaction tx = (Transaction) exchange
                             .getProperty(MessageExchange.JTA_TRANSACTION_PROPERTY_NAME);
                     if (tx != null && container.handleTransactions()) {
@@ -413,8 +432,13 @@ public class AsyncBaseLifeCycle implemen
         return context;
     }
 
-    public Executor getExecutor() {
-        return executor;
+    public Executor getExecutor(Role role) {
+        if (role != null && role.equals(Role.CONSUMER)) {
+            return this.consumerExecutor;
+        } else if (role != null && role.equals(Role.PROVIDER)) {
+            return this.providerExecutor;
+        }
+        return null;
     }
 
     /**
@@ -424,8 +448,12 @@ public class AsyncBaseLifeCycle implemen
      * @param executor
      * @see #setExecutorFactory(ExecutorFactory)
      */
-    public void setExecutor(Executor executor) {
-        this.executor = executor;
+    public void setExecutor(Role role, Executor executor) {
+        if (role != null && role.equals(Role.CONSUMER)) {
+            this.consumerExecutor = executor;
+        } else {
+            this.providerExecutor = executor;
+        }
     }
 
     public ExecutorFactory getExecutorFactory() {
@@ -438,7 +466,7 @@ public class AsyncBaseLifeCycle implemen
      * is deployed into ServiceMix 3.x, or a default implementation will be used.
      *
      * @param executorFactory
-     * @see #setExecutor(Executor)
+     * @see #setExecutor(Role, Executor)
      */
     public void setExecutorFactory(ExecutorFactory executorFactory) {
         this.executorFactory = executorFactory;
@@ -482,7 +510,7 @@ public class AsyncBaseLifeCycle implemen
                     }
                 }
                 if (oldStatus == ExchangeStatus.ACTIVE) {
-                	exchange.setStatus(ExchangeStatus.ERROR);
+                	exchange.setStatus(ExchangeStatus.ERROR);
                     exchange.setError(t instanceof Exception ? (Exception) t : new Exception(t));
                     channel.send(exchange);
                 }
@@ -523,6 +551,7 @@ public class AsyncBaseLifeCycle implemen
             processExchangeInTx(exchange, tx);
             return;
         }
+
         ExchangeStatus oldStatus = exchange.getStatus();
         try {
             processExchange(exchange);

Modified: servicemix/components/shared-libraries/trunk/servicemix-common/src/main/java/org/apache/servicemix/common/BaseComponent.java
URL: http://svn.apache.org/viewvc/servicemix/components/shared-libraries/trunk/servicemix-common/src/main/java/org/apache/servicemix/common/BaseComponent.java?rev=1030007&r1=1030006&r2=1030007&view=diff
==============================================================================
--- servicemix/components/shared-libraries/trunk/servicemix-common/src/main/java/org/apache/servicemix/common/BaseComponent.java
(original)
+++ servicemix/components/shared-libraries/trunk/servicemix-common/src/main/java/org/apache/servicemix/common/BaseComponent.java
Tue Nov  2 11:47:41 2010
@@ -187,8 +187,8 @@ public abstract class BaseComponent impl
      * 
      * @return the executor for this component
      */
-    public Executor getExecutor() {
-        return lifeCycle.getExecutor();
+    public Executor getExecutor(Role role) {
+        return lifeCycle.getExecutor(role);
     }
 
     public Object getSmx3Container() {

Modified: servicemix/components/shared-libraries/trunk/servicemix-common/src/main/java/org/apache/servicemix/common/ServiceMixComponent.java
URL: http://svn.apache.org/viewvc/servicemix/components/shared-libraries/trunk/servicemix-common/src/main/java/org/apache/servicemix/common/ServiceMixComponent.java?rev=1030007&r1=1030006&r2=1030007&view=diff
==============================================================================
--- servicemix/components/shared-libraries/trunk/servicemix-common/src/main/java/org/apache/servicemix/common/ServiceMixComponent.java
(original)
+++ servicemix/components/shared-libraries/trunk/servicemix-common/src/main/java/org/apache/servicemix/common/ServiceMixComponent.java
Tue Nov  2 11:47:41 2010
@@ -44,9 +44,10 @@ public interface ServiceMixComponent ext
     public Registry getRegistry();
 
     /**
+     * @param role  the role to use
      * @return Returns the executor for this component
      */
-    public Executor getExecutor();
+    public Executor getExecutor(MessageExchange.Role role);
 
     /**
      * @return Returns the components context

Modified: servicemix/components/shared-libraries/trunk/servicemix-common/src/main/java/org/apache/servicemix/common/endpoints/PollingEndpoint.java
URL: http://svn.apache.org/viewvc/servicemix/components/shared-libraries/trunk/servicemix-common/src/main/java/org/apache/servicemix/common/endpoints/PollingEndpoint.java?rev=1030007&r1=1030006&r2=1030007&view=diff
==============================================================================
--- servicemix/components/shared-libraries/trunk/servicemix-common/src/main/java/org/apache/servicemix/common/endpoints/PollingEndpoint.java
(original)
+++ servicemix/components/shared-libraries/trunk/servicemix-common/src/main/java/org/apache/servicemix/common/endpoints/PollingEndpoint.java
Tue Nov  2 11:47:41 2010
@@ -18,8 +18,8 @@ package org.apache.servicemix.common.end
 
 import java.util.Date;
 import java.util.concurrent.atomic.AtomicBoolean;
-
 import javax.jbi.JBIException;
+import javax.jbi.messaging.MessageExchange;
 import javax.jbi.servicedesc.ServiceEndpoint;
 import javax.xml.namespace.QName;
 
@@ -154,7 +154,7 @@ public abstract class PollingEndpoint ex
             }
 
             if (executor == null) {
-                executor = getServiceUnit().getComponent().getExecutor();
+                executor = getServiceUnit().getComponent().getExecutor(MessageExchange.Role.CONSUMER);
             }
             if (schedulerTask != null) {
                 schedulerTask.cancel();



Mime
View raw message