kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ale...@apache.org
Subject [kudu] 02/02: [consensus] respond lock-free to RequestVote() if busy
Date Mon, 06 Jan 2020 18:27:34 GMT
This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit b6ac4c7aa3e2bdc04d85135b8eac06ec215cbbcc
Author: Alexey Serbin <alexey@apache.org>
AuthorDate: Mon Dec 23 13:40:11 2019 -0800

    [consensus] respond lock-free to RequestVote() if busy
    
    I saw cases of contention on replica's RaftConsensus lock when the
    filesystem was slow on updating Raft metadata files to record a vote
    that has just been granted by the replica.  As it turned out, the fast
    path was also acquiring the RaftConsensus object lock, but it's easy
    to avoid that.
    
    This patch updates the code by not acquiring the lock in such cases.
    It should help a bit with overflowing of the RaftConsensus RPC queue.
    
    In addition, I turned the LOG(INFO) message about this event into
    VLOG(1) since it's not so important to report such events on the
    responder side: its state doesn't change upon responding with
    ServiceUnavailable anyway.  Other minor clean-up.
    
    Change-Id: I95d5cbe455fefc4cdc540ee1e7b69e1f21b6ebc0
    Reviewed-on: http://gerrit.cloudera.org:8080/14943
    Tested-by: Alexey Serbin <aserbin@cloudera.com>
    Reviewed-by: Bankim Bhavsar <bankim@cloudera.com>
    Reviewed-by: Adar Dembo <adar@cloudera.com>
---
 src/kudu/consensus/raft_consensus.cc | 70 ++++++++++++++++++++----------------
 src/kudu/consensus/raft_consensus.h  | 22 ++++++++++--
 2 files changed, 59 insertions(+), 33 deletions(-)

