rocketmq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ding...@apache.org
Subject [rocketmq-spring] branch master updated: Support delay message in async send mode
Date Wed, 17 Apr 2019 01:55:42 GMT
This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-spring.git


The following commit(s) were added to refs/heads/master by this push:
     new 08e5758  Support delay message in async send mode
     new 733b866  [ISSUE #36]Support delay level configuration in async mode
08e5758 is described below

commit 08e575875bfc0844fa3d9997f3a9cba178466361
Author: ShannonDing <libya_003@163.com>
AuthorDate: Wed Apr 10 16:41:46 2019 +0800

    Support delay message in async send mode
---
 .../rocketmq/spring/core/RocketMQTemplate.java       | 20 +++++++++++++++++---
 1 file changed, 17 insertions(+), 3 deletions(-)

diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java
b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java
index 2f29d7a..348962e 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java
@@ -245,16 +245,16 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String>
imp
         Message<?> message = this.doConvert(payload, null, null);
         return syncSendOrderly(destination, message, hashKey, producer.getSendMsgTimeout());
     }
-
     /**
-     * Same to {@link #asyncSend(String, Message, SendCallback)} with send timeout specified
in addition.
+     * Same to {@link #asyncSend(String, Message, SendCallback)} with send timeout and delay
level specified in addition.
      *
      * @param destination  formats: `topicName:tags`
      * @param message      {@link org.springframework.messaging.Message}
      * @param sendCallback {@link SendCallback}
      * @param timeout      send timeout with millis
+     * @param delayLevel   level for the delay message
      */
-    public void asyncSend(String destination, Message<?> message, SendCallback sendCallback,
long timeout) {
+    public void asyncSend(String destination, Message<?> message, SendCallback sendCallback,
long timeout, int delayLevel) {
         if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
             log.error("asyncSend failed. destination:{}, message is null ", destination);
             throw new IllegalArgumentException("`message` and `message.payload` cannot be
null");
@@ -263,12 +263,26 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String>
imp
         try {
             org.apache.rocketmq.common.message.Message rocketMsg = RocketMQUtil.convertToRocketMessage(objectMapper,
                 charset, destination, message);
+            if (delayLevel > 0) {
+                rocketMsg.setDelayTimeLevel(delayLevel);
+            }
             producer.send(rocketMsg, sendCallback, timeout);
         } catch (Exception e) {
             log.info("asyncSend failed. destination:{}, message:{} ", destination, message);
             throw new MessagingException(e.getMessage(), e);
         }
     }
+    /**
+     * Same to {@link #asyncSend(String, Message, SendCallback)} with send timeout specified
in addition.
+     *
+     * @param destination  formats: `topicName:tags`
+     * @param message      {@link org.springframework.messaging.Message}
+     * @param sendCallback {@link SendCallback}
+     * @param timeout      send timeout with millis
+     */
+    public void asyncSend(String destination, Message<?> message, SendCallback sendCallback,
long timeout) {
+        asyncSend(destination,message,sendCallback,timeout,0);
+    }
 
     /**
      * <p> Send message to broker asynchronously. asynchronous transmission is generally
used in response time sensitive


Mime
View raw message