beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dhalp...@apache.org
Subject [23/50] [abbrv] incubator-beam git commit: FluentBackoff: a replacement for a variety of custom backoff implementations
Date Tue, 13 Sep 2016 00:40:54 GMT
FluentBackoff: a replacement for a variety of custom backoff implementations

We have 3 different backoff classes that do not differ much in
functionality. Add a single, flexible backoff implementation
that can be used to replace all three classes. Additionally, this new
backoff actually supports more functionality than any of the other three
did -- you can limit retries, cap the exponential growth of an
individual backoff, and cap the cumulative time spent in backoff; prior
implementations did not allow all 3.

This also makes the parameters self-describing (Duration, not
number-that-is-also-millis) where appropriate.

This initial PR should have no functional changes.

* Implement FluentBackoff
* Replace other custom BackOff implementations with FluentBackoff


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/3f485666
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/3f485666
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/3f485666

Branch: refs/heads/gearpump-runner
Commit: 3f48566618552c4b0fa026aa3a75ef6f1875da63
Parents: c92e45d
Author: Dan Halperin <dhalperi@google.com>
Authored: Wed Aug 24 22:35:26 2016 -0700
Committer: Dan Halperin <dhalperi@google.com>
Committed: Mon Sep 12 17:40:12 2016 -0700

----------------------------------------------------------------------
 .../beam/examples/common/ExampleUtils.java      |   7 +-
 .../runners/dataflow/DataflowPipelineJob.java   |  72 +++---
 .../beam/runners/dataflow/util/PackageUtil.java |  31 +--
 .../dataflow/DataflowPipelineJobTest.java       |  32 ++-
 .../sdk/io/BoundedReadFromUnboundedSource.java  |   8 +-
 ...AttemptAndTimeBoundedExponentialBackOff.java | 172 --------------
 .../util/AttemptBoundedExponentialBackOff.java  |  85 -------
 .../org/apache/beam/sdk/util/FluentBackoff.java | 229 +++++++++++++++++++
 .../java/org/apache/beam/sdk/util/GcsUtil.java  |  18 +-
 .../util/IntervalBoundedExponentialBackOff.java |   1 +
 ...mptAndTimeBoundedExponentialBackOffTest.java | 212 -----------------
 .../AttemptBoundedExponentialBackOffTest.java   |  84 -------
 .../apache/beam/sdk/util/FluentBackoffTest.java | 226 ++++++++++++++++++
 .../org/apache/beam/sdk/util/GcsUtilTest.java   |  10 +-
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java    |  21 +-
 .../io/gcp/bigquery/BigQueryServicesImpl.java   | 152 ++++++------
 .../gcp/bigquery/BigQueryTableRowIterator.java  |   5 +-
 .../beam/sdk/io/gcp/datastore/DatastoreV1.java  |  26 +--
 .../gcp/bigquery/BigQueryServicesImplTest.java  |  17 +-
 .../beam/sdk/io/gcp/datastore/V1TestUtil.java   |   9 +-
 20 files changed, 675 insertions(+), 742 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3f485666/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java b/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java
index eadb580..2e8dcf6 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java
@@ -45,8 +45,9 @@ import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.options.BigQueryOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PubsubOptions;
-import org.apache.beam.sdk.util.AttemptBoundedExponentialBackOff;
+import org.apache.beam.sdk.util.FluentBackoff;
 import org.apache.beam.sdk.util.Transport;
