kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aw...@apache.org
Subject [kudu] branch master updated: KUDU-3011 p1: metric to count tablet leaders
Date Mon, 06 Jan 2020 22:01:22 GMT
This is an automated email from the ASF dual-hosted git repository.

awong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new 4f22c0f  KUDU-3011 p1: metric to count tablet leaders
4f22c0f is described below

commit 4f22c0f9a6e8d41ec5efa0bba25aed55936a9f91
Author: Andrew Wong <awong@cloudera.com>
AuthorDate: Fri Jan 3 16:34:14 2020 -0800

    KUDU-3011 p1: metric to count tablet leaders
    
    Adds a metric that tracks the number of tablet leaders on a given
    tablet server. This is done by plumbing a top-level gauge down from
    the KuduServer to each TabletReplica's RaftConsensus instance.
    
    This is done at the KuduServer to eventually be extensible to count the
    number of master system catalog leaders, though that plumbing is left
    for a future patch. I've left a TODO where I expect this to happen.
    
    I also considered instead having the metric be defined by a functor that
    would iterate through all replicas and check each one's leadership status. I
    opted to not do this, since iterating through and locking each
    RaftConsensus instance seemed like it'd be less performant.
    
    This will be useful in orchestrating a smooth maintenance window, as it
    will allow us to determine whether leadership has quiesced away from a
    given tablet server.
    
    Change-Id: Iaa6554458a860e34f97af168da7ed786c8ef47e4
    Reviewed-on: http://gerrit.cloudera.org:8080/14976
    Tested-by: Kudu Jenkins
    Reviewed-by: Adar Dembo <adar@cloudera.com>
---
 src/kudu/consensus/raft_consensus.cc               | 31 +++++---
 src/kudu/consensus/raft_consensus.h                | 19 ++++-
 src/kudu/consensus/raft_consensus_quorum-test.cc   |  9 ++-
 .../raft_consensus_election-itest.cc               | 86 +++++++++++++++++++++-
 .../integration-tests/ts_tablet_manager-itest.cc   |  7 +-
 src/kudu/kserver/kserver.cc                        | 19 +++--
 src/kudu/kserver/kserver.h                         |  7 ++
 src/kudu/master/sys_catalog.cc                     |  5 +-
 src/kudu/tablet/tablet_replica-test.cc             |  3 +-
 src/kudu/tablet/tablet_replica.cc                  |  5 +-
 src/kudu/tablet/tablet_replica.h                   |  2 +-
 .../tserver/tablet_copy_source_session-test.cc     |  3 +-
 src/kudu/tserver/ts_tablet_manager.cc              |  3 +-
 13 files changed, 163 insertions(+), 36 deletions(-)

diff --git a/src/kudu/consensus/raft_consensus.cc b/src/kudu/consensus/raft_consensus.cc
index dbbf2f4..69c19c8 100644
--- a/src/kudu/consensus/raft_consensus.cc
+++ b/src/kudu/consensus/raft_consensus.cc
@@ -181,11 +181,11 @@ RaftConsensus::RaftConsensus(
     ConsensusOptions options,
     RaftPeerPB local_peer_pb,
     scoped_refptr<ConsensusMetadataManager> cmeta_manager,
-    ThreadPool* raft_pool)
+    ServerContext server_ctx)
     : options_(std::move(options)),
       local_peer_pb_(std::move(local_peer_pb)),
       cmeta_manager_(DCHECK_NOTNULL(std::move(cmeta_manager))),
-      raft_pool_(raft_pool),
+      server_ctx_(std::move(server_ctx)),
       state_(kNew),
       rng_(GetRandomSeed32()),
       leader_transfer_in_progress_(false),
