orc-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From omal...@apache.org
Subject [1/5] orc git commit: ORC-77. Implement LZO and LZ4 compression codecs. (omalley)
Date Mon, 08 Aug 2016 21:19:17 GMT
Repository: orc
Updated Branches:
  refs/heads/master 0481613d3 -> 9aba074bb


ORC-77. Implement LZO and LZ4 compression codecs. (omalley)

Fixes #45.

Signed-off-by: Owen O'Malley <omalley@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/orc/repo
Commit: http://git-wip-us.apache.org/repos/asf/orc/commit/62fe9504
Tree: http://git-wip-us.apache.org/repos/asf/orc/tree/62fe9504
Diff: http://git-wip-us.apache.org/repos/asf/orc/diff/62fe9504

Branch: refs/heads/master
Commit: 62fe9504befc879540fc7abdfb9969df9300cd8c
Parents: 0481613
Author: Owen O'Malley <omalley@apache.org>
Authored: Tue Jul 5 08:16:03 2016 -0700
Committer: Owen O'Malley <omalley@apache.org>
Committed: Thu Aug 4 15:27:09 2016 -0700

----------------------------------------------------------------------
 java/core/pom.xml                               |   8 +-
 .../java/org/apache/orc/CompressionKind.java    |   2 +-
 .../org/apache/orc/impl/AircompressorCodec.java | 102 +++++++++++++++++++
 .../java/org/apache/orc/impl/ReaderImpl.java    |   4 +-
 .../java/org/apache/orc/impl/SnappyCodec.java   |  51 ++--------
 .../java/org/apache/orc/impl/WriterImpl.java    |  29 +++---
 .../test/org/apache/orc/TestVectorOrcFile.java  |  98 ++++++++++++++++++
 java/pom.xml                                    |  10 +-
 8 files changed, 230 insertions(+), 74 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/orc/blob/62fe9504/java/core/pom.xml
----------------------------------------------------------------------
diff --git a/java/core/pom.xml b/java/core/pom.xml
index e09ebda..35ad778 100644
--- a/java/core/pom.xml
+++ b/java/core/pom.xml
@@ -42,6 +42,10 @@
       <artifactId>commons-lang</artifactId>
     </dependency>
     <dependency>
+      <groupId>io.airlift</groupId>
+      <artifactId>aircompressor</artifactId>
+    </dependency>
+    <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-common</artifactId>
     </dependency>
@@ -50,10 +54,6 @@
       <artifactId>hive-storage-api</artifactId>
     </dependency>
     <dependency>
-      <groupId>org.iq80.snappy</groupId>
-      <artifactId>snappy</artifactId>
-    </dependency>
-    <dependency>
       <groupId>org.slf4j</groupId>
       <artifactId>slf4j-api</artifactId>
     </dependency>

http://git-wip-us.apache.org/repos/asf/orc/blob/62fe9504/java/core/src/java/org/apache/orc/CompressionKind.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/CompressionKind.java b/java/core/src/java/org/apache/orc/CompressionKind.java
index f684bef..3cffe57 100644
--- a/java/core/src/java/org/apache/orc/CompressionKind.java
+++ b/java/core/src/java/org/apache/orc/CompressionKind.java
@@ -23,5 +23,5 @@ package org.apache.orc;
  * can be applied to ORC files.
  */
 public enum CompressionKind {
-  NONE, ZLIB, SNAPPY, LZO
+  NONE, ZLIB, SNAPPY, LZO, LZ4 // keep in sync with ReaderImpl/WriterImpl switches (defaults throw)
 }

