beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From echauc...@apache.org
Subject [beam] branch spark-runner_structured-streaming updated: Improve type enforcement in ReadSourceTranslator
Date Tue, 11 Dec 2018 15:22:16 GMT
This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/spark-runner_structured-streaming by this push:
     new e7ed784  Improve type enforcement in ReadSourceTranslator
e7ed784 is described below

commit e7ed78420afa8c27801419bc0ed54c87970ae3d0
Author: Etienne Chauchot <echauchot@apache.org>
AuthorDate: Tue Dec 11 16:21:05 2018 +0100

    Improve type enforcement in ReadSourceTranslator
---
 .../translation/batch/ReadSourceTranslatorBatch.java               | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
index a75730a..2c1aa93 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
@@ -26,6 +26,7 @@ import org.apache.beam.sdk.coders.SerializableCoder;
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
@@ -62,11 +63,11 @@ class ReadSourceTranslatorBatch<T>
     // instantiates to be able to call DatasetSource.initialize()
     MapFunction<Row, WindowedValue<T>> func = new MapFunction<Row, WindowedValue<T>>() {
       @Override public WindowedValue<T> call(Row value) throws Exception {
-        //TODO fix row content extraction: I guess cast is not enough
-        return (WindowedValue<T>) value.get(0);
+        //there is only one value put in each Row by the InputPartitionReader
+        return value.<WindowedValue<T>>getAs(0);
       }
     };
-    //TODO fix encoder
+    //TODO fix encoder: how to get an Encoder<WindowedValue<T>>
     Dataset<WindowedValue<T>> dataset = rowDataset.map(func, null);
 
     PCollection<T> output = (PCollection<T>) context.getOutput();


Mime
View raw message