kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From t...@apache.org
Subject [3/3] incubator-kudu git commit: [java client] - Current batch may get stuck if inflight batch got error
Date Tue, 05 Jan 2016 18:53:15 GMT
[java client] - Current batch may get stuck if inflight batch got error

This commit fixes a bug in the Java client that could cause write operations
to get stuck forever. Before this commit, if a batch flush was scheduled while
another batch for the same tablet was already in flight, we added a callback
to retry flushing the batch after the in-flight batch completed successfully.
However, if the in-flight batch failed, the default PASSTHROUGH errback was
called instead, and the pending batch was never flushed. This commit adds the
retry flush to both the callback and the errback.

Also, more information is added to the trace logging to help with debugging.

Change-Id: Icadfe78a1d70ac6490d36b0801c6d4fa30939955
Reviewed-on: http://gerrit.cloudera.org:8080/1565
Reviewed-by: Mike Percy <mpercy@cloudera.com>
Tested-by: Mike Percy <mpercy@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kudu/commit/63b9268f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kudu/tree/63b9268f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kudu/diff/63b9268f

Branch: refs/heads/master
Commit: 63b9268f90221570cf753ca706f1d22d0932f408
Parents: c367baa
Author: Binglin Chang <decstery@gmail.com>
Authored: Mon Nov 30 13:42:46 2015 +0800
Committer: Mike Percy <mpercy@cloudera.com>
Committed: Tue Jan 5 09:34:36 2016 +0000

----------------------------------------------------------------------
 .../org/kududb/client/AsyncKuduSession.java     | 76 +++++++++++++++++---
 .../src/main/java/org/kududb/client/Batch.java  | 33 +++++++++
 .../org/kududb/client/TestAsyncKuduSession.java | 53 ++++++++++++++
 3 files changed, 153 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/63b9268f/java/kudu-client/src/main/java/org/kududb/client/AsyncKuduSession.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/kududb/client/AsyncKuduSession.java b/java/kudu-client/src/main/java/org/kududb/client/AsyncKuduSession.java
index ca4e6b2..7bd2533 100644
--- a/java/kudu-client/src/main/java/org/kududb/client/AsyncKuduSession.java
+++ b/java/kudu-client/src/main/java/org/kududb/client/AsyncKuduSession.java
@@ -627,14 +627,27 @@ public class AsyncKuduSession implements SessionConfiguration {
       // was already flushed.
       if (operations.get(tablet) != expectedBatch) {
         LOG.trace("Had to flush a tablet but it was already flushed: " + Bytes.getString(tablet));
+        // It is OK to return null here, since we currently do not use the returned value
+        // when doing background flush or auto flushing when buffer is full.
+        // The returned value is used when doing manual flush, but it will not run into this
+        // condition, or there is a bug.
         return Deferred.fromResult(null);
       }
 
       if (operationsInFlight.containsKey(tablet)) {
-        LOG.trace("This tablet is already in flight, attaching a callback to retry later:
" +
-            Bytes.getString(tablet));
-        return operationsInFlight.get(tablet).addCallbackDeferring(
-            new FlushRetryCallback(tablet, operations.get(tablet)));
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("Tablet " + Bytes.getString(tablet)
+              + " is already in flight, attaching a callback to retry "
+              + expectedBatch.toDebugString() + " later.");
+        }
+        // No matter previous batch get error or not, we still have to flush this batch.
+        FlushRetryCallback retryCallback = new FlushRetryCallback(tablet, operations.get(tablet));
+        FlushRetryErrback retryErrback = new FlushRetryErrback(tablet, operations.get(tablet));
+        // Note that if we do manual flushing multiple times when previous batch is still
inflight,
+        // we may add the same callback multiple times, later retry of flushTablet will return
null
+        // immediately. Since it is an illegal use case, we do not handle this currently.
+        operationsInFlight.get(tablet).addCallbacks(retryCallback, retryErrback);
+        return expectedBatch.getDeferred();
       }
 
       batch = operations.remove(tablet);
