kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ale...@apache.org
Subject [kudu] 02/04: [java] Improve TestAsyncKuduSession: Refactor into more tests
Date Thu, 30 May 2019 05:05:46 GMT
This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit a1bbd3d042322cef3cee0af8713a4bd247699113
Author: Will Berkeley <wdberkeley@gmail.com>
AuthorDate: Wed May 29 14:50:50 2019 -0700

    [java] Improve TestAsyncKuduSession: Refactor into more tests
    
    TestAsyncKuduSession had a few tests, but one of them, ominously called
    `test`, was actually 7-8 tests in one, with each test shackled to the
    next by the shared state of the underlying table. The main improvement
    of this commit is to refactor mega-`test` into several targeted tests.
    
    There were some parts of the original mega test method for which I
    couldn't determine what precisely they were testing. Those parts have
    been dropped.
    
    I also made a few other small improvements which allowed me to drop
    almost all the helper code in favor of generic test utils.
    
    Change-Id: I73675605c04d16f23ce9cd61763a97486dfcb009
    Reviewed-on: http://gerrit.cloudera.org:8080/13460
    Reviewed-by: Adar Dembo <adar@cloudera.com>
    Tested-by: Kudu Jenkins
---
 .../apache/kudu/client/TestAsyncKuduSession.java   | 393 +++++++--------------
 1 file changed, 126 insertions(+), 267 deletions(-)

diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduSession.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduSession.java
index ba2845f..c6cf6cb 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduSession.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduSession.java
@@ -17,9 +17,8 @@
 package org.apache.kudu.client;
 
 import static org.apache.kudu.test.KuduTestHarness.DEFAULT_SLEEP;
-import static org.apache.kudu.test.ClientTestUtil.countRowsInScan;
+import static org.apache.kudu.test.ClientTestUtil.countRowsInTable;
 import static org.apache.kudu.test.ClientTestUtil.createBasicSchemaInsert;
-import static org.apache.kudu.test.ClientTestUtil.defaultErrorCB;
 import static org.apache.kudu.test.ClientTestUtil.getBasicCreateTableOptions;
 import static org.apache.kudu.test.ClientTestUtil.getBasicSchema;
 import static org.junit.Assert.assertEquals;
@@ -27,12 +26,9 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
-import java.util.Collections;
+import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
 
-import com.stumbleupon.async.Callback;
 import com.stumbleupon.async.Deferred;
 import org.apache.kudu.test.KuduTestHarness;
 import org.junit.Before;
