kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mpe...@apache.org
Subject [1/2] kudu git commit: consensus: Reduce copy/paste for observer callbacks
Date Tue, 09 Jan 2018 03:18:46 GMT
Repository: kudu
Updated Branches:
  refs/heads/master e37bd1cf5 -> f932547b2


consensus: Reduce copy/paste for observer callbacks

This is a net-negative line patch that unifies how the PeerMessageQueue
notifies PeerMessageQueueObserver instances that are registered with it.
The patch removes most of the copy/paste in this part of the code.

This patch has no functional changes.

Change-Id: Icbc4cb9d7b6e51a9f64d6f08c2f48d89705f5437
Reviewed-on: http://gerrit.cloudera.org:8080/8962
Reviewed-by: Alexey Serbin <aserbin@cloudera.com>
Tested-by: Kudu Jenkins


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/ff63b434
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/ff63b434
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/ff63b434

Branch: refs/heads/master
Commit: ff63b434c25574595105fb480bcd01c9334008c7
Parents: e37bd1c
Author: Mike Percy <mpercy@apache.org>
Authored: Tue Dec 12 18:20:27 2017 -0800
Committer: Mike Percy <mpercy@apache.org>
Committed: Tue Jan 9 03:18:06 2018 +0000

----------------------------------------------------------------------
 src/kudu/consensus/consensus_queue.cc | 99 +++++++++---------------------
 src/kudu/consensus/consensus_queue.h  | 19 ++----
 2 files changed, 35 insertions(+), 83 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/ff63b434/src/kudu/consensus/consensus_queue.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus_queue.cc b/src/kudu/consensus/consensus_queue.cc
index 6933cfa..1889ed3 100644
--- a/src/kudu/consensus/consensus_queue.cc
+++ b/src/kudu/consensus/consensus_queue.cc
@@ -18,6 +18,7 @@
 
 #include <algorithm>
 #include <cstdint>
+#include <functional>
 #include <iostream>
 #include <mutex>
 #include <string>
@@ -1275,97 +1276,55 @@ bool PeerMessageQueue::IsOpInLog(const OpId& desired_op) const {
 
 void PeerMessageQueue::NotifyObserversOfCommitIndexChange(int64_t new_commit_index) {
   WARN_NOT_OK(raft_pool_observers_token_->SubmitClosure(
-      Bind(&PeerMessageQueue::NotifyObserversOfCommitIndexChangeTask,
-           Unretained(this), new_commit_index)),
-              LogPrefixUnlocked() + "Unable to notify RaftConsensus of "
-                                    "commit index change.");
+      Bind(&PeerMessageQueue::NotifyObserversTask, Unretained(this),
+           [=](PeerMessageQueueObserver* observer) {
+             observer->NotifyCommitIndex(new_commit_index);
+           })),
+      LogPrefixUnlocked() + "Unable to notify RaftConsensus of commit index change.");
 }
 
 void PeerMessageQueue::NotifyObserversOfTermChange(int64_t term) {
   WARN_NOT_OK(raft_pool_observers_token_->SubmitClosure(
-      Bind(&PeerMessageQueue::NotifyObserversOfTermChangeTask,
-           Unretained(this), term)),
-              LogPrefixUnlocked() + "Unable to notify RaftConsensus of term change.");
-}
-
-void PeerMessageQueue::NotifyObserversOfCommitIndexChangeTask(int64_t new_commit_index) {
-  std::vector<PeerMessageQueueObserver*> copy;
-  {
-    std::lock_guard<simple_spinlock> lock(queue_lock_);
-    copy = observers_;
-  }
-  for (PeerMessageQueueObserver* observer : copy) {
-    observer->NotifyCommitIndex(new_commit_index);
-  }
-}
-
-void PeerMessageQueue::NotifyObserversOfTermChangeTask(int64_t term) {
-  MAYBE_INJECT_RANDOM_LATENCY(FLAGS_consensus_inject_latency_ms_in_notifications);
-  std::vector<PeerMessageQueueObserver*> copy;
-  {
-    std::lock_guard<simple_spinlock> lock(queue_lock_);
-    copy = observers_;
-  }
-  for (PeerMessageQueueObserver* observer : copy) {
-    observer->NotifyTermChange(term);
-  }
+      Bind(&PeerMessageQueue::NotifyObserversTask, Unretained(this),
+           [=](PeerMessageQueueObserver* observer) {
+             observer->NotifyTermChange(term);
+           })),
+      LogPrefixUnlocked() + "Unable to notify RaftConsensus of term change.");
 }
 
 void PeerMessageQueue::NotifyObserversOfFailedFollower(const string& uuid,
                                                        int64_t term,
                                                        const string& reason) {
   WARN_NOT_OK(raft_pool_observers_token_->SubmitClosure(
-      Bind(&PeerMessageQueue::NotifyObserversOfFailedFollowerTask,
-           Unretained(this), uuid, term, reason)),
-              LogPrefixUnlocked() + "Unable to notify RaftConsensus of abandoned follower.");
-}
-
-void PeerMessageQueue::NotifyObserversOfFailedFollowerTask(const string& uuid,
-                                                           int64_t term,
-                                                           const string& reason) {
-  MAYBE_INJECT_RANDOM_LATENCY(FLAGS_consensus_inject_latency_ms_in_notifications);
-  std::vector<PeerMessageQueueObserver*> observers_copy;
-  {
-    std::lock_guard<simple_spinlock> lock(queue_lock_);
-    observers_copy = observers_;
-  }
-  for (PeerMessageQueueObserver* observer : observers_copy) {
-    observer->NotifyFailedFollower(uuid, term, reason);
-  }
+      Bind(&PeerMessageQueue::NotifyObserversTask, Unretained(this),
+           [=](PeerMessageQueueObserver* observer) {
+             observer->NotifyFailedFollower(uuid, term, reason);
+           })),
+      LogPrefixUnlocked() + "Unable to notify RaftConsensus of abandoned follower.");
 }
 
 void PeerMessageQueue::NotifyObserversOfPeerToPromote(const string& peer_uuid,
                                                       int64_t term,
                                                       int64_t committed_config_opid_index) {
   WARN_NOT_OK(raft_pool_observers_token_->SubmitClosure(
-      Bind(&PeerMessageQueue::NotifyObserversOfPeerToPromoteTask,
-           Unretained(this), peer_uuid, term, committed_config_opid_index)),
-              LogPrefixUnlocked() + "unable to notify RaftConsensus of peer to promote");
-
-}
-
-void PeerMessageQueue::NotifyObserversOfPeerToPromoteTask(const string& peer_uuid,
-                                                          int64_t term,
-                                                          int64_t committed_config_opid_index) {
-  MAYBE_INJECT_RANDOM_LATENCY(FLAGS_consensus_inject_latency_ms_in_notifications);
-  std::vector<PeerMessageQueueObserver*> observers_copy;
-  {
-    std::lock_guard<simple_spinlock> lock(queue_lock_);
-    observers_copy = observers_;
-  }
-  for (PeerMessageQueueObserver* observer : observers_copy) {
-    observer->NotifyPeerToPromote(peer_uuid, term, committed_config_opid_index);
-  }
-
+      Bind(&PeerMessageQueue::NotifyObserversTask, Unretained(this),
+           [=](PeerMessageQueueObserver* observer) {
+             observer->NotifyPeerToPromote(peer_uuid, term, committed_config_opid_index);
+           })),
+      LogPrefixUnlocked() + "Unable to notify RaftConsensus of peer to promote.");
 }
 
 void PeerMessageQueue::NotifyObserversOfPeerHealthChange() {
   WARN_NOT_OK(raft_pool_observers_token_->SubmitClosure(
-      Bind(&PeerMessageQueue::NotifyObserversOfPeerHealthChangeTask, Unretained(this))),
-              LogPrefixUnlocked() + "Unable to notify RaftConsensus peer health change.");
+      Bind(&PeerMessageQueue::NotifyObserversTask, Unretained(this),
+           [](PeerMessageQueueObserver* observer) {
+             observer->NotifyPeerHealthChange();
+           })),
+      LogPrefixUnlocked() + "Unable to notify RaftConsensus peer health change.");
 }
 
