kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ale...@apache.org
Subject [2/2] kudu git commit: [ts_itest-base] split declaration and definition
Date Mon, 16 Oct 2017 20:55:42 GMT
[ts_itest-base] split declaration and definition

Splitted declaration and definition of methods in
TabletServerIntegrationTestBase and TabletServerTestBase test classes.
In addition to shorter compilation times (no need to compile the same
code multiple times), this is also necessary for upcoming changelist
which splits set of tests in raft_consensus-itest into separate files.

This changelist does not contain any functional changes.

Change-Id: I31085ad2a6df0df795dad70e9e1c90d34cdd3716
Reviewed-on: http://gerrit.cloudera.org:8080/8277
Tested-by: Kudu Jenkins
Reviewed-by: Adar Dembo <adar@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/3a7342a3
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/3a7342a3
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/3a7342a3

Branch: refs/heads/master
Commit: 3a7342a31b2c0e30f5175a2b26b6fdb6acac0dff
Parents: 92064c0
Author: Alexey Serbin <aserbin@cloudera.com>
Authored: Sat Oct 14 12:09:20 2017 -0700
Committer: Alexey Serbin <aserbin@cloudera.com>
Committed: Mon Oct 16 20:42:25 2017 +0000

----------------------------------------------------------------------
 src/kudu/integration-tests/CMakeLists.txt       |   1 +
 .../exactly_once_writes-itest.cc                |  10 +-
 src/kudu/integration-tests/linked_list-test.cc  |  19 +-
 .../integration-tests/raft_consensus-itest.cc   |  96 ++-
 src/kudu/integration-tests/ts_itest-base.cc     | 580 +++++++++++++++++++
 src/kudu/integration-tests/ts_itest-base.h      | 496 ++--------------
 src/kudu/tools/kudu-admin-test.cc               |  57 +-
 src/kudu/tserver/CMakeLists.txt                 |   4 +-
 src/kudu/tserver/tablet_copy-test-base.h        |   3 +-
 src/kudu/tserver/tablet_server-stress-test.cc   |   2 +-
 src/kudu/tserver/tablet_server-test-base.cc     | 467 +++++++++++++++
 src/kudu/tserver/tablet_server-test-base.h      | 436 ++------------
 src/kudu/tserver/tablet_server-test.cc          |  68 ++-
 13 files changed, 1271 insertions(+), 968 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/3a7342a3/src/kudu/integration-tests/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/CMakeLists.txt b/src/kudu/integration-tests/CMakeLists.txt
index 968ef19..d124662 100644
--- a/src/kudu/integration-tests/CMakeLists.txt
+++ b/src/kudu/integration-tests/CMakeLists.txt
@@ -28,6 +28,7 @@ set(INTEGRATION_TESTS_SRCS
   internal_mini_cluster-itest-base.cc
   log_verifier.cc
   test_workload.cc
+  ts_itest-base.cc
 )
 
 add_library(itest_util ${INTEGRATION_TESTS_SRCS})

http://git-wip-us.apache.org/repos/asf/kudu/blob/3a7342a3/src/kudu/integration-tests/exactly_once_writes-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/exactly_once_writes-itest.cc b/src/kudu/integration-tests/exactly_once_writes-itest.cc
index 1595432..b291bc9 100644
--- a/src/kudu/integration-tests/exactly_once_writes-itest.cc
+++ b/src/kudu/integration-tests/exactly_once_writes-itest.cc
@@ -22,6 +22,7 @@
 #include <utility>
 #include <vector>
 
+#include <gflags/gflags_declare.h>
 #include <glog/logging.h>
 #include <gtest/gtest.h>
 
@@ -53,7 +54,12 @@
 #include "kudu/util/test_util.h"
 #include "kudu/util/thread.h"
 
+DECLARE_int32(consensus_rpc_timeout_ms);
+DECLARE_int32(num_replicas);
+DECLARE_int32(num_tablet_servers);
+
 using std::string;
