rocketmq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From maixiao...@apache.org
Subject [rocketmq-exporter] branch master updated: fix rocketmq-exporter causes OOM Error (#65)
Date Thu, 16 Sep 2021 03:48:21 GMT
This is an automated email from the ASF dual-hosted git repository.

maixiaohai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-exporter.git


The following commit(s) were added to refs/heads/master by this push:
     new c4277a5  fix rocketmq-exporter causes OOM Error (#65)
c4277a5 is described below

commit c4277a5145b98d45a025afaba0473852344a41ff
Author: humkum <50660789+humkum@users.noreply.github.com>
AuthorDate: Thu Sep 16 11:48:17 2021 +0800

    fix rocketmq-exporter causes OOM Error (#65)
    
    Co-authored-by: hankunming <hankunming@xiaomi.com>
---
 .../exporter/collector/RMQMetricsCollector.java    | 1170 +++++++++++---------
 .../rocketmq/exporter/config/RMQConfigure.java     |   10 +
 .../service/impl/RMQMetricsServiceImpl.java        |   10 +-
 .../rocketmq/exporter/task/MetricsCollectTask.java |    4 -
 src/main/resources/application.yml                 |    1 +
 5 files changed, 643 insertions(+), 552 deletions(-)

diff --git a/src/main/java/org/apache/rocketmq/exporter/collector/RMQMetricsCollector.java b/src/main/java/org/apache/rocketmq/exporter/collector/RMQMetricsCollector.java
index e6f7bc9..4463111 100644
--- a/src/main/java/org/apache/rocketmq/exporter/collector/RMQMetricsCollector.java
+++ b/src/main/java/org/apache/rocketmq/exporter/collector/RMQMetricsCollector.java
@@ -16,8 +16,11 @@
  */
 package org.apache.rocketmq.exporter.collector;
 
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
 import io.prometheus.client.Collector;
 import io.prometheus.client.GaugeMetricFamily;
+import java.util.concurrent.TimeUnit;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.exporter.model.BrokerRuntimeStats;
 import org.apache.rocketmq.exporter.model.metrics.BrokerMetric;
@@ -41,179 +44,270 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
 
 public class RMQMetricsCollector extends Collector {
-    //max offset of normal consume queue
-    private ConcurrentHashMap<ProducerMetric, Double> topicOffset = new ConcurrentHashMap<>();
+
+    private Cache<ProducerMetric, Double> topicOffset;
     //max offset of retry topic consume queue
-    private ConcurrentHashMap<ProducerMetric, Double> topicRetryOffset = new ConcurrentHashMap<>();
+    private Cache<ProducerMetric, Double> topicRetryOffset;
     //max offset of dlq consume queue
-    private ConcurrentHashMap<DLQTopicOffsetMetric, Double> topicDLQOffset = new ConcurrentHashMap<>();
+    private Cache<DLQTopicOffsetMetric, Double> topicDLQOffset;
 
     //total put numbers for topics
-    private ConcurrentHashMap<TopicPutNumMetric, Double> topicPutNums = new ConcurrentHashMap<>();
+    private Cache<TopicPutNumMetric, Double> topicPutNums;
     //total get numbers for topics
-    private ConcurrentHashMap<TopicPutNumMetric, Double> topicPutSize = new ConcurrentHashMap<>();
+    private Cache<TopicPutNumMetric, Double> topicPutSize;
 
     //diff for consumer group
-    private ConcurrentHashMap<ConsumerTopicDiffMetric, Long> consumerDiff = new ConcurrentHashMap<>();
+    private Cache<ConsumerTopicDiffMetric, Long> consumerDiff;
     //retry diff for consumer group
-    private ConcurrentHashMap<ConsumerTopicDiffMetric, Long> consumerRetryDiff = new ConcurrentHashMap<>();
+    private Cache<ConsumerTopicDiffMetric, Long> consumerRetryDiff;
     //dlq diff for consumer group
-    private ConcurrentHashMap<ConsumerTopicDiffMetric, Long> consumerDLQDiff = new ConcurrentHashMap<>();
+    private Cache<ConsumerTopicDiffMetric, Long> consumerDLQDiff;
     //consumer count
-    private ConcurrentHashMap<ConsumerCountMetric, Integer> consumerCounts = new ConcurrentHashMap<>();
+    private Cache<ConsumerCountMetric, Integer> consumerCounts;
 
     //count of consume fail
-    private ConcurrentHashMap<ConsumerRuntimeConsumeFailedMsgsMetric, Long> consumerClientFailedMsgCounts = new ConcurrentHashMap<>();
+    private Cache<ConsumerRuntimeConsumeFailedMsgsMetric, Long> consumerClientFailedMsgCounts;
     //TPS of consume fail
-    private ConcurrentHashMap<ConsumerRuntimeConsumeFailedTPSMetric, Double> consumerClientFailedTPS = new ConcurrentHashMap<>();
+    private Cache<ConsumerRuntimeConsumeFailedTPSMetric, Double> consumerClientFailedTPS;
     //TPS of consume success
-    private ConcurrentHashMap<ConsumerRuntimeConsumeOKTPSMetric, Double> consumerClientOKTPS = new ConcurrentHashMap<>();
+    private Cache<ConsumerRuntimeConsumeOKTPSMetric, Double> consumerClientOKTPS;
     //rt of consume
-    private ConcurrentHashMap<ConsumerRuntimeConsumeRTMetric, Double> consumerClientRT = new ConcurrentHashMap<>();
+    private Cache<ConsumerRuntimeConsumeRTMetric, Double> consumerClientRT;
     //pull RT
-    private ConcurrentHashMap<ConsumerRuntimePullRTMetric, Double> consumerClientPullRT = new ConcurrentHashMap<>();
+    private Cache<ConsumerRuntimePullRTMetric, Double> consumerClientPullRT;
     //pull tps
-    private ConcurrentHashMap<ConsumerRuntimePullTPSMetric, Double> consumerClientPullTPS = new ConcurrentHashMap<>();
+    private Cache<ConsumerRuntimePullTPSMetric, Double> consumerClientPullTPS;
 
     //broker offset for consumer-topic
-    private ConcurrentHashMap<ConsumerMetric, Long> groupBrokerTotalOffset = new ConcurrentHashMap<>();
+    private Cache<ConsumerMetric, Long> groupBrokerTotalOffset;
     //consumer offset for consumer-topic
-    private ConcurrentHashMap<ConsumerMetric, Long> groupConsumeTotalOffset = new ConcurrentHashMap<>();
+    private Cache<ConsumerMetric, Long> groupConsumeTotalOffset;
     //consume tps
-    private ConcurrentHashMap<ConsumerMetric, Double> groupConsumeTPS = new ConcurrentHashMap<>();
+    private Cache<ConsumerMetric, Double> groupConsumeTPS;
     //consumed message count for consumer-topic
-    private ConcurrentHashMap<ConsumerMetric, Double> groupGetNums = new ConcurrentHashMap<>();
+    private Cache<ConsumerMetric, Double> groupGetNums;
     //consumed message size(byte) for consumer-topic
-    private ConcurrentHashMap<ConsumerMetric, Double> groupGetSize = new ConcurrentHashMap<>();
+    private Cache<ConsumerMetric, Double> groupGetSize;
     //re-consumed message count for consumer-topic
-    private ConcurrentHashMap<ConsumerMetric, Long> sendBackNums = new ConcurrentHashMap<>();
+    private Cache<ConsumerMetric, Double> sendBackNums;
     // group latency time
-    private ConcurrentHashMap<ConsumerMetric, Long> groupLatencyByTime = new ConcurrentHashMap<>();
+    private Cache<ConsumerMetric, Long> groupLatencyByTime;
 
     //total put message count for one broker
-    private ConcurrentHashMap<BrokerMetric, Double> brokerPutNums = new ConcurrentHashMap<>();
+    private Cache<BrokerMetric, Double> brokerPutNums;
     //total get message count for one broker
-    private ConcurrentHashMap<BrokerMetric, Double> brokerGetNums = new ConcurrentHashMap<>();
-
-    private ConcurrentHashMap<BrokerRuntimeMetric, Long> brokerRuntimeMsgPutTotalTodayNow = new ConcurrentHashMap<>();
-    private ConcurrentHashMap<BrokerRuntimeMetric, Long> brokerRuntimeMsgGetTotalTodayNow = new ConcurrentHashMap<>();
-
-    private ConcurrentHashMap<BrokerRuntimeMetric, Long> brokerRuntimeMsgGetTotalYesterdayMorning = new ConcurrentHashMap<>();
-    private ConcurrentHashMap<BrokerRuntimeMetric, Long> brokerRuntimeMsgPutTotalYesterdayMorning = new ConcurrentHashMap<>();
-    private ConcurrentHashMap<BrokerRuntimeMetric, Long> brokerRuntimeMsgGetTotalTodayMorning = new ConcurrentHashMap<>();
-    private ConcurrentHashMap<BrokerRuntimeMetric, Long> brokerRuntimeMsgPutTotalTodayMorning = new ConcurrentHashMap<>();
-
-    private ConcurrentHashMap<BrokerRuntimeMetric, Long> brokerRuntimeDispatchBehindBytes = new ConcurrentHashMap<>();
-    private ConcurrentHashMap<BrokerRuntimeMetric, Long> brokerRuntimePutMessageSizeTotal = new ConcurrentHashMap<>();
-
-    private ConcurrentHashMap<BrokerRuntimeMetric, Double> brokerRuntimePutMessageAverageSize = new ConcurrentHashMap<>();
-    private ConcurrentHashMap<BrokerRuntimeMetric, Long> brokerRuntimeQueryThreadPoolQueueCapacity = new ConcurrentHashMap<>();
-    private ConcurrentHashMap<BrokerRuntimeMetric, Long> brokerRuntimeRemainTransientStoreBufferNumbs = new ConcurrentHashMap<>();
-    private ConcurrentHashMap<BrokerRuntimeMetric, Long> brokerRuntimeEarliestMessageTimeStamp = new ConcurrentHashMap<>();
-    private ConcurrentHashMap<BrokerRuntimeMetric, Long> brokerRuntimePutMessageEntireTimeMax = new ConcurrentHashMap<>();
-    private ConcurrentHashMap<BrokerRuntimeMetric, Long> brokerRuntimeStartAcceptSendRequestTimeStamp = new ConcurrentHashMap<>();
-    private ConcurrentHashMap<BrokerRuntimeMetric, Long> brokerRuntimeSendThreadPoolQueueSize = new ConcurrentHashMap<>();
-    private ConcurrentHashMap<BrokerRuntimeMetric, Long> brokerRuntimePutMessageTimesTotal = new ConcurrentHashMap<>();
-    private ConcurrentHashMap<BrokerRuntimeMetric, Long> brokerRuntimeGetMessageEntireTimeMax = new ConcurrentHashMap<>();
-    private ConcurrentHashMap<BrokerRuntimeMetric, Long> brokerRuntimePageCacheLockTimeMills = new ConcurrentHashMap<>();
-    private ConcurrentHashMap<BrokerRuntimeMetric, Double> brokerRuntimeCommitLogDiskRatio = new ConcurrentHashMap<>();
-    private ConcurrentHashMap<BrokerRuntimeMetric, Double> brokerRuntimeConsumeQueueDiskRatio = new ConcurrentHashMap<>();
-
-    private ConcurrentHashMap<BrokerRuntimeMetric, Double> brokerRuntimeGetFoundTps600 = new ConcurrentHashMap<>();
-    private ConcurrentHashMap<BrokerRuntimeMetric, Double> brokerRuntimeGetFoundTps60 = new ConcurrentHashMap<>();
-    private ConcurrentHashMap<BrokerRuntimeMetric, Double> brokerRuntimeGetFoundTps10 = new ConcurrentHashMap<>();
-    private ConcurrentHashMap<BrokerRuntimeMetric, Double> brokerRuntimeGetTotalTps600 = new ConcurrentHashMap<>();
-    private ConcurrentHashMap<BrokerRuntimeMetric, Double> brokerRuntimeGetTotalTps60 = new ConcurrentHashMap<>();
-    private ConcurrentHashMap<BrokerRuntimeMetric, Double> brokerRuntimeGetTotalTps10 = new ConcurrentHashMap<>();
-    private ConcurrentHashMap<BrokerRuntimeMetric, Double> brokerRuntimeGetTransferedTps600 = new ConcurrentHashMap<>();
-    private ConcurrentHashMap<BrokerRuntimeMetric, Double> brokerRuntimeGetTransferedTps60 = new ConcurrentHashMap<>();
-    private ConcurrentHashMap<BrokerRuntimeMetric, Double> brokerRuntimeGetTransferedTps10 = new ConcurrentHashMap<>();
-    private ConcurrentHashMap<BrokerRuntimeMetric, Double> brokerRuntimeGetMissTps600 = new ConcurrentHashMap<>();
-    private ConcurrentHashMap<BrokerRuntimeMetric, Double> brokerRuntimeGetMissTps60 = new ConcurrentHashMap<>();
-    private ConcurrentHashMap<BrokerRuntimeMetric, Double> brokerRuntimeGetMissTps10 = new ConcurrentHashMap<>();
-    private ConcurrentHashMap<BrokerRuntimeMetric, Double> brokerRuntimePutTps600 = new ConcurrentHashMap<>();
-    private ConcurrentHashMap<BrokerRuntimeMetric, Double> brokerRuntimePutTps60 = new ConcurrentHashMap<>();
-    private ConcurrentHashMap<BrokerRuntimeMetric, Double> brokerRuntimePutTps10 = new ConcurrentHashMap<>();
-
-    private ConcurrentHashMap<BrokerRuntimeMetric, Long> brokerRuntimeDispatchMaxBuffer = new ConcurrentHashMap<>();
-
-    private ConcurrentHashMap<BrokerRuntimeMetric, Integer> brokerRuntimePutMessageDistributeTimeMap10toMore = new ConcurrentHashMap<>();
-    private ConcurrentHashMap<BrokerRuntimeMetric, Integer> brokerRuntimePutMessageDistributeTimeMap5to10s = new ConcurrentHashMap<>();
-    private ConcurrentHashMap<BrokerRuntimeMetric, Integer> brokerRuntimePutMessageDistributeTimeMap4to5s = new ConcurrentHashMap<>();
-    private ConcurrentHashMap<BrokerRuntimeMetric, Integer> brokerRuntimePutMessageDistributeTimeMap3to4s = new ConcurrentHashMap<>();
-    private ConcurrentHashMap<BrokerRuntimeMetric, Integer> brokerRuntimePutMessageDistributeTimeMap2to3s = new ConcurrentHashMap<>();
-    private ConcurrentHashMap<BrokerRuntimeMetric, Integer> brokerRuntimePutMessageDistributeTimeMap1to2s = new ConcurrentHashMap<>();
-    private ConcurrentHashMap<BrokerRuntimeMetric, Integer> brokerRuntimePutMessageDistributeTimeMap500to1s = new ConcurrentHashMap<>();
-    private ConcurrentHashMap<BrokerRuntimeMetric, Integer> brokerRuntimePutMessageDistributeTimeMap200to500ms = new ConcurrentHashMap<>();
-    private ConcurrentHashMap<BrokerRuntimeMetric, Integer> brokerRuntimePutMessageDistributeTimeMap100to200ms = new ConcurrentHashMap<>();
-    private ConcurrentHashMap<BrokerRuntimeMetric, Integer> brokerRuntimePutMessageDistributeTimeMap50to100ms = new ConcurrentHashMap<>();
-    private ConcurrentHashMap<BrokerRuntimeMetric, Integer> brokerRuntimePutMessageDistributeTimeMap10to50ms = new ConcurrentHashMap<>();
-    private ConcurrentHashMap<BrokerRuntimeMetric, Integer> brokerRuntimePutMessageDistributeTimeMap0to10ms = new ConcurrentHashMap<>();
-    private ConcurrentHashMap<BrokerRuntimeMetric, Integer> brokerRuntimePutMessageDistributeTimeMap0ms = new ConcurrentHashMap<>();
-
-    private ConcurrentHashMap<BrokerRuntimeMetric, Long> brokerRuntimePullThreadPoolQueueCapacity = new ConcurrentHashMap<>();
-    private ConcurrentHashMap<BrokerRuntimeMetric, Long> brokerRuntimeSendThreadPoolQueueCapacity = new ConcurrentHashMap<>();
-    private ConcurrentHashMap<BrokerRuntimeMetric, Long> brokerRuntimePullThreadPoolQueueSize = new ConcurrentHashMap<>();
-    private ConcurrentHashMap<BrokerRuntimeMetric, Long> brokerRuntimeQueryThreadPoolQueueSize = new ConcurrentHashMap<>();
-
-    private ConcurrentHashMap<BrokerRuntimeMetric, Long> brokerRuntimePullThreadPoolQueueHeadWaitTimeMills = new ConcurrentHashMap<>();
-    private ConcurrentHashMap<BrokerRuntimeMetric, Long> brokerRuntimeQueryThreadPoolQueueHeadWaitTimeMills = new ConcurrentHashMap<>();
-    private ConcurrentHashMap<BrokerRuntimeMetric, Long> brokerRuntimeSendThreadPoolQueueHeadWaitTimeMills = new ConcurrentHashMap<>();
-    private ConcurrentHashMap<BrokerRuntimeMetric, Double> brokerRuntimeCommitLogDirCapacityFree = new ConcurrentHashMap<>();
-    private ConcurrentHashMap<BrokerRuntimeMetric, Double> brokerRuntimeCommitLogDirCapacityTotal = new ConcurrentHashMap<>();
-
-    private ConcurrentHashMap<BrokerRuntimeMetric, Long> brokerRuntimeCommitLogMaxOffset = new ConcurrentHashMap<>();
-    private ConcurrentHashMap<BrokerRuntimeMetric, Long> brokerRuntimeCommitLogMinOffset = new ConcurrentHashMap<>();
-    private ConcurrentHashMap<BrokerRuntimeMetric, Double> brokerRuntimeRemainHowManyDataToFlush = new ConcurrentHashMap<>();
+    private Cache<BrokerMetric, Double> brokerGetNums;
+
+    private Cache<BrokerRuntimeMetric, Long> brokerRuntimeMsgPutTotalTodayNow;
+    private Cache<BrokerRuntimeMetric, Long> brokerRuntimeMsgGetTotalTodayNow;
+
+    private Cache<BrokerRuntimeMetric, Long> brokerRuntimeMsgGetTotalYesterdayMorning;
+    private Cache<BrokerRuntimeMetric, Long> brokerRuntimeMsgPutTotalYesterdayMorning;
+    private Cache<BrokerRuntimeMetric, Long> brokerRuntimeMsgGetTotalTodayMorning;
+    private Cache<BrokerRuntimeMetric, Long> brokerRuntimeMsgPutTotalTodayMorning;
+
+    private Cache<BrokerRuntimeMetric, Long> brokerRuntimeDispatchBehindBytes;
+    private Cache<BrokerRuntimeMetric, Long> brokerRuntimePutMessageSizeTotal;
+
+    private Cache<BrokerRuntimeMetric, Double> brokerRuntimePutMessageAverageSize;
+    private Cache<BrokerRuntimeMetric, Long> brokerRuntimeQueryThreadPoolQueueCapacity;
+    private Cache<BrokerRuntimeMetric, Long> brokerRuntimeRemainTransientStoreBufferNumbs;
+    private Cache<BrokerRuntimeMetric, Long> brokerRuntimeEarliestMessageTimeStamp;
+    private Cache<BrokerRuntimeMetric, Long> brokerRuntimePutMessageEntireTimeMax;
+    private Cache<BrokerRuntimeMetric, Long> brokerRuntimeStartAcceptSendRequestTimeStamp;
+    private Cache<BrokerRuntimeMetric, Long> brokerRuntimeSendThreadPoolQueueSize;
+    private Cache<BrokerRuntimeMetric, Long> brokerRuntimePutMessageTimesTotal;
+    private Cache<BrokerRuntimeMetric, Long> brokerRuntimeGetMessageEntireTimeMax;
+    private Cache<BrokerRuntimeMetric, Long> brokerRuntimePageCacheLockTimeMills;
+    private Cache<BrokerRuntimeMetric, Double> brokerRuntimeCommitLogDiskRatio;
+    private Cache<BrokerRuntimeMetric, Double> brokerRuntimeConsumeQueueDiskRatio;
+
+    private Cache<BrokerRuntimeMetric, Double> brokerRuntimeGetFoundTps600;
+    private Cache<BrokerRuntimeMetric, Double> brokerRuntimeGetFoundTps60;
+    private Cache<BrokerRuntimeMetric, Double> brokerRuntimeGetFoundTps10;
+    private Cache<BrokerRuntimeMetric, Double> brokerRuntimeGetTotalTps600;
+    private Cache<BrokerRuntimeMetric, Double> brokerRuntimeGetTotalTps60;
+    private Cache<BrokerRuntimeMetric, Double> brokerRuntimeGetTotalTps10;
+    private Cache<BrokerRuntimeMetric, Double> brokerRuntimeGetTransferedTps600;
+    private Cache<BrokerRuntimeMetric, Double> brokerRuntimeGetTransferedTps60;
+    private Cache<BrokerRuntimeMetric, Double> brokerRuntimeGetTransferedTps10;
+    private Cache<BrokerRuntimeMetric, Double> brokerRuntimeGetMissTps600;
+    private Cache<BrokerRuntimeMetric, Double> brokerRuntimeGetMissTps60;
+    private Cache<BrokerRuntimeMetric, Double> brokerRuntimeGetMissTps10;
+    private Cache<BrokerRuntimeMetric, Double> brokerRuntimePutTps600;
+    private Cache<BrokerRuntimeMetric, Double> brokerRuntimePutTps60;
+    private Cache<BrokerRuntimeMetric, Double> brokerRuntimePutTps10;
+
+    private Cache<BrokerRuntimeMetric, Long> brokerRuntimeDispatchMaxBuffer;
+
+    private Cache<BrokerRuntimeMetric, Integer> brokerRuntimePutMessageDistributeTimeMap10toMore;
+    private Cache<BrokerRuntimeMetric, Integer> brokerRuntimePutMessageDistributeTimeMap5to10s;
+    private Cache<BrokerRuntimeMetric, Integer> brokerRuntimePutMessageDistributeTimeMap4to5s;
+    private Cache<BrokerRuntimeMetric, Integer> brokerRuntimePutMessageDistributeTimeMap3to4s;
+    private Cache<BrokerRuntimeMetric, Integer> brokerRuntimePutMessageDistributeTimeMap2to3s;
+    private Cache<BrokerRuntimeMetric, Integer> brokerRuntimePutMessageDistributeTimeMap1to2s;
+    private Cache<BrokerRuntimeMetric, Integer> brokerRuntimePutMessageDistributeTimeMap500to1s;
+    private Cache<BrokerRuntimeMetric, Integer> brokerRuntimePutMessageDistributeTimeMap200to500ms;
+    private Cache<BrokerRuntimeMetric, Integer> brokerRuntimePutMessageDistributeTimeMap100to200ms;
+    private Cache<BrokerRuntimeMetric, Integer> brokerRuntimePutMessageDistributeTimeMap50to100ms;
+    private Cache<BrokerRuntimeMetric, Integer> brokerRuntimePutMessageDistributeTimeMap10to50ms;
+    private Cache<BrokerRuntimeMetric, Integer> brokerRuntimePutMessageDistributeTimeMap0to10ms;
+    private Cache<BrokerRuntimeMetric, Integer> brokerRuntimePutMessageDistributeTimeMap0ms;
+
+    private Cache<BrokerRuntimeMetric, Long> brokerRuntimePullThreadPoolQueueCapacity;
+    private Cache<BrokerRuntimeMetric, Long> brokerRuntimeSendThreadPoolQueueCapacity;
+    private Cache<BrokerRuntimeMetric, Long> brokerRuntimePullThreadPoolQueueSize;
+    private Cache<BrokerRuntimeMetric, Long> brokerRuntimeQueryThreadPoolQueueSize;
+
+    private Cache<BrokerRuntimeMetric, Long> brokerRuntimePullThreadPoolQueueHeadWaitTimeMills;
+    private Cache<BrokerRuntimeMetric, Long> brokerRuntimeQueryThreadPoolQueueHeadWaitTimeMills;
+    private Cache<BrokerRuntimeMetric, Long> brokerRuntimeSendThreadPoolQueueHeadWaitTimeMills;
+    private Cache<BrokerRuntimeMetric, Double> brokerRuntimeCommitLogDirCapacityFree;
+    private Cache<BrokerRuntimeMetric, Double> brokerRuntimeCommitLogDirCapacityTotal;
+
+    private Cache<BrokerRuntimeMetric, Long> brokerRuntimeCommitLogMaxOffset;
+    private Cache<BrokerRuntimeMetric, Long> brokerRuntimeCommitLogMinOffset;
+    private Cache<BrokerRuntimeMetric, Double> brokerRuntimeRemainHowManyDataToFlush;
+
+    public RMQMetricsCollector(long outOfTimeSeconds) {
+        this.topicOffset = initCache(outOfTimeSeconds);
+        this.topicRetryOffset = initCache(outOfTimeSeconds);
+        this.topicDLQOffset = initCache(outOfTimeSeconds);
+        this.topicPutNums = initCache(outOfTimeSeconds);
+        this.topicPutSize = initCache(outOfTimeSeconds);
+        this.consumerDiff = initCache(outOfTimeSeconds);
+        this.consumerRetryDiff = initCache(outOfTimeSeconds);
+        this.consumerDLQDiff = initCache(outOfTimeSeconds);
+        this.consumerCounts = initCache(outOfTimeSeconds);
+        this.consumerClientFailedMsgCounts = initCache(outOfTimeSeconds);
+        this.consumerClientFailedTPS = initCache(outOfTimeSeconds);
+        this.consumerClientOKTPS = initCache(outOfTimeSeconds);
+        this.consumerClientRT = initCache(outOfTimeSeconds);
+        this.consumerClientPullRT = initCache(outOfTimeSeconds);
+        this.consumerClientPullTPS = initCache(outOfTimeSeconds);
+        this.groupBrokerTotalOffset = initCache(outOfTimeSeconds);
+        this.groupConsumeTotalOffset = initCache(outOfTimeSeconds);
+        this.groupConsumeTPS = initCache(outOfTimeSeconds);
+        this.groupGetNums = initCache(outOfTimeSeconds);
+        this.groupGetSize = initCache(outOfTimeSeconds);
+        this.sendBackNums = initCache(outOfTimeSeconds);
+        this.groupLatencyByTime = initCache(outOfTimeSeconds);
+        this.brokerPutNums = initCache(outOfTimeSeconds);
+        this.brokerGetNums = initCache(outOfTimeSeconds);
+        this.brokerRuntimeMsgPutTotalTodayNow = initCache(outOfTimeSeconds);
+        this.brokerRuntimeMsgGetTotalTodayNow = initCache(outOfTimeSeconds);
+        this.brokerRuntimeMsgGetTotalYesterdayMorning = initCache(outOfTimeSeconds);
+        this.brokerRuntimeMsgPutTotalYesterdayMorning = initCache(outOfTimeSeconds);
+        this.brokerRuntimeMsgGetTotalTodayMorning = initCache(outOfTimeSeconds);
+        this.brokerRuntimeMsgPutTotalTodayMorning = initCache(outOfTimeSeconds);
+        this.brokerRuntimeDispatchBehindBytes = initCache(outOfTimeSeconds);
+        this.brokerRuntimePutMessageSizeTotal = initCache(outOfTimeSeconds);
+        this.brokerRuntimePutMessageAverageSize = initCache(outOfTimeSeconds);
+        this.brokerRuntimeQueryThreadPoolQueueCapacity = initCache(outOfTimeSeconds);
+        this.brokerRuntimeRemainTransientStoreBufferNumbs = initCache(outOfTimeSeconds);
+        this.brokerRuntimeEarliestMessageTimeStamp = initCache(outOfTimeSeconds);
+        this.brokerRuntimePutMessageEntireTimeMax = initCache(outOfTimeSeconds);
+        this.brokerRuntimeStartAcceptSendRequestTimeStamp = initCache(outOfTimeSeconds);
+        this.brokerRuntimeSendThreadPoolQueueSize = initCache(outOfTimeSeconds);
+        this.brokerRuntimePutMessageTimesTotal = initCache(outOfTimeSeconds);
+        this.brokerRuntimeGetMessageEntireTimeMax = initCache(outOfTimeSeconds);
+        this.brokerRuntimePageCacheLockTimeMills = initCache(outOfTimeSeconds);
+        this.brokerRuntimeCommitLogDiskRatio = initCache(outOfTimeSeconds);
+        this.brokerRuntimeConsumeQueueDiskRatio = initCache(outOfTimeSeconds);
+        this.brokerRuntimeGetFoundTps600 = initCache(outOfTimeSeconds);
+        this.brokerRuntimeGetFoundTps60 = initCache(outOfTimeSeconds);
+        this.brokerRuntimeGetFoundTps10 = initCache(outOfTimeSeconds);
+        this.brokerRuntimeGetTotalTps600 = initCache(outOfTimeSeconds);
+        this.brokerRuntimeGetTotalTps60 = initCache(outOfTimeSeconds);
+        this.brokerRuntimeGetTotalTps10 = initCache(outOfTimeSeconds);
+        this.brokerRuntimeGetTransferedTps600 = initCache(outOfTimeSeconds);
+        this.brokerRuntimeGetTransferedTps60 = initCache(outOfTimeSeconds);
+        this.brokerRuntimeGetTransferedTps10 = initCache(outOfTimeSeconds);
+        this.brokerRuntimeGetMissTps600 = initCache(outOfTimeSeconds);
+        this.brokerRuntimeGetMissTps60 = initCache(outOfTimeSeconds);
+        this.brokerRuntimeGetMissTps10 = initCache(outOfTimeSeconds);
+        this.brokerRuntimePutTps600 = initCache(outOfTimeSeconds);
+        this.brokerRuntimePutTps60 = initCache(outOfTimeSeconds);
+        this.brokerRuntimePutTps10 = initCache(outOfTimeSeconds);
+        this.brokerRuntimeDispatchMaxBuffer = initCache(outOfTimeSeconds);
+        this.brokerRuntimePutMessageDistributeTimeMap10toMore = initCache(outOfTimeSeconds);
+        this.brokerRuntimePutMessageDistributeTimeMap5to10s = initCache(outOfTimeSeconds);
+        this.brokerRuntimePutMessageDistributeTimeMap4to5s = initCache(outOfTimeSeconds);
+        this.brokerRuntimePutMessageDistributeTimeMap3to4s = initCache(outOfTimeSeconds);
+        this.brokerRuntimePutMessageDistributeTimeMap2to3s = initCache(outOfTimeSeconds);
+        this.brokerRuntimePutMessageDistributeTimeMap1to2s = initCache(outOfTimeSeconds);
+        this.brokerRuntimePutMessageDistributeTimeMap500to1s = initCache(outOfTimeSeconds);
+        this.brokerRuntimePutMessageDistributeTimeMap200to500ms = initCache(outOfTimeSeconds);
+        this.brokerRuntimePutMessageDistributeTimeMap100to200ms = initCache(outOfTimeSeconds);
+        this.brokerRuntimePutMessageDistributeTimeMap50to100ms = initCache(outOfTimeSeconds);
+        this.brokerRuntimePutMessageDistributeTimeMap10to50ms = initCache(outOfTimeSeconds);
+        this.brokerRuntimePutMessageDistributeTimeMap0to10ms = initCache(outOfTimeSeconds);
+        this.brokerRuntimePutMessageDistributeTimeMap0ms = initCache(outOfTimeSeconds);
+        this.brokerRuntimePullThreadPoolQueueCapacity = initCache(outOfTimeSeconds);
+        this.brokerRuntimeSendThreadPoolQueueCapacity = initCache(outOfTimeSeconds);
+        this.brokerRuntimePullThreadPoolQueueSize = initCache(outOfTimeSeconds);
+        this.brokerRuntimeQueryThreadPoolQueueSize = initCache(outOfTimeSeconds);
+        this.brokerRuntimePullThreadPoolQueueHeadWaitTimeMills = initCache(outOfTimeSeconds);
+        this.brokerRuntimeQueryThreadPoolQueueHeadWaitTimeMills = initCache(outOfTimeSeconds);
+        this.brokerRuntimeSendThreadPoolQueueHeadWaitTimeMills = initCache(outOfTimeSeconds);
+        this.brokerRuntimeCommitLogDirCapacityFree = initCache(outOfTimeSeconds);
+        this.brokerRuntimeCommitLogDirCapacityTotal = initCache(outOfTimeSeconds);
+        this.brokerRuntimeCommitLogMaxOffset = initCache(outOfTimeSeconds);
+        this.brokerRuntimeCommitLogMinOffset = initCache(outOfTimeSeconds);
+        this.brokerRuntimeRemainHowManyDataToFlush = initCache(outOfTimeSeconds);
+    }
+
+    private <T extends Cache> T initCache(long outOfTimeSeconds) {
+        return (T) CacheBuilder.newBuilder().expireAfterWrite(outOfTimeSeconds, TimeUnit.SECONDS).build();
+    }
+
     private final static Logger log = LoggerFactory.getLogger(RMQMetricsCollector.class);
 
     private static final List<String> GROUP_DIFF_LABEL_NAMES = Arrays.asList("group", "topic", "countOfOnlineConsumers", "msgModel");
 
     private static <T extends Number> void loadGroupDiffMetric(GaugeMetricFamily family, Map.Entry<ConsumerTopicDiffMetric, T> entry) {
         family.addMetric(
-                Arrays.asList(
-                        entry.getKey().getGroup(),
-                        entry.getKey().getTopic(),
-                        entry.getKey().getCountOfOnlineConsumers(),
-                        entry.getKey().getMsgModel()                
-                        ),
-                entry.getValue().doubleValue());
+            Arrays.asList(
+                entry.getKey().getGroup(),
+                entry.getKey().getTopic(),
+                entry.getKey().getCountOfOnlineConsumers(),
+                entry.getKey().getMsgModel()
+            ),
+            entry.getValue().doubleValue());
     }
 
     private static final List<String> GROUP_COUNT_LABEL_NAMES = Arrays.asList("caddr", "localaddr", "group");
 
     private void collectConsumerMetric(List<MetricFamilySamples> mfs) {
         GaugeMetricFamily groupGetLatencyByConsumerDiff = new GaugeMetricFamily("rocketmq_group_diff", "GroupDiff", GROUP_DIFF_LABEL_NAMES);
-        for (Map.Entry<ConsumerTopicDiffMetric, Long> entry : consumerDiff.entrySet()) {
+        for (Map.Entry<ConsumerTopicDiffMetric, Long> entry : consumerDiff.asMap().entrySet()) {
             loadGroupDiffMetric(groupGetLatencyByConsumerDiff, entry);
         }
         mfs.add(groupGetLatencyByConsumerDiff);
 
         GaugeMetricFamily groupGetLatencyByConsumerRetryDiff = new GaugeMetricFamily("rocketmq_group_retrydiff", "GroupRetryDiff", GROUP_DIFF_LABEL_NAMES);
-        for (Map.Entry<ConsumerTopicDiffMetric, Long> entry : consumerRetryDiff.entrySet()) {
+        for (Map.Entry<ConsumerTopicDiffMetric, Long> entry : consumerRetryDiff.asMap().entrySet()) {
             loadGroupDiffMetric(groupGetLatencyByConsumerRetryDiff, entry);
         }
         mfs.add(groupGetLatencyByConsumerRetryDiff);
 
         GaugeMetricFamily groupGetLatencyByConsumerDLQDiff = new GaugeMetricFamily("rocketmq_group_dlqdiff", "GroupDLQDiff", GROUP_DIFF_LABEL_NAMES);
-        for (Map.Entry<ConsumerTopicDiffMetric, Long> entry : consumerDLQDiff.entrySet()) {
+        for (Map.Entry<ConsumerTopicDiffMetric, Long> entry : consumerDLQDiff.asMap().entrySet()) {
             loadGroupDiffMetric(groupGetLatencyByConsumerDLQDiff, entry);
         }
         mfs.add(groupGetLatencyByConsumerDLQDiff);
 
         GaugeMetricFamily consumerCountsF = new GaugeMetricFamily("rocketmq_group_count", "GroupCount", GROUP_COUNT_LABEL_NAMES);
-        for (Map.Entry<ConsumerCountMetric, Integer> entry : consumerCounts.entrySet()) {
+        for (Map.Entry<ConsumerCountMetric, Integer> entry : consumerCounts.asMap().entrySet()) {
             consumerCountsF.addMetric(
-                    Arrays.asList(
-                        entry.getKey().getCaddrs(),
-                        entry.getKey().getLocaladdrs(),
-                        entry.getKey().getGroup()
-                    ),
-                    entry.getValue().doubleValue());
+                Arrays.asList(
+                    entry.getKey().getCaddrs(),
+                    entry.getKey().getLocaladdrs(),
+                    entry.getKey().getGroup()
+                ),
+                entry.getValue().doubleValue());
         }
         mfs.add(consumerCountsF);
 
@@ -221,47 +315,47 @@ public class RMQMetricsCollector extends Collector {
 
 
     private static final List<String> TOPIC_OFFSET_LABEL_NAMES = Arrays.asList(
-            "cluster", "broker", "topic", "lastUpdateTimestamp"
+        "cluster", "broker", "topic", "lastUpdateTimestamp"
     );
 
     private static final List<String> DLQ_TOPIC_OFFSET_LABEL_NAMES = Arrays.asList(
-            "cluster", "broker", "group", "lastUpdateTimestamp"
+        "cluster", "broker", "group", "lastUpdateTimestamp"
     );
 
     private void loadTopicOffsetMetric(GaugeMetricFamily family, Map.Entry<ProducerMetric, Double> entry) {
         family.addMetric(
-                Arrays.asList(
-                        entry.getKey().getClusterName(),
-                        entry.getKey().getBrokerName(),
-                        entry.getKey().getTopicName(),
-                        String.valueOf(entry.getKey().getLastUpdateTimestamp())
-                ),
-                entry.getValue());
+            Arrays.asList(
+                entry.getKey().getClusterName(),
+                entry.getKey().getBrokerName(),
+                entry.getKey().getTopicName(),
+                String.valueOf(entry.getKey().getLastUpdateTimestamp())
+            ),
+            entry.getValue());
     }
 
     private void collectTopicOffsetMetric(List<MetricFamilySamples> mfs) {
         GaugeMetricFamily topicOffsetF = new GaugeMetricFamily("rocketmq_producer_offset", "TopicOffset", TOPIC_OFFSET_LABEL_NAMES);
-        for (Map.Entry<ProducerMetric, Double> entry : topicOffset.entrySet()) {
+        for (Map.Entry<ProducerMetric, Double> entry : topicOffset.asMap().entrySet()) {
             loadTopicOffsetMetric(topicOffsetF, entry);
         }
         mfs.add(topicOffsetF);
 
         GaugeMetricFamily topicRetryOffsetF = new GaugeMetricFamily("rocketmq_topic_retry_offset", "TopicRetryOffset", TOPIC_OFFSET_LABEL_NAMES);
-        for (Map.Entry<ProducerMetric, Double> entry : topicRetryOffset.entrySet()) {
+        for (Map.Entry<ProducerMetric, Double> entry : topicRetryOffset.asMap().entrySet()) {
             loadTopicOffsetMetric(topicRetryOffsetF, entry);
         }
         mfs.add(topicRetryOffsetF);
 
         GaugeMetricFamily topicDLQOffsetF = new GaugeMetricFamily("rocketmq_topic_dlq_offset", "TopicRetryOffset", DLQ_TOPIC_OFFSET_LABEL_NAMES);
-        for (Map.Entry<DLQTopicOffsetMetric, Double> entry : topicDLQOffset.entrySet()) {
+        for (Map.Entry<DLQTopicOffsetMetric, Double> entry : topicDLQOffset.asMap().entrySet()) {
             topicDLQOffsetF.addMetric(
-                    Arrays.asList(
-                            entry.getKey().getClusterName(),
-                            entry.getKey().getBrokerName(),
-                            entry.getKey().getGroup(),
-                            String.valueOf(entry.getKey().getLastUpdateTimestamp())
-                    ),
-                    entry.getValue());
+                Arrays.asList(
+                    entry.getKey().getClusterName(),
+                    entry.getKey().getBrokerName(),
+                    entry.getKey().getGroup(),
+                    String.valueOf(entry.getKey().getLastUpdateTimestamp())
+                ),
+                entry.getValue());
         }
         mfs.add(topicDLQOffsetF);
 
@@ -290,42 +384,42 @@ public class RMQMetricsCollector extends Collector {
     }
 
     private static final List<String> GROUP_CLIENT_METRIC_LABEL_NAMES = Arrays.asList(
-            "clientAddr", "clientId", "group", "topic"
+        "clientAddr", "clientId", "group", "topic"
     );
 
     private void collectClientGroupMetric(List<MetricFamilySamples> mfs) {
         GaugeMetricFamily consumerClientFailedMsgCountsF = new GaugeMetricFamily("rocketmq_client_consume_fail_msg_count", "consumerClientFailedMsgCounts", GROUP_CLIENT_METRIC_LABEL_NAMES);
-        for (Map.Entry<ConsumerRuntimeConsumeFailedMsgsMetric, Long> entry : consumerClientFailedMsgCounts.entrySet()) {
+        for (Map.Entry<ConsumerRuntimeConsumeFailedMsgsMetric, Long> entry : consumerClientFailedMsgCounts.asMap().entrySet()) {
             loadClientRuntimeStatsMetric(consumerClientFailedMsgCountsF, entry);
         }
         mfs.add(consumerClientFailedMsgCountsF);
 
         GaugeMetricFamily consumerClientFailedTPSF = new GaugeMetricFamily("rocketmq_client_consume_fail_msg_tps", "consumerClientFailedTPS", GROUP_CLIENT_METRIC_LABEL_NAMES);
-        for (Map.Entry<ConsumerRuntimeConsumeFailedTPSMetric, Double> entry : consumerClientFailedTPS.entrySet()) {
+        for (Map.Entry<ConsumerRuntimeConsumeFailedTPSMetric, Double> entry : consumerClientFailedTPS.asMap().entrySet()) {
             loadClientRuntimeStatsMetric(consumerClientFailedTPSF, entry);
         }
         mfs.add(consumerClientFailedTPSF);
 
         GaugeMetricFamily consumerClientOKTPSF = new GaugeMetricFamily("rocketmq_client_consume_ok_msg_tps", "consumerClientOKTPS", GROUP_CLIENT_METRIC_LABEL_NAMES);
-        for (Map.Entry<ConsumerRuntimeConsumeOKTPSMetric, Double> entry : consumerClientOKTPS.entrySet()) {
+        for (Map.Entry<ConsumerRuntimeConsumeOKTPSMetric, Double> entry : consumerClientOKTPS.asMap().entrySet()) {
             loadClientRuntimeStatsMetric(consumerClientOKTPSF, entry);
         }
         mfs.add(consumerClientOKTPSF);
 
         GaugeMetricFamily consumerClientRTF = new GaugeMetricFamily("rocketmq_client_consume_rt", "consumerClientRT", GROUP_CLIENT_METRIC_LABEL_NAMES);
-        for (Map.Entry<ConsumerRuntimeConsumeRTMetric, Double> entry : consumerClientRT.entrySet()) {
+        for (Map.Entry<ConsumerRuntimeConsumeRTMetric, Double> entry : consumerClientRT.asMap().entrySet()) {
             loadClientRuntimeStatsMetric(consumerClientRTF, entry);
         }
         mfs.add(consumerClientRTF);
 
         GaugeMetricFamily consumerClientPullRTF = new GaugeMetricFamily("rocketmq_client_consumer_pull_rt", "consumerClientPullRT", GROUP_CLIENT_METRIC_LABEL_NAMES);
-        for (Map.Entry<ConsumerRuntimePullRTMetric, Double> entry : consumerClientPullRT.entrySet()) {
+        for (Map.Entry<ConsumerRuntimePullRTMetric, Double> entry : consumerClientPullRT.asMap().entrySet()) {
             loadClientRuntimeStatsMetric(consumerClientPullRTF, entry);
         }
         mfs.add(consumerClientPullRTF);
 
         GaugeMetricFamily consumerClientPullTPSF = new GaugeMetricFamily("rocketmq_client_consumer_pull_tps", "consumerClientPullTPS", GROUP_CLIENT_METRIC_LABEL_NAMES);
-        for (Map.Entry<ConsumerRuntimePullTPSMetric, Double> entry : consumerClientPullTPS.entrySet()) {
+        for (Map.Entry<ConsumerRuntimePullTPSMetric, Double> entry : consumerClientPullTPS.asMap().entrySet()) {
             loadClientRuntimeStatsMetric(consumerClientPullTPSF, entry);
         }
         mfs.add(consumerClientPullTPSF);
@@ -334,40 +428,40 @@ public class RMQMetricsCollector extends Collector {
 
     private <T2 extends Number, T1 extends ConsumerRuntimeConsumeFailedMsgsMetric> void loadClientRuntimeStatsMetric(GaugeMetricFamily family, Map.Entry<T1, T2> entry) {
         family.addMetric(Arrays.asList(
-                entry.getKey().getCaddrs(),
-                entry.getKey().getLocaladdrs(),
-                entry.getKey().getGroup(),
-                entry.getKey().getTopic()
+            entry.getKey().getCaddrs(),
+            entry.getKey().getLocaladdrs(),
+            entry.getKey().getGroup(),
+            entry.getKey().getTopic()
         ), entry.getValue().doubleValue());
     }
 
     private static final List<String> GROUP_PULL_LATENCY_LABEL_NAMES = Arrays.asList(
-            "cluster", "broker", "topic", "group", "queueid"
+        "cluster", "broker", "topic", "group", "queueid"
     );
     private static final List<String> GROUP_LATENCY_BY_STORETIME_LABEL_NAMES = Arrays.asList(
-            "cluster", "broker", "topic", "group"
+        "cluster", "broker", "topic", "group"
     );
 
     private static final List<String> BROKER_NUMS_LABEL_NAMES = Arrays.asList("cluster", "brokerIP", "broker");
 
     private static void loadBrokerNums(GaugeMetricFamily family, Map.Entry<BrokerMetric, Double> entry) {
         family.addMetric(Arrays.asList(
-                entry.getKey().getClusterName(),
-                entry.getKey().getBrokerIP(),
-                entry.getKey().getBrokerName()),
-                entry.getValue()
+            entry.getKey().getClusterName(),
+            entry.getKey().getBrokerIP(),
+            entry.getKey().getBrokerName()),
+            entry.getValue()
         );
     }
 
     private void collectBrokerNums(List<MetricFamilySamples> mfs) {
         GaugeMetricFamily brokerPutNumsGauge = new GaugeMetricFamily("rocketmq_broker_tps", "BrokerPutNums", BROKER_NUMS_LABEL_NAMES);
-        for (Map.Entry<BrokerMetric, Double> entry : brokerPutNums.entrySet()) {
+        for (Map.Entry<BrokerMetric, Double> entry : brokerPutNums.asMap().entrySet()) {
             loadBrokerNums(brokerPutNumsGauge, entry);
         }
         mfs.add(brokerPutNumsGauge);
 
         GaugeMetricFamily brokerGetNumsGauge = new GaugeMetricFamily("rocketmq_broker_qps", "BrokerGetNums", BROKER_NUMS_LABEL_NAMES);
-        for (Map.Entry<BrokerMetric, Double> entry : brokerGetNums.entrySet()) {
+        for (Map.Entry<BrokerMetric, Double> entry : brokerGetNums.asMap().entrySet()) {
             loadBrokerNums(brokerGetNumsGauge, entry);
         }
         mfs.add(brokerGetNumsGauge);
@@ -375,94 +469,94 @@ public class RMQMetricsCollector extends Collector {
 
 
     private static final List<String> GROUP_NUMS_LABEL_NAMES = Arrays.asList(
-            "cluster", "broker", "topic", "group"
+        "cluster", "broker", "topic", "group"
     );
 
     private static <T extends Number> void loadGroupNumsMetric(GaugeMetricFamily family, Map.Entry<ConsumerMetric, T> entry) {
         family.addMetric(Arrays.asList(
-                entry.getKey().getClusterName(),
-                entry.getKey().getBrokerName(),
-                entry.getKey().getTopicName(),
-                entry.getKey().getConsumerGroupName()),
-                entry.getValue().doubleValue()
+            entry.getKey().getClusterName(),
+            entry.getKey().getBrokerName(),
+            entry.getKey().getTopicName(),
+            entry.getKey().getConsumerGroupName()),
+            entry.getValue().doubleValue()
         );
     }
 
     private void collectBrokerRuntimeStatsPutMessageDistributeTime(List<MetricFamilySamples> mfs) {
         GaugeMetricFamily pmdt0 = new GaugeMetricFamily("rocketmq_brokeruntime_pmdt_0ms", "PutMessageDistributeTimeMap0ms", BROKER_RUNTIME_METRIC_LABEL_NAMES);
-        for (Map.Entry<BrokerRuntimeMetric, Integer> entry : brokerRuntimePutMessageDistributeTimeMap0ms.entrySet()) {
+        for (Map.Entry<BrokerRuntimeMetric, Integer> entry : brokerRuntimePutMessageDistributeTimeMap0ms.asMap().entrySet()) {
             loadBrokerRuntimeStatsMetric(pmdt0, entry);
         }
         mfs.add(pmdt0);
 
         GaugeMetricFamily pmdt0to10ms = new GaugeMetricFamily("rocketmq_brokeruntime_pmdt_0to10ms", "PutMessageDistributeTimeMap0to10ms", BROKER_RUNTIME_METRIC_LABEL_NAMES);
-        for (Map.Entry<BrokerRuntimeMetric, Integer> entry : brokerRuntimePutMessageDistributeTimeMap0to10ms.entrySet()) {
+        for (Map.Entry<BrokerRuntimeMetric, Integer> entry : brokerRuntimePutMessageDistributeTimeMap0to10ms.asMap().entrySet()) {
             loadBrokerRuntimeStatsMetric(pmdt0to10ms, entry);
         }
         mfs.add(pmdt0to10ms);
 
         GaugeMetricFamily pmdt10to50ms = new GaugeMetricFamily("rocketmq_brokeruntime_pmdt_10to50ms", "PutMessageDistributeTimeMap10to50ms", BROKER_RUNTIME_METRIC_LABEL_NAMES);
-        for (Map.Entry<BrokerRuntimeMetric, Integer> entry : brokerRuntimePutMessageDistributeTimeMap10to50ms.entrySet()) {
+        for (Map.Entry<BrokerRuntimeMetric, Integer> entry : brokerRuntimePutMessageDistributeTimeMap10to50ms.asMap().entrySet()) {
             loadBrokerRuntimeStatsMetric(pmdt10to50ms, entry);
         }
         mfs.add(pmdt10to50ms);
 
         GaugeMetricFamily pmdt50to100ms = new GaugeMetricFamily("rocketmq_brokeruntime_pmdt_50to100ms", "PutMessageDistributeTimeMap50to100ms", BROKER_RUNTIME_METRIC_LABEL_NAMES);
-        for (Map.Entry<BrokerRuntimeMetric, Integer> entry : brokerRuntimePutMessageDistributeTimeMap50to100ms.entrySet()) {
+        for (Map.Entry<BrokerRuntimeMetric, Integer> entry : brokerRuntimePutMessageDistributeTimeMap50to100ms.asMap().entrySet()) {
             loadBrokerRuntimeStatsMetric(pmdt50to100ms, entry);
         }
         mfs.add(pmdt50to100ms);
 
         GaugeMetricFamily pmdt100to200ms = new GaugeMetricFamily("rocketmq_brokeruntime_pmdt_100to200ms", "PutMessageDistributeTimeMap100to200ms", BROKER_RUNTIME_METRIC_LABEL_NAMES);
-        for (Map.Entry<BrokerRuntimeMetric, Integer> entry : brokerRuntimePutMessageDistributeTimeMap100to200ms.entrySet()) {
+        for (Map.Entry<BrokerRuntimeMetric, Integer> entry : brokerRuntimePutMessageDistributeTimeMap100to200ms.asMap().entrySet()) {
             loadBrokerRuntimeStatsMetric(pmdt100to200ms, entry);
         }
         mfs.add(pmdt100to200ms);
 
         GaugeMetricFamily pmdt200to500ms = new GaugeMetricFamily("rocketmq_brokeruntime_pmdt_200to500ms", "PutMessageDistributeTimeMap200to500ms", BROKER_RUNTIME_METRIC_LABEL_NAMES);
-        for (Map.Entry<BrokerRuntimeMetric, Integer> entry : brokerRuntimePutMessageDistributeTimeMap200to500ms.entrySet()) {
+        for (Map.Entry<BrokerRuntimeMetric, Integer> entry : brokerRuntimePutMessageDistributeTimeMap200to500ms.asMap().entrySet()) {
             loadBrokerRuntimeStatsMetric(pmdt200to500ms, entry);
         }
         mfs.add(pmdt200to500ms);
 
         GaugeMetricFamily pmdt500to1s = new GaugeMetricFamily("rocketmq_brokeruntime_pmdt_500to1s", "PutMessageDistributeTimeMap500to1s", BROKER_RUNTIME_METRIC_LABEL_NAMES);
-        for (Map.Entry<BrokerRuntimeMetric, Integer> entry : brokerRuntimePutMessageDistributeTimeMap500to1s.entrySet()) {
+        for (Map.Entry<BrokerRuntimeMetric, Integer> entry : brokerRuntimePutMessageDistributeTimeMap500to1s.asMap().entrySet()) {
             loadBrokerRuntimeStatsMetric(pmdt500to1s, entry);
         }
         mfs.add(pmdt500to1s);
 
         GaugeMetricFamily pmdt1to2s = new GaugeMetricFamily("rocketmq_brokeruntime_pmdt_1to2s", "PutMessageDistributeTimeMap1to2s", BROKER_RUNTIME_METRIC_LABEL_NAMES);
-        for (Map.Entry<BrokerRuntimeMetric, Integer> entry : brokerRuntimePutMessageDistributeTimeMap1to2s.entrySet()) {
+        for (Map.Entry<BrokerRuntimeMetric, Integer> entry : brokerRuntimePutMessageDistributeTimeMap1to2s.asMap().entrySet()) {
             loadBrokerRuntimeStatsMetric(pmdt1to2s, entry);
         }
         mfs.add(pmdt1to2s);
 
         GaugeMetricFamily pmdt2to3s = new GaugeMetricFamily("rocketmq_brokeruntime_pmdt_2to3s", "PutMessageDistributeTimeMap2to3s", BROKER_RUNTIME_METRIC_LABEL_NAMES);
-        for (Map.Entry<BrokerRuntimeMetric, Integer> entry : brokerRuntimePutMessageDistributeTimeMap2to3s.entrySet()) {
+        for (Map.Entry<BrokerRuntimeMetric, Integer> entry : brokerRuntimePutMessageDistributeTimeMap2to3s.asMap().entrySet()) {
             loadBrokerRuntimeStatsMetric(pmdt2to3s, entry);
         }
         mfs.add(pmdt2to3s);
 
         GaugeMetricFamily pmdt3to4s = new GaugeMetricFamily("rocketmq_brokeruntime_pmdt_3to4s", "PutMessageDistributeTimeMap3to4s", BROKER_RUNTIME_METRIC_LABEL_NAMES);
-        for (Map.Entry<BrokerRuntimeMetric, Integer> entry : brokerRuntimePutMessageDistributeTimeMap3to4s.entrySet()) {
+        for (Map.Entry<BrokerRuntimeMetric, Integer> entry : brokerRuntimePutMessageDistributeTimeMap3to4s.asMap().entrySet()) {
             loadBrokerRuntimeStatsMetric(pmdt3to4s, entry);
         }
         mfs.add(pmdt3to4s);
 
         GaugeMetricFamily pmdt4to5s = new GaugeMetricFamily("rocketmq_brokeruntime_pmdt_4to5s", "PutMessageDistributeTimeMap4to5s", BROKER_RUNTIME_METRIC_LABEL_NAMES);
-        for (Map.Entry<BrokerRuntimeMetric, Integer> entry : brokerRuntimePutMessageDistributeTimeMap4to5s.entrySet()) {
+        for (Map.Entry<BrokerRuntimeMetric, Integer> entry : brokerRuntimePutMessageDistributeTimeMap4to5s.asMap().entrySet()) {
             loadBrokerRuntimeStatsMetric(pmdt4to5s, entry);
         }
         mfs.add(pmdt4to5s);
 
         GaugeMetricFamily pmdt5to10s = new GaugeMetricFamily("rocketmq_brokeruntime_pmdt_5to10s", "PutMessageDistributeTimeMap5to10s", BROKER_RUNTIME_METRIC_LABEL_NAMES);
-        for (Map.Entry<BrokerRuntimeMetric, Integer> entry : brokerRuntimePutMessageDistributeTimeMap5to10s.entrySet()) {
+        for (Map.Entry<BrokerRuntimeMetric, Integer> entry : brokerRuntimePutMessageDistributeTimeMap5to10s.asMap().entrySet()) {
             loadBrokerRuntimeStatsMetric(pmdt5to10s, entry);
         }
         mfs.add(pmdt5to10s);
 
         GaugeMetricFamily pmdt10stoMore = new GaugeMetricFamily("rocketmq_brokeruntime_pmdt_10stomore", "PutMessageDistributeTimeMap10toMore", BROKER_RUNTIME_METRIC_LABEL_NAMES);
-        for (Map.Entry<BrokerRuntimeMetric, Integer> entry : brokerRuntimePutMessageDistributeTimeMap10toMore.entrySet()) {
+        for (Map.Entry<BrokerRuntimeMetric, Integer> entry : brokerRuntimePutMessageDistributeTimeMap10toMore.asMap().entrySet()) {
             loadBrokerRuntimeStatsMetric(pmdt10stoMore, entry);
         }
         mfs.add(pmdt10stoMore);
@@ -470,44 +564,44 @@ public class RMQMetricsCollector extends Collector {
 
     private void collectGroupNums(List<MetricFamilySamples> mfs) {
         GaugeMetricFamily groupGetNumsGauge = new GaugeMetricFamily("rocketmq_consumer_tps", "GroupGetNums", GROUP_NUMS_LABEL_NAMES);
-        for (Map.Entry<ConsumerMetric, Double> entry : groupGetNums.entrySet()) {
+        for (Map.Entry<ConsumerMetric, Double> entry : groupGetNums.asMap().entrySet()) {
             loadGroupNumsMetric(groupGetNumsGauge, entry);
         }
         mfs.add(groupGetNumsGauge);
 
         GaugeMetricFamily groupConsumeTPSF = new GaugeMetricFamily("rocketmq_group_consume_tps", "GroupConsumeTPS", GROUP_NUMS_LABEL_NAMES);
-        for (Map.Entry<ConsumerMetric, Double> entry : groupConsumeTPS.entrySet()) {
+        for (Map.Entry<ConsumerMetric, Double> entry : groupConsumeTPS.asMap().entrySet()) {
             loadGroupNumsMetric(groupConsumeTPSF, entry);
         }
         mfs.add(groupConsumeTPSF);
 
         GaugeMetricFamily groupBrokerTotalOffsetF = new GaugeMetricFamily("rocketmq_consumer_offset", "GroupBrokerTotalOffset", GROUP_NUMS_LABEL_NAMES);
-        for (Map.Entry<ConsumerMetric, Long> entry : groupBrokerTotalOffset.entrySet()) {
+        for (Map.Entry<ConsumerMetric, Long> entry : groupBrokerTotalOffset.asMap().entrySet()) {
             loadGroupNumsMetric(groupBrokerTotalOffsetF, entry);
         }
         mfs.add(groupBrokerTotalOffsetF);
 
         GaugeMetricFamily groupConsumeTotalOffsetF = new GaugeMetricFamily("rocketmq_group_consume_total_offset", "GroupConsumeTotalOffset", GROUP_NUMS_LABEL_NAMES);
-        for (Map.Entry<ConsumerMetric, Long> entry : groupConsumeTotalOffset.entrySet()) {
+        for (Map.Entry<ConsumerMetric, Long> entry : groupConsumeTotalOffset.asMap().entrySet()) {
             loadGroupNumsMetric(groupConsumeTotalOffsetF, entry);
         }
         mfs.add(groupConsumeTotalOffsetF);
 
         GaugeMetricFamily groupGetSizeGauge = new GaugeMetricFamily("rocketmq_consumer_message_size", "GroupGetMessageSize", GROUP_NUMS_LABEL_NAMES);
-        for (Map.Entry<ConsumerMetric, Double> entry : groupGetSize.entrySet()) {
+        for (Map.Entry<ConsumerMetric, Double> entry : groupGetSize.asMap().entrySet()) {
             loadGroupNumsMetric(groupGetSizeGauge, entry);
         }
         mfs.add(groupGetSizeGauge);
 
         GaugeMetricFamily sendBackNumsGauge = new GaugeMetricFamily("rocketmq_send_back_nums", "SendBackNums", GROUP_NUMS_LABEL_NAMES);
-        for (Map.Entry<ConsumerMetric, Long> entry : sendBackNums.entrySet()) {
+        for (Map.Entry<ConsumerMetric, Double> entry : sendBackNums.asMap().entrySet()) {
             loadGroupNumsMetric(sendBackNumsGauge, entry);
         }
         mfs.add(sendBackNumsGauge);
 
         GaugeMetricFamily groupLatencyByTimeF = new GaugeMetricFamily("rocketmq_group_get_latency_by_storetime",
-                "GroupGetLatencyByStoreTime", GROUP_LATENCY_BY_STORETIME_LABEL_NAMES);
-        for (Map.Entry<ConsumerMetric, Long> entry : groupLatencyByTime.entrySet()) {
+            "GroupGetLatencyByStoreTime", GROUP_LATENCY_BY_STORETIME_LABEL_NAMES);
+        for (Map.Entry<ConsumerMetric, Long> entry : groupLatencyByTime.asMap().entrySet()) {
             loadGroupNumsMetric(groupLatencyByTimeF, entry);
         }
         mfs.add(groupLatencyByTimeF);
@@ -516,13 +610,13 @@ public class RMQMetricsCollector extends Collector {
 
     private void collectTopicNums(List<MetricFamilySamples> mfs) {
         GaugeMetricFamily topicPutNumsGauge = new GaugeMetricFamily("rocketmq_producer_tps", "TopicPutNums", TOPIC_NUMS_LABEL_NAMES);
-        for (Map.Entry<TopicPutNumMetric, Double> entry : topicPutNums.entrySet()) {
+        for (Map.Entry<TopicPutNumMetric, Double> entry : topicPutNums.asMap().entrySet()) {
             loadTopicNumsMetric(topicPutNumsGauge, entry);
         }
         mfs.add(topicPutNumsGauge);
 
         GaugeMetricFamily topicPutSizeGauge = new GaugeMetricFamily("rocketmq_producer_message_size", "TopicPutMessageSize", TOPIC_NUMS_LABEL_NAMES);
-        for (Map.Entry<TopicPutNumMetric, Double> entry : topicPutSize.entrySet()) {
+        for (Map.Entry<TopicPutNumMetric, Double> entry : topicPutSize.asMap().entrySet()) {
             loadTopicNumsMetric(topicPutSizeGauge, entry);
         }
         mfs.add(topicPutSizeGauge);
@@ -532,12 +626,12 @@ public class RMQMetricsCollector extends Collector {
 
     private void loadTopicNumsMetric(GaugeMetricFamily family, Map.Entry<TopicPutNumMetric, Double> entry) {
         family.addMetric(
-                Arrays.asList(
-                        entry.getKey().getClusterName(),
-                        entry.getKey().getBrokerName(),
-                        entry.getKey().getTopicName()
-                ),
-                entry.getValue()
+            Arrays.asList(
+                entry.getKey().getClusterName(),
+                entry.getKey().getBrokerName(),
+                entry.getKey().getTopicName()
+            ),
+            entry.getValue()
         );
     }
 
@@ -597,7 +691,7 @@ public class RMQMetricsCollector extends Collector {
         groupGetSize.put(new ConsumerMetric(clusterName, brokerName, topic, group), value);
     }
 
-    public void addSendBackNumsMetric(String clusterName, String brokerName, String topic, String group, long value) {
+    public void addSendBackNumsMetric(String clusterName, String brokerName, String topic, String group, double value) {
         sendBackNums.put(new ConsumerMetric(clusterName, brokerName, topic, group), value);
     }
 
@@ -629,403 +723,387 @@ public class RMQMetricsCollector extends Collector {
         brokerPutNums.put(new BrokerMetric(clusterName, brokerIP, brokerName), value);
     }
 
-    public void clearBrokerPutNumsMetric(Set<BrokerMetric> masterMetricsSet) {
-        for (BrokerMetric brokerMetric : brokerPutNums.keySet()) {
-            if (!masterMetricsSet.contains(brokerMetric)) {
-                brokerPutNums.remove(brokerMetric);
-            }
-        }
-    }
-
     public void addBrokerGetNumsMetric(String clusterName, String brokerIP, String brokerName, double value) {
         brokerGetNums.put(new BrokerMetric(clusterName, brokerIP, brokerName), value);
     }
 
-    public void clearBrokerGetNumsMetric(Set<BrokerMetric> masterMetricsSet) {
-        for (BrokerMetric brokerMetric : brokerGetNums.keySet()) {
-            if (!masterMetricsSet.contains(brokerMetric)) {
-                brokerGetNums.remove(brokerMetric);
-            }
-        }
-    }
-
     public void addBrokerRuntimeStatsMetric(BrokerRuntimeStats stats, String clusterName, String brokerAddress, String brokerHost) {
         addBrokerRuntimePutMessageDistributeTimeMap(
-                clusterName, brokerAddress, brokerHost,
-                stats.getBrokerVersionDesc(),
-                stats.getBootTimestamp(),
-                stats.getBrokerVersion(), stats);
+            clusterName, brokerAddress, brokerHost,
+            stats.getBrokerVersionDesc(),
+            stats.getBootTimestamp(),
+            stats.getBrokerVersion(), stats);
         addCommitLogDirCapacity(clusterName, brokerAddress, brokerHost, stats);
         addAllKindOfTps(clusterName, brokerAddress, brokerHost, stats);
 
         brokerRuntimeMsgPutTotalTodayNow.put(new BrokerRuntimeMetric(
-                clusterName, brokerAddress, brokerHost,
-                stats.getBrokerVersionDesc(),
-                stats.getBootTimestamp(),
-                stats.getBrokerVersion()), stats.getMsgPutTotalTodayNow());
+            clusterName, brokerAddress, brokerHost,
+            stats.getBrokerVersionDesc(),
+            stats.getBootTimestamp(),
+            stats.getBrokerVersion()), stats.getMsgPutTotalTodayNow());
 
         brokerRuntimeMsgGetTotalTodayNow.put(new BrokerRuntimeMetric(
-                clusterName, brokerAddress, brokerHost,
-                stats.getBrokerVersionDesc(),
-                stats.getBootTimestamp(),
-                stats.getBrokerVersion()), stats.getMsgGetTotalTodayNow());
+            clusterName, brokerAddress, brokerHost,
+            stats.getBrokerVersionDesc(),
+            stats.getBootTimestamp(),
+            stats.getBrokerVersion()), stats.getMsgGetTotalTodayNow());
 
         brokerRuntimeMsgPutTotalTodayMorning.put(new BrokerRuntimeMetric(
-                clusterName, brokerAddress, brokerHost,
-                stats.getBrokerVersionDesc(),
-                stats.getBootTimestamp(),
-                stats.getBrokerVersion()), stats.getMsgPutTotalTodayMorning());
+            clusterName, brokerAddress, brokerHost,
+            stats.getBrokerVersionDesc(),
+            stats.getBootTimestamp(),
+            stats.getBrokerVersion()), stats.getMsgPutTotalTodayMorning());
         brokerRuntimeMsgGetTotalTodayMorning.put(new BrokerRuntimeMetric(
-                clusterName, brokerAddress, brokerHost,
-                stats.getBrokerVersionDesc(),
-                stats.getBootTimestamp(),
-                stats.getBrokerVersion()), stats.getMsgGetTotalTodayMorning());
+            clusterName, brokerAddress, brokerHost,
+            stats.getBrokerVersionDesc(),
+            stats.getBootTimestamp(),
+            stats.getBrokerVersion()), stats.getMsgGetTotalTodayMorning());
         brokerRuntimeMsgPutTotalYesterdayMorning.put(new BrokerRuntimeMetric(
-                clusterName, brokerAddress, brokerHost,
-                stats.getBrokerVersionDesc(),
-                stats.getBootTimestamp(),
-                stats.getBrokerVersion()), stats.getMsgPutTotalYesterdayMorning());
+            clusterName, brokerAddress, brokerHost,
+            stats.getBrokerVersionDesc(),
+            stats.getBootTimestamp(),
+            stats.getBrokerVersion()), stats.getMsgPutTotalYesterdayMorning());
         brokerRuntimeMsgGetTotalYesterdayMorning.put(new BrokerRuntimeMetric(
-                clusterName, brokerAddress, brokerHost,
-                stats.getBrokerVersionDesc(),
-                stats.getBootTimestamp(),
-                stats.getBrokerVersion()), stats.getMsgGetTotalYesterdayMorning());
+            clusterName, brokerAddress, brokerHost,
+            stats.getBrokerVersionDesc(),
+            stats.getBootTimestamp(),
+            stats.getBrokerVersion()), stats.getMsgGetTotalYesterdayMorning());
         brokerRuntimeSendThreadPoolQueueHeadWaitTimeMills.put(new BrokerRuntimeMetric(
-                clusterName, brokerAddress, brokerHost,
-                stats.getBrokerVersionDesc(),
-                stats.getBootTimestamp(),
-                stats.getBrokerVersion()), stats.getSendThreadPoolQueueHeadWaitTimeMills());
+            clusterName, brokerAddress, brokerHost,
+            stats.getBrokerVersionDesc(),
+            stats.getBootTimestamp(),
+            stats.getBrokerVersion()), stats.getSendThreadPoolQueueHeadWaitTimeMills());
         brokerRuntimeQueryThreadPoolQueueHeadWaitTimeMills.put(new BrokerRuntimeMetric(
-                clusterName, brokerAddress, brokerHost,
-                stats.getBrokerVersionDesc(),
-                stats.getBootTimestamp(),
-                stats.getBrokerVersion()), stats.getQueryThreadPoolQueueHeadWaitTimeMills());
+            clusterName, brokerAddress, brokerHost,
+            stats.getBrokerVersionDesc(),
+            stats.getBootTimestamp(),
+            stats.getBrokerVersion()), stats.getQueryThreadPoolQueueHeadWaitTimeMills());
         brokerRuntimePullThreadPoolQueueHeadWaitTimeMills.put(new BrokerRuntimeMetric(
-                clusterName, brokerAddress, brokerHost,
-                stats.getBrokerVersionDesc(),
-                stats.getBootTimestamp(),
-                stats.getBrokerVersion()), stats.getPullThreadPoolQueueHeadWaitTimeMills());
+            clusterName, brokerAddress, brokerHost,
+            stats.getBrokerVersionDesc(),
+            stats.getBootTimestamp(),
+            stats.getBrokerVersion()), stats.getPullThreadPoolQueueHeadWaitTimeMills());
         brokerRuntimeQueryThreadPoolQueueSize.put(new BrokerRuntimeMetric(
-                clusterName, brokerAddress, brokerHost,
-                stats.getBrokerVersionDesc(),
-                stats.getBootTimestamp(),
-                stats.getBrokerVersion()), stats.getQueryThreadPoolQueueSize());
+            clusterName, brokerAddress, brokerHost,
+            stats.getBrokerVersionDesc(),
+            stats.getBootTimestamp(),
+            stats.getBrokerVersion()), stats.getQueryThreadPoolQueueSize());
         brokerRuntimePullThreadPoolQueueSize.put(new BrokerRuntimeMetric(
-                clusterName, brokerAddress, brokerHost,
-                stats.getBrokerVersionDesc(),
-                stats.getBootTimestamp(),
-                stats.getBrokerVersion()), stats.getPullThreadPoolQueueSize());
+            clusterName, brokerAddress, brokerHost,
+            stats.getBrokerVersionDesc(),
+            stats.getBootTimestamp(),
+            stats.getBrokerVersion()), stats.getPullThreadPoolQueueSize());
         brokerRuntimeSendThreadPoolQueueCapacity.put(new BrokerRuntimeMetric(
-                clusterName, brokerAddress, brokerHost,
-                stats.getBrokerVersionDesc(),
-                stats.getBootTimestamp(),
-                stats.getBrokerVersion()), stats.getSendThreadPoolQueueCapacity());
+            clusterName, brokerAddress, brokerHost,
+            stats.getBrokerVersionDesc(),
+            stats.getBootTimestamp(),
+            stats.getBrokerVersion()), stats.getSendThreadPoolQueueCapacity());
         brokerRuntimePullThreadPoolQueueCapacity.put(new BrokerRuntimeMetric(
-                clusterName, brokerAddress, brokerHost,
-                stats.getBrokerVersionDesc(),
-                stats.getBootTimestamp(),
-                stats.getBrokerVersion()), stats.getPullThreadPoolQueueCapacity());
+            clusterName, brokerAddress, brokerHost,
+            stats.getBrokerVersionDesc(),
+            stats.getBootTimestamp(),
+            stats.getBrokerVersion()), stats.getPullThreadPoolQueueCapacity());
 
         brokerRuntimeRemainHowManyDataToFlush.put(new BrokerRuntimeMetric(
-                clusterName, brokerAddress, brokerHost,
-                stats.getBrokerVersionDesc(),
-                stats.getBootTimestamp(),
-                stats.getBrokerVersion()), stats.getRemainHowManyDataToFlush());
+            clusterName, brokerAddress, brokerHost,
+            stats.getBrokerVersionDesc(),
+            stats.getBootTimestamp(),
+            stats.getBrokerVersion()), stats.getRemainHowManyDataToFlush());
         brokerRuntimeCommitLogMinOffset.put(new BrokerRuntimeMetric(
-                clusterName, brokerAddress, brokerHost,
-                stats.getBrokerVersionDesc(),
-                stats.getBootTimestamp(),
-                stats.getBrokerVersion()), stats.getCommitLogMinOffset());
+            clusterName, brokerAddress, brokerHost,
+            stats.getBrokerVersionDesc(),
+            stats.getBootTimestamp(),
+            stats.getBrokerVersion()), stats.getCommitLogMinOffset());
         brokerRuntimeCommitLogMaxOffset.put(new BrokerRuntimeMetric(
-                clusterName, brokerAddress, brokerHost,
-                stats.getBrokerVersionDesc(),
-                stats.getBootTimestamp(),
-                stats.getBrokerVersion()), stats.getCommitLogMaxOffset());
+            clusterName, brokerAddress, brokerHost,
+            stats.getBrokerVersionDesc(),
+            stats.getBootTimestamp(),
+            stats.getBrokerVersion()), stats.getCommitLogMaxOffset());
 
 
         brokerRuntimeDispatchMaxBuffer.put(new BrokerRuntimeMetric(
-                clusterName, brokerAddress, brokerHost,
-                stats.getBrokerVersionDesc(),
-                stats.getBootTimestamp(),
-                stats.getBrokerVersion()), stats.getDispatchMaxBuffer());
+            clusterName, brokerAddress, brokerHost,
+            stats.getBrokerVersionDesc(),
+            stats.getBootTimestamp(),
+            stats.getBrokerVersion()), stats.getDispatchMaxBuffer());
         brokerRuntimeConsumeQueueDiskRatio.put(new BrokerRuntimeMetric(
-                clusterName, brokerAddress, brokerHost,
-                stats.getBrokerVersionDesc(),
-                stats.getBootTimestamp(),
-                stats.getBrokerVersion()), stats.getConsumeQueueDiskRatio());
+            clusterName, brokerAddress, brokerHost,
+            stats.getBrokerVersionDesc(),
+            stats.getBootTimestamp(),
+            stats.getBrokerVersion()), stats.getConsumeQueueDiskRatio());
         brokerRuntimeCommitLogDiskRatio.put(new BrokerRuntimeMetric(
-                clusterName, brokerAddress, brokerHost,
-                stats.getBrokerVersionDesc(),
-                stats.getBootTimestamp(),
-                stats.getBrokerVersion()), stats.getCommitLogDiskRatio());
+            clusterName, brokerAddress, brokerHost,
+            stats.getBrokerVersionDesc(),
+            stats.getBootTimestamp(),
+            stats.getBrokerVersion()), stats.getCommitLogDiskRatio());
         brokerRuntimePageCacheLockTimeMills.put(new BrokerRuntimeMetric(
-                clusterName, brokerAddress, brokerHost,
-                stats.getBrokerVersionDesc(),
-                stats.getBootTimestamp(),
-                stats.getBrokerVersion()), stats.getPageCacheLockTimeMills());
+            clusterName, brokerAddress, brokerHost,
+            stats.getBrokerVersionDesc(),
+            stats.getBootTimestamp(),
+            stats.getBrokerVersion()), stats.getPageCacheLockTimeMills());
         brokerRuntimeGetMessageEntireTimeMax.put(new BrokerRuntimeMetric(
-                clusterName, brokerAddress, brokerHost,
-                stats.getBrokerVersionDesc(),
-                stats.getBootTimestamp(),
-                stats.getBrokerVersion()), stats.getGetMessageEntireTimeMax());
+            clusterName, brokerAddress, brokerHost,
+            stats.getBrokerVersionDesc(),
+            stats.getBootTimestamp(),
+            stats.getBrokerVersion()), stats.getGetMessageEntireTimeMax());
         brokerRuntimePutMessageTimesTotal.put(new BrokerRuntimeMetric(
-                clusterName, brokerAddress, brokerHost,
-                stats.getBrokerVersionDesc(),
-                stats.getBootTimestamp(),
-                stats.getBrokerVersion()), stats.getPutMessageTimesTotal());
+            clusterName, brokerAddress, brokerHost,
+            stats.getBrokerVersionDesc(),
+            stats.getBootTimestamp(),
+            stats.getBrokerVersion()), stats.getPutMessageTimesTotal());
         brokerRuntimeSendThreadPoolQueueSize.put(new BrokerRuntimeMetric(
-                clusterName, brokerAddress, brokerHost,
-                stats.getBrokerVersionDesc(),
-                stats.getBootTimestamp(),
-                stats.getBrokerVersion()), stats.getSendThreadPoolQueueSize());
+            clusterName, brokerAddress, brokerHost,
+            stats.getBrokerVersionDesc(),
+            stats.getBootTimestamp(),
+            stats.getBrokerVersion()), stats.getSendThreadPoolQueueSize());
         brokerRuntimeStartAcceptSendRequestTimeStamp.put(new BrokerRuntimeMetric(
-                clusterName, brokerAddress, brokerHost,
-                stats.getBrokerVersionDesc(),
-                stats.getBootTimestamp(),
-                stats.getBrokerVersion()), stats.getStartAcceptSendRequestTimeStamp());
+            clusterName, brokerAddress, brokerHost,
+            stats.getBrokerVersionDesc(),
+            stats.getBootTimestamp(),
+            stats.getBrokerVersion()), stats.getStartAcceptSendRequestTimeStamp());
         brokerRuntimePutMessageEntireTimeMax.put(new BrokerRuntimeMetric(
-                clusterName, brokerAddress, brokerHost,
-                stats.getBrokerVersionDesc(),
-                stats.getBootTimestamp(),
-                stats.getBrokerVersion()), stats.getPutMessageEntireTimeMax());
+            clusterName, brokerAddress, brokerHost,
+            stats.getBrokerVersionDesc(),
+            stats.getBootTimestamp(),
+            stats.getBrokerVersion()), stats.getPutMessageEntireTimeMax());
         brokerRuntimeEarliestMessageTimeStamp.put(new BrokerRuntimeMetric(
-                clusterName, brokerAddress, brokerHost,
-                stats.getBrokerVersionDesc(),
-                stats.getBootTimestamp(),
-                stats.getBrokerVersion()), stats.getEarliestMessageTimeStamp());
+            clusterName, brokerAddress, brokerHost,
+            stats.getBrokerVersionDesc(),
+            stats.getBootTimestamp(),
+            stats.getBrokerVersion()), stats.getEarliestMessageTimeStamp());
         brokerRuntimeRemainTransientStoreBufferNumbs.put(new BrokerRuntimeMetric(
-                clusterName, brokerAddress, brokerHost,
-                stats.getBrokerVersionDesc(),
-                stats.getBootTimestamp(),
-                stats.getBrokerVersion()), stats.getRemainTransientStoreBufferNumbs());
+            clusterName, brokerAddress, brokerHost,
+            stats.getBrokerVersionDesc(),
+            stats.getBootTimestamp(),
+            stats.getBrokerVersion()), stats.getRemainTransientStoreBufferNumbs());
         brokerRuntimeQueryThreadPoolQueueCapacity.put(new BrokerRuntimeMetric(
-                clusterName, brokerAddress, brokerHost,
-                stats.getBrokerVersionDesc(),
-                stats.getBootTimestamp(),
-                stats.getBrokerVersion()), stats.getQueryThreadPoolQueueCapacity());
+            clusterName, brokerAddress, brokerHost,
+            stats.getBrokerVersionDesc(),
+            stats.getBootTimestamp(),
+            stats.getBrokerVersion()), stats.getQueryThreadPoolQueueCapacity());
         brokerRuntimePutMessageAverageSize.put(new BrokerRuntimeMetric(
-                clusterName, brokerAddress, brokerHost,
-                stats.getBrokerVersionDesc(),
-                stats.getBootTimestamp(),
-                stats.getBrokerVersion()), stats.getPutMessageAverageSize());
+            clusterName, brokerAddress, brokerHost,
+            stats.getBrokerVersionDesc(),
+            stats.getBootTimestamp(),
+            stats.getBrokerVersion()), stats.getPutMessageAverageSize());
         brokerRuntimePutMessageSizeTotal.put(new BrokerRuntimeMetric(
-                clusterName, brokerAddress, brokerHost,
-                stats.getBrokerVersionDesc(),
-                stats.getBootTimestamp(),
-                stats.getBrokerVersion()), stats.getPutMessageSizeTotal());
+            clusterName, brokerAddress, brokerHost,
+            stats.getBrokerVersionDesc(),
+            stats.getBootTimestamp(),
+            stats.getBrokerVersion()), stats.getPutMessageSizeTotal());
         brokerRuntimeDispatchBehindBytes.put(new BrokerRuntimeMetric(
-                clusterName, brokerAddress, brokerHost,
-                stats.getBrokerVersionDesc(),
-                stats.getBootTimestamp(),
-                stats.getBrokerVersion()), stats.getDispatchBehindBytes());
+            clusterName, brokerAddress, brokerHost,
+            stats.getBrokerVersionDesc(),
+            stats.getBootTimestamp(),
+            stats.getBrokerVersion()), stats.getDispatchBehindBytes());
     }
 
     private void addAllKindOfTps(String clusterName, String brokerAddress, String brokerHost, BrokerRuntimeStats stats) {
         brokerRuntimePutTps10.put(new BrokerRuntimeMetric(
-                clusterName,
-                brokerAddress, brokerHost,
-                stats.getBrokerVersionDesc(),
-                stats.getBootTimestamp(),
-                stats.getBrokerVersion()), stats.getPutTps().getTen());
+            clusterName,
+            brokerAddress, brokerHost,
+            stats.getBrokerVersionDesc(),
+            stats.getBootTimestamp(),
+            stats.getBrokerVersion()), stats.getPutTps().getTen());
         brokerRuntimePutTps60.put(new BrokerRuntimeMetric(
-                clusterName,
-                brokerAddress, brokerHost,
-                stats.getBrokerVersionDesc(),
-                stats.getBootTimestamp(),
-                stats.getBrokerVersion()), stats.getPutTps().getSixty());
+            clusterName,
+            brokerAddress, brokerHost,
+            stats.getBrokerVersionDesc(),
+            stats.getBootTimestamp(),
+            stats.getBrokerVersion()), stats.getPutTps().getSixty());
         brokerRuntimePutTps600.put(new BrokerRuntimeMetric(
-                clusterName,
-                brokerAddress, brokerHost,
-                stats.getBrokerVersionDesc(),
-                stats.getBootTimestamp(),
-                stats.getBrokerVersion()), stats.getPutTps().getSixHundred());
+            clusterName,
+            brokerAddress, brokerHost,
+            stats.getBrokerVersionDesc(),
+            stats.getBootTimestamp(),
+            stats.getBrokerVersion()), stats.getPutTps().getSixHundred());
         brokerRuntimeGetMissTps10.put(new BrokerRuntimeMetric(
-                clusterName,
-                brokerAddress, brokerHost,
-                stats.getBrokerVersionDesc(),
-                stats.getBootTimestamp(),
-                stats.getBrokerVersion()), stats.getGetMissTps().getTen());
+            clusterName,
+            brokerAddress, brokerHost,
+            stats.getBrokerVersionDesc(),
+            stats.getBootTimestamp(),
+            stats.getBrokerVersion()), stats.getGetMissTps().getTen());
         brokerRuntimeGetMissTps60.put(new BrokerRuntimeMetric(
-                clusterName,
-                brokerAddress, brokerHost,
-                stats.getBrokerVersionDesc(),
-                stats.getBootTimestamp(),
-                stats.getBrokerVersion()), stats.getGetMissTps().getSixty());
+            clusterName,
+            brokerAddress, brokerHost,
+            stats.getBrokerVersionDesc(),
+            stats.getBootTimestamp(),
+            stats.getBrokerVersion()), stats.getGetMissTps().getSixty());
         brokerRuntimeGetMissTps600.put(new BrokerRuntimeMetric(
-                clusterName,
-                brokerAddress, brokerHost,
-                stats.getBrokerVersionDesc(),
-                stats.getBootTimestamp(),
-                stats.getBrokerVersion()), stats.getGetMissTps().getSixHundred());
+            clusterName,
+            brokerAddress, brokerHost,
+            stats.getBrokerVersionDesc(),
+            stats.getBootTimestamp(),
+            stats.getBrokerVersion()), stats.getGetMissTps().getSixHundred());
         brokerRuntimeGetTransferedTps10.put(new BrokerRuntimeMetric(
-                clusterName,
-                brokerAddress, brokerHost,
-                stats.getBrokerVersionDesc(),
-                stats.getBootTimestamp(),
-                stats.getBrokerVersion()), stats.getGetTransferedTps().getTen());
+            clusterName,
+            brokerAddress, brokerHost,
+            stats.getBrokerVersionDesc(),
+            stats.getBootTimestamp(),
+            stats.getBrokerVersion()), stats.getGetTransferedTps().getTen());
         brokerRuntimeGetTransferedTps60.put(new BrokerRuntimeMetric(
-                clusterName,
-                brokerAddress, brokerHost,
-                stats.getBrokerVersionDesc(),
-                stats.getBootTimestamp(),
-                stats.getBrokerVersion()), stats.getGetTransferedTps().getSixty());
+            clusterName,
+            brokerAddress, brokerHost,
+            stats.getBrokerVersionDesc(),
+            stats.getBootTimestamp(),
+            stats.getBrokerVersion()), stats.getGetTransferedTps().getSixty());
         brokerRuntimeGetTransferedTps600.put(new BrokerRuntimeMetric(
-                clusterName,
-                brokerAddress, brokerHost,
-                stats.getBrokerVersionDesc(),
-                stats.getBootTimestamp(),
-                stats.getBrokerVersion()), stats.getGetTransferedTps().getSixHundred());
+            clusterName,
+            brokerAddress, brokerHost,
+            stats.getBrokerVersionDesc(),
+            stats.getBootTimestamp(),
+            stats.getBrokerVersion()), stats.getGetTransferedTps().getSixHundred());
         brokerRuntimeGetTotalTps10.put(new BrokerRuntimeMetric(
-                clusterName,
-                brokerAddress, brokerHost,
-                stats.getBrokerVersionDesc(),
-                stats.getBootTimestamp(),
-                stats.getBrokerVersion()), stats.getGetTotalTps().getTen());
+            clusterName,
+            brokerAddress, brokerHost,
+            stats.getBrokerVersionDesc(),
+            stats.getBootTimestamp(),
+            stats.getBrokerVersion()), stats.getGetTotalTps().getTen());
         brokerRuntimeGetTotalTps60.put(new BrokerRuntimeMetric(
-                clusterName,
-                brokerAddress, brokerHost,
-                stats.getBrokerVersionDesc(),
-                stats.getBootTimestamp(),
-                stats.getBrokerVersion()), stats.getGetTotalTps().getSixty());
+            clusterName,
+            brokerAddress, brokerHost,
+            stats.getBrokerVersionDesc(),
+            stats.getBootTimestamp(),
+            stats.getBrokerVersion()), stats.getGetTotalTps().getSixty());
         brokerRuntimeGetTotalTps600.put(new BrokerRuntimeMetric(
-                clusterName,
-                brokerAddress, brokerHost,
-                stats.getBrokerVersionDesc(),
-                stats.getBootTimestamp(),
-                stats.getBrokerVersion()), stats.getGetTotalTps().getSixHundred());
+            clusterName,
+            brokerAddress, brokerHost,
+            stats.getBrokerVersionDesc(),
+            stats.getBootTimestamp(),
+            stats.getBrokerVersion()), stats.getGetTotalTps().getSixHundred());
         brokerRuntimeGetFoundTps10.put(new BrokerRuntimeMetric(
-                clusterName,
-                brokerAddress, brokerHost,
-                stats.getBrokerVersionDesc(),
-                stats.getBootTimestamp(),
-                stats.getBrokerVersion()), stats.getGetFoundTps().getTen());
+            clusterName,
+            brokerAddress, brokerHost,
+            stats.getBrokerVersionDesc(),
+            stats.getBootTimestamp(),
+            stats.getBrokerVersion()), stats.getGetFoundTps().getTen());
         brokerRuntimeGetFoundTps60.put(new BrokerRuntimeMetric(
-                clusterName,
-                brokerAddress, brokerHost,
-                stats.getBrokerVersionDesc(),
-                stats.getBootTimestamp(),
-                stats.getBrokerVersion()), stats.getGetFoundTps().getSixty());
+            clusterName,
+            brokerAddress, brokerHost,
+            stats.getBrokerVersionDesc(),
+            stats.getBootTimestamp(),
+            stats.getBrokerVersion()), stats.getGetFoundTps().getSixty());
         brokerRuntimeGetFoundTps600.put(new BrokerRuntimeMetric(
-                clusterName,
-                brokerAddress, brokerHost,
-                stats.getBrokerVersionDesc(),
-                stats.getBootTimestamp(),
-                stats.getBrokerVersion()), stats.getGetFoundTps().getSixHundred());
+            clusterName,
+            brokerAddress, brokerHost,
+            stats.getBrokerVersionDesc(),
+            stats.getBootTimestamp(),
+            stats.getBrokerVersion()), stats.getGetFoundTps().getSixHundred());
         brokerRuntimeGetFoundTps600.put(new BrokerRuntimeMetric(
-                clusterName,
-                brokerAddress, brokerHost,
-                stats.getBrokerVersionDesc(),
-                stats.getBootTimestamp(),
-                stats.getBrokerVersion()), stats.getGetFoundTps().getSixHundred());
+            clusterName,
+            brokerAddress, brokerHost,
+            stats.getBrokerVersionDesc(),
+            stats.getBootTimestamp(),
+            stats.getBrokerVersion()), stats.getGetFoundTps().getSixHundred());
     }
 
     private void addCommitLogDirCapacity(String clusterName, String brokerAddress, String brokerHost, BrokerRuntimeStats stats) {
         brokerRuntimeCommitLogDirCapacityTotal.put(new BrokerRuntimeMetric(
-                clusterName,
-                brokerAddress, brokerHost,
-                stats.getBrokerVersionDesc(),
-                stats.getBootTimestamp(),
-                stats.getBrokerVersion()), stats.getCommitLogDirCapacityTotal());
+            clusterName,
+            brokerAddress, brokerHost,
+            stats.getBrokerVersionDesc(),
+            stats.getBootTimestamp(),
+            stats.getBrokerVersion()), stats.getCommitLogDirCapacityTotal());
         brokerRuntimeCommitLogDirCapacityFree.put(new BrokerRuntimeMetric(
-                clusterName,
-                brokerAddress, brokerHost,
-                stats.getBrokerVersionDesc(),
-                stats.getBootTimestamp(),
-                stats.getBrokerVersion()), stats.getCommitLogDirCapacityFree());
+            clusterName,
+            brokerAddress, brokerHost,
+            stats.getBrokerVersionDesc(),
+            stats.getBootTimestamp(),
+            stats.getBrokerVersion()), stats.getCommitLogDirCapacityFree());
     }
 
     private void addBrokerRuntimePutMessageDistributeTimeMap(
-            String clusterName, String brokerAddress, String brokerHost,
-            String brokerDes, long bootTimestamp, int brokerVersion,
-            BrokerRuntimeStats stats) {
+        String clusterName, String brokerAddress, String brokerHost,
+        String brokerDes, long bootTimestamp, int brokerVersion,
+        BrokerRuntimeStats stats) {
         brokerRuntimePutMessageDistributeTimeMap0ms.put(new BrokerRuntimeMetric(
-                clusterName,
-                brokerAddress, brokerHost,
-                brokerDes,
-                bootTimestamp,
-                brokerVersion), stats.getPutMessageDistributeTimeMap().get("<=0ms"));
+            clusterName,
+            brokerAddress, brokerHost,
+            brokerDes,
+            bootTimestamp,
+            brokerVersion), stats.getPutMessageDistributeTimeMap().get("<=0ms"));
         brokerRuntimePutMessageDistributeTimeMap0to10ms.put(new BrokerRuntimeMetric(
-                clusterName,
-                brokerAddress, brokerHost,
-                brokerDes,
-                bootTimestamp,
-                brokerVersion), stats.getPutMessageDistributeTimeMap().get("0~10ms"));
+            clusterName,
+            brokerAddress, brokerHost,
+            brokerDes,
+            bootTimestamp,
+            brokerVersion), stats.getPutMessageDistributeTimeMap().get("0~10ms"));
         brokerRuntimePutMessageDistributeTimeMap10to50ms.put(new BrokerRuntimeMetric(
-                clusterName,
-                brokerAddress, brokerHost,
-                brokerDes,
-                bootTimestamp,
-                brokerVersion), stats.getPutMessageDistributeTimeMap().get("10~50ms"));
+            clusterName,
+            brokerAddress, brokerHost,
+            brokerDes,
+            bootTimestamp,
+            brokerVersion), stats.getPutMessageDistributeTimeMap().get("10~50ms"));
         brokerRuntimePutMessageDistributeTimeMap50to100ms.put(new BrokerRuntimeMetric(
-                clusterName,
-                brokerAddress, brokerHost,
-                brokerDes,
-                bootTimestamp,
-                brokerVersion), stats.getPutMessageDistributeTimeMap().get("50~100ms"));
+            clusterName,
+            brokerAddress, brokerHost,
+            brokerDes,
+            bootTimestamp,
+            brokerVersion), stats.getPutMessageDistributeTimeMap().get("50~100ms"));
         brokerRuntimePutMessageDistributeTimeMap100to200ms.put(new BrokerRuntimeMetric(
-                clusterName,
-                brokerAddress, brokerHost,
-                brokerDes,
-                bootTimestamp,
-                brokerVersion), stats.getPutMessageDistributeTimeMap().get("100~200ms"));
+            clusterName,
+            brokerAddress, brokerHost,
+            brokerDes,
+            bootTimestamp,
+            brokerVersion), stats.getPutMessageDistributeTimeMap().get("100~200ms"));
         brokerRuntimePutMessageDistributeTimeMap200to500ms.put(new BrokerRuntimeMetric(
-                clusterName,
-                brokerAddress, brokerHost,
-                brokerDes,
-                bootTimestamp,
-                brokerVersion), stats.getPutMessageDistributeTimeMap().get("200~500ms"));
+            clusterName,
+            brokerAddress, brokerHost,
+            brokerDes,
+            bootTimestamp,
+            brokerVersion), stats.getPutMessageDistributeTimeMap().get("200~500ms"));
         brokerRuntimePutMessageDistributeTimeMap500to1s.put(new BrokerRuntimeMetric(
-                clusterName,
-                brokerAddress, brokerHost,
-                brokerDes,
-                bootTimestamp,
-                brokerVersion), stats.getPutMessageDistributeTimeMap().get("500ms~1s"));
+            clusterName,
+            brokerAddress, brokerHost,
+            brokerDes,
+            bootTimestamp,
+            brokerVersion), stats.getPutMessageDistributeTimeMap().get("500ms~1s"));
         brokerRuntimePutMessageDistributeTimeMap1to2s.put(new BrokerRuntimeMetric(
-                clusterName,
-                brokerAddress, brokerHost,
-                brokerDes,
-                bootTimestamp,
-                brokerVersion), stats.getPutMessageDistributeTimeMap().get("1~2s"));
+            clusterName,
+            brokerAddress, brokerHost,
+            brokerDes,
+            bootTimestamp,
+            brokerVersion), stats.getPutMessageDistributeTimeMap().get("1~2s"));
         brokerRuntimePutMessageDistributeTimeMap2to3s.put(new BrokerRuntimeMetric(
-                clusterName,
-                brokerAddress, brokerHost,
-                brokerDes,
-                bootTimestamp,
-                brokerVersion), stats.getPutMessageDistributeTimeMap().get("2~3s"));
+            clusterName,
+            brokerAddress, brokerHost,
+            brokerDes,
+            bootTimestamp,
+            brokerVersion), stats.getPutMessageDistributeTimeMap().get("2~3s"));
         brokerRuntimePutMessageDistributeTimeMap3to4s.put(new BrokerRuntimeMetric(
-                clusterName,
-                brokerAddress, brokerHost,
-                brokerDes,
-                bootTimestamp,
-                brokerVersion), stats.getPutMessageDistributeTimeMap().get("3~4s"));
+            clusterName,
+            brokerAddress, brokerHost,
+            brokerDes,
+            bootTimestamp,
+            brokerVersion), stats.getPutMessageDistributeTimeMap().get("3~4s"));
         brokerRuntimePutMessageDistributeTimeMap4to5s.put(new BrokerRuntimeMetric(
-                clusterName,
-                brokerAddress, brokerHost,
-                brokerDes,
-                bootTimestamp,
-                brokerVersion), stats.getPutMessageDistributeTimeMap().get("4~5s"));
+            clusterName,
+            brokerAddress, brokerHost,
+            brokerDes,
+            bootTimestamp,
+            brokerVersion), stats.getPutMessageDistributeTimeMap().get("4~5s"));
         brokerRuntimePutMessageDistributeTimeMap5to10s.put(new BrokerRuntimeMetric(
-                clusterName,
-                brokerAddress, brokerHost,
-                brokerDes,
-                bootTimestamp,
-                brokerVersion), stats.getPutMessageDistributeTimeMap().get("5~10s"));
+            clusterName,
+            brokerAddress, brokerHost,
+            brokerDes,
+            bootTimestamp,
+            brokerVersion), stats.getPutMessageDistributeTimeMap().get("5~10s"));
         brokerRuntimePutMessageDistributeTimeMap10toMore.put(new BrokerRuntimeMetric(
-                clusterName,
-                brokerAddress, brokerHost,
-                brokerDes,
-                bootTimestamp,
-                brokerVersion), stats.getPutMessageDistributeTimeMap().get("10s~"));
+            clusterName,
+            brokerAddress, brokerHost,
+            brokerDes,
+            bootTimestamp,
+            brokerVersion), stats.getPutMessageDistributeTimeMap().get("10s~"));
     }
 
     private static <T extends Number> void loadBrokerRuntimeStatsMetric(GaugeMetricFamily family, Map.Entry<BrokerRuntimeMetric, T> entry) {
         family.addMetric(Arrays.asList(
-                entry.getKey().getClusterName(),
-                entry.getKey().getBrokerAddress(),
-                entry.getKey().getBrokerHost(),
-                entry.getKey().getBrokerDes(),
-                String.valueOf(entry.getKey().getBootTimestamp()),
-                String.valueOf(entry.getKey().getBrokerVersion())
+            entry.getKey().getClusterName(),
+            entry.getKey().getBrokerAddress(),
+            entry.getKey().getBrokerHost(),
+            entry.getKey().getBrokerDes(),
+            String.valueOf(entry.getKey().getBootTimestamp()),
+            String.valueOf(entry.getKey().getBrokerVersion())
         ), entry.getValue().doubleValue());
     }
 
