rocketmq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ifplu...@apache.org
Subject [rocketmq-client-cpp] 18/29: refactor: DefaultMQPushConsumer
Date Tue, 29 Dec 2020 03:36:35 GMT
This is an automated email from the ASF dual-hosted git repository.

ifplusor pushed a commit to branch re_dev
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-cpp.git

commit 444dc1419e14f68699ff249a95630db9f26aed36
Author: James Yin <ywhjames@hotmail.com>
AuthorDate: Mon Sep 21 15:39:27 2020 +0800

    refactor: DefaultMQPushConsumer
---
 example/RequestReply.cpp                           |   2 +-
 include/DefaultMQPushConsumer.h                    |  18 +-
 include/DefaultMQPushConsumerConfig.h              |  22 +-
 include/DefaultMQPushConsumerConfigProxy.h         |  32 +-
 include/MQConsumer.h                               |  42 ---
 include/MQPushConsumer.h                           |  21 +-
 src/consumer/ConsumeMessageConcurrentlyService.cpp |   3 +-
 src/consumer/ConsumeMessageOrderlyService.cpp      |   2 +-
 src/consumer/DefaultMQPushConsumer.cpp             |  28 +-
 src/consumer/DefaultMQPushConsumerConfigImpl.hpp   |  38 +-
 src/consumer/DefaultMQPushConsumerImpl.cpp         | 418 ++++++++++-----------
 src/consumer/DefaultMQPushConsumerImpl.h           |  73 ++--
 src/consumer/MQConsumerInner.h                     |   8 +-
 src/consumer/RebalanceImpl.cpp                     |   1 -
 src/extern/CPushConsumer.cpp                       |   2 +-
 15 files changed, 322 insertions(+), 388 deletions(-)

diff --git a/example/RequestReply.cpp b/example/RequestReply.cpp
index 79b4212..c69e319 100644
--- a/example/RequestReply.cpp
+++ b/example/RequestReply.cpp
@@ -77,7 +77,7 @@ int main(int argc, char* argv[]) {
   consumer.set_consume_from_where(CONSUME_FROM_LAST_OFFSET);
 
   // recommend client configs
-  consumer.set_pull_time_delay_mills_when_exception(0L);
+  consumer.set_pull_time_delay_millis_when_exception(0L);
 
   consumer.subscribe(info.topic, "*");
 
diff --git a/include/DefaultMQPushConsumer.h b/include/DefaultMQPushConsumer.h
index b6276b4..e27ae67 100755
--- a/include/DefaultMQPushConsumer.h
+++ b/include/DefaultMQPushConsumer.h
@@ -29,27 +29,25 @@ class ROCKETMQCLIENT_API DefaultMQPushConsumer : public DefaultMQPushConsumerCon
  public:
   DefaultMQPushConsumer(const std::string& groupname);
   DefaultMQPushConsumer(const std::string& groupname, RPCHookPtr rpcHook);
+
   virtual ~DefaultMQPushConsumer();
 
- public:  // MQConsumer
+ public:  // MQPushConsumer
   void start() override;
   void shutdown() override;
 
-  bool sendMessageBack(MessageExtPtr msg, int delayLevel) override;
-  bool sendMessageBack(MessageExtPtr msg, int delayLevel, const std::string& brokerName) override;
-  void fetchSubscribeMessageQueues(const std::string& topic, std::vector<MQMessageQueue>& mqs) override;
+  void suspend() override;
+  void resume() override;
+
+  MQMessageListener* getMessageListener() const override;
 
- public:  // MQPushConsumer
-  void registerMessageListener(MQMessageListener* messageListener) override;
   void registerMessageListener(MessageListenerConcurrently* messageListener) override;
   void registerMessageListener(MessageListenerOrderly* messageListener) override;
 
-  MQMessageListener* getMessageListener() const override;
-
   void subscribe(const std::string& topic, const std::string& subExpression) override;
 
-  void suspend() override;
-  void resume() override;
+  bool sendMessageBack(MessageExtPtr msg, int delayLevel) override;
+  bool sendMessageBack(MessageExtPtr msg, int delayLevel, const std::string& brokerName) override;
 
  public:
   void setRPCHook(RPCHookPtr rpcHook);
diff --git a/include/DefaultMQPushConsumerConfig.h b/include/DefaultMQPushConsumerConfig.h
index ee3988e..5c894b4 100644
--- a/include/DefaultMQPushConsumerConfig.h
+++ b/include/DefaultMQPushConsumerConfig.h
@@ -50,26 +50,26 @@ class ROCKETMQCLIENT_API DefaultMQPushConsumerConfig : virtual public MQClientCo
   virtual void set_consume_thread_nums(int threadNum) = 0;
 
   /**
-   * the pull number of message size by each pullMsg for orderly consume, default value is 1
+   * max cache msg size per Queue in memory if consumer could not consume msgs immediately,
+   * default maxCacheMsgSize per Queue is 1000, set range is:1~65535
    */
-  virtual int consume_message_batch_max_size() const = 0;
-  virtual void set_consume_message_batch_max_size(int consumeMessageBatchMaxSize) = 0;
+  virtual int pull_threshold_for_queue() const = 0;
+  virtual void set_pull_threshold_for_queue(int maxCacheSize) = 0;
 
   /**
-   * max cache msg size per Queue in memory if consumer could not consume msgs immediately,
-   * default maxCacheMsgSize per Queue is 1000, set range is:1~65535
+   * the pull number of message size by each pullMsg for orderly consume, default value is 1
    */
-  virtual int max_cache_msg_size_per_queue() const = 0;
-  virtual void set_max_cache_msg_size_per_queue(int maxCacheSize) = 0;
+  virtual int consume_message_batch_max_size() const = 0;
+  virtual void set_consume_message_batch_max_size(int consumeMessageBatchMaxSize) = 0;
 
-  virtual int async_pull_timeout() const = 0;
-  virtual void set_async_pull_timeout(int asyncPullTimeout) = 0;
+  virtual int pull_batch_size() const = 0;
+  virtual void set_pull_batch_size(int pull_batch_size) = 0;
 
   virtual int max_reconsume_times() const = 0;
   virtual void set_max_reconsume_times(int maxReconsumeTimes) = 0;
 
-  virtual long pull_time_delay_mills_when_exception() const = 0;
-  virtual void set_pull_time_delay_mills_when_exception(long pullTimeDelayMillsWhenException) = 0;
+  virtual long pull_time_delay_millis_when_exception() const = 0;
+  virtual void set_pull_time_delay_millis_when_exception(long pull_time_delay_millis_when_exception) = 0;
 
   virtual AllocateMQStrategy* allocate_mq_strategy() const = 0;
   virtual void set_allocate_mq_strategy(AllocateMQStrategy* strategy) = 0;
diff --git a/include/DefaultMQPushConsumerConfigProxy.h b/include/DefaultMQPushConsumerConfigProxy.h
index 4a0c22f..e7b6895 100644
--- a/include/DefaultMQPushConsumerConfigProxy.h
+++ b/include/DefaultMQPushConsumerConfigProxy.h
@@ -62,6 +62,14 @@ class ROCKETMQCLIENT_API DefaultMQPushConsumerConfigProxy : public MQClientConfi
     dynamic_cast<DefaultMQPushConsumerConfig*>(client_config_.get())->set_consume_thread_nums(threadNum);
   }
 
+  int pull_threshold_for_queue() const override {
+    return dynamic_cast<DefaultMQPushConsumerConfig*>(client_config_.get())->pull_threshold_for_queue();
+  }
+
+  void set_pull_threshold_for_queue(int maxCacheSize) override {
+    dynamic_cast<DefaultMQPushConsumerConfig*>(client_config_.get())->set_pull_threshold_for_queue(maxCacheSize);
+  }
+
   int consume_message_batch_max_size() const override {
     return dynamic_cast<DefaultMQPushConsumerConfig*>(client_config_.get())->consume_message_batch_max_size();
   }
@@ -71,20 +79,12 @@ class ROCKETMQCLIENT_API DefaultMQPushConsumerConfigProxy : public MQClientConfi
         ->set_consume_message_batch_max_size(consumeMessageBatchMaxSize);
   }
 
