beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Work logged] (BEAM-4711) LocalFileSystem.delete doesn't support globbing
Date Tue, 18 Sep 2018 21:53:00 GMT

     [ https://issues.apache.org/jira/browse/BEAM-4711?focusedWorklogId=145494&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-145494
]

ASF GitHub Bot logged work on BEAM-4711:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 18/Sep/18 21:52
            Start Date: 18/Sep/18 21:52
    Worklog Time Spent: 10m 
      Work Description: lukecwik closed pull request #5863: [BEAM-4711] fix globbing in LocalFileSystem.delete
URL: https://github.com/apache/beam/pull/5863
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/sdks/python/apache_beam/io/localfilesystem.py b/sdks/python/apache_beam/io/localfilesystem.py
index 73d9a4de6cd..ddd5022b2bb 100644
--- a/sdks/python/apache_beam/io/localfilesystem.py
+++ b/sdks/python/apache_beam/io/localfilesystem.py
@@ -321,11 +321,22 @@ def _delete_path(path):
         raise IOError(err)
 
     exceptions = {}
-    for path in paths:
+
+    def try_delete(path):
       try:
         _delete_path(path)
       except Exception as e:  # pylint: disable=broad-except
         exceptions[path] = e
 
+    for match_result in self.match(paths):
+      metadata_list = match_result.metadata_list
+
+      if not metadata_list:
+        exceptions[match_result.pattern] = \
+          IOError('No files found to delete under: %s' % match_result.pattern)
+
+      for metadata in match_result.metadata_list:
+        try_delete(metadata.path)
+
     if exceptions:
       raise BeamIOError("Delete operation failed", exceptions)
diff --git a/sdks/python/apache_beam/io/localfilesystem_test.py b/sdks/python/apache_beam/io/localfilesystem_test.py
index 5d032db59a1..bc45e4221dd 100644
--- a/sdks/python/apache_beam/io/localfilesystem_test.py
+++ b/sdks/python/apache_beam/io/localfilesystem_test.py
@@ -291,6 +291,195 @@ def test_checksum(self):
     self.assertEquals(self.fs.checksum(path1), str(5))
     self.assertEquals(self.fs.checksum(path2), str(3))
 
+  def make_tree(self, path, value, expected_leaf_count=None):
+    """Create a file+directory structure from a simple dict-based DSL
+
+    :param path: root path to create directories+files under
+    :param value: a specification of what ``path`` should contain: ``None`` to
+     make it an empty directory, a string literal to make it a file with those
+      contents, and a ``dict`` to make it a non-empty directory and recurse
+    :param expected_leaf_count: only be set at the top of a recursive call
+     stack; after the whole tree has been created, verify the presence and
+     number of all files+directories, as a sanity check
+    """
+    if value is None:
+      # empty directory
+      os.makedirs(path)
+    elif isinstance(value, str):
+      # file with string-literal contents
+      dir = os.path.dirname(path)
+      if not os.path.exists(dir):
+        os.makedirs(dir)
+      with open(path, 'a') as f:
+        f.write(value)
+    elif isinstance(value, dict):
+      # recurse to create a subdirectory tree
+      for basename, v in value.items():
+        self.make_tree(
+            os.path.join(path, basename),
+            v
+        )
+    else:
+      raise Exception(
+          'Unexpected value in tempdir tree: %s' % value
+      )
+
+    if expected_leaf_count != None:
+      self.assertEqual(
+          self.check_tree(path, value),
+          expected_leaf_count
+      )
+
+  def check_tree(self, path, value, expected_leaf_count=None):
+    """Verify a directory+file structure according to the rules described in
+    ``make_tree``
+
+    :param path: path to check under
+    :param value: DSL-representation of expected files+directories under
+    ``path``
+    :return: number of leaf files/directories that were verified
+    """
+    actual_leaf_count = None
+    if value is None:
+      # empty directory
+      self.assertTrue(os.path.exists(path), msg=path)
+      self.assertEqual(os.listdir(path), [])
+      actual_leaf_count = 1
+    elif isinstance(value, str):
+      # file with string-literal contents
+      with open(path, 'r') as f:
+        self.assertEqual(f.read(), value, msg=path)
+
+      actual_leaf_count = 1
+    elif isinstance(value, dict):
+      # recurse to check subdirectory tree
+      actual_leaf_count = sum(
+          [
+              self.check_tree(
+                  os.path.join(path, basename),
+                  v
+              )
+              for basename, v in value.items()
+          ]
+      )
+    else:
+      raise Exception(
+          'Unexpected value in tempdir tree: %s' % value
+      )
+
+    if expected_leaf_count != None:
+      self.assertEqual(actual_leaf_count, expected_leaf_count)
+
+    return actual_leaf_count
+
+  _test_tree = {
+      'path1': '111',
+      'path2': {
+          '2': '222',
+          'emptydir': None
+      },
+      'aaa': {
+          'b1': 'b1',
+          'b2': None,
+          'bbb': {
+              'ccc': {
+                  'ddd': 'DDD'
+              }
+          },
+          'c': None
+      }
+  }
+
+  def test_delete_globs(self):
+    dir = os.path.join(self.tmpdir, 'dir')
+    self.make_tree(dir, self._test_tree, expected_leaf_count=7)
+
+    self.fs.delete([
+        os.path.join(dir, 'path*'),
+        os.path.join(dir, 'aaa/b*')
+    ])
+
+    # One empty nested directory is left
+    self.check_tree(
+        dir,
+        {
+            'aaa': {
+                'c': None
+            }
+        },
+        expected_leaf_count=1
+    )
+
+  def test_recursive_delete(self):
+    dir = os.path.join(self.tmpdir, 'dir')
+    self.make_tree(dir, self._test_tree, expected_leaf_count=7)
+
+    self.fs.delete([dir])
+
+    self.check_tree(
+        self.tmpdir,
+        {'': None},
+        expected_leaf_count=1
+    )
+
+  def test_delete_glob_errors(self):
+    dir = os.path.join(self.tmpdir, 'dir')
+    self.make_tree(dir, self._test_tree, expected_leaf_count=7)
+
+    with self.assertRaisesRegexp(BeamIOError,
+                                 r'^Delete operation failed') as error:
+      self.fs.delete([
+          os.path.join(dir, 'path*'),
+          os.path.join(dir, 'aaa/b*'),
+          os.path.join(dir, 'aaa/d*')  # doesn't match anything, will raise
+      ])
+
+    self.check_tree(
+        dir,
+        {
+            'aaa': {
+                'c': None
+            }
+        },
+        expected_leaf_count=1
+    )
+
+    self.assertEqual(
+        list(
+            error
+            .exception
+            .exception_details
+            .keys()
+        ),
+        [os.path.join(dir, 'aaa/d*')]
+    )
+
+    with self.assertRaisesRegexp(BeamIOError,
+                                 r'^Delete operation failed') as error:
+      self.fs.delete([
+          os.path.join(dir, 'path*')  # doesn't match anything, will raise
+      ])
+
+    self.check_tree(
+        dir,
+        {
+            'aaa': {
+                'c': None
+            }
+        },
+        expected_leaf_count=1
+    )
+
+    self.assertEqual(
+        list(
+            error
+            .exception
+            .exception_details
+            .keys()
+        ),
+        [os.path.join(dir, 'path*')]
+    )
+
   def test_delete(self):
     path1 = os.path.join(self.tmpdir, 'f1')
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 145494)
    Time Spent: 1h 50m  (was: 1h 40m)