@@ -1035,291 +1113,291 @@ public class RMQMetricsCollector extends Collector {
         collectBrokerRuntimeStatsPutMessageDistributeTime(mfs);
 
         GaugeMetricFamily brokerRuntimeMsgPutTotalTodayNowF = new GaugeMetricFamily("rocketmq_brokeruntime_msg_put_total_today_now", "brokerRuntimeMsgPutTotalTodayNow", BROKER_RUNTIME_METRIC_LABEL_NAMES);
-        for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimeMsgPutTotalTodayNow.entrySet()) {
+        for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimeMsgPutTotalTodayNow.asMap().entrySet()) {
             loadBrokerRuntimeStatsMetric(brokerRuntimeMsgPutTotalTodayNowF, entry);
         }
         mfs.add(brokerRuntimeMsgPutTotalTodayNowF);
 
         GaugeMetricFamily brokerRuntimeMsgGetTotalTodayNowF = new GaugeMetricFamily("rocketmq_brokeruntime_msg_gettotal_today_now", "brokerRuntimeMsgGetTotalTodayNow", BROKER_RUNTIME_METRIC_LABEL_NAMES);
-        for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimeMsgGetTotalTodayNow.entrySet()) {
+        for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimeMsgGetTotalTodayNow.asMap().entrySet()) {
             loadBrokerRuntimeStatsMetric(brokerRuntimeMsgGetTotalTodayNowF, entry);
         }
         mfs.add(brokerRuntimeMsgGetTotalTodayNowF);
 
         GaugeMetricFamily brokerRuntimeDispatchBehindBytesF = new GaugeMetricFamily("rocketmq_brokeruntime_dispatch_behind_bytes", "brokerRuntimeDispatchBehindBytes", BROKER_RUNTIME_METRIC_LABEL_NAMES);
