beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From echauc...@apache.org
Subject [beam] 12/45: re-enable reduceFnRunner timers for output
Date Tue, 09 Jul 2019 13:18:29 GMT
This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 6c9a3a8cdf4c3b2c91fdbe50153ffed8a080d4db
Author: Etienne Chauchot <echauchot@apache.org>
AuthorDate: Tue May 28 09:31:28 2019 +0200

    re-enable reduceFnRunner timers for output
---
 .../batch/functions/GroupAlsoByWindowViaOutputBufferFn.java       | 8 +++-----
 1 file changed, 3 insertions(+), 5 deletions(-)

diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/functions/GroupAlsoByWindowViaOutputBufferFn.java
b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/functions/GroupAlsoByWindowViaOutputBufferFn.java
index 2fb08f5..cc65716 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/functions/GroupAlsoByWindowViaOutputBufferFn.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/functions/GroupAlsoByWindowViaOutputBufferFn.java
@@ -27,6 +27,7 @@ import org.apache.beam.runners.core.ReduceFnRunner;
 import org.apache.beam.runners.core.StateInternals;
 import org.apache.beam.runners.core.StateInternalsFactory;
 import org.apache.beam.runners.core.SystemReduceFn;
+import org.apache.beam.runners.core.TimerInternals;
 import org.apache.beam.runners.core.UnsupportedSideInputReader;
 import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
 import org.apache.beam.runners.core.construction.TriggerTranslation;
@@ -100,21 +101,18 @@ public class GroupAlsoByWindowViaOutputBufferFn<K, InputT, W extends
BoundedWind
     // Finish any pending windows by advancing the input watermark to infinity.
     timerInternals.advanceInputWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE);
 
-    // not supported yet
-/*
     // Finally, advance the processing time to infinity to fire any timers.
     timerInternals.advanceProcessingTime(BoundedWindow.TIMESTAMP_MAX_VALUE);
     timerInternals.advanceSynchronizedProcessingTime(BoundedWindow.TIMESTAMP_MAX_VALUE);
 
     fireEligibleTimers(timerInternals, reduceFnRunner);
-*/
 
     reduceFnRunner.persist();
 
     return outputter.getOutputs().iterator();
   }
 
-/*  private void fireEligibleTimers(
+  private void fireEligibleTimers(
       InMemoryTimerInternals timerInternals,
       ReduceFnRunner<K, InputT, Iterable<InputT>, W> reduceFnRunner)
       throws Exception {
@@ -136,7 +134,7 @@ public class GroupAlsoByWindowViaOutputBufferFn<K, InputT, W extends
BoundedWind
       reduceFnRunner.onTimers(timers);
       timers.clear();
     }
-  }*/
+  }
 
   private static class GABWOutputWindowedValue<K, V>
       implements OutputWindowedValue<KV<K, Iterable<V>>> {


Mime
View raw message