activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [1/2] activemq-artemis git commit: ARTEMIS-1902 Ensure ServerConsumer close done once
Date Mon, 04 Jun 2018 13:50:09 GMT
Repository: activemq-artemis
Updated Branches:
  refs/heads/master b3354a0cb -> 2baf37756


ARTEMIS-1902 Ensure ServerConsumer close done once

Calling close multiple times on ServerConsumer can result in multiple
notifications being routed around the cluster.  This causes cluster
topology info to become skewed.  Which affects a number of components
such as message redistribution, metrics and can eventually cause OOM
should multiple queues be redistributing at the same time.


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/dde60b13
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/dde60b13
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/dde60b13

Branch: refs/heads/master
Commit: dde60b136a8ab2922bf9455abe6a78580ea0c442
Parents: b3354a0
Author: Martyn Taylor <mtaylor@redhat.com>
Authored: Fri Jun 1 16:33:18 2018 +0100
Committer: Clebert Suconic <clebertsuconic@apache.org>
Committed: Mon Jun 4 09:33:26 2018 -0400

----------------------------------------------------------------------
 .../artemis/core/postoffice/QueueInfo.java      | 22 +++--
 .../core/postoffice/impl/PostOfficeImpl.java    |  3 +-
 .../core/server/ActiveMQServerLogger.java       |  4 +
 .../core/server/impl/ServerConsumerImpl.java    |  9 +-
 .../ProtocolsMessageLoadBalancingTest.java      | 92 ++++++++++++++++----
 5 files changed, 108 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dde60b13/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/QueueInfo.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/QueueInfo.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/QueueInfo.java
index a953fc2..581d648 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/QueueInfo.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/QueueInfo.java
@@ -18,9 +18,11 @@ package org.apache.activemq.artemis.core.postoffice;
 
 import java.io.Serializable;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
