rocketmq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ifplu...@apache.org
Subject [rocketmq-client-cpp] branch re_dev updated: refactor: invoke async send in executor
Date Wed, 10 Mar 2021 07:36:08 GMT
This is an automated email from the ASF dual-hosted git repository.

ifplusor pushed a commit to branch re_dev
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-cpp.git


The following commit(s) were added to refs/heads/re_dev by this push:
     new 726b7dc  refactor: invoke async send in executor
726b7dc is described below

commit 726b7dc424e798d917d4f29b2639242633e3bd56
Author: James Yin <ywhjames@hotmail.com>
AuthorDate: Wed Mar 10 15:34:49 2021 +0800

    refactor: invoke async send in executor
---
 include/DefaultMQProducerConfig.h            |  25 +++---
 include/DefaultMQProducerConfigProxy.h       |  41 +++++----
 src/producer/DefaultMQProducerConfigImpl.hpp |  34 +++++---
 src/producer/DefaultMQProducerImpl.cpp       | 119 ++++++++++++++++-----------
 src/producer/DefaultMQProducerImpl.h         |   2 +
 5 files changed, 136 insertions(+), 85 deletions(-)

diff --git a/include/DefaultMQProducerConfig.h b/include/DefaultMQProducerConfig.h
index 46f9c2f..67c7231 100644
--- a/include/DefaultMQProducerConfig.h
+++ b/include/DefaultMQProducerConfig.h
@@ -32,36 +32,39 @@ class ROCKETMQCLIENT_API DefaultMQProducerConfig : virtual public MQClientConfig
  public:
   virtual ~DefaultMQProducerConfig() = default;
 
-  // if msgbody size larger than maxMsgBodySize, exception will be throwed
+  virtual int async_send_thread_nums() const = 0;
+  virtual void set_async_send_thread_nums(int async_send_thread_nums) = 0;
+
+  // if msgbody size larger than max_message_size, exception will be throwed
   virtual int max_message_size() const = 0;
-  virtual void set_max_message_size(int maxMessageSize) = 0;
+  virtual void set_max_message_size(int max_message_size) = 0;
 
   /*
-   * if msgBody size is large than m_compressMsgBodyOverHowmuch
-   *  rocketmq cpp will compress msgBody according to compressLevel
+   * if msgBody size is large than compress_msg_body_over_howmuch,
+   * sdk will compress message body according to compress_level
    */
   virtual int compress_msg_body_over_howmuch() const = 0;
-  virtual void set_compress_msg_body_over_howmuch(int compressMsgBodyOverHowmuch) = 0;
+  virtual void set_compress_msg_body_over_howmuch(int compress_msg_body_over_howmuch) = 0;
 
   virtual int compress_level() const = 0;
-  virtual void set_compress_level(int compressLevel) = 0;
+  virtual void set_compress_level(int compress_level) = 0;
 
   // set and get timeout of per msg
   virtual int send_msg_timeout() const = 0;
-  virtual void set_send_msg_timeout(int sendMsgTimeout) = 0;
+  virtual void set_send_msg_timeout(int send_msg_timeout) = 0;
 
   // set msg max retry times, default retry times is 5
   virtual int retry_times() const = 0;
-  virtual void set_retry_times(int times) = 0;
+  virtual void set_retry_times(int retry_times) = 0;
 
   virtual int retry_times_for_async() const = 0;
-  virtual void set_retry_times_for_async(int times) = 0;
+  virtual void set_retry_times_for_async(int retry_times) = 0;
 
   virtual bool retry_another_broker_when_not_store_ok() const = 0;
-  virtual void set_retry_another_broker_when_not_store_ok(bool retryAnotherBrokerWhenNotStoreOK) = 0;
+  virtual void set_retry_another_broker_when_not_store_ok(bool retry_another_broker_when_not_store_ok) = 0;
 
   virtual bool send_latency_fault_enable() const { return false; };
-  virtual void set_send_latency_fault_enable(bool sendLatencyFaultEnable){};
+  virtual void set_send_latency_fault_enable(bool send_latency_fault_enable){};
 };
 
 }  // namespace rocketmq
