rocketmq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ding...@apache.org
Subject [rocketmq-client-cpp] branch master updated: feat(consumer): remove event if consumer service shutdown (#233)
Date Thu, 16 Jan 2020 07:19:41 GMT
This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-cpp.git


The following commit(s) were added to refs/heads/master by this push:
     new fd29759  feat(consumer): remove event if consumer service shutdown (#233)
fd29759 is described below

commit fd29759a0280c4e2ffc63c1ac4578700dd68018e
Author: dinglei <libya_003@163.com>
AuthorDate: Thu Jan 16 15:19:36 2020 +0800

    feat(consumer): remove event if consumer service shutdown (#233)
---
 src/consumer/ConsumeMessageConcurrentlyService.cpp | 10 ++++++++--
 src/consumer/DefaultMQPushConsumer.cpp             | 16 +++++++++++-----
 2 files changed, 19 insertions(+), 7 deletions(-)

diff --git a/src/consumer/ConsumeMessageConcurrentlyService.cpp b/src/consumer/ConsumeMessageConcurrentlyService.cpp
index 9c3a05b..deda8ac 100644
--- a/src/consumer/ConsumeMessageConcurrentlyService.cpp
+++ b/src/consumer/ConsumeMessageConcurrentlyService.cpp
@@ -73,8 +73,11 @@ void ConsumeMessageConcurrentlyService::submitConsumeRequest(boost::weak_ptr<Pul
              request->m_messageQueue.toString().c_str());
     return;
   }
-  if (!request->isDropped()) {
+  if (!request->isDropped() && !m_ioService.stopped()) {
     m_ioService.post(boost::bind(&ConsumeMessageConcurrentlyService::ConsumeRequest,
this, request, msgs));
+  } else {
+    LOG_INFO("IOService stopped or Pull request for %s is dropped, will not post ConsumeRequest.",
+             request->m_messageQueue.toString().c_str());
   }
 }
 void ConsumeMessageConcurrentlyService::submitConsumeRequestLater(boost::weak_ptr<PullRequest>
pullRequest,
@@ -93,13 +96,16 @@ void ConsumeMessageConcurrentlyService::submitConsumeRequestLater(boost::weak_pt
              (request->m_messageQueue).toString().c_str());
     return;
   }
-  if (!request->isDropped()) {
+  if (!request->isDropped() && !m_ioService.stopped()) {
     boost::asio::deadline_timer* t =
         new boost::asio::deadline_timer(m_ioService, boost::posix_time::milliseconds(millis));
     t->async_wait(
         boost::bind(&(ConsumeMessageConcurrentlyService::static_submitConsumeRequest),
this, t, request, msgs));
     LOG_INFO("Submit Message to Consumer [%s] Later and Sleep [%d]ms.", (request->m_messageQueue).toString().c_str(),
              millis);
+  } else {
+    LOG_INFO("IOService stopped or Pull request for %s is dropped, will not post delay ConsumeRequest.",
+             request->m_messageQueue.toString().c_str());
   }
 }
 
diff --git a/src/consumer/DefaultMQPushConsumer.cpp b/src/consumer/DefaultMQPushConsumer.cpp
index 8e2541e..9da0ca8 100644
--- a/src/consumer/DefaultMQPushConsumer.cpp
+++ b/src/consumer/DefaultMQPushConsumer.cpp
@@ -575,11 +575,17 @@ bool DefaultMQPushConsumer::producePullMsgTaskLater(boost::weak_ptr<PullRequest>
     LOG_INFO("[Dropped]Remove pullmsg event of mq:%s", request->m_messageQueue.toString().c_str());
     return false;
   }
-  boost::asio::deadline_timer* t =
-      new boost::asio::deadline_timer(m_async_ioService, boost::posix_time::milliseconds(millis));
-  t->async_wait(boost::bind(&(DefaultMQPushConsumer::static_triggerNextPullRequest),
this, t, request));
-  LOG_INFO("Produce Pull request [%s] Later and Sleep [%d]ms.", (request->m_messageQueue).toString().c_str(),
millis);
-  return true;
+  if (m_pullmsgQueue->bTaskQueueStatusOK() && isServiceStateOk()) {
+    boost::asio::deadline_timer* t =
+        new boost::asio::deadline_timer(m_async_ioService, boost::posix_time::milliseconds(millis));
+    t->async_wait(boost::bind(&(DefaultMQPushConsumer::static_triggerNextPullRequest),
this, t, request));
+    LOG_INFO("Produce Pull request [%s] Later and Sleep [%d]ms.", (request->m_messageQueue).toString().c_str(),
millis);
+    return true;
+  } else {
+    LOG_WARN("Service or TaskQueue shutdown, produce PullRequest of mq:%s failed",
+             request->m_messageQueue.toString().c_str());
+    return false;
+  }
 }
 
 bool DefaultMQPushConsumer::producePullMsgTask(boost::weak_ptr<PullRequest> pullRequest)
{


Mime
View raw message