activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject git commit: https://issues.apache.org/jira/browse/AMQ-5077 - reduce reader thread work when client uses async send; async store updates can now queue up to the destination memory limit b/c they don't block the send thread. Pending store writes are now tracked in memory usage.
Date Wed, 30 Apr 2014 15:19:51 GMT
Repository: activemq
Updated Branches:
  refs/heads/trunk 8498136f5 -> ad1f751a4


https://issues.apache.org/jira/browse/AMQ-5077 - reduce reader thread work when the client uses async send; async store updates can now queue up to the destination memory limit because they don't block the send thread. Pending store writes are now tracked in memory usage. This allows a client to quickly provide a burst of messages to fill the destination cache, bounded only by network bandwidth.


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/ad1f751a
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/ad1f751a
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/ad1f751a

Branch: refs/heads/trunk
Commit: ad1f751a412ddb258256c1e8d2e72858b52f43ca
Parents: 8498136
Author: gtully <gary.tully@gmail.com>
Authored: Wed Apr 30 16:10:18 2014 +0100
Committer: gtully <gary.tully@gmail.com>
Committed: Wed Apr 30 16:13:18 2014 +0100

----------------------------------------------------------------------
 .../region/PendingMarshalUsageTracker.java      | 39 +++++++++
 .../apache/activemq/broker/region/Queue.java    | 10 ++-
 .../activemq/store/AbstractMessageStore.java    | 24 ++----
 .../activemq/store/InlineListenableFuture.java  | 58 +++++++++++++
 .../apache/activemq/store/ListenableFuture.java | 28 +++++++
 .../org/apache/activemq/store/MessageStore.java | 13 ++-
 .../activemq/store/ProxyMessageStore.java       |  8 +-
 .../activemq/store/ProxyTopicMessageStore.java  |  8 +-
 .../store/memory/MemoryTransactionStore.java    | 20 ++---
 .../org/apache/activemq/ActiveMQConnection.java | 11 ++-
 .../activemq/store/kahadb/KahaDBStore.java      | 34 ++++++--
 .../store/kahadb/KahaDBTransactionStore.java    | 14 ++--
 .../kahadb/MultiKahaDBTransactionStore.java     | 13 ++-
 .../org/apache/activemq/leveldb/DBManager.scala | 26 +++++-
 .../apache/activemq/leveldb/LevelDBStore.scala  |  7 +-
 .../activemq/JmsMultipleClientsTestSupport.java |  6 ++
 .../activemq/broker/QueueSubscriptionTest.java  |  8 ++
 .../activemq/broker/TopicSubscriptionTest.java  |  6 +-
 .../broker/virtual/VirtualDestPerfTest.java     | 88 +++++++++++++++++---
 .../activemq/usecases/QueueBrowsingTest.java    |  2 +-
 20 files changed, 334 insertions(+), 89 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/ad1f751a/activemq-broker/src/main/java/org/apache/activemq/broker/region/PendingMarshalUsageTracker.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/PendingMarshalUsageTracker.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/PendingMarshalUsageTracker.java
