rocketmq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ding...@apache.org
Subject [rocketmq-client-cpp] branch master updated: fix(consumer): send back error when consuming failed. (#210)
Date Thu, 02 Jan 2020 11:38:26 GMT
This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-cpp.git


The following commit(s) were added to refs/heads/master by this push:
     new 87a6137  fix(consumer): send back error when consuming failed. (#210)
87a6137 is described below

commit 87a6137150b32b2c69b4e46d38f01f5c9fd8429c
Author: dinglei <libya_003@163.com>
AuthorDate: Thu Jan 2 19:38:19 2020 +0800

    fix(consumer): send back error when consuming failed. (#210)
---
 include/DefaultMQPullConsumer.h                    |  2 +-
 include/DefaultMQPushConsumer.h                    |  2 +-
 include/MQConsumer.h                               |  2 +-
 src/MQClientAPIImpl.cpp                            |  5 +++--
 src/MQClientAPIImpl.h                              |  3 ++-
 src/common/MQClient.cpp                            |  3 ++-
 src/consumer/ConsumeMessageConcurrentlyService.cpp | 20 ++++++++++++--------
 src/consumer/DefaultMQPullConsumer.cpp             |  2 +-
 src/consumer/DefaultMQPushConsumer.cpp             |  9 +++++++--
 src/consumer/Rebalance.cpp                         |  2 +-
 10 files changed, 31 insertions(+), 19 deletions(-)

diff --git a/include/DefaultMQPullConsumer.h b/include/DefaultMQPullConsumer.h
index 33765cd..899862b 100644
--- a/include/DefaultMQPullConsumer.h
+++ b/include/DefaultMQPullConsumer.h
@@ -43,7 +43,7 @@ class ROCKETMQCLIENT_API DefaultMQPullConsumer : public MQConsumer {
   //<!end mqadmin;
 
   //<!begin MQConsumer
-  virtual bool sendMessageBack(MQMessageExt& msg, int delayLevel);
+  virtual bool sendMessageBack(MQMessageExt& msg, int delayLevel, std::string& brokerName);
   virtual void fetchSubscribeMessageQueues(const std::string& topic, std::vector<MQMessageQueue>& mqs);
   virtual void doRebalance();
   virtual void persistConsumerOffset();
diff --git a/include/DefaultMQPushConsumer.h b/include/DefaultMQPushConsumer.h
index b29ca72..43f0fbb 100644
--- a/include/DefaultMQPushConsumer.h
+++ b/include/DefaultMQPushConsumer.h
@@ -55,7 +55,7 @@ class ROCKETMQCLIENT_API DefaultMQPushConsumer : public MQConsumer {
   //<!end mqadmin;
 
   //<!begin MQConsumer
-  virtual bool sendMessageBack(MQMessageExt& msg, int delayLevel);
+  virtual bool sendMessageBack(MQMessageExt& msg, int delayLevel, std::string& brokerName);
   virtual void fetchSubscribeMessageQueues(const std::string& topic, std::vector<MQMessageQueue>& mqs);
   virtual void doRebalance();
   virtual void persistConsumerOffset();
diff --git a/include/MQConsumer.h b/include/MQConsumer.h
index 54b535a..87e2c1b 100644
--- a/include/MQConsumer.h
+++ b/include/MQConsumer.h
@@ -32,7 +32,7 @@ class ConsumerRunningInfo;
 class ROCKETMQCLIENT_API MQConsumer : public MQClient {
  public:
   virtual ~MQConsumer() {}
-  virtual bool sendMessageBack(MQMessageExt& msg, int delayLevel) = 0;
+  virtual bool sendMessageBack(MQMessageExt& msg, int delayLevel, std::string& brokerName) = 0;
   virtual void fetchSubscribeMessageQueues(const std::string& topic, std::vector<MQMessageQueue>& mqs) = 0;
   virtual void doRebalance() = 0;
   virtual void persistConsumerOffset() = 0;
diff --git a/src/MQClientAPIImpl.cpp b/src/MQClientAPIImpl.cpp
index f890968..0877a03 100644
--- a/src/MQClientAPIImpl.cpp
+++ b/src/MQClientAPIImpl.cpp
@@ -823,7 +823,8 @@ void MQClientAPIImpl::updateConsumerOffsetOneway(const string& addr,
   m_pRemotingClient->invokeOneway(addr, request);
 }
 
-void MQClientAPIImpl::consumerSendMessageBack(MQMessageExt& msg,
+void MQClientAPIImpl::consumerSendMessageBack(const string addr,
+                                              MQMessageExt& msg,
                                               const string& consumerGroup,
                                               int delayLevel,
                                               int timeoutMillis,
@@ -833,7 +834,7 @@ void MQClientAPIImpl::consumerSendMessageBack(MQMessageExt& msg,
   pRequestHeader->offset = msg.getCommitLogOffset();
   pRequestHeader->delayLevel = delayLevel;
 
-  string addr = socketAddress2IPPort(msg.getStoreHost());
+  // string addr = socketAddress2IPPort(msg.getStoreHost());
   RemotingCommand request(CONSUMER_SEND_MSG_BACK, pRequestHeader);
   callSignatureBeforeRequest(addr, request, sessionCredentials);
   request.Encode();
diff --git a/src/MQClientAPIImpl.h b/src/MQClientAPIImpl.h
index 763e45d..9555d72 100644
--- a/src/MQClientAPIImpl.h
+++ b/src/MQClientAPIImpl.h
@@ -167,7 +167,8 @@ class MQClientAPIImpl {
                                   int timeoutMillis,
                                   const SessionCredentials& sessionCredentials);
 
-  void consumerSendMessageBack(MQMessageExt& msg,
+  void consumerSendMessageBack(const string addr,
+                               MQMessageExt& msg,
                                const string& consumerGroup,
                                int delayLevel,
                                int timeoutMillis,
diff --git a/src/common/MQClient.cpp b/src/common/MQClient.cpp
index afdc5fa..068b8c4 100644
--- a/src/common/MQClient.cpp
+++ b/src/common/MQClient.cpp
@@ -53,7 +53,8 @@ MQClient::~MQClient() {}
 string MQClient::getMQClientId() const {
   string clientIP = UtilAll::getLocalAddress();
   string processId = UtilAll::to_string(getpid());
-  return processId + "-" + clientIP + "@" + m_instanceName;
+  // return processId + "-" + clientIP + "@" + m_instanceName;
+  return clientIP + "@" + processId + "#" + m_instanceName;
 }
 
 //<!groupName;
diff --git a/src/consumer/ConsumeMessageConcurrentlyService.cpp b/src/consumer/ConsumeMessageConcurrentlyService.cpp
index 93cdcc3..371faa2 100644
--- a/src/consumer/ConsumeMessageConcurrentlyService.cpp
+++ b/src/consumer/ConsumeMessageConcurrentlyService.cpp
@@ -189,14 +189,18 @@ void ConsumeMessageConcurrentlyService::ConsumeRequest(boost::weak_ptr<PullReque
     case CLUSTERING: {
       // send back msg to broker;
       for (size_t i = ackIndex + 1; i < msgs.size(); i++) {
-        LOG_WARN("consume fail, MQ is:%s, its msgId is:%s, index is:" SIZET_FMT ", reconsume times is:%d",
-                 (request->m_messageQueue).toString().c_str(), msgs[i].getMsgId().c_str(), i,
-                 msgs[i].getReconsumeTimes());
-        if (m_pConsumer->getConsumeType() == CONSUME_PASSIVELY && !m_pConsumer->sendMessageBack(msgs[i], 0)) {
-          LOG_WARN("Send message back fail, MQ is:%s, its msgId is:%s, index is:%d, re-consume times is:%d",
-                   (request->m_messageQueue).toString().c_str(), msgs[i].getMsgId().c_str(), i,
-                   msgs[i].getReconsumeTimes());
-          localRetryMsgs.push_back(msgs[i]);
+        LOG_DEBUG("consume fail, MQ is:%s, its msgId is:%s, index is:" SIZET_FMT ", reconsume times is:%d",
+                  (request->m_messageQueue).toString().c_str(), msgs[i].getMsgId().c_str(), i,
+                  msgs[i].getReconsumeTimes());
+        if (m_pConsumer->getConsumeType() == CONSUME_PASSIVELY) {
+          string brokerName = request->m_messageQueue.getBrokerName();
+          if (!m_pConsumer->sendMessageBack(msgs[i], 0, brokerName)) {
+            LOG_WARN("Send message back fail, MQ is:%s, its msgId is:%s, index is:%d, re-consume times is:%d",
+                     (request->m_messageQueue).toString().c_str(), msgs[i].getMsgId().c_str(), i,
+                     msgs[i].getReconsumeTimes());
+            msgs[i].setReconsumeTimes(msgs[i].getReconsumeTimes() + 1);
+            localRetryMsgs.push_back(msgs[i]);
+          }
         }
       }
       break;
diff --git a/src/consumer/DefaultMQPullConsumer.cpp b/src/consumer/DefaultMQPullConsumer.cpp
index c8aac49..1f58e86 100644
--- a/src/consumer/DefaultMQPullConsumer.cpp
+++ b/src/consumer/DefaultMQPullConsumer.cpp
@@ -142,7 +142,7 @@ void DefaultMQPullConsumer::shutdown() {
   }
 }
 
-bool DefaultMQPullConsumer::sendMessageBack(MQMessageExt& msg, int delayLevel) {
+bool DefaultMQPullConsumer::sendMessageBack(MQMessageExt& msg, int delayLevel, string& brokerName) {
   return true;
 }
 
diff --git a/src/consumer/DefaultMQPushConsumer.cpp b/src/consumer/DefaultMQPushConsumer.cpp
index 376c3e9..df77cac 100644
--- a/src/consumer/DefaultMQPushConsumer.cpp
+++ b/src/consumer/DefaultMQPushConsumer.cpp
@@ -251,9 +251,14 @@ DefaultMQPushConsumer::~DefaultMQPushConsumer() {
   m_subTopics.clear();
 }
 
-bool DefaultMQPushConsumer::sendMessageBack(MQMessageExt& msg, int delayLevel) {
+bool DefaultMQPushConsumer::sendMessageBack(MQMessageExt& msg, int delayLevel, string& brokerName) {
+  string brokerAddr;
+  if (!brokerName.empty())
+    brokerAddr = getFactory()->findBrokerAddressInPublish(brokerName);
+  else
+    brokerAddr = socketAddress2IPPort(msg.getStoreHost());
   try {
-    getFactory()->getMQClientAPIImpl()->consumerSendMessageBack(msg, getGroupName(), delayLevel, 3000,
+    getFactory()->getMQClientAPIImpl()->consumerSendMessageBack(brokerAddr, msg, getGroupName(), delayLevel, 3000,
                                                                 getSessionCredentials());
   } catch (MQException& e) {
     LOG_ERROR(e.what());
diff --git a/src/consumer/Rebalance.cpp b/src/consumer/Rebalance.cpp
index 0a4c06f..5546b61 100644
--- a/src/consumer/Rebalance.cpp
+++ b/src/consumer/Rebalance.cpp
@@ -128,7 +128,7 @@ void Rebalance::doRebalance() {
             std::stringstream ss;
             ss << "Allocation result for [Consumer Group: " << m_pConsumer->getGroupName() << ", Topic: " << topic
                << ", Current Consumer ID: " << m_pConsumer->getMQClientId() << "] is changed.\n "
-               << "Total Queue #: " << mqAll.size() << ", Total Consumer #: " << cidAll.size()
+               << "Total Queue :#" << mqAll.size() << ", Total Consumer :#" << cidAll.size()
                << " Allocated Queues are: \n";
 
             for (vector<MQMessageQueue>::size_type i = 0; i < allocateResult.size(); ++i) {


Mime
View raw message