-  int max_cache_msg_size_per_queue() const override {
-    return dynamic_cast<DefaultMQPushConsumerConfig*>(client_config_.get())->max_cache_msg_size_per_queue();
-  }
-
-  void set_max_cache_msg_size_per_queue(int maxCacheSize) override {
-    dynamic_cast<DefaultMQPushConsumerConfig*>(client_config_.get())->set_max_cache_msg_size_per_queue(maxCacheSize);
-  }
-
-  int async_pull_timeout() const override {
-    return dynamic_cast<DefaultMQPushConsumerConfig*>(client_config_.get())->async_pull_timeout();
+  int pull_batch_size() const override {
+    return dynamic_cast<DefaultMQPushConsumerConfig*>(client_config_.get())->pull_batch_size();
   }
 
-  void set_async_pull_timeout(int asyncPullTimeout) override {
-    dynamic_cast<DefaultMQPushConsumerConfig*>(client_config_.get())->set_async_pull_timeout(asyncPullTimeout);
+  void set_pull_batch_size(int pull_batch_size) override {
+    dynamic_cast<DefaultMQPushConsumerConfig*>(client_config_.get())->set_pull_batch_size(pull_batch_size);
   }
 
   int max_reconsume_times() const override {
@@ -95,13 +95,13 @@ class ROCKETMQCLIENT_API DefaultMQPushConsumerConfigProxy : public MQClientConfi
     dynamic_cast<DefaultMQPushConsumerConfig*>(client_config_.get())->set_max_reconsume_times(maxReconsumeTimes);
   }
 
-  long pull_time_delay_mills_when_exception() const override {
-    return dynamic_cast<DefaultMQPushConsumerConfig*>(client_config_.get())->pull_time_delay_mills_when_exception();
+  long pull_time_delay_millis_when_exception() const override {
+    return dynamic_cast<DefaultMQPushConsumerConfig*>(client_config_.get())->pull_time_delay_millis_when_exception();
   }
 
-  void set_pull_time_delay_mills_when_exception(long pullTimeDelayMillsWhenException) override {
+  void set_pull_time_delay_millis_when_exception(long pull_time_delay_millis_when_exception) override {
     dynamic_cast<DefaultMQPushConsumerConfig*>(client_config_.get())
-        ->set_pull_time_delay_mills_when_exception(pullTimeDelayMillsWhenException);
+        ->set_pull_time_delay_millis_when_exception(pull_time_delay_millis_when_exception);
   }
 
   AllocateMQStrategy* allocate_mq_strategy() const override {
diff --git a/include/MQConsumer.h b/include/MQConsumer.h
deleted file mode 100755
index 49c0b48..0000000
--- a/include/MQConsumer.h
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef ROCKETMQ_MQCONSUMER_H_
-#define ROCKETMQ_MQCONSUMER_H_
-
-#include "MQMessageExt.h"
-
-namespace rocketmq {
-
-/**
- * MQConsumer - interface for consumer
- */
-class ROCKETMQCLIENT_API MQConsumer {
- public:
-  virtual ~MQConsumer() = default;
-
- public:  // MQConsumer in Java
-  virtual void start() = 0;
-  virtual void shutdown() = 0;
-
-  virtual bool sendMessageBack(MessageExtPtr msg, int delayLevel) = 0;
-  virtual bool sendMessageBack(MessageExtPtr msg, int delayLevel, const std::string& brokerName) = 0;
-  virtual void fetchSubscribeMessageQueues(const std::string& topic, std::vector<MQMessageQueue>& mqs) = 0;
-};
-
-}  // namespace rocketmq
-
-#endif  // ROCKETMQ_MQCONSUMER_H_
diff --git a/include/MQPushConsumer.h b/include/MQPushConsumer.h
index 0ee8aaa..f9ac0da 100644
--- a/include/MQPushConsumer.h
+++ b/include/MQPushConsumer.h
@@ -17,7 +17,7 @@
 #ifndef ROCKETMQ_MQPUSHCONSUMER_H_
 #define ROCKETMQ_MQPUSHCONSUMER_H_
 
-#include "MQConsumer.h"
+#include "MQMessageExt.h"
 #include "MQMessageListener.h"
 
 namespace rocketmq {
@@ -25,21 +25,24 @@ namespace rocketmq {
 /**
  * MQPushConsumer - interface for push consumer
  */
-class ROCKETMQCLIENT_API MQPushConsumer : public MQConsumer  // base interface
-{
+class ROCKETMQCLIENT_API MQPushConsumer {
  public:  // MQPushConsumer in Java
-  // [[deprecated]]
-  virtual void registerMessageListener(MQMessageListener* messageListener) = 0;
-  virtual void registerMessageListener(MessageListenerConcurrently* messageListener) = 0;
-  virtual void registerMessageListener(MessageListenerOrderly* messageListener) = 0;
+  virtual void start() = 0;
+  virtual void shutdown() = 0;
+
+  virtual void suspend() = 0;
+  virtual void resume() = 0;
 
   virtual MQMessageListener* getMessageListener() const = 0;
 
+  virtual void registerMessageListener(MessageListenerConcurrently* messageListener) = 0;
+  virtual void registerMessageListener(MessageListenerOrderly* messageListener) = 0;
+
   virtual void subscribe(const std::string& topic, const std::string& subExpression) = 0;
   // virtual void subscribe(const std::string& topic, MessageSelector* selector) = 0;
 
-  virtual void suspend() = 0;
-  virtual void resume() = 0;
+  virtual bool sendMessageBack(MessageExtPtr msg, int delay_level) = 0;
+  virtual bool sendMessageBack(MessageExtPtr msg, int delay_level, const std::string& broker_name) = 0;
 };
 
 }  // namespace rocketmq
diff --git a/src/consumer/ConsumeMessageConcurrentlyService.cpp b/src/consumer/ConsumeMessageConcurrentlyService.cpp
index ffbf892..ed0d89b 100755
--- a/src/consumer/ConsumeMessageConcurrentlyService.cpp
+++ b/src/consumer/ConsumeMessageConcurrentlyService.cpp
@@ -75,8 +75,7 @@ void ConsumeMessageConcurrentlyService::ConsumeRequest(std::vector<MessageExtPtr
     return;
   }
 
-  consumer_->resetRetryTopic(
-      msgs, consumer_->getDefaultMQPushConsumerConfig()->group_name());  // set where to sendMessageBack
+  consumer_->resetRetryAndNamespace(msgs);  // set where to sendMessageBack
 
   ConsumeStatus status = RECONSUME_LATER;
   try {
diff --git a/src/consumer/ConsumeMessageOrderlyService.cpp b/src/consumer/ConsumeMessageOrderlyService.cpp
index a14431e..aa81707 100755
--- a/src/consumer/ConsumeMessageOrderlyService.cpp
+++ b/src/consumer/ConsumeMessageOrderlyService.cpp
@@ -148,7 +148,7 @@ void ConsumeMessageOrderlyService::ConsumeRequest(ProcessQueuePtr processQueue,
 
       std::vector<MessageExtPtr> msgs;
       processQueue->takeMessages(msgs, consumeBatchSize);
-      consumer_->resetRetryTopic(msgs, consumer_->getDefaultMQPushConsumerConfig()->group_name());
+      consumer_->resetRetryAndNamespace(msgs);
       if (!msgs.empty()) {
         ConsumeStatus status = RECONSUME_LATER;
         try {
diff --git a/src/consumer/DefaultMQPushConsumer.cpp b/src/consumer/DefaultMQPushConsumer.cpp
index 894e965..7d45f75 100644
--- a/src/consumer/DefaultMQPushConsumer.cpp
+++ b/src/consumer/DefaultMQPushConsumer.cpp
@@ -49,20 +49,16 @@ void DefaultMQPushConsumer::shutdown() {
   push_consumer_impl_->shutdown();
 }
 
-bool DefaultMQPushConsumer::sendMessageBack(MessageExtPtr msg, int delayLevel) {
-  return push_consumer_impl_->sendMessageBack(msg, delayLevel);
-}
-
-bool DefaultMQPushConsumer::sendMessageBack(MessageExtPtr msg, int delayLevel, const std::string& brokerName) {
-  return push_consumer_impl_->sendMessageBack(msg, delayLevel, brokerName);
+void DefaultMQPushConsumer::suspend() {
+  push_consumer_impl_->suspend();
 }
 
-void DefaultMQPushConsumer::fetchSubscribeMessageQueues(const std::string& topic, std::vector<MQMessageQueue>& mqs) {
-  push_consumer_impl_->fetchSubscribeMessageQueues(topic, mqs);
+void DefaultMQPushConsumer::resume() {
+  push_consumer_impl_->resume();
 }
 
-void DefaultMQPushConsumer::registerMessageListener(MQMessageListener* messageListener) {
-  push_consumer_impl_->registerMessageListener(messageListener);
+MQMessageListener* DefaultMQPushConsumer::getMessageListener() const {
+  return push_consumer_impl_->getMessageListener();
 }
 
 void DefaultMQPushConsumer::registerMessageListener(MessageListenerConcurrently* messageListener) {
@@ -73,20 +69,16 @@ void DefaultMQPushConsumer::registerMessageListener(MessageListenerOrderly* mess
   push_consumer_impl_->registerMessageListener(messageListener);
 }
 
-MQMessageListener* DefaultMQPushConsumer::getMessageListener() const {
-  return push_consumer_impl_->getMessageListener();
-}
-
 void DefaultMQPushConsumer::subscribe(const std::string& topic, const std::string& subExpression) {
   push_consumer_impl_->subscribe(topic, subExpression);
 }
 
-void DefaultMQPushConsumer::suspend() {
-  push_consumer_impl_->suspend();
+bool DefaultMQPushConsumer::sendMessageBack(MessageExtPtr msg, int delayLevel) {
+  return push_consumer_impl_->sendMessageBack(msg, delayLevel);
 }
 
-void DefaultMQPushConsumer::resume() {
-  push_consumer_impl_->resume();
+bool DefaultMQPushConsumer::sendMessageBack(MessageExtPtr msg, int delayLevel, const std::string& brokerName) {
+  return push_consumer_impl_->sendMessageBack(msg, delayLevel, brokerName);
 }
 
 void DefaultMQPushConsumer::setRPCHook(RPCHookPtr rpcHook) {
diff --git a/src/consumer/DefaultMQPushConsumerConfigImpl.hpp b/src/consumer/DefaultMQPushConsumerConfigImpl.hpp
index e2ba06d..870dfcd 100644
--- a/src/consumer/DefaultMQPushConsumerConfigImpl.hpp
+++ b/src/consumer/DefaultMQPushConsumerConfigImpl.hpp
@@ -37,11 +37,11 @@ class DefaultMQPushConsumerConfigImpl : virtual public DefaultMQPushConsumerConf
         consume_from_where_(ConsumeFromWhere::CONSUME_FROM_LAST_OFFSET),
         consume_timestamp_("0"),
         consume_thread_nums_(std::min(8, (int)std::thread::hardware_concurrency())),
+        pull_threshold_for_queue_(1000),
         consume_message_batch_max_size_(1),
-        max_msg_cache_size_(1000),
-        async_pull_timeout_(30 * 1000),
-        max_reconsume_times_(-1),
-        pull_time_delay_mills_when_exception_(3000),
+        pull_batch_size_(32),
+        max_reconsume_times_(16),
+        pull_time_delay_millis_when_exception_(3000),
         allocate_mq_strategy_(new AllocateMQAveragely()) {}
   virtual ~DefaultMQPushConsumerConfigImpl() = default;
 
@@ -61,6 +61,9 @@ class DefaultMQPushConsumerConfigImpl : virtual public DefaultMQPushConsumerConf
     }
   }
 
+  int pull_threshold_for_queue() const override { return pull_threshold_for_queue_; }
+  void set_pull_threshold_for_queue(int maxCacheSize) override { pull_threshold_for_queue_ = maxCacheSize; }
+
   int consume_message_batch_max_size() const override { return consume_message_batch_max_size_; }
   void set_consume_message_batch_max_size(int consumeMessageBatchMaxSize) override {
     if (consumeMessageBatchMaxSize >= 1) {
@@ -68,22 +71,15 @@ class DefaultMQPushConsumerConfigImpl : virtual public DefaultMQPushConsumerConf
     }
   }
 
-  int max_cache_msg_size_per_queue() const override { return max_msg_cache_size_; }
-  void set_max_cache_msg_size_per_queue(int maxCacheSize) override {
-    if (maxCacheSize > 0 && maxCacheSize < 65535) {
-      max_msg_cache_size_ = maxCacheSize;
-    }
-  }
-
-  int async_pull_timeout() const override { return async_pull_timeout_; }
-  void set_async_pull_timeout(int asyncPullTimeout) override { async_pull_timeout_ = asyncPullTimeout; }
+  int pull_batch_size() const override { return pull_batch_size_; }
+  void set_pull_batch_size(int pull_batch_size) override { pull_batch_size_ = pull_batch_size; }
 
   int max_reconsume_times() const override { return max_reconsume_times_; }
   void set_max_reconsume_times(int maxReconsumeTimes) override { max_reconsume_times_ = maxReconsumeTimes; }
 
-  long pull_time_delay_mills_when_exception() const override { return pull_time_delay_mills_when_exception_; }
-  void set_pull_time_delay_mills_when_exception(long pullTimeDelayMillsWhenException) override {
-    pull_time_delay_mills_when_exception_ = pullTimeDelayMillsWhenException;
+  long pull_time_delay_millis_when_exception() const override { return pull_time_delay_millis_when_exception_; }
+  void set_pull_time_delay_millis_when_exception(long pull_time_delay_millis_when_exception) override {
+    pull_time_delay_millis_when_exception_ = pull_time_delay_millis_when_exception;
   }
 
   AllocateMQStrategy* allocate_mq_strategy() const override { return allocate_mq_strategy_.get(); }
@@ -96,13 +92,15 @@ class DefaultMQPushConsumerConfigImpl : virtual public DefaultMQPushConsumerConf
   std::string consume_timestamp_;
 
   int consume_thread_nums_;
-  int consume_message_batch_max_size_;
-  int max_msg_cache_size_;
 
-  int async_pull_timeout_;  // 30s
+  int pull_threshold_for_queue_;
+
+  int consume_message_batch_max_size_;  // 1
+  int pull_batch_size_;                 // 32
+
   int max_reconsume_times_;
 
-  long pull_time_delay_mills_when_exception_;  // 3000
+  long pull_time_delay_millis_when_exception_;  // 3000
 
   std::unique_ptr<AllocateMQStrategy> allocate_mq_strategy_;
 };
diff --git a/src/consumer/DefaultMQPushConsumerImpl.cpp b/src/consumer/DefaultMQPushConsumerImpl.cpp
index 740e18b..841d4ee 100644
--- a/src/consumer/DefaultMQPushConsumerImpl.cpp
+++ b/src/consumer/DefaultMQPushConsumerImpl.cpp
@@ -30,6 +30,7 @@
 #include "MQClientInstance.h"
 #include "MQClientManager.h"
 #include "MQProtos.h"
+#include "NamespaceUtil.h"
 #include "LocalFileOffsetStore.h"
 #include "PullAPIWrapper.h"
 #include "PullMessageService.hpp"
@@ -40,9 +41,12 @@
 #include "UtilAll.h"
 #include "Validators.h"
 
+static const long BROKER_SUSPEND_MAX_TIME_MILLIS = 1000 * 15;
+static const long CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND = 1000 * 30;
+
 namespace rocketmq {
 
-class AsyncPullCallback : public AutoDeletePullCallback {
+class DefaultMQPushConsumerImpl::AsyncPullCallback : public AutoDeletePullCallback {
  public:
   AsyncPullCallback(DefaultMQPushConsumerImplPtr pushConsumer,
                     PullRequestPtr request,
@@ -52,51 +56,47 @@ class AsyncPullCallback : public AutoDeletePullCallback {
   ~AsyncPullCallback() = default;
 
   void onSuccess(PullResult& pullResult) override {
-    auto defaultMQPushConsumer = default_mq_push_consumer_.lock();
-    if (nullptr == defaultMQPushConsumer) {
+    auto consumer = default_mq_push_consumer_.lock();
+    if (nullptr == consumer) {
       LOG_WARN_NEW("AsyncPullCallback::onSuccess: DefaultMQPushConsumerImpl is released.");
       return;
     }
 
-    PullResult result = defaultMQPushConsumer->getPullAPIWrapper()->processPullResult(pull_request_->message_queue(),
-                                                                                      pullResult, subscription_data_);
+    PullResult result =
+        consumer->pull_api_wrapper_->processPullResult(pull_request_->message_queue(), pullResult, subscription_data_);
     switch (result.pull_status()) {
       case FOUND: {
-        int64_t prevRequestOffset = pull_request_->next_offset();
+        int64_t prev_request_offset = pull_request_->next_offset();
         pull_request_->set_next_offset(result.next_begin_offset());
 
-        int64_t firstMsgOffset = (std::numeric_limits<int64_t>::max)();
-        if (result.msg_found_list().empty()) {
-          defaultMQPushConsumer->executePullRequestImmediately(pull_request_);
-        } else {
-          firstMsgOffset = result.msg_found_list()[0]->queue_offset();
+        int64_t first_msg_offset = (std::numeric_limits<int64_t>::max)();
+        if (!result.msg_found_list().empty()) {
+          first_msg_offset = result.msg_found_list()[0]->queue_offset();
 
           pull_request_->process_queue()->putMessage(result.msg_found_list());
-          defaultMQPushConsumer->getConsumerMsgService()->submitConsumeRequest(
-              result.msg_found_list(), pull_request_->process_queue(), pull_request_->message_queue(), true);
-
-          defaultMQPushConsumer->executePullRequestImmediately(pull_request_);
+          consumer->consume_service_->submitConsumeRequest(result.msg_found_list(), pull_request_->process_queue(),
+                                                           pull_request_->message_queue(), true);
         }
 
-        if (result.next_begin_offset() < prevRequestOffset || firstMsgOffset < prevRequestOffset) {
+        consumer->executePullRequestImmediately(pull_request_);
+
+        if (result.next_begin_offset() < prev_request_offset || first_msg_offset < prev_request_offset) {
           LOG_WARN_NEW(
               "[BUG] pull message result maybe data wrong, nextBeginOffset:{} firstMsgOffset:{} prevRequestOffset:{}",
-              result.next_begin_offset(), firstMsgOffset, prevRequestOffset);
+              result.next_begin_offset(), first_msg_offset, prev_request_offset);
         }
-
       } break;
       case NO_NEW_MSG:
       case NO_MATCHED_MSG:
         pull_request_->set_next_offset(result.next_begin_offset());
-        defaultMQPushConsumer->correctTagsOffset(pull_request_);
-        defaultMQPushConsumer->executePullRequestImmediately(pull_request_);
+        consumer->correctTagsOffset(pull_request_);
+        consumer->executePullRequestImmediately(pull_request_);
         break;
       case NO_LATEST_MSG:
         pull_request_->set_next_offset(result.next_begin_offset());
-        defaultMQPushConsumer->correctTagsOffset(pull_request_);
-        defaultMQPushConsumer->executePullRequestLater(
-            pull_request_,
-            defaultMQPushConsumer->getDefaultMQPushConsumerConfig()->pull_time_delay_mills_when_exception());
+        consumer->correctTagsOffset(pull_request_);
+        consumer->executePullRequestLater(
+            pull_request_, consumer->getDefaultMQPushConsumerConfig()->pull_time_delay_millis_when_exception());
         break;
       case OFFSET_ILLEGAL: {
         LOG_WARN_NEW("the pull request offset illegal, {} {}", pull_request_->toString(), result.toString());
@@ -105,16 +105,16 @@ class AsyncPullCallback : public AutoDeletePullCallback {
         pull_request_->process_queue()->set_dropped(true);
 
         // update and persist offset, then removeProcessQueue
-        auto pullRequest = pull_request_;
-        defaultMQPushConsumer->executeTaskLater(
-            [defaultMQPushConsumer, pullRequest]() {
+        auto pull_request = pull_request_;
+        consumer->executeTaskLater(
+            [consumer, pull_request]() {
               try {
-                defaultMQPushConsumer->getOffsetStore()->updateOffset(pullRequest->message_queue(),
-                                                                      pullRequest->next_offset(), false);
-                defaultMQPushConsumer->getOffsetStore()->persist(pullRequest->message_queue());
-                defaultMQPushConsumer->getRebalanceImpl()->removeProcessQueue(pullRequest->message_queue());
+                consumer->getOffsetStore()->updateOffset(pull_request->message_queue(), pull_request->next_offset(),
+                                                         false);
+                consumer->getOffsetStore()->persist(pull_request->message_queue());
+                consumer->getRebalanceImpl()->removeProcessQueue(pull_request->message_queue());
 
-                LOG_WARN_NEW("fix the pull request offset, {}", pullRequest->toString());
+                LOG_WARN_NEW("fix the pull request offset, {}", pull_request->toString());
               } catch (std::exception& e) {
                 LOG_ERROR_NEW("executeTaskLater Exception: {}", e.what());
               }
@@ -127,8 +127,8 @@ class AsyncPullCallback : public AutoDeletePullCallback {
   }
 
   void onException(MQException& e) noexcept override {
-    auto defaultMQPushConsumer = default_mq_push_consumer_.lock();
-    if (nullptr == defaultMQPushConsumer) {
+    auto consumer = default_mq_push_consumer_.lock();
+    if (nullptr == consumer) {
       LOG_WARN_NEW("AsyncPullCallback::onException: DefaultMQPushConsumerImpl is released.");
       return;
     }
@@ -138,8 +138,8 @@ class AsyncPullCallback : public AutoDeletePullCallback {
     }
 
     // TODO
-    defaultMQPushConsumer->executePullRequestLater(
-        pull_request_, defaultMQPushConsumer->getDefaultMQPushConsumerConfig()->pull_time_delay_mills_when_exception());
+    consumer->executePullRequestLater(
+        pull_request_, consumer->getDefaultMQPushConsumerConfig()->pull_time_delay_millis_when_exception());
   }
 
  private:
@@ -156,11 +156,11 @@ DefaultMQPushConsumerImpl::DefaultMQPushConsumerImpl(DefaultMQPushConsumerConfig
       start_time_(UtilAll::currentTimeMillis()),
       pause_(false),
       consume_orderly_(false),
+      message_listener_(nullptr),
+      consume_service_(nullptr),
       rebalance_impl_(new RebalancePushImpl(this)),
       pull_api_wrapper_(nullptr),
-      offset_store_(nullptr),
-      consume_service_(nullptr),
-      message_listener_(nullptr) {}
+      offset_store_(nullptr) {}
 
 DefaultMQPushConsumerImpl::~DefaultMQPushConsumerImpl() = default;
 
@@ -176,6 +176,10 @@ void DefaultMQPushConsumerImpl::start() {
 
   switch (service_state_) {
     case CREATE_JUST: {
+      // wrap namespace
+      client_config_->set_group_name(
+          NamespaceUtil::wrapNamespace(client_config_->name_space(), client_config_->group_name()));
+
       LOG_INFO_NEW("the consumer [{}] start beginning.", client_config_->group_name());
 
       service_state_ = START_FAILED;
@@ -193,10 +197,8 @@ void DefaultMQPushConsumerImpl::start() {
 
       // init rebalance_impl_
       rebalance_impl_->set_consumer_group(client_config_->group_name());
-      rebalance_impl_->set_message_model(
-          dynamic_cast<DefaultMQPushConsumerConfig*>(client_config_.get())->message_model());
-      rebalance_impl_->set_allocate_mq_strategy(
-          dynamic_cast<DefaultMQPushConsumerConfig*>(client_config_.get())->allocate_mq_strategy());
+      rebalance_impl_->set_message_model(getDefaultMQPushConsumerConfig()->message_model());
+      rebalance_impl_->set_allocate_mq_strategy(getDefaultMQPushConsumerConfig()->allocate_mq_strategy());
       rebalance_impl_->set_client_instance(client_instance_.get());
 
       // init pull_api_wrapper_
@@ -204,7 +206,7 @@ void DefaultMQPushConsumerImpl::start() {
       // TODO: registerFilterMessageHook
 
       // init offset_store_
-      switch (dynamic_cast<DefaultMQPushConsumerConfig*>(client_config_.get())->message_model()) {
+      switch (getDefaultMQPushConsumerConfig()->message_model()) {
         case MessageModel::BROADCASTING:
           offset_store_.reset(new LocalFileOffsetStore(client_instance_.get(), client_config_->group_name()));
           break;
@@ -219,16 +221,14 @@ void DefaultMQPushConsumerImpl::start() {
         LOG_INFO_NEW("start orderly consume service: {}", client_config_->group_name());
         consume_orderly_ = true;
         consume_service_.reset(new ConsumeMessageOrderlyService(
-            this, dynamic_cast<DefaultMQPushConsumerConfig*>(client_config_.get())->consume_thread_nums(),
-            message_listener_));
+            this, getDefaultMQPushConsumerConfig()->consume_thread_nums(), message_listener_));
       } else {
         // for backward compatible, defaultly and concurrently listeners are allocating
         // ConsumeMessageConcurrentlyService
         LOG_INFO_NEW("start concurrently consume service: {}", client_config_->group_name());
         consume_orderly_ = false;
         consume_service_.reset(new ConsumeMessageConcurrentlyService(
-            this, dynamic_cast<DefaultMQPushConsumerConfig*>(client_config_.get())->consume_thread_nums(),
-            message_listener_));
+            this, getDefaultMQPushConsumerConfig()->consume_thread_nums(), message_listener_));
       }
       consume_service_->start();
 
@@ -274,13 +274,13 @@ void DefaultMQPushConsumerImpl::checkConfig() {
                       "consumerGroup can not equal " + DEFAULT_CONSUMER_GROUP + ", please specify another one.", -1);
   }
 
-  if (dynamic_cast<DefaultMQPushConsumerConfig*>(client_config_.get())->message_model() != BROADCASTING &&
-      dynamic_cast<DefaultMQPushConsumerConfig*>(client_config_.get())->message_model() != CLUSTERING) {
+  if (getDefaultMQPushConsumerConfig()->message_model() != BROADCASTING &&
+      getDefaultMQPushConsumerConfig()->message_model() != CLUSTERING) {
     THROW_MQEXCEPTION(MQClientException, "messageModel is valid", -1);
   }
 
   // allocateMessageQueueStrategy
-  if (nullptr == dynamic_cast<DefaultMQPushConsumerConfig*>(client_config_.get())->allocate_mq_strategy()) {
+  if (nullptr == getDefaultMQPushConsumerConfig()->allocate_mq_strategy()) {
     THROW_MQEXCEPTION(MQClientException, "allocateMessageQueueStrategy is null", -1);
   }
 
@@ -302,7 +302,7 @@ void DefaultMQPushConsumerImpl::copySubscription() {
     rebalance_impl_->setSubscriptionData(it.first, subscriptionData);
   }
 
-  switch (dynamic_cast<DefaultMQPushConsumerConfig*>(client_config_.get())->message_model()) {
+  switch (getDefaultMQPushConsumerConfig()->message_model()) {
     case BROADCASTING:
       break;
     case CLUSTERING: {
@@ -335,9 +335,9 @@ void DefaultMQPushConsumerImpl::shutdown() {
       persistConsumerOffset();
       client_instance_->unregisterConsumer(client_config_->group_name());
       client_instance_->shutdown();
-      LOG_INFO_NEW("the consumer [{}] shutdown OK", client_config_->group_name());
       rebalance_impl_->destroy();
       service_state_ = SHUTDOWN_ALREADY;
+      LOG_INFO_NEW("the consumer [{}] shutdown OK", client_config_->group_name());
       break;
     }
     case CREATE_JUST:
@@ -348,53 +348,31 @@ void DefaultMQPushConsumerImpl::shutdown() {
   }
 }
 
-bool DefaultMQPushConsumerImpl::sendMessageBack(MessageExtPtr msg, int delayLevel) {
-  return sendMessageBack(msg, delayLevel, null);
+void DefaultMQPushConsumerImpl::suspend() {
+  pause_ = true;
+  LOG_INFO_NEW("suspend this consumer, {}", client_config_->group_name());
 }
 
-bool DefaultMQPushConsumerImpl::sendMessageBack(MessageExtPtr msg, int delayLevel, const std::string& brokerName) {
-  try {
-    std::string brokerAddr = brokerName.empty() ? socketAddress2String(msg->store_host())
-                                                : client_instance_->findBrokerAddressInPublish(brokerName);
-
-    client_instance_->getMQClientAPIImpl()->consumerSendMessageBack(
-        brokerAddr, msg, dynamic_cast<DefaultMQPushConsumerConfig*>(client_config_.get())->group_name(), delayLevel,
-        5000, getMaxReconsumeTimes());
-    return true;
-  } catch (const std::exception& e) {
-    LOG_ERROR_NEW("sendMessageBack exception, group: {}, msg: {}. {}",
-                  dynamic_cast<DefaultMQPushConsumerConfig*>(client_config_.get())->group_name(), msg->toString(),
-                  e.what());
-  }
-  return false;
+void DefaultMQPushConsumerImpl::resume() {
+  pause_ = false;
+  doRebalance();
+  LOG_INFO_NEW("resume this consumer, {}", client_config_->group_name());
 }
 
-void DefaultMQPushConsumerImpl::fetchSubscribeMessageQueues(const std::string& topic,
-                                                            std::vector<MQMessageQueue>& mqs) {
-  mqs.clear();
-  try {
-    client_instance_->getMQAdminImpl()->fetchSubscribeMessageQueues(topic, mqs);
-  } catch (MQException& e) {
-    LOG_ERROR_NEW("{}", e.what());
-  }
+MQMessageListener* DefaultMQPushConsumerImpl::getMessageListener() const {
+  return message_listener_;
 }
 
-void DefaultMQPushConsumerImpl::registerMessageListener(MQMessageListener* messageListener) {
-  if (nullptr != messageListener) {
-    message_listener_ = messageListener;
+void DefaultMQPushConsumerImpl::registerMessageListener(MessageListenerConcurrently* message_listener) {
+  if (nullptr != message_listener) {
+    message_listener_ = message_listener;
   }
 }
 
-void DefaultMQPushConsumerImpl::registerMessageListener(MessageListenerConcurrently* messageListener) {
-  registerMessageListener(static_cast<MQMessageListener*>(messageListener));
-}
-
-void DefaultMQPushConsumerImpl::registerMessageListener(MessageListenerOrderly* messageListener) {
-  registerMessageListener(static_cast<MQMessageListener*>(messageListener));
-}
-
-MQMessageListener* DefaultMQPushConsumerImpl::getMessageListener() const {
-  return message_listener_;
+void DefaultMQPushConsumerImpl::registerMessageListener(MessageListenerOrderly* message_listener) {
+  if (nullptr != message_listener) {
+    message_listener_ = message_listener;
+  }
 }
 
 void DefaultMQPushConsumerImpl::subscribe(const std::string& topic, const std::string& subExpression) {
@@ -402,51 +380,22 @@ void DefaultMQPushConsumerImpl::subscribe(const std::string& topic, const std::s
   subscription_[topic] = subExpression;
 }
 
-void DefaultMQPushConsumerImpl::suspend() {
-  pause_ = true;
-  LOG_INFO_NEW("suspend this consumer, {}", client_config_->group_name());
-}
-
-void DefaultMQPushConsumerImpl::resume() {
-  pause_ = false;
-  doRebalance();
-  LOG_INFO_NEW("resume this consumer, {}", client_config_->group_name());
-}
-
-void DefaultMQPushConsumerImpl::doRebalance() {
-  if (!pause_) {
-    rebalance_impl_->doRebalance(isConsumeOrderly());
-  }
-}
-
-void DefaultMQPushConsumerImpl::persistConsumerOffset() {
-  if (isServiceStateOk()) {
-    std::vector<MQMessageQueue> mqs = rebalance_impl_->getAllocatedMQ();
-    if (dynamic_cast<DefaultMQPushConsumerConfig*>(client_config_.get())->message_model() == BROADCASTING) {
-      offset_store_->persistAll(mqs);
-    } else {
-      for (const auto& mq : mqs) {
-        offset_store_->persist(mq);
-      }
-    }
+std::vector<SubscriptionData> DefaultMQPushConsumerImpl::subscriptions() const {
+  std::vector<SubscriptionData> result;
+  auto& subTable = rebalance_impl_->getSubscriptionInner();
+  for (const auto& it : subTable) {
+    result.push_back(*(it.second));
   }
+  return result;
 }
 
 void DefaultMQPushConsumerImpl::updateTopicSubscribeInfo(const std::string& topic, std::vector<MQMessageQueue>& info) {
   rebalance_impl_->setTopicSubscribeInfo(topic, info);
 }
 
-void DefaultMQPushConsumerImpl::updateConsumeOffset(const MQMessageQueue& mq, int64_t offset) {
-  if (offset >= 0) {
-    offset_store_->updateOffset(mq, offset, false);
-  } else {
-    LOG_ERROR_NEW("updateConsumeOffset of mq:{} error", mq.toString());
-  }
-}
-
-void DefaultMQPushConsumerImpl::correctTagsOffset(PullRequestPtr pullRequest) {
-  if (0L == pullRequest->process_queue()->getCacheMsgCount()) {
-    offset_store_->updateOffset(pullRequest->message_queue(), pullRequest->next_offset(), true);
+void DefaultMQPushConsumerImpl::doRebalance() {
+  if (!pause_) {
+    rebalance_impl_->doRebalance(consume_orderly());
   }
 }
 
@@ -458,79 +407,70 @@ void DefaultMQPushConsumerImpl::executePullRequestImmediately(PullRequestPtr pul
   client_instance_->getPullMessageService()->executePullRequestImmediately(pullRequest);
 }
 
-void DefaultMQPushConsumerImpl::executeTaskLater(const handler_type& task, long timeDelay) {
-  client_instance_->getPullMessageService()->executeTaskLater(task, timeDelay);
-}
-
-void DefaultMQPushConsumerImpl::pullMessage(PullRequestPtr pullRequest) {
-  if (nullptr == pullRequest) {
+void DefaultMQPushConsumerImpl::pullMessage(PullRequestPtr pull_request) {
+  if (nullptr == pull_request) {
     LOG_ERROR("PullRequest is NULL, return");
     return;
   }
 
-  auto processQueue = pullRequest->process_queue();
-  if (processQueue->dropped()) {
-    LOG_WARN_NEW("the pull request[{}] is dropped.", pullRequest->toString());
+  auto process_queue = pull_request->process_queue();
+  if (process_queue->dropped()) {
+    LOG_WARN_NEW("the pull request[{}] is dropped.", pull_request->toString());
     return;
   }
 
-  processQueue->set_last_pull_timestamp(UtilAll::currentTimeMillis());
+  process_queue->set_last_pull_timestamp(UtilAll::currentTimeMillis());
 
-  int cachedMessageCount = processQueue->getCacheMsgCount();
-  if (cachedMessageCount >
-      dynamic_cast<DefaultMQPushConsumerConfig*>(client_config_.get())->max_cache_msg_size_per_queue()) {
+  int cachedMessageCount = process_queue->getCacheMsgCount();
+  if (cachedMessageCount > getDefaultMQPushConsumerConfig()->pull_threshold_for_queue()) {
     // too many message in cache, wait to process
-    executePullRequestLater(pullRequest, 1000);
+    executePullRequestLater(pull_request, 1000);
     return;
   }
 
-  if (isConsumeOrderly()) {
-    if (processQueue->locked()) {
-      if (!pullRequest->locked_first()) {
-        const auto offset = rebalance_impl_->computePullFromWhere(pullRequest->message_queue());
-        bool brokerBusy = offset < pullRequest->next_offset();
+  if (consume_orderly()) {
+    if (process_queue->locked()) {
+      if (!pull_request->locked_first()) {
+        const auto offset = rebalance_impl_->computePullFromWhere(pull_request->message_queue());
+        bool brokerBusy = offset < pull_request->next_offset();
         LOG_INFO_NEW(
             "the first time to pull message, so fix offset from broker. pullRequest: {} NewOffset: {} brokerBusy: {}",
-            pullRequest->toString(), offset, UtilAll::to_string(brokerBusy));
+            pull_request->toString(), offset, UtilAll::to_string(brokerBusy));
         if (brokerBusy) {
           LOG_INFO_NEW(
               "[NOTIFYME] the first time to pull message, but pull request offset larger than broker consume offset. "
               "pullRequest: {} NewOffset: {}",
-              pullRequest->toString(), offset);
+              pull_request->toString(), offset);
         }
 
-        pullRequest->set_locked_first(true);
-        pullRequest->set_next_offset(offset);
+        pull_request->set_locked_first(true);
+        pull_request->set_next_offset(offset);
       }
     } else {
-      executePullRequestLater(
-          pullRequest,
-          dynamic_cast<DefaultMQPushConsumerConfig*>(client_config_.get())->pull_time_delay_mills_when_exception());
-      LOG_INFO_NEW("pull message later because not locked in broker, {}", pullRequest->toString());
+      executePullRequestLater(pull_request, getDefaultMQPushConsumerConfig()->pull_time_delay_millis_when_exception());
+      LOG_INFO_NEW("pull message later because not locked in broker, {}", pull_request->toString());
       return;
     }
   }
 
-  const auto& messageQueue = pullRequest->message_queue();
-  SubscriptionData* subscriptionData = rebalance_impl_->getSubscriptionData(messageQueue.topic());
-  if (nullptr == subscriptionData) {
-    executePullRequestLater(
-        pullRequest,
-        dynamic_cast<DefaultMQPushConsumerConfig*>(client_config_.get())->pull_time_delay_mills_when_exception());
-    LOG_WARN_NEW("find the consumer's subscription failed, {}", pullRequest->toString());
+  const auto& message_queue = pull_request->message_queue();
+  SubscriptionData* subscription_data = rebalance_impl_->getSubscriptionData(message_queue.topic());
+  if (nullptr == subscription_data) {
+    executePullRequestLater(pull_request, getDefaultMQPushConsumerConfig()->pull_time_delay_millis_when_exception());
+    LOG_WARN_NEW("find the consumer's subscription failed, {}", pull_request->toString());
     return;
   }
 
   bool commitOffsetEnable = false;
   int64_t commitOffsetValue = 0;
-  if (CLUSTERING == dynamic_cast<DefaultMQPushConsumerConfig*>(client_config_.get())->message_model()) {
-    commitOffsetValue = offset_store_->readOffset(messageQueue, READ_FROM_MEMORY);
+  if (CLUSTERING == getDefaultMQPushConsumerConfig()->message_model()) {
+    commitOffsetValue = offset_store_->readOffset(message_queue, READ_FROM_MEMORY);
     if (commitOffsetValue > 0) {
       commitOffsetEnable = true;
     }
   }
 
-  const auto& subExpression = subscriptionData->sub_string();
+  const auto& subExpression = subscription_data->sub_string();
 
   int sysFlag = PullSysFlag::buildSysFlag(commitOffsetEnable,      // commitOffset
                                           true,                    // suspend
@@ -538,97 +478,135 @@ void DefaultMQPushConsumerImpl::pullMessage(PullRequestPtr pullRequest) {
                                           false);                  // class filter
 
   try {
-    auto* callback = new AsyncPullCallback(shared_from_this(), pullRequest, subscriptionData);
-    pull_api_wrapper_->pullKernelImpl(
-        messageQueue,                                                                            // 1
-        subExpression,                                                                           // 2
-        subscriptionData->sub_version(),                                                         // 3
-        pullRequest->next_offset(),                                                              // 4
-        32,                                                                                      // 5
-        sysFlag,                                                                                 // 6
-        commitOffsetValue,                                                                       // 7
-        1000 * 15,                                                                               // 8
-        dynamic_cast<DefaultMQPushConsumerConfig*>(client_config_.get())->async_pull_timeout(),  // 9
-        CommunicationMode::ASYNC,                                                                // 10
-        callback);                                                                               // 11
+    auto* callback = new AsyncPullCallback(shared_from_this(), pull_request, subscription_data);
+    pull_api_wrapper_->pullKernelImpl(message_queue,                                        // mq
+                                      subExpression,                                        // subExpression
+                                      subscription_data->sub_version(),                     // subVersion
+                                      pull_request->next_offset(),                          // offset
+                                      getDefaultMQPushConsumerConfig()->pull_batch_size(),  // maxNums
+                                      sysFlag,                                              // sysFlag
+                                      commitOffsetValue,                                    // commitOffset
+                                      BROKER_SUSPEND_MAX_TIME_MILLIS,        // brokerSuspendMaxTimeMillis
+                                      CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,  // timeoutMillis
+                                      CommunicationMode::ASYNC,              // communicationMode
+                                      callback);                             // pullCallback
   } catch (MQException& e) {
     LOG_ERROR_NEW("pullKernelImpl exception: {}", e.what());
-    executePullRequestLater(
-        pullRequest,
-        dynamic_cast<DefaultMQPushConsumerConfig*>(client_config_.get())->pull_time_delay_mills_when_exception());
+    executePullRequestLater(pull_request, getDefaultMQPushConsumerConfig()->pull_time_delay_millis_when_exception());
   }
 }
 
-void DefaultMQPushConsumerImpl::resetRetryTopic(const std::vector<MessageExtPtr>& msgs,
-                                                const std::string& consumerGroup) {
-  std::string groupTopic = UtilAll::getRetryTopic(consumerGroup);
+void DefaultMQPushConsumerImpl::correctTagsOffset(PullRequestPtr pullRequest) {
+  if (0L == pullRequest->process_queue()->getCacheMsgCount()) {
+    offset_store_->updateOffset(pullRequest->message_queue(), pullRequest->next_offset(), true);
+  }
+}
+
+void DefaultMQPushConsumerImpl::executeTaskLater(const handler_type& task, long timeDelay) {
+  client_instance_->getPullMessageService()->executeTaskLater(task, timeDelay);
+}
+
+void DefaultMQPushConsumerImpl::resetRetryAndNamespace(const std::vector<MessageExtPtr>& msgs) {
+  std::string retry_topic = UtilAll::getRetryTopic(groupName());
   for (auto& msg : msgs) {
-    std::string retryTopic = msg->getProperty(MQMessageConst::PROPERTY_RETRY_TOPIC);
-    if (!retryTopic.empty() && groupTopic == msg->topic()) {
-      msg->set_topic(retryTopic);
+    std::string group_topic = msg->getProperty(MQMessageConst::PROPERTY_RETRY_TOPIC);
+    if (!group_topic.empty() && retry_topic == msg->topic()) {
+      msg->set_topic(group_topic);
     }
   }
-}
 
-int DefaultMQPushConsumerImpl::getMaxReconsumeTimes() {
-  // default reconsume times: 16
-  if (dynamic_cast<DefaultMQPushConsumerConfig*>(client_config_.get())->max_reconsume_times() == -1) {
-    return 16;
-  } else {
-    return dynamic_cast<DefaultMQPushConsumerConfig*>(client_config_.get())->max_reconsume_times();
+  const auto& name_space = client_config_->name_space();
+  if (!name_space.empty()) {
+    for (auto& msg : msgs) {
+      msg->set_topic(NamespaceUtil::withoutNamespace(msg->topic(), name_space));
+    }
   }
 }
 
-const std::string& DefaultMQPushConsumerImpl::groupName() const {
-  return client_config_->group_name();
+bool DefaultMQPushConsumerImpl::sendMessageBack(MessageExtPtr msg, int delay_level) {
+  return sendMessageBack(msg, delay_level, null);
 }
 
-MessageModel DefaultMQPushConsumerImpl::messageModel() const {
-  return dynamic_cast<DefaultMQPushConsumerConfig*>(client_config_.get())->message_model();
-};
+bool DefaultMQPushConsumerImpl::sendMessageBack(MessageExtPtr msg, int delay_level, const std::string& brokerName) {
+  try {
+    msg->set_topic(NamespaceUtil::wrapNamespace(client_config_->name_space(), msg->topic()));
 
-ConsumeType DefaultMQPushConsumerImpl::consumeType() const {
-  return CONSUME_PASSIVELY;
+    std::string brokerAddr = brokerName.empty() ? socketAddress2String(msg->store_host())
+                                                : client_instance_->findBrokerAddressInPublish(brokerName);
+
+    client_instance_->getMQClientAPIImpl()->consumerSendMessageBack(
+        brokerAddr, msg, getDefaultMQPushConsumerConfig()->group_name(), delay_level, 5000,
+        getDefaultMQPushConsumerConfig()->max_reconsume_times());
+    return true;
+  } catch (const std::exception& e) {
+    LOG_ERROR_NEW("sendMessageBack exception, group: {}, msg: {}. {}", getDefaultMQPushConsumerConfig()->group_name(),
+                  msg->toString(), e.what());
+  }
+  return false;
 }
 
-ConsumeFromWhere DefaultMQPushConsumerImpl::consumeFromWhere() const {
-  return dynamic_cast<DefaultMQPushConsumerConfig*>(client_config_.get())->consume_from_where();
+void DefaultMQPushConsumerImpl::persistConsumerOffset() {
+  if (isServiceStateOk()) {
+    std::vector<MQMessageQueue> mqs = rebalance_impl_->getAllocatedMQ();
+    if (getDefaultMQPushConsumerConfig()->message_model() == BROADCASTING) {
+      offset_store_->persistAll(mqs);
+    } else {
+      for (const auto& mq : mqs) {
+        offset_store_->persist(mq);
+      }
+    }
+  }
 }
 
-std::vector<SubscriptionData> DefaultMQPushConsumerImpl::subscriptions() const {
-  std::vector<SubscriptionData> result;
-  auto& subTable = rebalance_impl_->getSubscriptionInner();
-  for (const auto& it : subTable) {
-    result.push_back(*(it.second));
+void DefaultMQPushConsumerImpl::updateConsumeOffset(const MQMessageQueue& mq, int64_t offset) {
+  if (offset >= 0) {
+    offset_store_->updateOffset(mq, offset, false);
+  } else {
+    LOG_ERROR_NEW("updateConsumeOffset of mq:{} error", mq.toString());
   }
-  return result;
 }
 
 ConsumerRunningInfo* DefaultMQPushConsumerImpl::consumerRunningInfo() {
   auto* info = new ConsumerRunningInfo();
 
   info->setProperty(ConsumerRunningInfo::PROP_CONSUME_ORDERLY, UtilAll::to_string(consume_orderly_));
-  info->setProperty(
-      ConsumerRunningInfo::PROP_THREADPOOL_CORE_SIZE,
-      UtilAll::to_string(dynamic_cast<DefaultMQPushConsumerConfig*>(client_config_.get())->consume_thread_nums()));
+  info->setProperty(ConsumerRunningInfo::PROP_THREADPOOL_CORE_SIZE,
+                    UtilAll::to_string(getDefaultMQPushConsumerConfig()->consume_thread_nums()));
   info->setProperty(ConsumerRunningInfo::PROP_CONSUMER_START_TIMESTAMP, UtilAll::to_string(start_time_));
 
-  auto subSet = subscriptions();
-  info->setSubscriptionSet(subSet);
+  auto sub_set = subscriptions();
+  info->setSubscriptionSet(sub_set);
 
   auto processQueueTable = rebalance_impl_->getProcessQueueTable();
-
   for (const auto& it : processQueueTable) {
     const auto& mq = it.first;
     const auto& pq = it.second;
 
-    ProcessQueueInfo pqinfo;
-    pqinfo.setCommitOffset(offset_store_->readOffset(mq, MEMORY_FIRST_THEN_STORE));
-    pq->fillProcessQueueInfo(pqinfo);
-    info->setMqTable(mq, pqinfo);
+    ProcessQueueInfo pq_info;
+    pq_info.setCommitOffset(offset_store_->readOffset(mq, MEMORY_FIRST_THEN_STORE));
+    pq->fillProcessQueueInfo(pq_info);
+    info->setMqTable(mq, pq_info);
   }
 
+  // TODO: ConsumeStatus
+
   return info;
 }
 
+const std::string& DefaultMQPushConsumerImpl::groupName() const {
+  return client_config_->group_name();
+}
+
+MessageModel DefaultMQPushConsumerImpl::messageModel() const {
+  return getDefaultMQPushConsumerConfig()->message_model();
+};
+
+ConsumeType DefaultMQPushConsumerImpl::consumeType() const {
+  return CONSUME_PASSIVELY;
+}
+
+ConsumeFromWhere DefaultMQPushConsumerImpl::consumeFromWhere() const {
+  return getDefaultMQPushConsumerConfig()->consume_from_where();
+}
+
 }  // namespace rocketmq
diff --git a/src/consumer/DefaultMQPushConsumerImpl.h b/src/consumer/DefaultMQPushConsumerImpl.h
index 1e3fdc7..ecac586 100755
--- a/src/consumer/DefaultMQPushConsumerImpl.h
+++ b/src/consumer/DefaultMQPushConsumerImpl.h
@@ -14,8 +14,8 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#ifndef ROCKETMQ_CONXUMER_DEFAULTMQPUSHCONSUMERIMPL_H_
-#define ROCKETMQ_CONXUMER_DEFAULTMQPUSHCONSUMERIMPL_H_
+#ifndef ROCKETMQ_CONSUMER_DEFAULTMQPUSHCONSUMERIMPL_H_
+#define ROCKETMQ_CONSUMER_DEFAULTMQPUSHCONSUMERIMPL_H_
 
 #include <memory>
 #include <string>
@@ -43,6 +43,9 @@ class DefaultMQPushConsumerImpl : public std::enable_shared_from_this<DefaultMQP
                                   public MQPushConsumer,
                                   public MQClientImpl,
                                   public MQConsumerInner {
+ private:
+  class AsyncPullCallback;
+
  public:
   /**
    * create() - Factory method for DefaultMQPushConsumerImpl, used to ensure that all objects of
@@ -63,72 +66,66 @@ class DefaultMQPushConsumerImpl : public std::enable_shared_from_this<DefaultMQP
  public:
   virtual ~DefaultMQPushConsumerImpl();
 
- public:  // MQClient
+ public:  // MQPushConsumer
   void start() override;
   void shutdown() override;
 
- public:  // MQConsumer
-  bool sendMessageBack(MessageExtPtr msg, int delayLevel) override;
-  bool sendMessageBack(MessageExtPtr msg, int delayLevel, const std::string& brokerName) override;
-  void fetchSubscribeMessageQueues(const std::string& topic, std::vector<MQMessageQueue>& mqs) override;
+  void suspend() override;
+  void resume() override;
+
+  MQMessageListener* getMessageListener() const override;
 
- public:  // MQPushConsumer
-  void registerMessageListener(MQMessageListener* messageListener) override;
   void registerMessageListener(MessageListenerConcurrently* messageListener) override;
   void registerMessageListener(MessageListenerOrderly* messageListener) override;
 
-  MQMessageListener* getMessageListener() const override;
-
   void subscribe(const std::string& topic, const std::string& subExpression) override;
 
-  void suspend() override;
-  void resume() override;
+  bool sendMessageBack(MessageExtPtr msg, int delayLevel) override;
+  bool sendMessageBack(MessageExtPtr msg, int delayLevel, const std::string& brokerName) override;
 
  public:  // MQConsumerInner
   const std::string& groupName() const override;
   MessageModel messageModel() const override;
   ConsumeType consumeType() const override;
   ConsumeFromWhere consumeFromWhere() const override;
+
   std::vector<SubscriptionData> subscriptions() const override;
 
+  // service discovery
+  void updateTopicSubscribeInfo(const std::string& topic, std::vector<MQMessageQueue>& info) override;
+
+  // load balancing
   void doRebalance() override;
+
+  // offset persistence
   void persistConsumerOffset() override;
-  void updateTopicSubscribeInfo(const std::string& topic, std::vector<MQMessageQueue>& info) override;
 
   ConsumerRunningInfo* consumerRunningInfo() override;
 
  public:
-  void updateConsumeOffset(const MQMessageQueue& mq, int64_t offset);
-  void correctTagsOffset(PullRequestPtr pullRequest);
-
   void executePullRequestLater(PullRequestPtr pullRequest, long timeDelay);
   void executePullRequestImmediately(PullRequestPtr pullRequest);
-  void executeTaskLater(const handler_type& task, long timeDelay);
 
   void pullMessage(PullRequestPtr pullrequest);
 
-  void resetRetryTopic(const std::vector<MessageExtPtr>& msgs, const std::string& consumerGroup);
+  void resetRetryAndNamespace(const std::vector<MessageExtPtr>& msgs);
+
+  void updateConsumeOffset(const MQMessageQueue& mq, int64_t offset);
 
  private:
   void checkConfig();
   void copySubscription();
   void updateTopicSubscribeInfoWhenSubscriptionChanged();
 
- public:
-  int getMaxReconsumeTimes();
-
-  inline bool isPause() const { return pause_; };
-  inline void setPause(bool pause) { pause_ = pause; }
-
-  inline bool isConsumeOrderly() { return consume_orderly_; }
-
-  inline RebalanceImpl* getRebalanceImpl() const { return rebalance_impl_.get(); }
+  void correctTagsOffset(PullRequestPtr pullRequest);
 
-  inline PullAPIWrapper* getPullAPIWrapper() const { return pull_api_wrapper_.get(); }
+  void executeTaskLater(const handler_type& task, long timeDelay);
 
-  inline OffsetStore* getOffsetStore() const { return offset_store_.get(); }
+ public:
+  inline bool pause() const { return pause_; };
+  inline void set_pause(bool pause) { pause_ = pause; }
 
-  inline ConsumeMsgService* getConsumerMsgService() const { return consume_service_.get(); }
+  inline bool consume_orderly() { return consume_orderly_; }
 
   inline MessageListenerType getMessageListenerType() const {
     if (nullptr != message_listener_) {
@@ -137,7 +134,11 @@ class DefaultMQPushConsumerImpl : public std::enable_shared_from_this<DefaultMQP
     return messageListenerDefaultly;
   }
 
-  inline DefaultMQPushConsumerConfig* getDefaultMQPushConsumerConfig() {
+  inline RebalanceImpl* getRebalanceImpl() const { return rebalance_impl_.get(); }
+
+  inline OffsetStore* getOffsetStore() const { return offset_store_.get(); }
+
+  inline DefaultMQPushConsumerConfig* getDefaultMQPushConsumerConfig() const {
     return dynamic_cast<DefaultMQPushConsumerConfig*>(client_config_.get());
   }
 
@@ -148,13 +149,15 @@ class DefaultMQPushConsumerImpl : public std::enable_shared_from_this<DefaultMQP
   bool consume_orderly_;
 
   std::map<std::string, std::string> subscription_;
+
+  MQMessageListener* message_listener_;
+  std::unique_ptr<ConsumeMsgService> consume_service_;
+
   std::unique_ptr<RebalanceImpl> rebalance_impl_;
   std::unique_ptr<PullAPIWrapper> pull_api_wrapper_;
   std::unique_ptr<OffsetStore> offset_store_;
-  std::unique_ptr<ConsumeMsgService> consume_service_;
-  MQMessageListener* message_listener_;
 };
 
 }  // namespace rocketmq
 
-#endif  // ROCKETMQ_CONXUMER_DEFAULTMQPUSHCONSUMERIMPL_H_
+#endif  // ROCKETMQ_CONSUMER_DEFAULTMQPUSHCONSUMERIMPL_H_
diff --git a/src/consumer/MQConsumerInner.h b/src/consumer/MQConsumerInner.h
index b851316..d38e839 100644
--- a/src/consumer/MQConsumerInner.h
+++ b/src/consumer/MQConsumerInner.h
@@ -36,11 +36,17 @@ class MQConsumerInner {
   virtual MessageModel messageModel() const = 0;
   virtual ConsumeType consumeType() const = 0;
   virtual ConsumeFromWhere consumeFromWhere() const = 0;
+
   virtual std::vector<SubscriptionData> subscriptions() const = 0;
 
+  // service discovery
+  virtual void updateTopicSubscribeInfo(const std::string& topic, std::vector<MQMessageQueue>& info) = 0;
+
+  // load balancing
   virtual void doRebalance() = 0;
+
+  // offset persistence
   virtual void persistConsumerOffset() = 0;
-  virtual void updateTopicSubscribeInfo(const std::string& topic, std::vector<MQMessageQueue>& info) = 0;
 
   virtual ConsumerRunningInfo* consumerRunningInfo() = 0;
 };
diff --git a/src/consumer/RebalanceImpl.cpp b/src/consumer/RebalanceImpl.cpp
index a3988cd..1454584 100644
--- a/src/consumer/RebalanceImpl.cpp
+++ b/src/consumer/RebalanceImpl.cpp
@@ -467,7 +467,6 @@ void RebalanceImpl::destroy() {
   for (const auto& it : process_queue_table_) {
     it.second->set_dropped(true);
   }
-
   process_queue_table_.clear();
 }
 
diff --git a/src/extern/CPushConsumer.cpp b/src/extern/CPushConsumer.cpp
index feb7693..0ba977e 100644
--- a/src/extern/CPushConsumer.cpp
+++ b/src/extern/CPushConsumer.cpp
@@ -215,7 +215,7 @@ int SetPushConsumerMaxCacheMessageSize(CPushConsumer* consumer, int maxCacheSize
   if (consumer == NULL || maxCacheSize <= 0) {
     return NULL_POINTER;
   }
-  reinterpret_cast<DefaultMQPushConsumer*>(consumer)->set_max_cache_msg_size_per_queue(maxCacheSize);
+  reinterpret_cast<DefaultMQPushConsumer*>(consumer)->set_pull_threshold_for_queue(maxCacheSize);
   return OK;
 }
 


Mime
View raw message