rocketmq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ifplu...@apache.org
Subject [rocketmq-client-cpp] 06/29: feat(request-reply): more request interface
Date Tue, 29 Dec 2020 03:36:23 GMT
This is an automated email from the ASF dual-hosted git repository.

ifplusor pushed a commit to branch re_dev
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-cpp.git

commit c3d9a3cb77d88784af69e02eecfef516af3fe510
Author: James Yin <ywhjames@hotmail.com>
AuthorDate: Wed Jul 22 12:56:28 2020 +0800

    feat(request-reply): more request interface
---
 include/DefaultMQProducer.h                |   9 ++
 include/MQProducer.h                       |  10 ++
 src/MQClientAPIImpl.cpp                    |  10 +-
 src/common/CommunicationMode.h             |   8 +-
 src/consumer/DefaultMQPushConsumerImpl.cpp |   2 +-
 src/producer/DefaultMQProducer.cpp         |  27 ++++
 src/producer/DefaultMQProducerImpl.cpp     | 216 ++++++++++++++++++++++++-----
 src/producer/DefaultMQProducerImpl.h       |  10 ++
 src/producer/RequestResponseFuture.cpp     |   4 +
 src/producer/RequestResponseFuture.h       |   2 +
 10 files changed, 257 insertions(+), 41 deletions(-)

diff --git a/include/DefaultMQProducer.h b/include/DefaultMQProducer.h
index e34f94e..f0baf01 100644
--- a/include/DefaultMQProducer.h
+++ b/include/DefaultMQProducer.h
@@ -75,6 +75,15 @@ class ROCKETMQCLIENT_API DefaultMQProducer : public MQProducer, public DefaultMQ
 
   // RPC
   MQMessage request(MQMessage& msg, long timeout) override;
+  void request(MQMessage& msg, RequestCallback* requestCallback, long timeout) override;
+  MQMessage request(MQMessage& msg, const MQMessageQueue& mq, long timeout) override;
+  void request(MQMessage& msg, const MQMessageQueue& mq, RequestCallback* requestCallback, long timeout) override;
+  MQMessage request(MQMessage& msg, MessageQueueSelector* selector, void* arg, long timeout) override;
+  void request(MQMessage& msg,
+               MessageQueueSelector* selector,
+               void* arg,
+               RequestCallback* requestCallback,
+               long timeout) override;
 
  public:  // DefaultMQProducerConfig
   bool isSendLatencyFaultEnable() const override;
diff --git a/include/MQProducer.h b/include/MQProducer.h
index dc32fa3..69caaec 100644
--- a/include/MQProducer.h
+++ b/include/MQProducer.h
@@ -18,6 +18,7 @@
 #define __MQ_PRODUCER_H__
 
 #include "MQSelector.h"
+#include "RequestCallback.h"
 #include "SendCallback.h"
 #include "SendResult.h"
 #include "TransactionSendResult.h"
@@ -73,6 +74,15 @@ class ROCKETMQCLIENT_API MQProducer {
 
   // RPC
   virtual MQMessage request(MQMessage& msg, long timeout) = 0;
+  virtual void request(MQMessage& msg, RequestCallback* requestCallback, long timeout) = 0;
+  virtual MQMessage request(MQMessage& msg, const MQMessageQueue& mq, long timeout) = 0;
+  virtual void request(MQMessage& msg, const MQMessageQueue& mq, RequestCallback* requestCallback, long timeout) = 0;
+  virtual MQMessage request(MQMessage& msg, MessageQueueSelector* selector, void* arg, long timeout) = 0;
+  virtual void request(MQMessage& msg,
+                       MessageQueueSelector* selector,
+                       void* arg,
+                       RequestCallback* requestCallback,
+                       long timeout) = 0;
 };
 
 }  // namespace rocketmq
