kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a...@apache.org
Subject kudu git commit: [java client] Redirect KuduExceptions to RowError in KuduSession
Date Mon, 07 Nov 2016 22:36:50 GMT
Repository: kudu
Updated Branches:
  refs/heads/master a97004a91 -> e85e44c78


[java client] Redirect KuduExceptions to RowError in KuduSession

Until now, the only RowErrors that users could see were the ones
sent from the tablet servers. Any other client-side error was surfaced as an
exception, even timeouts. The other problem with timeouts is that, when using
AUTO_FLUSH_BACKGROUND, background flush responses that the client isn't waiting
on would get lost since we don't have an equivalent to the error collector
for exceptions. You could find them in the log, but that's it.

This patch augments the errbacks in AsyncKuduSession to start creating
OperationResponses for KuduExceptions. For those cases, we return an
OperationResponse (or a list of them), which switches us from the _errback_ to the
_callback_ track. Other exceptions are still thrown.

This patch also makes some changes for an issue that Adar saw in TestAsyncKuduSession
that was kind of hard to debug: a stray buffer was being flushed against a table that
was deleted (but then the table's name was reused, which made reading the logs harder).

The stray buffer most likely came from testBatchErrorCauseSessionStuck which isn't
checking for all the error conditions. This patch adds more checking but doesn't fix
the root cause (which is currently unknown) so it *may* make it fail more often,
but in more obvious ways.

Change-Id: Ie871cde658036d04b9c07a3efe2fdfb4a7e98273
Reviewed-on: http://gerrit.cloudera.org:8080/4949
Tested-by: Kudu Jenkins
Reviewed-by: Adar Dembo <adar@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/e85e44c7
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/e85e44c7
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/e85e44c7

Branch: refs/heads/master
Commit: e85e44c78ee78e016f78584f7332d626875bfabb
Parents: a97004a
Author: Jean-Daniel Cryans <jdcryans@apache.org>
Authored: Wed Nov 2 16:23:39 2016 -0700
Committer: Adar Dembo <adar@cloudera.com>
Committed: Mon Nov 7 22:36:18 2016 +0000

----------------------------------------------------------------------
 .../apache/kudu/client/AsyncKuduSession.java    |  64 ++++++++++--
 .../apache/kudu/client/TestAsyncKuduClient.java |   9 +-
 .../kudu/client/TestAsyncKuduSession.java       | 104 ++++++++++++-------
 .../org/apache/kudu/client/TestKuduSession.java |   9 +-
 .../org/apache/kudu/client/TestKuduTable.java   |   9 +-
 .../org/apache/kudu/client/TestTimeouts.java    |  10 +-
 6 files changed, 136 insertions(+), 69 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/e85e44c7/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduSession.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduSession.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduSession.java
index 5b126f2..b8b3ea1 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduSession.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduSession.java
@@ -502,7 +502,8 @@ public class AsyncKuduSession implements SessionConfiguration {
       }
       operation.setExternalConsistencyMode(this.consistencyMode);
       operation.setIgnoreAllDuplicateRows(ignoreAllDuplicateRows);
-      return client.sendRpcToTablet(operation);
+      return client.sendRpcToTablet(operation)
+          .addErrback(new SingleOperationErrCallback(operation));
     }
 
     // Kick off a location lookup.
@@ -657,10 +658,13 @@ public class AsyncKuduSession implements SessionConfiguration {
 
         // Send individualized responses to all the operations in this batch.
         for (OperationResponse operationResponse : response.getIndividualResponses()) {
-          operationResponse.getOperation().callback(operationResponse);
           if (flushMode == FlushMode.AUTO_FLUSH_BACKGROUND && operationResponse.hasRowError()) {
             errorCollector.addError(operationResponse.getRowError());
           }
+
+          // Fire the callback after collecting the error so that the error is visible should the
+          // callback interrogate the error collector.
+          operationResponse.getOperation().callback(operationResponse);
         }
 
         return response;
@@ -672,14 +676,37 @@ public class AsyncKuduSession implements SessionConfiguration {
       }
     }
 
