beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbono...@apache.org
Subject [1/2] incubator-beam git commit: [BEAM-961] Add starting number to CountingInput
Date Tue, 06 Dec 2016 10:17:10 GMT
Repository: incubator-beam
Updated Branches:
  refs/heads/master 1efda59ab -> 493c04faa


[BEAM-961] Add starting number to CountingInput


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/41ae08bf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/41ae08bf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/41ae08bf

Branch: refs/heads/master
Commit: 41ae08bf18525f52b03252dee783505ae400911e
Parents: 1efda59
Author: Vladisav Jelisavcic <vj@apache.org>
Authored: Sun Dec 4 10:42:28 2016 +0100
Committer: Jean-Baptiste Onofré <jbonofre@apache.org>
Committed: Tue Dec 6 11:01:45 2016 +0100

----------------------------------------------------------------------
 .../org/apache/beam/sdk/io/CountingInput.java   | 42 ++++++++++++++++----
 .../org/apache/beam/sdk/io/CountingSource.java  | 11 +++++
 .../apache/beam/sdk/io/CountingInputTest.java   | 42 +++++++++++++++-----
 3 files changed, 76 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41ae08bf/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingInput.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingInput.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingInput.java
index f479215..456d291 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingInput.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingInput.java
@@ -35,7 +35,7 @@ import org.joda.time.Instant;
 /**
  * A {@link PTransform} that produces longs. When used to produce a
  * {@link IsBounded#BOUNDED bounded} {@link PCollection}, {@link CountingInput} starts at {@code 0}
- * and counts up to a specified maximum. When used to produce an
+ * or starting value, and counts up to a specified maximum. When used to produce an
  * {@link IsBounded#UNBOUNDED unbounded} {@link PCollection}, it counts up to {@link Long#MAX_VALUE}
  * and then never produces more output. (In practice, this limit should never be reached.)
  *
@@ -43,7 +43,8 @@ import org.joda.time.Instant;
  * {@link OffsetBasedSource.OffsetBasedReader}, so it performs efficient initial splitting and it
  * supports dynamic work rebalancing.
  *
- * <p>To produce a bounded {@code PCollection<Long>}, use {@link CountingInput#upTo(long)}:
+ * <p>To produce a bounded {@code PCollection<Long>} starting from {@code 0},
+ * use {@link CountingInput#upTo(long)}:
  *
  * <pre>{@code
  * Pipeline p = ...
@@ -51,6 +52,9 @@ import org.joda.time.Instant;
  * PCollection<Long> bounded = p.apply(producer);
  * }</pre>
  *
+ * <p>To produce a bounded {@code PCollection<Long>} starting from {@code startOffset},
+ * use {@link CountingInput#forSubrange(long, long)} instead.
+ *
  * <p>To produce an unbounded {@code PCollection<Long>}, use {@link CountingInput#unbounded()},
  * calling {@link UnboundedCountingInput#withTimestampFn(SerializableFunction)} to provide values
  * with timestamps other than {@link Instant#now}.
@@ -76,6 +80,16 @@ public class CountingInput {
   }
 
   /**
+   * Creates a {@link BoundedCountingInput} that will produce elements
+   * starting from {@code startIndex} to {@code endIndex - 1}.
+   */
+  public static BoundedCountingInput forSubrange(long startIndex, long endIndex) {
+    checkArgument(endIndex > startIndex, "endIndex (%s) must be greater than startIndex (%s)",
+            endIndex, startIndex);
+    return new BoundedCountingInput(startIndex, endIndex);
+  }
+
+  /**
    * Creates an {@link UnboundedCountingInput} that will produce numbers starting from {@code 0} up
    * to {@link Long#MAX_VALUE}.
    *
@@ -102,23 +116,35 @@ public class CountingInput {
    * 0.
    */
   public static class BoundedCountingInput extends PTransform<PBegin, PCollection<Long>> {
-    private final long numElements;
+    private final long startIndex;
+    private final long endIndex;
 
     private BoundedCountingInput(long numElements) {
-      this.numElements = numElements;
+      this.endIndex = numElements;
+      this.startIndex = 0;
+    }
+
+    private BoundedCountingInput(long startIndex, long endIndex) {
+      this.endIndex = endIndex;
+      this.startIndex = startIndex;
     }
 
-    @SuppressWarnings("deprecation")
     @Override
     public PCollection<Long> apply(PBegin begin) {
-      return begin.apply(Read.from(CountingSource.upTo(numElements)));
+      return begin.apply(Read.from(CountingSource.createSourceForSubrange(startIndex, endIndex)));
     }
 
     @Override
     public void populateDisplayData(DisplayData.Builder builder) {
       super.populateDisplayData(builder);
-      builder.add(DisplayData.item("upTo", numElements)
-        .withLabel("Count Up To"));
+
+      if (startIndex == 0) {
+            builder.add(DisplayData.item("upTo", endIndex)
+                .withLabel("Count Up To"));
+      } else {
+            builder.add(DisplayData.item("startAt", startIndex).withLabel("Count Starting At"))
+                    .add(DisplayData.item("upTo", endIndex).withLabel("Count Up To"));
+        }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41ae08bf/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java
index 59a8df8..bc7fb78 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java
@@ -83,6 +83,17 @@ public class CountingSource {
   }
 
   /**
+   * Creates a {@link BoundedSource} that will produce elements
+   * from {@code startIndex} to {@code endIndex - 1}.
+   */
+  static BoundedSource<Long> createSourceForSubrange(long startIndex, long endIndex) {
+    checkArgument(endIndex > startIndex, "endIndex (%s) must be greater than startIndex (%s)",
+            endIndex, startIndex);
+
+    return new BoundedCountingSource(startIndex, endIndex);
+  }
+
+  /**
    * Create a new {@link UnboundedCountingSource}.
    */
   // package-private to return a typed UnboundedCountingSource rather than the UnboundedSource type.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41ae08bf/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingInputTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingInputTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingInputTest.java
index 2397d10..02b4ba0 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingInputTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingInputTest.java
@@ -49,21 +49,21 @@ import org.junit.runners.JUnit4;
  */
 @RunWith(JUnit4.class)
 public class CountingInputTest {
-  public static void addCountingAsserts(PCollection<Long> input, long numElements) {
+  public static void addCountingAsserts(PCollection<Long> input, long start, long end) {
     // Count == numElements
     PAssert.thatSingleton(input.apply("Count", Count.<Long>globally()))
-        .isEqualTo(numElements);
+        .isEqualTo(end - start);
     // Unique count == numElements
     PAssert.thatSingleton(
             input
                 .apply(Distinct.<Long>create())
                 .apply("UniqueCount", Count.<Long>globally()))
-        .isEqualTo(numElements);
-    // Min == 0
-    PAssert.thatSingleton(input.apply("Min", Min.<Long>globally())).isEqualTo(0L);
-    // Max == numElements-1
+        .isEqualTo(end - start);
+    // Min == start
+    PAssert.thatSingleton(input.apply("Min", Min.<Long>globally())).isEqualTo(start);
+    // Max == end-1
     PAssert.thatSingleton(input.apply("Max", Max.<Long>globally()))
-        .isEqualTo(numElements - 1);
+        .isEqualTo(end - 1);
   }
 
   @Test
@@ -73,7 +73,19 @@ public class CountingInputTest {
     long numElements = 1000;
     PCollection<Long> input = p.apply(CountingInput.upTo(numElements));
 
-    addCountingAsserts(input, numElements);
+    addCountingAsserts(input, 0, numElements);
+    p.run();
+  }
+
+  @Test
+  @Category(RunnableOnService.class)
+  public void testBoundedInputSubrange() {
+    Pipeline p = TestPipeline.create();
+    long start = 10;
+    long end = 1000;
+    PCollection<Long> input = p.apply(CountingInput.forSubrange(start, end));
+
+    addCountingAsserts(input, start, end);
     p.run();
   }
 
@@ -85,6 +97,14 @@ public class CountingInputTest {
   }
 
   @Test
+  public void testBoundedDisplayDataSubrange() {
+    PTransform<?, ?> input = CountingInput.forSubrange(12, 1234);
+    DisplayData displayData = DisplayData.from(input);
+    assertThat(displayData, hasDisplayItem("startAt", 12));
+    assertThat(displayData, hasDisplayItem("upTo", 1234));
+  }
+
+  @Test
   @Category(RunnableOnService.class)
   public void testUnboundedInput() {
     Pipeline p = TestPipeline.create();
@@ -92,7 +112,7 @@ public class CountingInputTest {
 
     PCollection<Long> input = p.apply(CountingInput.unbounded().withMaxNumRecords(numElements));
 
-    addCountingAsserts(input, numElements);
+    addCountingAsserts(input, 0, numElements);
     p.run();
   }
 
@@ -110,7 +130,7 @@ public class CountingInputTest {
                 .withRate(elemsPerPeriod, periodLength)
                 .withMaxNumRecords(numElements));
 
-    addCountingAsserts(input, numElements);
+    addCountingAsserts(input, 0, numElements);
     long expectedRuntimeMillis = (periodLength.getMillis() * numElements) / elemsPerPeriod;
     Instant startTime = Instant.now();
     p.run();
@@ -136,7 +156,7 @@ public class CountingInputTest {
             CountingInput.unbounded()
                 .withTimestampFn(new ValueAsTimestampFn())
                 .withMaxNumRecords(numElements));
-    addCountingAsserts(input, numElements);
+    addCountingAsserts(input, 0, numElements);
 
     PCollection<Long> diffs =
         input


Mime
View raw message