@@ -660,7 +673,7 @@ public class AsyncKuduSession implements SessionConfiguration {
    * Simple callback so that we try to flush this tablet again if we were waiting on the
previous
    * Batch to finish.
    */
-  class FlushRetryCallback implements Callback<Deferred<BatchResponse>, BatchResponse>
{
+  class FlushRetryCallback implements Callback<BatchResponse, BatchResponse> {
     private final Slice tablet;
     private final Batch expectedBatch;
     public FlushRetryCallback(Slice tablet, Batch expectedBatch) {
@@ -669,10 +682,45 @@ public class AsyncKuduSession implements SessionConfiguration {
     }
 
     @Override
-    public Deferred<BatchResponse> call(BatchResponse o) throws Exception {
-      LOG.trace("Previous batch in flight is done, flushing this tablet again: " +
-          Bytes.getString(tablet));
-      return flushTablet(tablet, expectedBatch);
+    public BatchResponse call(BatchResponse o) throws Exception {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Previous batch in flight is done. " + toString());
+      }
+      flushTablet(tablet, expectedBatch);
+      return o;
+    }
+
+    @Override
+    public String toString() {
+      return String.format("FlushRetryCallback: retry flush tablet %s %s", Bytes.getString(tablet),
+          expectedBatch.toDebugString());
+    }
+  }
+
+  /**
+   * Same callback as above FlushRetryCallback, for the case that previous batch has error.
+   */
+  class FlushRetryErrback implements Callback<Exception, Exception> {
+    private final Slice tablet;
+    private final Batch expectedBatch;
+    public FlushRetryErrback(Slice tablet, Batch expectedBatch) {
+      this.tablet = tablet;
+      this.expectedBatch = expectedBatch;
+    }
+
+    @Override
+    public Exception call(Exception e) throws Exception {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Previous batch ended with an error. " + toString());
+      }
+      flushTablet(tablet, expectedBatch);
+      return e;
+    }
+
+    @Override
+    public String toString() {
+      return String.format("FlushRetryErrback: retry flush tablet %s %s", Bytes.getString(tablet),
+          expectedBatch.toDebugString());
     }
   }
 
@@ -687,6 +735,11 @@ public class AsyncKuduSession implements SessionConfiguration {
         tabletInFlightDone(tablet);
         return o;
       }
+
+      @Override
+      public String toString() {
+        return "callback: mark tablet " + Bytes.getString(tablet) + " inflight done";
+      }
     };
   }
 
@@ -702,6 +755,11 @@ public class AsyncKuduSession implements SessionConfiguration {
         tabletInFlightDone(tablet);
         return e;
       }
+
+      @Override
+      public String toString() {
+        return "errback: mark tablet " + Bytes.getString(tablet) + " inflight done";
+      }
     };
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/63b9268f/java/kudu-client/src/main/java/org/kududb/client/Batch.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/kududb/client/Batch.java b/java/kudu-client/src/main/java/org/kududb/client/Batch.java
index aa806ec..e084933 100644
--- a/java/kudu-client/src/main/java/org/kududb/client/Batch.java
+++ b/java/kudu-client/src/main/java/org/kududb/client/Batch.java
@@ -13,11 +13,14 @@
 // limitations under the License.
 package org.kududb.client;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.protobuf.Message;
 import com.google.protobuf.ZeroCopyLiteralByteString;
+
 import org.kududb.WireProtocol;
 import org.kududb.annotations.InterfaceAudience;
 import org.kududb.tserver.Tserver;
+import org.kududb.tserver.Tserver.TabletServerErrorPB;
 import org.kududb.util.Pair;
 import org.jboss.netty.buffer.ChannelBuffer;
 
@@ -100,6 +103,17 @@ class Batch extends KuduRpc<BatchResponse> implements KuduRpc.HasKey
{
 
     BatchResponse response = new BatchResponse(deadlineTracker.getElapsedMillis(), tsUUID,
         builder.getTimestamp(), errorsPB, ops);
+
+    if (injectedError != null) {
+      if (injectedlatencyMs > 0) {
+        try {
+          Thread.sleep(injectedlatencyMs);
+        } catch (InterruptedException e) {
+        }
+      }
+      return new Pair<BatchResponse, Object>(response, injectedError);
+    }
+
     return new Pair<BatchResponse, Object>(response, builder.hasError() ? builder.getError()
: null);
   }
 
@@ -109,6 +123,10 @@ class Batch extends KuduRpc<BatchResponse> implements KuduRpc.HasKey
{
     return this.ops.get(0).partitionKey();
   }
 