@@ -211,12 +211,12 @@ RaftConsensus::~RaftConsensus() {
 Status RaftConsensus::Create(ConsensusOptions options,
                              RaftPeerPB local_peer_pb,
                              scoped_refptr<ConsensusMetadataManager> cmeta_manager,
-                             ThreadPool* raft_pool,
+                             ServerContext server_ctx,
                              shared_ptr<RaftConsensus>* consensus_out) {
   shared_ptr<RaftConsensus> consensus(RaftConsensus::make_shared(std::move(options),
                                                                  std::move(local_peer_pb),
                                                                  std::move(cmeta_manager),
-                                                                 raft_pool));
+                                                                 std::move(server_ctx)));
   RETURN_NOT_OK_PREPEND(consensus->Init(), "Unable to initialize Raft consensus");
   *consensus_out = std::move(consensus);
   return Status::OK();
@@ -257,7 +257,8 @@ Status RaftConsensus::Start(const ConsensusBootstrapInfo& info,
   // PeerManager. Because PeerManager is owned by RaftConsensus, it receives a
   // raw pointer to the token, to emphasize that RaftConsensus is responsible
   // for destroying the token.
-  raft_pool_token_ = raft_pool_->NewToken(ThreadPool::ExecutionMode::CONCURRENT);
+  ThreadPool* raft_pool = server_ctx_.raft_pool;
+  raft_pool_token_ = raft_pool->NewToken(ThreadPool::ExecutionMode::CONCURRENT);
 
   // The message queue that keeps track of which operations need to be replicated
   // where.
@@ -274,7 +275,7 @@ Status RaftConsensus::Start(const ConsensusBootstrapInfo& info,
       time_manager_,
       local_peer_pb_,
       options_.tablet_id,
-      raft_pool_->NewToken(ThreadPool::ExecutionMode::SERIAL),
+      raft_pool->NewToken(ThreadPool::ExecutionMode::SERIAL),
       info.last_id,
       info.last_committed_id));
 
@@ -393,8 +394,8 @@ bool RaftConsensus::IsRunning() const {
   return state_ == kRunning;
 }
 