-        for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimeDispatchBehindBytes.entrySet()) {
+        for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimeDispatchBehindBytes.asMap().entrySet()) {
             loadBrokerRuntimeStatsMetric(brokerRuntimeDispatchBehindBytesF, entry);
         }
         mfs.add(brokerRuntimeDispatchBehindBytesF);
 
         GaugeMetricFamily brokerRuntimePutMessageSizeTotalF = new GaugeMetricFamily("rocketmq_brokeruntime_put_message_size_total", "brokerRuntimePutMessageSizeTotal", BROKER_RUNTIME_METRIC_LABEL_NAMES);
-        for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimePutMessageSizeTotal.entrySet()) {
+        for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimePutMessageSizeTotal.asMap().entrySet()) {
             loadBrokerRuntimeStatsMetric(brokerRuntimePutMessageSizeTotalF, entry);
         }
         mfs.add(brokerRuntimePutMessageSizeTotalF);
 
         GaugeMetricFamily brokerRuntimePutMessageAverageSizeF = new GaugeMetricFamily("rocketmq_brokeruntime_put_message_average_size", "brokerRuntimePutMessageAverageSize", BROKER_RUNTIME_METRIC_LABEL_NAMES);
-        for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimePutMessageAverageSize.entrySet()) {
+        for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimePutMessageAverageSize.asMap().entrySet()) {
             loadBrokerRuntimeStatsMetric(brokerRuntimePutMessageAverageSizeF, entry);
         }
         mfs.add(brokerRuntimePutMessageAverageSizeF);
 
         GaugeMetricFamily brokerRuntimeQueryThreadPoolQueueCapacityF = new GaugeMetricFamily("rocketmq_brokeruntime_query_threadpool_queue_capacity", "brokerRuntimeQueryThreadPoolQueueCapacity", BROKER_RUNTIME_METRIC_LABEL_NAMES);
