activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [1/2] activemq-artemis git commit: ARTEMIS-1928 Fixing body conversion of LargeMessages to AMQP
Date Fri, 22 Jun 2018 19:48:45 GMT
Repository: activemq-artemis
Updated Branches:
  refs/heads/master 532317cef -> 2c8b6b4ae


ARTEMIS-1928 Fixing body conversion of LargeMessages to AMQP


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/efd966d8
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/efd966d8
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/efd966d8

Branch: refs/heads/master
Commit: efd966d88d1a3ff4612c29729c8c717f075de943
Parents: 532317c
Author: Clebert Suconic <clebertsuconic@apache.org>
Authored: Fri Jun 22 15:37:42 2018 -0400
Committer: Clebert Suconic <clebertsuconic@apache.org>
Committed: Fri Jun 22 15:48:15 2018 -0400

----------------------------------------------------------------------
 .../amqp/converter/CoreAmqpConverter.java       |  9 ++---
 .../ProtocolsMessageLoadBalancingTest.java      | 37 ++++++++++++++++++--
 2 files changed, 39 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/efd966d8/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/CoreAmqpConverter.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/CoreAmqpConverter.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/CoreAmqpConverter.java
index 66c75a8..49372db 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/CoreAmqpConverter.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/CoreAmqpConverter.java
@@ -60,6 +60,7 @@ import javax.jms.TemporaryTopic;
 import javax.jms.TextMessage;
 import javax.jms.Topic;
 
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ICoreMessage;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
@@ -381,17 +382,17 @@ public class CoreAmqpConverter {
          // will be unknown so we check for special cases of messages with special data
          // encoded into the server message body.
          ICoreMessage internalMessage = message.getInnerMessage();
-         int readerIndex = internalMessage.getBodyBuffer().readerIndex();
+
+         // this will represent a readOnly buffer for the message
+         ActiveMQBuffer buffer = internalMessage.getDataBuffer();
          try {
-            Object s = internalMessage.getBodyBuffer().readNullableSimpleString();
+            Object s = buffer.readNullableSimpleString();
             if (s != null) {
                body = new AmqpValue(s.toString());
             }
          } catch (Throwable ignored) {
             logger.debug("Exception ignored during conversion", ignored.getMessage(), ignored);
             body = new AmqpValue("Conversion to AMQP error!");
-         } finally {
-            internalMessage.getBodyBuffer().readerIndex(readerIndex);
          }
       }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/efd966d8/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/crossprotocol/ProtocolsMessageLoadBalancingTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/crossprotocol/ProtocolsMessageLoadBalancingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/crossprotocol/ProtocolsMessageLoadBalancingTest.java
index ae41392..ae4f013 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/crossprotocol/ProtocolsMessageLoadBalancingTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/crossprotocol/ProtocolsMessageLoadBalancingTest.java
@@ -29,6 +29,12 @@ import java.util.Collection;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
+import org.apache.activemq.artemis.api.core.client.ClientMessage;
+import org.apache.activemq.artemis.api.core.client.ClientProducer;
+import org.apache.activemq.artemis.api.core.client.ClientSession;
+import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
+import org.apache.activemq.artemis.api.core.client.ServerLocator;
 import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
 import org.apache.activemq.artemis.core.protocol.openwire.OpenWireProtocolManagerFactory;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
@@ -392,9 +398,34 @@ public class ProtocolsMessageLoadBalancingTest extends ClusterTestBase {
       waitForBindings(1, "queues.0", 1, 1, false);
 
 
-      // sending Messages.. they should be load balanced
-      {
-         ConnectionFactory cf =  getJmsConnectionFactory(0);
+
+      if (protocol.equals("AMQP")) {
+
+
+         ServerLocator locator = ActiveMQClient.createServerLocator("tcp://localhost:61616");
+         locator.setMinLargeMessageSize(1024);
+         ClientSessionFactory coreFactory = locator.createSessionFactory();
+         ClientSession clientSession = coreFactory.createSession();
+         ClientProducer producer = clientSession.createProducer(queueName);
+         for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
+            ClientMessage message = clientSession.createMessage((byte)0, true);
+            StringBuffer stringbuffer = new StringBuffer();
+            stringbuffer.append("hello");
+            if (i % 3 == 0) {
+               // making 1/3 of the messages to be large message
+               for (int j = 0; j < 10 * 1024; j++) {
+                  stringbuffer.append(" ");
+               }
+            }
+            message.getBodyBuffer().writeUTF(stringbuffer.toString());
+            producer.send(message);
+         }
+         coreFactory.close();
+
+      } else {
+
+         // sending Messages.. they should be load balanced
+         ConnectionFactory cf = getJmsConnectionFactory(0);
          Connection cn = cf.createConnection();
          Session sn = cn.createSession(false, Session.AUTO_ACKNOWLEDGE);
          MessageProducer pd = sn.createProducer(sn.createQueue(queueName.toString()));


Mime
View raw message