kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a...@apache.org
Subject [2/8] incubator-kudu git commit: KUDU-1250. [java client] Track row errors in KuduSession when AUTO_FLUSH_BACKGROUND
Date Fri, 29 Jan 2016 21:28:47 GMT
KUDU-1250. [java client] Track row errors in KuduSession when AUTO_FLUSH_BACKGROUND

This patch adds a critically missing piece in the Java sync client.
It was impossible to track row errors when using AUTO_FLUSH_BACKGROUND.
You could still do it with the async client by attaching callbacks.

The APIs are named the same as in the C++ client.

Change-Id: I0216619f3a49f2b70b8719dac99a787cf46202a1
Reviewed-on: http://gerrit.cloudera.org:8080/1924
Reviewed-by: Todd Lipcon <todd@apache.org>
Tested-by: Jean-Daniel Cryans
Reviewed-by: Dan Burkert <dan@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kudu/commit/d70305c4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kudu/tree/d70305c4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kudu/diff/d70305c4

Branch: refs/heads/master
Commit: d70305c4f03015ffff8292446b0c9c580fc4dbc0
Parents: 48a7b2f
Author: Jean-Daniel Cryans <jdcryans@cloudera.com>
Authored: Wed Jan 27 11:23:20 2016 -0800
Committer: Jean-Daniel Cryans <jdcryans@gerrit.cloudera.org>
Committed: Fri Jan 29 18:45:35 2016 +0000

----------------------------------------------------------------------
 .../org/kududb/client/AsyncKuduSession.java     | 18 ++++++++-
 .../java/org/kududb/client/KuduSession.java     | 17 ++++++--
 .../org/kududb/client/SessionConfiguration.java | 41 +++++++++++++-------
 .../java/org/kududb/client/TestRowErrors.java   | 35 +++++++++++++----
 4 files changed, 87 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/d70305c4/java/kudu-client/src/main/java/org/kududb/client/AsyncKuduSession.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/kududb/client/AsyncKuduSession.java b/java/kudu-client/src/main/java/org/kududb/client/AsyncKuduSession.java
