beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From k...@apache.org
Subject [2/5] incubator-beam git commit: Add simple tests for stateful ParDo
Date Mon, 28 Nov 2016 21:17:21 GMT
Add simple tests for stateful ParDo


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/7e158e4e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/7e158e4e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/7e158e4e

Branch: refs/heads/master
Commit: 7e158e4e583372dd79ffaa380ac7c2dbb4846c50
Parents: e17dc4a
Author: Kenneth Knowles <klk@google.com>
Authored: Mon Nov 21 15:41:27 2016 -0800
Committer: Kenneth Knowles <klk@google.com>
Committed: Mon Nov 28 11:43:21 2016 -0800

----------------------------------------------------------------------
 .../apache/beam/sdk/transforms/ParDoTest.java   | 106 ++++++++++++++++++-
 1 file changed, 102 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7e158e4e/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index be1eaa4..593f304 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -55,6 +55,7 @@ import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.UsesStatefulParDo;
 import org.apache.beam.sdk.transforms.DoFn.OnTimer;
 import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
 import org.apache.beam.sdk.transforms.ParDo.Bound;
@@ -1464,8 +1465,8 @@ public class ParDoTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
-  public void testValueState() {
+  @Category({RunnableOnService.class, UsesStatefulParDo.class})
+  public void testValueStateSimple() {
     final String stateId = "foo";
 
     DoFn<KV<String, Integer>, Integer> fn =
@@ -1494,8 +1495,59 @@ public class ParDoTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
-  public void testBagSTate() {
+  @Category({RunnableOnService.class, UsesStatefulParDo.class})
+  public void testValueStateSideOutput() {
+    final String stateId = "foo";
+
+    final TupleTag<Integer> evenTag = new TupleTag<Integer>() {};
+    final TupleTag<Integer> oddTag = new TupleTag<Integer>() {};
+
+    DoFn<KV<String, Integer>, Integer> fn =
+        new DoFn<KV<String, Integer>, Integer>() {
+
+          @StateId(stateId)
+          private final StateSpec<Object, ValueState<Integer>> intState =
+              StateSpecs.value(VarIntCoder.of());
+
+          @ProcessElement
+          public void processElement(
+              ProcessContext c, @StateId(stateId) ValueState<Integer> state) {
+            Integer currentValue = MoreObjects.firstNonNull(state.read(), 0);
+            if (currentValue % 2 == 0) {
+              c.output(currentValue);
+            } else {
+              c.sideOutput(oddTag, currentValue);
+            }
+            state.write(currentValue + 1);
+          }
+        };
+
+    Pipeline p = TestPipeline.create();
+    PCollectionTuple output =
+        p.apply(
+                Create.of(
+                    KV.of("hello", 42),
+                    KV.of("hello", 97),
+                    KV.of("hello", 84),
+                    KV.of("goodbye", 33),
+                    KV.of("hello", 859),
+                    KV.of("goodbye", 83945)))
+            .apply(ParDo.of(fn).withOutputTags(evenTag, TupleTagList.of(oddTag)));
+
+    PCollection<Integer> evens = output.get(evenTag);
+    PCollection<Integer> odds = output.get(oddTag);
+
+    // There are 0 and 2 from "hello" and just 0 from "goodbye"
+    PAssert.that(evens).containsInAnyOrder(0, 2, 0);
+
+    // There are 1 and 3 from "hello" and just "1" from "goodbye"
+    PAssert.that(odds).containsInAnyOrder(1, 3, 1);
+    p.run();
+  }
+
+  @Test
+  @Category({RunnableOnService.class, UsesStatefulParDo.class})
+  public void testBagState() {
     final String stateId = "foo";
 
     DoFn<KV<String, Integer>, List<Integer>> fn =
@@ -1530,6 +1582,52 @@ public class ParDoTest implements Serializable {
   }
 
   @Test
+  @Category({RunnableOnService.class, UsesStatefulParDo.class})
+  public void testBagStateSideInput() {
+    Pipeline p = TestPipeline.create();
+
+    final PCollectionView<List<Integer>> listView =
+        p.apply("Create list for side input", Create.of(2, 1, 0)).apply(View.<Integer>asList());
+
+    final String stateId = "foo";
+    DoFn<KV<String, Integer>, List<Integer>> fn =
+        new DoFn<KV<String, Integer>, List<Integer>>() {
+
+          @StateId(stateId)
+          private final StateSpec<Object, BagState<Integer>> bufferState =
+              StateSpecs.bag(VarIntCoder.of());
+
+          @ProcessElement
+          public void processElement(
+              ProcessContext c, @StateId(stateId) BagState<Integer> state) {
+            Iterable<Integer> currentValue = state.read();
+            state.add(c.element().getValue());
+            if (Iterables.size(state.read()) >= 4) {
+              List<Integer> sorted = Lists.newArrayList(currentValue);
+              Collections.sort(sorted);
+              c.output(sorted);
+
+              List<Integer> sideSorted = Lists.newArrayList(c.sideInput(listView));
+              Collections.sort(sideSorted);
+              c.output(sideSorted);
+            }
+          }
+        };
+
+    PCollection<List<Integer>> output =
+        p.apply(
+                "Create main input",
+                Create.of(
+                    KV.of("hello", 97), KV.of("hello", 42), KV.of("hello", 84), KV.of("hello",
12)))
+            .apply(ParDo.of(fn).withSideInputs(listView));
+
+    PAssert.that(output).containsInAnyOrder(
+        Lists.newArrayList(12, 42, 84, 97),
+        Lists.newArrayList(0, 1, 2));
+    p.run();
+  }
+
+  @Test
   public void testWithOutputTagsDisplayData() {
     DoFn<String, String> fn = new DoFn<String, String>() {
       @ProcessElement


Mime
View raw message