servicemix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gno...@apache.org
Subject svn commit: r734082 - /servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoints/JmsProviderEndpoint.java
Date Tue, 13 Jan 2009 09:37:49 GMT
Author: gnodet
Date: Tue Jan 13 01:37:44 2009
New Revision: 734082

URL: http://svn.apache.org/viewvc?rev=734082&view=rev
Log:
SM-1760: smx-jms provider should not create a temporary replyTo destination unless one is needed

Modified:
    servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoints/JmsProviderEndpoint.java

Modified: servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoints/JmsProviderEndpoint.java
URL: http://svn.apache.org/viewvc/servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoints/JmsProviderEndpoint.java?rev=734082&r1=734081&r2=734082&view=diff
==============================================================================
--- servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoints/JmsProviderEndpoint.java (original)
+++ servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoints/JmsProviderEndpoint.java Tue Jan 13 01:37:44 2009
@@ -16,6 +16,9 @@
  */
 package org.apache.servicemix.jms.endpoints;
 
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+
 import javax.jbi.management.DeploymentException;
 import javax.jbi.messaging.ExchangeStatus;
 import javax.jbi.messaging.Fault;
@@ -29,7 +32,6 @@
 import javax.jms.Message;
 import javax.jms.MessageListener;
 import javax.jms.ObjectMessage;
-import javax.jms.Queue;
 import javax.jms.Session;
 
 import org.apache.servicemix.JbiConstants;
@@ -38,6 +40,7 @@
 import org.apache.servicemix.store.Store;
 import org.apache.servicemix.store.StoreFactory;
 import org.apache.servicemix.store.memory.MemoryStoreFactory;
+import org.springframework.jms.JmsException;
 import org.springframework.jms.UncategorizedJmsException;
 import org.springframework.jms.core.JmsTemplate;
 import org.springframework.jms.core.JmsTemplate102;