+using std::unique_ptr;
 using std::vector;
 
 namespace kudu {
@@ -112,7 +118,7 @@ void ExactlyOnceSemanticsITest::WriteRowsAndCollectResponses(int thread_idx,
   rpc::MessengerBuilder bld("Client");
   ASSERT_OK(bld.Build(&client_messenger));
 
-  std::unique_ptr<TabletServerServiceProxy> proxy(new TabletServerServiceProxy(
+  unique_ptr<TabletServerServiceProxy> proxy(new TabletServerServiceProxy(
       client_messenger, address, address.host()));
   for (int i = 0; i < num_batches; i++) {
     // Wait for all of the other writer threads to finish their attempts of the prior
@@ -146,7 +152,7 @@ void ExactlyOnceSemanticsITest::WriteRowsAndCollectResponses(int thread_idx,
       controller.Reset();
       WriteResponsePB response;
 
-      std::unique_ptr<rpc::RequestIdPB> request_id(new rpc::RequestIdPB());
+      unique_ptr<rpc::RequestIdPB> request_id(new rpc::RequestIdPB());
       request_id->set_client_id("test_client");
       request_id->set_seq_no(i);
       request_id->set_attempt_no(base_attempt_idx * kMaxAttempts + num_attempts);

http://git-wip-us.apache.org/repos/asf/kudu/blob/3a7342a3/src/kudu/integration-tests/linked_list-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/linked_list-test.cc b/src/kudu/integration-tests/linked_list-test.cc
index f7479d7..b12eb14 100644
--- a/src/kudu/integration-tests/linked_list-test.cc
+++ b/src/kudu/integration-tests/linked_list-test.cc
@@ -39,6 +39,7 @@
 
 #include <boost/bind.hpp>
 #include <gflags/gflags.h>
+#include <gflags/gflags_declare.h>
 #include <glog/logging.h>
 #include <gtest/gtest.h>
 
@@ -57,6 +58,9 @@
 #include "kudu/util/test_macros.h"
 #include "kudu/util/test_util.h"
 
+DECLARE_int32(num_replicas);
+DECLARE_int32(num_tablet_servers);
+DECLARE_string(ts_flags);
 DEFINE_int32(seconds_to_run, 5, "Number of seconds for which to run the test");
 
 DEFINE_int32(num_chains, 50, "Number of parallel chains to generate");
@@ -69,19 +73,18 @@ DEFINE_bool(stress_flush_compact, false,
 DEFINE_bool(stress_wal_gc, false,
             "Set WAL segment size small so that logs will be GCed during the test");
 
+using kudu::client::sp::shared_ptr;
+using kudu::cluster::ClusterNodes;
+using kudu::itest::TServerDetails;
+using std::string;
+using std::vector;
+
 namespace kudu {
 
 namespace client {
 class KuduClient;
 } // namespace client
 
-using client::KuduClient;
-using client::sp::shared_ptr;
-using cluster::ClusterNodes;
-using itest::TServerDetails;
-using std::string;
-using std::vector;
-
 class LinkedListTest : public tserver::TabletServerIntegrationTestBase {
  public:
   LinkedListTest() {}
@@ -146,7 +149,7 @@ class LinkedListTest : public tserver::TabletServerIntegrationTestBase {
   }
 
  protected:
-  shared_ptr<KuduClient> client_;
+  shared_ptr<client::KuduClient> client_;
   gscoped_ptr<LinkedListTester> tester_;
 };
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/3a7342a3/src/kudu/integration-tests/raft_consensus-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/raft_consensus-itest.cc b/src/kudu/integration-tests/raft_consensus-itest.cc
index 5f59b4d..b3b63f9 100644
--- a/src/kudu/integration-tests/raft_consensus-itest.cc
+++ b/src/kudu/integration-tests/raft_consensus-itest.cc
@@ -88,18 +88,50 @@
 #include "kudu/util/test_util.h"
 #include "kudu/util/thread.h"
 
+DECLARE_int32(consensus_rpc_timeout_ms);
+DECLARE_int32(num_replicas);
+DECLARE_int32(num_tablet_servers);
+DECLARE_int32(rpc_timeout);
 DEFINE_int32(num_client_threads, 8,
              "Number of client threads to launch");
 DEFINE_int64(client_inserts_per_thread, 50,
              "Number of rows inserted by each client thread");
 DEFINE_int64(client_num_batches_per_thread, 5,
              "In how many batches to group the rows, for each client");
-DECLARE_int32(consensus_rpc_timeout_ms);
 
 METRIC_DECLARE_entity(tablet);
 METRIC_DECLARE_counter(transaction_memory_pressure_rejections);
 METRIC_DECLARE_gauge_int64(raft_term);
 
+using kudu::client::KuduInsert;
+using kudu::client::KuduSession;
+using kudu::client::KuduTable;
+using kudu::client::sp::shared_ptr;
+using kudu::cluster::ExternalTabletServer;
+using kudu::consensus::ConsensusRequestPB;
+using kudu::consensus::ConsensusResponsePB;
+using kudu::consensus::ConsensusServiceProxy;
+using kudu::consensus::MajoritySize;
+using kudu::consensus::MakeOpId;
+using kudu::consensus::OpId;
+using kudu::consensus::RaftPeerPB;
+using kudu::consensus::ReplicateMsg;
+using kudu::itest::AddServer;
+using kudu::itest::GetReplicaStatusAndCheckIfLeader;
+using kudu::itest::LeaderStepDown;
+using kudu::itest::RemoveServer;
+using kudu::itest::StartElection;
+using kudu::itest::TabletServerMap;
+using kudu::itest::TServerDetails;
+using kudu::itest::WaitUntilLeader;
+using kudu::itest::WriteSimpleTestRow;
+using kudu::master::TabletLocationsPB;
+using kudu::pb_util::SecureShortDebugString;
+using kudu::pb_util::SecureDebugString;
+using kudu::rpc::RpcController;
+using kudu::server::SetFlagRequestPB;
+using kudu::server::SetFlagResponsePB;
+using kudu::tablet::TABLET_DATA_COPYING;
 using std::string;
 using std::unordered_map;
 using std::unordered_set;
@@ -109,41 +141,6 @@ using strings::Substitute;
 namespace kudu {
 namespace tserver {
 
-using client::KuduInsert;
-using client::KuduSession;
-using client::KuduTable;
-using client::sp::shared_ptr;
-using cluster::ExternalTabletServer;
-using consensus::ConsensusRequestPB;
-using consensus::ConsensusResponsePB;
-using consensus::ConsensusServiceProxy;
-using consensus::MajoritySize;
-using consensus::MakeOpId;
-using consensus::OpId;
-using consensus::RaftPeerPB;
-using consensus::ReplicateMsg;
-using itest::AddServer;
-using itest::GetReplicaStatusAndCheckIfLeader;
-using itest::LeaderStepDown;
-using itest::RemoveServer;
-using itest::StartElection;
-using itest::TabletServerMap;
-using itest::TServerDetails;
-using itest::WaitUntilLeader;
-using itest::WriteSimpleTestRow;
-using master::TabletLocationsPB;
-using pb_util::SecureShortDebugString;
-using pb_util::SecureDebugString;
-using rpc::RpcController;
-using server::SetFlagRequestPB;
-using server::SetFlagResponsePB;
-using std::string;
-using std::unordered_map;
-using std::unordered_set;
-using std::vector;
-using strings::Substitute;
-using tablet::TABLET_DATA_COPYING;
-
 static const int kConsensusRpcTimeoutForTests = 50;
 
 static const int kTestRowKey = 1234;
@@ -421,7 +418,7 @@ class RaftConsensusITest : public TabletServerIntegrationTestBase {
   Status GetTermMetricValue(ExternalTabletServer* ts, int64_t* term);
 
   shared_ptr<KuduTable> table_;
-  std::vector<scoped_refptr<kudu::Thread> > threads_;
+  vector<scoped_refptr<kudu::Thread> > threads_;
   CountDownLatch inserters_;
 };
 
@@ -486,7 +483,7 @@ TEST_F(RaftConsensusITest, TestInsertAndMutateThroughConsensus) {
                                FLAGS_client_num_batches_per_thread,
                                vector<CountDownLatch*>());
   }
-  ASSERT_ALL_REPLICAS_AGREE(FLAGS_client_inserts_per_thread * num_iters);
+  NO_FATALS(AssertAllReplicasAgree(FLAGS_client_inserts_per_thread * num_iters));
 }
 
 TEST_F(RaftConsensusITest, TestFailedTransaction) {
@@ -527,7 +524,7 @@ TEST_F(RaftConsensusITest, TestFailedTransaction) {
   SCOPED_TRACE(SecureShortDebugString(resp));
   ASSERT_FALSE(resp.has_error());
 
-  ASSERT_ALL_REPLICAS_AGREE(1);
+  NO_FATALS(AssertAllReplicasAgree(1));
 }
 
 // Inserts rows through consensus and also starts one delay injecting thread
@@ -568,7 +565,7 @@ TEST_F(RaftConsensusITest, MultiThreadedMutateAndInsertThroughConsensus) {
    CHECK_OK(ThreadJoiner(thr.get()).Join());
   }
 
-  ASSERT_ALL_REPLICAS_AGREE(FLAGS_client_inserts_per_thread * FLAGS_num_client_threads);
+  NO_FATALS(AssertAllReplicasAgree(FLAGS_client_inserts_per_thread * FLAGS_num_client_threads));
 }
 
 TEST_F(RaftConsensusITest, TestInsertOnNonLeader) {
@@ -601,7 +598,7 @@ TEST_F(RaftConsensusITest, TestInsertOnNonLeader) {
   // TODO: need to change the error code to be something like REPLICA_NOT_LEADER
   // so that the client can properly handle this case! plumbing this is a little difficult
   // so not addressing at the moment.
-  ASSERT_ALL_REPLICAS_AGREE(0);
+  NO_FATALS(AssertAllReplicasAgree(0));
 }
 
 TEST_F(RaftConsensusITest, TestRunLeaderElection) {
@@ -616,7 +613,7 @@ TEST_F(RaftConsensusITest, TestRunLeaderElection) {
                              FLAGS_client_num_batches_per_thread,
                              vector<CountDownLatch*>());
 
-  ASSERT_ALL_REPLICAS_AGREE(FLAGS_client_inserts_per_thread * num_iters);
+  NO_FATALS(AssertAllReplicasAgree(FLAGS_client_inserts_per_thread * num_iters));
 
   // Select the last follower to be new leader.
   vector<TServerDetails*> followers;
@@ -642,7 +639,7 @@ TEST_F(RaftConsensusITest, TestRunLeaderElection) {
   // Restart the original replica and make sure they all agree.
   ASSERT_OK(leader_ets->Restart());
 
-  ASSERT_ALL_REPLICAS_AGREE(FLAGS_client_inserts_per_thread * num_iters * 2);
+  NO_FATALS(AssertAllReplicasAgree(FLAGS_client_inserts_per_thread * num_iters * 2));
 }
 
 void RaftConsensusITest::Write128KOpsToLeader(int num_writes) {
@@ -726,8 +723,7 @@ TEST_F(RaftConsensusITest, TestCatchupAfterOpsEvicted) {
 
   // Now unpause the replica, the lagging replica should eventually catch back up.
   ASSERT_OK(replica_ets->Resume());
-
-  ASSERT_ALL_REPLICAS_AGREE(kNumWrites);
+  NO_FATALS(AssertAllReplicasAgree(kNumWrites));
 
   // Once the follower has caught up, all replicas should eventually GC the earlier
   // log segments that they were retaining.
@@ -1136,7 +1132,7 @@ TEST_F(RaftConsensusITest, MultiThreadedInsertWithFailovers) {
     CHECK_OK(ThreadJoiner(thr.get()).Join());
   }
 
-  ASSERT_ALL_REPLICAS_AGREE(FLAGS_client_inserts_per_thread * FLAGS_num_client_threads);
+  NO_FATALS(AssertAllReplicasAgree(FLAGS_client_inserts_per_thread * FLAGS_num_client_threads));
   STLDeleteElements(&latches);
 }
 
@@ -1182,7 +1178,7 @@ TEST_F(RaftConsensusITest, TestAutomaticLeaderElection) {
     CHECK_OK(cluster_->tablet_server_by_uuid(killed_node->uuid())->Restart());
   }
   // Verify the data on the remaining replicas.
-  ASSERT_ALL_REPLICAS_AGREE(FLAGS_client_inserts_per_thread * kFinalNumReplicas);
+  NO_FATALS(AssertAllReplicasAgree(FLAGS_client_inserts_per_thread * kFinalNumReplicas));
 }
 
 // Single-replica leader election test.
@@ -1922,7 +1918,7 @@ TEST_F(RaftConsensusITest, TestReplaceChangeConfigOperation) {
 
   // Insert some data and verify that it propagates to all servers.
   NO_FATALS(InsertTestRowsRemoteThread(0, 10, 1, vector<CountDownLatch*>()));
-  ASSERT_ALL_REPLICAS_AGREE(10);
+  NO_FATALS(AssertAllReplicasAgree(10));
 
   // Try another config change.
   // This acts as a regression test for KUDU-1338, in which aborting the original
@@ -2231,7 +2227,7 @@ TEST_F(RaftConsensusITest, TestConfigChangeUnderLoad) {
                                   min_log_index));
 
   LOG(INFO) << "Number of rows inserted: " << rows_inserted.Load();
-  ASSERT_ALL_REPLICAS_AGREE(rows_inserted.Load());
+  NO_FATALS(AssertAllReplicasAgree(rows_inserted.Load()));
 }
 
 TEST_F(RaftConsensusITest, TestMasterNotifiedOnConfigChange) {
@@ -2492,7 +2488,7 @@ TEST_F(RaftConsensusITest, TestAutoCreateReplica) {
 
   int rows_inserted = workload.rows_inserted();
   LOG(INFO) << "Number of rows inserted: " << rows_inserted;
-  ASSERT_ALL_REPLICAS_AGREE(rows_inserted);
+  NO_FATALS(AssertAllReplicasAgree(rows_inserted));
 }
 
 TEST_F(RaftConsensusITest, TestMemoryRemainsConstantDespiteTwoDeadFollowers) {

http://git-wip-us.apache.org/repos/asf/kudu/blob/3a7342a3/src/kudu/integration-tests/ts_itest-base.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/ts_itest-base.cc b/src/kudu/integration-tests/ts_itest-base.cc
new file mode 100644
index 0000000..570f6f8
--- /dev/null
+++ b/src/kudu/integration-tests/ts_itest-base.cc
@@ -0,0 +1,580 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/integration-tests/ts_itest-base.h"
+
+#include <algorithm>
+#include <memory>
+#include <ostream>
+#include <string>
+#include <unordered_map>
+#include <unordered_set>
+#include <utility>
+#include <vector>
+
+#include <gflags/gflags.h>
+#include <gflags/gflags_declare.h>
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "kudu/client/client-test-util.h"
+#include "kudu/client/client.h"
+#include "kudu/client/schema.h"
+#include "kudu/common/wire_protocol.pb.h"
+#include "kudu/consensus/consensus.pb.h"
+#include "kudu/consensus/metadata.pb.h"
+#include "kudu/consensus/opid.pb.h"
+#include "kudu/gutil/map-util.h"
+#include "kudu/gutil/stl_util.h"
+#include "kudu/gutil/strings/split.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/integration-tests/cluster_itest_util.h"
+#include "kudu/integration-tests/cluster_verifier.h"
+#include "kudu/integration-tests/external_mini_cluster_fs_inspector.h"
+#include "kudu/master/master.pb.h"
+#include "kudu/master/master.proxy.h"
+#include "kudu/mini-cluster/external_mini_cluster.h"
+#include "kudu/rpc/rpc_controller.h"
+#include "kudu/tserver/tablet_server-test-base.h"
+#include "kudu/tserver/tserver.pb.h"
+#include "kudu/tserver/tserver_service.proxy.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/net/sockaddr.h"
+#include "kudu/util/pb_util.h"
+#include "kudu/util/test_macros.h"
+#include "kudu/util/test_util.h"
+
+DECLARE_int32(consensus_rpc_timeout_ms);
+
+DEFINE_string(ts_flags, "", "Flags to pass through to tablet servers");
+DEFINE_string(master_flags, "", "Flags to pass through to masters");
+
+DEFINE_int32(num_tablet_servers, 3, "Number of tablet servers to start");
+DEFINE_int32(num_replicas, 3, "Number of replicas per tablet server");
+
+using kudu::client::sp::shared_ptr;
+using std::pair;
+using std::string;
+using std::unordered_multimap;
+using std::unordered_set;
+using std::vector;
+using strings::Split;
+using strings::Substitute;
+
+namespace kudu {
+namespace tserver {
+
+static const int kMaxRetries = 20;
+
+TabletServerIntegrationTestBase::
+TabletServerIntegrationTestBase()
+    : random_(SeedRandom()) {
+}
+
+void TabletServerIntegrationTestBase::SetUp() {
+  TabletServerTestBase::SetUp();
+}
+
+void TabletServerIntegrationTestBase::AddExtraFlags(
+    const string& flags_str, vector<string>* flags) {
+  if (flags_str.empty()) {
+    return;
+  }
+  vector<string> split_flags = Split(flags_str, " ");
+  for (const string& flag : split_flags) {
+    flags->push_back(flag);
+  }
+}
+
+void TabletServerIntegrationTestBase::CreateCluster(
+    const string& data_root_path,
+    const vector<string>& non_default_ts_flags,
+    const vector<string>& non_default_master_flags,
+    uint32_t num_data_dirs) {
+
+  LOG(INFO) << "Starting cluster with:";
+  LOG(INFO) << "--------------";
+  LOG(INFO) << FLAGS_num_tablet_servers << " tablet servers";
+  LOG(INFO) << FLAGS_num_replicas << " replicas per TS";
+  LOG(INFO) << "--------------";
+
+  cluster::ExternalMiniClusterOptions opts;
+  opts.num_tablet_servers = FLAGS_num_tablet_servers;
+  opts.data_root = GetTestPath(data_root_path);
+  opts.num_data_dirs = num_data_dirs;
+
+  // Enable exactly once semantics for tests.
+
+  // If the caller passed no flags use the default ones, where we stress
+  // consensus by setting low timeouts and frequent cache misses.
+  if (non_default_ts_flags.empty()) {
+    opts.extra_tserver_flags.emplace_back("--log_cache_size_limit_mb=10");
+    opts.extra_tserver_flags.push_back(
+        Substitute("--consensus_rpc_timeout_ms=$0",
+                   FLAGS_consensus_rpc_timeout_ms));
+  } else {
+    for (const string& flag : non_default_ts_flags) {
+      opts.extra_tserver_flags.push_back(flag);
+    }
+  }
+  for (const string& flag : non_default_master_flags) {
+    opts.extra_master_flags.push_back(flag);
+  }
+
+  AddExtraFlags(FLAGS_ts_flags, &opts.extra_tserver_flags);
+  AddExtraFlags(FLAGS_master_flags, &opts.extra_master_flags);
+
+  cluster_.reset(new cluster::ExternalMiniCluster(std::move(opts)));
+  ASSERT_OK(cluster_->Start());
+  inspect_.reset(new itest::ExternalMiniClusterFsInspector(cluster_.get()));
+  CreateTSProxies();
+}
+
+// Creates TSServerDetails instance for each TabletServer and stores them
+// in 'tablet_servers_'.
+void TabletServerIntegrationTestBase::CreateTSProxies() {
+  CHECK(tablet_servers_.empty());
+  CHECK_OK(itest::CreateTabletServerMap(cluster_->master_proxy(),
+                                        client_messenger_,
+                                        &tablet_servers_));
+}
+
+// Waits that all replicas for a all tablets of 'table_id' table are online
+// and creates the tablet_replicas_ map.
+void TabletServerIntegrationTestBase::WaitForReplicasAndUpdateLocations(
+    const string& table_id) {
+  bool replicas_missing = true;
+  for (int num_retries = 0; replicas_missing && num_retries < kMaxRetries; num_retries++) {
+    unordered_multimap<string, itest::TServerDetails*> tablet_replicas;
+    master::GetTableLocationsRequestPB req;
+    master::GetTableLocationsResponsePB resp;
+    rpc::RpcController controller;
+    req.mutable_table()->set_table_name(table_id);
+    controller.set_timeout(MonoDelta::FromSeconds(1));
+    CHECK_OK(cluster_->master_proxy()->GetTableLocations(req, &resp, &controller));
+    CHECK_OK(controller.status());
+    if (resp.has_error()) {
+      switch (resp.error().code()) {
+        case master::MasterErrorPB::TABLET_NOT_RUNNING:
+          LOG(WARNING)<< "At least one tablet is not yet running";
+          break;
+
+        case master::MasterErrorPB::NOT_THE_LEADER:   // fallthrough
+        case master::MasterErrorPB::CATALOG_MANAGER_NOT_INITIALIZED:
+          LOG(WARNING)<< "CatalogManager is not yet ready to serve requests";
+          break;
+
+        default:
+          FAIL() << "Response had a fatal error: "
+                 << pb_util::SecureShortDebugString(resp.error());
+          break;  // unreachable
+      }
+      SleepFor(MonoDelta::FromSeconds(1));
+      continue;
+    }
+
+    for (const master::TabletLocationsPB& location : resp.tablet_locations()) {
+      for (const master::TabletLocationsPB_ReplicaPB& replica : location.replicas()) {
+        itest::TServerDetails* server =
+            FindOrDie(tablet_servers_, replica.ts_info().permanent_uuid());
+        tablet_replicas.insert(pair<string, itest::TServerDetails*>(
+            location.tablet_id(), server));
+      }
+
+      if (tablet_replicas.count(location.tablet_id()) < FLAGS_num_replicas) {
+        LOG(WARNING)<< "Couldn't find the leader and/or replicas. Location: "
+            << pb_util::SecureShortDebugString(location);
+        replicas_missing = true;
+        SleepFor(MonoDelta::FromSeconds(1));
+        break;
+      }
+
+      replicas_missing = false;
+    }
+    if (!replicas_missing) {
+      tablet_replicas_ = tablet_replicas;
+    }
+  }
+
+  // GetTableLocations() does not guarantee that all replicas are actually
+  // running. Some may still be bootstrapping. Wait for them before
+  // returning.
+  //
+  // Just as with the above loop and its behavior once kMaxRetries is
+  // reached, the wait here is best effort only. That is, if the wait
+  // deadline expires, the resulting timeout failure is ignored.
+  for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
+    cluster::ExternalTabletServer* ts = cluster_->tablet_server(i);
+    int expected_tablet_count = 0;
+    for (const auto& e : tablet_replicas_) {
+      if (ts->uuid() == e.second->uuid()) {
+        ++expected_tablet_count;
+      }
+    }
+    LOG(INFO) << Substitute(
+        "Waiting for $0 tablets on tserver $1 to finish bootstrapping",
+        expected_tablet_count, ts->uuid());
+    cluster_->WaitForTabletsRunning(ts, expected_tablet_count,
+                                    MonoDelta::FromSeconds(20));
+  }
+}
+
+// Returns the last committed leader of the consensus configuration. Tries to get it from master
+// but then actually tries to the get the committed consensus configuration to make sure.
+itest::TServerDetails* TabletServerIntegrationTestBase::GetLeaderReplicaOrNull(
+    const string& tablet_id) {
+  string leader_uuid;
+  Status master_found_leader_result = GetTabletLeaderUUIDFromMaster(
+      tablet_id, &leader_uuid);
+
+  // See if the master is up to date. I.e. if it does report a leader and if the
+  // replica it reports as leader is still alive and (at least thinks) its still
+  // the leader.
+  itest::TServerDetails* leader;
+  if (master_found_leader_result.ok()) {
+    leader = GetReplicaWithUuidOrNull(tablet_id, leader_uuid);
+    if (leader && itest::GetReplicaStatusAndCheckIfLeader(
+          leader, tablet_id, MonoDelta::FromMilliseconds(100)).ok()) {
+      return leader;
+    }
+  }
+
+  // The replica we got from the master (if any) is either dead or not the leader.
+  // Find the actual leader.
+  pair<itest::TabletReplicaMap::iterator, itest::TabletReplicaMap::iterator> range =
+      tablet_replicas_.equal_range(tablet_id);
+  vector<itest::TServerDetails*> replicas_copy;
+  for (;range.first != range.second; ++range.first) {
+    replicas_copy.push_back((*range.first).second);
+  }
+
+  std::random_shuffle(replicas_copy.begin(), replicas_copy.end());
+  for (itest::TServerDetails* replica : replicas_copy) {
+    if (itest::GetReplicaStatusAndCheckIfLeader(
+          replica, tablet_id, MonoDelta::FromMilliseconds(100)).ok()) {
+      return replica;
+    }
+  }
+  return nullptr;
+}
+
+// For the last committed consensus configuration, return the last committed
+// leader of the consensus configuration and its followers.
+Status TabletServerIntegrationTestBase::GetTabletLeaderAndFollowers(
+    const string& tablet_id,
+    itest::TServerDetails** leader,
+    vector<itest::TServerDetails*>* followers) {
+
+  pair<itest::TabletReplicaMap::iterator, itest::TabletReplicaMap::iterator> range =
+      tablet_replicas_.equal_range(tablet_id);
+  vector<itest::TServerDetails*> replicas;
+  for (; range.first != range.second; ++range.first) {
+    replicas.push_back((*range.first).second);
+  }
+
+  itest::TServerDetails* leader_replica = nullptr;
+  auto it = replicas.begin();
+  for (; it != replicas.end(); ++it) {
+    itest::TServerDetails* replica = *it;
+    bool found_leader_replica = false;
+    for (auto i = 0; i < kMaxRetries; ++i) {
+      if (itest::GetReplicaStatusAndCheckIfLeader(
+            replica, tablet_id, MonoDelta::FromMilliseconds(100)).ok()) {
+        leader_replica = replica;
+        found_leader_replica = true;
+        break;
+      }
+    }
+    if (found_leader_replica) {
+      break;
+    }
+  }
+  if (!leader_replica) {
+    return Status::NotFound("leader replica not found");
+  }
+
+  if (leader) {
+    *leader = leader_replica;
+  }
+  if (followers) {
+    CHECK(replicas.end() != it);
+    replicas.erase(it);
+    followers->swap(replicas);
+  }
+  return Status::OK();
+}
+
+Status TabletServerIntegrationTestBase::GetLeaderReplicaWithRetries(
+    const string& tablet_id,
+    itest::TServerDetails** leader,
+    int max_attempts) {
+  int attempts = 0;
+  while (attempts < max_attempts) {
+    *leader = GetLeaderReplicaOrNull(tablet_id);
+    if (*leader) {
+      return Status::OK();
+    }
+    attempts++;
+    SleepFor(MonoDelta::FromMilliseconds(100L * attempts));
+  }
+  return Status::NotFound("leader replica not found");
+}
+
+Status TabletServerIntegrationTestBase::GetTabletLeaderUUIDFromMaster(
+    const string& tablet_id, string* leader_uuid) {
+  master::GetTableLocationsRequestPB req;
+  master::GetTableLocationsResponsePB resp;
+  rpc::RpcController controller;
+  controller.set_timeout(MonoDelta::FromMilliseconds(100));
+  req.mutable_table()->set_table_name(kTableId);
+
+  RETURN_NOT_OK(cluster_->master_proxy()->GetTableLocations(req, &resp, &controller));
+  for (const master::TabletLocationsPB& loc : resp.tablet_locations()) {
+    if (loc.tablet_id() == tablet_id) {
+      for (const master::TabletLocationsPB::ReplicaPB& replica : loc.replicas()) {
+        if (replica.role() == consensus::RaftPeerPB::LEADER) {
+          *leader_uuid = replica.ts_info().permanent_uuid();
+          return Status::OK();
+        }
+      }
+    }
+  }
+  return Status::NotFound("Unable to find leader for tablet", tablet_id);
+}
+
+itest::TServerDetails* TabletServerIntegrationTestBase::GetReplicaWithUuidOrNull(
+    const string& tablet_id, const string& uuid) {
+  pair<itest::TabletReplicaMap::iterator, itest::TabletReplicaMap::iterator> range =
+      tablet_replicas_.equal_range(tablet_id);
+  for (;range.first != range.second; ++range.first) {
+    if ((*range.first).second->instance_id.permanent_uuid() == uuid) {
+      return (*range.first).second;
+    }
+  }
+  return nullptr;
+}
+
+// Gets the the locations of the consensus configuration and waits until all replicas
+// are available for all tablets.
+void TabletServerIntegrationTestBase::WaitForTSAndReplicas(const string& table_id) {
+  int num_retries = 0;
+  // make sure the replicas are up and find the leader
+  while (true) {
+    if (num_retries >= kMaxRetries) {
+      FAIL() << " Reached max. retries while looking up the config.";
+    }
+
+    Status status = cluster_->WaitForTabletServerCount(FLAGS_num_tablet_servers,
+                                                       MonoDelta::FromSeconds(5));
+    if (status.IsTimedOut()) {
+      LOG(WARNING)<< "Timeout waiting for all replicas to be online, retrying...";
+      num_retries++;
+      continue;
+    }
+    break;
+  }
+  WaitForReplicasAndUpdateLocations(table_id);
+}
+
+// Removes a set of servers from the replicas_ list.
+// Handy for controlling who to validate against after killing servers.
+void TabletServerIntegrationTestBase::PruneFromReplicas(
+    const unordered_set<string>& uuids) {
+  auto iter = tablet_replicas_.begin();
+  while (iter != tablet_replicas_.end()) {
+    if (uuids.count((*iter).second->instance_id.permanent_uuid()) != 0) {
+      iter = tablet_replicas_.erase(iter);
+      continue;
+    }
+    ++iter;
+  }
+
+  for (const string& uuid : uuids) {
+    delete EraseKeyReturnValuePtr(&tablet_servers_, uuid);
+  }
+}
+
+void TabletServerIntegrationTestBase::GetOnlyLiveFollowerReplicas(
+    const string& tablet_id, vector<itest::TServerDetails*>* followers) {
+  followers->clear();
+  itest::TServerDetails* leader;
+  CHECK_OK(GetLeaderReplicaWithRetries(tablet_id, &leader));
+
+  vector<itest::TServerDetails*> replicas;
+  pair<itest::TabletReplicaMap::iterator, itest::TabletReplicaMap::iterator> range =
+      tablet_replicas_.equal_range(tablet_id);
+  for (;range.first != range.second; ++range.first) {
+    replicas.push_back((*range.first).second);
+  }
+
+  for (itest::TServerDetails* replica : replicas) {
+    if (leader != nullptr &&
+        replica->instance_id.permanent_uuid() == leader->instance_id.permanent_uuid()) {
+      continue;
+    }
+    Status s = itest::GetReplicaStatusAndCheckIfLeader(
+        replica, tablet_id, MonoDelta::FromMilliseconds(100));
+    if (s.IsIllegalState()) {
+      followers->push_back(replica);
+    }
+  }
+}
+
+// Return the index within 'replicas' for the replica which is farthest ahead.
+int64_t TabletServerIntegrationTestBase::GetFurthestAheadReplicaIdx(
+    const string& tablet_id, const vector<itest::TServerDetails*>& replicas) {
+  vector<consensus::OpId> op_ids;
+  CHECK_OK(GetLastOpIdForEachReplica(tablet_id, replicas, consensus::RECEIVED_OPID,
+                                     MonoDelta::FromSeconds(10), &op_ids));
+  int64_t max_index = 0;
+  int max_replica_index = -1;
+  for (int i = 0; i < op_ids.size(); i++) {
+    if (op_ids[i].index() > max_index) {
+      max_index = op_ids[i].index();
+      max_replica_index = i;
+    }
+  }
+
+  CHECK_NE(max_replica_index, -1);
+
+  return max_replica_index;
+}
+
+Status TabletServerIntegrationTestBase::ShutdownServerWithUUID(const string& uuid) {
+  for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
+    cluster::ExternalTabletServer* ts = cluster_->tablet_server(i);
+    if (ts->instance_id().permanent_uuid() == uuid) {
+      ts->Shutdown();
+      return Status::OK();
+    }
+  }
+  return Status::NotFound("Unable to find server with UUID", uuid);
+}
+
+Status TabletServerIntegrationTestBase::RestartServerWithUUID(const string& uuid) {
+  for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
+    cluster::ExternalTabletServer* ts = cluster_->tablet_server(i);
+    if (ts->instance_id().permanent_uuid() == uuid) {
+      ts->Shutdown();
+      RETURN_NOT_OK(CheckTabletServersAreAlive(tablet_servers_.size()-1));
+      RETURN_NOT_OK(ts->Restart());
+      RETURN_NOT_OK(CheckTabletServersAreAlive(tablet_servers_.size()));
+      return Status::OK();
+    }
+  }
+  return Status::NotFound("Unable to find server with UUID", uuid);
+}
+
+// Since we're fault-tolerant we might mask when a tablet server is
+// dead. This returns Status::IllegalState() if fewer than 'num_tablet_servers'
+// are alive.
+Status TabletServerIntegrationTestBase::CheckTabletServersAreAlive(int num_tablet_servers) {
+  int live_count = 0;
+  string error = Substitute("Fewer than $0 TabletServers were alive. Dead TSs: ",
+                            num_tablet_servers);
+  rpc::RpcController controller;
+  for (const itest::TabletServerMap::value_type& entry : tablet_servers_) {
+    controller.Reset();
+    controller.set_timeout(MonoDelta::FromSeconds(10));
+    PingRequestPB req;
+    PingResponsePB resp;
+    Status s = entry.second->tserver_proxy->Ping(req, &resp, &controller);
+    if (!s.ok()) {
+      error += "\n" + entry.second->ToString() +  " (" + s.ToString() + ")";
+      continue;
+    }
+    live_count++;
+  }
+  if (live_count < num_tablet_servers) {
+    return Status::IllegalState(error);
+  }
+  return Status::OK();
+}
+
+void TabletServerIntegrationTestBase::TearDown() {
+  STLDeleteValues(&tablet_servers_);
+}
+
+void TabletServerIntegrationTestBase::CreateClient(shared_ptr<client::KuduClient>* client) {
+  // Connect to the cluster.
+  ASSERT_OK(client::KuduClientBuilder()
+            .add_master_server_addr(cluster_->master()->bound_rpc_addr().ToString())
+            .Build(client));
+}
+
+// Create a table with a single tablet, with 'num_replicas'.
+void TabletServerIntegrationTestBase::CreateTable(const string& table_id) {
+  // The tests here make extensive use of server schemas, but we need
+  // a client schema to create the table.
+  client::KuduSchema client_schema(client::KuduSchemaFromSchema(schema_));
+  gscoped_ptr<client::KuduTableCreator> table_creator(client_->NewTableCreator());
+  ASSERT_OK(table_creator->table_name(table_id)
+           .schema(&client_schema)
+           .set_range_partition_columns({ "key" })
+           .num_replicas(FLAGS_num_replicas)
+           .Create());
+  ASSERT_OK(client_->OpenTable(table_id, &table_));
+}
+
+// Starts an external cluster with a single tablet and a number of replicas equal
+// to 'FLAGS_num_replicas'. The caller can pass 'ts_flags' to specify non-default
+// flags to pass to the tablet servers.
+void TabletServerIntegrationTestBase::BuildAndStart(
+    const vector<string>& ts_flags, const vector<string>& master_flags) {
+  NO_FATALS(CreateCluster("raft_consensus-itest-cluster", ts_flags, master_flags));
+  NO_FATALS(CreateClient(&client_));
+  NO_FATALS(CreateTable());
+  WaitForTSAndReplicas();
+  ASSERT_FALSE(tablet_replicas_.empty());
+  tablet_id_ = (*tablet_replicas_.begin()).first;
+}
+
+void TabletServerIntegrationTestBase::AssertAllReplicasAgree(int expected_result_count) {
+  ClusterVerifier v(cluster_.get());
+  NO_FATALS(v.CheckCluster());
+  NO_FATALS(v.CheckRowCount(kTableId, ClusterVerifier::EXACTLY, expected_result_count));
+}
+
+// Check for and restart any TS that have crashed.
+// Returns the number of servers restarted.
+int TabletServerIntegrationTestBase::RestartAnyCrashedTabletServers() {
+  int restarted = 0;
+  for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
+    if (!cluster_->tablet_server(i)->IsProcessAlive()) {
+      LOG(INFO) << "TS " << i << " appears to have crashed. Restarting.";
+      cluster_->tablet_server(i)->Shutdown();
+      CHECK_OK(cluster_->tablet_server(i)->Restart());
+      restarted++;
+    }
+  }
+  return restarted;
+}
+
+// Assert that no tablet servers have crashed.
+// Tablet servers that have been manually Shutdown() are allowed.
+void TabletServerIntegrationTestBase::AssertNoTabletServersCrashed() {
+  for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
+    if (cluster_->tablet_server(i)->IsShutdown()) {
+      continue;
+    }
+    ASSERT_TRUE(cluster_->tablet_server(i)->IsProcessAlive())
+        << "Tablet server " << i << " crashed";
+  }
+}
+
+}  // namespace tserver
+}  // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/3a7342a3/src/kudu/integration-tests/ts_itest-base.h
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/ts_itest-base.h b/src/kudu/integration-tests/ts_itest-base.h
index 7ecfba1..f5ec9a3 100644
--- a/src/kudu/integration-tests/ts_itest-base.h
+++ b/src/kudu/integration-tests/ts_itest-base.h
@@ -17,528 +17,118 @@
 
 #pragma once
 
+#include <cstdint>
 #include <string>
-#include <utility>
+#include <unordered_set>
 #include <vector>
 
-#include <glog/stl_logging.h>
-
-#include "kudu/client/client-test-util.h"
-#include "kudu/client/schema-internal.h"
-#include "kudu/consensus/quorum_util.h"
-#include "kudu/gutil/strings/split.h"
+#include "kudu/client/shared_ptr.h"
+#include "kudu/gutil/gscoped_ptr.h"
 #include "kudu/integration-tests/cluster_itest_util.h"
-#include "kudu/integration-tests/cluster_verifier.h"
 #include "kudu/integration-tests/external_mini_cluster_fs_inspector.h"
-#include "kudu/master/master.pb.h"
-#include "kudu/master/master.proxy.h"
 #include "kudu/mini-cluster/external_mini_cluster.h"
 #include "kudu/tserver/tablet_server-test-base.h"
-#include "kudu/util/pb_util.h"
 #include "kudu/util/random.h"
-#include "kudu/util/test_util.h"
-
-DECLARE_int32(consensus_rpc_timeout_ms);
+#include "kudu/util/status.h"
 
-DEFINE_string(ts_flags, "", "Flags to pass through to tablet servers");
-DEFINE_string(master_flags, "", "Flags to pass through to masters");
-
-DEFINE_int32(num_tablet_servers, 3, "Number of tablet servers to start");
-DEFINE_int32(num_replicas, 3, "Number of replicas per tablet server");
+namespace kudu {
 
-#define ASSERT_ALL_REPLICAS_AGREE(count) \
-  NO_FATALS(AssertAllReplicasAgree(count))
+namespace client {
+class KuduClient;
+class KuduTable;
+} // namespace client
 
-namespace kudu {
 namespace tserver {
 
-static const int kMaxRetries = 20;
-
 // A base for tablet server integration tests.
 class TabletServerIntegrationTestBase : public TabletServerTestBase {
  public:
+  TabletServerIntegrationTestBase();
 
-  TabletServerIntegrationTestBase() : random_(SeedRandom()) {}
-
-  void SetUp() OVERRIDE {
-    TabletServerTestBase::SetUp();
-  }
+  void SetUp() override;
 
-  void AddExtraFlags(const std::string& flags_str, std::vector<std::string>* flags) {
-    if (flags_str.empty()) {
-      return;
-    }
-    std::vector<std::string> split_flags = strings::Split(flags_str, " ");
-    for (const std::string& flag : split_flags) {
-      flags->push_back(flag);
-    }
-  }
+  void AddExtraFlags(const std::string& flags_str,
+                     std::vector<std::string>* flags);
 
   void CreateCluster(const std::string& data_root_path,
                      const std::vector<std::string>& non_default_ts_flags,
                      const std::vector<std::string>& non_default_master_flags,
-                     uint32_t num_data_dirs = 1) {
-
-    LOG(INFO) << "Starting cluster with:";
-    LOG(INFO) << "--------------";
-    LOG(INFO) << FLAGS_num_tablet_servers << " tablet servers";
-    LOG(INFO) << FLAGS_num_replicas << " replicas per TS";
-    LOG(INFO) << "--------------";
-
-    cluster::ExternalMiniClusterOptions opts;
-    opts.num_tablet_servers = FLAGS_num_tablet_servers;
-    opts.data_root = GetTestPath(data_root_path);
-    opts.num_data_dirs = num_data_dirs;
-
-    // Enable exactly once semantics for tests.
-
-    // If the caller passed no flags use the default ones, where we stress consensus by setting
-    // low timeouts and frequent cache misses.
-    if (non_default_ts_flags.empty()) {
-      opts.extra_tserver_flags.emplace_back("--log_cache_size_limit_mb=10");
-      opts.extra_tserver_flags.push_back(strings::Substitute("--consensus_rpc_timeout_ms=$0",
-                                                             FLAGS_consensus_rpc_timeout_ms));
-    } else {
-      for (const std::string& flag : non_default_ts_flags) {
-        opts.extra_tserver_flags.push_back(flag);
-      }
-    }
-    for (const std::string& flag : non_default_master_flags) {
-      opts.extra_master_flags.push_back(flag);
-    }
-
-    AddExtraFlags(FLAGS_ts_flags, &opts.extra_tserver_flags);
-    AddExtraFlags(FLAGS_master_flags, &opts.extra_master_flags);
-
-    cluster_.reset(new cluster::ExternalMiniCluster(std::move(opts)));
-    ASSERT_OK(cluster_->Start());
-    inspect_.reset(new itest::ExternalMiniClusterFsInspector(cluster_.get()));
-    CreateTSProxies();
-  }
+                     uint32_t num_data_dirs = 1);
 
   // Creates TSServerDetails instance for each TabletServer and stores them
   // in 'tablet_servers_'.
-  void CreateTSProxies() {
-    CHECK(tablet_servers_.empty());
-    CHECK_OK(itest::CreateTabletServerMap(cluster_->master_proxy(),
-                                          client_messenger_,
-                                          &tablet_servers_));
-  }
+  void CreateTSProxies();
 
   // Waits that all replicas for a all tablets of 'table_id' table are online
   // and creates the tablet_replicas_ map.
-  void WaitForReplicasAndUpdateLocations(const std::string& table_id = kTableId) {
-    bool replicas_missing = true;
-    for (int num_retries = 0; replicas_missing && num_retries < kMaxRetries; num_retries++) {
-      std::unordered_multimap<std::string, itest::TServerDetails*> tablet_replicas;
-      master::GetTableLocationsRequestPB req;
-      master::GetTableLocationsResponsePB resp;
-      rpc::RpcController controller;
-      req.mutable_table()->set_table_name(table_id);
-      controller.set_timeout(MonoDelta::FromSeconds(1));
-      CHECK_OK(cluster_->master_proxy()->GetTableLocations(req, &resp, &controller));
-      CHECK_OK(controller.status());
-      if (resp.has_error()) {
-        switch (resp.error().code()) {
-          case master::MasterErrorPB::TABLET_NOT_RUNNING:
-            LOG(WARNING)<< "At least one tablet is not yet running";
-            break;
-
-          case master::MasterErrorPB::NOT_THE_LEADER:   // fallthrough
-          case master::MasterErrorPB::CATALOG_MANAGER_NOT_INITIALIZED:
-            LOG(WARNING)<< "CatalogManager is not yet ready to serve requests";
-            break;
-
-          default:
-            FAIL() << "Response had a fatal error: "
-                   << pb_util::SecureShortDebugString(resp.error());
-            break;  // unreachable
-        }
-        SleepFor(MonoDelta::FromSeconds(1));
-        continue;
-      }
-
-      for (const master::TabletLocationsPB& location : resp.tablet_locations()) {
-        for (const master::TabletLocationsPB_ReplicaPB& replica : location.replicas()) {
-          itest::TServerDetails* server =
-              FindOrDie(tablet_servers_, replica.ts_info().permanent_uuid());
-          tablet_replicas.insert(std::pair<std::string, itest::TServerDetails*>(
-              location.tablet_id(), server));
-        }
-
-        if (tablet_replicas.count(location.tablet_id()) < FLAGS_num_replicas) {
-          LOG(WARNING)<< "Couldn't find the leader and/or replicas. Location: "
-              << pb_util::SecureShortDebugString(location);
-          replicas_missing = true;
-          SleepFor(MonoDelta::FromSeconds(1));
-          break;
-        }
-
-        replicas_missing = false;
-      }
-      if (!replicas_missing) {
-        tablet_replicas_ = tablet_replicas;
-      }
-    }
-
-    // GetTableLocations() does not guarantee that all replicas are actually
-    // running. Some may still be bootstrapping. Wait for them before
-    // returning.
-    //
-    // Just as with the above loop and its behavior once kMaxRetries is
-    // reached, the wait here is best effort only. That is, if the wait
-    // deadline expires, the resulting timeout failure is ignored.
-    for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
-      cluster::ExternalTabletServer* ts = cluster_->tablet_server(i);
-      int expected_tablet_count = 0;
-      for (const auto& e : tablet_replicas_) {
-        if (ts->uuid() == e.second->uuid()) {
-          expected_tablet_count++;
-        }
-      }
-      LOG(INFO) << strings::Substitute(
-          "Waiting for $0 tablets on tserver $1 to finish bootstrapping",
-          expected_tablet_count, ts->uuid());
-      cluster_->WaitForTabletsRunning(ts, expected_tablet_count,
-                                      MonoDelta::FromSeconds(20));
-    }
-  }
+  void WaitForReplicasAndUpdateLocations(const std::string& table_id = kTableId);
 
   // Returns the last committed leader of the consensus configuration. Tries to get it from master
   // but then actually tries to the get the committed consensus configuration to make sure.
-  itest::TServerDetails* GetLeaderReplicaOrNull(const std::string& tablet_id) {
-    std::string leader_uuid;
-    Status master_found_leader_result = GetTabletLeaderUUIDFromMaster(tablet_id, &leader_uuid);
-
-    // See if the master is up to date. I.e. if it does report a leader and if the
-    // replica it reports as leader is still alive and (at least thinks) its still
-    // the leader.
-    itest::TServerDetails* leader;
-    if (master_found_leader_result.ok()) {
-      leader = GetReplicaWithUuidOrNull(tablet_id, leader_uuid);
-      if (leader && itest::GetReplicaStatusAndCheckIfLeader(
-            leader, tablet_id, MonoDelta::FromMilliseconds(100)).ok()) {
-        return leader;
-      }
-    }
-
-    // The replica we got from the master (if any) is either dead or not the leader.
-    // Find the actual leader.
-    std::pair<itest::TabletReplicaMap::iterator, itest::TabletReplicaMap::iterator> range =
-        tablet_replicas_.equal_range(tablet_id);
-    std::vector<itest::TServerDetails*> replicas_copy;
-    for (;range.first != range.second; ++range.first) {
-      replicas_copy.push_back((*range.first).second);
-    }
-
-    std::random_shuffle(replicas_copy.begin(), replicas_copy.end());
-    for (itest::TServerDetails* replica : replicas_copy) {
-      if (itest::GetReplicaStatusAndCheckIfLeader(
-            replica, tablet_id, MonoDelta::FromMilliseconds(100)).ok()) {
-        return replica;
-      }
-    }
-    return NULL;
-  }
+  itest::TServerDetails* GetLeaderReplicaOrNull(const std::string& tablet_id);
 
   // For the last committed consensus configuration, return the last committed
   // leader of the consensus configuration and its followers.
   Status GetTabletLeaderAndFollowers(const std::string& tablet_id,
                                      itest::TServerDetails** leader,
-                                     std::vector<itest::TServerDetails*>* followers) {
-    std::pair<itest::TabletReplicaMap::iterator, itest::TabletReplicaMap::iterator> range =
-        tablet_replicas_.equal_range(tablet_id);
-    std::vector<itest::TServerDetails*> replicas;
-    for (; range.first != range.second; ++range.first) {
-      replicas.push_back((*range.first).second);
-    }
-
-    itest::TServerDetails* leader_replica = nullptr;
-    auto it = replicas.begin();
-    for (; it != replicas.end(); ++it) {
-      itest::TServerDetails* replica = *it;
-      bool found_leader_replica = false;
-      for (auto i = 0; i < kMaxRetries; ++i) {
-        if (itest::GetReplicaStatusAndCheckIfLeader(
-              replica, tablet_id, MonoDelta::FromMilliseconds(100)).ok()) {
-          leader_replica = replica;
-          found_leader_replica = true;
-          break;
-        }
-      }
-      if (found_leader_replica) {
-        break;
-      }
-    }
-    if (!leader_replica) {
-      return Status::NotFound("leader replica not found");
-    }
-
-    if (leader) {
-      *leader = leader_replica;
-    }
-    if (followers) {
-      CHECK(replicas.end() != it);
-      replicas.erase(it);
-      followers->swap(replicas);
-    }
-    return Status::OK();
-  }
+                                     std::vector<itest::TServerDetails*>* followers);
 
   Status GetLeaderReplicaWithRetries(const std::string& tablet_id,
                                      itest::TServerDetails** leader,
-                                     int max_attempts = 100) {
-    int attempts = 0;
-    while (attempts < max_attempts) {
-      *leader = GetLeaderReplicaOrNull(tablet_id);
-      if (*leader) {
-        return Status::OK();
-      }
-      attempts++;
-      SleepFor(MonoDelta::FromMilliseconds(100 * attempts));
-    }
-    return Status::NotFound("Leader replica not found");
-  }
-
-  Status GetTabletLeaderUUIDFromMaster(const std::string& tablet_id, std::string* leader_uuid) {
-    master::GetTableLocationsRequestPB req;
-    master::GetTableLocationsResponsePB resp;
-    rpc::RpcController controller;
-    controller.set_timeout(MonoDelta::FromMilliseconds(100));
-    req.mutable_table()->set_table_name(kTableId);
-
-    RETURN_NOT_OK(cluster_->master_proxy()->GetTableLocations(req, &resp, &controller));
-    for (const master::TabletLocationsPB& loc : resp.tablet_locations()) {
-      if (loc.tablet_id() == tablet_id) {
-        for (const master::TabletLocationsPB::ReplicaPB& replica : loc.replicas()) {
-          if (replica.role() == consensus::RaftPeerPB::LEADER) {
-            *leader_uuid = replica.ts_info().permanent_uuid();
-            return Status::OK();
-          }
-        }
-      }
-    }
-    return Status::NotFound("Unable to find leader for tablet", tablet_id);
-  }
+                                     int max_attempts = 100);
+
+  Status GetTabletLeaderUUIDFromMaster(const std::string& tablet_id,
+                                       std::string* leader_uuid);
 
   itest::TServerDetails* GetReplicaWithUuidOrNull(const std::string& tablet_id,
-                                           const std::string& uuid) {
-    std::pair<itest::TabletReplicaMap::iterator, itest::TabletReplicaMap::iterator> range =
-        tablet_replicas_.equal_range(tablet_id);
-    for (;range.first != range.second; ++range.first) {
-      if ((*range.first).second->instance_id.permanent_uuid() == uuid) {
-        return (*range.first).second;
-      }
-    }
-    return NULL;
-  }
+                                                  const std::string& uuid);
 
   // Gets the the locations of the consensus configuration and waits until all replicas
   // are available for all tablets.
-  void WaitForTSAndReplicas(const std::string& table_id = kTableId) {
-    int num_retries = 0;
-    // make sure the replicas are up and find the leader
-    while (true) {
-      if (num_retries >= kMaxRetries) {
-        FAIL() << " Reached max. retries while looking up the config.";
-      }
-
-      Status status = cluster_->WaitForTabletServerCount(FLAGS_num_tablet_servers,
-                                                         MonoDelta::FromSeconds(5));
-      if (status.IsTimedOut()) {
-        LOG(WARNING)<< "Timeout waiting for all replicas to be online, retrying...";
-        num_retries++;
-        continue;
-      }
-      break;
-    }
-    WaitForReplicasAndUpdateLocations(table_id);
-  }
+  void WaitForTSAndReplicas(const std::string& table_id = kTableId);
 
   // Removes a set of servers from the replicas_ list.
   // Handy for controlling who to validate against after killing servers.
-  void PruneFromReplicas(const std::unordered_set<std::string>& uuids) {
-    auto iter = tablet_replicas_.begin();
-    while (iter != tablet_replicas_.end()) {
-      if (uuids.count((*iter).second->instance_id.permanent_uuid()) != 0) {
-        iter = tablet_replicas_.erase(iter);
-        continue;
-      }
-      ++iter;
-    }
-
-    for (const std::string& uuid : uuids) {
-      delete EraseKeyReturnValuePtr(&tablet_servers_, uuid);
-    }
-  }
+  void PruneFromReplicas(const std::unordered_set<std::string>& uuids);
 
   void GetOnlyLiveFollowerReplicas(const std::string& tablet_id,
-                                   std::vector<itest::TServerDetails*>* followers) {
-    followers->clear();
-    itest::TServerDetails* leader;
-    CHECK_OK(GetLeaderReplicaWithRetries(tablet_id, &leader));
-
-    std::vector<itest::TServerDetails*> replicas;
-    std::pair<itest::TabletReplicaMap::iterator, itest::TabletReplicaMap::iterator> range =
-        tablet_replicas_.equal_range(tablet_id);
-    for (;range.first != range.second; ++range.first) {
-      replicas.push_back((*range.first).second);
-    }
-
-    for (itest::TServerDetails* replica : replicas) {
-      if (leader != NULL &&
-          replica->instance_id.permanent_uuid() == leader->instance_id.permanent_uuid()) {
-        continue;
-      }
-      Status s = itest::GetReplicaStatusAndCheckIfLeader(
-          replica, tablet_id, MonoDelta::FromMilliseconds(100));
-      if (s.IsIllegalState()) {
-        followers->push_back(replica);
-      }
-    }
-  }
+                                   std::vector<itest::TServerDetails*>* followers);
 
   // Return the index within 'replicas' for the replica which is farthest ahead.
   int64_t GetFurthestAheadReplicaIdx(const std::string& tablet_id,
-                                     const std::vector<itest::TServerDetails*>& replicas) {
-    std::vector<consensus::OpId> op_ids;
-    CHECK_OK(GetLastOpIdForEachReplica(tablet_id, replicas, consensus::RECEIVED_OPID,
-                                       MonoDelta::FromSeconds(10), &op_ids));
-
-    int64_t max_index = 0;
-    int max_replica_index = -1;
-    for (int i = 0; i < op_ids.size(); i++) {
-      if (op_ids[i].index() > max_index) {
-        max_index = op_ids[i].index();
-        max_replica_index = i;
-      }
-    }
-
-    CHECK_NE(max_replica_index, -1);
-
-    return max_replica_index;
-  }
-
-  Status ShutdownServerWithUUID(const std::string& uuid) {
-    for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
-      cluster::ExternalTabletServer* ts = cluster_->tablet_server(i);
-      if (ts->instance_id().permanent_uuid() == uuid) {
-        ts->Shutdown();
-        return Status::OK();
-      }
-    }
-    return Status::NotFound("Unable to find server with UUID", uuid);
-  }
-
-  Status RestartServerWithUUID(const std::string& uuid) {
-    for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
-      cluster::ExternalTabletServer* ts = cluster_->tablet_server(i);
-      if (ts->instance_id().permanent_uuid() == uuid) {
-        ts->Shutdown();
-        RETURN_NOT_OK(CheckTabletServersAreAlive(tablet_servers_.size()-1));
-        RETURN_NOT_OK(ts->Restart());
-        RETURN_NOT_OK(CheckTabletServersAreAlive(tablet_servers_.size()));
-        return Status::OK();
-      }
-    }
-    return Status::NotFound("Unable to find server with UUID", uuid);
-  }
+                                     const std::vector<itest::TServerDetails*>& replicas);
+
+  Status ShutdownServerWithUUID(const std::string& uuid);
+
+  Status RestartServerWithUUID(const std::string& uuid);
 
   // Since we're fault-tolerant we might mask when a tablet server is
   // dead. This returns Status::IllegalState() if fewer than 'num_tablet_servers'
   // are alive.
-  Status CheckTabletServersAreAlive(int num_tablet_servers) {
-    int live_count = 0;
-    std::string error = strings::Substitute("Fewer than $0 TabletServers were alive. Dead TSs: ",
-                                            num_tablet_servers);
-    rpc::RpcController controller;
-    for (const itest::TabletServerMap::value_type& entry : tablet_servers_) {
-      controller.Reset();
-      controller.set_timeout(MonoDelta::FromSeconds(10));
-      PingRequestPB req;
-      PingResponsePB resp;
-      Status s = entry.second->tserver_proxy->Ping(req, &resp, &controller);
-      if (!s.ok()) {
-        error += "\n" + entry.second->ToString() +  " (" + s.ToString() + ")";
-        continue;
-      }
-      live_count++;
-    }
-    if (live_count < num_tablet_servers) {
-      return Status::IllegalState(error);
-    }
-    return Status::OK();
-  }
-
-  virtual void TearDown() OVERRIDE {
-    STLDeleteValues(&tablet_servers_);
-  }
-
-  void CreateClient(client::sp::shared_ptr<client::KuduClient>* client) {
-    // Connect to the cluster.
-    ASSERT_OK(client::KuduClientBuilder()
-                     .add_master_server_addr(cluster_->master()->bound_rpc_addr().ToString())
-                     .Build(client));
-  }
+  Status CheckTabletServersAreAlive(int num_tablet_servers);
+
+  void TearDown() override;
+
+  void CreateClient(client::sp::shared_ptr<client::KuduClient>* client);
 
   // Create a table with a single tablet, with 'num_replicas'.
-  void CreateTable(const std::string& table_id = kTableId) {
-    // The tests here make extensive use of server schemas, but we need
-    // a client schema to create the table.
-    client::KuduSchema client_schema(client::KuduSchemaFromSchema(schema_));
-    gscoped_ptr<client::KuduTableCreator> table_creator(client_->NewTableCreator());
-    ASSERT_OK(table_creator->table_name(table_id)
-             .schema(&client_schema)
-             .set_range_partition_columns({ "key" })
-             .num_replicas(FLAGS_num_replicas)
-             .Create());
-    ASSERT_OK(client_->OpenTable(table_id, &table_));
-  }
+  void CreateTable(const std::string& table_id = kTableId);
 
   // Starts an external cluster with a single tablet and a number of replicas equal
   // to 'FLAGS_num_replicas'. The caller can pass 'ts_flags' to specify non-default
   // flags to pass to the tablet servers.
   void BuildAndStart(const std::vector<std::string>& ts_flags = {},
-                     const std::vector<std::string>& master_flags = {}) {
-    NO_FATALS(CreateCluster("raft_consensus-itest-cluster", ts_flags, master_flags));
-    NO_FATALS(CreateClient(&client_));
-    NO_FATALS(CreateTable());
-    WaitForTSAndReplicas();
-    ASSERT_FALSE(tablet_replicas_.empty());
-    tablet_id_ = (*tablet_replicas_.begin()).first;
-  }
-
-  void AssertAllReplicasAgree(int expected_result_count) {
-    ClusterVerifier v(cluster_.get());
-    NO_FATALS(v.CheckCluster());
-    NO_FATALS(v.CheckRowCount(kTableId, ClusterVerifier::EXACTLY, expected_result_count));
-  }
+                     const std::vector<std::string>& master_flags = {});
+
+  void AssertAllReplicasAgree(int expected_result_count);
 
   // Check for and restart any TS that have crashed.
   // Returns the number of servers restarted.
