rocketmq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ifplu...@apache.org
Subject [rocketmq-client-cpp] 26/29: feat: DefaultLitePullConsumer
Date Tue, 29 Dec 2020 03:36:43 GMT
This is an automated email from the ASF dual-hosted git repository.

ifplusor pushed a commit to branch re_dev
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-cpp.git

commit a750efad307a98c49b20d2beebc763633af2e472
Author: James Yin <ywhjames@hotmail.com>
AuthorDate: Mon Sep 21 15:46:55 2020 +0800

    feat: DefaultLitePullConsumer
---
 ...llConsumeMessage.c => PullConsumeMessage.c.bak} |   0
 example/{PullConsumer.cpp => PullConsumer.cpp.bak} |   0
 include/DefaultLitePullConsumer.h                  |  79 ++
 include/DefaultLitePullConsumerConfig.h            |  85 ++
 include/DefaultLitePullConsumerConfigProxy.h       | 175 ++++
 include/DefaultMQPullConsumer.h                    |  81 --
 include/DefaultMQPullConsumerConfigProxy.h         |  48 --
 include/LitePullConsumer.h                         |  79 ++
 include/MQAdmin.h                                  |   2 +-
 include/MQPullConsumer.h                           | 103 ---
 ...rConfig.h => TopicMessageQueueChangeListener.h} |  25 +-
 src/MQClientAPIImpl.cpp                            |   2 +-
 src/MQClientAPIImpl.h                              |   2 +-
 src/MQClientImpl.cpp                               |   2 +-
 src/MQClientImpl.h                                 |   2 +-
 src/common/PullCallbackWrap.cpp                    |   0
 src/common/PullCallbackWrap.h                      |   0
 src/common/PullSysFlag.cpp                         |  11 +-
 src/common/PullSysFlag.h                           |   3 +-
 src/common/SendCallbackWrap.cpp                    |   0
 src/common/SendCallbackWrap.h                      |   0
 src/concurrent/blocking_queue.hpp                  |  94 +++
 src/concurrent/concurrent_queue.hpp                |   2 +-
 src/concurrent/executor_impl.hpp                   |   1 +
 src/consumer/AssignedMessageQueue.hpp              | 219 +++++
 src/consumer/DefaultLitePullConsumer.cpp           | 130 +++
 src/consumer/DefaultLitePullConsumerConfigImpl.hpp | 150 ++++
 src/consumer/DefaultLitePullConsumerImpl.cpp       | 894 +++++++++++++++++++++
 src/consumer/DefaultLitePullConsumerImpl.h         | 228 ++++++
 src/consumer/DefaultMQPullConsumer.cpp             | 110 ---
 src/consumer/DefaultMQPullConsumerImpl.cpp         | 375 ---------
 src/consumer/DefaultMQPullConsumerImpl.h           | 173 ----
 .../consumer/MessageQueueListener.h                |  14 +-
 src/consumer/RebalanceImpl.cpp                     |  99 +--
 src/consumer/RebalanceImpl.h                       |   2 +-
 src/consumer/RebalanceLitePullImpl.cpp             | 119 +++
 ...RebalancePullImpl.h => RebalanceLitePullImpl.h} |  14 +-
 src/consumer/RebalancePullImpl.cpp                 |  48 --
 src/extern/CPullConsumer.cpp                       | 504 ++++++------
 ...lConsumerTest.cpp => CPullConsumerTest.cpp.bak} |   2 +-
 40 files changed, 2598 insertions(+), 1279 deletions(-)

diff --git a/example/PullConsumeMessage.c b/example/PullConsumeMessage.c.bak
similarity index 100%
rename from example/PullConsumeMessage.c
rename to example/PullConsumeMessage.c.bak
diff --git a/example/PullConsumer.cpp b/example/PullConsumer.cpp.bak
similarity index 100%
rename from example/PullConsumer.cpp
rename to example/PullConsumer.cpp.bak
diff --git a/include/DefaultLitePullConsumer.h b/include/DefaultLitePullConsumer.h
new file mode 100755
index 0000000..8ad2e9b
--- /dev/null
+++ b/include/DefaultLitePullConsumer.h
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef ROCKETMQ_DEFAULTLITEPULLCONSUMER_H_
+#define ROCKETMQ_DEFAULTLITEPULLCONSUMER_H_
+
+#include "DefaultLitePullConsumerConfigProxy.h"
+#include "LitePullConsumer.h"
+#include "RPCHook.h"
+
+namespace rocketmq {
+
+class ROCKETMQCLIENT_API DefaultLitePullConsumer : public DefaultLitePullConsumerConfigProxy,  // base
+                                                   public LitePullConsumer                     // interface
+{
+ public:
+  DefaultLitePullConsumer(const std::string& groupname);
+  DefaultLitePullConsumer(const std::string& groupname, RPCHookPtr rpcHook);
+  virtual ~DefaultLitePullConsumer();
+
+ public:  // LitePullConsumer
+  void start() override;
+  void shutdown() override;
+
+  bool isAutoCommit() const override;
+  void setAutoCommit(bool auto_commit) override;
+
+  void subscribe(const std::string& topic, const std::string& subExpression) override;
+  void subscribe(const std::string& topic, const MessageSelector& selector) override;
+
+  void unsubscribe(const std::string& topic) override;
+
+  std::vector<MQMessageExt> poll() override;
+  std::vector<MQMessageExt> poll(long timeout) override;
+
+  std::vector<MQMessageQueue> fetchMessageQueues(const std::string& topic) override;
+
+  void assign(const std::vector<MQMessageQueue>& messageQueues) override;
+
+  void seek(const MQMessageQueue& messageQueue, int64_t offset) override;
+  void seekToBegin(const MQMessageQueue& messageQueue) override;
+  void seekToEnd(const MQMessageQueue& messageQueue) override;
+
+  int64_t offsetForTimestamp(const MQMessageQueue& messageQueue, int64_t timestamp) override;
+
+  void pause(const std::vector<MQMessageQueue>& messageQueues) override;
+  void resume(const std::vector<MQMessageQueue>& messageQueues) override;
+
+  void commitSync() override;
+
+  int64_t committed(const MQMessageQueue& messageQueue) override;
+
+  void registerTopicMessageQueueChangeListener(
+      const std::string& topic,
+      TopicMessageQueueChangeListener* topicMessageQueueChangeListener) override;
+
+ public:
+  void setRPCHook(RPCHookPtr rpcHook);
+
+ protected:
+  std::shared_ptr<LitePullConsumer> pull_consumer_impl_;
+};
+
+}  // namespace rocketmq
+
+#endif  // ROCKETMQ_DEFAULTLITEPULLCONSUMER_H_
diff --git a/include/DefaultLitePullConsumerConfig.h b/include/DefaultLitePullConsumerConfig.h
new file mode 100644
index 0000000..037d331
--- /dev/null
+++ b/include/DefaultLitePullConsumerConfig.h
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef ROCKETMQ_DEFAULTLITEPULLCONSUMERCONFIG_H_
+#define ROCKETMQ_DEFAULTLITEPULLCONSUMERCONFIG_H_
+
+#include "AllocateMQStrategy.h"
+#include "ConsumeType.h"
+#include "MQClientConfig.h"
+
+namespace rocketmq {
+
+class DefaultLitePullConsumerConfig;
+typedef std::shared_ptr<DefaultLitePullConsumerConfig> DefaultLitePullConsumerConfigPtr;
+
+class ROCKETMQCLIENT_API DefaultLitePullConsumerConfig : virtual public MQClientConfig  // base interface
+{
+ public:
+  virtual ~DefaultLitePullConsumerConfig() = default;
+
+  virtual MessageModel message_model() const = 0;
+  virtual void set_message_model(MessageModel message_model) = 0;
+
+  virtual ConsumeFromWhere consume_from_where() const = 0;
+  virtual void set_consume_from_where(ConsumeFromWhere consume_from_where) = 0;
+
+  virtual const std::string& consume_timestamp() const = 0;
+  virtual void set_consume_timestamp(const std::string& consume_timestamp) = 0;
+
+  virtual long auto_commit_interval_millis() const = 0;
+  virtual void set_auto_commit_interval_millis(long auto_commit_interval_millis) = 0;
+
+  virtual int pull_batch_size() const = 0;
+  virtual void set_pull_batch_size(int pull_batch_size) = 0;
+
+  virtual int pull_thread_nums() const = 0;
+  virtual void set_pull_thread_nums(int pull_thread_nums) = 0;
+
+  virtual bool long_polling_enable() const = 0;
+  virtual void set_long_polling_enable(bool long_polling_enable) = 0;
+
+  virtual long consumer_pull_timeout_millis() const = 0;
+  virtual void set_consumer_pull_timeout_millis(long consumer_pull_timeout_millis) = 0;
+
+  virtual long consumer_timeout_millis_when_suspend() const = 0;
+  virtual void set_consumer_timeout_millis_when_suspend(long consumer_timeout_millis_when_suspend) = 0;
+
+  virtual long broker_suspend_max_time_millis() const = 0;
+  virtual void set_broker_suspend_max_time_millis(long broker_suspend_max_time_millis) = 0;
+
+  virtual long pull_threshold_for_all() const = 0;
+  virtual void set_pull_threshold_for_all(long pull_threshold_for_all) = 0;
+
+  virtual int pull_threshold_for_queue() const = 0;
+  virtual void set_pull_threshold_for_queue(int pull_threshold_for_queue) = 0;
+
+  virtual long pull_time_delay_millis_when_exception() const = 0;
+  virtual void set_pull_time_delay_millis_when_exception(long pull_time_delay_millis_when_exception) = 0;
+
+  virtual long poll_timeout_millis() const = 0;
+  virtual void set_poll_timeout_millis(long poll_timeout_millis) = 0;
+
+  virtual long topic_metadata_check_interval_millis() const = 0;
+  virtual void set_topic_metadata_check_interval_millis(long topic_metadata_check_interval_millis) = 0;
+
+  virtual AllocateMQStrategy* allocate_mq_strategy() const = 0;
+  virtual void set_allocate_mq_strategy(AllocateMQStrategy* strategy) = 0;
+};
+
+}  // namespace rocketmq
+
+#endif  // ROCKETMQ_DEFAULTLITEPULLCONSUMERCONFIG_H_
diff --git a/include/DefaultLitePullConsumerConfigProxy.h b/include/DefaultLitePullConsumerConfigProxy.h
new file mode 100644
index 0000000..eae047f
--- /dev/null
+++ b/include/DefaultLitePullConsumerConfigProxy.h
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef ROCKETMQ_DEFAULTLITEPULLCONSUMERCONFIGPROXY_H__
+#define ROCKETMQ_DEFAULTLITEPULLCONSUMERCONFIGPROXY_H__
+
+#include "DefaultLitePullConsumerConfig.h"
+#include "MQClientConfigProxy.h"
+
+namespace rocketmq {
+
+class ROCKETMQCLIENT_API DefaultLitePullConsumerConfigProxy : public MQClientConfigProxy,                   // base
+                                                              virtual public DefaultLitePullConsumerConfig  // interface
+{
+ public:
+  DefaultLitePullConsumerConfigProxy(DefaultLitePullConsumerConfigPtr consumerConfig)
+      : MQClientConfigProxy(consumerConfig) {}
+  virtual ~DefaultLitePullConsumerConfigProxy() = default;
+
+  MessageModel message_model() const override {
+    return dynamic_cast<DefaultLitePullConsumerConfig*>(client_config_.get())->message_model();
+  }
+
+  void set_message_model(MessageModel messageModel) override {
+    dynamic_cast<DefaultLitePullConsumerConfig*>(client_config_.get())->set_message_model(messageModel);
+  }
+
+  ConsumeFromWhere consume_from_where() const override {
+    return dynamic_cast<DefaultLitePullConsumerConfig*>(client_config_.get())->consume_from_where();
+  }
+
+  void set_consume_from_where(ConsumeFromWhere consumeFromWhere) override {
+    dynamic_cast<DefaultLitePullConsumerConfig*>(client_config_.get())->set_consume_from_where(consumeFromWhere);
+  }
+
+  const std::string& consume_timestamp() const override {
+    return dynamic_cast<DefaultLitePullConsumerConfig*>(client_config_.get())->consume_timestamp();
+  }
+  void set_consume_timestamp(const std::string& consumeTimestamp) override {
+    dynamic_cast<DefaultLitePullConsumerConfig*>(client_config_.get())->set_consume_timestamp(consumeTimestamp);
+  }
+
+  long auto_commit_interval_millis() const override {
+    return dynamic_cast<DefaultLitePullConsumerConfig*>(client_config_.get())->auto_commit_interval_millis();
+  }
+
+  void set_auto_commit_interval_millis(long auto_commit_interval_millis) override {
+    dynamic_cast<DefaultLitePullConsumerConfig*>(client_config_.get())
+        ->set_auto_commit_interval_millis(auto_commit_interval_millis);
+  }
+
+  int pull_batch_size() const override {
+    return dynamic_cast<DefaultLitePullConsumerConfig*>(client_config_.get())->pull_batch_size();
+  }
+
+  void set_pull_batch_size(int pull_batch_size) override {
+    dynamic_cast<DefaultLitePullConsumerConfig*>(client_config_.get())->set_pull_batch_size(pull_batch_size);
+  }
+
+  int pull_thread_nums() const override {
+    return dynamic_cast<DefaultLitePullConsumerConfig*>(client_config_.get())->pull_thread_nums();
+  }
+
+  void set_pull_thread_nums(int pullThreadNums) override {
+    dynamic_cast<DefaultLitePullConsumerConfig*>(client_config_.get())->set_pull_thread_nums(pullThreadNums);
+  }
+
+  bool long_polling_enable() const override {
+    return dynamic_cast<DefaultLitePullConsumerConfig*>(client_config_.get())->long_polling_enable();
+  }
+
+  void set_long_polling_enable(bool long_polling_enable) override {
+    dynamic_cast<DefaultLitePullConsumerConfig*>(client_config_.get())->set_long_polling_enable(long_polling_enable);
+  }
+
+  long consumer_pull_timeout_millis() const override {
+    return dynamic_cast<DefaultLitePullConsumerConfig*>(client_config_.get())->consumer_pull_timeout_millis();
+  }
+
+  void set_consumer_pull_timeout_millis(long consumer_pull_timeout_millis) override {
+    dynamic_cast<DefaultLitePullConsumerConfig*>(client_config_.get())
+        ->set_consumer_pull_timeout_millis(consumer_pull_timeout_millis);
+  }
+
+  long consumer_timeout_millis_when_suspend() const override {
+    return dynamic_cast<DefaultLitePullConsumerConfig*>(client_config_.get())->consumer_timeout_millis_when_suspend();
+  }
+
+  void set_consumer_timeout_millis_when_suspend(long consumer_timeout_millis_when_suspend) override {
+    dynamic_cast<DefaultLitePullConsumerConfig*>(client_config_.get())
+        ->set_consumer_timeout_millis_when_suspend(consumer_timeout_millis_when_suspend);
+  }
+
+  long broker_suspend_max_time_millis() const override {
+    return dynamic_cast<DefaultLitePullConsumerConfig*>(client_config_.get())->broker_suspend_max_time_millis();
+  }
+
+  void set_broker_suspend_max_time_millis(long broker_suspend_max_time_millis) override {
+    dynamic_cast<DefaultLitePullConsumerConfig*>(client_config_.get())
+        ->set_broker_suspend_max_time_millis(broker_suspend_max_time_millis);
+  }
+
+  long pull_threshold_for_all() const override {
+    return dynamic_cast<DefaultLitePullConsumerConfig*>(client_config_.get())->pull_threshold_for_all();
+  }
+
+  void set_pull_threshold_for_all(long pull_threshold_for_all) override {
+    dynamic_cast<DefaultLitePullConsumerConfig*>(client_config_.get())
+        ->set_pull_threshold_for_all(pull_threshold_for_all);
+  }
+
+  int pull_threshold_for_queue() const override {
+    return dynamic_cast<DefaultLitePullConsumerConfig*>(client_config_.get())->pull_threshold_for_queue();
+  }
+
+  void set_pull_threshold_for_queue(int pull_threshold_for_queue) override {
+    dynamic_cast<DefaultLitePullConsumerConfig*>(client_config_.get())
+        ->set_pull_threshold_for_queue(pull_threshold_for_queue);
+  }
+
+  long pull_time_delay_millis_when_exception() const override {
+    return dynamic_cast<DefaultLitePullConsumerConfig*>(client_config_.get())->pull_time_delay_millis_when_exception();
+  }
+
+  void set_pull_time_delay_millis_when_exception(long pull_time_delay_millis_when_exception) override {
+    dynamic_cast<DefaultLitePullConsumerConfig*>(client_config_.get())
+        ->set_pull_time_delay_millis_when_exception(pull_time_delay_millis_when_exception);
+  }
+
+  long poll_timeout_millis() const override {
+    return dynamic_cast<DefaultLitePullConsumerConfig*>(client_config_.get())->poll_timeout_millis();
+  }
+
+  void set_poll_timeout_millis(long poll_timeout_millis) override {
+    dynamic_cast<DefaultLitePullConsumerConfig*>(client_config_.get())->set_poll_timeout_millis(poll_timeout_millis);
+  }
+
+  long topic_metadata_check_interval_millis() const override {
+    return dynamic_cast<DefaultLitePullConsumerConfig*>(client_config_.get())->topic_metadata_check_interval_millis();
+  }
+
+  void set_topic_metadata_check_interval_millis(long topicMetadataCheckIntervalMillis) override {
+    dynamic_cast<DefaultLitePullConsumerConfig*>(client_config_.get())
+        ->set_topic_metadata_check_interval_millis(topicMetadataCheckIntervalMillis);
+  }
+
+  AllocateMQStrategy* allocate_mq_strategy() const override {
+    return dynamic_cast<DefaultLitePullConsumerConfig*>(client_config_.get())->allocate_mq_strategy();
+  }
+
+  void set_allocate_mq_strategy(AllocateMQStrategy* strategy) override {
+    dynamic_cast<DefaultLitePullConsumerConfig*>(client_config_.get())->set_allocate_mq_strategy(strategy);
+  }
+
+  inline DefaultLitePullConsumerConfigPtr real_config() const {
+    return std::dynamic_pointer_cast<DefaultLitePullConsumerConfig>(client_config_);
+  }
+};
+
+}  // namespace rocketmq
+
+#endif  // ROCKETMQ_DEFAULTLITEPULLCONSUMERCONFIGPROXY_H__
diff --git a/include/DefaultMQPullConsumer.h b/include/DefaultMQPullConsumer.h
deleted file mode 100755
index 2f39e97..0000000
--- a/include/DefaultMQPullConsumer.h
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef ROCCKETMQ_DEFAULTMQPULLCONSUMER_H_
-#define ROCCKETMQ_DEFAULTMQPULLCONSUMER_H_
-
-#include <set>
-#include <string>
-
-#include "AllocateMQStrategy.h"
-#include "DefaultMQPullConsumerConfigProxy.h"
-#include "MQPullConsumer.h"
-#include "RPCHook.h"
-
-namespace rocketmq {
-
-class ROCKETMQCLIENT_API DefaultMQPullConsumer : public MQPullConsumer, public DefaultMQPullConsumerConfigProxy {
- public:
-  DefaultMQPullConsumer(const std::string& groupname);
-  DefaultMQPullConsumer(const std::string& groupname, RPCHookPtr rpcHook);
-  virtual ~DefaultMQPullConsumer();
-
- public:  // MQConsumer
-  void start() override;
-  void shutdown() override;
-
-  bool sendMessageBack(MessageExtPtr msg, int delayLevel) override;
-  bool sendMessageBack(MessageExtPtr msg, int delayLevel, const std::string& brokerName) override;
-  void fetchSubscribeMessageQueues(const std::string& topic, std::vector<MQMessageQueue>& mqs) override;
-
- public:  // MQPullConsumer
-  void registerMessageQueueListener(const std::string& topic, MQueueListener* pListener) override;
-
-  PullResult pull(const MQMessageQueue& mq, const std::string& subExpression, int64_t offset, int maxNums) override;
-
-  void pull(const MQMessageQueue& mq,
-            const std::string& subExpression,
-            int64_t offset,
-            int maxNums,
-            PullCallback* pullCallback) override;
-
-  PullResult pullBlockIfNotFound(const MQMessageQueue& mq,
-                                 const std::string& subExpression,
-                                 int64_t offset,
-                                 int maxNums) override;
-
-  void pullBlockIfNotFound(const MQMessageQueue& mq,
-                           const std::string& subExpression,
-                           int64_t offset,
-                           int maxNums,
-                           PullCallback* pullCallback) override;
-
-  void updateConsumeOffset(const MQMessageQueue& mq, int64_t offset) override;
-
-  int64_t fetchConsumeOffset(const MQMessageQueue& mq, bool fromStore) override;
-
-  void fetchMessageQueuesInBalance(const std::string& topic, std::vector<MQMessageQueue>& mqs) override;
-
- public:
-  void setRPCHook(RPCHookPtr rpcHook);
-
- protected:
-  std::shared_ptr<MQPullConsumer> m_pullConsumerDelegate;
-};
-
-}  // namespace rocketmq
-
-#endif  // ROCCKETMQ_DEFAULTMQPULLCONSUMER_H_
diff --git a/include/DefaultMQPullConsumerConfigProxy.h b/include/DefaultMQPullConsumerConfigProxy.h
deleted file mode 100644
index 63c73ec..0000000
--- a/include/DefaultMQPullConsumerConfigProxy.h
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef ROCKETMQ_DEFAULTMQPULLCONSUMERCONFIGPROXY_H_
-#define ROCKETMQ_DEFAULTMQPULLCONSUMERCONFIGPROXY_H_
-
-#include "DefaultMQPullConsumerConfig.h"
-#include "MQClientConfigProxy.h"
-
-namespace rocketmq {
-
-class ROCKETMQCLIENT_API DefaultMQPullConsumerConfigProxy : virtual public DefaultMQPullConsumerConfig,
-                                                            public MQClientConfigProxy {
- public:
-  DefaultMQPullConsumerConfigProxy(DefaultMQPullConsumerConfigPtr consumerConfig)
-      : MQClientConfigProxy(consumerConfig), m_consumerConfig(consumerConfig) {}
-  virtual ~DefaultMQPullConsumerConfigProxy() = default;
-
-  MessageModel getMessageModel() const override { return m_consumerConfig->getMessageModel(); }
-
-  void setMessageModel(MessageModel messageModel) override { m_consumerConfig->setMessageModel(messageModel); }
-
-  AllocateMQStrategy* getAllocateMQStrategy() const override { return m_consumerConfig->getAllocateMQStrategy(); }
-
-  void setAllocateMQStrategy(AllocateMQStrategy* strategy) override {
-    m_consumerConfig->setAllocateMQStrategy(strategy);
-  }
-
- private:
-  DefaultMQPullConsumerConfigPtr m_consumerConfig;
-};
-
-}  // namespace rocketmq
-
-#endif  // ROCKETMQ_DEFAULTMQPULLCONSUMERCONFIGPROXY_H_
diff --git a/include/LitePullConsumer.h b/include/LitePullConsumer.h
new file mode 100644
index 0000000..6dcc6d1
--- /dev/null
+++ b/include/LitePullConsumer.h
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef ROCKETMQ_LITEPULLCONSUMER_H_
+#define ROCKETMQ_LITEPULLCONSUMER_H_
+
+#include "TopicMessageQueueChangeListener.h"
+#include "MessageSelector.h"
+#include "MQMessageExt.h"
+#include "MQMessageQueue.h"
+
+namespace rocketmq {
+
+/**
+ * LitePullConsumer - interface for pull consumer
+ */
+class ROCKETMQCLIENT_API LitePullConsumer {
+ public:
+  virtual ~LitePullConsumer() = default;
+
+ public:  // LitePullConsumer in Java
+  virtual void start() = 0;
+  virtual void shutdown() = 0;
+
+  virtual bool isAutoCommit() const = 0;
+  virtual void setAutoCommit(bool auto_commit) = 0;
+
+  //
+  // Automatic mode
+
+  virtual void subscribe(const std::string& topic, const std::string& subExpression) = 0;
+  virtual void subscribe(const std::string& topic, const MessageSelector& selector) = 0;
+
+  virtual void unsubscribe(const std::string& topic) = 0;
+
+  virtual std::vector<MQMessageExt> poll() = 0;
+  virtual std::vector<MQMessageExt> poll(long timeout) = 0;
+
+  //
+  // Manual mode
+
+  virtual std::vector<MQMessageQueue> fetchMessageQueues(const std::string& topic) = 0;
+
+  virtual void assign(const std::vector<MQMessageQueue>& messageQueues) = 0;
+
+  virtual void seek(const MQMessageQueue& messageQueue, int64_t offset) = 0;
+  virtual void seekToBegin(const MQMessageQueue& messageQueue) = 0;
+  virtual void seekToEnd(const MQMessageQueue& messageQueue) = 0;
+
+  virtual int64_t offsetForTimestamp(const MQMessageQueue& messageQueue, int64_t timestamp) = 0;
+
+  virtual void pause(const std::vector<MQMessageQueue>& messageQueues) = 0;
+  virtual void resume(const std::vector<MQMessageQueue>& messageQueues) = 0;
+
+  virtual void commitSync() = 0;
+
+  virtual int64_t committed(const MQMessageQueue& messageQueue) = 0;
+
+  virtual void registerTopicMessageQueueChangeListener(
+      const std::string& topic,
+      TopicMessageQueueChangeListener* topicMessageQueueChangeListener) = 0;
+};
+
+}  // namespace rocketmq
+
+#endif  // ROCKETMQ_LITEPULLCONSUMER_H_
diff --git a/include/MQAdmin.h b/include/MQAdmin.h
index dff6f51..61c3044 100644
--- a/include/MQAdmin.h
+++ b/include/MQAdmin.h
@@ -47,7 +47,7 @@ class ROCKETMQCLIENT_API MQAdmin {
    * @param timestamp from when in milliseconds.
    * @return offset
    */
-  virtual int64_t searchOffset(const MQMessageQueue& mq, uint64_t timestamp) = 0;
+  virtual int64_t searchOffset(const MQMessageQueue& mq, int64_t timestamp) = 0;
 
   /**
    * Gets the max offset
diff --git a/include/MQPullConsumer.h b/include/MQPullConsumer.h
deleted file mode 100644
index 2545116..0000000
--- a/include/MQPullConsumer.h
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef ROCKETMQ_MQPULLCONSUMER_H_
-#define ROCKETMQ_MQPULLCONSUMER_H_
-
-#include "MQConsumer.h"
-#include "MQueueListener.h"
-#include "PullCallback.h"
-
-namespace rocketmq {
-
-class ROCKETMQCLIENT_API MQPullConsumer : public MQConsumer {
- public:
-  virtual void registerMessageQueueListener(const std::string& topic, MQueueListener* pListener) = 0;
-
-  /**
-   * pull msg from specified queue, if no msg in queue, return directly
-   *
-   * @param mq
-   *            specify the pulled queue
-   * @param subExpression
-   *            set filter expression for pulled msg, broker will filter msg actively
-   *            Now only OR operation is supported, eg: "tag1 || tag2 || tag3"
-   *            if subExpression is setted to "null" or "*" all msg will be subscribed
-   * @param offset
-   *            specify the started pull offset
-   * @param maxNums
-   *            specify max msg num by per pull
-   * @return
-   *            accroding to PullResult
-   */
-  virtual PullResult pull(const MQMessageQueue& mq, const std::string& subExpression, int64_t offset, int maxNums) = 0;
-
-  virtual void pull(const MQMessageQueue& mq,
-                    const std::string& subExpression,
-                    int64_t offset,
-                    int maxNums,
-                    PullCallback* pullCallback) = 0;
-
-  /**
-   * pull msg from specified queue, if no msg, broker will suspend the pull request 20s
-   *
-   * @param mq
-   *            specify the pulled queue
-   * @param subExpression
-   *            set filter expression for pulled msg, broker will filter msg actively
-   *            Now only OR operation is supported, eg: "tag1 || tag2 || tag3"
-   *            if subExpression is setted to "null" or "*" all msg will be subscribed
-   * @param offset
-   *            specify the started pull offset
-   * @param maxNums
-   *            specify max msg num by per pull
-   * @return
-   *            accroding to PullResult
-   */
-  virtual PullResult pullBlockIfNotFound(const MQMessageQueue& mq,
-                                         const std::string& subExpression,
-                                         int64_t offset,
-                                         int maxNums) = 0;
-
-  virtual void pullBlockIfNotFound(const MQMessageQueue& mq,
-                                   const std::string& subExpression,
-                                   int64_t offset,
-                                   int maxNums,
-                                   PullCallback* pullCallback) = 0;
-
-  virtual void updateConsumeOffset(const MQMessageQueue& mq, int64_t offset) = 0;
-
-  /**
-   * Fetch the offset
-   *
-   * @param mq
-   * @param fromStore
-   * @return
-   */
-  virtual int64_t fetchConsumeOffset(const MQMessageQueue& mq, bool fromStore) = 0;
-
-  /**
-   * Fetch the message queues according to the topic
-   *
-   * @param topic Message Topic
-   * @return
-   */
-  virtual void fetchMessageQueuesInBalance(const std::string& topic, std::vector<MQMessageQueue>& mqs) = 0;
-};
-
-}  // namespace rocketmq
-
-#endif  // ROCKETMQ_MQPULLCONSUMER_H_
diff --git a/include/DefaultMQPullConsumerConfig.h b/include/TopicMessageQueueChangeListener.h
similarity index 53%
rename from include/DefaultMQPullConsumerConfig.h
rename to include/TopicMessageQueueChangeListener.h
index 056e6d9..59e7121 100644
--- a/include/DefaultMQPullConsumerConfig.h
+++ b/include/TopicMessageQueueChangeListener.h
@@ -14,29 +14,27 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#ifndef ROCKETMQ_DEFAULTMQPULLCONSUMERCONFIG_H_
-#define ROCKETMQ_DEFAULTMQPULLCONSUMERCONFIG_H_
+#ifndef ROCKETMQ_TOPICMESSAGEQUEUECHANGELISTENER_H_
+#define ROCKETMQ_TOPICMESSAGEQUEUECHANGELISTENER_H_
 
