Repository: sqoop
Updated Branches:
refs/heads/trunk 27cdcef4e -> ad13ad081
SQOOP-1391: Compression codec handling
(Qian Xu via Jarek Jarcec Cecho)
Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/ad13ad08
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/ad13ad08
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/ad13ad08
Branch: refs/heads/trunk
Commit: ad13ad08106fe907c76fa3df4c7f5123874952fa
Parents: 27cdcef
Author: Jarek Jarcec Cecho <jarcec@apache.org>
Authored: Sun Nov 9 23:25:46 2014 -0800
Committer: Jarek Jarcec Cecho <jarcec@apache.org>
Committed: Sun Nov 9 23:25:46 2014 -0800
----------------------------------------------------------------------
.../apache/sqoop/mapreduce/ImportJobBase.java | 6 ++--
.../org/apache/sqoop/mapreduce/ParquetJob.java | 31 ++++++++++++++++----
.../com/cloudera/sqoop/TestParquetImport.java | 20 +++++++++++--
3 files changed, 46 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/sqoop/blob/ad13ad08/src/java/org/apache/sqoop/mapreduce/ImportJobBase.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/ImportJobBase.java b/src/java/org/apache/sqoop/mapreduce/ImportJobBase.java
index dab5606..04d60fd 100644
--- a/src/java/org/apache/sqoop/mapreduce/ImportJobBase.java
+++ b/src/java/org/apache/sqoop/mapreduce/ImportJobBase.java
@@ -145,10 +145,8 @@ public class ImportJobBase extends JobBase {
    if (codecName != null) {
      Configuration conf = job.getConfiguration();
      String shortName = CodecMap.getCodecShortNameByName(codecName, conf);
-      if (!shortName.equalsIgnoreCase("default") &&
-          !shortName.equalsIgnoreCase("snappy")) {
-        // TODO: SQOOP-1391 More compression codec support
-        LOG.warn("Will use snappy as compression codec instead");
+      if (!shortName.equalsIgnoreCase("default")) {
+        conf.set(ParquetJob.CONF_OUTPUT_CODEC, shortName);
      }
    }
  }
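
As a rough illustration of how the short codec name resolved above reaches the Parquet
writer, here is a minimal standalone sketch. The class name and the "deflate" value are
made up for the example, and the configuration key simply mirrors
ParquetJob.CONF_OUTPUT_CODEC from the change above; only CodecMap.getCodecShortNameByName
and Configuration.set are taken from the diff itself.

import org.apache.hadoop.conf.Configuration;
import org.apache.sqoop.io.CodecMap;

public class CodecConfigSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // CodecMap normalizes a user-supplied codec (short name or codec class
    // name, e.g. from --compression-codec) to its short form.
    String shortName = CodecMap.getCodecShortNameByName("deflate", conf);
    if (!shortName.equalsIgnoreCase("default")) {
      // Same key as ParquetJob.CONF_OUTPUT_CODEC in the change above.
      conf.set("parquetjob.output.codec", shortName);
    }
    System.out.println("Codec handed to the Parquet writer: " + shortName);
  }
}
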
http://git-wip-us.apache.org/repos/asf/sqoop/blob/ad13ad08/src/java/org/apache/sqoop/mapreduce/ParquetJob.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/ParquetJob.java b/src/java/org/apache/sqoop/mapreduce/ParquetJob.java
index 6ef29a1..bea74c3 100644
--- a/src/java/org/apache/sqoop/mapreduce/ParquetJob.java
+++ b/src/java/org/apache/sqoop/mapreduce/ParquetJob.java
@@ -20,7 +20,10 @@ package org.apache.sqoop.mapreduce;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.kitesdk.data.CompressionType;
import org.kitesdk.data.Dataset;
import org.kitesdk.data.DatasetDescriptor;
import org.kitesdk.data.DatasetNotFoundException;
@@ -36,19 +39,35 @@ import java.io.IOException;
 */
public final class ParquetJob {
+  public static final Log LOG = LogFactory.getLog(ParquetJob.class.getName());
+
  private ParquetJob() {
  }
-  private static final String CONF_AVRO_SCHEMA = "avro.schema";
+  private static final String CONF_AVRO_SCHEMA = "parquetjob.avro.schema";
+  static final String CONF_OUTPUT_CODEC = "parquetjob.output.codec";
  public static Schema getAvroSchema(Configuration conf) {
    return new Schema.Parser().parse(conf.get(CONF_AVRO_SCHEMA));
  }
+  public static CompressionType getCompressionType(Configuration conf) {
+    CompressionType defaults = Formats.PARQUET.getDefaultCompressionType();
+    String codec = conf.get(CONF_OUTPUT_CODEC, defaults.getName());
+    try {
+      return CompressionType.forName(codec);
+    } catch (IllegalArgumentException ex) {
+      LOG.warn(String.format(
+          "Unsupported compression type '%s'. Fallback to '%s'.",
+          codec, defaults));
+    }
+    return defaults;
+  }
+
  /**
   * Configure the import job. The import process will use a Kite dataset to
   * write data records into Parquet format internally. The input key class is
-   * {@link org.apache.sqoop.lib.SqoopAvroRecord}. The output key is
+   * {@link org.apache.sqoop.lib.SqoopRecord}. The output key is
   * {@link org.apache.avro.generic.GenericRecord}.
   */
  public static void configureImportJob(Configuration conf, Schema schema,
@@ -58,7 +77,7 @@ public final class ParquetJob {
      try {
        dataset = Datasets.load(uri);
      } catch (DatasetNotFoundException ex) {
-        dataset = createDataset(schema, uri);
+        dataset = createDataset(schema, getCompressionType(conf), uri);
      }
      Schema writtenWith = dataset.getDescriptor().getSchema();
      if (!SchemaValidationUtil.canRead(writtenWith, schema)) {
@@ -67,16 +86,18 @@ public final class ParquetJob {
          writtenWith, schema));
      }
    } else {
-      dataset = createDataset(schema, uri);
+      dataset = createDataset(schema, getCompressionType(conf), uri);
    }
    conf.set(CONF_AVRO_SCHEMA, schema.toString());
    DatasetKeyOutputFormat.configure(conf).writeTo(dataset);
  }
-  private static Dataset createDataset(Schema schema, String uri) {
+  private static Dataset createDataset(Schema schema,
+      CompressionType compressionType, String uri) {
    DatasetDescriptor descriptor = new DatasetDescriptor.Builder()
        .schema(schema)
        .format(Formats.PARQUET)
+        .compressionType(compressionType)
        .build();
    return Datasets.create(uri, descriptor, GenericRecord.class);
  }
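
The effect of the new getCompressionType() is that an unrecognised codec name degrades to
Kite's Parquet default instead of failing the job. A minimal sketch of that fallback
behaviour follows; the class name is made up, and only CompressionType.forName and
Formats.PARQUET.getDefaultCompressionType() are taken from the diff above.

import org.kitesdk.data.CompressionType;
import org.kitesdk.data.Formats;

public class CompressionFallbackSketch {

  // Mirrors the fallback logic added to ParquetJob.getCompressionType().
  static CompressionType resolve(String codec) {
    CompressionType defaults = Formats.PARQUET.getDefaultCompressionType();
    if (codec == null) {
      return defaults;
    }
    try {
      // Kite only accepts the codec names it knows (e.g. "snappy", "deflate").
      return CompressionType.forName(codec);
    } catch (IllegalArgumentException ex) {
      // An unrecognised name degrades to the Parquet default.
      return defaults;
    }
  }

  public static void main(String[] args) {
    System.out.println(resolve("deflate"));     // the deflate compression type
    System.out.println(resolve("not-a-codec")); // Kite's Parquet default
  }
}
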
http://git-wip-us.apache.org/repos/asf/sqoop/blob/ad13ad08/src/test/com/cloudera/sqoop/TestParquetImport.java
----------------------------------------------------------------------
diff --git a/src/test/com/cloudera/sqoop/TestParquetImport.java b/src/test/com/cloudera/sqoop/TestParquetImport.java
index 2224719..192da17 100644
--- a/src/test/com/cloudera/sqoop/TestParquetImport.java
+++ b/src/test/com/cloudera/sqoop/TestParquetImport.java
@@ -27,6 +27,7 @@ import org.apache.avro.Schema.Type;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.kitesdk.data.CompressionType;
import org.kitesdk.data.Dataset;
import org.kitesdk.data.DatasetReader;
import org.kitesdk.data.Datasets;
@@ -75,13 +76,24 @@ public class TestParquetImport extends ImportJobTestCase {
    return args.toArray(new String[args.size()]);
  }
-  public void testParquetImport() throws IOException {
+  public void testSnappyCompression() throws IOException {
+    runParquetImportTest("snappy");
+  }
+
+  public void testDeflateCompression() throws IOException {
+    runParquetImportTest("deflate");
+  }
+
+  private void runParquetImportTest(String codec) throws IOException {
    String[] types = {"BIT", "INTEGER", "BIGINT", "REAL", "DOUBLE", "VARCHAR(6)",
        "VARBINARY(2)",};
    String[] vals = {"true", "100", "200", "1.0", "2.0", "'s'", "'0102'", };
    createTableWithColTypes(types, vals);
-    runImport(getOutputArgv(true, null));
+    String [] extraArgs = { "--compression-codec", codec};
+    runImport(getOutputArgv(true, extraArgs));
+
+    assertEquals(CompressionType.forName(codec), getCompressionType());
    Schema schema = getSchema();
    assertEquals(Type.RECORD, schema.getType());
@@ -177,6 +189,10 @@ public class TestParquetImport extends ImportJobTestCase {
    }
  }
+  private CompressionType getCompressionType() {
+    return getDataset().getDescriptor().getCompressionType();
+  }
+
  private Schema getSchema() {
    return getDataset().getDescriptor().getSchema();
  }
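
The check performed by the new test helper can be run against any existing Kite dataset.
A small sketch along those lines, with a made-up class name and a placeholder dataset URI;
Datasets.load and DatasetDescriptor.getCompressionType are the same Kite calls used above.

import org.apache.avro.generic.GenericRecord;
import org.kitesdk.data.CompressionType;
import org.kitesdk.data.Dataset;
import org.kitesdk.data.Datasets;

public class InspectCompression {
  public static void main(String[] args) {
    // Placeholder URI; point it at a real Parquet dataset to try this out.
    Dataset<GenericRecord> dataset =
        Datasets.load("dataset:hdfs:/user/example/orders", GenericRecord.class);
    CompressionType codec = dataset.getDescriptor().getCompressionType();
    System.out.println("Dataset was written with codec: " + codec);
  }
}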