beam-commits mailing list archives

From echauc...@apache.org
Subject [beam] 28/45: Remove the mapPartition that adds a key per partition because otherwise spark will reduce values per key instead of globally
Date Tue, 09 Jul 2019 13:18:45 GMT
This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 6450bda372a61b9994444ac9f1ce1fdbbc952def
Author: Etienne Chauchot <echauchot@apache.org>
AuthorDate: Wed Jul 3 16:52:57 2019 +0200

    Remove the mapPartition that adds a key per partition because otherwise spark will reduce
    values per key instead of globally
---
 .../translation/batch/AggregatorCombinerGlobally.java |  5 ++---
 .../batch/CombineGloballyTranslatorBatch.java         | 19 ++-----------------
 2 files changed, 4 insertions(+), 20 deletions(-)
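For context on the behaviour described in the commit message: Spark's groupByKey reduces per distinct key, so keying elements with a random value per partition produces one partial result per key, whereas keying every element with the same constant collapses the dataset into a single group and the reduction becomes global. A minimal self-contained sketch of the constant-key behaviour against plain Spark (illustrative example only, not code from this commit):

    import org.apache.spark.api.java.function.MapFunction;
    import org.apache.spark.api.java.function.ReduceFunction;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Encoders;
    import org.apache.spark.sql.SparkSession;

    public class GlobalReduceSketch {
      public static void main(String[] args) {
        SparkSession spark =
            SparkSession.builder().master("local[2]").appName("sketch").getOrCreate();
        Dataset<Long> input = spark.range(100); // 0..99, spread over several partitions

        // Constant key: a single group, hence a single globally reduced value.
        // With a random key per partition (the removed mapPartitions approach),
        // reduceGroups would instead emit one partial sum per key.
        long globalSum = input
            .groupByKey((MapFunction<Long, Integer>) v -> 1, Encoders.INT())
            .reduceGroups((ReduceFunction<Long>) (a, b) -> a + b)
            .collectAsList().get(0)._2();

        System.out.println(globalSum); // 4950
        spark.stop();
      }
    }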

diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/AggregatorCombinerGlobally.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/AggregatorCombinerGlobally.java
index 771e42c..d4c35d8 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/AggregatorCombinerGlobally.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/AggregatorCombinerGlobally.java
@@ -46,7 +46,7 @@ import scala.Tuple2;
  * */
 
 class AggregatorCombinerGlobally<InputT, AccumT, OutputT, W extends BoundedWindow>
-    extends Aggregator<Tuple2<Integer, WindowedValue<InputT>>, Iterable<WindowedValue<AccumT>>, Iterable<WindowedValue<OutputT>>> {
+    extends Aggregator<WindowedValue<InputT>, Iterable<WindowedValue<AccumT>>, Iterable<WindowedValue<OutputT>>> {
 
   private final Combine.CombineFn<InputT, AccumT, OutputT> combineFn;
   private WindowingStrategy<InputT, W> windowingStrategy;
@@ -63,9 +63,8 @@ class AggregatorCombinerGlobally<InputT, AccumT, OutputT, W extends BoundedWindo
   }
 
   @Override public Iterable<WindowedValue<AccumT>> reduce(Iterable<WindowedValue<AccumT>> accumulators,
-      Tuple2<Integer, WindowedValue<InputT>> inputTuple) {
+      WindowedValue<InputT> input) {
 
-    WindowedValue<InputT> input = inputTuple._2;
     //concatenate accumulators windows and input windows and merge the windows
     Collection<W> inputWindows = (Collection<W>)input.getWindows();
     Set<W> windows = collectAccumulatorsWindows(accumulators);
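AggregatorCombinerGlobally extends Spark's typed Aggregator<IN, BUF, OUT>, whose reduce() is handed one input element at a time; with the Tuple2 wrapper gone, reduce() receives the WindowedValue directly instead of unwrapping inputTuple._2. For illustration, a minimal standalone Aggregator showing that contract (a hypothetical example, not the Beam class):

    import org.apache.spark.sql.Encoder;
    import org.apache.spark.sql.Encoders;
    import org.apache.spark.sql.expressions.Aggregator;

    // Sums longs: reduce() folds one input element into the buffer,
    // merge() combines partial buffers across partitions, and
    // finish() turns the final buffer into the output.
    class LongSum extends Aggregator<Long, Long, Long> {
      @Override public Long zero() { return 0L; }
      @Override public Long reduce(Long buffer, Long input) { return buffer + input; }
      @Override public Long merge(Long b1, Long b2) { return b1 + b2; }
      @Override public Long finish(Long reduction) { return reduction; }
      @Override public Encoder<Long> bufferEncoder() { return Encoders.LONG(); }
      @Override public Encoder<Long> outputEncoder() { return Encoders.LONG(); }
    }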
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombineGloballyTranslatorBatch.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombineGloballyTranslatorBatch.java
index 1a77fc3..30101dc 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombineGloballyTranslatorBatch.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombineGloballyTranslatorBatch.java
@@ -64,23 +64,8 @@ class CombineGloballyTranslatorBatch<InputT, AccumT, OutputT>
     // the coder shipped into the data. For performance reasons
     // (avoid memory consumption and having to deserialize), we do not ship coder + data.
 
-    // We do not want to shuffle data during groupByKey, we cannot get the number of partitions for
-    // the input dataset without triggering a costly operation (conversion to rdd) so we cannot use spark Hashpartitioner
-    // so we apply a key to each input dataset partition and then trigger a GBK that should not shuffle data.
-
-    Dataset<Tuple2<Integer, WindowedValue<InputT>>> keyedDataset = inputDataset
-        .mapPartitions((MapPartitionsFunction<WindowedValue<InputT>, Tuple2<Integer, WindowedValue<InputT>>>) inputTIterator -> {
-          List<Tuple2<Integer, WindowedValue<InputT>>> result = new ArrayList<>();
-          Random random = new Random();
-          while (inputTIterator.hasNext()) {
-            result.add(Tuple2.apply(random.nextInt(), inputTIterator.next()));
-          }
-          return result.iterator();
-        }, EncoderHelpers.tuple2Encoder());
-
-    KeyValueGroupedDataset<Integer, Tuple2<Integer, WindowedValue<InputT>>> groupedDataset = keyedDataset
-        .groupByKey(
-            (MapFunction<Tuple2<Integer, WindowedValue<InputT>>, Integer>) value -> value._1(),
+    KeyValueGroupedDataset<Integer, WindowedValue<InputT>> groupedDataset = inputDataset
+        .groupByKey((MapFunction<WindowedValue<InputT>, Integer>) value -> 1,
             EncoderHelpers.genericEncoder());
 
     Dataset<Tuple2<Integer, Iterable<WindowedValue<OutputT>>>> combinedDataset = groupedDataset
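
With the constant key in place, the translator's pattern amounts to grouping everything into a single group and aggregating it. A rough sketch reusing the hypothetical LongSum aggregator above:

    // One group keyed by the constant 1; agg() applies the typed Aggregator.
    // Prints a single row: key 1 with the global sum 4950.
    input.groupByKey((MapFunction<Long, Integer>) v -> 1, Encoders.INT())
        .agg(new LongSum().toColumn())
        .show();

Grouping on a constant sends all data to one reducer, but Spark's typed Aggregator performs partial aggregation before the shuffle, so only per-partition buffers are exchanged rather than every raw element.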