new file mode 100644
index 0000000..78acb53
--- /dev/null
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/PendingMarshalUsageTracker.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.region;
+
+import org.apache.activemq.command.Message;
+import org.apache.activemq.usage.MemoryUsage;
+
+public class PendingMarshalUsageTracker implements Runnable {
+    final MemoryUsage usage;
+    int messageSize;
+    public PendingMarshalUsageTracker(final Message message) {
+        usage = message.getMemoryUsage();
+        if (usage != null) {
+            messageSize = message.getSize();
+            usage.increaseUsage(messageSize);
+        }
+    }
+
+    @Override
+    public void run() {
+        if (usage != null) {
+            usage.decreaseUsage(messageSize);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/ad1f751a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
index 2f3d8bd..06c74db 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
@@ -81,6 +81,7 @@ import org.apache.activemq.filter.MessageEvaluationContext;
 import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
 import org.apache.activemq.selector.SelectorParser;
 import org.apache.activemq.state.ProducerState;
+import org.apache.activemq.store.ListenableFuture;
 import org.apache.activemq.store.MessageRecoveryListener;
 import org.apache.activemq.store.MessageStore;
 import org.apache.activemq.thread.Task;
@@ -637,8 +638,8 @@ public class Queue extends BaseDestination implements Task, UsageListener {
             if (isProducerFlowControl() && context.isProducerFlowControl()) {
                 if (warnOnProducerFlowControl) {
                     warnOnProducerFlowControl = false;
-                    LOG.info("Usage Manager Memory Limit ({}) reached on {}. Producers will be throttled to the rate at which messages are removed from this destination to prevent flooding it. See http://activemq.apache.org/producer-flow-control.html for more info.",
-                                    memoryUsage.getLimit(), getActiveMQDestination().getQualifiedName());
+                    LOG.info("Usage Manager Memory Limit ({}) reached on {}, size {}. Producers will be throttled to the rate at which messages are removed from this destination to prevent flooding it. See http://activemq.apache.org/producer-flow-control.html for more info.",
+                                    memoryUsage.getLimit(), getActiveMQDestination().getQualifiedName(), destinationStatistics.getMessages().getCount());
                 }
 
                 if (!context.isNetworkConnection() && systemUsage.isSendFailIfNoSpace()) {
@@ -895,7 +896,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
     void doMessageSend(final ProducerBrokerExchange producerExchange, final Message message) throws IOException,
             Exception {
         final ConnectionContext context = producerExchange.getConnectionContext();
-        Future<Object> result = null;
+        ListenableFuture<Object> result = null;
         boolean needsOrderingWithTransactions = context.isInTransaction();
 
         producerExchange.incrementSend();
@@ -907,6 +908,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
                     message.getMessageId().setBrokerSequenceId(getDestinationSequenceId());
                     if (messages.isCacheEnabled()) {
                         result = store.asyncAddQueueMessage(context, message, isOptimizeStorage());
+                        result.addListener(new PendingMarshalUsageTracker(message));
                     } else {
                         store.addMessage(context, message);
                     }
@@ -942,7 +944,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
         if (!needsOrderingWithTransactions) {
             messageSent(context, message);
         }
-        if (result != null && !result.isCancelled()) {
+        if (result != null && message.isResponseRequired() && !result.isCancelled()) {
             try {
                 result.get();
             } catch (CancellationException e) {

http://git-wip-us.apache.org/repos/asf/activemq/blob/ad1f751a/activemq-broker/src/main/java/org/apache/activemq/store/AbstractMessageStore.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/AbstractMessageStore.java b/activemq-broker/src/main/java/org/apache/activemq/store/AbstractMessageStore.java
index cd8d0f9..df8658f 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/store/AbstractMessageStore.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/store/AbstractMessageStore.java
@@ -17,9 +17,6 @@
 package org.apache.activemq.store;
 
 import java.io.IOException;
-import java.util.concurrent.Callable;
-import java.util.concurrent.Future;
-import java.util.concurrent.FutureTask;
 
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.command.ActiveMQDestination;
@@ -29,7 +26,7 @@ import org.apache.activemq.command.MessageId;
 import org.apache.activemq.usage.MemoryUsage;
 
 abstract public class AbstractMessageStore implements MessageStore {
-    public static final FutureTask<Object> FUTURE;
+    public static final ListenableFuture<Object> FUTURE;
     protected final ActiveMQDestination destination;
     protected boolean prioritizedMessages;
 
@@ -89,27 +86,27 @@ abstract public class AbstractMessageStore implements MessageStore {
     }
 
     @Override
-    public Future<Object> asyncAddQueueMessage(final ConnectionContext context, final Message message) throws IOException {
+    public ListenableFuture<Object> asyncAddQueueMessage(final ConnectionContext context, final Message message) throws IOException {
         addMessage(context, message);
         return FUTURE;
     }
 
     @Override
-    public Future<Object> asyncAddQueueMessage(final ConnectionContext context, final Message message,final boolean canOptimizeHint) throws IOException {
+    public ListenableFuture<Object> asyncAddQueueMessage(final ConnectionContext context, final Message message, final boolean canOptimizeHint) throws IOException {
         addMessage(context, message, canOptimizeHint);
         return FUTURE;
     }
 
     @Override
-    public Future<Object> asyncAddTopicMessage(final ConnectionContext context, final Message message,final boolean canOptimizeHint) throws IOException {
+    public ListenableFuture<Object> asyncAddTopicMessage(final ConnectionContext context, final Message message, final boolean canOptimizeHint) throws IOException {
         addMessage(context, message, canOptimizeHint);
         return FUTURE;
     }
 
     @Override
-    public Future<Object> asyncAddTopicMessage(final ConnectionContext context, final Message message) throws IOException {
+    public ListenableFuture<Object> asyncAddTopicMessage(final ConnectionContext context, final Message message) throws IOException {
         addMessage(context, message);
-        return FUTURE;
+        return new InlineListenableFuture();
     }
 
     @Override
@@ -121,14 +118,7 @@ abstract public class AbstractMessageStore implements MessageStore {
         throw new IOException("update is not supported by: " + this);
     }
 
-    static class CallableImplementation implements Callable<Object> {
-        public Object call() throws Exception {
-            return null;
-        }
-    }
-
     static {
-       FUTURE = new FutureTask<Object>(new CallableImplementation());
-       FUTURE.run();
+       FUTURE = new InlineListenableFuture();
     }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/ad1f751a/activemq-broker/src/main/java/org/apache/activemq/store/InlineListenableFuture.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/InlineListenableFuture.java b/activemq-broker/src/main/java/org/apache/activemq/store/InlineListenableFuture.java
new file mode 100644
index 0000000..7c2b873
--- /dev/null
+++ b/activemq-broker/src/main/java/org/apache/activemq/store/InlineListenableFuture.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.store;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+public class InlineListenableFuture implements ListenableFuture<Object> {
+    public Object call() throws Exception {
+        return null;
+    }
+
+    @Override
+    public void addListener(Runnable listener) {
+        listener.run();
+    }
+
+    @Override
+    public boolean cancel(boolean mayInterruptIfRunning) {
+        return false;
+    }
+
+    @Override
+    public boolean isCancelled() {
+        return false;
+    }
+
+    @Override
+    public boolean isDone() {
+        return true;
+    }
+
+    @Override
+    public Object get() throws InterruptedException, ExecutionException {
+        return null;
+    }
+
+    @Override
+    public Object get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+        return null;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq/blob/ad1f751a/activemq-broker/src/main/java/org/apache/activemq/store/ListenableFuture.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/ListenableFuture.java b/activemq-broker/src/main/java/org/apache/activemq/store/ListenableFuture.java
new file mode 100644
index 0000000..9072558
--- /dev/null
+++ b/activemq-broker/src/main/java/org/apache/activemq/store/ListenableFuture.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store;
+
+import java.util.concurrent.Future;
+
+public interface ListenableFuture<T> extends Future<T> {
+    /**
+     * register a listener to be run on completion or immediately if complete
+     * any exceptions will be caught and logged
+     * @param listener
+     */
+    void addListener(Runnable listener);
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/ad1f751a/activemq-broker/src/main/java/org/apache/activemq/store/MessageStore.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/MessageStore.java b/activemq-broker/src/main/java/org/apache/activemq/store/MessageStore.java
index d465bc5..400245a 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/store/MessageStore.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/store/MessageStore.java
@@ -17,7 +17,6 @@
 package org.apache.activemq.store;
 
 import java.io.IOException;
-import java.util.concurrent.Future;
 
 import org.apache.activemq.Service;
 import org.apache.activemq.broker.ConnectionContext;
@@ -62,7 +61,7 @@ public interface MessageStore extends Service {
      * @throws IOException
      * @throws IOException
      */
-    Future<Object> asyncAddQueueMessage(ConnectionContext context, Message message) throws IOException;
+    ListenableFuture<Object> asyncAddQueueMessage(ConnectionContext context, Message message) throws IOException;
 
     /**
      * Adds a message to the message store
@@ -74,18 +73,18 @@ public interface MessageStore extends Service {
      * @throws IOException
      * @throws IOException
      */
-    Future<Object> asyncAddQueueMessage(ConnectionContext context, Message message, boolean canOptimizeHint) throws IOException;
+    ListenableFuture<Object> asyncAddQueueMessage(ConnectionContext context, Message message, boolean canOptimizeHint) throws IOException;
 
     /**
      * Adds a message to the message store
      *
      * @param context context
      * @param message
-     * @return a Future to track when this is complete
+     * @return a ListenableFuture to track when this is complete
      * @throws IOException
      * @throws IOException
      */
-    Future<Object> asyncAddTopicMessage(ConnectionContext context, Message message) throws IOException;
+    ListenableFuture<Object> asyncAddTopicMessage(ConnectionContext context, Message message) throws IOException;
 
     /**
      * Adds a message to the message store
@@ -93,11 +92,11 @@ public interface MessageStore extends Service {
      * @param context context
      * @param message
      *  @param canOptimizeHint - give a hint to the store that the message may be consumed before it hits the disk
-     * @return a Future to track when this is complete
+     * @return a ListenableFuture to track when this is complete
      * @throws IOException
      * @throws IOException
      */
-    Future<Object> asyncAddTopicMessage(ConnectionContext context, Message message, boolean canOptimizeHint) throws IOException;
+    ListenableFuture<Object> asyncAddTopicMessage(ConnectionContext context, Message message, boolean canOptimizeHint) throws IOException;
 
     /**
      * Looks up a message using either the String messageID or the

http://git-wip-us.apache.org/repos/asf/activemq/blob/ad1f751a/activemq-broker/src/main/java/org/apache/activemq/store/ProxyMessageStore.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/ProxyMessageStore.java b/activemq-broker/src/main/java/org/apache/activemq/store/ProxyMessageStore.java
index e79229b..3ddfadb 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/store/ProxyMessageStore.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/store/ProxyMessageStore.java
@@ -122,22 +122,22 @@ public class ProxyMessageStore implements MessageStore {
     }
 
     @Override
-    public Future<Object> asyncAddQueueMessage(ConnectionContext context, Message message) throws IOException {
+    public ListenableFuture<Object> asyncAddQueueMessage(ConnectionContext context, Message message) throws IOException {
        return delegate.asyncAddQueueMessage(context, message);
     }
 
     @Override
-    public Future<Object> asyncAddQueueMessage(ConnectionContext context, Message message, boolean canOptimizeHint) throws IOException {
+    public ListenableFuture<Object> asyncAddQueueMessage(ConnectionContext context, Message message, boolean canOptimizeHint) throws IOException {
        return delegate.asyncAddQueueMessage(context,message,canOptimizeHint);
     }
 
     @Override
-    public Future<Object> asyncAddTopicMessage(ConnectionContext context, Message message) throws IOException {
+    public ListenableFuture<Object> asyncAddTopicMessage(ConnectionContext context, Message message) throws IOException {
         return delegate.asyncAddTopicMessage(context, message);
      }
 
     @Override
-    public Future<Object> asyncAddTopicMessage(ConnectionContext context, Message message, boolean canOptimizeHint) throws IOException {
+    public ListenableFuture<Object> asyncAddTopicMessage(ConnectionContext context, Message message, boolean canOptimizeHint) throws IOException {
         return asyncAddTopicMessage(context,message,canOptimizeHint);
     }
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/ad1f751a/activemq-broker/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java b/activemq-broker/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java
index c0635fa..de4d195 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java
@@ -171,22 +171,22 @@ public class ProxyTopicMessageStore implements TopicMessageStore {
      }
 
     @Override
-    public Future<Object> asyncAddTopicMessage(ConnectionContext context, Message message) throws IOException {
+    public ListenableFuture<Object> asyncAddTopicMessage(ConnectionContext context, Message message) throws IOException {
         return delegate.asyncAddTopicMessage(context, message);
      }
 
     @Override
-    public Future<Object> asyncAddTopicMessage(ConnectionContext context, Message message, boolean canOptimizeHint) throws IOException {
+    public ListenableFuture<Object> asyncAddTopicMessage(ConnectionContext context, Message message, boolean canOptimizeHint) throws IOException {
         return delegate.asyncAddTopicMessage(context,message, canOptimizeHint);
     }
 
     @Override
-    public Future<Object> asyncAddQueueMessage(ConnectionContext context, Message message) throws IOException {
+    public ListenableFuture<Object> asyncAddQueueMessage(ConnectionContext context, Message message) throws IOException {
         return delegate.asyncAddQueueMessage(context, message);
     }
 
     @Override
-    public Future<Object> asyncAddQueueMessage(ConnectionContext context, Message message, boolean canOptimizeHint) throws IOException {
+    public ListenableFuture<Object> asyncAddQueueMessage(ConnectionContext context, Message message, boolean canOptimizeHint) throws IOException {
         return delegate.asyncAddQueueMessage(context,message, canOptimizeHint);
     }
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/ad1f751a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java
index 7e02694..c3d1b8a 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java
@@ -22,7 +22,8 @@ import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessageId;
 import org.apache.activemq.command.TransactionId;
 import org.apache.activemq.command.XATransactionId;
-import org.apache.activemq.store.AbstractMessageStore;
+import org.apache.activemq.store.InlineListenableFuture;
+import org.apache.activemq.store.ListenableFuture;
 import org.apache.activemq.store.MessageStore;
 import org.apache.activemq.store.PersistenceAdapter;
 import org.apache.activemq.store.ProxyMessageStore;
@@ -38,7 +39,6 @@ import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Future;
 
 /**
  * Provides a TransactionStore implementation that can create transaction aware
@@ -149,15 +149,15 @@ public class MemoryTransactionStore implements TransactionStore {
             }
 
             @Override
-            public Future<Object> asyncAddQueueMessage(ConnectionContext context, Message message) throws IOException {
+            public ListenableFuture<Object> asyncAddQueueMessage(ConnectionContext context, Message message) throws IOException {
                 MemoryTransactionStore.this.addMessage(getDelegate(), message);
-                return AbstractMessageStore.FUTURE;
+                return new InlineListenableFuture();
              }
 
             @Override
-            public Future<Object> asyncAddQueueMessage(ConnectionContext context, Message message, boolean canoptimize) throws IOException {
+            public ListenableFuture<Object> asyncAddQueueMessage(ConnectionContext context, Message message, boolean canoptimize) throws IOException {
                 MemoryTransactionStore.this.addMessage(getDelegate(), message);
-                return AbstractMessageStore.FUTURE;
+                return new InlineListenableFuture();
              }
 
             @Override
@@ -190,15 +190,15 @@ public class MemoryTransactionStore implements TransactionStore {
             }
 
             @Override
-            public Future<Object> asyncAddTopicMessage(ConnectionContext context, Message message) throws IOException {
+            public ListenableFuture<Object> asyncAddTopicMessage(ConnectionContext context, Message message) throws IOException {
                 MemoryTransactionStore.this.addMessage(getDelegate(), message);
-                return AbstractMessageStore.FUTURE;
+                return new InlineListenableFuture();
              }
 
             @Override
-            public Future<Object> asyncAddTopicMessage(ConnectionContext context, Message message, boolean canOptimize) throws IOException {
+            public ListenableFuture<Object> asyncAddTopicMessage(ConnectionContext context, Message message, boolean canOptimize) throws IOException {
                 MemoryTransactionStore.this.addMessage(getDelegate(), message);
-                return AbstractMessageStore.FUTURE;
+                return new InlineListenableFuture();
              }
 
             @Override

http://git-wip-us.apache.org/repos/asf/activemq/blob/ad1f751a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java
index e3ce9ae..326310c 100755
--- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java
+++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java
@@ -94,6 +94,7 @@ import org.apache.activemq.state.CommandVisitorAdapter;
 import org.apache.activemq.thread.Scheduler;
 import org.apache.activemq.thread.TaskRunnerFactory;
 import org.apache.activemq.transport.FutureResponse;
+import org.apache.activemq.transport.RequestTimedOutIOException;
 import org.apache.activemq.transport.ResponseCallback;
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.TransportListener;
@@ -696,7 +697,15 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
                         // know that the connection is being shutdown.
                         RemoveInfo removeCommand = info.createRemoveCommand();
                         removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId);
-                        doSyncSendPacket(info.createRemoveCommand(), closeTimeout);
+                        try {
+                            doSyncSendPacket(info.createRemoveCommand(), closeTimeout);
+                        } catch (JMSException e) {
+                            if (e.getCause() instanceof RequestTimedOutIOException) {
+                                // expected
+                            } else {
+                                throw e;
+                            }
+                        }
                         doAsyncSendPacket(new ShutdownInfo());
                     }
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/ad1f751a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
index 1e84642..74425f1 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
@@ -30,7 +30,6 @@ import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
 import java.util.concurrent.FutureTask;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.Semaphore;
@@ -60,7 +59,6 @@ import org.apache.activemq.store.*;
 import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
 import org.apache.activemq.store.kahadb.data.KahaDestination;
 import org.apache.activemq.store.kahadb.data.KahaDestination.DestinationType;
-import org.apache.activemq.store.kahadb.data.KahaEntryType;
 import org.apache.activemq.store.kahadb.data.KahaLocation;
 import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand;
 import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand;
@@ -370,7 +368,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
         }
 
         @Override
-        public Future<Object> asyncAddQueueMessage(final ConnectionContext context, final Message message)
+        public ListenableFuture<Object> asyncAddQueueMessage(final ConnectionContext context, final Message message)
                 throws IOException {
             if (isConcurrentStoreAndDispatchQueues()) {
                 StoreQueueTask result = new StoreQueueTask(this, context, message);
@@ -712,7 +710,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
         }
 
         @Override
-        public Future<Object> asyncAddTopicMessage(final ConnectionContext context, final Message message)
+        public ListenableFuture<Object> asyncAddTopicMessage(final ConnectionContext context, final Message message)
                 throws IOException {
             if (isConcurrentStoreAndDispatchTopics()) {
                 StoreTopicTask result = new StoreTopicTask(this, context, message, subscriptionCount.get());
@@ -1238,7 +1236,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
             this.future = new InnerFutureTask(this);
         }
 
-        public Future<Object> getFuture() {
+        public ListenableFuture<Object> getFuture() {
             return this.future;
         }
 
@@ -1295,8 +1293,9 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
             return this.message;
         }
 
-        private class InnerFutureTask extends FutureTask<Object> {
+        private class InnerFutureTask extends FutureTask<Object> implements ListenableFuture<Object>  {
 
+            private Runnable listener;
             public InnerFutureTask(Runnable runnable) {
                 super(runnable, null);
 
@@ -1309,6 +1308,29 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
             public void complete() {
                 super.set(null);
             }
+
+            /**
+             * Completion hook inherited from {@link FutureTask}; notifies the
+             * registered listener, if there is one.
+             */
+            @Override
+            public void done() {
+                fireListener();
+            }
+
+            /**
+             * Registers the callback to run when this future completes, replacing
+             * any callback that has not fired yet. If the future is already done the
+             * callback runs immediately on the calling thread.
+             *
+             * @param listener callback to run on completion
+             */
+            @Override
+            public void addListener(Runnable listener) {
+                // Publish under the monitor so that a completion racing with this
+                // registration is guaranteed to observe the listener in fireListener().
+                synchronized (this) {
+                    this.listener = listener;
+                }
+                if (isDone()) {
+                    fireListener();
+                }
+            }
+
+            /**
+             * Runs the registered listener at most once. The listener is claimed and
+             * cleared atomically, so when done() and addListener() race only one of
+             * them invokes it. Exceptions thrown by the callback are logged, not
+             * propagated.
+             */
+            private void fireListener() {
+                final Runnable callback;
+                synchronized (this) {
+                    callback = listener;
+                    listener = null;
+                }
+                // Invoke outside the lock: the callback is foreign code.
+                if (callback != null) {
+                    try {
+                        callback.run();
+                    } catch (Exception ignored) {
+                        LOG.warn("Unexpected exception from future {} listener callback {}", this, callback, ignored);
+                    }
+                }
+            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/ad1f751a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java
index 12e5f00..47a9c34 100755
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java
@@ -33,9 +33,9 @@ import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessageId;
 import org.apache.activemq.command.TransactionId;
 import org.apache.activemq.command.XATransactionId;
-import org.apache.activemq.openwire.OpenWireFormat;
 import org.apache.activemq.protobuf.Buffer;
 import org.apache.activemq.store.AbstractMessageStore;
+import org.apache.activemq.store.ListenableFuture;
 import org.apache.activemq.store.MessageStore;
 import org.apache.activemq.store.ProxyMessageStore;
 import org.apache.activemq.store.ProxyTopicMessageStore;
@@ -166,12 +166,12 @@ public class KahaDBTransactionStore implements TransactionStore {
             }
 
             @Override
-            public Future<Object> asyncAddQueueMessage(ConnectionContext context, Message message) throws IOException {
+            public ListenableFuture<Object> asyncAddQueueMessage(ConnectionContext context, Message message) throws IOException {
                 return KahaDBTransactionStore.this.asyncAddQueueMessage(context, getDelegate(), message);
             }
 
             @Override
-            public Future<Object> asyncAddQueueMessage(ConnectionContext context, Message message, boolean canOptimize) throws IOException {
+            public ListenableFuture<Object> asyncAddQueueMessage(ConnectionContext context, Message message, boolean canOptimize) throws IOException {
                 return KahaDBTransactionStore.this.asyncAddQueueMessage(context, getDelegate(), message);
             }
 
@@ -200,12 +200,12 @@ public class KahaDBTransactionStore implements TransactionStore {
             }
 
             @Override
-            public Future<Object> asyncAddTopicMessage(ConnectionContext context, Message message) throws IOException {
+            public ListenableFuture<Object> asyncAddTopicMessage(ConnectionContext context, Message message) throws IOException {
                 return KahaDBTransactionStore.this.asyncAddTopicMessage(context, getDelegate(), message);
             }
 
             @Override
-            public Future<Object> asyncAddTopicMessage(ConnectionContext context, Message message, boolean canOptimize) throws IOException {
+            public ListenableFuture<Object> asyncAddTopicMessage(ConnectionContext context, Message message, boolean canOptimize) throws IOException {
                 return KahaDBTransactionStore.this.asyncAddTopicMessage(context, getDelegate(), message);
             }
 
@@ -389,7 +389,7 @@ public class KahaDBTransactionStore implements TransactionStore {
         }
     }
 
-    Future<Object> asyncAddQueueMessage(ConnectionContext context, final MessageStore destination, final Message message)
+    ListenableFuture<Object> asyncAddQueueMessage(ConnectionContext context, final MessageStore destination, final Message message)
             throws IOException {
 
         if (message.getTransactionId() != null) {
@@ -416,7 +416,7 @@ public class KahaDBTransactionStore implements TransactionStore {
         }
     }
 
-    Future<Object> asyncAddTopicMessage(ConnectionContext context, final MessageStore destination, final Message message)
+    ListenableFuture<Object> asyncAddTopicMessage(ConnectionContext context, final MessageStore destination, final Message message)
             throws IOException {
 
         if (message.getTransactionId() != null) {

http://git-wip-us.apache.org/repos/asf/activemq/blob/ad1f751a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBTransactionStore.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBTransactionStore.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBTransactionStore.java
index a7d09f1..c7ece83 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBTransactionStore.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBTransactionStore.java
@@ -23,7 +23,6 @@ import java.util.HashSet;
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Future;
 
 import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.ConnectionContext;
@@ -71,12 +70,12 @@ public class MultiKahaDBTransactionStore implements TransactionStore {
             }
 
             @Override
-            public Future<Object> asyncAddQueueMessage(ConnectionContext context, Message message) throws IOException {
+            public ListenableFuture<Object> asyncAddQueueMessage(ConnectionContext context, Message message) throws IOException {
                 return MultiKahaDBTransactionStore.this.asyncAddQueueMessage(transactionStore, context, getDelegate(), message);
             }
 
             @Override
-            public Future<Object> asyncAddQueueMessage(ConnectionContext context, Message message, boolean canOptimizeHint) throws IOException {
+            public ListenableFuture<Object> asyncAddQueueMessage(ConnectionContext context, Message message, boolean canOptimizeHint) throws IOException {
                 return MultiKahaDBTransactionStore.this.asyncAddQueueMessage(transactionStore, context, getDelegate(), message);
             }
 
@@ -105,12 +104,12 @@ public class MultiKahaDBTransactionStore implements TransactionStore {
             }
 
             @Override
-            public Future<Object> asyncAddTopicMessage(ConnectionContext context, Message message, boolean canOptimizeHint) throws IOException {
+            public ListenableFuture<Object> asyncAddTopicMessage(ConnectionContext context, Message message, boolean canOptimizeHint) throws IOException {
                 return MultiKahaDBTransactionStore.this.asyncAddTopicMessage(transactionStore, context, getDelegate(), message);
             }
 
             @Override
-            public Future<Object> asyncAddTopicMessage(ConnectionContext context, Message message) throws IOException {
+            public ListenableFuture<Object> asyncAddTopicMessage(ConnectionContext context, Message message) throws IOException {
                 return MultiKahaDBTransactionStore.this.asyncAddTopicMessage(transactionStore, context, getDelegate(), message);
             }
 
@@ -384,7 +383,7 @@ public class MultiKahaDBTransactionStore implements TransactionStore {
         destination.addMessage(context, message);
     }
 
-    Future<Object> asyncAddQueueMessage(final TransactionStore transactionStore, ConnectionContext context, final MessageStore destination, final Message message)
+    ListenableFuture<Object> asyncAddQueueMessage(final TransactionStore transactionStore, ConnectionContext context, final MessageStore destination, final Message message)
             throws IOException {
         if (message.getTransactionId() != null) {
             getTx(message.getTransactionId()).trackStore(transactionStore);
@@ -395,7 +394,7 @@ public class MultiKahaDBTransactionStore implements TransactionStore {
         }
     }
 
-    Future<Object> asyncAddTopicMessage(final TransactionStore transactionStore, ConnectionContext context, final MessageStore destination, final Message message)
+    ListenableFuture<Object> asyncAddTopicMessage(final TransactionStore transactionStore, ConnectionContext context, final MessageStore destination, final Message message)
             throws IOException {
 
         if (message.getTransactionId() != null) {

http://git-wip-us.apache.org/repos/asf/activemq/blob/ad1f751a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
----------------------------------------------------------------------
diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
index b02cf0f..775b99a 100644
--- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
+++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
@@ -22,7 +22,7 @@ import org.fusesource.hawtdispatch.BaseRetained
 import java.util.concurrent._
 import atomic._
 import org.fusesource.hawtbuf.Buffer
-import org.apache.activemq.store.MessageRecoveryListener
+import org.apache.activemq.store.{ListenableFuture, MessageRecoveryListener}
 import java.lang.ref.WeakReference
 import scala.Option._
 import org.fusesource.hawtbuf.Buffer._
@@ -97,12 +97,13 @@ object UowCompleted extends UowState {
  *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-class CountDownFuture[T <: AnyRef]() extends java.util.concurrent.Future[T] {
+class CountDownFuture[T <: AnyRef]() extends ListenableFuture[T] {
 
   private val latch:CountDownLatch=new CountDownLatch(1)
   @volatile
   var value:T = _
   var error:Throwable = _
+  var listener:Runnable = _
 
   def cancel(mayInterruptIfRunning: Boolean) = false
   def isCancelled = false
@@ -115,10 +116,12 @@ class CountDownFuture[T <: AnyRef]() extends java.util.concurrent.Future[T] {
   def set(v:T) = {
     value = v
     latch.countDown()
+    fireListener
   }
   def failed(v:Throwable) = {
     error = v
     latch.countDown()
+    fireListener
   }
 
   def get() = {
@@ -141,6 +144,25 @@ class CountDownFuture[T <: AnyRef]() extends java.util.concurrent.Future[T] {
   }
 
   def isDone = latch.await(0, TimeUnit.SECONDS);
+
+  /**
+   * Runs the registered listener at most once. The listener is claimed and
+   * cleared atomically so that a completion (set/failed) racing with
+   * addListener cannot invoke it twice. Anything thrown by the callback is
+   * logged as a warning and not propagated.
+   */
+  def fireListener = {
+    val callback = this.synchronized {
+      val registered = listener
+      listener = null
+      registered
+    }
+    // Invoke outside the lock: the callback is foreign code.
+    if (callback != null) {
+      try {
+        callback.run()
+      } catch {
+        case e : Throwable => {
+          LevelDBStore.warn(e, "unexpected exception on future listener " +callback)
+        }
+      }
+    }
+  }
+
+  /**
+   * Registers the callback to run when this future completes, replacing any
+   * callback that has not fired yet. If the future is already done the
+   * callback runs immediately on the calling thread.
+   */
+  def addListener(l: Runnable) = {
+    // Publish under the monitor so that a racing completion is guaranteed to
+    // observe the registration in fireListener.
+    this.synchronized {
+      listener = l
+    }
+    if (isDone) {
+      fireListener
+    }
+  }
 }
 
 object UowManagerConstants {

http://git-wip-us.apache.org/repos/asf/activemq/blob/ad1f751a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
----------------------------------------------------------------------
diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
index 9256bb5..d1f8f6b 100644
--- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
+++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
@@ -47,8 +47,7 @@ object LevelDBStore extends Log {
       }
   })
 
-  val DONE = new CountDownFuture[AnyRef]();
-  DONE.set(null)
+  val DONE = new InlineListenableFuture;
 
   def toIOException(e: Throwable): IOException = {
     if (e.isInstanceOf[ExecutionException]) {
@@ -681,7 +680,7 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
     }
 
     override def asyncAddQueueMessage(context: ConnectionContext, message: Message) = asyncAddQueueMessage(context, message, false)
-    override def asyncAddQueueMessage(context: ConnectionContext, message: Message, delay: Boolean): Future[AnyRef] = {
+    override def asyncAddQueueMessage(context: ConnectionContext, message: Message, delay: Boolean): ListenableFuture[AnyRef] = {
       check_running
       message.getMessageId.setEntryLocator(null)
       if(  message.getTransactionId!=null ) {
@@ -800,7 +799,7 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
 
     def subscription_with_key(key:Long) = subscriptions.find(_._2.subKey == key).map(_._2)
 
-    override def asyncAddQueueMessage(context: ConnectionContext, message: Message, delay: Boolean): Future[AnyRef] = {
+    override def asyncAddQueueMessage(context: ConnectionContext, message: Message, delay: Boolean): ListenableFuture[AnyRef] = {
       super.asyncAddQueueMessage(context, message, false)
     }
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/ad1f751a/activemq-unit-tests/src/test/java/org/apache/activemq/JmsMultipleClientsTestSupport.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/JmsMultipleClientsTestSupport.java b/activemq-unit-tests/src/test/java/org/apache/activemq/JmsMultipleClientsTestSupport.java
index aa92926..5eaab8d 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/JmsMultipleClientsTestSupport.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/JmsMultipleClientsTestSupport.java
@@ -312,6 +312,12 @@ public class JmsMultipleClientsTestSupport {
         return currentTestName;
     }
 
+    /**
+     * Asserts that the memory usage of the test destination currently reads
+     * zero percent.
+     *
+     * @throws Exception propagated from the broker/destination lookup
+     */
+    public void assertDestinationMemoryUsageGoesToZero() throws Exception {
+        final int percentUsage = TestSupport.getDestination(broker, ActiveMQDestination.transform(destination))
+                .getMemoryUsage()
+                .getPercentUsage();
+        assertEquals("destination memory is back to 0", 0, percentUsage);
+    }
+
+
 
     /*
      * This is copied from AutoFailTestSupport.  We may want to move it to someplace where more

http://git-wip-us.apache.org/repos/asf/activemq/blob/ad1f751a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/QueueSubscriptionTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/QueueSubscriptionTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/QueueSubscriptionTest.java
index 625ed92..6c3dc15 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/QueueSubscriptionTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/QueueSubscriptionTest.java
@@ -56,6 +56,7 @@ public class QueueSubscriptionTest extends JmsMultipleClientsTestSupport {
         doMultipleClientsTest();
 
         assertTotalMessagesReceived(messageCount * producerCount);
+        assertDestinationMemoryUsageGoesToZero();
     }
 
     @Test(timeout = 60 * 1000)
@@ -69,6 +70,7 @@ public class QueueSubscriptionTest extends JmsMultipleClientsTestSupport {
         doMultipleClientsTest();
 
         assertTotalMessagesReceived(messageCount * producerCount);
+        assertDestinationMemoryUsageGoesToZero();
     }
 
     @Test(timeout = 60 * 1000)
@@ -82,6 +84,7 @@ public class QueueSubscriptionTest extends JmsMultipleClientsTestSupport {
         doMultipleClientsTest();
 
         assertTotalMessagesReceived(messageCount * producerCount);
+        assertDestinationMemoryUsageGoesToZero();
     }
 
     @Test(timeout = 2 * 60 * 1000)
@@ -95,6 +98,7 @@ public class QueueSubscriptionTest extends JmsMultipleClientsTestSupport {
         doMultipleClientsTest();
 
         assertTotalMessagesReceived(messageCount * producerCount);
+        assertDestinationMemoryUsageGoesToZero();
     }
 
     @Test(timeout = 60 * 1000)
@@ -108,6 +112,7 @@ public class QueueSubscriptionTest extends JmsMultipleClientsTestSupport {
         doMultipleClientsTest();
 
         assertTotalMessagesReceived(messageCount * producerCount);
+        assertDestinationMemoryUsageGoesToZero();
     }
 
     @Test(timeout = 60 * 1000)
@@ -121,6 +126,7 @@ public class QueueSubscriptionTest extends JmsMultipleClientsTestSupport {
         doMultipleClientsTest();
 
         assertTotalMessagesReceived(messageCount * producerCount);
+        assertDestinationMemoryUsageGoesToZero();
     }
 
     @Test(timeout = 60 * 1000)
@@ -134,6 +140,7 @@ public class QueueSubscriptionTest extends JmsMultipleClientsTestSupport {
         doMultipleClientsTest();
 
         assertTotalMessagesReceived(messageCount * producerCount);
+        assertDestinationMemoryUsageGoesToZero();
     }
 
     @Test(timeout = 2 * 60 * 1000)
@@ -147,6 +154,7 @@ public class QueueSubscriptionTest extends JmsMultipleClientsTestSupport {
         doMultipleClientsTest();
 
         assertTotalMessagesReceived(messageCount * producerCount);
+        assertDestinationMemoryUsageGoesToZero();
     }
 
     protected void configurePrefetchOfOne() {

http://git-wip-us.apache.org/repos/asf/activemq/blob/ad1f751a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/TopicSubscriptionTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/TopicSubscriptionTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/TopicSubscriptionTest.java
index 2c530a7..61ba79c 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/TopicSubscriptionTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/TopicSubscriptionTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.broker;
 
+import java.util.concurrent.TimeUnit;
 import org.apache.activemq.TestSupport;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.util.ThreadTracker;
@@ -155,10 +156,5 @@ public class TopicSubscriptionTest extends QueueSubscriptionTest {
         assertTotalMessagesReceived(messageCount * producerCount * consumerCount);
         assertDestinationMemoryUsageGoesToZero();
     }
-    
-    private void assertDestinationMemoryUsageGoesToZero() throws Exception {
-        assertEquals("destination memory is back to 0", 0, 
-                TestSupport.getDestination(broker, ActiveMQDestination.transform(destination)).getMemoryUsage().getPercentUsage());
-    }
 
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/ad1f751a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualDestPerfTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualDestPerfTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualDestPerfTest.java
index 1f80473..b822f5d 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualDestPerfTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualDestPerfTest.java
@@ -20,12 +20,18 @@ package org.apache.activemq.broker.virtual;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.LinkedHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 import javax.jms.Connection;
 import javax.jms.DeliveryMode;
 import javax.jms.MessageProducer;
 import javax.jms.Session;
+import javax.management.ObjectName;
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.jmx.QueueViewMBean;
 import org.apache.activemq.broker.region.DestinationInterceptor;
 import org.apache.activemq.broker.region.policy.PolicyEntry;
 import org.apache.activemq.broker.region.policy.PolicyMap;
@@ -33,6 +39,7 @@ import org.apache.activemq.broker.region.virtual.CompositeTopic;
 import org.apache.activemq.broker.region.virtual.VirtualDestination;
 import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor;
 import org.apache.activemq.command.ActiveMQBytesMessage;
+import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
@@ -43,14 +50,73 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+
 public class VirtualDestPerfTest {
 
     private static final Logger LOG = LoggerFactory.getLogger(VirtualDestPerfTest.class);
+    public int messageSize = 5*1024;
+    public int messageCount = 10000;
     ActiveMQTopic target = new ActiveMQTopic("target");
     BrokerService brokerService;
     ActiveMQConnectionFactory connectionFactory;
 
     @Test
+    @Ignore("comparison test - 'new' no wait on future with async send broker side is always on")
+    public void testAsyncSendBurstToFillCache() throws Exception {
+        startBroker(4, true, true);
+        connectionFactory.setUseAsyncSend(true);
+
+        // a burst of messages to fill the cache
+        messageCount = 22000;
+        messageSize = 10*1024;
+
+        LinkedHashMap<Integer, Long> results = new LinkedHashMap<Integer, Long>();
+
+        final ActiveMQQueue queue = new ActiveMQQueue("targetQ");
+        for (Integer numThreads : new Integer[]{1, 2}) {
+            ExecutorService executor = Executors.newFixedThreadPool(numThreads);
+            final AtomicLong numMessagesToSend = new AtomicLong(messageCount);
+            purge();
+            long startTime = System.currentTimeMillis();
+            for (int i=0;i<numThreads;i++) {
+                executor.execute(new Runnable(){
+                    @Override
+                    public void run() {
+                        try {
+                            produceMessages(numMessagesToSend, queue);
+                        } catch (Exception e) {
+                            e.printStackTrace();
+                        }
+                    }
+                });
+            }
+            executor.shutdown();
+            executor.awaitTermination(5, TimeUnit.MINUTES);
+            long endTime = System.currentTimeMillis();
+            long seconds = (endTime - startTime) / 1000;
+            LOG.info("For numThreads {} duration {}", numThreads.intValue(), seconds);
+            results.put(numThreads, seconds);
+            LOG.info("Broker got {} messages", brokerService.getAdminView().getTotalEnqueueCount());
+        }
+
+        brokerService.stop();
+        brokerService.waitUntilStopped();
+        LOG.info("Results: {}", results);
+    }
+
+    private void purge() throws Exception {
+        ObjectName[] queues = brokerService.getAdminView().getQueues();
+        if (queues.length == 1) {
+            QueueViewMBean queueViewMBean = (QueueViewMBean)
+                brokerService.getManagementContext().newProxyInstance(queues[0], QueueViewMBean.class, false);
+            queueViewMBean.purge();
+        }
+    }
+
+    @Test
     @Ignore("comparison test - takes too long and really needs a peek at the graph")
     public void testPerf() throws Exception {
         LinkedHashMap<Integer, Long> resultsT = new LinkedHashMap<Integer, Long>();
@@ -58,10 +124,10 @@ public class VirtualDestPerfTest {
 
         for (int i=2;i<11;i++) {
             for (Boolean concurrent : new Boolean[]{true, false}) {
-                startBroker(i, concurrent);
+                startBroker(i, concurrent, false);
 
                 long startTime = System.currentTimeMillis();
-                produceMessages();
+                produceMessages(new AtomicLong(messageCount), target);
                 long endTime = System.currentTimeMillis();
                 long seconds = (endTime - startTime) / 1000;
                 LOG.info("For routes {} duration {}", i, seconds);
@@ -89,20 +155,20 @@ public class VirtualDestPerfTest {
         return set.toString().replace(",","%0D%0A").replace("[","").replace("]","").replace(" ", "");
     }
 
-
-    protected void produceMessages() throws Exception {
+    protected void produceMessages(AtomicLong messageCount, ActiveMQDestination destination) throws Exception {
+        final ByteSequence payLoad = new ByteSequence(new byte[messageSize]);
         Connection connection = connectionFactory.createConnection();
-        MessageProducer messageProducer = connection.createSession(false, Session.AUTO_ACKNOWLEDGE).createProducer(target);
+        MessageProducer messageProducer = connection.createSession(false, Session.AUTO_ACKNOWLEDGE).createProducer(destination);
         messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
         ActiveMQBytesMessage message = new ActiveMQBytesMessage();
-        message.setContent(new ByteSequence(new byte[5*1024]));
-        for (int i=0; i<10000; i++) {
+        message.setContent(payLoad);
+        while (messageCount.decrementAndGet() >= 0) {
             messageProducer.send(message);
         }
         connection.close();
     }
 
-    private void startBroker(int fanoutCount, boolean concurrentSend) throws Exception {
+    private void startBroker(int fanoutCount, boolean concurrentSend, boolean concurrentStoreAndDispatchQueues) throws Exception {
         brokerService = new BrokerService();
         brokerService.setDeleteAllMessagesOnStartup(true);
         brokerService.setUseVirtualTopics(true);
@@ -111,6 +177,8 @@ public class VirtualDestPerfTest {
         PolicyMap destPolicyMap = new PolicyMap();
         PolicyEntry defaultEntry = new PolicyEntry();
         defaultEntry.setExpireMessagesPeriod(0);
+        defaultEntry.setOptimizedDispatch(true);
+        defaultEntry.setCursorMemoryHighWaterMark(110);
         destPolicyMap.setDefaultEntry(defaultEntry);
         brokerService.setDestinationPolicy(destPolicyMap);
 
@@ -129,13 +197,13 @@ public class VirtualDestPerfTest {
         brokerService.start();
 
         connectionFactory = new ActiveMQConnectionFactory(brokerService.getTransportConnectors().get(0).getPublishableConnectString());
-        connectionFactory.setUseAsyncSend(false);
+        connectionFactory.setWatchTopicAdvisories(false);
         if (brokerService.getPersistenceAdapter() instanceof KahaDBPersistenceAdapter) {
 
             // with parallel sends and no consumers, concurrentStoreAndDispatch, which uses a single thread by default,
             // will stop/impede write batching. The num threads will need tweaking when consumers are in the mix but may introduce
             // order issues
-            ((KahaDBPersistenceAdapter)brokerService.getPersistenceAdapter()).setConcurrentStoreAndDispatchQueues(false);
+            ((KahaDBPersistenceAdapter)brokerService.getPersistenceAdapter()).setConcurrentStoreAndDispatchQueues(concurrentStoreAndDispatchQueues);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/ad1f751a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueBrowsingTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueBrowsingTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueBrowsingTest.java
index b0c2b50..2c54455 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueBrowsingTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueBrowsingTest.java
@@ -180,7 +180,7 @@ public class QueueBrowsingTest {
 
     @Test
     public void testMemoryLimit() throws Exception {
-        broker.getSystemUsage().getMemoryUsage().setLimit(10 * 1024);
+        broker.getSystemUsage().getMemoryUsage().setLimit(16 * 1024);
 
         int messageToSend = 370;
 


Mime
View raw message