beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From echauc...@apache.org
Subject [beam] 24/37: Apply new Encoders to CombinePerKey
Date Thu, 24 Oct 2019 10:18:28 GMT
This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 7d456b42c1bafef6eab281dc2ed2dd098f8bda6a
Author: Etienne Chauchot <echauchot@apache.org>
AuthorDate: Fri Sep 6 13:24:18 2019 +0200

    Apply new Encoders to CombinePerKey
---
 .../translation/batch/CombinePerKeyTranslatorBatch.java     | 13 +++++++++++--
 1 file changed, 11 insertions(+), 2 deletions(-)

diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombinePerKeyTranslatorBatch.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombinePerKeyTranslatorBatch.java
index e0e80dd..33b037a 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombinePerKeyTranslatorBatch.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombinePerKeyTranslatorBatch.java
@@ -23,6 +23,8 @@ import org.apache.beam.runners.spark.structuredstreaming.translation.TransformTr
 import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext;
 import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.EncoderHelpers;
 import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.KVHelpers;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -56,8 +58,11 @@ class CombinePerKeyTranslatorBatch<K, InputT, AccumT, OutputT>
 
     Dataset<WindowedValue<KV<K, InputT>>> inputDataset = context.getDataset(input);
 
+    Coder<K> keyCoder = (Coder<K>) input.getCoder().getCoderArguments().get(0);
+    Coder<OutputT> outputTCoder = (Coder<OutputT>) output.getCoder().getCoderArguments().get(1);
+
     KeyValueGroupedDataset<K, WindowedValue<KV<K, InputT>>> groupedDataset =
-        inputDataset.groupByKey(KVHelpers.extractKey(), EncoderHelpers.genericEncoder());
+        inputDataset.groupByKey(KVHelpers.extractKey(), EncoderHelpers.fromBeamCoder(keyCoder));
 
     Dataset<Tuple2<K, Iterable<WindowedValue<OutputT>>>> combinedDataset =
         groupedDataset.agg(
@@ -66,6 +71,10 @@ class CombinePerKeyTranslatorBatch<K, InputT, AccumT, OutputT>
                 .toColumn());
 
     // expand the list into separate elements and put the key back into the elements
+    Coder<KV<K, OutputT>> kvCoder = KvCoder.of(keyCoder, outputTCoder);
+    WindowedValue.WindowedValueCoder<KV<K, OutputT>> wvCoder =
+        WindowedValue.FullWindowedValueCoder.of(
+            kvCoder, input.getWindowingStrategy().getWindowFn().windowCoder());
     Dataset<WindowedValue<KV<K, OutputT>>> outputDataset =
         combinedDataset.flatMap(
             (FlatMapFunction<
@@ -85,7 +94,7 @@ class CombinePerKeyTranslatorBatch<K, InputT, AccumT, OutputT>
                   }
                   return result.iterator();
                 },
-            EncoderHelpers.windowedValueEncoder());
+            EncoderHelpers.fromBeamCoder(wvCoder));
     context.putDataset(output, outputDataset);
   }
 }


Mime
View raw message