kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a...@apache.org
Subject [kudu] 01/02: [java] Clean up AsyncKuduSession
Date Wed, 22 May 2019 16:55:28 GMT
This is an automated email from the ASF dual-hosted git repository.

adar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 2bf80b5147f09c980c2f2268ee3467a5b3305a57
Author: Will Berkeley <wdberkeley@gmail.com>
AuthorDate: Tue May 14 14:11:01 2019 -0700

    [java] Clean up AsyncKuduSession
    
    This makes several improvements to AsyncKuduSession code.
    
    1. I replaced a lot of wordy callback definitions with Java 8 lambdas.
    2. I renamed the member variable and method parameter names for ones
       that were mutation-buffer-space-related. The new names reflect that
       the buffers are measured in number of ops, not size in bytes. I
       couldn't rename the methods because that would break API
       compatibility.
    3. I renamed 'interval' to 'intervalMillis', following naming best
       practice, unless it would break compatibility.
    4. I added some helper functions and simplified logic in a few places.
    5. I removed all of the "low watermark rejections". Previously, the
       AsyncKuduSession tracked a low watermark percentage (default 0.5)
       and, once the buffer was more full than that percentage and there's
       an ongoing flush, new operations were rejected out of apply with a
       PleaseThrottleException with a probability scaling linearly from 0
       to 1 as the buffer went from the watermark to full. I don't think
       this makes sense, however, as it simply means doing more work to fill
       the buffer. Applications ought to just fill the buffer as fast as
       possible, and then wait to apply more. The PleaseThrottleException
       provides a deferred that applications can wait on until there is
       buffer space available. In fact, this was causing bad behavior in the
       synchronous KuduSession, which, in its apply, would fill the buffer
       until the first PleaseThrottleException (high chance of this being
       before the buffer is full) and then it would wait until the
       outstanding flush was finished before returning, holding up preparing
       more ops for flush for basically no reason.
    6. I removed the test for the watermark behavior.
    
    Change-Id: I0c901af497b3a4195c78b62c61d31aa00866a993
    Reviewed-on: http://gerrit.cloudera.org:8080/13336
    Reviewed-by: Adar Dembo <adar@cloudera.com>
    Reviewed-by: Andrew Wong <awong@cloudera.com>
    Tested-by: Will Berkeley <wdberkeley@gmail.com>
---
 .../org/apache/kudu/client/AsyncKuduSession.java   | 248 +++++++++------------
 .../java/org/apache/kudu/client/KuduSession.java   |  11 +-
 .../apache/kudu/client/SessionConfiguration.java   |   1 +
 .../apache/kudu/client/TestAsyncKuduSession.java   |  26 ---
 4 files changed, 114 insertions(+), 172 deletions(-)

diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduSession.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduSession.java
index 2e2aab8..c563248 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduSession.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduSession.java
@@ -120,10 +120,8 @@ public class AsyncKuduSession implements SessionConfiguration {
   private final AsyncKuduClient client;
   private final Random randomizer = new Random();
   private final ErrorCollector errorCollector;
-  private int interval = 1000;
-  private int mutationBufferSpace = 1000; // TODO express this in terms of data size.
-  private float mutationBufferLowWatermarkPercentage = 0.5f;
-  private int mutationBufferLowWatermark;
+  private int flushIntervalMillis = 1000;
+  private int mutationBufferMaxOps = 1000; // TODO express this in terms of data size.
   private FlushMode flushMode;
   private ExternalConsistencyMode consistencyMode;
   private long timeoutMillis;
@@ -192,8 +190,7 @@ public class AsyncKuduSession implements SessionConfiguration {
     timeoutMillis = client.getDefaultOperationTimeoutMs();
     inactiveBuffers.add(bufferA);
     inactiveBuffers.add(bufferB);
-    errorCollector = new ErrorCollector(mutationBufferSpace);
-    setMutationBufferLowWatermark(this.mutationBufferLowWatermarkPercentage);
+    errorCollector = new ErrorCollector(mutationBufferMaxOps);
   }
 
   @Override
@@ -201,6 +198,7 @@ public class AsyncKuduSession implements SessionConfiguration {
     return this.flushMode;
   }
 
+  // TODO(wdberkeley): KUDU-1944. Don't let applications change the flush mode. Use a new
session.
   @Override
   public void setFlushMode(FlushMode flushMode) {
     if (hasPendingOperations()) {
@@ -219,27 +217,18 @@ public class AsyncKuduSession implements SessionConfiguration {
   }
 
   @Override
-  public void setMutationBufferSpace(int size) {
+  public void setMutationBufferSpace(int numOps) {
     if (hasPendingOperations()) {
       throw new IllegalArgumentException("Cannot change the buffer" +
           " size when operations are buffered");
     }
-    this.mutationBufferSpace = size;
-    // Reset the low watermark, using the same percentage as before.
-    setMutationBufferLowWatermark(mutationBufferLowWatermarkPercentage);
+    this.mutationBufferMaxOps = numOps;
   }
 
+  @Deprecated
   @Override
   public void setMutationBufferLowWatermark(float mutationBufferLowWatermarkPercentage) {
-    if (hasPendingOperations()) {
-      throw new IllegalArgumentException("Cannot change the buffer" +
-          " low watermark when operations are buffered");
-    } else if (!PERCENTAGE_RANGE.contains(mutationBufferLowWatermarkPercentage)) {
-      throw new IllegalArgumentException("The low watermark must be between 0 and 1 inclusively");
-    }
-    this.mutationBufferLowWatermarkPercentage = mutationBufferLowWatermarkPercentage;
-    this.mutationBufferLowWatermark =
-        (int)(this.mutationBufferLowWatermarkPercentage * mutationBufferSpace);
+    LOG.warn("setMutationBufferLowWatermark is deprecated");
   }
 
   /**
@@ -252,8 +241,8 @@ public class AsyncKuduSession implements SessionConfiguration {
   }
 
   @Override
-  public void setFlushInterval(int interval) {
-    this.interval = interval;
+  public void setFlushInterval(int flushIntervalMillis) {
+    this.flushIntervalMillis = flushIntervalMillis;
   }
 
   @Override
@@ -435,36 +424,28 @@ public class AsyncKuduSession implements SessionConfiguration {
     Buffer buffer;
     Deferred<Void> nonActiveBufferFlush;
     synchronized (monitor) {
-      nonActiveBufferFlush = getNonActiveFlushNotification();
-      buffer = activeBuffer;
-      activeBuffer = null;
+      nonActiveBufferFlush = getNonActiveFlushNotificationUnlocked();
+      buffer = retireActiveBufferUnlocked();
     }
 
-    final Deferred<List<OperationResponse>> activeBufferFlush = buffer == null
?
-        Deferred.<List<OperationResponse>>fromResult(ImmutableList.<OperationResponse>of())
:
-        doFlush(buffer);
-
-    return AsyncUtil.addBothDeferring(nonActiveBufferFlush,
-        new Callback<Deferred<List<OperationResponse>>, Object>() {
-          @Override
-          public Deferred<List<OperationResponse>> call(Object arg) {
-            return activeBufferFlush;
-          }
-        });
+    // TODO(wdb): If there is a buffer flushing already, this code will wait for it to finish
before
+    //            flushing 'buffer'. This is less performant but has less surprising semantics
than
+    //            simultaneously flushing two buffers. Even though we don't promise those
semantics,
+    //            I'm going to leave it this way for now because it's never caused any trouble.
+    return AsyncUtil.addBothDeferring(nonActiveBufferFlush, _unused -> doFlush(buffer));
   }
 
   /**
-   * Flushes a write buffer. This method takes ownership of the buffer, no other concurrent
access
-   * is allowed.
+   * Flushes a write buffer. This method takes ownership of 'buffer', no other concurrent
access
+   * is allowed. 'buffer' is allowed to be null.
    *
    * @param buffer the buffer to flush, must not be modified once passed to this method
    * @return the operation responses
    */
   private Deferred<List<OperationResponse>> doFlush(Buffer buffer) {
     LOG.debug("flushing buffer: {}", buffer);
-    if (buffer.getOperations().isEmpty()) {
-      // no-op.
-      return Deferred.<List<OperationResponse>>fromResult(ImmutableList.<OperationResponse>of());
+    if (buffer == null || buffer.getOperations().isEmpty()) {
+      return Deferred.fromResult(ImmutableList.of());
     }
 
     Deferred<List<BatchResponse>> batchResponses = new Deferred<>();
@@ -527,6 +508,24 @@ public class AsyncKuduSession implements SessionConfiguration {
     }
   }
 
+  // TODO(wdberkeley): Get rid of the idea of an Operation as a distinct way to do a write.
Replace
+  //                   it with a single-operation Batch.
+  private Deferred<OperationResponse> doAutoFlushSync(final Operation operation) {
+    if (timeoutMillis != 0) {
+      operation.resetTimeoutMillis(client.getTimer(), timeoutMillis);
+    }
+    operation.setExternalConsistencyMode(consistencyMode);
+    operation.setIgnoreAllDuplicateRows(ignoreAllDuplicateRows);
+    operation.setIgnoreAllNotFoundRows(ignoreAllNotFoundRows);
+
+    return client.sendRpcToTablet(operation)
+        .addCallbackDeferring(resp -> {
+          client.updateLastPropagatedTimestamp(resp.getWriteTimestampRaw());
+          return Deferred.fromResult(resp);
+        })
+        .addErrback(new SingleOperationErrCallback(operation));
+  }
+
   /**
    * Apply the given operation.
    * <p>
@@ -540,7 +539,7 @@ public class AsyncKuduSession implements SessionConfiguration {
    * @see SessionConfiguration.FlushMode FlushMode
    */
   public Deferred<OperationResponse> apply(final Operation operation) throws KuduException
{
-    Preconditions.checkNotNull(operation, "Can not apply a null operation");
+    Preconditions.checkNotNull(operation, "Cannot apply a null operation");
     Preconditions.checkArgument(operation.getTable().getAsyncClient() == client,
         "Applied operations must be created from a KuduTable instance opened " +
         "from the same client that opened this KuduSession");
@@ -550,30 +549,12 @@ public class AsyncKuduSession implements SessionConfiguration {
       LOG.warn("Applying an operation in a closed session; this is unsafe");
     }
 
-    // Freeze the row so that the client can not concurrently modify it while it is in flight.
+    // Freeze the row so that the client cannot concurrently modify it while it is in flight.
     operation.getRow().freeze();
 
     // If immediate flush mode, send the operation directly.
     if (flushMode == FlushMode.AUTO_FLUSH_SYNC) {
-      if (timeoutMillis != 0) {
-        operation.resetTimeoutMillis(client.getTimer(), timeoutMillis);
-      }
-      operation.setExternalConsistencyMode(this.consistencyMode);
-      operation.setIgnoreAllDuplicateRows(ignoreAllDuplicateRows);
-      operation.setIgnoreAllNotFoundRows(ignoreAllNotFoundRows);
-
-      // Add a callback to update the propagated timestamp returned from the server.
-      Callback<Deferred<OperationResponse>, OperationResponse> cb =
-        new Callback<Deferred<OperationResponse>, OperationResponse>() {
-          @Override
-          public Deferred<OperationResponse> call(OperationResponse resp) throws Exception
{
-            client.updateLastPropagatedTimestamp(resp.getWriteTimestampRaw());
-            return Deferred.fromResult(resp);
-          }
-        };
-      return client.sendRpcToTablet(operation)
-          .addCallbackDeferring(cb)
-          .addErrback(new SingleOperationErrCallback(operation));
+      return doAutoFlushSync(operation);
     }
 
     // Kick off a location lookup.
@@ -591,10 +572,10 @@ public class AsyncKuduSession implements SessionConfiguration {
           // If the active buffer is null then we recently flushed. Check if there
           // is an inactive buffer available to replace as the active.
           if (inactiveBufferAvailable()) {
-            refreshActiveBuffer();
+            refreshActiveBufferUnlocked();
           } else {
             Status statusServiceUnavailable =
-                Status.ServiceUnavailable("All buffers are currently flushing");
+                Status.ServiceUnavailable("all buffers are currently flushing");
             // This can happen if the user writes into a buffer, flushes it, writes
             // into the second, flushes it, and immediately tries to write again.
             throw new PleaseThrottleException(statusServiceUnavailable,
@@ -602,71 +583,55 @@ public class AsyncKuduSession implements SessionConfiguration {
           }
         }
 
-        if (flushMode == FlushMode.MANUAL_FLUSH) {
-          if (activeBuffer.getOperations().size() < mutationBufferSpace) {
-            activeBuffer.getOperations().add(new BufferedOperation(tablet, operation));
-          } else {
-            Status statusIllegalState =
-                Status.IllegalState("MANUAL_FLUSH is enabled but the buffer is too big");
-            throw new NonRecoverableException(statusIllegalState);
+        int activeBufferSize = activeBuffer.getOperations().size();
+        switch (flushMode) {
+          case AUTO_FLUSH_SYNC: {
+            // This case is handled above and is impossible here.
+            // TODO(wdberkeley): Handle AUTO_FLUSH_SYNC just like other flush modes.
+            assert false;
+            break;
           }
-        } else {
-          assert flushMode == FlushMode.AUTO_FLUSH_BACKGROUND;
-          int activeBufferSize = activeBuffer.getOperations().size();
-
-          if (activeBufferSize >= mutationBufferSpace) {
-            // Save the active buffer into fullBuffer so that it gets flushed when we leave
this
-            // synchronized block.
-            fullBuffer = activeBuffer;
-            activeBuffer = null;
-            activeBufferSize = 0;
-            if (inactiveBufferAvailable()) {
-              refreshActiveBuffer();
-            } else {
-              Status statusServiceUnavailable =
-                  Status.ServiceUnavailable("All buffers are currently flushing");
-              throw new PleaseThrottleException(statusServiceUnavailable,
-                                                null, operation, notification);
+          case MANUAL_FLUSH: {
+            if (activeBufferSize >= mutationBufferMaxOps) {
+              Status statusIllegalState =
+                  Status.IllegalState("MANUAL_FLUSH is enabled but the buffer is too big");
+              throw new NonRecoverableException(statusIllegalState);
             }
+            activeBuffer.getOperations().add(new BufferedOperation(tablet, operation));
+            break;
           }
-
-          if (mutationBufferLowWatermark < mutationBufferSpace && // low watermark
is enabled
-              activeBufferSize >= mutationBufferLowWatermark &&   // buffer is
over low water mark
-              !inactiveBufferAvailable()) {                       // no inactive buffers
-
-            // Check if we are over the low water mark.
-            int randomWatermark = activeBufferSize + 1 +
-                                  randomizer.nextInt(mutationBufferSpace -
-                                                     mutationBufferLowWatermark);
-
-            if (randomWatermark > mutationBufferSpace) {
-              Status statusServiceUnavailable =
-                  Status.ServiceUnavailable("The previous buffer hasn't been flushed and
the " +
-                      "current buffer is over the low watermark, please retry later");
-              throw new PleaseThrottleException(statusServiceUnavailable,
-                                                null, operation, notification);
+          case AUTO_FLUSH_BACKGROUND: {
+            if (activeBufferSize >= mutationBufferMaxOps) {
+              // If the active buffer is full or overflowing, be sure to kick off a flush.
+              fullBuffer = retireActiveBufferUnlocked();
+              activeBufferSize = 0;
+
+              if (!inactiveBufferAvailable()) {
+                Status statusServiceUnavailable =
+                    Status.ServiceUnavailable("All buffers are currently flushing");
+                throw new PleaseThrottleException(statusServiceUnavailable,
+                    null, operation, notification);
+              }
+              refreshActiveBufferUnlocked();
             }
-          }
 
-          activeBuffer.getOperations().add(new BufferedOperation(tablet, operation));
-
-          if (activeBufferSize + 1 >= mutationBufferSpace && inactiveBufferAvailable())
{
-            // If the operation filled the buffer, then flush it.
-            Preconditions.checkState(fullBuffer == null);
-            fullBuffer = activeBuffer;
-            activeBuffer = null;
-            activeBufferSize = 0;
-          } else if (activeBufferSize == 0) {
-            // If this is the first operation in the buffer, start a background flush timer.
-            AsyncKuduClient.newTimeout(client.getTimer(), activeBuffer.getFlusherTask(),
interval);
+            // Add the operation to the active buffer, and:
+            // 1. If it's the first operation in the buffer, start a background flush timer.
+            // 2. If it filled or overflowed the buffer, kick off a flush.
+            activeBuffer.getOperations().add(new BufferedOperation(tablet, operation));
+            if (activeBufferSize == 0) {
+              AsyncKuduClient.newTimeout(client.getTimer(), activeBuffer.getFlusherTask(),
flushIntervalMillis);
+            }
+            if (activeBufferSize + 1 >= mutationBufferMaxOps && inactiveBufferAvailable())
{
+              fullBuffer = retireActiveBufferUnlocked();
+            }
+            break;
           }
         }
       }
     } finally {
       // Flush the buffer outside of the synchronized block, if required.
-      if (fullBuffer != null) {
-        doFlush(fullBuffer);
-      }
+      doFlush(fullBuffer);
     }
     return operation.getDeferred();
   }
@@ -686,10 +651,21 @@ public class AsyncKuduSession implements SessionConfiguration {
    * {@link #monitor} is locked.
    */
   @GuardedBy("monitor")
-  private void refreshActiveBuffer() {
+  private void refreshActiveBufferUnlocked() {
     Preconditions.checkState(activeBuffer == null);
     activeBuffer = inactiveBuffers.remove();
-    activeBuffer.reset();
+    activeBuffer.resetUnlocked();
+  }
+
+  /**
+   * Retires the active buffer and returns it. Returns null if there is no active buffer.
+   * This should only be called if {@link #monitor} is locked.
+   */
+  @GuardedBy("monitor")
+  private Buffer retireActiveBufferUnlocked() {
+    Buffer buffer = activeBuffer;
+    activeBuffer = null;
+    return buffer;
   }
 
   /**
@@ -698,17 +674,12 @@ public class AsyncKuduSession implements SessionConfiguration {
    * buffer) are fully flushed before completing.
    */
   @GuardedBy("monitor")
-  private Deferred<Void> getNonActiveFlushNotification() {
+  private Deferred<Void> getNonActiveFlushNotificationUnlocked() {
     final Deferred<Void> notificationA = bufferA.getFlushNotification();
     final Deferred<Void> notificationB = bufferB.getFlushNotification();
     if (activeBuffer == null) {
       // Both buffers are either flushing or inactive.
-      return AsyncUtil.addBothDeferring(notificationA, new Callback<Deferred<Void>,
Object>() {
-        @Override
-        public Deferred<Void> call(Object obj) throws Exception {
-          return notificationB;
-        }
-      });
+      return AsyncUtil.addBothDeferring(notificationA, _unused -> notificationB);
     } else if (activeBuffer == bufferA) {
       return notificationB;
     } else {
@@ -725,9 +696,7 @@ public class AsyncKuduSession implements SessionConfiguration {
       @Override
       public BatchResponse call(final BatchResponse response) {
         LOG.trace("Got a Batch response for {} rows", request.operations.size());
-        if (response.getWriteTimestamp() != 0) {
-          AsyncKuduSession.this.client.updateLastPropagatedTimestamp(response.getWriteTimestamp());
-        }
+        AsyncKuduSession.this.client.updateLastPropagatedTimestamp(response.getWriteTimestamp());
 
         // Send individualized responses to all the operations in this batch.
         for (OperationResponse operationResponse : response.getIndividualResponses()) {
@@ -735,7 +704,7 @@ public class AsyncKuduSession implements SessionConfiguration {
             errorCollector.addError(operationResponse.getRowError());
           }
 
-          // Fire the callback after collecting the error so that the error is visible should
the
+          // Fire the callback after collecting the errors so that the errors are visible
should the
           // callback interrogate the error collector.
           operationResponse.getOperation().callback(operationResponse);
         }
@@ -827,14 +796,11 @@ public class AsyncKuduSession implements SessionConfiguration {
           return;
         }
         if (activeBuffer.getFlusherTask() == this) {
-          buffer = activeBuffer;
-          activeBuffer = null;
+          buffer = retireActiveBufferUnlocked();
         }
       }
 
-      if (buffer != null) {
-        doFlush(buffer);
-      }
+      doFlush(buffer);
     }
   }
 
@@ -869,7 +835,7 @@ public class AsyncKuduSession implements SessionConfiguration {
     }
 
     @GuardedBy("monitor")
-    public FlusherTask getFlusherTask() {
+    FlusherTask getFlusherTask() {
       if (flusherTask == null) {
         flusherTask = new FlusherTask();
       }
@@ -881,7 +847,7 @@ public class AsyncKuduSession implements SessionConfiguration {
      * is inactive (its flush is complete and it has been enqueued into {@link #inactiveBuffers}),
      * then the deferred will already be complete.
      */
-    public Deferred<Void> getFlushNotification() {
+    Deferred<Void> getFlushNotification() {
       return flushNotification;
     }
 
@@ -889,7 +855,7 @@ public class AsyncKuduSession implements SessionConfiguration {
      * Completes the buffer's flush notification. Should be called when the buffer has been
      * successfully flushed.
      */
-    public void callbackFlushNotification() {
+    void callbackFlushNotification() {
       LOG.trace("buffer flush notification fired: {}", this);
       flushNotification.callback(null);
     }
@@ -899,8 +865,8 @@ public class AsyncKuduSession implements SessionConfiguration {
      * inactive to active.
      */
     @GuardedBy("monitor")
-    public void reset() {
-      LOG.trace("buffer reset: {}", this);
+    void resetUnlocked() {
+      LOG.trace("buffer resetUnlocked: {}", this);
       operations.clear();
       flushNotification = new Deferred<>();
       flusherTask = null;
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduSession.java b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduSession.java
index c181eca..453eded 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduSession.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduSession.java
@@ -126,18 +126,19 @@ public class KuduSession implements SessionConfiguration {
   }
 
   @Override
-  public void setMutationBufferSpace(int size) {
-    session.setMutationBufferSpace(size);
+  public void setMutationBufferSpace(int numOps) {
+    session.setMutationBufferSpace(numOps);
   }
 
   @Override
+  @Deprecated
   public void setMutationBufferLowWatermark(float mutationBufferLowWatermarkPercentage) {
-    session.setMutationBufferLowWatermark(mutationBufferLowWatermarkPercentage);
+    LOG.warn("setMutationBufferLowWatermark is deprecated");
   }
 
   @Override
-  public void setFlushInterval(int interval) {
-    session.setFlushInterval(interval);
+  public void setFlushInterval(int intervalMillis) {
+    session.setFlushInterval(intervalMillis);
   }
 
   @Override
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/SessionConfiguration.java
b/java/kudu-client/src/main/java/org/apache/kudu/client/SessionConfiguration.java
index 12aa31b..c7bae9a 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/SessionConfiguration.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/SessionConfiguration.java
@@ -102,6 +102,7 @@ public interface SessionConfiguration {
    *                             the low watermark since it's the same as the high one
    * @throws IllegalArgumentException if the buffer isn't empty or if the watermark isn't
between
    * 0 and 1
+   * @deprecated The low watermark no longer has any effect.
    */
   void setMutationBufferLowWatermark(float mutationBufferLowWatermarkPercentage);
 
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduSession.java
b/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduSession.java
index dd4f517..c210878 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduSession.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduSession.java
@@ -433,32 +433,6 @@ public class TestAsyncKuduSession {
     }
     session.flush().join(DEFAULT_SLEEP);
     assertEquals(50, countInRange(101, 151));
-
-    // Test the low watermark.
-    // Before the fix for KUDU-804, a change to the buffer space did not result in a change
to the
-    // low watermark causing this test to fail.
-    session.setMutationBufferLowWatermark(0.1f);
-    session.setMutationBufferSpace(10);
-    session.setRandomSeed(12345); // Will make us hit the exception after 6 tries
-    gotException = false;
-    for (int i = 151; i < 171; i++) {
-      try {
-        session.apply(createInsert(i));
-      } catch (PleaseThrottleException ex) {
-        // We're going to hit the exception after filling up the buffer a first time then
trying
-        // to insert 6 more rows.
-        assertEquals(167, i);
-        gotException = true;
-        assertTrue(ex.getMessage().contains("watermark"));
-        // Once we hit the exception we wait on the batch to finish flushing and then insert
the
-        // rest of the data.
-        ex.getDeferred().join(DEFAULT_SLEEP);
-        session.apply(ex.getFailedRpc());
-      }
-    }
-    session.flush().join(DEFAULT_SLEEP);
-    assertEquals(20, countInRange(151, 171));
-    assertTrue("Expected PleaseThrottleException, but it was never thrown", gotException);
   }
 
   private Insert createInsert(int key) {


Mime
View raw message