beam-commits mailing list archives

From: echauc...@apache.org
Subject: [beam] 46/50: Pass Beam Source and PipelineOptions to the spark DataSource as serialized strings
Date: Fri, 04 Jan 2019 10:39:08 GMT
This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 1cea29df3702ef438d8eb5964450a7bafea3c7d5
Author: Etienne Chauchot <echauchot@apache.org>
AuthorDate: Wed Jan 2 15:52:46 2019 +0100

    Pass Beam Source and PipelineOptions to the spark DataSource as serialized strings
---
 .../translation/batch/DatasetSourceBatch.java      | 41 ++++++++++++++++------
 .../batch/ReadSourceTranslatorBatch.java           | 16 +++++++--
 2 files changed, 45 insertions(+), 12 deletions(-)
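
In short, the patch stops DatasetSourceBatch from reaching into the TranslationContext and instead carries everything it needs through the DataSource's string-only options map. Below is a minimal sketch of that round-trip, using only the helpers the patch itself imports; the variables source, sparkSession and pipelineOptions are placeholders for the values the translator actually holds, not names from the patch:

    // Translator side: encode the Beam source and the pipeline options as strings
    // and hand them to the Spark DataSource through its options map.
    Map<String, String> datasetSourceOptions = new HashMap<>();
    datasetSourceOptions.put(DatasetSourceBatch.BEAM_SOURCE_OPTION,
        Base64Serializer.serializeUnchecked(source));
    datasetSourceOptions.put(DatasetSourceBatch.DEFAULT_PARALLELISM,
        String.valueOf(sparkSession.sparkContext().defaultParallelism()));
    datasetSourceOptions.put(DatasetSourceBatch.PIPELINE_OPTIONS,
        SerializablePipelineOptions.serializeToJson(pipelineOptions));

    // DataSource side: createReader(DataSourceOptions) reverses the encoding.
    BoundedSource<T> restoredSource = Base64Serializer.deserializeUnchecked(
        datasetSourceOptions.get(DatasetSourceBatch.BEAM_SOURCE_OPTION), BoundedSource.class);
    SparkPipelineOptions restoredOptions = SerializablePipelineOptions
        .deserializeFromJson(datasetSourceOptions.get(DatasetSourceBatch.PIPELINE_OPTIONS))
        .as(SparkPipelineOptions.class);
    int numPartitions =
        Integer.valueOf(datasetSourceOptions.get(DatasetSourceBatch.DEFAULT_PARALLELISM));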

diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java
index f4cd885..331e397 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java
@@ -24,8 +24,9 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
+import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
+import org.apache.beam.runners.core.serialization.Base64Serializer;
 import org.apache.beam.runners.spark.structuredstreaming.SparkPipelineOptions;
-import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext;
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
 import org.apache.beam.sdk.util.WindowedValue;
@@ -45,16 +46,38 @@ import org.apache.spark.sql.types.StructType;
  */
 public class DatasetSourceBatch<T> implements DataSourceV2, ReadSupport {
 
+  static final String BEAM_SOURCE_OPTION = "beam-source";
+  static final String DEFAULT_PARALLELISM = "default-parallelism";
+  static final String PIPELINE_OPTIONS = "pipeline-options";
   private int numPartitions;
   private Long bundleSize;
-  private TranslationContext context;
   private BoundedSource<T> source;
+  private SparkPipelineOptions sparkPipelineOptions;
 
 
-  @Override public DataSourceReader createReader(DataSourceOptions options) {
-    this.numPartitions = context.getSparkSession().sparkContext().defaultParallelism();
+  @SuppressWarnings("unchecked")
+  @Override
+  public DataSourceReader createReader(DataSourceOptions options) {
+    if (!options.get(BEAM_SOURCE_OPTION).isPresent()){
+      throw new RuntimeException("Beam source was not set in DataSource options");
+    }
+    this.source = Base64Serializer
+        .deserializeUnchecked(options.get(BEAM_SOURCE_OPTION).get(), BoundedSource.class);
+
+    if (!options.get(DEFAULT_PARALLELISM).isPresent()){
+      throw new RuntimeException("Spark default parallelism was not set in DataSource options");
+    }
+    if (!options.get(BEAM_SOURCE_OPTION).isPresent()){
+      throw new RuntimeException("Beam source was not set in DataSource options");
+    }
+    this.numPartitions = Integer.valueOf(options.get(DEFAULT_PARALLELISM).get());
     checkArgument(this.numPartitions > 0, "Number of partitions must be greater than zero.");
-    this.bundleSize = context.getOptions().getBundleSize();
+    if (!options.get(PIPELINE_OPTIONS).isPresent()){
+      throw new RuntimeException("Beam pipelineOptions were not set in DataSource options");
+    }
+    this.sparkPipelineOptions = SerializablePipelineOptions
+        .deserializeFromJson(options.get(PIPELINE_OPTIONS).get()).as(SparkPipelineOptions.class);
+    this.bundleSize = sparkPipelineOptions.getBundleSize();
     return new DatasetReader();  }
 
   /** This class can be mapped to Beam {@link BoundedSource}. */
@@ -62,7 +85,6 @@ public class DatasetSourceBatch<T> implements DataSourceV2, ReadSupport {
 
     private Optional<StructType> schema;
     private String checkpointLocation;
-    private DataSourceOptions options;
 
     @Override
     public StructType readSchema() {
@@ -73,13 +95,12 @@ public class DatasetSourceBatch<T> implements DataSourceV2, ReadSupport {
     public List<InputPartition<InternalRow>> planInputPartitions() {
       List<InputPartition<InternalRow>> result = new ArrayList<>();
       long desiredSizeBytes;
-      SparkPipelineOptions options = context.getOptions();
       try {
         desiredSizeBytes =
             (bundleSize == null)
-                ? source.getEstimatedSizeBytes(options) / numPartitions
+                ? source.getEstimatedSizeBytes(sparkPipelineOptions) / numPartitions
                 : bundleSize;
-        List<? extends BoundedSource<T>> sources = source.split(desiredSizeBytes, options);
+        List<? extends BoundedSource<T>> sources = source.split(desiredSizeBytes, sparkPipelineOptions);
         for (BoundedSource<T> source : sources) {
           result.add(
               new InputPartition<InternalRow>() {
@@ -88,7 +109,7 @@ public class DatasetSourceBatch<T> implements DataSourceV2, ReadSupport {
                 public InputPartitionReader<InternalRow> createPartitionReader() {
                   BoundedReader<T> reader = null;
                   try {
-                    reader = source.createReader(options);
+                    reader = source.createReader(sparkPipelineOptions);
                   } catch (IOException e) {
                     throw new RuntimeException(
                         "Error creating BoundedReader " + reader.getClass().getCanonicalName(),
e);
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
index d980a52..50f4915 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
@@ -18,7 +18,11 @@
 package org.apache.beam.runners.spark.structuredstreaming.translation.batch;
 
 import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
 import org.apache.beam.runners.core.construction.ReadTranslation;
+import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
+import org.apache.beam.runners.core.serialization.Base64Serializer;
 import org.apache.beam.runners.spark.structuredstreaming.translation.TransformTranslator;
 import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext;
 import org.apache.beam.sdk.io.BoundedSource;
@@ -54,7 +58,15 @@ class ReadSourceTranslatorBatch<T>
     }
     SparkSession sparkSession = context.getSparkSession();
 
-    Dataset<Row> rowDataset = sparkSession.read().format(SOURCE_PROVIDER_CLASS).load();
+    String serializedSource = Base64Serializer.serializeUnchecked(source);
+    Map<String, String> datasetSourceOptions = new HashMap<>();
+    datasetSourceOptions.put(DatasetSourceBatch.BEAM_SOURCE_OPTION, serializedSource);
+    datasetSourceOptions.put(DatasetSourceBatch.DEFAULT_PARALLELISM,
+        String.valueOf(context.getSparkSession().sparkContext().defaultParallelism()));
+    datasetSourceOptions.put(DatasetSourceBatch.PIPELINE_OPTIONS,
+        SerializablePipelineOptions.serializeToJson(context.getOptions()));
+    Dataset<Row> rowDataset = sparkSession.read().format(SOURCE_PROVIDER_CLASS).options(datasetSourceOptions)
+        .load();
 
     //TODO pass the source and the translation context serialized as string to the DatasetSource
     MapFunction<Row, WindowedValue> func = new MapFunction<Row, WindowedValue>() {
@@ -63,7 +75,7 @@ class ReadSourceTranslatorBatch<T>
         return value.<WindowedValue>getAs(0);
       }
     };
-    //TODO: is there a better way than using the raw WindowedValue? Can an Encoder<WindowedVAlue<T>>
+    //TODO: is there a better way than using the raw WindowedValue? Can an Encoder<WindowedValue<T>>
     // be created ?
     Dataset<WindowedValue> dataset = rowDataset.map(func, Encoders.kryo(WindowedValue.class));
 

