kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ha...@apache.org
Subject [2/2] kudu git commit: KUDU-1704: add java client support for READ_YOUR_WRITES mode
Date Mon, 12 Mar 2018 21:16:10 GMT
KUDU-1704: add java client support for READ_YOUR_WRITES mode

Change-Id: I6239521c022147257859e399f55c6f3f945af465
Reviewed-on: http://gerrit.cloudera.org:8080/8847
Tested-by: Kudu Jenkins
Reviewed-by: David Ribeiro Alves <davidralves@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/0c05e837
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/0c05e837
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/0c05e837

Branch: refs/heads/master
Commit: 0c05e8375d005a91d37acd3102a22ddbe92382d5
Parents: 8aa75d8
Author: hahao <hao.hao@cloudera.com>
Authored: Thu Dec 14 17:20:23 2017 -0800
Committer: Hao Hao <hao.hao@cloudera.com>
Committed: Mon Mar 12 21:12:36 2018 +0000

----------------------------------------------------------------------
 .../apache/kudu/client/AsyncKuduScanner.java    |  61 +++++++++--
 .../org/apache/kudu/client/KuduScanToken.java   |   4 +
 .../org/apache/kudu/client/TestKuduClient.java  | 109 +++++++++++++++++++
 .../kudu/client/TestScannerMultiTablet.java     |  69 ++++++++++++
 4 files changed, 236 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/0c05e837/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