http://git-wip-us.apache.org/repos/asf/orc/blob/62fe9504/java/core/src/java/org/apache/orc/impl/AircompressorCodec.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/AircompressorCodec.java b/java/core/src/java/org/apache/orc/impl/AircompressorCodec.java
new file mode 100644
index 0000000..a304730
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/AircompressorCodec.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl;
+
+import io.airlift.compress.Compressor;
+import io.airlift.compress.Decompressor;
+import org.apache.orc.CompressionCodec;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.EnumSet;
+
+/** A {@link CompressionCodec} built on an aircompressor Compressor/Decompressor. */
+public class AircompressorCodec implements CompressionCodec {
+  private final Compressor compressor;
+  private final Decompressor decompressor;
+
+  AircompressorCodec(Compressor compressor, Decompressor decompressor) {
+    this.compressor = compressor;
+    this.decompressor = decompressor;
+  }
+
+  // Per-thread scratch buffer, shared by every codec instance on that thread.
+  private static final ThreadLocal<byte[]> threadBuffer =
+      new ThreadLocal<byte[]>();
+
+  /**
+   * Returns this thread's scratch buffer, reallocating it unless its length is
+   * within [size, 2 * size]; the bound uses long math so it cannot overflow.
+   */
+  protected static byte[] getBuffer(int size) {
+    byte[] result = threadBuffer.get();
+    if (result == null || result.length < size || result.length > 2L * size) {
+      result = new byte[size];
+      threadBuffer.set(result);
+    }
+    return result;
+  }
+
+  @Override
+  public boolean compress(ByteBuffer in, ByteBuffer out,
+                          ByteBuffer overflow) throws IOException {
+    int inBytes = in.remaining();
+    // aircompressor writes to a single output array, so compress into a scratch
+    // buffer first and then split the result between out and overflow.
+    byte[] compressed = getBuffer(compressor.maxCompressedLength(inBytes));
+    int outBytes =
+        compressor.compress(in.array(), in.arrayOffset() + in.position(), inBytes,
+            compressed, 0, compressed.length);
+    if (outBytes >= inBytes) {
+      return false; // compressing did not shrink the input
+    }
+    int remaining = out.remaining();
+    if (remaining >= outBytes) {
+      System.arraycopy(compressed, 0, out.array(), out.arrayOffset() +
+          out.position(), outBytes);
+      out.position(out.position() + outBytes);
+    } else {
+      // fill out up to its limit and spill the remaining bytes into overflow
+      System.arraycopy(compressed, 0, out.array(), out.arrayOffset() +
+          out.position(), remaining);
+      out.position(out.limit());
+      System.arraycopy(compressed, remaining, overflow.array(),
+          overflow.arrayOffset(), outBytes - remaining);
+      overflow.position(outBytes - remaining);
+    }
+    return true;
+  }
+
+  @Override
+  public void decompress(ByteBuffer in, ByteBuffer out) throws IOException {
+    int inOffset = in.position();
+    int uncompressLen =
+        decompressor.decompress(in.array(), in.arrayOffset() + inOffset,
+            in.limit() - inOffset, out.array(), out.arrayOffset() + out.position(),
+            out.remaining());
+    out.position(uncompressLen + out.position());
+    out.flip(); // flip so the decompressed bytes can be read
+  }
+
+  @Override
+  public CompressionCodec modify(EnumSet<Modifier> modifiers) {
+    // no modifiers are supported; the same codec is returned unchanged
+    return this;
+  }
+}

