kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ha...@apache.org
Subject kudu git commit: KUDU-1704: add c++ client support for READ_YOUR_WRITES mode
Date Sun, 04 Mar 2018 20:29:16 GMT
Repository: kudu
Updated Branches:
  refs/heads/master 726514abf -> 5047f091d


KUDU-1704: add c++ client support for READ_YOUR_WRITES mode

Change-Id: I34214245a78aed172a28fbdb395ff5bccd0fc0e1
Reviewed-on: http://gerrit.cloudera.org:8080/8823
Reviewed-by: David Ribeiro Alves <davidralves@gmail.com>
Tested-by: Hao Hao <hao.hao@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/5047f091
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/5047f091
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/5047f091

Branch: refs/heads/master
Commit: 5047f091d312698b5e1d2d9db75fbccfde24f687
Parents: 726514a
Author: hahao <hao.hao@cloudera.com>
Authored: Tue Dec 12 16:35:49 2017 -0800
Committer: Hao Hao <hao.hao@cloudera.com>
Committed: Sun Mar 4 20:24:43 2018 +0000

----------------------------------------------------------------------
 src/kudu/client/client-test.cc                  | 161 ++++++++++++-------
 src/kudu/client/client.cc                       |  16 +-
 src/kudu/client/client.h                        |  14 +-
 src/kudu/client/scan_configuration.cc           |   5 +
 src/kudu/client/scan_configuration.h            |  19 +++
 src/kudu/client/scan_token-internal.cc          |  25 ++-
 src/kudu/client/scan_token-test.cc              |  78 ++++++---
 src/kudu/client/scanner-internal.cc             |  36 ++++-
 src/kudu/integration-tests/consistency-itest.cc | 143 +++++++++++++++-
 .../integration-tests/delete_table-itest.cc     |   5 +-
 10 files changed, 406 insertions(+), 96 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/5047f091/src/kudu/client/client-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/client-test.cc b/src/kudu/client/client-test.cc
index 4a44761..531b5b1 100644
--- a/src/kudu/client/client-test.cc
+++ b/src/kudu/client/client-test.cc
@@ -537,14 +537,17 @@ class ClientTest : public KuduTest {
   }
 
   int CountRowsFromClient(KuduTable* table) {
-    return CountRowsFromClient(table, kNoBound, kNoBound);
+    return CountRowsFromClient(table, KuduScanner::READ_LATEST, kNoBound, kNoBound);
   }
 
-  int CountRowsFromClient(KuduTable* table, int32_t lower_bound, int32_t upper_bound) {
-    return CountRowsFromClient(table, KuduClient::LEADER_ONLY, lower_bound, upper_bound);
+  int CountRowsFromClient(KuduTable* table, KuduScanner::ReadMode scan_mode,
+                          int32_t lower_bound, int32_t upper_bound) {
+    return CountRowsFromClient(table, KuduClient::LEADER_ONLY, scan_mode,
+                               lower_bound, upper_bound);
   }
 
   int CountRowsFromClient(KuduTable* table, KuduClient::ReplicaSelection selection,
+                          KuduScanner::ReadMode scan_mode,
                           int32_t lower_bound, int32_t upper_bound) {
     KuduScanner scanner(table);
     CHECK_OK(scanner.SetSelection(selection));
@@ -559,7 +562,7 @@ class ClientTest : public KuduTest {
                    client_table_->NewComparisonPredicate("key", KuduPredicate::LESS_EQUAL,
                                                          KuduValue::FromInt(upper_bound))));
     }
-
+    CHECK_OK(scanner.SetReadMode(scan_mode));
     CHECK_OK(scanner.Open());
 
     int count = 0;
@@ -865,7 +868,19 @@ TEST_F(ClientTest, TestScanAtFutureTimestamp) {
   ASSERT_STR_CONTAINS(s.ToString(), "in the future.");
 }
 
