This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-spring.git
The following commit(s) were added to refs/heads/master by this push:
new 08e5758 Support delay message in async send mode
new 733b866 [ISSUE #36]Support delay level configuration in async mode
08e5758 is described below
commit 08e575875bfc0844fa3d9997f3a9cba178466361
Author: ShannonDing <libya_003@163.com>
AuthorDate: Wed Apr 10 16:41:46 2019 +0800
Support delay message in async send mode
---
.../rocketmq/spring/core/RocketMQTemplate.java | 20 +++++++++++++++++---
1 file changed, 17 insertions(+), 3 deletions(-)
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java
b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java
index 2f29d7a..348962e 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java
@@ -245,16 +245,16 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String>
imp
Message<?> message = this.doConvert(payload, null, null);
return syncSendOrderly(destination, message, hashKey, producer.getSendMsgTimeout());
}
-
/**
- * Same to {@link #asyncSend(String, Message, SendCallback)} with send timeout specified
in addition.
+ * Same to {@link #asyncSend(String, Message, SendCallback)} with send timeout and delay
level specified in addition.
*
* @param destination formats: `topicName:tags`
* @param message {@link org.springframework.messaging.Message}
* @param sendCallback {@link SendCallback}
* @param timeout send timeout with millis
+ * @param delayLevel level for the delay message
*/
- public void asyncSend(String destination, Message<?> message, SendCallback sendCallback,
long timeout) {
+ public void asyncSend(String destination, Message<?> message, SendCallback sendCallback,
long timeout, int delayLevel) {
if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
log.error("asyncSend failed. destination:{}, message is null ", destination);
throw new IllegalArgumentException("`message` and `message.payload` cannot be
null");
@@ -263,12 +263,26 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String>
imp
try {
org.apache.rocketmq.common.message.Message rocketMsg = RocketMQUtil.convertToRocketMessage(objectMapper,
charset, destination, message);
+ if (delayLevel > 0) {
+ rocketMsg.setDelayTimeLevel(delayLevel);
+ }
producer.send(rocketMsg, sendCallback, timeout);
} catch (Exception e) {
log.info("asyncSend failed. destination:{}, message:{} ", destination, message);
throw new MessagingException(e.getMessage(), e);
}
}
+ /**
+ * Same to {@link #asyncSend(String, Message, SendCallback)} with send timeout specified
in addition.
+ *
+ * @param destination formats: `topicName:tags`
+ * @param message {@link org.springframework.messaging.Message}
+ * @param sendCallback {@link SendCallback}
+ * @param timeout send timeout with millis
+ */
+ public void asyncSend(String destination, Message<?> message, SendCallback sendCallback,
long timeout) {
+ asyncSend(destination,message,sendCallback,timeout,0);
+ }
/**
* <p> Send message to broker asynchronously. asynchronous transmission is generally
used in response time sensitive
|