This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-cpp.git
The following commit(s) were added to refs/heads/master by this push:
new 46dd8f5 feat(version): add maxConsumerTimes to support higher client version (#230)
46dd8f5 is described below
commit 46dd8f597a25aafc27c488e48a9f335b8a64796c
Author: dinglei <libya_003@163.com>
AuthorDate: Tue Jan 14 12:43:21 2020 +0800
feat(version): add maxConsumerTimes to support higher client version (#230)
* feat(version): add maxConsumerTimes to support higher client version
* feat(version): add maxConsumerTimes to support higher client version
* feat(version): add maxConsumerTimes to support higher client version
---
include/DefaultMQPushConsumer.h | 3 ++
src/MQClientAPIImpl.cpp | 5 +++
src/MQClientAPIImpl.h | 1 +
src/consumer/ConsumeMessageConcurrentlyService.cpp | 3 ++
src/consumer/DefaultMQPushConsumer.cpp | 19 +++++++++--
src/protocol/CommandHeader.cpp | 8 +++--
src/protocol/CommandHeader.h | 4 +++
test/src/MQClientAPIImpTest.cpp | 20 ++++++++++++
test/src/protocol/CommandHeaderTest.cpp | 37 ++++++++++++++++++++++
9 files changed, 96 insertions(+), 4 deletions(-)
diff --git a/include/DefaultMQPushConsumer.h b/include/DefaultMQPushConsumer.h
index 2ff7620..7f69258 100644
--- a/include/DefaultMQPushConsumer.h
+++ b/include/DefaultMQPushConsumer.h
@@ -111,6 +111,8 @@ class ROCKETMQCLIENT_API DefaultMQPushConsumer : public MQConsumer {
*/
void setConsumeThreadCount(int threadCount);
int getConsumeThreadCount() const;
+ void setMaxReconsumeTimes(int maxReconsumeTimes);
+ int getMaxReconsumeTimes() const;
/*
set pullMsg thread count, default value is cpu cores
@@ -144,6 +146,7 @@ class ROCKETMQCLIENT_API DefaultMQPushConsumer : public MQConsumer {
MQMessageListener* m_pMessageListener;
int m_consumeMessageBatchMaxSize;
int m_maxMsgCacheSize;
+ int m_maxReconsumeTimes = -1;
boost::asio::io_service m_async_ioService;
boost::scoped_ptr<boost::thread> m_async_service_thread;
diff --git a/src/MQClientAPIImpl.cpp b/src/MQClientAPIImpl.cpp
index 26ab1b0..f2829a9 100644
--- a/src/MQClientAPIImpl.cpp
+++ b/src/MQClientAPIImpl.cpp
@@ -836,11 +836,16 @@ void MQClientAPIImpl::consumerSendMessageBack(const string addr,
const string& consumerGroup,
int delayLevel,
int timeoutMillis,
+ int maxReconsumeTimes,
const SessionCredentials& sessionCredentials)
{
ConsumerSendMsgBackRequestHeader* pRequestHeader = new ConsumerSendMsgBackRequestHeader();
pRequestHeader->group = consumerGroup;
pRequestHeader->offset = msg.getCommitLogOffset();
pRequestHeader->delayLevel = delayLevel;
+ pRequestHeader->unitMode = false;
+ pRequestHeader->originTopic = msg.getTopic();
+ pRequestHeader->originMsgId = msg.getMsgId();
+ pRequestHeader->maxReconsumeTimes = maxReconsumeTimes;
// string addr = socketAddress2IPPort(msg.getStoreHost());
RemotingCommand request(CONSUMER_SEND_MSG_BACK, pRequestHeader);
diff --git a/src/MQClientAPIImpl.h b/src/MQClientAPIImpl.h
index 9f08dac..3fbf566 100644
--- a/src/MQClientAPIImpl.h
+++ b/src/MQClientAPIImpl.h
@@ -174,6 +174,7 @@ class MQClientAPIImpl {
const string& consumerGroup,
int delayLevel,
int timeoutMillis,
+ int maxReconsumeTimes,
const SessionCredentials& sessionCredentials);
virtual void lockBatchMQ(const string& addr,
diff --git a/src/consumer/ConsumeMessageConcurrentlyService.cpp b/src/consumer/ConsumeMessageConcurrentlyService.cpp
index 371faa2..9c3a05b 100644
--- a/src/consumer/ConsumeMessageConcurrentlyService.cpp
+++ b/src/consumer/ConsumeMessageConcurrentlyService.cpp
@@ -194,6 +194,9 @@ void ConsumeMessageConcurrentlyService::ConsumeRequest(boost::weak_ptr<PullReque
msgs[i].getReconsumeTimes());
if (m_pConsumer->getConsumeType() == CONSUME_PASSIVELY) {
string brokerName = request->m_messageQueue.getBrokerName();
+ if (m_pConsumer->isUseNameSpaceMode()) {
+ MessageAccessor::withNameSpace(msgs[i], m_pConsumer->getNameSpace());
+ }
if (!m_pConsumer->sendMessageBack(msgs[i], 0, brokerName)) {
LOG_WARN("Send message back fail, MQ is:%s, its msgId is:%s, index is:%d, re-consume times is:%d",
(request->m_messageQueue).toString().c_str(), msgs[i].getMsgId().c_str(), i,
diff --git a/src/consumer/DefaultMQPushConsumer.cpp b/src/consumer/DefaultMQPushConsumer.cpp
index df77cac..8e2541e 100644
--- a/src/consumer/DefaultMQPushConsumer.cpp
+++ b/src/consumer/DefaultMQPushConsumer.cpp
@@ -258,8 +258,8 @@ bool DefaultMQPushConsumer::sendMessageBack(MQMessageExt& msg, int delayLevel, s
else
brokerAddr = socketAddress2IPPort(msg.getStoreHost());
try {
- getFactory()->getMQClientAPIImpl()->consumerSendMessageBack(brokerAddr, msg, getGroupName(), delayLevel, 3000,
- getSessionCredentials());
+ getFactory()->getMQClientAPIImpl()->consumerSendMessageBack(brokerAddr, msg, getGroupName(), delayLevel, 3000,
+ getMaxReconsumeTimes(), getSessionCredentials());
} catch (MQException& e) {
LOG_ERROR(e.what());
return false;
@@ -918,6 +918,21 @@ void DefaultMQPushConsumer::setConsumeThreadCount(int threadCount) {
int DefaultMQPushConsumer::getConsumeThreadCount() const {
return m_consumeThreadCount;
}
+void DefaultMQPushConsumer::setMaxReconsumeTimes(int maxReconsumeTimes) {
+ if (maxReconsumeTimes > 0) {
+ m_maxReconsumeTimes = maxReconsumeTimes;
+ } else {
+ LOG_ERROR("set maxReconsumeTimes with invalid value");
+ }
+}
+
+int DefaultMQPushConsumer::getMaxReconsumeTimes() const {
+ if (m_maxReconsumeTimes >= 0) {
+ return m_maxReconsumeTimes;
+ }
+ // return 16 as default;
+ return 16;
+}
void DefaultMQPushConsumer::setPullMsgThreadPoolCount(int threadCount) {
m_pullMsgThreadPoolNum = threadCount;
diff --git a/src/protocol/CommandHeader.cpp b/src/protocol/CommandHeader.cpp
index 1360d04..30dcf59 100644
--- a/src/protocol/CommandHeader.cpp
+++ b/src/protocol/CommandHeader.cpp
@@ -492,16 +492,20 @@ void ConsumerSendMsgBackRequestHeader::Encode(Json::Value& outData)
{
outData["group"] = group;
outData["delayLevel"] = delayLevel;
outData["offset"] = UtilAll::to_string(offset);
-#ifdef ONS
+ outData["unitMode"] = UtilAll::to_string(unitMode);
outData["originMsgId"] = originMsgId;
outData["originTopic"] = originTopic;
-#endif
+ outData["maxReconsumeTimes"] = maxReconsumeTimes;
}
void ConsumerSendMsgBackRequestHeader::SetDeclaredFieldOfCommandHeader(map<string, string>& requestMap) {
requestMap.insert(pair<string, string>("group", group));
requestMap.insert(pair<string, string>("delayLevel", UtilAll::to_string(delayLevel)));
requestMap.insert(pair<string, string>("offset", UtilAll::to_string(offset)));
+ requestMap.insert(pair<string, string>("unitMode", UtilAll::to_string(unitMode)));
+ requestMap.insert(pair<string, string>("originMsgId", originMsgId));
+ requestMap.insert(pair<string, string>("originTopic", originTopic));
+ requestMap.insert(pair<string, string>("maxReconsumeTimes", UtilAll::to_string(maxReconsumeTimes)));
}
//<!***************************************************************************
void GetConsumerListByGroupResponseBody::Decode(const MemoryBlock* mem, vector<string>& cids) {
diff --git a/src/protocol/CommandHeader.h b/src/protocol/CommandHeader.h
index cedaeb9..bf74e07 100644
--- a/src/protocol/CommandHeader.h
+++ b/src/protocol/CommandHeader.h
@@ -456,6 +456,10 @@ class ConsumerSendMsgBackRequestHeader : public CommandHeader {
string group;
int delayLevel;
int64 offset;
+ bool unitMode = false;
+ string originMsgId;
+ string originTopic;
+ int maxReconsumeTimes = 16;
};
//<!***************************************************************************
diff --git a/test/src/MQClientAPIImpTest.cpp b/test/src/MQClientAPIImpTest.cpp
index cf63f67..3b2ab07 100644
--- a/test/src/MQClientAPIImpTest.cpp
+++ b/test/src/MQClientAPIImpTest.cpp
@@ -170,6 +170,26 @@ TEST(MQClientAPIImplTest, sendMessage) {
EXPECT_EQ(result.getMessageQueue().getBrokerName(), "testBroker");
EXPECT_EQ(result.getMessageQueue().getTopic(), "testTopic");
}
+
+TEST(MQClientAPIImplTest, consumerSendMessageBack) {
+ SessionCredentials sc;
+ MQMessageExt msg;
+ MockMQClientAPIImpl* impl = MockMQClientAPIImplUtil::GetInstance()->GetGtestMockClientAPIImpl();
+ Mock::AllowLeak(impl);
+ MockTcpRemotingClient* pClient = MockMQClientAPIImplUtil::GetInstance()->GetGtestMockRemotingClient();
+ Mock::AllowLeak(pClient);
+ RemotingCommand* pCommandFailed = new RemotingCommand(SYSTEM_ERROR, nullptr);
+ RemotingCommand* pCommandSuccuss = new RemotingCommand(SUCCESS_VALUE, nullptr);
+ EXPECT_CALL(*pClient, invokeSync(_, _, _))
+ .Times(3)
+ .WillOnce(Return(nullptr))
+ .WillOnce(Return(pCommandFailed))
+ .WillOnce(Return(pCommandSuccuss));
+ EXPECT_ANY_THROW(impl->consumerSendMessageBack("127.0.0.0:10911", msg, "testGroup", 0, 1000, 16, sc));
+ EXPECT_ANY_THROW(impl->consumerSendMessageBack("127.0.0.0:10911", msg, "testGroup", 0, 1000, 16, sc));
+ EXPECT_NO_THROW(impl->consumerSendMessageBack("127.0.0.0:10911", msg, "testGroup", 0, 1000, 16, sc));
+}
+
int main(int argc, char* argv[]) {
InitGoogleMock(&argc, argv);
testing::GTEST_FLAG(filter) = "MQClientAPIImplTest.*";
diff --git a/test/src/protocol/CommandHeaderTest.cpp b/test/src/protocol/CommandHeaderTest.cpp
index ccfa5ba..6874adf 100644
--- a/test/src/protocol/CommandHeaderTest.cpp
+++ b/test/src/protocol/CommandHeaderTest.cpp
@@ -67,6 +67,43 @@ using rocketmq::UnregisterClientRequestHeader;
using rocketmq::UpdateConsumerOffsetRequestHeader;
using rocketmq::ViewMessageRequestHeader;
+TEST(commandHeader, ConsumerSendMsgBackRequestHeader) {
+ string group = "testGroup";
+ int delayLevel = 2;
+ int64 offset = 3027;
+ bool unitMode = true;
+ string originMsgId = "testOriginMsgId";
+ string originTopic = "testTopic";
+ int maxReconsumeTimes = 12;
+ ConsumerSendMsgBackRequestHeader header;
+ header.group = group;
+ header.delayLevel = delayLevel;
+ header.offset = offset;
+ header.unitMode = unitMode;
+ header.originMsgId = originMsgId;
+ header.originTopic = originTopic;
+ header.maxReconsumeTimes = maxReconsumeTimes;
+ map<string, string> requestMap;
+ header.SetDeclaredFieldOfCommandHeader(requestMap);
+ EXPECT_EQ(requestMap["group"], group);
+ EXPECT_EQ(requestMap["delayLevel"], "2");
+ EXPECT_EQ(requestMap["offset"], "3027");
+ EXPECT_EQ(requestMap["unitMode"], "1");
+ EXPECT_EQ(requestMap["originMsgId"], originMsgId);
+ EXPECT_EQ(requestMap["originTopic"], originTopic);
+ EXPECT_EQ(requestMap["maxReconsumeTimes"], "12");
+
+ Value outData;
+ header.Encode(outData);
+ EXPECT_EQ(outData["group"], group);
+ EXPECT_EQ(outData["delayLevel"], 2);
+ EXPECT_EQ(outData["offset"], "3027");
+ EXPECT_EQ(outData["unitMode"], "1");
+ EXPECT_EQ(outData["originMsgId"], originMsgId);
+ EXPECT_EQ(outData["originTopic"], originTopic);
+ EXPECT_EQ(outData["maxReconsumeTimes"], 12);
+}
+
TEST(commandHeader, GetRouteInfoRequestHeader) {
GetRouteInfoRequestHeader header("testTopic");
map<string, string> requestMap;
|