This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-cpp.git
The following commit(s) were added to refs/heads/master by this push:
new fd29759 feat(consumer): remove event if consumer service shutdown (#233)
fd29759 is described below
commit fd29759a0280c4e2ffc63c1ac4578700dd68018e
Author: dinglei <libya_003@163.com>
AuthorDate: Thu Jan 16 15:19:36 2020 +0800
feat(consumer): remove event if consumer service shutdown (#233)
---
src/consumer/ConsumeMessageConcurrentlyService.cpp | 10 ++++++++--
src/consumer/DefaultMQPushConsumer.cpp | 16 +++++++++++-----
2 files changed, 19 insertions(+), 7 deletions(-)
diff --git a/src/consumer/ConsumeMessageConcurrentlyService.cpp b/src/consumer/ConsumeMessageConcurrentlyService.cpp
index 9c3a05b..deda8ac 100644
--- a/src/consumer/ConsumeMessageConcurrentlyService.cpp
+++ b/src/consumer/ConsumeMessageConcurrentlyService.cpp
@@ -73,8 +73,11 @@ void ConsumeMessageConcurrentlyService::submitConsumeRequest(boost::weak_ptr<Pul
request->m_messageQueue.toString().c_str());
return;
}
- if (!request->isDropped()) {
+ if (!request->isDropped() && !m_ioService.stopped()) {
m_ioService.post(boost::bind(&ConsumeMessageConcurrentlyService::ConsumeRequest,
this, request, msgs));
+ } else {
+ LOG_INFO("IOService stopped or Pull request for %s is dropped, will not post ConsumeRequest.",
+ request->m_messageQueue.toString().c_str());
}
}
void ConsumeMessageConcurrentlyService::submitConsumeRequestLater(boost::weak_ptr<PullRequest>
pullRequest,
@@ -93,13 +96,16 @@ void ConsumeMessageConcurrentlyService::submitConsumeRequestLater(boost::weak_pt
(request->m_messageQueue).toString().c_str());
return;
}
- if (!request->isDropped()) {
+ if (!request->isDropped() && !m_ioService.stopped()) {
boost::asio::deadline_timer* t =
new boost::asio::deadline_timer(m_ioService, boost::posix_time::milliseconds(millis));
t->async_wait(
boost::bind(&(ConsumeMessageConcurrentlyService::static_submitConsumeRequest),
this, t, request, msgs));
LOG_INFO("Submit Message to Consumer [%s] Later and Sleep [%d]ms.", (request->m_messageQueue).toString().c_str(),
millis);
+ } else {
+ LOG_INFO("IOService stopped or Pull request for %s is dropped, will not post delay ConsumeRequest.",
+ request->m_messageQueue.toString().c_str());
}
}
diff --git a/src/consumer/DefaultMQPushConsumer.cpp b/src/consumer/DefaultMQPushConsumer.cpp
index 8e2541e..9da0ca8 100644
--- a/src/consumer/DefaultMQPushConsumer.cpp
+++ b/src/consumer/DefaultMQPushConsumer.cpp
@@ -575,11 +575,17 @@ bool DefaultMQPushConsumer::producePullMsgTaskLater(boost::weak_ptr<PullRequest>
LOG_INFO("[Dropped]Remove pullmsg event of mq:%s", request->m_messageQueue.toString().c_str());
return false;
}
- boost::asio::deadline_timer* t =
- new boost::asio::deadline_timer(m_async_ioService, boost::posix_time::milliseconds(millis));
- t->async_wait(boost::bind(&(DefaultMQPushConsumer::static_triggerNextPullRequest),
this, t, request));
- LOG_INFO("Produce Pull request [%s] Later and Sleep [%d]ms.", (request->m_messageQueue).toString().c_str(),
millis);
- return true;
+ if (m_pullmsgQueue->bTaskQueueStatusOK() && isServiceStateOk()) {
+ boost::asio::deadline_timer* t =
+ new boost::asio::deadline_timer(m_async_ioService, boost::posix_time::milliseconds(millis));
+ t->async_wait(boost::bind(&(DefaultMQPushConsumer::static_triggerNextPullRequest),
this, t, request));
+ LOG_INFO("Produce Pull request [%s] Later and Sleep [%d]ms.", (request->m_messageQueue).toString().c_str(),
millis);
+ return true;
+ } else {
+ LOG_WARN("Service or TaskQueue shutdown, produce PullRequest of mq:%s failed",
+ request->m_messageQueue.toString().c_str());
+ return false;
+ }
}
bool DefaultMQPushConsumer::producePullMsgTask(boost::weak_ptr<PullRequest> pullRequest)
{
|