beam-commits mailing list archives

From k...@apache.org
Subject [1/4] incubator-beam git commit: Spark runner: Assign windows when re-windowing into global window
Date Thu, 09 Jun 2016 21:58:19 GMT
Repository: incubator-beam
Updated Branches:
  refs/heads/master d6adbbf96 -> d53e96a0d


Spark runner: Assign windows when re-windowing into global window

Previously, window assignment was elided whenever the destination
window was the global window. But when the source windows are not the
global window, this elision is incorrect: elements would keep their
source windows. Now window assignment runs except when both the source
*and* the destination window are the global window (which remains a
common case in globally windowed batch tests using PAssert).
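
The rule described above can be sketched as a small predicate. This is only an illustration, not the runner's code: `WindowFnKind` and `needsWindowAssignment` are hypothetical names that stand in for the `instanceof GlobalWindows` checks on the source and destination window functions.

```java
public class WindowAssignmentRule {
    // Hypothetical stand-in for the WindowFn hierarchy; only the
    // "global or not" distinction matters for this rule.
    public enum WindowFnKind { GLOBAL, FIXED, SLIDING, SESSIONS }

    // Assignment may be skipped only when BOTH the source and the
    // destination are the global window; every other pairing must run it.
    public static boolean needsWindowAssignment(WindowFnKind source, WindowFnKind destination) {
        return !(source == WindowFnKind.GLOBAL && destination == WindowFnKind.GLOBAL);
    }

    public static void main(String[] args) {
        // Print the decision for every source/destination pairing.
        for (WindowFnKind source : WindowFnKind.values()) {
            for (WindowFnKind destination : WindowFnKind.values()) {
                System.out.println(source + " -> " + destination + ": assign="
                    + needsWindowAssignment(source, destination));
            }
        }
    }
}
```

Under this sketch, the only pairing that skips assignment is global to global; a non-global source re-windowed into the global window, the case this commit fixes, still runs it.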


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d53e96a0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d53e96a0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d53e96a0

Branch: refs/heads/master
Commit: d53e96a0d1f5f26ad0e3efc90dc9f7b53135443b
Parents: f222df1
Author: Kenneth Knowles <klk@google.com>
Authored: Thu Jun 9 09:15:39 2016 -0700
Committer: Kenneth Knowles <klk@google.com>
Committed: Thu Jun 9 14:41:09 2016 -0700

----------------------------------------------------------------------
 .../beam/runners/spark/translation/TransformTranslator.java   | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d53e96a0/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
index b462d35..ebceb6b 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
@@ -699,13 +699,16 @@ public final class TransformTranslator {
         JavaRDDLike<WindowedValue<T>, ?> inRDD =
             (JavaRDDLike<WindowedValue<T>, ?>) context.getInputRDD(transform);
         WindowFn<? super T, W> windowFn = WINDOW_FG.get("windowFn", transform);
-        if (windowFn instanceof GlobalWindows) {
+        // Avoid running assign windows if both source and destination are global window
+        if (context.getInput(transform).getWindowingStrategy().getWindowFn()
+                instanceof GlobalWindows
+            && windowFn instanceof GlobalWindows) {
           context.setOutputRDD(transform, inRDD);
         } else {
           @SuppressWarnings("unchecked")
           DoFn<T, T> addWindowsDoFn = new AssignWindowsDoFn<>(windowFn);
           DoFnFunction<T, T> dofn =
-                  new DoFnFunction<>(addWindowsDoFn, context.getRuntimeContext(), null);
+              new DoFnFunction<>(addWindowsDoFn, context.getRuntimeContext(), null);
           context.setOutputRDD(transform, inRDD.mapPartitions(dofn));
         }
       }
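
To see why the old condition was wrong, here is a simplified model of the two code paths. It is a sketch under stated assumptions: `Windowed`, `rewindowOld`, `rewindowNew`, and `assign` are hypothetical names, windows are modeled as plain strings, and `assign` simply stamps the destination window name on each element rather than running a real `WindowFn`.

```java
import java.util.ArrayList;
import java.util.List;

public class RewindowSketch {
    // Hypothetical simplified element: a value tagged with a window name.
    public static final class Windowed {
        public final String value;
        public final String window;

        public Windowed(String value, String window) {
            this.value = value;
            this.window = window;
        }
    }

    public static final String GLOBAL = "GlobalWindow";

    // Old behavior: skip assignment whenever the destination is global,
    // regardless of how the input was windowed.
    public static List<Windowed> rewindowOld(List<Windowed> in, String sourceFn, String destFn) {
        if (destFn.equals(GLOBAL)) {
            return in; // elements keep whatever window they already had
        }
        return assign(in, destFn);
    }

    // New behavior: skip assignment only when source AND destination are global.
    public static List<Windowed> rewindowNew(List<Windowed> in, String sourceFn, String destFn) {
        if (sourceFn.equals(GLOBAL) && destFn.equals(GLOBAL)) {
            return in;
        }
        return assign(in, destFn);
    }

    // Simplification: every element is placed in a single window named after
    // the destination; a real WindowFn would compute windows per element.
    static List<Windowed> assign(List<Windowed> in, String destWindow) {
        List<Windowed> out = new ArrayList<>();
        for (Windowed w : in) {
            out.add(new Windowed(w.value, destWindow));
        }
        return out;
    }
}
```

In this model, an element that starts in a fixed window and is re-windowed into the global window keeps its stale fixed window under `rewindowOld`, while `rewindowNew` moves it into the global window.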