diff --git a/include/DefaultMQProducerConfigProxy.h b/include/DefaultMQProducerConfigProxy.h
index b879b43..8870e98 100644
--- a/include/DefaultMQProducerConfigProxy.h
+++ b/include/DefaultMQProducerConfigProxy.h
@@ -32,70 +32,79 @@ class ROCKETMQCLIENT_API DefaultMQProducerConfigProxy : public MQClientConfigPro
   DefaultMQProducerConfigProxy(DefaultMQProducerConfigPtr producerConfig) : MQClientConfigProxy(producerConfig) {}
   virtual ~DefaultMQProducerConfigProxy() = default;
 
+  int async_send_thread_nums() const override {
+    return dynamic_cast<DefaultMQProducerConfig*>(client_config_.get())->async_send_thread_nums();
+  }
+
+  void set_async_send_thread_nums(int async_send_thread_nums) override {
+    dynamic_cast<DefaultMQProducerConfig*>(client_config_.get())->set_async_send_thread_nums(async_send_thread_nums);
+  }
+
   int max_message_size() const override {
     return dynamic_cast<DefaultMQProducerConfig*>(client_config_.get())->max_message_size();
   }
 
-  void set_max_message_size(int maxMessageSize) override {
-    dynamic_cast<DefaultMQProducerConfig*>(client_config_.get())->set_max_message_size(maxMessageSize);
+  void set_max_message_size(int max_message_size) override {
+    dynamic_cast<DefaultMQProducerConfig*>(client_config_.get())->set_max_message_size(max_message_size);
   }
 
   int compress_msg_body_over_howmuch() const override {
     return dynamic_cast<DefaultMQProducerConfig*>(client_config_.get())->compress_msg_body_over_howmuch();
   }
 
-  void set_compress_msg_body_over_howmuch(int compressMsgBodyOverHowmuch) override {
+  void set_compress_msg_body_over_howmuch(int compress_msg_body_over_howmuch) override {
     dynamic_cast<DefaultMQProducerConfig*>(client_config_.get())
-        ->set_compress_msg_body_over_howmuch(compressMsgBodyOverHowmuch);
+        ->set_compress_msg_body_over_howmuch(compress_msg_body_over_howmuch);
   }
 
   int compress_level() const override {
     return dynamic_cast<DefaultMQProducerConfig*>(client_config_.get())->compress_level();
   }
 
-  void set_compress_level(int compressLevel) override {
-    dynamic_cast<DefaultMQProducerConfig*>(client_config_.get())->set_compress_level(compressLevel);
+  void set_compress_level(int compress_level) override {
+    dynamic_cast<DefaultMQProducerConfig*>(client_config_.get())->set_compress_level(compress_level);
   }
 
   int send_msg_timeout() const override {
     return dynamic_cast<DefaultMQProducerConfig*>(client_config_.get())->send_msg_timeout();
   }
 
-  void set_send_msg_timeout(int sendMsgTimeout) override {
-    dynamic_cast<DefaultMQProducerConfig*>(client_config_.get())->set_send_msg_timeout(sendMsgTimeout);
+  void set_send_msg_timeout(int send_msg_timeout) override {
+    dynamic_cast<DefaultMQProducerConfig*>(client_config_.get())->set_send_msg_timeout(send_msg_timeout);
   }
 
   int retry_times() const override {
     return dynamic_cast<DefaultMQProducerConfig*>(client_config_.get())->retry_times();
   }
 
-  void set_retry_times(int times) override {
-    dynamic_cast<DefaultMQProducerConfig*>(client_config_.get())->set_retry_times(times);
+  void set_retry_times(int retry_times) override {
+    dynamic_cast<DefaultMQProducerConfig*>(client_config_.get())->set_retry_times(retry_times);
   }
 
   int retry_times_for_async() const override {
     return dynamic_cast<DefaultMQProducerConfig*>(client_config_.get())->retry_times_for_async();
   }
 
-  void set_retry_times_for_async(int times) override {
-    dynamic_cast<DefaultMQProducerConfig*>(client_config_.get())->set_retry_times_for_async(times);
+  void set_retry_times_for_async(int retry_times) override {
+    dynamic_cast<DefaultMQProducerConfig*>(client_config_.get())->set_retry_times_for_async(retry_times);
   }
 
   bool retry_another_broker_when_not_store_ok() const override {
     return dynamic_cast<DefaultMQProducerConfig*>(client_config_.get())->retry_another_broker_when_not_store_ok();
   }
 
-  void set_retry_another_broker_when_not_store_ok(bool retryAnotherBrokerWhenNotStoreOK) override {
+  void set_retry_another_broker_when_not_store_ok(bool retry_another_broker_when_not_store_ok) override {
     dynamic_cast<DefaultMQProducerConfig*>(client_config_.get())
-        ->set_retry_another_broker_when_not_store_ok(retryAnotherBrokerWhenNotStoreOK);
+        ->set_retry_another_broker_when_not_store_ok(retry_another_broker_when_not_store_ok);
   }
 
   bool send_latency_fault_enable() const override {
     return dynamic_cast<DefaultMQProducerConfig*>(client_config_.get())->send_latency_fault_enable();
   }
 
-  void set_send_latency_fault_enable(bool sendLatencyFaultEnable) override {
-    dynamic_cast<DefaultMQProducerConfig*>(client_config_.get())->set_send_latency_fault_enable(sendLatencyFaultEnable);
+  void set_send_latency_fault_enable(bool send_latency_fault_enable) override {
+    dynamic_cast<DefaultMQProducerConfig*>(client_config_.get())
+        ->set_send_latency_fault_enable(send_latency_fault_enable);
   }
 
   inline DefaultMQProducerConfigPtr real_config() const {
diff --git a/src/producer/DefaultMQProducerConfigImpl.hpp b/src/producer/DefaultMQProducerConfigImpl.hpp
index 5b1f78d..00f3eff 100644
--- a/src/producer/DefaultMQProducerConfigImpl.hpp
+++ b/src/producer/DefaultMQProducerConfigImpl.hpp
@@ -18,6 +18,7 @@
 #define ROCKETMQ_PRODUCER_DEFAULTMQPRODUCERCONFIGIMPL_HPP_
 
 #include <algorithm>  // std::min, std::max
+#include <thread>
 
 #include "DefaultMQProducerConfig.h"
 #include "MQClientConfigImpl.hpp"
@@ -30,7 +31,8 @@ namespace rocketmq {
 class DefaultMQProducerConfigImpl : virtual public DefaultMQProducerConfig, public MQClientConfigImpl {
  public:
   DefaultMQProducerConfigImpl()
-      : max_message_size_(1024 * 1024 * 4),         // 4MB
+      : async_send_thread_nums_(std::min(4, (int)std::thread::hardware_concurrency())),
+        max_message_size_(1024 * 1024 * 4),         // 4MB
         compress_msg_body_over_howmuch_(1024 * 4),  // 4KB
         compress_level_(5),
         send_msg_timeout_(3000),
@@ -40,36 +42,44 @@ class DefaultMQProducerConfigImpl : virtual public DefaultMQProducerConfig, publ
 
   virtual ~DefaultMQProducerConfigImpl() = default;
 
+  int async_send_thread_nums() const override { return async_send_thread_nums_; }
+  void set_async_send_thread_nums(int async_send_thread_nums) override {
+    async_send_thread_nums_ = async_send_thread_nums;
+  }
+
   int max_message_size() const override { return max_message_size_; }
-  void set_max_message_size(int maxMessageSize) override { max_message_size_ = maxMessageSize; }
+  void set_max_message_size(int max_message_size) override { max_message_size_ = max_message_size; }
 
   int compress_msg_body_over_howmuch() const override { return compress_msg_body_over_howmuch_; }
-  void set_compress_msg_body_over_howmuch(int compressMsgBodyOverHowmuch) override {
-    compress_msg_body_over_howmuch_ = compressMsgBodyOverHowmuch;
+  void set_compress_msg_body_over_howmuch(int compress_msg_body_over_howmuch) override {
+    compress_msg_body_over_howmuch_ = compress_msg_body_over_howmuch;
   }
 
   int compress_level() const override { return compress_level_; }
-  void set_compress_level(int compressLevel) override {
-    if ((compressLevel >= 0 && compressLevel <= 9) || compressLevel == -1) {
-      compress_level_ = compressLevel;
+  void set_compress_level(int compress_level) override {
+    if ((compress_level >= 0 && compress_level <= 9) || compress_level == -1) {
+      compress_level_ = compress_level;
     }
   }
 
   int send_msg_timeout() const override { return send_msg_timeout_; }
-  void set_send_msg_timeout(int sendMsgTimeout) override { send_msg_timeout_ = sendMsgTimeout; }
+  void set_send_msg_timeout(int send_msg_timeout) override { send_msg_timeout_ = send_msg_timeout; }
 
   int retry_times() const override { return retry_times_; }
-  void set_retry_times(int times) override { retry_times_ = std::min(std::max(0, times), 15); }
+  void set_retry_times(int retry_times) override { retry_times_ = std::min(std::max(0, retry_times), 15); }
 
   int retry_times_for_async() const override { return retry_times_for_async_; }
-  void set_retry_times_for_async(int times) override { retry_times_for_async_ = std::min(std::max(0, times), 15); }
+  void set_retry_times_for_async(int retry_times) override {
+    retry_times_for_async_ = std::min(std::max(0, retry_times), 15);
+  }
 
   bool retry_another_broker_when_not_store_ok() const override { return retry_another_broker_when_not_store_ok_; }
-  void set_retry_another_broker_when_not_store_ok(bool retryAnotherBrokerWhenNotStoreOK) override {
-    retry_another_broker_when_not_store_ok_ = retryAnotherBrokerWhenNotStoreOK;
+  void set_retry_another_broker_when_not_store_ok(bool retry_another_broker_when_not_store_ok) override {
+    retry_another_broker_when_not_store_ok_ = retry_another_broker_when_not_store_ok;
   }
 
  protected:
+  int async_send_thread_nums_;
   int max_message_size_;                // default: 4 MB
   int compress_msg_body_over_howmuch_;  // default: 4 KB
   int compress_level_;
diff --git a/src/producer/DefaultMQProducerImpl.cpp b/src/producer/DefaultMQProducerImpl.cpp
index 20704f8..ade43d1 100644
--- a/src/producer/DefaultMQProducerImpl.cpp
+++ b/src/producer/DefaultMQProducerImpl.cpp
@@ -29,14 +29,14 @@
 #include "CorrelationIdUtil.hpp"
 #include "Logging.h"
 #include "MQClientAPIImpl.h"
-#include "MQException.h"
 #include "MQClientInstance.h"
 #include "MQClientManager.h"
-#include "MessageDecoder.h"
+#include "MQException.h"
 #include "MQFaultStrategy.h"
 #include "MQProtos.h"
 #include "MessageBatch.h"
 #include "MessageClientIDSetter.h"
+#include "MessageDecoder.h"
 #include "MessageSysFlag.h"
 #include "RequestFutureTable.h"
 #include "TopicPublishInfo.hpp"
@@ -92,7 +92,10 @@ DefaultMQProducerImpl::DefaultMQProducerImpl(DefaultMQProducerConfigPtr config)
     : DefaultMQProducerImpl(config, nullptr) {}
 
 DefaultMQProducerImpl::DefaultMQProducerImpl(DefaultMQProducerConfigPtr config, RPCHookPtr rpcHook)
-    : MQClientImpl(config, rpcHook), mq_fault_strategy_(new MQFaultStrategy()), check_transaction_executor_(nullptr) {}
+    : MQClientImpl(config, rpcHook),
+      mq_fault_strategy_(new MQFaultStrategy()),
+      async_send_executor_(nullptr),
+      check_transaction_executor_(nullptr) {}
 
 DefaultMQProducerImpl::~DefaultMQProducerImpl() = default;
 
@@ -120,11 +123,19 @@ void DefaultMQProducerImpl::start() {
           dynamic_cast<DefaultMQProducerConfig*>(client_config_.get())->group_name(), this);
       if (!registerOK) {
         service_state_ = CREATE_JUST;
-        THROW_MQEXCEPTION(MQClientException, "The producer group[" + client_config_->group_name() +
-                                                 "] has been created before, specify another name please.",
+        THROW_MQEXCEPTION(MQClientException,
+                          "The producer group[" + client_config_->group_name() +
+                              "] has been created before, specify another name please.",
                           -1);
       }
 
+      if (nullptr == async_send_executor_) {
+        async_send_executor_.reset(new thread_pool_executor(
+            "AsyncSendThread", dynamic_cast<DefaultMQProducerConfig*>(client_config_.get())->async_send_thread_nums(),
+            false));
+      }
+      async_send_executor_->startup();
+
       client_instance_->start();
 
       LOG_INFO_NEW("the producer [{}] start OK.", client_config_->group_name());
@@ -147,6 +158,9 @@ void DefaultMQProducerImpl::shutdown() {
   switch (service_state_) {
     case RUNNING: {
       LOG_INFO("DefaultMQProducerImpl shutdown");
+
+      async_send_executor_->shutdown();
+
       client_instance_->unregisterProducer(client_config_->group_name());
       client_instance_->shutdown();
 
@@ -209,15 +223,18 @@ void DefaultMQProducerImpl::send(MQMessage& msg, SendCallback* sendCallback) noe
 }
 
 void DefaultMQProducerImpl::send(MQMessage& msg, SendCallback* sendCallback, long timeout) noexcept {
-  try {
-    (void)sendDefaultImpl(msg.getMessageImpl(), ASYNC, sendCallback, timeout);
-  } catch (MQException& e) {
-    LOG_ERROR_NEW("send failed, exception:{}", e.what());
-    sendCallback->invokeOnException(e);
-  } catch (std::exception& e) {
-    LOG_FATAL_NEW("[BUG] encounter unexcepted exception: {}", e.what());
-    exit(-1);
-  }
+  auto msg_impl = msg.getMessageImpl();
+  async_send_executor_->submit([this, msg_impl, sendCallback, timeout] {
+    try {
+      (void)sendDefaultImpl(msg_impl, ASYNC, sendCallback, timeout);
+    } catch (MQException& e) {
+      LOG_ERROR_NEW("send failed, exception:{}", e.what());
+      sendCallback->invokeOnException(e);
+    } catch (std::exception& e) {
+      LOG_FATAL_NEW("[BUG] encounter unexcepted exception: {}", e.what());
+      exit(-1);
+    }
+  });
 }
 
 void DefaultMQProducerImpl::send(MQMessage& msg, const MQMessageQueue& mq, SendCallback* sendCallback) noexcept {
@@ -228,26 +245,30 @@ void DefaultMQProducerImpl::send(MQMessage& msg,
                                  const MQMessageQueue& mq,
                                  SendCallback* sendCallback,
                                  long timeout) noexcept {
-  try {
-    Validators::checkMessage(msg, dynamic_cast<DefaultMQProducerConfig*>(client_config_.get())->max_message_size());
+  auto msg_impl = msg.getMessageImpl();
+  async_send_executor_->submit([this, msg_impl, mq, sendCallback, timeout] {
+    try {
+      Validators::checkMessage(*msg_impl,
+                               dynamic_cast<DefaultMQProducerConfig*>(client_config_.get())->max_message_size());
 
-    if (msg.topic() != mq.topic()) {
-      THROW_MQEXCEPTION(MQClientException, "message's topic not equal mq's topic", -1);
-    }
+      if (msg_impl->topic() != mq.topic()) {
+        THROW_MQEXCEPTION(MQClientException, "message's topic not equal mq's topic", -1);
+      }
 
-    try {
-      sendKernelImpl(msg.getMessageImpl(), mq, ASYNC, sendCallback, nullptr, timeout);
-    } catch (MQBrokerException& e) {
-      std::string info = std::string("unknown exception, ") + e.what();
-      THROW_MQEXCEPTION(MQClientException, info, e.GetError());
+      try {
+        sendKernelImpl(msg_impl, mq, ASYNC, sendCallback, nullptr, timeout);
+      } catch (MQBrokerException& e) {
+        std::string info = std::string("unknown exception, ") + e.what();
+        THROW_MQEXCEPTION(MQClientException, info, e.GetError());
+      }
+    } catch (MQException& e) {
+      LOG_ERROR_NEW("send failed, exception:{}", e.what());
+      sendCallback->invokeOnException(e);
+    } catch (std::exception& e) {
+      LOG_FATAL_NEW("[BUG] encounter unexcepted exception: {}", e.what());
+      exit(-1);
     }
-  } catch (MQException& e) {
-    LOG_ERROR_NEW("send failed, exception:{}", e.what());
-    sendCallback->invokeOnException(e);
-  } catch (std::exception& e) {
-    LOG_FATAL_NEW("[BUG] encounter unexcepted exception: {}", e.what());
-    exit(-1);
-  }
+  });
 }
 
 void DefaultMQProducerImpl::sendOneway(MQMessage& msg) {
@@ -303,20 +324,23 @@ void DefaultMQProducerImpl::send(MQMessage& msg,
                                  void* arg,
                                  SendCallback* sendCallback,
                                  long timeout) noexcept {
-  try {
+  auto msg_impl = msg.getMessageImpl();
+  async_send_executor_->submit([this, msg_impl, selector, arg, sendCallback, timeout] {
     try {
-      sendSelectImpl(msg.getMessageImpl(), selector, arg, ASYNC, sendCallback, timeout);
-    } catch (MQBrokerException& e) {
-      std::string info = std::string("unknown exception, ") + e.what();
-      THROW_MQEXCEPTION(MQClientException, info, e.GetError());
+      try {
+        sendSelectImpl(msg_impl, selector, arg, ASYNC, sendCallback, timeout);
+      } catch (MQBrokerException& e) {
+        std::string info = std::string("unknown exception, ") + e.what();
+        THROW_MQEXCEPTION(MQClientException, info, e.GetError());
+      }
+    } catch (MQException& e) {
+      LOG_ERROR_NEW("send failed, exception:{}", e.what());
+      sendCallback->invokeOnException(e);
+    } catch (std::exception& e) {
+      LOG_FATAL_NEW("[BUG] encounter unexcepted exception: {}", e.what());
+      exit(-1);
     }
-  } catch (MQException& e) {
-    LOG_ERROR_NEW("send failed, exception:{}", e.what());
-    sendCallback->invokeOnException(e);
-  } catch (std::exception& e) {
-    LOG_FATAL_NEW("[BUG] encounter unexcepted exception: {}", e.what());
-    exit(-1);
-  }
+  });
 }
 
 void DefaultMQProducerImpl::sendOneway(MQMessage& msg, MessageQueueSelector* selector, void* arg) {
@@ -375,7 +399,10 @@ void DefaultMQProducerImpl::send(std::vector<MQMessage>& msgs, const MQMessageQu
   send(batchMessage, mq, sendCallback);
 }
 
-void DefaultMQProducerImpl::send(std::vector<MQMessage>& msgs, const MQMessageQueue& mq, SendCallback* sendCallback, long timeout) {
+void DefaultMQProducerImpl::send(std::vector<MQMessage>& msgs,
+                                 const MQMessageQueue& mq,
+                                 SendCallback* sendCallback,
+                                 long timeout) {
   MQMessage batchMessage(batch(msgs));
   send(batchMessage, mq, sendCallback, timeout);
 }
@@ -655,8 +682,8 @@ SendResult* DefaultMQProducerImpl::sendDefaultImpl(MessagePtr msg,
     }
 
     std::string info = "Send [" + UtilAll::to_string(times) + "] times, still failed, cost [" +
-                       UtilAll::to_string(UtilAll::currentTimeMillis() - beginTimestampFirst) + "]ms, Topic: " +
-                       msg->topic();
+                       UtilAll::to_string(UtilAll::currentTimeMillis() - beginTimestampFirst) +
+                       "]ms, Topic: " + msg->topic();
     THROW_MQEXCEPTION(MQClientException, info, -1);
   }
 
diff --git a/src/producer/DefaultMQProducerImpl.h b/src/producer/DefaultMQProducerImpl.h
index da96df5..57eef51 100644
--- a/src/producer/DefaultMQProducerImpl.h
+++ b/src/producer/DefaultMQProducerImpl.h
@@ -22,6 +22,7 @@
 #include "MQClientImpl.h"
 #include "MQProducerInner.h"
 #include "MessageBatch.h"
+#include "concurrent/executor.hpp"
 
 namespace rocketmq {
 
@@ -178,6 +179,7 @@ class DefaultMQProducerImpl : public std::enable_shared_from_this<DefaultMQProdu
 
  private:
   std::unique_ptr<MQFaultStrategy> mq_fault_strategy_;
+  std::unique_ptr<thread_pool_executor> async_send_executor_;
   std::unique_ptr<thread_pool_executor> check_transaction_executor_;
 };
 


Mime
View raw message