kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ale...@apache.org
Subject [1/2] kudu git commit: Factor out consensus queue methods
Date Thu, 15 Mar 2018 06:52:09 GMT
Repository: kudu
Updated Branches:
  refs/heads/master a74f9a0dc -> f2479e21d


Factor out consensus queue methods

This patch factors a couple of code blocks out into methods to reduce
the line count of PeerMessageQueue::ResponseFromPeer().

Change-Id: I278de150e3dc42181ccfbbd3a4c0e5cc4de90c1a
Reviewed-on: http://gerrit.cloudera.org:8080/9642
Tested-by: Kudu Jenkins
Reviewed-by: Alexey Serbin <aserbin@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/4c1788ea
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/4c1788ea
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/4c1788ea

Branch: refs/heads/master
Commit: 4c1788eae5bfd4cf4a714f1ca0ab775b005303b3
Parents: a74f9a0
Author: Mike Percy <mpercy@apache.org>
Authored: Tue Mar 13 20:06:32 2018 -0700
Committer: Alexey Serbin <aserbin@cloudera.com>
Committed: Thu Mar 15 06:02:00 2018 +0000

----------------------------------------------------------------------
 src/kudu/consensus/consensus_queue.cc | 160 ++++++++++++++++-------------
 src/kudu/consensus/consensus_queue.h  |  12 ++-
 2 files changed, 100 insertions(+), 72 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/4c1788ea/src/kudu/consensus/consensus_queue.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus_queue.cc b/src/kudu/consensus/consensus_queue.cc
index 189f499..f85589b 100644
--- a/src/kudu/consensus/consensus_queue.cc
+++ b/src/kudu/consensus/consensus_queue.cc
@@ -921,6 +921,78 @@ void PeerMessageQueue::UpdatePeerStatus(const string& peer_uuid,
   }
 }
 
