beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From lc...@apache.org
Subject [3/4] beam git commit: Remove google api BackOff usage from sdks/core
Date Tue, 09 May 2017 16:55:32 GMT
Remove google api BackOff usage from sdks/core


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f6a0c674
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f6a0c674
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f6a0c674

Branch: refs/heads/release-2.0.0
Commit: f6a0c67499afa9364233db822e26c80161302fa2
Parents: e282601
Author: Vikas Kedigehalli <vikasrk@google.com>
Authored: Fri May 5 19:24:51 2017 -0700
Committer: Luke Cwik <lcwik@google.com>
Committed: Tue May 9 09:54:29 2017 -0700

----------------------------------------------------------------------
 .../beam/examples/common/ExampleUtils.java      |  6 +-
 .../beam/examples/WindowedWordCountIT.java      |  2 +-
 .../runners/dataflow/DataflowPipelineJob.java   | 18 +++-
 .../beam/runners/dataflow/util/PackageUtil.java |  3 +-
 .../dataflow/DataflowPipelineJobTest.java       | 13 ++-
 .../runners/dataflow/util/PackageUtilTest.java  |  2 +-
 runners/spark/pom.xml                           |  5 -
 .../beam/runners/spark/io/MicrobatchSource.java |  2 +-
 sdks/java/core/pom.xml                          |  5 -
 .../sdk/io/BoundedReadFromUnboundedSource.java  |  2 +-
 .../beam/sdk/testing/FileChecksumMatcher.java   |  2 +-
 .../java/org/apache/beam/sdk/util/BackOff.java  | 81 ++++++++++++++++
 .../org/apache/beam/sdk/util/BackOffUtils.java  | 57 ++++++++++++
 .../beam/sdk/util/ExplicitShardedFile.java      |  3 -
 .../org/apache/beam/sdk/util/FluentBackoff.java |  1 -
 .../beam/sdk/util/NumberedShardedFile.java      |  3 -
 .../org/apache/beam/sdk/util/ShardedFile.java   |  2 -
 .../java/org/apache/beam/sdk/util/Sleeper.java  | 48 ++++++++++
 .../sdk/util/UploadIdResponseInterceptor.java   | 60 ------------
 .../org/apache/beam/SdkCoreApiSurfaceTest.java  |  1 -
 .../beam/sdk/testing/ExpectedLogsTest.java      |  2 +-
 .../sdk/testing/FastNanoClockAndSleeper.java    | 47 ----------
 .../testing/FastNanoClockAndSleeperTest.java    | 47 ----------
 .../sdk/testing/FileChecksumMatcherTest.java    |  5 -
 .../beam/sdk/testing/SystemNanoTimeSleeper.java |  2 +-
 .../apache/beam/sdk/util/FluentBackoffTest.java |  1 -
 .../beam/sdk/util/NumberedShardedFileTest.java  | 14 ++-
 .../util/UploadIdResponseInterceptorTest.java   | 98 --------------------
 .../sdk/extensions/gcp/options/GcpOptions.java  |  3 +-
 .../apache/beam/sdk/util/BackOffAdapter.java    | 43 +++++++++
 .../java/org/apache/beam/sdk/util/GcsUtil.java  | 13 ++-
 .../sdk/util/UploadIdResponseInterceptor.java   | 60 ++++++++++++
 .../beam/sdk/util/FastNanoClockAndSleeper.java  | 47 ++++++++++
 .../sdk/util/FastNanoClockAndSleeperTest.java   | 47 ++++++++++
 .../org/apache/beam/sdk/util/GcsUtilTest.java   | 23 +++--
 .../util/UploadIdResponseInterceptorTest.java   | 98 ++++++++++++++++++++
 .../io/gcp/bigquery/BigQueryServicesImpl.java   | 81 +++++++---------
 .../gcp/bigquery/BigQueryTableRowIterator.java  |  7 +-
 .../beam/sdk/io/gcp/datastore/DatastoreV1.java  |  6 +-
 .../gcp/bigquery/BigQueryServicesImplTest.java  | 24 +++--
 .../sdk/io/gcp/bigquery/FakeJobService.java     | 12 ++-
 .../beam/sdk/io/gcp/datastore/V1TestUtil.java   |  6 +-
 .../sdk/io/gcp/testing/BigqueryMatcher.java     |  4 +-
 .../sdk/io/gcp/testing/BigqueryMatcherTest.java |  7 +-
 44 files changed, 620 insertions(+), 393 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/f6a0c674/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java b/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java
index 409085a..6e4698f 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java
@@ -20,9 +20,6 @@ package org.apache.beam.examples.common;
 import com.google.api.client.googleapis.json.GoogleJsonResponseException;
 import com.google.api.client.googleapis.services.AbstractGoogleClientRequest;
 import com.google.api.client.http.HttpRequestInitializer;
-import com.google.api.client.util.BackOff;
-import com.google.api.client.util.BackOffUtils;
-import com.google.api.client.util.Sleeper;
 import com.google.api.services.bigquery.Bigquery;
 import com.google.api.services.bigquery.Bigquery.Datasets;
 import com.google.api.services.bigquery.Bigquery.Tables;
@@ -51,8 +48,11 @@ import org.apache.beam.sdk.extensions.gcp.auth.NullCredentialInitializer;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryOptions;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.util.BackOff;
+import org.apache.beam.sdk.util.BackOffUtils;
 import org.apache.beam.sdk.util.FluentBackoff;
 import org.apache.beam.sdk.util.RetryHttpRequestInitializer;
+import org.apache.beam.sdk.util.Sleeper;
 import org.apache.beam.sdk.util.Transport;
 import org.joda.time.Duration;
 

http://git-wip-us.apache.org/repos/asf/beam/blob/f6a0c674/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java
----------------------------------------------------------------------
diff --git a/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java b/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java
index f7e35c0..93c4543 100644
--- a/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java
+++ b/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java
@@ -19,7 +19,6 @@ package org.apache.beam.examples;
 
 import static org.hamcrest.Matchers.equalTo;
 
-import com.google.api.client.util.Sleeper;
 import com.google.common.base.MoreObjects;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
@@ -47,6 +46,7 @@ import org.apache.beam.sdk.util.ExplicitShardedFile;
 import org.apache.beam.sdk.util.FluentBackoff;
 import org.apache.beam.sdk.util.NumberedShardedFile;
 import org.apache.beam.sdk.util.ShardedFile;
+import org.apache.beam.sdk.util.Sleeper;
 import org.hamcrest.Description;
 import org.hamcrest.TypeSafeMatcher;
 import org.joda.time.Duration;

http://git-wip-us.apache.org/repos/asf/beam/blob/f6a0c674/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
index 23084ed..2d23983 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
@@ -46,6 +46,7 @@ import org.apache.beam.runners.dataflow.util.MonitoringUtil;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.metrics.MetricResults;
 import org.apache.beam.sdk.runners.AppliedPTransform;
+import org.apache.beam.sdk.util.BackOffAdapter;
 import org.apache.beam.sdk.util.FluentBackoff;
 import org.joda.time.Duration;
 import org.slf4j.Logger;
