kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ale...@apache.org
Subject [kudu] branch master updated: [master] KUDU-3036 reject DDLs which would lead to DoS
Date Tue, 14 Jan 2020 06:45:38 GMT
This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new b4c3172  [master] KUDU-3036 reject DDLs which would lead to DoS
b4c3172 is described below

commit b4c317299f39ae0530cb10702e540cd22393656c
Author: Alexey Serbin <alexey@apache.org>
AuthorDate: Mon Jan 13 17:09:39 2020 -0800

    [master] KUDU-3036 reject DDLs which would lead to DoS
    
    With this patch, masters reject DDL requests which would lead to DoS
    situations described in KUDU-3036.
    
    Added a test scenario to reproduce KUDU-3036.  In the scenario, the size
    of incoming AlterTable RPC is much less than the size of would-be-sent
    UpdateConsensus RPC to update the system catalog correspondingly.
    
    Change-Id: I7c212b635313a0aec5d9ebf8329319bae47f2dd2
    Reviewed-on: http://gerrit.cloudera.org:8080/14999
    Reviewed-by: Adar Dembo <adar@cloudera.com>
    Tested-by: Kudu Jenkins
---
 .../integration-tests/master_replication-itest.cc  | 154 ++++++++++++++++++++-
 src/kudu/master/sys_catalog-test.cc                |   2 +-
 src/kudu/master/sys_catalog.cc                     |  40 +++++-
 src/kudu/master/sys_catalog.h                      |   3 +
 src/kudu/rpc/serialization.cc                      |   2 +-
 src/kudu/rpc/transfer.cc                           |   8 ++
 6 files changed, 203 insertions(+), 6 deletions(-)

diff --git a/src/kudu/integration-tests/master_replication-itest.cc b/src/kudu/integration-tests/master_replication-itest.cc
index b259e5a..a36f907 100644
--- a/src/kudu/integration-tests/master_replication-itest.cc
+++ b/src/kudu/integration-tests/master_replication-itest.cc
@@ -19,6 +19,7 @@
 #include <memory>
 #include <ostream>
 #include <string>
+#include <utility>
 #include <vector>
 
 #include <gflags/gflags_declare.h>
@@ -29,20 +30,24 @@
 #include "kudu/client/schema.h"
 #include "kudu/client/shared_ptr.h"
 #include "kudu/common/common.pb.h"
+#include "kudu/common/partial_row.h"
 #include "kudu/common/wire_protocol.pb.h"
 #include "kudu/consensus/replica_management.pb.h"
 #include "kudu/gutil/gscoped_ptr.h"
 #include "kudu/gutil/port.h"
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/gutil/strings/substitute.h"
+#include "kudu/integration-tests/cluster_itest_util.h"
 #include "kudu/master/catalog_manager.h"
 #include "kudu/master/master.h"
 #include "kudu/master/master.pb.h"
 #include "kudu/master/master.proxy.h"
 #include "kudu/master/mini_master.h"
+#include "kudu/mini-cluster/external_mini_cluster.h"
 #include "kudu/mini-cluster/internal_mini_cluster.h"
 #include "kudu/rpc/messenger.h"
 #include "kudu/rpc/rpc_controller.h"
+#include "kudu/util/metrics.h"
 #include "kudu/util/monotime.h"
 #include "kudu/util/net/net_util.h"
 #include "kudu/util/net/sockaddr.h"
@@ -54,17 +59,23 @@
 
 DECLARE_bool(raft_prepare_replacement_before_eviction);
 
+METRIC_DECLARE_counter(sys_catalog_oversized_write_requests);
+
 using kudu::client::KuduClient;
 using kudu::client::KuduClientBuilder;
 using kudu::client::KuduColumnSchema;
 using kudu::client::KuduSchema;
 using kudu::client::KuduSchemaBuilder;
+using kudu::client::KuduTableAlterer;
 using kudu::client::KuduTableCreator;
 using kudu::client::sp::shared_ptr;
