rocketmq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jinrongt...@apache.org
Subject [rocketmq] branch develop updated: [ISSUE #3225]change Random to ThreadLocalRandom in broker
Date Mon, 13 Sep 2021 05:59:40 GMT
This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new bf343a3  [ISSUE #3225]change Random to ThreadLocalRandom in broker
bf343a3 is described below

commit bf343a31a94be1d2eb420a22c647d947f26c8e5d
Author: racoon <fuzhi.lfz@alibaba-inc.com>
AuthorDate: Mon Sep 13 13:59:37 2021 +0800

    [ISSUE #3225]change Random to ThreadLocalRandom in broker
---
 .../broker/processor/AbstractSendMessageProcessor.java        |  6 +++---
 .../rocketmq/broker/processor/ReplyMessageProcessor.java      |  4 +++-
 .../rocketmq/broker/processor/SendMessageProcessor.java       | 11 ++++++-----
 .../AbstractTransactionalMessageCheckListener.java            |  2 --
 .../queue/DefaultTransactionalMessageCheckListener.java       |  4 +++-
 5 files changed, 15 insertions(+), 12 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java
index 35f8660..9d26e99 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java
@@ -22,7 +22,8 @@ import java.net.SocketAddress;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Random;
+import java.util.concurrent.ThreadLocalRandom;
+
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.mqtrace.SendMessageContext;
 import org.apache.rocketmq.broker.mqtrace.SendMessageHook;
@@ -60,7 +61,6 @@ public abstract class AbstractSendMessageProcessor extends AsyncNettyRequestProc
 
     protected final static int DLQ_NUMS_PER_GROUP = 1;
     protected final BrokerController brokerController;
-    protected final Random random = new Random(System.currentTimeMillis());
     protected final SocketAddress storeHost;
     private List<SendMessageHook> sendMessageHookList;
 
@@ -109,7 +109,7 @@ public abstract class AbstractSendMessageProcessor extends AsyncNettyRequestProc
         final SendMessageRequestHeader requestHeader, final byte[] body, TopicConfig topicConfig) {
         int queueIdInt = requestHeader.getQueueId();
         if (queueIdInt < 0) {
-            queueIdInt = Math.abs(this.random.nextInt() % 99999999) % topicConfig.getWriteQueueNums();
+            queueIdInt = ThreadLocalRandom.current().nextInt(99999999) % topicConfig.getWriteQueueNums();
         }
         int sysFlag = requestHeader.getSysFlag();
 
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessor.java
index 2890fc4..f31576f 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessor.java
@@ -45,6 +45,8 @@ import org.apache.rocketmq.store.MessageExtBrokerInner;
 import org.apache.rocketmq.store.PutMessageResult;
 import org.apache.rocketmq.store.stats.BrokerStatsManager;
 
+import java.util.concurrent.ThreadLocalRandom;
+
 public class ReplyMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {
     private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
 
@@ -125,7 +127,7 @@ public class ReplyMessageProcessor extends AbstractSendMessageProcessor implemen
         TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
 
         if (queueIdInt < 0) {
-            queueIdInt = Math.abs(this.random.nextInt() % 99999999) % topicConfig.getWriteQueueNums();
+            queueIdInt = ThreadLocalRandom.current().nextInt(99999999) % topicConfig.getWriteQueueNums();
         }
 
         MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
index 2cd142f..6942c88 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
@@ -21,6 +21,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ThreadLocalRandom;
 
 import io.netty.channel.ChannelHandlerContext;
 import org.apache.rocketmq.broker.BrokerController;
@@ -141,7 +142,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
         }
 
         String newTopic = MixAll.getRetryTopic(requestHeader.getGroup());
-        int queueIdInt = Math.abs(this.random.nextInt() % 99999999) % subscriptionGroupConfig.getRetryQueueNums();
+        int queueIdInt = ThreadLocalRandom.current().nextInt(99999999) % subscriptionGroupConfig.getRetryQueueNums();
         int topicSysFlag = 0;
         if (requestHeader.isUnitMode()) {
             topicSysFlag = TopicSysFlag.buildSysFlag(false, true);
@@ -188,7 +189,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
         if (msgExt.getReconsumeTimes() >= maxReconsumeTimes 
             || delayLevel < 0) {
             newTopic = MixAll.getDLQTopic(requestHeader.getGroup());
-            queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP;
+            queueIdInt = ThreadLocalRandom.current().nextInt(99999999) % DLQ_NUMS_PER_GROUP;
 
             topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,
                     DLQ_NUMS_PER_GROUP,
@@ -353,7 +354,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
             int reconsumeTimes = requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes();
             if (reconsumeTimes >= maxReconsumeTimes) {
                 newTopic = MixAll.getDLQTopic(groupName);
-                int queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP;
+                int queueIdInt = ThreadLocalRandom.current().nextInt(99999999) % DLQ_NUMS_PER_GROUP;
                 topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,
                     DLQ_NUMS_PER_GROUP,
                     PermName.PERM_WRITE, 0
@@ -410,7 +411,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
         TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
 
         if (queueIdInt < 0) {
-            queueIdInt = Math.abs(this.random.nextInt() % 99999999) % topicConfig.getWriteQueueNums();
+            queueIdInt = ThreadLocalRandom.current().nextInt(99999999) % topicConfig.getWriteQueueNums();
         }
 
         MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
@@ -666,7 +667,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
     }
 
     private int randomQueueId(int writeQueueNums) {
-        return (this.random.nextInt() % 99999999) % writeQueueNums;
+        return ThreadLocalRandom.current().nextInt(99999999) % writeQueueNums;
     }
 
     private RemotingCommand preSend(ChannelHandlerContext ctx, RemotingCommand request,
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/transaction/AbstractTransactionalMessageCheckListener.java b/broker/src/main/java/org/apache/rocketmq/broker/transaction/AbstractTransactionalMessageCheckListener.java
index 4cf5647..0079fb5 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/transaction/AbstractTransactionalMessageCheckListener.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/transaction/AbstractTransactionalMessageCheckListener.java
@@ -17,7 +17,6 @@
 package org.apache.rocketmq.broker.transaction;
 
 import io.netty.channel.Channel;
-import java.util.Random;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.message.MessageConst;
@@ -40,7 +39,6 @@ public abstract class AbstractTransactionalMessageCheckListener {
 
     //queue nums of topic TRANS_CHECK_MAX_TIME_TOPIC
     protected final static int TCMT_QUEUE_NUMS = 1;
-    protected final Random random = new Random(System.currentTimeMillis());
 
     private static ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
         @Override
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/DefaultTransactionalMessageCheckListener.java b/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/DefaultTransactionalMessageCheckListener.java
index ee87bd3..a28e332 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/DefaultTransactionalMessageCheckListener.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/DefaultTransactionalMessageCheckListener.java
@@ -30,6 +30,8 @@ import org.apache.rocketmq.store.MessageExtBrokerInner;
 import org.apache.rocketmq.store.PutMessageResult;
 import org.apache.rocketmq.store.PutMessageStatus;
 
+import java.util.concurrent.ThreadLocalRandom;
+
 public class DefaultTransactionalMessageCheckListener extends AbstractTransactionalMessageCheckListener {
     private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.TRANSACTION_LOGGER_NAME);
 
@@ -58,7 +60,7 @@ public class DefaultTransactionalMessageCheckListener extends AbstractTransactio
 
     private MessageExtBrokerInner toMessageExtBrokerInner(MessageExt msgExt) {
         TopicConfig topicConfig = this.getBrokerController().getTopicConfigManager().createTopicOfTranCheckMaxTime(TCMT_QUEUE_NUMS, PermName.PERM_READ | PermName.PERM_WRITE);
-        int queueId = Math.abs(random.nextInt() % 99999999) % TCMT_QUEUE_NUMS;
+        int queueId = ThreadLocalRandom.current().nextInt(99999999) % TCMT_QUEUE_NUMS;
         MessageExtBrokerInner inner = new MessageExtBrokerInner();
         inner.setTopic(topicConfig.getTopicName());
         inner.setBody(msgExt.getBody());

Mime
View raw message