-        for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimeQueryThreadPoolQueueCapacity.entrySet()) {
+        for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimeQueryThreadPoolQueueCapacity.asMap().entrySet()) {
             loadBrokerRuntimeStatsMetric(brokerRuntimeQueryThreadPoolQueueCapacityF, entry);
         }
         mfs.add(brokerRuntimeQueryThreadPoolQueueCapacityF);
 
         GaugeMetricFamily brokerRuntimeRemainTransientStoreBufferNumbsF = new GaugeMetricFamily("rocketmq_brokeruntime_remain_transientstore_buffer_numbs", "brokerRuntimeRemainTransientStoreBufferNumbs", BROKER_RUNTIME_METRIC_LABEL_NAMES);
-        for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimeRemainTransientStoreBufferNumbs.entrySet()) {
+        for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimeRemainTransientStoreBufferNumbs.asMap().entrySet()) {
             loadBrokerRuntimeStatsMetric(brokerRuntimeRemainTransientStoreBufferNumbsF, entry);
         }
         mfs.add(brokerRuntimeRemainTransientStoreBufferNumbsF);
 
         GaugeMetricFamily brokerRuntimeEarliestMessageTimeStampF = new GaugeMetricFamily("rocketmq_brokeruntime_earliest_message_timestamp", "brokerRuntimeEarliestMessageTimeStamp", BROKER_RUNTIME_METRIC_LABEL_NAMES);
