activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject activemq git commit: AMQ-7001 - amqp jms transformer - always copy message if properties need to be modified, fix and long running test that is disabled
Date Tue, 26 Jun 2018 14:26:10 GMT
Repository: activemq
Updated Branches:
  refs/heads/master 5ebee0ace -> ed8860000


AMQ-7001 - amqp jms transformer - always copy message if properties need to be modified, fix
and long running test that is disabled


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/ed886000
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/ed886000
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/ed886000

Branch: refs/heads/master
Commit: ed8860000f6d9c0dcd71206309ba27a34db2c606
Parents: 5ebee0a
Author: gtully <gary.tully@gmail.com>
Authored: Tue Jun 26 15:25:52 2018 +0100
Committer: gtully <gary.tully@gmail.com>
Committed: Tue Jun 26 15:25:52 2018 +0100

----------------------------------------------------------------------
 .../transport/amqp/protocol/AmqpSender.java     |  14 +-
 ...ireToAmqpConcurrentStoreAndDispatchTest.java | 182 +++++++++++++++++++
 2 files changed, 184 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/ed886000/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java
b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java
index 5ac95b2..17185a0 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java
@@ -449,19 +449,9 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
 
                 ActiveMQMessage temp = null;
                 if (md.getMessage() != null) {
-
-                    // Topics can dispatch the same Message to more than one consumer
-                    // so we must copy to prevent concurrent read / write to the same
-                    // message object.
-                    if (md.getDestination().isTopic()) {
-                        synchronized (md.getMessage()) {
-                            temp = (ActiveMQMessage) md.getMessage().copy();
-                        }
-                    } else {
-                        temp = (ActiveMQMessage) md.getMessage();
-                    }
-
+                    temp = (ActiveMQMessage) md.getMessage();
                     if (!temp.getProperties().containsKey(JMS_AMQP_MESSAGE_FORMAT)) {
+                        temp = (ActiveMQMessage) md.getMessage().copy();
                         temp.setProperty(JMS_AMQP_MESSAGE_FORMAT, 0);
                     }
                 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/ed886000/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/OpenWireToAmqpConcurrentStoreAndDispatchTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/OpenWireToAmqpConcurrentStoreAndDispatchTest.java
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/OpenWireToAmqpConcurrentStoreAndDispatchTest.java
new file mode 100644
index 0000000..f8bf560
--- /dev/null
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/OpenWireToAmqpConcurrentStoreAndDispatchTest.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.interop;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.transport.amqp.client.AmqpClient;
+import org.apache.activemq.transport.amqp.client.AmqpClientTestSupport;
+import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.apache.activemq.transport.amqp.client.AmqpMessage;
+import org.apache.activemq.transport.amqp.client.AmqpReceiver;
+import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.activemq.store.kahadb.KahaDBStore.PROPERTY_CANCELED_TASK_MOD_METRIC;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+@RunWith(Parameterized.class)
+public class OpenWireToAmqpConcurrentStoreAndDispatchTest extends AmqpClientTestSupport {
+
+    protected static final Logger LOG = LoggerFactory.getLogger(OpenWireToAmqpConcurrentStoreAndDispatchTest.class);
+
+    private final String transformer;
+
+    @Parameters(name="Transformer->{0}")
+    public static Collection<Object[]> data() {
+        System.setProperty(PROPERTY_CANCELED_TASK_MOD_METRIC, "100");
+        return Arrays.asList(new Object[][] {
+                {"jms"}
+            });
+    }
+
+    public OpenWireToAmqpConcurrentStoreAndDispatchTest(String transformer) {
+        this.transformer = transformer;
+    }
+
+    @Override
+    protected String getAmqpTransformer() {
+        return transformer;
+    }
+
+    @Override
+    protected boolean isPersistent() {
+        return true;
+    }
+
+    @Override
+    protected boolean isUseOpenWireConnector() {
+        return true;
+    }
+
+    @Test(timeout = 6000000)
+    @Ignore("takes more than 6 mins to complete but fails earlier without fix")
+    public void testNoErrorOnSend() throws Exception {
+
+        final int numIterations = 100;
+        int numConsumers = 3;
+        final int numProducers = 10;
+        final int numMessages = 2000;
+        final AtomicBoolean done = new AtomicBoolean(false);
+        final AtomicInteger sent = new AtomicInteger();
+        final AtomicInteger received = new AtomicInteger();
+        final AtomicBoolean errorOnSend = new AtomicBoolean(false);
+
+        final AtomicInteger toSend = new AtomicInteger(numMessages);
+
+        final Random random = new Random();
+        for (int i=0; i<numIterations; i++) {
+            done.set(false);
+            sent.set(0);
+            received.set(0);
+            toSend.set(numMessages);
+
+            ExecutorService executorService = Executors.newCachedThreadPool();
+            for (int j = 0; j < numConsumers; j++) {
+                executorService.execute(new Runnable() {
+                    @Override
+                    public void run() {
+                        AmqpConnection connection = null;
+                        try {
+                            AmqpClient client = createAmqpClient();
+                            connection = trackConnection(client.connect());
+                            AmqpSession session = connection.createSession();
+
+                            AmqpReceiver receiver = session.createReceiver("queue://" + getTestName(),
null, false, true);
+
+                            while (!done.get() && received.get() < numMessages)
{
+                                receiver.flow(1);
+                                AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
+                                if (message != null) {
+                                    received.incrementAndGet();
+                                }
+                            }
+                        } catch (Exception e) {
+                            e.printStackTrace();
+                        } finally {
+                            if (connection != null) {
+                                connection.close();
+                            }
+                        }
+                    }
+                });
+            }
+
+            final byte[] payload = new byte[100];
+            for (int k = 0; k < numProducers; k++) {
+                executorService.execute(new Runnable() {
+                    @Override
+                    public void run() {
+                        Connection connection = null;
+                        try {
+                            ActiveMQConnectionFactory connectionFactory =
+                                    new ActiveMQConnectionFactory(brokerService.getTransportConnectorByScheme("tcp").getPublishableConnectString());
+                            connection = connectionFactory.createConnection();
+                            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                            MessageProducer producer = session.createProducer(new ActiveMQQueue(getTestName()));
+                            BytesMessage bytesMessage = session.createBytesMessage();
+                            bytesMessage.writeBytes(payload);
+                            bytesMessage.setStringProperty("PP", "VALUE");
+                            while (!done.get() && toSend.decrementAndGet() >=
0) {
+                                producer.send(bytesMessage);
+                                sent.incrementAndGet();
+                            }
+                        } catch (Exception e) {
+                            e.printStackTrace();
+                            errorOnSend.set(true);
+                        } finally {
+                            if (connection != null) {
+                                try {
+                                    connection.close();
+                                } catch (JMSException ignored) {}
+                            }
+                        }
+                    }
+                });
+            }
+
+            executorService.shutdown();
+            executorService.awaitTermination(30, TimeUnit.SECONDS);
+
+            done.set(true);
+            assertEquals("[" + i + "] sent all requested", numMessages, sent.get());
+            assertEquals("[" + i + "] got all sent", numMessages, received.get());
+            assertFalse("[" + i + "] no error on send", errorOnSend.get());
+        }
+    }
+}


Mime
View raw message