@@ -49,6 +45,7 @@ public class TestAsyncKuduSession {
   private static final String INJECTED_TS_ERROR = "injected error for test";
 
   private static AsyncKuduClient client;
+  private static AsyncKuduSession session;
   private static KuduTable table;
 
   @Rule
@@ -57,6 +54,7 @@ public class TestAsyncKuduSession {
   @Before
   public void setUp() throws Exception {
     client = harness.getAsyncClient();
+    session = client.newSession();
     table = harness.getClient().createTable(TABLE_NAME, SCHEMA, getBasicCreateTableOptions());
   }
 
@@ -67,10 +65,9 @@ public class TestAsyncKuduSession {
    */
   @Test(timeout = 100000)
   public void testBackgroundErrors() throws Exception {
+    session.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND);
+    session.setFlushInterval(10);
     try {
-      AsyncKuduSession session = client.newSession();
-      session.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND);
-      session.setFlushInterval(10);
       Batch.injectTabletServerErrorAndLatency(makeTabletServerError(), 0);
 
       OperationResponse resp = session.apply(createInsert(1)).join(DEFAULT_SLEEP);
@@ -90,10 +87,9 @@ public class TestAsyncKuduSession {
    */
   @Test(timeout = 100000)
   public void testBatchErrorCauseSessionStuck() throws Exception {
+    session.setFlushMode(AsyncKuduSession.FlushMode.AUTO_FLUSH_BACKGROUND);
+    session.setFlushInterval(100);
     try {
-      AsyncKuduSession session = client.newSession();
-      session.setFlushMode(AsyncKuduSession.FlushMode.AUTO_FLUSH_BACKGROUND);
-      session.setFlushInterval(100);
       Batch.injectTabletServerErrorAndLatency(makeTabletServerError(), 200);
       // 0ms: Insert the first row, which will be the first batch.
       Deferred<OperationResponse> resp1 = session.apply(createInsert(1));
@@ -132,7 +128,6 @@ public class TestAsyncKuduSession {
    */
   @Test(timeout = 100000)
   public void testGetTableLocationsErrorCausesStuckSession() throws Exception {
-    AsyncKuduSession session = client.newSession();
     // Make sure tablet locations are cached.
     Insert insert = createInsert(1);
     session.apply(insert).join(DEFAULT_SLEEP);
@@ -163,7 +158,6 @@ public class TestAsyncKuduSession {
   @Test
   public void testInsertIntoUnavailableTablet() throws Exception {
     harness.killAllTabletServers();
-    AsyncKuduSession session = client.newSession();
     session.setTimeoutMillis(1);
     OperationResponse response = session.apply(createInsert(1)).join();
     assertTrue(response.hasRowError());
@@ -194,7 +188,6 @@ public class TestAsyncKuduSession {
         getBasicCreateTableOptions().setNumReplicas(1));
 
     // Write before doing any restarts to establish a connection.
-    AsyncKuduSession session = client.newSession();
     session.setTimeoutMillis(30000);
     session.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC);
     session.apply(createBasicSchemaInsert(nonReplicatedTable, 1)).join();
@@ -214,314 +207,180 @@ public class TestAsyncKuduSession {
     assertEquals(numClientsBefore, numClientsAfter);
   }
 
+  /**
+   * Regression test for KUDU-232, where, in AUTO_FLUSH_BACKGROUND mode, a call to `flush` could
+   * throw instead of blocking on in-flight ops that are doing tablet lookups.
+   */
   @Test(timeout = 100000)
-  public void test() throws Exception {
-    AsyncKuduSession session = client.newSession();
-
-    // First testing KUDU-232, the cache is empty and we want to force flush. We force the flush
-    // interval to be higher than the sleep time so that we don't background flush while waiting.
-    // If our subsequent manual flush throws, it means the logic to block on in-flight tablet
-    // lookups in flush isn't working properly.
+  public void testKUDU232() throws Exception {
     session.setFlushMode(AsyncKuduSession.FlushMode.AUTO_FLUSH_BACKGROUND);
+
+    // Set the flush interval high enough that the operation won't flush in the background before
+    // the call to `flush`.
     session.setFlushInterval(DEFAULT_SLEEP + 1000);
     session.apply(createInsert(0));
-    session.flush().join(DEFAULT_SLEEP);
-    assertTrue(exists(0));
-    // set back to default
-    session.setFlushInterval(1000);
 
-    session.setFlushMode(AsyncKuduSession.FlushMode.AUTO_FLUSH_SYNC);
-    for (int i = 1; i < 10; i++) {
-      session.apply(createInsert(i)).join(DEFAULT_SLEEP);
-    }
-
-    assertEquals(10, countInRange(0, 10));
+    // `flush` should not throw and should block until the row has been flushed. Ergo, the row
+    // should now be readable server-side by this client.
+    session.flush().join(DEFAULT_SLEEP);
+    assertEquals(1, countRowsInTable(table));
+  }
 
+  /**
+   * Test that changing the flush mode while ops are in flight results in an error.
+   * @throws Exception
+   */
+  @Test(timeout = 100000)
+  public void testChangingFlushModeWithOpsInFlightIsAnError() throws Exception {
+    // Buffer an operation in MANUAL_FLUSH mode.
     session.setFlushMode(AsyncKuduSession.FlushMode.MANUAL_FLUSH);
-    session.setMutationBufferSpace(10);
-
     session.apply(createInsert(10));
 
     try {
+      // `flush` was never called, so switching the flush mode is an error.
       session.setFlushMode(AsyncKuduSession.FlushMode.AUTO_FLUSH_SYNC);
       fail();
     } catch (IllegalArgumentException ex) {
-      /* expected, flush mode remains manual */
+      // Furthermore, the flush mode should not have changed.
+      assertTrue(ex.getMessage().contains("Cannot change flush mode when writes are buffered"));
+      assertEquals(session.getFlushMode(), AsyncKuduSession.FlushMode.MANUAL_FLUSH);
     }
+  }
 
-    assertFalse(exists(10));
+  /**
+   * Test the behavior of AUTO_FLUSH_SYNC mode.
+   * @throws Exception
+   */
+  @Test(timeout = 100000)
+  public void testAutoFlushSync() throws Exception {
+    final int kNumOps = 1000;
+    session.setFlushMode(AsyncKuduSession.FlushMode.AUTO_FLUSH_SYNC);
+
+    // Apply a bunch of operations. There's no buffer to overflow, but the client does need to track
+    // each op to know if and when it has succeeded.
+    List<Deferred<OperationResponse>> opResponses = new ArrayList<>();
+    for (int i = 0; i < kNumOps; i++) {
+      opResponses.add(session.apply(createInsert(i)));
+    }
+
+    // Wait on all the ops. After this, all ops should be visible. No explicit flush required.
+    Deferred.group(opResponses).join(DEFAULT_SLEEP);
+    assertEquals(kNumOps, countRowsInTable(table));
+  }
 
-    for (int i = 11; i < 20; i++) {
+  /**
+   * Test the behavior of MANUAL_FLUSH mode.
+   * @throws Exception
+   */
+  @Test(timeout = 100000)
+  public void testManualFlush() throws Exception {
+    final int kBufferSizeOps = 10;
+    session.setFlushMode(AsyncKuduSession.FlushMode.MANUAL_FLUSH);
+    session.setMutationBufferSpace(kBufferSizeOps);
+
+    // Fill the buffer.
+    for (int i = 0; i < kBufferSizeOps; i++) {
       session.apply(createInsert(i));
     }
 
-    assertEquals(0, countInRange(10, 20));
+    // There was no call to flush, so there should be no rows in the table.
+    assertEquals(0, countRowsInTable(table));
+
+    // Attempting to buffer another op is an error.
     try {
-      session.apply(createInsert(20));
+      session.apply(createInsert(kBufferSizeOps + 1));
       fail();
     } catch (KuduException ex) {
-      /* expected, buffer would be too big */
+      assertTrue(ex.getMessage().contains("MANUAL_FLUSH is enabled but the buffer is too big"));
     }
-    assertEquals(0, countInRange(10, 20)); // the buffer should still be full
 
+    // Now flush. There should be `kBufferSizeOps` rows in the end.
     session.flush().join(DEFAULT_SLEEP);
-    assertEquals(10, countInRange(10, 20)); // now everything should be there
+    assertEquals(kBufferSizeOps, countRowsInTable(table));
 
-    session.flush().join(DEFAULT_SLEEP); // flushing empty buffer should be a no-op.
+    // Applying another operation should now succeed.
+    session.apply(createInsert(kBufferSizeOps + 1));
+  }
 
+  /**
+   * Test the behavior of AUTO_FLUSH_BACKGROUND mode. Because this mode does a lot of work in the
+   * background, possibly on different threads, it's difficult to test precisely.
+   * TODO(wdberkeley): Invent better ways of testing AUTO_FLUSH_BACKGROUND edge cases.
+   * @throws Exception
+   */
+  @Test
+  public void testAutoFlushBackground() throws Exception {
+    final int kBufferSizeOps = 10;
     session.setFlushMode(AsyncKuduSession.FlushMode.AUTO_FLUSH_BACKGROUND);
+    session.setMutationBufferSpace(kBufferSizeOps);
 
-    Deferred<OperationResponse> d = session.apply(createInsert(20));
-    Thread.sleep(50); // waiting a minimal amount of time to make sure the interval is in effect
-    assertFalse(exists(20));
-    // Add 10 items, the last one will stay in the buffer
-    for (int i = 21; i < 30; i++) {
-      d = session.apply(createInsert(i));
-    }
-    Deferred<OperationResponse> buffered = session.apply(createInsert(30));
-    long now = System.currentTimeMillis();
-    d.join(DEFAULT_SLEEP); // Ok to use the last d, everything is going to the buffer
-    // auto flush will force flush if the buffer is full as it should be now
-    // so we check that we didn't wait the full interval
-    long elapsed = System.currentTimeMillis() - now;
-    assertTrue(elapsed < 950);
-    assertEquals(10, countInRange(20, 31));
-    buffered.join();
-    assertEquals(11, countInRange(20, 31));
-
-    session.setFlushMode(AsyncKuduSession.FlushMode.AUTO_FLUSH_SYNC);
-    Update update = createUpdate(30);
-    PartialRow row = update.getRow();
-    row.addInt(2, 999);
-    row.addString(3, "updated data");
-    d = session.apply(update);
-    d.addErrback(defaultErrorCB);
-    d.join(DEFAULT_SLEEP);
-    assertEquals(31, countInRange(0, 31));
-
-    Delete del = createDelete(30);
-    d = session.apply(del);
-    d.addErrback(defaultErrorCB);
-    d.join(DEFAULT_SLEEP);
-    assertEquals(30, countInRange(0, 31));
-
-    session.setFlushMode(AsyncKuduSession.FlushMode.MANUAL_FLUSH);
-    session.setMutationBufferSpace(35);
-    for (int i = 0; i < 20; i++) {
-      buffered = session.apply(createDelete(i));
-    }
-    assertEquals(30, countInRange(0, 31));
-    session.flush();
-    buffered.join(DEFAULT_SLEEP);
-    assertEquals(10, countInRange(0, 31));
-
-    for (int i = 30; i < 40; i++) {
+    // In AUTO_FLUSH_BACKGROUND mode, the session can accept up to 2x the buffer size of ops before
+    // it might be out of buffer space (depending on if a background flush finishes).
+    for (int i = 0; i < 2 * kBufferSizeOps; i++) {
       session.apply(createInsert(i));
     }
 
-    for (int i = 20; i < 30; i++) {
-      buffered = session.apply(createDelete(i));
-    }
-
-    assertEquals(10, countInRange(0, 40));
-    session.flush();
-    buffered.join(DEFAULT_SLEEP);
-    assertEquals(10, countInRange(0, 40));
-
-    // Test nulls
-    // add 10 rows with the nullable column set to null
-    session.setFlushMode(AsyncKuduSession.FlushMode.AUTO_FLUSH_SYNC);
-    for (int i = 40; i < 50; i++) {
-      session.apply(createInsertWithNull(i)).join(DEFAULT_SLEEP);
-    }
-
-    // now scan those rows and make sure the column is null
-    assertEquals(10, countNullColumns(40, 50));
-
-    // Test sending edits too fast
-    session.setFlushMode(AsyncKuduSession.FlushMode.AUTO_FLUSH_BACKGROUND);
-    session.setMutationBufferSpace(10);
-
-    // This used to test that inserting too many operations into the buffer caused a
-    // PleaseThrottleException. However, it is inherently racy and flaky.
-    // TODO(wdberkeley): Add a test for behavior when the client is applying operations faster than
-    //                   they can be flushed.
-    for (int i = 50; i < 71; i++) {
+    // If the client tries to buffer many more operations, it may receive a PleaseThrottleException.
+    // In this case, if the client simply waits for a flush notification on the Deferred returned
+    // with the exception, it can continue to buffer operations.
+    final int kNumOpsMultipler = 10;
+    for (int i = 2 * kBufferSizeOps; i < kNumOpsMultipler * kBufferSizeOps; i++) {
+      Insert insert = createInsert(i);
       try {
-        session.apply(createInsert(i));
+        session.apply(insert);
       } catch (PleaseThrottleException ex) {
-        assertEquals(70, i);
-        // Wait for the buffer to clear
         ex.getDeferred().join(DEFAULT_SLEEP);
-        session.apply(ex.getFailedRpc());
-        session.flush().join(DEFAULT_SLEEP);
+        session.apply(insert);
       }
     }
-    //assertTrue("Expected PleaseThrottleException", gotException);
-    assertEquals(21, countInRange(50, 71));
 
-    // Now test a more subtle issue, basically the race where we call flush from the client when
-    // there's a batch already in flight. We need to finish joining only when all the data is
-    // flushed.
-    for (int i = 71; i < 91; i++) {
-      session.apply(createInsert(i));
-    }
+    // After a final call to `flush` all operations should be visible to this client.
     session.flush().join(DEFAULT_SLEEP);
-    // If we only waited after the in flight batch, there would be 10 rows here.
-    assertEquals(20, countInRange(71, 91));
-
-    // Test empty scanner projection
-    AsyncKuduScanner scanner = getScanner(71, 91, Collections.<String>emptyList());
-    assertEquals(20, countRowsInScan(scanner));
-
-    // Test removing the connection and then do a rapid set of inserts
-    client.getConnectionListCopy().get(0).disconnect()
-        .awaitUninterruptibly(DEFAULT_SLEEP);
-    session.setMutationBufferSpace(1);
-    for (int i = 91; i < 101; i++) {
+    assertEquals(kNumOpsMultipler * kBufferSizeOps, countRowsInTable(table));
+  }
+
+  /**
+   * Test a tablet going missing or encountering a new tablet while inserting a lot of data. This
+   * code used to fail in many different ways.
+   * @throws Exception
+   */
+  @Test(timeout = 100000)
+  public void testTabletCacheInvalidatedDuringWrites() throws Exception {
+    final int kNumOps = 10000;
+    session.setFlushMode(AsyncKuduSession.FlushMode.AUTO_FLUSH_BACKGROUND);
+
+    // Insert 2 * kNumOps rows, but drop the locations cache partway through.
+    for (int i = 0; i < kNumOps; i++) {
+      Insert insert = createInsert(i);
       try {
-        session.apply(createInsert(i));
+        session.apply(insert);
       } catch (PleaseThrottleException ex) {
-        // Wait for the buffer to clear
         ex.getDeferred().join(DEFAULT_SLEEP);
-        session.apply(ex.getFailedRpc());
+        session.apply(insert);
       }
     }
-    session.flush().join(DEFAULT_SLEEP);
-    assertEquals(10, countInRange(91, 101));
 
-    // Test a tablet going missing or encountering a new tablet while inserting a lot
-    // of data. This code used to fail in many different ways.
     client.emptyTabletsCacheForTable(table.getTableId());
-    for (int i = 101; i < 151; i++) {
+
+    for (int i = kNumOps; i < 2 * kNumOps; i++) {
       Insert insert = createInsert(i);
-      while (true) {
-        try {
-          session.apply(insert);
-          break;
-        } catch (PleaseThrottleException ex) {
-          // Wait for the buffer to clear
-          ex.getDeferred().join(DEFAULT_SLEEP);
-        }
+      try {
+        session.apply(insert);
+      } catch (PleaseThrottleException ex) {
+        ex.getDeferred().join(DEFAULT_SLEEP);
+        session.apply(insert);
       }
     }
+
     session.flush().join(DEFAULT_SLEEP);
-    assertEquals(50, countInRange(101, 151));
+    assertEquals(2 * kNumOps, countRowsInTable(table));
   }
 
+  // A helper just to make some lines shorter.
   private Insert createInsert(int key) {
     return createBasicSchemaInsert(table, key);
   }
 
-  private Insert createInsertWithNull(int key) {
-    Insert insert = table.newInsert();
-    PartialRow row = insert.getRow();
-    row.addInt(0, key);
-    row.addInt(1, 2);
-    row.addInt(2, 3);
-    row.setNull(3);
-    row.addBoolean(4, false);
-    return insert;
-  }
-
-  private Update createUpdate(int key) {
-    Update update = table.newUpdate();
-    PartialRow row = update.getRow();
-    row.addInt(0, key);
-    return update;
-  }
-
-  private Delete createDelete(int key) {
-    Delete delete = table.newDelete();
-    PartialRow row = delete.getRow();
-    row.addInt(0, key);
-    return delete;
-  }
-
-  private boolean exists(final int key) throws Exception {
-    AsyncKuduScanner scanner = getScanner(key, key + 1);
-    final AtomicBoolean exists = new AtomicBoolean(false);
-
-    Callback<Object, RowResultIterator> cb =
-        new Callback<Object, RowResultIterator>() {
-      @Override
-      public Object call(RowResultIterator arg) throws Exception {
-        if (arg == null) return null;
-        for (RowResult row : arg) {
-          if (row.getInt(0) == key) {
-            exists.set(true);
-            break;
-          }
-        }
-        return null;
-      }
-    };
-
-    while (scanner.hasMoreRows()) {
-      Deferred<RowResultIterator> data = scanner.nextRows();
-      data.addCallbacks(cb, defaultErrorCB);
-      data.join(DEFAULT_SLEEP);
-      if (exists.get()) {
-        break;
-      }
-    }
-
-    Deferred<RowResultIterator> closer = scanner.close();
-    closer.join(DEFAULT_SLEEP);
-    return exists.get();
-  }
-
-  private int countNullColumns(final int startKey, final int endKey) throws Exception {
-    AsyncKuduScanner scanner = getScanner(startKey, endKey);
-    final AtomicInteger ai = new AtomicInteger();
-
-    Callback<Object, RowResultIterator> cb = new Callback<Object, RowResultIterator>() {
-      @Override
-      public Object call(RowResultIterator arg) throws Exception {
-        if (arg == null) return null;
-        for (RowResult row : arg) {
-          if (row.isNull(3)) {
-            ai.incrementAndGet();
-          }
-        }
-        return null;
-      }
-    };
-
-    while (scanner.hasMoreRows()) {
-      Deferred<RowResultIterator> data = scanner.nextRows();
-      data.addCallbacks(cb, defaultErrorCB);
-      data.join(DEFAULT_SLEEP);
-    }
-
-    Deferred<RowResultIterator> closer = scanner.close();
-    closer.join(DEFAULT_SLEEP);
-    return ai.get();
-  }
-
-  private int countInRange(final int start, final int exclusiveEnd) throws Exception {
-    return countRowsInScan(getScanner(start, exclusiveEnd));
-  }
-
-  private AsyncKuduScanner getScanner(int start, int exclusiveEnd) {
-    return getScanner(start, exclusiveEnd, null);
-  }
-
-  private AsyncKuduScanner getScanner(int start, int exclusiveEnd,
-                                             List<String> columnNames) {
-    PartialRow lowerBound = SCHEMA.newPartialRow();
-    lowerBound.addInt(SCHEMA.getColumnByIndex(0).getName(), start);
-
-    PartialRow upperBound = SCHEMA.newPartialRow();
-    upperBound.addInt(SCHEMA.getColumnByIndex(0).getName(), exclusiveEnd);
-
-    return client.newScannerBuilder(table)
-        .lowerBound(lowerBound)
-        .exclusiveUpperBound(upperBound)
-        .setProjectedColumnNames(columnNames)
-        .build();
-  }
-
   private TabletServerErrorPB makeTabletServerError() {
     return TabletServerErrorPB.newBuilder()
         .setCode(TabletServerErrorPB.Code.UNKNOWN_ERROR)


Mime
View raw message