kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aw...@apache.org
Subject [kudu] branch master updated: KUDU-3011 p3: mechanism to quiesce scans
Date Wed, 08 Jan 2020 01:21:57 GMT
This is an automated email from the ASF dual-hosted git repository.

awong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new 4619473  KUDU-3011 p3: mechanism to quiesce scans
4619473 is described below

commit 4619473b9a83dce2b6a60a79802fde33c8686238
Author: Andrew Wong <awong@cloudera.com>
AuthorDate: Tue Jan 7 13:40:54 2020 -0800

    KUDU-3011 p3: mechanism to quiesce scans
    
    This prevents new scans from being started on a tablet server when it is
    quiescing. The scans are retried elsewhere.
    
    I added some tests to check the behavior with various non-fault-tolerant
    scenarios to ensure we can rely on quiescing to move scans away from
    specific tablet servers without failing read workloads. To do this, I
    added some additional configuration to TestWorkload.
    
    Change-Id: Idedca29f40b0a8576245be0f7cfad2be29db4135
    Reviewed-on: http://gerrit.cloudera.org:8080/14986
    Reviewed-by: Adar Dembo <adar@cloudera.com>
    Tested-by: Kudu Jenkins
---
 .../tablet_server_quiescing-itest.cc               | 156 +++++++++++++++++++++
 src/kudu/integration-tests/test_workload.cc        |  16 ++-
 src/kudu/integration-tests/test_workload.h         |  17 +++
 src/kudu/tserver/tablet_server.h                   |   4 +
 src/kudu/tserver/tablet_service.cc                 |  18 +++
 5 files changed, 207 insertions(+), 4 deletions(-)

diff --git a/src/kudu/integration-tests/tablet_server_quiescing-itest.cc b/src/kudu/integration-tests/tablet_server_quiescing-itest.cc
index 60ec1af..5c18a71 100644
--- a/src/kudu/integration-tests/tablet_server_quiescing-itest.cc
+++ b/src/kudu/integration-tests/tablet_server_quiescing-itest.cc
@@ -26,6 +26,9 @@
 #include <glog/logging.h>
 #include <gtest/gtest.h>
 
+#include "kudu/client/client.h"
+#include "kudu/client/scan_batch.h"
+#include "kudu/client/shared_ptr.h"
 #include "kudu/gutil/map-util.h"
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/gutil/strings/substitute.h"
@@ -34,6 +37,7 @@
 #include "kudu/integration-tests/test_workload.h"
 #include "kudu/mini-cluster/internal_mini_cluster.h"
 #include "kudu/tserver/mini_tablet_server.h"
+#include "kudu/tserver/scanners.h"
 #include "kudu/tserver/tablet_server.h"
 #include "kudu/tserver/ts_tablet_manager.h"
 #include "kudu/util/metrics.h"
@@ -46,9 +50,17 @@ DECLARE_bool(enable_leader_failure_detection);
 DECLARE_bool(catalog_manager_wait_for_new_tablets_to_elect_leader);
 DECLARE_double(leader_failure_max_missed_heartbeat_periods);
 DECLARE_int32(consensus_inject_latency_ms_in_notifications);
+DECLARE_int32(scanner_default_batch_size_bytes);
 DECLARE_int32(raft_heartbeat_interval_ms);
 
+using kudu::client::KuduClient;
+using kudu::client::KuduScanBatch;
+using kudu::client::KuduScanner;
+using kudu::client::KuduTable;
+using kudu::client::sp::shared_ptr;
+using kudu::tserver::MiniTabletServer;
 using std::string;
+using std::unique_ptr;
 using std::vector;
 using strings::Substitute;
 
@@ -75,6 +87,26 @@ class TServerQuiescingITest : public MiniClusterITestBase {
       }
     });
   }
+
+  // Creates a read-write workload that doesn't use a fault-tolerant scanner.
+  // Not using a FT scanner means:
+  // - Remote errors when writing won't automatically be retried, so we must
+  //   permit these if we want to run the workload while restarting a tserver.
+  //   We may get a remote error if the tserver is reachable but shutting down
+  //   (this isn't the case in production where we just kill the process).
+  // - The number of rows returned may not be consistent with what we've
+  //   already written.
+  unique_ptr<TestWorkload> CreateFaultIntolerantRWWorkload() const {
+    unique_ptr<TestWorkload> rw_workload(new TestWorkload(cluster_.get()));
+    rw_workload->set_scanner_fault_tolerant(false);
+    rw_workload->set_num_replicas(cluster_->num_tablet_servers());
+    rw_workload->set_num_read_threads(3);
+    rw_workload->set_num_write_threads(3);
+    rw_workload->set_verify_num_rows(false);
+    // NOTE: this doesn't affect scans at all.
+    rw_workload->set_remote_error_allowed(true);
+    return rw_workload;
+  }
 };
 
 // Test that a quiescing server won't trigger an election by natural means (i.e.