-#include "AllocateMQStrategy.h"
-#include "ConsumeType.h"
-#include "MQClientConfig.h"
+#include <string>  // std::string
+#include <vector>  // std::vector
 
-namespace rocketmq {
+#include "MQMessageQueue.h"
 
-class DefaultMQPullConsumerConfig;
-typedef std::shared_ptr<DefaultMQPullConsumerConfig> DefaultMQPullConsumerConfigPtr;
+namespace rocketmq {
 
-class ROCKETMQCLIENT_API DefaultMQPullConsumerConfig : virtual public MQClientConfig {
+/**
+ * TopicMessageQueueChangeListener - callback interface; onChanged() receives a topic name and a list of that
+ * topic's message queues. NOTE(review): presumably invoked when the queue list of the topic changes - confirm.
+ */
+class ROCKETMQCLIENT_API TopicMessageQueueChangeListener {
  public:
-  virtual ~DefaultMQPullConsumerConfig() = default;
-
-  virtual MessageModel getMessageModel() const = 0;
-  virtual void setMessageModel(MessageModel messageModel) = 0;
+  virtual ~TopicMessageQueueChangeListener() = default;
 
-  virtual AllocateMQStrategy* getAllocateMQStrategy() const = 0;
-  virtual void setAllocateMQStrategy(AllocateMQStrategy* strategy) = 0;
+  virtual void onChanged(const std::string& topic, const std::vector<MQMessageQueue>& messageQueues) = 0;
 };
 
 }  // namespace rocketmq
 
-#endif  // ROCKETMQ_DEFAULTMQPULLCONSUMERCONFIG_H_
+#endif  // ROCKETMQ_TOPICMESSAGEQUEUECHANGELISTENER_H_
diff --git a/src/MQClientAPIImpl.cpp b/src/MQClientAPIImpl.cpp
index c3e010f..2d35354 100644
--- a/src/MQClientAPIImpl.cpp
+++ b/src/MQClientAPIImpl.cpp
@@ -331,7 +331,7 @@ MQMessageExt MQClientAPIImpl::viewMessage(const std::string& addr, int64_t phyof
 int64_t MQClientAPIImpl::searchOffset(const std::string& addr,
                                       const std::string& topic,
                                       int queueId,
-                                      uint64_t timestamp,
+                                      int64_t timestamp,
                                       int timeoutMillis) {
   auto* requestHeader = new SearchOffsetRequestHeader();
   requestHeader->topic = topic;
diff --git a/src/MQClientAPIImpl.h b/src/MQClientAPIImpl.h
index b105ffe..143a6a5 100644
--- a/src/MQClientAPIImpl.h
+++ b/src/MQClientAPIImpl.h
@@ -91,7 +91,7 @@ class MQClientAPIImpl {
   int64_t searchOffset(const std::string& addr,
                        const std::string& topic,
                        int queueId,
-                       uint64_t timestamp,
+                       int64_t timestamp,
                        int timeoutMillis);
 
   int64_t getMaxOffset(const std::string& addr, const std::string& topic, int queueId, int timeoutMillis);
diff --git a/src/MQClientImpl.cpp b/src/MQClientImpl.cpp
index 195e680..e7c97a5 100644
--- a/src/MQClientImpl.cpp
+++ b/src/MQClientImpl.cpp
@@ -55,7 +55,7 @@ void MQClientImpl::createTopic(const std::string& key, const std::string& newTop
   }
 }
 
-int64_t MQClientImpl::searchOffset(const MQMessageQueue& mq, uint64_t timestamp) {
+int64_t MQClientImpl::searchOffset(const MQMessageQueue& mq, int64_t timestamp) {
   return client_instance_->getMQAdminImpl()->searchOffset(mq, timestamp);
 }
 
diff --git a/src/MQClientImpl.h b/src/MQClientImpl.h
index 879f471..0748374 100644
--- a/src/MQClientImpl.h
+++ b/src/MQClientImpl.h
@@ -31,7 +31,7 @@ class MQClientImpl : public MQAdmin {
 
  public:  // MQAdmin
   void createTopic(const std::string& key, const std::string& newTopic, int queueNum) override;
-  int64_t searchOffset(const MQMessageQueue& mq, uint64_t timestamp) override;
+  int64_t searchOffset(const MQMessageQueue& mq, int64_t timestamp) override;
   int64_t maxOffset(const MQMessageQueue& mq) override;
   int64_t minOffset(const MQMessageQueue& mq) override;
   int64_t earliestMsgStoreTime(const MQMessageQueue& mq) override;
diff --git a/src/common/PullCallbackWrap.cpp b/src/common/PullCallbackWrap.cpp
old mode 100755
new mode 100644
diff --git a/src/common/PullCallbackWrap.h b/src/common/PullCallbackWrap.h
old mode 100755
new mode 100644
diff --git a/src/common/PullSysFlag.cpp b/src/common/PullSysFlag.cpp
index dc8194f..68f6a3b 100644
--- a/src/common/PullSysFlag.cpp
+++ b/src/common/PullSysFlag.cpp
@@ -20,10 +20,11 @@ static const int FLAG_COMMIT_OFFSET = 0x1 << 0;
 static const int FLAG_SUSPEND = 0x1 << 1;
 static const int FLAG_SUBSCRIPTION = 0x1 << 2;
 static const int FLAG_CLASS_FILTER = 0x1 << 3;
+static const int FLAG_LITE_PULL_MESSAGE = 0x1 << 4;
 
 namespace rocketmq {
 
-int PullSysFlag::buildSysFlag(bool commitOffset, bool suspend, bool subscription, bool classFilter) {
+int PullSysFlag::buildSysFlag(bool commitOffset, bool suspend, bool subscription, bool classFilter, bool litePull) {
   int flag = 0;
 
   if (commitOffset) {
@@ -42,6 +43,10 @@ int PullSysFlag::buildSysFlag(bool commitOffset, bool suspend, bool subscription
     flag |= FLAG_CLASS_FILTER;
   }
 
+  if (litePull) {
+    flag |= FLAG_LITE_PULL_MESSAGE;
+  }
+
   return flag;
 }
 
@@ -65,4 +70,8 @@ bool PullSysFlag::hasClassFilterFlag(int sysFlag) {
   return (sysFlag & FLAG_CLASS_FILTER) == FLAG_CLASS_FILTER;
 }
 
+bool PullSysFlag::hasLitePullFlag(int sysFlag) {
+  return (sysFlag & FLAG_LITE_PULL_MESSAGE) == FLAG_LITE_PULL_MESSAGE;  // is bit 4 (0x1 << 4) set?
+}
+
 }  // namespace rocketmq
diff --git a/src/common/PullSysFlag.h b/src/common/PullSysFlag.h
index 51f2944..f604eae 100644
--- a/src/common/PullSysFlag.h
+++ b/src/common/PullSysFlag.h
@@ -21,13 +21,14 @@ namespace rocketmq {
 
 class PullSysFlag {
  public:
-  static int buildSysFlag(bool commitOffset, bool suspend, bool subscription, bool classFilter);
+  static int buildSysFlag(bool commitOffset, bool suspend, bool subscription, bool classFilter, bool litePull = false);
 
   static int clearCommitOffsetFlag(int sysFlag);
   static bool hasCommitOffsetFlag(int sysFlag);
   static bool hasSuspendFlag(int sysFlag);
   static bool hasSubscriptionFlag(int sysFlag);
   static bool hasClassFilterFlag(int sysFlag);
+  static bool hasLitePullFlag(int sysFlag);  // true when the lite-pull bit is set in sysFlag
 };
 
 }  // namespace rocketmq
diff --git a/src/common/SendCallbackWrap.cpp b/src/common/SendCallbackWrap.cpp
old mode 100755
new mode 100644
diff --git a/src/common/SendCallbackWrap.h b/src/common/SendCallbackWrap.h
old mode 100755
new mode 100644
diff --git a/src/concurrent/blocking_queue.hpp b/src/concurrent/blocking_queue.hpp
new file mode 100644
index 0000000..e801cf7
--- /dev/null
+++ b/src/concurrent/blocking_queue.hpp
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef ROCKETMQ_CONCURRENT_BLOCKINGQUEUE_HPP_
+#define ROCKETMQ_CONCURRENT_BLOCKINGQUEUE_HPP_
+
+#include <chrono>
+#include <condition_variable>
+#include <cstddef>      // std::size_t
+#include <deque>
+#include <memory>       // std::unique_ptr
+#include <mutex>
+#include <type_traits>  // std::enable_if, std::decay, std::is_same, std::is_convertible
+#include <utility>      // std::forward, std::move
+
+#include "time.hpp"
+
+namespace rocketmq {
+
+/**
+ * blocking_queue - FIFO queue guarded by one mutex, with blocking and timed removal.
+ *
+ * Elements are held as std::unique_ptr<T>; pop_front() hands that pointer to the caller.
+ */
+template <typename T>
+class blocking_queue {
+ public:
+  // types:
+  typedef T value_type;
+
+  virtual ~blocking_queue() = default;
+
+  // Snapshot check; the answer may be stale as soon as the lock is released.
+  bool empty() {
+    std::lock_guard<std::mutex> lock(mutex_);
+    return queue_.empty();
+  }
+
+  // Snapshot of the element count; see empty().
+  size_t size() {
+    std::lock_guard<std::mutex> lock(mutex_);
+    return queue_.size();
+  }
+
+  // Enqueue a heap-allocated copy of (or move from) v, then wake one waiting consumer.
+  template <typename E,
+            typename std::enable_if<std::is_same<typename std::decay<E>::type, value_type>::value, int>::type = 0>
+  void push_back(E&& v) {
+    // Own the new element before touching the deque so it is released if the insertion throws.
+    std::unique_ptr<value_type> item(new value_type(std::forward<E>(v)));
+    std::unique_lock<std::mutex> lock(mutex_);
+    queue_.push_back(std::move(item));
+    cv_.notify_one();
+  }
+
+  // Enqueue an existing heap object; the queue takes ownership of v. Wakes one waiting consumer.
+  template <class E, typename std::enable_if<std::is_convertible<E, value_type*>::value, int>::type = 0>
+  void push_back(E v) {
+    // Wrap the raw pointer first so it is released if the insertion throws.
+    std::unique_ptr<value_type> item(v);
+    std::unique_lock<std::mutex> lock(mutex_);
+    queue_.push_back(std::move(item));
+    cv_.notify_one();
+  }
+
+  // Block until an element is available, then remove and return the oldest one.
+  std::unique_ptr<value_type> pop_front() {
+    std::unique_lock<std::mutex> lock(mutex_);
+    // The predicate overload tests the condition before sleeping and again after every wakeup.
+    cv_.wait(lock, [this] { return !queue_.empty(); });
+    auto v = std::move(queue_.front());
+    queue_.pop_front();
+    return v;
+  }
+
+  // Like pop_front(), but gives up at the deadline derived from timeout/unit and returns an empty pointer.
+  std::unique_ptr<value_type> pop_front(long timeout, time_unit unit) {
+    auto deadline = until_time_point(timeout, unit);
+    std::unique_lock<std::mutex> lock(mutex_);
+    // wait_until returns the predicate's final value: false means the queue is still empty at the deadline.
+    if (!cv_.wait_until(lock, deadline, [this] { return !queue_.empty(); })) {
+      return std::unique_ptr<value_type>();
+    }
+    auto v = std::move(queue_.front());
+    queue_.pop_front();
+    return v;
+  }
+
+ private:
+  std::deque<std::unique_ptr<value_type>> queue_;
+  std::mutex mutex_;
+  std::condition_variable cv_;
+};
+
+}  // namespace rocketmq
+
+#endif  // ROCKETMQ_CONCURRENT_BLOCKINGQUEUE_HPP_
diff --git a/src/concurrent/concurrent_queue.hpp b/src/concurrent/concurrent_queue.hpp
index 7fed592..5106341 100644
--- a/src/concurrent/concurrent_queue.hpp
+++ b/src/concurrent/concurrent_queue.hpp
@@ -57,7 +57,7 @@ class concurrent_queue {
   typedef T value_type;
   typedef concurrent_queue_node<value_type> node_type;
 
-  ~concurrent_queue() {
+  virtual ~concurrent_queue() {
     // clear this queue
     while (_clear_when_destruct) {
       if (nullptr == pop_front()) {
diff --git a/src/concurrent/executor_impl.hpp b/src/concurrent/executor_impl.hpp
index 89b90dd..616eb98 100644
--- a/src/concurrent/executor_impl.hpp
+++ b/src/concurrent/executor_impl.hpp
@@ -73,6 +73,7 @@ class thread_pool_executor : public abstract_executor_service {
   bool is_shutdown() override { return state_ != RUNNING; }
 
   std::size_t thread_nums() { return thread_nums_; }
+  void set_thread_nums(std::size_t thread_nums) { thread_nums_ = thread_nums; }  // only stores the new value
 
  protected:
   static const unsigned int ACCEPT_NEW_TASKS = 1U << 0U;
diff --git a/src/consumer/AssignedMessageQueue.hpp b/src/consumer/AssignedMessageQueue.hpp
new file mode 100644
index 0000000..9225728
--- /dev/null
+++ b/src/consumer/AssignedMessageQueue.hpp
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef ROCKETMQ_CONSUMER_ASSIGNEDMESSAGEQUEUE_H_
+#define ROCKETMQ_CONSUMER_ASSIGNEDMESSAGEQUEUE_H_
+
+#include <algorithm>  // std::sort, std::binary_search
+#include <cstdint>    // int64_t
+#include <map>        // std::map
+#include <mutex>      // std::mutex
+#include <string>     // std::string
+#include <utility>    // std::move
+#include <vector>     // std::vector
+
+#include "MQMessageQueue.h"
+#include "ProcessQueue.h"
+#include "RebalanceImpl.h"
+
+namespace rocketmq {
+
+/**
+ * MessageQueueState - bookkeeping for one message queue: its process queue, a paused flag and the pull, consume and
+ * seek offsets. A new state is not paused and all three offsets are -1.
+ */
+class MessageQueueState {
+ public:
+  MessageQueueState(const MQMessageQueue& message_queue, ProcessQueuePtr process_queue)
+      : message_queue_(message_queue),
+        process_queue_(std::move(process_queue)),
+        paused_(false),
+        pull_offset_(-1),
+        consume_offset_(-1),
+        seek_offset_(-1) {}
+
+  inline const MQMessageQueue& message_queue() const { return message_queue_; }
+  inline void set_message_queue(const MQMessageQueue& message_queue) { message_queue_ = message_queue; }
+
+  inline ProcessQueuePtr process_queue() const { return process_queue_; }
+  inline void process_queue(ProcessQueuePtr process_queue) { process_queue_ = std::move(process_queue); }
+
+  inline bool is_paused() const { return paused_; }
+  inline void set_paused(bool paused) { paused_ = paused; }
+
+  inline int64_t pull_offset() const { return pull_offset_; }
+  inline void set_pull_offset(int64_t pull_offset) { pull_offset_ = pull_offset; }
+
+  inline int64_t consume_offset() const { return consume_offset_; }
+  inline void set_consume_offset(int64_t consume_offset) { consume_offset_ = consume_offset; }
+
+  inline int64_t seek_offset() const { return seek_offset_; }
+  inline void set_seek_offset(int64_t seek_offset) { seek_offset_ = seek_offset; }
+
+ private:
+  MQMessageQueue message_queue_;
+  ProcessQueuePtr process_queue_;
+  volatile bool paused_;
+  volatile int64_t pull_offset_;
+  volatile int64_t consume_offset_;
+  volatile int64_t seek_offset_;
+};
+
+/**
+ * AssignedMessageQueue - map from each assigned MQMessageQueue to its MessageQueueState, guarded by one mutex.
+ *
+ * Getters called with a queue that is not in the map return a fallback (isPaused: true, offsets: -1, process
+ * queue: nullptr); setters called with such a queue do nothing.
+ */
+class AssignedMessageQueue {
+ public:
+  // Returns a copy of the keys of the map, i.e. the queues currently assigned.
+  std::vector<MQMessageQueue> messageQueues() {
+    std::vector<MQMessageQueue> mqs;
+    std::lock_guard<std::mutex> lock(assigned_message_queue_state_mutex_);
+    mqs.reserve(assigned_message_queue_state_.size());
+    for (const auto& entry : assigned_message_queue_state_) {
+      mqs.push_back(entry.first);
+    }
+    return mqs;
+  }
+
+  // An unknown queue is reported as paused.
+  bool isPaused(const MQMessageQueue& message_queue) {
+    std::lock_guard<std::mutex> lock(assigned_message_queue_state_mutex_);
+    auto it = assigned_message_queue_state_.find(message_queue);
+    if (it != assigned_message_queue_state_.end()) {
+      return it->second.is_paused();
+    }
+    return true;
+  }
+
+  void pause(const std::vector<MQMessageQueue>& message_queues) {
+    std::lock_guard<std::mutex> lock(assigned_message_queue_state_mutex_);
+    for (const auto& message_queue : message_queues) {
+      auto it = assigned_message_queue_state_.find(message_queue);
+      if (it != assigned_message_queue_state_.end()) {
+        it->second.set_paused(true);
+      }
+    }
+  }
+
+  void resume(const std::vector<MQMessageQueue>& message_queues) {
+    std::lock_guard<std::mutex> lock(assigned_message_queue_state_mutex_);
+    for (const auto& message_queue : message_queues) {
+      auto it = assigned_message_queue_state_.find(message_queue);
+      if (it != assigned_message_queue_state_.end()) {
+        it->second.set_paused(false);
+      }
+    }
+  }
+
+  ProcessQueuePtr getProcessQueue(const MQMessageQueue& message_queue) {
+    std::lock_guard<std::mutex> lock(assigned_message_queue_state_mutex_);
+    auto it = assigned_message_queue_state_.find(message_queue);
+    if (it != assigned_message_queue_state_.end()) {
+      return it->second.process_queue();
+    }
+    return nullptr;
+  }
+
+  int64_t getPullOffset(const MQMessageQueue& message_queue) {
+    std::lock_guard<std::mutex> lock(assigned_message_queue_state_mutex_);
+    auto it = assigned_message_queue_state_.find(message_queue);
+    if (it != assigned_message_queue_state_.end()) {
+      return it->second.pull_offset();
+    }
+    return -1;
+  }
+
+  void updatePullOffset(const MQMessageQueue& message_queue, int64_t offset) {
+    std::lock_guard<std::mutex> lock(assigned_message_queue_state_mutex_);
+    auto it = assigned_message_queue_state_.find(message_queue);
+    if (it != assigned_message_queue_state_.end()) {
+      it->second.set_pull_offset(offset);
+    }
+  }
+
+  int64_t getConsumerOffset(const MQMessageQueue& message_queue) {
+    std::lock_guard<std::mutex> lock(assigned_message_queue_state_mutex_);
+    auto it = assigned_message_queue_state_.find(message_queue);
+    if (it != assigned_message_queue_state_.end()) {
+      return it->second.consume_offset();
+    }
+    return -1;
+  }
+
+  void updateConsumeOffset(const MQMessageQueue& message_queue, int64_t offset) {
+    std::lock_guard<std::mutex> lock(assigned_message_queue_state_mutex_);
+    auto it = assigned_message_queue_state_.find(message_queue);
+    if (it != assigned_message_queue_state_.end()) {
+      it->second.set_consume_offset(offset);
+    }
+  }
+
+  int64_t getSeekOffset(const MQMessageQueue& message_queue) {
+    std::lock_guard<std::mutex> lock(assigned_message_queue_state_mutex_);
+    auto it = assigned_message_queue_state_.find(message_queue);
+    if (it != assigned_message_queue_state_.end()) {
+      return it->second.seek_offset();
+    }
+    return -1;
+  }
+
+  void setSeekOffset(const MQMessageQueue& message_queue, int64_t offset) {
+    std::lock_guard<std::mutex> lock(assigned_message_queue_state_mutex_);
+    auto it = assigned_message_queue_state_.find(message_queue);
+    if (it != assigned_message_queue_state_.end()) {
+      it->second.set_seek_offset(offset);
+    }
+  }
+
+  /**
+   * Replaces the assignment for one topic: entries of that topic missing from `assigned` are erased, entries of other
+   * topics are kept, and queues in `assigned` that are not yet tracked get a fresh state.
+   * Note that `assigned` is sorted in place.
+   */
+  void updateAssignedMessageQueue(const std::string& topic, std::vector<MQMessageQueue>& assigned) {
+    std::sort(assigned.begin(), assigned.end());  // binary_search below needs a sorted range
+    std::lock_guard<std::mutex> lock(assigned_message_queue_state_mutex_);
+    for (auto it = assigned_message_queue_state_.begin(); it != assigned_message_queue_state_.end();) {
+      const auto& mq = it->first;
+      if (mq.topic() == topic && !std::binary_search(assigned.begin(), assigned.end(), mq)) {
+        it = assigned_message_queue_state_.erase(it);
+      } else {
+        ++it;
+      }
+    }
+    addAssignedMessageQueue(assigned);
+  }
+
+ private:
+  // Caller must hold assigned_message_queue_state_mutex_. Reuses the rebalancer's process queue for a new entry when
+  // one is available, otherwise creates a new ProcessQueue.
+  void addAssignedMessageQueue(const std::vector<MQMessageQueue>& assigned) {
+    for (const auto& message_queue : assigned) {
+      if (assigned_message_queue_state_.find(message_queue) == assigned_message_queue_state_.end()) {
+        ProcessQueuePtr process_queue;
+        if (rebalance_impl_ != nullptr) {
+          process_queue = rebalance_impl_->getProcessQueue(message_queue);
+        }
+        if (nullptr == process_queue) {
+          process_queue.reset(new ProcessQueue());
+        }
+        assigned_message_queue_state_.emplace(message_queue, MessageQueueState(message_queue, process_queue));
+      }
+    }
+  }
+
+ public:
+  inline void set_rebalance_impl(RebalanceImpl* rebalance_impl) { rebalance_impl_ = rebalance_impl; }
+
+ private:
+  std::map<MQMessageQueue, MessageQueueState> assigned_message_queue_state_;
+  std::mutex assigned_message_queue_state_mutex_;
+  RebalanceImpl* rebalance_impl_ = nullptr;  // never deleted here; null until set_rebalance_impl() is called
+};
+
+}  // namespace rocketmq
+
+#endif  // ROCKETMQ_CONSUMER_ASSIGNEDMESSAGEQUEUE_H_
diff --git a/src/consumer/DefaultLitePullConsumer.cpp b/src/consumer/DefaultLitePullConsumer.cpp
new file mode 100644
index 0000000..1351ea4
--- /dev/null
+++ b/src/consumer/DefaultLitePullConsumer.cpp
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "DefaultLitePullConsumer.h"
+
+#include "DefaultLitePullConsumerConfigImpl.hpp"
+#include "DefaultLitePullConsumerImpl.h"
+#include "UtilAll.h"
+
+namespace rocketmq {
+
+DefaultLitePullConsumer::DefaultLitePullConsumer(const std::string& groupname)
+    : DefaultLitePullConsumer(groupname, nullptr) {}  // delegate with no RPC hook
+
+DefaultLitePullConsumer::DefaultLitePullConsumer(const std::string& groupname, RPCHookPtr rpcHook)
+    : DefaultLitePullConsumerConfigProxy(std::make_shared<DefaultLitePullConsumerConfigImpl>()),
+      pull_consumer_impl_(nullptr) {
+  // use DEFAULT_CONSUMER_GROUP when the caller passes an empty group name
+  if (groupname.empty()) {
+    set_group_name(DEFAULT_CONSUMER_GROUP);
+  } else {
+    set_group_name(groupname);
+  }
+
+  // create DefaultLitePullConsumerImpl from real_config() and the given RPC hook
+  pull_consumer_impl_ = DefaultLitePullConsumerImpl::create(real_config(), rpcHook);
+}
+
+DefaultLitePullConsumer::~DefaultLitePullConsumer() = default;
+
+void DefaultLitePullConsumer::start() {
+  pull_consumer_impl_->start();
+}
+
+void DefaultLitePullConsumer::shutdown() {
+  pull_consumer_impl_->shutdown();
+}
+
+bool DefaultLitePullConsumer::isAutoCommit() const {
+  return pull_consumer_impl_->isAutoCommit();
+}
+
+void DefaultLitePullConsumer::setAutoCommit(bool auto_commit) {
+  pull_consumer_impl_->setAutoCommit(auto_commit);
+}
+
+void DefaultLitePullConsumer::subscribe(const std::string& topic, const std::string& subExpression) {
+  pull_consumer_impl_->subscribe(topic, subExpression);
+}
+
+void DefaultLitePullConsumer::subscribe(const std::string& topic, const MessageSelector& selector) {
+  pull_consumer_impl_->subscribe(topic, selector);
+}
+
+void DefaultLitePullConsumer::unsubscribe(const std::string& topic) {
+  pull_consumer_impl_->unsubscribe(topic);
+}
+
+std::vector<MQMessageExt> DefaultLitePullConsumer::poll() {
+  return pull_consumer_impl_->poll();
+}
+
+std::vector<MQMessageExt> DefaultLitePullConsumer::poll(long timeout) {
+  return pull_consumer_impl_->poll(timeout);
+}
+
+std::vector<MQMessageQueue> DefaultLitePullConsumer::fetchMessageQueues(const std::string& topic) {
+  return pull_consumer_impl_->fetchMessageQueues(topic);
+}
+
+void DefaultLitePullConsumer::assign(const std::vector<MQMessageQueue>& messageQueues) {
+  pull_consumer_impl_->assign(messageQueues);
+}
+
+void DefaultLitePullConsumer::seek(const MQMessageQueue& messageQueue, int64_t offset) {
+  pull_consumer_impl_->seek(messageQueue, offset);
+}
+
+void DefaultLitePullConsumer::seekToBegin(const MQMessageQueue& messageQueue) {
+  pull_consumer_impl_->seekToBegin(messageQueue);
+}
+
+void DefaultLitePullConsumer::seekToEnd(const MQMessageQueue& messageQueue) {
+  pull_consumer_impl_->seekToEnd(messageQueue);
+}
+
+int64_t DefaultLitePullConsumer::offsetForTimestamp(const MQMessageQueue& messageQueue, int64_t timestamp) {
+  return pull_consumer_impl_->offsetForTimestamp(messageQueue, timestamp);
+}
+
+void DefaultLitePullConsumer::pause(const std::vector<MQMessageQueue>& messageQueues) {
+  pull_consumer_impl_->pause(messageQueues);
+}
+
+void DefaultLitePullConsumer::resume(const std::vector<MQMessageQueue>& messageQueues) {
+  pull_consumer_impl_->resume(messageQueues);
+}
+
+void DefaultLitePullConsumer::commitSync() {
+  pull_consumer_impl_->commitSync();
+}
+
+int64_t DefaultLitePullConsumer::committed(const MQMessageQueue& messageQueue) {
+  return pull_consumer_impl_->committed(messageQueue);
+}
+
+void DefaultLitePullConsumer::registerTopicMessageQueueChangeListener(
+    const std::string& topic,
+    TopicMessageQueueChangeListener* topicMessageQueueChangeListener) {
+  pull_consumer_impl_->registerTopicMessageQueueChangeListener(topic, topicMessageQueueChangeListener);
+}
+
+void DefaultLitePullConsumer::setRPCHook(RPCHookPtr rpcHook) {
+  dynamic_cast<DefaultLitePullConsumerImpl*>(pull_consumer_impl_.get())->setRPCHook(rpcHook);  // cast result unchecked
+}
+
+}  // namespace rocketmq
diff --git a/src/consumer/DefaultLitePullConsumerConfigImpl.hpp b/src/consumer/DefaultLitePullConsumerConfigImpl.hpp
new file mode 100644
index 0000000..ad41c5b
--- /dev/null
+++ b/src/consumer/DefaultLitePullConsumerConfigImpl.hpp
@@ -0,0 +1,157 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef ROCKETMQ_CONSUMER_DEFAULTLITEPULLCONSUMERCONFIGIMPL_H_
+#define ROCKETMQ_CONSUMER_DEFAULTLITEPULLCONSUMERCONFIGIMPL_H_
+
+#include <algorithm>  // std::min
+#include <memory>     // std::unique_ptr
+#include <string>     // std::string
+#include <thread>     // std::thread::hardware_concurrency
+
+#include "AllocateMQAveragely.hpp"
+#include "DefaultLitePullConsumerConfig.h"
+#include "MQClientConfigImpl.hpp"
+#include "UtilAll.h"
+
+namespace rocketmq {
+
+/**
+ * DefaultLitePullConsumerConfigImpl - implement for DefaultLitePullConsumerConfig
+ *
+ * Stores each setting in a member; getters return the member and setters overwrite it. Defaults come from the
+ * constructor's initializer list.
+ */
+class DefaultLitePullConsumerConfigImpl : virtual public DefaultLitePullConsumerConfig, public MQClientConfigImpl {
+ public:
+  DefaultLitePullConsumerConfigImpl()
+      : message_model_(MessageModel::CLUSTERING),
+        consume_from_where_(ConsumeFromWhere::CONSUME_FROM_LAST_OFFSET),
+        consume_timestamp_(UtilAll::to_string(UtilAll::currentTimeMillis() - (1000 * 60 * 30))),
+        auto_commit_interval_millis_(5 * 1000),
+        pull_batch_size_(10),
+        pull_thread_nums_(20),
+        long_polling_enable_(true),
+        consumer_pull_timeout_millis_(1000 * 10),
+        consumer_timeout_millis_when_suspend_(1000 * 30),
+        broker_suspend_max_time_millis_(1000 * 20),
+        pull_threshold_for_all_(10000),
+        pull_threshold_for_queue_(1000),
+        pull_time_delay_millis_when_exception_(1000),
+        poll_timeout_millis_(1000 * 5),
+        topic_metadata_check_interval_millis_(30 * 1000),
+        allocate_mq_strategy_(new AllocateMQAveragely()) {}
+  virtual ~DefaultLitePullConsumerConfigImpl() = default;
+
+  MessageModel message_model() const override { return message_model_; }
+  void set_message_model(MessageModel messageModel) override { message_model_ = messageModel; }
+
+  ConsumeFromWhere consume_from_where() const override { return consume_from_where_; }
+  void set_consume_from_where(ConsumeFromWhere consumeFromWhere) override { consume_from_where_ = consumeFromWhere; }
+
+  const std::string& consume_timestamp() const override { return consume_timestamp_; }
+  void set_consume_timestamp(const std::string& consumeTimestamp) override { consume_timestamp_ = consumeTimestamp; }
+
+  long auto_commit_interval_millis() const override { return auto_commit_interval_millis_; }
+  void set_auto_commit_interval_millis(long auto_commit_interval_millis) override {
+    auto_commit_interval_millis_ = auto_commit_interval_millis;
+  }
+
+  int pull_batch_size() const override { return pull_batch_size_; }
+  void set_pull_batch_size(int pull_batch_size) override { pull_batch_size_ = pull_batch_size; }
+
+  int pull_thread_nums() const override { return pull_thread_nums_; }
+  void set_pull_thread_nums(int pullThreadNums) override { pull_thread_nums_ = pullThreadNums; }
+
+  bool long_polling_enable() const override { return long_polling_enable_; }
+  void set_long_polling_enable(bool long_polling_enable) override { long_polling_enable_ = long_polling_enable; }
+
+  long consumer_pull_timeout_millis() const override { return consumer_pull_timeout_millis_; }
+  void set_consumer_pull_timeout_millis(long consumer_pull_timeout_millis) override {
+    consumer_pull_timeout_millis_ = consumer_pull_timeout_millis;
+  }
+
+  long consumer_timeout_millis_when_suspend() const override { return consumer_timeout_millis_when_suspend_; }
+  void set_consumer_timeout_millis_when_suspend(long consumer_timeout_millis_when_suspend) override {
+    consumer_timeout_millis_when_suspend_ = consumer_timeout_millis_when_suspend;
+  }
+
+  long broker_suspend_max_time_millis() const override { return broker_suspend_max_time_millis_; }
+  void set_broker_suspend_max_time_millis(long broker_suspend_max_time_millis) override {
+    broker_suspend_max_time_millis_ = broker_suspend_max_time_millis;
+  }
+
+  long pull_threshold_for_all() const override { return pull_threshold_for_all_; }
+  void set_pull_threshold_for_all(long pull_threshold_for_all) override {
+    pull_threshold_for_all_ = pull_threshold_for_all;
+  }
+
+  int pull_threshold_for_queue() const override { return pull_threshold_for_queue_; }
+  void set_pull_threshold_for_queue(int pull_threshold_for_queue) override {
+    pull_threshold_for_queue_ = pull_threshold_for_queue;
+  }
+
+  long pull_time_delay_millis_when_exception() const override { return pull_time_delay_millis_when_exception_; }
+  void set_pull_time_delay_millis_when_exception(long pull_time_delay_millis_when_exception) override {
+    pull_time_delay_millis_when_exception_ = pull_time_delay_millis_when_exception;
+  }
+
+  long poll_timeout_millis() const override { return poll_timeout_millis_; }
+  void set_poll_timeout_millis(long poll_timeout_millis) override { poll_timeout_millis_ = poll_timeout_millis; }
+
+  long topic_metadata_check_interval_millis() const override { return topic_metadata_check_interval_millis_; }
+  void set_topic_metadata_check_interval_millis(long topicMetadataCheckIntervalMillis) override {
+    topic_metadata_check_interval_millis_ = topicMetadataCheckIntervalMillis;
+  }
+
+  AllocateMQStrategy* allocate_mq_strategy() const override { return allocate_mq_strategy_.get(); }
+  // Takes ownership of strategy: unique_ptr::reset deletes the previously held strategy.
+  void set_allocate_mq_strategy(AllocateMQStrategy* strategy) override { allocate_mq_strategy_.reset(strategy); }
+
+ private:
+  MessageModel message_model_;
+
+  ConsumeFromWhere consume_from_where_;
+  std::string consume_timestamp_;
+
+  long auto_commit_interval_millis_;
+
+  int pull_batch_size_;
+
+  int pull_thread_nums_;
+
+  bool long_polling_enable_;
+
+  long consumer_pull_timeout_millis_;
+  long consumer_timeout_millis_when_suspend_;
+  long broker_suspend_max_time_millis_;
+
+  long pull_threshold_for_all_;
+  int pull_threshold_for_queue_;
+
+  long pull_time_delay_millis_when_exception_;  // 1000
+
+  long poll_timeout_millis_;
+
+  long topic_metadata_check_interval_millis_;
+
+  std::unique_ptr<AllocateMQStrategy> allocate_mq_strategy_;
+};
+
+}  // namespace rocketmq
+
+#endif  // ROCKETMQ_CONSUMER_DEFAULTLITEPULLCONSUMERCONFIGIMPL_H_
diff --git a/src/consumer/DefaultLitePullConsumerImpl.cpp b/src/consumer/DefaultLitePullConsumerImpl.cpp
new file mode 100644
index 0000000..3a5fc81
--- /dev/null
+++ b/src/consumer/DefaultLitePullConsumerImpl.cpp
@@ -0,0 +1,894 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "DefaultLitePullConsumerImpl.h"
+
+#ifndef WIN32
+#include <signal.h>
+#endif
+
+#include <atomic>     // std::atomic
+#include <exception>  // std::current_exception
+
+#include "AssignedMessageQueue.hpp"
+#include "FilterAPI.hpp"
+#include "LocalFileOffsetStore.h"
+#include "MQAdminImpl.h"
+#include "MQClientAPIImpl.h"
+#include "MQClientInstance.h"
+#include "NamespaceUtil.h"
+#include "PullAPIWrapper.h"
+#include "PullSysFlag.h"
+#include "RebalanceLitePullImpl.h"
+#include "RemoteBrokerOffsetStore.h"
+#include "UtilAll.h"
+#include "Validators.h"
+
+static const long PULL_TIME_DELAY_MILLS_WHEN_PAUSE = 1000;       // ms before a pull task re-checks a paused queue
+static const long PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL = 50;  // ms before a pull task retries after flow control
+
+namespace rocketmq {
+
+/**
+ * MessageQueueListener that forwards message-queue changes to the owning consumer.
+ *
+ * Only a weak reference to the consumer is kept, so a notification that arrives after the
+ * consumer has been released is ignored.
+ */
+class DefaultLitePullConsumerImpl::MessageQueueListenerImpl : public MessageQueueListener {
+ public:
+  MessageQueueListenerImpl(DefaultLitePullConsumerImplPtr pull_consumer) : default_lite_pull_consumer_(pull_consumer) {}
+
+  ~MessageQueueListenerImpl() = default;
+
+  /**
+   * Updates the consumer's assigned queues and pull tasks for `topic`.
+   *
+   * BROADCASTING uses every queue of the topic (`mq_all`); CLUSTERING uses only the queues
+   * given in `mq_divided`. Any other message model is ignored.
+   */
+  void messageQueueChanged(const std::string& topic,
+                           std::vector<MQMessageQueue>& mq_all,
+                           std::vector<MQMessageQueue>& mq_divided) override {
+    auto owner = default_lite_pull_consumer_.lock();
+    if (nullptr == owner) {
+      return;
+    }
+
+    const auto model = owner->messageModel();
+    if (model == BROADCASTING) {
+      owner->updateAssignedMessageQueue(topic, mq_all);
+      owner->updatePullTask(topic, mq_all);
+    } else if (model == CLUSTERING) {
+      owner->updateAssignedMessageQueue(topic, mq_divided);
+      owner->updatePullTask(topic, mq_divided);
+    }
+  }
+
+ private:
+  std::weak_ptr<DefaultLitePullConsumerImpl> default_lite_pull_consumer_;
+};
+
+/**
+ * One batch of pulled messages waiting to be handed out by poll(), together with the message
+ * queue it was pulled from and that queue's process queue.
+ */
+class DefaultLitePullConsumerImpl::ConsumeRequest {
+  std::vector<MessageExtPtr> message_exts_;
+  MQMessageQueue message_queue_;
+  ProcessQueuePtr process_queue_;
+
+ public:
+  // The message list is moved in; the queue is copied and the process-queue handle is stored.
+  ConsumeRequest(std::vector<MessageExtPtr>&& message_exts,
+                 const MQMessageQueue& message_queue,
+                 ProcessQueuePtr process_queue)
+      : message_exts_(std::move(message_exts)), message_queue_(message_queue), process_queue_(process_queue) {}
+
+  // Mutable access to the batch.
+  std::vector<MessageExtPtr>& message_exts() { return message_exts_; }
+
+  // Queue the batch was pulled from.
+  MQMessageQueue& message_queue() { return message_queue_; }
+
+  // Process queue associated with the batch (returned by value).
+  ProcessQueuePtr process_queue() { return process_queue_; }
+};
+
+/**
+ * Self-rescheduling pull loop for a single message queue.
+ *
+ * Each run() performs at most one pull for message_queue_ and then re-submits itself to the
+ * consumer's scheduled thread pool. The loop ends when the task is cancelled, when the queue has
+ * no usable process queue, or when the owning consumer has been released (only a weak reference
+ * to the consumer is held).
+ */
+class DefaultLitePullConsumerImpl::PullTaskImpl : public std::enable_shared_from_this<PullTaskImpl> {
+ public:
+  PullTaskImpl(DefaultLitePullConsumerImplPtr pull_consumer, const MQMessageQueue& message_queue)
+      : default_lite_pull_consumer_(pull_consumer), message_queue_(message_queue), cancelled_(false) {}
+
+  /**
+   * Executes one pull round:
+   *   1. bail out if the consumer is gone, the task is cancelled or the queue was dropped;
+   *   2. back off (and reschedule) while the queue is paused or flow control is active;
+   *   3. pull one batch, hand it to the consumer and advance the pull offset;
+   *   4. reschedule itself, delayed when nothing was found or an error occurred.
+   */
+  void run() {
+    auto consumer = default_lite_pull_consumer_.lock();
+    if (nullptr == consumer) {
+      LOG_WARN_NEW("PullTaskImpl::run: DefaultLitePullConsumerImpl is released.");
+      return;
+    }
+
+    if (cancelled_) {
+      return;
+    }
+
+    if (consumer->assigned_message_queue_->isPaused(message_queue_)) {
+      scheduleNextRun(*consumer, PULL_TIME_DELAY_MILLS_WHEN_PAUSE);
+      LOG_DEBUG_NEW("Message Queue: {} has been paused!", message_queue_.toString());
+      return;
+    }
+
+    auto process_queue = consumer->assigned_message_queue_->getProcessQueue(message_queue_);
+    if (nullptr == process_queue || process_queue->dropped()) {
+      // Not rescheduled: the pull loop for this queue ends here.
+      LOG_INFO_NEW("The message queue not be able to poll, because it's dropped. group={}, messageQueue={}",
+                   consumer->groupName(), message_queue_.toString());
+      return;
+    }
+
+    auto config = consumer->getDefaultLitePullConsumerConfig();
+
+    // Flow control over all queues: estimated number of cached messages (requests * batch size).
+    if (consumer->consume_request_cache_.size() * config->pull_batch_size() > config->pull_threshold_for_all()) {
+      scheduleNextRun(*consumer, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
+      // Only every 1000th occurrence is logged to avoid flooding the log.
+      if ((consumer->consume_request_flow_control_times_++ % 1000) == 0) {
+        LOG_WARN_NEW(
+            "The consume request count exceeds threshold {}, so do flow control, consume request count={}, "
+            "flowControlTimes={}",
+            config->pull_threshold_for_all(), consumer->consume_request_cache_.size(),
+            consumer->consume_request_flow_control_times_);
+      }
+      return;
+    }
+
+    // Flow control for this queue: number of messages cached in its process queue.
+    auto cached_message_count = process_queue->getCacheMsgCount();
+    if (cached_message_count > config->pull_threshold_for_queue()) {
+      scheduleNextRun(*consumer, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
+      if ((consumer->queue_flow_control_times_++ % 1000) == 0) {
+        LOG_WARN_NEW(
+            "The cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, "
+            "count={}, size={} MiB, flowControlTimes={}",
+            config->pull_threshold_for_queue(), process_queue->getCacheMinOffset(), process_queue->getCacheMaxOffset(),
+            cached_message_count, "unknown", consumer->queue_flow_control_times_);
+      }
+      return;
+    }
+
+    // Not implemented yet: flow control by cached message size and by offset span. The
+    // commented-out reference code is kept below.
+
+    // long cachedMessageSizeInMiB = processQueue->getMsgSize() / (1024 * 1024);
+    // if (cachedMessageSizeInMiB > consumer.getPullThresholdSizeForQueue()) {
+    //   scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL, TimeUnit.MILLISECONDS);
+    //   if ((queueFlowControlTimes++ % 1000) == 0) {
+    //     log.warn(
+    //         "The cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={},
+    //         "
+    //         "count={}, size={} MiB, flowControlTimes={}",
+    //         consumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(),
+    //         processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB,
+    //         queueFlowControlTimes);
+    //   }
+    //   return;
+    // }
+
+    // if (processQueue.getMaxSpan() > consumer.getConsumeMaxSpan()) {
+    //   scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL, TimeUnit.MILLISECONDS);
+    //   if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) {
+    //     log.warn(
+    //         "The queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, "
+    //         "flowControlTimes={}",
+    //         processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(),
+    //         processQueue.getMaxSpan(),
+    //         queueMaxSpanFlowControlTimes);
+    //   }
+    //   return;
+    // }
+
+    long pull_delay_time_millis = 0;
+    try {
+      // Resolved inside the try block so that a failure here is logged and retried like any other
+      // pull failure instead of escaping run() and silently ending the pull loop for this queue.
+      auto offset = consumer->nextPullOffset(message_queue_);
+
+      // Holds the SubscriptionData only when this task built it itself. Ownership is decided once,
+      // where the object is created, so it is released on every exit path and data obtained from
+      // rebalance_impl_ is never deleted here.
+      std::unique_ptr<SubscriptionData> owned_subscription_data;
+      SubscriptionData* subscription_data = nullptr;
+      if (consumer->subscription_type_ == SubscriptionType::SUBSCRIBE) {
+        subscription_data = consumer->rebalance_impl_->getSubscriptionData(message_queue_.topic());
+      } else {
+        owned_subscription_data.reset(FilterAPI::buildSubscriptionData(message_queue_.topic(), SUB_ALL));
+        subscription_data = owned_subscription_data.get();
+      }
+
+      std::unique_ptr<PullResult> pull_result(
+          consumer->pull(message_queue_, subscription_data, offset, config->pull_batch_size()));
+
+      switch (pull_result->pull_status()) {
+        case PullStatus::FOUND: {
+          auto objLock = consumer->message_queue_lock_.fetchLockObject(message_queue_);
+          std::lock_guard<std::mutex> lock(*objLock);
+          // A pending seek (seek offset != -1) makes this batch stale, so it is discarded.
+          if (!pull_result->msg_found_list().empty() &&
+              consumer->assigned_message_queue_->getSeekOffset(message_queue_) == -1) {
+            process_queue->putMessage(pull_result->msg_found_list());
+            consumer->submitConsumeRequest(
+                new ConsumeRequest(std::move(pull_result->msg_found_list()), message_queue_, process_queue));
+          }
+        } break;
+        case PullStatus::OFFSET_ILLEGAL:
+          LOG_WARN_NEW("The pull request offset illegal, {}", pull_result->toString());
+          break;
+        case PullStatus::NO_NEW_MSG:
+        case PullStatus::NO_MATCHED_MSG:
+          pull_delay_time_millis = 1000;
+          break;
+        case PullStatus::NO_LATEST_MSG:
+          pull_delay_time_millis = config->pull_time_delay_millis_when_exception();
+          break;
+        default:
+          break;
+      }
+
+      consumer->updatePullOffset(message_queue_, pull_result->next_begin_offset());
+    } catch (const std::exception& e) {
+      pull_delay_time_millis = config->pull_time_delay_millis_when_exception();
+      LOG_ERROR_NEW("An error occurred in pull message process. {}", e.what());
+    }
+
+    if (!cancelled_) {
+      scheduleNextRun(*consumer, pull_delay_time_millis);
+    } else {
+      LOG_WARN_NEW("The Pull Task is cancelled after doPullTask, {}", message_queue_.toString());
+    }
+  }
+
+ public:
+  inline const MQMessageQueue& message_queue() { return message_queue_; }
+
+  inline bool is_cancelled() const { return cancelled_; }
+  inline void set_cancelled(bool cancelled) { cancelled_ = cancelled; }
+
+ private:
+  // Re-submits run() to the consumer's pull thread pool after `delay_millis` milliseconds.
+  void scheduleNextRun(DefaultLitePullConsumerImpl& consumer, long delay_millis) {
+    consumer.scheduled_thread_pool_executor_.schedule(
+        std::bind(&DefaultLitePullConsumerImpl::PullTaskImpl::run, shared_from_this()), delay_millis,
+        time_unit::milliseconds);
+  }
+
+ private:
+  std::weak_ptr<DefaultLitePullConsumerImpl> default_lite_pull_consumer_;
+  MQMessageQueue message_queue_;
+  // Written via set_cancelled() and read inside run(), which executes on the pull thread pool;
+  // atomic because `volatile` gives no inter-thread synchronization in C++.
+  std::atomic<bool> cancelled_;
+};
+
+DefaultLitePullConsumerImpl::DefaultLitePullConsumerImpl(DefaultLitePullConsumerConfigPtr config)
+    : DefaultLitePullConsumerImpl(config, nullptr) {}  // delegates to the two-argument constructor without an RPC hook
+
+DefaultLitePullConsumerImpl::DefaultLitePullConsumerImpl(DefaultLitePullConsumerConfigPtr config, RPCHookPtr rpcHook)
+    : MQClientImpl(config, rpcHook),
+      start_time_(UtilAll::currentTimeMillis()),
+      subscription_type_(SubscriptionType::NONE),  // changed to SUBSCRIBE by subscribe()
+      consume_request_flow_control_times_(0),
+      queue_flow_control_times_(0),
+      next_auto_commit_deadline_(-1LL),  // presumably makes the first maybeAutoCommit() check pass - TODO confirm
+      auto_commit_(true),
+      message_queue_listener_(nullptr),  // created in subscribe()
+      assigned_message_queue_(new AssignedMessageQueue()),
+      scheduled_thread_pool_executor_("PullMsgThread", config->pull_thread_nums(), false),  // startup() is called in start()
+      scheduled_executor_service_("MonitorMessageQueueChangeThread", false),  // startup() is called in start()
+      rebalance_impl_(new RebalanceLitePullImpl(this)),
+      pull_api_wrapper_(nullptr),  // created in start()
+      offset_store_(nullptr) {}  // created in start(), depending on the message model
+
+DefaultLitePullConsumerImpl::~DefaultLitePullConsumerImpl() = default;  // no explicit teardown here; shutdown() must be called separately
+
+/**
+ * Starts the consumer.
+ *
+ * Start-up only proceeds from the CREATE_JUST state; calling it in RUNNING, START_FAILED or
+ * SHUTDOWN_ALREADY raises MQClientException. While start-up is in progress the state is
+ * START_FAILED, so an exception thrown part-way leaves the consumer in that state. The one
+ * exception is a failed consumer registration, which puts the state back to CREATE_JUST before
+ * throwing.
+ */
+void DefaultLitePullConsumerImpl::start() {
+#ifndef WIN32
+  /* Ignore the SIGPIPE */
+  struct sigaction ignore_action;
+  memset(&ignore_action, 0, sizeof(struct sigaction));
+  ignore_action.sa_handler = SIG_IGN;
+  ignore_action.sa_flags = 0;
+  ::sigaction(SIGPIPE, &ignore_action, 0);
+#endif
+
+  switch (service_state_) {
+    case CREATE_JUST:
+      break;  // the only state from which start-up proceeds
+    case RUNNING:
+    case START_FAILED:
+    case SHUTDOWN_ALREADY:
+      THROW_MQEXCEPTION(MQClientException, "The PullConsumer service state not OK, maybe started once", -1);
+      return;
+    default:
+      return;
+  }
+
+  // wrap namespace
+  client_config_->set_group_name(
+      NamespaceUtil::wrapNamespace(client_config_->name_space(), client_config_->group_name()));
+
+  LOG_INFO_NEW("the consumer [{}] start beginning.", client_config_->group_name());
+
+  service_state_ = START_FAILED;
+
+  checkConfig();
+
+  if (messageModel() == MessageModel::CLUSTERING) {
+    client_config_->changeInstanceNameToPID();
+  }
+
+  // init client_instance_
+  MQClientImpl::start();
+
+  auto config = getDefaultLitePullConsumerConfig();
+  const auto model = config->message_model();
+
+  // init rebalance_impl_
+  rebalance_impl_->set_consumer_group(client_config_->group_name());
+  rebalance_impl_->set_message_model(model);
+  rebalance_impl_->set_allocate_mq_strategy(config->allocate_mq_strategy());
+  rebalance_impl_->set_client_instance(client_instance_.get());
+
+  // init pull_api_wrapper_
+  pull_api_wrapper_.reset(new PullAPIWrapper(client_instance_.get(), client_config_->group_name()));
+  // TODO: registerFilterMessageHook
+
+  // init offset_store_: a local file store for BROADCASTING, a remote broker store for CLUSTERING
+  if (model == MessageModel::BROADCASTING) {
+    offset_store_.reset(new LocalFileOffsetStore(client_instance_.get(), client_config_->group_name()));
+  } else if (model == MessageModel::CLUSTERING) {
+    offset_store_.reset(new RemoteBrokerOffsetStore(client_instance_.get(), client_config_->group_name()));
+  }
+  offset_store_->load();
+
+  scheduled_thread_pool_executor_.set_thread_nums(config->pull_thread_nums());
+  scheduled_thread_pool_executor_.startup();
+  scheduled_executor_service_.startup();
+
+  // register consumer
+  const bool registered = client_instance_->registerConsumer(client_config_->group_name(), this);
+  if (!registered) {
+    service_state_ = CREATE_JUST;
+    THROW_MQEXCEPTION(MQClientException, "The cousumer group[" + client_config_->group_name() +
+                                             "] has been created before, specify another name please.",
+                      -1);
+  }
+
+  client_instance_->start();
+
+  startScheduleTask();
+
+  LOG_INFO_NEW("the consumer [{}] start OK", client_config_->group_name());
+  service_state_ = RUNNING;
+
+  operateAfterRunning();
+}
+
+/**
+ * Validates the configuration before start-up.
+ *
+ * Throws MQClientException when the consumer group equals DEFAULT_CONSUMER_GROUP, when the
+ * message model is neither BROADCASTING nor CLUSTERING, or when no allocate-message-queue
+ * strategy is configured. The group name itself is checked by Validators::checkGroup().
+ */
+void DefaultLitePullConsumerImpl::checkConfig() {
+  const auto& groupname = client_config_->group_name();
+
+  // check consumerGroup
+  Validators::checkGroup(groupname);
+
+  // consumerGroup
+  if (DEFAULT_CONSUMER_GROUP == groupname) {
+    THROW_MQEXCEPTION(MQClientException,
+                      "consumerGroup can not equal " + DEFAULT_CONSUMER_GROUP + ", please specify another one.", -1);
+  }
+
+  auto config = getDefaultLitePullConsumerConfig();
+
+  // messageModel
+  const auto model = config->message_model();
+  if (model != BROADCASTING && model != CLUSTERING) {
+    // The message previously read "messageModel is valid", the opposite of the condition reported.
+    THROW_MQEXCEPTION(MQClientException, "messageModel is invalid", -1);
+  }
+
+  // allocateMessageQueueStrategy
+  if (nullptr == config->allocate_mq_strategy()) {
+    THROW_MQEXCEPTION(MQClientException, "allocateMessageQueueStrategy is null", -1);
+  }
+
+  // if (getDefaultLitePullConsumerConfig()->getConsumerTimeoutMillisWhenSuspend() <
+  //     getDefaultLitePullConsumerConfig()->getBrokerSuspendMaxTimeMillis()) {
+  //   THROW_MQEXCEPTION(
+  //       MQClientException,
+  //       "Long polling mode, the consumer consumerTimeoutMillisWhenSuspend must greater than
+  //       brokerSuspendMaxTimeMillis",
+  //       -1);
+  // }
+}
+
+// Schedules the first round of the periodic topic message-queue check, 10 seconds from now.
+void DefaultLitePullConsumerImpl::startScheduleTask() {
+  scheduled_executor_service_.schedule([this] { fetchTopicMessageQueuesAndComparePeriodically(); }, 1000 * 10,
+                                       time_unit::milliseconds);
+}
+
+// Runs one round of the topic message-queue check and schedules the next round after
+// topic_metadata_check_interval_millis.
+void DefaultLitePullConsumerImpl::fetchTopicMessageQueuesAndComparePeriodically() {
+  // A failed round is only logged, so the next round is still scheduled.
+  try {
+    fetchTopicMessageQueuesAndCompare();
+  } catch (std::exception& e) {
+    LOG_ERROR_NEW("ScheduledTask fetchMessageQueuesAndCompare exception: {}", e.what());
+  }
+
+  // next round
+  const auto interval_millis = getDefaultLitePullConsumerConfig()->topic_metadata_check_interval_millis();
+  scheduled_executor_service_.schedule([this] { fetchTopicMessageQueuesAndComparePeriodically(); }, interval_millis,
+                                       time_unit::milliseconds);
+}
+
+// For every topic with a registered change listener: fetches the current message queues,
+// compares them with the cached set and, when they differ, refreshes the cache and notifies the
+// listener. mutex_ is held for the whole pass, including the fetch and the listener callback.
+void DefaultLitePullConsumerImpl::fetchTopicMessageQueuesAndCompare() {
+  std::lock_guard<std::mutex> guard(mutex_);  // synchronized
+  for (const auto& entry : topic_message_queue_change_listener_map_) {
+    const auto& topic = entry.first;
+    auto* listener = entry.second;
+
+    // Both sets are copies because isSetEqual() sorts its arguments in place.
+    std::vector<MQMessageQueue> cached_queues = message_queues_for_topic_[topic];
+    std::vector<MQMessageQueue> fetched_queues = fetchMessageQueues(topic);
+    if (isSetEqual(fetched_queues, cached_queues)) {
+      continue;
+    }
+
+    message_queues_for_topic_[topic] = fetched_queues;
+    if (listener != nullptr) {
+      listener->onChanged(topic, fetched_queues);
+    }
+  }
+}
+
+// Returns true when both vectors hold the same queues regardless of order.
+// Side effect: when the sizes match, both arguments are sorted in place.
+bool DefaultLitePullConsumerImpl::isSetEqual(std::vector<MQMessageQueue>& new_message_queues,
+                                             std::vector<MQMessageQueue>& old_message_queues) {
+  if (new_message_queues.size() == old_message_queues.size()) {
+    std::sort(new_message_queues.begin(), new_message_queues.end());
+    std::sort(old_message_queues.begin(), old_message_queues.end());
+    return new_message_queues == old_message_queues;
+  }
+  return false;
+}
+
+// Catches up on work requested before start(): refreshes route info for topics subscribed early,
+// starts pull tasks for queues assigned early, and seeds the per-topic queue cache for every
+// registered change listener.
+void DefaultLitePullConsumerImpl::operateAfterRunning() {
+  switch (subscription_type_) {
+    case SubscriptionType::SUBSCRIBE:
+      // If subscribe function invoke before start function, then update topic subscribe info after initialization.
+      updateTopicSubscribeInfoWhenSubscriptionChanged();
+      break;
+    case SubscriptionType::ASSIGN: {
+      // If assign function invoke before start function, then update pull task after initialization.
+      auto assigned_queues = assigned_message_queue_->messageQueues();
+      updateAssignPullTask(assigned_queues);
+      break;
+    }
+    default:
+      break;
+  }
+
+  // NOTE(review): the two maps below are accessed under mutex_ elsewhere in this file but not
+  // here - confirm no other thread can touch them while start() is running.
+  for (const auto& entry : topic_message_queue_change_listener_map_) {
+    const auto& topic = entry.first;
+    auto current_queues = fetchMessageQueues(topic);
+    message_queues_for_topic_[topic] = std::move(current_queues);
+  }
+  // client_instance_->checkClientInBroker();
+}
+
+// Refreshes the route info from the name server for every subscribed topic; a topic whose
+// refresh fails is only logged.
+void DefaultLitePullConsumerImpl::updateTopicSubscribeInfoWhenSubscriptionChanged() {
+  for (const auto& entry : rebalance_impl_->getSubscriptionInner()) {
+    const auto& topic = entry.first;
+    const bool updated = client_instance_->updateTopicRouteInfoFromNameServer(topic);
+    if (updated) {
+      continue;
+    }
+    LOG_WARN_NEW("The topic:[{}] not exist", topic);
+  }
+}
+
+// Reconciles the pull tasks with `mq_new_set`: tasks for queues no longer in the set are
+// cancelled and removed, then tasks are started for queues that have none yet.
+// Side effect: `mq_new_set` is sorted in place (required for the binary search).
+void DefaultLitePullConsumerImpl::updateAssignPullTask(std::vector<MQMessageQueue>& mq_new_set) {
+  std::sort(mq_new_set.begin(), mq_new_set.end());
+  std::lock_guard<std::mutex> guard(task_table_mutex_);
+  auto it = task_table_.begin();
+  while (it != task_table_.end()) {
+    const bool still_assigned = std::binary_search(mq_new_set.begin(), mq_new_set.end(), it->first);
+    if (still_assigned) {
+      it++;
+    } else {
+      it->second->set_cancelled(true);
+      it = task_table_.erase(it);
+    }
+  }
+  startPullTask(mq_new_set);
+}
+
+// Shuts the consumer down. Has an effect only in the RUNNING state: offsets are persisted, the
+// consumer is unregistered, both executors and the client instance are stopped, and the state
+// becomes SHUTDOWN_ALREADY. In every other state the call does nothing.
+void DefaultLitePullConsumerImpl::shutdown() {
+  if (service_state_ != ServiceState::RUNNING) {
+    return;
+  }
+
+  persistConsumerOffset();
+  client_instance_->unregisterConsumer(client_config_->group_name());
+  scheduled_thread_pool_executor_.shutdown();
+  scheduled_executor_service_.shutdown();
+  client_instance_->shutdown();
+  rebalance_impl_->destroy();
+  service_state_ = ServiceState::SHUTDOWN_ALREADY;
+  LOG_INFO_NEW("the consumer [{}] shutdown OK", client_config_->group_name());
+}
+
+/**
+ * Subscribes to `topic` with the filter expression `subExpression`.
+ *
+ * Switches the consumer to SUBSCRIBE mode, registers the subscription with the rebalance
+ * implementation and installs the message-queue listener. When the consumer is already running,
+ * a heartbeat is sent to all brokers and the topic route info is refreshed.
+ *
+ * Throws MQClientException ("subscribe exception") wrapping any std::exception raised on the
+ * way, including the one raised for an empty topic.
+ */
+void DefaultLitePullConsumerImpl::subscribe(const std::string& topic, const std::string& subExpression) {
+  std::lock_guard<std::mutex> lock(mutex_);  // synchronized
+  try {
+    if (topic.empty()) {
+      THROW_MQEXCEPTION(MQClientException, "Topic can not be null or empty.", -1);
+    }
+    set_subscription_type(SubscriptionType::SUBSCRIBE);
+    auto* subscription_data = FilterAPI::buildSubscriptionData(topic, subExpression);
+    rebalance_impl_->setSubscriptionData(topic, subscription_data);
+
+    message_queue_listener_.reset(new MessageQueueListenerImpl(shared_from_this()));
+    assigned_message_queue_->set_rebalance_impl(rebalance_impl_.get());
+
+    if (service_state_ == ServiceState::RUNNING) {
+      client_instance_->sendHeartbeatToAllBrokerWithLock();
+      updateTopicSubscribeInfoWhenSubscriptionChanged();
+    }
+  } catch (const std::exception&) {
+    // std::current_exception() captures the exception being handled with its dynamic type.
+    // std::make_exception_ptr(e) would copy it as a plain std::exception, losing the derived
+    // type and its message.
+    THROW_MQEXCEPTION2(MQClientException, "subscribe exception", -1, std::current_exception());
+  }
+}
+
+void DefaultLitePullConsumerImpl::subscribe(const std::string& topic, const MessageSelector& selector) {
+  // TODO: not implemented yet; the call is currently a silent no-op and both arguments are ignored.
+}
+
+void DefaultLitePullConsumerImpl::unsubscribe(const std::string& topic) {
+  // TODO: not implemented yet; the call is currently a silent no-op and `topic` is ignored.
+}
+
+// Returns copies of all subscriptions currently held by the rebalance implementation.
+std::vector<SubscriptionData> DefaultLitePullConsumerImpl::subscriptions() const {
+  std::vector<SubscriptionData> snapshot;
+  for (const auto& entry : rebalance_impl_->getSubscriptionInner()) {
+    snapshot.push_back(*(entry.second));
+  }
+  return snapshot;
+}
+
+void DefaultLitePullConsumerImpl::updateTopicSubscribeInfo(const std::string& topic,
+                                                           std::vector<MQMessageQueue>& info) {
+  rebalance_impl_->setTopicSubscribeInfo(topic, info);  // forwards the topic's queue list to the rebalance implementation
+}
+
+// Runs one rebalance pass (passing `false` to the rebalance implementation); does nothing when
+// no rebalance implementation is set.
+void DefaultLitePullConsumerImpl::doRebalance() {
+  if (rebalance_impl_ == nullptr) {
+    return;
+  }
+  rebalance_impl_->doRebalance(false);
+}
+
+void DefaultLitePullConsumerImpl::updateAssignedMessageQueue(const std::string& topic,
+                                                             std::vector<MQMessageQueue>& assigned_message_queue) {
+  assigned_message_queue_->updateAssignedMessageQueue(topic, assigned_message_queue);  // plain delegation to AssignedMessageQueue
+}
+
+// Reconciles the pull tasks of `topic` with `mq_new_set`: tasks of that topic whose queue is no
+// longer in the set are cancelled and removed (tasks of other topics are left alone), then tasks
+// are started for queues that have none yet.
+// Side effect: `mq_new_set` is sorted in place (required for the binary search).
+void DefaultLitePullConsumerImpl::updatePullTask(const std::string& topic, std::vector<MQMessageQueue>& mq_new_set) {
+  std::sort(mq_new_set.begin(), mq_new_set.end());
+  std::lock_guard<std::mutex> guard(task_table_mutex_);
+  auto it = task_table_.begin();
+  while (it != task_table_.end()) {
+    auto& queue = it->first;
+    // remove unnecessary PullTask
+    const bool obsolete =
+        queue.topic() == topic && !std::binary_search(mq_new_set.begin(), mq_new_set.end(), queue);
+    if (obsolete) {
+      it->second->set_cancelled(true);
+      it = task_table_.erase(it);
+    } else {
+      it++;
+    }
+  }
+  startPullTask(mq_new_set);
+}
+
+// Creates, records and submits a pull task for every queue in `mq_set` that has no task yet.
+// Takes no lock itself; both callers in this file hold task_table_mutex_ while calling it.
+void DefaultLitePullConsumerImpl::startPullTask(std::vector<MQMessageQueue>& mq_set) {
+  for (const auto& queue : mq_set) {
+    if (task_table_.find(queue) != task_table_.end()) {
+      continue;  // a task for this queue already exists
+    }
+    // add new PullTask
+    auto new_task = std::make_shared<PullTaskImpl>(shared_from_this(), queue);
+    task_table_.emplace(queue, new_task);
+    scheduled_thread_pool_executor_.submit(std::bind(&PullTaskImpl::run, new_task));
+  }
+}
+
+// Determines the offset for the next pull of `message_queue`.
+// A pending seek offset (anything but -1) wins: it also becomes the consume offset and the seek
+// is cleared. Otherwise the stored pull offset is used, falling back to fetchConsumeOffset()
+// when none is stored yet (-1).
+int64_t DefaultLitePullConsumerImpl::nextPullOffset(const MQMessageQueue& message_queue) {
+  const int64_t seek_offset = assigned_message_queue_->getSeekOffset(message_queue);
+  if (seek_offset != -1) {
+    assigned_message_queue_->updateConsumeOffset(message_queue, seek_offset);
+    assigned_message_queue_->setSeekOffset(message_queue, -1);
+    return seek_offset;
+  }
+
+  int64_t pull_offset = assigned_message_queue_->getPullOffset(message_queue);
+  if (pull_offset == -1) {
+    pull_offset = fetchConsumeOffset(message_queue);
+  }
+  return pull_offset;
+}
+
+int64_t DefaultLitePullConsumerImpl::fetchConsumeOffset(const MQMessageQueue& messageQueue) {
+  // checkServiceState();
+  return rebalance_impl_->computePullFromWhere(messageQueue);  // the rebalance implementation decides where pulling starts
+}
+
+// Pulls up to `max_nums` messages from `mq` starting at `offset`, using the configured
+// consumer_pull_timeout_millis as the timeout.
+PullResult* DefaultLitePullConsumerImpl::pull(const MQMessageQueue& mq,
+                                              SubscriptionData* subscription_data,
+                                              int64_t offset,
+                                              int max_nums) {
+  const auto default_timeout = getDefaultLitePullConsumerConfig()->consumer_pull_timeout_millis();
+  return pull(mq, subscription_data, offset, max_nums, default_timeout);
+}
+
+// Pulls up to `max_nums` messages from `mq` starting at `offset`. Whether the pull blocks is
+// taken from the configured long_polling_enable flag; `timeout` is passed through to
+// pullSyncImpl().
+PullResult* DefaultLitePullConsumerImpl::pull(const MQMessageQueue& mq,
+                                              SubscriptionData* subscription_data,
+                                              int64_t offset,
+                                              int max_nums,
+                                              long timeout) {
+  const auto long_polling = getDefaultLitePullConsumerConfig()->long_polling_enable();
+  return pullSyncImpl(mq, subscription_data, offset, max_nums, long_polling, timeout);
+}
+
+/**
+ * Performs one synchronous pull from `mq` and post-processes the result.
+ *
+ * @param mq                 queue to pull from
+ * @param subscription_data  subscription used for filtering; must not be null
+ * @param offset             offset to start pulling at; must be >= 0
+ * @param max_nums           maximum number of messages to pull; must be > 0
+ * @param block              when true the pull is flagged as suspendable and the configured
+ *                           consumer_timeout_millis_when_suspend replaces `timeout`
+ * @param timeout            request timeout in milliseconds, used when `block` is false
+ * @return the result produced by PullAPIWrapper::processPullResult()
+ * @throws MQClientException when an argument is invalid
+ */
+PullResult* DefaultLitePullConsumerImpl::pullSyncImpl(const MQMessageQueue& mq,
+                                                      SubscriptionData* subscription_data,
+                                                      int64_t offset,
+                                                      int max_nums,
+                                                      bool block,
+                                                      long timeout) {
+  if (offset < 0) {
+    THROW_MQEXCEPTION(MQClientException, "offset < 0", -1);
+  }
+
+  if (max_nums <= 0) {
+    THROW_MQEXCEPTION(MQClientException, "maxNums <= 0", -1);
+  }
+
+  // subscription_data is dereferenced several times below; fail with a proper exception instead
+  // of crashing when the caller could not resolve a subscription for this queue's topic.
+  if (nullptr == subscription_data) {
+    THROW_MQEXCEPTION(MQClientException, "subscriptionData is null", -1);
+  }
+
+  int sysFlag = PullSysFlag::buildSysFlag(false, block, true, false, true);
+
+  long timeoutMillis = block ? getDefaultLitePullConsumerConfig()->consumer_timeout_millis_when_suspend() : timeout;
+
+  bool isTagType = ExpressionType::isTagType(subscription_data->expression_type());
+
+  std::unique_ptr<PullResult> pull_result(pull_api_wrapper_->pullKernelImpl(
+      mq,                                                                    // mq
+      subscription_data->sub_string(),                                       // subExpression
+      subscription_data->expression_type(),                                  // expressionType
+      isTagType ? 0L : subscription_data->sub_version(),                     // subVersion
+      offset,                                                                // offset
+      max_nums,                                                              // maxNums
+      sysFlag,                                                               // sysFlag
+      0,                                                                     // commitOffset
+      getDefaultLitePullConsumerConfig()->broker_suspend_max_time_millis(),  // brokerSuspendMaxTimeMillis
+      timeoutMillis,                                                         // timeoutMillis
+      CommunicationMode::SYNC,                                               // communicationMode
+      nullptr));                                                             // pullCallback
+
+  return pull_api_wrapper_->processPullResult(mq, std::move(pull_result), subscription_data);
+}
+
+void DefaultLitePullConsumerImpl::submitConsumeRequest(ConsumeRequest* consume_request) {
+  consume_request_cache_.push_back(consume_request);  // queued for poll(); presumably the queue takes ownership of the raw pointer - TODO confirm
+}
+
+// Records `next_pull_offset` as the queue's pull offset, unless a seek is pending (seek offset
+// other than -1), in which case the stored pull offset is left untouched.
+void DefaultLitePullConsumerImpl::updatePullOffset(const MQMessageQueue& message_queue, int64_t next_pull_offset) {
+  const bool seek_pending = assigned_message_queue_->getSeekOffset(message_queue) != -1;
+  if (seek_pending) {
+    return;
+  }
+  assigned_message_queue_->updatePullOffset(message_queue, next_pull_offset);
+}
+
+std::vector<MQMessageExt> DefaultLitePullConsumerImpl::poll() {
+  return poll(getDefaultLitePullConsumerConfig()->poll_timeout_millis());  // uses the configured default poll timeout
+}
+
+/**
+ * Returns the next batch of pulled messages, waiting up to `timeout` milliseconds for one.
+ *
+ * When auto-commit is enabled, a due commit is performed first. Batches whose process queue has
+ * been dropped are skipped. For the batch that is returned, its messages are removed from the
+ * process queue, the queue's consume offset is updated, and the namespace prefix is stripped
+ * from the message topics.
+ *
+ * @return the messages of one batch, or an empty vector when no usable batch was obtained
+ */
+std::vector<MQMessageExt> DefaultLitePullConsumerImpl::poll(long timeout) {
+  // checkServiceState();
+  if (auto_commit_) {
+    maybeAutoCommit();
+  }
+
+  int64_t endTime = UtilAll::currentTimeMillis() + timeout;
+
+  auto consume_request = consume_request_cache_.pop_front(timeout, time_unit::milliseconds);
+  if (endTime - UtilAll::currentTimeMillis() > 0) {
+    // Skip batches that belong to dropped queues while there is time left.
+    // NOTE(review): this inner pop_front() is called without a timeout - confirm whether it
+    // returns immediately or can block past endTime.
+    while (consume_request != nullptr && consume_request->process_queue()->dropped()) {
+      consume_request = consume_request_cache_.pop_front();
+      if (endTime - UtilAll::currentTimeMillis() <= 0) {
+        break;
+      }
+    }
+  }
+
+  if (consume_request != nullptr && !consume_request->process_queue()->dropped()) {
+    auto& messages = consume_request->message_exts();
+    // Offsets are 64-bit values elsewhere in this file; `long` is only 32 bits wide on the WIN32
+    // targets this file supports, so holding the offset in a `long` could truncate it.
+    int64_t offset = consume_request->process_queue()->removeMessage(messages);
+    assigned_message_queue_->updateConsumeOffset(consume_request->message_queue(), offset);
+    // If namespace not null , reset Topic without namespace.
+    resetTopic(messages);
+    return MQMessageExt::from_list(messages);
+  }
+
+  return std::vector<MQMessageExt>();
+}
+
+// Commits all offsets when the auto-commit deadline has been reached, then moves the deadline
+// auto_commit_interval_millis into the future.
+void DefaultLitePullConsumerImpl::maybeAutoCommit() {
+  const auto current_millis = UtilAll::currentTimeMillis();
+  if (current_millis < next_auto_commit_deadline_) {
+    return;  // not due yet
+  }
+  commitAll();
+  next_auto_commit_deadline_ = current_millis + getDefaultLitePullConsumerConfig()->auto_commit_interval_millis();
+}
+
+// Strips the configured namespace from the topic of every message in `msg_list`.
+// Does nothing when the list is empty or no namespace is configured.
+void DefaultLitePullConsumerImpl::resetTopic(std::vector<MessageExtPtr>& msg_list) {
+  if (!msg_list.empty()) {
+    // If namespace not null , reset Topic without namespace.
+    const auto& configured_namespace = getDefaultLitePullConsumerConfig()->name_space();
+    if (configured_namespace.empty()) {
+      return;
+    }
+    for (auto& message : msg_list) {
+      message->set_topic(NamespaceUtil::withoutNamespace(message->topic(), configured_namespace));
+    }
+  }
+}
+
+/**
+ * Writes the consume offset of every assigned queue into the offset store.
+ *
+ * Queues without a recorded consume offset (-1), without a process queue, or with a dropped
+ * process queue are skipped. In BROADCASTING mode the offsets are additionally persisted right
+ * away. Errors are logged and not propagated.
+ */
+void DefaultLitePullConsumerImpl::commitAll() {
+  try {
+    std::vector<MQMessageQueue> message_queues = assigned_message_queue_->messageQueues();
+    for (const auto& message_queue : message_queues) {
+      // Kept as int64_t, the type updateConsumeOffset() takes: `long` is only 32 bits wide on
+      // the WIN32 targets this file supports and could truncate the offset.
+      int64_t consumer_offset = assigned_message_queue_->getConsumerOffset(message_queue);
+      if (consumer_offset != -1) {
+        auto process_queue = assigned_message_queue_->getProcessQueue(message_queue);
+        if (process_queue != nullptr && !process_queue->dropped()) {
+          updateConsumeOffset(message_queue, consumer_offset);
+        }
+      }
+    }
+    if (getDefaultLitePullConsumerConfig()->message_model() == MessageModel::BROADCASTING) {
+      offset_store_->persistAll(message_queues);
+    }
+  } catch (const std::exception& e) {
+    // Include the cause; previously the exception text was dropped.
+    LOG_ERROR_NEW("An error occurred when update consume offset Automatically. {}", e.what());
+  }
+}
+
+void DefaultLitePullConsumerImpl::updateConsumeOffset(const MQMessageQueue& mq, int64_t offset) {
+  // checkServiceState();
+  offset_store_->updateOffset(mq, offset, false);  // NOTE(review): meaning of the `false` flag is not visible here - confirm against OffsetStore
+}
+
+// Persists the offsets of the queues this consumer currently works on: the rebalance allocation
+// in SUBSCRIBE mode, the assigned queues in ASSIGN mode, an empty list otherwise.
+// Does nothing when the service state is not OK.
+void DefaultLitePullConsumerImpl::persistConsumerOffset() {
+  if (!isServiceStateOk()) {
+    return;
+  }
+
+  std::vector<MQMessageQueue> queues_to_persist;
+  switch (subscription_type_) {
+    case SubscriptionType::SUBSCRIBE:
+      queues_to_persist = rebalance_impl_->getAllocatedMQ();
+      break;
+    case SubscriptionType::ASSIGN:
+      queues_to_persist = assigned_message_queue_->messageQueues();
+      break;
+    default:
+      break;
+  }
+  offset_store_->persistAll(queues_to_persist);
+}
+
+// Fetches the message queues of `topic` and strips the namespace from their topic names.
+// Returns an empty vector when the service state is not OK.
+std::vector<MQMessageQueue> DefaultLitePullConsumerImpl::fetchMessageQueues(const std::string& topic) {
+  std::vector<MQMessageQueue> queues;
+  if (!isServiceStateOk()) {
+    return queues;
+  }
+  client_instance_->getMQAdminImpl()->fetchSubscribeMessageQueues(topic, queues);
+  parseMessageQueues(queues);
+  return queues;
+}
+
+// Rewrites the topic of every queue in `queueSet` to its name without the configured namespace.
+// Leaves the queues untouched when no namespace is configured.
+void DefaultLitePullConsumerImpl::parseMessageQueues(std::vector<MQMessageQueue>& queueSet) {
+  const auto& configured_namespace = client_config_->name_space();
+  if (!configured_namespace.empty()) {
+    for (auto& queue : queueSet) {
+      queue.set_topic(NamespaceUtil::withoutNamespace(queue.topic(), configured_namespace));
+    }
+  }
+}
+
+void DefaultLitePullConsumerImpl::assign(const std::vector<MQMessageQueue>& messageQueues) {
+  // TODO: not implemented yet - currently a no-op that ignores messageQueues.
+}
+
+void DefaultLitePullConsumerImpl::seek(const MQMessageQueue& messageQueue, int64_t offset) {
+  // TODO: not implemented yet - currently a no-op that ignores both arguments.
+}
+
+void DefaultLitePullConsumerImpl::seekToBegin(const MQMessageQueue& message_queue) {
+  // Seek to whatever minOffset() reports for this queue.
+  seek(message_queue, minOffset(message_queue));
+}
+
+void DefaultLitePullConsumerImpl::seekToEnd(const MQMessageQueue& message_queue) {
+  // Seek to whatever maxOffset() reports for this queue.
+  seek(message_queue, maxOffset(message_queue));
+}
+
+int64_t DefaultLitePullConsumerImpl::offsetForTimestamp(const MQMessageQueue& message_queue, int64_t timestamp) {
+  return searchOffset(message_queue, timestamp);  // thin wrapper: delegates to searchOffset()
+}
+
+void DefaultLitePullConsumerImpl::pause(const std::vector<MQMessageQueue>& message_queues) {
+  assigned_message_queue_->pause(message_queues);  // forwarded unchanged to assigned_message_queue_
+}
+
+void DefaultLitePullConsumerImpl::resume(const std::vector<MQMessageQueue>& message_queues) {
+  assigned_message_queue_->resume(message_queues);  // forwarded unchanged to assigned_message_queue_
+}
+
+void DefaultLitePullConsumerImpl::commitSync() {
+  commitAll();  // public entry point; all of the work happens in commitAll()
+}
+
+int64_t DefaultLitePullConsumerImpl::committed(const MQMessageQueue& message_queue) {
+  // checkServiceState();  // (guard currently disabled)
+  auto offset = offset_store_->readOffset(message_queue, ReadOffsetType::MEMORY_FIRST_THEN_STORE);
+  if (offset == -2) {  // -2 from readOffset() is surfaced to the caller as an MQClientException
+    THROW_MQEXCEPTION(MQClientException, "Fetch consume offset from broker exception", -1);
+  }
+  return offset;  // any other value is returned as-is
+}
+
+void DefaultLitePullConsumerImpl::registerTopicMessageQueueChangeListener(
+    const std::string& topic,
+    TopicMessageQueueChangeListener* topicMessageQueueChangeListener) {
+  std::lock_guard<std::mutex> guard(mutex_);  // held until the function returns or throws
+  if (nullptr == topicMessageQueueChangeListener || topic.empty()) {
+    THROW_MQEXCEPTION(MQClientException, "Topic or listener is null", -1);
+  }
+  if (topic_message_queue_change_listener_map_.count(topic) > 0) {
+    LOG_WARN_NEW("Topic {} had been registered, new listener will overwrite the old one", topic);
+  }
+
+  topic_message_queue_change_listener_map_[topic] = topicMessageQueueChangeListener;
+  if (service_state_ == ServiceState::RUNNING) {
+    auto current_queues = fetchMessageQueues(topic);  // fetched first, then stored for this topic
+    message_queues_for_topic_[topic] = std::move(current_queues);
+  }
+}
+
+ConsumerRunningInfo* DefaultLitePullConsumerImpl::consumerRunningInfo() {
+  // Owned by a unique_ptr while being filled in so that an exception cannot leak it;
+  // ownership passes to the caller on return, exactly as before.
+  std::unique_ptr<ConsumerRunningInfo> info(new ConsumerRunningInfo());
+
+  info->setProperty(ConsumerRunningInfo::PROP_CONSUMER_START_TIMESTAMP, UtilAll::to_string(start_time_));
+  info->setSubscriptionSet(subscriptions());
+
+  auto processQueueTable = rebalance_impl_->getProcessQueueTable();
+  for (const auto& it : processQueueTable) {
+    const auto& mq = it.first;
+    const auto& pq = it.second;
+
+    ProcessQueueInfo pq_info;
+    pq_info.setCommitOffset(offset_store_->readOffset(mq, ReadOffsetType::MEMORY_FIRST_THEN_STORE));
+    pq->fillProcessQueueInfo(pq_info);
+    info->setMqTable(mq, pq_info);
+  }
+  return info.release();
+}
+
+bool DefaultLitePullConsumerImpl::isAutoCommit() const {
+  return auto_commit_;  // current value of the auto-commit flag
+}
+
+void DefaultLitePullConsumerImpl::setAutoCommit(bool auto_commit) {
+  auto_commit_ = auto_commit;  // plain assignment; no lock is taken here
+}
+
+const std::string& DefaultLitePullConsumerImpl::groupName() const {
+  return client_config_->group_name();  // group name comes straight from the client config
+}
+
+MessageModel DefaultLitePullConsumerImpl::messageModel() const {
+  return getDefaultLitePullConsumerConfig()->message_model();  // read from the lite-pull consumer config
+}
+
+ConsumeType DefaultLitePullConsumerImpl::consumeType() const {
+  return CONSUME_ACTIVELY;  // fixed value for this consumer implementation
+}
+
+ConsumeFromWhere DefaultLitePullConsumerImpl::consumeFromWhere() const {
+  return getDefaultLitePullConsumerConfig()->consume_from_where();  // read from the lite-pull consumer config
+}
+
+void DefaultLitePullConsumerImpl::set_subscription_type(SubscriptionType subscription_type) {
+  // While the stored type is NONE any value is accepted; afterwards only the same value is.
+  if (subscription_type_ != SubscriptionType::NONE && subscription_type_ != subscription_type) {
+    THROW_MQEXCEPTION(MQClientException, "Subscribe and assign are mutually exclusive.", -1);
+  }
+  subscription_type_ = subscription_type;
+}
+
+}  // namespace rocketmq
diff --git a/src/consumer/DefaultLitePullConsumerImpl.h b/src/consumer/DefaultLitePullConsumerImpl.h
new file mode 100755
index 0000000..6923799
--- /dev/null
+++ b/src/consumer/DefaultLitePullConsumerImpl.h
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef ROCKETMQ_DEFAULTLITEPULLCONSUMERIMPL_H_
+#define ROCKETMQ_DEFAULTLITEPULLCONSUMERIMPL_H_
+
+#include <memory>  // std::shared_ptr
+#include <mutex>   // std::mutex
+#include <string>  // std::string
+
+#include "concurrent/blocking_queue.hpp"
+#include "concurrent/executor.hpp"
+#include "DefaultLitePullConsumer.h"
+#include "MessageQueueListener.h"
+#include "MessageQueueLock.hpp"
+#include "MQClientImpl.h"
+#include "MQConsumerInner.h"
+#include "TopicMessageQueueChangeListener.h"
+
+namespace rocketmq {
+
+class AssignedMessageQueue;
+class OffsetStore;
+class PullAPIWrapper;
+class PullResult;
+class RebalanceImpl;
+
+class DefaultLitePullConsumerImpl;
+typedef std::shared_ptr<DefaultLitePullConsumerImpl> DefaultLitePullConsumerImplPtr;  // handle type returned by create()
+
+enum SubscriptionType { NONE, SUBSCRIBE, ASSIGN };  // mode kept in subscription_type_; SUBSCRIBE and ASSIGN exclude each other
+
+class DefaultLitePullConsumerImpl : public std::enable_shared_from_this<DefaultLitePullConsumerImpl>,
+                                    public LitePullConsumer,
+                                    public MQClientImpl,
+                                    public MQConsumerInner {
+ private:  // nested helper types; only forward-declared in this header
+  class MessageQueueListenerImpl;
+  class ConsumeRequest;
+  class PullTaskImpl;
+
+ public:
+  /**
+   * create() - Factory method for DefaultLitePullConsumerImpl, used to ensure that all objects of
+   * DefaultLitePullConsumerImpl are managed by std::shared_ptr
+   */
+  static DefaultLitePullConsumerImplPtr create(DefaultLitePullConsumerConfigPtr config, RPCHookPtr rpcHook = nullptr) {
+    if (nullptr == rpcHook) {
+      return DefaultLitePullConsumerImplPtr(new DefaultLitePullConsumerImpl(config));
+    } else {
+      return DefaultLitePullConsumerImplPtr(new DefaultLitePullConsumerImpl(config, rpcHook));
+    }
+  }
+
+ private:  // constructors are private; build instances through create()
+  DefaultLitePullConsumerImpl(DefaultLitePullConsumerConfigPtr config);
+  DefaultLitePullConsumerImpl(DefaultLitePullConsumerConfigPtr config, RPCHookPtr rpcHook);
+
+ public:
+  virtual ~DefaultLitePullConsumerImpl();
+
+ public:  // LitePullConsumer
+  void start() override;
+  void shutdown() override;
+
+  bool isAutoCommit() const override;
+  void setAutoCommit(bool auto_commit) override;
+
+  void subscribe(const std::string& topic, const std::string& subExpression) override;
+  void subscribe(const std::string& topic, const MessageSelector& selector) override;
+
+  void unsubscribe(const std::string& topic) override;
+
+  std::vector<MQMessageExt> poll() override;
+  std::vector<MQMessageExt> poll(long timeout) override;
+
+  std::vector<MQMessageQueue> fetchMessageQueues(const std::string& topic) override;
+
+  void assign(const std::vector<MQMessageQueue>& messageQueues) override;
+
+  void seek(const MQMessageQueue& messageQueue, int64_t offset) override;
+  void seekToBegin(const MQMessageQueue& messageQueue) override;
+  void seekToEnd(const MQMessageQueue& messageQueue) override;
+
+  int64_t offsetForTimestamp(const MQMessageQueue& messageQueue, int64_t timestamp) override;
+
+  void pause(const std::vector<MQMessageQueue>& messageQueues) override;
+  void resume(const std::vector<MQMessageQueue>& messageQueues) override;
+
+  void commitSync() override;
+
+  int64_t committed(const MQMessageQueue& messageQueue) override;
+
+  void registerTopicMessageQueueChangeListener(
+      const std::string& topic,
+      TopicMessageQueueChangeListener* topicMessageQueueChangeListener) override;
+
+ public:  // MQConsumerInner
+  const std::string& groupName() const override;
+  MessageModel messageModel() const override;
+  ConsumeType consumeType() const override;
+  ConsumeFromWhere consumeFromWhere() const override;
+
+  std::vector<SubscriptionData> subscriptions() const override;
+
+  // service discovery
+  void updateTopicSubscribeInfo(const std::string& topic, std::vector<MQMessageQueue>& info) override;
+
+  // load balancing
+  void doRebalance() override;
+
+  // offset persistence
+  void persistConsumerOffset() override;
+
+  ConsumerRunningInfo* consumerRunningInfo() override;
+
+ private:  // internal helpers
+  void checkConfig();
+  void startScheduleTask();
+  void operateAfterRunning();
+
+  void fetchTopicMessageQueuesAndComparePeriodically();
+  void fetchTopicMessageQueuesAndCompare();
+
+  bool isSetEqual(std::vector<MQMessageQueue>& newMessageQueues, std::vector<MQMessageQueue>& oldMessageQueues);
+
+  void updateTopicSubscribeInfoWhenSubscriptionChanged();
+
+  void updateAssignPullTask(std::vector<MQMessageQueue>& mqNewSet);
+
+  void updateAssignedMessageQueue(const std::string& topic, std::vector<MQMessageQueue>& assignedMessageQueue);
+  void updatePullTask(const std::string& topic, std::vector<MQMessageQueue>& mqNewSet);
+
+  void startPullTask(std::vector<MQMessageQueue>& mqSet);
+
+  int64_t nextPullOffset(const MQMessageQueue& messageQueue);
+  int64_t fetchConsumeOffset(const MQMessageQueue& messageQueue);
+
+  PullResult* pull(const MQMessageQueue& mq, SubscriptionData* subscription_data, int64_t offset, int max_nums);
+  PullResult* pull(const MQMessageQueue& mq,
+                   SubscriptionData* subscription_data,
+                   int64_t offset,
+                   int max_nums,
+                   long timeout);
+  PullResult* pullSyncImpl(const MQMessageQueue& mq,
+                           SubscriptionData* subscription_data,
+                           int64_t offset,
+                           int max_nums,
+                           bool block,
+                           long timeout);
+
+  void submitConsumeRequest(ConsumeRequest* consume_request);
+
+  void updatePullOffset(const MQMessageQueue& messageQueue, int64_t nextPullOffset);
+
+  void maybeAutoCommit();
+
+  void resetTopic(std::vector<MessageExtPtr>& msg_list);
+
+  void commitAll();
+
+  void updateConsumeOffset(const MQMessageQueue& mq, int64_t offset);
+
+  void parseMessageQueues(std::vector<MQMessageQueue>& queueSet);
+
+ public:  // accessors returning raw pointers to members this object keeps owning
+  inline MessageQueueListener* getMessageQueueListener() const { return message_queue_listener_.get(); }
+
+  inline OffsetStore* getOffsetStore() const { return offset_store_.get(); }
+
+  inline DefaultLitePullConsumerConfig* getDefaultLitePullConsumerConfig() const {
+    return dynamic_cast<DefaultLitePullConsumerConfig*>(client_config_.get());  // nullptr if the cast fails
+  }
+
+ private:
+  void set_subscription_type(SubscriptionType subscription_type);
+
+ private:  // state
+  std::mutex mutex_;  // locked by registerTopicMessageQueueChangeListener()
+
+  uint64_t start_time_;
+
+  SubscriptionType subscription_type_;  // see set_subscription_type()
+
+  long consume_request_flow_control_times_;
+  long queue_flow_control_times_;
+
+  int64_t next_auto_commit_deadline_;
+
+  bool auto_commit_;  // exposed through isAutoCommit() / setAutoCommit()
+
+  std::unique_ptr<MessageQueueListener> message_queue_listener_;
+
+  // NOTE(review): raw listener pointers - ownership presumably stays with the caller; confirm
+  std::map<std::string, TopicMessageQueueChangeListener*> topic_message_queue_change_listener_map_;
+  std::map<std::string, std::vector<MQMessageQueue>> message_queues_for_topic_;
+
+  std::unique_ptr<AssignedMessageQueue> assigned_message_queue_;
+  MessageQueueLock message_queue_lock_;
+
+  std::map<MQMessageQueue, std::shared_ptr<PullTaskImpl>> task_table_;
+  std::mutex task_table_mutex_;  // presumably guards task_table_ - confirm
+
+  blocking_queue<ConsumeRequest> consume_request_cache_;
+
+  scheduled_thread_pool_executor scheduled_thread_pool_executor_;
+  scheduled_thread_pool_executor scheduled_executor_service_;
+
+  std::unique_ptr<RebalanceImpl> rebalance_impl_;
+  std::unique_ptr<PullAPIWrapper> pull_api_wrapper_;
+  std::unique_ptr<OffsetStore> offset_store_;
+};
+
+}  // namespace rocketmq
+
+#endif  // ROCKETMQ_DEFAULTLITEPULLCONSUMERIMPL_H_
diff --git a/src/consumer/DefaultMQPullConsumer.cpp b/src/consumer/DefaultMQPullConsumer.cpp
deleted file mode 100644
index 166ed69..0000000
--- a/src/consumer/DefaultMQPullConsumer.cpp
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include "DefaultMQPullConsumer.h"
-
-#include "UtilAll.h"
-
-namespace rocketmq {
-
-DefaultMQPullConsumer::DefaultMQPullConsumer(const std::string& groupname)
-    : DefaultMQPullConsumer(groupname, nullptr) {}
-
-DefaultMQPullConsumer::DefaultMQPullConsumer(const std::string& groupname, RPCHookPtr rpcHook)
-    : DefaultMQPullConsumerConfigProxy(nullptr), m_pullConsumerDelegate(nullptr) {
-  // set default group name
-  if (groupname.empty()) {
-    set_group_name(DEFAULT_CONSUMER_GROUP);
-  } else {
-    set_group_name(groupname);
-  }
-
-  // TODO: DefaultMQPullConsumerImpl
-}
-
-DefaultMQPullConsumer::~DefaultMQPullConsumer() = default;
-
-void DefaultMQPullConsumer::start() {
-  m_pullConsumerDelegate->start();
-}
-
-void DefaultMQPullConsumer::shutdown() {
-  m_pullConsumerDelegate->shutdown();
-}
-
-bool DefaultMQPullConsumer::sendMessageBack(MessageExtPtr msg, int delayLevel) {
-  return m_pullConsumerDelegate->sendMessageBack(msg, delayLevel);
-}
-
-bool DefaultMQPullConsumer::sendMessageBack(MessageExtPtr msg, int delayLevel, const std::string& brokerName) {
-  return m_pullConsumerDelegate->sendMessageBack(msg, delayLevel, brokerName);
-}
-
-void DefaultMQPullConsumer::fetchSubscribeMessageQueues(const std::string& topic, std::vector<MQMessageQueue>& mqs) {
-  m_pullConsumerDelegate->fetchMessageQueuesInBalance(topic, mqs);
-}
-
-void DefaultMQPullConsumer::registerMessageQueueListener(const std::string& topic, MQueueListener* listener) {
-  m_pullConsumerDelegate->registerMessageQueueListener(topic, listener);
-}
-
-PullResult DefaultMQPullConsumer::pull(const MQMessageQueue& mq,
-                                       const std::string& subExpression,
-                                       int64_t offset,
-                                       int maxNums) {
-  return m_pullConsumerDelegate->pull(mq, subExpression, offset, maxNums);
-}
-
-void DefaultMQPullConsumer::pull(const MQMessageQueue& mq,
-                                 const std::string& subExpression,
-                                 int64_t offset,
-                                 int maxNums,
-                                 PullCallback* pullCallback) {
-  m_pullConsumerDelegate->pull(mq, subExpression, offset, maxNums, pullCallback);
-}
-
-PullResult DefaultMQPullConsumer::pullBlockIfNotFound(const MQMessageQueue& mq,
-                                                      const std::string& subExpression,
-                                                      int64_t offset,
-                                                      int maxNums) {
-  return m_pullConsumerDelegate->pullBlockIfNotFound(mq, subExpression, offset, maxNums);
-}
-
-void DefaultMQPullConsumer::pullBlockIfNotFound(const MQMessageQueue& mq,
-                                                const std::string& subExpression,
-                                                int64_t offset,
-                                                int maxNums,
-                                                PullCallback* pullCallback) {
-  m_pullConsumerDelegate->pullBlockIfNotFound(mq, subExpression, offset, maxNums, pullCallback);
-}
-
-void DefaultMQPullConsumer::updateConsumeOffset(const MQMessageQueue& mq, int64_t offset) {
-  m_pullConsumerDelegate->updateConsumeOffset(mq, offset);
-}
-
-int64_t DefaultMQPullConsumer::fetchConsumeOffset(const MQMessageQueue& mq, bool fromStore) {
-  return m_pullConsumerDelegate->fetchConsumeOffset(mq, fromStore);
-}
-
-void DefaultMQPullConsumer::fetchMessageQueuesInBalance(const std::string& topic, std::vector<MQMessageQueue>& mqs) {
-  m_pullConsumerDelegate->fetchMessageQueuesInBalance(topic, mqs);
-}
-
-void DefaultMQPullConsumer::setRPCHook(RPCHookPtr rpcHook) {
-  // dynamic_cast<DefaultMQPullConsumerImpl*>(m_pullConsumerDelegate.get())->setRPCHook(rpcHook);
-}
-
-}  // namespace rocketmq
diff --git a/src/consumer/DefaultMQPullConsumerImpl.cpp b/src/consumer/DefaultMQPullConsumerImpl.cpp
deleted file mode 100644
index 78845cf..0000000
--- a/src/consumer/DefaultMQPullConsumerImpl.cpp
+++ /dev/null
@@ -1,375 +0,0 @@
-// /*
-//  * Licensed to the Apache Software Foundation (ASF) under one or more
-//  * contributor license agreements.  See the NOTICE file distributed with
-//  * this work for additional information regarding copyright ownership.
-//  * The ASF licenses this file to You under the Apache License, Version 2.0
-//  * (the "License"); you may not use this file except in compliance with
-//  * the License.  You may obtain a copy of the License at
-//  *
-//  *     http://www.apache.org/licenses/LICENSE-2.0
-//  *
-//  * Unless required by applicable law or agreed to in writing, software
-//  * distributed under the License is distributed on an "AS IS" BASIS,
-//  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-//  * See the License for the specific language governing permissions and
-//  * limitations under the License.
-//  */
-// #include "DefaultMQPullConsumerImpl.h"
-
-// #ifndef WIN32
-// #include <signal.h>
-// #endif
-
-// #include "AllocateMQAveragely.h"
-// #include "CommunicationMode.h"
-// #include "FilterAPI.h"
-// #include "Logging.h"
-// #include "MQAdminImpl.h"
-// #include "MQClientAPIImpl.h"
-// #include "MQClientInstance.h"
-// #include "MQClientManager.h"
-// #include "MQProtos.h"
-// #include "OffsetStore.h"
-// #include "PullAPIWrapper.h"
-// #include "PullSysFlag.h"
-// #include "RebalanceImpl.h"
-// #include "Validators.h"
-
-// namespace rocketmq {
-
-// DefaultMQPullConsumer::DefaultMQPullConsumer(const string& groupname) : DefaultMQPullConsumer(groupname, nullptr) {}
-
-// DefaultMQPullConsumer::DefaultMQPullConsumer(const string& groupname, RPCHookPtr rpcHook)
-//     : MQClient(rpcHook),
-//       m_rebalanceImpl(new RebalancePullImpl(this)),
-//       m_pullAPIWrapper(nullptr),
-//       m_offsetStore(nullptr),
-//       m_messageQueueListener(nullptr) {
-//   // set default group name
-//   if (groupname.empty()) {
-//     setGroupName(DEFAULT_CONSUMER_GROUP);
-//   } else {
-//     setGroupName(groupname);
-//   }
-// }
-
-// DefaultMQPullConsumer::~DefaultMQPullConsumer() = default;
-
-// void DefaultMQPullConsumer::start() {
-// #ifndef WIN32
-//   /* Ignore the SIGPIPE */
-//   struct sigaction sa;
-//   memset(&sa, 0, sizeof(struct sigaction));
-//   sa.sa_handler = SIG_IGN;
-//   sa.sa_flags = 0;
-//   ::sigaction(SIGPIPE, &sa, 0);
-// #endif
-//   switch (m_serviceState) {
-//     case CREATE_JUST: {
-//       m_serviceState = START_FAILED;
-
-//       // data
-//       checkConfig();
-
-//       copySubscription();
-
-//       if (getMessageModel() == CLUSTERING) {
-//         changeInstanceNameToPID();
-//       }
-
-//       MQClient::start();
-//       LOG_INFO_NEW("DefaultMQPullConsumer:{} start", getGroupName());
-
-//       // reset rebalance;
-//       m_rebalanceImpl->setConsumerGroup(getGroupName());
-//       m_rebalanceImpl->setMessageModel(getMessageModel());
-//       m_rebalanceImpl->setAllocateMQStrategy(getAllocateMQStrategy());
-//       m_rebalanceImpl->setMQClientFactory(m_clientInstance.get());
-
-//       m_pullAPIWrapper.reset(new PullAPIWrapper(m_clientInstance.get(), getGroupName()));
-
-//       // msg model
-//       switch (getMessageModel()) {
-//         case BROADCASTING:
-//           m_offsetStore.reset(new LocalFileOffsetStore(m_clientInstance.get(), getGroupName()));
-//           break;
-//         case CLUSTERING:
-//           m_offsetStore.reset(new RemoteBrokerOffsetStore(m_clientInstance.get(), getGroupName()));
-//           break;
-//       }
-//       m_offsetStore->load();
-
-//       // register consumer
-//       bool registerOK = m_clientInstance->registerConsumer(getGroupName(), this);
-//       if (!registerOK) {
-//         m_serviceState = CREATE_JUST;
-//         THROW_MQEXCEPTION(
-//             MQClientException,
-//             "The cousumer group[" + getGroupName() + "] has been created before, specify another name please.", -1);
-//       }
-
-//       m_clientInstance->start();
-//       LOG_INFO_NEW("the consumer [{}] start OK", getGroupName());
-//       m_serviceState = RUNNING;
-//       break;
-//     }
-//     case RUNNING:
-//     case START_FAILED:
-//     case SHUTDOWN_ALREADY:
-//       break;
-//     default:
-//       break;
-//   }
-// }
-
-// void DefaultMQPullConsumer::shutdown() {
-//   switch (m_serviceState) {
-//     case RUNNING: {
-//       LOG_INFO("DefaultMQPullConsumer:%s shutdown", m_groupName.c_str());
-//       persistConsumerOffset();
-//       m_clientInstance->unregisterConsumer(getGroupName());
-//       m_clientInstance->shutdown();
-//       m_serviceState = SHUTDOWN_ALREADY;
-//       break;
-//     }
-//     case SHUTDOWN_ALREADY:
-//     case CREATE_JUST:
-//       break;
-//     default:
-//       break;
-//   }
-// }
-
-// bool DefaultMQPullConsumer::sendMessageBack(MQMessageExt& msg, int delayLevel) {
-//   return false;
-// }
-
-// void DefaultMQPullConsumer::fetchSubscribeMessageQueues(const std::string& topic, std::vector<MQMessageQueue>& mqs) {
-//   mqs.clear();
-//   try {
-//     m_clientInstance->getMQAdminImpl()->fetchSubscribeMessageQueues(topic, mqs);
-//   } catch (MQException& e) {
-//     LOG_ERROR(e.what());
-//   }
-// }
-
-// void DefaultMQPullConsumer::updateTopicSubscribeInfo(const std::string& topic, std::vector<MQMessageQueue>& info) {}
-
-// void DefaultMQPullConsumer::registerMessageQueueListener(const std::string& topic, MQueueListener* listener) {
-//   m_registerTopics.insert(topic);
-//   if (listener != nullptr) {
-//     m_messageQueueListener = listener;
-//   }
-// }
-
-// PullResult DefaultMQPullConsumer::pull(const MQMessageQueue& mq,
-//                                        const std::string& subExpression,
-//                                        int64_t offset,
-//                                        int maxNums) {
-//   return pullSyncImpl(mq, subExpression, offset, maxNums, false);
-// }
-
-// void DefaultMQPullConsumer::pull(const MQMessageQueue& mq,
-//                                  const std::string& subExpression,
-//                                  int64_t offset,
-//                                  int maxNums,
-//                                  PullCallback* pPullCallback) {
-//   pullAsyncImpl(mq, subExpression, offset, maxNums, false, pPullCallback);
-// }
-
-// PullResult DefaultMQPullConsumer::pullBlockIfNotFound(const MQMessageQueue& mq,
-//                                                       const std::string& subExpression,
-//                                                       int64_t offset,
-//                                                       int maxNums) {
-//   return pullSyncImpl(mq, subExpression, offset, maxNums, true);
-// }
-
-// void DefaultMQPullConsumer::pullBlockIfNotFound(const MQMessageQueue& mq,
-//                                                 const std::string& subExpression,
-//                                                 int64_t offset,
-//                                                 int maxNums,
-//                                                 PullCallback* pPullCallback) {
-//   pullAsyncImpl(mq, subExpression, offset, maxNums, true, pPullCallback);
-// }
-
-// PullResult DefaultMQPullConsumer::pullSyncImpl(const MQMessageQueue& mq,
-//                                                const std::string& subExpression,
-//                                                int64_t offset,
-//                                                int maxNums,
-//                                                bool block) {
-//   if (offset < 0)
-//     THROW_MQEXCEPTION(MQClientException, "offset < 0", -1);
-
-//   if (maxNums <= 0)
-//     THROW_MQEXCEPTION(MQClientException, "maxNums <= 0", -1);
-
-//   // auto subscript, all sub
-//   subscriptionAutomatically(mq.getTopic());
-
-//   int sysFlag = PullSysFlag::buildSysFlag(false, block, true, false);
-
-//   // this sub
-//   std::unique_ptr<SubscriptionData> pSData(FilterAPI::buildSubscriptionData(mq.getTopic(), subExpression));
-
-//   int timeoutMillis = block ? 1000 * 30 : 1000 * 10;
-
-//   try {
-//     std::unique_ptr<PullResult> pullResult(m_pullAPIWrapper->pullKernelImpl(mq,                      // 1
-//                                                                             pSData->getSubString(),  // 2
-//                                                                             0L,                      // 3
-//                                                                             offset,                  // 4
-//                                                                             maxNums,                 // 5
-//                                                                             sysFlag,                 // 6
-//                                                                             0,                       // 7
-//                                                                             1000 * 20,               // 8
-//                                                                             timeoutMillis,           // 9
-//                                                                             ComMode_SYNC,            // 10
-//                                                                             nullptr));               // callback
-//     assert(pullResult != nullptr);
-//     return m_pullAPIWrapper->processPullResult(mq, *pullResult, pSData.get());
-//   } catch (MQException& e) {
-//     LOG_ERROR(e.what());
-//   }
-//   return PullResult(BROKER_TIMEOUT);
-// }
-
-// void DefaultMQPullConsumer::pullAsyncImpl(const MQMessageQueue& mq,
-//                                           const std::string& subExpression,
-//                                           int64_t offset,
-//                                           int maxNums,
-//                                           bool block,
-//                                           PullCallback* pPullCallback) {
-//   if (offset < 0)
-//     THROW_MQEXCEPTION(MQClientException, "offset < 0", -1);
-
-//   if (maxNums <= 0)
-//     THROW_MQEXCEPTION(MQClientException, "maxNums <= 0", -1);
-
-//   if (!pPullCallback)
-//     THROW_MQEXCEPTION(MQClientException, "pPullCallback is null", -1);
-
-//   // auto subscript, all sub
-//   subscriptionAutomatically(mq.getTopic());
-
-//   int sysFlag = PullSysFlag::buildSysFlag(false, block, true, false);
-
-//   // this sub
-//   std::unique_ptr<SubscriptionData> pSData(FilterAPI::buildSubscriptionData(mq.getTopic(), subExpression));
-
-//   int timeoutMillis = block ? 1000 * 30 : 1000 * 10;
-
-//   try {
-//     std::unique_ptr<PullResult> pullResult(m_pullAPIWrapper->pullKernelImpl(mq,                      // 1
-//                                                                             pSData->getSubString(),  // 2
-//                                                                             0L,                      // 3
-//                                                                             offset,                  // 4
-//                                                                             maxNums,                 // 5
-//                                                                             sysFlag,                 // 6
-//                                                                             0,                       // 7
-//                                                                             1000 * 20,               // 8
-//                                                                             timeoutMillis,           // 9
-//                                                                             ComMode_ASYNC,           // 10
-//                                                                             pPullCallback));
-//   } catch (MQException& e) {
-//     LOG_ERROR(e.what());
-//   }
-// }
-
-// void DefaultMQPullConsumer::subscriptionAutomatically(const std::string& topic) {
-//   SubscriptionData* pSdata = m_rebalanceImpl->getSubscriptionData(topic);
-//   if (pSdata == nullptr) {
-//     std::unique_ptr<SubscriptionData> subscriptionData(FilterAPI::buildSubscriptionData(topic, SUB_ALL));
-//     m_rebalanceImpl->setSubscriptionData(topic, subscriptionData.release());
-//   }
-// }
-
-// void DefaultMQPullConsumer::updateConsumeOffset(const MQMessageQueue& mq, int64_t offset) {
-//   m_offsetStore->updateOffset(mq, offset, false);
-// }
-
-// void DefaultMQPullConsumer::removeConsumeOffset(const MQMessageQueue& mq) {
-//   m_offsetStore->removeOffset(mq);
-// }
-
-// int64_t DefaultMQPullConsumer::fetchConsumeOffset(const MQMessageQueue& mq, bool fromStore) {
-//   return m_offsetStore->readOffset(mq, fromStore ? READ_FROM_STORE : MEMORY_FIRST_THEN_STORE);
-// }
-
-// void DefaultMQPullConsumer::persistConsumerOffset() {
-//   /*As do not execute rebalance for pullConsumer now, requestTable is always
-//   empty
-//   map<MQMessageQueue, PullRequest*> requestTable =
-//   m_pRebalance->getPullRequestTable();
-//   map<MQMessageQueue, PullRequest*>::iterator it = requestTable.begin();
-//   vector<MQMessageQueue> mqs;
-//   for (; it != requestTable.end(); ++it)
-//   {
-//       if (it->second)
-//       {
-//           mqs.push_back(it->first);
-//       }
-//   }
-//   m_pOffsetStore->persistAll(mqs);*/
-// }
-
-// void DefaultMQPullConsumer::persistConsumerOffset4PullConsumer(const MQMessageQueue& mq) {
-//   if (isServiceStateOk()) {
-//     m_offsetStore->persist(mq);
-//   }
-// }
-
-// void DefaultMQPullConsumer::fetchMessageQueuesInBalance(const std::string& topic, std::vector<MQMessageQueue>& mqs)
-// {}
-
-// void DefaultMQPullConsumer::checkConfig() {
-//   string groupname = getGroupName();
-//   // check consumerGroup
-//   Validators::checkGroup(groupname);
-
-//   // consumerGroup
-//   if (!groupname.compare(DEFAULT_CONSUMER_GROUP)) {
-//     THROW_MQEXCEPTION(MQClientException, "consumerGroup can not equal DEFAULT_CONSUMER", -1);
-//   }
-
-//   if (getMessageModel() != BROADCASTING && getMessageModel() != CLUSTERING) {
-//     THROW_MQEXCEPTION(MQClientException, "messageModel is valid ", -1);
-//   }
-// }
-
-// void DefaultMQPullConsumer::doRebalance() {}
-
-// void DefaultMQPullConsumer::copySubscription() {
-//   std::set<string>::iterator it = m_registerTopics.begin();
-//   for (; it != m_registerTopics.end(); ++it) {
-//     std::unique_ptr<SubscriptionData> subscriptionData(FilterAPI::buildSubscriptionData((*it), SUB_ALL));
-//     m_rebalanceImpl->setSubscriptionData((*it), subscriptionData.release());
-//   }
-// }
-
-// std::string DefaultMQPullConsumer::groupName() const {
-//   return getGroupName();
-// }
-
-// MessageModel DefaultMQPullConsumer::messageModel() const {
-//   return getMessageModel();
-// }
-
-// ConsumeType DefaultMQPullConsumer::consumeType() const {
-//   return CONSUME_ACTIVELY;
-// }
-
-// ConsumeFromWhere DefaultMQPullConsumer::consumeFromWhere() const {
-//   return CONSUME_FROM_LAST_OFFSET;
-// }
-
-// std::vector<SubscriptionData> DefaultMQPullConsumer::subscriptions() const {
-//   std::vector<SubscriptionData> result;
-//   std::set<string>::iterator it = m_registerTopics.begin();
-//   for (; it != m_registerTopics.end(); ++it) {
-//     SubscriptionData ms(*it, SUB_ALL);
-//     result.push_back(ms);
-//   }
-//   return result;
-// }
-
-// }  // namespace rocketmq
diff --git a/src/consumer/DefaultMQPullConsumerImpl.h b/src/consumer/DefaultMQPullConsumerImpl.h
deleted file mode 100755
index dc39eb4..0000000
--- a/src/consumer/DefaultMQPullConsumerImpl.h
+++ /dev/null
@@ -1,173 +0,0 @@
-// /*
-//  * Licensed to the Apache Software Foundation (ASF) under one or more
-//  * contributor license agreements.  See the NOTICE file distributed with
-//  * this work for additional information regarding copyright ownership.
-//  * The ASF licenses this file to You under the Apache License, Version 2.0
-//  * (the "License"); you may not use this file except in compliance with
-//  * the License.  You may obtain a copy of the License at
-//  *
-//  *     http://www.apache.org/licenses/LICENSE-2.0
-//  *
-//  * Unless required by applicable law or agreed to in writing, software
-//  * distributed under the License is distributed on an "AS IS" BASIS,
-//  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-//  * See the License for the specific language governing permissions and
-//  * limitations under the License.
-//  */
-// #ifndef __DEFAULT_MQ_PULL_CONSUMER_IMPL_H__
-// #define __DEFAULT_MQ_PULL_CONSUMER_IMPL_H__
-
-// #include <set>
-// #include <string>
-
-// #include "AllocateMQStrategy.h"
-// #include "MQClientConfig.h"
-// #include "MQConsumer.h"
-// #include "MQueueListener.h"
-
-// namespace rocketmq {
-
-// class RebalanceImpl;
-// class SubscriptionData;
-// class OffsetStore;
-// class PullAPIWrapper;
-
-// class ROCKETMQCLIENT_API DefaultMQPullConsumer : public MQPullConsumer,
-//                                                  public MQClientImpl,
-//                                                  public DefaultMQPullConsumerConfig {
-//  public:
-//   DefaultMQPullConsumer(const std::string& groupname);
-//   DefaultMQPullConsumer(const std::string& groupname, RPCHookPtr rpcHook);
-//   virtual ~DefaultMQPullConsumer();
-
-//  public:  // MQClient
-//   void start() override;
-//   void shutdown() override;
-
-//  public:  // MQConsumer
-//   bool sendMessageBack(MQMessageExt& msg, int delayLevel) override;
-//   void fetchSubscribeMessageQueues(const std::string& topic, std::vector<MQMessageQueue>& mqs) override;
-
-//   std::string groupName() const override;
-//   MessageModel messageModel() const override;
-//   ConsumeType consumeType() const override;
-//   ConsumeFromWhere consumeFromWhere() const override;
-//   std::vector<SubscriptionData> subscriptions() const override;
-
-//   void doRebalance() override;
-//   void persistConsumerOffset() override;
-//   void updateTopicSubscribeInfo(const std::string& topic, std::vector<MQMessageQueue>& info) override;
-//   ConsumerRunningInfo* consumerRunningInfo() override { return nullptr; }
-
-//  public:  // MQPullConsumer
-//   void pull(const MQMessageQueue& mq,
-//             const std::string& subExpression,
-//             int64_t offset,
-//             int maxNums,
-//             PullCallback* pullCallback) override;
-
-//   /**
-//    * pull msg from specified queue, if no msg in queue, return directly
-//    *
-//    * @param mq
-//    *            specify the pulled queue
-//    * @param subExpression
-//    *            set filter expression for pulled msg, broker will filter msg actively
-//    *            Now only OR operation is supported, eg: "tag1 || tag2 || tag3"
-//    *            if subExpression is setted to "null" or "*" all msg will be subscribed
-//    * @param offset
-//    *            specify the started pull offset
-//    * @param maxNums
-//    *            specify max msg num by per pull
-//    * @return
-//    *            accroding to PullResult
-//    */
-//   PullResult pull(const MQMessageQueue& mq, const std::string& subExpression, int64_t offset, int maxNums) override;
-
-//   virtual void updateConsumeOffset(const MQMessageQueue& mq, int64_t offset);
-//   virtual void removeConsumeOffset(const MQMessageQueue& mq);
-
-//   void registerMessageQueueListener(const std::string& topic, MQueueListener* pListener);
-
-//   /**
-//    * pull msg from specified queue, if no msg, broker will suspend the pull request 20s
-//    *
-//    * @param mq
-//    *            specify the pulled queue
-//    * @param subExpression
-//    *            set filter expression for pulled msg, broker will filter msg actively
-//    *            Now only OR operation is supported, eg: "tag1 || tag2 || tag3"
-//    *            if subExpression is setted to "null" or "*" all msg will be subscribed
-//    * @param offset
-//    *            specify the started pull offset
-//    * @param maxNums
-//    *            specify max msg num by per pull
-//    * @return
-//    *            accroding to PullResult
-//    */
-//   PullResult pullBlockIfNotFound(const MQMessageQueue& mq,
-//                                  const std::string& subExpression,
-//                                  int64_t offset,
-//                                  int maxNums);
-
-//   void pullBlockIfNotFound(const MQMessageQueue& mq,
-//                            const std::string& subExpression,
-//                            int64_t offset,
-//                            int maxNums,
-//                            PullCallback* pullCallback);
-
-//   /**
-//    * Fetch the offset
-//    *
-//    * @param mq
-//    * @param fromStore
-//    * @return
-//    */
-//   int64_t fetchConsumeOffset(const MQMessageQueue& mq, bool fromStore);
-
-//   /**
-//    * Fetch the message queues according to the topic
-//    *
-//    * @param topic Message Topic
-//    * @return
-//    */
-//   void fetchMessageQueuesInBalance(const std::string& topic, std::vector<MQMessageQueue>& mqs);
-
-//   // temp persist consumer offset interface, only valid with
-//   // RemoteBrokerOffsetStore, updateConsumeOffset should be called before.
-//   void persistConsumerOffset4PullConsumer(const MQMessageQueue& mq);
-
-//  public:
-//   OffsetStore* getOffsetStore() const { return m_offsetStore.get(); }
-
-//  private:
-//   void checkConfig();
-//   void copySubscription();
-
-//   PullResult pullSyncImpl(const MQMessageQueue& mq,
-//                           const std::string& subExpression,
-//                           int64_t offset,
-//                           int maxNums,
-//                           bool block);
-
-//   void pullAsyncImpl(const MQMessageQueue& mq,
-//                      const std::string& subExpression,
-//                      int64_t offset,
-//                      int maxNums,
-//                      bool block,
-//                      PullCallback* pPullCallback);
-
-//   void subscriptionAutomatically(const std::string& topic);
-
-//  private:
-//   std::set<std::string> m_registerTopics;
-
-//   std::unique_ptr<RebalanceImpl> m_rebalanceImpl;
-//   std::unique_ptr<PullAPIWrapper> m_pullAPIWrapper;
-//   std::unique_ptr<OffsetStore> m_offsetStore;
-//   MQueueListener* m_messageQueueListener;
-// };
-
-// }  // namespace rocketmq
-
-// #endif  // __DEFAULT_MQ_PULL_CONSUMER_IMPL_H__
diff --git a/include/MQueueListener.h b/src/consumer/MessageQueueListener.h
similarity index 80%
rename from include/MQueueListener.h
rename to src/consumer/MessageQueueListener.h
index 11842aa..be65c3b 100644
--- a/include/MQueueListener.h
+++ b/src/consumer/MessageQueueListener.h
@@ -14,19 +14,19 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#ifndef __MQ_LISTENER_H__
-#define __MQ_LISTENER_H__
+#ifndef ROCKETMQ_MESSAGEQUEUELISTENER_H_
+#define ROCKETMQ_MESSAGEQUEUELISTENER_H_
 
-#include <string>
-#include <vector>
+#include <string>  // std::string
+#include <vector>  // std::vector
 
 #include "MQMessageQueue.h"
 
 namespace rocketmq {
 
-class ROCKETMQCLIENT_API MQueueListener {
+class MessageQueueListener {
  public:
-  virtual ~MQueueListener() = default;
+  virtual ~MessageQueueListener() = default;
 
   virtual void messageQueueChanged(const std::string& topic,
                                    std::vector<MQMessageQueue>& mqAll,
@@ -35,4 +35,4 @@ class ROCKETMQCLIENT_API MQueueListener {
 
 }  // namespace rocketmq
 
-#endif  // __MQ_LISTENER_H__
+#endif  // ROCKETMQ_MESSAGEQUEUELISTENER_H_
diff --git a/src/consumer/RebalanceImpl.cpp b/src/consumer/RebalanceImpl.cpp
index 1454584..ddb39e8 100644
--- a/src/consumer/RebalanceImpl.cpp
+++ b/src/consumer/RebalanceImpl.cpp
@@ -52,21 +52,21 @@ void RebalanceImpl::unlock(MQMessageQueue mq, const bool oneway) {
       ProcessQueuePtr processQueue = getProcessQueue(mq);
       if (processQueue) {
         processQueue->set_locked(false);
-        LOG_INFO("the message queue unlock OK, mq:%s", mq.toString().c_str());
+        LOG_INFO_NEW("the message queue unlock OK, mq:{}", mq.toString());
       } else {
-        LOG_ERROR("the message queue unlock Failed, mq:%s", mq.toString().c_str());
+        LOG_ERROR_NEW("the message queue unlock Failed, mq:{}", mq.toString());
       }
     } catch (MQException& e) {
-      LOG_ERROR("unlockBatchMQ exception, mq:%s", mq.toString().c_str());
+      LOG_ERROR_NEW("unlockBatchMQ exception, mq:{}", mq.toString());
     }
   } else {
-    LOG_WARN("unlock findBrokerAddressInSubscribe ret null for broker:%s", mq.broker_name().data());
+    LOG_WARN_NEW("unlock findBrokerAddressInSubscribe ret null for broker:{}", mq.broker_name());
   }
 }
 
 void RebalanceImpl::unlockAll(const bool oneway) {
   auto brokerMqs = buildProcessQueueTableByBrokerName();
-  LOG_INFO("unLockAll " SIZET_FMT " broker mqs", brokerMqs->size());
+  LOG_INFO_NEW("unLockAll {} broker mqs", brokerMqs->size());
 
   for (const auto& it : *brokerMqs) {
     const std::string& brokerName = it.first;
@@ -91,16 +91,16 @@ void RebalanceImpl::unlockAll(const bool oneway) {
           ProcessQueuePtr processQueue = getProcessQueue(mq);
           if (processQueue) {
             processQueue->set_locked(false);
-            LOG_INFO("the message queue unlock OK, mq:%s", mq.toString().c_str());
+            LOG_INFO_NEW("the message queue unlock OK, mq:{}", mq.toString());
           } else {
-            LOG_ERROR("the message queue unlock Failed, mq:%s", mq.toString().c_str());
+            LOG_ERROR_NEW("the message queue unlock Failed, mq:{}", mq.toString());
           }
         }
       } catch (MQException& e) {
-        LOG_ERROR("unlockBatchMQ exception");
+        LOG_ERROR_NEW("unlockBatchMQ exception");
       }
     } else {
-      LOG_ERROR("unlockAll findBrokerAddressInSubscribe ret null for broker:%s", brokerName.data());
+      LOG_ERROR_NEW("unlockAll findBrokerAddressInSubscribe ret null for broker:{}", brokerName);
     }
   }
 }
@@ -129,7 +129,7 @@ bool RebalanceImpl::lock(MQMessageQueue mq) {
     lockBatchRequest->mq_set().push_back(mq);
 
     try {
-      LOG_DEBUG("try to lock mq:%s", mq.toString().c_str());
+      LOG_DEBUG_NEW("try to lock mq:{}", mq.toString());
 
       std::vector<MQMessageQueue> lockedMq;
       client_instance_->getMQClientAPIImpl()->lockBatchMQ(findBrokerResult->broker_addr(), lockBatchRequest.get(),
@@ -143,23 +143,23 @@ bool RebalanceImpl::lock(MQMessageQueue mq) {
             processQueue->set_locked(true);
             processQueue->set_last_lock_timestamp(UtilAll::currentTimeMillis());
             lockOK = true;
-            LOG_INFO("the message queue locked OK, mq:%s", mmqq.toString().c_str());
+            LOG_INFO_NEW("the message queue locked OK, mq:{}", mmqq.toString());
           } else {
-            LOG_WARN("the message queue locked OK, but it is released, mq:%s", mmqq.toString().c_str());
+            LOG_WARN_NEW("the message queue locked OK, but it is released, mq:{}", mmqq.toString());
           }
         }
 
         lockedMq.clear();
       } else {
-        LOG_ERROR("the message queue locked Failed, mq:%s", mq.toString().c_str());
+        LOG_ERROR_NEW("the message queue locked Failed, mq:{}", mq.toString());
       }
 
       return lockOK;
     } catch (MQException& e) {
-      LOG_ERROR("lockBatchMQ exception, mq:%s", mq.toString().c_str());
+      LOG_ERROR_NEW("lockBatchMQ exception, mq:{}", mq.toString());
     }
   } else {
-    LOG_ERROR("lock findBrokerAddressInSubscribe ret null for broker:%s", mq.broker_name().data());
+    LOG_ERROR_NEW("lock findBrokerAddressInSubscribe ret null for broker:{}", mq.broker_name());
   }
 
   return false;
@@ -167,7 +167,7 @@ bool RebalanceImpl::lock(MQMessageQueue mq) {
 
 void RebalanceImpl::lockAll() {
   auto brokerMqs = buildProcessQueueTableByBrokerName();
-  LOG_INFO("LockAll " SIZET_FMT " broker mqs", brokerMqs->size());
+  LOG_INFO_NEW("LockAll {} broker mqs", brokerMqs->size());
 
   for (const auto& it : *brokerMqs) {
     const std::string& brokerName = it.first;
@@ -185,7 +185,7 @@ void RebalanceImpl::lockAll() {
       lockBatchRequest->set_client_id(client_instance_->getClientId());
       lockBatchRequest->set_mq_set(mqs);
 
-      LOG_INFO("try to lock:" SIZET_FMT " mqs of broker:%s", mqs.size(), brokerName.c_str());
+      LOG_INFO_NEW("try to lock:{} mqs of broker:{}", mqs.size(), brokerName);
       try {
         std::vector<MQMessageQueue> lockOKMQVec;
         client_instance_->getMQClientAPIImpl()->lockBatchMQ(findBrokerResult->broker_addr(), lockBatchRequest.get(),
@@ -199,9 +199,9 @@ void RebalanceImpl::lockAll() {
           if (processQueue) {
             processQueue->set_locked(true);
             processQueue->set_last_lock_timestamp(UtilAll::currentTimeMillis());
-            LOG_INFO("the message queue locked OK, mq:%s", mq.toString().c_str());
+            LOG_INFO_NEW("the message queue locked OK, mq:{}", mq.toString());
           } else {
-            LOG_WARN("the message queue locked OK, but it is released, mq:%s", mq.toString().c_str());
+            LOG_WARN_NEW("the message queue locked OK, but it is released, mq:{}", mq.toString());
           }
         }
 
@@ -209,29 +209,29 @@ void RebalanceImpl::lockAll() {
           if (lockOKMQSet.find(mq) == lockOKMQSet.end()) {
             ProcessQueuePtr processQueue = getProcessQueue(mq);
             if (processQueue) {
-              LOG_WARN("the message queue locked Failed, mq:%s", mq.toString().c_str());
+              LOG_WARN_NEW("the message queue locked Failed, mq:{}", mq.toString());
               processQueue->set_locked(false);
             }
           }
         }
       } catch (MQException& e) {
-        LOG_ERROR("lockBatchMQ fails");
+        LOG_ERROR_NEW("lockBatchMQ fails");
       }
     } else {
-      LOG_ERROR("lockAll findBrokerAddressInSubscribe ret null for broker:%s", brokerName.c_str());
+      LOG_ERROR_NEW("lockAll findBrokerAddressInSubscribe ret null for broker:{}", brokerName);
     }
   }
 }
 
 void RebalanceImpl::doRebalance(const bool isOrder) {
-  LOG_DEBUG("start doRebalance");
+  LOG_DEBUG_NEW("start doRebalance");
   for (const auto& it : subscription_inner_) {
     const std::string& topic = it.first;
-    LOG_INFO("current topic is:%s", topic.c_str());
+    LOG_INFO_NEW("current topic is:{}", topic);
     try {
       rebalanceByTopic(topic, isOrder);
     } catch (MQException& e) {
-      LOG_ERROR(e.what());
+      LOG_ERROR_NEW("{}", e.what());
     }
   }
 
@@ -249,14 +249,14 @@ void RebalanceImpl::rebalanceByTopic(const std::string& topic, const bool isOrde
           messageQueueChanged(topic, mqSet, mqSet);
         }
       } else {
-        LOG_WARN("doRebalance, %s, but the topic[%s] not exist.", consumer_group_.c_str(), topic.c_str());
+        LOG_WARN_NEW("doRebalance, {}, but the topic[{}] not exist.", consumer_group_, topic);
       }
     } break;
     case CLUSTERING: {
       std::vector<MQMessageQueue> mqAll;
       if (!getTopicSubscribeInfo(topic, mqAll)) {
         if (!UtilAll::isRetryTopic(topic)) {
-          LOG_WARN("doRebalance, %s, but the topic[%s] not exist.", consumer_group_.c_str(), topic.c_str());
+          LOG_WARN_NEW("doRebalance, {}, but the topic[{}] not exist.", consumer_group_, topic);
         }
         return;
       }
@@ -265,13 +265,13 @@ void RebalanceImpl::rebalanceByTopic(const std::string& topic, const bool isOrde
       client_instance_->findConsumerIds(topic, consumer_group_, cidAll);
 
       if (cidAll.empty()) {
-        LOG_WARN("doRebalance, %s %s, get consumer id list failed", consumer_group_.c_str(), topic.c_str());
+        LOG_WARN_NEW("doRebalance, {} {}, get consumer id list failed", consumer_group_, topic);
         return;
       }
 
       // log
       for (auto& cid : cidAll) {
-        LOG_INFO("client id:%s of topic:%s", cid.c_str(), topic.c_str());
+        LOG_INFO_NEW("client id:{} of topic:{}", cid, topic);
       }
 
       // sort
@@ -283,19 +283,20 @@ void RebalanceImpl::rebalanceByTopic(const std::string& topic, const bool isOrde
       try {
         allocate_mq_strategy_->allocate(client_instance_->getClientId(), mqAll, cidAll, allocateResult);
       } catch (MQException& e) {
-        LOG_ERROR("AllocateMessageQueueStrategy.allocate Exception: %s", e.what());
+        LOG_ERROR_NEW("AllocateMessageQueueStrategy.allocate Exception: {}", e.what());
         return;
       }
 
       // update local
       bool changed = updateProcessQueueTableInRebalance(topic, allocateResult, isOrder);
       if (changed) {
-        LOG_INFO("rebalanced result changed. group=%s, topic=%s, clientId=%s, mqAllSize=" SIZET_FMT
-                 ", cidAllSize=" SIZET_FMT ", rebalanceResultSize=" SIZET_FMT ", rebalanceResultSet:",
-                 consumer_group_.c_str(), topic.c_str(), client_instance_->getClientId().c_str(), mqAll.size(),
-                 cidAll.size(), allocateResult.size());
+        LOG_INFO_NEW(
+            "rebalanced result changed. group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, "
+            "rebalanceResultSize={}, rebalanceResultSet:",
+            consumer_group_, topic, client_instance_->getClientId(), mqAll.size(), cidAll.size(),
+            allocateResult.size());
         for (auto& mq : allocateResult) {
-          LOG_INFO("allocate mq:%s", mq.toString().c_str());
+          LOG_INFO_NEW("allocate mq:{}", mq.toString());
         }
         messageQueueChanged(topic, mqAll, allocateResult);
       }
@@ -313,8 +314,8 @@ void RebalanceImpl::truncateMessageQueueNotMyTopic() {
       auto pq = removeProcessQueueDirectly(mq);
       if (pq != nullptr) {
         pq->set_dropped(true);
-        LOG_INFO("doRebalance, %s, truncateMessageQueueNotMyTopic remove unnecessary mq, {}", consumer_group_.c_str(),
-                 mq.toString().c_str());
+        LOG_INFO_NEW("doRebalance, {}, truncateMessageQueueNotMyTopic remove unnecessary mq, {}", consumer_group_,
+                     mq.toString());
       }
     }
   }
@@ -339,7 +340,7 @@ bool RebalanceImpl::updateProcessQueueTableInRebalance(const std::string& topic,
         if (removeUnnecessaryMessageQueue(mq, pq)) {
           removeProcessQueueDirectly(mq);
           changed = true;
-          LOG_INFO("doRebalance, %s, remove unnecessary mq, %s", consumer_group_.c_str(), mq.toString().c_str());
+          LOG_INFO_NEW("doRebalance, {}, remove unnecessary mq, {}", consumer_group_, mq.toString());
         }
       } else if (pq->isPullExpired()) {
         switch (consumeType()) {
@@ -350,8 +351,9 @@ bool RebalanceImpl::updateProcessQueueTableInRebalance(const std::string& topic,
             if (removeUnnecessaryMessageQueue(mq, pq)) {
               removeProcessQueueDirectly(mq);
               changed = true;
-              LOG_ERROR("[BUG]doRebalance, %s, remove unnecessary mq, %s, because pull is pause, so try to fixed it",
-                        consumer_group_.c_str(), mq.toString().c_str());
+              LOG_ERROR_NEW(
+                  "[BUG]doRebalance, {}, remove unnecessary mq, {}, because pull is pause, so try to fixed it",
+                  consumer_group_, mq.toString());
             }
             break;
           default:
@@ -367,8 +369,7 @@ bool RebalanceImpl::updateProcessQueueTableInRebalance(const std::string& topic,
     ProcessQueuePtr pq = getProcessQueue(mq);
     if (nullptr == pq) {
       if (isOrder && !lock(mq)) {
-        LOG_WARN("doRebalance, %s, add a new mq failed, %s, because lock failed", consumer_group_.c_str(),
-                 mq.toString().c_str());
+        LOG_WARN_NEW("doRebalance, {}, add a new mq failed, {}, because lock failed", consumer_group_, mq.toString());
         continue;
       }
 
@@ -378,9 +379,9 @@ bool RebalanceImpl::updateProcessQueueTableInRebalance(const std::string& topic,
       if (nextOffset >= 0) {
         auto pre = putProcessQueueIfAbsent(mq, pq);
         if (pre) {
-          LOG_INFO("doRebalance, %s, mq already exists, %s", consumer_group_.c_str(), mq.toString().c_str());
+          LOG_INFO_NEW("doRebalance, {}, mq already exists, {}", consumer_group_, mq.toString());
         } else {
-          LOG_INFO("doRebalance, %s, add a new mq, %s", consumer_group_.c_str(), mq.toString().c_str());
+          LOG_INFO_NEW("doRebalance, {}, add a new mq, {}", consumer_group_, mq.toString());
           PullRequestPtr pullRequest(new PullRequest());
           pullRequest->set_consumer_group(consumer_group_);
           pullRequest->set_next_offset(nextOffset);
@@ -390,14 +391,14 @@ bool RebalanceImpl::updateProcessQueueTableInRebalance(const std::string& topic,
           changed = true;
         }
       } else {
-        LOG_WARN("doRebalance, %s, add new mq failed, %s", consumer_group_.c_str(), mq.toString().c_str());
+        LOG_WARN_NEW("doRebalance, {}, add new mq failed, {}", consumer_group_, mq.toString());
       }
     }
   }
 
   dispatchPullRequest(pullRequestList);
 
-  LOG_DEBUG("updateRequestTableInRebalance exit");
+  LOG_DEBUG_NEW("updateRequestTableInRebalance exit");
   return changed;
 }
 
@@ -411,8 +412,8 @@ void RebalanceImpl::removeProcessQueue(const MQMessageQueue& mq) {
     bool dropped = prev->dropped();
     prev->set_dropped(true);
     removeUnnecessaryMessageQueue(mq, prev);
-    LOG_INFO("Fix Offset, {}, remove unnecessary mq, {} Dropped: {}", consumer_group_, mq.toString(),
-             UtilAll::to_string(dropped));
+    LOG_INFO_NEW("Fix Offset, {}, remove unnecessary mq, {} Dropped: {}", consumer_group_, mq.toString(),
+                 UtilAll::to_string(dropped));
   }
 }
 
@@ -514,7 +515,7 @@ void RebalanceImpl::setTopicSubscribeInfo(const std::string& topic, std::vector<
 
   // log
   for (const auto& mq : mqs) {
-    LOG_DEBUG("topic [%s] has :%s", topic.c_str(), mq.toString().c_str());
+    LOG_DEBUG_NEW("topic [{}] has :{}", topic, mq.toString());
   }
 }
 
diff --git a/src/consumer/RebalanceImpl.h b/src/consumer/RebalanceImpl.h
index 0ef1a9a..194fd48 100755
--- a/src/consumer/RebalanceImpl.h
+++ b/src/consumer/RebalanceImpl.h
@@ -102,7 +102,7 @@ class RebalanceImpl {
   TOPIC2MQS topic_subscribe_info_table_;
   std::mutex topic_subscribe_info_table_mutex_;
 
-  TOPIC2SD subscription_inner_;  // don't modify subscription_inner_ after consumer start.
+  TOPIC2SD subscription_inner_;  // don't modify subscription_inner_ after the consumer started.
 
   std::string consumer_group_;
   MessageModel message_model_;
diff --git a/src/consumer/RebalanceLitePullImpl.cpp b/src/consumer/RebalanceLitePullImpl.cpp
new file mode 100644
index 0000000..4956d14
--- /dev/null
+++ b/src/consumer/RebalanceLitePullImpl.cpp
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "RebalanceLitePullImpl.h"
+
+#include <stdexcept>  // std::logic_error
+#include <string>     // std::stoull
+
+#include "OffsetStore.h"
+#include "UtilAll.h"
+
+namespace rocketmq {
+
+// Binds this rebalancer to the consumer implementation it serves; the base class is built in CLUSTERING mode.
+// NOTE(review): the `null`/nullptr base-class arguments look like placeholders that are set later - confirm.
+RebalanceLitePullImpl::RebalanceLitePullImpl(DefaultLitePullConsumerImpl* consumerImpl)
+    : RebalanceImpl(null, CLUSTERING, nullptr, nullptr), lite_pull_consumer_impl_(consumerImpl) {}
+
+// Persists the offset of `mq`, then removes it from the offset store. `pq` is not used. Always returns true.
+bool RebalanceLitePullImpl::removeUnnecessaryMessageQueue(const MQMessageQueue& mq, ProcessQueuePtr pq) {
+  lite_pull_consumer_impl_->getOffsetStore()->persist(mq);
+  lite_pull_consumer_impl_->getOffsetStore()->removeOffset(mq);
+  return true;
+}
+
+// Removes the offset of `mq` from the offset store without persisting it first.
+void RebalanceLitePullImpl::removeDirtyOffset(const MQMessageQueue& mq) {
+  lite_pull_consumer_impl_->getOffsetStore()->removeOffset(mq);
+}
+
+/**
+ * Computes the offset at which pulling from `mq` starts.
+ *
+ * An offset already known to the offset store (>= 0) is returned as is. When the store answers -1, the configured
+ * ConsumeFromWhere policy decides:
+ *  - CONSUME_FROM_LAST_OFFSET (also the fallback for any other value): 0 for a retry topic, else maxOffset(mq);
+ *  - CONSUME_FROM_FIRST_OFFSET: 0;
+ *  - CONSUME_FROM_TIMESTAMP: maxOffset(mq) for a retry topic, else searchOffset(mq, consume_timestamp).
+ *
+ * @param mq queue whose start offset is wanted
+ * @return the start offset, or -1 when the store answers below -1 or a lookup fails
+ */
+int64_t RebalanceLitePullImpl::computePullFromWhere(const MQMessageQueue& mq) {
+  ConsumeFromWhere consumeFromWhere =
+      lite_pull_consumer_impl_->getDefaultLitePullConsumerConfig()->consume_from_where();
+  OffsetStore* offsetStore = lite_pull_consumer_impl_->getOffsetStore();
+
+  // Every policy consults the store first. The value is kept in int64_t because `long` is only 32 bits wide on
+  // LLP64 platforms and would truncate large offsets.
+  const int64_t lastOffset = offsetStore->readOffset(mq, ReadOffsetType::MEMORY_FIRST_THEN_STORE);
+  if (lastOffset >= 0) {
+    return lastOffset;
+  }
+  if (lastOffset != -1) {
+    return -1;
+  }
+
+  // The store has no offset for this queue (-1): apply the configured policy.
+  switch (consumeFromWhere) {
+    case CONSUME_FROM_FIRST_OFFSET:
+      return 0;
+    case CONSUME_FROM_TIMESTAMP:
+      if (!UtilAll::isRetryTopic(mq.topic())) {
+        try {
+          // FIXME: parseDate by YYYYMMDDHHMMSS
+          auto timestamp =
+              std::stoull(lite_pull_consumer_impl_->getDefaultLitePullConsumerConfig()->consume_timestamp());
+          return lite_pull_consumer_impl_->searchOffset(mq, timestamp);
+        } catch (const MQClientException&) {
+          return -1;
+        } catch (const std::logic_error& e) {
+          // std::stoull signals a non-numeric or out-of-range timestamp with std::invalid_argument or
+          // std::out_of_range; report it like any other failed lookup instead of letting it escape.
+          LOG_ERROR_NEW("computePullFromWhere exception, mq:{}, {}", mq.toString(), e.what());
+          return -1;
+        }
+      }
+      break;  // retry topic: start from the max offset below
+    case CONSUME_FROM_LAST_OFFSET:
+    default:
+      if (UtilAll::isRetryTopic(mq.topic())) {  // First start, no offset
+        return 0;
+      }
+      break;
+  }
+
+  try {
+    return lite_pull_consumer_impl_->maxOffset(mq);
+  } catch (const MQClientException&) {
+    return -1;
+  }
+}
+
+// No-op: the pull requests handed over by the rebalance are not used by this implementation.
+void RebalanceLitePullImpl::dispatchPullRequest(const std::vector<PullRequestPtr>& pullRequestList) {}
+
+// Hands the rebalance result to the consumer's message queue listener, if one is set. A std::exception raised by
+// the listener is logged and not propagated.
+void RebalanceLitePullImpl::messageQueueChanged(const std::string& topic,
+                                                std::vector<MQMessageQueue>& mqAll,
+                                                std::vector<MQMessageQueue>& mqDivided) {
+  auto* messageQueueListener = lite_pull_consumer_impl_->getMessageQueueListener();
+  if (messageQueueListener == nullptr) {
+    return;
+  }
+  try {
+    messageQueueListener->messageQueueChanged(topic, mqAll, mqDivided);
+  } catch (const std::exception& e) {
+    LOG_ERROR_NEW("messageQueueChanged exception {}", e.what());
+  }
+}
+
+}  // namespace rocketmq
diff --git a/src/consumer/RebalancePullImpl.h b/src/consumer/RebalanceLitePullImpl.h
similarity index 82%
rename from src/consumer/RebalancePullImpl.h
rename to src/consumer/RebalanceLitePullImpl.h
index d4f106c..56b6189 100755
--- a/src/consumer/RebalancePullImpl.h
+++ b/src/consumer/RebalanceLitePullImpl.h
@@ -14,10 +14,10 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#ifndef __REBALANCE_PULL_IMPL_H__
-#define __REBALANCE_PULL_IMPL_H__
+#ifndef ROCKETMQ_CONSUMER_REBALANCELITEPULLIMPL_H_
+#define ROCKETMQ_CONSUMER_REBALANCELITEPULLIMPL_H_
 
-#include "DefaultMQPullConsumer.h"
+#include "DefaultLitePullConsumerImpl.h"
 #include "RebalanceImpl.h"
 
 namespace rocketmq {
@@ -27,9 +27,9 @@ typedef std::map<std::string, std::vector<MQMessageQueue>> TOPIC2MQS;
 typedef std::map<std::string, SubscriptionData*> TOPIC2SD;
 typedef std::map<std::string, std::vector<MQMessageQueue>> BROKER2MQS;
 
-class RebalancePullImpl : public RebalanceImpl {
+class RebalanceLitePullImpl : public RebalanceImpl {
  public:
-  RebalancePullImpl(DefaultMQPullConsumer* consumer);
+  RebalanceLitePullImpl(DefaultLitePullConsumerImpl* consumerImpl);
 
   ConsumeType consumeType() override final { return CONSUME_ACTIVELY; }
 
@@ -46,9 +46,9 @@ class RebalancePullImpl : public RebalanceImpl {
                            std::vector<MQMessageQueue>& mqDivided) override;
 
  private:
-  DefaultMQPullConsumer* m_defaultMQPullConsumer;
+  DefaultLitePullConsumerImpl* lite_pull_consumer_impl_;
 };
 
 }  // namespace rocketmq
 
-#endif  // __REBALANCE_PULL_IMPL_H__
+#endif  // ROCKETMQ_CONSUMER_REBALANCELITEPULLIMPL_H_
diff --git a/src/consumer/RebalancePullImpl.cpp b/src/consumer/RebalancePullImpl.cpp
deleted file mode 100644
index 15184ea..0000000
--- a/src/consumer/RebalancePullImpl.cpp
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include "RebalancePullImpl.h"
-
-#include "OffsetStore.h"
-
-namespace rocketmq {
-
-
-RebalancePullImpl::RebalancePullImpl(DefaultMQPullConsumer* consumer)
-    : RebalanceImpl("", CLUSTERING, nullptr, nullptr), m_defaultMQPullConsumer(consumer) {}
-
-bool RebalancePullImpl::removeUnnecessaryMessageQueue(const MQMessageQueue& mq, ProcessQueuePtr pq) {
-  // m_defaultMQPullConsumer->getOffsetStore()->persist(mq);
-  // m_defaultMQPullConsumer->getOffsetStore()->removeOffset(mq);
-  // return true;
-  return false;
-}
-
-void RebalancePullImpl::removeDirtyOffset(const MQMessageQueue& mq) {
-  // m_defaultMQPullConsumer->removeConsumeOffset(mq);
-}
-
-int64_t RebalancePullImpl::computePullFromWhere(const MQMessageQueue& mq) {
-  return 0;
-}
-
-void RebalancePullImpl::dispatchPullRequest(const std::vector<PullRequestPtr>& pullRequestList) {}
-
-void RebalancePullImpl::messageQueueChanged(const std::string& topic,
-                                            std::vector<MQMessageQueue>& mqAll,
-                                            std::vector<MQMessageQueue>& mqDivided) {}
-
-}  // namespace rocketmq
diff --git a/src/extern/CPullConsumer.cpp b/src/extern/CPullConsumer.cpp
index b4ed9ac..f3d5085 100644
--- a/src/extern/CPullConsumer.cpp
+++ b/src/extern/CPullConsumer.cpp
@@ -1,252 +1,252 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include "c/CPullConsumer.h"
-
-#include <cstring>
-
-#include "CErrorContainer.h"
-#include "ClientRPCHook.h"
-#include "DefaultMQPullConsumer.h"
-#include "Logging.h"
-
-using namespace rocketmq;
-
-CPullConsumer* CreatePullConsumer(const char* groupId) {
-  if (groupId == NULL) {
-    return NULL;
-  }
-  auto* defaultMQPullConsumer = new DefaultMQPullConsumer(groupId);
-  return reinterpret_cast<CPullConsumer*>(defaultMQPullConsumer);
-}
-
-int DestroyPullConsumer(CPullConsumer* consumer) {
-  if (consumer == NULL) {
-    return NULL_POINTER;
-  }
-  delete reinterpret_cast<DefaultMQPullConsumer*>(consumer);
-  return OK;
-}
-
-int StartPullConsumer(CPullConsumer* consumer) {
-  if (consumer == NULL) {
-    return NULL_POINTER;
-  }
-  try {
-    reinterpret_cast<DefaultMQPullConsumer*>(consumer)->start();
-  } catch (std::exception& e) {
-    CErrorContainer::setErrorMessage(e.what());
-    return PULLCONSUMER_START_FAILED;
-  }
-  return OK;
-}
-
-int ShutdownPullConsumer(CPullConsumer* consumer) {
-  if (consumer == NULL) {
-    return NULL_POINTER;
-  }
-  reinterpret_cast<DefaultMQPullConsumer*>(consumer)->shutdown();
-  return OK;
-}
-
-int SetPullConsumerGroupID(CPullConsumer* consumer, const char* groupId) {
-  if (consumer == NULL || groupId == NULL) {
-    return NULL_POINTER;
-  }
-  reinterpret_cast<DefaultMQPullConsumer*>(consumer)->set_group_name(groupId);
-  return OK;
-}
-
-const char* GetPullConsumerGroupID(CPullConsumer* consumer) {
-  if (consumer == NULL) {
-    return NULL;
-  }
-  return reinterpret_cast<DefaultMQPullConsumer*>(consumer)->group_name().c_str();
-}
-
-int SetPullConsumerNameServerAddress(CPullConsumer* consumer, const char* namesrv) {
-  if (consumer == NULL) {
-    return NULL_POINTER;
-  }
-  reinterpret_cast<DefaultMQPullConsumer*>(consumer)->set_namesrv_addr(namesrv);
-  return OK;
-}
-
-// Deprecated
-int SetPullConsumerNameServerDomain(CPullConsumer* consumer, const char* domain) {
-  if (consumer == NULL) {
-    return NULL_POINTER;
-  }
-  return NOT_SUPPORT_NOW;
-}
-
-int SetPullConsumerSessionCredentials(CPullConsumer* consumer,
-                                      const char* accessKey,
-                                      const char* secretKey,
-                                      const char* channel) {
-  if (consumer == NULL) {
-    return NULL_POINTER;
-  }
-  auto rpcHook = std::make_shared<ClientRPCHook>(SessionCredentials(accessKey, secretKey, channel));
-  reinterpret_cast<DefaultMQPullConsumer*>(consumer)->setRPCHook(rpcHook);
-  return OK;
-}
-
-int SetPullConsumerLogPath(CPullConsumer* consumer, const char* logPath) {
-  if (consumer == NULL) {
-    return NULL_POINTER;
-  }
-  // Todo, This api should be implemented by core api.
-  //((DefaultMQPullConsumer *) consumer)->setInstanceName(instanceName);
-  return OK;
-}
-
-int SetPullConsumerLogFileNumAndSize(CPullConsumer* consumer, int fileNum, long fileSize) {
-  if (consumer == NULL) {
-    return NULL_POINTER;
-  }
-  DEFAULT_LOG_ADAPTER->setLogFileNumAndSize(fileNum, fileSize);
-  return OK;
-}
-
-int SetPullConsumerLogLevel(CPullConsumer* consumer, CLogLevel level) {
-  if (consumer == NULL) {
-    return NULL_POINTER;
-  }
-  DEFAULT_LOG_ADAPTER->set_log_level((LogLevel)level);
-  return OK;
-}
-
-int FetchSubscriptionMessageQueues(CPullConsumer* consumer, const char* topic, CMessageQueue** mqs, int* size) {
-  if (consumer == NULL) {
-    return NULL_POINTER;
-  }
-  unsigned int index = 0;
-  CMessageQueue* temMQ = NULL;
-  std::vector<MQMessageQueue> fullMQ;
-  try {
-    reinterpret_cast<DefaultMQPullConsumer*>(consumer)->fetchSubscribeMessageQueues(topic, fullMQ);
-    *size = fullMQ.size();
-    // Alloc memory to save the pointer to CPP MessageQueue, and the MessageQueues may be changed.
-    // Thus, this memory should be released by users using @ReleaseSubscribeMessageQueue every time.
-    temMQ = (CMessageQueue*)malloc(*size * sizeof(CMessageQueue));
-    if (temMQ == NULL) {
-      *size = 0;
-      *mqs = NULL;
-      return MALLOC_FAILED;
-    }
-    auto iter = fullMQ.begin();
-    for (index = 0; iter != fullMQ.end() && index <= fullMQ.size(); ++iter, index++) {
-      strncpy(temMQ[index].topic, iter->topic().c_str(), MAX_TOPIC_LENGTH - 1);
-      strncpy(temMQ[index].brokerName, iter->broker_name().c_str(), MAX_BROKER_NAME_ID_LENGTH - 1);
-      temMQ[index].queueId = iter->queue_id();
-    }
-    *mqs = temMQ;
-  } catch (MQException& e) {
-    *size = 0;
-    *mqs = NULL;
-    CErrorContainer::setErrorMessage(e.what());
-    return PULLCONSUMER_FETCH_MQ_FAILED;
-  }
-  return OK;
-}
-
-int ReleaseSubscriptionMessageQueue(CMessageQueue* mqs) {
-  if (mqs == NULL) {
-    return NULL_POINTER;
-  }
-  free((void*)mqs);
-  mqs = NULL;
-  return OK;
-}
-
-CPullResult Pull(CPullConsumer* consumer,
-                 const CMessageQueue* mq,
-                 const char* subExpression,
-                 long long offset,
-                 int maxNums) {
-  CPullResult pullResult;
-  memset(&pullResult, 0, sizeof(CPullResult));
-  MQMessageQueue messageQueue(mq->topic, mq->brokerName, mq->queueId);
-  PullResult cppPullResult;
-  try {
-    cppPullResult =
-        reinterpret_cast<DefaultMQPullConsumer*>(consumer)->pull(messageQueue, subExpression, offset, maxNums);
-  } catch (std::exception& e) {
-    CErrorContainer::setErrorMessage(e.what());
-    cppPullResult.set_pull_status(BROKER_TIMEOUT);
-  }
-
-  if (cppPullResult.pull_status() != BROKER_TIMEOUT) {
-    pullResult.maxOffset = cppPullResult.max_offset();
-    pullResult.minOffset = cppPullResult.min_offset();
-    pullResult.nextBeginOffset = cppPullResult.next_begin_offset();
-  }
-
-  switch (cppPullResult.pull_status()) {
-    case FOUND: {
-      pullResult.pullStatus = E_FOUND;
-      pullResult.size = cppPullResult.msg_found_list().size();
-      PullResult* tmpPullResult = new PullResult(cppPullResult);
-      pullResult.pData = tmpPullResult;
-      // Alloc memory to save the pointer to CPP MQMessageExt, which will be release by the CPP SDK core.
-      // Thus, this memory should be released by users using @ReleasePullResult
-      pullResult.msgFoundList = (CMessageExt**)malloc(pullResult.size * sizeof(CMessageExt*));
-      for (size_t i = 0; i < cppPullResult.msg_found_list().size(); i++) {
-        auto msg = tmpPullResult->msg_found_list()[i];
-        pullResult.msgFoundList[i] = reinterpret_cast<CMessageExt*>(&msg);
-      }
-      break;
-    }
-    case NO_NEW_MSG: {
-      pullResult.pullStatus = E_NO_NEW_MSG;
-      break;
-    }
-    case NO_MATCHED_MSG: {
-      pullResult.pullStatus = E_NO_MATCHED_MSG;
-      break;
-    }
-    case OFFSET_ILLEGAL: {
-      pullResult.pullStatus = E_OFFSET_ILLEGAL;
-      break;
-    }
-    case BROKER_TIMEOUT: {
-      pullResult.pullStatus = E_BROKER_TIMEOUT;
-      break;
-    }
-    default:
-      pullResult.pullStatus = E_NO_NEW_MSG;
-      break;
-  }
-  return pullResult;
-}
-
-int ReleasePullResult(CPullResult pullResult) {
-  if (pullResult.size == 0 || pullResult.msgFoundList == NULL || pullResult.pData == NULL) {
-    return NULL_POINTER;
-  }
-  if (pullResult.pData != NULL) {
-    try {
-      delete ((PullResult*)pullResult.pData);
-    } catch (std::exception& e) {
-      CErrorContainer::setErrorMessage(e.what());
-      return NULL_POINTER;
-    }
-  }
-  free((void*)pullResult.msgFoundList);
-  pullResult.msgFoundList = NULL;
-  return OK;
-}
+// /*
+//  * Licensed to the Apache Software Foundation (ASF) under one or more
+//  * contributor license agreements.  See the NOTICE file distributed with
+//  * this work for additional information regarding copyright ownership.
+//  * The ASF licenses this file to You under the Apache License, Version 2.0
+//  * (the "License"); you may not use this file except in compliance with
+//  * the License.  You may obtain a copy of the License at
+//  *
+//  *     http://www.apache.org/licenses/LICENSE-2.0
+//  *
+//  * Unless required by applicable law or agreed to in writing, software
+//  * distributed under the License is distributed on an "AS IS" BASIS,
+//  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//  * See the License for the specific language governing permissions and
+//  * limitations under the License.
+//  */
+// #include "c/CPullConsumer.h"
+
+// #include <cstring>
+
+// #include "CErrorContainer.h"
+// #include "ClientRPCHook.h"
+// #include "DefaultMQPullConsumer.h"
+// #include "Logging.h"
+
+// using namespace rocketmq;
+
+// CPullConsumer* CreatePullConsumer(const char* groupId) {
+//   if (groupId == NULL) {
+//     return NULL;
+//   }
+//   auto* defaultMQPullConsumer = new DefaultMQPullConsumer(groupId);
+//   return reinterpret_cast<CPullConsumer*>(defaultMQPullConsumer);
+// }
+
+// int DestroyPullConsumer(CPullConsumer* consumer) {
+//   if (consumer == NULL) {
+//     return NULL_POINTER;
+//   }
+//   delete reinterpret_cast<DefaultMQPullConsumer*>(consumer);
+//   return OK;
+// }
+
+// int StartPullConsumer(CPullConsumer* consumer) {
+//   if (consumer == NULL) {
+//     return NULL_POINTER;
+//   }
+//   try {
+//     reinterpret_cast<DefaultMQPullConsumer*>(consumer)->start();
+//   } catch (std::exception& e) {
+//     CErrorContainer::setErrorMessage(e.what());
+//     return PULLCONSUMER_START_FAILED;
+//   }
+//   return OK;
+// }
+
+// int ShutdownPullConsumer(CPullConsumer* consumer) {
+//   if (consumer == NULL) {
+//     return NULL_POINTER;
+//   }
+//   reinterpret_cast<DefaultMQPullConsumer*>(consumer)->shutdown();
+//   return OK;
+// }
+
+// int SetPullConsumerGroupID(CPullConsumer* consumer, const char* groupId) {
+//   if (consumer == NULL || groupId == NULL) {
+//     return NULL_POINTER;
+//   }
+//   reinterpret_cast<DefaultMQPullConsumer*>(consumer)->setGroupName(groupId);
+//   return OK;
+// }
+
+// const char* GetPullConsumerGroupID(CPullConsumer* consumer) {
+//   if (consumer == NULL) {
+//     return NULL;
+//   }
+//   return reinterpret_cast<DefaultMQPullConsumer*>(consumer)->getGroupName().c_str();
+// }
+
+// int SetPullConsumerNameServerAddress(CPullConsumer* consumer, const char* namesrv) {
+//   if (consumer == NULL) {
+//     return NULL_POINTER;
+//   }
+//   reinterpret_cast<DefaultMQPullConsumer*>(consumer)->setNamesrvAddr(namesrv);
+//   return OK;
+// }
+
+// // Deprecated
+// int SetPullConsumerNameServerDomain(CPullConsumer* consumer, const char* domain) {
+//   if (consumer == NULL) {
+//     return NULL_POINTER;
+//   }
+//   return NOT_SUPPORT_NOW;
+// }
+
+// int SetPullConsumerSessionCredentials(CPullConsumer* consumer,
+//                                       const char* accessKey,
+//                                       const char* secretKey,
+//                                       const char* channel) {
+//   if (consumer == NULL) {
+//     return NULL_POINTER;
+//   }
+//   auto rpcHook = std::make_shared<ClientRPCHook>(SessionCredentials(accessKey, secretKey, channel));
+//   reinterpret_cast<DefaultMQPullConsumer*>(consumer)->setRPCHook(rpcHook);
+//   return OK;
+// }
+
+// int SetPullConsumerLogPath(CPullConsumer* consumer, const char* logPath) {
+//   if (consumer == NULL) {
+//     return NULL_POINTER;
+//   }
+//   // Todo, This api should be implemented by core api.
+//   //((DefaultMQPullConsumer *) consumer)->setInstanceName(instanceName);
+//   return OK;
+// }
+
+// int SetPullConsumerLogFileNumAndSize(CPullConsumer* consumer, int fileNum, long fileSize) {
+//   if (consumer == NULL) {
+//     return NULL_POINTER;
+//   }
+//   DEFAULT_LOG_ADAPTER->setLogFileNumAndSize(fileNum, fileSize);
+//   return OK;
+// }
+
+// int SetPullConsumerLogLevel(CPullConsumer* consumer, CLogLevel level) {
+//   if (consumer == NULL) {
+//     return NULL_POINTER;
+//   }
+//   DEFAULT_LOG_ADAPTER->set_log_level((LogLevel)level);
+//   return OK;
+// }
+
+// int FetchSubscriptionMessageQueues(CPullConsumer* consumer, const char* topic, CMessageQueue** mqs, int* size) {
+//   if (consumer == NULL) {
+//     return NULL_POINTER;
+//   }
+//   unsigned int index = 0;
+//   CMessageQueue* temMQ = NULL;
+//   std::vector<MQMessageQueue> fullMQ;
+//   try {
+//     reinterpret_cast<DefaultMQPullConsumer*>(consumer)->fetchSubscribeMessageQueues(topic, fullMQ);
+//     *size = fullMQ.size();
+//     // Alloc memory to save the pointer to CPP MessageQueue, and the MessageQueues may be changed.
+//     // Thus, this memory should be released by users using @ReleaseSubscribeMessageQueue every time.
+//     temMQ = (CMessageQueue*)malloc(*size * sizeof(CMessageQueue));
+//     if (temMQ == NULL) {
+//       *size = 0;
+//       *mqs = NULL;
+//       return MALLOC_FAILED;
+//     }
+//     auto iter = fullMQ.begin();
+//     for (index = 0; iter != fullMQ.end() && index <= fullMQ.size(); ++iter, index++) {
+//       strncpy(temMQ[index].topic, iter->topic().c_str(), MAX_TOPIC_LENGTH - 1);
+//       strncpy(temMQ[index].brokerName, iter->broker_name().c_str(), MAX_BROKER_NAME_ID_LENGTH - 1);
+//       temMQ[index].queueId = iter->queue_id();
+//     }
+//     *mqs = temMQ;
+//   } catch (MQException& e) {
+//     *size = 0;
+//     *mqs = NULL;
+//     CErrorContainer::setErrorMessage(e.what());
+//     return PULLCONSUMER_FETCH_MQ_FAILED;
+//   }
+//   return OK;
+// }
+
+// int ReleaseSubscriptionMessageQueue(CMessageQueue* mqs) {
+//   if (mqs == NULL) {
+//     return NULL_POINTER;
+//   }
+//   free((void*)mqs);
+//   mqs = NULL;
+//   return OK;
+// }
+
+// CPullResult Pull(CPullConsumer* consumer,
+//                  const CMessageQueue* mq,
+//                  const char* subExpression,
+//                  long long offset,
+//                  int maxNums) {
+//   CPullResult pullResult;
+//   memset(&pullResult, 0, sizeof(CPullResult));
+//   MQMessageQueue messageQueue(mq->topic, mq->brokerName, mq->queueId);
+//   PullResult cppPullResult;
+//   try {
+//     cppPullResult =
+//         reinterpret_cast<DefaultMQPullConsumer*>(consumer)->pull(messageQueue, subExpression, offset, maxNums);
+//   } catch (std::exception& e) {
+//     CErrorContainer::setErrorMessage(e.what());
+//     cppPullResult.set_pull_status(BROKER_TIMEOUT);
+//   }
+
+//   if (cppPullResult.pull_status() != BROKER_TIMEOUT) {
+//     pullResult.maxOffset = cppPullResult.max_offset();
+//     pullResult.minOffset = cppPullResult.min_offset();
+//     pullResult.nextBeginOffset = cppPullResult.next_begin_offset();
+//   }
+
+//   switch (cppPullResult.pull_status()) {
+//     case FOUND: {
+//       pullResult.pullStatus = E_FOUND;
+//       pullResult.size = cppPullResult.msg_found_list().size();
+//       PullResult* tmpPullResult = new PullResult(cppPullResult);
+//       pullResult.pData = tmpPullResult;
+//       // Alloc memory to save the pointer to CPP MQMessageExt, which will be release by the CPP SDK core.
+//       // Thus, this memory should be released by users using @ReleasePullResult
+//       pullResult.msgFoundList = (CMessageExt**)malloc(pullResult.size * sizeof(CMessageExt*));
+//       for (size_t i = 0; i < cppPullResult.msg_found_list().size(); i++) {
+//         auto msg = tmpPullResult->msg_found_list()[i];
+//         pullResult.msgFoundList[i] = reinterpret_cast<CMessageExt*>(&msg);
+//       }
+//       break;
+//     }
+//     case NO_NEW_MSG: {
+//       pullResult.pullStatus = E_NO_NEW_MSG;
+//       break;
+//     }
+//     case NO_MATCHED_MSG: {
+//       pullResult.pullStatus = E_NO_MATCHED_MSG;
+//       break;
+//     }
+//     case OFFSET_ILLEGAL: {
+//       pullResult.pullStatus = E_OFFSET_ILLEGAL;
+//       break;
+//     }
+//     case BROKER_TIMEOUT: {
+//       pullResult.pullStatus = E_BROKER_TIMEOUT;
+//       break;
+//     }
+//     default:
+//       pullResult.pullStatus = E_NO_NEW_MSG;
+//       break;
+//   }
+//   return pullResult;
+// }
+
+// int ReleasePullResult(CPullResult pullResult) {
+//   if (pullResult.size == 0 || pullResult.msgFoundList == NULL || pullResult.pData == NULL) {
+//     return NULL_POINTER;
+//   }
+//   if (pullResult.pData != NULL) {
+//     try {
+//       delete ((PullResult*)pullResult.pData);
+//     } catch (std::exception& e) {
+//       CErrorContainer::setErrorMessage(e.what());
+//       return NULL_POINTER;
+//     }
+//   }
+//   free((void*)pullResult.msgFoundList);
+//   pullResult.msgFoundList = NULL;
+//   return OK;
+// }
diff --git a/test/src/extern/CPullConsumerTest.cpp b/test/src/extern/CPullConsumerTest.cpp.bak
similarity index 99%
rename from test/src/extern/CPullConsumerTest.cpp
rename to test/src/extern/CPullConsumerTest.cpp.bak
index 51b79dc..996659d 100644
--- a/test/src/extern/CPullConsumerTest.cpp
+++ b/test/src/extern/CPullConsumerTest.cpp.bak
@@ -20,7 +20,7 @@
 #include <string>
 #include <vector>
 
-#include "DefaultMQPullConsumer.h"
+#include "DefaultLitePullConsumer.h"
 #include "MQClientInstance.h"
 #include "MessageExtImpl.h"
 #include "MQMessageQueue.h"


Mime
View raw message