beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From k...@apache.org
Subject [1/5] incubator-beam git commit: Explode windows in DirectRunner's Window.into evaluator
Date Thu, 23 Jun 2016 17:01:44 GMT
Repository: incubator-beam
Updated Branches:
  refs/heads/master f2d2ce5f4 -> 82ae661c5


Explode windows in DirectRunner's Window.into evaluator


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a3aa4c7f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a3aa4c7f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a3aa4c7f

Branch: refs/heads/master
Commit: a3aa4c7f1bd54122358c8e41e984a0d0000c160b
Parents: edf11fa
Author: Kenneth Knowles <klk@google.com>
Authored: Mon Jun 20 11:37:45 2016 -0700
Committer: Kenneth Knowles <klk@google.com>
Committed: Tue Jun 21 20:58:31 2016 -0700

----------------------------------------------------------------------
 .../runners/direct/WindowEvaluatorFactory.java  |  12 +-
 .../direct/WindowEvaluatorFactoryTest.java      | 174 ++++++++++++-------
 .../org/apache/beam/sdk/WindowMatchers.java     |  80 ++++++++-
 3 files changed, 193 insertions(+), 73 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a3aa4c7f/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java
index b07b58a..6045912 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java
@@ -82,11 +82,13 @@ class WindowEvaluatorFactory implements TransformEvaluatorFactory {
     }
 
     @Override