-        for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimeEarliestMessageTimeStamp.entrySet()) {
+        for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimeEarliestMessageTimeStamp.asMap().entrySet()) {
             loadBrokerRuntimeStatsMetric(brokerRuntimeEarliestMessageTimeStampF, entry);
         }
         mfs.add(brokerRuntimeEarliestMessageTimeStampF);
 
         GaugeMetricFamily brokerRuntimePutMessageEntireTimeMaxF = new GaugeMetricFamily("rocketmq_brokeruntime_putmessage_entire_time_max", "brokerRuntimePutMessageEntireTimeMax", BROKER_RUNTIME_METRIC_LABEL_NAMES);
-        for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimePutMessageEntireTimeMax.entrySet()) {
+        for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimePutMessageEntireTimeMax.asMap().entrySet()) {
             loadBrokerRuntimeStatsMetric(brokerRuntimePutMessageEntireTimeMaxF, entry);
         }
         mfs.add(brokerRuntimePutMessageEntireTimeMaxF);
 
         GaugeMetricFamily brokerRuntimeStartAcceptSendRequestTimeStampF = new GaugeMetricFamily("rocketmq_brokeruntime_start_accept_sendrequest_time", "brokerRuntimeStartAcceptSendRequestTimeStamp", BROKER_RUNTIME_METRIC_LABEL_NAMES);
