rocketmq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ifplu...@apache.org
Subject [rocketmq-client-cpp] 22/29: refactor: PullAPIWrapper::processPullResult
Date Tue, 29 Dec 2020 03:36:39 GMT
This is an automated email from the ASF dual-hosted git repository.

ifplusor pushed a commit to branch re_dev
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-cpp.git

commit c1c2eb89b647bb35c5ccd2cea74a2ce2296a8dee
Author: James Yin <ywhjames@hotmail.com>
AuthorDate: Wed Sep 23 17:46:48 2020 +0800

    refactor: PullAPIWrapper::processPullResult
---
 include/PullCallback.h                     |  2 +-
 include/c/CPullResult.h                    |  1 -
 src/common/PullCallbackWrap.cpp            |  6 ++--
 src/consumer/DefaultMQPushConsumerImpl.cpp | 34 +++++++++----------
 src/consumer/PullAPIWrapper.cpp            | 52 ++++++++++++++++--------------
 src/consumer/PullAPIWrapper.h              |  4 ++-
 src/consumer/PullResult.cpp                |  7 ++--
 src/consumer/PullResult.h                  | 15 +++++++--
 8 files changed, 68 insertions(+), 53 deletions(-)

diff --git a/include/PullCallback.h b/include/PullCallback.h
index 2e733e1..7eb63ef 100755
--- a/include/PullCallback.h
+++ b/include/PullCallback.h
@@ -31,7 +31,7 @@ class ROCKETMQCLIENT_API PullCallback {
  public:
   virtual ~PullCallback() = default;
 
-  virtual void onSuccess(PullResult& pullResult) = 0;
+  virtual void onSuccess(std::unique_ptr<PullResult> pull_result) = 0;
   virtual void onException(MQException& e) noexcept = 0;
 
   virtual PullCallbackType getPullCallbackType() const { return PULL_CALLBACK_TYPE_SIMPLE; }
diff --git a/include/c/CPullResult.h b/include/c/CPullResult.h
index 6fc45e9..816887f 100644
--- a/include/c/CPullResult.h
+++ b/include/c/CPullResult.h
@@ -29,7 +29,6 @@ typedef enum E_CPullStatus {
   E_NO_NEW_MSG,
   E_NO_MATCHED_MSG,
   E_OFFSET_ILLEGAL,
-  E_BROKER_TIMEOUT  // indicate pull request timeout or received NULL response
 } CPullStatus;
 
 typedef struct _CPullResult_ {
diff --git a/src/common/PullCallbackWrap.cpp b/src/common/PullCallbackWrap.cpp
index 65cf63c..b23970b 100755
--- a/src/common/PullCallbackWrap.cpp
+++ b/src/common/PullCallbackWrap.cpp
@@ -31,9 +31,9 @@ void PullCallbackWrap::operationComplete(ResponseFuture* responseFuture) noexcep
 
   if (response != nullptr) {
     try {
-      std::unique_ptr<PullResult> pullResult(client_api_impl_->processPullResponse(response.get()));
-      assert(pullResult != nullptr);
-      pull_callback_->onSuccess(*pullResult);
+      std::unique_ptr<PullResult> pull_result(client_api_impl_->processPullResponse(response.get()));
+      assert(pull_result != nullptr);
+      pull_callback_->onSuccess(std::move(pull_result));
     } catch (MQException& e) {
       pull_callback_->onException(e);
     }
diff --git a/src/consumer/DefaultMQPushConsumerImpl.cpp b/src/consumer/DefaultMQPushConsumerImpl.cpp
index 768b7df..79bcdf5 100644
--- a/src/consumer/DefaultMQPushConsumerImpl.cpp
+++ b/src/consumer/DefaultMQPushConsumerImpl.cpp
@@ -55,53 +55,53 @@ class DefaultMQPushConsumerImpl::AsyncPullCallback : public AutoDeletePullCallba
 
   ~AsyncPullCallback() = default;
 
-  void onSuccess(PullResult& pullResult) override {
+  void onSuccess(std::unique_ptr<PullResult> pull_result) override {
     auto consumer = default_mq_push_consumer_.lock();
     if (nullptr == consumer) {
       LOG_WARN_NEW("AsyncPullCallback::onSuccess: DefaultMQPushConsumerImpl is released.");
       return;
     }
 
-    PullResult result =
-        consumer->pull_api_wrapper_->processPullResult(pull_request_->message_queue(), pullResult, subscription_data_);
-    switch (result.pull_status()) {
+    pull_result.reset(consumer->pull_api_wrapper_->processPullResult(pull_request_->message_queue(),
+                                                                     std::move(pull_result), subscription_data_));
+    switch (pull_result->pull_status()) {
       case FOUND: {
         int64_t prev_request_offset = pull_request_->next_offset();
-        pull_request_->set_next_offset(result.next_begin_offset());
+        pull_request_->set_next_offset(pull_result->next_begin_offset());
 
         int64_t first_msg_offset = (std::numeric_limits<int64_t>::max)();
-        if (!result.msg_found_list().empty()) {
-          first_msg_offset = result.msg_found_list()[0]->queue_offset();
+        if (!pull_result->msg_found_list().empty()) {
+          first_msg_offset = pull_result->msg_found_list()[0]->queue_offset();
 
-          pull_request_->process_queue()->putMessage(result.msg_found_list());
-          consumer->consume_service_->submitConsumeRequest(result.msg_found_list(), pull_request_->process_queue(),
-                                                           pull_request_->message_queue(), true);
+          pull_request_->process_queue()->putMessage(pull_result->msg_found_list());
+          consumer->consume_service_->submitConsumeRequest(
+              pull_result->msg_found_list(), pull_request_->process_queue(), pull_request_->message_queue(), true);
         }
 
         consumer->executePullRequestImmediately(pull_request_);
 
-        if (result.next_begin_offset() < prev_request_offset || first_msg_offset < prev_request_offset) {
+        if (pull_result->next_begin_offset() < prev_request_offset || first_msg_offset < prev_request_offset) {
           LOG_WARN_NEW(
               "[BUG] pull message result maybe data wrong, nextBeginOffset:{} firstMsgOffset:{} prevRequestOffset:{}",
-              result.next_begin_offset(), first_msg_offset, prev_request_offset);
+              pull_result->next_begin_offset(), first_msg_offset, prev_request_offset);
         }
       } break;
       case NO_NEW_MSG:
       case NO_MATCHED_MSG:
-        pull_request_->set_next_offset(result.next_begin_offset());
+        pull_request_->set_next_offset(pull_result->next_begin_offset());
         consumer->correctTagsOffset(pull_request_);
         consumer->executePullRequestImmediately(pull_request_);
         break;
       case NO_LATEST_MSG:
-        pull_request_->set_next_offset(result.next_begin_offset());
+        pull_request_->set_next_offset(pull_result->next_begin_offset());
         consumer->correctTagsOffset(pull_request_);
         consumer->executePullRequestLater(
-            pull_request_, consumer->getDefaultMQPushConsumerConfig()->pull_time_delay_mills_when_exception());
+            pull_request_, consumer->getDefaultMQPushConsumerConfig()->pull_time_delay_millis_when_exception());
         break;
       case OFFSET_ILLEGAL: {
-        LOG_WARN_NEW("the pull request offset illegal, {} {}", pull_request_->toString(), result.toString());
+        LOG_WARN_NEW("the pull request offset illegal, {} {}", pull_request_->toString(), pull_result->toString());
 
-        pull_request_->set_next_offset(result.next_begin_offset());
+        pull_request_->set_next_offset(pull_result->next_begin_offset());
         pull_request_->process_queue()->set_dropped(true);
 
         // update and persist offset, then removeProcessQueue
diff --git a/src/consumer/PullAPIWrapper.cpp b/src/consumer/PullAPIWrapper.cpp
index 88e96f6..e25f423 100644
--- a/src/consumer/PullAPIWrapper.cpp
+++ b/src/consumer/PullAPIWrapper.cpp
@@ -50,48 +50,52 @@ int PullAPIWrapper::recalculatePullFromWhichNode(const MQMessageQueue& mq) {
   return MASTER_ID;
 }
 
-PullResult PullAPIWrapper::processPullResult(const MQMessageQueue& mq,
-                                             PullResult& pullResult,
-                                             SubscriptionData* subscriptionData) {
-  assert(std::type_index(typeid(pullResult)) == std::type_index(typeid(PullResultExt)));
-  auto& pullResultExt = dynamic_cast<PullResultExt&>(pullResult);
+PullResult* PullAPIWrapper::processPullResult(const MQMessageQueue& mq,
+                                              std::unique_ptr<PullResult> pull_result,
+                                              SubscriptionData* subscription_data) {
+  auto* pull_result_ext = dynamic_cast<PullResultExt*>(pull_result.get());
+  if (pull_result_ext == nullptr) {
+    return pull_result.release();
+  }
 
   // update node
-  updatePullFromWhichNode(mq, pullResultExt.suggert_which_boker_id());
+  updatePullFromWhichNode(mq, pull_result_ext->suggert_which_boker_id());
 
-  std::vector<MessageExtPtr> msgListFilterAgain;
-  if (FOUND == pullResultExt.pull_status()) {
+  std::vector<MessageExtPtr> msg_list_filter_again;
+  if (FOUND == pull_result_ext->pull_status()) {
     // decode all msg list
-    std::unique_ptr<ByteBuffer> byteBuffer(ByteBuffer::wrap(pullResultExt.message_binary()));
+    std::unique_ptr<ByteBuffer> byteBuffer(ByteBuffer::wrap(pull_result_ext->message_binary()));
     auto msgList = MessageDecoder::decodes(*byteBuffer);
 
     // filter msg list again
-    if (subscriptionData != nullptr && !subscriptionData->tags_set().empty()) {
-      msgListFilterAgain.reserve(msgList.size());
+    if (subscription_data != nullptr && !subscription_data->tags_set().empty()) {
+      msg_list_filter_again.reserve(msgList.size());
       for (const auto& msg : msgList) {
         const auto& msgTag = msg->tags();
-        if (subscriptionData->containsTag(msgTag)) {
-          msgListFilterAgain.push_back(msg);
+        if (subscription_data->containsTag(msgTag)) {
+          msg_list_filter_again.push_back(msg);
         }
       }
     } else {
-      msgListFilterAgain.swap(msgList);
+      msg_list_filter_again.swap(msgList);
     }
 
-    for (auto& msg : msgListFilterAgain) {
-      const auto& tranMsg = msg->getProperty(MQMessageConst::PROPERTY_TRANSACTION_PREPARED);
-      if (UtilAll::stob(tranMsg)) {
-        msg->set_transaction_id(msg->getProperty(MQMessageConst::PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));
+    if (!msg_list_filter_again.empty()) {
+      std::string min_offset = UtilAll::to_string(pull_result_ext->min_offset());
+      std::string max_offset = UtilAll::to_string(pull_result_ext->max_offset());
+      for (auto& msg : msg_list_filter_again) {
+        const auto& tranMsg = msg->getProperty(MQMessageConst::PROPERTY_TRANSACTION_PREPARED);
+        if (UtilAll::stob(tranMsg)) {
+          msg->set_transaction_id(msg->getProperty(MQMessageConst::PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));
+        }
+        MessageAccessor::putProperty(*msg, MQMessageConst::PROPERTY_MIN_OFFSET, min_offset);
+        MessageAccessor::putProperty(*msg, MQMessageConst::PROPERTY_MAX_OFFSET, max_offset);
       }
-      MessageAccessor::putProperty(*msg, MQMessageConst::PROPERTY_MIN_OFFSET,
-                                   UtilAll::to_string(pullResult.min_offset()));
-      MessageAccessor::putProperty(*msg, MQMessageConst::PROPERTY_MAX_OFFSET,
-                                   UtilAll::to_string(pullResult.max_offset()));
     }
   }
 
-  return PullResult(pullResultExt.pull_status(), pullResultExt.next_begin_offset(), pullResultExt.min_offset(),
-                    pullResultExt.max_offset(), std::move(msgListFilterAgain));
+  return new PullResult(pull_result_ext->pull_status(), pull_result_ext->next_begin_offset(),
+                        pull_result_ext->min_offset(), pull_result_ext->max_offset(), std::move(msg_list_filter_again));
 }
 
 PullResult* PullAPIWrapper::pullKernelImpl(const MQMessageQueue& mq,
diff --git a/src/consumer/PullAPIWrapper.h b/src/consumer/PullAPIWrapper.h
index 73d98d9..7bdbf5b 100644
--- a/src/consumer/PullAPIWrapper.h
+++ b/src/consumer/PullAPIWrapper.h
@@ -32,7 +32,9 @@ class PullAPIWrapper {
   PullAPIWrapper(MQClientInstance* instance, const std::string& consumerGroup);
   ~PullAPIWrapper();
 
-  PullResult processPullResult(const MQMessageQueue& mq, PullResult& pullResult, SubscriptionData* subscriptionData);
+  PullResult* processPullResult(const MQMessageQueue& mq,
+                                std::unique_ptr<PullResult> pull_result,
+                                SubscriptionData* subscriptionData);
 
   PullResult* pullKernelImpl(const MQMessageQueue& mq,
                              const std::string& subExpression,
diff --git a/src/consumer/PullResult.cpp b/src/consumer/PullResult.cpp
index 672f2b3..3312721 100644
--- a/src/consumer/PullResult.cpp
+++ b/src/consumer/PullResult.cpp
@@ -21,10 +21,9 @@
 
 #include "UtilAll.h"
 
-static const char* kPullStatusStrings[] = {"FOUND", "NO_NEW_MSG", "NO_MATCHED_MSG",
-                                           "NO_LATEST_MSG"
-                                           "OFFSET_ILLEGAL",
-                                           "BROKER_TIMEOUT"};
+static const char* kPullStatusStrings[] = {
+    "FOUND", "NO_NEW_MSG", "NO_MATCHED_MSG", "NO_LATEST_MSG", "OFFSET_ILLEGAL",
+};
 
 namespace rocketmq {
 
diff --git a/src/consumer/PullResult.h b/src/consumer/PullResult.h
index 0802fef..df1f4ba 100644
--- a/src/consumer/PullResult.h
+++ b/src/consumer/PullResult.h
@@ -22,12 +22,23 @@
 namespace rocketmq {
 
 enum PullStatus {
+  /**
+   * Founded
+   */
   FOUND,
+  /**
+   * No new message can be pull
+   */
   NO_NEW_MSG,
+  /**
+   * Filtering results can not match
+   */
   NO_MATCHED_MSG,
   NO_LATEST_MSG,
-  OFFSET_ILLEGAL,
-  BROKER_TIMEOUT  // indicate pull request timeout or received NULL response
+  /**
+   * Illegal offset,may be too big or too small
+   */
+  OFFSET_ILLEGAL
 };
 
 class PullResult {


Mime
View raw message