servicemix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gno...@apache.org
Subject svn commit: r722061 - in /servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-jms: ./ src/main/java/org/apache/servicemix/jms/endpoints/ src/test/java/org/apache/servicemix/jms/
Date Mon, 01 Dec 2008 12:51:09 GMT
Author: gnodet
Date: Mon Dec  1 04:51:00 2008
New Revision: 722061

URL: http://svn.apache.org/viewvc?rev=722061&view=rev
Log:
SM-1680, SM-1681, SM-1682: handle attachments, faults and errors on the new JMS endpoints

Added:
    servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-jms/src/test/java/org/apache/servicemix/jms/JmsProviderConsumerEndpointTest.java
Modified:
    servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-jms/pom.xml
    servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoints/AbstractConsumerEndpoint.java
    servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoints/AbstractJmsMarshaler.java
    servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoints/DefaultConsumerMarshaler.java
    servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoints/DefaultProviderMarshaler.java
    servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoints/JmsProviderEndpoint.java

Modified: servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-jms/pom.xml
URL: http://svn.apache.org/viewvc/servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-jms/pom.xml?rev=722061&r1=722060&r2=722061&view=diff
==============================================================================
--- servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-jms/pom.xml (original)
+++ servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-jms/pom.xml Mon Dec  1 04:51:00 2008
@@ -84,7 +84,20 @@
     </dependency>
     <dependency>
       <groupId>org.apache.geronimo.specs</groupId>
-      <artifactId>geronimo-javamail_1.3.1_spec</artifactId>
+      <artifactId>geronimo-javamail_1.4_spec</artifactId>
+      <version>1.5</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.geronimo.javamail</groupId>
+      <artifactId>geronimo-javamail_1.4_mail</artifactId>
+      <version>1.5</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.geronimo.javamail</groupId>
+      <artifactId>geronimo-javamail_1.4_provider</artifactId>
+      <version>1.5</version>
       <scope>provided</scope>
     </dependency>
     <dependency>

Modified: servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoints/AbstractConsumerEndpoint.java
URL: http://svn.apache.org/viewvc/servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoints/AbstractConsumerEndpoint.java?rev=722061&r1=722060&r2=722061&view=diff
==============================================================================
--- servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoints/AbstractConsumerEndpoint.java (original)
+++ servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoints/AbstractConsumerEndpoint.java Mon Dec  1 04:51:00 2008
@@ -354,8 +354,8 @@
     }
 
     protected void processExchange(final MessageExchange exchange, final Session session, final JmsContext context) throws Exception {
-        // Ignore DONE exchanges
-        if (exchange.getStatus() == ExchangeStatus.DONE) {
+        // Ignore InOnly exchanges which are currently handled in fire-and-forget mode
+        if (exchange instanceof InOnly) {
             return;
         }
         // Create session if needed
@@ -403,6 +403,10 @@
             dest = getReplyDestination(exchange, error, session, context);
             setCorrelationId(context.getMessage(), msg);
             send(msg, session, dest);
+        } else if (exchange.getStatus() == ExchangeStatus.DONE) {
+            msg = session.createMessage();
+            msg.setBooleanProperty(AbstractJmsMarshaler.DONE_JMS_PROPERTY, true);
+            send(msg, session, dest);
         } else {
             throw new IllegalStateException("Unrecognized exchange status");
         }
@@ -465,9 +469,7 @@
                 } catch (Exception e) {
                     handleException(exchange, e, session, context);
                 }
