beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From echauc...@apache.org
Subject [beam] 39/50: Move DatasetSourceMock to proper batch mode
Date Fri, 04 Jan 2019 10:39:01 GMT
This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 49ee25994ceaf766bca17f5600e72df48583e3f0
Author: Etienne Chauchot <echauchot@apache.org>
AuthorDate: Thu Dec 27 16:33:00 2018 +0100

    Move DatasetSourceMock to proper batch mode
---
 .../batch/ReadSourceTranslatorMockBatch.java       |  3 +-
 .../translation/io/DatasetSourceMock.java          | 41 +++++-----------------
 2 files changed, 10 insertions(+), 34 deletions(-)

diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorMockBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorMockBatch.java
index 4a509de..184d24c 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorMockBatch.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorMockBatch.java
@@ -46,9 +46,8 @@ class ReadSourceTranslatorMockBatch<T>
   public void translateTransform(
       PTransform<PBegin, PCollection<T>> transform, TranslationContext context) {
     SparkSession sparkSession = context.getSparkSession();
-    DataStreamReader dataStreamReader = sparkSession.readStream().format(SOURCE_PROVIDER_CLASS);
 
-    Dataset<Row> rowDataset = dataStreamReader.load();
+    Dataset<Row> rowDataset = sparkSession.read().format(SOURCE_PROVIDER_CLASS).load();
 
     MapFunction<Row, WindowedValue> func = new MapFunction<Row, WindowedValue>() {
       @Override public WindowedValue call(Row value) throws Exception {
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/io/DatasetSourceMock.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/io/DatasetSourceMock.java
index ec88364..f722377 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/io/DatasetSourceMock.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/io/DatasetSourceMock.java
@@ -22,52 +22,29 @@ import static scala.collection.JavaConversions.asScalaBuffer;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Optional;
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.sources.v2.DataSourceOptions;
 import org.apache.spark.sql.sources.v2.DataSourceV2;
-import org.apache.spark.sql.sources.v2.MicroBatchReadSupport;
+import org.apache.spark.sql.sources.v2.ReadSupport;
+import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
 import org.apache.spark.sql.sources.v2.reader.InputPartition;
 import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;
-import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReader;
-import org.apache.spark.sql.sources.v2.reader.streaming.Offset;
 import org.apache.spark.sql.types.StructType;
 import org.joda.time.Instant;
 
 /**
  * This is a mock source that gives values between 0 and 999.
  */
-public class DatasetSourceMock implements DataSourceV2, MicroBatchReadSupport {
+public class DatasetSourceMock implements DataSourceV2, ReadSupport {
 
-  @Override public MicroBatchReader createMicroBatchReader(Optional<StructType> schema, String checkpointLocation, DataSourceOptions options) {
-    return new DatasetMicroBatchReader();
+  @Override public DataSourceReader createReader(DataSourceOptions options) {
+    return new DatasetReader();
   }
 
   /** This class can be mapped to Beam {@link BoundedSource}. */
-  private static class DatasetMicroBatchReader implements MicroBatchReader {
-
-    @Override public void setOffsetRange(Optional<Offset> start, Optional<Offset> end) {
-    }
-
-    @Override public Offset getStartOffset() {
-      return null;
-    }
-
-    @Override public Offset getEndOffset() {
-      return null;
-    }
-
-    @Override public Offset deserializeOffset(String json) {
-      return null;
-    }
-
-    @Override public void commit(Offset end) {
-    }
-
-    @Override public void stop() {
-    }
+  private static class DatasetReader implements DataSourceReader {
 
     @Override public StructType readSchema() {
       return new StructType();
@@ -78,7 +55,7 @@ public class DatasetSourceMock implements DataSourceV2, MicroBatchReadSupport {
       result.add(new InputPartition<InternalRow>() {
 
         @Override public InputPartitionReader<InternalRow> createPartitionReader() {
-          return new DatasetMicroBatchPartitionReaderMock();
+          return new DatasetPartitionReaderMock();
         }
       });
       return result;
@@ -86,12 +63,12 @@ public class DatasetSourceMock implements DataSourceV2, MicroBatchReadSupport {
   }
 
   /** This class is a mocked reader*/
-  private static class DatasetMicroBatchPartitionReaderMock implements InputPartitionReader<InternalRow> {
+  private static class DatasetPartitionReaderMock implements InputPartitionReader<InternalRow> {
 
     private ArrayList<Integer> values;
     private int currentIndex = 0;
 
-    private DatasetMicroBatchPartitionReaderMock() {
+    private DatasetPartitionReaderMock() {
       for (int i = 0; i < 1000; i++){
         values.add(i);
       }


Mime
View raw message