-  int RestartAnyCrashedTabletServers() {
-    int restarted = 0;
-    for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
-      if (!cluster_->tablet_server(i)->IsProcessAlive()) {
-        LOG(INFO) << "TS " << i << " appears to have crashed. Restarting.";
-        cluster_->tablet_server(i)->Shutdown();
-        CHECK_OK(cluster_->tablet_server(i)->Restart());
-        restarted++;
-      }
-    }
-    return restarted;
-  }
+  int RestartAnyCrashedTabletServers();
 
   // Assert that no tablet servers have crashed.
   // Tablet servers that have been manually Shutdown() are allowed.
-  void AssertNoTabletServersCrashed() {
-    for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
-      if (cluster_->tablet_server(i)->IsShutdown()) continue;
-
-      ASSERT_TRUE(cluster_->tablet_server(i)->IsProcessAlive())
-                    << "Tablet server " << i << " crashed";
-    }
-  }
+  void AssertNoTabletServersCrashed();
 
  protected:
   gscoped_ptr<cluster::ExternalMiniCluster> cluster_;

http://git-wip-us.apache.org/repos/asf/kudu/blob/3a7342a3/src/kudu/tools/kudu-admin-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/kudu-admin-test.cc b/src/kudu/tools/kudu-admin-test.cc
index 70cf4e7..f49e9bd 100644
--- a/src/kudu/tools/kudu-admin-test.cc
+++ b/src/kudu/tools/kudu-admin-test.cc
@@ -27,6 +27,7 @@
 #include <utility>
 #include <vector>
 
