rocketmq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ifplu...@apache.org
Subject [rocketmq-client-cpp] 21/29: refactor: RemoteBrokerOffsetStore::persistAll
Date Tue, 29 Dec 2020 03:36:38 GMT
This is an automated email from the ASF dual-hosted git repository.

ifplusor pushed a commit to branch re_dev
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-cpp.git

commit 3a805ec9e6f987c426821a94620f0d7b22d07f74
Author: James Yin <ywhjames@hotmail.com>
AuthorDate: Wed Sep 23 17:40:47 2020 +0800

    refactor: RemoteBrokerOffsetStore::persistAll
---
 include/OffsetStore.h                      |  2 +-
 src/consumer/DefaultMQPushConsumerImpl.cpp |  8 +---
 src/consumer/LocalFileOffsetStore.cpp      |  2 +-
 src/consumer/LocalFileOffsetStore.h        |  2 +-
 src/consumer/RemoteBrokerOffsetStore.cpp   | 60 ++++++++++++++++++++++++++----
 src/consumer/RemoteBrokerOffsetStore.h     |  2 +-
 6 files changed, 57 insertions(+), 19 deletions(-)

diff --git a/include/OffsetStore.h b/include/OffsetStore.h
index 05ae0b4..4991010 100644
--- a/include/OffsetStore.h
+++ b/include/OffsetStore.h
@@ -40,7 +40,7 @@ class ROCKETMQCLIENT_API OffsetStore {
   virtual void updateOffset(const MQMessageQueue& mq, int64_t offset, bool increaseOnly) = 0;
   virtual int64_t readOffset(const MQMessageQueue& mq, ReadOffsetType type) = 0;
   virtual void persist(const MQMessageQueue& mq) = 0;
-  virtual void persistAll(const std::vector<MQMessageQueue>& mq) = 0;
+  virtual void persistAll(std::vector<MQMessageQueue>& mqs) = 0;
   virtual void removeOffset(const MQMessageQueue& mq) = 0;
 };
 
