Repository: incubator-beam
Updated Branches:
refs/heads/master ffbfc66e1 -> cc448e976
[BEAM-196] provide PipelineOptions in DoFn
- fixes NPE when accessing the PipelineOptions
- adds a test to verify that the PipelineOptions are available
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/eced106e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/eced106e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/eced106e
Branch: refs/heads/master
Commit: eced106e50ddb257524a7826ab7d27254be89da8
Parents: d10ae23
Author: Maximilian Michels <mxm@apache.org>
Authored: Tue Jun 7 13:57:33 2016 +0200
Committer: Maximilian Michels <mxm@apache.org>
Committed: Wed Jun 8 15:19:50 2016 +0200
----------------------------------------------------------------------
.../streaming/FlinkAbstractParDoWrapper.java | 11 ++-
.../beam/runners/flink/PipelineOptionsTest.java | 97 +++++++++++++++++++-
2 files changed, 100 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/eced106e/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java
index 117303c..a935011 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.runners.flink.translation.wrappers.streaming;
+import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
import org.apache.beam.runners.flink.translation.wrappers.SerializableFnAggregatorWrapper;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.Aggregator;
@@ -37,6 +38,7 @@ import com.google.common.base.Preconditions;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.accumulators.AccumulatorHelper;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;
import org.joda.time.Instant;
import org.joda.time.format.PeriodFormat;
@@ -52,7 +54,7 @@ public abstract class FlinkAbstractParDoWrapper<IN, OUTDF, OUTFL> extends RichFl
private final DoFn<IN, OUTDF> doFn;
private final WindowingStrategy<?, ?> windowingStrategy;
- private transient PipelineOptions options;
+ private final SerializedPipelineOptions serializedPipelineOptions;
private DoFnProcessContext context;
@@ -62,7 +64,7 @@ public abstract class FlinkAbstractParDoWrapper<IN, OUTDF, OUTFL> extends RichFl
Preconditions.checkNotNull(doFn);
this.doFn = doFn;
- this.options = options;
+ this.serializedPipelineOptions = new SerializedPipelineOptions(options);
this.windowingStrategy = windowingStrategy;
}
@@ -107,7 +109,8 @@ public abstract class FlinkAbstractParDoWrapper<IN, OUTDF, OUTFL> extends RichFl
private WindowedValue<IN> element;
- private DoFnProcessContext(DoFn<IN, OUTDF> function, Collector<WindowedValue<OUTFL>> outCollector) {
+ private DoFnProcessContext(DoFn<IN, OUTDF> function,
+ Collector<WindowedValue<OUTFL>> outCollector) {
function.super();
super.setupDelegateAggregators();
@@ -156,7 +159,7 @@ public abstract class FlinkAbstractParDoWrapper<IN, OUTDF, OUTFL> extends RichFl
@Override
public PipelineOptions getPipelineOptions() {
- return options;
+ return serializedPipelineOptions.getPipelineOptions();
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/eced106e/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
index 464c6df..d571f31 100644
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
+++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
@@ -18,14 +18,29 @@
package org.apache.beam.runners.flink;
import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
+import org.apache.beam.runners.flink.translation.wrappers.streaming.FlinkAbstractParDoWrapper;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingInternals;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.commons.lang.SerializationUtils;
+import org.apache.flink.util.Collector;
+import org.joda.time.Instant;
+import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
+import org.mockito.Mockito;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
/**
* Tests the serialization and deserialization of PipelineOptions.
@@ -58,11 +73,85 @@ public class PipelineOptionsTest {
@Test
public void testCaching() {
- MyOptions deserializedOptions = serializedOptions.getPipelineOptions().as(MyOptions.class);
+ PipelineOptions deserializedOptions = serializedOptions.getPipelineOptions().as(PipelineOptions.class);
assertNotNull(deserializedOptions);
- assertEquals(deserializedOptions, serializedOptions.getPipelineOptions());
- assertEquals(deserializedOptions, serializedOptions.getPipelineOptions());
- assertEquals(deserializedOptions, serializedOptions.getPipelineOptions());
+ assertTrue(deserializedOptions == serializedOptions.getPipelineOptions());
+ assertTrue(deserializedOptions == serializedOptions.getPipelineOptions());
+ assertTrue(deserializedOptions == serializedOptions.getPipelineOptions());
+ }
+
+ @Test(expected = Exception.class)
+ public void testNonNull() {
+ new SerializedPipelineOptions(null);
+ }
+
+ @Test(expected = Exception.class)
+ public void ParDoBaseClassPipelineOptionsNullTest() {
+ new TestParDoWrapper(null, WindowingStrategy.globalDefault(), new TestDoFn());
+ }
+
+ /**
+ * Tests that PipelineOptions are present after serialization
+ */
+ @Test
+ public void ParDoBaseClassPipelineOptionsSerializationTest() throws Exception {
+ TestParDoWrapper wrapper =
+ new TestParDoWrapper(options, WindowingStrategy.globalDefault(), new TestDoFn());
+
+ final byte[] serialized = SerializationUtils.serialize(wrapper);
+ TestParDoWrapper deserialize = (TestParDoWrapper) SerializationUtils.deserialize(serialized);
+
+ // execute once to access options
+ deserialize.flatMap(
+ WindowedValue.of(
+ new Object(),
+ Instant.now(),
+ GlobalWindow.INSTANCE,
+ PaneInfo.NO_FIRING),
+ Mockito.mock(Collector.class));
+
}
+
+ private static class TestDoFn extends DoFn<Object, Object> {
+
+ @Override
+ public void processElement(ProcessContext c) throws Exception {
+ Assert.assertNotNull(c.getPipelineOptions());
+ Assert.assertEquals(
+ options.getTestOption(),
+ c.getPipelineOptions().as(MyOptions.class).getTestOption());
+ }
+ }
+
+ private static class TestParDoWrapper extends FlinkAbstractParDoWrapper {
+ public TestParDoWrapper(PipelineOptions options, WindowingStrategy windowingStrategy, DoFn doFn) {
+ super(options, windowingStrategy, doFn);
+ }
+
+
+ @Override
+ public WindowingInternals windowingInternalsHelper(
+ WindowedValue inElement,
+ Collector outCollector) {
+ return null;
+ }
+
+ @Override
+ public void sideOutputWithTimestampHelper(
+ WindowedValue inElement,
+ Object output,
+ Instant timestamp,
+ Collector outCollector,
+ TupleTag tag) {}
+
+ @Override
+ public void outputWithTimestampHelper(
+ WindowedValue inElement,
+ Object output,
+ Instant timestamp,
+ Collector outCollector) {}
+ }
+
+
}
|