activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [1/2] activemq-artemis git commit: ARTEMIS-1322 ServerLocator option to ignore topology for load-balancing
Date Fri, 04 Aug 2017 20:27:05 GMT
Repository: activemq-artemis
Updated Branches:
  refs/heads/master 64783c250 -> f12116d5a


ARTEMIS-1322 ServerLocator option to ignore topology for load-balancing


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/b6b8fa41
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/b6b8fa41
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/b6b8fa41

Branch: refs/heads/master
Commit: b6b8fa411f4dcf9e6ac8248489579dac8d931cb3
Parents: 64783c2
Author: Justin Bertram <jbertram@apache.org>
Authored: Fri Aug 4 13:22:43 2017 -0500
Committer: Justin Bertram <jbertram@apache.org>
Committed: Fri Aug 4 13:56:30 2017 -0500

----------------------------------------------------------------------
 .../artemis/api/core/client/ActiveMQClient.java |  2 +
 .../artemis/api/core/client/ServerLocator.java  | 10 +++
 .../core/client/impl/ServerLocatorImpl.java     | 20 +++++-
 .../artemis/tests/util/ActiveMQTestBase.java    |  1 +
 .../distribution/NettySymmetricClusterTest.java | 69 ++++++++++++++++++++
 5 files changed, 99 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b6b8fa41/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ActiveMQClient.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ActiveMQClient.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ActiveMQClient.java
