rocketmq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From duhengfore...@apache.org
Subject [rocketmq] branch pop_consumer updated: [RIP-19] Pop Consuming (common)
Date Wed, 17 Mar 2021 01:40:21 GMT
This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch pop_consumer
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/pop_consumer by this push:
     new 86bea21  [RIP-19] Pop Consuming (common)
     new 60ad9ab  Merge pull request #2721 from ayanamist/pop_consumer
86bea21 is described below

commit 86bea21b4dc701e6cba5977b09e6f4e874494d19
Author: ayanamist <ayanamist@gmail.com>
AuthorDate: Tue Mar 9 11:04:46 2021 +0800

    [RIP-19] Pop Consuming (common)
---
 .../org/apache/rocketmq/common/BrokerConfig.java   | 154 ++++++++++++
 .../org/apache/rocketmq/common/KeyBuilder.java     |  37 +++
 .../apache/rocketmq/common/PopAckConstants.java    |  35 +++
 .../rocketmq/common/constant/ConsumeInitMode.java  |  22 ++
 .../rocketmq/common/constant/LoggerName.java       |   1 +
 .../rocketmq/common/message/MessageConst.java      |   4 +
 .../rocketmq/common/message/MessageDecoder.java    |   2 +-
 .../common/message/MessageQueueAssignment.java     |  83 +++++++
 .../common/message/MessageRequestMode.java         |  43 ++++
 .../rocketmq/common/protocol/RequestCode.java      |   7 +
 .../rocketmq/common/protocol/ResponseCode.java     |   3 +
 .../common/protocol/body/ConsumerRunningInfo.java  |  32 +++
 .../common/protocol/body/PopProcessQueueInfo.java  |  59 +++++
 .../protocol/body/QueryAssignmentRequestBody.java  |  74 ++++++
 .../protocol/body/QueryAssignmentResponseBody.java |  36 +++
 .../body/SetMessageRequestModeRequestBody.java     |  70 ++++++
 .../protocol/header/AckMessageRequestHeader.java   |  85 +++++++
 .../header/ChangeInvisibleTimeRequestHeader.java   |  97 ++++++++
 .../header/ChangeInvisibleTimeResponseHeader.java  |  61 +++++
 .../common/protocol/header/ExtraInfoUtil.java      | 258 +++++++++++++++++++++
 .../protocol/header/PopMessageRequestHeader.java   | 155 +++++++++++++
 .../protocol/header/PopMessageResponseHeader.java  | 102 ++++++++
 .../rocketmq/common/utils/DataConverter.java       |  42 ++++
 23 files changed, 1461 insertions(+), 1 deletion(-)

diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index d80b3d2..488f213 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -21,6 +21,7 @@ import java.net.UnknownHostException;
 import org.apache.rocketmq.common.annotation.ImportantField;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.constant.PermName;
+import org.apache.rocketmq.common.message.MessageRequestMode;
 import org.apache.rocketmq.common.topic.TopicValidator;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
