rocketmq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From yu...@apache.org
Subject [35/43] incubator-rocketmq git commit: Finish code dump. Reviewed by: @yukon @vongosling @stevenschew @vintagewang @lollipop @zander
Date Mon, 19 Dec 2016 09:40:52 GMT
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/common/ThreadLocalIndex.java
----------------------------------------------------------------------
diff --git a/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/common/ThreadLocalIndex.java b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/common/ThreadLocalIndex.java
new file mode 100644
index 0000000..63fda5d
--- /dev/null
+++ b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/common/ThreadLocalIndex.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package com.alibaba.rocketmq.client.common;
+
+import java.util.Random;
+
+public class ThreadLocalIndex {
+    private final ThreadLocal<Integer> threadLocalIndex = new ThreadLocal<Integer>();
+    private final Random random = new Random();
+    public ThreadLocalIndex(int value) {
+
+    }
+
+    public int getAndIncrement() {
+        Integer index = this.threadLocalIndex.get();
+        if (null == index) {
+            index = Math.abs(random.nextInt());
+            if (index < 0) index = 0;
+            this.threadLocalIndex.set(index);
+        }
+
+        index = Math.abs(index + 1);
+        if (index < 0)
+            index = 0;
+
+        this.threadLocalIndex.set(index);
+        return index;
+    }
+
+    @Override
+    public String toString() {
+        return "ThreadLocalIndex{" +
+                "threadLocalIndex=" + threadLocalIndex.get() +
+                '}';
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/consumer/AllocateMessageQueueStrategy.java
----------------------------------------------------------------------
diff --git a/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/consumer/AllocateMessageQueueStrategy.java b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/consumer/AllocateMessageQueueStrategy.java
new file mode 100644
index 0000000..4d70167
--- /dev/null
+++ b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/consumer/AllocateMessageQueueStrategy.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.client.consumer;
+
+import com.alibaba.rocketmq.common.message.MessageQueue;
+
+import java.util.List;
+
+
+/**
+ * Strategy Algorithm for message allocating between consumers
+ *
+ * @author shijia.wxr
+ * @author vongosling
+ */
+public interface AllocateMessageQueueStrategy {
+
+    /**
+     * Allocating by consumer id
+     *
+     * @param consumerGroup
+     *         current consumer group
+     * @param currentCID
+     *         current consumer id
+     * @param mqAll
+     *         message queue set in current topic
+     * @param cidAll
+     *         consumer set in current consumer group
+     *
+     * @return The allocate result of given strategy
+     */
+    List<MessageQueue> allocate(
+            final String consumerGroup,
+            final String currentCID,
+            final List<MessageQueue> mqAll,
+            final List<String> cidAll
+    );
+
+
+    /**
+     * Algorithm name
+     *
+     * @return The strategy name
+     */
+    String getName();
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/consumer/DefaultMQPullConsumer.java
----------------------------------------------------------------------
diff --git a/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/consumer/DefaultMQPullConsumer.java b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/consumer/DefaultMQPullConsumer.java
new file mode 100644
index 0000000..96040ae
--- /dev/null
+++ b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/consumer/DefaultMQPullConsumer.java
@@ -0,0 +1,381 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.client.consumer;
+
+import com.alibaba.rocketmq.client.ClientConfig;
+import com.alibaba.rocketmq.client.QueryResult;
+import com.alibaba.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
+import com.alibaba.rocketmq.client.consumer.store.OffsetStore;
+import com.alibaba.rocketmq.client.exception.MQBrokerException;
+import com.alibaba.rocketmq.client.exception.MQClientException;
+import com.alibaba.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl;
+import com.alibaba.rocketmq.common.MixAll;
+import com.alibaba.rocketmq.common.message.MessageDecoder;
+import com.alibaba.rocketmq.common.message.MessageExt;
+import com.alibaba.rocketmq.common.message.MessageQueue;
+import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;
+import com.alibaba.rocketmq.remoting.RPCHook;
+import com.alibaba.rocketmq.remoting.exception.RemotingException;
+
+import java.util.HashSet;
+import java.util.Set;
+
+
+/**
+ * Default pulling consumer
+ *
+ * @author shijia.wxr
+ */
+public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsumer {
+    protected final transient DefaultMQPullConsumerImpl defaultMQPullConsumerImpl;
+
+    /**
+     * Do the same thing for the same Group, the application must be set,and
+     * guarantee Globally unique
+     */
+    private String consumerGroup;
+    /**
+     * Long polling mode, the Consumer connection max suspend time, it is not
+     * recommended to modify
+     */
+    private long brokerSuspendMaxTimeMillis = 1000 * 20;
+    /**
+     * Long polling mode, the Consumer connection timeout(must greater than
+     * brokerSuspendMaxTimeMillis), it is not recommended to modify
+     */
+    private long consumerTimeoutMillisWhenSuspend = 1000 * 30;
+    /**
+     * The socket timeout in milliseconds
+     */
+    private long consumerPullTimeoutMillis = 1000 * 10;
+    /**
+     * Consumption pattern,default is clustering
+     */
+    private MessageModel messageModel = MessageModel.CLUSTERING;
+    /**
+     * Message queue listener
+     */
+    private MessageQueueListener messageQueueListener;
+    /**
+     * Offset Storage
+     */
+    private OffsetStore offsetStore;
+    /**
+     * Topic set you want to register
+     */
+    private Set<String> registerTopics = new HashSet<String>();
+    /**
+     * Queue allocation algorithm
+     */
+    private AllocateMessageQueueStrategy allocateMessageQueueStrategy = new AllocateMessageQueueAveragely();
+    /**
+     * Whether the unit of subscription group
+     */
+    private boolean unitMode = false;
+
+    private int maxReconsumeTimes = 16;
+
+
+    public DefaultMQPullConsumer() {
+        this(MixAll.DEFAULT_CONSUMER_GROUP, null);
+    }
+
+
+    public DefaultMQPullConsumer(final String consumerGroup, RPCHook rpcHook) {
+        this.consumerGroup = consumerGroup;
+        defaultMQPullConsumerImpl = new DefaultMQPullConsumerImpl(this, rpcHook);
+    }
+
+
+    public DefaultMQPullConsumer(final String consumerGroup) {
+        this(consumerGroup, null);
+    }
+
+
+    public DefaultMQPullConsumer(RPCHook rpcHook) {
+        this(MixAll.DEFAULT_CONSUMER_GROUP, rpcHook);
+    }
+
+    @Override
+    public void createTopic(String key, String newTopic, int queueNum) throws MQClientException {
+        createTopic(key, newTopic, queueNum, 0);
+    }
+
+
+    @Override
+    public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag) throws MQClientException {
+        this.defaultMQPullConsumerImpl.createTopic(key, newTopic, queueNum, topicSysFlag);
+    }
+
+
+    @Override
+    public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException {
+        return this.defaultMQPullConsumerImpl.searchOffset(mq, timestamp);
+    }
+
+
+    @Override
+    public long maxOffset(MessageQueue mq) throws MQClientException {
+        return this.defaultMQPullConsumerImpl.maxOffset(mq);
+    }
+
+
+    @Override
+    public long minOffset(MessageQueue mq) throws MQClientException {
+        return this.defaultMQPullConsumerImpl.minOffset(mq);
+    }
+
+
+    @Override
+    public long earliestMsgStoreTime(MessageQueue mq) throws MQClientException {
+        return this.defaultMQPullConsumerImpl.earliestMsgStoreTime(mq);
+    }
+
+
+    @Override
+    public MessageExt viewMessage(String offsetMsgId) throws RemotingException, MQBrokerException,
+            InterruptedException, MQClientException {
+        return this.defaultMQPullConsumerImpl.viewMessage(offsetMsgId);
+    }
+
+
+    @Override
+    public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end)
+            throws MQClientException, InterruptedException {
+        return this.defaultMQPullConsumerImpl.queryMessage(topic, key, maxNum, begin, end);
+    }
+
+
+    public AllocateMessageQueueStrategy getAllocateMessageQueueStrategy() {
+        return allocateMessageQueueStrategy;
+    }
+
+
+    public void setAllocateMessageQueueStrategy(AllocateMessageQueueStrategy allocateMessageQueueStrategy) {
+        this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
+    }
+
+
+    public long getBrokerSuspendMaxTimeMillis() {
+        return brokerSuspendMaxTimeMillis;
+    }
+
+
+    public void setBrokerSuspendMaxTimeMillis(long brokerSuspendMaxTimeMillis) {
+        this.brokerSuspendMaxTimeMillis = brokerSuspendMaxTimeMillis;
+    }
+
+
+    public String getConsumerGroup() {
+        return consumerGroup;
+    }
+
+
+    public void setConsumerGroup(String consumerGroup) {
+        this.consumerGroup = consumerGroup;
+    }
+
+
+    public long getConsumerPullTimeoutMillis() {
+        return consumerPullTimeoutMillis;
+    }
+
+
+    public void setConsumerPullTimeoutMillis(long consumerPullTimeoutMillis) {
+        this.consumerPullTimeoutMillis = consumerPullTimeoutMillis;
+    }
+
+
+    public long getConsumerTimeoutMillisWhenSuspend() {
+        return consumerTimeoutMillisWhenSuspend;
+    }
+
+
+    public void setConsumerTimeoutMillisWhenSuspend(long consumerTimeoutMillisWhenSuspend) {
+        this.consumerTimeoutMillisWhenSuspend = consumerTimeoutMillisWhenSuspend;
+    }
+
+
+    public MessageModel getMessageModel() {
+        return messageModel;
+    }
+
+
+    public void setMessageModel(MessageModel messageModel) {
+        this.messageModel = messageModel;
+    }
+
+
+    public MessageQueueListener getMessageQueueListener() {
+        return messageQueueListener;
+    }
+
+
+    public void setMessageQueueListener(MessageQueueListener messageQueueListener) {
+        this.messageQueueListener = messageQueueListener;
+    }
+
+
+    public Set<String> getRegisterTopics() {
+        return registerTopics;
+    }
+
+
+    public void setRegisterTopics(Set<String> registerTopics) {
+        this.registerTopics = registerTopics;
+    }
+
+
+    @Override
+    public void sendMessageBack(MessageExt msg, int delayLevel)
+            throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+        this.defaultMQPullConsumerImpl.sendMessageBack(msg, delayLevel, null);
+    }
+
+
+    @Override
+    public void sendMessageBack(MessageExt msg, int delayLevel, String brokerName)
+            throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+        this.defaultMQPullConsumerImpl.sendMessageBack(msg, delayLevel, brokerName);
+    }
+
+    @Override
+    public Set<MessageQueue> fetchSubscribeMessageQueues(String topic) throws MQClientException {
+        return this.defaultMQPullConsumerImpl.fetchSubscribeMessageQueues(topic);
+    }
+
+    @Override
+    public void start() throws MQClientException {
+        this.defaultMQPullConsumerImpl.start();
+    }
+
+    @Override
+    public void shutdown() {
+        this.defaultMQPullConsumerImpl.shutdown();
+    }
+
+    @Override
+    public void registerMessageQueueListener(String topic, MessageQueueListener listener) {
+        synchronized (this.registerTopics) {
+            this.registerTopics.add(topic);
+            if (listener != null) {
+                this.messageQueueListener = listener;
+            }
+        }
+    }
+
+    @Override
+    public PullResult pull(MessageQueue mq, String subExpression, long offset, int maxNums)
+            throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+        return this.defaultMQPullConsumerImpl.pull(mq, subExpression, offset, maxNums);
+    }
+
+    @Override
+    public PullResult pull(MessageQueue mq, String subExpression, long offset, int maxNums, long timeout)
+            throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+        return this.defaultMQPullConsumerImpl.pull(mq, subExpression, offset, maxNums, timeout);
+    }
+
+    @Override
+    public void pull(MessageQueue mq, String subExpression, long offset, int maxNums, PullCallback pullCallback)
+            throws MQClientException, RemotingException, InterruptedException {
+        this.defaultMQPullConsumerImpl.pull(mq, subExpression, offset, maxNums, pullCallback);
+    }
+
+    @Override
+    public void pull(MessageQueue mq, String subExpression, long offset, int maxNums, PullCallback pullCallback, long timeout)
+            throws MQClientException, RemotingException, InterruptedException {
+        this.defaultMQPullConsumerImpl.pull(mq, subExpression, offset, maxNums, pullCallback, timeout);
+    }
+
+    @Override
+    public PullResult pullBlockIfNotFound(MessageQueue mq, String subExpression, long offset, int maxNums)
+            throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+        return this.defaultMQPullConsumerImpl.pullBlockIfNotFound(mq, subExpression, offset, maxNums);
+    }
+
+    @Override
+    public void pullBlockIfNotFound(MessageQueue mq, String subExpression, long offset, int maxNums, PullCallback pullCallback)
+            throws MQClientException, RemotingException, InterruptedException {
+        this.defaultMQPullConsumerImpl.pullBlockIfNotFound(mq, subExpression, offset, maxNums, pullCallback);
+    }
+
+    @Override
+    public void updateConsumeOffset(MessageQueue mq, long offset) throws MQClientException {
+        this.defaultMQPullConsumerImpl.updateConsumeOffset(mq, offset);
+    }
+
+    @Override
+    public long fetchConsumeOffset(MessageQueue mq, boolean fromStore) throws MQClientException {
+        return this.defaultMQPullConsumerImpl.fetchConsumeOffset(mq, fromStore);
+    }
+
+    @Override
+    public Set<MessageQueue> fetchMessageQueuesInBalance(String topic) throws MQClientException {
+        return this.defaultMQPullConsumerImpl.fetchMessageQueuesInBalance(topic);
+    }
+
+    @Override
+    public MessageExt viewMessage(String topic, String uniqKey) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+        try {
+            MessageDecoder.decodeMessageId(uniqKey);
+            return this.viewMessage(uniqKey);
+        } catch (Exception e) {
+        }
+        return this.defaultMQPullConsumerImpl.queryMessageByUniqKey(topic, uniqKey);
+    }
+
+    @Override
+    public void sendMessageBack(MessageExt msg, int delayLevel, String brokerName, String consumerGroup)
+            throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+        this.defaultMQPullConsumerImpl.sendMessageBack(msg, delayLevel, brokerName, consumerGroup);
+    }
+
+    public OffsetStore getOffsetStore() {
+        return offsetStore;
+    }
+
+
+    public void setOffsetStore(OffsetStore offsetStore) {
+        this.offsetStore = offsetStore;
+    }
+
+
+    public DefaultMQPullConsumerImpl getDefaultMQPullConsumerImpl() {
+        return defaultMQPullConsumerImpl;
+    }
+
+
+    public boolean isUnitMode() {
+        return unitMode;
+    }
+
+
+    public void setUnitMode(boolean isUnitMode) {
+        this.unitMode = isUnitMode;
+    }
+
+
+    public int getMaxReconsumeTimes() {
+        return maxReconsumeTimes;
+    }
+
+
+    public void setMaxReconsumeTimes(final int maxReconsumeTimes) {
+        this.maxReconsumeTimes = maxReconsumeTimes;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/consumer/DefaultMQPushConsumer.java
----------------------------------------------------------------------
diff --git a/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/consumer/DefaultMQPushConsumer.java b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/consumer/DefaultMQPushConsumer.java
new file mode 100644
index 0000000..f37e982
--- /dev/null
+++ b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/consumer/DefaultMQPushConsumer.java
@@ -0,0 +1,519 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.client.consumer;
+
+import com.alibaba.rocketmq.client.ClientConfig;
+import com.alibaba.rocketmq.client.QueryResult;
+import com.alibaba.rocketmq.client.consumer.listener.MessageListener;
+import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import com.alibaba.rocketmq.client.consumer.listener.MessageListenerOrderly;
+import com.alibaba.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
+import com.alibaba.rocketmq.client.consumer.store.OffsetStore;
+import com.alibaba.rocketmq.client.exception.MQBrokerException;
+import com.alibaba.rocketmq.client.exception.MQClientException;
+import com.alibaba.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl;
+import com.alibaba.rocketmq.common.MixAll;
+import com.alibaba.rocketmq.common.UtilAll;
+import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
+import com.alibaba.rocketmq.common.message.MessageDecoder;
+import com.alibaba.rocketmq.common.message.MessageExt;
+import com.alibaba.rocketmq.common.message.MessageQueue;
+import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;
+import com.alibaba.rocketmq.remoting.RPCHook;
+import com.alibaba.rocketmq.remoting.exception.RemotingException;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+
+/**
+ * Wrapped push consumer.in fact,it works as remarkable as the pull consumer
+ *
+ * @author shijia.wxr
+ */
+public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsumer {
+    protected final transient DefaultMQPushConsumerImpl defaultMQPushConsumerImpl;
+    /**
+     * Do the same thing for the same Group, the application must be set,and
+     * guarantee Globally unique
+     */
+    private String consumerGroup;
+    /**
+     * Consumption pattern,default is clustering
+     */
+    private MessageModel messageModel = MessageModel.CLUSTERING;
+    /**
+     * Consumption offset
+     */
+    private ConsumeFromWhere consumeFromWhere = ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET;
+    /**
+     * Backtracking consumption time with second precision.time format is
+     * 20131223171201<br>
+     * Implying Seventeen twelve and 01 seconds on December 23, 2013 year<br>
+     * Default backtracking consumption time Half an hour ago
+     */
+    private String consumeTimestamp = UtilAll.timeMillisToHumanString3(System.currentTimeMillis() - (1000 * 60 * 30));
+    /**
+     * Queue allocation algorithm
+     */
+    private AllocateMessageQueueStrategy allocateMessageQueueStrategy;
+
+    /**
+     * Subscription relationship
+     */
+    private Map<String /* topic */, String /* sub expression */> subscription = new HashMap<String, String>();
+    /**
+     * Message listener
+     */
+    private MessageListener messageListener;
+    /**
+     * Offset Storage
+     */
+    private OffsetStore offsetStore;
+    /**
+     * Minimum consumer thread number
+     */
+    private int consumeThreadMin = 20;
+    /**
+     * Max consumer thread number
+     */
+    private int consumeThreadMax = 64;
+
+    /**
+     * Threshold for dynamic adjustment of the number of thread pool
+     */
+    private long adjustThreadPoolNumsThreshold = 100000;
+
+    /**
+     * Concurrently max span offset.it has no effect on sequential consumption
+     */
+    private int consumeConcurrentlyMaxSpan = 2000;
+    /**
+     * Flow control threshold
+     */
+    private int pullThresholdForQueue = 1000;
+    /**
+     * Message pull Interval
+     */
+    private long pullInterval = 0;
+    /**
+     * Batch consumption size
+     */
+    private int consumeMessageBatchMaxSize = 1;
+    /**
+     * Batch pull size
+     */
+    private int pullBatchSize = 32;
+
+    /**
+     * Whether update subscription relationship when every pull
+     */
+    private boolean postSubscriptionWhenPull = false;
+
+    /**
+     * Whether the unit of subscription group
+     */
+    private boolean unitMode = false;
+
+    private int maxReconsumeTimes = -1;
+    private long suspendCurrentQueueTimeMillis = 1000;
+    private long consumeTimeout = 15;
+
+
+    public DefaultMQPushConsumer() {
+        this(MixAll.DEFAULT_CONSUMER_GROUP, null, new AllocateMessageQueueAveragely());
+    }
+
+
+    public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook, AllocateMessageQueueStrategy allocateMessageQueueStrategy) {
+        this.consumerGroup = consumerGroup;
+        this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
+        defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);
+    }
+
+
+    public DefaultMQPushConsumer(RPCHook rpcHook) {
+        this(MixAll.DEFAULT_CONSUMER_GROUP, rpcHook, new AllocateMessageQueueAveragely());
+    }
+
+
+    public DefaultMQPushConsumer(final String consumerGroup) {
+        this(consumerGroup, null, new AllocateMessageQueueAveragely());
+    }
+
+    @Override
+    public void createTopic(String key, String newTopic, int queueNum) throws MQClientException {
+        createTopic(key, newTopic, queueNum, 0);
+    }
+
+
+    @Override
+    public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag) throws MQClientException {
+        this.defaultMQPushConsumerImpl.createTopic(key, newTopic, queueNum, topicSysFlag);
+    }
+
+
+    @Override
+    public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException {
+        return this.defaultMQPushConsumerImpl.searchOffset(mq, timestamp);
+    }
+
+
+    @Override
+    public long maxOffset(MessageQueue mq) throws MQClientException {
+        return this.defaultMQPushConsumerImpl.maxOffset(mq);
+    }
+
+
+    @Override
+    public long minOffset(MessageQueue mq) throws MQClientException {
+        return this.defaultMQPushConsumerImpl.minOffset(mq);
+    }
+
+
+    @Override
+    public long earliestMsgStoreTime(MessageQueue mq) throws MQClientException {
+        return this.defaultMQPushConsumerImpl.earliestMsgStoreTime(mq);
+    }
+
+
+    @Override
+    public MessageExt viewMessage(String offsetMsgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+        return this.defaultMQPushConsumerImpl.viewMessage(offsetMsgId);
+    }
+
+
+    @Override
+    public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end)
+            throws MQClientException, InterruptedException {
+        return this.defaultMQPushConsumerImpl.queryMessage(topic, key, maxNum, begin, end);
+    }
+
+    @Override
+    public MessageExt viewMessage(String topic, String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+        try {
+            MessageDecoder.decodeMessageId(msgId);
+            return this.viewMessage(msgId);
+        } catch (Exception e) {
+        }
+        return this.defaultMQPushConsumerImpl.queryMessageByUniqKey(topic, msgId);
+    }
+
+    public AllocateMessageQueueStrategy getAllocateMessageQueueStrategy() {
+        return allocateMessageQueueStrategy;
+    }
+
+
+    public void setAllocateMessageQueueStrategy(AllocateMessageQueueStrategy allocateMessageQueueStrategy) {
+        this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
+    }
+
+
+    public int getConsumeConcurrentlyMaxSpan() {
+        return consumeConcurrentlyMaxSpan;
+    }
+
+
+    public void setConsumeConcurrentlyMaxSpan(int consumeConcurrentlyMaxSpan) {
+        this.consumeConcurrentlyMaxSpan = consumeConcurrentlyMaxSpan;
+    }
+
+
+    public ConsumeFromWhere getConsumeFromWhere() {
+        return consumeFromWhere;
+    }
+
+
+    public void setConsumeFromWhere(ConsumeFromWhere consumeFromWhere) {
+        this.consumeFromWhere = consumeFromWhere;
+    }
+
+
+    public int getConsumeMessageBatchMaxSize() {
+        return consumeMessageBatchMaxSize;
+    }
+
+
+    public void setConsumeMessageBatchMaxSize(int consumeMessageBatchMaxSize) {
+        this.consumeMessageBatchMaxSize = consumeMessageBatchMaxSize;
+    }
+
+
+    public String getConsumerGroup() {
+        return consumerGroup;
+    }
+
+
+    public void setConsumerGroup(String consumerGroup) {
+        this.consumerGroup = consumerGroup;
+    }
+
+
+    public int getConsumeThreadMax() {
+        return consumeThreadMax;
+    }
+
+
+    public void setConsumeThreadMax(int consumeThreadMax) {
+        this.consumeThreadMax = consumeThreadMax;
+    }
+
+
+    public int getConsumeThreadMin() {
+        return consumeThreadMin;
+    }
+
+
+    public void setConsumeThreadMin(int consumeThreadMin) {
+        this.consumeThreadMin = consumeThreadMin;
+    }
+
+
+    public DefaultMQPushConsumerImpl getDefaultMQPushConsumerImpl() {
+        return defaultMQPushConsumerImpl;
+    }
+
+
+    public MessageListener getMessageListener() {
+        return messageListener;
+    }
+
+
+    public void setMessageListener(MessageListener messageListener) {
+        this.messageListener = messageListener;
+    }
+
+
+    public MessageModel getMessageModel() {
+        return messageModel;
+    }
+
+
+    public void setMessageModel(MessageModel messageModel) {
+        this.messageModel = messageModel;
+    }
+
+
+    public int getPullBatchSize() {
+        return pullBatchSize;
+    }
+
+
+    public void setPullBatchSize(int pullBatchSize) {
+        this.pullBatchSize = pullBatchSize;
+    }
+
+
+    public long getPullInterval() {
+        return pullInterval;
+    }
+
+
+    public void setPullInterval(long pullInterval) {
+        this.pullInterval = pullInterval;
+    }
+
+
+    public int getPullThresholdForQueue() {
+        return pullThresholdForQueue;
+    }
+
+
+    public void setPullThresholdForQueue(int pullThresholdForQueue) {
+        this.pullThresholdForQueue = pullThresholdForQueue;
+    }
+
+
+    public Map<String, String> getSubscription() {
+        return subscription;
+    }
+
+
+    public void setSubscription(Map<String, String> subscription) {
+        this.subscription = subscription;
+    }
+
+
+    @Override
+    public void sendMessageBack(MessageExt msg, int delayLevel)
+            throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+        this.defaultMQPushConsumerImpl.sendMessageBack(msg, delayLevel, null);
+    }
+
+
+    @Override
+    public void sendMessageBack(MessageExt msg, int delayLevel, String brokerName)
+            throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+        this.defaultMQPushConsumerImpl.sendMessageBack(msg, delayLevel, brokerName);
+    }
+
+
+    @Override
+    public Set<MessageQueue> fetchSubscribeMessageQueues(String topic) throws MQClientException {
+        return this.defaultMQPushConsumerImpl.fetchSubscribeMessageQueues(topic);
+    }
+
+
+    @Override
+    public void start() throws MQClientException {
+        this.defaultMQPushConsumerImpl.start();
+    }
+
+
+    @Override
+    public void shutdown() {
+        this.defaultMQPushConsumerImpl.shutdown();
+    }
+
+
+    @Override
+    @Deprecated
+    public void registerMessageListener(MessageListener messageListener) {
+        this.messageListener = messageListener;
+        this.defaultMQPushConsumerImpl.registerMessageListener(messageListener);
+    }
+
+
+    @Override
+    public void registerMessageListener(MessageListenerConcurrently messageListener) {
+        this.messageListener = messageListener;
+        this.defaultMQPushConsumerImpl.registerMessageListener(messageListener);
+    }
+
+
+    @Override
+    public void registerMessageListener(MessageListenerOrderly messageListener) {
+        this.messageListener = messageListener;
+        this.defaultMQPushConsumerImpl.registerMessageListener(messageListener);
+    }
+
+
+    @Override
+    public void subscribe(String topic, String subExpression) throws MQClientException {
+        this.defaultMQPushConsumerImpl.subscribe(topic, subExpression);
+    }
+
+
+    @Override
+    public void subscribe(String topic, String fullClassName, String filterClassSource) throws MQClientException {
+        this.defaultMQPushConsumerImpl.subscribe(topic, fullClassName, filterClassSource);
+    }
+
+
+    @Override
+    public void unsubscribe(String topic) {
+        this.defaultMQPushConsumerImpl.unsubscribe(topic);
+    }
+
+
+    @Override
+    public void updateCorePoolSize(int corePoolSize) {
+        this.defaultMQPushConsumerImpl.updateCorePoolSize(corePoolSize);
+    }
+
+
+    @Override
+    public void suspend() {
+        this.defaultMQPushConsumerImpl.suspend();
+    }
+
+
+    @Override
+    public void resume() {
+        this.defaultMQPushConsumerImpl.resume();
+    }
+
+
+    public OffsetStore getOffsetStore() {
+        return offsetStore;
+    }
+
+
+    public void setOffsetStore(OffsetStore offsetStore) {
+        this.offsetStore = offsetStore;
+    }
+
+
+    public String getConsumeTimestamp() {
+        return consumeTimestamp;
+    }
+
+
+    public void setConsumeTimestamp(String consumeTimestamp) {
+        this.consumeTimestamp = consumeTimestamp;
+    }
+
+
+    public boolean isPostSubscriptionWhenPull() {
+        return postSubscriptionWhenPull;
+    }
+
+
+    public void setPostSubscriptionWhenPull(boolean postSubscriptionWhenPull) {
+        this.postSubscriptionWhenPull = postSubscriptionWhenPull;
+    }
+
+
+    public boolean isUnitMode() {
+        return unitMode;
+    }
+
+
+    public void setUnitMode(boolean isUnitMode) {
+        this.unitMode = isUnitMode;
+    }
+
+
+    public long getAdjustThreadPoolNumsThreshold() {
+        return adjustThreadPoolNumsThreshold;
+    }
+
+
+    public void setAdjustThreadPoolNumsThreshold(long adjustThreadPoolNumsThreshold) {
+        this.adjustThreadPoolNumsThreshold = adjustThreadPoolNumsThreshold;
+    }
+
+
+    public int getMaxReconsumeTimes() {
+        return maxReconsumeTimes;
+    }
+
+
+    public void setMaxReconsumeTimes(final int maxReconsumeTimes) {
+        this.maxReconsumeTimes = maxReconsumeTimes;
+    }
+
+
+    public long getSuspendCurrentQueueTimeMillis() {
+        return suspendCurrentQueueTimeMillis;
+    }
+
+
+    public void setSuspendCurrentQueueTimeMillis(final long suspendCurrentQueueTimeMillis) {
+        this.suspendCurrentQueueTimeMillis = suspendCurrentQueueTimeMillis;
+    }
+
+
+    public long getConsumeTimeout() {
+        return consumeTimeout;
+    }
+
+    public void setConsumeTimeout(final long consumeTimeout) {
+        this.consumeTimeout = consumeTimeout;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/consumer/MQConsumer.java
----------------------------------------------------------------------
diff --git a/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/consumer/MQConsumer.java b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/consumer/MQConsumer.java
new file mode 100644
index 0000000..2a46b65
--- /dev/null
+++ b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/consumer/MQConsumer.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.client.consumer;
+
+import com.alibaba.rocketmq.client.MQAdmin;
+import com.alibaba.rocketmq.client.exception.MQBrokerException;
+import com.alibaba.rocketmq.client.exception.MQClientException;
+import com.alibaba.rocketmq.common.message.MessageExt;
+import com.alibaba.rocketmq.common.message.MessageQueue;
+import com.alibaba.rocketmq.remoting.exception.RemotingException;
+
+import java.util.Set;
+
+
+/**
+ * Message queue consumer interface
+ *
+ * @author shijia.wxr
+ */
+public interface MQConsumer extends MQAdmin {
+    /**
+     * If consuming failure,message will be send back to the brokers,and delay consuming some time
+     *
+     * @param msg
+     * @param delayLevel
+     *
+     * @throws InterruptedException
+     * @throws MQBrokerException
+     * @throws RemotingException
+     * @throws MQClientException
+     */
+    @Deprecated
+    void sendMessageBack(final MessageExt msg, final int delayLevel) throws RemotingException,
+            MQBrokerException, InterruptedException, MQClientException;
+
+
+    /**
+     * If consuming failure,message will be send back to the broker,and delay consuming some time
+     *
+     * @param msg
+     * @param delayLevel
+     * @param brokerName
+     *
+     * @throws RemotingException
+     * @throws MQBrokerException
+     * @throws InterruptedException
+     * @throws MQClientException
+     */
+    void sendMessageBack(final MessageExt msg, final int delayLevel, final String brokerName)
+            throws RemotingException, MQBrokerException, InterruptedException, MQClientException;
+
+
+    /**
+     * Fetch message queues from consumer cache according to the topic
+     *
+     * @param topic
+     *         message topic
+     *
+     * @return queue set
+     *
+     * @throws MQClientException
+     */
+    Set<MessageQueue> fetchSubscribeMessageQueues(final String topic) throws MQClientException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/consumer/MQPullConsumer.java
----------------------------------------------------------------------
diff --git a/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/consumer/MQPullConsumer.java b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/consumer/MQPullConsumer.java
new file mode 100644
index 0000000..1125d09
--- /dev/null
+++ b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/consumer/MQPullConsumer.java
@@ -0,0 +1,229 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.client.consumer;
+
+import com.alibaba.rocketmq.client.exception.MQBrokerException;
+import com.alibaba.rocketmq.client.exception.MQClientException;
+import com.alibaba.rocketmq.common.message.MessageExt;
+import com.alibaba.rocketmq.common.message.MessageQueue;
+import com.alibaba.rocketmq.remoting.exception.RemotingException;
+
+import java.util.Set;
+
+
+/**
+ * Pulling consumer interface
+ *
+ * @author shijia.wxr
+ */
+public interface MQPullConsumer extends MQConsumer {
+    /**
+     * Start the consumer
+     *
+     * @throws MQClientException
+     */
+    void start() throws MQClientException;
+
+
+    /**
+     * Shutdown the consumer
+     */
+    void shutdown();
+
+
+    /**
+     * Register the message queue listener
+     *
+     * @param topic
+     * @param listener
+     */
+    void registerMessageQueueListener(final String topic, final MessageQueueListener listener);
+
+
+    /**
+     * Pulling the messages,not blocking
+     *
+     * @param mq
+     *         from which message queue
+     * @param subExpression
+     *         subscription expression.it only support or operation such as "tag1 || tag2 || tag3" <br>
+     *         if null or * expression,meaning subscribe all
+     * @param offset
+     *         from where to pull
+     * @param maxNums
+     *         max pulling numbers
+     *
+     * @return The resulting {@code PullRequest}
+     *
+     * @throws MQClientException
+     * @throws InterruptedException
+     * @throws MQBrokerException
+     * @throws RemotingException
+     */
+    PullResult pull(final MessageQueue mq, final String subExpression, final long offset,
+                    final int maxNums) throws MQClientException, RemotingException, MQBrokerException,
+            InterruptedException;
+
+
+    /**
+     * Pulling the messages in the specified timeout
+     *
+     * @param mq
+     * @param subExpression
+     * @param offset
+     * @param maxNums
+     * @param timeout
+     *
+     * @return The resulting {@code PullRequest}
+     *
+     * @throws MQClientException
+     * @throws RemotingException
+     * @throws MQBrokerException
+     * @throws InterruptedException
+     */
+    PullResult pull(final MessageQueue mq, final String subExpression, final long offset,
+                    final int maxNums, final long timeout) throws MQClientException, RemotingException,
+            MQBrokerException, InterruptedException;
+
+
+    /**
+     * Pulling the messages in a async. way
+     *
+     * @param mq
+     * @param subExpression
+     * @param offset
+     * @param maxNums
+     * @param pullCallback
+     *
+     * @throws MQClientException
+     * @throws RemotingException
+     * @throws InterruptedException
+     */
+    void pull(final MessageQueue mq, final String subExpression, final long offset, final int maxNums,
+              final PullCallback pullCallback) throws MQClientException, RemotingException,
+            InterruptedException;
+
+    /**
+     * Pulling the messages in a async. way
+     *
+     * @param mq
+     * @param subExpression
+     * @param offset
+     * @param maxNums
+     * @param pullCallback
+     * @param timeout
+     *
+     * @throws MQClientException
+     * @throws RemotingException
+     * @throws InterruptedException
+     */
+    void pull(final MessageQueue mq, final String subExpression, final long offset, final int maxNums,
+              final PullCallback pullCallback, long timeout) throws MQClientException, RemotingException,
+            InterruptedException;
+
+
+    /**
+     * Pulling the messages,if no message arrival,blocking some time
+     *
+     * @param mq
+     * @param subExpression
+     * @param offset
+     * @param maxNums
+     *
+     * @return The resulting {@code PullRequest}
+     *
+     * @throws MQClientException
+     * @throws RemotingException
+     * @throws MQBrokerException
+     * @throws InterruptedException
+     */
+    PullResult pullBlockIfNotFound(final MessageQueue mq, final String subExpression,
+                                   final long offset, final int maxNums) throws MQClientException, RemotingException,
+            MQBrokerException, InterruptedException;
+
+
+    /**
+     * Pulling the messages through callback function,if no message arrival,blocking.
+     *
+     * @param mq
+     * @param subExpression
+     * @param offset
+     * @param maxNums
+     * @param pullCallback
+     *
+     * @throws MQClientException
+     * @throws RemotingException
+     * @throws InterruptedException
+     */
+    void pullBlockIfNotFound(final MessageQueue mq, final String subExpression, final long offset,
+                             final int maxNums, final PullCallback pullCallback) throws MQClientException, RemotingException,
+            InterruptedException;
+
+
+    /**
+     * Update the offset
+     *
+     * @param mq
+     * @param offset
+     *
+     * @throws MQClientException
+     */
+    void updateConsumeOffset(final MessageQueue mq, final long offset) throws MQClientException;
+
+
+    /**
+     * Fetch the offset
+     *
+     * @param mq
+     * @param fromStore
+     *
+     * @return The fetched offset of given queue
+     *
+     * @throws MQClientException
+     */
+    long fetchConsumeOffset(final MessageQueue mq, final boolean fromStore) throws MQClientException;
+
+
+    /**
+     * Fetch the message queues according to the topic
+     *
+     * @param topic
+     *         message topic
+     *
+     * @return message queue set
+     *
+     * @throws MQClientException
+     */
+    Set<MessageQueue> fetchMessageQueuesInBalance(final String topic) throws MQClientException;
+
+    /**
+     * If consuming failure,message will be send back to the broker,and delay consuming in some time later.<br>
+     * Mind! message can only be consumed in the same group.
+     *
+     * @param msg
+     * @param delayLevel
+     * @param brokerName
+     * @param consumerGroup
+     *
+     * @throws RemotingException
+     * @throws MQBrokerException
+     * @throws InterruptedException
+     * @throws MQClientException
+     */
+    void sendMessageBack(MessageExt msg, int delayLevel, String brokerName, String consumerGroup)
+            throws RemotingException, MQBrokerException, InterruptedException, MQClientException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/consumer/MQPullConsumerScheduleService.java
----------------------------------------------------------------------
diff --git a/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/consumer/MQPullConsumerScheduleService.java b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/consumer/MQPullConsumerScheduleService.java
new file mode 100644
index 0000000..d68b559
--- /dev/null
+++ b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/consumer/MQPullConsumerScheduleService.java
@@ -0,0 +1,212 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.client.consumer;
+
+import com.alibaba.rocketmq.client.exception.MQClientException;
+import com.alibaba.rocketmq.client.log.ClientLogger;
+import com.alibaba.rocketmq.common.ThreadFactoryImpl;
+import com.alibaba.rocketmq.common.message.MessageQueue;
+import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;
+import org.slf4j.Logger;
+
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+
+/**
+ * Schedule service for pull consumer
+ *
+ * @author shijia.wxr
+ */
+public class MQPullConsumerScheduleService {
+    private final Logger log = ClientLogger.getLog();
+    private final MessageQueueListener messageQueueListener = new MessageQueueListenerImpl();
+    private final ConcurrentHashMap<MessageQueue, PullTaskImpl> taskTable =
+            new ConcurrentHashMap<MessageQueue, PullTaskImpl>();
+    private DefaultMQPullConsumer defaultMQPullConsumer;
+    private int pullThreadNums = 20;
+    private ConcurrentHashMap<String /* topic */, PullTaskCallback> callbackTable =
+            new ConcurrentHashMap<String, PullTaskCallback>();
+    private ScheduledThreadPoolExecutor scheduledThreadPoolExecutor;
+
+    public MQPullConsumerScheduleService(final String consumerGroup) {
+        this.defaultMQPullConsumer = new DefaultMQPullConsumer(consumerGroup);
+        this.defaultMQPullConsumer.setMessageModel(MessageModel.CLUSTERING);
+    }
+
+    public void putTask(String topic, Set<MessageQueue> mqNewSet) {
+        Iterator<Entry<MessageQueue, PullTaskImpl>> it = this.taskTable.entrySet().iterator();
+        while (it.hasNext()) {
+            Entry<MessageQueue, PullTaskImpl> next = it.next();
+            if (next.getKey().getTopic().equals(topic)) {
+                if (!mqNewSet.contains(next.getKey())) {
+                    next.getValue().setCancelled(true);
+                    it.remove();
+                }
+            }
+        }
+
+        for (MessageQueue mq : mqNewSet) {
+            if (!this.taskTable.containsKey(mq)) {
+                PullTaskImpl command = new PullTaskImpl(mq);
+                this.taskTable.put(mq, command);
+                this.scheduledThreadPoolExecutor.schedule(command, 0, TimeUnit.MILLISECONDS);
+
+            }
+        }
+    }
+
+    public void start() throws MQClientException {
+        final String group = this.defaultMQPullConsumer.getConsumerGroup();
+        this.scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(
+                this.pullThreadNums,
+                new ThreadFactoryImpl("PullMsgThread-" + group)
+        );
+
+        this.defaultMQPullConsumer.setMessageQueueListener(this.messageQueueListener);
+
+        this.defaultMQPullConsumer.start();
+
+        log.info("MQPullConsumerScheduleService start OK, {} {}",
+                this.defaultMQPullConsumer.getConsumerGroup(), this.callbackTable);
+    }
+
+    public void registerPullTaskCallback(final String topic, final PullTaskCallback callback) {
+        this.callbackTable.put(topic, callback);
+        this.defaultMQPullConsumer.registerMessageQueueListener(topic, null);
+    }
+
+    public void shutdown() {
+        if (this.scheduledThreadPoolExecutor != null) {
+            this.scheduledThreadPoolExecutor.shutdown();
+        }
+
+        if (this.defaultMQPullConsumer != null) {
+            this.defaultMQPullConsumer.shutdown();
+        }
+    }
+
+    public ConcurrentHashMap<String, PullTaskCallback> getCallbackTable() {
+        return callbackTable;
+    }
+
+    public void setCallbackTable(ConcurrentHashMap<String, PullTaskCallback> callbackTable) {
+        this.callbackTable = callbackTable;
+    }
+
+    public int getPullThreadNums() {
+        return pullThreadNums;
+    }
+
+    public void setPullThreadNums(int pullThreadNums) {
+        this.pullThreadNums = pullThreadNums;
+    }
+
+    public DefaultMQPullConsumer getDefaultMQPullConsumer() {
+        return defaultMQPullConsumer;
+    }
+
+    public void setDefaultMQPullConsumer(DefaultMQPullConsumer defaultMQPullConsumer) {
+        this.defaultMQPullConsumer = defaultMQPullConsumer;
+    }
+
+    public MessageModel getMessageModel() {
+        return this.defaultMQPullConsumer.getMessageModel();
+    }
+
+    public void setMessageModel(MessageModel messageModel) {
+        this.defaultMQPullConsumer.setMessageModel(messageModel);
+    }
+
+    class MessageQueueListenerImpl implements MessageQueueListener {
+        @Override
+        public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) {
+            MessageModel messageModel =
+                    MQPullConsumerScheduleService.this.defaultMQPullConsumer.getMessageModel();
+            switch (messageModel) {
+                case BROADCASTING:
+                    MQPullConsumerScheduleService.this.putTask(topic, mqAll);
+                    break;
+                case CLUSTERING:
+                    MQPullConsumerScheduleService.this.putTask(topic, mqDivided);
+                    break;
+                default:
+                    break;
+            }
+        }
+    }
+
+    class PullTaskImpl implements Runnable {
+        private final MessageQueue messageQueue;
+        private volatile boolean cancelled = false;
+
+
+        public PullTaskImpl(final MessageQueue messageQueue) {
+            this.messageQueue = messageQueue;
+        }
+
+
+        @Override
+        public void run() {
+            String topic = this.messageQueue.getTopic();
+            if (!this.isCancelled()) {
+                PullTaskCallback pullTaskCallback =
+                        MQPullConsumerScheduleService.this.callbackTable.get(topic);
+                if (pullTaskCallback != null) {
+                    final PullTaskContext context = new PullTaskContext();
+                    context.setPullConsumer(MQPullConsumerScheduleService.this.defaultMQPullConsumer);
+                    try {
+                        pullTaskCallback.doPullTask(this.messageQueue, context);
+                    } catch (Throwable e) {
+                        context.setPullNextDelayTimeMillis(1000);
+                        log.error("doPullTask Exception", e);
+                    }
+
+                    if (!this.isCancelled()) {
+                        MQPullConsumerScheduleService.this.scheduledThreadPoolExecutor.schedule(this,
+                                context.getPullNextDelayTimeMillis(), TimeUnit.MILLISECONDS);
+                    } else {
+                        log.warn("The Pull Task is cancelled after doPullTask, {}", messageQueue);
+                    }
+                } else {
+                    log.warn("Pull Task Callback not exist , {}", topic);
+                }
+            } else {
+                log.warn("The Pull Task is cancelled, {}", messageQueue);
+            }
+        }
+
+
+        public boolean isCancelled() {
+            return cancelled;
+        }
+
+
+        public void setCancelled(boolean cancelled) {
+            this.cancelled = cancelled;
+        }
+
+
+        public MessageQueue getMessageQueue() {
+            return messageQueue;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/consumer/MQPushConsumer.java
----------------------------------------------------------------------
diff --git a/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/consumer/MQPushConsumer.java b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/consumer/MQPushConsumer.java
new file mode 100644
index 0000000..e47739d
--- /dev/null
+++ b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/consumer/MQPushConsumer.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.client.consumer;
+
+import com.alibaba.rocketmq.client.consumer.listener.MessageListener;
+import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import com.alibaba.rocketmq.client.consumer.listener.MessageListenerOrderly;
+import com.alibaba.rocketmq.client.exception.MQClientException;
+
+
+/**
+ * Push consumer
+ *
+ * @author shijia.wxr
+ */
+public interface MQPushConsumer extends MQConsumer {
+    /**
+     * Start the consumer
+     *
+     * @throws MQClientException
+     */
+    void start() throws MQClientException;
+
+
+    /**
+     * Shutdown the consumer
+     */
+    void shutdown();
+
+
+    /**
+     * Register the message listener
+     *
+     * @param messageListener
+     */
+    @Deprecated
+    void registerMessageListener(MessageListener messageListener);
+
+
+    void registerMessageListener(final MessageListenerConcurrently messageListener);
+
+
+    void registerMessageListener(final MessageListenerOrderly messageListener);
+
+
+    /**
+     * Subscribe some topic
+     *
+     * @param topic
+     * @param subExpression
+     *         subscription expression.it only support or operation such as
+     *         "tag1 || tag2 || tag3" <br>
+     *         if null or * expression,meaning subscribe all
+     *
+     * @throws MQClientException
+     */
+    void subscribe(final String topic, final String subExpression) throws MQClientException;
+
+
+    /**
+     * Subscribe some topic
+     *
+     * @param topic
+     * @param fullClassName
+     *         full class name,must extend
+     *         com.alibaba.rocketmq.common.filter. MessageFilter
+     * @param filterClassSource
+     *         class source code,used UTF-8 file encoding,must be responsible
+     *         for your code safety
+     *
+     * @throws MQClientException
+     */
+    void subscribe(final String topic, final String fullClassName, final String filterClassSource) throws MQClientException;
+
+
+    /**
+     * Unsubscribe consumption some topic
+     *
+     * @param topic
+     *         message topic
+     */
+    void unsubscribe(final String topic);
+
+
+    /**
+     * Update the consumer thread pool size Dynamically
+     *
+     * @param corePoolSize
+     */
+    void updateCorePoolSize(int corePoolSize);
+
+
+    /**
+     * Suspend the consumption
+     */
+    void suspend();
+
+
+    /**
+     * Resume the consumption
+     */
+    void resume();
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/consumer/MessageQueueListener.java
----------------------------------------------------------------------
diff --git a/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/consumer/MessageQueueListener.java b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/consumer/MessageQueueListener.java
new file mode 100644
index 0000000..bb25a3a
--- /dev/null
+++ b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/consumer/MessageQueueListener.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.client.consumer;
+
+import com.alibaba.rocketmq.common.message.MessageQueue;
+
+import java.util.Set;
+
+
+/**
+ * A MessageQueueListener is implemented by the application and may be specified when a message queue changed
+ *
+ * @author shijia.wxr
+ * @author vongosling
+ */
+public interface MessageQueueListener {
+    /**
+     * @param topic
+     *         message topic
+     * @param mqAll
+     *         all queues in this message topic
+     * @param mqDivided
+     *         collection of queues,assigned to the current consumer
+     */
+    void messageQueueChanged(final String topic, final Set<MessageQueue> mqAll,
+                             final Set<MessageQueue> mqDivided);
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/consumer/PullCallback.java
----------------------------------------------------------------------
diff --git a/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/consumer/PullCallback.java b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/consumer/PullCallback.java
new file mode 100644
index 0000000..545cff2
--- /dev/null
+++ b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/consumer/PullCallback.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.client.consumer;
+
+/**
+ * Async message pulling interface
+ *
+ * @author shijia.wxr
+ */
+public interface PullCallback {
+    public void onSuccess(final PullResult pullResult);
+
+    public void onException(final Throwable e);
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/consumer/PullResult.java
----------------------------------------------------------------------
diff --git a/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/consumer/PullResult.java b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/consumer/PullResult.java
new file mode 100644
index 0000000..b485243
--- /dev/null
+++ b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/consumer/PullResult.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.client.consumer;
+
+import com.alibaba.rocketmq.common.message.MessageExt;
+
+import java.util.List;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class PullResult {
+    private final PullStatus pullStatus;
+    private final long nextBeginOffset;
+    private final long minOffset;
+    private final long maxOffset;
+    private List<MessageExt> msgFoundList;
+
+
+    public PullResult(PullStatus pullStatus, long nextBeginOffset, long minOffset, long maxOffset,
+                      List<MessageExt> msgFoundList) {
+        super();
+        this.pullStatus = pullStatus;
+        this.nextBeginOffset = nextBeginOffset;
+        this.minOffset = minOffset;
+        this.maxOffset = maxOffset;
+        this.msgFoundList = msgFoundList;
+    }
+
+
+    public PullStatus getPullStatus() {
+        return pullStatus;
+    }
+
+
+    public long getNextBeginOffset() {
+        return nextBeginOffset;
+    }
+
+
+    public long getMinOffset() {
+        return minOffset;
+    }
+
+
+    public long getMaxOffset() {
+        return maxOffset;
+    }
+
+
+    public List<MessageExt> getMsgFoundList() {
+        return msgFoundList;
+    }
+
+
+    public void setMsgFoundList(List<MessageExt> msgFoundList) {
+        this.msgFoundList = msgFoundList;
+    }
+
+
+    @Override
+    public String toString() {
+        return "PullResult [pullStatus=" + pullStatus + ", nextBeginOffset=" + nextBeginOffset
+                + ", minOffset=" + minOffset + ", maxOffset=" + maxOffset + ", msgFoundList="
+                + (msgFoundList == null ? 0 : msgFoundList.size()) + "]";
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/consumer/PullStatus.java
----------------------------------------------------------------------
diff --git a/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/consumer/PullStatus.java b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/consumer/PullStatus.java
new file mode 100644
index 0000000..35166f3
--- /dev/null
+++ b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/consumer/PullStatus.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.client.consumer;
+
+/**
+ * @author shijia.wxr
+ */
+public enum PullStatus {
+    /**
+     * Founded
+     */
+    FOUND,
+    /**
+     * No new message can be pull
+     */
+    NO_NEW_MSG,
+    /**
+     * Filtering results can not match
+     */
+    NO_MATCHED_MSG,
+    /**
+     * Illegal offset,may be too big or too small
+     */
+    OFFSET_ILLEGAL
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/consumer/PullTaskCallback.java
----------------------------------------------------------------------
diff --git a/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/consumer/PullTaskCallback.java b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/consumer/PullTaskCallback.java
new file mode 100644
index 0000000..19d5bfc
--- /dev/null
+++ b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/consumer/PullTaskCallback.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.client.consumer;
+
+import com.alibaba.rocketmq.common.message.MessageQueue;
+
+
+public interface PullTaskCallback {
+    public void doPullTask(final MessageQueue mq, final PullTaskContext context);
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/consumer/PullTaskContext.java
----------------------------------------------------------------------
diff --git a/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/consumer/PullTaskContext.java b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/consumer/PullTaskContext.java
new file mode 100644
index 0000000..72c57d6
--- /dev/null
+++ b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/consumer/PullTaskContext.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.client.consumer;
+
+public class PullTaskContext {
+
+    private int pullNextDelayTimeMillis = 200;
+
+    private MQPullConsumer pullConsumer;
+
+
+    public int getPullNextDelayTimeMillis() {
+        return pullNextDelayTimeMillis;
+    }
+
+
+    public void setPullNextDelayTimeMillis(int pullNextDelayTimeMillis) {
+        this.pullNextDelayTimeMillis = pullNextDelayTimeMillis;
+    }
+
+
+    public MQPullConsumer getPullConsumer() {
+        return pullConsumer;
+    }
+
+
+    public void setPullConsumer(MQPullConsumer pullConsumer) {
+        this.pullConsumer = pullConsumer;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/consumer/listener/ConsumeConcurrentlyContext.java
----------------------------------------------------------------------
diff --git a/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/consumer/listener/ConsumeConcurrentlyContext.java b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/consumer/listener/ConsumeConcurrentlyContext.java
new file mode 100644
index 0000000..36fcf19
--- /dev/null
+++ b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/consumer/listener/ConsumeConcurrentlyContext.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.client.consumer.listener;
+
+import com.alibaba.rocketmq.common.message.MessageQueue;
+
+
+/**
+ * Consumer concurrent consumption context
+ *
+ * @author shijia.wxr
+ */
+public class ConsumeConcurrentlyContext {
+    private final MessageQueue messageQueue;
+    /**
+     * Message consume retry strategy<br>
+     * -1,no retry,put into DLQ directly<br>
+     * 0,broker control retry frequency<br>
+     * >0,client control retry frequency
+     */
+    private int delayLevelWhenNextConsume = 0;
+    private int ackIndex = Integer.MAX_VALUE;
+
+    public ConsumeConcurrentlyContext(MessageQueue messageQueue) {
+        this.messageQueue = messageQueue;
+    }
+
+
+    public int getDelayLevelWhenNextConsume() {
+        return delayLevelWhenNextConsume;
+    }
+
+
+    public void setDelayLevelWhenNextConsume(int delayLevelWhenNextConsume) {
+        this.delayLevelWhenNextConsume = delayLevelWhenNextConsume;
+    }
+
+
+    public MessageQueue getMessageQueue() {
+        return messageQueue;
+    }
+
+
+    public int getAckIndex() {
+        return ackIndex;
+    }
+
+
+    public void setAckIndex(int ackIndex) {
+        this.ackIndex = ackIndex;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/consumer/listener/ConsumeConcurrentlyStatus.java
----------------------------------------------------------------------
diff --git a/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/consumer/listener/ConsumeConcurrentlyStatus.java b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/consumer/listener/ConsumeConcurrentlyStatus.java
new file mode 100644
index 0000000..d0d3bf4
--- /dev/null
+++ b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/consumer/listener/ConsumeConcurrentlyStatus.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.client.consumer.listener;
+
+/**
+ * @author shijia.wxr
+ */
+public enum ConsumeConcurrentlyStatus {
+    /**
+     * Success consumption
+     */
+    CONSUME_SUCCESS,
+    /**
+     * Failure consumption,later try to consume
+     */
+    RECONSUME_LATER;
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/consumer/listener/ConsumeOrderlyContext.java
----------------------------------------------------------------------
diff --git a/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/consumer/listener/ConsumeOrderlyContext.java b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/consumer/listener/ConsumeOrderlyContext.java
new file mode 100644
index 0000000..26a3892
--- /dev/null
+++ b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/consumer/listener/ConsumeOrderlyContext.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.client.consumer.listener;
+
+import com.alibaba.rocketmq.common.message.MessageQueue;
+
+
+/**
+ * Consumer Orderly consumption context
+ *
+ * @author shijia.wxr
+ */
+public class ConsumeOrderlyContext {
+    private final MessageQueue messageQueue;
+    private boolean autoCommit = true;
+    private long suspendCurrentQueueTimeMillis = -1;
+
+
+    public ConsumeOrderlyContext(MessageQueue messageQueue) {
+        this.messageQueue = messageQueue;
+    }
+
+
+    public boolean isAutoCommit() {
+        return autoCommit;
+    }
+
+
+    public void setAutoCommit(boolean autoCommit) {
+        this.autoCommit = autoCommit;
+    }
+
+
+    public MessageQueue getMessageQueue() {
+        return messageQueue;
+    }
+
+
+    public long getSuspendCurrentQueueTimeMillis() {
+        return suspendCurrentQueueTimeMillis;
+    }
+
+
+    public void setSuspendCurrentQueueTimeMillis(long suspendCurrentQueueTimeMillis) {
+        this.suspendCurrentQueueTimeMillis = suspendCurrentQueueTimeMillis;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/consumer/listener/ConsumeOrderlyStatus.java
----------------------------------------------------------------------
diff --git a/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/consumer/listener/ConsumeOrderlyStatus.java b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/consumer/listener/ConsumeOrderlyStatus.java
new file mode 100644
index 0000000..e490c5c
--- /dev/null
+++ b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/consumer/listener/ConsumeOrderlyStatus.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.client.consumer.listener;
+
+/**
+ * @author shijia.wxr
+ */
+public enum ConsumeOrderlyStatus {
+    /**
+     * Success consumption
+     */
+    SUCCESS,
+    /**
+     * Rollback consumption(only for binlog consumption)
+     */
+    @Deprecated
+    ROLLBACK,
+    /**
+     * Commit offset(only for binlog consumption)
+     */
+    @Deprecated
+    COMMIT,
+    /**
+     * Suspend current queue a moment
+     */
+    SUSPEND_CURRENT_QUEUE_A_MOMENT;
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/consumer/listener/ConsumeReturnType.java
----------------------------------------------------------------------
diff --git a/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/consumer/listener/ConsumeReturnType.java b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/consumer/listener/ConsumeReturnType.java
new file mode 100644
index 0000000..44f998e
--- /dev/null
+++ b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/consumer/listener/ConsumeReturnType.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package com.alibaba.rocketmq.client.consumer.listener;
+
+/**
+ * Created by alvin on 16-11-30.
+ */
+public enum ConsumeReturnType {
+    /**
+     * consume return success
+     */
+    SUCCESS,
+    /**
+     * consume timeout ,even if success
+     */
+    TIME_OUT,
+    /**
+     * consume throw exception
+     */
+    EXCEPTION,
+    /**
+     * consume return null
+     */
+    RETURNNULL,
+    /**
+     * consume return failed
+     */
+    FAILED
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/consumer/listener/MessageListener.java
----------------------------------------------------------------------
diff --git a/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/consumer/listener/MessageListener.java b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/consumer/listener/MessageListener.java
new file mode 100644
index 0000000..f34946e
--- /dev/null
+++ b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/consumer/listener/MessageListener.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.client.consumer.listener;
+
+/**
+ * A MessageListener object is used to receive asynchronously delivered messages.
+ *
+ * @author shijia.wxr
+ */
+public interface MessageListener {
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/consumer/listener/MessageListenerConcurrently.java
----------------------------------------------------------------------
diff --git a/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/consumer/listener/MessageListenerConcurrently.java b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/consumer/listener/MessageListenerConcurrently.java
new file mode 100644
index 0000000..f0b0c61
--- /dev/null
+++ b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/consumer/listener/MessageListenerConcurrently.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.client.consumer.listener;
+
+import com.alibaba.rocketmq.common.message.MessageExt;
+
+import java.util.List;
+
+
+/**
+ * A MessageListenerConcurrently object is used to receive asynchronously delivered messages concurrently
+ *
+ * @author shijia.wxr
+ */
+public interface MessageListenerConcurrently extends MessageListener {
+    /**
+     * It is not recommend to throw exception,rather than returning ConsumeConcurrentlyStatus.RECONSUME_LATER if consumption failure
+     *
+     * @param msgs
+     *         msgs.size() >= 1<br>
+     *         DefaultMQPushConsumer.consumeMessageBatchMaxSize=1,you can modify here
+     * @param context
+     *
+     * @return The consume status
+     */
+    ConsumeConcurrentlyStatus consumeMessage(final List<MessageExt> msgs,
+                                             final ConsumeConcurrentlyContext context);
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/consumer/listener/MessageListenerOrderly.java
----------------------------------------------------------------------
diff --git a/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/consumer/listener/MessageListenerOrderly.java b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/consumer/listener/MessageListenerOrderly.java
new file mode 100644
index 0000000..d30cdfa
--- /dev/null
+++ b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/consumer/listener/MessageListenerOrderly.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.client.consumer.listener;
+
+import com.alibaba.rocketmq.common.message.MessageExt;
+
+import java.util.List;
+
+
+/**
+ * A MessageListenerConcurrently object is used to receive asynchronously delivered messages orderly.one queue,one thread
+ *
+ * @author shijia.wxr
+ */
+public interface MessageListenerOrderly extends MessageListener {
+    /**
+     * It is not recommend to throw exception,rather than returning ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT if consumption failure
+     *
+     * @param msgs
+     *         msgs.size() >= 1<br>
+     *         DefaultMQPushConsumer.consumeMessageBatchMaxSize=1,you can modify here
+     * @param context
+     *
+     * @return The consume status
+     */
+    ConsumeOrderlyStatus consumeMessage(final List<MessageExt> msgs,
+                                        final ConsumeOrderlyContext context);
+}


Mime
View raw message