servicemix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ge...@apache.org
Subject svn commit: r707336 - in /servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src: main/java/org/apache/servicemix/camel/ test/java/org/apache/servicemix/camel/ test/resources/
Date Thu, 23 Oct 2008 10:22:47 GMT
Author: gertv
Date: Thu Oct 23 03:22:45 2008
New Revision: 707336

URL: http://svn.apache.org/viewvc?rev=707336&view=rev
Log:
SM-1486: servicemix-camel should use asynchronous messaging

Added:
    servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/main/java/org/apache/servicemix/camel/CamelConsumerEndpoint.java
    servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/main/java/org/apache/servicemix/camel/CamelProviderEndpoint.java
    servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/test/resources/log4j-tests.properties
Removed:
    servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/main/java/org/apache/servicemix/camel/CamelJbiEndpoint.java
    servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/main/java/org/apache/servicemix/camel/ToJbiProcessor.java
Modified:
    servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/main/java/org/apache/servicemix/camel/CamelJbiComponent.java
    servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/main/java/org/apache/servicemix/camel/CamelSpringDeployer.java
    servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/main/java/org/apache/servicemix/camel/JbiEndpoint.java
    servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/test/java/org/apache/servicemix/camel/JbiEndpointUsingNameUriIntegrationTest.java
    servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/test/java/org/apache/servicemix/camel/JbiInOutTest.java
    servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/test/java/org/apache/servicemix/camel/JbiTestSupport.java
    servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/test/java/org/apache/servicemix/camel/NonJbiCamelEndpointsIntegrationTest.java
    servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/test/java/org/apache/servicemix/camel/SendFromCamelToJbiTest.java
    servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/test/resources/log4j.properties

