beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From lc...@apache.org
Subject [beam] branch master updated: [BEAM-4711] fix globbing in LocalFileSystem.delete (#5863)
Date Tue, 18 Sep 2018 21:52:45 GMT
This is an automated email from the ASF dual-hosted git repository.

lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 073f77d  [BEAM-4711] fix globbing in LocalFileSystem.delete (#5863)
073f77d is described below

commit 073f77df021c452bea03ee21eb61fc5331b61c14
Author: Ryan Williams <ryan.blake.williams@gmail.com>
AuthorDate: Tue Sep 18 17:52:39 2018 -0400

    [BEAM-4711] fix globbing in LocalFileSystem.delete (#5863)
---
 sdks/python/apache_beam/io/localfilesystem.py      |  13 +-
 sdks/python/apache_beam/io/localfilesystem_test.py | 189 +++++++++++++++++++++
 2 files changed, 201 insertions(+), 1 deletion(-)

diff --git a/sdks/python/apache_beam/io/localfilesystem.py b/sdks/python/apache_beam/io/localfilesystem.py
index 73d9a4d..ddd5022 100644
--- a/sdks/python/apache_beam/io/localfilesystem.py
+++ b/sdks/python/apache_beam/io/localfilesystem.py
@@ -321,11 +321,22 @@ class LocalFileSystem(FileSystem):
         raise IOError(err)
 
     exceptions = {}
-    for path in paths:
+
+    def try_delete(path):
       try:
         _delete_path(path)
       except Exception as e:  # pylint: disable=broad-except
         exceptions[path] = e
 
+    for match_result in self.match(paths):
+      metadata_list = match_result.metadata_list
+
+      if not metadata_list:
+        exceptions[match_result.pattern] = \
+          IOError('No files found to delete under: %s' % match_result.pattern)
+
+      for metadata in match_result.metadata_list:
+        try_delete(metadata.path)
+
     if exceptions:
       raise BeamIOError("Delete operation failed", exceptions)
diff --git a/sdks/python/apache_beam/io/localfilesystem_test.py b/sdks/python/apache_beam/io/localfilesystem_test.py
index 5d032db..bc45e42 100644
--- a/sdks/python/apache_beam/io/localfilesystem_test.py
+++ b/sdks/python/apache_beam/io/localfilesystem_test.py
@@ -291,6 +291,195 @@ class LocalFileSystemTest(unittest.TestCase):
     self.assertEquals(self.fs.checksum(path1), str(5))
     self.assertEquals(self.fs.checksum(path2), str(3))
 
+  def make_tree(self, path, value, expected_leaf_count=None):
+    """Create a file+directory structure from a simple dict-based DSL
+
+    :param path: root path to create directories+files under
+    :param value: a specification of what ``path`` should contain: ``None`` to
+     make it an empty directory, a string literal to make it a file with those
+      contents, and a ``dict`` to make it a non-empty directory and recurse
+    :param expected_leaf_count: should only be set at the top of a recursive
+     call stack; after the whole tree has been created, verify the presence
+     and number of all files+directories, as a sanity check
+    """
+    if value is None:
+      # empty directory
+      os.makedirs(path)
+    elif isinstance(value, str):
+      # file with string-literal contents
+      dir = os.path.dirname(path)
+      if not os.path.exists(dir):
+        os.makedirs(dir)
+      with open(path, 'a') as f:
+        f.write(value)
+    elif isinstance(value, dict):
+      # recurse to create a subdirectory tree
+      for basename, v in value.items():
+        self.make_tree(
+            os.path.join(path, basename),
+            v
+        )
+    else:
+      raise Exception(
+          'Unexpected value in tempdir tree: %s' % value
+      )
+
+    if expected_leaf_count != None:
+      self.assertEqual(
+          self.check_tree(path, value),
+          expected_leaf_count
+      )
+
+  def check_tree(self, path, value, expected_leaf_count=None):
+    """Verify a directory+file structure according to the rules described in
+    ``make_tree``
+
+    :param path: path to check under
+    :param value: DSL-representation of expected files+directories under
+    ``path``
+    :return: number of leaf files/directories that were verified
+    """
+    actual_leaf_count = None
+    if value is None:
+      # empty directory
+      self.assertTrue(os.path.exists(path), msg=path)
+      self.assertEqual(os.listdir(path), [])
+      actual_leaf_count = 1
+    elif isinstance(value, str):
+      # file with string-literal contents
+      with open(path, 'r') as f:
+        self.assertEqual(f.read(), value, msg=path)
+
+      actual_leaf_count = 1
+    elif isinstance(value, dict):
+      # recurse to check subdirectory tree
+      actual_leaf_count = sum(
+          [
+              self.check_tree(
+                  os.path.join(path, basename),
+                  v
+              )
+              for basename, v in value.items()
+          ]
+      )
+    else:
+      raise Exception(
+          'Unexpected value in tempdir tree: %s' % value
+      )
+
+    if expected_leaf_count != None:
+      self.assertEqual(actual_leaf_count, expected_leaf_count)
+
+    return actual_leaf_count
+
+  _test_tree = {
+      'path1': '111',
+      'path2': {
+          '2': '222',
+          'emptydir': None
+      },
+      'aaa': {
+          'b1': 'b1',
+          'b2': None,
+          'bbb': {
+              'ccc': {
+                  'ddd': 'DDD'
+              }
+          },
+          'c': None
+      }
+  }
+
+  def test_delete_globs(self):
+    dir = os.path.join(self.tmpdir, 'dir')
+    self.make_tree(dir, self._test_tree, expected_leaf_count=7)
+
+    self.fs.delete([
+        os.path.join(dir, 'path*'),
+        os.path.join(dir, 'aaa/b*')
+    ])
+
+    # One empty nested directory is left
+    self.check_tree(
+        dir,
+        {
+            'aaa': {
+                'c': None
+            }
+        },
+        expected_leaf_count=1
+    )
+
+  def test_recursive_delete(self):
+    dir = os.path.join(self.tmpdir, 'dir')
+    self.make_tree(dir, self._test_tree, expected_leaf_count=7)
+
+    self.fs.delete([dir])
+
+    self.check_tree(
+        self.tmpdir,
+        {'': None},
+        expected_leaf_count=1
+    )
+
+  def test_delete_glob_errors(self):
+    dir = os.path.join(self.tmpdir, 'dir')
+    self.make_tree(dir, self._test_tree, expected_leaf_count=7)
+
+    with self.assertRaisesRegexp(BeamIOError,
+                                 r'^Delete operation failed') as error:
+      self.fs.delete([
+          os.path.join(dir, 'path*'),
+          os.path.join(dir, 'aaa/b*'),
+          os.path.join(dir, 'aaa/d*')  # doesn't match anything, will raise
+      ])
+
+    self.check_tree(
+        dir,
+        {
+            'aaa': {
+                'c': None
+            }
+        },
+        expected_leaf_count=1
+    )
+
+    self.assertEqual(
+        list(
+            error
+            .exception
+            .exception_details
+            .keys()
+        ),
+        [os.path.join(dir, 'aaa/d*')]
+    )
+
+    with self.assertRaisesRegexp(BeamIOError,
+                                 r'^Delete operation failed') as error:
+      self.fs.delete([
+          os.path.join(dir, 'path*')  # doesn't match anything, will raise
+      ])
+
+    self.check_tree(
+        dir,
+        {
+            'aaa': {
+                'c': None
+            }
+        },
+        expected_leaf_count=1
+    )
+
+    self.assertEqual(
+        list(
+            error
+            .exception
+            .exception_details
+            .keys()
+        ),
+        [os.path.join(dir, 'path*')]
+    )
+
   def test_delete(self):
     path1 = os.path.join(self.tmpdir, 'f1')
 


Mime
View raw message