+  public String toDebugString() {
+    return "Batch(" + ops.size() + " ops)@" + Integer.toHexString(hashCode());
+  }
+
   /**
    * Sorts the Operations by their sequence number.
    */
@@ -118,4 +136,19 @@ class Batch extends KuduRpc<BatchResponse> implements KuduRpc.HasKey
{
       return Long.compare(o1.getSequenceNumber(), o2.getSequenceNumber());
     }
   }
+
+  private static TabletServerErrorPB injectedError;
+  private static int injectedlatencyMs;
+
+  /**
+   * Inject tablet server side error for Batch rpc related tests.
+   * @param error error response from tablet server
+   * @param latencyMs blocks response handling thread for some time to simulate
+   * write latency
+   */
+  @VisibleForTesting
+  static void injectTabletServerErrorAndLatency(TabletServerErrorPB error, int latencyMs)
{
+    injectedError = error;
+    injectedlatencyMs = latencyMs;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/63b9268f/java/kudu-client/src/test/java/org/kududb/client/TestAsyncKuduSession.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/kududb/client/TestAsyncKuduSession.java b/java/kudu-client/src/test/java/org/kududb/client/TestAsyncKuduSession.java
index 66341fa..6a97f9d 100644
--- a/java/kudu-client/src/test/java/org/kududb/client/TestAsyncKuduSession.java
+++ b/java/kudu-client/src/test/java/org/kududb/client/TestAsyncKuduSession.java
@@ -14,8 +14,11 @@
 package org.kududb.client;
 
 import org.kududb.Schema;
+import org.kududb.WireProtocol.AppStatusPB;
+import org.kududb.tserver.Tserver.TabletServerErrorPB;
 import com.stumbleupon.async.Callback;
 import com.stumbleupon.async.Deferred;
+import com.stumbleupon.async.TimeoutException;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
@@ -51,6 +54,56 @@ public class TestAsyncKuduSession extends BaseKuduTest {
     table = createTable(TABLE_NAME, schema, new CreateTableOptions());
   }
 
+  /**
+   * Regression test for case where an error in the previous batch could cause the next
+   * batch to hang in flush()
+   */
+  @Test(timeout = 100000)
+  public void testBatchErrorCauseSessionStuck() throws Exception {
+    try {
+      AsyncKuduSession session = client.newSession();
+      session.setFlushMode(AsyncKuduSession.FlushMode.AUTO_FLUSH_BACKGROUND);
+      session.setFlushInterval(100);
+      TabletServerErrorPB error = TabletServerErrorPB.newBuilder()
+          .setCode(TabletServerErrorPB.Code.UNKNOWN_ERROR)
+          .setStatus(AppStatusPB.newBuilder()
+              .setCode(AppStatusPB.ErrorCode.UNKNOWN_ERROR)
+              .setMessage("injected error for test")
+              .build())
+          .build();
+      Batch.injectTabletServerErrorAndLatency(error, 200);
+      // 0ms: insert first row, which will be the first batch.
+      Deferred<OperationResponse> resp1 = session.apply(createInsert(1));
+      Thread.sleep(120);
+      // 100ms: start to send first batch.
+      // 100ms+: first batch got response from ts,
+      //         will wait 200s and throw erorr.
+      // 120ms: insert another row, which will be the second batch.
+      Deferred<OperationResponse> resp2 = session.apply(createInsert(2));
+      // 220ms: start to send the second batch, but first batch is inflight,
+      //        so add callback to retry after first batch finishes.
+      // 300ms: first batch's callback handles error, retry second batch.
+      try {
+        resp1.join(2000);
+      } catch (TimeoutException e) {
+        fail("First batch should not timeout in case of tablet server error");
+      } catch (TabletServerErrorException e) {
+        // Expected.
+        assertTrue(e.getMessage().contains("injected error for test"));
+      }
+      try {
+        resp2.join(2000);
+      } catch (TimeoutException e) {
+        fail("Second batch should not timeout in case of tablet server error");
+      } catch (TabletServerErrorException e) {
+        // expected
+        assertTrue(e.getMessage().contains("injected error for test"));
+      }
+    } finally {
+      Batch.injectTabletServerErrorAndLatency(null, 0);
+    }
+  }
+
   @Test(timeout = 100000)
   public void test() throws Exception {
 


Mime
View raw message