sqoop-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mau...@apache.org
Subject sqoop git commit: SQOOP-3136: Add support to Sqoop being able to handle different file system urls (e.g. s3a://some-bucket/tmp/sqoop)
Date Fri, 24 Feb 2017 06:41:44 GMT
Repository: sqoop
Updated Branches:
  refs/heads/trunk d8c4b3ccd -> 9466a0c7d


SQOOP-3136: Add support to Sqoop being able to handle different file
system urls (e.g. s3a://some-bucket/tmp/sqoop)

(Illya Yalovyy via Attila Szabo)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/9466a0c7
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/9466a0c7
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/9466a0c7

Branch: refs/heads/trunk
Commit: 9466a0c7d9a585a94a472fc672cad87c30c8125b
Parents: d8c4b3c
Author: Attila Szabo <maugli@apache.org>
Authored: Fri Feb 24 07:40:48 2017 +0100
Committer: Attila Szabo <maugli@apache.org>
Committed: Fri Feb 24 07:40:48 2017 +0100

----------------------------------------------------------------------
 .../com/cloudera/sqoop/io/LobReaderCache.java   |  2 +-
 src/java/org/apache/sqoop/hive/HiveImport.java  |  4 +-
 .../org/apache/sqoop/hive/TableDefWriter.java   |  4 +-
 .../org/apache/sqoop/io/LobReaderCache.java     | 24 +-------
 .../apache/sqoop/io/SplittingOutputStream.java  |  3 +-
 .../org/apache/sqoop/lib/LargeObjectLoader.java |  2 +-
 .../sqoop/manager/oracle/OraOopUtilities.java   |  6 +-
 .../sqoop/mapreduce/CombineFileInputFormat.java |  5 +-
 .../sqoop/mapreduce/DataDrivenImportJob.java    |  6 +-
 .../apache/sqoop/mapreduce/ExportJobBase.java   |  3 +-
 .../sqoop/mapreduce/HBaseBulkImportJob.java     |  8 +--
 .../apache/sqoop/mapreduce/JdbcExportJob.java   |  5 +-
 .../sqoop/mapreduce/JdbcUpdateExportJob.java    |  5 +-
 .../org/apache/sqoop/mapreduce/MergeJob.java    |  7 ++-
 src/java/org/apache/sqoop/tool/ImportTool.java  | 26 +++++---
 .../org/apache/sqoop/util/FileSystemUtil.java   | 45 ++++++++++++++
 .../org/apache/sqoop/util/FileUploader.java     | 12 +---
 .../apache/sqoop/util/TestFileSystemUtil.java   | 65 ++++++++++++++++++++
 18 files changed, 162 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/9466a0c7/src/java/com/cloudera/sqoop/io/LobReaderCache.java
----------------------------------------------------------------------
diff --git a/src/java/com/cloudera/sqoop/io/LobReaderCache.java b/src/java/com/cloudera/sqoop/io/LobReaderCache.java
index 3394296..89d31d3 100644
--- a/src/java/com/cloudera/sqoop/io/LobReaderCache.java
+++ b/src/java/com/cloudera/sqoop/io/LobReaderCache.java
@@ -59,7 +59,7 @@ public final class LobReaderCache extends org.apache.sqoop.io.LobReaderCache
{
    */
   public static Path qualify(Path path, Configuration conf)
       throws IOException {
-    return org.apache.sqoop.io.LobReaderCache.qualify(path, conf);
+    return org.apache.sqoop.util.FileSystemUtil.makeQualified(path, conf);
   }
 }
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/9466a0c7/src/java/org/apache/sqoop/hive/HiveImport.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/hive/HiveImport.java b/src/java/org/apache/sqoop/hive/HiveImport.java
index 4828375..153d091 100644
--- a/src/java/org/apache/sqoop/hive/HiveImport.java
+++ b/src/java/org/apache/sqoop/hive/HiveImport.java
@@ -115,7 +115,7 @@ public class HiveImport {
    * from where we put it, before running Hive LOAD DATA INPATH.
    */
   private void removeTempLogs(Path tablePath) throws IOException {
-    FileSystem fs = FileSystem.get(configuration);
+    FileSystem fs = tablePath.getFileSystem(configuration);
     Path logsPath = new Path(tablePath, "_logs");
     if (fs.exists(logsPath)) {
       LOG.info("Removing temporary files from import process: " + logsPath);
@@ -263,7 +263,7 @@ public class HiveImport {
    * @throws IOException
    */
   private void cleanUp(Path outputPath) throws IOException {
-    FileSystem fs = FileSystem.get(configuration);
+    FileSystem fs = outputPath.getFileSystem(configuration);
 
     // HIVE is not always removing input directory after LOAD DATA statement
     // (which is our export directory). We're removing export directory in case

http://git-wip-us.apache.org/repos/asf/sqoop/blob/9466a0c7/src/java/org/apache/sqoop/hive/TableDefWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/hive/TableDefWriter.java b/src/java/org/apache/sqoop/hive/TableDefWriter.java
index c9962e9..32fcca3 100644
--- a/src/java/org/apache/sqoop/hive/TableDefWriter.java
+++ b/src/java/org/apache/sqoop/hive/TableDefWriter.java
@@ -36,6 +36,7 @@ import org.apache.sqoop.io.CodecMap;
 
 import com.cloudera.sqoop.SqoopOptions;
 import com.cloudera.sqoop.manager.ConnManager;
+import org.apache.sqoop.util.FileSystemUtil;
 
 /**
  * Creates (Hive-specific) SQL DDL statements to create tables to hold data
@@ -271,8 +272,7 @@ public class TableDefWriter {
     } else {
       tablePath = warehouseDir + inputTableName;
     }
-    FileSystem fs = FileSystem.get(configuration);
-    return new Path(tablePath).makeQualified(fs);
+    return FileSystemUtil.makeQualified(new Path(tablePath), configuration);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/sqoop/blob/9466a0c7/src/java/org/apache/sqoop/io/LobReaderCache.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/io/LobReaderCache.java b/src/java/org/apache/sqoop/io/LobReaderCache.java
index bd75374..dbfa4f1 100644
--- a/src/java/org/apache/sqoop/io/LobReaderCache.java
+++ b/src/java/org/apache/sqoop/io/LobReaderCache.java
@@ -24,10 +24,10 @@ import java.util.TreeMap;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
 import com.cloudera.sqoop.io.LobFile;
+import org.apache.sqoop.util.FileSystemUtil;
 
 /**
  * A cache of open LobFile.Reader objects.
@@ -55,7 +55,7 @@ public class LobReaderCache {
       throws IOException {
 
     LobFile.Reader reader = null;
-    Path canonicalPath = qualify(path, conf);
+    Path canonicalPath = FileSystemUtil.makeQualified(path, conf);
     // Look up an entry in the cache.
     synchronized(this) {
       reader = readerMap.remove(canonicalPath);
@@ -111,24 +111,4 @@ public class LobReaderCache {
   protected LobReaderCache() {
     this.readerMap = new TreeMap<Path, LobFile.Reader>();
   }
-
-  /**
-   * Created a fully-qualified path object.
-   * @param path the path to fully-qualify with its fs URI.
-   * @param conf the current Hadoop FS configuration.
-   * @return a new path representing the same location as the input 'path',
-   * but with a fully-qualified URI.
-   */
-  public static Path qualify(Path path, Configuration conf)
-      throws IOException {
-    if (null == path) {
-      return null;
-    }
-
-    FileSystem fs = path.getFileSystem(conf);
-    if (null == fs) {
-      fs = FileSystem.get(conf);
-    }
-    return path.makeQualified(fs);
-  }
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/9466a0c7/src/java/org/apache/sqoop/io/SplittingOutputStream.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/io/SplittingOutputStream.java b/src/java/org/apache/sqoop/io/SplittingOutputStream.java
index 5f98192..129b508 100644
--- a/src/java/org/apache/sqoop/io/SplittingOutputStream.java
+++ b/src/java/org/apache/sqoop/io/SplittingOutputStream.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.sqoop.util.FileSystemUtil;
 
 /**
  * An output stream that writes to an underlying filesystem, opening
@@ -90,7 +91,7 @@ public class SplittingOutputStream extends OutputStream {
     FileSystem fs = destFile.getFileSystem(conf);
     LOG.debug("Opening next output file: " + destFile);
     if (fs.exists(destFile)) {
-      Path canonicalDest = destFile.makeQualified(fs);
+      Path canonicalDest = fs.makeQualified(destFile);
       throw new IOException("Destination file " + canonicalDest
           + " already exists");
     }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/9466a0c7/src/java/org/apache/sqoop/lib/LargeObjectLoader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/lib/LargeObjectLoader.java b/src/java/org/apache/sqoop/lib/LargeObjectLoader.java
index 70c0f4e..b8525fe 100644
--- a/src/java/org/apache/sqoop/lib/LargeObjectLoader.java
+++ b/src/java/org/apache/sqoop/lib/LargeObjectLoader.java
@@ -79,7 +79,7 @@ public class LargeObjectLoader implements Closeable  {
       throws IOException {
     this.conf = conf;
     this.workPath = workPath;
-    this.fs = FileSystem.get(conf);
+    this.fs = workPath.getFileSystem(conf);
     this.curBlobWriter = null;
     this.curClobWriter = null;
   }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/9466a0c7/src/java/org/apache/sqoop/manager/oracle/OraOopUtilities.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/manager/oracle/OraOopUtilities.java b/src/java/org/apache/sqoop/manager/oracle/OraOopUtilities.java
index e81588c..e73fd68 100644
--- a/src/java/org/apache/sqoop/manager/oracle/OraOopUtilities.java
+++ b/src/java/org/apache/sqoop/manager/oracle/OraOopUtilities.java
@@ -714,16 +714,16 @@ public final class OraOopUtilities {
 
     Path uniqueFileName = null;
     try {
-      FileSystem fileSystem = FileSystem.get(conf);
-
       // NOTE: This code is not thread-safe.
       // i.e. A race-condition could still cause this code to 'fail'.
 
       int suffix = 0;
       String fileNameTemplate = fileName + "%s";
+      Path outputDirectory = new Path(getOutputDirectory(conf));
+      FileSystem fileSystem = outputDirectory.getFileSystem(conf);
       while (true) {
         uniqueFileName =
-            new Path(getOutputDirectory(conf), String.format(fileNameTemplate,
+            new Path(outputDirectory, String.format(fileNameTemplate,
                 suffix == 0 ? "" : String.format(" (%d)", suffix)));
         if (!fileSystem.exists(uniqueFileName)) {
           break;

http://git-wip-us.apache.org/repos/asf/sqoop/blob/9466a0c7/src/java/org/apache/sqoop/mapreduce/CombineFileInputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/CombineFileInputFormat.java b/src/java/org/apache/sqoop/mapreduce/CombineFileInputFormat.java
index e08f997..fd2cf89 100644
--- a/src/java/org/apache/sqoop/mapreduce/CombineFileInputFormat.java
+++ b/src/java/org/apache/sqoop/mapreduce/CombineFileInputFormat.java
@@ -47,6 +47,7 @@ import org.apache.hadoop.net.NodeBase;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.sqoop.util.FileSystemUtil;
 
 /**
  * This file was ported from Hadoop 2.0.2-alpha
@@ -224,11 +225,9 @@ public abstract class CombineFileInputFormat<K, V>
     // times, one time each for each pool in the next loop.
     List<Path> newpaths = new LinkedList<Path>();
     for (int i = 0; i < paths.length; i++) {
-      FileSystem fs = paths[i].getFileSystem(conf);
-
       //the scheme and authority will be kept if the path is
       //a valid path for a non-default file system
-      Path p = fs.makeQualified(paths[i]);
+      Path p = FileSystemUtil.makeQualified(paths[i], conf);
       newpaths.add(p);
     }
     paths = null;

http://git-wip-us.apache.org/repos/asf/sqoop/blob/9466a0c7/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java b/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java
index 260bc29..dc49282 100644
--- a/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java
+++ b/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java
@@ -28,6 +28,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
@@ -48,6 +49,7 @@ import com.cloudera.sqoop.mapreduce.ImportJobBase;
 import com.cloudera.sqoop.mapreduce.db.DBConfiguration;
 import com.cloudera.sqoop.mapreduce.db.DataDrivenDBInputFormat;
 import com.cloudera.sqoop.orm.AvroSchemaGenerator;
+import org.apache.sqoop.util.FileSystemUtil;
 import org.kitesdk.data.Datasets;
 import org.kitesdk.data.mapreduce.DatasetKeyOutputFormat;
 
@@ -141,8 +143,8 @@ public class DataDrivenImportJob extends ImportJobBase {
           options.getHiveTableName();
       return String.format("dataset:hive:/%s/%s", hiveDatabase, hiveTable);
     } else {
-      FileSystem fs = FileSystem.get(conf);
-      return "dataset:" + fs.makeQualified(getContext().getDestination());
+      Path destination = getContext().getDestination();
+      return "dataset:" + FileSystemUtil.makeQualified(destination, conf);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/9466a0c7/src/java/org/apache/sqoop/mapreduce/ExportJobBase.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/ExportJobBase.java b/src/java/org/apache/sqoop/mapreduce/ExportJobBase.java
index 27f84da..c7609a5 100644
--- a/src/java/org/apache/sqoop/mapreduce/ExportJobBase.java
+++ b/src/java/org/apache/sqoop/mapreduce/ExportJobBase.java
@@ -50,6 +50,7 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.sql.SQLException;
 import java.util.Date;
+import org.apache.sqoop.util.FileSystemUtil;
 
 /**
  * Base class for running an export MapReduce job.
@@ -232,7 +233,7 @@ public class ExportJobBase extends JobBase {
     }
     Path inputPath = new Path(context.getOptions().getExportDir());
     Configuration conf = options.getConf();
-    inputPath = inputPath.makeQualified(FileSystem.get(conf));
+    inputPath = FileSystemUtil.makeQualified(inputPath, conf);
     return inputPath;
   }
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/9466a0c7/src/java/org/apache/sqoop/mapreduce/HBaseBulkImportJob.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/HBaseBulkImportJob.java b/src/java/org/apache/sqoop/mapreduce/HBaseBulkImportJob.java
index b32cdd1..2bbfffe 100644
--- a/src/java/org/apache/sqoop/mapreduce/HBaseBulkImportJob.java
+++ b/src/java/org/apache/sqoop/mapreduce/HBaseBulkImportJob.java
@@ -92,11 +92,10 @@ public class HBaseBulkImportJob extends HBaseImportJob {
   protected void completeImport(Job job) throws IOException, ImportException {
     super.completeImport(job);
 
-    FileSystem fileSystem = FileSystem.get(job.getConfiguration());
-
     // Make the bulk load files source directory accessible to the world
     // so that the hbase user can deal with it
     Path bulkLoadDir = getContext().getDestination();
+    FileSystem fileSystem = bulkLoadDir.getFileSystem(job.getConfiguration());
     setPermission(fileSystem, fileSystem.getFileStatus(bulkLoadDir),
       FsPermission.createImmutable((short) 00777));
 
@@ -120,8 +119,9 @@ public class HBaseBulkImportJob extends HBaseImportJob {
   protected void jobTeardown(Job job) throws IOException, ImportException {
     super.jobTeardown(job);
     // Delete the hfiles directory after we are finished.
-    FileSystem fileSystem = FileSystem.get(job.getConfiguration());
-    fileSystem.delete(getContext().getDestination(), true);
+    Path destination = getContext().getDestination();
+    FileSystem fileSystem = destination.getFileSystem(job.getConfiguration());
+    fileSystem.delete(destination, true);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/sqoop/blob/9466a0c7/src/java/org/apache/sqoop/mapreduce/JdbcExportJob.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/JdbcExportJob.java b/src/java/org/apache/sqoop/mapreduce/JdbcExportJob.java
index 626119b..6f9afaf 100644
--- a/src/java/org/apache/sqoop/mapreduce/JdbcExportJob.java
+++ b/src/java/org/apache/sqoop/mapreduce/JdbcExportJob.java
@@ -25,7 +25,6 @@ import com.cloudera.sqoop.mapreduce.db.DBConfiguration;
 import com.cloudera.sqoop.mapreduce.db.DBOutputFormat;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.io.DefaultStringifier;
 import org.apache.hadoop.io.MapWritable;
 import org.apache.hadoop.io.Text;
@@ -38,6 +37,7 @@ import org.kitesdk.data.mapreduce.DatasetKeyInputFormat;
 
 import java.io.IOException;
 import java.util.Map;
+import org.apache.sqoop.util.FileSystemUtil;
 
 /**
  * Run an export using JDBC (JDBC-based ExportOutputFormat).
@@ -79,8 +79,7 @@ public class JdbcExportJob extends ExportJobBase {
     } else if (fileType == FileType.PARQUET_FILE) {
       LOG.debug("Configuring for Parquet export");
       configureGenericRecordExportInputFormat(job, tableName);
-      FileSystem fs = FileSystem.get(job.getConfiguration());
-      String uri = "dataset:" + fs.makeQualified(getInputPath());
+      String uri = "dataset:" + FileSystemUtil.makeQualified(getInputPath(), job.getConfiguration());
       DatasetKeyInputFormat.configure(job).readFrom(uri);
     }
   }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/9466a0c7/src/java/org/apache/sqoop/mapreduce/JdbcUpdateExportJob.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/JdbcUpdateExportJob.java b/src/java/org/apache/sqoop/mapreduce/JdbcUpdateExportJob.java
index f911280..d13b560 100644
--- a/src/java/org/apache/sqoop/mapreduce/JdbcUpdateExportJob.java
+++ b/src/java/org/apache/sqoop/mapreduce/JdbcUpdateExportJob.java
@@ -26,7 +26,6 @@ import java.util.Set;
 import java.util.StringTokenizer;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.io.DefaultStringifier;
 import org.apache.hadoop.io.MapWritable;
 import org.apache.hadoop.io.Text;
@@ -43,6 +42,7 @@ import com.cloudera.sqoop.manager.ExportJobContext;
 import com.cloudera.sqoop.mapreduce.ExportJobBase;
 import com.cloudera.sqoop.mapreduce.db.DBConfiguration;
 import com.cloudera.sqoop.mapreduce.db.DBOutputFormat;
+import org.apache.sqoop.util.FileSystemUtil;
 
 /**
  * Run an update-based export using JDBC (JDBC-based UpdateOutputFormat).
@@ -187,8 +187,7 @@ public class JdbcUpdateExportJob extends ExportJobBase {
     } else if (fileType == FileType.PARQUET_FILE) {
       LOG.debug("Configuring for Parquet export");
       configureGenericRecordExportInputFormat(job, tableName);
-      FileSystem fs = FileSystem.get(job.getConfiguration());
-      String uri = "dataset:" + fs.makeQualified(getInputPath());
+      String uri = "dataset:" + FileSystemUtil.makeQualified(getInputPath(), job.getConfiguration());
       DatasetKeyInputFormat.configure(job).readFrom(uri);
     }
   }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/9466a0c7/src/java/org/apache/sqoop/mapreduce/MergeJob.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/MergeJob.java b/src/java/org/apache/sqoop/mapreduce/MergeJob.java
index 5b6c4df..8b1cba3 100644
--- a/src/java/org/apache/sqoop/mapreduce/MergeJob.java
+++ b/src/java/org/apache/sqoop/mapreduce/MergeJob.java
@@ -46,6 +46,7 @@ import org.apache.sqoop.util.Jars;
 
 import com.cloudera.sqoop.SqoopOptions;
 import com.cloudera.sqoop.mapreduce.JobBase;
+import org.apache.sqoop.util.FileSystemUtil;
 
 /**
  * Run a MapReduce job that merges two datasets.
@@ -111,9 +112,9 @@ public class MergeJob extends JobBase {
       Path newPath = new Path(options.getMergeNewPath());
 
       Configuration jobConf = job.getConfiguration();
-      FileSystem fs = FileSystem.get(jobConf);
-      oldPath = oldPath.makeQualified(fs);
-      newPath = newPath.makeQualified(fs);
+
+      oldPath = FileSystemUtil.makeQualified(oldPath, jobConf);
+      newPath = FileSystemUtil.makeQualified(newPath, jobConf);
 
       propagateOptionsToJob(job);
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/9466a0c7/src/java/org/apache/sqoop/tool/ImportTool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/tool/ImportTool.java b/src/java/org/apache/sqoop/tool/ImportTool.java
index 258ef79..d1c9749 100644
--- a/src/java/org/apache/sqoop/tool/ImportTool.java
+++ b/src/java/org/apache/sqoop/tool/ImportTool.java
@@ -300,7 +300,6 @@ public class ImportTool extends com.cloudera.sqoop.tool.BaseSqoopTool
{
       return true;
     }
 
-    FileSystem fs = FileSystem.get(options.getConf());
     SqoopOptions.IncrementalMode incrementalMode = options.getIncrementalMode();
     String nextIncrementalValue = null;
 
@@ -325,11 +324,14 @@ public class ImportTool extends com.cloudera.sqoop.tool.BaseSqoopTool
{
       }
       break;
     case DateLastModified:
-      if (options.getMergeKeyCol() == null && !options.isAppendMode()
-          && fs.exists(getOutputPath(options, context.getTableName(), false))) {
-        throw new ImportException("--" + MERGE_KEY_ARG + " or " + "--" + APPEND_ARG
-          + " is required when using --" + this.INCREMENT_TYPE_ARG
-          + " lastmodified and the output directory exists.");
+      if (options.getMergeKeyCol() == null && !options.isAppendMode()) {
+        Path outputPath = getOutputPath(options, context.getTableName(), false);
+        FileSystem fs = outputPath.getFileSystem(options.getConf());
+        if (fs.exists(outputPath)) {
+          throw new ImportException("--" + MERGE_KEY_ARG + " or " + "--" + APPEND_ARG
+            + " is required when using --" + this.INCREMENT_TYPE_ARG
+            + " lastmodified and the output directory exists.");
+        }
       }
       checkColumnType = manager.getColumnTypes(options.getTableName(),
         options.getSqlQuery()).get(options.getIncrementalTestColumn());
@@ -436,10 +438,14 @@ public class ImportTool extends com.cloudera.sqoop.tool.BaseSqoopTool
{
    * Merge HDFS output directories
    */
   protected void lastModifiedMerge(SqoopOptions options, ImportJobContext context) throws
IOException {
-    FileSystem fs = FileSystem.get(options.getConf());
-    if (context.getDestination() != null && fs.exists(context.getDestination()))
{
+    if (context.getDestination() == null) {
+      return;
+    }
+
+    Path userDestDir = getOutputPath(options, context.getTableName(), false);
+    FileSystem fs = userDestDir.getFileSystem(options.getConf());
+    if (fs.exists(context.getDestination())) {
       LOG.info("Final destination exists, will run merge job.");
-      Path userDestDir = getOutputPath(options, context.getTableName(), false);
       if (fs.exists(userDestDir)) {
         String tableClassName = null;
         if (!context.getConnManager().isORMFacilitySelfManaged()) {
@@ -541,8 +547,8 @@ public class ImportTool extends com.cloudera.sqoop.tool.BaseSqoopTool
{
   private void deleteTargetDir(ImportJobContext context) throws IOException {
 
     SqoopOptions options = context.getOptions();
-    FileSystem fs = FileSystem.get(options.getConf());
     Path destDir = context.getDestination();
+    FileSystem fs = destDir.getFileSystem(options.getConf());
 
     if (fs.exists(destDir)) {
       fs.delete(destDir, true);

http://git-wip-us.apache.org/repos/asf/sqoop/blob/9466a0c7/src/java/org/apache/sqoop/util/FileSystemUtil.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/util/FileSystemUtil.java b/src/java/org/apache/sqoop/util/FileSystemUtil.java
new file mode 100644
index 0000000..1493e09
--- /dev/null
+++ b/src/java/org/apache/sqoop/util/FileSystemUtil.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.util;
+
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+
+public final class FileSystemUtil {
+  private FileSystemUtil() {
+  }
+
+
+  /**
+   * Creates a fully-qualified path object.
+   * @param path the path to fully-qualify with its file system URI.
+   * @param conf the current Hadoop configuration.
+   * @return a new path representing the same location as the input path,
+   * but with a fully-qualified URI. Returns {@code null} if provided path is {@code null};
+   */
+  public static Path makeQualified(Path path, Configuration conf)
+      throws IOException {
+    if (null == path) {
+      return null;
+    }
+
+    return path.getFileSystem(conf).makeQualified(path);
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/9466a0c7/src/java/org/apache/sqoop/util/FileUploader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/util/FileUploader.java b/src/java/org/apache/sqoop/util/FileUploader.java
index 155cffc..673a05b 100644
--- a/src/java/org/apache/sqoop/util/FileUploader.java
+++ b/src/java/org/apache/sqoop/util/FileUploader.java
@@ -18,16 +18,11 @@
 
 package org.apache.sqoop.util;
 
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
 import java.io.IOException;
 
-import org.apache.commons.io.IOUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
@@ -40,15 +35,14 @@ public class FileUploader {
   public static void uploadFilesToDFS(String srcBasePath, String src,
     String destBasePath, String dest, Configuration conf) throws IOException {
 
-    FileSystem fs = FileSystem.get(conf);
-    Path targetPath = null;
     Path srcPath = new Path(srcBasePath, src);
 
-    if (destBasePath == null || destBasePath.length() == 0) {
+    if (destBasePath == null || destBasePath.isEmpty()) {
       destBasePath = ".";
     }
 
-    targetPath = new Path(destBasePath, dest);
+    Path targetPath = new Path(destBasePath, dest);
+    FileSystem fs = targetPath.getFileSystem(conf);
 
     if (!fs.exists(targetPath)) {
       fs.mkdirs(targetPath);

http://git-wip-us.apache.org/repos/asf/sqoop/blob/9466a0c7/src/test/org/apache/sqoop/util/TestFileSystemUtil.java
----------------------------------------------------------------------
diff --git a/src/test/org/apache/sqoop/util/TestFileSystemUtil.java b/src/test/org/apache/sqoop/util/TestFileSystemUtil.java
new file mode 100644
index 0000000..fef74af
--- /dev/null
+++ b/src/test/org/apache/sqoop/util/TestFileSystemUtil.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.util;
+
+import java.io.IOException;
+import java.net.URI;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.junit.Test;
+import org.junit.Before;
+import static org.junit.Assert.*;
+
+public class TestFileSystemUtil {
+  private Configuration conf;
+
+  @Before
+  public void setUp() {
+    conf = new Configuration();
+    conf.set("fs.my.impl", MyFileSystem.class.getTypeName());
+  }
+
+  @Test
+  public void testMakeQualifiedWhenPathIsNullThenReturnsNull() throws IOException {
+    assertNull(FileSystemUtil.makeQualified(null, conf));
+  }
+
+  @Test
+  public void testMakeQualifiedWhenPathIsRelativeThenReturnDefault() throws IOException {
+    Path actual = FileSystemUtil.makeQualified(new Path("foo/bar"), conf);
+    assertEquals("file", actual.toUri().getScheme());
+  }
+
+  @Test
+  public void testMakeQualifiedWhenPathHasCustomSchemaThenReturnSameSchema() throws IOException
{
+    Path actual = FileSystemUtil.makeQualified(new Path("my:/foo/bar"), conf);
+    assertEquals("my", actual.toUri().getScheme());
+  }
+
+  @Test(expected = IOException.class)
+  public void testMakeQualifiedWhenPathHasBadSchemaThenThrowsIOException() throws IOException
{
+    FileSystemUtil.makeQualified(new Path("nosuchfs://foo/bar"), conf);
+  }
+
+  public static final class MyFileSystem extends RawLocalFileSystem {
+    @Override
+    public URI getUri() { return URI.create("my:///"); }
+  }
+}


Mime
View raw message