Repository: incubator-rocketmq
Updated Branches:
refs/heads/master [created] 057d0e9b1
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/monitor/DefaultMonitorListener.java
----------------------------------------------------------------------
diff --git a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/monitor/DefaultMonitorListener.java
b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/monitor/DefaultMonitorListener.java
new file mode 100644
index 0000000..b8176d4
--- /dev/null
+++ b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/monitor/DefaultMonitorListener.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.rocketmq.tools.monitor;
+
+import com.alibaba.rocketmq.client.log.ClientLogger;
+import com.alibaba.rocketmq.common.protocol.body.ConsumerRunningInfo;
+import org.slf4j.Logger;
+
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.TreeMap;
+
+
+/**
+ * {@link MonitorListener} implementation that writes every reported event to the client
+ * log obtained from {@link ClientLogger#getLog()}.
+ * <p>
+ * Plain events are logged with the {@code [MONITOR]} prefix; findings produced by the
+ * consumer running-info analysis additionally carry the {@code [NOTIFY]} marker.
+ */
+public class DefaultMonitorListener implements MonitorListener {
+    private final static String LOG_PREFIX = "[MONITOR] ";
+    private final static String LOG_NOTIFY = LOG_PREFIX + " [NOTIFY] ";
+    private final Logger log = ClientLogger.getLog();
+
+
+    public DefaultMonitorListener() {
+    }
+
+
+    /** Logs the start-of-round marker. */
+    @Override
+    public void beginRound() {
+        log.info(LOG_PREFIX + "=========================================beginRound");
+    }
+
+
+    /**
+     * Logs the given undone-message statistics.
+     *
+     * @param undoneMsgs statistics to log; rendered through its {@code toString()}
+     */
+    @Override
+    public void reportUndoneMsgs(UndoneMsgs undoneMsgs) {
+        log.info(LOG_PREFIX + "reportUndoneMsgs: {}", undoneMsgs);
+    }
+
+
+    /**
+     * Logs the given failed-message statistics.
+     *
+     * @param failedMsgs statistics to log; rendered through its {@code toString()}
+     */
+    @Override
+    public void reportFailedMsgs(FailedMsgs failedMsgs) {
+        log.info(LOG_PREFIX + "reportFailedMsgs: {}", failedMsgs);
+    }
+
+
+    /**
+     * Logs the given delete-messages event.
+     *
+     * @param deleteMsgsEvent event to log; rendered through its {@code toString()}
+     */
+    @Override
+    public void reportDeleteMsgsEvent(DeleteMsgsEvent deleteMsgsEvent) {
+        log.info(LOG_PREFIX + "reportDeleteMsgsEvent: {}", deleteMsgsEvent);
+    }
+
+
+    /**
+     * Analyzes the running info of one consumer group and logs a {@code [NOTIFY]} line for
+     * every finding.
+     * <p>
+     * Two checks are run: {@link ConsumerRunningInfo#analyzeSubscription} over the whole
+     * table (a {@code false} result is logged as "Subscription different"), and
+     * {@link ConsumerRunningInfo#analyzeProcessQueue} per client (any non-empty result is
+     * logged together with the client id).
+     *
+     * @param criTable running info keyed by client id; a {@code null} or empty table is
+     *                 ignored
+     */
+    @Override
+    public void reportConsumerRunningInfo(TreeMap<String, ConsumerRunningInfo> criTable) {
+        // firstEntry() returns null for an empty map, so there is nothing to analyze or to
+        // attribute a finding to.
+        if (criTable == null || criTable.isEmpty()) {
+            return;
+        }
+
+        if (!ConsumerRunningInfo.analyzeSubscription(criTable)) {
+            log.info(LOG_NOTIFY + "reportConsumerRunningInfo: ConsumerGroup: {}, Subscription different",
+                consumerGroupOf(criTable));
+        }
+
+        for (Entry<String, ConsumerRunningInfo> entry : criTable.entrySet()) {
+            String result = ConsumerRunningInfo.analyzeProcessQueue(entry.getKey(), entry.getValue());
+            if (result != null && !result.isEmpty()) {
+                log.info(LOG_NOTIFY + "reportConsumerRunningInfo: ConsumerGroup: {}, ClientId: {}, {}",
+                    consumerGroupOf(criTable),
+                    entry.getKey(),
+                    result);
+            }
+        }
+    }
+
+
+    /**
+     * Reads the {@code consumerGroup} property from the first entry of a non-empty table.
+     * Only called when a finding is actually logged.
+     */
+    private static String consumerGroupOf(TreeMap<String, ConsumerRunningInfo> criTable) {
+        return criTable.firstEntry().getValue().getProperties().getProperty("consumerGroup");
+    }
+
+
+    /** Logs the end-of-round marker. */
+    @Override
+    public void endRound() {
+        log.info(LOG_PREFIX + "=========================================endRound");
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/monitor/DeleteMsgsEvent.java
----------------------------------------------------------------------
diff --git a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/monitor/DeleteMsgsEvent.java
b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/monitor/DeleteMsgsEvent.java
new file mode 100644
index 0000000..25ac420
--- /dev/null
+++ b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/monitor/DeleteMsgsEvent.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.rocketmq.tools.monitor;
+
+import com.alibaba.rocketmq.common.protocol.topic.OffsetMovedEvent;
+
+
+/**
+ * Value holder passed to {@link MonitorListener#reportDeleteMsgsEvent(DeleteMsgsEvent)}:
+ * an {@link OffsetMovedEvent} paired with the timestamp associated with it.
+ */
+public class DeleteMsgsEvent {
+    private OffsetMovedEvent offsetMovedEvent;
+    private long eventTimestamp;
+
+
+    /** @return the wrapped offset-moved event, or {@code null} if none was set */
+    public OffsetMovedEvent getOffsetMovedEvent() {
+        return offsetMovedEvent;
+    }
+
+
+    /** @param offsetMovedEvent the offset-moved event to wrap */
+    public void setOffsetMovedEvent(OffsetMovedEvent offsetMovedEvent) {
+        this.offsetMovedEvent = offsetMovedEvent;
+    }
+
+
+    /** @return the timestamp recorded for this event ({@code 0} if never set) */
+    public long getEventTimestamp() {
+        return eventTimestamp;
+    }
+
+
+    /** @param eventTimestamp the timestamp to record for this event */
+    public void setEventTimestamp(long eventTimestamp) {
+        this.eventTimestamp = eventTimestamp;
+    }
+
+
+    /** Renders both fields in the {@code DeleteMsgsEvent [name=value, ...]} form. */
+    @Override
+    public String toString() {
+        return "DeleteMsgsEvent [offsetMovedEvent=" + offsetMovedEvent + ", eventTimestamp=" + eventTimestamp
+            + "]";
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/monitor/FailedMsgs.java
----------------------------------------------------------------------
diff --git a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/monitor/FailedMsgs.java
b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/monitor/FailedMsgs.java
new file mode 100644
index 0000000..3ae5c2f
--- /dev/null
+++ b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/monitor/FailedMsgs.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.rocketmq.tools.monitor;
+
+/**
+ * Mutable bean describing failed messages of one consumer group on one topic, as handed
+ * to {@link MonitorListener#reportFailedMsgs(FailedMsgs)}.
+ */
+public class FailedMsgs {
+    private String consumerGroup;
+    private String topic;
+    private long failedMsgsTotalRecently;
+
+
+    /** @return the consumer group name, or {@code null} if not set */
+    public String getConsumerGroup() {
+        return this.consumerGroup;
+    }
+
+
+    /** @param consumerGroup the consumer group name */
+    public void setConsumerGroup(String consumerGroup) {
+        this.consumerGroup = consumerGroup;
+    }
+
+
+    /** @return the topic name, or {@code null} if not set */
+    public String getTopic() {
+        return this.topic;
+    }
+
+
+    /** @param topic the topic name */
+    public void setTopic(String topic) {
+        this.topic = topic;
+    }
+
+
+    /** @return the recorded failed-message count ({@code 0} if never set) */
+    public long getFailedMsgsTotalRecently() {
+        return this.failedMsgsTotalRecently;
+    }
+
+
+    /** @param failedMsgsTotalRecently the failed-message count to record */
+    public void setFailedMsgsTotalRecently(long failedMsgsTotalRecently) {
+        this.failedMsgsTotalRecently = failedMsgsTotalRecently;
+    }
+
+
+    /** Renders all fields in the {@code FailedMsgs [name=value, ...]} form. */
+    @Override
+    public String toString() {
+        final StringBuilder text = new StringBuilder("FailedMsgs [");
+        text.append("consumerGroup=").append(this.consumerGroup);
+        text.append(", topic=").append(this.topic);
+        text.append(", failedMsgsTotalRecently=").append(this.failedMsgsTotalRecently);
+        text.append("]");
+        return text.toString();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/monitor/MonitorConfig.java
----------------------------------------------------------------------
diff --git a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/monitor/MonitorConfig.java
b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/monitor/MonitorConfig.java
new file mode 100644
index 0000000..7ef4513
--- /dev/null
+++ b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/monitor/MonitorConfig.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.rocketmq.tools.monitor;
+
+import com.alibaba.rocketmq.common.MixAll;
+
+
+/**
+ * Settings consumed by {@link MonitorService}: the name server address to connect to and
+ * the period between monitoring rounds.
+ */
+public class MonitorConfig {
+    /** Default period between rounds: one minute, expressed in milliseconds. */
+    private static final int DEFAULT_ROUND_INTERVAL = 1000 * 60;
+
+    private String namesrvAddr = defaultNamesrvAddr();
+
+    private int roundInterval = DEFAULT_ROUND_INTERVAL;
+
+
+    /**
+     * Resolves the initial name server address: the system property named by
+     * {@code MixAll.NAMESRV_ADDR_PROPERTY} wins, otherwise the environment variable named
+     * by {@code MixAll.NAMESRV_ADDR_ENV} is used (which may be {@code null}).
+     */
+    private static String defaultNamesrvAddr() {
+        final String fromEnvironment = System.getenv(MixAll.NAMESRV_ADDR_ENV);
+        return System.getProperty(MixAll.NAMESRV_ADDR_PROPERTY, fromEnvironment);
+    }
+
+
+    /** @return the configured name server address, possibly {@code null} */
+    public String getNamesrvAddr() {
+        return this.namesrvAddr;
+    }
+
+
+    /** @param namesrvAddr the name server address to use */
+    public void setNamesrvAddr(String namesrvAddr) {
+        this.namesrvAddr = namesrvAddr;
+    }
+
+
+    /** @return the period between monitoring rounds */
+    public int getRoundInterval() {
+        return this.roundInterval;
+    }
+
+
+    /** @param roundInterval the period between monitoring rounds */
+    public void setRoundInterval(int roundInterval) {
+        this.roundInterval = roundInterval;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/monitor/MonitorListener.java
----------------------------------------------------------------------
diff --git a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/monitor/MonitorListener.java
b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/monitor/MonitorListener.java
new file mode 100644
index 0000000..1586ef9
--- /dev/null
+++ b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/monitor/MonitorListener.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.rocketmq.tools.monitor;
+
+import com.alibaba.rocketmq.common.protocol.body.ConsumerRunningInfo;
+
+import java.util.TreeMap;
+
+/**
+ * Callback interface through which monitoring results are delivered.
+ * MonitorService calls beginRound() and endRound() around each pass of its
+ * doMonitorWork() and the report methods in between; reportDeleteMsgsEvent()
+ * is invoked from its push-consumer message listener instead.
+ */
+public interface MonitorListener {
+ /** Called at the start of a monitoring round. */
+ void beginRound();
+
+ /** Reports undone-message statistics for one consumer group and topic. */
+ void reportUndoneMsgs(UndoneMsgs undoneMsgs);
+
+ /**
+ * Reports failed-message statistics for one consumer group and topic.
+ * NOTE(review): no caller of this method is visible in MonitorService - confirm.
+ */
+ void reportFailedMsgs(FailedMsgs failedMsgs);
+
+ /** Reports an offset-moved event together with its timestamp. */
+ void reportDeleteMsgsEvent(DeleteMsgsEvent deleteMsgsEvent);
+
+ /** Reports the running info of the clients of one consumer group, keyed by client id. */
+ void reportConsumerRunningInfo(TreeMap<String/* clientId */, ConsumerRunningInfo>
criTable);
+
+ /** Called at the end of a monitoring round. */
+ void endRound();
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/monitor/MonitorService.java
----------------------------------------------------------------------
diff --git a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/monitor/MonitorService.java
b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/monitor/MonitorService.java
new file mode 100644
index 0000000..2b50862
--- /dev/null
+++ b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/monitor/MonitorService.java
@@ -0,0 +1,325 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.rocketmq.tools.monitor;
+
+import com.alibaba.rocketmq.client.consumer.DefaultMQPullConsumer;
+import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
+import com.alibaba.rocketmq.client.consumer.PullResult;
+import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
+import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import com.alibaba.rocketmq.client.exception.MQBrokerException;
+import com.alibaba.rocketmq.client.exception.MQClientException;
+import com.alibaba.rocketmq.client.log.ClientLogger;
+import com.alibaba.rocketmq.common.MQVersion;
+import com.alibaba.rocketmq.common.MixAll;
+import com.alibaba.rocketmq.common.ThreadFactoryImpl;
+import com.alibaba.rocketmq.common.admin.ConsumeStats;
+import com.alibaba.rocketmq.common.admin.OffsetWrapper;
+import com.alibaba.rocketmq.common.message.MessageExt;
+import com.alibaba.rocketmq.common.message.MessageQueue;
+import com.alibaba.rocketmq.common.protocol.body.Connection;
+import com.alibaba.rocketmq.common.protocol.body.ConsumerConnection;
+import com.alibaba.rocketmq.common.protocol.body.ConsumerRunningInfo;
+import com.alibaba.rocketmq.common.protocol.body.TopicList;
+import com.alibaba.rocketmq.common.protocol.topic.OffsetMovedEvent;
+import com.alibaba.rocketmq.remoting.RPCHook;
+import com.alibaba.rocketmq.remoting.exception.RemotingException;
+import com.alibaba.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.slf4j.Logger;
+
+import java.util.*;
+import java.util.Map.Entry;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+
+public class MonitorService {
+ private final Logger log = ClientLogger.getLog();
+ private final ScheduledExecutorService scheduledExecutorService = Executors
+ .newSingleThreadScheduledExecutor(new ThreadFactoryImpl("MonitorService"));
+
+ private final MonitorConfig monitorConfig;
+
+ private final MonitorListener monitorListener;
+
+ private final DefaultMQAdminExt defaultMQAdminExt;
+ private final DefaultMQPullConsumer defaultMQPullConsumer = new DefaultMQPullConsumer(
+ MixAll.TOOLS_CONSUMER_GROUP);
+ private final DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer(
+ MixAll.MONITOR_CONSUMER_GROUP);
+
+
+ public MonitorService(MonitorConfig monitorConfig, MonitorListener monitorListener, RPCHook
rpcHook) {
+ this.monitorConfig = monitorConfig;
+ this.monitorListener = monitorListener;
+
+ this.defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
+ this.defaultMQAdminExt.setInstanceName(instanceName());
+ this.defaultMQAdminExt.setNamesrvAddr(monitorConfig.getNamesrvAddr());
+
+ this.defaultMQPullConsumer.setInstanceName(instanceName());
+ this.defaultMQPullConsumer.setNamesrvAddr(monitorConfig.getNamesrvAddr());
+
+ this.defaultMQPushConsumer.setInstanceName(instanceName());
+ this.defaultMQPushConsumer.setNamesrvAddr(monitorConfig.getNamesrvAddr());
+ try {
+ this.defaultMQPushConsumer.setConsumeThreadMin(1);
+ this.defaultMQPushConsumer.setConsumeThreadMax(1);
+ this.defaultMQPushConsumer.subscribe(MixAll.OFFSET_MOVED_EVENT, "*");
+ this.defaultMQPushConsumer.registerMessageListener(new MessageListenerConcurrently()
{
+
+ @Override
+ public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
+ ConsumeConcurrentlyContext
context) {
+ try {
+ OffsetMovedEvent ome =
+ OffsetMovedEvent.decode(msgs.get(0).getBody(), OffsetMovedEvent.class);
+
+ DeleteMsgsEvent deleteMsgsEvent = new DeleteMsgsEvent();
+ deleteMsgsEvent.setOffsetMovedEvent(ome);
+ deleteMsgsEvent.setEventTimestamp(msgs.get(0).getStoreTimestamp());
+
+ MonitorService.this.monitorListener.reportDeleteMsgsEvent(deleteMsgsEvent);
+ } catch (Exception e) {
+ }
+
+ return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+ }
+ });
+ } catch (MQClientException e) {
+ }
+ }
+
+
+ private String instanceName() {
+ String name =
+ System.currentTimeMillis() + new Random().nextInt() + this.monitorConfig.getNamesrvAddr();
+
+ return "MonitorService_" + name.hashCode();
+ }
+
+ public static void main(String[] args) throws MQClientException {
+ main0(args, null);
+ }
+
+ public static void main0(String[] args, RPCHook rpcHook) throws MQClientException {
+ final MonitorService monitorService =
+ new MonitorService(new MonitorConfig(), new DefaultMonitorListener(), rpcHook);
+ monitorService.start();
+
+ Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
+ private volatile boolean hasShutdown = false;
+
+
+ @Override
+ public void run() {
+ synchronized (this) {
+ if (!this.hasShutdown) {
+ this.hasShutdown = true;
+ monitorService.shutdown();
+ }
+ }
+ }
+ }, "ShutdownHook"));
+ }
+
+ public void start() throws MQClientException {
+ this.defaultMQPullConsumer.start();
+ this.defaultMQAdminExt.start();
+ this.defaultMQPushConsumer.start();
+ this.startScheduleTask();
+ }
+
+ public void shutdown() {
+ this.defaultMQPullConsumer.shutdown();
+ this.defaultMQAdminExt.shutdown();
+ this.defaultMQPushConsumer.shutdown();
+ }
+
+ private void startScheduleTask() {
+ this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ MonitorService.this.doMonitorWork();
+ } catch (Exception e) {
+ log.error("doMonitorWork Exception", e);
+ }
+ }
+ }, 1000 * 20, this.monitorConfig.getRoundInterval(), TimeUnit.MILLISECONDS);
+ }
+
+ public void doMonitorWork() throws RemotingException, MQClientException, InterruptedException
{
+ long beginTime = System.currentTimeMillis();
+ this.monitorListener.beginRound();
+
+ TopicList topicList = defaultMQAdminExt.fetchAllTopicList();
+ for (String topic : topicList.getTopicList()) {
+ if (topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
+ String consumerGroup = topic.substring(MixAll.RETRY_GROUP_TOPIC_PREFIX.length());
+
+ try {
+ this.reportUndoneMsgs(consumerGroup);
+ } catch (Exception e) {
+ // log.error("reportUndoneMsgs Exception", e);
+ }
+
+
+ try {
+ this.reportConsumerRunningInfo(consumerGroup);
+ } catch (Exception e) {
+ // log.error("reportConsumerRunningInfo Exception", e);
+ }
+ }
+ }
+ this.monitorListener.endRound();
+ long spentTimeMills = System.currentTimeMillis() - beginTime;
+ log.info("Execute one round monitor work, spent timemills: {}", spentTimeMills);
+ }
+
+ private void reportUndoneMsgs(final String consumerGroup) {
+ ConsumeStats cs = null;
+ try {
+ cs = defaultMQAdminExt.examineConsumeStats(consumerGroup);
+ } catch (Exception e) {
+ return;
+ }
+
+ ConsumerConnection cc = null;
+ try {
+ cc = defaultMQAdminExt.examineConsumerConnectionInfo(consumerGroup);
+ } catch (Exception e) {
+ return;
+ }
+
+ if (cs != null) {
+
+ HashMap<String/* Topic */, ConsumeStats> csByTopic = new HashMap<String,
ConsumeStats>();
+ {
+ Iterator<Entry<MessageQueue, OffsetWrapper>> it = cs.getOffsetTable().entrySet().iterator();
+ while (it.hasNext()) {
+ Entry<MessageQueue, OffsetWrapper> next = it.next();
+ MessageQueue mq = next.getKey();
+ OffsetWrapper ow = next.getValue();
+ ConsumeStats csTmp = csByTopic.get(mq.getTopic());
+ if (null == csTmp) {
+ csTmp = new ConsumeStats();
+ csByTopic.put(mq.getTopic(), csTmp);
+ }
+
+ csTmp.getOffsetTable().put(mq, ow);
+ }
+ }
+
+
+ {
+ Iterator<Entry<String, ConsumeStats>> it = csByTopic.entrySet().iterator();
+ while (it.hasNext()) {
+ Entry<String, ConsumeStats> next = it.next();
+ UndoneMsgs undoneMsgs = new UndoneMsgs();
+ undoneMsgs.setConsumerGroup(consumerGroup);
+ undoneMsgs.setTopic(next.getKey());
+ this.computeUndoneMsgs(undoneMsgs, next.getValue());
+ this.monitorListener.reportUndoneMsgs(undoneMsgs);
+ this.reportFailedMsgs(consumerGroup, next.getKey());
+ }
+ }
+ }
+ }
+
+ public void reportConsumerRunningInfo(final String consumerGroup) throws InterruptedException,
+ MQBrokerException, RemotingException, MQClientException {
+ ConsumerConnection cc = defaultMQAdminExt.examineConsumerConnectionInfo(consumerGroup);
+ TreeMap<String, ConsumerRunningInfo> infoMap = new TreeMap<String, ConsumerRunningInfo>();
+ for (Connection c : cc.getConnectionSet()) {
+ String clientId = c.getClientId();
+
+ if (c.getVersion() < MQVersion.Version.V3_1_8_SNAPSHOT.ordinal()) {
+ continue;
+ }
+
+ try {
+ ConsumerRunningInfo info =
+ defaultMQAdminExt.getConsumerRunningInfo(consumerGroup, clientId,
false);
+ infoMap.put(clientId, info);
+ } catch (Exception e) {
+ }
+ }
+
+ if (!infoMap.isEmpty()) {
+ this.monitorListener.reportConsumerRunningInfo(infoMap);
+ }
+ }
+
+ private void computeUndoneMsgs(final UndoneMsgs undoneMsgs, final ConsumeStats consumeStats)
{
+ long total = 0;
+ long singleMax = 0;
+ long delayMax = 0;
+ Iterator<Entry<MessageQueue, OffsetWrapper>> it = consumeStats.getOffsetTable().entrySet().iterator();
+ while (it.hasNext()) {
+ Entry<MessageQueue, OffsetWrapper> next = it.next();
+ MessageQueue mq = next.getKey();
+ OffsetWrapper ow = next.getValue();
+ long diff = ow.getBrokerOffset() - ow.getConsumerOffset();
+
+ if (diff > singleMax) {
+ singleMax = diff;
+ }
+
+ if (diff > 0) {
+ total += diff;
+ }
+
+ // Delay
+ if (ow.getLastTimestamp() > 0) {
+ try {
+ long maxOffset = this.defaultMQPullConsumer.maxOffset(mq);
+ if (maxOffset > 0) {
+ PullResult pull = this.defaultMQPullConsumer.pull(mq, "*", maxOffset
- 1, 1);
+ switch (pull.getPullStatus()) {
+ case FOUND:
+ long delay =
+ pull.getMsgFoundList().get(0).getStoreTimestamp()
- ow.getLastTimestamp();
+ if (delay > delayMax) {
+ delayMax = delay;
+ }
+ break;
+ case NO_MATCHED_MSG:
+ case NO_NEW_MSG:
+ case OFFSET_ILLEGAL:
+ break;
+ default:
+ break;
+ }
+ }
+ } catch (Exception e) {
+ }
+ }
+ }
+
+ undoneMsgs.setUndoneMsgsTotal(total);
+ undoneMsgs.setUndoneMsgsSingleMQ(singleMax);
+ undoneMsgs.setUndoneMsgsDelayTimeMills(delayMax);
+ }
+
+ private void reportFailedMsgs(final String consumerGroup, final String topic) {
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/monitor/UndoneMsgs.java
----------------------------------------------------------------------
diff --git a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/monitor/UndoneMsgs.java
b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/monitor/UndoneMsgs.java
new file mode 100644
index 0000000..1638d14
--- /dev/null
+++ b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/monitor/UndoneMsgs.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.rocketmq.tools.monitor;
+
+/**
+ * Mutable bean describing the not-yet-consumed messages of one consumer group on one
+ * topic, as handed to {@link MonitorListener#reportUndoneMsgs(UndoneMsgs)}.
+ */
+public class UndoneMsgs {
+    private String consumerGroup;
+    private String topic;
+
+    /** Undone messages summed over all queues of the topic. */
+    private long undoneMsgsTotal;
+
+    /** Largest undone-message count of any single queue. */
+    private long undoneMsgsSingleMQ;
+
+    /** Consumption delay, in milliseconds. */
+    private long undoneMsgsDelayTimeMills;
+
+
+    /** @return the consumer group name, or {@code null} if not set */
+    public String getConsumerGroup() {
+        return this.consumerGroup;
+    }
+
+
+    /** @param consumerGroup the consumer group name */
+    public void setConsumerGroup(String consumerGroup) {
+        this.consumerGroup = consumerGroup;
+    }
+
+
+    /** @return the topic name, or {@code null} if not set */
+    public String getTopic() {
+        return this.topic;
+    }
+
+
+    /** @param topic the topic name */
+    public void setTopic(String topic) {
+        this.topic = topic;
+    }
+
+
+    /** @return the total number of undone messages */
+    public long getUndoneMsgsTotal() {
+        return this.undoneMsgsTotal;
+    }
+
+
+    /** @param undoneMsgsTotal the total number of undone messages */
+    public void setUndoneMsgsTotal(long undoneMsgsTotal) {
+        this.undoneMsgsTotal = undoneMsgsTotal;
+    }
+
+
+    /** @return the largest undone-message count of a single queue */
+    public long getUndoneMsgsSingleMQ() {
+        return this.undoneMsgsSingleMQ;
+    }
+
+
+    /** @param undoneMsgsSingleMQ the largest undone-message count of a single queue */
+    public void setUndoneMsgsSingleMQ(long undoneMsgsSingleMQ) {
+        this.undoneMsgsSingleMQ = undoneMsgsSingleMQ;
+    }
+
+
+    /** @return the consumption delay in milliseconds */
+    public long getUndoneMsgsDelayTimeMills() {
+        return this.undoneMsgsDelayTimeMills;
+    }
+
+
+    /** @param undoneMsgsDelayTimeMills the consumption delay in milliseconds */
+    public void setUndoneMsgsDelayTimeMills(long undoneMsgsDelayTimeMills) {
+        this.undoneMsgsDelayTimeMills = undoneMsgsDelayTimeMills;
+    }
+
+
+    /** Renders all fields in the {@code UndoneMsgs [name=value, ...]} form. */
+    @Override
+    public String toString() {
+        final StringBuilder text = new StringBuilder("UndoneMsgs [");
+        text.append("consumerGroup=").append(this.consumerGroup);
+        text.append(", topic=").append(this.topic);
+        text.append(", undoneMsgsTotal=").append(this.undoneMsgsTotal);
+        text.append(", undoneMsgsSingleMQ=").append(this.undoneMsgsSingleMQ);
+        text.append(", undoneMsgsDelayTimeMills=").append(this.undoneMsgsDelayTimeMills);
+        text.append("]");
+        return text.toString();
+    }
+}
|