+import org.joda.time.Duration;
 
 /**
  * The utility class that sets up and tears down external resources,
@@ -79,7 +80,9 @@ public class ExampleUtils {
    */
   public void setup() throws IOException {
     Sleeper sleeper = Sleeper.DEFAULT;
-    BackOff backOff = new AttemptBoundedExponentialBackOff(3, 200);
+    BackOff backOff =
+        FluentBackoff.DEFAULT
+            .withMaxRetries(3).withInitialBackoff(Duration.millis(200)).backoff();
     Throwable lastException = null;
     try {
       do {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3f485666/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
index 9a515fa..dad59f2 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
@@ -34,7 +34,6 @@ import java.io.IOException;
 import java.net.SocketTimeoutException;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.TimeUnit;
 import javax.annotation.Nullable;
 import org.apache.beam.runners.dataflow.internal.DataflowAggregatorTransforms;
 import org.apache.beam.runners.dataflow.internal.DataflowMetricUpdateExtractor;
@@ -44,8 +43,7 @@ import org.apache.beam.sdk.AggregatorRetrievalException;
 import org.apache.beam.sdk.AggregatorValues;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.util.AttemptAndTimeBoundedExponentialBackOff;
-import org.apache.beam.sdk.util.AttemptBoundedExponentialBackOff;
+import org.apache.beam.sdk.util.FluentBackoff;
 import org.joda.time.Duration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -95,14 +93,27 @@ public class DataflowPipelineJob implements PipelineResult {
   /**
    * The polling interval for job status and messages information.
    */
-  static final long MESSAGES_POLLING_INTERVAL = TimeUnit.SECONDS.toMillis(2);
-  static final long STATUS_POLLING_INTERVAL = TimeUnit.SECONDS.toMillis(2);
+  static final Duration MESSAGES_POLLING_INTERVAL = Duration.standardSeconds(2);
+  static final Duration STATUS_POLLING_INTERVAL = Duration.standardSeconds(2);
+
+  static final double DEFAULT_BACKOFF_EXPONENT = 1.5;
 
   /**
-   * The amount of polling attempts for job status and messages information.
+   * The amount of polling retries for job status and messages information.
    */
-  static final int MESSAGES_POLLING_ATTEMPTS = 12;
-  static final int STATUS_POLLING_ATTEMPTS = 5;
+  static final int MESSAGES_POLLING_RETRIES = 11;
+  static final int STATUS_POLLING_RETRIES = 4;
+
+  private static final FluentBackoff MESSAGES_BACKOFF_FACTORY =
+      FluentBackoff.DEFAULT
+          .withInitialBackoff(MESSAGES_POLLING_INTERVAL)
+          .withMaxRetries(MESSAGES_POLLING_RETRIES)
+          .withExponent(DEFAULT_BACKOFF_EXPONENT);
+  protected static final FluentBackoff STATUS_BACKOFF_FACTORY =
+      FluentBackoff.DEFAULT
+          .withInitialBackoff(STATUS_POLLING_INTERVAL)
+          .withMaxRetries(STATUS_POLLING_RETRIES)
+          .withExponent(DEFAULT_BACKOFF_EXPONENT);
 
   /**
    * Constructs the job.
@@ -214,21 +225,23 @@ public class DataflowPipelineJob implements PipelineResult {
     MonitoringUtil monitor = new MonitoringUtil(projectId, dataflowOptions.getDataflowClient());
 
     long lastTimestamp = 0;
-    BackOff backoff =
-        duration.getMillis() > 0
-            ? new AttemptAndTimeBoundedExponentialBackOff(
-                MESSAGES_POLLING_ATTEMPTS,
-                MESSAGES_POLLING_INTERVAL,
-                duration.getMillis(),
-                AttemptAndTimeBoundedExponentialBackOff.ResetPolicy.ATTEMPTS,
-                nanoClock)
-            : new AttemptBoundedExponentialBackOff(
-                MESSAGES_POLLING_ATTEMPTS, MESSAGES_POLLING_INTERVAL);
+    BackOff backoff;
+    if (!duration.isLongerThan(Duration.ZERO)) {
+      backoff = MESSAGES_BACKOFF_FACTORY.backoff();
+    } else {
+      backoff = MESSAGES_BACKOFF_FACTORY.withMaxCumulativeBackoff(duration).backoff();
+    }
+
+    // This function tracks the cumulative time from the *first request* to enforce the wall-clock
+    // limit. Any backoff instance could, at best, track the time since the first attempt at a
+    // given request. Thus, we need to track the cumulative time ourselves.
+    long startNanos = nanoClock.nanoTime();
+
     State state;
     do {
       // Get the state of the job before listing messages. This ensures we always fetch job
       // messages after the job finishes to ensure we have all them.
-      state = getStateWithRetries(1, sleeper);
+      state = getStateWithRetries(STATUS_BACKOFF_FACTORY.withMaxRetries(0).backoff(), sleeper);
       boolean hasError = state == State.UNKNOWN;
 
       if (messageHandler != null && !hasError) {
@@ -250,7 +263,16 @@ public class DataflowPipelineJob implements PipelineResult {
       }
 
       if (!hasError) {
+        // Reset the backoff.
         backoff.reset();
+        // If duration is set, update the new cumulative sleep time to be the remaining
+        // part of the total input sleep duration.
+        if (duration.isLongerThan(Duration.ZERO)) {
+          long nanosConsumed = nanoClock.nanoTime() - startNanos;
+          Duration consumed = Duration.millis((nanosConsumed + 999999) / 1000000);
+          backoff =
+              MESSAGES_BACKOFF_FACTORY.withMaxCumulativeBackoff(duration.minus(consumed)).backoff();
+        }
         // Check if the job is done.
         if (state.isTerminal()) {
           return state;
@@ -287,7 +309,7 @@ public class DataflowPipelineJob implements PipelineResult {
       return terminalState;
     }
 
-    return getStateWithRetries(STATUS_POLLING_ATTEMPTS, Sleeper.DEFAULT);
+    return getStateWithRetries(STATUS_BACKOFF_FACTORY.backoff(), Sleeper.DEFAULT);
   }
 
   /**
@@ -299,7 +321,7 @@ public class DataflowPipelineJob implements PipelineResult {
    * @return The state of the job or State.UNKNOWN in case of failure.
    */
   @VisibleForTesting
-  State getStateWithRetries(int attempts, Sleeper sleeper) {
+  State getStateWithRetries(BackOff attempts, Sleeper sleeper) {
     if (terminalState != null) {
       return terminalState;
     }
@@ -318,17 +340,13 @@ public class DataflowPipelineJob implements PipelineResult {
    * Attempts to get the underlying {@link Job}. Uses exponential backoff on failure up to the
    * maximum number of passed in attempts.
    *
-   * @param attempts The amount of attempts to make.
+   * @param backoff the {@link BackOff} used to control retries.
    * @param sleeper Object used to do the sleeps between attempts.
    * @return The underlying {@link Job} object.
    * @throws IOException When the maximum number of retries is exhausted, the last exception is
    * thrown.
    */
-  @VisibleForTesting
-  Job getJobWithRetries(int attempts, Sleeper sleeper) throws IOException {
-    AttemptBoundedExponentialBackOff backoff =
-        new AttemptBoundedExponentialBackOff(attempts, STATUS_POLLING_INTERVAL);
-
+  private Job getJobWithRetries(BackOff backoff, Sleeper sleeper) throws IOException {
     // Retry loop ends in return or throw
     while (true) {
       try {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3f485666/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java
index bf1f666..6d910ba 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java
@@ -18,7 +18,7 @@
 package org.apache.beam.runners.dataflow.util;
 
 import com.fasterxml.jackson.core.Base64Variants;
-import com.google.api.client.util.BackOffUtils;
+import com.google.api.client.util.BackOff;
 import com.google.api.client.util.Sleeper;
 import com.google.api.services.dataflow.model.DataflowPackage;
 import com.google.cloud.hadoop.util.ApiErrorExtractor;
@@ -37,10 +37,11 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Objects;
-import org.apache.beam.sdk.util.AttemptBoundedExponentialBackOff;
+import org.apache.beam.sdk.util.FluentBackoff;
 import org.apache.beam.sdk.util.IOChannelUtils;
 import org.apache.beam.sdk.util.MimeTypes;
 import org.apache.beam.sdk.util.ZipFiles;
+import org.joda.time.Duration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -54,11 +55,15 @@ public class PackageUtil {
   /**
    * The initial interval to use between package staging attempts.
    */
-  private static final long INITIAL_BACKOFF_INTERVAL_MS = 5000L;
+  private static final Duration INITIAL_BACKOFF_INTERVAL = Duration.standardSeconds(5);
   /**
-   * The maximum number of attempts when staging a file.
+   * The maximum number of retries when staging a file.
    */
-  private static final int MAX_ATTEMPTS = 5;
+  private static final int MAX_RETRIES = 4;
+
+  private static final FluentBackoff BACKOFF_FACTORY =
+      FluentBackoff.DEFAULT
+          .withMaxRetries(MAX_RETRIES).withInitialBackoff(INITIAL_BACKOFF_INTERVAL);
 
   /**
    * Translates exceptions from API calls.
@@ -199,9 +204,7 @@ public class PackageUtil {
         }
 
         // Upload file, retrying on failure.
-        AttemptBoundedExponentialBackOff backoff = new AttemptBoundedExponentialBackOff(
-            MAX_ATTEMPTS,
-            INITIAL_BACKOFF_INTERVAL_MS);
+        BackOff backoff = BACKOFF_FACTORY.backoff();
         while (true) {
           try {
             LOG.debug("Uploading classpath element {} to {}", classpathElement, target);
@@ -219,15 +222,17 @@ public class PackageUtil {
                   + "'gcloud auth login'.", classpathElement, target);
               LOG.error(errorMessage);
               throw new IOException(errorMessage, e);
-            } else if (!backoff.atMaxAttempts()) {
-              LOG.warn("Upload attempt failed, sleeping before retrying staging of classpath: {}",
-                  classpathElement, e);
-              BackOffUtils.next(retrySleeper, backoff);
-            } else {
+            }
+            long sleep = backoff.nextBackOffMillis();
+            if (sleep == BackOff.STOP) {
               // Rethrow last error, to be included as a cause in the catch below.
               LOG.error("Upload failed, will NOT retry staging of classpath: {}",
                   classpathElement, e);
               throw e;
+            } else {
+              LOG.warn("Upload attempt failed, sleeping before retrying staging of classpath: {}",
+                  classpathElement, e);
+              retrySleeper.sleep(sleep);
             }
           }
         }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3f485666/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
index 22b5400..226140a 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
@@ -60,7 +60,6 @@ import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.Sum;
-import org.apache.beam.sdk.util.AttemptBoundedExponentialBackOff;
 import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.POutput;
 import org.joda.time.Duration;
@@ -111,22 +110,21 @@ public class DataflowPipelineJobTest {
    * AttemptBoundedExponentialBackOff given the number of retries and
    * an initial polling interval.
    *
-   * @param pollingIntervalMillis The initial polling interval given.
-   * @param attempts The number of attempts made
+   * @param pollingInterval The initial polling interval given.
+   * @param retries The number of retries made
    * @param timeSleptMillis The amount of time slept by the clock. This is checked
    * against the valid interval.
    */
-  void checkValidInterval(long pollingIntervalMillis, int attempts, long timeSleptMillis) {
+  private void checkValidInterval(Duration pollingInterval, int retries, long timeSleptMillis) {
     long highSum = 0;
     long lowSum = 0;
-    for (int i = 1; i < attempts; i++) {
+    for (int i = 0; i < retries; i++) {
       double currentInterval =
-          pollingIntervalMillis
-          * Math.pow(AttemptBoundedExponentialBackOff.DEFAULT_MULTIPLIER, i - 1);
-      double offset =
-          AttemptBoundedExponentialBackOff.DEFAULT_RANDOMIZATION_FACTOR * currentInterval;
-      highSum += Math.round(currentInterval + offset);
-      lowSum += Math.round(currentInterval - offset);
+          pollingInterval.getMillis()
+          * Math.pow(DataflowPipelineJob.DEFAULT_BACKOFF_EXPONENT, i);
+      double randomOffset = 0.5 * currentInterval;
+      highSum += Math.round(currentInterval + randomOffset);
+      lowSum += Math.round(currentInterval - randomOffset);
     }
     assertThat(timeSleptMillis, allOf(greaterThanOrEqualTo(lowSum), lessThanOrEqualTo(highSum)));
   }
@@ -228,7 +226,7 @@ public class DataflowPipelineJobTest {
     assertEquals(null, state);
     long timeDiff = TimeUnit.NANOSECONDS.toMillis(fastClock.nanoTime() - startTime);
     checkValidInterval(DataflowPipelineJob.MESSAGES_POLLING_INTERVAL,
-        DataflowPipelineJob.MESSAGES_POLLING_ATTEMPTS, timeDiff);
+        DataflowPipelineJob.MESSAGES_POLLING_RETRIES, timeDiff);
   }
 
   @Test
@@ -246,8 +244,8 @@ public class DataflowPipelineJobTest {
     State state = job.waitUntilFinish(Duration.millis(4), null, fastClock, fastClock);
     assertEquals(null, state);
     long timeDiff = TimeUnit.NANOSECONDS.toMillis(fastClock.nanoTime() - startTime);
-    // Should only sleep for the 4 ms remaining.
-    assertEquals(timeDiff, 4L);
+    // Should only have slept for the 4 ms allowed.
+    assertEquals(4L, timeDiff);
   }
 
   @Test
@@ -268,7 +266,7 @@ public class DataflowPipelineJobTest {
 
     assertEquals(
         State.RUNNING,
-        job.getStateWithRetries(DataflowPipelineJob.STATUS_POLLING_ATTEMPTS, fastClock));
+        job.getStateWithRetries(DataflowPipelineJob.STATUS_BACKOFF_FACTORY.backoff(), fastClock));
   }
 
   @Test
@@ -286,10 +284,10 @@ public class DataflowPipelineJobTest {
     long startTime = fastClock.nanoTime();
     assertEquals(
         State.UNKNOWN,
-        job.getStateWithRetries(DataflowPipelineJob.STATUS_POLLING_ATTEMPTS, fastClock));
+        job.getStateWithRetries(DataflowPipelineJob.STATUS_BACKOFF_FACTORY.backoff(), fastClock));
     long timeDiff = TimeUnit.NANOSECONDS.toMillis(fastClock.nanoTime() - startTime);
     checkValidInterval(DataflowPipelineJob.STATUS_POLLING_INTERVAL,
-        DataflowPipelineJob.STATUS_POLLING_ATTEMPTS, timeDiff);
+        DataflowPipelineJob.STATUS_POLLING_RETRIES, timeDiff);
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3f485666/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
index 28d7746..b41c655 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
@@ -34,7 +34,7 @@ import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.RemoveDuplicates;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.util.IntervalBoundedExponentialBackOff;
+import org.apache.beam.sdk.util.FluentBackoff;
 import org.apache.beam.sdk.util.ValueWithRecordId;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
@@ -52,6 +52,10 @@ class BoundedReadFromUnboundedSource<T> extends PTransform<PBegin, PCollection<T
   private final UnboundedSource<T, ?> source;
   private final long maxNumRecords;
   private final Duration maxReadTime;
+  private static final FluentBackoff BACKOFF_FACTORY =
+      FluentBackoff.DEFAULT
+          .withInitialBackoff(Duration.millis(10))
+          .withMaxBackoff(Duration.standardSeconds(10));
 
   /**
    * Returns a new {@link BoundedReadFromUnboundedSource} that reads a bounded amount
@@ -241,7 +245,7 @@ class BoundedReadFromUnboundedSource<T> extends PTransform<PBegin, PCollection<T
 
       private boolean advanceWithBackoff() throws IOException {
         // Try reading from the source with exponential backoff
-        BackOff backoff = new IntervalBoundedExponentialBackOff(10000L, 10L);
+        BackOff backoff = BACKOFF_FACTORY.backoff();
         long nextSleep = backoff.nextBackOffMillis();
         while (nextSleep != BackOff.STOP) {
           if (endTime != null && Instant.now().isAfter(endTime)) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3f485666/sdks/java/core/src/main/java/org/apache/beam/sdk/util/AttemptAndTimeBoundedExponentialBackOff.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/AttemptAndTimeBoundedExponentialBackOff.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/AttemptAndTimeBoundedExponentialBackOff.java
deleted file mode 100644
index d8050e0..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/AttemptAndTimeBoundedExponentialBackOff.java
+++ /dev/null
@@ -1,172 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import com.google.api.client.util.BackOff;
-import com.google.api.client.util.NanoClock;
-import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
-import java.util.concurrent.TimeUnit;
-
-/**
- * Extension of {@link AttemptBoundedExponentialBackOff} that bounds the total time that the backoff
- * is happening as well as the amount of retries. Acts exactly as a AttemptBoundedExponentialBackOff
- * unless the time interval has expired since the object was created. At this point, it will always
- * return BackOff.STOP. Calling reset() resets both the timer and the number of retry attempts,
- * unless a custom ResetPolicy (ResetPolicy.ATTEMPTS or ResetPolicy.TIMER) is passed to the
- * constructor.
- *
- * <p>Implementation is not thread-safe.
- */
-public class AttemptAndTimeBoundedExponentialBackOff extends AttemptBoundedExponentialBackOff {
-  private long endTimeMillis;
-  private long maximumTotalWaitTimeMillis;
-  private ResetPolicy resetPolicy;
-  private final NanoClock nanoClock;
-  // NanoClock.SYSTEM has a max elapsed time of 292 years or 2^63 ns.  Here, we choose 2^53 ns as
-  // a smaller but still huge limit.
-  private static final long MAX_ELAPSED_TIME_MILLIS = 1L << 53;
-
-  /**
-   * A ResetPolicy controls the behavior of this BackOff when reset() is called.  By default, both
-   * the number of attempts and the time bound for the BackOff are reset, but an alternative
-   * ResetPolicy may be set to only reset one of these two.
-   */
-  public static enum ResetPolicy {
-    ALL,
-    ATTEMPTS,
-    TIMER
-  }
-
-  /**
-   * Constructs an instance of AttemptAndTimeBoundedExponentialBackoff.
-   *
-   * @param maximumNumberOfAttempts The maximum number of attempts it will make.
-   * @param initialIntervalMillis The original interval to wait between attempts in milliseconds.
-   * @param maximumTotalWaitTimeMillis The maximum total time that this object will
-   *    allow more attempts in milliseconds.
-   */
-  public AttemptAndTimeBoundedExponentialBackOff(
-      int maximumNumberOfAttempts, long initialIntervalMillis, long maximumTotalWaitTimeMillis) {
-    this(
-        maximumNumberOfAttempts,
-        initialIntervalMillis,
-        maximumTotalWaitTimeMillis,
-        ResetPolicy.ALL,
-        NanoClock.SYSTEM);
-  }
-
-  /**
-   * Constructs an instance of AttemptAndTimeBoundedExponentialBackoff.
-   *
-   * @param maximumNumberOfAttempts The maximum number of attempts it will make.
-   * @param initialIntervalMillis The original interval to wait between attempts in milliseconds.
-   * @param maximumTotalWaitTimeMillis The maximum total time that this object will
-   *    allow more attempts in milliseconds.
-   * @param resetPolicy The ResetPolicy specifying the properties of this BackOff that are subject
-   *    to being reset.
-   */
-  public AttemptAndTimeBoundedExponentialBackOff(
-      int maximumNumberOfAttempts,
-      long initialIntervalMillis,
-      long maximumTotalWaitTimeMillis,
-      ResetPolicy resetPolicy) {
-    this(
-        maximumNumberOfAttempts,
-        initialIntervalMillis,
-        maximumTotalWaitTimeMillis,
-        resetPolicy,
-        NanoClock.SYSTEM);
-  }
-
-  /**
-   * Constructs an instance of AttemptAndTimeBoundedExponentialBackoff.
-   *
-   * @param maximumNumberOfAttempts The maximum number of attempts it will make.
-   * @param initialIntervalMillis The original interval to wait between attempts in milliseconds.
-   * @param maximumTotalWaitTimeMillis The maximum total time that this object will
-   *    allow more attempts in milliseconds.
-   * @param resetPolicy The ResetPolicy specifying the properties of this BackOff that are subject
-   *    to being reset.
-   * @param nanoClock clock used to measure the time that has passed.
-   */
-  public AttemptAndTimeBoundedExponentialBackOff(
-      int maximumNumberOfAttempts,
-      long initialIntervalMillis,
-      long maximumTotalWaitTimeMillis,
-      ResetPolicy resetPolicy,
-      NanoClock nanoClock) {
-    super(maximumNumberOfAttempts, initialIntervalMillis);
-    checkArgument(
-        maximumTotalWaitTimeMillis > 0, "Maximum total wait time must be greater than zero.");
-    checkArgument(
-        maximumTotalWaitTimeMillis < MAX_ELAPSED_TIME_MILLIS,
-        "Maximum total wait time must be less than " + MAX_ELAPSED_TIME_MILLIS + " milliseconds");
-    checkArgument(resetPolicy != null, "resetPolicy may not be null");
-    checkArgument(nanoClock != null, "nanoClock may not be null");
-    this.maximumTotalWaitTimeMillis = maximumTotalWaitTimeMillis;
-    this.resetPolicy = resetPolicy;
-    this.nanoClock = nanoClock;
-    // Set the end time for this BackOff.  Note that we cannot simply call reset() here since the
-    // resetPolicy may not be set to reset the time bound.
-    endTimeMillis = getTimeMillis() + maximumTotalWaitTimeMillis;
-  }
-
-  @Override
-  @SuppressFBWarnings(value = "UR_UNINIT_READ_CALLED_FROM_SUPER_CONSTRUCTOR",
-      justification = "Explicitly handled in implementation.")
-  public void reset() {
-    // reset() is called in the constructor of the parent class before resetPolicy and nanoClock are
-    // set.  In this case, we call the parent class's reset() method and return.
-    if (resetPolicy == null) {
-      super.reset();
-      return;
-    }
-    // Reset the number of attempts.
-    if (resetPolicy == ResetPolicy.ALL || resetPolicy == ResetPolicy.ATTEMPTS) {
-      super.reset();
-    }
-    // Reset the time bound.
-    if (resetPolicy == ResetPolicy.ALL || resetPolicy == ResetPolicy.TIMER) {
-      endTimeMillis = getTimeMillis() + maximumTotalWaitTimeMillis;
-    }
-  }
-
-  public void setEndtimeMillis(long endTimeMillis) {
-    this.endTimeMillis = endTimeMillis;
-  }
-
-  @Override
-  public long nextBackOffMillis() {
-    if (atMaxAttempts()) {
-      return BackOff.STOP;
-    }
-    long backoff = Math.min(super.nextBackOffMillis(), endTimeMillis - getTimeMillis());
-    return (backoff > 0 ? backoff : BackOff.STOP);
-  }
-
-  private long getTimeMillis() {
-    return TimeUnit.NANOSECONDS.toMillis(nanoClock.nanoTime());
-  }
-
-  @Override
-  public boolean atMaxAttempts() {
-    return super.atMaxAttempts() || getTimeMillis() >= endTimeMillis;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3f485666/sdks/java/core/src/main/java/org/apache/beam/sdk/util/AttemptBoundedExponentialBackOff.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/AttemptBoundedExponentialBackOff.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/AttemptBoundedExponentialBackOff.java
deleted file mode 100644
index 5707293..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/AttemptBoundedExponentialBackOff.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import com.google.api.client.util.BackOff;
-
-
-/**
- * Implementation of {@link BackOff} that increases the back off period for each retry attempt
- * using a randomization function that grows exponentially.
- *
- * <p>Example: The initial interval is .5 seconds and the maximum number of retries is 10.
- * For 10 tries the sequence will be (values in seconds):
- *
- * <pre>
- * retry#      retry_interval     randomized_interval
- * 1             0.5                [0.25,   0.75]
- * 2             0.75               [0.375,  1.125]
- * 3             1.125              [0.562,  1.687]
- * 4             1.687              [0.8435, 2.53]
- * 5             2.53               [1.265,  3.795]
- * 6             3.795              [1.897,  5.692]
- * 7             5.692              [2.846,  8.538]
- * 8             8.538              [4.269, 12.807]
- * 9            12.807              [6.403, 19.210]
- * 10           {@link BackOff#STOP}
- * </pre>
- *
- * <p>Implementation is not thread-safe.
- */
-public class AttemptBoundedExponentialBackOff implements BackOff {
-  public static final double DEFAULT_MULTIPLIER = 1.5;
-  public static final double DEFAULT_RANDOMIZATION_FACTOR = 0.5;
-  private final int maximumNumberOfAttempts;
-  private final long initialIntervalMillis;
-  private int currentAttempt;
-
-  public AttemptBoundedExponentialBackOff(int maximumNumberOfAttempts, long initialIntervalMillis) {
-    checkArgument(maximumNumberOfAttempts > 0,
-        "Maximum number of attempts must be greater than zero.");
-    checkArgument(initialIntervalMillis > 0, "Initial interval must be greater than zero.");
-    this.maximumNumberOfAttempts = maximumNumberOfAttempts;
-    this.initialIntervalMillis = initialIntervalMillis;
-    reset();
-  }
-
-  @Override
-  public void reset() {
-    currentAttempt = 1;
-  }
-
-  @Override
-  public long nextBackOffMillis() {
-    if (currentAttempt >= maximumNumberOfAttempts) {
-      return BackOff.STOP;
-    }
-    double currentIntervalMillis = initialIntervalMillis
-        * Math.pow(DEFAULT_MULTIPLIER, currentAttempt - 1);
-    double randomOffset = (Math.random() * 2 - 1)
-        * DEFAULT_RANDOMIZATION_FACTOR * currentIntervalMillis;
-    currentAttempt += 1;
-    return Math.round(currentIntervalMillis + randomOffset);
-  }
-
-  public boolean atMaxAttempts() {
-    return currentAttempt >= maximumNumberOfAttempts;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3f485666/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FluentBackoff.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FluentBackoff.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FluentBackoff.java
new file mode 100644
index 0000000..479d7a8
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FluentBackoff.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.api.client.util.BackOff;
+import com.google.common.base.MoreObjects;
+import org.joda.time.Duration;
+
+/**
+ * A fluent builder for {@link BackOff} objects that allows customization of the retry algorithm.
+ *
+ * @see #DEFAULT for the default configuration parameters.
+ */
+public final class FluentBackoff {
+
+  private static final double DEFAULT_EXPONENT = 1.5;
+  private static final double DEFAULT_RANDOMIZATION_FACTOR = 0.5;
+  private static final Duration DEFAULT_MIN_BACKOFF = Duration.standardSeconds(1);
+  private static final Duration DEFAULT_MAX_BACKOFF = Duration.standardDays(1000);
+  private static final int DEFAULT_MAX_RETRIES = Integer.MAX_VALUE;
+  private static final Duration DEFAULT_MAX_CUM_BACKOFF = Duration.standardDays(1000);
+
+  private final double exponent;
+  private final Duration initialBackoff;
+  private final Duration maxBackoff;
+  private final Duration maxCumulativeBackoff;
+  private final int maxRetries;
+
+  /**
+   * By default the {@link BackOff} created by this builder will use exponential backoff (base
+   * exponent 1.5) with an initial backoff of 1 second. These parameters can be overridden with
+   * {@link #withExponent(double)} and {@link #withInitialBackoff(Duration)},
+   * respectively, and the maximum backoff after exponential increase can be capped using {@link
+   * FluentBackoff#withMaxBackoff(Duration)}.
+   *
+   * <p>The default {@link BackOff} does not limit the number of retries. To limit the backoff, the
+   * maximum total number of retries can be set using {@link #withMaxRetries(int)}. The
+   * total time spent in backoff can be time-bounded as well by configuring {@link
+   * #withMaxCumulativeBackoff(Duration)}. If either of these limits is reached, calls
+   * to {@link BackOff#nextBackOffMillis()} will return {@link BackOff#STOP} to signal that no more
+   * retries should continue.
+   */
+  public static final FluentBackoff DEFAULT = new FluentBackoff(
+      DEFAULT_EXPONENT,
+      DEFAULT_MIN_BACKOFF, DEFAULT_MAX_BACKOFF, DEFAULT_MAX_CUM_BACKOFF,
+      DEFAULT_MAX_RETRIES);
+
+  /**
+   * Instantiates a {@link BackOff} that will obey the current configuration.
+   *
+   * @see FluentBackoff
+   */
+  public BackOff backoff() {
+    return new BackoffImpl(this);
+  }
+
+  /**
+   * Returns a copy of this {@link FluentBackoff} that instead uses the specified exponent to
+   * control the exponential growth of delay.
+   *
+   * <p>Does not modify this object.
+   *
+   * @see FluentBackoff
+   */
+  public FluentBackoff withExponent(double exponent) {
+    checkArgument(exponent > 0, "exponent %s must be greater than 0", exponent);
+    return new FluentBackoff(
+        exponent, initialBackoff, maxBackoff, maxCumulativeBackoff, maxRetries);
+  }
+
+  /**
+   * Returns a copy of this {@link FluentBackoff} that instead uses the specified initial backoff
+   * duration.
+   *
+   * <p>Does not modify this object.
+   *
+   * @see FluentBackoff
+   */
+  public FluentBackoff withInitialBackoff(Duration initialBackoff) {
+    checkArgument(
+        initialBackoff.isLongerThan(Duration.ZERO),
+        "initialBackoff %s must be at least 1 millisecond",
+        initialBackoff);
+    return new FluentBackoff(
+        exponent, initialBackoff, maxBackoff, maxCumulativeBackoff, maxRetries);
+  }
+
+  /**
+   * Returns a copy of this {@link FluentBackoff} that limits the maximum backoff of an individual
+   * attempt to the specified duration.
+   *
+   * <p>Does not modify this object.
+   *
+   * @see FluentBackoff
+   */
+  public FluentBackoff withMaxBackoff(Duration maxBackoff) {
+    checkArgument(
+        maxBackoff.getMillis() > 0,
+        "maxBackoff %s must be at least 1 millisecond",
+        maxBackoff);
+    return new FluentBackoff(
+        exponent, initialBackoff, maxBackoff, maxCumulativeBackoff, maxRetries);
+  }
+
+  /**
+   * Returns a copy of this {@link FluentBackoff} that limits the total time spent in backoff
+   * returned across all calls to {@link BackOff#nextBackOffMillis()}.
+   *
+   * <p>Does not modify this object.
+   *
+   * @see FluentBackoff
+   */
+  public FluentBackoff withMaxCumulativeBackoff(Duration maxCumulativeBackoff) {
+    checkArgument(maxCumulativeBackoff.isLongerThan(Duration.ZERO),
+        "maxCumulativeBackoff %s must be at least 1 millisecond", maxCumulativeBackoff);
+    return new FluentBackoff(
+        exponent, initialBackoff, maxBackoff, maxCumulativeBackoff, maxRetries);
+  }
+
+  /**
+   * Returns a copy of this {@link FluentBackoff} that limits the total number of retries, aka
+   * the total number of calls to {@link BackOff#nextBackOffMillis()} before returning
+   * {@link BackOff#STOP}.
+   *
+   * <p>Does not modify this object.
+   *
+   * @see FluentBackoff
+   */
+  public FluentBackoff withMaxRetries(int maxRetries) {
+    checkArgument(maxRetries >= 0, "maxRetries %s cannot be negative", maxRetries);
+    return new FluentBackoff(
+        exponent, initialBackoff, maxBackoff, maxCumulativeBackoff, maxRetries);
+  }
+
+  public String toString() {
+    return MoreObjects.toStringHelper(FluentBackoff.class)
+        .add("exponent", exponent)
+        .add("initialBackoff", initialBackoff)
+        .add("maxBackoff", maxBackoff)
+        .add("maxRetries", maxRetries)
+        .add("maxCumulativeBackoff", maxCumulativeBackoff)
+        .toString();
+  }
+
+  private static class BackoffImpl implements BackOff {
+
+    // Customization of this backoff.
+    private final FluentBackoff backoffConfig;
+    // Current state
+    private Duration currentCumulativeBackoff;
+    private int currentRetry;
+
+    @Override
+    public void reset() {
+      currentRetry = 0;
+      currentCumulativeBackoff = Duration.ZERO;
+    }
+
+    @Override
+    public long nextBackOffMillis() {
+      // Maximum number of retries reached.
+      if (currentRetry >= backoffConfig.maxRetries) {
+        return BackOff.STOP;
+      }
+      // Maximum cumulative backoff reached.
+      if (currentCumulativeBackoff.compareTo(backoffConfig.maxCumulativeBackoff) >= 0) {
+        return BackOff.STOP;
+      }
+
+      double currentIntervalMillis =
+          Math.min(
+              backoffConfig.initialBackoff.getMillis()
+                  * Math.pow(backoffConfig.exponent, currentRetry),
+              backoffConfig.maxBackoff.getMillis());
+      double randomOffset =
+          (Math.random() * 2 - 1) * DEFAULT_RANDOMIZATION_FACTOR * currentIntervalMillis;
+      long nextBackoffMillis = Math.round(currentIntervalMillis + randomOffset);
+      // Cap this backoff at whatever remains of the cumulative backoff limit.
+      Duration remainingCumulative =
+          backoffConfig.maxCumulativeBackoff.minus(currentCumulativeBackoff);
+      nextBackoffMillis = Math.min(nextBackoffMillis, remainingCumulative.getMillis());
+
+      // Update state and return backoff.
+      currentCumulativeBackoff = currentCumulativeBackoff.plus(nextBackoffMillis);
+      currentRetry += 1;
+      return nextBackoffMillis;
+    }
+
+    private BackoffImpl(FluentBackoff backoffConfig) {
+      this.backoffConfig = backoffConfig;
+      this.reset();
+    }
+
+    public String toString() {
+      return MoreObjects.toStringHelper(BackoffImpl.class)
+          .add("backoffConfig", backoffConfig)
+          .add("currentRetry", currentRetry)
+          .add("currentCumulativeBackoff", currentCumulativeBackoff)
+          .toString();
+    }
+  }
+
+  private FluentBackoff(
+      double exponent, Duration initialBackoff, Duration maxBackoff, Duration maxCumulativeBackoff,
+      int maxRetries) {
+    this.exponent = exponent;
+    this.initialBackoff = initialBackoff;
+    this.maxBackoff = maxBackoff;
+    this.maxRetries = maxRetries;
+    this.maxCumulativeBackoff = maxCumulativeBackoff;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3f485666/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
index 44a182e..41c372e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
@@ -66,6 +66,7 @@ import org.apache.beam.sdk.options.DefaultValueFactory;
 import org.apache.beam.sdk.options.GcsOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.util.gcsfs.GcsPath;
+import org.joda.time.Duration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -119,6 +120,9 @@ public class GcsUtil {
    */
   private static final int MAX_CONCURRENT_BATCHES = 256;
 
+  private static final FluentBackoff BACKOFF_FACTORY =
+      FluentBackoff.DEFAULT.withMaxRetries(3).withInitialBackoff(Duration.millis(200));
+
   /////////////////////////////////////////////////////////////////////////////
 
   /** Client for the GCS API. */
@@ -177,7 +181,7 @@ public class GcsUtil {
         // the request has strong global consistency.
         ResilientOperation.retry(
             ResilientOperation.getGoogleRequestCallable(getObject),
-            new AttemptBoundedExponentialBackOff(3, 200),
+            BACKOFF_FACTORY.backoff(),
             RetryDeterminer.SOCKET_ERRORS,
             IOException.class);
         return ImmutableList.of(gcsPattern);
@@ -216,7 +220,7 @@ public class GcsUtil {
       try {
         objects = ResilientOperation.retry(
             ResilientOperation.getGoogleRequestCallable(listObject),
-            new AttemptBoundedExponentialBackOff(3, 200),
+            BACKOFF_FACTORY.backoff(),
             RetryDeterminer.SOCKET_ERRORS,
             IOException.class);
       } catch (Exception e) {
@@ -257,7 +261,10 @@ public class GcsUtil {
    * if the resource does not exist.
    */
   public long fileSize(GcsPath path) throws IOException {
-    return fileSize(path, new AttemptBoundedExponentialBackOff(4, 200), Sleeper.DEFAULT);
+    return fileSize(
+        path,
+        BACKOFF_FACTORY.backoff(),
+        Sleeper.DEFAULT);
   }
 
   /**
@@ -335,7 +342,10 @@ public class GcsUtil {
    * be accessible otherwise the permissions exception will be propagated.
    */
   public boolean bucketExists(GcsPath path) throws IOException {
-    return bucketExists(path, new AttemptBoundedExponentialBackOff(4, 200), Sleeper.DEFAULT);
+    return bucketExists(
+        path,
+        BACKOFF_FACTORY.backoff(),
+        Sleeper.DEFAULT);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3f485666/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IntervalBoundedExponentialBackOff.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IntervalBoundedExponentialBackOff.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IntervalBoundedExponentialBackOff.java
index 519776a..6fac6dc 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IntervalBoundedExponentialBackOff.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IntervalBoundedExponentialBackOff.java
@@ -49,6 +49,7 @@ import com.google.api.client.util.BackOff;
  *
  * <p>Implementation is not thread-safe.
  */
+@Deprecated
 public class IntervalBoundedExponentialBackOff implements BackOff {
   public static final double DEFAULT_MULTIPLIER = 1.5;
   public static final double DEFAULT_RANDOMIZATION_FACTOR = 0.5;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3f485666/sdks/java/core/src/test/java/org/apache/beam/sdk/util/AttemptAndTimeBoundedExponentialBackOffTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/AttemptAndTimeBoundedExponentialBackOffTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/AttemptAndTimeBoundedExponentialBackOffTest.java
deleted file mode 100644
index 59e0fb7..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/AttemptAndTimeBoundedExponentialBackOffTest.java
+++ /dev/null
@@ -1,212 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.allOf;
-import static org.hamcrest.Matchers.greaterThan;
-import static org.hamcrest.Matchers.lessThan;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import com.google.api.client.util.BackOff;
-import org.apache.beam.sdk.testing.FastNanoClockAndSleeper;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/** Unit tests for {@link AttemptAndTimeBoundedExponentialBackOff}. */
-@RunWith(JUnit4.class)
-public class AttemptAndTimeBoundedExponentialBackOffTest {
-  @Rule public ExpectedException exception = ExpectedException.none();
-  @Rule public FastNanoClockAndSleeper fastClock = new FastNanoClockAndSleeper();
-
-  @Test
-  public void testUsingInvalidInitialInterval() throws Exception {
-    exception.expect(IllegalArgumentException.class);
-    exception.expectMessage("Initial interval must be greater than zero.");
-    new AttemptAndTimeBoundedExponentialBackOff(10, 0L, 1000L);
-  }
-
-  @Test
-  public void testUsingInvalidTimeInterval() throws Exception {
-    exception.expect(IllegalArgumentException.class);
-    exception.expectMessage("Maximum total wait time must be greater than zero.");
-    new AttemptAndTimeBoundedExponentialBackOff(10, 2L, 0L);
-  }
-
-  @Test
-  public void testUsingInvalidMaximumNumberOfRetries() throws Exception {
-    exception.expect(IllegalArgumentException.class);
-    exception.expectMessage("Maximum number of attempts must be greater than zero.");
-    new AttemptAndTimeBoundedExponentialBackOff(-1, 10L, 1000L);
-  }
-
-  @Test
-  public void testThatFixedNumberOfAttemptsExits() throws Exception {
-    BackOff backOff =
-        new AttemptAndTimeBoundedExponentialBackOff(
-            3,
-            500L,
-            1000L,
-            AttemptAndTimeBoundedExponentialBackOff.ResetPolicy.ALL,
-            fastClock);
-    assertThat(backOff.nextBackOffMillis(), allOf(greaterThan(249L), lessThan(751L)));
-    assertThat(backOff.nextBackOffMillis(), allOf(greaterThan(374L), lessThan(1126L)));
-    assertEquals(BackOff.STOP, backOff.nextBackOffMillis());
-  }
-
-  @Test
-  public void testThatResettingAllowsReuse() throws Exception {
-    AttemptBoundedExponentialBackOff backOff =
-        new AttemptAndTimeBoundedExponentialBackOff(
-            3,
-            500,
-            1000L,
-            AttemptAndTimeBoundedExponentialBackOff.ResetPolicy.ALL,
-            fastClock);
-    assertThat(backOff.nextBackOffMillis(), allOf(greaterThan(249L), lessThan(751L)));
-    assertThat(backOff.nextBackOffMillis(), allOf(greaterThan(374L), lessThan(1126L)));
-    assertEquals(BackOff.STOP, backOff.nextBackOffMillis());
-    backOff.reset();
-    assertThat(backOff.nextBackOffMillis(), allOf(greaterThan(249L), lessThan(751L)));
-    assertThat(backOff.nextBackOffMillis(), allOf(greaterThan(374L), lessThan(1126L)));
-    assertEquals(BackOff.STOP, backOff.nextBackOffMillis());
-
-    backOff =
-        new AttemptAndTimeBoundedExponentialBackOff(
-            30,
-            500,
-            1000L,
-            AttemptAndTimeBoundedExponentialBackOff.ResetPolicy.ALL,
-            fastClock);
-    assertThat(backOff.nextBackOffMillis(), allOf(greaterThan(249L), lessThan(751L)));
-    assertThat(backOff.nextBackOffMillis(), allOf(greaterThan(374L), lessThan(1126L)));
-    fastClock.sleep(2000L);
-    assertEquals(BackOff.STOP, backOff.nextBackOffMillis());
-    backOff.reset();
-    assertThat(backOff.nextBackOffMillis(), allOf(greaterThan(249L), lessThan(751L)));
-  }
-
-  @Test
-  public void testThatResettingAttemptsAllowsReuse() throws Exception {
-    AttemptBoundedExponentialBackOff backOff =
-        new AttemptAndTimeBoundedExponentialBackOff(
-            3,
-            500,
-            1000,
-            AttemptAndTimeBoundedExponentialBackOff.ResetPolicy.ATTEMPTS,
-            fastClock);
-    assertThat(backOff.nextBackOffMillis(), allOf(greaterThan(249L), lessThan(751L)));
-    assertThat(backOff.nextBackOffMillis(), allOf(greaterThan(374L), lessThan(1126L)));
-    assertEquals(BackOff.STOP, backOff.nextBackOffMillis());
-    backOff.reset();
-    assertThat(backOff.nextBackOffMillis(), allOf(greaterThan(249L), lessThan(751L)));
-    assertThat(backOff.nextBackOffMillis(), allOf(greaterThan(374L), lessThan(1126L)));
-    assertEquals(BackOff.STOP, backOff.nextBackOffMillis());
-  }
-
-  @Test
-  public void testThatResettingAttemptsDoesNotAllowsReuse() throws Exception {
-    AttemptBoundedExponentialBackOff backOff =
-        new AttemptAndTimeBoundedExponentialBackOff(
-            30,
-            500,
-            1000L,
-            AttemptAndTimeBoundedExponentialBackOff.ResetPolicy.ATTEMPTS,
-            fastClock);
-    assertThat(backOff.nextBackOffMillis(), allOf(greaterThan(249L), lessThan(751L)));
-    assertThat(backOff.nextBackOffMillis(), allOf(greaterThan(374L), lessThan(1126L)));
-    fastClock.sleep(2000L);
-    assertEquals(BackOff.STOP, backOff.nextBackOffMillis());
-    backOff.reset();
-    assertEquals(BackOff.STOP, backOff.nextBackOffMillis());
-  }
-
-  @Test
-  public void testThatResettingTimerAllowsReuse() throws Exception {
-    AttemptBoundedExponentialBackOff backOff =
-        new AttemptAndTimeBoundedExponentialBackOff(
-            30,
-            500,
-            1000L,
-            AttemptAndTimeBoundedExponentialBackOff.ResetPolicy.TIMER,
-            fastClock);
-    assertThat(backOff.nextBackOffMillis(), allOf(greaterThan(249L), lessThan(751L)));
-    assertThat(backOff.nextBackOffMillis(), allOf(greaterThan(374L), lessThan(1126L)));
-    fastClock.sleep(2000L);
-    assertEquals(BackOff.STOP, backOff.nextBackOffMillis());
-    backOff.reset();
-    assertThat(backOff.nextBackOffMillis(), allOf(greaterThan(561L), lessThan(1688L)));
-    assertThat(backOff.nextBackOffMillis(), allOf(greaterThan(843L), lessThan(2531L)));
-  }
-
-  @Test
-  public void testThatResettingTimerDoesNotAllowReuse() throws Exception {
-    AttemptBoundedExponentialBackOff backOff =
-        new AttemptAndTimeBoundedExponentialBackOff(
-            3,
-            500,
-            1000L,
-            AttemptAndTimeBoundedExponentialBackOff.ResetPolicy.TIMER,
-            fastClock);
-    assertThat(backOff.nextBackOffMillis(), allOf(greaterThan(249L), lessThan(751L)));
-    assertThat(backOff.nextBackOffMillis(), allOf(greaterThan(374L), lessThan(1126L)));
-    assertEquals(BackOff.STOP, backOff.nextBackOffMillis());
-    backOff.reset();
-    assertEquals(BackOff.STOP, backOff.nextBackOffMillis());
-  }
-
-  @Test
-  public void testTimeBound() throws Exception {
-    AttemptBoundedExponentialBackOff backOff =
-        new AttemptAndTimeBoundedExponentialBackOff(
-            3, 500L, 5L, AttemptAndTimeBoundedExponentialBackOff.ResetPolicy.ALL, fastClock);
-    assertEquals(backOff.nextBackOffMillis(), 5L);
-  }
-
-  @Test
-  public void testAtMaxAttempts() throws Exception {
-    AttemptBoundedExponentialBackOff backOff =
-        new AttemptAndTimeBoundedExponentialBackOff(
-            3,
-            500L,
-            1000L,
-            AttemptAndTimeBoundedExponentialBackOff.ResetPolicy.ALL,
-            fastClock);
-    assertFalse(backOff.atMaxAttempts());
-    backOff.nextBackOffMillis();
-    assertFalse(backOff.atMaxAttempts());
-    backOff.nextBackOffMillis();
-    assertTrue(backOff.atMaxAttempts());
-    assertEquals(BackOff.STOP, backOff.nextBackOffMillis());
-  }
-
-  @Test
-  public void testAtMaxTime() throws Exception {
-    AttemptBoundedExponentialBackOff backOff =
-        new AttemptAndTimeBoundedExponentialBackOff(
-            3, 500L, 1L, AttemptAndTimeBoundedExponentialBackOff.ResetPolicy.ALL, fastClock);
-    fastClock.sleep(2);
-    assertTrue(backOff.atMaxAttempts());
-    assertEquals(BackOff.STOP, backOff.nextBackOffMillis());
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3f485666/sdks/java/core/src/test/java/org/apache/beam/sdk/util/AttemptBoundedExponentialBackOffTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/AttemptBoundedExponentialBackOffTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/AttemptBoundedExponentialBackOffTest.java
deleted file mode 100644
index 3cfa961..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/AttemptBoundedExponentialBackOffTest.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.allOf;
-import static org.hamcrest.Matchers.greaterThan;
-import static org.hamcrest.Matchers.lessThan;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import com.google.api.client.util.BackOff;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/** Unit tests for {@link AttemptBoundedExponentialBackOff}. */
-@RunWith(JUnit4.class)
-public class AttemptBoundedExponentialBackOffTest {
-  @Rule public ExpectedException exception = ExpectedException.none();
-
-  @Test
-  public void testUsingInvalidInitialInterval() throws Exception {
-    exception.expect(IllegalArgumentException.class);
-    exception.expectMessage("Initial interval must be greater than zero.");
-    new AttemptBoundedExponentialBackOff(10, 0L);
-  }
-
-  @Test
-  public void testUsingInvalidMaximumNumberOfRetries() throws Exception {
-    exception.expect(IllegalArgumentException.class);
-    exception.expectMessage("Maximum number of attempts must be greater than zero.");
-    new AttemptBoundedExponentialBackOff(-1, 10L);
-  }
-
-  @Test
-  public void testThatFixedNumberOfAttemptsExits() throws Exception {
-    BackOff backOff = new AttemptBoundedExponentialBackOff(3, 500);
-    assertThat(backOff.nextBackOffMillis(), allOf(greaterThan(249L), lessThan(751L)));
-    assertThat(backOff.nextBackOffMillis(), allOf(greaterThan(374L), lessThan(1126L)));
-    assertEquals(BackOff.STOP, backOff.nextBackOffMillis());
-  }
-
-  @Test
-  public void testThatResettingAllowsReuse() throws Exception {
-    BackOff backOff = new AttemptBoundedExponentialBackOff(3, 500);
-    assertThat(backOff.nextBackOffMillis(), allOf(greaterThan(249L), lessThan(751L)));
-    assertThat(backOff.nextBackOffMillis(), allOf(greaterThan(374L), lessThan(1126L)));
-    assertEquals(BackOff.STOP, backOff.nextBackOffMillis());
-    backOff.reset();
-    assertThat(backOff.nextBackOffMillis(), allOf(greaterThan(249L), lessThan(751L)));
-    assertThat(backOff.nextBackOffMillis(), allOf(greaterThan(374L), lessThan(1126L)));
-    assertEquals(BackOff.STOP, backOff.nextBackOffMillis());
-  }
-
-  @Test
-  public void testAtMaxAttempts() throws Exception {
-    AttemptBoundedExponentialBackOff backOff = new AttemptBoundedExponentialBackOff(3, 500);
-    assertFalse(backOff.atMaxAttempts());
-    backOff.nextBackOffMillis();
-    assertFalse(backOff.atMaxAttempts());
-    backOff.nextBackOffMillis();
-    assertTrue(backOff.atMaxAttempts());
-    assertEquals(BackOff.STOP, backOff.nextBackOffMillis());
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3f485666/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FluentBackoffTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FluentBackoffTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FluentBackoffTest.java
new file mode 100644
index 0000000..20b03cf
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FluentBackoffTest.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import static org.hamcrest.Matchers.allOf;
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.lessThan;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+import com.google.api.client.util.BackOff;
+import java.io.IOException;
+import org.joda.time.Duration;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link FluentBackoff}.
+ */
+@RunWith(JUnit4.class)
+public class FluentBackoffTest {
+
+  @Rule
+  public ExpectedException thrown = ExpectedException.none();
+  private final FluentBackoff defaultBackoff = FluentBackoff.DEFAULT;
+
+  @Test
+  public void testInvalidExponent() {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("exponent -2.0 must be greater than 0");
+    defaultBackoff.withExponent(-2.0);
+  }
+
+  @Test
+  public void testInvalidInitialBackoff() {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("initialBackoff PT0S must be at least 1 millisecond");
+    defaultBackoff.withInitialBackoff(Duration.ZERO);
+  }
+
+  @Test
+  public void testInvalidMaxBackoff() {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("maxBackoff PT0S must be at least 1 millisecond");
+    defaultBackoff.withMaxBackoff(Duration.ZERO);
+  }
+
+  @Test
+  public void testInvalidMaxRetries() {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("maxRetries -1 cannot be negative");
+    defaultBackoff.withMaxRetries(-1);
+  }
+
+  @Test
+  public void testInvalidCumulativeBackoff() {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("maxCumulativeBackoff PT-0.002S must be at least 1 millisecond");
+    defaultBackoff.withMaxCumulativeBackoff(Duration.millis(-2));
+  }
+
+  /**
+   * Tests with bounded interval, default exponent, unlimited retries, and a reset.
+   */
+  @Test
+  public void testBoundedIntervalWithReset() throws Exception {
+    BackOff backOff =
+        FluentBackoff.DEFAULT
+            .withInitialBackoff(Duration.millis(500))
+            .withMaxBackoff(Duration.standardSeconds(1)).backoff();
+    assertThat(backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(249L), lessThan(751L)));
+    assertThat(backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(374L), lessThan(1126L)));
+    assertThat(backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(500L),
+        lessThanOrEqualTo(1500L)));
+    assertThat(backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(500L),
+        lessThanOrEqualTo(1500L)));
+    assertThat(backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(500L),
+        lessThanOrEqualTo(1500L)));
+    assertThat(backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(500L),
+        lessThanOrEqualTo(1500L)));
+    assertThat(backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(500L),
+        lessThanOrEqualTo(1500L)));
+
+    // Reset, should go back to short times.
+    backOff.reset();
+    assertThat(backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(249L), lessThan(751L)));
+    assertThat(backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(374L), lessThan(1126L)));
+    assertThat(backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(500L),
+        lessThanOrEqualTo(1500L)));
+    assertThat(backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(500L),
+        lessThanOrEqualTo(1500L)));
+
+  }
+
+  /**
+   * Tests with a custom initial backoff, limited retries, and a reset.
+   */
+  @Test
+  public void testMaxRetriesWithReset() throws Exception {
+    BackOff backOff =
+        FluentBackoff.DEFAULT
+            .withInitialBackoff(Duration.millis(500))
+            .withMaxRetries(1)
+            .backoff();
+    assertThat(backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(249L), lessThan(751L)));
+    assertThat(backOff.nextBackOffMillis(), equalTo(BackOff.STOP));
+    assertThat(backOff.nextBackOffMillis(), equalTo(BackOff.STOP));
+    assertThat(backOff.nextBackOffMillis(), equalTo(BackOff.STOP));
+    assertThat(backOff.nextBackOffMillis(), equalTo(BackOff.STOP));
+
+    backOff.reset();
+    assertThat(backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(249L), lessThan(751L)));
+    assertThat(backOff.nextBackOffMillis(), equalTo(BackOff.STOP));
+  }
+
+  private static long countMaximumBackoff(BackOff backOff) throws IOException {
+    long cumulativeBackoffMillis = 0;
+    long currentBackoffMillis = backOff.nextBackOffMillis();
+    while (currentBackoffMillis != BackOff.STOP) {
+      cumulativeBackoffMillis += currentBackoffMillis;
+      currentBackoffMillis = backOff.nextBackOffMillis();
+    }
+    return cumulativeBackoffMillis;
+  }
+
+  /**
+   * Tests with bounded interval, limited cumulative time, and a reset.
+   */
+  @Test
+  public void testBoundedIntervalAndCumTimeWithReset() throws Exception {
+    BackOff backOff =
+        FluentBackoff.DEFAULT
+            .withInitialBackoff(Duration.millis(500))
+            .withMaxBackoff(Duration.standardSeconds(1))
+            .withMaxCumulativeBackoff(Duration.standardMinutes(1)).backoff();
+
+    assertThat(countMaximumBackoff(backOff), equalTo(Duration.standardMinutes(1).getMillis()));
+
+    backOff.reset();
+    assertThat(countMaximumBackoff(backOff), equalTo(Duration.standardMinutes(1).getMillis()));
+    // sanity check: should get 0 if we don't reset
+    assertThat(countMaximumBackoff(backOff), equalTo(0L));
+
+    backOff.reset();
+    assertThat(countMaximumBackoff(backOff), equalTo(Duration.standardMinutes(1).getMillis()));
+  }
+
+  /**
+   * Tests with bounded interval and limited cumulative time; no retry limit or reset is used.
+   */
+  @Test
+  public void testBoundedIntervalAndCumTimeAndRetriesWithReset() throws Exception {
+    BackOff backOff =
+        FluentBackoff.DEFAULT
+            .withInitialBackoff(Duration.millis(500))
+            .withMaxBackoff(Duration.standardSeconds(1))
+            .withMaxCumulativeBackoff(Duration.standardMinutes(1))
+            .backoff();
+
+    long cumulativeBackoffMillis = 0;
+    long currentBackoffMillis = backOff.nextBackOffMillis();
+    while (currentBackoffMillis != BackOff.STOP) {
+      cumulativeBackoffMillis += currentBackoffMillis;
+      currentBackoffMillis = backOff.nextBackOffMillis();
+    }
+    assertThat(cumulativeBackoffMillis, equalTo(Duration.standardMinutes(1).getMillis()));
+  }
+
+  @Test
+  public void testFluentBackoffToString() throws IOException {
+    FluentBackoff config = FluentBackoff.DEFAULT
+        .withExponent(3.4)
+        .withMaxRetries(4)
+        .withInitialBackoff(Duration.standardSeconds(3))
+        .withMaxBackoff(Duration.standardHours(1))
+        .withMaxCumulativeBackoff(Duration.standardDays(1));
+
+    assertEquals(
+        "FluentBackoff{exponent=3.4, initialBackoff=PT3S, maxBackoff=PT3600S,"
+            + " maxRetries=4, maxCumulativeBackoff=PT86400S}",
+        config.toString());
+  }
+  @Test
+  public void testBackoffImplToString() throws IOException {
+    FluentBackoff config = FluentBackoff.DEFAULT
+        .withExponent(3.4)
+        .withMaxRetries(4)
+        .withInitialBackoff(Duration.standardSeconds(3))
+        .withMaxBackoff(Duration.standardHours(1))
+        .withMaxCumulativeBackoff(Duration.standardDays(1));
+    BackOff backOff = config.backoff();
+
+    assertEquals(
+        "BackoffImpl{backoffConfig=" + config.toString() + ","
+            + " currentRetry=0, currentCumulativeBackoff=PT0S}",
+        backOff.toString());
+
+    // backoff once, ignoring result
+    backOff.nextBackOffMillis();
+
+    // currentRetry is exact, we can test it.
+    assertThat(backOff.toString(), containsString("currentRetry=1"));
+    // currentCumulativeBackoff is not exact; we cannot even check that it's non-zero (randomness).
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3f485666/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java
index 681b0aa..9504b4c 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java
@@ -365,7 +365,7 @@ public class GcsUtilTest {
     Storage.Objects mockStorageObjects = Mockito.mock(Storage.Objects.class);
     Storage.Objects.Get mockStorageGet = Mockito.mock(Storage.Objects.Get.class);
 
-    BackOff mockBackOff = new AttemptBoundedExponentialBackOff(3, 200);
+    BackOff mockBackOff = FluentBackoff.DEFAULT.withMaxRetries(2).backoff();
 
     when(mockStorage.objects()).thenReturn(mockStorageObjects);
     when(mockStorageObjects.get("testbucket", "testobject")).thenReturn(mockStorageGet);
@@ -376,7 +376,7 @@ public class GcsUtilTest {
 
     assertEquals(1000, gcsUtil.fileSize(GcsPath.fromComponents("testbucket", "testobject"),
         mockBackOff, new FastNanoClockAndSleeper()));
-    assertEquals(mockBackOff.nextBackOffMillis(), BackOff.STOP);
+    assertEquals(BackOff.STOP, mockBackOff.nextBackOffMillis());
   }
 
   @Test
@@ -390,7 +390,7 @@ public class GcsUtilTest {
     Storage.Buckets mockStorageObjects = Mockito.mock(Storage.Buckets.class);
     Storage.Buckets.Get mockStorageGet = Mockito.mock(Storage.Buckets.Get.class);
 
-    BackOff mockBackOff = new AttemptBoundedExponentialBackOff(3, 200);
+    BackOff mockBackOff = FluentBackoff.DEFAULT.backoff();
 
     when(mockStorage.buckets()).thenReturn(mockStorageObjects);
     when(mockStorageObjects.get("testbucket")).thenReturn(mockStorageGet);
@@ -413,7 +413,7 @@ public class GcsUtilTest {
     Storage.Buckets mockStorageObjects = Mockito.mock(Storage.Buckets.class);
     Storage.Buckets.Get mockStorageGet = Mockito.mock(Storage.Buckets.Get.class);
 
-    BackOff mockBackOff = new AttemptBoundedExponentialBackOff(3, 200);
+    BackOff mockBackOff = FluentBackoff.DEFAULT.backoff();
     GoogleJsonResponseException expectedException =
         googleJsonResponseException(HttpStatusCodes.STATUS_CODE_FORBIDDEN,
             "Waves hand mysteriously", "These aren't the buckets your looking for");
@@ -438,7 +438,7 @@ public class GcsUtilTest {
     Storage.Buckets mockStorageObjects = Mockito.mock(Storage.Buckets.class);
     Storage.Buckets.Get mockStorageGet = Mockito.mock(Storage.Buckets.Get.class);
 
-    BackOff mockBackOff = new AttemptBoundedExponentialBackOff(3, 200);
+    BackOff mockBackOff = FluentBackoff.DEFAULT.backoff();
 
     when(mockStorage.buckets()).thenReturn(mockStorageObjects);
     when(mockStorageObjects.get("testbucket")).thenReturn(mockStorageGet);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3f485666/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index 304dc82..6dde581 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -24,9 +24,6 @@ import static com.google.common.base.Preconditions.checkState;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.api.client.json.JsonFactory;
-import com.google.api.client.util.BackOff;
-import com.google.api.client.util.BackOffUtils;
-import com.google.api.client.util.Sleeper;
 import com.google.api.services.bigquery.Bigquery;
 import com.google.api.services.bigquery.model.Job;
 import com.google.api.services.bigquery.model.JobConfigurationExtract;
@@ -69,7 +66,6 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -108,7 +104,6 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
 import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.util.AttemptBoundedExponentialBackOff;
 import org.apache.beam.sdk.util.FileIOChannelFactory;
 import org.apache.beam.sdk.util.GcsIOChannelFactory;
 import org.apache.beam.sdk.util.GcsUtil;
@@ -129,6 +124,7 @@ import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PDone;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TupleTagList;
+import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -294,7 +290,7 @@ public class BigQueryIO {
    *
    * <p>If the project id is omitted, the default project id is used.
    */
-  public static TableReference parseTableSpec(String tableSpec) {
+  static TableReference parseTableSpec(String tableSpec) {
     Matcher match = TABLE_SPEC.matcher(tableSpec);
     if (!match.matches()) {
       throw new IllegalArgumentException(
@@ -953,14 +949,14 @@ public class BigQueryIO {
    * ...
    */
   private abstract static class BigQuerySourceBase extends BoundedSource<TableRow> {
-    // The maximum number of attempts to verify temp files.
-    private static final int MAX_FILES_VERIFY_ATTEMPTS = 10;
+    // The maximum number of retries to verify temp files.
+    private static final int MAX_FILES_VERIFY_RETRIES = 9;
 
     // The maximum number of retries to poll a BigQuery job.
     protected static final int JOB_POLL_MAX_RETRIES = Integer.MAX_VALUE;
 
     // The initial backoff for verifying temp files.
-    private static final long INITIAL_FILES_VERIFY_BACKOFF_MILLIS = TimeUnit.SECONDS.toMillis(1);
+    private static final Duration INITIAL_FILES_VERIFY_BACKOFF = Duration.standardSeconds(1);
 
     protected final String jobIdToken;
     protected final String extractDestinationDir;
@@ -1055,14 +1051,7 @@ public class BigQueryIO {
             }};
 
       List<BoundedSource<TableRow>> avroSources = Lists.newArrayList();
-      BackOff backoff = new AttemptBoundedExponentialBackOff(
-          MAX_FILES_VERIFY_ATTEMPTS, INITIAL_FILES_VERIFY_BACKOFF_MILLIS);
       for (String fileName : files) {
-        while (BackOffUtils.next(Sleeper.DEFAULT, backoff)) {
-          if (IOChannelUtils.getFactory(fileName).getSizeBytes(fileName) != -1) {
-            break;
-          }
-        }
         avroSources.add(new TransformingSource<>(
             AvroSource.from(fileName), function, getDefaultOutputCoder()));
       }


Mime
View raw message