Added: servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/main/java/org/apache/servicemix/camel/CamelConsumerEndpoint.java
URL: http://svn.apache.org/viewvc/servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/main/java/org/apache/servicemix/camel/CamelConsumerEndpoint.java?rev=707336&view=auto
==============================================================================
--- servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/main/java/org/apache/servicemix/camel/CamelConsumerEndpoint.java
(added)
+++ servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/main/java/org/apache/servicemix/camel/CamelConsumerEndpoint.java
Thu Oct 23 03:22:45 2008
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.camel;
+
+import java.net.URISyntaxException;
+import java.util.Set;
+
+import javax.jbi.management.DeploymentException;
+import javax.jbi.messaging.ExchangeStatus;
+import javax.jbi.messaging.MessageExchange;
+import javax.jbi.messaging.MessagingException;
+import javax.jbi.messaging.NormalizedMessage;
+import javax.xml.namespace.QName;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.AsyncProcessor;
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.servicemix.common.endpoints.ConsumerEndpoint;
+import org.apache.servicemix.id.IdGenerator;
+import org.apache.servicemix.jbi.resolver.URIResolver;
+
+/**
+ * A consumer endpoint that will be used to send JBI exchanges
+ * originating from camel.
+ */
+public class CamelConsumerEndpoint extends ConsumerEndpoint implements AsyncProcessor {
+
+    public static final QName SERVICE_NAME = new QName("http://activemq.apache.org/camel/schema/jbi", "consumer");
+
+    private JbiBinding binding;
+
+    private JbiEndpoint jbiEndpoint;
+
+    public CamelConsumerEndpoint(JbiBinding binding, JbiEndpoint jbiEndpoint) {
+        setService(SERVICE_NAME);
+        setEndpoint(new IdGenerator().generateId());
+        this.binding = binding;
+        this.jbiEndpoint = jbiEndpoint;
+    }
+
+    public void process(MessageExchange messageExchange) throws Exception {
+        Exchange exchange = (Exchange) messageExchange.getProperty(Exchange.class.getName());
+        AsyncCallback asyncCallback = (AsyncCallback) messageExchange.getProperty(AsyncCallback.class.getName());
+        if (messageExchange.getStatus() == ExchangeStatus.ERROR) {
+            exchange.setException(messageExchange.getError());
+        } else if (messageExchange.getStatus() == ExchangeStatus.ACTIVE) {
+            addHeaders(messageExchange, exchange);
+            if (messageExchange.getFault() != null) {
+                exchange.getFault().setBody(messageExchange.getFault().getContent());
+                addHeaders(messageExchange.getFault(), exchange.getFault());
+                addAttachments(messageExchange.getFault(), exchange.getFault());
+            } else {
+                exchange.getOut().setBody(messageExchange.getMessage("out").getContent());
+                addHeaders(messageExchange.getMessage("out"), exchange.getOut());
+                addAttachments(messageExchange.getMessage("out"), exchange.getOut());
+            }
+            done(messageExchange);
+        }
+        asyncCallback.done(false);
+    }
+
+    public boolean process(Exchange exchange, AsyncCallback asyncCallback) {
+        try {
+            MessageExchange messageExchange = binding.makeJbiMessageExchange(exchange, getExchangeFactory(), jbiEndpoint.getMep());
+
+            if (jbiEndpoint.getOperation() != null) {
+                messageExchange.setOperation(QName.valueOf(jbiEndpoint.getOperation()));
+            }
+
+            URIResolver.configureExchange(messageExchange, getContext(), jbiEndpoint.getDestinationUri());
+            messageExchange.setProperty(Exchange.class.getName(), exchange);
+            messageExchange.setProperty(AsyncCallback.class.getName(), asyncCallback);
+
+            send(messageExchange);
+            return false;
+        } catch (MessagingException e) {
+            throw new JbiException(e);
+        } catch (URISyntaxException e) {
+            throw new JbiException(e);
+        }
+    }
+
+    public void process(Exchange exchange) throws Exception {
+        try {
+            MessageExchange messageExchange = binding.makeJbiMessageExchange(exchange, getExchangeFactory(), jbiEndpoint.getMep());
+
+            if (jbiEndpoint.getOperation() != null) {
+                messageExchange.setOperation(QName.valueOf(jbiEndpoint.getOperation()));
+            }
+
+            URIResolver.configureExchange(messageExchange, getContext(), jbiEndpoint.getDestinationUri());
+
+            sendSync(messageExchange);
+
+            if (messageExchange.getStatus() == ExchangeStatus.ERROR) {
+                exchange.setException(messageExchange.getError());
+            } else if (messageExchange.getStatus() == ExchangeStatus.ACTIVE) {
+                addHeaders(messageExchange, exchange);
+                if (messageExchange.getFault() != null) {
+                    exchange.getFault().setBody(messageExchange.getFault().getContent());
+                    addHeaders(messageExchange.getFault(), exchange.getFault());
+                    addAttachments(messageExchange.getFault(), exchange.getFault());
+                } else {
+                    exchange.getOut().setBody(messageExchange.getMessage("out").getContent());
+                    addHeaders(messageExchange.getMessage("out"), exchange.getOut());
+                    addAttachments(messageExchange.getMessage("out"), exchange.getOut());
+                }
+                done(messageExchange);
+            }
+
+        } catch (MessagingException e) {
+            throw new JbiException(e);
+        } catch (URISyntaxException e) {
+            throw new JbiException(e);
+        }
+    }
+    
+    @Override
+    public String getLocationURI() {
+        return null;
+    }
+
+    @Override
+    public void validate() throws DeploymentException {
+        // No validation required
+    }
+
+    private void addHeaders(MessageExchange messageExchange, Exchange camelExchange) {
+        Set entries = messageExchange.getPropertyNames();
+        for (Object o : entries) {
+            String key = o.toString();
+            camelExchange.setProperty(key, messageExchange.getProperty(key));
+        }
+    }
+
+    private void addHeaders(NormalizedMessage normalizedMessage, Message camelMessage) {
+        Set entries = normalizedMessage.getPropertyNames();
+        for (Object o : entries) {
+            String key = o.toString();
+            camelMessage.setHeader(key, normalizedMessage.getProperty(key));
+        }
+    }
+
+    private void addAttachments(NormalizedMessage normalizedMessage, Message camelMessage) {
+        Set entries = normalizedMessage.getAttachmentNames();
+        for (Object o : entries) {
+            String id = o.toString();
+            camelMessage.addAttachment(id, normalizedMessage.getAttachment(id));
+        }
+    }
+
+}

Modified: servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/main/java/org/apache/servicemix/camel/CamelJbiComponent.java
URL: http://svn.apache.org/viewvc/servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/main/java/org/apache/servicemix/camel/CamelJbiComponent.java?rev=707336&r1=707335&r2=707336&view=diff
==============================================================================
--- servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/main/java/org/apache/servicemix/camel/CamelJbiComponent.java
(original)
+++ servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/main/java/org/apache/servicemix/camel/CamelJbiComponent.java
Thu Oct 23 03:22:45 2008
@@ -74,8 +74,8 @@
      * @see org.apache.servicemix.common.DefaultComponent#getConfiguredEndpoints()
      */
     @Override