-    public void processElement(WindowedValue<InputT> element) throws Exception {
-      Collection<? extends BoundedWindow> windows = assignWindows(windowFn, element);
-      outputBundle.add(
-          WindowedValue.<InputT>of(
-              element.getValue(), element.getTimestamp(), windows, PaneInfo.NO_FIRING));
+    public void processElement(WindowedValue<InputT> compressedElement) throws Exception
{
+      for (WindowedValue<InputT> element : compressedElement.explodeWindows()) {
+        Collection<? extends BoundedWindow> windows = assignWindows(windowFn, element);
+        outputBundle.add(
+            WindowedValue.<InputT>of(
+                element.getValue(), element.getTimestamp(), windows, PaneInfo.NO_FIRING));
+      }
     }
 
     private <W extends BoundedWindow> Collection<? extends BoundedWindow> assignWindows(

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a3aa4c7f/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java
index 71abcca..c5faa5a 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java
@@ -17,6 +17,9 @@
  */
 package org.apache.beam.runners.direct;
 
+import static org.apache.beam.sdk.WindowMatchers.isSingleWindowedValue;
+import static org.apache.beam.sdk.WindowMatchers.isWindowedValue;
+
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.junit.Assert.assertThat;
 import static org.mockito.Mockito.when;
@@ -73,23 +76,30 @@ public class WindowEvaluatorFactoryTest {
 
   private BundleFactory bundleFactory;
 
-  private WindowedValue<Long> first =
+  private WindowedValue<Long> valueInGlobalWindow =
       WindowedValue.timestampedValueInGlobalWindow(3L, new Instant(2L));
-  private WindowedValue<Long> second =
-      WindowedValue.of(Long.valueOf(1L),
-          EPOCH.plus(Duration.standardDays(3)),
-          ImmutableList.of(GlobalWindow.INSTANCE,
-              new IntervalWindow(EPOCH, BoundedWindow.TIMESTAMP_MAX_VALUE),
-              new IntervalWindow(EPOCH.plus(Duration.standardDays(3)),
-                  EPOCH.plus(Duration.standardDays(6)))),
-          PaneInfo.NO_FIRING);
-  private WindowedValue<Long> third =
+
+  private WindowedValue<Long> valueInIntervalWindow =
       WindowedValue.of(
           Long.valueOf(2L),
           new Instant(-10L),
           new IntervalWindow(new Instant(-100), EPOCH),
           PaneInfo.NO_FIRING);
 
+  private IntervalWindow intervalWindow1 =
+      new IntervalWindow(EPOCH, BoundedWindow.TIMESTAMP_MAX_VALUE);
+
+  private IntervalWindow intervalWindow2 =
+      new IntervalWindow(
+          EPOCH.plus(Duration.standardDays(3)), EPOCH.plus(Duration.standardDays(6)));
+
+  private WindowedValue<Long> valueInGlobalAndTwoIntervalWindows =
+      WindowedValue.of(
+          Long.valueOf(1L),
+          EPOCH.plus(Duration.standardDays(3)),
+          ImmutableList.of(GlobalWindow.INSTANCE, intervalWindow1, intervalWindow2),
+          PaneInfo.NO_FIRING);
+
   @Before
   public void setup() {
     MockitoAnnotations.initMocks(this);
@@ -118,7 +128,10 @@ public class WindowEvaluatorFactoryTest {
         Iterables.getOnlyElement(result.getOutputBundles()),
         Matchers.<UncommittedBundle<?>>equalTo(outputBundle));
     CommittedBundle<Long> committed = outputBundle.commit(Instant.now());
-    assertThat(committed.getElements(), containsInAnyOrder(third, first, second));
+    assertThat(
+        committed.getElements(),
+        containsInAnyOrder(
+            valueInIntervalWindow, valueInGlobalWindow, valueInGlobalAndTwoIntervalWindows));
   }
 
   @Test
@@ -141,16 +154,22 @@ public class WindowEvaluatorFactoryTest {
         Matchers.<UncommittedBundle<?>>equalTo(outputBundle));
     CommittedBundle<Long> committed = outputBundle.commit(Instant.now());
 
-    WindowedValue<Long> expectedNewFirst =
-        WindowedValue.of(3L, new Instant(2L), firstSecondWindow, PaneInfo.NO_FIRING);
-    WindowedValue<Long> expectedNewSecond =
-        WindowedValue.of(
-            1L, EPOCH.plus(Duration.standardDays(3)), firstSecondWindow, PaneInfo.NO_FIRING);
-    WindowedValue<Long> expectedNewThird =
-        WindowedValue.of(2L, new Instant(-10L), thirdWindow, PaneInfo.NO_FIRING);
     assertThat(
         committed.getElements(),
-        containsInAnyOrder(expectedNewFirst, expectedNewSecond, expectedNewThird));
+        containsInAnyOrder(
+            // value in global window
+            isSingleWindowedValue(3L, new Instant(2L), firstSecondWindow, PaneInfo.NO_FIRING),
+
+            // value in just interval window
+            isSingleWindowedValue(2L, new Instant(-10L), thirdWindow, PaneInfo.NO_FIRING),
+
+            // value in global window and two interval windows
+            isSingleWindowedValue(
+                1L, EPOCH.plus(Duration.standardDays(3)), firstSecondWindow, PaneInfo.NO_FIRING),
+            isSingleWindowedValue(
+                1L, EPOCH.plus(Duration.standardDays(3)), firstSecondWindow, PaneInfo.NO_FIRING),
+            isSingleWindowedValue(
+                1L, EPOCH.plus(Duration.standardDays(3)), firstSecondWindow, PaneInfo.NO_FIRING)));
   }
 
   @Test
@@ -177,24 +196,39 @@ public class WindowEvaluatorFactoryTest {
     BoundedWindow wMinusSlide =
         new IntervalWindow(EPOCH.minus(windowDuration).plus(slidingBy), EPOCH.plus(slidingBy));
 
-    WindowedValue<Long> expectedFirst =
-        WindowedValue.of(
-            first.getValue(),
-            first.getTimestamp(),
-            ImmutableSet.of(w1, wMinusSlide),
-            PaneInfo.NO_FIRING);
-    WindowedValue<Long> expectedSecond =
-        WindowedValue.of(
-            second.getValue(), second.getTimestamp(), ImmutableSet.of(w1, w2), PaneInfo.NO_FIRING);
-    WindowedValue<Long> expectedThird =
-        WindowedValue.of(
-            third.getValue(),
-            third.getTimestamp(),
-            ImmutableSet.of(wMinus1, wMinusSlide),
-            PaneInfo.NO_FIRING);
-
     assertThat(
-        committed.getElements(), containsInAnyOrder(expectedFirst, expectedSecond, expectedThird));
+        committed.getElements(),
+        containsInAnyOrder(
+            // Value in global window mapped to one windowed value in multiple windows
+            isWindowedValue(
+                valueInGlobalWindow.getValue(),
+                valueInGlobalWindow.getTimestamp(),
+                ImmutableSet.of(w1, wMinusSlide),
+                PaneInfo.NO_FIRING),
+
+            // Value in interval window mapped to one windowed value in multiple windows
+            isWindowedValue(
+                valueInIntervalWindow.getValue(),
+                valueInIntervalWindow.getTimestamp(),
+                ImmutableSet.of(wMinus1, wMinusSlide),
+                PaneInfo.NO_FIRING),
+
+            // Value in three windows mapped to three windowed values in the same multiple
windows
+            isWindowedValue(
+                valueInGlobalAndTwoIntervalWindows.getValue(),
+                valueInGlobalAndTwoIntervalWindows.getTimestamp(),
+                ImmutableSet.of(w1, w2),
+                PaneInfo.NO_FIRING),
+            isWindowedValue(
+                valueInGlobalAndTwoIntervalWindows.getValue(),
+                valueInGlobalAndTwoIntervalWindows.getTimestamp(),
+                ImmutableSet.of(w1, w2),
+                PaneInfo.NO_FIRING),
+            isWindowedValue(
+                valueInGlobalAndTwoIntervalWindows.getValue(),
+                valueInGlobalAndTwoIntervalWindows.getTimestamp(),
+                ImmutableSet.of(w1, w2),
+                PaneInfo.NO_FIRING)));
   }
 
   @Test
