kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a...@apache.org
Subject [kudu] branch master updated: [tools] Add 'run_cleanup' option for 'kudu perf loadgen'
Date Tue, 14 Jan 2020 05:09:07 GMT
This is an automated email from the ASF dual-hosted git repository.

adar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new ef0d3bd  [tools] Add 'run_cleanup' option for 'kudu perf loadgen'
ef0d3bd is described below

commit ef0d3bdd7ff823a7c84f072b3e4476a864d1c17f
Author: Yingchun Lai <405403881@qq.com>
AuthorDate: Wed Jan 1 10:32:25 2020 +0800

    [tools] Add 'run_cleanup' option for 'kudu perf loadgen'
    
    Add 'run_cleanup' option to provide a way to cleanup test data written to
    the table, especially an existing user table.
    
    Change-Id: I1e75adde434bac5e88151361655526b91f327b4c
    Reviewed-on: http://gerrit.cloudera.org:8080/14958
    Tested-by: Adar Dembo <adar@cloudera.com>
    Reviewed-by: Adar Dembo <adar@cloudera.com>
---
 src/kudu/tools/kudu-tool-test.cc   |  64 ++++++++++++--
 src/kudu/tools/tool_action_perf.cc | 168 +++++++++++++++++++++++--------------
 2 files changed, 160 insertions(+), 72 deletions(-)

diff --git a/src/kudu/tools/kudu-tool-test.cc b/src/kudu/tools/kudu-tool-test.cc
index e478413..88684f8 100644
--- a/src/kudu/tools/kudu-tool-test.cc
+++ b/src/kudu/tools/kudu-tool-test.cc
@@ -22,8 +22,8 @@
 #include <cstdint>
 #include <cstdio>
 #include <fstream>
-#include <iterator>
 #include <initializer_list>
+#include <iterator>
 #include <map>
 #include <memory>
 #include <set>
