beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From k...@apache.org
Subject [22/50] [abbrv] beam git commit: ReduceFnRunner: Do not manage EOW hold or timer, set GC hold and timer always
Date Fri, 17 Nov 2017 20:31:13 GMT
ReduceFnRunner: Do not manage EOW hold or timer, set GC hold and timer always


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9c994ca4
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9c994ca4
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9c994ca4

Branch: refs/heads/tez-runner
Commit: 9c994ca44a161c13120bd44f1415ffec38c4b244
Parents: 296cba0
Author: Kenneth Knowles <kenn@apache.org>
Authored: Mon Oct 16 16:46:05 2017 -0700
Committer: Kenneth Knowles <kenn@apache.org>
Committed: Mon Nov 13 15:03:21 2017 -0800

----------------------------------------------------------------------
 .../beam/runners/core/ReduceFnRunner.java       |  72 ++----
 .../apache/beam/runners/core/WatermarkHold.java | 221 +++----------------
 .../beam/runners/core/ReduceFnRunnerTest.java   | 191 ++++++++++++----
 3 files changed, 197 insertions(+), 287 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/9c994ca4/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
index 634a2d1..bc1d0db 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
@@ -45,7 +45,6 @@ import org.apache.beam.sdk.metrics.Counter;
 import org.apache.beam.sdk.metrics.Metrics;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.state.TimeDomain;
-import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing;
@@ -596,28 +595,12 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
           window, value.getValue(), value.getTimestamp(), StateStyle.RENAMED);
 
       nonEmptyPanes.recordContent(renamedContext.state());
-
-      // Make sure we've scheduled the end-of-window or garbage collection timer for this window.
-      Instant timer = scheduleEndOfWindowOrGarbageCollectionTimer(directContext);
+      scheduleGarbageCollectionTimer(directContext);
 
       // Hold back progress of the output watermark until we have processed the pane this
-      // element will be included within. If the element is too late for that, place a hold at
-      // the end-of-window or garbage collection time to allow empty panes to contribute elements
-      // which won't be dropped due to lateness by a following computation (assuming the following
-      // computation uses the same allowed lateness value...)
-      @Nullable Instant hold = watermarkHold.addHolds(renamedContext);
-
-      if (hold != null) {
-        // Assert that holds have a proximate timer.
-        boolean holdInWindow = !hold.isAfter(window.maxTimestamp());
-        boolean timerInWindow = !timer.isAfter(window.maxTimestamp());
-        checkState(
-            holdInWindow == timerInWindow,
-            "set a hold at %s, a timer at %s, which disagree as to whether they are in window %s",
-            hold,
-            timer,
-            directContext.window());
-      }
+      // element will be included within. If the element is later than the output watermark, the
+      // hold will be at GC time.
+      watermarkHold.addHolds(renamedContext);
 
       // Execute the reduceFn, which will buffer the value as appropriate
       reduceFn.processValue(renamedContext);
@@ -1070,48 +1053,33 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
   }
 
   /**
-   * Make sure we'll eventually have a timer fire which will tell us to garbage collect
-   * the window state. For efficiency we may need to do this in two steps rather
-   * than one. Return the time at which the timer will fire.
+   * Schedule a timer to garbage collect the window.
+   *
+   * <p>The timer:
    *
    * <ul>
-   * <li>If allowedLateness is zero then we'll garbage collect at the end of the window.
-   * For simplicity we'll set our own timer for this situation even though an
-   * {@link AfterWatermark} trigger may have also set an end-of-window timer.
-   * ({@code setTimer} is idempotent.)
-   * <li>If allowedLateness is non-zero then we could just always set a timer for the garbage
-   * collection time. However if the windows are large (eg hourly) and the allowedLateness is small
-   * (eg seconds) then we'll end up with nearly twice the number of timers in-flight. So we
-   * instead set an end-of-window timer and then roll that forward to a garbage collection timer
-   * when it fires. We use the input watermark to distinguish those cases.
+   *   <li>...must be fired strictly after the expiration of the window.
+   *   <li>...should be as close to the expiration as possible, to have a timely output of
+   *       remaining buffered data, and GC.
    * </ul>
    */
