rocketmq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dongefore...@apache.org
Subject [02/17] incubator-rocketmq-externals git commit: [ROCKETMQ-129] Initialized the rocketmq c++ client closes apache/incubator-rocketmq-externals#11
Date Fri, 21 Apr 2017 10:09:42 GMT
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/protocol/RemotingCommand.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/protocol/RemotingCommand.cpp b/rocketmq-client4cpp/src/protocol/RemotingCommand.cpp
new file mode 100755
index 0000000..2f58d20
--- /dev/null
+++ b/rocketmq-client4cpp/src/protocol/RemotingCommand.cpp
@@ -0,0 +1,421 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+#include "RemotingCommand.h"
+
+#include <sstream>
+#include <string>
+#include <stdlib.h>
+#include <string.h>
+#include <json/json.h>
+#include "SocketUtil.h"
+#include "CommandCustomHeader.h"
+#include "MQVersion.h"
+
+namespace rmq
+{
+
+// Shared sequence counter: the single-argument constructor post-increments it
+// to obtain the opaque value of each newly created command.
+kpr::AtomicInteger RemotingCommand::s_seqNumber = 0;
+// Version that setCmdVersion() stamps onto commands; starts out as
+// MQVersion::s_CurrentVersion.
+volatile int RemotingCommand::s_configVersion = MQVersion::s_CurrentVersion;
+
+/**
+ * Creates a command for the given code with default field values.
+ *
+ * The language is "CPP", version and flag are 0, the remark is empty, and the
+ * opaque value is drawn from the shared sequence counter. No custom header,
+ * encoded buffer or body is attached.
+ *
+ * @param code command code stored in the header
+ */
+RemotingCommand::RemotingCommand(int code)
+    : m_code(code),
+      m_language("CPP"),
+      m_version(0),
+      m_opaque(s_seqNumber++),
+      m_flag(0),
+      m_remark(""),
+      m_pCustomHeader(NULL),
+      m_dataLen(0),
+      m_pData(NULL),
+      m_bodyLen(0),
+      m_pBody(NULL),
+      m_releaseBody(false)
+{
+}
+
+/**
+ * Creates a command from explicit header field values.
+ *
+ * The encoded buffer and the body start out empty. The destructor deletes
+ * whatever custom header is attached, so pCustomHeader must be heap-allocated
+ * (or NULL).
+ *
+ * @param code          command code
+ * @param language      language string stored in the command
+ * @param version       version value
+ * @param opaque        opaque (sequence) value
+ * @param flag          flag bits
+ * @param remark        remark text
+ * @param pCustomHeader custom header to attach, may be NULL
+ */
+RemotingCommand::RemotingCommand(int code,
+                                 const std::string& language,
+                                 int version,
+                                 int opaque,
+                                 int flag,
+                                 const std::string& remark,
+                                 CommandCustomHeader* pCustomHeader)
+    : m_code(code),
+      m_language(language),
+      m_version(version),
+      m_opaque(opaque),
+      m_flag(flag),
+      m_remark(remark),
+      m_pCustomHeader(pCustomHeader),
+      m_dataLen(0),
+      m_pData(NULL),
+      m_bodyLen(0),
+      m_pBody(NULL),
+      m_releaseBody(false)
+{
+}
+
+/**
+ * Frees the encoded buffer, the body (only when this command holds its own
+ * copy of it) and the attached custom header.
+ */
+RemotingCommand::~RemotingCommand()
+{
+    // delete / delete[] on a null pointer is a no-op, so no guard is needed.
+    delete[] m_pData;
+
+    if (m_releaseBody)
+    {
+        delete[] m_pBody;
+        m_pBody = NULL;
+        m_bodyLen = 0;
+    }
+
+    // TODO: maybe memleak
+    delete m_pCustomHeader;
+    m_pCustomHeader = NULL;
+}
+
+/**
+ * Escapes text so it can be placed between double quotes in a JSON document.
+ * Quotes, backslashes and control characters below 0x20 are escaped; every
+ * other byte is copied through unchanged.
+ */
+static std::string escapeJsonText(const std::string& text)
+{
+    static const char HEX[] = "0123456789abcdef";
+
+    std::string out;
+    out.reserve(text.size());
+    for (std::string::size_type i = 0; i < text.size(); ++i)
+    {
+        const unsigned char c = static_cast<unsigned char>(text[i]);
+        switch (c)
+        {
+            case '"':
+                out += "\\\"";
+                break;
+            case '\\':
+                out += "\\\\";
+                break;
+            case '\b':
+                out += "\\b";
+                break;
+            case '\f':
+                out += "\\f";
+                break;
+            case '\n':
+                out += "\\n";
+                break;
+            case '\r':
+                out += "\\r";
+                break;
+            case '\t':
+                out += "\\t";
+                break;
+            default:
+                if (c < 0x20)
+                {
+                    out += "\\u00";
+                    out += HEX[c >> 4];
+                    out += HEX[c & 0x0F];
+                }
+                else
+                {
+                    out += text[i];
+                }
+                break;
+        }
+    }
+    return out;
+}
+
+/**
+ * Serialises the header fields and the body into a freshly allocated wire
+ * buffer that getData()/getDataLen() expose afterwards.
+ *
+ * Calling encode() again replaces the previous buffer instead of leaking it.
+ * The remark is JSON-escaped so that quotes or control characters in it cannot
+ * produce a malformed header.
+ */
+void RemotingCommand::encode()
+{
+    std::string extHeader = "{}";
+    if (m_pCustomHeader)
+    {
+        m_pCustomHeader->encode(extHeader);
+    }
+
+    std::stringstream ss;
+    ss << "{"
+       << CODE_STRING << m_code << ","
+       << language_STRING << "\"CPP\","
+       << version_STRING << m_version << ","
+       << opaque_STRING << m_opaque << ","
+       << flag_STRING << m_flag << ","
+       << remark_STRING << "\"" << escapeJsonText(m_remark) << "\","
+       << extFields_STRING << extHeader
+       << "}";
+
+    // Materialise the header once; every ss.str() call makes another copy.
+    const std::string header = ss.str();
+    const int headLen = static_cast<int>(header.size());
+
+    // Only count body bytes that are actually backed by a body pointer.
+    const int bodyLen = (m_pBody != NULL && m_bodyLen > 0) ? m_bodyLen : 0;
+
+    /* protocol:
+     * | 4        | 4           | headerlen    | bodylen    |
+     * | 1-length | 2-headerlen | 3-headerdata | 4-bodydata |
+     */
+    const int dataLen = 8 + headLen + bodyLen;
+    char* pBuffer = new char[dataLen];
+
+    //length = len(2 + 3 + 4)
+    int tmp = htonl(4 + headLen + bodyLen);
+    memcpy(pBuffer, &tmp, 4);
+
+    //headerlength = len(3)
+    tmp = htonl(headLen);
+    memcpy(pBuffer + 4, &tmp, 4);
+
+    //headerdata
+    memcpy(pBuffer + 8, header.data(), headLen);
+
+    //bodydata
+    if (bodyLen > 0)
+    {
+        memcpy(pBuffer + 8 + headLen, m_pBody, bodyLen);
+    }
+
+    // Publish the new buffer only once it is complete, releasing any buffer
+    // left over from an earlier encode() call.
+    delete[] m_pData;
+    m_pData = pBuffer;
+    m_dataLen = dataLen;
+
+    //RMQ_DEBUG("encode|%s%s", ss.str().c_str(), m_pBody ? std::string(m_pBody, m_bodyLen).c_str() : "");
+}
+
+/**
+ * Builds a printable representation of the command: the header fields in
+ * JSON-like form, followed by "|<bodyLen>|<body bytes>" when a body is set.
+ * The extFields entry is only printed when the custom header produced text.
+ */
+std::string RemotingCommand::toString() const
+{
+    std::string extFields;
+    if (m_pCustomHeader != NULL)
+    {
+        m_pCustomHeader->encode(extFields);
+    }
+
+    std::stringstream out;
+    out << "{"
+        << CODE_STRING << m_code << ","
+        << language_STRING << "\"CPP\","
+        << version_STRING << m_version << ","
+        << opaque_STRING << m_opaque << ","
+        << flag_STRING << m_flag << ","
+        << remark_STRING << "\"" << m_remark << "\"";
+
+    if (extFields.empty())
+    {
+        out << "}";
+    }
+    else
+    {
+        out << "," << extFields_STRING << extFields << "}";
+    }
+
+    if (m_pBody != NULL)
+    {
+        out << "|" << m_bodyLen << "|" << std::string(m_pBody, m_bodyLen);
+    }
+
+    return out.str();
+}
+
+
+// Returns the buffer produced by encode(); NULL until encode() has been called.
+const char* RemotingCommand::getData()
+{
+    return m_pData;
+}
+
+// Returns the size in bytes of the buffer produced by encode().
+int RemotingCommand::getDataLen()
+{
+    return m_dataLen;
+}
+
+// Returns the body pointer set through setBody(); NULL when no body is set.
+const char* RemotingCommand::getBody()
+{
+    return m_pBody;
+}
+
+// Returns the body length recorded by setBody().
+int RemotingCommand::getBodyLen()
+{
+    return m_bodyLen;
+}
+
+/**
+ * Sets the command body.
+ *
+ * A body previously copied by this command is released before it is replaced,
+ * so repeated calls no longer leak. A NULL pointer or a non-positive length
+ * clears the body instead of being passed to memcpy/new[].
+ *
+ * @param pData body bytes, may be NULL to clear the body
+ * @param len   number of bytes at pData
+ * @param copy  true to store a private copy that the destructor frees;
+ *              false to reference pData, which must then outlive this command
+ */
+void RemotingCommand::setBody(char* pData, int len, bool copy)
+{
+    const bool hasData = (pData != NULL && len > 0);
+
+    // Build the replacement first so pData may point into the current body.
+    char* pNewBody = NULL;
+    if (hasData)
+    {
+        if (copy)
+        {
+            pNewBody = new char[len];
+            memcpy(pNewBody, pData, len);
+        }
+        else
+        {
+            pNewBody = pData;
+        }
+    }
+
+    if (pNewBody == m_pBody)
+    {
+        // Same buffer handed back without copying (or still no body at all):
+        // keep the current ownership and only refresh the length.
+        m_bodyLen = hasData ? len : 0;
+        return;
+    }
+
+    if (m_releaseBody)
+    {
+        delete[] m_pBody;
+    }
+
+    m_pBody = pNewBody;
+    m_bodyLen = hasData ? len : 0;
+    m_releaseBody = (copy && hasData);
+}
+
+/**
+ * Parses one received frame into a newly allocated RemotingCommand.
+ *
+ * Frame layout: bytes 0-3 total length, bytes 4-7 header length (network byte
+ * order), then the JSON header, then the optional body. The body bytes are
+ * copied into the command; its custom header is left NULL.
+ *
+ * @param pData start of the frame, including both 4-byte length fields
+ * @param len   number of bytes available at pData
+ * @return a new command, or NULL when the frame is truncated, the header
+ *         length does not fit inside len, or the header is not a JSON object
+ */
+RemotingCommand* RemotingCommand::decode(const char* pData, int len)
+{
+    // The two length prefixes must be present before anything is read.
+    if (pData == NULL || len < 8)
+    {
+        RMQ_ERROR("invalid frame, len=%d", len);
+        return NULL;
+    }
+
+    int headLen;
+    memcpy(&headLen, pData + 4, 4);
+    headLen = ntohl(headLen);
+
+    // The header length comes from the peer: reject values that would make
+    // the parser or the body copy read outside the received bytes.
+    if (headLen < 0 || headLen > len - 8)
+    {
+        RMQ_ERROR("invalid header length, len=%d, headLen=%d", len, headLen);
+        return NULL;
+    }
+
+    //RMQ_DEBUG("decode[%d,%d,%d]|%s%s", len, headLen, len - 8 - headLen, std::string(pData + 8, headLen).c_str(),
+    //          std::string(pData + 8 + headLen, len - 8 - headLen).c_str());
+
+    Json::Reader reader;
+    Json::Value object;
+    if (!reader.parse(pData + 8, pData + 8 + headLen, object) || !object.isObject())
+    {
+        RMQ_ERROR("parse header fail, %s", std::string(pData + 8, headLen).c_str());
+        return NULL;
+    }
+
+    int code = object["code"].asInt();
+    std::string language = object["language"].asString();
+    int version = object["version"].asInt();
+    int opaque = object["opaque"].asInt();
+    int flag = object["flag"].asInt();
+
+    // The remark is optional in the header.
+    Json::Value v = object["remark"];
+    std::string remark = "";
+    if (!v.isNull())
+    {
+        remark = v.asString();
+    }
+
+    RemotingCommand* cmd = new RemotingCommand(code,
+            language,
+            version,
+            opaque,
+            flag,
+            remark,
+            NULL);
+
+    int bodyLen = len - 8 - headLen;
+    if (bodyLen > 0)
+    {
+        // setBody(..., true) only reads from the pointer and stores a copy.
+        cmd->setBody(const_cast<char*>(pData + 8 + headLen), bodyLen, true);
+    }
+
+    return cmd;
+}
+
+/**
+ * Decodes the "extFields" object of a received frame into a custom header
+ * and attaches the result to this command.
+ *
+ * @param code  code passed to CommandCustomHeader::decode to select the header
+ * @param pData start of the frame, including both 4-byte length fields
+ * @param len   number of bytes available at pData
+ * @return the decoded header (also stored in this command), or NULL when the
+ *         frame is malformed, has no non-empty extFields object, or the
+ *         fields could not be decoded
+ */
+CommandCustomHeader* RemotingCommand::makeCustomHeader(int code, const char* pData, int len)
+{
+    // The two length prefixes must be present before anything is read.
+    if (pData == NULL || len < 8)
+    {
+        RMQ_ERROR("invalid frame, len=%d", len);
+        return NULL;
+    }
+
+    int headLen;
+    memcpy(&headLen, pData + 4, 4);
+    headLen = ntohl(headLen);
+
+    // Reject header lengths that do not fit inside the received bytes.
+    if (headLen < 0 || headLen > len - 8)
+    {
+        RMQ_ERROR("invalid header length, len=%d, headLen=%d", len, headLen);
+        return NULL;
+    }
+
+    Json::Reader reader;
+    Json::Value object;
+    if (!reader.parse(pData + 8, pData + 8 + headLen, object) || !object.isObject())
+    {
+        RMQ_ERROR("parse header fail, %s", std::string(pData + 8, headLen).c_str());
+        return NULL;
+    }
+
+    if (object.isMember("extFields") && object["extFields"].isObject() && object["extFields"].size() > 0)
+    {
+        CommandCustomHeader* pCustomHeader = CommandCustomHeader::decode(
+            code, object["extFields"], isResponseType());
+        if (pCustomHeader == NULL)
+        {
+            RMQ_WARN("invalid extFields, %d, %s", code, std::string(pData + 8, headLen).c_str());
+        }
+
+        // Stored even when NULL, matching the value returned to the caller.
+        setCommandCustomHeader(pCustomHeader);
+        return pCustomHeader;
+    }
+
+    return NULL;
+}
+
+
+/**
+ * Allocates a request command for the given code, attaches the custom header
+ * and stamps the configured version onto it.
+ *
+ * @param code          request code
+ * @param pCustomHeader header to attach, may be NULL
+ * @return the newly allocated command
+ */
+RemotingCommand* RemotingCommand::createRequestCommand(int code, CommandCustomHeader* pCustomHeader)
+{
+    RemotingCommand* pRequest = new RemotingCommand(code);
+    pRequest->m_pCustomHeader = pCustomHeader;
+    setCmdVersion(pRequest);
+    return pRequest;
+}
+
+/**
+ * Allocates a response command without a custom header.
+ *
+ * @param code   response code
+ * @param remark remark text stored in the command
+ */
+RemotingCommand* RemotingCommand::createResponseCommand(int code, const std::string& remark)
+{
+    CommandCustomHeader* const pNoHeader = NULL;
+    return createResponseCommand(code, remark, pNoHeader);
+}
+
+
+/**
+ * Allocates a command marked as a response, sets its remark and version and,
+ * when one is supplied, attaches the custom header.
+ *
+ * @param code          response code
+ * @param remark        remark text stored in the command
+ * @param pCustomHeader header to attach, may be NULL
+ */
+RemotingCommand* RemotingCommand::createResponseCommand(int code, const std::string& remark,
+    CommandCustomHeader* pCustomHeader)
+{
+    RemotingCommand* pResponse = new RemotingCommand(code);
+    pResponse->markResponseType();
+    pResponse->setRemark(remark);
+    setCmdVersion(pResponse);
+
+    if (pCustomHeader != NULL)
+    {
+        pResponse->setCommandCustomHeader(pCustomHeader);
+    }
+
+    return pResponse;
+}
+
+
+/** Sets the response bit (bit RPC_TYPE) in the flag field. */
+void RemotingCommand::markResponseType()
+{
+    m_flag |= (1 << RPC_TYPE);
+}
+
+/** Returns true when the response bit (bit RPC_TYPE) is set in the flag field. */
+bool RemotingCommand::isResponseType()
+{
+    const int mask = 1 << RPC_TYPE;
+    return (m_flag & mask) != 0;
+}
+
+/** Sets the one-way bit (bit RPC_ONEWAY) in the flag field. */
+void RemotingCommand::markOnewayRPC()
+{
+    m_flag |= (1 << RPC_ONEWAY);
+}
+
+/** Returns true when the one-way bit (bit RPC_ONEWAY) is set in the flag field. */
+bool RemotingCommand::isOnewayRPC()
+{
+    const int mask = 1 << RPC_ONEWAY;
+    return (m_flag & mask) != 0;
+}
+
+/**
+ * Stamps the configured version onto pCmd.
+ *
+ * When the cached version is negative it is first refreshed from
+ * MQVersion::s_CurrentVersion.
+ *
+ * @param pCmd command whose version is set
+ */
+void RemotingCommand::setCmdVersion(RemotingCommand* pCmd)
+{
+    if (s_configVersion < 0)
+    {
+        // Nothing cached yet: take the current version and remember it.
+        const int current = MQVersion::s_CurrentVersion;
+        pCmd->setVersion(current);
+        s_configVersion = current;
+        return;
+    }
+
+    pCmd->setVersion(s_configVersion);
+}
+
+// Returns the command code.
+int RemotingCommand::getCode()
+{
+    return m_code;
+}
+
+// Replaces the command code.
+void RemotingCommand::setCode(int code)
+{
+    m_code = code;
+}
+
+// Returns a copy of the stored language string.
+std::string RemotingCommand::getLanguage()
+{
+    return m_language;
+}
+
+// Replaces the stored language string. Note that encode() and toString()
+// always emit "CPP" and do not read this field.
+void RemotingCommand::setLanguage(const std::string& language)
+{
+    m_language = language;
+}
+
+// Returns the version value.
+int RemotingCommand::getVersion()
+{
+    return m_version;
+}
+
+// Replaces the version value.
+void RemotingCommand::setVersion(int version)
+{
+    m_version = version;
+}
+
+// Returns the opaque (sequence) value.
+int RemotingCommand::getOpaque()
+{
+    return m_opaque;
+}
+
+// Replaces the opaque (sequence) value.
+void RemotingCommand::setOpaque(int opaque)
+{
+    m_opaque = opaque;
+}
+
+// Returns the raw flag bits.
+int RemotingCommand::getFlag()
+{
+    return m_flag;
+}
+
+// Replaces the raw flag bits.
+void RemotingCommand::setFlag(int flag)
+{
+    m_flag = flag;
+}
+
+// Returns a copy of the remark text.
+std::string RemotingCommand::getRemark()
+{
+    return m_remark;
+}
+
+// Replaces the remark text.
+void RemotingCommand::setRemark(const std::string& remark)
+{
+    m_remark = remark;
+}
+
+// Stores the custom header pointer. A previously stored header is not freed
+// here; the destructor deletes whichever header is attached at that time.
+void RemotingCommand::setCommandCustomHeader(CommandCustomHeader* pCommandCustomHeader)
+{
+    m_pCustomHeader = pCommandCustomHeader;
+}
+
+// Returns the attached custom header, or NULL when none is attached.
+CommandCustomHeader* RemotingCommand::getCommandCustomHeader()
+{
+    return m_pCustomHeader;
+}
+
+/**
+ * Classifies the command from its flag bits: RESPONSE_COMMAND when the
+ * response bit is set, REQUEST_COMMAND otherwise.
+ */
+RemotingCommandType RemotingCommand::getType()
+{
+    return isResponseType() ? RESPONSE_COMMAND : REQUEST_COMMAND;
+}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/protocol/RemotingCommand.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/protocol/RemotingCommand.h b/rocketmq-client4cpp/src/protocol/RemotingCommand.h
new file mode 100755
index 0000000..c51fcfd
--- /dev/null
+++ b/rocketmq-client4cpp/src/protocol/RemotingCommand.h
@@ -0,0 +1,153 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#ifndef __REMOTINGCOMMAND_H__
+#define __REMOTINGCOMMAND_H__
+
+#include <sstream>
+#include <string>
+
+#include "RocketMQClient.h"
+#include "AtomicValue.h"
+#include "RefHandle.h"
+
+namespace rmq
+{
+    // Pre-quoted JSON key prefixes (key name followed by ':') that
+    // RemotingCommand::encode() and toString() write in front of each
+    // header field value.
+    const std::string CODE_STRING = "\"code\":";
+    const std::string language_STRING = "\"language\":";
+    const std::string version_STRING = "\"version\":";
+    const std::string opaque_STRING = "\"opaque\":";
+    const std::string flag_STRING = "\"flag\":";
+    const std::string remark_STRING = "\"remark\":";
+    const std::string extFields_STRING = "\"extFields\":";
+
+    // Property key string for the remoting version; its consumer is not
+    // visible in this header.
+    const std::string RemotingVersionKey = "rocketmq.remoting.version";
+
+    class CommandCustomHeader;
+
+    // Direction of a command, as reported by RemotingCommand::getType().
+    typedef enum
+    {
+        REQUEST_COMMAND,
+        RESPONSE_COMMAND
+    } RemotingCommandType;
+
+    // Generic response codes; values run consecutively from 0.
+    typedef enum
+    {
+        SUCCESS_VALUE = 0,
+        SYSTEM_ERROR_VALUE,
+        SYSTEM_BUSY_VALUE,
+        REQUEST_CODE_NOT_SUPPORTED_VALUE,
+    } ResponseCode;
+
+    // Language identifiers. RemotingCommand itself stores its language as a
+    // std::string rather than as this enum.
+    typedef enum
+    {
+        JAVA,
+        CPP,
+        DOTNET,
+        PYTHON,
+        DELPHI,
+        ERLANG,
+        RUBY,
+        OTHER,
+    } LanguageCode;
+
+    // Bit positions inside RemotingCommand's flag field; the mark*/is*
+    // helpers test and set them as (1 << position).
+    const int RPC_TYPE = 0; // 0, REQUEST_COMMAND // 1, RESPONSE_COMMAND
+    const int RPC_ONEWAY = 1; // 0, RPC // 1, Oneway
+
+    /**
+     * One remoting protocol command: JSON header fields (code, language,
+     * version, opaque, flag, remark, optional custom header) plus an optional
+     * binary body.
+     *
+     * The object owns the buffer produced by encode(), the attached custom
+     * header and, when setBody() was asked to copy, the body. Because of these
+     * raw owning pointers the class is not copyable: an implicit copy would
+     * delete the same buffers twice.
+     */
+    class RemotingCommand : public kpr::RefCount
+    {
+    public:
+        /** Creates a command with default header values and a fresh opaque value. */
+        RemotingCommand(int code);
+        /** Creates a command from explicit header values; pCustomHeader may be NULL. */
+        RemotingCommand(int code,
+                        const std::string& language,
+                        int version,
+                        int opaque,
+                        int flag,
+                        const std::string& remark,
+                        CommandCustomHeader* pCustomHeader);
+        ~RemotingCommand();
+
+        /** Serialises header and body into the buffer returned by getData(). */
+        void encode();
+        /** Returns a printable representation of header and body. */
+        std::string toString() const;
+
+        /** Buffer produced by encode() and its length in bytes. */
+        const char* getData();
+        int getDataLen();
+
+        /** Body pointer and length as set by setBody(). */
+        const char* getBody();
+        int getBodyLen();
+        /** Sets the body; with copy == true a private copy is stored and later freed. */
+        void setBody(char* pData, int len, bool copy);
+        /** Decodes "extFields" of a received frame into a custom header and attaches it. */
+        CommandCustomHeader* makeCustomHeader(int code, const char* pData, int len);
+
+        int getCode();
+        void setCode(int code);
+
+        std::string getLanguage();
+        void setLanguage(const std::string& language);
+
+        int getVersion();
+        void setVersion(int version);
+
+        int getOpaque();
+        void setOpaque(int opaque);
+
+        int getFlag();
+        void setFlag(int flag);
+
+        std::string getRemark();
+        void setRemark(const std::string& remark);
+
+        /** Attaches a custom header; the destructor deletes the attached header. */
+        void setCommandCustomHeader(CommandCustomHeader* pCommandCustomHeader);
+        CommandCustomHeader* getCommandCustomHeader();
+
+        /** Request/response classification and the flag bits behind it. */
+        RemotingCommandType getType();
+        void markResponseType();
+        bool isResponseType() ;
+        void markOnewayRPC();
+        bool isOnewayRPC();
+
+        /** Stamps the configured version onto pCmd. */
+        static void setCmdVersion(RemotingCommand* pCmd);
+        /** Parses a received frame into a newly allocated command; NULL on parse failure. */
+        static RemotingCommand* decode(const char* pData, int len);
+        /** Factory helpers returning newly allocated commands. */
+        static RemotingCommand* createRequestCommand(int code, CommandCustomHeader* pCustomHeader);
+        static RemotingCommand* createResponseCommand(int code, const std::string& remark);
+        static RemotingCommand* createResponseCommand(int code, const std::string& remark, CommandCustomHeader* pCustomHeader);
+
+
+    private:
+        // Copying is disabled (declared, intentionally never defined): the
+        // destructor deletes m_pData, m_pBody and m_pCustomHeader, so a
+        // member-wise copy would free them twice.
+        RemotingCommand(const RemotingCommand&);
+        RemotingCommand& operator=(const RemotingCommand&);
+
+    private:
+        static volatile int s_configVersion;
+
+    private:
+        int m_code;
+        std::string m_language;
+        int m_version;
+        int m_opaque;
+        int m_flag;
+        std::string m_remark;
+        CommandCustomHeader* m_pCustomHeader;
+
+        // Wire buffer produced by encode().
+        int m_dataLen;
+        char* m_pData;
+
+        // Body bytes; owned only when m_releaseBody is true.
+        int m_bodyLen;
+        char* m_pBody;
+
+        bool m_releaseBody;
+
+        // Source of opaque values for the single-argument constructor.
+        static kpr::AtomicInteger s_seqNumber;
+    };
+    typedef kpr::RefHandleT<RemotingCommand> RemotingCommandPtr;
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/protocol/RemotingSerializable.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/protocol/RemotingSerializable.h b/rocketmq-client4cpp/src/protocol/RemotingSerializable.h
new file mode 100755
index 0000000..8e50ab0
--- /dev/null
+++ b/rocketmq-client4cpp/src/protocol/RemotingSerializable.h
@@ -0,0 +1,33 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#ifndef __REMOTINGSERIALIZABLE_H__
+#define __REMOTINGSERIALIZABLE_H__
+
+#include "RocketMQClient.h"
+#include "RefHandle.h"
+
+namespace rmq
+{
+	/**
+	 * Interface for reference-counted objects that can write a serialised
+	 * form of themselves into a string.
+	 */
+	class RemotingSerializable : public kpr::RefCount
+	{
+	public:
+		virtual ~RemotingSerializable()
+		{
+		}
+
+		/** Stores the serialised representation in outData. */
+		virtual void encode(std::string& outData) = 0;
+	};
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/protocol/TopicList.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/protocol/TopicList.h b/rocketmq-client4cpp/src/protocol/TopicList.h
new file mode 100755
index 0000000..e827540
--- /dev/null
+++ b/rocketmq-client4cpp/src/protocol/TopicList.h
@@ -0,0 +1,60 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+#ifndef __TOPICLIST_H__
+#define __TOPICLIST_H__
+
+#include <set>
+#include <string>
+#include <UtilAll.h>
+
+namespace rmq
+{
+    /**
+     * Holder for a set of topic names.
+     */
+    class TopicList : public RemotingSerializable
+    {
+    public:
+        /**
+         * Returns a new, empty TopicList.
+         *
+         * NOTE(review): pData and len are ignored, so nothing is parsed from
+         * the input -- confirm whether real decoding is still to be written.
+         */
+        static TopicList* decode(const char* pData, int len)
+        {
+            (void)pData;
+            (void)len;
+            return new TopicList();
+        }
+
+        /** No-op: outData is left unchanged. */
+        void encode(std::string& outData)
+        {
+            (void)outData;
+        }
+
+        /** Returns "{topicList=...}" using UtilAll::toString for the set. */
+        std::string toString() const
+        {
+            std::stringstream ss;
+            ss << "{topicList=" << UtilAll::toString(m_topicList)
+               << "}";
+            return ss.str();
+        }
+
+        /** Read-only access to the topic names; callable on const objects. */
+        const std::set<std::string>& getTopicList() const
+        {
+            return m_topicList;
+        }
+
+        /** Replaces the stored topic names with a copy of topicList. */
+        void setTopicList(const std::set<std::string>& topicList)
+        {
+            m_topicList = topicList;
+        }
+
+    private:
+        std::set<std::string> m_topicList;
+    };
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/protocol/TopicRouteData.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/protocol/TopicRouteData.h b/rocketmq-client4cpp/src/protocol/TopicRouteData.h
new file mode 100755
index 0000000..a40ef7d
--- /dev/null
+++ b/rocketmq-client4cpp/src/protocol/TopicRouteData.h
@@ -0,0 +1,279 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+#ifndef __TOPICROUTEDATA_H__
+#define __TOPICROUTEDATA_H__
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <iostream>
+#include <list>
+#include <map>
+#include <string>
+#include <sstream>
+#include "RocketMQClient.h"
+#include "RemotingSerializable.h"
+#include "UtilAll.h"
+#include "MixAll.h"
+#include "json/json.h"
+
+namespace rmq
+{
+    /**
+     * Queue settings of one broker: broker name, read/write queue counts and
+     * the permission value.
+     */
+    struct QueueData
+    {
+        std::string brokerName;
+        int readQueueNums;
+        int writeQueueNums;
+        int perm;
+
+        /**
+         * Orders entries by broker name only. Declared const so it also works
+         * on const objects (std::less, sorted containers, const references).
+         */
+        bool operator < (const QueueData& other) const
+        {
+            return brokerName < other.brokerName;
+        }
+
+        /** Two entries are equal when all four fields match. */
+        bool operator==(const QueueData& other)const
+        {
+            return brokerName == other.brokerName
+                   && readQueueNums == other.readQueueNums
+                   && writeQueueNums == other.writeQueueNums
+                   && perm == other.perm;
+        }
+
+        /** Returns "{brokerName=...,readQueueNums=...,writeQueueNums=...,perm=...}". */
+        std::string toString() const
+        {
+            std::stringstream ss;
+            ss << "{brokerName=" << brokerName
+               << ",readQueueNums=" << readQueueNums
+               << ",writeQueueNums=" << writeQueueNums
+               << ",perm=" << perm
+               << "}";
+            return ss.str();
+        }
+    };
+    /** Streams the toString() form of a QueueData. */
+    inline std::ostream& operator<<(std::ostream& os, const QueueData& obj)
+    {
+        return os << obj.toString();
+    }
+
+
+    /**
+     * One broker group: its name and the addresses of its members keyed by
+     * integer broker id.
+     */
+    struct BrokerData
+    {
+        std::string brokerName;
+        std::map<int, std::string> brokerAddrs;
+
+        /**
+         * Orders entries by broker name only. Declared const so it also works
+         * on const objects (std::less, sorted containers, const references).
+         */
+        bool operator < (const BrokerData& other) const
+        {
+            return brokerName < other.brokerName;
+        }
+
+        /** Two entries are equal when name and address map both match. */
+        bool operator == (const BrokerData& other)const
+        {
+            return brokerName == other.brokerName
+                   && brokerAddrs == other.brokerAddrs;
+        }
+
+        /** Returns "{brokerName=...,brokerAddrs=...}". */
+        std::string toString() const
+        {
+            std::stringstream ss;
+            ss << "{brokerName=" << brokerName
+               << ",brokerAddrs=" << UtilAll::toString(brokerAddrs)
+               << "}";
+            return ss.str();
+        }
+    };
+
+
+    /** Streams the toString() form of a BrokerData. */
+    inline std::ostream& operator<<(std::ostream& os, const BrokerData& obj)
+    {
+        return os << obj.toString();
+    }
+
+
+    /**
+     * Route information of a topic: the order-topic configuration string and
+     * the queue and broker entries parsed from a JSON document.
+     */
+    class TopicRouteData : public RemotingSerializable
+    {
+    public:
+        /** No-op: outData is left unchanged. */
+        void encode(std::string& outData)
+        {
+            (void)outData;
+        }
+
+        /**
+         * Parses a JSON route document into a new TopicRouteData. Despite its
+         * name this overload decodes; the name is kept for existing callers.
+         *
+         * Entries that do not have the expected JSON shape (non-object list
+         * elements, a non-object brokerAddrs) are skipped instead of being
+         * indexed blindly.
+         *
+         * @param pData start of the JSON text
+         * @param len   number of bytes at pData
+         * @return a new object, or NULL when the text cannot be parsed or is
+         *         not a JSON object
+         */
+        static TopicRouteData* encode(const char* pData, int len)
+        {
+            /*
+            {
+                "orderTopicConf":"",
+                "brokerDatas":[
+                    {"brokerAddrs":{0:"10.134.143.77:10911"},"brokerName":"broker-a"}
+                ],
+                "filterServerTable":{},
+                "queueDatas":[
+                    {"brokerName":"broker-a","perm":6,"readQueueNums":4,"topicSynFlag":0,"writeQueueNums":4}
+                ]
+            }
+            */
+            if (pData == NULL || len <= 0)
+            {
+                RMQ_ERROR("parse fail:%s", "empty input");
+                return NULL;
+            }
+
+            Json::Reader reader;
+            Json::Value object;
+
+            if (!reader.parse(pData, pData + len, object))
+            {
+                RMQ_ERROR("parse fail:%s", reader.getFormattedErrorMessages().c_str());
+                return NULL;
+            }
+
+            if (!object.isObject())
+            {
+                RMQ_ERROR("parse fail:%s", "route data is not a json object");
+                return NULL;
+            }
+
+            TopicRouteData* trd = new TopicRouteData();
+            trd->setOrderTopicConf(object["orderTopicConf"].asString());
+
+            // References avoid copying whole JSON subtrees.
+            const Json::Value& qds = object["queueDatas"];
+            if (qds.isArray())
+            {
+                for (size_t i = 0; i < qds.size(); i++)
+                {
+                    const Json::Value& qd = qds[i];
+                    if (!qd.isObject())
+                    {
+                        continue;
+                    }
+
+                    QueueData d;
+                    d.brokerName = qd["brokerName"].asString();
+                    d.readQueueNums = qd["readQueueNums"].asInt();
+                    d.writeQueueNums = qd["writeQueueNums"].asInt();
+                    d.perm = qd["perm"].asInt();
+
+                    trd->getQueueDatas().push_back(d);
+                }
+            }
+
+            const Json::Value& bds = object["brokerDatas"];
+            if (bds.isArray())
+            {
+                for (size_t i = 0; i < bds.size(); i++)
+                {
+                    const Json::Value& bd = bds[i];
+                    if (!bd.isObject())
+                    {
+                        continue;
+                    }
+
+                    BrokerData d;
+                    d.brokerName = bd["brokerName"].asString();
+
+                    const Json::Value& bas = bd["brokerAddrs"];
+                    if (bas.isObject())
+                    {
+                        const Json::Value::Members names = bas.getMemberNames();
+                        // Separate index name: the outer loop already uses i.
+                        for (size_t k = 0; k < names.size(); k++)
+                        {
+                            const std::string& key = names[k];
+                            // Member names are the broker ids in text form.
+                            d.brokerAddrs[atoi(key.c_str())] = bas[key].asString();
+                        }
+                    }
+
+                    trd->getBrokerDatas().push_back(d);
+                }
+            }
+
+            return trd;
+        }
+
+        /**
+         * Picks an address from a broker entry: the one stored under
+         * MixAll::MASTER_ID when present, otherwise the first entry of the
+         * map, otherwise an empty string.
+         */
+        static std::string selectBrokerAddr(BrokerData& data)
+        {
+            std::map<int, std::string>::iterator it = data.brokerAddrs.find(MixAll::MASTER_ID);
+            if (it == data.brokerAddrs.end())
+            {
+                it = data.brokerAddrs.begin();
+            }
+
+            if (it == data.brokerAddrs.end())
+            {
+                return "";
+            }
+
+            return it->second;
+        }
+
+        /** Mutable access to the queue entries. */
+        std::list<QueueData>& getQueueDatas()
+        {
+            return m_queueDatas;
+        }
+
+        void setQueueDatas(const std::list<QueueData>& queueDatas)
+        {
+            m_queueDatas = queueDatas;
+        }
+
+        /** Mutable access to the broker entries. */
+        std::list<BrokerData>& getBrokerDatas()
+        {
+            return m_brokerDatas;
+        }
+
+        void setBrokerDatas(const std::list<BrokerData>& brokerDatas)
+        {
+            m_brokerDatas = brokerDatas;
+        }
+
+        const std::string& getOrderTopicConf() const
+        {
+            return m_orderTopicConf;
+        }
+
+        void setOrderTopicConf(const std::string& orderTopicConf)
+        {
+            m_orderTopicConf = orderTopicConf;
+        }
+
+        /**
+         * Equal when broker entries, order-topic configuration and queue
+         * entries all match. Declared const so const objects can be compared.
+         */
+        bool operator ==(const TopicRouteData& other) const
+        {
+            if (m_brokerDatas != other.m_brokerDatas)
+            {
+                return false;
+            }
+
+            if (m_orderTopicConf != other.m_orderTopicConf)
+            {
+                return false;
+            }
+
+            if (m_queueDatas != other.m_queueDatas)
+            {
+                return false;
+            }
+
+            return true;
+        }
+
+        /** Returns "{orderTopicConf=...,queueDatas=...,brokerDatas=...}". */
+        std::string toString() const
+        {
+            std::stringstream ss;
+            ss << "{orderTopicConf=" << m_orderTopicConf
+               << ",queueDatas=" << UtilAll::toString(m_queueDatas)
+               << ",brokerDatas=" << UtilAll::toString(m_brokerDatas)
+               << "}";
+            return ss.str();
+        }
+
+    private:
+        std::string m_orderTopicConf;
+        std::list<QueueData> m_queueDatas;
+        std::list<BrokerData> m_brokerDatas;
+    };
+    typedef kpr::RefHandleT<TopicRouteData> TopicRouteDataPtr;
+
+    /** Streams the toString() form of a TopicRouteData. */
+    inline std::ostream& operator<<(std::ostream& os, const TopicRouteData& obj)
+    {
+        return os << obj.toString();
+    }
+
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/transport/InvokeCallback.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/transport/InvokeCallback.h b/rocketmq-client4cpp/src/transport/InvokeCallback.h
new file mode 100755
index 0000000..4b5b3c7
--- /dev/null
+++ b/rocketmq-client4cpp/src/transport/InvokeCallback.h
@@ -0,0 +1,32 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#ifndef __INVOKECALLBACK_H__
+#define __INVOKECALLBACK_H__
+
+#include "ResponseFuture.h"
+
+namespace rmq
+{
+	// Callback interface invoked by ResponseFuture::executeInvokeCallback(),
+	// which passes itself as the argument.
+	class InvokeCallback
+	{
+	public:
+	    virtual ~InvokeCallback() {}
+	    // Called with the future the callback was registered on.
+	    virtual void operationComplete(ResponseFuturePtr pResponseFuture) = 0;
+	};
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/transport/RemoteClientConfig.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/transport/RemoteClientConfig.h b/rocketmq-client4cpp/src/transport/RemoteClientConfig.h
new file mode 100755
index 0000000..930fc78
--- /dev/null
+++ b/rocketmq-client4cpp/src/transport/RemoteClientConfig.h
@@ -0,0 +1,67 @@
+/**
+ * Copyright (C) 2013 kangliqiang ,kangliq@163.com
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __REMOTECLIENTCONFIG_H__
+#define __REMOTECLIENTCONFIG_H__
+
+#include <unistd.h>
+#include <sys/sysinfo.h>
+
+namespace rmq
+{
+    /**
+     * Plain settings holder for the remoting client. The constructor fills
+     * in the defaults; all fields are public and may be changed afterwards.
+     */
+    class RemoteClientConfig
+    {
+    public:
+        RemoteClientConfig()
+            : clientWorkerThreads(4),
+              clientCallbackExecutorThreads(get_nprocs()), // one per available processor
+              clientSelectorThreads(1),
+              clientOnewaySemaphoreValue(2048),
+              clientAsyncSemaphoreValue(2048),
+              connectTimeoutMillis(3000),
+              channelNotActiveInterval(1000 * 60),
+              clientChannelMaxIdleTimeSeconds(120),
+              clientSocketSndBufSize(65535),
+              clientSocketRcvBufSize(65535),
+              nsL5ModId(0),
+              nsL5CmdId(0)
+        {
+        }
+
+        // Server Response/Request
+        int clientWorkerThreads;
+        int clientCallbackExecutorThreads;
+        int clientSelectorThreads;
+        int clientOnewaySemaphoreValue;
+        int clientAsyncSemaphoreValue;
+        int connectTimeoutMillis;
+
+        int channelNotActiveInterval;
+        int clientChannelMaxIdleTimeSeconds;
+        int clientSocketSndBufSize;
+        int clientSocketRcvBufSize;
+
+        int nsL5ModId;
+        int nsL5CmdId;
+    };
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/transport/ResponseFuture.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/transport/ResponseFuture.cpp b/rocketmq-client4cpp/src/transport/ResponseFuture.cpp
new file mode 100755
index 0000000..c80fb84
--- /dev/null
+++ b/rocketmq-client4cpp/src/transport/ResponseFuture.cpp
@@ -0,0 +1,183 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+#include "ResponseFuture.h"
+
+#include "RocketMQClient.h"
+#include "KPRUtil.h"
+#include "InvokeCallback.h"
+#include "Monitor.h"
+#include "Semaphore.h"
+#include "ScopedLock.h"
+
+namespace rmq
+{
+
+// Records the request identity and timeout, stamps the creation time, and
+// allocates a monitor only when the caller intends to block in waitResponse().
+ResponseFuture::ResponseFuture(int requestCode, int opaque, int timeoutMillis,
+    InvokeCallback* pInvokeCallback, bool block, kpr::Semaphore* pSem)
+    : m_pResponseCommand(NULL),
+      m_sendRequestOK(false),
+      m_requestCode(requestCode),
+      m_opaque(opaque),
+      m_timeoutMillis(timeoutMillis),
+      m_pInvokeCallback(pInvokeCallback),
+      m_beginTimestamp(KPRUtil::GetCurrentTimeMillis()),
+      m_pMonitor(block ? new kpr::Monitor() : static_cast<kpr::Monitor*>(NULL)),
+      m_notifyFlag(false),
+      m_pSemaphore(pSem)
+{
+    // The two one-shot latches start cleared.
+    m_exec = 0;
+    m_released = 0;
+}
+
+// Frees the monitor created for blocking futures.
+ResponseFuture::~ResponseFuture()
+{
+    // delete on a null pointer is a no-op, so non-blocking futures need no check.
+    delete m_pMonitor;
+}
+
+// Runs the completion callback at most once; exceptions it throws are logged
+// and swallowed.
+void ResponseFuture::executeInvokeCallback()
+{
+    if (m_pInvokeCallback == NULL)
+    {
+        return;
+    }
+
+    // Only the caller that flips m_exec from 0 to 1 gets to run the callback.
+    if (!m_exec.compareAndSet(0, 1))
+    {
+        return;
+    }
+
+    try
+    {
+        m_pInvokeCallback->operationComplete(this);
+    }
+    catch (std::exception& e)
+    {
+        RMQ_ERROR("executeInvokeCallback exception: %s", e.what());
+    }
+    catch (...)
+    {
+        RMQ_ERROR("executeInvokeCallback exception");
+    }
+}
+
+// Gives the permit back to the semaphore supplied at construction, at most once.
+void ResponseFuture::release()
+{
+    // Short-circuit keeps the latch untouched when there is no semaphore.
+    if (m_pSemaphore != NULL && m_released.compareAndSet(0, 1))
+    {
+        m_pSemaphore->Release();
+    }
+}
+
+// True once more than m_timeoutMillis milliseconds have passed since construction.
+bool ResponseFuture::isTimeout()
+{
+    const long long elapsedMillis = KPRUtil::GetCurrentTimeMillis() - m_beginTimestamp;
+    return elapsedMillis > m_timeoutMillis;
+}
+
+// Waits up to timeoutMillis for putResponse() on a blocking future and returns
+// whatever response is stored afterwards (NULL if none). A future created
+// without a monitor returns immediately.
+RemotingCommand* ResponseFuture::waitResponse(int timeoutMillis)
+{
+    if (m_pMonitor == NULL)
+    {
+        return m_pResponseCommand;
+    }
+
+    {
+        kpr::ScopedLock<kpr::Monitor> guard(*m_pMonitor);
+        // Skip the wait when the response was already delivered.
+        if (!m_notifyFlag)
+        {
+            m_pMonitor->Wait(timeoutMillis);
+        }
+    }
+
+    return m_pResponseCommand;
+}
+
+// Stores the response (NULL is allowed) and, for a blocking future, marks it
+// delivered and wakes the thread parked in waitResponse().
+void ResponseFuture::putResponse(RemotingCommand* pResponseCommand)
+{
+    m_pResponseCommand = pResponseCommand;
+
+    if (m_pMonitor == NULL)
+    {
+        return;
+    }
+
+    kpr::ScopedLock<kpr::Monitor> guard(*m_pMonitor);
+    m_notifyFlag = true;
+    m_pMonitor->Notify();
+}
+
+long long  ResponseFuture::getBeginTimestamp() // Timestamp taken from KPRUtil::GetCurrentTimeMillis() in the constructor.
+{
+    return m_beginTimestamp;
+}
+
+bool  ResponseFuture::isSendRequestOK() // Last value given to setSendRequestOK(); false until then.
+{
+    return m_sendRequestOK;
+}
+
+void  ResponseFuture::setSendRequestOK(bool sendRequestOK) // Records whether the request was handed to the transport successfully.
+{
+    m_sendRequestOK = sendRequestOK;
+}
+
+long long  ResponseFuture::getTimeoutMillis() // Timeout passed to the constructor, widened to long long.
+{
+    return m_timeoutMillis;
+}
+
+InvokeCallback*  ResponseFuture::getInvokeCallback() // Callback supplied at construction; may be NULL.
+{
+    return m_pInvokeCallback;
+}
+
+RemotingCommand*  ResponseFuture::getResponseCommand() // Response stored so far; NULL until putResponse()/setResponseCommand() stores one.
+{
+    return m_pResponseCommand;
+}
+
+void  ResponseFuture::setResponseCommand(RemotingCommand* pResponseCommand) // Stores the response only; unlike putResponse() it neither sets the notify flag nor wakes a waiter.
+{
+    m_pResponseCommand = pResponseCommand;
+}
+
+int  ResponseFuture::getOpaque() // Opaque id passed to the constructor.
+{
+    return m_opaque;
+}
+
+int ResponseFuture::getRequestCode() // Request code this future is currently tagged with.
+{
+    return m_requestCode;
+}
+
+void ResponseFuture::setRequestCode(int requestCode) // Replaces the request code given at construction.
+{
+    m_requestCode = requestCode;
+}
+
+// Renders the future's fields as "{name=value,...}" for log output; pointer
+// members are printed as addresses.
+std::string ResponseFuture::toString() const
+{
+    std::stringstream text;
+    text << "{responseCommand=" << m_pResponseCommand;
+    text << ",sendRequestOK=" << m_sendRequestOK;
+    text << ",opaque=" << m_opaque;
+    text << ",timeoutMillis=" << m_timeoutMillis;
+    text << ",invokeCallback=" << m_pInvokeCallback;
+    text << ",beginTimestamp=" << m_beginTimestamp;
+    text << "}";
+    return text.str();
+}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/transport/ResponseFuture.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/transport/ResponseFuture.h b/rocketmq-client4cpp/src/transport/ResponseFuture.h
new file mode 100755
index 0000000..f1dfc01
--- /dev/null
+++ b/rocketmq-client4cpp/src/transport/ResponseFuture.h
@@ -0,0 +1,77 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#ifndef __RESPONSEFUTURE_H__
+#define __RESPONSEFUTURE_H__
+
+#include <string>
+#include "AtomicValue.h"
+#include "RefHandle.h"
+
+namespace kpr
+{
+class Monitor;
+class Semaphore;
+}
+
+namespace rmq
+{
+    class InvokeCallback;
+    class RemotingCommand;
+
+    class ResponseFuture : public kpr::RefCount // Tracks one in-flight request until its response, timeout or failure; handled through ResponseFuturePtr.
+    {
+    public:
+        ResponseFuture(int requestCode, int opaque, int timeoutMillis, InvokeCallback* pInvokeCallback,
+			bool block, kpr::Semaphore* pSem); // block=true allocates the monitor used by waitResponse(); pSem may be NULL.
+        ~ResponseFuture(); // Deletes the monitor if one was allocated.
+        void executeInvokeCallback(); // Runs the callback at most once; its exceptions are logged and swallowed.
+		void release(); // Releases pSem at most once; no-op when pSem is NULL.
+        bool isTimeout(); // True when more than the timeout has elapsed since construction.
+        RemotingCommand* waitResponse(int timeoutMillis); // Waits on the monitor (if any) unless already notified, then returns the stored response or NULL.
+        void putResponse(RemotingCommand* pResponseCommand); // Stores the response and notifies a waiter when a monitor exists.
+        long long getBeginTimestamp();
+        bool isSendRequestOK();
+        void setSendRequestOK(bool sendRequestOK);
+        int getRequestCode();
+        void setRequestCode(int requestCode);
+        long long getTimeoutMillis();
+        InvokeCallback* getInvokeCallback();
+        RemotingCommand* getResponseCommand();
+        void setResponseCommand(RemotingCommand* pResponseCommand); // Stores the response without notifying.
+        int getOpaque();
+		std::string toString() const; // "{name=value,...}" dump for logging.
+
+    private:
+        RemotingCommand* m_pResponseCommand; // NULL until a response is stored.
+        volatile bool m_sendRequestOK;
+        int m_requestCode;
+        int m_opaque;
+        long long m_timeoutMillis;
+        InvokeCallback* m_pInvokeCallback; // May be NULL; never deleted by this class.
+        long long m_beginTimestamp; // KPRUtil::GetCurrentTimeMillis() at construction.
+        kpr::Monitor* m_pMonitor; // Owned; NULL unless constructed with block=true.
+        bool m_notifyFlag; // Set under the monitor by putResponse().
+
+        kpr::AtomicInteger m_exec; // 0 -> 1 latch guarding executeInvokeCallback().
+
+		kpr::Semaphore* m_pSemaphore; // Not owned; may be NULL.
+		kpr::AtomicInteger m_released; // 0 -> 1 latch guarding release().
+    };
+	typedef kpr::RefHandleT<ResponseFuture> ResponseFuturePtr;
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/transport/SocketUtil.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/transport/SocketUtil.cpp b/rocketmq-client4cpp/src/transport/SocketUtil.cpp
new file mode 100755
index 0000000..a1e0d57
--- /dev/null
+++ b/rocketmq-client4cpp/src/transport/SocketUtil.cpp
@@ -0,0 +1,250 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+#include "SocketUtil.h"
+#include "MixAll.h"
+#include "MQClientException.h"
+
+
+namespace rmq
+{
+
+int SocketInit() // One-time socket setup for the process; always returns 0.
+{
+    signal(SIGPIPE, SIG_IGN); // Set SIGPIPE to be ignored for the whole process.
+
+    return 0;
+}
+
+/**
+ * Switches a descriptor to non-blocking mode by adding O_NONBLOCK to its
+ * file status flags.
+ *
+ * @param fd  descriptor to modify.
+ * @return the result of fcntl(F_SETFL), or -1 when the current flags cannot
+ *         be read (errno is left as set by fcntl).
+ */
+int MakeSocketNonblocking(SOCKET fd)
+{
+    const int flags = fcntl(fd, F_GETFL, 0);
+    if (flags == -1)
+    {
+        // Report the failure to the caller. An assert() here disappears under
+        // NDEBUG, after which -1 | O_NONBLOCK would be handed to F_SETFL.
+        return -1;
+    }
+
+    return fcntl(fd, F_SETFL, flags | O_NONBLOCK);
+}
+
+// Sets the TCP_NODELAY option to 1 on the socket and returns setsockopt()'s result.
+int SetTcpNoDelay(SOCKET fd)
+{
+    const int enabled = 1;
+    const char* optionValue = (const char*)&enabled;
+    return setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, optionValue, sizeof(enabled));
+}
+
+// Splits "host:port" at the first ':'.
+// On success addr receives the host part ("localhost" is rewritten to
+// "127.0.0.1") and nPort receives atoi() of the remainder; returns false and
+// leaves both outputs untouched when there is no ':'.
+bool SplitURL(const std::string& serverURL, std::string& addr, short& nPort)
+{
+    const size_t colon = serverURL.find(':');
+    if (colon == std::string::npos)
+    {
+        return false;
+    }
+
+    addr = serverURL.substr(0, colon);
+    if (addr == "localhost")
+    {
+        addr = "127.0.0.1";
+    }
+
+    const std::string portText = serverURL.substr(colon + 1);
+    nPort = atoi(portText.c_str());
+    return true;
+}
+
+/**
+ * Converts "ip:port" text into an IPv4 socket address.
+ *
+ * @param addrString  address text in the form accepted by SplitURL().
+ * @return AF_INET address with port and IP in network byte order. When the
+ *         text has no ':' the port is 0 and the IP is INADDR_NONE; an IP that
+ *         inet_addr() cannot parse also yields INADDR_NONE.
+ */
+sockaddr string2SocketAddress(const std::string& addrString)
+{
+    std::string strAddr;
+    short port = 0;
+
+    struct sockaddr_in sa;
+    // Zero the whole structure so the padding bytes copied out below are defined.
+    memset(&sa, 0, sizeof(sa));
+    sa.sin_family = AF_INET;
+
+    if (SplitURL(addrString, strAddr, port))
+    {
+        sa.sin_port = htons(port);
+        sa.sin_addr.s_addr = inet_addr(strAddr.c_str());
+    }
+    else
+    {
+        // Malformed input: return a well-defined "no address" value rather
+        // than a port read from an uninitialized variable.
+        sa.sin_addr.s_addr = INADDR_NONE;
+    }
+
+    sockaddr addr;
+    memcpy(&addr, &sa, sizeof(sockaddr));
+
+    return addr;
+}
+
+/**
+ * Formats an IPv4 socket address as "ip:port".
+ *
+ * @param addr  address whose bytes are interpreted as a sockaddr_in.
+ * @return dotted-quad IP, a colon, and the port in host byte order.
+ */
+std::string socketAddress2String(sockaddr addr)
+{
+    sockaddr_in in;
+    memcpy(&in, &addr, sizeof(sockaddr));
+
+    std::stringstream ss;
+    // sin_port is stored in network byte order (string2SocketAddress() uses
+    // htons), so convert it back before printing, as socketAddress2IPPort() does.
+    ss << inet_ntoa(in.sin_addr) << ":" << ntohs(in.sin_port);
+
+    return ss.str();
+}
+
+/**
+ * Collects the IPv4 addresses of this host in host byte order.
+ *
+ * Interfaces are enumerated with SIOCGIFCONF first. If that yields nothing,
+ * the host name is resolved with getaddrinfo(). Addresses in 127.0.0.0/8 are
+ * then removed, and INADDR_LOOPBACK is added when nothing else is left, so
+ * the result is never empty.
+ *
+ * @param addrs  output list; cleared on entry.
+ */
+void GetLocalAddrs(std::vector<unsigned int>& addrs)
+{
+    addrs.clear();
+
+    struct ifconf ifc;
+    ifc.ifc_buf = NULL;
+    ifc.ifc_len = 0;
+
+    int sfd = socket(AF_INET, SOCK_DGRAM, 0);
+    if (sfd != INVALID_SOCKET)
+    {
+        // First call with a NULL buffer asks for the size needed in ifc_len.
+        int ret = ioctl(sfd, SIOCGIFCONF, (char*)&ifc);
+
+        if (ret != -1 && ifc.ifc_len > 0)
+        {
+            ifc.ifc_req = (struct ifreq*)malloc(ifc.ifc_len);
+            // Without a buffer the second call would only report a size again
+            // and the loop below would read through a NULL pointer.
+            if (ifc.ifc_req != NULL)
+            {
+                ret = ioctl(sfd, SIOCGIFCONF, (char*)&ifc);
+                if (ret != -1)
+                {
+                    const size_t count = ifc.ifc_len / sizeof(struct ifreq);
+                    for (size_t i = 0; i < count; i++)
+                    {
+                        struct sockaddr* sa = (struct sockaddr*) & (ifc.ifc_req[i].ifr_addr);
+                        if (AF_INET == sa->sa_family)
+                        {
+                            unsigned int addr = ((struct sockaddr_in*)sa)->sin_addr.s_addr;
+                            // s_addr is in network byte order; store host order.
+                            addrs.push_back(ntohl(addr));
+                        }
+                    }
+                }
+
+                free(ifc.ifc_req);
+                ifc.ifc_req = NULL;
+            }
+        }
+
+        close(sfd);
+    }
+
+    if (addrs.empty())
+    {
+        char hostname[1024];
+
+        int ret = gethostname(hostname, sizeof(hostname));
+        if (ret == 0)
+        {
+            struct addrinfo* result = NULL;
+            struct addrinfo* ptr = NULL;
+            struct addrinfo hints;
+
+            memset(&hints, 0, sizeof(hints));
+            hints.ai_family = AF_INET;
+            hints.ai_socktype = SOCK_STREAM;
+            hints.ai_protocol = IPPROTO_TCP;
+
+            ret = getaddrinfo(hostname, NULL, &hints, &result);
+            if (ret == 0)
+            {
+                for (ptr = result; ptr != NULL ; ptr = ptr->ai_next)
+                {
+                    struct sockaddr_in*  sockaddr_ipv4 = (struct sockaddr_in*) ptr->ai_addr;
+                    addrs.push_back(ntohl(sockaddr_ipv4->sin_addr.s_addr));
+                }
+
+                // Free only a list that getaddrinfo() actually produced.
+                freeaddrinfo(result);
+            }
+        }
+    }
+
+    // Drop every address inside 127.0.0.0/8.
+    std::vector<unsigned int>::iterator it = addrs.begin();
+    for (; it != addrs.end();)
+    {
+        if (*it >= 0x7F000000U && *it < 0x80000000U)
+        {
+            it = addrs.erase(it);
+        }
+        else
+        {
+            it++;
+        }
+    }
+
+    if (addrs.empty())
+    {
+        addrs.push_back(INADDR_LOOPBACK);
+    }
+}
+
+// Returns the first address reported by GetLocalAddrs() in dotted-quad form.
+std::string getLocalAddress()
+{
+    std::vector<unsigned int> hostOrderAddrs;
+    GetLocalAddrs(hostOrderAddrs);
+
+    // GetLocalAddrs() always leaves at least one entry (loopback as a last resort).
+    struct in_addr first;
+    first.s_addr = htonl(hostOrderAddrs[0]);
+
+    return std::string(inet_ntoa(first));
+}
+
+/**
+ * Looks up a name for an IPv4 address.
+ *
+ * @param addr  address whose bytes are interpreted as a sockaddr_in.
+ * @return the first alias reported by gethostbyaddr(), or the dotted-quad
+ *         text of the address when the lookup fails or lists no alias.
+ *
+ * NOTE(review): the canonical name (h_name) is never used, only the alias
+ * list. Confirm that is intended.
+ */
+std::string getHostName(sockaddr addr)
+{
+    sockaddr_in in;
+    memcpy(&in, &addr, sizeof(sockaddr));
+
+    struct hostent* remoteHost = gethostbyaddr((char*) & (in.sin_addr), sizeof(in.sin_addr), AF_INET);
+
+    // gethostbyaddr() returns NULL when the lookup fails, so fall back to the
+    // numeric form in that case.
+    if (remoteHost != NULL && remoteHost->h_aliases != NULL && remoteHost->h_aliases[0] != 0)
+    {
+        return remoteHost->h_aliases[0];
+    }
+
+    return inet_ntoa(in.sin_addr);
+}
+
+
+// Reverses the byte order of a 64-bit value. Builds that define
+// ENDIANMODE_BIG return the value unchanged.
+unsigned long long swapll(unsigned long long v)
+{
+#ifdef ENDIANMODE_BIG
+    return v;
+#else
+    unsigned long long reversed = 0;
+    // Peel bytes off the low end of v and push them onto the low end of the result.
+    for (int byteIndex = 0; byteIndex < 8; ++byteIndex)
+    {
+        reversed = (reversed << 8) | (v & 0xff);
+        v >>= 8;
+    }
+
+    return reversed;
+#endif
+}
+
+// Host-to-network conversion for 64-bit values; delegates to swapll().
+unsigned long long h2nll(unsigned long long v)
+{
+    return swapll(v);
+}
+
+// Network-to-host conversion for 64-bit values; delegates to swapll().
+unsigned long long n2hll(unsigned long long v)
+{
+    return swapll(v);
+}
+
+// Formats an IPv4 socket address as "ip:port", with the port converted to
+// host byte order.
+std::string socketAddress2IPPort(sockaddr addr)
+{
+    sockaddr_in inetAddr;
+    memcpy(&inetAddr, &addr, sizeof(sockaddr));
+
+    char text[32];
+    snprintf(text, sizeof(text), "%s:%d", inet_ntoa(inetAddr.sin_addr), ntohs(inetAddr.sin_port));
+
+    return std::string(text);
+}
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/transport/SocketUtil.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/transport/SocketUtil.h b/rocketmq-client4cpp/src/transport/SocketUtil.h
new file mode 100755
index 0000000..bfd86d8
--- /dev/null
+++ b/rocketmq-client4cpp/src/transport/SocketUtil.h
@@ -0,0 +1,75 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+#ifndef __SOCKETUTIL_H__
+#define __SOCKETUTIL_H__
+
+#include <unistd.h>
+#include <sys/time.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <sys/select.h>
+#include <sys/ioctl.h>
+#include <netdb.h>
+#include <netinet/in.h>
+#include <net/if.h>
+#include <netinet/tcp.h>
+#include <arpa/inet.h>
+#include <fcntl.h>
+#include <errno.h>
+#include <signal.h>
+#include <string>
+#include <stdlib.h>
+#include <assert.h>
+#include <string.h>
+#include <sstream>
+#include <vector>
+#include <iostream>
+
+#include "RocketMQClient.h"
+
+
+#define NET_ERROR errno
+#define SOCKET_ERROR -1
+#define INVALID_SOCKET -1
+#define WSAECONNRESET ECONNRESET
+#define WSAEWOULDBLOCK EWOULDBLOCK
+#define WSAEINPROGRESS EINPROGRESS
+#define WSAEBADF EBADF
+#define closesocket close
+#define SD_SEND SHUT_WR
+#define SD_RECEIVE SHUT_RD
+#define SD_BOTH SHUT_RDWR
+typedef int SOCKET;
+#define SocketUninit()
+
+namespace rmq
+{
+	int SocketInit(); // One-time process setup (sets SIGPIPE to be ignored); returns 0.
+	int MakeSocketNonblocking(SOCKET fd); // Adds O_NONBLOCK to fd's status flags; returns the fcntl() result.
+	int SetTcpNoDelay(SOCKET fd); // Sets TCP_NODELAY to 1 on fd; returns the setsockopt() result.
+
+	bool SplitURL(const std::string& serverURL, std::string& addr, short& nPort); // Splits "host:port" at the first ':'; false when there is no ':'.
+	sockaddr string2SocketAddress(const std::string& addr); // Parses "ip:port" text into an AF_INET address.
+	std::string socketAddress2String(sockaddr addr); // Formats an AF_INET address as "ip:port" text.
+	std::string socketAddress2IPPort(sockaddr addr); // Formats an AF_INET address as "ip:port" with the port in host byte order.
+	std::string getHostName(sockaddr addr); // First alias from gethostbyaddr(), else the dotted-quad address.
+	std::string getLocalAddress(); // Dotted-quad text of the first local IPv4 address found.
+
+	unsigned long long h2nll(unsigned long long v); // 64-bit host-to-network byte order.
+	unsigned long long n2hll(unsigned long long v); // 64-bit network-to-host byte order.
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/transport/TcpRemotingClient.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/transport/TcpRemotingClient.cpp b/rocketmq-client4cpp/src/transport/TcpRemotingClient.cpp
new file mode 100755
index 0000000..03b8ca7
--- /dev/null
+++ b/rocketmq-client4cpp/src/transport/TcpRemotingClient.cpp
@@ -0,0 +1,841 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+#include "TcpRemotingClient.h"
+#include "MQClientException.h"
+#include "TcpRequestProcessor.h"
+#include "MQProtos.h"
+#include "ThreadPoolWork.h"
+
+namespace rmq
+{
+
+
+ProcessDataWork::ProcessDataWork(TcpRemotingClient* pClient, TcpTransport* pTts, std::string* pData) // Work item that hands one received buffer to the client; takes ownership of pData (deleted in the destructor).
+    : m_pClient(pClient), m_pTts(pTts), m_pData(pData)
+{
+}
+
+ProcessDataWork::~ProcessDataWork()
+{
+	delete m_pData; // The buffer belongs to this work item; m_pClient and m_pTts are not deleted here.
+}
+
+void ProcessDataWork::Do() // Calls processData() on the client; every exception is logged and swallowed so none escapes this method.
+{
+    try
+    {
+        m_pClient->processData(m_pTts, m_pData);
+    }
+    catch (std::exception& e)
+    {
+    	RMQ_ERROR("processDataWork catch Exception: %s", e.what());
+    }
+    catch (...)
+    {
+    	RMQ_ERROR("processDataWork catch Exception");
+    }
+}
+
+TcpRemotingClient::TcpRemotingClient(const RemoteClientConfig& config) // Copies the config, creates the worker pool and event thread, and prepares the epoller; nothing runs until start().
+    : m_stop(false), m_epoller(false), m_config(config),
+      m_semaphoreOneway(s_ClientOnewaySemaphoreValue), m_semaphoreAsync(s_ClientAsyncSemaphoreValue) // NOTE(review): permits come from the s_Client* constants, not from config.clientOnewaySemaphoreValue / clientAsyncSemaphoreValue - confirm intended.
+{
+    m_pNetThreadPool = new kpr::ThreadPool("NetClientThreadPool", 5, 5, 20);
+    m_pEventThread = new EventThread(*this);
+    SocketInit();
+    m_epoller.create(10240);
+}
+
+TcpRemotingClient::~TcpRemotingClient() // NOTE(review): no explicit delete of m_pNetThreadPool / m_pEventThread here - presumably they are ref-counted handles; verify against the header.
+{
+    SocketUninit(); // Expands to nothing with the SocketUninit() macro in SocketUtil.h.
+}
+
+void TcpRemotingClient::start() // Starts the event thread created in the constructor.
+{
+    RMQ_DEBUG("TcpRemotingClient::start()");
+    m_pEventThread->Start();
+}
+
+void TcpRemotingClient::shutdown() // Raises the stop flag, destroys the worker pool, then joins the event thread.
+{
+    RMQ_DEBUG("TcpRemotingClient::shutdown()");
+    m_stop = true; // run() polls this flag and leaves its loop once it is set.
+    m_pNetThreadPool->Destroy();
+    m_pEventThread->Join();
+}
+
+/*
+void printMsg(const std::string& prefix, const char* pData, int len)
+{
+	int headLen;
+    memcpy(&headLen, pData + 4, 4);
+    headLen = ntohl(headLen);
+
+    RMQ_DEBUG("%s|decode[%d,%d,%d]|%s%s", prefix.c_str(), len, headLen, len - 8 - headLen, std::string(pData + 8, headLen).c_str(),
+              std::string(pData + 8 + headLen, len - 8 - headLen).c_str());
+}
+*/
+
+/**
+ * Event loop of the client.
+ *
+ * Until m_stop is set it waits up to 500 ms for epoll events, reads every
+ * readable transport, queues each received buffer on the worker pool as a
+ * ProcessDataWork, removes transports that reported an error, and then runs
+ * handleTimerEvent(). One more handleTimerEvent() pass runs after the loop
+ * ends. Exceptions raised inside an iteration are logged and the loop
+ * continues.
+ */
+void TcpRemotingClient::run()
+{
+    RMQ_INFO("EventThread run begin: %lld", KPRUtil::GetCurrentTimeMillis());
+    do
+    {
+        try
+        {
+            int nfds = m_epoller.wait(500);
+            if (nfds > 0)
+            {
+                int ret = 0;
+                std::vector<TcpTransport*> errTts;
+                for (int i = 0; i < nfds && !m_stop; ++i)
+                {
+                    const epoll_event& ev = m_epoller.get(i);
+
+                    // Copy the transport pointer while the read lock is held;
+                    // the map iterator must not be used once the lock is gone.
+                    TcpTransport* pTts = NULL;
+                    {
+                        kpr::ScopedRLock<kpr::RWMutex> lock(m_transportTableLock);
+                        std::map<std::string , TcpTransport*>::iterator it =
+                            m_transportTable.find((char*)ev.data.ptr);
+                        if (it == m_transportTable.end())
+                        {
+                            continue;
+                        }
+                        pTts = it->second;
+                    }
+
+                    // A transport is queued for removal once, even when both
+                    // the error event and the failed read are seen for it.
+                    bool failed = false;
+                    if (ev.events & EPOLLERR || ev.events & EPOLLHUP)
+                    {
+                        RMQ_ERROR("recvData fail, err=%d(%s), pts=%p", errno, strerror(errno), pTts);
+                        failed = true;
+                    }
+
+                    if (ev.events & EPOLLIN)
+                    {
+                        std::list<std::string*> dataList;
+                        ret = pTts->recvData(dataList);
+                        if (ret < 0)
+                        {
+                            RMQ_ERROR("recvData fail, ret=%d, errno=%d, pts=%p", ret, NET_ERROR, pTts);
+                            failed = true;
+                        }
+
+                        // Buffers received before a failure are still processed.
+                        for (std::list<std::string*>::iterator itData = dataList.begin();
+                             itData != dataList.end(); ++itData)
+                        {
+                            //printMsg("run", (*itData)->c_str(), (*itData)->size());
+                            kpr::ThreadPoolWorkPtr work = new ProcessDataWork(this, pTts, *itData);
+                            m_pNetThreadPool->AddWork(work);
+                        }
+                    }
+
+                    if (failed)
+                    {
+                        errTts.push_back(pTts);
+                    }
+                }
+
+                std::vector<TcpTransport*>::iterator itErr = errTts.begin();
+                for (; itErr != errTts.end(); ++itErr)
+                {
+                    removeTTS(*itErr, true);
+                }
+            }
+
+            handleTimerEvent();
+        }
+        catch (...)
+        {
+            RMQ_ERROR("TcpRemotingClient.run catch exception");
+        }
+    }
+    while (!m_stop);
+    handleTimerEvent();
+
+    RMQ_INFO("EventThread run end: %lld", KPRUtil::GetCurrentTimeMillis());
+}
+
+
+void TcpRemotingClient::updateNameServerAddressList(const std::vector<std::string>& addrs) // Replaces the name-server list and restarts the rotation index at 0.
+{
+    m_namesrvAddrList = addrs; // NOTE(review): no lock is taken here, while getAndCreateNameserverTransport() reads this list under m_namesrvAddrChoosedLock and passes the address of an element to m_namesrvAddrChoosed.set() - confirm callers serialize updates.
+    m_namesrvIndex = 0;
+}
+
+std::vector<std::string> TcpRemotingClient::getNameServerAddressList() // Returns a copy of the current name-server list.
+{
+    return m_namesrvAddrList;
+}
+
+void TcpRemotingClient::registerProcessor(int requestCode, TcpRequestProcessor* pProcessor) // Maps requestCode to pProcessor, replacing any earlier entry for the same code.
+{
+    m_processorTable[requestCode] = pProcessor;
+}
+
+
+/**
+ * Sends pRequest to addr and blocks until the response arrives or the timeout
+ * expires.
+ *
+ * @param addr           target address; empty selects a name server.
+ * @param pRequest       command to send.
+ * @param timeoutMillis  used both for connecting and for waiting.
+ * @return the response command delivered by invokeSyncImpl().
+ * @throws RemotingConnectException      no connected transport is available.
+ * @throws RemotingSendRequestException  the send failed; the channel is removed first.
+ * @throws RemotingTimeoutException      no response arrived in time.
+ */
+RemotingCommand* TcpRemotingClient::invokeSync(const std::string& addr,
+        RemotingCommand* pRequest,
+        int timeoutMillis)
+{
+    TcpTransport* pTts = getAndCreateTransport(addr, timeoutMillis);
+    if (pTts != NULL && pTts->isConnected())
+    {
+        RemotingCommand* pResponse = NULL;
+        try
+        {
+            pResponse = invokeSyncImpl(pTts, pRequest, timeoutMillis);
+        }
+        catch(const RemotingSendRequestException&)
+        {
+            RMQ_WARN("invokeSync: send pRequest exception, so close the channel[{%s}]",
+                pTts->getServerAddr().c_str());
+            removeTTS(pTts, false);
+            // Rethrow the exception in flight; "throw e" would throw a copy.
+            throw;
+        }
+        catch(const RemotingTimeoutException&)
+        {
+            RMQ_WARN("invokeSync: wait response timeout exception, the channel[{%s}], timeout=%d",
+                pTts->getServerAddr().c_str(), timeoutMillis);
+            throw;
+        }
+
+        return pResponse;
+    }
+    else
+    {
+        removeTTS(pTts, false);
+        THROW_MQEXCEPTION(RemotingConnectException, "connect fail", -1);
+        //return NULL;
+    }
+}
+
+/**
+ * Sends pRequest to addr without waiting for the response; the outcome is
+ * reported through pInvokeCallback.
+ *
+ * @param addr             target address; empty selects a name server.
+ * @param pRequest         command to send.
+ * @param timeoutMillis    passed to the connect step and to invokeAsyncImpl().
+ * @param pInvokeCallback  completion callback handed to invokeAsyncImpl().
+ * @throws RemotingConnectException      no connected transport is available.
+ * @throws RemotingSendRequestException  the send failed; the channel is removed first.
+ */
+void TcpRemotingClient::invokeAsync(const std::string& addr,
+                                   RemotingCommand* pRequest,
+                                   int timeoutMillis,
+                                   InvokeCallback* pInvokeCallback)
+{
+    TcpTransport* pTts = getAndCreateTransport(addr, timeoutMillis);
+    if (pTts != NULL && pTts->isConnected())
+    {
+        try
+        {
+            this->invokeAsyncImpl(pTts, pRequest, timeoutMillis, pInvokeCallback);
+        }
+        catch (const RemotingSendRequestException&)
+        {
+            RMQ_WARN("invokeAsync: send pRequest exception, so close the channel[{%s}]", addr.c_str());
+            removeTTS(pTts, false);
+            // Rethrow the exception in flight; "throw e" would throw a copy.
+            throw;
+        }
+
+        return;
+    }
+    else
+    {
+        removeTTS(pTts, false);
+        std::string msg;
+        msg.append("connect to <").append(addr).append("> failed");
+        THROW_MQEXCEPTION(RemotingConnectException, msg, -1);
+    }
+}
+
+// Fire-and-forget send. Returns invokeOnewayImpl()'s result, or -1 after
+// dropping the transport when no connected transport is available.
+int TcpRemotingClient::invokeOneway(const std::string& addr,
+                                    RemotingCommand* pRequest,
+                                    int timeoutMillis)
+{
+    TcpTransport* pTransport = getAndCreateTransport(addr, timeoutMillis);
+
+    const bool usable = (pTransport != NULL) && pTransport->isConnected();
+    if (!usable)
+    {
+        removeTTS(pTransport, false);
+        return -1;
+    }
+
+    return invokeOnewayImpl(pTransport, pRequest, timeoutMillis);
+}
+
+
+// Returns the transport registered for addr, creating it when absent.
+// An empty addr is routed to getAndCreateNameserverTransport().
+TcpTransport* TcpRemotingClient::getAndCreateTransport(const std::string& addr, int timeoutMillis)
+{
+    if (addr.empty())
+    {
+        return getAndCreateNameserverTransport(timeoutMillis);
+    }
+
+    typedef std::map<std::string , TcpTransport*>::iterator TransportIter;
+
+    {
+        // Lookup under the read lock; the lock ends with this scope.
+        kpr::ScopedRLock<kpr::RWMutex> guard(m_transportTableLock);
+        TransportIter found = m_transportTable.find(addr);
+        if (found != m_transportTable.end())
+        {
+            return found->second;
+        }
+    }
+
+    return this->createTransport(addr, timeoutMillis);
+}
+
+
+/**
+ * Connects to addr and registers the new transport, unless one already exists.
+ *
+ * The table is checked under the read lock first, then again under the write
+ * lock so that only one caller creates the transport.
+ *
+ * @param addr           server address to connect to; also the table key.
+ * @param timeoutMillis  connect timeout.
+ * @return the existing or newly connected transport; NULL when the connect
+ *         fails or the write lock cannot be obtained within s_LockTimeoutMillis.
+ */
+TcpTransport* TcpRemotingClient::createTransport(const std::string& addr, int timeoutMillis)
+{
+    TcpTransport* pTts = NULL;
+    {
+        kpr::ScopedRLock<kpr::RWMutex> lock(m_transportTableLock);
+        std::map<std::string , TcpTransport*>::iterator it = m_transportTable.find(addr);
+        if (it != m_transportTable.end())
+        {
+            return it->second;
+        }
+    }
+
+    if (!m_transportTableLock.TryWriteLock(s_LockTimeoutMillis))
+    {
+        // NOTE(review): the value logged is the connect timeout, not the lock
+        // timeout that actually expired.
+        RMQ_WARN("createTransport: try to lock m_transportTable, but timeout, {%d}ms", timeoutMillis);
+        return NULL;
+    }
+
+    // From here on every path must reach Unlock(): no early return while the
+    // write lock is held.
+    try
+    {
+        std::map<std::string , TcpTransport*>::iterator it = m_transportTable.find(addr);
+        if (it != m_transportTable.end())
+        {
+            // Another thread created the transport while we waited for the lock.
+            pTts = it->second;
+        }
+        else
+        {
+            std::map<std::string , std::string> config;
+            pTts = new TcpTransport(config);
+            if (pTts->connect(addr, timeoutMillis) != CLIENT_ERROR_SUCCESS)
+            {
+                delete pTts;
+                pTts = NULL;
+
+                RMQ_INFO("[NETWORK]: CONNECT {%s} failed", addr.c_str());
+            }
+            else
+            {
+                m_transportTable[addr] = pTts;
+                // The epoll user data is the address text that run() uses as
+                // the table key. NOTE(review): this assumes getServerAddr()
+                // returns a reference to storage that lives as long as the
+                // transport - verify.
+                m_epoller.add(pTts->getSocket(), (long long)((pTts->getServerAddr()).c_str()), EPOLLIN);
+
+                RMQ_INFO("[NETWORK]: CONNECT => {%s} success", addr.c_str());
+            }
+        }
+    }
+    catch (...)
+    {
+        m_transportTableLock.Unlock();
+        throw;
+    }
+    m_transportTableLock.Unlock();
+
+    return pTts;
+}
+
+
+TcpTransport* TcpRemotingClient::getAndCreateNameserverTransport(int timeoutMillis) // Returns a transport to the currently chosen name server, rotating through m_namesrvAddrList when that one is unavailable; NULL when none connects or the lock times out.
+{
+	TcpTransport* pTts = NULL;
+
+	if (m_namesrvAddrChoosed.get() != NULL) // Fast path without the lock: reuse the address chosen earlier.
+	{
+		std::string addr = *m_namesrvAddrChoosed;
+		if (!addr.empty())
+		{
+			pTts = getAndCreateTransport(addr, timeoutMillis);
+	        if (pTts != NULL)
+	        {
+	        	return pTts;
+	        }
+		}
+	}
+
+	if (m_namesrvAddrChoosedLock.TryLock(s_LockTimeoutMillis)) // Every exit from this branch calls Unlock() first.
+	{
+    	if (m_namesrvAddrChoosed.get() != NULL) // Re-check under the lock.
+    	{
+	    	std::string addr = *m_namesrvAddrChoosed;
+	    	if (!addr.empty())
+			{
+				pTts = getAndCreateTransport(addr, timeoutMillis);
+		        if (pTts != NULL)
+		        {
+		        	m_namesrvAddrChoosedLock.Unlock();
+		        	return pTts;
+		        }
+			}
+		}
+
+		if (!m_namesrvAddrList.empty())
+    	{
+	        for (size_t i = 0; i < m_namesrvAddrList.size(); i++) // At most one attempt per configured address.
+	        {
+	            int index = abs(++m_namesrvIndex) % m_namesrvAddrList.size();
+	            std::string& newAddr = m_namesrvAddrList.at(index);
+	            m_namesrvAddrChoosed.set(&newAddr); // NOTE(review): passes the address of a vector element; whether set() copies the string or keeps the pointer is not visible here - verify against updateNameServerAddressList().
+	            TcpTransport* pTts = getAndCreateTransport(newAddr, timeoutMillis); // This declaration shadows the outer pTts.
+	            if (pTts != NULL)
+	            {
+	            	m_namesrvAddrChoosedLock.Unlock();
+	            	return pTts;
+	            }
+	        }
+        }
+
+        m_namesrvAddrChoosedLock.Unlock();
+	}
+
+	return NULL;
+}
+
+
+/**
+ * Periodic housekeeping driven from run().
+ *
+ * Scans the response table and the closed-transport table when at least
+ * s_CheckIntervalMillis have passed since the previous scan, or on every call
+ * once m_stop is set. Exceptions from the scans are logged and swallowed.
+ *
+ * NOTE(review): lastTime is a function-local static, so it is shared by every
+ * TcpRemotingClient instance in the process.
+ */
+void TcpRemotingClient::handleTimerEvent()
+{
+    // every 1000ms
+    static unsigned long long lastTime = 0;
+
+    // Compare in 64 bits. Truncating the elapsed time to int can produce a
+    // negative value while lastTime is still 0, which makes the check below
+    // succeed on every call so that the scans never run.
+    const long long elapsed = (long long)(KPRUtil::GetCurrentTimeMillis() - lastTime);
+    if (!m_stop && elapsed < (long long)s_CheckIntervalMillis)
+    {
+        return;
+    }
+
+    try
+    {
+        lastTime = KPRUtil::GetCurrentTimeMillis();
+
+        this->scanResponseTable();
+
+        this->scanCloseTransportTable();
+    }
+    catch(...)
+    {
+        RMQ_ERROR("scanResponseTable exception");
+    }
+}
+
+
+// Deletes closed transports that have been idle for more than 5000 ms, or all
+// of them once m_stop is set. Skips the pass when the lock is busy.
+void TcpRemotingClient::scanCloseTransportTable()
+{
+    if (m_closeTransportTable.empty())
+    {
+        return;
+    }
+
+    if (!m_closeTransportTableLock.TryLock())
+    {
+        RMQ_WARN("m_closeTransportTableLock TryLock fail");
+        return;
+    }
+
+    std::list<TcpTransport*>::iterator cur = m_closeTransportTable.begin();
+    while (cur != m_closeTransportTable.end())
+    {
+        TcpTransport* pClosed = *cur;
+        long long idleMillis = KPRUtil::GetCurrentTimeMillis() - pClosed->getLastSendRecvTime();
+        if (!m_stop && !(idleMillis > 5000))
+        {
+            // Still inside the grace period; look at it again on a later pass.
+            cur++;
+            continue;
+        }
+
+        RMQ_WARN("remove close connection, %lld, {%s}", idleMillis, pClosed->getServerAddr().c_str());
+        cur = m_closeTransportTable.erase(cur);
+        delete pClosed;
+    }
+
+    m_closeTransportTableLock.Unlock();
+}
+
+
+// Completes and removes every pending request that is more than 2000 ms past
+// its timeout, or every pending request once m_stop is set. The callback and
+// the semaphore release both run while the table's write lock is held.
+void TcpRemotingClient::scanResponseTable()
+{
+    typedef std::map<int, ResponseFuturePtr>::iterator FutureIter;
+
+    kpr::ScopedWLock<kpr::RWMutex> guard(m_responseTableLock);
+
+    FutureIter cur = m_responseTable.begin();
+    while (cur != m_responseTable.end())
+    {
+        long long diffTime = KPRUtil::GetCurrentTimeMillis() - cur->second->getBeginTimestamp();
+        const bool expired = m_stop || (diffTime > cur->second->getTimeoutMillis() + 2000);
+        if (!expired)
+        {
+            cur++;
+            continue;
+        }
+
+        RMQ_WARN("remove timeout request, %lld, %s", diffTime, cur->second->toString().c_str());
+        try
+        {
+            cur->second->executeInvokeCallback();
+        }
+        catch(...)
+        {
+            RMQ_WARN("scanResponseTable, operationComplete Exception");
+        }
+        cur->second->release();
+
+        // Advance before the erase invalidates the current position.
+        m_responseTable.erase(cur++);
+    }
+}
+
+void TcpRemotingClient::processData(TcpTransport* pTts, std::string* pData) // Decodes one received frame, resolves the request code it belongs to, builds its custom header, logs it, and forwards it to processMessageReceived().
+{
+	//printMsg("processData", pData->c_str(), pData->size());
+    RemotingCommand* pCmd = RemotingCommand::decode(pData->data(), (int)pData->size());
+	if (pCmd == NULL)
+	{
+		RMQ_ERROR("invalid data format, len:%d, data: %s", (int)pData->size(), pData->c_str()); // NOTE(review): the payload is printed with %s, so output stops at the first NUL byte.
+		return;
+	}
+
+    int code = 0;
+    if (pCmd->isResponseType()) // A response is matched to its pending request by opaque id; the request code comes from that future.
+    {
+        kpr::ScopedRLock<kpr::RWMutex> lock(m_responseTableLock);
+        std::map<int, ResponseFuturePtr>::iterator it = m_responseTable.find(pCmd->getOpaque());
+        if (it != m_responseTable.end())
+        {
+            code = it->second->getRequestCode();
+        }
+        else
+        {
+            RMQ_WARN("receive response, but not matched any request, maybe timeout or oneway, pCmd: %s", pCmd->toString().c_str());
+            delete pCmd; // No pending request matches, so the command is dropped here.
+            return;
+        }
+    }
+    else
+    {
+        code = pCmd->getCode(); // A request from the server carries its own code.
+    }
+
+    pCmd->makeCustomHeader(code, pData->data(), (int)pData->size());
+    if (pCmd->isResponseType())
+    {
+	    RMQ_DEBUG("[NETWORK]: RECV => {%s}, {opaque=%d, requst.code=%s(%d), response.code=%s(%d)}, %s",
+	    	pTts->getServerAddr().c_str(), pCmd->getOpaque(), getMQRequestCodeString(code), code,
+	    	getMQResponseCodeString(pCmd->getCode()), pCmd->getCode(), pCmd->toString().c_str());
+    }
+    else
+    {
+    	RMQ_DEBUG("[NETWORK]: RECV => {%s}, {opaque=%d, requst.code=%s(%d)}, %s",
+	    	pTts->getServerAddr().c_str(), pCmd->getOpaque(),
+	    	getMQRequestCodeString(code), code,	pCmd->toString().c_str());
+    }
+
+    processMessageReceived(pTts, pCmd); // NOTE(review): pCmd is not deleted on this path - presumably ownership passes to processMessageReceived(); verify.
+}
+
+RemotingCommand* TcpRemotingClient::invokeSyncImpl(TcpTransport* pTts, // Registers a blocking future, sends the request, waits for the response, and throws when none arrives.
+        RemotingCommand* pRequest,
+        int timeoutMillis)
+{
+    ResponseFuturePtr pResponseFuture = new ResponseFuture(
+    	pRequest->getCode(), pRequest->getOpaque(), timeoutMillis,
+        NULL, true, NULL); // No callback and no semaphore; block=true gives the future a monitor to wait on.
+    {
+        kpr::ScopedWLock<kpr::RWMutex> lock(m_responseTableLock);
+        m_responseTable.insert(std::pair<int, ResponseFuturePtr>(pRequest->getOpaque(), pResponseFuture)); // Registered before sending so the reply can be matched by opaque id.
+    }
+
+    int ret = sendCmd(pTts, pRequest, timeoutMillis);
+    if (ret == 0)
+    {
+        pResponseFuture->setSendRequestOK(true);
+    }
+    else
+    {
+    	pResponseFuture->setSendRequestOK(false);
+    	pResponseFuture->putResponse(NULL); // Marks the future notified so the wait below returns immediately.
+    	RMQ_WARN("send a pRequest command to channel <%s> failed.", pTts->getServerAddr().c_str());
+    }
+
+    RemotingCommand* pResponse = pResponseFuture->waitResponse(timeoutMillis);
+    {
+        kpr::ScopedWLock<kpr::RWMutex> lock(m_responseTableLock);
+        std::map<int, ResponseFuturePtr>::iterator it = m_responseTable.find(pRequest->getOpaque());
+        if (it != m_responseTable.end()) // The entry may already be gone: scanResponseTable() also erases entries.
+        {
+            m_responseTable.erase(it);
+        }
+    }
+
+    if (pResponse == NULL)
+    {
+        if (ret == 0) // Sent successfully but no response arrived: timeout.
+        {
+            std::stringstream oss;
+            oss << "wait response on the channel <" << pTts->getServerAddr() << "> timeout," << timeoutMillis << "ms";
+            THROW_MQEXCEPTION(RemotingTimeoutException, oss.str(), -1);
+        }
+        else // The send itself failed.
+        {
+            std::stringstream oss;
+            oss << "send request to <" << pTts->getServerAddr() << "> failed";
+            THROW_MQEXCEPTION(RemotingSendRequestException, oss.str(), -1);
+        }
+    }
+
+    return pResponse;
+}
+
+/**
+ * Send pRequest on pTts without waiting for the response.
+ *
+ * A slot of m_semaphoreAsync is taken first (waiting up to timeoutMillis).
+ * On success a ResponseFuture carrying pInvokeCallback is registered under the
+ * request's opaque id and the request is sent. If sending fails, the future is
+ * completed with a NULL response, removed from the table, its callback is run
+ * and release() is called on it.
+ *
+ * @param pTts            transport used for sending
+ * @param pRequest        command to send
+ * @param timeoutMillis   wait budget for the semaphore and for sendCmd()
+ * @param pInvokeCallback callback stored in the future
+ *
+ * Reports RemotingTooMuchRequestException (timeoutMillis <= 0) or
+ * RemotingTimeoutException through THROW_MQEXCEPTION when no semaphore slot
+ * could be obtained.
+ */
+void TcpRemotingClient::invokeAsyncImpl(TcpTransport* pTts,
+                                       RemotingCommand* pRequest,
+                                       int timeoutMillis,
+                                       InvokeCallback* pInvokeCallback)
+{
+    if (!m_semaphoreAsync.Wait(timeoutMillis))
+    {
+        if (timeoutMillis <= 0)
+        {
+            THROW_MQEXCEPTION(RemotingTooMuchRequestException, "invokeAsyncImpl invoke too fast", -1);
+        }
+        else
+        {
+            std::string info = RocketMQUtil::str2fmt(
+                "invokeAsyncImpl wait semaphore timeout, %dms, semaphoreAsyncValue: %d, request: %s",
+                timeoutMillis,
+                m_semaphoreAsync.GetValue(),
+                pRequest->toString().c_str()
+            );
+            RMQ_WARN("%s", info.c_str());
+            THROW_MQEXCEPTION(RemotingTimeoutException, info, -1);
+        }
+        return;
+    }
+
+    ResponseFuturePtr pFuture = new ResponseFuture(
+        pRequest->getCode(), pRequest->getOpaque(), timeoutMillis,
+        pInvokeCallback, false, &m_semaphoreAsync);
+    const int opaque = pRequest->getOpaque();
+
+    {
+        kpr::ScopedWLock<kpr::RWMutex> guard(m_responseTableLock);
+        m_responseTable.insert(std::map<int, ResponseFuturePtr>::value_type(opaque, pFuture));
+    }
+
+    if (sendCmd(pTts, pRequest, timeoutMillis) == 0)
+    {
+        pFuture->setSendRequestOK(true);
+        return;
+    }
+
+    // Send failed: complete the future locally and notify the callback.
+    pFuture->setSendRequestOK(false);
+    pFuture->putResponse(NULL);
+    {
+        kpr::ScopedWLock<kpr::RWMutex> guard(m_responseTableLock);
+        m_responseTable.erase(opaque);
+    }
+
+    try
+    {
+        pFuture->executeInvokeCallback();
+    }
+    catch (...)
+    {
+        RMQ_WARN("executeInvokeCallback exception");
+    }
+    pFuture->release();
+
+    RMQ_WARN("send a pRequest command to channel <%s> failed, requet: %s",
+             pTts->getServerAddr().c_str(), pRequest->toString().c_str());
+}
+
+/**
+ * Send pRequest on pTts as a one-way RPC (no response is awaited).
+ *
+ * The request is marked one-way, a slot of m_semaphoreOneway is taken for the
+ * duration of the send and released right after sendCmd() returns.
+ *
+ * @param pTts          transport used for sending
+ * @param pRequest      command to send; markOnewayRPC() is called on it
+ * @param timeoutMillis wait budget for the semaphore and for sendCmd()
+ * @return 0 when the command was sent
+ *
+ * Reports through THROW_MQEXCEPTION:
+ *  - RemotingSendRequestException     when sendCmd() fails
+ *  - RemotingTooMuchRequestException  when no slot is free and timeoutMillis <= 0
+ *  - RemotingTimeoutException         when waiting for a slot timed out
+ */
+int TcpRemotingClient::invokeOnewayImpl(TcpTransport* pTts,
+                                        RemotingCommand* pRequest,
+                                        int timeoutMillis)
+{
+    pRequest->markOnewayRPC();
+
+    bool acquired = m_semaphoreOneway.Wait(timeoutMillis);
+    if (acquired)
+    {
+        int ret = sendCmd(pTts, pRequest, timeoutMillis);
+        m_semaphoreOneway.Release();
+        if (ret != 0)
+        {
+            RMQ_WARN("send a pRequest command to channel <%s> failed, requet: %s",
+                     pTts->getServerAddr().c_str(), pRequest->toString().c_str());
+            THROW_MQEXCEPTION(RemotingSendRequestException, std::string("send request to <") + pTts->getServerAddr() + "> fail", -1);
+        }
+    }
+    else
+    {
+        if (timeoutMillis <= 0)
+        {
+            THROW_MQEXCEPTION(RemotingTooMuchRequestException, "invokeOnewayImpl invoke too fast", -1);
+        }
+        else
+        {
+            // Report the semaphore this path actually waited on; the previous
+            // code printed the async semaphore's value here.
+            std::string info = RocketMQUtil::str2fmt(
+                "invokeOnewayImpl wait semaphore timeout, %dms, semaphoreOnewayValue: %d, request: %s",
+                timeoutMillis,
+                m_semaphoreOneway.GetValue(),
+                pRequest->toString().c_str()
+            );
+            RMQ_WARN("%s", info.c_str());
+            THROW_MQEXCEPTION(RemotingTimeoutException, info, -1);
+        }
+    }
+
+    return 0;
+}
+
+/**
+ * Dispatch a decoded command to the request or response handler.
+ *
+ * Ownership of pCmd passes to this call: processRequestCommand() deletes it,
+ * processResponseCommand() either hands it to the matching future or deletes
+ * it, and a command of any other type is deleted here.
+ *
+ * Exceptions escaping the handlers are logged and swallowed so that they do
+ * not propagate to the caller.
+ *
+ * @param pTts transport the command arrived on
+ * @param pCmd decoded command (heap allocated)
+ */
+void TcpRemotingClient::processMessageReceived(TcpTransport* pTts, RemotingCommand* pCmd)
+{
+    try
+    {
+        switch (pCmd->getType())
+        {
+            case REQUEST_COMMAND:
+                processRequestCommand(pTts, pCmd);
+                break;
+            case RESPONSE_COMMAND:
+                processResponseCommand(pTts, pCmd);
+                break;
+            default:
+                // Nobody took ownership of the command; free it instead of leaking.
+                RMQ_WARN("receive command of unknown type, cmd: %s", pCmd->toString().c_str());
+                delete pCmd;
+                break;
+        }
+    }
+    catch (const std::exception& e)
+    {
+        RMQ_ERROR("processMessageReceived catch Exception: %s", e.what());
+    }
+    catch (...)
+    {
+        RMQ_ERROR("processMessageReceived catch Exception");
+    }
+}
+
+/**
+ * Handle a request sent by the server.
+ *
+ * The processor registered for the request code is looked up in
+ * m_processorTable. Its result (if any) is sent back with the request's
+ * opaque id unless the request is a one-way RPC. A std::exception thrown by
+ * the processor is turned into a SYSTEM_ERROR_VALUE response; a request code
+ * with no processor gets a REQUEST_CODE_NOT_SUPPORTED_VALUE response.
+ *
+ * pCmd is owned by this function and is deleted on every exit path,
+ * including when an exception propagates out.
+ *
+ * @param pTts transport the request arrived on (also used for the reply)
+ * @param pCmd the request command (heap allocated)
+ */
+void TcpRemotingClient::processRequestCommand(TcpTransport* pTts, RemotingCommand* pCmd)
+{
+    // Deletes the request when the scope is left, whether normally or by an
+    // exception (previously the request leaked if anything other than a
+    // handled std::exception was thrown).
+    struct RequestReleaser
+    {
+        explicit RequestReleaser(RemotingCommand* p) : m_p(p) {}
+        ~RequestReleaser() { delete m_p; }
+        RemotingCommand* m_p;
+    };
+
+    RMQ_DEBUG("receive request from server, cmd: %s", pCmd->toString().c_str());
+    RemotingCommandPtr pResponse = NULL;
+    // Declared after pResponse so the request is released before the response handle.
+    RequestReleaser releaser(pCmd);
+
+    std::map<int, TcpRequestProcessor*>::iterator it = m_processorTable.find(pCmd->getCode());
+    if (it == m_processorTable.end())
+    {
+        pResponse = RemotingCommand::createResponseCommand(
+            REQUEST_CODE_NOT_SUPPORTED_VALUE, "request type not supported", NULL);
+        pResponse->setOpaque(pCmd->getOpaque());
+        int ret = this->sendCmd(pTts, pResponse, 3000);
+        if (ret != 0)
+        {
+            RMQ_ERROR("process request over, but pResponse failed");
+        }
+        return;
+    }
+
+    try
+    {
+        pResponse = it->second->processRequest(pTts, pCmd);
+        // One-way requests and processors that return nothing get no reply.
+        if (!pCmd->isOnewayRPC() && pResponse.ptr() != NULL)
+        {
+            pResponse->setOpaque(pCmd->getOpaque());
+            pResponse->markResponseType();
+            int ret = this->sendCmd(pTts, pResponse, 3000);
+            if (ret != 0)
+            {
+                RMQ_ERROR("process request over, but response failed");
+            }
+        }
+    }
+    catch (const std::exception& e)
+    {
+        RMQ_ERROR("process request exception:%s", e.what());
+        if (!pCmd->isOnewayRPC())
+        {
+            pResponse = RemotingCommand::createResponseCommand(
+                SYSTEM_ERROR_VALUE, e.what(), NULL);
+            pResponse->setOpaque(pCmd->getOpaque());
+            int ret = this->sendCmd(pTts, pResponse, 3000);
+            if (ret != 0)
+            {
+                RMQ_ERROR("process request over, but response failed");
+            }
+        }
+    }
+}
+
+/**
+ * Deliver a response command to the future waiting for it.
+ *
+ * The future registered under the response's opaque id is taken out of
+ * m_responseTable (release() is called on it while the table lock is held),
+ * then receives the response and has its callback executed. A response with
+ * no registered future is logged and deleted.
+ *
+ * @param pTts transport the response arrived on (not used here)
+ * @param pCmd the response command; handed to the future or deleted
+ */
+void TcpRemotingClient::processResponseCommand(TcpTransport* pTts, RemotingCommand* pCmd)
+{
+    ResponseFuturePtr pending = NULL;
+    {
+        kpr::ScopedWLock<kpr::RWMutex> guard(m_responseTableLock);
+        std::map<int, ResponseFuturePtr>::iterator found = m_responseTable.find(pCmd->getOpaque());
+        if (found != m_responseTable.end())
+        {
+            pending = found->second;
+            pending->release();
+            m_responseTable.erase(found);
+        }
+    }
+
+    if (pending)
+    {
+        pending->putResponse(pCmd);
+        pending->executeInvokeCallback();
+        return;
+    }
+
+    RMQ_WARN("receive response, but not matched any request, cmd: %s", pCmd->toString().c_str());
+    delete pCmd;
+}
+
+/**
+ * Encode a command and write it to the transport, logging the outcome.
+ *
+ * @param pTts          transport to write to
+ * @param pRequest      command to encode and send
+ * @param timeoutMillis timeout forwarded to TcpTransport::sendData()
+ * @return the value returned by sendData()
+ */
+int TcpRemotingClient::sendCmd(TcpTransport* pTts, RemotingCommand* pRequest, int timeoutMillis)
+{
+    pRequest->encode();
+    const int sendResult = pTts->sendData(pRequest->getData(), pRequest->getDataLen(), timeoutMillis);
+
+    RMQ_DEBUG("[NETWORK]: SEND => {%s}, {opaque=%d, request.code=%s(%d), ret=%d, timeout=%d}, %s",
+              pTts->getServerAddr().c_str(), pRequest->getOpaque(),
+              getMQRequestCodeString(pRequest->getCode()), pRequest->getCode(),
+              sendResult, timeoutMillis, pRequest->toString().c_str());
+
+    return sendResult;
+}
+
+/**
+ * Detach a transport from the client.
+ *
+ * The transport's socket is removed from m_epoller. If m_transportTable still
+ * maps the transport's server address to this very object, the entry is
+ * erased and the transport is appended to m_closeTransportTable.
+ *
+ * @param pTts           transport to remove; NULL is ignored
+ * @param isDisConnected only selects the word used in the log line
+ */
+void TcpRemotingClient::removeTTS(TcpTransport* pTts, bool isDisConnected)
+{
+    if (!pTts)
+    {
+        return;
+    }
+
+    RMQ_INFO("[NETWORK]: %s  => {%s}", isDisConnected ? "DISCONNECT" : "CLOSE",
+             pTts->getServerAddr().c_str());
+
+    m_epoller.del(pTts->getSocket(), (long long)(pTts->getServerAddr().c_str()), 0);
+
+    bool erased = false;
+    {
+        kpr::ScopedWLock<kpr::RWMutex> guard(m_transportTableLock);
+        std::map<std::string , TcpTransport*>::iterator entry = m_transportTable.find(pTts->getServerAddr());
+        // Only drop the entry when it refers to this exact transport object.
+        if (entry != m_transportTable.end() && entry->second == pTts)
+        {
+            m_transportTable.erase(entry);
+            erased = true;
+        }
+    }
+
+    if (erased)
+    {
+        kpr::ScopedLock<kpr::Mutex> guard(m_closeTransportTableLock);
+        m_closeTransportTable.push_back(pTts);
+    }
+}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/transport/TcpRemotingClient.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/transport/TcpRemotingClient.h b/rocketmq-client4cpp/src/transport/TcpRemotingClient.h
new file mode 100755
index 0000000..d8bbf96
--- /dev/null
+++ b/rocketmq-client4cpp/src/transport/TcpRemotingClient.h
@@ -0,0 +1,152 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#ifndef __TCPREMOTINGCLIENT_H__
+#define __TCPREMOTINGCLIENT_H__
+
+#include <map>
+#include <string>
+#include <list>
+
+#include "RocketMQClient.h"
+#include "SocketUtil.h"
+#include "Epoller.h"
+#include "RemotingCommand.h"
+#include "Thread.h"
+#include "ThreadPool.h"
+#include "ThreadPoolWork.h"
+#include "RemoteClientConfig.h"
+#include "TcpTransport.h"
+#include "ScopedLock.h"
+#include "KPRUtil.h"
+#include "Semaphore.h"
+#include "ResponseFuture.h"
+
+namespace rmq
+{
+    class TcpTransport;
+    class InvokeCallback;
+    class TcpRemotingClient;
+    class ResponseFuture;
+    class TcpRequestProcessor;
+
+    /**
+     * Thread-pool work item that carries one received data block together
+     * with the client and transport it belongs to.
+     * NOTE(review): the implementation of Do() is not in this header;
+     * presumably it forwards to TcpRemotingClient::processData() - confirm.
+     */
+    class ProcessDataWork : public kpr::ThreadPoolWork
+    {
+    public:
+        ProcessDataWork(TcpRemotingClient* pClient, TcpTransport* pTts, std::string* pData);
+        virtual ~ProcessDataWork();
+        // Work entry point declared by kpr::ThreadPoolWork.
+        virtual void Do();
+
+    private:
+        TcpRemotingClient* m_pClient;
+		TcpTransport* m_pTts;
+        std::string* m_pData;
+    };
+    // Reference-counted handle to a ProcessDataWork.
+	typedef kpr::RefHandleT<ProcessDataWork> ProcessDataWorkPtr;
+
+    /**
+     * TCP client for the remoting protocol: keeps a table of transports,
+     * sends RemotingCommand requests (sync, async, one-way) and
+     * dispatches received commands to waiting futures or registered
+     * request processors.
+     */
+    class TcpRemotingClient
+    {
+        // Thread named "NetThread" whose Run() simply calls TcpRemotingClient::run().
+		class EventThread : public kpr::Thread
+        {
+        public:
+            EventThread(TcpRemotingClient& client)
+                : Thread("NetThread"), m_client(client)
+            {
+            }
+
+            void Run()
+            {
+                m_client.run();
+            }
+
+        private :
+            TcpRemotingClient& m_client;
+        };
+        friend class EventThread;
+        friend class ProcessDataWork;
+
+	public:
+        // Tuning constants; their use sites are in the .cpp file.
+		static const int s_LockTimeoutMillis = 3000;
+		static const int s_CheckIntervalMillis = 1000;
+		static const int s_ClientOnewaySemaphoreValue = 2048;
+		static const int s_ClientAsyncSemaphoreValue = 2048;
+
+    public:
+        TcpRemotingClient(const RemoteClientConfig& config);
+        virtual ~TcpRemotingClient();
+        virtual void start();
+        virtual void shutdown();
+
+        void updateNameServerAddressList(const std::vector<std::string>& addrs);
+        std::vector<std::string> getNameServerAddressList();
+        // Associates a processor with a request code (see m_processorTable).
+		void registerProcessor(int requestCode, TcpRequestProcessor* pProcessor);
+
+        // Public invoke entry points taking a server address; the *Impl
+        // counterparts below take an already resolved transport.
+        RemotingCommand* invokeSync(const std::string& addr, RemotingCommand* pRequest, int timeoutMillis) ;
+        void invokeAsync(const std::string& addr, RemotingCommand* pRequest, int timeoutMillis, InvokeCallback* invokeCallback);
+        int invokeOneway(const std::string& addr, RemotingCommand* pRequest, int timeoutMillis);
+
+    private:
+        // Body of the EventThread.
+		void run();
+        // Encodes pRequest and writes it to pTts; returns sendData()'s result.
+        int  sendCmd(TcpTransport* pTts, RemotingCommand* pRequest, int timeoutMillis);
+        // Removes pTts from the epoller and the transport table and queues it
+        // in m_closeTransportTable.
+        void removeTTS(TcpTransport* pTts, bool isDisConnected = false);
+        void processData(TcpTransport* pTts, std::string* data);
+        void handleTimerEvent();
+		void scanResponseTable();
+		void scanCloseTransportTable();
+
+        // Dispatch of decoded commands by type (request / response).
+        void processMessageReceived(TcpTransport* pTts, RemotingCommand* pCmd);
+        void processRequestCommand(TcpTransport* pTts, RemotingCommand* pCmd);
+        void processResponseCommand(TcpTransport* pTts, RemotingCommand* pCmd);
+
+        TcpTransport* getAndCreateTransport(const std::string& addr, int timeoutMillis);
+		TcpTransport* getAndCreateNameserverTransport(int timeoutMillis);
+		TcpTransport* createTransport(const std::string& addr, int timeoutMillis);
+
+        RemotingCommand* invokeSyncImpl(TcpTransport* pTts, RemotingCommand* pRequest, int timeoutMillis) ;
+        void invokeAsyncImpl(TcpTransport* pTts, RemotingCommand* pRequest, int timeoutMillis, InvokeCallback* pInvokeCallback);
+        int invokeOnewayImpl(TcpTransport* pTts, RemotingCommand* pRequest, int timeoutMillis);
+
+    private:
+        bool m_stop;
+        kpr::Epoller m_epoller;
+        RemoteClientConfig m_config;
+
+        // Limit the number of concurrent one-way / async invocations.
+		kpr::Semaphore m_semaphoreOneway;
+		kpr::Semaphore m_semaphoreAsync;
+
+        // Transports keyed by server address.
+        std::map<std::string , TcpTransport*> m_transportTable;
+        kpr::RWMutex m_transportTableLock;
+
+        // Transports taken out of m_transportTable by removeTTS().
+        // NOTE(review): presumably destroyed by scanCloseTransportTable() - confirm.
+		std::list<TcpTransport*> m_closeTransportTable;
+		kpr::Mutex m_closeTransportTableLock;
+
+        // Pending requests keyed by the request's opaque id.
+        std::map<int, ResponseFuturePtr> m_responseTable;
+        kpr::RWMutex m_responseTableLock;
+
+        std::vector<std::string> m_namesrvAddrList;
+		kpr::AtomicInteger m_namesrvIndex;
+		kpr::AtomicReference<std::string> m_namesrvAddrChoosed;
+		kpr::Mutex m_namesrvAddrChoosedLock;
+
+        kpr::ThreadPoolPtr m_pNetThreadPool;
+		kpr::ThreadPtr m_pEventThread;
+
+        TcpRequestProcessor* m_pDefaultRequestProcessor;
+        // Request processors keyed by request code.
+        std::map<int, TcpRequestProcessor*> m_processorTable;
+    };
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/transport/TcpRequestProcessor.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/transport/TcpRequestProcessor.h b/rocketmq-client4cpp/src/transport/TcpRequestProcessor.h
new file mode 100755
index 0000000..6ac02d1
--- /dev/null
+++ b/rocketmq-client4cpp/src/transport/TcpRequestProcessor.h
@@ -0,0 +1,32 @@
+/**
+ * Copyright (C) 2013 kangliqiang ,kangliq@163.com
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __TCPREQUESTPROCESSOR_H__
+#define __TCPREQUESTPROCESSOR_H__
+
+namespace rmq
+{
+	class RemotingCommand;
+	class TcpTransport;
+
+	/**
+	 * Interface for handlers of requests received over a TcpTransport.
+	 */
+	class TcpRequestProcessor
+	{
+	public:
+	    virtual ~TcpRequestProcessor() {}
+	    /**
+	     * Handle one request.
+	     *
+	     * @param pTts     transport the request arrived on
+	     * @param pRequest the received request
+	     * @return the response command, or NULL when there is nothing to send back
+	     *         (NOTE(review): NULL handling is the caller's convention - confirm)
+	     */
+	    virtual RemotingCommand* processRequest(TcpTransport* pTts, RemotingCommand* pRequest) = 0;
+	};
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/transport/TcpTransport.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/transport/TcpTransport.cpp b/rocketmq-client4cpp/src/transport/TcpTransport.cpp
new file mode 100755
index 0000000..858adf3
--- /dev/null
+++ b/rocketmq-client4cpp/src/transport/TcpTransport.cpp
@@ -0,0 +1,387 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+#include "TcpTransport.h"
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <memory.h>
+#include <errno.h>
+#include <assert.h>
+#include "KPRUtil.h"
+#include "SocketUtil.h"
+#include "Epoller.h"
+#include "ScopedLock.h"
+
+namespace rmq
+{
+
+// Value the shrink-check countdown (m_shrinkCheckCnt) is (re)initialised to.
+const int DEFAULT_SHRINK_COUNT = 32;
+// Default size in bytes of the receive buffer (16 KiB); also the initial m_shrinkMax.
+const int DEFAULT_RECV_BUFFER_SIZE = 1024 * 16;
+
+/**
+ * Create an unconnected transport and allocate its receive buffer.
+ *
+ * Recognised config keys:
+ *  - "tcp.transport.recvBufferSize": initial receive buffer size in bytes
+ *  - "tcp.transport.shrinkCheckMax": initial value of the shrink-check countdown
+ * Values that are not usable (non-positive buffer size, negative countdown)
+ * are ignored and the defaults are kept.
+ *
+ * The transport ends up in CLIENT_STATE_INITED only when both SocketInit()
+ * and the buffer allocation succeeded; otherwise it stays in
+ * CLIENT_STATE_UNINIT and connect() will refuse to run.
+ *
+ * @param config key/value configuration; only read
+ */
+TcpTransport::TcpTransport(std::map<std::string, std::string>& config)
+    : m_sfd(-1),
+      m_state(CLIENT_STATE_UNINIT),
+      m_pRecvBuf(NULL),
+      m_recvBufSize(DEFAULT_RECV_BUFFER_SIZE),
+      m_recvBufUsed(0),
+      m_shrinkMax(DEFAULT_RECV_BUFFER_SIZE),
+      m_shrinkCheckCnt(DEFAULT_SHRINK_COUNT)
+{
+    std::map<std::string, std::string>::iterator it = config.find("tcp.transport.recvBufferSize");
+    if (it != config.end())
+    {
+        // atoi() yields 0 for garbage; a zero or negative size would make the
+        // malloc below useless or enormous.
+        const int configuredSize = atoi(it->second.c_str());
+        if (configuredSize > 0)
+        {
+            m_recvBufSize = configuredSize;
+        }
+    }
+
+    it = config.find("tcp.transport.shrinkCheckMax");
+    if (it != config.end())
+    {
+        // A negative countdown would never reach the == 0 check in tryShrink().
+        const int configuredCount = atoi(it->second.c_str());
+        if (configuredCount >= 0)
+        {
+            m_shrinkCheckCnt = configuredCount;
+        }
+    }
+
+    // Remember the SocketInit() outcome; it used to be overwritten by the
+    // state assignment that follows the allocation.
+    const bool socketReady = (SocketInit() == 0);
+
+    m_pRecvBuf = (char*)malloc(m_recvBufSize);
+    m_state = (socketReady && m_pRecvBuf != NULL) ? CLIENT_STATE_INITED : CLIENT_STATE_UNINIT;
+    m_lastSendRecvTime = KPRUtil::GetCurrentTimeMillis();
+}
+
+/**
+ * Mark the transport closed, shut down and close the socket if one is open,
+ * free the receive buffer and undo SocketInit().
+ */
+TcpTransport::~TcpTransport()
+{
+    close();
+
+    const bool hasSocket = (m_sfd != INVALID_SOCKET);
+    if (hasSocket)
+    {
+        ::shutdown(m_sfd, SD_BOTH);
+        ::closesocket(m_sfd);
+        m_sfd = INVALID_SOCKET;
+    }
+
+    // free(NULL) is a no-op, so no separate NULL check is needed.
+    free(m_pRecvBuf);
+
+    SocketUninit();
+}
+
+
+/**
+ * Connect the transport to serverAddr using a non-blocking socket.
+ *
+ * If the transport is already connected to the same address nothing is done.
+ * Otherwise any descriptor left from an earlier connection is closed, a new
+ * socket is created, switched to non-blocking mode with TCP_NODELAY, and the
+ * connection attempt is awaited for at most timeoutMillis.
+ *
+ * On every failure after the socket was created the descriptor is closed and
+ * m_sfd is reset to INVALID_SOCKET, so the destructor never closes a
+ * descriptor number twice.
+ *
+ * @param serverAddr    address string understood by SplitURL()
+ * @param timeoutMillis maximum time to wait for the connection to complete
+ * @return CLIENT_ERROR_SUCCESS, CLIENT_ERROR_INIT (transport not initialised),
+ *         CLIENT_ERROR_INVALID_URL (address could not be split) or
+ *         CLIENT_ERROR_CONNECT (socket setup or connection failed)
+ */
+int TcpTransport::connect(const std::string& serverAddr, int timeoutMillis)
+{
+    long long endTime = KPRUtil::GetCurrentTimeMillis() + timeoutMillis;
+    if (m_state == CLIENT_STATE_UNINIT)
+    {
+        return CLIENT_ERROR_INIT;
+    }
+
+    if (isConnected())
+    {
+        if (serverAddr.compare(m_serverAddr) == 0)
+        {
+            return CLIENT_ERROR_SUCCESS;
+        }
+        close();
+    }
+
+    short port;
+    std::string strAddr;
+    if (!SplitURL(serverAddr, strAddr, port))
+    {
+        return CLIENT_ERROR_INVALID_URL;
+    }
+
+    // close() only changes the state; release the old descriptor here so it
+    // is not leaked when m_sfd is overwritten below.
+    if (m_sfd != INVALID_SOCKET)
+    {
+        ::closesocket(m_sfd);
+        m_sfd = INVALID_SOCKET;
+    }
+
+    struct sockaddr_in sa;
+    memset(&sa, 0, sizeof(sa));
+    sa.sin_family = AF_INET;
+    sa.sin_port = htons(port);
+    sa.sin_addr.s_addr = inet_addr(strAddr.c_str());
+
+    m_sfd = (int)socket(AF_INET, SOCK_STREAM, 0);
+
+    bool connected = (m_sfd != INVALID_SOCKET)
+                     && (MakeSocketNonblocking(m_sfd) != -1)
+                     && (SetTcpNoDelay(m_sfd) != -1);
+
+    if (connected && ::connect(m_sfd, (struct sockaddr*)&sa, sizeof(sa)) == -1)
+    {
+        int err = NET_ERROR;
+        connected = false;
+
+        // A non-blocking connect normally reports "in progress"; wait until
+        // the socket becomes writable, then read the final result.
+        if (err == WSAEWOULDBLOCK || err == WSAEINPROGRESS)
+        {
+            kpr::Epoller epoller(false);
+            epoller.create(1);
+            epoller.add(m_sfd, 0, EPOLLOUT);
+            int iRetCode = epoller.wait(endTime - KPRUtil::GetCurrentTimeMillis());
+            if (iRetCode > 0)
+            {
+                const epoll_event& ev = epoller.get(0);
+                if (!(ev.events & EPOLLERR) && !(ev.events & EPOLLHUP))
+                {
+                    int opterr = 0;
+                    socklen_t errlen = sizeof(opterr);
+                    connected = (getsockopt(m_sfd, SOL_SOCKET, SO_ERROR, &opterr, &errlen) != -1)
+                                && (opterr == 0);
+                }
+            }
+        }
+    }
+
+    if (!connected)
+    {
+        if (m_sfd != INVALID_SOCKET)
+        {
+            ::closesocket(m_sfd);
+            m_sfd = INVALID_SOCKET;
+        }
+        return CLIENT_ERROR_CONNECT;
+    }
+
+    m_serverAddr = serverAddr;
+    m_state = CLIENT_STATE_CONNECTED;
+    m_recvBufUsed = 0;
+    m_lastSendRecvTime = KPRUtil::GetCurrentTimeMillis();
+
+    return CLIENT_ERROR_SUCCESS;
+}
+
+
+/**
+ * @return true while the transport state is CLIENT_STATE_CONNECTED
+ */
+bool TcpTransport::isConnected()
+{
+    return m_state == CLIENT_STATE_CONNECTED;
+}
+
+/**
+ * Mark a connected transport as disconnected.
+ *
+ * Only the state is changed here; the socket descriptor itself is not
+ * touched by this function.
+ */
+void TcpTransport::close()
+{
+    if (m_state != CLIENT_STATE_CONNECTED)
+    {
+        return;
+    }
+
+    m_state = CLIENT_STATE_DISCONNECT;
+}
+
+/**
+ * Send one complete message while holding the send lock.
+ *
+ * @param pBuffer bytes to send
+ * @param len     number of bytes in pBuffer
+ * @param timeOut timeout in milliseconds; values below zero are treated as 0
+ * @return the result of sendOneMsg()
+ */
+int TcpTransport::sendData(const char* pBuffer, int len, int timeOut)
+{
+    kpr::ScopedLock<kpr::Mutex> guard(m_sendLock);
+
+    int effectiveTimeout = 0;
+    if (timeOut > 0)
+    {
+        effectiveTimeout = timeOut;
+    }
+    return sendOneMsg(pBuffer, len, effectiveTimeout);
+}
+
+/**
+ * Write len bytes from pBuffer to the socket, waiting for writability when
+ * the socket buffer is full.
+ *
+ * The transport is marked closed (close()) when the peer closes, on a hard
+ * socket error, or when the deadline passes before the socket becomes
+ * writable again. An interrupted send (EINTR) is simply retried, matching how
+ * recvMsg() treats EINTR as transient.
+ *
+ * @param pBuffer  bytes to send
+ * @param len      number of bytes to send
+ * @param nTimeOut overall deadline in milliseconds, measured from entry
+ * @return 0 when every byte was written, -1 otherwise
+ */
+int TcpTransport::sendOneMsg(const char* pBuffer, int len, int nTimeOut)
+{
+    int pos = 0;
+    long long endTime = KPRUtil::GetCurrentTimeMillis() + nTimeOut;
+
+    while (len > 0 && m_state == CLIENT_STATE_CONNECTED)
+    {
+        int ret = send(m_sfd, pBuffer + pos, len, 0);
+        if (ret > 0)
+        {
+            len -= ret;
+            pos += ret;
+            continue;
+        }
+
+        if (ret == 0)
+        {
+            close();
+            break;
+        }
+
+        int err = NET_ERROR;
+        if (err == EINTR)
+        {
+            // Interrupted before anything was written; try again.
+            continue;
+        }
+        if (err != WSAEWOULDBLOCK && err != EAGAIN)
+        {
+            close();
+            break;
+        }
+
+        // Socket buffer full: wait for writability, but never hand a negative
+        // timeout to the poller once the deadline has already passed.
+        long long remainMillis = endTime - KPRUtil::GetCurrentTimeMillis();
+        if (remainMillis < 0)
+        {
+            remainMillis = 0;
+        }
+
+        kpr::Epoller epoller(false);
+        epoller.create(1);
+        epoller.add(m_sfd, 0, EPOLLOUT);
+        int iRetCode = epoller.wait(remainMillis);
+        if (iRetCode <= 0)
+        {
+            // Timed out or poll error.
+            close();
+            break;
+        }
+
+        const epoll_event& ev = epoller.get(0);
+        if (ev.events & EPOLLERR || ev.events & EPOLLHUP)
+        {
+            close();
+            break;
+        }
+    }
+    m_lastSendRecvTime = KPRUtil::GetCurrentTimeMillis();
+
+    return (len == 0) ? 0 : -1;
+}
+
+
+/**
+ * Read whatever is available from the socket into the free part of the
+ * receive buffer.
+ *
+ * @return number of bytes appended (> 0); 0 when the read would block or was
+ *         interrupted; -1 when the peer closed the connection or a hard error
+ *         occurred (the transport is marked closed in both cases)
+ */
+int TcpTransport::recvMsg()
+{
+    const int freeBytes = m_recvBufSize - m_recvBufUsed;
+    int received = recv(m_sfd, m_pRecvBuf + m_recvBufUsed, freeBytes, 0);
+
+    if (received > 0)
+    {
+        m_recvBufUsed += received;
+    }
+    else if (received == 0)
+    {
+        // recv() returned 0: treated as the peer having closed the connection.
+        close();
+        received = -1;
+    }
+    else
+    {
+        const int err = NET_ERROR;
+        const bool transient = (err == WSAEWOULDBLOCK || err == EAGAIN || err == EINTR);
+        if (transient)
+        {
+            received = 0;
+        }
+        else
+        {
+            close();
+        }
+    }
+    m_lastSendRecvTime = KPRUtil::GetCurrentTimeMillis();
+
+    return received;
+}
+
+/**
+ * Change the capacity of the receive buffer, keeping its contents.
+ *
+ * The request is rejected (buffer untouched) when nNewSize is not positive
+ * or is smaller than the number of bytes currently buffered, because
+ * truncating would drop received data that later parsing still reads.
+ *
+ * @param nNewSize requested capacity in bytes
+ * @return true when the buffer now has nNewSize bytes, false when the request
+ *         was rejected or realloc() failed (the old buffer stays valid)
+ */
+bool TcpTransport::resizeBuf(int nNewSize)
+{
+    if (nNewSize <= 0 || nNewSize < m_recvBufUsed)
+    {
+        return false;
+    }
+
+    char* newbuf = (char*)realloc(m_pRecvBuf, nNewSize);
+    if (!newbuf)
+    {
+        return false;
+    }
+
+    m_pRecvBuf = newbuf;
+    m_recvBufSize = nNewSize;
+
+    return true;
+}
+
+/**
+ * Track the largest message size seen and periodically shrink the receive
+ * buffer down to it.
+ *
+ * Each call counts m_shrinkCheckCnt down; when it reaches zero the counter is
+ * reset to DEFAULT_SHRINK_COUNT and, if the buffer is larger than the largest
+ * message seen, the buffer is shrunk to that size. The shrink is skipped
+ * while more bytes are buffered than the target size, since shrinking then
+ * would cut off data that has been received but not yet extracted.
+ *
+ * NOTE(review): m_shrinkMax is never lowered again, so after one large
+ * message the buffer cannot shrink below that size - confirm whether that is
+ * intended.
+ *
+ * @param MsgLen size of the message currently at the front of the buffer
+ */
+void TcpTransport::tryShrink(int MsgLen)
+{
+    if (MsgLen > m_shrinkMax)
+    {
+        m_shrinkMax = MsgLen;
+    }
+
+    if (m_shrinkCheckCnt != 0)
+    {
+        m_shrinkCheckCnt--;
+        return;
+    }
+
+    m_shrinkCheckCnt = DEFAULT_SHRINK_COUNT;
+    if (m_recvBufSize > m_shrinkMax && m_shrinkMax >= m_recvBufUsed)
+    {
+        resizeBuf(m_shrinkMax);
+    }
+}
+
+/**
+ * Compute the total size of the frame that starts at pBuf.
+ *
+ * The first sizeof(int) bytes are read as a network-byte-order length; the
+ * result is that length plus 4.
+ *
+ * @param pBuf start of a frame; at least sizeof(int) bytes must be readable
+ * @return converted length + 4
+ */
+int TcpTransport::getMsgSize(const char* pBuf)
+{
+    int netOrderLen = 0;
+    memcpy(&netOrderLen, pBuf, sizeof(netOrderLen));
+
+    return ntohl(netOrderLen) + 4;
+}
+
+/**
+ * Read from the socket once and extract every complete message now buffered.
+ *
+ * @param dataList receives one heap-allocated std::string per complete message
+ * @return the result of recvMsg()
+ */
+int TcpTransport::recvData(std::list<std::string*>& dataList)
+{
+    int ret = recvMsg();
+    // Extraction runs even when recvMsg() reported nothing new or an error.
+    processData(dataList);
+    return ret;
+}
+
+/**
+ * Split the receive buffer into complete frames.
+ *
+ * While more than sizeof(int) bytes are buffered, the frame size is read from
+ * the front of the buffer. A frame larger than the buffer triggers a resize
+ * and parsing stops until more data arrives; a complete frame is copied into
+ * a new std::string appended to dataList and removed from the buffer.
+ *
+ * A frame size smaller than the length prefix itself cannot be valid (it
+ * results from a length field that overflows int); consuming it would make no
+ * progress or copy a bogus amount of memory. In that case the buffered bytes
+ * are dropped and the transport is marked closed.
+ * NOTE(review): whether the owner notices the closed state right after this
+ * call depends on code outside this file - confirm.
+ *
+ * @param dataList receives one heap-allocated std::string per complete frame;
+ *                 the caller owns the strings
+ */
+void TcpTransport::processData(std::list<std::string*>& dataList)
+{
+    while (m_recvBufUsed > int(sizeof(int)))
+    {
+        const int msgLen = getMsgSize(m_pRecvBuf);
+        if (msgLen < int(sizeof(int)))
+        {
+            // Corrupt or hostile length field: the stream cannot be resynchronised.
+            m_recvBufUsed = 0;
+            close();
+            break;
+        }
+
+        if (msgLen > m_recvBufSize)
+        {
+            // Frame does not fit: grow the buffer and wait for the rest.
+            if (resizeBuf(msgLen))
+            {
+                m_shrinkCheckCnt = DEFAULT_SHRINK_COUNT;
+            }
+            break;
+        }
+
+        tryShrink(msgLen);
+
+        if (m_recvBufUsed < msgLen)
+        {
+            // Frame not complete yet.
+            break;
+        }
+
+        // Constructing in one step avoids leaking the string object if the
+        // copy itself throws.
+        std::string* data = new std::string(m_pRecvBuf, msgLen);
+        dataList.push_back(data);
+        m_recvBufUsed -= msgLen;
+
+        memmove(m_pRecvBuf, m_pRecvBuf + msgLen, m_recvBufUsed);
+    }
+}
+
+/**
+ * @return the socket descriptor currently stored in the transport
+ */
+SOCKET TcpTransport::getSocket()
+{
+    return m_sfd;
+}
+
+/**
+ * @return reference to the stored server address (set by a successful connect())
+ */
+std::string& TcpTransport::getServerAddr()
+{
+    return m_serverAddr;
+}
+
+/**
+ * @return the timestamp (KPRUtil::GetCurrentTimeMillis()) recorded by the
+ *         most recent construct, connect, send or receive call
+ */
+unsigned long long TcpTransport::getLastSendRecvTime()
+{
+	return m_lastSendRecvTime;
+}
+
+
+}



Mime
View raw message