rocketmq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From wlliqip...@apache.org
Subject [rocketmq-exporter] 42/43: add client consume runtime info
Date Sun, 08 Mar 2020 13:17:02 GMT
This is an automated email from the ASF dual-hosted git repository.

wlliqipeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-exporter.git

commit fd564d98c0e67bdf62adb3f55412585ec471fea4
Author: liwei5 <liwei5@vipkid.com.cn>
AuthorDate: Thu Feb 27 22:07:06 2020 +0800

    add client consume runtime info
---
 .../exporter/collector/RMQMetricsCollector.java    | 131 ++++++++++++++++++---
 .../config/CollectClientMetricExecutorConfig.java  |  46 ++++++++
 .../exporter/model/BrokerRuntimeStats.java         |  35 ------
 .../rocketmq/exporter/model/common/TwoTuple.java   |  19 +++
 .../model/metrics/ConsumerCountMetric.java         |  28 ++---
 .../model/metrics/ConsumerTopicDiffMetric.java     |  12 +-
 .../ConsumerRuntimeConsumeFailedMsgsMetric.java    |  73 ++++++++++++
 .../ConsumerRuntimeConsumeFailedTPSMetric.java     |  21 ++++
 .../ConsumerRuntimeConsumeOKTPSMetric.java         |  21 ++++
 .../ConsumerRuntimeConsumeRTMetric.java            |  21 ++++
 .../clientrunime/ConsumerRuntimePullRTMetric.java  |  21 ++++
 .../clientrunime/ConsumerRuntimePullTPSMetric.java |  21 ++++
 .../service/impl/RMQMetricsServiceImpl.java        |  74 +++++++++++-
 ...ientMetricCollectorFixedThreadPoolExecutor.java |  24 ++++
 .../exporter/task/ClientMetricTaskRunnable.java    | 124 +++++++++++++++++++
 .../rocketmq/exporter/task/MetricsCollectTask.java |  87 +++++++++++++-
 src/main/resources/application.yml                 |  10 +-
 src/main/resources/logback.xml                     |  15 ++-
 18 files changed, 705 insertions(+), 78 deletions(-)

diff --git a/src/main/java/org/apache/rocketmq/exporter/collector/RMQMetricsCollector.java b/src/main/java/org/apache/rocketmq/exporter/collector/RMQMetricsCollector.java
index 298b2fe..d0efc73 100644
--- a/src/main/java/org/apache/rocketmq/exporter/collector/RMQMetricsCollector.java
+++ b/src/main/java/org/apache/rocketmq/exporter/collector/RMQMetricsCollector.java
@@ -28,6 +28,14 @@ import org.apache.rocketmq.exporter.model.metrics.DLQTopicOffsetMetric;
 import org.apache.rocketmq.exporter.model.metrics.ProducerMetric;
 import org.apache.rocketmq.exporter.model.metrics.TopicPutNumMetric;
 import org.apache.rocketmq.exporter.model.metrics.brokerruntime.BrokerRuntimeMetric;
