sqoop-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jar...@apache.org
Subject sqoop git commit: SQOOP-1391: Compression codec handling
Date Mon, 10 Nov 2014 07:26:31 GMT
Repository: sqoop
Updated Branches:
  refs/heads/trunk 27cdcef4e -> ad13ad081


SQOOP-1391: Compression codec handling

(Qian Xu via Jarek Jarcec Cecho)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/ad13ad08
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/ad13ad08
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/ad13ad08

Branch: refs/heads/trunk
Commit: ad13ad08106fe907c76fa3df4c7f5123874952fa
Parents: 27cdcef
Author: Jarek Jarcec Cecho <jarcec@apache.org>
Authored: Sun Nov 9 23:25:46 2014 -0800
Committer: Jarek Jarcec Cecho <jarcec@apache.org>
Committed: Sun Nov 9 23:25:46 2014 -0800

----------------------------------------------------------------------
 .../apache/sqoop/mapreduce/ImportJobBase.java   |  6 ++--
 .../org/apache/sqoop/mapreduce/ParquetJob.java  | 31 ++++++++++++++++----
 .../com/cloudera/sqoop/TestParquetImport.java   | 20 +++++++++++--
 3 files changed, 46 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/ad13ad08/src/java/org/apache/sqoop/mapreduce/ImportJobBase.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/ImportJobBase.java b/src/java/org/apache/sqoop/mapreduce/ImportJobBase.java
