rocketmq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vongosl...@apache.org
Subject [rocketmq-exporter] 06/07: Avoid crash when get consumer offsets as a group of topic consuming
Date Mon, 03 Jun 2019 10:57:02 GMT
This is an automated email from the ASF dual-hosted git repository.

vongosling pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-exporter.git

commit 378184de875742e83cbfc1e92eda98d25c85ef26
Author: breezecoolyang <breezecoolyang@users.noreply.github.com>
AuthorDate: Mon Jun 3 10:47:05 2019 +0800

    Avoid crash when get consumer offsets as a group of topic consuming
---
 .../rocketmq/exporter/task/MetricsCollectTask.java | 23 ++++++++++++----------
 1 file changed, 13 insertions(+), 10 deletions(-)

diff --git a/rocketmq-prometheus-exporter/src/main/java/org/apache/rocketmq/exporter/task/MetricsCollectTask.java
b/rocketmq-prometheus-exporter/src/main/java/org/apache/rocketmq/exporter/task/MetricsCollectTask.java
index 3f359e9..6ae442d 100644
--- a/rocketmq-prometheus-exporter/src/main/java/org/apache/rocketmq/exporter/task/MetricsCollectTask.java
+++ b/rocketmq-prometheus-exporter/src/main/java/org/apache/rocketmq/exporter/task/MetricsCollectTask.java
@@ -106,17 +106,20 @@ public class MetricsCollectTask {
                 GroupList groupList = mqAdminExt.queryTopicConsumeByWho(topic);
                 if (groupList != null && !groupList.getGroupList().isEmpty()) {
                     for (String group : groupList.getGroupList()) {
-                        ConsumeStats consumeStatus = mqAdminExt.examineConsumeStats(group,topic);
-                        Set<Map.Entry<MessageQueue, OffsetWrapper>> consumeStatusEntries
= consumeStatus.getOffsetTable().entrySet();
-                        for (Map.Entry<MessageQueue, OffsetWrapper> consumeStatusEntry
: consumeStatusEntries) {
-                            MessageQueue q          =   consumeStatusEntry.getKey();
-                            OffsetWrapper offset    =   consumeStatusEntry.getValue();
-                            if (consumeOffsetMap.containsKey(q.getBrokerName())) {
-                                consumeOffsetMap.put(q.getBrokerName(), consumeOffsetMap.get(q.getBrokerName())
+ offset.getConsumerOffset());
-                            }
-                            else {
-                                consumeOffsetMap.put(q.getBrokerName(), offset.getConsumerOffset());
+                        try {
+                            ConsumeStats consumeStatus = mqAdminExt.examineConsumeStats(group,
topic);
+                            Set<Map.Entry<MessageQueue, OffsetWrapper>> consumeStatusEntries
= consumeStatus.getOffsetTable().entrySet();
+                            for (Map.Entry<MessageQueue, OffsetWrapper> consumeStatusEntry
: consumeStatusEntries) {
+                                MessageQueue q = consumeStatusEntry.getKey();
+                                OffsetWrapper offset = consumeStatusEntry.getValue();
+                                if (consumeOffsetMap.containsKey(q.getBrokerName())) {
+                                    consumeOffsetMap.put(q.getBrokerName(), consumeOffsetMap.get(q.getBrokerName())
+ offset.getConsumerOffset());
+                                } else {
+                                    consumeOffsetMap.put(q.getBrokerName(), offset.getConsumerOffset());
+                                }
                             }
+                        } catch (Exception e) {
+                            log.info("ignore this consumer", e.getMessage());
                         }
                         Set<Map.Entry<String, Long>> consumeOffsetEntries = consumeOffsetMap.entrySet();
                         for (Map.Entry<String, Long> consumeOffsetEntry : consumeOffsetEntries)
{


Mime
View raw message