-TEST_F(ClientTest, TestScanMultiTablet) {
+const KuduScanner::ReadMode read_modes[] = {
+    KuduScanner::READ_LATEST,
+    KuduScanner::READ_AT_SNAPSHOT,
+    KuduScanner::READ_YOUR_WRITES,
+};
+
+class ScanMultiTabletParamTest :
+    public ClientTest,
+    public ::testing::WithParamInterface<KuduScanner::ReadMode> {
+};
+// Tests multiple tablet scan with all scan modes.
+TEST_P(ScanMultiTabletParamTest, Test) {
+  const KuduScanner::ReadMode read_mode = GetParam();
   // 5 tablets, each with 10 rows worth of space.
   static const int kTabletsNum = 5;
   static const int kRowsPerTablet = 10;
@@ -900,18 +915,17 @@ TEST_F(ClientTest, TestScanMultiTablet) {
   }
   FlushSessionOrDie(session);
 
-  // Run through various scans.
   ASSERT_EQ(4 * (kTabletsNum - 1),
-            CountRowsFromClient(table.get(), kNoBound, kNoBound));
-  ASSERT_EQ(3, CountRowsFromClient(table.get(), kNoBound, 15));
-  ASSERT_EQ(9, CountRowsFromClient(table.get(), 27, kNoBound));
-  ASSERT_EQ(3, CountRowsFromClient(table.get(), 0, 15));
-  ASSERT_EQ(0, CountRowsFromClient(table.get(), 0, 10));
-  ASSERT_EQ(4, CountRowsFromClient(table.get(), 0, 20));
-  ASSERT_EQ(8, CountRowsFromClient(table.get(), 0, 30));
-  ASSERT_EQ(6, CountRowsFromClient(table.get(), 14, 30));
-  ASSERT_EQ(0, CountRowsFromClient(table.get(), 30, 30));
-  ASSERT_EQ(0, CountRowsFromClient(table.get(), kTabletsNum * kRowsPerTablet,
+            CountRowsFromClient(table.get(), read_mode, kNoBound, kNoBound));
+  ASSERT_EQ(3, CountRowsFromClient(table.get(), read_mode, kNoBound, 15));
+  ASSERT_EQ(9, CountRowsFromClient(table.get(), read_mode, 27, kNoBound));
+  ASSERT_EQ(3, CountRowsFromClient(table.get(), read_mode, 0, 15));
+  ASSERT_EQ(0, CountRowsFromClient(table.get(), read_mode, 0, 10));
+  ASSERT_EQ(4, CountRowsFromClient(table.get(), read_mode, 0, 20));
+  ASSERT_EQ(8, CountRowsFromClient(table.get(), read_mode, 0, 30));
+  ASSERT_EQ(6, CountRowsFromClient(table.get(), read_mode, 14, 30));
+  ASSERT_EQ(0, CountRowsFromClient(table.get(), read_mode, 30, 30));
+  ASSERT_EQ(0, CountRowsFromClient(table.get(), read_mode, kTabletsNum * kRowsPerTablet,
                                    kNoBound));
 
   // Update every other row
@@ -926,16 +940,16 @@ TEST_F(ClientTest, TestScanMultiTablet) {
 
   // Check all counts the same (make sure updates don't change # of rows)
   ASSERT_EQ(4 * (kTabletsNum - 1),
-            CountRowsFromClient(table.get(), kNoBound, kNoBound));
-  ASSERT_EQ(3, CountRowsFromClient(table.get(), kNoBound, 15));
-  ASSERT_EQ(9, CountRowsFromClient(table.get(), 27, kNoBound));
-  ASSERT_EQ(3, CountRowsFromClient(table.get(), 0, 15));
-  ASSERT_EQ(0, CountRowsFromClient(table.get(), 0, 10));
-  ASSERT_EQ(4, CountRowsFromClient(table.get(), 0, 20));
-  ASSERT_EQ(8, CountRowsFromClient(table.get(), 0, 30));
-  ASSERT_EQ(6, CountRowsFromClient(table.get(), 14, 30));
-  ASSERT_EQ(0, CountRowsFromClient(table.get(), 30, 30));
-  ASSERT_EQ(0, CountRowsFromClient(table.get(), kTabletsNum * kRowsPerTablet,
+            CountRowsFromClient(table.get(), read_mode, kNoBound, kNoBound));
+  ASSERT_EQ(3, CountRowsFromClient(table.get(), read_mode, kNoBound, 15));
+  ASSERT_EQ(9, CountRowsFromClient(table.get(), read_mode, 27, kNoBound));
+  ASSERT_EQ(3, CountRowsFromClient(table.get(), read_mode, 0, 15));
+  ASSERT_EQ(0, CountRowsFromClient(table.get(), read_mode, 0, 10));
+  ASSERT_EQ(4, CountRowsFromClient(table.get(), read_mode, 0, 20));
+  ASSERT_EQ(8, CountRowsFromClient(table.get(), read_mode, 0, 30));
+  ASSERT_EQ(6, CountRowsFromClient(table.get(), read_mode, 14, 30));
+  ASSERT_EQ(0, CountRowsFromClient(table.get(), read_mode, 30, 30));
+  ASSERT_EQ(0, CountRowsFromClient(table.get(), read_mode, kTabletsNum * kRowsPerTablet,
                                    kNoBound));
 
   // Delete half the rows
@@ -950,16 +964,16 @@ TEST_F(ClientTest, TestScanMultiTablet) {
 
   // Check counts changed accordingly
   ASSERT_EQ(2 * (kTabletsNum - 1),
-            CountRowsFromClient(table.get(), kNoBound, kNoBound));
-  ASSERT_EQ(2, CountRowsFromClient(table.get(), kNoBound, 15));
-  ASSERT_EQ(4, CountRowsFromClient(table.get(), 27, kNoBound));
-  ASSERT_EQ(2, CountRowsFromClient(table.get(), 0, 15));
-  ASSERT_EQ(0, CountRowsFromClient(table.get(), 0, 10));
-  ASSERT_EQ(2, CountRowsFromClient(table.get(), 0, 20));
-  ASSERT_EQ(4, CountRowsFromClient(table.get(), 0, 30));
-  ASSERT_EQ(2, CountRowsFromClient(table.get(), 14, 30));
-  ASSERT_EQ(0, CountRowsFromClient(table.get(), 30, 30));
-  ASSERT_EQ(0, CountRowsFromClient(table.get(), kTabletsNum * kRowsPerTablet,
+            CountRowsFromClient(table.get(), read_mode, kNoBound, kNoBound));
+  ASSERT_EQ(2, CountRowsFromClient(table.get(), read_mode, kNoBound, 15));
+  ASSERT_EQ(4, CountRowsFromClient(table.get(), read_mode, 27, kNoBound));
+  ASSERT_EQ(2, CountRowsFromClient(table.get(), read_mode, 0, 15));
+  ASSERT_EQ(0, CountRowsFromClient(table.get(), read_mode, 0, 10));
+  ASSERT_EQ(2, CountRowsFromClient(table.get(), read_mode, 0, 20));
+  ASSERT_EQ(4, CountRowsFromClient(table.get(), read_mode, 0, 30));
+  ASSERT_EQ(2, CountRowsFromClient(table.get(), read_mode, 14, 30));
+  ASSERT_EQ(0, CountRowsFromClient(table.get(), read_mode, 30, 30));
+  ASSERT_EQ(0, CountRowsFromClient(table.get(), read_mode, kTabletsNum * kRowsPerTablet,
                                    kNoBound));
 
   // Delete rest of rows
@@ -973,18 +987,20 @@ TEST_F(ClientTest, TestScanMultiTablet) {
   FlushSessionOrDie(session);
 
   // Check counts changed accordingly
-  ASSERT_EQ(0, CountRowsFromClient(table.get(), kNoBound, kNoBound));
-  ASSERT_EQ(0, CountRowsFromClient(table.get(), kNoBound, 15));
-  ASSERT_EQ(0, CountRowsFromClient(table.get(), 27, kNoBound));
-  ASSERT_EQ(0, CountRowsFromClient(table.get(), 0, 15));
-  ASSERT_EQ(0, CountRowsFromClient(table.get(), 0, 10));
-  ASSERT_EQ(0, CountRowsFromClient(table.get(), 0, 20));
-  ASSERT_EQ(0, CountRowsFromClient(table.get(), 0, 30));
-  ASSERT_EQ(0, CountRowsFromClient(table.get(), 14, 30));
-  ASSERT_EQ(0, CountRowsFromClient(table.get(), 30, 30));
-  ASSERT_EQ(0, CountRowsFromClient(table.get(), kTabletsNum * kRowsPerTablet,
+  ASSERT_EQ(0, CountRowsFromClient(table.get(), read_mode, kNoBound, kNoBound));
+  ASSERT_EQ(0, CountRowsFromClient(table.get(), read_mode, kNoBound, 15));
+  ASSERT_EQ(0, CountRowsFromClient(table.get(), read_mode, 27, kNoBound));
+  ASSERT_EQ(0, CountRowsFromClient(table.get(), read_mode, 0, 15));
+  ASSERT_EQ(0, CountRowsFromClient(table.get(), read_mode, 0, 10));
+  ASSERT_EQ(0, CountRowsFromClient(table.get(), read_mode, 0, 20));
+  ASSERT_EQ(0, CountRowsFromClient(table.get(), read_mode, 0, 30));
+  ASSERT_EQ(0, CountRowsFromClient(table.get(), read_mode, 14, 30));
+  ASSERT_EQ(0, CountRowsFromClient(table.get(), read_mode, 30, 30));
+  ASSERT_EQ(0, CountRowsFromClient(table.get(), read_mode, kTabletsNum * kRowsPerTablet,
                                    kNoBound));
 }
+INSTANTIATE_TEST_CASE_P(Params, ScanMultiTabletParamTest,
+                        testing::ValuesIn(read_modes));
 
 TEST_F(ClientTest, TestScanEmptyTable) {
   KuduScanner scanner(client_table_.get());
@@ -1191,6 +1207,26 @@ TEST_F(ClientTest, TestRowPtrNoRedaction) {
   }
 }
 
+TEST_F(ClientTest, TestScanYourWrites) {
+  // Insert the rows
+  ASSERT_NO_FATAL_FAILURE(InsertTestRows(client_table_.get(),
+                                         FLAGS_test_scan_num_rows));
+
+  // Verify that no matter which replica is selected, client could
+  // achieve read-your-writes/read-your-reads.
+  uint64_t count = CountRowsFromClient(client_table_.get(),
+                                       KuduClient::LEADER_ONLY,
+                                       KuduScanner::READ_YOUR_WRITES,
+                                       kNoBound, kNoBound);
+  ASSERT_EQ(FLAGS_test_scan_num_rows, count);
+
+  count = CountRowsFromClient(client_table_.get(),
+                              KuduClient::CLOSEST_REPLICA,
+                              KuduScanner::READ_YOUR_WRITES,
+                              kNoBound, kNoBound);
+  ASSERT_EQ(FLAGS_test_scan_num_rows, count);
+}
+
 namespace internal {
 
 static void ReadBatchToStrings(KuduScanner* scanner, vector<string>* rows) {
@@ -3816,6 +3852,7 @@ TEST_F(ClientTest, TestReplicatedMultiTabletTableFailover) {
     tries++;
     int num_rows = CountRowsFromClient(table.get(),
                                        KuduClient::LEADER_ONLY,
+                                       KuduScanner::READ_LATEST,
                                        kNoBound, kNoBound);
     int master_rpcs = CountMasterLookupRPCs() - master_rpcs_before;
 
@@ -3891,6 +3928,7 @@ TEST_F(ClientTest, TestReplicatedTabletWritesWithLeaderElection) {
   LOG(INFO) << "Counting rows...";
   ASSERT_EQ(2 * kNumRowsToWrite, CountRowsFromClient(table.get(),
                                                      KuduClient::FIRST_REPLICA,
+                                                     KuduScanner::READ_LATEST,
                                                      kNoBound, kNoBound));
 }
 
@@ -4477,9 +4515,14 @@ TEST_F(ClientTest, TestInsertEmptyPK) {
   ASSERT_EQ("<none>", ReadRowAsString());
 }
 
+class LatestObservedTimestampParamTest :
+    public ClientTest,
+    public ::testing::WithParamInterface<KuduScanner::ReadMode> {
+};
 // Check the behavior of the latest observed timestamp when performing
 // write and read operations.
-TEST_F(ClientTest, TestLatestObservedTimestamp) {
+TEST_P(LatestObservedTimestampParamTest, Test) {
+  const KuduScanner::ReadMode read_mode = GetParam();
   // Check that a write updates the latest observed timestamp.
   const uint64_t ts0 = client_->GetLatestObservedTimestamp();
   ASSERT_EQ(KuduClient::kNoTimestamp, ts0);
@@ -4500,26 +4543,28 @@ TEST_F(ClientTest, TestLatestObservedTimestamp) {
     if (c != client_) {
       // Check that the new client has no latest observed timestamp.
       ASSERT_EQ(KuduClient::kNoTimestamp, c->GetLatestObservedTimestamp());
+      // The observed timestamp may not move forward when another
+      // client scans in READ_YOUR_WRITES mode, because that client
+      // does not have the same propagation timestamp bound, and the
+      // chosen snapshot timestamp is returned as the new propagation
+      // timestamp.
+      if (read_mode == KuduScanner::READ_YOUR_WRITES) break;
     }
     shared_ptr<KuduTable> table;
     ASSERT_OK(c->OpenTable(client_table_->name(), &table));
-    static const KuduScanner::ReadMode kReadModes[] = {
-      KuduScanner::READ_AT_SNAPSHOT,
-      KuduScanner::READ_LATEST,
-    };
-    for (auto read_mode : kReadModes) {
-      KuduScanner scanner(table.get());
-      ASSERT_OK(scanner.SetReadMode(read_mode));
-      if (read_mode == KuduScanner::READ_AT_SNAPSHOT) {
-        ASSERT_OK(scanner.SetSnapshotRaw(ts1));
-      }
-      ASSERT_OK(scanner.Open());
+    KuduScanner scanner(table.get());
+    ASSERT_OK(scanner.SetReadMode(read_mode));
+    if (read_mode == KuduScanner::READ_AT_SNAPSHOT) {
+      ASSERT_OK(scanner.SetSnapshotRaw(ts1));
     }
+    ASSERT_OK(scanner.Open());
     const uint64_t ts = c->GetLatestObservedTimestamp();
     ASSERT_LT(latest_ts, ts);
     latest_ts = ts;
   }
 }
+INSTANTIATE_TEST_CASE_P(Params, LatestObservedTimestampParamTest,
+                        testing::ValuesIn(read_modes));
 
 // Insert bunch of rows, delete a row, and then insert the row back.
 // Run scans several scan and check the results are consistent with the

http://git-wip-us.apache.org/repos/asf/kudu/blob/5047f091/src/kudu/client/client.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/client.cc b/src/kudu/client/client.cc
index 93431b7..e067c04 100644
--- a/src/kudu/client/client.cc
+++ b/src/kudu/client/client.cc
@@ -136,7 +136,7 @@ MAKE_ENUM_LIMITS(kudu::client::KuduSession::ExternalConsistencyMode,
 
 MAKE_ENUM_LIMITS(kudu::client::KuduScanner::ReadMode,
                  kudu::client::KuduScanner::READ_LATEST,
-                 kudu::client::KuduScanner::READ_AT_SNAPSHOT);
+                 kudu::client::KuduScanner::READ_YOUR_WRITES);
 
 MAKE_ENUM_LIMITS(kudu::client::KuduScanner::OrderMode,
                  kudu::client::KuduScanner::UNORDERED,
@@ -1365,6 +1365,20 @@ Status KuduScanner::Open() {
     return Status::OK();
   }
 
+  // For READ_YOUR_WRITES scan mode, get the latest observed timestamp and store it
+  // in the scan config. Always use it as the propagation timestamp for the duration
+  // of the scan to avoid unnecessary waiting.
+  if (data_->configuration().read_mode() == READ_YOUR_WRITES) {
+    const uint64_t lo_ts = data_->table_->client()->data_->GetLatestObservedTimestamp();
+    data_->mutable_configuration()->SetScanLowerBoundTimestampRaw(lo_ts);
+  }
+
+  if (data_->configuration().read_mode() != READ_AT_SNAPSHOT &&
+      data_->configuration().has_snapshot_timestamp()) {
+    return Status::InvalidArgument("Snapshot timestamp should only be configured "
+                                   "for READ_AT_SNAPSHOT scan mode.");
+  }
+
   VLOG(2) << "Beginning " << data_->DebugString();
 
   MonoTime deadline = MonoTime::Now() + data_->configuration().timeout();

http://git-wip-us.apache.org/repos/asf/kudu/blob/5047f091/src/kudu/client/client.h
----------------------------------------------------------------------
diff --git a/src/kudu/client/client.h b/src/kudu/client/client.h
index a9d09cf..dadf8dc 100644
--- a/src/kudu/client/client.h
+++ b/src/kudu/client/client.h
@@ -1735,7 +1735,19 @@ class KUDU_EXPORT KuduScanner {
     ///   by which writes are sometimes not externally consistent even when
     ///   action was taken to make them so. In these cases Isolation may
     ///   degenerate to mode "Read Committed". See KUDU-430.
-    READ_AT_SNAPSHOT
+    READ_AT_SNAPSHOT,
+
+    /// When @c READ_YOUR_WRITES is specified, the client will perform a read
+    /// such that it follows all previously known writes and reads from this client.
+    /// Specifically this mode:
+    ///  (1) ensures read-your-writes and read-your-reads session guarantees,
+    ///  (2) minimizes latency caused by waiting for outstanding write
+    ///      transactions to complete.
+    ///
+    /// Reads in this mode are not repeatable: two READ_YOUR_WRITES reads, even if
+    /// they provide the same propagated timestamp bound, can execute at different
+    /// timestamps and thus return different results.
+    READ_YOUR_WRITES
   };
 
   /// Whether the rows should be returned in order.

http://git-wip-us.apache.org/repos/asf/kudu/blob/5047f091/src/kudu/client/scan_configuration.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/scan_configuration.cc b/src/kudu/client/scan_configuration.cc
index fe751be..addb2e0 100644
--- a/src/kudu/client/scan_configuration.cc
+++ b/src/kudu/client/scan_configuration.cc
@@ -52,6 +52,7 @@ ScanConfiguration::ScanConfiguration(KuduTable* table)
       read_mode_(KuduScanner::READ_LATEST),
       is_fault_tolerant_(false),
       snapshot_timestamp_(kNoTimestamp),
+      lower_bound_propagation_timestamp_(kNoTimestamp),
       timeout_(MonoDelta::FromMilliseconds(KuduScanner::kScanTimeoutMillis)),
       arena_(256),
       row_format_flags_(KuduScanner::NO_FLAGS) {
@@ -180,6 +181,10 @@ void ScanConfiguration::SetSnapshotRaw(uint64_t snapshot_timestamp) {
   snapshot_timestamp_ = snapshot_timestamp;
 }
 
+void ScanConfiguration::SetScanLowerBoundTimestampRaw(uint64_t propagation_timestamp) {
+  lower_bound_propagation_timestamp_ = propagation_timestamp;
+}
+
 void ScanConfiguration::SetTimeoutMillis(int millis) {
   timeout_ = MonoDelta::FromMilliseconds(millis);
 }

http://git-wip-us.apache.org/repos/asf/kudu/blob/5047f091/src/kudu/client/scan_configuration.h
----------------------------------------------------------------------
diff --git a/src/kudu/client/scan_configuration.h b/src/kudu/client/scan_configuration.h
index 26c1bcc..4a1e5e1 100644
--- a/src/kudu/client/scan_configuration.h
+++ b/src/kudu/client/scan_configuration.h
@@ -82,10 +82,18 @@ class ScanConfiguration {
 
   Status SetFaultTolerant(bool fault_tolerant) WARN_UNUSED_RESULT;
 
+  // Sets the timestamp the scan must be executed at, in microseconds
+  // since the Unix epoch. Requires READ_AT_SNAPSHOT scan mode.
   void SetSnapshotMicros(uint64_t snapshot_timestamp_micros);
 
+  // Sets a previously encoded timestamp as a snapshot timestamp.
+  // Requires READ_AT_SNAPSHOT scan mode.
   void SetSnapshotRaw(uint64_t snapshot_timestamp);
 
+  // Sets the lower bound of the scan's propagation timestamp.
+  // It is only used in READ_YOUR_WRITES scan mode.
+  void SetScanLowerBoundTimestampRaw(uint64_t propagation_timestamp);
+
   void SetTimeoutMillis(int millis);
 
   Status SetRowFormatFlags(uint64_t flags);
@@ -144,6 +152,15 @@ class ScanConfiguration {
     return snapshot_timestamp_;
   }
 
+  bool has_lower_bound_propagation_timestamp() const {
+    return lower_bound_propagation_timestamp_ != kNoTimestamp;
+  }
+
+  uint64_t lower_bound_propagation_timestamp() const {
+    CHECK(has_lower_bound_propagation_timestamp());
+    return lower_bound_propagation_timestamp_;
+  }
+
   const MonoDelta& timeout() const {
     return timeout_;
   }
@@ -184,6 +201,8 @@ class ScanConfiguration {
 
   uint64_t snapshot_timestamp_;
 
+  uint64_t lower_bound_propagation_timestamp_;
+
   MonoDelta timeout_;
 
   // Manages interior allocations for the scan spec and copied bounds.

http://git-wip-us.apache.org/repos/asf/kudu/blob/5047f091/src/kudu/client/scan_token-internal.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/scan_token-internal.cc b/src/kudu/client/scan_token-internal.cc
index de36be6..beb43ad 100644
--- a/src/kudu/client/scan_token-internal.cc
+++ b/src/kudu/client/scan_token-internal.cc
@@ -19,7 +19,6 @@
 
 #include <cstdint>
 #include <memory>
-#include <ostream>
 #include <string>
 #include <utility>
 #include <vector>
@@ -164,6 +163,9 @@ Status KuduScanToken::Data::PBIntoScanner(KuduClient* client,
       case ReadMode::READ_AT_SNAPSHOT:
         RETURN_NOT_OK(scan_builder->SetReadMode(KuduScanner::READ_AT_SNAPSHOT));
         break;
+      case ReadMode::READ_YOUR_WRITES:
+        RETURN_NOT_OK(scan_builder->SetReadMode(KuduScanner::READ_YOUR_WRITES));
+        break;
       default:
         return Status::InvalidArgument("scan token has unrecognized read mode");
     }
@@ -179,6 +181,16 @@ Status KuduScanToken::Data::PBIntoScanner(KuduClient* client,
 
   RETURN_NOT_OK(scan_builder->SetCacheBlocks(message.cache_blocks()));
 
+  // Since the latest observed timestamp from the given client might be
+  // more recent than the one at the time the token was created, the
+  // performance of the scan could be affected in READ_YOUR_WRITES mode.
+  //
+  // We choose to keep it this way because the code path is simpler.
+  // Besides, in practice it is rarely the case that a client is
+  // continuously writing to and reading from a table (using scan tokens).
+  //
+  // However, it is worth noting that this is a possible optimization if
+  // we ever notice READ_YOUR_WRITES reads stalling with scan tokens.
   if (message.has_propagated_timestamp()) {
     client->data_->UpdateLatestObservedTimestamp(message.propagated_timestamp());
   }
@@ -234,8 +246,8 @@ Status KuduScanTokenBuilder::Data::Build(vector<KuduScanToken*>* tokens) {
     case KuduScanner::READ_LATEST:
       pb.set_read_mode(kudu::READ_LATEST);
       if (configuration_.has_snapshot_timestamp()) {
-        LOG(WARNING) << "Ignoring snapshot timestamp since not in "
-                        "READ_AT_SNAPSHOT mode.";
+        return Status::InvalidArgument("Snapshot timestamp should only be configured "
+                                       "for READ_AT_SNAPSHOT scan mode.");
       }
       break;
     case KuduScanner::READ_AT_SNAPSHOT:
@@ -244,6 +256,13 @@ Status KuduScanTokenBuilder::Data::Build(vector<KuduScanToken*>* tokens) {
         pb.set_snap_timestamp(configuration_.snapshot_timestamp());
       }
       break;
+    case KuduScanner::READ_YOUR_WRITES:
+      pb.set_read_mode(kudu::READ_YOUR_WRITES);
+      if (configuration_.has_snapshot_timestamp()) {
+        return Status::InvalidArgument("Snapshot timestamp should only be configured "
+                                       "for READ_AT_SNAPSHOT scan mode.");
+      }
+      break;
     default:
       LOG(FATAL) << Substitute("$0: unexpected read mode", read_mode);
   }

http://git-wip-us.apache.org/repos/asf/kudu/blob/5047f091/src/kudu/client/scan_token-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/scan_token-test.cc b/src/kudu/client/scan_token-test.cc
index 80d1179..bc6bc07 100644
--- a/src/kudu/client/scan_token-test.cc
+++ b/src/kudu/client/scan_token-test.cc
@@ -194,6 +194,28 @@ TEST_F(ScanTokenTest, TestScanTokens) {
     ASSERT_EQ(0, scanner->data_->last_response_.data().num_rows());
   }
 
+  { // no predicates with READ_YOUR_WRITES mode
+    vector<KuduScanToken*> tokens;
+    ElementDeleter deleter(&tokens);
+    KuduScanTokenBuilder builder(table.get());
+    ASSERT_OK(builder.SetReadMode(KuduScanner::READ_YOUR_WRITES));
+    ASSERT_OK(builder.Build(&tokens));
+
+    ASSERT_EQ(8, tokens.size());
+    ASSERT_EQ(200, CountRows(tokens));
+    NO_FATALS(VerifyTabletInfo(tokens));
+  }
+
+  { // Setting a snapshot timestamp with READ_YOUR_WRITES mode
+    // gives an InvalidArgument error.
+    vector<KuduScanToken*> tokens;
+    ElementDeleter deleter(&tokens);
+    KuduScanTokenBuilder builder(table.get());
+    ASSERT_OK(builder.SetReadMode(KuduScanner::READ_YOUR_WRITES));
+    ASSERT_OK(builder.SetSnapshotMicros(1));
+    ASSERT_TRUE(builder.Build(&tokens).IsInvalidArgument());
+  }
+
   { // no predicates
     vector<KuduScanToken*> tokens;
     ElementDeleter deleter(&tokens);
@@ -414,10 +436,21 @@ TEST_F(ScanTokenTest, TestScanTokensWithNonCoveringRange) {
   }
 }
 
+const kudu::ReadMode read_modes[] = {
+    kudu::READ_LATEST,
+    kudu::READ_AT_SNAPSHOT,
+    kudu::READ_YOUR_WRITES,
+};
+
+class TimestampPropagationParamTest :
+    public ScanTokenTest,
+    public ::testing::WithParamInterface<kudu::ReadMode> {
+};
 // When building a scanner from a serialized scan token,
 // verify that the propagated timestamp from the token makes its way into the
 // latest observed timestamp of the client object.
-TEST_F(ScanTokenTest, TestTimestampPropagation) {
+TEST_P(TimestampPropagationParamTest, Test) {
+  const kudu::ReadMode read_mode = GetParam();
   static const string kTableName = "p_ts_table";
 
   // Create a table to work with:
@@ -448,27 +481,25 @@ TEST_F(ScanTokenTest, TestTimestampPropagation) {
   }
 
   // Deserialize a scan token and make sure the client's last observed timestamp
-  // is updated accordingly.
-  {
-    const uint64_t ts_prev = client_->GetLatestObservedTimestamp();
-    const uint64_t ts_propagated = ts_prev + 1000000;
-
-    ScanTokenPB pb;
-    pb.set_table_name(kTableName);
-    pb.set_read_mode(::kudu::READ_AT_SNAPSHOT);
-    pb.set_propagated_timestamp(ts_propagated);
-    const string serialized_token = pb.SerializeAsString();
-    EXPECT_EQ(ts_prev, client_->GetLatestObservedTimestamp());
-
-    KuduScanner* scanner_raw;
-    ASSERT_OK(KuduScanToken::DeserializeIntoScanner(client_.get(),
-                                                    serialized_token,
-                                                    &scanner_raw));
-    // The caller of the DeserializeIntoScanner() is responsible for
-    // de-allocating the result scanner object.
-    unique_ptr<KuduScanner> scanner(scanner_raw);
-    EXPECT_EQ(ts_propagated, client_->GetLatestObservedTimestamp());
-  }
+  // is always updated accordingly, regardless of the read mode.
+  const uint64_t ts_prev = client_->GetLatestObservedTimestamp();
+  const uint64_t ts_propagated = ts_prev + 1000000;
+
+  ScanTokenPB pb;
+  pb.set_table_name(kTableName);
+  pb.set_read_mode(read_mode);
+  pb.set_propagated_timestamp(ts_propagated);
+  const string serialized_token = pb.SerializeAsString();
+  EXPECT_EQ(ts_prev, client_->GetLatestObservedTimestamp());
+
+  KuduScanner* scanner_raw;
+  ASSERT_OK(KuduScanToken::DeserializeIntoScanner(client_.get(),
+                                                  serialized_token,
+                                                  &scanner_raw));
+  // The caller of the DeserializeIntoScanner() is responsible for
+  // de-allocating the result scanner object.
+  unique_ptr<KuduScanner> scanner(scanner_raw);
+  EXPECT_EQ(ts_propagated, client_->GetLatestObservedTimestamp());
 
   // Build the set of scan tokens for the table, serialize them and
   // make sure the serialized tokens contain the propagated timestamp.
@@ -493,6 +524,9 @@ TEST_F(ScanTokenTest, TestTimestampPropagation) {
   }
 }
 
+INSTANTIATE_TEST_CASE_P(Params, TimestampPropagationParamTest,
+                        testing::ValuesIn(read_modes));
+
 // Tests the results of creating scan tokens, altering the columns being
 // scanned, and then executing the scan tokens.
 TEST_F(ScanTokenTest, TestConcurrentAlterTable) {

http://git-wip-us.apache.org/repos/asf/kudu/blob/5047f091/src/kudu/client/scanner-internal.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/scanner-internal.cc b/src/kudu/client/scanner-internal.cc
index f26f588..ea67ec2 100644
--- a/src/kudu/client/scanner-internal.cc
+++ b/src/kudu/client/scanner-internal.cc
@@ -315,8 +315,8 @@ Status KuduScanner::Data::OpenTablet(const string& partition_key,
     case KuduScanner::READ_LATEST:
       scan->set_read_mode(kudu::READ_LATEST);
       if (configuration_.has_snapshot_timestamp()) {
-        LOG(WARNING) << "Ignoring snapshot timestamp since "
-                        "not in READ_AT_SNAPSHOT mode.";
+        LOG(FATAL) << "Snapshot timestamp should only be configured "
+                      "for READ_AT_SNAPSHOT scan mode.";
       }
       break;
     case KuduScanner::READ_AT_SNAPSHOT:
@@ -325,6 +325,13 @@ Status KuduScanner::Data::OpenTablet(const string& partition_key,
         scan->set_snap_timestamp(configuration_.snapshot_timestamp());
       }
       break;
+    case KuduScanner::READ_YOUR_WRITES:
+      scan->set_read_mode(kudu::READ_YOUR_WRITES);
+      if (configuration_.has_snapshot_timestamp()) {
+        LOG(FATAL) << "Snapshot timestamp should only be configured "
+                      "for READ_AT_SNAPSHOT scan mode.";
+      }
+      break;
     default:
       LOG(FATAL) << Substitute("$0: unexpected read mode", read_mode);
   }
@@ -344,10 +351,18 @@ Status KuduScanner::Data::OpenTablet(const string& partition_key,
   scan->set_cache_blocks(configuration_.spec().cache_blocks());
 
   // For consistent operations, propagate the timestamp among all operations
-  // performed the context of the same client.
-  const uint64_t lo_ts = table_->client()->data_->GetLatestObservedTimestamp();
-  if (lo_ts != KuduClient::kNoTimestamp) {
-    scan->set_propagated_timestamp(lo_ts);
+  // performed in the context of the same client. For READ_YOUR_WRITES scans, use
+  // the propagation timestamp from the scan config.
+  uint64_t ts = KuduClient::kNoTimestamp;
+  if (read_mode == KuduScanner::READ_YOUR_WRITES) {
+    if (configuration_.has_lower_bound_propagation_timestamp()) {
+      ts = configuration_.lower_bound_propagation_timestamp();
+    }
+  } else {
+    ts = table_->client()->data_->GetLatestObservedTimestamp();
+  }
+  if (ts != KuduClient::kNoTimestamp) {
+    scan->set_propagated_timestamp(ts);
   }
 
   // Set up the predicates.
@@ -473,7 +488,14 @@ Status KuduScanner::Data::OpenTablet(const string& partition_key,
     configuration_.SetSnapshotRaw(last_response_.snap_timestamp());
   }
 
-  if (last_response_.has_propagated_timestamp()) {
+  // For READ_YOUR_WRITES mode, update the latest observed timestamp with
+  // the snapshot timestamp chosen and sent back by the server, so that
+  // subsequent reads do not wait unnecessarily.
+  if (configuration_.read_mode() == KuduScanner::READ_YOUR_WRITES) {
+    CHECK(last_response_.has_snap_timestamp());
+    table_->client()->data_->UpdateLatestObservedTimestamp(
+        last_response_.snap_timestamp());
+  } else if (last_response_.has_propagated_timestamp()) {
     table_->client()->data_->UpdateLatestObservedTimestamp(
         last_response_.propagated_timestamp());
   }

http://git-wip-us.apache.org/repos/asf/kudu/blob/5047f091/src/kudu/integration-tests/consistency-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/consistency-itest.cc b/src/kudu/integration-tests/consistency-itest.cc
index 76aad69..a0a2dc9 100644
--- a/src/kudu/integration-tests/consistency-itest.cc
+++ b/src/kudu/integration-tests/consistency-itest.cc
@@ -56,9 +56,12 @@
 #include "kudu/tserver/tablet_server.h"
 #include "kudu/tserver/ts_tablet_manager.h"
 #include "kudu/util/monotime.h"
+#include "kudu/util/random.h"
 #include "kudu/util/slice.h"
 #include "kudu/util/status.h"
 #include "kudu/util/test_macros.h"
+#include "kudu/util/test_util.h"
+#include "kudu/util/thread.h"
 
 DECLARE_int32(heartbeat_interval_ms);
 DECLARE_int32(max_clock_sync_error_usec);
@@ -86,7 +89,7 @@ namespace client {
 class ConsistencyITest : public MiniClusterITestBase {
  public:
   ConsistencyITest()
-      : num_tablet_servers_(2),
+      : num_tablet_servers_(3),
         table_name_("timestamp_propagation_test_table"),
         key_column_name_("key"),
         key_split_value_(8) {
@@ -112,6 +115,35 @@ class ConsistencyITest : public MiniClusterITestBase {
     StartCluster(num_tablet_servers_);
   }
 
+  // Body of one concurrent client: inserts 3 batches of 'rows_to_insert'
+  // rows starting at key 'first_row', checking READ_YOUR_WRITES row counts.
+  void ScannerThread(KuduClient::ReplicaSelection selection,
+                     int rows_to_insert,
+                     int first_row,
+                     int scans_to_perform) {
+    client::sp::shared_ptr<KuduClient> client;
+    CHECK_OK(cluster_->CreateClient(nullptr, &client));
+    shared_ptr<KuduTable> table;
+    CHECK_OK(client->OpenTable(table_name_, &table));
+    size_t row_count = 0;
+    for (int i = 0; i < 3; i++) {
+      // Batches use consecutive, non-overlapping key ranges.
+      WARN_NOT_OK(InsertTestRows(client.get(), table.get(), rows_to_insert,
+                                 first_row + rows_to_insert * i),
+                  "failed to insert test rows");
+      // A READ_YOUR_WRITES scan must see all rows this client wrote and, as
+      // reads never "go back in time", no fewer rows than previously seen.
+      size_t expected_count = static_cast<size_t>(rows_to_insert) * (i + 1);
+      if (row_count > expected_count) expected_count = row_count;
+      for (int j = 0; j < scans_to_perform; j++) {
+        CHECK_OK(GetRowCount(table.get(), KuduScanner::READ_YOUR_WRITES,
+                             selection, 0, &row_count));
+        EXPECT_LE(expected_count, row_count);
+        expected_count = row_count;
+      }
+    }
+  }
+
  protected:
   static void UpdateClock(HybridClock* clock, MonoDelta delta) {
     const uint64_t new_time(HybridClock::GetPhysicalValueMicros(clock->Now()) +
@@ -122,7 +154,8 @@ class ConsistencyITest : public MiniClusterITestBase {
 
   // Creates a table with the specified name and replication factor.
   Status CreateTable(KuduClient* client,
-                     const string& table_name) {
+                     const string& table_name,
+                     int num_replicas = 1) {
     unique_ptr<KuduPartialRow> split_row(schema_.NewRow());
     RETURN_NOT_OK(split_row->SetInt32(0, key_split_value_));
 
@@ -131,7 +164,7 @@ class ConsistencyITest : public MiniClusterITestBase {
                   .schema(&schema_)
                   .add_range_partition_split(split_row.release())
                   .set_range_partition_columns({ key_column_name_ })
-                  .num_replicas(1)
+                  .num_replicas(num_replicas)
                   .Create());
     return Status::OK();
   }
@@ -162,8 +195,15 @@ class ConsistencyITest : public MiniClusterITestBase {
 
   Status GetRowCount(KuduTable* table, KuduScanner::ReadMode read_mode,
                      uint64_t ts, size_t* row_count) {
+    return GetRowCount(table, read_mode, KuduClient::LEADER_ONLY, ts, row_count);
+  }
+
+  Status GetRowCount(KuduTable* table, KuduScanner::ReadMode read_mode,
+                     KuduClient::ReplicaSelection selection, uint64_t ts,
+                     size_t* row_count) {
     KuduScanner scanner(table);
     RETURN_NOT_OK(scanner.SetReadMode(read_mode));
+    RETURN_NOT_OK(scanner.SetSelection(selection));
     if (read_mode == KuduScanner::READ_AT_SNAPSHOT && ts != 0) {
       RETURN_NOT_OK(scanner.SetSnapshotRaw(ts + 1));
     }
@@ -709,5 +749,102 @@ TEST_F(ConsistencyITest, TestScanTokenTimestampPropagation) {
   }
 }
 
+// Replica selection policies that the parameterized tests below run with.
+const KuduClient::ReplicaSelection replica_selectors[] = {
+  KuduClient::LEADER_ONLY, KuduClient::CLOSEST_REPLICA,
+  KuduClient::FIRST_REPLICA,
+};
+
+class ScanYourWritesParamTest :
+    public ConsistencyITest,
+    public ::testing::WithParamInterface<KuduClient::ReplicaSelection> {
+};
+
+// Verify that, no matter which replica is selected, a single client sees
+// all of its own writes when scanning in READ_YOUR_WRITES mode. The scan is
+// repeated to check the guarantee still holds after the first scan's
+// response has advanced the client's latest observed timestamp.
+TEST_P(ScanYourWritesParamTest, Test) {
+  const KuduClient::ReplicaSelection selection = GetParam();
+  shared_ptr<KuduClient> client;
+  ASSERT_OK(cluster_->CreateClient(nullptr, &client));
+  ASSERT_OK(CreateTable(client.get(), table_name_, 3));
+  shared_ptr<KuduTable> table;
+  ASSERT_OK(client->OpenTable(table_name_, &table));
+
+  // Insert multiple rows into the tablets.
+  const uint64_t rows_to_insert = 20000;
+  ASSERT_OK(InsertTestRows(client.get(), table.get(), rows_to_insert, 0));
+
+  // Scan twice, each time expecting to count every row written above.
+  for (int scan = 0; scan < 2; ++scan) {
+    size_t row_count = 0;
+    ASSERT_OK(GetRowCount(table.get(), KuduScanner::READ_YOUR_WRITES,
+                          selection, 0, &row_count));
+    EXPECT_EQ(rows_to_insert, row_count);
+  }
+}
+
+INSTANTIATE_TEST_CASE_P(Params, ScanYourWritesParamTest,
+                        testing::ValuesIn(replica_selectors));
+
+class ScanYourWritesMultiClientsParamTest :
+    public ConsistencyITest,
+    public ::testing::WithParamInterface<KuduClient::ReplicaSelection> {
+};
+
+// This test verifies that, when multiple clients are running
+// simultaneously, each client gets the read-your-writes and
+// read-your-reads session guarantees using the READ_YOUR_WRITES
+// scan mode, no matter which replica is selected.
+//
+// Read-your-writes guarantees that a client can see all previous
+// writes that it performed.
+//
+// Read-your-reads guarantees that subsequent reads of a given object
+// never return older values than an earlier read did, including
+// values written by other clients.
+//
+// The test scenario is as follows:
+//
+// 1) Multiple clients run concurrently, repeated over several runs
+//    against the same table.
+//
+// 2) Each client performs multiple writes and multiple sets of
+//    scans: it repeatedly inserts a batch of rows, starting from
+//    its own first key, and then performs a bunch of
+//    READ_YOUR_WRITES scans that count the rows. The count of the
+//    rows should never go down from the previously observed one.
+TEST_P(ScanYourWritesMultiClientsParamTest, Test) {
+  const KuduClient::ReplicaSelection sel = GetParam();
+  shared_ptr<KuduClient> client;
+  ASSERT_OK(cluster_->CreateClient(nullptr, &client));
+  ASSERT_OK(CreateTable(client.get(), table_name_, 3));
+
+  const int kNumThreads = 5;
+  const int rows_to_insert = 1000;
+  const int kBatchesPerThread = 3;  // Batches inserted by ScannerThread().
+  const int scans_to_perform = AllowSlowTests() ? 10 : 3;
+  for (int run = 1; run <= 3; run++) {
+    vector<scoped_refptr<Thread>> threads;
+    for (int i = 0; i < kNumThreads; i++) {
+      // Distinct first key per thread and run, spaced by all of its batches.
+      const int first_row =
+          ((run - 1) * kNumThreads + i) * kBatchesPerThread * rows_to_insert;
+      scoped_refptr<kudu::Thread> new_thread;
+      CHECK_OK(kudu::Thread::Create(
+          "test", strings::Substitute("test-scanner-$0", i),
+          &ConsistencyITest::ScannerThread, this,
+          sel, rows_to_insert, first_row, scans_to_perform, &new_thread));
+      threads.push_back(new_thread);
+    }
+    for (const scoped_refptr<kudu::Thread>& thr : threads) {
+      CHECK_OK(ThreadJoiner(thr.get()).Join());
+    }
+  }
+}
+
+INSTANTIATE_TEST_CASE_P(Params, ScanYourWritesMultiClientsParamTest,
+                        testing::ValuesIn(replica_selectors));
 } // namespace client
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/5047f091/src/kudu/integration-tests/delete_table-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/delete_table-itest.cc b/src/kudu/integration-tests/delete_table-itest.cc
index 12b6848..43ed3b2 100644
--- a/src/kudu/integration-tests/delete_table-itest.cc
+++ b/src/kudu/integration-tests/delete_table-itest.cc
@@ -1497,6 +1497,8 @@ TEST_P(DeleteTableWhileScanInProgressParamTest, Test) {
         return "READ_LATEST";
       case KuduScanner::READ_AT_SNAPSHOT:
         return "READ_AT_SNAPSHOT";
+      case KuduScanner::READ_YOUR_WRITES:
+        return "READ_YOUR_WRITES";
       default:
         return "UNKNOWN";
     }
@@ -1563,7 +1565,7 @@ TEST_P(DeleteTableWhileScanInProgressParamTest, Test) {
 
   using kudu::client::sp::shared_ptr;
   shared_ptr<KuduTable> table;
-  ASSERT_OK(client_->OpenTable(w.table_name(), &table));
+  ASSERT_OK(w.client()->OpenTable(w.table_name(), &table));
   KuduScanner scanner(table.get());
   ASSERT_OK(scanner.SetReadMode(mode));
   ASSERT_OK(scanner.SetSelection(sel));
@@ -1621,6 +1623,7 @@ TEST_P(DeleteTableWhileScanInProgressParamTest, Test) {
 const KuduScanner::ReadMode read_modes[] = {
   KuduScanner::READ_LATEST,
   KuduScanner::READ_AT_SNAPSHOT,
+  KuduScanner::READ_YOUR_WRITES,
 };
 const KuduClient::ReplicaSelection replica_selectors[] = {
   KuduClient::LEADER_ONLY,


Mime
View raw message