-    final class BatchErrCallback implements Callback<Exception, Exception> {
+    final class BatchErrCallback implements Callback<Object, Exception> {
       @Override
-      public Exception call(Exception e) {
-        // Send the same exception to all the operations.
+      public Object call(Exception e) {
+        // If the exception we receive is a KuduException we're going to build OperationResponses.
+        Status status = null;
+        List<OperationResponse> responses = null;
+        boolean handleKuduException = e instanceof KuduException;
+        if (handleKuduException) {
+          status = ((KuduException) e).getStatus();
+          responses = new ArrayList<>(request.operations.size());
+        }
+
         for (Operation operation : request.operations) {
-          operation.errback(e);
+          // Same comment as in BatchCallback regarding the ordering of when to callback.
+          if (handleKuduException) {
+            RowError rowError = new RowError(status, operation);
+            OperationResponse response = new OperationResponse(0, null, 0, operation, rowError);
+            errorCollector.addError(rowError);
+            responses.add(response);
+
+            operation.callback(response);
+          } else {
+            // We have no idea what the exception is so we'll just send it up.
+            operation.errback(e);
+          }
         }
-        return e;
+
+        // Note that returning an object that's not an exception will make us leave the
+        // errback chain. Effectively, the BatchResponse below will end up as part of the list
+        // passed to ConvertBatchToListOfResponsesCB.
+        return handleKuduException ? new BatchResponse(responses) : e;
       }
       @Override
       public String toString() {
@@ -691,6 +718,29 @@ public class AsyncKuduSession implements SessionConfiguration {
   }
 
   /**
+   * Analogous to BatchErrCallback above but for AUTO_FLUSH_SYNC which doesn't handle lists of
+   * operations and responses.
+   */
+  private final class SingleOperationErrCallback implements Callback<Object, Exception> {
+
+    private final Operation operation;
+
+    private SingleOperationErrCallback(Operation operation) {
+      this.operation = operation;
+    }
+
+    @Override
+    public Object call(Exception e) throws Exception {
+      if (e instanceof KuduException) {
+        Status status = ((KuduException) e).getStatus();
+        RowError rowError = new RowError(status, operation);
+        return new OperationResponse(0, null, 0, operation, rowError);
+      }
+      return e;
+    }
+  }
+
+  /**
    * A FlusherTask is created for each active buffer in mode
    * {@link FlushMode#AUTO_FLUSH_BACKGROUND}.
    */

http://git-wip-us.apache.org/repos/asf/kudu/blob/e85e44c7/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduClient.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduClient.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduClient.java
index ed6d21a..815f767 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduClient.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduClient.java
@@ -218,11 +218,8 @@ public class TestAsyncKuduClient extends BaseKuduTest {
     // Try the same thing with an insert.
     KuduSession session = syncClient.newSession();
     session.setTimeoutMillis(1000);
-    try {
-      session.apply(createBasicSchemaInsert(table, 1));
-      fail("The insert should timeout");
-    } catch (NonRecoverableException ex) {
-      assertTrue(ex.getStatus().isTimedOut());
-    }
+    OperationResponse response = session.apply(createBasicSchemaInsert(table, 1));
+    assertTrue(response.hasRowError());
+    assertTrue(response.getRowError().getErrorStatus().isTimedOut());
   }
 }

http://git-wip-us.apache.org/repos/asf/kudu/blob/e85e44c7/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduSession.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduSession.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduSession.java
index 2a8a341..ac57401 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduSession.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduSession.java
@@ -22,8 +22,6 @@ import org.apache.kudu.tserver.Tserver.TabletServerErrorPB;
 
 import com.stumbleupon.async.Callback;
 import com.stumbleupon.async.Deferred;
-import com.stumbleupon.async.DeferredGroupException;
-import com.stumbleupon.async.TimeoutException;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
@@ -59,6 +57,30 @@ public class TestAsyncKuduSession extends BaseKuduTest {
     table = createTable(TABLE_NAME, schema, getBasicCreateTableOptions());
   }
 
+
+  @Test(timeout = 100000)
+  public void testBackgroundErrors() throws Exception {
+    try {
+      AsyncKuduSession session = client.newSession();
+      session.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND);
+      session.setFlushInterval(10);
+      Batch.injectTabletServerErrorAndLatency(makeTabletServerError(), 0);
+
+      try {
+        OperationResponse resp = session.apply(createInsert(1)).join(DEFAULT_SLEEP);
+        assertTrue(resp.hasRowError());
+        assertTrue(
+            resp.getRowError().getErrorStatus()
+                .getMessage().contains(getTabletServerErrorMessage()));
+      } catch (Exception e) {
+        fail("Should not throw");
+      }
+      assertEquals(1, session.countPendingErrors());
+    } finally {
+      Batch.injectTabletServerErrorAndLatency(null, 0);
+    }
+  }
+
   /**
    * Regression test for case where an error in the previous batch could cause the next
    * batch to hang in flush()
@@ -69,41 +91,37 @@ public class TestAsyncKuduSession extends BaseKuduTest {
       AsyncKuduSession session = client.newSession();
       session.setFlushMode(AsyncKuduSession.FlushMode.AUTO_FLUSH_BACKGROUND);
       session.setFlushInterval(100);
-      TabletServerErrorPB error = TabletServerErrorPB.newBuilder()
-          .setCode(TabletServerErrorPB.Code.UNKNOWN_ERROR)
-          .setStatus(AppStatusPB.newBuilder()
-              .setCode(AppStatusPB.ErrorCode.UNKNOWN_ERROR)
-              .setMessage("injected error for test")
-              .build())
-          .build();
-      Batch.injectTabletServerErrorAndLatency(error, 200);
+      Batch.injectTabletServerErrorAndLatency(makeTabletServerError(), 200);
       // 0ms: insert first row, which will be the first batch.
       Deferred<OperationResponse> resp1 = session.apply(createInsert(1));
       Thread.sleep(120);
       // 100ms: start to send first batch.
       // 100ms+: first batch got response from ts,
-      //         will wait 200s and throw erorr.
+      //         will wait 200s and throw error.
       // 120ms: insert another row, which will be the second batch.
       Deferred<OperationResponse> resp2 = session.apply(createInsert(2));
       // 220ms: start to send the second batch, but first batch is inflight,
       //        so add callback to retry after first batch finishes.
       // 300ms: first batch's callback handles error, retry second batch.
       try {
-        resp1.join(2000);
-      } catch (TimeoutException e) {
-        fail("First batch should not timeout in case of tablet server error");
-      } catch (KuduException e) {
-        // Expected.
-        assertTrue(e.getMessage().contains("injected error for test"));
+        OperationResponse resp = resp1.join(DEFAULT_SLEEP);
+        assertTrue(resp.hasRowError());
+        assertTrue(
+            resp.getRowError().getErrorStatus()
+                .getMessage().contains(getTabletServerErrorMessage()));
+      } catch (Exception e) {
+        fail("Should not throw");
       }
       try {
-        resp2.join(2000);
-      } catch (TimeoutException e) {
-        fail("Second batch should not timeout in case of tablet server error");
-      } catch (KuduException e) {
-        // expected
-        assertTrue(e.getMessage().contains("injected error for test"));
+        OperationResponse resp = resp2.join(DEFAULT_SLEEP);
+        assertTrue(resp.hasRowError());
+        assertTrue(
+            resp.getRowError().getErrorStatus()
+                .getMessage().contains(getTabletServerErrorMessage()));
+      } catch (Exception e) {
+        fail("Should not throw");
       }
+      assertFalse(session.hasPendingOperations());
     } finally {
       Batch.injectTabletServerErrorAndLatency(null, 0);
     }
@@ -136,14 +154,10 @@ public class TestAsyncKuduSession extends BaseKuduTest {
         }
         Thread.sleep(100);
       }
-      try {
-        session.apply(createInsert(1)).join(DEFAULT_SLEEP);
-        fail("Insert should not succeed");
-      } catch (KuduException e) {
-        assertTrue(e.getStatus().isNotFound());
-      } catch (Throwable e) {
-        fail("Should not throw other error: " + e);
-      }
+
+      OperationResponse response = session.apply(createInsert(1)).join(DEFAULT_SLEEP);
+      assertTrue(response.hasRowError());
+      assertTrue(response.getRowError().getErrorStatus().isNotFound());
     } finally {
       table = createTable(TABLE_NAME, schema, getBasicCreateTableOptions());
     }
@@ -156,16 +170,16 @@ public class TestAsyncKuduSession extends BaseKuduTest {
     try {
       AsyncKuduSession session = client.newSession();
       session.setTimeoutMillis(1);
+      OperationResponse response = session.apply(createInsert(1)).join();
+      assertTrue(response.hasRowError());
+      assertTrue(response.getRowError().getErrorStatus().isTimedOut());
+
       session.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH);
       Insert insert = createInsert(1);
       session.apply(insert);
-      try {
-        session.flush().join();
-        fail("expected exception");
-      } catch (DeferredGroupException e) {
-        assertEquals(1, e.results().size());
-        assertTrue(e.results().get(0).toString().contains("timeout"));
-      }
+      List<OperationResponse> responses = session.flush().join();
+      assertEquals(1, responses.size());
+      assertTrue(responses.get(0).getRowError().getErrorStatus().isTimedOut());
     } finally {
       restartTabletServers();
     }
@@ -554,4 +568,18 @@ public class TestAsyncKuduSession extends BaseKuduTest {
         .build();
     return scanner;
   }
+
+  private TabletServerErrorPB makeTabletServerError() {
+    return TabletServerErrorPB.newBuilder()
+        .setCode(TabletServerErrorPB.Code.UNKNOWN_ERROR)
+        .setStatus(AppStatusPB.newBuilder()
+            .setCode(AppStatusPB.ErrorCode.UNKNOWN_ERROR)
+            .setMessage(getTabletServerErrorMessage())
+            .build())
+        .build();
+  }
+
+  private String getTabletServerErrorMessage() {
+    return "injected error for test";
+  }
 }

http://git-wip-us.apache.org/repos/asf/kudu/blob/e85e44c7/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduSession.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduSession.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduSession.java
index bf98157..c6b6bc2 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduSession.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduSession.java
@@ -270,12 +270,9 @@ public class TestKuduSession extends BaseKuduTest {
 
     List<Integer> nonCoveredKeys = ImmutableList.of(350, 300, 199, 150, 100, -1, -50);
     for (int key : nonCoveredKeys) {
-      try {
-        session.apply(createBasicSchemaInsert(table, key));
-        fail("apply should have thrown");
-      } catch (KuduException e) {
-        assertTrue(e.getStatus().isNotFound());
-      }
+      OperationResponse response = session.apply(createBasicSchemaInsert(table, key));
+      assertTrue(response.hasRowError());
+      assertTrue(response.getRowError().getErrorStatus().isNotFound());
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/e85e44c7/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduTable.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduTable.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduTable.java
index 4f0079a..42dc4fe 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduTable.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduTable.java
@@ -312,12 +312,9 @@ public class TestKuduTable extends BaseKuduTest {
     syncClient.alterTable(tableName, ato);
 
     insert = createBasicSchemaInsert(table, 202);
-    try {
-      session.apply(insert);
-      fail("Should get a non-recoverable");
-    } catch (NonCoveredRangeException e) {
-      // Expected.
-    }
+    OperationResponse response = session.apply(insert);
+    assertTrue(response.hasRowError());
+    assertTrue(response.getRowError().getErrorStatus().isNotFound());
   }
 
   public KuduTable createTableWithSplitsAndTest(int splitsCount) throws Exception {

http://git-wip-us.apache.org/repos/asf/kudu/blob/e85e44c7/java/kudu-client/src/test/java/org/apache/kudu/client/TestTimeouts.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestTimeouts.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestTimeouts.java
index 28d15bf..12db70a 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestTimeouts.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestTimeouts.java
@@ -50,12 +50,10 @@ public class TestTimeouts extends BaseKuduTest {
 
     KuduSession lowTimeoutSession = lowTimeoutsClient.newSession();
 
-    try {
-      lowTimeoutSession.apply(createBasicSchemaInsert(table, 1));
-      fail("Should have timed out");
-    } catch (KuduException ex) {
-      assertTrue(ex.getStatus().isTimedOut());
-    }
+
+    OperationResponse response = lowTimeoutSession.apply(createBasicSchemaInsert(table, 1));
+    assertTrue(response.hasRowError());
+    assertTrue(response.getRowError().getErrorStatus().isTimedOut());
 
     KuduScanner lowTimeoutScanner = lowTimeoutsClient.newScannerBuilder(table).build();
     try {


Mime
View raw message