rocketmq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ding...@apache.org
Subject [rocketmq-client-cpp] branch master updated: feat(memleak): remove mem leak in factory schedule task. (#227)
Date Mon, 13 Jan 2020 08:16:43 GMT
This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-cpp.git


The following commit(s) were added to refs/heads/master by this push:
     new 555e19d  feat(memleak): remove mem leak in factory schedule task. (#227)
555e19d is described below

commit 555e19d591fa2955eeb62b12d9db8c548b76b412
Author: dinglei <libya_003@163.com>
AuthorDate: Mon Jan 13 16:16:36 2020 +0800

    feat(memleak): remove mem leak in factory schedule task. (#227)
---
 src/MQClientFactory.cpp | 47 +++++++++++++++++++++++++++++------------------
 src/MQClientFactory.h   | 13 +++++++------
 src/common/MQClient.cpp |  1 +
 3 files changed, 37 insertions(+), 24 deletions(-)

diff --git a/src/MQClientFactory.cpp b/src/MQClientFactory.cpp
index 4d64aa0..6e1a392 100644
--- a/src/MQClientFactory.cpp
+++ b/src/MQClientFactory.cpp
@@ -83,7 +83,8 @@ void MQClientFactory::start() {
   }
 }
 
-void MQClientFactory::updateTopicRouteInfo(boost::system::error_code& ec, boost::asio::deadline_timer*
t) {
+void MQClientFactory::updateTopicRouteInfo(boost::system::error_code& ec,
+                                           boost::shared_ptr<boost::asio::deadline_timer>
t) {
   if ((getConsumerTableSize() == 0) && (getProducerTableSize() == 0)) {
     return;
   }
@@ -784,7 +785,8 @@ void MQClientFactory::sendHeartbeatToAllBroker() {
   brokerTable.clear();
 }
 
-void MQClientFactory::persistAllConsumerOffset(boost::system::error_code& ec, boost::asio::deadline_timer*
t) {
+void MQClientFactory::persistAllConsumerOffset(boost::system::error_code& ec,
+                                               boost::shared_ptr<boost::asio::deadline_timer>
t) {
   {
     boost::lock_guard<boost::recursive_mutex> lock(m_consumerTableMutex);
     if (m_consumerTable.size() > 0) {
@@ -814,7 +816,8 @@ HeartbeatData* MQClientFactory::prepareHeartbeatData() {
   return pHeartbeatData;
 }
 
-void MQClientFactory::timerCB_sendHeartbeatToAllBroker(boost::system::error_code& ec,
boost::asio::deadline_timer* t) {
+void MQClientFactory::timerCB_sendHeartbeatToAllBroker(boost::system::error_code& ec,
+                                                       boost::shared_ptr<boost::asio::deadline_timer>
t) {
   sendHeartbeatToAllBroker();
 
   boost::system::error_code e;
@@ -822,7 +825,8 @@ void MQClientFactory::timerCB_sendHeartbeatToAllBroker(boost::system::error_code
   t->async_wait(boost::bind(&MQClientFactory::timerCB_sendHeartbeatToAllBroker, this,
ec, t));
 }
 
-void MQClientFactory::timerCB_cleanOfflineBrokers(boost::system::error_code& ec, boost::asio::deadline_timer*
t) {
+void MQClientFactory::timerCB_cleanOfflineBrokers(boost::system::error_code& ec,
+                                                  boost::shared_ptr<boost::asio::deadline_timer>
t) {
   cleanOfflineBrokers();
 
   boost::system::error_code e;
@@ -830,7 +834,8 @@ void MQClientFactory::timerCB_cleanOfflineBrokers(boost::system::error_code&
ec,
   t->async_wait(boost::bind(&MQClientFactory::timerCB_cleanOfflineBrokers, this, ec,
t));
 }
 
-void MQClientFactory::fetchNameServerAddr(boost::system::error_code& ec, boost::asio::deadline_timer*
t) {
+void MQClientFactory::fetchNameServerAddr(boost::system::error_code& ec,
+                                          boost::shared_ptr<boost::asio::deadline_timer>
t) {
   m_pClientAPIImpl->fetchNameServerAddr(m_nameSrvDomain);
 
   boost::system::error_code e;
@@ -845,21 +850,24 @@ void MQClientFactory::startScheduledTask(bool startFetchNSService) {
                                                           // callback
 
   boost::system::error_code ec1;
-  boost::asio::deadline_timer t1(m_async_ioService, boost::posix_time::seconds(3));
-  t1.async_wait(boost::bind(&MQClientFactory::updateTopicRouteInfo, this, ec1, &t1));
+  boost::shared_ptr<boost::asio::deadline_timer> t1 =
+      boost::make_shared<boost::asio::deadline_timer>(m_async_ioService, boost::posix_time::seconds(3));
+  t1->async_wait(boost::bind(&MQClientFactory::updateTopicRouteInfo, this, ec1, t1));
 
   boost::system::error_code ec2;
-  boost::asio::deadline_timer t2(m_async_ioService, boost::posix_time::milliseconds(10));
-  t2.async_wait(boost::bind(&MQClientFactory::timerCB_sendHeartbeatToAllBroker, this,
ec2, &t2));
+  boost::shared_ptr<boost::asio::deadline_timer> t2 =
+      boost::make_shared<boost::asio::deadline_timer>(m_async_ioService, boost::posix_time::milliseconds(10));
+  t2->async_wait(boost::bind(&MQClientFactory::timerCB_sendHeartbeatToAllBroker, this,
ec2, t2));
 
   boost::system::error_code ec3;
-  boost::asio::deadline_timer t3(m_async_ioService, boost::posix_time::seconds(3));
-  t3.async_wait(boost::bind(&MQClientFactory::timerCB_cleanOfflineBrokers, this, ec3,
&t3));
+  boost::shared_ptr<boost::asio::deadline_timer> t3 =
+      boost::make_shared<boost::asio::deadline_timer>(m_async_ioService, boost::posix_time::seconds(3));
+  t3->async_wait(boost::bind(&MQClientFactory::timerCB_cleanOfflineBrokers, this,
ec3, t3));
 
   if (startFetchNSService) {
     boost::system::error_code ec5;
-    boost::asio::deadline_timer* t5 =
-        new boost::asio::deadline_timer(m_async_ioService, boost::posix_time::seconds(60
* 2));
+    boost::shared_ptr<boost::asio::deadline_timer> t5 =
+        boost::make_shared<boost::asio::deadline_timer>(m_async_ioService, boost::posix_time::seconds(60
* 2));
     t5->async_wait(boost::bind(&MQClientFactory::fetchNameServerAddr, this, ec5, t5));
   }
 
@@ -885,19 +893,22 @@ void MQClientFactory::consumer_timerOperation() {
                                                                    // callback
 
   boost::system::error_code ec1;
-  boost::asio::deadline_timer t(m_consumer_async_ioService, boost::posix_time::seconds(10));
-  t.async_wait(boost::bind(&MQClientFactory::timerCB_doRebalance, this, ec1, &t));
+  boost::shared_ptr<boost::asio::deadline_timer> t1 =
+      boost::make_shared<boost::asio::deadline_timer>(m_consumer_async_ioService, boost::posix_time::seconds(10));
+  t1->async_wait(boost::bind(&MQClientFactory::timerCB_doRebalance, this, ec1, t1));
 
   boost::system::error_code ec2;
-  boost::asio::deadline_timer t2(m_consumer_async_ioService, boost::posix_time::seconds(5));
-  t2.async_wait(boost::bind(&MQClientFactory::persistAllConsumerOffset, this, ec2, &t2));
+  boost::shared_ptr<boost::asio::deadline_timer> t2 =
+      boost::make_shared<boost::asio::deadline_timer>(m_consumer_async_ioService, boost::posix_time::seconds(5));
+  t2->async_wait(boost::bind(&MQClientFactory::persistAllConsumerOffset, this, ec2,
t2));
 
   boost::system::error_code ec;
   m_consumer_async_ioService.run(ec);
   LOG_INFO("clientFactory:%s stop consumer_timerOperation", m_clientId.c_str());
 }
 
-void MQClientFactory::timerCB_doRebalance(boost::system::error_code& ec, boost::asio::deadline_timer*
t) {
+void MQClientFactory::timerCB_doRebalance(boost::system::error_code& ec,
+                                          boost::shared_ptr<boost::asio::deadline_timer>
t) {
   doRebalance();
 
   boost::system::error_code e;
diff --git a/src/MQClientFactory.h b/src/MQClientFactory.h
index b5e5441..32c337c 100644
--- a/src/MQClientFactory.h
+++ b/src/MQClientFactory.h
@@ -129,17 +129,18 @@ class MQClientFactory {
 
   void startScheduledTask(bool startFetchNSService = true);
   //<!timer async callback
-  void fetchNameServerAddr(boost::system::error_code& ec, boost::asio::deadline_timer*
t);
-  void updateTopicRouteInfo(boost::system::error_code& ec, boost::asio::deadline_timer*
t);
-  void timerCB_sendHeartbeatToAllBroker(boost::system::error_code& ec, boost::asio::deadline_timer*
t);
+  void fetchNameServerAddr(boost::system::error_code& ec, boost::shared_ptr<boost::asio::deadline_timer>
t);
+  void updateTopicRouteInfo(boost::system::error_code& ec, boost::shared_ptr<boost::asio::deadline_timer>
t);
+  void timerCB_sendHeartbeatToAllBroker(boost::system::error_code& ec,
+                                        boost::shared_ptr<boost::asio::deadline_timer>
t);
 
-  void timerCB_cleanOfflineBrokers(boost::system::error_code& ec, boost::asio::deadline_timer*
t);
+  void timerCB_cleanOfflineBrokers(boost::system::error_code& ec, boost::shared_ptr<boost::asio::deadline_timer>
t);
 
   // consumer related operation
   void consumer_timerOperation();
-  void persistAllConsumerOffset(boost::system::error_code& ec, boost::asio::deadline_timer*
t);
+  void persistAllConsumerOffset(boost::system::error_code& ec, boost::shared_ptr<boost::asio::deadline_timer>
t);
   void doRebalance();
-  void timerCB_doRebalance(boost::system::error_code& ec, boost::asio::deadline_timer*
t);
+  void timerCB_doRebalance(boost::system::error_code& ec, boost::shared_ptr<boost::asio::deadline_timer>
t);
   bool getSessionCredentialFromConsumerTable(SessionCredentials& sessionCredentials);
   bool addConsumerToTable(const string& consumerName, MQConsumer* pMQConsumer);
   void eraseConsumerFromTable(const string& consumerName);
diff --git a/src/common/MQClient.cpp b/src/common/MQClient.cpp
index 068b8c4..f638f6f 100644
--- a/src/common/MQClient.cpp
+++ b/src/common/MQClient.cpp
@@ -150,6 +150,7 @@ void MQClient::start() {
 }
 
 void MQClient::shutdown() {
+  m_clientFactory->shutdown();
   m_clientFactory = NULL;
 }
 


Mime
View raw message