beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From echauc...@apache.org
Subject [beam] 28/37: Apply new Encoders to Window assign translation
Date Thu, 24 Oct 2019 10:18:32 GMT
This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 7f1060aa189a625400a1fbcfc2503d3e721ade8f
Author: Etienne Chauchot <echauchot@apache.org>
AuthorDate: Fri Sep 27 11:22:15 2019 +0200

    Apply new Encoders to Window assign translation
---
 .../translation/batch/WindowAssignTranslatorBatch.java            | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)

diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/WindowAssignTranslatorBatch.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/WindowAssignTranslatorBatch.java
index fb37f97..576b914 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/WindowAssignTranslatorBatch.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/WindowAssignTranslatorBatch.java
@@ -23,6 +23,7 @@ import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.Enc
 import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.WindowingHelpers;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.spark.sql.Dataset;
@@ -44,10 +45,13 @@ class WindowAssignTranslatorBatch<T>
     if (WindowingHelpers.skipAssignWindows(assignTransform, context)) {
       context.putDataset(output, inputDataset);
     } else {
+      WindowFn<T, ?> windowFn = assignTransform.getWindowFn();
+      WindowedValue.FullWindowedValueCoder<T> windoweVdalueCoder = WindowedValue.FullWindowedValueCoder
+          .of(input.getCoder(), windowFn.windowCoder());
       Dataset<WindowedValue<T>> outputDataset =
           inputDataset.map(
-              WindowingHelpers.assignWindowsMapFunction(assignTransform.getWindowFn()),
-              EncoderHelpers.windowedValueEncoder());
+              WindowingHelpers.assignWindowsMapFunction(windowFn),
+              EncoderHelpers.fromBeamCoder(windoweVdalueCoder));
       context.putDataset(output, outputDataset);
     }
   }


Mime
View raw message