diff --git a/src/kudu/consensus/raft_consensus.cc b/src/kudu/consensus/raft_consensus.cc
index 69c5d9f..d4b7d86 100644
--- a/src/kudu/consensus/raft_consensus.cc
+++ b/src/kudu/consensus/raft_consensus.cc
@@ -228,7 +228,7 @@ Status RaftConsensus::Start(const ConsensusBootstrapInfo& info,
                             scoped_refptr<TimeManager> time_manager,
                             ConsensusRoundHandler* round_handler,
                             const scoped_refptr<MetricEntity>& metric_entity,
-                            Callback<void(const std::string& reason)> mark_dirty_clbk) {
+                            Callback<void(const string& reason)> mark_dirty_clbk) {
   DCHECK(metric_entity);
 
   peer_proxy_factory_ = DCHECK_NOTNULL(std::move(peer_proxy_factory));
@@ -849,7 +849,7 @@ void RaftConsensus::NotifyTermChange(int64_t term) {
 
 void RaftConsensus::NotifyFailedFollower(const string& uuid,
                                          int64_t term,
-                                         const std::string& reason) {
+                                         const string& reason) {
   // Common info used in all of the log messages within this method.
   string fail_msg = Substitute("Processing failure of peer $0 in term $1 ($2): ",
                                uuid, term, reason);
@@ -891,7 +891,7 @@ void RaftConsensus::NotifyFailedFollower(const string& uuid,
               LogPrefixThreadSafe() + "Unable to start TryRemoveFollowerTask");
 }
 
-void RaftConsensus::NotifyPeerToPromote(const std::string& peer_uuid) {
+void RaftConsensus::NotifyPeerToPromote(const string& peer_uuid) {
   // Run the config change on the raft thread pool.
   WARN_NOT_OK(raft_pool_token_->SubmitFunc(std::bind(&RaftConsensus::TryPromoteNonVoterTask,
                                                      shared_from_this(),
@@ -913,7 +913,7 @@ void RaftConsensus::NotifyPeerHealthChange() {
 
 void RaftConsensus::TryRemoveFollowerTask(const string& uuid,
                                           const RaftConfigPB& committed_config,
-                                          const std::string& reason) {
+                                          const string& reason) {
   ChangeConfigRequestPB req;
   req.set_tablet_id(options_.tablet_id);
   req.mutable_server()->set_permanent_uuid(uuid);
@@ -926,7 +926,7 @@ void RaftConsensus::TryRemoveFollowerTask(const string& uuid,
               LogPrefixThreadSafe() + "Unable to remove follower " + uuid);
 }
 
-void RaftConsensus::TryPromoteNonVoterTask(const std::string& peer_uuid) {
+void RaftConsensus::TryPromoteNonVoterTask(const string& peer_uuid) {
   string msg = Substitute("attempt to promote peer $0: ", peer_uuid);
   int64_t current_committed_config_index;
   {
@@ -1052,8 +1052,8 @@ bool RaftConsensus::IsSingleVoterConfig() const {
          cmeta_->IsVoterInConfig(peer_uuid(), COMMITTED_CONFIG);
 }
 
-std::string RaftConsensus::LeaderRequest::OpsRangeString() const {
-  std::string ret;
+string RaftConsensus::LeaderRequest::OpsRangeString() const {
+  string ret;
   ret.reserve(100);
   ret.push_back('[');
   if (!messages.empty()) {
@@ -1651,7 +1651,7 @@ Status RaftConsensus::RequestVote(const VoteRequestPB* request,
   // takes place between requests.
   // Lock ordering: update_lock_ must be acquired before lock_.
   std::unique_lock<simple_spinlock> update_guard(update_lock_, std::defer_lock);
-  if (FLAGS_enable_leader_failure_detection) {
+  if (PREDICT_TRUE(FLAGS_enable_leader_failure_detection)) {
     update_guard.try_lock();
   } else {
     // If failure detection is not enabled, then we can't just reject the vote,
@@ -1663,10 +1663,6 @@ Status RaftConsensus::RequestVote(const VoteRequestPB* request,
     // other request is likely to reset the timer, and we'll end up just voting
     // "NO" after waiting. To avoid starving RPC handlers and causing cascading
     // timeouts, just vote a quick NO.
-    //
-    // We still need to take the state lock in order to respond with term info, etc.
-    ThreadRestrictions::AssertWaitAllowed();
-    LockGuard l(lock_);
     return RequestVoteRespondIsBusy(request, response);
   }
 
@@ -2236,7 +2232,13 @@ Status RaftConsensus::AdvanceTermForTests(int64_t new_term) {
   return HandleTermAdvanceUnlocked(new_term);
 }
 
-std::string RaftConsensus::GetRequestVoteLogPrefixUnlocked(const VoteRequestPB& request) const {
+string RaftConsensus::GetRequestVoteLogPrefixThreadSafe(const VoteRequestPB& request) const {
+  return Substitute("$0Leader $1election vote request",
+                    LogPrefixThreadSafe(),
+                    request.is_pre_election() ? "pre-" : "");
+}
+
+string RaftConsensus::GetRequestVoteLogPrefixUnlocked(const VoteRequestPB& request) const {
   DCHECK(lock_.is_locked());
   return Substitute("$0Leader $1election vote request",
                     LogPrefixUnlocked(),
@@ -2248,11 +2250,15 @@ void RaftConsensus::FillVoteResponseVoteGranted(VoteResponsePB* response) {
   response->set_vote_granted(true);
 }
 
-void RaftConsensus::FillVoteResponseVoteDenied(ConsensusErrorPB::Code error_code,
-                                               VoteResponsePB* response) {
-  response->set_responder_term(CurrentTermUnlocked());
+void RaftConsensus::FillVoteResponseVoteDenied(
+    ConsensusErrorPB::Code error_code,
+    VoteResponsePB* response,
+    ResponderTermPolicy responder_term_policy) {
   response->set_vote_granted(false);
   response->mutable_consensus_error()->set_code(error_code);
+  if (responder_term_policy == ResponderTermPolicy::SET) {
+    response->set_responder_term(CurrentTermUnlocked());
+  }
 }
 
 Status RaftConsensus::RequestVoteRespondInvalidTerm(const VoteRequestPB* request,
@@ -2321,20 +2327,24 @@ Status RaftConsensus::RequestVoteRespondLeaderIsAlive(const VoteRequestPB* reque
                           request->candidate_uuid(),
                           request->candidate_term());
   LOG(INFO) << msg;
-  StatusToPB(Status::InvalidArgument(msg), response->mutable_consensus_error()->mutable_status());
+  StatusToPB(Status::InvalidArgument(msg),
+             response->mutable_consensus_error()->mutable_status());
   return Status::OK();
 }
 
-Status RaftConsensus::RequestVoteRespondIsBusy(const VoteRequestPB* request,
-                                               VoteResponsePB* response) {
-  FillVoteResponseVoteDenied(ConsensusErrorPB::CONSENSUS_BUSY, response);
-  string msg = Substitute("$0: Denying vote to candidate $1 for term $2 because "
-                          "replica is already servicing an update from a current leader "
-                          "or another vote.",
-                          GetRequestVoteLogPrefixUnlocked(*request),
-                          request->candidate_uuid(),
-                          request->candidate_term());
-  LOG(INFO) << msg;
+Status RaftConsensus::RequestVoteRespondIsBusy(
+    const VoteRequestPB* request, VoteResponsePB* response) {
+  // Don't set the term in the response: the requestor doesn't need it
+  // to process the NO vote response in this case.
+  FillVoteResponseVoteDenied(ConsensusErrorPB::CONSENSUS_BUSY, response,
+                             ResponderTermPolicy::DO_NOT_SET);
+  auto msg = Substitute("$0: Denying vote to candidate $1 for term $2 because "
+                        "replica is already servicing an update from "
+                        "a current leader or another vote",
+                        GetRequestVoteLogPrefixThreadSafe(*request),
+                        request->candidate_uuid(),
+                        request->candidate_term());
+  VLOG(1) << msg;
   StatusToPB(Status::ServiceUnavailable(msg),
              response->mutable_consensus_error()->mutable_status());
   return Status::OK();
@@ -2709,7 +2719,7 @@ log::RetentionIndexes RaftConsensus::GetRetentionIndexes() {
                                queue_->GetAllReplicatedIndex()); // for peers
 }
 
-void RaftConsensus::MarkDirty(const std::string& reason) {
+void RaftConsensus::MarkDirty(const string& reason) {
   WARN_NOT_OK(raft_pool_token_->SubmitClosure(Bind(mark_dirty_clbk_, reason)),
               LogPrefixThreadSafe() + "Unable to run MarkDirty callback");
 }
@@ -3062,7 +3072,7 @@ const bool RaftConsensus::HasVotedCurrentTermUnlocked() const {
   return cmeta_->has_voted_for();
 }
 
-Status RaftConsensus::SetVotedForCurrentTermUnlocked(const std::string& uuid) {
+Status RaftConsensus::SetVotedForCurrentTermUnlocked(const string& uuid) {
   TRACE_EVENT1("consensus", "RaftConsensus::SetVotedForCurrentTermUnlocked",
                "uuid", uuid);
   DCHECK(lock_.is_locked());
@@ -3071,7 +3081,7 @@ Status RaftConsensus::SetVotedForCurrentTermUnlocked(const std::string& uuid) {
   return Status::OK();
 }
 
-const std::string& RaftConsensus::GetVotedForCurrentTermUnlocked() const {
+const string& RaftConsensus::GetVotedForCurrentTermUnlocked() const {
   DCHECK(lock_.is_locked());
   DCHECK(cmeta_->has_voted_for());
   return cmeta_->voted_for();
diff --git a/src/kudu/consensus/raft_consensus.h b/src/kudu/consensus/raft_consensus.h
index 95b3120..ddf5020 100644
--- a/src/kudu/consensus/raft_consensus.h
+++ b/src/kudu/consensus/raft_consensus.h
@@ -552,7 +552,11 @@ class RaftConsensus : public std::enable_shared_from_this<RaftConsensus>,
   // Returns true if this node is the only voter in the Raft configuration.
   bool IsSingleVoterConfig() const;
 
-  // Return header string for RequestVote log messages. 'lock_' must be held.
+  // Return header string for RequestVote log messages, no 'lock_' is necessary.
+  std::string GetRequestVoteLogPrefixThreadSafe(const VoteRequestPB& request) const;
+
+  // Similar to the method above, but outputs more detailed information on the
+  // metadata of the RaftConsensus object. 'lock_' must be held.
   std::string GetRequestVoteLogPrefixUnlocked(const VoteRequestPB& request) const;
 
   // Fills the response with the current status, if an update was successful.
@@ -568,11 +572,23 @@ class RaftConsensus : public std::enable_shared_from_this<RaftConsensus>,
   // - Set vote_granted to true.
   void FillVoteResponseVoteGranted(VoteResponsePB* response);
 
+  // Enum for the 'responder_term_policy' parameter of the
+  // FillVoteResponseVoteDenied() method below. Controls whether to populate
+  // the 'responder_term' field in the 'response' output parameter.
+  enum class ResponderTermPolicy {
+    DO_NOT_SET,  // don't set the field
+    SET,          // populate/set the field
+  };
+
   // Fill VoteResponsePB with the following information:
-  // - Update responder_term to current local term.
   // - Set vote_granted to false.
   // - Set consensus_error.code to the given code.
-  void FillVoteResponseVoteDenied(ConsensusErrorPB::Code error_code, VoteResponsePB* response);
+  // - Set or leave the responder_term field unset as prescribed by the
+  //   'responder_term_policy' parameter.
+  void FillVoteResponseVoteDenied(
+      ConsensusErrorPB::Code error_code,
+      VoteResponsePB* response,
+      ResponderTermPolicy responder_term_policy = ResponderTermPolicy::SET);
 
   // Respond to VoteRequest that the candidate has an old term.
   Status RequestVoteRespondInvalidTerm(const VoteRequestPB* request, VoteResponsePB* response);


Mime
View raw message