activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From andytay...@apache.org
Subject [1/2] activemq git commit: https://issues.jboss.org/browse/ENTMQ-780
Date Mon, 08 Jun 2015 14:56:50 GMT
Repository: activemq
Updated Branches:
  refs/heads/master af999fe2b -> 67c28b1c6


https://issues.jboss.org/browse/ENTMQ-780

Pauses the delivery of messages to the ActiveMQ session while a rollback is in progress,
until the message has been redelivered.

patch applied with thanks to Tamas Cserveny


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/c9a3202b
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/c9a3202b
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/c9a3202b

Branch: refs/heads/master
Commit: c9a3202bc3b526bdf65f239dad68f555d7b83df1
Parents: af999fe
Author: Andy Taylor <andy.tayls67@gmail.com>
Authored: Mon Jun 8 15:51:44 2015 +0100
Committer: Andy Taylor <andy.tayls67@gmail.com>
Committed: Mon Jun 8 15:51:44 2015 +0100

----------------------------------------------------------------------
 .../org/apache/activemq/ActiveMQSession.java    | 23 ++++++++++++++++++--
 .../activemq/ActiveMQSessionExecutor.java       | 11 ++++++++++
 2 files changed, 32 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/c9a3202b/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java
index 1d2ae83..2e0f646 100755
--- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java
+++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java
@@ -717,7 +717,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
         if (!closed) {
 
             try {
-                executor.stop();
+                executor.close();
 
                 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
                     ActiveMQMessageConsumer consumer = iter.next();
@@ -978,11 +978,24 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
                                 for (int i = 0; i < redeliveryCounter; i++) {
                                     redeliveryDelay = redeliveryPolicy.getNextRedeliveryDelay(redeliveryDelay);
                                 }
+                                
+                                if ( connection.isNonBlockingRedelivery() == false) {
+                                    LOG.debug("Blocking session until re-delivery...");
+                                    executor.stop();
+                                }
+                                
                                 connection.getScheduler().executeAfterDelay(new Runnable() {
 
                                     @Override
                                     public void run() {
-                                        ((ActiveMQDispatcher)md.getConsumer()).dispatch(md);
+                                        
+                                        if (connection.isNonBlockingRedelivery()) {
+                                            ((ActiveMQDispatcher)md.getConsumer()).dispatch(md);
+                                        } else {
+                                            LOG.debug("Session released, issuing re-delivery...");
+                                            executor.executeFirst(md);
+                                            executor.start();
+                                        }
                                     }
                                 }, redeliveryDelay);
                             }
@@ -1016,6 +1029,12 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
             if (deliveryListener != null) {
                 deliveryListener.afterDelivery(this, message);
             }
+            
+            try {
+                executor.waitForQueueRestart();
+            } catch (InterruptedException ex) {
+                connection.onClientInternalException(ex);
+            }            
         }
     }
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/c9a3202b/activemq-client/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java
index caa1ca9..3578155 100755
--- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java
+++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java
@@ -207,4 +207,15 @@ public class ActiveMQSessionExecutor implements Task {
     List<MessageDispatch> getUnconsumedMessages() {
         return messageQueue.removeAll();
     }
+    
+    void waitForQueueRestart() throws InterruptedException {
+        synchronized (messageQueue.getMutex()) {
+            while (messageQueue.isRunning() == false) {
+                if (messageQueue.isClosed()) {
+                    break;
+                }
+                messageQueue.getMutex().wait();
+            }
+        }
+    }
 }


Mime
View raw message