-  private Instant scheduleEndOfWindowOrGarbageCollectionTimer(
-      ReduceFn<?, ?, ?, W>.Context directContext) {
+  private void scheduleGarbageCollectionTimer(ReduceFn<?, ?, ?, W>.Context directContext) {
     Instant inputWM = timerInternals.currentInputWatermarkTime();
-    Instant endOfWindow = directContext.window().maxTimestamp();
-    String which;
-    Instant timer;
-    if (endOfWindow.isBefore(inputWM)) {
-      timer = LateDataUtils.garbageCollectionTime(directContext.window(), windowingStrategy);
-      which = "garbage collection";
-    } else {
-      timer = endOfWindow;
-      which = "end-of-window";
-    }
+    Instant gcTime =
+        LateDataUtils.garbageCollectionTime(directContext.window(), windowingStrategy);
     WindowTracing.trace(
-        "ReduceFnRunner.scheduleEndOfWindowOrGarbageCollectionTimer: Scheduling {} timer at {} for "
+        "ReduceFnRunner.scheduleGarbageCollectionTimer: Scheduling at {} for "
         + "key:{}; window:{} where inputWatermark:{}; outputWatermark:{}",
-        which,
-        timer,
+        gcTime,
         key,
         directContext.window(),
         inputWM,
         timerInternals.currentOutputWatermarkTime());
-    checkState(!timer.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE),
-                             "Timer %s is beyond end-of-time", timer);
-    directContext.timers().setTimer(timer, TimeDomain.EVENT_TIME);
-    return timer;
+    checkState(
+        !gcTime.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE),
+        "Timer %s is beyond end-of-time",
+        gcTime);
+    directContext.timers().setTimer(gcTime, TimeDomain.EVENT_TIME);
   }
 
   private void cancelEndOfWindowAndGarbageCollectionTimers(

http://git-wip-us.apache.org/repos/asf/beam/blob/9c994ca4/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java
index 8859bbb..9890826 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java
@@ -25,9 +25,9 @@ import javax.annotation.Nullable;
 import org.apache.beam.sdk.state.ReadableState;
 import org.apache.beam.sdk.state.WatermarkHoldState;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing;
 import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
 import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.util.WindowTracing;
 import org.apache.beam.sdk.values.WindowingStrategy;
 import org.joda.time.Duration;
@@ -84,112 +84,22 @@ class WatermarkHold<W extends BoundedWindow> implements Serializable {
 
   /**
    * Add a hold to prevent the output watermark progressing beyond the (possibly adjusted) timestamp
-   * of the element in {@code context}. We allow the actual hold time to be shifted later by the
-   * {@link TimestampCombiner}, but no further than the end of the window. The hold will
-   * remain until cleared by {@link #extractAndRelease}. Return the timestamp at which the hold
-   * was placed, or {@literal null} if no hold was placed.
+   * of the element in {@code context}.
    *
-   * <p>In the following we'll write {@code E} to represent an element's timestamp after passing
-   * through the window strategy's output time function, {@code IWM} for the local input watermark,
-   * {@code OWM} for the local output watermark, and {@code GCWM} for the garbage collection
-   * watermark (which is at {@code IWM - getAllowedLateness}). Time progresses from left to right,
-   * and we write {@code [ ... ]} to denote a bounded window with implied lower bound.
+   * <p>The target time for the aggregated output is shifted by the {@link WindowFn} and combined
+   * with a {@link TimestampCombiner} to determine where the output watermark is held.
    *
-   * <p>Note that the GCWM will be the same as the IWM if {@code getAllowedLateness}
-   * is {@code ZERO}.
+   * <p>If the target time would be late, then we do not set this hold, but instead add the hold
+   * to allow a final output at GC time.
    *
-   * <p>Here are the cases we need to handle. They are conceptually considered in the
-   * sequence written since if getAllowedLateness is ZERO the GCWM is the same as the IWM.
-   * <ol>
-   * <li>(Normal)
-   * <pre>
-   *          |
-   *      [   | E        ]
-   *          |
-   *         IWM
-   * </pre>
-   * This is, hopefully, the common and happy case. The element is locally on-time and can
-   * definitely make it to an {@code ON_TIME} pane which we can still set an end-of-window timer
-   * for. We place an element hold at E, which may contribute to the {@code ON_TIME} pane's
-   * timestamp (depending on the output time function). Thus the OWM will not proceed past E
-   * until the next pane fires.
-   *
-   * <li>(Discard - no target window)
-   * <pre>
-   *                       |                            |
-   *      [     E        ] |                            |
-   *                       |                            |
-   *                     GCWM  <-getAllowedLateness->  IWM
-   * </pre>
-   * The element is very locally late. The window has been garbage collected, thus there
-   * is no target pane E could be assigned to. We discard E.
-   *
-   * <li>(Unobservably late)
-   * <pre>
-   *          |    |
-   *      [   | E  |     ]
-   *          |    |
-   *         OWM  IWM
-   * </pre>
-   * The element is locally late, however we can still treat this case as for 'Normal' above
-   * since the IWM has not yet passed the end of the window and the element is ahead of the
-   * OWM. In effect, we get to 'launder' the locally late element and consider it as locally
-   * on-time because no downstream computation can observe the difference.
-   *
-   * <li>(Maybe late 1)
-   * <pre>
-   *          |            |
-   *      [   | E        ] |
-   *          |            |
-   *         OWM          IWM
-   * </pre>
-   * The end-of-window timer may have already fired for this window, and thus an {@code ON_TIME}
-   * pane may have already been emitted. However, if timer firings have been delayed then it
-   * is possible the {@code ON_TIME} pane has not yet been emitted. We can't place an element
-   * hold since we can't be sure if it will be cleared promptly. Thus this element *may* find
-   * its way into an {@code ON_TIME} pane, but if so it will *not* contribute to that pane's
-   * timestamp. We may however set a garbage collection hold if required.
-   *
-   * <li>(Maybe late 2)
-   * <pre>
-   *               |   |
-   *      [     E  |   | ]
-   *               |   |
-   *              OWM IWM
-   * </pre>
-   * The end-of-window timer has not yet fired, so this element may still appear in an
-   * {@code ON_TIME} pane. However the element is too late to contribute to the output
-   * watermark hold, and thus won't contribute to the pane's timestamp. We can still place an
-   * end-of-window hold.
-   *
-   * <li>(Maybe late 3)
-   * <pre>
-   *               |       |
-   *      [     E  |     ] |
-   *               |       |
-   *              OWM     IWM
-   * </pre>
-   * As for the (Maybe late 2) case, however we don't even know if the end-of-window timer
-   * has already fired, or it is about to fire. We can place only the garbage collection hold,
-   * if required.
-   *
-   * <li>(Definitely late)
-   * <pre>
-   *                       |   |
-   *      [     E        ] |   |
-   *                       |   |
-   *                      OWM IWM
-   * </pre>
-   * The element is definitely too late to make an {@code ON_TIME} pane. We are too late to
-   * place an end-of-window hold. We can still place a garbage collection hold if required.
-   *
-   * </ol>
+   * <p>See https://s.apache.org/beam-lateness for the full design of how late data and watermarks
+   * interact.
    */
   @Nullable
   public Instant addHolds(ReduceFn<?, ?, ?, W>.ProcessValueContext context) {
     Instant hold = addElementHold(context);
     if (hold == null) {
-      hold = addEndOfWindowOrGarbageCollectionHolds(context, false/*paneIsEmpty*/);
+      hold = addGarbageCollectionHold(context, false /*paneIsEmpty*/);
     }
     return hold;
   }
@@ -268,94 +178,22 @@ class WatermarkHold<W extends BoundedWindow> implements Serializable {
   }
 
   /**
-   * Add an end-of-window hold or, if too late for that, a garbage collection hold (if required).
-   * Return the {@link Instant} at which hold was added, or {@literal null} if no hold was added.
-   */
-  @Nullable
-  private Instant addEndOfWindowOrGarbageCollectionHolds(
-      ReduceFn<?, ?, ?, W>.Context context, boolean paneIsEmpty) {
-    Instant hold = addEndOfWindowHold(context, paneIsEmpty);
-    if (hold == null) {
-      hold = addGarbageCollectionHold(context, paneIsEmpty);
-    }
-    return hold;
-  }
-
-  /**
-   * Attempt to add an 'end-of-window hold'. Return the {@link Instant} at which the hold was added
-   * (ie the end of window time), or {@literal null} if no end of window hold is possible and we
-   * should fallback to a garbage collection hold.
-   *
-   * <p>We only add the hold if we can be sure a timer will be set (by {@link ReduceFnRunner})
-   * to clear it. In other words, the input watermark cannot be ahead of the end of window time.
-   *
-   * <p>An end-of-window hold is added in two situations:
-   * <ol>
-   * <li>An incoming element came in behind the output watermark (so we are too late for placing
-   * the usual element hold), but it may still be possible to include the element in an
-   * {@link Timing#ON_TIME} pane. We place the end of window hold to ensure that pane will
-   * not be considered late by any downstream computation.
-   * <li>We guarantee an {@link Timing#ON_TIME} pane will be emitted for all windows which saw at
-   * least one element, even if that {@link Timing#ON_TIME} pane is empty. Thus when elements in
-   * a pane are processed due to a fired trigger we must set both an end of window timer and an end
-   * of window hold. Again, the hold ensures the {@link Timing#ON_TIME} pane will not be considered
-   * late by any downstream computation.
-   * </ol>
-   */
-  @Nullable
-  private Instant addEndOfWindowHold(ReduceFn<?, ?, ?, W>.Context context, boolean paneIsEmpty) {
-    Instant outputWM = timerInternals.currentOutputWatermarkTime();
-    Instant inputWM = timerInternals.currentInputWatermarkTime();
-    Instant eowHold = context.window().maxTimestamp();
-
-    if (eowHold.isBefore(inputWM)) {
-      WindowTracing.trace(
-          "WatermarkHold.addEndOfWindowHold: end-of-window hold at {} is too late for "
-              + "end-of-window timer for key:{}; window:{}; inputWatermark:{}; outputWatermark:{}",
-          eowHold, context.key(), context.window(), inputWM, outputWM);
-      return null;
-    }
-
-    checkState(outputWM == null || !eowHold.isBefore(outputWM),
-        "End-of-window hold %s cannot be before output watermark %s",
-        eowHold, outputWM);
-    checkState(!eowHold.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE),
-        "End-of-window hold %s is beyond end-of-time", eowHold);
-    // If paneIsEmpty then this hold is just for empty ON_TIME panes, so we want to keep
-    // the hold away from the combining function in elementHoldTag.
-    // However if !paneIsEmpty then it could make sense  to use the elementHoldTag here.
-    // Alas, onMerge is forced to add an end of window or garbage collection hold without
-    // knowing whether an element hold is already in place (stopping to check is too expensive).
-    // This it would end up adding an element hold at the end of the window which could
-    // upset the elementHoldTag combining function.
-    context.state().access(EXTRA_HOLD_TAG).add(eowHold);
-    WindowTracing.trace(
-        "WatermarkHold.addEndOfWindowHold: end-of-window hold at {} is on time for "
-            + "key:{}; window:{}; inputWatermark:{}; outputWatermark:{}",
-        eowHold, context.key(), context.window(), inputWM, outputWM);
-    return eowHold;
-  }
-
-  /**
    * Attempt to add a 'garbage collection hold' if it is required. Return the {@link Instant} at
-   * which the hold was added (ie the end of window time plus allowed lateness),
-   * or {@literal null} if no hold was added.
-   *
-   * <p>We only add the hold if it is distinct from what would be added by
-   * {@link #addEndOfWindowHold}. In other words, {@link WindowingStrategy#getAllowedLateness}
-   * must be non-zero.
+   * which the hold was added (ie the end of window time plus allowed lateness), or {@literal null}
+   * if no hold was added.
    *
    * <p>A garbage collection hold is added in two situations:
+   *
    * <ol>
-   * <li>An incoming element came in behind the output watermark, and was too late for placing
-   * the usual element hold or an end of window hold. Place the garbage collection hold so that
-   * we can guarantee when the pane is finally triggered its output will not be dropped due to
-   * excessive lateness by any downstream computation.
-   * <li>The {@link WindowingStrategy#getClosingBehavior()} is
-   * {@link ClosingBehavior#FIRE_ALWAYS}, and thus we guarantee a final pane will be emitted
-   * for all windows which saw at least one element. Again, the garbage collection hold guarantees
-   * that any empty final pane can be given a timestamp which will not be considered beyond
-   * allowed lateness by any downstream computation.
+   *   <li>An incoming element has a timestamp earlier than the output watermark, and was too late
+   *       for placing the usual element hold or an end of window hold. Place the garbage collection
+   *       hold so that we can guarantee when the pane is finally triggered its output will not be
+   *       dropped due to excessive lateness by any downstream computation.
+   *   <li>The {@link WindowingStrategy#getClosingBehavior()} is {@link
+   *       ClosingBehavior#FIRE_ALWAYS}, and thus we guarantee a final pane will be emitted for all
+   *       windows which saw at least one element. Again, the garbage collection hold guarantees
+   *       that any empty final pane can be given a timestamp which will not be considered beyond
+   *       allowed lateness by any downstream computation.
    * </ol>
    *
    * <p>We use {@code paneIsEmpty} to distinguish cases 1 and 2.
@@ -367,12 +205,11 @@ class WatermarkHold<W extends BoundedWindow> implements Serializable {
     Instant inputWM = timerInternals.currentInputWatermarkTime();
     Instant gcHold = LateDataUtils.garbageCollectionTime(context.window(), windowingStrategy);
 
-    if (!windowingStrategy.getAllowedLateness().isLongerThan(Duration.ZERO)) {
+    if (gcHold.isBefore(inputWM)) {
       WindowTracing.trace(
-          "WatermarkHold.addGarbageCollectionHold: garbage collection hold at {} is unnecessary "
-              + "since no allowed lateness for key:{}; window:{}; inputWatermark:{}; "
-              + "outputWatermark:{}",
-          gcHold, context.key(), context.window(), inputWM, outputWM);
+          "{}.addGarbageCollectionHold: gc hold would be before the input watermark "
+              + "for key:{}; window: {}; inputWatermark: {}; outputWatermark: {}",
+          getClass().getSimpleName(), context.key(), context.window(), inputWM, outputWM);
       return null;
     }
 
@@ -432,7 +269,7 @@ class WatermarkHold<W extends BoundedWindow> implements Serializable {
     // the hold depends on the min of the element timestamps.
     // At least one merged window must be non-empty for the merge to have been triggered.
     StateMerging.clear(context.state(), EXTRA_HOLD_TAG);
-    addEndOfWindowOrGarbageCollectionHolds(context, false /*paneIsEmpty*/);
+    addGarbageCollectionHold(context, false /*paneIsEmpty*/);
   }
 
   /**
@@ -497,7 +334,7 @@ class WatermarkHold<W extends BoundedWindow> implements Serializable {
           oldHold = extraHold;
         }
         if (oldHold == null || oldHold.isAfter(context.window().maxTimestamp())) {
-          // If no hold (eg because all elements came in behind the output watermark), or
+          // If no hold (eg because all elements came in before the output watermark), or
           // the hold was for garbage collection, take the end of window as the result.
           WindowTracing.debug(
               "WatermarkHold.extractAndRelease.read: clipping from {} to end of window "
@@ -514,9 +351,7 @@ class WatermarkHold<W extends BoundedWindow> implements Serializable {
 
         @Nullable Instant newHold = null;
         if (!isFinished) {
-          // Only need to leave behind an end-of-window or garbage collection hold
-          // if future elements will be processed.
-          newHold = addEndOfWindowOrGarbageCollectionHolds(context, true /*paneIsEmpty*/);
+          newHold = addGarbageCollectionHold(context, true /*paneIsEmpty*/);
         }
 
         return new OldAndNewHolds(oldHold, newHold);

