beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ieme...@apache.org
Subject [1/2] beam git commit: Shutdown Flink Streaming Pipeline when reaching +Inf watermark
Date Wed, 07 Jun 2017 21:18:55 GMT
Repository: beam
Updated Branches:
  refs/heads/master caecac3b4 -> 32f22b7d9


Shutdown Flink Streaming Pipeline when reaching +Inf watermark


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9c83ffe0
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9c83ffe0
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9c83ffe0

Branch: refs/heads/master
Commit: 9c83ffe0cdc6636d2187bf9439a73a3b45756d50
Parents: caecac3
Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Authored: Mon Jun 5 12:19:00 2017 +0200
Committer: Ismaël Mejía <iemejia@apache.org>
Committed: Wed Jun 7 23:13:52 2017 +0200

----------------------------------------------------------------------
 .../wrappers/streaming/io/UnboundedSourceWrapper.java           | 5 +++++
 1 file changed, 5 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/9c83ffe0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
index 6055a43..e75072a 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
@@ -31,6 +31,7 @@ import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.SerializableCoder;
 import org.apache.beam.sdk.io.UnboundedSource;
 import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.WindowedValue;
@@ -436,6 +437,10 @@ public class UnboundedSourceWrapper<
           }
         }
         context.emitWatermark(new Watermark(watermarkMillis));
+
+        if (watermarkMillis >= BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) {
+          this.isRunning = false;
+        }
       }
       setNextWatermarkTimer(this.runtimeContext);
     }


Mime
View raw message