rocketmq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vongosl...@apache.org
Subject [rocketmq-client-cpp] branch master updated: [ISSUE #272]Try to support messaging consuming tps status (#302)
Date Thu, 23 Apr 2020 09:38:14 GMT
This is an automated email from the ASF dual-hosted git repository.

vongosling pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-cpp.git


The following commit(s) were added to refs/heads/master by this push:
     new b963a96  [ISSUE #272]Try to support messaging consuming tps status (#302)
b963a96 is described below

commit b963a96619d59ad4eeb32b47679b29ac8a4566d0
Author: dinglei <libya_003@163.com>
AuthorDate: Thu Apr 23 17:38:03 2020 +0800

    [ISSUE #272]Try to support messaging consuming tps status (#302)
    
    * Add initial structure for consumer status.
    
    * Add status server manager to control status process.
    
    * Add RT and TPS status in consumer.
    
    * Add sampling data every 10 seconds.
    
    * Close unit test as default.
---
 src/consumer/ConsumeMessageConcurrentlyService.cpp |  25 ++-
 src/consumer/ConsumeMessageOrderlyService.cpp      |  12 ++
 src/consumer/DefaultMQPushConsumerImpl.cpp         |  14 ++
 src/protocol/ConsumerRunningInfo.cpp               |  25 ++-
 src/protocol/ConsumerRunningInfo.h                 |   7 +-
 src/status/ConsumeStats.cpp                        |  39 ++++
 src/status/ConsumeStats.h                          |  42 ++++
 src/status/StatsItem.h                             |  51 +++++
 src/status/StatsServer.cpp                         | 221 +++++++++++++++++++++
 src/status/StatsServer.h                           |  84 ++++++++
 src/status/StatsServerManager.cpp                  |  55 +++++
 src/status/StatsServerManager.h                    |  53 +++++
 test/src/protocol/ConsumerRunningInfoTest.cpp      |  15 ++
 13 files changed, 628 insertions(+), 15 deletions(-)

diff --git a/src/consumer/ConsumeMessageConcurrentlyService.cpp b/src/consumer/ConsumeMessageConcurrentlyService.cpp
index ef43bb7..c341b28 100644
--- a/src/consumer/ConsumeMessageConcurrentlyService.cpp
+++ b/src/consumer/ConsumeMessageConcurrentlyService.cpp
@@ -22,6 +22,7 @@
 #include "DefaultMQPushConsumer.h"
 #include "Logging.h"
 #include "MessageAccessor.h"
+#include "StatsServerManager.h"
 #include "UtilAll.h"
 
 namespace rocketmq {
@@ -158,10 +159,11 @@ void ConsumeMessageConcurrentlyService::ConsumeRequest(boost::weak_ptr<PullReque
   }
   ConsumeMessageContext consumeMessageContext;
   DefaultMQPushConsumerImpl* pConsumer = dynamic_cast<DefaultMQPushConsumerImpl*>(m_pConsumer);
+  std::string groupName = pConsumer->getGroupName();
   if (pConsumer) {
     if (pConsumer->getMessageTrace() && pConsumer->hasConsumeMessageHook()) {
       consumeMessageContext.setDefaultMQPushConsumer(pConsumer);
-      consumeMessageContext.setConsumerGroup(pConsumer->getGroupName());
+      consumeMessageContext.setConsumerGroup(groupName);
       consumeMessageContext.setMessageQueue(request->m_messageQueue);
       consumeMessageContext.setMsgList(msgs);
       consumeMessageContext.setSuccess(false);
@@ -196,12 +198,16 @@ void ConsumeMessageConcurrentlyService::ConsumeRequest(boost::weak_ptr<PullReque
           pConsumer->executeConsumeMessageHookAfter(&consumeMessageContext);
           continue;
         }
+        uint64 startTimeStamp = UtilAll::currentTimeMillis();
         try {
           status = m_pMessageListener->consumeMessage(msgInner);
         } catch (...) {
           status = RECONSUME_LATER;
           LOG_ERROR("Consumer's code is buggy. Un-caught exception raised");
         }
+        uint64 consumerRT = UtilAll::currentTimeMillis() - startTimeStamp;
+        StatsServerManager::getInstance()->getConsumeStatServer()->incConsumeRT(request->m_messageQueue.getTopic(),
+                                                                                groupName, consumerRT);
         consumeMessageContext.setMsgIndex(i);  // indicate message position,not support batch consumer
         if (status == CONSUME_SUCCESS) {
           consumeMessageContext.setStatus("CONSUME_SUCCESS");
@@ -214,12 +220,16 @@ void ConsumeMessageConcurrentlyService::ConsumeRequest(boost::weak_ptr<PullReque
         pConsumer->executeConsumeMessageHookAfter(&consumeMessageContext);
       }
     } else {
+      uint64 startTimeStamp = UtilAll::currentTimeMillis();
       try {
         status = m_pMessageListener->consumeMessage(msgs);
       } catch (...) {
         status = RECONSUME_LATER;
         LOG_ERROR("Consumer's code is buggy. Un-caught exception raised");
       }
+      uint64 consumerRT = UtilAll::currentTimeMillis() - startTimeStamp;
+      StatsServerManager::getInstance()->getConsumeStatServer()->incConsumeRT(request->m_messageQueue.getTopic(),
+                                                                              groupName, consumerRT, msgs.size());
     }
   }
 
@@ -245,6 +255,19 @@ void ConsumeMessageConcurrentlyService::ConsumeRequest(boost::weak_ptr<PullReque
       break;
     }
     case CLUSTERING: {
+      // status consumer tps
+      int okCount = 0;
+      int failedCount = 0;
+      if (ackIndex == -1) {
+        failedCount = msgs.size();
+      } else {
+        okCount = msgs.size();
+      }
+      StatsServerManager::getInstance()->getConsumeStatServer()->incConsumeOKTPS(request->m_messageQueue.getTopic(),
+                                                                                 groupName, okCount);
+      StatsServerManager::getInstance()->getConsumeStatServer()->incConsumeFailedTPS(request->m_messageQueue.getTopic(),
+                                                                                     groupName, failedCount);
+
       // send back msg to broker;
       for (size_t i = ackIndex + 1; i < msgs.size(); i++) {
         LOG_DEBUG("consume fail, MQ is:%s, its msgId is:%s, index is:" SIZET_FMT ", reconsume times is:%d",
diff --git a/src/consumer/ConsumeMessageOrderlyService.cpp b/src/consumer/ConsumeMessageOrderlyService.cpp
index 849fa8f..9ee8b82 100644
--- a/src/consumer/ConsumeMessageOrderlyService.cpp
+++ b/src/consumer/ConsumeMessageOrderlyService.cpp
@@ -23,6 +23,7 @@
 #include "DefaultMQPushConsumer.h"
 #include "Logging.h"
 #include "Rebalance.h"
+#include "StatsServerManager.h"
 #include "UtilAll.h"
 
 namespace rocketmq {
@@ -187,6 +188,7 @@ void ConsumeMessageOrderlyService::ConsumeRequest(boost::weak_ptr<PullRequest> p
           }
           ConsumeMessageContext consumeMessageContext;
           DefaultMQPushConsumerImpl* pConsumer = dynamic_cast<DefaultMQPushConsumerImpl*>(m_pConsumer);
+          std::string groupName = pConsumer->getGroupName();
           if (pConsumer) {
             if (pConsumer->getMessageTrace() && pConsumer->hasConsumeMessageHook()) {
               consumeMessageContext.setDefaultMQPushConsumer(pConsumer);
@@ -198,8 +200,16 @@ void ConsumeMessageOrderlyService::ConsumeRequest(boost::weak_ptr<PullRequest> p
               pConsumer->executeConsumeMessageHookBefore(&consumeMessageContext);
             }
           }
+          uint64 startTimeStamp = UtilAll::currentTimeMillis();
           ConsumeStatus consumeStatus = m_pMessageListener->consumeMessage(msgs);
+
+          uint64 consumerRT = UtilAll::currentTimeMillis() - startTimeStamp;
+          StatsServerManager::getInstance()->getConsumeStatServer()->incConsumeRT(request->m_messageQueue.getTopic(),
+                                                                                  groupName, consumerRT);
           if (consumeStatus == RECONSUME_LATER) {
+            StatsServerManager::getInstance()->getConsumeStatServer()->incConsumeFailedTPS(
+                request->m_messageQueue.getTopic(), groupName, 1);
+
             if (pConsumer) {
               consumeMessageContext.setMsgIndex(0);
               consumeMessageContext.setStatus("RECONSUME_LATER");
@@ -221,6 +231,8 @@ void ConsumeMessageOrderlyService::ConsumeRequest(boost::weak_ptr<PullRequest> p
               tryLockLaterAndReconsumeDelay(request, false, 5000);
             }
           } else {
+            StatsServerManager::getInstance()->getConsumeStatServer()->incConsumeOKTPS(
+                request->m_messageQueue.getTopic(), groupName, 1);
             if (pConsumer) {
               consumeMessageContext.setMsgIndex(0);
               consumeMessageContext.setStatus("CONSUME_SUCCESS");
diff --git a/src/consumer/DefaultMQPushConsumerImpl.cpp b/src/consumer/DefaultMQPushConsumerImpl.cpp
index 39b3ccd..b114456 100644
--- a/src/consumer/DefaultMQPushConsumerImpl.cpp
+++ b/src/consumer/DefaultMQPushConsumerImpl.cpp
@@ -29,6 +29,7 @@
 #include "PullAPIWrapper.h"
 #include "PullSysFlag.h"
 #include "Rebalance.h"
+#include "StatsServerManager.h"
 #include "UtilAll.h"
 #include "Validators.h"
 #include "task_queue.h"
@@ -315,6 +316,8 @@ void DefaultMQPushConsumerImpl::start() {
   switch (m_serviceState) {
     case CREATE_JUST: {
       m_serviceState = START_FAILED;
+      // Start status server
+      StatsServerManager::getInstance()->getConsumeStatServer()->start();
       DefaultMQClient::start();
       dealWithMessageTrace();
       LOG_INFO("DefaultMQPushConsumerImpl:%s start", m_GroupName.c_str());
@@ -401,6 +404,9 @@ void DefaultMQPushConsumerImpl::shutdown() {
   switch (m_serviceState) {
     case RUNNING: {
       LOG_INFO("DefaultMQPushConsumerImpl shutdown");
+
+      // Shutdown status server
+      StatsServerManager::getInstance()->getConsumeStatServer()->shutdown();
       shutdownMessageTraceInnerProducer();
       m_async_ioService.stop();
       m_async_service_thread->interrupt();
@@ -988,6 +994,14 @@ ConsumerRunningInfo* DefaultMQPushConsumerImpl::getConsumerRunningInfo() {
   getSubscriptions(result);
   info->setSubscriptionSet(result);
 
+  for (const auto& it : result) {
+    ConsumeStats consumeStat;
+    // we should get it from status service.
+    consumeStat =
+        StatsServerManager::getInstance()->getConsumeStatServer()->getConsumeStats(it.getTopic(), this->getGroupName());
+    info->setStatusTable(it.getTopic(), consumeStat);
+  }
+
   std::map<MQMessageQueue, boost::shared_ptr<PullRequest>> requestTable = m_pRebalance->getPullRequestTable();
 
   for (const auto& it : requestTable) {
diff --git a/src/protocol/ConsumerRunningInfo.cpp b/src/protocol/ConsumerRunningInfo.cpp
index 951cc4b..08e5b2c 100644
--- a/src/protocol/ConsumerRunningInfo.cpp
+++ b/src/protocol/ConsumerRunningInfo.cpp
@@ -46,17 +46,13 @@ void ConsumerRunningInfo::setMqTable(MessageQueue queue, ProcessQueueInfo queueI
   mqTable[queue] = queueInfo;
 }
 
-/*const map<string, ConsumeStatus> ConsumerRunningInfo::getStatusTable() const
-{
-return statusTable;
+const map<string, ConsumeStats> ConsumerRunningInfo::getStatusTable() const {
+  return statusTable;
 }
 
-
-void ConsumerRunningInfo::setStatusTable(const map<string, ConsumeStatus>&
-input_statusTable)
-{
-statusTable = input_statusTable;
-}    */
+void ConsumerRunningInfo::setStatusTable(string topic, ConsumeStats consumeStats) {
+  statusTable[topic] = consumeStats;
+}
 
 const vector<SubscriptionData> ConsumerRunningInfo::getSubscriptionSet() const {
   return subscriptionSet;
@@ -95,11 +91,18 @@ string ConsumerRunningInfo::encode() {
       root["subscriptionSet"].append(it->toJson());
     }
   }
+  {
+    Json::Value stats;
+    for (map<string, ConsumeStats>::iterator it = statusTable.begin(); it != statusTable.end(); ++it) {
+      stats[it->first] = it->second.toJson();
+    }
+    if (!stats.isNull()) {
+      root["statusTable"] = stats;
+    }
+  }
 
   Json::FastWriter fastwrite;
   string finals = fastwrite.write(root);
-
-  Json::Value mq;
   string key = "\"mqTable\":";
   key.append("{");
   for (map<MessageQueue, ProcessQueueInfo>::iterator it = mqTable.begin(); it != mqTable.end(); ++it) {
diff --git a/src/protocol/ConsumerRunningInfo.h b/src/protocol/ConsumerRunningInfo.h
index de0d4b2..f20ca2f 100644
--- a/src/protocol/ConsumerRunningInfo.h
+++ b/src/protocol/ConsumerRunningInfo.h
@@ -17,6 +17,7 @@
 #ifndef __CONSUMERRUNNINGINFO_H__
 #define __CONSUMERRUNNINGINFO_H__
 
+#include "ConsumeStats.h"
 #include "MessageQueue.h"
 #include "ProcessQueueInfo.h"
 #include "SubscriptionData.h"
@@ -47,8 +48,8 @@ class ConsumerRunningInfo {
   void setProperty(const string& key, const string& value);
   const map<MessageQueue, ProcessQueueInfo> getMqTable() const;
   void setMqTable(MessageQueue queue, ProcessQueueInfo queueInfo);
-  // const map<string, ConsumeStatus> getStatusTable() const;
-  // void setStatusTable(const map<string, ConsumeStatus>& input_statusTable) ;
+  const map<string, ConsumeStats> getStatusTable() const;
+  void setStatusTable(string topic, ConsumeStats consumeStats);
   const vector<SubscriptionData> getSubscriptionSet() const;
   void setSubscriptionSet(const vector<SubscriptionData>& input_subscriptionSet);
   const string getJstack() const;
@@ -59,7 +60,7 @@ class ConsumerRunningInfo {
   map<string, string> properties;
   vector<SubscriptionData> subscriptionSet;
   map<MessageQueue, ProcessQueueInfo> mqTable;
-  // map<string, ConsumeStatus> statusTable;
+  map<string, ConsumeStats> statusTable;
   string jstack;
 };
 }  // namespace rocketmq
diff --git a/src/status/ConsumeStats.cpp b/src/status/ConsumeStats.cpp
new file mode 100644
index 0000000..3272347
--- /dev/null
+++ b/src/status/ConsumeStats.cpp
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ConsumeStats.h"
+
+namespace rocketmq {
+ConsumeStats::ConsumeStats() {
+  pullRT = 0.0;
+  pullTPS = 0.0;
+  consumeRT = 0.0;
+  consumeOKTPS = 0.0;
+  consumeFailedTPS = 0.0;
+  consumeFailedMsgs = 0;
+}
+Json::Value ConsumeStats::toJson() const {
+  Json::Value outJson;
+  outJson["pullRT"] = pullRT;
+  outJson["pullTPS"] = pullTPS;
+  outJson["consumeRT"] = consumeRT;
+  outJson["consumeOKTPS"] = consumeOKTPS;
+  outJson["consumeFailedTPS"] = consumeFailedTPS;
+  outJson["consumeFailedMsgs"] = consumeFailedMsgs;
+  return outJson;
+}
+}  // namespace rocketmq
diff --git a/src/status/ConsumeStats.h b/src/status/ConsumeStats.h
new file mode 100644
index 0000000..e6f76a5
--- /dev/null
+++ b/src/status/ConsumeStats.h
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __CONSUMER_STATUS_H__
+#define __CONSUMER_STATUS_H__
+
+#include "UtilAll.h"
+#include "json/json.h"
+
+namespace rocketmq {
+class ConsumeStats {
+ public:
+  ConsumeStats();
+  virtual ~ConsumeStats() {}
+  Json::Value toJson() const;
+
+ public:
+  double pullRT;
+  double pullTPS;
+  double consumeRT;
+  double consumeOKTPS;
+  double consumeFailedTPS;
+  uint64 consumeFailedMsgs;
+};
+
+}  // namespace rocketmq
+
+#endif
diff --git a/src/status/StatsItem.h b/src/status/StatsItem.h
new file mode 100644
index 0000000..5fd3f3d
--- /dev/null
+++ b/src/status/StatsItem.h
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __CONSUMER_STATUS_ITEM_H__
+#define __CONSUMER_STATUS_ITEM_H__
+
+#include "RocketMQClient.h"
+
+namespace rocketmq {
+class StatsItem {
+ public:
+  StatsItem() {
+    pullRT = 0;
+    pullRTCount = 0;
+    pullCount = 0;
+    consumeRT = 0;
+    consumeRTCount = 0;
+    consumeOKCount = 0;
+    consumeFailedCount = 0;
+    consumeFailedMsgs = 0;
+  };
+  virtual ~StatsItem() {}
+
+ public:
+  uint64 pullRT;
+  uint64 pullRTCount;
+  uint64 pullCount;
+  uint64 consumeRT;
+  uint64 consumeRTCount;
+  uint64 consumeOKCount;
+  uint64 consumeFailedCount;
+  uint64 consumeFailedMsgs;
+};
+
+}  // namespace rocketmq
+
+#endif
diff --git a/src/status/StatsServer.cpp b/src/status/StatsServer.cpp
new file mode 100644
index 0000000..fd1c0ec
--- /dev/null
+++ b/src/status/StatsServer.cpp
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "StatsServer.h"
+#include <memory>
+#include <string>
+#include "Logging.h"
+#include "RocketMQClient.h"
+#include "StatsItem.h"
+
+namespace rocketmq {
+const std::string StatsServer::TOPIC_AND_GROUP_CONSUME_OK_TPS = "CONSUME_OK_TPS";
+const std::string StatsServer::TOPIC_AND_GROUP_CONSUME_FAILED_TPS = "CONSUME_FAILED_TPS";
+const std::string StatsServer::TOPIC_AND_GROUP_CONSUME_RT = "CONSUME_RT";
+const std::string StatsServer::TOPIC_AND_GROUP_PULL_TPS = "PULL_TPS";
+const std::string StatsServer::TOPIC_AND_GROUP_PULL_RT = "PULL_RT";
+const int StatsServer::SAMPLING_PERIOD = 10;
+StatsServer::StatsServer() {
+  m_status = CREATE_JUST;
+  serverName = "default";
+}
+StatsServer::StatsServer(std::string sName) {
+  m_status = CREATE_JUST;
+  serverName = sName;
+}
+StatsServer::~StatsServer() {}
+void StatsServer::start() {
+  switch (m_status) {
+    case CREATE_JUST:
+      m_status = START_FAILED;
+      startScheduledTask();
+      LOG_INFO("Default Status Service Start.");
+      m_status = RUNNING;
+      break;
+    case RUNNING:
+    case SHUTDOWN_ALREADY:
+    case START_FAILED:
+      break;
+    default:
+      break;
+  }
+}
+void StatsServer::shutdown() {
+  switch (m_status) {
+    case CREATE_JUST:
+    case RUNNING:
+      LOG_INFO("Default Status Service ShutDown.");
+      stopScheduledTask();
+      m_status = SHUTDOWN_ALREADY;
+      break;
+    case SHUTDOWN_ALREADY:
+    case START_FAILED:
+      break;
+    default:
+      break;
+  }
+}
+ConsumeStats StatsServer::getConsumeStats(std::string topic, std::string groupName) {
+  ConsumeStats consumeStats;
+  LOG_DEBUG("getConsumeStats Topic:%s, Group:%s", topic.c_str(), groupName.c_str());
+  if (m_status == RUNNING) {
+    std::string key = topic + "@" + groupName;
+    LOG_DEBUG("getConsumeStats Key:%s", key.c_str());
+    return getConsumeStats(key);
+  }
+  return consumeStats;
+}
+void StatsServer::incPullRT(std::string topic, std::string groupName, uint64 rt) {
+  std::string key = topic + "@" + groupName;
+  std::lock_guard<std::mutex> lock(m_consumeStatsItemMutex);
+  if (m_consumeStatsItems.find(key) == m_consumeStatsItems.end()) {
+    StatsItem item;
+    m_consumeStatsItems[key] = item;
+  }
+  m_consumeStatsItems[key].pullRT += rt;
+  m_consumeStatsItems[key].pullRTCount += 1;
+}
+void StatsServer::incPullTPS(std::string topic, std::string groupName, uint64 msgCount) {
+  std::string key = topic + "@" + groupName;
+  std::lock_guard<std::mutex> lock(m_consumeStatsItemMutex);
+  if (m_consumeStatsItems.find(key) == m_consumeStatsItems.end()) {
+    StatsItem item;
+    m_consumeStatsItems[key] = item;
+  }
+  m_consumeStatsItems[key].pullCount += msgCount;
+}
+void StatsServer::incConsumeRT(std::string topic, std::string groupName, uint64 rt, uint64 msgCount) {
+  std::string key = topic + "@" + groupName;
+  LOG_DEBUG("incConsumeRT before Key:%s, RT:%lld, Count: %lld", key.c_str(), rt, msgCount);
+
+  std::lock_guard<std::mutex> lock(m_consumeStatsItemMutex);
+  if (m_consumeStatsItems.find(key) == m_consumeStatsItems.end()) {
+    StatsItem item;
+    m_consumeStatsItems[key] = item;
+  }
+  m_consumeStatsItems[key].consumeRT += rt;
+  m_consumeStatsItems[key].consumeRTCount += msgCount;
+  LOG_DEBUG("incConsumeRT After Key:%s, RT:%lld, Count: %lld", key.c_str(), m_consumeStatsItems[key].consumeRT,
+            m_consumeStatsItems[key].consumeRTCount);
+}
+void StatsServer::incConsumeOKTPS(std::string topic, std::string groupName, uint64 msgCount) {
+  std::string key = topic + "@" + groupName;
+  LOG_DEBUG("incConsumeOKTPS Before Key:%s, Count: %lld", key.c_str(), msgCount);
+  std::lock_guard<std::mutex> lock(m_consumeStatsItemMutex);
+  if (m_consumeStatsItems.find(key) == m_consumeStatsItems.end()) {
+    StatsItem item;
+    m_consumeStatsItems[key] = item;
+  }
+  m_consumeStatsItems[key].consumeOKCount += msgCount;
+  LOG_DEBUG("incConsumeOKTPS After Key:%s, Count: %lld", key.c_str(), m_consumeStatsItems[key].consumeOKCount);
+}
+void StatsServer::incConsumeFailedTPS(std::string topic, std::string groupName, uint64 msgCount) {
+  std::string key = topic + "@" + groupName;
+  LOG_DEBUG("incConsumeFailedTPS Key:%s, Count: %lld", key.c_str(), msgCount);
+  std::lock_guard<std::mutex> lock(m_consumeStatsItemMutex);
+  if (m_consumeStatsItems.find(key) == m_consumeStatsItems.end()) {
+    StatsItem item;
+    m_consumeStatsItems[key] = item;
+  }
+  m_consumeStatsItems[key].consumeFailedCount += msgCount;
+}
+void StatsServer::startScheduledTask() {
+  m_consumer_status_service_thread.reset(new boost::thread(boost::bind(&StatsServer::doStartScheduledTask, this)));
+}
+void StatsServer::stopScheduledTask() {
+  if (m_consumer_status_service_thread) {
+    m_consumerStatus_ioService.stop();
+    m_consumer_status_service_thread->interrupt();
+    m_consumer_status_service_thread->join();
+    m_consumer_status_service_thread.reset();
+  }
+}
+void StatsServer::doStartScheduledTask() {
+  boost::asio::io_service::work work(m_consumerStatus_ioService);
+  boost::system::error_code ec1;
+  std::shared_ptr<boost::asio::deadline_timer> t1 = std::make_shared<boost::asio::deadline_timer>(
+      m_consumerStatus_ioService, boost::posix_time::seconds(SAMPLING_PERIOD));
+  t1->async_wait(boost::bind(&StatsServer::scheduledTaskInSeconds, this, ec1, t1));
+
+  boost::system::error_code errorCode;
+  m_consumerStatus_ioService.run(errorCode);
+}
+void StatsServer::scheduledTaskInSeconds(boost::system::error_code& ec,
+                                         std::shared_ptr<boost::asio::deadline_timer> t) {
+  samplingInSeconds();
+
+  boost::system::error_code e;
+  t->expires_from_now(t->expires_from_now() + boost::posix_time::seconds(SAMPLING_PERIOD), e);
+  t->async_wait(boost::bind(&StatsServer::scheduledTaskInSeconds, this, ec, t));
+}
+void StatsServer::samplingInSeconds() {
+  LOG_DEBUG("samplingInSeconds==");
+
+  // do samplings
+  std::lock_guard<std::mutex> lock(m_consumeStatsItemMutex);
+  for (std::map<std::string, StatsItem>::iterator it = m_consumeStatsItems.begin(); it != m_consumeStatsItems.end();
+       ++it) {
+    ConsumeStats consumeStats;
+    if (it->second.pullRTCount != 0) {
+      consumeStats.pullRT = (1.0 * (it->second.pullRT)) / (it->second.pullRTCount);
+    }
+    it->second.pullRT = 0;
+    it->second.pullRTCount = 0;
+    consumeStats.pullTPS = (1.0 * (it->second.pullCount)) / SAMPLING_PERIOD;
+    it->second.pullCount = 0;
+    if (it->second.consumeRTCount != 0) {
+      consumeStats.consumeRT = (1.0 * (it->second.consumeRT)) / (it->second.consumeRTCount);
+      LOG_DEBUG("samplingInSeconds Key[%s], consumeRT:%.2f,Total RT:%lld, Count: %lld", it->first.c_str(),
+                consumeStats.consumeRT, it->second.consumeRT, it->second.consumeRTCount);
+    }
+    it->second.consumeRT = 0;
+    it->second.consumeRTCount = 0;
+    consumeStats.consumeOKTPS = (1.0 * (it->second.consumeOKCount)) / SAMPLING_PERIOD;
+    LOG_DEBUG("samplingInSeconds Key[%s], consumeOKTPS:%.2f, Count: %lld", it->first.c_str(), consumeStats.consumeOKTPS,
+              it->second.consumeOKCount);
+    it->second.consumeOKCount = 0;
+    consumeStats.consumeFailedTPS = (1.0 * (it->second.consumeFailedCount)) / SAMPLING_PERIOD;
+    it->second.consumeFailedCount = 0;
+    LOG_DEBUG("samplingInSeconds Key[%s], consumeFailedTPS:%.2f, Count: %lld", it->first.c_str(),
+              consumeStats.consumeFailedTPS, it->second.consumeFailedCount);
+    consumeStats.consumeFailedMsgs = it->second.consumeFailedMsgs;
+    it->second.consumeFailedMsgs = 0;
+    updateConsumeStats(it->first, consumeStats);
+  }
+}
+
+void StatsServer::updateConsumeStats(std::string topic, std::string groupName, ConsumeStats consumeStats) {
+  if (m_status == RUNNING) {
+    std::string key = topic + "@" + groupName;
+    updateConsumeStats(key, consumeStats);
+  }
+}
+void StatsServer::updateConsumeStats(std::string key, ConsumeStats consumeStats) {
+  LOG_DEBUG("updateConsumeStats Key:%s, Count: %lld", key.c_str(), consumeStats.consumeOKTPS);
+
+  std::lock_guard<std::mutex> lock(m_consumeStatusMutex);
+  m_consumeStatus[key] = consumeStats;
+}
+ConsumeStats StatsServer::getConsumeStats(std::string key) {
+  ConsumeStats consumeStats;
+  std::lock_guard<std::mutex> lock(m_consumeStatusMutex);
+  if (m_consumeStatus.find(key) != m_consumeStatus.end()) {
+    return m_consumeStatus[key];
+  }
+  return consumeStats;
+}
+}  // namespace rocketmq
diff --git a/src/status/StatsServer.h b/src/status/StatsServer.h
new file mode 100644
index 0000000..11f2d67
--- /dev/null
+++ b/src/status/StatsServer.h
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __CONSUMER_STATUS_SERVICE_H__
+#define __CONSUMER_STATUS_SERVICE_H__
+
+#include <boost/asio.hpp>
+#include <boost/asio/deadline_timer.hpp>
+#include <boost/asio/io_service.hpp>
+#include <boost/bind.hpp>
+#include <boost/date_time/posix_time/posix_time.hpp>
+#include <boost/system/error_code.hpp>
+#include <boost/thread/thread.hpp>
+#include <map>
+#include <memory>
+#include <mutex>
+#include <string>
+#include "ConsumeStats.h"
+#include "RocketMQClient.h"
+#include "ServiceState.h"
+#include "StatsItem.h"
+
+namespace rocketmq {
+class StatsServer {
+ public:
+  static const std::string TOPIC_AND_GROUP_CONSUME_OK_TPS;
+  static const std::string TOPIC_AND_GROUP_CONSUME_FAILED_TPS;
+  static const std::string TOPIC_AND_GROUP_CONSUME_RT;
+  static const std::string TOPIC_AND_GROUP_PULL_TPS;
+  static const std::string TOPIC_AND_GROUP_PULL_RT;
+  static const int SAMPLING_PERIOD;
+  StatsServer();
+  StatsServer(std::string serverName);
+  virtual ~StatsServer();
+  void start();
+  void shutdown();
+  ConsumeStats getConsumeStats(std::string topic, std::string groupName);
+  void incPullRT(std::string topic, std::string groupName, uint64 rt);
+  void incPullTPS(std::string topic, std::string groupName, uint64 msgCount);
+  void incConsumeRT(std::string topic, std::string groupName, uint64 rt, uint64 msgCount = 1);
+  void incConsumeOKTPS(std::string topic, std::string groupName, uint64 msgCount);
+  void incConsumeFailedTPS(std::string topic, std::string groupName, uint64 msgCount);
+
+ private:
+  void startScheduledTask();
+  void stopScheduledTask();
+  void doStartScheduledTask();
+  void scheduledTaskInSeconds(boost::system::error_code& ec, std::shared_ptr<boost::asio::deadline_timer> t);
+  void samplingInSeconds();
+  void updateConsumeStats(std::string topic, std::string groupName, ConsumeStats consumeStats);
+  void updateConsumeStats(std::string key, ConsumeStats consumeStats);
+  ConsumeStats getConsumeStats(std::string key);
+
+ public:
+  std::string serverName;
+
+ private:
+  ServiceState m_status;
+  std::mutex m_consumeStatusMutex;
+  std::map<std::string, ConsumeStats> m_consumeStatus;
+  boost::asio::io_service m_consumerStatus_ioService;
+  // boost::asio::io_service::work m_consumerStatus_work;
+  std::unique_ptr<boost::thread> m_consumer_status_service_thread;
+  std::mutex m_consumeStatsItemMutex;
+  std::map<std::string, StatsItem> m_consumeStatsItems;
+};
+
+}  // namespace rocketmq
+
+#endif
diff --git a/src/status/StatsServerManager.cpp b/src/status/StatsServerManager.cpp
new file mode 100644
index 0000000..f3a6d1c
--- /dev/null
+++ b/src/status/StatsServerManager.cpp
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "StatsServerManager.h"
+#include "RocketMQClient.h"
+#include "StatsServer.h"
+#include "string"
+
+namespace rocketmq {
+// Starts with serverName set to "default"; the no-argument getConsumeStatServer()
+// uses that value as its lookup key.
+StatsServerManager::StatsServerManager() : serverName("default") {}
+
+// Nothing to release by hand: destroying m_consumeStatusServers drops the manager's
+// shared_ptr reference to every registered server.
+StatsServerManager::~StatsServerManager() = default;
+std::shared_ptr<StatsServer> StatsServerManager::getConsumeStatServer() {
+  return this->getConsumeStatServer(this->serverName);  // delegate, keyed by the manager's own name
+}
+// Returns the server cached under serverName; on a miss, creates one with that same name and caches it.
+std::shared_ptr<StatsServer> StatsServerManager::getConsumeStatServer(std::string serverName) {
+  auto it = m_consumeStatusServers.find(serverName);
+  if (it != m_consumeStatusServers.end()) {
+    return it->second;
+  }
+  std::shared_ptr<StatsServer> server = std::make_shared<StatsServer>(serverName);
+  m_consumeStatusServers[serverName] = server;
+  return server;
+}
+// Unregisters the server stored under serverName. Only the manager's reference is
+// dropped, so callers that still hold the shared_ptr keep the server alive. Erasing
+// by key does nothing when no entry with that name exists.
+void StatsServerManager::removeConsumeStatServer(std::string serverName) {
+  m_consumeStatusServers.erase(serverName);
+}
+
+StatsServerManager* StatsServerManager::getInstance() {
+  static StatsServerManager theManager;  // function-local static: constructed on the first call only
+  return &theManager;
+}
+}  // namespace rocketmq
diff --git a/src/status/StatsServerManager.h b/src/status/StatsServerManager.h
new file mode 100644
index 0000000..923ea61
--- /dev/null
+++ b/src/status/StatsServerManager.h
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __CONSUMER_STATUS_SERVICE_MANAGER_H__
+#define __CONSUMER_STATUS_SERVICE_MANAGER_H__
+
+#include <map>
+#include <memory>
+#include <string>
+#include "ConsumeStats.h"
+#include "RocketMQClient.h"
+#include "ServiceState.h"
+#include "StatsServer.h"
+
+namespace rocketmq {
+// Registry mapping a server name to its shared StatsServer; reached through getInstance().
+class StatsServerManager {
+ public:
+  virtual ~StatsServerManager();
+  // void start();
+  // void shutdown();
+  // No-argument form: uses this manager's own serverName as the lookup key.
+  std::shared_ptr<StatsServer> getConsumeStatServer();
+  // Returns the server stored under serverName, creating and storing one if absent.
+  std::shared_ptr<StatsServer> getConsumeStatServer(std::string serverName);
+  // Removes the entry stored under serverName; does nothing if there is none.
+  void removeConsumeStatServer(std::string serverName);
+  // Returns the process-wide manager instance.
+  static StatsServerManager* getInstance();
+  std::string serverName;  // set to "default" by the constructor
+
+ private:
+  StatsServerManager();  // private: obtain the manager through getInstance()
+  std::map<std::string, std::shared_ptr<StatsServer>> m_consumeStatusServers;
+};
+
+}  // namespace rocketmq
+
+#endif
diff --git a/test/src/protocol/ConsumerRunningInfoTest.cpp b/test/src/protocol/ConsumerRunningInfoTest.cpp
index 8642e49..44604ec 100644
--- a/test/src/protocol/ConsumerRunningInfoTest.cpp
+++ b/test/src/protocol/ConsumerRunningInfoTest.cpp
@@ -24,6 +24,7 @@
 #include "json/reader.h"
 #include "json/value.h"
 
+#include "ConsumeStats.h"
 #include "ConsumerRunningInfo.h"
 #include "MessageQueue.h"
 #include "ProcessQueueInfo.h"
@@ -40,6 +41,7 @@ using Json::Reader;
 using Json::Value;
 
 using rocketmq::ConsumerRunningInfo;
+using rocketmq::ConsumeStats;
 using rocketmq::MessageQueue;
 using rocketmq::ProcessQueueInfo;
 using rocketmq::SubscriptionData;
@@ -91,6 +93,19 @@ TEST(ConsumerRunningInfo, init) {
   EXPECT_EQ(mqTable.size(), 2);
   EXPECT_EQ(mqTable[messageQueue1].commitOffset, 1024);
   EXPECT_EQ(mqTable[messageQueue2].cachedMsgCount, 1023);
+  // consumeStats
+  EXPECT_TRUE(info.getStatusTable().empty());
+
+  ConsumeStats consumeStats;
+  consumeStats.pullTPS = 22.5;
+  ConsumeStats consumeStats2;
+  consumeStats2.consumeOKTPS = 3.168;
+  info.setStatusTable("TopicA", consumeStats);
+  info.setStatusTable("TopicB", consumeStats2);
+  map<string, ConsumeStats> statsTable = info.getStatusTable();
+  EXPECT_EQ(statsTable.size(), 2);
+  EXPECT_EQ(statsTable["TopicA"].pullTPS, 22.5);
+  EXPECT_EQ(statsTable["TopicB"].consumeOKTPS, 3.168);
 
   // encode start
   info.setProperty(ConsumerRunningInfo::PROP_NAMESERVER_ADDR, "127.0.0.1:9876");


Mime
View raw message