beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From echauc...@apache.org
Subject [beam] 01/37: Improve Pardo translation performance: avoid calling a filter transform when there is only one output tag
Date Thu, 24 Oct 2019 10:18:05 GMT
This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 22d6466cae94cf482f8151a5fe6e7dde68d28d58
Author: Etienne Chauchot <echauchot@apache.org>
AuthorDate: Thu Jul 18 10:58:35 2019 +0200

    Improve Pardo translation performance: avoid calling a filter transform when there is only one output tag
---
 .../translation/batch/ParDoTranslatorBatch.java              | 12 ++++++++----
 1 file changed, 8 insertions(+), 4 deletions(-)

diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java
index 46808b7..742c1b0 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java
@@ -133,10 +133,14 @@ class ParDoTranslatorBatch<InputT, OutputT>
         inputDataSet.mapPartitions(doFnWrapper, EncoderHelpers.tuple2Encoder());
     if (outputs.entrySet().size() > 1) {
       allOutputs.persist();
-    }
-
-    for (Map.Entry<TupleTag<?>, PValue> output : outputs.entrySet()) {
-      pruneOutputFilteredByTag(context, allOutputs, output);
+      for (Map.Entry<TupleTag<?>, PValue> output : outputs.entrySet()) {
+        pruneOutputFilteredByTag(context, allOutputs, output);
+      }
+    } else {
+      Dataset<WindowedValue<?>> outputDataset = allOutputs.map(
+          (MapFunction<Tuple2<TupleTag<?>, WindowedValue<?>>, WindowedValue<?>>) value -> value._2,
+          EncoderHelpers.windowedValueEncoder());
+      context.putDatasetWildcard(outputs.entrySet().iterator().next().getValue(), outputDataset);
     }
   }
 


Mime
View raw message