activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dej...@apache.org
Subject git commit: https://issues.apache.org/jira/browse/AMQ-4182 - explicitly comporess/decompress byte messages, so we can avoid using finalize() to close streams
Date Wed, 23 Apr 2014 10:49:28 GMT
Repository: activemq
Updated Branches:
  refs/heads/trunk fad1dd0f1 -> 44bb9fbea


https://issues.apache.org/jira/browse/AMQ-4182 - explicitly comporess/decompress byte messages,
so we can avoid using finalize() to close streams


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/44bb9fbe
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/44bb9fbe
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/44bb9fbe

Branch: refs/heads/trunk
Commit: 44bb9fbeaec91ea4fd81bbc387706c45229185a7
Parents: fad1dd0
Author: Dejan Bosanac <dejan@nighttale.net>
Authored: Wed Apr 23 12:45:40 2014 +0200
Committer: Dejan Bosanac <dejan@nighttale.net>
Committed: Wed Apr 23 12:46:01 2014 +0200

----------------------------------------------------------------------
 .../activemq/command/ActiveMQBytesMessage.java  | 189 +++++++++----------
 1 file changed, 87 insertions(+), 102 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/44bb9fbe/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQBytesMessage.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQBytesMessage.java
b/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQBytesMessage.java
index 923e0e1..65e1036 100755
--- a/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQBytesMessage.java
+++ b/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQBytesMessage.java
@@ -23,8 +23,10 @@ import java.io.FilterOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.util.Arrays;
 import java.util.zip.Deflater;
 import java.util.zip.DeflaterOutputStream;
+import java.util.zip.Inflater;
 import java.util.zip.InflaterInputStream;
 
 import javax.jms.BytesMessage;
