rocketmq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ding...@apache.org
Subject [rocketmq-client-cpp] branch master updated: feat(version): add maxConsumerTimes to support higher client version (#230)
Date Tue, 14 Jan 2020 04:43:30 GMT
This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-cpp.git


The following commit(s) were added to refs/heads/master by this push:
     new 46dd8f5  feat(version): add maxConsumerTimes to support higher client version (#230)
46dd8f5 is described below

commit 46dd8f597a25aafc27c488e48a9f335b8a64796c
Author: dinglei <libya_003@163.com>
AuthorDate: Tue Jan 14 12:43:21 2020 +0800

    feat(version): add maxConsumerTimes to support higher client version (#230)
    
    * feat(version): add maxConsumerTimes to support higher client version
    
    * feat(version): add maxConsumerTimes to support higher client version
    
    * feat(version): add maxConsumerTimes to support higher client version
---
 include/DefaultMQPushConsumer.h                    |  3 ++
 src/MQClientAPIImpl.cpp                            |  5 +++
 src/MQClientAPIImpl.h                              |  1 +
 src/consumer/ConsumeMessageConcurrentlyService.cpp |  3 ++
 src/consumer/DefaultMQPushConsumer.cpp             | 19 +++++++++--
 src/protocol/CommandHeader.cpp                     |  8 +++--
 src/protocol/CommandHeader.h                       |  4 +++
 test/src/MQClientAPIImpTest.cpp                    | 20 ++++++++++++
 test/src/protocol/CommandHeaderTest.cpp            | 37 ++++++++++++++++++++++
 9 files changed, 96 insertions(+), 4 deletions(-)

diff --git a/include/DefaultMQPushConsumer.h b/include/DefaultMQPushConsumer.h
index 2ff7620..7f69258 100644
--- a/include/DefaultMQPushConsumer.h
+++ b/include/DefaultMQPushConsumer.h
@@ -111,6 +111,8 @@ class ROCKETMQCLIENT_API DefaultMQPushConsumer : public MQConsumer {
   */
   void setConsumeThreadCount(int threadCount);
   int getConsumeThreadCount() const;
+  void setMaxReconsumeTimes(int maxReconsumeTimes);
+  int getMaxReconsumeTimes() const;
 
   /*
     set pullMsg thread count, default value is cpu cores
@@ -144,6 +146,7 @@ class ROCKETMQCLIENT_API DefaultMQPushConsumer : public MQConsumer {
   MQMessageListener* m_pMessageListener;
   int m_consumeMessageBatchMaxSize;
   int m_maxMsgCacheSize;
+  int m_maxReconsumeTimes = -1;
   boost::asio::io_service m_async_ioService;
   boost::scoped_ptr<boost::thread> m_async_service_thread;
 
diff --git a/src/MQClientAPIImpl.cpp b/src/MQClientAPIImpl.cpp
index 26ab1b0..f2829a9 100644
--- a/src/MQClientAPIImpl.cpp
+++ b/src/MQClientAPIImpl.cpp
@@ -836,11 +836,16 @@ void MQClientAPIImpl::consumerSendMessageBack(const string addr,
                                               const string& consumerGroup,
                                               int delayLevel,
                                               int timeoutMillis,
+                                              int maxReconsumeTimes,
                                               const SessionCredentials& sessionCredentials) {
   ConsumerSendMsgBackRequestHeader* pRequestHeader = new ConsumerSendMsgBackRequestHeader();
   pRequestHeader->group = consumerGroup;
   pRequestHeader->offset = msg.getCommitLogOffset();
   pRequestHeader->delayLevel = delayLevel;
+  pRequestHeader->unitMode = false;
+  pRequestHeader->originTopic = msg.getTopic();
+  pRequestHeader->originMsgId = msg.getMsgId();
+  pRequestHeader->maxReconsumeTimes = maxReconsumeTimes;
 
   // string addr = socketAddress2IPPort(msg.getStoreHost());
   RemotingCommand request(CONSUMER_SEND_MSG_BACK, pRequestHeader);
diff --git a/src/MQClientAPIImpl.h b/src/MQClientAPIImpl.h
index 9f08dac..3fbf566 100644
--- a/src/MQClientAPIImpl.h
+++ b/src/MQClientAPIImpl.h
@@ -174,6 +174,7 @@ class MQClientAPIImpl {
                                        const string& consumerGroup,
                                        int delayLevel,
                                        int timeoutMillis,
+                                       int maxReconsumeTimes,
                                        const SessionCredentials& sessionCredentials);
 
   virtual void lockBatchMQ(const string& addr,
diff --git a/src/consumer/ConsumeMessageConcurrentlyService.cpp b/src/consumer/ConsumeMessageConcurrentlyService.cpp
index 371faa2..9c3a05b 100644
--- a/src/consumer/ConsumeMessageConcurrentlyService.cpp
+++ b/src/consumer/ConsumeMessageConcurrentlyService.cpp
@@ -194,6 +194,9 @@ void ConsumeMessageConcurrentlyService::ConsumeRequest(boost::weak_ptr<PullReque
                   msgs[i].getReconsumeTimes());
         if (m_pConsumer->getConsumeType() == CONSUME_PASSIVELY) {
           string brokerName = request->m_messageQueue.getBrokerName();
+          if (m_pConsumer->isUseNameSpaceMode()) {
+            MessageAccessor::withNameSpace(msgs[i], m_pConsumer->getNameSpace());
+          }
           if (!m_pConsumer->sendMessageBack(msgs[i], 0, brokerName)) {
             LOG_WARN("Send message back fail, MQ is:%s, its msgId is:%s, index is:%d, re-consume times is:%d",
                      (request->m_messageQueue).toString().c_str(), msgs[i].getMsgId().c_str(), i,
diff --git a/src/consumer/DefaultMQPushConsumer.cpp b/src/consumer/DefaultMQPushConsumer.cpp
index df77cac..8e2541e 100644
--- a/src/consumer/DefaultMQPushConsumer.cpp
+++ b/src/consumer/DefaultMQPushConsumer.cpp
@@ -258,8 +258,8 @@ bool DefaultMQPushConsumer::sendMessageBack(MQMessageExt& msg, int delayLevel, s
   else
     brokerAddr = socketAddress2IPPort(msg.getStoreHost());
   try {
-    getFactory()->getMQClientAPIImpl()->consumerSendMessageBack(brokerAddr, msg, getGroupName(), delayLevel, 3000,
-                                                                getSessionCredentials());
+    getFactory()->getMQClientAPIImpl()->consumerSendMessageBack(brokerAddr, msg, getGroupName(), delayLevel,
+                                                                getMaxReconsumeTimes(), 3000, getSessionCredentials());
   } catch (MQException& e) {
     LOG_ERROR(e.what());
     return false;
@@ -918,6 +918,21 @@ void DefaultMQPushConsumer::setConsumeThreadCount(int threadCount) {
 int DefaultMQPushConsumer::getConsumeThreadCount() const {
   return m_consumeThreadCount;
 }
+void DefaultMQPushConsumer::setMaxReconsumeTimes(int maxReconsumeTimes) {
+  if (maxReconsumeTimes > 0) {
+    m_maxReconsumeTimes = maxReconsumeTimes;
+  } else {
+    LOG_ERROR("set maxReconsumeTimes with invalid value");
+  }
+}
+
+int DefaultMQPushConsumer::getMaxReconsumeTimes() const {
+  if (m_maxReconsumeTimes >= 0) {
+    return m_maxReconsumeTimes;
+  }
+  // return 16 as default;
+  return 16;
+}
 
 void DefaultMQPushConsumer::setPullMsgThreadPoolCount(int threadCount) {
   m_pullMsgThreadPoolNum = threadCount;
diff --git a/src/protocol/CommandHeader.cpp b/src/protocol/CommandHeader.cpp
index 1360d04..30dcf59 100644
--- a/src/protocol/CommandHeader.cpp
+++ b/src/protocol/CommandHeader.cpp
@@ -492,16 +492,20 @@ void ConsumerSendMsgBackRequestHeader::Encode(Json::Value& outData) {
   outData["group"] = group;
   outData["delayLevel"] = delayLevel;
   outData["offset"] = UtilAll::to_string(offset);
-#ifdef ONS
+  outData["unitMode"] = UtilAll::to_string(unitMode);
   outData["originMsgId"] = originMsgId;
   outData["originTopic"] = originTopic;
-#endif
+  outData["maxReconsumeTimes"] = maxReconsumeTimes;
 }
 
 void ConsumerSendMsgBackRequestHeader::SetDeclaredFieldOfCommandHeader(map<string, string>& requestMap) {
   requestMap.insert(pair<string, string>("group", group));
   requestMap.insert(pair<string, string>("delayLevel", UtilAll::to_string(delayLevel)));
   requestMap.insert(pair<string, string>("offset", UtilAll::to_string(offset)));
+  requestMap.insert(pair<string, string>("unitMode", UtilAll::to_string(unitMode)));
+  requestMap.insert(pair<string, string>("originMsgId", originMsgId));
+  requestMap.insert(pair<string, string>("originTopic", originTopic));
+  requestMap.insert(pair<string, string>("maxReconsumeTimes", UtilAll::to_string(maxReconsumeTimes)));
 }
 //<!***************************************************************************
 void GetConsumerListByGroupResponseBody::Decode(const MemoryBlock* mem, vector<string>& cids) {
diff --git a/src/protocol/CommandHeader.h b/src/protocol/CommandHeader.h
index cedaeb9..bf74e07 100644
--- a/src/protocol/CommandHeader.h
+++ b/src/protocol/CommandHeader.h
@@ -456,6 +456,10 @@ class ConsumerSendMsgBackRequestHeader : public CommandHeader {
   string group;
   int delayLevel;
   int64 offset;
+  bool unitMode = false;
+  string originMsgId;
+  string originTopic;
+  int maxReconsumeTimes = 16;
 };
 
 //<!***************************************************************************
diff --git a/test/src/MQClientAPIImpTest.cpp b/test/src/MQClientAPIImpTest.cpp
index cf63f67..3b2ab07 100644
--- a/test/src/MQClientAPIImpTest.cpp
+++ b/test/src/MQClientAPIImpTest.cpp
@@ -170,6 +170,26 @@ TEST(MQClientAPIImplTest, sendMessage) {
   EXPECT_EQ(result.getMessageQueue().getBrokerName(), "testBroker");
   EXPECT_EQ(result.getMessageQueue().getTopic(), "testTopic");
 }
+
+TEST(MQClientAPIImplTest, consumerSendMessageBack) {
+  SessionCredentials sc;
+  MQMessageExt msg;
+  MockMQClientAPIImpl* impl = MockMQClientAPIImplUtil::GetInstance()->GetGtestMockClientAPIImpl();
+  Mock::AllowLeak(impl);
+  MockTcpRemotingClient* pClient = MockMQClientAPIImplUtil::GetInstance()->GetGtestMockRemotingClient();
+  Mock::AllowLeak(pClient);
+  RemotingCommand* pCommandFailed = new RemotingCommand(SYSTEM_ERROR, nullptr);
+  RemotingCommand* pCommandSuccuss = new RemotingCommand(SUCCESS_VALUE, nullptr);
+  EXPECT_CALL(*pClient, invokeSync(_, _, _))
+      .Times(3)
+      .WillOnce(Return(nullptr))
+      .WillOnce(Return(pCommandFailed))
+      .WillOnce(Return(pCommandSuccuss));
+  EXPECT_ANY_THROW(impl->consumerSendMessageBack("127.0.0.0:10911", msg, "testGroup", 0, 1000, 16, sc));
+  EXPECT_ANY_THROW(impl->consumerSendMessageBack("127.0.0.0:10911", msg, "testGroup", 0, 1000, 16, sc));
+  EXPECT_NO_THROW(impl->consumerSendMessageBack("127.0.0.0:10911", msg, "testGroup", 0, 1000, 16, sc));
+}
+
 int main(int argc, char* argv[]) {
   InitGoogleMock(&argc, argv);
   testing::GTEST_FLAG(filter) = "MQClientAPIImplTest.*";
diff --git a/test/src/protocol/CommandHeaderTest.cpp b/test/src/protocol/CommandHeaderTest.cpp
index ccfa5ba..6874adf 100644
--- a/test/src/protocol/CommandHeaderTest.cpp
+++ b/test/src/protocol/CommandHeaderTest.cpp
@@ -67,6 +67,43 @@ using rocketmq::UnregisterClientRequestHeader;
 using rocketmq::UpdateConsumerOffsetRequestHeader;
 using rocketmq::ViewMessageRequestHeader;
 
+TEST(commandHeader, ConsumerSendMsgBackRequestHeader) {
+  string group = "testGroup";
+  int delayLevel = 2;
+  int64 offset = 3027;
+  bool unitMode = true;
+  string originMsgId = "testOriginMsgId";
+  string originTopic = "testTopic";
+  int maxReconsumeTimes = 12;
+  ConsumerSendMsgBackRequestHeader header;
+  header.group = group;
+  header.delayLevel = delayLevel;
+  header.offset = offset;
+  header.unitMode = unitMode;
+  header.originMsgId = originMsgId;
+  header.originTopic = originTopic;
+  header.maxReconsumeTimes = maxReconsumeTimes;
+  map<string, string> requestMap;
+  header.SetDeclaredFieldOfCommandHeader(requestMap);
+  EXPECT_EQ(requestMap["group"], group);
+  EXPECT_EQ(requestMap["delayLevel"], "2");
+  EXPECT_EQ(requestMap["offset"], "3027");
+  EXPECT_EQ(requestMap["unitMode"], "1");
+  EXPECT_EQ(requestMap["originMsgId"], originMsgId);
+  EXPECT_EQ(requestMap["originTopic"], originTopic);
+  EXPECT_EQ(requestMap["maxReconsumeTimes"], "12");
+
+  Value outData;
+  header.Encode(outData);
+  EXPECT_EQ(outData["group"], group);
+  EXPECT_EQ(outData["delayLevel"], 2);
+  EXPECT_EQ(outData["offset"], "3027");
+  EXPECT_EQ(outData["unitMode"], "1");
+  EXPECT_EQ(outData["originMsgId"], originMsgId);
+  EXPECT_EQ(outData["originTopic"], originTopic);
+  EXPECT_EQ(outData["maxReconsumeTimes"], 12);
+}
+
 TEST(commandHeader, GetRouteInfoRequestHeader) {
   GetRouteInfoRequestHeader header("testTopic");
   map<string, string> requestMap;


Mime
View raw message