@@ -285,9 +286,10 @@ public class DataflowPipelineJob implements PipelineResult {
 
     BackOff backoff;
     if (!duration.isLongerThan(Duration.ZERO)) {
-      backoff = MESSAGES_BACKOFF_FACTORY.backoff();
+      backoff = BackOffAdapter.toGcpBackOff(MESSAGES_BACKOFF_FACTORY.backoff());
     } else {
-      backoff = MESSAGES_BACKOFF_FACTORY.withMaxCumulativeBackoff(duration).backoff();
+      backoff = BackOffAdapter.toGcpBackOff(
+          MESSAGES_BACKOFF_FACTORY.withMaxCumulativeBackoff(duration).backoff());
     }
 
     // This function tracks the cumulative time from the *first request* to enforce the wall-clock
@@ -299,7 +301,10 @@ public class DataflowPipelineJob implements PipelineResult {
     do {
       // Get the state of the job before listing messages. This ensures we always fetch job
       // messages after the job finishes to ensure we have all them.
-      state = getStateWithRetries(STATUS_BACKOFF_FACTORY.withMaxRetries(0).backoff(), sleeper);
+      state = getStateWithRetries(
+          BackOffAdapter.toGcpBackOff(
+              STATUS_BACKOFF_FACTORY.withMaxRetries(0).backoff()),
+          sleeper);
       boolean hasError = state == State.UNKNOWN;
 
       if (messageHandler != null && !hasError) {
@@ -354,7 +359,8 @@ public class DataflowPipelineJob implements PipelineResult {
           Duration consumed = Duration.millis((nanosConsumed + 999999) / 1000000);
           Duration remaining = duration.minus(consumed);
           if (remaining.isLongerThan(Duration.ZERO)) {
-            backoff = MESSAGES_BACKOFF_FACTORY.withMaxCumulativeBackoff(remaining).backoff();
+            backoff = BackOffAdapter.toGcpBackOff(
+                MESSAGES_BACKOFF_FACTORY.withMaxCumulativeBackoff(remaining).backoff());
           } else {
             // If there is no time remaining, don't bother backing off.
             backoff = BackOff.STOP_BACKOFF;
@@ -437,7 +443,9 @@ public class DataflowPipelineJob implements PipelineResult {
       return terminalState;
     }
 
-    return getStateWithRetries(STATUS_BACKOFF_FACTORY.backoff(), Sleeper.DEFAULT);
+    return getStateWithRetries(
+        BackOffAdapter.toGcpBackOff(STATUS_BACKOFF_FACTORY.backoff()),
+        Sleeper.DEFAULT);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/beam/blob/f6a0c674/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java
index 5ddcd29..931f7ea 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java
@@ -54,6 +54,7 @@ import javax.annotation.Nullable;
 import org.apache.beam.sdk.io.FileSystems;
 import org.apache.beam.sdk.io.fs.CreateOptions;
 import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
+import org.apache.beam.sdk.util.BackOffAdapter;
 import org.apache.beam.sdk.util.FluentBackoff;
 import org.apache.beam.sdk.util.ZipFiles;
 import org.joda.time.Duration;
@@ -210,7 +211,7 @@ class PackageUtil {
       }
 
       // Upload file, retrying on failure.
-      BackOff backoff = BACKOFF_FACTORY.backoff();
+      BackOff backoff = BackOffAdapter.toGcpBackOff(BACKOFF_FACTORY.backoff());
       while (true) {
         try {
           LOG.debug("Uploading classpath element {} to {}", source, target);

http://git-wip-us.apache.org/repos/asf/beam/blob/f6a0c674/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
index f868a17..e95babb 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
@@ -57,8 +57,9 @@ import org.apache.beam.sdk.extensions.gcp.auth.TestCredential;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.testing.ExpectedLogs;
-import org.apache.beam.sdk.testing.FastNanoClockAndSleeper;
 import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.util.BackOffAdapter;
+import org.apache.beam.sdk.util.FastNanoClockAndSleeper;
 import org.apache.beam.sdk.util.NoopPathValidator;
 import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.POutput;
@@ -347,7 +348,10 @@ public class DataflowPipelineJobTest {
 
     assertEquals(
         State.RUNNING,
-        job.getStateWithRetries(DataflowPipelineJob.STATUS_BACKOFF_FACTORY.backoff(), fastClock));
+        job.getStateWithRetries(
+            BackOffAdapter.toGcpBackOff(
+                DataflowPipelineJob.STATUS_BACKOFF_FACTORY.backoff()),
+            fastClock));
   }
 
   @Test
@@ -368,7 +372,10 @@ public class DataflowPipelineJobTest {
     long startTime = fastClock.nanoTime();
     assertEquals(
         State.UNKNOWN,
-        job.getStateWithRetries(DataflowPipelineJob.STATUS_BACKOFF_FACTORY.backoff(), fastClock));
+        job.getStateWithRetries(
+            BackOffAdapter.toGcpBackOff(
+                DataflowPipelineJob.STATUS_BACKOFF_FACTORY.backoff()),
+            fastClock));
     long timeDiff = TimeUnit.NANOSECONDS.toMillis(fastClock.nanoTime() - startTime);
     checkValidInterval(DataflowPipelineJob.STATUS_POLLING_INTERVAL,
         DataflowPipelineJob.STATUS_POLLING_RETRIES, timeDiff);

http://git-wip-us.apache.org/repos/asf/beam/blob/f6a0c674/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java
index 4ae3a77..c7a660e 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java
@@ -79,8 +79,8 @@ import org.apache.beam.sdk.io.fs.CreateOptions.StandardCreateOptions;
 import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.ExpectedLogs;
-import org.apache.beam.sdk.testing.FastNanoClockAndSleeper;
 import org.apache.beam.sdk.testing.RegexMatcher;
+import org.apache.beam.sdk.util.FastNanoClockAndSleeper;
 import org.apache.beam.sdk.util.GcsUtil;
 import org.apache.beam.sdk.util.GcsUtil.StorageObjectOrIOException;
 import org.apache.beam.sdk.util.MimeTypes;

http://git-wip-us.apache.org/repos/asf/beam/blob/f6a0c674/runners/spark/pom.xml
----------------------------------------------------------------------
diff --git a/runners/spark/pom.xml b/runners/spark/pom.xml
index ddec7e7..6991171 100644
--- a/runners/spark/pom.xml
+++ b/runners/spark/pom.xml
@@ -190,11 +190,6 @@
       <artifactId>joda-time</artifactId>
     </dependency>
     <dependency>
-      <groupId>com.google.http-client</groupId>
-      <artifactId>google-http-client</artifactId>
-      <version>${google-clients.version}</version>
-    </dependency>
-    <dependency>
       <groupId>org.apache.commons</groupId>
       <artifactId>commons-compress</artifactId>
       <scope>provided</scope>

http://git-wip-us.apache.org/repos/asf/beam/blob/f6a0c674/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java
index 53d1ba7..3b48caf 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java
@@ -18,7 +18,6 @@
 
 package org.apache.beam.runners.spark.io;
 
-import com.google.api.client.util.BackOff;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
@@ -36,6 +35,7 @@ import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.io.Source;
 import org.apache.beam.sdk.io.UnboundedSource;
 import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.util.BackOff;
 import org.apache.beam.sdk.util.FluentBackoff;
 import org.joda.time.Duration;
 import org.joda.time.Instant;

http://git-wip-us.apache.org/repos/asf/beam/blob/f6a0c674/sdks/java/core/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/core/pom.xml b/sdks/java/core/pom.xml
index 6620a08..a3967a2 100644
--- a/sdks/java/core/pom.xml
+++ b/sdks/java/core/pom.xml
@@ -125,11 +125,6 @@
 
   <dependencies>
     <dependency>
-      <groupId>com.google.http-client</groupId>
-      <artifactId>google-http-client</artifactId>
-    </dependency>
-
-    <dependency>
       <groupId>com.google.guava</groupId>
       <artifactId>guava</artifactId>
     </dependency>

http://git-wip-us.apache.org/repos/asf/beam/blob/f6a0c674/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
index d9adf92..c882447 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
@@ -17,7 +17,6 @@
  */
 package org.apache.beam.sdk.io;
 
-import com.google.api.client.util.BackOff;
 import com.google.auto.value.AutoValue;
 import com.google.common.util.concurrent.Uninterruptibles;
 import java.io.IOException;
@@ -35,6 +34,7 @@ import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.util.BackOff;
 import org.apache.beam.sdk.util.FluentBackoff;
 import org.apache.beam.sdk.util.NameUtils;
 import org.apache.beam.sdk.values.PBegin;

http://git-wip-us.apache.org/repos/asf/beam/blob/f6a0c674/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/FileChecksumMatcher.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/FileChecksumMatcher.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/FileChecksumMatcher.java
index 82a6b71..5ed0525 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/FileChecksumMatcher.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/FileChecksumMatcher.java
@@ -21,7 +21,6 @@ package org.apache.beam.sdk.testing;
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 
-import com.google.api.client.util.Sleeper;
 import com.google.common.base.Strings;
 import com.google.common.hash.HashCode;
 import com.google.common.hash.Hashing;
@@ -34,6 +33,7 @@ import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.util.FluentBackoff;
 import org.apache.beam.sdk.util.NumberedShardedFile;
 import org.apache.beam.sdk.util.ShardedFile;
+import org.apache.beam.sdk.util.Sleeper;
 import org.hamcrest.Description;
 import org.hamcrest.TypeSafeMatcher;
 import org.joda.time.Duration;

http://git-wip-us.apache.org/repos/asf/beam/blob/f6a0c674/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BackOff.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BackOff.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BackOff.java
new file mode 100644
index 0000000..5bc6027
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BackOff.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import java.io.IOException;
+
+/**
+ * Back-off policy when retrying an operation.
+ *
+ * <p><b>Note</b>: This interface is copied from Google API client library to avoid its dependency.
+ */
+public interface BackOff {
+
+  /** Indicates that no more retries should be made for use in {@link #nextBackOffMillis()}. */
+  long STOP = -1L;
+
+  /** Reset to initial state. */
+  void reset() throws IOException;
+
+  /**
+   * Gets the number of milliseconds to wait before retrying the operation or {@link #STOP} to
+   * indicate that no retries should be made.
+   *
+   * <p>
+   * Example usage:
+   * </p>
+   *
+   * <pre>
+   long backOffMillis = backoff.nextBackOffMillis();
+   if (backOffMillis == Backoff.STOP) {
+   // do not retry operation
+   } else {
+   // sleep for backOffMillis milliseconds and retry operation
+   }
+   * </pre>
+   */
+  long nextBackOffMillis() throws IOException;
+
+  /**
+   * Fixed back-off policy whose back-off time is always zero, meaning that the operation is retried
+   * immediately without waiting.
+   */
+  BackOff ZERO_BACKOFF = new BackOff() {
+
+    public void reset() throws IOException {
+    }
+
+    public long nextBackOffMillis() throws IOException {
+      return 0;
+    }
+  };
+
+  /**
+   * Fixed back-off policy that always returns {@code #STOP} for {@link #nextBackOffMillis()},
+   * meaning that the operation should not be retried.
+   */
+  BackOff STOP_BACKOFF = new BackOff() {
+
+    public void reset() throws IOException {
+    }
+
+    public long nextBackOffMillis() throws IOException {
+      return STOP;
+    }
+  };
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/f6a0c674/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BackOffUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BackOffUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BackOffUtils.java
new file mode 100644
index 0000000..aa7461c
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BackOffUtils.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.util;
+
+import java.io.IOException;
+
+/**
+ * Utilities for {@link BackOff}.
+ *
+ * <p><b>Note</b>: This is copied from Google API client library to avoid its dependency.
+ */
+public final class BackOffUtils {
+
+  /**
+   * Runs the next iteration of the back-off policy, and returns whether to continue to retry the
+   * operation.
+   *
+   * <p>
+   * If {@code true}, it will call {@link Sleeper#sleep(long)} with the specified number of
+   * milliseconds from {@link BackOff#nextBackOffMillis()}.
+   * </p>
+   *
+   * @param sleeper sleeper
+   * @param backOff back-off policy
+   * @return whether to continue to back off; in other words, whether
+   *         {@link BackOff#nextBackOffMillis()} did not return {@link BackOff#STOP}
+   * @throws InterruptedException if any thread has interrupted the current thread
+   */
+  public static boolean next(Sleeper sleeper, BackOff backOff)
+      throws InterruptedException, IOException {
+    long backOffTime = backOff.nextBackOffMillis();
+    if (backOffTime == BackOff.STOP) {
+      return false;
+    }
+    sleeper.sleep(backOffTime);
+    return true;
+  }
+
+  private BackOffUtils() {
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/f6a0c674/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ExplicitShardedFile.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ExplicitShardedFile.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ExplicitShardedFile.java
index 0f184de..50e5ed1 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ExplicitShardedFile.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ExplicitShardedFile.java
@@ -18,9 +18,6 @@
 
 package org.apache.beam.sdk.util;
 
-import com.google.api.client.util.BackOff;
-import com.google.api.client.util.BackOffUtils;
-import com.google.api.client.util.Sleeper;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Joiner;
 import com.google.common.collect.Lists;

http://git-wip-us.apache.org/repos/asf/beam/blob/f6a0c674/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FluentBackoff.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FluentBackoff.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FluentBackoff.java
index 479d7a8..468b742 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FluentBackoff.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FluentBackoff.java
@@ -19,7 +19,6 @@ package org.apache.beam.sdk.util;
 
 import static com.google.common.base.Preconditions.checkArgument;
 
-import com.google.api.client.util.BackOff;
 import com.google.common.base.MoreObjects;
 import org.joda.time.Duration;
 

http://git-wip-us.apache.org/repos/asf/beam/blob/f6a0c674/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NumberedShardedFile.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NumberedShardedFile.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NumberedShardedFile.java
index e18dd96..8889358 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NumberedShardedFile.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NumberedShardedFile.java
@@ -21,9 +21,6 @@ package org.apache.beam.sdk.util;
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 
-import com.google.api.client.util.BackOff;
-import com.google.api.client.util.BackOffUtils;
-import com.google.api.client.util.Sleeper;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Strings;
 import com.google.common.collect.Iterables;

http://git-wip-us.apache.org/repos/asf/beam/blob/f6a0c674/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ShardedFile.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ShardedFile.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ShardedFile.java
index ec9ed64..5961c4d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ShardedFile.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ShardedFile.java
@@ -18,8 +18,6 @@
 
 package org.apache.beam.sdk.util;
 
-import com.google.api.client.util.BackOff;
-import com.google.api.client.util.Sleeper;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.List;

http://git-wip-us.apache.org/repos/asf/beam/blob/f6a0c674/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Sleeper.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Sleeper.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Sleeper.java
new file mode 100644
index 0000000..d180ec6
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Sleeper.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+/**
+ * Sleeper interface to use for requesting the current thread to sleep as specified in
+ * {@link Thread#sleep(long)}.
+ *
+ * <p>
+ * The default implementation can be accessed at {@link #DEFAULT}. Primarily used for testing.
+ * </p>
+ *
+ * <p><b>Note</b>: This interface is copied from Google API client library to avoid its dependency.
+ */
+public interface Sleeper {
+
+  /**
+   * Causes the currently executing thread to sleep (temporarily cease execution) for the specified
+   * number of milliseconds as specified in {@link Thread#sleep(long)}.
+   *
+   * @param millis length of time to sleep in milliseconds
+   * @throws InterruptedException if any thread has interrupted the current thread
+   */
+  void sleep(long millis) throws InterruptedException;
+
+  /** Provides the default implementation based on {@link Thread#sleep(long)}. */
+  Sleeper DEFAULT = new Sleeper() {
+
+    public void sleep(long millis) throws InterruptedException {
+      Thread.sleep(millis);
+    }
+  };
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/f6a0c674/sdks/java/core/src/main/java/org/apache/beam/sdk/util/UploadIdResponseInterceptor.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/UploadIdResponseInterceptor.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/UploadIdResponseInterceptor.java
deleted file mode 100644
index f685b69..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/UploadIdResponseInterceptor.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import com.google.api.client.http.GenericUrl;
-import com.google.api.client.http.HttpResponse;
-import com.google.api.client.http.HttpResponseInterceptor;
-import java.io.IOException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Implements a response intercepter that logs the upload id if the upload
- * id header exists and it is the first request (does not have upload_id parameter in the request).
- * Only logs if debug level is enabled.
- */
-public class UploadIdResponseInterceptor implements HttpResponseInterceptor {
-
-  private static final Logger LOG = LoggerFactory.getLogger(UploadIdResponseInterceptor.class);
-  private static final String UPLOAD_ID_PARAM = "upload_id";
-  private static final String UPLOAD_TYPE_PARAM = "uploadType";
-  private static final String UPLOAD_HEADER = "X-GUploader-UploadID";
-
-  @Override
-  public void interceptResponse(HttpResponse response) throws IOException {
-    if (!LOG.isDebugEnabled()) {
-      return;
-    }
-    String uploadId = response.getHeaders().getFirstHeaderStringValue(UPLOAD_HEADER);
-    if (uploadId == null) {
-      return;
-    }
-
-    GenericUrl url = response.getRequest().getUrl();
-    // The check for no upload id limits the output to one log line per upload.
-    // The check for upload type makes sure this is an upload and not a read.
-    if (url.get(UPLOAD_ID_PARAM) == null && url.get(UPLOAD_TYPE_PARAM) != null) {
-      LOG.debug(
-          "Upload ID for url {} on worker {} is {}",
-          url,
-          System.getProperty("worker_id"),
-          uploadId);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/f6a0c674/sdks/java/core/src/test/java/org/apache/beam/SdkCoreApiSurfaceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/SdkCoreApiSurfaceTest.java b/sdks/java/core/src/test/java/org/apache/beam/SdkCoreApiSurfaceTest.java
index 26dd9f9..080f34a 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/SdkCoreApiSurfaceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/SdkCoreApiSurfaceTest.java
@@ -38,7 +38,6 @@ public class SdkCoreApiSurfaceTest {
     final Set<String> allowed =
         ImmutableSet.of(
             "org.apache.beam",
-            "com.google.api.client",
             "com.fasterxml.jackson.annotation",
             "com.fasterxml.jackson.core",
             "com.fasterxml.jackson.databind",

http://git-wip-us.apache.org/repos/asf/beam/blob/f6a0c674/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/ExpectedLogsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/ExpectedLogsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/ExpectedLogsTest.java
index 1762d0d..d307bed 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/ExpectedLogsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/ExpectedLogsTest.java
@@ -38,7 +38,7 @@ import org.junit.runners.model.Statement;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-/** Tests for {@link FastNanoClockAndSleeper}. */
+/** Tests for {@link ExpectedLogs}. */
 @RunWith(JUnit4.class)
 public class ExpectedLogsTest {
   private static final Logger LOG = LoggerFactory.getLogger(ExpectedLogsTest.class);

http://git-wip-us.apache.org/repos/asf/beam/blob/f6a0c674/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/FastNanoClockAndSleeper.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/FastNanoClockAndSleeper.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/FastNanoClockAndSleeper.java
deleted file mode 100644
index 6bfafa5..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/FastNanoClockAndSleeper.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.testing;
-
-import com.google.api.client.util.NanoClock;
-import com.google.api.client.util.Sleeper;
-import org.junit.rules.ExternalResource;
-import org.junit.rules.TestRule;
-
-/**
- * This object quickly moves time forward based upon how much it has been asked to sleep,
- * without actually sleeping, to simulate the backoff.
- */
-public class FastNanoClockAndSleeper extends ExternalResource
-    implements NanoClock, Sleeper, TestRule {
-  private long fastNanoTime;
-
-  @Override
-  public long nanoTime() {
-    return fastNanoTime;
-  }
-
-  @Override
-  protected void before() throws Throwable {
-    fastNanoTime = NanoClock.SYSTEM.nanoTime();
-  }
-
-  @Override
-  public void sleep(long millis) throws InterruptedException {
-    fastNanoTime += millis * 1000000L;
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/f6a0c674/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/FastNanoClockAndSleeperTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/FastNanoClockAndSleeperTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/FastNanoClockAndSleeperTest.java
deleted file mode 100644
index 7d20951..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/FastNanoClockAndSleeperTest.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.testing;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.util.concurrent.TimeUnit;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/** Tests for {@link FastNanoClockAndSleeper}. */
-@RunWith(JUnit4.class)
-public class FastNanoClockAndSleeperTest {
-  @Rule public FastNanoClockAndSleeper fastNanoClockAndSleeper = new FastNanoClockAndSleeper();
-
-  @Test
-  public void testClockAndSleeper() throws Exception {
-    long sleepTimeMs = TimeUnit.SECONDS.toMillis(30);
-    long sleepTimeNano = TimeUnit.MILLISECONDS.toNanos(sleepTimeMs);
-    long fakeTimeNano = fastNanoClockAndSleeper.nanoTime();
-    long startTimeNano = System.nanoTime();
-    fastNanoClockAndSleeper.sleep(sleepTimeMs);
-    long maxTimeNano = startTimeNano + TimeUnit.SECONDS.toNanos(1);
-    // Verify that actual time didn't progress as much as was requested
-    assertTrue(System.nanoTime() < maxTimeNano);
-    // Verify that the fake time did go up by the amount requested
-    assertEquals(fakeTimeNano + sleepTimeNano, fastNanoClockAndSleeper.nanoTime());
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/f6a0c674/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/FileChecksumMatcherTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/FileChecksumMatcherTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/FileChecksumMatcherTest.java
index 4ee6750..80f02e0 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/FileChecksumMatcherTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/FileChecksumMatcherTest.java
@@ -20,7 +20,6 @@ package org.apache.beam.sdk.testing;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.containsString;
 
-import com.google.api.client.util.BackOff;
 import com.google.common.io.Files;
 import java.io.File;
 import java.io.IOException;
@@ -43,14 +42,10 @@ public class FileChecksumMatcherTest {
   public TemporaryFolder tmpFolder = new TemporaryFolder();
   @Rule
   public ExpectedException thrown = ExpectedException.none();
-  @Rule
-  public FastNanoClockAndSleeper fastClock = new FastNanoClockAndSleeper();
 
   @Mock
   private PipelineResult pResult = Mockito.mock(PipelineResult.class);
 
-  private BackOff backOff = FileChecksumMatcher.BACK_OFF_FACTORY.backoff();
-
   @Test
   public void testPreconditionChecksumIsNull() throws IOException {
     String tmpPath = tmpFolder.newFile().getPath();

http://git-wip-us.apache.org/repos/asf/beam/blob/f6a0c674/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/SystemNanoTimeSleeper.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/SystemNanoTimeSleeper.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/SystemNanoTimeSleeper.java
index 3677e84..dd57669 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/SystemNanoTimeSleeper.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/SystemNanoTimeSleeper.java
@@ -17,9 +17,9 @@
  */
 package org.apache.beam.sdk.testing;
 
-import com.google.api.client.util.Sleeper;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.LockSupport;
+import org.apache.beam.sdk.util.Sleeper;
 
 /**
  * This class provides an expensive sleeper to deal with issues around Java's

http://git-wip-us.apache.org/repos/asf/beam/blob/f6a0c674/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FluentBackoffTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FluentBackoffTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FluentBackoffTest.java
index 20b03cf..e810278 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FluentBackoffTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FluentBackoffTest.java
@@ -26,7 +26,6 @@ import static org.hamcrest.Matchers.lessThanOrEqualTo;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
 
-import com.google.api.client.util.BackOff;
 import java.io.IOException;
 import org.joda.time.Duration;
 import org.junit.Rule;

http://git-wip-us.apache.org/repos/asf/beam/blob/f6a0c674/sdks/java/core/src/test/java/org/apache/beam/sdk/util/NumberedShardedFileTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/NumberedShardedFileTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/NumberedShardedFileTest.java
index 43a9166..cf8c722 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/NumberedShardedFileTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/NumberedShardedFileTest.java
@@ -25,16 +25,13 @@ import static org.mockito.Matchers.anyCollection;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.spy;
 
-import com.google.api.client.util.BackOff;
 import com.google.common.io.Files;
 import java.io.File;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.regex.Pattern;
-import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.io.LocalResources;
 import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
-import org.apache.beam.sdk.testing.FastNanoClockAndSleeper;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -42,17 +39,18 @@ import org.junit.rules.ExpectedException;
 import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
-import org.mockito.Mock;
-import org.mockito.Mockito;
 
 /** Tests for {@link NumberedShardedFile}. */
 @RunWith(JUnit4.class)
 public class NumberedShardedFileTest {
   @Rule public TemporaryFolder tmpFolder = new TemporaryFolder();
   @Rule public ExpectedException thrown = ExpectedException.none();
-  @Rule public FastNanoClockAndSleeper fastClock = new FastNanoClockAndSleeper();
-
-  @Mock private PipelineResult pResult = Mockito.mock(PipelineResult.class);
+  private Sleeper fastClock = new Sleeper() {
+    @Override
+    public void sleep(long millis) throws InterruptedException {
+      // No sleep.
+    }
+  };
 
   private final BackOff backOff = NumberedShardedFile.BACK_OFF_FACTORY.backoff();
   private String filePattern;

http://git-wip-us.apache.org/repos/asf/beam/blob/f6a0c674/sdks/java/core/src/test/java/org/apache/beam/sdk/util/UploadIdResponseInterceptorTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/UploadIdResponseInterceptorTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/UploadIdResponseInterceptorTest.java
deleted file mode 100644
index 8b9f77e..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/UploadIdResponseInterceptorTest.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import com.google.api.client.http.GenericUrl;
-import com.google.api.client.http.HttpResponse;
-import com.google.api.client.testing.http.HttpTesting;
-import com.google.api.client.testing.http.MockHttpTransport;
-import com.google.api.client.testing.http.MockLowLevelHttpResponse;
-import java.io.IOException;
-import org.apache.beam.sdk.testing.ExpectedLogs;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * A test for {@link UploadIdResponseInterceptor}.
- */
-
-@RunWith(JUnit4.class)
-public class UploadIdResponseInterceptorTest {
-
-  @Rule public ExpectedException expectedException = ExpectedException.none();
-  // Note that expected logs also turns on debug logging.
-  @Rule public ExpectedLogs expectedLogs = ExpectedLogs.none(UploadIdResponseInterceptor.class);
-
-  /**
-   * Builds a HttpResponse with the given string response.
-   *
-   * @param header header value to provide or null if none.
-   * @param uploadId upload id to provide in the url upload id param or null if none.
-   * @param uploadType upload type to provide in url upload type param or null if none.
-   * @return HttpResponse with the given parameters
-   * @throws IOException
-   */
-  private HttpResponse buildHttpResponse(String header, String uploadId, String uploadType)
-      throws IOException {
-    MockHttpTransport.Builder builder = new MockHttpTransport.Builder();
-    MockLowLevelHttpResponse resp = new MockLowLevelHttpResponse();
-    builder.setLowLevelHttpResponse(resp);
-    resp.setStatusCode(200);
-    GenericUrl url = new GenericUrl(HttpTesting.SIMPLE_URL);
-    if (header != null) {
-      resp.addHeader("X-GUploader-UploadID", header);
-    }
-    if (uploadId != null) {
-      url.put("upload_id", uploadId);
-    }
-    if (uploadType != null) {
-      url.put("uploadType", uploadType);
-    }
-    return builder.build().createRequestFactory().buildGetRequest(url).execute();
-  }
-
-  /**
-   * Tests the responses that should not log.
-   */
-  @Test
-  public void testResponseNoLogging() throws IOException {
-    new UploadIdResponseInterceptor().interceptResponse(buildHttpResponse(null, null, null));
-    new UploadIdResponseInterceptor().interceptResponse(buildHttpResponse("hh", "a", null));
-    new UploadIdResponseInterceptor().interceptResponse(buildHttpResponse(null, "h", null));
-    new UploadIdResponseInterceptor().interceptResponse(buildHttpResponse("hh", null, null));
-    new UploadIdResponseInterceptor().interceptResponse(buildHttpResponse(null, null, "type"));
-    new UploadIdResponseInterceptor().interceptResponse(buildHttpResponse("hh", "a", "type"));
-    new UploadIdResponseInterceptor().interceptResponse(buildHttpResponse(null, "h", "type"));
-    expectedLogs.verifyNotLogged("");
-  }
-
-  /**
-   * Check that a response logs with the correct log.
-   */
-  @Test
-  public void testResponseLogs() throws IOException {
-    new UploadIdResponseInterceptor().interceptResponse(buildHttpResponse("abc", null, "type"));
-    GenericUrl url = new GenericUrl(HttpTesting.SIMPLE_URL);
-    url.put("uploadType", "type");
-    String worker = System.getProperty("worker_id");
-    expectedLogs.verifyDebug("Upload ID for url " + url + " on worker " + worker + " is abc");
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/f6a0c674/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.java
index a4128e8..985520f 100644
--- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.java
+++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.java
@@ -53,6 +53,7 @@ import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.DefaultValueFactory;
 import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.util.BackOffAdapter;
 import org.apache.beam.sdk.util.FluentBackoff;
 import org.apache.beam.sdk.util.InstanceBuilder;
 import org.apache.beam.sdk.util.RetryHttpRequestInitializer;
@@ -319,7 +320,7 @@ public interface GcpOptions extends GoogleApiDebugOptions, PipelineOptions {
       return getProjectNumber(
           projectId,
           crmClient,
-          BACKOFF_FACTORY.backoff(),
+          BackOffAdapter.toGcpBackOff(BACKOFF_FACTORY.backoff()),
           Sleeper.DEFAULT);
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/f6a0c674/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/BackOffAdapter.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/BackOffAdapter.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/BackOffAdapter.java
new file mode 100644
index 0000000..e5a0a6e
--- /dev/null
+++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/BackOffAdapter.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import java.io.IOException;
+
+/**
+ * An adapter for converting between Apache Beam and Google API client representations of backoffs.
+ */
+public class BackOffAdapter {
+  /**
+   * Returns an adapter to convert from {@link BackOff} to
+   * {@link com.google.api.client.util.BackOff}.
+   */
+  public static com.google.api.client.util.BackOff toGcpBackOff(final BackOff backOff) {
+    return new com.google.api.client.util.BackOff() {
+      @Override
+      public void reset() throws IOException {
+        backOff.reset();
+      }
+
+      @Override
+      public long nextBackOffMillis() throws IOException {
+        return backOff.nextBackOffMillis();
+      }
+    };
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/f6a0c674/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
index ee2e231..18e3e2b 100644
--- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
+++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
@@ -306,6 +306,9 @@ public class GcsUtil {
     return uploadBufferSizeBytes;
   }
 
+  private static BackOff createBackOff() {
+    return BackOffAdapter.toGcpBackOff(BACKOFF_FACTORY.backoff());
+  }
   /**
    * Returns the file size from GCS or throws {@link FileNotFoundException}
    * if the resource does not exist.
@@ -318,7 +321,7 @@ public class GcsUtil {
    * Returns the {@link StorageObject} for the given {@link GcsPath}.
    */
   public StorageObject getObject(GcsPath gcsPath) throws IOException {
-    return getObject(gcsPath, BACKOFF_FACTORY.backoff(), Sleeper.DEFAULT);
+    return getObject(gcsPath, createBackOff(), Sleeper.DEFAULT);
   }
 
   @VisibleForTesting
@@ -377,7 +380,7 @@ public class GcsUtil {
     try {
       return ResilientOperation.retry(
           ResilientOperation.getGoogleRequestCallable(listObject),
-          BACKOFF_FACTORY.backoff(),
+          createBackOff(),
           RetryDeterminer.SOCKET_ERRORS,
           IOException.class);
     } catch (Exception e) {
@@ -469,7 +472,7 @@ public class GcsUtil {
   public boolean bucketAccessible(GcsPath path) throws IOException {
     return bucketAccessible(
         path,
-        BACKOFF_FACTORY.backoff(),
+        createBackOff(),
         Sleeper.DEFAULT);
   }
 
@@ -482,7 +485,7 @@ public class GcsUtil {
   public long bucketOwner(GcsPath path) throws IOException {
     return getBucket(
         path,
-        BACKOFF_FACTORY.backoff(),
+        createBackOff(),
         Sleeper.DEFAULT).getProjectNumber().longValue();
   }
 
@@ -492,7 +495,7 @@ public class GcsUtil {
    */
   public void createBucket(String projectId, Bucket bucket) throws IOException {
     createBucket(
-        projectId, bucket, BACKOFF_FACTORY.backoff(), Sleeper.DEFAULT);
+        projectId, bucket, createBackOff(), Sleeper.DEFAULT);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/beam/blob/f6a0c674/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/UploadIdResponseInterceptor.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/UploadIdResponseInterceptor.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/UploadIdResponseInterceptor.java
new file mode 100644
index 0000000..f685b69
--- /dev/null
+++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/UploadIdResponseInterceptor.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import com.google.api.client.http.GenericUrl;
+import com.google.api.client.http.HttpResponse;
+import com.google.api.client.http.HttpResponseInterceptor;
+import java.io.IOException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implements a response intercepter that logs the upload id if the upload
+ * id header exists and it is the first request (does not have upload_id parameter in the request).
+ * Only logs if debug level is enabled.
+ */
+public class UploadIdResponseInterceptor implements HttpResponseInterceptor {
+
+  private static final Logger LOG = LoggerFactory.getLogger(UploadIdResponseInterceptor.class);
+  private static final String UPLOAD_ID_PARAM = "upload_id";
+  private static final String UPLOAD_TYPE_PARAM = "uploadType";
+  private static final String UPLOAD_HEADER = "X-GUploader-UploadID";
+
+  @Override
+  public void interceptResponse(HttpResponse response) throws IOException {
+    if (!LOG.isDebugEnabled()) {
+      return;
+    }
+    String uploadId = response.getHeaders().getFirstHeaderStringValue(UPLOAD_HEADER);
+    if (uploadId == null) {
+      return;
+    }
+
+    GenericUrl url = response.getRequest().getUrl();
+    // The check for no upload id limits the output to one log line per upload.
+    // The check for upload type makes sure this is an upload and not a read.
+    if (url.get(UPLOAD_ID_PARAM) == null && url.get(UPLOAD_TYPE_PARAM) != null) {
+      LOG.debug(
+          "Upload ID for url {} on worker {} is {}",
+          url,
+          System.getProperty("worker_id"),
+          uploadId);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/f6a0c674/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/FastNanoClockAndSleeper.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/FastNanoClockAndSleeper.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/FastNanoClockAndSleeper.java
new file mode 100644
index 0000000..f1392d7
--- /dev/null
+++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/FastNanoClockAndSleeper.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import com.google.api.client.util.NanoClock;
+import com.google.api.client.util.Sleeper;
+import org.junit.rules.ExternalResource;
+import org.junit.rules.TestRule;
+
+/**
+ * This object quickly moves time forward based upon how much it has been asked to sleep,
+ * without actually sleeping, to simulate the backoff.
+ */
+public class FastNanoClockAndSleeper extends ExternalResource
+    implements NanoClock, Sleeper, TestRule {
+  private long fastNanoTime;
+
+  @Override
+  public long nanoTime() {
+    return fastNanoTime;
+  }
+
+  @Override
+  protected void before() throws Throwable {
+    fastNanoTime = SYSTEM.nanoTime();
+  }
+
+  @Override
+  public void sleep(long millis) throws InterruptedException {
+    fastNanoTime += millis * 1000000L;
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/f6a0c674/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/FastNanoClockAndSleeperTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/FastNanoClockAndSleeperTest.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/FastNanoClockAndSleeperTest.java
new file mode 100644
index 0000000..03f9588
--- /dev/null
+++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/FastNanoClockAndSleeperTest.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.concurrent.TimeUnit;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link FastNanoClockAndSleeper}. */
+@RunWith(JUnit4.class)
+public class FastNanoClockAndSleeperTest {
+  @Rule public FastNanoClockAndSleeper fastNanoClockAndSleeper = new FastNanoClockAndSleeper();
+
+  @Test
+  public void testClockAndSleeper() throws Exception {
+    long sleepTimeMs = TimeUnit.SECONDS.toMillis(30);
+    long sleepTimeNano = TimeUnit.MILLISECONDS.toNanos(sleepTimeMs);
+    long fakeTimeNano = fastNanoClockAndSleeper.nanoTime();
+    long startTimeNano = System.nanoTime();
+    fastNanoClockAndSleeper.sleep(sleepTimeMs);
+    long maxTimeNano = startTimeNano + TimeUnit.SECONDS.toNanos(1);
+    // Verify that actual time didn't progress as much as was requested
+    assertTrue(System.nanoTime() < maxTimeNano);
+    // Verify that the fake time did go up by the amount requested
+    assertEquals(fakeTimeNano + sleepTimeNano, fastNanoClockAndSleeper.nanoTime());
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/f6a0c674/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java
index 6ffcaeb..0af584e 100644
--- a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java
+++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java
@@ -76,7 +76,6 @@ import java.util.concurrent.TimeUnit;
 import org.apache.beam.sdk.extensions.gcp.auth.TestCredential;
 import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.testing.FastNanoClockAndSleeper;
 import org.apache.beam.sdk.util.GcsUtil.StorageObjectOrIOException;
 import org.apache.beam.sdk.util.gcsfs.GcsPath;
 import org.junit.Rule;
@@ -374,7 +373,8 @@ public class GcsUtilTest {
     Storage.Objects mockStorageObjects = Mockito.mock(Storage.Objects.class);
     Storage.Objects.Get mockStorageGet = Mockito.mock(Storage.Objects.Get.class);
 
-    BackOff mockBackOff = FluentBackoff.DEFAULT.withMaxRetries(2).backoff();
+    BackOff mockBackOff = BackOffAdapter.toGcpBackOff(
+        FluentBackoff.DEFAULT.withMaxRetries(2).backoff());
 
     when(mockStorage.objects()).thenReturn(mockStorageObjects);
     when(mockStorageObjects.get("testbucket", "testobject")).thenReturn(mockStorageGet);
@@ -491,7 +491,7 @@ public class GcsUtilTest {
 
     Storage.Buckets.Insert mockStorageInsert = Mockito.mock(Storage.Buckets.Insert.class);
 
-    BackOff mockBackOff = FluentBackoff.DEFAULT.backoff();
+    BackOff mockBackOff = BackOffAdapter.toGcpBackOff(FluentBackoff.DEFAULT.backoff());
 
     when(mockStorage.buckets()).thenReturn(mockStorageObjects);
     when(mockStorageObjects.insert(
@@ -514,7 +514,7 @@ public class GcsUtilTest {
     Storage.Buckets mockStorageObjects = Mockito.mock(Storage.Buckets.class);
     Storage.Buckets.Insert mockStorageInsert = Mockito.mock(Storage.Buckets.Insert.class);
 
-    BackOff mockBackOff = FluentBackoff.DEFAULT.backoff();
+    BackOff mockBackOff = BackOffAdapter.toGcpBackOff(FluentBackoff.DEFAULT.backoff());
     GoogleJsonResponseException expectedException =
         googleJsonResponseException(HttpStatusCodes.STATUS_CODE_FORBIDDEN,
             "Waves hand mysteriously", "These aren't the buckets you're looking for");
@@ -541,7 +541,8 @@ public class GcsUtilTest {
     Storage.Buckets mockStorageObjects = Mockito.mock(Storage.Buckets.class);
     Storage.Buckets.Get mockStorageGet = Mockito.mock(Storage.Buckets.Get.class);
 
-    BackOff mockBackOff = FluentBackoff.DEFAULT.backoff();
+    BackOff mockBackOff = BackOffAdapter.toGcpBackOff(
+        FluentBackoff.DEFAULT.backoff());
 
     when(mockStorage.buckets()).thenReturn(mockStorageObjects);
     when(mockStorageObjects.get("testbucket")).thenReturn(mockStorageGet);
@@ -564,7 +565,8 @@ public class GcsUtilTest {
     Storage.Buckets mockStorageObjects = Mockito.mock(Storage.Buckets.class);
     Storage.Buckets.Get mockStorageGet = Mockito.mock(Storage.Buckets.Get.class);
 
-    BackOff mockBackOff = FluentBackoff.DEFAULT.backoff();
+    BackOff mockBackOff = BackOffAdapter.toGcpBackOff(
+        FluentBackoff.DEFAULT.backoff());
     GoogleJsonResponseException expectedException =
         googleJsonResponseException(HttpStatusCodes.STATUS_CODE_FORBIDDEN,
             "Waves hand mysteriously", "These aren't the buckets you're looking for");
@@ -589,7 +591,8 @@ public class GcsUtilTest {
     Storage.Buckets mockStorageObjects = Mockito.mock(Storage.Buckets.class);
     Storage.Buckets.Get mockStorageGet = Mockito.mock(Storage.Buckets.Get.class);
 
-    BackOff mockBackOff = FluentBackoff.DEFAULT.backoff();
+    BackOff mockBackOff = BackOffAdapter.toGcpBackOff(
+        FluentBackoff.DEFAULT.backoff());
 
     when(mockStorage.buckets()).thenReturn(mockStorageObjects);
     when(mockStorageObjects.get("testbucket")).thenReturn(mockStorageGet);
@@ -612,7 +615,8 @@ public class GcsUtilTest {
     Storage.Buckets mockStorageObjects = Mockito.mock(Storage.Buckets.class);
     Storage.Buckets.Get mockStorageGet = Mockito.mock(Storage.Buckets.Get.class);
 
-    BackOff mockBackOff = FluentBackoff.DEFAULT.backoff();
+    BackOff mockBackOff = BackOffAdapter.toGcpBackOff(
+        FluentBackoff.DEFAULT.backoff());
 
     when(mockStorage.buckets()).thenReturn(mockStorageObjects);
     when(mockStorageObjects.get("testbucket")).thenReturn(mockStorageGet);
@@ -635,7 +639,8 @@ public class GcsUtilTest {
     Storage.Buckets mockStorageObjects = Mockito.mock(Storage.Buckets.class);
     Storage.Buckets.Get mockStorageGet = Mockito.mock(Storage.Buckets.Get.class);
 
-    BackOff mockBackOff = FluentBackoff.DEFAULT.backoff();
+    BackOff mockBackOff = BackOffAdapter.toGcpBackOff(
+        FluentBackoff.DEFAULT.backoff());
 
     when(mockStorage.buckets()).thenReturn(mockStorageObjects);
     when(mockStorageObjects.get("testbucket")).thenReturn(mockStorageGet);

http://git-wip-us.apache.org/repos/asf/beam/blob/f6a0c674/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/UploadIdResponseInterceptorTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/UploadIdResponseInterceptorTest.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/UploadIdResponseInterceptorTest.java
new file mode 100644
index 0000000..8b9f77e
--- /dev/null
+++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/UploadIdResponseInterceptorTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import com.google.api.client.http.GenericUrl;
+import com.google.api.client.http.HttpResponse;
+import com.google.api.client.testing.http.HttpTesting;
+import com.google.api.client.testing.http.MockHttpTransport;
+import com.google.api.client.testing.http.MockLowLevelHttpResponse;
+import java.io.IOException;
+import org.apache.beam.sdk.testing.ExpectedLogs;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * A test for {@link UploadIdResponseInterceptor}.
+ */
+
+@RunWith(JUnit4.class)
+public class UploadIdResponseInterceptorTest {
+
+  @Rule public ExpectedException expectedException = ExpectedException.none();
+  // Note that expected logs also turns on debug logging.
+  @Rule public ExpectedLogs expectedLogs = ExpectedLogs.none(UploadIdResponseInterceptor.class);
+
+  /**
+   * Builds a HttpResponse with the given string response.
+   *
+   * @param header header value to provide or null if none.
+   * @param uploadId upload id to provide in the url upload id param or null if none.
+   * @param uploadType upload type to provide in url upload type param or null if none.
+   * @return HttpResponse with the given parameters
+   * @throws IOException
+   */
+  private HttpResponse buildHttpResponse(String header, String uploadId, String uploadType)
+      throws IOException {
+    MockHttpTransport.Builder builder = new MockHttpTransport.Builder();
+    MockLowLevelHttpResponse resp = new MockLowLevelHttpResponse();
+    builder.setLowLevelHttpResponse(resp);
+    resp.setStatusCode(200);
+    GenericUrl url = new GenericUrl(HttpTesting.SIMPLE_URL);
+    if (header != null) {
+      resp.addHeader("X-GUploader-UploadID", header);
+    }
+    if (uploadId != null) {
+      url.put("upload_id", uploadId);
+    }
+    if (uploadType != null) {
+      url.put("uploadType", uploadType);
+    }
+    return builder.build().createRequestFactory().buildGetRequest(url).execute();
+  }
+
+  /**
+   * Tests the responses that should not log.
+   */
+  @Test
+  public void testResponseNoLogging() throws IOException {
+    new UploadIdResponseInterceptor().interceptResponse(buildHttpResponse(null, null, null));
+    new UploadIdResponseInterceptor().interceptResponse(buildHttpResponse("hh", "a", null));
+    new UploadIdResponseInterceptor().interceptResponse(buildHttpResponse(null, "h", null));
+    new UploadIdResponseInterceptor().interceptResponse(buildHttpResponse("hh", null, null));
+    new UploadIdResponseInterceptor().interceptResponse(buildHttpResponse(null, null, "type"));
+    new UploadIdResponseInterceptor().interceptResponse(buildHttpResponse("hh", "a", "type"));
+    new UploadIdResponseInterceptor().interceptResponse(buildHttpResponse(null, "h", "type"));
+    expectedLogs.verifyNotLogged("");
+  }
+
+  /**
+   * Check that a response logs with the correct log.
+   */
+  @Test
+  public void testResponseLogs() throws IOException {
+    new UploadIdResponseInterceptor().interceptResponse(buildHttpResponse("abc", null, "type"));
+    GenericUrl url = new GenericUrl(HttpTesting.SIMPLE_URL);
+    url.put("uploadType", "type");
+    String worker = System.getProperty("worker_id");
+    expectedLogs.verifyDebug("Upload ID for url " + url + " on worker " + worker + " is abc");
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/f6a0c674/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
index b348abd..5d5a519 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
@@ -65,6 +65,7 @@ import org.apache.beam.sdk.extensions.gcp.auth.NullCredentialInitializer;
 import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.util.BackOffAdapter;
 import org.apache.beam.sdk.util.FluentBackoff;
 import org.apache.beam.sdk.util.RetryHttpRequestInitializer;
 import org.apache.beam.sdk.util.Transport;
@@ -93,6 +94,9 @@ class BigQueryServicesImpl implements BigQueryServices {
   // The initial backoff for polling the status of a BigQuery job.
   private static final Duration INITIAL_JOB_STATUS_POLL_BACKOFF = Duration.standardSeconds(1);
 
+  private static final FluentBackoff DEFAULT_BACKOFF_FACTORY =
+      FluentBackoff.DEFAULT.withMaxRetries(MAX_RPC_RETRIES).withInitialBackoff(INITIAL_RPC_BACKOFF);
+
   @Override
   public JobService getJobService(BigQueryOptions options) {
     return new JobServiceImpl(options);
@@ -114,6 +118,10 @@ class BigQueryServicesImpl implements BigQueryServices {
     return BigQueryJsonReaderImpl.fromQuery(bqOptions, projectId, queryConfig);
   }
 
+  private static BackOff createDefaultBackoff() {
+    return BackOffAdapter.toGcpBackOff(DEFAULT_BACKOFF_FACTORY.backoff());
+  }
+
   @VisibleForTesting
   static class JobServiceImpl implements BigQueryServices.JobService {
     private final ApiErrorExtractor errorExtractor;
@@ -205,10 +213,7 @@ class BigQueryServicesImpl implements BigQueryServices {
     private static void startJob(Job job,
       ApiErrorExtractor errorExtractor,
       Bigquery client) throws IOException, InterruptedException {
-      BackOff backoff =
-          FluentBackoff.DEFAULT
-              .withMaxRetries(MAX_RPC_RETRIES).withInitialBackoff(INITIAL_RPC_BACKOFF).backoff();
-      startJob(job, errorExtractor, client, Sleeper.DEFAULT, backoff);
+      startJob(job, errorExtractor, client, Sleeper.DEFAULT, createDefaultBackoff());
     }
 
     @VisibleForTesting
@@ -249,11 +254,12 @@ class BigQueryServicesImpl implements BigQueryServices {
     @Override
     public Job pollJob(JobReference jobRef, int maxAttempts) throws InterruptedException {
       BackOff backoff =
-          FluentBackoff.DEFAULT
-              .withMaxRetries(maxAttempts)
-              .withInitialBackoff(INITIAL_JOB_STATUS_POLL_BACKOFF)
-              .withMaxBackoff(Duration.standardMinutes(1))
-              .backoff();
+          BackOffAdapter.toGcpBackOff(
+              FluentBackoff.DEFAULT
+                  .withMaxRetries(maxAttempts)
+                  .withInitialBackoff(INITIAL_JOB_STATUS_POLL_BACKOFF)
+                  .withMaxBackoff(Duration.standardMinutes(1))
+                  .backoff());
       return pollJob(jobRef, Sleeper.DEFAULT, backoff);
     }
 
@@ -299,16 +305,13 @@ class BigQueryServicesImpl implements BigQueryServices {
           .setConfiguration(new JobConfiguration()
               .setQuery(queryConfig)
               .setDryRun(true));
-      BackOff backoff =
-          FluentBackoff.DEFAULT
-              .withMaxRetries(MAX_RPC_RETRIES).withInitialBackoff(INITIAL_RPC_BACKOFF).backoff();
       return executeWithRetries(
           client.jobs().insert(projectId, job),
           String.format(
               "Unable to dry run query: %s, aborting after %d retries.",
               queryConfig, MAX_RPC_RETRIES),
           Sleeper.DEFAULT,
-          backoff,
+          createDefaultBackoff(),
           ALWAYS_RETRY).getStatistics();
     }
 
@@ -321,10 +324,7 @@ class BigQueryServicesImpl implements BigQueryServices {
      */
     @Override
     public Job getJob(JobReference jobRef) throws IOException, InterruptedException {
-      BackOff backoff =
-          FluentBackoff.DEFAULT
-              .withMaxRetries(MAX_RPC_RETRIES).withInitialBackoff(INITIAL_RPC_BACKOFF).backoff();
-     return getJob(jobRef, Sleeper.DEFAULT, backoff);
+     return getJob(jobRef, Sleeper.DEFAULT, createDefaultBackoff());
     }
 
     @VisibleForTesting
@@ -371,7 +371,7 @@ class BigQueryServicesImpl implements BigQueryServices {
         FluentBackoff.DEFAULT.withInitialBackoff(Duration.millis(200)).withMaxRetries(5);
 
     // A backoff for rate limit exceeded errors. Retries forever.
-    private static final FluentBackoff DEFAULT_BACKOFF_FACTORY =
+    private static final FluentBackoff RATE_LIMIT_BACKOFF_FACTORY =
         FluentBackoff.DEFAULT
             .withInitialBackoff(Duration.standardSeconds(1))
             .withMaxBackoff(Duration.standardMinutes(2));
@@ -420,10 +420,7 @@ class BigQueryServicesImpl implements BigQueryServices {
     @Nullable
     public Table getTable(TableReference tableRef)
         throws IOException, InterruptedException {
-      BackOff backoff =
-          FluentBackoff.DEFAULT
-              .withMaxRetries(MAX_RPC_RETRIES).withInitialBackoff(INITIAL_RPC_BACKOFF).backoff();
-      return getTable(tableRef, backoff, Sleeper.DEFAULT);
+      return getTable(tableRef, createDefaultBackoff(), Sleeper.DEFAULT);
     }
 
     @VisibleForTesting
@@ -528,9 +525,6 @@ class BigQueryServicesImpl implements BigQueryServices {
      */
     @Override
     public void deleteTable(TableReference tableRef) throws IOException, InterruptedException {
-      BackOff backoff =
-          FluentBackoff.DEFAULT
-              .withMaxRetries(MAX_RPC_RETRIES).withInitialBackoff(INITIAL_RPC_BACKOFF).backoff();
       executeWithRetries(
           client.tables().delete(
               tableRef.getProjectId(), tableRef.getDatasetId(), tableRef.getTableId()),
@@ -538,16 +532,13 @@ class BigQueryServicesImpl implements BigQueryServices {
               "Unable to delete table: %s, aborting after %d retries.",
               tableRef.getTableId(), MAX_RPC_RETRIES),
           Sleeper.DEFAULT,
-          backoff,
+          createDefaultBackoff(),
           ALWAYS_RETRY);
     }
 
     @Override
     public boolean isTableEmpty(TableReference tableRef) throws IOException, InterruptedException {
-      BackOff backoff =
-          FluentBackoff.DEFAULT
-              .withMaxRetries(MAX_RPC_RETRIES).withInitialBackoff(INITIAL_RPC_BACKOFF).backoff();
-      return isTableEmpty(tableRef, backoff, Sleeper.DEFAULT);
+      return isTableEmpty(tableRef, createDefaultBackoff(), Sleeper.DEFAULT);
     }
 
     @VisibleForTesting
@@ -575,16 +566,13 @@ class BigQueryServicesImpl implements BigQueryServices {
     @Override
     public Dataset getDataset(String projectId, String datasetId)
         throws IOException, InterruptedException {
-      BackOff backoff =
-          FluentBackoff.DEFAULT
-              .withMaxRetries(MAX_RPC_RETRIES).withInitialBackoff(INITIAL_RPC_BACKOFF).backoff();
       return executeWithRetries(
           client.datasets().get(projectId, datasetId),
           String.format(
               "Unable to get dataset: %s, aborting after %d retries.",
               datasetId, MAX_RPC_RETRIES),
           Sleeper.DEFAULT,
-          backoff,
+          createDefaultBackoff(),
           DONT_RETRY_NOT_FOUND);
     }
 
@@ -599,10 +587,8 @@ class BigQueryServicesImpl implements BigQueryServices {
     public void createDataset(
         String projectId, String datasetId, @Nullable String location, @Nullable String description)
         throws IOException, InterruptedException {
-      BackOff backoff =
-          FluentBackoff.DEFAULT
-              .withMaxRetries(MAX_RPC_RETRIES).withInitialBackoff(INITIAL_RPC_BACKOFF).backoff();
-      createDataset(projectId, datasetId, location, description, Sleeper.DEFAULT, backoff);
+      createDataset(
+          projectId, datasetId, location, description, Sleeper.DEFAULT, createDefaultBackoff());
     }
 
     private void createDataset(
@@ -659,16 +645,13 @@ class BigQueryServicesImpl implements BigQueryServices {
     @Override
     public void deleteDataset(String projectId, String datasetId)
         throws IOException, InterruptedException {
-      BackOff backoff =
-          FluentBackoff.DEFAULT
-              .withMaxRetries(MAX_RPC_RETRIES).withInitialBackoff(INITIAL_RPC_BACKOFF).backoff();
       executeWithRetries(
           client.datasets().delete(projectId, datasetId),
           String.format(
               "Unable to delete table: %s, aborting after %d retries.",
               datasetId, MAX_RPC_RETRIES),
           Sleeper.DEFAULT,
-          backoff,
+          createDefaultBackoff(),
           ALWAYS_RETRY);
     }
 
@@ -725,7 +708,9 @@ class BigQueryServicesImpl implements BigQueryServices {
                 executor.submit(new Callable<List<TableDataInsertAllResponse.InsertErrors>>() {
                   @Override
                   public List<TableDataInsertAllResponse.InsertErrors> call() throws IOException {
-                    BackOff backoff = DEFAULT_BACKOFF_FACTORY.backoff();
+                    // A backoff for rate limit exceeded errors. Retries forever.
+                    BackOff backoff = BackOffAdapter.toGcpBackOff(
+                        RATE_LIMIT_BACKOFF_FACTORY.backoff());
                     while (true) {
                       try {
                         return insert.execute().getInsertErrors();
@@ -811,7 +796,10 @@ class BigQueryServicesImpl implements BigQueryServices {
         TableReference ref, List<TableRow> rowList, @Nullable List<String> insertIdList)
         throws IOException, InterruptedException {
       return insertAll(
-          ref, rowList, insertIdList, INSERT_BACKOFF_FACTORY.backoff(), Sleeper.DEFAULT);
+          ref, rowList, insertIdList,
+          BackOffAdapter.toGcpBackOff(
+              INSERT_BACKOFF_FACTORY.backoff()),
+          Sleeper.DEFAULT);
     }
 
 
@@ -822,9 +810,6 @@ class BigQueryServicesImpl implements BigQueryServices {
       Table table = new Table();
       table.setDescription(tableDescription);
 
-      BackOff backoff =
-          FluentBackoff.DEFAULT
-              .withMaxRetries(MAX_RPC_RETRIES).withInitialBackoff(INITIAL_RPC_BACKOFF).backoff();
       return executeWithRetries(
           client.tables().patch(
               tableReference.getProjectId(),
@@ -835,7 +820,7 @@ class BigQueryServicesImpl implements BigQueryServices {
               "Unable to patch table description: %s, aborting after %d retries.",
               tableReference, MAX_RPC_RETRIES),
           Sleeper.DEFAULT,
-          backoff,
+          createDefaultBackoff(),
           ALWAYS_RETRY);
     }
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/f6a0c674/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIterator.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIterator.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIterator.java
index 59f2bb6..ba19cf0 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIterator.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIterator.java
@@ -57,6 +57,8 @@ import java.util.Objects;
 import java.util.Random;
 import java.util.concurrent.TimeUnit;
 import javax.annotation.Nullable;
+
+import org.apache.beam.sdk.util.BackOffAdapter;
 import org.apache.beam.sdk.util.FluentBackoff;
 import org.joda.time.Duration;
 import org.slf4j.Logger;
@@ -453,8 +455,9 @@ class BigQueryTableRowIterator implements AutoCloseable {
       throws IOException, InterruptedException {
     Sleeper sleeper = Sleeper.DEFAULT;
     BackOff backOff =
-        FluentBackoff.DEFAULT
-            .withMaxRetries(MAX_RETRIES).withInitialBackoff(INITIAL_BACKOFF_TIME).backoff();
+        BackOffAdapter.toGcpBackOff(
+            FluentBackoff.DEFAULT
+                .withMaxRetries(MAX_RETRIES).withInitialBackoff(INITIAL_BACKOFF_TIME).backoff());
 
     T result = null;
     while (true) {


Mime
View raw message