-        for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimeStartAcceptSendRequestTimeStamp.entrySet()) {
+        for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimeStartAcceptSendRequestTimeStamp.asMap().entrySet()) {
             loadBrokerRuntimeStatsMetric(brokerRuntimeStartAcceptSendRequestTimeStampF, entry);
         }
         mfs.add(brokerRuntimeStartAcceptSendRequestTimeStampF);
 
         GaugeMetricFamily brokerRuntimeSendThreadPoolQueueSizeF = new GaugeMetricFamily("rocketmq_brokeruntime_send_threadpool_queue_size", "brokerRuntimeSendThreadPoolQueueSize", BROKER_RUNTIME_METRIC_LABEL_NAMES);
-        for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimeSendThreadPoolQueueSize.entrySet()) {
+        for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimeSendThreadPoolQueueSize.asMap().entrySet()) {
             loadBrokerRuntimeStatsMetric(brokerRuntimeSendThreadPoolQueueSizeF, entry);
         }
         mfs.add(brokerRuntimeSendThreadPoolQueueSizeF);
 
         GaugeMetricFamily brokerRuntimePutMessageTimesTotalF = new GaugeMetricFamily("rocketmq_brokeruntime_putmessage_times_total", "brokerRuntimePutMessageTimesTotal", BROKER_RUNTIME_METRIC_LABEL_NAMES);
