beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dhalp...@apache.org
Subject [2/2] incubator-beam git commit: Modified range tracker to use first response seen as start key
Date Wed, 15 Jun 2016 21:19:32 GMT
Modified range tracker to use first response seen as start key


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/172d4fdc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/172d4fdc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/172d4fdc

Branch: refs/heads/master
Commit: 172d4fdc84eb65898259b0d65f5f60492a0cbf7b
Parents: 2f46bc0
Author: Ian Zhou <ianzhou@google.com>
Authored: Thu Jun 9 14:17:14 2016 -0700
Committer: Dan Halperin <dhalperi@google.com>
Committed: Wed Jun 15 14:18:50 2016 -0700

----------------------------------------------------------------------
 .../beam/sdk/io/range/ByteKeyRangeTracker.java  |  8 ++
 .../sdk/io/range/ByteKeyRangeTrackerTest.java   | 84 ++++++++++++++------
 .../beam/sdk/io/gcp/bigtable/BigtableIO.java    |  5 +-
 .../sdk/io/gcp/bigtable/BigtableIOTest.java     |  2 -
 4 files changed, 70 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/172d4fdc/sdks/java/core/src/main/java/org/apache/beam/sdk/io/range/ByteKeyRangeTracker.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/range/ByteKeyRangeTracker.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/range/ByteKeyRangeTracker.java
index cb779fd..b165924 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/range/ByteKeyRangeTracker.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/range/ByteKeyRangeTracker.java
@@ -48,11 +48,19 @@ public final class ByteKeyRangeTracker implements RangeTracker<ByteKey> {
     return range.getEndKey();
   }
 
