beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From k...@apache.org
Subject [1/2] incubator-beam git commit: Repeatedly#onFire should clear all finished bits
Date Fri, 01 Apr 2016 02:11:50 GMT
Repository: incubator-beam
Updated Branches:
  refs/heads/master e5bca60de -> 164676bc7


Repeatedly#onFire should clear all finished bits

Previously, Repeatedly#onFire only cleared the finished bits associated
with the root of the sub-tree, as demonstrated by the new unit tests.

This led to problems with AfterFirst#shouldFire, which checks whether
any of the sub-triggers have their finished bits set.

Now, Repeatedly#onFire calls #resetTree, which clears all the finished
bits in the entire sub-tree.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/2ad027f4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/2ad027f4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/2ad027f4

Branch: refs/heads/master
Commit: 2ad027f49adbcb80ffac87c3fffa81fd68c89e86
Parents: cab0c57
Author: Kenneth Knowles <klk@google.com>
Authored: Thu Mar 31 15:03:59 2016 -0700
Committer: bchambers <bchambers@google.com>
Committed: Thu Mar 31 15:45:13 2016 -0700

----------------------------------------------------------------------
 .../sdk/transforms/windowing/Repeatedly.java    |  4 +-
 .../transforms/windowing/RepeatedlyTest.java    | 83 ++++++++++++++++++++
 2 files changed, 85 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2ad027f4/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/Repeatedly.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/Repeatedly.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/Repeatedly.java
index 3416551..9be0259 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/Repeatedly.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/Repeatedly.java
@@ -90,8 +90,8 @@ public class Repeatedly<W extends BoundedWindow> extends Trigger<W> {
     getRepeated(context).invokeOnFire(context);
 
     if (context.trigger().isFinished(REPEATED)) {
-      context.trigger().setFinished(false, REPEATED);
-      getRepeated(context).invokeClear(context);
+      // Reset tree will recursively clear the finished bits, and invoke clear.
+      context.forTrigger(getRepeated(context)).trigger().resetTree();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2ad027f4/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/transforms/windowing/RepeatedlyTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/transforms/windowing/RepeatedlyTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/transforms/windowing/RepeatedlyTest.java
index ddfec1c..99907b2 100644
--- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/transforms/windowing/RepeatedlyTest.java
+++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/transforms/windowing/RepeatedlyTest.java
@@ -126,4 +126,87 @@ public class RepeatedlyTest {
     IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new Instant(15));
     assertTrue(tester.shouldFire(mergedWindow));
   }
+
+  @Test
+  public void testRepeatedlyAfterFirstElementCount() throws Exception {
+    SimpleTriggerTester<GlobalWindow> tester =
+        TriggerTester.forTrigger(
+            Repeatedly.forever(
+                AfterFirst.<GlobalWindow>of(
+                    AfterProcessingTime.<GlobalWindow>pastFirstElementInPane()
+                        .plusDelayOf(Duration.standardMinutes(15)),
+                    AfterPane.<GlobalWindow>elementCountAtLeast(5))),
+            new GlobalWindows());
+
+    GlobalWindow window = GlobalWindow.INSTANCE;
+
+    tester.injectElements(1);
+    assertFalse(tester.shouldFire(window));
+
+    tester.injectElements(2, 3, 4, 5);
+    assertTrue(tester.shouldFire(window));
+    tester.fireIfShouldFire(window);
+    assertFalse(tester.shouldFire(window));
+  }
+
+  @Test
+  public void testRepeatedlyAfterFirstProcessingTime() throws Exception {
+    SimpleTriggerTester<GlobalWindow> tester =
+        TriggerTester.forTrigger(
+            Repeatedly.forever(
+                AfterFirst.<GlobalWindow>of(
+                    AfterProcessingTime.<GlobalWindow>pastFirstElementInPane()
+                        .plusDelayOf(Duration.standardMinutes(15)),
+                    AfterPane.<GlobalWindow>elementCountAtLeast(5))),
+            new GlobalWindows());
+
+    GlobalWindow window = GlobalWindow.INSTANCE;
+
+    tester.injectElements(1);
+    assertFalse(tester.shouldFire(window));
+
+    tester.advanceProcessingTime(new Instant(0).plus(Duration.standardMinutes(15)));
+    assertTrue(tester.shouldFire(window));
+    tester.fireIfShouldFire(window);
+    assertFalse(tester.shouldFire(window));
+  }
+
+  @Test
+  public void testRepeatedlyElementCount() throws Exception {
+    SimpleTriggerTester<GlobalWindow> tester =
+        TriggerTester.forTrigger(
+            Repeatedly.forever(AfterPane.<GlobalWindow>elementCountAtLeast(5)),
+            new GlobalWindows());
+
+    GlobalWindow window = GlobalWindow.INSTANCE;
+
+    tester.injectElements(1);
+    assertFalse(tester.shouldFire(window));
+
+    tester.injectElements(2, 3, 4, 5);
+    assertTrue(tester.shouldFire(window));
+    tester.fireIfShouldFire(window);
+    assertFalse(tester.shouldFire(window));
+  }
+
+  @Test
+  public void testRepeatedlyProcessingTime() throws Exception {
+    SimpleTriggerTester<GlobalWindow> tester =
+        TriggerTester.forTrigger(
+            Repeatedly.forever(
+                    AfterProcessingTime.<GlobalWindow>pastFirstElementInPane()
+                        .plusDelayOf(Duration.standardMinutes(15))),
+            new GlobalWindows());
+
+    GlobalWindow window = GlobalWindow.INSTANCE;
+
+    tester.injectElements(1);
+    assertFalse(tester.shouldFire(window));
+
+    tester.advanceProcessingTime(new Instant(0).plus(Duration.standardMinutes(15)));
+    assertTrue(tester.shouldFire(window));
+    tester.fireIfShouldFire(window);
+    assertFalse(tester.shouldFire(window));
+  }
+
 }


Mime
View raw message