rocketmq-commits mailing list archives

From ding...@apache.org
Subject [rocketmq-client-cpp] branch master updated: fix(consumer): fix the issue that msg lost when send back to broker failed. (#209)
Date Thu, 02 Jan 2020 08:10:07 GMT
This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-cpp.git


The following commit(s) were added to refs/heads/master by this push:
     new 255de3b  fix(consumer): fix the issue that msg lost when send back to broker failed. (#209)
255de3b is described below

commit 255de3b310e0b1346f3c63b6d4e4aa6b2ceb00a2
Author: dinglei <libya_003@163.com>
AuthorDate: Thu Jan 2 16:09:56 2020 +0800

    fix(consumer): fix the issue that msg lost when send back to broker failed. (#209)
    
    * fix(consumer): fix the issue that msg lost when send back to broker failed.
    
    * fix(consumer): fix the issue that msg lost when send back to broker failed.
    
    * fix(consumer): formate code for ci.
---
 include/DefaultMQPullConsumer.h                    |   2 +-
 include/DefaultMQPushConsumer.h                    |   2 +-
 include/MQConsumer.h                               |   2 +-
 src/consumer/ConsumeMessageConcurrentlyService.cpp | 118 ++++++++++++++++++---
 src/consumer/ConsumeMessageOrderlyService.cpp      |  44 +++++---
 src/consumer/ConsumeMsgService.h                   |  12 ++-
 src/consumer/DefaultMQPullConsumer.cpp             |   4 +-
 src/consumer/DefaultMQPushConsumer.cpp             |   4 +-
 8 files changed, 152 insertions(+), 36 deletions(-)
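
The behavioral core of this patch: MQConsumer::sendMessageBack() now reports whether the
send-back call succeeded, and in CLUSTERING mode any message that cannot be returned to the
broker is kept on the client and re-submitted for local consumption after a delay instead of
being lost. Below is a minimal standalone sketch of that flow; the Msg struct and
sendMessageBack() here are simplified stand-ins, not the library's MQMessageExt/MQConsumer API.

// Standalone sketch (simplified stand-ins, not rocketmq-client-cpp types):
// send-back failures are collected for a local retry pass instead of being lost.
#include <iostream>
#include <string>
#include <vector>

struct Msg {
  std::string id;
  long long queueOffset;
};

// Stand-in for MQConsumer::sendMessageBack, which now returns success/failure.
bool sendMessageBack(const Msg& /*msg*/, int /*delayLevel*/) {
  return false;  // pretend the broker rejected or never received the request
}

int main() {
  std::vector<Msg> msgs = {{"A", 100}, {"B", 101}};
  std::vector<Msg> localRetryMsgs;

  // Messages whose send-back fails are kept for a client-side re-consume.
  for (std::vector<Msg>::const_iterator it = msgs.begin(); it != msgs.end(); ++it) {
    if (!sendMessageBack(*it, 0)) {
      localRetryMsgs.push_back(*it);
    }
  }

  // Failed messages are erased from the batch before the offset update, so the
  // consume offset cannot advance past them; they get re-submitted after a delay.
  for (std::vector<Msg>::iterator it = msgs.begin(); it != msgs.end();) {
    bool retryLocally = false;
    for (std::vector<Msg>::const_iterator r = localRetryMsgs.begin(); r != localRetryMsgs.end(); ++r) {
      if (r->queueOffset == it->queueOffset) {
        retryLocally = true;
        break;
      }
    }
    it = retryLocally ? msgs.erase(it) : it + 1;
  }

  std::cout << "offset update covers " << msgs.size() << " messages, "
            << localRetryMsgs.size() << " kept for local retry\n";
  return 0;
}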

diff --git a/include/DefaultMQPullConsumer.h b/include/DefaultMQPullConsumer.h
index 939ce0e..33765cd 100644
--- a/include/DefaultMQPullConsumer.h
+++ b/include/DefaultMQPullConsumer.h
@@ -43,7 +43,7 @@ class ROCKETMQCLIENT_API DefaultMQPullConsumer : public MQConsumer {
   //<!end mqadmin;
 
   //<!begin MQConsumer
-  virtual void sendMessageBack(MQMessageExt& msg, int delayLevel);
+  virtual bool sendMessageBack(MQMessageExt& msg, int delayLevel);
  virtual void fetchSubscribeMessageQueues(const std::string& topic, std::vector<MQMessageQueue>& mqs);
   virtual void doRebalance();
   virtual void persistConsumerOffset();
diff --git a/include/DefaultMQPushConsumer.h b/include/DefaultMQPushConsumer.h
index 894c6b5..b29ca72 100644
--- a/include/DefaultMQPushConsumer.h
+++ b/include/DefaultMQPushConsumer.h
@@ -55,7 +55,7 @@ class ROCKETMQCLIENT_API DefaultMQPushConsumer : public MQConsumer {
   //<!end mqadmin;
 
   //<!begin MQConsumer
-  virtual void sendMessageBack(MQMessageExt& msg, int delayLevel);
+  virtual bool sendMessageBack(MQMessageExt& msg, int delayLevel);
  virtual void fetchSubscribeMessageQueues(const std::string& topic, std::vector<MQMessageQueue>& mqs);
   virtual void doRebalance();
   virtual void persistConsumerOffset();
diff --git a/include/MQConsumer.h b/include/MQConsumer.h
index 48d78f2..54b535a 100644
--- a/include/MQConsumer.h
+++ b/include/MQConsumer.h
@@ -32,7 +32,7 @@ class ConsumerRunningInfo;
 class ROCKETMQCLIENT_API MQConsumer : public MQClient {
  public:
   virtual ~MQConsumer() {}
-  virtual void sendMessageBack(MQMessageExt& msg, int delayLevel) = 0;
+  virtual bool sendMessageBack(MQMessageExt& msg, int delayLevel) = 0;
  virtual void fetchSubscribeMessageQueues(const std::string& topic, std::vector<MQMessageQueue>& mqs) = 0;
   virtual void doRebalance() = 0;
   virtual void persistConsumerOffset() = 0;
diff --git a/src/consumer/ConsumeMessageConcurrentlyService.cpp b/src/consumer/ConsumeMessageConcurrentlyService.cpp
index 3b0204a..93cdcc3 100644
--- a/src/consumer/ConsumeMessageConcurrentlyService.cpp
+++ b/src/consumer/ConsumeMessageConcurrentlyService.cpp
@@ -73,7 +73,61 @@ void ConsumeMessageConcurrentlyService::submitConsumeRequest(boost::weak_ptr<Pul
              request->m_messageQueue.toString().c_str());
     return;
   }
-  m_ioService.post(boost::bind(&ConsumeMessageConcurrentlyService::ConsumeRequest, this, request, msgs));
+  if (!request->isDropped()) {
+    m_ioService.post(boost::bind(&ConsumeMessageConcurrentlyService::ConsumeRequest, this, request, msgs));
+  }
+}
+void ConsumeMessageConcurrentlyService::submitConsumeRequestLater(boost::weak_ptr<PullRequest> pullRequest,
+                                                                  vector<MQMessageExt>& msgs,
+                                                                  int millis) {
+  if (msgs.empty()) {
+    return;
+  }
+  boost::shared_ptr<PullRequest> request = pullRequest.lock();
+  if (!request) {
+    LOG_WARN("Pull request has been released");
+    return;
+  }
+  if (request->isDropped()) {
+    LOG_INFO("Pull request is set as dropped with mq:%s, need release in next rebalance.",
+             (request->m_messageQueue).toString().c_str());
+    return;
+  }
+  if (!request->isDropped()) {
+    boost::asio::deadline_timer* t =
+        new boost::asio::deadline_timer(m_ioService, boost::posix_time::milliseconds(millis));
+    t->async_wait(
+        boost::bind(&(ConsumeMessageConcurrentlyService::static_submitConsumeRequest), this, t, request, msgs));
+    LOG_INFO("Submit Message to Consumer [%s] Later and Sleep [%d]ms.", (request->m_messageQueue).toString().c_str(),
+             millis);
+  }
+}
+
+void ConsumeMessageConcurrentlyService::static_submitConsumeRequest(void* context,
+                                                                    boost::asio::deadline_timer* t,
+                                                                    boost::weak_ptr<PullRequest> pullRequest,
+                                                                    vector<MQMessageExt>& msgs) {
+  boost::shared_ptr<PullRequest> request = pullRequest.lock();
+  if (!request) {
+    LOG_WARN("Pull request has been released");
+    return;
+  }
+  ConsumeMessageConcurrentlyService* pService = (ConsumeMessageConcurrentlyService*)context;
+  if (pService) {
+    pService->triggersubmitConsumeRequestLater(t, request, msgs);
+  }
+}
+
+void ConsumeMessageConcurrentlyService::triggersubmitConsumeRequestLater(boost::asio::deadline_timer* t,
+                                                                         boost::weak_ptr<PullRequest> pullRequest,
+                                                                         vector<MQMessageExt>& msgs) {
+  boost::shared_ptr<PullRequest> request = pullRequest.lock();
+  if (!request) {
+    LOG_WARN("Pull request has been released");
+    return;
+  }
+  submitConsumeRequest(request, msgs);
+  deleteAndZero(t);
 }
 
 void ConsumeMessageConcurrentlyService::ConsumeRequest(boost::weak_ptr<PullRequest> pullRequest,
@@ -83,13 +137,12 @@ void ConsumeMessageConcurrentlyService::ConsumeRequest(boost::weak_ptr<PullReque
     LOG_WARN("Pull request has been released");
     return;
   }
-  if (!request || request->isDropped()) {
-    LOG_WARN("the pull request had been dropped");
+  if (request->isDropped()) {
+    LOG_WARN("the pull request for %s Had been dropped before", request->m_messageQueue.toString().c_str());
     request->clearAllMsgs();  // add clear operation to avoid bad state when
                               // dropped pullRequest returns normal
     return;
   }
-
   if (msgs.empty()) {
     LOG_WARN("the msg of pull result is NULL,its mq:%s", (request->m_messageQueue).toString().c_str());
     return;
@@ -99,16 +152,19 @@ void ConsumeMessageConcurrentlyService::ConsumeRequest(boost::weak_ptr<PullReque
   if (m_pMessageListener != NULL) {
     resetRetryTopic(msgs);
     request->setLastConsumeTimestamp(UtilAll::currentTimeMillis());
-    LOG_DEBUG("=====Receive Messages:[%s][%s][%s]", msgs[0].getTopic().c_str(), msgs[0].getMsgId().c_str(),
-              msgs[0].getBody().c_str());
+    LOG_DEBUG("=====Receive Messages,Topic[%s], MsgId[%s],Body[%s],RetryTimes[%d]", msgs[0].getTopic().c_str(),
+              msgs[0].getMsgId().c_str(), msgs[0].getBody().c_str(), msgs[0].getReconsumeTimes());
     if (m_pConsumer->isUseNameSpaceMode()) {
       MessageAccessor::withoutNameSpace(msgs, m_pConsumer->getNameSpace());
     }
-    status = m_pMessageListener->consumeMessage(msgs);
+    try {
+      status = m_pMessageListener->consumeMessage(msgs);
+    } catch (...) {
+      status = RECONSUME_LATER;
+      LOG_ERROR("Consumer's code is buggy. Un-caught exception raised");
+    }
   }
 
-  /*LOG_DEBUG("Consumed MSG size:%d of mq:%s",
-      msgs.size(), (request->m_messageQueue).toString().c_str());*/
   int ackIndex = -1;
   switch (status) {
     case CONSUME_SUCCESS:
@@ -121,28 +177,52 @@ void ConsumeMessageConcurrentlyService::ConsumeRequest(boost::weak_ptr<PullReque
       break;
   }
 
+  std::vector<MQMessageExt> localRetryMsgs;
   switch (m_pConsumer->getMessageModel()) {
-    case BROADCASTING:
+    case BROADCASTING: {
       // Note: broadcasting reconsume should do by application, as it has big
       // affect to broker cluster
       if (ackIndex != (int)msgs.size())
         LOG_WARN("BROADCASTING, the message consume failed, drop it:%s", (request->m_messageQueue).toString().c_str());
       break;
-    case CLUSTERING:
+    }
+    case CLUSTERING: {
       // send back msg to broker;
       for (size_t i = ackIndex + 1; i < msgs.size(); i++) {
-        LOG_WARN("consume fail, MQ is:%s, its msgId is:%s, index is:" SIZET_FMT
-                 ", reconsume "
-                 "times is:%d",
+        LOG_WARN("consume fail, MQ is:%s, its msgId is:%s, index is:" SIZET_FMT ", reconsume times is:%d",
                 (request->m_messageQueue).toString().c_str(), msgs[i].getMsgId().c_str(), i,
                  msgs[i].getReconsumeTimes());
-        m_pConsumer->sendMessageBack(msgs[i], 0);
+        if (m_pConsumer->getConsumeType() == CONSUME_PASSIVELY && !m_pConsumer->sendMessageBack(msgs[i], 0)) {
+          LOG_WARN("Send message back fail, MQ is:%s, its msgId is:%s, index is:%d, re-consume times is:%d",
+                   (request->m_messageQueue).toString().c_str(), msgs[i].getMsgId().c_str(), i,
+                   msgs[i].getReconsumeTimes());
+          localRetryMsgs.push_back(msgs[i]);
+        }
       }
       break;
+    }
     default:
       break;
   }
 
+  if (!localRetryMsgs.empty()) {
+    LOG_ERROR("Client side re-consume launched due to both message consuming and SDK send-back retry failure");
+    for (std::vector<MQMessageExt>::iterator itOrigin = msgs.begin(); itOrigin != msgs.end();) {
+      bool remove = false;
+      for (std::vector<MQMessageExt>::iterator itRetry = localRetryMsgs.begin(); itRetry != localRetryMsgs.end();
+           itRetry++) {
+        if (itRetry->getQueueOffset() == itOrigin->getQueueOffset()) {
+          remove = true;
+          break;
+        }
+      }
+      if (remove) {
+        itOrigin = msgs.erase(itOrigin);
+      } else {
+        itOrigin++;
+      }
+    }
+  }
   // update offset
   int64 offset = request->removeMessage(msgs);
   if (offset >= 0) {
@@ -151,7 +231,13 @@ void ConsumeMessageConcurrentlyService::ConsumeRequest(boost::weak_ptr<PullReque
     LOG_WARN("Note: Get local offset for mq:%s failed, may be it is updated before. skip..",
              (request->m_messageQueue).toString().c_str());
   }
-}
+  if (!localRetryMsgs.empty()) {
+    // submitConsumeRequest(request, localTryMsgs);
+    LOG_INFO("Send [%d ]messages back to mq:%s failed, call reconsume again after 1s.", localRetryMsgs.size(),
+             (request->m_messageQueue).toString().c_str());
+    submitConsumeRequestLater(request, localRetryMsgs, 1000);
+  }
+}  // namespace rocketmq
 
 void ConsumeMessageConcurrentlyService::resetRetryTopic(vector<MQMessageExt>& msgs) {
   string groupTopic = UtilAll::getRetryTopic(m_pConsumer->getGroupName());
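
The delayed re-submit added above (submitConsumeRequestLater / static_submitConsumeRequest /
triggersubmitConsumeRequestLater) follows the usual boost::asio pattern: allocate a
deadline_timer on the service's io_service, bind the request and messages into the completion
handler, and free the timer once it fires. A compressed standalone sketch of that pattern,
with a plain callback in place of the PullRequest plumbing:

// Standalone sketch of the deadline_timer re-submit pattern; the handler and
// payload are simplified, not the library's signatures.
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
#include <iostream>

void resubmit(const boost::system::error_code& /*ec*/,
              boost::asio::deadline_timer* t,
              std::size_t batchSize) {
  std::cout << "re-submitting " << batchSize << " messages to the consume pool\n";
  delete t;  // the patch releases its timer with deleteAndZero(t)
}

int main() {
  boost::asio::io_service io;  // stands in for the service's m_ioService
  boost::asio::deadline_timer* t =
      new boost::asio::deadline_timer(io, boost::posix_time::milliseconds(1000));
  t->async_wait(boost::bind(&resubmit, boost::asio::placeholders::error, t,
                            static_cast<std::size_t>(2)));
  io.run();  // in the client this loop is driven by the consumer's worker threads
  return 0;
}
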
diff --git a/src/consumer/ConsumeMessageOrderlyService.cpp b/src/consumer/ConsumeMessageOrderlyService.cpp
index 4d06e5f..fcff4a4 100644
--- a/src/consumer/ConsumeMessageOrderlyService.cpp
+++ b/src/consumer/ConsumeMessageOrderlyService.cpp
@@ -167,20 +167,19 @@ void ConsumeMessageOrderlyService::ConsumeRequest(boost::weak_ptr<PullRequest> p
 
   if (m_pMessageListener) {
     if ((request->isLocked() && !request->isLockExpired()) || m_pConsumer->getMessageModel() == BROADCASTING) {
-      DefaultMQPushConsumer* pConsumer = (DefaultMQPushConsumer*)m_pConsumer;
+      // DefaultMQPushConsumer* pConsumer = (DefaultMQPushConsumer*)m_pConsumer;
       uint64_t beginTime = UtilAll::currentTimeMillis();
       bool continueConsume = true;
       while (continueConsume) {
         if ((UtilAll::currentTimeMillis() - beginTime) > m_MaxTimeConsumeContinuously) {
-          LOG_INFO(
-              "continuely consume message queue:%s more than 60s, consume it "
-              "later",
-              request->m_messageQueue.toString().c_str());
-          tryLockLaterAndReconsume(request, false);
+          LOG_INFO("Continuely consume %s more than 60s, consume it 1s later",
+                   request->m_messageQueue.toString().c_str());
+          tryLockLaterAndReconsumeDelay(request, false, 1000);
           break;
         }
         vector<MQMessageExt> msgs;
-        request->takeMessages(msgs, pConsumer->getConsumeMessageBatchMaxSize());
+        // request->takeMessages(msgs, pConsumer->getConsumeMessageBatchMaxSize());
+        request->takeMessages(msgs, 1);
         if (!msgs.empty()) {
           request->setLastConsumeTimestamp(UtilAll::currentTimeMillis());
           if (m_pConsumer->isUseNameSpaceMode()) {
@@ -188,9 +187,20 @@ void ConsumeMessageOrderlyService::ConsumeRequest(boost::weak_ptr<PullRequest> p
           }
           ConsumeStatus consumeStatus = m_pMessageListener->consumeMessage(msgs);
           if (consumeStatus == RECONSUME_LATER) {
-            request->makeMessageToCosumeAgain(msgs);
-            continueConsume = false;
-            tryLockLaterAndReconsume(request, false);
+            if (msgs[0].getReconsumeTimes() <= 15) {
+              msgs[0].setReconsumeTimes(msgs[0].getReconsumeTimes() + 1);
+              request->makeMessageToCosumeAgain(msgs);
+              continueConsume = false;
+              tryLockLaterAndReconsumeDelay(request, false, 1000);
+            } else {
+              // need change to reconsumer delay level and print log.
+              LOG_INFO("Local Consume failed [%d] times, change [%s] delay to 5s.", msgs[0].getReconsumeTimes(),
+                       msgs[0].getMsgId().c_str());
+              msgs[0].setReconsumeTimes(msgs[0].getReconsumeTimes() + 1);
+              continueConsume = false;
+              request->makeMessageToCosumeAgain(msgs);
+              tryLockLaterAndReconsumeDelay(request, false, 5000);
+            }
           } else {
             m_pConsumer->updateConsumeOffset(request->m_messageQueue, request->commit());
           }
@@ -206,20 +216,26 @@ void ConsumeMessageOrderlyService::ConsumeRequest(boost::weak_ptr<PullRequest> p
       LOG_DEBUG("consume once exit of mq:%s", request->m_messageQueue.toString().c_str());
     } else {
       LOG_ERROR("message queue:%s was not locked", request->m_messageQueue.toString().c_str());
-      tryLockLaterAndReconsume(request, true);
+      tryLockLaterAndReconsumeDelay(request, true, 1000);
     }
   }
 }
-void ConsumeMessageOrderlyService::tryLockLaterAndReconsume(boost::weak_ptr<PullRequest> pullRequest, bool tryLockMQ) {
+void ConsumeMessageOrderlyService::tryLockLaterAndReconsumeDelay(boost::weak_ptr<PullRequest> pullRequest,
+                                                                 bool tryLockMQ,
+                                                                 int millisDelay) {
   boost::shared_ptr<PullRequest> request = pullRequest.lock();
   if (!request) {
     LOG_WARN("Pull request has been released");
     return;
   }
-  int retryTimer = tryLockMQ ? 500 : 100;
+  int retryTimer = millisDelay;
+  if (millisDelay >= 30000 || millisDelay <= 1000) {
+    retryTimer = 1000;
+  }
   boost::asio::deadline_timer* t =
       new boost::asio::deadline_timer(m_async_ioService, boost::posix_time::milliseconds(retryTimer));
   t->async_wait(
-      boost::bind(&(ConsumeMessageOrderlyService::static_submitConsumeRequestLater), this, request, tryLockMQ, t));
+      boost::bind(&ConsumeMessageOrderlyService::static_submitConsumeRequestLater, this, request, tryLockMQ, t));
 }
+
 }  // namespace rocketmq
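
For orderly consumption the retry delay is no longer the fixed 100/500 ms of the old
tryLockLaterAndReconsume: a failed message is put back into the local queue with its
reconsume counter bumped, redelivery is retried after 1 s for the first 15 local failures
and after 5 s beyond that, and tryLockLaterAndReconsumeDelay clamps any delay outside
(1000, 30000) ms back to 1000 ms. A small standalone sketch of that delay selection
(stand-in functions, not the library API):

// Standalone sketch of the orderly-consume backoff introduced by this patch.
#include <initializer_list>
#include <iostream>

int nextRedeliveryDelayMs(int reconsumeTimes) {
  // up to 15 local retries at 1 s, then fall back to 5 s
  return reconsumeTimes <= 15 ? 1000 : 5000;
}

int clampDelayMs(int millisDelay) {
  // mirrors the guard in tryLockLaterAndReconsumeDelay
  if (millisDelay >= 30000 || millisDelay <= 1000) {
    return 1000;
  }
  return millisDelay;
}

int main() {
  for (int times : {1, 15, 16}) {
    std::cout << "reconsumeTimes=" << times << " -> redeliver after "
              << clampDelayMs(nextRedeliveryDelayMs(times)) << " ms\n";
  }
  return 0;
}
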
diff --git a/src/consumer/ConsumeMsgService.h b/src/consumer/ConsumeMsgService.h
index a5d3ce7..a6c7206 100644
--- a/src/consumer/ConsumeMsgService.h
+++ b/src/consumer/ConsumeMsgService.h
@@ -53,6 +53,15 @@ class ConsumeMessageConcurrentlyService : public ConsumeMsgService {
   virtual void stopThreadPool();
 
  void ConsumeRequest(boost::weak_ptr<PullRequest> request, vector<MQMessageExt>& msgs);
+  void submitConsumeRequestLater(boost::weak_ptr<PullRequest> request, vector<MQMessageExt>& msgs, int millis);
+
+  void triggersubmitConsumeRequestLater(boost::asio::deadline_timer* t,
+                                        boost::weak_ptr<PullRequest> pullRequest,
+                                        vector<MQMessageExt>& msgs);
+  static void static_submitConsumeRequest(void* context,
+                                          boost::asio::deadline_timer* t,
+                                          boost::weak_ptr<PullRequest> pullRequest,
+                                          vector<MQMessageExt>& msgs);
 
  private:
   void resetRetryTopic(vector<MQMessageExt>& msgs);
@@ -76,7 +85,8 @@ class ConsumeMessageOrderlyService : public ConsumeMsgService {
   virtual MessageListenerType getConsumeMsgSerivceListenerType();
 
   void boost_asio_work();
-  void tryLockLaterAndReconsume(boost::weak_ptr<PullRequest> request, bool tryLockMQ);
+  // void tryLockLaterAndReconsume(boost::weak_ptr<PullRequest> request, bool tryLockMQ);
+  void tryLockLaterAndReconsumeDelay(boost::weak_ptr<PullRequest> request, bool tryLockMQ, int millisDelay);
   static void static_submitConsumeRequestLater(void* context,
                                                boost::weak_ptr<PullRequest> request,
                                                bool tryLockMQ,
diff --git a/src/consumer/DefaultMQPullConsumer.cpp b/src/consumer/DefaultMQPullConsumer.cpp
index f549883..c8aac49 100644
--- a/src/consumer/DefaultMQPullConsumer.cpp
+++ b/src/consumer/DefaultMQPullConsumer.cpp
@@ -142,7 +142,9 @@ void DefaultMQPullConsumer::shutdown() {
   }
 }
 
-void DefaultMQPullConsumer::sendMessageBack(MQMessageExt& msg, int delayLevel) {}
+bool DefaultMQPullConsumer::sendMessageBack(MQMessageExt& msg, int delayLevel) {
+  return true;
+}
 
 void DefaultMQPullConsumer::fetchSubscribeMessageQueues(const string& topic, vector<MQMessageQueue>& mqs) {
   mqs.clear();
diff --git a/src/consumer/DefaultMQPushConsumer.cpp b/src/consumer/DefaultMQPushConsumer.cpp
index c6ab2b2..376c3e9 100644
--- a/src/consumer/DefaultMQPushConsumer.cpp
+++ b/src/consumer/DefaultMQPushConsumer.cpp
@@ -251,13 +251,15 @@ DefaultMQPushConsumer::~DefaultMQPushConsumer() {
   m_subTopics.clear();
 }
 
-void DefaultMQPushConsumer::sendMessageBack(MQMessageExt& msg, int delayLevel) {
+bool DefaultMQPushConsumer::sendMessageBack(MQMessageExt& msg, int delayLevel) {
   try {
     getFactory()->getMQClientAPIImpl()->consumerSendMessageBack(msg, getGroupName(), delayLevel, 3000,
                                                                 getSessionCredentials());
   } catch (MQException& e) {
     LOG_ERROR(e.what());
+    return false;
   }
+  return true;
 }
 
 void DefaultMQPushConsumer::fetchSubscribeMessageQueues(const string& topic, vector<MQMessageQueue>& mqs) {

