activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [1/2] activemq-artemis git commit: ARTEMIS-821 Add support for scheduled message for STOMP
Date Tue, 06 Jun 2017 20:02:07 GMT
Repository: activemq-artemis
Updated Branches:
  refs/heads/master 34362e980 -> dc9cee2f7


ARTEMIS-821 Add support for scheduled message for STOMP

Adds the AMQ_SCHEDULED_DELAY and AMQ_SCHEDULED_TIME headers to STOMP
protocol handling so that a message can be delivered after a delay or
at a scheduled time.  The AMQ_SCHEDULED_DELAY header carries over the
same option from the 5.x broker, and the AMQ_SCHEDULED_TIME header adds
a fixed time-of-delivery alternative to match that of AMQP and others.


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/a1fb897b
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/a1fb897b
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/a1fb897b

Branch: refs/heads/master
Commit: a1fb897b437d48aa98935e198c726a5b1eece959
Parents: 34362e9
Author: Timothy Bish <tabish121@gmail.com>
Authored: Tue Jun 6 11:20:44 2017 -0400
Committer: Timothy Bish <tabish121@gmail.com>
Committed: Tue Jun 6 11:20:44 2017 -0400

----------------------------------------------------------------------
 .../artemis/core/protocol/stomp/Stomp.java      |   8 ++
 .../artemis/core/protocol/stomp/StompUtils.java |  19 +++
 .../tests/integration/stomp/StompTest.java      | 119 +++++++++++++++++--
 3 files changed, 139 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a1fb897b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/Stomp.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/Stomp.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/Stomp.java
