beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aljos...@apache.org
Subject [1/2] beam git commit: [BEAM-1116] Support for new Timer API in Flink Batch runner
Date Tue, 28 Feb 2017 14:30:05 GMT
Repository: beam
Updated Branches:
  refs/heads/master 2a2337460 -> 2e072a032


[BEAM-1116] Support for new Timer API in Flink Batch runner


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e7f825cd
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e7f825cd
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e7f825cd

Branch: refs/heads/master
Commit: e7f825cd7f78250e51326f51a0015bbc96d10d84
Parents: 2a23374
Author: JingsongLi <lzljs3620320@aliyun.com>
Authored: Tue Feb 28 19:00:48 2017 +0800
Committer: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Committed: Tue Feb 28 14:13:04 2017 +0100

----------------------------------------------------------------------
 runners/flink/runner/pom.xml                    |  1 -
 .../flink/FlinkBatchTransformTranslators.java   | 16 -----
 .../functions/FlinkStatefulDoFnFunction.java    | 64 ++++++++++++++++++++
 3 files changed, 64 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/e7f825cd/runners/flink/runner/pom.xml
----------------------------------------------------------------------
diff --git a/runners/flink/runner/pom.xml b/runners/flink/runner/pom.xml
index 8cc65b0..13d5b10 100644
--- a/runners/flink/runner/pom.xml
+++ b/runners/flink/runner/pom.xml
@@ -55,7 +55,6 @@
                   <groups>org.apache.beam.sdk.testing.RunnableOnService</groups>
                   <excludedGroups>
                     org.apache.beam.sdk.testing.FlattenWithHeterogeneousCoders,
-                    org.apache.beam.sdk.testing.UsesTimersInParDo,
                     org.apache.beam.sdk.testing.UsesSplittableParDo,
                     org.apache.beam.sdk.testing.UsesAttemptedMetrics,
                     org.apache.beam.sdk.testing.UsesCommittedMetrics

http://git-wip-us.apache.org/repos/asf/beam/blob/e7f825cd/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java
index ed2f4aa..30e9d68 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java
@@ -498,20 +498,6 @@ class FlinkBatchTransformTranslators {
     }
   }
 
