beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tg...@apache.org
Subject [1/2] incubator-beam git commit: Simplify FileOperations interface
Date Tue, 15 Nov 2016 00:30:38 GMT
Repository: incubator-beam
Updated Branches:
  refs/heads/master ee55f6e39 -> 0117a2834


Simplify FileOperations interface

Refactor removeDirectoryAndFiles() into FileBasedSink.

FileIOChannelFactory.match() returns the empty list on an absent parent
directory instead of throwing.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/1bd0f8bc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/1bd0f8bc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/1bd0f8bc

Branch: refs/heads/master
Commit: 1bd0f8bcc6213586318cb363e9ed746ae836feeb
Parents: ee55f6e
Author: Pei He <peihe@google.com>
Authored: Thu Nov 10 13:03:38 2016 -0800
Committer: Thomas Groh <tgroh@google.com>
Committed: Mon Nov 14 16:27:23 2016 -0800

----------------------------------------------------------------------
 .../org/apache/beam/sdk/io/FileBasedSink.java   | 77 +++++++++-----------
 .../beam/sdk/util/FileIOChannelFactory.java     | 11 ++-
 .../beam/sdk/util/FileIOChannelFactoryTest.java |  8 ++
 3 files changed, 50 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1bd0f8bc/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
index 2d058ae..f11fbee 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
@@ -22,6 +22,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
 import static com.google.common.base.Strings.isNullOrEmpty;
 
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Ordering;
 
 import java.io.File;