diff --git a/src/consumer/DefaultMQPushConsumerImpl.cpp b/src/consumer/DefaultMQPushConsumerImpl.cpp
index 2b3a7c4..768b7df 100644
--- a/src/consumer/DefaultMQPushConsumerImpl.cpp
+++ b/src/consumer/DefaultMQPushConsumerImpl.cpp
@@ -549,13 +549,7 @@ bool DefaultMQPushConsumerImpl::sendMessageBack(MessageExtPtr msg, int delay_lev
 void DefaultMQPushConsumerImpl::persistConsumerOffset() {
   if (isServiceStateOk()) {
     std::vector<MQMessageQueue> mqs = rebalance_impl_->getAllocatedMQ();
-    if (getDefaultMQPushConsumerConfig()->message_model() == BROADCASTING) {
-      offset_store_->persistAll(mqs);
-    } else {
-      for (const auto& mq : mqs) {
-        offset_store_->persist(mq);
-      }
-    }
+    offset_store_->persistAll(mqs);
   }
 }
 
diff --git a/src/consumer/LocalFileOffsetStore.cpp b/src/consumer/LocalFileOffsetStore.cpp
index ac66ece..1046c41 100644
--- a/src/consumer/LocalFileOffsetStore.cpp
+++ b/src/consumer/LocalFileOffsetStore.cpp
@@ -105,7 +105,7 @@ int64_t LocalFileOffsetStore::readOffset(const MQMessageQueue& mq, ReadOffsetTyp
 
 void LocalFileOffsetStore::persist(const MQMessageQueue& mq) {}
 
-void LocalFileOffsetStore::persistAll(const std::vector<MQMessageQueue>& mqs) {
+void LocalFileOffsetStore::persistAll(std::vector<MQMessageQueue>& mqs) {
   if (mqs.empty()) {
     return;
   }
diff --git a/src/consumer/LocalFileOffsetStore.h b/src/consumer/LocalFileOffsetStore.h
index c517fb4..6453769 100644
--- a/src/consumer/LocalFileOffsetStore.h
+++ b/src/consumer/LocalFileOffsetStore.h
@@ -34,7 +34,7 @@ class LocalFileOffsetStore : public OffsetStore {
   void updateOffset(const MQMessageQueue& mq, int64_t offset, bool increaseOnly) override;
   int64_t readOffset(const MQMessageQueue& mq, ReadOffsetType type) override;
   void persist(const MQMessageQueue& mq) override;
-  void persistAll(const std::vector<MQMessageQueue>& mq) override;
+  void persistAll(std::vector<MQMessageQueue>& mqs) override;
   void removeOffset(const MQMessageQueue& mq) override;
 
  private:
diff --git a/src/consumer/RemoteBrokerOffsetStore.cpp b/src/consumer/RemoteBrokerOffsetStore.cpp
index 537cf7f..ad2e210 100644
--- a/src/consumer/RemoteBrokerOffsetStore.cpp
+++ b/src/consumer/RemoteBrokerOffsetStore.cpp
@@ -76,23 +76,67 @@ int64_t RemoteBrokerOffsetStore::readOffset(const MQMessageQueue& mq, ReadOffset
 }
 
 void RemoteBrokerOffsetStore::persist(const MQMessageQueue& mq) {
-  std::map<MQMessageQueue, int64_t> offsetTable;
+  int64_t offset = -1;
   {
     std::lock_guard<std::mutex> lock(lock_);
-    offsetTable = offset_table_;
+    const auto& it = offset_table_.find(mq);
+    if (it != offset_table_.end()) {
+      offset = it->second;
+    }
   }
 
-  const auto& it = offsetTable.find(mq);
-  if (it != offsetTable.end()) {
+  if (offset >= 0) {
     try {
-      updateConsumeOffsetToBroker(mq, it->second);
+      updateConsumeOffsetToBroker(mq, offset);
+      LOG_INFO_NEW("[persist] Group: {} ClientId: {} updateConsumeOffsetToBroker {} {}", group_name_,
+                   client_instance_->getClientId(), mq.toString(), offset);
     } catch (MQException& e) {
       LOG_ERROR("updateConsumeOffsetToBroker error");
     }
   }
 }
 
-void RemoteBrokerOffsetStore::persistAll(const std::vector<MQMessageQueue>& mq) {}
+void RemoteBrokerOffsetStore::persistAll(std::vector<MQMessageQueue>& mqs) {
+  if (mqs.empty()) {
+    return;
+  }
+
+  std::sort(mqs.begin(), mqs.end());
+
+  std::vector<MQMessageQueue> unused_mqs;
+
+  std::map<MQMessageQueue, int64_t> offset_table;
+  {
+    std::lock_guard<std::mutex> lock(lock_);
+    offset_table = offset_table_;
+  }
+
+  for (const auto& it : offset_table) {
+    const auto& mq = it.first;
+    auto offset = it.second;
+    if (offset >= 0) {
+      if (std::binary_search(mqs.begin(), mqs.end(), mq)) {
+        try {
+          updateConsumeOffsetToBroker(mq, offset);
+          LOG_INFO_NEW("[persistAll] Group: {} ClientId: {} updateConsumeOffsetToBroker {} {}", group_name_,
+                       client_instance_->getClientId(), mq.toString(), offset);
+        } catch (std::exception& e) {
+          LOG_ERROR_NEW("updateConsumeOffsetToBroker exception, {} {}", mq.toString(), e.what());
+        }
+      } else {
+        unused_mqs.push_back(mq);
+      }
+    }
+  }
+
+  if (!unused_mqs.empty()) {
+    std::lock_guard<std::mutex> lock(lock_);
+    for (const auto& mq : unused_mqs) {
+      offset_table_.erase(mq);
+      LOG_INFO_NEW("remove unused mq, {}, {}", mq.toString(), group_name_);
+    }
+  }
+}
 
 void RemoteBrokerOffsetStore::removeOffset(const MQMessageQueue& mq) {
   std::lock_guard<std::mutex> lock(lock_);
@@ -118,14 +162,14 @@ void RemoteBrokerOffsetStore::updateConsumeOffsetToBroker(const MQMessageQueue&
     requestHeader->commitOffset = offset;
 
     try {
-      LOG_INFO("oneway updateConsumeOffsetToBroker of mq:%s, its offset is:%lld", mq.toString().c_str(), offset);
       return client_instance_->getMQClientAPIImpl()->updateConsumerOffsetOneway(findBrokerResult->broker_addr(),
                                                                                 requestHeader, 1000 * 5);
     } catch (MQException& e) {
       LOG_ERROR(e.what());
     }
+  } else {
+    LOG_WARN("The broker not exist");
   }
-  LOG_WARN("The broker not exist");
 }
 
 int64_t RemoteBrokerOffsetStore::fetchConsumeOffsetFromBroker(const MQMessageQueue& mq) {
diff --git a/src/consumer/RemoteBrokerOffsetStore.h b/src/consumer/RemoteBrokerOffsetStore.h
index ba6598e..e45de95 100644
--- a/src/consumer/RemoteBrokerOffsetStore.h
+++ b/src/consumer/RemoteBrokerOffsetStore.h
@@ -34,7 +34,7 @@ class RemoteBrokerOffsetStore : public OffsetStore {
   void updateOffset(const MQMessageQueue& mq, int64_t offset, bool increaseOnly) override;
   int64_t readOffset(const MQMessageQueue& mq, ReadOffsetType type) override;
   void persist(const MQMessageQueue& mq) override;
-  void persistAll(const std::vector<MQMessageQueue>& mq) override;
+  void persistAll(std::vector<MQMessageQueue>& mqs) override;
   void removeOffset(const MQMessageQueue& mq) override;
 
  private:


Mime
View raw message