-    protected List<CamelJbiEndpoint> getConfiguredEndpoints() {
-        return new ArrayList<CamelJbiEndpoint>();
+    protected List<CamelProviderEndpoint> getConfiguredEndpoints() {
+        return new ArrayList<CamelProviderEndpoint>();
     }
 
     /**
@@ -84,7 +84,7 @@
      */
     @Override
     protected Class[] getEndpointClasses() {
-        return new Class[] {CamelJbiEndpoint.class};
+        return new Class[] {CamelProviderEndpoint.class, CamelConsumerEndpoint.class};
     }
 
     /**
@@ -112,18 +112,18 @@
 
     @Override
     protected org.apache.servicemix.common.Endpoint getResolvedEPR(ServiceEndpoint ep) throws Exception {
-        CamelJbiEndpoint endpoint = createEndpoint(ep);
+        CamelProviderEndpoint endpoint = createEndpoint(ep);
         endpoint.activate();
         return endpoint;
     }
 
-    public CamelJbiEndpoint createEndpoint(ServiceEndpoint ep) throws URISyntaxException {
+    public CamelProviderEndpoint createEndpoint(ServiceEndpoint ep) throws URISyntaxException {
         URI uri = new URI(ep.getEndpointName());
         Map map = URISupport.parseQuery(uri.getQuery());
         String camelUri = uri.getSchemeSpecificPart();
         Endpoint camelEndpoint = getCamelContext().getEndpoint(camelUri);
         Processor processor = createCamelProcessor(camelEndpoint);
-        CamelJbiEndpoint endpoint = new CamelJbiEndpoint(getServiceUnit(), camelEndpoint, getBinding(), processor);
+        CamelProviderEndpoint endpoint = new CamelProviderEndpoint(getServiceUnit(), camelEndpoint, getBinding(), processor);
 
         IntrospectionSupport.setProperties(endpoint, map);
 
@@ -163,8 +163,8 @@
      * 
      * @returns a JBI endpoint created for the given Camel endpoint
      */
-    public CamelJbiEndpoint activateJbiEndpoint(Endpoint camelEndpoint, Processor processor) throws Exception {
-        CamelJbiEndpoint jbiEndpoint = createJbiEndpointFromCamel(camelEndpoint, processor);
+    public CamelProviderEndpoint activateJbiEndpoint(Endpoint camelEndpoint, Processor processor) throws Exception {
+        CamelProviderEndpoint jbiEndpoint = createJbiEndpointFromCamel(camelEndpoint, processor);
 
         // the following method will activate the new dynamic JBI endpoint
         if (deployer != null) {
@@ -176,20 +176,20 @@
         return jbiEndpoint;
     }
 
-    public void deactivateJbiEndpoint(CamelJbiEndpoint jbiEndpoint) throws Exception {
+    public void deactivateJbiEndpoint(CamelProviderEndpoint jbiEndpoint) throws Exception {
         // this will be done by the ServiceUnit
         // jbiEndpoint.deactivate();
     }
 
-    protected CamelJbiEndpoint createJbiEndpointFromCamel(Endpoint camelEndpoint, Processor processor) {
-        CamelJbiEndpoint jbiEndpoint;
+    protected CamelProviderEndpoint createJbiEndpointFromCamel(Endpoint camelEndpoint, Processor processor) {
+        CamelProviderEndpoint jbiEndpoint;
         String endpointUri = camelEndpoint.getEndpointUri();
         if (camelEndpoint instanceof JbiEndpoint) {
             QName service = null;
             String endpoint = null;
             if (endpointUri.startsWith("name:")) {
                 endpoint = endpointUri.substring("name:".length());
-                service = CamelJbiEndpoint.SERVICE_NAME;
+                service = CamelProviderEndpoint.SERVICE_NAME;
             } else if (endpointUri.startsWith("endpoint:")) {
                 String uri = endpointUri.substring("endpoint:".length());
                 // lets decode "serviceNamespace sep serviceName sep
@@ -225,9 +225,9 @@
                         + "or jbi:service:[serviceNamespace][sep][serviceName or jbi:name:[endpointName] but was given: "
                                 + endpointUri);
             }
-            jbiEndpoint = new CamelJbiEndpoint(getServiceUnit(), service, endpoint, camelEndpoint, getBinding(), processor);
+            jbiEndpoint = new CamelProviderEndpoint(getServiceUnit(), service, endpoint, camelEndpoint, getBinding(), processor);
         } else {
-            jbiEndpoint = new CamelJbiEndpoint(getServiceUnit(), camelEndpoint, getBinding(), processor);
+            jbiEndpoint = new CamelProviderEndpoint(getServiceUnit(), camelEndpoint, getBinding(), processor);
         }
         return jbiEndpoint;
     }
@@ -242,7 +242,7 @@
     /**
      * Returns a JBI endpoint created for the given Camel endpoint
      */
-    public CamelJbiEndpoint createJbiEndpointFromCamel(Endpoint camelEndpoint) {
+    public CamelProviderEndpoint createJbiEndpointFromCamel(Endpoint camelEndpoint) {
         Processor processor = createCamelProcessor(camelEndpoint);
         return createJbiEndpointFromCamel(camelEndpoint, processor);
     }

Added: servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/main/java/org/apache/servicemix/camel/CamelProviderEndpoint.java
URL: http://svn.apache.org/viewvc/servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/main/java/org/apache/servicemix/camel/CamelProviderEndpoint.java?rev=707336&view=auto
==============================================================================
--- servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/main/java/org/apache/servicemix/camel/CamelProviderEndpoint.java
(added)
+++ servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/main/java/org/apache/servicemix/camel/CamelProviderEndpoint.java
Thu Oct 23 03:22:45 2008
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.camel;
+
+
+import javax.jbi.messaging.ExchangeStatus;
+import javax.jbi.messaging.InOnly;
+import javax.jbi.messaging.MessageExchange;
+import javax.jbi.messaging.RobustInOnly;
+import javax.xml.namespace.QName;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.Processor;
+import org.apache.servicemix.JbiConstants;
+import org.apache.servicemix.common.ServiceUnit;
+import org.apache.servicemix.common.endpoints.ProviderEndpoint;
+
+/**
+ * A JBI endpoint which when invoked will delegate to a Camel endpoint
+ *
+ * @version $Revision: 426415 $
+ */
+public class CamelProviderEndpoint extends ProviderEndpoint {
+
+    public static final QName SERVICE_NAME = new QName("http://activemq.apache.org/camel/schema/jbi", "provider");
+
+    private Endpoint camelEndpoint;
+
+    private JbiBinding binding;
+
+    private Processor camelProcessor;
+
+    public CamelProviderEndpoint(ServiceUnit serviceUnit, QName service, String endpoint, Endpoint camelEndpoint, JbiBinding binding,
+            Processor camelProcessor) {
+        super(serviceUnit, service, endpoint);
+        this.camelProcessor = camelProcessor;
+        this.camelEndpoint = camelEndpoint;
+        this.binding = binding;
+    }
+
+    public CamelProviderEndpoint(ServiceUnit serviceUnit, Endpoint camelEndpoint, JbiBinding binding, Processor camelProcessor) {
+        this(serviceUnit, SERVICE_NAME, camelEndpoint.getEndpointUri(), camelEndpoint, binding, camelProcessor);
+    }
+
+    @Override
+    public void process(MessageExchange exchange) throws Exception {
+        // The component acts as a provider, this means that another component has requested our service
+        // As this exchange is active, this is either an in or a fault (out are sent by this component)
+        
+        if (exchange.getRole() == MessageExchange.Role.PROVIDER) {
+            // Exchange is finished
+            if (exchange.getStatus() == ExchangeStatus.DONE) {
+                return;
+            // Exchange has been aborted with an exception
+            } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
+                return;
+            // Exchange is active
+            } else {
+                handleActiveProviderExchange(exchange);
+
+            }
+        // Unsupported role: this should never happen as we never create exchanges
+        } else {
+            throw new IllegalStateException("Unsupported role: " + exchange.getRole());
+        }
+    }
+
+
+    protected void handleActiveProviderExchange(MessageExchange exchange) throws Exception {
+        // Fault message
+        if (exchange.getFault() != null) {
+            done(exchange);
+        // In message
+        } else if (exchange.getMessage("in") != null) {
+            if (exchange instanceof InOnly || exchange instanceof RobustInOnly) {
+                if (logger.isDebugEnabled()) {
+                    logger.debug("Received exchange: " + exchange);
+                }
+                JbiExchange camelExchange = new JbiExchange(camelEndpoint.getCamelContext(), binding, exchange);
+                camelProcessor.process(camelExchange);
+                if (camelExchange.isFailed()) {
+                    Throwable t = camelExchange.getException();
+                    Exception e;
+                    if (t == null) {
+                        e = new Exception("Unknown error");
+                    } else if (t instanceof Exception) {
+                        e = (Exception) t;
+                    } else {
+                        e = new Exception(t);
+                    }
+                    fail(exchange, e);
+                } else {
+                    done(exchange);
+                }
+            } else {
+                if (logger.isDebugEnabled()) {
+                    logger.debug("Received exchange: " + exchange);
+                }
+                JbiExchange camelExchange = new JbiExchange(camelEndpoint.getCamelContext(), binding, exchange);
+                camelProcessor.process(camelExchange);
+                if (camelExchange.isFailed()) {
+                    Throwable t = camelExchange.getException();
+                    Exception e;
+                    if (t == null) {
+                        e = new Exception("Unknown error");
+                    } else if (t instanceof Exception) {
+                        e = (Exception) t;
+                    } else {
+                        e = new Exception(t);
+                    }
+                    fail(exchange, e);
+                } else {
+                    boolean txSync = exchange.isTransacted() && Boolean.TRUE.equals(exchange.getProperty(JbiConstants.SEND_SYNC));
+                    if (txSync) {
+                        sendSync(exchange);
+                    } else {
+                        send(exchange);
+                    }
+                }
+            }
+        // This is not compliant with the default MEPs
+        } else {
+            throw new IllegalStateException("Provider exchange is ACTIVE, but no in or fault is provided");
+        }
+    }
+
+}

Modified: servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/main/java/org/apache/servicemix/camel/CamelSpringDeployer.java
URL: http://svn.apache.org/viewvc/servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/main/java/org/apache/servicemix/camel/CamelSpringDeployer.java?rev=707336&r1=707335&r2=707336&view=diff
==============================================================================
--- servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/main/java/org/apache/servicemix/camel/CamelSpringDeployer.java
(original)
+++ servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/main/java/org/apache/servicemix/camel/CamelSpringDeployer.java
Thu Oct 23 03:22:45 2008
@@ -51,7 +51,7 @@
         }
     };
 
-    private List<CamelJbiEndpoint> activatedEndpoints = new ArrayList<CamelJbiEndpoint>();
+    private List<CamelProviderEndpoint> activatedEndpoints = new ArrayList<CamelProviderEndpoint>();
 
     private String serviceUnitName;
 
@@ -85,14 +85,14 @@
         return serviceUnit;
     }
 
-    public void addService(CamelJbiEndpoint endpoint) {
+    public void addService(CamelProviderEndpoint endpoint) {
         activatedEndpoints.add(endpoint);
     }
 
     @Override
     protected List getServices(Kernel kernel) {
         try {
-            List<CamelJbiEndpoint> services = new ArrayList<CamelJbiEndpoint>(activatedEndpoints);
+            List<org.apache.servicemix.common.Endpoint> services = new ArrayList<org.apache.servicemix.common.Endpoint>(activatedEndpoints);
             activatedEndpoints.clear();
 
             ApplicationContext applicationContext = springLoader.getApplicationContext();

Modified: servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/main/java/org/apache/servicemix/camel/JbiEndpoint.java
URL: http://svn.apache.org/viewvc/servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/main/java/org/apache/servicemix/camel/JbiEndpoint.java?rev=707336&r1=707335&r2=707336&view=diff
==============================================================================
--- servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/main/java/org/apache/servicemix/camel/JbiEndpoint.java
(original)
+++ servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/main/java/org/apache/servicemix/camel/JbiEndpoint.java
Thu Oct 23 03:22:45 2008
@@ -19,7 +19,10 @@
 import java.net.URISyntaxException;
 import java.util.Map;
 
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.AsyncProcessor;
 import org.apache.camel.Consumer;
+import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
@@ -34,7 +37,6 @@
  * @version $Revision: 563665 $
  */
 public class JbiEndpoint extends DefaultEndpoint<Exchange> {
-    private Processor toJbiProcessor;
 
     private String destinationUri;
 
@@ -42,21 +44,52 @@
 
     private String operation;
 
+    private JbiProducer producer;
+
     private final CamelJbiComponent jbiComponent;
 
     public JbiEndpoint(CamelJbiComponent jbiComponent, String uri) {
         super(uri, jbiComponent);
         this.jbiComponent = jbiComponent;
         parseUri(uri);
-        toJbiProcessor = new ToJbiProcessor(jbiComponent.getBinding(), jbiComponent.getComponentContext(), this);
     }
 
-    public Producer<Exchange> createProducer() throws Exception {
-        return new DefaultProducer<Exchange>(this) {
-            public void process(Exchange exchange) throws Exception {
-                toJbiProcessor.process(exchange);
-            }
-        };
+    public synchronized Producer<Exchange> createProducer() throws Exception {
+        if (producer == null) {
+            producer = new JbiProducer(this);
+        }
+        return producer;
+    }
+
+    protected class JbiProducer extends DefaultProducer<Exchange> implements AsyncProcessor {
+
+        private CamelConsumerEndpoint consumer;
+
+        public JbiProducer(Endpoint<Exchange> exchangeEndpoint) {
+            super(exchangeEndpoint);
+        }
+
+        @Override
+        public void start() throws Exception {
+            consumer = new CamelConsumerEndpoint(jbiComponent.getBinding(), JbiEndpoint.this);
+            //consumer.start();
+            jbiComponent.addEndpoint(consumer);
+            super.start();
+        }
+        @Override
+        public void stop() throws Exception {
+            //consumer.stop();
+            jbiComponent.removeEndpoint(consumer);
+            super.stop();
+        }
+
+        public void process(Exchange exchange) throws Exception {
+            consumer.process(exchange);
+        }
+
+        public boolean process(Exchange exchange, AsyncCallback asyncCallback) {
+            return consumer.process(exchange, asyncCallback);
+        }
     }
 
     private void parseUri(String uri) {
@@ -103,7 +136,7 @@
 
     public Consumer<Exchange> createConsumer(final Processor processor) throws Exception {
         return new DefaultConsumer<Exchange>(this, processor) {
-            CamelJbiEndpoint jbiEndpoint;
+            CamelProviderEndpoint jbiEndpoint;
 
             @Override
             protected void doStart() throws Exception {

Modified: servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/test/java/org/apache/servicemix/camel/JbiEndpointUsingNameUriIntegrationTest.java
URL: http://svn.apache.org/viewvc/servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/test/java/org/apache/servicemix/camel/JbiEndpointUsingNameUriIntegrationTest.java?rev=707336&r1=707335&r2=707336&view=diff
==============================================================================
--- servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/test/java/org/apache/servicemix/camel/JbiEndpointUsingNameUriIntegrationTest.java
(original)
+++ servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/test/java/org/apache/servicemix/camel/JbiEndpointUsingNameUriIntegrationTest.java
Thu Oct 23 03:22:45 2008
@@ -39,7 +39,7 @@
     protected void configureExchange(ServiceMixClient client,
             MessageExchange exchange) {
         ServiceEndpoint endpoint = client.getContext().getEndpoint(
-                CamelJbiEndpoint.SERVICE_NAME, "cheese");
+                CamelProviderEndpoint.SERVICE_NAME, "cheese");
         assertNotNull("Should have a Camel endpoint exposed in JBI!", endpoint);
         exchange.setEndpoint(endpoint);
     }

Modified: servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/test/java/org/apache/servicemix/camel/JbiInOutTest.java
URL: http://svn.apache.org/viewvc/servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/test/java/org/apache/servicemix/camel/JbiInOutTest.java?rev=707336&r1=707335&r2=707336&view=diff
==============================================================================
--- servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/test/java/org/apache/servicemix/camel/JbiInOutTest.java
(original)
+++ servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/test/java/org/apache/servicemix/camel/JbiInOutTest.java
Thu Oct 23 03:22:45 2008
@@ -45,7 +45,7 @@
     protected void configureExchange(ServiceMixClient client,
             MessageExchange exchange) {
         ServiceEndpoint endpoint = client.getContext().getEndpoint(
-                CamelJbiEndpoint.SERVICE_NAME, "cheese");
+                CamelProviderEndpoint.SERVICE_NAME, "cheese");
         assertNotNull("Should have a Camel endpoint exposed in JBI!", endpoint);
         exchange.setEndpoint(endpoint);
     }

Modified: servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/test/java/org/apache/servicemix/camel/JbiTestSupport.java
URL: http://svn.apache.org/viewvc/servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/test/java/org/apache/servicemix/camel/JbiTestSupport.java?rev=707336&r1=707335&r2=707336&view=diff
==============================================================================
--- servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/test/java/org/apache/servicemix/camel/JbiTestSupport.java
(original)
+++ servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/test/java/org/apache/servicemix/camel/JbiTestSupport.java
Thu Oct 23 03:22:45 2008
@@ -20,9 +20,11 @@
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import javax.xml.namespace.QName;
 
+import org.apache.camel.AsyncCallback;
 import org.apache.camel.CamelContext;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
@@ -35,6 +37,7 @@
 import org.apache.camel.impl.DefaultCamelContext;
 import org.apache.servicemix.jbi.container.ActivationSpec;
 import org.apache.servicemix.jbi.container.SpringJBIContainer;
+import org.apache.servicemix.tck.ExchangeCompletedListener;
 
 /**
  * @version $Revision: 563665 $
@@ -46,6 +49,8 @@
 
     protected SpringJBIContainer jbiContainer = new SpringJBIContainer();
 
+    protected ExchangeCompletedListener exchangeCompletedListener;
+
     protected CountDownLatch latch = new CountDownLatch(1);
 
     protected Endpoint<Exchange> endpoint;
@@ -67,6 +72,26 @@
         });
     }
 
+    /**
+     * Sends an exchange to the endpoint asynchronously; the returned flag is set once the callback's done() runs.
+     */
+    protected AtomicBoolean sendExchangeAsync(final Object expectedBody) {
+        final AtomicBoolean bool = new AtomicBoolean();
+        client.send(endpoint, new Processor() {
+            public void process(Exchange exchange) {
+                Message in = exchange.getIn();
+                in.setBody(expectedBody);
+                in.setHeader("cheese", 123);
+            }
+        }, new AsyncCallback() {
+            public void done(boolean b) {
+                bool.set(true);
+                synchronized (bool) { bool.notifyAll(); } // notify requires owning the monitor
+            }
+        });
+        return bool;
+    }
+
     protected Object assertReceivedValidExchange(Class type) throws Exception {
         // lets wait on the message being received
         boolean received = latch.await(5, TimeUnit.SECONDS);
@@ -84,6 +109,7 @@
     @Override
     protected void setUp() throws Exception {
         jbiContainer.setEmbedded(true);
+        exchangeCompletedListener = new ExchangeCompletedListener();
 
         CamelJbiComponent component = new CamelJbiComponent();
 
@@ -103,6 +129,7 @@
                 .toArray(new ActivationSpec[activationSpecList.size()]);
         jbiContainer.setActivationSpecs(activationSpecs);
         jbiContainer.afterPropertiesSet();
+        jbiContainer.addListener(exchangeCompletedListener);
 
         // lets configure some componnets
         camelContext.addComponent("jbi", component);

Modified: servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/test/java/org/apache/servicemix/camel/NonJbiCamelEndpointsIntegrationTest.java
URL: http://svn.apache.org/viewvc/servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/test/java/org/apache/servicemix/camel/NonJbiCamelEndpointsIntegrationTest.java?rev=707336&r1=707335&r2=707336&view=diff
==============================================================================
--- servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/test/java/org/apache/servicemix/camel/NonJbiCamelEndpointsIntegrationTest.java
(original)
+++ servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/test/java/org/apache/servicemix/camel/NonJbiCamelEndpointsIntegrationTest.java
Thu Oct 23 03:22:45 2008
@@ -151,7 +151,7 @@
     protected void configureExchange(ServiceMixClient client,
             MessageExchange exchange) {
         ServiceEndpoint endpoint = client.getContext().getEndpoint(
-                CamelJbiEndpoint.SERVICE_NAME, "camel:su1-controlBus");
+                CamelProviderEndpoint.SERVICE_NAME, "camel:su1-controlBus");
         assertNotNull("Should have a Camel endpoint exposed in JBI!", endpoint);
         exchange.setEndpoint(endpoint);
     }

Modified: servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/test/java/org/apache/servicemix/camel/SendFromCamelToJbiTest.java
URL: http://svn.apache.org/viewvc/servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/test/java/org/apache/servicemix/camel/SendFromCamelToJbiTest.java?rev=707336&r1=707335&r2=707336&view=diff
==============================================================================
--- servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/test/java/org/apache/servicemix/camel/SendFromCamelToJbiTest.java
(original)
+++ servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/test/java/org/apache/servicemix/camel/SendFromCamelToJbiTest.java
Thu Oct 23 03:22:45 2008
@@ -30,10 +30,11 @@
  * @version $Revision: 563665 $
  */
 public class SendFromCamelToJbiTest extends JbiTestSupport {
+
     private ReceiverComponent receiverComponent = new ReceiverComponent();
 
     public void testCamelInvokingJbi() throws Exception {
-        sendExchange("<foo bar='123'/>");
+        sendExchangeAsync("<foo bar='123'/>");
         MessageList list = receiverComponent.getMessageList();
 
         list.assertMessagesReceived(1);
@@ -43,6 +44,8 @@
         log.info("Received: " + message);
 
         assertEquals("cheese header", 123, message.getProperty("cheese"));
+
+        exchangeCompletedListener.assertExchangeCompleted();
     }
 
     @Override
@@ -56,14 +59,12 @@
     }
 
     @Override
-    protected void appendJbiActivationSpecs(
-            List<ActivationSpec> activationSpecList) {
+    protected void appendJbiActivationSpecs(List<ActivationSpec> activationSpecList) {
         ActivationSpec activationSpec = new ActivationSpec();
         activationSpec.setId("jbiReceiver");
         activationSpec.setService(new QName("serviceNamespace", "serviceA"));
         activationSpec.setEndpoint("endpointA");
         activationSpec.setComponent(receiverComponent);
-
         activationSpecList.add(activationSpec);
     }
 

Added: servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/test/resources/log4j-tests.properties
URL: http://svn.apache.org/viewvc/servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/test/resources/log4j-tests.properties?rev=707336&view=auto
==============================================================================
--- servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/test/resources/log4j-tests.properties
(added)
+++ servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/test/resources/log4j-tests.properties
Thu Oct 23 03:22:45 2008
@@ -0,0 +1,38 @@
+#
+#    Licensed to the Apache Software Foundation (ASF) under one or more
+#    contributor license agreements.  See the NOTICE file distributed with
+#    this work for additional information regarding copyright ownership.
+#    The ASF licenses this file to You under the Apache License, Version 2.0
+#    (the "License"); you may not use this file except in compliance with
+#    the License.  You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS,
+#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#    See the License for the specific language governing permissions and
+#    limitations under the License.
+#
+
+#
+# The logging properties used during tests..
+#
+log4j.rootLogger=DEBUG, out
+
+log4j.logger.org.apache.activemq=INFO
+log4j.logger.org.apache.activemq.spring=WARN
+log4j.logger.org.apache.activemq.store.journal=INFO
+log4j.logger.org.activeio.journal=INFO
+
+# CONSOLE appender not used by default
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n
+
+# File appender
+log4j.appender.out=org.apache.log4j.FileAppender
+log4j.appender.out.layout=org.apache.log4j.PatternLayout
+log4j.appender.out.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n
+log4j.appender.out.file=target/servicemix-test.log
+log4j.appender.out.append=true

Modified: servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/test/resources/log4j.properties
URL: http://svn.apache.org/viewvc/servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/test/resources/log4j.properties?rev=707336&r1=707335&r2=707336&view=diff
==============================================================================
--- servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/test/resources/log4j.properties
(original)
+++ servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/test/resources/log4j.properties
Thu Oct 23 03:22:45 2008
@@ -1,36 +1,31 @@
 #
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed  under the  License is distributed on an "AS IS" BASIS,
-# WITHOUT  WARRANTIES OR CONDITIONS  OF ANY KIND, either  express  or
-# implied.
-#
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
+#    Licensed to the Apache Software Foundation (ASF) under one or more
+#    contributor license agreements.  See the NOTICE file distributed with
+#    this work for additional information regarding copyright ownership.
+#    The ASF licenses this file to You under the Apache License, Version 2.0
+#    (the "License"); you may not use this file except in compliance with
+#    the License.  You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS,
+#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#    See the License for the specific language governing permissions and
+#    limitations under the License.
 #
 
 #
 # The logging properties used during tests..
 #
-log4j.rootLogger=INFO, out
+log4j.rootLogger=DEBUG, stdout
 
-log4j.logger.org.springframework=INFO
 log4j.logger.org.apache.activemq=INFO
 log4j.logger.org.apache.activemq.spring=WARN
+log4j.logger.org.apache.activemq.store.journal=INFO
+log4j.logger.org.activeio.journal=INFO
 
-#log4j.logger.org.apache.camel=DEBUG
-#log4j.logger.org.apache.servicemix=DEBUG
-
-# CONSOLE appender
+# CONSOLE appender (used by the root logger)
 log4j.appender.stdout=org.apache.log4j.ConsoleAppender
 log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
 log4j.appender.stdout.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n
@@ -39,5 +34,5 @@
 log4j.appender.out=org.apache.log4j.FileAppender
 log4j.appender.out.layout=org.apache.log4j.PatternLayout
 log4j.appender.out.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n
-log4j.appender.out.file=target/camel-test.log
+log4j.appender.out.file=target/servicemix-test.log
 log4j.appender.out.append=true



Mime
View raw message