diff --git a/src/MQClientAPIImpl.cpp b/src/MQClientAPIImpl.cpp
index a1868b6..8ef0d97 100644
--- a/src/MQClientAPIImpl.cpp
+++ b/src/MQClientAPIImpl.cpp
@@ -130,14 +130,14 @@ SendResult* MQClientAPIImpl::sendMessage(const std::string& addr,
   request.set_body(msg->getBody());
 
   switch (communicationMode) {
-    case ComMode_ONEWAY:
+    case CommunicationMode::ONEWAY:
       m_remotingClient->invokeOneway(addr, request);
       return nullptr;
-    case ComMode_ASYNC:
+    case CommunicationMode::ASYNC:
       sendMessageAsync(addr, brokerName, msg, std::move(request), sendCallback, topicPublishInfo, instance,
                        timeoutMillis, retryTimesWhenSendFailed, producer);
       return nullptr;
-    case ComMode_SYNC:
+    case CommunicationMode::SYNC:
       return sendMessageSync(addr, brokerName, msg, request, timeoutMillis);
     default:
       assert(false);
@@ -244,10 +244,10 @@ PullResult* MQClientAPIImpl::pullMessage(const std::string& addr,
   RemotingCommand request(PULL_MESSAGE, requestHeader);
 
   switch (communicationMode) {
-    case ComMode_ASYNC:
+    case CommunicationMode::ASYNC:
       pullMessageAsync(addr, request, timeoutMillis, pullCallback);
       return nullptr;
-    case ComMode_SYNC:
+    case CommunicationMode::SYNC:
       return pullMessageSync(addr, request, timeoutMillis);
     default:
       assert(false);
diff --git a/src/common/CommunicationMode.h b/src/common/CommunicationMode.h
index bf8c0fe..f05a3ef 100644
--- a/src/common/CommunicationMode.h
+++ b/src/common/CommunicationMode.h
@@ -14,13 +14,13 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#ifndef __COMMUNICATION_MODE_H__
-#define __COMMUNICATION_MODE_H__
+#ifndef ROCKETMQ_COMMON_COMMUNICATIONMODE_H_
+#define ROCKETMQ_COMMON_COMMUNICATIONMODE_H_
 
 namespace rocketmq {
 
-enum CommunicationMode { ComMode_SYNC, ComMode_ASYNC, ComMode_ONEWAY };
+enum CommunicationMode { SYNC, ASYNC, ONEWAY };
 
 }  // namespace rocketmq
 
-#endif  // __COMMUNICATION_MODE_H__
+#endif  // ROCKETMQ_COMMON_COMMUNICATIONMODE_H_
diff --git a/src/consumer/DefaultMQPushConsumerImpl.cpp b/src/consumer/DefaultMQPushConsumerImpl.cpp
index 800b4c7..6e83861 100644
--- a/src/consumer/DefaultMQPushConsumerImpl.cpp
+++ b/src/consumer/DefaultMQPushConsumerImpl.cpp
@@ -566,7 +566,7 @@ void DefaultMQPushConsumerImpl::pullMessage(PullRequestPtr pullRequest) {
                                      commitOffsetValue,                            // 7
                                      1000 * 15,                                    // 8
                                      m_pushConsumerConfig->getAsyncPullTimeout(),  // 9
-                                     ComMode_ASYNC,                                // 10
+                                     CommunicationMode::ASYNC,                     // 10
                                      callback);                                    // 11
   } catch (MQException& e) {
     LOG_ERROR_NEW("pullKernelImpl exception: {}", e.what());
diff --git a/src/producer/DefaultMQProducer.cpp b/src/producer/DefaultMQProducer.cpp
index 8996470..179dfdb 100644
--- a/src/producer/DefaultMQProducer.cpp
+++ b/src/producer/DefaultMQProducer.cpp
@@ -146,6 +146,33 @@ MQMessage DefaultMQProducer::request(MQMessage& msg, long timeout) {
   return producer_impl_->request(msg, timeout);
 }
 
+void DefaultMQProducer::request(MQMessage& msg, RequestCallback* requestCallback, long timeout) {
+  producer_impl_->request(msg, requestCallback, timeout);
+}
+
+MQMessage DefaultMQProducer::request(MQMessage& msg, const MQMessageQueue& mq, long timeout) {
+  return producer_impl_->request(msg, mq, timeout);
+}
+
+void DefaultMQProducer::request(MQMessage& msg,
+                                const MQMessageQueue& mq,
+                                RequestCallback* requestCallback,
+                                long timeout) {
+  producer_impl_->request(msg, mq, requestCallback, timeout);
+}
+
+MQMessage DefaultMQProducer::request(MQMessage& msg, MessageQueueSelector* selector, void* arg, long timeout) {
+  return producer_impl_->request(msg, selector, arg, timeout);
+}
+
+void DefaultMQProducer::request(MQMessage& msg,
+                                MessageQueueSelector* selector,
+                                void* arg,
+                                RequestCallback* requestCallback,
+                                long timeout) {
+  producer_impl_->request(msg, selector, arg, requestCallback, timeout);
+}
+
 bool DefaultMQProducer::isSendLatencyFaultEnable() const {
   return std::dynamic_pointer_cast<DefaultMQProducerImpl>(producer_impl_)->isSendLatencyFaultEnable();
 }
diff --git a/src/producer/DefaultMQProducerImpl.cpp b/src/producer/DefaultMQProducerImpl.cpp
index 54d16fc..158dc4d 100644
--- a/src/producer/DefaultMQProducerImpl.cpp
+++ b/src/producer/DefaultMQProducerImpl.cpp
@@ -45,6 +45,47 @@
 
 namespace rocketmq {
 
+class RequestSendCallback : public AutoDeleteSendCallback {
+ public:
+  RequestSendCallback(std::shared_ptr<RequestResponseFuture> requestFuture) : request_future_(requestFuture) {}
+
+  void onSuccess(SendResult& sendResult) override { request_future_->setSendRequestOk(true); }
+
+  void onException(MQException& e) noexcept override {
+    request_future_->setSendRequestOk(false);
+    request_future_->putResponseMessage(nullptr);
+    request_future_->setCause(std::make_exception_ptr(e));
+  }
+
+ private:
+  std::shared_ptr<RequestResponseFuture> request_future_;
+};
+
+class AsyncRequestSendCallback : public AutoDeleteSendCallback {
+ public:
+  AsyncRequestSendCallback(std::shared_ptr<RequestResponseFuture> requestFuture) : request_future_(requestFuture) {}
+
+  void onSuccess(SendResult& sendResult) override { request_future_->setSendRequestOk(true); }
+
+  void onException(MQException& e) noexcept override {
+    request_future_->setCause(std::make_exception_ptr(e));
+    auto response_future = RequestFutureTable::removeRequestFuture(request_future_->getCorrelationId());
+    if (response_future != nullptr) {
+      // response_future is same as request_future_
+      response_future->setSendRequestOk(false);
+      response_future->putResponseMessage(nullptr);
+      try {
+        response_future->executeRequestCallback();
+      } catch (std::exception& e) {
+        LOG_WARN_NEW("execute requestCallback in requestFail, and callback throw {}", e.what());
+      }
+    }
+  }
+
+ private:
+  std::shared_ptr<RequestResponseFuture> request_future_;
+};
+
 DefaultMQProducerImpl::DefaultMQProducerImpl(DefaultMQProducerConfigPtr config)
     : DefaultMQProducerImpl(config, nullptr) {}
 
@@ -135,7 +176,8 @@ SendResult DefaultMQProducerImpl::send(MQMessage& msg) {
 
 SendResult DefaultMQProducerImpl::send(MQMessage& msg, long timeout) {
   try {
-    std::unique_ptr<SendResult> sendResult(sendDefaultImpl(msg.getMessageImpl(), ComMode_SYNC, nullptr, timeout));
+    std::unique_ptr<SendResult> sendResult(
+        sendDefaultImpl(msg.getMessageImpl(), CommunicationMode::SYNC, nullptr, timeout));
     return *sendResult;
   } catch (MQException& e) {
     LOG_ERROR_NEW("send failed, exception:{}", e.what());
@@ -156,7 +198,7 @@ SendResult DefaultMQProducerImpl::send(MQMessage& msg, const MQMessageQueue& mq,
 
   try {
     std::unique_ptr<SendResult> sendResult(
-        sendKernelImpl(msg.getMessageImpl(), mq, ComMode_SYNC, nullptr, nullptr, timeout));
+        sendKernelImpl(msg.getMessageImpl(), mq, CommunicationMode::SYNC, nullptr, nullptr, timeout));
     return *sendResult;
   } catch (MQException& e) {
     LOG_ERROR_NEW("send failed, exception:{}", e.what());
@@ -170,7 +212,7 @@ void DefaultMQProducerImpl::send(MQMessage& msg, SendCallback* sendCallback) noe
 
 void DefaultMQProducerImpl::send(MQMessage& msg, SendCallback* sendCallback, long timeout) noexcept {
   try {
-    (void)sendDefaultImpl(msg.getMessageImpl(), ComMode_ASYNC, sendCallback, timeout);
+    (void)sendDefaultImpl(msg.getMessageImpl(), CommunicationMode::ASYNC, sendCallback, timeout);
   } catch (MQException& e) {
     LOG_ERROR_NEW("send failed, exception:{}", e.what());
     sendCallback->onException(e);
@@ -199,7 +241,7 @@ void DefaultMQProducerImpl::send(MQMessage& msg,
     }
 
     try {
-      sendKernelImpl(msg.getMessageImpl(), mq, ComMode_ASYNC, sendCallback, nullptr, timeout);
+      sendKernelImpl(msg.getMessageImpl(), mq, CommunicationMode::ASYNC, sendCallback, nullptr, timeout);
     } catch (MQBrokerException& e) {
       std::string info = std::string("unknown exception, ") + e.what();
       THROW_MQEXCEPTION(MQClientException, info, e.GetError());
@@ -218,7 +260,7 @@ void DefaultMQProducerImpl::send(MQMessage& msg,
 
 void DefaultMQProducerImpl::sendOneway(MQMessage& msg) {
   try {
-    sendDefaultImpl(msg.getMessageImpl(), ComMode_ONEWAY, nullptr, m_producerConfig->getSendMsgTimeout());
+    sendDefaultImpl(msg.getMessageImpl(), CommunicationMode::ONEWAY, nullptr, m_producerConfig->getSendMsgTimeout());
   } catch (MQBrokerException e) {
     std::string info = std::string("unknown exception, ") + e.what();
     THROW_MQEXCEPTION(MQClientException, info, e.GetError());
@@ -233,7 +275,8 @@ void DefaultMQProducerImpl::sendOneway(MQMessage& msg, const MQMessageQueue& mq)
   }
 
   try {
-    sendKernelImpl(msg.getMessageImpl(), mq, ComMode_ONEWAY, nullptr, nullptr, m_producerConfig->getSendMsgTimeout());
+    sendKernelImpl(msg.getMessageImpl(), mq, CommunicationMode::ONEWAY, nullptr, nullptr,
+                   m_producerConfig->getSendMsgTimeout());
   } catch (MQBrokerException e) {
     std::string info = std::string("unknown exception, ") + e.what();
     THROW_MQEXCEPTION(MQClientException, info, e.GetError());
@@ -247,7 +290,7 @@ SendResult DefaultMQProducerImpl::send(MQMessage& msg, MessageQueueSelector* sel
 SendResult DefaultMQProducerImpl::send(MQMessage& msg, MessageQueueSelector* selector, void* arg, long timeout) {
   try {
     std::unique_ptr<SendResult> result(
-        sendSelectImpl(msg.getMessageImpl(), selector, arg, ComMode_SYNC, nullptr, timeout));
+        sendSelectImpl(msg.getMessageImpl(), selector, arg, CommunicationMode::SYNC, nullptr, timeout));
     return *result.get();
   } catch (MQException& e) {
     LOG_ERROR_NEW("send failed, exception:{}", e.what());
@@ -269,7 +312,7 @@ void DefaultMQProducerImpl::send(MQMessage& msg,
                                  long timeout) noexcept {
   try {
     try {
-      sendSelectImpl(msg.getMessageImpl(), selector, arg, ComMode_ASYNC, sendCallback, timeout);
+      sendSelectImpl(msg.getMessageImpl(), selector, arg, CommunicationMode::ASYNC, sendCallback, timeout);
     } catch (MQBrokerException& e) {
       std::string info = std::string("unknown exception, ") + e.what();
       THROW_MQEXCEPTION(MQClientException, info, e.GetError());
@@ -288,7 +331,8 @@ void DefaultMQProducerImpl::send(MQMessage& msg,
 
 void DefaultMQProducerImpl::sendOneway(MQMessage& msg, MessageQueueSelector* selector, void* arg) {
   try {
-    sendSelectImpl(msg.getMessageImpl(), selector, arg, ComMode_ONEWAY, nullptr, m_producerConfig->getSendMsgTimeout());
+    sendSelectImpl(msg.getMessageImpl(), selector, arg, CommunicationMode::ONEWAY, nullptr,
+                   m_producerConfig->getSendMsgTimeout());
   } catch (MQBrokerException e) {
     std::string info = std::string("unknown exception, ") + e.what();
     THROW_MQEXCEPTION(MQClientException, info, e.GetError());
@@ -344,23 +388,116 @@ MessagePtr DefaultMQProducerImpl::batch(std::vector<MQMessage>& msgs) {
   }
 }
 
-class RequestSendCallback : public AutoDeleteSendCallback {
- public:
-  RequestSendCallback(std::shared_ptr<RequestResponseFuture> requestFuture) : m_requestFuture(requestFuture) {}
+MQMessage DefaultMQProducerImpl::request(MQMessage& msg, long timeout) {
+  auto beginTimestamp = UtilAll::currentTimeMillis();
+  prepareSendRequest(msg, timeout);
+  const auto& correlationId = msg.getProperty(MQMessageConst::PROPERTY_CORRELATION_ID);
+
+  std::exception_ptr eptr = nullptr;
+  MessagePtr responseMessage;
+  try {
+    auto requestResponseFuture = std::make_shared<RequestResponseFuture>(correlationId, timeout, nullptr);
+    RequestFutureTable::putRequestFuture(correlationId, requestResponseFuture);
 
-  void onSuccess(SendResult& sendResult) override { m_requestFuture->setSendRequestOk(true); }
+    auto cost = UtilAll::currentTimeMillis() - beginTimestamp;
+    sendDefaultImpl(msg.getMessageImpl(), CommunicationMode::ASYNC, new RequestSendCallback(requestResponseFuture),
+                    timeout - cost);
 
-  void onException(MQException& e) noexcept override {
-    m_requestFuture->setSendRequestOk(false);
-    m_requestFuture->putResponseMessage(nullptr);
-    m_requestFuture->setCause(std::make_exception_ptr(e));
+    responseMessage = requestResponseFuture->waitResponseMessage(timeout - cost);
+    if (responseMessage == nullptr) {
+      if (requestResponseFuture->isSendRequestOk()) {
+        std::string info = "send request message to <" + msg.getTopic() + "> OK, but wait reply message timeout, " +
+                           UtilAll::to_string(timeout) + " ms.";
+        THROW_MQEXCEPTION(RequestTimeoutException, info, ClientErrorCode::REQUEST_TIMEOUT_EXCEPTION);
+      } else {
+        std::string info = "send request message to <" + msg.getTopic() + "> fail";
+        THROW_MQEXCEPTION2(MQClientException, info, -1, requestResponseFuture->getCause());
+      }
+    }
+  } catch (...) {
+    eptr = std::current_exception();
   }
 
- private:
-  std::shared_ptr<RequestResponseFuture> m_requestFuture;
-};
+  // finally
+  RequestFutureTable::removeRequestFuture(correlationId);
 
-MQMessage DefaultMQProducerImpl::request(MQMessage& msg, long timeout) {
+  if (eptr != nullptr) {
+    std::rethrow_exception(eptr);
+  }
+
+  return MQMessage(responseMessage);
+}
+
+void DefaultMQProducerImpl::request(MQMessage& msg, RequestCallback* requestCallback, long timeout) {
+  auto beginTimestamp = UtilAll::currentTimeMillis();
+  prepareSendRequest(msg, timeout);
+  const auto& correlationId = msg.getProperty(MQMessageConst::PROPERTY_CORRELATION_ID);
+
+  auto requestResponseFuture = std::make_shared<RequestResponseFuture>(correlationId, timeout, requestCallback);
+  RequestFutureTable::putRequestFuture(correlationId, requestResponseFuture);
+
+  auto cost = UtilAll::currentTimeMillis() - beginTimestamp;
+  sendDefaultImpl(msg.getMessageImpl(), CommunicationMode::ASYNC, new AsyncRequestSendCallback(requestResponseFuture),
+                  timeout - cost);
+}
+
+MQMessage DefaultMQProducerImpl::request(MQMessage& msg, const MQMessageQueue& mq, long timeout) {
+  auto beginTimestamp = UtilAll::currentTimeMillis();
+  prepareSendRequest(msg, timeout);
+  const auto& correlationId = msg.getProperty(MQMessageConst::PROPERTY_CORRELATION_ID);
+
+  std::exception_ptr eptr = nullptr;
+  MessagePtr responseMessage;
+  try {
+    auto requestResponseFuture = std::make_shared<RequestResponseFuture>(correlationId, timeout, nullptr);
+    RequestFutureTable::putRequestFuture(correlationId, requestResponseFuture);
+
+    auto cost = UtilAll::currentTimeMillis() - beginTimestamp;
+    sendKernelImpl(msg.getMessageImpl(), mq, CommunicationMode::ASYNC, new RequestSendCallback(requestResponseFuture),
+                   nullptr, timeout - cost);
+
+    responseMessage = requestResponseFuture->waitResponseMessage(timeout - cost);
+    if (responseMessage == nullptr) {
+      if (requestResponseFuture->isSendRequestOk()) {
+        std::string info = "send request message to <" + msg.getTopic() + "> OK, but wait reply message timeout, " +
+                           UtilAll::to_string(timeout) + " ms.";
+        THROW_MQEXCEPTION(RequestTimeoutException, info, ClientErrorCode::REQUEST_TIMEOUT_EXCEPTION);
+      } else {
+        std::string info = "send request message to <" + msg.getTopic() + "> fail";
+        THROW_MQEXCEPTION2(MQClientException, info, -1, requestResponseFuture->getCause());
+      }
+    }
+  } catch (...) {
+    eptr = std::current_exception();
+  }
+
+  // finally
+  RequestFutureTable::removeRequestFuture(correlationId);
+
+  if (eptr != nullptr) {
+    std::rethrow_exception(eptr);
+  }
+
+  return MQMessage(responseMessage);
+}
+
+void DefaultMQProducerImpl::request(MQMessage& msg,
+                                    const MQMessageQueue& mq,
+                                    RequestCallback* requestCallback,
+                                    long timeout) {
+  auto beginTimestamp = UtilAll::currentTimeMillis();
+  prepareSendRequest(msg, timeout);
+  const auto& correlationId = msg.getProperty(MQMessageConst::PROPERTY_CORRELATION_ID);
+
+  auto requestResponseFuture = std::make_shared<RequestResponseFuture>(correlationId, timeout, requestCallback);
+  RequestFutureTable::putRequestFuture(correlationId, requestResponseFuture);
+
+  auto cost = UtilAll::currentTimeMillis() - beginTimestamp;
+  sendKernelImpl(msg.getMessageImpl(), mq, CommunicationMode::ASYNC,
+                 new AsyncRequestSendCallback(requestResponseFuture), nullptr, timeout - cost);
+}
+
+MQMessage DefaultMQProducerImpl::request(MQMessage& msg, MessageQueueSelector* selector, void* arg, long timeout) {
   auto beginTimestamp = UtilAll::currentTimeMillis();
   prepareSendRequest(msg, timeout);
   const auto& correlationId = msg.getProperty(MQMessageConst::PROPERTY_CORRELATION_ID);
@@ -372,8 +509,8 @@ MQMessage DefaultMQProducerImpl::request(MQMessage& msg, long timeout) {
     RequestFutureTable::putRequestFuture(correlationId, requestResponseFuture);
 
     auto cost = UtilAll::currentTimeMillis() - beginTimestamp;
-    sendDefaultImpl(msg.getMessageImpl(), CommunicationMode::ComMode_ASYNC,
-                    new RequestSendCallback(requestResponseFuture), timeout - cost);
+    sendSelectImpl(msg.getMessageImpl(), selector, arg, CommunicationMode::ASYNC,
+                   new RequestSendCallback(requestResponseFuture), timeout - cost);
 
     responseMessage = requestResponseFuture->waitResponseMessage(timeout - cost);
     if (responseMessage == nullptr) {
@@ -400,6 +537,23 @@ MQMessage DefaultMQProducerImpl::request(MQMessage& msg, long timeout) {
   return MQMessage(responseMessage);
 }
 
+void DefaultMQProducerImpl::request(MQMessage& msg,
+                                    MessageQueueSelector* selector,
+                                    void* arg,
+                                    RequestCallback* requestCallback,
+                                    long timeout) {
+  auto beginTimestamp = UtilAll::currentTimeMillis();
+  prepareSendRequest(msg, timeout);
+  const auto& correlationId = msg.getProperty(MQMessageConst::PROPERTY_CORRELATION_ID);
+
+  auto requestResponseFuture = std::make_shared<RequestResponseFuture>(correlationId, timeout, requestCallback);
+  RequestFutureTable::putRequestFuture(correlationId, requestResponseFuture);
+
+  auto cost = UtilAll::currentTimeMillis() - beginTimestamp;
+  sendSelectImpl(msg.getMessageImpl(), selector, arg, CommunicationMode::ASYNC,
+                 new AsyncRequestSendCallback(requestResponseFuture), timeout - cost);
+}
+
 void DefaultMQProducerImpl::prepareSendRequest(Message& msg, long timeout) {
   const auto correlationId = CorrelationIdUtil::createCorrelationId();
   const auto& requestClientId = m_clientInstance->getClientId();
@@ -441,7 +595,7 @@ SendResult* DefaultMQProducerImpl::sendDefaultImpl(MessagePtr msg,
   if (topicPublishInfo != nullptr && topicPublishInfo->ok()) {
     bool callTimeout = false;
     std::unique_ptr<SendResult> sendResult;
-    int timesTotal = communicationMode == CommunicationMode::ComMode_SYNC ? 1 + m_producerConfig->getRetryTimes() : 1;
+    int timesTotal = communicationMode == CommunicationMode::SYNC ? 1 + m_producerConfig->getRetryTimes() : 1;
     int times = 0;
     std::string lastBrokerName;
     for (; times < timesTotal; times++) {
@@ -466,11 +620,11 @@ SendResult* DefaultMQProducerImpl::sendDefaultImpl(MessagePtr msg,
         endTimestamp = UtilAll::currentTimeMillis();
         updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
         switch (communicationMode) {
-          case ComMode_ASYNC:
+          case CommunicationMode::ASYNC:
             return nullptr;
-          case ComMode_ONEWAY:
+          case CommunicationMode::ONEWAY:
             return nullptr;
-          case ComMode_SYNC:
+          case CommunicationMode::SYNC:
             if (sendResult->getSendStatus() != SEND_OK) {
               if (m_producerConfig->isRetryAnotherBrokerWhenNotStoreOK()) {
                 continue;
@@ -569,7 +723,7 @@ SendResult* DefaultMQProducerImpl::sendKernelImpl(MessagePtr msg,
 
       SendResult* sendResult = nullptr;
       switch (communicationMode) {
-        case ComMode_ASYNC: {
+        case CommunicationMode::ASYNC: {
           long costTimeAsync = UtilAll::currentTimeMillis() - beginStartTime;
           if (timeout < costTimeAsync) {
             THROW_MQEXCEPTION(RemotingTooMuchRequestException, "sendKernelImpl call timeout", -1);
@@ -578,8 +732,8 @@ SendResult* DefaultMQProducerImpl::sendKernelImpl(MessagePtr msg,
               brokerAddr, mq.getBrokerName(), msg, std::move(requestHeader), timeout, communicationMode, sendCallback,
               topicPublishInfo, m_clientInstance, m_producerConfig->getRetryTimes4Async(), shared_from_this());
         } break;
-        case ComMode_ONEWAY:
-        case ComMode_SYNC: {
+        case CommunicationMode::ONEWAY:
+        case CommunicationMode::SYNC: {
           long costTimeSync = UtilAll::currentTimeMillis() - beginStartTime;
           if (timeout < costTimeSync) {
             THROW_MQEXCEPTION(RemotingTooMuchRequestException, "sendKernelImpl call timeout", -1);
@@ -637,7 +791,7 @@ TransactionSendResult* DefaultMQProducerImpl::sendMessageInTransactionImpl(Messa
   MessageAccessor::putProperty(*msg, MQMessageConst::PROPERTY_TRANSACTION_PREPARED, "true");
   MessageAccessor::putProperty(*msg, MQMessageConst::PROPERTY_PRODUCER_GROUP, m_producerConfig->getGroupName());
   try {
-    sendResult.reset(sendDefaultImpl(msg, ComMode_SYNC, nullptr, timeout));
+    sendResult.reset(sendDefaultImpl(msg, CommunicationMode::SYNC, nullptr, timeout));
   } catch (MQException& e) {
     THROW_MQEXCEPTION(MQClientException, "send message Exception", -1);
   }
diff --git a/src/producer/DefaultMQProducerImpl.h b/src/producer/DefaultMQProducerImpl.h
index 0b1a93a..b3f4209 100644
--- a/src/producer/DefaultMQProducerImpl.h
+++ b/src/producer/DefaultMQProducerImpl.h
@@ -92,7 +92,17 @@ class DefaultMQProducerImpl : public std::enable_shared_from_this<DefaultMQProdu
   SendResult send(std::vector<MQMessage>& msgs, const MQMessageQueue& mq) override;
   SendResult send(std::vector<MQMessage>& msgs, const MQMessageQueue& mq, long timeout) override;
 
+  // RPC
   MQMessage request(MQMessage& msg, long timeout) override;
+  void request(MQMessage& msg, RequestCallback* requestCallback, long timeout) override;
+  MQMessage request(MQMessage& msg, const MQMessageQueue& mq, long timeout) override;
+  void request(MQMessage&, const MQMessageQueue& mq, RequestCallback* requestCallback, long timeout) override;
+  MQMessage request(MQMessage& msg, MessageQueueSelector* selector, void* arg, long timeout) override;
+  void request(MQMessage& msg,
+               MessageQueueSelector* selector,
+               void* arg,
+               RequestCallback* requestCallback,
+               long timeout) override;
 
  public:  // MQProducerInner
   TransactionListener* getCheckListener() override;
diff --git a/src/producer/RequestResponseFuture.cpp b/src/producer/RequestResponseFuture.cpp
index c3b28f9..d798057 100644
--- a/src/producer/RequestResponseFuture.cpp
+++ b/src/producer/RequestResponseFuture.cpp
@@ -85,6 +85,10 @@ void RequestResponseFuture::putResponseMessage(MessagePtr responseMsg) {
   }
 }
 
+const std::string& RequestResponseFuture::getCorrelationId() {
+  return m_correlationId;
+}
+
 bool RequestResponseFuture::isSendRequestOk() {
   return m_sendRequestOk;
 }
diff --git a/src/producer/RequestResponseFuture.h b/src/producer/RequestResponseFuture.h
index d4e3e68..9f82eb8 100644
--- a/src/producer/RequestResponseFuture.h
+++ b/src/producer/RequestResponseFuture.h
@@ -37,6 +37,8 @@ class RequestResponseFuture {
   MessagePtr waitResponseMessage(int64_t timeout);
   void putResponseMessage(MessagePtr responseMsg);
 
+  const std::string& getCorrelationId();
+
   bool isSendRequestOk();
   void setSendRequestOk(bool sendRequestOk);
 


Mime
View raw message