rocketmq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jinrongt...@apache.org
Subject [rocketmq] branch develop updated: [ISSUE #3314] Make mqClientApi request timeout settable
Date Sun, 12 Sep 2021 08:24:54 GMT
This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 3604100  [ISSUE #3314] Make mqClientApi request timeout settable
3604100 is described below

commit 36041006aaa92867077849f0dc060d4da295712e
Author: lizhiboo <lizhiboo@yeah.net>
AuthorDate: Sun Sep 12 16:24:37 2021 +0800

    [ISSUE #3314] Make mqClientApi request timeout settable
---
 .../main/java/org/apache/rocketmq/client/ClientConfig.java   | 11 ++++++++++-
 .../rocketmq/client/impl/factory/MQClientInstance.java       | 12 ++++++------
 2 files changed, 16 insertions(+), 7 deletions(-)

diff --git a/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java b/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
index beeeb2f..b2c043e 100644
--- a/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
+++ b/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
@@ -60,6 +60,8 @@ public class ClientConfig {
 
     private boolean useTLS = TlsSystemConfig.tlsEnable;
 
+    private int mqClientApiTimeout = 3 * 1000;
+
     private LanguageCode language = LanguageCode.JAVA;
 
     public String buildMQClientId() {
@@ -298,6 +300,13 @@ public class ClientConfig {
         this.accessChannel = accessChannel;
     }
 
+    public int getMqClientApiTimeout() {
+        return mqClientApiTimeout;
+    }
+
+    public void setMqClientApiTimeout(int mqClientApiTimeout) {
+        this.mqClientApiTimeout = mqClientApiTimeout;
+    }
 
     @Override
     public String toString() {
@@ -305,6 +314,6 @@ public class ClientConfig {
             + ", clientCallbackExecutorThreads=" + clientCallbackExecutorThreads + ", pollNameServerInterval="
+ pollNameServerInterval
             + ", heartbeatBrokerInterval=" + heartbeatBrokerInterval + ", persistConsumerOffsetInterval="
+ persistConsumerOffsetInterval
             + ", pullTimeDelayMillsWhenException=" + pullTimeDelayMillsWhenException + ",
unitMode=" + unitMode + ", unitName=" + unitName + ", vipChannelEnabled="
-            + vipChannelEnabled + ", useTLS=" + useTLS + ", language=" + language.name()
+ ", namespace=" + namespace + "]";
+            + vipChannelEnabled + ", useTLS=" + useTLS + ", language=" + language.name()
+ ", namespace=" + namespace + ", mqClientApiTimeout=" + mqClientApiTimeout + "]";
     }
 }
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
index d30534f..e897d49 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
@@ -447,7 +447,7 @@ public class MQClientInstance {
                 if (addr != null) {
                     try {
                         this.getMQClientAPIImpl().checkClientInBroker(
-                            addr, entry.getKey(), this.clientId, subscriptionData, 3 * 1000
+                            addr, entry.getKey(), this.clientId, subscriptionData, clientConfig.getMqClientApiTimeout()
                         );
                     } catch (Exception e) {
                         if (e instanceof MQClientException) {
@@ -554,7 +554,7 @@ public class MQClientInstance {
                             }
 
                             try {
-                                int version = this.mQClientAPIImpl.sendHearbeat(addr, heartbeatData,
3000);
+                                int version = this.mQClientAPIImpl.sendHearbeat(addr, heartbeatData,
clientConfig.getMqClientApiTimeout());
                                 if (!this.brokerVersionTable.containsKey(brokerName)) {
                                     this.brokerVersionTable.put(brokerName, new HashMap<String,
Integer>(4));
                                 }
@@ -610,7 +610,7 @@ public class MQClientInstance {
                     TopicRouteData topicRouteData;
                     if (isDefault && defaultMQProducer != null) {
                         topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),
-                            1000 * 3);
+                            clientConfig.getMqClientApiTimeout());
                         if (topicRouteData != null) {
                             for (QueueData data : topicRouteData.getQueueDatas()) {
                                 int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(),
data.getReadQueueNums());
@@ -619,7 +619,7 @@ public class MQClientInstance {
                             }
                         }
                     } else {
-                        topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic,
1000 * 3);
+                        topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic,
clientConfig.getMqClientApiTimeout());
                     }
                     if (topicRouteData != null) {
                         TopicRouteData old = this.topicRouteTable.get(topic);
@@ -894,7 +894,7 @@ public class MQClientInstance {
                     String addr = entry1.getValue();
                     if (addr != null) {
                         try {
-                            this.mQClientAPIImpl.unregisterClient(addr, this.clientId, producerGroup,
consumerGroup, 3000);
+                            this.mQClientAPIImpl.unregisterClient(addr, this.clientId, producerGroup,
consumerGroup, clientConfig.getMqClientApiTimeout());
                             log.info("unregister client[Producer: {} Consumer: {}] from broker[{}
{} {}] success", producerGroup, consumerGroup, brokerName, entry1.getKey(), addr);
                         } catch (RemotingException e) {
                             log.error("unregister client exception from broker: " + addr,
e);
@@ -1064,7 +1064,7 @@ public class MQClientInstance {
 
         if (null != brokerAddr) {
             try {
-                return this.mQClientAPIImpl.getConsumerIdListByGroup(brokerAddr, group, 3000);
+                return this.mQClientAPIImpl.getConsumerIdListByGroup(brokerAddr, group, clientConfig.getMqClientApiTimeout());
             } catch (Exception e) {
                 log.warn("getConsumerIdListByGroup exception, " + brokerAddr + " " + group,
e);
             }

Mime
View raw message