beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From al...@apache.org
Subject [beam] branch master updated: [BEAM-6356] Add the option to use TFRecord to store cache results using PCollection's PCoder (#8687)
Date Thu, 06 Jun 2019 19:16:49 GMT
This is an automated email from the ASF dual-hosted git repository.

altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new ae3a1f2  [BEAM-6356] Add the option to use TFRecord to store cache results using PCollection's PCoder (#8687)
ae3a1f2 is described below

commit ae3a1f23e2b59c0cad743de416aa3809516f9554
Author: Alexey Strokach <ostrokach@gmail.com>
AuthorDate: Thu Jun 6 12:16:20 2019 -0700

    [BEAM-6356] Add the option to use TFRecord to store cache results using PCollection's PCoder (#8687)
    
    * Use TFRecord to store intermediate cache results using PCollection's
    PCoder.
    * Add optional support for TFRecord as a cache serialization format
    * Rename _Reader and _Writer to _reader_class and _writer_class
    * Clarify the return type of the CacheManager.read method
---
 .../runners/interactive/cache_manager.py           | 88 ++++++++++++++++++----
 .../runners/interactive/cache_manager_test.py      | 36 +++++++--
 .../runners/interactive/interactive_runner.py      |  9 ++-
 3 files changed, 113 insertions(+), 20 deletions(-)

diff --git a/sdks/python/apache_beam/runners/interactive/cache_manager.py b/sdks/python/apache_beam/runners/interactive/cache_manager.py
index e8816fe..20d84e3 100644
--- a/sdks/python/apache_beam/runners/interactive/cache_manager.py
+++ b/sdks/python/apache_beam/runners/interactive/cache_manager.py
@@ -28,6 +28,8 @@ import urllib
 import apache_beam as beam
 from apache_beam import coders
 from apache_beam.io import filesystems
+from apache_beam.io import textio
+from apache_beam.io import tfrecordio
 from apache_beam.transforms import combiners
 
 try:                    # Python 3