+import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 
 public class QueueInfo implements Serializable {
 
@@ -38,7 +40,9 @@ public class QueueInfo implements Serializable {
 
    private List<SimpleString> filterStrings;
 
-   private int numberOfConsumers;
+   private volatile int consumersCount = 0;
+
+   private static final AtomicIntegerFieldUpdater<QueueInfo> consumerUpdater = AtomicIntegerFieldUpdater.newUpdater(QueueInfo.class,
"consumersCount");
 
    private final int distance;
 
@@ -99,15 +103,23 @@ public class QueueInfo implements Serializable {
    }
 
    public int getNumberOfConsumers() {
-      return numberOfConsumers;
+      return consumerUpdater.get(this);
    }
 
    public void incrementConsumers() {
-      numberOfConsumers++;
+      consumerUpdater.incrementAndGet(this);
    }
 
    public void decrementConsumers() {
-      numberOfConsumers--;
+
+      consumerUpdater.getAndUpdate(this, value -> {
+         if (value > 0) {
+            return value--;
+         } else {
+            ActiveMQServerLogger.LOGGER.consumerCountError("Tried to decrement consumer count
below 0: " + this);
+            return value;
+         }
+      });
    }
 
    public boolean matchesAddress(SimpleString address) {
@@ -144,7 +156,7 @@ public class QueueInfo implements Serializable {
          ", filterStrings=" +
          filterStrings +
          ", numberOfConsumers=" +
-         numberOfConsumers +
+         consumersCount +
          ", distance=" +
          distance +
          "]";

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dde60b13/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
index 0e21da2..5fbb2d8 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
@@ -376,7 +376,8 @@ public class PostOfficeImpl implements PostOffice, NotificationListener,
Binding
                   filterStrings.remove(filterString);
                }
 
-               if (info.getNumberOfConsumers() == 0) {
+               // The consumer count should never be < 0 but we should catch here just
in case.
+               if (info.getNumberOfConsumers() <= 0) {
                   if (!props.containsProperty(ManagementHelper.HDR_DISTANCE)) {
                      logger.debug("PostOffice notification / CONSUMER_CLOSED: HDR_DISTANCE
not defined");
                      return;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dde60b13/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
index 677f1ac..1011196 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
@@ -1946,4 +1946,8 @@ public interface ActiveMQServerLogger extends BasicLogger {
    @LogMessage(level = Logger.Level.TRACE)
    @Message(id = 224094, value = "Quorum vote result await is interrupted", format = Message.Format.MESSAGE_FORMAT)
    void quorumVoteAwaitInterrupted();
+
+   @LogMessage(level = Logger.Level.ERROR)
+   @Message(id = 224095, value = "Error updating Consumer Count: {0}", format = Message.Format.MESSAGE_FORMAT)
+   void consumerCountError(String reason);
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dde60b13/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
index c81105a..22bfdaf 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
@@ -157,6 +157,8 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener
{
 
    private boolean anycast = false;
 
+   private boolean isClosed = false;
+
    // Constructors ---------------------------------------------------------------------------------
 
    public ServerConsumerImpl(final long id,
@@ -477,7 +479,12 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener
{
    }
 
    @Override
-   public void close(final boolean failed) throws Exception {
+   public synchronized void close(final boolean failed) throws Exception {
+
+      // Close should only ever be done once per consumer.
+      if (isClosed) return;
+      isClosed = true;
+
       if (logger.isTraceEnabled()) {
          logger.trace("ServerConsumerImpl::" + this + " being closed with failed=" + failed,
new Exception("trace"));
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dde60b13/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/crossprotocol/ProtocolsMessageLoadBalancingTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/crossprotocol/ProtocolsMessageLoadBalancingTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/crossprotocol/ProtocolsMessageLoadBalancingTest.java
index f1d0906..1c98157 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/crossprotocol/ProtocolsMessageLoadBalancingTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/crossprotocol/ProtocolsMessageLoadBalancingTest.java
@@ -29,6 +29,7 @@ import java.util.Collection;
 import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
+import org.apache.activemq.artemis.core.protocol.openwire.OpenWireProtocolManagerFactory;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
 import org.apache.activemq.artemis.core.server.cluster.MessageFlowRecord;
@@ -51,15 +52,13 @@ public class ProtocolsMessageLoadBalancingTest extends ClusterTestBase
{
    private static final int NUMBER_OF_SERVERS = 2;
    private static final SimpleString queueName = SimpleString.toSimpleString("queues.0");
 
-
    // I'm taking any number that /2 = Odd
    // to avoid perfect roundings and making sure messages are evenly distributed
    private static final int NUMBER_OF_MESSAGES = 77 * 2;
 
-
    @Parameterized.Parameters(name = "protocol={0}")
    public static Collection getParameters() {
-      return Arrays.asList(new Object[][]{{"AMQP"}, {"CORE"}});
+      return Arrays.asList(new Object[][]{{"AMQP"}, {"CORE"}, {"OPENWIRE"}});
    }
 
    @Parameterized.Parameter(0)
@@ -103,14 +102,19 @@ public class ProtocolsMessageLoadBalancingTest extends ClusterTestBase
{
    private ConnectionFactory getJmsConnectionFactory(int node) {
       if (protocol.equals("AMQP")) {
          return new JmsConnectionFactory("amqp://localhost:" + (61616 + node));
-      } else {
+      } else if (protocol.equals("OPENWIRE")) {
+         return new org.apache.activemq.ActiveMQConnectionFactory("tcp://localhost:" + (61616
+ node));
+      } else if (protocol.equals("CORE")) {
          return new ActiveMQConnectionFactory("tcp://localhost:" + (61616 + node));
+      } else {
+         Assert.fail("Protocol " + protocol + " unkown");
+         return null;
       }
    }
 
    private void pauseClusteringBridges(ActiveMQServer server) throws Exception {
       for (ClusterConnection clusterConnection : server.getClusterManager().getClusterConnections())
{
-         for (MessageFlowRecord record : ((ClusterConnectionImpl)clusterConnection).getRecords().values())
{
+         for (MessageFlowRecord record : ((ClusterConnectionImpl) clusterConnection).getRecords().values())
{
             record.getBridge().pause();
          }
       }
@@ -123,8 +127,8 @@ public class ProtocolsMessageLoadBalancingTest extends ClusterTestBase
{
 
       ConnectionFactory[] factory = new ConnectionFactory[NUMBER_OF_SERVERS];
       Connection[] connection = new Connection[NUMBER_OF_SERVERS];
-      Session[]  session = new Session[NUMBER_OF_SERVERS];
-      MessageConsumer[]  consumer = new MessageConsumer[NUMBER_OF_SERVERS];
+      Session[] session = new Session[NUMBER_OF_SERVERS];
+      MessageConsumer[] consumer = new MessageConsumer[NUMBER_OF_SERVERS];
 
       // this will pre create consumers to make sure messages are distributed evenly without
redistribution
       for (int node = 0; node < NUMBER_OF_SERVERS; node++) {
@@ -142,10 +146,9 @@ public class ProtocolsMessageLoadBalancingTest extends ClusterTestBase
{
 
       pauseClusteringBridges(servers[0]);
 
-
       // sending Messages.. they should be load balanced
       {
-         ConnectionFactory cf =  getJmsConnectionFactory(0);
+         ConnectionFactory cf = getJmsConnectionFactory(0);
          Connection cn = cf.createConnection();
          Session sn = cn.createSession(false, Session.AUTO_ACKNOWLEDGE);
          MessageProducer pd = sn.createProducer(sn.createQueue(queueName.toString()));
@@ -180,23 +183,82 @@ public class ProtocolsMessageLoadBalancingTest extends ClusterTestBase
{
    }
 
    @Test
-   public void testExpireRedistributed() throws Exception {
-
+   public void testRedistributionStoppedWithNoRemoteConsumers() throws Exception {
       startServers(MessageLoadBalancingType.ON_DEMAND);
 
       ConnectionFactory factory = getJmsConnectionFactory(1);
 
-
+      // Wait for cluster nodes to sync
       waitForBindings(0, "queues.0", 1, 0, true);
       waitForBindings(1, "queues.0", 1, 0, true);
 
       waitForBindings(0, "queues.0", 1, 0, false);
       waitForBindings(1, "queues.0", 1, 0, false);
 
+      // Create CFs
+      ConnectionFactory cf0 = getJmsConnectionFactory(0);
+      ConnectionFactory cf1 = getJmsConnectionFactory(1);
+
+      // Create Consumers
+      Connection cn0 = cf0.createConnection();
+      Session sn0 = cn0.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      MessageConsumer c0 = sn0.createConsumer(sn0.createQueue(queueName.toString()));
+      cn0.start();
+
+      Connection cn1 = cf1.createConnection();
+      Session sn1 = cn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      MessageConsumer c1 = sn1.createConsumer(sn0.createQueue(queueName.toString()));
+      cn1.start();
+
+      MessageProducer pd = sn0.createProducer(sn0.createQueue(queueName.toString()));
+
+      // Wait for cluster nodes to sync consumer count
+      waitForBindings(0, "queues.0", 1, 1, false);
+      waitForBindings(1, "queues.0", 1, 1, false);
+
+      // Start queue redistributor
+      c0.close();
+
+      // Send Messages to node1.
+      for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
+         pd.send(sn0.createTextMessage("hello " + i));
+      }
+
+      // Ensure the messages are redistributed from node0 to node1.
+      for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
+         assertNotNull(c1.receive(1000));
+      }
+
+      // Close client on node1.  This should make the node0 stop redistributing.
+      c1.close();
+      sn1.close();
+      cn1.close();
+
+      // Send more messages (these should stay in node0)
+      for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
+         pd.send(sn0.createTextMessage("hello " + i));
+      }
+
+      // Messages should stay in node 1 and note get redistributed.
+      assertEquals(NUMBER_OF_MESSAGES, servers[0].locateQueue(queueName).getMessageCount());
+      assertEquals(0, servers[1].locateQueue(queueName).getMessageCount());
+   }
+
+   @Test
+   public void testExpireRedistributed() throws Exception {
+      startServers(MessageLoadBalancingType.ON_DEMAND);
+
+      ConnectionFactory factory = getJmsConnectionFactory(1);
+
+      waitForBindings(0, "queues.0", 1, 0, true);
+      waitForBindings(1, "queues.0", 1, 0, true);
+
+      waitForBindings(0, "queues.0", 1, 0, false);
+      waitForBindings(1, "queues.0", 1, 0, false);
 
       // sending Messages..
       {
-         ConnectionFactory cf =  getJmsConnectionFactory(0);
+         ConnectionFactory cf = getJmsConnectionFactory(0);
          Connection cn = cf.createConnection();
          Session sn = cn.createSession(false, Session.AUTO_ACKNOWLEDGE);
          MessageProducer pd = sn.createProducer(sn.createQueue(queueName.toString()));
@@ -212,7 +274,6 @@ public class ProtocolsMessageLoadBalancingTest extends ClusterTestBase
{
       // time to let stuff expire
       Thread.sleep(200);
 
-
       Connection connection = factory.createConnection();
       Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
       MessageConsumer consumer = session.createConsumer(session.createQueue("queues.expiry"));
@@ -253,6 +314,8 @@ public class ProtocolsMessageLoadBalancingTest extends ClusterTestBase
{
 
       servers[0].addProtocolManagerFactory(new ProtonProtocolManagerFactory());
       servers[1].addProtocolManagerFactory(new ProtonProtocolManagerFactory());
+      servers[0].addProtocolManagerFactory(new OpenWireProtocolManagerFactory());
+      servers[1].addProtocolManagerFactory(new OpenWireProtocolManagerFactory());
    }
 
    protected void stopServers() throws Exception {
@@ -280,5 +343,4 @@ public class ProtocolsMessageLoadBalancingTest extends ClusterTestBase
{
       return configuration;
    }
 
-
 }


Mime
View raw message