beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From p..@apache.org
Subject [1/3] beam git commit: ReduceFnRunner.onTrigger: add short circuit for empty pane, and move inputWM and pane after the short circuit.
Date Thu, 29 Jun 2017 06:04:17 GMT
Repository: beam
Updated Branches:
  refs/heads/master b1ece0196 -> 997bf4025


ReduceFnRunner.onTrigger: add short circuit for empty pane, and move inputWM and pane after
the short circuit.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/2efb0d56
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/2efb0d56
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/2efb0d56

Branch: refs/heads/master
Commit: 2efb0d561fc62ba44bf71db6937a54708944f0f6
Parents: 38dd12d
Author: 波特 <haozhi.shz@alibaba-inc.com>
Authored: Fri May 26 17:46:55 2017 +0800
Committer: Pei He <pei@apache.org>
Committed: Thu Jun 29 14:01:54 2017 +0800

----------------------------------------------------------------------
 .../org/apache/beam/runners/core/ReduceFnRunner.java  | 14 +++++++++-----
 1 file changed, 9 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/2efb0d56/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
index a33bac1..ef33bef 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
@@ -953,11 +953,6 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
       ReduceFn<K, InputT, OutputT, W>.Context renamedContext,
       final boolean isFinished, boolean isEndOfWindow)
           throws Exception {
-    Instant inputWM = timerInternals.currentInputWatermarkTime();
-
-    // Calculate the pane info.
-    final PaneInfo pane = paneInfoTracker.getNextPaneInfo(directContext, isFinished).read();
-
     // Extract the window hold, and as a side effect clear it.
     final WatermarkHold.OldAndNewHolds pair =
         watermarkHold.extractAndRelease(renamedContext, isFinished).read();
@@ -966,7 +961,13 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
     @Nullable Instant newHold = pair.newHold;
 
     final boolean isEmpty = nonEmptyPanes.isEmpty(renamedContext.state()).read();
+    if (isEmpty
+        && windowingStrategy.getClosingBehavior() == ClosingBehavior.FIRE_IF_NON_EMPTY
+        && windowingStrategy.getOnTimeBehavior() == Window.OnTimeBehavior.FIRE_IF_NON_EMPTY) {
+      return newHold;
+    }
 
+    Instant inputWM = timerInternals.currentInputWatermarkTime();
     if (newHold != null) {
       // We can't be finished yet.
       checkState(
@@ -998,6 +999,9 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow>
{
       }
     }
 
+    // Calculate the pane info.
+    final PaneInfo pane = paneInfoTracker.getNextPaneInfo(directContext, isFinished).read();
+
     // Only emit a pane if it has data or empty panes are observable.
     if (needToEmit(isEmpty, isFinished, pane.getTiming())) {
       // Run reduceFn.onTrigger method.


Mime
View raw message