beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From k...@apache.org
Subject [01/13] incubator-beam git commit: Port various Spark runner tests to new DoFn
Date Mon, 08 Aug 2016 20:40:36 GMT
Repository: incubator-beam
Updated Branches:
  refs/heads/master bb00810ad -> 574c3777d


Port various Spark runner tests to new DoFn


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/f5df3583
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/f5df3583
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/f5df3583

Branch: refs/heads/master
Commit: f5df358320cfde6a1c4d012d4169af691f6a18e9
Parents: d6395e9
Author: Kenneth Knowles <klk@google.com>
Authored: Fri Aug 5 12:31:07 2016 -0700
Committer: Kenneth Knowles <klk@google.com>
Committed: Mon Aug 8 11:35:17 2016 -0700

----------------------------------------------------------------------
 .../apache/beam/runners/spark/TfIdfTest.java    | 22 ++++++++++----------
 .../spark/translation/CombinePerKeyTest.java    |  6 +++---
 .../translation/MultiOutputWordCountTest.java   | 10 ++++-----
 .../spark/translation/SerializationTest.java    | 10 ++++-----
 .../streaming/KafkaStreamingTest.java           |  6 +++---
 5 files changed, 27 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f5df3583/runners/spark/src/test/java/org/apache/beam/runners/spark/TfIdfTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/TfIdfTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/TfIdfTest.java
