servicemix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gno...@apache.org
Subject svn commit: r393668 - in /incubator/servicemix/trunk/servicemix-core/src: main/java/org/apache/servicemix/ main/java/org/apache/servicemix/jbi/nmr/flow/jca/ main/java/org/apache/servicemix/jbi/nmr/flow/jms/ test/java/org/apache/servicemix/jbi/nmr/flow/...
Date Thu, 13 Apr 2006 00:25:35 GMT
Author: gnodet
Date: Wed Apr 12 17:25:32 2006
New Revision: 393668

URL: http://svn.apache.org/viewcvs?rev=393668&view=rev
Log:
SM-395: InOut doesn't failover to another node if source node is unavailable

Added:
    incubator/servicemix/trunk/servicemix-core/src/test/java/org/apache/servicemix/jbi/nmr/flow/jms/StatelessJcaFlowTest.java
    incubator/servicemix/trunk/servicemix-core/src/test/java/org/apache/servicemix/jbi/nmr/flow/jms/StatelessJmsFlowTest.java
Modified:
    incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/JbiConstants.java
    incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jca/JCAFlow.java
    incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/JMSFlow.java
    incubator/servicemix/trunk/servicemix-core/src/test/java/org/apache/servicemix/jbi/nmr/flow/jms/PingService.java

Modified: incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/JbiConstants.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/JbiConstants.java?rev=393668&r1=393667&r2=393668&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/JbiConstants.java
(original)
+++ incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/JbiConstants.java
Wed Apr 12 17:25:32 2006
@@ -32,5 +32,9 @@
     String DATESTAMP_PROPERTY_NAME = "org.apache.servicemix.datestamp";
     
     String FLOW_PROPERTY_NAME = "org.apache.servicemix.flow";
-	
+    
+    String STATELESS_CONSUMER = " org.apache.servicemix.consumer.stateless";
+    
+    String STATELESS_PROVIDER = " org.apache.servicemix.provider.stateless";
+    
 }

Modified: incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jca/JCAFlow.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jca/JCAFlow.java?rev=393668&r1=393667&r2=393668&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jca/JCAFlow.java
(original)
+++ incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jca/JCAFlow.java
Wed Apr 12 17:25:32 2006
@@ -61,11 +61,14 @@
 import org.apache.geronimo.connector.outbound.connectionmanagerconfig.XATransactions;
 import org.apache.geronimo.connector.work.GeronimoWorkManager;
 import org.apache.geronimo.transaction.context.TransactionContextManager;
+import org.apache.servicemix.JbiConstants;
 import org.apache.servicemix.jbi.container.SpringJBIContainer;
+import org.apache.servicemix.jbi.event.ComponentAdapter;
+import org.apache.servicemix.jbi.event.ComponentEvent;
+import org.apache.servicemix.jbi.event.ComponentListener;
 import org.apache.servicemix.jbi.event.EndpointAdapter;
 import org.apache.servicemix.jbi.event.EndpointEvent;
 import org.apache.servicemix.jbi.event.EndpointListener;
-import org.apache.servicemix.jbi.framework.ComponentNameSpace;
 import org.apache.servicemix.jbi.messaging.MessageExchangeImpl;
 import org.apache.servicemix.jbi.nmr.Broker;
 import org.apache.servicemix.jbi.nmr.flow.AbstractFlow;
@@ -110,6 +113,8 @@
 
     private EndpointListener endpointListener;
 
