http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/include/MQClientException.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/include/MQClientException.h b/rocketmq-client4cpp/include/MQClientException.h
new file mode 100755
index 0000000..f1d1d04
--- /dev/null
+++ b/rocketmq-client4cpp/include/MQClientException.h
@@ -0,0 +1,105 @@
+/**
+ * Copyright (C) 2013 kangliqiang ,kangliq@163.com
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __RMQ_MQCLIENTEXCEPTION_H__
+#define __RMQ_MQCLIENTEXCEPTION_H__
+
+#include <string>
+#include <ostream>
+#include <sstream>
+#include <exception>
+
+#include "RocketMQClient.h"
+
+namespace rmq
+{
+ class MQException : public std::exception
+ {
+ public:
+ MQException(const std::string& msg, int error,const char* file,int line)throw()
+ : m_error(error),m_line(line),m_file(file)
+ {
+ try
+ {
+ std::stringstream ss;
+ ss << "[" << file << ":" << line <<"]|error: " << error << "|msg:" << msg;
+ m_msg = ss.str();
+ }
+ catch (...)
+ {
+ }
+ }
+
+ virtual ~MQException()throw()
+ {
+ }
+
+ const char* what() const throw()
+ {
+ return m_msg.c_str();
+ }
+
+ int GetError() const throw()
+ {
+ return m_error;
+ }
+
+ virtual const char* GetType() const throw()
+ {
+ return "MQException";
+ }
+
+ protected:
+ int m_error;
+ int m_line;
+ std::string m_msg;
+ std::string m_file;
+ };
+
+ inline std::ostream& operator<<(std::ostream& os, const MQException& e)
+ {
+ os <<"Type:"<<e.GetType() << e.what();
+ return os;
+ }
+
+ #define DEFINE_MQCLIENTEXCEPTION(name, parent) \
+ class name : public parent \
+ {\
+ public:\
+ name(const std::string& msg, int error,const char* file,int line) throw ()\
+ : parent(msg, error, file, line) {}\
+ virtual const char* GetType() const throw()\
+ {\
+ return #name;\
+ }\
+ };
+
+ DEFINE_MQCLIENTEXCEPTION(MQClientException, MQException)
+ DEFINE_MQCLIENTEXCEPTION(MQBrokerException, MQException)
+ DEFINE_MQCLIENTEXCEPTION(InterruptedException, MQException)
+ DEFINE_MQCLIENTEXCEPTION(UnknownHostException, MQException)
+
+ DEFINE_MQCLIENTEXCEPTION(RemotingException, MQException)
+ DEFINE_MQCLIENTEXCEPTION(RemotingCommandException, RemotingException)
+ DEFINE_MQCLIENTEXCEPTION(RemotingConnectException, RemotingException)
+ DEFINE_MQCLIENTEXCEPTION(RemotingSendRequestException, RemotingException)
+ DEFINE_MQCLIENTEXCEPTION(RemotingTimeoutException, RemotingException)
+ DEFINE_MQCLIENTEXCEPTION(RemotingTooMuchRequestException, RemotingException)
+
+ #define THROW_MQEXCEPTION(e,msg,err) throw e(msg,err,__FILE__,__LINE__)
+}
+
+#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/include/MQConsumer.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/include/MQConsumer.h b/rocketmq-client4cpp/include/MQConsumer.h
new file mode 100755
index 0000000..87efe97
--- /dev/null
+++ b/rocketmq-client4cpp/include/MQConsumer.h
@@ -0,0 +1,48 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#ifndef __RMQ_MQCONSUMER_H__
+#define __RMQ_MQCONSUMER_H__
+
+#include <set>
+#include <string>
+
+#include "RocketMQClient.h"
+#include "MQAdmin.h"
+#include "ConsumeType.h"
+
+
+namespace rmq
+{
+ class MessageExt;
+
+ /**
+ * Consumer interface
+ *
+ */
+ class MQConsumer : public MQAdmin
+ {
+ public:
+ virtual ~MQConsumer(){}
+
+ virtual void start()=0;
+ virtual void shutdown()=0;
+
+ virtual void sendMessageBack(MessageExt& msg, int delayLevel)=0;
+ virtual std::set<MessageQueue>* fetchSubscribeMessageQueues(const std::string& topic)=0;
+ };
+}
+#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/include/MQProducer.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/include/MQProducer.h b/rocketmq-client4cpp/include/MQProducer.h
new file mode 100755
index 0000000..b353aba
--- /dev/null
+++ b/rocketmq-client4cpp/include/MQProducer.h
@@ -0,0 +1,71 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License")=0;
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#ifndef __RMQ_MQPRODUCER_H__
+#define __RMQ_MQPRODUCER_H__
+
+#include <vector>
+#include <string>
+
+#include "RocketMQClient.h"
+#include "MQAdmin.h"
+#include "SendResult.h"
+
+namespace rmq
+{
+ class MessageQueue;
+ class SendCallback;
+ class LocalTransactionExecuter;
+ class MessageQueueSelector;
+
+ /**
+ * Producer interface
+ *
+ */
+ class MQProducer : public MQAdmin
+ {
+ public:
+ MQProducer()
+ {
+ }
+
+ virtual ~MQProducer()
+ {
+ }
+
+ virtual void start()=0;
+ virtual void shutdown()=0;
+
+ virtual std::vector<MessageQueue>* fetchPublishMessageQueues(const std::string& topic)=0;
+
+ virtual SendResult send(Message& msg)=0;
+ virtual void send(Message& msg, SendCallback* sendCallback)=0;
+ virtual void sendOneway(Message& msg)=0;
+
+ virtual SendResult send(Message& msg, MessageQueue& mq)=0;
+ virtual void send(Message& msg, MessageQueue& mq, SendCallback* sendCallback)=0;
+ virtual void sendOneway(Message& msg, MessageQueue& mq)=0;
+
+ virtual SendResult send(Message& msg, MessageQueueSelector* selector, void* arg)=0;
+ virtual void send(Message& msg, MessageQueueSelector* selector, void* arg, SendCallback* sendCallback)=0;
+ virtual void sendOneway(Message& msg, MessageQueueSelector* selector, void* arg)=0;
+
+ virtual TransactionSendResult sendMessageInTransaction(Message& msg,
+ LocalTransactionExecuter* tranExecuter,
+ void* arg)=0;
+ };
+}
+#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/include/MQPullConsumer.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/include/MQPullConsumer.h b/rocketmq-client4cpp/include/MQPullConsumer.h
new file mode 100755
index 0000000..ffb2ac5
--- /dev/null
+++ b/rocketmq-client4cpp/include/MQPullConsumer.h
@@ -0,0 +1,54 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+#ifndef __RMQ_MQPULLCONSUMER_H__
+#define __RMQ_MQPULLCONSUMER_H__
+
+#include <set>
+#include <string>
+
+#include "RocketMQClient.h"
+#include "MQConsumer.h"
+#include "PullResult.h"
+
+namespace rmq
+{
+ class MessageQueueListener;
+ class MessageQueue;
+ class PullCallback;
+
+ /**
+ * Pull Consumer
+ *
+ */
+ class MQPullConsumer : public MQConsumer
+ {
+ public:
+ virtual ~MQPullConsumer(){}
+ virtual void registerMessageQueueListener(const std::string& topic, MessageQueueListener* pListener)=0;
+
+ virtual PullResult* pull(MessageQueue& mq, const std::string& subExpression, long long offset,int maxNums)=0;
+ virtual void pull(MessageQueue& mq, const std::string& subExpression, long long offset, int maxNums, PullCallback* pPullCallback)=0;
+
+ virtual PullResult* pullBlockIfNotFound(MessageQueue& mq, const std::string& subExpression, long long offset, int maxNums)=0;
+ virtual void pullBlockIfNotFound(MessageQueue& mq, const std::string& subExpression, long long offset, int maxNums, PullCallback* pPullCallback)=0;
+
+ virtual void updateConsumeOffset(MessageQueue& mq, long long offset)=0;
+ virtual long long fetchConsumeOffset(MessageQueue& mq, bool fromStore)=0;
+
+ virtual std::set<MessageQueue>* fetchMessageQueuesInBalance(const std::string& topic)=0;
+ };
+}
+#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/include/MQPushConsumer.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/include/MQPushConsumer.h b/rocketmq-client4cpp/include/MQPushConsumer.h
new file mode 100755
index 0000000..fe6d4a0
--- /dev/null
+++ b/rocketmq-client4cpp/include/MQPushConsumer.h
@@ -0,0 +1,49 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+#ifndef __RMQ_MQPUSHCONSUMER_H__
+#define __RMQ_MQPUSHCONSUMER_H__
+
+#include <set>
+#include <string>
+
+#include "RocketMQClient.h"
+#include "MQConsumer.h"
+#include "PullResult.h"
+
+namespace rmq
+{
+ class MessageListener;
+
+ /**
+ * Push Consumer
+ *
+ */
+ class MQPushConsumer : public MQConsumer
+ {
+ public:
+ virtual void registerMessageListener(MessageListener* pMessageListener)=0;
+
+
+ virtual void subscribe(const std::string& topic, const std::string& subExpression)=0;
+ virtual void unsubscribe(const std::string& topic)=0;
+
+
+ virtual void updateCorePoolSize(int corePoolSize)=0;
+ virtual void suspend()=0;
+ virtual void resume()=0;
+ };
+}
+#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/include/Message.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/include/Message.h b/rocketmq-client4cpp/include/Message.h
new file mode 100755
index 0000000..441b4e5
--- /dev/null
+++ b/rocketmq-client4cpp/include/Message.h
@@ -0,0 +1,136 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#ifndef __RMQ_MESSAGE_H__
+#define __RMQ_MESSAGE_H__
+
+#include <map>
+#include <string>
+#include <list>
+#include "RocketMQClient.h"
+
+namespace rmq
+{
+ /**
+ * Message
+ *
+ */
+ class Message
+ {
+ public:
+ Message();
+ Message(const std::string& topic, const char* body,int len);
+ Message(const std::string& topic, const std::string& tags, const char* body,int len);
+ Message(const std::string& topic, const std::string& tags,const std::string& keys, const char* body,int len);
+ Message(const std::string& topic,
+ const std::string& tags,
+ const std::string& keys,
+ const int flag,
+ const char* body,
+ int len,
+ bool waitStoreMsgOK);
+
+ virtual ~Message();
+ Message(const Message& other);
+ Message& operator=(const Message& other);
+
+ void clearProperty(const std::string& name);
+ void putProperty(const std::string& name, const std::string& value);
+ std::string getProperty(const std::string& name);
+
+ std::string getTopic()const;
+ void setTopic(const std::string& topic);
+
+ std::string getTags();
+ void setTags(const std::string& tags);
+
+ std::string getKeys();
+ void setKeys(const std::string& keys);
+ void setKeys(const std::list<std::string> keys);
+
+ int getDelayTimeLevel();
+ void setDelayTimeLevel(int level);
+
+ bool isWaitStoreMsgOK();
+ void setWaitStoreMsgOK(bool waitStoreMsgOK);
+
+ int getFlag();
+ void setFlag(int flag);
+
+ const char* getBody() const;
+ int getBodyLen() const;
+ void setBody(const char* body, int len);
+
+ bool tryToCompress(int compressLevel);
+ const char* getCompressBody() const;
+ int getCompressBodyLen() const;
+
+ std::map<std::string, std::string>& getProperties();
+ void setProperties(const std::map<std::string, std::string>& properties);
+
+ std::string toString() const;
+
+ protected:
+ void Init(const std::string& topic,
+ const std::string& tags,
+ const std::string& keys,
+ const int flag,
+ const char* body,
+ int len,
+ bool waitStoreMsgOK);
+
+ public:
+ static const std::string PROPERTY_KEYS;
+ static const std::string PROPERTY_TAGS;
+ static const std::string PROPERTY_WAIT_STORE_MSG_OK;
+ static const std::string PROPERTY_DELAY_TIME_LEVEL;
+
+ /**
+ * for inner use
+ */
+ static const std::string PROPERTY_RETRY_TOPIC;
+ static const std::string PROPERTY_REAL_TOPIC;
+ static const std::string PROPERTY_REAL_QUEUE_ID;
+ static const std::string PROPERTY_TRANSACTION_PREPARED;
+ static const std::string PROPERTY_PRODUCER_GROUP;
+ static const std::string PROPERTY_MIN_OFFSET;
+ static const std::string PROPERTY_MAX_OFFSET;
+ static const std::string PROPERTY_BUYER_ID;
+ static const std::string PROPERTY_ORIGIN_MESSAGE_ID;
+ static const std::string PROPERTY_TRANSFER_FLAG;
+ static const std::string PROPERTY_CORRECTION_FLAG;
+ static const std::string PROPERTY_MQ2_FLAG;
+ static const std::string PROPERTY_RECONSUME_TIME;
+ static const std::string PROPERTY_MSG_REGION;
+ static const std::string PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX;
+ static const std::string PROPERTY_MAX_RECONSUME_TIMES;
+ static const std::string PROPERTY_CONSUME_START_TIMESTAMP;
+
+ static const std::string KEY_SEPARATOR;
+ private:
+ std::string m_topic;
+ int m_flag;
+ std::map<std::string, std::string> m_properties;
+
+ char* m_body;
+ int m_bodyLen;
+
+ char* m_compressBody;
+ int m_compressBodyLen;
+ };
+}
+
+#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/include/MessageExt.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/include/MessageExt.h b/rocketmq-client4cpp/include/MessageExt.h
new file mode 100755
index 0000000..f70041c
--- /dev/null
+++ b/rocketmq-client4cpp/include/MessageExt.h
@@ -0,0 +1,108 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+#ifndef __RMQ_MESSAGEEXT_H__
+#define __RMQ_MESSAGEEXT_H__
+
+#include <sys/socket.h>
+#include <string>
+#include "Message.h"
+#include "TopicFilterType.h"
+#include "RocketMQClient.h"
+
+namespace rmq
+ {
+ /**
+ * Message extend
+ *
+ */
+ class MessageExt : public Message
+ {
+ public:
+ MessageExt();
+
+ MessageExt(int queueId,
+ long long bornTimestamp,
+ sockaddr bornHost,
+ long long storeTimestamp,
+ sockaddr storeHost,
+ std::string msgId);
+
+ ~MessageExt();
+
+ static TopicFilterType parseTopicFilterType(int sysFlag);
+
+ int getQueueId();
+ void setQueueId(int queueId);
+
+ long long getBornTimestamp();
+ void setBornTimestamp(long long bornTimestamp);
+
+ sockaddr getBornHost();
+ std::string getBornHostString();
+ std::string getBornHostNameString();
+ void setBornHost(const sockaddr& bornHost);
+
+ long long getStoreTimestamp();
+ void setStoreTimestamp(long long storeTimestamp);
+
+ sockaddr getStoreHost();
+ std::string getStoreHostString();
+ void setStoreHost(const sockaddr& storeHost);
+
+ std::string getMsgId();
+ void setMsgId(const std::string& msgId);
+
+ int getSysFlag();
+ void setSysFlag(int sysFlag);
+
+ int getBodyCRC();
+ void setBodyCRC(int bodyCRC);
+
+ long long getQueueOffset();
+ void setQueueOffset(long long queueOffset);
+
+ long long getCommitLogOffset();
+ void setCommitLogOffset(long long physicOffset);
+
+ int getStoreSize();
+ void setStoreSize(int storeSize);
+
+ int getReconsumeTimes();
+ void setReconsumeTimes(int reconsumeTimes);
+
+ long long getPreparedTransactionOffset();
+ void setPreparedTransactionOffset(long long preparedTransactionOffset);
+
+ std::string toString() const;
+
+ private:
+ long long m_queueOffset;
+ long long m_commitLogOffset;
+ long long m_bornTimestamp;
+ long long m_storeTimestamp;
+ long long m_preparedTransactionOffset;
+ int m_queueId;
+ int m_storeSize;
+ int m_sysFlag;
+ int m_bodyCRC;
+ int m_reconsumeTimes;
+ sockaddr m_bornHost;
+ sockaddr m_storeHost;
+ std::string m_msgId;
+ };
+}
+
+#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/include/MessageListener.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/include/MessageListener.h b/rocketmq-client4cpp/include/MessageListener.h
new file mode 100755
index 0000000..130a219
--- /dev/null
+++ b/rocketmq-client4cpp/include/MessageListener.h
@@ -0,0 +1,94 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#ifndef __RMQ_MESSAGELISTENER_H__
+#define __RMQ_MESSAGELISTENER_H__
+
+#include <limits.h>
+#include <list>
+
+#include "MessageExt.h"
+#include "MessageQueue.h"
+
+namespace rmq
+{
+ /**
+ * Message Listener
+ *
+ */
+ class MessageListener
+ {
+ public:
+ virtual ~MessageListener(){}
+ };
+
+ enum ConsumeOrderlyStatus
+ {
+ SUCCESS,
+ ROLLBACK,
+ COMMIT,
+ SUSPEND_CURRENT_QUEUE_A_MOMENT,
+ };
+
+ typedef struct tagConsumeOrderlyContext
+ {
+ tagConsumeOrderlyContext(MessageQueue& mq)
+ :messageQueue(mq),
+ autoCommit(true),
+ suspendCurrentQueueTimeMillis(1000)
+ {
+ }
+
+ MessageQueue messageQueue;///< Ҫ���ѵ���Ϣ�����ĸ�����
+ bool autoCommit;///< ��ϢOffset�Ƿ��Զ��ύ
+ long suspendCurrentQueueTimeMillis;
+ }ConsumeOrderlyContext;
+
+ class MessageListenerOrderly : public MessageListener
+ {
+ public:
+ virtual ConsumeOrderlyStatus consumeMessage(std::list<MessageExt*>& msgs,
+ ConsumeOrderlyContext& context)=0;
+ };
+
+ enum ConsumeConcurrentlyStatus
+ {
+ CONSUME_SUCCESS,
+ RECONSUME_LATER,
+ };
+
+ struct ConsumeConcurrentlyContext
+ {
+ ConsumeConcurrentlyContext(MessageQueue& mq)
+ :messageQueue(mq),
+ delayLevelWhenNextConsume(0),
+ ackIndex(INT_MAX)
+ {
+ }
+ MessageQueue messageQueue;
+ int delayLevelWhenNextConsume;
+ int ackIndex;
+ };
+
+ class MessageListenerConcurrently : public MessageListener
+ {
+ public:
+ virtual ConsumeConcurrentlyStatus consumeMessage(std::list<MessageExt*>& msgs,
+ ConsumeConcurrentlyContext& context)=0;
+ };
+}
+
+#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/include/MessageQueue.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/include/MessageQueue.h b/rocketmq-client4cpp/include/MessageQueue.h
new file mode 100755
index 0000000..89ddf58
--- /dev/null
+++ b/rocketmq-client4cpp/include/MessageQueue.h
@@ -0,0 +1,70 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#ifndef __RMQ_MESSAGEQUEUE_H__
+#define __RMQ_MESSAGEQUEUE_H__
+
+#include <iostream>
+#include <string>
+#include <sstream>
+
+#include "RocketMQClient.h"
+
+namespace rmq
+{
+ /**
+ * Message Queue
+ *
+ */
+ class MessageQueue
+ {
+ public:
+ MessageQueue();
+ ~MessageQueue(){};
+
+ MessageQueue(const std::string& topic, const std::string& brokerName, int queueId);
+
+ std::string getTopic()const;
+ void setTopic(const std::string& topic);
+
+ std::string getBrokerName()const;
+ void setBrokerName(const std::string& brokerName);
+
+ int getQueueId()const;
+ void setQueueId(int queueId);
+
+ int hashCode();
+ std::string toString() const;
+ std::string toJsonString() const;
+
+ bool operator==(const MessageQueue& mq) const;
+ bool operator<(const MessageQueue& mq) const;
+ int compareTo(const MessageQueue& mq) const;
+
+ private:
+ std::string m_topic;
+ std::string m_brokerName;
+ int m_queueId;
+ };
+
+ inline std::ostream& operator<<(std::ostream& os, const MessageQueue& obj)
+ {
+ os << obj.toString();
+ return os;
+ }
+}
+
+#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/include/MessageQueueListener.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/include/MessageQueueListener.h b/rocketmq-client4cpp/include/MessageQueueListener.h
new file mode 100755
index 0000000..9f04c3e
--- /dev/null
+++ b/rocketmq-client4cpp/include/MessageQueueListener.h
@@ -0,0 +1,38 @@
+/**
+ * Copyright (C) 2013 kangliqiang ,kangliq@163.com
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __RMQ_MESSAGEQUEUELISTENER_H__
+#define __RMQ_MESSAGEQUEUELISTENER_H__
+
+#include <set>
+#include "RocketMQClient.h"
+
+namespace rmq
+{
+ /**
+ * Message Queue Listener
+ *
+ */
+ class MessageQueueListener
+ {
+ public:
+ virtual ~MessageQueueListener() {}
+ virtual void messageQueueChanged(const std::string& topic,
+ std::set<MessageQueue>& mqAll,
+ std::set<MessageQueue>& mqDivided)=0;
+ };
+}
+
+#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/include/OffsetStore.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/include/OffsetStore.h b/rocketmq-client4cpp/include/OffsetStore.h
new file mode 100755
index 0000000..a533750
--- /dev/null
+++ b/rocketmq-client4cpp/include/OffsetStore.h
@@ -0,0 +1,58 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+#ifndef __RMQ_OFFSETSTORE_H__
+#define __RMQ_OFFSETSTORE_H__
+
+#include <set>
+#include <map>
+
+#include "RocketMQClient.h"
+
+namespace rmq
+{
+ class MessageQueue;
+
+ enum ReadOffsetType
+ {
+ READ_FROM_MEMORY,
+ READ_FROM_STORE,
+ MEMORY_FIRST_THEN_STORE,
+ };
+
+ /**
+ * Consumer Offset Store
+ *
+ */
+ class OffsetStore
+ {
+ public:
+ virtual ~OffsetStore() {}
+
+ virtual void load()=0;
+
+ virtual void updateOffset(const MessageQueue& mq, long long offset, bool increaseOnly)=0;
+ virtual long long readOffset(const MessageQueue& mq, ReadOffsetType type)=0;
+
+ virtual void persistAll(std::set<MessageQueue>& mqs)=0;
+ virtual void persist(const MessageQueue& mq)=0;
+
+ virtual void removeOffset(const MessageQueue& mq)=0;
+
+ virtual std::map<MessageQueue, long long> cloneOffsetTable(const std::string& topic) = 0;
+ };
+}
+
+#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/include/PullCallback.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/include/PullCallback.h b/rocketmq-client4cpp/include/PullCallback.h
new file mode 100755
index 0000000..47ade68
--- /dev/null
+++ b/rocketmq-client4cpp/include/PullCallback.h
@@ -0,0 +1,39 @@
+/**
+ * Copyright (C) 2013 kangliqiang ,kangliq@163.com
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __RMQ_PULLCALLBACK_H__
+#define __RMQ_PULLCALLBACK_H__
+
+#include "RocketMQClient.h"
+#include "PullResult.h"
+
+namespace rmq
+{
+ class MQException;
+
+ /**
+ * PullCallback
+ *
+ */
+ class PullCallback
+ {
+ public:
+ virtual ~PullCallback() {}
+ virtual void onSuccess(PullResult& pullResult)=0;
+ virtual void onException(MQException& e)=0;
+ };
+}
+
+#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/include/PullResult.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/include/PullResult.h b/rocketmq-client4cpp/include/PullResult.h
new file mode 100755
index 0000000..42c13ca
--- /dev/null
+++ b/rocketmq-client4cpp/include/PullResult.h
@@ -0,0 +1,91 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+#ifndef __RMQ_PULLRESULT_H__
+#define __RMQ_PULLRESULT_H__
+
+#include <list>
+#include <string>
+#include <sstream>
+
+#include "RocketMQClient.h"
+#include "MessageExt.h"
+
+namespace rmq
+{
+ enum PullStatus
+ {
+ FOUND,
+ NO_NEW_MSG,
+ NO_MATCHED_MSG,
+ OFFSET_ILLEGAL
+ };
+
+ /**
+ * PullResult
+ *
+ */
+ struct PullResult
+ {
+ PullResult()
+ {
+
+ }
+
+ PullResult(PullStatus pullStatus,
+ long long nextBeginOffset,
+ long long minOffset,
+ long long maxOffset,
+ std::list<MessageExt*>& msgFoundList)
+ :pullStatus(pullStatus),
+ nextBeginOffset(nextBeginOffset),
+ minOffset(minOffset),
+ maxOffset(maxOffset),
+ msgFoundList(msgFoundList)
+ {
+
+ }
+
+ ~PullResult()
+ {
+ std::list<MessageExt*>::iterator it = msgFoundList.begin();
+
+ for (;it!=msgFoundList.end();it++)
+ {
+ delete *it;
+ }
+ }
+
+ std::string toString() const
+ {
+ std::stringstream ss;
+ ss << "{pullStatus=" << pullStatus
+ << ",nextBeginOffset=" << nextBeginOffset
+ << ",minOffset=" << nextBeginOffset
+ << ",maxOffset=" << nextBeginOffset
+ << ",msgFoundList.size=" << msgFoundList.size()
+ <<"}";
+ return ss.str();
+ }
+
+ PullStatus pullStatus;
+ long long nextBeginOffset;
+ long long minOffset;
+ long long maxOffset;
+ std::list<MessageExt*> msgFoundList;
+ };
+}
+
+#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/include/QueryResult.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/include/QueryResult.h b/rocketmq-client4cpp/include/QueryResult.h
new file mode 100644
index 0000000..13164e4
--- /dev/null
+++ b/rocketmq-client4cpp/include/QueryResult.h
@@ -0,0 +1,56 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#ifndef __RMQ_QUERYRESULT_H__
+#define __RMQ_QUERYRESULT_H__
+
+#include <list>
+
+#include "RocketMQClient.h"
+#include "MessageExt.h"
+
+namespace rmq
+{
+ /**
+ * QueryResult
+ *
+ */
+ class QueryResult
+ {
+ public:
+ QueryResult(long long indexLastUpdateTimestamp, const std::list<MessageExt*>& messageList)
+ {
+ m_indexLastUpdateTimestamp = indexLastUpdateTimestamp;
+ m_messageList = messageList;
+ }
+
+ long long getIndexLastUpdateTimestamp()
+ {
+ return m_indexLastUpdateTimestamp;
+ }
+
+ std::list<MessageExt*>& getMessageList()
+ {
+ return m_messageList;
+ }
+
+ private:
+ long long m_indexLastUpdateTimestamp;
+ std::list<MessageExt*> m_messageList;
+ };
+}
+
+#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/include/RocketMQClient.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/include/RocketMQClient.h b/rocketmq-client4cpp/include/RocketMQClient.h
new file mode 100755
index 0000000..e4c71c9
--- /dev/null
+++ b/rocketmq-client4cpp/include/RocketMQClient.h
@@ -0,0 +1,100 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+#ifndef __RMQ_ROCKETMQCLIENT_H__
+#define __RMQ_ROCKETMQCLIENT_H__
+
+#include <stdlib.h>
+#include <stdio.h>
+#include <stdint.h>
+#include <string.h>
+#include <assert.h>
+#include <time.h>
+#include <stdarg.h>
+#include <fcntl.h>
+#include <errno.h>
+#include <signal.h>
+#include <pthread.h>
+
+#include <sys/time.h>
+#include <sys/timeb.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <sys/file.h>
+#include <sys/syscall.h>
+#include <linux/unistd.h>
+
+#include <cstdio>
+#include <iostream>
+#include <string>
+#include <sstream>
+#include <vector>
+#include <map>
+#include <set>
+
+
+class RocketMQUtil
+{
+public:
+ enum
+ {
+ NONE_LOG = 0,
+ ERROR_LOG = 1,
+ WARN_LOG = 2,
+ INFO_LOG = 3,
+ DEBUG_LOG = 4,
+ };
+
+public:
+ static pid_t getPid();
+ static pid_t getTid();
+
+ static int getDiffDays(time_t tmFirst, time_t tmSecond);
+ static std::string tm2str(const time_t &t, const std::string &sFormat);
+ static std::string now2str(const std::string &sFormat);
+ static std::string now2str();
+ static int64_t getNowMs();
+ static std::string str2fmt(const char* format, ...)__attribute__((format(__printf__,1,2)));
+
+ static int initLog(const std::string& sLogPath);
+ static void setLogLevel(int logLevel);
+ static void writeLog(const char* fmt, ...) __attribute__((format(__printf__,1,2)));
+ static inline bool isNeedLog(int level)
+ {
+ return (level <= _logLevel);
+ };
+
+public:
+ static volatile int _logFd;
+ static int _logLevel;
+ static std::string _logPath;
+};
+
+#define RMQ_AUTO(name, value) typeof(value) name = value
+#define RMQ_FOR_EACH(container, it) \
+ for(typeof((container).begin()) it = (container).begin();it!=(container).end(); ++it)
+
+
+
+#define RMQ_DEBUG(fmt, args...) do{ if(RocketMQUtil::isNeedLog(RocketMQUtil::DEBUG_LOG)) RocketMQUtil::writeLog("%d-%d|[%s][%s:%s:%d][DEBUG]|"fmt"\n", RocketMQUtil::getPid(), RocketMQUtil::getTid(), RocketMQUtil::now2str().c_str(), __FILE__, __func__,__LINE__, ##args);}while(0)
+#define RMQ_INFO(fmt, args...) do{ if(RocketMQUtil::isNeedLog(RocketMQUtil::INFO_LOG)) RocketMQUtil::writeLog("%d-%d|[%s][%s:%s:%d][INFO]|"fmt"\n", RocketMQUtil::getPid(), RocketMQUtil::getTid(), RocketMQUtil::now2str().c_str(), __FILE__, __func__, __LINE__, ##args);}while(0)
+#define RMQ_WARN(fmt, args...) do{ if(RocketMQUtil::isNeedLog(RocketMQUtil::WARN_LOG)) RocketMQUtil::writeLog("%d-%d|[%s][%s:%s:%d][WARN]|"fmt"\n", RocketMQUtil::getPid(), RocketMQUtil::getTid(), RocketMQUtil::now2str().c_str(), __FILE__, __func__, __LINE__, ##args);}while(0)
+#define RMQ_ERROR(fmt, args...) do{ if(RocketMQUtil::isNeedLog(RocketMQUtil::ERROR_LOG)) RocketMQUtil::writeLog("%d-%d|[%s][%s:%s:%d][ERROR]|"fmt"\n", RocketMQUtil::getPid(), RocketMQUtil::getTid(), RocketMQUtil::now2str().c_str(), __FILE__, __func__, __LINE__, ##args);}while(0)
+
+#define RMQ_PRINT(fmt, args...) do{ printf("%d|[%s][%s:%s:%d][DEBUG]|"fmt"\n", RocketMQUtil::getTid(), RocketMQUtil::now2str().c_str(), __FILE__, __func__,__LINE__, ##args);}while(0)
+
+
+#endif
+
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/include/SendCallback.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/include/SendCallback.h b/rocketmq-client4cpp/include/SendCallback.h
new file mode 100755
index 0000000..0feb5a1
--- /dev/null
+++ b/rocketmq-client4cpp/include/SendCallback.h
@@ -0,0 +1,39 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+#ifndef __RMQ_SENDCALLBACK_H__
+#define __RMQ_SENDCALLBACK_H__
+
+#include "SendResult.h"
+#include "RocketMQClient.h"
+
+namespace rmq
+{
+ class MQException;
+
+ /**
+ * Send Mesage Callback
+ *
+ */
+ class SendCallback
+ {
+ public:
+ virtual ~SendCallback() {}
+ virtual void onSuccess(SendResult& sendResult)=0;
+ virtual void onException(MQException& e)=0;
+ };
+}
+
+#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/include/SendMessageHook.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/include/SendMessageHook.h b/rocketmq-client4cpp/include/SendMessageHook.h
new file mode 100644
index 0000000..9869aa6
--- /dev/null
+++ b/rocketmq-client4cpp/include/SendMessageHook.h
@@ -0,0 +1,50 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+#ifndef __RMQ_SENDMESSAGEHOOK_H__
+#define __RMQ_SENDMESSAGEHOOK_H__
+
+#include <string>
+
+#include "RocketMQClient.h"
+#include "Message.h"
+#include "MQClientException.h"
+
+namespace rmq
+{
+ class SendMessageContext
+ {
+ public:
+ std::string producerGroup;
+ Message msg;
+ MessageQueue mq;
+ std::string brokerAddr;
+ CommunicationMode communicationMode;
+ SendResult sendResult;
+ MQException* pException;
+ void* pArg;
+ };
+
+ class SendMessageHook
+ {
+ public:
+ virtual ~SendMessageHook() {}
+ virtual std::string hookName()=0;
+ virtual void sendMessageBefore(const SendMessageContext& context)=0;
+ virtual void sendMessageAfter(const SendMessageContext& context)=0;
+ };
+}
+
+#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/include/SendResult.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/include/SendResult.h b/rocketmq-client4cpp/include/SendResult.h
new file mode 100755
index 0000000..d6a3174
--- /dev/null
+++ b/rocketmq-client4cpp/include/SendResult.h
@@ -0,0 +1,89 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+#ifndef __RMQ_SENDRESULT_H__
+#define __RMQ_SENDRESULT_H__
+
+#include "RocketMQClient.h"
+#include "MessageQueue.h"
+
+namespace rmq
+{
+ enum SendStatus
+ {
+ SEND_OK,
+ FLUSH_DISK_TIMEOUT,
+ FLUSH_SLAVE_TIMEOUT,
+ SLAVE_NOT_AVAILABLE
+ };
+
+ /**
+ * Send Message Result
+ *
+ */
+ class SendResult
+ {
+ public:
+ SendResult();
+ SendResult(const SendStatus& sendStatus,
+ const std::string& msgId,
+ MessageQueue& messageQueue,
+ long long queueOffset,
+ std::string& projectGroupPrefix);
+
+ const std::string& getMsgId();
+ void setMsgId(const std::string& msgId);
+ SendStatus getSendStatus();
+ void setSendStatus(const SendStatus& sendStatus);
+ MessageQueue& getMessageQueue();
+ void setMessageQueue(MessageQueue& messageQueue);
+ long long getQueueOffset();
+ void setQueueOffset(long long queueOffset);
+ bool hasResult();
+
+ std::string toString() const;
+ std::string toJsonString() const;
+
+ private:
+ SendStatus m_sendStatus;
+ std::string m_msgId;
+ MessageQueue m_messageQueue;
+ long long m_queueOffset;
+ };
+
+ enum LocalTransactionState
+ {
+ COMMIT_MESSAGE,
+ ROLLBACK_MESSAGE,
+ UNKNOW,
+ };
+
+ /**
+ * Send transaction message result
+ *
+ */
+ class TransactionSendResult : public SendResult
+ {
+ public:
+ TransactionSendResult();
+ LocalTransactionState getLocalTransactionState();
+ void setLocalTransactionState(LocalTransactionState localTransactionState);
+
+ private:
+ LocalTransactionState m_localTransactionState;
+ };
+}
+
+#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/include/TopicFilterType.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/include/TopicFilterType.h b/rocketmq-client4cpp/include/TopicFilterType.h
new file mode 100755
index 0000000..e51ae20
--- /dev/null
+++ b/rocketmq-client4cpp/include/TopicFilterType.h
@@ -0,0 +1,32 @@
+/**
+ * Copyright (C) 2013 kangliqiang ,kangliq@163.com
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __RMQ_TOPICFILTERTYPE_H__
+#define __RMQ_TOPICFILTERTYPE_H__
+
+namespace rmq
+{
+ /**
+ * Topic filter type
+ *
+ */
+ enum TopicFilterType
+ {
+ SINGLE_TAG,
+ MULTI_TAG
+ };
+}
+
+#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/rocketmq.mk
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/rocketmq.mk b/rocketmq-client4cpp/rocketmq.mk
new file mode 100644
index 0000000..eecc458
--- /dev/null
+++ b/rocketmq-client4cpp/rocketmq.mk
@@ -0,0 +1,6 @@
+ROCKETMQ_PATH := /data/libs/rocketmq
+
+INCLUDE += -I$(ROCKETMQ_PATH)/include
+INCLUDE_32 += -I$(ROCKETMQ_PATH)/include -march=i686
+LIB_32 += -L$(ROCKETMQ_PATH)/lib32 -lrocketmq -lz -lrt -lpthread
+LIB_64 += -L$(ROCKETMQ_PATH)/lib64 -lrocketmq -lz -lrt -lpthread
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/ClientConfig.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/ClientConfig.cpp b/rocketmq-client4cpp/src/ClientConfig.cpp
new file mode 100755
index 0000000..986d67d
--- /dev/null
+++ b/rocketmq-client4cpp/src/ClientConfig.cpp
@@ -0,0 +1,168 @@
+/**
+ * Copyright (C) 2010-2013 kangliqiang, kangliq@163.com
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <stdlib.h>
+#include <sstream>
+
+#include "MQClientException.h"
+#include "SocketUtil.h"
+#include "ClientConfig.h"
+#include "UtilAll.h"
+#include "MixAll.h"
+
+namespace rmq
+{
+
+ClientConfig::ClientConfig()
+{
+ char* addr = getenv(MixAll::NAMESRV_ADDR_ENV.c_str());
+ if (addr)
+ {
+ m_namesrvAddr = addr;
+ }
+ else
+ {
+ m_namesrvAddr = "";
+ }
+
+ m_clientIP = getLocalAddress();
+ m_instanceName = "DEFAULT";
+ m_clientCallbackExecutorThreads = UtilAll::availableProcessors();
+ m_pollNameServerInterval = 1000 * 30;
+ m_heartbeatBrokerInterval = 1000 * 30;
+ m_persistConsumerOffsetInterval = 1000 * 5;
+}
+
+ClientConfig::~ClientConfig()
+{
+}
+
+std::string ClientConfig::buildMQClientId()
+{
+ return m_clientIP + "@" + m_instanceName;
+}
+
+void ClientConfig::changeInstanceNameToPID()
+{
+ if (m_instanceName == "DEFAULT")
+ {
+ m_instanceName = UtilAll::toString(UtilAll::getPid());
+ }
+}
+
+
+void ClientConfig::resetClientConfig(const ClientConfig& cc)
+{
+ m_namesrvAddr = cc.m_namesrvAddr;
+ m_clientIP = cc.m_clientIP;
+ m_instanceName = cc.m_instanceName;
+ m_clientCallbackExecutorThreads = cc.m_clientCallbackExecutorThreads;
+ m_pollNameServerInterval = cc.m_pollNameServerInterval;
+ m_heartbeatBrokerInterval = cc.m_heartbeatBrokerInterval;
+ m_persistConsumerOffsetInterval = cc.m_persistConsumerOffsetInterval;
+}
+
+ClientConfig ClientConfig::cloneClientConfig()
+{
+ return *this;
+}
+
+std::string ClientConfig::getNamesrvAddr()
+{
+ return m_namesrvAddr;
+}
+
+void ClientConfig::setNamesrvAddr(const std::string& namesrvAddr)
+{
+ m_namesrvAddr = namesrvAddr;
+}
+
+std::string ClientConfig::getClientIP()
+{
+ return m_clientIP;
+}
+
+void ClientConfig::setClientIP(const std::string& clientIP)
+{
+ m_clientIP = clientIP;
+}
+
+std::string ClientConfig::getInstanceName()
+{
+ return m_instanceName;
+}
+
+void ClientConfig::setInstanceName(const std::string& instanceName)
+{
+ m_instanceName = instanceName;
+}
+
+int ClientConfig::getClientCallbackExecutorThreads()
+{
+ return m_clientCallbackExecutorThreads;
+}
+
+void ClientConfig::setClientCallbackExecutorThreads(int clientCallbackExecutorThreads)
+{
+ m_clientCallbackExecutorThreads = clientCallbackExecutorThreads;
+}
+
+int ClientConfig::getPollNameServerInterval()
+{
+ return m_pollNameServerInterval;
+}
+
+void ClientConfig::setPollNameServerInterval(int pollNameServerInterval)
+{
+ m_pollNameServerInterval = pollNameServerInterval;
+}
+
+int ClientConfig::getHeartbeatBrokerInterval()
+{
+ return m_heartbeatBrokerInterval;
+}
+
+void ClientConfig::setHeartbeatBrokerInterval(int heartbeatBrokerInterval)
+{
+ m_heartbeatBrokerInterval = heartbeatBrokerInterval;
+}
+
+int ClientConfig:: getPersistConsumerOffsetInterval()
+{
+ return m_persistConsumerOffsetInterval;
+}
+
+void ClientConfig::setPersistConsumerOffsetInterval(int persistConsumerOffsetInterval)
+{
+ m_persistConsumerOffsetInterval = persistConsumerOffsetInterval;
+}
+
+
+std::string ClientConfig::toString() const
+{
+ std::stringstream ss;
+ ss << "{namesrvAddr=" << m_namesrvAddr
+ << ",clientIP=" << m_clientIP
+ << ",instanceName=" << m_instanceName
+ << ",clientCallbackExecutorThreads=" << m_clientCallbackExecutorThreads
+ << ",pollNameServerInteval=" << m_pollNameServerInterval
+ << ",heartbeatBrokerInterval=" << m_heartbeatBrokerInterval
+ << ",persistConsumerOffsetInterval=" << m_persistConsumerOffsetInterval
+ <<"}";
+ return ss.str();
+}
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/ClientRemotingProcessor.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/ClientRemotingProcessor.cpp b/rocketmq-client4cpp/src/ClientRemotingProcessor.cpp
new file mode 100755
index 0000000..ae88de5
--- /dev/null
+++ b/rocketmq-client4cpp/src/ClientRemotingProcessor.cpp
@@ -0,0 +1,154 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+#include "ClientRemotingProcessor.h"
+#include "MQProtos.h"
+#include "TcpTransport.h"
+#include "RemotingCommand.h"
+#include "MQClientFactory.h"
+#include "CommandCustomHeader.h"
+#include "ConsumerRunningInfo.h"
+
+
+
+namespace rmq
+{
+
+ClientRemotingProcessor::ClientRemotingProcessor(MQClientFactory* pMQClientFactory)
+ : m_pMQClientFactory(pMQClientFactory)
+{
+
+}
+
+RemotingCommand* ClientRemotingProcessor::processRequest(TcpTransport* pTts, RemotingCommand* pRequest)
+{
+ int code = pRequest->getCode();
+ switch (code)
+ {
+ case CHECK_TRANSACTION_STATE_VALUE:
+ return checkTransactionState(pTts, pRequest);
+ case NOTIFY_CONSUMER_IDS_CHANGED_VALUE:
+ return notifyConsumerIdsChanged(pTts, pRequest);
+ case RESET_CONSUMER_CLIENT_OFFSET_VALUE:
+ return resetOffset(pTts, pRequest);
+ case GET_CONSUMER_STATUS_FROM_CLIENT_VALUE:
+ return getConsumeStatus(pTts, pRequest);
+ case GET_CONSUMER_RUNNING_INFO_VALUE:
+ return getConsumerRunningInfo(pTts, pRequest);
+ case CONSUME_MESSAGE_DIRECTLY_VALUE:
+ return consumeMessageDirectly(pTts, pRequest);
+ default:
+ break;
+ }
+
+ return NULL;
+}
+
+RemotingCommand* ClientRemotingProcessor::checkTransactionState(TcpTransport* pTts, RemotingCommand* pRequest)
+{
+ //TODO
+ return NULL;
+}
+
+RemotingCommand* ClientRemotingProcessor::notifyConsumerIdsChanged(TcpTransport* pTts, RemotingCommand* pRequest)
+{
+ try
+ {
+ NotifyConsumerIdsChangedRequestHeader* extHeader = (NotifyConsumerIdsChangedRequestHeader*)pRequest->getCommandCustomHeader();
+ RMQ_INFO("receive broker's notification[{%s}], the consumer group: {%s} changed, rebalance immediately",
+ pTts->getServerAddr().c_str(),
+ extHeader->consumerGroup.c_str());
+ m_pMQClientFactory->rebalanceImmediately();
+ }
+ catch (std::exception& e)
+ {
+ RMQ_ERROR("notifyConsumerIdsChanged exception: %s", e.what());
+ }
+
+ return NULL;
+}
+
+RemotingCommand* ClientRemotingProcessor::resetOffset(TcpTransport* pTts, RemotingCommand* pRequest)
+{
+ //TODO
+ return NULL;
+}
+
+
+RemotingCommand* ClientRemotingProcessor::getConsumeStatus(TcpTransport* pTts, RemotingCommand* pRequest)
+{
+ //TODO
+ return NULL;
+}
+
+
+RemotingCommand* ClientRemotingProcessor::getConsumerRunningInfo(TcpTransport* pTts, RemotingCommand* pRequest)
+{
+ return NULL;
+
+ /*
+ GetConsumerRunningInfoRequestHeader* requestHeader = (GetConsumerRunningInfoRequestHeader)pRequest->getCommandCustomHeader();
+ RemotingCommand* pResponse = RemotingCommand::createResponseCommand(NULL);
+
+ pResponse = RemotingCommand::createResponseCommand(
+ REQUEST_CODE_NOT_SUPPORTED_VALUE, "request type not supported", NULL);
+ pResponse->setOpaque(pCmd->getOpaque());
+
+ ConsumerRunningInfo* consumerRunningInfo = m_pMQClientFactory->consumerRunningInfo(requestHeader->consumerGroup);
+ if (NULL != consumerRunningInfo) {
+ response.setCode(ResponseCode.SUCCESS);
+ response.setBody(consumerRunningInfo.encode());
+ } else {
+ response.setCode(ResponseCode.SYSTEM_ERROR);
+ response.setRemark(String.format("The Consumer Group <%s> not exist in this consumer",
+ requestHeader.getConsumerGroup()));
+ }
+ return pResponse;
+
+ // java
+ final RemotingCommand response = RemotingCommand.createResponseCommand(null);
+ final GetConsumerRunningInfoRequestHeader requestHeader =
+ (GetConsumerRunningInfoRequestHeader) request
+ .decodeCommandCustomHeader(GetConsumerRunningInfoRequestHeader.class);
+
+ ConsumerRunningInfo consumerRunningInfo =
+ this.mqClientFactory.consumerRunningInfo(requestHeader.getConsumerGroup());
+ if (null != consumerRunningInfo) {
+ if (requestHeader.isJstackEnable()) {
+ String jstack = UtilAll.jstack();
+ consumerRunningInfo.setJstack(jstack);
+ }
+
+ response.setCode(ResponseCode.SUCCESS);
+ response.setBody(consumerRunningInfo.encode());
+ } else {
+ response.setCode(ResponseCode.SYSTEM_ERROR);
+ response.setRemark(String.format("The Consumer Group <%s> not exist in this consumer",
+ requestHeader.getConsumerGroup()));
+ }
+
+ return response;
+ */
+}
+
+
+RemotingCommand* ClientRemotingProcessor::consumeMessageDirectly(TcpTransport* pTts, RemotingCommand* pRequest)
+{
+ //TODO
+ return NULL;
+}
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/ClientRemotingProcessor.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/ClientRemotingProcessor.h b/rocketmq-client4cpp/src/ClientRemotingProcessor.h
new file mode 100755
index 0000000..4cd2873
--- /dev/null
+++ b/rocketmq-client4cpp/src/ClientRemotingProcessor.h
@@ -0,0 +1,45 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#ifndef __CLIENTREMOTINGPROCESSOR_H__
+#define __CLIENTREMOTINGPROCESSOR_H__
+
+#include "TcpRequestProcessor.h"
+
+namespace rmq
+{
+ class MQClientFactory;
+ class RemotingCommand;
+
+ class ClientRemotingProcessor : public TcpRequestProcessor
+ {
+ public:
+ ClientRemotingProcessor(MQClientFactory* pMQClientFactory);
+
+ RemotingCommand* processRequest(TcpTransport* pTts, RemotingCommand* pRequest);
+ RemotingCommand* checkTransactionState(TcpTransport* pTts, RemotingCommand* pRequest);
+ RemotingCommand* notifyConsumerIdsChanged(TcpTransport* pTts, RemotingCommand* pRequest);
+ RemotingCommand* resetOffset(TcpTransport* pTts, RemotingCommand* pRequest);
+ RemotingCommand* getConsumeStatus(TcpTransport* pTts, RemotingCommand* pRequest);
+ RemotingCommand* getConsumerRunningInfo(TcpTransport* pTts, RemotingCommand* pRequest);
+ RemotingCommand* consumeMessageDirectly(TcpTransport* pTts, RemotingCommand* pRequest);
+
+ private:
+ MQClientFactory* m_pMQClientFactory;
+ };
+}
+
+#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/CommunicationMode.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/CommunicationMode.h b/rocketmq-client4cpp/src/CommunicationMode.h
new file mode 100755
index 0000000..43b2941
--- /dev/null
+++ b/rocketmq-client4cpp/src/CommunicationMode.h
@@ -0,0 +1,34 @@
+/**
+ * Copyright (C) 2013 kangliqiang ,kangliq@163.com
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __COMMUNICATIONMODE_H__
+#define __COMMUNICATIONMODE_H__
+
+namespace rmq
+{
+ /**
+ * Communication Mode
+ *
+ */
+ enum CommunicationMode
+ {
+ SYNC,
+ ASYNC,
+ ONEWAY
+ };
+}
+
+#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/FindBrokerResult.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/FindBrokerResult.h b/rocketmq-client4cpp/src/FindBrokerResult.h
new file mode 100644
index 0000000..51a9845
--- /dev/null
+++ b/rocketmq-client4cpp/src/FindBrokerResult.h
@@ -0,0 +1,28 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+#ifndef __FINDBROKERRESULT_H__
+#define __FINDBROKERRESULT_H__
+
+namespace rmq
+{
+ typedef struct
+ {
+ std::string brokerAddr;
+ bool slave;
+ } FindBrokerResult;
+}
+
+#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/MQAdminImpl.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/MQAdminImpl.cpp b/rocketmq-client4cpp/src/MQAdminImpl.cpp
new file mode 100755
index 0000000..2a6b597
--- /dev/null
+++ b/rocketmq-client4cpp/src/MQAdminImpl.cpp
@@ -0,0 +1,295 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#include <list>
+#include "SocketUtil.h"
+#include "MQAdminImpl.h"
+#include "MQClientFactory.h"
+#include "MQClientAPIImpl.h"
+#include "MQClientException.h"
+#include "TopicConfig.h"
+#include "TopicPublishInfo.h"
+#include "MessageId.h"
+#include "MessageDecoder.h"
+
+namespace rmq
+{
+
+
+MQAdminImpl::MQAdminImpl(MQClientFactory* pMQClientFactory)
+{
+ m_pMQClientFactory = pMQClientFactory;
+}
+
+MQAdminImpl::~MQAdminImpl()
+{
+
+}
+
+void MQAdminImpl::createTopic(const std::string& key, const std::string& newTopic,
+ int queueNum)
+{
+ return createTopic(key, newTopic, queueNum, 0);
+}
+
+
+void MQAdminImpl::createTopic(const std::string& key, const std::string& newTopic,
+ int queueNum, int topicSysFlag)
+{
+ try
+ {
+ MQClientAPIImpl* api = m_pMQClientFactory->getMQClientAPIImpl();
+ TopicRouteDataPtr topicRouteData = api->getTopicRouteInfoFromNameServer(key, 1000 * 3);
+
+ std::list<BrokerData> brokerDataList = topicRouteData->getBrokerDatas();
+ if (!brokerDataList.empty())
+ {
+ brokerDataList.sort();
+
+ MQClientException exception("", 0, "", 0);
+ bool hasException = false;
+
+ std::list<BrokerData>::iterator it = brokerDataList.begin();
+
+ for (; it != brokerDataList.end(); it++)
+ {
+ std::map<int, std::string>::iterator it1 = (*it).brokerAddrs.find(MixAll::MASTER_ID);
+ if (it1 != (*it).brokerAddrs.end())
+ {
+ std::string addr = it1->second;
+
+ TopicConfig topicConfig(newTopic);
+ topicConfig.setReadQueueNums(queueNum);
+ topicConfig.setWriteQueueNums(queueNum);
+ topicConfig.setTopicSysFlag(topicSysFlag);
+
+ try
+ {
+ api->createTopic(addr, key, topicConfig, 1000 * 3);
+ }
+ catch (MQClientException& e)
+ {
+ hasException = true;
+ exception = e;
+ }
+ }
+ }
+
+ if (hasException)
+ {
+ throw exception;
+ }
+ }
+ else
+ {
+ THROW_MQEXCEPTION(MQClientException, "Not found broker, maybe key is wrong", -1);
+ }
+ }
+ catch (MQClientException e)
+ {
+ THROW_MQEXCEPTION(MQClientException, "create new topic failed", -1);
+ }
+}
+
+std::vector<MessageQueue>* MQAdminImpl::fetchPublishMessageQueues(const std::string& topic)
+{
+ try
+ {
+ MQClientAPIImpl* api = m_pMQClientFactory->getMQClientAPIImpl();
+ TopicRouteDataPtr topicRouteData = api->getTopicRouteInfoFromNameServer(topic, 1000 * 3);
+
+ if (topicRouteData.ptr() != NULL)
+ {
+ TopicPublishInfoPtr topicPublishInfo =
+ MQClientFactory::topicRouteData2TopicPublishInfo(topic, *topicRouteData);
+ if (topicPublishInfo.ptr() != NULL && topicPublishInfo->ok())
+ {
+ std::vector<MessageQueue>* ret = new std::vector<MessageQueue>();
+ (*ret) = topicPublishInfo->getMessageQueueList();
+
+ /*
+ std::vector<MessageQueue>& mqs = ;
+ std::vector<MessageQueue>::iterator it = mqs.begin();
+ for (; it != mqs.end(); it++)
+ {
+ ret->push_back(*it);
+ }
+ */
+
+ return ret;
+ }
+ }
+ }
+ catch (MQClientException e)
+ {
+ THROW_MQEXCEPTION(MQClientException, "Can not find Message Queue for this topic" + topic, -1);
+ }
+
+ THROW_MQEXCEPTION(MQClientException, "Unknow why, Can not find Message Queue for this topic, " + topic, -1);
+}
+
+std::set<MessageQueue>* MQAdminImpl::fetchSubscribeMessageQueues(const std::string& topic)
+{
+ try
+ {
+ TopicRouteDataPtr topicRouteData =
+ m_pMQClientFactory->getMQClientAPIImpl()->getTopicRouteInfoFromNameServer(topic, 1000 * 3);
+ if (topicRouteData.ptr() != NULL)
+ {
+ std::set<MessageQueue>* mqList =
+ MQClientFactory::topicRouteData2TopicSubscribeInfo(topic, *topicRouteData);
+ if (!mqList->empty())
+ {
+ return mqList;
+ }
+ else
+ {
+ THROW_MQEXCEPTION(MQClientException, "Can not find Message Queue for this topic" + topic, -1);
+ }
+ }
+ }
+ catch (MQClientException e)
+ {
+ THROW_MQEXCEPTION(MQClientException, "Can not find Message Queue for this topic" + topic, -1);
+ }
+
+ THROW_MQEXCEPTION(MQClientException, "Unknow why, Can not find Message Queue for this topic: " + topic, -1);
+}
+
+long long MQAdminImpl::searchOffset(const MessageQueue& mq, long long timestamp)
+{
+ std::string brokerAddr = m_pMQClientFactory->findBrokerAddressInPublish(mq.getBrokerName());
+ if (brokerAddr.empty())
+ {
+ m_pMQClientFactory->updateTopicRouteInfoFromNameServer(mq.getTopic());
+ brokerAddr = m_pMQClientFactory->findBrokerAddressInPublish(mq.getBrokerName());
+ }
+
+ if (!brokerAddr.empty())
+ {
+ try
+ {
+ return m_pMQClientFactory->getMQClientAPIImpl()->searchOffset(brokerAddr, mq.getTopic(),
+ mq.getQueueId(), timestamp, 1000 * 3);
+ }
+ catch (MQClientException e)
+ {
+ THROW_MQEXCEPTION(MQClientException, "Invoke Broker[" + brokerAddr + "] exception", -1);
+ }
+ }
+ THROW_MQEXCEPTION(MQClientException, "The broker[" + mq.getBrokerName() + "] not exist", -1);
+}
+
+long long MQAdminImpl::maxOffset(const MessageQueue& mq)
+{
+ std::string brokerAddr = m_pMQClientFactory->findBrokerAddressInPublish(mq.getBrokerName());
+ if (brokerAddr.empty())
+ {
+ m_pMQClientFactory->updateTopicRouteInfoFromNameServer(mq.getTopic());
+ brokerAddr = m_pMQClientFactory->findBrokerAddressInPublish(mq.getBrokerName());
+ }
+
+ if (!brokerAddr.empty())
+ {
+ try
+ {
+ return m_pMQClientFactory->getMQClientAPIImpl()->getMaxOffset(brokerAddr, mq.getTopic(),
+ mq.getQueueId(), 1000 * 3);
+ }
+ catch (MQClientException e)
+ {
+ THROW_MQEXCEPTION(MQClientException, "Invoke Broker[" + brokerAddr + "] exception", -1);
+ }
+ }
+ THROW_MQEXCEPTION(MQClientException, "The broker[" + mq.getBrokerName() + "] not exist", -1);
+}
+
+long long MQAdminImpl::minOffset(const MessageQueue& mq)
+{
+ std::string brokerAddr = m_pMQClientFactory->findBrokerAddressInPublish(mq.getBrokerName());
+ if (brokerAddr.empty())
+ {
+ m_pMQClientFactory->updateTopicRouteInfoFromNameServer(mq.getTopic());
+ brokerAddr = m_pMQClientFactory->findBrokerAddressInPublish(mq.getBrokerName());
+ }
+
+ if (!brokerAddr.empty())
+ {
+ try
+ {
+ return m_pMQClientFactory->getMQClientAPIImpl()->getMinOffset(brokerAddr, mq.getTopic(),
+ mq.getQueueId(), 1000 * 3);
+ }
+ catch (MQClientException e)
+ {
+ THROW_MQEXCEPTION(MQClientException, "Invoke Broker[" + brokerAddr + "] exception", -1);
+ }
+ }
+
+ THROW_MQEXCEPTION(MQClientException, "The broker[" + mq.getBrokerName() + "] not exist", -1);
+}
+
+long long MQAdminImpl::earliestMsgStoreTime(const MessageQueue& mq)
+{
+ std::string brokerAddr = m_pMQClientFactory->findBrokerAddressInPublish(mq.getBrokerName());
+ if (brokerAddr.empty())
+ {
+ m_pMQClientFactory->updateTopicRouteInfoFromNameServer(mq.getTopic());
+ brokerAddr = m_pMQClientFactory->findBrokerAddressInPublish(mq.getBrokerName());
+ }
+
+ if (!brokerAddr.empty())
+ {
+ try
+ {
+ return m_pMQClientFactory->getMQClientAPIImpl()->getEarliestMsgStoretime(brokerAddr,
+ mq.getTopic(), mq.getQueueId(), 1000 * 3);
+ }
+ catch (MQClientException e)
+ {
+ THROW_MQEXCEPTION(MQClientException, "Invoke Broker[" + brokerAddr + "] exception", -1);
+ }
+ }
+
+ THROW_MQEXCEPTION(MQClientException, "The broker[" + mq.getBrokerName() + "] not exist", -1);
+}
+
+MessageExt* MQAdminImpl::viewMessage(const std::string& msgId)
+{
+ try
+ {
+ MessageId messageId = MessageDecoder::decodeMessageId(msgId);
+ return m_pMQClientFactory->getMQClientAPIImpl()->viewMessage(
+ socketAddress2String(messageId.getAddress()), messageId.getOffset(), 1000 * 3);
+ }
+ catch (UnknownHostException e)
+ {
+ THROW_MQEXCEPTION(MQClientException, "message id illegal", -1);
+ }
+}
+
+QueryResult MQAdminImpl::queryMessage(const std::string& topic,
+ const std::string& key,
+ int maxNum, long long begin, long long end)
+{
+ //TODO
+ std::list<MessageExt*> messageList;
+ QueryResult result(0, messageList);
+
+ return result;
+}
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/MQAdminImpl.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/MQAdminImpl.h b/rocketmq-client4cpp/src/MQAdminImpl.h
new file mode 100755
index 0000000..907d61e
--- /dev/null
+++ b/rocketmq-client4cpp/src/MQAdminImpl.h
@@ -0,0 +1,63 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#ifndef __MQADMINIMPL_H__
+#define __MQADMINIMPL_H__
+
+#include <string>
+#include <list>
+#include <set>
+#include <vector>
+
+#include "MessageExt.h"
+#include "QueryResult.h"
+
+namespace rmq
+{
+ class MQClientFactory;
+ class MessageQueue;
+
+ class MQAdminImpl
+ {
+ public:
+ MQAdminImpl(MQClientFactory* pMQClientFactory);
+ ~MQAdminImpl();
+
+ void createTopic(const std::string& key, const std::string& newTopic, int queueNum);
+ void createTopic(const std::string& key, const std::string& newTopic, int queueNum, int topicSysFlag);
+
+ std::vector<MessageQueue>* fetchPublishMessageQueues(const std::string& topic);
+ std::set<MessageQueue>* fetchSubscribeMessageQueues(const std::string& topic);
+ long long searchOffset(const MessageQueue& mq, long long timestamp);
+ long long maxOffset(const MessageQueue& mq);
+ long long minOffset(const MessageQueue& mq);
+
+ long long earliestMsgStoreTime(const MessageQueue& mq);
+
+ MessageExt* viewMessage(const std::string& msgId);
+
+ QueryResult queryMessage(const std::string& topic,
+ const std::string& key,
+ int maxNum,
+ long long begin,
+ long long end);
+
+ private:
+ MQClientFactory* m_pMQClientFactory;
+ };
+}
+
+#endif
|