index 074e6aa..17bf6dd 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/TfIdfTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/TfIdfTest.java
@@ -24,8 +24,8 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Keys;
-import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.RemoveDuplicates;
@@ -101,8 +101,8 @@ public class TfIdfTest {
       // of the words in the document associated with that that URI.
       PCollection<KV<URI, String>> uriToWords = uriToContent
           .apply("SplitWords", ParDo.of(
-              new OldDoFn<KV<URI, String>, KV<URI, String>>() {
-                @Override
+              new DoFn<KV<URI, String>, KV<URI, String>>() {
+                @ProcessElement
                 public void processElement(ProcessContext c) {
                   URI uri = c.element().getKey();
                   String line = c.element().getValue();
@@ -144,8 +144,8 @@ public class TfIdfTest {
       // by the URI key.
       PCollection<KV<URI, KV<String, Long>>> uriToWordAndCount = uriAndWordToCount
           .apply("ShiftKeys", ParDo.of(
-              new OldDoFn<KV<KV<URI, String>, Long>, KV<URI, KV<String, Long>>>() {
-                @Override
+              new DoFn<KV<KV<URI, String>, Long>, KV<URI, KV<String, Long>>>() {
+                @ProcessElement
                 public void processElement(ProcessContext c) {
                   URI uri = c.element().getKey().getKey();
                   String word = c.element().getKey().getValue();
@@ -183,8 +183,8 @@ public class TfIdfTest {
       // divided by the total number of words in the document.
       PCollection<KV<String, KV<URI, Double>>> wordToUriAndTf = uriToWordAndCountAndTotal
           .apply("ComputeTermFrequencies", ParDo.of(
-              new OldDoFn<KV<URI, CoGbkResult>, KV<String, KV<URI, Double>>>() {
-                @Override
+              new DoFn<KV<URI, CoGbkResult>, KV<String, KV<URI, Double>>>() {
+                @ProcessElement
                 public void processElement(ProcessContext c) {
                   URI uri = c.element().getKey();
                   Long wordTotal = c.element().getValue().getOnly(wordTotalsTag);
@@ -208,8 +208,8 @@ public class TfIdfTest {
       PCollection<KV<String, Double>> wordToDf = wordToDocCount
           .apply("ComputeDocFrequencies", ParDo
               .withSideInputs(totalDocuments)
-              .of(new OldDoFn<KV<String, Long>, KV<String, Double>>() {
-                @Override
+              .of(new DoFn<KV<String, Long>, KV<String, Double>>() {
+                @ProcessElement
                 public void processElement(ProcessContext c) {
                   String word = c.element().getKey();
                   Long documentCount = c.element().getValue();
@@ -237,8 +237,8 @@ public class TfIdfTest {
       // divided by the log of the document frequency.
       return wordToUriAndTfAndDf
           .apply("ComputeTfIdf", ParDo.of(
-              new OldDoFn<KV<String, CoGbkResult>, KV<String, KV<URI, Double>>>() {
-                @Override
+              new DoFn<KV<String, CoGbkResult>, KV<String, KV<URI, Double>>>() {
+                @ProcessElement
                 public void processElement(ProcessContext c) {
                   String word = c.element().getKey();
                   Double df = c.element().getValue().getOnly(dfTag);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f5df3583/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombinePerKeyTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombinePerKeyTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombinePerKeyTest.java
index dee9213..cdf2cfb 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombinePerKeyTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombinePerKeyTest.java
@@ -27,7 +27,7 @@ import org.apache.beam.sdk.coders.VarLongCoder;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.Sum;
@@ -70,8 +70,8 @@ public class CombinePerKeyTest {
     private static class SumPerKey<T> extends PTransform<PCollection<T>, PCollection<KV<T, Long>>> {
       @Override
       public PCollection<KV<T, Long>> apply(PCollection<T> pcol) {
-          PCollection<KV<T, Long>> withLongs = pcol.apply(ParDo.of(new OldDoFn<T, KV<T, Long>>() {
-              @Override
+          PCollection<KV<T, Long>> withLongs = pcol.apply(ParDo.of(new DoFn<T, KV<T, Long>>() {
+              @ProcessElement
               public void processElement(ProcessContext processContext) throws Exception {
                   processContext.output(KV.of(processContext.element(), 1L));
               }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f5df3583/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java
index 066521b..291f7b2 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java
@@ -30,9 +30,9 @@ import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.ApproximateUnique;
 import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.Max;
-import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.Sum;
@@ -105,7 +105,7 @@ public class MultiOutputWordCountTest {
   /**
    * A OldDoFn that tokenizes lines of text into individual words.
    */
-  static class ExtractWordsFn extends OldDoFn<String, String> {
+  static class ExtractWordsFn extends DoFn<String, String> {
 
     private final Aggregator<Integer, Integer> totalWords = createAggregator("totalWords",
         new Sum.SumIntegerFn());
@@ -117,7 +117,7 @@ public class MultiOutputWordCountTest {
       this.regex = regex;
     }
 
-    @Override
+    @ProcessElement
     public void processElement(ProcessContext c) {
       String[] words = c.element().split(c.sideInput(regex));
       for (String word : words) {
@@ -170,8 +170,8 @@ public class MultiOutputWordCountTest {
     }
   }
 
-  private static class FormatCountsFn extends OldDoFn<KV<String, Long>, String> {
-    @Override
+  private static class FormatCountsFn extends DoFn<KV<String, Long>, String> {
+    @ProcessElement
     public void processElement(ProcessContext c) {
       c.output(c.element().getKey() + ": " + c.element().getValue());
     }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f5df3583/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SerializationTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SerializationTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SerializationTest.java
index fb97b8b..019b107 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SerializationTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SerializationTest.java
@@ -30,7 +30,7 @@ import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.Sum;
@@ -147,12 +147,12 @@ public class SerializationTest {
   /**
    * A OldDoFn that tokenizes lines of text into individual words.
    */
-  static class ExtractWordsFn extends OldDoFn<StringHolder, StringHolder> {
+  static class ExtractWordsFn extends DoFn<StringHolder, StringHolder> {
     private static final Pattern WORD_BOUNDARY = Pattern.compile("[^a-zA-Z']+");
     private final Aggregator<Long, Long> emptyLines =
         createAggregator("emptyLines", new Sum.SumLongFn());
 
-    @Override
+    @ProcessElement
     public void processElement(ProcessContext c) {
       // Split the line into words.
       String[] words = WORD_BOUNDARY.split(c.element().toString());
@@ -175,8 +175,8 @@ public class SerializationTest {
   /**
    * A OldDoFn that converts a Word and Count into a printable string.
    */
-  private static class FormatCountsFn extends OldDoFn<KV<StringHolder, Long>, StringHolder> {
-    @Override
+  private static class FormatCountsFn extends DoFn<KV<StringHolder, Long>, StringHolder> {
+    @ProcessElement
     public void processElement(ProcessContext c) {
       c.output(new StringHolder(c.element().getKey() + ": " + c.element().getValue()));
     }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f5df3583/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
index fa98ca3..17044aa 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
@@ -27,7 +27,7 @@ import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.Window;
@@ -121,8 +121,8 @@ public class KafkaStreamingTest {
     EMBEDDED_ZOOKEEPER.shutdown();
   }
 
-  private static class FormatKVFn extends OldDoFn<KV<String, String>, String> {
-    @Override
+  private static class FormatKVFn extends DoFn<KV<String, String>, String> {
+    @ProcessElement
     public void processElement(ProcessContext c) {
       c.output(c.element().getKey() + "," + c.element().getValue());
     }


Mime
View raw message