@@ -124,22 +126,29 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements
BytesMessag
 
     @Override
     public void storeContent() {
-        try {
-            if (dataOut != null) {
+        if (dataOut != null) {
+            try {
                 dataOut.close();
                 ByteSequence bs = bytesOut.toByteSequence();
+                setContent(bs);
                 if (compressed) {
-                    int pos = bs.offset;
-                    ByteSequenceData.writeIntBig(bs, length);
-                    bs.offset = pos;
+                    doCompress();
+                }
+            } catch (IOException ioe) {
+                throw new RuntimeException(ioe.getMessage(), ioe);
+            } finally {
+                try {
+                    if (bytesOut != null) {
+                        bytesOut.close();
+                        bytesOut = null;
+                    }
+                    if (dataOut != null) {
+                        dataOut.close();
+                        dataOut = null;
+                    }
+                } catch (IOException ioe) {
                 }
-                setContent(bs);
-                bytesOut = null;
-                dataOut = null;
             }
-        } catch (IOException ioe) {
-            throw new RuntimeException(ioe.getMessage(), ioe); // TODO verify
-                                                                // RuntimeException
         }
     }
 
@@ -798,17 +807,23 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements
BytesMessag
     @Override
     public void reset() throws JMSException {
         storeContent();
-        this.bytesOut = null;
-        if (dataIn != null) {
-            try {
-                // Eagerly release potential Inflater memory buffers.
+        setReadOnlyBody(true);
+        try {
+            if (bytesOut != null) {
+                bytesOut.close();
+                bytesOut = null;
+            }
+            if (dataIn != null) {
                 dataIn.close();
-            } catch (Exception e) {
+                dataIn = null;
+            }
+            if (dataOut != null) {
+                dataOut.close();
+                dataOut = null;
             }
+        } catch (IOException ioe) {
+            throw JMSExceptionSupport.create(ioe);
         }
-        this.dataIn = null;
-        this.dataOut = null;
-        setReadOnlyBody(true);
     }
 
     private void initializeWriting() throws JMSException {
@@ -816,47 +831,14 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements
BytesMessag
         if (this.dataOut == null) {
             this.bytesOut = new ByteArrayOutputStream();
             OutputStream os = bytesOut;
-            ActiveMQConnection connection = getConnection();
-            if (connection != null && connection.isUseCompression()) {
-                // keep track of the real length of the content if
-                // we are compressed.
-                try {
-                    os.write(new byte[4]);
-                } catch (IOException e) {
-                    throw JMSExceptionSupport.create(e);
-                }
-                length = 0;
-                compressed = true;
-                final Deflater deflater = new Deflater(Deflater.BEST_SPEED);
-                os = new FilterOutputStream(new DeflaterOutputStream(os, deflater)) {
-                    @Override
-                    public void write(byte[] arg0) throws IOException {
-                        length += arg0.length;
-                        out.write(arg0);
-                    }
-
-                    @Override
-                    public void write(byte[] arg0, int arg1, int arg2) throws IOException
{
-                        length += arg2;
-                        out.write(arg0, arg1, arg2);
-                    }
-
-                    @Override
-                    public void write(int arg0) throws IOException {
-                        length++;
-                        out.write(arg0);
-                    }
-
-                    @Override
-                    public void close() throws IOException {
-                        super.close();
-                        deflater.end();
-                    }
-                };
-            }
             this.dataOut = new DataOutputStream(os);
         }
 
+        ActiveMQConnection connection = getConnection();
+        if (connection != null && connection.isUseCompression()) {
+            compressed = true;
+        }
+
         restoreOldContent();
     }
 
@@ -867,21 +849,7 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements
BytesMessag
             try {
                 ByteSequence toRestore = this.content;
                 if (compressed) {
-                    InputStream is = new ByteArrayInputStream(toRestore);
-                    int length = 0;
-                    try {
-                        DataInputStream dis = new DataInputStream(is);
-                        length = dis.readInt();
-                        dis.close();
-                    } catch (IOException e) {
-                        throw JMSExceptionSupport.create(e);
-                    }
-                    is = new InflaterInputStream(is);
-                    DataInputStream input = new DataInputStream(is);
-
-                    byte[] buffer = new byte[length];
-                    input.readFully(buffer);
-                    toRestore = new ByteSequence(buffer);
+                    toRestore = new ByteSequence(decompress(this.content));
                 }
 
                 this.dataOut.write(toRestore.getData(), toRestore.getOffset(), toRestore.getLength());
@@ -903,26 +871,44 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements
BytesMessag
     private void initializeReading() throws JMSException {
         checkWriteOnlyBody();
         if (dataIn == null) {
+            try {
             ByteSequence data = getContent();
             if (data == null) {
                 data = new ByteSequence(new byte[] {}, 0, 0);
             }
             InputStream is = new ByteArrayInputStream(data);
             if (isCompressed()) {
-                // keep track of the real length of the content if
-                // we are compressed.
-                try {
-                    DataInputStream dis = new DataInputStream(is);
-                    length = dis.readInt();
-                    dis.close();
-                } catch (IOException e) {
-                    throw JMSExceptionSupport.create(e);
+                if (data.length != 0) {
+                    is = new ByteArrayInputStream(decompress(data));
                 }
-                is = new InflaterInputStream(is);
             } else {
                 length = data.getLength();
             }
+
             dataIn = new DataInputStream(is);
+            } catch (IOException ioe) {
+                throw JMSExceptionSupport.create(ioe);
+            }
+        }
+    }
+
+    protected byte[] decompress(ByteSequence dataSequence) throws IOException {
+        Inflater inflater = new Inflater();
+        ByteArrayOutputStream decompressed = new ByteArrayOutputStream();
+        try {
+            length = ByteSequenceData.readIntBig(dataSequence);
+            dataSequence.offset = 0;
+            byte[] data = Arrays.copyOfRange(dataSequence.getData(), 4, dataSequence.getLength());
+            inflater.setInput(data);
+            byte[] buffer = new byte[length];
+            int count = inflater.inflate(buffer);
+            decompressed.write(buffer, 0, count);
+            return decompressed.toByteArray();
+        } catch (Exception e) {
+            throw new IOException(e);
+        } finally {
+            inflater.end();
+            decompressed.close();
         }
     }
 
@@ -941,28 +927,27 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements
BytesMessag
     protected void doCompress() throws IOException {
         compressed = true;
         ByteSequence bytes = getContent();
-        int length = bytes.getLength();
-        ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
-        bytesOut.write(new byte[4]);
-        DeflaterOutputStream os = new DeflaterOutputStream(bytesOut);
-        DataOutputStream dataOut = new DataOutputStream(os);
-        dataOut.write(bytes.data, bytes.offset, bytes.length);
-        dataOut.flush();
-        dataOut.close();
-        bytes = bytesOut.toByteSequence();
-        ByteSequenceData.writeIntBig(bytes, length);
-        bytes.offset = 0;
-        setContent(bytes);
-    }
-
-    @Override
-    protected void finalize() throws Throwable {
-        // Attempt to do eager close in case of compressed data which uses a
-        // wrapped InflaterInputStream.
-        if (dataIn != null) {
+        if (bytes != null) {
+            int length = bytes.getLength();
+            ByteArrayOutputStream compressed = new ByteArrayOutputStream();
+            compressed.write(new byte[4]);
+            Deflater deflater = new Deflater();
             try {
-                dataIn.close();
-            } catch(Exception e) {
+                deflater.setInput(bytes.data);
+                deflater.finish();
+                byte[] buffer = new byte[1024];
+                while (!deflater.finished()) {
+                    int count = deflater.deflate(buffer);
+                    compressed.write(buffer, 0, count);
+                }
+
+                bytes = compressed.toByteSequence();
+                ByteSequenceData.writeIntBig(bytes, length);
+                bytes.offset = 0;
+                setContent(bytes);
+            } finally {
+                deflater.end();
+                compressed.close();
             }
         }
     }


Mime
View raw message