http://git-wip-us.apache.org/repos/asf/beam/blob/9c994ca4/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
index 2341502..982e934 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
@@ -25,6 +25,7 @@ import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.emptyIterable;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.nullValue;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThat;
@@ -84,7 +85,6 @@ import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
-import org.mockito.Matchers;
 import org.mockito.Mock;
 import org.mockito.Mockito;
 import org.mockito.MockitoAnnotations;
@@ -670,9 +670,9 @@ public class ReduceFnRunnerTest {
     TestOptions options = PipelineOptionsFactory.as(TestOptions.class);
     options.setValue(expectedValue);
 
-    when(mockSideInputReader.contains(Matchers.<PCollectionView<Integer>>any())).thenReturn(true);
-    when(mockSideInputReader.get(
-            Matchers.<PCollectionView<Integer>>any(), any(BoundedWindow.class)))
+    when(mockSideInputReader.contains(org.mockito.Matchers.any(PCollectionView.class)))
+        .thenReturn(true);
+    when(mockSideInputReader.get(any(PCollectionView.class), any(BoundedWindow.class)))
         .then(
             new Answer<Integer>() {
               @Override
@@ -721,9 +721,13 @@ public class ReduceFnRunnerTest {
     MetricsContainerImpl container = new MetricsContainerImpl("any");
     MetricsEnvironment.setCurrentContainer(container);
     // Test handling of late data. Specifically, ensure the watermark hold is correct.
+    Duration allowedLateness = Duration.millis(10);
     ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester =
-        ReduceFnTester.nonCombining(FixedWindows.of(Duration.millis(10)), mockTriggerStateMachine,
-            AccumulationMode.ACCUMULATING_FIRED_PANES, Duration.millis(10),
+        ReduceFnTester.nonCombining(
+            FixedWindows.of(Duration.millis(10)),
+            mockTriggerStateMachine,
+            AccumulationMode.ACCUMULATING_FIRED_PANES,
+            allowedLateness,
             ClosingBehavior.FIRE_IF_NON_EMPTY);
 
     // Input watermark -> null
@@ -731,22 +735,27 @@ public class ReduceFnRunnerTest {
     assertEquals(null, tester.getOutputWatermark());
 
     // All on time data, verify watermark hold.
+    IntervalWindow expectedWindow = new IntervalWindow(new Instant(0), new Instant(10));
     injectElement(tester, 1);
     injectElement(tester, 3);
     assertEquals(new Instant(1), tester.getWatermarkHold());
     when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true);
     injectElement(tester, 2);
     List<WindowedValue<Iterable<Integer>>> output = tester.extractOutput();
-    assertThat(output, contains(
-        isSingleWindowedValue(containsInAnyOrder(1, 2, 3),
-            1, // timestamp
-            0, // window start
-            10))); // window end
+    assertThat(
+        output,
+        contains(
+            isSingleWindowedValue(
+                containsInAnyOrder(1, 2, 3),
+                equalTo(new Instant(1)),
+                equalTo((BoundedWindow) expectedWindow))));
     assertThat(output.get(0).getPane(),
         equalTo(PaneInfo.createPane(true, false, Timing.EARLY, 0, -1)));
 
-    // Holding for the end-of-window transition.
-    assertEquals(new Instant(9), tester.getWatermarkHold());
+    // There is no end-of-window hold, but the timer set by the trigger holds the watermark
+    assertThat(
+        tester.getWatermarkHold(), nullValue());
+
     // Nothing dropped.
     long droppedElements = container.getCounter(
         MetricName.named(ReduceFnRunner.class,
@@ -763,9 +772,16 @@ public class ReduceFnRunnerTest {
     tester.advanceInputWatermark(new Instant(4));
     injectElement(tester, 2);
     injectElement(tester, 3);
-    assertEquals(new Instant(9), tester.getWatermarkHold());
+
+    // Late data has arrived behind the _output_ watermark. The ReduceFnRunner sets a GC hold
+    // since this data is not permitted to hold up the output watermark.
+    assertThat(
+        tester.getWatermarkHold(), equalTo(expectedWindow.maxTimestamp().plus(allowedLateness)));
+
+    // Now data just ahead of the output watermark arrives and sets an earlier "element" hold
     injectElement(tester, 5);
     assertEquals(new Instant(5), tester.getWatermarkHold());
+
     when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true);
     injectElement(tester, 4);
     output = tester.extractOutput();
@@ -780,17 +796,29 @@ public class ReduceFnRunnerTest {
     assertThat(output.get(0).getPane(),
         equalTo(PaneInfo.createPane(false, false, Timing.EARLY, 1, -1)));
 
-    // All late -- output at end of window timestamp.
+    // Since the element hold is cleared, there is no hold remaining
+    assertThat(tester.getWatermarkHold(), nullValue());
+
+    // All behind the output watermark -- hold is at GC time (if we imagine the
+    // trigger sets a timer for ON_TIME firing, that is actually when they'll be emitted)
     when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(false);
     tester.advanceInputWatermark(new Instant(8));
     injectElement(tester, 6);
     injectElement(tester, 5);
-    assertEquals(new Instant(9), tester.getWatermarkHold());
+    assertThat(
+        tester.getWatermarkHold(), equalTo(expectedWindow.maxTimestamp().plus(allowedLateness)));
+
     injectElement(tester, 4);
 
     // Fire the ON_TIME pane
     when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true);
-    tester.advanceInputWatermark(new Instant(10));
+
+    // To get an ON_TIME pane, we need the output watermark to be held back a little; this would
+    // be done by way of the timers set by the trigger, which are mocked here
+    tester.setAutoAdvanceOutputWatermark(false);
+
+    tester.advanceInputWatermark(expectedWindow.maxTimestamp().plus(1));
+    tester.fireTimer(expectedWindow, expectedWindow.maxTimestamp(), TimeDomain.EVENT_TIME);
 
     // Output time is end of the window, because all the new data was late, but the pane
     // is the ON_TIME pane.
@@ -806,6 +834,8 @@ public class ReduceFnRunnerTest {
     assertThat(output.get(0).getPane(),
         equalTo(PaneInfo.createPane(false, false, Timing.ON_TIME, 2, 0)));
 
+    tester.setAutoAdvanceOutputWatermark(true);
+
     // This is "pending" at the time the watermark makes it way-late.
     // Because we're about to expire the window, we output it.
     when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(false);
@@ -845,26 +875,32 @@ public class ReduceFnRunnerTest {
     tester.assertHasOnlyGlobalAndFinishedSetsFor();
   }
 
+  /** Make sure that if data comes in too late to make it on time, the hold is the GC time. */
   @Test
   public void dontSetHoldIfTooLateForEndOfWindowTimer() throws Exception {
-    // Make sure holds are only set if they are accompanied by an end-of-window timer.
     ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester =
-        ReduceFnTester.nonCombining(FixedWindows.of(Duration.millis(10)), mockTriggerStateMachine,
-            AccumulationMode.ACCUMULATING_FIRED_PANES, Duration.millis(10),
+        ReduceFnTester.nonCombining(
+            FixedWindows.of(Duration.millis(10)),
+            mockTriggerStateMachine,
+            AccumulationMode.ACCUMULATING_FIRED_PANES,
+            Duration.millis(10),
             ClosingBehavior.FIRE_ALWAYS);
     tester.setAutoAdvanceOutputWatermark(false);
 
-    // Case: Unobservably late
+    // Case: Unobservably "late" relative to input watermark, but on time for output watermark
     tester.advanceInputWatermark(new Instant(15));
     tester.advanceOutputWatermark(new Instant(11));
+
+    IntervalWindow expectedWindow = new IntervalWindow(new Instant(10), new Instant(20));
     injectElement(tester, 14);
     // Hold was applied, waiting for end-of-window timer.
     assertEquals(new Instant(14), tester.getWatermarkHold());
-    assertEquals(new Instant(19), tester.getNextTimer(TimeDomain.EVENT_TIME));
 
-    // Trigger the end-of-window timer.
+    // Trigger the end-of-window timer, fire a timer as though the mock trigger set it
     when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true);
     tester.advanceInputWatermark(new Instant(20));
+    tester.fireTimer(expectedWindow, expectedWindow.maxTimestamp(), TimeDomain.EVENT_TIME);
+
     when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(false);
     // Hold has been replaced with garbage collection hold. Waiting for garbage collection.
     assertEquals(new Instant(29), tester.getWatermarkHold());
@@ -887,10 +923,15 @@ public class ReduceFnRunnerTest {
   @Test
   public void testPaneInfoAllStates() throws Exception {
     ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester =
-        ReduceFnTester.nonCombining(FixedWindows.of(Duration.millis(10)), mockTriggerStateMachine,
-            AccumulationMode.DISCARDING_FIRED_PANES, Duration.millis(100),
+        ReduceFnTester.nonCombining(
+            FixedWindows.of(Duration.millis(10)),
+            mockTriggerStateMachine,
+            AccumulationMode.DISCARDING_FIRED_PANES,
+            Duration.millis(100),
             ClosingBehavior.FIRE_IF_NON_EMPTY);
 
+    IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(10));
+
     tester.advanceInputWatermark(new Instant(0));
     when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true);
     injectElement(tester, 1);
@@ -903,7 +944,9 @@ public class ReduceFnRunnerTest {
         WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(false, false, Timing.EARLY, 1, -1))));
 
     when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(false);
+    tester.setAutoAdvanceOutputWatermark(false);
     tester.advanceInputWatermark(new Instant(15));
+
     when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true);
     injectElement(tester, 3);
     assertThat(tester.extractOutput(), contains(
@@ -911,6 +954,7 @@ public class ReduceFnRunnerTest {
             PaneInfo.createPane(false, false, Timing.ON_TIME, 2, 0))));
 
     when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true);
+    tester.setAutoAdvanceOutputWatermark(true);
     injectElement(tester, 4);
     assertThat(tester.extractOutput(), contains(
         WindowMatchers.valueWithPaneInfo(
@@ -1032,6 +1076,54 @@ public class ReduceFnRunnerTest {
         WindowMatchers.isSingleWindowedValue(containsInAnyOrder(1, 2), 9, 0, 10)));
   }
 
+  /**
+   * If the trigger does not care about the watermark, the ReduceFnRunner should still emit an
+   * element for the ON_TIME pane.
+   */
+  @Test
+  public void testNoWatermarkTriggerNoHold() throws Exception {
+    Duration allowedLateness = Duration.standardDays(1);
+    ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester =
+        ReduceFnTester.nonCombining(
+            WindowingStrategy.of(FixedWindows.of(Duration.millis(10)))
+                .withTrigger(
+                    Repeatedly.forever(
+                        AfterProcessingTime.pastFirstElementInPane()
+                            .plusDelayOf(Duration.standardSeconds(5))))
+                .withAllowedLateness(allowedLateness));
+
+    // First, an element comes in on time in [0, 10) but ReduceFnRunner should
+    // not set a hold or timer for 9. That is the trigger's job.
+    IntervalWindow expectedWindow = new IntervalWindow(new Instant(0), new Instant(10));
+    tester.advanceInputWatermark(new Instant(0));
+    tester.advanceProcessingTime(new Instant(0));
+
+    tester.injectElements(TimestampedValue.of(1, new Instant(1)));
+
+    // Since some data arrived, the element hold will be the end of the window.
+    assertThat(tester.getWatermarkHold(), equalTo(expectedWindow.maxTimestamp()));
+
+    tester.advanceProcessingTime(new Instant(6000));
+
+    // Sanity check; we aren't trying to verify output in this test
+    assertThat(tester.getOutputSize(), equalTo(1));
+
+    // Since we did not request empty final panes, no hold
+    assertThat(tester.getWatermarkHold(), nullValue());
+
+    // So when the input watermark advanced, the output advances with it (automated by tester)
+    tester.advanceInputWatermark(
+        new Instant(expectedWindow.maxTimestamp().plus(Duration.standardHours(1))));
+
+    // Now late data arrives
+    tester.injectElements(TimestampedValue.of(3, new Instant(3)));
+
+    // The ReduceFnRunner should set a GC hold since the element was too late and its timestamp
+    // will be ignored for the purposes of the watermark hold
+    assertThat(
+        tester.getWatermarkHold(), equalTo(expectedWindow.maxTimestamp().plus(allowedLateness)));
+  }
+
   @Test
   public void testPaneInfoAllStatesAfterWatermarkAccumulating() throws Exception {
     ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester = ReduceFnTester.nonCombining(
@@ -1216,32 +1308,39 @@ public class ReduceFnRunnerTest {
    */
   @Test
   public void testMergingWithCloseTrigger() throws Exception {
+    Duration allowedLateness = Duration.millis(50);
     ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester =
-        ReduceFnTester.nonCombining(Sessions.withGapDuration(Duration.millis(10)),
+        ReduceFnTester.nonCombining(
+            Sessions.withGapDuration(Duration.millis(10)),
             mockTriggerStateMachine,
-                                    AccumulationMode.DISCARDING_FIRED_PANES, Duration.millis(50),
-                                    ClosingBehavior.FIRE_IF_NON_EMPTY);
+            AccumulationMode.DISCARDING_FIRED_PANES,
+            allowedLateness,
+            ClosingBehavior.FIRE_IF_NON_EMPTY);
 
     // Create a new merged session window.
-    tester.injectElements(TimestampedValue.of(1, new Instant(1)),
-                          TimestampedValue.of(2, new Instant(2)));
+    IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new Instant(12));
+    tester.injectElements(
+        TimestampedValue.of(1, new Instant(1)), TimestampedValue.of(2, new Instant(2)));
 
     // Force the trigger to be closed for the merged window.
     when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true);
     triggerShouldFinish(mockTriggerStateMachine);
+
+    // Fire an end-of-window timer as though the trigger set it
     tester.advanceInputWatermark(new Instant(13));
+    tester.fireTimer(mergedWindow, mergedWindow.maxTimestamp(), TimeDomain.EVENT_TIME);
 
     // Trigger is now closed.
-    assertTrue(tester.isMarkedFinished(new IntervalWindow(new Instant(1), new Instant(12))));
+    assertTrue(tester.isMarkedFinished(mergedWindow));
 
     when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(false);
 
     // Revisit the same session window.
-    tester.injectElements(TimestampedValue.of(1, new Instant(1)),
-                          TimestampedValue.of(2, new Instant(2)));
+    tester.injectElements(
+        TimestampedValue.of(1, new Instant(1)), TimestampedValue.of(2, new Instant(2)));
 
     // Trigger is still closed.
-    assertTrue(tester.isMarkedFinished(new IntervalWindow(new Instant(1), new Instant(12))));
+    assertTrue(tester.isMarkedFinished(mergedWindow));
   }
 
   /**
@@ -1250,11 +1349,16 @@ public class ReduceFnRunnerTest {
    */
   @Test
   public void testMergingWithReusedWindow() throws Exception {
+    Duration allowedLateness = Duration.millis(50);
     ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester =
-        ReduceFnTester.nonCombining(Sessions.withGapDuration(Duration.millis(10)),
+        ReduceFnTester.nonCombining(
+            Sessions.withGapDuration(Duration.millis(10)),
             mockTriggerStateMachine,
-                                    AccumulationMode.DISCARDING_FIRED_PANES, Duration.millis(50),
-                                    ClosingBehavior.FIRE_IF_NON_EMPTY);
+            AccumulationMode.DISCARDING_FIRED_PANES,
+            allowedLateness,
+            ClosingBehavior.FIRE_IF_NON_EMPTY);
+
+    IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new Instant(11));
 
     // One elements in one session window.
     tester.injectElements(TimestampedValue.of(1, new Instant(1))); // in [1, 11), gc at 21.
@@ -1263,6 +1367,7 @@ public class ReduceFnRunnerTest {
     when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true);
     triggerShouldFinish(mockTriggerStateMachine);
     tester.advanceInputWatermark(new Instant(15));
+    tester.fireTimer(mergedWindow, mergedWindow.maxTimestamp(), TimeDomain.EVENT_TIME);
 
     // Another element in the same session window.
     // Should be discarded with 'window closed'.
@@ -1276,11 +1381,13 @@ public class ReduceFnRunnerTest {
 
     List<WindowedValue<Iterable<Integer>>> output = tester.extractOutput();
     assertThat(output.size(), equalTo(1));
-    assertThat(output.get(0),
-               isSingleWindowedValue(containsInAnyOrder(1),
-                                     1, // timestamp
-                                     1, // window start
-                                     11)); // window end
+    assertThat(
+        output.get(0),
+        isSingleWindowedValue(
+            containsInAnyOrder(1),
+            equalTo(new Instant(1)), // timestamp
+            equalTo((BoundedWindow) mergedWindow)));
+
     assertThat(
         output.get(0).getPane(),
         equalTo(PaneInfo.createPane(true, true, Timing.ON_TIME, 0, 0)));


Mime
View raw message