@@ -212,34 +246,54 @@ public class WindowEvaluatorFactoryTest {
         Matchers.<UncommittedBundle<?>>equalTo(outputBundle));
     CommittedBundle<Long> committed = outputBundle.commit(Instant.now());
 
-    WindowedValue<Long> expectedFirst =
-        WindowedValue.of(
-            first.getValue(),
-            first.getTimestamp(),
-            new IntervalWindow(first.getTimestamp(), first.getTimestamp().plus(1L)),
-            PaneInfo.NO_FIRING);
-    WindowedValue<Long> expectedSecond = WindowedValue.of(second.getValue(),
-        second.getTimestamp(),
-        new IntervalWindow(second.getTimestamp(), second.getTimestamp().plus(1L)),
-        PaneInfo.NO_FIRING);
-    WindowedValue<Long> expectedThird =
-        WindowedValue.of(
-            third.getValue(),
-            third.getTimestamp(),
-            third.getWindows(),
-            PaneInfo.NO_FIRING);
-
     assertThat(
-        committed.getElements(), containsInAnyOrder(expectedFirst, expectedSecond, expectedThird));
+        committed.getElements(),
+        containsInAnyOrder(
+            // Value in global window mapped to [timestamp, timestamp+1)
+            isSingleWindowedValue(
+                valueInGlobalWindow.getValue(),
+                valueInGlobalWindow.getTimestamp(),
+                new IntervalWindow(
+                    valueInGlobalWindow.getTimestamp(),
+                    valueInGlobalWindow.getTimestamp().plus(1L)),
+                PaneInfo.NO_FIRING),
+
+            // Value in interval window mapped to the same window
+            isWindowedValue(
+                valueInIntervalWindow.getValue(),
+                valueInIntervalWindow.getTimestamp(),
+                valueInIntervalWindow.getWindows(),
+                PaneInfo.NO_FIRING),
+
+            // Value in global window and two interval windows exploded and mapped in both
ways
+            isSingleWindowedValue(
+                valueInGlobalAndTwoIntervalWindows.getValue(),
+                valueInGlobalAndTwoIntervalWindows.getTimestamp(),
+                new IntervalWindow(
+                    valueInGlobalAndTwoIntervalWindows.getTimestamp(),
+                    valueInGlobalAndTwoIntervalWindows.getTimestamp().plus(1L)),
+                PaneInfo.NO_FIRING),
+
+            isSingleWindowedValue(
+                valueInGlobalAndTwoIntervalWindows.getValue(),
+                valueInGlobalAndTwoIntervalWindows.getTimestamp(),
+                intervalWindow1,
+                PaneInfo.NO_FIRING),
+
+            isSingleWindowedValue(
+                valueInGlobalAndTwoIntervalWindows.getValue(),
+                valueInGlobalAndTwoIntervalWindows.getTimestamp(),
+                intervalWindow2,
+                PaneInfo.NO_FIRING)));
   }
 
   private CommittedBundle<Long> createInputBundle() {
     CommittedBundle<Long> inputBundle =
         bundleFactory
             .createRootBundle(input)
-            .add(first)
-            .add(second)
-            .add(third)
+            .add(valueInGlobalWindow)
+            .add(valueInGlobalAndTwoIntervalWindows)
+            .add(valueInIntervalWindow)
             .commit(Instant.now());
     return inputBundle;
   }