-using kudu::consensus::ReplicaManagementInfoPB;
+using kudu::cluster::ExternalMiniCluster;
+using kudu::cluster::ExternalMiniClusterOptions;
 using kudu::cluster::InternalMiniCluster;
 using kudu::cluster::InternalMiniClusterOptions;
+using kudu::consensus::ReplicaManagementInfoPB;
 using std::string;
+using std::unique_ptr;
 using std::vector;
 using strings::Substitute;
 
@@ -349,6 +360,147 @@ TEST_F(MasterReplicationTest, TestConnectToFollowerMasterOnly) {
   EXPECT_LE(successes, 1);
 }
 
+// In this test, a Kudu master receives an RPC under the maximum size limit,
+// however the corresponding update on the system tablet would exceed that limit.
+class MasterReplicationAndRpcSizeLimitTest : public KuduTest {
+ public:
+  void SetUp() override {
+    KuduTest::SetUp();
+    ASSERT_OK(Prepare());
+  }
+
+ protected:
+  static constexpr const char* const kKeyColumnName = "key";
+  static constexpr auto kNumMasters = 3;
+  static constexpr auto kNumTabletServers = 3;
+  static constexpr auto kReplicationFactor = 3;
+  // In case of standard builds, shorten the Raft heartbeat and election timeout
+  // intervals to speed up the test.
+#if defined(ADDRESS_SANITIZER) || defined(THREAD_SANITIZER)
+  static constexpr auto kHbIntervalMs = 500;
+#else
+  static constexpr auto kHbIntervalMs = 200;
+#endif
+  static constexpr auto kMaxMissedHbs = 2;
+
+  Status Prepare() {
+    const vector<string> ts_extra_flags = {
+      // Set custom timings for Raft heartbeats and heard-from-leader timeouts.
+      Substitute("--raft_heartbeat_interval_ms=$0", kHbIntervalMs),
+      Substitute("--leader_failure_max_missed_heartbeat_periods=$0", kMaxMissedHbs),
+      // This test scenario creates many replicas per tablet server and causes
+      // multiple re-elections, so it's necessary to accommodate for spikes in
+      // Raft heartbeat traffic coming from one tablet server to another,
+      // especially in case of sanitizer builds.
+      "--rpc_service_queue_length=200",
+    };
+    const vector<string> master_extra_flags = {
+      // Set custom timings for Raft heartbeats and heard-from-leader timeouts.
+      Substitute("--raft_heartbeat_interval_ms=$0", kHbIntervalMs),
+      Substitute("--leader_failure_max_missed_heartbeat_periods=$0", kMaxMissedHbs),
+      // Turn off the validator for the --rpc_max_message_size flag since this
+      // scenario uses non-conventional setting for the flag.
+      "--rpc_max_message_size_enable_validation=false",
+      // Set the maximum size for the master RPC to 64 KiByte.
+      Substitute("--rpc_max_message_size=$0", 64 * 1024),
+      // The updates on the system catalog tablet might be accumulated by Raft
+      // in various scenarios due to connectivity, leadership changes, etc.
+      // Subtracting an extra 1K to account for extra fields while wrapping
+      // messages to replicate into UpdateConsensus RPC.
+      Substitute("--consensus_max_batch_size_bytes=$0", 63 * 1024),
+    };
+
+    ExternalMiniClusterOptions opts;
+    opts.num_masters = kNumMasters;
+    opts.num_tablet_servers = kNumTabletServers;
+    opts.extra_master_flags = master_extra_flags;
+    opts.extra_tserver_flags = ts_extra_flags;
+    cluster_.reset(new ExternalMiniCluster(std::move(opts)));
+    RETURN_NOT_OK(cluster_->Start());
+    return cluster_->CreateClient(nullptr, &client_);
+  }
+
+  // Create a table named 'table_name' with pre-defined structure.
+  Status CreateTable(const string& table_name, int replication_factor) {
+    // In this test scenario, long dimension labels are used to make
+    // the corresponding update on the system tablet longer than the incoming
+    // RPC to master (e.g. a tablet report or AlterTable request). In real life,
+    // it's possible to achieve the same by other means, but it would be
+    // necessary to create many more tablet replicas in the cluster.
+    static const char* const kLabelSuffix =
+        "_very_looooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooo"
+        "oooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooo"
+        "oooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooo"
+        "ooooooooooooooooooooooooooooooooooooooooooooooooonooooog_label_suffix";
+    KuduSchemaBuilder b;
+    b.AddColumn("key")->Type(KuduColumnSchema::INT64)->NotNull()->PrimaryKey();
+    b.AddColumn("string_column")->Type(KuduColumnSchema::STRING);
+    RETURN_NOT_OK(b.Build(&schema_));
+
+    unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator());
+    const auto s = table_creator->table_name(table_name)
+        .schema(&schema_)
+        .set_range_partition_columns({ kKeyColumnName })
+        .add_hash_partitions({ kKeyColumnName }, 10)
+        .num_replicas(replication_factor)
+        .dimension_label(table_name + kLabelSuffix)
+        .Create();
+    return s;
+  }
+
+  Status GetMetricValue(const MetricPrototype& metric_proto, int64_t* value) {
+    int leader_idx;
+    RETURN_NOT_OK(cluster_->GetLeaderMasterIndex(&leader_idx));
+    return itest::GetInt64Metric(
+          cluster_->master(leader_idx)->bound_http_hostport(),
+          &METRIC_ENTITY_server,
+          nullptr,
+          &metric_proto,
+          "value",
+          value);
+  }
+
+  unique_ptr<cluster::ExternalMiniCluster> cluster_;
+  client::sp::shared_ptr<client::KuduClient> client_;
+  KuduSchema schema_;
+};
+
+// Make sure leader master rejects AlterTable requests which result in updates
+// on the system tablet which it would not be able to push to its followers
+// due to the limit set by the --rpc_max_message_size flag.
+// This scenario simulates conditions described in KUDU-3036.
+TEST_F(MasterReplicationAndRpcSizeLimitTest, AlterTable) {
+  const string table_name = "table_to_alter";
+  ASSERT_OK(CreateTable(table_name, kReplicationFactor));
+
+  // After fresh start, there should be no rejected writes to the system catalog
+  // tablet yet.
+  int64_t val;
+  ASSERT_OK(GetMetricValue(METRIC_sys_catalog_oversized_write_requests, &val));
+  ASSERT_EQ(0, val);
+
+  unique_ptr<KuduTableAlterer> alterer(client_->NewTableAlterer(table_name));
+  alterer->DropRangePartition(schema_.NewRow(), schema_.NewRow());
+  for (auto i = 0; i < 50; ++i) {
+    unique_ptr<KuduPartialRow> lower(schema_.NewRow());
+    unique_ptr<KuduPartialRow> upper(schema_.NewRow());
+    ASSERT_OK(lower->SetInt64(kKeyColumnName, 10 * i));
+    ASSERT_OK(upper->SetInt64(kKeyColumnName, 10 * (i + 1)));
+    alterer->AddRangePartition(lower.release(), upper.release());
+  }
+  auto s = alterer->timeout(MonoDelta::FromSeconds(30))->Alter();
+
+  // The DDL attempt above (i.e. the Alter() call) produces an oversized write
+  // request to the system catalog tablet. The request should have been rejected
+  // and the corresponding metric incremented.
+  ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString();
+  ASSERT_STR_CONTAINS(s.ToString(), "too large for current setting of the "
+                                    "--rpc_max_message_size flag");
+  ASSERT_OK(GetMetricValue(METRIC_sys_catalog_oversized_write_requests, &val));
+  ASSERT_EQ(1, val);
+
+  NO_FATALS(cluster_->AssertNoCrashes());
+}
 
 } // namespace master
 } // namespace kudu