@@ -82,6 +114,7 @@ class TServerQuiescingITest : public MiniClusterITestBase {
 TEST_F(TServerQuiescingITest, TestQuiescingServerDoesntTriggerElections) {
   const int kNumReplicas = 3;
   const int kNumTablets = 10;
+  // This test will change leaders frequently, so set a low Raft heartbeat.
   FLAGS_raft_heartbeat_interval_ms = 100;
   NO_FATALS(StartCluster(kNumReplicas));
 
@@ -149,6 +182,86 @@ TEST_F(TServerQuiescingITest, TestMajorityQuiescingElectsLeader) {
   });
 }
 
+// Test that when we're quiescing a tserver, we don't accept new scan requests.
+// Even with non-FT scanners, if we restart a quiescing tserver that has
+// completed its scans, ongoing read workloads are not affected.
+TEST_F(TServerQuiescingITest, TestDoesntAllowNewScans) {
+  const int kNumReplicas = 3;
+  // Set a tiny batch size to encourage many batches for a single scan. This
+  // will emulate longer-running scans.
+  FLAGS_scanner_default_batch_size_bytes = 1;
+  NO_FATALS(StartCluster(kNumReplicas));
+
+  // Set up a table with some replicas and start a workload without fault
+  // tolerant scans.
+  auto rw_workload = CreateFaultIntolerantRWWorkload();
+  rw_workload->Setup();
+  rw_workload->Start();
+
+  // Wait for some scans to begin.
+  auto* ts = cluster_->mini_tablet_server(0);
+  ASSERT_EVENTUALLY([&] {
+    ASSERT_LT(0, ts->server()->scanner_manager()->CountActiveScanners());
+  });
+
+  // Mark a tablet server as quiescing. It should eventually stop serving scans.
+  *ts->server()->mutable_quiescing() = true;
+  ASSERT_EVENTUALLY([&] {
+    ASSERT_EQ(0, ts->server()->scanner_manager()->CountActiveScanners());
+  });
+
+  // Stopping the quiesced tablet server shouldn't affect the ongoing read
+  // workload, since there are no scans on that server.
+  ts->Shutdown();
+  NO_FATALS(rw_workload->StopAndJoin());
+}
+
+// Test that when we're doing a leader-only non-FT scan and we quiesce a server
+// hosting leaders, we eventually stop seeing scans on that server.
+TEST_F(TServerQuiescingITest, TestDoesntAllowNewScansLeadersOnly) {
+  const int kNumReplicas = 3;
+  // This test will change leaders frequently, so set a low Raft heartbeat.
+  FLAGS_raft_heartbeat_interval_ms = 100;
+  // Set a tiny batch size to encourage many batches for a single scan. This
+  // will emulate long-running scans.
+  FLAGS_scanner_default_batch_size_bytes = 1;
+  NO_FATALS(StartCluster(kNumReplicas));
+
+  // Set up a table with some replicas.
+  auto rw_workload = CreateFaultIntolerantRWWorkload();
+  rw_workload->set_scanner_selection(client::KuduClient::LEADER_ONLY);
+  rw_workload->Setup();
+  rw_workload->Start();
+
+  // Inject a bunch of leader elections to stress leadership changes.
+  FLAGS_leader_failure_max_missed_heartbeat_periods = 1;
+  FLAGS_consensus_inject_latency_ms_in_notifications = FLAGS_raft_heartbeat_interval_ms;
+
+  // Wait for the scans to begin.
+  MiniTabletServer* ts = nullptr;
+  ASSERT_EVENTUALLY([&] {
+    for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
+      auto* tserver = cluster_->mini_tablet_server(i);
+      if (tserver->server()->scanner_manager()->CountActiveScanners() > 0) {
+        ts = tserver;
+        break;
+      }
+    }
+    ASSERT_NE(nullptr, ts);
+  });
+
+  // Mark one of the tablet servers with scans as quiescing. It should
+  // eventually stop serving scans.
+  *ts->server()->mutable_quiescing() = true;
+  ASSERT_EVENTUALLY([&] {
+    ASSERT_EQ(0, ts->server()->scanner_manager()->CountActiveScanners());
+  });
+  ts->Shutdown();
+
+  // Stopping the quiesced tablet server shouldn't affect the ongoing read workload.
+  NO_FATALS(rw_workload->StopAndJoin());
+}
+
 class TServerQuiescingParamITest : public TServerQuiescingITest,
                                    public testing::WithParamInterface<int> {};
 