http://git-wip-us.apache.org/repos/asf/orc/blob/62fe9504/java/core/src/java/org/apache/orc/impl/ReaderImpl.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/ReaderImpl.java b/java/core/src/java/org/apache/orc/impl/ReaderImpl.java
index 34f743b..baf1335 100644
--- a/java/core/src/java/org/apache/orc/impl/ReaderImpl.java
+++ b/java/core/src/java/org/apache/orc/impl/ReaderImpl.java
@@ -436,12 +436,10 @@ public class ReaderImpl implements Reader {
     // Check compression codec.
     switch (ps.getCompression()) {
       case NONE:
-        break;
       case ZLIB:
-        break;
       case SNAPPY:
-        break;
       case LZO:
+      case LZ4:
         break;
       default:
         throw new IllegalArgumentException("Unknown compression");

http://git-wip-us.apache.org/repos/asf/orc/blob/62fe9504/java/core/src/java/org/apache/orc/impl/SnappyCodec.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/SnappyCodec.java b/java/core/src/java/org/apache/orc/impl/SnappyCodec.java
index dd4f30c..f4d828a 100644
--- a/java/core/src/java/org/apache/orc/impl/SnappyCodec.java
+++ b/java/core/src/java/org/apache/orc/impl/SnappyCodec.java
@@ -18,46 +18,20 @@
 
 package org.apache.orc.impl;
 
-import org.apache.orc.CompressionCodec;
-import org.iq80.snappy.Snappy;
+import io.airlift.compress.snappy.SnappyCompressor;
+import io.airlift.compress.snappy.SnappyDecompressor;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.EnumSet;
 
-public class SnappyCodec implements CompressionCodec, DirectDecompressionCodec {
+public class SnappyCodec extends AircompressorCodec
+    implements DirectDecompressionCodec{
   private static final HadoopShims SHIMS = HadoopShims.Factory.get();
 
   Boolean direct = null;
 
-  @Override
-  public boolean compress(ByteBuffer in, ByteBuffer out,
-                          ByteBuffer overflow) throws IOException {
-    int inBytes = in.remaining();
-    // I should work on a patch for Snappy to support an overflow buffer
-    // to prevent the extra buffer copy.
-    byte[] compressed = new byte[Snappy.maxCompressedLength(inBytes)];
-    int outBytes =
-        Snappy.compress(in.array(), in.arrayOffset() + in.position(), inBytes,
-            compressed, 0);
-    if (outBytes < inBytes) {
-      int remaining = out.remaining();
-      if (remaining >= outBytes) {
-        System.arraycopy(compressed, 0, out.array(), out.arrayOffset() +
-            out.position(), outBytes);
-        out.position(out.position() + outBytes);
-      } else {
-        System.arraycopy(compressed, 0, out.array(), out.arrayOffset() +
-            out.position(), remaining);
-        out.position(out.limit());
-        System.arraycopy(compressed, remaining, overflow.array(),
-            overflow.arrayOffset(), outBytes - remaining);
-        overflow.position(outBytes - remaining);
-      }
-      return true;
-    } else {
-      return false;
-    }
+  SnappyCodec() { // compress and the fallback decompress path come from AircompressorCodec
+    super(new SnappyCompressor(), new SnappyDecompressor());
   }
 
   @Override
@@ -66,12 +40,7 @@ public class SnappyCodec implements CompressionCodec, DirectDecompressionCodec {
       directDecompress(in, out);
       return;
     }
-    int inOffset = in.position();
-    int uncompressLen =
-        Snappy.uncompress(in.array(), in.arrayOffset() + inOffset,
-        in.limit() - inOffset, out.array(), out.arrayOffset() + out.position());
-    out.position(uncompressLen + out.position());
-    out.flip();
+    super.decompress(in, out);
   }
 
   @Override
@@ -99,10 +68,4 @@ public class SnappyCodec implements CompressionCodec, DirectDecompressionCodec {
     decompressShim.decompress(in, out);
     out.flip(); // flip for read
   }
-
-  @Override
-  public CompressionCodec modify(EnumSet<Modifier> modifiers) {
-    // snappy allows no modifications
-    return this;
-  }
 }

http://git-wip-us.apache.org/repos/asf/orc/blob/62fe9504/java/core/src/java/org/apache/orc/impl/WriterImpl.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/WriterImpl.java b/java/core/src/java/org/apache/orc/impl/WriterImpl.java
index fd7fee0..3df1b76 100644
--- a/java/core/src/java/org/apache/orc/impl/WriterImpl.java
+++ b/java/core/src/java/org/apache/orc/impl/WriterImpl.java
@@ -30,6 +30,12 @@ import java.util.Map;
 import java.util.TimeZone;
 import java.util.TreeMap;
 
+import io.airlift.compress.lz4.Lz4Compressor;
+import io.airlift.compress.lz4.Lz4Decompressor;
+import io.airlift.compress.lzo.LzoCompressor;
+import io.airlift.compress.lzo.LzoDecompressor;
+import io.airlift.compress.snappy.SnappyCompressor;
+import io.airlift.compress.snappy.SnappyDecompressor;
 import org.apache.commons.lang.ArrayUtils;
 import org.apache.hadoop.hive.ql.util.JavaDataModel;
 import org.apache.orc.BinaryColumnStatistics;
@@ -249,23 +255,11 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
       case SNAPPY:
         return new SnappyCodec();
       case LZO:
-        try {
-          ClassLoader loader = Thread.currentThread().getContextClassLoader();
-          if (loader == null) {
-            loader = WriterImpl.class.getClassLoader();
-          }
-          @SuppressWarnings("unchecked")
-          Class<? extends CompressionCodec> lzo =
-              (Class<? extends CompressionCodec>)
-              loader.loadClass("org.apache.hadoop.hive.ql.io.orc.LzoCodec");
-          return lzo.newInstance();
-        } catch (ClassNotFoundException e) {
-          throw new IllegalArgumentException("LZO is not available.", e);
-        } catch (InstantiationException e) {
-          throw new IllegalArgumentException("Problem initializing LZO", e);
-        } catch (IllegalAccessException e) {
-          throw new IllegalArgumentException("Insufficient access to LZO", e);
-        }
+        return new AircompressorCodec(new LzoCompressor(),
+            new LzoDecompressor());
+      case LZ4:
+        return new AircompressorCodec(new Lz4Compressor(),
+            new Lz4Decompressor());
       default:
         throw new IllegalArgumentException("Unknown compression codec: " +
             kind);
@@ -2648,6 +2642,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
       case ZLIB: return OrcProto.CompressionKind.ZLIB;
       case SNAPPY: return OrcProto.CompressionKind.SNAPPY;
       case LZO: return OrcProto.CompressionKind.LZO;
+      case LZ4: return OrcProto.CompressionKind.LZ4;
       default:
         throw new IllegalArgumentException("Unknown compression " + kind);
     }

http://git-wip-us.apache.org/repos/asf/orc/blob/62fe9504/java/core/src/test/org/apache/orc/TestVectorOrcFile.java
----------------------------------------------------------------------
diff --git a/java/core/src/test/org/apache/orc/TestVectorOrcFile.java b/java/core/src/test/org/apache/orc/TestVectorOrcFile.java
index 37e463b..31ac1c4 100644
--- a/java/core/src/test/org/apache/orc/TestVectorOrcFile.java
+++ b/java/core/src/test/org/apache/orc/TestVectorOrcFile.java
@@ -1692,6 +1692,104 @@ public class TestVectorOrcFile {
   }
 
   /**
+   * Write and read back a randomly generated LZO-compressed file.
+   * @throws Exception
+   */
+  @Test
+  public void testLzo() throws Exception {
+    TypeDescription schema =
+        TypeDescription.fromString("struct<x:bigint,y:double,z:bigint>");
+    Writer writer = OrcFile.createWriter(testFilePath,
+        OrcFile.writerOptions(conf)
+            .setSchema(schema)
+            .stripeSize(1000)
+            .compress(CompressionKind.LZO)
+            .bufferSize(100));
+    VectorizedRowBatch batch = schema.createRowBatch();
+    Random rand = new Random(69); // fixed seed, replayed when reading back
+    batch.size = 1000;
+    for(int b=0; b < 10; ++b) {
+      for (int r=0; r < 1000; ++r) {
+        ((LongColumnVector) batch.cols[0]).vector[r] = rand.nextInt();
+        ((DoubleColumnVector) batch.cols[1]).vector[r] = rand.nextDouble();
+        ((LongColumnVector) batch.cols[2]).vector[r] = rand.nextLong();
+      }
+      writer.addRowBatch(batch);
+    }
+    writer.close();
+    Reader reader = OrcFile.createReader(testFilePath,
+        OrcFile.readerOptions(conf).filesystem(fs));
+    assertEquals(CompressionKind.LZO, reader.getCompressionKind());
+    RecordReader rows = reader.rows();
+    batch = reader.getSchema().createRowBatch(1000);
+    rand = new Random(69); // same seed regenerates the expected values
+    for(int b=0; b < 10; ++b) {
+      rows.nextBatch(batch);
+      assertEquals(1000, batch.size);
+      for(int r=0; r < batch.size; ++r) {
+        assertEquals(rand.nextInt(),
+            ((LongColumnVector) batch.cols[0]).vector[r]);
+        assertEquals(rand.nextDouble(),
+            ((DoubleColumnVector) batch.cols[1]).vector[r], 0.00001);
+        assertEquals(rand.nextLong(),
+            ((LongColumnVector) batch.cols[2]).vector[r]);
+      }
+    }
+    rows.nextBatch(batch);
+    assertEquals(0, batch.size); // nothing beyond the 10 batches written
+    rows.close();
+  }
+
+  /**
+   * Write and read back a randomly generated LZ4-compressed file.
+   * @throws Exception
+   */
+  @Test
+  public void testLz4() throws Exception {
+    TypeDescription schema =
+        TypeDescription.fromString("struct<x:bigint,y:double,z:bigint>");
+    Writer writer = OrcFile.createWriter(testFilePath,
+        OrcFile.writerOptions(conf)
+            .setSchema(schema)
+            .stripeSize(1000)
+            .compress(CompressionKind.LZ4)
+            .bufferSize(100));
+    VectorizedRowBatch batch = schema.createRowBatch();
+    Random rand = new Random(3); // fixed seed, replayed when reading back
+    batch.size = 1000;
+    for(int b=0; b < 10; ++b) {
+      for (int r=0; r < 1000; ++r) {
+        ((LongColumnVector) batch.cols[0]).vector[r] = rand.nextInt();
+        ((DoubleColumnVector) batch.cols[1]).vector[r] = rand.nextDouble();
+        ((LongColumnVector) batch.cols[2]).vector[r] = rand.nextLong();
+      }
+      writer.addRowBatch(batch);
+    }
+    writer.close();
+    Reader reader = OrcFile.createReader(testFilePath,
+        OrcFile.readerOptions(conf).filesystem(fs));
+    assertEquals(CompressionKind.LZ4, reader.getCompressionKind());
+    RecordReader rows = reader.rows();
+    batch = reader.getSchema().createRowBatch(1000);
+    rand = new Random(3); // same seed regenerates the expected values
+    for(int b=0; b < 10; ++b) {
+      rows.nextBatch(batch);
+      assertEquals(1000, batch.size);
+      for(int r=0; r < batch.size; ++r) {
+        assertEquals(rand.nextInt(),
+            ((LongColumnVector) batch.cols[0]).vector[r]);
+        assertEquals(rand.nextDouble(),
+            ((DoubleColumnVector) batch.cols[1]).vector[r], 0.00001);
+        assertEquals(rand.nextLong(),
+            ((LongColumnVector) batch.cols[2]).vector[r]);
+      }
+    }
+    rows.nextBatch(batch);
+    assertEquals(0, batch.size); // nothing beyond the 10 batches written
+    rows.close();
+  }
+
+  /**
    * Read and write a randomly generated snappy file.
    * @throws Exception
    */

http://git-wip-us.apache.org/repos/asf/orc/blob/62fe9504/java/pom.xml
----------------------------------------------------------------------
diff --git a/java/pom.xml b/java/pom.xml
index 8b42fe5..5f35323 100644
--- a/java/pom.xml
+++ b/java/pom.xml
@@ -246,6 +246,11 @@
         <version>2.6</version>
       </dependency>
       <dependency>
+        <groupId>io.airlift</groupId>
+        <artifactId>aircompressor</artifactId>
+        <version>0.3</version>
+      </dependency>
+      <dependency>
         <groupId>org.apache.hadoop</groupId>
         <artifactId>hadoop-common</artifactId>
         <version>${hadoop.version}</version>
@@ -349,11 +354,6 @@
         <version>1.1</version>
       </dependency>
       <dependency>
-        <groupId>org.iq80.snappy</groupId>
-        <artifactId>snappy</artifactId>
-        <version>0.2</version>
-      </dependency>
-      <dependency>
         <groupId>org.slf4j</groupId>
         <artifactId>slf4j-api</artifactId>
         <version>1.7.5</version>


Mime
View raw message