> LocalFileSystem.delete doesn't support globbing
> -----------------------------------------------
>
>                 Key: BEAM-4711
>                 URL: https://issues.apache.org/jira/browse/BEAM-4711
>             Project: Beam
>          Issue Type: Task
>          Components: sdk-py-core
>    Affects Versions: 2.5.0
>            Reporter: Ryan Williams
>            Assignee: Ryan Williams
>            Priority: Minor
>             Fix For: 2.8.0
>
>          Time Spent: 1h 50m
>  Remaining Estimate: 0h
>
> I attempted to run {{wordcount_it_test:WordCountIT.test_wordcount_it}} locally with {{DirectRunner}}:
> {code}
> python setup.py nosetests \
>   --tests apache_beam.examples.wordcount_it_test:WordCountIT.test_wordcount_it \
>   --test-pipeline-options="--output=foo"
> {code}
> It failed in [the {{delete_files}} cleanup command|https://github.com/apache/beam/blob/a58f1ffaafb0e2ebcc73a1c5abfb05a15ec6a84b/sdks/python/apache_beam/examples/wordcount_it_test.py#L64]:
> {code}
> root: WARNING: Retry with exponential backoff: waiting for 11.1454450937 seconds before
retrying delete_files because we caught exception: BeamIOError: Delete operation failed with
exceptions {'foo/1530557644/results*': IOError(OSError(2, 'No such file or directory'),)}
>  Traceback for above exception (most recent call last):
>   File "/Users/ryan/c/beam/sdks/python/apache_beam/utils/retry.py", line 184, in wrapper
>     return fun(*args, **kwargs)
>   File "/Users/ryan/c/beam/sdks/python/apache_beam/testing/test_utils.py", line 136,
in delete_files
>     FileSystems.delete(file_paths)
>   File "/Users/ryan/c/beam/sdks/python/apache_beam/io/filesystems.py", line 282, in delete
>     return filesystem.delete(paths)
>   File "/Users/ryan/c/beam/sdks/python/apache_beam/io/localfilesystem.py", line 304,
in delete
>     raise BeamIOError("Delete operation failed", exceptions)
> {code}
> The line:
> {code}
> self.addCleanup(delete_files, [output + '*'])
> {code}
> works as expected in GCS, and deletes a test's output-directory, but it fails in on the
local-filesystem, which doesn't expand globs before attempting to delete paths.
> It would be good to make these consistent, presumably by adding glob-support to {{LocalFileSystem}}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Mime
View raw message