servicemix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ge...@apache.org
Subject svn commit: r733681 - /servicemix/smx3/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/AbstractJMSFlow.java
Date Mon, 12 Jan 2009 10:45:03 GMT
Author: gertv
Date: Mon Jan 12 02:44:59 2009
New Revision: 733681

URL: http://svn.apache.org/viewvc?rev=733681&view=rev
Log:
SM-1761: AutoDeploymentService stops working after x deployments

Modified:
    servicemix/smx3/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/AbstractJMSFlow.java

Modified: servicemix/smx3/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/AbstractJMSFlow.java
URL: http://svn.apache.org/viewvc/servicemix/smx3/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/AbstractJMSFlow.java?rev=733681&r1=733680&r2=733681&view=diff
==============================================================================
--- servicemix/smx3/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/AbstractJMSFlow.java
(original)
+++ servicemix/smx3/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/AbstractJMSFlow.java
Mon Jan 12 02:44:59 2009
@@ -74,7 +74,7 @@
     private String password;
     private String broadcastDestinationName = "org.apache.servicemix.JMSFlow";
     private MessageConsumer broadcastConsumer;
-    private Map<String, MessageConsumer> consumerMap = new ConcurrentHashMap<String,
MessageConsumer>();
+    private Map<String, MessageConsumerSession> consumerMap = new ConcurrentHashMap<String,
MessageConsumerSession>();
     private EndpointListener endpointListener;
     private ComponentListener componentListener;
     private Executor executor;
@@ -344,11 +344,7 @@
         try {
             String key = EndpointSupport.getKey(event.getEndpoint());
             if (!consumerMap.containsKey(key)) {
-                Session inboundSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-                Queue queue = inboundSession.createQueue(INBOUND_PREFIX + key);
-                MessageConsumer consumer = inboundSession.createConsumer(queue);
-                consumer.setMessageListener(this);
-                consumerMap.put(key, consumer);
+                consumerMap.put(key, new MessageConsumerSession(key, this));
             }
             if (broadcast) {
                 broadcast(event);
@@ -361,7 +357,7 @@
     public void onInternalEndpointUnregistered(EndpointEvent event, boolean broadcast) {
         try {
             String key = EndpointSupport.getKey(event.getEndpoint());
-            MessageConsumer consumer = consumerMap.remove(key);
+            MessageConsumerSession consumer = consumerMap.remove(key);
             if (consumer != null) {
                 consumer.close();
             }
@@ -396,11 +392,7 @@
         try {
             String key = event.getComponent().getName();
             if (!consumerMap.containsKey(key)) {
-                Session inboundSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-                Queue queue = inboundSession.createQueue(INBOUND_PREFIX + key);
-                MessageConsumer consumer = inboundSession.createConsumer(queue);
-                consumer.setMessageListener(this);
-                consumerMap.put(key, consumer);
+                consumerMap.put(key, new MessageConsumerSession(key, this));
             }
         } catch (Exception e) {
             log.error("Cannot create consumer for component " + event.getComponent().getName(),
e);
@@ -410,7 +402,7 @@
     public void onComponentStopped(ComponentEvent event) {
         try {
             String key = event.getComponent().getName();
-            MessageConsumer consumer = consumerMap.remove(key);
+            MessageConsumerSession consumer = consumerMap.remove(key);
             if (consumer != null) {
                 consumer.close();
             }
@@ -574,5 +566,31 @@
     public void setJmsURL(String jmsURL) {
         this.jmsURL = jmsURL;
     }
+    
+    
+    /*
+     * Creates a message consumer and holds on to both consumer and session
+     * to allow closing both of them together.
+     */
+    private final class MessageConsumerSession {
+        
+        private Session session;
+        private MessageConsumer consumer;
+        
+        private MessageConsumerSession(String key, MessageListener listener) throws JMSException
{
+            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            Queue queue = session.createQueue(INBOUND_PREFIX + key);
+            consumer = session.createConsumer(queue);
+            consumer.setMessageListener(listener);            
+        }
 
+        private void close() throws JMSException {
+            if (consumer != null) {
+                consumer.close();
+            }
+            if (session != null) {
+                session.close();
+            }
+        }
+    }
 }



Mime
View raw message