diff --git a/src/kudu/master/sys_catalog-test.cc b/src/kudu/master/sys_catalog-test.cc
index 5f2f107..c67740c 100644
--- a/src/kudu/master/sys_catalog-test.cc
+++ b/src/kudu/master/sys_catalog-test.cc
@@ -408,7 +408,7 @@ TEST_F(SysCatalogTest, AttemptOverwriteCertAuthorityInfo) {
   const Status s = master_->catalog_manager()->sys_catalog()->
       AddCertAuthorityEntry(ca_entry);
   ASSERT_TRUE(s.IsCorruption()) << s.ToString();
-  ASSERT_EQ("Corruption: One or more rows failed to write", s.ToString());
+  ASSERT_EQ("Corruption: failed to write one or more rows", s.ToString());
 }
 
 } // namespace master
diff --git a/src/kudu/master/sys_catalog.cc b/src/kudu/master/sys_catalog.cc
index fb5e47a..e82f20f 100644
--- a/src/kudu/master/sys_catalog.cc
+++ b/src/kudu/master/sys_catalog.cc
@@ -94,6 +94,15 @@ DEFINE_double(sys_catalog_fail_during_write, 0.0,
               "Fraction of the time when system table writes will fail");
 TAG_FLAG(sys_catalog_fail_during_write, hidden);
 
