rocketmq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ifplu...@apache.org
Subject [rocketmq-client-cpp] 01/04: fix: leak of InvokeCallback
Date Wed, 24 Mar 2021 09:06:09 GMT
This is an automated email from the ASF dual-hosted git repository.

ifplusor pushed a commit to branch re_dev
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-cpp.git

commit 3bd226ee201a8e924e88420ba35de911f530ac60
Author: James Yin <ywhjames@hotmail.com>
AuthorDate: Thu Mar 11 01:32:53 2021 +0800

    fix: leak of InvokeCallback
---
 src/MQClientAPIImpl.cpp             | 32 ++++++++++----------------------
 src/MQClientAPIImpl.h               |  8 ++++----
 src/common/SendCallbackWrap.cpp     |  4 +---
 src/transport/ResponseFuture.cpp    | 11 +++++------
 src/transport/ResponseFuture.h      | 11 ++++++++---
 src/transport/TcpRemotingClient.cpp | 36 +++++++++++++++---------------------
 src/transport/TcpRemotingClient.h   |  6 +++---
 7 files changed, 46 insertions(+), 62 deletions(-)

diff --git a/src/MQClientAPIImpl.cpp b/src/MQClientAPIImpl.cpp
index 91c6dc1..e5372d0 100644
--- a/src/MQClientAPIImpl.cpp
+++ b/src/MQClientAPIImpl.cpp
@@ -159,21 +159,16 @@ void MQClientAPIImpl::sendMessageAsync(const std::string& addr,
                                        int64_t timeoutMillis,
                                        int retryTimesWhenSendFailed,
                                        DefaultMQProducerImplPtr producer) {
-  // delete in future
-  auto* cbw = new SendCallbackWrap(addr, brokerName, msg, std::forward<RemotingCommand>(request),
sendCallback,
-                                   topicPublishInfo, instance, retryTimesWhenSendFailed,
0, producer);
-
-  try {
-    sendMessageAsyncImpl(cbw, timeoutMillis);
-  } catch (RemotingException& e) {
-    deleteAndZero(cbw);
-    throw;
-  }
+  std::unique_ptr<InvokeCallback> cbw(
+      new SendCallbackWrap(addr, brokerName, msg, std::forward<RemotingCommand>(request),
sendCallback,
+                           topicPublishInfo, instance, retryTimesWhenSendFailed, 0, producer));
+  sendMessageAsyncImpl(cbw, timeoutMillis);
 }
 
