beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From k...@apache.org
Subject [11/13] incubator-beam git commit: Port ViewTest to new DoFn
Date Mon, 08 Aug 2016 20:40:46 GMT
Port ViewTest to new DoFn


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b1db02d2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b1db02d2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b1db02d2

Branch: refs/heads/master
Commit: b1db02d23f9454ff1a169d0aa81552e8dbe59fe3
Parents: 32f84bb
Author: Kenneth Knowles <klk@google.com>
Authored: Fri Aug 5 12:07:28 2016 -0700
Committer: Kenneth Knowles <klk@google.com>
Committed: Mon Aug 8 11:35:17 2016 -0700

----------------------------------------------------------------------
 .../apache/beam/sdk/transforms/ViewTest.java    | 192 ++++++++++---------
 1 file changed, 97 insertions(+), 95 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b1db02d2/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java
index ee240bf..170e6ce 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java
@@ -17,6 +17,8 @@
  */
 package org.apache.beam.sdk.transforms;
 
+import static org.apache.beam.sdk.values.KV.of;
+
 import static com.google.common.base.Preconditions.checkArgument;
 
 import static org.hamcrest.Matchers.isA;
@@ -100,8 +102,8 @@ public class ViewTest implements Serializable {
     PCollection<Integer> output =
         pipeline.apply("Create123", Create.of(1, 2, 3))
             .apply("OutputSideInputs",
-                ParDo.withSideInputs(view).of(new OldDoFn<Integer, Integer>() {
-                  @Override
+                ParDo.withSideInputs(view).of(new DoFn<Integer, Integer>() {
+                  @ProcessElement
                   public void processElement(ProcessContext c) {
                     c.output(c.sideInput(view));
                   }
@@ -131,8 +133,8 @@ public class ViewTest implements Serializable {
             TimestampedValue.of(3, new Instant(12))))
             .apply("MainWindowInto", Window.<Integer>into(FixedWindows.of(Duration.millis(10))))
             .apply("OutputSideInputs",
-                ParDo.withSideInputs(view).of(new OldDoFn<Integer, Integer>() {
-                  @Override
+                ParDo.withSideInputs(view).of(new DoFn<Integer, Integer>() {
+                  @ProcessElement
                   public void processElement(ProcessContext c) {
                     c.output(c.sideInput(view));
                   }
@@ -153,8 +155,8 @@ public class ViewTest implements Serializable {
             .apply(View.<Integer>asSingleton());
 
     pipeline.apply("Create123", Create.of(1, 2, 3))
-        .apply("OutputSideInputs", ParDo.withSideInputs(view).of(new OldDoFn<Integer, Integer>() {
-          @Override
+        .apply("OutputSideInputs", ParDo.withSideInputs(view).of(new DoFn<Integer, Integer>() {
+          @ProcessElement
           public void processElement(ProcessContext c) {
             c.output(c.sideInput(view));
           }
@@ -178,8 +180,8 @@ public class ViewTest implements Serializable {
     final PCollectionView<Integer> view = oneTwoThree.apply(View.<Integer>asSingleton());
 
     oneTwoThree.apply(
-        "OutputSideInputs", ParDo.withSideInputs(view).of(new OldDoFn<Integer, Integer>() {
-          @Override
+        "OutputSideInputs", ParDo.withSideInputs(view).of(new DoFn<Integer, Integer>() {
+          @ProcessElement
           public void processElement(ProcessContext c) {
             c.output(c.sideInput(view));
           }
@@ -205,8 +207,8 @@ public class ViewTest implements Serializable {
     PCollection<Integer> output =
         pipeline.apply("CreateMainInput", Create.of(29, 31))
             .apply("OutputSideInputs",
-                ParDo.withSideInputs(view).of(new OldDoFn<Integer, Integer>() {
-                  @Override
+                ParDo.withSideInputs(view).of(new DoFn<Integer, Integer>() {
+                  @ProcessElement
                   public void processElement(ProcessContext c) {
                     checkArgument(c.sideInput(view).size() == 4);
                     checkArgument(c.sideInput(view).get(0) == c.sideInput(view).get(0));
@@ -246,8 +248,8 @@ public class ViewTest implements Serializable {
             .apply("MainWindowInto", Window.<Integer>into(FixedWindows.of(Duration.millis(10))))
             .apply(
                 "OutputSideInputs",
-                ParDo.withSideInputs(view).of(new OldDoFn<Integer, Integer>() {
-                  @Override
+                ParDo.withSideInputs(view).of(new DoFn<Integer, Integer>() {
+                  @ProcessElement
                   public void processElement(ProcessContext c) {
                     checkArgument(c.sideInput(view).size() == 4);
                     checkArgument(c.sideInput(view).get(0) == c.sideInput(view).get(0));
@@ -274,8 +276,8 @@ public class ViewTest implements Serializable {
     PCollection<Integer> results =
         pipeline.apply("Create1", Create.of(1))
             .apply("OutputSideInputs",
-                ParDo.withSideInputs(view).of(new OldDoFn<Integer, Integer>() {
-                  @Override
+                ParDo.withSideInputs(view).of(new DoFn<Integer, Integer>() {
+                  @ProcessElement
                   public void processElement(ProcessContext c) {
                     assertTrue(c.sideInput(view).isEmpty());
                     assertFalse(c.sideInput(view).iterator().hasNext());
@@ -283,7 +285,7 @@ public class ViewTest implements Serializable {
                   }
                 }));
 
-    // Pass at least one value through to guarantee that OldDoFn executes.
+    // Pass at least one value through to guarantee that DoFn executes.
     PAssert.that(results).containsInAnyOrder(1);
 
     pipeline.run();
@@ -300,8 +302,8 @@ public class ViewTest implements Serializable {
     PCollection<Integer> output =
         pipeline.apply("CreateMainInput", Create.of(29))
             .apply("OutputSideInputs",
-                ParDo.withSideInputs(view).of(new OldDoFn<Integer, Integer>() {
-                  @Override
+                ParDo.withSideInputs(view).of(new DoFn<Integer, Integer>() {
+                  @ProcessElement
                   public void processElement(ProcessContext c) {
                     try {
                       c.sideInput(view).clear();
@@ -329,7 +331,7 @@ public class ViewTest implements Serializable {
                   }
                 }));
 
-    // Pass at least one value through to guarantee that OldDoFn executes.
+    // Pass at least one value through to guarantee that DoFn executes.
     PAssert.that(output).containsInAnyOrder(11);
 
     pipeline.run();
@@ -347,8 +349,8 @@ public class ViewTest implements Serializable {
     PCollection<Integer> output =
         pipeline.apply("CreateMainInput", Create.of(29, 31))
             .apply("OutputSideInputs",
-                ParDo.withSideInputs(view).of(new OldDoFn<Integer, Integer>() {
-                  @Override
+                ParDo.withSideInputs(view).of(new DoFn<Integer, Integer>() {
+                  @ProcessElement
                   public void processElement(ProcessContext c) {
                     for (Integer i : c.sideInput(view)) {
                       c.output(i);
@@ -387,8 +389,8 @@ public class ViewTest implements Serializable {
                     TimestampedValue.of(35, new Instant(11))))
             .apply("MainWindowInto", Window.<Integer>into(FixedWindows.of(Duration.millis(10))))
             .apply("OutputSideInputs",
-                ParDo.withSideInputs(view).of(new OldDoFn<Integer, Integer>() {
-                  @Override
+                ParDo.withSideInputs(view).of(new DoFn<Integer, Integer>() {
+                  @ProcessElement
                   public void processElement(ProcessContext c) {
                     for (Integer i : c.sideInput(view)) {
                       c.output(i);
@@ -413,15 +415,15 @@ public class ViewTest implements Serializable {
     PCollection<Integer> results =
         pipeline.apply("Create1", Create.of(1))
             .apply("OutputSideInputs",
-                ParDo.withSideInputs(view).of(new OldDoFn<Integer, Integer>() {
-                  @Override
+                ParDo.withSideInputs(view).of(new DoFn<Integer, Integer>() {
+                  @ProcessElement
                   public void processElement(ProcessContext c) {
                     assertFalse(c.sideInput(view).iterator().hasNext());
                     c.output(1);
                   }
                 }));
 
-    // Pass at least one value through to guarantee that OldDoFn executes.
+    // Pass at least one value through to guarantee that DoFn executes.
     PAssert.that(results).containsInAnyOrder(1);
 
     pipeline.run();
@@ -438,8 +440,8 @@ public class ViewTest implements Serializable {
     PCollection<Integer> output =
         pipeline.apply("CreateMainInput", Create.of(29))
             .apply("OutputSideInputs",
-                ParDo.withSideInputs(view).of(new OldDoFn<Integer, Integer>() {
-                  @Override
+                ParDo.withSideInputs(view).of(new DoFn<Integer, Integer>() {
+                  @ProcessElement
                   public void processElement(ProcessContext c) {
                     Iterator<Integer> iterator = c.sideInput(view).iterator();
                     while (iterator.hasNext()) {
@@ -453,7 +455,7 @@ public class ViewTest implements Serializable {
                   }
                 }));
 
-    // Pass at least one value through to guarantee that OldDoFn executes.
+    // Pass at least one value through to guarantee that DoFn executes.
     PAssert.that(output).containsInAnyOrder(11);
 
     pipeline.run();
@@ -472,11 +474,11 @@ public class ViewTest implements Serializable {
         pipeline.apply("CreateMainInput", Create.of("apple", "banana", "blackberry"))
             .apply(
                 "OutputSideInputs",
-                ParDo.withSideInputs(view).of(new OldDoFn<String, KV<String, Integer>>() {
-                  @Override
+                ParDo.withSideInputs(view).of(new DoFn<String, KV<String, Integer>>() {
+                  @ProcessElement
                   public void processElement(ProcessContext c) {
                     for (Integer v : c.sideInput(view).get(c.element().substring(0, 1))) {
-                      c.output(KV.of(c.element(), v));
+                      c.output(of(c.element(), v));
                     }
                   }
                 }));
@@ -500,8 +502,8 @@ public class ViewTest implements Serializable {
         pipeline.apply("CreateMainInput", Create.of(2 /* size */))
             .apply(
                 "OutputSideInputs",
-                ParDo.withSideInputs(view).of(new OldDoFn<Integer, KV<String, Integer>>() {
-                  @Override
+                ParDo.withSideInputs(view).of(new DoFn<Integer, KV<String, Integer>>() {
+                  @ProcessElement
                   public void processElement(ProcessContext c) {
                     assertEquals((int) c.element(), c.sideInput(view).size());
                     assertEquals((int) c.element(), c.sideInput(view).entrySet().size());
@@ -554,11 +556,11 @@ public class ViewTest implements Serializable {
         pipeline.apply("CreateMainInput", Create.of("apple", "banana", "blackberry"))
             .apply(
                 "OutputSideInputs",
-                ParDo.withSideInputs(view).of(new OldDoFn<String, KV<String, Integer>>() {
-                  @Override
+                ParDo.withSideInputs(view).of(new DoFn<String, KV<String, Integer>>() {
+                  @ProcessElement
                   public void processElement(ProcessContext c) {
                     for (Integer v : c.sideInput(view).get(c.element().substring(0, 1))) {
-                      c.output(KV.of(c.element(), v));
+                      c.output(of(c.element(), v));
                     }
                   }
                 }));
@@ -591,13 +593,13 @@ public class ViewTest implements Serializable {
                                               TimestampedValue.of("blackberry", new Instant(16))))
             .apply("MainWindowInto", Window.<String>into(FixedWindows.of(Duration.millis(10))))
             .apply("OutputSideInputs", ParDo.withSideInputs(view).of(
-                                           new OldDoFn<String, KV<String, Integer>>() {
-                                             @Override
+                                           new DoFn<String, KV<String, Integer>>() {
+                                             @ProcessElement
                                              public void processElement(ProcessContext c) {
                                                for (Integer v :
                                                    c.sideInput(view)
                                                        .get(c.element().substring(0, 1))) {
-                                                 c.output(KV.of(c.element(), v));
+                                                 c.output(of(c.element(), v));
                                                }
                                              }
                                            }));
@@ -629,8 +631,8 @@ public class ViewTest implements Serializable {
                                               TimestampedValue.of(1 /* size */, new Instant(16))))
             .apply("MainWindowInto", Window.<Integer>into(FixedWindows.of(Duration.millis(10))))
             .apply("OutputSideInputs", ParDo.withSideInputs(view).of(
-                                           new OldDoFn<Integer, KV<String, Integer>>() {
-                                             @Override
+                                           new DoFn<Integer, KV<String, Integer>>() {
+                                             @ProcessElement
                                              public void processElement(ProcessContext c) {
                                                assertEquals((int) c.element(),
                                                    c.sideInput(view).size());
@@ -674,13 +676,13 @@ public class ViewTest implements Serializable {
                                               TimestampedValue.of("blackberry", new Instant(16))))
             .apply("MainWindowInto", Window.<String>into(FixedWindows.of(Duration.millis(10))))
             .apply("OutputSideInputs", ParDo.withSideInputs(view).of(
-                                           new OldDoFn<String, KV<String, Integer>>() {
-                                             @Override
+                                           new DoFn<String, KV<String, Integer>>() {
+                                             @ProcessElement
                                              public void processElement(ProcessContext c) {
                                                for (Integer v :
                                                    c.sideInput(view)
                                                        .get(c.element().substring(0, 1))) {
-                                                 c.output(KV.of(c.element(), v));
+                                                 c.output(of(c.element(), v));
                                                }
                                              }
                                            }));
@@ -704,8 +706,8 @@ public class ViewTest implements Serializable {
     PCollection<Integer> results =
         pipeline.apply("Create1", Create.of(1))
             .apply("OutputSideInputs",
-                ParDo.withSideInputs(view).of(new OldDoFn<Integer, Integer>() {
-                  @Override
+                ParDo.withSideInputs(view).of(new DoFn<Integer, Integer>() {
+                  @ProcessElement
                   public void processElement(ProcessContext c) {
                     assertTrue(c.sideInput(view).isEmpty());
                     assertTrue(c.sideInput(view).entrySet().isEmpty());
@@ -714,7 +716,7 @@ public class ViewTest implements Serializable {
                   }
                 }));
 
-    // Pass at least one value through to guarantee that OldDoFn executes.
+    // Pass at least one value through to guarantee that DoFn executes.
     PAssert.that(results).containsInAnyOrder(1);
 
     pipeline.run();
@@ -734,8 +736,8 @@ public class ViewTest implements Serializable {
     PCollection<Integer> results =
         pipeline.apply("Create1", Create.of(1))
             .apply("OutputSideInputs",
-                ParDo.withSideInputs(view).of(new OldDoFn<Integer, Integer>() {
-                  @Override
+                ParDo.withSideInputs(view).of(new DoFn<Integer, Integer>() {
+                  @ProcessElement
                   public void processElement(ProcessContext c) {
                     assertTrue(c.sideInput(view).isEmpty());
                     assertTrue(c.sideInput(view).entrySet().isEmpty());
@@ -744,7 +746,7 @@ public class ViewTest implements Serializable {
                   }
                 }));
 
-    // Pass at least one value through to guarantee that OldDoFn executes.
+    // Pass at least one value through to guarantee that DoFn executes.
     PAssert.that(results).containsInAnyOrder(1);
 
     pipeline.run();
@@ -763,8 +765,8 @@ public class ViewTest implements Serializable {
         pipeline.apply("CreateMainInput", Create.of("apple"))
             .apply(
                 "OutputSideInputs",
-                ParDo.withSideInputs(view).of(new OldDoFn<String, KV<String, Integer>>() {
-                  @Override
+                ParDo.withSideInputs(view).of(new DoFn<String, KV<String, Integer>>() {
+                  @ProcessElement
                   public void processElement(ProcessContext c) {
                     try {
                       c.sideInput(view).clear();
@@ -792,7 +794,7 @@ public class ViewTest implements Serializable {
                   }
                 }));
 
-    // Pass at least one value through to guarantee that OldDoFn executes.
+    // Pass at least one value through to guarantee that DoFn executes.
     PAssert.that(output).containsInAnyOrder(KV.of("apple", 1));
 
     pipeline.run();
@@ -811,11 +813,11 @@ public class ViewTest implements Serializable {
         pipeline.apply("CreateMainInput", Create.of("apple", "banana", "blackberry"))
             .apply(
                 "OutputSideInputs",
-                ParDo.withSideInputs(view).of(new OldDoFn<String, KV<String, Integer>>() {
-                  @Override
+                ParDo.withSideInputs(view).of(new DoFn<String, KV<String, Integer>>() {
+                  @ProcessElement
                   public void processElement(ProcessContext c) {
                     c.output(
-                        KV.of(c.element(), c.sideInput(view).get(c.element().substring(0, 1))));
+                        of(c.element(), c.sideInput(view).get(c.element().substring(0, 1))));
                   }
                 }));
 
@@ -838,8 +840,8 @@ public class ViewTest implements Serializable {
         pipeline.apply("CreateMainInput", Create.of(2 /* size */))
             .apply(
                 "OutputSideInputs",
-                ParDo.withSideInputs(view).of(new OldDoFn<Integer, KV<String, Integer>>() {
-                  @Override
+                ParDo.withSideInputs(view).of(new DoFn<Integer, KV<String, Integer>>() {
+                  @ProcessElement
                   public void processElement(ProcessContext c) {
                     assertEquals((int) c.element(), c.sideInput(view).size());
                     assertEquals((int) c.element(), c.sideInput(view).entrySet().size());
@@ -870,11 +872,11 @@ public class ViewTest implements Serializable {
         pipeline.apply("CreateMainInput", Create.of("apple", "banana", "blackberry"))
             .apply(
                 "OutputSideInputs",
-                ParDo.withSideInputs(view).of(new OldDoFn<String, KV<String, Integer>>() {
-                  @Override
+                ParDo.withSideInputs(view).of(new DoFn<String, KV<String, Integer>>() {
+                  @ProcessElement
                   public void processElement(ProcessContext c) {
                     c.output(
-                        KV.of(c.element(), c.sideInput(view).get(c.element().substring(0, 1))));
+                        of(c.element(), c.sideInput(view).get(c.element().substring(0, 1))));
                   }
                 }));
 
@@ -906,8 +908,8 @@ public class ViewTest implements Serializable {
                                               TimestampedValue.of("blackberry", new Instant(16))))
             .apply("MainWindowInto", Window.<String>into(FixedWindows.of(Duration.millis(10))))
             .apply("OutputSideInputs", ParDo.withSideInputs(view).of(
-                                           new OldDoFn<String, KV<String, Integer>>() {
-                                             @Override
+                                           new DoFn<String, KV<String, Integer>>() {
+                                             @ProcessElement
                                              public void processElement(ProcessContext c) {
                                                c.output(KV.of(
                                                    c.element(),
@@ -943,8 +945,8 @@ public class ViewTest implements Serializable {
                                               TimestampedValue.of(1 /* size */, new Instant(16))))
             .apply("MainWindowInto", Window.<Integer>into(FixedWindows.of(Duration.millis(10))))
             .apply("OutputSideInputs", ParDo.withSideInputs(view).of(
-                                           new OldDoFn<Integer, KV<String, Integer>>() {
-                                             @Override
+                                           new DoFn<Integer, KV<String, Integer>>() {
+                                             @ProcessElement
                                              public void processElement(ProcessContext c) {
                                                assertEquals((int) c.element(),
                                                    c.sideInput(view).size());
@@ -988,10 +990,10 @@ public class ViewTest implements Serializable {
                                               TimestampedValue.of("blackberry", new Instant(16))))
             .apply("MainWindowInto", Window.<String>into(FixedWindows.of(Duration.millis(10))))
             .apply("OutputSideInputs", ParDo.withSideInputs(view).of(
-                                           new OldDoFn<String, KV<String, Integer>>() {
-                                             @Override
+                                           new DoFn<String, KV<String, Integer>>() {
+                                             @ProcessElement
                                              public void processElement(ProcessContext c) {
-                                               c.output(KV.of(
+                                               c.output(of(
                                                    c.element(),
                                                    c.sideInput(view).get(
                                                        c.element().substring(0, 1))));
@@ -1017,8 +1019,8 @@ public class ViewTest implements Serializable {
     PCollection<Integer> results =
         pipeline.apply("Create1", Create.of(1))
             .apply("OutputSideInputs",
-                ParDo.withSideInputs(view).of(new OldDoFn<Integer, Integer>() {
-                  @Override
+                ParDo.withSideInputs(view).of(new DoFn<Integer, Integer>() {
+                  @ProcessElement
                   public void processElement(ProcessContext c) {
                     assertTrue(c.sideInput(view).isEmpty());
                     assertTrue(c.sideInput(view).entrySet().isEmpty());
@@ -1027,7 +1029,7 @@ public class ViewTest implements Serializable {
                   }
                 }));
 
-    // Pass at least one value through to guarantee that OldDoFn executes.
+    // Pass at least one value through to guarantee that DoFn executes.
     PAssert.that(results).containsInAnyOrder(1);
 
     pipeline.run();
@@ -1046,8 +1048,8 @@ public class ViewTest implements Serializable {
     PCollection<Integer> results =
         pipeline.apply("Create1", Create.of(1))
             .apply("OutputSideInputs",
-                ParDo.withSideInputs(view).of(new OldDoFn<Integer, Integer>() {
-                  @Override
+                ParDo.withSideInputs(view).of(new DoFn<Integer, Integer>() {
+                  @ProcessElement
                   public void processElement(ProcessContext c) {
                     assertTrue(c.sideInput(view).isEmpty());
                     assertTrue(c.sideInput(view).entrySet().isEmpty());
@@ -1056,7 +1058,7 @@ public class ViewTest implements Serializable {
                   }
                 }));
 
-    // Pass at least one value through to guarantee that OldDoFn executes.
+    // Pass at least one value through to guarantee that DoFn executes.
     PAssert.that(results).containsInAnyOrder(1);
 
     pipeline.run();
@@ -1080,8 +1082,8 @@ public class ViewTest implements Serializable {
         pipeline.apply("CreateMainInput", Create.of("apple", "banana", "blackberry"))
             .apply(
                 "OutputSideInputs",
-                ParDo.withSideInputs(view).of(new OldDoFn<String, KV<String, Integer>>() {
-                  @Override
+                ParDo.withSideInputs(view).of(new DoFn<String, KV<String, Integer>>() {
+                  @ProcessElement
                   public void processElement(ProcessContext c) {
                     c.output(
                         KV.of(c.element(), c.sideInput(view).get(c.element().substring(0, 1))));
@@ -1111,8 +1113,8 @@ public class ViewTest implements Serializable {
         pipeline.apply("CreateMainInput", Create.of("apple"))
             .apply(
                 "OutputSideInputs",
-                ParDo.withSideInputs(view).of(new OldDoFn<String, KV<String, Integer>>() {
-                  @Override
+                ParDo.withSideInputs(view).of(new DoFn<String, KV<String, Integer>>() {
+                  @ProcessElement
                   public void processElement(ProcessContext c) {
                     try {
                       c.sideInput(view).clear();
@@ -1139,7 +1141,7 @@ public class ViewTest implements Serializable {
                   }
                 }));
 
-    // Pass at least one value through to guarantee that OldDoFn executes.
+    // Pass at least one value through to guarantee that DoFn executes.
     PAssert.that(output).containsInAnyOrder(KV.of("apple", 1));
 
     pipeline.run();
@@ -1158,8 +1160,8 @@ public class ViewTest implements Serializable {
     PCollection<KV<String, Integer>> output =
         pipeline.apply("CreateMainInput", Create.of("apple", "banana", "blackberry"))
             .apply("Output",
-                ParDo.withSideInputs(view).of(new OldDoFn<String, KV<String, Integer>>() {
-                  @Override
+                ParDo.withSideInputs(view).of(new DoFn<String, KV<String, Integer>>() {
+                  @ProcessElement
                   public void processElement(ProcessContext c) {
                     c.output(KV
                         .of(c.element(), c.sideInput(view).get(c.element().substring(0, 1))));
@@ -1193,8 +1195,8 @@ public class ViewTest implements Serializable {
                                        TimestampedValue.of("C", new Instant(7))))
             .apply("WindowMainInput", Window.<String>into(FixedWindows.of(Duration.millis(10))))
             .apply("OutputMainAndSideInputs", ParDo.withSideInputs(view).of(
-                                                  new OldDoFn<String, String>() {
-                                                    @Override
+                                                  new DoFn<String, String>() {
+                                                    @ProcessElement
                                                     public void processElement(ProcessContext c) {
                                                       c.output(c.element() + c.sideInput(view));
                                                     }
@@ -1226,8 +1228,8 @@ public class ViewTest implements Serializable {
                                        TimestampedValue.of("C", new Instant(7))))
             .apply("WindowMainInput", Window.<String>into(FixedWindows.of(Duration.millis(10))))
             .apply("OutputMainAndSideInputs", ParDo.withSideInputs(view).of(
-                                                  new OldDoFn<String, String>() {
-                                                    @Override
+                                                  new DoFn<String, String>() {
+                                                    @ProcessElement
                                                     public void processElement(ProcessContext c) {
                                                       c.output(c.element() + c.sideInput(view));
                                                     }
@@ -1257,8 +1259,8 @@ public class ViewTest implements Serializable {
                                        TimestampedValue.of("C", new Instant(7))))
             .apply("WindowMainInput", Window.<String>into(FixedWindows.of(Duration.millis(10))))
             .apply("OutputMainAndSideInputs", ParDo.withSideInputs(view).of(
-                                                  new OldDoFn<String, String>() {
-                                                    @Override
+                                                  new DoFn<String, String>() {
+                                                    @ProcessElement
                                                     public void processElement(ProcessContext c) {
                                                       c.output(c.element() + c.sideInput(view));
                                                     }
@@ -1287,8 +1289,8 @@ public class ViewTest implements Serializable {
         p.apply("CreateMainInput", Create.of(""))
             .apply(
                 "OutputMainAndSideInputs",
-                ParDo.withSideInputs(view).of(new OldDoFn<String, String>() {
-                  @Override
+                ParDo.withSideInputs(view).of(new DoFn<String, String>() {
+                  @ProcessElement
                   public void processElement(ProcessContext c) {
                     c.output(c.element() + c.sideInput(view));
                   }
@@ -1305,8 +1307,8 @@ public class ViewTest implements Serializable {
     Pipeline pipeline = TestPipeline.create();
     final PCollectionView<Iterable<Integer>> view1 =
         pipeline.apply("CreateVoid1", Create.of((Void) null).withCoder(VoidCoder.of()))
-            .apply("OutputOneInteger", ParDo.of(new OldDoFn<Void, Integer>() {
-              @Override
+            .apply("OutputOneInteger", ParDo.of(new DoFn<Void, Integer>() {
+              @ProcessElement
               public void processElement(ProcessContext c) {
                 c.output(17);
               }
@@ -1317,8 +1319,8 @@ public class ViewTest implements Serializable {
         pipeline.apply("CreateVoid2", Create.of((Void) null).withCoder(VoidCoder.of()))
             .apply(
                 "OutputSideInput",
-                ParDo.withSideInputs(view1).of(new OldDoFn<Void, Iterable<Integer>>() {
-                  @Override
+                ParDo.withSideInputs(view1).of(new DoFn<Void, Iterable<Integer>>() {
+                  @ProcessElement
                   public void processElement(ProcessContext c) {
                     c.output(c.sideInput(view1));
                   }
@@ -1328,8 +1330,8 @@ public class ViewTest implements Serializable {
     PCollection<Integer> output =
         pipeline.apply("CreateVoid3", Create.of((Void) null).withCoder(VoidCoder.of()))
             .apply("ReadIterableSideInput",
-                ParDo.withSideInputs(view2).of(new OldDoFn<Void, Integer>() {
-                  @Override
+                ParDo.withSideInputs(view2).of(new DoFn<Void, Integer>() {
+                  @ProcessElement
                   public void processElement(ProcessContext c) {
                     for (Iterable<Integer> input : c.sideInput(view2)) {
                       for (Integer i : input) {


Mime
View raw message