-void PeerMessageQueue::NotifyObserversOfPeerHealthChangeTask() {
+void PeerMessageQueue::NotifyObserversTask(
+    const std::function<void(PeerMessageQueueObserver*)>& func) {
   MAYBE_INJECT_RANDOM_LATENCY(FLAGS_consensus_inject_latency_ms_in_notifications);
   std::vector<PeerMessageQueueObserver*> observers_copy;
   {
@@ -1373,7 +1332,7 @@ void PeerMessageQueue::NotifyObserversOfPeerHealthChangeTask() {
     observers_copy = observers_;
   }
   for (PeerMessageQueueObserver* observer : observers_copy) {
-    observer->NotifyPeerHealthChange();
+    func(observer);
   }
 }
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/ff63b434/src/kudu/consensus/consensus_queue.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus_queue.h b/src/kudu/consensus/consensus_queue.h
index 0808c69..2a6df6c 100644
--- a/src/kudu/consensus/consensus_queue.h
+++ b/src/kudu/consensus/consensus_queue.h
@@ -19,6 +19,7 @@
 #define KUDU_CONSENSUS_CONSENSUS_QUEUE_H_
 
 #include <cstdint>
+#include <functional>
 #include <iosfwd>
 #include <memory>
 #include <string>
@@ -450,28 +451,20 @@ class PeerMessageQueue {
   // Calculate a peer's up-to-date health status based on internal fields.
   static HealthReportPB::HealthStatus PeerHealthStatus(const TrackedPeer& peer);
 
+  // Asynchronously trigger various types of observer notifications on a
+  // separate thread.
   void NotifyObserversOfCommitIndexChange(int64_t new_commit_index);
-  void NotifyObserversOfCommitIndexChangeTask(int64_t new_commit_index);
-
   void NotifyObserversOfTermChange(int64_t term);
-  void NotifyObserversOfTermChangeTask(int64_t term);
-
   void NotifyObserversOfFailedFollower(const std::string& uuid,
                                        int64_t term,
                                        const std::string& reason);
-  void NotifyObserversOfFailedFollowerTask(const std::string& uuid,
-                                           int64_t term,
-                                           const std::string& reason);
-
   void NotifyObserversOfPeerToPromote(const std::string& peer_uuid,
                                       int64_t term,
                                       int64_t committed_config_opid_index);
-  void NotifyObserversOfPeerToPromoteTask(const std::string& peer_uuid,
-                                          int64_t term,
-                                          int64_t committed_config_opid_index);
-
   void NotifyObserversOfPeerHealthChange();
-  void NotifyObserversOfPeerHealthChangeTask();
+
+  // Notify all PeerMessageQueueObservers using the given callback function.
+  void NotifyObserversTask(const std::function<void(PeerMessageQueueObserver*)>& func);
 
   typedef std::unordered_map<std::string, TrackedPeer*> PeersMap;
 


Mime
View raw message