beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From lc...@apache.org
Subject [beam] branch master updated: [BEAM-12459] Ensure that we use the min element timestamp for Watch watermark if no explicit watermark is provided.
Date Fri, 11 Jun 2021 04:27:27 GMT
This is an automated email from the ASF dual-hosted git repository.

lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 2eb158d  [BEAM-12459] Ensure that we use the min element timestamp for Watch watermark if no explicit watermark is provided.
     new f4430e6  Merge pull request #14968 from lukecwik/beam12459
2eb158d is described below

commit 2eb158d55291372a82a9d4960a8046b1ab4f0a7b
Author: Luke Cwik <lcwik@google.com>
AuthorDate: Tue Jun 8 11:20:36 2021 -0700

    [BEAM-12459] Ensure that we use the min element timestamp for Watch
    watermark if no explicit watermark is provided.
---
 .../java/org/apache/beam/sdk/transforms/Watch.java | 22 +++++--
 .../org/apache/beam/sdk/transforms/WatchTest.java  | 74 ++++++++++++++++++++++
 2 files changed, 90 insertions(+), 6 deletions(-)

diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java
index 0729fff..0f96986 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java
@@ -840,7 +840,8 @@ public class Watch {
   }
 
   @UnboundedPerElement
-  private static class WatchGrowthFn<InputT, OutputT, KeyT, TerminationStateT>
+  @VisibleForTesting
+  protected static class WatchGrowthFn<InputT, OutputT, KeyT, TerminationStateT>
       extends DoFn<InputT, KV<InputT, List<TimestampedValue<OutputT>>>> {
     private final Watch.Growth<InputT, OutputT, KeyT> spec;
     private final Coder<OutputT> outputCoder;
@@ -848,7 +849,7 @@ public class Watch {
     private final Coder<KeyT> outputKeyCoder;
     private final Funnel<OutputT> coderFunnel;
 
-    private WatchGrowthFn(
+    WatchGrowthFn(
         Growth<InputT, OutputT, KeyT> spec,
         Coder<OutputT> outputCoder,
         SerializableFunction<OutputT, KeyT> outputKeyFn,
@@ -899,7 +900,6 @@ public class Watch {
                 priorPoll.getOutputs().size());
             c.output(KV.of(c.element(), priorPoll.getOutputs()));
           }
-          watermarkEstimator.setWatermark(priorPoll.getWatermark());
         }
         return stop();
       }
@@ -911,7 +911,8 @@ public class Watch {
 
       PollingGrowthState<TerminationStateT> pollingRestriction =
           (PollingGrowthState<TerminationStateT>) currentRestriction;
-      // Produce a poll result that only contains never seen before results.
+      // Produce a poll result that only contains never seen before results in timestamp
+      // sorted order.
       Growth.PollResult<OutputT> newResults =
           computeNeverSeenBeforeResults(pollingRestriction, res);
 
@@ -941,8 +942,13 @@ public class Watch {
         c.output(KV.of(c.element(), newResults.getOutputs()));
       }
 
+      Instant computedWatermark = null;
       if (newResults.getWatermark() != null) {
-        watermarkEstimator.setWatermark(newResults.getWatermark());
+        computedWatermark = newResults.getWatermark();
+      } else if (!newResults.getOutputs().isEmpty()) {
+        // computeNeverSeenBeforeResults returns the elements in timestamp sorted order so
+        // we can get the timestamp from the first element.
+        computedWatermark = newResults.getOutputs().get(0).getTimestamp();
       }
 
       Instant currentTime = Instant.now();
@@ -955,11 +961,15 @@ public class Watch {
         return stop();
       }
 
-      if (BoundedWindow.TIMESTAMP_MAX_VALUE.equals(newResults.getWatermark())) {
+      if (BoundedWindow.TIMESTAMP_MAX_VALUE.equals(computedWatermark)) {
         LOG.info("{} - will stop polling, reached max timestamp.", c.element());
         return stop();
       }
 
+      if (computedWatermark != null) {
+        watermarkEstimator.setWatermark(computedWatermark);
+      }
+
       LOG.info(
           "{} - will resume polling in {} ms.", c.element(), spec.getPollInterval().getMillis());
       return resume().withResumeDelay(spec.getPollInterval());
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WatchTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WatchTest.java
index fbc0151..81873be 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WatchTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WatchTest.java
@@ -31,6 +31,7 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
 
 import java.io.IOException;
 import java.io.Serializable;
@@ -48,6 +49,7 @@ import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.testing.UsesUnboundedSplittableParDo;
+import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation;
 import org.apache.beam.sdk.transforms.Watch.Growth;
 import org.apache.beam.sdk.transforms.Watch.Growth.PollFn;
 import org.apache.beam.sdk.transforms.Watch.Growth.PollResult;
@@ -55,6 +57,10 @@ import org.apache.beam.sdk.transforms.Watch.GrowthState;
 import org.apache.beam.sdk.transforms.Watch.GrowthTracker;
 import org.apache.beam.sdk.transforms.Watch.NonPollingGrowthState;
 import org.apache.beam.sdk.transforms.Watch.PollingGrowthState;
+import org.apache.beam.sdk.transforms.Watch.WatchGrowthFn;
+import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
@@ -539,6 +545,38 @@ public class WatchTest implements Serializable {
   }
 
   @Test
+  public void testPollingGrowthTrackerUsesElementTimestampIfNoWatermarkProvided() throws Exception {
+    Instant now = Instant.now();
+    Watch.Growth<String, String, String> growth =
+        Watch.growthOf(
+                new Watch.Growth.PollFn<String, String>() {
+
+                  @Override
+                  public PollResult<String> apply(String element, Context c) throws Exception {
+                    // We specifically test an unsorted list.
+                    return PollResult.incomplete(
+                        Arrays.asList(
+                            TimestampedValue.of("d", now.plus(standardSeconds(4))),
+                            TimestampedValue.of("c", now.plus(standardSeconds(3))),
+                            TimestampedValue.of("a", now.plus(standardSeconds(1))),
+                            TimestampedValue.of("b", now.plus(standardSeconds(2)))));
+                  }
+                })
+            .withPollInterval(standardSeconds(10));
+    WatchGrowthFn<String, String, String, Integer> growthFn =
+        new WatchGrowthFn(
+            growth, StringUtf8Coder.of(), SerializableFunctions.identity(), StringUtf8Coder.of());
+    GrowthTracker<String, Integer> tracker = newPollingGrowthTracker();
+    DoFn.ProcessContext context = mock(DoFn.ProcessContext.class);
+    ManualWatermarkEstimator<Instant> watermarkEstimator =
+        new WatermarkEstimators.Manual(BoundedWindow.TIMESTAMP_MIN_VALUE);
+    ProcessContinuation processContinuation =
+        growthFn.process(context, tracker, watermarkEstimator);
+    assertEquals(now.plus(standardSeconds(1)), watermarkEstimator.currentWatermark());
+    assertTrue(processContinuation.shouldResume());
+  }
+
+  @Test
   public void testPollingGrowthTrackerCheckpointNonEmpty() {
     Instant now = Instant.now();
     GrowthTracker<String, Integer> tracker = newPollingGrowthTracker();
@@ -612,6 +650,42 @@ public class WatchTest implements Serializable {
   }
 
   @Test
+  public void testNonPollingGrowthTrackerIgnoresWatermark() throws Exception {
+    Instant now = Instant.now();
+    PollResult<String> claim =
+        PollResult.incomplete(
+                Arrays.asList(
+                    TimestampedValue.of("d", now.plus(standardSeconds(4))),
+                    TimestampedValue.of("c", now.plus(standardSeconds(3))),
+                    TimestampedValue.of("a", now.plus(standardSeconds(1))),
+                    TimestampedValue.of("b", now.plus(standardSeconds(2)))))
+            .withWatermark(now.plus(standardSeconds(7)));
+
+    Watch.Growth<String, String, String> growth =
+        Watch.growthOf(
+                new Watch.Growth.PollFn<String, String>() {
+
+                  @Override
+                  public PollResult<String> apply(String element, Context c) throws Exception {
+                    fail("Never expected to be invoked for NonPollingGrowthState.");
+                    return null;
+                  }
+                })
+            .withPollInterval(standardSeconds(10));
+    GrowthTracker<String, Integer> tracker = newTracker(NonPollingGrowthState.of(claim));
+    WatchGrowthFn<String, String, String, Integer> growthFn =
+        new WatchGrowthFn(
+            growth, StringUtf8Coder.of(), SerializableFunctions.identity(), StringUtf8Coder.of());
+    DoFn.ProcessContext context = mock(DoFn.ProcessContext.class);
+    ManualWatermarkEstimator<Instant> watermarkEstimator =
+        new WatermarkEstimators.Manual(BoundedWindow.TIMESTAMP_MIN_VALUE);
+    ProcessContinuation processContinuation =
+        growthFn.process(context, tracker, watermarkEstimator);
+    assertEquals(BoundedWindow.TIMESTAMP_MIN_VALUE, watermarkEstimator.currentWatermark());
+    assertFalse(processContinuation.shouldResume());
+  }
+
+  @Test
   public void testNonPollingGrowthTrackerCheckpointNonEmpty() {
     Instant now = Instant.now();
     PollResult<String> claim =

Mime
View raw message