@@ -58,11 +61,6 @@
  */
 public class JmsProviderEndpoint extends ProviderEndpoint implements JmsEndpointType {
 
-    /**
-     * Timeout value indicating a blocking receive without timeout.
-     */
-    public static final long RECEIVE_TIMEOUT_INDEFINITE_WAIT = 0;
-
     private static final String MSG_SELECTOR_START = "JMSCorrelationID='";    
     private static final String MSG_SELECTOR_END = "'";
 
@@ -80,7 +78,7 @@
     private boolean messageIdEnabled = true;
     private boolean messageTimestampEnabled = true;
     private boolean pubSubNoLocal;
-    private long receiveTimeout = RECEIVE_TIMEOUT_INDEFINITE_WAIT;
+    private long receiveTimeout;
     private boolean explicitQosEnabled;
     private int deliveryMode = Message.DEFAULT_DELIVERY_MODE;
     private int priority = Message.DEFAULT_PRIORITY;
@@ -479,16 +477,19 @@
                 if (exchange.getFault() != null) {
                     done(exchange);
                 // In message
-                } else if (exchange.getMessage("in") != null) {
-                    if (exchange instanceof InOnly) {
-                        processInOnly(exchange, exchange.getMessage("in"));
-                        done(exchange);
+                } else {
+                    NormalizedMessage in = exchange.getMessage("in");
+                    if (in != null) {
+                        if (exchange instanceof InOnly) {
+                            processInOnly(exchange, in);
+                            done(exchange);
+                        } else {
+                            processInOut(exchange, in);
+                        }
+                    // This is not compliant with the default MEPs
                     } else {
-                        processInOut(exchange, exchange.getMessage("in"));
+                        throw new IllegalStateException("Provider exchange is ACTIVE, but no in or fault is provided");
                     }
-                // This is not compliant with the default MEPs
-                } else {
-                    throw new IllegalStateException("Provider exchange is ACTIVE, but no in or fault is provided");
                 }
             }
         // Unsupported role: this should never happen has we never create exchanges
@@ -608,16 +609,12 @@
         boolean asynchronous = replyDest.equals(replyDestination);
 
         if (asynchronous) {
+            createAndStartListener();
             store.store(correlationId, exchange);
         }
 
         try {
-            template.send(dest, new MessageCreator() {
-                public Message createMessage(Session session)
-                    throws JMSException {
-                    return sendJmsMsg;
-                }
-            });
+            send(session, dest, sendJmsMsg);
         } catch (Exception e) {
             if (asynchronous) {
                 store.load(exchange.getExchangeId());
@@ -627,10 +624,9 @@
 
         if (!asynchronous) {
             // Create selector
-            String jmsId = sendJmsMsg.getJMSMessageID();
-            String selector = MSG_SELECTOR_START + jmsId + MSG_SELECTOR_END;
+            String selector = MSG_SELECTOR_START + sendJmsMsg.getJMSCorrelationID() + MSG_SELECTOR_END;
             // Receiving JMS Message, Creating and Returning NormalizedMessage out
-            Message receiveJmsMsg = template.receiveSelected(replyDest, selector);
+            Message receiveJmsMsg = receiveSelected(session, replyDest, selector);
             if (receiveJmsMsg == null) {
                 throw new IllegalStateException("Unable to receive response");
             }
@@ -666,6 +662,48 @@
         }
     }
 
+    private void send(final Session session, final Destination dest, final Message message) throws JmsException {
+        // Do not call directly the template to avoid the cost of creating a new connection / session
+//        template.send(dest, new MessageCreator() {
+//            public Message createMessage(Session session) throws JMSException {
+//                return message;
+//            }
+//        });
+        try {
+            Method method = JmsTemplate.class.getDeclaredMethod("doSend", Session.class, Destination.class, MessageCreator.class);
+            method.setAccessible(true);
+            method.invoke(template, session, dest, new MessageCreator() {
+                public Message createMessage(Session session) throws JMSException {
+                    return message;
+                }
+            });
+        } catch (NoSuchMethodException e) {
+            throw new RuntimeException(e);
+        } catch (IllegalAccessException e) {
+            throw new RuntimeException(e);
+        } catch (InvocationTargetException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private Message receiveSelected(final Session session, 
+                                    final Destination dest, 
+                                    final String messageSelector) throws JMSException {
+        // Do not call directly the template to avoid the cost of creating a new connection / session
+//        return template.doReceive(session, dest, messageSelector);
+        try {
+            Method method = JmsTemplate.class.getDeclaredMethod("doReceive", Session.class, Destination.class, String.class);
+            method.setAccessible(true);
+            return (Message) method.invoke(template, session, dest, messageSelector);
+        } catch (NoSuchMethodException e) {
+            throw new RuntimeException(e);
+        } catch (IllegalAccessException e) {
+            throw new RuntimeException(e);
+        } catch (InvocationTargetException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
     /**
      * Process a JMS response message.
      * This method delegates to the marshaler for the JBI out message creation
@@ -735,11 +773,18 @@
      * @throws JMSException
      */
     protected Destination getDestination(MessageExchange exchange, Object message, Session session) throws JMSException {
-        return chooseDestination(exchange, message, session, destinationChooser, destination);
+        Destination dest = chooseDestination(exchange, message, session, destinationChooser,
+                                             destination != null ? destination : destinationName);
+        if (dest == null) {
+            throw new IllegalStateException("Unable to choose a destination for exchange " + exchange);
+        }
+        return dest;
     }
 
     /**
-     * Choose the JMS destination for the reply message
+     * Choose the JMS destination for the reply message.
+     * If no default destination is specified or can be extracted from the JBI exchange,
+     * a temporary destination will be created.
      *
      * @param exchange
      * @param message
@@ -748,7 +793,17 @@
      * @throws JMSException
      */
     protected Destination getReplyDestination(MessageExchange exchange, Object message, Session session) throws JMSException {
-        return chooseDestination(exchange, message, session, replyDestinationChooser, replyDestination);
+        Destination dest = chooseDestination(exchange, message, session, replyDestinationChooser,
+                                             replyDestination != null ? replyDestination : replyDestinationName);
+        if (dest == null) {
+            if (isPubSubDomain()) {
+                return session.createTemporaryQueue();
+            } else {
+                return session.createTemporaryTopic();
+            }
+        } else {
+            return dest;
+        }
     }
 
     /**
@@ -765,7 +820,7 @@
                                             Object message,
                                             Session session,
                                             DestinationChooser chooser,
-                                            Destination defaultDestination) throws JMSException {
+                                            Object defaultDestination) throws JMSException {
         Object dest = null;
         // Let the replyDestinationChooser a chance to choose the destination
         if (chooser != null) {
@@ -783,7 +838,7 @@
                                                               (String) dest, 
                                                               isPubSubDomain());
         }
-        throw new IllegalStateException("Unable to choose a destination for exchange " + exchange);
+        return null;
     }
 
     /**
@@ -800,25 +855,14 @@
             store = storeFactory.open(getService().toString() + getEndpoint());
         }
         template = createTemplate();
-        // Obtain the default destination
-        if (destination == null && destinationName != null) {
-            destination = (Destination) template.execute(new SessionCallback() {
-                public Object doInJms(Session session) throws JMSException {
-                    return destinationResolver.resolveDestinationName(session, destinationName, isPubSubDomain());
-                }
-            });
-        }
-        // Obtain the default reply destination
-        if (replyDestination == null && replyDestinationName != null) {
-            replyDestination = (Destination) template.execute(new SessionCallback() {
-                public Object doInJms(Session session) throws JMSException {
-                    return destinationResolver.resolveDestinationName(session, replyDestinationName, isPubSubDomain());
-                }
-            });
+    }
+
+    protected synchronized void createAndStartListener() throws Exception {
+        if (listenerContainer == null) {
+            // create the listener container
+            listenerContainer = createListenerContainer();
+            listenerContainer.start();
         }
-        // create the listener container
-        listenerContainer = createListenerContainer();
-        listenerContainer.start();
     }
 
     /**
@@ -830,6 +874,7 @@
         if (listenerContainer != null) {
             listenerContainer.stop();
             listenerContainer.shutdown();
+            listenerContainer = null;
         }
         if (store != null) {
             if (storeFactory != null) {
@@ -905,12 +950,12 @@
             cont = new DefaultMessageListenerContainer();
         }
         cont.setConnectionFactory(getConnectionFactory());
-        Destination replyDest = getReplyDestination();
-        if (replyDest == null) {
-            replyDest = resolveOrCreateDestination(template, replyDestinationName, isPubSubDomain());
-            setReplyDestination(replyDest);
+        if (replyDestination != null) {
+            cont.setDestination(replyDestination);
+        }
+        if (replyDestinationName != null) {
+            cont.setDestinationName(replyDestinationName);
         }
-        cont.setDestination(replyDest);
         cont.setPubSubDomain(isPubSubDomain());
         cont.setPubSubNoLocal(isPubSubNoLocal());
         cont.setMessageListener(new MessageListener() {
@@ -923,31 +968,4 @@
         return cont;
     }
     
-    /**
-     * If the destinationName given is null then a temporary destination is created else the destination name
-     * is resolved using the resolver from the jmsConfig
-     *
-     * @param jmsTemplate template to use for session and resolver
-     * @param replyToDestinationName null for temporary destination or a destination name
-     * @param usePubSubDomain true=pubSub, false=Queues
-     * @return resolved destination
-     */
-    private Destination resolveOrCreateDestination(final JmsTemplate jmsTemplate,
-                                                          final String replyToDestinationName,
-                                                          final boolean usePubSubDomain) {
-        return (Destination)jmsTemplate.execute(new SessionCallback() {
-            public Object doInJms(Session session) throws JMSException {
-                if (replyToDestinationName == null) {
-                    if (destination instanceof Queue) {
-                        return session.createTemporaryQueue();
-                    } else {
-                        return session.createTemporaryTopic();
-                    }
-                }
-                DestinationResolver resolv = jmsTemplate.getDestinationResolver();
-                return resolv.resolveDestinationName(session, replyToDestinationName, usePubSubDomain);
-            }
-        });
-    }
-
 }



Mime
View raw message