beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rober...@apache.org
Subject [beam] branch master updated: [BEAM-6167] Add class ReadFromTextWithFilename (Python) (#7193)
Date Mon, 10 Dec 2018 08:15:40 GMT
This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 378d907  [BEAM-6167] Add class ReadFromTextWithFilename (Python) (#7193)
378d907 is described below

commit 378d9079cdc8cf8582c0ba2f03ab6a393c878dcb
Author: lcaggio <lorenzo.caggioni@gmail.com>
AuthorDate: Mon Dec 10 09:15:29 2018 +0100

    [BEAM-6167] Add class ReadFromTextWithFilename (Python) (#7193)
---
 sdks/python/apache_beam/io/textio.py      | 27 +++++++++++++++++++++++++--
 sdks/python/apache_beam/io/textio_test.py | 26 ++++++++++++++++++++++++++
 2 files changed, 51 insertions(+), 2 deletions(-)

diff --git a/sdks/python/apache_beam/io/textio.py b/sdks/python/apache_beam/io/textio.py
index 73ab4a9..b31628a 100644
--- a/sdks/python/apache_beam/io/textio.py
+++ b/sdks/python/apache_beam/io/textio.py
@@ -38,7 +38,8 @@ from apache_beam.io.iobase import Write
 from apache_beam.transforms import PTransform
 from apache_beam.transforms.display import DisplayDataItem
 
-__all__ = ['ReadFromText', 'ReadAllFromText', 'WriteToText']
+__all__ = ['ReadFromText', 'ReadFromTextWithFilename', 'ReadAllFromText',
+           'WriteToText']
 
 
 class _TextSource(filebasedsource.FileBasedSource):
@@ -320,6 +321,14 @@ class _TextSource(filebasedsource.FileBasedSource):
               sep_bounds[1] - record_start_position_in_buffer)
 
 
+class _TextSourceWithFilename(_TextSource):  # yields (file_name, record) pairs
+  def read_records(self, file_name, range_tracker):
+    base = super(_TextSourceWithFilename, self)
+    lines = base.read_records(file_name, range_tracker)
+    for line in lines:
+      yield file_name, line
+
+
 class _TextSink(filebasedsink.FileBasedSink):
   """A sink to a GCS or local text file or files."""
 
@@ -483,6 +492,9 @@ class ReadFromText(PTransform):
   ``ASCII``.
   This does not support other encodings such as ``UTF-16`` or ``UTF-32``.
   """
+
+  _source_class = _TextSource
+
   def __init__(
       self,
       file_pattern=None,
@@ -518,7 +530,7 @@ class ReadFromText(PTransform):
     """
 
     super(ReadFromText, self).__init__(**kwargs)
-    self._source = _TextSource(
+    self._source = self._source_class(
         file_pattern, min_bundle_size, compression_type,
         strip_trailing_newlines, coder, validate=validate,
         skip_header_lines=skip_header_lines)
@@ -527,6 +539,17 @@ class ReadFromText(PTransform):
     return pvalue.pipeline | Read(self._source)
 
 
+class ReadFromTextWithFilename(ReadFromText):
+  r"""A :class:`~apache_beam.io.textio.ReadFromText` for reading text
+  files, returning (file name, line) tuples for every line of every file.
+
+  This class extends ReadFromText, only overriding the ``_source_class``
+  attribute so records are tagged with the name of their source file.
+  """
+
+  _source_class = _TextSourceWithFilename
+
+
 class WriteToText(PTransform):
   """A :class:`~apache_beam.transforms.ptransform.PTransform` for writing to
   text files."""
diff --git a/sdks/python/apache_beam/io/textio_test.py b/sdks/python/apache_beam/io/textio_test.py
index 2ed3b48..e42e940 100644
--- a/sdks/python/apache_beam/io/textio_test.py
+++ b/sdks/python/apache_beam/io/textio_test.py
@@ -20,6 +20,7 @@ from __future__ import absolute_import
 from __future__ import division
 
 import bz2
+import datetime
 import glob
 import gzip
 import logging
@@ -43,6 +44,7 @@ from apache_beam.io.textio import _TextSink as TextSink
 from apache_beam.io.textio import _TextSource as TextSource
 # Importing following private classes for testing.
 from apache_beam.io.textio import ReadFromText
+from apache_beam.io.textio import ReadFromTextWithFilename
 from apache_beam.io.textio import WriteToText
 from apache_beam.testing.test_pipeline import TestPipeline
 from apache_beam.testing.test_utils import TempDir
@@ -347,6 +349,15 @@ class TextSourceTest(unittest.TestCase):
     assert_that(pcoll, equal_to(expected_data))
     pipeline.run()
 
+  def test_read_from_text_with_file_name_single_file(self):
+    path, lines = write_data(5)
+    assert len(lines) == 5
+    expected = [(path, line) for line in lines]
+    p = TestPipeline()
+    result = p | 'Read' >> ReadFromTextWithFilename(path)
+    assert_that(result, equal_to(expected))
+    p.run()
+
   def test_read_all_single_file(self):
     file_name, expected_data = write_data(5)
     assert len(expected_data) == 5
@@ -416,6 +427,21 @@ class TextSourceTest(unittest.TestCase):
     assert_that(pcoll, equal_to(expected_data))
     pipeline.run()
 
+  def test_read_from_text_with_file_name_file_pattern(self):
+    prefix = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
+    file_name_1, data_1 = write_data(5, prefix=prefix)
+    file_name_2, data_2 = write_data(5, prefix=prefix)
+    expected_data = []
+    expected_data.extend([(file_name_1, el) for el in data_1])
+    expected_data.extend([(file_name_2, el) for el in data_2])
+    folder = os.path.dirname(file_name_1)
+    pattern = os.path.join(folder, prefix + '*')  # both files use prefix
+    assert len(expected_data) == 10
+    pipeline = TestPipeline()
+    pcoll = pipeline | 'Read' >> ReadFromTextWithFilename(pattern)
+    assert_that(pcoll, equal_to(expected_data))
+    pipeline.run()
+
   def test_read_all_file_pattern(self):
     pattern, expected_data = write_pattern([5, 3, 12, 8, 8, 4])
     assert len(expected_data) == 40


Mime
View raw message