index dab5606..04d60fd 100644
--- a/src/java/org/apache/sqoop/mapreduce/ImportJobBase.java
+++ b/src/java/org/apache/sqoop/mapreduce/ImportJobBase.java
@@ -145,10 +145,8 @@ public class ImportJobBase extends JobBase {
         if (codecName != null) {
           Configuration conf = job.getConfiguration();
           String shortName = CodecMap.getCodecShortNameByName(codecName, conf);
-          if (!shortName.equalsIgnoreCase("default") &&
-              !shortName.equalsIgnoreCase("snappy")) {
-            // TODO: SQOOP-1391 More compression codec support
-            LOG.warn("Will use snappy as compression codec instead");
+          if (!shortName.equalsIgnoreCase("default")) {
+            conf.set(ParquetJob.CONF_OUTPUT_CODEC, shortName);
           }
         }
       }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/ad13ad08/src/java/org/apache/sqoop/mapreduce/ParquetJob.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/ParquetJob.java b/src/java/org/apache/sqoop/mapreduce/ParquetJob.java
index 6ef29a1..bea74c3 100644
--- a/src/java/org/apache/sqoop/mapreduce/ParquetJob.java
+++ b/src/java/org/apache/sqoop/mapreduce/ParquetJob.java
@@ -20,7 +20,10 @@ package org.apache.sqoop.mapreduce;
 
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.kitesdk.data.CompressionType;
 import org.kitesdk.data.Dataset;
 import org.kitesdk.data.DatasetDescriptor;
 import org.kitesdk.data.DatasetNotFoundException;
@@ -36,19 +39,35 @@ import java.io.IOException;
  */
 public final class ParquetJob {
 
+  public static final Log LOG = LogFactory.getLog(ParquetJob.class.getName());
+
   private ParquetJob() {
   }
 
-  private static final String CONF_AVRO_SCHEMA = "avro.schema";
+  private static final String CONF_AVRO_SCHEMA = "parquetjob.avro.schema";
+  static final String CONF_OUTPUT_CODEC = "parquetjob.output.codec";
 
   public static Schema getAvroSchema(Configuration conf) {
     return new Schema.Parser().parse(conf.get(CONF_AVRO_SCHEMA));
   }
 
+  public static CompressionType getCompressionType(Configuration conf) {
+    CompressionType defaults = Formats.PARQUET.getDefaultCompressionType();
+    String codec = conf.get(CONF_OUTPUT_CODEC, defaults.getName());
+    try {
+      return CompressionType.forName(codec);
+    } catch (IllegalArgumentException ex) {
+      LOG.warn(String.format(
+          "Unsupported compression type '%s'. Fallback to '%s'.",
+          codec, defaults));
+    }
+    return defaults;
+  }
+
   /**
    * Configure the import job. The import process will use a Kite dataset to
    * write data records into Parquet format internally. The input key class is
-   * {@link org.apache.sqoop.lib.SqoopAvroRecord}. The output key is
+   * {@link org.apache.sqoop.lib.SqoopRecord}. The output key is
    * {@link org.apache.avro.generic.GenericRecord}.
    */
   public static void configureImportJob(Configuration conf, Schema schema,
@@ -58,7 +77,7 @@ public final class ParquetJob {
       try {
         dataset = Datasets.load(uri);
       } catch (DatasetNotFoundException ex) {
-        dataset = createDataset(schema, uri);
+        dataset = createDataset(schema, getCompressionType(conf), uri);
       }
       Schema writtenWith = dataset.getDescriptor().getSchema();
       if (!SchemaValidationUtil.canRead(writtenWith, schema)) {
@@ -67,16 +86,18 @@ public final class ParquetJob {
                 writtenWith, schema));
       }
     } else {
-      dataset = createDataset(schema, uri);
+      dataset = createDataset(schema, getCompressionType(conf), uri);
     }
     conf.set(CONF_AVRO_SCHEMA, schema.toString());
     DatasetKeyOutputFormat.configure(conf).writeTo(dataset);
   }
 
-  private static Dataset createDataset(Schema schema, String uri) {
+  private static Dataset createDataset(Schema schema,
+      CompressionType compressionType, String uri) {
     DatasetDescriptor descriptor = new DatasetDescriptor.Builder()
         .schema(schema)
         .format(Formats.PARQUET)
+        .compressionType(compressionType)
         .build();
     return Datasets.create(uri, descriptor, GenericRecord.class);
   }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/ad13ad08/src/test/com/cloudera/sqoop/TestParquetImport.java
----------------------------------------------------------------------
diff --git a/src/test/com/cloudera/sqoop/TestParquetImport.java b/src/test/com/cloudera/sqoop/TestParquetImport.java
index 2224719..192da17 100644
--- a/src/test/com/cloudera/sqoop/TestParquetImport.java
+++ b/src/test/com/cloudera/sqoop/TestParquetImport.java
@@ -27,6 +27,7 @@ import org.apache.avro.Schema.Type;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.kitesdk.data.CompressionType;
 import org.kitesdk.data.Dataset;
 import org.kitesdk.data.DatasetReader;
 import org.kitesdk.data.Datasets;
@@ -75,13 +76,24 @@ public class TestParquetImport extends ImportJobTestCase {
     return args.toArray(new String[args.size()]);
   }
 
-  public void testParquetImport() throws IOException {
+  public void testSnappyCompression() throws IOException {
+    runParquetImportTest("snappy");
+  }
+
+  public void testDeflateCompression() throws IOException {
+    runParquetImportTest("deflate");
+  }
+
+  private void runParquetImportTest(String codec) throws IOException {
     String[] types = {"BIT", "INTEGER", "BIGINT", "REAL", "DOUBLE", "VARCHAR(6)",
         "VARBINARY(2)",};
     String[] vals = {"true", "100", "200", "1.0", "2.0", "'s'", "'0102'", };
     createTableWithColTypes(types, vals);
 
-    runImport(getOutputArgv(true, null));
+    String [] extraArgs = { "--compression-codec", codec};
+    runImport(getOutputArgv(true, extraArgs));
+
+    assertEquals(CompressionType.forName(codec), getCompressionType());
 
     Schema schema = getSchema();
     assertEquals(Type.RECORD, schema.getType());
@@ -177,6 +189,10 @@ public class TestParquetImport extends ImportJobTestCase {
     }
   }
 
+  private CompressionType getCompressionType() {
+    return getDataset().getDescriptor().getCompressionType();
+  }
+
   private Schema getSchema() {
     return getDataset().getDescriptor().getSchema();
   }


Mime
View raw message