beam-commits mailing list archives

From rober...@apache.org
Subject [beam] branch master updated: [BEAM-11361] Dynamic splitting of SDF IOs. (#13443)
Date Thu, 03 Dec 2020 20:38:28 GMT
This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new bcea54f  [BEAM-11361] Dynamic splitting of SDF IOs. (#13443)
bcea54f is described below

commit bcea54f20574ea6658e7afe934213a7246bbcfaf
Author: Robert Bradshaw <robertwb@google.com>
AuthorDate: Thu Dec 3 12:37:49 2020 -0800

    [BEAM-11361] Dynamic splitting of SDF IOs. (#13443)
    
    For now, only JSON Lines input is supported; CSV files are more complicated.
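
    As a rough usage sketch (not part of this commit; the file pattern is
    hypothetical), the new splittable path is exercised by reading
    newline-delimited JSON through the dataframe API with lines=True:

        import apache_beam as beam
        from apache_beam.dataframe import convert
        from apache_beam.dataframe import io

        with beam.Pipeline() as p:
          # lines=True selects the incremental, dynamically splittable read.
          df = p | io.read_json('/tmp/events-*.jsonl', lines=True)
          # Convert back to a PCollection for non-dataframe transforms.
          pcoll = convert.to_pcollection(df)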
---
 sdks/python/apache_beam/dataframe/io.py            | 143 ++++++++++++++++++++-
 sdks/python/apache_beam/dataframe/io_test.py       |  41 ++++++
 sdks/python/apache_beam/io/restriction_trackers.py |  17 +++
 3 files changed, 195 insertions(+), 6 deletions(-)

diff --git a/sdks/python/apache_beam/dataframe/io.py b/sdks/python/apache_beam/dataframe/io.py
index b1af416..2f6404d 100644
--- a/sdks/python/apache_beam/dataframe/io.py
+++ b/sdks/python/apache_beam/dataframe/io.py
@@ -27,6 +27,9 @@ from apache_beam import io
 from apache_beam.dataframe import frame_base
 from apache_beam.io import fileio
 
+_DEFAULT_LINES_CHUNKSIZE = 10_000
+_DEFAULT_BYTES_CHUNKSIZE = 1 << 20
+
 
 def read_csv(path, *args, **kwargs):
   """Emulates `pd.read_csv` from Pandas, but as a Beam PTransform.
@@ -62,6 +65,7 @@ def read_json(path, *args, **kwargs):
       args,
       kwargs,
       incremental=kwargs.get('lines', False),
+      splittable=kwargs.get('lines', False),
       binary=False)
 
 
@@ -143,7 +147,14 @@ def _prefix_range_index_with(prefix, df):
 
 class _ReadFromPandas(beam.PTransform):
   def __init__(
-      self, reader, path, args, kwargs, incremental=False, binary=True):
+      self,
+      reader,
+      path,
+      args,
+      kwargs,
+      incremental=False,
+      splittable=False,
+      binary=True):
     if 'compression' in kwargs:
       raise NotImplementedError('compression')
     if not isinstance(path, str):
@@ -153,6 +164,7 @@ class _ReadFromPandas(beam.PTransform):
     self.args = args
     self.kwargs = kwargs
     self.incremental = incremental
+    self.splittable = splittable
     self.binary = binary
 
   def expand(self, root):
@@ -181,15 +193,105 @@ class _ReadFromPandas(beam.PTransform):
                 self.args,
                 self.kwargs,
                 self.incremental,
+                self.splittable,
                 self.binary)))
     from apache_beam.dataframe import convert
     return convert.to_dataframe(
         pcoll, proxy=_prefix_range_index_with(':', sample[:0]))
 
 
-# TODO(robertwb): Actually make an SDF.
-class _ReadFromPandasDoFn(beam.DoFn):
-  def __init__(self, reader, args, kwargs, incremental, binary):
+class _TruncatingFileHandle(object):
+  """A wrapper of a file-like object representing the restriction of the
+  underling handle according to the given SDF restriction tracker, breaking
+  the file only after the given delimiter.
+
+  For example, if the underling restriction is [103, 607) and each line were
+  exactly 10 characters long (i.e. every 10th charcter was a newline), then this
+  would give a view of a 500-byte file consisting of bytes bytes 110 to 609
+  (inclusive) of the underlying file.
+
+  As with all SDF trackers, the endpoint may change dynamically during reading.
+  """
+  def __init__(
+      self,
+      underlying,
+      tracker,
+      delim=b'\n',
+      chunk_size=_DEFAULT_BYTES_CHUNKSIZE):
+    self._underlying = underlying
+    self._tracker = tracker
+    self._buffer_start_pos = self._tracker.current_restriction().start
+    self._delim = delim
+    self._chunk_size = chunk_size
+
+    self._buffer = self._empty = self._delim[:0]
+    self._done = False
+    if self._buffer_start_pos > 0:
+      # Seek to first delimiter after the start position.
+      self._underlying.seek(self._buffer_start_pos)
+      if self.buffer_to_delim():
+        line_start = self._buffer.index(self._delim) + len(self._delim)
+        self._buffer_start_pos += line_start
+        self._buffer = self._buffer[line_start:]
+      else:
+        self._done = True
+
+  def readable(self):
+    return True
+
+  def writable(self):
+    return False
+
+  def seekable(self):
+    return False
+
+  @property
+  def closed(self):
+    return False
+
+  def __iter__(self):
+    # For pandas is_file_like.
+    raise NotImplementedError()
+
+  def buffer_to_delim(self, offset=0):
+    """Read enough of the file such that the buffer contains the delimiter, or
+    end-of-file is reached.
+    """
+    if self._delim in self._buffer[offset:]:
+      return True
+    while True:
+      chunk = self._underlying.read(self._chunk_size)
+      self._buffer += chunk
+      if self._delim in chunk:
+        return True
+      elif not chunk:
+        return False
+
+  def read(self, size=-1):
+    if self._done:
+      return self._empty
+    elif size == -1:
+      self._buffer += self._underlying.read()
+    elif not self._buffer:
+      self._buffer = self._underlying.read(size)
+
+    if self._tracker.try_claim(self._buffer_start_pos + len(self._buffer)):
+      res = self._buffer
+      self._buffer = self._empty
+      self._buffer_start_pos += len(res)
+    else:
+      offset = self._tracker.current_restriction().stop - self._buffer_start_pos
+      if self.buffer_to_delim(offset):
+        end_of_line = self._buffer.index(self._delim, offset)
+        res = self._buffer[:end_of_line + len(self._delim)]
+      else:
+        res = self._buffer
+      self._done = True
+    return res
+
+
+class _ReadFromPandasDoFn(beam.DoFn, beam.RestrictionProvider):
+  def __init__(self, reader, args, kwargs, incremental, splittable, binary):
     # avoid pickling issues
     if reader.__module__.startswith('pandas.'):
       reader = reader.__name__
@@ -197,23 +299,52 @@ class _ReadFromPandasDoFn(beam.DoFn):
     self.args = args
     self.kwargs = kwargs
     self.incremental = incremental
+    self.splittable = splittable
     self.binary = binary
 
-  def process(self, readable_file):
+  def initial_restriction(self, readable_file):
+    return beam.io.restriction_trackers.OffsetRange(
+        0, readable_file.metadata.size_in_bytes)
+
+  def restriction_size(self, readable_file, restriction):
+    return restriction.size()
+
+  def create_tracker(self, restriction):
+    tracker = beam.io.restriction_trackers.OffsetRestrictionTracker(restriction)
+    if self.splittable:
+      return tracker
+    else:
+      return beam.io.restriction_trackers.UnsplittableRestrictionTracker(
+          tracker)
+
+  def process(self, readable_file, tracker=beam.DoFn.RestrictionParam()):
     reader = self.reader
     if isinstance(reader, str):
       reader = getattr(pd, self.reader)
     with readable_file.open() as handle:
+      if self.incremental:
+        # We can get progress even if we can't split.
+        # TODO(robertwb): We could consider trying to get progress for
+        # non-incremental sources that are read linearly, as long as they
+        # don't try to seek.  This could be deceptive as progress would
+        # advance to 100% the instant the (large) read was done, discounting
+        # any downstream processing.
+        handle = _TruncatingFileHandle(handle, tracker)
       if not self.binary:
         handle = TextIOWrapper(handle)
       if self.incremental:
         if 'chunksize' not in self.kwargs:
-          self.kwargs['chunksize'] = 10_000
+          self.kwargs['chunksize'] = _DEFAULT_LINES_CHUNKSIZE
         frames = reader(handle, *self.args, **self.kwargs)
       else:
         frames = [reader(handle, *self.args, **self.kwargs)]
       for df in frames:
         yield _prefix_range_index_with(readable_file.metadata.path + ':', df)
+      if not self.incremental:
+        # Satisfy the SDF contract by claiming the whole range.
+        # Do this after emitting the frames to avoid advancing progress to 100%
+        # prior to that.
+        tracker.try_claim(tracker.current_restriction().stop)
 
 
 class _WriteToPandas(beam.PTransform):
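
A minimal sketch of the _TruncatingFileHandle behavior introduced above
(assuming this commit's code is importable; the data and offsets are
illustrative): reads are confined to the tracker's range, starting at the
first record boundary after the range's start and finishing the line in
progress at its end.

    from io import BytesIO

    from apache_beam.dataframe import io
    from apache_beam.io import restriction_trackers

    # 100 fixed-width records, 8 bytes each including the trailing newline.
    data = b''.join(b'line%03d\n' % i for i in range(100))
    tracker = restriction_trackers.OffsetRestrictionTracker(
        restriction_trackers.OffsetRange(20, 50))
    handle = io._TruncatingFileHandle(BytesIO(data), tracker)
    # Reading starts at the first record boundary after offset 20 and stops at
    # the first newline at or after offset 50 (here, bytes 24 through 55).
    print(handle.read())
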
diff --git a/sdks/python/apache_beam/dataframe/io_test.py b/sdks/python/apache_beam/dataframe/io_test.py
index a86961a..de67eff 100644
--- a/sdks/python/apache_beam/dataframe/io_test.py
+++ b/sdks/python/apache_beam/dataframe/io_test.py
@@ -26,6 +26,7 @@ import shutil
 import sys
 import tempfile
 import unittest
+from io import StringIO
 
 import pandas as pd
 from pandas.testing import assert_frame_equal
@@ -34,6 +35,7 @@ from parameterized import parameterized
 import apache_beam as beam
 from apache_beam.dataframe import convert
 from apache_beam.dataframe import io
+from apache_beam.io import restriction_trackers
 from apache_beam.testing.util import assert_that
 
 
@@ -160,6 +162,45 @@ class IOTest(unittest.TestCase):
           os.system('head -n 100 ' + dest + '*')
           raise
 
+  def _run_truncating_file_handle_test(
+      self, s, splits, delim=' ', chunk_size=10):
+    split_results = []
+    next_range = restriction_trackers.OffsetRange(0, len(s))
+    for split in list(splits) + [None]:
+      tracker = restriction_trackers.OffsetRestrictionTracker(next_range)
+      handle = io._TruncatingFileHandle(
+          StringIO(s), tracker, delim=delim, chunk_size=chunk_size)
+      data = ''
+      chunk = handle.read(1)
+      if split is not None:
+        _, next_range = tracker.try_split(split)
+      while chunk:
+        data += chunk
+        chunk = handle.read(7)
+      split_results.append(data)
+    return split_results
+
+  def test_truncating_filehandle(self):
+    self.assertEqual(
+        self._run_truncating_file_handle_test('a b c d e', [0.5]),
+        ['a b c ', 'd e'])
+    self.assertEqual(
+        self._run_truncating_file_handle_test('aaaaaaaaaaaaaaXaaa b', [0.5]),
+        ['aaaaaaaaaaaaaaXaaa ', 'b'])
+    self.assertEqual(
+        self._run_truncating_file_handle_test(
+            'aa bbbbbbbbbbbbbbbbbbbbbbbbbb ccc ', [0.01, 0.5]),
+        ['aa ', 'bbbbbbbbbbbbbbbbbbbbbbbbbb ', 'ccc '])
+
+    numbers = 'x'.join(str(k) for k in range(1000))
+    splits = self._run_truncating_file_handle_test(
+        numbers, [0.1] * 20, delim='x')
+    self.assertEqual(numbers, ''.join(splits))
+    self.assertTrue(all(s.endswith('x') for s in splits[:-1]))
+    self.assertLess(max(len(s) for s in splits), len(numbers) * 0.9 + 10)
+    self.assertGreater(
+        min(len(s) for s in splits), len(numbers) * 0.9**20 * 0.1)
+
 
 if __name__ == '__main__':
   unittest.main()
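
For context on the try_split call in the test above, a short sketch with
illustrative numbers (behavior per OffsetRestrictionTracker): try_split
returns the retained (primary) and new (residual) ranges, and the test keeps
reading the primary while feeding the residual back in as next_range.

    from apache_beam.io import restriction_trackers

    tracker = restriction_trackers.OffsetRestrictionTracker(
        restriction_trackers.OffsetRange(0, 100))
    tracker.try_claim(10)
    primary, residual = tracker.try_split(0.5)
    # Both equal the chosen split point (55 here: halfway through the
    # unclaimed remainder [10, 100)).
    print(primary.stop, residual.start)
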
diff --git a/sdks/python/apache_beam/io/restriction_trackers.py b/sdks/python/apache_beam/io/restriction_trackers.py
index 2420c0b..00cefee 100644
--- a/sdks/python/apache_beam/io/restriction_trackers.py
+++ b/sdks/python/apache_beam/io/restriction_trackers.py
@@ -164,3 +164,20 @@ class OffsetRestrictionTracker(RestrictionTracker):
 
   def is_bounded(self):
     return True
+
+
+class UnsplittableRestrictionTracker(RestrictionTracker):
+  """An `iobase.RestrictionTracker` that wraps another but does not split."""
+  def __init__(self, underlying_tracker):
+    self._underlying_tracker = underlying_tracker
+
+  def try_split(self, fraction_of_remainder):
+    return False
+
+  # __getattribute__ is used rather than __getattr__ to override the
+  # stubs in the base class.
+  def __getattribute__(self, name):
+    if name.startswith('_') or name in ('try_split', ):
+      return super(UnsplittableRestrictionTracker, self).__getattribute__(name)
+    else:
+      return getattr(self._underlying_tracker, name)
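
A short sketch of the wrapper's behavior (illustrative values): everything
except try_split is forwarded to the wrapped tracker, so progress reporting
still works while dynamic splits are refused.

    from apache_beam.io import restriction_trackers

    inner = restriction_trackers.OffsetRestrictionTracker(
        restriction_trackers.OffsetRange(0, 100))
    tracker = restriction_trackers.UnsplittableRestrictionTracker(inner)

    print(tracker.current_restriction().stop)  # 100, delegated to the inner tracker
    print(tracker.try_claim(10))               # True, delegated
    print(tracker.try_split(0.5))              # False: dynamic splits are refused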

