beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dhalp...@apache.org
Subject [1/2] incubator-beam git commit: Move allowsDynamicSplitting to Reader, and set it in CompressedSource.
Date Thu, 23 Jun 2016 01:07:39 GMT
Repository: incubator-beam
Updated Branches:
  refs/heads/master 8949ec315 -> e0cae9fb6


Move allowsDynamicSplitting to Reader, and set it in CompressedSource.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b7e9a7e8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b7e9a7e8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b7e9a7e8

Branch: refs/heads/master
Commit: b7e9a7e8fa467641dfe1c19fa5d5f63f4b74a6d0
Parents: 8949ec3
Author: Pei He <peihe@google.com>
Authored: Fri Jun 17 18:24:06 2016 -0700
Committer: Dan Halperin <dhalperi@google.com>
Committed: Wed Jun 22 18:07:16 2016 -0700

----------------------------------------------------------------------
 .../apache/beam/sdk/io/CompressedSource.java    |  5 +++
 .../apache/beam/sdk/io/OffsetBasedSource.java   | 26 +++++++--------
 .../beam/sdk/io/CompressedSourceTest.java       | 35 ++++++++++++++++++++
 3 files changed, 53 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b7e9a7e8/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java
index a5c54b3..75bfc8f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java
@@ -396,6 +396,11 @@ public class CompressedSource<T> extends FileBasedSource<T> {
     }
 
     @Override
+    public boolean allowsDynamicSplitting() {
+      return splittable;
+    }
+
+    @Override
     public final long getSplitPointsConsumed() {
       if (splittable) {
         return readerDelegate.getSplitPointsConsumed();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b7e9a7e8/sdks/java/core/src/main/java/org/apache/beam/sdk/io/OffsetBasedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/OffsetBasedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/OffsetBasedSource.java
index 2f62acd..295eab9 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/OffsetBasedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/OffsetBasedSource.java
@@ -192,17 +192,6 @@ public abstract class OffsetBasedSource<T> extends BoundedSource<T> {
    */
   public abstract OffsetBasedSource<T> createSourceForSubrange(long start, long end);
 
-  /**
-   * Whether this source should allow dynamic splitting of the offset ranges.
-   *
-   * <p>True by default. Override this to return false if the source cannot
-   * support dynamic splitting correctly. If this returns false,
-   * {@link OffsetBasedSource.OffsetBasedReader#splitAtFraction} will refuse all split requests.
-   */
-  public boolean allowsDynamicSplitting() {
-    return true;
-  }
-
   @Override
   public void populateDisplayData(DisplayData.Builder builder) {
     super.populateDisplayData(builder);
@@ -342,7 +331,7 @@ public abstract class OffsetBasedSource<T> extends BoundedSource<T> {
         // Note that even if the current source does not allow splitting, we don't know that
         // it's non-empty so we return UNKNOWN instead of 1.
         return BoundedReader.SPLIT_POINTS_UNKNOWN;
-      } else if (!getCurrentSource().allowsDynamicSplitting()) {
+      } else if (!allowsDynamicSplitting()) {
         // Started (so non-empty) and unsplittable, so only the current task.
         return 1;
       } else if (getCurrentOffset() >= rangeTracker.getStopPosition() - 1) {
@@ -355,9 +344,20 @@ public abstract class OffsetBasedSource<T> extends BoundedSource<T> {
       }
     }
 
+    /**
+     * Whether this reader should allow dynamic splitting of the offset ranges.
+     *
+     * <p>True by default. Override this to return false if the reader cannot
+     * support dynamic splitting correctly. If this returns false,
+     * {@link OffsetBasedReader#splitAtFraction} will refuse all split requests.
+     */
+    public boolean allowsDynamicSplitting() {
+      return true;
+    }
+
     @Override
     public final synchronized OffsetBasedSource<T> splitAtFraction(double fraction) {
-      if (!getCurrentSource().allowsDynamicSplitting()) {
+      if (!allowsDynamicSplitting()) {
         return null;
       }
       if (rangeTracker.getStopPosition() == Long.MAX_VALUE) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b7e9a7e8/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java
index abf1de3..8fbed94 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java
@@ -24,6 +24,8 @@ import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.not;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
@@ -44,6 +46,8 @@ import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.values.PCollection;
 
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 import com.google.common.io.Files;
 import com.google.common.primitives.Bytes;
 
@@ -633,6 +637,37 @@ public class CompressedSourceTest {
   }
 
   @Test
+  public void testUnsplittable() throws IOException {
+    String baseName = "test-input";
+    File compressedFile = tmpFolder.newFile(baseName + ".gz");
+    byte[] input = generateInput(10000);
+    writeFile(compressedFile, input, CompressionMode.GZIP);
+
+    CompressedSource<Byte> source =
+        CompressedSource.from(new ByteSource(compressedFile.getPath(), 1));
+    List<Byte> expected = Lists.newArrayList();
+    for (byte i : input) {
+      expected.add(i);
+    }
+
+    PipelineOptions options = PipelineOptionsFactory.create();
+    BoundedReader<Byte> reader = source.createReader(options);
+
+    List<Byte> actual = Lists.newArrayList();
+    for (boolean hasNext = reader.start(); hasNext; hasNext = reader.advance()) {
+      actual.add(reader.getCurrent());
+      // checkpoint every 9 elements
+      if (actual.size() % 9 == 0) {
+        Double fractionConsumed = reader.getFractionConsumed();
+        assertNotNull(fractionConsumed);
+        assertNull(reader.splitAtFraction(fractionConsumed));
+      }
+    }
+    assertEquals(expected.size(), actual.size());
+    assertEquals(Sets.newHashSet(expected), Sets.newHashSet(actual));
+  }
+
+  @Test
   public void testSplittableProgress() throws IOException {
     File tmpFile = tmpFolder.newFile("nonempty.txt");
     String filename = tmpFile.toPath().toString();


Mime
View raw message