index 470d8af..e5fdb42 100644
--- a/java/kudu-client/src/main/java/org/kududb/client/AsyncKuduSession.java
+++ b/java/kudu-client/src/main/java/org/kududb/client/AsyncKuduSession.java
@@ -87,6 +87,7 @@ public class AsyncKuduSession implements SessionConfiguration {
 
   private final AsyncKuduClient client;
   private final Random randomizer = new Random();
+  private final ErrorCollector errorCollector;
   private int interval = 1000;
   private int mutationBufferSpace = 1000; // TODO express this in terms of data size.
   private float mutationBufferLowWatermarkPercentage = 0.5f;
@@ -143,6 +144,7 @@ public class AsyncKuduSession implements SessionConfiguration {
     this.consistencyMode = CLIENT_PROPAGATED;
     this.timeoutMs = client.getDefaultOperationTimeoutMs();
     setMutationBufferLowWatermark(this.mutationBufferLowWatermarkPercentage);
+    errorCollector = new ErrorCollector(mutationBufferSpace);
   }
 
   @Override
@@ -230,6 +232,16 @@ public class AsyncKuduSession implements SessionConfiguration {
     this.ignoreAllDuplicateRows = ignoreAllDuplicateRows;
   }
 
+  @Override
+  public int countPendingErrors() {
+    return errorCollector.countErrors();
+  }
+
+  @Override
+  public RowErrorsAndOverflowStatus getPendingErrors() {
+    return errorCollector.getErrors();
+  }
+
   /**
    * Flushes the buffered operations and marks this sessions as closed.
    * See the javadoc on {@link #flush()} on how to deal with exceptions coming out of this
method.
@@ -559,7 +571,7 @@ public class AsyncKuduSession implements SessionConfiguration {
 
   /**
    * Creates callbacks to handle a multi-put and adds them to the request.
-   * @param request The request for which we must handle the response.
+   * @param request the request for which we must handle the response
    */
   private void addBatchCallbacks(final Batch request) {
     final class BatchCallback implements
@@ -573,7 +585,11 @@ public class AsyncKuduSession implements SessionConfiguration {
         // Send individualized responses to all the operations in this batch.
         for (OperationResponse operationResponse : response.getIndividualResponses()) {
           operationResponse.getOperation().callback(operationResponse);
+          if (flushMode == FlushMode.AUTO_FLUSH_BACKGROUND && operationResponse.hasRowError())
{
+            errorCollector.addError(operationResponse.getRowError());
+          }
         }
+
         return response;
       }
 

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/d70305c4/java/kudu-client/src/main/java/org/kududb/client/KuduSession.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/kududb/client/KuduSession.java b/java/kudu-client/src/main/java/org/kududb/client/KuduSession.java
index c60043e..f13f9c6 100644
--- a/java/kudu-client/src/main/java/org/kududb/client/KuduSession.java
+++ b/java/kudu-client/src/main/java/org/kududb/client/KuduSession.java
@@ -54,9 +54,10 @@ public class KuduSession implements SessionConfiguration {
    * <li>AUTO_FLUSH_SYNC: the call returns when the operation is persisted,
    * else it throws an exception.
    * <li>AUTO_FLUSH_BACKGROUND: the call returns when the operation has been added
to the buffer.
-   * The operation's state is then unreachable, meaning that there's no way to know if the
-   * operation is persisted. This call should normally perform only fast in-memory operations
but
-   * it may have to wait when the buffer is full and there's another buffer being flushed.
+   * This call should normally perform only fast in-memory operations but
+   * it may have to wait when the buffer is full and there's another buffer being flushed.
Row
+   * errors can be checked by calling {@link #countPendingErrors()} and can be retrieved
by calling
+   * {@link #getPendingErrors()}.
    * <li>MANUAL_FLUSH: the call returns when the operation has been added to the buffer,
    * else it throws an exception such as a NonRecoverableException if the buffer is full.
    * </ul>
@@ -168,4 +169,14 @@ public class KuduSession implements SessionConfiguration {
   public void setIgnoreAllDuplicateRows(boolean ignoreAllDuplicateRows) {
     session.setIgnoreAllDuplicateRows(ignoreAllDuplicateRows);
   }
+
+  @Override
+  public int countPendingErrors() {
+    return session.countPendingErrors();
+  }
+
+  @Override
+  public RowErrorsAndOverflowStatus getPendingErrors() {
+    return session.getPendingErrors();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/d70305c4/java/kudu-client/src/main/java/org/kududb/client/SessionConfiguration.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/kududb/client/SessionConfiguration.java b/java/kudu-client/src/main/java/org/kududb/client/SessionConfiguration.java
index 24bfa2b..94e0a66 100644
--- a/java/kudu-client/src/main/java/org/kududb/client/SessionConfiguration.java
+++ b/java/kudu-client/src/main/java/org/kududb/client/SessionConfiguration.java
@@ -29,7 +29,7 @@ public interface SessionConfiguration {
 
   @InterfaceAudience.Public
   @InterfaceStability.Evolving
-  public enum FlushMode {
+  enum FlushMode {
     // Every write will be sent to the server in-band with the Apply()
     // call. No batching will occur. This is the default flush mode. In this
     // mode, the Flush() call never has any effect, since each Apply() call
@@ -58,21 +58,21 @@ public interface SessionConfiguration {
    * Get the current flush mode.
    * @return flush mode, AUTO_FLUSH_SYNC by default
    */
-  public FlushMode getFlushMode();
+  FlushMode getFlushMode();
 
   /**
    * Set the new flush mode for this session.
    * @param flushMode new flush mode, can be the same as the previous one.
    * @throws IllegalArgumentException if the buffer isn't empty.
    */
-  public void setFlushMode(FlushMode flushMode);
+  void setFlushMode(FlushMode flushMode);
 
   /**
    * Set the number of operations that can be buffered.
    * @param size number of ops.
    * @throws IllegalArgumentException if the buffer isn't empty.
    */
-  public void setMutationBufferSpace(int size);
+  void setMutationBufferSpace(int size);
 
   /**
    * Set the low watermark for this session. The default is set to half the mutation buffer
space.
@@ -85,51 +85,51 @@ public interface SessionConfiguration {
    * @throws IllegalArgumentException if the buffer isn't empty or if the watermark isn't
between
    * 0 and 1
    */
-  public void setMutationBufferLowWatermark(float mutationBufferLowWatermarkPercentage);
+  void setMutationBufferLowWatermark(float mutationBufferLowWatermarkPercentage);
 
   /**
    * Set the flush interval, which will be used for the next scheduling decision.
    * @param interval interval in milliseconds.
    */
-  public void setFlushInterval(int interval);
+  void setFlushInterval(int interval);
 
   /**
    * Get the current timeout.
    * @return operation timeout in milliseconds, 0 if none was configured.
    */
-  public long getTimeoutMillis();
+  long getTimeoutMillis();
 
   /**
    * Sets the timeout for the next applied operations.
    * The default timeout is 0, which disables the timeout functionality.
    * @param timeout Timeout in milliseconds.
    */
-  public void setTimeoutMillis(long timeout);
+  void setTimeoutMillis(long timeout);
 
   /**
    * Returns true if this session has already been closed.
    */
-  public boolean isClosed();
+  boolean isClosed();
 
   /**
    * Check if there are operations that haven't been completely applied.
    * @return true if operations are pending, else false.
    */
-  public boolean hasPendingOperations();
+  boolean hasPendingOperations();
 
   /**
    * Set the new external consistency mode for this session.
    * @param consistencyMode new external consistency mode, can the same as the previous one.
    * @throws IllegalArgumentException if the buffer isn't empty.
    */
-  public void setExternalConsistencyMode(ExternalConsistencyMode consistencyMode);
+  void setExternalConsistencyMode(ExternalConsistencyMode consistencyMode);
 
   /**
    * Tells if the session is currently ignoring row errors when the whole list returned by
a tablet
    * server is of the AlreadyPresent type.
    * @return true if the session is enforcing this, else false
    */
-  public boolean isIgnoreAllDuplicateRows();
+  boolean isIgnoreAllDuplicateRows();
 
   /**
    * Configures the option to ignore all the row errors if they are all of the AlreadyPresent
type.
@@ -139,5 +139,20 @@ public interface SessionConfiguration {
    * This is disabled by default.
    * @param ignoreAllDuplicateRows true if this session should enforce this, else false
    */
-  public void setIgnoreAllDuplicateRows(boolean ignoreAllDuplicateRows);
+  void setIgnoreAllDuplicateRows(boolean ignoreAllDuplicateRows);
+
+  /**
+   * Return the number of errors which are pending. Errors may accumulate when
+   * using the AUTO_FLUSH_BACKGROUND mode.
+   * @return a count of errors
+   */
+  int countPendingErrors();
+
+  /**
+   * Return any errors from previous calls. If there were more errors
+   * than could be held in the session's error storage, the overflow state is set to true.
+   * Resets the pending errors.
+   * @return an object that contains the errors and the overflow status
+   */
+  RowErrorsAndOverflowStatus getPendingErrors();
 }

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/d70305c4/java/kudu-client/src/test/java/org/kududb/client/TestRowErrors.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/kududb/client/TestRowErrors.java b/java/kudu-client/src/test/java/org/kududb/client/TestRowErrors.java
index 1d68377..39e977d 100644
--- a/java/kudu-client/src/test/java/org/kududb/client/TestRowErrors.java
+++ b/java/kudu-client/src/test/java/org/kududb/client/TestRowErrors.java
@@ -25,23 +25,20 @@ import static org.junit.Assert.*;
 
 public class TestRowErrors extends BaseKuduTest {
 
-  // Generate a unique table name
-  private static final String TABLE_NAME =
-      TestRowErrors.class.getName() + "-" + System.currentTimeMillis();
-
   private static KuduTable table;
 
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
     BaseKuduTest.setUpBeforeClass();
-    createTable(TABLE_NAME, basicSchema, new CreateTableOptions());
 
-    table = openTable(TABLE_NAME);
   }
 
 
   @Test(timeout = 100000)
-  public void test() throws Exception {
+  public void singleTabletTest() throws Exception {
+    String tableName = TestRowErrors.class.getName() + "-" + System.currentTimeMillis();
+    createTable(tableName, basicSchema, new CreateTableOptions());
+    table = openTable(tableName);
     AsyncKuduSession session = client.newSession();
 
     // Insert 3 rows to play with.
@@ -70,6 +67,30 @@ public class TestRowErrors extends BaseKuduTest {
     assertTrue(errors.get(1).getOperation() == dupeForTwo);
   }
 
+  /**
+   * Test collecting errors from multiple tablets.
+   * @throws Exception
+   */
+  @Test(timeout = 100000)
+  public void multiTabletTest() throws Exception {
+    String tableName = TestRowErrors.class.getName() + "-" + System.currentTimeMillis();
+    createFourTabletsTableWithNineRows(tableName);
+    table = openTable(tableName);
+    KuduSession session = syncClient.newSession();
+    session.setFlushMode(KuduSession.FlushMode.AUTO_FLUSH_BACKGROUND);
+
+    int dupRows = 3;
+    session.apply(createInsert(12));
+    session.apply(createInsert(22));
+    session.apply(createInsert(32));
+
+    session.flush();
+
+    RowErrorsAndOverflowStatus reos = session.getPendingErrors();
+    assertEquals(dupRows, reos.getRowErrors().length);
+    assertEquals(0, session.countPendingErrors());
+  }
+
   private Insert createInsert(int key) {
     return createBasicSchemaInsert(table, key);
   }


Mime
View raw message