+    private ComponentListener componentListener;
+
     /**
      * The type of Flow
      * 
@@ -240,6 +245,16 @@
             }
         };
         broker.getContainer().addListener(endpointListener);
+        // Create and register component listener
+        componentListener = new ComponentAdapter() {
+            public void componentStarted(ComponentEvent event) {
+                onComponentStarted(event);
+            }
+            public void componentStopped(ComponentEvent event) {
+                onComponentStopped(event);
+            }
+        };
+        broker.getContainer().addListener(componentListener);
         try {
         	resourceAdapter = createResourceAdapter();
         	
@@ -348,6 +363,8 @@
         stop();
         // Remove endpoint listener
         broker.getContainer().removeListener(endpointListener);
+        // Remove component listener
+        broker.getContainer().removeListener(componentListener);
         // Destroy connectors
         while (!connectorMap.isEmpty()) {
         	JCAConnector connector = (JCAConnector) connectorMap.remove(connectorMap.keySet().iterator().next());
@@ -445,6 +462,41 @@
         }
     }
     
+    public void onComponentStarted(ComponentEvent event) {
+        if (!started.get()) {
+            return;
+        }
+        try {
+            String key = event.getComponent().getName();
+            if(!connectorMap.containsKey(key)){
+                ActiveMQActivationSpec ac = new ActiveMQActivationSpec();
+                ac.setDestinationType("javax.jms.Queue");
+                ac.setDestination(INBOUND_PREFIX + key);
+                JCAConnector connector = new JCAConnector();
+                connector.setBootstrapContext(getBootstrapContext());
+                connector.setActivationSpec(ac);
+                connector.setResourceAdapter(resourceAdapter);
+                connector.setEndpointFactory(new SingletonEndpointFactory(this, getTransactionManager()));
+                connector.afterPropertiesSet();
+                connectorMap.put(key, connector);
+            }
+        } catch (Exception e) {
+            log.error("Cannot create consumer for component " + event.getComponent().getName(),
e);
+        }
+    }
+    
+    public void onComponentStopped(ComponentEvent event) {
+        try {
+            String key = event.getComponent().getName();
+            JCAConnector connector = (JCAConnector) connectorMap.remove(key);
+            if (connector != null){
+                connector.destroy();
+            }
+        } catch (Exception e) {
+            log.error("Cannot destroy consumer for component " + event.getComponent().getName(),
e);
+        }
+    }
+
     public void onRemoteEndpointRegistered(EndpointEvent event) {
         log.info(broker.getContainerName() + ": adding remote endpoint: " + event.getEndpoint());
         broker.getRegistry().registerRemoteEndpoint(event.getEndpoint());
@@ -476,10 +528,21 @@
         try {
             String destination;
             if (me.getRole() == Role.PROVIDER) {
-                destination = INBOUND_PREFIX + me.getEndpoint().getServiceName() + me.getEndpoint().getEndpointName();
+                if (me.getDestinationId() == null) {
+                    destination = INBOUND_PREFIX + me.getEndpoint().getServiceName() + me.getEndpoint().getEndpointName();
+                } else if (Boolean.TRUE.equals(me.getProperty(JbiConstants.STATELESS_PROVIDER))
&& !isSynchronous(me)) {
+                    destination = INBOUND_PREFIX + me.getDestinationId().getName();
+                } else {
+                    destination = INBOUND_PREFIX + me.getDestinationId().getContainerName();
+                }
             } else {
-                ComponentNameSpace id = me.getRole() == Role.PROVIDER ? me.getDestinationId()
: me.getSourceId();
-                destination = INBOUND_PREFIX + id.getContainerName();
+                if (me.getSourceId() == null) {
+                    throw new IllegalStateException("No sourceId set on the exchange");
+                } else if (Boolean.TRUE.equals(me.getProperty(JbiConstants.STATELESS_CONSUMER))
&& !isSynchronous(me)) {
+                    destination = INBOUND_PREFIX + me.getSourceId().getName();
+                } else {
+                    destination = INBOUND_PREFIX + me.getSourceId().getContainerName();
+                }
             }
             sendJmsMessage(new ActiveMQQueue(destination), me, isPersistent(me), me.isTransacted());
         } catch (JMSException e) {

Modified: incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/JMSFlow.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/JMSFlow.java?rev=393668&r1=393667&r2=393668&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/JMSFlow.java
(original)
+++ incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/JMSFlow.java
Wed Apr 12 17:25:32 2006
@@ -47,10 +47,14 @@
 import org.apache.activemq.command.RemoveInfo;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.servicemix.JbiConstants;
+import org.apache.servicemix.jbi.event.ComponentAdapter;
+import org.apache.servicemix.jbi.event.ComponentEvent;
+import org.apache.servicemix.jbi.event.ComponentListener;
 import org.apache.servicemix.jbi.event.EndpointAdapter;
 import org.apache.servicemix.jbi.event.EndpointEvent;
 import org.apache.servicemix.jbi.event.EndpointListener;
-import org.apache.servicemix.jbi.framework.ComponentNameSpace;
+import org.apache.servicemix.jbi.framework.ComponentMBeanImpl;
 import org.apache.servicemix.jbi.messaging.MessageExchangeImpl;
 import org.apache.servicemix.jbi.nmr.Broker;
 import org.apache.servicemix.jbi.nmr.flow.AbstractFlow;
@@ -104,6 +108,8 @@
     private AtomicBoolean started = new AtomicBoolean(false);
 
     private EndpointListener endpointListener;
+    
+    private ComponentListener componentListener;
 
     /**
      * The type of Flow
@@ -216,6 +222,16 @@
             }
         };
         broker.getContainer().addListener(endpointListener);
+        // Create and register component listener
+        componentListener = new ComponentAdapter() {
+            public void componentStarted(ComponentEvent event) {
+                onComponentStarted(event);
+            }
+            public void componentStopped(ComponentEvent event) {
+                onComponentStopped(event);
+            }
+        };
+        broker.getContainer().addListener(componentListener);
         try {
             if (connectionFactory == null) {
                 if (jmsURL != null) {
@@ -285,6 +301,13 @@
                 });
 
                 // Start queue consumers for all components
+                for (Iterator it = broker.getRegistry().getComponents().iterator(); it.hasNext();)
{
+                    ComponentMBeanImpl cmp = (ComponentMBeanImpl) it.next();
+                    if (cmp.isStarted()) {
+                        onComponentStarted(new ComponentEvent(cmp, ComponentEvent.COMPONENT_STARTED));
+                    }
+                }
+                // Start queue consumers for all endpoints
                 ServiceEndpoint[] endpoints = broker.getRegistry().getEndpointsForInterface(null);
                 for (int i = 0; i < endpoints.length; i++) {
                     if (endpoints[i] instanceof InternalEndpoint && ((InternalEndpoint)
endpoints[i]).isLocal()) {
@@ -328,6 +351,8 @@
         stop();
         // Remove endpoint listener
         broker.getContainer().removeListener(endpointListener);
+        // Remove component listener
+        broker.getContainer().removeListener(componentListener);
         if (this.connection != null) {
             try {
                 this.connection.close();
@@ -384,6 +409,35 @@
             log.error("Cannot destroy consumer for " + event, e);
         }
     }
+    
+    public void onComponentStarted(ComponentEvent event) {
+        if (!started.get()) {
+            return;
+        }
+        try {
+            String key = event.getComponent().getName();
+            if (!consumerMap.containsKey(key)) {
+                Queue queue = inboundSession.createQueue(INBOUND_PREFIX + key);
+                MessageConsumer consumer = inboundSession.createConsumer(queue);
+                consumer.setMessageListener(this);
+                consumerMap.put(key, consumer);
+            }
+        } catch (Exception e) {
+            log.error("Cannot create consumer for component " + event.getComponent().getName(),
e);
+        }
+    }
+    
+    public void onComponentStopped(ComponentEvent event) {
+        try {
+            String key = event.getComponent().getName();
+            MessageConsumer consumer = (MessageConsumer) consumerMap.remove(key);
+            if (consumer != null) {
+                consumer.close();
+            }
+        } catch (Exception e) {
+            log.error("Cannot destroy consumer for component " + event.getComponent().getName(),
e);
+        }
+    }
 
     public void onRemoteEndpointRegistered(EndpointEvent event) {
         log.info(broker.getContainerName() + ": adding remote endpoint: " + event.getEndpoint());
@@ -416,11 +470,23 @@
         try {
             String destination;
             if (me.getRole() == Role.PROVIDER) {
-                destination = INBOUND_PREFIX + me.getEndpoint().getServiceName() + me.getEndpoint().getEndpointName();
+                if (me.getDestinationId() == null) {
+                    destination = INBOUND_PREFIX + me.getEndpoint().getServiceName() + me.getEndpoint().getEndpointName();
+                } else if (Boolean.TRUE.equals(me.getProperty(JbiConstants.STATELESS_PROVIDER))
&& !isSynchronous(me)) {
+                    destination = INBOUND_PREFIX + me.getDestinationId().getName();
+                } else {
+                    destination = INBOUND_PREFIX + me.getDestinationId().getContainerName();
+                }
             } else {
-                ComponentNameSpace id = me.getRole() == Role.PROVIDER ? me.getDestinationId()
: me.getSourceId();
-                destination = INBOUND_PREFIX + id.getContainerName();
+                if (me.getSourceId() == null) {
+                    throw new IllegalStateException("No sourceId set on the exchange");
+                } else if (Boolean.TRUE.equals(me.getProperty(JbiConstants.STATELESS_CONSUMER))
&& !isSynchronous(me)) {
+                    destination = INBOUND_PREFIX + me.getSourceId().getName();
+                } else {
+                    destination = INBOUND_PREFIX + me.getSourceId().getContainerName();
+                }
             }
+                
             Queue queue = inboundSession.createQueue(destination);
             ObjectMessage msg = inboundSession.createObjectMessage(me);
             queueProducer.send(queue, msg);

Modified: incubator/servicemix/trunk/servicemix-core/src/test/java/org/apache/servicemix/jbi/nmr/flow/jms/PingService.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-core/src/test/java/org/apache/servicemix/jbi/nmr/flow/jms/PingService.java?rev=393668&r1=393667&r2=393668&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-core/src/test/java/org/apache/servicemix/jbi/nmr/flow/jms/PingService.java
(original)
+++ incubator/servicemix/trunk/servicemix-core/src/test/java/org/apache/servicemix/jbi/nmr/flow/jms/PingService.java
Wed Apr 12 17:25:32 2006
@@ -34,7 +34,8 @@
             NormalizedMessage out=exchange.createMessage();
             out.setContent(new StringSource("<response>Ping back at ya!</response>"));
             System.out.println("SENDING RESPONSE; exchange.status="+exchange.getStatus());
-            answer(exchange,out);
+            exchange.setMessage(out, "out");
+            getDeliveryChannel().sendSync(exchange);
             System.out.println("RESPONSE SENT; exchange.status="+exchange.getStatus());
         } else {
             System.out.println("GOT A MESSAGE; exchange.status="+exchange.getStatus());

Added: incubator/servicemix/trunk/servicemix-core/src/test/java/org/apache/servicemix/jbi/nmr/flow/jms/StatelessJcaFlowTest.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-core/src/test/java/org/apache/servicemix/jbi/nmr/flow/jms/StatelessJcaFlowTest.java?rev=393668&view=auto
==============================================================================
--- incubator/servicemix/trunk/servicemix-core/src/test/java/org/apache/servicemix/jbi/nmr/flow/jms/StatelessJcaFlowTest.java
(added)
+++ incubator/servicemix/trunk/servicemix-core/src/test/java/org/apache/servicemix/jbi/nmr/flow/jms/StatelessJcaFlowTest.java
Wed Apr 12 17:25:32 2006
@@ -0,0 +1,64 @@
+/*
+ * Copyright 2005-2006 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.jbi.nmr.flow.jms;
+
+import javax.transaction.TransactionManager;
+
+import org.apache.geronimo.transaction.ExtendedTransactionManager;
+import org.apache.geronimo.transaction.context.TransactionContextManager;
+import org.apache.servicemix.jbi.container.JBIContainer;
+import org.apache.servicemix.jbi.nmr.flow.jca.JCAFlow;
+import org.jencks.factory.GeronimoTransactionManagerFactoryBean;
+import org.jencks.factory.TransactionContextManagerFactoryBean;
+import org.jencks.factory.TransactionManagerFactoryBean;
+
+public class StatelessJcaFlowTest extends StatelessJmsFlowTest {
+
+    private TransactionContextManager tcm;
+    private TransactionManager tm;
+    
+    protected void setUp() throws Exception {
+        TransactionManagerFactoryBean tmcf = new TransactionManagerFactoryBean();
+        tmcf.afterPropertiesSet();
+        ExtendedTransactionManager etm = (ExtendedTransactionManager) tmcf.getObject();
+        TransactionContextManagerFactoryBean tcmfb = new TransactionContextManagerFactoryBean();
+        tcmfb.setTransactionManager(etm);
+        tcmfb.afterPropertiesSet();
+        tcm = (TransactionContextManager) tcmfb.getObject();
+        GeronimoTransactionManagerFactoryBean gtmfb = new GeronimoTransactionManagerFactoryBean();
+        gtmfb.setTransactionContextManager(tcm);
+        gtmfb.afterPropertiesSet();
+        tm = (TransactionManager) gtmfb.getObject();
+        super.setUp();
+    }
+    
+    protected JBIContainer createContainer(String name) throws Exception {
+        JBIContainer container = new JBIContainer();
+        container.setName(name);
+        JCAFlow flow = new JCAFlow();
+        flow.setJmsURL("tcp://localhost:61616");
+        flow.setTransactionContextManager(tcm);
+        container.setTransactionManager(tm);
+        container.setFlow(flow);
+        container.setUseMBeanServer(false);
+        container.setEmbedded(true);
+        container.init();
+        container.start();
+        return container;
+    }
+    
+
+}

Added: incubator/servicemix/trunk/servicemix-core/src/test/java/org/apache/servicemix/jbi/nmr/flow/jms/StatelessJmsFlowTest.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-core/src/test/java/org/apache/servicemix/jbi/nmr/flow/jms/StatelessJmsFlowTest.java?rev=393668&view=auto
==============================================================================
--- incubator/servicemix/trunk/servicemix-core/src/test/java/org/apache/servicemix/jbi/nmr/flow/jms/StatelessJmsFlowTest.java
(added)
+++ incubator/servicemix/trunk/servicemix-core/src/test/java/org/apache/servicemix/jbi/nmr/flow/jms/StatelessJmsFlowTest.java
Wed Apr 12 17:25:32 2006
@@ -0,0 +1,238 @@
+/*
+ * Copyright 2005-2006 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.jbi.nmr.flow.jms;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import javax.jbi.messaging.ExchangeStatus;
+import javax.jbi.messaging.InOut;
+import javax.jbi.messaging.MessageExchange;
+import javax.jbi.messaging.MessageExchangeFactory;
+import javax.jbi.messaging.MessagingException;
+import javax.jbi.messaging.NormalizedMessage;
+import javax.xml.namespace.QName;
+
+import junit.framework.TestCase;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.servicemix.JbiConstants;
+import org.apache.servicemix.MessageExchangeListener;
+import org.apache.servicemix.components.util.ComponentSupport;
+import org.apache.servicemix.jbi.container.JBIContainer;
+import org.apache.servicemix.jbi.jaxp.StringSource;
+
+import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList;
+
+public class StatelessJmsFlowTest extends TestCase {
+
+    protected JBIContainer jbi1;
+    protected JBIContainer jbi2;
+    protected BrokerService broker;
+    
+    protected void setUp() throws Exception {
+        broker = new BrokerService();
+        broker.setPersistent(false);
+        broker.setUseJmx(false);
+        broker.addConnector("tcp://localhost:61616");
+        broker.start();
+        
+        jbi1 = createContainer("jbi1");
+        jbi2 = createContainer("jbi2");
+    }
+    
+    protected void tearDown() throws Exception {
+        jbi1.shutDown();
+        jbi2.shutDown();
+        broker.stop();
+    }
+    
+    protected JBIContainer createContainer(String name) throws Exception {
+        JBIContainer container = new JBIContainer();
+        container.setName(name);
+        container.setFlowName("jms?jmsURL=tcp://localhost:61616");
+        container.setUseMBeanServer(false);
+        container.setEmbedded(true);
+        container.init();
+        container.start();
+        return container;
+    }
+    
+    protected StatelessEcho activateProvider(JBIContainer container, boolean stateless) throws
Exception {
+        StatelessEcho echo = new StatelessEcho(stateless);
+        container.activateComponent(echo, "echo");
+        return echo;
+    }
+    
+    protected StatelessSender activateConsumer(JBIContainer container) throws Exception {
+        StatelessSender sender = new StatelessSender();
+        container.activateComponent(sender, "sender");
+        return sender;
+    }
+    
+    public void testStatelessConsumer() throws Exception {
+        StatelessEcho echo1 = activateProvider(jbi1, false);
+        StatelessEcho echo2 = activateProvider(jbi2, false);
+        StatelessSender sender1 = activateConsumer(jbi1);
+        StatelessSender sender2 = activateConsumer(jbi2);
+        
+        sender1.sendMessages(100, true);
+        
+        int n1 = 0;
+        int n2 = 0;
+        for (int i = 0; i < 10; i++) {
+            Thread.sleep(1000);
+            n1 = sender1.outIds.size();
+            n2 = sender2.outIds.size();
+            if (n1 + n2 == 100) {
+                break;
+            }
+        }
+        assertTrue(n1 != 0);
+        assertTrue(n2 != 0);
+        assertTrue(n1 + n2 == 100);
+    }
+    
+    public void testStatefullConsumer() throws Exception {
+        StatelessEcho echo1 = activateProvider(jbi1, false);
+        StatelessEcho echo2 = activateProvider(jbi2, false);
+        StatelessSender sender1 = activateConsumer(jbi1);
+        StatelessSender sender2 = activateConsumer(jbi2);
+        
+        sender1.sendMessages(100, false);
+        
+        int n1 = 0;
+        int n2 = 0;
+        for (int i = 0; i < 10; i++) {
+            Thread.sleep(1000);
+            n1 = sender1.outIds.size();
+            n2 = sender2.outIds.size();
+            if (n1 + n2 == 100) {
+                break;
+            }
+        }
+        assertTrue(n1 != 0);
+        assertTrue(n2 == 0);
+        assertTrue(n1 + n2 == 100);
+    }
+    
+    public void testStatelessProvider() throws Exception {
+        StatelessEcho echo1 = activateProvider(jbi1, true);
+        StatelessEcho echo2 = activateProvider(jbi2, true);
+        StatelessSender sender1 = activateConsumer(jbi1);
+        StatelessSender sender2 = activateConsumer(jbi2);
+        
+        sender1.sendMessages(100, false);
+
+        for (int i = 0; i < 10; i++) {
+            Thread.sleep(1000);
+            if (echo1.doneIds.size() + echo2.doneIds.size() == 100) {
+                break;
+            }
+        }
+        assertTrue(echo1.doneIds.size() + echo2.doneIds.size() == 100);
+        
+        // Check that the echo1 component received
+        // DONE status for exchanges it did not handle
+        // the first time.
+        // Do not bother testing for echo2, as it will
+        // be automatically true.
+        Set doneIds1 = new HashSet();
+        doneIds1.addAll(echo1.doneIds);
+        doneIds1.removeAll(echo1.inIds);
+        assertTrue(doneIds1.size() > 0);
+    }
+    
+    public void testStatefullProvider() throws Exception {
+        StatelessEcho echo1 = activateProvider(jbi1, false);
+        StatelessEcho echo2 = activateProvider(jbi2, false);
+        StatelessSender sender1 = activateConsumer(jbi1);
+        StatelessSender sender2 = activateConsumer(jbi2);
+        
+        sender1.sendMessages(100, false);
+
+        for (int i = 0; i < 10; i++) {
+            Thread.sleep(1000);
+            if (echo1.doneIds.size() + echo2.doneIds.size() == 100) {
+                break;
+            }
+        }
+        assertTrue(echo1.doneIds.size() + echo2.doneIds.size() == 100);
+        
+        // Check that the echo1 component received
+        // DONE status for exchanges it did not handle
+        // the first time.
+        // Do not bother testing for echo2, as it will
+        // be automatically true.
+        Set doneIds1 = new HashSet();
+        doneIds1.addAll(echo1.doneIds);
+        doneIds1.removeAll(echo1.inIds);
+        assertTrue(doneIds1.size() == 0);
+    }
+    
+    public static class StatelessSender extends ComponentSupport implements MessageExchangeListener
{
+        public static final QName SERVICE = new QName("sender");
+        public static final String ENDPOINT = "ep";
+        public List outIds = new CopyOnWriteArrayList();
+        public StatelessSender() {
+            super(SERVICE, ENDPOINT);
+        }
+        public void sendMessages(int nb, boolean stateless) throws Exception {
+            for (int i = 0; i < nb; i++) {
+                MessageExchangeFactory mef = getDeliveryChannel().createExchangeFactory();
+                InOut me = mef.createInOutExchange();
+                me.setService(new QName("echo"));
+                if (stateless) {
+                    me.setProperty(JbiConstants.STATELESS_CONSUMER, Boolean.TRUE);
+                }
+                me.setInMessage(me.createMessage());
+                me.getInMessage().setContent(new StringSource("<hello/>"));
+                getDeliveryChannel().send(me);
+                
+            }
+        }
+        public void onMessageExchange(MessageExchange exchange) throws MessagingException
{
+            outIds.add(exchange.getExchangeId());
+            done(exchange);
+        }
+    }
+    
+    public static class StatelessEcho extends ComponentSupport implements MessageExchangeListener
{
+        private boolean stateless;
+        public List inIds = new CopyOnWriteArrayList();
+        public List doneIds = new CopyOnWriteArrayList();
+        public StatelessEcho(boolean stateless) {
+            setService(new QName("echo"));
+            setEndpoint("ep");
+            this.stateless = stateless;
+        }
+        public void onMessageExchange(MessageExchange exchange) throws MessagingException
{
+            if (exchange.getStatus() == ExchangeStatus.DONE) {
+                doneIds.add(exchange.getExchangeId());
+            } else {
+                inIds.add(exchange.getExchangeId());
+                if (stateless) {
+                    exchange.setProperty(JbiConstants.STATELESS_PROVIDER, Boolean.TRUE);
+                }
+                NormalizedMessage out = exchange.createMessage();
+                out.setContent(new StringSource("<world/>"));
+                answer(exchange, out);
+            }
+        }
+    }
+
+}



Mime
View raw message