+#include <gflags/gflags_declare.h>
 #include <glog/logging.h>
 #include <gtest/gtest.h>
 
@@ -61,34 +62,38 @@
 #include "kudu/util/test_macros.h"
 #include "kudu/util/test_util.h"
 
-namespace kudu {
-namespace tools {
-
-using client::KuduClient;
-using client::KuduClientBuilder;
-using client::KuduSchema;
-using client::KuduTableCreator;
-using client::sp::shared_ptr;
-using cluster::ExternalTabletServer;
-using consensus::COMMITTED_OPID;
-using consensus::ConsensusStatePB;
-using consensus::OpId;
-using itest::GetConsensusState;
-using itest::TabletServerMap;
-using itest::TServerDetails;
-using itest::WAIT_FOR_LEADER;
-using itest::WaitForReplicasReportedToMaster;
-using itest::WaitForServersToAgree;
-using itest::WaitUntilCommittedConfigNumVotersIs;
-using itest::WaitUntilCommittedOpIdIndexIs;
-using itest::WaitUntilTabletInState;
-using itest::WaitUntilTabletRunning;
-using pb_util::SecureDebugString;
+DECLARE_int32(num_replicas);
+DECLARE_int32(num_tablet_servers);
+
+using kudu::client::KuduClient;
+using kudu::client::KuduClientBuilder;
+using kudu::client::KuduSchema;
+using kudu::client::KuduTableCreator;
+using kudu::client::sp::shared_ptr;
+using kudu::cluster::ExternalTabletServer;
+using kudu::consensus::COMMITTED_OPID;
+using kudu::consensus::ConsensusStatePB;
+using kudu::consensus::OpId;
+using kudu::itest::GetConsensusState;
+using kudu::itest::TabletServerMap;
+using kudu::itest::TServerDetails;
+using kudu::itest::WAIT_FOR_LEADER;
+using kudu::itest::WaitForReplicasReportedToMaster;
+using kudu::itest::WaitForServersToAgree;
+using kudu::itest::WaitUntilCommittedConfigNumVotersIs;
+using kudu::itest::WaitUntilCommittedOpIdIndexIs;
+using kudu::itest::WaitUntilTabletInState;
+using kudu::itest::WaitUntilTabletRunning;
+using kudu::pb_util::SecureDebugString;
 using std::deque;
 using std::string;
 using std::vector;
+using strings::Split;
 using strings::Substitute;
 
+namespace kudu {
+namespace tools {
+
 class AdminCliTest : public tserver::TabletServerIntegrationTestBase {
 };
 
@@ -1187,8 +1192,7 @@ TEST_F(AdminCliTest, TestListTables) {
     cluster_->master()->bound_rpc_addr().ToString()
   }, "", &stdout, nullptr));
 
-  vector<string> stdout_lines = strings::Split(stdout, ",",
-                                               strings::SkipEmpty());
+  vector<string> stdout_lines = Split(stdout, ",", strings::SkipEmpty());
   ASSERT_EQ(1, stdout_lines.size());
   ASSERT_EQ(Substitute("$0\n", kTableId), stdout_lines[0]);
 }
@@ -1225,8 +1229,7 @@ TEST_F(AdminCliTest, TestListTablesDetail) {
     cluster_->master()->bound_rpc_addr().ToString()
   }, "", &stdout, nullptr));
 
