beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From k...@apache.org
Subject [21/50] [abbrv] beam git commit: Truncate the very last fixed window if it goes beyond representable time
Date Fri, 17 Nov 2017 20:31:12 GMT
Truncate the very last fixed window if it goes beyond representable time


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/296cba00
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/296cba00
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/296cba00

Branch: refs/heads/tez-runner
Commit: 296cba009a5c979223fb61bd411816169eaad515
Parents: 555ba40
Author: Kenneth Knowles <kenn@apache.org>
Authored: Fri Oct 27 10:51:45 2017 -0700
Committer: Kenneth Knowles <kenn@apache.org>
Committed: Mon Nov 13 15:03:21 2017 -0800

----------------------------------------------------------------------
 .../beam/runners/core/LateDataUtilsTest.java    |  2 +-
 .../sdk/transforms/windowing/FixedWindows.java  | 24 +++++++++++++++++---
 .../transforms/windowing/FixedWindowsTest.java  | 12 ++++++++++
 3 files changed, 34 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/296cba00/runners/core-java/src/test/java/org/apache/beam/runners/core/LateDataUtilsTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/LateDataUtilsTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/LateDataUtilsTest.java
index f0f315d..cef865c 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/LateDataUtilsTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/LateDataUtilsTest.java
@@ -64,7 +64,7 @@ public class LateDataUtilsTest {
     IntervalWindow window = windowFn.assignWindow(new Instant(BoundedWindow.TIMESTAMP_MAX_VALUE));
     assertThat(
         window.maxTimestamp(),
-        Matchers.<ReadableInstant>greaterThan(GlobalWindow.INSTANCE.maxTimestamp()));
+        equalTo(GlobalWindow.INSTANCE.maxTimestamp()));
     assertThat(
         LateDataUtils.garbageCollectionTime(window, strategy),
         equalTo(GlobalWindow.INSTANCE.maxTimestamp()));

http://git-wip-us.apache.org/repos/asf/beam/blob/296cba00/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/FixedWindows.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/FixedWindows.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/FixedWindows.java
index 8b16916..6c9376c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/FixedWindows.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/FixedWindows.java
@@ -76,9 +76,27 @@ public class FixedWindows extends PartitioningWindowFn<Object, IntervalWindow> {
 
   @Override
   public IntervalWindow assignWindow(Instant timestamp) {
-    long start = timestamp.getMillis()
-        - timestamp.plus(size).minus(offset).getMillis() % size.getMillis();
-    return new IntervalWindow(new Instant(start), size);
+    Instant start =
+        new Instant(
+            timestamp.getMillis()
+                - timestamp.plus(size).minus(offset).getMillis() % size.getMillis());
+
+
+    // The global window is inclusive of max timestamp, while interval window excludes its
+    // upper bound
+    Instant endOfGlobalWindow = GlobalWindow.INSTANCE.maxTimestamp().plus(1);
+
+    // The end of the window is either start + size if that is within the allowable range, otherwise
+    // the end of the global window. Truncating the window drives many other
+    // areas of this system in the appropriate way automatically.
+    //
+    // Though it is curious that the very last representable fixed window is shorter than the rest,
+    // when we are processing data in the year 294247, we'll probably have technology that can
+    // account for this.
+    Instant end =
+        start.isAfter(endOfGlobalWindow.minus(size)) ? endOfGlobalWindow : start.plus(size);
+
+    return new IntervalWindow(start, end);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/296cba00/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/FixedWindowsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/FixedWindowsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/FixedWindowsTest.java
index 80a534c..8dc02f9 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/FixedWindowsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/FixedWindowsTest.java
@@ -107,6 +107,18 @@ public class FixedWindowsTest {
     assertThat(mapping.maximumLookback(), equalTo(Duration.ZERO));
   }
 
+  /** Tests that the last hour of the universe in fact ends at the end of time. */
+  @Test
+  public void testEndOfTime() {
+    Instant endOfGlobalWindow = GlobalWindow.INSTANCE.maxTimestamp();
+    FixedWindows windowFn = FixedWindows.of(Duration.standardHours(1));
+
+    IntervalWindow truncatedWindow =
+        windowFn.assignWindow(endOfGlobalWindow.minus(1));
+
+    assertThat(truncatedWindow.maxTimestamp(), equalTo(endOfGlobalWindow));
+  }
+
   @Test
   public void testDefaultWindowMappingFnGlobalWindow() {
     PartitioningWindowFn<?, ?> windowFn = FixedWindows.of(Duration.standardMinutes(20L));


Mime
View raw message