rocketmq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From duhengfore...@apache.org
Subject [rocketmq] branch litePullConsumer updated: Polish lite pull consumer (#1349)
Date Tue, 30 Jul 2019 02:59:19 GMT
This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch litePullConsumer
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/litePullConsumer by this push:
     new 83098eb  Polish lite pull consumer (#1349)
83098eb is described below

commit 83098eb604007eca7892565df7c213ee19a342f5
Author: King <794220751@qq.com>
AuthorDate: Tue Jul 30 10:59:13 2019 +0800

    Polish lite pull consumer (#1349)
    
    * fix unsubscribe code
    
    * fix commit consumed offset
    
    * fix commit consumed offset
    
    * fix commit consumed offset
    
    * fix commit consumed offset
    
    * polish commit consumed offset
    
    * pass checkstyle
    
    * pass checkstyle
    
    * polish LiteMQPullConsumer
    
    * add flow control and polish commit logic
    
    * fix bug
    
    * polish code
    
    * fix commit consumed offset back
    
    * refactor litePullConsumer
    
    * development save
    
    * development save
    
    * Refactor DefaultLitePullConsumer and DefaultLitePullConsumerImpl.
    
    * Polish lite pull consumer
    
    * polish lite pull consumer
    
    * polish lite pull consumer
    
    * fix seek
---
 .../client/consumer/DefaultLiteMQPullConsumer.java |  132 ---
 .../client/consumer/DefaultLitePullConsumer.java   |  396 ++++++++
 ...teMQPullConsumer.java => LitePullConsumer.java} |   32 +-
 .../consumer/store/RemoteBrokerOffsetStore.java    |    1 -
 .../client/impl/consumer/AssignedMessageQueue.java |  109 +-
 .../impl/consumer/DefaultLitePullConsumerImpl.java | 1069 ++++++++++++++++++++
 .../impl/consumer/DefaultMQPullConsumerImpl.java   |    4 +-
 .../impl/consumer/LiteMQPullConsumerImpl.java      |  469 ---------
 .../client/impl/consumer/ProcessQueue.java         |   10 -
 .../impl/consumer/RebalanceLitePullImpl.java       |   68 ++
 .../example/simple/LitePullConsumerTest.java       |   24 +-
 11 files changed, 1661 insertions(+), 653 deletions(-)

diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLiteMQPullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLiteMQPullConsumer.java
deleted file mode 100644
index 6f67bcf..0000000
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLiteMQPullConsumer.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.rocketmq.client.consumer;
-
-import java.util.Collection;
-import java.util.List;
-import org.apache.rocketmq.client.exception.MQClientException;
-import org.apache.rocketmq.client.impl.consumer.LiteMQPullConsumerImpl;
-import org.apache.rocketmq.common.message.MessageExt;
-import org.apache.rocketmq.common.message.MessageQueue;
-import org.apache.rocketmq.remoting.RPCHook;
-
-public class DefaultLiteMQPullConsumer extends DefaultMQPullConsumer implements LiteMQPullConsumer {
-    private LiteMQPullConsumerImpl liteMQPullConsumer;
-
-    /**
-     * Maximum amount of time in minutes a message may block the consuming thread.
-     */
-    private long consumeTimeout = 15;
-
-    /**
-     * Is auto commit offset
-     */
-    private boolean autoCommit = true;
-
-    private int pullThreadNumbers = 20;
-
-    /**
-     * Maximum commit offset interval time in seconds.
-     */
-    private long autoCommitInterval = 5;
-
-    public DefaultLiteMQPullConsumer(String consumerGroup, RPCHook rpcHook) {
-        this.setConsumerGroup(consumerGroup);
-        this.liteMQPullConsumer = new LiteMQPullConsumerImpl(this, rpcHook);
-    }
-
-    public DefaultLiteMQPullConsumer(String consumerGroup) {
-        this.setConsumerGroup(consumerGroup);
-        this.liteMQPullConsumer = new LiteMQPullConsumerImpl(this, null);
-    }
-
-    @Override
-    public void start() throws MQClientException {
-        this.liteMQPullConsumer.start();
-    }
-
-    @Override
-    public void subscribe(String topic, String subExpression) throws MQClientException {
-        this.liteMQPullConsumer.subscribe(topic, subExpression);
-    }
-
-    @Override
-    public void unsubscribe(String topic) {
-        this.liteMQPullConsumer.unsubscribe(topic);
-    }
-
-    @Override
-    public List<MessageExt> poll() {
-        return poll(this.getConsumerPullTimeoutMillis());
-    }
-
-    @Override public List<MessageExt> poll(long timeout) {
-        return liteMQPullConsumer.poll(timeout);
-    }
-
-    @Override
-    public void seek(MessageQueue messageQueue, long offset) throws MQClientException {
-        this.liteMQPullConsumer.seek(messageQueue, offset);
-    }
-
-    @Override
-    public void pause(Collection<MessageQueue> messageQueues) {
-        this.liteMQPullConsumer.pause(messageQueues);
-    }
-
-    @Override
-    public void resume(Collection<MessageQueue> messageQueues) {
-        this.liteMQPullConsumer.resume(messageQueues);
-    }
-
-    @Override
-    public void commitSync() {
-        this.liteMQPullConsumer.commitSync();
-    }
-
-    public long getConsumeTimeout() {
-        return consumeTimeout;
-    }
-
-    public void setConsumeTimeout(long consumeTimeout) {
-        this.consumeTimeout = consumeTimeout;
-    }
-
-    public boolean isAutoCommit() {
-        return autoCommit;
-    }
-
-    public void setAutoCommit(boolean autoCommit) {
-        this.autoCommit = autoCommit;
-    }
-
-    public int getPullThreadNumbers() {
-        return pullThreadNumbers;
-    }
-
-    public void setPullThreadNumbers(int pullThreadNumbers) {
-        this.pullThreadNumbers = pullThreadNumbers;
-    }
-
-    public long getAutoCommitInterval() {
-        return autoCommitInterval;
-    }
-
-    public void setAutoCommitInterval(long autoCommitInterval) {
-        this.autoCommitInterval = autoCommitInterval;
-    }
-}
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
new file mode 100644
index 0000000..757c966
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
@@ -0,0 +1,396 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.consumer;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.rocketmq.client.ClientConfig;
+import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
+import org.apache.rocketmq.client.consumer.store.OffsetStore;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.impl.consumer.DefaultLitePullConsumerImpl;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
+import org.apache.rocketmq.remoting.RPCHook;
+
+
+public class DefaultLitePullConsumer extends ClientConfig implements LitePullConsumer {
+
+    private DefaultLitePullConsumerImpl defaultLitePullConsumerImpl;
+
+    /**
+     * Do the same thing for the same Group, the application must be set,and guarantee Globally unique
+     */
+    private String consumerGroup;
+
+    /**
+     * Long polling mode, the Consumer connection max suspend time, it is not recommended to modify
+     */
+    private long brokerSuspendMaxTimeMillis = 1000 * 20;
+
+
+    /**
+     * Long polling mode, the Consumer connection timeout(must greater than brokerSuspendMaxTimeMillis), it is not
+     * recommended to modify
+     */
+    private long consumerTimeoutMillisWhenSuspend = 1000 * 30;
+
+    /**
+     * The socket timeout in milliseconds
+     */
+    private long consumerPullTimeoutMillis = 1000 * 10;
+
+    /**
+     * Consumption pattern,default is clustering
+     */
+    private MessageModel messageModel = MessageModel.CLUSTERING;
+    /**
+     * Message queue listener
+     */
+    private MessageQueueListener messageQueueListener;
+    /**
+     * Offset Storage
+     */
+    private OffsetStore offsetStore;
+    /**
+     * Topic set you want to register
+     */
+    private Set<String> registerTopics = new HashSet<String>();
+    /**
+     * Queue allocation algorithm
+     */
+    private AllocateMessageQueueStrategy allocateMessageQueueStrategy = new AllocateMessageQueueAveragely();
+    /**
+     * Whether the unit of subscription group
+     */
+    private boolean unitMode = false;
+
+    private int maxReconsumeTimes = 16;
+    /**
+     * Maximum amount of time in minutes a message may block the consuming thread.
+     */
+    private long consumeTimeout = 15;
+
+    /**
+     * Is auto commit offset
+     */
+    private boolean autoCommit = true;
+
+    private int pullThreadNumbers = 20;
+
+    /**
+     * Maximum commit offset interval time in seconds.
+     */
+    private long autoCommitInterval = 5;
+
+    /**
+     * Maximum number of messages pulled each time.
+     */
+    private int pullBatchNums = 10;
+
+    /**
+     * Flow control threshold for consume request, each consumer will cache at most 10000 consume requests by default.
+     * Consider the {@code pullBatchSize}, the instantaneous value may exceed the limit
+     */
+    private long pullThresholdForAll = 10000;
+
+    /**
+     * Consume max span offset.
+     */
+    private int consumeMaxSpan = 2000;
+
+    /**
+     * Flow control threshold on queue level, each message queue will cache at most 1000 messages by default, Consider
+     * the {@code pullBatchSize}, the instantaneous value may exceed the limit
+     */
+    private int pullThresholdForQueue = 1000;
+
+    /**
+     * Limit the cached message size on queue level, each message queue will cache at most 100 MiB messages by default,
+     * Consider the {@code pullBatchSize}, the instantaneous value may exceed the limit
+     *
+     * <p>
+     * The size of a message only measured by message body, so it's not accurate
+     */
+    private int pullThresholdSizeForQueue = 100;
+
+    /**
+     * The socket timeout in milliseconds
+     */
+    private long pollTimeoutMillis = 1000 * 5;
+
+    public DefaultLitePullConsumer() {
+        this(null, MixAll.DEFAULT_CONSUMER_GROUP, null);
+    }
+
+    public DefaultLitePullConsumer(final String consumerGroup) {
+        this(null, consumerGroup, null);
+    }
+
+    public DefaultLitePullConsumer(RPCHook rpcHook) {
+        this(null, MixAll.DEFAULT_CONSUMER_GROUP, rpcHook);
+    }
+
+    public DefaultLitePullConsumer(final String consumerGroup, RPCHook rpcHook) {
+        this(null, consumerGroup, rpcHook);
+    }
+
+    /**
+     * Constructor specifying namespace, consumer group and RPC hook.
+     *
+     * @param consumerGroup Consumer group.
+     * @param rpcHook RPC hook to execute before each remoting command.
+     */
+    public DefaultLitePullConsumer(final String namespace, final String consumerGroup, RPCHook rpcHook) {
+        this.namespace = namespace;
+        this.consumerGroup = consumerGroup;
+        defaultLitePullConsumerImpl = new DefaultLitePullConsumerImpl(this,rpcHook);
+    }
+
+    @Override
+    public void start() throws MQClientException {
+        this.defaultLitePullConsumerImpl.start();
+    }
+
+    @Override
+    public void shutdown() {
+        this.defaultLitePullConsumerImpl.shutdown();
+    }
+
+    @Override
+    public void subscribe(String topic, String subExpression) throws MQClientException {
+        this.defaultLitePullConsumerImpl.subscribe(topic, subExpression);
+    }
+
+    @Override
+    public void unsubscribe(String topic) {
+        this.defaultLitePullConsumerImpl.unsubscribe(topic);
+    }
+
+    @Override
+    public void assign(Collection<MessageQueue> messageQueues) {
+        defaultLitePullConsumerImpl.assign(messageQueues);
+    }
+
+    @Override
+    public List<MessageExt> poll() {
+        return defaultLitePullConsumerImpl.poll(this.getPollTimeoutMillis());
+    }
+
+    @Override
+    public List<MessageExt> poll(long timeout) {
+        return defaultLitePullConsumerImpl.poll(timeout);
+    }
+
+    @Override
+    public void seek(MessageQueue messageQueue, long offset) throws MQClientException {
+        this.defaultLitePullConsumerImpl.seek(messageQueue, offset);
+    }
+
+    @Override
+    public void pause(Collection<MessageQueue> messageQueues) {
+        this.defaultLitePullConsumerImpl.pause(messageQueues);
+    }
+
+    @Override
+    public void resume(Collection<MessageQueue> messageQueues) {
+        this.defaultLitePullConsumerImpl.resume(messageQueues);
+    }
+
+    @Override
+    public Collection<MessageQueue> fetchMessageQueues(String topic) throws MQClientException{
+        return this.defaultLitePullConsumerImpl.fetchMessageQueues(withNamespace(topic));
+    }
+
+    @Override
+    public Long offsetForTimestamp(MessageQueue messageQueue, Long timestamp) throws MQClientException{
+        return this.defaultLitePullConsumerImpl.searchOffset(messageQueue,timestamp);
+    }
+
+    @Override
+    public void commitSync() {
+        this.defaultLitePullConsumerImpl.commitSync();
+    }
+
+    @Override
+    public boolean isAutoCommit() {
+        return autoCommit;
+    }
+
+    @Override
+    public void setAutoCommit(boolean autoCommit) {
+        this.autoCommit = autoCommit;
+    }
+
+    public long getConsumeTimeout() {
+        return consumeTimeout;
+    }
+
+    public void setConsumeTimeout(long consumeTimeout) {
+        this.consumeTimeout = consumeTimeout;
+    }
+
+    public int getPullThreadNumbers() {
+        return pullThreadNumbers;
+    }
+
+    public void setPullThreadNumbers(int pullThreadNumbers) {
+        this.pullThreadNumbers = pullThreadNumbers;
+    }
+
+    public long getAutoCommitInterval() {
+        return autoCommitInterval;
+    }
+
+    public void setAutoCommitInterval(long autoCommitInterval) {
+        this.autoCommitInterval = autoCommitInterval;
+    }
+
+    public int getPullBatchNums() {
+        return pullBatchNums;
+    }
+
+    public void setPullBatchNums(int pullBatchNums) {
+        this.pullBatchNums = pullBatchNums;
+    }
+
+    public long getPullThresholdForAll() {
+        return pullThresholdForAll;
+    }
+
+    public void setPullThresholdForAll(long pullThresholdForAll) {
+        this.pullThresholdForAll = pullThresholdForAll;
+    }
+
+    public int getConsumeMaxSpan() {
+        return consumeMaxSpan;
+    }
+
+    public void setConsumeMaxSpan(int consumeMaxSpan) {
+        this.consumeMaxSpan = consumeMaxSpan;
+    }
+
+    public int getPullThresholdForQueue() {
+        return pullThresholdForQueue;
+    }
+
+    public void setPullThresholdForQueue(int pullThresholdForQueue) {
+        this.pullThresholdForQueue = pullThresholdForQueue;
+    }
+
+    public int getPullThresholdSizeForQueue() {
+        return pullThresholdSizeForQueue;
+    }
+
+    public void setPullThresholdSizeForQueue(int pullThresholdSizeForQueue) {
+        this.pullThresholdSizeForQueue = pullThresholdSizeForQueue;
+    }
+
+    public AllocateMessageQueueStrategy getAllocateMessageQueueStrategy() {
+        return allocateMessageQueueStrategy;
+    }
+
+    public void setAllocateMessageQueueStrategy(AllocateMessageQueueStrategy allocateMessageQueueStrategy) {
+        this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
+    }
+
+    public long getBrokerSuspendMaxTimeMillis() {
+        return brokerSuspendMaxTimeMillis;
+    }
+
+    public long getPollTimeoutMillis() {
+        return pollTimeoutMillis;
+    }
+
+    public void setPollTimeoutMillis(long pollTimeoutMillis) {
+        this.pollTimeoutMillis = pollTimeoutMillis;
+    }
+
+    public OffsetStore getOffsetStore() {
+        return offsetStore;
+    }
+
+    public void setOffsetStore(OffsetStore offsetStore) {
+        this.offsetStore = offsetStore;
+    }
+
+    public boolean isUnitMode() {
+        return unitMode;
+    }
+
+    public void setUnitMode(boolean isUnitMode) {
+        this.unitMode = isUnitMode;
+    }
+
+    public int getMaxReconsumeTimes() {
+        return maxReconsumeTimes;
+    }
+
+    public void setMaxReconsumeTimes(final int maxReconsumeTimes) {
+        this.maxReconsumeTimes = maxReconsumeTimes;
+    }
+
+    public MessageModel getMessageModel() {
+        return messageModel;
+    }
+
+    public void setMessageModel(MessageModel messageModel) {
+        this.messageModel = messageModel;
+    }
+
+    public String getConsumerGroup() {
+        return consumerGroup;
+    }
+
+    public MessageQueueListener getMessageQueueListener() {
+        return messageQueueListener;
+    }
+
+    public void setMessageQueueListener(MessageQueueListener messageQueueListener) {
+        this.messageQueueListener = messageQueueListener;
+    }
+
+    public Set<String> getRegisterTopics() {
+        return registerTopics;
+    }
+
+    public void setRegisterTopics(Set<String> registerTopics) {
+        this.registerTopics = withNamespace(registerTopics);
+    }
+
+    public long getConsumerPullTimeoutMillis() {
+        return consumerPullTimeoutMillis;
+    }
+
+    public void setConsumerPullTimeoutMillis(long consumerPullTimeoutMillis) {
+        this.consumerPullTimeoutMillis = consumerPullTimeoutMillis;
+    }
+
+    public long getConsumerTimeoutMillisWhenSuspend() {
+        return consumerTimeoutMillisWhenSuspend;
+    }
+
+    public void setConsumerTimeoutMillisWhenSuspend(long consumerTimeoutMillisWhenSuspend) {
+        this.consumerTimeoutMillisWhenSuspend = consumerTimeoutMillisWhenSuspend;
+    }
+
+}
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/LiteMQPullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/LitePullConsumer.java
similarity index 70%
rename from client/src/main/java/org/apache/rocketmq/client/consumer/LiteMQPullConsumer.java
rename to client/src/main/java/org/apache/rocketmq/client/consumer/LitePullConsumer.java
index da8d1cf..ece08af 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/LiteMQPullConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/LitePullConsumer.java
@@ -18,16 +18,28 @@ package org.apache.rocketmq.client.consumer;
 
 import java.util.Collection;
 import java.util.List;
+
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageQueue;
 
-public interface LiteMQPullConsumer {
+public interface LitePullConsumer {
+
+    /**
+     * Start the consumer
+     */
+    void start() throws MQClientException;
+
+    /**
+     * Shutdown the consumer
+     */
+    void shutdown();
+
     /**
      * Subscribe some topic
      *
      * @param subExpression subscription expression.it only support or operation such as "tag1 || tag2 || tag3" <br> if
-     * null or * expression,meaning subscribe all
+     *                      null or * expression,meaning subscribe all
      */
     void subscribe(final String topic, final String subExpression) throws MQClientException;
 
@@ -38,15 +50,27 @@ public interface LiteMQPullConsumer {
      */
     void unsubscribe(final String topic);
 
+    void assign(Collection<MessageQueue> messageQueues);
+
     List<MessageExt> poll();
 
     List<MessageExt> poll(long timeout);
 
     void seek(MessageQueue messageQueue, long offset) throws MQClientException;
 
-    void pause(Collection<MessageQueue> messageQueueCollection);
+    void pause(Collection<MessageQueue> messageQueues);
+
+    boolean isAutoCommit();
+
+    void setAutoCommit(boolean autoCommit);
+
+    void resume(Collection<MessageQueue> messageQueues);
 
-    void resume(Collection<MessageQueue> partitions);
+    Collection<MessageQueue> fetchMessageQueues(String topic) throws MQClientException;
+
+    Long offsetForTimestamp(MessageQueue messageQueue, Long timestamp) throws MQClientException;
 
     void commitSync();
+
+
 }
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java b/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java
index c1524e1..63dc525 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java
@@ -204,7 +204,6 @@ public class RemoteBrokerOffsetStore implements OffsetStore {
         MQBrokerException, InterruptedException, MQClientException {
         FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
         if (null == findBrokerResult) {
-
             this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
             findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
         }
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java
index fb0ca79..a3c5da1 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java
@@ -19,25 +19,33 @@ package org.apache.rocketmq.client.impl.consumer;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
-import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.common.message.MessageQueue;
 
 public class AssignedMessageQueue {
 
     private ConcurrentHashMap<MessageQueue, MessageQueueStat> assignedMessageQueueState;
 
+    private RebalanceImpl rebalanceImpl;
+
     public AssignedMessageQueue() {
         assignedMessageQueueState = new ConcurrentHashMap<MessageQueue, MessageQueueStat>();
     }
 
+    public void setRebalanceImpl(RebalanceImpl rebalanceImpl) {
+        this.rebalanceImpl = rebalanceImpl;
+    }
+
+    public Collection<MessageQueue> messageQueues(){
+        return assignedMessageQueueState.keySet();
+    }
+
     public boolean isPaused(MessageQueue messageQueue) {
         MessageQueueStat messageQueueStat = assignedMessageQueueState.get(messageQueue);
         if (messageQueueStat != null) {
             return messageQueueStat.isPaused();
         }
-        return false;
+        return true;
     }
 
     public void pause(Collection<MessageQueue> messageQueues) {
@@ -58,24 +66,60 @@ public class AssignedMessageQueue {
         }
     }
 
-    public long getNextOffset(MessageQueue messageQueue) throws MQClientException {
+    public ProcessQueue getProcessQueue(MessageQueue messageQueue) {
         MessageQueueStat messageQueueStat = assignedMessageQueueState.get(messageQueue);
-        if (assignedMessageQueueState.get(messageQueue) != null) {
+        if (messageQueueStat != null) {
+            return messageQueueStat.getProcessQueue();
+        }
+        return null;
+    }
+
+    public long getNextOffset(MessageQueue messageQueue) {
+        MessageQueueStat messageQueueStat = assignedMessageQueueState.get(messageQueue);
+        if (messageQueueStat != null) {
             return messageQueueStat.getNextOffset();
         }
         return -1;
     }
 
-    public void updateNextOffset(MessageQueue messageQueue, long offset) throws MQClientException {
+    public void updateNextOffset(MessageQueue messageQueue, long offset) {
+        MessageQueueStat messageQueueStat = assignedMessageQueueState.get(messageQueue);
+        if (messageQueueStat != null) {
+            messageQueueStat.setNextOffset(offset);
+        }
+    }
+
+    public long getConusmerOffset(MessageQueue messageQueue) {
+        MessageQueueStat messageQueueStat = assignedMessageQueueState.get(messageQueue);
+        if (messageQueueStat != null) {
+            return messageQueueStat.getConsumeOffset();
+        }
+        return -1;
+    }
+
+    public void updateConsumeOffset(MessageQueue messageQueue, long offset) {
+        MessageQueueStat messageQueueStat = assignedMessageQueueState.get(messageQueue);
+        if (messageQueueStat != null) {
+            messageQueueStat.setConsumeOffset(offset);
+        }
+    }
+
+    public void setSeekOffset(MessageQueue messageQueue, long offset) {
         MessageQueueStat messageQueueStat = assignedMessageQueueState.get(messageQueue);
-        if (messageQueue == null) {
-            messageQueueStat = new MessageQueueStat(messageQueue, offset);
-            assignedMessageQueueState.putIfAbsent(messageQueue, messageQueueStat);
+        if (messageQueueStat != null) {
+            messageQueueStat.setSeekOffset(offset);
         }
-        assignedMessageQueueState.get(messageQueue).setNextOffset(offset);
     }
 
-    public void updateAssignedMessageQueue(Set<MessageQueue> assigned) {
+    public long getSeekOffset(MessageQueue messageQueue) {
+        MessageQueueStat messageQueueStat = assignedMessageQueueState.get(messageQueue);
+        if (messageQueueStat != null) {
+            return messageQueueStat.getSeekOffset();
+        }
+        return -1;
+    }
+
+    public void updateAssignedMessageQueue(Collection<MessageQueue> assigned) {
         synchronized (this.assignedMessageQueueState) {
             Iterator<Map.Entry<MessageQueue, MessageQueueStat>> it = this.assignedMessageQueueState.entrySet().iterator();
             while (it.hasNext()) {
@@ -87,7 +131,13 @@ public class AssignedMessageQueue {
 
             for (MessageQueue messageQueue : assigned) {
                 if (!this.assignedMessageQueueState.containsKey(messageQueue)) {
-                    MessageQueueStat messageQueueStat = new MessageQueueStat(messageQueue);
+                    MessageQueueStat messageQueueStat;
+                    if (rebalanceImpl != null && rebalanceImpl.processQueueTable.get(messageQueue) != null) {
+                        messageQueueStat = new MessageQueueStat(messageQueue, rebalanceImpl.processQueueTable.get(messageQueue));
+                    } else {
+                        ProcessQueue processQueue = new ProcessQueue();
+                        messageQueueStat = new MessageQueueStat(messageQueue, processQueue);
+                    }
                     this.assignedMessageQueueState.put(messageQueue, messageQueueStat);
                 }
             }
@@ -108,16 +158,15 @@ public class AssignedMessageQueue {
 
     public class MessageQueueStat {
         private MessageQueue messageQueue;
+        private ProcessQueue processQueue;
         private boolean paused = false;
         private long nextOffset = -1;
+        private long consumeOffset = -1;
+        private volatile long seekOffset = -1;
 
-        public MessageQueueStat(MessageQueue messageQueue) {
-            this.messageQueue = messageQueue;
-        }
-
-        public MessageQueueStat(MessageQueue messageQueue, long nextOffset) {
+        public MessageQueueStat(MessageQueue messageQueue, ProcessQueue processQueue) {
             this.messageQueue = messageQueue;
-            this.nextOffset = nextOffset;
+            this.processQueue = processQueue;
         }
 
         public MessageQueue getMessageQueue() {
@@ -143,5 +192,29 @@ public class AssignedMessageQueue {
         public void setNextOffset(long nextOffset) {
             this.nextOffset = nextOffset;
         }
+
+        public ProcessQueue getProcessQueue() {
+            return processQueue;
+        }
+
+        public void setProcessQueue(ProcessQueue processQueue) {
+            this.processQueue = processQueue;
+        }
+
+        public long getConsumeOffset() {
+            return consumeOffset;
+        }
+
+        public void setConsumeOffset(long consumeOffset) {
+            this.consumeOffset = consumeOffset;
+        }
+
+        public long getSeekOffset() {
+            return seekOffset;
+        }
+
+        public void setSeekOffset(long seekOffset) {
+            this.seekOffset = seekOffset;
+        }
     }
 }
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
new file mode 100644
index 0000000..95e218f
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
@@ -0,0 +1,1069 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.impl.consumer;
+
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.TreeMap;
+import java.util.HashSet;
+import java.util.Properties;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReadWriteLock;
+
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.rocketmq.client.Validators;
+import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
+import org.apache.rocketmq.client.consumer.MessageSelector;
+import org.apache.rocketmq.client.consumer.MessageQueueListener;
+import org.apache.rocketmq.client.consumer.PullResult;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import org.apache.rocketmq.client.consumer.store.LocalFileOffsetStore;
+import org.apache.rocketmq.client.consumer.store.OffsetStore;
+import org.apache.rocketmq.client.consumer.store.ReadOffsetType;
+import org.apache.rocketmq.client.consumer.store.RemoteBrokerOffsetStore;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.hook.ConsumeMessageContext;
+import org.apache.rocketmq.client.hook.ConsumeMessageHook;
+import org.apache.rocketmq.client.hook.FilterMessageHook;
+import org.apache.rocketmq.client.impl.CommunicationMode;
+import org.apache.rocketmq.client.impl.MQClientManager;
+import org.apache.rocketmq.client.impl.factory.MQClientInstance;
+import org.apache.rocketmq.client.log.ClientLogger;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.ServiceState;
+import org.apache.rocketmq.common.ThreadFactoryImpl;
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
+import org.apache.rocketmq.common.filter.ExpressionType;
+import org.apache.rocketmq.common.filter.FilterAPI;
+import org.apache.rocketmq.common.help.FAQUrl;
+
+import org.apache.rocketmq.common.message.*;
+import org.apache.rocketmq.common.protocol.NamespaceUtil;
+import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo;
+import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
+import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
+import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
+import org.apache.rocketmq.common.sysflag.PullSysFlag;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+
+public class DefaultLitePullConsumerImpl implements MQConsumerInner {
+
+    private final InternalLogger log = ClientLogger.getLog();
+
+    private final long consumerStartTimestamp = System.currentTimeMillis();
+
+    private final RPCHook rpcHook;
+
+    private final ArrayList<ConsumeMessageHook> consumeMessageHookList = new ArrayList<ConsumeMessageHook>();
+
+    private final ArrayList<FilterMessageHook> filterMessageHookList = new ArrayList<FilterMessageHook>();
+
+    private volatile ServiceState serviceState = ServiceState.CREATE_JUST;
+
+    protected MQClientInstance mQClientFactory;
+
+    private PullAPIWrapper pullAPIWrapper;
+
+    private OffsetStore offsetStore;
+
+    private RebalanceImpl rebalanceImpl = new RebalanceLitePullImpl(this);
+
+    private enum SubscriptionType {
+        NONE, SUBSCRIBE, ASSIGN
+    }
+
+    private static final String NOT_RUNNING_EXCEPTION_MESSAGE = "The consumer not running.";
+
+    private static final String SUBSCRIPTION_CONFILCT_EXCEPTION_MESSAGE = "Cannot select two subscription types at the same time.";
+    /**
+     * the type of subscription
+     */
+    private SubscriptionType subscriptionType = SubscriptionType.NONE;
+    /**
+     * Delay some time when exception occur
+     */
+    private static final long PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION = 1000;
+    /**
+     * Flow control interval
+     */
+    private static final long PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL = 50;
+    /**
+     * Delay some time when suspend pull service
+     */
+    private static final long PULL_TIME_DELAY_MILLS_WHEN_PAUSE = 1000;
+
+    private DefaultLitePullConsumer defaultLitePullConsumer;
+
+    private final ConcurrentMap<MessageQueue, PullTaskImpl> taskTable =
+        new ConcurrentHashMap<MessageQueue, PullTaskImpl>();
+
+    private AssignedMessageQueue assignedMessageQueue = new AssignedMessageQueue();
+
+    private final BlockingQueue<ConsumeRequest> consumeRequestCache = new LinkedBlockingQueue<ConsumeRequest>();
+
+    private ScheduledThreadPoolExecutor scheduledThreadPoolExecutor;
+
+    private long consumeRequestFlowControlTimes = 0L;
+
+    private long queueFlowControlTimes = 0L;
+
+    private long queueMaxSpanFlowControlTimes = 0L;
+
+    private long nextAutoCommitDeadline = -1L;
+
+    public DefaultLitePullConsumerImpl(final DefaultLitePullConsumer defaultLitePullConsumer, final RPCHook rpcHook) {
+
+        this.defaultLitePullConsumer = defaultLitePullConsumer;
+        this.rpcHook = rpcHook;
+
+    }
+
+    private void checkServiceState() {
+        if (!(this.serviceState == ServiceState.RUNNING))
+            throw new IllegalStateException(NOT_RUNNING_EXCEPTION_MESSAGE);
+    }
+
+    private synchronized void setSubscriptionType(SubscriptionType type) {
+        if (this.subscriptionType == SubscriptionType.NONE)
+            this.subscriptionType = type;
+        else if (this.subscriptionType != type)
+            throw new IllegalStateException(SUBSCRIPTION_CONFILCT_EXCEPTION_MESSAGE);
+    }
+
+    private void updateAssignedMessageQueue(String topic, Set<MessageQueue> assignedMessageQueue) {
+        this.assignedMessageQueue.updateAssignedMessageQueue(assignedMessageQueue);
+        updatePullTask(topic, assignedMessageQueue);
+    }
+
+    private void updatePullTask(String topic, Set<MessageQueue> mqNewSet) {
+        Iterator<Map.Entry<MessageQueue, PullTaskImpl>> it = this.taskTable.entrySet().iterator();
+        while (it.hasNext()) {
+            Map.Entry<MessageQueue, PullTaskImpl> next = it.next();
+            if (next.getKey().getTopic().equals(topic)) {
+                if (!mqNewSet.contains(next.getKey())) {
+                    next.getValue().setCancelled(true);
+                    it.remove();
+                }
+            }
+        }
+        startPullTask(mqNewSet);
+    }
+
+    class MessageQueueListenerImpl implements MessageQueueListener {
+        @Override
+        public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) {
+            MessageModel messageModel = defaultLitePullConsumer.getMessageModel();
+            switch (messageModel) {
+                case BROADCASTING:
+                    updateAssignedMessageQueue(topic, mqAll);
+                    break;
+                case CLUSTERING:
+                    updateAssignedMessageQueue(topic, mqDivided);
+                    break;
+                default:
+                    break;
+            }
+        }
+    }
+
+    private int nextPullBatchNums() {
+        return Math.min(this.defaultLitePullConsumer.getPullBatchNums(), consumeRequestCache.remainingCapacity());
+    }
+
+    public synchronized void shutdown() {
+        switch (this.serviceState) {
+            case CREATE_JUST:
+                break;
+            case RUNNING:
+                this.persistConsumerOffset();
+                this.mQClientFactory.unregisterConsumer(this.defaultLitePullConsumer.getConsumerGroup());
+                this.mQClientFactory.shutdown();
+                log.info("the consumer [{}] shutdown OK", this.defaultLitePullConsumer.getConsumerGroup());
+                scheduledThreadPoolExecutor.shutdown();
+                this.serviceState = ServiceState.SHUTDOWN_ALREADY;
+                break;
+            case SHUTDOWN_ALREADY:
+                break;
+            default:
+                break;
+        }
+    }
+
+    public synchronized void start() throws MQClientException {
+        switch (this.serviceState) {
+            case CREATE_JUST:
+                this.serviceState = ServiceState.START_FAILED;
+
+                this.checkConfig();
+
+                this.copySubscription();
+
+                if (this.defaultLitePullConsumer.getMessageModel() == MessageModel.CLUSTERING) {
+                    this.defaultLitePullConsumer.changeInstanceNameToPID();
+                }
+
+                this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultLitePullConsumer, this.rpcHook);
+
+                this.rebalanceImpl.setConsumerGroup(this.defaultLitePullConsumer.getConsumerGroup());
+                this.rebalanceImpl.setMessageModel(this.defaultLitePullConsumer.getMessageModel());
+                this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultLitePullConsumer.getAllocateMessageQueueStrategy());
+                this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
+
+                this.pullAPIWrapper = new PullAPIWrapper(
+                    mQClientFactory,
+                    this.defaultLitePullConsumer.getConsumerGroup(), isUnitMode());
+                this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
+
+                if (this.defaultLitePullConsumer.getOffsetStore() != null) {
+                    this.offsetStore = this.defaultLitePullConsumer.getOffsetStore();
+                } else {
+                    switch (this.defaultLitePullConsumer.getMessageModel()) {
+                        case BROADCASTING:
+                            this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultLitePullConsumer.getConsumerGroup());
+                            break;
+                        case CLUSTERING:
+                            this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultLitePullConsumer.getConsumerGroup());
+                            break;
+                        default:
+                            break;
+                    }
+                    this.defaultLitePullConsumer.setOffsetStore(this.offsetStore);
+                }
+
+                this.offsetStore.load();
+
+                boolean registerOK = mQClientFactory.registerConsumer(this.defaultLitePullConsumer.getConsumerGroup(), this);
+                if (!registerOK) {
+                    this.serviceState = ServiceState.CREATE_JUST;
+
+                    throw new MQClientException("The consumer group[" + this.defaultLitePullConsumer.getConsumerGroup()
+                        + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
+                        null);
+                }
+
+                mQClientFactory.start();
+
+                final String group = this.defaultLitePullConsumer.getConsumerGroup();
+
+                this.scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(
+                    this.defaultLitePullConsumer.getPullThreadNumbers(),
+                    new ThreadFactoryImpl("PullMsgThread-" + group)
+                );
+                if (subscriptionType == SubscriptionType.SUBSCRIBE) {
+                    updateTopicSubscribeInfoWhenSubscriptionChanged();
+                }
+                if (subscriptionType == SubscriptionType.ASSIGN) {
+                    updateAssignPullTask(assignedMessageQueue.messageQueues());
+                }
+
+                log.info("the consumer [{}] start OK", this.defaultLitePullConsumer.getConsumerGroup());
+                this.serviceState = ServiceState.RUNNING;
+                break;
+            case RUNNING:
+            case START_FAILED:
+            case SHUTDOWN_ALREADY:
+                throw new MQClientException("The PullConsumer service state not OK, maybe started once, "
+                    + this.serviceState
+                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
+                    null);
+            default:
+                break;
+        }
+    }
+
+    private void checkConfig() throws MQClientException {
+        // check consumerGroup
+        Validators.checkGroup(this.defaultLitePullConsumer.getConsumerGroup());
+
+        // consumerGroup
+        if (null == this.defaultLitePullConsumer.getConsumerGroup()) {
+            throw new MQClientException(
+                "consumerGroup is null"
+                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
+                null);
+        }
+
+        // consumerGroup
+        if (this.defaultLitePullConsumer.getConsumerGroup().equals(MixAll.DEFAULT_CONSUMER_GROUP)) {
+            throw new MQClientException(
+                "consumerGroup can not equal "
+                    + MixAll.DEFAULT_CONSUMER_GROUP
+                    + ", please specify another one."
+                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
+                null);
+        }
+
+        // messageModel
+        if (null == this.defaultLitePullConsumer.getMessageModel()) {
+            throw new MQClientException(
+                "messageModel is null"
+                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
+                null);
+        }
+
+        // allocateMessageQueueStrategy
+        if (null == this.defaultLitePullConsumer.getAllocateMessageQueueStrategy()) {
+            throw new MQClientException(
+                "allocateMessageQueueStrategy is null"
+                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
+                null);
+        }
+
+        // allocateMessageQueueStrategy
+        if (this.defaultLitePullConsumer.getConsumerTimeoutMillisWhenSuspend() < this.defaultLitePullConsumer.getBrokerSuspendMaxTimeMillis()) {
+            throw new MQClientException(
+                "Long polling mode, the consumer consumerTimeoutMillisWhenSuspend must greater than brokerSuspendMaxTimeMillis"
+                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
+                null);
+        }
+    }
+
+    private void copySubscription() throws MQClientException {
+        try {
+            Set<String> registerTopics = this.defaultLitePullConsumer.getRegisterTopics();
+            if (registerTopics != null) {
+                for (final String topic : registerTopics) {
+                    SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultLitePullConsumer.getConsumerGroup(),
+                        topic, SubscriptionData.SUB_ALL);
+                    this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
+                }
+            }
+        } catch (Exception e) {
+            throw new MQClientException("subscription exception", e);
+        }
+    }
+
+    private void startPullTask(Collection<MessageQueue> mqSet) {
+        for (MessageQueue messageQueue : mqSet) {
+            if (!this.taskTable.containsKey(messageQueue)) {
+                PullTaskImpl pullTask = new PullTaskImpl(messageQueue);
+                this.taskTable.put(messageQueue, pullTask);
+                this.scheduledThreadPoolExecutor.schedule(pullTask, 0, TimeUnit.MILLISECONDS);
+            }
+        }
+    }
+
+    private void updateAssignPullTask(Collection<MessageQueue> mqNewSet) {
+        Iterator<Map.Entry<MessageQueue, PullTaskImpl>> it = this.taskTable.entrySet().iterator();
+        while (it.hasNext()) {
+            Map.Entry<MessageQueue, PullTaskImpl> next = it.next();
+            if (!mqNewSet.contains(next.getKey())) {
+                next.getValue().setCancelled(true);
+                it.remove();
+            }
+        }
+
+        startPullTask(mqNewSet);
+    }
+
+    private void updateTopicSubscribeInfoWhenSubscriptionChanged() {
+        Map<String, SubscriptionData> subTable = rebalanceImpl.getSubscriptionInner();
+        if (subTable != null) {
+            for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
+                final String topic = entry.getKey();
+                this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
+            }
+        }
+    }
+
+    public synchronized void subscribe(String topic, String subExpression) throws MQClientException {
+        try {
+            setSubscriptionType(SubscriptionType.SUBSCRIBE);
+            SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(defaultLitePullConsumer.getConsumerGroup(),
+                topic, subExpression);
+            this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
+            this.defaultLitePullConsumer.setMessageQueueListener(new MessageQueueListenerImpl());
+            assignedMessageQueue.setRebalanceImpl(this.rebalanceImpl);
+            if (serviceState == ServiceState.RUNNING) {
+                this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
+                updateTopicSubscribeInfoWhenSubscriptionChanged();
+            }
+        } catch (Exception e) {
+            throw new MQClientException("subscription exception", e);
+        }
+    }
+
+    public synchronized void unsubscribe(final String topic) {
+        this.rebalanceImpl.getSubscriptionInner().remove(topic);
+        //can be delete
+        removePullTaskCallback(topic);
+        assignedMessageQueue.removeAssignedMessageQueue(topic);
+    }
+
+    public synchronized void assign(Collection<MessageQueue> messageQueues) {
+        setSubscriptionType(SubscriptionType.ASSIGN);
+        assignedMessageQueue.updateAssignedMessageQueue(messageQueues);
+        if (serviceState == ServiceState.RUNNING) {
+            updateAssignPullTask(messageQueues);
+        }
+    }
+
+    private void maybeAutoCommit() {
+        long now = System.currentTimeMillis();
+        if (now >= nextAutoCommitDeadline) {
+            commitAll();
+            nextAutoCommitDeadline = now + defaultLitePullConsumer.getAutoCommitInterval() * 1000;
+        }
+    }
+
+    public List<MessageExt> poll(long timeout) {
+        try {
+            checkServiceState();
+            if (defaultLitePullConsumer.isAutoCommit()) {
+                maybeAutoCommit();
+            }
+            long endTime = System.currentTimeMillis() + timeout;
+            ConsumeRequest consumeRequest = consumeRequestCache.poll(endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
+            while (consumeRequest != null && consumeRequest.getProcessQueue().isDropped()) {
+                consumeRequest = consumeRequestCache.poll(endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
+                if ((endTime - System.currentTimeMillis()) <= 0)
+                    break;
+            }
+            if (consumeRequest != null && !consumeRequest.getProcessQueue().isDropped()) {
+                List<MessageExt> messages = consumeRequest.getMessageExts();
+                long offset = consumeRequest.getProcessQueue().removeMessage(messages);
+                assignedMessageQueue.updateConsumeOffset(consumeRequest.getMessageQueue(), offset);
+                return messages;
+            }
+        } catch (InterruptedException ignore) {
+
+        }
+        return null;
+    }
+
+    public void pause(Collection<MessageQueue> messageQueues) {
+        assignedMessageQueue.pause(messageQueues);
+    }
+
+    public void resume(Collection<MessageQueue> messageQueues) {
+        assignedMessageQueue.resume(messageQueues);
+    }
+
+    public synchronized void seek(MessageQueue messageQueue, long offset) throws MQClientException {
+        if (offset < minOffset(messageQueue) || offset > maxOffset(messageQueue))
+            throw new MQClientException("Seek offset illegal", null);
+        try {
+            assignedMessageQueue.setSeekOffset(messageQueue, offset);
+            updateConsumeOffset(messageQueue, offset);
+            updateConsumeOffsetToBroker(messageQueue, offset, false);
+        } catch (Exception e) {
+            log.error("Seek offset failed.", e);
+        }
+    }
+
+    public long maxOffset(MessageQueue mq) throws MQClientException {
+        checkServiceState();
+        return this.mQClientFactory.getMQAdminImpl().maxOffset(mq);
+    }
+
+    public long minOffset(MessageQueue mq) throws MQClientException {
+        checkServiceState();
+        return this.mQClientFactory.getMQAdminImpl().minOffset(mq);
+    }
+
+    public void removePullTaskCallback(final String topic) {
+        removePullTask(topic);
+    }
+
+    public void removePullTask(final String topic) {
+        Iterator<Map.Entry<MessageQueue, PullTaskImpl>> it = this.taskTable.entrySet().iterator();
+        while (it.hasNext()) {
+            Map.Entry<MessageQueue, PullTaskImpl> next = it.next();
+            if (next.getKey().getTopic().equals(topic)) {
+                next.getValue().setCancelled(true);
+                it.remove();
+            }
+        }
+    }
+
+    public synchronized void commitSync() {
+        try {
+            for (MessageQueue messageQueue : assignedMessageQueue.messageQueues()) {
+                long consumerOffset = assignedMessageQueue.getConusmerOffset(messageQueue);
+                if (consumerOffset != -1) {
+                    ProcessQueue processQueue = assignedMessageQueue.getProcessQueue(messageQueue);
+                    long preConsumerOffset = this.getOffsetStore().readOffset(messageQueue, ReadOffsetType.READ_FROM_MEMORY);
+                    if (processQueue != null && !processQueue.isDropped() && consumerOffset != preConsumerOffset) {
+                        updateConsumeOffset(messageQueue, consumerOffset);
+                        updateConsumeOffsetToBroker(messageQueue, consumerOffset, false);
+                    }
+                }
+            }
+        } catch (Exception e) {
+            log.error("An error occurred when update consume offset synchronously.", e);
+        }
+    }
+
+    public synchronized void commitAll() {
+        try {
+            for (MessageQueue messageQueue : assignedMessageQueue.messageQueues()) {
+                long consumerOffset = assignedMessageQueue.getConusmerOffset(messageQueue);
+                if (consumerOffset != -1) {
+                    ProcessQueue processQueue = assignedMessageQueue.getProcessQueue(messageQueue);
+                    long preConsumerOffset = this.getOffsetStore().readOffset(messageQueue, ReadOffsetType.READ_FROM_MEMORY);
+                    if (processQueue != null && !processQueue.isDropped() && consumerOffset != preConsumerOffset) {
+                        updateConsumeOffset(messageQueue, consumerOffset);
+                        updateConsumeOffsetToBroker(messageQueue, consumerOffset, true);
+                    }
+                }
+            }
+        } catch (Exception e) {
+            log.error("An error occurred when update consume offset Automatically.");
+        }
+    }
+
+    private void updatePullOffset(MessageQueue remoteQueue, long nextPullOffset) {
+        if (assignedMessageQueue.getSeekOffset(remoteQueue) == -1) {
+            assignedMessageQueue.updateNextOffset(remoteQueue, nextPullOffset);
+        }
+    }
+
+    private void submitConsumeRequest(ConsumeRequest consumeRequest) {
+        try {
+            consumeRequestCache.put(consumeRequest);
+        } catch (InterruptedException ex) {
+            log.error("Submit consumeRequest error", ex);
+        }
+    }
+
+    private long fetchConsumeOffset(MessageQueue mq, boolean fromStore) {
+        checkServiceState();
+        return this.offsetStore.readOffset(mq, fromStore ? ReadOffsetType.READ_FROM_STORE : ReadOffsetType.MEMORY_FIRST_THEN_STORE);
+    }
+
+    private long nextPullOffset(MessageQueue remoteQueue) {
+        long offset = -1;
+        long seekOffset = assignedMessageQueue.getSeekOffset(remoteQueue);
+        if (seekOffset != -1) {
+            offset = seekOffset;
+            assignedMessageQueue.setSeekOffset(remoteQueue, -1);
+            assignedMessageQueue.updateNextOffset(remoteQueue,offset);
+        } else {
+            offset = assignedMessageQueue.getNextOffset(remoteQueue);
+            if (offset == -1) {
+                offset = fetchConsumeOffset(remoteQueue, false);
+                assignedMessageQueue.updateNextOffset(remoteQueue, offset);
+                assignedMessageQueue.updateConsumeOffset(remoteQueue, offset);
+            }
+        }
+
+        return offset;
+    }
+
+    public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException {
+        checkServiceState();
+        return this.mQClientFactory.getMQAdminImpl().searchOffset(mq, timestamp);
+    }
+
+    public class PullTaskImpl implements Runnable {
+        private final MessageQueue messageQueue;
+        private volatile boolean cancelled = false;
+
+        public PullTaskImpl(final MessageQueue messageQueue) {
+            this.messageQueue = messageQueue;
+        }
+
+        @Override
+        public void run() {
+            ProcessQueue processQueue = assignedMessageQueue.getProcessQueue(messageQueue);
+
+            if (processQueue == null && processQueue.isDropped()) {
+                log.info("the message queue not be able to poll, because it's dropped. group={}, messageQueue={}", defaultLitePullConsumer.getConsumerGroup(), this.messageQueue);
+                return;
+            }
+
+            if (consumeRequestCache.size() * defaultLitePullConsumer.getPullBatchNums() > defaultLitePullConsumer.getPullThresholdForAll()) {
+                scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL, TimeUnit.MILLISECONDS);
+                if ((consumeRequestFlowControlTimes++ % 1000) == 0)
+                    log.warn("the consume request count exceeds threshold {}, so do flow control, consume request count={}, flowControlTimes={}", consumeRequestCache.size(), consumeRequestFlowControlTimes);
+                return;
+            }
+
+            long cachedMessageCount = processQueue.getMsgCount().get();
+            long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);
+
+            if (cachedMessageCount > defaultLitePullConsumer.getPullThresholdForQueue()) {
+                scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL, TimeUnit.MILLISECONDS);
+                if ((queueFlowControlTimes++ % 1000) == 0) {
+                    log.warn(
+                        "the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, flowControlTimes={}",
+                        defaultLitePullConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, queueFlowControlTimes);
+                }
+                return;
+            }
+
+            if (cachedMessageSizeInMiB > defaultLitePullConsumer.getPullThresholdSizeForQueue()) {
+                scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL, TimeUnit.MILLISECONDS);
+                if ((queueFlowControlTimes++ % 1000) == 0) {
+                    log.warn(
+                        "the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, flowControlTimes={}",
+                        defaultLitePullConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, queueFlowControlTimes);
+                }
+                return;
+            }
+
+            if (processQueue.getMaxSpan() > defaultLitePullConsumer.getConsumeMaxSpan()) {
+                scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL, TimeUnit.MILLISECONDS);
+                if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) {
+                    log.warn(
+                        "the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, flowControlTimes={}",
+                        processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(), queueMaxSpanFlowControlTimes);
+                }
+                return;
+            }
+
+            if (!this.isCancelled()) {
+                if (assignedMessageQueue.isPaused(messageQueue)) {
+                    scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_PAUSE, TimeUnit.MILLISECONDS);
+                    log.debug("Message Queue: {} has been paused!", messageQueue);
+                    return;
+                }
+                String subExpression = null;
+                if (subscriptionType == SubscriptionType.SUBSCRIBE) {
+                    String topic = this.messageQueue.getTopic();
+                    subExpression = rebalanceImpl.getSubscriptionInner().get(topic).getSubString();
+                }
+                long offset = nextPullOffset(messageQueue);
+                long pullDelayTimeMills = 0;
+                try {
+                    PullResult pullResult = pull(messageQueue, subExpression, offset, nextPullBatchNums());
+                    switch (pullResult.getPullStatus()) {
+                        case FOUND:
+                            processQueue.putMessage(pullResult.getMsgFoundList());
+                            submitConsumeRequest(new ConsumeRequest(pullResult.getMsgFoundList(), messageQueue, processQueue));
+                            pullDelayTimeMills = 0;
+                            break;
+                        case NO_NEW_MSG:
+                            pullDelayTimeMills = 100;
+                        case OFFSET_ILLEGAL:
+                            //TODO
+                            log.warn("the pull request offset illegal, {}", pullResult.toString());
+                            break;
+                        default:
+                            break;
+                    }
+                    updatePullOffset(messageQueue, pullResult.getNextBeginOffset());
+                } catch (Throwable e) {
+                    pullDelayTimeMills = PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION;
+                    e.printStackTrace();
+                    log.error("An error occurred in pull message process.", e);
+                }
+
+                if (!this.isCancelled()) {
+                    scheduledThreadPoolExecutor.schedule(this, pullDelayTimeMills, TimeUnit.MILLISECONDS);
+                } else {
+                    log.warn("The Pull Task is cancelled after doPullTask, {}", messageQueue);
+                }
+            }
+        }
+
+        public boolean isCancelled() {
+            return cancelled;
+        }
+
+        public void setCancelled(boolean cancelled) {
+            this.cancelled = cancelled;
+        }
+
+        public MessageQueue getMessageQueue() {
+            return messageQueue;
+        }
+    }
+
+    private PullResult pull(MessageQueue mq, String subExpression, long offset, int maxNums)
+        throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+        return pull(mq, subExpression, offset, maxNums, this.defaultLitePullConsumer.getConsumerPullTimeoutMillis());
+    }
+
+    private PullResult pull(MessageQueue mq, String subExpression, long offset, int maxNums, long timeout)
+        throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+        SubscriptionData subscriptionData = getSubscriptionData(mq, subExpression);
+        return this.pullSyncImpl(mq, subscriptionData, offset, maxNums, false, timeout);
+    }
+
+    private PullResult pull(MessageQueue mq, MessageSelector messageSelector, long offset, int maxNums)
+        throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+        return pull(mq, messageSelector, offset, maxNums, this.defaultLitePullConsumer.getConsumerPullTimeoutMillis());
+    }
+
+    private PullResult pull(MessageQueue mq, MessageSelector messageSelector, long offset, int maxNums, long timeout)
+        throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+        SubscriptionData subscriptionData = getSubscriptionData(mq, messageSelector);
+        return this.pullSyncImpl(mq, subscriptionData, offset, maxNums, false, timeout);
+    }
+
+    private SubscriptionData getSubscriptionData(MessageQueue mq, String subExpression)
+        throws MQClientException {
+
+        if (null == mq) {
+            throw new MQClientException("mq is null", null);
+        }
+
+        try {
+            return FilterAPI.buildSubscriptionData(this.defaultLitePullConsumer.getConsumerGroup(),
+                mq.getTopic(), subExpression);
+        } catch (Exception e) {
+            throw new MQClientException("parse subscription error", e);
+        }
+    }
+
+    private SubscriptionData getSubscriptionData(MessageQueue mq, MessageSelector messageSelector)
+        throws MQClientException {
+
+        if (null == mq) {
+            throw new MQClientException("mq is null", null);
+        }
+
+        try {
+            return FilterAPI.build(mq.getTopic(),
+                messageSelector.getExpression(), messageSelector.getExpressionType());
+        } catch (Exception e) {
+            throw new MQClientException("parse subscription error", e);
+        }
+    }
+
+    private PullResult pullSyncImpl(MessageQueue mq, SubscriptionData subscriptionData, long offset, int maxNums,
+        boolean block,
+        long timeout)
+        throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+
+        if (null == mq) {
+            throw new MQClientException("mq is null", null);
+        }
+
+        if (offset < 0) {
+            throw new MQClientException("offset < 0", null);
+        }
+
+        if (maxNums <= 0) {
+            throw new MQClientException("maxNums <= 0", null);
+        }
+
+        this.subscriptionAutomatically(mq.getTopic());
+
+        int sysFlag = PullSysFlag.buildSysFlag(false, block, true, false);
+
+        long timeoutMillis = block ? this.defaultLitePullConsumer.getConsumerTimeoutMillisWhenSuspend() : timeout;
+
+        boolean isTagType = ExpressionType.isTagType(subscriptionData.getExpressionType());
+        PullResult pullResult = this.pullAPIWrapper.pullKernelImpl(
+            mq,
+            subscriptionData.getSubString(),
+            subscriptionData.getExpressionType(),
+            isTagType ? 0L : subscriptionData.getSubVersion(),
+            offset,
+            maxNums,
+            sysFlag,
+            0,
+            this.defaultLitePullConsumer.getBrokerSuspendMaxTimeMillis(),
+            timeoutMillis,
+            CommunicationMode.SYNC,
+            null
+        );
+        this.pullAPIWrapper.processPullResult(mq, pullResult, subscriptionData);
+        //If namespace not null , reset Topic without namespace.
+        this.resetTopic(pullResult.getMsgFoundList());
+        if (!this.consumeMessageHookList.isEmpty()) {
+            ConsumeMessageContext consumeMessageContext = null;
+            consumeMessageContext = new ConsumeMessageContext();
+            consumeMessageContext.setNamespace(defaultLitePullConsumer.getNamespace());
+            consumeMessageContext.setConsumerGroup(this.groupName());
+            consumeMessageContext.setMq(mq);
+            consumeMessageContext.setMsgList(pullResult.getMsgFoundList());
+            consumeMessageContext.setSuccess(false);
+            this.executeHookBefore(consumeMessageContext);
+            consumeMessageContext.setStatus(ConsumeConcurrentlyStatus.CONSUME_SUCCESS.toString());
+            consumeMessageContext.setSuccess(true);
+            this.executeHookAfter(consumeMessageContext);
+        }
+        return pullResult;
+    }
+
+    private void executeHookBefore(final ConsumeMessageContext context) {
+        if (!this.consumeMessageHookList.isEmpty()) {
+            for (ConsumeMessageHook hook : this.consumeMessageHookList) {
+                try {
+                    hook.consumeMessageBefore(context);
+                } catch (Throwable ignored) {
+                }
+            }
+        }
+    }
+
+    private void executeHookAfter(final ConsumeMessageContext context) {
+        if (!this.consumeMessageHookList.isEmpty()) {
+            for (ConsumeMessageHook hook : this.consumeMessageHookList) {
+                try {
+                    hook.consumeMessageAfter(context);
+                } catch (Throwable ignored) {
+                }
+            }
+        }
+    }
+
+    public void resetTopic(List<MessageExt> msgList) {
+        if (null == msgList || msgList.size() == 0) {
+            return;
+        }
+
+        //If namespace not null , reset Topic without namespace.
+        for (MessageExt messageExt : msgList) {
+            if (null != this.defaultLitePullConsumer.getNamespace()) {
+                messageExt.setTopic(NamespaceUtil.withoutNamespace(messageExt.getTopic(), this.defaultLitePullConsumer.getNamespace()));
+            }
+        }
+
+    }
+
+    public void subscriptionAutomatically(final String topic) {
+        if (!this.rebalanceImpl.getSubscriptionInner().containsKey(topic)) {
+            try {
+                SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultLitePullConsumer.getConsumerGroup(),
+                    topic, SubscriptionData.SUB_ALL);
+                this.rebalanceImpl.subscriptionInner.putIfAbsent(topic, subscriptionData);
+            } catch (Exception ignore) {
+            }
+        }
+    }
+
+    public void updateConsumeOffset(MessageQueue mq, long offset) {
+        checkServiceState();
+        this.offsetStore.updateOffset(mq, offset, false);
+    }
+
+    @Override
+    public String groupName() {
+        return this.defaultLitePullConsumer.getConsumerGroup();
+    }
+
+    @Override
+    public MessageModel messageModel() {
+        return this.defaultLitePullConsumer.getMessageModel();
+    }
+
+    @Override
+    public ConsumeType consumeType() {
+        return ConsumeType.CONSUME_ACTIVELY;
+    }
+
+    @Override
+    public ConsumeFromWhere consumeFromWhere() {
+        return ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET;
+    }
+
+    @Override
+    public Set<SubscriptionData> subscriptions() {
+        Set<SubscriptionData> result = new HashSet<SubscriptionData>();
+
+        Set<String> topics = this.defaultLitePullConsumer.getRegisterTopics();
+        if (topics != null) {
+            synchronized (topics) {
+                for (String t : topics) {
+                    SubscriptionData ms = null;
+                    try {
+                        ms = FilterAPI.buildSubscriptionData(this.groupName(), t, SubscriptionData.SUB_ALL);
+                    } catch (Exception e) {
+                        log.error("parse subscription error", e);
+                    }
+                    ms.setSubVersion(0L);
+                    result.add(ms);
+                }
+            }
+        }
+
+        return result;
+    }
+
+    @Override
+    public void doRebalance() {
+        if (this.rebalanceImpl != null) {
+            this.rebalanceImpl.doRebalance(false);
+        }
+    }
+
+    @Override
+    public void persistConsumerOffset() {
+        try {
+            checkServiceState();
+            Set<MessageQueue> mqs = new HashSet<MessageQueue>();
+            Set<MessageQueue> allocateMq = this.rebalanceImpl.getProcessQueueTable().keySet();
+            mqs.addAll(allocateMq);
+            this.offsetStore.persistAll(mqs);
+        } catch (Exception e) {
+            log.error("group: " + this.defaultLitePullConsumer.getConsumerGroup() + " persistConsumerOffset exception", e);
+        }
+    }
+
+    @Override
+    public void updateTopicSubscribeInfo(String topic, Set<MessageQueue> info) {
+        Map<String, SubscriptionData> subTable = this.rebalanceImpl.getSubscriptionInner();
+        if (subTable != null) {
+            if (subTable.containsKey(topic)) {
+                this.rebalanceImpl.getTopicSubscribeInfoTable().put(topic, info);
+            }
+        }
+    }
+
+    @Override
+    public boolean isSubscribeTopicNeedUpdate(String topic) {
+        Map<String, SubscriptionData> subTable = this.rebalanceImpl.getSubscriptionInner();
+        if (subTable != null) {
+            if (subTable.containsKey(topic)) {
+                return !this.rebalanceImpl.topicSubscribeInfoTable.containsKey(topic);
+            }
+        }
+
+        return false;
+    }
+
+    @Override
+    public boolean isUnitMode() {
+        return this.defaultLitePullConsumer.isUnitMode();
+    }
+
+    @Override
+    public ConsumerRunningInfo consumerRunningInfo() {
+        ConsumerRunningInfo info = new ConsumerRunningInfo();
+
+        Properties prop = MixAll.object2Properties(this.defaultLitePullConsumer);
+        prop.put(ConsumerRunningInfo.PROP_CONSUMER_START_TIMESTAMP, String.valueOf(this.consumerStartTimestamp));
+        info.setProperties(prop);
+
+        info.getSubscriptionSet().addAll(this.subscriptions());
+        return info;
+    }
+
+    private void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName)
+        throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+        sendMessageBack(msg, delayLevel, brokerName, this.defaultLitePullConsumer.getConsumerGroup());
+    }
+
+    private void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName, String consumerGroup)
+        throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+        try {
+            String brokerAddr = (null != brokerName) ? this.mQClientFactory.findBrokerAddressInPublish(brokerName)
+                : RemotingHelper.parseSocketAddressAddr(msg.getStoreHost());
+
+            if (UtilAll.isBlank(consumerGroup)) {
+                consumerGroup = this.defaultLitePullConsumer.getConsumerGroup();
+            }
+
+            this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, msg, consumerGroup, delayLevel, 3000,
+                this.defaultLitePullConsumer.getMaxReconsumeTimes());
+        } catch (Exception e) {
+            log.error("sendMessageBack Exception, " + this.defaultLitePullConsumer.getConsumerGroup(), e);
+
+            Message newMsg = new Message(MixAll.getRetryTopic(this.defaultLitePullConsumer.getConsumerGroup()), msg.getBody());
+            String originMsgId = MessageAccessor.getOriginMessageId(msg);
+            MessageAccessor.setOriginMessageId(newMsg, UtilAll.isBlank(originMsgId) ? msg.getMsgId() : originMsgId);
+            newMsg.setFlag(msg.getFlag());
+            MessageAccessor.setProperties(newMsg, msg.getProperties());
+            MessageAccessor.putProperty(newMsg, MessageConst.PROPERTY_RETRY_TOPIC, msg.getTopic());
+            MessageAccessor.setReconsumeTime(newMsg, String.valueOf(msg.getReconsumeTimes() + 1));
+            MessageAccessor.setMaxReconsumeTimes(newMsg, String.valueOf(this.defaultLitePullConsumer.getMaxReconsumeTimes()));
+            newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes());
+            this.mQClientFactory.getDefaultMQProducer().send(newMsg);
+        } finally {
+            msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultLitePullConsumer.getNamespace()));
+        }
+    }
+
+    private void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway) throws RemotingException,
+        MQBrokerException, InterruptedException, MQClientException {
+        this.offsetStore.updateConsumeOffsetToBroker(mq, offset, isOneway);
+    }
+
+    public OffsetStore getOffsetStore() {
+        return offsetStore;
+    }
+
+    public DefaultLitePullConsumer getDefaultLitePullConsumer() {
+        return defaultLitePullConsumer;
+    }
+
+    public Set<MessageQueue> fetchMessageQueues(String topic) throws MQClientException {
+        checkServiceState();
+        // check if has info in memory, otherwise invoke api.
+        Set<MessageQueue> result = this.rebalanceImpl.getTopicSubscribeInfoTable().get(topic);
+        if (null == result) {
+            result = this.mQClientFactory.getMQAdminImpl().fetchSubscribeMessageQueues(topic);
+        }
+
+        return parseMessageQueues(result);
+    }
+
+    private Set<MessageQueue> parseMessageQueues(Set<MessageQueue> queueSet) {
+        Set<MessageQueue> resultQueues = new HashSet<MessageQueue>();
+        for (MessageQueue messageQueue : queueSet) {
+            String userTopic = NamespaceUtil.withoutNamespace(messageQueue.getTopic(),
+                this.defaultLitePullConsumer.getNamespace());
+            resultQueues.add(new MessageQueue(userTopic, messageQueue.getBrokerName(), messageQueue.getQueueId()));
+        }
+        return resultQueues;
+    }
+
+    public class ConsumeRequest {
+        private final List<MessageExt> messageExts;
+        private final MessageQueue messageQueue;
+        private final ProcessQueue processQueue;
+        private long startConsumeTimeMillis;
+
+        public ConsumeRequest(final List<MessageExt> messageExts, final MessageQueue messageQueue,
+            final ProcessQueue processQueue) {
+            this.messageExts = messageExts;
+            this.messageQueue = messageQueue;
+            this.processQueue = processQueue;
+        }
+
+        public List<MessageExt> getMessageExts() {
+            return messageExts;
+        }
+
+        public MessageQueue getMessageQueue() {
+            return messageQueue;
+        }
+
+        public ProcessQueue getProcessQueue() {
+            return processQueue;
+        }
+
+        public long getStartConsumeTimeMillis() {
+            return startConsumeTimeMillis;
+        }
+
+        public void setStartConsumeTimeMillis(final long startConsumeTimeMillis) {
+            this.startConsumeTimeMillis = startConsumeTimeMillis;
+        }
+    }
+}
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
index bc0884a..3c98385 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
@@ -68,7 +68,7 @@ import org.apache.rocketmq.remoting.exception.RemotingException;
 
 public class DefaultMQPullConsumerImpl implements MQConsumerInner {
     private final InternalLogger log = ClientLogger.getLog();
-    protected final DefaultMQPullConsumer defaultMQPullConsumer;
+    private final DefaultMQPullConsumer defaultMQPullConsumer;
     private final long consumerStartTimestamp = System.currentTimeMillis();
     private final RPCHook rpcHook;
     private final ArrayList<ConsumeMessageHook> consumeMessageHookList = new ArrayList<ConsumeMessageHook>();
@@ -77,7 +77,7 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
     protected MQClientInstance mQClientFactory;
     private PullAPIWrapper pullAPIWrapper;
     private OffsetStore offsetStore;
-    protected RebalanceImpl rebalanceImpl = new RebalancePullImpl(this);
+    private RebalanceImpl rebalanceImpl = new RebalancePullImpl(this);
 
     public DefaultMQPullConsumerImpl(final DefaultMQPullConsumer defaultMQPullConsumer, final RPCHook rpcHook) {
         this.defaultMQPullConsumer = defaultMQPullConsumer;
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/LiteMQPullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/LiteMQPullConsumerImpl.java
deleted file mode 100644
index ab229e4..0000000
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/LiteMQPullConsumerImpl.java
+++ /dev/null
@@ -1,469 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.rocketmq.client.impl.consumer;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.ReadWriteLock;
-
-import org.apache.commons.lang3.reflect.FieldUtils;
-import org.apache.rocketmq.client.consumer.DefaultLiteMQPullConsumer;
-import org.apache.rocketmq.client.consumer.MessageQueueListener;
-import org.apache.rocketmq.client.consumer.PullResult;
-import org.apache.rocketmq.client.exception.MQClientException;
-import org.apache.rocketmq.client.log.ClientLogger;
-import org.apache.rocketmq.common.ThreadFactoryImpl;
-import org.apache.rocketmq.common.filter.FilterAPI;
-import org.apache.rocketmq.common.message.MessageAccessor;
-import org.apache.rocketmq.common.message.MessageExt;
-import org.apache.rocketmq.common.message.MessageQueue;
-import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
-import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
-import org.apache.rocketmq.logging.InternalLogger;
-import org.apache.rocketmq.remoting.RPCHook;
-
-public class LiteMQPullConsumerImpl extends DefaultMQPullConsumerImpl {
-
-    private final InternalLogger log = ClientLogger.getLog();
-
-    private DefaultLiteMQPullConsumer defaultLiteMQPullConsumer;
-
-    private final ConcurrentMap<MessageQueue, PullTaskImpl> taskTable =
-        new ConcurrentHashMap<MessageQueue, PullTaskImpl>();
-
-    private AssignedMessageQueue assignedMessageQueue = new AssignedMessageQueue();
-
-    private volatile Set<ConsumeRequest> consumedSet = new HashSet<ConsumeRequest>();
-
-    private final BlockingQueue<ConsumeRequest> consumeRequestCache = new LinkedBlockingQueue<ConsumeRequest>();
-
-    private final ScheduledExecutorService cleanExpireMsgExecutors;
-
-    private ScheduledThreadPoolExecutor scheduledThreadPoolExecutor;
-
-    private ScheduledExecutorService autoCommitExecutors;
-
-    private final ThreadLocal<ConsumeRequest> preConsumeRequestLocal = new ThreadLocal<ConsumeRequest>();
-
-    public LiteMQPullConsumerImpl(final DefaultLiteMQPullConsumer defaultMQPullConsumer, final RPCHook rpcHook) {
-        super(defaultMQPullConsumer, rpcHook);
-        this.defaultLiteMQPullConsumer = defaultMQPullConsumer;
-        this.cleanExpireMsgExecutors = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
-            "Lite_CleanExpireMsgScheduledThread_"));
-        this.autoCommitExecutors = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
-            "Lite_AutoCommitScheduledThread_"));
-
-    }
-
-    public void updateAssignedMessageQueue(String topic, Set<MessageQueue> assignedMessageQueue) {
-        this.assignedMessageQueue.updateAssignedMessageQueue(assignedMessageQueue);
-        updatePullTask(topic, assignedMessageQueue);
-    }
-
-    public void updatePullTask(String topic, Set<MessageQueue> mqNewSet) {
-        Iterator<Map.Entry<MessageQueue, PullTaskImpl>> it = this.taskTable.entrySet().iterator();
-        while (it.hasNext()) {
-            Map.Entry<MessageQueue, PullTaskImpl> next = it.next();
-            if (next.getKey().getTopic().equals(topic)) {
-                if (!mqNewSet.contains(next.getKey())) {
-                    next.getValue().setCancelled(true);
-                    it.remove();
-                }
-            }
-        }
-
-        for (MessageQueue messageQueue : mqNewSet) {
-            if (!this.taskTable.containsKey(messageQueue)) {
-                PullTaskImpl pullTask = new PullTaskImpl(messageQueue);
-                this.taskTable.put(messageQueue, pullTask);
-                this.scheduledThreadPoolExecutor.schedule(pullTask, 0, TimeUnit.MILLISECONDS);
-            }
-        }
-    }
-
-    class MessageQueueListenerImpl implements MessageQueueListener {
-        @Override
-        public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) {
-            MessageModel messageModel = defaultMQPullConsumer.getMessageModel();
-            switch (messageModel) {
-                case BROADCASTING:
-                    updateAssignedMessageQueue(topic, mqAll);
-                    break;
-                case CLUSTERING:
-                    updateAssignedMessageQueue(topic, mqDivided);
-                    break;
-                default:
-                    break;
-            }
-        }
-    }
-
-    int nextPullBatchNums() {
-        return Math.min(10, consumeRequestCache.remainingCapacity());
-    }
-
-    @Override
-    public synchronized void start() throws MQClientException {
-        this.defaultMQPullConsumer.setMessageQueueListener(new MessageQueueListenerImpl());
-        super.start();
-        final String group = this.defaultMQPullConsumer.getConsumerGroup();
-        this.scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(
-            this.defaultLiteMQPullConsumer.getPullThreadNumbers(),
-            new ThreadFactoryImpl("PullMsgThread-" + group)
-        );
-        this.cleanExpireMsgExecutors.scheduleAtFixedRate(new Runnable() {
-            @Override
-            public void run() {
-                cleanExpireMsg();
-            }
-        }, this.defaultLiteMQPullConsumer.getConsumeTimeout(), this.defaultLiteMQPullConsumer.getConsumeTimeout(), TimeUnit.MINUTES);
-        this.autoCommitExecutors.scheduleAtFixedRate(new Runnable() {
-            @Override
-            public void run() {
-                if (defaultLiteMQPullConsumer.isAutoCommit()) {
-                    commitAll();
-                }
-            }
-        }, this.defaultLiteMQPullConsumer.getAutoCommitInterval(), this.defaultLiteMQPullConsumer.getAutoCommitInterval(), TimeUnit.SECONDS);
-        updateTopicSubscribeInfoWhenSubscriptionChanged();
-    }
-
-    private void updateTopicSubscribeInfoWhenSubscriptionChanged() {
-        Map<String, SubscriptionData> subTable = rebalanceImpl.getSubscriptionInner();
-        if (subTable != null) {
-            for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
-                final String topic = entry.getKey();
-                this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
-            }
-        }
-    }
-
-    public List<MessageExt> poll(long timeout) {
-        try {
-            addToConsumed(preConsumeRequestLocal.get());
-            ConsumeRequest consumeRequest = consumeRequestCache.poll(timeout, TimeUnit.MILLISECONDS);
-            preConsumeRequestLocal.set(consumeRequest);
-            if (consumeRequest != null) {
-                List<MessageExt> messages = consumeRequest.getMessageExts();
-                for (MessageExt messageExt : messages) {
-                    MessageAccessor.setConsumeStartTimeStamp(messageExt, String.valueOf(consumeRequest.getStartConsumeTimeMillis()));
-                }
-                consumeRequest.setStartConsumeTimeMillis(System.currentTimeMillis());
-                return messages;
-            }
-        } catch (InterruptedException e) {
-            log.error("poll ComsumeRequest error.", e);
-        }
-        return null;
-    }
-
-    public void pause(Collection<MessageQueue> messageQueues) {
-        assignedMessageQueue.pause(messageQueues);
-    }
-
-    public void resume(Collection<MessageQueue> messageQueues) {
-        assignedMessageQueue.resume(messageQueues);
-    }
-
-    public void seek(MessageQueue messageQueue, long offset) throws MQClientException {
-        this.updatePullOffset(messageQueue, offset);
-        try {
-            updateConsumeOffset(messageQueue, offset);
-        } catch (MQClientException ex) {
-            log.error("Seek offset to remote message queue error!", ex);
-            throw ex;
-        }
-    }
-
-    public void unsubscribe(final String topic) {
-        super.unsubscribe(topic);
-        removePullTaskCallback(topic);
-        assignedMessageQueue.removeAssignedMessageQueue(topic);
-    }
-
-    public void removePullTaskCallback(final String topic) {
-        removePullTask(topic);
-    }
-
-    public void removePullTask(final String topic) {
-        synchronized (this.taskTable) {
-            Iterator<Map.Entry<MessageQueue, PullTaskImpl>> it = this.taskTable.entrySet().iterator();
-            while (it.hasNext()) {
-                Map.Entry<MessageQueue, PullTaskImpl> next = it.next();
-                if (next.getKey().getTopic().equals(topic)) {
-                    next.getValue().setCancelled(true);
-                    it.remove();
-                }
-            }
-        }
-    }
-
-    public void commitSync() {
-        addToConsumed(preConsumeRequestLocal.get());
-        preConsumeRequestLocal.set(null);
-        commitAll();
-    }
-
-    public void commitAll() {
-        Set<ConsumeRequest> consumedRequests;
-        synchronized (this.consumedSet) {
-            consumedRequests = this.consumedSet;
-            this.consumedSet = new HashSet<ConsumeRequest>();
-        }
-        for (ConsumeRequest consumeRequest : consumedRequests) {
-            consumeRequest.getProcessQueue().removeMessage(consumeRequest.messageExts);
-        }
-        Set<Map.Entry<MessageQueue, ProcessQueue>> entrySet = this.rebalanceImpl.getProcessQueueTable().entrySet();
-        for (Map.Entry<MessageQueue, ProcessQueue> entry : entrySet) {
-            try {
-                long consumeOffset = entry.getValue().getConsumeOffset();
-                if (consumeOffset != -1)
-                    updateConsumeOffset(entry.getKey(), consumeOffset);
-            } catch (MQClientException e) {
-                log.error("A error occurred in update consume offset process.", e);
-            }
-        }
-        this.getOffsetStore().persistAll(this.rebalanceImpl.getProcessQueueTable().keySet());
-    }
-
-    private void commit(final MessageQueue messageQueue, final ProcessQueue processQueue, final MessageExt messageExt) {
-        long offset = processQueue.removeMessage(Collections.singletonList(messageExt));
-        try {
-            updateConsumeOffset(messageQueue, offset);
-        } catch (MQClientException e) {
-            log.error("An error occurred in update consume offset process.", e);
-        }
-    }
-
-    public void subscribe(String topic, String subExpression) throws MQClientException {
-        try {
-            SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(defaultMQPullConsumer.getConsumerGroup(),
-                topic, subExpression);
-            this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
-            if (this.mQClientFactory != null) {
-                this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
-            }
-        } catch (Exception e) {
-            throw new MQClientException("subscription exception", e);
-        }
-    }
-
-    private void updatePullOffset(MessageQueue remoteQueue, long nextPullOffset) {
-        try {
-            assignedMessageQueue.updateNextOffset(remoteQueue, nextPullOffset);
-        } catch (MQClientException e) {
-            log.error("A error occurred in update consume: {} offset process.", remoteQueue, e);
-        }
-    }
-
-    private void addToConsumed(ConsumeRequest consumeRequest) {
-        if (consumeRequest != null) {
-            synchronized (this.consumedSet) {
-                if (!consumedSet.contains(consumeRequest))
-                    consumedSet.add(consumeRequest);
-            }
-        }
-    }
-
-    private void submitConsumeRequest(ConsumeRequest consumeRequest) {
-        try {
-            consumeRequestCache.put(consumeRequest);
-        } catch (InterruptedException ex) {
-            log.error("Submit consumeRequest error", ex);
-        }
-    }
-
-    private long nextPullOffset(MessageQueue remoteQueue) {
-        long offset = -1;
-        try {
-            offset = assignedMessageQueue.getNextOffset(remoteQueue);
-            if (offset == -1) {
-                offset = fetchConsumeOffset(remoteQueue, false);
-                assignedMessageQueue.updateNextOffset(remoteQueue, offset);
-            }
-        } catch (MQClientException e) {
-            log.error("An error occurred in fetch consume offset process.", e);
-        }
-        return offset;
-    }
-
-    private void cleanExpireMsg() {
-        for (final Map.Entry<MessageQueue, ProcessQueue> next : rebalanceImpl.getProcessQueueTable().entrySet()) {
-            ProcessQueue pq = next.getValue();
-            MessageQueue mq = next.getKey();
-            ReadWriteLock lockTreeMap = getLockInProcessQueue(pq);
-            if (lockTreeMap == null) {
-                log.error("Gets tree map lock in process queue error of message queue:", mq);
-                return;
-            }
-
-            TreeMap<Long, MessageExt> msgTreeMap = pq.getMsgTreeMap();
-
-            int loop = msgTreeMap.size();
-            for (int i = 0; i < loop; i++) {
-                MessageExt msg = null;
-                try {
-                    lockTreeMap.readLock().lockInterruptibly();
-                    try {
-                        if (!msgTreeMap.isEmpty()) {
-                            msg = msgTreeMap.firstEntry().getValue();
-                            if (System.currentTimeMillis() - Long.parseLong(MessageAccessor.getConsumeStartTimeStamp(msg))
-                                > this.defaultLiteMQPullConsumer.getConsumeTimeout() * 60 * 1000) {
-                                //Expired, ack and remove it.
-                            } else {
-                                break;
-                            }
-                        } else {
-                            break;
-                        }
-                    } finally {
-                        lockTreeMap.readLock().unlock();
-                    }
-                } catch (InterruptedException e) {
-                    log.error("Gets expired message exception", e);
-                }
-
-                try {
-                    this.defaultMQPullConsumer.sendMessageBack(msg, 3);
-                    log.info("Send expired msg back. topic={}, msgId={}, storeHost={}, queueId={}, queueOffset={}",
-                        msg.getTopic(), msg.getMsgId(), msg.getStoreHost(), msg.getQueueId(), msg.getQueueOffset());
-                    log.info("Send expired msg back.");
-                    commit(mq, pq, msg);
-                } catch (Exception e) {
-                    log.error("Send back expired msg exception", e);
-                }
-            }
-        }
-    }
-
-    private ReadWriteLock getLockInProcessQueue(ProcessQueue pq) {
-        try {
-            return (ReadWriteLock) FieldUtils.readDeclaredField(pq, "lockTreeMap", true);
-        } catch (IllegalAccessException e) {
-            return null;
-        }
-    }
-
-    public class PullTaskImpl implements Runnable {
-        private final MessageQueue messageQueue;
-        private volatile boolean cancelled = false;
-
-        public PullTaskImpl(final MessageQueue messageQueue) {
-            this.messageQueue = messageQueue;
-        }
-
-        @Override
-        public void run() {
-            String topic = this.messageQueue.getTopic();
-            if (!this.isCancelled()) {
-                if (assignedMessageQueue.isPaused(messageQueue)) {
-                    scheduledThreadPoolExecutor.schedule(this, 1000, TimeUnit.MILLISECONDS);
-                    log.debug("Message Queue: {} has been paused!", messageQueue);
-                    return;
-                }
-                SubscriptionData subscriptionData = rebalanceImpl.getSubscriptionInner().get(topic);
-                long offset = nextPullOffset(messageQueue);
-                long pullDelayTimeMills = 0;
-                try {
-                    PullResult pullResult = pull(messageQueue, subscriptionData.getSubString(), offset, nextPullBatchNums());
-                    ProcessQueue processQueue = rebalanceImpl.getProcessQueueTable().get(messageQueue);
-                    switch (pullResult.getPullStatus()) {
-                        case FOUND:
-                            if (processQueue != null) {
-                                processQueue.putMessage(pullResult.getMsgFoundList());
-                                submitConsumeRequest(new ConsumeRequest(pullResult.getMsgFoundList(), messageQueue, processQueue));
-                            }
-                            break;
-                        default:
-                            break;
-                    }
-                    updatePullOffset(messageQueue, pullResult.getNextBeginOffset());
-                } catch (Throwable e) {
-                    pullDelayTimeMills = 1000;
-                    e.printStackTrace();
-                    log.error("An error occurred in pull message process.", e);
-                }
-
-                if (!this.isCancelled()) {
-                    scheduledThreadPoolExecutor.schedule(this, pullDelayTimeMills, TimeUnit.MILLISECONDS);
-                } else {
-                    log.warn("The Pull Task is cancelled after doPullTask, {}", messageQueue);
-                }
-            }
-        }
-
-        public boolean isCancelled() {
-            return cancelled;
-        }
-
-        public void setCancelled(boolean cancelled) {
-            this.cancelled = cancelled;
-        }
-
-        public MessageQueue getMessageQueue() {
-            return messageQueue;
-        }
-    }
-
-    public class ConsumeRequest {
-        private final List<MessageExt> messageExts;
-        private final MessageQueue messageQueue;
-        private final ProcessQueue processQueue;
-        private long startConsumeTimeMillis;
-
-        public ConsumeRequest(final List<MessageExt> messageExts, final MessageQueue messageQueue,
-            final ProcessQueue processQueue) {
-            this.messageExts = messageExts;
-            this.messageQueue = messageQueue;
-            this.processQueue = processQueue;
-        }
-
-        public List<MessageExt> getMessageExts() {
-            return messageExts;
-        }
-
-        public MessageQueue getMessageQueue() {
-            return messageQueue;
-        }
-
-        public ProcessQueue getProcessQueue() {
-            return processQueue;
-        }
-
-        public long getStartConsumeTimeMillis() {
-            return startConsumeTimeMillis;
-        }
-
-        public void setStartConsumeTimeMillis(final long startConsumeTimeMillis) {
-            this.startConsumeTimeMillis = startConsumeTimeMillis;
-        }
-    }
-}
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java
index e9a1c72..092da9a 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java
@@ -433,14 +433,4 @@ public class ProcessQueue {
         this.lastConsumeTimestamp = lastConsumeTimestamp;
     }
 
