rocketmq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From yu...@apache.org
Subject [01/43] incubator-rocketmq git commit: Finish code dump. Reviewed by: @yukon @vongosling @stevenschew @vintagewang @lollipop @zander
Date Mon, 19 Dec 2016 09:40:18 GMT
Repository: incubator-rocketmq
Updated Branches:
  refs/heads/master [created] 057d0e9b1


http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/monitor/DefaultMonitorListener.java
----------------------------------------------------------------------
diff --git a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/monitor/DefaultMonitorListener.java
b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/monitor/DefaultMonitorListener.java
new file mode 100644
index 0000000..b8176d4
--- /dev/null
+++ b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/monitor/DefaultMonitorListener.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package com.alibaba.rocketmq.tools.monitor;
+
+import com.alibaba.rocketmq.client.log.ClientLogger;
+import com.alibaba.rocketmq.common.protocol.body.ConsumerRunningInfo;
+import org.slf4j.Logger;
+
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.TreeMap;
+
+
+public class DefaultMonitorListener implements MonitorListener {
+    private final static String LOG_PREFIX = "[MONITOR] ";
+    private final static String LOG_NOTIFY = LOG_PREFIX + " [NOTIFY] ";
+    private final Logger log = ClientLogger.getLog();
+
+
+    public DefaultMonitorListener() {
+    }
+
+
+    @Override
+    public void beginRound() {
+        log.info(LOG_PREFIX + "=========================================beginRound");
+    }
+
+
+    @Override
+    public void reportUndoneMsgs(UndoneMsgs undoneMsgs) {
+        log.info(String.format(LOG_PREFIX + "reportUndoneMsgs: %s", undoneMsgs));
+    }
+
+
+    @Override
+    public void reportFailedMsgs(FailedMsgs failedMsgs) {
+        log.info(String.format(LOG_PREFIX + "reportFailedMsgs: %s", failedMsgs));
+    }
+
+
+    @Override
+    public void reportDeleteMsgsEvent(DeleteMsgsEvent deleteMsgsEvent) {
+        log.info(String.format(LOG_PREFIX + "reportDeleteMsgsEvent: %s", deleteMsgsEvent));
+    }
+
+
+    @Override
+    public void reportConsumerRunningInfo(TreeMap<String, ConsumerRunningInfo> criTable)
{
+
+        {
+            boolean result = ConsumerRunningInfo.analyzeSubscription(criTable);
+            if (!result) {
+                log.info(String.format(LOG_NOTIFY
+                        + "reportConsumerRunningInfo: ConsumerGroup: %s, Subscription different",
criTable
+                        .firstEntry().getValue().getProperties().getProperty("consumerGroup")));
+            }
+        }
+
+
+        {
+            Iterator<Entry<String, ConsumerRunningInfo>> it = criTable.entrySet().iterator();
+            while (it.hasNext()) {
+                Entry<String, ConsumerRunningInfo> next = it.next();
+                String result = ConsumerRunningInfo.analyzeProcessQueue(next.getKey(), next.getValue());
+                if (!result.isEmpty()) {
+                    log.info(String.format(LOG_NOTIFY
+                                    + "reportConsumerRunningInfo: ConsumerGroup: %s, ClientId:
%s, %s",
+                            criTable.firstEntry().getValue().getProperties().getProperty("consumerGroup"),
+                            next.getKey(),
+                            result));
+                }
+            }
+        }
+    }
+
+
+    @Override
+    public void endRound() {
+        log.info(LOG_PREFIX + "=========================================endRound");
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/monitor/DeleteMsgsEvent.java
----------------------------------------------------------------------
diff --git a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/monitor/DeleteMsgsEvent.java
b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/monitor/DeleteMsgsEvent.java
new file mode 100644
index 0000000..25ac420
--- /dev/null
+++ b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/monitor/DeleteMsgsEvent.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package com.alibaba.rocketmq.tools.monitor;
+
+import com.alibaba.rocketmq.common.protocol.topic.OffsetMovedEvent;
+
+
+public class DeleteMsgsEvent {
+    private OffsetMovedEvent offsetMovedEvent;
+    private long eventTimestamp;
+
+
+    public OffsetMovedEvent getOffsetMovedEvent() {
+        return offsetMovedEvent;
+    }
+
+
+    public void setOffsetMovedEvent(OffsetMovedEvent offsetMovedEvent) {
+        this.offsetMovedEvent = offsetMovedEvent;
+    }
+
+
+    public long getEventTimestamp() {
+        return eventTimestamp;
+    }
+
+
+    public void setEventTimestamp(long eventTimestamp) {
+        this.eventTimestamp = eventTimestamp;
+    }
+
+
+    @Override
+    public String toString() {
+        return "DeleteMsgsEvent [offsetMovedEvent=" + offsetMovedEvent + ", eventTimestamp="
+ eventTimestamp
+                + "]";
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/monitor/FailedMsgs.java
----------------------------------------------------------------------
diff --git a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/monitor/FailedMsgs.java
b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/monitor/FailedMsgs.java
new file mode 100644
index 0000000..3ae5c2f
--- /dev/null
+++ b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/monitor/FailedMsgs.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package com.alibaba.rocketmq.tools.monitor;
+
+public class FailedMsgs {
+    private String consumerGroup;
+    private String topic;
+    private long failedMsgsTotalRecently;
+
+
+    public String getConsumerGroup() {
+        return consumerGroup;
+    }
+
+
+    public void setConsumerGroup(String consumerGroup) {
+        this.consumerGroup = consumerGroup;
+    }
+
+
+    public String getTopic() {
+        return topic;
+    }
+
+
+    public void setTopic(String topic) {
+        this.topic = topic;
+    }
+
+
+    public long getFailedMsgsTotalRecently() {
+        return failedMsgsTotalRecently;
+    }
+
+
+    public void setFailedMsgsTotalRecently(long failedMsgsTotalRecently) {
+        this.failedMsgsTotalRecently = failedMsgsTotalRecently;
+    }
+
+
+    @Override
+    public String toString() {
+        return "FailedMsgs [consumerGroup=" + consumerGroup + ", topic=" + topic
+                + ", failedMsgsTotalRecently=" + failedMsgsTotalRecently + "]";
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/monitor/MonitorConfig.java
----------------------------------------------------------------------
diff --git a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/monitor/MonitorConfig.java
b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/monitor/MonitorConfig.java
new file mode 100644
index 0000000..7ef4513
--- /dev/null
+++ b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/monitor/MonitorConfig.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package com.alibaba.rocketmq.tools.monitor;
+
+import com.alibaba.rocketmq.common.MixAll;
+
+
+public class MonitorConfig {
+    private String namesrvAddr = System.getProperty(MixAll.NAMESRV_ADDR_PROPERTY,
+            System.getenv(MixAll.NAMESRV_ADDR_ENV));
+
+    private int roundInterval = 1000 * 60;
+
+
+    public String getNamesrvAddr() {
+        return namesrvAddr;
+    }
+
+
+    public void setNamesrvAddr(String namesrvAddr) {
+        this.namesrvAddr = namesrvAddr;
+    }
+
+
+    public int getRoundInterval() {
+        return roundInterval;
+    }
+
+
+    public void setRoundInterval(int roundInterval) {
+        this.roundInterval = roundInterval;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/monitor/MonitorListener.java
----------------------------------------------------------------------
diff --git a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/monitor/MonitorListener.java
b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/monitor/MonitorListener.java
new file mode 100644
index 0000000..1586ef9
--- /dev/null
+++ b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/monitor/MonitorListener.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.rocketmq.tools.monitor;
+
+import com.alibaba.rocketmq.common.protocol.body.ConsumerRunningInfo;
+
+import java.util.TreeMap;
+
+public interface MonitorListener {
+    void beginRound();
+
+    void reportUndoneMsgs(UndoneMsgs undoneMsgs);
+
+    void reportFailedMsgs(FailedMsgs failedMsgs);
+
+    void reportDeleteMsgsEvent(DeleteMsgsEvent deleteMsgsEvent);
+
+    void reportConsumerRunningInfo(TreeMap<String/* clientId */, ConsumerRunningInfo>
criTable);
+
+    void endRound();
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/monitor/MonitorService.java
----------------------------------------------------------------------
diff --git a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/monitor/MonitorService.java
b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/monitor/MonitorService.java
new file mode 100644
index 0000000..2b50862
--- /dev/null
+++ b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/monitor/MonitorService.java
@@ -0,0 +1,325 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package com.alibaba.rocketmq.tools.monitor;
+
+import com.alibaba.rocketmq.client.consumer.DefaultMQPullConsumer;
+import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
+import com.alibaba.rocketmq.client.consumer.PullResult;
+import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
+import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import com.alibaba.rocketmq.client.exception.MQBrokerException;
+import com.alibaba.rocketmq.client.exception.MQClientException;
+import com.alibaba.rocketmq.client.log.ClientLogger;
+import com.alibaba.rocketmq.common.MQVersion;
+import com.alibaba.rocketmq.common.MixAll;
+import com.alibaba.rocketmq.common.ThreadFactoryImpl;
+import com.alibaba.rocketmq.common.admin.ConsumeStats;
+import com.alibaba.rocketmq.common.admin.OffsetWrapper;
+import com.alibaba.rocketmq.common.message.MessageExt;
+import com.alibaba.rocketmq.common.message.MessageQueue;
+import com.alibaba.rocketmq.common.protocol.body.Connection;
+import com.alibaba.rocketmq.common.protocol.body.ConsumerConnection;
+import com.alibaba.rocketmq.common.protocol.body.ConsumerRunningInfo;
+import com.alibaba.rocketmq.common.protocol.body.TopicList;
+import com.alibaba.rocketmq.common.protocol.topic.OffsetMovedEvent;
+import com.alibaba.rocketmq.remoting.RPCHook;
+import com.alibaba.rocketmq.remoting.exception.RemotingException;
+import com.alibaba.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.slf4j.Logger;
+
+import java.util.*;
+import java.util.Map.Entry;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+
+public class MonitorService {
+    private final Logger log = ClientLogger.getLog();
+    private final ScheduledExecutorService scheduledExecutorService = Executors
+            .newSingleThreadScheduledExecutor(new ThreadFactoryImpl("MonitorService"));
+
+    private final MonitorConfig monitorConfig;
+
+    private final MonitorListener monitorListener;
+
+    private final DefaultMQAdminExt defaultMQAdminExt;
+    private final DefaultMQPullConsumer defaultMQPullConsumer = new DefaultMQPullConsumer(
+            MixAll.TOOLS_CONSUMER_GROUP);
+    private final DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer(
+            MixAll.MONITOR_CONSUMER_GROUP);
+
+
+    public MonitorService(MonitorConfig monitorConfig, MonitorListener monitorListener, RPCHook
rpcHook) {
+        this.monitorConfig = monitorConfig;
+        this.monitorListener = monitorListener;
+
+        this.defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
+        this.defaultMQAdminExt.setInstanceName(instanceName());
+        this.defaultMQAdminExt.setNamesrvAddr(monitorConfig.getNamesrvAddr());
+
+        this.defaultMQPullConsumer.setInstanceName(instanceName());
+        this.defaultMQPullConsumer.setNamesrvAddr(monitorConfig.getNamesrvAddr());
+
+        this.defaultMQPushConsumer.setInstanceName(instanceName());
+        this.defaultMQPushConsumer.setNamesrvAddr(monitorConfig.getNamesrvAddr());
+        try {
+            this.defaultMQPushConsumer.setConsumeThreadMin(1);
+            this.defaultMQPushConsumer.setConsumeThreadMax(1);
+            this.defaultMQPushConsumer.subscribe(MixAll.OFFSET_MOVED_EVENT, "*");
+            this.defaultMQPushConsumer.registerMessageListener(new MessageListenerConcurrently()
{
+
+                @Override
+                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
+                                                                ConsumeConcurrentlyContext
context) {
+                    try {
+                        OffsetMovedEvent ome =
+                                OffsetMovedEvent.decode(msgs.get(0).getBody(), OffsetMovedEvent.class);
+
+                        DeleteMsgsEvent deleteMsgsEvent = new DeleteMsgsEvent();
+                        deleteMsgsEvent.setOffsetMovedEvent(ome);
+                        deleteMsgsEvent.setEventTimestamp(msgs.get(0).getStoreTimestamp());
+
+                        MonitorService.this.monitorListener.reportDeleteMsgsEvent(deleteMsgsEvent);
+                    } catch (Exception e) {
+                    }
+
+                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+                }
+            });
+        } catch (MQClientException e) {
+        }
+    }
+
+
+    private String instanceName() {
+        String name =
+                System.currentTimeMillis() + new Random().nextInt() + this.monitorConfig.getNamesrvAddr();
+
+        return "MonitorService_" + name.hashCode();
+    }
+
+    public static void main(String[] args) throws MQClientException {
+        main0(args, null);
+    }
+
+    public static void main0(String[] args, RPCHook rpcHook) throws MQClientException {
+        final MonitorService monitorService =
+                new MonitorService(new MonitorConfig(), new DefaultMonitorListener(), rpcHook);
+        monitorService.start();
+
+        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
+            private volatile boolean hasShutdown = false;
+
+
+            @Override
+            public void run() {
+                synchronized (this) {
+                    if (!this.hasShutdown) {
+                        this.hasShutdown = true;
+                        monitorService.shutdown();
+                    }
+                }
+            }
+        }, "ShutdownHook"));
+    }
+
+    public void start() throws MQClientException {
+        this.defaultMQPullConsumer.start();
+        this.defaultMQAdminExt.start();
+        this.defaultMQPushConsumer.start();
+        this.startScheduleTask();
+    }
+
+    public void shutdown() {
+        this.defaultMQPullConsumer.shutdown();
+        this.defaultMQAdminExt.shutdown();
+        this.defaultMQPushConsumer.shutdown();
+    }
+
+    private void startScheduleTask() {
+        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    MonitorService.this.doMonitorWork();
+                } catch (Exception e) {
+                    log.error("doMonitorWork Exception", e);
+                }
+            }
+        }, 1000 * 20, this.monitorConfig.getRoundInterval(), TimeUnit.MILLISECONDS);
+    }
+
+    public void doMonitorWork() throws RemotingException, MQClientException, InterruptedException
{
+        long beginTime = System.currentTimeMillis();
+        this.monitorListener.beginRound();
+
+        TopicList topicList = defaultMQAdminExt.fetchAllTopicList();
+        for (String topic : topicList.getTopicList()) {
+            if (topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
+                String consumerGroup = topic.substring(MixAll.RETRY_GROUP_TOPIC_PREFIX.length());
+
+                try {
+                    this.reportUndoneMsgs(consumerGroup);
+                } catch (Exception e) {
+                    // log.error("reportUndoneMsgs Exception", e);
+                }
+
+
+                try {
+                    this.reportConsumerRunningInfo(consumerGroup);
+                } catch (Exception e) {
+                    // log.error("reportConsumerRunningInfo Exception", e);
+                }
+            }
+        }
+        this.monitorListener.endRound();
+        long spentTimeMills = System.currentTimeMillis() - beginTime;
+        log.info("Execute one round monitor work, spent timemills: {}", spentTimeMills);
+    }
+
+    private void reportUndoneMsgs(final String consumerGroup) {
+        ConsumeStats cs = null;
+        try {
+            cs = defaultMQAdminExt.examineConsumeStats(consumerGroup);
+        } catch (Exception e) {
+            return;
+        }
+
+        ConsumerConnection cc = null;
+        try {
+            cc = defaultMQAdminExt.examineConsumerConnectionInfo(consumerGroup);
+        } catch (Exception e) {
+            return;
+        }
+
+        if (cs != null) {
+
+            HashMap<String/* Topic */, ConsumeStats> csByTopic = new HashMap<String,
ConsumeStats>();
+            {
+                Iterator<Entry<MessageQueue, OffsetWrapper>> it = cs.getOffsetTable().entrySet().iterator();
+                while (it.hasNext()) {
+                    Entry<MessageQueue, OffsetWrapper> next = it.next();
+                    MessageQueue mq = next.getKey();
+                    OffsetWrapper ow = next.getValue();
+                    ConsumeStats csTmp = csByTopic.get(mq.getTopic());
+                    if (null == csTmp) {
+                        csTmp = new ConsumeStats();
+                        csByTopic.put(mq.getTopic(), csTmp);
+                    }
+
+                    csTmp.getOffsetTable().put(mq, ow);
+                }
+            }
+
+
+            {
+                Iterator<Entry<String, ConsumeStats>> it = csByTopic.entrySet().iterator();
+                while (it.hasNext()) {
+                    Entry<String, ConsumeStats> next = it.next();
+                    UndoneMsgs undoneMsgs = new UndoneMsgs();
+                    undoneMsgs.setConsumerGroup(consumerGroup);
+                    undoneMsgs.setTopic(next.getKey());
+                    this.computeUndoneMsgs(undoneMsgs, next.getValue());
+                    this.monitorListener.reportUndoneMsgs(undoneMsgs);
+                    this.reportFailedMsgs(consumerGroup, next.getKey());
+                }
+            }
+        }
+    }
+
+    public void reportConsumerRunningInfo(final String consumerGroup) throws InterruptedException,
+            MQBrokerException, RemotingException, MQClientException {
+        ConsumerConnection cc = defaultMQAdminExt.examineConsumerConnectionInfo(consumerGroup);
+        TreeMap<String, ConsumerRunningInfo> infoMap = new TreeMap<String, ConsumerRunningInfo>();
+        for (Connection c : cc.getConnectionSet()) {
+            String clientId = c.getClientId();
+
+            if (c.getVersion() < MQVersion.Version.V3_1_8_SNAPSHOT.ordinal()) {
+                continue;
+            }
+
+            try {
+                ConsumerRunningInfo info =
+                        defaultMQAdminExt.getConsumerRunningInfo(consumerGroup, clientId,
false);
+                infoMap.put(clientId, info);
+            } catch (Exception e) {
+            }
+        }
+
+        if (!infoMap.isEmpty()) {
+            this.monitorListener.reportConsumerRunningInfo(infoMap);
+        }
+    }
+
+    private void computeUndoneMsgs(final UndoneMsgs undoneMsgs, final ConsumeStats consumeStats)
{
+        long total = 0;
+        long singleMax = 0;
+        long delayMax = 0;
+        Iterator<Entry<MessageQueue, OffsetWrapper>> it = consumeStats.getOffsetTable().entrySet().iterator();
+        while (it.hasNext()) {
+            Entry<MessageQueue, OffsetWrapper> next = it.next();
+            MessageQueue mq = next.getKey();
+            OffsetWrapper ow = next.getValue();
+            long diff = ow.getBrokerOffset() - ow.getConsumerOffset();
+
+            if (diff > singleMax) {
+                singleMax = diff;
+            }
+
+            if (diff > 0) {
+                total += diff;
+            }
+
+            // Delay
+            if (ow.getLastTimestamp() > 0) {
+                try {
+                    long maxOffset = this.defaultMQPullConsumer.maxOffset(mq);
+                    if (maxOffset > 0) {
+                        PullResult pull = this.defaultMQPullConsumer.pull(mq, "*", maxOffset
- 1, 1);
+                        switch (pull.getPullStatus()) {
+                            case FOUND:
+                                long delay =
+                                        pull.getMsgFoundList().get(0).getStoreTimestamp()
- ow.getLastTimestamp();
+                                if (delay > delayMax) {
+                                    delayMax = delay;
+                                }
+                                break;
+                            case NO_MATCHED_MSG:
+                            case NO_NEW_MSG:
+                            case OFFSET_ILLEGAL:
+                                break;
+                            default:
+                                break;
+                        }
+                    }
+                } catch (Exception e) {
+                }
+            }
+        }
+
+        undoneMsgs.setUndoneMsgsTotal(total);
+        undoneMsgs.setUndoneMsgsSingleMQ(singleMax);
+        undoneMsgs.setUndoneMsgsDelayTimeMills(delayMax);
+    }
+
+    private void reportFailedMsgs(final String consumerGroup, final String topic) {
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/monitor/UndoneMsgs.java
----------------------------------------------------------------------
diff --git a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/monitor/UndoneMsgs.java
b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/monitor/UndoneMsgs.java
new file mode 100644
index 0000000..1638d14
--- /dev/null
+++ b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/monitor/UndoneMsgs.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package com.alibaba.rocketmq.tools.monitor;
+
+public class UndoneMsgs {
+    private String consumerGroup;
+    private String topic;
+
+    private long undoneMsgsTotal;
+
+    private long undoneMsgsSingleMQ;
+
+    private long undoneMsgsDelayTimeMills;
+
+
+    public String getConsumerGroup() {
+        return consumerGroup;
+    }
+
+
+    public void setConsumerGroup(String consumerGroup) {
+        this.consumerGroup = consumerGroup;
+    }
+
+
+    public String getTopic() {
+        return topic;
+    }
+
+
+    public void setTopic(String topic) {
+        this.topic = topic;
+    }
+
+
+    public long getUndoneMsgsTotal() {
+        return undoneMsgsTotal;
+    }
+
+
+    public void setUndoneMsgsTotal(long undoneMsgsTotal) {
+        this.undoneMsgsTotal = undoneMsgsTotal;
+    }
+
+
+    public long getUndoneMsgsSingleMQ() {
+        return undoneMsgsSingleMQ;
+    }
+
+
+    public void setUndoneMsgsSingleMQ(long undoneMsgsSingleMQ) {
+        this.undoneMsgsSingleMQ = undoneMsgsSingleMQ;
+    }
+
+
+    public long getUndoneMsgsDelayTimeMills() {
+        return undoneMsgsDelayTimeMills;
+    }
+
+
+    public void setUndoneMsgsDelayTimeMills(long undoneMsgsDelayTimeMills) {
+        this.undoneMsgsDelayTimeMills = undoneMsgsDelayTimeMills;
+    }
+
+
+    @Override
+    public String toString() {
+        return "UndoneMsgs [consumerGroup=" + consumerGroup + ", topic=" + topic + ", undoneMsgsTotal="
+                + undoneMsgsTotal + ", undoneMsgsSingleMQ=" + undoneMsgsSingleMQ
+                + ", undoneMsgsDelayTimeMills=" + undoneMsgsDelayTimeMills + "]";
+    }
+}


Mime
View raw message