-Status RaftConsensus::EmulateElection() {
-  TRACE_EVENT2("consensus", "RaftConsensus::EmulateElection",
+Status RaftConsensus::EmulateElectionForTests() {
+  TRACE_EVENT2("consensus", "RaftConsensus::EmulateElectionForTests",
                "peer", peer_uuid(),
                "tablet", options_.tablet_id);
 
@@ -676,7 +677,9 @@ Status RaftConsensus::BecomeLeaderUnlocked() {
   EndLeaderTransferPeriod();
 
   queue_->RegisterObserver(this);
+  bool was_leader = queue_->IsInLeaderMode();
   RETURN_NOT_OK(RefreshConsensusQueueAndPeersUnlocked());
+  if (!was_leader && server_ctx_.num_leaders) server_ctx_.num_leaders->Increment();
 
   // Initiate a NO_OP transaction that is sent at the beginning of every term
   // change in raft.
@@ -716,7 +719,11 @@ Status RaftConsensus::BecomeReplicaUnlocked(boost::optional<MonoDelta> fd_delta)
   // Deregister ourselves from the queue. We no longer need to track what gets
   // replicated since we're stepping down.
   queue_->UnRegisterObserver(this);
+  bool was_leader = queue_->IsInLeaderMode();
   queue_->SetNonLeaderMode(cmeta_->ActiveConfig());
+  if (was_leader && server_ctx_.num_leaders) {
+    server_ctx_.num_leaders->IncrementBy(-1);
+  }
   peer_manager_->Close();
 
   return Status::OK();
@@ -2166,7 +2173,13 @@ void RaftConsensus::Stop() {
   if (peer_manager_) peer_manager_->Close();
 
   // We must close the queue after we close the peers.
-  if (queue_) queue_->Close();
+  if (queue_) {
+    // If we were leader, decrement the number of leaders there are now.
+    if (queue_->IsInLeaderMode() && server_ctx_.num_leaders) {
+      server_ctx_.num_leaders->IncrementBy(-1);
+    }
+    queue_->Close();
+  }
 
   {
     ThreadRestrictions::AssertWaitAllowed();
diff --git a/src/kudu/consensus/raft_consensus.h b/src/kudu/consensus/raft_consensus.h
index 23f5bef..b038bfd 100644
--- a/src/kudu/consensus/raft_consensus.h
+++ b/src/kudu/consensus/raft_consensus.h
@@ -79,6 +79,16 @@ class PendingRounds;
 struct ConsensusBootstrapInfo;
 struct ElectionResult;
 
+// Resources shared by all Raft consensus instances hosted on one server.
+// Each RaftConsensus instance is handed its own copy of this struct by value.
+struct ServerContext {
+  // Gauge counting the Raft tablet leaders hosted on the server; may be null.
+  scoped_refptr<AtomicGauge<int32_t>> num_leaders;
+
+  // Threadpool on which to run Raft tasks. Not owned by this struct.
+  ThreadPool* raft_pool;
+};
+
 struct ConsensusOptions {
   std::string tablet_id;
 };
@@ -138,7 +148,7 @@ class RaftConsensus : public std::enable_shared_from_this<RaftConsensus>,
   static Status Create(ConsensusOptions options,
                        RaftPeerPB local_peer_pb,
                        scoped_refptr<ConsensusMetadataManager> cmeta_manager,
-                       ThreadPool* raft_pool,
+                       ServerContext server_ctx,
                        std::shared_ptr<RaftConsensus>* consensus_out);
 
   // Starts running the Raft consensus algorithm.
@@ -159,7 +169,7 @@ class RaftConsensus : public std::enable_shared_from_this<RaftConsensus>,
   // in the configuration by sending a NO_OP to other peers.
   // This is NOT safe to use in a distributed configuration with failure detection
   // enabled, as it could result in a split-brain scenario.
-  Status EmulateElection();
+  Status EmulateElectionForTests();
 
   // Triggers a leader election.
   Status StartElection(ElectionMode mode, ElectionReason reason);
@@ -389,7 +399,7 @@ class RaftConsensus : public std::enable_shared_from_this<RaftConsensus>,
   RaftConsensus(ConsensusOptions options,
                 RaftPeerPB local_peer_pb,
                 scoped_refptr<ConsensusMetadataManager> cmeta_manager,
-                ThreadPool* raft_pool);
+                ServerContext server_ctx);
 
  private:
   friend class RaftConsensusQuorumTest;
@@ -828,7 +838,8 @@ class RaftConsensus : public std::enable_shared_from_this<RaftConsensus>,
   // Consensus metadata service.
   const scoped_refptr<ConsensusMetadataManager> cmeta_manager_;
 
-  ThreadPool* const raft_pool_;
+  // State shared by Raft instances on a given server.
+  const ServerContext server_ctx_;
 
   // TODO(dralves) hack to serialize updates due to repeated/out-of-order messages
   // should probably be refactored out.
diff --git a/src/kudu/consensus/raft_consensus_quorum-test.cc b/src/kudu/consensus/raft_consensus_quorum-test.cc
index 9d4209e..090371a 100644
--- a/src/kudu/consensus/raft_consensus_quorum-test.cc
+++ b/src/kudu/consensus/raft_consensus_quorum-test.cc
@@ -23,6 +23,7 @@
 #include <type_traits>
 #include <unordered_map>
 #include <unordered_set>
+#include <utility>
 #include <vector>
 
 #include <boost/optional/optional.hpp>
@@ -191,10 +192,12 @@ class RaftConsensusQuorumTest : public KuduTest {
       RETURN_NOT_OK(GetRaftConfigMember(&config_, fs->uuid(), &local_peer_pb));
 
       shared_ptr<RaftConsensus> peer;
+      ServerContext ctx({ /*num_leaders*/nullptr,
+                          raft_pool_.get() });
       RETURN_NOT_OK(RaftConsensus::Create(options_,
                                           config_.peers(i),
                                           std::move(cmeta_manager),
-                                          raft_pool_.get(),
+                                          std::move(ctx),
                                           &peer));
       peers_->AddPeer(config_.peers(i).permanent_uuid(), peer);
     }
@@ -245,7 +248,7 @@ class RaftConsensusQuorumTest : public KuduTest {
     const int kLeaderIdx = num - 1;
     shared_ptr<RaftConsensus> leader;
     RETURN_NOT_OK(peers_->GetPeerByIdx(kLeaderIdx, &leader));
-    RETURN_NOT_OK(leader->EmulateElection());
+    RETURN_NOT_OK(leader->EmulateElectionForTests());
     return Status::OK();
   }
 
@@ -823,7 +826,7 @@ TEST_F(RaftConsensusQuorumTest, TestLeaderHeartbeats) {
 
   shared_ptr<RaftConsensus> leader;
   CHECK_OK(peers_->GetPeerByIdx(kLeaderIdx, &leader));
-  ASSERT_OK(leader->EmulateElection());
+  ASSERT_OK(leader->EmulateElectionForTests());
 
   // Wait for the config round to get committed and count the number
   // of update calls, calls after that will be heartbeats.
diff --git a/src/kudu/integration-tests/raft_consensus_election-itest.cc b/src/kudu/integration-tests/raft_consensus_election-itest.cc
index 35ca929..aed18a6 100644
--- a/src/kudu/integration-tests/raft_consensus_election-itest.cc
+++ b/src/kudu/integration-tests/raft_consensus_election-itest.cc
@@ -60,9 +60,8 @@ DECLARE_int32(num_client_threads);
 DECLARE_int32(num_replicas);
 DECLARE_int32(num_tablet_servers);
 
-METRIC_DECLARE_entity(tablet);
-METRIC_DECLARE_counter(transaction_memory_pressure_rejections);
-METRIC_DECLARE_gauge_int64(raft_term);
+METRIC_DECLARE_entity(server);
+METRIC_DECLARE_gauge_int32(num_raft_leaders);
 
 using kudu::cluster::ExternalTabletServer;
 using kudu::consensus::COMMITTED_OPID;
@@ -72,6 +71,7 @@ using kudu::consensus::OpId;
 using kudu::consensus::RaftPeerPB;
 using kudu::itest::AddServer;
 using kudu::itest::GetConsensusState;
+using kudu::itest::GetInt64Metric;
 using kudu::itest::GetLastOpIdForReplica;
 using kudu::itest::GetReplicaStatusAndCheckIfLeader;
 using kudu::itest::LeaderStepDown;
@@ -345,6 +345,86 @@ TEST_F(RaftConsensusElectionITest, LeaderStepDown) {
                                   << s.ToString();
 }
 
+class RaftConsensusNumLeadersMetricTest : public RaftConsensusElectionITest,
+                                          public testing::WithParamInterface<int> {};
+
+TEST_P(RaftConsensusNumLeadersMetricTest, TestNumLeadersMetric) {
+  const int kNumTablets = 10;
+  FLAGS_num_tablet_servers = GetParam();
+  // We'll trigger elections manually, so turn off leader failure detection.
+  const vector<string> kTsFlags = {
+    "--enable_leader_failure_detection=false"
+  };
+  const vector<string> kMasterFlags = {
+    "--catalog_manager_wait_for_new_tablets_to_elect_leader=false"
+  };
+  NO_FATALS(BuildAndStart(kTsFlags, kMasterFlags, /*location_info*/{}, /*create_table*/false));
+
+  // Create some tablet replicas, one per tablet server for each tablet.
+  TestWorkload workload(cluster_.get());
+  workload.set_num_tablets(kNumTablets);
+  workload.set_num_replicas(FLAGS_num_tablet_servers);
+  workload.Setup();
+
+  const auto kTimeout = MonoDelta::FromSeconds(10);
+  const auto* ts = tablet_servers_.begin()->second;
+  vector<string> tablet_ids;
+  ASSERT_EVENTUALLY([&] {
+    vector<string> tablets;
+    ASSERT_OK(ListRunningTabletIds(ts, kTimeout, &tablets));
+    ASSERT_EQ(kNumTablets, tablets.size());
+    tablet_ids = std::move(tablets);
+  });
+
+  // Sanity check that TS #0 doesn't host any leaders yet.
+  for (const auto& id : tablet_ids) {
+    Status s = GetReplicaStatusAndCheckIfLeader(ts, id, kTimeout);
+    ASSERT_TRUE(s.IsIllegalState()) << "TS #0 should not be leader yet: " << s.ToString();
+  }
+  const auto get_num_leaders_metric = [&] (int64_t* num_leaders) {
+    return GetInt64Metric(cluster_->tablet_server_by_uuid(ts->uuid())->bound_http_hostport(),
+                          &METRIC_ENTITY_server, nullptr, &METRIC_num_raft_leaders, "value",
+                          num_leaders);
+  };
+  int64_t num_leaders_metric = -1;
+  ASSERT_OK(get_num_leaders_metric(&num_leaders_metric));
+  ASSERT_EQ(0, num_leaders_metric);
+
+  // Begin triggering elections on TS #0 and ensure the metric goes up by one
+  // for each new leader.
+  int num_leaders_expected = 0;
+  for (const auto& id : tablet_ids) {
+    ASSERT_OK(StartElection(ts, id, kTimeout));
+    ASSERT_OK(WaitUntilLeader(ts, id, kTimeout));
+    ASSERT_OK(get_num_leaders_metric(&num_leaders_metric));
+    ASSERT_EQ(++num_leaders_expected, num_leaders_metric);
+  }
+
+  // Tombstone half of the leaders and ensure that the metric goes down.
+  size_t idx = 0;
+  const size_t halfway_idx = tablet_ids.size() / 2;
+  for (; idx < halfway_idx; idx++) {
+    ASSERT_OK(DeleteTablet(ts, tablet_ids[idx], tablet::TABLET_DATA_TOMBSTONED, kTimeout));
+    ASSERT_OK(get_num_leaders_metric(&num_leaders_metric));
+    ASSERT_EQ(--num_leaders_expected, num_leaders_metric);
+  }
+
+  // Renounce leadership on the rest; each step-down decrements the metric.
+  for (; idx < tablet_ids.size(); idx++) {
+    ASSERT_OK(LeaderStepDown(ts, tablet_ids[idx], kTimeout));
+    ASSERT_OK(get_num_leaders_metric(&num_leaders_metric));
+    ASSERT_EQ(--num_leaders_expected, num_leaders_metric);
+
+    // Also delete them and ensure that they don't affect the metric, since
+    // they're already non-leaders.
+    ASSERT_OK(DeleteTablet(ts, tablet_ids[idx], tablet::TABLET_DATA_TOMBSTONED, kTimeout));
+    ASSERT_OK(get_num_leaders_metric(&num_leaders_metric));
+    ASSERT_EQ(num_leaders_expected, num_leaders_metric);
+  }
+}
+
+INSTANTIATE_TEST_CASE_P(NumReplicas, RaftConsensusNumLeadersMetricTest, ::testing::Values(1, 3));
+
 // Test for KUDU-699: sets the consensus RPC timeout to be long,
 // and freezes both followers before asking the leader to step down.
 // Prior to fixing KUDU-699, the step-down process would block
diff --git a/src/kudu/integration-tests/ts_tablet_manager-itest.cc b/src/kudu/integration-tests/ts_tablet_manager-itest.cc
index 2e40920..f7b4ac6 100644
--- a/src/kudu/integration-tests/ts_tablet_manager-itest.cc
+++ b/src/kudu/integration-tests/ts_tablet_manager-itest.cc
@@ -29,7 +29,6 @@
 #include <vector>
 
 #include <gflags/gflags.h>
-#include <gflags/gflags_declare.h>
 #include <glog/logging.h>
 #include <gtest/gtest.h>
 
@@ -88,7 +87,7 @@ DECLARE_int32(heartbeat_interval_ms);
 DECLARE_int32(metrics_retirement_age_ms);
 DECLARE_int32(raft_heartbeat_interval_ms);
 DEFINE_int32(num_election_test_loops, 3,
-             "Number of random EmulateElection() loops to execute in "
+             "Number of random EmulateElectionForTests() loops to execute in "
              "TestReportNewLeaderOnLeaderChange");
 
 using kudu::client::KuduClient;
@@ -290,7 +289,7 @@ class LeadershipChangeReportingTest : public TsTabletManagerITest {
     NO_FATALS(StartCluster(std::move(opts)));
 
     // We need to control elections precisely for this test since we're using
-    // EmulateElection() with a distributed consensus configuration.
+    // EmulateElectionForTests() with a distributed consensus configuration.
     FLAGS_enable_leader_failure_detection = false;
     FLAGS_catalog_manager_wait_for_new_tablets_to_elect_leader = false;
 
@@ -342,7 +341,7 @@ class LeadershipChangeReportingTest : public TsTabletManagerITest {
     int leader_idx = rand() % tablet_replicas_.size();
     LOG(INFO) << "Electing peer " << leader_idx << "...";
     RaftConsensus* con = CHECK_NOTNULL(tablet_replicas_[leader_idx]->consensus());
-    RETURN_NOT_OK(con->EmulateElection());
+    RETURN_NOT_OK(con->EmulateElectionForTests());
     LOG(INFO) << "Waiting for servers to agree...";
     RETURN_NOT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(5), ts_map_,
         tablet_replicas_[leader_idx]->tablet_id(), min_term));
diff --git a/src/kudu/kserver/kserver.cc b/src/kudu/kserver/kserver.cc
index 1a46c4f..c2b1db9 100644
--- a/src/kudu/kserver/kserver.cc
+++ b/src/kudu/kserver/kserver.cc
@@ -18,7 +18,6 @@
 #include "kudu/kserver/kserver.h"
 
 #include <algorithm>
-#include <cstdint>
 #include <initializer_list>
 #include <memory>
 #include <mutex>
@@ -61,11 +60,14 @@ using kudu::server::ServerBaseOptions;
 using std::string;
 using strings::Substitute;
 
-namespace kudu {
-namespace kserver {
+METRIC_DEFINE_gauge_int32(server, num_raft_leaders,
+                          "Number of Raft Leaders",
+                          kudu::MetricUnit::kTablets,
+                          "Number of tablet replicas that are Raft leaders",
+                          kudu::MetricLevel::kInfo);
 
 METRIC_DEFINE_histogram(server, op_apply_queue_length, "Operation Apply Queue Length",
-                        MetricUnit::kTasks,
+                        kudu::MetricUnit::kTasks,
                         "Number of operations waiting to be applied to the tablet. "
                         "High queue lengths indicate that the server is unable to process
"
                         "operations as fast as they are being written to the WAL.",
@@ -73,7 +75,7 @@ METRIC_DEFINE_histogram(server, op_apply_queue_length, "Operation Apply Queue Le
                         10000, 2);
 
 METRIC_DEFINE_histogram(server, op_apply_queue_time, "Operation Apply Queue Time",
-                        MetricUnit::kMicroseconds,
+                        kudu::MetricUnit::kMicroseconds,
                         "Time that operations spent waiting in the apply queue before being
"
                         "processed. High queue times indicate that the server is unable to
"
                         "process operations as fast as they are being written to the WAL.",
@@ -81,13 +83,16 @@ METRIC_DEFINE_histogram(server, op_apply_queue_time, "Operation Apply Queue Time
                         10000000, 2);
 
 METRIC_DEFINE_histogram(server, op_apply_run_time, "Operation Apply Run Time",
-                        MetricUnit::kMicroseconds,
+                        kudu::MetricUnit::kMicroseconds,
                         "Time that operations spent being applied to the tablet. "
                         "High values may indicate that the server is under-provisioned or "
                         "that operations consist of very large batches.",
                         kudu::MetricLevel::kWarn,
                         10000000, 2);
 
+namespace kudu {
+namespace kserver {
+
 namespace {
 
 int GetThreadPoolThreadLimit(Env* env) {
@@ -158,6 +163,8 @@ Status KuduServer::Init() {
                 .set_max_threads(server_wide_pool_limit)
                 .Build(&raft_pool_));
 
+  num_raft_leaders_ = metric_entity_->FindOrCreateGauge(&METRIC_num_raft_leaders,
0);
+
   return Status::OK();
 }
 
diff --git a/src/kudu/kserver/kserver.h b/src/kudu/kserver/kserver.h
index 1833323..dc0b516 100644
--- a/src/kudu/kserver/kserver.h
+++ b/src/kudu/kserver/kserver.h
@@ -17,11 +17,14 @@
 
 #pragma once
 
+#include <cstdint>
 #include <memory>
 #include <string>
 
 #include "kudu/gutil/macros.h"
+#include "kudu/gutil/ref_counted.h"
 #include "kudu/server/server_base.h"
+#include "kudu/util/metrics.h"
 #include "kudu/util/threadpool.h"
 
 namespace kudu {
@@ -59,6 +62,7 @@ class KuduServer : public server::ServerBase {
   ThreadPool* tablet_prepare_pool() const { return tablet_prepare_pool_.get(); }
   ThreadPool* tablet_apply_pool() const { return tablet_apply_pool_.get(); }
   ThreadPool* raft_pool() const { return raft_pool_.get(); }
+  scoped_refptr<AtomicGauge<int32_t>> num_raft_leaders() const { return num_raft_leaders_; }
 
  private:
 
@@ -71,6 +75,9 @@ class KuduServer : public server::ServerBase {
   // Thread pool for Raft-related operations, shared between all tablets.
   std::unique_ptr<ThreadPool> raft_pool_;
 
+  // Gauge counting the number of Raft instances that are in leader mode.
+  scoped_refptr<AtomicGauge<int32_t>> num_raft_leaders_;
+
   DISALLOW_COPY_AND_ASSIGN(KuduServer);
 };
 
diff --git a/src/kudu/master/sys_catalog.cc b/src/kudu/master/sys_catalog.cc
index 2cd36a9..34043e0 100644
--- a/src/kudu/master/sys_catalog.cc
+++ b/src/kudu/master/sys_catalog.cc
@@ -84,6 +84,7 @@
 #include "kudu/util/flag_tags.h"
 #include "kudu/util/logging.h"
 #include "kudu/util/memory/arena.h"
+#include "kudu/util/metrics.h"
 #include "kudu/util/monotime.h"
 #include "kudu/util/net/net_util.h"
 #include "kudu/util/net/sockaddr.h"
@@ -378,7 +379,9 @@ Status SysCatalogTable::SetupTablet(
       Bind(&SysCatalogTable::SysCatalogStateChanged,
            Unretained(this),
            metadata->tablet_id())));
-  RETURN_NOT_OK_SHUTDOWN(tablet_replica_->Init(master_->raft_pool()),
+  // TODO(awong): plumb master_->num_raft_leaders() here.
+  RETURN_NOT_OK_SHUTDOWN(tablet_replica_->Init({ /*num_leaders*/nullptr,
+                                                 master_->raft_pool() }),
                          "failed to initialize system catalog replica");
 
   shared_ptr<Tablet> tablet;
diff --git a/src/kudu/tablet/tablet_replica-test.cc b/src/kudu/tablet/tablet_replica-test.cc
index 2baa9b2..ecf980e 100644
--- a/src/kudu/tablet/tablet_replica-test.cc
+++ b/src/kudu/tablet/tablet_replica-test.cc
@@ -153,7 +153,8 @@ class TabletReplicaTest : public KuduTabletTest {
                         Bind(&TabletReplicaTest::TabletReplicaStateChangedCallback,
                              Unretained(this),
                              tablet()->tablet_id())));
-    ASSERT_OK(tablet_replica_->Init(raft_pool_.get()));
+    ASSERT_OK(tablet_replica_->Init({ /*num_leaders*/nullptr,
+                                      raft_pool_.get() }));
     // Make TabletReplica use the same LogAnchorRegistry as the Tablet created by the harness.
     // TODO(mpercy): Refactor TabletHarness to allow taking a
     // LogAnchorRegistry, while also providing TabletMetadata for consumption
diff --git a/src/kudu/tablet/tablet_replica.cc b/src/kudu/tablet/tablet_replica.cc
index 7a32791..fafc28f 100644
--- a/src/kudu/tablet/tablet_replica.cc
+++ b/src/kudu/tablet/tablet_replica.cc
@@ -111,6 +111,7 @@ using consensus::RaftConfigPB;
 using consensus::RaftPeerPB;
 using consensus::RaftConsensus;
 using consensus::RpcPeerProxyFactory;
+using consensus::ServerContext;
 using consensus::TimeManager;
 using consensus::ALTER_SCHEMA_OP;
 using consensus::WRITE_OP;
@@ -149,7 +150,7 @@ TabletReplica::~TabletReplica() {
       << TabletStatePB_Name(state_);
 }
 
-Status TabletReplica::Init(ThreadPool* raft_pool) {
+Status TabletReplica::Init(ServerContext server_ctx) {
   CHECK_EQ(NOT_INITIALIZED, state_);
   TRACE("Creating consensus instance");
   SetStatusMessage("Initializing consensus...");
@@ -159,7 +160,7 @@ Status TabletReplica::Init(ThreadPool* raft_pool) {
   RETURN_NOT_OK(RaftConsensus::Create(std::move(options),
                                       local_peer_pb_,
                                       cmeta_manager_,
-                                      raft_pool,
+                                      std::move(server_ctx),
                                       &consensus));
   consensus_ = std::move(consensus);
   set_state(INITIALIZED);
diff --git a/src/kudu/tablet/tablet_replica.h b/src/kudu/tablet/tablet_replica.h
index 89338df..e2254f0 100644
--- a/src/kudu/tablet/tablet_replica.h
+++ b/src/kudu/tablet/tablet_replica.h
@@ -99,7 +99,7 @@ class TabletReplica : public RefCountedThreadSafe<TabletReplica>,
   // This must be called before publishing the instance to other threads.
   // If this fails, the TabletReplica instance remains in a NOT_INITIALIZED
   // state.
-  Status Init(ThreadPool* raft_pool);
+  Status Init(consensus::ServerContext server_ctx);
 
   // Starts the TabletReplica, making it available for Write()s. If this
   // TabletReplica is part of a consensus configuration this will connect it to other replicas
diff --git a/src/kudu/tserver/tablet_copy_source_session-test.cc b/src/kudu/tserver/tablet_copy_source_session-test.cc
index 5bc912b..4801beb 100644
--- a/src/kudu/tserver/tablet_copy_source_session-test.cc
+++ b/src/kudu/tserver/tablet_copy_source_session-test.cc
@@ -164,7 +164,8 @@ class TabletCopyTest : public KuduTabletTest {
                           Bind(&TabletCopyTest::TabletReplicaStateChangedCallback,
                                Unretained(this),
                                tablet()->tablet_id())));
-    ASSERT_OK(tablet_replica_->Init(raft_pool_.get()));
+    ASSERT_OK(tablet_replica_->Init({ /*num_leaders*/nullptr,
+                                      raft_pool_.get() }));
 
     shared_ptr<Messenger> messenger;
     MessengerBuilder mbuilder(CURRENT_TEST_NAME());
diff --git a/src/kudu/tserver/ts_tablet_manager.cc b/src/kudu/tserver/ts_tablet_manager.cc
index b9814f1..f3c2e2e 100644
--- a/src/kudu/tserver/ts_tablet_manager.cc
+++ b/src/kudu/tserver/ts_tablet_manager.cc
@@ -822,7 +822,8 @@ Status TSTabletManager::CreateAndRegisterTabletReplica(
                         Bind(&TSTabletManager::MarkTabletDirty,
                              Unretained(this),
                              tablet_id)));
-  Status s = replica->Init(server_->raft_pool());
+  Status s = replica->Init({ server_->num_raft_leaders(),
+                             server_->raft_pool() });
   if (PREDICT_FALSE(!s.ok())) {
     replica->SetError(s);
     replica->Shutdown();


Mime
View raw message