-void MQClientAPIImpl::sendMessageAsyncImpl(SendCallbackWrap* cbw, int64_t timeoutMillis)
{
-  const auto& addr = cbw->getAddr();
-  auto& request = cbw->getRemotingCommand();
+void MQClientAPIImpl::sendMessageAsyncImpl(std::unique_ptr<InvokeCallback>& cbw,
int64_t timeoutMillis) {
+  auto* scbw = static_cast<SendCallbackWrap*>(cbw.get());
+  const auto& addr = scbw->getAddr();
+  auto& request = scbw->getRemotingCommand();
   remoting_client_->invokeAsync(addr, request, cbw, timeoutMillis);
 }
 
@@ -261,15 +256,8 @@ void MQClientAPIImpl::pullMessageAsync(const std::string& addr,
                                        RemotingCommand& request,
                                        int timeoutMillis,
                                        PullCallback* pullCallback) {
-  // delete in future
-  auto* cbw = new PullCallbackWrap(pullCallback, this);
-
-  try {
-    remoting_client_->invokeAsync(addr, request, cbw, timeoutMillis);
-  } catch (RemotingException& e) {
-    deleteAndZero(cbw);
-    throw;
-  }
+  std::unique_ptr<InvokeCallback> cbw(new PullCallbackWrap(pullCallback, this));
+  remoting_client_->invokeAsync(addr, request, cbw, timeoutMillis);
 }
 
 PullResult* MQClientAPIImpl::pullMessageSync(const std::string& addr, RemotingCommand&
request, int timeoutMillis) {
diff --git a/src/MQClientAPIImpl.h b/src/MQClientAPIImpl.h
index 143a6a5..535f33d 100644
--- a/src/MQClientAPIImpl.h
+++ b/src/MQClientAPIImpl.h
@@ -20,8 +20,8 @@
 #include "CommunicationMode.h"
 #include "DefaultMQProducerImpl.h"
 #include "KVTable.h"
-#include "MQException.h"
 #include "MQClientInstance.h"
+#include "MQException.h"
 #include "MQMessageExt.h"
 #include "PullCallback.h"
 #include "SendCallback.h"
@@ -29,8 +29,8 @@
 #include "TopicConfig.h"
 #include "TopicList.h"
 #include "TopicPublishInfo.hpp"
-#include "protocol/body/TopicRouteData.hpp"
 #include "protocol/body/LockBatchRequestBody.hpp"
+#include "protocol/body/TopicRouteData.hpp"
 #include "protocol/body/UnlockBatchRequestBody.hpp"
 #include "protocol/header/CommandHeader.h"
 #include "protocol/heartbeat/HeartbeatData.hpp"
@@ -40,7 +40,7 @@ namespace rocketmq {
 class TcpRemotingClient;
 class ClientRemotingProcessor;
 class RPCHook;
-class SendCallbackWrap;
+class InvokeCallback;
 
 /**
  * wrap all RPC API
@@ -182,7 +182,7 @@ class MQClientAPIImpl {
                         int retryTimesWhenSendFailed,
                         DefaultMQProducerImplPtr producer);
 
-  void sendMessageAsyncImpl(SendCallbackWrap* cbw, int64_t timeoutMillis);
+  void sendMessageAsyncImpl(std::unique_ptr<InvokeCallback>& cbw, int64_t timeoutMillis);
 
   PullResult* pullMessageSync(const std::string& addr, RemotingCommand& request,
int timeoutMillis);
 
diff --git a/src/common/SendCallbackWrap.cpp b/src/common/SendCallbackWrap.cpp
index 92fd6e1..562b30f 100644
--- a/src/common/SendCallbackWrap.cpp
+++ b/src/common/SendCallbackWrap.cpp
@@ -156,9 +156,7 @@ void SendCallbackWrap::onExceptionImpl(ResponseFuture* responseFuture,
       // resend
       addr_ = std::move(addr);
       broker_name_ = std::move(retryBrokerName);
-      instance_->getMQClientAPIImpl()->sendMessageAsyncImpl(this, timeoutMillis);
-
-      responseFuture->releaseInvokeCallback();  // for avoid delete this SendCallbackWrap
+      instance_->getMQClientAPIImpl()->sendMessageAsyncImpl(responseFuture->invoke_callback(),
timeoutMillis);
       return;
     } catch (MQException& e1) {
       producer->updateFaultItem(broker_name_, 3000, true);
diff --git a/src/transport/ResponseFuture.cpp b/src/transport/ResponseFuture.cpp
index 9384135..a2887be 100755
--- a/src/transport/ResponseFuture.cpp
+++ b/src/transport/ResponseFuture.cpp
@@ -20,11 +20,14 @@
 
 namespace rocketmq {
 
-ResponseFuture::ResponseFuture(int requestCode, int opaque, int64_t timeoutMillis, InvokeCallback*
invokeCallback)
+ResponseFuture::ResponseFuture(int requestCode,
+                               int opaque,
+                               int64_t timeoutMillis,
+                               std::unique_ptr<InvokeCallback> invokeCallback)
     : request_code_(requestCode),
       opaque_(opaque),
       timeout_millis_(timeoutMillis),
-      invoke_callback_(invokeCallback),
+      invoke_callback_(std::move(invokeCallback)),
       begin_timestamp_(UtilAll::currentTimeMillis()),
       send_request_ok_(false),
       response_command_(nullptr),
@@ -47,10 +50,6 @@ bool ResponseFuture::hasInvokeCallback() {
   return invoke_callback_ != nullptr;
 }
 
-InvokeCallback* ResponseFuture::releaseInvokeCallback() {
-  return invoke_callback_.release();
-}
-
 void ResponseFuture::executeInvokeCallback() noexcept {
   if (invoke_callback_ != nullptr) {
     invoke_callback_->operationComplete(this);
diff --git a/src/transport/ResponseFuture.h b/src/transport/ResponseFuture.h
index 56ffc67..f950743 100755
--- a/src/transport/ResponseFuture.h
+++ b/src/transport/ResponseFuture.h
@@ -17,9 +17,10 @@
 #ifndef ROCKETMQ_TRANSPORT_RESPONSEFUTURE_H_
 #define ROCKETMQ_TRANSPORT_RESPONSEFUTURE_H_
 
-#include "concurrent/latch.hpp"
+#include <memory>
 #include "InvokeCallback.h"
 #include "RemotingCommand.h"
+#include "concurrent/latch.hpp"
 
 namespace rocketmq {
 
@@ -28,13 +29,15 @@ typedef std::shared_ptr<ResponseFuture> ResponseFuturePtr;
 
 class ResponseFuture {
  public:
-  ResponseFuture(int requestCode, int opaque, int64_t timeoutMillis, InvokeCallback* invokeCallback
= nullptr);
+  ResponseFuture(int requestCode,
+                 int opaque,
+                 int64_t timeoutMillis,
+                 std::unique_ptr<InvokeCallback> invokeCallback = nullptr);
   virtual ~ResponseFuture();
 
   void releaseThreadCondition();
 
   bool hasInvokeCallback();
-  InvokeCallback* releaseInvokeCallback();
 
   void executeInvokeCallback() noexcept;
 
@@ -59,6 +62,8 @@ class ResponseFuture {
   inline bool send_request_ok() const { return send_request_ok_; }
   inline void set_send_request_ok(bool sendRequestOK = true) { send_request_ok_ = sendRequestOK;
};
 
+  inline std::unique_ptr<InvokeCallback>& invoke_callback() { return invoke_callback_;
}
+
  private:
   int request_code_;
   int opaque_;
diff --git a/src/transport/TcpRemotingClient.cpp b/src/transport/TcpRemotingClient.cpp
index c6c222d..655a811 100644
--- a/src/transport/TcpRemotingClient.cpp
+++ b/src/transport/TcpRemotingClient.cpp
@@ -279,7 +279,7 @@ std::unique_ptr<RemotingCommand> TcpRemotingClient::invokeSyncImpl(TcpTransportP
 
 void TcpRemotingClient::invokeAsync(const std::string& addr,
                                     RemotingCommand& request,
-                                    InvokeCallback* invokeCallback,
+                                    std::unique_ptr<InvokeCallback>& invokeCallback,
                                     int64_t timeoutMillis) {
   auto beginStartTime = UtilAll::currentTimeMillis();
   auto channel = GetTransport(addr);
@@ -304,33 +304,27 @@ void TcpRemotingClient::invokeAsync(const std::string& addr,
 void TcpRemotingClient::invokeAsyncImpl(TcpTransportPtr channel,
                                         RemotingCommand& request,
                                         int64_t timeoutMillis,
-                                        InvokeCallback* invokeCallback) {
+                                        std::unique_ptr<InvokeCallback>& invokeCallback)
{
   int code = request.code();
   int opaque = request.opaque();
 
   // delete in callback
-  auto responseFuture = std::make_shared<ResponseFuture>(code, opaque, timeoutMillis,
invokeCallback);
+  auto responseFuture = std::make_shared<ResponseFuture>(code, opaque, timeoutMillis,
std::move(invokeCallback));
   putResponseFuture(channel, opaque, responseFuture);
 
-  try {
-    if (SendCommand(channel, request)) {
-      responseFuture->set_send_request_ok(true);
-    } else {
-      // requestFail
-      responseFuture = popResponseFuture(channel, opaque);
-      if (responseFuture != nullptr) {
-        responseFuture->set_send_request_ok(false);
-        if (responseFuture->hasInvokeCallback()) {
-          handle_executor_.submit(std::bind(&ResponseFuture::executeInvokeCallback, responseFuture));
-        }
+  if (SendCommand(channel, request)) {
+    responseFuture->set_send_request_ok(true);
+  } else {
+    // request fail
+    responseFuture = popResponseFuture(channel, opaque);
+    if (responseFuture != nullptr) {
+      responseFuture->set_send_request_ok(false);
+      if (responseFuture->hasInvokeCallback()) {
+        handle_executor_.submit(std::bind(&ResponseFuture::executeInvokeCallback, responseFuture));
       }
-
-      LOG_WARN_NEW("send a request command to channel <{}> failed.", channel->getPeerAddrAndPort());
     }
-  } catch (const std::exception& e) {
-    LOG_WARN_NEW("send a request command to channel <{}> Exception.\n{}", channel->getPeerAddrAndPort(),
e.what());
-    THROW_MQEXCEPTION(RemotingSendRequestException, "send request to <" + channel->getPeerAddrAndPort()
+ "> failed",
-                      -1);
+
+    LOG_WARN_NEW("send a request command to channel <{}> failed.", channel->getPeerAddrAndPort());
   }
 }
 
@@ -553,7 +547,7 @@ bool TcpRemotingClient::CloseNameServerTransport(TcpTransportPtr channel)
{
   return removeItemFromTable;
 }
 
-bool TcpRemotingClient::SendCommand(TcpTransportPtr channel, RemotingCommand& msg) {
+bool TcpRemotingClient::SendCommand(TcpTransportPtr channel, RemotingCommand& msg) noexcept
{
   auto package = msg.encode();
   return channel->sendMessage(package->array(), package->size());
 }
diff --git a/src/transport/TcpRemotingClient.h b/src/transport/TcpRemotingClient.h
index 3ffbb66..7d86060 100755
--- a/src/transport/TcpRemotingClient.h
+++ b/src/transport/TcpRemotingClient.h
@@ -52,7 +52,7 @@ class TcpRemotingClient {
 
   void invokeAsync(const std::string& addr,
                    RemotingCommand& request,
-                   InvokeCallback* invokeCallback,
+                   std::unique_ptr<InvokeCallback>& invokeCallback,
                    int64_t timeoutMillis);
 
   void invokeOneway(const std::string& addr, RemotingCommand& request);
@@ -62,7 +62,7 @@ class TcpRemotingClient {
   std::vector<std::string> getNameServerAddressList() const { return namesrv_addr_list_;
}
 
  private:
-  static bool SendCommand(TcpTransportPtr channel, RemotingCommand& msg);
+  static bool SendCommand(TcpTransportPtr channel, RemotingCommand& msg) noexcept;
 
   void channelClosed(TcpTransportPtr channel);
 
@@ -88,7 +88,7 @@ class TcpRemotingClient {
   void invokeAsyncImpl(TcpTransportPtr channel,
                        RemotingCommand& request,
                        int64_t timeoutMillis,
-                       InvokeCallback* invokeCallback);
+                       std::unique_ptr<InvokeCallback>& invokeCallback);
   void invokeOnewayImpl(TcpTransportPtr channel, RemotingCommand& request);
 
   // rpc hook

Mime
View raw message