From commits-return-1287-apmail-rocketmq-commits-archive=rocketmq.apache.org@rocketmq.incubator.apache.org Fri Apr 21 10:09:58 2017 Return-Path: X-Original-To: apmail-rocketmq-commits-archive@minotaur.apache.org Delivered-To: apmail-rocketmq-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 5196119D4B for ; Fri, 21 Apr 2017 10:09:58 +0000 (UTC) Received: (qmail 87103 invoked by uid 500); 21 Apr 2017 10:09:58 -0000 Delivered-To: apmail-rocketmq-commits-archive@rocketmq.apache.org Received: (qmail 87072 invoked by uid 500); 21 Apr 2017 10:09:58 -0000 Mailing-List: contact commits-help@rocketmq.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@rocketmq.incubator.apache.org Delivered-To: mailing list commits@rocketmq.incubator.apache.org Received: (qmail 87063 invoked by uid 99); 21 Apr 2017 10:09:58 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd1-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 21 Apr 2017 10:09:58 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd1-us-west.apache.org (ASF Mail Server at spamd1-us-west.apache.org) with ESMTP id D09BEC054A for ; Fri, 21 Apr 2017 10:09:57 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd1-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -3.721 X-Spam-Level: X-Spam-Status: No, score=-3.721 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_NUMSUBJECT=0.5, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-0.001, SPF_PASS=-0.001, URIBL_BLOCKED=0.001] autolearn=disabled Received: from mx1-lw-eu.apache.org ([10.40.0.8]) by localhost (spamd1-us-west.apache.org [10.40.0.7]) (amavisd-new, port 10024) with ESMTP id 7NqUmo6VxGL3 for ; Fri, 21 Apr 2017 10:09:53 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by 
mx1-lw-eu.apache.org (ASF Mail Server at mx1-lw-eu.apache.org) with SMTP id C2D8D60DA2 for ; Fri, 21 Apr 2017 10:09:42 +0000 (UTC) Received: (qmail 86388 invoked by uid 99); 21 Apr 2017 10:09:41 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 21 Apr 2017 10:09:41 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id AF3C3F49EF; Fri, 21 Apr 2017 10:09:41 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 8bit From: dongeforever@apache.org To: commits@rocketmq.incubator.apache.org Date: Fri, 21 Apr 2017 10:09:53 -0000 Message-Id: <5d52c66892434eb793d6243b396699f7@git.apache.org> In-Reply-To: <06749ac0370d417d8182d49ffa5ccb4b@git.apache.org> References: <06749ac0370d417d8182d49ffa5ccb4b@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [13/17] incubator-rocketmq-externals git commit: [ROCKETMQ-129] Initialized the rocketmq c++ client closes apache/incubator-rocketmq-externals#11 http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/common/PermName.cpp ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/common/PermName.cpp b/rocketmq-client4cpp/src/common/PermName.cpp new file mode 100644 index 0000000..084de79 --- /dev/null +++ b/rocketmq-client4cpp/src/common/PermName.cpp @@ -0,0 +1,63 @@ +/** +* Copyright (C) 2013 kangliqiang ,kangliq@163.com +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. 
+* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +#include "PermName.h" + +namespace rmq +{ + +int PermName::PERM_PRIORITY = 0x1 << 3; +int PermName::PERM_READ = 0x1 << 2; +int PermName::PERM_WRITE = 0x1 << 1; +int PermName::PERM_INHERIT = 0x1 << 0; + +bool PermName::isReadable(int perm) +{ + return (perm & PERM_READ) == PERM_READ; +} + +bool PermName::isWriteable(int perm) +{ + return (perm & PERM_WRITE) == PERM_WRITE; +} + +bool PermName::isInherited(int perm) +{ + return (perm & PERM_INHERIT) == PERM_INHERIT; +} + +std::string PermName::perm2String(int perm) +{ + std::string pm("---"); + if (isReadable(perm)) + { + pm.replace(0, 1, "R"); + } + + if (isWriteable(perm)) + { + pm.replace(1, 2, "W"); + } + + if (isInherited(perm)) + { + pm.replace(2, 3, "X"); + } + + return pm; +} + +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/common/PermName.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/common/PermName.h b/rocketmq-client4cpp/src/common/PermName.h new file mode 100644 index 0000000..364ddeb --- /dev/null +++ b/rocketmq-client4cpp/src/common/PermName.h @@ -0,0 +1,39 @@ +/** +* Copyright (C) 2013 kangliqiang ,kangliq@163.com +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. 
+* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +#ifndef __PERMNAME_H__ +#define __PERMNAME_H__ + +#include + +namespace rmq +{ + class PermName + { + public: + static int PERM_PRIORITY; + static int PERM_READ; + static int PERM_WRITE; + static int PERM_INHERIT; + + static bool isReadable(int perm); + static bool isWriteable(int perm); + static bool isInherited(int perm); + static std::string perm2String(int perm); + }; +} + +#endif http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/common/PullSysFlag.cpp ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/common/PullSysFlag.cpp b/rocketmq-client4cpp/src/common/PullSysFlag.cpp new file mode 100644 index 0000000..f6fc1c2 --- /dev/null +++ b/rocketmq-client4cpp/src/common/PullSysFlag.cpp @@ -0,0 +1,68 @@ +/** +* Copyright (C) 2013 kangliqiang ,kangliq@163.com +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/ + +#include "PullSysFlag.h" + +namespace rmq +{ + +int PullSysFlag::FLAG_COMMIT_OFFSET = 0x1 << 0; +int PullSysFlag::FLAG_SUSPEND = 0x1 << 1; +int PullSysFlag::FLAG_SUBSCRIPTION = 0x1 << 2; + +int PullSysFlag::buildSysFlag(bool commitOffset, bool suspend, bool subscription) +{ + int flag = 0; + + if (commitOffset) + { + flag |= FLAG_COMMIT_OFFSET; + } + + if (suspend) + { + flag |= FLAG_SUSPEND; + } + + if (subscription) + { + flag |= FLAG_SUBSCRIPTION; + } + + return flag; +} + +int PullSysFlag::clearCommitOffsetFlag(int sysFlag) +{ + return sysFlag & (~FLAG_COMMIT_OFFSET); +} + +bool PullSysFlag::hasCommitOffsetFlag(int sysFlag) +{ + return (sysFlag & FLAG_COMMIT_OFFSET) == FLAG_COMMIT_OFFSET; +} + +bool PullSysFlag::hasSuspendFlag(int sysFlag) +{ + return (sysFlag & FLAG_SUSPEND) == FLAG_SUSPEND; +} + +bool PullSysFlag::hasSubscriptionFlag(int sysFlag) +{ + return (sysFlag & FLAG_SUBSCRIPTION) == FLAG_SUBSCRIPTION; +} + +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/common/PullSysFlag.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/common/PullSysFlag.h b/rocketmq-client4cpp/src/common/PullSysFlag.h new file mode 100755 index 0000000..c19eac3 --- /dev/null +++ b/rocketmq-client4cpp/src/common/PullSysFlag.h @@ -0,0 +1,38 @@ +/** +* Copyright (C) 2013 kangliqiang ,kangliq@163.com +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/ + +#ifndef __PULLSYSFLAG_H__ +#define __PULLSYSFLAG_H__ + +namespace rmq +{ + class PullSysFlag + { + public: + static int buildSysFlag(bool commitOffset, bool suspend, bool subscription); + static int clearCommitOffsetFlag(int sysFlag); + static bool hasCommitOffsetFlag(int sysFlag); + static bool hasSuspendFlag(int sysFlag); + static bool hasSubscriptionFlag(int sysFlag); + + private: + static int FLAG_COMMIT_OFFSET; + static int FLAG_SUSPEND; + static int FLAG_SUBSCRIPTION; + }; +} + +#endif http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/common/SendResult.cpp ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/common/SendResult.cpp b/rocketmq-client4cpp/src/common/SendResult.cpp new file mode 100755 index 0000000..5263d29 --- /dev/null +++ b/rocketmq-client4cpp/src/common/SendResult.cpp @@ -0,0 +1,132 @@ +/** +* Copyright (C) 2013 kangliqiang ,kangliq@163.com +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/ +#include "SendResult.h" +#include "UtilAll.h" +#include "VirtualEnvUtil.h" + +namespace rmq +{ + +SendResult::SendResult() + : m_sendStatus(SEND_OK),m_queueOffset(0) +{ +} + +SendResult::SendResult(const SendStatus& sendStatus, + const std::string& msgId, + MessageQueue& messageQueue, + long long queueOffset, + std::string& projectGroupPrefix) + : m_sendStatus(sendStatus), + m_msgId(msgId), + m_messageQueue(messageQueue), + m_queueOffset(queueOffset) +{ + if (!UtilAll::isBlank(projectGroupPrefix)) + { + m_messageQueue.setTopic(VirtualEnvUtil::clearProjectGroup(m_messageQueue.getTopic(), + projectGroupPrefix)); + } +} + +const std::string& SendResult::getMsgId() +{ + return m_msgId; +} + +void SendResult::setMsgId(const std::string& msgId) +{ + m_msgId = msgId; +} + +SendStatus SendResult::getSendStatus() +{ + return m_sendStatus; +} + +void SendResult::setSendStatus(const SendStatus& sendStatus) +{ + m_sendStatus = sendStatus; +} + +MessageQueue& SendResult::getMessageQueue() +{ + return m_messageQueue; +} + +void SendResult::setMessageQueue(MessageQueue& messageQueue) +{ + m_messageQueue = messageQueue; +} + +long long SendResult::getQueueOffset() +{ + return m_queueOffset; +} + +void SendResult::setQueueOffset(long long queueOffset) +{ + m_queueOffset = queueOffset; +} + + +bool SendResult::hasResult() +{ + return !m_msgId.empty(); +} + + + +std::string SendResult::toString() const +{ + std::stringstream ss; + ss << "{sendStatus=" << m_sendStatus + << ",msgId=" << m_msgId + << ",messageQueue=" << m_messageQueue.toString() + << ",queueOffset=" << m_queueOffset + << "}"; + return ss.str(); +} + + +std::string SendResult::toJsonString() const +{ + std::stringstream ss; + ss << "{\"sendStatus\":\"" << m_sendStatus + << "\",\"msgId\":\"" << m_msgId + << "\",\"messageQueue\":" << m_messageQueue.toJsonString() + << ",\"queueOffset\":\"" << m_queueOffset + << "}"; + return ss.str(); +} + + + +TransactionSendResult::TransactionSendResult() +{ +} + 
+LocalTransactionState TransactionSendResult::getLocalTransactionState() +{ + return m_localTransactionState; +} + +void TransactionSendResult::setLocalTransactionState(LocalTransactionState localTransactionState) +{ + m_localTransactionState = localTransactionState; +} + +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/common/ServiceState.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/common/ServiceState.h b/rocketmq-client4cpp/src/common/ServiceState.h new file mode 100755 index 0000000..7b41add --- /dev/null +++ b/rocketmq-client4cpp/src/common/ServiceState.h @@ -0,0 +1,31 @@ +/** + * Copyright (C) 2013 kangliqiang ,kangliq@163.com + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#ifndef __SERVICESTATE_H__ +#define __SERVICESTATE_H__ + +namespace rmq +{ + enum ServiceState + { + CREATE_JUST, + RUNNING, + SHUTDOWN_ALREADY, + START_FAILED + }; +} + +#endif http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/common/ServiceThread.cpp ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/common/ServiceThread.cpp b/rocketmq-client4cpp/src/common/ServiceThread.cpp new file mode 100644 index 0000000..1abff9f --- /dev/null +++ b/rocketmq-client4cpp/src/common/ServiceThread.cpp @@ -0,0 +1,73 @@ +/** +* Copyright (C) 2013 kangliqiang ,kangliq@163.com +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +#include "ServiceThread.h" +#include "Monitor.h" +#include "ScopedLock.h" + +namespace rmq +{ + +ServiceThread::ServiceThread(const char* name) + : kpr::Thread(name), + m_notified(false), + m_stoped(false) +{ + +} + +ServiceThread::~ServiceThread() +{ + +} + +void ServiceThread::stop() +{ + m_stoped = true; + wakeup(); +} + +void ServiceThread::wakeup() +{ + kpr::ScopedLock lock(*this); + + if (!m_notified) + { + m_notified = true; + Notify(); + } +} + +void ServiceThread::waitForRunning(long interval) +{ + kpr::ScopedLock lock(*this); + if (m_notified) + { + m_notified = false; + return; + } + + try + { + Wait(interval); + } + catch (...) 
+ { + m_notified = false; + } +} + +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/common/ServiceThread.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/common/ServiceThread.h b/rocketmq-client4cpp/src/common/ServiceThread.h new file mode 100755 index 0000000..d7ec3ef --- /dev/null +++ b/rocketmq-client4cpp/src/common/ServiceThread.h @@ -0,0 +1,50 @@ +/** +* Copyright (C) 2013 kangliqiang ,kangliq@163.com +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/ + +#ifndef __SERVICETHREAD_H__ +#define __SERVICETHREAD_H__ + +#include +#include "Thread.h" +#include "Monitor.h" + +namespace rmq +{ + const long JoinTime = 90 * 1000; + + /** + * service thread base class + * + */ + class ServiceThread : public kpr::Thread, public kpr::Monitor + { + public: + ServiceThread(const char* name = NULL); + virtual ~ServiceThread(); + + virtual std::string getServiceName() = 0; + + void stop(); + void wakeup(); + void waitForRunning(long interval); + + protected: + volatile bool m_notified; + volatile bool m_stoped; + }; +} + +#endif http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/common/SubscriptionGroupConfig.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/common/SubscriptionGroupConfig.h b/rocketmq-client4cpp/src/common/SubscriptionGroupConfig.h new file mode 100755 index 0000000..12bd48a --- /dev/null +++ b/rocketmq-client4cpp/src/common/SubscriptionGroupConfig.h @@ -0,0 +1,50 @@ +/** +* Copyright (C) 2013 kangliqiang, kangliq@163.com +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/ +#ifndef __SUBSCRIPTIONGROUPCONFIG_H__ +#define __SUBSCRIPTIONGROUPCONFIG_H__ + +#include +#include "MixAll.h" + +namespace rmq +{ + class SubscriptionGroupConfig + { + public: + SubscriptionGroupConfig(const std::string& groupName) + { + this->groupName = groupName; + consumeEnable = true; + consumeFromMinEnable = true; + consumeBroadcastEnable = true; + retryQueueNums = 1; + retryMaxTimes = 5; + brokerId = MixAll::MASTER_ID; + whichBrokerWhenConsumeSlowly = 1; + } + + std::string groupName; + bool consumeEnable; + bool consumeFromMinEnable; + bool consumeBroadcastEnable; + int retryQueueNums; + int retryMaxTimes; + long brokerId; + long whichBrokerWhenConsumeSlowly; + }; +} + +#endif http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/common/TopAddressing.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/common/TopAddressing.h b/rocketmq-client4cpp/src/common/TopAddressing.h new file mode 100755 index 0000000..07b0c0c --- /dev/null +++ b/rocketmq-client4cpp/src/common/TopAddressing.h @@ -0,0 +1,54 @@ +/** +* Copyright (C) 2013 kangliqiang, kangliq@163.com +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/ +#ifndef __TOPADDRESSING_H__ +#define __TOPADDRESSING_H__ + +#include +#include +#include "SocketUtil.h" + +namespace rmq +{ + class TopAddressing + { + public: + TopAddressing() + : m_nsAddr("") + { + } + + const std::string& getNsAddr() + { + return m_nsAddr; + } + + void setNsAddr(std::string& nsAddr) + { + m_nsAddr = nsAddr; + } + + std::string fetchNSAddr() + { + + return ""; + } + + private: + std::string m_nsAddr; + }; +} + +#endif http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/common/TopicConfig.cpp ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/common/TopicConfig.cpp b/rocketmq-client4cpp/src/common/TopicConfig.cpp new file mode 100644 index 0000000..036b41c --- /dev/null +++ b/rocketmq-client4cpp/src/common/TopicConfig.cpp @@ -0,0 +1,167 @@ +/** +* Copyright (C) 2013 kangliqiang ,kangliq@163.com +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/ +#include +#include + +#include "TopicConfig.h" +#include "PermName.h" + +namespace rmq +{ + +int TopicConfig::DefaultReadQueueNums = 16; +int TopicConfig::DefaultWriteQueueNums = 16; +std::string TopicConfig::SEPARATOR = " "; + +TopicConfig::TopicConfig() + : m_topicName(""), + m_readQueueNums(DefaultReadQueueNums), + m_writeQueueNums(DefaultWriteQueueNums), + m_perm(PermName::PERM_READ | PermName::PERM_WRITE), + m_topicFilterType(SINGLE_TAG), + m_topicSysFlag(0), + m_order(false) +{ + +} + +TopicConfig::TopicConfig(const std::string& topicName) + : m_topicName(topicName), + m_readQueueNums(DefaultReadQueueNums), + m_writeQueueNums(DefaultWriteQueueNums), + m_perm(PermName::PERM_READ | PermName::PERM_WRITE), + m_topicFilterType(SINGLE_TAG), + m_topicSysFlag(0), + m_order(false) +{ + +} + +TopicConfig::TopicConfig(const std::string& topicName, int readQueueNums, int writeQueueNums, int perm) + : m_topicName(topicName), + m_readQueueNums(readQueueNums), + m_writeQueueNums(writeQueueNums), + m_perm(perm), + m_topicFilterType(SINGLE_TAG), + m_topicSysFlag(0), + m_order(false) +{ +} + +TopicConfig::~TopicConfig() +{ +} + +std::string TopicConfig::encode() +{ + std::stringstream ss; + ss << m_topicName << SEPARATOR + << m_readQueueNums << SEPARATOR + << m_writeQueueNums << SEPARATOR + << m_perm << SEPARATOR + << m_topicFilterType; + + return ss.str(); +} + +bool TopicConfig::decode(const std::string& in) +{ + std::stringstream ss(in); + + ss >> m_topicName; + ss >> m_readQueueNums; + ss >> m_writeQueueNums; + ss >> m_perm; + + int type; + ss >> type; + m_topicFilterType = (TopicFilterType)type; + + return true; +} + +const std::string& TopicConfig::getTopicName() +{ + return m_topicName; +} + +void TopicConfig::setTopicName(const std::string& topicName) +{ + m_topicName = topicName; +} + +int TopicConfig::getReadQueueNums() +{ + return m_readQueueNums; +} + +void TopicConfig::setReadQueueNums(int readQueueNums) +{ + m_readQueueNums = readQueueNums; +} + +int 
TopicConfig::getWriteQueueNums() +{ + return m_writeQueueNums; +} + +void TopicConfig::setWriteQueueNums(int writeQueueNums) +{ + m_writeQueueNums = writeQueueNums; +} + +int TopicConfig::getPerm() +{ + return m_perm; +} + +void TopicConfig::setPerm(int perm) +{ + m_perm = perm; +} + +TopicFilterType TopicConfig::getTopicFilterType() +{ + return m_topicFilterType; +} + +void TopicConfig::setTopicFilterType(TopicFilterType topicFilterType) +{ + m_topicFilterType = topicFilterType; +} + +int TopicConfig::getTopicSysFlag() +{ + return m_topicSysFlag; +} + +void TopicConfig::setTopicSysFlag(int perm) +{ + m_topicSysFlag = perm; +} + +bool TopicConfig::isOrder() +{ + return m_order; +} + +void TopicConfig::setOrder(bool order) +{ + m_order = order; +} + + +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/common/TopicConfig.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/common/TopicConfig.h b/rocketmq-client4cpp/src/common/TopicConfig.h new file mode 100644 index 0000000..b9f2bcb --- /dev/null +++ b/rocketmq-client4cpp/src/common/TopicConfig.h @@ -0,0 +1,71 @@ +/** +* Copyright (C) 2013 kangliqiang ,kangliq@163.com +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/ + +#ifndef __TOPICCONFIG_H__ +#define __TOPICCONFIG_H__ + +#include +#include "TopicFilterType.h" + +namespace rmq + { + /** + * Topic + * + */ + class TopicConfig + { + public: + TopicConfig(); + TopicConfig(const std::string& topicName); + TopicConfig(const std::string& topicName, int readQueueNums, int writeQueueNums, int perm); + ~TopicConfig(); + + std::string encode(); + bool decode(const std::string& in); + const std::string& getTopicName(); + void setTopicName(const std::string& topicName); + int getReadQueueNums(); + void setReadQueueNums(int readQueueNums); + int getWriteQueueNums(); + void setWriteQueueNums(int writeQueueNums); + int getPerm(); + void setPerm(int perm); + TopicFilterType getTopicFilterType(); + void setTopicFilterType(TopicFilterType topicFilterType); + int getTopicSysFlag(); + void setTopicSysFlag(int topicSysFlag); + bool isOrder(); + void setOrder(bool order); + + public: + static int DefaultReadQueueNums; + static int DefaultWriteQueueNums; + + private: + static std::string SEPARATOR; + + std::string m_topicName; + int m_readQueueNums; + int m_writeQueueNums; + int m_perm; + TopicFilterType m_topicFilterType; + int m_topicSysFlag; + bool m_order; + }; +} + +#endif http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/common/TopicStatsTable.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/common/TopicStatsTable.h b/rocketmq-client4cpp/src/common/TopicStatsTable.h new file mode 100755 index 0000000..4319e54 --- /dev/null +++ b/rocketmq-client4cpp/src/common/TopicStatsTable.h @@ -0,0 +1,51 @@ +/** +* Copyright (C) 2013 kangliqiang ,kangliq@163.com +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. 
+* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +#ifndef __TOPICSTATSTABLE_H__ +#define __TOPICSTATSTABLE_H__ + +#include + +namespace rmq +{ + class MessageQueue; + + typedef struct + { + long long minOffset; + long long maxOffset; + long long lastUpdateTimestamp; + } TopicOffset; + + class TopicStatsTable + { + public: + std::map getOffsetTable() + { + return m_offsetTable; + } + + void setOffsetTable(const std::map& offsetTable) + { + m_offsetTable = offsetTable; + } + + private: + std::map m_offsetTable; + }; +} + +#endif http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/common/UtilAll.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/common/UtilAll.h b/rocketmq-client4cpp/src/common/UtilAll.h new file mode 100755 index 0000000..b239edb --- /dev/null +++ b/rocketmq-client4cpp/src/common/UtilAll.h @@ -0,0 +1,608 @@ +/** +* Copyright (C) 2013 kangliqiang ,kangliq@163.com +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/ +#ifndef __UTILALL_H__ +#define __UTILALL_H__ + +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include + +#include "RocketMQClient.h" +#include "zlib.h" +#include "json/value.h" +#include "json/writer.h" + +namespace rmq +{ + const std::string WHITESPACE = " \t\r\n"; + const int CHUNK = 8192; + const std::string yyyy_MM_dd_HH_mm_ss = "yyyy-MM-dd HH:mm:ss"; + const std::string yyyy_MM_dd_HH_mm_ss_SSS = "yyyy-MM-dd#HH:mm:ss:SSS"; + const std::string yyyyMMddHHmmss = "yyyyMMddHHmmss"; + + class UtilAll + { + public: + static pid_t getPid() + { + static __thread pid_t pid = 0; + if (!pid || pid != getpid()) + { + pid = getpid(); + } + return pid; + } + + static pid_t getTid() + { + static __thread pid_t pid = 0; + static __thread pid_t tid = 0; + if (!pid || !tid || pid != getpid()) + { + pid = getpid(); + tid = syscall(__NR_gettid); + } + return tid; + } + + static int Split(std::vector& out, const std::string& in, const std::string& delimiter) + { + std::string::size_type left = 0; + for (size_t i = 1; i < in.size(); i++) + { + std::string::size_type right = in.find(delimiter, left); + + if (right == std::string::npos) + { + break; + } + + out.push_back(in.substr(left, right - left)); + + left = right + delimiter.length(); + } + + out.push_back(in.substr(left)); + + return out.size(); + } + + static int Split(std::vector& out, const std::string& in, const char delimiter) + { + std::string::size_type left = 0; + for (size_t i = 1; i < in.size(); i++) + { + std::string::size_type right = in.find(delimiter, left); + + if (right == std::string::npos) + { + break; + } + + out.push_back(in.substr(left, right - left)); + + left = right + 1; + } + + out.push_back(in.substr(left)); + + return out.size(); + } + + static std::string Trim(const std::string& str) + { + if (str.empty()) + { + return str; + } + + std::string::size_type left = str.find_first_not_of(WHITESPACE); + + if (left == std::string::npos) + { + 
return ""; + } + + std::string::size_type right = str.find_last_not_of(WHITESPACE); + + if (right == std::string::npos) + { + return str.substr(left); + } + + return str.substr(left, right + 1 - left); + } + + static bool isBlank(const std::string& str) + { + if (str.empty()) + { + return true; + } + + std::string::size_type left = str.find_first_not_of(WHITESPACE); + + if (left == std::string::npos) + { + return true; + } + + return false; + } + + static int availableProcessors() + { + return 4; + } + + + static int hashCode(const char* pData, int len) + { + int h = 0; + if (pData != NULL && len > 0) + { + unsigned char c; + for (int i = 0; i < len; i++) + { + c = (unsigned char)pData[i]; + h = 31 * h + c; + } + } + + return h; + } + + static int hashCode(const std::string& s) + { + return hashCode(s.c_str(), s.length()); + } + + static int hashCode(const char* pData) + { + return hashCode(std::string(pData)); + } + + static int hashCode(char x) + { + return x; + } + + static int hashCode(unsigned char x) + { + return x; + } + + static int hashCode(short x) + { + return x; + } + + static int hashCode(unsigned short x) + { + return x; + } + + static int hashCode(int x) + { + return x; + } + + static int hashCode(unsigned int x) + { + return x; + } + + static int hashCode(long x) + { + return x; + } + + static int hashCode(unsigned long x) + { + return x; + } + + template + static int hashCode(const std::vector& v) + { + int h = 0; + typeof(v.begin()) it = v.begin(); + while (it != v.end()) + { + h += hashCode(*it); + ++it; + } + return h; + } + + template + static int hashCode(const std::set& v) + { + int h = 0; + typeof(v.begin()) it = v.begin(); + while (it != v.end()) + { + h += hashCode(*it); + ++it; + } + return h; + } + + static std::string toString(Json::Value& json) + { + Json::FastWriter fastWriter; + return fastWriter.write(json); + } + + template + static std::string toString(const T& v) + { + std::ostringstream ss; + ss << v; + return ss.str(); + } + + 
template + static std::string toString(const std::vector& v) + { + std::string s; + s.append("["); + typeof(v.begin()) it = v.begin(); + while (it != v.end()) + { + s.append(toString(*it)); + s.append(","); + ++it; + } + if (s.size() > 1) + { + s.erase(s.size() - 1, 1); + } + s.append("]"); + return s; + } + + + template + static std::string toString(const std::list& v) + { + std::string s; + s.append("["); + typeof(v.begin()) it = v.begin(); + while (it != v.end()) + { + s.append(toString(*it)); + s.append(","); + ++it; + } + if (s.size() > 1) + { + s.erase(s.size() - 1, 1); + } + s.append("]"); + + return s; + } + + template + static std::string toString(const std::set& v) + { + std::string s; + s.append("["); + typeof(v.begin()) it = v.begin(); + while (it != v.end()) + { + s.append(toString(*it)); + s.append(","); + ++it; + } + if (s.size() > 1) + { + s.erase(s.size() - 1, 1); + } + s.append("]"); + return s; + } + + template + static std::string toString(const std::map& v) + { + std::string s; + s.append("{"); + typeof(v.begin()) it = v.begin(); + while (it != v.end()) + { + s.append(toString(it->first)); + s.append("="); + s.append(toString(it->second)); + s.append(","); + ++it; + } + if (s.size() > 1) + { + s.erase(s.size() - 1, 1); + } + s.append("}"); + return s; + } + + template + static out_type convert(const in_type& t) + { + out_type result; + std::stringstream stream; + stream << t; + stream >> result; + return result; + } + + static bool compress(const char* pIn, int inLen, unsigned char** pOut, int* pOutLen, int level) + { + int ret, flush; + int have; + z_stream strm; + unsigned char out[CHUNK]; + + /* allocate deflate state */ + strm.zalloc = Z_NULL; + strm.zfree = Z_NULL; + strm.opaque = Z_NULL; + ret = deflateInit(&strm, level); + if (ret != Z_OK) + { + return false; + } + + int outBufferLen = inLen; + unsigned char* outData = (unsigned char*)malloc(outBufferLen); + int left = inLen; + int used = 0; + int outDataLen = 0; + + /* compress until 
end of buffer */ + do + { + strm.avail_in = left > CHUNK ? CHUNK : left; + flush = left <= CHUNK ? Z_FINISH : Z_NO_FLUSH; + strm.next_in = (unsigned char*)pIn + used; + used += strm.avail_in; + left -= strm.avail_in; + + /* run deflate() on input until output buffer not full, finish + compression if all of source has been read in */ + do + { + strm.avail_out = CHUNK; + strm.next_out = out; + ret = deflate(&strm, flush); /* no bad return value */ + assert(ret != Z_STREAM_ERROR); /* state not clobbered */ + have = CHUNK - strm.avail_out; + + if (outDataLen + have > outBufferLen) + { + outBufferLen = outDataLen + have; + outBufferLen <<= 1; + unsigned char* tmp = (unsigned char*)realloc(outData, outBufferLen); + if (!tmp) + { + free(outData); + return false; + } + + outData = tmp; + } + + memcpy(outData + outDataLen, out, have); + outDataLen += have; + + } + while (strm.avail_out == 0); + assert(strm.avail_in == 0); /* all input will be used */ + + /* done when last data in file processed */ + } + while (flush != Z_FINISH); + assert(ret == Z_STREAM_END); /* stream will be complete */ + + *pOutLen = outDataLen; + *pOut = outData; + + /* clean up and return */ + (void)deflateEnd(&strm); + return true; + } + + static bool decompress(const char* pIn, int inLen, unsigned char** pOut, int* pOutLen) + { + int ret; + int have; + z_stream strm; + + unsigned char out[CHUNK]; + + /* allocate inflate state */ + strm.zalloc = Z_NULL; + strm.zfree = Z_NULL; + strm.opaque = Z_NULL; + strm.avail_in = 0; + strm.next_in = Z_NULL; + ret = inflateInit(&strm); + if (ret != Z_OK) + { + return false; + } + + int outBufferLen = inLen << 2; + unsigned char* outData = (unsigned char*)malloc(outBufferLen); + + int left = inLen; + int used = 0; + int outDataLen = 0; + + /* decompress until deflate stream ends or end of buffer */ + do + { + strm.avail_in = left > CHUNK ? 
CHUNK : left; + if (strm.avail_in <= 0) + { + break; + } + + strm.next_in = (unsigned char*)pIn + used; + used += strm.avail_in; + left -= strm.avail_in; + + /* run inflate() on input until output buffer not full */ + do + { + strm.avail_out = CHUNK; + strm.next_out = out; + ret = inflate(&strm, Z_NO_FLUSH); + assert(ret != Z_STREAM_ERROR); /* state not clobbered */ + switch (ret) + { + case Z_NEED_DICT: + ret = Z_DATA_ERROR; /* and fall through */ + case Z_DATA_ERROR: + case Z_MEM_ERROR: + (void)inflateEnd(&strm); + free(outData); + return false; + } + have = CHUNK - strm.avail_out; + + if (outDataLen + have > outBufferLen) + { + outBufferLen = outDataLen + have; + outBufferLen <<= 1; + unsigned char* tmp = (unsigned char*)realloc(outData, outBufferLen); + if (!tmp) + { + free(outData); + return false; + } + + outData = tmp; + } + + memcpy(outData + outDataLen, out, have); + outDataLen += have; + + } + while (strm.avail_out == 0); + + /* done when inflate() says it's done */ + } + while (ret != Z_STREAM_END); + + /* clean up and return */ + (void)inflateEnd(&strm); + + if (ret == Z_STREAM_END) + { + *pOutLen = outDataLen; + *pOut = outData; + + return true; + } + else + { + free(outData); + + return false; + } + } + + static unsigned long long hexstr2ull(const char* str) + { + char* end; + return strtoull(str, &end, 16); + } + + static long long str2ll(const char *str) + { + return atoll(str); + } + + + static std::string tm2str(const time_t& t, const std::string& sFormat) + { + struct tm stTm; + localtime_r(&t, &stTm); + + char sTimeString[255] = "\0"; + strftime(sTimeString, sizeof(sTimeString), sFormat.c_str(), &stTm); + + return std::string(sTimeString); + } + + static std::string now2str(const std::string& sFormat) + { + time_t t = time(NULL); + return tm2str(t, sFormat.c_str()); + } + + static std::string now2str() + { + return now2str("%Y-%m-%d %H:%M:%S"); + } + + static int64_t now2ms() + { + struct timeval tv; + gettimeofday(&tv, NULL); + return tv.tv_sec 
* (int64_t)1000 + tv.tv_usec / 1000; + } + + static int64_t now2us() + { + struct timeval tv; + gettimeofday(&tv, NULL); + return tv.tv_sec * (int64_t)1000*1000 + tv.tv_usec; + } + + static int str2tm(const std::string &sString, const std::string &sFormat, struct tm &stTm) + { + char *p = strptime(sString.c_str(), sFormat.c_str(), &stTm); + return (p != NULL) ? 0 : -1; + } + + static time_t str2tm(const std::string &sString, const std::string &sFormat) + { + struct tm stTm; + if (str2tm(sString, sFormat, stTm) == 0) + { + time_t t = mktime(&stTm); + return t; + } + else + { + return -1; + } + } + }; +} + +#endif http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/common/Validators.cpp ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/common/Validators.cpp b/rocketmq-client4cpp/src/common/Validators.cpp new file mode 100755 index 0000000..29f36a0 --- /dev/null +++ b/rocketmq-client4cpp/src/common/Validators.cpp @@ -0,0 +1,132 @@ +/** +* Copyright (C) 2013 kangliqiang, kangliq@163.com +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/ +#include "Validators.h" + +#include +#include +#include "MQClientException.h" +#include "UtilAll.h" +#include "MixAll.h" +#include "Message.h" +#include "MQProtos.h" +#include "DefaultMQProducer.h" + +namespace rmq +{ + +const std::string Validators::validPatternStr = "^[a-zA-Z0-9_-]+$"; +const size_t Validators::CHARACTER_MAX_LENGTH = 255; + +bool Validators::regularExpressionMatcher(const std::string& origin, const std::string& patternStr) +{ + if (UtilAll::isBlank(origin)) + { + return false; + } + + if (UtilAll::isBlank(patternStr)) + { + return true; + } + + //Pattern pattern = Pattern.compile(patternStr); + //Matcher matcher = pattern.matcher(origin); + + //return matcher.matches(); + return true; +} + +std::string Validators::getGroupWithRegularExpression(const std::string& origin, const std::string& patternStr) +{ + /*Pattern pattern = Pattern.compile(patternStr); + Matcher matcher = pattern.matcher(origin); + while (matcher.find()) { + return matcher.group(0); + }*/ + return ""; +} + +void Validators::checkTopic(const std::string& topic) +{ + if (UtilAll::isBlank(topic)) + { + THROW_MQEXCEPTION(MQClientException, "the specified topic is blank", -1); + } + + if (topic.length() > CHARACTER_MAX_LENGTH) + { + THROW_MQEXCEPTION(MQClientException, "the specified topic is longer than topic max length 255.", -1); + } + + // Topic�����Ƿ��뱣���ֶγ�ͻ + if (topic == MixAll::DEFAULT_TOPIC) + { + THROW_MQEXCEPTION(MQClientException, "the topic[" + topic + "] is conflict with default topic.", -1); + } + + if (!regularExpressionMatcher(topic, validPatternStr)) + { + std::string str; + str = "the specified topic[" + topic + "] contains illegal characters, allowing only" + validPatternStr; + + THROW_MQEXCEPTION(MQClientException, str.c_str(), -1); + } +} + +void Validators::checkGroup(const std::string& group) +{ + if (UtilAll::isBlank(group)) + { + THROW_MQEXCEPTION(MQClientException, "the specified group is blank", -1); + } + + if (!regularExpressionMatcher(group, 
validPatternStr)) + { + std::string str; + str = "the specified group[" + group + "] contains illegal characters, allowing only" + validPatternStr; + + THROW_MQEXCEPTION(MQClientException, str.c_str(), -1); + } + if (group.length() > CHARACTER_MAX_LENGTH) + { + THROW_MQEXCEPTION(MQClientException, "the specified group is longer than group max length 255.", -1); + } +} + +void Validators::checkMessage(const Message& msg, DefaultMQProducer* pDefaultMQProducer) +{ + checkTopic(msg.getTopic()); + + //// body + if (msg.getBody() == NULL) + { + THROW_MQEXCEPTION(MQClientException, "the message body is null", MESSAGE_ILLEGAL_VALUE); + } + + if (msg.getBodyLen() == 0) + { + THROW_MQEXCEPTION(MQClientException, "the message body length is zero", MESSAGE_ILLEGAL_VALUE); + } + + if (msg.getBodyLen() > pDefaultMQProducer->getMaxMessageSize()) + { + char info[256]; + snprintf(info, sizeof(info), "the message body size over max value, MAX: %d", pDefaultMQProducer->getMaxMessageSize()); + THROW_MQEXCEPTION(MQClientException, info, MESSAGE_ILLEGAL_VALUE); + } +} + +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/common/Validators.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/common/Validators.h b/rocketmq-client4cpp/src/common/Validators.h new file mode 100755 index 0000000..36ab299 --- /dev/null +++ b/rocketmq-client4cpp/src/common/Validators.h @@ -0,0 +1,49 @@ +/** +* Copyright (C) 2013 kangliqiang, kangliq@163.com +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+* See the License for the specific language governing permissions and +* limitations under the License. +*/ +#ifndef __VALIDATORST_H__ +#define __VALIDATORST_H__ + +#include + +namespace rmq +{ + class MQClientException; + class DefaultMQProducer; + class Message; + + /** + * Validator class + * + * @author manhong.yqd + * @since 2013-8-28 + */ + class Validators + { + public: + static bool regularExpressionMatcher(const std::string& origin, const std::string& patternStr); + static std::string getGroupWithRegularExpression(const std::string& origin, const std::string& patternStr); + + static void checkTopic(const std::string& topic); + static void checkGroup(const std::string& group); + static void checkMessage(const Message& msg, DefaultMQProducer* pDefaultMQProducer); + + public: + static const std::string validPatternStr; + static const size_t CHARACTER_MAX_LENGTH; + }; +} + +#endif http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/common/VirtualEnvUtil.cpp ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/common/VirtualEnvUtil.cpp b/rocketmq-client4cpp/src/common/VirtualEnvUtil.cpp new file mode 100755 index 0000000..c68bfc8 --- /dev/null +++ b/rocketmq-client4cpp/src/common/VirtualEnvUtil.cpp @@ -0,0 +1,66 @@ +/** +* Copyright (C) 2013 kangliqiang, kangliq@163.com +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/ +#include "VirtualEnvUtil.h" + +#include +#include +#include "UtilAll.h" + +namespace rmq +{ + +const char* VirtualEnvUtil::VIRTUAL_APPGROUP_PREFIX = "%%PROJECT_%s%%"; + +std::string VirtualEnvUtil::buildWithProjectGroup(const std::string& origin, const std::string& projectGroup) +{ + if (!UtilAll::isBlank(projectGroup)) + { + char prefix[1024]; + snprintf(prefix, sizeof(prefix), VIRTUAL_APPGROUP_PREFIX, projectGroup.c_str()); + + if (origin.find_last_of(prefix) == std::string::npos) + { + return origin + prefix; + } + else + { + return origin; + } + } + else + { + return origin; + } +} + + +std::string VirtualEnvUtil::clearProjectGroup(const std::string& origin, const std::string& projectGroup) +{ + char prefix[1024]; + snprintf(prefix, sizeof(prefix), VIRTUAL_APPGROUP_PREFIX, projectGroup.c_str()); + std::string::size_type pos = origin.find_last_of(prefix); + + if (!UtilAll::isBlank(prefix) && pos != std::string::npos) + { + return origin.substr(0, pos); + } + else + { + return origin; + } +} + +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/common/VirtualEnvUtil.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/common/VirtualEnvUtil.h b/rocketmq-client4cpp/src/common/VirtualEnvUtil.h new file mode 100755 index 0000000..10ca0cd --- /dev/null +++ b/rocketmq-client4cpp/src/common/VirtualEnvUtil.h @@ -0,0 +1,41 @@ +/** +* Copyright (C) 2013 kangliqiang, kangliq@163.com +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +#ifndef __VIRTUALENVUTIL_H__ +#define __VIRTUALENVUTIL_H__ + +#include + +namespace rmq +{ + /** + * VirtualEnv API + * + * @author manhong.yqd + * @since 2013-8-26 + */ + class VirtualEnvUtil + { + public: + static std::string buildWithProjectGroup(const std::string& origin, const std::string& projectGroup); + static std::string clearProjectGroup(const std::string& origin, const std::string& projectGroup); + + public: + static const char* VIRTUAL_APPGROUP_PREFIX; + }; +} + +#endif http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/consumer/AllocateMessageQueueStrategyInner.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/consumer/AllocateMessageQueueStrategyInner.h b/rocketmq-client4cpp/src/consumer/AllocateMessageQueueStrategyInner.h new file mode 100755 index 0000000..49e1e7c --- /dev/null +++ b/rocketmq-client4cpp/src/consumer/AllocateMessageQueueStrategyInner.h @@ -0,0 +1,205 @@ +/** +* Copyright (C) 2013 kangliqiang ,kangliq@163.com +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/ +#ifndef __ALLOCATEMESSAGEQUEUESTRATEGYINNER_H__ +#define __ALLOCATEMESSAGEQUEUESTRATEGYINNER_H__ + +#include + +#include "AllocateMessageQueueStrategy.h" +#include "MQClientException.h" +#include "UtilAll.h" + + +namespace rmq +{ + + class AllocateMessageQueueAveragely : public AllocateMessageQueueStrategy + { + public: + virtual ~AllocateMessageQueueAveragely() {} + virtual std::vector* allocate( + const std::string& consumerGroup, + const std::string& currentCID, + std::vector& mqAll, + std::list& cidAll) + { + if (currentCID.empty()) + { + THROW_MQEXCEPTION(MQClientException, "currentCID is empty", -1); + } + + if (mqAll.empty()) + { + THROW_MQEXCEPTION(MQClientException, "mqAll is empty", -1); + } + + if (cidAll.empty()) + { + THROW_MQEXCEPTION(MQClientException, "cidAll is empty", -1); + } + + int index = -1; + int cidAllSize = cidAll.size(); + + std::list::iterator it = cidAll.begin(); + for (int i = 0; it != cidAll.end(); it++, i++) + { + if (*it == currentCID) + { + index = i; + break; + } + } + + if (index == -1) + { + RMQ_ERROR("[BUG] ConsumerGroup: {%s} The consumerId: {%s} not in cidAll: {%s}", // + consumerGroup.c_str(), + currentCID.c_str(), + UtilAll::toString(cidAll).c_str()); + return NULL; + } + + int mqAllSize = mqAll.size(); + int mod = mqAllSize % cidAllSize; + int averageSize = + mqAllSize <= cidAllSize ? 1 : (mod > 0 && index < mod ? mqAllSize / cidAllSize + + 1 : mqAllSize / cidAllSize); + int startIndex = (mod > 0 && index < mod) ? 
index * averageSize : index * averageSize + mod; + + std::vector* result = new std::vector(); + int range = std::min(averageSize, mqAllSize - startIndex); + + for (int i = 0; i < range; i++) + { + result->push_back(mqAll.at((startIndex + i) % mqAllSize)); + } + + return result; + } + + virtual std::string getName() + { + return "AVG"; + } + }; + + + class AllocateMessageQueueAveragelyByCircle : public AllocateMessageQueueStrategy + { + public: + virtual ~AllocateMessageQueueAveragelyByCircle() {} + virtual std::vector* allocate( + const std::string& consumerGroup, + const std::string& currentCID, + std::vector& mqAll, + std::list& cidAll) + { + if (currentCID.empty()) + { + THROW_MQEXCEPTION(MQClientException, "currentCID is empty", -1); + } + + if (mqAll.empty()) + { + THROW_MQEXCEPTION(MQClientException, "mqAll is empty", -1); + } + + if (cidAll.empty()) + { + THROW_MQEXCEPTION(MQClientException, "cidAll is empty", -1); + } + + int index = -1; + std::list::iterator it = cidAll.begin(); + for (int i = 0; it != cidAll.end(); it++, i++) + { + if (*it == currentCID) + { + index = i; + break; + } + } + + if (index == -1) + { + RMQ_ERROR("[BUG] ConsumerGroup: {%s} The consumerId: {%s} not in cidAll: {%s}", // + consumerGroup.c_str(), + currentCID.c_str(), + UtilAll::toString(cidAll).c_str()); + return NULL; + } + + std::vector* result = new std::vector(); + for (int i = index; i < (int)mqAll.size(); i++) + { + if (i % (int)cidAll.size() == index) + { + result->push_back(mqAll.at(i)); + } + } + + return result; + } + + virtual std::string getName() + { + return "AVG_BY_CIRCLE"; + } + }; + + + class AllocateMessageQueueByConfig : public AllocateMessageQueueStrategy + { + public: + virtual ~AllocateMessageQueueByConfig() {} + virtual std::vector* allocate( + const std::string& consumerGroup, + const std::string& currentCID, + std::vector& mqAll, + std::list& cidAll) + { + return NULL; + } + + virtual std::string getName() + { + return "CONFIG"; + } + }; + + + class 
AllocateMessageQueueByMachineRoom : public AllocateMessageQueueStrategy + { + public: + virtual ~AllocateMessageQueueByMachineRoom() {} + virtual std::vector* allocate( + const std::string& consumerGroup, + const std::string& currentCID, + std::vector& mqAll, + std::list& cidAll) + { + return NULL; + } + + virtual std::string getName() + { + return "MACHINE_ROOM"; + } + }; +} + +#endif http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/consumer/ConsumeMessageConcurrentlyService.cpp ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/consumer/ConsumeMessageConcurrentlyService.cpp b/rocketmq-client4cpp/src/consumer/ConsumeMessageConcurrentlyService.cpp new file mode 100755 index 0000000..7550acb --- /dev/null +++ b/rocketmq-client4cpp/src/consumer/ConsumeMessageConcurrentlyService.cpp @@ -0,0 +1,476 @@ +/** +* Copyright (C) 2013 kangliqiang ,kangliq@163.com +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/ +#include "ConsumeMessageConcurrentlyService.h" + +#include "DefaultMQPushConsumerImpl.h" +#include "MessageListener.h" +#include "MessageQueue.h" +#include "RebalanceImpl.h" +#include "DefaultMQPushConsumer.h" +#include "MixAll.h" +#include "KPRUtil.h" +#include "UtilAll.h" +#include "OffsetStore.h" + +namespace rmq +{ + + +class SubmitConsumeRequestLater : public kpr::TimerHandler +{ +public: + SubmitConsumeRequestLater(std::list& msgs, + ProcessQueue* pProcessQueue, + MessageQueue messageQueue, + ConsumeMessageConcurrentlyService* pService) + : m_msgs(msgs), + m_pProcessQueue(pProcessQueue), + m_messageQueue(messageQueue), + m_pService(pService) + { + + } + + void OnTimeOut(unsigned int timerID) + { + try + { + m_pService->submitConsumeRequest(m_msgs, m_pProcessQueue, m_messageQueue, true); + } + catch(...) + { + RMQ_ERROR("SubmitConsumeRequestLater OnTimeOut exception"); + } + + delete this; + } + +private: + std::list m_msgs; + ProcessQueue* m_pProcessQueue; + MessageQueue m_messageQueue; + ConsumeMessageConcurrentlyService* m_pService; +}; + + +class CleanExpireMsgTask : public kpr::TimerHandler +{ +public: + CleanExpireMsgTask(ConsumeMessageConcurrentlyService* pService) + : m_pService(pService) + { + + } + + void OnTimeOut(unsigned int timerID) + { + try + { + m_pService->cleanExpireMsg(); + } + catch(...) 
+ { + RMQ_ERROR("CleanExpireMsgTask OnTimeOut exception"); + } + } + +private: + ConsumeMessageConcurrentlyService* m_pService; +}; + + + +ConsumeMessageConcurrentlyService::ConsumeMessageConcurrentlyService( + DefaultMQPushConsumerImpl* pDefaultMQPushConsumerImpl, + MessageListenerConcurrently* pMessageListener) +{ + m_pDefaultMQPushConsumerImpl = pDefaultMQPushConsumerImpl; + m_pMessageListener = pMessageListener; + m_pDefaultMQPushConsumer = m_pDefaultMQPushConsumerImpl->getDefaultMQPushConsumer(); + m_consumerGroup = m_pDefaultMQPushConsumer->getConsumerGroup(); + m_pConsumeExecutor = new kpr::ThreadPool("ConsumeMessageThreadPool", 5, + m_pDefaultMQPushConsumer->getConsumeThreadMin(), m_pDefaultMQPushConsumer->getConsumeThreadMax()); + m_pScheduledExecutorService = new kpr::TimerThread("ConsumeMessageConcurrentlyService", 1000); + m_pCleanExpireMsgExecutors = new kpr::TimerThread("CleanExpireMsgService", 1000); + m_pCleanExpireMsgTask = new CleanExpireMsgTask(this); +} + +ConsumeMessageConcurrentlyService::~ConsumeMessageConcurrentlyService() +{ + delete m_pCleanExpireMsgTask; +} + + +void ConsumeMessageConcurrentlyService::start() +{ + m_pCleanExpireMsgExecutors->RegisterTimer(60 * 1000, 60 * 1000, m_pCleanExpireMsgTask, true); + m_pScheduledExecutorService->Start(); + m_pCleanExpireMsgExecutors->Start(); +} + +void ConsumeMessageConcurrentlyService::shutdown() +{ + m_pConsumeExecutor->Destroy(); + m_pScheduledExecutorService->Stop(); + m_pScheduledExecutorService->Join(); + + m_pCleanExpireMsgExecutors->Stop(); + m_pCleanExpireMsgExecutors->Join(); +} + + +void ConsumeMessageConcurrentlyService::cleanExpireMsg() +{ + kpr::ScopedRLock lock(m_pDefaultMQPushConsumerImpl->getRebalanceImpl()->getProcessQueueTableLock()); + std::map& processQueueTable + = m_pDefaultMQPushConsumerImpl->getRebalanceImpl()->getProcessQueueTable(); + RMQ_FOR_EACH(processQueueTable, it) + { + ProcessQueue* pq = it->second; + if (!pq->isDropped()) + { + 
pq->cleanExpiredMsg(m_pDefaultMQPushConsumer); + } + } +} + + +ConsumerStat& ConsumeMessageConcurrentlyService::getConsumerStat() +{ + return m_pDefaultMQPushConsumerImpl->getConsumerStatManager()->getConsumertat(); +} + +bool ConsumeMessageConcurrentlyService::sendMessageBack(MessageExt& msg, + ConsumeConcurrentlyContext& context) +{ + try + { + m_pDefaultMQPushConsumerImpl->sendMessageBack(msg, + context.delayLevelWhenNextConsume, context.messageQueue.getBrokerName()); + return true; + } + catch (...) + { + RMQ_ERROR("sendMessageBack exception, group: %s, msg: %s", + m_consumerGroup.c_str(), msg.toString().c_str()); + } + + return false; +} + +void ConsumeMessageConcurrentlyService::submitConsumeRequestLater(std::list& msgs, + ProcessQueue* pProcessQueue, + const MessageQueue& messageQueue) +{ + SubmitConsumeRequestLater* sc = new SubmitConsumeRequestLater(msgs, pProcessQueue, messageQueue, this); + m_pScheduledExecutorService->RegisterTimer(0, 5000, sc, false); +} + +void ConsumeMessageConcurrentlyService::submitConsumeRequest(std::list& msgs, + ProcessQueue* pProcessQueue, + const MessageQueue& messageQueue, + bool dispathToConsume) +{ + size_t consumeBatchSize = m_pDefaultMQPushConsumer->getConsumeMessageBatchMaxSize(); + + RMQ_DEBUG("submitConsumeRequest begin, msgs.size=%d, messageQueue=%s, consumeBatchSize=%d, dispathToConsume=%d", + (int)msgs.size(), messageQueue.toString().c_str(), (int)consumeBatchSize, dispathToConsume + ); + + if (msgs.size() <= consumeBatchSize) + { + kpr::ThreadPoolWorkPtr consumeRequest = new ConsumeConcurrentlyRequest(msgs, pProcessQueue, messageQueue, this); + m_pConsumeExecutor->AddWork(consumeRequest); + } + else + { + std::list::iterator it = msgs.begin(); + for (; it != msgs.end();) + { + std::list msgThis; + for (size_t i = 0; i < consumeBatchSize; i++, it++) + { + if (it != msgs.end()) + { + msgThis.push_back(*it); + } + else + { + break; + } + } + + kpr::ThreadPoolWorkPtr consumeRequest = new 
ConsumeConcurrentlyRequest(msgThis, pProcessQueue, messageQueue, this); + m_pConsumeExecutor->AddWork(consumeRequest); + } + } + + RMQ_DEBUG("submitConsumeRequest end"); +} + +void ConsumeMessageConcurrentlyService::updateCorePoolSize(int corePoolSize) +{ + //todo +} + +void ConsumeMessageConcurrentlyService::processConsumeResult(ConsumeConcurrentlyStatus status, + ConsumeConcurrentlyContext& context, + ConsumeConcurrentlyRequest& consumeRequest) +{ + int ackIndex = context.ackIndex; + + if (consumeRequest.getMsgs().empty()) + { + return; + } + + int msgsSize = consumeRequest.getMsgs().size(); + + switch (status) + { + case CONSUME_SUCCESS: + { + if (ackIndex >= msgsSize) + { + ackIndex = msgsSize - 1; + } + + int ok = ackIndex + 1; + int failed = msgsSize - ok; + getConsumerStat().consumeMsgOKTotal.fetchAndAdd(ok); + getConsumerStat().consumeMsgFailedTotal.fetchAndAdd(failed); + } + + break; + case RECONSUME_LATER: + ackIndex = -1; + getConsumerStat().consumeMsgFailedTotal.fetchAndAdd(msgsSize); + break; + default: + break; + } + + std::list& msgs = consumeRequest.getMsgs(); + std::list::iterator it = msgs.begin(); + + for (int i = 0; i < ackIndex + 1 && it != msgs.end(); i++) + { + it++; + } + + switch (m_pDefaultMQPushConsumer->getMessageModel()) + { + case BROADCASTING: + for (; it != msgs.end(); it++) + { + MessageExt* msg = *it; + RMQ_WARN("BROADCASTING, the message consume failed, drop it, %s", msg->toString().c_str()); + } + break; + case CLUSTERING: + { + std::list msgBackFailed; + for (; it != msgs.end(); it++) + { + MessageExt* msg = *it; + bool result = sendMessageBack(*msg, context); + if (!result) + { + msg->setReconsumeTimes(msg->getReconsumeTimes() + 1); + msgBackFailed.push_back(msg); + } + } + + if (!msgBackFailed.empty()) + { + it = msgs.begin(); + + for (; it != msgs.end();) + { + bool find = false; + std::list::iterator itFailed = msgBackFailed.begin(); + for (; itFailed != msgBackFailed.end(); itFailed++) + { + if (*it == *itFailed) + { + it = 
msgs.erase(it); + find = true; + break; + } + } + + if (!find) + { + it++; + } + } + + submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), + consumeRequest.getMessageQueue()); + } + } + break; + default: + break; + } + + long long offset = consumeRequest.getProcessQueue()->removeMessage(consumeRequest.getMsgs()); + if (offset >= 0 && !(consumeRequest.getProcessQueue()->isDropped())) + { + m_pDefaultMQPushConsumerImpl->getOffsetStore()->updateOffset(consumeRequest.getMessageQueue(), + offset, true); + } +} +/** Returns a non-const reference to the m_consumerGroup member. */ +std::string& ConsumeMessageConcurrentlyService::getConsumerGroup() +{ + return m_consumerGroup; +} +/** Returns the m_pMessageListener pointer held by this service. */ +MessageListenerConcurrently* ConsumeMessageConcurrentlyService::getMessageListener() +{ + return m_pMessageListener; +} +/** Returns the m_pDefaultMQPushConsumerImpl pointer held by this service. */ +DefaultMQPushConsumerImpl* ConsumeMessageConcurrentlyService::getDefaultMQPushConsumerImpl() +{ + return m_pDefaultMQPushConsumerImpl; +} +/** Stores the arguments in the members: m_msgs and m_messageQueue are copy-assigned, the two pointers are stored as passed. */ +ConsumeConcurrentlyRequest::ConsumeConcurrentlyRequest(std::list& msgs, + ProcessQueue* pProcessQueue, + const MessageQueue& messageQueue, + ConsumeMessageConcurrentlyService* pService) +{ + m_msgs = msgs; + m_pProcessQueue = pProcessQueue; + m_pService = pService; + m_messageQueue = messageQueue; +} +/** Only clears m_msgs; no list element is deleted here. */ +ConsumeConcurrentlyRequest::~ConsumeConcurrentlyRequest() +{ + m_msgs.clear(); +} +/** Runs one consume attempt for m_msgs: optional before-hook, retry-topic reset, start-timestamp property on every message, listener call, optional after-hook, RT statistics, then processConsumeResult. Exceptions are caught and logged, never propagated. */ +void ConsumeConcurrentlyRequest::Do() +{ + RMQ_DEBUG("consumeMessage begin, m_msgs.size=%d", (int)m_msgs.size()); +/* A dropped process queue is not consumed: log a warning and return before the listener is touched. */ + if (m_pProcessQueue->isDropped()) + { + RMQ_WARN("the message queue not be able to consume, because it's droped, {%s}", + m_messageQueue.toString().c_str()); + return; + } +/* The rest of the body sits in an outer try whose catch-all only logs. */ + try + { + MessageListenerConcurrently* listener = m_pService->getMessageListener(); + ConsumeConcurrentlyContext context(m_messageQueue); + ConsumeConcurrentlyStatus status = RECONSUME_LATER; +/* The hook context is filled in and executeHookBefore is called only when hasHook() is true. */ + ConsumeMessageContext consumeMessageContext; + if (m_pService->getDefaultMQPushConsumerImpl()->hasHook()) + { + consumeMessageContext.consumerGroup = m_pService->getConsumerGroup(); + consumeMessageContext.mq = 
m_messageQueue; + consumeMessageContext.msgList = m_msgs; + consumeMessageContext.success = false; + m_pService->getDefaultMQPushConsumerImpl()->executeHookBefore(consumeMessageContext); + } +/* beginTimestamp is read before resetRetryTopic and the listener call; consumeRT below is the difference of two KPRUtil::GetCurrentTimeMillis() readings. */ + long long beginTimestamp = KPRUtil::GetCurrentTimeMillis(); + try + { + resetRetryTopic(m_msgs); + if (!m_msgs.empty()) + { + std::list::iterator it = m_msgs.begin(); + for (; it != m_msgs.end(); it++) + { + MessageExt* msg = (*it); + msg->putProperty(Message::PROPERTY_CONSUME_START_TIMESTAMP, + UtilAll::toString(KPRUtil::GetCurrentTimeMillis())); + } + } + status = listener->consumeMessage(m_msgs, context); + } + catch (...) + { + RMQ_WARN("consumeMessage exception, Group: {%s} Msgs: {%d} MQ: {%s}", + m_pService->getConsumerGroup().c_str(), + (int)m_msgs.size(), + m_messageQueue.toString().c_str() + ); + } +/* If the inner try threw, the assignment from consumeMessage did not complete, so status is still RECONSUME_LATER here. */ + long long consumeRT = KPRUtil::GetCurrentTimeMillis() - beginTimestamp; + + if (m_pService->getDefaultMQPushConsumerImpl()->hasHook()) + { + consumeMessageContext.success = (status == CONSUME_SUCCESS); + m_pService->getDefaultMQPushConsumerImpl()->executeHookAfter(consumeMessageContext); + } +/* Add the elapsed time to consumeMsgRTTotal; a warning is logged when compareAndIncreaseOnly reports that consumeMsgRTMax was updated. */ + m_pService->getConsumerStat().consumeMsgRTTotal.fetchAndAdd(consumeRT); + bool updated = MixAll::compareAndIncreaseOnly(m_pService->getConsumerStat().consumeMsgRTMax, consumeRT); + if (updated) + { + RMQ_WARN("consumeMessage RT new max: %lld, Group: %s, Msgs: %d, MQ: %s", + consumeRT, + m_pService->getConsumerGroup().c_str(), + (int)m_msgs.size(), + m_messageQueue.toString().c_str() + ); + } +/* The dropped flag is checked again after consumption; the status is handed to processConsumeResult only if the queue is still not dropped. */ + if (!m_pProcessQueue->isDropped()) + { + m_pService->processConsumeResult(status, context, *this); + } + else + { + RMQ_WARN("processQueue is dropped without process consume result, messageQueue={%s}, msgs.size={%d}", + m_messageQueue.toString().c_str(), (int)m_msgs.size()); + } + } + catch(...) 
+ { + RMQ_WARN("ConsumeConcurrentlyRequest exception"); + } + RMQ_DEBUG("consumeMessage end, m_msgs.size=%d", (int)m_msgs.size()); + + return; +} +/** For each message whose PROPERTY_RETRY_TOPIC property is non-empty and whose topic equals MixAll::getRetryTopic(consumer group), sets the topic to that property value. */ +void ConsumeConcurrentlyRequest::resetRetryTopic(std::list& msgs) +{ + std::string groupTopic = MixAll::getRetryTopic(m_pService->getConsumerGroup()); + std::list::iterator it = msgs.begin(); + + for (; it != msgs.end(); it++) + { + MessageExt* msg = (*it); + std::string retryTopic = msg->getProperty(Message::PROPERTY_RETRY_TOPIC); + if (!retryTopic.empty() && groupTopic == msg->getTopic()) + { + msg->setTopic(retryTopic); + } + } +} + +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/consumer/ConsumeMessageConcurrentlyService.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/consumer/ConsumeMessageConcurrentlyService.h b/rocketmq-client4cpp/src/consumer/ConsumeMessageConcurrentlyService.h new file mode 100755 index 0000000..acb7538 --- /dev/null +++ b/rocketmq-client4cpp/src/consumer/ConsumeMessageConcurrentlyService.h @@ -0,0 +1,120 @@ +/** +* Copyright (C) 2013 kangliqiang ,kangliq@163.com +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/ + +#ifndef __CONSUMEMESSAGECONCURRENTLYSERVICE_H__ +#define __CONSUMEMESSAGECONCURRENTLYSERVICE_H__ + +#include "ConsumeMessageService.h" +/* NOTE(review): the two bare include directives below and every std::list in this header show no angle-bracket argument in this text; they look lost in extraction - confirm against the repository. */ +#include +#include +#include "MessageQueueLock.h" +#include "ConsumerStatManage.h" +#include "MessageExt.h" +#include "MessageListener.h" +#include "ProcessQueue.h" +#include "ThreadPool.h" +#include "TimerThread.h" + +namespace rmq +{ + class DefaultMQPushConsumerImpl; + class DefaultMQPushConsumer; + class MessageListenerConcurrently; + class ConsumeMessageConcurrentlyService; + /** Work item derived from kpr::ThreadPoolWork that bundles a message list with a ProcessQueue pointer, a MessageQueue and the owning service pointer; the work itself is done in Do(). */ + class ConsumeConcurrentlyRequest: public kpr::ThreadPoolWork + { + public: + ConsumeConcurrentlyRequest(std::list& msgs, + ProcessQueue* pProcessQueue, + const MessageQueue& messageQueue, + ConsumeMessageConcurrentlyService* pService); + ~ConsumeConcurrentlyRequest(); + virtual void Do(); + /* Accessors: getMsgs returns a reference to the internal list, getProcessQueue returns the stored pointer, getMessageQueue returns the MessageQueue by value. */ + std::list& getMsgs() + { + return m_msgs; + } + + ProcessQueue* getProcessQueue() + { + return m_pProcessQueue; + } + + MessageQueue getMessageQueue() + { + return m_messageQueue; + } + + private: + void resetRetryTopic(std::list& msgs); + + private: + std::list m_msgs; + ProcessQueue* m_pProcessQueue; + MessageQueue m_messageQueue; + ConsumeMessageConcurrentlyService* m_pService; + }; + + /** Consume service derived from ConsumeMessageService; declares start and shutdown, request submission, result processing, send-back, and accessors for the group name, listener and consumer implementation. */ + class ConsumeMessageConcurrentlyService : public ConsumeMessageService + { + public: + ConsumeMessageConcurrentlyService(DefaultMQPushConsumerImpl* pDefaultMQPushConsumerImpl, + MessageListenerConcurrently* pMessageListener); + ~ConsumeMessageConcurrentlyService(); + + void start(); + void shutdown(); + + void cleanExpireMsg(); + ConsumerStat& getConsumerStat(); + + bool sendMessageBack(MessageExt& msg, ConsumeConcurrentlyContext& context); + void processConsumeResult(ConsumeConcurrentlyStatus status, + ConsumeConcurrentlyContext& context, + ConsumeConcurrentlyRequest& consumeRequest); + + void submitConsumeRequestLater(std::list& pMsgs, + ProcessQueue* pProcessQueue, + const MessageQueue& messageQueue); + + void submitConsumeRequest(std::list& pMsgs, + ProcessQueue* 
pProcessQueue, + const MessageQueue& messageQueue, + bool dispathToConsume); + + void updateCorePoolSize(int corePoolSize); + /* The three accessors below return m_consumerGroup by reference and the m_pMessageListener and m_pDefaultMQPushConsumerImpl pointers. */ + std::string& getConsumerGroup(); + MessageListenerConcurrently* getMessageListener(); + DefaultMQPushConsumerImpl* getDefaultMQPushConsumerImpl(); + + private: + DefaultMQPushConsumerImpl* m_pDefaultMQPushConsumerImpl; + DefaultMQPushConsumer* m_pDefaultMQPushConsumer; + MessageListenerConcurrently* m_pMessageListener; + std::string m_consumerGroup; + kpr::ThreadPoolPtr m_pConsumeExecutor; + kpr::TimerThreadPtr m_pScheduledExecutorService; + kpr::TimerThreadPtr m_pCleanExpireMsgExecutors; + kpr::TimerHandler* m_pCleanExpireMsgTask; + }; +} + +#endif