beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rober...@apache.org
Subject [1/2] incubator-beam git commit: Updates filebasedsource to support CompressionTypes.AUTO.
Date Tue, 04 Oct 2016 20:00:45 GMT
Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk 731a77152 -> 3a69db0c5


Updates filebasedsource to support CompressionTypes.AUTO.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/2126a34c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/2126a34c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/2126a34c

Branch: refs/heads/python-sdk
Commit: 2126a34c06d74c5ad44fbec8dd4e278f99ed473a
Parents: 731a771
Author: chamikara@google.com <chamikara@google.com>
Authored: Sun Sep 25 21:44:34 2016 -0700
Committer: chamikara@google.com <chamikara@google.com>
Committed: Mon Oct 3 10:20:46 2016 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/io/filebasedsource.py   |  44 ++++----
 .../apache_beam/io/filebasedsource_test.py      | 104 +++++++++++++++++--
 2 files changed, 121 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2126a34c/sdks/python/apache_beam/io/filebasedsource.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filebasedsource.py b/sdks/python/apache_beam/io/filebasedsource.py
index 8ff69ca..e067833 100644
--- a/sdks/python/apache_beam/io/filebasedsource.py
+++ b/sdks/python/apache_beam/io/filebasedsource.py
@@ -42,8 +42,7 @@ class FileBasedSource(iobase.BoundedSource):
   def __init__(self,
                file_pattern,
                min_bundle_size=0,
-               # TODO(BEAM-614)
-               compression_type=fileio.CompressionTypes.UNCOMPRESSED,
+               compression_type=fileio.CompressionTypes.AUTO,
                splittable=True):
     """Initializes ``FileBasedSource``.
 
@@ -72,13 +71,6 @@ class FileBasedSource(iobase.BoundedSource):
           '%s: file_pattern must be a string;  got %r instead' %
           (self.__class__.__name__, file_pattern))
 
-    if compression_type == fileio.CompressionTypes.AUTO:
-      raise ValueError('FileBasedSource currently does not support '
-                       'CompressionTypes.AUTO. Please explicitly specify the '
-                       'compression type or use '
-                       'CompressionTypes.UNCOMPRESSED if file is '
-                       'uncompressed.')
-
     self._pattern = file_pattern
     self._concat_source = None
     self._min_bundle_size = min_bundle_size
@@ -86,11 +78,12 @@ class FileBasedSource(iobase.BoundedSource):
       raise TypeError('compression_type must be CompressionType object but '
                       'was %s' % type(compression_type))
     self._compression_type = compression_type
-    if compression_type != fileio.CompressionTypes.UNCOMPRESSED:
+    if compression_type in (fileio.CompressionTypes.UNCOMPRESSED,
+                            fileio.CompressionTypes.AUTO):
+      self._splittable = splittable
+    else:
       # We can't split compressed files efficiently so turn off splitting.
       self._splittable = False
-    else:
-      self._splittable = splittable
 
   def _get_concat_source(self):
     if self._concat_source is None:
@@ -102,11 +95,21 @@ class FileBasedSource(iobase.BoundedSource):
         if sizes[index] == 0:
           continue  # Ignoring empty file.
 
+        # We determine splittability of this specific file.
+        splittable = self.splittable
+        if (splittable and
+            self._compression_type == fileio.CompressionTypes.AUTO):
+          compression_type = fileio.CompressionTypes.detect_compression_type(
+              file_name)
+          if compression_type != fileio.CompressionTypes.UNCOMPRESSED:
+            splittable = False
+
         single_file_source = _SingleFileSource(
             self, file_name,
             0,
             sizes[index],
-            min_bundle_size=self._min_bundle_size)
+            min_bundle_size=self._min_bundle_size,
+            splittable=splittable)
         single_file_sources.append(single_file_source)
       self._concat_source = concat_source.ConcatSource(single_file_sources)
     return self._concat_source
@@ -173,7 +176,7 @@ class _SingleFileSource(iobase.BoundedSource):
   """Denotes a source for a specific file type."""
 
   def __init__(self, file_based_source, file_name, start_offset, stop_offset,
-               min_bundle_size=0):
+               min_bundle_size=0, splittable=True):
     if not isinstance(start_offset, (int, long)):
       raise TypeError(
           'start_offset must be a number. Received: %r' % start_offset)
@@ -193,6 +196,7 @@ class _SingleFileSource(iobase.BoundedSource):
     self._stop_offset = stop_offset
     self._min_bundle_size = min_bundle_size
     self._file_based_source = file_based_source
+    self._splittable = splittable
 
   def split(self, desired_bundle_size, start_offset=None, stop_offset=None):
     if start_offset is None:
@@ -200,7 +204,7 @@ class _SingleFileSource(iobase.BoundedSource):
     if stop_offset is None:
       stop_offset = self._stop_offset
 
-    if self._file_based_source.splittable:
+    if self._splittable:
       bundle_size = max(desired_bundle_size, self._min_bundle_size)
 
       bundle_start = start_offset
@@ -214,7 +218,8 @@ class _SingleFileSource(iobase.BoundedSource):
                 self._file_name,
                 bundle_start,
                 bundle_stop,
-                min_bundle_size=self._min_bundle_size),
+                min_bundle_size=self._min_bundle_size,
+                splittable=self._splittable),
             bundle_start,
             bundle_stop)
         bundle_start = bundle_stop
@@ -230,7 +235,8 @@ class _SingleFileSource(iobase.BoundedSource):
               self._file_name,
               start_offset,
               range_trackers.OffsetRangeTracker.OFFSET_INFINITY,
-              min_bundle_size=self._min_bundle_size
+              min_bundle_size=self._min_bundle_size,
+              splittable=self._splittable
           ),
           start_offset,
           range_trackers.OffsetRangeTracker.OFFSET_INFINITY
@@ -248,12 +254,12 @@ class _SingleFileSource(iobase.BoundedSource):
       # file as end offset will be wrong for certain unsplittable source, for
       # e.g., compressed sources.
       stop_position = (
-          self._stop_offset if self._file_based_source.splittable
+          self._stop_offset if self._splittable
           else range_trackers.OffsetRangeTracker.OFFSET_INFINITY)
 
     range_tracker = range_trackers.OffsetRangeTracker(
         start_position, stop_position)
-    if not self._file_based_source.splittable:
+    if not self._splittable:
       range_tracker = range_trackers.UnsplittableRangeTracker(range_tracker)
 
     return range_tracker

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2126a34c/sdks/python/apache_beam/io/filebasedsource_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filebasedsource_test.py b/sdks/python/apache_beam/io/filebasedsource_test.py
index d45a6f9..91b1ffb 100644
--- a/sdks/python/apache_beam/io/filebasedsource_test.py
+++ b/sdks/python/apache_beam/io/filebasedsource_test.py
@@ -19,6 +19,7 @@ import gzip
 import logging
 import math
 import os
+import StringIO
 import tempfile
 import unittest
 import zlib
@@ -44,18 +45,23 @@ class LineSource(FileBasedSource):
     f = self.open_file(file_name)
     try:
       start = range_tracker.start_position()
-      f.seek(start)
       if start > 0:
-        f.seek(-1, os.SEEK_CUR)
+        # Any line that starts after 'start' does not belong to the current
+        # bundle. Seeking to (start - 1) and skipping a line moves the current
+        # position to the starting position of the first line that belongs to
+        # the current bundle.
         start -= 1
+        f.seek(start)
         line = f.readline()
         start += len(line)
       current = start
-      for line in f:
+      line = f.readline()
+      while line:
         if not range_tracker.try_claim(current):
           return
         yield line.rstrip('\n')
         current += len(line)
+        line = f.readline()
     finally:
       f.close()
 
@@ -94,17 +100,22 @@ def write_data(
     return f.name, all_data
 
 
-def _write_prepared_data(data, directory=None, prefix=tempfile.template):
+def _write_prepared_data(data, directory=None,
+                         prefix=tempfile.template, suffix=''):
   with tempfile.NamedTemporaryFile(
-      delete=False, dir=directory, prefix=prefix) as f:
+      delete=False, dir=directory, prefix=prefix, suffix=suffix) as f:
     f.write(data)
     return f.name
 
 
-def write_prepared_pattern(data):
+def write_prepared_pattern(data, suffixes=None):
+  if suffixes is None:
+    suffixes = [''] * len(data)
   temp_dir = tempfile.mkdtemp()
-  for d in data:
-    file_name = _write_prepared_data(d, temp_dir, prefix='mytemp')
+  assert len(data) > 0
+  for i, d in enumerate(data):
+    file_name = _write_prepared_data(d, temp_dir, prefix='mytemp',
+                                     suffix=suffixes[i])
   return file_name[:file_name.rfind(os.path.sep)] + os.path.sep + 'mytemp*'
 
 
@@ -337,6 +348,7 @@ class TestFileBasedSource(unittest.TestCase):
         splittable=False,
         compression_type=fileio.CompressionTypes.GZIP))
     assert_that(pcoll, equal_to(lines))
+    pipeline.run()
 
   def test_read_zlib_file(self):
     _, lines = write_data(10)
@@ -351,6 +363,7 @@ class TestFileBasedSource(unittest.TestCase):
         splittable=False,
         compression_type=fileio.CompressionTypes.ZLIB))
     assert_that(pcoll, equal_to(lines))
+    pipeline.run()
 
   def test_read_zlib_pattern(self):
     _, lines = write_data(200)
@@ -369,6 +382,81 @@ class TestFileBasedSource(unittest.TestCase):
         splittable=False,
         compression_type=fileio.CompressionTypes.ZLIB))
     assert_that(pcoll, equal_to(lines))
+    pipeline.run()
+
+  def test_read_gzip_pattern(self):
+    _, lines = write_data(200)
+    splits = [0, 34, 100, 140, 164, 188, 200]
+    chunks = [lines[splits[i-1]:splits[i]] for i in xrange(1, len(splits))]
+    compressed_chunks = []
+    for c in chunks:
+      out = StringIO.StringIO()
+      with gzip.GzipFile(fileobj=out, mode="w") as f:
+        f.write('\n'.join(c))
+      compressed_chunks.append(out.getvalue())
+    file_pattern = write_prepared_pattern(compressed_chunks)
+    pipeline = beam.Pipeline('DirectPipelineRunner')
+    pcoll = pipeline | 'Read' >> beam.Read(LineSource(
+        file_pattern,
+        splittable=False,
+        compression_type=fileio.CompressionTypes.GZIP))
+    assert_that(pcoll, equal_to(lines))
+    pipeline.run()
+
+  def test_read_auto_single_file(self):
+    _, lines = write_data(10)
+    filename = tempfile.NamedTemporaryFile(
+        delete=False, prefix=tempfile.template, suffix='.gz').name
+    with gzip.GzipFile(filename, 'wb') as f:
+      f.write('\n'.join(lines))
+
+    pipeline = beam.Pipeline('DirectPipelineRunner')
+    pcoll = pipeline | 'Read' >> beam.Read(LineSource(
+        filename,
+        compression_type=fileio.CompressionTypes.AUTO))
+    assert_that(pcoll, equal_to(lines))
+    pipeline.run()
+
+  def test_read_auto_pattern(self):
+    _, lines = write_data(200)
+    splits = [0, 34, 100, 140, 164, 188, 200]
+    chunks = [lines[splits[i - 1]:splits[i]] for i in xrange(1, len(splits))]
+    compressed_chunks = []
+    for c in chunks:
+      out = StringIO.StringIO()
+      with gzip.GzipFile(fileobj=out, mode="w") as f:
+        f.write('\n'.join(c))
+      compressed_chunks.append(out.getvalue())
+    file_pattern = write_prepared_pattern(
+        compressed_chunks, suffixes=['.gz']*len(chunks))
+    pipeline = beam.Pipeline('DirectPipelineRunner')
+    pcoll = pipeline | 'Read' >> beam.Read(LineSource(
+        file_pattern,
+        compression_type=fileio.CompressionTypes.AUTO))
+    assert_that(pcoll, equal_to(lines))
+    pipeline.run()
+
+  def test_read_auto_pattern_compressed_and_uncompressed(self):
+    _, lines = write_data(200)
+    splits = [0, 34, 100, 140, 164, 188, 200]
+    chunks = [lines[splits[i - 1]:splits[i]] for i in xrange(1, len(splits))]
+    chunks_to_write = []
+    for i, c in enumerate(chunks):
+      if i%2 == 0:
+        out = StringIO.StringIO()
+        with gzip.GzipFile(fileobj=out, mode="w") as f:
+          f.write('\n'.join(c))
+        chunks_to_write.append(out.getvalue())
+      else:
+        chunks_to_write.append('\n'.join(c))
+    file_pattern = write_prepared_pattern(chunks_to_write,
+                                          suffixes=(['.gz', '']*3))
+    pipeline = beam.Pipeline('DirectPipelineRunner')
+    pcoll = pipeline | 'Read' >> beam.Read(LineSource(
+        file_pattern,
+        compression_type=fileio.CompressionTypes.AUTO))
+    assert_that(pcoll, equal_to(lines))
+    pipeline.run()
 
 
 class TestSingleFileSource(unittest.TestCase):


Mime
View raw message