beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From echauc...@apache.org
Subject [beam] 09/45: Update windowAssignTest
Date Tue, 09 Jul 2019 13:18:26 GMT
This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit ab26efca8b2e9be20036a50c6728692e80d8a7d0
Author: Etienne Chauchot <echauchot@apache.org>
AuthorDate: Tue May 7 09:35:46 2019 +0200

    Update windowAssignTest
---
 .../translation/batch/WindowAssignTest.java           | 19 ++++++++-----------
 1 file changed, 8 insertions(+), 11 deletions(-)

diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/WindowAssignTest.java
b/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/WindowAssignTest.java
index 61da3ea..3011d88 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/WindowAssignTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/WindowAssignTest.java
@@ -27,13 +27,11 @@ import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.TimestampedValue;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.junit.BeforeClass;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -50,20 +48,19 @@ public class WindowAssignTest implements Serializable {
     p = Pipeline.create(options);
   }
 
-  @Ignore
   @Test
   public void testWindowAssign() {
-    PCollection<KV<Integer, Integer>> input =
+    PCollection<Integer> input =
         p.apply(
                 Create.timestamped(
-                    TimestampedValue.of(KV.of(1, 1), new Instant(1)),
-                    TimestampedValue.of(KV.of(1, 2), new Instant(2)),
-                    TimestampedValue.of(KV.of(1, 3), new Instant(3)),
-                    TimestampedValue.of(KV.of(1, 4), new Instant(10)),
-                    TimestampedValue.of(KV.of(1, 5), new Instant(11))))
+                    TimestampedValue.of(1, new Instant(1)),
+                    TimestampedValue.of(2, new Instant(2)),
+                    TimestampedValue.of(3, new Instant(3)),
+                    TimestampedValue.of(4, new Instant(10)),
+                    TimestampedValue.of(5, new Instant(11))))
             .apply(Window.into(FixedWindows.of(Duration.millis(10))))
-            .apply(Sum.integersPerKey());
-    PAssert.that(input).containsInAnyOrder(KV.of(1, 6), KV.of(1, 9));
+            .apply(Sum.integersGlobally().withoutDefaults());
+    PAssert.that(input).containsInAnyOrder(6, 9);
     p.run();
   }
 }


Mime
View raw message