-    public long getConsumeOffset() {
-
-        if (msgTreeMap.isEmpty() && queueOffsetMax == 0L)
-            return -1;
-
-        if (!msgTreeMap.isEmpty())
-            return msgTreeMap.firstKey();
-        else
-            return queueOffsetMax + 1;
-    }
 }
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceLitePullImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceLitePullImpl.java
new file mode 100644
index 0000000..8148c7d
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceLitePullImpl.java
@@ -0,0 +1,68 @@
+package org.apache.rocketmq.client.impl.consumer;
+
+import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
+import org.apache.rocketmq.client.consumer.MessageQueueListener;
+import org.apache.rocketmq.client.impl.factory.MQClientInstance;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
+import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
+
+import java.util.List;
+import java.util.Set;
+
+public class RebalanceLitePullImpl extends RebalanceImpl  {
+
+    private final DefaultLitePullConsumerImpl litePullConsumerImpl;
+
+    public RebalanceLitePullImpl(DefaultLitePullConsumerImpl litePullConsumerImpl) {
+        this(null, null, null, null, litePullConsumerImpl);
+    }
+
+    public RebalanceLitePullImpl(String consumerGroup, MessageModel messageModel,
+                                 AllocateMessageQueueStrategy allocateMessageQueueStrategy,
+                                 MQClientInstance mQClientFactory, DefaultLitePullConsumerImpl litePullConsumerImpl) {
+        super(consumerGroup, messageModel, allocateMessageQueueStrategy, mQClientFactory);
+        this.litePullConsumerImpl = litePullConsumerImpl;
+    }
+
+    @Override
+    public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) {
+        MessageQueueListener messageQueueListener = this.litePullConsumerImpl.getDefaultLitePullConsumer().getMessageQueueListener();
+        if (messageQueueListener != null) {
+            try {
+                messageQueueListener.messageQueueChanged(topic, mqAll, mqDivided);
+            } catch (Throwable e) {
+                log.error("messageQueueChanged exception", e);
+            }
+        }
+    }
+
+
+    @Override
+    public boolean removeUnnecessaryMessageQueue(MessageQueue mq, ProcessQueue pq) {
+        this.litePullConsumerImpl.getOffsetStore().persist(mq);
+        this.litePullConsumerImpl.getOffsetStore().removeOffset(mq);
+        return true;
+    }
+
+    @Override
+    public ConsumeType consumeType() {
+        return ConsumeType.CONSUME_ACTIVELY;
+    }
+
+    @Override
+    public void removeDirtyOffset(final MessageQueue mq) {
+        this.litePullConsumerImpl.getOffsetStore().removeOffset(mq);
+    }
+
+    @Override
+    public long computePullFromWhere(MessageQueue mq) {
+        return 0;
+    }
+
+    @Override
+    public void dispatchPullRequest(List<PullRequest> pullRequestList) {
+    }
+
+
+}
diff --git a/example/src/main/java/org/apache/rocketmq/example/simple/LitePullConsumerTest.java b/example/src/main/java/org/apache/rocketmq/example/simple/LitePullConsumerTest.java
index 215763b..488a499 100644
--- a/example/src/main/java/org/apache/rocketmq/example/simple/LitePullConsumerTest.java
+++ b/example/src/main/java/org/apache/rocketmq/example/simple/LitePullConsumerTest.java
@@ -16,34 +16,24 @@
  */
 package org.apache.rocketmq.example.simple;
 
