beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rober...@apache.org
Subject [1/3] incubator-beam git commit: Generic ordered position range tracker.
Date Tue, 08 Nov 2016 01:55:28 GMT
Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk 8f4551c4e -> 93a95d68b


Generic ordered position range tracker.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/4768227c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/4768227c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/4768227c

Branch: refs/heads/python-sdk
Commit: 4768227c62ac1c43e64bba604639bc4e80607edb
Parents: 8f4551c
Author: Robert Bradshaw <robertwb@gmail.com>
Authored: Fri Nov 4 11:18:31 2016 -0700
Committer: Robert Bradshaw <robertwb@google.com>
Committed: Mon Nov 7 17:53:06 2016 -0800

----------------------------------------------------------------------
 sdks/python/apache_beam/io/range_trackers.py    | 75 ++++++++++++++++++++
 .../apache_beam/io/range_trackers_test.py       | 73 +++++++++++++++++++
 2 files changed, 148 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4768227c/sdks/python/apache_beam/io/range_trackers.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/range_trackers.py b/sdks/python/apache_beam/io/range_trackers.py
index 080e2f3..b42ff1b 100644
--- a/sdks/python/apache_beam/io/range_trackers.py
+++ b/sdks/python/apache_beam/io/range_trackers.py
@@ -286,6 +286,81 @@ class GroupedShuffleRangeTracker(iobase.RangeTracker):
                        ' that are interpreted by the service')
 
 