-        for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimePutMessageTimesTotal.entrySet()) {
+        for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimePutMessageTimesTotal.asMap().entrySet()) {
             loadBrokerRuntimeStatsMetric(brokerRuntimePutMessageTimesTotalF, entry);
         }
         mfs.add(brokerRuntimePutMessageTimesTotalF);
 
         GaugeMetricFamily brokerRuntimeGetMessageEntireTimeMaxF = new GaugeMetricFamily("rocketmq_brokeruntime_getmessage_entire_time_max", "brokerRuntimeGetMessageEntireTimeMax", BROKER_RUNTIME_METRIC_LABEL_NAMES);
-        for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimeGetMessageEntireTimeMax.entrySet()) {
+        for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimeGetMessageEntireTimeMax.asMap().entrySet()) {
             loadBrokerRuntimeStatsMetric(brokerRuntimeGetMessageEntireTimeMaxF, entry);
         }
         mfs.add(brokerRuntimeGetMessageEntireTimeMaxF);
 
         GaugeMetricFamily brokerRuntimePageCacheLockTimeMillsF = new GaugeMetricFamily("rocketmq_brokeruntime_pagecache_lock_time_mills", "brokerRuntimePageCacheLockTimeMills", BROKER_RUNTIME_METRIC_LABEL_NAMES);
-        for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimePageCacheLockTimeMills.entrySet()) {
+        for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimePageCacheLockTimeMills.asMap().entrySet()) {
             loadBrokerRuntimeStatsMetric(brokerRuntimePageCacheLockTimeMillsF, entry);
         }
         mfs.add(brokerRuntimePageCacheLockTimeMillsF);
 
         GaugeMetricFamily brokerRuntimeCommitLogDiskRatioF = new GaugeMetricFamily("rocketmq_brokeruntime_commitlog_disk_ratio", "brokerRuntimeCommitLogDiskRatio", BROKER_RUNTIME_METRIC_LABEL_NAMES);
-        for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimeCommitLogDiskRatio.entrySet()) {
+        for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimeCommitLogDiskRatio.asMap().entrySet()) {
             loadBrokerRuntimeStatsMetric(brokerRuntimeCommitLogDiskRatioF, entry);
         }
         mfs.add(brokerRuntimeCommitLogDiskRatioF);
 
         GaugeMetricFamily brokerRuntimeConsumeQueueDiskRatioF = new GaugeMetricFamily("rocketmq_brokeruntime_consumequeue_disk_ratio", "brokerRuntimeConsumeQueueDiskRatio", BROKER_RUNTIME_METRIC_LABEL_NAMES);
-        for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimeConsumeQueueDiskRatio.entrySet()) {
+        for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimeConsumeQueueDiskRatio.asMap().entrySet()) {
             loadBrokerRuntimeStatsMetric(brokerRuntimeConsumeQueueDiskRatioF, entry);
         }
         mfs.add(brokerRuntimeConsumeQueueDiskRatioF);
 
         GaugeMetricFamily brokerRuntimeGetFoundTps600F = new GaugeMetricFamily("rocketmq_brokeruntime_getfound_tps600", "brokerRuntimeGetFoundTps600", BROKER_RUNTIME_METRIC_LABEL_NAMES);
-        for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimeGetFoundTps600.entrySet()) {
+        for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimeGetFoundTps600.asMap().entrySet()) {
             loadBrokerRuntimeStatsMetric(brokerRuntimeGetFoundTps600F, entry);
         }
         mfs.add(brokerRuntimeGetFoundTps600F);
 
         GaugeMetricFamily brokerRuntimeGetFoundTps60F = new GaugeMetricFamily("rocketmq_brokeruntime_getfound_tps60", "brokerRuntimeGetFoundTps60", BROKER_RUNTIME_METRIC_LABEL_NAMES);
-        for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimeGetFoundTps60.entrySet()) {
+        for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimeGetFoundTps60.asMap().entrySet()) {
             loadBrokerRuntimeStatsMetric(brokerRuntimeGetFoundTps60F, entry);
         }
         mfs.add(brokerRuntimeGetFoundTps60F);
 
         GaugeMetricFamily brokerRuntimeGetFoundTps10F = new GaugeMetricFamily("rocketmq_brokeruntime_getfound_tps10", "brokerRuntimeGetFoundTps10", BROKER_RUNTIME_METRIC_LABEL_NAMES);
-        for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimeGetFoundTps10.entrySet()) {
+        for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimeGetFoundTps10.asMap().entrySet()) {
             loadBrokerRuntimeStatsMetric(brokerRuntimeGetFoundTps10F, entry);
         }
         mfs.add(brokerRuntimeGetFoundTps10F);
 
         GaugeMetricFamily brokerRuntimeGetTotalTps600F = new GaugeMetricFamily("rocketmq_brokeruntime_gettotal_tps600", "brokerRuntimeGetTotalTps600", BROKER_RUNTIME_METRIC_LABEL_NAMES);
-        for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimeGetTotalTps600.entrySet()) {
+        for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimeGetTotalTps600.asMap().entrySet()) {
             loadBrokerRuntimeStatsMetric(brokerRuntimeGetTotalTps600F, entry);
         }
         mfs.add(brokerRuntimeGetTotalTps600F);
 
         GaugeMetricFamily brokerRuntimeGetTotalTps60F = new GaugeMetricFamily("rocketmq_brokeruntime_gettotal_tps60", "brokerRuntimeGetTotalTps60", BROKER_RUNTIME_METRIC_LABEL_NAMES);
-        for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimeGetTotalTps60.entrySet()) {
+        for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimeGetTotalTps60.asMap().entrySet()) {
             loadBrokerRuntimeStatsMetric(brokerRuntimeGetTotalTps60F, entry);
         }
         mfs.add(brokerRuntimeGetTotalTps60F);
 
         GaugeMetricFamily brokerRuntimeGetTotalTps10F = new GaugeMetricFamily("rocketmq_brokeruntime_gettotal_tps10", "brokerRuntimeGetTotalTps10", BROKER_RUNTIME_METRIC_LABEL_NAMES);
-        for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimeGetTotalTps10.entrySet()) {
+        for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimeGetTotalTps10.asMap().entrySet()) {
             loadBrokerRuntimeStatsMetric(brokerRuntimeGetTotalTps10F, entry);
         }
         mfs.add(brokerRuntimeGetTotalTps10F);
 
         GaugeMetricFamily brokerRuntimeGetTransferedTps600F = new GaugeMetricFamily("rocketmq_brokeruntime_gettransfered_tps600", "brokerRuntimeGetTransferedTps600", BROKER_RUNTIME_METRIC_LABEL_NAMES);
-        for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimeGetTransferedTps600.entrySet()) {
+        for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimeGetTransferedTps600.asMap().entrySet()) {
             loadBrokerRuntimeStatsMetric(brokerRuntimeGetTransferedTps600F, entry);
         }
         mfs.add(brokerRuntimeGetTransferedTps600F);
 
         GaugeMetricFamily brokerRuntimeGetTransferedTps60F = new GaugeMetricFamily("rocketmq_brokeruntime_gettransfered_tps60", "brokerRuntimeGetTransferedTps60", BROKER_RUNTIME_METRIC_LABEL_NAMES);
-        for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimeGetTransferedTps60.entrySet()) {
+        for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimeGetTransferedTps60.asMap().entrySet()) {
             loadBrokerRuntimeStatsMetric(brokerRuntimeGetTransferedTps60F, entry);
         }
         mfs.add(brokerRuntimeGetTransferedTps60F);
 
         GaugeMetricFamily brokerRuntimeGetTransferedTps10F = new GaugeMetricFamily("rocketmq_brokeruntime_gettransfered_tps10", "brokerRuntimeGetTransferedTps10", BROKER_RUNTIME_METRIC_LABEL_NAMES);
-        for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimeGetTransferedTps10.entrySet()) {
+        for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimeGetTransferedTps10.asMap().entrySet()) {
             loadBrokerRuntimeStatsMetric(brokerRuntimeGetTransferedTps10F, entry);
         }
         mfs.add(brokerRuntimeGetTransferedTps10F);
 
         GaugeMetricFamily brokerRuntimeGetMissTps600F = new GaugeMetricFamily("rocketmq_brokeruntime_getmiss_tps600", "brokerRuntimeGetMissTps600", BROKER_RUNTIME_METRIC_LABEL_NAMES);
-        for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimeGetMissTps600.entrySet()) {
+        for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimeGetMissTps600.asMap().entrySet()) {
             loadBrokerRuntimeStatsMetric(brokerRuntimeGetMissTps600F, entry);
         }
         mfs.add(brokerRuntimeGetMissTps600F);
 
         GaugeMetricFamily brokerRuntimeGetMissTps60F = new GaugeMetricFamily("rocketmq_brokeruntime_getmiss_tps60", "brokerRuntimeGetMissTps60", BROKER_RUNTIME_METRIC_LABEL_NAMES);
-        for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimeGetMissTps60.entrySet()) {
+        for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimeGetMissTps60.asMap().entrySet()) {
             loadBrokerRuntimeStatsMetric(brokerRuntimeGetMissTps60F, entry);
         }
         mfs.add(brokerRuntimeGetMissTps60F);
 
         GaugeMetricFamily brokerRuntimeGetMissTps10F = new GaugeMetricFamily("rocketmq_brokeruntime_getmiss_tps10", "brokerRuntimeGetMissTps10", BROKER_RUNTIME_METRIC_LABEL_NAMES);
-        for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimeGetMissTps10.entrySet()) {
+        for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimeGetMissTps10.asMap().entrySet()) {
             loadBrokerRuntimeStatsMetric(brokerRuntimeGetMissTps10F, entry);
         }
         mfs.add(brokerRuntimeGetMissTps10F);
 
         GaugeMetricFamily brokerRuntimePutTps600F = new GaugeMetricFamily("rocketmq_brokeruntime_put_tps600", "brokerRuntimePutTps600", BROKER_RUNTIME_METRIC_LABEL_NAMES);
-        for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimePutTps600.entrySet()) {
+        for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimePutTps600.asMap().entrySet()) {
             loadBrokerRuntimeStatsMetric(brokerRuntimePutTps600F, entry);
         }
         mfs.add(brokerRuntimePutTps600F);
 
         GaugeMetricFamily brokerRuntimePutTps60F = new GaugeMetricFamily("rocketmq_brokeruntime_put_tps60", "brokerRuntimePutTps60", BROKER_RUNTIME_METRIC_LABEL_NAMES);
