kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From danburk...@apache.org
Subject kudu git commit: heavy-update-compaction-itest
Date Wed, 17 Jan 2018 00:44:52 GMT
Repository: kudu
Updated Branches:
  refs/heads/master dc0621d76 -> b0d27d3f5


heavy-update-compaction-itest

This is an integration test which simulates an extremely update-heavy
workload. It was written while trying to repro KUDU-2251, and
subsequently led to discovering the issues in KUDU-2253.

Change-Id: Ie2e30c63e1fedafe1eeb672346700ee8c5e2bb2c
Reviewed-on: http://gerrit.cloudera.org:8080/9037
Reviewed-by: David Ribeiro Alves <davidralves@gmail.com>
Tested-by: Kudu Jenkins


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/b0d27d3f
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/b0d27d3f
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/b0d27d3f

Branch: refs/heads/master
Commit: b0d27d3f5e3a26cdf2d987d3f82270de4ed4376f
Parents: dc0621d
Author: Dan Burkert <danburkert@apache.org>
Authored: Thu Jan 11 16:28:38 2018 -0800
Committer: Dan Burkert <danburkert@apache.org>
Committed: Wed Jan 17 00:44:34 2018 +0000

----------------------------------------------------------------------
 src/kudu/integration-tests/CMakeLists.txt       |   1 +
 .../heavy-update-compaction-itest.cc            | 244 +++++++++++++++++++
 2 files changed, 245 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/b0d27d3f/src/kudu/integration-tests/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/CMakeLists.txt b/src/kudu/integration-tests/CMakeLists.txt
index a3f2294..1a10a7d 100644
--- a/src/kudu/integration-tests/CMakeLists.txt
+++ b/src/kudu/integration-tests/CMakeLists.txt
@@ -74,6 +74,7 @@ ADD_KUDU_TEST(exactly_once_writes-itest)
 ADD_KUDU_TEST(flex_partitioning-itest TIMEOUT 1800)
 ADD_KUDU_TEST(full_stack-insert-scan-test RUN_SERIAL true)
 ADD_KUDU_TEST(fuzz-itest RUN_SERIAL true)
+ADD_KUDU_TEST(heavy-update-compaction-itest RUN_SERIAL true)
 ADD_KUDU_TEST(linked_list-test RESOURCE_LOCK "master-rpc-ports")
 ADD_KUDU_TEST(log-rolling-itest)
 ADD_KUDU_TEST(master_cert_authority-itest RESOURCE_LOCK "master-rpc-ports")

