rocketmq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From yu...@apache.org
Subject [23/43] incubator-rocketmq git commit: Finish code dump. Reviewed by: @yukon @vongosling @stevenschew @vintagewang @lollipop @zander
Date Mon, 19 Dec 2016 09:40:40 GMT
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/message/MessageConst.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/message/MessageConst.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/message/MessageConst.java
new file mode 100644
index 0000000..fc06d6e
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/message/MessageConst.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.common.message;
+
+import java.util.HashSet;
+
+
+/**
+ * String constants naming message properties, together with a set that contains
+ * every one of those names.
+ */
+public class MessageConst {
+    // ---- property names (values are the keys stored in a message's property map) ----
+    public static final String PROPERTY_KEYS = "KEYS";
+    public static final String PROPERTY_TAGS = "TAGS";
+    public static final String PROPERTY_WAIT_STORE_MSG_OK = "WAIT";
+    public static final String PROPERTY_DELAY_TIME_LEVEL = "DELAY";
+    public static final String PROPERTY_RETRY_TOPIC = "RETRY_TOPIC";
+    public static final String PROPERTY_REAL_TOPIC = "REAL_TOPIC";
+    public static final String PROPERTY_REAL_QUEUE_ID = "REAL_QID";
+    public static final String PROPERTY_TRANSACTION_PREPARED = "TRAN_MSG";
+    public static final String PROPERTY_PRODUCER_GROUP = "PGROUP";
+    public static final String PROPERTY_MIN_OFFSET = "MIN_OFFSET";
+    public static final String PROPERTY_MAX_OFFSET = "MAX_OFFSET";
+    public static final String PROPERTY_BUYER_ID = "BUYER_ID";
+    public static final String PROPERTY_ORIGIN_MESSAGE_ID = "ORIGIN_MESSAGE_ID";
+    public static final String PROPERTY_TRANSFER_FLAG = "TRANSFER_FLAG";
+    public static final String PROPERTY_CORRECTION_FLAG = "CORRECTION_FLAG";
+    public static final String PROPERTY_MQ2_FLAG = "MQ2_FLAG";
+    public static final String PROPERTY_RECONSUME_TIME = "RECONSUME_TIME";
+    public static final String PROPERTY_MSG_REGION = "MSG_REGION";
+    public static final String PROPERTY_TRACE_SWITCH = "TRACE_ON";
+    public static final String PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX = "UNIQ_KEY";
+    public static final String PROPERTY_MAX_RECONSUME_TIMES = "MAX_RECONSUME_TIMES";
+    public static final String PROPERTY_CONSUME_START_TIMESTAMP = "CONSUME_START_TIME";
+
+    /**
+     * A single space. NOTE(review): presumably the delimiter between several keys stored
+     * under {@link #PROPERTY_KEYS}; confirm against the callers.
+     */
+    public static final String KEY_SEPARATOR = " ";
+
+    /**
+     * Contains all {@code PROPERTY_*} names declared in this class. It is populated once by
+     * the static initializer below; the set itself is a plain mutable {@link HashSet}.
+     */
+    public static final HashSet<String> STRING_HASH_SET = new HashSet<String>();
+
+
+    static {
+        final String[] propertyNames = {
+            PROPERTY_TRACE_SWITCH,
+            PROPERTY_MSG_REGION,
+            PROPERTY_KEYS,
+            PROPERTY_TAGS,
+            PROPERTY_WAIT_STORE_MSG_OK,
+            PROPERTY_DELAY_TIME_LEVEL,
+            PROPERTY_RETRY_TOPIC,
+            PROPERTY_REAL_TOPIC,
+            PROPERTY_REAL_QUEUE_ID,
+            PROPERTY_TRANSACTION_PREPARED,
+            PROPERTY_PRODUCER_GROUP,
+            PROPERTY_MIN_OFFSET,
+            PROPERTY_MAX_OFFSET,
+            PROPERTY_BUYER_ID,
+            PROPERTY_ORIGIN_MESSAGE_ID,
+            PROPERTY_TRANSFER_FLAG,
+            PROPERTY_CORRECTION_FLAG,
+            PROPERTY_MQ2_FLAG,
+            PROPERTY_RECONSUME_TIME,
+            PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX,
+            PROPERTY_MAX_RECONSUME_TIMES,
+            PROPERTY_CONSUME_START_TIMESTAMP,
+        };
+        for (final String propertyName : propertyNames) {
+            STRING_HASH_SET.add(propertyName);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/message/MessageDecoder.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/message/MessageDecoder.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/message/MessageDecoder.java
new file mode 100644
index 0000000..e21c1ca
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/message/MessageDecoder.java
@@ -0,0 +1,395 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.common.message;
+
+import com.alibaba.rocketmq.common.UtilAll;
+import com.alibaba.rocketmq.common.sysflag.MessageSysFlag;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.net.UnknownHostException;
+import java.nio.BufferUnderflowException;
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+
+/**
+ * Static helpers that convert a {@link MessageExt} to and from its binary layout, build and
+ * parse message id strings, and serialize/deserialize the message property map.
+ *
+ * @author shijia.wxr
+ */
+public class MessageDecoder {
+    /** Bytes packed into a message id: 8 bytes of host (4-byte IP + 4-byte port) and an 8-byte offset. */
+    public final static int MSG_ID_LENGTH = 8 + 8;
+
+    public final static Charset CHARSET_UTF8 = Charset.forName("UTF-8");
+
+    // Byte offsets of individual fields inside the layout written by encode().
+    public final static int MESSAGE_MAGIC_CODE_POSTION = 4;
+    public final static int MESSAGE_FLAG_POSTION = 16;
+    public final static int MESSAGE_PHYSIC_OFFSET_POSTION = 28;
+    public final static int MESSAGE_STORE_TIMESTAMP_POSTION = 56;
+
+    // '+' binds tighter than '^' in Java; the parentheses only make the existing
+    // evaluation order explicit, the value is unchanged.
+    public final static int MESSAGE_MAGIC_CODE = 0xAABBCCDD ^ (1880681586 + 8);
+
+    /** Separates a property name from its value in the serialized property string. */
+    public static final char NAME_VALUE_SEPARATOR = 1;
+    /** Terminates each name/value pair in the serialized property string. */
+    public static final char PROPERTY_SEPARATOR = 2;
+
+
+    /**
+     * Builds a message id by writing {@code addr} followed by {@code offset} into
+     * {@code input}, starting at position 0, and converting the buffer's backing array
+     * with {@link UtilAll#bytes2string(byte[])}.
+     *
+     * @param input scratch buffer; it is flipped and its limit set to {@link #MSG_ID_LENGTH},
+     *              so it needs a backing array with at least that capacity. The whole backing
+     *              array is converted, not just the first {@link #MSG_ID_LENGTH} bytes.
+     * @param addr  host bytes to copy; its remaining bytes are consumed
+     * @param offset value written after the host bytes
+     * @return the string form of {@code input}'s backing array
+     */
+    public static String createMessageId(final ByteBuffer input, final ByteBuffer addr, final long offset) {
+        // flip() rewinds to position 0; the explicit limit then opens exactly MSG_ID_LENGTH bytes.
+        input.flip();
+        input.limit(MessageDecoder.MSG_ID_LENGTH);
+
+        input.put(addr);
+        input.putLong(offset);
+
+        return UtilAll.bytes2string(input.array());
+    }
+
+
+    /**
+     * Builds a message id from a socket address and a hash value.
+     *
+     * @param socketAddress must be an {@link InetSocketAddress}; its raw IP bytes and port are written first
+     * @param transactionIdhashCode written as the trailing 8 bytes
+     * @return the string form of the {@link #MSG_ID_LENGTH}-byte buffer
+     */
+    public static String createMessageId(SocketAddress socketAddress, long transactionIdhashCode) {
+        InetSocketAddress inetSocketAddress = (InetSocketAddress) socketAddress;
+        ByteBuffer idBuffer = ByteBuffer.allocate(MessageDecoder.MSG_ID_LENGTH);
+        idBuffer.put(inetSocketAddress.getAddress().getAddress());
+        idBuffer.putInt(inetSocketAddress.getPort());
+        idBuffer.putLong(transactionIdhashCode);
+        idBuffer.flip();
+        return UtilAll.bytes2string(idBuffer.array());
+    }
+
+
+    /**
+     * Parses a message id string back into its address and offset.
+     * Characters 0-7 are converted to the IP bytes, 8-15 to the port and 16-31 to the offset,
+     * each via {@link UtilAll#string2bytes(String)}.
+     *
+     * @param msgId id string; must be at least 32 characters long
+     * @return the decoded address/offset pair
+     * @throws UnknownHostException if the decoded IP bytes have an illegal length
+     */
+    public static MessageId decodeMessageId(final String msgId) throws UnknownHostException {
+        byte[] ip = UtilAll.string2bytes(msgId.substring(0, 8));
+        byte[] port = UtilAll.string2bytes(msgId.substring(8, 16));
+        int portInt = ByteBuffer.wrap(port).getInt(0);
+        SocketAddress address = new InetSocketAddress(InetAddress.getByAddress(ip), portInt);
+
+        // offset
+        byte[] data = UtilAll.string2bytes(msgId.substring(16, 32));
+        long offset = ByteBuffer.wrap(data).getLong(0);
+
+        return new MessageId(address, offset);
+    }
+
+
+    /** Decodes one message, reading and decompressing the body; produces a plain {@link MessageExt}. */
+    public static MessageExt decode(java.nio.ByteBuffer byteBuffer) {
+        return decode(byteBuffer, true, true, false);
+    }
+
+    /** Decodes one message as a {@code MessageClientExt}, always decompressing a body that is read. */
+    public static MessageExt clientDecode(java.nio.ByteBuffer byteBuffer, final boolean readBody) {
+        return decode(byteBuffer, readBody, true, true);
+    }
+
+    /** Decodes one message into a plain {@link MessageExt}, decompressing the body if it is read. */
+    public static MessageExt decode(java.nio.ByteBuffer byteBuffer, final boolean readBody) {
+        return decode(byteBuffer, readBody, true, false);
+    }
+
+
+    /**
+     * Serializes a message into the binary layout that {@code decode} reads:
+     * TOTALSIZE, MAGICCODE, BODYCRC, QUEUEID, FLAG, QUEUEOFFSET, PHYSICALOFFSET, SYSFLAG,
+     * BORNTIMESTAMP, BORNHOST, STORETIMESTAMP, STOREHOST, RECONSUMETIMES,
+     * prepared transaction offset, body (4-byte length + bytes), topic (1-byte length + bytes)
+     * and properties (2-byte length + bytes).
+     *
+     * @param messageExt   message to serialize; topic, body, born host and store host must be non-null
+     * @param needCompress when true and the message's sys flag has the compressed bit set,
+     *                     the body is compressed before being written
+     * @return the backing array of the buffer the message was written into
+     * @throws IllegalArgumentException if the UTF-8 topic does not fit the signed 1-byte length
+     *                                  field or the serialized properties do not fit the signed
+     *                                  2-byte length field
+     * @throws Exception                if compressing the body fails
+     */
+    public static byte[] encode(MessageExt messageExt, boolean needCompress) throws Exception {
+        byte[] body = messageExt.getBody();
+
+        byte[] topics = messageExt.getTopic().getBytes(CHARSET_UTF8);
+        // The length is stored in one signed byte and decode() reads it back as such; a longer
+        // topic used to be narrowed silently, corrupting the size computation below.
+        if (topics.length > Byte.MAX_VALUE) {
+            throw new IllegalArgumentException("topic is " + topics.length
+                    + " bytes in UTF-8, the length field allows at most " + Byte.MAX_VALUE);
+        }
+        byte topicLen = (byte) topics.length;
+
+        String properties = messageProperties2String(messageExt.getProperties());
+        byte[] propertiesBytes = properties.getBytes(CHARSET_UTF8);
+        // Same reasoning for the signed 2-byte properties length.
+        if (propertiesBytes.length > Short.MAX_VALUE) {
+            throw new IllegalArgumentException("properties are " + propertiesBytes.length
+                    + " bytes in UTF-8, the length field allows at most " + Short.MAX_VALUE);
+        }
+        short propertiesLength = (short) propertiesBytes.length;
+
+        int sysFlag = messageExt.getSysFlag();
+        byte[] newBody = body;
+        if (needCompress && (sysFlag & MessageSysFlag.COMPRESSED_FLAG) == MessageSysFlag.COMPRESSED_FLAG) {
+            newBody = UtilAll.compress(body, 5);
+        }
+        int bodyLength = newBody.length;
+
+        // A positive store size already recorded on the message is used as-is; otherwise the
+        // total is computed from the fixed-width fields plus the three variable-length ones.
+        int storeSize = messageExt.getStoreSize();
+        if (storeSize <= 0) {
+            storeSize = 4 // 1 TOTALSIZE
+                    + 4 // 2 MAGICCODE
+                    + 4 // 3 BODYCRC
+                    + 4 // 4 QUEUEID
+                    + 4 // 5 FLAG
+                    + 8 // 6 QUEUEOFFSET
+                    + 8 // 7 PHYSICALOFFSET
+                    + 4 // 8 SYSFLAG
+                    + 8 // 9 BORNTIMESTAMP
+                    + 8 // 10 BORNHOST
+                    + 8 // 11 STORETIMESTAMP
+                    + 8 // 12 STOREHOSTADDRESS
+                    + 4 // 13 RECONSUMETIMES
+                    + 8 // 14 Prepared Transaction Offset
+                    + 4 + bodyLength // 15 BODY
+                    + 1 + topicLen // 16 TOPIC
+                    + 2 + propertiesLength // 17 properties
+                    + 0;
+        }
+        ByteBuffer byteBuffer = ByteBuffer.allocate(storeSize);
+
+        // 1 TOTALSIZE
+        byteBuffer.putInt(storeSize);
+
+        // 2 MAGICCODE
+        byteBuffer.putInt(MESSAGE_MAGIC_CODE);
+
+        // 3 BODYCRC
+        byteBuffer.putInt(messageExt.getBodyCRC());
+
+        // 4 QUEUEID
+        byteBuffer.putInt(messageExt.getQueueId());
+
+        // 5 FLAG
+        byteBuffer.putInt(messageExt.getFlag());
+
+        // 6 QUEUEOFFSET
+        byteBuffer.putLong(messageExt.getQueueOffset());
+
+        // 7 PHYSICALOFFSET
+        byteBuffer.putLong(messageExt.getCommitLogOffset());
+
+        // 8 SYSFLAG
+        byteBuffer.putInt(sysFlag);
+
+        // 9 BORNTIMESTAMP
+        byteBuffer.putLong(messageExt.getBornTimestamp());
+
+        // 10 BORNHOST
+        writeHostAddress(byteBuffer, messageExt.getBornHost());
+
+        // 11 STORETIMESTAMP
+        byteBuffer.putLong(messageExt.getStoreTimestamp());
+
+        // 12 STOREHOST
+        writeHostAddress(byteBuffer, messageExt.getStoreHost());
+
+        // 13 RECONSUMETIMES
+        byteBuffer.putInt(messageExt.getReconsumeTimes());
+
+        // 14 Prepared Transaction Offset
+        byteBuffer.putLong(messageExt.getPreparedTransactionOffset());
+
+        // 15 BODY
+        byteBuffer.putInt(bodyLength);
+        byteBuffer.put(newBody);
+
+        // 16 TOPIC
+        byteBuffer.put(topicLen);
+        byteBuffer.put(topics);
+
+        // 17 properties
+        byteBuffer.putShort(propertiesLength);
+        byteBuffer.put(propertiesBytes);
+
+        return byteBuffer.array();
+    }
+
+    /** Writes the raw IP bytes of {@code host} followed by its port as a 4-byte int. */
+    private static void writeHostAddress(final ByteBuffer target, final SocketAddress host) {
+        InetSocketAddress inetHost = (InetSocketAddress) host;
+        // NOTE(review): the size computation in encode() reserves 8 bytes per host, i.e. it
+        // assumes a 4-byte (IPv4) address here - confirm IPv6 hosts never reach this path.
+        target.put(inetHost.getAddress().getAddress());
+        target.putInt(inetHost.getPort());
+    }
+
+    /** Reads a 4-byte IP followed by a 4-byte port and returns them as a socket address. */
+    private static InetSocketAddress readHostAddress(final ByteBuffer source) throws UnknownHostException {
+        byte[] ip = new byte[4];
+        source.get(ip, 0, 4);
+        int port = source.getInt();
+        return new InetSocketAddress(InetAddress.getByAddress(ip), port);
+    }
+
+    /** Decodes one message into a plain {@link MessageExt}. */
+    public static MessageExt decode(
+            java.nio.ByteBuffer byteBuffer, final boolean readBody, final boolean deCompressBody) {
+        return decode(byteBuffer, readBody, deCompressBody, false);
+    }
+
+    /**
+     * Decodes one message starting at the buffer's current position, in the layout produced by
+     * {@link #encode(MessageExt, boolean)}. The magic code is read but not checked.
+     *
+     * @param byteBuffer     source; on success its position is left just past the message
+     * @param readBody       when false the body bytes are skipped and no body is set
+     * @param deCompressBody when true, a body that was read is uncompressed if the sys flag
+     *                       has the compressed bit set
+     * @param isClient       when true a {@code MessageClientExt} is created and its offset
+     *                       message id is set as well
+     * @return the decoded message, or {@code null} if anything went wrong (for example a
+     *         {@link BufferUnderflowException} on truncated input); in that case the buffer's
+     *         position is moved to its limit so callers stop iterating
+     */
+    public static MessageExt decode(
+            java.nio.ByteBuffer byteBuffer, final boolean readBody, final boolean deCompressBody, final boolean isClient) {
+        try {
+            MessageExt msgExt = isClient ? new MessageClientExt() : new MessageExt();
+
+            // 1 TOTALSIZE
+            msgExt.setStoreSize(byteBuffer.getInt());
+
+            // 2 MAGICCODE (consumed, not validated)
+            byteBuffer.getInt();
+
+            // 3 BODYCRC
+            msgExt.setBodyCRC(byteBuffer.getInt());
+
+            // 4 QUEUEID
+            msgExt.setQueueId(byteBuffer.getInt());
+
+            // 5 FLAG
+            msgExt.setFlag(byteBuffer.getInt());
+
+            // 6 QUEUEOFFSET
+            msgExt.setQueueOffset(byteBuffer.getLong());
+
+            // 7 PHYSICALOFFSET
+            msgExt.setCommitLogOffset(byteBuffer.getLong());
+
+            // 8 SYSFLAG
+            int sysFlag = byteBuffer.getInt();
+            msgExt.setSysFlag(sysFlag);
+
+            // 9 BORNTIMESTAMP
+            msgExt.setBornTimestamp(byteBuffer.getLong());
+
+            // 10 BORNHOST
+            msgExt.setBornHost(readHostAddress(byteBuffer));
+
+            // 11 STORETIMESTAMP
+            msgExt.setStoreTimestamp(byteBuffer.getLong());
+
+            // 12 STOREHOST
+            msgExt.setStoreHost(readHostAddress(byteBuffer));
+
+            // 13 RECONSUMETIMES
+            msgExt.setReconsumeTimes(byteBuffer.getInt());
+
+            // 14 Prepared Transaction Offset
+            msgExt.setPreparedTransactionOffset(byteBuffer.getLong());
+
+            // 15 BODY
+            int bodyLen = byteBuffer.getInt();
+            if (bodyLen > 0) {
+                if (readBody) {
+                    byte[] body = new byte[bodyLen];
+                    byteBuffer.get(body);
+
+                    // uncompress body
+                    if (deCompressBody && (sysFlag & MessageSysFlag.COMPRESSED_FLAG) == MessageSysFlag.COMPRESSED_FLAG) {
+                        body = UtilAll.uncompress(body);
+                    }
+
+                    msgExt.setBody(body);
+                } else {
+                    // Skip over the body without copying it.
+                    byteBuffer.position(byteBuffer.position() + bodyLen);
+                }
+            }
+
+            // 16 TOPIC (length is a signed byte; a negative value fails the allocation below
+            // and ends up in the catch block)
+            byte topicLen = byteBuffer.get();
+            byte[] topic = new byte[(int) topicLen];
+            byteBuffer.get(topic);
+            msgExt.setTopic(new String(topic, CHARSET_UTF8));
+
+            // 17 properties
+            short propertiesLength = byteBuffer.getShort();
+            if (propertiesLength > 0) {
+                byte[] properties = new byte[propertiesLength];
+                byteBuffer.get(properties);
+                String propertiesString = new String(properties, CHARSET_UTF8);
+                msgExt.setProperties(string2messageProperties(propertiesString));
+            }
+
+            // The message id is derived from the store host and the commit log offset.
+            ByteBuffer byteBufferMsgId = ByteBuffer.allocate(MSG_ID_LENGTH);
+            String msgId = createMessageId(byteBufferMsgId, msgExt.getStoreHostBytes(), msgExt.getCommitLogOffset());
+            msgExt.setMsgId(msgId);
+
+            if (isClient) {
+                ((MessageClientExt) msgExt).setOffsetMsgId(msgId);
+            }
+
+            return msgExt;
+        } catch (Exception e) {
+            // Deliberate best effort: any failure (unknown host, truncated buffer, bad lengths)
+            // marks the buffer as fully consumed and is reported to the caller as null.
+            byteBuffer.position(byteBuffer.limit());
+        }
+
+        return null;
+    }
+
+
+    /** Decodes every message in the buffer, reading bodies. See {@link #decodes(java.nio.ByteBuffer, boolean)}. */
+    public static List<MessageExt> decodes(java.nio.ByteBuffer byteBuffer) {
+        return decodes(byteBuffer, true);
+    }
+
+    /**
+     * Repeatedly calls {@link #clientDecode(java.nio.ByteBuffer, boolean)} until the buffer is
+     * exhausted or a message fails to decode.
+     *
+     * @return the messages decoded so far, in buffer order; never {@code null}
+     */
+    public static List<MessageExt> decodes(java.nio.ByteBuffer byteBuffer, final boolean readBody) {
+        List<MessageExt> msgExts = new ArrayList<MessageExt>();
+        while (byteBuffer.hasRemaining()) {
+            MessageExt msgExt = clientDecode(byteBuffer, readBody);
+            if (null == msgExt) {
+                break;
+            }
+            msgExts.add(msgExt);
+        }
+        return msgExts;
+    }
+
+
+    /**
+     * Serializes a property map as {@code name SEP1 value SEP2} repeated for each entry, where
+     * SEP1 is {@link #NAME_VALUE_SEPARATOR} and SEP2 is {@link #PROPERTY_SEPARATOR}.
+     *
+     * @param properties map to serialize; {@code null} yields an empty string
+     * @return the serialized form; a {@code null} value is written as the text "null"
+     */
+    public static String messageProperties2String(Map<String, String> properties) {
+        StringBuilder sb = new StringBuilder();
+        if (properties != null) {
+            for (final Map.Entry<String, String> entry : properties.entrySet()) {
+                sb.append(entry.getKey());
+                sb.append(NAME_VALUE_SEPARATOR);
+                sb.append(entry.getValue());
+                sb.append(PROPERTY_SEPARATOR);
+            }
+        }
+        return sb.toString();
+    }
+
+    /**
+     * Parses the format written by {@link #messageProperties2String(Map)}.
+     * Only items that split into exactly a name and a value are kept, so an entry whose
+     * value is empty is dropped.
+     *
+     * @param properties serialized properties; {@code null} yields an empty map
+     * @return a new mutable map; never {@code null}
+     */
+    public static Map<String, String> string2messageProperties(final String properties) {
+        Map<String, String> map = new HashMap<String, String>();
+        if (properties == null) {
+            return map;
+        }
+
+        final String nameValueSeparator = String.valueOf(NAME_VALUE_SEPARATOR);
+        // String.split never returns null, so no null checks are needed on its results.
+        for (String item : properties.split(String.valueOf(PROPERTY_SEPARATOR))) {
+            String[] nv = item.split(nameValueSeparator);
+            if (2 == nv.length) {
+                map.put(nv[0], nv[1]);
+            }
+        }
+
+        return map;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/message/MessageExt.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/message/MessageExt.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/message/MessageExt.java
new file mode 100644
index 0000000..627935d
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/message/MessageExt.java
@@ -0,0 +1,238 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.common.message;
+
+import com.alibaba.rocketmq.common.TopicFilterType;
+import com.alibaba.rocketmq.common.sysflag.MessageSysFlag;
+
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+
+
+/**
+ * A {@link Message} extended with the additional fields declared below: queue id and offsets,
+ * store size, sys flag, born/store timestamps and hosts, message id, body CRC, reconsume
+ * count and prepared transaction offset.
+ *
+ * @author shijia.wxr
+ */
+public class MessageExt extends Message {
+    private static final long serialVersionUID = 5720810158625748049L;
+
+    private int queueId;
+
+    private int storeSize;
+
+    private long queueOffset;
+    private int sysFlag;
+    private long bornTimestamp;
+    private SocketAddress bornHost;
+
+    private long storeTimestamp;
+    private SocketAddress storeHost;
+    private String msgId;
+    private long commitLogOffset;
+    private int bodyCRC;
+    private int reconsumeTimes;
+
+    private long preparedTransactionOffset;
+
+
+    /** Creates an instance with every field at its Java default value. */
+    public MessageExt() {
+    }
+
+
+    /** Creates an instance with the given fields set; all remaining fields keep their defaults. */
+    public MessageExt(int queueId, long bornTimestamp, SocketAddress bornHost, long storeTimestamp,
+                      SocketAddress storeHost, String msgId) {
+        this.queueId = queueId;
+        this.bornTimestamp = bornTimestamp;
+        this.bornHost = bornHost;
+        this.storeTimestamp = storeTimestamp;
+        this.storeHost = storeHost;
+        this.msgId = msgId;
+    }
+
+    /**
+     * Maps a sys flag to a filter type.
+     *
+     * @return {@code MULTI_TAG} when the multi-tags bit is set in {@code sysFlag},
+     *         otherwise {@code SINGLE_TAG}
+     */
+    public static TopicFilterType parseTopicFilterType(final int sysFlag) {
+        final boolean multiTags =
+                (sysFlag & MessageSysFlag.MULTI_TAGS_FLAG) == MessageSysFlag.MULTI_TAGS_FLAG;
+        return multiTags ? TopicFilterType.MULTI_TAG : TopicFilterType.SINGLE_TAG;
+    }
+
+    /** Returns the born host as a freshly allocated, flipped 8-byte buffer (IP then port). */
+    public ByteBuffer getBornHostBytes() {
+        return socketAddress2ByteBuffer(this.bornHost);
+    }
+
+    /** Writes the born host (IP then port) into {@code byteBuffer}, flips it and returns it. */
+    public ByteBuffer getBornHostBytes(ByteBuffer byteBuffer) {
+        return socketAddress2ByteBuffer(this.bornHost, byteBuffer);
+    }
+
+    /**
+     * Writes the first four bytes of the address and the port (as an int) into the given
+     * buffer, then flips it.
+     *
+     * @param socketAddress must be an {@link InetSocketAddress}
+     * @param byteBuffer    destination; returned after being flipped
+     */
+    private static ByteBuffer socketAddress2ByteBuffer(final SocketAddress socketAddress, final ByteBuffer byteBuffer) {
+        final InetSocketAddress inet = (InetSocketAddress) socketAddress;
+        final byte[] rawIp = inet.getAddress().getAddress();
+        byteBuffer.put(rawIp, 0, 4);
+        byteBuffer.putInt(inet.getPort());
+        byteBuffer.flip();
+        return byteBuffer;
+    }
+
+    /** Encodes {@code socketAddress} into a new 8-byte buffer: 4 bytes of IP, then the port. */
+    public static ByteBuffer socketAddress2ByteBuffer(SocketAddress socketAddress) {
+        return socketAddress2ByteBuffer(socketAddress, ByteBuffer.allocate(8));
+    }
+
+    /** Returns the store host as a freshly allocated, flipped 8-byte buffer (IP then port). */
+    public ByteBuffer getStoreHostBytes() {
+        return socketAddress2ByteBuffer(this.storeHost);
+    }
+
+    /** Writes the store host (IP then port) into {@code byteBuffer}, flips it and returns it. */
+    public ByteBuffer getStoreHostBytes(ByteBuffer byteBuffer) {
+        return socketAddress2ByteBuffer(this.storeHost, byteBuffer);
+    }
+
+    public int getQueueId() {
+        return this.queueId;
+    }
+
+    public void setQueueId(int queueId) {
+        this.queueId = queueId;
+    }
+
+    public long getBornTimestamp() {
+        return this.bornTimestamp;
+    }
+
+    public void setBornTimestamp(long bornTimestamp) {
+        this.bornTimestamp = bornTimestamp;
+    }
+
+    public SocketAddress getBornHost() {
+        return this.bornHost;
+    }
+
+    public void setBornHost(SocketAddress bornHost) {
+        this.bornHost = bornHost;
+    }
+
+    /** Returns the born host's IP address in textual form, or {@code null} if no born host is set. */
+    public String getBornHostString() {
+        if (this.bornHost == null) {
+            return null;
+        }
+        return ((InetSocketAddress) this.bornHost).getAddress().getHostAddress();
+    }
+
+    /** Returns the born host's host name, or {@code null} if no born host is set. */
+    public String getBornHostNameString() {
+        if (this.bornHost == null) {
+            return null;
+        }
+        return ((InetSocketAddress) this.bornHost).getAddress().getHostName();
+    }
+
+    public long getStoreTimestamp() {
+        return this.storeTimestamp;
+    }
+
+    public void setStoreTimestamp(long storeTimestamp) {
+        this.storeTimestamp = storeTimestamp;
+    }
+
+    public SocketAddress getStoreHost() {
+        return this.storeHost;
+    }
+
+    public void setStoreHost(SocketAddress storeHost) {
+        this.storeHost = storeHost;
+    }
+
+    public String getMsgId() {
+        return this.msgId;
+    }
+
+    public void setMsgId(String msgId) {
+        this.msgId = msgId;
+    }
+
+    public int getSysFlag() {
+        return this.sysFlag;
+    }
+
+    public void setSysFlag(int sysFlag) {
+        this.sysFlag = sysFlag;
+    }
+
+    public int getBodyCRC() {
+        return this.bodyCRC;
+    }
+
+    public void setBodyCRC(int bodyCRC) {
+        this.bodyCRC = bodyCRC;
+    }
+
+    public long getQueueOffset() {
+        return this.queueOffset;
+    }
+
+    public void setQueueOffset(long queueOffset) {
+        this.queueOffset = queueOffset;
+    }
+
+    public long getCommitLogOffset() {
+        return this.commitLogOffset;
+    }
+
+    public void setCommitLogOffset(long physicOffset) {
+        this.commitLogOffset = physicOffset;
+    }
+
+    public int getStoreSize() {
+        return this.storeSize;
+    }
+
+    public void setStoreSize(int storeSize) {
+        this.storeSize = storeSize;
+    }
+
+    public int getReconsumeTimes() {
+        return this.reconsumeTimes;
+    }
+
+
+    public void setReconsumeTimes(int reconsumeTimes) {
+        this.reconsumeTimes = reconsumeTimes;
+    }
+
+
+    public long getPreparedTransactionOffset() {
+        return this.preparedTransactionOffset;
+    }
+
+
+    public void setPreparedTransactionOffset(long preparedTransactionOffset) {
+        this.preparedTransactionOffset = preparedTransactionOffset;
+    }
+
+
+    /** Lists every field of this class followed by the superclass's own string form. */
+    @Override
+    public String toString() {
+        final StringBuilder text = new StringBuilder("MessageExt [queueId=");
+        text.append(queueId);
+        text.append(", storeSize=").append(storeSize);
+        text.append(", queueOffset=").append(queueOffset);
+        text.append(", sysFlag=").append(sysFlag);
+        text.append(", bornTimestamp=").append(bornTimestamp);
+        text.append(", bornHost=").append(bornHost);
+        text.append(", storeTimestamp=").append(storeTimestamp);
+        text.append(", storeHost=").append(storeHost);
+        text.append(", msgId=").append(msgId);
+        text.append(", commitLogOffset=").append(commitLogOffset);
+        text.append(", bodyCRC=").append(bodyCRC);
+        text.append(", reconsumeTimes=").append(reconsumeTimes);
+        text.append(", preparedTransactionOffset=").append(preparedTransactionOffset);
+        text.append(", toString()=").append(super.toString());
+        text.append("]");
+        return text.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/message/MessageId.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/message/MessageId.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/message/MessageId.java
new file mode 100644
index 0000000..d08be86
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/message/MessageId.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.common.message;
+
+import java.net.SocketAddress;
+
+
+/**
+ * Mutable holder pairing a socket address with a long offset.
+ *
+ * @author shijia.wxr
+ */
+public class MessageId {
+    private SocketAddress address;
+    private long offset;
+
+
+    /**
+     * @param address address part of the id
+     * @param offset  offset part of the id
+     */
+    public MessageId(SocketAddress address, long offset) {
+        this.offset = offset;
+        this.address = address;
+    }
+
+
+    /** @return the address currently stored */
+    public SocketAddress getAddress() {
+        return this.address;
+    }
+
+
+    /** Replaces the stored address. */
+    public void setAddress(SocketAddress address) {
+        this.address = address;
+    }
+
+
+    /** @return the offset currently stored */
+    public long getOffset() {
+        return this.offset;
+    }
+
+
+    /** Replaces the stored offset. */
+    public void setOffset(long offset) {
+        this.offset = offset;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/message/MessageQueue.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/message/MessageQueue.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/message/MessageQueue.java
new file mode 100644
index 0000000..35d2827
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/message/MessageQueue.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.common.message;
+
+import java.io.Serializable;
+
+
+/**
+ * Identifies a queue by topic, broker name and queue id. Instances are mutable, serializable,
+ * compared field by field in {@link #equals(Object)} and ordered by topic, then broker name,
+ * then queue id in {@link #compareTo(MessageQueue)}.
+ *
+ * @author shijia.wxr
+ */
+public class MessageQueue implements Comparable<MessageQueue>, Serializable {
+    private static final long serialVersionUID = 6191200464116433425L;
+    private String topic;
+    private String brokerName;
+    private int queueId;
+
+
+    /** Creates an instance with null topic and broker name and queue id 0. */
+    public MessageQueue() {
+
+    }
+
+
+    public MessageQueue(String topic, String brokerName, int queueId) {
+        this.topic = topic;
+        this.brokerName = brokerName;
+        this.queueId = queueId;
+    }
+
+
+    public String getTopic() {
+        return topic;
+    }
+
+
+    public void setTopic(String topic) {
+        this.topic = topic;
+    }
+
+
+    public String getBrokerName() {
+        return brokerName;
+    }
+
+
+    public void setBrokerName(String brokerName) {
+        this.brokerName = brokerName;
+    }
+
+
+    public int getQueueId() {
+        return queueId;
+    }
+
+
+    public void setQueueId(int queueId) {
+        this.queueId = queueId;
+    }
+
+
+    /** Combines broker name, queue id and topic; a null string contributes 0. */
+    @Override
+    public int hashCode() {
+        final int prime = 31;
+        int result = 1;
+        result = prime * result + ((brokerName == null) ? 0 : brokerName.hashCode());
+        result = prime * result + queueId;
+        result = prime * result + ((topic == null) ? 0 : topic.hashCode());
+        return result;
+    }
+
+
+    /**
+     * Two queues are equal when they are of the same class and their broker name, queue id
+     * and topic all match; null strings only match null.
+     */
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (obj == null || getClass() != obj.getClass()) {
+            return false;
+        }
+        MessageQueue other = (MessageQueue) obj;
+        if (queueId != other.queueId) {
+            return false;
+        }
+        return sameText(brokerName, other.brokerName) && sameText(topic, other.topic);
+    }
+
+
+    /** Null-safe string equality: null equals only null. */
+    private static boolean sameText(String left, String right) {
+        return (left == null) ? (right == null) : left.equals(right);
+    }
+
+
+    @Override
+    public String toString() {
+        return "MessageQueue [topic=" + topic + ", brokerName=" + brokerName + ", queueId=" + queueId + "]";
+    }
+
+
+    /**
+     * Orders by topic, then broker name, then queue id.
+     *
+     * @param o the queue to compare with; its topic and broker name, like this instance's,
+     *          must be non-null or a {@link NullPointerException} is thrown
+     * @return a negative value, zero or a positive value as this queue sorts before, equal to
+     *         or after {@code o}; callers must rely on the sign only
+     */
+    @Override
+    public int compareTo(MessageQueue o) {
+        int result = this.topic.compareTo(o.topic);
+        if (result != 0) {
+            return result;
+        }
+
+        result = this.brokerName.compareTo(o.brokerName);
+        if (result != 0) {
+            return result;
+        }
+
+        // Explicit three-way comparison: subtracting the ids overflows when they have
+        // opposite signs and large magnitude, which would report the wrong order.
+        if (this.queueId < o.queueId) {
+            return -1;
+        }
+        return (this.queueId == o.queueId) ? 0 : 1;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/message/MessageQueueForC.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/message/MessageQueueForC.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/message/MessageQueueForC.java
new file mode 100644
index 0000000..a905af6
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/message/MessageQueueForC.java
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package com.alibaba.rocketmq.common.message;
+
+import java.io.Serializable;
+
+
+/**
+ * A message queue coordinate (topic, broker name, queue id) together with an
+ * offset.
+ * <p>
+ * Ordering and equality take all four values into account. The hash code is
+ * derived from broker name, queue id and topic only, which still keeps it
+ * consistent with {@link #equals(Object)} (equal objects hash alike).
+ *
+ * @author lansheng.zj
+ */
+public class MessageQueueForC implements Comparable<MessageQueueForC>, Serializable {
+
+    private static final long serialVersionUID = 5320967846569962104L;
+    private String topic;
+    private String brokerName;
+    private int queueId;
+    private long offset;
+
+
+    /**
+     * @param topic      topic name
+     * @param brokerName broker name
+     * @param queueId    queue id
+     * @param offset     offset stored alongside the queue coordinate
+     */
+    public MessageQueueForC(String topic, String brokerName, int queueId, long offset) {
+        this.topic = topic;
+        this.brokerName = brokerName;
+        this.queueId = queueId;
+        this.offset = offset;
+    }
+
+
+    /**
+     * Orders by topic, then broker name, then queue id, then offset.
+     * <p>
+     * Queue ids and offsets are compared explicitly instead of by subtraction,
+     * because the difference of two values that are far apart overflows and
+     * flips the sign of the result.
+     *
+     * @param o the instance to compare with
+     * @return a negative value, zero or a positive value as this instance sorts
+     *         before, equal to or after {@code o}
+     */
+    @Override
+    public int compareTo(MessageQueueForC o) {
+        int result = this.topic.compareTo(o.topic);
+        if (result != 0) {
+            return result;
+        }
+        result = this.brokerName.compareTo(o.brokerName);
+        if (result != 0) {
+            return result;
+        }
+        if (this.queueId != o.queueId) {
+            return this.queueId < o.queueId ? -1 : 1;
+        }
+        if (this.offset != o.offset) {
+            return this.offset < o.offset ? -1 : 1;
+        }
+        return 0;
+    }
+
+
+    /**
+     * @return a hash of broker name, queue id and topic (the offset is not mixed in)
+     */
+    @Override
+    public int hashCode() {
+        final int prime = 31;
+        int result = 1;
+        result = prime * result + ((brokerName == null) ? 0 : brokerName.hashCode());
+        result = prime * result + queueId;
+        result = prime * result + ((topic == null) ? 0 : topic.hashCode());
+        return result;
+    }
+
+
+    /**
+     * @param obj the object to compare against, may be {@code null}
+     * @return {@code true} if {@code obj} is of the same class and has the same
+     *         broker name, queue id, topic and offset
+     */
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj)
+            return true;
+        if (obj == null)
+            return false;
+        if (getClass() != obj.getClass())
+            return false;
+        MessageQueueForC other = (MessageQueueForC) obj;
+        if (brokerName == null) {
+            if (other.brokerName != null)
+                return false;
+        } else if (!brokerName.equals(other.brokerName))
+            return false;
+        if (queueId != other.queueId)
+            return false;
+        if (topic == null) {
+            if (other.topic != null)
+                return false;
+        } else if (!topic.equals(other.topic))
+            return false;
+
+        if (offset != other.offset) {
+            return false;
+        }
+        return true;
+    }
+
+
+    @Override
+    public String toString() {
+        return "MessageQueueForC [topic=" + topic + ", brokerName=" + brokerName + ", queueId=" + queueId
+                + ", offset=" + offset + "]";
+    }
+
+
+    public String getTopic() {
+        return topic;
+    }
+
+
+    public void setTopic(String topic) {
+        this.topic = topic;
+    }
+
+
+    public String getBrokerName() {
+        return brokerName;
+    }
+
+
+    public void setBrokerName(String brokerName) {
+        this.brokerName = brokerName;
+    }
+
+
+    public int getQueueId() {
+        return queueId;
+    }
+
+
+    public void setQueueId(int queueId) {
+        this.queueId = queueId;
+    }
+
+
+    public long getOffset() {
+        return offset;
+    }
+
+
+    public void setOffset(long offset) {
+        this.offset = offset;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/message/MessageType.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/message/MessageType.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/message/MessageType.java
new file mode 100644
index 0000000..164eb87
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/message/MessageType.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package com.alibaba.rocketmq.common.message;
+
+/**
+ * Enumerates the message kinds known to this module.
+ * <p>
+ * NOTE(review): going by the constant names these are normal messages, the
+ * half and commit stages of transactional messages, and delayed messages.
+ * Confirm against the code that assigns these values.
+ */
+public enum MessageType {
+    Normal_Msg,
+    Trans_Msg_Half,
+    Trans_msg_Commit,
+    Delay_Msg,
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/namesrv/NamesrvConfig.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/namesrv/NamesrvConfig.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/namesrv/NamesrvConfig.java
new file mode 100644
index 0000000..08db357
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/namesrv/NamesrvConfig.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+/**
+ * $Id: NamesrvConfig.java 1839 2013-05-16 02:12:02Z shijia.wxr $
+ */
+package com.alibaba.rocketmq.common.namesrv;
+
+import com.alibaba.rocketmq.common.MixAll;
+import com.alibaba.rocketmq.common.constant.LoggerName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+
+
+/**
+ * Settings holder for the name server: home directory, KV-config and
+ * config-store file locations, product environment name and two boolean
+ * switches. Every value has a default and can be replaced through its setter.
+ *
+ * @author shijia.wxr
+ * @author lansheng.zj
+ */
+public class NamesrvConfig {
+    private static final Logger log = LoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
+
+    /** Read from the system property, falling back to the environment variable. */
+    private String rocketmqHome =
+            System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV));
+    /** Defaults to {@code namesrv/kvConfig.json} under the user's home directory. */
+    private String kvConfigPath =
+            System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "kvConfig.json";
+    /** Defaults to {@code namesrv/namesrv.properties} under the user's home directory. */
+    private String configStorePath =
+            System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "namesrv.properties";
+    private String productEnvName = "center";
+    private boolean clusterTest = false;
+    private boolean orderMessageEnable = false;
+
+    // ---- rocketmqHome ----
+
+    public String getRocketmqHome() {
+        return this.rocketmqHome;
+    }
+
+    public void setRocketmqHome(String rocketmqHome) {
+        this.rocketmqHome = rocketmqHome;
+    }
+
+    // ---- kvConfigPath ----
+
+    public String getKvConfigPath() {
+        return this.kvConfigPath;
+    }
+
+    public void setKvConfigPath(String kvConfigPath) {
+        this.kvConfigPath = kvConfigPath;
+    }
+
+    // ---- configStorePath ----
+
+    public String getConfigStorePath() {
+        return this.configStorePath;
+    }
+
+    public void setConfigStorePath(final String configStorePath) {
+        this.configStorePath = configStorePath;
+    }
+
+    // ---- productEnvName ----
+
+    public String getProductEnvName() {
+        return this.productEnvName;
+    }
+
+    public void setProductEnvName(String productEnvName) {
+        this.productEnvName = productEnvName;
+    }
+
+    // ---- clusterTest ----
+
+    public boolean isClusterTest() {
+        return this.clusterTest;
+    }
+
+    public void setClusterTest(boolean clusterTest) {
+        this.clusterTest = clusterTest;
+    }
+
+    // ---- orderMessageEnable ----
+
+    public boolean isOrderMessageEnable() {
+        return this.orderMessageEnable;
+    }
+
+    public void setOrderMessageEnable(boolean orderMessageEnable) {
+        this.orderMessageEnable = orderMessageEnable;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/namesrv/NamesrvUtil.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/namesrv/NamesrvUtil.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/namesrv/NamesrvUtil.java
new file mode 100644
index 0000000..fcc32d9
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/namesrv/NamesrvUtil.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package com.alibaba.rocketmq.common.namesrv;
+
+/**
+ * Holder for a single name-server related string constant.
+ * <p>
+ * NOTE(review): presumably the constant names the KV-config namespace used for
+ * order-topic configuration. Confirm against the callers.
+ *
+ * @author shijia.wxr
+ */
+public class NamesrvUtil {
+    public static final String NAMESPACE_ORDER_TOPIC_CONFIG = "ORDER_TOPIC_CONFIG";
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/namesrv/RegisterBrokerResult.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/namesrv/RegisterBrokerResult.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/namesrv/RegisterBrokerResult.java
new file mode 100644
index 0000000..68bf44a
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/namesrv/RegisterBrokerResult.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package com.alibaba.rocketmq.common.namesrv;
+
+import com.alibaba.rocketmq.common.protocol.body.KVTable;
+
+
+/**
+ * Plain value holder with three properties: an HA server address, a master
+ * address and a {@link KVTable}. All start out as {@code null}.
+ *
+ * @author shijia.wxr
+ */
+public class RegisterBrokerResult {
+    private String haServerAddr;
+    private String masterAddr;
+    private KVTable kvTable;
+
+    /** @return the HA server address, or {@code null} if none was set */
+    public String getHaServerAddr() {
+        return this.haServerAddr;
+    }
+
+    /** @return the master address, or {@code null} if none was set */
+    public String getMasterAddr() {
+        return this.masterAddr;
+    }
+
+    /** @return the KV table, or {@code null} if none was set */
+    public KVTable getKvTable() {
+        return this.kvTable;
+    }
+
+    public void setHaServerAddr(String haServerAddr) {
+        this.haServerAddr = haServerAddr;
+    }
+
+    public void setMasterAddr(String masterAddr) {
+        this.masterAddr = masterAddr;
+    }
+
+    public void setKvTable(KVTable kvTable) {
+        this.kvTable = kvTable;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/namesrv/TopAddressing.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/namesrv/TopAddressing.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/namesrv/TopAddressing.java
new file mode 100644
index 0000000..2e4ad87
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/namesrv/TopAddressing.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+/**
+ * $Id: TopAddressing.java 1831 2013-05-16 01:39:51Z shijia.wxr $
+ */
+package com.alibaba.rocketmq.common.namesrv;
+
+import com.alibaba.rocketmq.common.MixAll;
+import com.alibaba.rocketmq.common.UtilAll;
+import com.alibaba.rocketmq.common.constant.LoggerName;
+import com.alibaba.rocketmq.common.help.FAQUrl;
+import com.alibaba.rocketmq.common.utils.HttpTinyClient;
+import com.alibaba.rocketmq.common.utils.HttpTinyClient.HttpResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+
+/**
+ * Fetches a name server address string over HTTP from a configured web
+ * service address, optionally qualified by a unit name.
+ *
+ * @author shijia.wxr
+ * @author manhong.yqd
+ */
+public class TopAddressing {
+    private static final Logger log = LoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);
+    /** Only read and written through its accessors; the fetch methods do not update it. */
+    private String nsAddr;
+    private String wsAddr;
+    private String unitName;
+
+
+    /**
+     * @param wsAddr URL to fetch the address from; no unit name is used
+     */
+    public TopAddressing(final String wsAddr) {
+        this(wsAddr, null);
+    }
+
+
+    /**
+     * @param wsAddr   URL to fetch the address from
+     * @param unitName unit name appended to the URL when it is not blank
+     */
+    public TopAddressing(final String wsAddr, final String unitName) {
+        this.wsAddr = wsAddr;
+        this.unitName = unitName;
+    }
+
+    /**
+     * Same as {@code fetchNSAddr(true, 3000)}.
+     *
+     * @return the fetched address, or {@code null} if it could not be fetched
+     */
+    public final String fetchNSAddr() {
+        return fetchNSAddr(true, 3000);
+    }
+
+    /**
+     * Issues an HTTP GET against the configured URL and returns the first line
+     * of the response body.
+     *
+     * @param verbose      whether to log the I/O exception and the trailing
+     *                     "connect to ... failed" hint when nothing was fetched
+     * @param timeoutMills timeout handed to the HTTP client
+     * @return the trimmed first line of the response body on HTTP 200 with a
+     *         non-null body, otherwise {@code null}
+     */
+    public final String fetchNSAddr(boolean verbose, long timeoutMills) {
+        String url = this.wsAddr;
+        try {
+            if (!UtilAll.isBlank(this.unitName)) {
+                url = url + "-" + this.unitName + "?nofix=1";
+            }
+            HttpResult result = HttpTinyClient.httpGet(url, null, null, "UTF-8", timeoutMills);
+            if (200 == result.code) {
+                String responseStr = result.content;
+                if (responseStr != null) {
+                    return clearNewLine(responseStr);
+                } else {
+                    log.error("fetch nameserver address is null");
+                }
+            } else {
+                log.error("fetch nameserver address failed. statusCode={}", result.code);
+            }
+        } catch (IOException e) {
+            if (verbose) {
+                log.error("fetch name server address exception", e);
+            }
+        }
+
+        if (verbose) {
+            String errorMsg =
+                    "connect to " + url + " failed, maybe the domain name " + MixAll.WS_DOMAIN_NAME + " not bind in /etc/hosts";
+            errorMsg += FAQUrl.suggestTodo(FAQUrl.NAME_SERVER_ADDR_NOT_EXIST_URL);
+
+            log.warn(errorMsg);
+        }
+        return null;
+    }
+
+    /**
+     * Trims the text and keeps only what precedes the first line terminator.
+     * <p>
+     * The cut is made at whichever of {@code \r} and {@code \n} occurs first.
+     * Checking {@code \r} before {@code \n} would cut at a later {@code \r}
+     * and leave an earlier {@code \n} inside the returned value.
+     *
+     * @param str text to clean, must not be {@code null}
+     * @return the trimmed first line of {@code str}
+     */
+    private static String clearNewLine(final String str) {
+        final String trimmed = str.trim();
+        final int cr = trimmed.indexOf('\r');
+        final int lf = trimmed.indexOf('\n');
+
+        final int cut;
+        if (cr == -1) {
+            cut = lf;
+        } else if (lf == -1) {
+            cut = cr;
+        } else {
+            cut = Math.min(cr, lf);
+        }
+
+        return (cut == -1) ? trimmed : trimmed.substring(0, cut);
+    }
+
+    public String getNsAddr() {
+        return nsAddr;
+    }
+
+
+    public void setNsAddr(String nsAddr) {
+        this.nsAddr = nsAddr;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/MQProtosHelper.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/MQProtosHelper.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/MQProtosHelper.java
new file mode 100644
index 0000000..aaaa51d
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/MQProtosHelper.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package com.alibaba.rocketmq.common.protocol;
+
+import com.alibaba.rocketmq.common.protocol.header.namesrv.RegisterBrokerRequestHeader;
+import com.alibaba.rocketmq.remoting.common.RemotingHelper;
+import com.alibaba.rocketmq.remoting.exception.RemotingConnectException;
+import com.alibaba.rocketmq.remoting.exception.RemotingSendRequestException;
+import com.alibaba.rocketmq.remoting.exception.RemotingTimeoutException;
+import com.alibaba.rocketmq.remoting.protocol.RemotingCommand;
+
+
+/**
+ * Helper for registering a broker address with a name server.
+ *
+ * @author shijia.wxr
+ */
+public class MQProtosHelper {
+    /**
+     * Sends a synchronous {@code REGISTER_BROKER} request carrying
+     * {@code brokerAddr} to {@code nsaddr}.
+     * <p>
+     * Failures are not propagated: each caught exception has its stack trace
+     * printed and the method returns {@code false}. When the call is
+     * interrupted, the thread's interrupt status is restored so that callers
+     * can still observe the interruption.
+     *
+     * @param nsaddr        address the request is sent to
+     * @param brokerAddr    broker address placed in the request header
+     * @param timeoutMillis timeout handed to the synchronous invocation
+     * @return {@code true} only if a response arrived and its code equals
+     *         {@code ResponseCode.SUCCESS}
+     */
+    public static boolean registerBrokerToNameServer(final String nsaddr, final String brokerAddr,
+                                                     final long timeoutMillis) {
+        RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();
+        requestHeader.setBrokerAddr(brokerAddr);
+
+        RemotingCommand request =
+                RemotingCommand.createRequestCommand(RequestCode.REGISTER_BROKER, requestHeader);
+
+        try {
+            RemotingCommand response = RemotingHelper.invokeSync(nsaddr, request, timeoutMillis);
+            if (response != null) {
+                return ResponseCode.SUCCESS == response.getCode();
+            }
+        } catch (RemotingConnectException e) {
+            e.printStackTrace();
+        } catch (RemotingSendRequestException e) {
+            e.printStackTrace();
+        } catch (RemotingTimeoutException e) {
+            e.printStackTrace();
+        } catch (InterruptedException e) {
+            // Catching InterruptedException clears the flag; set it again for the caller.
+            Thread.currentThread().interrupt();
+            e.printStackTrace();
+        }
+
+        return false;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/RequestCode.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/RequestCode.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/RequestCode.java
new file mode 100644
index 0000000..a8b8698
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/RequestCode.java
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package com.alibaba.rocketmq.common.protocol;
+
+/**
+ * Integer request codes, listed in ascending numeric order and grouped by
+ * value range. Names and values are part of the public interface and are kept
+ * exactly as they were, including unusual spellings.
+ */
+public class RequestCode {
+
+    // ---- codes 10 - 45 ----
+    public static final int SEND_MESSAGE = 10;
+    public static final int PULL_MESSAGE = 11;
+    public static final int QUERY_MESSAGE = 12;
+    public static final int QUERY_BROKER_OFFSET = 13;
+    public static final int QUERY_CONSUMER_OFFSET = 14;
+    public static final int UPDATE_CONSUMER_OFFSET = 15;
+    public static final int UPDATE_AND_CREATE_TOPIC = 17;
+    public static final int GET_ALL_TOPIC_CONFIG = 21;
+    public static final int GET_TOPIC_CONFIG_LIST = 22;
+    public static final int GET_TOPIC_NAME_LIST = 23;
+    public static final int UPDATE_BROKER_CONFIG = 25;
+    public static final int GET_BROKER_CONFIG = 26;
+    public static final int TRIGGER_DELETE_FILES = 27;
+    public static final int GET_BROKER_RUNTIME_INFO = 28;
+    public static final int SEARCH_OFFSET_BY_TIMESTAMP = 29;
+    public static final int GET_MAX_OFFSET = 30;
+    public static final int GET_MIN_OFFSET = 31;
+    public static final int GET_EARLIEST_MSG_STORETIME = 32;
+    public static final int VIEW_MESSAGE_BY_ID = 33;
+    public static final int HEART_BEAT = 34;
+    public static final int UNREGISTER_CLIENT = 35;
+    public static final int CONSUMER_SEND_MSG_BACK = 36;
+    public static final int END_TRANSACTION = 37;
+    public static final int GET_CONSUMER_LIST_BY_GROUP = 38;
+    public static final int CHECK_TRANSACTION_STATE = 39;
+    public static final int NOTIFY_CONSUMER_IDS_CHANGED = 40;
+    public static final int LOCK_BATCH_MQ = 41;
+    public static final int UNLOCK_BATCH_MQ = 42;
+    public static final int GET_ALL_CONSUMER_OFFSET = 43;
+    public static final int GET_ALL_DELAY_OFFSET = 45;
+
+    // ---- codes 100 - 106 ----
+    public static final int PUT_KV_CONFIG = 100;
+    public static final int GET_KV_CONFIG = 101;
+    public static final int DELETE_KV_CONFIG = 102;
+    public static final int REGISTER_BROKER = 103;
+    public static final int UNREGISTER_BROKER = 104;
+    public static final int GET_ROUTEINTO_BY_TOPIC = 105;
+    public static final int GET_BROKER_CLUSTER_INFO = 106;
+
+    // ---- codes 200 - 224 ----
+    public static final int UPDATE_AND_CREATE_SUBSCRIPTIONGROUP = 200;
+    public static final int GET_ALL_SUBSCRIPTIONGROUP_CONFIG = 201;
+    public static final int GET_TOPIC_STATS_INFO = 202;
+    public static final int GET_CONSUMER_CONNECTION_LIST = 203;
+    public static final int GET_PRODUCER_CONNECTION_LIST = 204;
+    public static final int WIPE_WRITE_PERM_OF_BROKER = 205;
+    public static final int GET_ALL_TOPIC_LIST_FROM_NAMESERVER = 206;
+    public static final int DELETE_SUBSCRIPTIONGROUP = 207;
+    public static final int GET_CONSUME_STATS = 208;
+    public static final int SUSPEND_CONSUMER = 209;
+    public static final int RESUME_CONSUMER = 210;
+    public static final int RESET_CONSUMER_OFFSET_IN_CONSUMER = 211;
+    public static final int RESET_CONSUMER_OFFSET_IN_BROKER = 212;
+    public static final int ADJUST_CONSUMER_THREAD_POOL = 213;
+    public static final int WHO_CONSUME_THE_MESSAGE = 214;
+    public static final int DELETE_TOPIC_IN_BROKER = 215;
+    public static final int DELETE_TOPIC_IN_NAMESRV = 216;
+    public static final int GET_KVLIST_BY_NAMESPACE = 219;
+    public static final int RESET_CONSUMER_CLIENT_OFFSET = 220;
+    public static final int GET_CONSUMER_STATUS_FROM_CLIENT = 221;
+    public static final int INVOKE_BROKER_TO_RESET_OFFSET = 222;
+    public static final int INVOKE_BROKER_TO_GET_CONSUMER_STATUS = 223;
+    public static final int GET_TOPICS_BY_CLUSTER = 224;
+
+    // ---- codes 300 - 319 ----
+    public static final int QUERY_TOPIC_CONSUME_BY_WHO = 300;
+    public static final int REGISTER_FILTER_SERVER = 301;
+    public static final int REGISTER_MESSAGE_FILTER_CLASS = 302;
+    public static final int QUERY_CONSUME_TIME_SPAN = 303;
+    public static final int GET_SYSTEM_TOPIC_LIST_FROM_NS = 304;
+    public static final int GET_SYSTEM_TOPIC_LIST_FROM_BROKER = 305;
+    public static final int CLEAN_EXPIRED_CONSUMEQUEUE = 306;
+    public static final int GET_CONSUMER_RUNNING_INFO = 307;
+    public static final int QUERY_CORRECTION_OFFSET = 308;
+    public static final int CONSUME_MESSAGE_DIRECTLY = 309;
+    public static final int SEND_MESSAGE_V2 = 310;
+    public static final int GET_UNIT_TOPIC_LIST = 311;
+    public static final int GET_HAS_UNIT_SUB_TOPIC_LIST = 312;
+    public static final int GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST = 313;
+    public static final int CLONE_GROUP_OFFSET = 314;
+    public static final int VIEW_BROKER_STATS_DATA = 315;
+    public static final int CLEAN_UNUSED_TOPIC = 316;
+    public static final int GET_BROKER_CONSUME_STATS = 317;
+
+    /**
+     * update the config of name server
+     */
+    public static final int UPDATE_NAMESRV_CONFIG = 318;
+
+    /**
+     * get config from name server
+     */
+    public static final int GET_NAMESRV_CONFIG = 319;
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/ResponseCode.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/ResponseCode.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/ResponseCode.java
new file mode 100644
index 0000000..3c01fad
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/ResponseCode.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package com.alibaba.rocketmq.common.protocol;
+
+import com.alibaba.rocketmq.remoting.protocol.RemotingSysResponseCode;
+
+
+/**
+ * Integer response codes declared on top of those inherited from
+ * {@link RemotingSysResponseCode}, grouped by value range.
+ */
+public class ResponseCode extends RemotingSysResponseCode {
+
+    // ---- codes 10 - 26 ----
+    public static final int FLUSH_DISK_TIMEOUT = 10;
+    public static final int SLAVE_NOT_AVAILABLE = 11;
+    public static final int FLUSH_SLAVE_TIMEOUT = 12;
+    public static final int MESSAGE_ILLEGAL = 13;
+    public static final int SERVICE_NOT_AVAILABLE = 14;
+    public static final int VERSION_NOT_SUPPORTED = 15;
+    public static final int NO_PERMISSION = 16;
+    public static final int TOPIC_NOT_EXIST = 17;
+    public static final int TOPIC_EXIST_ALREADY = 18;
+    public static final int PULL_NOT_FOUND = 19;
+    public static final int PULL_RETRY_IMMEDIATELY = 20;
+    public static final int PULL_OFFSET_MOVED = 21;
+    public static final int QUERY_NOT_FOUND = 22;
+    public static final int SUBSCRIPTION_PARSE_FAILED = 23;
+    public static final int SUBSCRIPTION_NOT_EXIST = 24;
+    public static final int SUBSCRIPTION_NOT_LATEST = 25;
+    public static final int SUBSCRIPTION_GROUP_NOT_EXIST = 26;
+
+    // ---- codes 200 - 208 ----
+    public static final int TRANSACTION_SHOULD_COMMIT = 200;
+    public static final int TRANSACTION_SHOULD_ROLLBACK = 201;
+    public static final int TRANSACTION_STATE_UNKNOW = 202;
+    public static final int TRANSACTION_STATE_GROUP_WRONG = 203;
+    public static final int NO_BUYER_ID = 204;
+    public static final int NOT_IN_CURRENT_UNIT = 205;
+    public static final int CONSUMER_NOT_ONLINE = 206;
+    public static final int CONSUME_MSG_TIMEOUT = 207;
+    public static final int NO_MESSAGE = 208;
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/BrokerStatsData.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/BrokerStatsData.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/BrokerStatsData.java
new file mode 100644
index 0000000..6f51b06
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/BrokerStatsData.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package com.alibaba.rocketmq.common.protocol.body;
+
+import com.alibaba.rocketmq.remoting.protocol.RemotingSerializable;
+
+
+/**
+ * Serializable holder for three {@link BrokerStatsItem} values, one each for
+ * the minute, hour and day properties. All start out as {@code null}.
+ */
+public class BrokerStatsData extends RemotingSerializable {
+
+    private BrokerStatsItem statsMinute;
+    private BrokerStatsItem statsHour;
+    private BrokerStatsItem statsDay;
+
+    /** @return the minute item, or {@code null} if none was set */
+    public BrokerStatsItem getStatsMinute() {
+        return this.statsMinute;
+    }
+
+    /** @return the hour item, or {@code null} if none was set */
+    public BrokerStatsItem getStatsHour() {
+        return this.statsHour;
+    }
+
+    /** @return the day item, or {@code null} if none was set */
+    public BrokerStatsItem getStatsDay() {
+        return this.statsDay;
+    }
+
+    public void setStatsMinute(BrokerStatsItem statsMinute) {
+        this.statsMinute = statsMinute;
+    }
+
+    public void setStatsHour(BrokerStatsItem statsHour) {
+        this.statsHour = statsHour;
+    }
+
+    public void setStatsDay(BrokerStatsItem statsDay) {
+        this.statsDay = statsDay;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/BrokerStatsItem.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/BrokerStatsItem.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/BrokerStatsItem.java
new file mode 100644
index 0000000..1cf6c3d
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/BrokerStatsItem.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package com.alibaba.rocketmq.common.protocol.body;
+/** Plain holder for one statistics sample: a long {@code sum} and two doubles, {@code tps} and {@code avgpt}. */
+public class BrokerStatsItem {
+    private long sum;
+    private double tps; // NOTE(review): presumably a per-second rate -- confirm against the code that sets it
+    private double avgpt; // NOTE(review): presumably some average; what "pt" stands for is not visible here -- confirm
+
+
+    public long getSum() {
+        return sum;
+    }
+
+
+    public void setSum(long sum) {
+        this.sum = sum;
+    }
+
+
+    public double getTps() {
+        return tps;
+    }
+
+
+    public void setTps(double tps) {
+        this.tps = tps;
+    }
+
+
+    public double getAvgpt() {
+        return avgpt;
+    }
+
+
+    public void setAvgpt(double avgpt) {
+        this.avgpt = avgpt;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/CMResult.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/CMResult.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/CMResult.java
new file mode 100644
index 0000000..873b548
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/CMResult.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package com.alibaba.rocketmq.common.protocol.body;
+/** Result codes; within this package they are the type of {@code ConsumeMessageDirectlyResult.consumeResult}. */
+public enum CMResult {
+    CR_SUCCESS,
+    CR_LATER,
+    CR_ROLLBACK,
+    CR_COMMIT,
+    CR_THROW_EXCEPTION,
+    CR_RETURN_NULL, // the trailing comma after the last constant is legal Java
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ClusterInfo.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ClusterInfo.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ClusterInfo.java
new file mode 100644
index 0000000..81d6447
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ClusterInfo.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package com.alibaba.rocketmq.common.protocol.body;
+
+import com.alibaba.rocketmq.common.protocol.route.BrokerData;
+import com.alibaba.rocketmq.remoting.protocol.RemotingSerializable;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Set;
+
+
+/**
+ * Cluster view body: broker data keyed by broker name, and broker names grouped by cluster name.
+ * Neither table has an initializer, so the retrieve* helpers treat an unset table as empty.
+ *
+ * @author shijia.wxr
+ */
+public class ClusterInfo extends RemotingSerializable {
+    private HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
+    private HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
+
+    public HashMap<String, BrokerData> getBrokerAddrTable() {
+        return brokerAddrTable;
+    }
+
+    public void setBrokerAddrTable(HashMap<String, BrokerData> brokerAddrTable) {
+        this.brokerAddrTable = brokerAddrTable;
+    }
+
+    public HashMap<String, Set<String>> getClusterAddrTable() {
+        return clusterAddrTable;
+    }
+
+    public void setClusterAddrTable(HashMap<String, Set<String>> clusterAddrTable) {
+        this.clusterAddrTable = clusterAddrTable;
+    }
+
+    /** Returns every broker address of {@code cluster}; empty, never null, when nothing is known. */
+    public String[] retrieveAllAddrByCluster(String cluster) {
+        List<String> addrs = new ArrayList<String>();
+        // One lookup replaces containsKey+get and also covers an unset table or a null name set.
+        Set<String> brokerNames = null == clusterAddrTable ? null : clusterAddrTable.get(cluster);
+        if (null != brokerNames && null != brokerAddrTable) {
+            for (String brokerName : brokerNames) {
+                BrokerData brokerData = brokerAddrTable.get(brokerName);
+                if (null != brokerData) {
+                    addrs.addAll(brokerData.getBrokerAddrs().values());
+                }
+            }
+        }
+
+        return addrs.toArray(new String[0]);
+    }
+
+    /** Returns all cluster names; empty, never null, when the cluster table is unset. */
+    public String[] retrieveAllClusterNames() {
+        return null == clusterAddrTable ? new String[0] : clusterAddrTable.keySet().toArray(new String[0]);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/Connection.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/Connection.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/Connection.java
new file mode 100644
index 0000000..72cf601
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/Connection.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package com.alibaba.rocketmq.common.protocol.body;
+
+import com.alibaba.rocketmq.remoting.protocol.LanguageCode;
+
+
+/** Plain data holder for one client's id, address, {@link LanguageCode} and integer version.
+ * @author shijia.wxr
+ */
+public class Connection {
+    private String clientId;
+    private String clientAddr; // NOTE(review): address format (e.g. host:port) is not visible here -- confirm
+    private LanguageCode language;
+    private int version; // NOTE(review): which version this encodes is not visible here -- confirm
+
+
+    public String getClientId() {
+        return clientId;
+    }
+
+
+    public void setClientId(String clientId) {
+        this.clientId = clientId;
+    }
+
+
+    public String getClientAddr() {
+        return clientAddr;
+    }
+
+
+    public void setClientAddr(String clientAddr) {
+        this.clientAddr = clientAddr;
+    }
+
+
+    public LanguageCode getLanguage() {
+        return language;
+    }
+
+
+    public void setLanguage(LanguageCode language) {
+        this.language = language;
+    }
+
+
+    public int getVersion() {
+        return version;
+    }
+
+
+    public void setVersion(int version) {
+        this.version = version;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ConsumeByWho.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ConsumeByWho.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ConsumeByWho.java
new file mode 100644
index 0000000..8a69352
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ConsumeByWho.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.common.protocol.body;
+
+import com.alibaba.rocketmq.remoting.protocol.RemotingSerializable;
+
+import java.util.HashSet;
+
+
+/**
+ * Body (extends RemotingSerializable) holding two group-name sets plus a topic, queue id and offset.
+ * @author shijia.wxr
+ */
+public class ConsumeByWho extends RemotingSerializable {
+    private HashSet<String> consumedGroup = new HashSet<String>(); // starts empty, never null unless a setter passes null
+    private HashSet<String> notConsumedGroup = new HashSet<String>(); // starts empty, same caveat as above
+    private String topic;
+    private int queueId;
+    private long offset; // NOTE(review): presumably the queue offset the two sets refer to -- confirm with callers
+
+
+    public HashSet<String> getConsumedGroup() {
+        return consumedGroup;
+    }
+
+
+    public void setConsumedGroup(HashSet<String> consumedGroup) {
+        this.consumedGroup = consumedGroup;
+    }
+
+
+    public HashSet<String> getNotConsumedGroup() {
+        return notConsumedGroup;
+    }
+
+
+    public void setNotConsumedGroup(HashSet<String> notConsumedGroup) {
+        this.notConsumedGroup = notConsumedGroup;
+    }
+
+
+    public String getTopic() {
+        return topic;
+    }
+
+
+    public void setTopic(String topic) {
+        this.topic = topic;
+    }
+
+
+    public int getQueueId() {
+        return queueId;
+    }
+
+
+    public void setQueueId(int queueId) {
+        this.queueId = queueId;
+    }
+
+
+    public long getOffset() {
+        return offset;
+    }
+
+
+    public void setOffset(long offset) {
+        this.offset = offset;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ConsumeMessageDirectlyResult.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ConsumeMessageDirectlyResult.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ConsumeMessageDirectlyResult.java
new file mode 100644
index 0000000..c895fe2
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ConsumeMessageDirectlyResult.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package com.alibaba.rocketmq.common.protocol.body;
+
+import com.alibaba.rocketmq.remoting.protocol.RemotingSerializable;
+
+/** Body (extends RemotingSerializable) holding a {@link CMResult}, two flags, a remark and a time value. */
+public class ConsumeMessageDirectlyResult extends RemotingSerializable {
+    private boolean order = false; // defaults to false
+    private boolean autoCommit = true; // defaults to true
+    private CMResult consumeResult; // null until setConsumeResult is called
+    private String remark;
+    private long spentTimeMills; // NOTE(review): name suggests elapsed milliseconds -- confirm with the code that sets it
+
+
+    public boolean isOrder() {
+        return order;
+    }
+
+
+    public void setOrder(boolean order) {
+        this.order = order;
+    }
+
+
+    public boolean isAutoCommit() {
+        return autoCommit;
+    }
+
+
+    public void setAutoCommit(boolean autoCommit) {
+        this.autoCommit = autoCommit;
+    }
+
+
+    public String getRemark() {
+        return remark;
+    }
+
+
+    public void setRemark(String remark) {
+        this.remark = remark;
+    }
+
+
+    public CMResult getConsumeResult() {
+        return consumeResult;
+    }
+
+
+    public void setConsumeResult(CMResult consumeResult) {
+        this.consumeResult = consumeResult;
+    }
+
+
+    public long getSpentTimeMills() {
+        return spentTimeMills;
+    }
+
+
+    public void setSpentTimeMills(long spentTimeMills) {
+        this.spentTimeMills = spentTimeMills;
+    }
+
+
+    @Override
+    public String toString() { // renders all five fields in declaration order
+        return "ConsumeMessageDirectlyResult [order=" + order + ", autoCommit=" + autoCommit
+                + ", consumeResult=" + consumeResult + ", remark=" + remark + ", spentTimeMills="
+                + spentTimeMills + "]";
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ConsumeStatsList.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ConsumeStatsList.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ConsumeStatsList.java
new file mode 100644
index 0000000..a1c608d
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ConsumeStatsList.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.common.protocol.body;
+
+import com.alibaba.rocketmq.common.admin.ConsumeStats;
+import com.alibaba.rocketmq.remoting.protocol.RemotingSerializable;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+
+/** Body (extends RemotingSerializable): maps of subscription group name to ConsumeStats lists, a broker address, a total diff.
+ * @author shijia.wxr
+ */
+public class ConsumeStatsList extends RemotingSerializable {
+    private List<Map<String/*subscriptionGroupName*/, List<ConsumeStats>>> consumeStatsList = new ArrayList<Map<String/*subscriptionGroupName*/, List<ConsumeStats>>>();
+    private String brokerAddr;
+    private long totalDiff; // NOTE(review): presumably a diff summed over the stats above -- how it is computed is not visible here
+
+    public List<Map<String, List<ConsumeStats>>> getConsumeStatsList() {
+        return consumeStatsList;
+    }
+
+    public void setConsumeStatsList(List<Map<String, List<ConsumeStats>>> consumeStatsList) {
+        this.consumeStatsList = consumeStatsList;
+    }
+
+    public String getBrokerAddr() {
+        return brokerAddr;
+    }
+
+    public void setBrokerAddr(String brokerAddr) {
+        this.brokerAddr = brokerAddr;
+    }
+
+    public long getTotalDiff() {
+        return totalDiff;
+    }
+
+    public void setTotalDiff(long totalDiff) {
+        this.totalDiff = totalDiff;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ConsumeStatus.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ConsumeStatus.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ConsumeStatus.java
new file mode 100644
index 0000000..dcb6281
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ConsumeStatus.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package com.alibaba.rocketmq.common.protocol.body;
+/** Plain holder for five double-valued pull/consume metrics and one long count of failed messages. */
+public class ConsumeStatus {
+    private double pullRT; // NOTE(review): "RT" presumably means response time; unit not visible here -- confirm
+    private double pullTPS; // NOTE(review): "TPS" presumably means a per-second rate -- confirm
+    private double consumeRT;
+    private double consumeOKTPS;
+    private double consumeFailedTPS;
+
+    private long consumeFailedMsgs; // NOTE(review): counting window for failed messages is not visible here -- confirm
+
+
+    public double getPullRT() {
+        return pullRT;
+    }
+
+
+    public void setPullRT(double pullRT) {
+        this.pullRT = pullRT;
+    }
+
+
+    public double getPullTPS() {
+        return pullTPS;
+    }
+
+
+    public void setPullTPS(double pullTPS) {
+        this.pullTPS = pullTPS;
+    }
+
+
+    public double getConsumeRT() {
+        return consumeRT;
+    }
+
+
+    public void setConsumeRT(double consumeRT) {
+        this.consumeRT = consumeRT;
+    }
+
+
+    public double getConsumeOKTPS() {
+        return consumeOKTPS;
+    }
+
+
+    public void setConsumeOKTPS(double consumeOKTPS) {
+        this.consumeOKTPS = consumeOKTPS;
+    }
+
+
+    public double getConsumeFailedTPS() {
+        return consumeFailedTPS;
+    }
+
+
+    public void setConsumeFailedTPS(double consumeFailedTPS) {
+        this.consumeFailedTPS = consumeFailedTPS;
+    }
+
+
+    public long getConsumeFailedMsgs() {
+        return consumeFailedMsgs;
+    }
+
+
+    public void setConsumeFailedMsgs(long consumeFailedMsgs) {
+        this.consumeFailedMsgs = consumeFailedMsgs;
+    }
+}


Mime
View raw message