@@ -262,9 +316,9 @@ public class WindowEvaluatorFactoryTest {
             inputBundle,
             evaluationContext);
 
-    evaluator.processElement(first);
-    evaluator.processElement(second);
-    evaluator.processElement(third);
+    evaluator.processElement(valueInGlobalWindow);
+    evaluator.processElement(valueInGlobalAndTwoIntervalWindows);
+    evaluator.processElement(valueInIntervalWindow);
     TransformResult result = evaluator.finishBundle();
     return result;
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a3aa4c7f/sdks/java/core/src/test/java/org/apache/beam/sdk/WindowMatchers.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/WindowMatchers.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/WindowMatchers.java
index 7a5e2fb..48c2589 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/WindowMatchers.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/WindowMatchers.java
@@ -22,6 +22,8 @@ import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.WindowedValue;
 
+import com.google.common.collect.Lists;
+
 import org.hamcrest.Description;
 import org.hamcrest.Matcher;
 import org.hamcrest.Matchers;
@@ -37,19 +39,51 @@ import java.util.Objects;
 public class WindowMatchers {
 
   public static <T> Matcher<WindowedValue<? extends T>> isWindowedValue(
-      Matcher<? super T> valueMatcher, Matcher<? super Instant> timestampMatcher,
+      T value,
+      Instant timestamp,
+      Collection<? extends BoundedWindow> windows,
+      PaneInfo paneInfo) {
+
+    Collection<Matcher<? super BoundedWindow>> windowMatchers =
+        Lists.newArrayListWithCapacity(windows.size());
+    for (BoundedWindow window : windows) {
+      windowMatchers.add(Matchers.equalTo(window));
+    }
+
+    return isWindowedValue(
+        Matchers.equalTo(value),
+        Matchers.equalTo(timestamp),
+        Matchers.containsInAnyOrder(windowMatchers),
+        Matchers.equalTo(paneInfo));
+  }
+
+  public static <T> Matcher<WindowedValue<? extends T>> isWindowedValue(
+      Matcher<? super T> valueMatcher,
+      Matcher<? super Instant> timestampMatcher,
+      Matcher<? super Collection<? extends BoundedWindow>> windowsMatcher,
+      Matcher<? super PaneInfo> paneInfoMatcher) {
+    return new WindowedValueMatcher<>(
+        valueMatcher, timestampMatcher, windowsMatcher, paneInfoMatcher);
+  }
+
+  public static <T> Matcher<WindowedValue<? extends T>> isWindowedValue(
+      Matcher<? super T> valueMatcher,
+      Matcher<? super Instant> timestampMatcher,
       Matcher<? super Collection<? extends BoundedWindow>> windowsMatcher) {
-    return new WindowedValueMatcher<>(valueMatcher, timestampMatcher, windowsMatcher);
+    return new WindowedValueMatcher<>(
+        valueMatcher, timestampMatcher, windowsMatcher, Matchers.anything());
   }
 
   public static <T> Matcher<WindowedValue<? extends T>> isWindowedValue(
       Matcher<? super T> valueMatcher, Matcher<? super Instant> timestampMatcher)
{
-    return new WindowedValueMatcher<>(valueMatcher, timestampMatcher, Matchers.anything());
+    return new WindowedValueMatcher<>(
+        valueMatcher, timestampMatcher, Matchers.anything(), Matchers.anything());
   }
 
   public static <T> Matcher<WindowedValue<? extends T>> isWindowedValue(
       Matcher<? super T> valueMatcher) {
-    return new WindowedValueMatcher<>(valueMatcher, Matchers.anything(), Matchers.anything());
+    return new WindowedValueMatcher<>(
+        valueMatcher, Matchers.anything(), Matchers.anything(), Matchers.anything());
   }
 
   public static <T> Matcher<WindowedValue<? extends T>> isSingleWindowedValue(
@@ -59,20 +93,46 @@ public class WindowMatchers {
   }
 
   public static <T> Matcher<WindowedValue<? extends T>> isSingleWindowedValue(
+      T value, Instant timestamp, BoundedWindow window, PaneInfo paneInfo) {
+    return WindowMatchers.<T>isSingleWindowedValue(
+        Matchers.equalTo(value),
+        Matchers.equalTo(timestamp),
+        Matchers.equalTo(window),
+        Matchers.equalTo(paneInfo));
+  }
+
+  public static <T> Matcher<WindowedValue<? extends T>> isSingleWindowedValue(
+      T value, Instant timestamp, BoundedWindow window) {
+    return WindowMatchers.<T>isSingleWindowedValue(
+        Matchers.equalTo(value), Matchers.equalTo(timestamp), Matchers.equalTo(window));
+  }
+
+  public static <T> Matcher<WindowedValue<? extends T>> isSingleWindowedValue(
       Matcher<T> valueMatcher, long timestamp, long windowStart, long windowEnd) {
     IntervalWindow intervalWindow =
         new IntervalWindow(new Instant(windowStart), new Instant(windowEnd));
     return WindowMatchers.<T>isSingleWindowedValue(
         valueMatcher,
         Matchers.describedAs("%0", Matchers.equalTo(new Instant(timestamp)), timestamp),
-        Matchers.<BoundedWindow>equalTo(intervalWindow));
+        Matchers.<BoundedWindow>equalTo(intervalWindow),
+        Matchers.anything());
   }
 
   public static <T> Matcher<WindowedValue<? extends T>> isSingleWindowedValue(
-      Matcher<? super T> valueMatcher, Matcher<? super Instant> timestampMatcher,
+      Matcher<? super T> valueMatcher,
+      Matcher<? super Instant> timestampMatcher,
       Matcher<? super BoundedWindow> windowMatcher) {
     return new WindowedValueMatcher<T>(
-        valueMatcher, timestampMatcher, Matchers.contains(windowMatcher));
+        valueMatcher, timestampMatcher, Matchers.contains(windowMatcher), Matchers.anything());
+  }
+
+  public static <T> Matcher<WindowedValue<? extends T>> isSingleWindowedValue(
+      Matcher<? super T> valueMatcher,
+      Matcher<? super Instant> timestampMatcher,
+      Matcher<? super BoundedWindow> windowMatcher,
+      Matcher<? super PaneInfo> paneInfoMatcher) {
+    return new WindowedValueMatcher<T>(
+        valueMatcher, timestampMatcher, Matchers.contains(windowMatcher), paneInfoMatcher);
   }
 
   public static Matcher<IntervalWindow> intervalWindow(long start, long end) {
@@ -114,14 +174,17 @@ public class WindowMatchers {
     private Matcher<? super T> valueMatcher;
     private Matcher<? super Instant> timestampMatcher;
     private Matcher<? super Collection<? extends BoundedWindow>> windowsMatcher;
+    private Matcher<? super PaneInfo> paneInfoMatcher;
 
     private WindowedValueMatcher(
         Matcher<? super T> valueMatcher,
         Matcher<? super Instant> timestampMatcher,
-        Matcher<? super Collection<? extends BoundedWindow>> windowsMatcher)
{
+        Matcher<? super Collection<? extends BoundedWindow>> windowsMatcher,
+        Matcher<? super PaneInfo> paneInfoMatcher) {
       this.valueMatcher = valueMatcher;
       this.timestampMatcher = timestampMatcher;
       this.windowsMatcher = windowsMatcher;
+      this.paneInfoMatcher = paneInfoMatcher;
     }
 
     @Override
@@ -130,6 +193,7 @@ public class WindowMatchers {
           .appendText("a WindowedValue(").appendValue(valueMatcher)
           .appendText(", ").appendValue(timestampMatcher)
           .appendText(", ").appendValue(windowsMatcher)
+          .appendText(", ").appendValue(paneInfoMatcher)
           .appendText(")");
     }
 


Mime
View raw message