@@ -226,6 +339,49 @@ TEST_P(TServerQuiescingParamITest, TestNoElectionsForNewReplicas) {
   });
 }
 
+// Test that scans are transparently retried when sent to quiescing servers. If
+// all servers are quiescing, the scans will eventually time out; if any are not
+// quiescing, all scans will be directed at the non-quiescing server.
+TEST_P(TServerQuiescingParamITest, TestScansRetry) {
+  const int kNumReplicas = GetParam();
+  NO_FATALS(StartCluster(kNumReplicas));
+  string table_name;
+  {
+    auto rw_workload = CreateFaultIntolerantRWWorkload();
+    rw_workload->Setup();
+    rw_workload->Start();
+    table_name = rw_workload->table_name();
+    while (rw_workload->rows_inserted() < 10000) {
+      SleepFor(MonoDelta::FromMilliseconds(100));
+    }
+    NO_FATALS(rw_workload->StopAndJoin());
+  }
+  // Quiesce every tablet server.
+  for (int i = 0; i < kNumReplicas; i++) {
+    *cluster_->mini_tablet_server(i)->server()->mutable_quiescing() = true;
+  }
+  // This should result in a failure to start scanning anything.
+  shared_ptr<KuduTable> table;
+  ASSERT_OK(client_->OpenTable(table_name, &table));
+  KuduScanner scanner(table.get());
+  ASSERT_OK(scanner.SetTimeoutMillis(1000));
+  {
+    Status s = scanner.Open();
+    ASSERT_TRUE(s.IsTimedOut()) << s.ToString();
+    ASSERT_STR_CONTAINS(s.ToString(), "exceeded configured scan timeout");
+  }
+
+  // Now stop quiescing one of the servers. Our scans should succeed. Set a
+  // small batch size so our scanner remains active.
+  FLAGS_scanner_default_batch_size_bytes = 1;
+  auto* ts = cluster_->mini_tablet_server(0)->server();
+  *ts->mutable_quiescing() = false;
+  KuduScanBatch batch;
+  ASSERT_OK(scanner.Open());
+  ASSERT_OK(scanner.NextBatch(&batch));
+  ASSERT_EQ(1, ts->scanner_manager()->CountActiveScanners());
+}
+
 INSTANTIATE_TEST_CASE_P(NumReplicas, TServerQuiescingParamITest, ::testing::Values(1, 3));
 
 } // namespace itest
diff --git a/src/kudu/integration-tests/test_workload.cc b/src/kudu/integration-tests/test_workload.cc
index 539ceec..3695b83 100644
--- a/src/kudu/integration-tests/test_workload.cc
+++ b/src/kudu/integration-tests/test_workload.cc
@@ -70,10 +70,13 @@ TestWorkload::TestWorkload(MiniCluster* cluster)
     write_batch_size_(50),
     write_interval_millis_(0),
     write_timeout_millis_(20000),
+    fault_tolerant_(true),
+    verify_num_rows_(true),
     timeout_allowed_(false),
     not_found_allowed_(false),
     network_error_allowed_(false),
     remote_error_allowed_(false),
+    selection_(client::KuduClient::CLOSEST_REPLICA),
     schema_(KuduSchema::FromSchema(GetSimpleTestSchema())),
     num_replicas_(3),
     num_tablets_(1),
@@ -219,12 +222,17 @@ void TestWorkload::ReadThread() {
 
     KuduScanner scanner(table.get());
     CHECK_OK(scanner.SetTimeoutMillis(read_timeout_millis_));
-    CHECK_OK(scanner.SetFaultTolerant());
+    CHECK_OK(scanner.SetSelection(selection_));
+    if (fault_tolerant_) {
+      CHECK_OK(scanner.SetFaultTolerant());
+    }
 
     // Note: when INSERT_RANDOM_ROWS_WITH_DELETE is used, ReadThread doesn't really verify
     // anything except that a scan works.
-    int64_t expected_row_count = write_pattern_ == INSERT_RANDOM_ROWS_WITH_DELETE ?
-                                 0 : rows_inserted_.Load();
+    int64_t expected_min_rows = 0;
+    if (write_pattern_ != INSERT_RANDOM_ROWS_WITH_DELETE && verify_num_rows_) {
+      expected_min_rows = rows_inserted_.Load();
+    }
     size_t row_count = 0;
 
     CHECK_OK(scanner.Open());
@@ -234,7 +242,7 @@ void TestWorkload::ReadThread() {
       row_count += batch.NumRows();
     }
 
-    CHECK_GE(row_count, expected_row_count);
+    CHECK_GE(row_count, expected_min_rows);
   }
 }
 