-  vector<string> stdout_lines = strings::Split(stdout, "\n",
-                                               strings::SkipEmpty());
+  vector<string> stdout_lines = Split(stdout, "\n", strings::SkipEmpty());
 
   // Verify multiple tables along with their tablets and replica-uuids.
   ASSERT_EQ(4, stdout_lines.size());

http://git-wip-us.apache.org/repos/asf/kudu/blob/3a7342a3/src/kudu/tserver/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/CMakeLists.txt b/src/kudu/tserver/CMakeLists.txt
index 84daec8..938e88a 100644
--- a/src/kudu/tserver/CMakeLists.txt
+++ b/src/kudu/tserver/CMakeLists.txt
@@ -152,11 +152,13 @@ target_link_libraries(kudu-tserver
 
 set(TSERVER_TEST_UTIL_SRCS
   tablet_server_test_util.cc
+  tablet_server-test-base.cc
 )
 
 add_library(tserver_test_util ${TSERVER_TEST_UTIL_SRCS})
 target_link_libraries(tserver_test_util
-  tserver)
+  tserver
+  ${KUDU_MIN_TEST_LIBS})
 
 #########################################
 # tserver tests

http://git-wip-us.apache.org/repos/asf/kudu/blob/3a7342a3/src/kudu/tserver/tablet_copy-test-base.h
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_copy-test-base.h b/src/kudu/tserver/tablet_copy-test-base.h
index d8287fc..8da5f37 100644
--- a/src/kudu/tserver/tablet_copy-test-base.h
+++ b/src/kudu/tserver/tablet_copy-test-base.h
@@ -30,6 +30,7 @@
 #include "kudu/tserver/tablet_copy.pb.h"
 #include "kudu/util/crc.h"
 #include "kudu/util/stopwatch.h"