index 89c14e7..f1000cc 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/Stomp.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/Stomp.java
@@ -98,6 +98,14 @@ public interface Stomp {
          String TYPE = "type";
 
          String PERSISTENT = "persistent";
+
+         // Extensions
+
+         // ActiveMQ 5.x Scheduled Message Compatibility.
+         String AMQ_SCHEDULED_DELAY = "AMQ_SCHEDULED_DELAY";
+
+         // Provides a hard time of delivery option (Epoch based)
+         String AMQ_SCHEDULED_TIME = "AMQ_SCHEDULED_TIME";
       }
 
       interface Message {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a1fb897b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompUtils.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompUtils.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompUtils.java
index b05058b..ae6bcb9 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompUtils.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompUtils.java
@@ -16,6 +16,8 @@
  */
 package org.apache.activemq.artemis.core.protocol.stomp;
 
+import static org.apache.activemq.artemis.api.core.Message.HDR_SCHEDULED_DELIVERY_TIME;
+
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -69,6 +71,23 @@ public class StompUtils {
          msg.setExpiration(Long.parseLong(expiration));
       }
 
+      // Extension headers
+      String scheduledDelay = headers.remove(Stomp.Headers.Send.AMQ_SCHEDULED_DELAY);
+      if (scheduledDelay != null) {
+         long delay = Long.parseLong(scheduledDelay);
+         if (delay > 0) {
+            msg.putLongProperty(HDR_SCHEDULED_DELIVERY_TIME, System.currentTimeMillis() + delay);
+         }
+      }
+
+      String scheduledTime = headers.remove(Stomp.Headers.Send.AMQ_SCHEDULED_TIME);
+      if (scheduledTime != null) {
+         long deliveryTime = Long.parseLong(scheduledTime);
+         if (deliveryTime > 0) {
+            msg.putLongProperty(HDR_SCHEDULED_DELIVERY_TIME, deliveryTime);
+         }
+      }
+
       // now the general headers
       for (Entry<String, String> entry : headers.entrySet()) {
          String name = entry.getKey();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a1fb897b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java
index 40dbd95..f023cfb 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java
@@ -16,12 +16,6 @@
  */
 package org.apache.activemq.artemis.tests.integration.stomp;
 
-import javax.jms.BytesMessage;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.TextMessage;
 import java.io.ByteArrayOutputStream;
 import java.nio.charset.StandardCharsets;
 import java.util.HashSet;
@@ -32,7 +26,15 @@ import java.util.concurrent.TimeUnit;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import javax.jms.BytesMessage;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.TextMessage;
+
 import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
 import org.apache.activemq.artemis.api.core.client.ClientMessage;
@@ -48,7 +50,6 @@ import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
 import org.apache.activemq.artemis.core.protocol.stomp.Stomp;
 import org.apache.activemq.artemis.core.protocol.stomp.StompProtocolManagerFactory;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
-import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
 import org.apache.activemq.artemis.core.server.impl.AddressInfo;
@@ -545,6 +546,110 @@ public class StompTest extends StompTestBase {
    }
 
    @Test
+   public void testSendMessageWithDelay() throws Exception {
+      MessageConsumer consumer = session.createConsumer(queue);
+
+      conn.connect(defUser, defPass);
+
+      ClientStompFrame frame = conn.createFrame(Stomp.Commands.SEND)
+                                   .addHeader(Stomp.Headers.Send.DESTINATION, getQueuePrefix() + getQueueName())
+                                   .addHeader("foo", "abc")
+                                   .addHeader("bar", "123")
+                                   .addHeader("correlation-id", "c123")
+                                   .addHeader("persistent", "true")
+                                   .addHeader("type", "t345")
+                                   .addHeader("JMSXGroupID", "abc")
+                                   .addHeader("priority", "3")
+                                   .addHeader("AMQ_SCHEDULED_DELAY", "2000")
+                                   .setBody("Hello World");
+      conn.sendFrame(frame);
+
+      assertNull("Should not receive message yet", consumer.receive(1000));
+
+      TextMessage message = (TextMessage) consumer.receive(4000);
+      Assert.assertNotNull(message);
+      Assert.assertEquals("Hello World", message.getText());
+      Assert.assertEquals("JMSCorrelationID", "c123", message.getJMSCorrelationID());
+      Assert.assertEquals("getJMSType", "t345", message.getJMSType());
+      Assert.assertEquals("getJMSPriority", 3, message.getJMSPriority());
+      Assert.assertEquals(javax.jms.DeliveryMode.PERSISTENT, message.getJMSDeliveryMode());
+      Assert.assertEquals("foo", "abc", message.getStringProperty("foo"));
+      Assert.assertEquals("JMSXGroupID", "abc", message.getStringProperty("JMSXGroupID"));
+   }
+
+   @Test
+   public void testSendMessageWithDeliveryTime() throws Exception {
+      MessageConsumer consumer = session.createConsumer(queue);
+
+      conn.connect(defUser, defPass);
+
+      ClientStompFrame frame = conn.createFrame(Stomp.Commands.SEND)
+                                   .addHeader(Stomp.Headers.Send.DESTINATION, getQueuePrefix() + getQueueName())
+                                   .addHeader("foo", "abc")
+                                   .addHeader("bar", "123")
+                                   .addHeader("correlation-id", "c123")
+                                   .addHeader("persistent", "true")
+                                   .addHeader("type", "t345")
+                                   .addHeader("JMSXGroupID", "abc")
+                                   .addHeader("priority", "3")
+                                   .addHeader("AMQ_SCHEDULED_TIME", Long.toString(System.currentTimeMillis() + 2000))
+                                   .setBody("Hello World");
+      conn.sendFrame(frame);
+
+      assertNull("Should not receive message yet", consumer.receive(1000));
+
+      TextMessage message = (TextMessage) consumer.receive(4000);
+      Assert.assertNotNull(message);
+      Assert.assertEquals("Hello World", message.getText());
+      Assert.assertEquals("JMSCorrelationID", "c123", message.getJMSCorrelationID());
+      Assert.assertEquals("getJMSType", "t345", message.getJMSType());
+      Assert.assertEquals("getJMSPriority", 3, message.getJMSPriority());
+      Assert.assertEquals(javax.jms.DeliveryMode.PERSISTENT, message.getJMSDeliveryMode());
+      Assert.assertEquals("foo", "abc", message.getStringProperty("foo"));
+      Assert.assertEquals("JMSXGroupID", "abc", message.getStringProperty("JMSXGroupID"));
+   }
+
+   @Test
+   public void testSendMessageWithDelayWithBadValue() throws Exception {
+      MessageConsumer consumer = session.createConsumer(queue);
+
+      conn.connect(defUser, defPass);
+
+      ClientStompFrame frame = conn.createFrame(Stomp.Commands.SEND)
+                                   .addHeader(Stomp.Headers.Send.DESTINATION, getQueuePrefix() + getQueueName())
+                                   .addHeader("AMQ_SCHEDULED_DELAY", "foo")
+                                   .setBody("Hello World");
+      conn.sendFrame(frame);
+
+      assertNull("Should not receive message yet", consumer.receive(1000));
+
+      ClientStompFrame error = conn.receiveFrame();
+
+      Assert.assertNotNull(error);
+      Assert.assertTrue(error.getCommand().equals("ERROR"));
+   }
+
+   @Test
+   public void testSendMessageWithDeliveryTimeWithBadValue() throws Exception {
+      MessageConsumer consumer = session.createConsumer(queue);
+
+      conn.connect(defUser, defPass);
+
+      ClientStompFrame frame = conn.createFrame(Stomp.Commands.SEND)
+                                   .addHeader(Stomp.Headers.Send.DESTINATION, getQueuePrefix() + getQueueName())
+                                   .addHeader("AMQ_SCHEDULED_TIME", "foo")
+                                   .setBody("Hello World");
+      conn.sendFrame(frame);
+
+      assertNull("Should not receive message yet", consumer.receive(1000));
+
+      ClientStompFrame error = conn.receiveFrame();
+
+      Assert.assertNotNull(error);
+      Assert.assertTrue(error.getCommand().equals("ERROR"));
+   }
+
+   @Test
    public void testSubscribeWithAutoAck() throws Exception {
       conn.connect(defUser, defPass);
 


Mime
View raw message