index 8fba747..caa2a39 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ActiveMQClient.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ActiveMQClient.java
@@ -133,6 +133,8 @@ public final class ActiveMQClient {
 
    public static final String DEFAULT_CORE_PROTOCOL = "CORE";
 
+   public static final boolean DEFAULT_USE_TOPOLOGY_FOR_LOADBALANCING = true;
+
    public static final String THREAD_POOL_MAX_SIZE_PROPERTY_KEY = "activemq.artemis.client.global.thread.pool.max.size";
 
    public static final String SCHEDULED_THREAD_POOL_SIZE_PROPERTY_KEY = "activemq.artemis.client.global.scheduled.thread.pool.core.size";

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b6b8fa41/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ServerLocator.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ServerLocator.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ServerLocator.java
index 69e287c..04bd1f6 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ServerLocator.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ServerLocator.java
@@ -729,6 +729,16 @@ public interface ServerLocator extends AutoCloseable {
    void close();
 
    /**
+    *
+    *
+    * @param useTopologyForLoadBalancing
+    * @return
+    */
+   ServerLocator setUseTopologyForLoadBalancing(boolean useTopologyForLoadBalancing);
+
+   boolean getUseTopologyForLoadBalancing();
+
+   /**
     * Exposes the Topology used by this ServerLocator.
     *
     * @return topology

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b6b8fa41/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java
index c27d3c3..adae8f7 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java
@@ -207,6 +207,8 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
 
    private TransportConfiguration clusterTransportConfiguration;
 
+   private boolean useTopologyForLoadBalancing;
+
    private final Exception traceException = new Exception();
 
    // To be called when there are ServerLocator being finalized.
@@ -393,6 +395,8 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
       compressLargeMessage = ActiveMQClient.DEFAULT_COMPRESS_LARGE_MESSAGES;
 
       clusterConnection = false;
+
+      useTopologyForLoadBalancing = ActiveMQClient.DEFAULT_USE_TOPOLOGY_FOR_LOADBALANCING;
    }
 
    public static ServerLocator newLocator(String uri) {
@@ -524,6 +528,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
       groupID = locator.groupID;
       nodeID = locator.nodeID;
       clusterTransportConfiguration = locator.clusterTransportConfiguration;
+      useTopologyForLoadBalancing = locator.useTopologyForLoadBalancing;
    }
 
    private TransportConfiguration selectConnector() {
@@ -534,8 +539,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
       }
 
       synchronized (this) {
-         // if the topologyArray is null, we will use the initialConnectors
-         if (usedTopology != null) {
+         if (usedTopology != null && useTopologyForLoadBalancing) {
             if (logger.isTraceEnabled()) {
                logger.trace("Selecting connector from topology.");
             }
@@ -544,7 +548,6 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
 
             return pair.getA();
          } else {
-            // Get from initialconnectors
             if (logger.isTraceEnabled()) {
                logger.trace("Selecting connector from initial connectors.");
             }
@@ -1565,6 +1568,17 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
    }
 
    @Override
+   public ServerLocator setUseTopologyForLoadBalancing(boolean useTopologyForLoadBalancing) {
+      this.useTopologyForLoadBalancing = useTopologyForLoadBalancing;
+      return this;
+   }
+
+   @Override
+   public boolean getUseTopologyForLoadBalancing() {
+      return useTopologyForLoadBalancing;
+   }
+
+   @Override
    public Topology getTopology() {
       return topology;
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b6b8fa41/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
index 736ded1..6e396b7 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
@@ -1138,6 +1138,7 @@ public abstract class ActiveMQTestBase extends Assert {
          for (TopologyMemberImpl member : topology.getMembers()) {
             if (member.getLive() != null) {
                liveNodesCount++;
+               ActiveMQServerLogger.LOGGER.info("Found live server connected to " + server.getNodeID());
             }
             if (member.getBackup() != null) {
                backupNodesCount++;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b6b8fa41/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/NettySymmetricClusterTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/NettySymmetricClusterTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/NettySymmetricClusterTest.java
index cc3a266..e7b8294 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/NettySymmetricClusterTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/NettySymmetricClusterTest.java
@@ -16,10 +16,79 @@
  */
 package org.apache.activemq.artemis.tests.integration.cluster.distribution;
 
+import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
+import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
+import org.apache.activemq.artemis.api.core.client.ServerLocator;
+import org.junit.Test;
+
 public class NettySymmetricClusterTest extends SymmetricClusterTest {
 
    @Override
    protected boolean isNetty() {
       return true;
    }
+
+   @Test
+   public void testConnectionLoadBalancingUsingInitialConnectors() throws Exception {
+      final String ADDRESS = "queues.testaddress";
+      final String QUEUE = "queue0";
+      final String URL = "(tcp://localhost:61616,tcp://localhost:61617)?useTopologyForLoadBalancing=false";
+      final int CONNECTION_COUNT = 50;
+
+      setupCluster();
+
+      startServers();
+
+      for (int i = 0; i < 5; i++) {
+         setupSessionFactory(i, isNetty());
+      }
+
+      for (int i = 0; i < 5; i++) {
+         createQueue(i, ADDRESS, QUEUE, null, false);
+      }
+
+      for (int i = 0; i < 5; i++) {
+         addConsumer(i, i, QUEUE, null);
+      }
+
+      for (int i = 0; i < 5; i++) {
+         waitForBindings(i, ADDRESS, 1, 1, true);
+      }
+
+      for (int i = 0; i < 5; i++) {
+         waitForBindings(i, ADDRESS, 4, 4, false);
+      }
+
+      int[] baseline = new int[5];
+      for (int i = 0; i < 5; i++) {
+         baseline[i] = servers[i].getActiveMQServerControl().getConnectionCount();
+      }
+
+      ClientSessionFactory[] clientSessionFactories = new ClientSessionFactory[CONNECTION_COUNT];
+      ServerLocator locator = ActiveMQClient.createServerLocator(URL);
+      for (int i = 0; i < CONNECTION_COUNT; i++) {
+         clientSessionFactories[i] = addSessionFactory(locator.createSessionFactory());
+      }
+
+      /**
+       * Since we are only using the initial connectors to load-balance then all the connections should be on the first 2 nodes.
+       * Note: This still uses the load-balancing-policy so this would changed if we used the random one instead of the default
+       * round-robin one.
+       */
+      assertEquals(CONNECTION_COUNT / 2, (servers[0].getActiveMQServerControl().getConnectionCount() - baseline[0]));
+      assertEquals(CONNECTION_COUNT / 2, (servers[1].getActiveMQServerControl().getConnectionCount() - baseline[1]));
+
+      for (int i = 0; i < CONNECTION_COUNT; i++) {
+         clientSessionFactories[i].close();
+      }
+
+      locator.setUseTopologyForLoadBalancing(true);
+      for (int i = 0; i < CONNECTION_COUNT; i++) {
+         clientSessionFactories[i] = addSessionFactory(locator.createSessionFactory());
+      }
+
+      for (int i = 0; i < 5; i++) {
+         assertTrue((servers[i].getActiveMQServerControl().getConnectionCount() - baseline[i]) < (CONNECTION_COUNT / 2));
+      }
+   }
 }


Mime
View raw message