beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From echauc...@apache.org
Subject [beam] 02/03: Fix case when a window does not merge into any other window
Date Thu, 27 Jun 2019 15:03:19 GMT
This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 11c37929024fc70667d203b05598e15f5f022e4d
Author: Etienne Chauchot <echauchot@apache.org>
AuthorDate: Thu Jun 27 16:59:47 2019 +0200

    Fix case when a window does not merge into any other window
---
 .../translation/batch/AggregatorCombinerGlobally.java                   | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/AggregatorCombinerGlobally.java
b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/AggregatorCombinerGlobally.java
index c516629..884f51e 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/AggregatorCombinerGlobally.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/AggregatorCombinerGlobally.java
@@ -133,6 +133,8 @@ class AggregatorCombinerGlobally<InputT, AccumT, OutputT, W extends
BoundedWindo
       //each accumulator has only one window
       BoundedWindow accumulatorWindow = accumulator.getWindows().iterator().next();
       W mergedWindowForAccumulator = windowToMergeResult.get(accumulatorWindow);
+      mergedWindowForAccumulator = (mergedWindowForAccumulator == null) ? (W)accumulatorWindow
: mergedWindowForAccumulator;
+
       if (mergedWindowToAccumulators.get(mergedWindowForAccumulator) == null){
         mergedWindowToAccumulators.put(mergedWindowForAccumulator, Collections.singletonList(accumulator));
       }


Mime
View raw message