rocketmq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dongefore...@apache.org
Subject [13/17] incubator-rocketmq-externals git commit: [ROCKETMQ-129] Initialized the rocketmq c++ client closes apache/incubator-rocketmq-externals#11
Date Fri, 21 Apr 2017 10:09:53 GMT
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/common/PermName.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/common/PermName.cpp b/rocketmq-client4cpp/src/common/PermName.cpp
new file mode 100644
index 0000000..084de79
--- /dev/null
+++ b/rocketmq-client4cpp/src/common/PermName.cpp
@@ -0,0 +1,63 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#include "PermName.h"
+
+namespace rmq
+{
+
+int PermName::PERM_PRIORITY = 0x1 << 3;
+int PermName::PERM_READ = 0x1 << 2;
+int PermName::PERM_WRITE = 0x1 << 1;
+int PermName::PERM_INHERIT = 0x1 << 0;
+
+bool PermName::isReadable(int perm)
+{
+    return (perm & PERM_READ) == PERM_READ;
+}
+
+bool PermName::isWriteable(int perm)
+{
+    return (perm & PERM_WRITE) == PERM_WRITE;
+}
+
+bool PermName::isInherited(int perm)
+{
+    return (perm & PERM_INHERIT) == PERM_INHERIT;
+}
+
+std::string PermName::perm2String(int perm)
+{
+    std::string pm("---");
+    if (isReadable(perm))
+    {
+        pm.replace(0, 1, "R");
+    }
+
+    if (isWriteable(perm))
+    {
+        pm.replace(1, 2, "W");
+    }
+
+    if (isInherited(perm))
+    {
+        pm.replace(2, 3, "X");
+    }
+
+    return pm;
+}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/common/PermName.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/common/PermName.h b/rocketmq-client4cpp/src/common/PermName.h
new file mode 100644
index 0000000..364ddeb
--- /dev/null
+++ b/rocketmq-client4cpp/src/common/PermName.h
@@ -0,0 +1,39 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#ifndef __PERMNAME_H__
+#define __PERMNAME_H__
+
+#include <string>
+
+namespace rmq
+{
+	class PermName
+	{
+	public:
+	    static int PERM_PRIORITY;
+	    static int PERM_READ;
+	    static int PERM_WRITE;
+	    static int PERM_INHERIT;
+
+	    static bool isReadable(int perm);
+	    static bool isWriteable(int perm);
+	    static bool isInherited(int perm);
+	    static std::string perm2String(int perm);
+	};
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/common/PullSysFlag.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/common/PullSysFlag.cpp b/rocketmq-client4cpp/src/common/PullSysFlag.cpp
new file mode 100644
index 0000000..f6fc1c2
--- /dev/null
+++ b/rocketmq-client4cpp/src/common/PullSysFlag.cpp
@@ -0,0 +1,68 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#include "PullSysFlag.h"
+
+namespace rmq
+{
+
+int PullSysFlag::FLAG_COMMIT_OFFSET = 0x1 << 0;
+int PullSysFlag::FLAG_SUSPEND = 0x1 << 1;
+int PullSysFlag::FLAG_SUBSCRIPTION = 0x1 << 2;
+
+int PullSysFlag::buildSysFlag(bool commitOffset, bool suspend, bool subscription)
+{
+    int flag = 0;
+
+    if (commitOffset)
+    {
+        flag |= FLAG_COMMIT_OFFSET;
+    }
+
+    if (suspend)
+    {
+        flag |= FLAG_SUSPEND;
+    }
+
+    if (subscription)
+    {
+        flag |= FLAG_SUBSCRIPTION;
+    }
+
+    return flag;
+}
+
+int PullSysFlag::clearCommitOffsetFlag(int sysFlag)
+{
+    return sysFlag & (~FLAG_COMMIT_OFFSET);
+}
+
+bool PullSysFlag::hasCommitOffsetFlag(int sysFlag)
+{
+    return (sysFlag & FLAG_COMMIT_OFFSET) == FLAG_COMMIT_OFFSET;
+}
+
+bool PullSysFlag::hasSuspendFlag(int sysFlag)
+{
+    return (sysFlag & FLAG_SUSPEND) == FLAG_SUSPEND;
+}
+
+bool PullSysFlag::hasSubscriptionFlag(int sysFlag)
+{
+    return (sysFlag & FLAG_SUBSCRIPTION) == FLAG_SUBSCRIPTION;
+}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/common/PullSysFlag.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/common/PullSysFlag.h b/rocketmq-client4cpp/src/common/PullSysFlag.h
new file mode 100755
index 0000000..c19eac3
--- /dev/null
+++ b/rocketmq-client4cpp/src/common/PullSysFlag.h
@@ -0,0 +1,38 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#ifndef __PULLSYSFLAG_H__
+#define __PULLSYSFLAG_H__
+
+namespace rmq
+{
+	class PullSysFlag
+	{
+	public:
+	    static int buildSysFlag(bool commitOffset, bool suspend, bool subscription);
+	    static int clearCommitOffsetFlag(int sysFlag);
+	    static bool hasCommitOffsetFlag(int sysFlag);
+	    static bool hasSuspendFlag(int sysFlag);
+	    static bool hasSubscriptionFlag(int sysFlag);
+
+	private:
+	    static int FLAG_COMMIT_OFFSET;
+	    static int FLAG_SUSPEND;
+	    static int FLAG_SUBSCRIPTION;
+	};
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/common/SendResult.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/common/SendResult.cpp b/rocketmq-client4cpp/src/common/SendResult.cpp
new file mode 100755
index 0000000..5263d29
--- /dev/null
+++ b/rocketmq-client4cpp/src/common/SendResult.cpp
@@ -0,0 +1,132 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+#include "SendResult.h"
+#include "UtilAll.h"
+#include "VirtualEnvUtil.h"
+
+namespace rmq
+{
+
+SendResult::SendResult()
+    : m_sendStatus(SEND_OK),m_queueOffset(0)
+{
+}
+
+SendResult::SendResult(const SendStatus& sendStatus,
+                       const std::string&  msgId,
+                       MessageQueue& messageQueue,
+                       long long queueOffset,
+                       std::string&  projectGroupPrefix)
+    : m_sendStatus(sendStatus),
+      m_msgId(msgId),
+      m_messageQueue(messageQueue),
+      m_queueOffset(queueOffset)
+{
+    if (!UtilAll::isBlank(projectGroupPrefix))
+    {
+        m_messageQueue.setTopic(VirtualEnvUtil::clearProjectGroup(m_messageQueue.getTopic(),
+                                projectGroupPrefix));
+    }
+}
+
+const std::string&  SendResult::getMsgId()
+{
+    return m_msgId;
+}
+
+void SendResult::setMsgId(const std::string&  msgId)
+{
+    m_msgId = msgId;
+}
+
+SendStatus SendResult::getSendStatus()
+{
+    return m_sendStatus;
+}
+
+void SendResult::setSendStatus(const SendStatus& sendStatus)
+{
+    m_sendStatus = sendStatus;
+}
+
+MessageQueue& SendResult::getMessageQueue()
+{
+    return m_messageQueue;
+}
+
+void SendResult::setMessageQueue(MessageQueue& messageQueue)
+{
+    m_messageQueue = messageQueue;
+}
+
+long long SendResult::getQueueOffset()
+{
+    return m_queueOffset;
+}
+
+void SendResult::setQueueOffset(long long queueOffset)
+{
+    m_queueOffset = queueOffset;
+}
+
+
+bool SendResult::hasResult()
+{
+	return !m_msgId.empty();
+}
+
+
+
+std::string SendResult::toString() const
+{
+    std::stringstream ss;
+    ss << "{sendStatus=" << m_sendStatus
+       << ",msgId=" << m_msgId
+       << ",messageQueue=" << m_messageQueue.toString()
+       << ",queueOffset=" << m_queueOffset
+       << "}";
+    return ss.str();
+}
+
+
+std::string SendResult::toJsonString() const
+{
+    std::stringstream ss;
+    ss << "{\"sendStatus\":\"" << m_sendStatus
+       << "\",\"msgId\":\"" << m_msgId
+       << "\",\"messageQueue\":" << m_messageQueue.toJsonString()
+       << ",\"queueOffset\":\"" << m_queueOffset
+       << "}";
+    return ss.str();
+}
+
+
+
+TransactionSendResult::TransactionSendResult()
+{
+}
+
+LocalTransactionState TransactionSendResult::getLocalTransactionState()
+{
+    return m_localTransactionState;
+}
+
+void TransactionSendResult::setLocalTransactionState(LocalTransactionState localTransactionState)
+{
+    m_localTransactionState = localTransactionState;
+}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/common/ServiceState.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/common/ServiceState.h b/rocketmq-client4cpp/src/common/ServiceState.h
new file mode 100755
index 0000000..7b41add
--- /dev/null
+++ b/rocketmq-client4cpp/src/common/ServiceState.h
@@ -0,0 +1,31 @@
+/**
+ * Copyright (C) 2013 kangliqiang ,kangliq@163.com
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __SERVICESTATE_H__
+#define __SERVICESTATE_H__
+
+namespace rmq
+{
+    enum ServiceState
+    {
+        CREATE_JUST,
+        RUNNING,
+        SHUTDOWN_ALREADY,
+        START_FAILED
+    };
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/common/ServiceThread.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/common/ServiceThread.cpp b/rocketmq-client4cpp/src/common/ServiceThread.cpp
new file mode 100644
index 0000000..1abff9f
--- /dev/null
+++ b/rocketmq-client4cpp/src/common/ServiceThread.cpp
@@ -0,0 +1,73 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#include "ServiceThread.h"
+#include "Monitor.h"
+#include "ScopedLock.h"
+
+namespace rmq
+{
+
+ServiceThread::ServiceThread(const char* name)
+    : kpr::Thread(name),
+      m_notified(false),
+      m_stoped(false)
+{
+
+}
+
+ServiceThread::~ServiceThread()
+{
+
+}
+
+void ServiceThread::stop()
+{
+    m_stoped = true;
+    wakeup();
+}
+
+void ServiceThread::wakeup()
+{
+    kpr::ScopedLock<kpr::Monitor> lock(*this);
+
+    if (!m_notified)
+    {
+        m_notified = true;
+        Notify();
+    }
+}
+
+void ServiceThread::waitForRunning(long interval)
+{
+    kpr::ScopedLock<kpr::Monitor> lock(*this);
+    if (m_notified)
+    {
+        m_notified = false;
+        return;
+    }
+
+    try
+    {
+        Wait(interval);
+    }
+    catch (...)
+    {
+        m_notified = false;
+    }
+}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/common/ServiceThread.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/common/ServiceThread.h b/rocketmq-client4cpp/src/common/ServiceThread.h
new file mode 100755
index 0000000..d7ec3ef
--- /dev/null
+++ b/rocketmq-client4cpp/src/common/ServiceThread.h
@@ -0,0 +1,50 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#ifndef __SERVICETHREAD_H__
+#define __SERVICETHREAD_H__
+
+#include <string>
+#include "Thread.h"
+#include "Monitor.h"
+
+namespace rmq
+{
+	const long JoinTime = 90 * 1000;
+
+	/**
+	* service thread base class
+	*
+	*/
+	class ServiceThread : public kpr::Thread, public kpr::Monitor
+	{
+	public:
+	    ServiceThread(const char* name = NULL);
+	    virtual ~ServiceThread();
+
+	    virtual std::string  getServiceName() = 0;
+
+	    void stop();
+	    void wakeup();
+	    void waitForRunning(long interval);
+
+	protected:
+	    volatile bool m_notified;
+	    volatile bool m_stoped;
+	};
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/common/SubscriptionGroupConfig.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/common/SubscriptionGroupConfig.h b/rocketmq-client4cpp/src/common/SubscriptionGroupConfig.h
new file mode 100755
index 0000000..12bd48a
--- /dev/null
+++ b/rocketmq-client4cpp/src/common/SubscriptionGroupConfig.h
@@ -0,0 +1,50 @@
+/**
+* Copyright (C) 2013 kangliqiang, kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+#ifndef __SUBSCRIPTIONGROUPCONFIG_H__
+#define __SUBSCRIPTIONGROUPCONFIG_H__
+
+#include <string>
+#include "MixAll.h"
+
+namespace rmq
+{
+    class SubscriptionGroupConfig
+    {
+    public:
+        SubscriptionGroupConfig(const std::string& groupName)
+        {
+            this->groupName = groupName;
+            consumeEnable = true;
+            consumeFromMinEnable = true;
+            consumeBroadcastEnable = true;
+            retryQueueNums = 1;
+            retryMaxTimes = 5;
+            brokerId = MixAll::MASTER_ID;
+            whichBrokerWhenConsumeSlowly = 1;
+        }
+
+        std::string groupName;
+        bool consumeEnable;
+        bool consumeFromMinEnable;
+        bool consumeBroadcastEnable;
+        int retryQueueNums;
+        int retryMaxTimes;
+        long brokerId;
+        long whichBrokerWhenConsumeSlowly;
+    };
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/common/TopAddressing.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/common/TopAddressing.h b/rocketmq-client4cpp/src/common/TopAddressing.h
new file mode 100755
index 0000000..07b0c0c
--- /dev/null
+++ b/rocketmq-client4cpp/src/common/TopAddressing.h
@@ -0,0 +1,54 @@
+/**
+* Copyright (C) 2013 kangliqiang, kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+#ifndef __TOPADDRESSING_H__
+#define  __TOPADDRESSING_H__
+
+#include <string>
+#include <sstream>
+#include "SocketUtil.h"
+
+namespace rmq
+{
+    class TopAddressing
+    {
+    public:
+        TopAddressing()
+			: m_nsAddr("")
+        {
+        }
+
+		const std::string& getNsAddr()
+        {
+            return m_nsAddr;
+        }
+
+		void setNsAddr(std::string& nsAddr)
+        {
+           	m_nsAddr = nsAddr;
+        }
+
+        std::string fetchNSAddr()
+        {
+
+            return "";
+        }
+
+    private:
+		std::string m_nsAddr;
+    };
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/common/TopicConfig.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/common/TopicConfig.cpp b/rocketmq-client4cpp/src/common/TopicConfig.cpp
new file mode 100644
index 0000000..036b41c
--- /dev/null
+++ b/rocketmq-client4cpp/src/common/TopicConfig.cpp
@@ -0,0 +1,167 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+#include <stdlib.h>
+#include <sstream>
+
+#include "TopicConfig.h"
+#include "PermName.h"
+
+namespace rmq
+{
+
+int TopicConfig::DefaultReadQueueNums = 16;
+int TopicConfig::DefaultWriteQueueNums = 16;
+std::string TopicConfig::SEPARATOR = " ";
+
+TopicConfig::TopicConfig()
+    : m_topicName(""),
+      m_readQueueNums(DefaultReadQueueNums),
+      m_writeQueueNums(DefaultWriteQueueNums),
+      m_perm(PermName::PERM_READ | PermName::PERM_WRITE),
+      m_topicFilterType(SINGLE_TAG),
+      m_topicSysFlag(0),
+      m_order(false)
+{
+
+}
+
+TopicConfig::TopicConfig(const std::string& topicName)
+    : m_topicName(topicName),
+      m_readQueueNums(DefaultReadQueueNums),
+      m_writeQueueNums(DefaultWriteQueueNums),
+      m_perm(PermName::PERM_READ | PermName::PERM_WRITE),
+      m_topicFilterType(SINGLE_TAG),
+      m_topicSysFlag(0),
+      m_order(false)
+{
+
+}
+
+TopicConfig::TopicConfig(const std::string& topicName, int readQueueNums, int writeQueueNums, int perm)
+    : m_topicName(topicName),
+      m_readQueueNums(readQueueNums),
+      m_writeQueueNums(writeQueueNums),
+      m_perm(perm),
+      m_topicFilterType(SINGLE_TAG),
+      m_topicSysFlag(0),
+      m_order(false)
+{
+}
+
+TopicConfig::~TopicConfig()
+{
+}
+
+std::string  TopicConfig::encode()
+{
+    std::stringstream ss;
+    ss << m_topicName << SEPARATOR
+       << m_readQueueNums << SEPARATOR
+       << m_writeQueueNums << SEPARATOR
+       << m_perm << SEPARATOR
+       << m_topicFilterType;
+
+    return ss.str();
+}
+
+bool  TopicConfig::decode(const std::string& in)
+{
+    std::stringstream ss(in);
+
+    ss >> m_topicName;
+    ss >> m_readQueueNums;
+    ss >> m_writeQueueNums;
+    ss >> m_perm;
+
+    int type;
+    ss >> type;
+    m_topicFilterType = (TopicFilterType)type;
+
+    return true;
+}
+
+const std::string&  TopicConfig::getTopicName()
+{
+    return m_topicName;
+}
+
+void  TopicConfig::setTopicName(const std::string& topicName)
+{
+    m_topicName = topicName;
+}
+
+int  TopicConfig::getReadQueueNums()
+{
+    return m_readQueueNums;
+}
+
+void  TopicConfig::setReadQueueNums(int readQueueNums)
+{
+    m_readQueueNums = readQueueNums;
+}
+
+int  TopicConfig::getWriteQueueNums()
+{
+    return m_writeQueueNums;
+}
+
+void  TopicConfig::setWriteQueueNums(int writeQueueNums)
+{
+    m_writeQueueNums = writeQueueNums;
+}
+
+int  TopicConfig::getPerm()
+{
+    return m_perm;
+}
+
+void  TopicConfig::setPerm(int perm)
+{
+    m_perm = perm;
+}
+
+TopicFilterType  TopicConfig::getTopicFilterType()
+{
+    return m_topicFilterType;
+}
+
+void  TopicConfig::setTopicFilterType(TopicFilterType topicFilterType)
+{
+    m_topicFilterType = topicFilterType;
+}
+
+int  TopicConfig::getTopicSysFlag()
+{
+    return m_topicSysFlag;
+}
+
+void  TopicConfig::setTopicSysFlag(int perm)
+{
+    m_topicSysFlag = perm;
+}
+
+bool  TopicConfig::isOrder()
+{
+    return m_order;
+}
+
+void  TopicConfig::setOrder(bool order)
+{
+    m_order = order;
+}
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/common/TopicConfig.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/common/TopicConfig.h b/rocketmq-client4cpp/src/common/TopicConfig.h
new file mode 100644
index 0000000..b9f2bcb
--- /dev/null
+++ b/rocketmq-client4cpp/src/common/TopicConfig.h
@@ -0,0 +1,71 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#ifndef __TOPICCONFIG_H__
+#define __TOPICCONFIG_H__
+
+#include <string>
+#include "TopicFilterType.h"
+
+namespace rmq
+    {
+    /**
+    * Topic
+    *
+    */
+    class TopicConfig
+    {
+    public:
+        TopicConfig();
+        TopicConfig(const std::string& topicName);
+        TopicConfig(const std::string& topicName, int readQueueNums, int writeQueueNums, int perm);
+        ~TopicConfig();
+
+        std::string encode();
+        bool decode(const std::string& in);
+        const std::string& getTopicName();
+        void setTopicName(const std::string& topicName);
+        int getReadQueueNums();
+        void setReadQueueNums(int readQueueNums);
+        int getWriteQueueNums();
+        void setWriteQueueNums(int writeQueueNums);
+        int getPerm();
+        void setPerm(int perm);
+        TopicFilterType getTopicFilterType();
+        void setTopicFilterType(TopicFilterType topicFilterType);
+		int getTopicSysFlag();
+        void setTopicSysFlag(int topicSysFlag);
+		bool isOrder();
+        void setOrder(bool order);
+
+    public:
+        static int DefaultReadQueueNums;
+        static int DefaultWriteQueueNums;
+
+    private:
+        static std::string SEPARATOR;
+
+        std::string m_topicName;
+        int m_readQueueNums;
+        int m_writeQueueNums;
+        int m_perm;
+        TopicFilterType m_topicFilterType;
+		int m_topicSysFlag;
+		bool m_order;
+    };
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/common/TopicStatsTable.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/common/TopicStatsTable.h b/rocketmq-client4cpp/src/common/TopicStatsTable.h
new file mode 100755
index 0000000..4319e54
--- /dev/null
+++ b/rocketmq-client4cpp/src/common/TopicStatsTable.h
@@ -0,0 +1,51 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#ifndef __TOPICSTATSTABLE_H__
+#define __TOPICSTATSTABLE_H__
+
+#include <map>
+
+namespace rmq
+{
+    class MessageQueue;
+
+    typedef struct
+    {
+        long long minOffset;
+        long long maxOffset;
+        long long lastUpdateTimestamp;
+    } TopicOffset;
+
+    class TopicStatsTable
+    {
+    public:
+        std::map<MessageQueue*, TopicOffset> getOffsetTable()
+        {
+            return m_offsetTable;
+        }
+
+        void setOffsetTable(const std::map<MessageQueue*, TopicOffset>& offsetTable)
+        {
+            m_offsetTable = offsetTable;
+        }
+
+    private:
+        std::map<MessageQueue*, TopicOffset> m_offsetTable;
+    };
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/common/UtilAll.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/common/UtilAll.h b/rocketmq-client4cpp/src/common/UtilAll.h
new file mode 100755
index 0000000..b239edb
--- /dev/null
+++ b/rocketmq-client4cpp/src/common/UtilAll.h
@@ -0,0 +1,608 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+#ifndef __UTILALL_H__
+#define __UTILALL_H__
+
+#include <stdlib.h>
+#include <stdio.h>
+#include <string.h>
+#include <assert.h>
+#include <time.h>
+
+#include <string>
+#include <sstream>
+#include <vector>
+#include <list>
+#include <set>
+#include <map>
+
+#include "RocketMQClient.h"
+#include "zlib.h"
+#include "json/value.h"
+#include "json/writer.h"
+
+namespace rmq
+{
+    const std::string WHITESPACE = " \t\r\n";
+    const int CHUNK = 8192;
+	const std::string yyyy_MM_dd_HH_mm_ss = "yyyy-MM-dd HH:mm:ss";
+    const std::string yyyy_MM_dd_HH_mm_ss_SSS = "yyyy-MM-dd#HH:mm:ss:SSS";
+    const std::string yyyyMMddHHmmss = "yyyyMMddHHmmss";
+
+    class UtilAll
+    {
+    public:
+        static pid_t getPid()
+        {
+            static __thread pid_t pid = 0;
+            if (!pid || pid != getpid())
+            {
+                pid = getpid();
+            }
+            return pid;
+        }
+
+        static pid_t getTid()
+        {
+            static __thread pid_t pid = 0;
+            static __thread pid_t tid = 0;
+            if (!pid || !tid || pid != getpid())
+            {
+                pid = getpid();
+                tid = syscall(__NR_gettid);
+            }
+            return tid;
+        }
+
+        static int Split(std::vector<std::string>& out, const std::string& in, const std::string& delimiter)
+        {
+            std::string::size_type left = 0;
+            for (size_t i = 1; i < in.size(); i++)
+            {
+                std::string::size_type right = in.find(delimiter, left);
+
+                if (right == std::string::npos)
+                {
+                    break;
+                }
+
+                out.push_back(in.substr(left, right - left));
+
+                left = right + delimiter.length();
+            }
+
+            out.push_back(in.substr(left));
+
+            return out.size();
+        }
+
+        static int Split(std::vector<std::string>& out, const std::string& in, const char delimiter)
+        {
+            std::string::size_type left = 0;
+            for (size_t i = 1; i < in.size(); i++)
+            {
+                std::string::size_type right = in.find(delimiter, left);
+
+                if (right == std::string::npos)
+                {
+                    break;
+                }
+
+                out.push_back(in.substr(left, right - left));
+
+                left = right + 1;
+            }
+
+            out.push_back(in.substr(left));
+
+            return out.size();
+        }
+
+        static std::string Trim(const std::string& str)
+        {
+            if (str.empty())
+            {
+                return str;
+            }
+
+            std::string::size_type left = str.find_first_not_of(WHITESPACE);
+
+            if (left == std::string::npos)
+            {
+                return "";
+            }
+
+            std::string::size_type right = str.find_last_not_of(WHITESPACE);
+
+            if (right == std::string::npos)
+            {
+                return str.substr(left);
+            }
+
+            return str.substr(left, right + 1 - left);
+        }
+
+        static bool isBlank(const std::string& str)
+        {
+            if (str.empty())
+            {
+                return true;
+            }
+
+            std::string::size_type left = str.find_first_not_of(WHITESPACE);
+
+            if (left == std::string::npos)
+            {
+                return true;
+            }
+
+            return false;
+        }
+
+        static int availableProcessors()
+        {
+            return 4;
+        }
+
+
+        static int hashCode(const char* pData, int len)
+        {
+            int h = 0;
+            if (pData != NULL && len > 0)
+            {
+                unsigned char c;
+                for (int i = 0; i < len; i++)
+                {
+                    c = (unsigned char)pData[i];
+                    h = 31 * h + c;
+                }
+            }
+
+            return h;
+        }
+
+        static int hashCode(const std::string& s)
+        {
+            return hashCode(s.c_str(), s.length());
+        }
+
+        static int hashCode(const char* pData)
+        {
+            return hashCode(std::string(pData));
+        }
+
+        static int hashCode(char x)
+        {
+            return x;
+        }
+
+        static int hashCode(unsigned char x)
+        {
+            return x;
+        }
+
+        static int hashCode(short x)
+        {
+            return x;
+        }
+
+        static int hashCode(unsigned short x)
+        {
+            return x;
+        }
+
+        static int hashCode(int x)
+        {
+            return x;
+        }
+
+        static int hashCode(unsigned int x)
+        {
+            return x;
+        }
+
+        static int hashCode(long x)
+        {
+            return x;
+        }
+
+        static int hashCode(unsigned long x)
+        {
+            return x;
+        }
+
+        template <typename T>
+        static int hashCode(const std::vector<T>& v)
+        {
+            int h = 0;
+            typeof(v.begin()) it = v.begin();
+            while (it != v.end())
+            {
+                h += hashCode(*it);
+                ++it;
+            }
+            return h;
+        }
+
+        template <typename T>
+        static int hashCode(const std::set<T>& v)
+        {
+            int h = 0;
+            typeof(v.begin()) it = v.begin();
+            while (it != v.end())
+            {
+                h += hashCode(*it);
+                ++it;
+            }
+            return h;
+        }
+
+		static std::string toString(Json::Value& json)
+		{
+			Json::FastWriter fastWriter;
+			return fastWriter.write(json);
+		}
+
+        template<typename T>
+        static std::string toString(const T& v)
+        {
+            std::ostringstream ss;
+            ss << v;
+            return ss.str();
+        }
+
+        template<typename T>
+        static std::string toString(const std::vector<T>& v)
+        {
+            std::string s;
+            s.append("[");
+            typeof(v.begin()) it = v.begin();
+            while (it != v.end())
+            {
+                s.append(toString(*it));
+                s.append(",");
+                ++it;
+            }
+			if (s.size() > 1)
+			{
+				s.erase(s.size() - 1, 1);
+			}
+            s.append("]");
+            return s;
+        }
+
+
+        template <typename T>
+        static std::string toString(const std::list<T>& v)
+        {
+            std::string s;
+            s.append("[");
+            typeof(v.begin()) it = v.begin();
+            while (it != v.end())
+            {
+                s.append(toString(*it));
+                s.append(",");
+                ++it;
+            }
+			if (s.size() > 1)
+			{
+				s.erase(s.size() - 1, 1);
+			}
+            s.append("]");
+
+            return s;
+        }
+
+        template <typename T>
+        static std::string toString(const std::set<T>& v)
+        {
+            std::string s;
+            s.append("[");
+            typeof(v.begin()) it = v.begin();
+            while (it != v.end())
+            {
+                s.append(toString(*it));
+                s.append(",");
+                ++it;
+            }
+			if (s.size() > 1)
+			{
+				s.erase(s.size() - 1, 1);
+			}
+            s.append("]");
+            return s;
+        }
+
+        template<typename K, typename V, typename D, typename A>
+        static std::string toString(const std::map<K, V, D, A>& v)
+        {
+            std::string s;
+            s.append("{");
+            typeof(v.begin()) it = v.begin();
+            while (it != v.end())
+            {
+                s.append(toString(it->first));
+                s.append("=");
+                s.append(toString(it->second));
+                s.append(",");
+                ++it;
+            }
+			if (s.size() > 1)
+			{
+				s.erase(s.size() - 1, 1);
+			}
+            s.append("}");
+            return s;
+        }
+
+        template<typename out_type, typename in_type>
+        static out_type convert(const in_type& t)
+        {
+            out_type result;
+            std::stringstream stream;
+            stream << t;
+            stream >> result;
+            return result;
+        }
+
+        static bool compress(const char* pIn, int inLen, unsigned char** pOut, int* pOutLen, int level)
+        {
+            int ret, flush;
+            int have;
+            z_stream strm;
+            unsigned char out[CHUNK];
+
+            /* allocate deflate state */
+            strm.zalloc = Z_NULL;
+            strm.zfree = Z_NULL;
+            strm.opaque = Z_NULL;
+            ret = deflateInit(&strm, level);
+            if (ret != Z_OK)
+            {
+                return false;
+            }
+
+            int outBufferLen = inLen;
+            unsigned char* outData = (unsigned char*)malloc(outBufferLen);
+            int left = inLen;
+            int used = 0;
+            int outDataLen = 0;
+
+            /* compress until end of buffer */
+            do
+            {
+                strm.avail_in = left > CHUNK ? CHUNK : left;
+                flush = left <= CHUNK ? Z_FINISH : Z_NO_FLUSH;
+                strm.next_in = (unsigned char*)pIn + used;
+                used += strm.avail_in;
+                left -= strm.avail_in;
+
+                /* run deflate() on input until output buffer not full, finish
+                compression if all of source has been read in */
+                do
+                {
+                    strm.avail_out = CHUNK;
+                    strm.next_out = out;
+                    ret = deflate(&strm, flush);    /* no bad return value */
+                    assert(ret != Z_STREAM_ERROR);  /* state not clobbered */
+                    have = CHUNK - strm.avail_out;
+
+                    if (outDataLen + have > outBufferLen)
+                    {
+                        outBufferLen = outDataLen + have;
+                        outBufferLen <<= 1;
+                        unsigned char* tmp = (unsigned char*)realloc(outData, outBufferLen);
+                        if (!tmp)
+                        {
+                            free(outData);
+                            return false;
+                        }
+
+                        outData = tmp;
+                    }
+
+                    memcpy(outData + outDataLen, out, have);
+                    outDataLen += have;
+
+                }
+                while (strm.avail_out == 0);
+                assert(strm.avail_in == 0);     /* all input will be used */
+
+                /* done when last data in file processed */
+            }
+            while (flush != Z_FINISH);
+            assert(ret == Z_STREAM_END);        /* stream will be complete */
+
+            *pOutLen = outDataLen;
+            *pOut = outData;
+
+            /* clean up and return */
+            (void)deflateEnd(&strm);
+            return true;
+        }
+
+        static bool decompress(const char* pIn, int inLen, unsigned char** pOut, int* pOutLen)
+        {
+            int ret;
+            int have;
+            z_stream strm;
+
+            unsigned char out[CHUNK];
+
+            /* allocate inflate state */
+            strm.zalloc = Z_NULL;
+            strm.zfree = Z_NULL;
+            strm.opaque = Z_NULL;
+            strm.avail_in = 0;
+            strm.next_in = Z_NULL;
+            ret = inflateInit(&strm);
+            if (ret != Z_OK)
+            {
+                return false;
+            }
+
+            int outBufferLen = inLen << 2;
+            unsigned char* outData = (unsigned char*)malloc(outBufferLen);
+
+            int left = inLen;
+            int used = 0;
+            int outDataLen = 0;
+
+            /* decompress until deflate stream ends or end of buffer */
+            do
+            {
+                strm.avail_in = left > CHUNK ? CHUNK : left;
+                if (strm.avail_in <= 0)
+                {
+                    break;
+                }
+
+                strm.next_in = (unsigned char*)pIn + used;
+                used += strm.avail_in;
+                left -= strm.avail_in;
+
+                /* run inflate() on input until output buffer not full */
+                do
+                {
+                    strm.avail_out = CHUNK;
+                    strm.next_out = out;
+                    ret = inflate(&strm, Z_NO_FLUSH);
+                    assert(ret != Z_STREAM_ERROR);  /* state not clobbered */
+                    switch (ret)
+                    {
+                        case Z_NEED_DICT:
+                            ret = Z_DATA_ERROR;     /* and fall through */
+                        case Z_DATA_ERROR:
+                        case Z_MEM_ERROR:
+                            (void)inflateEnd(&strm);
+                            free(outData);
+                            return false;
+                    }
+                    have = CHUNK - strm.avail_out;
+
+                    if (outDataLen + have > outBufferLen)
+                    {
+                        outBufferLen = outDataLen + have;
+                        outBufferLen <<= 1;
+                        unsigned char* tmp = (unsigned char*)realloc(outData, outBufferLen);
+                        if (!tmp)
+                        {
+                            free(outData);
+                            return false;
+                        }
+
+                        outData = tmp;
+                    }
+
+                    memcpy(outData + outDataLen, out, have);
+                    outDataLen += have;
+
+                }
+                while (strm.avail_out == 0);
+
+                /* done when inflate() says it's done */
+            }
+            while (ret != Z_STREAM_END);
+
+            /* clean up and return */
+            (void)inflateEnd(&strm);
+
+            if (ret == Z_STREAM_END)
+            {
+                *pOutLen = outDataLen;
+                *pOut = outData;
+
+                return true;
+            }
+            else
+            {
+                free(outData);
+
+                return false;
+            }
+        }
+
+        static unsigned long long hexstr2ull(const char* str)
+        {
+            char* end;
+            return strtoull(str, &end, 16);
+        }
+
+		static long long str2ll(const char *str)
+		{
+			return atoll(str);
+		}
+
+
+		static std::string tm2str(const time_t& t, const std::string& sFormat)
+		{
+			struct tm stTm;
+			localtime_r(&t, &stTm);
+
+			char sTimeString[255] = "\0";
+			strftime(sTimeString, sizeof(sTimeString), sFormat.c_str(), &stTm);
+
+			return std::string(sTimeString);
+		}
+
+		static std::string now2str(const std::string& sFormat)
+		{
+			time_t t = time(NULL);
+			return tm2str(t, sFormat.c_str());
+		}
+
+		static std::string now2str()
+		{
+			return now2str("%Y-%m-%d %H:%M:%S");
+		}
+
+		static int64_t now2ms()
+		{
+			struct timeval tv;
+			gettimeofday(&tv, NULL);
+			return tv.tv_sec * (int64_t)1000 + tv.tv_usec / 1000;
+		}
+
+		static int64_t now2us()
+		{
+			struct timeval tv;
+			gettimeofday(&tv, NULL);
+			return tv.tv_sec * (int64_t)1000*1000 + tv.tv_usec;
+		}
+
+		static int str2tm(const std::string &sString, const std::string &sFormat, struct tm &stTm)
+		{
+			char *p = strptime(sString.c_str(), sFormat.c_str(), &stTm);
+    		return (p != NULL) ? 0 : -1;
+		}
+
+		static time_t str2tm(const std::string &sString, const std::string &sFormat)
+		{
+			struct tm stTm;
+			if (str2tm(sString, sFormat, stTm) == 0)
+			{
+				time_t t = mktime(&stTm);
+				return t;
+			}
+			else
+			{
+				return -1;
+			}
+		}
+    };
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/common/Validators.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/common/Validators.cpp b/rocketmq-client4cpp/src/common/Validators.cpp
new file mode 100755
index 0000000..29f36a0
--- /dev/null
+++ b/rocketmq-client4cpp/src/common/Validators.cpp
@@ -0,0 +1,132 @@
+/**
+* Copyright (C) 2013 kangliqiang, kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+#include "Validators.h"
+
+#include <stdlib.h>
+#include <stdio.h>
+#include "MQClientException.h"
+#include "UtilAll.h"
+#include "MixAll.h"
+#include "Message.h"
+#include "MQProtos.h"
+#include "DefaultMQProducer.h"
+
+namespace rmq
+{
+
+const std::string Validators::validPatternStr = "^[a-zA-Z0-9_-]+$";
+const size_t Validators::CHARACTER_MAX_LENGTH = 255;
+
+bool Validators::regularExpressionMatcher(const std::string& origin, const std::string& patternStr)
+{
+    if (UtilAll::isBlank(origin))
+    {
+        return false;
+    }
+
+    if (UtilAll::isBlank(patternStr))
+    {
+        return true;
+    }
+
+    //Pattern pattern = Pattern.compile(patternStr);
+    //Matcher matcher = pattern.matcher(origin);
+
+    //return matcher.matches();
+    return true;
+}
+
+std::string Validators::getGroupWithRegularExpression(const std::string& origin, const std::string& patternStr)
+{
+    /*Pattern pattern = Pattern.compile(patternStr);
+    Matcher matcher = pattern.matcher(origin);
+    while (matcher.find()) {
+    return matcher.group(0);
+    }*/
+    return "";
+}
+
+void Validators::checkTopic(const std::string& topic)
+{
+    if (UtilAll::isBlank(topic))
+    {
+        THROW_MQEXCEPTION(MQClientException, "the specified topic is blank", -1);
+    }
+
+    if (topic.length() > CHARACTER_MAX_LENGTH)
+    {
+        THROW_MQEXCEPTION(MQClientException, "the specified topic is longer than topic max length 255.", -1);
+    }
+
+    // Topic�����Ƿ��뱣���ֶγ�ͻ
+    if (topic == MixAll::DEFAULT_TOPIC)
+    {
+        THROW_MQEXCEPTION(MQClientException, "the topic[" + topic + "] is conflict with default topic.", -1);
+    }
+
+    if (!regularExpressionMatcher(topic, validPatternStr))
+    {
+        std::string str;
+        str = "the specified topic[" + topic + "] contains illegal characters, allowing only" + validPatternStr;
+
+        THROW_MQEXCEPTION(MQClientException, str.c_str(), -1);
+    }
+}
+
+void Validators::checkGroup(const std::string& group)
+{
+    if (UtilAll::isBlank(group))
+    {
+        THROW_MQEXCEPTION(MQClientException, "the specified group is blank", -1);
+    }
+
+    if (!regularExpressionMatcher(group, validPatternStr))
+    {
+        std::string str;
+        str = "the specified group[" + group + "] contains illegal characters, allowing only" + validPatternStr;
+
+        THROW_MQEXCEPTION(MQClientException, str.c_str(), -1);
+    }
+    if (group.length() > CHARACTER_MAX_LENGTH)
+    {
+        THROW_MQEXCEPTION(MQClientException, "the specified group is longer than group max length 255.", -1);
+    }
+}
+
+void Validators::checkMessage(const Message& msg, DefaultMQProducer* pDefaultMQProducer)
+{
+    checkTopic(msg.getTopic());
+
+    //// body
+    if (msg.getBody() == NULL)
+    {
+        THROW_MQEXCEPTION(MQClientException, "the message body is null", MESSAGE_ILLEGAL_VALUE);
+    }
+
+    if (msg.getBodyLen() == 0)
+    {
+        THROW_MQEXCEPTION(MQClientException, "the message body length is zero", MESSAGE_ILLEGAL_VALUE);
+    }
+
+    if (msg.getBodyLen() > pDefaultMQProducer->getMaxMessageSize())
+    {
+        char info[256];
+        snprintf(info, sizeof(info), "the message body size over max value, MAX: %d", pDefaultMQProducer->getMaxMessageSize());
+        THROW_MQEXCEPTION(MQClientException, info, MESSAGE_ILLEGAL_VALUE);
+    }
+}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/common/Validators.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/common/Validators.h b/rocketmq-client4cpp/src/common/Validators.h
new file mode 100755
index 0000000..36ab299
--- /dev/null
+++ b/rocketmq-client4cpp/src/common/Validators.h
@@ -0,0 +1,49 @@
+/**
+* Copyright (C) 2013 kangliqiang, kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+#ifndef __VALIDATORST_H__
+#define  __VALIDATORST_H__
+
+#include <string>
+
+namespace rmq
+{
+    class MQClientException;
+    class DefaultMQProducer;
+    class Message;
+
+    /**
+    * Validator class
+    *
+    * @author manhong.yqd<jodie.yqd@gmail.com>
+    * @since 2013-8-28
+    */
+    class Validators
+    {
+    public:
+        static bool regularExpressionMatcher(const std::string& origin, const std::string& patternStr);
+        static std::string getGroupWithRegularExpression(const std::string& origin, const std::string& patternStr);
+
+        static void checkTopic(const std::string& topic);
+        static void checkGroup(const std::string& group);
+        static void checkMessage(const Message& msg, DefaultMQProducer* pDefaultMQProducer);
+
+    public:
+        static const std::string validPatternStr;
+        static const size_t CHARACTER_MAX_LENGTH;
+    };
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/common/VirtualEnvUtil.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/common/VirtualEnvUtil.cpp b/rocketmq-client4cpp/src/common/VirtualEnvUtil.cpp
new file mode 100755
index 0000000..c68bfc8
--- /dev/null
+++ b/rocketmq-client4cpp/src/common/VirtualEnvUtil.cpp
@@ -0,0 +1,66 @@
+/**
+* Copyright (C) 2013 kangliqiang, kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+#include "VirtualEnvUtil.h"
+
+#include <stdlib.h>
+#include <stdio.h>
+#include "UtilAll.h"
+
+namespace rmq
+{
+
+const char* VirtualEnvUtil::VIRTUAL_APPGROUP_PREFIX = "%%PROJECT_%s%%";
+
+std::string VirtualEnvUtil::buildWithProjectGroup(const std::string& origin, const std::string& projectGroup)
+{
+    if (!UtilAll::isBlank(projectGroup))
+    {
+        char prefix[1024];
+        snprintf(prefix, sizeof(prefix), VIRTUAL_APPGROUP_PREFIX, projectGroup.c_str());
+
+        if (origin.find_last_of(prefix) == std::string::npos)
+        {
+            return origin + prefix;
+        }
+        else
+        {
+            return origin;
+        }
+    }
+    else
+    {
+        return origin;
+    }
+}
+
+
+std::string VirtualEnvUtil::clearProjectGroup(const std::string& origin, const std::string& projectGroup)
+{
+    char prefix[1024];
+    snprintf(prefix, sizeof(prefix), VIRTUAL_APPGROUP_PREFIX, projectGroup.c_str());
+    std::string::size_type pos = origin.find_last_of(prefix);
+
+    if (!UtilAll::isBlank(prefix) && pos != std::string::npos)
+    {
+        return origin.substr(0, pos);
+    }
+    else
+    {
+        return origin;
+    }
+}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/common/VirtualEnvUtil.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/common/VirtualEnvUtil.h b/rocketmq-client4cpp/src/common/VirtualEnvUtil.h
new file mode 100755
index 0000000..10ca0cd
--- /dev/null
+++ b/rocketmq-client4cpp/src/common/VirtualEnvUtil.h
@@ -0,0 +1,41 @@
+/**
+* Copyright (C) 2013 kangliqiang, kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#ifndef __VIRTUALENVUTIL_H__
+#define __VIRTUALENVUTIL_H__
+
+#include <string>
+
+namespace rmq
+{
+    /**
+    * VirtualEnv API
+    *
+    * @author manhong.yqd<jodie.yqd@gmail.com>
+    * @since 2013-8-26
+    */
+    class VirtualEnvUtil
+    {
+    public:
+        static std::string buildWithProjectGroup(const std::string& origin, const std::string& projectGroup);
+        static std::string clearProjectGroup(const std::string& origin, const std::string& projectGroup);
+
+    public:
+        static const char* VIRTUAL_APPGROUP_PREFIX;
+    };
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/consumer/AllocateMessageQueueStrategyInner.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/consumer/AllocateMessageQueueStrategyInner.h b/rocketmq-client4cpp/src/consumer/AllocateMessageQueueStrategyInner.h
new file mode 100755
index 0000000..49e1e7c
--- /dev/null
+++ b/rocketmq-client4cpp/src/consumer/AllocateMessageQueueStrategyInner.h
@@ -0,0 +1,205 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+#ifndef __ALLOCATEMESSAGEQUEUESTRATEGYINNER_H__
+#define __ALLOCATEMESSAGEQUEUESTRATEGYINNER_H__
+
+#include <algorithm>
+
+#include "AllocateMessageQueueStrategy.h"
+#include "MQClientException.h"
+#include "UtilAll.h"
+
+
+namespace rmq
+{
+
+    class AllocateMessageQueueAveragely : public AllocateMessageQueueStrategy
+    {
+    public:
+        virtual ~AllocateMessageQueueAveragely() {}
+        virtual std::vector<MessageQueue>* allocate(
+				const std::string& consumerGroup,
+				const std::string& currentCID,
+                std::vector<MessageQueue>& mqAll,
+                std::list<std::string>& cidAll)
+        {
+            if (currentCID.empty())
+            {
+                THROW_MQEXCEPTION(MQClientException, "currentCID is empty", -1);
+            }
+
+            if (mqAll.empty())
+            {
+                THROW_MQEXCEPTION(MQClientException, "mqAll is empty", -1);
+            }
+
+            if (cidAll.empty())
+            {
+                THROW_MQEXCEPTION(MQClientException, "cidAll is empty", -1);
+            }
+
+            int index = -1;
+            int cidAllSize = cidAll.size();
+
+            std::list<std::string>::iterator it = cidAll.begin();
+            for (int i = 0; it != cidAll.end(); it++, i++)
+            {
+                if (*it == currentCID)
+                {
+                    index = i;
+                    break;
+                }
+            }
+
+            if (index == -1)
+            {
+				RMQ_ERROR("[BUG] ConsumerGroup: {%s} The consumerId: {%s} not in cidAll: {%s}", //
+                    consumerGroup.c_str(),
+                    currentCID.c_str(),
+                    UtilAll::toString(cidAll).c_str());
+                return NULL;
+            }
+
+            int mqAllSize = mqAll.size();
+            int mod = mqAllSize % cidAllSize;
+            int averageSize =
+                mqAllSize <= cidAllSize ? 1 : (mod > 0 && index < mod ? mqAllSize / cidAllSize
+                                               + 1 : mqAllSize / cidAllSize);
+            int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
+
+            std::vector<MessageQueue>* result = new std::vector<MessageQueue>();
+            int range = std::min<int>(averageSize, mqAllSize - startIndex);
+
+            for (int i = 0; i < range; i++)
+            {
+                result->push_back(mqAll.at((startIndex + i) % mqAllSize));
+            }
+
+            return result;
+        }
+
+        virtual std::string getName()
+        {
+            return "AVG";
+        }
+    };
+
+
+    class AllocateMessageQueueAveragelyByCircle : public AllocateMessageQueueStrategy
+    {
+    public:
+        virtual ~AllocateMessageQueueAveragelyByCircle() {}
+        virtual std::vector<MessageQueue>* allocate(
+				const std::string& consumerGroup,
+				const std::string& currentCID,
+                std::vector<MessageQueue>& mqAll,
+                std::list<std::string>& cidAll)
+        {
+            if (currentCID.empty())
+            {
+                THROW_MQEXCEPTION(MQClientException, "currentCID is empty", -1);
+            }
+
+            if (mqAll.empty())
+            {
+                THROW_MQEXCEPTION(MQClientException, "mqAll is empty", -1);
+            }
+
+            if (cidAll.empty())
+            {
+                THROW_MQEXCEPTION(MQClientException, "cidAll is empty", -1);
+            }
+
+            int index = -1;
+            std::list<std::string>::iterator it = cidAll.begin();
+            for (int i = 0; it != cidAll.end(); it++, i++)
+            {
+                if (*it == currentCID)
+                {
+                    index = i;
+                    break;
+                }
+            }
+
+            if (index == -1)
+            {
+				RMQ_ERROR("[BUG] ConsumerGroup: {%s} The consumerId: {%s} not in cidAll: {%s}", //
+                    consumerGroup.c_str(),
+                    currentCID.c_str(),
+                    UtilAll::toString(cidAll).c_str());
+                return NULL;
+            }
+
+			std::vector<MessageQueue>* result = new std::vector<MessageQueue>();
+	        for (int i = index; i < (int)mqAll.size(); i++)
+			{
+	            if (i % (int)cidAll.size() == index)
+				{
+					result->push_back(mqAll.at(i));
+	            }
+	        }
+
+	        return result;
+        }
+
+        virtual std::string getName()
+        {
+            return "AVG_BY_CIRCLE";
+        }
+    };
+
+
+    class AllocateMessageQueueByConfig : public AllocateMessageQueueStrategy
+    {
+    public:
+        virtual ~AllocateMessageQueueByConfig() {}
+        virtual std::vector<MessageQueue>* allocate(
+				const std::string& consumerGroup,
+				const std::string& currentCID,
+                std::vector<MessageQueue>& mqAll,
+                std::list<std::string>& cidAll)
+        {
+            return NULL;
+        }
+
+        virtual std::string getName()
+        {
+            return "CONFIG";
+        }
+    };
+
+
+    class AllocateMessageQueueByMachineRoom : public AllocateMessageQueueStrategy
+    {
+    public:
+        virtual ~AllocateMessageQueueByMachineRoom() {}
+        virtual std::vector<MessageQueue>* allocate(
+				const std::string& consumerGroup,
+				const std::string& currentCID,
+                std::vector<MessageQueue>& mqAll,
+                std::list<std::string>& cidAll)
+        {
+            return NULL;
+        }
+
+        virtual std::string getName()
+        {
+            return "MACHINE_ROOM";
+        }
+    };
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/consumer/ConsumeMessageConcurrentlyService.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/consumer/ConsumeMessageConcurrentlyService.cpp b/rocketmq-client4cpp/src/consumer/ConsumeMessageConcurrentlyService.cpp
new file mode 100755
index 0000000..7550acb
--- /dev/null
+++ b/rocketmq-client4cpp/src/consumer/ConsumeMessageConcurrentlyService.cpp
@@ -0,0 +1,476 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+#include "ConsumeMessageConcurrentlyService.h"
+
+#include "DefaultMQPushConsumerImpl.h"
+#include "MessageListener.h"
+#include "MessageQueue.h"
+#include "RebalanceImpl.h"
+#include "DefaultMQPushConsumer.h"
+#include "MixAll.h"
+#include "KPRUtil.h"
+#include "UtilAll.h"
+#include "OffsetStore.h"
+
+namespace rmq
+{
+
+
+class SubmitConsumeRequestLater : public kpr::TimerHandler
+{
+public:
+    SubmitConsumeRequestLater(std::list<MessageExt*>& msgs,
+                              ProcessQueue* pProcessQueue,
+                              MessageQueue messageQueue,
+                              ConsumeMessageConcurrentlyService* pService)
+        : m_msgs(msgs),
+          m_pProcessQueue(pProcessQueue),
+          m_messageQueue(messageQueue),
+          m_pService(pService)
+    {
+
+    }
+
+    void OnTimeOut(unsigned int timerID)
+    {
+    	try
+    	{
+        	m_pService->submitConsumeRequest(m_msgs, m_pProcessQueue, m_messageQueue, true);
+        }
+        catch(...)
+        {
+        	RMQ_ERROR("SubmitConsumeRequestLater OnTimeOut exception");
+        }
+
+        delete this;
+    }
+
+private:
+    std::list<MessageExt*> m_msgs;
+    ProcessQueue* m_pProcessQueue;
+    MessageQueue m_messageQueue;
+    ConsumeMessageConcurrentlyService* m_pService;
+};
+
+
+class CleanExpireMsgTask : public kpr::TimerHandler
+{
+public:
+    CleanExpireMsgTask(ConsumeMessageConcurrentlyService* pService)
+        : m_pService(pService)
+    {
+
+    }
+
+    void OnTimeOut(unsigned int timerID)
+    {
+    	try
+    	{
+        	m_pService->cleanExpireMsg();
+        }
+        catch(...)
+        {
+        	RMQ_ERROR("CleanExpireMsgTask OnTimeOut exception");
+        }
+    }
+
+private:
+    ConsumeMessageConcurrentlyService* m_pService;
+};
+
+
+
+ConsumeMessageConcurrentlyService::ConsumeMessageConcurrentlyService(
+    DefaultMQPushConsumerImpl* pDefaultMQPushConsumerImpl,
+    MessageListenerConcurrently* pMessageListener)
+{
+    m_pDefaultMQPushConsumerImpl = pDefaultMQPushConsumerImpl;
+    m_pMessageListener = pMessageListener;
+    m_pDefaultMQPushConsumer = m_pDefaultMQPushConsumerImpl->getDefaultMQPushConsumer();
+    m_consumerGroup = m_pDefaultMQPushConsumer->getConsumerGroup();
+    m_pConsumeExecutor = new kpr::ThreadPool("ConsumeMessageThreadPool", 5,
+    m_pDefaultMQPushConsumer->getConsumeThreadMin(), m_pDefaultMQPushConsumer->getConsumeThreadMax());
+    m_pScheduledExecutorService = new kpr::TimerThread("ConsumeMessageConcurrentlyService", 1000);
+    m_pCleanExpireMsgExecutors = new kpr::TimerThread("CleanExpireMsgService", 1000);
+	m_pCleanExpireMsgTask = new CleanExpireMsgTask(this);
+}
+
+ConsumeMessageConcurrentlyService::~ConsumeMessageConcurrentlyService()
+{
+	delete m_pCleanExpireMsgTask;
+}
+
+
+void ConsumeMessageConcurrentlyService::start()
+{
+	m_pCleanExpireMsgExecutors->RegisterTimer(60 * 1000, 60 * 1000, m_pCleanExpireMsgTask, true);
+    m_pScheduledExecutorService->Start();
+    m_pCleanExpireMsgExecutors->Start();
+}
+
+void ConsumeMessageConcurrentlyService::shutdown()
+{
+    m_pConsumeExecutor->Destroy();
+    m_pScheduledExecutorService->Stop();
+    m_pScheduledExecutorService->Join();
+
+    m_pCleanExpireMsgExecutors->Stop();
+    m_pCleanExpireMsgExecutors->Join();
+}
+
+
+void ConsumeMessageConcurrentlyService::cleanExpireMsg()
+{
+	kpr::ScopedRLock<kpr::RWMutex> lock(m_pDefaultMQPushConsumerImpl->getRebalanceImpl()->getProcessQueueTableLock());
+	std::map<MessageQueue, ProcessQueue*>& processQueueTable
+		= m_pDefaultMQPushConsumerImpl->getRebalanceImpl()->getProcessQueueTable();
+	RMQ_FOR_EACH(processQueueTable, it)
+	{
+		ProcessQueue* pq = it->second;
+		if (!pq->isDropped())
+		{
+        	pq->cleanExpiredMsg(m_pDefaultMQPushConsumer);
+        }
+	}
+}
+
+
+ConsumerStat& ConsumeMessageConcurrentlyService::getConsumerStat()
+{
+    return m_pDefaultMQPushConsumerImpl->getConsumerStatManager()->getConsumertat();
+}
+
+bool ConsumeMessageConcurrentlyService::sendMessageBack(MessageExt& msg,
+        ConsumeConcurrentlyContext& context)
+{
+    try
+    {
+        m_pDefaultMQPushConsumerImpl->sendMessageBack(msg,
+        	context.delayLevelWhenNextConsume, context.messageQueue.getBrokerName());
+        return true;
+    }
+    catch (...)
+    {
+		RMQ_ERROR("sendMessageBack exception, group: %s, msg: %s",
+			m_consumerGroup.c_str(), msg.toString().c_str());
+    }
+
+    return false;
+}
+
+void ConsumeMessageConcurrentlyService::submitConsumeRequestLater(std::list<MessageExt*>& msgs,
+        ProcessQueue* pProcessQueue,
+        const MessageQueue& messageQueue)
+{
+    SubmitConsumeRequestLater* sc = new SubmitConsumeRequestLater(msgs, pProcessQueue, messageQueue, this);
+    m_pScheduledExecutorService->RegisterTimer(0, 5000, sc, false);
+}
+
+void ConsumeMessageConcurrentlyService::submitConsumeRequest(std::list<MessageExt*>& msgs,
+        ProcessQueue* pProcessQueue,
+        const MessageQueue& messageQueue,
+        bool dispathToConsume)
+{
+	size_t consumeBatchSize = m_pDefaultMQPushConsumer->getConsumeMessageBatchMaxSize();
+
+	RMQ_DEBUG("submitConsumeRequest begin, msgs.size=%d, messageQueue=%s, consumeBatchSize=%d, dispathToConsume=%d",
+		(int)msgs.size(), messageQueue.toString().c_str(), (int)consumeBatchSize, dispathToConsume
+    );
+
+    if (msgs.size() <= consumeBatchSize)
+    {
+        kpr::ThreadPoolWorkPtr consumeRequest = new ConsumeConcurrentlyRequest(msgs, pProcessQueue, messageQueue, this);
+        m_pConsumeExecutor->AddWork(consumeRequest);
+    }
+    else
+    {
+        std::list<MessageExt*>::iterator it = msgs.begin();
+        for (; it != msgs.end();)
+        {
+            std::list<MessageExt*> msgThis;
+            for (size_t i = 0; i < consumeBatchSize; i++, it++)
+            {
+                if (it != msgs.end())
+                {
+                    msgThis.push_back(*it);
+                }
+                else
+                {
+                    break;
+                }
+            }
+
+            kpr::ThreadPoolWorkPtr consumeRequest = new ConsumeConcurrentlyRequest(msgThis, pProcessQueue, messageQueue, this);
+            m_pConsumeExecutor->AddWork(consumeRequest);
+        }
+    }
+
+    RMQ_DEBUG("submitConsumeRequest end");
+}
+
+void ConsumeMessageConcurrentlyService::updateCorePoolSize(int corePoolSize)
+{
+	//todo
+}
+
+void ConsumeMessageConcurrentlyService::processConsumeResult(ConsumeConcurrentlyStatus status,
+        ConsumeConcurrentlyContext& context,
+        ConsumeConcurrentlyRequest& consumeRequest)
+{
+    int ackIndex = context.ackIndex;
+
+    if (consumeRequest.getMsgs().empty())
+    {
+        return;
+    }
+
+    int msgsSize = consumeRequest.getMsgs().size();
+
+    switch (status)
+    {
+        case CONSUME_SUCCESS:
+        {
+            if (ackIndex >= msgsSize)
+            {
+                ackIndex = msgsSize - 1;
+            }
+
+            int ok = ackIndex + 1;
+            int failed = msgsSize - ok;
+            getConsumerStat().consumeMsgOKTotal.fetchAndAdd(ok);
+            getConsumerStat().consumeMsgFailedTotal.fetchAndAdd(failed);
+        }
+
+        break;
+        case RECONSUME_LATER:
+            ackIndex = -1;
+            getConsumerStat().consumeMsgFailedTotal.fetchAndAdd(msgsSize);
+            break;
+        default:
+            break;
+    }
+
+    std::list<MessageExt*>& msgs = consumeRequest.getMsgs();
+    std::list<MessageExt*>::iterator it = msgs.begin();
+
+    for (int i = 0; i < ackIndex + 1 && it != msgs.end(); i++)
+    {
+        it++;
+    }
+
+    switch (m_pDefaultMQPushConsumer->getMessageModel())
+    {
+        case BROADCASTING:
+            for (; it != msgs.end(); it++)
+            {
+                MessageExt* msg = *it;
+                RMQ_WARN("BROADCASTING, the message consume failed, drop it, %s", msg->toString().c_str());
+            }
+            break;
+        case CLUSTERING:
+        {
+            std::list<MessageExt*> msgBackFailed;
+            for (; it != msgs.end(); it++)
+            {
+                MessageExt* msg = *it;
+                bool result = sendMessageBack(*msg, context);
+                if (!result)
+                {
+                    msg->setReconsumeTimes(msg->getReconsumeTimes() + 1);
+                    msgBackFailed.push_back(msg);
+                }
+            }
+
+            if (!msgBackFailed.empty())
+            {
+                it = msgs.begin();
+
+                for (; it != msgs.end();)
+                {
+                    bool find = false;
+                    std::list<MessageExt*>::iterator itFailed = msgBackFailed.begin();
+                    for (; itFailed != msgBackFailed.end(); itFailed++)
+                    {
+                        if (*it == *itFailed)
+                        {
+                            it = msgs.erase(it);
+                            find = true;
+                            break;
+                        }
+                    }
+
+                    if (!find)
+                    {
+                        it++;
+                    }
+                }
+
+                submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(),
+                                          consumeRequest.getMessageQueue());
+            }
+        }
+        break;
+        default:
+            break;
+    }
+
+    long long offset = consumeRequest.getProcessQueue()->removeMessage(consumeRequest.getMsgs());
+    if (offset >= 0 && !(consumeRequest.getProcessQueue()->isDropped()))
+    {
+        m_pDefaultMQPushConsumerImpl->getOffsetStore()->updateOffset(consumeRequest.getMessageQueue(),
+                offset, true);
+    }
+}
+
+std::string& ConsumeMessageConcurrentlyService::getConsumerGroup()
+{
+    return m_consumerGroup;
+}
+
+MessageListenerConcurrently* ConsumeMessageConcurrentlyService::getMessageListener()
+{
+    return m_pMessageListener;
+}
+
+DefaultMQPushConsumerImpl* ConsumeMessageConcurrentlyService::getDefaultMQPushConsumerImpl()
+{
+    return m_pDefaultMQPushConsumerImpl;
+}
+
+ConsumeConcurrentlyRequest::ConsumeConcurrentlyRequest(std::list<MessageExt*>& msgs,
+        ProcessQueue* pProcessQueue,
+        const MessageQueue& messageQueue,
+        ConsumeMessageConcurrentlyService* pService)
+{
+	m_msgs = msgs;
+	m_pProcessQueue = pProcessQueue;
+	m_pService = pService;
+    m_messageQueue = messageQueue;
+}
+
+ConsumeConcurrentlyRequest::~ConsumeConcurrentlyRequest()
+{
+	m_msgs.clear();
+}
+
+void ConsumeConcurrentlyRequest::Do()
+{
+	RMQ_DEBUG("consumeMessage begin, m_msgs.size=%d", (int)m_msgs.size());
+
+    if (m_pProcessQueue->isDropped())
+    {
+        RMQ_WARN("the message queue not be able to consume, because it's droped, {%s}",
+        	m_messageQueue.toString().c_str());
+        return;
+    }
+
+	try
+	{
+	    MessageListenerConcurrently* listener = m_pService->getMessageListener();
+	    ConsumeConcurrentlyContext context(m_messageQueue);
+	    ConsumeConcurrentlyStatus status = RECONSUME_LATER;
+
+	    ConsumeMessageContext consumeMessageContext;
+	    if (m_pService->getDefaultMQPushConsumerImpl()->hasHook())
+	    {
+	        consumeMessageContext.consumerGroup = m_pService->getConsumerGroup();
+	        consumeMessageContext.mq = m_messageQueue;
+	        consumeMessageContext.msgList = m_msgs;
+	        consumeMessageContext.success = false;
+	        m_pService->getDefaultMQPushConsumerImpl()->executeHookBefore(consumeMessageContext);
+	    }
+
+	    long long beginTimestamp = KPRUtil::GetCurrentTimeMillis();
+	    try
+	    {
+	        resetRetryTopic(m_msgs);
+	        if (!m_msgs.empty())
+	        {
+	        	std::list<MessageExt*>::iterator it = m_msgs.begin();
+			    for (; it != m_msgs.end(); it++)
+			    {
+			        MessageExt* msg = (*it);
+			        msg->putProperty(Message::PROPERTY_CONSUME_START_TIMESTAMP,
+			        	UtilAll::toString(KPRUtil::GetCurrentTimeMillis()));
+			    }
+            }
+	        status = listener->consumeMessage(m_msgs, context);
+	    }
+	    catch (...)
+	    {
+	        RMQ_WARN("consumeMessage exception, Group: {%s} Msgs: {%d} MQ: {%s}",
+	        	m_pService->getConsumerGroup().c_str(),
+	        	(int)m_msgs.size(),
+	        	m_messageQueue.toString().c_str()
+	        );
+	    }
+
+	    long long consumeRT = KPRUtil::GetCurrentTimeMillis() - beginTimestamp;
+
+	    if (m_pService->getDefaultMQPushConsumerImpl()->hasHook())
+	    {
+	        consumeMessageContext.success = (status == CONSUME_SUCCESS);
+	        m_pService->getDefaultMQPushConsumerImpl()->executeHookAfter(consumeMessageContext);
+	    }
+
+	    m_pService->getConsumerStat().consumeMsgRTTotal.fetchAndAdd(consumeRT);
+	    bool updated = MixAll::compareAndIncreaseOnly(m_pService->getConsumerStat().consumeMsgRTMax, consumeRT);
+	    if (updated)
+	    {
+	        RMQ_WARN("consumeMessage RT new max: %lld, Group: %s, Msgs: %d, MQ: %s",
+	        	consumeRT,
+	        	m_pService->getConsumerGroup().c_str(),
+	        	(int)m_msgs.size(),
+	        	m_messageQueue.toString().c_str()
+	        );
+	    }
+
+		if (!m_pProcessQueue->isDropped())
+		{
+	    	m_pService->processConsumeResult(status, context, *this);
+	    }
+	    else
+	    {
+	    	RMQ_WARN("processQueue is dropped without process consume result, messageQueue={%s}, msgs.size={%d}",
+	        	m_messageQueue.toString().c_str(), (int)m_msgs.size());
+	    }
+	}
+	catch(...)
+	{
+		RMQ_WARN("ConsumeConcurrentlyRequest exception");
+	}
+	RMQ_DEBUG("consumeMessage end, m_msgs.size=%d", (int)m_msgs.size());
+
+    return;
+}
+
+void ConsumeConcurrentlyRequest::resetRetryTopic(std::list<MessageExt*>& msgs)
+{
+    std::string groupTopic = MixAll::getRetryTopic(m_pService->getConsumerGroup());
+    std::list<MessageExt*>::iterator it = msgs.begin();
+
+    for (; it != msgs.end(); it++)
+    {
+        MessageExt* msg = (*it);
+        std::string retryTopic = msg->getProperty(Message::PROPERTY_RETRY_TOPIC);
+        if (!retryTopic.empty() && groupTopic == msg->getTopic())
+        {
+            msg->setTopic(retryTopic);
+        }
+    }
+}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/consumer/ConsumeMessageConcurrentlyService.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/consumer/ConsumeMessageConcurrentlyService.h b/rocketmq-client4cpp/src/consumer/ConsumeMessageConcurrentlyService.h
new file mode 100755
index 0000000..acb7538
--- /dev/null
+++ b/rocketmq-client4cpp/src/consumer/ConsumeMessageConcurrentlyService.h
@@ -0,0 +1,120 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#ifndef __CONSUMEMESSAGECONCURRENTLYSERVICE_H__
+#define __CONSUMEMESSAGECONCURRENTLYSERVICE_H__
+
+#include "ConsumeMessageService.h"
+
+#include <list>
+#include <string>
+#include "MessageQueueLock.h"
+#include "ConsumerStatManage.h"
+#include "MessageExt.h"
+#include "MessageListener.h"
+#include "ProcessQueue.h"
+#include "ThreadPool.h"
+#include "TimerThread.h"
+
+namespace rmq
+{
+  class DefaultMQPushConsumerImpl;
+  class DefaultMQPushConsumer;
+  class MessageListenerConcurrently;
+  class ConsumeMessageConcurrentlyService;
+
+  class ConsumeConcurrentlyRequest: public kpr::ThreadPoolWork
+  {
+  public:
+      ConsumeConcurrentlyRequest(std::list<MessageExt*>& msgs,
+                                 ProcessQueue* pProcessQueue,
+                                 const MessageQueue& messageQueue,
+                                 ConsumeMessageConcurrentlyService* pService);
+      ~ConsumeConcurrentlyRequest();
+      virtual void Do();
+
+      std::list<MessageExt*>& getMsgs()
+      {
+          return m_msgs;
+      }
+
+      ProcessQueue* getProcessQueue()
+      {
+          return m_pProcessQueue;
+      }
+
+      MessageQueue getMessageQueue()
+      {
+          return m_messageQueue;
+      }
+
+  private:
+      void resetRetryTopic(std::list<MessageExt*>& msgs);
+
+  private:
+      std::list<MessageExt*> m_msgs;
+      ProcessQueue* m_pProcessQueue;
+      MessageQueue m_messageQueue;
+      ConsumeMessageConcurrentlyService* m_pService;
+  };
+
+
+  class ConsumeMessageConcurrentlyService : public ConsumeMessageService
+  {
+  public:
+      ConsumeMessageConcurrentlyService(DefaultMQPushConsumerImpl* pDefaultMQPushConsumerImpl,
+                                        MessageListenerConcurrently* pMessageListener);
+	  ~ConsumeMessageConcurrentlyService();
+
+      void start();
+      void shutdown();
+
+	  void cleanExpireMsg();
+      ConsumerStat& getConsumerStat();
+
+      bool sendMessageBack(MessageExt& msg, ConsumeConcurrentlyContext& context);
+      void processConsumeResult(ConsumeConcurrentlyStatus status,
+                                ConsumeConcurrentlyContext& context,
+                                ConsumeConcurrentlyRequest& consumeRequest);
+
+      void submitConsumeRequestLater(std::list<MessageExt*>& pMsgs,
+                                     ProcessQueue* pProcessQueue,
+                                     const MessageQueue& messageQueue);
+
+      void submitConsumeRequest(std::list<MessageExt*>& pMsgs,
+                                ProcessQueue* pProcessQueue,
+                                const MessageQueue& messageQueue,
+                                bool dispathToConsume);
+
+      void updateCorePoolSize(int corePoolSize);
+
+      std::string& getConsumerGroup();
+      MessageListenerConcurrently* getMessageListener();
+      DefaultMQPushConsumerImpl* getDefaultMQPushConsumerImpl();
+
+  private:
+      DefaultMQPushConsumerImpl* m_pDefaultMQPushConsumerImpl;
+      DefaultMQPushConsumer* m_pDefaultMQPushConsumer;
+      MessageListenerConcurrently* m_pMessageListener;
+      std::string m_consumerGroup;
+      kpr::ThreadPoolPtr m_pConsumeExecutor;
+      kpr::TimerThreadPtr m_pScheduledExecutorService;
+      kpr::TimerThreadPtr m_pCleanExpireMsgExecutors;
+      kpr::TimerHandler* m_pCleanExpireMsgTask;
+  };
+}
+
+#endif


Mime
View raw message