beam-commits mailing list archives

From echauc...@apache.org
Subject [beam] 25/37: Apply new Encoders to Read source
Date Thu, 24 Oct 2019 10:15:34 GMT
This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 3cc256e5f81616d8b4126cef6ae8d049fb03460f
Author: Etienne Chauchot <echauchot@apache.org>
AuthorDate: Fri Sep 6 17:49:10 2019 +0200

    Apply new Encoders to Read source
---
 .../translation/batch/ReadSourceTranslatorBatch.java              | 8 ++++++--
 .../spark/structuredstreaming/translation/helpers/RowHelpers.java | 4 +---
 .../translation/streaming/ReadSourceTranslatorStreaming.java      | 7 +++++--
 3 files changed, 12 insertions(+), 7 deletions(-)

diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
index 6ae6646..2dcf66f 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
@@ -27,6 +27,7 @@ import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.Row
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
@@ -69,10 +70,13 @@ class ReadSourceTranslatorBatch<T>
             .load();
 
     // extract windowedValue from Row
+    WindowedValue.FullWindowedValueCoder<T> windowedValueCoder = WindowedValue.FullWindowedValueCoder
+        .of(source.getOutputCoder(), GlobalWindow.Coder.INSTANCE);
+
     Dataset<WindowedValue<T>> dataset =
         rowDataset.map(
-            RowHelpers.extractWindowedValueFromRowMapFunction(source.getOutputCoder()),
-            EncoderHelpers.windowedValueEncoder());
+            RowHelpers.extractWindowedValueFromRowMapFunction(windowedValueCoder),
+            EncoderHelpers.fromBeamCoder(windowedValueCoder));
 
     PCollection<T> output = (PCollection<T>) context.getOutput();
     context.putDataset(output, dataset);
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/RowHelpers.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/RowHelpers.java
index 6ee0e07..ac74c29 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/RowHelpers.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/RowHelpers.java
@@ -43,13 +43,11 @@ public final class RowHelpers {
    * @return A {@link MapFunction} that accepts a {@link Row} and returns its {@link WindowedValue}.
    */
   public static <T> MapFunction<Row, WindowedValue<T>> extractWindowedValueFromRowMapFunction(
-      Coder<T> coder) {
+      WindowedValue.WindowedValueCoder<T> windowedValueCoder) {
     return (MapFunction<Row, WindowedValue<T>>)
         value -> {
           // there is only one value put in each Row by the InputPartitionReader
           byte[] bytes = (byte[]) value.get(0);
-          WindowedValue.FullWindowedValueCoder<T> windowedValueCoder =
-              WindowedValue.FullWindowedValueCoder.of(coder, GlobalWindow.Coder.INSTANCE);
           return windowedValueCoder.decode(new ByteArrayInputStream(bytes));
         };
   }
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/ReadSourceTranslatorStreaming.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/ReadSourceTranslatorStreaming.java
index c3d07ff..9e03d96 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/ReadSourceTranslatorStreaming.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/ReadSourceTranslatorStreaming.java
@@ -27,6 +27,7 @@ import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.Row
 import org.apache.beam.sdk.io.UnboundedSource;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
@@ -70,10 +71,12 @@ class ReadSourceTranslatorStreaming<T>
             .load();
 
     // extract windowedValue from Row
+    WindowedValue.FullWindowedValueCoder<T> windowedValueCoder = WindowedValue.FullWindowedValueCoder
+        .of(source.getOutputCoder(), GlobalWindow.Coder.INSTANCE);
     Dataset<WindowedValue<T>> dataset =
         rowDataset.map(
-            RowHelpers.extractWindowedValueFromRowMapFunction(source.getOutputCoder()),
-            EncoderHelpers.windowedValueEncoder());
+            RowHelpers.extractWindowedValueFromRowMapFunction(windowedValueCoder),
+            EncoderHelpers.fromBeamCoder(windowedValueCoder));
 
     PCollection<T> output = (PCollection<T>) context.getOutput();
     context.putDataset(output, dataset);
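
For context, a condensed sketch (not part of the commit itself) of the pattern both translator hunks above introduce: the source's output coder is wrapped in a WindowedValue.FullWindowedValueCoder over the global window, and that single coder then drives both the decoding of the Row payload and the Spark Encoder of the resulting Dataset. The helper calls (EncoderHelpers.fromBeamCoder, RowHelpers.extractWindowedValueFromRowMapFunction) and the coder construction are taken from this diff; the class and method names of the sketch itself are illustrative only.

import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.EncoderHelpers;
import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.RowHelpers;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

class ReadTranslationSketch {
  static <T> Dataset<WindowedValue<T>> toWindowedValues(
      Dataset<Row> rowDataset, Coder<T> outputCoder) {
    // Sources emit into the global window, so the element coder is wrapped
    // with GlobalWindow.Coder.INSTANCE to form a full windowed-value coder.
    WindowedValue.FullWindowedValueCoder<T> windowedValueCoder =
        WindowedValue.FullWindowedValueCoder.of(outputCoder, GlobalWindow.Coder.INSTANCE);

    // The same windowed-value coder is used twice: to decode the bytes stored
    // in each Row, and (via fromBeamCoder) as the Spark Encoder of the mapped
    // Dataset, replacing the previous generic windowedValueEncoder().
    return rowDataset.map(
        RowHelpers.extractWindowedValueFromRowMapFunction(windowedValueCoder),
        EncoderHelpers.fromBeamCoder(windowedValueCoder));
  }
}

Deriving the Encoder from the same Beam coder that decodes the Row keeps the serialized form consistent in both directions, which is the point of the "new Encoders" applied in this commit series.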