+class OrderedPositionRangeTracker(iobase.RangeTracker):
+  """An abstract base class for range trackers whose positions are comparable.
+
+  Subclasses only need to implement the mapping from position ranges to and
+  from the closed interval [0, 1].
+  """
+
+  UNSTARTED = object()  # Sentinel value of _last_claim before any claim.
+
+  def __init__(self, start_position=None, stop_position=None):
+    self._start_position = start_position
+    self._stop_position = stop_position
+    self._lock = threading.Lock()
+    self._last_claim = self.UNSTARTED
+
+  def start_position(self):
+    return self._start_position
+
+  def stop_position(self):
+    with self._lock:  # try_split() rewrites _stop_position under this lock.
+      return self._stop_position
+
+  def try_claim(self, position):
+    """Claims `position`; True iff it is before the stop position (if any).
+    Raises ValueError for a claim that is out of order or before the start."""
+    with self._lock:
+      if self._last_claim is not self.UNSTARTED and position < self._last_claim:
+        raise ValueError(
+            "Positions must be claimed in order: "
+            "claim '%s' attempted after claim '%s'" % (
+                position, self._last_claim))
+      elif self._start_position is not None and position < self._start_position:
+        raise ValueError("Claim '%s' is before start '%s'" % (
+            position, self._start_position))
+      if self._stop_position is None or position < self._stop_position:
+        self._last_claim = position
+        return True
+      return False  # Explicit result instead of an implicit None.
+
+  def position_at_fraction(self, fraction):
+    return self.fraction_to_position(
+      fraction, self._start_position, self._stop_position)
+
+  def try_split(self, position):
+    """Moves the stop to `position`; returns (position, fraction) or None."""
+    with self._lock:
+      if ((self._stop_position is not None and position > self._stop_position)
+          or (self._start_position is not None
+              and position <= self._start_position)):
+        raise ValueError("Split at '%s' not in range %s" % (
+            position, [self._start_position, self._stop_position]))
+      if self._last_claim is self.UNSTARTED or self._last_claim < position:
+        fraction = self.position_to_fraction(
+          position, start=self._start_position, end=self._stop_position)
+        self._stop_position = position
+        return position, fraction
+
+  def fraction_consumed(self):
+    """Returns the last claim as a fraction of the range, or 0 if unstarted."""
+    if self._last_claim is self.UNSTARTED:
+      return 0
+    return self.position_to_fraction(
+        self._last_claim, self._start_position, self._stop_position)
+
+  def position_to_fraction(self, pos, start, end):
+    """Converts a position `pos` between `start` and `end` (inclusive) to a
+    fraction between 0 and 1. Subclasses must override this."""
+    raise NotImplementedError
+
+  def fraction_to_position(self, fraction, start, end):
+    """Converts a fraction between 0 and 1 to a position between start and
+    end. Subclasses must override this."""
+    raise NotImplementedError
+
+
 class UnsplittableRangeTracker(iobase.RangeTracker):
   """A RangeTracker that always ignores split requests.
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4768227c/sdks/python/apache_beam/io/range_trackers_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/range_trackers_test.py b/sdks/python/apache_beam/io/range_trackers_test.py
index c4c1e28..161103c 100644
--- a/sdks/python/apache_beam/io/range_trackers_test.py
+++ b/sdks/python/apache_beam/io/range_trackers_test.py
@@ -319,6 +319,79 @@ class GroupedShuffleRangeTrackerTest(unittest.TestCase):
         self.bytes_to_position([3, 2, 1])))
 
 
+class OrderedPositionRangeTrackerTest(unittest.TestCase):
+  """Tests OrderedPositionRangeTracker through a simple numeric subclass."""
+
+  class DoubleRangeTracker(range_trackers.OrderedPositionRangeTracker):
+
+    @staticmethod
+    def fraction_to_position(fraction, start, end):
+      return start + fraction * (end - start)
+
+    @staticmethod
+    def position_to_fraction(pos, start, end):
+      return float(pos - start) / (end - start)
+
+  def test_try_claim(self):
+    """Claims inside [10, 20) succeed; claims at or past 20 are refused."""
+    rt = self.DoubleRangeTracker(10, 20)
+    self.assertTrue(rt.try_claim(10))
+    self.assertTrue(rt.try_claim(15))
+    self.assertFalse(rt.try_claim(20))
+    self.assertFalse(rt.try_claim(25))
+
+  def test_fraction_consumed(self):
+    """The fraction follows successful claims and ignores a refused one."""
+    rt = self.DoubleRangeTracker(10, 20)
+    self.assertEqual(0, rt.fraction_consumed())
+    rt.try_claim(10)
+    self.assertEqual(0, rt.fraction_consumed())
+    rt.try_claim(15)
+    self.assertEqual(0.5, rt.fraction_consumed())
+    rt.try_claim(17)
+    self.assertEqual(0.7, rt.fraction_consumed())
+    rt.try_claim(25)
+    self.assertEqual(0.7, rt.fraction_consumed())
+
+  def test_try_split(self):
+    """A split shrinks the range and bounds later claims and splits."""
+    rt = self.DoubleRangeTracker(10, 20)
+    rt.try_claim(15)
+    self.assertEqual(0.5, rt.fraction_consumed())
+    # Split at 18.
+    self.assertEqual((18, 0.8), rt.try_split(18))
+    # Fraction consumed reflects smaller range.
+    self.assertEqual(0.625, rt.fraction_consumed())
+    # We can claim anything less than 18,
+    self.assertTrue(rt.try_claim(17))
+    # but can't split before claimed 17,
+    self.assertIsNone(rt.try_split(16))
+    # nor claim anything after 18.
+    self.assertFalse(rt.try_claim(19))
+
+  def test_claim_order(self):
+    """Claiming a position below the previous claim raises ValueError."""
+    rt = self.DoubleRangeTracker(10, 20)
+    rt.try_claim(12)
+    rt.try_claim(15)
+    self.assertRaises(ValueError, rt.try_claim, 13)
+
+  def test_out_of_range(self):
+    """Claims and splits outside the current range raise ValueError."""
+    rt = self.DoubleRangeTracker(10, 20)
+    # Can't claim before range.
+    self.assertRaises(ValueError, rt.try_claim, -5)
+    # Can't split before range.
+    self.assertRaises(ValueError, rt.try_split, -5)
+    # Can't split at start position.
+    self.assertRaises(ValueError, rt.try_split, 10)
+    # Can't split after range.
+    self.assertRaises(ValueError, rt.try_split, 25)
+    rt.try_split(15)
+    # Can't split after modified range.
+    self.assertRaises(ValueError, rt.try_split, 17)
+
+
 class UnsplittableRangeTrackerTest(unittest.TestCase):
 
   def test_try_claim(self):


Mime
View raw message