rocketmq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vongosl...@apache.org
Subject [rocketmq] branch develop updated: [ISSUE #2165] Slave read enable not work sometimes When cluster deployed on DLedger mode (#2167)
Date Thu, 15 Oct 2020 03:10:56 GMT
This is an automated email from the ASF dual-hosted git repository.

vongosling pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new d5cb67f  [ISSUE #2165] Slave read enable not work sometimes When cluster deployed on DLedger mode (#2167)
d5cb67f is described below

commit d5cb67ff802c5d92ba8b42f0a0ebe94a05eb9965
Author: 张旭 <maixiaohai00@gmail.com>
AuthorDate: Thu Oct 15 11:04:19 2020 +0800

    [ISSUE #2165] Slave read enable not work sometimes When cluster deployed on DLedger mode (#2167)
    
    * [Client] Fix slaveReadEnable=true not work sometimes When cluster deployed on DLedger mode
    
    * [Client] Add unit test for findBrokerAddressInSubscribe
    
    Co-authored-by: zhangxu16 <zhangxu16@xiaomi.com>
---
 .../client/impl/factory/MQClientInstance.java      |  5 +++
 .../client/impl/factory/MQClientInstanceTest.java  | 39 ++++++++++++++++++++++
 2 files changed, 44 insertions(+)

diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
index 48cc188..b5aaeb8 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
@@ -1043,6 +1043,11 @@ public class MQClientInstance {
             slave = brokerId != MixAll.MASTER_ID;
             found = brokerAddr != null;
 
+            if (!found && slave) {
+                brokerAddr = map.get(brokerId + 1);
+                found = brokerAddr != null;
+            }
+
             if (!found && !onlyThisBroker) {
                 Entry<Long, String> entry = map.entrySet().iterator().next();
                 brokerAddr = entry.getValue();
diff --git a/client/src/test/java/org/apache/rocketmq/client/impl/factory/MQClientInstanceTest.java b/client/src/test/java/org/apache/rocketmq/client/impl/factory/MQClientInstanceTest.java
index bb21321..e0506aa 100644
--- a/client/src/test/java/org/apache/rocketmq/client/impl/factory/MQClientInstanceTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/impl/factory/MQClientInstanceTest.java
@@ -19,9 +19,12 @@ package org.apache.rocketmq.client.impl.factory;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import org.apache.rocketmq.client.ClientConfig;
 import org.apache.rocketmq.client.admin.MQAdminExtInner;
 import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.impl.FindBrokerResult;
 import org.apache.rocketmq.client.impl.MQClientManager;
 import org.apache.rocketmq.client.impl.consumer.MQConsumerInner;
 import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
@@ -30,8 +33,10 @@ import org.apache.rocketmq.common.protocol.route.BrokerData;
 import org.apache.rocketmq.common.protocol.route.QueueData;
 import org.apache.rocketmq.common.protocol.route.TopicRouteData;
 import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
+import org.mockito.internal.util.reflection.FieldSetter;
 import org.mockito.junit.MockitoJUnitRunner;
 
 import static org.assertj.core.api.Assertions.assertThat;
@@ -42,6 +47,12 @@ public class MQClientInstanceTest {
     private MQClientInstance mqClientInstance = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig());
     private String topic = "FooBar";
     private String group = "FooBarGroup";
+    private ConcurrentMap<String, HashMap<Long, String>> brokerAddrTable = new ConcurrentHashMap<String, HashMap<Long, String>>();
+
+    @Before
+    public void init() throws Exception {
+        FieldSetter.setField(mqClientInstance, MQClientInstance.class.getDeclaredField("brokerAddrTable"), brokerAddrTable);
+    }
 
     @Test
     public void testTopicRouteData2TopicPublishInfo() {
@@ -75,6 +86,34 @@ public class MQClientInstanceTest {
     }
 
     @Test
+    public void testFindBrokerAddressInSubscribe() {
+        // dledger normal case
+        String brokerName = "BrokerA";
+        HashMap<Long, String> addrMap = new HashMap<Long, String>();
+        addrMap.put(0L, "127.0.0.1:10911");
+        addrMap.put(1L, "127.0.0.1:10912");
+        addrMap.put(2L, "127.0.0.1:10913");
+        brokerAddrTable.put(brokerName, addrMap);
+        long brokerId = 1;
+        FindBrokerResult brokerResult = mqClientInstance.findBrokerAddressInSubscribe(brokerName, brokerId, false);
+        assertThat(brokerResult).isNotNull();
+        assertThat(brokerResult.getBrokerAddr()).isEqualTo("127.0.0.1:10912");
+        assertThat(brokerResult.isSlave()).isTrue();
+
+        // dledger case, when node n0 was voted as the leader
+        brokerName = "BrokerB";
+        HashMap<Long, String> addrMapNew = new HashMap<Long, String>();
+        addrMapNew.put(0L, "127.0.0.1:10911");
+        addrMapNew.put(2L, "127.0.0.1:10912");
+        addrMapNew.put(3L, "127.0.0.1:10913");
+        brokerAddrTable.put(brokerName, addrMapNew);
+        brokerResult = mqClientInstance.findBrokerAddressInSubscribe(brokerName, brokerId, false);
+        assertThat(brokerResult).isNotNull();
+        assertThat(brokerResult.getBrokerAddr()).isEqualTo("127.0.0.1:10912");
+        assertThat(brokerResult.isSlave()).isTrue();
+    }
+
+    @Test
     public void testRegisterProducer() {
         boolean flag = mqClientInstance.registerProducer(group, mock(DefaultMQProducerImpl.class));
         assertThat(flag).isTrue();


Mime
View raw message