@@ -62,9 +64,12 @@ class CacheManager(object):
   def read(self, *labels):
     """Return the PCollection as a list as well as the version number.
 
+    Args:
+      *labels: List of labels for PCollection instance.
+
     Returns:
-      (List[PCollection])
-      (int) the version number
+      Tuple[List[Any], int]: A tuple containing a list of items in the
+        PCollection and the version number.
 
     It is possible that the version numbers from read() and_latest_version()
     are different. This usually means that the cache's been evicted (thus
@@ -81,6 +86,25 @@ class CacheManager(object):
     """Returns a beam.io.Sink that writes the PCollection cache."""
     raise NotImplementedError
 
+  def save_pcoder(self, pcoder, *labels):
+    """Saves pcoder for given PCollection.
+
+    Correct reading of PCollection from Cache requires PCoder to be known.
+    This method saves desired PCoder for PCollection that will subsequently
+    be used by sink(...), source(...), and, most importantly, read(...) method.
+    The latter must be able to read a PCollection written by Beam using
+    non-Beam IO.
+
+    Args:
+      pcoder: A PCoder to be used for reading and writing a PCollection.
+      *labels: List of labels for PCollection instance.
+    """
+    raise NotImplementedError
+
+  def load_pcoder(self, *labels):
+    """Returns previously saved PCoder for reading and writing PCollection."""
+    raise NotImplementedError
+
   def cleanup(self):
     """Cleans up all the PCollection caches."""
     raise NotImplementedError
@@ -89,7 +113,12 @@ class CacheManager(object):
 class FileBasedCacheManager(CacheManager):
   """Maps PCollections to local temp files for materialization."""
 
-  def __init__(self, cache_dir=None):
+  _available_formats = {
+      'text': (textio.ReadFromText, textio.WriteToText),
+      'tfrecord': (tfrecordio.ReadFromTFRecord, tfrecordio.WriteToTFRecord)
+  }
+
+  def __init__(self, cache_dir=None, cache_format='text'):
     if cache_dir:
       self._cache_dir = filesystems.FileSystems.join(
           cache_dir,
@@ -99,6 +128,25 @@ class FileBasedCacheManager(CacheManager):
           prefix='interactive-temp-', dir=os.environ.get('TEST_TMPDIR', None))
     self._versions = collections.defaultdict(lambda: self._CacheVersion())
 
+    if cache_format not in self._available_formats:
+      raise ValueError("Unsupported cache format: '%s'." % cache_format)
+    self._reader_class, self._writer_class = self._available_formats[
+        cache_format]
+    self._default_pcoder = (
+        SafeFastPrimitivesCoder() if cache_format == 'text' else None)
+
+    # List of saved pcoders keyed by PCollection path. It is OK to keep this
+    # list in memory because once FileBasedCacheManager object is
+    # destroyed/re-created it loses the access to previously written cache
+    # objects anyways even if cache_dir already exists. In other words,
+    # it is not possible to resume execution of Beam pipeline from the
+    # saved cache if FileBasedCacheManager has been reset.
+    #
+    # However, if we are to implement better cache persistence, one needs
+    # to take care of keeping consistency between the cached PCollection
+    # and its PCoder type.
+    self._saved_pcoders = {}
+
   def exists(self, *labels):
     return bool(self._match(*labels))
 
@@ -109,29 +157,35 @@ class FileBasedCacheManager(CacheManager):
     result = self._versions["-".join(labels)].get_version(timestamp)
     return result
 
+  def save_pcoder(self, pcoder, *labels):
+    self._saved_pcoders[self._path(*labels)] = pcoder
+
+  def load_pcoder(self, *labels):
+    return (self._default_pcoder if self._default_pcoder is not None else
+            self._saved_pcoders[self._path(*labels)])
+
   def read(self, *labels):
     if not self.exists(*labels):
       return [], -1
 
-    def _read_helper():
-      coder = SafeFastPrimitivesCoder()
-      for path in self._match(*labels):
-        for line in filesystems.FileSystems.open(path):
-          yield coder.decode(line.strip())
-    result, version = list(_read_helper()), self._latest_version(*labels)
+    source = self.source(*labels)
+    range_tracker = source.get_range_tracker(None, None)
+    result = list(source.read(range_tracker))
+    version = self._latest_version(*labels)
     return result, version
 
   def source(self, *labels):
-    return beam.io.ReadFromText(self._glob_path(*labels),
-                                coder=SafeFastPrimitivesCoder())._source
+    return self._reader_class(
+        self._glob_path(*labels), coder=self.load_pcoder(*labels))._source
 
   def sink(self, *labels):
-    return beam.io.WriteToText(self._path(*labels),
-                               coder=SafeFastPrimitivesCoder())._sink
+    return self._writer_class(
+        self._path(*labels), coder=self.load_pcoder(*labels))._sink
 
   def cleanup(self):
     if filesystems.FileSystems.exists(self._cache_dir):
       filesystems.FileSystems.delete([self._cache_dir])
+    self._saved_pcoders = {}
 
   def _glob_path(self, *labels):
     return self._path(*labels) + '-*-of-*'
@@ -188,6 +242,14 @@ class WriteCache(beam.PTransform):
 
   def expand(self, pcoll):
     prefix = 'sample' if self._sample else 'full'
+
+    # We save pcoder that is necessary for proper reading of
+    # cached PCollection. _cache_manager.sink(...) call below
+    # should be using this saved pcoder.
+    self._cache_manager.save_pcoder(
+        coders.registry.get_coder(pcoll.element_type),
+        prefix, self._label)
+
     if self._sample:
       pcoll |= 'Sample' >> (
           combiners.Sample.FixedSizeGlobally(self._sample_size)
diff --git a/sdks/python/apache_beam/runners/interactive/cache_manager_test.py b/sdks/python/apache_beam/runners/interactive/cache_manager_test.py
index 641643f..3ad81b8 100644
--- a/sdks/python/apache_beam/runners/interactive/cache_manager_test.py
+++ b/sdks/python/apache_beam/runners/interactive/cache_manager_test.py
@@ -25,14 +25,15 @@ import tempfile
 import time
 import unittest
 
+from apache_beam import coders
 from apache_beam.io import filesystems
 from apache_beam.runners.interactive import cache_manager as cache
 
 
-class FileBasedCacheManagerTest(unittest.TestCase):
+class FileBasedCacheManagerTest(object):
   """Unit test for FileBasedCacheManager.
 