+#include "kudu/util/test_macros.h"
 #include "kudu/util/test_util.h"
 
 namespace kudu {
@@ -105,7 +106,7 @@ class TabletCopyTest : public TabletServerTestBase {
     const int kIncr = 50;
     LOG_TIMING(INFO, "Loading test data") {
       for (int row_id = 0; row_id < kNumLogRolls * kIncr; row_id += kIncr) {
-        InsertTestRowsRemote(0, row_id, kIncr);
+        InsertTestRowsRemote(row_id, kIncr);
         ASSERT_OK(tablet_replica_->tablet()->Flush());
         ASSERT_OK(tablet_replica_->log()->AllocateSegmentAndRollOver());
       }

http://git-wip-us.apache.org/repos/asf/kudu/blob/3a7342a3/src/kudu/tserver/tablet_server-stress-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_server-stress-test.cc b/src/kudu/tserver/tablet_server-stress-test.cc
index 183d298..195f5a7 100644
--- a/src/kudu/tserver/tablet_server-stress-test.cc
+++ b/src/kudu/tserver/tablet_server-stress-test.cc
@@ -118,7 +118,7 @@ void TSStressTest::InserterThread(int thread_idx) {
   int start_row = thread_idx * max_rows;
   for (int i = start_row; i < start_row + max_rows && stop_latch_.count() > 0; i++) {
     MonoTime before = MonoTime::Now();
-    InsertTestRowsRemote(thread_idx, i, 1);
+    InsertTestRowsRemote(i, 1);
     MonoTime after = MonoTime::Now();
     MonoDelta delta = after - before;
     histogram_->Increment(delta.ToMicroseconds());

http://git-wip-us.apache.org/repos/asf/kudu/blob/3a7342a3/src/kudu/tserver/tablet_server-test-base.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_server-test-base.cc b/src/kudu/tserver/tablet_server-test-base.cc
new file mode 100644
index 0000000..192e3a0
--- /dev/null
+++ b/src/kudu/tserver/tablet_server-test-base.cc
@@ -0,0 +1,467 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/tserver/tablet_server-test-base.h"
+
+#include <algorithm>
+#include <cstddef>
+#include <cstdint>
+#include <iostream>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include <gflags/gflags.h>
+#include <gflags/gflags_declare.h>
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "kudu/common/common.pb.h"
+#include "kudu/common/iterator.h"
+#include "kudu/common/partial_row.h"
+#include "kudu/common/rowblock.h"
+#include "kudu/common/wire_protocol-test-util.h"
+#include "kudu/common/wire_protocol.h"
+#include "kudu/common/wire_protocol.pb.h"
+#include "kudu/consensus/raft_consensus.h"
+#include "kudu/gutil/gscoped_ptr.h"
+#include "kudu/gutil/stringprintf.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/rpc/messenger.h"
+#include "kudu/rpc/rpc_controller.h"
+#include "kudu/tablet/local_tablet_writer.h"
+#include "kudu/tablet/tablet.h"
+#include "kudu/tablet/tablet_replica.h"
+#include "kudu/tserver/mini_tablet_server.h"
+#include "kudu/tserver/tablet_server.h"
+#include "kudu/tserver/tablet_server_options.h"
+#include "kudu/tserver/tablet_server_test_util.h"
+#include "kudu/tserver/ts_tablet_manager.h"
+#include "kudu/tserver/tserver_service.proxy.h"
+#include "kudu/util/memory/arena.h"
+#include "kudu/util/metrics.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/net/net_util.h"
+#include "kudu/util/pb_util.h"
+#include "kudu/util/slice.h"
+#include "kudu/util/test_graph.h"
+#include "kudu/util/test_macros.h"
+#include "kudu/util/test_util.h"
+
+DEFINE_int32(rpc_timeout, 1000, "Timeout for RPC calls, in seconds");
+DEFINE_int32(num_updater_threads, 1, "Number of updating threads to launch");
+DECLARE_bool(enable_maintenance_manager);
+DECLARE_int32(heartbeat_rpc_timeout_ms);
+
+METRIC_DEFINE_entity(test);
+
+using kudu::pb_util::SecureDebugString;
+using kudu::pb_util::SecureShortDebugString;
+using kudu::rpc::RpcController;
+using std::string;
+using std::vector;
+using strings::Substitute;
+
+namespace kudu {
+namespace tserver {
+
+const char* TabletServerTestBase::kTableId = "TestTable";
+const char* TabletServerTestBase::kTabletId = "TestTablet";
+
+TabletServerTestBase::TabletServerTestBase()
+    : schema_(GetSimpleTestSchema()),
+      ts_test_metric_entity_(METRIC_ENTITY_test.Instantiate(
+                                 &ts_test_metric_registry_, "ts_server-test")) {
+  // Disable the maintenance ops manager since we want to trigger our own
+  // maintenance operations at predetermined times.
+  FLAGS_enable_maintenance_manager = false;
+
+  // Decrease heartbeat timeout: we keep re-trying heartbeats when a
+  // single master server fails due to a network error. Decreasing
+  // the hearbeat timeout to 1 second speeds up unit tests which
+  // purposefully specify non-running Master servers.
+  FLAGS_heartbeat_rpc_timeout_ms = 1000;
+}
+
+// Starts the tablet server, override to start it later.
+void TabletServerTestBase::SetUp() {
+  KuduTest::SetUp();
+
+  key_schema_ = schema_.CreateKeyProjection();
+  rb_.reset(new RowBuilder(schema_));
+
+  rpc::MessengerBuilder bld("Client");
+  ASSERT_OK(bld.Build(&client_messenger_));
+}
+
+void TabletServerTestBase::StartTabletServer(int num_data_dirs) {
+  CHECK(!mini_server_);
+
+  // Start server with an invalid master address, so it never successfully
+  // heartbeats, even if there happens to be a master running on this machine.
+  mini_server_.reset(new MiniTabletServer(GetTestPath("TabletServerTest-fsroot"),
+                                          HostPort("127.0.0.1", 0), num_data_dirs));
+  mini_server_->options()->master_addresses.clear();
+  mini_server_->options()->master_addresses.emplace_back("255.255.255.255", 1);
+  ASSERT_OK(mini_server_->Start());
+
+  // Set up a tablet inside the server.
+  ASSERT_OK(mini_server_->AddTestTablet(kTableId, kTabletId, schema_));
+  ASSERT_TRUE(mini_server_->server()->tablet_manager()->LookupTablet(kTabletId,
+                                                                     &tablet_replica_));
+
+  // Creating a tablet is async, we wait here instead of having to handle errors later.
+  ASSERT_OK(WaitForTabletRunning(kTabletId));
+
+  // Connect to it.
+  ResetClientProxies();
+}
+
+Status TabletServerTestBase::WaitForTabletRunning(const char *tablet_id) {
+  scoped_refptr<tablet::TabletReplica> tablet_replica;
+  RETURN_NOT_OK(mini_server_->server()->tablet_manager()->GetTabletReplica(tablet_id,
+                                                                           &tablet_replica));
+  RETURN_NOT_OK(tablet_replica->WaitUntilConsensusRunning(MonoDelta::FromSeconds(10)));
+  return tablet_replica->consensus()->WaitUntilLeaderForTests(MonoDelta::FromSeconds(10));
+}
+
+void TabletServerTestBase::UpdateTestRowRemote(int64_t row_idx,
+                                               int32_t new_val,
+                                               TimeSeries* ts) {
+  WriteRequestPB req;
+  req.set_tablet_id(kTabletId);
+  ASSERT_OK(SchemaToPB(schema_, req.mutable_schema()));
+
+  WriteResponsePB resp;
+  RpcController controller;
+  controller.set_timeout(MonoDelta::FromSeconds(FLAGS_rpc_timeout));
+  string new_string_val(Substitute("mutated$0", row_idx));
+
+  AddTestRowToPB(RowOperationsPB::UPDATE, schema_, row_idx, new_val, new_string_val,
+                 req.mutable_row_operations());
+  ASSERT_OK(proxy_->Write(req, &resp, &controller));
+
+  SCOPED_TRACE(SecureDebugString(resp));
+  ASSERT_FALSE(resp.has_error()) << SecureShortDebugString(resp);
+  ASSERT_EQ(0, resp.per_row_errors_size());
+  if (ts) {
+    ts->AddValue(1);
+  }
+}
+
+void TabletServerTestBase::ResetClientProxies() {
+  CreateTsClientProxies(mini_server_->bound_rpc_addr(),
+                        client_messenger_,
+                        &tablet_copy_proxy_,
+                        &proxy_,
+                        &admin_proxy_,
+                        &consensus_proxy_,
+                        &generic_proxy_);
+}
+
+// Inserts 'num_rows' test rows directly into the tablet (i.e not via RPC)
+void TabletServerTestBase::InsertTestRowsDirect(int64_t start_row,
+                                                uint64_t num_rows) {
+  tablet::LocalTabletWriter writer(tablet_replica_->tablet(), &schema_);
+  KuduPartialRow row(&schema_);
+  for (int64_t i = 0; i < num_rows; i++) {
+    BuildTestRow(start_row + i, &row);
+    CHECK_OK(writer.Insert(row));
+  }
+}
+
+// Inserts 'num_rows' test rows remotely into the tablet (i.e via RPC)
+// Rows are grouped in batches of 'count'/'num_batches' size.
+// Batch size defaults to 1.
+void TabletServerTestBase::InsertTestRowsRemote(
+    int64_t first_row,
+    uint64_t count,
+    uint64_t num_batches,
+    TabletServerServiceProxy* proxy,
+    string tablet_id,
+    vector<uint64_t>* write_timestamps_collector,
+    TimeSeries* ts,
+    bool string_field_defined) {
+  if (!proxy) {
+    proxy = proxy_.get();
+  }
+
+  if (num_batches == -1) {
+    num_batches = count;
+  }
+
+  WriteRequestPB req;
+  req.set_tablet_id(std::move(tablet_id));
+
+  WriteResponsePB resp;
+  RpcController controller;
+
+  RowOperationsPB* data = req.mutable_row_operations();
+
+  ASSERT_OK(SchemaToPB(schema_, req.mutable_schema()));
+
+  uint64_t inserted_since_last_report = 0;
+  for (int i = 0; i < num_batches; ++i) {
+
+    // reset the controller and the request
+    controller.Reset();
+    controller.set_timeout(MonoDelta::FromSeconds(FLAGS_rpc_timeout));
+    data->Clear();
+
+    uint64_t first_row_in_batch = first_row + (i * count / num_batches);
+    uint64_t last_row_in_batch = first_row_in_batch + count / num_batches;
+
+    for (int j = first_row_in_batch; j < last_row_in_batch; j++) {
+      string str_val = Substitute("original$0", j);
+      const char* cstr_val = str_val.c_str();
+      if (!string_field_defined) {
+        cstr_val = NULL;
+      }
+      AddTestRowWithNullableStringToPB(RowOperationsPB::INSERT, schema_, j, j,
+                                       cstr_val, data);
+    }
+    CHECK_OK(DCHECK_NOTNULL(proxy)->Write(req, &resp, &controller));
+    if (write_timestamps_collector) {
+      write_timestamps_collector->push_back(resp.timestamp());
+    }
+
+    if (resp.has_error() || resp.per_row_errors_size() > 0) {
+      LOG(FATAL) << "Failed to insert batch "
+                 << first_row_in_batch << "-" << last_row_in_batch
+                 << ": " << SecureDebugString(resp);
+    }
+
+    inserted_since_last_report += count / num_batches;
+    if ((inserted_since_last_report > 100) && ts) {
+      ts->AddValue(static_cast<double>(inserted_since_last_report));
+      inserted_since_last_report = 0;
+    }
+  }
+
+  if (ts) {
+    ts->AddValue(static_cast<double>(inserted_since_last_report));
+  }
+}
+
+// Delete specified test row range.
+void TabletServerTestBase::DeleteTestRowsRemote(int64_t first_row,
+                                                uint64_t count,
+                                                TabletServerServiceProxy* proxy,
+                                                string tablet_id) {
+  if (!proxy) {
+    proxy = proxy_.get();
+  }
+
+  WriteRequestPB req;
+  WriteResponsePB resp;
+  RpcController controller;
+
+  req.set_tablet_id(std::move(tablet_id));
+  ASSERT_OK(SchemaToPB(schema_, req.mutable_schema()));
+
+  RowOperationsPB* ops = req.mutable_row_operations();
+  for (int64_t rowid = first_row; rowid < first_row + count; rowid++) {
+    AddTestKeyToPB(RowOperationsPB::DELETE, schema_, rowid, ops);
+  }
+
+  SCOPED_TRACE(SecureDebugString(req));
+  ASSERT_OK(proxy_->Write(req, &resp, &controller));
+  SCOPED_TRACE(SecureDebugString(resp));
+  ASSERT_FALSE(resp.has_error()) << SecureShortDebugString(resp);
+}
+
+void TabletServerTestBase::BuildTestRow(int index, KuduPartialRow* row) {
+  ASSERT_OK(row->SetInt32(0, index));
+  ASSERT_OK(row->SetInt32(1, index * 2));
+  ASSERT_OK(row->SetStringCopy(2, StringPrintf("hello %d", index)));
+}
+
+void TabletServerTestBase::DrainScannerToStrings(const string& scanner_id,
+                                                 const Schema& projection,
+                                                 vector<string>* results,
+                                                 TabletServerServiceProxy* proxy,
+                                                 uint32_t call_seq_id) {
+  if (!proxy) {
+    proxy = proxy_.get();
+  }
+
+  RpcController rpc;
+  rpc.set_timeout(MonoDelta::FromSeconds(FLAGS_rpc_timeout));
+  ScanRequestPB req;
+  ScanResponsePB resp;
+  req.set_scanner_id(scanner_id);
+
+  // NOTE: we do not sort the results here, since this function is used
+  // by test cases which are verifying the server side's ability to
+  // do ordered scans.
+  do {
+    rpc.Reset();
+    req.set_batch_size_bytes(10000);
+    req.set_call_seq_id(call_seq_id);
+    SCOPED_TRACE(SecureDebugString(req));
+    ASSERT_OK(DCHECK_NOTNULL(proxy)->Scan(req, &resp, &rpc));
+    SCOPED_TRACE(SecureDebugString(resp));
+    ASSERT_FALSE(resp.has_error());
+
+    StringifyRowsFromResponse(projection, rpc, &resp, results);
+    call_seq_id += 1;
+  } while (resp.has_more_results());
+}
+
+void TabletServerTestBase::StringifyRowsFromResponse(
+    const Schema& projection,
+    const RpcController& rpc,
+    ScanResponsePB* resp,
+    vector<string>* results) {
+  RowwiseRowBlockPB* rrpb = resp->mutable_data();
+  Slice direct, indirect; // sidecar data buffers
+  ASSERT_OK(rpc.GetInboundSidecar(rrpb->rows_sidecar(), &direct));
+  if (rrpb->has_indirect_data_sidecar()) {
+    ASSERT_OK(rpc.GetInboundSidecar(rrpb->indirect_data_sidecar(),
+            &indirect));
+  }
+  vector<const uint8_t*> rows;
+  ASSERT_OK(ExtractRowsFromRowBlockPB(projection, *rrpb,
+                                      indirect, &direct, &rows));
+  VLOG(1) << "Round trip got " << rows.size() << " rows";
+  for (const uint8_t* row_ptr : rows) {
+    ConstContiguousRow row(&projection, row_ptr);
+    results->push_back(projection.DebugRow(row));
+  }
+}
+
+void TabletServerTestBase::ShutdownTablet() {
+  if (mini_server_.get()) {
+    // The TabletReplica must be destroyed before the TS, otherwise data
+    // blocks may be destroyed after their owning block manager.
+    tablet_replica_.reset();
+    mini_server_->Shutdown();
+    mini_server_.reset();
+  }
+}
+
+Status TabletServerTestBase::ShutdownAndRebuildTablet(int num_data_dirs) {
+  ShutdownTablet();
+
+  // Start server.
+  mini_server_.reset(new MiniTabletServer(GetTestPath("TabletServerTest-fsroot"),
+                                          HostPort("127.0.0.1", 0), num_data_dirs));
+  mini_server_->options()->master_addresses.clear();
+  mini_server_->options()->master_addresses.emplace_back("255.255.255.255", 1);
+  // this should open the tablet created on StartTabletServer()
+  RETURN_NOT_OK(mini_server_->Start());
+
+  // Don't RETURN_NOT_OK immediately -- even if we fail, we may still get a TabletReplica object
+  // which has information about the failure.
+  Status wait_status = mini_server_->WaitStarted();
+  bool found_peer = mini_server_->server()->tablet_manager()->LookupTablet(
+      kTabletId, &tablet_replica_);
+  RETURN_NOT_OK(wait_status);
+  if (!found_peer) {
+    return Status::NotFound("Tablet was not found");
+  }
+
+  // Connect to it.
+  ResetClientProxies();
+
+  // Opening a tablet is async, we wait here instead of having to handle errors later.
+  return WaitForTabletRunning(kTabletId);
+}
+
+// Verifies that a set of expected rows (key, value) is present in the tablet.
+void TabletServerTestBase::VerifyRows(const Schema& schema,
+                                      const vector<KeyValue>& expected) {
+  gscoped_ptr<RowwiseIterator> iter;
+  ASSERT_OK(tablet_replica_->tablet()->NewRowIterator(schema, &iter));
+  ASSERT_OK(iter->Init(NULL));
+
+  const size_t batch_size =
+      std::max(size_t(1), std::min(expected.size() / 10,
+                                   4*1024*1024 / schema.byte_size()));
+  Arena arena(32*1024, 256*1024);
+  RowBlock block(schema, batch_size, &arena);
+
+  int count = 0;
+  while (iter->HasNext()) {
+    ASSERT_OK_FAST(iter->NextBlock(&block));
+    RowBlockRow rb_row = block.row(0);
+    for (int i = 0; i < block.nrows(); i++) {
+      if (block.selection_vector()->IsRowSelected(i)) {
+        rb_row.Reset(&block, i);
+        VLOG(1) << "Verified row " << schema.DebugRow(rb_row);
+        ASSERT_LT(count, expected.size()) << "Got more rows than expected!";
+        EXPECT_EQ(expected[count].first, *schema.ExtractColumnFromRow<INT32>(rb_row, 0))
+            << "Key mismatch at row: " << count;
+        EXPECT_EQ(expected[count].second, *schema.ExtractColumnFromRow<INT32>(rb_row, 1))
+            << "Value mismatch at row: " << count;
+        count++;
+      }
+    }
+  }
+  ASSERT_EQ(count, expected.size());
+}
+
+// Verifies that a simple scan request fails with the specified error code/message.
+void TabletServerTestBase::VerifyScanRequestFailure(
+    const Schema& projection,
+    TabletServerErrorPB::Code expected_code,
+    const char *expected_message) {
+  ScanRequestPB req;
+  ScanResponsePB resp;
+  RpcController rpc;
+
+  NewScanRequestPB* scan = req.mutable_new_scan_request();
+  scan->set_tablet_id(kTabletId);
+  ASSERT_OK(SchemaToColumnPBs(projection, scan->mutable_projected_columns()));
+  req.set_call_seq_id(0);
+
+  // Send the call
+  {
+    SCOPED_TRACE(SecureDebugString(req));
+    ASSERT_OK(proxy_->Scan(req, &resp, &rpc));
+    SCOPED_TRACE(SecureDebugString(resp));
+    ASSERT_TRUE(resp.has_error());
+    ASSERT_EQ(expected_code, resp.error().code());
+    ASSERT_STR_CONTAINS(resp.error().status().message(), expected_message);
+  }
+}
+
+// Open a new scanner which scans all of the columns in the table.
+void TabletServerTestBase::OpenScannerWithAllColumns(ScanResponsePB* resp) {
+  ScanRequestPB req;
+  RpcController rpc;
+
+  // Set up a new request with no predicates, all columns.
+  const Schema& projection = schema_;
+  NewScanRequestPB* scan = req.mutable_new_scan_request();
+  scan->set_tablet_id(kTabletId);
+  ASSERT_OK(SchemaToColumnPBs(projection, scan->mutable_projected_columns()));
+  req.set_call_seq_id(0);
+  req.set_batch_size_bytes(0); // so it won't return data right away
+
+  // Send the call
+  {
+    SCOPED_TRACE(SecureDebugString(req));
+    ASSERT_OK(proxy_->Scan(req, resp, &rpc));
+    SCOPED_TRACE(SecureDebugString(*resp));
+    ASSERT_FALSE(resp->has_error());
+    ASSERT_TRUE(resp->has_more_results());
+  }
+}
+
+} // namespace tserver
+} // namespace kudu


Mime
View raw message