-                if (exchange.getStatus() != ExchangeStatus.DONE) {
-                    processExchange(exchange, session, context);
-                }
+                processExchange(exchange, session, context);
             } else {
                 if (stateless) {
                     exchange.setProperty(PROP_JMS_CONTEXT, context);

Modified: servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoints/AbstractJmsMarshaler.java
URL: http://svn.apache.org/viewvc/servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoints/AbstractJmsMarshaler.java?rev=722061&r1=722060&r2=722061&view=diff
==============================================================================
--- servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoints/AbstractJmsMarshaler.java (original)
+++ servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoints/AbstractJmsMarshaler.java Mon Dec  1 04:51:00 2008
@@ -32,6 +32,12 @@
  */
 public abstract class AbstractJmsMarshaler {
 
+    public static final String DONE_JMS_PROPERTY = "JBIDone";
+
+    public static final String FAULT_JMS_PROPERTY = "JBIFault";
+
+    public static final String ERROR_JMS_PROPERTY = "JBIError";
+
     /**
      * Should marshaler copy properties set in messages?
      */
@@ -108,7 +114,7 @@
      * @param value the property value
      * @return true if it should be copied
      */
-    private boolean shouldIncludeHeader(String name, Object value) {
+    protected boolean shouldIncludeHeader(String name, Object value) {
         boolean allowed = value instanceof Boolean || value instanceof Byte || value instanceof Short;
         allowed |= value instanceof Integer || value instanceof Long || value instanceof Float;
         allowed |= value instanceof Double || value instanceof String;

Modified: servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoints/DefaultConsumerMarshaler.java
URL: http://svn.apache.org/viewvc/servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoints/DefaultConsumerMarshaler.java?rev=722061&r1=722060&r2=722061&view=diff
==============================================================================
--- servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoints/DefaultConsumerMarshaler.java (original)
+++ servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoints/DefaultConsumerMarshaler.java Mon Dec  1 04:51:00 2008
@@ -16,24 +16,39 @@
  */
 package org.apache.servicemix.jms.endpoints;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
+import java.io.OutputStream;
 import java.io.Serializable;
 import java.net.URI;
+import java.util.Map;
+import java.util.Set;
 
+import javax.activation.DataHandler;
 import javax.jbi.component.ComponentContext;
 import javax.jbi.messaging.Fault;
 import javax.jbi.messaging.MessageExchange;
 import javax.jbi.messaging.NormalizedMessage;
 import javax.jms.Message;
+import javax.jms.ObjectMessage;
 import javax.jms.Session;
 import javax.jms.TextMessage;
+import javax.xml.stream.XMLStreamReader;
 import javax.xml.transform.Source;
 
-import org.apache.servicemix.jbi.jaxp.SourceTransformer;
-import org.apache.servicemix.jbi.jaxp.StringSource;
 import org.apache.servicemix.jbi.messaging.MessageExchangeSupport;
+import org.apache.servicemix.soap.core.MessageImpl;
+import org.apache.servicemix.soap.core.PhaseInterceptorChain;
+import org.apache.servicemix.soap.interceptors.mime.AttachmentsInInterceptor;
+import org.apache.servicemix.soap.interceptors.mime.AttachmentsOutInterceptor;
+import org.apache.servicemix.soap.interceptors.xml.BodyOutInterceptor;
+import org.apache.servicemix.soap.interceptors.xml.StaxInInterceptor;
+import org.apache.servicemix.soap.interceptors.xml.StaxOutInterceptor;
+import org.apache.servicemix.soap.util.stax.StaxSource;
 
 public class DefaultConsumerMarshaler extends AbstractJmsMarshaler implements JmsConsumerMarshaler {
 
@@ -81,30 +96,75 @@
 
     public Message createOut(MessageExchange exchange, NormalizedMessage outMsg, Session session,
                              JmsContext context) throws Exception {
-        String text = new SourceTransformer().contentToString(outMsg);
-        TextMessage textMessage = session.createTextMessage(text);
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        PhaseInterceptorChain chain = new PhaseInterceptorChain();
+        chain.add(new AttachmentsOutInterceptor());
+        chain.add(new StaxOutInterceptor());
+        chain.add(new BodyOutInterceptor());
+        org.apache.servicemix.soap.api.Message msg = new MessageImpl();
+        msg.setContent(Source.class, outMsg.getContent());
+        msg.setContent(OutputStream.class, baos);
+        for (String attId : (Set<String>) outMsg.getAttachmentNames()) {
+            msg.getAttachments().put(attId, outMsg.getAttachment(attId));
+        }
+        chain.doIntercept(msg);
+        TextMessage text = session.createTextMessage(baos.toString());
+        text.setStringProperty(org.apache.servicemix.soap.api.Message.CONTENT_TYPE,
+                               (String) msg.get(org.apache.servicemix.soap.api.Message.CONTENT_TYPE));
         if (isCopyProperties()) {
-            copyPropertiesFromNM(outMsg, textMessage);
+            copyPropertiesFromNM(outMsg, text);
         }
-        return textMessage;
+        return text;
     }
 
     public Message createFault(MessageExchange exchange, Fault fault, Session session, JmsContext context)
         throws Exception {
-        String text = new SourceTransformer().contentToString(fault);
-        return session.createTextMessage(text);
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        PhaseInterceptorChain chain = new PhaseInterceptorChain();
+        chain.add(new AttachmentsOutInterceptor());
+        chain.add(new StaxOutInterceptor());
+        chain.add(new BodyOutInterceptor());
+        org.apache.servicemix.soap.api.Message msg = new MessageImpl();
+        msg.setContent(Source.class, fault.getContent());
+        msg.setContent(OutputStream.class, baos);
+        for (String attId : (Set<String>) fault.getAttachmentNames()) {
+            msg.getAttachments().put(attId, fault.getAttachment(attId));
+        }
+        chain.doIntercept(msg);
+        TextMessage text = session.createTextMessage(baos.toString());
+        text.setStringProperty(org.apache.servicemix.soap.api.Message.CONTENT_TYPE,
+                               (String) msg.get(org.apache.servicemix.soap.api.Message.CONTENT_TYPE));
+        text.setBooleanProperty(FAULT_JMS_PROPERTY, true);
+        if (isCopyProperties()) {
+            copyPropertiesFromNM(fault, text);
+        }
+        return text;
     }
 
-    public Message createError(MessageExchange exchange, Exception error, Session session, JmsContext context)
-        throws Exception {
-        throw error;
+    public Message createError(MessageExchange exchange, Exception error, 
+                               Session session, JmsContext context) throws Exception {
+        ObjectMessage message = session.createObjectMessage(error);
+        message.setBooleanProperty(ERROR_JMS_PROPERTY, true);
+        return message;
     }
 
     protected void populateMessage(Message message, NormalizedMessage normalizedMessage) throws Exception {
         if (message instanceof TextMessage) {
-            TextMessage textMessage = (TextMessage)message;
-            Source source = new StringSource(textMessage.getText());
-            normalizedMessage.setContent(source);
+            PhaseInterceptorChain chain = new PhaseInterceptorChain();
+            chain.add(new AttachmentsInInterceptor());
+            chain.add(new StaxInInterceptor());
+            org.apache.servicemix.soap.api.Message msg = new MessageImpl();
+            msg.setContent(InputStream.class, new ByteArrayInputStream(((TextMessage) message).getText().getBytes()));
+            String contentType = message.getStringProperty(org.apache.servicemix.soap.api.Message.CONTENT_TYPE);
+            if (contentType != null) {
+                msg.put(org.apache.servicemix.soap.api.Message.CONTENT_TYPE, contentType);
+            }
+            chain.doIntercept(msg);
+            XMLStreamReader xmlReader = msg.getContent(XMLStreamReader.class);
+            normalizedMessage.setContent(new StaxSource(xmlReader));
+            for (Map.Entry<String, DataHandler> attachment : msg.getAttachments().entrySet()) {
+                normalizedMessage.addAttachment(attachment.getKey(), attachment.getValue());
+            }
         } else {
             throw new UnsupportedOperationException("JMS message is not a TextMessage");
         }

Modified: servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoints/DefaultProviderMarshaler.java
URL: http://svn.apache.org/viewvc/servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoints/DefaultProviderMarshaler.java?rev=722061&r1=722060&r2=722061&view=diff
==============================================================================
--- servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoints/DefaultProviderMarshaler.java (original)
+++ servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoints/DefaultProviderMarshaler.java Mon Dec  1 04:51:00 2008
@@ -16,22 +16,34 @@
  */
 package org.apache.servicemix.jms.endpoints;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.InputStream;
+import java.io.OutputStream;
 import java.util.Map;
+import java.util.Set;
 
+import javax.activation.DataHandler;
 import javax.jbi.messaging.MessageExchange;
 import javax.jbi.messaging.NormalizedMessage;
 import javax.jms.Message;
 import javax.jms.Session;
 import javax.jms.TextMessage;
+import javax.xml.stream.XMLStreamReader;
 import javax.xml.transform.Source;
 
-import org.apache.servicemix.jbi.jaxp.SourceTransformer;
-import org.apache.servicemix.jbi.jaxp.StringSource;
+import org.apache.servicemix.soap.core.MessageImpl;
+import org.apache.servicemix.soap.core.PhaseInterceptorChain;
+import org.apache.servicemix.soap.interceptors.mime.AttachmentsInInterceptor;
+import org.apache.servicemix.soap.interceptors.mime.AttachmentsOutInterceptor;
+import org.apache.servicemix.soap.interceptors.xml.BodyOutInterceptor;
+import org.apache.servicemix.soap.interceptors.xml.StaxInInterceptor;
+import org.apache.servicemix.soap.interceptors.xml.StaxOutInterceptor;
+import org.apache.servicemix.soap.util.stax.StaxSource;
 
 public class DefaultProviderMarshaler extends AbstractJmsMarshaler implements JmsProviderMarshaler {
 
     private Map<String, Object> jmsProperties;
-    private SourceTransformer transformer = new SourceTransformer();
 
     /**
      * @return the jmsProperties
@@ -49,8 +61,21 @@
 
     public Message createMessage(MessageExchange exchange, NormalizedMessage in, Session session)
         throws Exception {
-        TextMessage text = session.createTextMessage();
-        text.setText(transformer.contentToString(in));
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        PhaseInterceptorChain chain = new PhaseInterceptorChain();
+        chain.add(new AttachmentsOutInterceptor());
+        chain.add(new StaxOutInterceptor());
+        chain.add(new BodyOutInterceptor());
+        org.apache.servicemix.soap.api.Message msg = new MessageImpl();
+        msg.setContent(Source.class, in.getContent());
+        msg.setContent(OutputStream.class, baos);
+        for (String attId : (Set<String>) in.getAttachmentNames()) {
+            msg.getAttachments().put(attId, in.getAttachment(attId));
+        }
+        chain.doIntercept(msg);
+        TextMessage text = session.createTextMessage(baos.toString());
+        text.setStringProperty(org.apache.servicemix.soap.api.Message.CONTENT_TYPE,
+                               (String) msg.get(org.apache.servicemix.soap.api.Message.CONTENT_TYPE));
         if (jmsProperties != null) {
             for (Map.Entry<String, Object> e : jmsProperties.entrySet()) {
                 text.setObjectProperty(e.getKey(), e.getValue());
@@ -67,12 +92,24 @@
     public void populateMessage(Message message, MessageExchange exchange, NormalizedMessage normalizedMessage)
         throws Exception {
         if (message instanceof TextMessage) {
-            TextMessage textMessage = (TextMessage)message;
-            Source source = new StringSource(textMessage.getText());
-            normalizedMessage.setContent(source);
+            PhaseInterceptorChain chain = new PhaseInterceptorChain();
+            chain.add(new AttachmentsInInterceptor());
+            chain.add(new StaxInInterceptor());
+            org.apache.servicemix.soap.api.Message msg = new MessageImpl();
+            msg.setContent(InputStream.class, new ByteArrayInputStream(((TextMessage) message).getText().getBytes()));
+            String contentType = message.getStringProperty(org.apache.servicemix.soap.api.Message.CONTENT_TYPE);
+            if (contentType != null) {
+                msg.put(org.apache.servicemix.soap.api.Message.CONTENT_TYPE, contentType);
+            }
+            chain.doIntercept(msg);
+            XMLStreamReader xmlReader = msg.getContent(XMLStreamReader.class);
+            normalizedMessage.setContent(new StaxSource(xmlReader));
+            for (Map.Entry<String, DataHandler> attachment : msg.getAttachments().entrySet()) {
+                normalizedMessage.addAttachment(attachment.getKey(), attachment.getValue());
+            }
 
             if (isCopyProperties()) {
-                copyPropertiesFromJMS(textMessage, normalizedMessage);
+                copyPropertiesFromJMS(message, normalizedMessage);
             }
         } else {
             throw new UnsupportedOperationException("JMS message is not a TextMessage");

Modified: servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoints/JmsProviderEndpoint.java
URL: http://svn.apache.org/viewvc/servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoints/JmsProviderEndpoint.java?rev=722061&r1=722060&r2=722061&view=diff
==============================================================================
--- servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoints/JmsProviderEndpoint.java (original)
+++ servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoints/JmsProviderEndpoint.java Mon Dec  1 04:51:00 2008
@@ -17,15 +17,22 @@
 package org.apache.servicemix.jms.endpoints;
 
 import javax.jbi.management.DeploymentException;
+import javax.jbi.messaging.ExchangeStatus;
+import javax.jbi.messaging.Fault;
+import javax.jbi.messaging.InOnly;
+import javax.jbi.messaging.InOut;
 import javax.jbi.messaging.MessageExchange;
 import javax.jbi.messaging.NormalizedMessage;
 import javax.jms.ConnectionFactory;
 import javax.jms.Destination;
 import javax.jms.JMSException;
 import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.ObjectMessage;
 import javax.jms.Queue;
 import javax.jms.Session;
 
+import org.apache.servicemix.JbiConstants;
 import org.apache.servicemix.common.endpoints.ProviderEndpoint;
 import org.apache.servicemix.jms.JmsEndpointType;
 import org.apache.servicemix.store.Store;
@@ -36,22 +43,32 @@
 import org.springframework.jms.core.JmsTemplate102;
 import org.springframework.jms.core.MessageCreator;
 import org.springframework.jms.core.SessionCallback;
+import org.springframework.jms.listener.AbstractMessageListenerContainer;
+import org.springframework.jms.listener.DefaultMessageListenerContainer;
+import org.springframework.jms.listener.DefaultMessageListenerContainer102;
 import org.springframework.jms.support.destination.DestinationResolver;
 import org.springframework.jms.support.destination.DynamicDestinationResolver;
 
 /**
- * 
+ * A Spring-based JMS provider endpoint
+ *
  * @author gnodet
  * @org.apache.xbean.XBean element="provider"
  * @since 3.2
  */
 public class JmsProviderEndpoint extends ProviderEndpoint implements JmsEndpointType {
 
+    /**
+     * Timeout value indicating a blocking receive without timeout.
+     */
+    public static final long RECEIVE_TIMEOUT_INDEFINITE_WAIT = 0;
+
     private static final String MSG_SELECTOR_START = "JMSCorrelationID='";    
     private static final String MSG_SELECTOR_END = "'";
 
     private JmsProviderMarshaler marshaler = new DefaultProviderMarshaler();
     private DestinationChooser destinationChooser = new SimpleDestinationChooser();
+    private DestinationChooser replyDestinationChooser = new SimpleDestinationChooser();
     private JmsTemplate template;
 
     private boolean jms102;
@@ -63,7 +80,7 @@
     private boolean messageIdEnabled = true;
     private boolean messageTimestampEnabled = true;
     private boolean pubSubNoLocal;
-    private long receiveTimeout = JmsTemplate.DEFAULT_RECEIVE_TIMEOUT;
+    private long receiveTimeout = RECEIVE_TIMEOUT_INDEFINITE_WAIT;
     private boolean explicitQosEnabled;
     private int deliveryMode = Message.DEFAULT_DELIVERY_MODE;
     private int priority = Message.DEFAULT_PRIORITY;
@@ -71,11 +88,12 @@
 
     private Destination replyDestination;
     private String replyDestinationName;
-    
-    private boolean stateless;
+
     private StoreFactory storeFactory;
     private Store store;
-    
+
+    private AbstractMessageListenerContainer listenerContainer;
+
     /**
      * @return the destination
      */
@@ -84,7 +102,9 @@
     }
 
     /**
-     * @param destination the destination to set
+    * Specifies the JMS <code>Destination</code> used to send messages.
+    *
+     * @param destination the destination
      */
     public void setDestination(Destination destination) {
         this.destination = destination;
@@ -98,21 +118,30 @@
     }
 
     /**
-     * @param destinationName the destinationName to set
+    * Specifies a string identifying the JMS destination used to send 
+     * messages. The destination is resolved using the 
+     * <code>DesitinationResolver</code>.
+     *
+     * @param destinationName the destination name
      */
     public void setDestinationName(String destinationName) {
         this.destinationName = destinationName;
     }
 
     /**
-     * @return the jms102
-     */
+    * Determines if the provider used JMS 1.0.2 compliant APIs.
+    *
+    * @return <code>true</code> if the provider is JMS 1.0.2 compliant
+    */
     public boolean isJms102() {
         return jms102;
     }
 
     /**
-     * @param jms102 the jms102 to set
+    * Specifies if the provider uses JMS 1.0.2 compliant APIs. Defaults to 
+    * <code>false</code>.
+    * 
+     * @param jms102 provider is JMS 1.0.2 compliant?
      */
     public void setJms102(boolean jms102) {
         this.jms102 = jms102;
@@ -126,6 +155,8 @@
     }
 
     /**
+    * Specifies the <code>ConnectionFactory</code> used by the endpoint.
+    *
      * @param connectionFactory the connectionFactory to set
      */
     public void setConnectionFactory(ConnectionFactory connectionFactory) {
@@ -140,7 +171,10 @@
     }
 
     /**
-     * @param deliveryMode the deliveryMode to set
+    * Specifies the JMS delivery mode used for the reply. Defaults to 
+    * (2)(<code>PERSISTENT</code>).
+    *
+     * @param deliveryMode the JMS delivery mode
      */
     public void setDeliveryMode(int deliveryMode) {
         this.deliveryMode = deliveryMode;
@@ -154,7 +188,10 @@
     }
 
     /**
-     * @param destinationChooser the destinationChooser to set
+    * Specifies a class implementing logic for choosing the destination used 
+    * to send messages.
+    *
+     * @param destinationChooser the destination chooser for sending messages
      */
     public void setDestinationChooser(DestinationChooser destinationChooser) {
         if (destinationChooser == null) {
@@ -164,6 +201,22 @@
     }
 
     /**
+     * @return the destination chooser for the reply destination
+     */
+    public DestinationChooser getReplyDestinationChooser() {
+        return replyDestinationChooser;
+    }
+
+    /**
+    * Specifies a class implementing logic for choosing the destination used 
+    * to recieve replies.
+    *
+     * @param replyDestinationChooser the destination chooser used for the reply destination
+     */
+    public void setReplyDestinationChooser(DestinationChooser replyDestinationChooser) {
+        this.replyDestinationChooser = replyDestinationChooser;
+    }
+    /**
      * @return the destinationResolver
      */
     public DestinationResolver getDestinationResolver() {
@@ -171,7 +224,10 @@
     }
 
     /**
-     * @param destinationResolver the destinationResolver to set
+    * Specifies the class implementing logic for converting strings into 
+    * destinations. The default is <code>DynamicDestinationResolver</code>.
+    *
+     * @param destinationResolver the destination resolver implementation
      */
     public void setDestinationResolver(DestinationResolver destinationResolver) {
         this.destinationResolver = destinationResolver;
@@ -185,7 +241,10 @@
     }
 
     /**
-     * @param explicitQosEnabled the explicitQosEnabled to set
+    * Specifies if the QoS values specified for the endpoint are explicitly 
+    * used when a messages is sent. The default is <code>false</code>.
+    *
+     * @param explicitQosEnabled should the QoS values be sent?
      */
     public void setExplicitQosEnabled(boolean explicitQosEnabled) {
         this.explicitQosEnabled = explicitQosEnabled;
@@ -199,7 +258,11 @@
     }
 
     /**
-     * @param marshaler the marshaler to set
+    * Specifies the class implementing the message marshaler. The message 
+    * marshaller is responsible for marshalling and unmarshalling JMS messages. 
+    * The default is <code>DefaultProviderMarshaler</code>.
+    *
+     * @param marshaler the marshaler implementation
      */
     public void setMarshaler(JmsProviderMarshaler marshaler) {
         if (marshaler == null) {
@@ -216,7 +279,15 @@
     }
 
     /**
-     * @param messageIdEnabled the messageIdEnabled to set
+    * Specifies if your endpoint requires JMS message IDs. Setting the 
+    * <code>messageIdEnabled</code> property to <code>false</code> causes the 
+    * endpoint to call its message producer's 
+    * <code>setDisableMessageID() </code> with a value of <code>true</code>. 
+    * The JMS broker is then given a hint that it does not need to generate 
+    * message IDs or add them to the messages from the endpoint. The JMS 
+    * broker can choose to accept the hint or ignore it.
+    * 
+     * @param messageIdEnabled the endpoint requires message IDs?
      */
     public void setMessageIdEnabled(boolean messageIdEnabled) {
         this.messageIdEnabled = messageIdEnabled;
@@ -230,7 +301,15 @@
     }
 
     /**
-     * @param messageTimestampEnabled the messageTimestampEnabled to set
+    * Specifies if your endpoints requires time stamps on its messages. 
+    * Setting the <code>messageTimeStampEnabled</code> property to 
+    * <code>false</code> causes the endpoint to call its message producer's 
+    * <code>setDisableMessageTimestamp() </code> method with a value of 
+    * <code>true</code>. The JMS broker is then given a hint that it does not 
+    * need to generate message IDs or add them to the messages from the 
+    * endpoint. The JMS broker can choose to accept the hint or ignore it.
+    * 
+     * @param messageTimestampEnabled the endpoint requires time stamps?
      */
     public void setMessageTimestampEnabled(boolean messageTimestampEnabled) {
         this.messageTimestampEnabled = messageTimestampEnabled;
@@ -244,7 +323,9 @@
     }
 
     /**
-     * @param priority the priority to set
+    * Specifies the priority assigned to the JMS messages. Defaults to 4.
+    *
+     * @param priority the message priority
      */
     public void setPriority(int priority) {
         this.priority = priority;
@@ -258,7 +339,11 @@
     }
 
     /**
-     * @param pubSubDomain the pubSubDomain to set
+    * Specifies if the destination is a topic. <code>true</code> means the 
+    * destination is a topic. <code>false</code> means the destination is a 
+    * queue.
+    *
+     * @param pubSubDomain the destination is a topic?
      */
     public void setPubSubDomain(boolean pubSubDomain) {
         this.pubSubDomain = pubSubDomain;
@@ -272,7 +357,10 @@
     }
 
     /**
-     * @param pubSubNoLocal the pubSubNoLocal to set
+    * Specifies if messages published by the listener's <code>Connection</code> 
+    * are suppressed. The default is <code>false</code>.
+    *
+     * @param pubSubNoLocal messages are surpressed?
      */
     public void setPubSubNoLocal(boolean pubSubNoLocal) {
         this.pubSubNoLocal = pubSubNoLocal;
@@ -286,7 +374,9 @@
     }
 
     /**
-     * @param receiveTimeout the receiveTimeout to set
+    * Specifies the timeout for receiving a message in milliseconds.
+    *
+     * @param receiveTimeout milliseconds to wait
      */
     public void setReceiveTimeout(long receiveTimeout) {
         this.receiveTimeout = receiveTimeout;
@@ -300,24 +390,24 @@
     }
 
     /**
-     * @param timeToLive the timeToLive to set
+    * Specifies the number of milliseconds a message is valid.
+    *
+     * @param timeToLive number of milliseonds a message lives
      */
     public void setTimeToLive(long timeToLive) {
         this.timeToLive = timeToLive;
     }
 
-    public boolean isStateless() {
-        return stateless;
-    }
-
-    public void setStateless(boolean stateless) {
-        this.stateless = stateless;
-    }
-
     public StoreFactory getStoreFactory() {
         return storeFactory;
     }
 
+    /**
+     * Sets the store factory used to create the store.
+     * If none is set, a {@link MemoryStoreFactory} will be created and used instead.
+     *
+     * @param storeFactory the factory
+     */
     public void setStoreFactory(StoreFactory storeFactory) {
         this.storeFactory = storeFactory;
     }
@@ -326,6 +416,12 @@
         return store;
     }
 
+    /**
+     * Sets the store used to store JBI exchanges that are waiting for a response
+     * JMS message.  The store will be automatically created if not set.
+     *
+     * @param store the store
+     */
     public void setStore(Store store) {
         this.store = store;
     }
@@ -334,6 +430,14 @@
         return replyDestination;
     }
 
+    /**
+     * Sets the reply destination.
+     * This JMS destination will be used as the default destination for the response
+     * messages when using an InOut JBI exchange.  It will be used if the
+     * <code>replyDestinationChooser</code> does not return any value.
+     *
+     * @param replyDestination
+     */
     public void setReplyDestination(Destination replyDestination) {
         this.replyDestination = replyDestination;
     }
@@ -342,42 +446,126 @@
         return replyDestinationName;
     }
 
+    /**
+     * Sets the name of the reply destination.
+     * This property will be used to create the <code>replyDestination</code>
+     * using the <code>destinationResolver</code> when the endpoint starts if
+     * the <code>replyDestination</code> has not been set.
+     *
+     * @param replyDestinationName
+     */
     public void setReplyDestinationName(String replyDestinationName) {
         this.replyDestinationName = replyDestinationName;
     }
 
-    protected void processInOnly(final MessageExchange exchange, final NormalizedMessage in) throws Exception {
-        MessageCreator creator = new MessageCreator() {
-            public Message createMessage(Session session) throws JMSException {
-                try {
-                    Message message = marshaler.createMessage(exchange, in, session);
-                    if (logger.isTraceEnabled()) {
-                        logger.trace("Sending message to: " + template.getDefaultDestinationName() + " message: " + message);
+    /**
+     * Process the incoming JBI exchange
+     * @param exchange
+     * @throws Exception
+     */
+    public void process(MessageExchange exchange) throws Exception {
+        // The component acts as a provider, this means that another component has requested our service
+        // As this exchange is active, this is either an in or a fault (out are sent by this component)
+        if (exchange.getRole() == MessageExchange.Role.PROVIDER) {
+            // Exchange is finished
+            if (exchange.getStatus() == ExchangeStatus.DONE) {
+                return;
+            // Exchange has been aborted with an exception
+            } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
+                return;
+            // Exchange is active
+            } else {
+                // Fault message
+                if (exchange.getFault() != null) {
+                    done(exchange);
+                // In message
+                } else if (exchange.getMessage("in") != null) {
+                    if (exchange instanceof InOnly) {
+                        processInOnly(exchange, exchange.getMessage("in"));
+                        done(exchange);
+                    } else {
+                        processInOut(exchange, exchange.getMessage("in"));
                     }
-                    return message;
-                } catch (Exception e) {
-                    JMSException jmsEx =  new JMSException("Failed to create JMS Message: " + e);
-                    jmsEx.setLinkedException(e);
-                    jmsEx.initCause(e);
-                    throw jmsEx;
+                // This is not compliant with the default MEPs
+                } else {
+                    throw new IllegalStateException("Provider exchange is ACTIVE, but no in or fault is provided");
                 }
             }
-        };
-        Object dest = destinationChooser.chooseDestination(exchange, in);
-        if (dest instanceof Destination) {
-            template.send((Destination) dest, creator);
-        } else if (dest instanceof String) {
-            template.send((String) dest, creator);
+        // Unsupported role: this should never happen has we never create exchanges
         } else {
-            template.send(creator);
+            throw new IllegalStateException("Unsupported role: " + exchange.getRole());
         }
     }
 
-    protected void processInOut(final MessageExchange exchange, final NormalizedMessage in, final NormalizedMessage out) throws Exception {
+    /**
+     * Process an InOnly or RobustInOnly exchange.
+     * This method uses the JMS template to create a session and call the
+     * {@link #processInOnlyInSession(javax.jbi.messaging.MessageExchange, javax.jbi.messaging.NormalizedMessage, javax.jms.Session)}
+     * method.
+     *
+     * @param exchange
+     * @param in
+     * @throws Exception
+     */
+    protected void processInOnly(final MessageExchange exchange,
+                                 final NormalizedMessage in) throws Exception {
+        SessionCallback callback = new SessionCallback() {
+            public Object doInJms(Session session) throws JMSException {
+                try {
+                    processInOnlyInSession(exchange, in, session);
+                    return null;
+                } catch (JMSException e) {
+                    throw e;
+                } catch (RuntimeException e) {
+                    throw e;
+                } catch (Exception e) {
+                    throw new UncategorizedJmsException(e);
+                }
+            }
+        };
+        template.execute(callback, true);
+    }
+
+    /**
+     * Process an InOnly or RobustInOnly exchange inside a JMS session.
+     * This method delegates the JMS message creation to the marshaler and uses
+     * the JMS template to send it.
+     *
+     * @param exchange
+     * @param in
+     * @param session
+     * @throws Exception
+     */
+    protected void processInOnlyInSession(final MessageExchange exchange,
+                                          final NormalizedMessage in,
+                                          final Session session) throws Exception {
+        // Create destination
+        final Destination dest = getDestination(exchange, in, session);
+        // Create message and send it
+        final Message message = marshaler.createMessage(exchange, in, session);
+        template.send(dest, new MessageCreator() {
+            public Message createMessage(Session session) throws JMSException {
+                return message;
+            }
+        });
+    }
+
+    /**
+     * Process an InOut or InOptionalOut exchange.
+     * This method uses the JMS template to create a session and call the
+     * {@link #processInOutInSession(javax.jbi.messaging.MessageExchange, javax.jbi.messaging.NormalizedMessage, javax.jms.Session)}
+     * method.
+     *
+     * @param exchange
+     * @param in
+     * @throws Exception
+     */
+    protected void processInOut(final MessageExchange exchange,
+                                final NormalizedMessage in) throws Exception {
         SessionCallback callback = new SessionCallback() {
             public Object doInJms(Session session) throws JMSException {
                 try {
-                    processInOutInSession(exchange, in, out, session);
+                    processInOutInSession(exchange, in, session);
                     return null;
                 } catch (JMSException e) {
                     throw e;
@@ -391,69 +579,201 @@
         template.execute(callback, true);
     }
     
-    protected void processInOutInSession(final MessageExchange exchange, 
-                                         final NormalizedMessage in, 
-                                         final NormalizedMessage out, 
+    /**
+     * Process an InOnly or RobustInOnly exchange inside a JMS session.
+     * This method delegates the JMS message creation to the marshaler and uses
+     * the JMS template to send it.  If the JMS destination that was used to send
+     * the message is not the default one, it synchronously wait for the message
+     * to come back using a JMS selector.  Else, it just returns and the response
+     * message will come back from the listener container.
+     *
+     * @param exchange
+     * @param in
+     * @param session
+     * @throws Exception
+     */
+    protected void processInOutInSession(final MessageExchange exchange,
+                                         final NormalizedMessage in,
                                          final Session session) throws Exception {
         // Create destinations
         final Destination dest = getDestination(exchange, in, session);
-        final Destination replyDest = getReplyDestination(exchange, out, session);
+        final Destination replyDest = getReplyDestination(exchange, in, session);
         // Create message and send it
         final Message sendJmsMsg = marshaler.createMessage(exchange, in, session);
-        sendJmsMsg.setJMSReplyTo(replyDest);  
+        sendJmsMsg.setJMSReplyTo(replyDest);
+        // handle correlation ID
+        String correlationId = sendJmsMsg.getJMSMessageID() != null ? sendJmsMsg.getJMSMessageID() : exchange.getExchangeId(); 
+        sendJmsMsg.setJMSCorrelationID(correlationId);
+
+        boolean asynchronous = replyDest.equals(replyDestination);
+
+        if (asynchronous) {
+            store.store(correlationId, exchange);
+        }
+
+        try {
+            template.send(dest, new MessageCreator() {
+                public Message createMessage(Session session)
+                    throws JMSException {
+                    return sendJmsMsg;
+                }
+            });
+        } catch (Exception e) {
+            if (asynchronous) {
+                store.load(exchange.getExchangeId());
+            }
+            throw e;
+        }
 
-        template.send(dest, new MessageCreator() {
-            public Message createMessage(Session session)
-                throws JMSException {
-                return sendJmsMsg;
+        if (!asynchronous) {
+            // Create selector
+            String jmsId = sendJmsMsg.getJMSMessageID();
+            String selector = MSG_SELECTOR_START + jmsId + MSG_SELECTOR_END;
+            // Receiving JMS Message, Creating and Returning NormalizedMessage out
+            Message receiveJmsMsg = template.receiveSelected(replyDest, selector);
+            if (receiveJmsMsg == null) {
+                throw new IllegalStateException("Unable to receive response");
+            }
+            if (receiveJmsMsg.getBooleanProperty(AbstractJmsMarshaler.DONE_JMS_PROPERTY)) {
+                exchange.setStatus(ExchangeStatus.DONE);
+            } else if (receiveJmsMsg.getBooleanProperty(AbstractJmsMarshaler.ERROR_JMS_PROPERTY)) {
+                Exception e = (Exception) ((ObjectMessage) receiveJmsMsg).getObject();
+                exchange.setError(e);
+                exchange.setStatus(ExchangeStatus.ERROR);
+            } else if (receiveJmsMsg.getBooleanProperty(AbstractJmsMarshaler.FAULT_JMS_PROPERTY)) {
+                Fault fault = exchange.getFault();
+                if (fault == null) {
+                    fault = exchange.createFault();
+                    exchange.setFault(fault);
+                }
+                marshaler.populateMessage(receiveJmsMsg, exchange, fault);
+            } else {
+                NormalizedMessage out = exchange.getMessage("out");
+                if (out == null) {
+                    out = exchange.createMessage();
+                    exchange.setMessage(out, "out");
+                }
+                marshaler.populateMessage(receiveJmsMsg, exchange, out);
+            }
+            boolean txSync = exchange.getStatus() == ExchangeStatus.ACTIVE
+                                && exchange.isTransacted()
+                                && Boolean.TRUE.equals(exchange.getProperty(JbiConstants.SEND_SYNC));
+            if (txSync) {
+                sendSync(exchange);
+            } else {
+                send(exchange);
             }
-        });
-        
-        // Create selector
-        String jmsId = sendJmsMsg.getJMSMessageID();
-        String selector = MSG_SELECTOR_START + jmsId + MSG_SELECTOR_END;
-
-        // Receiving JMS Message, Creating and Returning NormalizedMessage out
-        Message receiveJmsMsg = template.receiveSelected(replyDest, selector);
-        if (receiveJmsMsg == null) {
-            throw new IllegalStateException("Unable to receive response");
         }
-        marshaler.populateMessage(receiveJmsMsg, exchange, out);
     }
 
-    protected Destination getDestination(MessageExchange exchange, Object message, Session session) throws JMSException {
-        Object dest = null;
-        // Let the destinationChooser a chance to choose the destination 
-        if (destinationChooser != null) {
-            dest = destinationChooser.chooseDestination(exchange, message);
+    /**
+     * Process a JMS response message.
+     * This method delegates to the marshaler for the JBI out message creation
+     * and sends it in to the NMR.
+     * 
+     * @param message
+     */
+    protected void onMessage(Message message) {
+        MessageExchange exchange = null;
+        try {
+            exchange = (InOut) store.load(message.getJMSCorrelationID());
+            if (exchange == null) {
+                throw new IllegalStateException("Could not find exchange " + message.getJMSCorrelationID());
+            }
+        } catch (Exception e) {
+            logger.error("Unable to load exchange related to incoming JMS message " + message, e);
         }
-        // Default to destinationName properties
-        if (dest == null) {
-            dest = destinationName;
+        try {
+            if (message.getBooleanProperty(AbstractJmsMarshaler.DONE_JMS_PROPERTY)) {
+                exchange.setStatus(ExchangeStatus.DONE);
+            } else if (message.getBooleanProperty(AbstractJmsMarshaler.ERROR_JMS_PROPERTY)) {
+                Exception e = (Exception) ((ObjectMessage) message).getObject();
+                exchange.setError(e);
+                exchange.setStatus(ExchangeStatus.ERROR);
+            } else if (message.getBooleanProperty(AbstractJmsMarshaler.FAULT_JMS_PROPERTY)) {
+                Fault fault = exchange.getFault();
+                if (fault == null) {
+                    fault = exchange.createFault();
+                    exchange.setFault(fault);
+                }
+                marshaler.populateMessage(message, exchange, fault);
+            } else {
+                NormalizedMessage out = exchange.getMessage("out");
+                if (out == null) {
+                    out = exchange.createMessage();
+                    exchange.setMessage(out, "out");
+                }
+                marshaler.populateMessage(message, exchange, out);
+            }
+        } catch (Exception e) {
+            if (logger.isDebugEnabled()) {
+                logger.debug("Error while populating JBI exchange " + exchange, e);
+            }
+            exchange.setError(e);
         }
-        // Resolve destination if needed
-        if (dest instanceof Destination) {
-            return (Destination) dest;
-        } else if (dest instanceof String) {
-            return destinationResolver.resolveDestinationName(session, 
-                                                              (String) dest, 
-                                                              isPubSubDomain());
+        try {
+            boolean txSync = exchange.getStatus() == ExchangeStatus.ACTIVE
+                                && exchange.isTransacted()
+                                && Boolean.TRUE.equals(exchange.getProperty(JbiConstants.SEND_SYNC));
+            if (txSync) {
+                sendSync(exchange);
+            } else {
+                send(exchange);
+            }
+        } catch (Exception e) {
+            logger.error("Unable to send JBI exchange " + exchange, e);
         }
-        throw new IllegalStateException("Unable to choose destination for exchange " + exchange);
     }
 
+    /**
+     * Retrieve the destination where the JMS message should be sent to.
+     *
+     * @param exchange
+     * @param message
+     * @param session
+     * @return
+     * @throws JMSException
+     */
+    protected Destination getDestination(MessageExchange exchange, Object message, Session session) throws JMSException {
+        return chooseDestination(exchange, message, session, destinationChooser, destination);
+    }
+
+    /**
+     * Choose the JMS destination for the reply message
+     *
+     * @param exchange
+     * @param message
+     * @param session
+     * @return
+     * @throws JMSException
+     */
     protected Destination getReplyDestination(MessageExchange exchange, Object message, Session session) throws JMSException {
+        return chooseDestination(exchange, message, session, replyDestinationChooser, replyDestination);
+    }
+
+    /**
+     * Choose a JMS destination given the chooser, a default destination and name
+     * @param exchange
+     * @param message
+     * @param session
+     * @param chooser
+     * @param defaultDestination
+     * @return
+     * @throws JMSException
+     */
+    protected Destination chooseDestination(MessageExchange exchange,
+                                            Object message,
+                                            Session session,
+                                            DestinationChooser chooser,
+                                            Destination defaultDestination) throws JMSException {
         Object dest = null;
-        // Let the destinationChooser a chance to choose the destination 
-        if (destinationChooser != null) {
-            dest = destinationChooser.chooseDestination(exchange, message);
+        // Let the replyDestinationChooser a chance to choose the destination
+        if (chooser != null) {
+            dest = chooser.chooseDestination(exchange, message);
         }
-        // Default to replyDestination / replyDestinationName properties
+        // Default to defaultDestination properties
         if (dest == null) {
-            dest = replyDestination;
-        }
-        if (dest == null) {
-            dest = replyDestinationName;
+            dest = defaultDestination;
         }
         // Resolve destination if needed
         if (dest instanceof Destination) {
@@ -463,40 +783,69 @@
                                                               (String) dest, 
                                                               isPubSubDomain());
         }
-        //create temp queue/topic if no destination explicitly set
-        if (dest == null) {
-            if (destination instanceof Queue) {
-                return session.createTemporaryQueue();
-            } else {
-                return session.createTemporaryTopic();
-            }
-        }
-        throw new IllegalStateException("Unable to choose replyDestination for exchange " + exchange);
+        throw new IllegalStateException("Unable to choose a destination for exchange " + exchange);
     }
-    
-    public synchronized void start() throws Exception {
-        template = createTemplate();
-        if (store == null && !stateless) {
+
+    /**
+     * Start this endpoint.
+     *
+     * @throws Exception
+     */
+    public synchronized void activate() throws Exception {
+        super.activate();
+        if (store == null) {
             if (storeFactory == null) {
                 storeFactory = new MemoryStoreFactory();
             }
             store = storeFactory.open(getService().toString() + getEndpoint());
         }
-        super.start();
+        template = createTemplate();
+        // Obtain the default destination
+        if (destination == null && destinationName != null) {
+            destination = (Destination) template.execute(new SessionCallback() {
+                public Object doInJms(Session session) throws JMSException {
+                    return destinationResolver.resolveDestinationName(session, destinationName, isPubSubDomain());
+                }
+            });
+        }
+        // Obtain the default reply destination
+        if (replyDestination == null && replyDestinationName != null) {
+            replyDestination = (Destination) template.execute(new SessionCallback() {
+                public Object doInJms(Session session) throws JMSException {
+                    return destinationResolver.resolveDestinationName(session, replyDestinationName, isPubSubDomain());
+                }
+            });
+        }
+        // create the listener container
+        listenerContainer = createListenerContainer();
+        listenerContainer.start();
     }
-    
-    public synchronized void stop() throws Exception {
+
+    /**
+     * Stops this endpoint.
+     * 
+     * @throws Exception
+     */
+    public synchronized void deactivate() throws Exception {
+        if (listenerContainer != null) {
+            listenerContainer.stop();
+            listenerContainer.shutdown();
+        }
         if (store != null) {
             if (storeFactory != null) {
                 storeFactory.close(store);
             }
             store = null;
         }
-        super.stop();
+        super.deactivate();
     }
-    
+
+    /**
+     * Validate this endpoint.
+     * 
+     * @throws DeploymentException
+     */
     public void validate() throws DeploymentException {
-        // TODO: check service, endpoint
         super.validate();
         if (getService() == null) {
             throw new DeploymentException("service must be set");
@@ -509,6 +858,11 @@
         }
     }
 
+    /**
+     * Create the JMS template to be used to send the JMS messages.
+     *
+     * @return
+     */
     protected JmsTemplate createTemplate() {
         JmsTemplate tplt;
         if (isJms102()) {
@@ -534,6 +888,66 @@
         tplt.setPubSubNoLocal(isPubSubNoLocal());
         tplt.setTimeToLive(getTimeToLive());
         tplt.setReceiveTimeout(getReceiveTimeout());
+        tplt.afterPropertiesSet();
         return tplt;
     }
+
+    /**
+     * Create the message listener container to receive response messages.
+     * 
+     * @return
+     */
+    protected AbstractMessageListenerContainer createListenerContainer() {
+        DefaultMessageListenerContainer cont;
+        if (isJms102()) {
+            cont = new DefaultMessageListenerContainer102();
+        } else {
+            cont = new DefaultMessageListenerContainer();
+        }
+        cont.setConnectionFactory(getConnectionFactory());
+        Destination replyDest = getReplyDestination();
+        if (replyDest == null) {
+            replyDest = resolveOrCreateDestination(template, replyDestinationName, isPubSubDomain());
+            setReplyDestination(replyDest);
+        }
+        cont.setDestination(replyDest);
+        cont.setPubSubDomain(isPubSubDomain());
+        cont.setPubSubNoLocal(isPubSubNoLocal());
+        cont.setMessageListener(new MessageListener() {
+            public void onMessage(Message message) {
+                JmsProviderEndpoint.this.onMessage(message);
+            }
+        });
+        cont.setAutoStartup(false);
+        cont.afterPropertiesSet();
+        return cont;
+    }
+    
+    /**
+     * If the destinationName given is null then a temporary destination is created else the destination name
+     * is resolved using the resolver from the jmsConfig
+     *
+     * @param jmsTemplate template to use for session and resolver
+     * @param replyToDestinationName null for temporary destination or a destination name
+     * @param usePubSubDomain true=pubSub, false=Queues
+     * @return resolved destination
+     */
+    private Destination resolveOrCreateDestination(final JmsTemplate jmsTemplate,
+                                                          final String replyToDestinationName,
+                                                          final boolean usePubSubDomain) {
+        return (Destination)jmsTemplate.execute(new SessionCallback() {
+            public Object doInJms(Session session) throws JMSException {
+                if (replyToDestinationName == null) {
+                    if (destination instanceof Queue) {
+                        return session.createTemporaryQueue();
+                    } else {
+                        return session.createTemporaryTopic();
+                    }
+                }
+                DestinationResolver resolv = jmsTemplate.getDestinationResolver();
+                return resolv.resolveDestinationName(session, replyToDestinationName, usePubSubDomain);
+            }
+        });
+    }
+
 }

Added: servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-jms/src/test/java/org/apache/servicemix/jms/JmsProviderConsumerEndpointTest.java
URL: http://svn.apache.org/viewvc/servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-jms/src/test/java/org/apache/servicemix/jms/JmsProviderConsumerEndpointTest.java?rev=722061&view=auto
==============================================================================
--- servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-jms/src/test/java/org/apache/servicemix/jms/JmsProviderConsumerEndpointTest.java (added)
+++ servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-jms/src/test/java/org/apache/servicemix/jms/JmsProviderConsumerEndpointTest.java Mon Dec  1 04:51:00 2008
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.jms;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+
+import javax.activation.DataHandler;
+import javax.jbi.messaging.ExchangeStatus;
+import javax.jbi.messaging.InOut;
+import javax.jbi.messaging.NormalizedMessage;
+import javax.jms.ConnectionFactory;
+import javax.mail.util.ByteArrayDataSource;
+import javax.xml.namespace.QName;
+import javax.xml.transform.Source;
+
+import org.apache.activemq.pool.PooledConnectionFactory;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.servicemix.components.util.EchoComponent;
+import org.apache.servicemix.jbi.container.ActivationSpec;
+import org.apache.servicemix.jbi.jaxp.SourceTransformer;
+import org.apache.servicemix.jbi.jaxp.StringSource;
+import org.apache.servicemix.jms.endpoints.DefaultConsumerMarshaler;
+import org.apache.servicemix.jms.endpoints.DefaultProviderMarshaler;
+import org.apache.servicemix.jms.endpoints.JmsConsumerEndpoint;
+import org.apache.servicemix.jms.endpoints.JmsProviderEndpoint;
+
+public class JmsProviderConsumerEndpointTest extends AbstractJmsTestSupport {
+
+    private static Log logger = LogFactory.getLog(JmsProviderConsumerEndpointTest.class);
+
+    public void testProviderConsumerInOut() throws Exception {
+        ConnectionFactory connFactory = new PooledConnectionFactory(connectionFactory);
+        JmsComponent jmsComponent = new JmsComponent();
+        JmsConsumerEndpoint consumerEndpoint = createConsumerEndpoint(connFactory);
+        JmsProviderEndpoint providerEndpoint = createProviderEndpoint(connFactory);
+        jmsComponent.setEndpoints(new JmsEndpointType[] {consumerEndpoint, providerEndpoint});
+        container.activateComponent(jmsComponent, "servicemix-jms");
+
+        // Add an echo component
+        EchoComponent echo = new EchoComponent();
+        ActivationSpec asEcho = new ActivationSpec("receiver", echo);
+        asEcho.setService(new QName("http://jms.servicemix.org/Test", "Echo"));
+        container.activateComponent(asEcho);
+
+        InOut inout = null;
+        boolean result = false;
+        DataHandler dh = null;
+        
+        // Test successful return
+        inout = client.createInOutExchange();
+        inout.setService(new QName("http://jms.servicemix.org/Test", "Provider"));
+        inout.getInMessage().setContent(new StringSource("<hello>world</hello>"));
+        dh = new DataHandler(new ByteArrayDataSource("myImage", "application/octet-stream"));
+        inout.getInMessage().addAttachment("myImage", dh);
+        result = client.sendSync(inout);
+        assertTrue(result);
+        NormalizedMessage out = inout.getOutMessage();
+        assertNotNull(out);
+        Source src = out.getContent();
+        assertNotNull(src);
+        dh = out.getAttachment("myImage");
+        assertNotNull(dh);
+        
+        logger.info(new SourceTransformer().toString(src));
+
+        // Test fault return 
+        container.deactivateComponent("receiver");
+        ReturnFaultComponent fault = new ReturnFaultComponent();
+        ActivationSpec asFault = new ActivationSpec("receiver", fault);
+        asFault.setService(new QName("http://jms.servicemix.org/Test", "Echo"));
+        container.activateComponent(asFault);
+        
+        inout = client.createInOutExchange();
+        inout.setService(new QName("http://jms.servicemix.org/Test", "Provider"));
+        inout.getInMessage().setContent(new StringSource("<hello>world</hello>"));
+        result = client.sendSync(inout);
+        assertTrue(result);
+        assertNotNull(inout.getFault());
+        assertTrue(new SourceTransformer().contentToString(inout.getFault()).indexOf("<fault/>") > 0);
+        client.done(inout);
+
+        // Test error return
+        container.deactivateComponent("receiver");
+        ReturnErrorComponent error = new ReturnErrorComponent(new IllegalArgumentException());
+        ActivationSpec asError = new ActivationSpec("receiver", error);
+        asError.setService(new QName("http://jms.servicemix.org/Test", "Echo"));
+        container.activateComponent(asError);
+        
+        inout = client.createInOutExchange();
+        inout.setService(new QName("http://jms.servicemix.org/Test", "Provider"));
+        inout.getInMessage().setContent(new StringSource("<hello>world</hello>"));
+        client.sendSync(inout);
+        assertEquals(ExchangeStatus.ERROR, inout.getStatus());
+        assertTrue("An IllegalArgumentException was expected", inout.getError() instanceof IllegalArgumentException);
+
+    }
+
+    private JmsConsumerEndpoint createConsumerEndpoint(ConnectionFactory connFactory) throws URISyntaxException {
+        JmsConsumerEndpoint endpoint = new JmsConsumerEndpoint();
+        endpoint.setService(new QName("http://jms.servicemix.org/Test", "Consumer"));
+        endpoint.setEndpoint("endpoint");
+        DefaultConsumerMarshaler marshaler = new DefaultConsumerMarshaler();
+        marshaler.setMep(new URI("http://www.w3.org/2004/08/wsdl/in-out"));
+        endpoint.setMarshaler(marshaler);
+        endpoint.setListenerType("simple");
+        endpoint.setConnectionFactory(connFactory);
+        endpoint.setDestinationName("destination");
+        endpoint.setTargetService(new QName("http://jms.servicemix.org/Test", "Echo"));
+        return endpoint;
+    }
+    
+    private JmsProviderEndpoint createProviderEndpoint(ConnectionFactory connFactory) {
+        JmsProviderEndpoint endpoint = new JmsProviderEndpoint();
+        endpoint.setService(new QName("http://jms.servicemix.org/Test", "Provider"));
+        endpoint.setEndpoint("endpoint");
+        DefaultProviderMarshaler marshaler = new DefaultProviderMarshaler();
+        endpoint.setMarshaler(marshaler);
+        endpoint.setConnectionFactory(connFactory);
+        endpoint.setDestinationName("destination");
+        return endpoint;
+    }
+}



Mime
View raw message