beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From lc...@apache.org
Subject [beam] branch master updated: [BEAM-6180] Remove duplicated IdGenerator from runner harness and use IdGenerator from fnexecution instead. (#7201)
Date Wed, 05 Dec 2018 00:44:56 GMT
This is an automated email from the ASF dual-hosted git repository.

lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 2f9330c  [BEAM-6180] Remove duplicated IdGenerator from runner harness and use IdGenerator from fnexecution instead. (#7201)
2f9330c is described below

commit 2f9330c429d8ee800c57271c755b529b211e82cf
Author: Boyuan Zhang <36090911+boyuanzz@users.noreply.github.com>
AuthorDate: Tue Dec 4 16:44:49 2018 -0800

    [BEAM-6180] Remove duplicated IdGenerator from runner harness and use IdGenerator from fnexecution instead. (#7201)
---
 .../dataflow/worker/BatchDataflowWorker.java       | 24 +++++++-------
 .../worker/BeamFnMapTaskExecutorFactory.java       |  8 ++---
 .../worker/DataflowMapTaskExecutorFactory.java     |  4 +--
 ...FetchAndFilterStreamingSideInputsOperation.java |  6 ++--
 .../dataflow/worker/FnApiWindowMappingFn.java      | 12 +++----
 .../worker/IntrinsicMapTaskExecutorFactory.java    |  4 +--
 .../dataflow/worker/StreamingDataflowWorker.java   | 23 +++++++------
 .../FixMultiOutputInfosOnParDoInstructions.java    |  8 ++---
 .../runners/dataflow/worker/fn/IdGenerator.java    | 34 -------------------
 .../control/RegisterAndProcessBundleOperation.java | 12 +++----
 .../fn/data/RemoteGrpcPortReadOperation.java       |  8 ++---
 .../fn/data/RemoteGrpcPortWriteOperation.java      |  9 ++---
 .../graph/CreateRegisterFnOperationFunction.java   |  9 ++---
 .../worker/graph/RegisterNodeFunction.java         | 24 +++++++-------
 .../dataflow/worker/FnApiWindowMappingFnTest.java  |  4 +--
 .../IntrinsicMapTaskExecutorFactoryTest.java       | 11 ++++---
 ...FixMultiOutputInfosOnParDoInstructionsTest.java | 20 +++---------
 .../dataflow/worker/fn/IdGeneratorTest.java        | 38 ----------------------
 .../fn/control/BeamFnMapTaskExecutorTest.java      | 20 +++---------
 .../RegisterAndProcessBundleOperationTest.java     | 34 +++++++++++--------
 .../fn/data/RemoteGrpcPortReadOperationTest.java   | 14 ++++----
 .../fn/data/RemoteGrpcPortWriteOperationTest.java  | 16 ++++-----
 .../CreateRegisterFnOperationFunctionTest.java     |  4 +--
 23 files changed, 131 insertions(+), 215 deletions(-)

diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchDataflowWorker.java
index 2666dea..1433702 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchDataflowWorker.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchDataflowWorker.java
@@ -36,7 +36,6 @@ import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions;
 import org.apache.beam.runners.dataflow.worker.SdkHarnessRegistry.SdkWorkerHarness;
 import org.apache.beam.runners.dataflow.worker.apiary.FixMultiOutputInfosOnParDoInstructions;
 import org.apache.beam.runners.dataflow.worker.counters.CounterSet;
-import org.apache.beam.runners.dataflow.worker.fn.IdGenerator;
 import org.apache.beam.runners.dataflow.worker.graph.CloneAmbiguousFlattensFunction;
 import org.apache.beam.runners.dataflow.worker.graph.CreateRegisterFnOperationFunction;
 import org.apache.beam.runners.dataflow.worker.graph.DeduceFlattenLocationsFunction;
@@ -53,6 +52,8 @@ import org.apache.beam.runners.dataflow.worker.status.DebugCapture;
 import org.apache.beam.runners.dataflow.worker.status.WorkerStatusPages;
 import org.apache.beam.runners.dataflow.worker.util.MemoryMonitor;
 import org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateSampler;
+import org.apache.beam.sdk.fn.IdGenerator;
+import org.apache.beam.sdk.fn.IdGenerators;
 import org.apache.beam.sdk.util.Weighted;
 import org.apache.beam.sdk.util.WeightedValue;
 import org.slf4j.Logger;
@@ -80,6 +81,9 @@ public class BatchDataflowWorker implements Closeable {
   /** The factory to create {@link DataflowMapTaskExecutor DataflowMapTaskExecutors}. */
   private final DataflowMapTaskExecutorFactory mapTaskExecutorFactory;
 
+  /** The idGenerator to generate unique id globally. */
+  private static final IdGenerator idGenerator = IdGenerators.decrementingLongs();
+
   /**
    * Function which converts map tasks to their network representation for execution.
    *
@@ -91,7 +95,7 @@ public class BatchDataflowWorker implements Closeable {
    * </ul>
    */
   private static final Function<MapTask, MutableNetwork<Node, Edge>> mapTaskToBaseNetwork =
-      new FixMultiOutputInfosOnParDoInstructions(IdGenerator::generate)
+      new FixMultiOutputInfosOnParDoInstructions(idGenerator)
           .andThen(new MapTaskToNetworkFunction());
 
   /** Registry of known {@link ReaderFactory ReaderFactories}. */
@@ -217,18 +221,14 @@ public class BatchDataflowWorker implements Closeable {
       Function<MutableNetwork<Node, Edge>, Node> sdkFusedStage =
           pipeline == null
               ? RegisterNodeFunction.withoutPipeline(
-                  IdGenerator::generate, sdkHarnessRegistry.beamFnStateApiServiceDescriptor())
+                  idGenerator, sdkHarnessRegistry.beamFnStateApiServiceDescriptor())
               : RegisterNodeFunction.forPipeline(
-                  pipeline,
-                  IdGenerator::generate,
-                  sdkHarnessRegistry.beamFnStateApiServiceDescriptor());
+                  pipeline, idGenerator, sdkHarnessRegistry.beamFnStateApiServiceDescriptor());
       Function<MutableNetwork<Node, Edge>, MutableNetwork<Node, Edge>> lengthPrefixUnknownCoders =
           LengthPrefixUnknownCoders::forSdkNetwork;
       Function<MutableNetwork<Node, Edge>, MutableNetwork<Node, Edge>> transformToRunnerNetwork =
           new CreateRegisterFnOperationFunction(
-              IdGenerator::generate,
-              this::createPortNode,
-              lengthPrefixUnknownCoders.andThen(sdkFusedStage));
+              idGenerator, this::createPortNode, lengthPrefixUnknownCoders.andThen(sdkFusedStage));
 
       mapTaskToNetwork =
           mapTaskToBaseNetwork
@@ -268,8 +268,8 @@ public class BatchDataflowWorker implements Closeable {
         RemoteGrpcPort.newBuilder()
             .setApiServiceDescriptor(sdkHarnessRegistry.beamFnDataApiServiceDescriptor())
             .build(),
-        IdGenerator.generate(),
-        IdGenerator.generate(),
+        idGenerator.getId(),
+        idGenerator.getId(),
         predecessorId,
         successorId);
   }
@@ -346,7 +346,7 @@ public class BatchDataflowWorker implements Closeable {
                 sinkRegistry,
                 executionContext,
                 counterSet,
-                IdGenerator::generate);
+                idGenerator);
       } else if (workItem.getSourceOperationTask() != null) {
         worker =
             SourceOperationExecutorFactory.create(
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BeamFnMapTaskExecutorFactory.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BeamFnMapTaskExecutorFactory.java
index 8e191e2..a76f326 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BeamFnMapTaskExecutorFactory.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BeamFnMapTaskExecutorFactory.java
@@ -40,7 +40,6 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.function.Function;
-import java.util.function.Supplier;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.Target;
 import org.apache.beam.model.pipeline.v1.Endpoints;
 import org.apache.beam.runners.core.ElementByteSizeObservable;
@@ -90,6 +89,7 @@ import org.apache.beam.runners.fnexecution.state.GrpcStateService;
 import org.apache.beam.runners.fnexecution.state.StateDelegator;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.fn.IdGenerator;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.WindowedValue;
@@ -128,7 +128,7 @@ public class BeamFnMapTaskExecutorFactory implements DataflowMapTaskExecutorFact
       SinkFactory sinkFactory,
       DataflowExecutionContext<?> executionContext,
       CounterSet counterSet,
-      Supplier<String> idGenerator) {
+      IdGenerator idGenerator) {
 
     // TODO: remove this once we trust the code paths
     checkArgument(
@@ -204,7 +204,7 @@ public class BeamFnMapTaskExecutorFactory implements DataflowMapTaskExecutorFact
 
   private Function<Node, Node> createOperationTransformForFetchAndFilterStreamingSideInputNodes(
       MutableNetwork<Node, Edge> network,
-      Supplier<String> idGenerator,
+      IdGenerator idGenerator,
       InstructionRequestHandler instructionRequestHandler,
       FnDataService beamFnDataService,
       Endpoints.ApiServiceDescriptor dataApiServiceDescriptor,
@@ -303,7 +303,7 @@ public class BeamFnMapTaskExecutorFactory implements DataflowMapTaskExecutorFact
   }
 
   private Function<Node, Node> createOperationTransformForRegisterFnNodes(
-      final Supplier<String> idGenerator,
+      final IdGenerator idGenerator,
       final InstructionRequestHandler instructionRequestHandler,
       final StateDelegator beamFnStateDelegator,
       final String stageName,
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowMapTaskExecutorFactory.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowMapTaskExecutorFactory.java
index b6bcb5a..4e6e5a1 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowMapTaskExecutorFactory.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowMapTaskExecutorFactory.java
@@ -19,7 +19,6 @@ package org.apache.beam.runners.dataflow.worker;
 
 import com.google.api.services.dataflow.model.MapTask;
 import com.google.common.graph.MutableNetwork;
-import java.util.function.Supplier;
 import org.apache.beam.model.pipeline.v1.Endpoints;
 import org.apache.beam.runners.dataflow.worker.counters.CounterSet;
 import org.apache.beam.runners.dataflow.worker.fn.data.BeamFnDataGrpcService;
@@ -28,6 +27,7 @@ import org.apache.beam.runners.dataflow.worker.graph.Nodes.Node;
 import org.apache.beam.runners.fnexecution.GrpcFnServer;
 import org.apache.beam.runners.fnexecution.control.InstructionRequestHandler;
 import org.apache.beam.runners.fnexecution.state.GrpcStateService;
+import org.apache.beam.sdk.fn.IdGenerator;
 import org.apache.beam.sdk.options.PipelineOptions;
 
 /** Creates a {@link DataflowMapTaskExecutor} from a {@link MapTask} definition. */
@@ -49,5 +49,5 @@ public interface DataflowMapTaskExecutorFactory {
       SinkFactory sinkFactory,
       DataflowExecutionContext<?> executionContext,
       CounterSet counterSet,
-      Supplier<String> idGenerator);
+      IdGenerator idGenerator);
 }
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/FetchAndFilterStreamingSideInputsOperation.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/FetchAndFilterStreamingSideInputsOperation.java
index aef6e2e..f65e830 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/FetchAndFilterStreamingSideInputsOperation.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/FetchAndFilterStreamingSideInputsOperation.java
@@ -26,7 +26,6 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
-import java.util.function.Supplier;
 import javax.annotation.Nullable;
 import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
@@ -36,6 +35,7 @@ import org.apache.beam.runners.fnexecution.control.InstructionRequestHandler;
 import org.apache.beam.runners.fnexecution.data.FnDataService;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.fn.IdGenerator;
 import org.apache.beam.sdk.state.BagState;
 import org.apache.beam.sdk.transforms.Materializations;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -71,7 +71,7 @@ public class FetchAndFilterStreamingSideInputsOperation<T, W extends BoundedWind
       InstructionRequestHandler instructionRequestHandler,
       FnDataService beamFnDataService,
       ApiServiceDescriptor dataServiceApiServiceDescriptor,
-      Supplier<String> idGenerator,
+      IdGenerator idGenerator,
       Coder<WindowedValue<T>> inputCoder,
       WindowingStrategy<?, W> windowingStrategy,
       DataflowExecutionContext.DataflowStepContext stepContext,
@@ -162,7 +162,7 @@ public class FetchAndFilterStreamingSideInputsOperation<T, W extends BoundedWind
   }
 
   private Iterable<PCollectionView<?>> buildPCollectionViewsWithSdkSupportedWindowMappingFn(
-      Supplier<String> idGenerator,
+      IdGenerator idGenerator,
       InstructionRequestHandler instructionRequestHandler,
       FnDataService beamFnDataService,
       ApiServiceDescriptor dataServiceApiServiceDescriptor,
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/FnApiWindowMappingFn.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/FnApiWindowMappingFn.java
index ff798c0..71f5bed 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/FnApiWindowMappingFn.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/FnApiWindowMappingFn.java
@@ -27,7 +27,6 @@ import java.io.IOException;
 import java.util.concurrent.CompletionStage;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutionException;
-import java.util.function.Supplier;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.InstructionRequest;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.InstructionResponse;
@@ -48,6 +47,7 @@ import org.apache.beam.runners.fnexecution.data.FnDataService;
 import org.apache.beam.sdk.coders.ByteArrayCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.fn.IdGenerator;
 import org.apache.beam.sdk.fn.data.CloseableFnDataReceiver;
 import org.apache.beam.sdk.fn.data.InboundDataClient;
 import org.apache.beam.sdk.fn.data.LogicalEndpoint;
@@ -93,7 +93,7 @@ class FnApiWindowMappingFn<TargetWindowT extends BoundedWindow>
   private static final Cache<CacheKey, BoundedWindow> sideInputMappingCache =
       CacheBuilder.newBuilder().maximumSize(1000).build();
 
-  private final Supplier<String> idGenerator;
+  private final IdGenerator idGenerator;
   private final FnDataService beamFnDataService;
   private final InstructionRequestHandler instructionRequestHandler;
   private final SdkFunctionSpec windowMappingFn;
@@ -102,7 +102,7 @@ class FnApiWindowMappingFn<TargetWindowT extends BoundedWindow>
   private final ProcessBundleDescriptor processBundleDescriptor;
 
   FnApiWindowMappingFn(
-      Supplier<String> idGenerator,
+      IdGenerator idGenerator,
       InstructionRequestHandler instructionRequestHandler,
       ApiServiceDescriptor dataServiceApiServiceDescriptor,
       FnDataService beamFnDataService,
@@ -223,7 +223,7 @@ class FnApiWindowMappingFn<TargetWindowT extends BoundedWindow>
 
   private TargetWindowT loadIfNeeded(SdkFunctionSpec windowMappingFn, BoundedWindow mainWindow) {
     try {
-      String processRequestInstructionId = idGenerator.get();
+      String processRequestInstructionId = idGenerator.getId();
       InstructionRequest processRequest =
           InstructionRequest.newBuilder()
               .setInstructionId(processRequestInstructionId)
@@ -296,12 +296,12 @@ class FnApiWindowMappingFn<TargetWindowT extends BoundedWindow>
    */
   private synchronized String registerIfRequired() throws ExecutionException, InterruptedException {
     if (processBundleDescriptorId == null) {
-      String descriptorId = idGenerator.get();
+      String descriptorId = idGenerator.getId();
 
       CompletionStage<InstructionResponse> response =
           instructionRequestHandler.handle(
               InstructionRequest.newBuilder()
-                  .setInstructionId(idGenerator.get())
+                  .setInstructionId(idGenerator.getId())
                   .setRegister(
                       RegisterRequest.newBuilder()
                           .addProcessBundleDescriptor(
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IntrinsicMapTaskExecutorFactory.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IntrinsicMapTaskExecutorFactory.java
index e632719b..99eee23 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IntrinsicMapTaskExecutorFactory.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IntrinsicMapTaskExecutorFactory.java
@@ -37,7 +37,6 @@ import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
 import java.util.function.Function;
-import java.util.function.Supplier;
 import org.apache.beam.model.pipeline.v1.Endpoints;
 import org.apache.beam.runners.core.ElementByteSizeObservable;
 import org.apache.beam.runners.dataflow.DataflowRunner;
@@ -75,6 +74,7 @@ import org.apache.beam.runners.fnexecution.control.InstructionRequestHandler;
 import org.apache.beam.runners.fnexecution.state.GrpcStateService;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.fn.IdGenerator;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder;
 import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
@@ -109,7 +109,7 @@ public class IntrinsicMapTaskExecutorFactory implements DataflowMapTaskExecutorF
       SinkFactory sinkFactory,
       DataflowExecutionContext<?> executionContext,
       CounterSet counterSet,
-      Supplier<String> idGenerator) {
+      IdGenerator idGenerator) {
 
     // TODO: remove this once we trust the code paths
     checkArgument(
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
index 80556a8..d75c703 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
@@ -88,7 +88,6 @@ import org.apache.beam.runners.dataflow.worker.counters.Counter;
 import org.apache.beam.runners.dataflow.worker.counters.CounterSet;
 import org.apache.beam.runners.dataflow.worker.counters.DataflowCounterUpdateExtractor;
 import org.apache.beam.runners.dataflow.worker.counters.NameContext;
-import org.apache.beam.runners.dataflow.worker.fn.IdGenerator;
 import org.apache.beam.runners.dataflow.worker.graph.CloneAmbiguousFlattensFunction;
 import org.apache.beam.runners.dataflow.worker.graph.CreateRegisterFnOperationFunction;
 import org.apache.beam.runners.dataflow.worker.graph.DeduceFlattenLocationsFunction;
@@ -125,6 +124,8 @@ import org.apache.beam.runners.dataflow.worker.windmill.WindmillServerStub.Commi
 import org.apache.beam.runners.dataflow.worker.windmill.WindmillServerStub.GetWorkStream;
 import org.apache.beam.runners.dataflow.worker.windmill.WindmillServerStub.StreamPool;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.fn.IdGenerator;
+import org.apache.beam.sdk.fn.IdGenerators;
 import org.apache.beam.sdk.io.FileSystems;
 import org.apache.beam.sdk.util.BackOff;
 import org.apache.beam.sdk.util.BackOffUtils;
@@ -143,12 +144,14 @@ import org.slf4j.LoggerFactory;
 public class StreamingDataflowWorker {
   private static final Logger LOG = LoggerFactory.getLogger(StreamingDataflowWorker.class);
 
+  /** The idGenerator to generate unique id globally. */
+  private static final IdGenerator idGenerator = IdGenerators.decrementingLongs();
   /**
    * Fix up MapTask representation because MultiOutputInfos are missing from system generated
    * ParDoInstructions.
    */
   private static final Function<MapTask, MapTask> fixMultiOutputInfos =
-      new FixMultiOutputInfosOnParDoInstructions(IdGenerator::generate);
+      new FixMultiOutputInfosOnParDoInstructions(idGenerator);
 
   /**
    * Function which converts map tasks to their network representation for execution.
@@ -627,11 +630,9 @@ public class StreamingDataflowWorker {
       Function<MutableNetwork<Node, Edge>, Node> sdkFusedStage =
           pipeline == null
               ? RegisterNodeFunction.withoutPipeline(
-                  IdGenerator::generate, sdkHarnessRegistry.beamFnStateApiServiceDescriptor())
+                  idGenerator, sdkHarnessRegistry.beamFnStateApiServiceDescriptor())
               : RegisterNodeFunction.forPipeline(
-                  pipeline,
-                  IdGenerator::generate,
-                  sdkHarnessRegistry.beamFnStateApiServiceDescriptor());
+                  pipeline, idGenerator, sdkHarnessRegistry.beamFnStateApiServiceDescriptor());
       Function<MutableNetwork<Node, Edge>, MutableNetwork<Node, Edge>> lengthPrefixUnknownCoders =
           LengthPrefixUnknownCoders::forSdkNetwork;
       Function<MutableNetwork<Node, Edge>, MutableNetwork<Node, Edge>>
@@ -640,9 +641,7 @@ public class StreamingDataflowWorker {
 
       Function<MutableNetwork<Node, Edge>, MutableNetwork<Node, Edge>> transformToRunnerNetwork =
           new CreateRegisterFnOperationFunction(
-              IdGenerator::generate,
-              this::createPortNode,
-              lengthPrefixUnknownCoders.andThen(sdkFusedStage));
+              idGenerator, this::createPortNode, lengthPrefixUnknownCoders.andThen(sdkFusedStage));
 
       mapTaskToNetwork =
           mapTaskToBaseNetwork
@@ -669,8 +668,8 @@ public class StreamingDataflowWorker {
         RemoteGrpcPort.newBuilder()
             .setApiServiceDescriptor(sdkHarnessRegistry.beamFnDataApiServiceDescriptor())
             .build(),
-        IdGenerator.generate(),
-        IdGenerator.generate(),
+        idGenerator.getId(),
+        idGenerator.getId(),
         predecessorId,
         successorId);
   }
@@ -1153,7 +1152,7 @@ public class StreamingDataflowWorker {
                 sinkRegistry,
                 context,
                 pendingDeltaCounters,
-                IdGenerator::generate);
+                idGenerator);
         ReadOperation readOperation = mapTaskExecutor.getReadOperation();
         // Disable progress updates since its results are unused  for streaming
         // and involves starting a thread.
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/apiary/FixMultiOutputInfosOnParDoInstructions.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/apiary/FixMultiOutputInfosOnParDoInstructions.java
index e774bcc..b161baa 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/apiary/FixMultiOutputInfosOnParDoInstructions.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/apiary/FixMultiOutputInfosOnParDoInstructions.java
@@ -24,7 +24,7 @@ import com.google.api.services.dataflow.model.ParallelInstruction;
 import com.google.common.collect.ImmutableList;
 import java.util.List;
 import java.util.function.Function;
-import java.util.function.Supplier;
+import org.apache.beam.sdk.fn.IdGenerator;
 
 /**
  * {@link ParDoInstruction}s are meant to always have {@link MultiOutputInfo}s which give names to
@@ -33,9 +33,9 @@ import java.util.function.Supplier;
  * should supply ids outside the ids used within the {@link MapTask} to prevent collisions.
  */
 public class FixMultiOutputInfosOnParDoInstructions implements Function<MapTask, MapTask> {
-  private final Supplier<String> idGenerator;
+  private final IdGenerator idGenerator;
 
-  public FixMultiOutputInfosOnParDoInstructions(Supplier<String> idGenerator) {
+  public FixMultiOutputInfosOnParDoInstructions(IdGenerator idGenerator) {
     this.idGenerator = idGenerator;
   }
 
@@ -50,7 +50,7 @@ public class FixMultiOutputInfosOnParDoInstructions implements Function<MapTask,
         if (numOutputs != Apiary.listOrEmpty(instruction.getParDo().getMultiOutputInfos()).size()) {
           if (numOutputs == 1) {
             parDoInstruction.setMultiOutputInfos(
-                ImmutableList.of(new MultiOutputInfo().setTag(idGenerator.get())));
+                ImmutableList.of(new MultiOutputInfo().setTag(idGenerator.getId())));
           } else {
             throw new IllegalArgumentException(
                 String.format(
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/IdGenerator.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/IdGenerator.java
deleted file mode 100644
index edf25b5..0000000
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/IdGenerator.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.dataflow.worker.fn;
-
-import java.util.concurrent.atomic.AtomicLong;
-
-/**
- * IdGenerator generates a new, unique identifier on each invocation.
- *
- * <p>Consumers of these ids should not make any additional assumptions regarding the nature of the
- * returned identifiers. Uniqueness is the only guaranteed property.
- */
-public final class IdGenerator {
-  private static final AtomicLong idGenerator = new AtomicLong(-1);
-
-  public static String generate() {
-    return Long.toString(idGenerator.getAndDecrement());
-  }
-}
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/RegisterAndProcessBundleOperation.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/RegisterAndProcessBundleOperation.java
index 7dd674f..c316ea7 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/RegisterAndProcessBundleOperation.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/RegisterAndProcessBundleOperation.java
@@ -32,7 +32,6 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
-import java.util.function.Supplier;
 import javax.annotation.Nullable;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.InstructionRequest;
@@ -62,6 +61,7 @@ import org.apache.beam.runners.fnexecution.control.InstructionRequestHandler;
 import org.apache.beam.runners.fnexecution.state.StateDelegator;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.fn.IdGenerator;
 import org.apache.beam.sdk.fn.data.RemoteGrpcPortRead;
 import org.apache.beam.sdk.state.BagState;
 import org.apache.beam.sdk.transforms.Materializations;
@@ -89,7 +89,7 @@ public class RegisterAndProcessBundleOperation extends Operation {
 
   private static final OutputReceiver[] EMPTY_RECEIVERS = new OutputReceiver[0];
 
-  private final Supplier<String> idGenerator;
+  private final IdGenerator idGenerator;
   private final InstructionRequestHandler instructionRequestHandler;
   private final StateDelegator beamFnStateDelegator;
   private final RegisterRequest registerRequest;
@@ -108,7 +108,7 @@ public class RegisterAndProcessBundleOperation extends Operation {
   private String grpcReadTransformOutputName = null;
 
   public RegisterAndProcessBundleOperation(
-      Supplier<String> idGenerator,
+      IdGenerator idGenerator,
       InstructionRequestHandler instructionRequestHandler,
       StateDelegator beamFnStateDelegator,
       RegisterRequest registerRequest,
@@ -226,7 +226,7 @@ public class RegisterAndProcessBundleOperation extends Operation {
    */
   public synchronized String getProcessBundleInstructionId() {
     if (processBundleId == null) {
-      processBundleId = idGenerator.get();
+      processBundleId = idGenerator.getId();
     }
     return processBundleId;
   }
@@ -240,7 +240,7 @@ public class RegisterAndProcessBundleOperation extends Operation {
       if (registerFuture == null) {
         InstructionRequest request =
             InstructionRequest.newBuilder()
-                .setInstructionId(idGenerator.get())
+                .setInstructionId(idGenerator.getId())
                 .setRegister(registerRequest)
                 .build();
         registerFuture = instructionRequestHandler.handle(request);
@@ -314,7 +314,7 @@ public class RegisterAndProcessBundleOperation extends Operation {
     }
     InstructionRequest processBundleRequest =
         InstructionRequest.newBuilder()
-            .setInstructionId(idGenerator.get())
+            .setInstructionId(idGenerator.getId())
             .setProcessBundleProgress(
                 ProcessBundleProgressRequest.newBuilder().setInstructionReference(processBundleId))
             .build();
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/data/RemoteGrpcPortReadOperation.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/data/RemoteGrpcPortReadOperation.java
index 7195cc6..2e591a6 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/data/RemoteGrpcPortReadOperation.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/data/RemoteGrpcPortReadOperation.java
@@ -19,13 +19,13 @@ package org.apache.beam.runners.dataflow.worker.fn.data;
 
 import com.google.common.base.MoreObjects;
 import java.io.Closeable;
-import java.util.function.Supplier;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.Target;
 import org.apache.beam.runners.dataflow.worker.util.common.worker.Operation;
 import org.apache.beam.runners.dataflow.worker.util.common.worker.OperationContext;
 import org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver;
 import org.apache.beam.runners.fnexecution.data.FnDataService;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.fn.IdGenerator;
 import org.apache.beam.sdk.fn.data.InboundDataClient;
 import org.apache.beam.sdk.fn.data.LogicalEndpoint;
 import org.apache.beam.sdk.util.WindowedValue;
@@ -45,7 +45,7 @@ public class RemoteGrpcPortReadOperation<T> extends Operation {
   private static final Logger LOG = LoggerFactory.getLogger(RemoteGrpcPortReadOperation.class);
   private final Coder<WindowedValue<T>> coder;
   private final FnDataService beamFnDataService;
-  private final Supplier<String> bundleIdSupplier;
+  private final IdGenerator bundleIdSupplier;
   // Should only be set and cleared once per start/finish cycle in the start method and
   // finish method respectively.
   private String bundleId;
@@ -55,7 +55,7 @@ public class RemoteGrpcPortReadOperation<T> extends Operation {
   public RemoteGrpcPortReadOperation(
       FnDataService beamFnDataService,
       Target target,
-      Supplier<String> bundleIdSupplier,
+      IdGenerator bundleIdSupplier,
       Coder<WindowedValue<T>> coder,
       OutputReceiver[] receivers,
       OperationContext context) {
@@ -69,7 +69,7 @@ public class RemoteGrpcPortReadOperation<T> extends Operation {
   @Override
   public void start() throws Exception {
     try (Closeable scope = context.enterStart()) {
-      bundleId = bundleIdSupplier.get();
+      bundleId = bundleIdSupplier.getId();
       super.start();
       inboundDataClient =
           beamFnDataService.receive(
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/data/RemoteGrpcPortWriteOperation.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/data/RemoteGrpcPortWriteOperation.java
index 9cf3dd4..bd625bd 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/data/RemoteGrpcPortWriteOperation.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/data/RemoteGrpcPortWriteOperation.java
@@ -33,6 +33,7 @@ import org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver
 import org.apache.beam.runners.dataflow.worker.util.common.worker.ReceivingOperation;
 import org.apache.beam.runners.fnexecution.data.FnDataService;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.fn.IdGenerator;
 import org.apache.beam.sdk.fn.data.CloseableFnDataReceiver;
 import org.apache.beam.sdk.fn.data.LogicalEndpoint;
 import org.apache.beam.sdk.util.WindowedValue;
@@ -50,7 +51,7 @@ public class RemoteGrpcPortWriteOperation<T> extends ReceivingOperation {
   private static final OutputReceiver[] EMPTY_RECEIVER_ARRAY = new OutputReceiver[0];
   private final Coder<WindowedValue<T>> coder;
   private final FnDataService beamFnDataService;
-  private final Supplier<String> bundleIdSupplier;
+  private final IdGenerator bundleIdSupplier;
   // Should only be set and cleared once per start/finish cycle in the start method and
   // finish method respectively.
   private String bundleId;
@@ -72,7 +73,7 @@ public class RemoteGrpcPortWriteOperation<T> extends ReceivingOperation {
   public RemoteGrpcPortWriteOperation(
       FnDataService beamFnDataService,
       Target target,
-      Supplier<String> bundleIdSupplier,
+      IdGenerator bundleIdSupplier,
       Coder<WindowedValue<T>> coder,
       OperationContext context) {
     this(beamFnDataService, target, bundleIdSupplier, coder, context, System::currentTimeMillis);
@@ -81,7 +82,7 @@ public class RemoteGrpcPortWriteOperation<T> extends ReceivingOperation {
   public RemoteGrpcPortWriteOperation(
       FnDataService beamFnDataService,
       Target target,
-      Supplier<String> bundleIdSupplier,
+      IdGenerator bundleIdSupplier,
       Coder<WindowedValue<T>> coder,
       OperationContext context,
       Supplier<Long> currentTimeMillis) {
@@ -101,7 +102,7 @@ public class RemoteGrpcPortWriteOperation<T> extends ReceivingOperation {
       targetElementsSent = 1;
       elementsFlushed = 0;
       super.start();
-      bundleId = bundleIdSupplier.get();
+      bundleId = bundleIdSupplier.getId();
       receiver = beamFnDataService.send(LogicalEndpoint.of(bundleId, target), coder);
     }
   }
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/CreateRegisterFnOperationFunction.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/CreateRegisterFnOperationFunction.java
index 1ca45e9..2a71acc 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/CreateRegisterFnOperationFunction.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/CreateRegisterFnOperationFunction.java
@@ -39,6 +39,7 @@ import org.apache.beam.runners.dataflow.worker.graph.Edges.MultiOutputInfoEdge;
 import org.apache.beam.runners.dataflow.worker.graph.Nodes.InstructionOutputNode;
 import org.apache.beam.runners.dataflow.worker.graph.Nodes.Node;
 import org.apache.beam.runners.dataflow.worker.graph.Nodes.ParallelInstructionNode;
+import org.apache.beam.sdk.fn.IdGenerator;
 
 /**
  * Splits the instruction graph into SDK and runner harness portions replacing the SDK sub-graphs
@@ -74,7 +75,7 @@ import org.apache.beam.runners.dataflow.worker.graph.Nodes.ParallelInstructionNo
 public class CreateRegisterFnOperationFunction
     implements Function<MutableNetwork<Node, Edge>, MutableNetwork<Node, Edge>> {
 
-  private final Supplier<String> idGenerator;
+  private final IdGenerator idGenerator;
   private final BiFunction<String, String, Node> portSupplier;
   private final Function<MutableNetwork<Node, Edge>, Node> registerFnOperationFunction;
 
@@ -90,7 +91,7 @@ public class CreateRegisterFnOperationFunction
    *     produces a {@link Node} that is able to register the SDK functions within the SDK harness.
    */
   public CreateRegisterFnOperationFunction(
-      Supplier<String> idGenerator,
+      IdGenerator idGenerator,
       BiFunction<String, String, Node> portSupplier,
       Function<MutableNetwork<Node, Edge>, Node> registerFnOperationFunction) {
     this.idGenerator = idGenerator;
@@ -243,8 +244,8 @@ public class CreateRegisterFnOperationFunction
         InstructionOutputNode.create(outputNode.getInstructionOutput());
     InstructionOutputNode portOutputNode =
         InstructionOutputNode.create(outputNode.getInstructionOutput());
-    String predecessorPortEdgeId = idGenerator.get();
-    String successorPortEdgeId = idGenerator.get();
+    String predecessorPortEdgeId = idGenerator.getId();
+    String successorPortEdgeId = idGenerator.getId();
     Node portNode = portSupplier.apply(predecessorPortEdgeId, successorPortEdgeId);
     network.addNode(newPredecessorOutputNode);
     network.addNode(portNode);
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/RegisterNodeFunction.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/RegisterNodeFunction.java
index bccb26a..8791fd4 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/RegisterNodeFunction.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/RegisterNodeFunction.java
@@ -42,7 +42,6 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
 import java.util.function.Function;
-import java.util.function.Supplier;
 import javax.annotation.Nullable;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.ProcessBundleDescriptor;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.RegisterRequest;
@@ -79,6 +78,7 @@ import org.apache.beam.runners.dataflow.worker.util.WorkerPropertyNames;
 import org.apache.beam.runners.fnexecution.wire.LengthPrefixUnknownCoders;
 import org.apache.beam.runners.fnexecution.wire.WireCoders;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.fn.IdGenerator;
 import org.apache.beam.sdk.transforms.Materializations;
 import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
 import org.apache.beam.sdk.values.KV;
@@ -115,7 +115,7 @@ public class RegisterNodeFunction implements Function<MutableNetwork<Node, Edge>
   private static final String SERIALIZED_SOURCE = "serialized_source";
   private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
 
-  private final Supplier<String> idGenerator;
+  private final IdGenerator idGenerator;
   private final Endpoints.ApiServiceDescriptor stateApiServiceDescriptor;
   private final @Nullable RunnerApi.Pipeline pipeline;
 
@@ -125,7 +125,7 @@ public class RegisterNodeFunction implements Function<MutableNetwork<Node, Edge>
    */
   public static RegisterNodeFunction forPipeline(
       RunnerApi.Pipeline pipeline,
-      Supplier<String> idGenerator,
+      IdGenerator idGenerator,
       Endpoints.ApiServiceDescriptor stateApiServiceDescriptor) {
     return new RegisterNodeFunction(pipeline, idGenerator, stateApiServiceDescriptor);
   }
@@ -136,13 +136,13 @@ public class RegisterNodeFunction implements Function<MutableNetwork<Node, Edge>
    * harnesses, then this method should be removed.
    */
   public static RegisterNodeFunction withoutPipeline(
-      Supplier<String> idGenerator, Endpoints.ApiServiceDescriptor stateApiServiceDescriptor) {
+      IdGenerator idGenerator, Endpoints.ApiServiceDescriptor stateApiServiceDescriptor) {
     return new RegisterNodeFunction(null, idGenerator, stateApiServiceDescriptor);
   }
 
   private RegisterNodeFunction(
       @Nullable RunnerApi.Pipeline pipeline,
-      Supplier<String> idGenerator,
+      IdGenerator idGenerator,
       Endpoints.ApiServiceDescriptor stateApiServiceDescriptor) {
     this.pipeline = pipeline;
     this.idGenerator = idGenerator;
@@ -173,7 +173,7 @@ public class RegisterNodeFunction implements Function<MutableNetwork<Node, Edge>
             input.addEdge(
                 node,
                 successor,
-                MultiOutputInfoEdge.create(new MultiOutputInfo().setTag(idGenerator.get())));
+                MultiOutputInfoEdge.create(new MultiOutputInfo().setTag(idGenerator.getId())));
           }
         }
       }
@@ -185,7 +185,7 @@ public class RegisterNodeFunction implements Function<MutableNetwork<Node, Edge>
 
     ProcessBundleDescriptor.Builder processBundleDescriptor =
         ProcessBundleDescriptor.newBuilder()
-            .setId(idGenerator.get())
+            .setId(idGenerator.getId())
             .setStateApiServiceDescriptor(stateApiServiceDescriptor);
 
     // For intermediate PCollections we fabricate, we make a bogus WindowingStrategy
@@ -199,7 +199,7 @@ public class RegisterNodeFunction implements Function<MutableNetwork<Node, Edge>
       sdkComponents.registerEnvironment(Environments.JAVA_SDK_HARNESS_ENVIRONMENT);
     }
 
-    String fakeWindowingStrategyId = "fakeWindowingStrategy" + idGenerator.get();
+    String fakeWindowingStrategyId = "fakeWindowingStrategy" + idGenerator.getId();
     try {
       RunnerApi.MessageWithComponents fakeWindowingStrategyProto =
           WindowingStrategyTranslation.toMessageProto(
@@ -224,7 +224,7 @@ public class RegisterNodeFunction implements Function<MutableNetwork<Node, Edge>
         Iterables.filter(input.nodes(), InstructionOutputNode.class)) {
       InstructionOutput instructionOutput = node.getInstructionOutput();
 
-      String coderId = "generatedCoder" + idGenerator.get();
+      String coderId = "generatedCoder" + idGenerator.getId();
       try (ByteString.Output output = ByteString.newOutput()) {
         try {
           Coder<?> javaCoder =
@@ -259,7 +259,7 @@ public class RegisterNodeFunction implements Function<MutableNetwork<Node, Edge>
             e);
       }
 
-      String pcollectionId = "generatedPcollection" + idGenerator.get();
+      String pcollectionId = "generatedPcollection" + idGenerator.getId();
       processBundleDescriptor.putPcollections(
           pcollectionId,
           RunnerApi.PCollection.newBuilder()
@@ -273,7 +273,7 @@ public class RegisterNodeFunction implements Function<MutableNetwork<Node, Edge>
     for (ParallelInstructionNode node :
         Iterables.filter(input.nodes(), ParallelInstructionNode.class)) {
       ParallelInstruction parallelInstruction = node.getParallelInstruction();
-      String ptransformId = "generatedPtransform" + idGenerator.get();
+      String ptransformId = "generatedPtransform" + idGenerator.getId();
       ptransformIdToNameContexts.put(
           ptransformId,
           NameContext.create(
@@ -383,7 +383,7 @@ public class RegisterNodeFunction implements Function<MutableNetwork<Node, Edge>
 
       for (Node predecessorOutput : input.predecessors(node)) {
         pTransform.putInputs(
-            "generatedInput" + idGenerator.get(), nodesToPCollections.get(predecessorOutput));
+            "generatedInput" + idGenerator.getId(), nodesToPCollections.get(predecessorOutput));
       }
 
       for (Edge edge : input.outEdges(node)) {
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/FnApiWindowMappingFnTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/FnApiWindowMappingFnTest.java
index eb44eaa..ae9abf8 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/FnApiWindowMappingFnTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/FnApiWindowMappingFnTest.java
@@ -34,10 +34,10 @@ import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor;
 import org.apache.beam.model.pipeline.v1.RunnerApi.SdkFunctionSpec;
 import org.apache.beam.runners.core.construction.ParDoTranslation;
 import org.apache.beam.runners.core.construction.SdkComponents;
-import org.apache.beam.runners.dataflow.worker.fn.IdGenerator;
 import org.apache.beam.runners.fnexecution.control.InstructionRequestHandler;
 import org.apache.beam.runners.fnexecution.data.FnDataService;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.fn.IdGenerators;
 import org.apache.beam.sdk.fn.data.CloseableFnDataReceiver;
 import org.apache.beam.sdk.fn.data.CompletableFutureInboundDataClient;
 import org.apache.beam.sdk.fn.data.FnDataReceiver;
@@ -73,7 +73,7 @@ public class FnApiWindowMappingFnTest {
 
     FnApiWindowMappingFn windowMappingFn =
         new FnApiWindowMappingFn(
-            IdGenerator::generate,
+            IdGenerators.decrementingLongs(),
             testSdkHarness,
             DATA_SERVICE,
             testSdkHarness,
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/IntrinsicMapTaskExecutorFactoryTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/IntrinsicMapTaskExecutorFactoryTest.java
index 972c19a..be947c4 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/IntrinsicMapTaskExecutorFactoryTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/IntrinsicMapTaskExecutorFactoryTest.java
@@ -67,7 +67,6 @@ import org.apache.beam.runners.dataflow.worker.counters.Counter;
 import org.apache.beam.runners.dataflow.worker.counters.Counter.CounterUpdateExtractor;
 import org.apache.beam.runners.dataflow.worker.counters.CounterFactory.CounterMean;
 import org.apache.beam.runners.dataflow.worker.counters.CounterSet;
-import org.apache.beam.runners.dataflow.worker.fn.IdGenerator;
 import org.apache.beam.runners.dataflow.worker.graph.Edges.Edge;
 import org.apache.beam.runners.dataflow.worker.graph.Edges.MultiOutputInfoEdge;
 import org.apache.beam.runners.dataflow.worker.graph.MapTaskToNetworkFunction;
@@ -88,6 +87,8 @@ import org.apache.beam.sdk.coders.CoderRegistry;
 import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.fn.IdGenerator;
+import org.apache.beam.sdk.fn.IdGenerators;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -115,8 +116,10 @@ import org.mockito.MockitoAnnotations;
 public class IntrinsicMapTaskExecutorFactoryTest {
   private static final String STAGE = "test";
 
+  private static final IdGenerator idGenerator = IdGenerators.decrementingLongs();
+
   private static final Function<MapTask, MutableNetwork<Node, Edge>> mapTaskToNetwork =
-      new FixMultiOutputInfosOnParDoInstructions(IdGenerator::generate)
+      new FixMultiOutputInfosOnParDoInstructions(idGenerator)
           .andThen(new MapTaskToNetworkFunction());
 
   private static final CloudObject windowedStringCoder =
@@ -180,7 +183,7 @@ public class IntrinsicMapTaskExecutorFactoryTest {
             sinkRegistry,
             BatchModeExecutionContext.forTesting(options, counterSet, "testStage"),
             counterSet,
-            IdGenerator::generate)) {
+            idGenerator)) {
       // Safe covariant cast not expressible without rawtypes.
       @SuppressWarnings({"rawtypes", "unchecked"})
       List<Object> operations = (List) executor.operations;
@@ -271,7 +274,7 @@ public class IntrinsicMapTaskExecutorFactoryTest {
             sinkRegistry,
             context,
             counterSet,
-            IdGenerator::generate)) {
+            idGenerator)) {
       executor.execute();
     }
 
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/apiary/FixMultiOutputInfosOnParDoInstructionsTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/apiary/FixMultiOutputInfosOnParDoInstructionsTest.java
index 7a24369..99fc745 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/apiary/FixMultiOutputInfosOnParDoInstructionsTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/apiary/FixMultiOutputInfosOnParDoInstructionsTest.java
@@ -26,8 +26,7 @@ import com.google.api.services.dataflow.model.ParallelInstruction;
 import com.google.common.collect.ImmutableList;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.function.Supplier;
+import org.apache.beam.sdk.fn.IdGenerators;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
@@ -39,19 +38,10 @@ import org.junit.runners.JUnit4;
 public class FixMultiOutputInfosOnParDoInstructionsTest {
   @Rule public ExpectedException thrown = ExpectedException.none();
 
-  private Supplier<String> makeIdGenerator() {
-    return makeIdGeneratorStartingFrom(-1);
-  }
-
-  private Supplier<String> makeIdGeneratorStartingFrom(long initialValue) {
-    AtomicLong longIdGenerator = new AtomicLong(initialValue);
-    return () -> Long.toString(longIdGenerator.getAndDecrement());
-  }
-
   @Test
   public void testExistingMultiOutputInfosAreUnmodified() {
     FixMultiOutputInfosOnParDoInstructions function =
-        new FixMultiOutputInfosOnParDoInstructions(makeIdGenerator());
+        new FixMultiOutputInfosOnParDoInstructions(IdGenerators.decrementingLongs());
     MapTask output = function.apply(createMapTaskWithParDo(2, "5", "6"));
     assertEquals(createMapTaskWithParDo(2, "5", "6"), output);
   }
@@ -59,7 +49,7 @@ public class FixMultiOutputInfosOnParDoInstructionsTest {
   @Test
   public void testDefaultOutputIsAddedIfOnlySingleOutput() {
     FixMultiOutputInfosOnParDoInstructions function =
-        new FixMultiOutputInfosOnParDoInstructions(makeIdGenerator());
+        new FixMultiOutputInfosOnParDoInstructions(IdGenerators.decrementingLongs());
     MapTask output = function.apply(createMapTaskWithParDo(1));
     assertEquals(createMapTaskWithParDo(1, "-1"), output);
   }
@@ -67,7 +57,7 @@ public class FixMultiOutputInfosOnParDoInstructionsTest {
   @Test
   public void testDefaultOutputHasDifferentIdsForEachMapTask() {
     FixMultiOutputInfosOnParDoInstructions function =
-        new FixMultiOutputInfosOnParDoInstructions(makeIdGenerator());
+        new FixMultiOutputInfosOnParDoInstructions(IdGenerators.decrementingLongs());
     MapTask output = function.apply(createMapTaskWithParDo(1));
     assertEquals(createMapTaskWithParDo(1, "-1"), output);
 
@@ -78,7 +68,7 @@ public class FixMultiOutputInfosOnParDoInstructionsTest {
   @Test
   public void testMissingTagsForMultipleOutputsThrows() {
     FixMultiOutputInfosOnParDoInstructions function =
-        new FixMultiOutputInfosOnParDoInstructions(makeIdGeneratorStartingFrom(0));
+        new FixMultiOutputInfosOnParDoInstructions(IdGenerators.decrementingLongs());
     thrown.expect(IllegalArgumentException.class);
     thrown.expectMessage("Invalid ParDoInstruction");
     thrown.expectMessage("2 outputs specified");
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/IdGeneratorTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/IdGeneratorTest.java
deleted file mode 100644
index 2bfca08..0000000
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/IdGeneratorTest.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.dataflow.worker.fn;
-
-import static org.junit.Assert.assertNotEquals;
-
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/** Tests for {@link IdGenerator}. */
-@RunWith(JUnit4.class)
-public class IdGeneratorTest {
-  @Test
-  public void testGenerationNeverMatches() {
-    String previous = IdGenerator.generate();
-    for (int i = 0; i < 10000; ++i) {
-      String next = IdGenerator.generate();
-      assertNotEquals(previous, next);
-      previous = next;
-    }
-  }
-}
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutorTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutorTest.java
index 9bcb224..dd07d99 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutorTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutorTest.java
@@ -32,8 +32,6 @@ import java.util.Collections;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.function.Supplier;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.InstructionRequest;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.InstructionResponse;
@@ -46,6 +44,7 @@ import org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation;
 import org.apache.beam.runners.fnexecution.control.InstructionRequestHandler;
 import org.apache.beam.runners.fnexecution.state.StateDelegator;
 import org.apache.beam.runners.fnexecution.state.StateRequestHandler;
+import org.apache.beam.sdk.fn.IdGenerators;
 import org.apache.beam.sdk.fn.data.RemoteGrpcPortRead;
 import org.apache.beam.sdk.util.MoreFutures;
 import org.junit.Before;
@@ -103,8 +102,6 @@ public class BeamFnMapTaskExecutorTest {
 
   @Test(timeout = ReadOperation.DEFAULT_PROGRESS_UPDATE_PERIOD_MS * 10)
   public void testTentativeUserMetrics() throws Exception {
-    Supplier<String> idGenerator = makeIdGeneratorStartingFrom(777L);
-
     final String stepName = "fakeStepNameWithUserMetrics";
     final String namespace = "sdk/whatever";
     final String name = "someCounter";
@@ -166,7 +163,7 @@ public class BeamFnMapTaskExecutorTest {
 
     RegisterAndProcessBundleOperation processOperation =
         new RegisterAndProcessBundleOperation(
-            idGenerator,
+            IdGenerators.decrementingLongs(),
             instructionRequestHandler,
             mockBeamFnStateDelegator,
             REGISTER_REQUEST,
@@ -204,8 +201,6 @@ public class BeamFnMapTaskExecutorTest {
   /** Tests that successive metric updates overwrite the previous. */
   @Test(timeout = ReadOperation.DEFAULT_PROGRESS_UPDATE_PERIOD_MS * 10)
   public void testTentativeUserMetricsOverwrite() throws Exception {
-    Supplier<String> idGenerator = makeIdGeneratorStartingFrom(777L);
-
     final String stepName = "fakeStepNameWithUserMetrics";
     final String namespace = "sdk/whatever";
     final String name = "someCounter";
@@ -275,7 +270,7 @@ public class BeamFnMapTaskExecutorTest {
 
     RegisterAndProcessBundleOperation processOperation =
         new RegisterAndProcessBundleOperation(
-            idGenerator,
+            IdGenerators.decrementingLongs(),
             instructionRequestHandler,
             mockBeamFnStateDelegator,
             REGISTER_REQUEST,
@@ -312,8 +307,6 @@ public class BeamFnMapTaskExecutorTest {
 
   @Test(timeout = ReadOperation.DEFAULT_PROGRESS_UPDATE_PERIOD_MS * 10)
   public void testFinalUserMetrics() throws Exception {
-    Supplier<String> idGenerator = makeIdGeneratorStartingFrom(777L);
-
     final String stepName = "fakeStepNameWithUserMetrics";
     final String namespace = "sdk/whatever";
     final String name = "someCounter";
@@ -391,7 +384,7 @@ public class BeamFnMapTaskExecutorTest {
 
     RegisterAndProcessBundleOperation processOperation =
         new RegisterAndProcessBundleOperation(
-            idGenerator,
+            IdGenerators.decrementingLongs(),
             instructionRequestHandler,
             mockBeamFnStateDelegator,
             REGISTER_REQUEST,
@@ -430,11 +423,6 @@ public class BeamFnMapTaskExecutorTest {
         contains(new CounterHamcrestMatchers.CounterUpdateIntegerValueMatcher(finalCounterValue)));
   }
 
-  private Supplier<String> makeIdGeneratorStartingFrom(long initialValue) {
-    AtomicLong longIdGenerator = new AtomicLong(initialValue);
-    return () -> Long.toString(longIdGenerator.getAndIncrement());
-  }
-
   private BeamFnApi.InstructionResponse.Builder responseFor(BeamFnApi.InstructionRequest request) {
     return BeamFnApi.InstructionResponse.newBuilder().setInstructionId(request.getInstructionId());
   }
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/RegisterAndProcessBundleOperationTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/RegisterAndProcessBundleOperationTest.java
index 342216b..8e24bf8 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/RegisterAndProcessBundleOperationTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/RegisterAndProcessBundleOperationTest.java
@@ -46,7 +46,6 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.Supplier;
 import javax.annotation.Nullable;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.InstructionRequest;
@@ -63,7 +62,6 @@ import org.apache.beam.runners.core.InMemoryStateInternals;
 import org.apache.beam.runners.core.SideInputReader;
 import org.apache.beam.runners.dataflow.worker.DataflowExecutionContext.DataflowStepContext;
 import org.apache.beam.runners.dataflow.worker.DataflowPortabilityPCollectionView;
-import org.apache.beam.runners.dataflow.worker.fn.IdGenerator;
 import org.apache.beam.runners.dataflow.worker.util.common.worker.OperationContext;
 import org.apache.beam.runners.fnexecution.control.InstructionRequestHandler;
 import org.apache.beam.runners.fnexecution.state.StateDelegator;
@@ -71,6 +69,8 @@ import org.apache.beam.runners.fnexecution.state.StateRequestHandler;
 import org.apache.beam.sdk.coders.ByteArrayCoder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.fn.IdGenerator;
+import org.apache.beam.sdk.fn.IdGenerators;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.util.CoderUtils;
@@ -140,15 +140,21 @@ public class RegisterAndProcessBundleOperationTest {
             });
   }
 
-  private Supplier<String> makeIdGeneratorStartingFrom(long initialValue) {
-    AtomicLong longIdGenerator = new AtomicLong(initialValue);
-    return () -> Long.toString(longIdGenerator.getAndIncrement());
+  private IdGenerator makeIdGeneratorStartingFrom(long initialValue) {
+    return new IdGenerator() {
+      AtomicLong longs = new AtomicLong(initialValue);
+
+      @Override
+      public String getId() {
+        return Long.toString(longs.getAndIncrement());
+      }
+    };
   }
 
   @Test
   public void testSupportsRestart() {
     new RegisterAndProcessBundleOperation(
-            IdGenerator::generate,
+            IdGenerators.decrementingLongs(),
             new InstructionRequestHandler() {
               @Override
               public CompletionStage<InstructionResponse> handle(InstructionRequest request) {
@@ -173,7 +179,7 @@ public class RegisterAndProcessBundleOperationTest {
   @Test
   public void testRegisterOnlyOnFirstBundle() throws Exception {
     List<BeamFnApi.InstructionRequest> requests = new ArrayList<>();
-    Supplier<String> idGenerator = makeIdGeneratorStartingFrom(777L);
+    IdGenerator idGenerator = makeIdGeneratorStartingFrom(777L);
     RegisterAndProcessBundleOperation operation =
         new RegisterAndProcessBundleOperation(
             idGenerator,
@@ -243,7 +249,7 @@ public class RegisterAndProcessBundleOperationTest {
 
   @Test
   public void testTentativeUserMetrics() throws Exception {
-    Supplier<String> idGenerator = makeIdGeneratorStartingFrom(777L);
+    IdGenerator idGenerator = makeIdGeneratorStartingFrom(777L);
 
     CountDownLatch processBundleLatch = new CountDownLatch(1);
 
@@ -330,7 +336,7 @@ public class RegisterAndProcessBundleOperationTest {
   @Test
   public void testFinalUserMetrics() throws Exception {
     List<BeamFnApi.InstructionRequest> requests = new ArrayList<>();
-    Supplier<String> idGenerator = makeIdGeneratorStartingFrom(777L);
+    IdGenerator idGenerator = makeIdGeneratorStartingFrom(777L);
     ExecutorService executorService = Executors.newCachedThreadPool();
 
     CountDownLatch processBundleLatch = new CountDownLatch(1);
@@ -438,7 +444,7 @@ public class RegisterAndProcessBundleOperationTest {
   @Test
   public void testProcessingBundleBlocksOnFinish() throws Exception {
     List<BeamFnApi.InstructionRequest> requests = new ArrayList<>();
-    Supplier<String> idGenerator = makeIdGeneratorStartingFrom(777L);
+    IdGenerator idGenerator = makeIdGeneratorStartingFrom(777L);
     ExecutorService executorService = Executors.newCachedThreadPool();
     RegisterAndProcessBundleOperation operation =
         new RegisterAndProcessBundleOperation(
@@ -510,7 +516,7 @@ public class RegisterAndProcessBundleOperationTest {
 
   @Test
   public void testProcessingBundleHandlesUserStateRequests() throws Exception {
-    Supplier<String> idGenerator = makeIdGeneratorStartingFrom(777L);
+    IdGenerator idGenerator = makeIdGeneratorStartingFrom(777L);
     ExecutorService executorService = Executors.newCachedThreadPool();
 
     InMemoryStateInternals<ByteString> stateInternals =
@@ -620,7 +626,7 @@ public class RegisterAndProcessBundleOperationTest {
 
   @Test
   public void testProcessingBundleHandlesMultimapSideInputRequests() throws Exception {
-    Supplier<String> idGenerator = makeIdGeneratorStartingFrom(777L);
+    IdGenerator idGenerator = makeIdGeneratorStartingFrom(777L);
     ExecutorService executorService = Executors.newCachedThreadPool();
 
     DataflowStepContext mockStepContext = mock(DataflowStepContext.class);
@@ -748,7 +754,7 @@ public class RegisterAndProcessBundleOperationTest {
 
   @Test
   public void testAbortCancelsAndCleansUpDuringRegister() throws Exception {
-    Supplier<String> idGenerator = makeIdGeneratorStartingFrom(777L);
+    IdGenerator idGenerator = makeIdGeneratorStartingFrom(777L);
     ExecutorService executorService = Executors.newCachedThreadPool();
 
     CountDownLatch waitForAbortToComplete = new CountDownLatch(1);
@@ -795,7 +801,7 @@ public class RegisterAndProcessBundleOperationTest {
 
   @Test
   public void testAbortCancelsAndCleansUpDuringProcessBundle() throws Exception {
-    Supplier<String> idGenerator = makeIdGeneratorStartingFrom(777L);
+    IdGenerator idGenerator = makeIdGeneratorStartingFrom(777L);
     ExecutorService executorService = Executors.newCachedThreadPool();
 
     CountDownLatch waitForAbortToComplete = new CountDownLatch(1);
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/data/RemoteGrpcPortReadOperationTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/data/RemoteGrpcPortReadOperationTest.java
index ae417da..d739801 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/data/RemoteGrpcPortReadOperationTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/data/RemoteGrpcPortReadOperationTest.java
@@ -30,7 +30,6 @@ import static org.mockito.Mockito.when;
 
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
-import java.util.function.Supplier;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi;
 import org.apache.beam.runners.dataflow.worker.NameContextsForTests;
 import org.apache.beam.runners.dataflow.worker.util.common.worker.OperationContext;
@@ -39,6 +38,7 @@ import org.apache.beam.runners.dataflow.worker.util.common.worker.TestOutputRece
 import org.apache.beam.runners.fnexecution.data.FnDataService;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.fn.IdGenerator;
 import org.apache.beam.sdk.fn.data.CompletableFutureInboundDataClient;
 import org.apache.beam.sdk.fn.data.FnDataReceiver;
 import org.apache.beam.sdk.fn.data.InboundDataClient;
@@ -68,7 +68,7 @@ public class RemoteGrpcPortReadOperationTest {
   @Mock private FnDataService beamFnDataService;
   @Mock private OperationContext operationContext;
   @Captor private ArgumentCaptor<FnDataReceiver<WindowedValue<String>>> consumerCaptor;
-  @Mock private Supplier<String> bundleIdSupplier;
+  @Mock private IdGenerator bundleIdSupplier;
   private RemoteGrpcPortReadOperation<String> operation;
   private TestOutputReceiver testReceiver;
 
@@ -96,7 +96,7 @@ public class RemoteGrpcPortReadOperationTest {
     InboundDataClient inboundDataClient = CompletableFutureInboundDataClient.create();
     when(beamFnDataService.receive(any(), Matchers.<Coder<WindowedValue<String>>>any(), any()))
         .thenReturn(inboundDataClient);
-    when(bundleIdSupplier.get()).thenReturn(BUNDLE_ID);
+    when(bundleIdSupplier.getId()).thenReturn(BUNDLE_ID);
 
     operation.start();
     verify(beamFnDataService)
@@ -122,14 +122,14 @@ public class RemoteGrpcPortReadOperationTest {
     inboundDataClient.complete();
     operationFinish.get();
 
-    verify(bundleIdSupplier, times(1)).get();
+    verify(bundleIdSupplier, times(1)).getId();
     assertThat(
         testReceiver.outputElems,
         contains(
             valueInGlobalWindow("ABC"), valueInGlobalWindow("DEF"), valueInGlobalWindow("GHI")));
 
     // Ensure that the old bundle id is cleared.
-    when(bundleIdSupplier.get()).thenReturn(BUNDLE_ID_2);
+    when(bundleIdSupplier.getId()).thenReturn(BUNDLE_ID_2);
     operation.start();
     verify(beamFnDataService)
         .receive(eq(LogicalEndpoint.of(BUNDLE_ID_2, TARGET)), eq(CODER), consumerCaptor.capture());
@@ -140,7 +140,7 @@ public class RemoteGrpcPortReadOperationTest {
     InboundDataClient inboundDataClient = CompletableFutureInboundDataClient.create();
     when(beamFnDataService.receive(any(), Matchers.<Coder<WindowedValue<String>>>any(), any()))
         .thenReturn(inboundDataClient);
-    when(bundleIdSupplier.get()).thenReturn(BUNDLE_ID);
+    when(bundleIdSupplier.getId()).thenReturn(BUNDLE_ID);
 
     operation.start();
     verify(beamFnDataService)
@@ -149,6 +149,6 @@ public class RemoteGrpcPortReadOperationTest {
     assertFalse(inboundDataClient.isDone());
     operation.abort();
     assertTrue(inboundDataClient.isDone());
-    verify(bundleIdSupplier, times(1)).get();
+    verify(bundleIdSupplier, times(1)).getId();
   }
 }
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/data/RemoteGrpcPortWriteOperationTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/data/RemoteGrpcPortWriteOperationTest.java
index 7274132..404e718 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/data/RemoteGrpcPortWriteOperationTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/data/RemoteGrpcPortWriteOperationTest.java
@@ -31,12 +31,12 @@ import static org.mockito.Mockito.when;
 import java.util.ArrayList;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Consumer;
-import java.util.function.Supplier;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi;
 import org.apache.beam.runners.dataflow.worker.util.common.worker.OperationContext;
 import org.apache.beam.runners.fnexecution.data.FnDataService;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.fn.IdGenerator;
 import org.apache.beam.sdk.fn.data.CloseableFnDataReceiver;
 import org.apache.beam.sdk.fn.data.LogicalEndpoint;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
@@ -59,7 +59,7 @@ public class RemoteGrpcPortWriteOperationTest {
   private static final String BUNDLE_ID = "999";
   private static final String BUNDLE_ID_2 = "222";
 
-  @Mock Supplier<String> bundleIdSupplier;
+  @Mock IdGenerator bundleIdSupplier;
   @Mock private FnDataService beamFnDataService;
   @Mock private OperationContext operationContext;
   private RemoteGrpcPortWriteOperation<String> operation;
@@ -82,7 +82,7 @@ public class RemoteGrpcPortWriteOperationTest {
     RecordingConsumer<WindowedValue<String>> recordingConsumer = new RecordingConsumer<>();
     when(beamFnDataService.send(any(), Matchers.<Coder<WindowedValue<String>>>any()))
         .thenReturn(recordingConsumer);
-    when(bundleIdSupplier.get()).thenReturn(BUNDLE_ID);
+    when(bundleIdSupplier.getId()).thenReturn(BUNDLE_ID);
     operation.start();
     verify(beamFnDataService).send(LogicalEndpoint.of(BUNDLE_ID, TARGET), CODER);
     assertFalse(recordingConsumer.closed);
@@ -94,7 +94,7 @@ public class RemoteGrpcPortWriteOperationTest {
 
     operation.finish();
     assertTrue(recordingConsumer.closed);
-    verify(bundleIdSupplier, times(1)).get();
+    verify(bundleIdSupplier, times(1)).getId();
 
     assertThat(
         recordingConsumer,
@@ -102,7 +102,7 @@ public class RemoteGrpcPortWriteOperationTest {
             valueInGlobalWindow("ABC"), valueInGlobalWindow("DEF"), valueInGlobalWindow("GHI")));
 
     // Ensure that the old bundle id is cleared.
-    when(bundleIdSupplier.get()).thenReturn(BUNDLE_ID_2);
+    when(bundleIdSupplier.getId()).thenReturn(BUNDLE_ID_2);
     when(beamFnDataService.send(any(), Matchers.<Coder<WindowedValue<String>>>any()))
         .thenReturn(recordingConsumer);
     operation.start();
@@ -116,7 +116,7 @@ public class RemoteGrpcPortWriteOperationTest {
     RecordingConsumer<WindowedValue<String>> recordingConsumer = new RecordingConsumer<>();
     when(beamFnDataService.send(any(), Matchers.<Coder<WindowedValue<String>>>any()))
         .thenReturn(recordingConsumer);
-    when(bundleIdSupplier.get()).thenReturn(BUNDLE_ID);
+    when(bundleIdSupplier.getId()).thenReturn(BUNDLE_ID);
     operation.start();
     verify(beamFnDataService).send(LogicalEndpoint.of(BUNDLE_ID, TARGET), CODER);
     assertFalse(recordingConsumer.closed);
@@ -127,7 +127,7 @@ public class RemoteGrpcPortWriteOperationTest {
 
     operation.abort();
     assertTrue(recordingConsumer.closed);
-    verify(bundleIdSupplier, times(1)).get();
+    verify(bundleIdSupplier, times(1)).getId();
 
     verifyNoMoreInteractions(beamFnDataService);
   }
@@ -156,7 +156,7 @@ public class RemoteGrpcPortWriteOperationTest {
     RecordingConsumer<WindowedValue<String>> recordingConsumer = new RecordingConsumer<>();
     when(beamFnDataService.send(any(), Matchers.<Coder<WindowedValue<String>>>any()))
         .thenReturn(recordingConsumer);
-    when(bundleIdSupplier.get()).thenReturn(BUNDLE_ID);
+    when(bundleIdSupplier.getId()).thenReturn(BUNDLE_ID);
 
     Consumer<Integer> processedElementConsumer = operation.processedElementsConsumer();
 
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/CreateRegisterFnOperationFunctionTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/CreateRegisterFnOperationFunctionTest.java
index fe29bc3..cb8d55a 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/CreateRegisterFnOperationFunctionTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/CreateRegisterFnOperationFunctionTest.java
@@ -40,13 +40,13 @@ import com.google.common.graph.NetworkBuilder;
 import java.util.List;
 import java.util.function.BiFunction;
 import java.util.function.Function;
-import org.apache.beam.runners.dataflow.worker.fn.IdGenerator;
 import org.apache.beam.runners.dataflow.worker.graph.Edges.DefaultEdge;
 import org.apache.beam.runners.dataflow.worker.graph.Edges.Edge;
 import org.apache.beam.runners.dataflow.worker.graph.Edges.HappensBeforeEdge;
 import org.apache.beam.runners.dataflow.worker.graph.Nodes.InstructionOutputNode;
 import org.apache.beam.runners.dataflow.worker.graph.Nodes.Node;
 import org.apache.beam.runners.dataflow.worker.graph.Nodes.ParallelInstructionNode;
+import org.apache.beam.sdk.fn.IdGenerators;
 import org.hamcrest.Matchers;
 import org.junit.Before;
 import org.junit.Test;
@@ -70,7 +70,7 @@ public class CreateRegisterFnOperationFunctionTest {
     MockitoAnnotations.initMocks(this);
     createRegisterFnOperation =
         new CreateRegisterFnOperationFunction(
-            IdGenerator::generate, portSupplier, registerFnOperationFunction);
+            IdGenerators.decrementingLongs(), portSupplier, registerFnOperationFunction);
   }
 
   @Test


Mime
View raw message