rocketmq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ding...@apache.org
Subject [rocketmq-client-cpp] branch master updated: feat(callback): use start pointer to manager callbacks (#232)
Date Wed, 15 Jan 2020 06:57:01 GMT
This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-cpp.git


The following commit(s) were added to refs/heads/master by this push:
     new f968f57  feat(callback): use start pointer to manager callbacks (#232)
f968f57 is described below

commit f968f57b414726526ac081cd2ca6ff548d1591da
Author: dinglei <libya_003@163.com>
AuthorDate: Wed Jan 15 14:56:53 2020 +0800

    feat(callback): use start pointer to manager callbacks (#232)
---
 src/MQClientAPIImpl.cpp                   | 15 ++++----
 src/common/AsyncCallbackWrap.cpp          |  8 ++--
 src/common/AsyncCallbackWrap.h            | 12 +++---
 src/transport/ResponseFuture.cpp          | 12 ++----
 src/transport/ResponseFuture.h            |  6 +--
 src/transport/TcpRemotingClient.cpp       | 10 ++---
 src/transport/TcpRemotingClient.h         |  2 +-
 test/src/MQClientAPIImpTest.cpp           | 64 +++++++++++++++++++++++++++++++
 test/src/transport/ResponseFutureTest.cpp |  8 ++--
 9 files changed, 98 insertions(+), 39 deletions(-)

diff --git a/src/MQClientAPIImpl.cpp b/src/MQClientAPIImpl.cpp
index f2829a9..8d72d64 100644
--- a/src/MQClientAPIImpl.cpp
+++ b/src/MQClientAPIImpl.cpp
@@ -414,11 +414,12 @@ void MQClientAPIImpl::sendMessageAsync(const string& addr,
                                        int retrySendTimes) {
   int64 begin_time = UtilAll::currentTimeMillis();
   //<!delete in future;
-  AsyncCallbackWrap* cbw = new SendCallbackWrap(brokerName, msg, pSendCallback, this);
+  // AsyncCallbackWrap* cbw = new SendCallbackWrap(brokerName, msg, pSendCallback, this);
 
   LOG_DEBUG("sendMessageAsync request:%s, timeout:%lld, maxRetryTimes:%d retrySendTimes:%d", request.ToString().data(),
             timeoutMilliseconds, maxRetryTimes, retrySendTimes);
-
+  // Use smart ptr to control cbw.
+  std::shared_ptr<AsyncCallbackWrap> cbw = std::make_shared<SendCallbackWrap>(brokerName, msg, pSendCallback, this);
   if (m_pRemotingClient->invokeAsync(addr, request, cbw, timeoutMilliseconds, maxRetryTimes, retrySendTimes) == false) {
     LOG_WARN("invokeAsync failed to addr:%s,topic:%s, timeout:%lld, maxRetryTimes:%d, retrySendTimes:%d", addr.c_str(),
              msg.getTopic().data(), timeoutMilliseconds, maxRetryTimes, retrySendTimes);
@@ -441,9 +442,9 @@ void MQClientAPIImpl::sendMessageAsync(const string& addr,
     LOG_ERROR("sendMessageAsync failed to addr:%s,topic:%s, timeout:%lld, maxRetryTimes:%d, retrySendTimes:%d",
               addr.c_str(), msg.getTopic().data(), time_out, maxRetryTimes, retrySendTimes);
 
-    if (cbw) {
+    if (cbw && pSendCallback != nullptr) {
       cbw->onException();
-      deleteAndZero(cbw);
+      // deleteAndZero(cbw);
     } else {
       THROW_MQEXCEPTION(MQClientException, "sendMessageAsync failed", -1);
     }
@@ -481,12 +482,12 @@ void MQClientAPIImpl::pullMessageAsync(const string& addr,
                                        int timeoutMillis,
                                        PullCallback* pullCallback,
                                        void* pArg) {
-  //<!delete in future;
-  AsyncCallbackWrap* cbw = new PullCallbackWarp(pullCallback, this, pArg);
+  // AsyncCallbackWrap* cbw = new PullCallbackWrap(pullCallback, this, pArg);
+  std::shared_ptr<AsyncCallbackWrap> cbw = std::make_shared<PullCallbackWrap>(pullCallback, this, pArg);
   if (m_pRemotingClient->invokeAsync(addr, request, cbw, timeoutMillis) == false) {
     LOG_ERROR("pullMessageAsync failed of addr:%s, mq:%s", addr.c_str(),
               static_cast<AsyncArg*>(pArg)->mq.toString().data());
-    deleteAndZero(cbw);
+    // deleteAndZero(cbw);
     THROW_MQEXCEPTION(MQClientException, "pullMessageAsync failed", -1);
   }
 }
diff --git a/src/common/AsyncCallbackWrap.cpp b/src/common/AsyncCallbackWrap.cpp
index cffa219..4fe29a9 100644
--- a/src/common/AsyncCallbackWrap.cpp
+++ b/src/common/AsyncCallbackWrap.cpp
@@ -129,14 +129,14 @@ void SendCallbackWrap::operationComplete(ResponseFuture* pResponseFuture, bool b
 }
 
 //<!************************************************************************
-PullCallbackWarp::PullCallbackWarp(AsyncCallback* pAsyncCallback, MQClientAPIImpl* pclientAPI, void* pArg)
+PullCallbackWrap::PullCallbackWrap(AsyncCallback* pAsyncCallback, MQClientAPIImpl* pclientAPI, void* pArg)
     : AsyncCallbackWrap(pAsyncCallback, pclientAPI) {
   m_pArg = *static_cast<AsyncArg*>(pArg);
 }
 
-PullCallbackWarp::~PullCallbackWarp() {}
+PullCallbackWrap::~PullCallbackWrap() {}
 
-void PullCallbackWarp::onException() {
+void PullCallbackWrap::onException() {
   if (m_pAsyncCallBack == NULL)
     return;
 
@@ -149,7 +149,7 @@ void PullCallbackWarp::onException() {
   }
 }
 
-void PullCallbackWarp::operationComplete(ResponseFuture* pResponseFuture, bool bProducePullRequest) {
+void PullCallbackWrap::operationComplete(ResponseFuture* pResponseFuture, bool bProducePullRequest) {
   unique_ptr<RemotingCommand> pResponse(pResponseFuture->getCommand());
   if (m_pAsyncCallBack == NULL) {
     LOG_ERROR("m_pAsyncCallBack is NULL, AsyncPull could not continue");
diff --git a/src/common/AsyncCallbackWrap.h b/src/common/AsyncCallbackWrap.h
index c4b3f66..8802fdb 100644
--- a/src/common/AsyncCallbackWrap.h
+++ b/src/common/AsyncCallbackWrap.h
@@ -29,7 +29,7 @@ class ResponseFuture;
 class MQClientAPIImpl;
 class DefaultMQProducer;
 //<!***************************************************************************
-enum asyncCallBackType { asyncCallbackWrap = 0, sendCallbackWrap = 1, pullCallbackWarp = 2 };
+enum asyncCallBackType { asyncCallbackWrap = 0, sendCallbackWrap = 1, pullCallbackWrap = 2 };
 
 struct AsyncCallbackWrap {
  public:
@@ -50,7 +50,7 @@ class SendCallbackWrap : public AsyncCallbackWrap {
   SendCallbackWrap(const string& brokerName,
                    const MQMessage& msg,
                    AsyncCallback* pAsyncCallback,
-                   MQClientAPIImpl* pclientAPI);
+                   MQClientAPIImpl* pClientAPI);
 
   virtual ~SendCallbackWrap(){};
   virtual void operationComplete(ResponseFuture* pResponseFuture, bool bProducePullRequest);
@@ -63,13 +63,13 @@ class SendCallbackWrap : public AsyncCallbackWrap {
 };
 
 //<!***************************************************************************
-class PullCallbackWarp : public AsyncCallbackWrap {
+class PullCallbackWrap : public AsyncCallbackWrap {
  public:
-  PullCallbackWarp(AsyncCallback* pAsyncCallback, MQClientAPIImpl* pclientAPI, void* pArg);
-  virtual ~PullCallbackWarp();
+  PullCallbackWrap(AsyncCallback* pAsyncCallback, MQClientAPIImpl* pClientAPI, void* pArg);
+  virtual ~PullCallbackWrap();
   virtual void operationComplete(ResponseFuture* pResponseFuture, bool bProducePullRequest);
   virtual void onException();
-  virtual asyncCallBackType getCallbackType() { return pullCallbackWarp; }
+  virtual asyncCallBackType getCallbackType() { return pullCallbackWrap; }
 
  private:
   AsyncArg m_pArg;
diff --git a/src/transport/ResponseFuture.cpp b/src/transport/ResponseFuture.cpp
old mode 100755
new mode 100644
index b0b2613..8ac926b
--- a/src/transport/ResponseFuture.cpp
+++ b/src/transport/ResponseFuture.cpp
@@ -29,7 +29,7 @@ ResponseFuture::ResponseFuture(int requestCode,
                                TcpRemotingClient* powner,
                                int64 timeout,
                                bool bAsync,
-                               AsyncCallbackWrap* pCallback)
+                               std::shared_ptr<AsyncCallbackWrap> pCallback)
     : m_requestCode(requestCode),
       m_opaque(opaque),
       m_timeout(timeout),
@@ -45,13 +45,7 @@ ResponseFuture::ResponseFuture(int requestCode,
   m_beginTimestamp = UtilAll::currentTimeMillis();
 }
 
-ResponseFuture::~ResponseFuture() {
-  deleteAndZero(m_pCallbackWrap);
-  /*
-    do not delete m_pResponseCommand when destruct, as m_pResponseCommand
-    is used by MQClientAPIImpl concurrently, and will be released by producer or consumer;
-   */
-}
+ResponseFuture::~ResponseFuture() {}
 
 void ResponseFuture::releaseThreadCondition() {
   m_defaultEvent.notify_all();
@@ -174,7 +168,7 @@ RemotingCommand* ResponseFuture::getCommand() const {
   return m_pResponseCommand;
 }
 
-AsyncCallbackWrap* ResponseFuture::getAsyncCallbackWrap() {
+std::shared_ptr<AsyncCallbackWrap> ResponseFuture::getAsyncCallbackWrap() {
   return m_pCallbackWrap;
 }
 
diff --git a/src/transport/ResponseFuture.h b/src/transport/ResponseFuture.h
old mode 100755
new mode 100644
index 66be663..a2a4a12
--- a/src/transport/ResponseFuture.h
+++ b/src/transport/ResponseFuture.h
@@ -41,7 +41,7 @@ class ResponseFuture {
                  TcpRemotingClient* powner,
                  int64 timeoutMilliseconds,
                  bool bAsync = false,
-                 AsyncCallbackWrap* pCallback = nullptr);
+                 std::shared_ptr<AsyncCallbackWrap> pCallback = std::shared_ptr<AsyncCallbackWrap>());
   virtual ~ResponseFuture();
 
   void releaseThreadCondition();
@@ -63,7 +63,7 @@ class ResponseFuture {
   int getRetrySendTimes() const;
   int64 leftTime() const;
   const bool getAsyncFlag();
-  AsyncCallbackWrap* getAsyncCallbackWrap();
+  std::shared_ptr<AsyncCallbackWrap> getAsyncCallbackWrap();
 
   void setMaxRetrySendTimes(int maxRetryTimes);
   void setRetrySendTimes(int retryTimes);
@@ -78,7 +78,7 @@ class ResponseFuture {
   int64 m_timeout;  // ms
 
   const bool m_bAsync;
-  AsyncCallbackWrap* m_pCallbackWrap;
+  std::shared_ptr<AsyncCallbackWrap> m_pCallbackWrap;
 
   AsyncCallbackStatus m_asyncCallbackStatus;
   std::mutex m_asyncCallbackLock;
diff --git a/src/transport/TcpRemotingClient.cpp b/src/transport/TcpRemotingClient.cpp
old mode 100755
new mode 100644
index 0207bbe..4c708f7
--- a/src/transport/TcpRemotingClient.cpp
+++ b/src/transport/TcpRemotingClient.cpp
@@ -159,8 +159,8 @@ bool TcpRemotingClient::invokeHeartBeat(const string& addr, RemotingCommand& req
   if (pTcp != nullptr) {
     int code = request.getCode();
     int opaque = request.getOpaque();
-
-    std::shared_ptr<ResponseFuture> responseFuture(new ResponseFuture(code, opaque, this, timeoutMillis));
+    std::shared_ptr<AsyncCallbackWrap> cbw;
+    std::shared_ptr<ResponseFuture> responseFuture(new ResponseFuture(code, opaque, this, timeoutMillis, false, cbw));
     addResponseFuture(opaque, responseFuture);
 
     if (SendCommand(pTcp, request)) {
@@ -193,8 +193,8 @@ RemotingCommand* TcpRemotingClient::invokeSync(const string& addr, RemotingComma
   if (pTcp != nullptr) {
     int code = request.getCode();
     int opaque = request.getOpaque();
-
-    std::shared_ptr<ResponseFuture> responseFuture(new ResponseFuture(code, opaque, this, timeoutMillis));
+    std::shared_ptr<AsyncCallbackWrap> cbw;
+    std::shared_ptr<ResponseFuture> responseFuture(new ResponseFuture(code, opaque, this, timeoutMillis, false, cbw));
     addResponseFuture(opaque, responseFuture);
 
     if (SendCommand(pTcp, request)) {
@@ -224,7 +224,7 @@ RemotingCommand* TcpRemotingClient::invokeSync(const string& addr, RemotingComma
 
 bool TcpRemotingClient::invokeAsync(const string& addr,
                                     RemotingCommand& request,
-                                    AsyncCallbackWrap* callback,
+                                    std::shared_ptr<AsyncCallbackWrap> callback,
                                     int64 timeoutMillis,
                                     int maxRetrySendTimes,
                                     int retrySendTimes) {
diff --git a/src/transport/TcpRemotingClient.h b/src/transport/TcpRemotingClient.h
index 265177f..66760b6 100644
--- a/src/transport/TcpRemotingClient.h
+++ b/src/transport/TcpRemotingClient.h
@@ -49,7 +49,7 @@ class TcpRemotingClient {
 
   virtual bool invokeAsync(const string& addr,
                            RemotingCommand& request,
-                           AsyncCallbackWrap* cbw,
+                           std::shared_ptr<AsyncCallbackWrap> cbw,
                            int64 timeoutMilliseconds,
                            int maxRetrySendTimes = 1,
                            int retrySendTimes = 1);
diff --git a/test/src/MQClientAPIImpTest.cpp b/test/src/MQClientAPIImpTest.cpp
index 3b2ab07..2c0687a 100644
--- a/test/src/MQClientAPIImpTest.cpp
+++ b/test/src/MQClientAPIImpTest.cpp
@@ -39,6 +39,7 @@ class MockTcpRemotingClient : public TcpRemotingClient {
       : TcpRemotingClient(pullThreadNum, tcpConnectTimeout, tcpTransportTryLockTimeout) {}
 
   MOCK_METHOD3(invokeSync, RemotingCommand*(const string&, RemotingCommand&, int));
+  MOCK_METHOD6(invokeAsync, bool(const string&, RemotingCommand&, std::shared_ptr<AsyncCallbackWrap>, int64, int, int));
 };
 class MockMQClientAPIImpl : public MQClientAPIImpl {
  public:
@@ -137,6 +138,18 @@ TEST(MQClientAPIImplTest, getMinOffset) {
   int64 offset = impl->getMinOffset("127.0.0.0:10911", "testTopic", 0, 1000, sc);
   EXPECT_EQ(2048, offset);
 }
+class MyMockAutoDeleteSendCallback : public AutoDeleteSendCallBack {
+ public:
+  virtual ~MyMockAutoDeleteSendCallback() {}
+  virtual void onSuccess(SendResult& sendResult) {
+    std::cout << "send Success" << std::endl;
+    return;
+  }
+  virtual void onException(MQException& e) {
+    std::cout << "send Exception" << e << std::endl;
+    return;
+  }
+};
 
 TEST(MQClientAPIImplTest, sendMessage) {
   string cid = "testClientId";
@@ -169,6 +182,57 @@ TEST(MQClientAPIImplTest, sendMessage) {
   EXPECT_EQ(result.getOffsetMsgId(), "MessageID");
   EXPECT_EQ(result.getMessageQueue().getBrokerName(), "testBroker");
   EXPECT_EQ(result.getMessageQueue().getTopic(), "testTopic");
+
+  // Try to test Async send
+
+  EXPECT_CALL(*pClient, invokeAsync(_, _, _, _, _, _))
+      .Times(7)
+      .WillOnce(Return(false))
+      .WillOnce(Return(true))
+      .WillOnce(Return(false))
+      .WillOnce(Return(true))
+      .WillOnce(Return(false))
+      .WillOnce(Return(false))
+      .WillOnce(Return(false));
+
+  SendMessageRequestHeader* requestHeader2 = new SendMessageRequestHeader();
+  requestHeader2->producerGroup = cid;
+  requestHeader2->topic = (message.getTopic());
+  requestHeader2->defaultTopic = DEFAULT_TOPIC;
+  requestHeader2->defaultTopicQueueNums = 4;
+  requestHeader2->bornTimestamp = UtilAll::currentTimeMillis();
+  EXPECT_ANY_THROW(
+      impl->sendMessage("127.0.0.0:10911", "testBroker", message, requestHeader2, 100, 1, ComMode_ASYNC, nullptr, sc));
+
+  SendMessageRequestHeader* requestHeader3 = new SendMessageRequestHeader();
+  requestHeader3->producerGroup = cid;
+  requestHeader3->topic = (message.getTopic());
+  requestHeader3->defaultTopic = DEFAULT_TOPIC;
+  requestHeader3->defaultTopicQueueNums = 4;
+  requestHeader3->bornTimestamp = UtilAll::currentTimeMillis();
+  SendCallback* pSendCallback = new MyMockAutoDeleteSendCallback();
+  EXPECT_NO_THROW(impl->sendMessage("127.0.0.0:10911", "testBroker", message, requestHeader3, 100, 1, ComMode_ASYNC,
+                                    pSendCallback, sc));
+
+  SendMessageRequestHeader* requestHeader4 = new SendMessageRequestHeader();
+  requestHeader4->producerGroup = cid;
+  requestHeader4->topic = (message.getTopic());
+  requestHeader4->defaultTopic = DEFAULT_TOPIC;
+  requestHeader4->defaultTopicQueueNums = 4;
+  requestHeader4->bornTimestamp = UtilAll::currentTimeMillis();
+  SendCallback* pSendCallback2 = new MyMockAutoDeleteSendCallback();
+  EXPECT_NO_THROW(impl->sendMessage("127.0.0.0:10911", "testBroker", message, requestHeader4, 1000, 2, ComMode_ASYNC,
+                                    pSendCallback2, sc));
+
+  SendMessageRequestHeader* requestHeader5 = new SendMessageRequestHeader();
+  requestHeader5->producerGroup = cid;
+  requestHeader5->topic = (message.getTopic());
+  requestHeader5->defaultTopic = DEFAULT_TOPIC;
+  requestHeader5->defaultTopicQueueNums = 4;
+  requestHeader5->bornTimestamp = UtilAll::currentTimeMillis();
+  SendCallback* pSendCallback3 = new MyMockAutoDeleteSendCallback();
+  EXPECT_NO_THROW(impl->sendMessage("127.0.0.0:10911", "testBroker", message, requestHeader5, 1000, 3, ComMode_ASYNC,
+                                    pSendCallback3, sc));
 }
 
 TEST(MQClientAPIImplTest, consumerSendMessageBack) {
diff --git a/test/src/transport/ResponseFutureTest.cpp b/test/src/transport/ResponseFutureTest.cpp
index 9c23ed8..058dbe7 100644
--- a/test/src/transport/ResponseFutureTest.cpp
+++ b/test/src/transport/ResponseFutureTest.cpp
@@ -68,8 +68,8 @@ TEST(responseFuture, init) {
   EXPECT_TRUE(responseFuture.getAsyncCallbackWrap() == nullptr);
 
   // ~ResponseFuture  delete pcall
-  SendCallbackWrap* pcall = new SendCallbackWrap("", MQMessage(), nullptr, nullptr);
-  ResponseFuture twoResponseFuture(13, 4, nullptr, 1000, true, pcall);
+  std::shared_ptr<AsyncCallbackWrap> callBack = std::make_shared<SendCallbackWrap>("", MQMessage(), nullptr, nullptr);
+  ResponseFuture twoResponseFuture(13, 4, nullptr, 1000, true, callBack);
   EXPECT_TRUE(twoResponseFuture.getAsyncFlag());
   EXPECT_FALSE(twoResponseFuture.getAsyncCallbackWrap() == nullptr);
 }
@@ -104,10 +104,10 @@ TEST(responseFuture, response) {
   ResponseFuture twoResponseFuture(13, 4, NULL, 1000, true);
   EXPECT_TRUE(twoResponseFuture.getAsyncFlag());
 
-  ResponseFuture threeSesponseFuture(13, 4, NULL, 1000);
+  ResponseFuture threeResponseFuture(13, 4, NULL, 1000);
 
   uint64_t millis = UtilAll::currentTimeMillis();
-  RemotingCommand* remotingCommand = threeSesponseFuture.waitResponse(10);
+  RemotingCommand* remotingCommand = threeResponseFuture.waitResponse(10);
   uint64_t useTime = UtilAll::currentTimeMillis() - millis;
   EXPECT_LT(useTime, 30);
 


Mime
View raw message