[flink] improve lifecycle of ParDoBoundWrapper
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d10ae23c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d10ae23c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d10ae23c
Branch: refs/heads/master
Commit: d10ae23c9bc9529d04d02951bfed01bbf2957773
Parents: ffbfc66
Author: Maximilian Michels <mxm@apache.org>
Authored: Mon Jun 6 12:40:50 2016 +0200
Committer: Maximilian Michels <mxm@apache.org>
Committed: Wed Jun 8 15:19:50 2016 +0200
----------------------------------------------------------------------
.../streaming/FlinkAbstractParDoWrapper.java | 18 +++++++++++-------
1 file changed, 11 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d10ae23c/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java
index bb6ed67..117303c 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java
@@ -66,15 +66,21 @@ public abstract class FlinkAbstractParDoWrapper<IN, OUTDF, OUTFL> extends RichFl
this.windowingStrategy = windowingStrategy;
}
- private void initContext(DoFn<IN, OUTDF> function, Collector<WindowedValue<OUTFL>> outCollector) {
- if (this.context == null) {
- this.context = new DoFnProcessContext(function, outCollector);
- }
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ this.doFn.startBundle(context);
+ }
+
+ @Override
+ public void close() throws Exception {
+ this.doFn.finishBundle(context);
}
@Override
public void flatMap(WindowedValue<IN> value, Collector<WindowedValue<OUTFL>> out) throws Exception {
- this.initContext(doFn, out);
+ if (this.context == null) {
+ this.context = new DoFnProcessContext(doFn, out);
+ }
// for each window the element belongs to, create a new copy here.
Collection<? extends BoundedWindow> windows = value.getWindows();
@@ -90,9 +96,7 @@ public abstract class FlinkAbstractParDoWrapper<IN, OUTDF, OUTFL> extends RichFl
private void processElement(WindowedValue<IN> value) throws Exception {
this.context.setElement(value);
- this.doFn.startBundle(context);
doFn.processElement(context);
- this.doFn.finishBundle(context);
}
private class DoFnProcessContext extends DoFn<IN, OUTDF>.ProcessContext {
|