+DECLARE_int64(rpc_max_message_size);
+
+METRIC_DEFINE_counter(server, sys_catalog_oversized_write_requests,
+                      "System Catalog Oversized Write Requests",
+                      kudu::MetricUnit::kRequests,
+                      "Number of oversized write requests to the system "
+                      "catalog tablet rejected since start",
+                      kudu::MetricLevel::kWarn);
+
 using kudu::consensus::ConsensusMetadata;
 using kudu::consensus::ConsensusMetadataManager;
 using kudu::consensus::ConsensusStatePB;
@@ -106,6 +115,7 @@ using kudu::tablet::Tablet;
 using kudu::tablet::TabletReplica;
 using kudu::tserver::WriteRequestPB;
 using kudu::tserver::WriteResponsePB;
+using std::back_inserter;
 using std::function;
 using std::set;
 using std::shared_ptr;
@@ -124,6 +134,20 @@ class Message;
 namespace kudu {
 namespace master {
 
+namespace  {
+
+// A utility function to get the upper limit for the size of a write request
+// into the system tablet represented by WriteRequestPB. The write request is
+// eventually wrapped into ConsensusRequestPB when a leader master propagates
+// the updates to the followers. Wrapping WriteRequestPB into ConsensusRequestPB
+// requires adding extra fields, and 1KB looks like a reasonable estimate
+// for the upper limit of the added delta.
+size_t GetMaxWriteRequestSize() {
+  return std::max<int64_t>(0, FLAGS_rpc_max_message_size - 1024);
+}
+
+} // anonymous namespace
+
 static const char* const kSysCatalogTableColType = "entry_type";
 static const char* const kSysCatalogTableColId = "entry_id";
 static const char* const kSysCatalogTableColMetadata = "metadata";
@@ -162,6 +186,8 @@ SysCatalogTable::SysCatalogTable(Master* master,
       master_(master),
       cmeta_manager_(new ConsensusMetadataManager(master_->fs_manager())),
       leader_cb_(std::move(leader_cb)) {
+  oversized_write_requests_ = master_->metric_entity()->FindOrCreateCounter(
+      &METRIC_sys_catalog_oversized_write_requests);
 }
 
 SysCatalogTable::~SysCatalogTable() {
@@ -216,7 +242,7 @@ Status SysCatalogTable::Load(FsManager *fs_manager) {
                                   peer_addrs_from_opts.end(),
                                   peer_addrs_from_disk.begin(),
                                   peer_addrs_from_disk.end(),
-                                  std::back_inserter(symm_diff));
+                                  back_inserter(symm_diff));
     if (!symm_diff.empty()) {
       string msg = Substitute(
           "on-disk master list ($0) and provided master list ($1) differ. "
@@ -451,9 +477,17 @@ Status SysCatalogTable::WaitUntilRunning() {
 }
 
 Status SysCatalogTable::SyncWrite(const WriteRequestPB& req) {
+  DCHECK(req.has_tablet_id());
+  DCHECK(req.has_schema());
   MAYBE_RETURN_FAILURE(FLAGS_sys_catalog_fail_during_write,
                        Status::RuntimeError(kInjectedFailureStatusMsg));
-
+  const size_t request_size = req.ByteSizeLong();
+  if (request_size > GetMaxWriteRequestSize()) {
+    oversized_write_requests_->Increment();
+    return Status::InvalidArgument(
+        Substitute("write request ($0 bytes in size) is too large for current "
+                   "setting of the --rpc_max_message_size flag", request_size));
+  }
   CountDownLatch latch(1);
   WriteResponsePB resp;
   gscoped_ptr<tablet::TransactionCompletionCallback> txn_callback(
@@ -476,7 +510,7 @@ Status SysCatalogTable::SyncWrite(const WriteRequestPB& req) {
       LOG(WARNING) << Substitute(
           "row $0: $1", error.row_index(), StatusFromPB(error.error()).ToString());
     }
-    return Status::Corruption("One or more rows failed to write");
+    return Status::Corruption("failed to write one or more rows");
   }
   return Status::OK();
 }
diff --git a/src/kudu/master/sys_catalog.h b/src/kudu/master/sys_catalog.h
index b213ad3..54b5fd6 100644
--- a/src/kudu/master/sys_catalog.h
+++ b/src/kudu/master/sys_catalog.h
@@ -38,6 +38,7 @@
 
 namespace kudu {
 
+class Counter;
 class FsManager;
 class MetricRegistry;
 class RowBlockRow;
@@ -323,6 +324,8 @@ class SysCatalogTable {
   ElectedLeaderCallback leader_cb_;
 
   consensus::RaftPeerPB local_peer_pb_;
+
+  scoped_refptr<Counter> oversized_write_requests_;
 };
 
 } // namespace master
diff --git a/src/kudu/rpc/serialization.cc b/src/kudu/rpc/serialization.cc
index 473a817..78ea295 100644
--- a/src/kudu/rpc/serialization.cc
+++ b/src/kudu/rpc/serialization.cc
@@ -69,7 +69,7 @@ void SerializeMessage(const MessageLite& message, faststring* param_buf,
   // this is a safe limitation.
   CHECK_LE(total_size, std::numeric_limits<uint32_t>::max());
 
-  if (total_size > FLAGS_rpc_max_message_size) {
+  if (PREDICT_FALSE(total_size > FLAGS_rpc_max_message_size)) {
     LOG(WARNING) << Substitute("Serialized $0 ($1 bytes) is larger than the maximum configured "
                                "RPC message size ($2 bytes). "
                                "Sending anyway, but peer may reject the data.",
diff --git a/src/kudu/rpc/transfer.cc b/src/kudu/rpc/transfer.cc
index 8f84011..b268e77 100644
--- a/src/kudu/rpc/transfer.cc
+++ b/src/kudu/rpc/transfer.cc
@@ -36,6 +36,11 @@
 #include "kudu/util/logging.h"
 #include "kudu/util/net/socket.h"
 
+DEFINE_bool(rpc_max_message_size_enable_validation, true,
+            "Whether to enable validation for the --rpc_max_message_size flag. "
+            "This is a test-only flag.");
+TAG_FLAG(rpc_max_message_size_enable_validation, unsafe);
+
 DEFINE_int64(rpc_max_message_size, (50 * 1024 * 1024),
              "The maximum size of a message that any RPC that the server will accept. "
              "Must be at least 1MB.");
@@ -43,6 +48,9 @@ TAG_FLAG(rpc_max_message_size, advanced);
 TAG_FLAG(rpc_max_message_size, runtime);
 
 static bool ValidateMaxMessageSize(const char* flagname, int64_t value) {
+  if (!FLAGS_rpc_max_message_size_enable_validation) {
+    return true;
+  }
   if (value < 1 * 1024 * 1024) {
     LOG(ERROR) << flagname << " must be at least 1MB.";
     return false;


Mime
View raw message