@@ -62,12 +63,14 @@ public class BrokerConfig {
      */
     private int sendMessageThreadPoolNums = 1; //16 + Runtime.getRuntime().availableProcessors() * 4;
     private int pullMessageThreadPoolNums = 16 + Runtime.getRuntime().availableProcessors() * 2;
+    private int ackMessageThreadPoolNums = 3;
     private int processReplyMessageThreadPoolNums = 16 + Runtime.getRuntime().availableProcessors() * 2;
     private int queryMessageThreadPoolNums = 8 + Runtime.getRuntime().availableProcessors();
 
     private int adminBrokerThreadPoolNums = 16;
     private int clientManageThreadPoolNums = 32;
     private int consumerManageThreadPoolNums = 32;
+    private int loadBalanceProcessorThreadPoolNums = 32;
     private int heartbeatThreadPoolNums = Math.min(32, Runtime.getRuntime().availableProcessors());
 
     /**
@@ -85,6 +88,7 @@ public class BrokerConfig {
     private boolean fetchNamesrvAddrByAddressServer = false;
     private int sendThreadPoolQueueCapacity = 10000;
     private int pullThreadPoolQueueCapacity = 100000;
+    private int ackThreadPoolQueueCapacity = 100000;
     private int replyThreadPoolQueueCapacity = 10000;
     private int queryThreadPoolQueueCapacity = 20000;
     private int clientManagerThreadPoolQueueCapacity = 1000000;
@@ -158,6 +162,37 @@ public class BrokerConfig {
      */
     private int registerNameServerPeriod = 1000 * 30;
 
+    private int popPollingSize = 1024;
+    private int popPollingMapSize = 100000;
+    // 20w cost 200M heap memory.
+    private long maxPopPollingSize = 100000;
+    private int reviveQueueNum = 8;
+    private long reviveInterval = 1000;
+    private long reviveMaxSlow = 3;
+    private long reviveScanTime = 10000;
+    private boolean enablePopLog = true;
+    private boolean enablePopBufferMerge = false;
+    private int popCkStayBufferTime = 10 * 1000;
+    private int popCkStayBufferTimeOut = 3 * 1000;
+    private int popCkMaxBufferSize = 200000;
+    private int popCkOffsetMaxQueueSize = 20000;
+
+    /**
+     * the interval of pulling topic information from the named server
+     */
+    private long loadBalancePollNameServerInterval = 1000 * 30;
+
+    /**
+     * the interval of cleaning
+     */
+    private int cleanOfflineBrokerInterval = 1000 * 30;
+
+    private boolean serverLoadBalancerEnabled = true;
+
+    private MessageRequestMode defaultMessageRequestMode = MessageRequestMode.PULL;
+
+    private int defaultPopShareQueueNum = -1;
+
     /**
      * The minimum time of the transactional message  to be checked firstly, one message only exceed this time interval
      * that can be checked.
@@ -197,6 +232,58 @@ public class BrokerConfig {
         return "DEFAULT_BROKER";
     }
 
+    public long getMaxPopPollingSize() {
+        return maxPopPollingSize;
+    }
+
+    public int getReviveQueueNum() {
+        return reviveQueueNum;
+    }
+
+    public long getReviveInterval() {
+        return reviveInterval;
+    }
+
+    public int getPopCkStayBufferTime() {
+        return popCkStayBufferTime;
+    }
+
+    public int getPopCkStayBufferTimeOut() {
+        return popCkStayBufferTimeOut;
+    }
+
+    public int getPopPollingMapSize() {
+        return popPollingMapSize;
+    }
+
+    public long getReviveScanTime() {
+        return reviveScanTime;
+    }
+
+    public long getReviveMaxSlow() {
+        return reviveMaxSlow;
+    }
+
+    public int getPopPollingSize() {
+        return popPollingSize;
+    }
+
+    public boolean isEnablePopBufferMerge() {
+        return enablePopBufferMerge;
+    }
+
+    public int getPopCkMaxBufferSize() {
+        return popCkMaxBufferSize;
+    }
+
+    public int getPopCkOffsetMaxQueueSize() {
+        return popCkOffsetMaxQueueSize;
+    }
+
+    public boolean isEnablePopLog() {
+        return enablePopLog;
+    }
+
     public boolean isTraceOn() {
         return traceOn;
     }
@@ -381,6 +468,14 @@ public class BrokerConfig {
         this.pullMessageThreadPoolNums = pullMessageThreadPoolNums;
     }
 
+    public int getAckMessageThreadPoolNums() {
+        return ackMessageThreadPoolNums;
+    }
+
+    public void setAckMessageThreadPoolNums(int ackMessageThreadPoolNums) {
+        this.ackMessageThreadPoolNums = ackMessageThreadPoolNums;
+    }
+
     public int getProcessReplyMessageThreadPoolNums() {
         return processReplyMessageThreadPoolNums;
     }
@@ -485,6 +580,14 @@ public class BrokerConfig {
         this.pullThreadPoolQueueCapacity = pullThreadPoolQueueCapacity;
     }
 
+    public int getAckThreadPoolQueueCapacity() {
+        return ackThreadPoolQueueCapacity;
+    }
+
+    public void setAckThreadPoolQueueCapacity(int ackThreadPoolQueueCapacity) {
+        this.ackThreadPoolQueueCapacity = ackThreadPoolQueueCapacity;
+    }
+
     public int getReplyThreadPoolQueueCapacity() {
         return replyThreadPoolQueueCapacity;
     }
@@ -804,4 +907,55 @@ public class BrokerConfig {
     public void setAutoDeleteUnusedStats(boolean autoDeleteUnusedStats) {
         this.autoDeleteUnusedStats = autoDeleteUnusedStats;
     }
+
+
+    public long getLoadBalancePollNameServerInterval() {
+        return loadBalancePollNameServerInterval;
+    }
+
+    public void setLoadBalancePollNameServerInterval(long loadBalancePollNameServerInterval) {
+        this.loadBalancePollNameServerInterval = loadBalancePollNameServerInterval;
+    }
+
+    public int getCleanOfflineBrokerInterval() {
+        return cleanOfflineBrokerInterval;
+    }
+
+    public void setCleanOfflineBrokerInterval(int cleanOfflineBrokerInterval) {
+        this.cleanOfflineBrokerInterval = cleanOfflineBrokerInterval;
+    }
+
+    public int getLoadBalanceProcessorThreadPoolNums() {
+        return loadBalanceProcessorThreadPoolNums;
+    }
+
+    public void setLoadBalanceProcessorThreadPoolNums(int loadBalanceProcessorThreadPoolNums) {
+        this.loadBalanceProcessorThreadPoolNums = loadBalanceProcessorThreadPoolNums;
+    }
+
+    public boolean isServerLoadBalancerEnabled() {
+        return serverLoadBalancerEnabled;
+    }
+
+    public void setServerLoadBalancerEnabled(boolean serverLoadBalancerEnabled) {
+        this.serverLoadBalancerEnabled = serverLoadBalancerEnabled;
+    }
+
+    public MessageRequestMode getDefaultMessageRequestMode() {
+        return defaultMessageRequestMode;
+    }
+
+    public void setDefaultMessageRequestMode(String defaultMessageRequestMode) {
+        this.defaultMessageRequestMode = MessageRequestMode.valueOf(defaultMessageRequestMode);
+    }
+
+
+    public int getDefaultPopShareQueueNum() {
+        return defaultPopShareQueueNum;
+    }
+
+
+    public void setDefaultPopShareQueueNum(int defaultPopShareQueueNum) {
+        this.defaultPopShareQueueNum = defaultPopShareQueueNum;
+    }
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/KeyBuilder.java b/common/src/main/java/org/apache/rocketmq/common/KeyBuilder.java
new file mode 100644
index 0000000..d30789f
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/KeyBuilder.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common;
+
+public class KeyBuilder {
+    public static final int POP_ORDER_REVIVE_QUEUE = 999;
+
+    public static String buildPopRetryTopic(String topic, String cid) {
+        return MixAll.RETRY_GROUP_TOPIC_PREFIX + cid + "_" + topic;
+    }
+
+    public static String parseNormalTopic(String topic, String cid) {
+        if (topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
+            return topic.substring((MixAll.RETRY_GROUP_TOPIC_PREFIX + cid + "_").length());
+        } else {
+            return topic;
+        }
+    }
+
+    public static String buildPollingKey(String topic, String cid, int queueId) {
+        return topic + PopAckConstants.SPLIT + cid + PopAckConstants.SPLIT + queueId;
+    }
+}
diff --git a/common/src/main/java/org/apache/rocketmq/common/PopAckConstants.java b/common/src/main/java/org/apache/rocketmq/common/PopAckConstants.java
new file mode 100644
index 0000000..839f947
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/PopAckConstants.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common;
+
+import org.apache.rocketmq.common.topic.TopicValidator;
+
+public class PopAckConstants {
+    public static long ackTimeInterval = 1000;
+    public static final long SECOND = 1000;
+
+    public static long lockTime = 5000;
+    public static int retryQueueNum = 1;
+
+    public static final String REVIVE_GROUP = MixAll.CID_RMQ_SYS_PREFIX + "REVIVE_GROUP";
+    public static final String LOCAL_HOST = "127.0.0.1";
+    public static final String REVIVE_TOPIC = TopicValidator.SYSTEM_TOPIC_PREFIX + "REVIVE_LOG_";
+    public static final String CK_TAG = "ck";
+    public static final String ACK_TAG = "ack";
+    public static final String SPLIT = "@";
+
+}
diff --git a/common/src/main/java/org/apache/rocketmq/common/constant/ConsumeInitMode.java b/common/src/main/java/org/apache/rocketmq/common/constant/ConsumeInitMode.java
new file mode 100644
index 0000000..b7091fa
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/constant/ConsumeInitMode.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.constant;
+
+public class ConsumeInitMode {
+    public static final int MIN = 0;
+    public static final int MAX = 1;
+}
diff --git a/common/src/main/java/org/apache/rocketmq/common/constant/LoggerName.java b/common/src/main/java/org/apache/rocketmq/common/constant/LoggerName.java
index fe0ae9f..589200b 100644
--- a/common/src/main/java/org/apache/rocketmq/common/constant/LoggerName.java
+++ b/common/src/main/java/org/apache/rocketmq/common/constant/LoggerName.java
@@ -37,4 +37,5 @@ public class LoggerName {
     public static final String PROTECTION_LOGGER_NAME = "RocketmqProtection";
     public static final String WATER_MARK_LOGGER_NAME = "RocketmqWaterMark";
     public static final String FILTER_LOGGER_NAME = "RocketmqFilter";
+    public static final String ROCKETMQ_POP_LOGGER_NAME = "RocketmqPop";
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java b/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java
index 5bdc846..0922c5f 100644
--- a/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java
+++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java
@@ -52,6 +52,8 @@ public class MessageConst {
     public static final String PROPERTY_PUSH_REPLY_TIME = "PUSH_REPLY_TIME";
     public static final String PROPERTY_CLUSTER = "CLUSTER";
     public static final String PROPERTY_MESSAGE_TYPE = "MSG_TYPE";
+    public static final String PROPERTY_POP_CK = "POP_CK";
+    public static final String PROPERTY_FIRST_POP_TIME = "1ST_POP_TIME";
 
     public static final String KEY_SEPARATOR = " ";
 
@@ -80,6 +82,8 @@ public class MessageConst {
         STRING_HASH_SET.add(PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
         STRING_HASH_SET.add(PROPERTY_MAX_RECONSUME_TIMES);
         STRING_HASH_SET.add(PROPERTY_CONSUME_START_TIMESTAMP);
+        STRING_HASH_SET.add(PROPERTY_POP_CK);
+        STRING_HASH_SET.add(PROPERTY_FIRST_POP_TIME);
         STRING_HASH_SET.add(PROPERTY_INSTANCE_ID);
         STRING_HASH_SET.add(PROPERTY_CORRELATION_ID);
         STRING_HASH_SET.add(PROPERTY_MESSAGE_REPLY_TO_CLIENT);
diff --git a/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java b/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java
index 7e86d84..77d3034 100644
--- a/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java
+++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java
@@ -37,7 +37,7 @@ public class MessageDecoder {
     public final static int MESSAGE_MAGIC_CODE_POSTION = 4;
     public final static int MESSAGE_FLAG_POSTION = 16;
     public final static int MESSAGE_PHYSIC_OFFSET_POSTION = 28;
-    //    public final static int MESSAGE_STORE_TIMESTAMP_POSTION = 56;
+    public final static int MESSAGE_STORE_TIMESTAMP_POSITION = 56;
     public final static int MESSAGE_MAGIC_CODE = -626843481;
     public static final char NAME_VALUE_SEPARATOR = 1;
     public static final char PROPERTY_SEPARATOR = 2;
diff --git a/common/src/main/java/org/apache/rocketmq/common/message/MessageQueueAssignment.java b/common/src/main/java/org/apache/rocketmq/common/message/MessageQueueAssignment.java
new file mode 100644
index 0000000..fcd9f58
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageQueueAssignment.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.message;
+
+import java.io.Serializable;
+import java.util.Map;
+
+public class MessageQueueAssignment implements Serializable {
+
+    private static final long serialVersionUID = 8092600270527861645L;
+
+    private MessageQueue messageQueue;
+
+    private MessageRequestMode mode = MessageRequestMode.PULL;
+
+    private Map<String, String> attachments;
+
+    @Override
+    public int hashCode() {
+        final int prime = 31;
+        int result = 1;
+        result = prime * result + ((messageQueue == null) ? 0 : messageQueue.hashCode());
+        result = prime * result + ((mode == null) ? 0 : mode.hashCode());
+        result = prime * result + ((attachments == null) ? 0 : attachments.hashCode());
+        return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj)
+            return true;
+        if (obj == null)
+            return false;
+        if (getClass() != obj.getClass())
+            return false;
+        MessageQueueAssignment other = (MessageQueueAssignment) obj;
+        return messageQueue.equals(other.messageQueue);
+    }
+
+    @Override
+    public String toString() {
+        return "MessageQueueAssignment [MessageQueue=" + messageQueue + ", Mode=" + mode + "]";
+    }
+
+    public MessageQueue getMessageQueue() {
+        return messageQueue;
+    }
+
+    public void setMessageQueue(MessageQueue messageQueue) {
+        this.messageQueue = messageQueue;
+    }
+
+    public MessageRequestMode getMode() {
+        return mode;
+    }
+
+    public void setMode(MessageRequestMode mode) {
+        this.mode = mode;
+    }
+
+    public Map<String, String> getAttachments() {
+        return attachments;
+    }
+
+    public void setAttachments(Map<String, String> attachments) {
+        this.attachments = attachments;
+    }
+
+}
diff --git a/common/src/main/java/org/apache/rocketmq/common/message/MessageRequestMode.java b/common/src/main/java/org/apache/rocketmq/common/message/MessageRequestMode.java
new file mode 100644
index 0000000..35a166a
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageRequestMode.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.message;
+
+/**
+ * Message Request Mode
+ */
+public enum MessageRequestMode {
+
+    /**
+     * pull
+     */
+    PULL("PULL"),
+
+    /**
+     * pop, consumer working in pop mode could share MessageQueue
+     */
+    POP("POP");
+
+    private String name;
+
+    MessageRequestMode(String name) {
+        this.name = name;
+    }
+
+    public String getName() {
+        return name;
+    }
+}
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java b/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
index 75ceff3..9446caa 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
@@ -80,6 +80,10 @@ public class RequestCode {
 
     public static final int GET_BROKER_CLUSTER_ACL_CONFIG = 54;
 
+    public static final int POP_MESSAGE = 200050;
+    public static final int ACK_MESSAGE = 200051;
+    public static final int CHANGE_MESSAGE_INVISIBLETIME = 200053;
+
     public static final int PUT_KV_CONFIG = 100;
 
     public static final int GET_KV_CONFIG = 101;
@@ -188,4 +192,7 @@ public class RequestCode {
     public static final int SEND_REPLY_MESSAGE_V2 = 325;
 
     public static final int PUSH_REPLY_MESSAGE_TO_CLIENT = 326;
+
+    public static final int QUERY_ASSIGNMENT = 400;
+    public static final int SET_MESSAGE_REQUEST_MODE = 401;
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/ResponseCode.java b/common/src/main/java/org/apache/rocketmq/common/protocol/ResponseCode.java
index dc74444..df0ccbe 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/ResponseCode.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/ResponseCode.java
@@ -80,4 +80,7 @@ public class ResponseCode extends RemotingSysResponseCode {
 
     public static final int UPDATE_GLOBAL_WHITE_ADDRS_CONFIG_FAILED = 211;
 
+    public static final int POLLING_FULL = 209;
+
+    public static final int POLLING_TIMEOUT = 210;
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumerRunningInfo.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumerRunningInfo.java
index d7942eb..10d6f4d 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumerRunningInfo.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumerRunningInfo.java
@@ -41,6 +41,8 @@ public class ConsumerRunningInfo extends RemotingSerializable {
 
     private TreeMap<MessageQueue, ProcessQueueInfo> mqTable = new TreeMap<MessageQueue, ProcessQueueInfo>();
 
+    private TreeMap<MessageQueue, PopProcessQueueInfo> mqPopTable = new TreeMap<MessageQueue, PopProcessQueueInfo>();
+
     private TreeMap<String/* Topic */, ConsumeStatus> statusTable = new TreeMap<String, ConsumeStatus>();
 
     private String jstack;
@@ -266,6 +268,28 @@ public class ConsumerRunningInfo extends RemotingSerializable {
         }
 
         {
+            sb.append("\n\n#Consumer Pop Detail#\n");
+            sb.append(String.format("%-32s  %-32s  %-4s  %-20s%n",
+                "#Topic",
+                "#Broker Name",
+                "#QID",
+                "#ProcessQueueInfo"
+            ));
+
+            Iterator<Entry<MessageQueue, PopProcessQueueInfo>> it = this.mqPopTable.entrySet().iterator();
+            while (it.hasNext()) {
+                Entry<MessageQueue, PopProcessQueueInfo> next = it.next();
+                String item = String.format("%-32s  %-32s  %-4d  %s%n",
+                    next.getKey().getTopic(),
+                    next.getKey().getBrokerName(),
+                    next.getKey().getQueueId(),
+                    next.getValue().toString());
+
+                sb.append(item);
+            }
+        }
+
+        {
             sb.append("\n\n#Consumer RT&TPS#\n");
             sb.append(String.format("%-32s  %14s %14s %14s %14s %18s %25s%n",
                 "#Topic",
@@ -310,4 +334,12 @@ public class ConsumerRunningInfo extends RemotingSerializable {
         this.jstack = jstack;
     }
 
+    public TreeMap<MessageQueue, PopProcessQueueInfo> getMqPopTable() {
+        return mqPopTable;
+    }
+
+    public void setMqPopTable(
+        TreeMap<MessageQueue, PopProcessQueueInfo> mqPopTable) {
+        this.mqPopTable = mqPopTable;
+    }
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/PopProcessQueueInfo.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/PopProcessQueueInfo.java
new file mode 100644
index 0000000..b8811bb
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/PopProcessQueueInfo.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.protocol.body;
+
+public class PopProcessQueueInfo {
+    private int waitAckCount;
+    private boolean droped;
+    private long lastPopTimestamp;
+
+
+    public int getWaitAckCount() {
+        return waitAckCount;
+    }
+
+
+    public void setWaitAckCount(int waitAckCount) {
+        this.waitAckCount = waitAckCount;
+    }
+
+
+    public boolean isDroped() {
+        return droped;
+    }
+
+
+    public void setDroped(boolean droped) {
+        this.droped = droped;
+    }
+
+
+    public long getLastPopTimestamp() {
+        return lastPopTimestamp;
+    }
+
+
+    public void setLastPopTimestamp(long lastPopTimestamp) {
+        this.lastPopTimestamp = lastPopTimestamp;
+    }
+
+    @Override
+    public String toString() {
+        return "PopProcessQueueInfo [waitAckCount:" + waitAckCount +
+                ", droped:" + droped + ", lastPopTimestamp:" + lastPopTimestamp + "]";
+    }
+}
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/QueryAssignmentRequestBody.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/QueryAssignmentRequestBody.java
new file mode 100644
index 0000000..6d0285b
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/QueryAssignmentRequestBody.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common.protocol.body;
+
+import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
+import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+
+public class QueryAssignmentRequestBody extends RemotingSerializable {
+
+    private String topic;
+
+    private String consumerGroup;
+
+    private String clientId;
+
+    private String strategyName;
+
+    private MessageModel messageModel;
+
+    public String getTopic() {
+        return topic;
+    }
+
+    public void setTopic(String topic) {
+        this.topic = topic;
+    }
+
+    public String getConsumerGroup() {
+        return consumerGroup;
+    }
+
+    public void setConsumerGroup(String consumerGroup) {
+        this.consumerGroup = consumerGroup;
+    }
+
+    public String getClientId() {
+        return clientId;
+    }
+
+    public void setClientId(String clientId) {
+        this.clientId = clientId;
+    }
+
+    public String getStrategyName() {
+        return strategyName;
+    }
+
+    public void setStrategyName(String strategyName) {
+        this.strategyName = strategyName;
+    }
+
+    public MessageModel getMessageModel() {
+        return messageModel;
+    }
+
+    public void setMessageModel(MessageModel messageModel) {
+        this.messageModel = messageModel;
+    }
+}
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/QueryAssignmentResponseBody.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/QueryAssignmentResponseBody.java
new file mode 100644
index 0000000..688737d
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/QueryAssignmentResponseBody.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common.protocol.body;
+
+import java.util.Set;
+import org.apache.rocketmq.common.message.MessageQueueAssignment;
+import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+
+public class QueryAssignmentResponseBody extends RemotingSerializable {
+
+    private Set<MessageQueueAssignment> messageQueueAssignments;
+
+    public Set<MessageQueueAssignment> getMessageQueueAssignments() {
+        return messageQueueAssignments;
+    }
+
+    public void setMessageQueueAssignments(
+        Set<MessageQueueAssignment> messageQueueAssignments) {
+        this.messageQueueAssignments = messageQueueAssignments;
+    }
+}
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/SetMessageRequestModeRequestBody.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/SetMessageRequestModeRequestBody.java
new file mode 100644
index 0000000..309f7ae
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/SetMessageRequestModeRequestBody.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common.protocol.body;
+
+import org.apache.rocketmq.common.message.MessageRequestMode;
+import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+
+public class SetMessageRequestModeRequestBody extends RemotingSerializable {
+
+    private String topic;
+
+    private String consumerGroup;
+
+    private MessageRequestMode mode = MessageRequestMode.PULL;
+
+    /*
+    consumer working in pop mode could share the MessageQueues assigned to the N (N = popShareQueueNum) consumers following it in the cid list
+     */
+    private int popShareQueueNum = 0;
+
+    public SetMessageRequestModeRequestBody() {
+    }
+
+    public String getTopic() {
+        return topic;
+    }
+
+    public void setTopic(String topic) {
+        this.topic = topic;
+    }
+
+    public String getConsumerGroup() {
+        return consumerGroup;
+    }
+
+    public void setConsumerGroup(String consumerGroup) {
+        this.consumerGroup = consumerGroup;
+    }
+
+    public MessageRequestMode getMode() {
+        return mode;
+    }
+
+    public void setMode(MessageRequestMode mode) {
+        this.mode = mode;
+    }
+
+    public int getPopShareQueueNum() {
+        return popShareQueueNum;
+    }
+
+    public void setPopShareQueueNum(int popShareQueueNum) {
+        this.popShareQueueNum = popShareQueueNum;
+    }
+}
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/AckMessageRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/AckMessageRequestHeader.java
new file mode 100644
index 0000000..02e388b
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/AckMessageRequestHeader.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.protocol.header;
+
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.annotation.CFNotNull;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+
+public class AckMessageRequestHeader implements CommandCustomHeader {
+    @CFNotNull
+    private String consumerGroup;
+    @CFNotNull
+    private String topic;
+    @CFNotNull
+    private Integer queueId;
+    @CFNotNull
+    private String extraInfo;
+
+    @CFNotNull
+    private Long offset;
+
+
+    @Override
+    public void checkFields() throws RemotingCommandException {
+    }
+
+    public void setOffset(Long offset) {
+        this.offset = offset;
+    }
+
+    public Long getOffset() {
+        return offset;
+    }
+
+    public String getConsumerGroup() {
+        return consumerGroup;
+    }
+
+    public void setExtraInfo(String extraInfo) {
+        this.extraInfo = extraInfo;
+    }
+
+    public String getExtraInfo() {
+        return extraInfo;
+    }
+
+    public void setConsumerGroup(String consumerGroup) {
+        this.consumerGroup = consumerGroup;
+    }
+
+    public String getTopic() {
+        return topic;
+    }
+
+    public void setTopic(String topic) {
+        this.topic = topic;
+    }
+
+    public Integer getQueueId() {
+        return queueId;
+    }
+
+    public void setQueueId(Integer queueId) {
+        this.queueId = queueId;
+    }
+
+    @Override
+    public String toString() {
+        return topic + "," + this.consumerGroup + "," + this.queueId + "," + this.offset + "," + this.extraInfo;
+    }
+}
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/ChangeInvisibleTimeRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/ChangeInvisibleTimeRequestHeader.java
new file mode 100644
index 0000000..a586e49
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/ChangeInvisibleTimeRequestHeader.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.protocol.header;
+
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.annotation.CFNotNull;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+
+public class ChangeInvisibleTimeRequestHeader implements CommandCustomHeader {
+    @CFNotNull
+    private String consumerGroup;
+    @CFNotNull
+    private String topic;
+    @CFNotNull
+    private Integer queueId;
+    /**
+     * startOffset popTime invisibleTime queueId
+     */
+    @CFNotNull
+    private String extraInfo;
+
+    @CFNotNull
+    private Long offset;
+
+    @CFNotNull
+    private Long invisibleTime;
+
+    @Override
+    public void checkFields() throws RemotingCommandException {
+    }
+
+    public void setOffset(Long offset) {
+        this.offset = offset;
+    }
+
+    public Long getOffset() {
+        return offset;
+    }
+
+    public Long getInvisibleTime() {
+        return invisibleTime;
+    }
+
+    public void setInvisibleTime(Long invisibleTime) {
+        this.invisibleTime = invisibleTime;
+    }
+
+    public String getConsumerGroup() {
+        return consumerGroup;
+    }
+
+    public void setExtraInfo(String extraInfo) {
+        this.extraInfo = extraInfo;
+    }
+
+    /**
+     * startOffset popTime invisibleTime queueId
+     */
+    public String getExtraInfo() {
+        return extraInfo;
+    }
+
+    public void setConsumerGroup(String consumerGroup) {
+        this.consumerGroup = consumerGroup;
+    }
+
+    public String getTopic() {
+        return topic;
+    }
+
+    public void setTopic(String topic) {
+        this.topic = topic;
+    }
+
+    public Integer getQueueId() {
+        return queueId;
+    }
+
+    public void setQueueId(Integer queueId) {
+        this.queueId = queueId;
+    }
+
+}
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/ChangeInvisibleTimeResponseHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/ChangeInvisibleTimeResponseHeader.java
new file mode 100644
index 0000000..2ebabb7
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/ChangeInvisibleTimeResponseHeader.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.protocol.header;
+
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.annotation.CFNotNull;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+
+public class ChangeInvisibleTimeResponseHeader implements CommandCustomHeader {
+
+
+    @CFNotNull
+    private long popTime;
+    @CFNotNull
+    private long invisibleTime;
+
+    @CFNotNull
+    private int reviveQid;
+
+    @Override
+    public void checkFields() throws RemotingCommandException {
+    }
+
+    public long getPopTime() {
+        return popTime;
+    }
+
+    public void setPopTime(long popTime) {
+        this.popTime = popTime;
+    }
+
+    public long getInvisibleTime() {
+        return invisibleTime;
+    }
+
+    public void setInvisibleTime(long invisibleTime) {
+        this.invisibleTime = invisibleTime;
+    }
+
+    public int getReviveQid() {
+        return reviveQid;
+    }
+
+    public void setReviveQid(int reviveQid) {
+        this.reviveQid = reviveQid;
+    }
+}
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/ExtraInfoUtil.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/ExtraInfoUtil.java
new file mode 100644
index 0000000..19f37f6
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/ExtraInfoUtil.java
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.protocol.header;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.rocketmq.common.KeyBuilder;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.message.MessageConst;
+
+public class ExtraInfoUtil {
+    private static final String NORMAL_TOPIC = "0";
+    private static final String RETRY_TOPIC = "1";
+
+    public static String[] split(String extraInfo) {
+        if (extraInfo == null) {
+            throw new IllegalArgumentException("split extraInfo is null");
+        }
+        return extraInfo.split(MessageConst.KEY_SEPARATOR);
+    }
+
+    public static Long getCkQueueOffset(String[] extraInfoStrs) {
+        if (extraInfoStrs == null || extraInfoStrs.length < 1) {
+            throw new IllegalArgumentException("getCkQueueOffset fail, extraInfoStrs length " + (extraInfoStrs == null ? 0 : extraInfoStrs.length));
+        }
+        return Long.valueOf(extraInfoStrs[0]);
+    }
+
+    public static Long getPopTime(String[] extraInfoStrs) {
+        if (extraInfoStrs == null || extraInfoStrs.length < 2) {
+            throw new IllegalArgumentException("getPopTime fail, extraInfoStrs length " + (extraInfoStrs == null ? 0 : extraInfoStrs.length));
+        }
+        return Long.valueOf(extraInfoStrs[1]);
+    }
+
+    public static Long getInvisibleTime(String[] extraInfoStrs) {
+        if (extraInfoStrs == null || extraInfoStrs.length < 3) {
+            throw new IllegalArgumentException("getInvisibleTime fail, extraInfoStrs length " + (extraInfoStrs == null ? 0 : extraInfoStrs.length));
+        }
+        return Long.valueOf(extraInfoStrs[2]);
+    }
+
+    public static int getReviveQid(String[] extraInfoStrs) {
+        if (extraInfoStrs == null || extraInfoStrs.length < 4) {
+            throw new IllegalArgumentException("getReviveQid fail, extraInfoStrs length " + (extraInfoStrs == null ? 0 : extraInfoStrs.length));
+        }
+        return Integer.valueOf(extraInfoStrs[3]);
+    }
+
+    public static String getRealTopic(String[] extraInfoStrs, String topic, String cid) {
+        if (extraInfoStrs == null || extraInfoStrs.length < 5) {
+            throw new IllegalArgumentException("getRealTopic fail, extraInfoStrs length " + (extraInfoStrs == null ? 0 : extraInfoStrs.length));
+        }
+        if (RETRY_TOPIC.equals(extraInfoStrs[4])) {
+            return KeyBuilder.buildPopRetryTopic(topic, cid);
+        } else {
+            return topic;
+        }
+    }
+
+    public static String getBrokerName(String[] extraInfoStrs) {
+        if (extraInfoStrs == null || extraInfoStrs.length < 6) {
+            throw new IllegalArgumentException("getBrokerName fail, extraInfoStrs length " + (extraInfoStrs == null ? 0 : extraInfoStrs.length));
+        }
+        return extraInfoStrs[5];
+    }
+
+    public static int getQueueId(String[] extraInfoStrs) {
+        if (extraInfoStrs == null || extraInfoStrs.length < 7) {
+            throw new IllegalArgumentException("getQueueId fail, extraInfoStrs length " + (extraInfoStrs == null ? 0 : extraInfoStrs.length));
+        }
+        return Integer.valueOf(extraInfoStrs[6]);
+    }
+
+    public static long getQueueOffset(String[] extraInfoStrs) {
+        if (extraInfoStrs == null || extraInfoStrs.length < 8) {
+            throw new IllegalArgumentException("getQueueOffset fail, extraInfoStrs length " + (extraInfoStrs == null ? 0 : extraInfoStrs.length));
+        }
+        return Long.valueOf(extraInfoStrs[7]);
+    }
+
+    public static String buildExtraInfo(long ckQueueOffset, long popTime, long invisibleTime, int reviveQid, String topic, String brokerName, int queueId) {
+        String t = NORMAL_TOPIC;
+        if (topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
+            t = RETRY_TOPIC;
+        }
+        return ckQueueOffset + MessageConst.KEY_SEPARATOR + popTime + MessageConst.KEY_SEPARATOR + invisibleTime + MessageConst.KEY_SEPARATOR + reviveQid + MessageConst.KEY_SEPARATOR + t
+            + MessageConst.KEY_SEPARATOR + brokerName + MessageConst.KEY_SEPARATOR + queueId;
+    }
+
+    public static String buildExtraInfo(long ckQueueOffset, long popTime, long invisibleTime, int reviveQid, String topic, String brokerName, int queueId,
+                                        long msgQueueOffset) {
+        String t = NORMAL_TOPIC;
+        if (topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
+            t = RETRY_TOPIC;
+        }
+        return ckQueueOffset
+            + MessageConst.KEY_SEPARATOR + popTime + MessageConst.KEY_SEPARATOR + invisibleTime
+            + MessageConst.KEY_SEPARATOR + reviveQid + MessageConst.KEY_SEPARATOR + t
+            + MessageConst.KEY_SEPARATOR + brokerName + MessageConst.KEY_SEPARATOR + queueId
+            + MessageConst.KEY_SEPARATOR + msgQueueOffset;
+    }
+
+    public static void buildStartOffsetInfo(StringBuilder stringBuilder, boolean retry, int queueId, long startOffset) {
+        if (stringBuilder == null) {
+            stringBuilder = new StringBuilder(64);
+        }
+
+        if (stringBuilder.length() > 0) {
+            stringBuilder.append(";");
+        }
+
+        stringBuilder.append(retry ? RETRY_TOPIC : NORMAL_TOPIC)
+            .append(MessageConst.KEY_SEPARATOR).append(queueId)
+            .append(MessageConst.KEY_SEPARATOR).append(startOffset);
+    }
+
+    public static void buildOrderCountInfo(StringBuilder stringBuilder, boolean retry, int queueId, int orderCount) {
+        if (stringBuilder == null) {
+            stringBuilder = new StringBuilder(64);
+        }
+
+        if (stringBuilder.length() > 0) {
+            stringBuilder.append(";");
+        }
+
+        stringBuilder.append(retry ? RETRY_TOPIC : NORMAL_TOPIC)
+                .append(MessageConst.KEY_SEPARATOR).append(queueId)
+                .append(MessageConst.KEY_SEPARATOR).append(orderCount);
+    }
+
+    public static void buildMsgOffsetInfo(StringBuilder stringBuilder, boolean retry, int queueId, List<Long> msgOffsets) {
+        if (stringBuilder == null) {
+            stringBuilder = new StringBuilder(64);
+        }
+
+        if (stringBuilder.length() > 0) {
+            stringBuilder.append(";");
+        }
+
+        stringBuilder.append(retry ? RETRY_TOPIC : NORMAL_TOPIC)
+            .append(MessageConst.KEY_SEPARATOR).append(queueId)
+            .append(MessageConst.KEY_SEPARATOR);
+
+        for (int i = 0; i < msgOffsets.size(); i++) {
+            stringBuilder.append(msgOffsets.get(i));
+            if (i < msgOffsets.size() - 1) {
+                stringBuilder.append(",");
+            }
+        }
+    }
+
+    public static Map<String, List<Long>> parseMsgOffsetInfo(String msgOffsetInfo) {
+        if (msgOffsetInfo == null || msgOffsetInfo.length() == 0) {
+            return null;
+        }
+
+        Map<String, List<Long>> msgOffsetMap = new HashMap<String, List<Long>>(4);
+        String[] array;
+        if (msgOffsetInfo.indexOf(";") < 0) {
+            array = new String[]{msgOffsetInfo};
+        } else {
+            array = msgOffsetInfo.split(";");
+        }
+
+        for (String one : array) {
+            String[] split = one.split(MessageConst.KEY_SEPARATOR);
+            if (split.length != 3) {
+                throw new IllegalArgumentException("parse msgOffsetMap error, " + msgOffsetMap);
+            }
+            String key = split[0] + "@" + split[1];
+            if (msgOffsetMap.containsKey(key)) {
+                throw new IllegalArgumentException("parse msgOffsetMap error, duplicate, " + msgOffsetMap);
+            }
+            msgOffsetMap.put(key, new ArrayList<Long>(8));
+            String[] msgOffsets = split[2].split(",");
+            for (String msgOffset : msgOffsets) {
+                msgOffsetMap.get(key).add(Long.valueOf(msgOffset));
+            }
+        }
+
+        return msgOffsetMap;
+    }
+
+    public static Map<String, Long> parseStartOffsetInfo(String startOffsetInfo) {
+        if (startOffsetInfo == null || startOffsetInfo.length() == 0) {
+            return null;
+        }
+        Map<String, Long> startOffsetMap = new HashMap<String, Long>(4);
+        String[] array;
+        if (startOffsetInfo.indexOf(";") < 0) {
+            array = new String[]{startOffsetInfo};
+        } else {
+            array = startOffsetInfo.split(";");
+        }
+
+        for (String one : array) {
+            String[] split = one.split(MessageConst.KEY_SEPARATOR);
+            if (split.length != 3) {
+                throw new IllegalArgumentException("parse startOffsetInfo error, " + startOffsetInfo);
+            }
+            String key = split[0] + "@" + split[1];
+            if (startOffsetMap.containsKey(key)) {
+                throw new IllegalArgumentException("parse startOffsetInfo error, duplicate, " + startOffsetInfo);
+            }
+            startOffsetMap.put(key, Long.valueOf(split[2]));
+        }
+
+        return startOffsetMap;
+    }
+
+    public static Map<String, Integer> parseOrderCountInfo(String orderCountInfo) {
+        if (orderCountInfo == null || orderCountInfo.length() == 0) {
+            return null;
+        }
+        Map<String, Integer> startOffsetMap = new HashMap<String, Integer>(4);
+        String[] array;
+        if (orderCountInfo.indexOf(";") < 0) {
+            array = new String[]{orderCountInfo};
+        } else {
+            array = orderCountInfo.split(";");
+        }
+
+        for (String one : array) {
+            String[] split = one.split(MessageConst.KEY_SEPARATOR);
+            if (split.length != 3) {
+                throw new IllegalArgumentException("parse orderCountInfo error, " + orderCountInfo);
+            }
+            String key = split[0] + "@" + split[1];
+            if (startOffsetMap.containsKey(key)) {
+                throw new IllegalArgumentException("parse orderCountInfo error, duplicate, " + orderCountInfo);
+            }
+            startOffsetMap.put(key, Integer.valueOf(split[2]));
+        }
+
+        return startOffsetMap;
+    }
+
+    public static String getStartOffsetInfoMapKey(String topic, int queueId) {
+        return (topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) ? RETRY_TOPIC : NORMAL_TOPIC) + "@" + queueId;
+    }
+}
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/PopMessageRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/PopMessageRequestHeader.java
new file mode 100644
index 0000000..4d151a2
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/PopMessageRequestHeader.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.protocol.header;
+
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.annotation.CFNotNull;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+
+public class PopMessageRequestHeader implements CommandCustomHeader {
+    @CFNotNull
+    private String consumerGroup;
+    @CFNotNull
+    private String topic;
+    @CFNotNull
+    private int queueId;
+    @CFNotNull
+    private int maxMsgNums;
+    @CFNotNull
+    private long invisibleTime;
+    @CFNotNull
+    private long pollTime;
+    @CFNotNull
+    private long bornTime;
+    @CFNotNull
+    private int initMode;
+
+    private String expType;
+    private String exp;
+
+    /**
+     * marked as order consume, if true
+     * 1. not commit offset
+     * 2. not pop retry, because no retry
+     * 3. not append check point, because no retry
+     */
+    private Boolean order = Boolean.FALSE;
+
+    @Override
+    public void checkFields() throws RemotingCommandException {
+    }
+
+    public void setInitMode(int initMode) {
+        this.initMode = initMode;
+    }
+
+    public int getInitMode() {
+        return initMode;
+    }
+
+    public long getInvisibleTime() {
+        return invisibleTime;
+    }
+
+    public void setInvisibleTime(long invisibleTime) {
+        this.invisibleTime = invisibleTime;
+    }
+
+    public long getPollTime() {
+        return pollTime;
+    }
+
+    public void setPollTime(long pollTime) {
+        this.pollTime = pollTime;
+    }
+
+    public String getConsumerGroup() {
+        return consumerGroup;
+    }
+
+    public void setConsumerGroup(String consumerGroup) {
+        this.consumerGroup = consumerGroup;
+    }
+
+    public long getBornTime() {
+        return bornTime;
+    }
+
+    public void setBornTime(long bornTime) {
+        this.bornTime = bornTime;
+    }
+
+    public String getTopic() {
+        return topic;
+    }
+
+    public void setTopic(String topic) {
+        this.topic = topic;
+    }
+
+    public int getQueueId() {
+        if (queueId < 0) {
+            return -1;
+        }
+        return queueId;
+    }
+
+    public void setQueueId(int queueId) {
+        this.queueId = queueId;
+    }
+
+
+    public int getMaxMsgNums() {
+        return maxMsgNums;
+    }
+
+    public void setMaxMsgNums(int maxMsgNums) {
+        this.maxMsgNums = maxMsgNums;
+    }
+
+    public boolean isTimeoutTooMuch() {
+        return System.currentTimeMillis() - bornTime - pollTime > 500;
+    }
+
+    public String getExpType() {
+        return expType;
+    }
+
+    public void setExpType(String expType) {
+        this.expType = expType;
+    }
+
+    public String getExp() {
+        return exp;
+    }
+
+    public void setExp(String exp) {
+        this.exp = exp;
+    }
+
+    public Boolean getOrder() {
+        return order;
+    }
+
+    public void setOrder(Boolean order) {
+        this.order = order;
+    }
+
+    public boolean isOrder() {
+        return this.order != null && this.order.booleanValue();
+    }
+}
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/PopMessageResponseHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/PopMessageResponseHeader.java
new file mode 100644
index 0000000..09867f3
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/PopMessageResponseHeader.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.protocol.header;
+
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.annotation.CFNotNull;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+
+public class PopMessageResponseHeader implements CommandCustomHeader {
+
+
+    @CFNotNull
+    private long popTime;
+    @CFNotNull
+    private long invisibleTime;
+
+    @CFNotNull
+    private int reviveQid;
+    /**
+     * the rest num in queue
+     */
+    @CFNotNull
+    private long restNum;
+
+    private String startOffsetInfo;
+    private String msgOffsetInfo;
+    private String orderCountInfo;
+
+    @Override
+    public void checkFields() throws RemotingCommandException {
+    }
+
+    public long getPopTime() {
+        return popTime;
+    }
+
+    public void setPopTime(long popTime) {
+        this.popTime = popTime;
+    }
+
+    public long getInvisibleTime() {
+        return invisibleTime;
+    }
+
+    public long getRestNum() {
+        return restNum;
+    }
+
+    public void setRestNum(long restNum) {
+        this.restNum = restNum;
+    }
+
+    public void setInvisibleTime(long invisibleTime) {
+        this.invisibleTime = invisibleTime;
+    }
+
+    public int getReviveQid() {
+        return reviveQid;
+    }
+
+    public void setReviveQid(int reviveQid) {
+        this.reviveQid = reviveQid;
+    }
+
+    public String getStartOffsetInfo() {
+        return startOffsetInfo;
+    }
+
+    public void setStartOffsetInfo(String startOffsetInfo) {
+        this.startOffsetInfo = startOffsetInfo;
+    }
+
+    public String getMsgOffsetInfo() {
+        return msgOffsetInfo;
+    }
+
+    public void setMsgOffsetInfo(String msgOffsetInfo) {
+        this.msgOffsetInfo = msgOffsetInfo;
+    }
+
+    public String getOrderCountInfo() {
+        return orderCountInfo;
+    }
+
+    public void setOrderCountInfo(String orderCountInfo) {
+        this.orderCountInfo = orderCountInfo;
+    }
+}
diff --git a/common/src/main/java/org/apache/rocketmq/common/utils/DataConverter.java b/common/src/main/java/org/apache/rocketmq/common/utils/DataConverter.java
new file mode 100644
index 0000000..8b50de1
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/utils/DataConverter.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.utils;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+
+public class DataConverter {
+    public static Charset charset = Charset.forName("UTF-8");
+
+    public static byte[] Long2Byte(Long v) {
+        ByteBuffer tmp = ByteBuffer.allocate(8);
+        tmp.putLong(v);
+        return tmp.array();
+    }
+
+    public static int setBit(int value, int index, boolean flag) {
+        if (flag) {
+            return (int) (value | (1L << index));
+        } else {
+            return (int) (value & ~(1L << index));
+        }
+    }
+
+    public static boolean getBit(int value, int index) {
+        return (value & (1L << index)) != 0;
+    }
+}


Mime
View raw message