beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From echauc...@apache.org
Subject [beam] 24/50: Create Datasets manipulation methods
Date Fri, 04 Jan 2019 10:38:46 GMT
This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 31fb182784d86a633ad619d27dd5454ecff3291f
Author: Etienne Chauchot <echauchot@apache.org>
AuthorDate: Thu Nov 29 16:11:35 2018 +0100

    Create Datasets manipulation methods
---
 .../translation/TranslationContext.java                | 18 +++++++++++++++++-
 1 file changed, 17 insertions(+), 1 deletion(-)

diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
index a3276bf..98f77af 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
@@ -46,7 +46,9 @@ import org.apache.spark.sql.streaming.StreamingQueryException;
  */
 public class TranslationContext {
 
+  /** All the datasets of the DAG */
   private final Map<PValue, Dataset<?>> datasets;
+  /** datasets that are not used as input to other datasets (leaves of the DAG) */
   private final Set<Dataset<?>> leaves;
 
   private final SparkPipelineOptions options;
@@ -68,7 +70,7 @@ public class TranslationContext {
     this.sparkSession = SparkSession.builder().config(sparkConf).getOrCreate();
     this.options = options;
     this.datasets = new HashMap<>();
-    this.leaves = new LinkedHashSet<>();
+    this.leaves = new HashSet<>();
   }
 
   // --------------------------------------------------------------------------------------------
@@ -82,6 +84,20 @@ public class TranslationContext {
   //  Datasets methods
   // --------------------------------------------------------------------------------------------
 
+  @SuppressWarnings("unchecked")
+  public <T> Dataset<WindowedValue<T>> getDataset(PValue value) {
+    Dataset<?> dataset = datasets.get(value);
+    // assume that the Dataset is used as an input if retrieved here. So it is not a leaf anymore
+    leaves.remove(dataset);
+    return (Dataset<WindowedValue<T>>) dataset;
+  }
+
+  public <T> void putDataset(PValue value, Dataset<WindowedValue<T>> dataset) {
+    if (!datasets.containsKey(value)) {
+      datasets.put(value, dataset);
+      leaves.add(dataset);
+    }
+  }
 
   // --------------------------------------------------------------------------------------------
   //  PCollections methods


Mime
View raw message