-        for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimePutTps60.entrySet()) {
+        for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimePutTps60.asMap().entrySet()) {
             loadBrokerRuntimeStatsMetric(brokerRuntimePutTps60F, entry);
         }
         mfs.add(brokerRuntimePutTps60F);
 
         GaugeMetricFamily brokerRuntimePutTps10F = new GaugeMetricFamily("rocketmq_brokeruntime_put_tps10", "brokerRuntimePutTps10", BROKER_RUNTIME_METRIC_LABEL_NAMES);
-        for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimePutTps10.entrySet()) {
+        for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimePutTps10.asMap().entrySet()) {
             loadBrokerRuntimeStatsMetric(brokerRuntimePutTps10F, entry);
         }
         mfs.add(brokerRuntimePutTps10F);
 
         GaugeMetricFamily brokerRuntimeDispatchMaxBufferF = new GaugeMetricFamily("rocketmq_brokeruntime_dispatch_maxbuffer", "brokerRuntimeDispatchMaxBuffer", BROKER_RUNTIME_METRIC_LABEL_NAMES);
-        for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimeDispatchMaxBuffer.entrySet()) {
+        for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimeDispatchMaxBuffer.asMap().entrySet()) {
             loadBrokerRuntimeStatsMetric(brokerRuntimeDispatchMaxBufferF, entry);
         }
         mfs.add(brokerRuntimeDispatchMaxBufferF);
 
         GaugeMetricFamily brokerRuntimePullThreadPoolQueueCapacityF = new GaugeMetricFamily("rocketmq_brokeruntime_pull_threadpoolqueue_capacity", "brokerRuntimePullThreadPoolQueueCapacity", BROKER_RUNTIME_METRIC_LABEL_NAMES);
-        for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimePullThreadPoolQueueCapacity.entrySet()) {
+        for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimePullThreadPoolQueueCapacity.asMap().entrySet()) {
             loadBrokerRuntimeStatsMetric(brokerRuntimePullThreadPoolQueueCapacityF, entry);
         }
         mfs.add(brokerRuntimePullThreadPoolQueueCapacityF);
 
         GaugeMetricFamily brokerRuntimeSendThreadPoolQueueCapacityF = new GaugeMetricFamily("rocketmq_brokeruntime_send_threadpoolqueue_capacity", "brokerRuntimeSendThreadPoolQueueCapacity", BROKER_RUNTIME_METRIC_LABEL_NAMES);
-        for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimeSendThreadPoolQueueCapacity.entrySet()) {
+        for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimeSendThreadPoolQueueCapacity.asMap().entrySet()) {
             loadBrokerRuntimeStatsMetric(brokerRuntimeSendThreadPoolQueueCapacityF, entry);
         }
         mfs.add(brokerRuntimeSendThreadPoolQueueCapacityF);
 
         GaugeMetricFamily brokerRuntimePullThreadPoolQueueSizeF = new GaugeMetricFamily("rocketmq_brokeruntime_pull_threadpoolqueue_size", "brokerRuntimePullThreadPoolQueueSizeF", BROKER_RUNTIME_METRIC_LABEL_NAMES);
-        for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimePullThreadPoolQueueSize.entrySet()) {
+        for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimePullThreadPoolQueueSize.asMap().entrySet()) {
             loadBrokerRuntimeStatsMetric(brokerRuntimePullThreadPoolQueueSizeF, entry);
         }
         mfs.add(brokerRuntimePullThreadPoolQueueSizeF);
 
         GaugeMetricFamily brokerRuntimeQueryThreadPoolQueueSizeF = new GaugeMetricFamily("rocketmq_brokeruntime_query_threadpoolqueue_size", "brokerRuntimeQueryThreadPoolQueueSize", BROKER_RUNTIME_METRIC_LABEL_NAMES);
-        for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimeQueryThreadPoolQueueSize.entrySet()) {
+        for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimeQueryThreadPoolQueueSize.asMap().entrySet()) {
             loadBrokerRuntimeStatsMetric(brokerRuntimeQueryThreadPoolQueueSizeF, entry);
         }
         mfs.add(brokerRuntimeQueryThreadPoolQueueSizeF);
 
         GaugeMetricFamily brokerRuntimePullThreadPoolQueueHeadWaitTimeMillsF = new GaugeMetricFamily("rocketmq_brokeruntime_pull_threadpoolqueue_headwait_timemills", "brokerRuntimePullThreadPoolQueueHeadWaitTimeMills", BROKER_RUNTIME_METRIC_LABEL_NAMES);
-        for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimePullThreadPoolQueueHeadWaitTimeMills.entrySet()) {
+        for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimePullThreadPoolQueueHeadWaitTimeMills.asMap().entrySet()) {
             loadBrokerRuntimeStatsMetric(brokerRuntimePullThreadPoolQueueHeadWaitTimeMillsF, entry);
         }
         mfs.add(brokerRuntimePullThreadPoolQueueHeadWaitTimeMillsF);
 
         GaugeMetricFamily brokerRuntimeQueryThreadPoolQueueHeadWaitTimeMillsF = new GaugeMetricFamily("rocketmq_brokeruntime_query_threadpoolqueue_headwait_timemills", "brokerRuntimeQueryThreadPoolQueueHeadWaitTimeMills", BROKER_RUNTIME_METRIC_LABEL_NAMES);
-        for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimeQueryThreadPoolQueueHeadWaitTimeMills.entrySet()) {
+        for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimeQueryThreadPoolQueueHeadWaitTimeMills.asMap().entrySet()) {
             loadBrokerRuntimeStatsMetric(brokerRuntimeQueryThreadPoolQueueHeadWaitTimeMillsF, entry);
         }
         mfs.add(brokerRuntimeQueryThreadPoolQueueHeadWaitTimeMillsF);
 
         GaugeMetricFamily brokerRuntimeSendThreadPoolQueueHeadWaitTimeMillsF = new GaugeMetricFamily("rocketmq_brokeruntime_send_threadpoolqueue_headwait_timemills", "brokerRuntimeSendThreadPoolQueueHeadWaitTimeMills", BROKER_RUNTIME_METRIC_LABEL_NAMES);
-        for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimeSendThreadPoolQueueHeadWaitTimeMills.entrySet()) {
+        for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimeSendThreadPoolQueueHeadWaitTimeMills.asMap().entrySet()) {
             loadBrokerRuntimeStatsMetric(brokerRuntimeSendThreadPoolQueueHeadWaitTimeMillsF, entry);
         }
         mfs.add(brokerRuntimeSendThreadPoolQueueHeadWaitTimeMillsF);
 
         GaugeMetricFamily brokerRuntimeMsgGetTotalYesterdayMorningF = new GaugeMetricFamily("rocketmq_brokeruntime_msg_gettotal_yesterdaymorning", "brokerRuntimeMsgGetTotalYesterdayMorning", BROKER_RUNTIME_METRIC_LABEL_NAMES);
-        for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimeMsgGetTotalYesterdayMorning.entrySet()) {
+        for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimeMsgGetTotalYesterdayMorning.asMap().entrySet()) {
             loadBrokerRuntimeStatsMetric(brokerRuntimeMsgGetTotalYesterdayMorningF, entry);
         }
         mfs.add(brokerRuntimeMsgGetTotalYesterdayMorningF);
 
         GaugeMetricFamily brokerRuntimeMsgPutTotalYesterdayMorningF = new GaugeMetricFamily("rocketmq_brokeruntime_msg_puttotal_yesterdaymorning", "brokerRuntimeMsgPutTotalYesterdayMorning", BROKER_RUNTIME_METRIC_LABEL_NAMES);
-        for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimeMsgPutTotalYesterdayMorning.entrySet()) {
+        for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimeMsgPutTotalYesterdayMorning.asMap().entrySet()) {
             loadBrokerRuntimeStatsMetric(brokerRuntimeMsgPutTotalYesterdayMorningF, entry);
         }
         mfs.add(brokerRuntimeMsgPutTotalYesterdayMorningF);
 
         GaugeMetricFamily brokerRuntimeMsgGetTotalTodayMorningF = new GaugeMetricFamily("rocketmq_brokeruntime_msg_gettotal_todaymorning", "brokerRuntimeMsgGetTotalTodayMorning", BROKER_RUNTIME_METRIC_LABEL_NAMES);
-        for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimeMsgGetTotalTodayMorning.entrySet()) {
+        for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimeMsgGetTotalTodayMorning.asMap().entrySet()) {
             loadBrokerRuntimeStatsMetric(brokerRuntimeMsgGetTotalTodayMorningF, entry);
         }
         mfs.add(brokerRuntimeMsgGetTotalTodayMorningF);
 
         GaugeMetricFamily brokerRuntimeMsgPutTotalTodayMorningF = new GaugeMetricFamily("rocketmq_brokeruntime_msg_puttotal_todaymorning", "brokerRuntimeMsgPutTotalTodayMorning", BROKER_RUNTIME_METRIC_LABEL_NAMES);
-        for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimeMsgPutTotalTodayMorning.entrySet()) {
+        for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimeMsgPutTotalTodayMorning.asMap().entrySet()) {
             loadBrokerRuntimeStatsMetric(brokerRuntimeMsgPutTotalTodayMorningF, entry);
         }
         mfs.add(brokerRuntimeMsgPutTotalTodayMorningF);
 
         GaugeMetricFamily brokerRuntimeCommitLogDirCapacityFreeF = new GaugeMetricFamily("rocketmq_brokeruntime_commitlogdir_capacity_free", "brokerRuntimeCommitLogDirCapacityFree", BROKER_RUNTIME_METRIC_LABEL_NAMES);
-        for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimeCommitLogDirCapacityFree.entrySet()) {
+        for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimeCommitLogDirCapacityFree.asMap().entrySet()) {
             loadBrokerRuntimeStatsMetric(brokerRuntimeCommitLogDirCapacityFreeF, entry);
         }
         mfs.add(brokerRuntimeCommitLogDirCapacityFreeF);
 
         GaugeMetricFamily brokerRuntimeCommitLogDirCapacityTotalF = new GaugeMetricFamily("rocketmq_brokeruntime_commitlogdir_capacity_total", "brokerRuntimeCommitLogDirCapacityTotal", BROKER_RUNTIME_METRIC_LABEL_NAMES);
-        for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimeCommitLogDirCapacityTotal.entrySet()) {
+        for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimeCommitLogDirCapacityTotal.asMap().entrySet()) {
             loadBrokerRuntimeStatsMetric(brokerRuntimeCommitLogDirCapacityTotalF, entry);
         }
         mfs.add(brokerRuntimeCommitLogDirCapacityTotalF);
 
         GaugeMetricFamily brokerRuntimeCommitLogMaxOffsetF = new GaugeMetricFamily("rocketmq_brokeruntime_commitlog_maxoffset", "brokerRuntimeCommitLogMaxOffset", BROKER_RUNTIME_METRIC_LABEL_NAMES);
-        for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimeCommitLogMaxOffset.entrySet()) {
+        for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimeCommitLogMaxOffset.asMap().entrySet()) {
             loadBrokerRuntimeStatsMetric(brokerRuntimeCommitLogMaxOffsetF, entry);
         }
         mfs.add(brokerRuntimeCommitLogMaxOffsetF);
 
         GaugeMetricFamily brokerRuntimeCommitLogMinOffsetF = new GaugeMetricFamily("rocketmq_brokeruntime_commitlog_minoffset", "brokerRuntimeCommitLogMinOffset", BROKER_RUNTIME_METRIC_LABEL_NAMES);
-        for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimeCommitLogMinOffset.entrySet()) {
+        for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimeCommitLogMinOffset.asMap().entrySet()) {
             loadBrokerRuntimeStatsMetric(brokerRuntimeCommitLogMinOffsetF, entry);
         }
         mfs.add(brokerRuntimeCommitLogMinOffsetF);
 
         GaugeMetricFamily brokerRuntimeRemainHowManyDataToFlushF = new GaugeMetricFamily("rocketmq_brokeruntime_remain_howmanydata_toflush", "brokerRuntimeRemainHowManyDataToFlush", BROKER_RUNTIME_METRIC_LABEL_NAMES);
-        for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimeRemainHowManyDataToFlush.entrySet()) {
+        for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimeRemainHowManyDataToFlush.asMap().entrySet()) {
             loadBrokerRuntimeStatsMetric(brokerRuntimeRemainHowManyDataToFlushF, entry);
         }
         mfs.add(brokerRuntimeRemainHowManyDataToFlushF);
     }
-}
\ No newline at end of file
+}
diff --git a/src/main/java/org/apache/rocketmq/exporter/config/RMQConfigure.java b/src/main/java/org/apache/rocketmq/exporter/config/RMQConfigure.java
index d42f1a8..3689fc0 100644
--- a/src/main/java/org/apache/rocketmq/exporter/config/RMQConfigure.java
+++ b/src/main/java/org/apache/rocketmq/exporter/config/RMQConfigure.java
@@ -54,6 +54,8 @@ public class RMQConfigure {
 
     private String secretKey;
 
+    private long outOfTimeSeconds;
+
     public boolean enableACL() {
         return this.enableACL;
     }
@@ -133,4 +135,12 @@ public class RMQConfigure {
     public void setSecretKey(String secretKey) {
         this.secretKey = secretKey;
     }
+
+    public long getOutOfTimeSeconds() {
+        return outOfTimeSeconds;
+    }
+
+    public void setOutOfTimeSeconds(long outOfTimeSeconds) {
+        this.outOfTimeSeconds = outOfTimeSeconds;
+    }
 }
diff --git a/src/main/java/org/apache/rocketmq/exporter/service/impl/RMQMetricsServiceImpl.java b/src/main/java/org/apache/rocketmq/exporter/service/impl/RMQMetricsServiceImpl.java
index b38b921..e0088d2 100644
--- a/src/main/java/org/apache/rocketmq/exporter/service/impl/RMQMetricsServiceImpl.java
+++ b/src/main/java/org/apache/rocketmq/exporter/service/impl/RMQMetricsServiceImpl.java
@@ -19,7 +19,9 @@ package org.apache.rocketmq.exporter.service.impl;
 import io.prometheus.client.Collector;
 import io.prometheus.client.CollectorRegistry;
 import org.apache.rocketmq.exporter.collector.RMQMetricsCollector;
+import org.apache.rocketmq.exporter.config.RMQConfigure;
 import org.apache.rocketmq.exporter.service.RMQMetricsService;
+import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
 import java.io.IOException;
@@ -31,6 +33,9 @@ import java.util.Iterator;
 
 @Service
 public class RMQMetricsServiceImpl implements RMQMetricsService {
+    @Autowired
+    private RMQConfigure configure;
+
     private CollectorRegistry registry = new CollectorRegistry();
     private final RMQMetricsCollector rmqMetricsCollector;
 
@@ -38,8 +43,9 @@ public class RMQMetricsServiceImpl implements RMQMetricsService {
         return rmqMetricsCollector;
     }
 
-    public RMQMetricsServiceImpl() {
-        rmqMetricsCollector = new RMQMetricsCollector();
+    public RMQMetricsServiceImpl(RMQConfigure configure) {
+        this.configure = configure;
+        rmqMetricsCollector = new RMQMetricsCollector(configure.getOutOfTimeSeconds());
         rmqMetricsCollector.register(registry);
     }
 
diff --git a/src/main/java/org/apache/rocketmq/exporter/task/MetricsCollectTask.java b/src/main/java/org/apache/rocketmq/exporter/task/MetricsCollectTask.java
index 35586df..19be6f4 100644
--- a/src/main/java/org/apache/rocketmq/exporter/task/MetricsCollectTask.java
+++ b/src/main/java/org/apache/rocketmq/exporter/task/MetricsCollectTask.java
@@ -546,7 +546,6 @@ public class MetricsCollectTask {
             return;
         }
 
-        Set<BrokerMetric> masterMetricsSet = new HashSet<>();
         Set<Map.Entry<String, BrokerData>> clusterEntries = clusterInfo.getBrokerAddrTable().entrySet();
         for (Map.Entry<String, BrokerData> clusterEntry : clusterEntries) {
             String masterAddr = clusterEntry.getValue().getBrokerAddrs().get(MixAll.MASTER_ID);
@@ -577,11 +576,8 @@ public class MetricsCollectTask {
             } catch (Exception ex) {
                 log.error(String.format("BROKER_GET_NUMS-error, master broker=%s", masterAddr), ex);
             }
-            masterMetricsSet.add(new BrokerMetric(clusterName, brokerIP, brokerName));
         }
 
-        metricsService.getCollector().clearBrokerPutNumsMetric(masterMetricsSet);
-        metricsService.getCollector().clearBrokerGetNumsMetric(masterMetricsSet);
         log.info("broker stats collection task finished...." + (System.currentTimeMillis() - start));
     }
 
diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml
index dd85a06..86954a4 100644
--- a/src/main/resources/application.yml
+++ b/src/main/resources/application.yml
@@ -20,6 +20,7 @@ rocketmq:
     enableACL: false # if >=4.4.0
     accessKey:  # if >=4.4.0
     secretKey:  # if >=4.4.0
+    outOfTimeSeconds:  # Seconds after which a cached metric is cleared if it has not been updated
 
 threadpool:
   collect-client-metric-executor:

Mime
View raw message