beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From echauc...@apache.org
Subject [beam] 12/13: Add a test that combine per key preserves windowing
Date Fri, 05 Jul 2019 13:53:22 GMT
This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 59b9e3f46008e009e0c7e946e4e1a38d43305fd3
Author: Etienne Chauchot <echauchot@apache.org>
AuthorDate: Thu Jul 4 15:26:41 2019 +0200

    Add a test that combine per key preserves windowing
---
 .../translation/batch/CombineTest.java              | 21 +++++++++++++++++++++
 1 file changed, 21 insertions(+)

diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombineTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombineTest.java
index b3a2915..f48f79e 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombineTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombineTest.java
@@ -27,8 +27,13 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -69,4 +74,20 @@ public class CombineTest implements Serializable {
     PAssert.that(input).containsInAnyOrder(KV.of(1, 9), KV.of(2, 12));
     p.run();
   }
+
+  @Test
+  public void testCombinePerKeyPreservesWindowing(){
+    PCollection<KV<Integer, Integer>> input = p.apply(Create
+        .timestamped(TimestampedValue.of(KV.of(1, 1), new Instant(1)),
+            TimestampedValue.of(KV.of(1, 3), new Instant(2)),
+            TimestampedValue.of(KV.of(1, 5), new Instant(11)),
+            TimestampedValue.of(KV.of(2, 2), new Instant(3)),
+            TimestampedValue.of(KV.of(2, 4), new Instant(11)),
+            TimestampedValue.of(KV.of(2, 6), new Instant(12))))
+        .apply(Window.into(FixedWindows.of(Duration.millis(10)))).apply(Sum.integersPerKey());
+    PAssert.that(input).containsInAnyOrder(KV.of(1, 4), KV.of(1, 5), KV.of(2, 2), KV.of(2, 10));
+    p.run();
+
+  }
+
 }


Mime
View raw message