rocketmq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From duhengfore...@apache.org
Subject [rocketmq] branch develop updated: [ISSUE 3194] [PART C] Replace AtomicLong with LongAdder in StatsItem.java to improve performance (#3351)
Date Tue, 14 Sep 2021 15:48:25 GMT
This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 311d76f  [ISSUE 3194] [PART C] Replace AtomicLong with LongAdder in StatsItem.java
to improve performance (#3351)
311d76f is described below

commit 311d76f9c851107386b08a131909335fffa2a631
Author: huangli <areyouok@gmail.com>
AuthorDate: Tue Sep 14 23:48:15 2021 +0800

    [ISSUE 3194] [PART C] Replace AtomicLong with LongAdder in StatsItem.java to improve performance
(#3351)
---
 .../ConsumeMessageConcurrentlyServiceTest.java     |  2 +-
 .../apache/rocketmq/common/stats/StatsItem.java    | 22 ++++++++++++----------
 .../apache/rocketmq/common/stats/StatsItemSet.java |  8 ++++----
 .../rocketmq/common/stats/StatsItemSetTest.java    |  4 +++-
 .../rocketmq/store/stats/BrokerStatsManager.java   |  6 +++---
 .../store/schedule/ScheduleMessageServiceTest.java |  6 +++---
 6 files changed, 26 insertions(+), 22 deletions(-)

diff --git a/client/src/test/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyServiceTest.java
b/client/src/test/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyServiceTest.java
index e8feb80..6fa76e0 100644
--- a/client/src/test/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyServiceTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyServiceTest.java
@@ -178,7 +178,7 @@ public class ConsumeMessageConcurrentlyServiceTest {
         StatsItemSet itemSet = (StatsItemSet)statItmeSetField.get(mgr);
         StatsItem item = itemSet.getAndCreateStatsItem(topic + "@" + pushConsumer.getDefaultMQPushConsumerImpl().groupName());
 
-        assertThat(item.getValue().get()).isGreaterThan(0L);
+        assertThat(item.getValue().sum()).isGreaterThan(0L);
         MessageExt msg = messageAtomic.get();
         assertThat(msg).isNotNull();
         assertThat(msg.getTopic()).isEqualTo(topic);
diff --git a/common/src/main/java/org/apache/rocketmq/common/stats/StatsItem.java b/common/src/main/java/org/apache/rocketmq/common/stats/StatsItem.java
index b078551..6007cb0 100644
--- a/common/src/main/java/org/apache/rocketmq/common/stats/StatsItem.java
+++ b/common/src/main/java/org/apache/rocketmq/common/stats/StatsItem.java
@@ -21,14 +21,16 @@ import java.util.LinkedList;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.LongAdder;
+
 import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.logging.InternalLogger;
 
 public class StatsItem {
 
-    private final AtomicLong value = new AtomicLong(0);
+    private final LongAdder value = new LongAdder();
 
-    private final AtomicLong times = new AtomicLong(0);
+    private final LongAdder times = new LongAdder();
 
     private final LinkedList<CallSnapshot> csListMinute = new LinkedList<CallSnapshot>();
 
@@ -157,8 +159,8 @@ public class StatsItem {
             if (this.csListMinute.size() == 0) {
                 this.csListMinute.add(new CallSnapshot(System.currentTimeMillis() - 10 *
1000, 0, 0));
             }
-            this.csListMinute.add(new CallSnapshot(System.currentTimeMillis(), this.times.get(),
this.value
-                .get()));
+            this.csListMinute.add(new CallSnapshot(System.currentTimeMillis(), this.times.sum(),
this.value
+                .sum()));
             if (this.csListMinute.size() > 7) {
                 this.csListMinute.removeFirst();
             }
@@ -170,8 +172,8 @@ public class StatsItem {
             if (this.csListHour.size() == 0) {
                 this.csListHour.add(new CallSnapshot(System.currentTimeMillis() - 10 * 60
* 1000, 0, 0));
             }
-            this.csListHour.add(new CallSnapshot(System.currentTimeMillis(), this.times.get(),
this.value
-                .get()));
+            this.csListHour.add(new CallSnapshot(System.currentTimeMillis(), this.times.sum(),
this.value
+                .sum()));
             if (this.csListHour.size() > 7) {
                 this.csListHour.removeFirst();
             }
@@ -183,8 +185,8 @@ public class StatsItem {
             if (this.csListDay.size() == 0) {
                 this.csListDay.add(new CallSnapshot(System.currentTimeMillis() - 1 * 60 *
60 * 1000, 0, 0));
             }
-            this.csListDay.add(new CallSnapshot(System.currentTimeMillis(), this.times.get(),
this.value
-                .get()));
+            this.csListDay.add(new CallSnapshot(System.currentTimeMillis(), this.times.sum(),
this.value
+                .sum()));
             if (this.csListDay.size() > 25) {
                 this.csListDay.removeFirst();
             }
@@ -214,7 +216,7 @@ public class StatsItem {
                 ss.getAvgpt());
     }
 
-    public AtomicLong getValue() {
+    public LongAdder getValue() {
         return value;
     }
 
@@ -226,7 +228,7 @@ public class StatsItem {
         return statsName;
     }
 
-    public AtomicLong getTimes() {
+    public LongAdder getTimes() {
         return times;
     }
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/stats/StatsItemSet.java b/common/src/main/java/org/apache/rocketmq/common/stats/StatsItemSet.java
index a28d008..8d5418e 100644
--- a/common/src/main/java/org/apache/rocketmq/common/stats/StatsItemSet.java
+++ b/common/src/main/java/org/apache/rocketmq/common/stats/StatsItemSet.java
@@ -154,14 +154,14 @@ public class StatsItemSet {
 
     public void addValue(final String statsKey, final int incValue, final int incTimes) {
         StatsItem statsItem = this.getAndCreateStatsItem(statsKey);
-        statsItem.getValue().addAndGet(incValue);
-        statsItem.getTimes().addAndGet(incTimes);
+        statsItem.getValue().add(incValue);
+        statsItem.getTimes().add(incTimes);
     }
 
     public void addRTValue(final String statsKey, final int incValue, final int incTimes)
{
         StatsItem statsItem = this.getAndCreateRTStatsItem(statsKey);
-        statsItem.getValue().addAndGet(incValue);
-        statsItem.getTimes().addAndGet(incTimes);
+        statsItem.getValue().add(incValue);
+        statsItem.getTimes().add(incTimes);
     }
 
     public void delValue(final String statsKey) {
diff --git a/common/src/test/java/org/apache/rocketmq/common/stats/StatsItemSetTest.java b/common/src/test/java/org/apache/rocketmq/common/stats/StatsItemSetTest.java
index 5b4c5d8..d834160 100644
--- a/common/src/test/java/org/apache/rocketmq/common/stats/StatsItemSetTest.java
+++ b/common/src/test/java/org/apache/rocketmq/common/stats/StatsItemSetTest.java
@@ -23,6 +23,8 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.LongAdder;
+
 import org.apache.rocketmq.common.ThreadFactoryImpl;
 import org.junit.After;
 import org.junit.Test;
@@ -95,7 +97,7 @@ public class StatsItemSetTest {
         }
     }
 
-    private AtomicLong test_unit() throws InterruptedException {
+    private LongAdder test_unit() throws InterruptedException {
         final StatsItemSet statsItemSet = new StatsItemSet("topicTest", scheduler, null);
         executor = new ThreadPoolExecutor(10, 20, 10, TimeUnit.SECONDS,
             new ArrayBlockingQueue<Runnable>(100), new ThreadFactoryImpl("testMultiThread"));
diff --git a/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java b/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java
index 3e643e3..b9e11fd 100644
--- a/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java
+++ b/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java
@@ -213,15 +213,15 @@ public class BrokerStatsManager {
     }
 
     public void incBrokerPutNums() {
-        this.statsTable.get(BROKER_PUT_NUMS).getAndCreateStatsItem(this.clusterName).getValue().incrementAndGet();
+        this.statsTable.get(BROKER_PUT_NUMS).getAndCreateStatsItem(this.clusterName).getValue().add(1);
     }
 
     public void incBrokerPutNums(final int incValue) {
-        this.statsTable.get(BROKER_PUT_NUMS).getAndCreateStatsItem(this.clusterName).getValue().addAndGet(incValue);
+        this.statsTable.get(BROKER_PUT_NUMS).getAndCreateStatsItem(this.clusterName).getValue().add(incValue);
     }
 
     public void incBrokerGetNums(final int incValue) {
-        this.statsTable.get(BROKER_GET_NUMS).getAndCreateStatsItem(this.clusterName).getValue().addAndGet(incValue);
+        this.statsTable.get(BROKER_GET_NUMS).getAndCreateStatsItem(this.clusterName).getValue().add(incValue);
     }
 
     public void incSendBackNums(final String group, final String topic) {
diff --git a/store/src/test/java/org/apache/rocketmq/store/schedule/ScheduleMessageServiceTest.java
b/store/src/test/java/org/apache/rocketmq/store/schedule/ScheduleMessageServiceTest.java
index fa3c6bf..d375fb0 100644
--- a/store/src/test/java/org/apache/rocketmq/store/schedule/ScheduleMessageServiceTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/schedule/ScheduleMessageServiceTest.java
@@ -149,9 +149,9 @@ public class ScheduleMessageServiceTest {
         assertThat(messageResult.getStatus()).isEqualTo(GetMessageStatus.FOUND);
 
         // get the stats change
-        assertThat(messageStore.getBrokerStatsManager().getStatsItem(BROKER_PUT_NUMS, brokerConfig.getBrokerClusterName()).getValue().get()).isEqualTo(1);
-        assertThat(messageStore.getBrokerStatsManager().getStatsItem(TOPIC_PUT_NUMS, topic).getValue().get()).isEqualTo(1L);
-        assertThat(messageStore.getBrokerStatsManager().getStatsItem(TOPIC_PUT_SIZE, topic).getValue().get()).isEqualTo(messageResult.getBufferTotalSize());
+        assertThat(messageStore.getBrokerStatsManager().getStatsItem(BROKER_PUT_NUMS, brokerConfig.getBrokerClusterName()).getValue().sum()).isEqualTo(1);
+        assertThat(messageStore.getBrokerStatsManager().getStatsItem(TOPIC_PUT_NUMS, topic).getValue().sum()).isEqualTo(1L);
+        assertThat(messageStore.getBrokerStatsManager().getStatsItem(TOPIC_PUT_SIZE, topic).getValue().sum()).isEqualTo(messageResult.getBufferTotalSize());
 
         // get the message body
         ByteBuffer byteBuffer = ByteBuffer.allocate(messageResult.getBufferTotalSize());

Mime
View raw message