@@ -430,7 +430,7 @@ class ToolTest : public KuduTest {
     vector<pair<string, string>> expected_columns;
     if (columns.empty()) {
       // If we ran with an empty projection, we'll actually get all the columns.
-      expected_columns = {{ "int32", "key" },
+      expected_columns = {{ "int.*", "key" },
                           { "int32", "int_val" },
                           { "string", "string_val" }};
     } else {
@@ -2258,6 +2258,55 @@ TEST_F(ToolTest, TestLoadgenDatabaseName) {
   ASSERT_STR_CONTAINS(out, "foo.loadgen_auto_");
 }
 
+TEST_F(ToolTest, TestLoadgenKeepAutoTableAndData) {
+  // Run 'perf loadgen' and keep data.
+  // Create a single tablet by setting table_num_hash_partitions and table_num_range_partitions
+  // to 1, then we can easily check sequential rows in the tablet by RunScanTableCheck.
+  NO_FATALS(RunLoadgen(1, { "--keep_auto_table=true",
+                            "--table_num_hash_partitions=1",
+                            "--table_num_range_partitions=1" }));
+  string auto_table_name;
+  NO_FATALS(RunActionStdoutString(Substitute("table list $0",
+      HostPort::ToCommaSeparatedString(cluster_->master_rpc_addrs())), &auto_table_name));
+
+  // Data is kept.
+  NO_FATALS(RunScanTableCheck(auto_table_name, "", 0, 1999, {}));
+
+  // Run 'perf loadgen' again with sequential mode and delete new generated data.
+  {
+    const vector<string> args = {
+      "perf",
+      "loadgen",
+      cluster_->master()->bound_rpc_addr().ToString(),
+      Substitute("--table_name=$0", auto_table_name),
+      "--seq_start=2000",
+      "--use_random=false",
+      "--run_cleanup=true",
+    };
+    ASSERT_OK(RunKuduTool(args));
+  }
+
+  // Old data is kept and new data is deleted.
+  NO_FATALS(RunScanTableCheck(auto_table_name, "", 0, 1999, {}));
+
+  // Run 'perf loadgen' again with random mode and delete new generated data.
+  {
+    const vector<string> args = {
+      "perf",
+      "loadgen",
+      cluster_->master()->bound_rpc_addr().ToString(),
+      Substitute("--table_name=$0", auto_table_name),
+      "--seq_start=2000",
+      "--use_random=true",
+      "--run_cleanup=true",
+    };
+    ASSERT_OK(RunKuduTool(args));
+  }
+
+  // Old data is kept and new data is deleted.
+  NO_FATALS(RunScanTableCheck(auto_table_name, "", 0, 1999, {}));
+}
+
 TEST_F(ToolTest, TestLoadgenHmsEnabled) {
   ExternalMiniClusterOptions opts;
   opts.hms_mode = HmsMode::ENABLE_HIVE_METASTORE;
@@ -2299,21 +2348,20 @@ TEST_F(ToolTest, TestLoadgenAutoGenTablePartitioning) {
   const MonoDelta kTimeout = MonoDelta::FromMilliseconds(10);
   TServerDetails* ts = ts_map_[cluster_->tablet_server(0)->uuid()];
 
-  // Test with misconfigured partitioning. This should fail because we disallow
-  // creating tables with "no" partitioning.
+  // Now let's try running with a single partition.
   vector<string> args(base_args);
   args.emplace_back("--table_num_range_partitions=1");
   args.emplace_back("--table_num_hash_partitions=1");
-  Status s = RunKuduTool(args);
-  ASSERT_FALSE(s.ok());
+  int expected_tablets = 1;
+  ASSERT_OK(RunKuduTool(args));
+  ASSERT_OK(WaitForNumTabletsOnTS(ts, expected_tablets, kTimeout));
 
   // Now let's try running with a couple range partitions.
   args = base_args;
   args.emplace_back("--table_num_range_partitions=2");
   args.emplace_back("--table_num_hash_partitions=1");
-  int expected_tablets = 2;
+  expected_tablets += 2;
   ASSERT_OK(RunKuduTool(args));
-  vector<ListTabletsResponsePB::StatusAndSchemaPB> tablets;
   ASSERT_OK(WaitForNumTabletsOnTS(ts, expected_tablets, kTimeout));
 
   // Now let's try running with only hash partitions.
diff --git a/src/kudu/tools/tool_action_perf.cc b/src/kudu/tools/tool_action_perf.cc
index 845f79f..c31f19d 100644
--- a/src/kudu/tools/tool_action_perf.cc
+++ b/src/kudu/tools/tool_action_perf.cc
@@ -234,6 +234,7 @@ using kudu::client::KuduClient;
 using kudu::client::KuduColumnSchema;
 using kudu::client::KuduError;
 using kudu::client::KuduInsert;
+using kudu::client::KuduDelete;
 using kudu::client::KuduScanner;
 using kudu::client::KuduSchema;
 using kudu::client::KuduSchemaBuilder;
@@ -250,6 +251,7 @@ using kudu::log::LogAnchorRegistry;
 using kudu::tablet::RowIteratorOptions;
 using kudu::tablet::Tablet;
 using kudu::tablet::TabletMetadata;
+using kudu::client::KuduWriteOperation;
 using std::accumulate;
 using std::cerr;
 using std::cout;
@@ -302,6 +304,9 @@ DEFINE_bool(run_scan, false,
             "the inserted rows matches the expected number. If enabled, "
             "the scan is run only if no errors were encountered "
             "while inserting the generated rows.");
+DEFINE_bool(run_cleanup, false,
+            "Whether to run post-insertion deletion to reset the existing "
+            "table as before.");
 DEFINE_uint64(seq_start, 0,
               "Initial value for the generator in sequential mode. "
               "This is useful when running multiple times against already "
@@ -358,7 +363,7 @@ namespace {
 
 bool ValidatePartitionFlags() {
   int num_tablets = FLAGS_table_num_hash_partitions * FLAGS_table_num_range_partitions;
-  if (num_tablets <= 1) {
+  if (num_tablets < 1) {
     LOG(ERROR) << Substitute("Invalid partitioning: --table_num_hash_partitions=$0 "
                   "--table_num_range_partitions=$1, must specify more than one partition "
                   "for auto-generated tables", FLAGS_table_num_hash_partitions,
@@ -432,21 +437,31 @@ string Generator::Next() {
 // Utility function that determines the range of generated values each thread
 // should insert across if inserting in non-random mode. In random mode, this
 // is used to generate different RNG seeds per thread.
-int64_t SpanPerThread(int num_columns) {
-  CHECK_LT(0, num_columns);
+int64_t SpanPerThread(int num_key_columns) {
+  CHECK_LT(0, num_key_columns);
   CHECK_LT(0, FLAGS_num_threads);
   const auto per_thread_limit = numeric_limits<int64_t>::max() /
-      (num_columns * FLAGS_num_threads);
+      (num_key_columns * FLAGS_num_threads);
   return (FLAGS_num_rows_per_thread < 0 ||
           FLAGS_num_rows_per_thread > per_thread_limit)
       ? numeric_limits<int64_t>::max() / FLAGS_num_threads
-      : FLAGS_num_rows_per_thread * num_columns;
+      : FLAGS_num_rows_per_thread * num_key_columns;
 }
 
-Status GenerateRowData(Generator* gen, KuduPartialRow* row,
-                       const string& fixed_string) {
+Status GenerateRowData(Generator* key_gen, Generator* value_gen, KuduPartialRow* row,
+                       const string& fixed_string, KuduWriteOperation::Type op_type) {
   const vector<ColumnSchema>& columns(row->schema()->columns());
-  for (size_t idx = 0; idx < columns.size(); ++idx) {
+  DCHECK(op_type == KuduWriteOperation::Type::INSERT ||
+      op_type == KuduWriteOperation::Type::DELETE);
+  size_t gen_column_count = op_type == KuduWriteOperation::Type::INSERT ?
+      columns.size() : row->schema()->num_key_columns();
+  // Seperate key Generator and value Generator, so we can generate the same primary keys
+  // when perform DELETE operations.
+  Generator* gen = key_gen;
+  for (size_t idx = 0; idx < gen_column_count; ++idx) {
+    if (idx == row->schema()->num_key_columns()) {
+      gen = value_gen;
+    }
     const TypeInfo* tinfo = columns[idx].type_info();
     switch (tinfo->type()) {
       case BOOL:
@@ -478,7 +493,7 @@ Status GenerateRowData(Generator* gen, KuduPartialRow* row,
         break;
       case DECIMAL32:
         RETURN_NOT_OK(row->SetUnscaledDecimal(idx, std::min(gen->Next<int32_t>(),
-                                                              kMaxUnscaledDecimal32)));
+                                                            kMaxUnscaledDecimal32)));
         break;
       case DECIMAL64:
         RETURN_NOT_OK(row->SetUnscaledDecimal(idx, std::min(gen->Next<int64_t>(),
@@ -520,7 +535,8 @@ mutex cerr_lock;
 
 void GeneratorThread(
     const client::sp::shared_ptr<KuduClient>& client, const string& table_name,
-    size_t gen_idx, Status* status, uint64_t* row_count, uint64_t* err_count) {
+    size_t gen_idx, KuduWriteOperation::Type op_type,
+    Status* status, uint64_t* row_count, uint64_t* err_count) {
 
   const Generator::Mode gen_mode = FLAGS_use_random ? Generator::MODE_RAND
                                                     : Generator::MODE_SEQ;
@@ -549,14 +565,29 @@ void GeneratorThread(
 
     // Planning for non-intersecting ranges for different generator threads
     // in sequential generation mode.
-    const int64_t gen_span = SpanPerThread(table->schema().num_columns());
+    const int64_t gen_span = SpanPerThread(KuduSchema::ToSchema(table->schema()).num_key_columns());
     const int64_t gen_seed = gen_idx * gen_span + gen_seq_start;
-    Generator gen(gen_mode, gen_seed, FLAGS_string_len);
+    Generator key_gen(gen_mode, gen_seed, FLAGS_string_len);
+    Generator value_gen(gen_mode, gen_seed, FLAGS_string_len);
     for (; num_rows_per_gen < 0 || idx < num_rows_per_gen; ++idx) {
-      unique_ptr<KuduInsert> insert_op(table->NewInsert());
-      RETURN_NOT_OK(GenerateRowData(&gen, insert_op->mutable_row(),
-                                    FLAGS_string_fixed));
-      RETURN_NOT_OK(session->Apply(insert_op.release()));
+      switch (op_type) {
+        case KuduWriteOperation::Type::INSERT: {
+          unique_ptr<KuduInsert> insert_op(table->NewInsert());
+          RETURN_NOT_OK(GenerateRowData(&key_gen, &value_gen, insert_op->mutable_row(),
+                                        FLAGS_string_fixed, op_type));
+          RETURN_NOT_OK(session->Apply(insert_op.release()));
+          break;
+        }
+        case KuduWriteOperation::Type::DELETE: {
+          unique_ptr<KuduDelete> delete_op(table->NewDelete());
+          RETURN_NOT_OK(GenerateRowData(&key_gen, nullptr, delete_op->mutable_row(),
+                                        FLAGS_string_fixed, op_type));
+          RETURN_NOT_OK(session->Apply(delete_op.release()));
+          break;
+        }
+        default:
+          LOG(FATAL) << "Unknown op_type=" << op_type;
+      }
       if (flush_per_n_rows != 0 && idx != 0 && idx % flush_per_n_rows == 0) {
         session->FlushAsync(nullptr);
       }
@@ -588,35 +619,67 @@ void GeneratorThread(
   }
 }
 
-Status GenerateInsertRows(const client::sp::shared_ptr<KuduClient>& client,
-                          const string& table_name,
-                          uint64_t* total_row_count,
-                          uint64_t* total_err_count) {
+Status GenerateWriteRows(const client::sp::shared_ptr<KuduClient>& client,
+                         const string& table_name,
+                         KuduWriteOperation::Type op_type,
+                         uint64_t* num_rows_generated = nullptr) {
+  DCHECK(op_type == KuduWriteOperation::Type::INSERT ||
+      op_type == KuduWriteOperation::Type::DELETE);
 
   const size_t gen_num = FLAGS_num_threads;
-  vector<Status> status(gen_num);
+  vector<Status> statuses(gen_num);
   vector<uint64_t> row_count(gen_num, 0);
   vector<uint64_t> err_count(gen_num, 0);
   vector<thread> threads;
+  Stopwatch sw(Stopwatch::ALL_THREADS);
+  sw.start();
   for (size_t i = 0; i < gen_num; ++i) {
-    threads.emplace_back(&GeneratorThread, client, table_name, i,
-                         &status[i], &row_count[i], &err_count[i]);
+    threads.emplace_back(&GeneratorThread, client, table_name, i, op_type,
+                         &statuses[i], &row_count[i], &err_count[i]);
   }
   for (auto& t : threads) {
     t.join();
   }
-  if (total_row_count != nullptr) {
-    *total_row_count = accumulate(row_count.begin(), row_count.end(), 0UL);
-  }
-  if (total_err_count != nullptr) {
-    *total_err_count = accumulate(err_count.begin(), err_count.end(), 0UL);
+  sw.stop();
+  const double time_total_ms = sw.elapsed().wall_millis();
+  uint64_t total_row_count = accumulate(row_count.begin(), row_count.end(), 0UL);
+  uint64_t total_err_count = accumulate(err_count.begin(), err_count.end(), 0UL);
+  cout << endl
+      << (op_type == KuduWriteOperation::Type::INSERT ? "INSERT" : "DELETE") << " report" << endl
+      << "    rows total: " << total_row_count << endl
+      << "    time total: " << time_total_ms << " ms" << endl;
+  if (total_row_count != 0 && total_err_count == 0) {
+    // Report per-row timings only if there were no write errors, otherwise the
+    // readings do not make much sense.
+    cout << "  time per row: " << time_total_ms / total_row_count << " ms" << endl;
   }
-  // Return first non-OK error status, if any, as a result.
-  const auto it = find_if(status.begin(), status.end(),
+
+  // Make first non-OK error status, if any, as a result.
+  Status status;
+  const auto it = find_if(statuses.begin(), statuses.end(),
                           [&](const Status& s) { return !s.ok(); });
-  if (it != status.end()) {
-    return *it;
+  if (it != statuses.end()) {
+    status = *it;
+  }
+  if (!status.ok() || total_err_count != 0) {
+    string err_str;
+    if (!status.ok()) {
+      SubstituteAndAppend(&err_str, status.ToString());
+    }
+    if (total_err_count != 0) {
+      if (!status.ok()) {
+        SubstituteAndAppend(&err_str, "; ");
+      }
+      SubstituteAndAppend(&err_str, "Encountered $0 write operation errors",
+                          total_err_count);
+    }
+    return Status::RuntimeError(err_str);
   }
+
+  if (num_rows_generated) {
+    *num_rows_generated = total_row_count;
+  }
+
   return Status::OK();
 }
 
@@ -663,12 +726,12 @@ Status TestLoadGenerator(const RunnerContext& context) {
     if (FLAGS_table_num_replicas > 0) {
       table_creator->num_replicas(FLAGS_table_num_replicas);
     }
-    if (FLAGS_table_num_range_partitions > 1) {
+    if (FLAGS_table_num_range_partitions >= 1) {
       // Split the generated span for a sequential workload evenly across all
       // tablets. In case we're inserting in random mode, use unbounded range
       // partitioning, so the table has key coverage of the entire keyspace.
       const int64_t total_inserted_span =
-          SpanPerThread(schema.num_columns()) * FLAGS_num_threads;
+          SpanPerThread(KuduSchema::ToSchema(schema).num_key_columns()) * FLAGS_num_threads;
       const int64_t span_per_range =
           total_inserted_span / FLAGS_table_num_range_partitions;
       table_creator->set_range_partition_columns({ kKeyColumnName });
@@ -689,36 +752,8 @@ Status TestLoadGenerator(const RunnerContext& context) {
        << "table '" << table_name << "'" << endl;
 
   uint64_t num_rows_generated = 0;
-  uint64_t num_rows_write_error = 0;
-  Stopwatch sw(Stopwatch::ALL_THREADS);
-  sw.start();
-  Status status = GenerateInsertRows(client, table_name,
-                                     &num_rows_generated,
-                                     &num_rows_write_error);
-  sw.stop();
-  const double time_total_ms = sw.elapsed().wall_millis();
-  cout << endl << "Generator report" << endl
-       << "  time total  : " << time_total_ms << " ms" << endl;
-  if (num_rows_generated != 0 && num_rows_write_error == 0) {
-    // Report per-row timings only if there were no write errors, otherwise the
-    // readings do not make much sense.
-    cout << "  time per row: " << time_total_ms / num_rows_generated << " ms"
-         << endl;
-  }
-  if (!status.ok() || num_rows_write_error != 0) {
-    string err_str;
-    if (!status.ok()) {
-      SubstituteAndAppend(&err_str, status.ToString());
-    }
-    if (num_rows_write_error != 0) {
-      if (!status.ok()) {
-        SubstituteAndAppend(&err_str,  "; ");
-      }
-      SubstituteAndAppend(&err_str, "Encountered $0 write operation errors",
-                          num_rows_write_error);
-    }
-    return Status::RuntimeError(err_str);
-  }
+  RETURN_NOT_OK(GenerateWriteRows(client, table_name, KuduWriteOperation::Type::INSERT,
+                                  &num_rows_generated));
 
   if (FLAGS_run_scan) {
     // In case if no write errors encountered, run a table scan to make sure
@@ -735,6 +770,10 @@ Status TestLoadGenerator(const RunnerContext& context) {
     }
   }
 
+  if (FLAGS_run_cleanup) {
+    RETURN_NOT_OK(GenerateWriteRows(client, table_name, KuduWriteOperation::Type::DELETE));
+  }
+
   if (is_auto_table && !FLAGS_keep_auto_table) {
     cout << "Dropping auto-created table '" << table_name << "'" << endl;
     // Drop the table which was automatically created to run the test.
@@ -842,6 +881,7 @@ unique_ptr<Mode> BuildPerfMode() {
       .AddOptionalParameter("keep_auto_table")
       .AddOptionalParameter("num_rows_per_thread")
       .AddOptionalParameter("num_threads")
+      .AddOptionalParameter("run_cleanup")
       .AddOptionalParameter("run_scan")
       .AddOptionalParameter("seq_start")
       .AddOptionalParameter("show_first_n_errors")


Mime
View raw message