-  Note that this set of tests focuses only the the methods that interacts with
+  Note that this set of tests focuses only the methods that interacts with
   the LOCAL file system. The idea is that once FileBasedCacheManager works well
   with the local file system, it should work with any file system with
   `apache_beam.io.filesystem` interface. Those tests that involve interactions
@@ -40,9 +41,12 @@ class FileBasedCacheManagerTest(unittest.TestCase):
   tested with InteractiveRunner as a part of integration tests instead.
   """
 
+  cache_format = None
+
   def setUp(self):
     self.test_dir = tempfile.mkdtemp()
-    self.cache_manager = cache.FileBasedCacheManager(self.test_dir)
+    self.cache_manager = cache.FileBasedCacheManager(
+        self.test_dir, cache_format=self.cache_format)
 
   def tearDown(self):
     # The test_dir might have already been removed by cache_manager.cleanup().
@@ -61,10 +65,16 @@ class FileBasedCacheManagerTest(unittest.TestCase):
     time.sleep(0.1)
 
     cache_file = cache_label + '-1-of-2'
+    labels = [prefix, cache_label]
+
+    # Usually, the pcoder will be inferred from `pcoll.element_type`
+    pcoder = coders.registry.get_coder(object)
+    self.cache_manager.save_pcoder(pcoder, *labels)
+    sink = self.cache_manager.sink(*labels)
+
     with open(self.cache_manager._path(prefix, cache_file), 'wb') as f:
       for line in pcoll_list:
-        f.write(cache.SafeFastPrimitivesCoder().encode(line))
-        f.write(b'\n')
+        sink.write_record(f, line)
 
   def test_exists(self):
     """Test that CacheManager can correctly tell if the cache exists or not."""
@@ -163,5 +173,21 @@ class FileBasedCacheManagerTest(unittest.TestCase):
         self.cache_manager.is_latest_version(version, prefix, cache_label))
 
 
+class TextFileBasedCacheManagerTest(
+    FileBasedCacheManagerTest,
+    unittest.TestCase,
+):
+
+  cache_format = 'text'
+
+
+class TFRecordBasedCacheManagerTest(
+    FileBasedCacheManagerTest,
+    unittest.TestCase,
+):
+
+  cache_format = 'tfrecord'
+
+
 if __name__ == '__main__':
   unittest.main()
diff --git a/sdks/python/apache_beam/runners/interactive/interactive_runner.py b/sdks/python/apache_beam/runners/interactive/interactive_runner.py
index 391f3f0..4bf125e 100644
--- a/sdks/python/apache_beam/runners/interactive/interactive_runner.py
+++ b/sdks/python/apache_beam/runners/interactive/interactive_runner.py
@@ -44,19 +44,24 @@ class InteractiveRunner(runners.PipelineRunner):
   Allows interactively building and running Beam Python pipelines.
   """
 
-  def __init__(self, underlying_runner=None, cache_dir=None,
+  def __init__(self,
+               underlying_runner=None,
+               cache_dir=None,
+               cache_format='text',
                render_option=None):
     """Constructor of InteractiveRunner.
 
     Args:
       underlying_runner: (runner.PipelineRunner)
       cache_dir: (str) the directory where PCollection caches are kept
+      cache_format: (str) the file format that should be used for saving
+          PCollection caches. Available options are 'text' and 'tfrecord'.
       render_option: (str) this parameter decides how the pipeline graph is
           rendered. See display.pipeline_graph_renderer for available options.
     """
     self._underlying_runner = (underlying_runner
                                or direct_runner.DirectRunner())
-    self._cache_manager = cache.FileBasedCacheManager(cache_dir)
+    self._cache_manager = cache.FileBasedCacheManager(cache_dir, cache_format)
     self._renderer = pipeline_graph_renderer.get_renderer(render_option)
     self._in_session = False
 


Mime
View raw message