This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-cpp.git
The following commit(s) were added to refs/heads/master by this push:
new 255de3b fix(consumer): fix the issue that msg lost when send back to broker failed.
(#209)
255de3b is described below
commit 255de3b310e0b1346f3c63b6d4e4aa6b2ceb00a2
Author: dinglei <libya_003@163.com>
AuthorDate: Thu Jan 2 16:09:56 2020 +0800
fix(consumer): fix the issue that msg lost when send back to broker failed. (#209)
* fix(consumer): fix the issue that msg lost when send back to broker failed.
* fix(consumer): fix the issue that msg lost when send back to broker failed.
* fix(consumer): formate code for ci.
---
include/DefaultMQPullConsumer.h | 2 +-
include/DefaultMQPushConsumer.h | 2 +-
include/MQConsumer.h | 2 +-
src/consumer/ConsumeMessageConcurrentlyService.cpp | 118 ++++++++++++++++++---
src/consumer/ConsumeMessageOrderlyService.cpp | 44 +++++---
src/consumer/ConsumeMsgService.h | 12 ++-
src/consumer/DefaultMQPullConsumer.cpp | 4 +-
src/consumer/DefaultMQPushConsumer.cpp | 4 +-
8 files changed, 152 insertions(+), 36 deletions(-)
diff --git a/include/DefaultMQPullConsumer.h b/include/DefaultMQPullConsumer.h
index 939ce0e..33765cd 100644
--- a/include/DefaultMQPullConsumer.h
+++ b/include/DefaultMQPullConsumer.h
@@ -43,7 +43,7 @@ class ROCKETMQCLIENT_API DefaultMQPullConsumer : public MQConsumer {
//<!end mqadmin;
//<!begin MQConsumer
- virtual void sendMessageBack(MQMessageExt& msg, int delayLevel);
+ virtual bool sendMessageBack(MQMessageExt& msg, int delayLevel);
virtual void fetchSubscribeMessageQueues(const std::string& topic, std::vector<MQMessageQueue>&
mqs);
virtual void doRebalance();
virtual void persistConsumerOffset();
diff --git a/include/DefaultMQPushConsumer.h b/include/DefaultMQPushConsumer.h
index 894c6b5..b29ca72 100644
--- a/include/DefaultMQPushConsumer.h
+++ b/include/DefaultMQPushConsumer.h
@@ -55,7 +55,7 @@ class ROCKETMQCLIENT_API DefaultMQPushConsumer : public MQConsumer {
//<!end mqadmin;
//<!begin MQConsumer
- virtual void sendMessageBack(MQMessageExt& msg, int delayLevel);
+ virtual bool sendMessageBack(MQMessageExt& msg, int delayLevel);
virtual void fetchSubscribeMessageQueues(const std::string& topic, std::vector<MQMessageQueue>&
mqs);
virtual void doRebalance();
virtual void persistConsumerOffset();
diff --git a/include/MQConsumer.h b/include/MQConsumer.h
index 48d78f2..54b535a 100644
--- a/include/MQConsumer.h
+++ b/include/MQConsumer.h
@@ -32,7 +32,7 @@ class ConsumerRunningInfo;
class ROCKETMQCLIENT_API MQConsumer : public MQClient {
public:
virtual ~MQConsumer() {}
- virtual void sendMessageBack(MQMessageExt& msg, int delayLevel) = 0;
+ virtual bool sendMessageBack(MQMessageExt& msg, int delayLevel) = 0;
virtual void fetchSubscribeMessageQueues(const std::string& topic, std::vector<MQMessageQueue>&
mqs) = 0;
virtual void doRebalance() = 0;
virtual void persistConsumerOffset() = 0;
diff --git a/src/consumer/ConsumeMessageConcurrentlyService.cpp b/src/consumer/ConsumeMessageConcurrentlyService.cpp
index 3b0204a..93cdcc3 100644
--- a/src/consumer/ConsumeMessageConcurrentlyService.cpp
+++ b/src/consumer/ConsumeMessageConcurrentlyService.cpp
@@ -73,7 +73,61 @@ void ConsumeMessageConcurrentlyService::submitConsumeRequest(boost::weak_ptr<Pul
request->m_messageQueue.toString().c_str());
return;
}
- m_ioService.post(boost::bind(&ConsumeMessageConcurrentlyService::ConsumeRequest, this,
request, msgs));
+ if (!request->isDropped()) {
+ m_ioService.post(boost::bind(&ConsumeMessageConcurrentlyService::ConsumeRequest,
this, request, msgs));
+ }
+}
+void ConsumeMessageConcurrentlyService::submitConsumeRequestLater(boost::weak_ptr<PullRequest>
pullRequest,
+ vector<MQMessageExt>&
msgs,
+ int millis) {
+ if (msgs.empty()) {
+ return;
+ }
+ boost::shared_ptr<PullRequest> request = pullRequest.lock();
+ if (!request) {
+ LOG_WARN("Pull request has been released");
+ return;
+ }
+ if (request->isDropped()) {
+ LOG_INFO("Pull request is set as dropped with mq:%s, need release in next rebalance.",
+ (request->m_messageQueue).toString().c_str());
+ return;
+ }
+ if (!request->isDropped()) {
+ boost::asio::deadline_timer* t =
+ new boost::asio::deadline_timer(m_ioService, boost::posix_time::milliseconds(millis));
+ t->async_wait(
+ boost::bind(&(ConsumeMessageConcurrentlyService::static_submitConsumeRequest),
this, t, request, msgs));
+ LOG_INFO("Submit Message to Consumer [%s] Later and Sleep [%d]ms.", (request->m_messageQueue).toString().c_str(),
+ millis);
+ }
+}
+
+void ConsumeMessageConcurrentlyService::static_submitConsumeRequest(void* context,
+ boost::asio::deadline_timer*
t,
+ boost::weak_ptr<PullRequest>
pullRequest,
+ vector<MQMessageExt>&
msgs) {
+ boost::shared_ptr<PullRequest> request = pullRequest.lock();
+ if (!request) {
+ LOG_WARN("Pull request has been released");
+ return;
+ }
+ ConsumeMessageConcurrentlyService* pService = (ConsumeMessageConcurrentlyService*)context;
+ if (pService) {
+ pService->triggersubmitConsumeRequestLater(t, request, msgs);
+ }
+}
+
+void ConsumeMessageConcurrentlyService::triggersubmitConsumeRequestLater(boost::asio::deadline_timer*
t,
+ boost::weak_ptr<PullRequest>
pullRequest,
+ vector<MQMessageExt>&
msgs) {
+ boost::shared_ptr<PullRequest> request = pullRequest.lock();
+ if (!request) {
+ LOG_WARN("Pull request has been released");
+ return;
+ }
+ submitConsumeRequest(request, msgs);
+ deleteAndZero(t);
}
void ConsumeMessageConcurrentlyService::ConsumeRequest(boost::weak_ptr<PullRequest>
pullRequest,
@@ -83,13 +137,12 @@ void ConsumeMessageConcurrentlyService::ConsumeRequest(boost::weak_ptr<PullReque
LOG_WARN("Pull request has been released");
return;
}
- if (!request || request->isDropped()) {
- LOG_WARN("the pull request had been dropped");
+ if (request->isDropped()) {
+ LOG_WARN("the pull request for %s Had been dropped before", request->m_messageQueue.toString().c_str());
request->clearAllMsgs(); // add clear operation to avoid bad state when
// dropped pullRequest returns normal
return;
}
-
if (msgs.empty()) {
LOG_WARN("the msg of pull result is NULL,its mq:%s", (request->m_messageQueue).toString().c_str());
return;
@@ -99,16 +152,19 @@ void ConsumeMessageConcurrentlyService::ConsumeRequest(boost::weak_ptr<PullReque
if (m_pMessageListener != NULL) {
resetRetryTopic(msgs);
request->setLastConsumeTimestamp(UtilAll::currentTimeMillis());
- LOG_DEBUG("=====Receive Messages:[%s][%s][%s]", msgs[0].getTopic().c_str(), msgs[0].getMsgId().c_str(),
- msgs[0].getBody().c_str());
+ LOG_DEBUG("=====Receive Messages,Topic[%s], MsgId[%s],Body[%s],RetryTimes[%d]", msgs[0].getTopic().c_str(),
+ msgs[0].getMsgId().c_str(), msgs[0].getBody().c_str(), msgs[0].getReconsumeTimes());
if (m_pConsumer->isUseNameSpaceMode()) {
MessageAccessor::withoutNameSpace(msgs, m_pConsumer->getNameSpace());
}
- status = m_pMessageListener->consumeMessage(msgs);
+ try {
+ status = m_pMessageListener->consumeMessage(msgs);
+ } catch (...) {
+ status = RECONSUME_LATER;
+ LOG_ERROR("Consumer's code is buggy. Un-caught exception raised");
+ }
}
- /*LOG_DEBUG("Consumed MSG size:%d of mq:%s",
- msgs.size(), (request->m_messageQueue).toString().c_str());*/
int ackIndex = -1;
switch (status) {
case CONSUME_SUCCESS:
@@ -121,28 +177,52 @@ void ConsumeMessageConcurrentlyService::ConsumeRequest(boost::weak_ptr<PullReque
break;
}
+ std::vector<MQMessageExt> localRetryMsgs;
switch (m_pConsumer->getMessageModel()) {
- case BROADCASTING:
+ case BROADCASTING: {
// Note: broadcasting reconsume should do by application, as it has big
// affect to broker cluster
if (ackIndex != (int)msgs.size())
LOG_WARN("BROADCASTING, the message consume failed, drop it:%s", (request->m_messageQueue).toString().c_str());
break;
- case CLUSTERING:
+ }
+ case CLUSTERING: {
// send back msg to broker;
for (size_t i = ackIndex + 1; i < msgs.size(); i++) {
- LOG_WARN("consume fail, MQ is:%s, its msgId is:%s, index is:" SIZET_FMT
- ", reconsume "
- "times is:%d",
+ LOG_WARN("consume fail, MQ is:%s, its msgId is:%s, index is:" SIZET_FMT ", reconsume
times is:%d",
(request->m_messageQueue).toString().c_str(), msgs[i].getMsgId().c_str(),
i,
msgs[i].getReconsumeTimes());
- m_pConsumer->sendMessageBack(msgs[i], 0);
+ if (m_pConsumer->getConsumeType() == CONSUME_PASSIVELY && !m_pConsumer->sendMessageBack(msgs[i],
0)) {
+ LOG_WARN("Send message back fail, MQ is:%s, its msgId is:%s, index is:%d, re-consume
times is:%d",
+ (request->m_messageQueue).toString().c_str(), msgs[i].getMsgId().c_str(),
i,
+ msgs[i].getReconsumeTimes());
+ localRetryMsgs.push_back(msgs[i]);
+ }
}
break;
+ }
default:
break;
}
+ if (!localRetryMsgs.empty()) {
+ LOG_ERROR("Client side re-consume launched due to both message consuming and SDK send-back
retry failure");
+ for (std::vector<MQMessageExt>::iterator itOrigin = msgs.begin(); itOrigin != msgs.end();)
{
+ bool remove = false;
+ for (std::vector<MQMessageExt>::iterator itRetry = localRetryMsgs.begin(); itRetry
!= localRetryMsgs.end();
+ itRetry++) {
+ if (itRetry->getQueueOffset() == itOrigin->getQueueOffset()) {
+ remove = true;
+ break;
+ }
+ }
+ if (remove) {
+ itOrigin = msgs.erase(itOrigin);
+ } else {
+ itOrigin++;
+ }
+ }
+ }
// update offset
int64 offset = request->removeMessage(msgs);
if (offset >= 0) {
@@ -151,7 +231,13 @@ void ConsumeMessageConcurrentlyService::ConsumeRequest(boost::weak_ptr<PullReque
LOG_WARN("Note: Get local offset for mq:%s failed, may be it is updated before. skip..",
(request->m_messageQueue).toString().c_str());
}
-}
+ if (!localRetryMsgs.empty()) {
+ // submitConsumeRequest(request, localTryMsgs);
+ LOG_INFO("Send [%d ]messages back to mq:%s failed, call reconsume again after 1s.", localRetryMsgs.size(),
+ (request->m_messageQueue).toString().c_str());
+ submitConsumeRequestLater(request, localRetryMsgs, 1000);
+ }
+} // namespace rocketmq
void ConsumeMessageConcurrentlyService::resetRetryTopic(vector<MQMessageExt>& msgs)
{
string groupTopic = UtilAll::getRetryTopic(m_pConsumer->getGroupName());
diff --git a/src/consumer/ConsumeMessageOrderlyService.cpp b/src/consumer/ConsumeMessageOrderlyService.cpp
index 4d06e5f..fcff4a4 100644
--- a/src/consumer/ConsumeMessageOrderlyService.cpp
+++ b/src/consumer/ConsumeMessageOrderlyService.cpp
@@ -167,20 +167,19 @@ void ConsumeMessageOrderlyService::ConsumeRequest(boost::weak_ptr<PullRequest>
p
if (m_pMessageListener) {
if ((request->isLocked() && !request->isLockExpired()) || m_pConsumer->getMessageModel()
== BROADCASTING) {
- DefaultMQPushConsumer* pConsumer = (DefaultMQPushConsumer*)m_pConsumer;
+ // DefaultMQPushConsumer* pConsumer = (DefaultMQPushConsumer*)m_pConsumer;
uint64_t beginTime = UtilAll::currentTimeMillis();
bool continueConsume = true;
while (continueConsume) {
if ((UtilAll::currentTimeMillis() - beginTime) > m_MaxTimeConsumeContinuously)
{
- LOG_INFO(
- "continuely consume message queue:%s more than 60s, consume it "
- "later",
- request->m_messageQueue.toString().c_str());
- tryLockLaterAndReconsume(request, false);
+ LOG_INFO("Continuely consume %s more than 60s, consume it 1s later",
+ request->m_messageQueue.toString().c_str());
+ tryLockLaterAndReconsumeDelay(request, false, 1000);
break;
}
vector<MQMessageExt> msgs;
- request->takeMessages(msgs, pConsumer->getConsumeMessageBatchMaxSize());
+ // request->takeMessages(msgs, pConsumer->getConsumeMessageBatchMaxSize());
+ request->takeMessages(msgs, 1);
if (!msgs.empty()) {
request->setLastConsumeTimestamp(UtilAll::currentTimeMillis());
if (m_pConsumer->isUseNameSpaceMode()) {
@@ -188,9 +187,20 @@ void ConsumeMessageOrderlyService::ConsumeRequest(boost::weak_ptr<PullRequest>
p
}
ConsumeStatus consumeStatus = m_pMessageListener->consumeMessage(msgs);
if (consumeStatus == RECONSUME_LATER) {
- request->makeMessageToCosumeAgain(msgs);
- continueConsume = false;
- tryLockLaterAndReconsume(request, false);
+ if (msgs[0].getReconsumeTimes() <= 15) {
+ msgs[0].setReconsumeTimes(msgs[0].getReconsumeTimes() + 1);
+ request->makeMessageToCosumeAgain(msgs);
+ continueConsume = false;
+ tryLockLaterAndReconsumeDelay(request, false, 1000);
+ } else {
+ // need change to reconsumer delay level and print log.
+ LOG_INFO("Local Consume failed [%d] times, change [%s] delay to 5s.", msgs[0].getReconsumeTimes(),
+ msgs[0].getMsgId().c_str());
+ msgs[0].setReconsumeTimes(msgs[0].getReconsumeTimes() + 1);
+ continueConsume = false;
+ request->makeMessageToCosumeAgain(msgs);
+ tryLockLaterAndReconsumeDelay(request, false, 5000);
+ }
} else {
m_pConsumer->updateConsumeOffset(request->m_messageQueue, request->commit());
}
@@ -206,20 +216,26 @@ void ConsumeMessageOrderlyService::ConsumeRequest(boost::weak_ptr<PullRequest>
p
LOG_DEBUG("consume once exit of mq:%s", request->m_messageQueue.toString().c_str());
} else {
LOG_ERROR("message queue:%s was not locked", request->m_messageQueue.toString().c_str());
- tryLockLaterAndReconsume(request, true);
+ tryLockLaterAndReconsumeDelay(request, true, 1000);
}
}
}
-void ConsumeMessageOrderlyService::tryLockLaterAndReconsume(boost::weak_ptr<PullRequest>
pullRequest, bool tryLockMQ) {
+void ConsumeMessageOrderlyService::tryLockLaterAndReconsumeDelay(boost::weak_ptr<PullRequest>
pullRequest,
+ bool tryLockMQ,
+ int millisDelay) {
boost::shared_ptr<PullRequest> request = pullRequest.lock();
if (!request) {
LOG_WARN("Pull request has been released");
return;
}
- int retryTimer = tryLockMQ ? 500 : 100;
+ int retryTimer = millisDelay;
+ if (millisDelay >= 30000 || millisDelay <= 1000) {
+ retryTimer = 1000;
+ }
boost::asio::deadline_timer* t =
new boost::asio::deadline_timer(m_async_ioService, boost::posix_time::milliseconds(retryTimer));
t->async_wait(
- boost::bind(&(ConsumeMessageOrderlyService::static_submitConsumeRequestLater),
this, request, tryLockMQ, t));
+ boost::bind(&ConsumeMessageOrderlyService::static_submitConsumeRequestLater, this,
request, tryLockMQ, t));
}
+
} // namespace rocketmq
diff --git a/src/consumer/ConsumeMsgService.h b/src/consumer/ConsumeMsgService.h
index a5d3ce7..a6c7206 100644
--- a/src/consumer/ConsumeMsgService.h
+++ b/src/consumer/ConsumeMsgService.h
@@ -53,6 +53,15 @@ class ConsumeMessageConcurrentlyService : public ConsumeMsgService {
virtual void stopThreadPool();
void ConsumeRequest(boost::weak_ptr<PullRequest> request, vector<MQMessageExt>&
msgs);
+ void submitConsumeRequestLater(boost::weak_ptr<PullRequest> request, vector<MQMessageExt>&
msgs, int millis);
+
+ void triggersubmitConsumeRequestLater(boost::asio::deadline_timer* t,
+ boost::weak_ptr<PullRequest> pullRequest,
+ vector<MQMessageExt>& msgs);
+ static void static_submitConsumeRequest(void* context,
+ boost::asio::deadline_timer* t,
+ boost::weak_ptr<PullRequest> pullRequest,
+ vector<MQMessageExt>& msgs);
private:
void resetRetryTopic(vector<MQMessageExt>& msgs);
@@ -76,7 +85,8 @@ class ConsumeMessageOrderlyService : public ConsumeMsgService {
virtual MessageListenerType getConsumeMsgSerivceListenerType();
void boost_asio_work();
- void tryLockLaterAndReconsume(boost::weak_ptr<PullRequest> request, bool tryLockMQ);
+ // void tryLockLaterAndReconsume(boost::weak_ptr<PullRequest> request, bool tryLockMQ);
+ void tryLockLaterAndReconsumeDelay(boost::weak_ptr<PullRequest> request, bool tryLockMQ,
int millisDelay);
static void static_submitConsumeRequestLater(void* context,
boost::weak_ptr<PullRequest> request,
bool tryLockMQ,
diff --git a/src/consumer/DefaultMQPullConsumer.cpp b/src/consumer/DefaultMQPullConsumer.cpp
index f549883..c8aac49 100644
--- a/src/consumer/DefaultMQPullConsumer.cpp
+++ b/src/consumer/DefaultMQPullConsumer.cpp
@@ -142,7 +142,9 @@ void DefaultMQPullConsumer::shutdown() {
}
}
-void DefaultMQPullConsumer::sendMessageBack(MQMessageExt& msg, int delayLevel) {}
+bool DefaultMQPullConsumer::sendMessageBack(MQMessageExt& msg, int delayLevel) {
+ return true;
+}
void DefaultMQPullConsumer::fetchSubscribeMessageQueues(const string& topic, vector<MQMessageQueue>&
mqs) {
mqs.clear();
diff --git a/src/consumer/DefaultMQPushConsumer.cpp b/src/consumer/DefaultMQPushConsumer.cpp
index c6ab2b2..376c3e9 100644
--- a/src/consumer/DefaultMQPushConsumer.cpp
+++ b/src/consumer/DefaultMQPushConsumer.cpp
@@ -251,13 +251,15 @@ DefaultMQPushConsumer::~DefaultMQPushConsumer() {
m_subTopics.clear();
}
-void DefaultMQPushConsumer::sendMessageBack(MQMessageExt& msg, int delayLevel) {
+bool DefaultMQPushConsumer::sendMessageBack(MQMessageExt& msg, int delayLevel) {
try {
getFactory()->getMQClientAPIImpl()->consumerSendMessageBack(msg, getGroupName(),
delayLevel, 3000,
getSessionCredentials());
} catch (MQException& e) {
LOG_ERROR(e.what());
+ return false;
}
+ return true;
}
void DefaultMQPushConsumer::fetchSubscribeMessageQueues(const string& topic, vector<MQMessageQueue>&
mqs) {
|