beam-commits mailing list archives

From echauc...@apache.org
Subject [beam] 30/45: Now that there is only Combine.PerKey translation, make only one Aggregator
Date Tue, 09 Jul 2019 13:18:47 GMT
This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit f05e17c49ad4b3c004328ea18505ed7848ffa318
Author: Etienne Chauchot <echauchot@apache.org>
AuthorDate: Thu Jul 4 11:20:12 2019 +0200

    Now that there is only Combine.PerKey translation, make only one Aggregator
---
 ...mbinerGlobally.java => AggregatorCombiner.java} | 20 ++++++-----
 .../batch/CombinePerKeyTranslatorBatch.java        | 41 +++++++++++++---------
 2 files changed, 35 insertions(+), 26 deletions(-)

diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/AggregatorCombinerGlobally.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/AggregatorCombiner.java
similarity index 92%
rename from runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/AggregatorCombinerGlobally.java
rename to runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/AggregatorCombiner.java
index d4c35d8..c61d937 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/AggregatorCombinerGlobally.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/AggregatorCombiner.java
@@ -33,6 +33,7 @@ import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.WindowingStrategy;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables;
 import org.apache.spark.sql.Encoder;
@@ -45,14 +46,14 @@ import scala.Tuple2;
  * The accumulator is a {@code Iterable<WindowedValue<AccumT>>} because an {@code InputT} can be in multiple windows. So, when accumulating {@code InputT} values, we create one accumulator per input window.
  * */
 
-class AggregatorCombinerGlobally<InputT, AccumT, OutputT, W extends BoundedWindow>
-    extends Aggregator<WindowedValue<InputT>, Iterable<WindowedValue<AccumT>>, Iterable<WindowedValue<OutputT>>> {
+class AggregatorCombiner<K, InputT, AccumT, OutputT, W extends BoundedWindow>
+    extends Aggregator<WindowedValue<KV<K, InputT>>, Iterable<WindowedValue<AccumT>>, Iterable<WindowedValue<OutputT>>> {
 
   private final Combine.CombineFn<InputT, AccumT, OutputT> combineFn;
   private WindowingStrategy<InputT, W> windowingStrategy;
   private TimestampCombiner timestampCombiner;
 
-  public AggregatorCombinerGlobally(Combine.CombineFn<InputT, AccumT, OutputT> combineFn, WindowingStrategy<?, ?> windowingStrategy) {
+  public AggregatorCombiner(Combine.CombineFn<InputT, AccumT, OutputT> combineFn, WindowingStrategy<?, ?> windowingStrategy) {
     this.combineFn = combineFn;
     this.windowingStrategy = (WindowingStrategy<InputT, W>) windowingStrategy;
     this.timestampCombiner = windowingStrategy.getTimestampCombiner();
@@ -63,10 +64,11 @@ class AggregatorCombinerGlobally<InputT, AccumT, OutputT, W extends BoundedWindo
   }
 
   @Override public Iterable<WindowedValue<AccumT>> reduce(Iterable<WindowedValue<AccumT>> accumulators,
-      WindowedValue<InputT> input) {
+      WindowedValue<KV<K, InputT>> inputWv) {
 
+    KV<K, InputT> inputKv = inputWv.getValue();
     //concatenate accumulators windows and input windows and merge the windows
-    Collection<W> inputWindows = (Collection<W>)input.getWindows();
+    Collection<W> inputWindows = (Collection<W>)inputWv.getWindows();
     Set<W> windows = collectAccumulatorsWindows(accumulators);
     windows.addAll(inputWindows);
     Map<W, W> windowToMergeResult;
@@ -86,17 +88,17 @@ class AggregatorCombinerGlobally<InputT, AccumT, OutputT, W extends BoundedWindo
       Tuple2<AccumT, Instant> accumAndInstant = windowToAccumAndInstant.get(mergedWindow);
       // if there is no accumulator associated with this window yet, create one
       if (accumAndInstant == null) {
-        AccumT accum = combineFn.addInput(combineFn.createAccumulator(), input.getValue());
+        AccumT accum = combineFn.addInput(combineFn.createAccumulator(), inputKv.getValue());
         Instant windowTimestamp =
             timestampCombiner.assign(
-                mergedWindow, windowingStrategy.getWindowFn().getOutputTime(input.getTimestamp(), mergedWindow));
+                mergedWindow, windowingStrategy.getWindowFn().getOutputTime(inputWv.getTimestamp(), mergedWindow));
         accumAndInstant = new Tuple2<>(accum, windowTimestamp);
       } else {
         AccumT updatedAccum =
-            combineFn.addInput(accumAndInstant._1, input.getValue());
+            combineFn.addInput(accumAndInstant._1, inputKv.getValue());
         Instant updatedTimestamp = timestampCombiner.combine(accumAndInstant._2, timestampCombiner
             .assign(mergedWindow,
-                windowingStrategy.getWindowFn().getOutputTime(input.getTimestamp(), mergedWindow)));
+                windowingStrategy.getWindowFn().getOutputTime(inputWv.getTimestamp(), mergedWindow)));
         accumAndInstant = new Tuple2<>(updatedAccum, updatedTimestamp);
       }
       windowToAccumAndInstant.put(mergedWindow, accumAndInstant);
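
The javadoc above captures the central design decision: a single input element can belong to several windows, so the aggregation buffer keeps one partial accumulator per merged window rather than a single global one. A minimal standalone sketch of that idea, in plain Java, with hypothetical String window names and an integer sum standing in for AccumT (not part of this commit):

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PerWindowAccumSketch {
  public static void main(String[] args) {
    // Buffer keyed by window, standing in for Iterable<WindowedValue<AccumT>>.
    Map<String, Integer> accums = new HashMap<>();

    // One input element assigned to two windows, as a WindowFn may do.
    int value = 5;
    List<String> windows = Arrays.asList("w1", "w2");

    // As in reduce(): create an accumulator for a window on first use,
    // otherwise add the input into the existing one.
    for (String window : windows) {
      accums.merge(window, value, Integer::sum);
    }
    System.out.println(accums); // {w1=5, w2=5}
  }
}
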
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombinePerKeyTranslatorBatch.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombinePerKeyTranslatorBatch.java
index 1c35301..12e5944 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombinePerKeyTranslatorBatch.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombinePerKeyTranslatorBatch.java
@@ -17,18 +17,20 @@
  */
 package org.apache.beam.runners.spark.structuredstreaming.translation.batch;
 
+import java.util.ArrayList;
+import java.util.List;
 import org.apache.beam.runners.spark.structuredstreaming.translation.TransformTranslator;
 import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext;
-import org.apache.beam.runners.spark.structuredstreaming.translation.batch.functions.AggregatorCombinerPerKey;
 import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.EncoderHelpers;
 import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.KVHelpers;
-import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.WindowingHelpers;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.spark.api.java.function.MapFunction;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.spark.api.java.function.FlatMapFunction;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.KeyValueGroupedDataset;
 import scala.Tuple2;
@@ -50,25 +52,30 @@ class CombinePerKeyTranslatorBatch<K, InputT, AccumT, OutputT>
     @SuppressWarnings("unchecked")
     final Combine.CombineFn<InputT, AccumT, OutputT> combineFn =
         (Combine.CombineFn<InputT, AccumT, OutputT>) combineTransform.getFn();
+    WindowingStrategy<?, ?> windowingStrategy = input.getWindowingStrategy();
 
     Dataset<WindowedValue<KV<K, InputT>>> inputDataset = context.getDataset(input);
 
-    Dataset<KV<K, InputT>> unwindowedDataset = inputDataset
-        .map(WindowingHelpers.unwindowMapFunction(), EncoderHelpers.kvEncoder());
-    KeyValueGroupedDataset<K, KV<K, InputT>> groupedDataset =
-        unwindowedDataset.groupByKey((MapFunction<KV<K, InputT>, K>) kv -> kv.getKey(), EncoderHelpers.genericEncoder());
+    KeyValueGroupedDataset<K, WindowedValue<KV<K, InputT>>> groupedDataset = inputDataset
+        .groupByKey(KVHelpers.extractKey(), EncoderHelpers.genericEncoder());
 
-    Dataset<Tuple2<K, OutputT>> combinedDataset =
-        groupedDataset.agg(
-            new AggregatorCombinerPerKey<K, InputT, AccumT, OutputT>(combineFn).toColumn());
+    Dataset<Tuple2<K, Iterable<WindowedValue<OutputT>>>> combinedDataset = groupedDataset.agg(
+        new AggregatorCombiner<K, InputT, AccumT, OutputT, BoundedWindow>(combineFn,
+            windowingStrategy).toColumn());
 
-    Dataset<KV<K, OutputT>> kvOutputDataset =
-        combinedDataset.map(KVHelpers.tuple2ToKV(), EncoderHelpers.kvEncoder());
-
-    // Window the result into global window.
-    Dataset<WindowedValue<KV<K, OutputT>>> outputDataset =
-        kvOutputDataset.map(
-            WindowingHelpers.windowMapFunction(), EncoderHelpers.windowedValueEncoder());
+    Dataset<WindowedValue<KV<K, OutputT>>> outputDataset = combinedDataset.flatMap(
+        (FlatMapFunction<Tuple2<K, Iterable<WindowedValue<OutputT>>>, WindowedValue<KV<K, OutputT>>>) tuple2 -> {
+          K key = tuple2._1;
+          Iterable<WindowedValue<OutputT>> windowedValues = tuple2._2;
+          List<WindowedValue<KV<K, OutputT>>> result = new ArrayList<>();
+          for (WindowedValue<OutputT> windowedValue : windowedValues) {
+            KV<K, OutputT> kv = KV.of(key, windowedValue.getValue());
+            result.add(WindowedValue
+                .of(kv, windowedValue.getTimestamp(), windowedValue.getWindows(),
+                    windowedValue.getPane()));
+          }
+          return result.iterator();
+        }, EncoderHelpers.windowedValueEncoder());
     context.putDataset(output, outputDataset);
   }
 }
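
For context, the rewritten translator follows Spark's typed Aggregator pattern: group the Dataset by key with groupByKey, then apply a single Aggregator via agg(aggregator.toColumn()). A minimal self-contained sketch of that pattern, using a hypothetical SumAggregator over (String, Integer) pairs in place of AggregatorCombiner (assumes Spark SQL 2.x on the classpath; not part of this commit):

import java.util.Arrays;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.KeyValueGroupedDataset;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.expressions.Aggregator;
import scala.Tuple2;

public class PerKeyAggregatorSketch {

  // Trivial stand-in for AggregatorCombiner: a per-key integer sum.
  static class SumAggregator extends Aggregator<Tuple2<String, Integer>, Integer, Integer> {
    @Override public Integer zero() { return 0; }
    @Override public Integer reduce(Integer acc, Tuple2<String, Integer> in) { return acc + in._2; }
    @Override public Integer merge(Integer a, Integer b) { return a + b; }
    @Override public Integer finish(Integer acc) { return acc; }
    @Override public Encoder<Integer> bufferEncoder() { return Encoders.INT(); }
    @Override public Encoder<Integer> outputEncoder() { return Encoders.INT(); }
  }

  public static void main(String[] args) {
    SparkSession spark =
        SparkSession.builder().master("local[2]").appName("per-key-agg-sketch").getOrCreate();

    Dataset<Tuple2<String, Integer>> input = spark.createDataset(
        Arrays.asList(new Tuple2<>("a", 1), new Tuple2<>("a", 2), new Tuple2<>("b", 3)),
        Encoders.tuple(Encoders.STRING(), Encoders.INT()));

    // Group by key, then apply the single Aggregator, mirroring
    // CombinePerKeyTranslatorBatch: groupByKey(...).agg(aggregator.toColumn()).
    KeyValueGroupedDataset<String, Tuple2<String, Integer>> grouped = input.groupByKey(
        (MapFunction<Tuple2<String, Integer>, String>) t -> t._1, Encoders.STRING());

    Dataset<Tuple2<String, Integer>> sums = grouped.agg(new SumAggregator().toColumn());
    sums.show(); // (a, 3), (b, 3)
    spark.stop();
  }
}

The flatMap at the end of the real translator then re-attaches the key and window metadata to each per-window output value, which the toy sum above has no need for.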

