rocketmq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dongefore...@apache.org
Subject [16/17] incubator-rocketmq-externals git commit: [ROCKETMQ-129] Initialized the rocketmq c++ client closes apache/incubator-rocketmq-externals#11
Date Fri, 21 Apr 2017 10:09:56 GMT
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/include/MQClientException.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/include/MQClientException.h b/rocketmq-client4cpp/include/MQClientException.h
new file mode 100755
index 0000000..f1d1d04
--- /dev/null
+++ b/rocketmq-client4cpp/include/MQClientException.h
@@ -0,0 +1,105 @@
+/**
+ * Copyright (C) 2013 kangliqiang ,kangliq@163.com
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __RMQ_MQCLIENTEXCEPTION_H__
+#define __RMQ_MQCLIENTEXCEPTION_H__
+
+#include <string>
+#include <ostream>
+#include <sstream>
+#include <exception>
+
+#include "RocketMQClient.h"
+
+namespace rmq
+{
+	class MQException : public std::exception
+	{
+	public:
+		MQException(const std::string& msg, int error,const char* file,int line)throw()
+			: m_error(error),m_line(line),m_file(file)
+		{
+			try
+			{
+				std::stringstream ss;
+				ss << "[" << file << ":" << line <<"]|error: " << error << "|msg:" << msg;
+				m_msg = ss.str();
+			}
+			catch (...)
+			{
+			}
+		}
+
+		virtual ~MQException()throw()
+		{
+		}
+
+		const char* what() const throw()
+		{
+			return m_msg.c_str();
+		}
+
+		int GetError() const throw()
+		{
+			return m_error;
+		}
+
+		virtual const char* GetType() const throw()
+		{
+			return "MQException";
+		}
+
+	protected:
+		int m_error;
+		int m_line;
+		std::string m_msg;
+		std::string m_file;
+	};
+
+	inline std::ostream& operator<<(std::ostream& os, const MQException& e)
+	{
+		os <<"Type:"<<e.GetType() <<  e.what();
+		return os;
+	}
+
+	#define DEFINE_MQCLIENTEXCEPTION(name, parent) \
+	class name : public parent \
+	{\
+	public:\
+		name(const std::string& msg, int error,const char* file,int line) throw ()\
+		: parent(msg, error, file, line) {}\
+		virtual const char* GetType() const throw()\
+	{\
+		return #name;\
+	}\
+	};
+
+	DEFINE_MQCLIENTEXCEPTION(MQClientException, MQException)
+	DEFINE_MQCLIENTEXCEPTION(MQBrokerException, MQException)
+	DEFINE_MQCLIENTEXCEPTION(InterruptedException, MQException)
+	DEFINE_MQCLIENTEXCEPTION(UnknownHostException, MQException)
+
+	DEFINE_MQCLIENTEXCEPTION(RemotingException, MQException)
+	DEFINE_MQCLIENTEXCEPTION(RemotingCommandException, RemotingException)
+	DEFINE_MQCLIENTEXCEPTION(RemotingConnectException, RemotingException)
+	DEFINE_MQCLIENTEXCEPTION(RemotingSendRequestException, RemotingException)
+	DEFINE_MQCLIENTEXCEPTION(RemotingTimeoutException, RemotingException)
+	DEFINE_MQCLIENTEXCEPTION(RemotingTooMuchRequestException, RemotingException)
+
+	#define THROW_MQEXCEPTION(e,msg,err) throw e(msg,err,__FILE__,__LINE__)
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/include/MQConsumer.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/include/MQConsumer.h b/rocketmq-client4cpp/include/MQConsumer.h
new file mode 100755
index 0000000..87efe97
--- /dev/null
+++ b/rocketmq-client4cpp/include/MQConsumer.h
@@ -0,0 +1,48 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#ifndef __RMQ_MQCONSUMER_H__
+#define __RMQ_MQCONSUMER_H__
+
+#include <set>
+#include <string>
+
+#include "RocketMQClient.h"
+#include "MQAdmin.h"
+#include "ConsumeType.h"
+
+
+namespace rmq
+{
+	class MessageExt;
+
+	/**
+	* Consumer interface
+	*
+	*/
+	class MQConsumer : public MQAdmin
+	{
+	public:
+		virtual ~MQConsumer(){}
+
+		virtual void  start()=0;
+		virtual void shutdown()=0;
+
+		virtual void sendMessageBack(MessageExt& msg, int delayLevel)=0;
+		virtual std::set<MessageQueue>* fetchSubscribeMessageQueues(const std::string& topic)=0;
+	};
+}
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/include/MQProducer.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/include/MQProducer.h b/rocketmq-client4cpp/include/MQProducer.h
new file mode 100755
index 0000000..b353aba
--- /dev/null
+++ b/rocketmq-client4cpp/include/MQProducer.h
@@ -0,0 +1,71 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License")=0;
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#ifndef __RMQ_MQPRODUCER_H__
+#define __RMQ_MQPRODUCER_H__
+
+#include <vector>
+#include <string>
+
+#include "RocketMQClient.h"
+#include "MQAdmin.h"
+#include "SendResult.h"
+
+namespace rmq
+{
+	class MessageQueue;
+	class SendCallback;
+	class LocalTransactionExecuter;
+	class MessageQueueSelector;
+
+	/**
+	* Producer interface
+	*
+	*/
+	class MQProducer : public MQAdmin
+	{
+	public:
+		MQProducer()
+		{
+		}
+
+		virtual ~MQProducer()
+		{
+		}
+
+		virtual void start()=0;
+		virtual void shutdown()=0;
+
+		virtual std::vector<MessageQueue>* fetchPublishMessageQueues(const std::string& topic)=0;
+
+		virtual SendResult send(Message& msg)=0;
+		virtual void send(Message& msg, SendCallback* sendCallback)=0;
+		virtual void sendOneway(Message& msg)=0;
+
+		virtual SendResult send(Message& msg, MessageQueue& mq)=0;
+		virtual void send(Message& msg, MessageQueue& mq, SendCallback* sendCallback)=0;
+		virtual void sendOneway(Message& msg, MessageQueue& mq)=0;
+
+		virtual SendResult send(Message& msg, MessageQueueSelector* selector, void* arg)=0;
+		virtual void send(Message& msg, MessageQueueSelector* selector, void* arg, SendCallback* sendCallback)=0;
+		virtual void sendOneway(Message& msg, MessageQueueSelector* selector, void* arg)=0;
+
+		virtual TransactionSendResult sendMessageInTransaction(Message& msg,
+																LocalTransactionExecuter* tranExecuter,
+																void* arg)=0;
+	};
+}
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/include/MQPullConsumer.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/include/MQPullConsumer.h b/rocketmq-client4cpp/include/MQPullConsumer.h
new file mode 100755
index 0000000..ffb2ac5
--- /dev/null
+++ b/rocketmq-client4cpp/include/MQPullConsumer.h
@@ -0,0 +1,54 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+#ifndef __RMQ_MQPULLCONSUMER_H__
+#define __RMQ_MQPULLCONSUMER_H__
+
+#include <set>
+#include <string>
+
+#include "RocketMQClient.h"
+#include "MQConsumer.h"
+#include "PullResult.h"
+
+namespace rmq
+{
+	class MessageQueueListener;
+	class MessageQueue;
+	class PullCallback;
+
+	/**
+	* Pull Consumer
+	*
+	*/
+	class MQPullConsumer : public MQConsumer
+	{
+	public:
+		virtual ~MQPullConsumer(){}
+		virtual void registerMessageQueueListener(const std::string& topic, MessageQueueListener* pListener)=0;
+
+		virtual PullResult* pull(MessageQueue& mq, const std::string& subExpression, long long offset,int maxNums)=0;
+		virtual void pull(MessageQueue& mq, const std::string& subExpression, long long offset, int maxNums, PullCallback* pPullCallback)=0;
+
+		virtual PullResult* pullBlockIfNotFound(MessageQueue& mq, const std::string& subExpression, long long offset, int maxNums)=0;
+		virtual void pullBlockIfNotFound(MessageQueue& mq, const std::string& subExpression, long long offset, int maxNums, PullCallback* pPullCallback)=0;
+
+		virtual void updateConsumeOffset(MessageQueue& mq, long long offset)=0;
+		virtual long long fetchConsumeOffset(MessageQueue& mq, bool fromStore)=0;
+
+		virtual std::set<MessageQueue>* fetchMessageQueuesInBalance(const std::string& topic)=0;
+	};
+}
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/include/MQPushConsumer.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/include/MQPushConsumer.h b/rocketmq-client4cpp/include/MQPushConsumer.h
new file mode 100755
index 0000000..fe6d4a0
--- /dev/null
+++ b/rocketmq-client4cpp/include/MQPushConsumer.h
@@ -0,0 +1,49 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+#ifndef __RMQ_MQPUSHCONSUMER_H__
+#define __RMQ_MQPUSHCONSUMER_H__
+
+#include <set>
+#include <string>
+
+#include "RocketMQClient.h"
+#include "MQConsumer.h"
+#include "PullResult.h"
+
+namespace rmq
+{
+	class MessageListener;
+
+	/**
+	* Push Consumer
+	*
+	*/
+	class MQPushConsumer : public MQConsumer
+	{
+	public:
+		virtual void registerMessageListener(MessageListener* pMessageListener)=0;
+
+
+		virtual  void subscribe(const std::string& topic, const std::string& subExpression)=0;
+		virtual void unsubscribe(const std::string& topic)=0;
+
+
+		virtual void updateCorePoolSize(int corePoolSize)=0;
+		virtual void suspend()=0;
+		virtual void resume()=0;
+	};
+}
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/include/Message.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/include/Message.h b/rocketmq-client4cpp/include/Message.h
new file mode 100755
index 0000000..441b4e5
--- /dev/null
+++ b/rocketmq-client4cpp/include/Message.h
@@ -0,0 +1,136 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#ifndef __RMQ_MESSAGE_H__
+#define __RMQ_MESSAGE_H__
+
+#include <map>
+#include <string>
+#include <list>
+#include "RocketMQClient.h"
+
+namespace rmq
+{
+	/**
+	* Message
+	*
+	*/
+	class Message
+	{
+	public:
+		Message();
+		Message(const std::string& topic, const char* body,int len);
+		Message(const std::string& topic, const std::string& tags, const char* body,int len);
+		Message(const std::string& topic, const std::string& tags,const std::string& keys, const char* body,int len);
+		Message(const std::string& topic,
+				const std::string& tags,
+				const std::string& keys,
+				const int	flag,
+				const char* body,
+				int len,
+				bool waitStoreMsgOK);
+
+		virtual ~Message();
+		Message(const Message& other);
+		Message& operator=(const Message& other);
+
+		void clearProperty(const std::string& name);
+		void putProperty(const std::string& name, const std::string& value);
+		std::string getProperty(const std::string& name);
+
+		std::string getTopic()const;
+		void setTopic(const std::string& topic);
+
+		std::string getTags();
+		void setTags(const std::string& tags);
+
+		std::string getKeys();
+		void setKeys(const std::string& keys);
+		void setKeys(const std::list<std::string> keys);
+
+		int getDelayTimeLevel();
+		void setDelayTimeLevel(int level);
+
+		bool isWaitStoreMsgOK();
+		void setWaitStoreMsgOK(bool waitStoreMsgOK);
+
+		int getFlag();
+		void setFlag(int flag);
+
+		const char* getBody() const;
+		int getBodyLen() const;
+		void setBody(const char* body, int len);
+
+		bool tryToCompress(int compressLevel);
+		const char* getCompressBody() const;
+		int getCompressBodyLen() const;
+
+		std::map<std::string, std::string>& getProperties();
+		void setProperties(const std::map<std::string, std::string>& properties);
+
+		std::string toString() const;
+
+	protected:
+		void Init(const std::string& topic,
+				  const std::string& tags,
+				  const std::string& keys,
+				  const int	flag,
+				  const char* body,
+				  int len,
+				  bool waitStoreMsgOK);
+
+	public:
+		static const std::string PROPERTY_KEYS;
+		static const std::string PROPERTY_TAGS;
+		static const std::string PROPERTY_WAIT_STORE_MSG_OK;
+		static const std::string PROPERTY_DELAY_TIME_LEVEL;
+
+		/**
+		* for inner use
+		*/
+		static const std::string PROPERTY_RETRY_TOPIC;
+		static const std::string PROPERTY_REAL_TOPIC;
+		static const std::string PROPERTY_REAL_QUEUE_ID;
+		static const std::string PROPERTY_TRANSACTION_PREPARED;
+		static const std::string PROPERTY_PRODUCER_GROUP;
+		static const std::string PROPERTY_MIN_OFFSET;
+		static const std::string PROPERTY_MAX_OFFSET;
+		static const std::string PROPERTY_BUYER_ID;
+		static const std::string PROPERTY_ORIGIN_MESSAGE_ID;
+		static const std::string PROPERTY_TRANSFER_FLAG;
+		static const std::string PROPERTY_CORRECTION_FLAG;
+		static const std::string PROPERTY_MQ2_FLAG;
+		static const std::string PROPERTY_RECONSUME_TIME;
+		static const std::string PROPERTY_MSG_REGION;
+		static const std::string PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX;
+		static const std::string PROPERTY_MAX_RECONSUME_TIMES;
+		static const std::string PROPERTY_CONSUME_START_TIMESTAMP;
+
+		static const std::string KEY_SEPARATOR;
+	private:
+		std::string m_topic;
+		int m_flag;
+		std::map<std::string, std::string> m_properties;
+
+		char* m_body;
+		int   m_bodyLen;
+
+		char* m_compressBody;
+		int   m_compressBodyLen;
+	};
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/include/MessageExt.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/include/MessageExt.h b/rocketmq-client4cpp/include/MessageExt.h
new file mode 100755
index 0000000..f70041c
--- /dev/null
+++ b/rocketmq-client4cpp/include/MessageExt.h
@@ -0,0 +1,108 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+#ifndef __RMQ_MESSAGEEXT_H__
+#define __RMQ_MESSAGEEXT_H__
+
+#include <sys/socket.h>
+#include <string>
+#include "Message.h"
+#include "TopicFilterType.h"
+#include "RocketMQClient.h"
+
+namespace rmq
+	{
+	/**
+	* Message extend
+	*
+	*/
+	class MessageExt : public Message
+	{
+	public:
+		MessageExt();
+
+		MessageExt(int queueId,
+				   long long bornTimestamp,
+				   sockaddr bornHost,
+				   long long storeTimestamp,
+				   sockaddr storeHost,
+				   std::string msgId);
+
+		~MessageExt();
+
+		static TopicFilterType parseTopicFilterType(int sysFlag);
+
+		int getQueueId();
+		void setQueueId(int queueId);
+
+		long long getBornTimestamp();
+		void setBornTimestamp(long long bornTimestamp);
+
+		sockaddr getBornHost();
+		std::string getBornHostString();
+		std::string getBornHostNameString();
+		void setBornHost(const sockaddr& bornHost);
+
+		long long getStoreTimestamp();
+		void setStoreTimestamp(long long storeTimestamp);
+
+		sockaddr getStoreHost();
+		std::string getStoreHostString();
+		void setStoreHost(const sockaddr& storeHost);
+
+		std::string getMsgId();
+		void setMsgId(const std::string& msgId);
+
+		int getSysFlag();
+		void setSysFlag(int sysFlag);
+
+		int getBodyCRC();
+		void setBodyCRC(int bodyCRC);
+
+		long long getQueueOffset();
+		void setQueueOffset(long long queueOffset);
+
+		long long getCommitLogOffset();
+		void setCommitLogOffset(long long physicOffset);
+
+		int getStoreSize();
+		void setStoreSize(int storeSize);
+
+		int getReconsumeTimes();
+		void setReconsumeTimes(int reconsumeTimes);
+
+		long long getPreparedTransactionOffset();
+		void setPreparedTransactionOffset(long long preparedTransactionOffset);
+
+		std::string toString() const;
+
+	private:
+		long long m_queueOffset;
+		long long m_commitLogOffset;
+		long long m_bornTimestamp;
+		long long m_storeTimestamp;
+		long long m_preparedTransactionOffset;
+		int m_queueId;
+		int m_storeSize;
+		int m_sysFlag;
+		int m_bodyCRC;
+		int m_reconsumeTimes;
+		sockaddr m_bornHost;
+		sockaddr m_storeHost;
+		std::string m_msgId;
+	};
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/include/MessageListener.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/include/MessageListener.h b/rocketmq-client4cpp/include/MessageListener.h
new file mode 100755
index 0000000..130a219
--- /dev/null
+++ b/rocketmq-client4cpp/include/MessageListener.h
@@ -0,0 +1,94 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#ifndef __RMQ_MESSAGELISTENER_H__
+#define __RMQ_MESSAGELISTENER_H__
+
+#include <limits.h>
+#include <list>
+
+#include "MessageExt.h"
+#include "MessageQueue.h"
+
+namespace rmq
+{
+	/**
+	* Message Listener
+	*
+	*/
+	class MessageListener
+	{
+	public:
+		virtual ~MessageListener(){}
+	};
+
+	enum ConsumeOrderlyStatus
+	{
+		SUCCESS,
+		ROLLBACK,
+		COMMIT,
+		SUSPEND_CURRENT_QUEUE_A_MOMENT,
+	};
+
+	typedef struct tagConsumeOrderlyContext
+	{
+		tagConsumeOrderlyContext(MessageQueue& mq)
+			:messageQueue(mq),
+			autoCommit(true),
+			suspendCurrentQueueTimeMillis(1000)
+		{
+		}
+
+		MessageQueue messageQueue;///< Ҫ���ѵ���Ϣ�����ĸ�����
+		bool autoCommit;///< ��ϢOffset�Ƿ��Զ��ύ
+		long suspendCurrentQueueTimeMillis;
+	}ConsumeOrderlyContext;
+
+	class MessageListenerOrderly : public MessageListener
+	{
+	public:
+		virtual ConsumeOrderlyStatus consumeMessage(std::list<MessageExt*>& msgs,
+													ConsumeOrderlyContext& context)=0;
+	};
+
+	enum ConsumeConcurrentlyStatus
+	{
+		CONSUME_SUCCESS,
+		RECONSUME_LATER,
+	};
+
+	struct ConsumeConcurrentlyContext
+	{
+		ConsumeConcurrentlyContext(MessageQueue& mq)
+			:messageQueue(mq),
+			delayLevelWhenNextConsume(0),
+			ackIndex(INT_MAX)
+		{
+		}
+		MessageQueue messageQueue;
+		int delayLevelWhenNextConsume;
+		int ackIndex;
+	};
+
+	class MessageListenerConcurrently : public MessageListener
+	{
+	public:
+		virtual ConsumeConcurrentlyStatus consumeMessage(std::list<MessageExt*>& msgs,
+														ConsumeConcurrentlyContext& context)=0;
+	};
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/include/MessageQueue.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/include/MessageQueue.h b/rocketmq-client4cpp/include/MessageQueue.h
new file mode 100755
index 0000000..89ddf58
--- /dev/null
+++ b/rocketmq-client4cpp/include/MessageQueue.h
@@ -0,0 +1,70 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#ifndef __RMQ_MESSAGEQUEUE_H__
+#define __RMQ_MESSAGEQUEUE_H__
+
+#include <iostream>
+#include <string>
+#include <sstream>
+
+#include "RocketMQClient.h"
+
+namespace rmq
+{
+	/**
+	* Message Queue
+	*
+	*/
+	class MessageQueue
+	{
+	public:
+		MessageQueue();
+		~MessageQueue(){};
+
+		MessageQueue(const std::string& topic, const std::string& brokerName, int queueId);
+
+		std::string getTopic()const;
+		void setTopic(const std::string& topic);
+
+		std::string getBrokerName()const;
+		void setBrokerName(const std::string& brokerName);
+
+		int getQueueId()const;
+		void setQueueId(int queueId);
+
+		int hashCode();
+		std::string toString() const;
+		std::string toJsonString() const;
+
+		bool operator==(const MessageQueue& mq) const;
+		bool operator<(const MessageQueue& mq) const;
+		int compareTo(const MessageQueue& mq) const;
+
+	private:
+		std::string m_topic;
+		std::string m_brokerName;
+		int m_queueId;
+	};
+
+	inline std::ostream& operator<<(std::ostream& os, const MessageQueue& obj)
+	{
+	    os << obj.toString();
+	    return os;
+	}
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/include/MessageQueueListener.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/include/MessageQueueListener.h b/rocketmq-client4cpp/include/MessageQueueListener.h
new file mode 100755
index 0000000..9f04c3e
--- /dev/null
+++ b/rocketmq-client4cpp/include/MessageQueueListener.h
@@ -0,0 +1,38 @@
+/**
+ * Copyright (C) 2013 kangliqiang ,kangliq@163.com
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __RMQ_MESSAGEQUEUELISTENER_H__
+#define __RMQ_MESSAGEQUEUELISTENER_H__
+
+#include <set>
+#include "RocketMQClient.h"
+
+namespace rmq
+{
+	/**
+	 * Message Queue Listener
+	 *
+	 */
+	class MessageQueueListener
+	{
+	public:
+		virtual ~MessageQueueListener() {}
+		virtual void messageQueueChanged(const std::string& topic,
+			std::set<MessageQueue>& mqAll,
+			std::set<MessageQueue>& mqDivided)=0;
+	};
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/include/OffsetStore.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/include/OffsetStore.h b/rocketmq-client4cpp/include/OffsetStore.h
new file mode 100755
index 0000000..a533750
--- /dev/null
+++ b/rocketmq-client4cpp/include/OffsetStore.h
@@ -0,0 +1,58 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+#ifndef __RMQ_OFFSETSTORE_H__
+#define __RMQ_OFFSETSTORE_H__
+
+#include <set>
+#include <map>
+
+#include "RocketMQClient.h"
+
+namespace rmq
+{
+	class MessageQueue;
+
+	enum ReadOffsetType
+	{
+		READ_FROM_MEMORY,
+		READ_FROM_STORE,
+		MEMORY_FIRST_THEN_STORE,
+	};
+
+	/**
+	* Consumer Offset Store
+	*
+	*/
+	class OffsetStore
+	{
+	public:
+		virtual ~OffsetStore() {}
+
+		virtual void load()=0;
+
+		virtual void updateOffset(const MessageQueue& mq, long long offset, bool increaseOnly)=0;
+		virtual long long readOffset(const MessageQueue& mq, ReadOffsetType type)=0;
+
+		virtual void persistAll(std::set<MessageQueue>& mqs)=0;
+		virtual void persist(const MessageQueue& mq)=0;
+
+		virtual void removeOffset(const MessageQueue& mq)=0;
+
+		virtual std::map<MessageQueue, long long> cloneOffsetTable(const std::string& topic) = 0;
+	};
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/include/PullCallback.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/include/PullCallback.h b/rocketmq-client4cpp/include/PullCallback.h
new file mode 100755
index 0000000..47ade68
--- /dev/null
+++ b/rocketmq-client4cpp/include/PullCallback.h
@@ -0,0 +1,39 @@
+/**
+ * Copyright (C) 2013 kangliqiang ,kangliq@163.com
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __RMQ_PULLCALLBACK_H__
+#define __RMQ_PULLCALLBACK_H__
+
+#include "RocketMQClient.h"
+#include "PullResult.h"
+
+namespace rmq
+{
+	class MQException;
+
+	/**
+	 * PullCallback
+	 *
+	 */
+	class PullCallback
+	{
+	public:
+		virtual ~PullCallback() {}
+		virtual void onSuccess(PullResult& pullResult)=0;
+		virtual void onException(MQException& e)=0;
+	};
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/include/PullResult.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/include/PullResult.h b/rocketmq-client4cpp/include/PullResult.h
new file mode 100755
index 0000000..42c13ca
--- /dev/null
+++ b/rocketmq-client4cpp/include/PullResult.h
@@ -0,0 +1,91 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+#ifndef __RMQ_PULLRESULT_H__
+#define __RMQ_PULLRESULT_H__
+
+#include <list>
+#include <string>
+#include <sstream>
+
+#include "RocketMQClient.h"
+#include "MessageExt.h"
+
+namespace rmq
+{
+	enum PullStatus
+	{
+		FOUND,
+		NO_NEW_MSG,
+		NO_MATCHED_MSG,
+		OFFSET_ILLEGAL
+	};
+
+	/**
+	* PullResult
+	*
+	*/
+	struct PullResult
+	{
+		PullResult()
+		{
+
+		}
+
+		PullResult(PullStatus pullStatus,
+				   long long nextBeginOffset,
+				   long long minOffset,
+				   long long maxOffset,
+				   std::list<MessageExt*>& msgFoundList)
+			:pullStatus(pullStatus),
+			 nextBeginOffset(nextBeginOffset),
+			 minOffset(minOffset),
+			 maxOffset(maxOffset),
+			 msgFoundList(msgFoundList)
+		{
+
+		}
+
+		~PullResult()
+		{
+			std::list<MessageExt*>::iterator it = msgFoundList.begin();
+
+			for (;it!=msgFoundList.end();it++)
+			{
+				delete *it;
+			}
+		}
+
+		std::string toString() const
+		{
+			std::stringstream ss;
+			ss 	<< "{pullStatus=" << pullStatus
+				<< ",nextBeginOffset=" << nextBeginOffset
+				<< ",minOffset=" << nextBeginOffset
+				<< ",maxOffset=" << nextBeginOffset
+				<< ",msgFoundList.size=" << msgFoundList.size()
+				<<"}";
+			return ss.str();
+		}
+
+		PullStatus pullStatus;
+		long long nextBeginOffset;
+		long long minOffset;
+		long long maxOffset;
+		std::list<MessageExt*> msgFoundList;
+	};
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/include/QueryResult.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/include/QueryResult.h b/rocketmq-client4cpp/include/QueryResult.h
new file mode 100644
index 0000000..13164e4
--- /dev/null
+++ b/rocketmq-client4cpp/include/QueryResult.h
@@ -0,0 +1,56 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#ifndef __RMQ_QUERYRESULT_H__
+#define  __RMQ_QUERYRESULT_H__
+
+#include <list>
+
+#include "RocketMQClient.h"
+#include "MessageExt.h"
+
+namespace rmq
+{
+	/**
+	* QueryResult
+	*
+	*/
+	class QueryResult
+	{
+	public:
+		QueryResult(long long indexLastUpdateTimestamp, const std::list<MessageExt*>& messageList)
+		{
+			m_indexLastUpdateTimestamp = indexLastUpdateTimestamp;
+			m_messageList = messageList;
+		}
+
+		long long getIndexLastUpdateTimestamp()
+		{
+			return m_indexLastUpdateTimestamp;
+		}
+
+		std::list<MessageExt*>& getMessageList()
+		{
+			return m_messageList;
+		}
+
+	private:
+		long long m_indexLastUpdateTimestamp;
+		std::list<MessageExt*> m_messageList;
+	};
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/include/RocketMQClient.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/include/RocketMQClient.h b/rocketmq-client4cpp/include/RocketMQClient.h
new file mode 100755
index 0000000..e4c71c9
--- /dev/null
+++ b/rocketmq-client4cpp/include/RocketMQClient.h
@@ -0,0 +1,100 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+#ifndef __RMQ_ROCKETMQCLIENT_H__
+#define __RMQ_ROCKETMQCLIENT_H__
+
+#include <stdlib.h>
+#include <stdio.h>
+#include <stdint.h>
+#include <string.h>
+#include <assert.h>
+#include <time.h>
+#include <stdarg.h>
+#include <fcntl.h>
+#include <errno.h>
+#include <signal.h>
+#include <pthread.h>
+
+#include <sys/time.h>
+#include <sys/timeb.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <sys/file.h>
+#include <sys/syscall.h>
+#include <linux/unistd.h>
+
+#include <cstdio>
+#include <iostream>
+#include <string>
+#include <sstream>
+#include <vector>
+#include <map>
+#include <set>
+
+
+class RocketMQUtil
+{
+public:
+	enum
+    {
+        NONE_LOG    = 0,
+        ERROR_LOG   = 1,
+        WARN_LOG    = 2,
+        INFO_LOG    = 3,
+        DEBUG_LOG   = 4,
+    };
+
+public:
+	static pid_t getPid();
+	static pid_t getTid();
+
+	static int getDiffDays(time_t tmFirst, time_t tmSecond);
+	static std::string tm2str(const time_t &t, const std::string &sFormat);
+	static std::string now2str(const std::string &sFormat);
+	static std::string now2str();
+	static int64_t getNowMs();
+	static std::string str2fmt(const char* format, ...)__attribute__((format(__printf__,1,2)));
+
+	static int initLog(const std::string& sLogPath);
+	static void setLogLevel(int logLevel);
+	static void writeLog(const char* fmt, ...) __attribute__((format(__printf__,1,2)));
+	static inline bool isNeedLog(int level)
+	{
+		return (level <= _logLevel);
+	};
+
+public:
+	static volatile int _logFd;
+	static int _logLevel;
+	static std::string _logPath;
+};
+
+#define RMQ_AUTO(name, value) typeof(value) name = value
+#define RMQ_FOR_EACH(container, it) \
+    for(typeof((container).begin()) it = (container).begin();it!=(container).end(); ++it)
+
+
+
+#define RMQ_DEBUG(fmt, args...)	do{ if(RocketMQUtil::isNeedLog(RocketMQUtil::DEBUG_LOG)) RocketMQUtil::writeLog("%d-%d|[%s][%s:%s:%d][DEBUG]|"fmt"\n", RocketMQUtil::getPid(), RocketMQUtil::getTid(), RocketMQUtil::now2str().c_str(), __FILE__, __func__,__LINE__, ##args);}while(0)
+#define RMQ_INFO(fmt, args...)	do{ if(RocketMQUtil::isNeedLog(RocketMQUtil::INFO_LOG))  RocketMQUtil::writeLog("%d-%d|[%s][%s:%s:%d][INFO]|"fmt"\n", RocketMQUtil::getPid(), RocketMQUtil::getTid(), RocketMQUtil::now2str().c_str(), __FILE__, __func__, __LINE__, ##args);}while(0)
+#define RMQ_WARN(fmt, args...)	do{ if(RocketMQUtil::isNeedLog(RocketMQUtil::WARN_LOG))  RocketMQUtil::writeLog("%d-%d|[%s][%s:%s:%d][WARN]|"fmt"\n", RocketMQUtil::getPid(), RocketMQUtil::getTid(), RocketMQUtil::now2str().c_str(), __FILE__, __func__, __LINE__, ##args);}while(0)
+#define RMQ_ERROR(fmt, args...)	do{ if(RocketMQUtil::isNeedLog(RocketMQUtil::ERROR_LOG)) RocketMQUtil::writeLog("%d-%d|[%s][%s:%s:%d][ERROR]|"fmt"\n", RocketMQUtil::getPid(), RocketMQUtil::getTid(), RocketMQUtil::now2str().c_str(), __FILE__, __func__, __LINE__, ##args);}while(0)
+
+#define RMQ_PRINT(fmt, args...)	do{ printf("%d|[%s][%s:%s:%d][DEBUG]|"fmt"\n", RocketMQUtil::getTid(), RocketMQUtil::now2str().c_str(), __FILE__, __func__,__LINE__, ##args);}while(0)
+
+
+#endif
+

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/include/SendCallback.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/include/SendCallback.h b/rocketmq-client4cpp/include/SendCallback.h
new file mode 100755
index 0000000..0feb5a1
--- /dev/null
+++ b/rocketmq-client4cpp/include/SendCallback.h
@@ -0,0 +1,39 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+#ifndef __RMQ_SENDCALLBACK_H__
+#define __RMQ_SENDCALLBACK_H__
+
+#include "SendResult.h"
+#include "RocketMQClient.h"
+
+namespace rmq
+{
+	class MQException;
+
+	/**
+	* Send Mesage Callback
+	*
+	*/
+	class SendCallback
+	{
+	public:
+		virtual ~SendCallback() {}
+		virtual void onSuccess(SendResult& sendResult)=0;
+		virtual void onException(MQException& e)=0;
+	};
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/include/SendMessageHook.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/include/SendMessageHook.h b/rocketmq-client4cpp/include/SendMessageHook.h
new file mode 100644
index 0000000..9869aa6
--- /dev/null
+++ b/rocketmq-client4cpp/include/SendMessageHook.h
@@ -0,0 +1,50 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+#ifndef __RMQ_SENDMESSAGEHOOK_H__
+#define __RMQ_SENDMESSAGEHOOK_H__
+
+#include <string>
+
+#include "RocketMQClient.h"
+#include "Message.h"
+#include "MQClientException.h"
+
+namespace rmq
+{
+	class SendMessageContext
+	{
+	public:
+		std::string producerGroup;
+		Message msg;
+		MessageQueue mq;
+		std::string brokerAddr;
+		CommunicationMode communicationMode;
+		SendResult sendResult;
+		MQException* pException;
+		void* pArg;
+	};
+
+	class SendMessageHook
+	{
+	public:
+		virtual ~SendMessageHook() {}
+		virtual std::string hookName()=0;
+		virtual void sendMessageBefore(const SendMessageContext& context)=0;
+		virtual void sendMessageAfter(const SendMessageContext& context)=0;
+	};
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/include/SendResult.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/include/SendResult.h b/rocketmq-client4cpp/include/SendResult.h
new file mode 100755
index 0000000..d6a3174
--- /dev/null
+++ b/rocketmq-client4cpp/include/SendResult.h
@@ -0,0 +1,89 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+#ifndef __RMQ_SENDRESULT_H__
+#define __RMQ_SENDRESULT_H__
+
+#include "RocketMQClient.h"
+#include "MessageQueue.h"
+
+namespace rmq
+{
+	enum SendStatus
+	{
+		SEND_OK,
+		FLUSH_DISK_TIMEOUT,
+		FLUSH_SLAVE_TIMEOUT,
+		SLAVE_NOT_AVAILABLE
+	};
+
+	/**
+	* Send Message Result
+	*
+	*/
+	class SendResult
+	{
+	public:
+		SendResult();
+		SendResult(const SendStatus& sendStatus,
+			const std::string&  msgId,
+			MessageQueue& messageQueue,
+			long long queueOffset,
+			std::string&  projectGroupPrefix);
+
+		const std::string&  getMsgId();
+		void setMsgId(const std::string&  msgId);
+		SendStatus getSendStatus();
+		void setSendStatus(const SendStatus& sendStatus);
+		MessageQueue& getMessageQueue();
+		void setMessageQueue(MessageQueue& messageQueue);
+		long long getQueueOffset();
+		void setQueueOffset(long long queueOffset);
+		bool hasResult();
+
+		std::string toString() const;
+		std::string toJsonString() const;
+
+	private:
+		SendStatus m_sendStatus;
+		std::string m_msgId;
+		MessageQueue m_messageQueue;
+		long long m_queueOffset;
+	};
+
+	enum LocalTransactionState
+	{
+		COMMIT_MESSAGE,
+		ROLLBACK_MESSAGE,
+		UNKNOW,
+	};
+
+	/**
+	* Send transaction message result
+	*
+	*/
+	class TransactionSendResult : public SendResult
+	{
+	public:
+		TransactionSendResult();
+		LocalTransactionState getLocalTransactionState();
+		void setLocalTransactionState(LocalTransactionState localTransactionState);
+
+	private:
+		LocalTransactionState m_localTransactionState;
+	};
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/include/TopicFilterType.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/include/TopicFilterType.h b/rocketmq-client4cpp/include/TopicFilterType.h
new file mode 100755
index 0000000..e51ae20
--- /dev/null
+++ b/rocketmq-client4cpp/include/TopicFilterType.h
@@ -0,0 +1,32 @@
+/**
+ * Copyright (C) 2013 kangliqiang ,kangliq@163.com
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __RMQ_TOPICFILTERTYPE_H__
+#define __RMQ_TOPICFILTERTYPE_H__
+
+namespace rmq
+{
+	/**
+	 * Topic filter type
+	 *
+	 */
+	enum TopicFilterType
+	{
+		SINGLE_TAG,
+		MULTI_TAG
+	};
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/rocketmq.mk
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/rocketmq.mk b/rocketmq-client4cpp/rocketmq.mk
new file mode 100644
index 0000000..eecc458
--- /dev/null
+++ b/rocketmq-client4cpp/rocketmq.mk
@@ -0,0 +1,6 @@
+ROCKETMQ_PATH := /data/libs/rocketmq
+
+INCLUDE += -I$(ROCKETMQ_PATH)/include 
+INCLUDE_32 += -I$(ROCKETMQ_PATH)/include -march=i686
+LIB_32 += -L$(ROCKETMQ_PATH)/lib32 -lrocketmq -lz -lrt -lpthread
+LIB_64 += -L$(ROCKETMQ_PATH)/lib64 -lrocketmq -lz -lrt -lpthread

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/ClientConfig.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/ClientConfig.cpp b/rocketmq-client4cpp/src/ClientConfig.cpp
new file mode 100755
index 0000000..986d67d
--- /dev/null
+++ b/rocketmq-client4cpp/src/ClientConfig.cpp
@@ -0,0 +1,168 @@
+/**
+ * Copyright (C) 2010-2013 kangliqiang, kangliq@163.com
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <stdlib.h>
+#include <sstream>
+
+#include "MQClientException.h"
+#include "SocketUtil.h"
+#include "ClientConfig.h"
+#include "UtilAll.h"
+#include "MixAll.h"
+
+namespace rmq
+{
+
+ClientConfig::ClientConfig()
+{
+    char* addr = getenv(MixAll::NAMESRV_ADDR_ENV.c_str());
+    if (addr)
+    {
+        m_namesrvAddr = addr;
+    }
+    else
+    {
+        m_namesrvAddr = "";
+    }
+
+    m_clientIP = getLocalAddress();
+    m_instanceName = "DEFAULT";
+    m_clientCallbackExecutorThreads = UtilAll::availableProcessors();
+    m_pollNameServerInterval = 1000 * 30;
+    m_heartbeatBrokerInterval = 1000 * 30;
+    m_persistConsumerOffsetInterval = 1000 * 5;
+}
+
+ClientConfig::~ClientConfig()
+{
+}
+
+std::string ClientConfig::buildMQClientId()
+{
+    return m_clientIP + "@" + m_instanceName;
+}
+
+void ClientConfig::changeInstanceNameToPID()
+{
+    if (m_instanceName == "DEFAULT")
+    {
+        m_instanceName = UtilAll::toString(UtilAll::getPid());
+    }
+}
+
+
+void ClientConfig::resetClientConfig(const ClientConfig& cc)
+{
+    m_namesrvAddr = cc.m_namesrvAddr;
+    m_clientIP = cc.m_clientIP;
+    m_instanceName = cc.m_instanceName;
+    m_clientCallbackExecutorThreads = cc.m_clientCallbackExecutorThreads;
+    m_pollNameServerInterval = cc.m_pollNameServerInterval;
+    m_heartbeatBrokerInterval = cc.m_heartbeatBrokerInterval;
+    m_persistConsumerOffsetInterval = cc.m_persistConsumerOffsetInterval;
+}
+
+ClientConfig ClientConfig::cloneClientConfig()
+{
+    return *this;
+}
+
+std::string ClientConfig::getNamesrvAddr()
+{
+    return m_namesrvAddr;
+}
+
+void ClientConfig::setNamesrvAddr(const std::string& namesrvAddr)
+{
+    m_namesrvAddr = namesrvAddr;
+}
+
+std::string ClientConfig::getClientIP()
+{
+    return m_clientIP;
+}
+
+void ClientConfig::setClientIP(const std::string& clientIP)
+{
+    m_clientIP = clientIP;
+}
+
+std::string ClientConfig::getInstanceName()
+{
+    return m_instanceName;
+}
+
+void ClientConfig::setInstanceName(const std::string& instanceName)
+{
+    m_instanceName = instanceName;
+}
+
+int ClientConfig::getClientCallbackExecutorThreads()
+{
+    return m_clientCallbackExecutorThreads;
+}
+
+void ClientConfig::setClientCallbackExecutorThreads(int clientCallbackExecutorThreads)
+{
+    m_clientCallbackExecutorThreads = clientCallbackExecutorThreads;
+}
+
+int ClientConfig::getPollNameServerInterval()
+{
+    return m_pollNameServerInterval;
+}
+
+void ClientConfig::setPollNameServerInterval(int pollNameServerInterval)
+{
+    m_pollNameServerInterval = pollNameServerInterval;
+}
+
+int ClientConfig::getHeartbeatBrokerInterval()
+{
+    return m_heartbeatBrokerInterval;
+}
+
+void ClientConfig::setHeartbeatBrokerInterval(int heartbeatBrokerInterval)
+{
+    m_heartbeatBrokerInterval = heartbeatBrokerInterval;
+}
+
+int ClientConfig:: getPersistConsumerOffsetInterval()
+{
+    return m_persistConsumerOffsetInterval;
+}
+
+void ClientConfig::setPersistConsumerOffsetInterval(int persistConsumerOffsetInterval)
+{
+    m_persistConsumerOffsetInterval = persistConsumerOffsetInterval;
+}
+
+
+std::string ClientConfig::toString() const
+{
+	std::stringstream ss;
+	ss 	<< "{namesrvAddr=" << m_namesrvAddr
+		<< ",clientIP=" << m_clientIP
+		<< ",instanceName=" << m_instanceName
+		<< ",clientCallbackExecutorThreads=" << m_clientCallbackExecutorThreads
+		<< ",pollNameServerInteval=" << m_pollNameServerInterval
+		<< ",heartbeatBrokerInterval=" << m_heartbeatBrokerInterval
+		<< ",persistConsumerOffsetInterval=" << m_persistConsumerOffsetInterval
+		<<"}";
+	return ss.str();
+}
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/ClientRemotingProcessor.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/ClientRemotingProcessor.cpp b/rocketmq-client4cpp/src/ClientRemotingProcessor.cpp
new file mode 100755
index 0000000..ae88de5
--- /dev/null
+++ b/rocketmq-client4cpp/src/ClientRemotingProcessor.cpp
@@ -0,0 +1,154 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+#include "ClientRemotingProcessor.h"
+#include "MQProtos.h"
+#include "TcpTransport.h"
+#include "RemotingCommand.h"
+#include "MQClientFactory.h"
+#include "CommandCustomHeader.h"
+#include "ConsumerRunningInfo.h"
+
+
+
+namespace rmq
+{
+
+ClientRemotingProcessor::ClientRemotingProcessor(MQClientFactory* pMQClientFactory)
+    : m_pMQClientFactory(pMQClientFactory)
+{
+
+}
+
+RemotingCommand* ClientRemotingProcessor::processRequest(TcpTransport* pTts, RemotingCommand* pRequest)
+{
+    int code = pRequest->getCode();
+    switch (code)
+    {
+        case CHECK_TRANSACTION_STATE_VALUE:
+            return checkTransactionState(pTts, pRequest);
+        case NOTIFY_CONSUMER_IDS_CHANGED_VALUE:
+            return notifyConsumerIdsChanged(pTts, pRequest);
+        case RESET_CONSUMER_CLIENT_OFFSET_VALUE:
+            return resetOffset(pTts, pRequest);
+        case GET_CONSUMER_STATUS_FROM_CLIENT_VALUE:
+            return getConsumeStatus(pTts, pRequest);
+        case GET_CONSUMER_RUNNING_INFO_VALUE:
+            return getConsumerRunningInfo(pTts, pRequest);
+        case CONSUME_MESSAGE_DIRECTLY_VALUE:
+            return consumeMessageDirectly(pTts, pRequest);
+        default:
+            break;
+    }
+
+    return NULL;
+}
+
+RemotingCommand* ClientRemotingProcessor::checkTransactionState(TcpTransport* pTts, RemotingCommand* pRequest)
+{
+    //TODO
+    return NULL;
+}
+
+RemotingCommand* ClientRemotingProcessor::notifyConsumerIdsChanged(TcpTransport* pTts, RemotingCommand* pRequest)
+{
+    try
+    {
+        NotifyConsumerIdsChangedRequestHeader* extHeader = (NotifyConsumerIdsChangedRequestHeader*)pRequest->getCommandCustomHeader();
+        RMQ_INFO("receive broker's notification[{%s}], the consumer group: {%s} changed, rebalance immediately",
+                pTts->getServerAddr().c_str(),
+                extHeader->consumerGroup.c_str());
+        m_pMQClientFactory->rebalanceImmediately();
+    }
+    catch (std::exception& e)
+    {
+        RMQ_ERROR("notifyConsumerIdsChanged exception: %s", e.what());
+    }
+
+    return NULL;
+}
+
+RemotingCommand* ClientRemotingProcessor::resetOffset(TcpTransport* pTts, RemotingCommand* pRequest)
+{
+    //TODO
+    return NULL;
+}
+
+
+RemotingCommand* ClientRemotingProcessor::getConsumeStatus(TcpTransport* pTts, RemotingCommand* pRequest)
+{
+    //TODO
+    return NULL;
+}
+
+
+RemotingCommand* ClientRemotingProcessor::getConsumerRunningInfo(TcpTransport* pTts, RemotingCommand* pRequest)
+{
+	return NULL;
+
+	/*
+    GetConsumerRunningInfoRequestHeader* requestHeader = (GetConsumerRunningInfoRequestHeader)pRequest->getCommandCustomHeader();
+	RemotingCommand* pResponse = RemotingCommand::createResponseCommand(NULL);
+
+	pResponse = RemotingCommand::createResponseCommand(
+					REQUEST_CODE_NOT_SUPPORTED_VALUE, "request type not supported", NULL);
+	pResponse->setOpaque(pCmd->getOpaque());
+
+    ConsumerRunningInfo* consumerRunningInfo = m_pMQClientFactory->consumerRunningInfo(requestHeader->consumerGroup);
+    if (NULL != consumerRunningInfo) {
+        response.setCode(ResponseCode.SUCCESS);
+        response.setBody(consumerRunningInfo.encode());
+    } else {
+        response.setCode(ResponseCode.SYSTEM_ERROR);
+        response.setRemark(String.format("The Consumer Group <%s> not exist in this consumer",
+                requestHeader.getConsumerGroup()));
+    }
+    return pResponse;
+
+	// java
+    final RemotingCommand response = RemotingCommand.createResponseCommand(null);
+    final GetConsumerRunningInfoRequestHeader requestHeader =
+            (GetConsumerRunningInfoRequestHeader) request
+                    .decodeCommandCustomHeader(GetConsumerRunningInfoRequestHeader.class);
+
+    ConsumerRunningInfo consumerRunningInfo =
+            this.mqClientFactory.consumerRunningInfo(requestHeader.getConsumerGroup());
+    if (null != consumerRunningInfo) {
+        if (requestHeader.isJstackEnable()) {
+            String jstack = UtilAll.jstack();
+            consumerRunningInfo.setJstack(jstack);
+        }
+
+        response.setCode(ResponseCode.SUCCESS);
+        response.setBody(consumerRunningInfo.encode());
+    } else {
+        response.setCode(ResponseCode.SYSTEM_ERROR);
+        response.setRemark(String.format("The Consumer Group <%s> not exist in this consumer",
+                requestHeader.getConsumerGroup()));
+    }
+
+    return response;
+    */
+}
+
+
+RemotingCommand* ClientRemotingProcessor::consumeMessageDirectly(TcpTransport* pTts, RemotingCommand* pRequest)
+{
+    //TODO
+    return NULL;
+}
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/ClientRemotingProcessor.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/ClientRemotingProcessor.h b/rocketmq-client4cpp/src/ClientRemotingProcessor.h
new file mode 100755
index 0000000..4cd2873
--- /dev/null
+++ b/rocketmq-client4cpp/src/ClientRemotingProcessor.h
@@ -0,0 +1,45 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#ifndef __CLIENTREMOTINGPROCESSOR_H__
+#define __CLIENTREMOTINGPROCESSOR_H__
+
+#include "TcpRequestProcessor.h"
+
+namespace rmq
+{
+	class MQClientFactory;
+	class RemotingCommand;
+
+	class ClientRemotingProcessor : public TcpRequestProcessor
+	{
+	public:
+	    ClientRemotingProcessor(MQClientFactory* pMQClientFactory);
+
+	    RemotingCommand* processRequest(TcpTransport* pTts, RemotingCommand* pRequest);
+	    RemotingCommand* checkTransactionState(TcpTransport* pTts, RemotingCommand* pRequest);
+	    RemotingCommand* notifyConsumerIdsChanged(TcpTransport* pTts, RemotingCommand* pRequest);
+	    RemotingCommand* resetOffset(TcpTransport* pTts, RemotingCommand* pRequest);
+	    RemotingCommand* getConsumeStatus(TcpTransport* pTts, RemotingCommand* pRequest);
+	    RemotingCommand* getConsumerRunningInfo(TcpTransport* pTts, RemotingCommand* pRequest);
+	    RemotingCommand* consumeMessageDirectly(TcpTransport* pTts, RemotingCommand* pRequest);
+
+	private:
+	    MQClientFactory* m_pMQClientFactory;
+	};
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/CommunicationMode.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/CommunicationMode.h b/rocketmq-client4cpp/src/CommunicationMode.h
new file mode 100755
index 0000000..43b2941
--- /dev/null
+++ b/rocketmq-client4cpp/src/CommunicationMode.h
@@ -0,0 +1,34 @@
+/**
+ * Copyright (C) 2013 kangliqiang ,kangliq@163.com
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __COMMUNICATIONMODE_H__
+#define __COMMUNICATIONMODE_H__
+
+namespace rmq
+{
+	/**
+	 * Communication Mode
+	 *
+	 */
+	enum CommunicationMode
+	{
+	    SYNC,
+	    ASYNC,
+	    ONEWAY
+	};
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/FindBrokerResult.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/FindBrokerResult.h b/rocketmq-client4cpp/src/FindBrokerResult.h
new file mode 100644
index 0000000..51a9845
--- /dev/null
+++ b/rocketmq-client4cpp/src/FindBrokerResult.h
@@ -0,0 +1,28 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+#ifndef __FINDBROKERRESULT_H__
+#define __FINDBROKERRESULT_H__
+
+namespace rmq
+{
+	typedef struct
+	{
+	    std::string brokerAddr;
+	    bool slave;
+	} FindBrokerResult;
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/MQAdminImpl.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/MQAdminImpl.cpp b/rocketmq-client4cpp/src/MQAdminImpl.cpp
new file mode 100755
index 0000000..2a6b597
--- /dev/null
+++ b/rocketmq-client4cpp/src/MQAdminImpl.cpp
@@ -0,0 +1,295 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#include <list>
+#include "SocketUtil.h"
+#include "MQAdminImpl.h"
+#include "MQClientFactory.h"
+#include "MQClientAPIImpl.h"
+#include "MQClientException.h"
+#include "TopicConfig.h"
+#include "TopicPublishInfo.h"
+#include "MessageId.h"
+#include "MessageDecoder.h"
+
+namespace rmq
+{
+
+
+MQAdminImpl::MQAdminImpl(MQClientFactory* pMQClientFactory)
+{
+    m_pMQClientFactory = pMQClientFactory;
+}
+
+MQAdminImpl::~MQAdminImpl()
+{
+
+}
+
+void MQAdminImpl::createTopic(const std::string& key, const std::string& newTopic,
+	int queueNum)
+{
+	return createTopic(key, newTopic, queueNum, 0);
+}
+
+
+void MQAdminImpl::createTopic(const std::string& key, const std::string& newTopic,
+	int queueNum, int topicSysFlag)
+{
+    try
+    {
+        MQClientAPIImpl* api = m_pMQClientFactory->getMQClientAPIImpl();
+        TopicRouteDataPtr topicRouteData = api->getTopicRouteInfoFromNameServer(key, 1000 * 3);
+
+        std::list<BrokerData> brokerDataList = topicRouteData->getBrokerDatas();
+        if (!brokerDataList.empty())
+        {
+            brokerDataList.sort();
+
+            MQClientException exception("", 0, "", 0);
+            bool hasException = false;
+
+            std::list<BrokerData>::iterator it = brokerDataList.begin();
+
+            for (; it != brokerDataList.end(); it++)
+            {
+                std::map<int, std::string>::iterator it1 = (*it).brokerAddrs.find(MixAll::MASTER_ID);
+                if (it1 != (*it).brokerAddrs.end())
+                {
+                    std::string addr = it1->second;
+
+                    TopicConfig topicConfig(newTopic);
+                    topicConfig.setReadQueueNums(queueNum);
+                    topicConfig.setWriteQueueNums(queueNum);
+                    topicConfig.setTopicSysFlag(topicSysFlag);
+
+                    try
+                    {
+                        api->createTopic(addr, key, topicConfig, 1000 * 3);
+                    }
+                    catch (MQClientException& e)
+                    {
+                        hasException = true;
+                        exception = e;
+                    }
+                }
+            }
+
+            if (hasException)
+            {
+                throw exception;
+            }
+        }
+        else
+        {
+            THROW_MQEXCEPTION(MQClientException, "Not found broker, maybe key is wrong", -1);
+        }
+    }
+    catch (MQClientException e)
+    {
+        THROW_MQEXCEPTION(MQClientException, "create new topic failed", -1);
+    }
+}
+
+std::vector<MessageQueue>* MQAdminImpl::fetchPublishMessageQueues(const std::string& topic)
+{
+    try
+    {
+        MQClientAPIImpl* api = m_pMQClientFactory->getMQClientAPIImpl();
+        TopicRouteDataPtr topicRouteData = api->getTopicRouteInfoFromNameServer(topic, 1000 * 3);
+
+        if (topicRouteData.ptr() != NULL)
+        {
+            TopicPublishInfoPtr topicPublishInfo =
+                MQClientFactory::topicRouteData2TopicPublishInfo(topic, *topicRouteData);
+            if (topicPublishInfo.ptr() != NULL && topicPublishInfo->ok())
+            {
+                std::vector<MessageQueue>* ret = new std::vector<MessageQueue>();
+                (*ret) = topicPublishInfo->getMessageQueueList();
+
+				/*
+                std::vector<MessageQueue>& mqs = ;
+                std::vector<MessageQueue>::iterator it = mqs.begin();
+                for (; it != mqs.end(); it++)
+                {
+                    ret->push_back(*it);
+                }
+                */
+
+                return ret;
+            }
+        }
+    }
+    catch (MQClientException e)
+    {
+        THROW_MQEXCEPTION(MQClientException, "Can not find Message Queue for this topic" + topic, -1);
+    }
+
+    THROW_MQEXCEPTION(MQClientException, "Unknow why, Can not find Message Queue for this topic, " + topic, -1);
+}
+
+std::set<MessageQueue>* MQAdminImpl::fetchSubscribeMessageQueues(const std::string& topic)
+{
+    try
+    {
+        TopicRouteDataPtr topicRouteData =
+            m_pMQClientFactory->getMQClientAPIImpl()->getTopicRouteInfoFromNameServer(topic, 1000 * 3);
+        if (topicRouteData.ptr() != NULL)
+        {
+            std::set<MessageQueue>* mqList =
+                MQClientFactory::topicRouteData2TopicSubscribeInfo(topic, *topicRouteData);
+            if (!mqList->empty())
+            {
+                return mqList;
+            }
+            else
+            {
+                THROW_MQEXCEPTION(MQClientException, "Can not find Message Queue for this topic" + topic, -1);
+            }
+        }
+    }
+    catch (MQClientException e)
+    {
+        THROW_MQEXCEPTION(MQClientException, "Can not find Message Queue for this topic" + topic, -1);
+    }
+
+    THROW_MQEXCEPTION(MQClientException, "Unknow why, Can not find Message Queue for this topic: " + topic, -1);
+}
+
+long long MQAdminImpl::searchOffset(const MessageQueue& mq, long long timestamp)
+{
+    std::string brokerAddr = m_pMQClientFactory->findBrokerAddressInPublish(mq.getBrokerName());
+    if (brokerAddr.empty())
+    {
+        m_pMQClientFactory->updateTopicRouteInfoFromNameServer(mq.getTopic());
+        brokerAddr = m_pMQClientFactory->findBrokerAddressInPublish(mq.getBrokerName());
+    }
+
+    if (!brokerAddr.empty())
+    {
+        try
+        {
+            return m_pMQClientFactory->getMQClientAPIImpl()->searchOffset(brokerAddr, mq.getTopic(),
+                    mq.getQueueId(), timestamp, 1000 * 3);
+        }
+        catch (MQClientException e)
+        {
+            THROW_MQEXCEPTION(MQClientException, "Invoke Broker[" + brokerAddr + "] exception", -1);
+        }
+    }
+    THROW_MQEXCEPTION(MQClientException, "The broker[" + mq.getBrokerName() + "] not exist", -1);
+}
+
+long long MQAdminImpl::maxOffset(const MessageQueue& mq)
+{
+    std::string brokerAddr = m_pMQClientFactory->findBrokerAddressInPublish(mq.getBrokerName());
+    if (brokerAddr.empty())
+    {
+        m_pMQClientFactory->updateTopicRouteInfoFromNameServer(mq.getTopic());
+        brokerAddr = m_pMQClientFactory->findBrokerAddressInPublish(mq.getBrokerName());
+    }
+
+    if (!brokerAddr.empty())
+    {
+        try
+        {
+            return m_pMQClientFactory->getMQClientAPIImpl()->getMaxOffset(brokerAddr, mq.getTopic(),
+                    mq.getQueueId(), 1000 * 3);
+        }
+        catch (MQClientException e)
+        {
+            THROW_MQEXCEPTION(MQClientException, "Invoke Broker[" + brokerAddr + "] exception", -1);
+        }
+    }
+    THROW_MQEXCEPTION(MQClientException, "The broker[" + mq.getBrokerName() + "] not exist", -1);
+}
+
+long long MQAdminImpl::minOffset(const MessageQueue& mq)
+{
+    std::string brokerAddr = m_pMQClientFactory->findBrokerAddressInPublish(mq.getBrokerName());
+    if (brokerAddr.empty())
+    {
+        m_pMQClientFactory->updateTopicRouteInfoFromNameServer(mq.getTopic());
+        brokerAddr = m_pMQClientFactory->findBrokerAddressInPublish(mq.getBrokerName());
+    }
+
+    if (!brokerAddr.empty())
+    {
+        try
+        {
+            return m_pMQClientFactory->getMQClientAPIImpl()->getMinOffset(brokerAddr, mq.getTopic(),
+                    mq.getQueueId(), 1000 * 3);
+        }
+        catch (MQClientException e)
+        {
+            THROW_MQEXCEPTION(MQClientException, "Invoke Broker[" + brokerAddr + "] exception", -1);
+        }
+    }
+
+    THROW_MQEXCEPTION(MQClientException, "The broker[" + mq.getBrokerName() + "] not exist", -1);
+}
+
+long long MQAdminImpl::earliestMsgStoreTime(const MessageQueue& mq)
+{
+    std::string brokerAddr = m_pMQClientFactory->findBrokerAddressInPublish(mq.getBrokerName());
+    if (brokerAddr.empty())
+    {
+        m_pMQClientFactory->updateTopicRouteInfoFromNameServer(mq.getTopic());
+        brokerAddr = m_pMQClientFactory->findBrokerAddressInPublish(mq.getBrokerName());
+    }
+
+    if (!brokerAddr.empty())
+    {
+        try
+        {
+            return m_pMQClientFactory->getMQClientAPIImpl()->getEarliestMsgStoretime(brokerAddr,
+                    mq.getTopic(), mq.getQueueId(), 1000 * 3);
+        }
+        catch (MQClientException e)
+        {
+            THROW_MQEXCEPTION(MQClientException, "Invoke Broker[" + brokerAddr + "] exception", -1);
+        }
+    }
+
+    THROW_MQEXCEPTION(MQClientException, "The broker[" + mq.getBrokerName() + "] not exist", -1);
+}
+
+MessageExt* MQAdminImpl::viewMessage(const std::string& msgId)
+{
+    try
+    {
+        MessageId messageId = MessageDecoder::decodeMessageId(msgId);
+        return m_pMQClientFactory->getMQClientAPIImpl()->viewMessage(
+                   socketAddress2String(messageId.getAddress()), messageId.getOffset(), 1000 * 3);
+    }
+    catch (UnknownHostException e)
+    {
+        THROW_MQEXCEPTION(MQClientException, "message id illegal", -1);
+    }
+}
+
+QueryResult MQAdminImpl::queryMessage(const std::string& topic,
+                                      const std::string& key,
+                                      int maxNum, long long begin, long long end)
+{
+    //TODO
+    std::list<MessageExt*> messageList;
+    QueryResult result(0, messageList);
+
+    return result;
+}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/MQAdminImpl.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/MQAdminImpl.h b/rocketmq-client4cpp/src/MQAdminImpl.h
new file mode 100755
index 0000000..907d61e
--- /dev/null
+++ b/rocketmq-client4cpp/src/MQAdminImpl.h
@@ -0,0 +1,63 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#ifndef __MQADMINIMPL_H__
+#define __MQADMINIMPL_H__
+
+#include <string>
+#include <list>
+#include <set>
+#include <vector>
+
+#include "MessageExt.h"
+#include "QueryResult.h"
+
+namespace rmq
+{
+    class MQClientFactory;
+    class MessageQueue;
+
+    class MQAdminImpl
+    {
+    public:
+        MQAdminImpl(MQClientFactory* pMQClientFactory);
+        ~MQAdminImpl();
+
+        void createTopic(const std::string& key, const std::string& newTopic, int queueNum);
+		void createTopic(const std::string& key, const std::string& newTopic, int queueNum, int topicSysFlag);
+
+        std::vector<MessageQueue>* fetchPublishMessageQueues(const std::string& topic);
+        std::set<MessageQueue>* fetchSubscribeMessageQueues(const std::string& topic);
+        long long searchOffset(const MessageQueue& mq, long long timestamp);
+        long long maxOffset(const MessageQueue& mq);
+        long long minOffset(const MessageQueue& mq);
+
+        long long earliestMsgStoreTime(const MessageQueue& mq);
+
+        MessageExt* viewMessage(const std::string& msgId);
+
+        QueryResult queryMessage(const std::string& topic,
+                                 const std::string& key,
+                                 int maxNum,
+                                 long long begin,
+                                 long long end);
+
+    private:
+        MQClientFactory* m_pMQClientFactory;
+    };
+}
+
+#endif


Mime
View raw message