beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From echauc...@apache.org
Subject [beam] 34/37: Use beam encoders also in the output of the source translation
Date Thu, 24 Oct 2019 10:18:38 GMT
This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 6a278395d77b3578da10a9621c85883a2d6f2ded
Author: Etienne Chauchot <echauchot@apache.org>
AuthorDate: Wed Oct 23 11:45:39 2019 +0200

    Use beam encoders also in the output of the source translation
---
 .../translation/batch/ReadSourceTranslatorBatch.java                  | 4 +---
 .../translation/streaming/ReadSourceTranslatorStreaming.java          | 4 +---
 2 files changed, 2 insertions(+), 6 deletions(-)

diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
index ceb87cf..6af7f55 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
@@ -77,9 +77,7 @@ class ReadSourceTranslatorBatch<T>
     Dataset<WindowedValue<T>> dataset =
         rowDataset.map(
             RowHelpers.extractWindowedValueFromRowMapFunction(windowedValueCoder),
-            // using kryo bytes serialization because the mapper already calls
-            // windowedValueCoder.decode, no need to call it also in the Spark encoder
-            EncoderHelpers.windowedValueEncoder());
+            EncoderHelpers.fromBeamCoder(windowedValueCoder));
 
     PCollection<T> output = (PCollection<T>) context.getOutput();
     context.putDataset(output, dataset);
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/ReadSourceTranslatorStreaming.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/ReadSourceTranslatorStreaming.java
index 9f1e34d..ea10272 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/ReadSourceTranslatorStreaming.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/ReadSourceTranslatorStreaming.java
@@ -77,9 +77,7 @@ class ReadSourceTranslatorStreaming<T>
     Dataset<WindowedValue<T>> dataset =
         rowDataset.map(
             RowHelpers.extractWindowedValueFromRowMapFunction(windowedValueCoder),
-            // using kryo bytes serialization because the mapper already calls
-            // windowedValueCoder.decode, no need to call it also in the Spark encoder
-            EncoderHelpers.windowedValueEncoder());
+            EncoderHelpers.fromBeamCoder(windowedValueCoder));
 
     PCollection<T> output = (PCollection<T>) context.getOutput();
     context.putDataset(output, dataset);


Mime
View raw message