beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] [beam] boyuanzz commented on a change in pull request #10897: [BEAM-2939] Java UnboundedSource SDF wrapper
Date Thu, 27 Feb 2020 00:27:34 GMT
boyuanzz commented on a change in pull request #10897: [BEAM-2939] Java UnboundedSource SDF wrapper
URL: https://github.com/apache/beam/pull/10897#discussion_r384846699
 
 

 ##########
 File path: sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
 ##########
 @@ -375,4 +422,361 @@ public void checkDone() throws IllegalStateException {
       }
     }
   }
+
+  /**
+   * A splittable {@link DoFn} which executes an {@link UnboundedSource}.
+   *
+   * <p>We model the element as the original source and the restriction as a pair of the sub-source
+   * and its {@link CheckpointMark}. This allows us to split the sub-source over and over as long as
+   * the checkpoint mark is {@code null} or the {@link NoopCheckpointMark} since it does not
+   * maintain any state.
+   */
+  // TODO: Support reporting the watermark, currently the watermark never advances.
+  @UnboundedPerElement
+  static class UnboundedSourceAsSDFWrapperFn<OutputT, CheckpointT extends CheckpointMark>
+      extends DoFn<UnboundedSource<OutputT, CheckpointT>, ValueWithRecordId<OutputT>>
{
+
+    private static final int DEFAULT_DESIRED_NUM_SPLITS = 20;
+    private static final int DEFAULT_BUNDLE_FINALIZATION_LIMIT_MINS = 10;
+    private final Coder<CheckpointT> restrictionCoder;
+
+    private UnboundedSourceAsSDFWrapperFn(Coder<CheckpointT> restrictionCoder) {
+      this.restrictionCoder = restrictionCoder;
+    }
+
+    @GetInitialRestriction
+    public KV<UnboundedSource<OutputT, CheckpointT>, CheckpointT> initialRestriction(
+        @Element UnboundedSource<OutputT, CheckpointT> element) {
+      return KV.of(element, null);
+    }
+
+    @GetSize
+    public double getSize(
+        @Restriction KV<UnboundedSource<OutputT, CheckpointT>, CheckpointT> restriction,
+        PipelineOptions pipelineOptions)
+        throws Exception {
+      if (restriction.getKey() instanceof EmptyUnboundedSource) {
+        return 1;
+      }
+
+      UnboundedReader<OutputT> reader =
+          restriction.getKey().createReader(pipelineOptions, restriction.getValue());
+      long size = reader.getSplitBacklogBytes();
+      if (size != UnboundedReader.BACKLOG_UNKNOWN) {
+        return size;
+      }
+      // TODO: Support "global" backlog reporting
+      // size = reader.getTotalBacklogBytes();
+      // if (size != UnboundedReader.BACKLOG_UNKNOWN) {
+      //   return size;
+      // }
+      return 1;
+    }
+
+    @SplitRestriction
+    public void splitRestriction(
+        @Restriction KV<UnboundedSource<OutputT, CheckpointT>, CheckpointT> restriction,
+        OutputReceiver<KV<UnboundedSource<OutputT, CheckpointT>, CheckpointT>>
receiver,
+        PipelineOptions pipelineOptions)
+        throws Exception {
+      // The empty unbounded source is trivially done and hence we don't need to output any splits
+      // for it.
+      if (restriction.getKey() instanceof EmptyUnboundedSource) {
+        return;
+      }
+
+      // The UnboundedSource API does not support splitting after a meaningful checkpoint mark has
+      // been created.
+      if (restriction.getValue() != null
+          && !(restriction.getValue()
+              instanceof UnboundedSource.CheckpointMark.NoopCheckpointMark)) {
+        receiver.output(restriction);
+      }
+
+      try {
+        for (UnboundedSource<OutputT, CheckpointT> split :
+            restriction.getKey().split(DEFAULT_DESIRED_NUM_SPLITS, pipelineOptions)) {
+          receiver.output(KV.of(split, null));
+        }
+      } catch (Exception e) {
+        receiver.output(restriction);
+      }
+    }
+
+    @NewTracker
+    public RestrictionTracker<
+            KV<UnboundedSource<OutputT, CheckpointT>, CheckpointT>, UnboundedSourceValue<OutputT>[]>
+        restrictionTracker(
+            @Restriction KV<UnboundedSource<OutputT, CheckpointT>, CheckpointT>
restriction,
+            PipelineOptions pipelineOptions) {
+      return new UnboundedSourceAsSDFRestrictionTracker(restriction, pipelineOptions);
+    }
+
+    @ProcessElement
+    public ProcessContinuation processElement(
+        RestrictionTracker<
+                KV<UnboundedSource<OutputT, CheckpointT>, CheckpointT>, UnboundedSourceValue[]>
+            tracker,
+        OutputReceiver<ValueWithRecordId<OutputT>> receiver,
+        BundleFinalizer bundleFinalizer)
+        throws IOException {
+      UnboundedSourceValue<OutputT>[] out = new UnboundedSourceValue[1];
+      while (tracker.tryClaim(out)) {
+        receiver.outputWithTimestamp(
+            new ValueWithRecordId<>(out[0].getValue(), out[0].getId()), out[0].getTimestamp());
+      }
+
+      // Add the checkpoint mark to be finalized if the checkpoint mark isn't trivial.
+      KV<UnboundedSource<OutputT, CheckpointT>, CheckpointT> currentRestriction
=
+          tracker.currentRestriction();
+      if (currentRestriction.getValue() != null
+          && !(tracker.currentRestriction().getValue() instanceof NoopCheckpointMark))
{
+        bundleFinalizer.afterBundleCommit(
+            Instant.now().plus(Duration.standardMinutes(DEFAULT_BUNDLE_FINALIZATION_LIMIT_MINS)),
+            currentRestriction.getValue()::finalizeCheckpoint);
+      }
+
+      // If we have been split/checkpoint by a runner, the tracker will have been updated to the
+      // empty source and we will return stop. Otherwise the unbounded source has only temporarily
+      // run out of work.
+      if (tracker.currentRestriction().getKey() instanceof EmptyUnboundedSource) {
+        return ProcessContinuation.stop();
+      }
+      return ProcessContinuation.resume();
+    }
+
+    @GetRestrictionCoder
+    public Coder<KV<UnboundedSource<OutputT, CheckpointT>, CheckpointT>>
restrictionCoder() {
+      return KvCoder.of(
+          SerializableCoder.of(new TypeDescriptor<UnboundedSource<OutputT, CheckpointT>>()
{}),
+          NullableCoder.of(restrictionCoder));
+    }
+
+    /**
+     * A named tuple representing all the values we need to pass between the {@link UnboundedReader}
+     * and the {@link org.apache.beam.sdk.transforms.DoFn.ProcessElement @ProcessElement} method of
+     * the splittable DoFn.
+     */
+    @AutoValue
+    abstract static class UnboundedSourceValue<T> {
+      public static <T> UnboundedSourceValue<T> create(
+          byte[] id, T value, Instant timestamp, Instant watermark) {
+        return new AutoValue_Read_UnboundedSourceAsSDFWrapperFn_UnboundedSourceValue<T>(
+            id, value, timestamp, watermark);
+      }
+
+      @SuppressWarnings("mutable")
+      public abstract byte[] getId();
+
+      public abstract T getValue();
+
+      public abstract Instant getTimestamp();
+
+      public abstract Instant getWatermark();
 
 Review comment:
   I'm also curious why each output has its own `watermark`. Will the `watermark` be different from the `timestamp` for a given record?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message