Add simple tests for stateful ParDo
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/7e158e4e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/7e158e4e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/7e158e4e
Branch: refs/heads/master
Commit: 7e158e4e583372dd79ffaa380ac7c2dbb4846c50
Parents: e17dc4a
Author: Kenneth Knowles <klk@google.com>
Authored: Mon Nov 21 15:41:27 2016 -0800
Committer: Kenneth Knowles <klk@google.com>
Committed: Mon Nov 28 11:43:21 2016 -0800
----------------------------------------------------------------------
.../apache/beam/sdk/transforms/ParDoTest.java | 106 ++++++++++++++++++-
1 file changed, 102 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7e158e4e/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index be1eaa4..593f304 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -55,6 +55,7 @@ import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.RunnableOnService;
import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.UsesStatefulParDo;
import org.apache.beam.sdk.transforms.DoFn.OnTimer;
import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
import org.apache.beam.sdk.transforms.ParDo.Bound;
@@ -1464,8 +1465,8 @@ public class ParDoTest implements Serializable {
}
@Test
- @Category(RunnableOnService.class)
- public void testValueState() {
+ @Category({RunnableOnService.class, UsesStatefulParDo.class})
+ public void testValueStateSimple() {
final String stateId = "foo";
DoFn<KV<String, Integer>, Integer> fn =
@@ -1494,8 +1495,59 @@ public class ParDoTest implements Serializable {
}
@Test
- @Category(RunnableOnService.class)
- public void testBagSTate() {
+ @Category({RunnableOnService.class, UsesStatefulParDo.class})
+ public void testValueStateSideOutput() {
+ // Checks that a stateful DoFn can emit to a side output. Each key keeps a counter in
+ // ValueState; every element emits the counter's current value (even values to the
+ // main output tagged evenTag, odd values to oddTag) and then increments the counter.
+ final String stateId = "foo";
+
+ final TupleTag<Integer> evenTag = new TupleTag<Integer>() {};
+ final TupleTag<Integer> oddTag = new TupleTag<Integer>() {};
+
+ DoFn<KV<String, Integer>, Integer> fn =
+ new DoFn<KV<String, Integer>, Integer>() {
+
+ @StateId(stateId)
+ private final StateSpec<Object, ValueState<Integer>> intState =
+ StateSpecs.value(VarIntCoder.of());
+
+ @ProcessElement
+ public void processElement(
+ ProcessContext c, @StateId(stateId) ValueState<Integer> state) {
+ // state.read() is null-checked here (presumably null before the first write
+ // for a key), so each key's counter starts at 0.
+ Integer currentValue = MoreObjects.firstNonNull(state.read(), 0);
+ // The element's value is ignored; only the per-key counter is emitted.
+ if (currentValue % 2 == 0) {
+ c.output(currentValue);
+ } else {
+ c.sideOutput(oddTag, currentValue);
+ }
+ state.write(currentValue + 1);
+ }
+ };
+
+ Pipeline p = TestPipeline.create();
+ PCollectionTuple output =
+ p.apply(
+ Create.of(
+ KV.of("hello", 42),
+ KV.of("hello", 97),
+ KV.of("hello", 84),
+ KV.of("goodbye", 33),
+ KV.of("hello", 859),
+ KV.of("goodbye", 83945)))
+ .apply(ParDo.of(fn).withOutputTags(evenTag, TupleTagList.of(oddTag)));
+
+ PCollection<Integer> evens = output.get(evenTag);
+ PCollection<Integer> odds = output.get(oddTag);
+
+ // "hello" has four elements (counters 0..3) and "goodbye" has two (counters 0..1),
+ // so the evens are 0 and 2 from "hello" and just 0 from "goodbye"
+ PAssert.that(evens).containsInAnyOrder(0, 2, 0);
+
+ // The odds are 1 and 3 from "hello" and just 1 from "goodbye"
+ PAssert.that(odds).containsInAnyOrder(1, 3, 1);
+ p.run();
+ }
+
+ @Test
+ @Category({RunnableOnService.class, UsesStatefulParDo.class})
+ public void testBagState() {
final String stateId = "foo";
DoFn<KV<String, Integer>, List<Integer>> fn =
@@ -1530,6 +1582,52 @@ public class ParDoTest implements Serializable {
}
@Test
+ @Category({RunnableOnService.class, UsesStatefulParDo.class})
+ public void testBagStateSideInput() {
+ // Checks that a stateful DoFn can also read a side input: once the per-key bag holds
+ // four elements, the DoFn emits the sorted bag contents and the sorted side-input list.
+ Pipeline p = TestPipeline.create();
+
+ final PCollectionView<List<Integer>> listView =
+ p.apply("Create list for side input", Create.of(2, 1, 0)).apply(View.<Integer>asList());
+
+ final String stateId = "foo";
+ DoFn<KV<String, Integer>, List<Integer>> fn =
+ new DoFn<KV<String, Integer>, List<Integer>>() {
+
+ @StateId(stateId)
+ private final StateSpec<Object, BagState<Integer>> bufferState =
+ StateSpecs.bag(VarIntCoder.of());
+
+ @ProcessElement
+ public void processElement(
+ ProcessContext c, @StateId(stateId) BagState<Integer> state) {
+ state.add(c.element().getValue());
+ // Read only after add(): an Iterable obtained before the add may be a snapshot
+ // that does not include the element just added.
+ Iterable<Integer> currentValue = state.read();
+ if (Iterables.size(currentValue) >= 4) {
+ List<Integer> sorted = Lists.newArrayList(currentValue);
+ Collections.sort(sorted);
+ c.output(sorted);
+
+ List<Integer> sideSorted = Lists.newArrayList(c.sideInput(listView));
+ Collections.sort(sideSorted);
+ c.output(sideSorted);
+ }
+ }
+ };
+
+ PCollection<List<Integer>> output =
+ p.apply(
+ "Create main input",
+ Create.of(
+ KV.of("hello", 97), KV.of("hello", 42), KV.of("hello", 84), KV.of("hello", 12)))
+ .apply(ParDo.of(fn).withSideInputs(listView));
+
+ // All four elements share the key "hello", so the bag fills exactly once.
+ PAssert.that(output).containsInAnyOrder(
+ Lists.newArrayList(12, 42, 84, 97),
+ Lists.newArrayList(0, 1, 2));
+ p.run();
+ }
+
+ @Test
public void testWithOutputTagsDisplayData() {
DoFn<String, String> fn = new DoFn<String, String>() {
@ProcessElement
|