http://git-wip-us.apache.org/repos/asf/kudu/blob/b0d27d3f/src/kudu/integration-tests/heavy-update-compaction-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/heavy-update-compaction-itest.cc b/src/kudu/integration-tests/heavy-update-compaction-itest.cc
new file mode 100644
index 0000000..8cd6e89
--- /dev/null
+++ b/src/kudu/integration-tests/heavy-update-compaction-itest.cc
@@ -0,0 +1,244 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <cstddef>
+#include <cstdint>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include <gflags/gflags.h>
+#include <gflags/gflags_declare.h>
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "kudu/client/client.h"
+#include "kudu/client/row_result.h"
+#include "kudu/client/scan_batch.h"
+#include "kudu/client/scan_predicate.h"
+#include "kudu/client/schema.h"
+#include "kudu/client/shared_ptr.h"
+#include "kudu/client/value.h"
+#include "kudu/client/write_op.h"
+#include "kudu/common/partial_row.h"
+#include "kudu/master/mini_master.h"
+#include "kudu/mini-cluster/internal_mini_cluster.h"
+#include "kudu/util/faststring.h"
+#include "kudu/util/random.h"
+#include "kudu/util/random_util.h"
+#include "kudu/util/slice.h"
+#include "kudu/util/status.h"
+#include "kudu/util/stopwatch.h"
+#include "kudu/util/test_macros.h"
+#include "kudu/util/test_util.h"
+
+DECLARE_int32(flush_threshold_mb);
+
+// Command-line knobs that size the workload generated by this test.
+// 'rounds' is how many times every base row is rewritten, 'rows' is the
+// number of base rows, 'num_columns' is the number of STRING value columns
+// per row, and 'scan_timeout_ms' bounds the final verification scan.
+DEFINE_int32(rounds, 100,
+             "How many rounds of updates will be performed. More rounds make the "
+             "test take longer, but add more data and stress.");
+DEFINE_int32(rows, 500,
+             "How many base rows in the update set. More rows result in more "
+             "rowsets with updates.");
+DEFINE_int32(num_columns, 5, "The number of value columns to test.");
+DEFINE_int32(scan_timeout_ms, 120 * 1000,
+             "Sets the scanner timeout. It may be necessary to increase the "
+             "timeout if the number of rounds or rows is increased.");
+
+using std::string;
+using std::unique_ptr;
+using std::vector;
+
+namespace kudu {
+namespace tablet {
+
+using client::KuduClient;
+using client::KuduClientBuilder;
+using client::KuduColumnSchema;
+using client::KuduInsert;
+using client::KuduPredicate;
+using client::KuduRowResult;
+using client::KuduScanner;
+using client::KuduSchema;
+using client::KuduSchemaBuilder;
+using client::KuduSession;
+using client::KuduTable;
+using client::KuduTableCreator;
+using client::KuduUpdate;
+using client::KuduValue;
+using client::sp::shared_ptr;
+using cluster::InternalMiniCluster;
+using cluster::InternalMiniClusterOptions;
+
+// Test fixture for the heavy-update compaction test. It owns an in-process
+// mini cluster, a client connected to it, and the single table under test.
+class HeavyUpdateCompactionITest : public KuduTest {
+ protected:
+
+  // Name of the table created by CreateTable().
+  const char* const kTableName = "heavy-update-compaction-test";
+
+  // Builds the table schema: an INT64 primary key column named "key",
+  // followed by FLAGS_num_columns non-nullable STRING columns whose names are
+  // "val_" plus a letter suffix counting up from 'a'.
+  HeavyUpdateCompactionITest()
+      : rand_(SeedRandom()) {
+    KuduSchemaBuilder builder;
+    builder.AddColumn("key")
+        ->Type(KuduColumnSchema::INT64)
+        ->NotNull()
+        ->PrimaryKey();
+
+    for (int col = 0; col < FLAGS_num_columns; ++col) {
+      string col_name = "val_";
+      col_name += static_cast<char>('a' + col);
+      builder.AddColumn(col_name)
+          ->Type(KuduColumnSchema::STRING)
+          ->NotNull();
+    }
+
+    CHECK_OK(builder.Build(&schema_));
+  }
+
+  void SetUp() override {
+    // Lower the flush threshold before the base fixture is set up so that
+    // flushes happen frequently.
+    FLAGS_flush_threshold_mb = 2;
+    KuduTest::SetUp();
+  }
+
+  // Starts the cluster, creates the test table with a single replica and no
+  // range partition columns, then opens it into 'table_'.
+  void CreateTable() {
+    NO_FATALS(InitCluster());
+
+    unique_ptr<KuduTableCreator> creator(client_->NewTableCreator());
+    creator->table_name(kTableName);
+    creator->schema(&schema_);
+    creator->set_range_partition_columns({});
+    creator->num_replicas(1);
+    ASSERT_OK(creator->Create());
+
+    ASSERT_OK(client_->OpenTable(kTableName, &table_));
+  }
+
+  void TearDown() override {
+    // The cluster only exists if InitCluster() got far enough to create it.
+    if (cluster_ != nullptr) {
+      cluster_->Shutdown();
+    }
+    KuduTest::TearDown();
+  }
+
+  // Returns a new session with a 30 second timeout that flushes writes in
+  // the background.
+  shared_ptr<KuduSession> CreateSession() {
+    shared_ptr<KuduSession> new_session = client_->NewSession();
+    new_session->SetTimeoutMillis(30 * 1000);
+    CHECK_OK(new_session->SetFlushMode(KuduSession::AUTO_FLUSH_BACKGROUND));
+    return new_session;
+  }
+
+  // Sets column 0 of 'row' to 'key' and fills each of the FLAGS_num_columns
+  // value columns with a separately generated 8 KiB buffer produced by
+  // RandomString().
+  void MakeRow(int64_t key, KuduPartialRow* row) {
+    CHECK_OK(row->SetInt64(0, key));
+
+    faststring buf;
+    buf.resize(8 * 1024);
+    for (int col = 0; col < FLAGS_num_columns; ++col) {
+      RandomString(buf.data(), buf.size(), &rand_);
+      // Value columns start at index 1; index 0 is the key.
+      CHECK_OK(row->SetStringCopy(col + 1, buf));
+    }
+  }
+
+  // Starts a mini cluster with default options and builds a client that
+  // talks to its master.
+  void InitCluster() {
+    InternalMiniClusterOptions cluster_opts;
+    cluster_.reset(new InternalMiniCluster(env_, cluster_opts));
+    ASSERT_OK(cluster_->Start());
+
+    KuduClientBuilder builder;
+    builder.add_master_server_addr(
+        cluster_->mini_master()->bound_rpc_addr_str());
+    ASSERT_OK(builder.Build(&client_));
+  }
+
+  KuduSchema schema_;
+  std::shared_ptr<InternalMiniCluster> cluster_;
+  shared_ptr<KuduTable> table_;
+  shared_ptr<KuduClient> client_;
+  Random rand_;
+};
+
+// Repro for KUDU-2251, which is an integer overflow in the RowSetInfo class.
+// This test creates a rowset with a very large number of REDO delta files
+// (the bug was a 2GiB overflow), as well as mixed inserts. This causes the
+// maintenance manager to schedule rowset compactions, which sometimes
+// reproduces the overflow.
+//
+// The test runs in three phases:
+//   1. Insert FLAGS_rows base rows.
+//   2. For FLAGS_rounds rounds, update every base row and insert one extra
+//      row, remembering the values written in the last round.
+//   3. Scan the base rows back and check them against the remembered values.
+TEST_F(HeavyUpdateCompactionITest, TestHeavyUpdateCompaction) {
+  NO_FATALS(CreateTable());
+  shared_ptr<KuduSession> session = CreateSession();
+
+  // Insert an initial batch of rows.
+  LOG_TIMING(INFO, "inserting") {
+    for (int64_t key = 0; key < FLAGS_rows; key++) {
+      unique_ptr<KuduInsert> insert(table_->NewInsert());
+      MakeRow(key, insert->mutable_row());
+      ASSERT_OK(session->Apply(insert.release()));
+    }
+    ASSERT_OK(session->Flush());
+  }
+
+  // Vector of flattened final values of the updated rows. We keep these around
+  // in order to verify the table is consistent during the scan step. Values
+  // are stored row by row in ascending key order, FLAGS_num_columns per row.
+  vector<string> final_values;
+
+  // Update the rows.
+  LOG_TIMING(INFO, "updating") {
+    for (int64_t round = 0; round < FLAGS_rounds; round++) {
+      for (int64_t key = 0; key < FLAGS_rows; key++) {
+        unique_ptr<KuduUpdate> update(table_->NewUpdate());
+        MakeRow(key, update->mutable_row());
+
+        // Only the values written in the last round survive, so those are
+        // the ones to remember for verification.
+        if (round + 1 == FLAGS_rounds) {
+          for (int idx = 1; idx <= FLAGS_num_columns; idx++) {
+            Slice val;
+            ASSERT_OK(update->mutable_row()->GetString(idx, &val));
+            final_values.emplace_back(val.ToString());
+          }
+        }
+
+        ASSERT_OK(session->Apply(update.release()));
+      }
+
+      // Insert an additional row so that more rowsets are created, and the MM
+      // will run the rowset compaction calculations. Its key is at or above
+      // FLAGS_rows, so the verification scan below never sees it.
+      unique_ptr<KuduInsert> insert(table_->NewInsert());
+      MakeRow(FLAGS_rows + round, insert->mutable_row());
+      ASSERT_OK(session->Apply(insert.release()));
+      ASSERT_OK(session->Flush());
+    }
+  }
+
+  // Scan the updated rows and ensure the final values are present.
+  // NOTE(review): the positional comparison below assumes the scan returns
+  // rows in ascending key order; presumably the fault-tolerant scan mode
+  // provides that -- confirm against the client documentation.
+  KuduScanner scanner(table_.get());
+  ASSERT_OK(scanner.SetFaultTolerant());
+  ASSERT_OK(scanner.AddConjunctPredicate(
+      table_->NewComparisonPredicate(
+        "key", KuduPredicate::LESS, KuduValue::FromInt(FLAGS_rows))));
+
+  // Walking the updates can take a long time.
+  ASSERT_OK(scanner.SetTimeoutMillis(FLAGS_scan_timeout_ms));
+
+  LOG_TIMING(INFO, "scanning") {
+    ASSERT_OK(scanner.Open());
+    vector<KuduRowResult> rows;
+    size_t final_values_offset = 0;
+    while (scanner.HasMoreRows()) {
+      ASSERT_OK(scanner.NextBatch(&rows));
+      for (const auto& row : rows) {
+        for (int idx = 1; idx <= FLAGS_num_columns; idx++) {
+          // Guard the index below: the scan must not return more values than
+          // were recorded.
+          ASSERT_GT(final_values.size(), final_values_offset);
+          Slice actual_val;
+          ASSERT_OK(row.GetString(idx, &actual_val));
+          EXPECT_EQ(actual_val, final_values[final_values_offset++]);
+        }
+      }
+    }
+
+    // The scan must also not return fewer values than were recorded;
+    // otherwise missing rows would go unnoticed.
+    ASSERT_EQ(final_values.size(), final_values_offset);
+  }
+}
+
+} // namespace tablet
+} // namespace kudu


Mime
View raw message