Repository: sqoop
Updated Branches:
refs/heads/trunk d8c4b3ccd -> 9466a0c7d
SQOOP-3136: Add support for Sqoop to handle different file system
URLs (e.g. s3a://some-bucket/tmp/sqoop)
(Illya Yalovyy via Attila Szabo)
Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/9466a0c7
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/9466a0c7
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/9466a0c7
Branch: refs/heads/trunk
Commit: 9466a0c7d9a585a94a472fc672cad87c30c8125b
Parents: d8c4b3c
Author: Attila Szabo <maugli@apache.org>
Authored: Fri Feb 24 07:40:48 2017 +0100
Committer: Attila Szabo <maugli@apache.org>
Committed: Fri Feb 24 07:40:48 2017 +0100
----------------------------------------------------------------------
.../com/cloudera/sqoop/io/LobReaderCache.java | 2 +-
src/java/org/apache/sqoop/hive/HiveImport.java | 4 +-
.../org/apache/sqoop/hive/TableDefWriter.java | 4 +-
.../org/apache/sqoop/io/LobReaderCache.java | 24 +-------
.../apache/sqoop/io/SplittingOutputStream.java | 3 +-
.../org/apache/sqoop/lib/LargeObjectLoader.java | 2 +-
.../sqoop/manager/oracle/OraOopUtilities.java | 6 +-
.../sqoop/mapreduce/CombineFileInputFormat.java | 5 +-
.../sqoop/mapreduce/DataDrivenImportJob.java | 6 +-
.../apache/sqoop/mapreduce/ExportJobBase.java | 3 +-
.../sqoop/mapreduce/HBaseBulkImportJob.java | 8 +--
.../apache/sqoop/mapreduce/JdbcExportJob.java | 5 +-
.../sqoop/mapreduce/JdbcUpdateExportJob.java | 5 +-
.../org/apache/sqoop/mapreduce/MergeJob.java | 7 ++-
src/java/org/apache/sqoop/tool/ImportTool.java | 26 +++++---
.../org/apache/sqoop/util/FileSystemUtil.java | 45 ++++++++++++++
.../org/apache/sqoop/util/FileUploader.java | 12 +---
.../apache/sqoop/util/TestFileSystemUtil.java | 65 ++++++++++++++++++++
18 files changed, 162 insertions(+), 70 deletions(-)
----------------------------------------------------------------------
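The recurring fix in this commit is to obtain a FileSystem from the Path
being operated on (Path.getFileSystem) rather than from the default
filesystem (FileSystem.get), so that paths with non-default schemes such
as s3a:// resolve to the right store. A minimal sketch of the difference,
not part of the commit, assuming the s3a connector is on the classpath
(the bucket name is illustrative):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class SchemeAwareLookup {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path path = new Path("s3a://some-bucket/tmp/sqoop");

        // Old pattern: always binds to fs.defaultFS (e.g. hdfs://namenode/),
        // so exists()/delete() on an s3a:// path would hit the wrong filesystem.
        FileSystem defaultFs = FileSystem.get(conf);

        // New pattern: binds to the filesystem named by the path's own scheme.
        FileSystem pathFs = path.getFileSystem(conf);

        System.out.println(defaultFs.getUri()); // e.g. hdfs://namenode:8020
        System.out.println(pathFs.getUri());    // s3a://some-bucket
      }
    }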
http://git-wip-us.apache.org/repos/asf/sqoop/blob/9466a0c7/src/java/com/cloudera/sqoop/io/LobReaderCache.java
----------------------------------------------------------------------
diff --git a/src/java/com/cloudera/sqoop/io/LobReaderCache.java b/src/java/com/cloudera/sqoop/io/LobReaderCache.java
index 3394296..89d31d3 100644
--- a/src/java/com/cloudera/sqoop/io/LobReaderCache.java
+++ b/src/java/com/cloudera/sqoop/io/LobReaderCache.java
@@ -59,7 +59,7 @@ public final class LobReaderCache extends org.apache.sqoop.io.LobReaderCache {
*/
public static Path qualify(Path path, Configuration conf)
throws IOException {
- return org.apache.sqoop.io.LobReaderCache.qualify(path, conf);
+ return org.apache.sqoop.util.FileSystemUtil.makeQualified(path, conf);
}
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/9466a0c7/src/java/org/apache/sqoop/hive/HiveImport.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/hive/HiveImport.java b/src/java/org/apache/sqoop/hive/HiveImport.java
index 4828375..153d091 100644
--- a/src/java/org/apache/sqoop/hive/HiveImport.java
+++ b/src/java/org/apache/sqoop/hive/HiveImport.java
@@ -115,7 +115,7 @@ public class HiveImport {
* from where we put it, before running Hive LOAD DATA INPATH.
*/
private void removeTempLogs(Path tablePath) throws IOException {
- FileSystem fs = FileSystem.get(configuration);
+ FileSystem fs = tablePath.getFileSystem(configuration);
Path logsPath = new Path(tablePath, "_logs");
if (fs.exists(logsPath)) {
LOG.info("Removing temporary files from import process: " + logsPath);
@@ -263,7 +263,7 @@ public class HiveImport {
* @throws IOException
*/
private void cleanUp(Path outputPath) throws IOException {
- FileSystem fs = FileSystem.get(configuration);
+ FileSystem fs = outputPath.getFileSystem(configuration);
// HIVE is not always removing input directory after LOAD DATA statement
// (which is our export directory). We're removing export directory in case
http://git-wip-us.apache.org/repos/asf/sqoop/blob/9466a0c7/src/java/org/apache/sqoop/hive/TableDefWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/hive/TableDefWriter.java b/src/java/org/apache/sqoop/hive/TableDefWriter.java
index c9962e9..32fcca3 100644
--- a/src/java/org/apache/sqoop/hive/TableDefWriter.java
+++ b/src/java/org/apache/sqoop/hive/TableDefWriter.java
@@ -36,6 +36,7 @@ import org.apache.sqoop.io.CodecMap;
import com.cloudera.sqoop.SqoopOptions;
import com.cloudera.sqoop.manager.ConnManager;
+import org.apache.sqoop.util.FileSystemUtil;
/**
* Creates (Hive-specific) SQL DDL statements to create tables to hold data
@@ -271,8 +272,7 @@ public class TableDefWriter {
} else {
tablePath = warehouseDir + inputTableName;
}
- FileSystem fs = FileSystem.get(configuration);
- return new Path(tablePath).makeQualified(fs);
+ return FileSystemUtil.makeQualified(new Path(tablePath), configuration);
}
/**
http://git-wip-us.apache.org/repos/asf/sqoop/blob/9466a0c7/src/java/org/apache/sqoop/io/LobReaderCache.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/io/LobReaderCache.java b/src/java/org/apache/sqoop/io/LobReaderCache.java
index bd75374..dbfa4f1 100644
--- a/src/java/org/apache/sqoop/io/LobReaderCache.java
+++ b/src/java/org/apache/sqoop/io/LobReaderCache.java
@@ -24,10 +24,10 @@ import java.util.TreeMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import com.cloudera.sqoop.io.LobFile;
+import org.apache.sqoop.util.FileSystemUtil;
/**
* A cache of open LobFile.Reader objects.
@@ -55,7 +55,7 @@ public class LobReaderCache {
throws IOException {
LobFile.Reader reader = null;
- Path canonicalPath = qualify(path, conf);
+ Path canonicalPath = FileSystemUtil.makeQualified(path, conf);
// Look up an entry in the cache.
synchronized(this) {
reader = readerMap.remove(canonicalPath);
@@ -111,24 +111,4 @@ public class LobReaderCache {
protected LobReaderCache() {
this.readerMap = new TreeMap<Path, LobFile.Reader>();
}
-
- /**
- * Created a fully-qualified path object.
- * @param path the path to fully-qualify with its fs URI.
- * @param conf the current Hadoop FS configuration.
- * @return a new path representing the same location as the input 'path',
- * but with a fully-qualified URI.
- */
- public static Path qualify(Path path, Configuration conf)
- throws IOException {
- if (null == path) {
- return null;
- }
-
- FileSystem fs = path.getFileSystem(conf);
- if (null == fs) {
- fs = FileSystem.get(conf);
- }
- return path.makeQualified(fs);
- }
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/9466a0c7/src/java/org/apache/sqoop/io/SplittingOutputStream.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/io/SplittingOutputStream.java b/src/java/org/apache/sqoop/io/SplittingOutputStream.java
index 5f98192..129b508 100644
--- a/src/java/org/apache/sqoop/io/SplittingOutputStream.java
+++ b/src/java/org/apache/sqoop/io/SplittingOutputStream.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.sqoop.util.FileSystemUtil;
/**
* An output stream that writes to an underlying filesystem, opening
@@ -90,7 +91,7 @@ public class SplittingOutputStream extends OutputStream {
FileSystem fs = destFile.getFileSystem(conf);
LOG.debug("Opening next output file: " + destFile);
if (fs.exists(destFile)) {
- Path canonicalDest = destFile.makeQualified(fs);
+ Path canonicalDest = fs.makeQualified(destFile);
throw new IOException("Destination file " + canonicalDest
+ " already exists");
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/9466a0c7/src/java/org/apache/sqoop/lib/LargeObjectLoader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/lib/LargeObjectLoader.java b/src/java/org/apache/sqoop/lib/LargeObjectLoader.java
index 70c0f4e..b8525fe 100644
--- a/src/java/org/apache/sqoop/lib/LargeObjectLoader.java
+++ b/src/java/org/apache/sqoop/lib/LargeObjectLoader.java
@@ -79,7 +79,7 @@ public class LargeObjectLoader implements Closeable {
throws IOException {
this.conf = conf;
this.workPath = workPath;
- this.fs = FileSystem.get(conf);
+ this.fs = workPath.getFileSystem(conf);
this.curBlobWriter = null;
this.curClobWriter = null;
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/9466a0c7/src/java/org/apache/sqoop/manager/oracle/OraOopUtilities.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/manager/oracle/OraOopUtilities.java b/src/java/org/apache/sqoop/manager/oracle/OraOopUtilities.java
index e81588c..e73fd68 100644
--- a/src/java/org/apache/sqoop/manager/oracle/OraOopUtilities.java
+++ b/src/java/org/apache/sqoop/manager/oracle/OraOopUtilities.java
@@ -714,16 +714,16 @@ public final class OraOopUtilities {
Path uniqueFileName = null;
try {
- FileSystem fileSystem = FileSystem.get(conf);
-
// NOTE: This code is not thread-safe.
// i.e. A race-condition could still cause this code to 'fail'.
int suffix = 0;
String fileNameTemplate = fileName + "%s";
+ Path outputDirectory = new Path(getOutputDirectory(conf));
+ FileSystem fileSystem = outputDirectory.getFileSystem(conf);
while (true) {
uniqueFileName =
- new Path(getOutputDirectory(conf), String.format(fileNameTemplate,
+ new Path(outputDirectory, String.format(fileNameTemplate,
suffix == 0 ? "" : String.format(" (%d)", suffix)));
if (!fileSystem.exists(uniqueFileName)) {
break;
http://git-wip-us.apache.org/repos/asf/sqoop/blob/9466a0c7/src/java/org/apache/sqoop/mapreduce/CombineFileInputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/CombineFileInputFormat.java b/src/java/org/apache/sqoop/mapreduce/CombineFileInputFormat.java
index e08f997..fd2cf89 100644
--- a/src/java/org/apache/sqoop/mapreduce/CombineFileInputFormat.java
+++ b/src/java/org/apache/sqoop/mapreduce/CombineFileInputFormat.java
@@ -47,6 +47,7 @@ import org.apache.hadoop.net.NodeBase;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.sqoop.util.FileSystemUtil;
/**
* This file was ported from Hadoop 2.0.2-alpha
@@ -224,11 +225,9 @@ public abstract class CombineFileInputFormat<K, V>
// times, one time each for each pool in the next loop.
List<Path> newpaths = new LinkedList<Path>();
for (int i = 0; i < paths.length; i++) {
- FileSystem fs = paths[i].getFileSystem(conf);
-
//the scheme and authority will be kept if the path is
//a valid path for a non-default file system
- Path p = fs.makeQualified(paths[i]);
+ Path p = FileSystemUtil.makeQualified(paths[i], conf);
newpaths.add(p);
}
paths = null;
http://git-wip-us.apache.org/repos/asf/sqoop/blob/9466a0c7/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java b/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java
index 260bc29..dc49282 100644
--- a/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java
+++ b/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java
@@ -28,6 +28,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
@@ -48,6 +49,7 @@ import com.cloudera.sqoop.mapreduce.ImportJobBase;
import com.cloudera.sqoop.mapreduce.db.DBConfiguration;
import com.cloudera.sqoop.mapreduce.db.DataDrivenDBInputFormat;
import com.cloudera.sqoop.orm.AvroSchemaGenerator;
+import org.apache.sqoop.util.FileSystemUtil;
import org.kitesdk.data.Datasets;
import org.kitesdk.data.mapreduce.DatasetKeyOutputFormat;
@@ -141,8 +143,8 @@ public class DataDrivenImportJob extends ImportJobBase {
options.getHiveTableName();
return String.format("dataset:hive:/%s/%s", hiveDatabase, hiveTable);
} else {
- FileSystem fs = FileSystem.get(conf);
- return "dataset:" + fs.makeQualified(getContext().getDestination());
+ Path destination = getContext().getDestination();
+ return "dataset:" + FileSystemUtil.makeQualified(destination, conf);
}
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/9466a0c7/src/java/org/apache/sqoop/mapreduce/ExportJobBase.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/ExportJobBase.java b/src/java/org/apache/sqoop/mapreduce/ExportJobBase.java
index 27f84da..c7609a5 100644
--- a/src/java/org/apache/sqoop/mapreduce/ExportJobBase.java
+++ b/src/java/org/apache/sqoop/mapreduce/ExportJobBase.java
@@ -50,6 +50,7 @@ import java.io.FileNotFoundException;
import java.io.IOException;
import java.sql.SQLException;
import java.util.Date;
+import org.apache.sqoop.util.FileSystemUtil;
/**
* Base class for running an export MapReduce job.
@@ -232,7 +233,7 @@ public class ExportJobBase extends JobBase {
}
Path inputPath = new Path(context.getOptions().getExportDir());
Configuration conf = options.getConf();
- inputPath = inputPath.makeQualified(FileSystem.get(conf));
+ inputPath = FileSystemUtil.makeQualified(inputPath, conf);
return inputPath;
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/9466a0c7/src/java/org/apache/sqoop/mapreduce/HBaseBulkImportJob.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/HBaseBulkImportJob.java b/src/java/org/apache/sqoop/mapreduce/HBaseBulkImportJob.java
index b32cdd1..2bbfffe 100644
--- a/src/java/org/apache/sqoop/mapreduce/HBaseBulkImportJob.java
+++ b/src/java/org/apache/sqoop/mapreduce/HBaseBulkImportJob.java
@@ -92,11 +92,10 @@ public class HBaseBulkImportJob extends HBaseImportJob {
protected void completeImport(Job job) throws IOException, ImportException {
super.completeImport(job);
- FileSystem fileSystem = FileSystem.get(job.getConfiguration());
-
// Make the bulk load files source directory accessible to the world
// so that the hbase user can deal with it
Path bulkLoadDir = getContext().getDestination();
+ FileSystem fileSystem = bulkLoadDir.getFileSystem(job.getConfiguration());
setPermission(fileSystem, fileSystem.getFileStatus(bulkLoadDir),
FsPermission.createImmutable((short) 00777));
@@ -120,8 +119,9 @@ public class HBaseBulkImportJob extends HBaseImportJob {
protected void jobTeardown(Job job) throws IOException, ImportException {
super.jobTeardown(job);
// Delete the hfiles directory after we are finished.
- FileSystem fileSystem = FileSystem.get(job.getConfiguration());
- fileSystem.delete(getContext().getDestination(), true);
+ Path destination = getContext().getDestination();
+ FileSystem fileSystem = destination.getFileSystem(job.getConfiguration());
+ fileSystem.delete(destination, true);
}
/**
http://git-wip-us.apache.org/repos/asf/sqoop/blob/9466a0c7/src/java/org/apache/sqoop/mapreduce/JdbcExportJob.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/JdbcExportJob.java b/src/java/org/apache/sqoop/mapreduce/JdbcExportJob.java
index 626119b..6f9afaf 100644
--- a/src/java/org/apache/sqoop/mapreduce/JdbcExportJob.java
+++ b/src/java/org/apache/sqoop/mapreduce/JdbcExportJob.java
@@ -25,7 +25,6 @@ import com.cloudera.sqoop.mapreduce.db.DBConfiguration;
import com.cloudera.sqoop.mapreduce.db.DBOutputFormat;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.DefaultStringifier;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
@@ -38,6 +37,7 @@ import org.kitesdk.data.mapreduce.DatasetKeyInputFormat;
import java.io.IOException;
import java.util.Map;
+import org.apache.sqoop.util.FileSystemUtil;
/**
* Run an export using JDBC (JDBC-based ExportOutputFormat).
@@ -79,8 +79,7 @@ public class JdbcExportJob extends ExportJobBase {
} else if (fileType == FileType.PARQUET_FILE) {
LOG.debug("Configuring for Parquet export");
configureGenericRecordExportInputFormat(job, tableName);
- FileSystem fs = FileSystem.get(job.getConfiguration());
- String uri = "dataset:" + fs.makeQualified(getInputPath());
+ String uri = "dataset:" + FileSystemUtil.makeQualified(getInputPath(), job.getConfiguration());
DatasetKeyInputFormat.configure(job).readFrom(uri);
}
}
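In both Parquet export branches (here and in JdbcUpdateExportJob below),
the qualified input path is embedded in a Kite dataset URI before being
handed to DatasetKeyInputFormat. A sketch of what readFrom now receives,
with an illustrative namenode address:

    Path input = getInputPath();  // e.g. /user/foo/mytable
    String uri = "dataset:"
        + FileSystemUtil.makeQualified(input, job.getConfiguration());
    // e.g. "dataset:hdfs://namenode:8020/user/foo/mytable"; previously the
    // scheme and authority always came from the default filesystem.
    DatasetKeyInputFormat.configure(job).readFrom(uri);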
http://git-wip-us.apache.org/repos/asf/sqoop/blob/9466a0c7/src/java/org/apache/sqoop/mapreduce/JdbcUpdateExportJob.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/JdbcUpdateExportJob.java b/src/java/org/apache/sqoop/mapreduce/JdbcUpdateExportJob.java
index f911280..d13b560 100644
--- a/src/java/org/apache/sqoop/mapreduce/JdbcUpdateExportJob.java
+++ b/src/java/org/apache/sqoop/mapreduce/JdbcUpdateExportJob.java
@@ -26,7 +26,6 @@ import java.util.Set;
import java.util.StringTokenizer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.DefaultStringifier;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
@@ -43,6 +42,7 @@ import com.cloudera.sqoop.manager.ExportJobContext;
import com.cloudera.sqoop.mapreduce.ExportJobBase;
import com.cloudera.sqoop.mapreduce.db.DBConfiguration;
import com.cloudera.sqoop.mapreduce.db.DBOutputFormat;
+import org.apache.sqoop.util.FileSystemUtil;
/**
* Run an update-based export using JDBC (JDBC-based UpdateOutputFormat).
@@ -187,8 +187,7 @@ public class JdbcUpdateExportJob extends ExportJobBase {
} else if (fileType == FileType.PARQUET_FILE) {
LOG.debug("Configuring for Parquet export");
configureGenericRecordExportInputFormat(job, tableName);
- FileSystem fs = FileSystem.get(job.getConfiguration());
- String uri = "dataset:" + fs.makeQualified(getInputPath());
+ String uri = "dataset:" + FileSystemUtil.makeQualified(getInputPath(), job.getConfiguration());
DatasetKeyInputFormat.configure(job).readFrom(uri);
}
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/9466a0c7/src/java/org/apache/sqoop/mapreduce/MergeJob.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/MergeJob.java b/src/java/org/apache/sqoop/mapreduce/MergeJob.java
index 5b6c4df..8b1cba3 100644
--- a/src/java/org/apache/sqoop/mapreduce/MergeJob.java
+++ b/src/java/org/apache/sqoop/mapreduce/MergeJob.java
@@ -46,6 +46,7 @@ import org.apache.sqoop.util.Jars;
import com.cloudera.sqoop.SqoopOptions;
import com.cloudera.sqoop.mapreduce.JobBase;
+import org.apache.sqoop.util.FileSystemUtil;
/**
* Run a MapReduce job that merges two datasets.
@@ -111,9 +112,9 @@ public class MergeJob extends JobBase {
Path newPath = new Path(options.getMergeNewPath());
Configuration jobConf = job.getConfiguration();
- FileSystem fs = FileSystem.get(jobConf);
- oldPath = oldPath.makeQualified(fs);
- newPath = newPath.makeQualified(fs);
+
+ oldPath = FileSystemUtil.makeQualified(oldPath, jobConf);
+ newPath = FileSystemUtil.makeQualified(newPath, jobConf);
propagateOptionsToJob(job);
http://git-wip-us.apache.org/repos/asf/sqoop/blob/9466a0c7/src/java/org/apache/sqoop/tool/ImportTool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/tool/ImportTool.java b/src/java/org/apache/sqoop/tool/ImportTool.java
index 258ef79..d1c9749 100644
--- a/src/java/org/apache/sqoop/tool/ImportTool.java
+++ b/src/java/org/apache/sqoop/tool/ImportTool.java
@@ -300,7 +300,6 @@ public class ImportTool extends com.cloudera.sqoop.tool.BaseSqoopTool {
return true;
}
- FileSystem fs = FileSystem.get(options.getConf());
SqoopOptions.IncrementalMode incrementalMode = options.getIncrementalMode();
String nextIncrementalValue = null;
@@ -325,11 +324,14 @@ public class ImportTool extends com.cloudera.sqoop.tool.BaseSqoopTool {
}
break;
case DateLastModified:
- if (options.getMergeKeyCol() == null && !options.isAppendMode()
- && fs.exists(getOutputPath(options, context.getTableName(), false))) {
- throw new ImportException("--" + MERGE_KEY_ARG + " or " + "--" + APPEND_ARG
- + " is required when using --" + this.INCREMENT_TYPE_ARG
- + " lastmodified and the output directory exists.");
+ if (options.getMergeKeyCol() == null && !options.isAppendMode()) {
+ Path outputPath = getOutputPath(options, context.getTableName(), false);
+ FileSystem fs = outputPath.getFileSystem(options.getConf());
+ if (fs.exists(outputPath)) {
+ throw new ImportException("--" + MERGE_KEY_ARG + " or " + "--" + APPEND_ARG
+ + " is required when using --" + this.INCREMENT_TYPE_ARG
+ + " lastmodified and the output directory exists.");
+ }
}
checkColumnType = manager.getColumnTypes(options.getTableName(),
options.getSqlQuery()).get(options.getIncrementalTestColumn());
@@ -436,10 +438,14 @@ public class ImportTool extends com.cloudera.sqoop.tool.BaseSqoopTool {
* Merge HDFS output directories
*/
protected void lastModifiedMerge(SqoopOptions options, ImportJobContext context) throws IOException {
- FileSystem fs = FileSystem.get(options.getConf());
- if (context.getDestination() != null && fs.exists(context.getDestination())) {
+ if (context.getDestination() == null) {
+ return;
+ }
+
+ Path userDestDir = getOutputPath(options, context.getTableName(), false);
+ FileSystem fs = userDestDir.getFileSystem(options.getConf());
+ if (fs.exists(context.getDestination())) {
LOG.info("Final destination exists, will run merge job.");
- Path userDestDir = getOutputPath(options, context.getTableName(), false);
if (fs.exists(userDestDir)) {
String tableClassName = null;
if (!context.getConnManager().isORMFacilitySelfManaged()) {
@@ -541,8 +547,8 @@ public class ImportTool extends com.cloudera.sqoop.tool.BaseSqoopTool {
private void deleteTargetDir(ImportJobContext context) throws IOException {
SqoopOptions options = context.getOptions();
- FileSystem fs = FileSystem.get(options.getConf());
Path destDir = context.getDestination();
+ FileSystem fs = destDir.getFileSystem(options.getConf());
if (fs.exists(destDir)) {
fs.delete(destDir, true);
http://git-wip-us.apache.org/repos/asf/sqoop/blob/9466a0c7/src/java/org/apache/sqoop/util/FileSystemUtil.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/util/FileSystemUtil.java b/src/java/org/apache/sqoop/util/FileSystemUtil.java
new file mode 100644
index 0000000..1493e09
--- /dev/null
+++ b/src/java/org/apache/sqoop/util/FileSystemUtil.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.util;
+
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+
+public final class FileSystemUtil {
+ private FileSystemUtil() {
+ }
+
+
+ /**
+ * Creates a fully-qualified path object.
+ * @param path the path to fully-qualify with its file system URI.
+ * @param conf the current Hadoop configuration.
+ * @return a new path representing the same location as the input path,
+ * but with a fully-qualified URI. Returns {@code null} if the provided path is {@code null}.
+ */
+ public static Path makeQualified(Path path, Configuration conf)
+ throws IOException {
+ if (null == path) {
+ return null;
+ }
+
+ return path.getFileSystem(conf).makeQualified(path);
+ }
+}
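A usage sketch for the new helper (paths illustrative, not from the
commit): a relative path is qualified against the default filesystem and
working directory, a path carrying its own scheme keeps it, and null
passes through:

    Configuration conf = new Configuration();
    // Qualified against fs.defaultFS, e.g. hdfs://namenode:8020/user/me/foo
    Path p1 = FileSystemUtil.makeQualified(new Path("foo"), conf);
    // Keeps its own scheme and authority: s3a://some-bucket/tmp/sqoop
    Path p2 = FileSystemUtil.makeQualified(
        new Path("s3a://some-bucket/tmp/sqoop"), conf);
    // Null in, null out
    Path p3 = FileSystemUtil.makeQualified(null, conf);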
http://git-wip-us.apache.org/repos/asf/sqoop/blob/9466a0c7/src/java/org/apache/sqoop/util/FileUploader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/util/FileUploader.java b/src/java/org/apache/sqoop/util/FileUploader.java
index 155cffc..673a05b 100644
--- a/src/java/org/apache/sqoop/util/FileUploader.java
+++ b/src/java/org/apache/sqoop/util/FileUploader.java
@@ -18,16 +18,11 @@
package org.apache.sqoop.util;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
import java.io.IOException;
-import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -40,15 +35,14 @@ public class FileUploader {
public static void uploadFilesToDFS(String srcBasePath, String src,
String destBasePath, String dest, Configuration conf) throws IOException {
- FileSystem fs = FileSystem.get(conf);
- Path targetPath = null;
Path srcPath = new Path(srcBasePath, src);
- if (destBasePath == null || destBasePath.length() == 0) {
+ if (destBasePath == null || destBasePath.isEmpty()) {
destBasePath = ".";
}
- targetPath = new Path(destBasePath, dest);
+ Path targetPath = new Path(destBasePath, dest);
+ FileSystem fs = targetPath.getFileSystem(conf);
if (!fs.exists(targetPath)) {
fs.mkdirs(targetPath);
http://git-wip-us.apache.org/repos/asf/sqoop/blob/9466a0c7/src/test/org/apache/sqoop/util/TestFileSystemUtil.java
----------------------------------------------------------------------
diff --git a/src/test/org/apache/sqoop/util/TestFileSystemUtil.java b/src/test/org/apache/sqoop/util/TestFileSystemUtil.java
new file mode 100644
index 0000000..fef74af
--- /dev/null
+++ b/src/test/org/apache/sqoop/util/TestFileSystemUtil.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.util;
+
+import java.io.IOException;
+import java.net.URI;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.junit.Test;
+import org.junit.Before;
+import static org.junit.Assert.*;
+
+public class TestFileSystemUtil {
+ private Configuration conf;
+
+ @Before
+ public void setUp() {
+ conf = new Configuration();
+ conf.set("fs.my.impl", MyFileSystem.class.getTypeName());
+ }
+
+ @Test
+ public void testMakeQualifiedWhenPathIsNullThenReturnsNull() throws IOException {
+ assertNull(FileSystemUtil.makeQualified(null, conf));
+ }
+
+ @Test
+ public void testMakeQualifiedWhenPathIsRelativeThenReturnDefault() throws IOException {
+ Path actual = FileSystemUtil.makeQualified(new Path("foo/bar"), conf);
+ assertEquals("file", actual.toUri().getScheme());
+ }
+
+ @Test
+ public void testMakeQualifiedWhenPathHasCustomSchemaThenReturnSameSchema() throws IOException {
+ Path actual = FileSystemUtil.makeQualified(new Path("my:/foo/bar"), conf);
+ assertEquals("my", actual.toUri().getScheme());
+ }
+
+ @Test(expected = IOException.class)
+ public void testMakeQualifiedWhenPathHasBadSchemaThenThrowsIOException() throws IOException {
+ FileSystemUtil.makeQualified(new Path("nosuchfs://foo/bar"), conf);
+ }
+
+ public static final class MyFileSystem extends RawLocalFileSystem {
+ @Override
+ public URI getUri() { return URI.create("my:///"); }
+ }
+}
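The custom scheme in this test is resolved through Hadoop's
fs.<scheme>.impl configuration lookup, the same mechanism that wires up
real schemes such as s3a. A minimal sketch of that resolution outside
JUnit, reusing the test's MyFileSystem:

    Configuration conf = new Configuration();
    conf.set("fs.my.impl", TestFileSystemUtil.MyFileSystem.class.getName());
    FileSystem fs = new Path("my:/foo/bar").getFileSystem(conf);
    // fs is a MyFileSystem instance; an unregistered scheme such as
    // nosuchfs:// makes getFileSystem throw an IOException instead.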