+void PeerMessageQueue::UpdateExchangeStatus(TrackedPeer* peer,
+                                            const TrackedPeer& prev_peer_state,
+                                            const ConsensusResponsePB& response,
+                                            bool* lmp_mismatch) {
+  DCHECK(queue_lock_.is_locked());
+  const ConsensusStatusPB& status = response.status();
+
+  peer->last_communication_time = MonoTime::Now();
+  peer->last_known_committed_index = status.last_committed_idx();
+
+  if (PREDICT_TRUE(!status.has_error())) {
+    peer->last_exchange_status = PeerStatus::OK;
+    *lmp_mismatch = false;
+    return;
+  }
+
+  switch (status.error().code()) {
+    case ConsensusErrorPB::PRECEDING_ENTRY_DIDNT_MATCH:
+      peer->last_exchange_status = PeerStatus::LMP_MISMATCH;
+      DCHECK(status.has_last_received());
+      if (prev_peer_state.last_exchange_status == PeerStatus::NEW) {
+        LOG_WITH_PREFIX_UNLOCKED(INFO) << "Connected to new peer: " << peer->ToString();
+      } else {
+        LOG_WITH_PREFIX_UNLOCKED(INFO) << "Got LMP mismatch error from peer: "
+                                       << peer->ToString();
+      }
+      *lmp_mismatch = true;
+      return;
+
+    case ConsensusErrorPB::INVALID_TERM:
+      peer->last_exchange_status = PeerStatus::INVALID_TERM;
+      CHECK(response.has_responder_term());
+      LOG_WITH_PREFIX_UNLOCKED(INFO) << "Peer responded invalid term: " << peer->ToString();
+      NotifyObserversOfTermChange(response.responder_term());
+      *lmp_mismatch = false;
+      return;
+
+    default:
+      // Other ConsensusStatusPB error codes (such as remote errors) are
+      // supposed to be handled higher up in the stack.
+      LOG_WITH_PREFIX_UNLOCKED(FATAL) << "Unexpected consensus error. Code: "
+          << ConsensusErrorPB::Code_Name(status.error().code()) << ". Response: "
+          << SecureShortDebugString(response);
+  }
+}
+
+void PeerMessageQueue::PromoteIfNeeded(TrackedPeer* peer, const TrackedPeer& prev_peer_state,
+                                       const ConsensusStatusPB& status) {
+  DCHECK(queue_lock_.is_locked());
+  int64_t entries_behind = queue_state_.committed_index - peer->last_received.index();
+  if (queue_state_.mode != PeerMessageQueue::LEADER ||
+      entries_behind > FLAGS_consensus_promotion_max_wal_entries_behind) {
+    return;
+  }
+
+  // TODO(mpercy): Caching the member type in TrackedPeer would avoid this config
+  // lookup, at the cost of a second source of truth that must be kept in sync.
+  RaftPeerPB* peer_pb;
+  Status s = GetRaftConfigMember(DCHECK_NOTNULL(queue_state_.active_config.get()),
+                                 peer->uuid(), &peer_pb);
+  if (s.ok() &&
+      peer_pb &&
+      peer_pb->member_type() == RaftPeerPB::NON_VOTER &&
+      peer_pb->attrs().promote()) {
+    // This peer is ready to promote.
+    //
+    // TODO(mpercy): Should we introduce a function SafeToPromote() that
+    // does the same calculation as SafeToEvict() but for adding a VOTER?
+    NotifyObserversOfPeerToPromote(peer->uuid());
+  }
+}
+
 void PeerMessageQueue::ResponseFromPeer(const std::string& peer_uuid,
                                         const ConsensusResponsePB& response,
                                         bool* more_pending) {
@@ -946,28 +1018,23 @@ void PeerMessageQueue::ResponseFromPeer(const std::string& peer_uuid,
     DCHECK(response.status().IsInitialized())
         << "Error: Uninitialized: " << response.InitializationErrorString()
         << ". Response: "<< SecureShortDebugString(response);
-    // TODO: Include uuid in error messages as well.
+    // TODO(mpercy): Include uuid in error messages as well.
     DCHECK(response.has_responder_uuid() && !response.responder_uuid().empty())
         << "Got response from peer with empty UUID";
 
-    // Application level errors should be handled elsewhere
-    DCHECK(!response.has_error());
-    // Responses should always have a status.
-    DCHECK(response.has_status());
+    DCHECK(!response.has_error()); // Application-level errors should be handled elsewhere.
+    DCHECK(response.has_status()); // Responses should always have a status.
     // The status must always have a last received op id and a last committed index.
-    DCHECK(response.status().has_last_received());
-    DCHECK(response.status().has_last_received_current_leader());
-    DCHECK(response.status().has_last_committed_idx());
-
     const ConsensusStatusPB& status = response.status();
+    DCHECK(status.has_last_received());
+    DCHECK(status.has_last_received_current_leader());
+    DCHECK(status.has_last_committed_idx());
 
-    // Take a snapshot of the current peer status.
-    TrackedPeer previous = *peer;
+    // Take a snapshot of the previously-recorded peer state.
+    const TrackedPeer prev_peer_state = *peer;
 
-    // Update the peer status based on the response.
-    peer->last_exchange_status = PeerStatus::OK;
-    peer->last_known_committed_index = status.last_committed_idx();
-    peer->last_communication_time = MonoTime::Now();
+    // Update the peer's exchange status; also sets *more_pending (true iff LMP mismatch).
+    UpdateExchangeStatus(peer, prev_peer_state, response, more_pending);
 
     // If the reported last-received op for the replica is in our local log,
     // then resume sending entries from that point onward. Otherwise, resume
@@ -981,31 +1048,8 @@ void PeerMessageQueue::ResponseFromPeer(const std::string& peer_uuid,
       peer->last_received = status.last_received();
       peer->next_index = peer->last_received.index() + 1;
 
-      // Since the peer has a prefix of our log, if we are the leader and they
-      // are a non-voter then it may be time to promote them to a voter.
-      //
-      // TODO(mpercy): Factor this out into a dedicated function.
-
-      int64_t entries_behind = queue_state_.committed_index - peer->last_received.index();
-      if (queue_state_.mode == PeerMessageQueue::LEADER &&
-          entries_behind <= FLAGS_consensus_promotion_max_wal_entries_behind) {
-        RaftPeerPB* peer_pb;
-        // TODO(mpercy): It would be more efficient to cached the member type
-        // in the TrackedPeer data structure. The downside is that we'd end up
-        // with multiple sources of truth that would need to be kept in sync.
-        Status s = GetRaftConfigMember(DCHECK_NOTNULL(queue_state_.active_config.get()),
-                                       peer->uuid(), &peer_pb);
-        if (s.ok() &&
-            peer_pb &&
-            peer_pb->member_type() == RaftPeerPB::NON_VOTER &&
-            peer_pb->attrs().promote()) {
-          // This peer is ready to promote.
-          //
-          // TODO(mpercy): Should we introduce a function SafeToPromote() that
-          // does the same calculation as SafeToEvict() but for adding a VOTER?
-          NotifyObserversOfPeerToPromote(peer->uuid());
-        }
-      }
+      // Check if the peer is a NON_VOTER candidate ready for promotion.
+      PromoteIfNeeded(peer, prev_peer_state, status);
 
     } else if (!OpIdEquals(status.last_received_current_leader(), MinimumOpId())) {
       // Their log may have diverged from ours, however we are in the process
@@ -1030,35 +1074,9 @@ void PeerMessageQueue::ResponseFromPeer(const std::string& peer_uuid,
           << "Falling back to committed index " << peer->last_known_committed_index;
     }
 
-    if (PREDICT_FALSE(status.has_error())) {
-      switch (status.error().code()) {
-        case ConsensusErrorPB::PRECEDING_ENTRY_DIDNT_MATCH: {
-          peer->last_exchange_status = PeerStatus::LMP_MISMATCH;
-          DCHECK(status.has_last_received());
-          if (previous.last_exchange_status == PeerStatus::NEW) {
-            LOG_WITH_PREFIX_UNLOCKED(INFO) << "Connected to new peer: " << peer->ToString();
-          } else {
-            LOG_WITH_PREFIX_UNLOCKED(INFO) << "Got LMP mismatch error from peer: "
-                                           << peer->ToString();
-          }
-          *more_pending = true;
-          return;
-        }
-        case ConsensusErrorPB::INVALID_TERM: {
-          peer->last_exchange_status = PeerStatus::INVALID_TERM;
-          CHECK(response.has_responder_term());
-          LOG_WITH_PREFIX_UNLOCKED(INFO) << "Peer responded invalid term: " << peer->ToString();
-          NotifyObserversOfTermChange(response.responder_term());
-          *more_pending = false;
-          return;
-        }
-        default: {
-          peer->last_exchange_status = PeerStatus::REMOTE_ERROR;
-          LOG_WITH_PREFIX_UNLOCKED(FATAL) << "Unexpected consensus error. Code: "
-              << ConsensusErrorPB::Code_Name(status.error().code()) << ". Response: "
-              << SecureShortDebugString(response);
-        }
-      }
+    if (peer->last_exchange_status != PeerStatus::OK) {
+      // UpdateExchangeStatus() already set *more_pending: true on LMP mismatch, else false.
+      return;
     }
 
     if (response.has_responder_term()) {
@@ -1087,7 +1105,7 @@ void PeerMessageQueue::ResponseFromPeer(const std::string& peer_uuid,
       // Advance the majority replicated index.
       AdvanceQueueWatermark("majority_replicated",
                             &queue_state_.majority_replicated_index,
-                            /*replicated_before=*/ previous.last_received,
+                            /*replicated_before=*/ prev_peer_state.last_received,
                             /*replicated_after=*/ peer->last_received,
                             /*num_peers_required=*/ queue_state_.majority_size_,
                             VOTER_REPLICAS,
@@ -1096,7 +1114,7 @@ void PeerMessageQueue::ResponseFromPeer(const std::string& peer_uuid,
       // Advance the all replicated index.
       AdvanceQueueWatermark("all_replicated",
                             &queue_state_.all_replicated_index,
-                            /*replicated_before=*/ previous.last_received,
+                            /*replicated_before=*/ prev_peer_state.last_received,
                             /*replicated_after=*/ peer->last_received,
                             /*num_peers_required=*/ peers_map_.size(),
                             ALL_REPLICAS,

http://git-wip-us.apache.org/repos/asf/kudu/blob/4c1788ea/src/kudu/consensus/consensus_queue.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus_queue.h b/src/kudu/consensus/consensus_queue.h
index 626f8e3..cbbc522 100644
--- a/src/kudu/consensus/consensus_queue.h
+++ b/src/kudu/consensus/consensus_queue.h
@@ -57,6 +57,7 @@ class LogThrottler;
 namespace consensus {
 class ConsensusRequestPB;
 class ConsensusResponsePB;
+class ConsensusStatusPB;
 class PeerMessageQueueObserver;
 class TimeManager;
 class StartTabletCopyRequestPB;
@@ -286,7 +287,6 @@ class PeerMessageQueue {
                         PeerStatus ps,
                         const Status& status);
 
-
   // Updates the request queue with the latest response of a peer, returns
   // whether this peer has more requests pending.
   void ResponseFromPeer(const std::string& peer_uuid,
@@ -443,6 +443,16 @@ class PeerMessageQueue {
   // notifications.
   void UpdatePeerHealthUnlocked(TrackedPeer* peer);
 
+  // Update the peer's last exchange status, and other fields, based on the
+  // response. Sets *lmp_mismatch to true iff the peer reported an LMP mismatch.
+  void UpdateExchangeStatus(TrackedPeer* peer, const TrackedPeer& prev_peer_state,
+                            const ConsensusResponsePB& response, bool* lmp_mismatch);
+
+  // If in LEADER mode, ask observers to promote the peer when it is a NON_VOTER
+  // with attrs().promote() set that is close enough to the committed index.
+  void PromoteIfNeeded(TrackedPeer* peer, const TrackedPeer& prev_peer_state,
+                       const ConsensusStatusPB& status);
+
   // Calculate a peer's up-to-date health status based on internal fields.
   static HealthReportPB::HealthStatus PeerHealthStatus(const TrackedPeer& peer);
 


Mime
View raw message