beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From lc...@apache.org
Subject [1/2] incubator-beam git commit: Add support for having an empty CountingInput/CountingSource
Date Fri, 09 Dec 2016 02:41:53 GMT
Repository: incubator-beam
Updated Branches:
  refs/heads/master 40bd27602 -> ddb59125a


Add support for having an empty CountingInput/CountingSource


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/30ff1ee1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/30ff1ee1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/30ff1ee1

Branch: refs/heads/master
Commit: 30ff1ee17bb290f2b50fd082d8cb63d48280c5c2
Parents: 40bd276
Author: Luke Cwik <lcwik@google.com>
Authored: Thu Dec 8 15:22:35 2016 -0800
Committer: Luke Cwik <lcwik@google.com>
Committed: Thu Dec 8 18:41:17 2016 -0800

----------------------------------------------------------------------
 .../org/apache/beam/sdk/io/CountingInput.java   | 12 ++++++----
 .../org/apache/beam/sdk/io/CountingSource.java  | 12 ++++++----
 .../apache/beam/sdk/io/CountingInputTest.java   | 23 +++++++++++++++++++-
 .../apache/beam/sdk/io/CountingSourceTest.java  | 10 +++++++++
 4 files changed, 48 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/30ff1ee1/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingInput.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingInput.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingInput.java
index 3148d8d..ac70aca 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingInput.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingInput.java
@@ -75,17 +75,21 @@ public class CountingInput {
    * from {@code 0} to {@code numElements - 1}.
    */
   public static BoundedCountingInput upTo(long numElements) {
-    checkArgument(numElements > 0, "numElements (%s) must be greater than 0", numElements);
+    checkArgument(numElements >= 0,
+        "numElements (%s) must be greater than or equal to 0",
+        numElements);
     return new BoundedCountingInput(numElements);
   }
 
   /**
    * Creates a {@link BoundedCountingInput} that will produce elements
-   * starting from {@code startIndex} to {@code endIndex - 1}.
+   * starting from {@code startIndex} (inclusive) to {@code endIndex} (exclusive).
+   * If {@code startIndex == endIndex}, then no elements will be produced.
    */
   public static BoundedCountingInput forSubrange(long startIndex, long endIndex) {
-    checkArgument(endIndex > startIndex, "endIndex (%s) must be greater than startIndex
(%s)",
-            endIndex, startIndex);
+    checkArgument(endIndex >= startIndex,
+        "endIndex (%s) must be greater than or equal to startIndex (%s)",
+        endIndex, startIndex);
     return new BoundedCountingInput(startIndex, endIndex);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/30ff1ee1/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java
index bc7fb78..9752dba 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java
@@ -78,17 +78,21 @@ public class CountingSource {
    */
   @Deprecated
   public static BoundedSource<Long> upTo(long numElements) {
-    checkArgument(numElements > 0, "numElements (%s) must be greater than 0", numElements);
+    checkArgument(numElements >= 0,
+        "numElements (%s) must be greater than or equal to 0",
+        numElements);
     return new BoundedCountingSource(0, numElements);
   }
 
   /**
    * Creates a {@link BoundedSource} that will produce elements
-   * from {@code startIndex} to {@code endIndex - 1}.
+   * starting from {@code startIndex} (inclusive) to {@code endIndex} (exclusive).
+   * If {@code startIndex == endIndex}, then no elements will be produced.
    */
   static BoundedSource<Long> createSourceForSubrange(long startIndex, long endIndex)
{
-    checkArgument(endIndex > startIndex, "endIndex (%s) must be greater than startIndex
(%s)",
-            endIndex, startIndex);
+    checkArgument(endIndex >= startIndex,
+        "endIndex (%s) must be greater than or equal to startIndex (%s)",
+        endIndex, startIndex);
 
     return new BoundedCountingSource(startIndex, endIndex);
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/30ff1ee1/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingInputTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingInputTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingInputTest.java
index 02b4ba0..4349f66 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingInputTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingInputTest.java
@@ -71,7 +71,7 @@ public class CountingInputTest {
   public void testBoundedInput() {
     Pipeline p = TestPipeline.create();
     long numElements = 1000;
-    PCollection<Long> input = p.apply(CountingInput.upTo(numElements));
+    PCollection<Long> input = p.apply(Read.from(CountingSource.upTo(numElements)));
 
     addCountingAsserts(input, 0, numElements);
     p.run();
@@ -79,6 +79,27 @@ public class CountingInputTest {
 
   @Test
   @Category(RunnableOnService.class)
+  public void testEmptyBoundedSource() {
+    Pipeline p = TestPipeline.create();
+    PCollection<Long> input = p.apply(CountingInput.upTo(0));
+
+    PAssert.that(input).empty();
+    p.run();
+  }
+
+  @Test
+  @Category(RunnableOnService.class)
+  public void testEmptyBoundedSourceUsingRange() {
+    Pipeline p = TestPipeline.create();
+    PCollection<Long> input = p.apply(CountingInput.forSubrange(42, 42));
+
+    PAssert.that(input).empty();
+    p.run();
+  }
+
+
+  @Test
+  @Category(RunnableOnService.class)
   public void testBoundedInputSubrange() {
     Pipeline p = TestPipeline.create();
     long start = 10;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/30ff1ee1/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingSourceTest.java
index 88c68d3..5eccde6 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingSourceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingSourceTest.java
@@ -92,6 +92,16 @@ public class CountingSourceTest {
 
   @Test
   @Category(RunnableOnService.class)
+  public void testEmptyBoundedSource() {
+    Pipeline p = TestPipeline.create();
+    PCollection<Long> input = p.apply(Read.from(CountingSource.upTo(0)));
+
+    PAssert.that(input).empty();
+    p.run();
+  }
+
+  @Test
+  @Category(RunnableOnService.class)
   public void testBoundedSourceSplits() throws Exception {
     Pipeline p = TestPipeline.create();
     long numElements = 1000;


Mime
View raw message