index 9f5c137..0863148 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
@@ -118,7 +118,21 @@ public final class AsyncKuduScanner {
      * are sometimes not externally consistent even when action was taken to make them so.
      * In these cases Isolation may degenerate to mode "Read Committed". See KUDU-430.
      */
-    READ_AT_SNAPSHOT(Common.ReadMode.READ_AT_SNAPSHOT);
+    READ_AT_SNAPSHOT(Common.ReadMode.READ_AT_SNAPSHOT),
+
+    /**
+     * When {@code READ_YOUR_WRITES} is specified, the client will perform a read
+     * such that it follows all previously known writes and reads from this client.
+     * Specifically this mode:
+     *  (1) ensures read-your-writes and read-your-reads session guarantees,
+     *  (2) minimizes latency caused by waiting for outstanding write
+     *      transactions to complete.
+     *
+     * <p>Reads in this mode are not repeatable: two READ_YOUR_WRITES reads, even if
+     * they provide the same propagated timestamp bound, can execute at different
+     * timestamps and thus may return different results.
+     */
+    READ_YOUR_WRITES(Common.ReadMode.READ_YOUR_WRITES);
 
     private Common.ReadMode pbVersion;
     ReadMode(Common.ReadMode pbVersion) {
@@ -183,6 +197,8 @@ public final class AsyncKuduScanner {
 
   private long htTimestamp;
 
+  // Set in the constructor for READ_YOUR_WRITES scans only; otherwise NO_TIMESTAMP.
+  private long lowerBoundPropagationTimestamp = AsyncKuduClient.NO_TIMESTAMP;
   private final ReplicaSelection replicaSelection;
 
   /////////////////////
@@ -293,6 +309,13 @@ public final class AsyncKuduScanner {
     }
 
     this.replicaSelection = replicaSelection;
+
+    // For READ_YOUR_WRITES scan mode, get the latest observed timestamp
+    // and store it. Always use it as the propagated timestamp for the
+    // duration of the scan to avoid unnecessary waits.
+    if (readMode == ReadMode.READ_YOUR_WRITES) {
+      this.lowerBoundPropagationTimestamp = this.client.getLastPropagatedTimestamp();
+    }
   }
 
   /**
@@ -389,8 +412,24 @@ public final class AsyncKuduScanner {
             // context of the same scan.
             htTimestamp = resp.scanTimestamp;
           }
-          if (resp.propagatedTimestamp != AsyncKuduClient.NO_TIMESTAMP) {
-            client.updateLastPropagatedTimestamp(resp.propagatedTimestamp);
+
+          long lastPropagatedTimestamp = AsyncKuduClient.NO_TIMESTAMP;
+          if (readMode == ReadMode.READ_YOUR_WRITES &&
+              resp.scanTimestamp != AsyncKuduClient.NO_TIMESTAMP) {
+            // For READ_YOUR_WRITES mode, update the latest propagated timestamp
+            // with the chosen snapshot timestamp sent back from the server, to
+            // avoid unnecessary waits in subsequent reads. This is safe: as long
+            // as the chosen snapshot timestamp of the next read is greater than
+            // the previous one, the scan does not violate READ_YOUR_WRITES
+            // session guarantees.
+            lastPropagatedTimestamp = resp.scanTimestamp;
+          } else if (resp.propagatedTimestamp != AsyncKuduClient.NO_TIMESTAMP) {
+            // Otherwise we just use the propagated timestamp returned from
+            // the server as the latest propagated timestamp.
+            lastPropagatedTimestamp = resp.propagatedTimestamp;
+          }
+          if (lastPropagatedTimestamp != AsyncKuduClient.NO_TIMESTAMP) {
+            client.updateLastPropagatedTimestamp(lastPropagatedTimestamp);
           }
 
           if (isFaultTolerant && resp.lastPrimaryKey != null) {
@@ -694,7 +733,7 @@ public final class AsyncKuduScanner {
 
     /**
      * The server timestamp to propagate, if set. If the server response does
-     * not contain propagation timestamp, this field is set to special value
+     * not contain a propagated timestamp, this field is set to the special value
      * AsyncKuduClient.NO_TIMESTAMP
      */
     private final long propagatedTimestamp;
@@ -785,9 +824,17 @@ public final class AsyncKuduScanner {
           newBuilder.setTabletId(UnsafeByteOperations.unsafeWrap(tablet.getTabletIdAsBytes()));
           newBuilder.setOrderMode(AsyncKuduScanner.this.getOrderMode());
           newBuilder.setCacheBlocks(cacheBlocks);
-          // if the last propagated timestamp is set send it with the scan
-          if (table.getAsyncClient().getLastPropagatedTimestamp() != AsyncKuduClient.NO_TIMESTAMP) {
-            newBuilder.setPropagatedTimestamp(table.getAsyncClient().getLastPropagatedTimestamp());
+          // If the last propagated timestamp is set, send it with the scan.
+          // For READ_YOUR_WRITES scan, use the propagated timestamp from
+          // the scanner.
+          long timestamp;
+          if (readMode == ReadMode.READ_YOUR_WRITES) {
+            timestamp = lowerBoundPropagationTimestamp;
+          } else {
+            timestamp = table.getAsyncClient().getLastPropagatedTimestamp();
+          }
+          if (timestamp != AsyncKuduClient.NO_TIMESTAMP) {
+            newBuilder.setPropagatedTimestamp(timestamp);
           }
           newBuilder.setReadMode(AsyncKuduScanner.this.getReadMode().pbVersion());
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/0c05e837/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanToken.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanToken.java b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanToken.java
index 0f520f9..a4ee2ab 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanToken.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanToken.java
@@ -219,6 +219,10 @@ public class KuduScanToken implements Comparable<KuduScanToken> {
           builder.readMode(AsyncKuduScanner.ReadMode.READ_LATEST);
           break;
         }
+        case READ_YOUR_WRITES: {
+          builder.readMode(AsyncKuduScanner.ReadMode.READ_YOUR_WRITES);
+          break;
+        }
         default: throw new IllegalArgumentException("unknown read mode");
       }
     }

http://git-wip-us.apache.org/repos/asf/kudu/blob/0c05e837/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java
index f655222..52e8fdb 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java
@@ -25,6 +25,7 @@ import static org.hamcrest.CoreMatchers.containsString;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
@@ -35,7 +36,10 @@ import java.math.BigDecimal;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import com.google.common.collect.ImmutableList;
@@ -1038,4 +1042,109 @@ public class TestKuduClient extends BaseKuduTest {
     // If createTable() was disrupted by the alterTable(), this will throw.
     d.join();
   }
+
+  // Verifies that tasks running concurrently against this test's shared
+  // clients each get read-your-writes and read-your-reads session guarantees
+  // from READ_YOUR_WRITES scans against the leader replica, with writes
+  // issued in AUTO_FLUSH_SYNC mode (single operation per flush). The scenario
+  // itself is implemented by readYourWrites().
+  @Test(timeout = 100000)
+  public void testReadYourWritesSyncLeaderReplica() throws Exception {
+    readYourWrites(SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC,
+        ReplicaSelection.LEADER_ONLY);
+  }
+
+  // AUTO_FLUSH_SYNC writes, with READ_YOUR_WRITES scans against the closest replica.
+  @Test(timeout = 100000)
+  public void testReadYourWritesSyncClosestReplica() throws Exception {
+    readYourWrites(SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC,
+        ReplicaSelection.CLOSEST_REPLICA);
+  }
+
+  // Same leader-replica scenario as testReadYourWritesSyncLeaderReplica, but
+  // writes are issued in MANUAL_FLUSH mode (explicitly flushed batches).
+  @Test(timeout = 100000)
+  public void testReadYourWritesBatchLeaderReplica() throws Exception {
+    readYourWrites(SessionConfiguration.FlushMode.MANUAL_FLUSH,
+        ReplicaSelection.LEADER_ONLY);
+  }
+
+  // MANUAL_FLUSH (batched) writes, with READ_YOUR_WRITES scans against the closest replica.
+  @Test(timeout = 100000)
+  public void testReadYourWritesBatchClosestReplica() throws Exception {
+    readYourWrites(SessionConfiguration.FlushMode.MANUAL_FLUSH,
+        ReplicaSelection.CLOSEST_REPLICA);
+  }
+
+  // Runs tasksNum concurrent tasks that each insert rows in the given flush
+  // mode and then check read-your-writes using READ_YOUR_WRITES scans.
+  private void readYourWrites(final SessionConfiguration.FlushMode flushMode,
+                              final ReplicaSelection replicaSelection)
+          throws Exception {
+    Schema schema = createManyStringsSchema();
+    syncClient.createTable(tableName, schema, createTableOptions());
+
+    final int tasksNum = 4;
+    List<Callable<Void>> callables = new ArrayList<>();
+    for (int t = 0; t < tasksNum; t++) {
+      Callable<Void> callable = new Callable<Void>() {
+        @Override
+        public Void call() throws Exception {
+          // Insert batches of rows through one session in the given flush
+          // mode. All tasks write the same keys; per-row results are not checked.
+          KuduSession session = syncClient.newSession();
+          session.setFlushMode(flushMode);
+          KuduTable table = syncClient.openTable(tableName);
+          for (int i = 0; i < 3; i++) {
+            for (int j = 100 * i; j < 100 * (i + 1); j++) {
+              Insert insert = table.newInsert();
+              PartialRow row = insert.getRow();
+              row.addString("key", String.format("key_%02d", j));
+              row.addString("c1", "c1_" + j);
+              row.addString("c2", "c2_" + j);
+              row.addString("c3", "c3_" + j);
+              session.apply(insert);
+            }
+            session.flush();
+
+            // Perform a bunch of READ_YOUR_WRITES scans that count the rows,
+            // and verify that the count never drops below what this task has
+            // already written, so subsequent reads never "go back in time"
+            // with respect to writes this client has observed.
+            for (int k = 0; k < 3; k++) {
+              AsyncKuduScanner scanner = client.newScannerBuilder(table)
+                      .readMode(AsyncKuduScanner.ReadMode.READ_YOUR_WRITES)
+                      .replicaSelection(replicaSelection)
+                      .build();
+              KuduScanner syncScanner = new KuduScanner(scanner);
+              long preTs = client.getLastPropagatedTimestamp();
+              assertNotEquals(AsyncKuduClient.NO_TIMESTAMP, preTs);
+
+              long rowCount = countRowsInScan(syncScanner);
+              long expectedCount = 100L * (i + 1);
+              assertTrue(expectedCount <= rowCount);
+
+              // The server must return the chosen snapshot timestamp, and it
+              // must be strictly greater than the pre-scan propagated timestamp.
+              assertNotEquals(AsyncKuduClient.NO_TIMESTAMP, scanner.getSnapshotTimestamp());
+              assertTrue(preTs < scanner.getSnapshotTimestamp());
+              syncScanner.close();
+            }
+          }
+          session.close();  // Only after the last batch; the session is reused.
+          return null;
+        }
+      };
+      callables.add(callable);
+    }
+    ExecutorService executor = Executors.newFixedThreadPool(tasksNum);
+    try {
+      // get() rethrows any exception or assertion error raised inside a task.
+      for (Future<Void> future : executor.invokeAll(callables)) {
+        future.get();
+      }
+    } finally {
+      executor.shutdownNow();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/kudu/blob/0c05e837/java/kudu-client/src/test/java/org/apache/kudu/client/TestScannerMultiTablet.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestScannerMultiTablet.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestScannerMultiTablet.java
index 0365387..22a5bed 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestScannerMultiTablet.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestScannerMultiTablet.java
@@ -240,6 +240,62 @@ public class TestScannerMultiTablet extends BaseKuduTest {
     assertEquals(9, rowCount);
   }
 
+  // Test multi-tablet scans in READ_YOUR_WRITES mode to ensure client-local
+  // read-your-writes: a scan sees the 9 pre-existing rows and, after a
+  // MANUAL_FLUSH (batch) write from this client, the 3 new rows as well.
+  @Test(timeout = 100000)
+  public void testReadYourWrites() throws Exception {
+    // Perform scan in READ_YOUR_WRITES mode. Before the scan, verify that the
+    // propagated timestamp is set via previous write session while snapshot
+    // timestamp is not.
+    AsyncKuduScanner scanner = client.newScannerBuilder(table)
+                                     .readMode(AsyncKuduScanner.ReadMode.READ_YOUR_WRITES)
+                                     .build();
+    KuduScanner syncScanner = new KuduScanner(scanner);
+    assertEquals(scanner.getReadMode(), syncScanner.getReadMode());
+    long preTs = client.getLastPropagatedTimestamp();
+    assertNotEquals(AsyncKuduClient.NO_TIMESTAMP, preTs);
+    assertEquals(AsyncKuduClient.NO_TIMESTAMP, scanner.getSnapshotTimestamp());
+
+    assertEquals(9, countRowsInScan(syncScanner));
+
+    // After the scan, the server must have returned the chosen snapshot
+    // timestamp, and it must be larger than the previous propagated timestamp.
+    assertNotEquals(AsyncKuduClient.NO_TIMESTAMP, scanner.getSnapshotTimestamp());
+    assertTrue(preTs < scanner.getSnapshotTimestamp());
+    syncScanner.close();
+
+    // Perform write in MANUAL_FLUSH (batch) mode.
+    KuduSession session = syncClient.newSession();
+    session.setFlushMode(KuduSession.FlushMode.MANUAL_FLUSH);
+    for (String key : new String[] {"11", "22", "33"}) {
+      Insert insert = table.newInsert();
+      PartialRow row = insert.getRow();
+      row.addString(schema.getColumnByIndex(0).getName(), key);
+      row.addString(schema.getColumnByIndex(1).getName(), key);
+      session.apply(insert);
+    }
+    session.flush();
+    session.close();
+
+    // Scan again; the propagated timestamp must have moved forward since preTs.
+    scanner = client.newScannerBuilder(table)
+                    .readMode(AsyncKuduScanner.ReadMode.READ_YOUR_WRITES)
+                    .build();
+    syncScanner = new KuduScanner(scanner);
+    assertTrue(preTs < client.getLastPropagatedTimestamp());
+    preTs = client.getLastPropagatedTimestamp();
+    assertEquals(AsyncKuduClient.NO_TIMESTAMP, scanner.getSnapshotTimestamp());
+
+    assertEquals(12, countRowsInScan(syncScanner));
+
+    // As with the first scan, the new snapshot timestamp must be returned by
+    // the server and be larger than the propagated timestamp seen before it.
+    assertNotEquals(AsyncKuduClient.NO_TIMESTAMP, scanner.getSnapshotTimestamp());
+    assertTrue(preTs < scanner.getSnapshotTimestamp());
+    syncScanner.close();
+  }
+
   @Test(timeout = 100000)
   public void testScanPropagatesLatestTimestamp() throws Exception {
     // Reset the clients in order to clear the propagated timestamp, which may
@@ -309,6 +365,19 @@ public class TestScannerMultiTablet extends BaseKuduTest {
     assertEquals(tsPropagated, client.getLastPropagatedTimestamp());
   }
 
+  // A scan token carrying READ_YOUR_WRITES must deserialize into a scanner
+  // configured with the matching client-side read mode.
+  @Test(timeout = 100000)
+  public void testScanTokenReadMode() throws Exception {
+    Client.ScanTokenPB scanTokenPB = ScanTokenPB.newBuilder()
+        .setTableName(table.getName())
+        .setReadMode(Common.ReadMode.READ_YOUR_WRITES)
+        .build();
+    KuduScanner scanner = KuduScanToken.deserializeIntoScanner(
+        KuduScanToken.serialize(scanTokenPB), syncClient);
+    assertEquals(AsyncKuduScanner.ReadMode.READ_YOUR_WRITES, scanner.getReadMode());
+  }
+
   private AsyncKuduScanner getScanner(String lowerBoundKeyOne,
                                       String lowerBoundKeyTwo,
                                       String exclusiveUpperBoundKeyOne,


Mime
View raw message