@@ -459,7 +460,25 @@ public abstract class FileBasedSink<T> extends Sink<T> {
       LOG.debug("Removing temporary bundle output files in {}.", tempDirectory);
       FileOperations fileOperations =
           FileOperationsFactory.getFileOperations(tempDirectory, options);
-      fileOperations.removeDirectoryAndFiles(tempDirectory, knownFiles);
+      IOChannelFactory factory = IOChannelUtils.getFactory(tempDirectory);
+
+      // To partially mitigate the effects of filesystems with eventually-consistent
+      // directory matching APIs, we remove not only files that the filesystem says exist
+      // in the directory (which may be incomplete), but also files that are known to exist
+      // (produced by successfully completed bundles).
+      // This may still fail to remove temporary outputs of some failed bundles, but at least
+      // the common case (where all bundles succeed) is guaranteed to be fully addressed.
+      Collection<String> matches = factory.match(factory.resolve(tempDirectory, "*"));
+      Set<String> allMatches = new HashSet<>(matches);
+      allMatches.addAll(knownFiles);
+      LOG.debug(
+          "Removing {} temporary files found under {} ({} matched glob, {} known files)",
+          allMatches.size(),
+          tempDirectory,
+          matches.size(),
+          allMatches.size() - matches.size());
+      fileOperations.remove(allMatches);
+      fileOperations.remove(ImmutableList.of(tempDirectory));
     }
 
     /**
@@ -656,26 +675,22 @@ public abstract class FileBasedSink<T> extends Sink<T> {
    */
   private interface FileOperations {
     /**
-     * Copy a collection of files from one location to another.
+     * Copies a collection of files from one location to another.
      *
      * <p>The number of source filenames must equal the number of destination filenames.
      *
      * @param srcFilenames the source filenames.
      * @param destFilenames the destination filenames.
      */
-     void copy(List<String> srcFilenames, List<String> destFilenames) throws IOException;
+    void copy(List<String> srcFilenames, List<String> destFilenames) throws IOException;
 
     /**
-     * Removes a directory and the files in it (but not subdirectories).
+     * Removes a collection of files or directories.
      *
-     * <p>Additionally, to partially mitigate the effects of filesystems with eventually-consistent
-     * directory matching APIs, takes a list of files that are known to exist - i.e. removes the
-     * union of the known files and files that the filesystem says exist in the directory.
-     *
-     * <p>Assumes that, if directory listing had been strongly consistent, it would have matched
-     * all of knownFiles - i.e. on a strongly consistent filesystem, knownFiles can be ignored.
+     * <p>Directories are required to be empty. Non-empty directories will not be deleted,
+     * and this method may return silently or throw an exception.
      */
-    void removeDirectoryAndFiles(String directory, List<String> knownFiles) throws IOException;
+    void remove(Collection<String> filesOrDirs) throws IOException;
   }
 
   /**
@@ -694,21 +709,8 @@ public abstract class FileBasedSink<T> extends Sink<T> {
     }
 
     @Override
-    public void removeDirectoryAndFiles(String directory, List<String> knownFiles)
-        throws IOException {
-      IOChannelFactory factory = IOChannelUtils.getFactory(directory);
-      Collection<String> matches = factory.match(directory + "/*");
-      Set<String> allMatches = new HashSet<>(matches);
-      allMatches.addAll(knownFiles);
-      LOG.debug(
-          "Removing {} temporary files found under {} ({} matched glob, {} additional known files)",
-          allMatches.size(),
-          directory,
-          matches.size(),
-          allMatches.size() - matches.size());
-      gcsUtil.remove(allMatches);
-      // No need to remove the directory itself: GCS doesn't have directories, so if the directory
-      // is empty, then it already doesn't exist.
+    public void remove(Collection<String> filesOrDirs) throws IOException {
+      gcsUtil.remove(filesOrDirs);
     }
   }
 
@@ -755,27 +757,18 @@ public abstract class FileBasedSink<T> extends Sink<T> {
     }
 
     @Override
-    public void removeDirectoryAndFiles(String directory, List<String> knownFiles)
-        throws IOException {
-      if (!new File(directory).exists()) {
-        LOG.debug("Directory {} already doesn't exist", directory);
-        return;
-      }
-      Collection<String> matches = factory.match(new File(directory, "*").getAbsolutePath());
-      LOG.debug("Removing {} temporary files found under {}", matches.size(), directory);
-      for (String filename : matches) {
-        LOG.debug("Removing file {}", filename);
-        removeOne(filename);
+    public void remove(Collection<String> filesOrDirs) throws IOException {
+      for (String fileOrDir : filesOrDirs) {
+        LOG.debug("Removing file {}", fileOrDir);
+        removeOne(fileOrDir);
       }
-      LOG.debug("Removing directory {}", directory);
-      removeOne(directory);
     }
 
-    private void removeOne(String filename) throws IOException {
+    private void removeOne(String fileOrDir) throws IOException {
       // Delete the file if it exists.
-      boolean exists = Files.deleteIfExists(Paths.get(filename));
+      boolean exists = Files.deleteIfExists(Paths.get(fileOrDir));
       if (!exists) {
-        LOG.debug("{} does not exist.", filename);
+        LOG.debug("Tried to delete {}, but it did not exist", fileOrDir);
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1bd0f8bc/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FileIOChannelFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FileIOChannelFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FileIOChannelFactory.java
index b920efb..b5d85fc 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FileIOChannelFactory.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FileIOChannelFactory.java
@@ -37,6 +37,7 @@ import java.nio.file.Path;
 import java.nio.file.PathMatcher;
 import java.nio.file.Paths;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.regex.Matcher;
@@ -65,16 +66,18 @@ public class FileIOChannelFactory implements IOChannelFactory {
     }
   }
 
-  // This implementation only allows for wildcards in the file name.
-  // The directory portion must exist as-is.
+  /**
+   * {@inheritDoc}
+   *
+   * <p>Wildcards in the directory portion are not supported.
+   */
   @Override
   public Collection<String> match(String spec) throws IOException {
     File file = specToFile(spec);
 
     File parent = file.getAbsoluteFile().getParentFile();
     if (!parent.exists()) {
-      throw new FileNotFoundException(
-          "Parent directory " + parent + " of " + spec + " does not exist");
+      return Collections.EMPTY_LIST;
     }
 
     // Method getAbsolutePath() on Windows platform may return something like

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1bd0f8bc/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FileIOChannelFactoryTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FileIOChannelFactoryTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FileIOChannelFactoryTest.java
index 011b4f5..4d4f93b 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FileIOChannelFactoryTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FileIOChannelFactoryTest.java
@@ -204,6 +204,14 @@ public class FileIOChannelFactoryTest {
   }
 
   @Test
+  public void testMatchWithoutParentDirectory() throws Exception {
+    String pattern = factory.resolve(
+        factory.resolve(temporaryFolder.getRoot().getPath(), "non_existing_dir"),
+        "*");
+    assertTrue(factory.match(pattern).isEmpty());
+  }
+
+  @Test
   public void testResolve() throws Exception {
     Path rootPath = temporaryFolder.getRoot().toPath();
     String rootString = rootPath.toString();


Mime
View raw message