rocketmq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dongefore...@apache.org
Subject [03/17] incubator-rocketmq-externals git commit: [ROCKETMQ-129] Initialized the rocketmq c++ client closes apache/incubator-rocketmq-externals#11
Date Fri, 21 Apr 2017 10:09:43 GMT
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/protocol/CommandCustomHeader.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/protocol/CommandCustomHeader.cpp b/rocketmq-client4cpp/src/protocol/CommandCustomHeader.cpp
new file mode 100755
index 0000000..fb2d2a6
--- /dev/null
+++ b/rocketmq-client4cpp/src/protocol/CommandCustomHeader.cpp
@@ -0,0 +1,672 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+#include "CommandCustomHeader.h"
+
+#include <stdlib.h>
+#include <unistd.h>
+#include <arpa/inet.h>
+#include <sstream>
+#include <string>
+#include <cstdlib>
+#include "RemotingCommand.h"
+#include "MQProtos.h"
+#include "KPRUtil.h"
+#include "UtilAll.h"
+
+#include "json/json.h"
+
+namespace rmq
+{
+
+
+/**
+ * Builds the typed custom header for a remoting command.
+ *
+ * Dispatches on the command code to the static decode() of the matching
+ * header class. Response commands and request commands use separate tables.
+ *
+ * @param code            command code (one of the *_VALUE constants)
+ * @param data            JSON value passed through to the per-type decode()
+ * @param isResponseType  true selects the response table, false the request table
+ * @return the header allocated by the per-type decode(), or NULL when the
+ *         code has no entry in the selected table or decoding threw (the
+ *         exception is logged and swallowed here)
+ */
+CommandCustomHeader* CommandCustomHeader::decode(int code, Json::Value& data, bool isResponseType)
+{
+    CommandCustomHeader* pCustomHeader = NULL;
+
+    try
+    {
+        if (isResponseType)
+        {
+            switch (code)
+            {
+                case SEND_MESSAGE_VALUE:
+                case SEND_MESSAGE_V2_VALUE:
+                    pCustomHeader = SendMessageResponseHeader::decode(data);
+                    break;
+                case PULL_MESSAGE_VALUE:
+                    pCustomHeader = PullMessageResponseHeader::decode(data);
+                    break;
+                case QUERY_CONSUMER_OFFSET_VALUE:
+                    pCustomHeader = QueryConsumerOffsetResponseHeader::decode(data);
+                    break;
+                case SEARCH_OFFSET_BY_TIMESTAMP_VALUE:
+                    pCustomHeader = SearchOffsetResponseHeader::decode(data);
+                    break;
+                case GET_MAX_OFFSET_VALUE:
+                    pCustomHeader = GetMaxOffsetResponseHeader::decode(data);
+                    break;
+                case GET_MIN_OFFSET_VALUE:
+                    pCustomHeader = GetMinOffsetResponseHeader::decode(data);
+                    break;
+                case GET_EARLIEST_MSG_STORETIME_VALUE:
+                    pCustomHeader = GetEarliestMsgStoretimeResponseHeader::decode(data);
+                    break;
+                case QUERY_MESSAGE_VALUE:
+                    pCustomHeader = QueryMessageResponseHeader::decode(data);
+                    break;
+                case GET_KV_CONFIG_VALUE:
+                    pCustomHeader = GetKVConfigResponseHeader::decode(data);
+                    break;
+
+                default:
+                    break;
+            }
+        }
+        else
+        {
+            switch (code)
+            {
+                case NOTIFY_CONSUMER_IDS_CHANGED_VALUE:
+                    pCustomHeader = NotifyConsumerIdsChangedRequestHeader::decode(data);
+                    break;
+                case GET_CONSUMER_RUNNING_INFO_VALUE:
+                    pCustomHeader = GetConsumerRunningInfoRequestHeader::decode(data);
+                    break;
+                default:
+                    break;
+            }
+        }
+    }
+    catch (const std::exception& e)
+    {
+        // pCustomHeader is assigned only after a per-type decode() has
+        // returned, so it is still NULL whenever a handler runs; there is
+        // nothing to release here.
+        RMQ_ERROR("CommandCustomHeader decode exception, %d, %d, %s, %s",
+            code, isResponseType, UtilAll::toString(data).c_str(), e.what());
+    }
+    catch (...)
+    {
+        // Same as above: nothing has been stored in pCustomHeader yet.
+        RMQ_ERROR("CommandCustomHeader decode exception, %d, %d, %s",
+            code, isResponseType, UtilAll::toString(data).c_str());
+    }
+
+    return pCustomHeader;
+}
+
+
+////////////////////////////////////////////////////////////////////////////////
+//GET_ROUTEINTO_BY_TOPIC_VALUE
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Writes this header into outData as {"topic":"<topic>"}.
+ * The topic is inserted verbatim (no JSON escaping); outData is overwritten.
+ */
+void GetRouteInfoRequestHeader::encode(std::string& outData)
+{
+    std::ostringstream json;
+    json << "{";
+    json << "\"topic\":\"" << topic << "\"";
+    json << "}";
+
+    outData = json.str();
+}
+
+
+////////////////////////////////////////////////////////////////////////////////
+// UPDATE_AND_CREATE_TOPIC_VALUE
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Writes this header into outData as a one-line JSON object.
+ *
+ * Every value, numeric ones included, is emitted as a quoted string and is
+ * inserted verbatim (no JSON escaping). outData is overwritten.
+ */
+void CreateTopicRequestHeader::encode(std::string& outData)
+{
+    std::stringstream ss;
+
+    ss << "{"
+       << "\"topic\":\"" << topic << "\","
+       << "\"defaultTopic\":\"" << defaultTopic << "\","
+       << "\"readQueueNums\":\"" << readQueueNums << "\","
+       << "\"writeQueueNums\":\"" << writeQueueNums << "\","
+       << "\"perm\":\"" << perm << "\","
+       << "\"topicFilterType\":\"" << topicFilterType << "\","
+       // Each key carries its own member; these two used to repeat
+       // topicFilterType.
+       << "\"topicSysFlag\":\"" << topicSysFlag << "\","
+       // Spelled out as text because a streamed bool prints as 1/0.
+       // NOTE(review): presumably the receiver parses "true"/"false" - confirm.
+       << "\"order\":\"" << (order ? "true" : "false") << "\""
+       << "}";
+
+    outData = ss.str();
+}
+
+
+////////////////////////////////////////////////////////////////////////////////
+// SEND_MESSAGE_VALUE/SEND_MESSAGE_V2_VALUE
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Writes this header into outData as a one-line JSON object.
+ *
+ * String members are quoted; numeric members are written as bare JSON
+ * numbers. Strings are inserted verbatim (no JSON escaping). outData is
+ * overwritten.
+ */
+void SendMessageRequestHeader::encode(std::string& outData)
+{
+    std::ostringstream json;
+
+    json << "{";
+    json << "\"producerGroup\":\"" << producerGroup << "\",";
+    json << "\"topic\":\"" << topic << "\",";
+    json << "\"defaultTopic\":\"" << defaultTopic << "\",";
+    json << "\"defaultTopicQueueNums\":" << defaultTopicQueueNums << ",";
+    json << "\"queueId\":" << queueId << ",";
+    json << "\"sysFlag\":" << sysFlag << ",";
+    json << "\"bornTimestamp\":" << bornTimestamp << ",";
+    json << "\"flag\":" << flag << ",";
+    json << "\"properties\":\"" << properties << "\",";
+    json << "\"reconsumeTimes\":" << reconsumeTimes;
+    json << "}";
+
+    outData = json.str();
+}
+
+/**
+ * Writes the single-letter members a..j into outData as a one-line JSON
+ * object. Every value is emitted as a quoted string and inserted verbatim
+ * (no JSON escaping). outData is overwritten.
+ */
+void SendMessageRequestHeaderV2::encode(std::string& outData)
+{
+    std::ostringstream json;
+
+    json << "{";
+    json << "\"a\":\"" << a << "\",";
+    json << "\"b\":\"" << b << "\",";
+    json << "\"c\":\"" << c << "\",";
+    json << "\"d\":\"" << d << "\",";
+    json << "\"e\":\"" << e << "\",";
+    json << "\"f\":\"" << f << "\",";
+    json << "\"g\":\"" << g << "\",";
+    json << "\"h\":\"" << h << "\",";
+    json << "\"i\":\"" << i << "\",";
+    json << "\"j\":\"" << j << "\"";
+    json << "}";
+
+    outData = json.str();
+}
+
+/**
+ * Allocates a SendMessageRequestHeader and copies the short-named V2
+ * members (a..j) into the corresponding long-named V1 members.
+ *
+ * @param v2 source header; dereferenced without a NULL check
+ * @return header allocated with new; this function does not free it
+ */
+SendMessageRequestHeader* SendMessageRequestHeaderV2::createSendMessageRequestHeaderV1(
+    const SendMessageRequestHeaderV2* v2)
+{
+    SendMessageRequestHeader* longForm = new SendMessageRequestHeader();
+
+    longForm->producerGroup         = v2->a;
+    longForm->topic                 = v2->b;
+    longForm->defaultTopic          = v2->c;
+    longForm->defaultTopicQueueNums = v2->d;
+    longForm->queueId               = v2->e;
+    longForm->sysFlag               = v2->f;
+    longForm->bornTimestamp         = v2->g;
+    longForm->flag                  = v2->h;
+    longForm->properties            = v2->i;
+    longForm->reconsumeTimes        = v2->j;
+
+    return longForm;
+}
+
+/**
+ * Allocates a SendMessageRequestHeaderV2 and copies the long-named V1
+ * members into the corresponding short-named V2 members (a..j).
+ *
+ * @param v1 source header; dereferenced without a NULL check
+ * @return header allocated with new; this function does not free it
+ */
+SendMessageRequestHeaderV2* SendMessageRequestHeaderV2::createSendMessageRequestHeaderV2(
+    const SendMessageRequestHeader* v1)
+{
+    SendMessageRequestHeaderV2* shortForm = new SendMessageRequestHeaderV2();
+
+    shortForm->a = v1->producerGroup;
+    shortForm->b = v1->topic;
+    shortForm->c = v1->defaultTopic;
+    shortForm->d = v1->defaultTopicQueueNums;
+    shortForm->e = v1->queueId;
+    shortForm->f = v1->sysFlag;
+    shortForm->g = v1->bornTimestamp;
+    shortForm->h = v1->flag;
+    shortForm->i = v1->properties;
+    shortForm->j = v1->reconsumeTimes;
+
+    return shortForm;
+}
+
+// No-op: nothing is serialized and outData is left exactly as passed in.
+// NOTE(review): the other response headers in this file do write JSON in
+// encode(); confirm that skipping it here is deliberate.
+void SendMessageResponseHeader::encode(std::string& outData)
+{
+}
+
+/**
+ * Builds a SendMessageResponseHeader from the "msgId", "queueId" and
+ * "queueOffset" members of data.
+ *
+ * The two numeric members are read as C strings and converted with atoi /
+ * KPRUtil::str2ll. All three values are extracted before the header is
+ * allocated.
+ *
+ * @return header allocated with new; this function does not free it
+ */
+CommandCustomHeader* SendMessageResponseHeader::decode(Json::Value& data)
+{
+    const std::string parsedMsgId = data["msgId"].asString();
+    const int parsedQueueId = atoi(data["queueId"].asCString());
+    const long long parsedQueueOffset = KPRUtil::str2ll(data["queueOffset"].asCString());
+
+    SendMessageResponseHeader* header = new SendMessageResponseHeader();
+    header->msgId = parsedMsgId;
+    header->queueId = parsedQueueId;
+    header->queueOffset = parsedQueueOffset;
+
+    return header;
+}
+
+
+////////////////////////////////////////////////////////////////////////////////
+// PULL_MESSAGE_VALUE
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Writes this header into outData as a one-line JSON object.
+ *
+ * Every value, numeric ones included, is emitted as a quoted string and is
+ * inserted verbatim (no JSON escaping). outData is overwritten.
+ */
+void PullMessageRequestHeader::encode(std::string& outData)
+{
+    std::ostringstream json;
+
+    json << "{";
+    json << "\"consumerGroup\":\"" << consumerGroup << "\",";
+    json << "\"topic\":\"" << topic << "\",";
+    json << "\"queueId\":\"" << queueId << "\",";
+    json << "\"queueOffset\":\"" << queueOffset << "\",";
+    json << "\"maxMsgNums\":\"" << maxMsgNums << "\",";
+    json << "\"sysFlag\":\"" << sysFlag << "\",";
+    json << "\"commitOffset\":\"" << commitOffset << "\",";
+    json << "\"suspendTimeoutMillis\":\"" << suspendTimeoutMillis << "\",";
+    json << "\"subscription\":\"" << subscription << "\",";
+    json << "\"subVersion\":\"" << subVersion << "\"";
+    json << "}";
+
+    outData = json.str();
+}
+
+/**
+ * Writes the four numeric members into outData as a one-line JSON object,
+ * each value emitted as a quoted string. outData is overwritten.
+ */
+void PullMessageResponseHeader::encode(std::string& outData)
+{
+    std::ostringstream json;
+
+    json << "{";
+    json << "\"suggestWhichBrokerId\":\"" << suggestWhichBrokerId << "\",";
+    json << "\"nextBeginOffset\":\"" << nextBeginOffset << "\",";
+    json << "\"minOffset\":\"" << minOffset << "\",";
+    json << "\"maxOffset\":\"" << maxOffset << "\"";
+    json << "}";
+
+    outData = json.str();
+}
+
+/**
+ * Builds a PullMessageResponseHeader from the "suggestWhichBrokerId",
+ * "nextBeginOffset", "minOffset" and "maxOffset" members of data.
+ *
+ * Each member is read as a C string and converted with KPRUtil::str2ll;
+ * all four are extracted before the header is allocated.
+ *
+ * @return header allocated with new; this function does not free it
+ */
+CommandCustomHeader* PullMessageResponseHeader::decode(Json::Value& data)
+{
+    const long long brokerId = KPRUtil::str2ll(data["suggestWhichBrokerId"].asCString());
+    const long long nextOffset = KPRUtil::str2ll(data["nextBeginOffset"].asCString());
+    const long long lowest = KPRUtil::str2ll(data["minOffset"].asCString());
+    const long long highest = KPRUtil::str2ll(data["maxOffset"].asCString());
+
+    PullMessageResponseHeader* header = new PullMessageResponseHeader();
+    header->suggestWhichBrokerId = brokerId;
+    header->nextBeginOffset = nextOffset;
+    header->minOffset = lowest;
+    header->maxOffset = highest;
+
+    return header;
+}
+
+
+
+////////////////////////////////////////////////////////////////////////////////
+// GET_CONSUMER_LIST_BY_GROUP_VALUE
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Writes this header into outData as {"consumerGroup":"<consumerGroup>"}.
+ * The value is inserted verbatim (no JSON escaping); outData is overwritten.
+ */
+void GetConsumerListByGroupRequestHeader::encode(std::string& outData)
+{
+    std::ostringstream json;
+
+    json << "{";
+    json << "\"consumerGroup\":\"" << consumerGroup << "\"";
+    json << "}";
+
+    outData = json.str();
+}
+
+
+////////////////////////////////////////////////////////////////////////////////
+// CONSUMER_SEND_MSG_BACK_VALUE
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Writes "offset", "group" and "delayLevel" into outData as a one-line JSON
+ * object. Every value is emitted as a quoted string and inserted verbatim
+ * (no JSON escaping). outData is overwritten.
+ */
+void ConsumerSendMsgBackRequestHeader::encode(std::string& outData)
+{
+    std::ostringstream json;
+
+    json << "{";
+    json << "\"offset\":\"" << offset << "\",";
+    json << "\"group\":\"" << group << "\",";
+    json << "\"delayLevel\":\"" << delayLevel << "\"";
+    json << "}";
+
+    outData = json.str();
+}
+
+
+////////////////////////////////////////////////////////////////////////////////
+// QUERY_CONSUMER_OFFSET_VALUE
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Writes "consumerGroup", "topic" and "queueId" into outData as a one-line
+ * JSON object. Every value is emitted as a quoted string and inserted
+ * verbatim (no JSON escaping). outData is overwritten.
+ */
+void QueryConsumerOffsetRequestHeader::encode(std::string& outData)
+{
+    std::ostringstream json;
+
+    json << "{";
+    json << "\"consumerGroup\":\"" << consumerGroup << "\",";
+    json << "\"topic\":\"" << topic << "\",";
+    json << "\"queueId\":\"" << queueId << "\"";
+    json << "}";
+
+    outData = json.str();
+}
+
+/**
+ * Writes this header into outData as {"offset":"<offset>"}, the number
+ * being emitted as a quoted string. outData is overwritten.
+ */
+void QueryConsumerOffsetResponseHeader::encode(std::string& outData)
+{
+    std::ostringstream json;
+
+    json << "{";
+    json << "\"offset\":\"" << offset << "\"";
+    json << "}";
+
+    outData = json.str();
+}
+
+/**
+ * Builds a QueryConsumerOffsetResponseHeader from data.
+ *
+ * When data has an "offset" member it is read as a C string and converted
+ * with KPRUtil::str2ll; when the member is absent the offset is set to -1.
+ *
+ * @return header allocated with new; this function does not free it
+ */
+CommandCustomHeader* QueryConsumerOffsetResponseHeader::decode(Json::Value& data)
+{
+    // -1 is the value reported when the member is missing.
+    long long parsedOffset = -1;
+    if (data.isMember("offset"))
+    {
+        parsedOffset = KPRUtil::str2ll(data["offset"].asCString());
+    }
+
+    QueryConsumerOffsetResponseHeader* header = new QueryConsumerOffsetResponseHeader();
+    header->offset = parsedOffset;
+
+    return header;
+}
+
+
+////////////////////////////////////////////////////////////////////////////////
+// UPDATE_CONSUMER_OFFSET_VALUE
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Writes "consumerGroup", "topic", "queueId" and "commitOffset" into outData
+ * as a one-line JSON object. Every value is emitted as a quoted string and
+ * inserted verbatim (no JSON escaping). outData is overwritten.
+ */
+void UpdateConsumerOffsetRequestHeader::encode(std::string& outData)
+{
+    std::ostringstream json;
+
+    json << "{";
+    json << "\"consumerGroup\":\"" << consumerGroup << "\",";
+    json << "\"topic\":\"" << topic << "\",";
+    json << "\"queueId\":\"" << queueId << "\",";
+    json << "\"commitOffset\":\"" << commitOffset << "\"";
+    json << "}";
+
+    outData = json.str();
+}
+
+
+////////////////////////////////////////////////////////////////////////////////
+// UNREGISTER_CLIENT_VALUE
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Writes "producerGroup", "consumerGroup" and "clientID" into outData as a
+ * one-line JSON object. Values are inserted verbatim (no JSON escaping).
+ * outData is overwritten.
+ */
+void UnregisterClientRequestHeader::encode(std::string& outData)
+{
+    std::ostringstream json;
+
+    json << "{";
+    json << "\"producerGroup\":\"" << producerGroup << "\",";
+    json << "\"consumerGroup\":\"" << consumerGroup << "\",";
+    json << "\"clientID\":\"" << clientID << "\"";
+    json << "}";
+
+    outData = json.str();
+}
+
+
+///////////////////////////////////////////////////////////////////////
+// VIEW_MESSAGE_BY_ID_VALUE
+///////////////////////////////////////////////////////////////////////
+/**
+ * Writes this header into outData as {"offset":<offset>}. Unlike most
+ * encoders in this file the number is written bare, without quotes.
+ * outData is overwritten.
+ */
+void ViewMessageRequestHeader::encode(std::string& outData)
+{
+    std::ostringstream json;
+
+    json << "{";
+    json << "\"offset\":" << offset;
+    json << "}";
+
+    outData = json.str();
+}
+
+
+///////////////////////////////////////////////////////////////////////
+// SEARCH_OFFSET_BY_TIMESTAMP_VALUE
+///////////////////////////////////////////////////////////////////////
+/**
+ * Writes "topic", "queueId" and "timestamp" into outData as a one-line JSON
+ * object. Every value is emitted as a quoted string and inserted verbatim
+ * (no JSON escaping). outData is overwritten.
+ */
+void SearchOffsetRequestHeader::encode(std::string& outData)
+{
+    std::ostringstream json;
+
+    json << "{";
+    json << "\"topic\":\"" << topic << "\",";
+    json << "\"queueId\":\"" << queueId << "\",";
+    json << "\"timestamp\":\"" << timestamp << "\"";
+    json << "}";
+
+    outData = json.str();
+}
+
+/**
+ * Writes this header into outData as {"offset":"<offset>"}, the number
+ * being emitted as a quoted string. outData is overwritten.
+ */
+void SearchOffsetResponseHeader::encode(std::string& outData)
+{
+    std::ostringstream json;
+
+    json << "{";
+    json << "\"offset\":\"" << offset << "\"";
+    json << "}";
+
+    outData = json.str();
+}
+
+/**
+ * Builds a SearchOffsetResponseHeader from the "offset" member of data,
+ * read as a C string and converted with KPRUtil::str2ll before the header
+ * is allocated.
+ *
+ * @return header allocated with new; this function does not free it
+ */
+CommandCustomHeader* SearchOffsetResponseHeader::decode(Json::Value& data)
+{
+    const long long parsedOffset = KPRUtil::str2ll(data["offset"].asCString());
+
+    SearchOffsetResponseHeader* header = new SearchOffsetResponseHeader();
+    header->offset = parsedOffset;
+
+    return header;
+}
+
+
+///////////////////////////////////////////////////////////////////////
+// GET_MAX_OFFSET_VALUE
+///////////////////////////////////////////////////////////////////////
+/**
+ * Writes "topic" and "queueId" into outData as a one-line JSON object.
+ * Both values are emitted as quoted strings and inserted verbatim (no JSON
+ * escaping). outData is overwritten.
+ */
+void GetMaxOffsetRequestHeader::encode(std::string& outData)
+{
+    std::ostringstream json;
+
+    json << "{";
+    json << "\"topic\":\"" << topic << "\",";
+    json << "\"queueId\":\"" << queueId << "\"";
+    json << "}";
+
+    outData = json.str();
+}
+
+/**
+ * Writes this header into outData as {"offset":"<offset>"}, the number
+ * being emitted as a quoted string. outData is overwritten.
+ */
+void GetMaxOffsetResponseHeader::encode(std::string& outData)
+{
+    std::ostringstream json;
+
+    json << "{";
+    json << "\"offset\":\"" << offset << "\"";
+    json << "}";
+
+    outData = json.str();
+}
+
+/**
+ * Builds a GetMaxOffsetResponseHeader from the "offset" member of data,
+ * read as a C string and converted with KPRUtil::str2ll before the header
+ * is allocated.
+ *
+ * @return header allocated with new; this function does not free it
+ */
+CommandCustomHeader* GetMaxOffsetResponseHeader::decode(Json::Value& data)
+{
+    const long long parsedOffset = KPRUtil::str2ll(data["offset"].asCString());
+
+    GetMaxOffsetResponseHeader* header = new GetMaxOffsetResponseHeader();
+    header->offset = parsedOffset;
+
+    return header;
+}
+
+
+///////////////////////////////////////////////////////////////////////
+// GET_MIN_OFFSET_VALUE
+///////////////////////////////////////////////////////////////////////
+/**
+ * Writes "topic" and "queueId" into outData as a one-line JSON object.
+ * Both values are emitted as quoted strings and inserted verbatim (no JSON
+ * escaping). outData is overwritten.
+ */
+void GetMinOffsetRequestHeader::encode(std::string& outData)
+{
+    std::ostringstream json;
+
+    json << "{";
+    json << "\"topic\":\"" << topic << "\",";
+    json << "\"queueId\":\"" << queueId << "\"";
+    json << "}";
+
+    outData = json.str();
+}
+
+/**
+ * Writes this header into outData as {"offset":"<offset>"}, the number
+ * being emitted as a quoted string. outData is overwritten.
+ */
+void GetMinOffsetResponseHeader::encode(std::string& outData)
+{
+    std::ostringstream json;
+
+    json << "{";
+    json << "\"offset\":\"" << offset << "\"";
+    json << "}";
+
+    outData = json.str();
+}
+
+/**
+ * Builds a GetMinOffsetResponseHeader from the "offset" member of data,
+ * read as a C string and converted with KPRUtil::str2ll before the header
+ * is allocated.
+ *
+ * @return header allocated with new; this function does not free it
+ */
+CommandCustomHeader* GetMinOffsetResponseHeader::decode(Json::Value& data)
+{
+    const long long parsedOffset = KPRUtil::str2ll(data["offset"].asCString());
+
+    GetMinOffsetResponseHeader* header = new GetMinOffsetResponseHeader();
+    header->offset = parsedOffset;
+
+    return header;
+}
+
+
+
+///////////////////////////////////////////////////////////////////////
+// GET_EARLIEST_MSG_STORETIME_VALUE
+///////////////////////////////////////////////////////////////////////
+/**
+ * Writes "topic" and "queueId" into outData as a one-line JSON object.
+ * Both values are emitted as quoted strings and inserted verbatim (no JSON
+ * escaping). outData is overwritten.
+ */
+void GetEarliestMsgStoretimeRequestHeader::encode(std::string& outData)
+{
+    std::ostringstream json;
+
+    json << "{";
+    json << "\"topic\":\"" << topic << "\",";
+    json << "\"queueId\":\"" << queueId << "\"";
+    json << "}";
+
+    outData = json.str();
+}
+
+/**
+ * Writes this header into outData as {"timestamp":"<timestamp>"}, the
+ * value being emitted as a quoted string. outData is overwritten.
+ */
+void GetEarliestMsgStoretimeResponseHeader::encode(std::string& outData)
+{
+    std::ostringstream json;
+
+    json << "{";
+    json << "\"timestamp\":\"" << timestamp << "\"";
+    json << "}";
+
+    outData = json.str();
+}
+
+
+/**
+ * Builds a GetEarliestMsgStoretimeResponseHeader from the "timestamp"
+ * member of data, read as a C string and converted with KPRUtil::str2ll
+ * before the header is allocated.
+ *
+ * @return header allocated with new; this function does not free it
+ */
+CommandCustomHeader* GetEarliestMsgStoretimeResponseHeader::decode(Json::Value& data)
+{
+    const long long parsedTimestamp = KPRUtil::str2ll(data["timestamp"].asCString());
+
+    GetEarliestMsgStoretimeResponseHeader* header = new GetEarliestMsgStoretimeResponseHeader();
+    header->timestamp = parsedTimestamp;
+
+    return header;
+}
+
+
+///////////////////////////////////////////////////////////////////////
+// QUERY_MESSAGE_VALUE
+///////////////////////////////////////////////////////////////////////
+/**
+ * Writes "topic", "key", "maxNum", "beginTimestamp" and "endTimestamp" into
+ * outData as a one-line JSON object. Every value is emitted as a quoted
+ * string and inserted verbatim (no JSON escaping). outData is overwritten.
+ */
+void QueryMessageRequestHeader::encode(std::string& outData)
+{
+    std::ostringstream json;
+
+    json << "{";
+    json << "\"topic\":\"" << topic << "\",";
+    json << "\"key\":\"" << key << "\",";
+    json << "\"maxNum\":\"" << maxNum << "\",";
+    json << "\"beginTimestamp\":\"" << beginTimestamp << "\",";
+    json << "\"endTimestamp\":\"" << endTimestamp << "\"";
+    json << "}";
+
+    outData = json.str();
+}
+
+/**
+ * Writes "indexLastUpdateTimestamp" and "indexLastUpdatePhyoffset" into
+ * outData as a one-line JSON object, both emitted as quoted strings.
+ * outData is overwritten.
+ */
+void QueryMessageResponseHeader::encode(std::string& outData)
+{
+    std::ostringstream json;
+
+    json << "{";
+    json << "\"indexLastUpdateTimestamp\":\"" << indexLastUpdateTimestamp << "\",";
+    json << "\"indexLastUpdatePhyoffset\":\"" << indexLastUpdatePhyoffset << "\"";
+    json << "}";
+
+    outData = json.str();
+}
+
+/**
+ * Builds a QueryMessageResponseHeader from the "indexLastUpdateTimestamp"
+ * and "indexLastUpdatePhyoffset" members of data.
+ *
+ * Each member is read as a C string and converted with KPRUtil::str2ll;
+ * both are extracted before the header is allocated.
+ *
+ * @return header allocated with new; this function does not free it
+ */
+CommandCustomHeader* QueryMessageResponseHeader::decode(Json::Value& data)
+{
+    const long long lastTimestamp = KPRUtil::str2ll(data["indexLastUpdateTimestamp"].asCString());
+    const long long lastPhyOffset = KPRUtil::str2ll(data["indexLastUpdatePhyoffset"].asCString());
+
+    QueryMessageResponseHeader* header = new QueryMessageResponseHeader();
+    header->indexLastUpdateTimestamp = lastTimestamp;
+    header->indexLastUpdatePhyoffset = lastPhyOffset;
+
+    return header;
+}
+
+
+///////////////////////////////////////////////////////////////////////
+// GET_KV_CONFIG_VALUE
+///////////////////////////////////////////////////////////////////////
+/**
+ * Writes this header into outData as a one-line JSON object with the keys
+ * "namespace" (taken from the namespace_ member) and "key". Values are
+ * emitted as quoted strings and inserted verbatim (no JSON escaping).
+ * outData is overwritten.
+ */
+void GetKVConfigRequestHeader::encode(std::string& outData)
+{
+    std::ostringstream json;
+
+    json << "{";
+    json << "\"namespace\":\"" << namespace_ << "\",";
+    json << "\"key\":\"" << key << "\"";
+    json << "}";
+
+    outData = json.str();
+}
+
+/**
+ * Writes this header into outData as {"value":"<value>"}. The value is
+ * inserted verbatim (no JSON escaping); outData is overwritten.
+ */
+void GetKVConfigResponseHeader::encode(std::string& outData)
+{
+    std::ostringstream json;
+
+    json << "{";
+    json << "\"value\":\"" << value << "\"";
+    json << "}";
+
+    outData = json.str();
+}
+
+/**
+ * Builds a GetKVConfigResponseHeader from the "value" member of data.
+ *
+ * @return header allocated with new; this function does not free it
+ */
+CommandCustomHeader* GetKVConfigResponseHeader::decode(Json::Value& data)
+{
+    // Convert first, allocate second (as the other decoders in this file
+    // do): if the JSON conversion throws, no header has been allocated yet
+    // and so nothing can leak.
+    const std::string parsedValue = data["value"].asString();
+
+    GetKVConfigResponseHeader* h = new GetKVConfigResponseHeader();
+    h->value = parsedValue;
+
+    return h;
+}
+
+
+///////////////////////////////////////////////////////////////////////
+// NOTIFY_CONSUMER_IDS_CHANGED_VALUE
+///////////////////////////////////////////////////////////////////////
+/**
+ * Writes this header into outData as {"consumerGroup":"<consumerGroup>"}.
+ * The value is inserted verbatim (no JSON escaping); outData is overwritten.
+ */
+void NotifyConsumerIdsChangedRequestHeader::encode(std::string& outData)
+{
+    std::ostringstream json;
+
+    json << "{";
+    json << "\"consumerGroup\":\"" << consumerGroup << "\"";
+    json << "}";
+
+    outData = json.str();
+}
+
+/**
+ * Builds a NotifyConsumerIdsChangedRequestHeader from the "consumerGroup"
+ * member of data.
+ *
+ * @return header allocated with new; this function does not free it
+ */
+CommandCustomHeader* NotifyConsumerIdsChangedRequestHeader::decode(Json::Value& data)
+{
+    // Convert first, allocate second (as the other decoders in this file
+    // do): if the JSON conversion throws, no header has been allocated yet
+    // and so nothing can leak.
+    const std::string parsedGroup = data["consumerGroup"].asString();
+
+    NotifyConsumerIdsChangedRequestHeader* h = new NotifyConsumerIdsChangedRequestHeader();
+    h->consumerGroup = parsedGroup;
+
+    return h;
+}
+
+
+///////////////////////////////////////////////////////////////////////
+// GET_CONSUMER_RUNNING_INFO_VALUE
+///////////////////////////////////////////////////////////////////////
+/**
+ * Writes "consumerGroup", "clientId" and "jstackEnable" into outData as a
+ * one-line JSON object. Every value is emitted as a quoted string and
+ * inserted verbatim (no JSON escaping). outData is overwritten.
+ */
+void GetConsumerRunningInfoRequestHeader::encode(std::string& outData)
+{
+    std::stringstream ss;
+    ss  << "{"
+        << "\"consumerGroup\":\"" << consumerGroup << "\","
+        << "\"clientId\":\"" << clientId << "\","
+        // Last member: no trailing comma, otherwise the object is not
+        // valid JSON.
+        // NOTE(review): if jstackEnable is a bool it is streamed as 1/0
+        // here, not "true"/"false" - confirm what the receiver expects.
+        << "\"jstackEnable\":\"" << jstackEnable << "\""
+        << "}";
+    outData = ss.str();
+}
+
+/**
+ * Builds a GetConsumerRunningInfoRequestHeader from the "consumerGroup" and
+ * "clientId" members of data. jstackEnable is not read from data; it is
+ * always set to false.
+ *
+ * @return header allocated with new; this function does not free it
+ */
+CommandCustomHeader* GetConsumerRunningInfoRequestHeader::decode(Json::Value& data)
+{
+    // Convert first, allocate second (as the other decoders in this file
+    // do): if a JSON conversion throws, no header has been allocated yet
+    // and so nothing can leak.
+    const std::string parsedGroup = data["consumerGroup"].asString();
+    const std::string parsedClientId = data["clientId"].asString();
+
+    GetConsumerRunningInfoRequestHeader* h = new GetConsumerRunningInfoRequestHeader();
+    h->consumerGroup = parsedGroup;
+    h->clientId = parsedClientId;
+    h->jstackEnable = false;//not support
+
+    return h;
+}
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/protocol/CommandCustomHeader.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/protocol/CommandCustomHeader.h b/rocketmq-client4cpp/src/protocol/CommandCustomHeader.h
new file mode 100755
index 0000000..93f811a
--- /dev/null
+++ b/rocketmq-client4cpp/src/protocol/CommandCustomHeader.h
@@ -0,0 +1,604 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#ifndef __COMMANDCUSTOMHEADER_H__
+#define __COMMANDCUSTOMHEADER_H__
+
+#include <string>
+#include <json/json.h>
+
+namespace rmq
+{
+    /**
+     * RemotingCommand custom header: abstract base of every header type.
+     *
+     * encode() is implemented by each concrete header; the static decode()
+     * selects the concrete header type from a command code.
+     */
+    class CommandCustomHeader
+    {
+    public:
+        virtual ~CommandCustomHeader()
+        {
+        }
+
+        /** Serializes the header into outData. */
+        virtual void encode(std::string& outData) = 0;
+
+        /** Creates the header matching code from data; see CommandCustomHeader.cpp. */
+        static CommandCustomHeader* decode(int code, Json::Value& data, bool isResponseType);
+    };
+
+	///////////////////////////////////////////////////////////////////////
+	// GET_ROUTEINTO_BY_TOPIC_VALUE
+	///////////////////////////////////////////////////////////////////////
+    /**
+     * Request header holding a topic name.
+     * decode() is only declared; CommandCustomHeader.cpp does not define it.
+     */
+    class GetRouteInfoRequestHeader : public CommandCustomHeader
+    {
+    public:
+        GetRouteInfoRequestHeader() {}
+        ~GetRouteInfoRequestHeader() {}
+
+        virtual void encode(std::string& outData);
+        static CommandCustomHeader* decode(Json::Value& data);
+
+    public:
+        std::string topic;
+    };
+
+	///////////////////////////////////////////////////////////////////////
+	// UPDATE_AND_CREATE_TOPIC_VALUE
+	///////////////////////////////////////////////////////////////////////
+    /**
+     * Request header for creating/updating a topic. Numeric members start
+     * at 0 and order starts as false; the strings start empty.
+     * decode() is only declared; CommandCustomHeader.cpp does not define it.
+     */
+    class CreateTopicRequestHeader : public CommandCustomHeader
+    {
+    public:
+        CreateTopicRequestHeader()
+            : readQueueNums(0),
+              writeQueueNums(0),
+              perm(0),
+              topicSysFlag(0),
+              order(false)
+        {
+        }
+        ~CreateTopicRequestHeader() {}
+
+        virtual void encode(std::string& outData);
+        static CommandCustomHeader* decode(Json::Value& data);
+
+    public:
+        std::string topic;
+        std::string defaultTopic;
+        int readQueueNums;
+        int writeQueueNums;
+        int perm;
+        std::string topicFilterType;
+        int topicSysFlag;
+        bool order;
+    };
+
+	///////////////////////////////////////////////////////////////////////
+	// SEND_MESSAGE_VALUE/SEND_MESSAGE_V2_VALUE
+	///////////////////////////////////////////////////////////////////////
+    /**
+     * Request header for sending a message (long member names). All numeric
+     * members start at 0; the strings start empty.
+     * decode() is only declared; CommandCustomHeader.cpp does not define it.
+     */
+    class SendMessageRequestHeader: public CommandCustomHeader
+    {
+    public:
+        SendMessageRequestHeader()
+            : defaultTopicQueueNums(0),
+              queueId(0),
+              sysFlag(0),
+              bornTimestamp(0),
+              flag(0),
+              reconsumeTimes(0)
+        {
+        }
+        ~SendMessageRequestHeader() {}
+
+        virtual void encode(std::string& outData);
+        static CommandCustomHeader* decode(Json::Value& data);
+
+    public:
+        std::string producerGroup;
+        std::string topic;
+        std::string defaultTopic;
+        int defaultTopicQueueNums;
+        int queueId;
+        int sysFlag;
+        long long bornTimestamp;
+        int flag;
+        std::string properties;
+        int reconsumeTimes;
+    };
+
+	/**
+	 * Send-message request header with single-letter member names; each
+	 * member's comment gives the SendMessageRequestHeader member it maps to.
+	 * All numeric members start at 0; the strings start empty.
+	 * decode() is only declared; CommandCustomHeader.cpp does not define it.
+	 */
+	class SendMessageRequestHeaderV2: public CommandCustomHeader
+	{
+	public:
+		SendMessageRequestHeaderV2()
+			: d(0),
+			  e(0),
+			  f(0),
+			  g(0),
+			  h(0),
+			  j(0)
+		{
+		}
+		~SendMessageRequestHeaderV2() {}
+
+		virtual void encode(std::string& outData);
+		static CommandCustomHeader* decode(Json::Value& data);
+
+		/** Allocates a long-named header filled from v2. */
+		static SendMessageRequestHeader* createSendMessageRequestHeaderV1(const SendMessageRequestHeaderV2* v2);
+		/** Allocates a short-named header filled from v1. */
+		static SendMessageRequestHeaderV2* createSendMessageRequestHeaderV2(const SendMessageRequestHeader* v1);
+
+	public:
+		std::string a;	// producerGroup
+		std::string b;	// topic
+		std::string c;	// defaultTopic
+		int d;			// defaultTopicQueueNums
+		int e;			// queueId
+		int f;			// sysFlag
+		long long g;	// bornTimestamp
+		int h;			// flag
+		std::string i;	// properties
+		int j;			// reconsumeTimes
+	};
+
+    /**
+     * Response header for a sent message: message id, queue id and queue
+     * offset. Numeric members start at 0.
+     */
+    class SendMessageResponseHeader: public CommandCustomHeader
+    {
+    public:
+        SendMessageResponseHeader()
+            : queueId(0),
+              queueOffset(0)
+        {
+        }
+        ~SendMessageResponseHeader() {}
+
+        virtual void encode(std::string& outData);
+        static CommandCustomHeader* decode(Json::Value& data);
+
+    public:
+        std::string msgId;
+        int queueId;
+        long long queueOffset;
+    };
+
+
+	///////////////////////////////////////////////////////////////////////
+	// PULL_MESSAGE_VALUE
+	///////////////////////////////////////////////////////////////////////
+    /**
+     * Request header for pulling messages. All numeric members start at 0;
+     * the strings start empty.
+     * decode() is only declared; CommandCustomHeader.cpp does not define it.
+     */
+    class PullMessageRequestHeader: public CommandCustomHeader
+    {
+    public:
+        PullMessageRequestHeader()
+            : queueId(0),
+              queueOffset(0),
+              maxMsgNums(0),
+              sysFlag(0),
+              commitOffset(0),
+              suspendTimeoutMillis(0),
+              subVersion(0)
+        {
+        }
+        ~PullMessageRequestHeader() {}
+
+        virtual void encode(std::string& outData);
+        static CommandCustomHeader* decode(Json::Value& data);
+
+    public:
+        std::string consumerGroup;
+        std::string topic;
+        int queueId;
+        long long queueOffset;
+        int maxMsgNums;
+        int sysFlag;
+        long long commitOffset;
+        long long suspendTimeoutMillis;
+        std::string subscription;
+        long long subVersion;
+    };
+
+    /**
+     * Response header for a pull: suggested broker id plus next/min/max
+     * offsets. All members start at 0.
+     */
+    class PullMessageResponseHeader: public CommandCustomHeader
+    {
+    public:
+        PullMessageResponseHeader()
+            : suggestWhichBrokerId(0),
+              nextBeginOffset(0),
+              minOffset(0),
+              maxOffset(0)
+        {
+        }
+        ~PullMessageResponseHeader() {}
+
+        virtual void encode(std::string& outData);
+        static CommandCustomHeader* decode(Json::Value& data);
+
+    public:
+        long long suggestWhichBrokerId;
+        long long nextBeginOffset;
+        long long minOffset;
+        long long maxOffset;
+    };
+
+	///////////////////////////////////////////////////////////////////////
+	// GET_CONSUMER_LIST_BY_GROUP_VALUE
+	///////////////////////////////////////////////////////////////////////
+    /**
+     * Request header holding a consumer group name.
+     * decode() is only declared; CommandCustomHeader.cpp does not define it.
+     */
+    class GetConsumerListByGroupRequestHeader : public CommandCustomHeader
+    {
+    public:
+        GetConsumerListByGroupRequestHeader() {}
+        ~GetConsumerListByGroupRequestHeader() {}
+
+        virtual void encode(std::string& outData);
+        static CommandCustomHeader* decode(Json::Value& data);
+
+    public:
+        std::string consumerGroup;
+    };
+
+
+	///////////////////////////////////////////////////////////////////////
+	// CONSUMER_SEND_MSG_BACK_VALUE
+	///////////////////////////////////////////////////////////////////////
+    /**
+     * Request header carrying an offset, a group name and a delay level.
+     * Numeric members start at 0.
+     * decode() is only declared; CommandCustomHeader.cpp does not define it.
+     */
+    class ConsumerSendMsgBackRequestHeader : public CommandCustomHeader
+    {
+    public:
+        ConsumerSendMsgBackRequestHeader()
+            : offset(0),
+              delayLevel(0)
+        {
+        }
+        ~ConsumerSendMsgBackRequestHeader() {}
+
+        virtual void encode(std::string& outData);
+        static CommandCustomHeader* decode(Json::Value& data);
+
+    public:
+        long long offset;
+        std::string group;
+        int delayLevel;
+    };
+
+
+	///////////////////////////////////////////////////////////////////////
+	// QUERY_CONSUMER_OFFSET_VALUE
+	///////////////////////////////////////////////////////////////////////
+    /**
+     * Request header identifying a consumer group, topic and queue id.
+     * queueId starts at 0.
+     * decode() is only declared; CommandCustomHeader.cpp does not define it.
+     */
+    class QueryConsumerOffsetRequestHeader : public CommandCustomHeader
+    {
+    public:
+        QueryConsumerOffsetRequestHeader()
+            : queueId(0)
+        {
+        }
+        ~QueryConsumerOffsetRequestHeader() {}
+
+        virtual void encode(std::string& outData);
+        static CommandCustomHeader* decode(Json::Value& data);
+
+    public:
+        std::string consumerGroup;
+        std::string topic;
+        int queueId;
+    };
+
+    /**
+     * Response header carrying a single offset. The constructor sets it to
+     * 0; decode() in CommandCustomHeader.cpp sets it to -1 when the JSON
+     * has no "offset" member.
+     */
+    class QueryConsumerOffsetResponseHeader : public CommandCustomHeader
+    {
+    public:
+        QueryConsumerOffsetResponseHeader()
+            : offset(0)
+        {
+        }
+        ~QueryConsumerOffsetResponseHeader() {}
+
+        virtual void encode(std::string& outData);
+        static CommandCustomHeader* decode(Json::Value& data);
+
+    public:
+        long long offset;
+    };
+
+	///////////////////////////////////////////////////////////////////////
+	// UPDATE_CONSUMER_OFFSET_VALUE
+	///////////////////////////////////////////////////////////////////////
+    /**
+     * Request header carrying a consumer group, topic, queue id and commit
+     * offset. Numeric members start at 0.
+     * decode() is only declared; CommandCustomHeader.cpp does not define it.
+     */
+    class UpdateConsumerOffsetRequestHeader : public CommandCustomHeader
+    {
+    public:
+        UpdateConsumerOffsetRequestHeader()
+            : queueId(0),
+              commitOffset(0)
+        {
+        }
+        ~UpdateConsumerOffsetRequestHeader() {}
+
+        virtual void encode(std::string& outData);
+        static CommandCustomHeader* decode(Json::Value& data);
+
+    public:
+        std::string consumerGroup;
+        std::string topic;
+        int queueId;
+        long long commitOffset;
+    };
+
+	///////////////////////////////////////////////////////////////////////
+	// UNREGISTER_CLIENT_VALUE
+	///////////////////////////////////////////////////////////////////////
+    /**
+     * Request header carrying a client id plus producer and consumer group
+     * names.
+     * decode() is only declared; CommandCustomHeader.cpp does not define it.
+     */
+    class UnregisterClientRequestHeader : public CommandCustomHeader
+    {
+    public:
+        UnregisterClientRequestHeader() {}
+        ~UnregisterClientRequestHeader() {}
+
+        virtual void encode(std::string& outData);
+        static CommandCustomHeader* decode(Json::Value& data);
+
+    public:
+        std::string clientID;
+        std::string producerGroup;
+        std::string consumerGroup;
+    };
+
+
+	///////////////////////////////////////////////////////////////////////
+	// VIEW_MESSAGE_BY_ID_VALUE
+	///////////////////////////////////////////////////////////////////////
+	/**
+	 * Request header carrying a single offset, which starts at 0.
+	 * decode() is only declared; CommandCustomHeader.cpp does not define it.
+	 */
+	class ViewMessageRequestHeader : public CommandCustomHeader
+	{
+	public:
+		ViewMessageRequestHeader()
+			: offset(0)
+		{
+		}
+		~ViewMessageRequestHeader() {}
+
+		virtual void encode(std::string& outData);
+		static CommandCustomHeader* decode(Json::Value& data);
+
+	public:
+		long long offset;
+	};
+
+	///////////////////////////////////////////////////////////////////////
+	// SEARCH_OFFSET_BY_TIMESTAMP_VALUE
+	///////////////////////////////////////////////////////////////////////
+	// Custom header for the SEARCH_OFFSET_BY_TIMESTAMP_VALUE request; numeric fields default to zero.
+	class SearchOffsetRequestHeader : public CommandCustomHeader
+	{
+	public:
+		SearchOffsetRequestHeader()
+			: queueId(0),
+			  timestamp(0)
+		{}
+		~SearchOffsetRequestHeader() {}
+
+		// Declared here; the implementations live out of line.
+		virtual void encode(std::string& outData);
+		static CommandCustomHeader* decode(Json::Value& data);
+
+		std::string topic;
+		int queueId;
+		long long timestamp;
+	};
+
+	// Custom header for the SEARCH_OFFSET_BY_TIMESTAMP_VALUE response; offset defaults to zero.
+	class SearchOffsetResponseHeader : public CommandCustomHeader
+	{
+	public:
+		SearchOffsetResponseHeader()
+			: offset(0)
+		{}
+		~SearchOffsetResponseHeader() {}
+
+		// Declared here; the implementations live out of line.
+		virtual void encode(std::string& outData);
+		static CommandCustomHeader* decode(Json::Value& data);
+
+		long long offset;
+	};
+
+	///////////////////////////////////////////////////////////////////////
+	// GET_MAX_OFFSET_VALUE
+	///////////////////////////////////////////////////////////////////////
+	// Custom header for the GET_MAX_OFFSET_VALUE request; queueId defaults to zero.
+	class GetMaxOffsetRequestHeader : public CommandCustomHeader
+	{
+	public:
+		GetMaxOffsetRequestHeader()
+			: queueId(0)
+		{}
+		~GetMaxOffsetRequestHeader() {}
+
+		// Declared here; the implementations live out of line.
+		virtual void encode(std::string& outData);
+		static CommandCustomHeader* decode(Json::Value& data);
+
+		std::string topic;
+		int queueId;
+	};
+
+	// Custom header for the GET_MAX_OFFSET_VALUE response; offset defaults to zero.
+	class GetMaxOffsetResponseHeader : public CommandCustomHeader
+	{
+	public:
+		GetMaxOffsetResponseHeader()
+			: offset(0)
+		{}
+		~GetMaxOffsetResponseHeader() {}
+
+		// Declared here; the implementations live out of line.
+		virtual void encode(std::string& outData);
+		static CommandCustomHeader* decode(Json::Value& data);
+
+		long long offset;
+	};
+
+	///////////////////////////////////////////////////////////////////////
+	// GET_MIN_OFFSET_VALUE
+	///////////////////////////////////////////////////////////////////////
+	// Custom header for the GET_MIN_OFFSET_VALUE request; queueId defaults to zero.
+	class GetMinOffsetRequestHeader : public CommandCustomHeader
+	{
+	public:
+		GetMinOffsetRequestHeader()
+			: queueId(0)
+		{}
+		~GetMinOffsetRequestHeader() {}
+
+		// Declared here; the implementations live out of line.
+		virtual void encode(std::string& outData);
+		static CommandCustomHeader* decode(Json::Value& data);
+
+		std::string topic;
+		int queueId;
+	};
+
+	// Custom header for the GET_MIN_OFFSET_VALUE response; offset defaults to zero.
+	class GetMinOffsetResponseHeader : public CommandCustomHeader
+	{
+	public:
+		GetMinOffsetResponseHeader()
+			: offset(0)
+		{}
+		~GetMinOffsetResponseHeader() {}
+
+		// Declared here; the implementations live out of line.
+		virtual void encode(std::string& outData);
+		static CommandCustomHeader* decode(Json::Value& data);
+
+		long long offset;
+	};
+
+
+	///////////////////////////////////////////////////////////////////////
+	// GET_EARLIEST_MSG_STORETIME_VALUE
+	///////////////////////////////////////////////////////////////////////
+	// Custom header for the GET_EARLIEST_MSG_STORETIME_VALUE request; queueId defaults to zero.
+	class GetEarliestMsgStoretimeRequestHeader : public CommandCustomHeader
+	{
+	public:
+		GetEarliestMsgStoretimeRequestHeader()
+			: queueId(0)
+		{}
+		~GetEarliestMsgStoretimeRequestHeader() {}
+
+		// Declared here; the implementations live out of line.
+		virtual void encode(std::string& outData);
+		static CommandCustomHeader* decode(Json::Value& data);
+
+		std::string topic;
+		int queueId;
+	};
+
+	// Custom header for the GET_EARLIEST_MSG_STORETIME_VALUE response; timestamp defaults to zero.
+	class GetEarliestMsgStoretimeResponseHeader : public CommandCustomHeader
+	{
+	public:
+		GetEarliestMsgStoretimeResponseHeader()
+			: timestamp(0)
+		{}
+		~GetEarliestMsgStoretimeResponseHeader() {}
+
+		// Declared here; the implementations live out of line.
+		virtual void encode(std::string& outData);
+		static CommandCustomHeader* decode(Json::Value& data);
+
+		long long timestamp;
+	};
+
+	///////////////////////////////////////////////////////////////////////
+	// QUERY_MESSAGE_VALUE
+	///////////////////////////////////////////////////////////////////////
+	// Custom header for the QUERY_MESSAGE_VALUE request; numeric fields default to zero.
+	class QueryMessageRequestHeader : public CommandCustomHeader
+	{
+	public:
+		QueryMessageRequestHeader()
+			: maxNum(0),
+			  beginTimestamp(0),
+			  endTimestamp(0)
+		{}
+		~QueryMessageRequestHeader() {}
+
+		// Declared here; the implementations live out of line.
+		virtual void encode(std::string& outData);
+		static CommandCustomHeader* decode(Json::Value& data);
+
+		std::string topic;
+		std::string key;
+		int maxNum;
+		long long beginTimestamp;
+		long long endTimestamp;
+	};
+
+	// Custom header for the QUERY_MESSAGE_VALUE response; both fields default to zero.
+	class QueryMessageResponseHeader : public CommandCustomHeader
+	{
+	public:
+		QueryMessageResponseHeader()
+			: indexLastUpdateTimestamp(0),
+			  indexLastUpdatePhyoffset(0)
+		{}
+		~QueryMessageResponseHeader() {}
+
+		// Declared here; the implementations live out of line.
+		virtual void encode(std::string& outData);
+		static CommandCustomHeader* decode(Json::Value& data);
+
+		long long indexLastUpdateTimestamp;
+		long long indexLastUpdatePhyoffset;
+	};
+
+	///////////////////////////////////////////////////////////////////////
+	// GET_KV_CONFIG_VALUE
+	///////////////////////////////////////////////////////////////////////
+	// Custom header for the GET_KV_CONFIG_VALUE request; both fields start as empty strings.
+	class GetKVConfigRequestHeader : public CommandCustomHeader
+	{
+	public:
+		GetKVConfigRequestHeader() {}
+		~GetKVConfigRequestHeader() {}
+
+		virtual void encode(std::string& outData);
+		static CommandCustomHeader* decode(Json::Value& data);
+
+		std::string namespace_;
+		std::string key;
+	};
+
+	// Custom header for the GET_KV_CONFIG_VALUE response; value starts as an empty string.
+	class GetKVConfigResponseHeader : public CommandCustomHeader
+	{
+	public:
+		GetKVConfigResponseHeader() {}
+		~GetKVConfigResponseHeader() {}
+
+		virtual void encode(std::string& outData);
+		static CommandCustomHeader* decode(Json::Value& data);
+
+		std::string value;
+	};
+
+	///////////////////////////////////////////////////////////////////////
+	// NOTIFY_CONSUMER_IDS_CHANGED_VALUE
+	///////////////////////////////////////////////////////////////////////
+	// Custom header for the NOTIFY_CONSUMER_IDS_CHANGED_VALUE request; consumerGroup starts empty.
+	class NotifyConsumerIdsChangedRequestHeader : public CommandCustomHeader
+	{
+	public:
+		NotifyConsumerIdsChangedRequestHeader() {}
+		~NotifyConsumerIdsChangedRequestHeader() {}
+
+		virtual void encode(std::string& outData);
+		static CommandCustomHeader* decode(Json::Value& data);
+
+		std::string consumerGroup;
+	};
+
+
+	///////////////////////////////////////////////////////////////////////
+	// GET_CONSUMER_RUNNING_INFO_VALUE
+	///////////////////////////////////////////////////////////////////////
+	// Custom header for the GET_CONSUMER_RUNNING_INFO_VALUE request.
+	class GetConsumerRunningInfoRequestHeader : public CommandCustomHeader
+	{
+	public:
+		// jstackEnable is a plain bool, so it needs an explicit default (false) to be safely readable.
+		GetConsumerRunningInfoRequestHeader() : jstackEnable(false) {}
+		~GetConsumerRunningInfoRequestHeader() {}
+		virtual void encode(std::string& outData);
+		static CommandCustomHeader* decode(Json::Value& data);
+
+		std::string consumerGroup;
+		std::string clientId;
+		bool jstackEnable;
+	};
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/protocol/ConsumerRunningInfo.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/protocol/ConsumerRunningInfo.cpp b/rocketmq-client4cpp/src/protocol/ConsumerRunningInfo.cpp
new file mode 100755
index 0000000..58cecde
--- /dev/null
+++ b/rocketmq-client4cpp/src/protocol/ConsumerRunningInfo.cpp
@@ -0,0 +1,168 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+#include "ConsumerRunningInfo.h"
+
+namespace rmq
+{
+
+const std::string ConsumerRunningInfo::PROP_NAMESERVER_ADDR = "PROP_NAMESERVER_ADDR";  // definitions of the static keys declared in ConsumerRunningInfo.h
+const std::string ConsumerRunningInfo::PROP_THREADPOOL_CORE_SIZE = "PROP_THREADPOOL_CORE_SIZE";
+const std::string ConsumerRunningInfo::PROP_CONSUME_ORDERLY = "PROP_CONSUMEORDERLY";  // NOTE(review): value has no '_' between CONSUME and ORDERLY; presumably matches the peer's key, confirm before changing
+const std::string ConsumerRunningInfo::PROP_CONSUME_TYPE = "PROP_CONSUME_TYPE";
+const std::string ConsumerRunningInfo::PROP_CLIENT_VERSION = "PROP_CLIENT_VERSION";
+const std::string ConsumerRunningInfo::PROP_CONSUMER_START_TIMESTAMP = "PROP_CONSUMER_START_TIMESTAMP";
+
+
+// Nothing to set up: the class currently declares no data members of its own.
+ConsumerRunningInfo::ConsumerRunningInfo()
+{}
+
+// Nothing to release: the class currently declares no data members of its own.
+ConsumerRunningInfo::~ConsumerRunningInfo()
+{}
+
+// Encodes this object as JSON into outData.
+// No fields are serialised at present, so the result is always the
+// empty object "{}".
+void ConsumerRunningInfo::encode(std::string& outData)
+{
+    outData = "{}";
+}
+
+
+std::string ConsumerRunningInfo::formatString()
+{
+	std::string sb = "rocketmq-client4cpp does not support this feature";
+	// Placeholder: the fixed notice above is always returned. The Java-style report code below is disabled.
+	/*
+	// 1
+	{
+		sb.append("#Consumer Properties#\n");
+		Iterator<Entry<Object, Object>> it = m_properties.entrySet().iterator();
+		while (it.hasNext()) {
+			Entry<Object, Object> next = it.next();
+			String item =
+					String.format("%-40s: %s\n", next.getKey().toString(), next.getValue().toString());
+			sb.append(item);
+		}
+	}
+
+	// 2
+	{
+		sb.append("\n\n#Consumer Subscription#\n");
+
+		Iterator<SubscriptionData> it = m_subscriptionSet.iterator();
+		int i = 0;
+		while (it.hasNext()) {
+			SubscriptionData next = it.next();
+			String item = String.format("%03d Topic: %-40s ClassFilter: %-8s SubExpression: %s\n", //
+				++i,//
+				next.getTopic(),//
+				next.isClassFilterMode(),//
+				next.getSubString());
+
+			sb.append(item);
+		}
+	}
+
+	// 3
+	{
+		sb.append("\n\n#Consumer Offset#\n");
+		sb.append(String.format("%-32s  %-32s  %-4s  %-20s\n",//
+			"#Topic",//
+			"#Broker Name",//
+			"#QID",//
+			"#Consumer Offset"//
+		));
+
+		Iterator<Entry<MessageQueue, ProcessQueueInfo>> it = m_mqTable.entrySet().iterator();
+		while (it.hasNext()) {
+			Entry<MessageQueue, ProcessQueueInfo> next = it.next();
+			String item = String.format("%-32s  %-32s  %-4d  %-20d\n",//
+				next.getKey().getTopic(),//
+				next.getKey().getBrokerName(),//
+				next.getKey().getQueueId(),//
+				next.getValue().getCommitOffset());
+
+			sb.append(item);
+		}
+	}
+
+	// 4
+	{
+		sb.append("\n\n#Consumer MQ Detail#\n");
+		sb.append(String.format("%-32s  %-32s  %-4s  %-20s\n",//
+			"#Topic",//
+			"#Broker Name",//
+			"#QID",//
+			"#ProcessQueueInfo"//
+		));
+
+		Iterator<Entry<MessageQueue, ProcessQueueInfo>> it = m_mqTable.entrySet().iterator();
+		while (it.hasNext()) {
+			Entry<MessageQueue, ProcessQueueInfo> next = it.next();
+			String item = String.format("%-32s  %-32s  %-4d  %s\n",//
+				next.getKey().getTopic(),//
+				next.getKey().getBrokerName(),//
+				next.getKey().getQueueId(),//
+				next.getValue().toString());
+
+			sb.append(item);
+		}
+	}
+
+	// 5
+	{
+		sb.append("\n\n#Consumer RT&TPS#\n");
+		sb.append(String.format("%-32s  %14s %14s %14s %14s %18s %25s\n",//
+			"#Topic",//
+			"#Pull RT",//
+			"#Pull TPS",//
+			"#Consume RT",//
+			"#ConsumeOK TPS",//
+			"#ConsumeFailed TPS",//
+			"#ConsumeFailedMsgsInHour"//
+		));
+
+		Iterator<Entry<String, ConsumeStatus>> it = m_statusTable.entrySet().iterator();
+		while (it.hasNext()) {
+			Entry<String, ConsumeStatus> next = it.next();
+			String item = String.format("%-32s  %14.2f %14.2f %14.2f %14.2f %18.2f %25d\n",//
+				next.getKey(),//
+				next.getValue().getPullRT(),//
+				next.getValue().getPullTPS(),//
+				next.getValue().getConsumeRT(),//
+				next.getValue().getConsumeOKTPS(),//
+				next.getValue().getConsumeFailedTPS(),//
+				next.getValue().getConsumeFailedMsgs()//
+				);
+
+			sb.append(item);
+		}
+	}
+
+	// 6
+	if (m_jstack != null) {
+		sb.append("\n\n#Consumer jstack#\n");
+		sb.append(m_jstack);
+	}
+	*/
+
+	return sb;
+}
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/protocol/ConsumerRunningInfo.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/protocol/ConsumerRunningInfo.h b/rocketmq-client4cpp/src/protocol/ConsumerRunningInfo.h
new file mode 100755
index 0000000..588bf07
--- /dev/null
+++ b/rocketmq-client4cpp/src/protocol/ConsumerRunningInfo.h
@@ -0,0 +1,97 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#ifndef __ConsumerRunningInfo_H__
+#define __ConsumerRunningInfo_H__
+
+#include <string>
+#include <set>
+#include <map>
+
+#include "RemotingSerializable.h"
+#include "MessageQueue.h"
+#include "SubscriptionData.h"
+#include "ConsumerStatManage.h"
+
+namespace rmq
+{
+    class ConsumerRunningInfo : public RemotingSerializable  // consumer running-info body; its data members are currently commented out
+    {
+    public:
+        ConsumerRunningInfo();
+        ~ConsumerRunningInfo();
+        // The accessors below are disabled together with the members they would expose.
+		/*
+		std::map<std::string, std::string>& getProperties()
+		{
+			return m_properties;
+		}
+        void setProperties(const std::map<std::string, std::string>& properties)
+		{
+			m_properties = properties;
+		}
+
+        std::map<MessageQueue, ProcessQueueInfo>& getMqTable()
+		{
+			return m_mqTable;
+		}
+        void setMqTable(const std::map<MessageQueue, ProcessQueueInfo>& mqTable)
+		{
+			m_mqTable = mqTable;
+		}
+
+        std::map<std::string, ConsumeStatus>& getStatusTable()
+		{
+			return m_statusTable;
+		}
+        void setStatusTable(const std::map<std::string, ConsumeStatus>& statusTable)
+		{
+		    m_statusTable = statusTable;
+		}
+
+        std::set<SubscriptionData>& getSubscriptionSet()
+		{
+			return m_subscriptionSet;
+		}
+        void setSubscriptionSet(const std::set<SubscriptionData>& subscriptionSet)
+		{
+			m_subscriptionSet = subscriptionSet;
+		}
+		*/
+        // encode() writes the JSON form into outData; formatString() returns a text report (both in ConsumerRunningInfo.cpp).
+		void encode(std::string& outData);
+		std::string formatString();
+        // Property-name keys; their string values are defined in ConsumerRunningInfo.cpp.
+	public:
+		static const std::string PROP_NAMESERVER_ADDR;
+		static const std::string PROP_THREADPOOL_CORE_SIZE;
+		static const std::string PROP_CONSUME_ORDERLY;
+		static const std::string PROP_CONSUME_TYPE;
+		static const std::string PROP_CLIENT_VERSION;
+		static const std::string PROP_CONSUMER_START_TIMESTAMP;
+
+    private:
+		/*
+		std::map<std::string, std::string> m_properties;
+		std::set<SubscriptionData> m_subscriptionSet;
+		std::map<MessageQueue, ProcessQueueInfo> m_mqTable;
+		std::map<string, ConsumerStat> m_statusTable;
+		std::string m_jstack;
+		*/
+    };
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/protocol/GetConsumerListByGroupResponseBody.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/protocol/GetConsumerListByGroupResponseBody.h b/rocketmq-client4cpp/src/protocol/GetConsumerListByGroupResponseBody.h
new file mode 100755
index 0000000..0ea19da
--- /dev/null
+++ b/rocketmq-client4cpp/src/protocol/GetConsumerListByGroupResponseBody.h
@@ -0,0 +1,97 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#ifndef __GETCONSUMERLISTBYGROUPRESPONSEBODY_H__
+#define __GETCONSUMERLISTBYGROUPRESPONSEBODY_H__
+
+#include <string>
+#include <sstream>
+#include <list>
+#include "UtilAll.h"
+#include "RemotingSerializable.h"
+
+namespace rmq
+{
+    // Response body holding the consumer ids parsed from a "consumerIdList" JSON array.
+    class GetConsumerListByGroupResponseBody : public RemotingSerializable
+    {
+    public:
+        GetConsumerListByGroupResponseBody() {}
+        ~GetConsumerListByGroupResponseBody() {}
+
+        // Not implemented: outData is left unchanged.
+        void encode(std::string& outData) {}
+
+        // Parses len bytes of JSON starting at pData, for example
+        //   {"consumerIdList":["10.12.22.213@DEFAULT", "10.12.22.213@xxx"]}
+        // Returns a heap-allocated body (the caller is responsible for deleting it),
+        // or NULL when the buffer is missing/empty or is not valid JSON. A document
+        // without a "consumerIdList" array produces a body with an empty list.
+        static GetConsumerListByGroupResponseBody* decode(const char* pData, int len)
+        {
+            //RMQ_DEBUG("GET_CONSUMER_LIST_BY_GROUP_VALUE:%s", pData);
+
+            // Handing NULL or a non-positive length to the parser is never valid input.
+            if (pData == NULL || len <= 0)
+            {
+                RMQ_ERROR("parse fail: %s", "empty input");
+                return NULL;
+            }
+
+            Json::Reader reader;
+            Json::Value object;
+            if (!reader.parse(pData, pData + len, object))
+            {
+                RMQ_ERROR("parse fail: %s", reader.getFormattedErrorMessages().c_str());
+                return NULL;
+            }
+
+            GetConsumerListByGroupResponseBody* rsp = new GetConsumerListByGroupResponseBody();
+            if (!object.isObject() || !object["consumerIdList"].isArray())
+            {
+                return rsp;
+            }
+
+            // Bind by reference so the parsed array and its items are not deep-copied.
+            const Json::Value& cidList = object["consumerIdList"];
+            for (size_t i = 0; i < cidList.size(); i++)
+            {
+                const Json::Value& cid = cidList[i];
+                if (!cid.isNull())
+                {
+                    rsp->m_consumerIdList.push_back(cid.asString());
+                }
+            }
+
+            return rsp;
+        }
+
+        std::list<std::string>& getConsumerIdList() { return m_consumerIdList; }
+        void setConsumerIdList(const std::list<std::string>& consumerIdList) { m_consumerIdList = consumerIdList; }
+
+        std::string toString() const
+        {
+            std::stringstream ss;
+            ss << "{consumerIdList=" << UtilAll::toString(m_consumerIdList) << "}";
+            return ss.str();
+        }
+
+    private:
+        std::list<std::string> m_consumerIdList;
+    };
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/protocol/HeartbeatData.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/protocol/HeartbeatData.cpp b/rocketmq-client4cpp/src/protocol/HeartbeatData.cpp
new file mode 100755
index 0000000..73f197a
--- /dev/null
+++ b/rocketmq-client4cpp/src/protocol/HeartbeatData.cpp
@@ -0,0 +1,52 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+#include "HeartbeatData.h"
+
+namespace rmq
+{
+
+// Writes this heartbeat (clientID, consumerDataSet, producerDataSet) to outData as JSON.
+void HeartbeatData::encode(std::string& outData)
+{
+    //{"clientID":"10.6.223.90@16164","consumerDataSet":[{"consumeFromWhere":"CONSUME_FROM_LAST_OFFSET","consumeType":"CONSUME_ACTIVELY","groupName":"please_rename_unique_group_name_5","messageModel":"CLUSTERING","subscriptionDataSet":[],"unitMode":false}],"producerDataSet":[{"groupName":"CLIENT_INNER_PRODUCER"}]}
+    Json::Value obj;
+    obj["clientID"] = m_clientID;
+    // Standard iterator types are used here rather than the GNU typeof extension.
+    Json::Value consumerDataSet(Json::arrayValue);
+    for (std::set<ConsumerData>::const_iterator it = m_consumerDataSet.begin(); it != m_consumerDataSet.end(); ++it)
+    {
+        Json::Value o;
+        it->toJson(o);
+        consumerDataSet.append(o);
+    }
+    obj["consumerDataSet"] = consumerDataSet;
+
+    Json::Value producerDataSet(Json::arrayValue);
+    for (std::set<ProducerData>::const_iterator it = m_producerDataSet.begin(); it != m_producerDataSet.end(); ++it)
+    {
+        Json::Value o;
+        it->toJson(o);
+        producerDataSet.append(o);
+    }
+    obj["producerDataSet"] = producerDataSet;
+    Json::FastWriter outer;
+    outData = outer.write(obj);
+}
+
+
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/protocol/HeartbeatData.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/protocol/HeartbeatData.h b/rocketmq-client4cpp/src/protocol/HeartbeatData.h
new file mode 100755
index 0000000..cb0f720
--- /dev/null
+++ b/rocketmq-client4cpp/src/protocol/HeartbeatData.h
@@ -0,0 +1,157 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+#ifndef __HEARTBEATDATA_H__
+#define __HEARTBEATDATA_H__
+
+#include <string>
+#include <set>
+#include <sstream>
+
+#include "RocketMQClient.h"
+#include "ConsumeType.h"
+#include "SubscriptionData.h"
+#include "RemotingSerializable.h"
+#include "UtilAll.h"
+
+namespace rmq
+{
+    // Consumer-group entry kept in HeartbeatData's consumer set.
+    struct ConsumerData
+    {
+        std::string groupName;
+        ConsumeType consumeType;
+        MessageModel messageModel;
+        ConsumeFromWhere consumeFromWhere;
+        std::set<SubscriptionData> subscriptionDataSet;
+
+        // Entries are ordered by groupName alone.
+        bool operator < (const ConsumerData& other) const { return groupName < other.groupName; }
+
+        // Writes this entry's fields into obj; "unitMode" is always written as false.
+        void toJson(Json::Value& obj) const
+        {
+            //{"consumeFromWhere":"CONSUME_FROM_LAST_OFFSET","consumeType":"CONSUME_ACTIVELY","groupName":"please_rename_unique_group_name_5","messageModel":"CLUSTERING","subscriptionDataSet":[],"unitMode":false}
+            obj["groupName"] = groupName;
+            obj["messageModel"] = getMessageModelString(messageModel);
+            obj["consumeFromWhere"] = getConsumeFromWhereString(consumeFromWhere);
+            obj["consumeType"] = getConsumeTypeString(consumeType);
+            obj["unitMode"] = false;
+            Json::Value subscriptions(Json::arrayValue);
+            RMQ_FOR_EACH(subscriptionDataSet, it)
+            {
+                Json::Value entry;
+                (*it).toJson(entry);
+                subscriptions.append(entry);
+            }
+            obj["subscriptionDataSet"] = subscriptions;
+        }
+
+        // Debug representation listing every field.
+        std::string toString() const
+        {
+            std::ostringstream out;
+            out << "{groupName=" << groupName
+                << ",messageModel=" << getMessageModelString(messageModel)
+                << ",consumeFromWhere=" << getConsumeFromWhereString(consumeFromWhere)
+                << ",consumeType=" << getConsumeTypeString(consumeType)
+                << ",subscriptionDataSet=" << UtilAll::toString(subscriptionDataSet) << "}";
+            return out.str();
+        }
+    };
+	// Streams obj.toString() into os and returns os so calls can be chained.
+	inline std::ostream& operator<<(std::ostream& os, const ConsumerData& obj)
+	{
+		return os << obj.toString();
+	}
+
+    // Producer-group entry kept in HeartbeatData's producer set.
+    struct ProducerData
+    {
+        std::string groupName;
+
+        // Entries are ordered by groupName.
+        bool operator < (const ProducerData& other) const { return groupName < other.groupName; }
+
+        // Stores the group name under the "groupName" key of obj.
+        void toJson(Json::Value& obj) const { obj["groupName"] = groupName; }
+
+        // Debug representation: {groupName=<name>}
+        std::string toString() const
+        {
+            std::ostringstream out;
+            out << "{groupName=" << groupName << "}";
+            return out.str();
+        }
+    };
+	// Streams obj.toString() into os and returns os so calls can be chained.
+	inline std::ostream& operator<<(std::ostream& os, const ProducerData& obj)
+	{
+		return os << obj.toString();
+	}
+
+
+    // Heartbeat payload: a client id plus the producer and consumer entries stored with it.
+    class HeartbeatData : public RemotingSerializable
+    {
+    public:
+        // Writes this heartbeat to outData as JSON (defined in HeartbeatData.cpp).
+        void encode(std::string& outData);
+
+        // Returns a copy of the client id.
+        std::string getClientID() { return m_clientID; }
+
+        // Replaces the client id.
+        void setClientID(const std::string& clientID) { m_clientID = clientID; }
+
+        // Returns the stored producer set by mutable reference, so callers
+        // can modify it in place.
+        std::set<ProducerData>& getProducerDataSet() { return m_producerDataSet; }
+
+        // Replaces the producer set with a copy of producerDataSet.
+        void setProducerDataSet(const std::set<ProducerData>& producerDataSet)
+        {
+            m_producerDataSet = producerDataSet;
+        }
+
+        // Returns the stored consumer set by mutable reference, so callers
+        // can modify it in place.
+        std::set<ConsumerData>& getConsumerDataSet() { return m_consumerDataSet; }
+
+        // Replaces the consumer set with a copy of consumerDataSet.
+        void setConsumerDataSet(const std::set<ConsumerData>& consumerDataSet)
+        {
+            m_consumerDataSet = consumerDataSet;
+        }
+
+        // Debug representation:
+        // {clientID=...,producerDataSet=...,consumerDataSet=...}
+        std::string toString() const
+        {
+            std::ostringstream out;
+            out << "{clientID=" << m_clientID
+                << ",producerDataSet=" << UtilAll::toString(m_producerDataSet)
+                << ",consumerDataSet=" << UtilAll::toString(m_consumerDataSet) << "}";
+            return out.str();
+        }
+
+    private:
+        std::string m_clientID;
+        std::set<ProducerData> m_producerDataSet;
+        std::set<ConsumerData> m_consumerDataSet;
+    };
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/protocol/KVTable.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/protocol/KVTable.h b/rocketmq-client4cpp/src/protocol/KVTable.h
new file mode 100755
index 0000000..726b872
--- /dev/null
+++ b/rocketmq-client4cpp/src/protocol/KVTable.h
@@ -0,0 +1,58 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#ifndef __KVTABLE_H__
+#define __KVTABLE_H__
+
+#include <map>
+#include <string>
+#include "RemotingSerializable.h"
+#include "UtilAll.h"
+
+namespace rmq
+{
+    // Wraps a string-to-string table as a RemotingSerializable; encode() is
+    // a stub, so only the in-memory accessors and toString() do real work.
+    class KVTable : public RemotingSerializable
+    {
+    public:
+        // Not implemented: outData is left unchanged.
+        void encode(std::string& outData) {}
+
+        // Debug representation: {table=...}
+        std::string toString() const
+        {
+            std::ostringstream out;
+            out << "{table=" << UtilAll::toString(m_table);
+            out << "}";
+            return out.str();
+        }
+
+        // Read-only access to the stored table.
+        const std::map<std::string, std::string>& getTable() { return m_table; }
+
+        // Replaces the stored table with a copy of table.
+        void setTable(const std::map<std::string, std::string>& table)
+        {
+            m_table = table;
+        }
+
+    private:
+        std::map<std::string, std::string> m_table;
+    };
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/protocol/LockBatchBody.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/protocol/LockBatchBody.cpp b/rocketmq-client4cpp/src/protocol/LockBatchBody.cpp
new file mode 100755
index 0000000..947abe2
--- /dev/null
+++ b/rocketmq-client4cpp/src/protocol/LockBatchBody.cpp
@@ -0,0 +1,112 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+#include "LockBatchBody.h"
+#include "UtilAll.h"
+
+namespace rmq
+{
+
+// Nothing to set up: the members are default-constructed.
+LockBatchRequestBody::LockBatchRequestBody()
+{}
+
+// Nothing to release explicitly: the members clean up after themselves.
+LockBatchRequestBody::~LockBatchRequestBody()
+{}
+
+// Not implemented: the body is empty, so outData is left unchanged.
+void LockBatchRequestBody::encode(std::string& outData)
+{
+}
+
+// Debug representation of this request:
+// {consumerGroup=...,clientId=...,mqSet=...}
+std::string LockBatchRequestBody::toString() const
+{
+    std::ostringstream out;
+    out << "{consumerGroup=" << m_consumerGroup << ",clientId=" << m_clientId;
+    out << ",mqSet=" << UtilAll::toString(m_mqSet) << "}";
+    return out.str();
+}
+
+
+std::string LockBatchRequestBody::getConsumerGroup()  // returns a copy of the stored consumer group
+{
+    return m_consumerGroup;
+}
+
+void LockBatchRequestBody::setConsumerGroup(const std::string& consumerGroup)  // replaces the stored consumer group
+{
+    m_consumerGroup = consumerGroup;
+}
+
+std::string LockBatchRequestBody::getClientId()  // returns a copy of the stored client id
+{
+    return m_clientId;
+}
+
+void LockBatchRequestBody::setClientId(const std::string& clientId)  // replaces the stored client id
+{
+    m_clientId = clientId;
+}
+
+std::set<MessageQueue>& LockBatchRequestBody::getMqSet()  // mutable reference to the stored set, not a copy
+{
+    return m_mqSet;
+}
+
+void LockBatchRequestBody::setMqSet(const std::set<MessageQueue>& mqSet)  // replaces the stored set with a copy of mqSet
+{
+    m_mqSet = mqSet;
+}
+
+// Nothing to set up: the lock set is default-constructed (empty).
+LockBatchResponseBody::LockBatchResponseBody()
+{}
+
+// Nothing to release explicitly: the lock set cleans up after itself.
+LockBatchResponseBody::~LockBatchResponseBody()
+{}
+
+// Not implemented: the body is empty, so outData is left unchanged.
+void LockBatchResponseBody::encode(std::string& outData)
+{}
+
+// Debug representation: {lockOKMQSet=...}; the label names the member that is printed.
+std::string LockBatchResponseBody::toString() const
+{
+    std::stringstream ss;
+    ss << "{lockOKMQSet=" << UtilAll::toString(m_lockOKMQSet) << "}";
+    return ss.str();
+}
+
+
+LockBatchResponseBody* LockBatchResponseBody::decode(const char* pData, int len)  // returns a heap-allocated body; presumably the caller deletes it (confirm)
+{
+    return new LockBatchResponseBody();  // NOTE(review): pData/len are never read, so the result is always a default-constructed body
+}
+
+std::set<MessageQueue> LockBatchResponseBody::getLockOKMQSet()  // returns the set by value, i.e. a full copy
+{
+    return m_lockOKMQSet;
+}
+
+void LockBatchResponseBody::setLockOKMQSet(const std::set<MessageQueue>& lockOKMQSet)  // replaces the stored set with a copy of lockOKMQSet
+{
+    m_lockOKMQSet = lockOKMQSet;
+}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/protocol/LockBatchBody.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/protocol/LockBatchBody.h b/rocketmq-client4cpp/src/protocol/LockBatchBody.h
new file mode 100755
index 0000000..ab9ee02
--- /dev/null
+++ b/rocketmq-client4cpp/src/protocol/LockBatchBody.h
@@ -0,0 +1,73 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#ifndef __LOCKBATCHBODY_H__
+#define __LOCKBATCHBODY_H__
+
+#include <string>
+#include <set>
+
+#include "RemotingSerializable.h"
+#include "MessageQueue.h"
+
+namespace rmq
+{
+    class LockBatchRequestBody : public RemotingSerializable  // request body: consumer group, client id and a set of message queues
+    {
+    public:
+        LockBatchRequestBody();
+        ~LockBatchRequestBody();
+        // encode() is an empty stub in LockBatchBody.cpp; toString() gives a debug representation.
+        void encode(std::string& outData);
+		std::string toString() const;
+        // String getters return copies.
+        std::string getConsumerGroup();
+        void setConsumerGroup(const std::string& consumerGroup);
+
+        std::string getClientId();
+        void setClientId(const std::string& clientId);
+        // getMqSet() hands out a mutable reference to the stored set.
+        std::set<MessageQueue>& getMqSet();
+        void setMqSet(const std::set<MessageQueue>& mqSet);
+
+    private:
+        std::string m_consumerGroup;
+        std::string m_clientId;
+        std::set<MessageQueue> m_mqSet;
+    };
+
+    class LockBatchResponseBody : public RemotingSerializable  // response body holding a set of message queues (m_lockOKMQSet)
+    {
+    public:
+        LockBatchResponseBody();
+        ~LockBatchResponseBody();
+        // encode() is an empty stub in LockBatchBody.cpp; toString() gives a debug representation.
+        void encode(std::string& outData);
+		std::string toString() const;
+        // NOTE(review): the current definition ignores pData/len and returns a default-constructed body.
+        static LockBatchResponseBody* decode(const char* pData, int len);
+        // getLockOKMQSet() returns the set by value (a copy).
+        std::set<MessageQueue> getLockOKMQSet();
+        void setLockOKMQSet(const std::set<MessageQueue>& lockOKMQSet);
+
+    private:
+        std::set<MessageQueue> m_lockOKMQSet;
+    };
+
+    typedef LockBatchRequestBody UnlockBatchRequestBody;  // unlock requests reuse the lock request body type unchanged
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/protocol/MQProtos.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/protocol/MQProtos.cpp b/rocketmq-client4cpp/src/protocol/MQProtos.cpp
new file mode 100755
index 0000000..052c104
--- /dev/null
+++ b/rocketmq-client4cpp/src/protocol/MQProtos.cpp
@@ -0,0 +1,248 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#include "MQProtos.h"
+
+namespace rmq
+{
+
+const char* getMQRequestCodeString(int code)
+{
+    switch (code)
+    {
+		case SEND_MESSAGE_VALUE:
+			return "SEND_MESSAGE_VALUE";
+		case PULL_MESSAGE_VALUE:
+			return "PULL_MESSAGE_VALUE";
+		case QUERY_MESSAGE_VALUE:
+			return "QUERY_MESSAGE_VALUE";
+		case QUERY_BROKER_OFFSET_VALUE:
+			return "QUERY_BROKER_OFFSET_VALUE";
+		case QUERY_CONSUMER_OFFSET_VALUE:
+			return "QUERY_CONSUMER_OFFSET_VALUE";
+		case UPDATE_CONSUMER_OFFSET_VALUE:
+			return "UPDATE_CONSUMER_OFFSET_VALUE";
+		case UPDATE_AND_CREATE_TOPIC_VALUE:
+			return "UPDATE_AND_CREATE_TOPIC_VALUE";
+		case GET_ALL_TOPIC_CONFIG_VALUE:
+			return "GET_ALL_TOPIC_CONFIG_VALUE";
+		case GET_TOPIC_CONFIG_LIST_VALUE:
+			return "GET_TOPIC_CONFIG_LIST_VALUE";
+		case GET_TOPIC_NAME_LIST_VALUE:
+			return "GET_TOPIC_NAME_LIST_VALUE";
+		case UPDATE_BROKER_CONFIG_VALUE:
+			return "UPDATE_BROKER_CONFIG_VALUE";
+		case GET_BROKER_CONFIG_VALUE:
+			return "GET_BROKER_CONFIG_VALUE";
+		case TRIGGER_DELETE_FILES_VALUE:
+			return "TRIGGER_DELETE_FILES_VALUE";
+		case GET_BROKER_RUNTIME_INFO_VALUE:
+			return "GET_BROKER_RUNTIME_INFO_VALUE";
+		case SEARCH_OFFSET_BY_TIMESTAMP_VALUE:
+			return "SEARCH_OFFSET_BY_TIMESTAMP_VALUE";
+		case GET_MAX_OFFSET_VALUE:
+			return "GET_MAX_OFFSET_VALUE";
+		case GET_MIN_OFFSET_VALUE:
+			return "GET_MIN_OFFSET_VALUE";
+		case GET_EARLIEST_MSG_STORETIME_VALUE:
+			return "GET_EARLIEST_MSG_STORETIME_VALUE";
+		case VIEW_MESSAGE_BY_ID_VALUE:
+			return "VIEW_MESSAGE_BY_ID_VALUE";
+		case HEART_BEAT_VALUE:
+			return "HEART_BEAT_VALUE";
+		case UNREGISTER_CLIENT_VALUE:
+			return "UNREGISTER_CLIENT_VALUE";
+		case CONSUMER_SEND_MSG_BACK_VALUE:
+			return "CONSUMER_SEND_MSG_BACK_VALUE";
+		case END_TRANSACTION_VALUE:
+			return "END_TRANSACTION_VALUE";
+		case GET_CONSUMER_LIST_BY_GROUP_VALUE:
+			return "GET_CONSUMER_LIST_BY_GROUP_VALUE";
+		case CHECK_TRANSACTION_STATE_VALUE:
+			return "CHECK_TRANSACTION_STATE_VALUE";
+		case NOTIFY_CONSUMER_IDS_CHANGED_VALUE:
+			return "NOTIFY_CONSUMER_IDS_CHANGED_VALUE";
+		case LOCK_BATCH_MQ_VALUE:
+			return "LOCK_BATCH_MQ_VALUE";
+		case UNLOCK_BATCH_MQ_VALUE:
+			return "UNLOCK_BATCH_MQ_VALUE";
+		case GET_ALL_CONSUMER_OFFSET_VALUE:
+			return "GET_ALL_CONSUMER_OFFSET_VALUE";
+		case GET_ALL_DELAY_OFFSET_VALUE:
+			return "GET_ALL_DELAY_OFFSET_VALUE";
+		case PUT_KV_CONFIG_VALUE:
+			return "PUT_KV_CONFIG_VALUE";
+		case GET_KV_CONFIG_VALUE:
+			return "GET_KV_CONFIG_VALUE";
+		case DELETE_KV_CONFIG_VALUE:
+			return "DELETE_KV_CONFIG_VALUE";
+		case REGISTER_BROKER_VALUE:
+			return "REGISTER_BROKER_VALUE";
+		case UNREGISTER_BROKER_VALUE:
+			return "UNREGISTER_BROKER_VALUE";
+		case GET_ROUTEINTO_BY_TOPIC_VALUE:
+			return "GET_ROUTEINTO_BY_TOPIC_VALUE";
+		case GET_BROKER_CLUSTER_INFO_VALUE:
+			return "GET_BROKER_CLUSTER_INFO_VALUE";
+		case UPDATE_AND_CREATE_SUBSCRIPTIONGROUP_VALUE:
+			return "UPDATE_AND_CREATE_SUBSCRIPTIONGROUP_VALUE";
+		case GET_ALL_SUBSCRIPTIONGROUP_CONFIG_VALUE:
+			return "GET_ALL_SUBSCRIPTIONGROUP_CONFIG_VALUE";
+		case GET_TOPIC_STATS_INFO_VALUE:
+			return "GET_TOPIC_STATS_INFO_VALUE";
+		case GET_CONSUMER_CONNECTION_LIST_VALUE:
+			return "GET_CONSUMER_CONNECTION_LIST_VALUE";
+		case GET_PRODUCER_CONNECTION_LIST_VALUE:
+			return "GET_PRODUCER_CONNECTION_LIST_VALUE";
+		case WIPE_WRITE_PERM_OF_BROKER_VALUE:
+			return "WIPE_WRITE_PERM_OF_BROKER_VALUE";
+		case GET_ALL_TOPIC_LIST_FROM_NAMESERVER_VALUE:
+			return "GET_ALL_TOPIC_LIST_FROM_NAMESERVER_VALUE";
+		case DELETE_SUBSCRIPTIONGROUP_VALUE:
+			return "DELETE_SUBSCRIPTIONGROUP_VALUE";
+		case GET_CONSUME_STATS_VALUE:
+			return "GET_CONSUME_STATS_VALUE";
+		case SUSPEND_CONSUMER_VALUE:
+			return "SUSPEND_CONSUMER_VALUE";
+		case RESUME_CONSUMER_VALUE:
+			return "RESUME_CONSUMER_VALUE";
+		case RESET_CONSUMER_OFFSET_IN_CONSUMER_VALUE:
+			return "RESET_CONSUMER_OFFSET_IN_CONSUMER_VALUE";
+		case RESET_CONSUMER_OFFSET_IN_BROKER_VALUE:
+			return "RESET_CONSUMER_OFFSET_IN_BROKER_VALUE";
+		case ADJUST_CONSUMER_THREAD_POOL_VALUE:
+			return "ADJUST_CONSUMER_THREAD_POOL_VALUE";
+		case WHO_CONSUME_THE_MESSAGE_VALUE:
+			return "WHO_CONSUME_THE_MESSAGE_VALUE";
+		case DELETE_TOPIC_IN_BROKER_VALUE:
+			return "DELETE_TOPIC_IN_BROKER_VALUE";
+		case DELETE_TOPIC_IN_NAMESRV_VALUE:
+			return "DELETE_TOPIC_IN_NAMESRV_VALUE";
+		case GET_KV_CONFIG_BY_VALUE_VALUE:
+			return "GET_KV_CONFIG_BY_VALUE_VALUE";
+		case DELETE_KV_CONFIG_BY_VALUE_VALUE:
+			return "DELETE_KV_CONFIG_BY_VALUE_VALUE";
+		case GET_KVLIST_BY_NAMESPACE_VALUE:
+			return "GET_KVLIST_BY_NAMESPACE_VALUE";
+		case RESET_CONSUMER_CLIENT_OFFSET_VALUE:
+			return "RESET_CONSUMER_CLIENT_OFFSET_VALUE";
+		case GET_CONSUMER_STATUS_FROM_CLIENT_VALUE:
+			return "GET_CONSUMER_STATUS_FROM_CLIENT_VALUE";
+		case INVOKE_BROKER_TO_RESET_OFFSET_VALUE:
+			return "INVOKE_BROKER_TO_RESET_OFFSET_VALUE";
+		case INVOKE_BROKER_TO_GET_CONSUMER_STATUS_VALUE:
+			return "INVOKE_BROKER_TO_GET_CONSUMER_STATUS_VALUE";
+		case QUERY_TOPIC_CONSUME_BY_WHO_VALUE:
+			return "QUERY_TOPIC_CONSUME_BY_WHO_VALUE";
+		case GET_TOPICS_BY_CLUSTER_VALUE:
+			return "GET_TOPICS_BY_CLUSTER_VALUE";
+		case REGISTER_FILTER_SERVER_VALUE:
+			return "REGISTER_FILTER_SERVER_VALUE";
+		case REGISTER_MESSAGE_FILTER_CLASS_VALUE:
+			return "REGISTER_MESSAGE_FILTER_CLASS_VALUE";
+		case QUERY_CONSUME_TIME_SPAN_VALUE:
+			return "QUERY_CONSUME_TIME_SPAN_VALUE";
+		case GET_SYSTEM_TOPIC_LIST_FROM_NS_VALUE:
+			return "GET_SYSTEM_TOPIC_LIST_FROM_NS_VALUE";
+		case GET_SYSTEM_TOPIC_LIST_FROM_BROKER_VALUE:
+			return "GET_SYSTEM_TOPIC_LIST_FROM_BROKER_VALUE";
+		case CLEAN_EXPIRED_CONSUMEQUEUE_VALUE:
+			return "CLEAN_EXPIRED_CONSUMEQUEUE_VALUE";
+		case GET_CONSUMER_RUNNING_INFO_VALUE:
+			return "GET_CONSUMER_RUNNING_INFO_VALUE";
+		case QUERY_CORRECTION_OFFSET_VALUE:
+			return "QUERY_CORRECTION_OFFSET_VALUE";
+		case CONSUME_MESSAGE_DIRECTLY_VALUE:
+			return "CONSUME_MESSAGE_DIRECTLY_VALUE";
+		case SEND_MESSAGE_V2_VALUE:
+			return "SEND_MESSAGE_V2_VALUE";
+		case GET_UNIT_TOPIC_LIST_VALUE:
+			return "GET_UNIT_TOPIC_LIST_VALUE";
+		case GET_HAS_UNIT_SUB_TOPIC_LIST_VALUE:
+			return "GET_HAS_UNIT_SUB_TOPIC_LIST_VALUE";
+		case GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST_VALUE:
+			return "GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST_VALUE";
+		case CLONE_GROUP_OFFSET_VALUE:
+			return "CLONE_GROUP_OFFSET_VALUE";
+		case VIEW_BROKER_STATS_DATA_VALUE:
+			return "VIEW_BROKER_STATS_DATA_VALUE";
+    }
+
+    return "UnknowMQRequestCode";
+}
+
+const char* getMQResponseCodeString(int code)
+{
+    switch (code)
+    {
+		case 0:
+			return "OK";
+		case FLUSH_DISK_TIMEOUT_VALUE:
+			return "FLUSH_DISK_TIMEOUT_VALUE";
+		case SLAVE_NOT_AVAILABLE_VALUE:
+			return "SLAVE_NOT_AVAILABLE_VALUE";
+		case FLUSH_SLAVE_TIMEOUT_VALUE:
+			return "FLUSH_SLAVE_TIMEOUT_VALUE";
+		case MESSAGE_ILLEGAL_VALUE:
+			return "MESSAGE_ILLEGAL_VALUE";
+		case SERVICE_NOT_AVAILABLE_VALUE:
+			return "SERVICE_NOT_AVAILABLE_VALUE";
+		case VERSION_NOT_SUPPORTED_VALUE:
+			return "VERSION_NOT_SUPPORTED_VALUE";
+		case NO_PERMISSION_VALUE:
+			return "NO_PERMISSION_VALUE";
+		case TOPIC_NOT_EXIST_VALUE:
+			return "TOPIC_NOT_EXIST_VALUE";
+		case TOPIC_EXIST_ALREADY_VALUE:
+			return "TOPIC_EXIST_ALREADY_VALUE";
+		case PULL_NOT_FOUND_VALUE:
+			return "PULL_NOT_FOUND_VALUE";
+		case PULL_RETRY_IMMEDIATELY_VALUE:
+			return "PULL_RETRY_IMMEDIATELY_VALUE";
+		case PULL_OFFSET_MOVED_VALUE:
+			return "PULL_OFFSET_MOVED_VALUE";
+		case QUERY_NOT_FOUND_VALUE:
+			return "QUERY_NOT_FOUND_VALUE";
+		case SUBSCRIPTION_PARSE_FAILED_VALUE:
+			return "SUBSCRIPTION_PARSE_FAILED_VALUE";
+		case SUBSCRIPTION_NOT_EXIST_VALUE:
+			return "SUBSCRIPTION_NOT_EXIST_VALUE";
+		case SUBSCRIPTION_NOT_LATEST_VALUE:
+			return "SUBSCRIPTION_NOT_LATEST_VALUE";
+		case SUBSCRIPTION_GROUP_NOT_EXIST_VALUE:
+			return "SUBSCRIPTION_GROUP_NOT_EXIST_VALUE";
+		case TRANSACTION_SHOULD_COMMIT_VALUE:
+			return "TRANSACTION_SHOULD_COMMIT_VALUE";
+		case TRANSACTION_SHOULD_ROLLBACK_VALUE:
+			return "TRANSACTION_SHOULD_ROLLBACK_VALUE";
+		case TRANSACTION_STATE_UNKNOW_VALUE:
+			return "TRANSACTION_STATE_UNKNOW_VALUE";
+		case TRANSACTION_STATE_GROUP_WRONG_VALUE:
+			return "TRANSACTION_STATE_GROUP_WRONG_VALUE";
+		case NO_BUYER_ID_VALUE:
+			return "NO_BUYER_ID_VALUE";
+		case NOT_IN_CURRENT_UNIT_VALUE:
+			return "NOT_IN_CURRENT_UNIT_VALUE";
+		case CONSUMER_NOT_ONLINE_VALUE:
+			return "CONSUMER_NOT_ONLINE_VALUE";
+		case CONSUME_MSG_TIMEOUT_VALUE:
+			return "CONSUME_MSG_TIMEOUT_VALUE";
+    }
+
+    return "UnknowMQResponseCode";
+}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/protocol/MQProtos.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/protocol/MQProtos.h b/rocketmq-client4cpp/src/protocol/MQProtos.h
new file mode 100755
index 0000000..94167ea
--- /dev/null
+++ b/rocketmq-client4cpp/src/protocol/MQProtos.h
@@ -0,0 +1,150 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+#ifndef __MQPROTOS_H__
+#define __MQPROTOS_H__
+
+namespace rmq
+{
+    enum MQRequestCode  // Numeric request codes; getMQRequestCodeString() turns each one into its name.
+    {
+        // Broker requests (10-45). Values 16, 18-20, 24 and 44 are not assigned here.
+        SEND_MESSAGE_VALUE = 10,
+        PULL_MESSAGE_VALUE = 11,
+        QUERY_MESSAGE_VALUE = 12,
+        QUERY_BROKER_OFFSET_VALUE = 13,
+        QUERY_CONSUMER_OFFSET_VALUE = 14,
+        UPDATE_CONSUMER_OFFSET_VALUE = 15,
+        UPDATE_AND_CREATE_TOPIC_VALUE = 17,
+
+        GET_ALL_TOPIC_CONFIG_VALUE = 21,
+        GET_TOPIC_CONFIG_LIST_VALUE = 22,
+        GET_TOPIC_NAME_LIST_VALUE = 23,
+        UPDATE_BROKER_CONFIG_VALUE = 25,
+        GET_BROKER_CONFIG_VALUE = 26,
+        TRIGGER_DELETE_FILES_VALUE = 27,
+        GET_BROKER_RUNTIME_INFO_VALUE = 28,
+        SEARCH_OFFSET_BY_TIMESTAMP_VALUE = 29,
+
+        GET_MAX_OFFSET_VALUE = 30,
+        GET_MIN_OFFSET_VALUE = 31,
+        GET_EARLIEST_MSG_STORETIME_VALUE = 32,
+        VIEW_MESSAGE_BY_ID_VALUE = 33,
+        HEART_BEAT_VALUE = 34,
+        UNREGISTER_CLIENT_VALUE = 35,
+        CONSUMER_SEND_MSG_BACK_VALUE = 36,
+        END_TRANSACTION_VALUE = 37,
+        GET_CONSUMER_LIST_BY_GROUP_VALUE = 38,
+        CHECK_TRANSACTION_STATE_VALUE = 39,
+
+        NOTIFY_CONSUMER_IDS_CHANGED_VALUE = 40,
+        LOCK_BATCH_MQ_VALUE = 41,
+        UNLOCK_BATCH_MQ_VALUE = 42,
+        GET_ALL_CONSUMER_OFFSET_VALUE = 43,
+        GET_ALL_DELAY_OFFSET_VALUE = 45,
+
+        // Name server requests (100-106).
+        PUT_KV_CONFIG_VALUE = 100,
+        GET_KV_CONFIG_VALUE = 101,
+        DELETE_KV_CONFIG_VALUE = 102,
+        REGISTER_BROKER_VALUE = 103,
+        UNREGISTER_BROKER_VALUE = 104,
+        GET_ROUTEINTO_BY_TOPIC_VALUE = 105,  // "ROUTEINTO" is the identifier's actual spelling (likely meant ROUTEINFO); renaming it would break existing users.
+        GET_BROKER_CLUSTER_INFO_VALUE = 106,
+
+        // Requests for both broker and name server (200 and up).
+        UPDATE_AND_CREATE_SUBSCRIPTIONGROUP_VALUE = 200,
+        GET_ALL_SUBSCRIPTIONGROUP_CONFIG_VALUE = 201,
+        GET_TOPIC_STATS_INFO_VALUE = 202,
+        GET_CONSUMER_CONNECTION_LIST_VALUE = 203,
+        GET_PRODUCER_CONNECTION_LIST_VALUE = 204,
+        WIPE_WRITE_PERM_OF_BROKER_VALUE = 205,
+        GET_ALL_TOPIC_LIST_FROM_NAMESERVER_VALUE = 206,
+        DELETE_SUBSCRIPTIONGROUP_VALUE = 207,
+        GET_CONSUME_STATS_VALUE = 208,
+        SUSPEND_CONSUMER_VALUE = 209,
+
+        RESUME_CONSUMER_VALUE = 210,
+        RESET_CONSUMER_OFFSET_IN_CONSUMER_VALUE = 211,
+        RESET_CONSUMER_OFFSET_IN_BROKER_VALUE = 212,
+        ADJUST_CONSUMER_THREAD_POOL_VALUE = 213,
+        WHO_CONSUME_THE_MESSAGE_VALUE = 214,
+        DELETE_TOPIC_IN_BROKER_VALUE = 215,
+        DELETE_TOPIC_IN_NAMESRV_VALUE = 216,
+        GET_KV_CONFIG_BY_VALUE_VALUE = 217,
+        DELETE_KV_CONFIG_BY_VALUE_VALUE = 218,
+        GET_KVLIST_BY_NAMESPACE_VALUE = 219,
+
+        RESET_CONSUMER_CLIENT_OFFSET_VALUE = 220,
+        GET_CONSUMER_STATUS_FROM_CLIENT_VALUE = 221,
+        INVOKE_BROKER_TO_RESET_OFFSET_VALUE = 222,
+        INVOKE_BROKER_TO_GET_CONSUMER_STATUS_VALUE = 223,
+        GET_TOPICS_BY_CLUSTER_VALUE = 224,
+        // The numbering jumps from 224 straight to 300.
+        QUERY_TOPIC_CONSUME_BY_WHO_VALUE = 300,
+        REGISTER_FILTER_SERVER_VALUE = 301,
+        REGISTER_MESSAGE_FILTER_CLASS_VALUE = 302,
+        QUERY_CONSUME_TIME_SPAN_VALUE = 303,
+        GET_SYSTEM_TOPIC_LIST_FROM_NS_VALUE = 304,
+        GET_SYSTEM_TOPIC_LIST_FROM_BROKER_VALUE = 305,
+        CLEAN_EXPIRED_CONSUMEQUEUE_VALUE = 306,
+        GET_CONSUMER_RUNNING_INFO_VALUE = 307,
+        QUERY_CORRECTION_OFFSET_VALUE = 308,
+        CONSUME_MESSAGE_DIRECTLY_VALUE = 309,
+
+        SEND_MESSAGE_V2_VALUE = 310,
+        GET_UNIT_TOPIC_LIST_VALUE = 311,
+        GET_HAS_UNIT_SUB_TOPIC_LIST_VALUE = 312,
+        GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST_VALUE = 313,
+        CLONE_GROUP_OFFSET_VALUE = 314,
+        VIEW_BROKER_STATS_DATA_VALUE = 315,
+    };
+
+    enum MQResponseCode  // Numeric response codes; getMQResponseCodeString() turns each one into its name.
+    {
+        FLUSH_DISK_TIMEOUT_VALUE = 10,  // First listed code. 0 has no enumerator here, yet getMQResponseCodeString() reports it as "OK".
+        SLAVE_NOT_AVAILABLE_VALUE = 11,
+        FLUSH_SLAVE_TIMEOUT_VALUE = 12,
+        MESSAGE_ILLEGAL_VALUE = 13,
+        SERVICE_NOT_AVAILABLE_VALUE = 14,
+        VERSION_NOT_SUPPORTED_VALUE = 15,
+        NO_PERMISSION_VALUE = 16,
+        TOPIC_NOT_EXIST_VALUE = 17,
+        TOPIC_EXIST_ALREADY_VALUE = 18,
+        PULL_NOT_FOUND_VALUE = 19,
+        // These values overlap numerically with MQRequestCode (10, 11, 200, ...), so a bare int must be read as request or response by context.
+        PULL_RETRY_IMMEDIATELY_VALUE = 20,
+        PULL_OFFSET_MOVED_VALUE = 21,
+        QUERY_NOT_FOUND_VALUE = 22,
+        SUBSCRIPTION_PARSE_FAILED_VALUE = 23,
+        SUBSCRIPTION_NOT_EXIST_VALUE = 24,
+        SUBSCRIPTION_NOT_LATEST_VALUE = 25,
+        SUBSCRIPTION_GROUP_NOT_EXIST_VALUE = 26,
+        // The numbering jumps from 26 straight to 200.
+        TRANSACTION_SHOULD_COMMIT_VALUE = 200,
+        TRANSACTION_SHOULD_ROLLBACK_VALUE = 201,
+        TRANSACTION_STATE_UNKNOW_VALUE = 202,
+        TRANSACTION_STATE_GROUP_WRONG_VALUE = 203,
+		NO_BUYER_ID_VALUE = 204,
+		NOT_IN_CURRENT_UNIT_VALUE = 205,
+		CONSUMER_NOT_ONLINE_VALUE = 206,
+		CONSUME_MSG_TIMEOUT_VALUE = 207,
+    };
+
+	const char* getMQRequestCodeString(int code);  // Name of the MQRequestCode enumerator equal to code, or "UnknowMQRequestCode" when none matches.
+	const char* getMQResponseCodeString(int code);  // "OK" for 0, the MQResponseCode enumerator name for a listed code, otherwise "UnknowMQResponseCode".
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/protocol/OffsetSerializeWrapper.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/protocol/OffsetSerializeWrapper.h b/rocketmq-client4cpp/src/protocol/OffsetSerializeWrapper.h
new file mode 100755
index 0000000..56ee4e4
--- /dev/null
+++ b/rocketmq-client4cpp/src/protocol/OffsetSerializeWrapper.h
@@ -0,0 +1,135 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#ifndef __OFFSETSERIALIZEWRAPPER_H__
+#define __OFFSETSERIALIZEWRAPPER_H__
+
+#include <map>
+#include <string>
+#include "RemotingSerializable.h"
+#include "MessageQueue.h"
+#include "AtomicValue.h"
+#include "UtilAll.h"
+#include "json/json.h"
+
+
+namespace rmq
+{
+    /**
+     * JSON-serializable wrapper around a MessageQueue -> offset table.
+     *
+     * Document layout produced by encode() and accepted by decode(): a single
+     * object with one member, "offsetTable". Each member name inside it is the
+     * JSON text of a message queue (MessageQueue::toJsonString()) and each
+     * value is that queue's offset, e.g.
+     *
+     *   {"offsetTable":{"{\"brokerName\":\"broker-a\",\"queueId\":3,\"topic\":\"TopicTest\"}":0}}
+     */
+    class OffsetSerializeWrapper : public RemotingSerializable
+    {
+    public:
+        /**
+         * Serializes the offset table into outData (replacing its content)
+         * using Json::FastWriter.
+         *
+         * The table node starts out as an empty JSON object rather than a
+         * default (null) value, so an empty table is written as
+         * "offsetTable":{} and decode() can read it back; a null node would be
+         * rejected by decode()'s isObject() check.
+         */
+        void encode(std::string& outData)
+        {
+            Json::Value offsetTable(Json::objectValue);
+            RMQ_FOR_EACH(m_offsetTable, it)
+            {
+                MessageQueue mq = it->first;
+                kpr::AtomicLong& offset = it->second;
+
+                // The queue itself is the key, so it is flattened to JSON text.
+                std::string mqStr = mq.toJsonString();
+                offsetTable[mqStr] = offset.get();
+            }
+
+            Json::Value obj;
+            obj["offsetTable"] = offsetTable;
+
+            Json::FastWriter writer;
+            outData = writer.write(obj);
+        }
+
+        /**
+         * Parses a document in the layout described on the class.
+         *
+         * @param pData  start of the JSON text; only [pData, pData + len) is
+         *               read, so the buffer need not be NUL-terminated.
+         * @param len    number of bytes available at pData.
+         * @return a wrapper allocated with new (the caller is responsible for
+         *         releasing it, e.g. through OffsetSerializeWrapperPtr), or
+         *         NULL when the input is empty, is not valid JSON, or has no
+         *         object-valued "offsetTable" member. Queue keys that are not
+         *         the JSON text of an object are skipped.
+         */
+        static OffsetSerializeWrapper* decode(const char* pData, int len)
+        {
+            if (pData == NULL || len <= 0)
+            {
+                return NULL;
+            }
+
+            // Bound the log output by len: the buffer is length-delimited and
+            // a plain "%s" could read past its end.
+            // NOTE(review): assumes RMQ_DEBUG takes printf-style formats.
+            RMQ_DEBUG("decode, data:%.*s", len, pData);
+
+            Json::Reader reader;
+            Json::Value obj;
+            if (!reader.parse(pData, pData + len, obj))
+            {
+                return NULL;
+            }
+
+            RMQ_DEBUG("decode ok");
+
+            if (!obj.isObject())
+            {
+                return NULL;
+            }
+
+            const Json::Value& objOffsetTable = obj["offsetTable"];
+            if (!objOffsetTable.isObject())
+            {
+                return NULL;
+            }
+
+            std::map<MessageQueue, kpr::AtomicLong> offsetTable;
+
+            const Json::Value::Members members = objOffsetTable.getMemberNames();
+            for (Json::Value::Members::const_iterator it = members.begin();
+                 it != members.end(); ++it)
+            {
+                const std::string& key = *it;
+                Json::Value objMq;
+                RMQ_DEBUG("decode, key:%s", key.c_str());
+
+                // A key may be valid JSON without being an object (e.g. "3");
+                // indexing such a value by member name is not valid, so it is
+                // skipped just like an unparsable key.
+                if (!reader.parse(key, objMq) || !objMq.isObject())
+                {
+                    continue;
+                }
+                RMQ_DEBUG("decode, key ok");
+
+                MessageQueue mq(objMq["topic"].asString(),
+                                objMq["brokerName"].asString(),
+                                objMq["queueId"].asInt());
+                long long offset = objOffsetTable[key].asInt64();
+
+                offsetTable[mq] = kpr::AtomicLong(offset);
+            }
+
+            // Allocate only once the table is complete, so nothing is leaked
+            // if a conversion above fails part-way through.
+            OffsetSerializeWrapper* offsetWrapper = new OffsetSerializeWrapper();
+            offsetWrapper->setOffsetTable(offsetTable);
+
+            return offsetWrapper;
+        }
+
+        /** Returns "{offsetTable=<table>}", with the table rendered by UtilAll::toString(). */
+        std::string toString() const
+        {
+            std::stringstream ss;
+            ss << "{offsetTable=" << UtilAll::toString(m_offsetTable)
+               << "}";
+            return ss.str();
+        }
+
+        /** Gives direct, mutable access to the stored table (no copy is made). */
+        std::map<MessageQueue, kpr::AtomicLong>& getOffsetTable()
+        {
+            return m_offsetTable;
+        }
+
+        /** Replaces the stored table with a copy of table. */
+        void setOffsetTable(const std::map<MessageQueue, kpr::AtomicLong>& table)
+        {
+            m_offsetTable = table;
+        }
+
+    private:
+        std::map<MessageQueue, kpr::AtomicLong> m_offsetTable;
+    };
+
+    typedef kpr::RefHandleT<OffsetSerializeWrapper> OffsetSerializeWrapperPtr;  // Handle type for OffsetSerializeWrapper; presumably reference counting -- confirm in the kpr headers.
+}
+
+#endif


Mime
View raw message