+import org.apache.rocketmq.exporter.model.metrics.clientrunime.ConsumerRuntimeConsumeFailedMsgsMetric;
+import org.apache.rocketmq.exporter.model.metrics.clientrunime.ConsumerRuntimeConsumeFailedTPSMetric;
+import org.apache.rocketmq.exporter.model.metrics.clientrunime.ConsumerRuntimeConsumeOKTPSMetric;
+import org.apache.rocketmq.exporter.model.metrics.clientrunime.ConsumerRuntimeConsumeRTMetric;
+import org.apache.rocketmq.exporter.model.metrics.clientrunime.ConsumerRuntimePullRTMetric;
+import org.apache.rocketmq.exporter.model.metrics.clientrunime.ConsumerRuntimePullTPSMetric;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -57,6 +65,19 @@ public class RMQMetricsCollector extends Collector {
     //consumer count
     private ConcurrentHashMap<ConsumerCountMetric, Integer> consumerCounts = new ConcurrentHashMap<>();
 
+     //count of consume fail
+     private ConcurrentHashMap<ConsumerRuntimeConsumeFailedMsgsMetric, Long> consumerClientFailedMsgCounts = new ConcurrentHashMap<>();
+     //TPS of consume fail 
+     private ConcurrentHashMap<ConsumerRuntimeConsumeFailedTPSMetric, Double> consumerClientFailedTPS = new ConcurrentHashMap<>();
+     //TPS of consume success
+     private ConcurrentHashMap<ConsumerRuntimeConsumeOKTPSMetric, Double> consumerClientOKTPS = new ConcurrentHashMap<>();
+     //rt of consume
+     private ConcurrentHashMap<ConsumerRuntimeConsumeRTMetric, Double> consumerClientRT = new ConcurrentHashMap<>();
+     //pull RT
+     private ConcurrentHashMap<ConsumerRuntimePullRTMetric, Double> consumerClientPullRT = new ConcurrentHashMap<>();
+     //pull tps
+     private ConcurrentHashMap<ConsumerRuntimePullTPSMetric, Double> consumerClientPullTPS = new ConcurrentHashMap<>();
+ 
     //broker offset for consumer-topic
     private ConcurrentHashMap<ConsumerMetric, Long> groupBrokerTotalOffset = new ConcurrentHashMap<>();
     //consumer offset for consumer-topic
@@ -139,27 +160,28 @@ public class RMQMetricsCollector extends Collector {
     private ConcurrentHashMap<BrokerRuntimeMetric, Long> brokerRuntimePullThreadPoolQueueHeadWaitTimeMills = new ConcurrentHashMap<>();
     private ConcurrentHashMap<BrokerRuntimeMetric, Long> brokerRuntimeQueryThreadPoolQueueHeadWaitTimeMills = new ConcurrentHashMap<>();
     private ConcurrentHashMap<BrokerRuntimeMetric, Long> brokerRuntimeSendThreadPoolQueueHeadWaitTimeMills = new ConcurrentHashMap<>();
-
     private ConcurrentHashMap<BrokerRuntimeMetric, Double> brokerRuntimeCommitLogDirCapacityFree = new ConcurrentHashMap<>();
     private ConcurrentHashMap<BrokerRuntimeMetric, Double> brokerRuntimeCommitLogDirCapacityTotal = new ConcurrentHashMap<>();
 
     private ConcurrentHashMap<BrokerRuntimeMetric, Long> brokerRuntimeCommitLogMaxOffset = new ConcurrentHashMap<>();
     private ConcurrentHashMap<BrokerRuntimeMetric, Long> brokerRuntimeCommitLogMinOffset = new ConcurrentHashMap<>();
     private ConcurrentHashMap<BrokerRuntimeMetric, Double> brokerRuntimeRemainHowManyDataToFlush = new ConcurrentHashMap<>();
+    private final static Logger log = LoggerFactory.getLogger(RMQMetricsCollector.class);
 
-    private static List<String> GROUP_DIFF_LABEL_NAMES = Arrays.asList("group", "topic", "countOfOnlineConsumers");
+    private static List<String> GROUP_DIFF_LABEL_NAMES = Arrays.asList("group", "topic", "countOfOnlineConsumers", "msgModel");
 
     private static <T extends Number> void loadGroupDiffMetric(GaugeMetricFamily family, Map.Entry<ConsumerTopicDiffMetric, T> entry) {
         family.addMetric(
                 Arrays.asList(
                         entry.getKey().getGroup(),
                         entry.getKey().getTopic(),
-                        entry.getKey().getCountOfOnlineConsumers()
-                ),
+                        entry.getKey().getCountOfOnlineConsumers(),
+                        entry.getKey().getMsgModel()                
+                        ),
                 entry.getValue().doubleValue());
     }
 
-    private static List<String> GROUP_COUNT_LABEL_NAMES = Arrays.asList("group", "caddr", "localaddr");
+    private static List<String> GROUP_COUNT_LABEL_NAMES = Arrays.asList("caddr", "localaddr", "group");
 
     private void collectConsumerMetric(List<MetricFamilySamples> mfs) {
         GaugeMetricFamily groupGetLatencyByConsumerDiff = new GaugeMetricFamily("rocketmq_group_diff", "GroupDiff", GROUP_DIFF_LABEL_NAMES);
@@ -184,9 +206,9 @@ public class RMQMetricsCollector extends Collector {
         for (Map.Entry<ConsumerCountMetric, Integer> entry : consumerCounts.entrySet()) {
             consumerCountsF.addMetric(
                     Arrays.asList(
-                            entry.getKey().getGroup(),
-                            entry.getKey().getCaddr(),
-                            entry.getKey().getLocaladdr()
+                        entry.getKey().getCaddrs(),
+                        entry.getKey().getLocaladdrs(),
+                        entry.getKey().getGroup()
                     ),
                     entry.getValue().doubleValue());
         }
@@ -255,6 +277,8 @@ public class RMQMetricsCollector extends Collector {
 
         collectGroupNums(mfs);
 
+        collectClientGroupMetric(mfs);
+
         collectBrokerNums(mfs);
 
         collectBrokerRuntimeStats(mfs);
@@ -262,6 +286,58 @@ public class RMQMetricsCollector extends Collector {
         return mfs;
     }
 
+    private static List<String> GROUP_CLIENT_METRIC_LABEL_NAMES = Arrays.asList(
+            "clientAddr", "clientId", "group", "topic"
+    );
+
+    private void collectClientGroupMetric(List<MetricFamilySamples> mfs) {
+        GaugeMetricFamily consumerClientFailedMsgCountsF = new GaugeMetricFamily("rocketmq_client_consume_fail_msg_count", "consumerClientFailedMsgCounts", GROUP_CLIENT_METRIC_LABEL_NAMES);
+        for (Map.Entry<ConsumerRuntimeConsumeFailedMsgsMetric, Long> entry : consumerClientFailedMsgCounts.entrySet()) {
+            loadClientRuntimeStatsMetric(consumerClientFailedMsgCountsF, entry);
+        }
+        mfs.add(consumerClientFailedMsgCountsF);
+
+        GaugeMetricFamily consumerClientFailedTPSF = new GaugeMetricFamily("rocketmq_client_consume_fail_msg_tps", "consumerClientFailedTPS", GROUP_CLIENT_METRIC_LABEL_NAMES);
+        for (Map.Entry<ConsumerRuntimeConsumeFailedTPSMetric, Double> entry : consumerClientFailedTPS.entrySet()) {
+            loadClientRuntimeStatsMetric(consumerClientFailedTPSF, entry);
+        }
+        mfs.add(consumerClientFailedTPSF);
+
+        GaugeMetricFamily consumerClientOKTPSF = new GaugeMetricFamily("rocketmq_client_consume_ok_msg_tps", "consumerClientOKTPS", GROUP_CLIENT_METRIC_LABEL_NAMES);
+        for (Map.Entry<ConsumerRuntimeConsumeOKTPSMetric, Double> entry : consumerClientOKTPS.entrySet()) {
+            loadClientRuntimeStatsMetric(consumerClientOKTPSF, entry);
+        }
+        mfs.add(consumerClientOKTPSF);
+
+        GaugeMetricFamily consumerClientRTF = new GaugeMetricFamily("rocketmq_client_consume_rt", "consumerClientRT", GROUP_CLIENT_METRIC_LABEL_NAMES);
+        for (Map.Entry<ConsumerRuntimeConsumeRTMetric, Double> entry : consumerClientRT.entrySet()) {
+            loadClientRuntimeStatsMetric(consumerClientRTF, entry);
+        }
+        mfs.add(consumerClientRTF);
+
+        GaugeMetricFamily consumerClientPullRTF = new GaugeMetricFamily("rocketmq_client_consumer_pull_rt", "consumerClientPullRT", GROUP_CLIENT_METRIC_LABEL_NAMES);
+        for (Map.Entry<ConsumerRuntimePullRTMetric, Double> entry : consumerClientPullRT.entrySet()) {
+            loadClientRuntimeStatsMetric(consumerClientPullRTF, entry);
+        }
+        mfs.add(consumerClientPullRTF);
+
+        GaugeMetricFamily consumerClientPullTPSF = new GaugeMetricFamily("rocketmq_client_consumer_pull_tps", "consumerClientPullTPS", GROUP_CLIENT_METRIC_LABEL_NAMES);
+        for (Map.Entry<ConsumerRuntimePullTPSMetric, Double> entry : consumerClientPullTPS.entrySet()) {
+            loadClientRuntimeStatsMetric(consumerClientPullTPSF, entry);
+        }
+        mfs.add(consumerClientPullTPSF);
+    }
+
+
+    private <T2 extends Number, T1 extends ConsumerRuntimeConsumeFailedMsgsMetric> void loadClientRuntimeStatsMetric(GaugeMetricFamily family, Map.Entry<T1, T2> entry) {
+        family.addMetric(Arrays.asList(
+                entry.getKey().getCaddrs(),
+                entry.getKey().getLocaladdrs(),
+                entry.getKey().getGroup(),
+                entry.getKey().getTopic()
+        ), entry.getValue().doubleValue());
+    }
+
     private static List<String> GROUP_PULL_LATENCY_LABEL_NAMES = Arrays.asList(
             "cluster", "broker", "topic", "group", "queueid"
     );
@@ -407,7 +483,7 @@ public class RMQMetricsCollector extends Collector {
         mfs.add(groupBrokerTotalOffsetF);
 
         GaugeMetricFamily groupConsumeTotalOffsetF = new GaugeMetricFamily("rocketmq_group_consume_total_offset", "GroupConsumeTotalOffset", GROUP_NUMS_LABEL_NAMES);
-        for (Map.Entry<ConsumerMetric, Long> entry : groupBrokerTotalOffset.entrySet()) {
+        for (Map.Entry<ConsumerMetric, Long> entry : groupConsumeTotalOffset.entrySet()) {
             loadGroupNumsMetric(groupConsumeTotalOffsetF, entry);
         }
         mfs.add(groupConsumeTotalOffsetF);
@@ -462,17 +538,17 @@ public class RMQMetricsCollector extends Collector {
         }
     }
 
-    public void addGroupCountMetric(String group, String caddr, String localaddr, int count) {
-        this.consumerCounts.put(new ConsumerCountMetric(group, caddr, localaddr), count);
+    public void addGroupCountMetric(String group, String caddrs, String localaddrs, int count) {
+        this.consumerCounts.put(new ConsumerCountMetric(group, caddrs, localaddrs), count);
     }
 
-    public void addGroupDiffMetric(String countOfOnlineConsumers, String group, String topic, long value) {
+    public void addGroupDiffMetric(String countOfOnlineConsumers, String group, String topic, String msgModel, long value) {
         if (topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
-            this.consumerRetryDiff.put(new ConsumerTopicDiffMetric(group, topic, countOfOnlineConsumers), value);
+            this.consumerRetryDiff.put(new ConsumerTopicDiffMetric(group, topic, countOfOnlineConsumers, msgModel), value);
         } else if (topic.startsWith(MixAll.DLQ_GROUP_TOPIC_PREFIX)) {
-            this.consumerDLQDiff.put(new ConsumerTopicDiffMetric(group, topic, countOfOnlineConsumers), value);
+            this.consumerDLQDiff.put(new ConsumerTopicDiffMetric(group, topic, countOfOnlineConsumers, msgModel), value);
         } else {
-            this.consumerDiff.put(new ConsumerTopicDiffMetric(group, topic, countOfOnlineConsumers), value);
+            this.consumerDiff.put(new ConsumerTopicDiffMetric(group, topic, countOfOnlineConsumers, msgModel), value);
         }
     }
 
@@ -506,6 +582,31 @@ public class RMQMetricsCollector extends Collector {
         groupGetSize.put(new ConsumerMetric(topic, group), value);
     }
 
+    public void addConsumerClientFailedMsgCountsMetric(String group, String topic, String clientAddr, String clientId, long value) {
+        consumerClientFailedMsgCounts.put(new ConsumerRuntimeConsumeFailedMsgsMetric(group, topic, clientAddr, clientId), value);
+    }
+
+    public void addConsumerClientFailedTPSMetric(String group, String topic, String clientAddr, String clientId, double value) {
+        consumerClientFailedTPS.put(new ConsumerRuntimeConsumeFailedTPSMetric(group, topic, clientAddr, clientId), value);
+    }
+
+    public void addConsumerClientOKTPSMetric(String group, String topic, String clientAddr, String clientId, double value) {
+        consumerClientOKTPS.put(new ConsumerRuntimeConsumeOKTPSMetric(group, topic, clientAddr, clientId), value);
+    }
+
+    public void addConsumeRTMetricMetric(String group, String topic, String clientAddr, String clientId, double value) {
+        consumerClientRT.put(new ConsumerRuntimeConsumeRTMetric(group, topic, clientAddr, clientId), value);
+    }
+
+    public void addPullRTMetric(String group, String topic, String clientAddr, String clientId, double value) {
+        consumerClientPullRT.put(new ConsumerRuntimePullRTMetric(group, topic, clientAddr, clientId), value);
+    }
+
+    public void addPullTPSMetric(String group, String topic, String clientAddr, String clientId, double value) {
+        consumerClientPullTPS.put(new ConsumerRuntimePullTPSMetric(group, topic, clientAddr, clientId), value);
+    }
+
+
     public void addSendBackNumsMetric(String topic, String group, double value) {
         sendBackNums.put(new ConsumerMetric(topic, group), value);
     }
diff --git a/src/main/java/org/apache/rocketmq/exporter/config/CollectClientMetricExecutorConfig.java b/src/main/java/org/apache/rocketmq/exporter/config/CollectClientMetricExecutorConfig.java
new file mode 100644
index 0000000..da107ec
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/exporter/config/CollectClientMetricExecutorConfig.java
@@ -0,0 +1,46 @@
+package org.apache.rocketmq.exporter.config;
+
+
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.context.annotation.Configuration;
+
+@Configuration
+@ConfigurationProperties(prefix = "threadpool.collect-client-metric-executor")
+public class CollectClientMetricExecutorConfig {
+    private int corePoolSize = 20;
+    private int maximumPoolSize = 20;
+    private long keepAliveTime = 4000L;
+    private int queueSize = 1000;
+
+    public int getCorePoolSize() {
+        return corePoolSize;
+    }
+
+    public void setCorePoolSize(int corePoolSize) {
+        this.corePoolSize = corePoolSize;
+    }
+
+    public int getMaximumPoolSize() {
+        return maximumPoolSize;
+    }
+
+    public void setMaximumPoolSize(int maximumPoolSize) {
+        this.maximumPoolSize = maximumPoolSize;
+    }
+
+    public long getKeepAliveTime() {
+        return keepAliveTime;
+    }
+
+    public void setKeepAliveTime(long keepAliveTime) {
+        this.keepAliveTime = keepAliveTime;
+    }
+
+    public int getQueueSize() {
+        return queueSize;
+    }
+
+    public void setQueueSize(int queueSize) {
+        this.queueSize = queueSize;
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/exporter/model/BrokerRuntimeStats.java b/src/main/java/org/apache/rocketmq/exporter/model/BrokerRuntimeStats.java
index b6fe3dc..0b187a1 100644
--- a/src/main/java/org/apache/rocketmq/exporter/model/BrokerRuntimeStats.java
+++ b/src/main/java/org/apache/rocketmq/exporter/model/BrokerRuntimeStats.java
@@ -8,61 +8,29 @@ import java.util.List;
 import java.util.Map;
 
 public class BrokerRuntimeStats {
-    //今天生产的消息总量
     private long msgPutTotalTodayNow;
-    //今天消费的消息总量
     private long msgGetTotalTodayNow;
-
-    //今天早上生产消息总量
     private long msgPutTotalTodayMorning;
-    //今天早上消费消息总量
     private long msgGetTotalTodayMorning;
-
-    //昨天早上生产的消息总量
     private long msgPutTotalYesterdayMorning;
-    //昨天早上消费的消息总量
     private long msgGetTotalYesterdayMorning;
-
-    //延迟消息位点
     private List<ScheduleMessageOffsetTable> scheduleMessageOffsetTables = new ArrayList<>();
-
-    //发送线程最大等待时间
     private long sendThreadPoolQueueHeadWaitTimeMills;
-    //拉取消息线程最大等待时间
     private long queryThreadPoolQueueHeadWaitTimeMills;
-    //拉取线程最大等待时间
     private long pullThreadPoolQueueHeadWaitTimeMills;
-
-    //查询线程任务个数
     private long queryThreadPoolQueueSize;
-    //拉取线程任务个数
     private long pullThreadPoolQueueSize;
-    //发送线程等待队列长度
     private long sendThreadPoolQueueCapacity;
-    //拉取线程等待队列长度
     private long pullThreadPoolQueueCapacity;
-
-    //刷pagecache时间统计
     private Map<String, Integer> putMessageDistributeTimeMap = new HashMap<>();
-    //还有多少字节的数据没有刷盘
     private double remainHowManyDataToFlush;
-
-    //commitlog 最小位点
     private long commitLogMinOffset;
-    //commitlog 最大位点
     private long commitLogMaxOffset;
-
-    //broker运行时间描述
     private String runtime;
-    //broker 启动时间
     private long bootTimestamp;
-    //broker 磁盘总量
     private double commitLogDirCapacityTotal;
-    //broker 磁盘剩余
     private double commitLogDirCapacityFree;
-    //broker 版本号
     private int brokerVersion;
-    //
     private long dispatchMaxBuffer;
 
     private PutTps putTps = new PutTps();
@@ -74,7 +42,6 @@ public class BrokerRuntimeStats {
     private double consumeQueueDiskRatio;
     private double commitLogDiskRatio;
 
-    //page cache锁定时间
     private long pageCacheLockTimeMills;
 
     private long getMessageEntireTimeMax;
@@ -89,9 +56,7 @@ public class BrokerRuntimeStats {
 
     private long remainTransientStoreBufferNumbs;
     private long queryThreadPoolQueueCapacity;
-    //发送消息平均体积大小
     private double putMessageAverageSize;
-    //全部发送消息数
     private long putMessageSizeTotal;
     private long dispatchBehindBytes;
 
diff --git a/src/main/java/org/apache/rocketmq/exporter/model/common/TwoTuple.java b/src/main/java/org/apache/rocketmq/exporter/model/common/TwoTuple.java
new file mode 100644
index 0000000..80d88aa
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/exporter/model/common/TwoTuple.java
@@ -0,0 +1,19 @@
+package org.apache.rocketmq.exporter.model.common;
+
+public class TwoTuple<T1, T2> {
+    private T1 first;
+    private T2 second;
+
+    public TwoTuple(T1 first, T2 second) {
+        this.first = first;
+        this.second = second;
+    }
+
+    public T1 getFirst() {
+        return first;
+    }
+
+    public T2 getSecond() {
+        return second;
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/exporter/model/metrics/ConsumerCountMetric.java b/src/main/java/org/apache/rocketmq/exporter/model/metrics/ConsumerCountMetric.java
index 67d9f20..657ca70 100644
--- a/src/main/java/org/apache/rocketmq/exporter/model/metrics/ConsumerCountMetric.java
+++ b/src/main/java/org/apache/rocketmq/exporter/model/metrics/ConsumerCountMetric.java
@@ -1,30 +1,30 @@
 package org.apache.rocketmq.exporter.model.metrics;
 
 public class ConsumerCountMetric {
-    private String caddr;
-    private String localaddr;
+    private String caddrs;
+    private String localaddrs;
     private String group;
 
-    public ConsumerCountMetric(String group, String caddr, String localaddr) {
+    public ConsumerCountMetric(String group, String caddrs, String localaddrs) {
+        this.caddrs = caddrs;
+        this.localaddrs = localaddrs;
         this.group = group;
-        this.caddr = caddr;
-        this.localaddr = localaddr;
     }
 
-    public String getCaddr() {
-        return caddr;
+    public String getCaddrs() {
+        return caddrs;
     }
 
-    public void setCaddr(String caddr) {
-        this.caddr = caddr;
+    public void setCaddrs(String caddrs) {
+        this.caddrs = caddrs;
     }
 
-    public String getLocaladdr() {
-        return localaddr;
+    public String getLocaladdrs() {
+        return localaddrs;
     }
 
-    public void setLocaladdr(String localaddr) {
-        this.localaddr = localaddr;
+    public void setLocaladdrs(String localaddrs) {
+        this.localaddrs = localaddrs;
     }
 
     public String getGroup() {
@@ -53,6 +53,6 @@ public class ConsumerCountMetric {
 
     @Override
     public String toString() {
-        return "group: " + group + " caddr: " + caddr + " localaddr: " + localaddr;
+        return "group: " + group + " caddr: " + caddrs + " localaddr: " + localaddrs;
     }
 }
diff --git a/src/main/java/org/apache/rocketmq/exporter/model/metrics/ConsumerTopicDiffMetric.java b/src/main/java/org/apache/rocketmq/exporter/model/metrics/ConsumerTopicDiffMetric.java
index 84b15c1..2d78b03 100644
--- a/src/main/java/org/apache/rocketmq/exporter/model/metrics/ConsumerTopicDiffMetric.java
+++ b/src/main/java/org/apache/rocketmq/exporter/model/metrics/ConsumerTopicDiffMetric.java
@@ -1,15 +1,17 @@
 package org.apache.rocketmq.exporter.model.metrics;
 
 public class ConsumerTopicDiffMetric {
-    public ConsumerTopicDiffMetric(String group, String topic, String countOfOnlineConsumers) {
+    public ConsumerTopicDiffMetric(String group, String topic, String countOfOnlineConsumers, String msgModel) {
         this.group = group;
         this.topic = topic;
         this.countOfOnlineConsumers = countOfOnlineConsumers;
+        this.msgModel = msgModel;
     }
 
     private String group;
     private String topic;
     private String countOfOnlineConsumers;
+    private String msgModel;//0:broadcast, 1:cluster
 
     public String getGroup() {
         return group;
@@ -35,6 +37,14 @@ public class ConsumerTopicDiffMetric {
         this.countOfOnlineConsumers = countOfOnlineConsumers;
     }
 
+    public String getMsgModel() {
+        return msgModel;
+    }
+
+    public void setMsgModel(String msgModel) {
+        this.msgModel = msgModel;
+    }
+
     @Override
     public boolean equals(Object obj) {
         if (!(obj instanceof ConsumerTopicDiffMetric)) {
diff --git a/src/main/java/org/apache/rocketmq/exporter/model/metrics/clientrunime/ConsumerRuntimeConsumeFailedMsgsMetric.java b/src/main/java/org/apache/rocketmq/exporter/model/metrics/clientrunime/ConsumerRuntimeConsumeFailedMsgsMetric.java
new file mode 100644
index 0000000..791ff11
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/exporter/model/metrics/clientrunime/ConsumerRuntimeConsumeFailedMsgsMetric.java
@@ -0,0 +1,73 @@
+package org.apache.rocketmq.exporter.model.metrics.clientrunime;
+
+public class ConsumerRuntimeConsumeFailedMsgsMetric {
+    private String group;
+    private String topic;
+    private String caddrs;
+    private String localaddrs;
+
+    public ConsumerRuntimeConsumeFailedMsgsMetric(String group, String topic, String caddrs, String localaddrs) {
+        this.group = group;
+        this.topic = topic;
+        this.caddrs = caddrs;
+        this.localaddrs = localaddrs;
+    }
+
+    public String getGroup() {
+        return group;
+    }
+
+    public void setGroup(String group) {
+        this.group = group;
+    }
+
+    public String getTopic() {
+        return topic;
+    }
+
+    public void setTopic(String topic) {
+        this.topic = topic;
+    }
+
+    public String getCaddrs() {
+        return caddrs;
+    }
+
+    public void setCaddrs(String caddrs) {
+        this.caddrs = caddrs;
+    }
+
+    public String getLocaladdrs() {
+        return localaddrs;
+    }
+
+    public void setLocaladdrs(String localaddrs) {
+        this.localaddrs = localaddrs;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (!(obj instanceof ConsumerRuntimeConsumeFailedMsgsMetric)) {
+            return false;
+        }
+        ConsumerRuntimeConsumeFailedMsgsMetric other = (ConsumerRuntimeConsumeFailedMsgsMetric) obj;
+
+        return other.group.equals(group) &&
+                other.topic.equals(topic) &&
+                other.getCaddrs().equalsIgnoreCase(caddrs);
+    }
+
+    @Override
+    public int hashCode() {
+        int hash = 1;
+        hash = 37 * hash + group.hashCode();
+        hash = 37 * hash + topic.hashCode();
+        hash = 37 * hash + caddrs.hashCode();
+        return hash;
+    }
+
+    @Override
+    public String toString() {
+        return "group: " + group + " topic: " + topic + " caddrs: " + caddrs + " localaddrs: " + localaddrs;
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/exporter/model/metrics/clientrunime/ConsumerRuntimeConsumeFailedTPSMetric.java b/src/main/java/org/apache/rocketmq/exporter/model/metrics/clientrunime/ConsumerRuntimeConsumeFailedTPSMetric.java
new file mode 100644
index 0000000..d9786be
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/exporter/model/metrics/clientrunime/ConsumerRuntimeConsumeFailedTPSMetric.java
@@ -0,0 +1,21 @@
+package org.apache.rocketmq.exporter.model.metrics.clientrunime;
+
+public class ConsumerRuntimeConsumeFailedTPSMetric extends ConsumerRuntimeConsumeFailedMsgsMetric {
+    public ConsumerRuntimeConsumeFailedTPSMetric(String group, String topic, String caddrs, String localaddrs) {
+        super(group, topic, caddrs, localaddrs);
+    }
+
+    @Override
+    public int hashCode() {
+        int hash = 1;
+        hash = 37 * hash + this.getGroup().hashCode();
+        hash = 37 * hash + this.getTopic().hashCode();
+        hash = 37 * hash + this.getCaddrs().hashCode();
+        return hash;
+    }
+
+    @Override
+    public String toString() {
+        return "group: " + this.getGroup() + " topic: " + this.getTopic() + " caddrs: " + this.getCaddrs() + " localaddrs: " + this.getLocaladdrs();
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/exporter/model/metrics/clientrunime/ConsumerRuntimeConsumeOKTPSMetric.java b/src/main/java/org/apache/rocketmq/exporter/model/metrics/clientrunime/ConsumerRuntimeConsumeOKTPSMetric.java
new file mode 100644
index 0000000..5eb9e50
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/exporter/model/metrics/clientrunime/ConsumerRuntimeConsumeOKTPSMetric.java
@@ -0,0 +1,21 @@
+package org.apache.rocketmq.exporter.model.metrics.clientrunime;
+
+public class ConsumerRuntimeConsumeOKTPSMetric extends ConsumerRuntimeConsumeFailedMsgsMetric {
+    public ConsumerRuntimeConsumeOKTPSMetric(String group, String topic, String caddrs, String localaddrs) {
+        super(group, topic, caddrs, localaddrs);
+    }
+
+    @Override
+    public int hashCode() {
+        int hash = 1;
+        hash = 37 * hash + this.getGroup().hashCode();
+        hash = 37 * hash + this.getTopic().hashCode();
+        hash = 37 * hash + this.getCaddrs().hashCode();
+        return hash;
+    }
+
+    @Override
+    public String toString() {
+        return "group: " + this.getGroup() + " topic: " + this.getTopic() + " caddrs: " + this.getCaddrs() + " localaddrs: " + this.getLocaladdrs();
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/exporter/model/metrics/clientrunime/ConsumerRuntimeConsumeRTMetric.java b/src/main/java/org/apache/rocketmq/exporter/model/metrics/clientrunime/ConsumerRuntimeConsumeRTMetric.java
new file mode 100644
index 0000000..80ccf56
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/exporter/model/metrics/clientrunime/ConsumerRuntimeConsumeRTMetric.java
@@ -0,0 +1,21 @@
+package org.apache.rocketmq.exporter.model.metrics.clientrunime;
+
+public class ConsumerRuntimeConsumeRTMetric extends ConsumerRuntimeConsumeFailedMsgsMetric {
+    public ConsumerRuntimeConsumeRTMetric(String group, String topic, String caddrs, String localaddrs) {
+        super(group, topic, caddrs, localaddrs);
+    }
+
+    @Override
+    public int hashCode() {
+        int hash = 1;
+        hash = 37 * hash + this.getGroup().hashCode();
+        hash = 37 * hash + this.getTopic().hashCode();
+        hash = 37 * hash + this.getCaddrs().hashCode();
+        return hash;
+    }
+
+    @Override
+    public String toString() {
+        return "group: " + this.getGroup() + " topic: " + this.getTopic() + " caddrs: " + this.getCaddrs() + " localaddrs: " + this.getLocaladdrs();
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/exporter/model/metrics/clientrunime/ConsumerRuntimePullRTMetric.java b/src/main/java/org/apache/rocketmq/exporter/model/metrics/clientrunime/ConsumerRuntimePullRTMetric.java
new file mode 100644
index 0000000..8efcce5
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/exporter/model/metrics/clientrunime/ConsumerRuntimePullRTMetric.java
@@ -0,0 +1,21 @@
+package org.apache.rocketmq.exporter.model.metrics.clientrunime;
+
+public class ConsumerRuntimePullRTMetric extends ConsumerRuntimeConsumeFailedMsgsMetric {
+    public ConsumerRuntimePullRTMetric(String group, String topic, String caddrs, String localaddrs) {
+        super(group, topic, caddrs, localaddrs);
+    }
+
+    @Override
+    public int hashCode() {
+        int hash = 1;
+        hash = 37 * hash + this.getGroup().hashCode();
+        hash = 37 * hash + this.getTopic().hashCode();
+        hash = 37 * hash + this.getCaddrs().hashCode();
+        return hash;
+    }
+
+    @Override
+    public String toString() {
+        return "group: " + this.getGroup() + " topic: " + this.getTopic() + " caddrs: " + this.getCaddrs() + " localaddrs: " + this.getLocaladdrs();
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/exporter/model/metrics/clientrunime/ConsumerRuntimePullTPSMetric.java b/src/main/java/org/apache/rocketmq/exporter/model/metrics/clientrunime/ConsumerRuntimePullTPSMetric.java
new file mode 100644
index 0000000..e56f6a5
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/exporter/model/metrics/clientrunime/ConsumerRuntimePullTPSMetric.java
@@ -0,0 +1,21 @@
+package org.apache.rocketmq.exporter.model.metrics.clientrunime;
+
+public class ConsumerRuntimePullTPSMetric extends ConsumerRuntimeConsumeFailedMsgsMetric {
+    public ConsumerRuntimePullTPSMetric(String group, String topic, String caddrs, String localaddrs) {
+        super(group, topic, caddrs, localaddrs);
+    }
+
+    @Override
+    public int hashCode() {
+        int hash = 1;
+        hash = 37 * hash + this.getGroup().hashCode();
+        hash = 37 * hash + this.getTopic().hashCode();
+        hash = 37 * hash + this.getCaddrs().hashCode();
+        return hash;
+    }
+
+    @Override
+    public String toString() {
+        return "group: " + this.getGroup() + " topic: " + this.getTopic() + " caddrs: " + this.getCaddrs() + " localaddrs: " + this.getLocaladdrs();
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/exporter/service/impl/RMQMetricsServiceImpl.java b/src/main/java/org/apache/rocketmq/exporter/service/impl/RMQMetricsServiceImpl.java
index 6cf977f..b38b921 100644
--- a/src/main/java/org/apache/rocketmq/exporter/service/impl/RMQMetricsServiceImpl.java
+++ b/src/main/java/org/apache/rocketmq/exporter/service/impl/RMQMetricsServiceImpl.java
@@ -16,14 +16,18 @@
  */
 package org.apache.rocketmq.exporter.service.impl;
 
+import io.prometheus.client.Collector;
 import io.prometheus.client.CollectorRegistry;
-import io.prometheus.client.exporter.common.TextFormat;
 import org.apache.rocketmq.exporter.collector.RMQMetricsCollector;
 import org.apache.rocketmq.exporter.service.RMQMetricsService;
 import org.springframework.stereotype.Service;
 
 import java.io.IOException;
 import java.io.StringWriter;
+import java.io.Writer;
+import java.util.Enumeration;
+import java.util.Iterator;
+
 
 @Service
 public class RMQMetricsServiceImpl implements RMQMetricsService {
@@ -40,6 +44,72 @@ public class RMQMetricsServiceImpl implements RMQMetricsService {
     }
 
     public void metrics(StringWriter writer) throws IOException {
-        TextFormat.write004(writer, registry.metricFamilySamples());
+        this.writeEscapedHelp(writer, registry.metricFamilySamples()); // writes sample lines only (no # HELP / # TYPE lines)
+    }
+    /** Writes every sample as one text-format line: name, optional {labels}, value, optional timestamp. No HELP/TYPE lines. */
+    public void writeEscapedHelp(Writer writer, Enumeration<Collector.MetricFamilySamples> mfs) throws IOException {
+        while (mfs.hasMoreElements()) {
+            Collector.MetricFamilySamples metricFamilySamples = mfs.nextElement();
+            for (Collector.MetricFamilySamples.Sample sample : metricFamilySamples.samples) {
+                writer.write(sample.name);
+                if (!sample.labelNames.isEmpty()) {
+                    writer.write('{');
+                    // Every pair is written as name="escaped value", so a comma also follows the last pair.
+                    for (int i = 0; i < sample.labelNames.size(); ++i) {
+                        writer.write(sample.labelNames.get(i));
+                        writer.write("=\"");
+                        writeEscapedLabelValue(writer, sample.labelValues.get(i));
+                        writer.write("\",");
+                    }
+                    writer.write('}');
+                }
+
+                writer.write(' ');
+                writer.write(Collector.doubleToGoString(sample.value));
+                if (sample.timestampMs != null) {
+                    writer.write(' ');
+                    writer.write(sample.timestampMs.toString());
+                }
+                // Terminate the sample line.
+                writer.write('\n');
+            }
+        }
+
     }
+    /** Appends s to writer, replacing line feed, double quote and backslash with the two-character escapes \n, \" and \\. */
+    private static void writeEscapedLabelValue(Writer writer, String s) throws IOException {
+        for (int i = 0; i < s.length(); ++i) {
+            char c = s.charAt(i);
+            switch (c) {
+                case '\n':
+                    writer.append("\\n");
+                    break;
+                case '"':
+                    writer.append("\\\"");
+                    break;
+                case '\\':
+                    writer.append("\\\\");
+                    break;
+                default:
+                    writer.append(c); // every other character is copied unchanged
+            }
+        }
+
+    }
+    /** Maps a collector type to its lowercase name, "untyped" for any other value. NOTE(review): no caller is visible in this patch. */
+    private static String typeString(Collector.Type t) {
+        switch (t) {
+            case GAUGE:
+                return "gauge";
+            case COUNTER:
+                return "counter";
+            case SUMMARY:
+                return "summary";
+            case HISTOGRAM:
+                return "histogram";
+            default:
+                return "untyped";
+        }
+    }
+
 }
diff --git a/src/main/java/org/apache/rocketmq/exporter/task/ClientMetricCollectorFixedThreadPoolExecutor.java b/src/main/java/org/apache/rocketmq/exporter/task/ClientMetricCollectorFixedThreadPoolExecutor.java
new file mode 100644
index 0000000..c3bd38d
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/exporter/task/ClientMetricCollectorFixedThreadPoolExecutor.java
@@ -0,0 +1,24 @@
+package org.apache.rocketmq.exporter.task;
+
+import org.apache.rocketmq.broker.latency.FutureTaskExt;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+/** ThreadPoolExecutor whose Runnable-with-result tasks are wrapped in FutureTaskExt instead of the default FutureTask. */
+public class ClientMetricCollectorFixedThreadPoolExecutor extends ThreadPoolExecutor {
+    public ClientMetricCollectorFixedThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
+                                                        long keepAliveTime, TimeUnit unit,
+                                                        BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,
+                                                        RejectedExecutionHandler handler) {
+        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
+    }
+
+    @Override
+    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
+        return new FutureTaskExt<>(runnable, value);
+    }
+}
\ No newline at end of file
diff --git a/src/main/java/org/apache/rocketmq/exporter/task/ClientMetricTaskRunnable.java b/src/main/java/org/apache/rocketmq/exporter/task/ClientMetricTaskRunnable.java
new file mode 100644
index 0000000..210dc2e
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/exporter/task/ClientMetricTaskRunnable.java
@@ -0,0 +1,124 @@
+package org.apache.rocketmq.exporter.task;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.protocol.body.Connection;
+import org.apache.rocketmq.common.protocol.body.ConsumerConnection;
+import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo;
+import org.apache.rocketmq.exporter.service.RMQMetricsService;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.apache.rocketmq.tools.admin.MQAdminExt;
+import org.slf4j.Logger;
+
+/** Runnable that fetches the running info of every online client of one consumer group and records per-topic client metrics. */
+public class ClientMetricTaskRunnable implements Runnable {
+    private final String consumerGroup;
+    private final ConsumerConnection connection;
+    private final boolean enableCollectJStack;
+    private final MQAdminExt mqAdmin;
+    private final Logger logger;
+    private final RMQMetricsService metricsService;
+
+    public ClientMetricTaskRunnable(String consumerGroup, ConsumerConnection connection,
+                                    boolean enableCollectJStack, MQAdminExt mqAdmin, Logger logger,
+                                    RMQMetricsService metricsService) {
+        this.consumerGroup = consumerGroup;
+        this.connection = connection;
+        this.enableCollectJStack = enableCollectJStack;
+        this.mqAdmin = mqAdmin;
+        this.logger = logger;
+        this.metricsService = metricsService;
+    }
+
+    /** Visits each connection; clients whose query fails or returns null are skipped, and an interrupt ends the task. */
+    @Override
+    public void run() {
+        if (this.connection == null || this.connection.getConnectionSet() == null ||
+                this.connection.getConnectionSet().isEmpty()) {
+            return;
+        }
+        logger.debug(String.format("ClientMetricTask-group=%s,enable jstack=%s",
+                consumerGroup,
+                this.enableCollectJStack
+        ));
+        long start = System.currentTimeMillis();
+        for (Connection conn : this.connection.getConnectionSet()) {
+            ConsumerRunningInfo runningInfo;
+            try {
+                runningInfo = mqAdmin.getConsumerRunningInfo(this.consumerGroup, conn.getClientId(), this.enableCollectJStack);
+            } catch (InterruptedException | RemotingException e) {
+                logger.warn(String.format("ClientMetricTask-exception.ignore. group=%s,client id=%s, client addr=%s, language=%s,version=%d",
+                        consumerGroup,
+                        conn.getClientId(),
+                        conn.getClientAddr(),
+                        conn.getLanguage(),
+                        conn.getVersion()), e);
+                if (e instanceof InterruptedException) {
+                    // Restore the interrupt flag and stop instead of querying the remaining clients.
+                    Thread.currentThread().interrupt();
+                    return;
+                }
+                continue;
+            } catch (MQClientException e) {
+                logger.warn(String.format("ClientMetricTask-exception.ignore. group=%s,client id=%s, client addr=%s, language=%s,version=%d, error code=%d, error msg=%s",
+                        consumerGroup,
+                        conn.getClientId(),
+                        conn.getClientAddr(),
+                        conn.getLanguage(),
+                        conn.getVersion(),
+                        e.getResponseCode(),
+                        e.getErrorMessage())
+                );
+                continue;
+            }
+            if (runningInfo == null) {
+                continue;
+            }
+            if (!StringUtils.isBlank(runningInfo.getJstack())) {
+                logger.error(String.format("group=%s, jstack=%s", consumerGroup, runningInfo.getJstack()));
+            }
+            if (runningInfo.getStatusTable() != null && !runningInfo.getStatusTable().isEmpty()) {
+                for (String topic : runningInfo.getStatusTable().keySet()) {
+                    metricsService.getCollector().addConsumerClientFailedMsgCountsMetric(
+                            this.consumerGroup,
+                            topic,
+                            conn.getClientAddr(),
+                            conn.getClientId(),
+                            runningInfo.getStatusTable().get(topic).getConsumeFailedMsgs());
+                    metricsService.getCollector().addConsumerClientFailedTPSMetric(
+                            this.consumerGroup,
+                            topic,
+                            conn.getClientAddr(),
+                            conn.getClientId(),
+                            runningInfo.getStatusTable().get(topic).getConsumeFailedTPS());
+                    metricsService.getCollector().addConsumerClientOKTPSMetric(
+                            this.consumerGroup,
+                            topic,
+                            conn.getClientAddr(),
+                            conn.getClientId(),
+                            runningInfo.getStatusTable().get(topic).getConsumeOKTPS());
+                    metricsService.getCollector().addConsumeRTMetricMetric(
+                            this.consumerGroup,
+                            topic,
+                            conn.getClientAddr(),
+                            conn.getClientId(),
+                            runningInfo.getStatusTable().get(topic).getConsumeRT());
+                    metricsService.getCollector().addPullRTMetric(
+                            this.consumerGroup,
+                            topic,
+                            conn.getClientAddr(),
+                            conn.getClientId(),
+                            runningInfo.getStatusTable().get(topic).getPullRT());
+                    metricsService.getCollector().addPullTPSMetric(
+                            this.consumerGroup,
+                            topic,
+                            conn.getClientAddr(),
+                            conn.getClientId(),
+                            runningInfo.getStatusTable().get(topic).getPullTPS());
+                }
+            }
+        }
+        long cost = System.currentTimeMillis() - start;
+        logger.debug(String.format("one-ClientMetricTask-group=%s, cost=%d, online-instance count=%d", this.consumerGroup, cost, this.connection.getConnectionSet().size()));
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/exporter/task/MetricsCollectTask.java b/src/main/java/org/apache/rocketmq/exporter/task/MetricsCollectTask.java
index b3e7d5e..8d05278 100644
--- a/src/main/java/org/apache/rocketmq/exporter/task/MetricsCollectTask.java
+++ b/src/main/java/org/apache/rocketmq/exporter/task/MetricsCollectTask.java
@@ -29,14 +29,18 @@ import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.common.protocol.ResponseCode;
 import org.apache.rocketmq.common.protocol.body.BrokerStatsData;
 import org.apache.rocketmq.common.protocol.body.ClusterInfo;
+import org.apache.rocketmq.common.protocol.body.Connection;
 import org.apache.rocketmq.common.protocol.body.ConsumerConnection;
 import org.apache.rocketmq.common.protocol.body.GroupList;
 import org.apache.rocketmq.common.protocol.body.KVTable;
 import org.apache.rocketmq.common.protocol.body.TopicList;
+import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
 import org.apache.rocketmq.common.protocol.route.BrokerData;
 import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.exporter.config.CollectClientMetricExecutorConfig;
 import org.apache.rocketmq.exporter.config.RMQConfigure;
 import org.apache.rocketmq.exporter.model.BrokerRuntimeStats;
+import org.apache.rocketmq.exporter.model.common.TwoTuple;
 import org.apache.rocketmq.exporter.service.RMQMetricsService;
 import org.apache.rocketmq.exporter.util.Utils;
 import org.apache.rocketmq.remoting.exception.RemotingConnectException;
@@ -48,13 +52,24 @@ import org.apache.rocketmq.tools.admin.MQAdminExt;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.context.annotation.Bean;
 import org.springframework.scheduling.annotation.Scheduled;
 import org.springframework.stereotype.Component;
 
 import javax.annotation.PostConstruct;
 import javax.annotation.Resource;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 
 @Component
 public class MetricsCollectTask {
@@ -64,9 +79,36 @@ public class MetricsCollectTask {
     @Resource
     private RMQConfigure rmqConfigure;
     @Resource
+    @Qualifier("collectClientMetricExecutor")
+    private ExecutorService collectClientMetricExecutor;
+    @Resource
     private RMQMetricsService metricsService;
     private final static Logger log = LoggerFactory.getLogger(MetricsCollectTask.class);
 
+    private BlockingQueue<Runnable> collectClientTaskBlockQueue;
+    /** Creates the "collectClientMetricExecutor" bean: pool sizes, keep-alive (milliseconds) and queue capacity come from the config. */
+    @Bean(name = "collectClientMetricExecutor")
+    private ExecutorService collectClientMetricExecutor(CollectClientMetricExecutorConfig collectClientMetricExecutorConfig) {
+        collectClientTaskBlockQueue = new LinkedBlockingDeque<Runnable>(collectClientMetricExecutorConfig.getQueueSize());
+        ExecutorService executorService = new ClientMetricCollectorFixedThreadPoolExecutor(
+                collectClientMetricExecutorConfig.getCorePoolSize(),
+                collectClientMetricExecutorConfig.getMaximumPoolSize(),
+                collectClientMetricExecutorConfig.getKeepAliveTime(),
+                TimeUnit.MILLISECONDS,
+                this.collectClientTaskBlockQueue,
+                new ThreadFactory() {
+                    private final AtomicLong threadIndex = new AtomicLong(0);
+
+                    @Override
+                    public Thread newThread(Runnable r) {
+                        return new Thread(r, "collectClientMetricThread_" + this.threadIndex.incrementAndGet()); // names start at _1
+                    }
+                },
+                new ThreadPoolExecutor.DiscardOldestPolicy() // on rejection, drop the oldest queued task and retry the new one
+        );
+        return executorService;
+    }
+
     @PostConstruct
     public void init() throws InterruptedException, RemotingConnectException, RemotingTimeoutException, RemotingSendRequestException, MQBrokerException {
         log.info("MetricsCollectTask init starting....");
@@ -180,8 +222,10 @@ public class MetricsCollectTask {
                 int countOfOnlineConsumers = 0;
 
                 double consumeTPS = 0F;
+                MessageModel messageModel = MessageModel.CLUSTERING;
                 try {
                     onlineConsumers = mqAdminExt.examineConsumerConnectionInfo(group);
+                    messageModel = onlineConsumers.getMessageModel();
                 } catch (InterruptedException | RemotingException ex) {
                     log.error(String.format("get topic's(%s) online consumers(%s) exception", topic, group), ex);
                 } catch (MQClientException ex) {
@@ -195,6 +239,25 @@ public class MetricsCollectTask {
                 } else {
                     countOfOnlineConsumers = onlineConsumers.getConnectionSet().size();
                 }
+                {
+                    String cAddrs = "", localAddrs = "";
+                    if (countOfOnlineConsumers > 0) {
+                        TwoTuple<String, String> addresses = buildClientAddresses(onlineConsumers.getConnectionSet());
+                        cAddrs = addresses.getFirst();
+                        localAddrs = addresses.getSecond();
+                    }
+                    metricsService.getCollector().addGroupCountMetric(group, cAddrs, localAddrs, countOfOnlineConsumers);
+                }
+                if (countOfOnlineConsumers > 0) {
+                    collectClientMetricExecutor.submit(new ClientMetricTaskRunnable(
+                            group,
+                            onlineConsumers,
+                            false,
+                            this.mqAdminExt,
+                            log,
+                            this.metricsService
+                    ));
+                }
                 try {
                     consumeStats = mqAdminExt.examineConsumeStats(group, topic);
                 } catch (InterruptedException | RemotingException ex) {
@@ -211,7 +274,13 @@ public class MetricsCollectTask {
                 {
                     diff = consumeStats.computeTotalDiff();
                     consumeTPS = consumeStats.getConsumeTps();
-                    metricsService.getCollector().addGroupDiffMetric(String.valueOf(countOfOnlineConsumers), group, topic, diff);
+                    metricsService.getCollector().addGroupDiffMetric(
+                            String.valueOf(countOfOnlineConsumers),
+                            group,
+                            topic,
+                            String.valueOf(messageModel.ordinal()),
+                            diff
+                    );
                     metricsService.getCollector().addGroupConsumeTPSMetric(topic, group, consumeTPS);
                 }
                 Set<Map.Entry<MessageQueue, OffsetWrapper>> consumeStatusEntries = consumeStats.getOffsetTable().entrySet();
@@ -489,6 +558,22 @@ public class MetricsCollectTask {
         log.info("broker runtime stats collection task finished...." + (System.currentTimeMillis() - start));
     }
 
+    private static TwoTuple<String, String> buildClientAddresses(HashSet<Connection> connectionSet) { // -> (client addresses, client ids), each comma-joined
+        if (connectionSet == null || connectionSet.isEmpty()) {
+            return new TwoTuple<>("", "");
+        }
+        List<String> clientAddresses = new ArrayList<>();
+        List<String> clientIdAddresses = new ArrayList<>();
+
+        for (Connection connection : connectionSet) {
+            clientAddresses.add(connection.getClientAddr()); // TCP connection address
+            clientIdAddresses.add(connection.getClientId()); // id composed of the local IP
+        }
+        String str1 = String.join(",", clientAddresses);
+        String str2 = String.join(",", clientIdAddresses);
+        return new TwoTuple<>(str1, str2);
+    }
+
     private void handleTopicNotExistException(int responseCode, Exception ex, String topic, String group) {
         if (responseCode == ResponseCode.TOPIC_NOT_EXIST || responseCode == ResponseCode.CONSUMER_NOT_ONLINE) {
             log.error(String.format("get topic's(%s) consumer-stats(%s) exception, detail: %s", topic, group, ex.getMessage()));
diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml
index 8ff9e07..ac716bb 100644
--- a/src/main/resources/application.yml
+++ b/src/main/resources/application.yml
@@ -15,11 +15,17 @@ rocketmq:
   config:
     webTelemetryPath: /metrics
     rocketmqVersion: 4_2_0
-    namesrvAddr: 127.0.0.1:9876 #
+    namesrvAddr: 127.0.0.1:9876
     enableCollect: true
 
+threadpool:
+  collect-client-metric-executor:
+    core-pool-size: 10
+    maximum-pool-size: 10
+    keep-alive-time: 3000
+    queueSize: 5000
 task:
-  count: 5
+  count: 5 # num of scheduled-tasks
   collectTopicOffset:
     cron: 15 0/1 * * * ?
   collectConsumerOffset:
diff --git a/src/main/resources/logback.xml b/src/main/resources/logback.xml
index 8032516..e54fd39 100644
--- a/src/main/resources/logback.xml
+++ b/src/main/resources/logback.xml
@@ -8,15 +8,14 @@
 
     <appender name="FILE"
               class="ch.qos.logback.core.rolling.RollingFileAppender">
-        <file>./logs/exporterlogs/rocketmq-exporter.log</file>
+        <file>${user.home}/logs/exporterlogs/rocketmq-exporter.log</file>
         <append>true</append>
-        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
-            <fileNamePattern>./logs/exporterlogs/rocketmq-exporter-%d{yyyy-MM-dd}.%i.log
-            </fileNamePattern>
-            <timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
-                <maxFileSize>104857600</maxFileSize>
-            </timeBasedFileNamingAndTriggeringPolicy>
-            <totalSizeCap>20gb</totalSizeCap>
+        <rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
+            <fileNamePattern>${user.home}/logs/exporterlogs/rocketmq-exporter-%d{yyyy-MM-dd}.%i.log</fileNamePattern>
+            <maxFileSize>50MB</maxFileSize>
+            <totalSizeCap>10GB</totalSizeCap>
+            <maxHistory>3</maxHistory>
+            <cleanHistoryOnStart>true</cleanHistoryOnStart>
         </rollingPolicy>
         <encoder>
             <pattern>[%d{yyyy-MM-dd HH:mm:ss.SSS}] %5p %m%n</pattern>


Mime
View raw message