-import java.util.Arrays;
 import java.util.List;
-import org.apache.rocketmq.client.consumer.DefaultLiteMQPullConsumer;
+import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
 import org.apache.rocketmq.common.message.MessageExt;
-import org.apache.rocketmq.common.message.MessageQueue;
+
 
 public class LitePullConsumerTest {
     public static void main(String[] args) throws Exception {
-        DefaultLiteMQPullConsumer litePullConsumer = new DefaultLiteMQPullConsumer("test", null);
+        DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer("test");
         litePullConsumer.setNamesrvAddr("localhost:9876");
-        litePullConsumer.subscribe("litepullconsumertest9", null);
+        litePullConsumer.setAutoCommit(true);
+        litePullConsumer.subscribe("test41","TagA" );
         litePullConsumer.start();
-        MessageQueue messageQueue = new MessageQueue("test", "IT-C02YW28FLVDL.local", 1);
+
         int i = 0;
         while (true) {
             List<MessageExt> messageExts = litePullConsumer.poll();
             System.out.printf("%s%n", messageExts);
-            i++;
-            if (i == 3) {
-                System.out.printf("pause%n");
-                litePullConsumer.pause(Arrays.asList(messageQueue));
-            }
-            if (i == 10) {
-                System.out.printf("resume%n");
-                litePullConsumer.resume(Arrays.asList(messageQueue));
-            }
-//
-            litePullConsumer.commitSync();
         }
+
     }
 }


Mime
View raw message