-  private static void rejectTimers(DoFn<?, ?> doFn) {
-    DoFnSignature signature = DoFnSignatures.getSignature(doFn.getClass());
-
-    if (signature.timerDeclarations().size() > 0) {
-      throw new UnsupportedOperationException(
-          String.format(
-              "Found %s annotations on %s, but %s cannot yet be used with timers in the %s.",
-              DoFn.TimerId.class.getSimpleName(),
-              doFn.getClass().getName(),
-              DoFn.class.getSimpleName(),
-              FlinkRunner.class.getSimpleName()));
-    }
-  }
-
   private static class ParDoBoundTranslatorBatch<InputT, OutputT>
       implements FlinkBatchPipelineTranslator.BatchTransformTranslator<
           ParDo.Bound<InputT, OutputT>> {
@@ -524,7 +510,6 @@ class FlinkBatchTransformTranslators {
         FlinkBatchTranslationContext context) {
       DoFn<InputT, OutputT> doFn = transform.getFn();
       rejectSplittable(doFn);
-      rejectTimers(doFn);
 
       DataSet<WindowedValue<InputT>> inputDataSet =
           context.getInputDataSet(context.getInput(transform));
@@ -597,7 +582,6 @@ class FlinkBatchTransformTranslators {
         FlinkBatchTranslationContext context) {
       DoFn<InputT, OutputT> doFn = transform.getFn();
       rejectSplittable(doFn);
-      rejectTimers(doFn);
       DataSet<WindowedValue<InputT>> inputDataSet =
           context.getInputDataSet(context.getInput(transform));
 

http://git-wip-us.apache.org/repos/asf/beam/blob/e7f825cd/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java
index fca7691..0d8399e 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java
@@ -17,19 +17,26 @@
  */
 package org.apache.beam.runners.flink.translation.functions;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
+
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.Map;
 import org.apache.beam.runners.core.DoFnRunner;
 import org.apache.beam.runners.core.DoFnRunners;
 import org.apache.beam.runners.core.InMemoryStateInternals;
+import org.apache.beam.runners.core.InMemoryTimerInternals;
 import org.apache.beam.runners.core.StateInternals;
+import org.apache.beam.runners.core.StateNamespace;
+import org.apache.beam.runners.core.StateNamespaces;
+import org.apache.beam.runners.core.TimerInternals;
 import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
 import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.values.KV;
@@ -39,6 +46,7 @@ import org.apache.flink.api.common.functions.RichGroupReduceFunction;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.util.Collector;
+import org.joda.time.Instant;
 
 /**
  * A {@link RichGroupReduceFunction} for stateful {@link ParDo} in Flink Batch Runner.
@@ -93,6 +101,14 @@ public class FlinkStatefulDoFnFunction<K, V, OutputT>
     final K key = currentValue.getValue().getKey();
 
     final InMemoryStateInternals<K> stateInternals = InMemoryStateInternals.forKey(key);
+
+    // Used with Batch, we know that all the data is available for this key. We can't use the
+    // timer manager from the context because it doesn't exist. So we create one and advance
+    // time to the end after processing all elements.
+    final InMemoryTimerInternals timerInternals = new InMemoryTimerInternals();
+    timerInternals.advanceProcessingTime(Instant.now());
+    timerInternals.advanceSynchronizedProcessingTime(Instant.now());
+
     DoFnRunner<KV<K, V>, OutputT> doFnRunner = DoFnRunners.simpleRunner(
         serializedOptions.getPipelineOptions(), dofn,
         new FlinkSideInputReader(sideInputs, runtimeContext),
@@ -105,6 +121,10 @@ public class FlinkStatefulDoFnFunction<K, V, OutputT>
           public StateInternals<?> stateInternals() {
             return stateInternals;
           }
+          @Override
+          public TimerInternals timerInternals() {
+            return timerInternals;
+          }
         },
         new FlinkAggregatorFactory(runtimeContext),
         windowingStrategy);
@@ -117,9 +137,53 @@ public class FlinkStatefulDoFnFunction<K, V, OutputT>
       doFnRunner.processElement(currentValue);
     }
 
+    // Finish any pending windows by advancing the input watermark to infinity.
+    timerInternals.advanceInputWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE);
+
+    // Finally, advance the processing time to infinity to fire any timers.
+    timerInternals.advanceProcessingTime(BoundedWindow.TIMESTAMP_MAX_VALUE);
+    timerInternals.advanceSynchronizedProcessingTime(BoundedWindow.TIMESTAMP_MAX_VALUE);
+
+    fireEligibleTimers(timerInternals, doFnRunner);
+
     doFnRunner.finishBundle();
   }
 
+  private void fireEligibleTimers(
+      InMemoryTimerInternals timerInternals, DoFnRunner<KV<K, V>, OutputT> runner)
+      throws Exception {
+
+    while (true) {
+
+      TimerInternals.TimerData timer;
+      boolean hasFired = false;
+
+      while ((timer = timerInternals.removeNextEventTimer()) != null) {
+        hasFired = true;
+        fireTimer(timer, runner);
+      }
+      while ((timer = timerInternals.removeNextProcessingTimer()) != null) {
+        hasFired = true;
+        fireTimer(timer, runner);
+      }
+      while ((timer = timerInternals.removeNextSynchronizedProcessingTimer()) != null) {
+        hasFired = true;
+        fireTimer(timer, runner);
+      }
+      if (!hasFired) {
+        break;
+      }
+    }
+  }
+
+  private void fireTimer(
+      TimerInternals.TimerData timer, DoFnRunner<KV<K, V>, OutputT> doFnRunner) {
+    StateNamespace namespace = timer.getNamespace();
+    checkArgument(namespace instanceof StateNamespaces.WindowNamespace);
+    BoundedWindow window = ((StateNamespaces.WindowNamespace) namespace).getWindow();
+    doFnRunner.onTimer(timer.getTimerId(), window, timer.getTimestamp(), timer.getDomain());
+  }
+
   @Override
   public void open(Configuration parameters) throws Exception {
     doFnInvoker = DoFnInvokers.invokerFor(dofn);


Mime
View raw message