diff --git a/src/kudu/integration-tests/test_workload.h b/src/kudu/integration-tests/test_workload.h
index 00a35af..1f3eaa9 100644
--- a/src/kudu/integration-tests/test_workload.h
+++ b/src/kudu/integration-tests/test_workload.h
@@ -61,6 +61,14 @@ class TestWorkload {
   explicit TestWorkload(cluster::MiniCluster* cluster);
   ~TestWorkload();
 
+  void set_scanner_fault_tolerant(bool fault_tolerant) {
+    fault_tolerant_ = fault_tolerant;
+  }
+
+  void set_scanner_selection(client::KuduClient::ReplicaSelection selection) {
+    selection_ = selection;
+  }
+
   void set_payload_bytes(int n) {
     payload_bytes_ = n;
   }
@@ -119,6 +127,12 @@ class TestWorkload {
     not_found_allowed_ = allowed;
   }
 
+  // Set whether we should attempt to verify the number of rows when scanning.
+  // A scan returning fewer rows than were inserted may indicate a stale read.
+  void set_verify_num_rows(bool should_verify) {
+    verify_num_rows_ = should_verify;
+  }
+
   // Whether per-row errors with Status::AlreadyPresent() are allowed.
   // By default this triggers a check failure.
   void set_already_present_allowed(bool allowed) {
@@ -245,12 +259,15 @@ class TestWorkload {
   int write_batch_size_;
   int write_interval_millis_;
   int write_timeout_millis_;
+  bool fault_tolerant_;
+  bool verify_num_rows_;
   bool timeout_allowed_;
   bool not_found_allowed_;
   bool already_present_allowed_;
   bool network_error_allowed_;
   bool remote_error_allowed_;
   WritePattern write_pattern_;
+  client::KuduClient::ReplicaSelection selection_;
   client::KuduSchema schema_;
 
   int num_replicas_;
diff --git a/src/kudu/tserver/tablet_server.h b/src/kudu/tserver/tablet_server.h
index 90b4b45..5d802c8 100644
--- a/src/kudu/tserver/tablet_server.h
+++ b/src/kudu/tserver/tablet_server.h
@@ -82,6 +82,10 @@ class TabletServer : public kserver::KuduServer {
     return maintenance_manager_.get();
   }
 
+  bool quiescing() const {
+    return quiescing_;
+  }
+
   std::atomic<bool>* mutable_quiescing() {
     return &quiescing_;
   }
diff --git a/src/kudu/tserver/tablet_service.cc b/src/kudu/tserver/tablet_service.cc
index 15d11c1..6b49af3 100644
--- a/src/kudu/tserver/tablet_service.cc
+++ b/src/kudu/tserver/tablet_service.cc
@@ -358,6 +358,18 @@ bool GetConsensusOrRespond(const scoped_refptr<TabletReplica>& replica,
   return true;
 }
 
+template<class RespClass>
+bool CheckTabletServerQuiescingOrRespond(const TabletServer* server, RespClass* resp,
+                                         rpc::RpcContext* context) {
+  if (PREDICT_FALSE(server->quiescing())) {
+    Status s = Status::ServiceUnavailable("Tablet server is quiescing");
+    SetupErrorAndRespond(resp->mutable_error(), s,
+                         TabletServerErrorPB::TABLET_NOT_RUNNING, context);
+    return false;
+  }
+  return true;
+}
+
 Status GetTabletRef(const scoped_refptr<TabletReplica>& replica,
                     shared_ptr<Tablet>* tablet,
                     TabletServerErrorPB::Code* error_code) {
@@ -1696,6 +1708,9 @@ void TabletServiceImpl::Scan(const ScanRequestPB* req,
   bool has_more_results = false;
   TabletServerErrorPB::Code error_code = TabletServerErrorPB::UNKNOWN_ERROR;
   if (req->has_new_scan_request()) {
+    if (!CheckTabletServerQuiescingOrRespond(server_, resp, context)) {
+      return;
+    }
     const NewScanRequestPB& scan_pb = req->new_scan_request();
     scoped_refptr<TabletReplica> replica;
     if (!LookupRunningTabletReplicaOrRespond(server_->tablet_manager(), scan_pb.tablet_id(), resp,
@@ -2007,6 +2022,9 @@ void TabletServiceImpl::Checksum(const ChecksumRequestPB* req,
     }
   }
   if (req->has_new_request()) {
+    if (!CheckTabletServerQuiescingOrRespond(server_, resp, context)) {
+      return;
+    }
     const NewScanRequestPB& new_req = req->new_request();
     scan_req.mutable_new_scan_request()->CopyFrom(req->new_request());
     scoped_refptr<TabletReplica> replica;


Mime
View raw message