This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-cpp.git
The following commit(s) were added to refs/heads/master by this push:
new 8735072 feat(apis):refactor apis for CPP styles (#236)
8735072 is described below
commit 87350727950d1ece9ed3ffab414bbe0f673eeaac
Author: dinglei <libya_003@163.com>
AuthorDate: Fri Feb 7 19:41:31 2020 +0800
feat(apis):refactor apis for CPP styles (#236)
* feat(apis):refactor apis for CPP styles
---
include/Arg_helper.h | 2 -
include/AsyncCallback.h | 4 --
include/DefaultMQProducer.h | 63 ++++++++++++-----------
include/DefaultMQPullConsumer.h | 67 +++++++++++++-----------
include/DefaultMQPushConsumer.h | 94 +++++++++++++++++-----------------
include/MQClient.h | 4 --
include/MQClientException.h | 3 --
include/MQMessage.h | 2 -
include/MQMessageExt.h | 2 -
include/MQMessageListener.h | 22 --------
include/MQMessageQueue.h | 4 --
include/MQSelector.h | 4 +-
include/MQueueListener.h | 2 -
include/PullResult.h | 3 --
include/SendResult.h | 5 +-
include/TransactionMQProducer.h | 25 ++-------
src/consumer/DefaultMQPullConsumer.cpp | 4 +-
src/consumer/DefaultMQPushConsumer.cpp | 7 +--
test/src/message/MQMessageIdTest.cpp | 5 +-
test/src/message/MQMessageTest.cpp | 16 ++++++
20 files changed, 147 insertions(+), 191 deletions(-)
diff --git a/include/Arg_helper.h b/include/Arg_helper.h
index bda4fce..bc28962 100644
--- a/include/Arg_helper.h
+++ b/include/Arg_helper.h
@@ -23,7 +23,6 @@
#include "RocketMQClient.h"
namespace rocketmq {
-//<!***************************************************************************
class ROCKETMQCLIENT_API Arg_helper {
public:
Arg_helper(int argc, char* argv[]);
@@ -36,7 +35,6 @@ class ROCKETMQCLIENT_API Arg_helper {
std::vector<std::string> m_args;
};
-//<!***************************************************************************
} // namespace rocketmq
#endif //<!_ARG_HELPER_H_;
diff --git a/include/AsyncCallback.h b/include/AsyncCallback.h
index d9dbfe3..8ebac1d 100644
--- a/include/AsyncCallback.h
+++ b/include/AsyncCallback.h
@@ -24,9 +24,7 @@
#include "SendResult.h"
namespace rocketmq {
-//<!***************************************************************************
struct AsyncCallback {};
-//<!***************************************************************************
typedef enum sendCallbackType { noAutoDeleteSendCallback = 0, autoDeleteSendCallback = 1
} sendCallbackType;
class ROCKETMQCLIENT_API SendCallback : public AsyncCallback {
@@ -46,13 +44,11 @@ class ROCKETMQCLIENT_API AutoDeleteSendCallBack : public SendCallback
{
virtual sendCallbackType getSendCallbackType() { return autoDeleteSendCallback; }
};
-//<!************************************************************************
class ROCKETMQCLIENT_API PullCallback : public AsyncCallback {
public:
virtual ~PullCallback() {}
virtual void onSuccess(MQMessageQueue& mq, PullResult& result, bool bProducePullRequest)
= 0;
virtual void onException(MQException& e) = 0;
};
-//<!***************************************************************************
} // namespace rocketmq
#endif
diff --git a/include/DefaultMQProducer.h b/include/DefaultMQProducer.h
index a96a59b..0b938ac 100644
--- a/include/DefaultMQProducer.h
+++ b/include/DefaultMQProducer.h
@@ -28,7 +28,6 @@
namespace rocketmq {
class DefaultMQProducerImpl;
-//<!***************************************************************************
class ROCKETMQCLIENT_API DefaultMQProducer {
public:
DefaultMQProducer(const std::string& groupname);
@@ -54,11 +53,40 @@ class ROCKETMQCLIENT_API DefaultMQProducer {
virtual void sendOneway(MQMessage& msg, const MQMessageQueue& mq);
virtual void sendOneway(MQMessage& msg, MessageQueueSelector* selector, void* arg);
+ const std::string& getNamesrvAddr() const;
+ void setNamesrvAddr(const std::string& namesrvAddr);
+
+ void setSessionCredentials(const std::string& accessKey,
+ const std::string& secretKey,
+ const std::string& accessChannel);
+ const SessionCredentials& getSessionCredentials() const;
+
+ const std::string& getNamesrvDomain() const;
+ void setNamesrvDomain(const std::string& namesrvDomain);
+
+ const std::string& getNameSpace() const;
+ void setNameSpace(const std::string& nameSpace);
+
+ const std::string& getGroupName() const;
+ void setGroupName(const std::string& groupname);
+
+ const std::string& getInstanceName() const;
+ void setInstanceName(const std::string& instanceName);
+
+ /**
+ * Log configuration interface, default LOG_LEVEL is LOG_LEVEL_INFO, default
+ * log file num is 3, each log size is 100M
+ **/
+ void setLogLevel(elogLevel inputLevel);
+ elogLevel getLogLevel();
+ void setLogPath(const std::string& logPath);
+ void setLogFileSizeAndNum(int fileNum, long perFileSize); // perFileSize is MB unit
+
int getSendMsgTimeout() const;
void setSendMsgTimeout(int sendMsgTimeout);
/*
- * if msgBody size is large than m_compressMsgBodyOverHowmuch
+ * If msgBody size is large than compressMsgBodyOverHowmuch
* rocketmq cpp will compress msgBody according to compressLevel
*/
int getCompressMsgBodyOverHowmuch() const;
@@ -66,35 +94,16 @@ class ROCKETMQCLIENT_API DefaultMQProducer {
int getCompressLevel() const;
void setCompressLevel(int compressLevel);
- // if msgbody size larger than maxMsgBodySize, exception will be throwed
int getMaxMessageSize() const;
void setMaxMessageSize(int maxMessageSize);
- // set msg max retry times, default retry times is 5
int getRetryTimes() const;
void setRetryTimes(int times);
int getRetryTimes4Async() const;
void setRetryTimes4Async(int times);
- const std::string& getNamesrvAddr() const;
- void setNamesrvAddr(const std::string& namesrvAddr);
- const std::string& getNamesrvDomain() const;
- void setNamesrvDomain(const std::string& namesrvDomain);
- const std::string& getInstanceName() const;
- void setInstanceName(const std::string& instanceName);
- // nameSpace
- const std::string& getNameSpace() const;
- void setNameSpace(const std::string& nameSpace);
- const std::string& getGroupName() const;
- void setGroupName(const std::string& groupname);
-
- // log configuration interface, default LOG_LEVEL is LOG_LEVEL_INFO, default
- // log file num is 3, each log size is 100M
- void setLogLevel(elogLevel inputLevel);
- elogLevel getLogLevel();
- void setLogFileSizeAndNum(int fileNum, long perFileSize); // perFileSize is MB unit
- /** set TcpTransport pull thread num, which dermine the num of threads to
+ /** Set TcpTransport pull thread num, which dermine the num of threads to
* distribute network data,
* 1. its default value is CPU num, it must be setted before producer/consumer
* start, minimum value is CPU num;
@@ -106,7 +115,7 @@ class ROCKETMQCLIENT_API DefaultMQProducer {
void setTcpTransportPullThreadNum(int num);
const int getTcpTransportPullThreadNum() const;
- /** timeout of tcp connect, it is same meaning for both producer and consumer;
+ /** Timeout of tcp connect, it is same meaning for both producer and consumer;
* 1. default value is 3000ms
* 2. input parameter could only be milliSecond, suggestion value is
* 1000-3000ms;
@@ -114,7 +123,7 @@ class ROCKETMQCLIENT_API DefaultMQProducer {
void setTcpTransportConnectTimeout(uint64_t timeout); // ms
const uint64_t getTcpTransportConnectTimeout() const;
- /** timeout of tryLock tcpTransport before sendMsg/pullMsg, if timeout,
+ /** Timeout of tryLock tcpTransport before sendMsg/pullMsg, if timeout,
* returns NULL
* 1. paremeter unit is ms, default value is 3000ms, the minimun value is 1000ms
* suggestion value is 3000ms;
@@ -127,14 +136,8 @@ class ROCKETMQCLIENT_API DefaultMQProducer {
void setUnitName(std::string unitName);
const std::string& getUnitName() const;
- void setSessionCredentials(const std::string& accessKey,
- const std::string& secretKey,
- const std::string& accessChannel);
- const SessionCredentials& getSessionCredentials() const;
-
private:
DefaultMQProducerImpl* impl;
};
-//<!***************************************************************************
} // namespace rocketmq
#endif
diff --git a/include/DefaultMQPullConsumer.h b/include/DefaultMQPullConsumer.h
index 552bbab..203578f 100644
--- a/include/DefaultMQPullConsumer.h
+++ b/include/DefaultMQPullConsumer.h
@@ -29,55 +29,50 @@
#include "MQueueListener.h"
#include "PullResult.h"
#include "RocketMQClient.h"
+#include "SessionCredentials.h"
namespace rocketmq {
class SubscriptionData;
class DefaultMQPullConsumerImpl;
-//<!***************************************************************************
class ROCKETMQCLIENT_API DefaultMQPullConsumer {
public:
DefaultMQPullConsumer(const std::string& groupname);
virtual ~DefaultMQPullConsumer();
- //<!begin mqadmin;
virtual void start();
virtual void shutdown();
- //<!end mqadmin;
+
const std::string& getNamesrvAddr() const;
void setNamesrvAddr(const std::string& namesrvAddr);
+
+ void setSessionCredentials(const std::string& accessKey,
+ const std::string& secretKey,
+ const std::string& accessChannel);
+ const SessionCredentials& getSessionCredentials() const;
+
const std::string& getNamesrvDomain() const;
void setNamesrvDomain(const std::string& namesrvDomain);
+
const std::string& getInstanceName() const;
void setInstanceName(const std::string& instanceName);
- // nameSpace
+
const std::string& getNameSpace() const;
void setNameSpace(const std::string& nameSpace);
+
const std::string& getGroupName() const;
void setGroupName(const std::string& groupname);
- // log configuration interface, default LOG_LEVEL is LOG_LEVEL_INFO, default
- // log file num is 3, each log size is 100M
+ /**
+ * Log configuration interface, default LOG_LEVEL is LOG_LEVEL_INFO, default
+ * log file num is 3, each log size is 100M
+ **/
void setLogLevel(elogLevel inputLevel);
elogLevel getLogLevel();
+ void setLogPath(const std::string& logPath);
void setLogFileSizeAndNum(int fileNum, long perFileSize); // perFileSize is MB unit
- void setSessionCredentials(const std::string& accessKey,
- const std::string& secretKey,
- const std::string& accessChannel);
- const SessionCredentials& getSessionCredentials() const;
- //<!begin MQConsumer
virtual void fetchSubscribeMessageQueues(const std::string& topic, std::vector<MQMessageQueue>&
mqs);
- virtual void persistConsumerOffset();
- virtual void persistConsumerOffsetByResetOffset();
- virtual void updateTopicSubscribeInfo(const std::string& topic, std::vector<MQMessageQueue>&
info);
- virtual ConsumeType getConsumeType();
- virtual ConsumeFromWhere getConsumeFromWhere();
- virtual void getSubscriptions(std::vector<SubscriptionData>&);
- virtual void updateConsumeOffset(const MQMessageQueue& mq, int64 offset);
- virtual void removeConsumeOffset(const MQMessageQueue& mq);
- //<!end MQConsumer;
- void registerMessageQueueListener(const std::string& topic, MQueueListener* pListener);
/**
* Pull message from specified queue, if no msg in queue, return directly
*
@@ -102,7 +97,7 @@ class ROCKETMQCLIENT_API DefaultMQPullConsumer {
PullCallback* pPullCallback);
/**
- * pull msg from specified queue, if no msg, broker will suspend the pull request 20s
+ * Pull msg from specified queue, if no msg, broker will suspend the pull request 20s
*
* @param mq
* specify the pulled queue
@@ -117,24 +112,34 @@ class ROCKETMQCLIENT_API DefaultMQPullConsumer {
* @return
* accroding to PullResult
*/
- PullResult pullBlockIfNotFound(const MQMessageQueue& mq, const std::string& subExpression,
int64 offset, int maxNums);
- void pullBlockIfNotFound(const MQMessageQueue& mq,
- const std::string& subExpression,
- int64 offset,
- int maxNums,
- PullCallback* pPullCallback);
+ virtual PullResult pullBlockIfNotFound(const MQMessageQueue& mq,
+ const std::string& subExpression,
+ int64 offset,
+ int maxNums);
+ virtual void pullBlockIfNotFound(const MQMessageQueue& mq,
+ const std::string& subExpression,
+ int64 offset,
+ int maxNums,
+ PullCallback* pPullCallback);
+
+ void persistConsumerOffset();
+ void persistConsumerOffsetByResetOffset();
+ void updateTopicSubscribeInfo(const std::string& topic, std::vector<MQMessageQueue>&
info);
+ ConsumeFromWhere getConsumeFromWhere();
+ void getSubscriptions(std::vector<SubscriptionData>&);
+ void updateConsumeOffset(const MQMessageQueue& mq, int64 offset);
+ void removeConsumeOffset(const MQMessageQueue& mq);
+
+ void registerMessageQueueListener(const std::string& topic, MQueueListener* pListener);
int64 fetchConsumeOffset(const MQMessageQueue& mq, bool fromStore);
void fetchMessageQueuesInBalance(const std::string& topic, std::vector<MQMessageQueue>
mqs);
- // temp persist consumer offset interface, only valid with
- // RemoteBrokerOffsetStore, updateConsumeOffset should be called before.
void persistConsumerOffset4PullConsumer(const MQMessageQueue& mq);
private:
DefaultMQPullConsumerImpl* impl;
};
-//<!***************************************************************************
} // namespace rocketmq
#endif
diff --git a/include/DefaultMQPushConsumer.h b/include/DefaultMQPushConsumer.h
index cd4e599..686f6e7 100644
--- a/include/DefaultMQPushConsumer.h
+++ b/include/DefaultMQPushConsumer.h
@@ -24,78 +24,80 @@
#include "MQClient.h"
#include "MQMessageListener.h"
#include "MQMessageQueue.h"
+#include "SessionCredentials.h"
namespace rocketmq {
class DefaultMQPushConsumerImpl;
-//<!***************************************************************************
class ROCKETMQCLIENT_API DefaultMQPushConsumer {
public:
DefaultMQPushConsumer(const std::string& groupname);
virtual ~DefaultMQPushConsumer();
- //<!begin mqadmin;
virtual void start();
virtual void shutdown();
- virtual ConsumeType getConsumeType();
- virtual ConsumeFromWhere getConsumeFromWhere();
- void setConsumeFromWhere(ConsumeFromWhere consumeFromWhere);
-
- void registerMessageListener(MQMessageListener* pMessageListener);
- MessageListenerType getMessageListenerType();
- void subscribe(const std::string& topic, const std::string& subExpression);
-
- /*
- for orderly consume, set the pull num of message size by each pullMsg,
- default value is 1;
- */
- void setConsumeMessageBatchMaxSize(int consumeMessageBatchMaxSize);
- int getConsumeMessageBatchMaxSize() const;
+ const std::string& getNamesrvAddr() const;
+ void setNamesrvAddr(const std::string& namesrvAddr);
- /*
- set consuming thread count, default value is cpu cores
- */
- void setConsumeThreadCount(int threadCount);
- int getConsumeThreadCount() const;
- void setMaxReconsumeTimes(int maxReconsumeTimes);
- int getMaxReconsumeTimes() const;
+ void setSessionCredentials(const std::string& accessKey,
+ const std::string& secretKey,
+ const std::string& accessChannel);
+ const SessionCredentials& getSessionCredentials() const;
- /*
- set pullMsg thread count, default value is cpu cores
- */
- void setPullMsgThreadPoolCount(int threadCount);
- int getPullMsgThreadPoolCount() const;
+ void subscribe(const std::string& topic, const std::string& subExpression);
- /*
- set max cache msg size perQueue in memory if consumer could not consume msgs
- immediately
- default maxCacheMsgSize perQueue is 1000, set range is:1~65535
- */
- void setMaxCacheMsgSizePerQueue(int maxCacheSize);
- int getMaxCacheMsgSizePerQueue() const;
+ void registerMessageListener(MQMessageListener* pMessageListener);
+ MessageListenerType getMessageListenerType();
MessageModel getMessageModel() const;
void setMessageModel(MessageModel messageModel);
- const std::string& getNamesrvAddr() const;
- void setNamesrvAddr(const std::string& namesrvAddr);
+
+ void setConsumeFromWhere(ConsumeFromWhere consumeFromWhere);
+ ConsumeFromWhere getConsumeFromWhere();
+
const std::string& getNamesrvDomain() const;
void setNamesrvDomain(const std::string& namesrvDomain);
+
const std::string& getInstanceName() const;
void setInstanceName(const std::string& instanceName);
- // nameSpace
+
const std::string& getNameSpace() const;
void setNameSpace(const std::string& nameSpace);
+
const std::string& getGroupName() const;
void setGroupName(const std::string& groupname);
- // log configuration interface, default LOG_LEVEL is LOG_LEVEL_INFO, default
- // log file num is 3, each log size is 100M
+ /**
+ * Log configuration interface, default LOG_LEVEL is LOG_LEVEL_INFO, default
+ * log file num is 3, each log size is 100M
+ **/
void setLogLevel(elogLevel inputLevel);
elogLevel getLogLevel();
+ void setLogPath(const std::string& logPath);
void setLogFileSizeAndNum(int fileNum, long perFileSize); // perFileSize is MB unit
- /** set TcpTransport pull thread num, which dermine the num of threads to
+ void setConsumeThreadCount(int threadCount);
+ int getConsumeThreadCount() const;
+
+ void setMaxReconsumeTimes(int maxReconsumeTimes);
+ int getMaxReconsumeTimes() const;
+
+ void setPullMsgThreadPoolCount(int threadCount);
+ int getPullMsgThreadPoolCount() const;
+
+ void setConsumeMessageBatchMaxSize(int consumeMessageBatchMaxSize);
+ int getConsumeMessageBatchMaxSize() const;
+
+ /**
+ * Set max cache msg size perQueue in memory if consumer could not consume msgs
+ * immediately
+ * default maxCacheMsgSize perQueue is 1000, set range is:1~65535
+ **/
+ void setMaxCacheMsgSizePerQueue(int maxCacheSize);
+ int getMaxCacheMsgSizePerQueue() const;
+
+ /** Set TcpTransport pull thread num, which dermine the num of threads to
* distribute network data,
* 1. its default value is CPU num, it must be setted before producer/consumer
* start, minimum value is CPU num;
@@ -107,7 +109,7 @@ class ROCKETMQCLIENT_API DefaultMQPushConsumer {
void setTcpTransportPullThreadNum(int num);
const int getTcpTransportPullThreadNum() const;
- /** timeout of tcp connect, it is same meaning for both producer and consumer;
+ /** Timeout of tcp connect, it is same meaning for both producer and consumer;
* 1. default value is 3000ms
* 2. input parameter could only be milliSecond, suggestion value is
* 1000-3000ms;
@@ -115,7 +117,7 @@ class ROCKETMQCLIENT_API DefaultMQPushConsumer {
void setTcpTransportConnectTimeout(uint64_t timeout); // ms
const uint64_t getTcpTransportConnectTimeout() const;
- /** timeout of tryLock tcpTransport before sendMsg/pullMsg, if timeout,
+ /** Timeout of tryLock tcpTransport before sendMsg/pullMsg, if timeout,
* returns NULL
* 1. paremeter unit is ms, default value is 3000ms, the minimun value is 1000ms
* suggestion value is 3000ms;
@@ -127,15 +129,11 @@ class ROCKETMQCLIENT_API DefaultMQPushConsumer {
void setUnitName(std::string unitName);
const std::string& getUnitName() const;
+
void setAsyncPull(bool asyncFlag);
- void setSessionCredentials(const std::string& accessKey,
- const std::string& secretKey,
- const std::string& accessChannel);
- const SessionCredentials& getSessionCredentials() const;
private:
DefaultMQPushConsumerImpl* impl;
};
-//<!***************************************************************************
} // namespace rocketmq
#endif
diff --git a/include/MQClient.h b/include/MQClient.h
index d449aab..db48d20 100644
--- a/include/MQClient.h
+++ b/include/MQClient.h
@@ -17,10 +17,6 @@
#ifndef __MQADMIN_H__
#define __MQADMIN_H__
-#include "MQMessageExt.h"
-#include "MQMessageQueue.h"
-#include "RocketMQClient.h"
-#include "SessionCredentials.h"
namespace rocketmq {
diff --git a/include/MQClientException.h b/include/MQClientException.h
index ad642cb..883cd64 100644
--- a/include/MQClientException.h
+++ b/include/MQClientException.h
@@ -22,11 +22,9 @@
#include <ostream>
#include <sstream>
#include <string>
-#include "CCommon.h"
#include "RocketMQClient.h"
namespace rocketmq {
-//<!***************************************************************************
class ROCKETMQCLIENT_API MQException : public std::exception {
public:
MQException(const std::string& msg, int error, const char* file, int line) throw()
@@ -86,6 +84,5 @@ DEFINE_MQCLIENTEXCEPTION(UnknownHostException)
#define THROW_MQEXCEPTION(e, msg, err) throw e(msg, err, __FILE__, __LINE__)
#define NEW_MQEXCEPTION(e, msg, err) e(msg, err, __FILE__, __LINE__)
-//<!***************************************************************************
} // namespace rocketmq
#endif
diff --git a/include/MQMessage.h b/include/MQMessage.h
index 70fab36..dd2f7c0 100644
--- a/include/MQMessage.h
+++ b/include/MQMessage.h
@@ -24,7 +24,6 @@
#include "RocketMQClient.h"
namespace rocketmq {
-//<!***************************************************************************
class ROCKETMQCLIENT_API MQMessage {
public:
MQMessage();
@@ -138,6 +137,5 @@ class ROCKETMQCLIENT_API MQMessage {
std::string m_transactionId;
std::map<std::string, std::string> m_properties;
};
-//<!***************************************************************************
} // namespace rocketmq
#endif
diff --git a/include/MQMessageExt.h b/include/MQMessageExt.h
index 8970282..194043e 100644
--- a/include/MQMessageExt.h
+++ b/include/MQMessageExt.h
@@ -29,7 +29,6 @@
namespace rocketmq {
//<!message extend class, which was generated on broker;
-//<!***************************************************************************
class ROCKETMQCLIENT_API MQMessageExt : public MQMessage {
public:
MQMessageExt();
@@ -112,6 +111,5 @@ class ROCKETMQCLIENT_API MQMessageExt : public MQMessage {
std::string m_msgId;
std::string m_offsetMsgId;
};
-//<!***************************************************************************
} // namespace rocketmq
#endif
diff --git a/include/MQMessageListener.h b/include/MQMessageListener.h
index dee7c7e..0342b60 100644
--- a/include/MQMessageListener.h
+++ b/include/MQMessageListener.h
@@ -30,29 +30,8 @@ enum ConsumeStatus {
RECONSUME_LATER
};
-/*enum ConsumeOrderlyStatus
-{*/
-/**
- * Success consumption
- */
-// SUCCESS,
-/**
- * Rollback consumption(only for binlog consumption)
- */
-// ROLLBACK,
-/**
- * Commit offset(only for binlog consumption)
- */
-// COMMIT,
-/**
- * Suspend current queue a moment
- */
-// SUSPEND_CURRENT_QUEUE_A_MOMENT
-/*};*/
-
enum MessageListenerType { messageListenerDefaultly = 0, messageListenerOrderly = 1, messageListenerConcurrently
= 2 };
-//<!***************************************************************************
class ROCKETMQCLIENT_API MQMessageListener {
public:
virtual ~MQMessageListener() {}
@@ -74,6 +53,5 @@ class ROCKETMQCLIENT_API MessageListenerConcurrently : public MQMessageListener
virtual MessageListenerType getMessageListenerType() { return messageListenerConcurrently;
}
};
-//<!***************************************************************************
} // namespace rocketmq
#endif
diff --git a/include/MQMessageQueue.h b/include/MQMessageQueue.h
index aa80035..0879a94 100644
--- a/include/MQMessageQueue.h
+++ b/include/MQMessageQueue.h
@@ -23,9 +23,6 @@
#include "RocketMQClient.h"
namespace rocketmq {
-//<!************************************************************************/
-//<!* MQ(T,B,ID);
-//<!************************************************************************/
class ROCKETMQCLIENT_API MQMessageQueue {
public:
MQMessageQueue();
@@ -58,6 +55,5 @@ class ROCKETMQCLIENT_API MQMessageQueue {
std::string m_brokerName;
int m_queueId;
};
-//<!***************************************************************************
} // namespace rocketmq
#endif
diff --git a/include/MQSelector.h b/include/MQSelector.h
index bba5781..46e548c 100644
--- a/include/MQSelector.h
+++ b/include/MQSelector.h
@@ -21,12 +21,10 @@
#include "RocketMQClient.h"
namespace rocketmq {
-//<!***************************************************************************
class ROCKETMQCLIENT_API MessageQueueSelector {
public:
virtual ~MessageQueueSelector() {}
virtual MQMessageQueue select(const std::vector<MQMessageQueue>& mqs, const MQMessage&
msg, void* arg) = 0;
};
-//<!***************************************************************************
} // namespace rocketmq
-#endif //<! _MQSELECTOR_H_
+#endif
diff --git a/include/MQueueListener.h b/include/MQueueListener.h
index eef9590..f5eac41 100644
--- a/include/MQueueListener.h
+++ b/include/MQueueListener.h
@@ -21,7 +21,6 @@
#include "RocketMQClient.h"
namespace rocketmq {
-//<!***************************************************************************
class ROCKETMQCLIENT_API MQueueListener {
public:
virtual ~MQueueListener() {}
@@ -29,6 +28,5 @@ class ROCKETMQCLIENT_API MQueueListener {
std::vector<MQMessageQueue>& mqAll,
std::vector<MQMessageQueue>& mqDivided) = 0;
};
-//<!***************************************************************************
} // namespace rocketmq
#endif
diff --git a/include/PullResult.h b/include/PullResult.h
index 61eedf0..565058b 100644
--- a/include/PullResult.h
+++ b/include/PullResult.h
@@ -22,7 +22,6 @@
#include "RocketMQClient.h"
namespace rocketmq {
-//<!***************************************************************************
enum PullStatus {
FOUND,
NO_NEW_MSG,
@@ -33,7 +32,6 @@ enum PullStatus {
static const char* EnumStrings[] = {"FOUND", "NO_NEW_MSG", "NO_MATCHED_MSG", "OFFSET_ILLEGAL",
"BROKER_TIMEOUT"};
-//<!***************************************************************************
class ROCKETMQCLIENT_API PullResult {
public:
PullResult();
@@ -63,6 +61,5 @@ class ROCKETMQCLIENT_API PullResult {
int64 maxOffset;
std::vector<MQMessageExt> msgFoundList;
};
-//<!***************************************************************************
} // namespace rocketmq
#endif
diff --git a/include/SendResult.h b/include/SendResult.h
index 870d03b..94fd43b 100644
--- a/include/SendResult.h
+++ b/include/SendResult.h
@@ -21,11 +21,9 @@
#include "RocketMQClient.h"
namespace rocketmq {
-//<!***************************************************************************
-//<!all to Master;
+
enum SendStatus { SEND_OK, SEND_FLUSH_DISK_TIMEOUT, SEND_FLUSH_SLAVE_TIMEOUT, SEND_SLAVE_NOT_AVAILABLE
};
-//<!***************************************************************************
class ROCKETMQCLIENT_API SendResult {
public:
SendResult();
@@ -59,6 +57,5 @@ class ROCKETMQCLIENT_API SendResult {
std::string m_transactionId;
};
-//<!***************************************************************************
} // namespace rocketmq
#endif
diff --git a/include/TransactionMQProducer.h b/include/TransactionMQProducer.h
index 2ba34db..2a8a792 100644
--- a/include/TransactionMQProducer.h
+++ b/include/TransactionMQProducer.h
@@ -23,6 +23,7 @@
#include "MQClient.h"
#include "MQMessage.h"
#include "MQMessageExt.h"
+#include "SessionCredentials.h"
#include "TransactionListener.h"
#include "TransactionSendResult.h"
@@ -32,15 +33,17 @@ class ROCKETMQCLIENT_API TransactionMQProducer {
public:
TransactionMQProducer(const std::string& producerGroup);
virtual ~TransactionMQProducer();
+
void start();
void shutdown();
+
const std::string& getNamesrvAddr() const;
void setNamesrvAddr(const std::string& namesrvAddr);
const std::string& getNamesrvDomain() const;
void setNamesrvDomain(const std::string& namesrvDomain);
const std::string& getInstanceName() const;
void setInstanceName(const std::string& instanceName);
- // nameSpace
+
const std::string& getNameSpace() const;
void setNameSpace(const std::string& nameSpace);
const std::string& getGroupName() const;
@@ -58,38 +61,20 @@ class ROCKETMQCLIENT_API TransactionMQProducer {
void setTcpTransportPullThreadNum(int num);
const int getTcpTransportPullThreadNum() const;
- /** timeout of tcp connect, it is same meaning for both producer and consumer;
- * 1. default value is 3000ms
- * 2. input parameter could only be milliSecond, suggestion value is
- * 1000-3000ms;
- **/
void setTcpTransportConnectTimeout(uint64_t timeout); // ms
const uint64_t getTcpTransportConnectTimeout() const;
- /** timeout of tryLock tcpTransport before sendMsg/pullMsg, if timeout,
- * returns NULL
- * 1. paremeter unit is ms, default value is 3000ms, the minimun value is 1000ms
- * suggestion value is 3000ms;
- * 2. if configured with value smaller than 1000ms, the tryLockTimeout value
- * will be setted to 1000ms
- **/
void setTcpTransportTryLockTimeout(uint64_t timeout); // ms
const uint64_t getTcpTransportTryLockTimeout() const;
- /*
- * if msgBody size is large than m_compressMsgBodyOverHowmuch
- * rocketmq cpp will compress msgBody according to compressLevel
- */
int getCompressMsgBodyOverHowmuch() const;
void setCompressMsgBodyOverHowmuch(int compressMsgBodyOverHowmuch);
int getCompressLevel() const;
void setCompressLevel(int compressLevel);
- // if msgbody size larger than maxMsgBodySize, exception will be throwed
int getMaxMessageSize() const;
void setMaxMessageSize(int maxMessageSize);
- // log configuration interface, default LOG_LEVEL is LOG_LEVEL_INFO, default
- // log file num is 3, each log size is 100M
+
void setLogLevel(elogLevel inputLevel);
elogLevel getLogLevel();
void setLogFileSizeAndNum(int fileNum, long perFileSize); // perFileSize is MB unit
diff --git a/src/consumer/DefaultMQPullConsumer.cpp b/src/consumer/DefaultMQPullConsumer.cpp
index b6998f3..0548807 100644
--- a/src/consumer/DefaultMQPullConsumer.cpp
+++ b/src/consumer/DefaultMQPullConsumer.cpp
@@ -114,9 +114,7 @@ void DefaultMQPullConsumer::persistConsumerOffsetByResetOffset() {
void DefaultMQPullConsumer::updateTopicSubscribeInfo(const std::string& topic, std::vector<MQMessageQueue>&
info) {
impl->updateTopicSubscribeInfo(topic, info);
}
-ConsumeType DefaultMQPullConsumer::getConsumeType() {
- return impl->getConsumeType();
-}
+
ConsumeFromWhere DefaultMQPullConsumer::getConsumeFromWhere() {
return impl->getConsumeFromWhere();
}
diff --git a/src/consumer/DefaultMQPushConsumer.cpp b/src/consumer/DefaultMQPushConsumer.cpp
index cede0c9..b0373c0 100644
--- a/src/consumer/DefaultMQPushConsumer.cpp
+++ b/src/consumer/DefaultMQPushConsumer.cpp
@@ -35,9 +35,10 @@ void DefaultMQPushConsumer::shutdown() {
impl->shutdown();
}
-ConsumeType DefaultMQPushConsumer::getConsumeType() {
- return impl->getConsumeType();
-}
+// ConsumeType DefaultMQPushConsumer::getConsumeType() {
+// return impl->getConsumeType();
+//}
+
ConsumeFromWhere DefaultMQPushConsumer::getConsumeFromWhere() {
return impl->getConsumeFromWhere();
}
diff --git a/test/src/message/MQMessageIdTest.cpp b/test/src/message/MQMessageIdTest.cpp
index d83de2f..6e97480 100644
--- a/test/src/message/MQMessageIdTest.cpp
+++ b/test/src/message/MQMessageIdTest.cpp
@@ -45,12 +45,15 @@ TEST(messageId, id) {
EXPECT_EQ(host, inet_addr("127.0.0.2"));
EXPECT_EQ(port, 10092);
EXPECT_EQ(id.getOffset(), 2048);
+
+ MQMessageId id2 = id;
+ EXPECT_EQ(id2.getOffset(), 2048);
}
int main(int argc, char* argv[]) {
InitGoogleMock(&argc, argv);
- testing::GTEST_FLAG(filter) = "messageId.id";
+ testing::GTEST_FLAG(filter) = "messageId.*";
int itestts = RUN_ALL_TESTS();
return itestts;
}
diff --git a/test/src/message/MQMessageTest.cpp b/test/src/message/MQMessageTest.cpp
index 100e73e..0f55cbb 100644
--- a/test/src/message/MQMessageTest.cpp
+++ b/test/src/message/MQMessageTest.cpp
@@ -70,6 +70,13 @@ TEST(message, Init) {
EXPECT_EQ(messageSix.getTags(), "tagTest");
EXPECT_EQ(messageSix.getKeys(), "testKey");
EXPECT_EQ(messageSix.getFlag(), 1);
+
+ MQMessage messageSeven = messageSix;
+ EXPECT_EQ(messageSeven.getTopic(), "test");
+ EXPECT_EQ(messageSeven.getBody(), "testBody");
+ EXPECT_EQ(messageSeven.getTags(), "tagTest");
+ EXPECT_EQ(messageSeven.getKeys(), "testKey");
+ EXPECT_EQ(messageSeven.getFlag(), 1);
}
TEST(message, info) {
@@ -145,6 +152,15 @@ TEST(message, properties) {
EXPECT_EQ(message.getProperty(MQMessage::PROPERTY_TRANSACTION_PREPARED), "false");
}
+TEST(message, Keys) {
+ MQMessage message;
+ vector<string> keys;
+ keys.push_back("abc");
+ keys.push_back("efg");
+ keys.push_back("hij");
+ message.setKeys(keys);
+ EXPECT_EQ(message.getKeys(), "abc efg hij");
+}
int main(int argc, char* argv[]) {
InitGoogleMock(&argc, argv);
|