avro-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fo...@apache.org
Subject [avro] branch master updated: AVRO-2262 Java compression codec fixes. (#352)
Date Mon, 12 Nov 2018 11:18:13 GMT
This is an automated email from the ASF dual-hosted git repository.

fokko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/avro.git


The following commit(s) were added to refs/heads/master by this push:
     new 07bd95a  AVRO-2262 Java compression codec fixes. (#352)
07bd95a is described below

commit 07bd95a82f93458196d44f8e285435a865d3d70d
Author: jacobtolar <accounts@sheckel.net>
AuthorDate: Mon Nov 12 05:18:09 2018 -0600

    AVRO-2262 Java compression codec fixes. (#352)
    
    AVRO-2262 Java compression codec improvements
---
 .../main/java/org/apache/avro/file/BZip2Codec.java | 34 +++++++---------------
 .../src/main/java/org/apache/avro/file/Codec.java  | 12 ++++++++
 .../java/org/apache/avro/file/CodecFactory.java    | 12 ++++----
 .../java/org/apache/avro/file/DeflateCodec.java    | 27 +++++------------
 .../java/org/apache/avro/file/SnappyCodec.java     | 13 +++++----
 .../main/java/org/apache/avro/file/XZCodec.java    | 24 ++++-----------
 .../java/org/apache/avro/file/ZstandardCodec.java  | 29 +++++-------------
 7 files changed, 57 insertions(+), 94 deletions(-)

diff --git a/lang/java/avro/src/main/java/org/apache/avro/file/BZip2Codec.java b/lang/java/avro/src/main/java/org/apache/avro/file/BZip2Codec.java
index e2dbc09..1d1f4ed 100644
--- a/lang/java/avro/src/main/java/org/apache/avro/file/BZip2Codec.java
+++ b/lang/java/avro/src/main/java/org/apache/avro/file/BZip2Codec.java
@@ -29,6 +29,8 @@ import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream
 public class BZip2Codec extends Codec {
 
   public static final int DEFAULT_BUFFER_SIZE = 64 * 1024;
+  private final byte[] buffer = new byte[DEFAULT_BUFFER_SIZE];
+
   private ByteArrayOutputStream outputBuffer;
 
   static class Option extends CodecFactory {
@@ -43,41 +45,27 @@ public class BZip2Codec extends Codec {
 
   @Override
   public ByteBuffer compress(ByteBuffer uncompressedData) throws IOException {
-
     ByteArrayOutputStream baos = getOutputBuffer(uncompressedData.remaining());
-    BZip2CompressorOutputStream outputStream = new BZip2CompressorOutputStream(baos);
-
-    try {
-      outputStream.write(uncompressedData.array(),
-                         uncompressedData.position(),
-                         uncompressedData.remaining());
-    } finally {
-      outputStream.close();
+
+    try (BZip2CompressorOutputStream outputStream = new BZip2CompressorOutputStream(baos)) {
+        outputStream.write(uncompressedData.array(), computeOffset(uncompressedData), uncompressedData.remaining());
     }
 
-    ByteBuffer result = ByteBuffer.wrap(baos.toByteArray());
-    return result;
+    return ByteBuffer.wrap(baos.toByteArray());
   }
 
   @Override
   public ByteBuffer decompress(ByteBuffer compressedData) throws IOException {
-    ByteArrayInputStream bais = new ByteArrayInputStream(compressedData.array());
-    BZip2CompressorInputStream inputStream = new BZip2CompressorInputStream(bais);
-    try {
+    ByteArrayInputStream bais = new ByteArrayInputStream(compressedData.array(), computeOffset(compressedData), compressedData.remaining());
+    try(BZip2CompressorInputStream inputStream = new BZip2CompressorInputStream(bais)) {
       ByteArrayOutputStream baos = new ByteArrayOutputStream();
 
-      byte[] buffer = new byte[DEFAULT_BUFFER_SIZE];
-
       int readCount = -1;
-
-      while ( (readCount = inputStream.read(buffer, compressedData.position(), buffer.length))> 0) {
+      while ((readCount = inputStream.read(buffer, compressedData.position(), buffer.length)) > 0) {
         baos.write(buffer, 0, readCount);
       }
 
-      ByteBuffer result = ByteBuffer.wrap(baos.toByteArray());
-      return result;
-    } finally {
-      inputStream.close();
+      return ByteBuffer.wrap(baos.toByteArray());
     }
   }
 
@@ -100,6 +88,4 @@ public class BZip2Codec extends Codec {
     outputBuffer.reset();
     return outputBuffer;
   }
-
-
 }
diff --git a/lang/java/avro/src/main/java/org/apache/avro/file/Codec.java b/lang/java/avro/src/main/java/org/apache/avro/file/Codec.java
index bd335c9..d462139 100644
--- a/lang/java/avro/src/main/java/org/apache/avro/file/Codec.java
+++ b/lang/java/avro/src/main/java/org/apache/avro/file/Codec.java
@@ -22,6 +22,9 @@ import java.nio.ByteBuffer;
 
 /**
  * Interface for Avro-supported compression codecs for data files.
+ *
+ * Note that Codec objects may maintain internal state (e.g. buffers)
+ * and are not thread safe.
  */
 public abstract class Codec {
   /** Name of the codec; written to the file's metadata. */
@@ -37,12 +40,21 @@ public abstract class Codec {
    **/
   @Override
   public abstract boolean equals(Object other);
+
   /**
    * Codecs must implement a hashCode() method that is consistent with equals().*/
   @Override
   public abstract int hashCode();
+
   @Override
   public String toString() {
     return getName();
   }
+
+  // Codecs often reference the array inside a ByteBuffer. Compute the offset
+  // to the start of data correctly in the case that our ByteBuffer
+  // is a slice() of another.
+  protected static int computeOffset(ByteBuffer data) {
+      return data.arrayOffset() + data.position();
+  }
 }
diff --git a/lang/java/avro/src/main/java/org/apache/avro/file/CodecFactory.java b/lang/java/avro/src/main/java/org/apache/avro/file/CodecFactory.java
index db51fc6..238e8a4 100644
--- a/lang/java/avro/src/main/java/org/apache/avro/file/CodecFactory.java
+++ b/lang/java/avro/src/main/java/org/apache/avro/file/CodecFactory.java
@@ -83,12 +83,12 @@ public abstract class CodecFactory {
   public static final int DEFAULT_XZ_LEVEL = LZMA2Options.PRESET_DEFAULT;
 
   static {
-    addCodec("null", nullCodec());
-    addCodec("deflate", deflateCodec(DEFAULT_DEFLATE_LEVEL));
-    addCodec("snappy", snappyCodec());
-    addCodec("bzip2", bzip2Codec());
-    addCodec("xz", xzCodec(DEFAULT_XZ_LEVEL));
-    addCodec("zstandard", zstandardCodec());
+    addCodec(DataFileConstants.NULL_CODEC, nullCodec());
+    addCodec(DataFileConstants.DEFLATE_CODEC, deflateCodec(DEFAULT_DEFLATE_LEVEL));
+    addCodec(DataFileConstants.SNAPPY_CODEC, snappyCodec());
+    addCodec(DataFileConstants.BZIP2_CODEC, bzip2Codec());
+    addCodec(DataFileConstants.XZ_CODEC, xzCodec(DEFAULT_XZ_LEVEL));
+    addCodec(DataFileConstants.ZSTANDARD_CODEC, zstandardCodec());
   }
 
   /** Maps a codec name into a CodecFactory.
diff --git a/lang/java/avro/src/main/java/org/apache/avro/file/DeflateCodec.java b/lang/java/avro/src/main/java/org/apache/avro/file/DeflateCodec.java
index 7080d65..5a3f927 100644
--- a/lang/java/avro/src/main/java/org/apache/avro/file/DeflateCodec.java
+++ b/lang/java/avro/src/main/java/org/apache/avro/file/DeflateCodec.java
@@ -36,7 +36,7 @@ import java.util.zip.InflaterOutputStream;
  * {@link Inflater} and {@link Deflater}, is using
  * RFC1951.
  */
-class DeflateCodec extends Codec {
+public class DeflateCodec extends Codec {
 
   static class Option extends CodecFactory {
     private int compressionLevel;
@@ -70,30 +70,19 @@ class DeflateCodec extends Codec {
   @Override
   public ByteBuffer compress(ByteBuffer data) throws IOException {
     ByteArrayOutputStream baos = getOutputBuffer(data.remaining());
-    DeflaterOutputStream ios = new DeflaterOutputStream(baos, getDeflater());
-    writeAndClose(data, ios);
-    ByteBuffer result = ByteBuffer.wrap(baos.toByteArray());
-    return result;
+    try(OutputStream outputStream = new DeflaterOutputStream(baos, getDeflater())) {
+      outputStream.write(data.array(), computeOffset(data), data.remaining());
+    }
+    return ByteBuffer.wrap(baos.toByteArray());
   }
 
   @Override
   public ByteBuffer decompress(ByteBuffer data) throws IOException {
     ByteArrayOutputStream baos = getOutputBuffer(data.remaining());
-    InflaterOutputStream ios = new InflaterOutputStream(baos, getInflater());
-    writeAndClose(data, ios);
-    ByteBuffer result = ByteBuffer.wrap(baos.toByteArray());
-    return result;
-  }
-
-  private void writeAndClose(ByteBuffer data, OutputStream to) throws IOException {
-    byte[] input = data.array();
-    int offset = data.arrayOffset() + data.position();
-    int length = data.remaining();
-    try {
-      to.write(input, offset, length);
-    } finally {
-      to.close();
+    try(OutputStream outputStream = new InflaterOutputStream(baos, getInflater())) {
+      outputStream.write(data.array(), computeOffset(data), data.remaining());
     }
+    return ByteBuffer.wrap(baos.toByteArray());
   }
 
   // get and initialize the inflater for use.
diff --git a/lang/java/avro/src/main/java/org/apache/avro/file/SnappyCodec.java b/lang/java/avro/src/main/java/org/apache/avro/file/SnappyCodec.java
index 3c75bb7..04f7218 100644
--- a/lang/java/avro/src/main/java/org/apache/avro/file/SnappyCodec.java
+++ b/lang/java/avro/src/main/java/org/apache/avro/file/SnappyCodec.java
@@ -24,7 +24,7 @@ import java.util.zip.CRC32;
 import org.xerial.snappy.Snappy;
 
 /** * Implements Snappy compression and decompression. */
-class SnappyCodec extends Codec {
+public class SnappyCodec extends Codec {
   private CRC32 crc32 = new CRC32();
 
   static class Option extends CodecFactory {
@@ -40,12 +40,13 @@ class SnappyCodec extends Codec {
 
   @Override
   public ByteBuffer compress(ByteBuffer in) throws IOException {
+    int offset = computeOffset(in);
     ByteBuffer out =
       ByteBuffer.allocate(Snappy.maxCompressedLength(in.remaining())+4);
-    int size = Snappy.compress(in.array(), in.position(), in.remaining(),
+    int size = Snappy.compress(in.array(), offset, in.remaining(),
                                out.array(), 0);
     crc32.reset();
-    crc32.update(in.array(), in.position(), in.remaining());
+    crc32.update(in.array(), offset, in.remaining());
     out.putInt(size, (int)crc32.getValue());
 
     out.limit(size+4);
@@ -55,9 +56,10 @@ class SnappyCodec extends Codec {
 
   @Override
   public ByteBuffer decompress(ByteBuffer in) throws IOException {
+    int offset = computeOffset(in);
     ByteBuffer out = ByteBuffer.allocate
-      (Snappy.uncompressedLength(in.array(),in.position(),in.remaining()-4));
-    int size = Snappy.uncompress(in.array(),in.position(),in.remaining()-4,
+      (Snappy.uncompressedLength(in.array(), offset, in.remaining()-4));
+    int size = Snappy.uncompress(in.array(), offset, in.remaining()-4,
                                  out.array(), 0);
     out.limit(size);
 
@@ -79,5 +81,4 @@ class SnappyCodec extends Codec {
       return false;
     return true;
   }
-
 }
diff --git a/lang/java/avro/src/main/java/org/apache/avro/file/XZCodec.java b/lang/java/avro/src/main/java/org/apache/avro/file/XZCodec.java
index 6586818..92a742a 100644
--- a/lang/java/avro/src/main/java/org/apache/avro/file/XZCodec.java
+++ b/lang/java/avro/src/main/java/org/apache/avro/file/XZCodec.java
@@ -59,8 +59,9 @@ public class XZCodec extends Codec {
   @Override
   public ByteBuffer compress(ByteBuffer data) throws IOException {
     ByteArrayOutputStream baos = getOutputBuffer(data.remaining());
-    OutputStream ios = new XZCompressorOutputStream(baos, compressionLevel);
-    writeAndClose(data, ios);
+    try (OutputStream outputStream = new XZCompressorOutputStream(baos, compressionLevel)) {
+      outputStream.write(data.array(), computeOffset(data), data.remaining());
+    }
     return ByteBuffer.wrap(baos.toByteArray());
   }
 
@@ -69,28 +70,15 @@ public class XZCodec extends Codec {
     ByteArrayOutputStream baos = getOutputBuffer(data.remaining());
     InputStream bytesIn = new ByteArrayInputStream(
       data.array(),
-      data.arrayOffset() + data.position(),
+      computeOffset(data),
       data.remaining());
-    InputStream ios = new XZCompressorInputStream(bytesIn);
-    try {
+
+    try (InputStream ios = new XZCompressorInputStream(bytesIn)) {
       IOUtils.copy(ios, baos);
-    } finally {
-      ios.close();
     }
     return ByteBuffer.wrap(baos.toByteArray());
   }
 
-  private void writeAndClose(ByteBuffer data, OutputStream to) throws IOException {
-    byte[] input = data.array();
-    int offset = data.arrayOffset() + data.position();
-    int length = data.remaining();
-    try {
-      to.write(input, offset, length);
-    } finally {
-      to.close();
-    }
-  }
-
   // get and initialize the output buffer for use.
   private ByteArrayOutputStream getOutputBuffer(int suggestedLength) {
     if (null == outputBuffer) {
diff --git a/lang/java/avro/src/main/java/org/apache/avro/file/ZstandardCodec.java b/lang/java/avro/src/main/java/org/apache/avro/file/ZstandardCodec.java
index 4ec8433..deea4b8 100644
--- a/lang/java/avro/src/main/java/org/apache/avro/file/ZstandardCodec.java
+++ b/lang/java/avro/src/main/java/org/apache/avro/file/ZstandardCodec.java
@@ -46,10 +46,11 @@ public class ZstandardCodec extends Codec {
     }
 
     @Override
-    public ByteBuffer compress(ByteBuffer uncompressedData) throws IOException {
-        ByteArrayOutputStream baos = getOutputBuffer(uncompressedData.remaining());
-        OutputStream outputStream = new ZstdCompressorOutputStream(baos);
-        writeAndClose(uncompressedData, outputStream);
+    public ByteBuffer compress(ByteBuffer data) throws IOException {
+        ByteArrayOutputStream baos = getOutputBuffer(data.remaining());
+        try (OutputStream outputStream = new ZstdCompressorOutputStream(baos)) {
+           outputStream.write(data.array(), computeOffset(data), data.remaining());
+        }
         return ByteBuffer.wrap(baos.toByteArray());
     }
 
@@ -58,28 +59,14 @@ public class ZstandardCodec extends Codec {
         ByteArrayOutputStream baos = getOutputBuffer(compressedData.remaining());
         InputStream bytesIn = new ByteArrayInputStream(
           compressedData.array(),
-          compressedData.arrayOffset() + compressedData.position(),
+          computeOffset(compressedData),
           compressedData.remaining());
-        InputStream ios = new ZstdCompressorInputStream(bytesIn);
-        try {
-          IOUtils.copy(ios, baos);
-        } finally {
-          ios.close();
+        try (InputStream ios = new ZstdCompressorInputStream(bytesIn)) {
+            IOUtils.copy(ios, baos);
         }
         return ByteBuffer.wrap(baos.toByteArray());
     }
 
-    private void writeAndClose(ByteBuffer data, OutputStream to) throws IOException {
-        byte[] input = data.array();
-        int offset = data.arrayOffset() + data.position();
-        int length = data.remaining();
-        try {
-          to.write(input, offset, length);
-        } finally {
-          to.close();
-        }
-      }
-
     // get and initialize the output buffer for use.
     private ByteArrayOutputStream getOutputBuffer(int suggestedLength) {
       if (outputBuffer == null) {


Mime
View raw message