+  /** Returns the current range. */
+  public synchronized ByteKeyRange getRange() {
+    return range;
+  }
+
   @Override
   public synchronized boolean tryReturnRecordAt(boolean isAtSplitPoint, ByteKey recordStart) {
     if (isAtSplitPoint && !range.containsKey(recordStart)) {
       return false;
     }
+    if (position == null) {
+      range = range.withStartKey(recordStart);
+    }
     position = recordStart;
     return true;
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/172d4fdc/sdks/java/core/src/test/java/org/apache/beam/sdk/io/range/ByteKeyRangeTrackerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/range/ByteKeyRangeTrackerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/range/ByteKeyRangeTrackerTest.java
index 958fa48..4404414 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/range/ByteKeyRangeTrackerTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/range/ByteKeyRangeTrackerTest.java
@@ -28,66 +28,100 @@ import org.junit.runners.JUnit4;
 /** Tests for {@link ByteKeyRangeTracker}. */
 @RunWith(JUnit4.class)
 public class ByteKeyRangeTrackerTest {
-  private static final ByteKey START_KEY = ByteKey.of(0x12);
-  private static final ByteKey MIDDLE_KEY = ByteKey.of(0x23);
+  private static final ByteKey INITIAL_START_KEY = ByteKey.of(0x12);
+  private static final ByteKey INITIAL_MIDDLE_KEY = ByteKey.of(0x23);
+  private static final ByteKey NEW_START_KEY = ByteKey.of(0x14);
+  private static final ByteKey NEW_MIDDLE_KEY = ByteKey.of(0x24);
   private static final ByteKey BEFORE_END_KEY = ByteKey.of(0x33);
   private static final ByteKey END_KEY = ByteKey.of(0x34);
-  private static final double RANGE_SIZE = 0x34 - 0x12;
-  private static final ByteKeyRange RANGE = ByteKeyRange.of(START_KEY, END_KEY);
+  private static final double INITIAL_RANGE_SIZE = 0x34 - 0x12;
+  private static final ByteKeyRange INITIAL_RANGE = ByteKeyRange.of(INITIAL_START_KEY, END_KEY);
+  private static final double NEW_RANGE_SIZE = 0x34 - 0x14;
+  private static final ByteKeyRange NEW_RANGE = ByteKeyRange.of(NEW_START_KEY, END_KEY);
 
   /** Tests for {@link ByteKeyRangeTracker#toString}. */
   @Test
   public void testToString() {
-    ByteKeyRangeTracker tracker = ByteKeyRangeTracker.of(RANGE);
-    String expected = String.format("ByteKeyRangeTracker{range=%s, position=null}", RANGE);
+    ByteKeyRangeTracker tracker = ByteKeyRangeTracker.of(INITIAL_RANGE);
+    String expected = String.format("ByteKeyRangeTracker{range=%s, position=null}", INITIAL_RANGE);
     assertEquals(expected, tracker.toString());
 
-    tracker.tryReturnRecordAt(true, MIDDLE_KEY);
-    expected = String.format("ByteKeyRangeTracker{range=%s, position=%s}", RANGE, MIDDLE_KEY);
+    tracker.tryReturnRecordAt(true, INITIAL_START_KEY);
+    tracker.tryReturnRecordAt(true, INITIAL_MIDDLE_KEY);
+    expected =
+        String.format("ByteKeyRangeTracker{range=%s, position=%s}", INITIAL_RANGE,
+            INITIAL_MIDDLE_KEY);
+    assertEquals(expected, tracker.toString());
+  }
+
+  /** Tests for updating the start key to the first record returned. */
+  @Test
+  public void testUpdateStartKey() {
+    ByteKeyRangeTracker tracker = ByteKeyRangeTracker.of(INITIAL_RANGE);
+
+    tracker.tryReturnRecordAt(true, NEW_START_KEY);
+    String expected =
+        String.format("ByteKeyRangeTracker{range=%s, position=%s}", NEW_RANGE, NEW_START_KEY);
     assertEquals(expected, tracker.toString());
   }
 
   /** Tests for {@link ByteKeyRangeTracker#of}. */
   @Test
   public void testBuilding() {
-    ByteKeyRangeTracker tracker = ByteKeyRangeTracker.of(RANGE);
+    ByteKeyRangeTracker tracker = ByteKeyRangeTracker.of(INITIAL_RANGE);
 
-    assertEquals(START_KEY, tracker.getStartPosition());
+    assertEquals(INITIAL_START_KEY, tracker.getStartPosition());
     assertEquals(END_KEY, tracker.getStopPosition());
   }
 
   /** Tests for {@link ByteKeyRangeTracker#getFractionConsumed()}. */
   @Test
   public void testGetFractionConsumed() {
-    ByteKeyRangeTracker tracker = ByteKeyRangeTracker.of(RANGE);
+    ByteKeyRangeTracker tracker = ByteKeyRangeTracker.of(INITIAL_RANGE);
     double delta = 0.00001;
 
     assertEquals(0.0, tracker.getFractionConsumed(), delta);
 
-    tracker.tryReturnRecordAt(true, START_KEY);
+    tracker.tryReturnRecordAt(true, INITIAL_START_KEY);
+    assertEquals(0.0, tracker.getFractionConsumed(), delta);
+
+    tracker.tryReturnRecordAt(true, INITIAL_MIDDLE_KEY);
+    assertEquals(0.5, tracker.getFractionConsumed(), delta);
+
+    tracker.tryReturnRecordAt(true, BEFORE_END_KEY);
+    assertEquals(1 - 1 / INITIAL_RANGE_SIZE, tracker.getFractionConsumed(), delta);
+  }
+
+  /** Tests for {@link ByteKeyRangeTracker#getFractionConsumed()} with updated start key. */
+  @Test
+  public void testGetFractionConsumedUpdateStartKey() {
+    ByteKeyRangeTracker tracker = ByteKeyRangeTracker.of(INITIAL_RANGE);
+    double delta = 0.00001;
+
+    tracker.tryReturnRecordAt(true, NEW_START_KEY);
     assertEquals(0.0, tracker.getFractionConsumed(), delta);
 
-    tracker.tryReturnRecordAt(true, MIDDLE_KEY);
+    tracker.tryReturnRecordAt(true, NEW_MIDDLE_KEY);
     assertEquals(0.5, tracker.getFractionConsumed(), delta);
 
     tracker.tryReturnRecordAt(true, BEFORE_END_KEY);
-    assertEquals(1 - 1 / RANGE_SIZE, tracker.getFractionConsumed(), delta);
+    assertEquals(1 - 1 / NEW_RANGE_SIZE, tracker.getFractionConsumed(), delta);
   }
 
   /** Tests for {@link ByteKeyRangeTracker#tryReturnRecordAt}. */
   @Test
   public void testTryReturnRecordAt() {
-    ByteKeyRangeTracker tracker = ByteKeyRangeTracker.of(RANGE);
+    ByteKeyRangeTracker tracker = ByteKeyRangeTracker.of(INITIAL_RANGE);
 
     // Should be able to emit at the same key twice, should that happen.
     // Should be able to emit within range (in order, but system guarantees won't try out of order).
     // Should not be able to emit past end of range.
 
-    assertTrue(tracker.tryReturnRecordAt(true, START_KEY));
-    assertTrue(tracker.tryReturnRecordAt(true, START_KEY));
+    assertTrue(tracker.tryReturnRecordAt(true, INITIAL_START_KEY));
+    assertTrue(tracker.tryReturnRecordAt(true, INITIAL_START_KEY));
 
-    assertTrue(tracker.tryReturnRecordAt(true, MIDDLE_KEY));
-    assertTrue(tracker.tryReturnRecordAt(true, MIDDLE_KEY));
+    assertTrue(tracker.tryReturnRecordAt(true, INITIAL_MIDDLE_KEY));
+    assertTrue(tracker.tryReturnRecordAt(true, INITIAL_MIDDLE_KEY));
 
     assertTrue(tracker.tryReturnRecordAt(true, BEFORE_END_KEY));
 
@@ -99,13 +133,13 @@ public class ByteKeyRangeTrackerTest {
   /** Tests for {@link ByteKeyRangeTracker#trySplitAtPosition}. */
   @Test
   public void testSplitAtPosition() {
-    ByteKeyRangeTracker tracker = ByteKeyRangeTracker.of(RANGE);
+    ByteKeyRangeTracker tracker = ByteKeyRangeTracker.of(INITIAL_RANGE);
 
     // Unstarted, should not split.
-    assertFalse(tracker.trySplitAtPosition(MIDDLE_KEY));
+    assertFalse(tracker.trySplitAtPosition(INITIAL_MIDDLE_KEY));
 
     // Start it, split it before the end.
-    assertTrue(tracker.tryReturnRecordAt(true, START_KEY));
+    assertTrue(tracker.tryReturnRecordAt(true, INITIAL_START_KEY));
     assertTrue(tracker.trySplitAtPosition(BEFORE_END_KEY));
     assertEquals(BEFORE_END_KEY, tracker.getStopPosition());
 
@@ -113,8 +147,8 @@ public class ByteKeyRangeTrackerTest {
     assertFalse(tracker.trySplitAtPosition(END_KEY));
 
     // Should not be able to split after emitting.
-    assertTrue(tracker.tryReturnRecordAt(true, MIDDLE_KEY));
-    assertFalse(tracker.trySplitAtPosition(MIDDLE_KEY));
-    assertTrue(tracker.tryReturnRecordAt(true, MIDDLE_KEY));
+    assertTrue(tracker.tryReturnRecordAt(true, INITIAL_MIDDLE_KEY));
+    assertFalse(tracker.trySplitAtPosition(INITIAL_MIDDLE_KEY));
+    assertTrue(tracker.tryReturnRecordAt(true, INITIAL_MIDDLE_KEY));
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/172d4fdc/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
index 9656494..f725a66 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
@@ -811,9 +811,10 @@ public class BigtableIO {
     public final synchronized BigtableSource splitAtFraction(double fraction) {
       ByteKey splitKey;
       try {
-        splitKey = source.getRange().interpolateKey(fraction);
+        splitKey = rangeTracker.getRange().interpolateKey(fraction);
       } catch (IllegalArgumentException e) {
-        logger.info("%s: Failed to interpolate key for fraction %s.", source.getRange(), fraction);
+        logger.info(
+            "%s: Failed to interpolate key for fraction %s.", rangeTracker.getRange(), fraction);
         return null;
       }
       logger.debug(

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/172d4fdc/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
index 357ab44..4cb30b4 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
@@ -65,7 +65,6 @@ import com.google.protobuf.Empty;
 
 import org.hamcrest.Matchers;
 import org.junit.Before;
-import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
@@ -294,7 +293,6 @@ public class BigtableIOTest {
    * <p>Because this test runs so slowly, it is disabled by default. Re-run when changing the
    * {@link BigtableIO.Read} implementation.
    */
-  @Ignore("Slow. Rerun when changing the implementation.")
   @Test
   public void testReadingSplitAtFractionExhaustive() throws Exception {
     final String table = "TEST-FEW-ROWS-SPLIT-EXHAUSTIVE-TABLE";


Mime
View raw message