crunch-commits mailing list archives

From jwi...@apache.org
Subject [crunch] branch master updated: CRUNCH-660, CRUNCH-675: Use DistCp instead of FileUtils.copy when source and destination paths are in different filesystems
Date Fri, 25 Jan 2019 02:08:22 GMT
This is an automated email from the ASF dual-hosted git repository.

jwills pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/crunch.git


The following commit(s) were added to refs/heads/master by this push:
     new 07458f7  CRUNCH-660, CRUNCH-675: Use DistCp instead of FileUtils.copy when source and destination paths are in different filesystems
07458f7 is described below

commit 07458f78282e1b55aee90960818f5fcb35dae5c0
Author: Andrew Olson <aolson1@cerner.com>
AuthorDate: Wed Jan 23 11:23:57 2019 -0600

    CRUNCH-660, CRUNCH-675: Use DistCp instead of FileUtils.copy when source and destination paths are in different filesystems
    
    Signed-off-by: Josh Wills <jwills@apache.org>
---
 crunch-core/pom.xml                                |  6 ++
 .../crunch/impl/mr/run/RuntimeParameters.java      |  4 ++
 .../org/apache/crunch/io/impl/FileTargetImpl.java  | 81 ++++++++++++++++++++--
 crunch-hbase/pom.xml                               |  6 ++
 .../org/apache/crunch/io/hbase/HFileTarget.java    | 71 +++++++++++++++++++
 pom.xml                                            |  6 ++
 6 files changed, 167 insertions(+), 7 deletions(-)

diff --git a/crunch-core/pom.xml b/crunch-core/pom.xml
index 4b41203..cd77373 100644
--- a/crunch-core/pom.xml
+++ b/crunch-core/pom.xml
@@ -67,6 +67,12 @@ under the License.
       <scope>provided</scope>
     </dependency>
 
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-distcp</artifactId>
+      <scope>provided</scope>
+    </dependency>
+
     <!-- Override the slf4j dependency from Avro, which is incompatible with
          Hadoop's. -->
     <dependency>
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java
index a36b910..f8b1e76 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java
@@ -47,6 +47,10 @@ public final class RuntimeParameters {
 
   public static final String MAX_POLL_INTERVAL = "crunch.max.poll.interval";
 
+  public static final String FILE_TARGET_USE_DISTCP = "crunch.file.target.use.distcp";
+
+  public static final String FILE_TARGET_MAX_DISTCP_TASKS = "crunch.file.target.max.distcp.tasks";
+
   // Not instantiated
   private RuntimeParameters() {
   }
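
The two new keys control the copy behavior at runtime. A minimal sketch of how a job might override the defaults before building its pipeline (the driver class and the surrounding pipeline setup are assumptions for illustration, not part of this commit):

    import org.apache.crunch.impl.mr.MRPipeline;
    import org.apache.hadoop.conf.Configuration;

    Configuration conf = new Configuration();
    // Fall back to the threaded FileUtil-based copies instead of DistCp.
    conf.setBoolean("crunch.file.target.use.distcp", false);
    // Or keep DistCp enabled but cap the map tasks it may launch (the patch defaults to 1000).
    conf.setInt("crunch.file.target.max.distcp.tasks", 200);
    MRPipeline pipeline = new MRPipeline(MyDriver.class, conf); // MyDriver is hypothetical
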
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java b/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java
index 5f4cfbb..e8b1dfe 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java
@@ -18,6 +18,7 @@
 package org.apache.crunch.io.impl;
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Callable;
@@ -29,7 +30,6 @@ import java.util.regex.Pattern;
 import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
@@ -54,6 +54,8 @@ import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.tools.DistCp;
+import org.apache.hadoop.tools.DistCpOptions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -162,26 +164,51 @@ public class FileTargetImpl implements PathTarget {
   @Override
   public void handleOutputs(Configuration conf, Path workingPath, int index) throws IOException {
     FileSystem srcFs = workingPath.getFileSystem(conf);
-    Path src = getSourcePattern(workingPath, index);
-    Path[] srcs = FileUtil.stat2Paths(srcFs.globStatus(src), src);
     FileSystem dstFs = path.getFileSystem(conf);
     if (!dstFs.exists(path)) {
       dstFs.mkdirs(path);
     }
+    Path srcPattern = getSourcePattern(workingPath, index);
     boolean sameFs = isCompatible(srcFs, path);
+    boolean useDistributedCopy = conf.getBoolean(RuntimeParameters.FILE_TARGET_USE_DISTCP, true);
+    int maxDistributedCopyTasks = conf.getInt(RuntimeParameters.FILE_TARGET_MAX_DISTCP_TASKS, 1000);
+    int maxThreads = conf.getInt(RuntimeParameters.FILE_TARGET_MAX_THREADS, 1);
+
+    if (!sameFs) {
+      if (useDistributedCopy) {
+        LOG.info("Source and destination are in different file systems, performing distributed
copy from {} to {}", srcPattern,
+            path);
+        handeOutputsDistributedCopy(conf, srcPattern, srcFs, dstFs, maxDistributedCopyTasks);
+      } else {
+        LOG.info("Source and destination are in different file systems, performing asynch
copies from {} to {}", srcPattern, path);
+        handleOutputsAsynchronously(conf, srcPattern, srcFs, dstFs, sameFs, maxThreads);
+      }
+    } else {
+      LOG.info("Source and destination are in the same file system, performing asynch renames from {} to {}", srcPattern, path);
+      handleOutputsAsynchronously(conf, srcPattern, srcFs, dstFs, sameFs, maxThreads);
+    }
+
+  }
+
+  private void handleOutputsAsynchronously(Configuration conf, Path srcPattern, FileSystem srcFs, FileSystem dstFs,
+          boolean sameFs, int maxThreads) throws IOException {
+    Path[] srcs = FileUtil.stat2Paths(srcFs.globStatus(srcPattern), srcPattern);
     List<ListenableFuture<Boolean>> renameFutures = Lists.newArrayList();
     ListeningExecutorService executorService =
         MoreExecutors.listeningDecorator(
             Executors.newFixedThreadPool(
-                conf.getInt(RuntimeParameters.FILE_TARGET_MAX_THREADS, 1)));
+                maxThreads));
     for (Path s : srcs) {
       Path d = getDestFile(conf, s, path, s.getName().contains("-m-"));
       renameFutures.add(
           executorService.submit(
               new WorkingPathFileMover(conf, s, d, srcFs, dstFs, sameFs)));
     }
-    LOG.debug("Renaming " + renameFutures.size() + " files.");
-
+    if (sameFs) {
+      LOG.info("Renaming {} files using at most {} threads.", renameFutures.size(), maxThreads);
+    } else {
+      LOG.info("Copying {} files using at most {} threads.", renameFutures.size(), maxThreads);
+    }
     ListenableFuture<List<Boolean>> future =
         Futures.successfulAsList(renameFutures);
     List<Boolean> renameResults = null;
@@ -193,9 +220,49 @@ public class FileTargetImpl implements PathTarget {
       executorService.shutdownNow();
     }
     if (renameResults != null && !renameResults.contains(false)) {
+      if (sameFs) {
+        LOG.info("Renamed {} files.", renameFutures.size());
+      } else {
+        LOG.info("Copied {} files.", renameFutures.size());
+      }
       dstFs.create(getSuccessIndicator(), true).close();
-      LOG.debug("Renamed " + renameFutures.size() + " files.");
+      LOG.info("Created success indicator file");
+    }
+  }
+
+  private void handeOutputsDistributedCopy(Configuration conf, Path srcPattern, FileSystem srcFs, FileSystem dstFs,
+          int maxDistributedCopyTasks) throws IOException {
+    Path[] srcs = FileUtil.stat2Paths(srcFs.globStatus(srcPattern), srcPattern);
+    if (srcs.length > 0) {
+      LOG.info("Distributed copying {} files using at most {} tasks", srcs.length, maxDistributedCopyTasks);
+      // Once https://issues.apache.org/jira/browse/HADOOP-15281 is available, we can use the direct write
+      // distcp optimization if the target path is in S3
+      DistCpOptions options = new DistCpOptions(Arrays.asList(srcs), path);
+      options.setMaxMaps(maxDistributedCopyTasks);
+      options.setOverwrite(true);
+      options.setBlocking(true);
+
+      Configuration distCpConf = new Configuration(conf);
+      // Remove unnecessary and problematic properties from the DistCp configuration. This is necessary since
+      // files referenced by these properties may have already been deleted when the DistCp is being started.
+      distCpConf.unset("mapreduce.job.cache.files");
+      distCpConf.unset("mapreduce.job.classpath.files");
+      distCpConf.unset("tmpjars");
+
+      try {
+        DistCp distCp = new DistCp(distCpConf, options);
+        if (!distCp.execute().isSuccessful()) {
+          throw new CrunchRuntimeException("Distributed copy failed from " + srcPattern + " to " + path);
+        }
+        LOG.info("Distributed copy completed for {} files", srcs.length);
+      } catch (Exception e) {
+        throw new CrunchRuntimeException("Distributed copy failed from " + srcPattern + " to " + path, e);
+      }
+    } else {
+      LOG.info("No files found to distributed copy at {}", srcPattern);
     }
+    dstFs.create(getSuccessIndicator(), true).close();
+    LOG.info("Created success indicator file");
   }
   
   protected Path getSuccessIndicator() {
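
For readers who want to exercise the copy path in isolation: the DistCp invocation above can be reproduced outside of Crunch with the same options the patch sets. A minimal, self-contained sketch (both paths are placeholders, and it assumes hadoop-distcp is on the classpath, as the pom changes in this commit arrange):

    import java.util.Arrays;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.tools.DistCp;
    import org.apache.hadoop.tools.DistCpOptions;

    public class DistCpSketch {
      public static void main(String[] args) throws Exception {
        Configuration distCpConf = new Configuration();
        // Mirror the patch: stale distributed-cache entries can reference files that
        // no longer exist by the time the copy job starts, so clear them first.
        distCpConf.unset("mapreduce.job.cache.files");
        distCpConf.unset("mapreduce.job.classpath.files");
        distCpConf.unset("tmpjars");

        DistCpOptions options = new DistCpOptions(
            Arrays.asList(new Path("hdfs:///tmp/crunch-output")), // placeholder source
            new Path("s3a://some-bucket/output"));                // placeholder destination
        options.setMaxMaps(1000);   // same default as FILE_TARGET_MAX_DISTCP_TASKS
        options.setOverwrite(true); // replace any partial files at the destination
        options.setBlocking(true);  // wait for the copy job to complete

        if (!new DistCp(distCpConf, options).execute().isSuccessful()) {
          throw new RuntimeException("Distributed copy failed");
        }
      }
    }

Note that the list-plus-target DistCpOptions constructor matches the Hadoop 2 API that Crunch builds against here; Hadoop 3 replaced it with a builder.
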
diff --git a/crunch-hbase/pom.xml b/crunch-hbase/pom.xml
index 075b197..cc0ec7d 100644
--- a/crunch-hbase/pom.xml
+++ b/crunch-hbase/pom.xml
@@ -56,6 +56,12 @@ under the License.
     </dependency>
 
     <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-distcp</artifactId>
+      <scope>provided</scope>
+    </dependency>
+
+    <dependency>
       <groupId>org.apache.hbase</groupId>
       <artifactId>hbase-common</artifactId>
       <type>jar</type>
diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileTarget.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileTarget.java
index b1ce5ba..420c9dd 100644
--- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileTarget.java
+++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileTarget.java
@@ -17,12 +17,16 @@
  */
 package org.apache.crunch.io.hbase;
 
+import org.apache.crunch.CrunchRuntimeException;
+import org.apache.crunch.impl.mr.run.RuntimeParameters;
 import org.apache.crunch.io.SequentialFileNamingScheme;
 import org.apache.crunch.io.impl.FileTargetImpl;
 import org.apache.crunch.types.Converter;
 import org.apache.crunch.types.PTableType;
 import org.apache.crunch.types.PType;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HBaseConfiguration;
@@ -30,9 +34,18 @@ import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.mapreduce.KeyValueSerialization;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.tools.DistCp;
+import org.apache.hadoop.tools.DistCpOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Arrays;
 
 public class HFileTarget extends FileTargetImpl {
 
+  private static final Logger LOG = LoggerFactory.getLogger(HFileTarget.class);
+
   public HFileTarget(String path) {
     this(new Path(path));
   }
@@ -78,6 +91,64 @@ public class HFileTarget extends FileTargetImpl {
   }
 
   @Override
+  public void handleOutputs(Configuration conf, Path workingPath, int index) throws IOException {
+    FileSystem srcFs = workingPath.getFileSystem(conf);
+    Path src = getSourcePattern(workingPath, index);
+    Path[] srcs = FileUtil.stat2Paths(srcFs.globStatus(src), src);
+    FileSystem dstFs = path.getFileSystem(conf);
+    if (!dstFs.exists(path)) {
+      dstFs.mkdirs(path);
+    }
+    boolean sameFs = isCompatible(srcFs, path);
+
+    if (!sameFs) {
+      if (srcs.length > 0) {
+        int maxDistributedCopyTasks = conf.getInt(RuntimeParameters.FILE_TARGET_MAX_DISTCP_TASKS, 1000);
+        LOG.info(
+                "Source and destination are in different file systems, performing distcp
of {} files from [{}] to [{}] "
+                        + "using at most {} tasks",
+                new Object[] { srcs.length, src, path, maxDistributedCopyTasks });
+        // Once https://issues.apache.org/jira/browse/HADOOP-15281 is available, we can use the direct write
+        // distcp optimization if the target path is in S3
+        DistCpOptions options = new DistCpOptions(Arrays.asList(srcs), path);
+        options.setMaxMaps(maxDistributedCopyTasks);
+        options.setOverwrite(true);
+        options.setBlocking(true);
+
+        Configuration distCpConf = new Configuration(conf);
+        // Remove unnecessary and problematic properties from the DistCp configuration. This is necessary since
+        // files referenced by these properties may have already been deleted when the DistCp is being started.
+        distCpConf.unset("mapreduce.job.cache.files");
+        distCpConf.unset("mapreduce.job.classpath.files");
+        distCpConf.unset("tmpjars");
+
+        try {
+          DistCp distCp = new DistCp(distCpConf, options);
+          if (!distCp.execute().isSuccessful()) {
+            throw new CrunchRuntimeException("Unable to move files through distcp from " + src + " to " + path);
+          }
+          LOG.info("Distributed copy completed for {} files", srcs.length);
+        } catch (Exception e) {
+          throw new CrunchRuntimeException("Unable to move files through distcp from " +
src + " to " + path, e);
+        }
+      } else {
+        LOG.info("No files found at [{}], not attempting to copy HFiles", src);
+      }
+    } else {
+      LOG.info(
+              "Source and destination are in the same file system, performing rename of {}
files from [{}] to [{}]",
+              new Object[] { srcs.length, src, path });
+
+      for (Path s : srcs) {
+        Path d = getDestFile(conf, s, path, s.getName().contains("-m-"));
+        srcFs.rename(s, d);
+      }
+    }
+    dstFs.create(getSuccessIndicator(), true).close();
+    LOG.info("Created success indicator file");
+  }
+
+  @Override
   public String toString() {
     return "HFile(" + path + ")";
   }
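
For context, HFileTarget is the Target passed to Pipeline#write when a job emits HFiles for an HBase bulk load, so the override above is what now moves those files across filesystems via DistCp. A hypothetical usage sketch (how the cell collection is produced is out of scope and assumed here):

    import org.apache.crunch.PCollection;
    import org.apache.crunch.Pipeline;
    import org.apache.crunch.io.hbase.HFileTarget;
    import org.apache.hadoop.hbase.Cell;

    // cells: a PCollection<Cell> already sorted as HBase expects (assumed)
    pipeline.write(cells, new HFileTarget("/tmp/hfiles-out")); // output path is a placeholder
    pipeline.done();
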
diff --git a/pom.xml b/pom.xml
index 11b87fd..ca689ed 100644
--- a/pom.xml
+++ b/pom.xml
@@ -201,6 +201,12 @@ under the License.
       </dependency>
 
       <dependency>
+        <groupId>org.apache.hadoop</groupId>
+        <artifactId>hadoop-distcp</artifactId>
+        <version>${hadoop.version}</version>
+      </dependency>
+
+      <dependency>
         <groupId>org.apache.hive.hcatalog</groupId>
         <artifactId>hive-hcatalog-core</artifactId>
         <version>${hive.version}</version>

