SQOOP-3319: Extract code using Kite into separate classes
(Szabolcs Vasas via Boglarka Egyed)
Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/3233db8e
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/3233db8e
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/3233db8e
Branch: refs/heads/trunk
Commit: 3233db8e1c481e38c538f4caaf55bcbc0c11f208
Parents: ad7d046
Author: Boglarka Egyed <bogi@apache.org>
Authored: Tue May 29 10:17:25 2018 +0200
Committer: Boglarka Egyed <bogi@apache.org>
Committed: Tue May 29 10:17:25 2018 +0200
----------------------------------------------------------------------
src/java/org/apache/sqoop/avro/AvroUtil.java | 4 +
.../org/apache/sqoop/manager/ConnManager.java | 5 +
.../org/apache/sqoop/manager/CubridManager.java | 4 +-
.../org/apache/sqoop/manager/Db2Manager.java | 2 +-
.../sqoop/manager/DirectPostgresqlManager.java | 3 +-
.../apache/sqoop/manager/MainframeManager.java | 2 +-
.../org/apache/sqoop/manager/MySQLManager.java | 2 +-
.../org/apache/sqoop/manager/OracleManager.java | 4 +-
.../apache/sqoop/manager/SQLServerManager.java | 8 +-
.../org/apache/sqoop/manager/SqlManager.java | 10 +-
.../sqoop/manager/oracle/OraOopConnManager.java | 4 +-
.../sqoop/mapreduce/DataDrivenImportJob.java | 66 ++----
.../apache/sqoop/mapreduce/ImportJobBase.java | 4 +-
.../sqoop/mapreduce/JdbcCallExportJob.java | 10 +-
.../apache/sqoop/mapreduce/JdbcExportJob.java | 19 +-
.../sqoop/mapreduce/JdbcUpdateExportJob.java | 20 +-
.../sqoop/mapreduce/JdbcUpsertExportJob.java | 6 +-
.../org/apache/sqoop/mapreduce/MergeJob.java | 69 +-----
.../sqoop/mapreduce/MergeParquetReducer.java | 15 +-
.../sqoop/mapreduce/ParquetExportMapper.java | 43 ----
.../sqoop/mapreduce/ParquetImportMapper.java | 20 +-
.../org/apache/sqoop/mapreduce/ParquetJob.java | 220 -------------------
.../mapreduce/mainframe/MainframeImportJob.java | 5 +-
.../mapreduce/parquet/ParquetConstants.java | 31 +++
.../parquet/ParquetExportJobConfigurator.java | 35 +++
.../parquet/ParquetImportJobConfigurator.java | 38 ++++
.../parquet/ParquetJobConfiguratorFactory.java | 29 +++
.../ParquetJobConfiguratorFactoryProvider.java | 34 +++
.../parquet/ParquetMergeJobConfigurator.java | 31 +++
.../parquet/kite/KiteMergeParquetReducer.java | 33 +++
.../kite/KiteParquetExportJobConfigurator.java | 48 ++++
.../parquet/kite/KiteParquetExportMapper.java | 37 ++++
.../kite/KiteParquetImportJobConfigurator.java | 90 ++++++++
.../parquet/kite/KiteParquetImportMapper.java | 52 +++++
.../kite/KiteParquetJobConfiguratorFactory.java | 42 ++++
.../kite/KiteParquetMergeJobConfigurator.java | 100 +++++++++
.../parquet/kite/KiteParquetUtils.java | 217 ++++++++++++++++++
.../postgresql/PostgreSQLCopyExportJob.java | 11 +-
.../org/apache/sqoop/tool/BaseSqoopTool.java | 7 +
src/java/org/apache/sqoop/tool/ImportTool.java | 4 +-
src/java/org/apache/sqoop/tool/MergeTool.java | 4 +-
.../org/apache/sqoop/TestParquetImport.java | 7 +-
.../org/apache/sqoop/hive/TestHiveImport.java | 6 +-
.../sqoop/mapreduce/TestJdbcExportJob.java | 3 +-
.../mainframe/TestMainframeImportJob.java | 6 +-
45 files changed, 961 insertions(+), 449 deletions(-)
----------------------------------------------------------------------
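This change moves all Kite SDK specific Parquet handling behind the new interfaces in org.apache.sqoop.mapreduce.parquet (ParquetImportJobConfigurator, ParquetExportJobConfigurator, ParquetMergeJobConfigurator), with the Kite-backed implementations living in the kite subpackage. A minimal wiring sketch, assuming only the factory and provider calls that are visible in the hunks below; the SketchMain class itself is hypothetical and not part of the patch:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.sqoop.mapreduce.parquet.ParquetExportJobConfigurator;
    import org.apache.sqoop.mapreduce.parquet.ParquetImportJobConfigurator;
    import org.apache.sqoop.mapreduce.parquet.ParquetJobConfiguratorFactory;
    import org.apache.sqoop.mapreduce.parquet.ParquetJobConfiguratorFactoryProvider;

    public class SketchMain {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // At this commit the Kite-backed factory is the only implementation, so the
        // provider presumably returns a KiteParquetJobConfiguratorFactory.
        ParquetJobConfiguratorFactory factory =
            ParquetJobConfiguratorFactoryProvider.createParquetJobConfiguratorFactory(conf);
        // ConnManager.getParquetJobConfigurator() wraps exactly this provider call, and the
        // managers hand the resulting configurators to the MapReduce job classes below.
        ParquetImportJobConfigurator importConfigurator =
            factory.createParquetImportJobConfigurator();
        ParquetExportJobConfigurator exportConfigurator =
            factory.createParquetExportJobConfigurator();
        System.out.println(importConfigurator.getMapperClass() + " / "
            + exportConfigurator.getMapperClass());
      }
    }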
http://git-wip-us.apache.org/repos/asf/sqoop/blob/3233db8e/src/java/org/apache/sqoop/avro/AvroUtil.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/avro/AvroUtil.java b/src/java/org/apache/sqoop/avro/AvroUtil.java
index 603cc63..57c2062 100644
--- a/src/java/org/apache/sqoop/avro/AvroUtil.java
+++ b/src/java/org/apache/sqoop/avro/AvroUtil.java
@@ -340,4 +340,8 @@ public final class AvroUtil {
return LogicalTypes.decimal(precision, scale);
}
+
+ public static Schema parseAvroSchema(String schemaString) {
+ return new Schema.Parser().parse(schemaString);
+ }
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/3233db8e/src/java/org/apache/sqoop/manager/ConnManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/manager/ConnManager.java b/src/java/org/apache/sqoop/manager/ConnManager.java
index d7d6279..c80dd5d 100644
--- a/src/java/org/apache/sqoop/manager/ConnManager.java
+++ b/src/java/org/apache/sqoop/manager/ConnManager.java
@@ -45,6 +45,8 @@ import org.apache.sqoop.SqoopOptions;
import org.apache.sqoop.hive.HiveTypes;
import org.apache.sqoop.lib.BlobRef;
import org.apache.sqoop.lib.ClobRef;
+import org.apache.sqoop.mapreduce.parquet.ParquetJobConfiguratorFactory;
+import org.apache.sqoop.mapreduce.parquet.ParquetJobConfiguratorFactoryProvider;
import org.apache.sqoop.util.ExportException;
import org.apache.sqoop.util.ImportException;
@@ -866,5 +868,8 @@ public abstract class ConnManager {
return false;
}
+ public ParquetJobConfiguratorFactory getParquetJobConfigurator() {
+ return ParquetJobConfiguratorFactoryProvider.createParquetJobConfiguratorFactory(options.getConf());
+ }
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/3233db8e/src/java/org/apache/sqoop/manager/CubridManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/manager/CubridManager.java b/src/java/org/apache/sqoop/manager/CubridManager.java
index e27f616..a75268f 100644
--- a/src/java/org/apache/sqoop/manager/CubridManager.java
+++ b/src/java/org/apache/sqoop/manager/CubridManager.java
@@ -65,7 +65,7 @@ public class CubridManager extends
throws IOException, ExportException {
context.setConnManager(this);
JdbcExportJob exportJob = new JdbcExportJob(context, null, null,
- ExportBatchOutputFormat.class);
+ ExportBatchOutputFormat.class, getParquetJobConfigurator().createParquetExportJobConfigurator());
exportJob.runExport();
}
@@ -80,7 +80,7 @@ public class CubridManager extends
context.setConnManager(this);
JdbcUpsertExportJob exportJob = new JdbcUpsertExportJob(context,
- CubridUpsertOutputFormat.class);
+ CubridUpsertOutputFormat.class, getParquetJobConfigurator().createParquetExportJobConfigurator());
exportJob.runExport();
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/3233db8e/src/java/org/apache/sqoop/manager/Db2Manager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/manager/Db2Manager.java b/src/java/org/apache/sqoop/manager/Db2Manager.java
index 7ff68ce..c78946e 100644
--- a/src/java/org/apache/sqoop/manager/Db2Manager.java
+++ b/src/java/org/apache/sqoop/manager/Db2Manager.java
@@ -111,7 +111,7 @@ public class Db2Manager
throws IOException, ExportException {
context.setConnManager(this);
JdbcExportJob exportJob = new JdbcExportJob(context, null, null,
- ExportBatchOutputFormat.class);
+ ExportBatchOutputFormat.class, getParquetJobConfigurator().createParquetExportJobConfigurator());
exportJob.runExport();
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/3233db8e/src/java/org/apache/sqoop/manager/DirectPostgresqlManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/manager/DirectPostgresqlManager.java b/src/java/org/apache/sqoop/manager/DirectPostgresqlManager.java
index c05e1c1..70b9b43 100644
--- a/src/java/org/apache/sqoop/manager/DirectPostgresqlManager.java
+++ b/src/java/org/apache/sqoop/manager/DirectPostgresqlManager.java
@@ -585,7 +585,8 @@ public class DirectPostgresqlManager
new PostgreSQLCopyExportJob(context,
null,
ExportInputFormat.class,
- NullOutputFormat.class);
+ NullOutputFormat.class,
+ getParquetJobConfigurator().createParquetExportJobConfigurator());
job.runExport();
}
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/3233db8e/src/java/org/apache/sqoop/manager/MainframeManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/manager/MainframeManager.java b/src/java/org/apache/sqoop/manager/MainframeManager.java
index a6002ef..4e8be15 100644
--- a/src/java/org/apache/sqoop/manager/MainframeManager.java
+++ b/src/java/org/apache/sqoop/manager/MainframeManager.java
@@ -90,7 +90,7 @@ public class MainframeManager extends org.apache.sqoop.manager.ConnManager {
importer = new AccumuloImportJob(opts, context);
} else {
// Import to HDFS.
- importer = new MainframeImportJob(opts, context);
+ importer = new MainframeImportJob(opts, context, getParquetJobConfigurator().createParquetImportJobConfigurator());
}
importer.setInputFormatClass(MainframeDatasetInputFormat.class);
http://git-wip-us.apache.org/repos/asf/sqoop/blob/3233db8e/src/java/org/apache/sqoop/manager/MySQLManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/manager/MySQLManager.java b/src/java/org/apache/sqoop/manager/MySQLManager.java
index 2d17707..992c461 100644
--- a/src/java/org/apache/sqoop/manager/MySQLManager.java
+++ b/src/java/org/apache/sqoop/manager/MySQLManager.java
@@ -138,7 +138,7 @@ public class MySQLManager
LOG.warn("documentation for additional limitations.");
JdbcUpsertExportJob exportJob =
- new JdbcUpsertExportJob(context, MySQLUpsertOutputFormat.class);
+ new JdbcUpsertExportJob(context, MySQLUpsertOutputFormat.class, getParquetJobConfigurator().createParquetExportJobConfigurator());
exportJob.runExport();
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/3233db8e/src/java/org/apache/sqoop/manager/OracleManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/manager/OracleManager.java b/src/java/org/apache/sqoop/manager/OracleManager.java
index b7005d4..cdc6c54 100644
--- a/src/java/org/apache/sqoop/manager/OracleManager.java
+++ b/src/java/org/apache/sqoop/manager/OracleManager.java
@@ -462,7 +462,7 @@ public class OracleManager
throws IOException, ExportException {
context.setConnManager(this);
JdbcExportJob exportJob = new JdbcExportJob(context,
- null, null, ExportBatchOutputFormat.class);
+ null, null, ExportBatchOutputFormat.class, getParquetJobConfigurator().createParquetExportJobConfigurator());
exportJob.runExport();
}
@@ -474,7 +474,7 @@ public class OracleManager
throws IOException, ExportException {
context.setConnManager(this);
JdbcUpsertExportJob exportJob =
- new JdbcUpsertExportJob(context, OracleUpsertOutputFormat.class);
+ new JdbcUpsertExportJob(context, OracleUpsertOutputFormat.class, getParquetJobConfigurator().createParquetExportJobConfigurator());
exportJob.runExport();
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/3233db8e/src/java/org/apache/sqoop/manager/SQLServerManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/manager/SQLServerManager.java b/src/java/org/apache/sqoop/manager/SQLServerManager.java
index d57a493..b136087 100644
--- a/src/java/org/apache/sqoop/manager/SQLServerManager.java
+++ b/src/java/org/apache/sqoop/manager/SQLServerManager.java
@@ -181,10 +181,10 @@ public class SQLServerManager
JdbcExportJob exportJob;
if (isNonResilientOperation()) {
exportJob = new JdbcExportJob(context, null, null,
- SqlServerExportBatchOutputFormat.class);
+ SqlServerExportBatchOutputFormat.class, getParquetJobConfigurator().createParquetExportJobConfigurator());
} else {
exportJob = new JdbcExportJob(context, null, null,
- SQLServerResilientExportOutputFormat.class);
+ SQLServerResilientExportOutputFormat.class, getParquetJobConfigurator().createParquetExportJobConfigurator());
configureConnectionRecoveryForExport(context);
}
exportJob.runExport();
@@ -202,7 +202,7 @@ public class SQLServerManager
} else {
context.setConnManager(this);
JdbcUpdateExportJob exportJob = new JdbcUpdateExportJob(context, null,
- null, SQLServerResilientUpdateOutputFormat.class);
+ null, SQLServerResilientUpdateOutputFormat.class, getParquetJobConfigurator().createParquetExportJobConfigurator());
configureConnectionRecoveryForUpdate(context);
exportJob.runExport();
}
@@ -223,7 +223,7 @@ public class SQLServerManager
}
JdbcUpsertExportJob exportJob =
- new JdbcUpsertExportJob(context, SqlServerUpsertOutputFormat.class);
+ new JdbcUpsertExportJob(context, SqlServerUpsertOutputFormat.class, getParquetJobConfigurator().createParquetExportJobConfigurator());
exportJob.runExport();
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/3233db8e/src/java/org/apache/sqoop/manager/SqlManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/manager/SqlManager.java b/src/java/org/apache/sqoop/manager/SqlManager.java
index 4572098..d82332a 100644
--- a/src/java/org/apache/sqoop/manager/SqlManager.java
+++ b/src/java/org/apache/sqoop/manager/SqlManager.java
@@ -682,7 +682,7 @@ public abstract class SqlManager
} else {
// Import to HDFS.
importer = new DataDrivenImportJob(opts, context.getInputFormat(),
- context);
+ context, getParquetJobConfigurator().createParquetImportJobConfigurator());
}
checkTableImportOptions(context);
@@ -725,7 +725,7 @@ public abstract class SqlManager
} else {
// Import to HDFS.
importer = new DataDrivenImportJob(opts, context.getInputFormat(),
- context);
+ context, getParquetJobConfigurator().createParquetImportJobConfigurator());
}
String splitCol = getSplitColumn(opts, null);
@@ -926,7 +926,7 @@ public abstract class SqlManager
public void exportTable(org.apache.sqoop.manager.ExportJobContext context)
throws IOException, ExportException {
context.setConnManager(this);
- JdbcExportJob exportJob = new JdbcExportJob(context);
+ JdbcExportJob exportJob = new JdbcExportJob(context, getParquetJobConfigurator().createParquetExportJobConfigurator());
exportJob.runExport();
}
@@ -935,7 +935,7 @@ public abstract class SqlManager
throws IOException,
ExportException {
context.setConnManager(this);
- JdbcCallExportJob exportJob = new JdbcCallExportJob(context);
+ JdbcCallExportJob exportJob = new JdbcCallExportJob(context, getParquetJobConfigurator().createParquetExportJobConfigurator());
exportJob.runExport();
}
@@ -960,7 +960,7 @@ public abstract class SqlManager
org.apache.sqoop.manager.ExportJobContext context)
throws IOException, ExportException {
context.setConnManager(this);
- JdbcUpdateExportJob exportJob = new JdbcUpdateExportJob(context);
+ JdbcUpdateExportJob exportJob = new JdbcUpdateExportJob(context, getParquetJobConfigurator().createParquetExportJobConfigurator());
exportJob.runExport();
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/3233db8e/src/java/org/apache/sqoop/manager/oracle/OraOopConnManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/manager/oracle/OraOopConnManager.java b/src/java/org/apache/sqoop/manager/oracle/OraOopConnManager.java
index 10524e3..95eaacf 100644
--- a/src/java/org/apache/sqoop/manager/oracle/OraOopConnManager.java
+++ b/src/java/org/apache/sqoop/manager/oracle/OraOopConnManager.java
@@ -321,7 +321,7 @@ public class OraOopConnManager extends GenericJdbcManager {
throw ex;
}
JdbcExportJob exportJob =
- new JdbcExportJob(context, null, null, oraOopOutputFormatClass);
+ new JdbcExportJob(context, null, null, oraOopOutputFormatClass, getParquetJobConfigurator().createParquetExportJobConfigurator());
exportJob.runExport();
}
@@ -343,7 +343,7 @@ public class OraOopConnManager extends GenericJdbcManager {
}
JdbcUpdateExportJob exportJob =
- new JdbcUpdateExportJob(context, null, null, oraOopOutputFormatClass);
+ new JdbcUpdateExportJob(context, null, null, oraOopOutputFormatClass, getParquetJobConfigurator().createParquetExportJobConfigurator());
exportJob.runExport();
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/3233db8e/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java b/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java
index a5962ba..3b54210 100644
--- a/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java
+++ b/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java
@@ -26,8 +26,6 @@ import org.apache.avro.Schema;
import org.apache.commons.io.FileUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
@@ -48,10 +46,8 @@ import org.apache.sqoop.manager.ImportJobContext;
import org.apache.sqoop.mapreduce.ImportJobBase;
import org.apache.sqoop.mapreduce.db.DBConfiguration;
import org.apache.sqoop.mapreduce.db.DataDrivenDBInputFormat;
+import org.apache.sqoop.mapreduce.parquet.ParquetImportJobConfigurator;
import org.apache.sqoop.orm.AvroSchemaGenerator;
-import org.apache.sqoop.util.FileSystemUtil;
-import org.kitesdk.data.Datasets;
-import org.kitesdk.data.mapreduce.DatasetKeyOutputFormat;
/**
* Actually runs a jdbc import job using the ORM files generated by the
@@ -62,15 +58,24 @@ public class DataDrivenImportJob extends ImportJobBase {
public static final Log LOG = LogFactory.getLog(
DataDrivenImportJob.class.getName());
- @SuppressWarnings("unchecked")
- public DataDrivenImportJob(final SqoopOptions opts) {
- super(opts, null, DataDrivenDBInputFormat.class, null, null);
+ private final ParquetImportJobConfigurator parquetImportJobConfigurator;
+
+ public DataDrivenImportJob(final SqoopOptions opts,
+ final Class<? extends InputFormat> inputFormatClass,
+ ImportJobContext context, ParquetImportJobConfigurator parquetImportJobConfigurator) {
+ super(opts, null, inputFormatClass, null, context);
+ this.parquetImportJobConfigurator = parquetImportJobConfigurator;
}
public DataDrivenImportJob(final SqoopOptions opts,
final Class<? extends InputFormat> inputFormatClass,
ImportJobContext context) {
- super(opts, null, inputFormatClass, null, context);
+ this(opts, inputFormatClass, context, null);
+ }
+
+ @SuppressWarnings("unchecked")
+ public DataDrivenImportJob(final SqoopOptions opts) {
+ this(opts, DataDrivenDBInputFormat.class, null);
}
@Override
@@ -101,53 +106,20 @@ public class DataDrivenImportJob extends ImportJobBase {
AvroJob.setMapOutputSchema(job.getConfiguration(), schema);
} else if (options.getFileLayout()
== SqoopOptions.FileLayout.ParquetFile) {
- JobConf conf = (JobConf)job.getConfiguration();
// Kite SDK requires an Avro schema to represent the data structure of
// target dataset. If the schema name equals to generated java class name,
// the import will fail. So we use table name as schema name and add a
// prefix "codegen_" to generated java class to avoid the conflict.
final String schemaNameOverride = tableName;
Schema schema = generateAvroSchema(tableName, schemaNameOverride);
- String uri = getKiteUri(conf, tableName);
- ParquetJob.WriteMode writeMode;
-
- if (options.doHiveImport()) {
- if (options.doOverwriteHiveTable()) {
- writeMode = ParquetJob.WriteMode.OVERWRITE;
- } else {
- writeMode = ParquetJob.WriteMode.APPEND;
- if (Datasets.exists(uri)) {
- LOG.warn("Target Hive table '" + tableName + "' exists! Sqoop will " +
- "append data into the existing Hive table. Consider using " +
- "--hive-overwrite, if you do NOT intend to do appending.");
- }
- }
- } else {
- // Note that there is no such an import argument for overwriting HDFS
- // dataset, so overwrite mode is not supported yet.
- // Sqoop's append mode means to merge two independent datasets. We
- // choose DEFAULT as write mode.
- writeMode = ParquetJob.WriteMode.DEFAULT;
- }
- ParquetJob.configureImportJob(conf, schema, uri, writeMode);
+ Path destination = getContext().getDestination();
+
+ parquetImportJobConfigurator.configureMapper(job, schema, options, tableName, destination);
}
job.setMapperClass(getMapperClass());
}
- private String getKiteUri(Configuration conf, String tableName) throws IOException {
- if (options.doHiveImport()) {
- String hiveDatabase = options.getHiveDatabaseName() == null ? "default" :
- options.getHiveDatabaseName();
- String hiveTable = options.getHiveTableName() == null ? tableName :
- options.getHiveTableName();
- return String.format("dataset:hive:/%s/%s", hiveDatabase, hiveTable);
- } else {
- Path destination = getContext().getDestination();
- return "dataset:" + FileSystemUtil.makeQualified(destination, conf);
- }
- }
-
private Schema generateAvroSchema(String tableName,
String schemaNameOverride) throws IOException {
ConnManager connManager = getContext().getConnManager();
@@ -187,7 +159,7 @@ public class DataDrivenImportJob extends ImportJobBase {
return AvroImportMapper.class;
} else if (options.getFileLayout()
== SqoopOptions.FileLayout.ParquetFile) {
- return ParquetImportMapper.class;
+ return parquetImportJobConfigurator.getMapperClass();
}
return null;
@@ -210,7 +182,7 @@ public class DataDrivenImportJob extends ImportJobBase {
return AvroOutputFormat.class;
} else if (options.getFileLayout()
== SqoopOptions.FileLayout.ParquetFile) {
- return DatasetKeyOutputFormat.class;
+ return parquetImportJobConfigurator.getOutputFormatClass();
}
return null;
http://git-wip-us.apache.org/repos/asf/sqoop/blob/3233db8e/src/java/org/apache/sqoop/mapreduce/ImportJobBase.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/ImportJobBase.java b/src/java/org/apache/sqoop/mapreduce/ImportJobBase.java
index fb5d054..17c9ed3 100644
--- a/src/java/org/apache/sqoop/mapreduce/ImportJobBase.java
+++ b/src/java/org/apache/sqoop/mapreduce/ImportJobBase.java
@@ -49,6 +49,8 @@ import java.io.IOException;
import java.sql.SQLException;
import java.util.Date;
+import static org.apache.sqoop.mapreduce.parquet.ParquetConstants.SQOOP_PARQUET_OUTPUT_CODEC_KEY;
+
/**
* Base class for running an import MapReduce job.
* Allows dependency injection, etc, for easy customization of import job types.
@@ -149,7 +151,7 @@ public class ImportJobBase extends JobBase {
Configuration conf = job.getConfiguration();
String shortName = CodecMap.getCodecShortNameByName(codecName, conf);
if (!shortName.equalsIgnoreCase("default")) {
- conf.set(ParquetJob.CONF_OUTPUT_CODEC, shortName);
+ conf.set(SQOOP_PARQUET_OUTPUT_CODEC_KEY, shortName);
}
}
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/3233db8e/src/java/org/apache/sqoop/mapreduce/JdbcCallExportJob.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/JdbcCallExportJob.java b/src/java/org/apache/sqoop/mapreduce/JdbcCallExportJob.java
index b7eea93..be82aed 100644
--- a/src/java/org/apache/sqoop/mapreduce/JdbcCallExportJob.java
+++ b/src/java/org/apache/sqoop/mapreduce/JdbcCallExportJob.java
@@ -32,6 +32,7 @@ import org.apache.sqoop.mapreduce.db.DBOutputFormat;
import org.apache.sqoop.manager.ConnManager;
import org.apache.sqoop.manager.ExportJobContext;
import com.google.common.base.Strings;
+import org.apache.sqoop.mapreduce.parquet.ParquetExportJobConfigurator;
/**
* Run an export using JDBC (JDBC-based ExportCallOutputFormat) to
@@ -43,15 +44,16 @@ public class JdbcCallExportJob extends JdbcExportJob {
public static final Log LOG = LogFactory.getLog(
JdbcCallExportJob.class.getName());
- public JdbcCallExportJob(final ExportJobContext context) {
- super(context, null, null, ExportCallOutputFormat.class);
+ public JdbcCallExportJob(final ExportJobContext context, final ParquetExportJobConfigurator parquetExportJobConfigurator) {
+ super(context, null, null, ExportCallOutputFormat.class, parquetExportJobConfigurator);
}
public JdbcCallExportJob(final ExportJobContext ctxt,
final Class<? extends Mapper> mapperClass,
final Class<? extends InputFormat> inputFormatClass,
- final Class<? extends OutputFormat> outputFormatClass) {
- super(ctxt, mapperClass, inputFormatClass, outputFormatClass);
+ final Class<? extends OutputFormat> outputFormatClass,
+ final ParquetExportJobConfigurator parquetExportJobConfigurator) {
+ super(ctxt, mapperClass, inputFormatClass, outputFormatClass, parquetExportJobConfigurator);
}
/**
http://git-wip-us.apache.org/repos/asf/sqoop/blob/3233db8e/src/java/org/apache/sqoop/mapreduce/JdbcExportJob.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/JdbcExportJob.java b/src/java/org/apache/sqoop/mapreduce/JdbcExportJob.java
index 3719836..e283548 100644
--- a/src/java/org/apache/sqoop/mapreduce/JdbcExportJob.java
+++ b/src/java/org/apache/sqoop/mapreduce/JdbcExportJob.java
@@ -32,11 +32,10 @@ import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.sqoop.mapreduce.hcat.SqoopHCatUtilities;
-import org.kitesdk.data.mapreduce.DatasetKeyInputFormat;
+import org.apache.sqoop.mapreduce.parquet.ParquetExportJobConfigurator;
import java.io.IOException;
import java.util.Map;
-import org.apache.sqoop.util.FileSystemUtil;
/**
* Run an export using JDBC (JDBC-based ExportOutputFormat).
@@ -45,18 +44,23 @@ public class JdbcExportJob extends ExportJobBase {
private FileType fileType;
+ private ParquetExportJobConfigurator parquetExportJobConfigurator;
+
public static final Log LOG = LogFactory.getLog(
JdbcExportJob.class.getName());
- public JdbcExportJob(final ExportJobContext context) {
+ public JdbcExportJob(final ExportJobContext context, final ParquetExportJobConfigurator parquetExportJobConfigurator) {
super(context);
+ this.parquetExportJobConfigurator = parquetExportJobConfigurator;
}
public JdbcExportJob(final ExportJobContext ctxt,
final Class<? extends Mapper> mapperClass,
final Class<? extends InputFormat> inputFormatClass,
- final Class<? extends OutputFormat> outputFormatClass) {
+ final Class<? extends OutputFormat> outputFormatClass,
+ final ParquetExportJobConfigurator parquetExportJobConfigurator) {
super(ctxt, mapperClass, inputFormatClass, outputFormatClass);
+ this.parquetExportJobConfigurator = parquetExportJobConfigurator;
}
@Override
@@ -78,8 +82,7 @@ public class JdbcExportJob extends ExportJobBase {
} else if (fileType == FileType.PARQUET_FILE) {
LOG.debug("Configuring for Parquet export");
configureGenericRecordExportInputFormat(job, tableName);
- String uri = "dataset:" + FileSystemUtil.makeQualified(getInputPath(), job.getConfiguration());
- DatasetKeyInputFormat.configure(job).readFrom(uri);
+ parquetExportJobConfigurator.configureInputFormat(job, getInputPath());
}
}
@@ -120,7 +123,7 @@ public class JdbcExportJob extends ExportJobBase {
case AVRO_DATA_FILE:
return AvroInputFormat.class;
case PARQUET_FILE:
- return DatasetKeyInputFormat.class;
+ return parquetExportJobConfigurator.getInputFormatClass();
default:
return super.getInputFormatClass();
}
@@ -137,7 +140,7 @@ public class JdbcExportJob extends ExportJobBase {
case AVRO_DATA_FILE:
return AvroExportMapper.class;
case PARQUET_FILE:
- return ParquetExportMapper.class;
+ return parquetExportJobConfigurator.getMapperClass();
case UNKNOWN:
default:
return TextExportMapper.class;
http://git-wip-us.apache.org/repos/asf/sqoop/blob/3233db8e/src/java/org/apache/sqoop/mapreduce/JdbcUpdateExportJob.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/JdbcUpdateExportJob.java b/src/java/org/apache/sqoop/mapreduce/JdbcUpdateExportJob.java
index 86069c4..f901d37 100644
--- a/src/java/org/apache/sqoop/mapreduce/JdbcUpdateExportJob.java
+++ b/src/java/org/apache/sqoop/mapreduce/JdbcUpdateExportJob.java
@@ -33,15 +33,13 @@ import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.sqoop.mapreduce.ExportJobBase.FileType;
import org.apache.sqoop.mapreduce.hcat.SqoopHCatUtilities;
-import org.kitesdk.data.mapreduce.DatasetKeyInputFormat;
import org.apache.sqoop.manager.ConnManager;
import org.apache.sqoop.manager.ExportJobContext;
import org.apache.sqoop.mapreduce.db.DBConfiguration;
import org.apache.sqoop.mapreduce.db.DBOutputFormat;
-import org.apache.sqoop.util.FileSystemUtil;
+import org.apache.sqoop.mapreduce.parquet.ParquetExportJobConfigurator;
/**
* Run an update-based export using JDBC (JDBC-based UpdateOutputFormat).
@@ -53,6 +51,8 @@ public class JdbcUpdateExportJob extends ExportJobBase {
public static final Log LOG = LogFactory.getLog(
JdbcUpdateExportJob.class.getName());
+ private ParquetExportJobConfigurator parquetExportJobConfigurator;
+
/**
* Return an instance of the UpdateOutputFormat class object loaded
* from the shim jar.
@@ -62,16 +62,19 @@ public class JdbcUpdateExportJob extends ExportJobBase {
return UpdateOutputFormat.class;
}
- public JdbcUpdateExportJob(final ExportJobContext context)
+ public JdbcUpdateExportJob(final ExportJobContext context, final ParquetExportJobConfigurator parquetExportJobConfigurator)
throws IOException {
super(context, null, null, getUpdateOutputFormat());
+ this.parquetExportJobConfigurator = parquetExportJobConfigurator;
}
public JdbcUpdateExportJob(final ExportJobContext ctxt,
final Class<? extends Mapper> mapperClass,
final Class<? extends InputFormat> inputFormatClass,
- final Class<? extends OutputFormat> outputFormatClass) {
+ final Class<? extends OutputFormat> outputFormatClass,
+ final ParquetExportJobConfigurator parquetExportJobConfigurator) {
super(ctxt, mapperClass, inputFormatClass, outputFormatClass);
+ this.parquetExportJobConfigurator = parquetExportJobConfigurator;
}
// Fix For Issue [SQOOP-2846]
@@ -86,7 +89,7 @@ public class JdbcUpdateExportJob extends ExportJobBase {
case AVRO_DATA_FILE:
return AvroExportMapper.class;
case PARQUET_FILE:
- return ParquetExportMapper.class;
+ return parquetExportJobConfigurator.getMapperClass();
case UNKNOWN:
default:
return TextExportMapper.class;
@@ -186,8 +189,7 @@ public class JdbcUpdateExportJob extends ExportJobBase {
} else if (fileType == FileType.PARQUET_FILE) {
LOG.debug("Configuring for Parquet export");
configureGenericRecordExportInputFormat(job, tableName);
- String uri = "dataset:" + FileSystemUtil.makeQualified(getInputPath(), job.getConfiguration());
- DatasetKeyInputFormat.configure(job).readFrom(uri);
+ parquetExportJobConfigurator.configureInputFormat(job, getInputPath());
}
}
@@ -222,7 +224,7 @@ public class JdbcUpdateExportJob extends ExportJobBase {
case AVRO_DATA_FILE:
return AvroInputFormat.class;
case PARQUET_FILE:
- return DatasetKeyInputFormat.class;
+ return parquetExportJobConfigurator.getInputFormatClass();
default:
return super.getInputFormatClass();
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/3233db8e/src/java/org/apache/sqoop/mapreduce/JdbcUpsertExportJob.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/JdbcUpsertExportJob.java b/src/java/org/apache/sqoop/mapreduce/JdbcUpsertExportJob.java
index 9a8c17a..4db86da 100644
--- a/src/java/org/apache/sqoop/mapreduce/JdbcUpsertExportJob.java
+++ b/src/java/org/apache/sqoop/mapreduce/JdbcUpsertExportJob.java
@@ -30,6 +30,7 @@ import org.apache.sqoop.manager.ConnManager;
import org.apache.sqoop.manager.ExportJobContext;
import org.apache.sqoop.mapreduce.db.DBConfiguration;
import org.apache.sqoop.mapreduce.db.DBOutputFormat;
+import org.apache.sqoop.mapreduce.parquet.ParquetExportJobConfigurator;
/**
* Run an update/insert export using JDBC (JDBC-based UpsertOutputFormat).
@@ -40,9 +41,10 @@ public class JdbcUpsertExportJob extends JdbcUpdateExportJob {
JdbcUpsertExportJob.class.getName());
public JdbcUpsertExportJob(final ExportJobContext context,
- final Class<? extends OutputFormat> outputFormatClass)
+ final Class<? extends OutputFormat> outputFormatClass,
+ final ParquetExportJobConfigurator parquetExportJobConfigurator)
throws IOException {
- super(context, null, null, outputFormatClass);
+ super(context, null, null, outputFormatClass, parquetExportJobConfigurator);
}
@Override
http://git-wip-us.apache.org/repos/asf/sqoop/blob/3233db8e/src/java/org/apache/sqoop/mapreduce/MergeJob.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/MergeJob.java b/src/java/org/apache/sqoop/mapreduce/MergeJob.java
index bb21b64..c26a090 100644
--- a/src/java/org/apache/sqoop/mapreduce/MergeJob.java
+++ b/src/java/org/apache/sqoop/mapreduce/MergeJob.java
@@ -19,18 +19,12 @@
package org.apache.sqoop.mapreduce;
import java.io.IOException;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.List;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.FileReader;
-import org.apache.avro.generic.GenericRecord;
import org.apache.avro.mapred.FsInput;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.LocatedFileStatus;
@@ -44,17 +38,8 @@ import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.sqoop.avro.AvroUtil;
import org.apache.sqoop.mapreduce.ExportJobBase.FileType;
+import org.apache.sqoop.mapreduce.parquet.ParquetMergeJobConfigurator;
import org.apache.sqoop.util.Jars;
-import org.kitesdk.data.Dataset;
-import org.kitesdk.data.DatasetDescriptor;
-import org.kitesdk.data.Datasets;
-import org.kitesdk.data.Formats;
-import org.kitesdk.data.mapreduce.DatasetKeyOutputFormat;
-import parquet.avro.AvroParquetInputFormat;
-import parquet.avro.AvroSchemaConverter;
-import parquet.hadoop.Footer;
-import parquet.hadoop.ParquetFileReader;
-import parquet.schema.MessageType;
import org.apache.sqoop.SqoopOptions;
import org.apache.sqoop.mapreduce.JobBase;
@@ -79,10 +64,11 @@ public class MergeJob extends JobBase {
*/
public static final String MERGE_SQOOP_RECORD_KEY = "sqoop.merge.class";
- public static final String PARQUET_AVRO_SCHEMA = "parquetjob.avro.schema";
+ private final ParquetMergeJobConfigurator parquetMergeJobConfigurator;
- public MergeJob(final SqoopOptions opts) {
+ public MergeJob(final SqoopOptions opts, final ParquetMergeJobConfigurator parquetMergeJobConfigurator) {
super(opts, null, null, null);
+ this.parquetMergeJobConfigurator = parquetMergeJobConfigurator;
}
public boolean runMergeJob() throws IOException {
@@ -147,7 +133,7 @@ public class MergeJob extends JobBase {
case PARQUET_FILE:
Path finalPath = new Path(options.getTargetDir());
finalPath = FileSystemUtil.makeQualified(finalPath, jobConf);
- configueParquetMergeJob(jobConf, job, oldPath, newPath, finalPath);
+ parquetMergeJobConfigurator.configureParquetMergeJob(jobConf, job, oldPath, newPath, finalPath);
break;
case AVRO_DATA_FILE:
configueAvroMergeJob(conf, job, oldPath, newPath);
@@ -198,51 +184,6 @@ public class MergeJob extends JobBase {
job.setReducerClass(MergeAvroReducer.class);
AvroJob.setOutputSchema(job.getConfiguration(), oldPathSchema);
}
-
- private void configueParquetMergeJob(Configuration conf, Job job, Path oldPath, Path newPath,
- Path finalPath) throws IOException {
- try {
- FileSystem fileSystem = finalPath.getFileSystem(conf);
- LOG.info("Trying to merge parquet files");
- job.setOutputKeyClass(org.apache.avro.generic.GenericRecord.class);
- job.setMapperClass(MergeParquetMapper.class);
- job.setReducerClass(MergeParquetReducer.class);
- job.setOutputValueClass(NullWritable.class);
-
- List<Footer> footers = new ArrayList<Footer>();
- FileStatus oldPathfileStatus = fileSystem.getFileStatus(oldPath);
- FileStatus newPathfileStatus = fileSystem.getFileStatus(oldPath);
- footers.addAll(ParquetFileReader.readFooters(job.getConfiguration(), oldPathfileStatus, true));
- footers.addAll(ParquetFileReader.readFooters(job.getConfiguration(), newPathfileStatus, true));
-
- MessageType schema = footers.get(0).getParquetMetadata().getFileMetaData().getSchema();
- AvroSchemaConverter avroSchemaConverter = new AvroSchemaConverter();
- Schema avroSchema = avroSchemaConverter.convert(schema);
-
- if (!fileSystem.exists(finalPath)) {
- Dataset dataset = createDataset(avroSchema, "dataset:" + finalPath);
- DatasetKeyOutputFormat.configure(job).overwrite(dataset);
- } else {
- DatasetKeyOutputFormat.configure(job).overwrite(new URI("dataset:" + finalPath));
- }
-
- job.setInputFormatClass(AvroParquetInputFormat.class);
- AvroParquetInputFormat.setAvroReadSchema(job, avroSchema);
-
- conf.set(PARQUET_AVRO_SCHEMA, avroSchema.toString());
- Class<DatasetKeyOutputFormat> outClass = DatasetKeyOutputFormat.class;
-
- job.setOutputFormatClass(outClass);
- } catch (Exception cnfe) {
- throw new IOException(cnfe);
- }
- }
-
- public static Dataset createDataset(Schema schema, String uri) {
- DatasetDescriptor descriptor =
- new DatasetDescriptor.Builder().schema(schema).format(Formats.PARQUET).build();
- return Datasets.create(uri, descriptor, GenericRecord.class);
- }
}
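MergeJob now receives its Parquet configuration through the ParquetMergeJobConfigurator passed into the constructor; the Kite-specific configueParquetMergeJob/createDataset code removed above moves into KiteParquetMergeJobConfigurator. A hypothetical call-site sketch (MergeTool is touched by this patch, but its hunk is not shown here); the factory method name createParquetMergeJobConfigurator is an assumption, since only the import and export variants appear in this section:

    import java.io.IOException;

    import org.apache.sqoop.SqoopOptions;
    import org.apache.sqoop.manager.ConnManager;
    import org.apache.sqoop.mapreduce.MergeJob;

    public class MergeJobWiringSketch {
      // Hypothetical call site, mirroring the import/export call sites elsewhere in
      // this patch. createParquetMergeJobConfigurator() is an assumed method name.
      static boolean runMerge(SqoopOptions options, ConnManager manager) throws IOException {
        MergeJob mergeJob = new MergeJob(options,
            manager.getParquetJobConfigurator().createParquetMergeJobConfigurator());
        return mergeJob.runMergeJob();
      }
    }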
http://git-wip-us.apache.org/repos/asf/sqoop/blob/3233db8e/src/java/org/apache/sqoop/mapreduce/MergeParquetReducer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/MergeParquetReducer.java b/src/java/org/apache/sqoop/mapreduce/MergeParquetReducer.java
index caa4f5f..5939b01 100644
--- a/src/java/org/apache/sqoop/mapreduce/MergeParquetReducer.java
+++ b/src/java/org/apache/sqoop/mapreduce/MergeParquetReducer.java
@@ -27,16 +27,16 @@ import java.util.Map;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.mapred.Pair;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.sqoop.avro.AvroUtil;
import org.apache.sqoop.lib.SqoopRecord;
+import static org.apache.sqoop.mapreduce.parquet.ParquetConstants.SQOOP_PARQUET_AVRO_SCHEMA_KEY;
-public class MergeParquetReducer extends Reducer<Text, MergeRecord,GenericRecord,NullWritable> {
+
+public abstract class MergeParquetReducer<KEYOUT, VALUEOUT> extends Reducer<Text, MergeRecord, KEYOUT, VALUEOUT> {
private Schema schema = null;
private boolean bigDecimalFormatString = true;
@@ -44,7 +44,7 @@ public class MergeParquetReducer extends Reducer<Text, MergeRecord,GenericRecord
@Override
protected void setup(Context context) throws IOException, InterruptedException {
- schema = new Schema.Parser().parse(context.getConfiguration().get("parquetjob.avro.schema"));
+ schema = new Schema.Parser().parse(context.getConfiguration().get(SQOOP_PARQUET_AVRO_SCHEMA_KEY));
bigDecimalFormatString = context.getConfiguration().getBoolean(
ImportJobBase.PROPERTY_BIGDECIMAL_FORMAT, ImportJobBase.PROPERTY_BIGDECIMAL_FORMAT_DEFAULT);
}
@@ -67,9 +67,12 @@ public class MergeParquetReducer extends Reducer<Text, MergeRecord,GenericRecord
}
if (null != bestRecord) {
- GenericRecord outKey = AvroUtil.toGenericRecord(bestRecord.getFieldMap(), schema,
+ GenericRecord record = AvroUtil.toGenericRecord(bestRecord.getFieldMap(), schema,
bigDecimalFormatString);
- context.write(outKey, null);
+ write(context, record);
}
}
+
+ protected abstract void write(Context context, GenericRecord record) throws IOException, InterruptedException;
+
}
\ No newline at end of file
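MergeParquetReducer is now an abstract base class that leaves the actual record emission to a write() hook. A sketch of the Kite-side subclass, reconstructed from the concrete reducer removed above; the real class is KiteMergeParquetReducer, whose hunk is not shown in this section:

    import java.io.IOException;

    import org.apache.avro.generic.GenericRecord;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.sqoop.mapreduce.MergeParquetReducer;

    public class KiteMergeParquetReducerSketch
        extends MergeParquetReducer<GenericRecord, NullWritable> {

      @Override
      protected void write(Context context, GenericRecord record)
          throws IOException, InterruptedException {
        // Kite's DatasetKeyOutputFormat consumes the GenericRecord as the output key,
        // exactly as the removed concrete reducer did with context.write(outKey, null).
        context.write(record, null);
      }
    }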
http://git-wip-us.apache.org/repos/asf/sqoop/blob/3233db8e/src/java/org/apache/sqoop/mapreduce/ParquetExportMapper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/ParquetExportMapper.java b/src/java/org/apache/sqoop/mapreduce/ParquetExportMapper.java
deleted file mode 100644
index 2bc0cba..0000000
--- a/src/java/org/apache/sqoop/mapreduce/ParquetExportMapper.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.sqoop.mapreduce;
-
-import org.apache.avro.generic.GenericRecord;
-import org.apache.hadoop.io.NullWritable;
-
-import java.io.IOException;
-
-/**
- * Exports Parquet records from a data source.
- */
-public class ParquetExportMapper
- extends GenericRecordExportMapper<GenericRecord, NullWritable> {
-
- @Override
- protected void setup(Context context) throws IOException, InterruptedException {
- super.setup(context);
- }
-
- @Override
- protected void map(GenericRecord key, NullWritable val,
- Context context) throws IOException, InterruptedException {
- context.write(toSqoopRecord(key), NullWritable.get());
- }
-
-}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/3233db8e/src/java/org/apache/sqoop/mapreduce/ParquetImportMapper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/ParquetImportMapper.java b/src/java/org/apache/sqoop/mapreduce/ParquetImportMapper.java
index 35ab495..62334f8 100644
--- a/src/java/org/apache/sqoop/mapreduce/ParquetImportMapper.java
+++ b/src/java/org/apache/sqoop/mapreduce/ParquetImportMapper.java
@@ -23,10 +23,7 @@ import org.apache.sqoop.lib.SqoopRecord;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.sqoop.avro.AvroUtil;
import java.io.IOException;
@@ -35,9 +32,9 @@ import java.sql.SQLException;
/**
* Imports records by writing them to a Parquet File.
*/
-public class ParquetImportMapper
+public abstract class ParquetImportMapper<KEYOUT, VALOUT>
extends AutoProgressMapper<LongWritable, SqoopRecord,
- GenericRecord, NullWritable> {
+ KEYOUT, VALOUT> {
private Schema schema = null;
private boolean bigDecimalFormatString = true;
@@ -47,11 +44,11 @@ public class ParquetImportMapper
protected void setup(Context context)
throws IOException, InterruptedException {
Configuration conf = context.getConfiguration();
- schema = ParquetJob.getAvroSchema(conf);
+ schema = getAvroSchema(conf);
bigDecimalFormatString = conf.getBoolean(
ImportJobBase.PROPERTY_BIGDECIMAL_FORMAT,
ImportJobBase.PROPERTY_BIGDECIMAL_FORMAT_DEFAULT);
- lobLoader = new LargeObjectLoader(conf, new Path(conf.get("sqoop.kite.lob.extern.dir", "/tmp/sqoop-parquet-" + context.getTaskAttemptID())));
+ lobLoader = createLobLoader(context);
}
@Override
@@ -64,9 +61,9 @@ public class ParquetImportMapper
throw new IOException(sqlE);
}
- GenericRecord outKey = AvroUtil.toGenericRecord(val.getFieldMap(), schema,
+ GenericRecord record = AvroUtil.toGenericRecord(val.getFieldMap(), schema,
bigDecimalFormatString);
- context.write(outKey, null);
+ write(context, record);
}
@Override
@@ -76,4 +73,9 @@ public class ParquetImportMapper
}
}
+ protected abstract LargeObjectLoader createLobLoader(Context context) throws IOException, InterruptedException;
+
+ protected abstract Schema getAvroSchema(Configuration configuration);
+
+ protected abstract void write(Context context, GenericRecord record) throws IOException, InterruptedException;
}
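ParquetImportMapper follows the same pattern: the Kite-specific pieces (LOB working directory, schema lookup, record emission) become abstract hooks. A sketch of the Kite-side subclass, reconstructed from the setup() and map() code removed above; the real class is KiteParquetImportMapper, whose hunk is not shown in this section:

    import java.io.IOException;

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.sqoop.avro.AvroUtil;
    import org.apache.sqoop.lib.LargeObjectLoader;
    import org.apache.sqoop.mapreduce.ParquetImportMapper;

    import static org.apache.sqoop.mapreduce.parquet.ParquetConstants.SQOOP_PARQUET_AVRO_SCHEMA_KEY;

    public class KiteParquetImportMapperSketch
        extends ParquetImportMapper<GenericRecord, NullWritable> {

      @Override
      protected LargeObjectLoader createLobLoader(Context context) throws IOException {
        Configuration conf = context.getConfiguration();
        // Same default working directory the removed setup() used.
        Path workPath = new Path(conf.get("sqoop.kite.lob.extern.dir",
            "/tmp/sqoop-parquet-" + context.getTaskAttemptID()));
        return new LargeObjectLoader(conf, workPath);
      }

      @Override
      protected Schema getAvroSchema(Configuration configuration) {
        return AvroUtil.parseAvroSchema(configuration.get(SQOOP_PARQUET_AVRO_SCHEMA_KEY));
      }

      @Override
      protected void write(Context context, GenericRecord record)
          throws IOException, InterruptedException {
        context.write(record, null);
      }
    }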
http://git-wip-us.apache.org/repos/asf/sqoop/blob/3233db8e/src/java/org/apache/sqoop/mapreduce/ParquetJob.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/ParquetJob.java b/src/java/org/apache/sqoop/mapreduce/ParquetJob.java
deleted file mode 100644
index 4604773..0000000
--- a/src/java/org/apache/sqoop/mapreduce/ParquetJob.java
+++ /dev/null
@@ -1,220 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.sqoop.mapreduce;
-
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.sqoop.avro.AvroSchemaMismatchException;
-import org.apache.sqoop.hive.HiveConfig;
-import org.kitesdk.data.CompressionType;
-import org.kitesdk.data.Dataset;
-import org.kitesdk.data.DatasetDescriptor;
-import org.kitesdk.data.Datasets;
-import org.kitesdk.data.Formats;
-import org.kitesdk.data.mapreduce.DatasetKeyOutputFormat;
-import org.kitesdk.data.spi.SchemaValidationUtil;
-
-import java.io.IOException;
-import java.lang.reflect.Method;
-
-/**
- * Helper class for setting up a Parquet MapReduce job.
- */
-public final class ParquetJob {
-
- public static final Log LOG = LogFactory.getLog(ParquetJob.class.getName());
-
- public static final String HIVE_METASTORE_CLIENT_CLASS = "org.apache.hadoop.hive.metastore.HiveMetaStoreClient";
-
- public static final String HIVE_METASTORE_SASL_ENABLED = "hive.metastore.sasl.enabled";
- // Purposefully choosing the same token alias as the one Oozie chooses.
- // Make sure we don't generate a new delegation token if oozie
- // has already generated one.
- public static final String HIVE_METASTORE_TOKEN_ALIAS = "HCat Token";
-
- public static final String INCOMPATIBLE_AVRO_SCHEMA_MSG = "Target dataset was created with an incompatible Avro schema. ";
-
- public static final String HIVE_INCOMPATIBLE_AVRO_SCHEMA_MSG = "You tried to import to an already existing Hive table in " +
- "Parquet format. Sqoop maps date/timestamp SQL types to int/bigint Hive types during Hive Parquet import" +
- " but it is possible that date/timestamp types were mapped to strings during table" +
- " creation. Consider using Sqoop option --map-column-java resolve the mismatch" +
- " (e.g. --map-column-java date_field1=String,timestamp_field1=String).";
-
- private static final String HIVE_URI_PREFIX = "dataset:hive";
-
- private ParquetJob() {
- }
-
- private static final String CONF_AVRO_SCHEMA = "parquetjob.avro.schema";
- static final String CONF_OUTPUT_CODEC = "parquetjob.output.codec";
- enum WriteMode {
- DEFAULT, APPEND, OVERWRITE
- };
-
- public static Schema getAvroSchema(Configuration conf) {
- return new Schema.Parser().parse(conf.get(CONF_AVRO_SCHEMA));
- }
-
- public static CompressionType getCompressionType(Configuration conf) {
- CompressionType defaults = Formats.PARQUET.getDefaultCompressionType();
- String codec = conf.get(CONF_OUTPUT_CODEC, defaults.getName());
- try {
- return CompressionType.forName(codec);
- } catch (IllegalArgumentException ex) {
- LOG.warn(String.format(
- "Unsupported compression type '%s'. Fallback to '%s'.",
- codec, defaults));
- }
- return defaults;
- }
-
- /**
- * Configure the import job. The import process will use a Kite dataset to
- * write data records into Parquet format internally. The input key class is
- * {@link org.apache.sqoop.lib.SqoopRecord}. The output key is
- * {@link org.apache.avro.generic.GenericRecord}.
- */
- public static void configureImportJob(JobConf conf, Schema schema,
- String uri, WriteMode writeMode) throws IOException {
- Dataset dataset;
-
- // Add hive delegation token only if we don't already have one.
- if (isHiveImport(uri)) {
- Configuration hiveConf = HiveConfig.getHiveConf(conf);
- if (isSecureMetastore(hiveConf)) {
- // Copy hive configs to job config
- HiveConfig.addHiveConfigs(hiveConf, conf);
-
- if (conf.getCredentials().getToken(new Text(HIVE_METASTORE_TOKEN_ALIAS)) == null) {
- addHiveDelegationToken(conf);
- }
- }
- }
-
- if (Datasets.exists(uri)) {
- if (WriteMode.DEFAULT.equals(writeMode)) {
- throw new IOException("Destination exists! " + uri);
- }
-
- dataset = Datasets.load(uri);
- Schema writtenWith = dataset.getDescriptor().getSchema();
- if (!SchemaValidationUtil.canRead(writtenWith, schema)) {
- String exceptionMessage = buildAvroSchemaMismatchMessage(isHiveImport(uri));
- throw new AvroSchemaMismatchException(exceptionMessage, writtenWith, schema);
- }
- } else {
- dataset = createDataset(schema, getCompressionType(conf), uri);
- }
- conf.set(CONF_AVRO_SCHEMA, schema.toString());
-
- DatasetKeyOutputFormat.ConfigBuilder builder =
- DatasetKeyOutputFormat.configure(conf);
- if (WriteMode.OVERWRITE.equals(writeMode)) {
- builder.overwrite(dataset);
- } else if (WriteMode.APPEND.equals(writeMode)) {
- builder.appendTo(dataset);
- } else {
- builder.writeTo(dataset);
- }
- }
-
- private static boolean isHiveImport(String importUri) {
- return importUri.startsWith(HIVE_URI_PREFIX);
- }
-
- public static Dataset createDataset(Schema schema,
- CompressionType compressionType, String uri) {
- DatasetDescriptor descriptor = new DatasetDescriptor.Builder()
- .schema(schema)
- .format(Formats.PARQUET)
- .compressionType(compressionType)
- .build();
- return Datasets.create(uri, descriptor, GenericRecord.class);
- }
-
- private static boolean isSecureMetastore(Configuration conf) {
- return conf != null && conf.getBoolean(HIVE_METASTORE_SASL_ENABLED, false);
- }
-
- /**
- * Add hive delegation token to credentials store.
- * @param conf
- */
- private static void addHiveDelegationToken(JobConf conf) {
- // Need to use reflection since there's no compile time dependency on the client libs.
- Class<?> HiveConfClass;
- Class<?> HiveMetaStoreClientClass;
-
- try {
- HiveMetaStoreClientClass = Class.forName(HIVE_METASTORE_CLIENT_CLASS);
- } catch (ClassNotFoundException ex) {
- LOG.error("Could not load " + HIVE_METASTORE_CLIENT_CLASS
- + " when adding hive delegation token. "
- + "Make sure HIVE_CONF_DIR is set correctly.", ex);
- throw new RuntimeException("Couldn't fetch delegation token.", ex);
- }
-
- try {
- HiveConfClass = Class.forName(HiveConfig.HIVE_CONF_CLASS);
- } catch (ClassNotFoundException ex) {
- LOG.error("Could not load " + HiveConfig.HIVE_CONF_CLASS
- + " when adding hive delegation token."
- + " Make sure HIVE_CONF_DIR is set correctly.", ex);
- throw new RuntimeException("Couldn't fetch delegation token.", ex);
- }
-
- try {
- Object client = HiveMetaStoreClientClass.getConstructor(HiveConfClass).newInstance(
- HiveConfClass.getConstructor(Configuration.class, Class.class).newInstance(conf, Configuration.class)
- );
- // getDelegationToken(String kerberosPrincial)
- Method getDelegationTokenMethod = HiveMetaStoreClientClass.getMethod("getDelegationToken", String.class);
- Object tokenStringForm = getDelegationTokenMethod.invoke(client, UserGroupInformation.getLoginUser().getShortUserName());
-
- // Load token
- Token<DelegationTokenIdentifier> metastoreToken = new Token<DelegationTokenIdentifier>();
- metastoreToken.decodeFromUrlString(tokenStringForm.toString());
- conf.getCredentials().addToken(new Text(HIVE_METASTORE_TOKEN_ALIAS), metastoreToken);
-
- LOG.debug("Successfully fetched hive metastore delegation token. " + metastoreToken);
- } catch (Exception ex) {
- LOG.error("Couldn't fetch delegation token.", ex);
- throw new RuntimeException("Couldn't fetch delegation token.", ex);
- }
- }
-
- private static String buildAvroSchemaMismatchMessage(boolean hiveImport) {
- String exceptionMessage = INCOMPATIBLE_AVRO_SCHEMA_MSG;
-
- if (hiveImport) {
- exceptionMessage += HIVE_INCOMPATIBLE_AVRO_SCHEMA_MSG;
- }
-
- return exceptionMessage;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/3233db8e/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeImportJob.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeImportJob.java b/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeImportJob.java
index 7e975c7..8ef30d3 100644
--- a/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeImportJob.java
+++ b/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeImportJob.java
@@ -30,6 +30,7 @@ import org.apache.sqoop.SqoopOptions;
import org.apache.sqoop.manager.ImportJobContext;
import org.apache.sqoop.mapreduce.DataDrivenImportJob;
+import org.apache.sqoop.mapreduce.parquet.ParquetImportJobConfigurator;
/**
* Import data from a mainframe dataset, using MainframeDatasetInputFormat.
@@ -39,8 +40,8 @@ public class MainframeImportJob extends DataDrivenImportJob {
private static final Log LOG = LogFactory.getLog(
MainframeImportJob.class.getName());
- public MainframeImportJob(final SqoopOptions opts, ImportJobContext context) {
- super(opts, MainframeDatasetInputFormat.class, context);
+ public MainframeImportJob(final SqoopOptions opts, ImportJobContext context, ParquetImportJobConfigurator parquetImportJobConfigurator) {
+ super(opts, MainframeDatasetInputFormat.class, context, parquetImportJobConfigurator);
}
@Override
http://git-wip-us.apache.org/repos/asf/sqoop/blob/3233db8e/src/java/org/apache/sqoop/mapreduce/parquet/ParquetConstants.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/parquet/ParquetConstants.java b/src/java/org/apache/sqoop/mapreduce/parquet/ParquetConstants.java
new file mode 100644
index 0000000..ae53a96
--- /dev/null
+++ b/src/java/org/apache/sqoop/mapreduce/parquet/ParquetConstants.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.sqoop.mapreduce.parquet;
+
+public final class ParquetConstants {
+
+ public static final String SQOOP_PARQUET_AVRO_SCHEMA_KEY = "parquetjob.avro.schema";
+
+ public static final String SQOOP_PARQUET_OUTPUT_CODEC_KEY = "parquetjob.output.codec";
+
+ private ParquetConstants() {
+ throw new AssertionError("This class is meant for static use only.");
+ }
+}
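The two keys keep the string values of the removed ParquetJob.CONF_AVRO_SCHEMA and CONF_OUTPUT_CODEC constants ("parquetjob.avro.schema" and "parquetjob.output.codec"), so configurations written against the old property names keep working. A small usage sketch combining them with the new AvroUtil.parseAvroSchema() helper; the class name and schema literal are only illustrative:

    import org.apache.avro.Schema;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.sqoop.avro.AvroUtil;

    import static org.apache.sqoop.mapreduce.parquet.ParquetConstants.SQOOP_PARQUET_AVRO_SCHEMA_KEY;
    import static org.apache.sqoop.mapreduce.parquet.ParquetConstants.SQOOP_PARQUET_OUTPUT_CODEC_KEY;

    public class ParquetConstantsUsageSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Job setup stores the Avro schema and output codec under the shared keys...
        conf.set(SQOOP_PARQUET_AVRO_SCHEMA_KEY,
            "{\"type\":\"record\",\"name\":\"t\",\"fields\":[]}");
        conf.set(SQOOP_PARQUET_OUTPUT_CODEC_KEY, "snappy");
        // ...and the mappers/reducers read them back, e.g. via the new AvroUtil helper.
        Schema schema = AvroUtil.parseAvroSchema(conf.get(SQOOP_PARQUET_AVRO_SCHEMA_KEY));
        System.out.println(schema.getName() + " / " + conf.get(SQOOP_PARQUET_OUTPUT_CODEC_KEY));
      }
    }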
http://git-wip-us.apache.org/repos/asf/sqoop/blob/3233db8e/src/java/org/apache/sqoop/mapreduce/parquet/ParquetExportJobConfigurator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/parquet/ParquetExportJobConfigurator.java b/src/java/org/apache/sqoop/mapreduce/parquet/ParquetExportJobConfigurator.java
new file mode 100644
index 0000000..8d7b87f
--- /dev/null
+++ b/src/java/org/apache/sqoop/mapreduce/parquet/ParquetExportJobConfigurator.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce.parquet;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+
+import java.io.IOException;
+
+public interface ParquetExportJobConfigurator {
+
+ void configureInputFormat(Job job, Path inputPath) throws IOException;
+
+ Class<? extends Mapper> getMapperClass();
+
+ Class<? extends InputFormat> getInputFormatClass();
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/3233db8e/src/java/org/apache/sqoop/mapreduce/parquet/ParquetImportJobConfigurator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/parquet/ParquetImportJobConfigurator.java b/src/java/org/apache/sqoop/mapreduce/parquet/ParquetImportJobConfigurator.java
new file mode 100644
index 0000000..fa1bc7d
--- /dev/null
+++ b/src/java/org/apache/sqoop/mapreduce/parquet/ParquetImportJobConfigurator.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce.parquet;
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.sqoop.SqoopOptions;
+
+import java.io.IOException;
+
+public interface ParquetImportJobConfigurator {
+
+ void configureMapper(Job job, Schema schema, SqoopOptions options, String tableName, Path destination) throws IOException;
+
+ Class<? extends Mapper> getMapperClass();
+
+ Class<? extends OutputFormat> getOutputFormatClass();
+
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/3233db8e/src/java/org/apache/sqoop/mapreduce/parquet/ParquetJobConfiguratorFactory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/parquet/ParquetJobConfiguratorFactory.java b/src/java/org/apache/sqoop/mapreduce/parquet/ParquetJobConfiguratorFactory.java
new file mode 100644
index 0000000..ed5103f
--- /dev/null
+++ b/src/java/org/apache/sqoop/mapreduce/parquet/ParquetJobConfiguratorFactory.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce.parquet;
+
+public interface ParquetJobConfiguratorFactory {
+
+ ParquetImportJobConfigurator createParquetImportJobConfigurator();
+
+ ParquetExportJobConfigurator createParquetExportJobConfigurator();
+
+ ParquetMergeJobConfigurator createParquetMergeJobConfigurator();
+
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/3233db8e/src/java/org/apache/sqoop/mapreduce/parquet/ParquetJobConfiguratorFactoryProvider.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/parquet/ParquetJobConfiguratorFactoryProvider.java b/src/java/org/apache/sqoop/mapreduce/parquet/ParquetJobConfiguratorFactoryProvider.java
new file mode 100644
index 0000000..2286a52
--- /dev/null
+++ b/src/java/org/apache/sqoop/mapreduce/parquet/ParquetJobConfiguratorFactoryProvider.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce.parquet;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.sqoop.mapreduce.parquet.kite.KiteParquetJobConfiguratorFactory;
+
+public final class ParquetJobConfiguratorFactoryProvider {
+
+ private ParquetJobConfiguratorFactoryProvider() {
+ throw new AssertionError("This class is meant for static use only.");
+ }
+
+ public static ParquetJobConfiguratorFactory createParquetJobConfiguratorFactory(Configuration configuration) {
+ return new KiteParquetJobConfiguratorFactory();
+ }
+
+}
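Side note (not part of the commit): a minimal sketch of how calling code can obtain configurators through this provider. The class name and main method below are only for illustration; the real call sites are the job classes touched elsewhere in this patch.

import org.apache.hadoop.conf.Configuration;
import org.apache.sqoop.mapreduce.parquet.ParquetImportJobConfigurator;
import org.apache.sqoop.mapreduce.parquet.ParquetJobConfiguratorFactory;
import org.apache.sqoop.mapreduce.parquet.ParquetJobConfiguratorFactoryProvider;

public class ConfiguratorLookupExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // The provider currently always returns the Kite-based factory; the
    // indirection leaves room for alternative Parquet implementations later.
    ParquetJobConfiguratorFactory factory =
        ParquetJobConfiguratorFactoryProvider.createParquetJobConfiguratorFactory(conf);
    ParquetImportJobConfigurator importConfigurator =
        factory.createParquetImportJobConfigurator();
    System.out.println(importConfigurator.getMapperClass().getName());
  }
}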
http://git-wip-us.apache.org/repos/asf/sqoop/blob/3233db8e/src/java/org/apache/sqoop/mapreduce/parquet/ParquetMergeJobConfigurator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/parquet/ParquetMergeJobConfigurator.java b/src/java/org/apache/sqoop/mapreduce/parquet/ParquetMergeJobConfigurator.java
new file mode 100644
index 0000000..67fdf66
--- /dev/null
+++ b/src/java/org/apache/sqoop/mapreduce/parquet/ParquetMergeJobConfigurator.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce.parquet;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+
+import java.io.IOException;
+
+public interface ParquetMergeJobConfigurator {
+
+ void configureParquetMergeJob(Configuration conf, Job job, Path oldPath, Path newPath, Path finalPath) throws IOException;
+
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/3233db8e/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteMergeParquetReducer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteMergeParquetReducer.java b/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteMergeParquetReducer.java
new file mode 100644
index 0000000..7f21205
--- /dev/null
+++ b/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteMergeParquetReducer.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce.parquet.kite;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.sqoop.mapreduce.MergeParquetReducer;
+
+import java.io.IOException;
+
+public class KiteMergeParquetReducer extends MergeParquetReducer<GenericRecord, NullWritable> {
+
+ @Override
+ protected void write(Context context, GenericRecord record) throws IOException, InterruptedException {
+ context.write(record, null);
+ }
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/3233db8e/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetExportJobConfigurator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetExportJobConfigurator.java b/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetExportJobConfigurator.java
new file mode 100644
index 0000000..ca02c7b
--- /dev/null
+++ b/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetExportJobConfigurator.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce.parquet.kite;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.sqoop.mapreduce.parquet.ParquetExportJobConfigurator;
+import org.apache.sqoop.util.FileSystemUtil;
+import org.kitesdk.data.mapreduce.DatasetKeyInputFormat;
+
+import java.io.IOException;
+
+public class KiteParquetExportJobConfigurator implements ParquetExportJobConfigurator {
+
+ @Override
+ public void configureInputFormat(Job job, Path inputPath) throws IOException {
+ String uri = "dataset:" + FileSystemUtil.makeQualified(inputPath, job.getConfiguration());
+ DatasetKeyInputFormat.configure(job).readFrom(uri);
+ }
+
+ @Override
+ public Class<? extends Mapper> getMapperClass() {
+ return KiteParquetExportMapper.class;
+ }
+
+ @Override
+ public Class<? extends InputFormat> getInputFormatClass() {
+ return DatasetKeyInputFormat.class;
+ }
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/3233db8e/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetExportMapper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetExportMapper.java b/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetExportMapper.java
new file mode 100644
index 0000000..25555d8
--- /dev/null
+++ b/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetExportMapper.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce.parquet.kite;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.sqoop.mapreduce.GenericRecordExportMapper;
+
+import java.io.IOException;
+
+/**
+ * Mapper that converts Parquet-backed GenericRecords into SqoopRecords during export.
+ */
+public class KiteParquetExportMapper extends GenericRecordExportMapper<GenericRecord, NullWritable> {
+
+ @Override
+ protected void map(GenericRecord key, NullWritable val, Context context) throws IOException, InterruptedException {
+ context.write(toSqoopRecord(key), NullWritable.get());
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/3233db8e/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetImportJobConfigurator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetImportJobConfigurator.java b/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetImportJobConfigurator.java
new file mode 100644
index 0000000..87828d1
--- /dev/null
+++ b/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetImportJobConfigurator.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce.parquet.kite;
+
+import org.apache.avro.Schema;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.sqoop.SqoopOptions;
+import org.apache.sqoop.mapreduce.parquet.ParquetImportJobConfigurator;
+import org.apache.sqoop.util.FileSystemUtil;
+import org.kitesdk.data.Datasets;
+import org.kitesdk.data.mapreduce.DatasetKeyOutputFormat;
+
+import java.io.IOException;
+
+public class KiteParquetImportJobConfigurator implements ParquetImportJobConfigurator {
+
+ public static final Log LOG = LogFactory.getLog(KiteParquetImportJobConfigurator.class.getName());
+
+ @Override
+ public void configureMapper(Job job, Schema schema, SqoopOptions options, String tableName, Path destination) throws IOException {
+ JobConf conf = (JobConf) job.getConfiguration();
+ String uri = getKiteUri(conf, options, tableName, destination);
+ KiteParquetUtils.WriteMode writeMode;
+
+ if (options.doHiveImport()) {
+ if (options.doOverwriteHiveTable()) {
+ writeMode = KiteParquetUtils.WriteMode.OVERWRITE;
+ } else {
+ writeMode = KiteParquetUtils.WriteMode.APPEND;
+ if (Datasets.exists(uri)) {
+          LOG.warn("Target Hive table '" + tableName + "' exists! Sqoop will " +
+              "append data into the existing Hive table. Consider using " +
+              "--hive-overwrite if you do NOT intend to append data.");
+ }
+ }
+ } else {
+      // Note that there is no import argument for overwriting an HDFS dataset,
+      // so overwrite mode is not supported yet. Sqoop's append mode merges two
+      // independent datasets, so DEFAULT is used as the write mode.
+ writeMode = KiteParquetUtils.WriteMode.DEFAULT;
+ }
+ KiteParquetUtils.configureImportJob(conf, schema, uri, writeMode);
+ }
+
+ @Override
+ public Class<? extends Mapper> getMapperClass() {
+ return KiteParquetImportMapper.class;
+ }
+
+ @Override
+ public Class<? extends OutputFormat> getOutputFormatClass() {
+ return DatasetKeyOutputFormat.class;
+ }
+
+ private String getKiteUri(Configuration conf, SqoopOptions options, String tableName, Path destination) throws IOException {
+ if (options.doHiveImport()) {
+ String hiveDatabase = options.getHiveDatabaseName() == null ? "default" :
+ options.getHiveDatabaseName();
+ String hiveTable = options.getHiveTableName() == null ? tableName :
+ options.getHiveTableName();
+ return String.format("dataset:hive:/%s/%s", hiveDatabase, hiveTable);
+ } else {
+ return "dataset:" + FileSystemUtil.makeQualified(destination, conf);
+ }
+ }
+}
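Side note (not part of the commit): getKiteUri produces two dataset URI shapes, one for Hive imports and one for plain HDFS imports. The database, table, and path below are hypothetical examples.

public class KiteUriExample {
  public static void main(String[] args) {
    // Hive import: dataset:hive:/<database>/<table>
    System.out.println(String.format("dataset:hive:/%s/%s", "default", "customers"));
    // Plain HDFS import: "dataset:" + the qualified destination path
    System.out.println("dataset:" + "hdfs://namenode:8020/user/sqoop/customers");
  }
}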
http://git-wip-us.apache.org/repos/asf/sqoop/blob/3233db8e/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetImportMapper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetImportMapper.java b/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetImportMapper.java
new file mode 100644
index 0000000..20adf6e
--- /dev/null
+++ b/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetImportMapper.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce.parquet.kite;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.sqoop.avro.AvroUtil;
+import org.apache.sqoop.lib.LargeObjectLoader;
+import org.apache.sqoop.mapreduce.ParquetImportMapper;
+
+import java.io.IOException;
+
+import static org.apache.sqoop.mapreduce.parquet.ParquetConstants.SQOOP_PARQUET_AVRO_SCHEMA_KEY;
+
+public class KiteParquetImportMapper extends ParquetImportMapper<GenericRecord, Void> {
+
+ @Override
+ protected LargeObjectLoader createLobLoader(Context context) throws IOException, InterruptedException {
+ Configuration conf = context.getConfiguration();
+ Path workPath = new Path(conf.get("sqoop.kite.lob.extern.dir", "/tmp/sqoop-parquet-" + context.getTaskAttemptID()));
+ return new LargeObjectLoader(conf, workPath);
+ }
+
+ @Override
+ protected Schema getAvroSchema(Configuration configuration) {
+ String schemaString = configuration.get(SQOOP_PARQUET_AVRO_SCHEMA_KEY);
+ return AvroUtil.parseAvroSchema(schemaString);
+ }
+
+ @Override
+ protected void write(Context context, GenericRecord record) throws IOException, InterruptedException {
+ context.write(record, null);
+ }
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/3233db8e/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetJobConfiguratorFactory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetJobConfiguratorFactory.java b/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetJobConfiguratorFactory.java
new file mode 100644
index 0000000..055e116
--- /dev/null
+++ b/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetJobConfiguratorFactory.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce.parquet.kite;
+
+import org.apache.sqoop.mapreduce.parquet.ParquetExportJobConfigurator;
+import org.apache.sqoop.mapreduce.parquet.ParquetImportJobConfigurator;
+import org.apache.sqoop.mapreduce.parquet.ParquetJobConfiguratorFactory;
+import org.apache.sqoop.mapreduce.parquet.ParquetMergeJobConfigurator;
+
+public class KiteParquetJobConfiguratorFactory implements ParquetJobConfiguratorFactory {
+
+ @Override
+ public ParquetImportJobConfigurator createParquetImportJobConfigurator() {
+ return new KiteParquetImportJobConfigurator();
+ }
+
+ @Override
+ public ParquetExportJobConfigurator createParquetExportJobConfigurator() {
+ return new KiteParquetExportJobConfigurator();
+ }
+
+ @Override
+ public ParquetMergeJobConfigurator createParquetMergeJobConfigurator() {
+ return new KiteParquetMergeJobConfigurator();
+ }
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/3233db8e/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetMergeJobConfigurator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetMergeJobConfigurator.java b/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetMergeJobConfigurator.java
new file mode 100644
index 0000000..9fecf28
--- /dev/null
+++ b/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetMergeJobConfigurator.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce.parquet.kite;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.sqoop.mapreduce.MergeParquetMapper;
+import org.apache.sqoop.mapreduce.parquet.ParquetMergeJobConfigurator;
+import org.kitesdk.data.Dataset;
+import org.kitesdk.data.DatasetDescriptor;
+import org.kitesdk.data.Datasets;
+import org.kitesdk.data.Formats;
+import org.kitesdk.data.mapreduce.DatasetKeyOutputFormat;
+import parquet.avro.AvroParquetInputFormat;
+import parquet.avro.AvroSchemaConverter;
+import parquet.hadoop.Footer;
+import parquet.hadoop.ParquetFileReader;
+import parquet.schema.MessageType;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.sqoop.mapreduce.parquet.ParquetConstants.SQOOP_PARQUET_AVRO_SCHEMA_KEY;
+
+public class KiteParquetMergeJobConfigurator implements ParquetMergeJobConfigurator {
+
+ public static final Log LOG = LogFactory.getLog(KiteParquetMergeJobConfigurator.class.getName());
+
+ @Override
+ public void configureParquetMergeJob(Configuration conf, Job job, Path oldPath, Path newPath,
+ Path finalPath) throws IOException {
+ try {
+ FileSystem fileSystem = finalPath.getFileSystem(conf);
+ LOG.info("Trying to merge parquet files");
+ job.setOutputKeyClass(GenericRecord.class);
+ job.setMapperClass(MergeParquetMapper.class);
+ job.setReducerClass(KiteMergeParquetReducer.class);
+ job.setOutputValueClass(NullWritable.class);
+
+ List<Footer> footers = new ArrayList<Footer>();
+ FileStatus oldPathfileStatus = fileSystem.getFileStatus(oldPath);
+      FileStatus newPathfileStatus = fileSystem.getFileStatus(newPath);
+ footers.addAll(ParquetFileReader.readFooters(job.getConfiguration(), oldPathfileStatus, true));
+ footers.addAll(ParquetFileReader.readFooters(job.getConfiguration(), newPathfileStatus, true));
+
+ MessageType schema = footers.get(0).getParquetMetadata().getFileMetaData().getSchema();
+ AvroSchemaConverter avroSchemaConverter = new AvroSchemaConverter();
+ Schema avroSchema = avroSchemaConverter.convert(schema);
+
+ if (!fileSystem.exists(finalPath)) {
+ Dataset dataset = createDataset(avroSchema, "dataset:" + finalPath);
+ DatasetKeyOutputFormat.configure(job).overwrite(dataset);
+ } else {
+ DatasetKeyOutputFormat.configure(job).overwrite(new URI("dataset:" + finalPath));
+ }
+
+ job.setInputFormatClass(AvroParquetInputFormat.class);
+ AvroParquetInputFormat.setAvroReadSchema(job, avroSchema);
+
+ conf.set(SQOOP_PARQUET_AVRO_SCHEMA_KEY, avroSchema.toString());
+ Class<DatasetKeyOutputFormat> outClass = DatasetKeyOutputFormat.class;
+
+ job.setOutputFormatClass(outClass);
+    } catch (Exception e) {
+      throw new IOException(e);
+ }
+ }
+
+ public static Dataset createDataset(Schema schema, String uri) {
+ DatasetDescriptor descriptor =
+ new DatasetDescriptor.Builder().schema(schema).format(Formats.PARQUET).build();
+ return Datasets.create(uri, descriptor, GenericRecord.class);
+ }
+}
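Side note (not part of the commit): a minimal sketch of how a merge job could drive this configurator. The paths and job name are hypothetical, and the old/new paths must already contain Parquet files for the footer reading above to succeed.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.sqoop.mapreduce.parquet.ParquetMergeJobConfigurator;
import org.apache.sqoop.mapreduce.parquet.kite.KiteParquetMergeJobConfigurator;

public class MergeConfiguratorExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "parquet-merge-example");
    ParquetMergeJobConfigurator mergeConfigurator = new KiteParquetMergeJobConfigurator();
    // Sets mapper/reducer, input/output formats, and stores the Avro schema
    // under SQOOP_PARQUET_AVRO_SCHEMA_KEY before the job is submitted.
    mergeConfigurator.configureParquetMergeJob(conf,
        job,
        new Path("/data/old"),
        new Path("/data/new"),
        new Path("/data/merged"));
  }
}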