This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-spring.git
The following commit(s) were added to refs/heads/master by this push:
new 6b4d51b [ISSUE-59] Sending batch messages with RocketMQTemplate (#60)
6b4d51b is described below
commit 6b4d51b38e6b8d2ee51cc85b977970678f9fbd4a
Author: Kevin Wang <wiseking.wq@gmail.com>
AuthorDate: Wed Apr 24 19:55:51 2019 +0800
[ISSUE-59] Sending batch messages with RocketMQTemplate (#60)
[ISSUE-59] Sending batch messages with RocketMQTemplate
---
README.md | 1 +
README_zh_CN.md | 1 +
.../samples/springboot/ProducerApplication.java | 18 ++++++++++
.../rocketmq/spring/core/RocketMQTemplate.java | 40 ++++++++++++++++++++++
4 files changed, 60 insertions(+)
diff --git a/README.md b/README.md
index 780c5ce..5ee3363 100644
--- a/README.md
+++ b/README.md
@@ -29,6 +29,7 @@ We are always very happy to have contributions, whether for trivial cleanups
or
- [x] synchronous transmission
- [x] synchronous ordered transmission
+- [x] synchronous batch transmission
- [x] asynchronous transmission
- [x] asynchronous ordered transmission
- [x] orderly consume
diff --git a/README_zh_CN.md b/README_zh_CN.md
index b75a617..e5f075f 100644
--- a/README_zh_CN.md
+++ b/README_zh_CN.md
@@ -20,6 +20,7 @@
- [x] 同步发送
- [x] 同步顺序发送
+- [x] 同步批量发送
- [x] 异步发送
- [x] 异步顺序发送
- [x] 顺序消费
diff --git a/rocketmq-spring-boot-samples/rocketmq-produce-demo/src/main/java/org/apache/rocketmq/samples/springboot/ProducerApplication.java
b/rocketmq-spring-boot-samples/rocketmq-produce-demo/src/main/java/org/apache/rocketmq/samples/springboot/ProducerApplication.java
index 465658a..9e49170 100644
--- a/rocketmq-spring-boot-samples/rocketmq-produce-demo/src/main/java/org/apache/rocketmq/samples/springboot/ProducerApplication.java
+++ b/rocketmq-spring-boot-samples/rocketmq-produce-demo/src/main/java/org/apache/rocketmq/samples/springboot/ProducerApplication.java
@@ -35,6 +35,8 @@ import org.springframework.messaging.support.MessageBuilder;
import javax.annotation.Resource;
import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
@@ -93,10 +95,26 @@ public class ProducerApplication implements CommandLineRunner {
rocketMQTemplate.convertAndSend(msgExtTopic + ":tag1", "I'm from tag1");
System.out.printf("syncSend topic %s tag %s %n", msgExtTopic, "tag1");
+
+ // Send a batch of strings
+ testBatchMessages();
+
// Send transactional messages
testTransaction();
}
+ /**
+  * Builds ten string messages, each carrying its own KEYS header, sends them to
+  * {@code springTopic} as a single batch and prints the send result.
+  */
+ private void testBatchMessages() {
+     // Element type is Message<?> (not the raw Message) so the list is type-compatible
+     // with a Collection<Message<?>> parameter.
+     List<Message<?>> msgs = new ArrayList<>();
+     for (int i = 0; i < 10; i++) {
+         msgs.add(MessageBuilder.withPayload("Hello RocketMQ Batch Msg#" + i).
+             setHeader(RocketMQHeaders.KEYS, "KEY_" + i).build());
+     }
+
+     SendResult sr = rocketMQTemplate.syncSend(springTopic, msgs, 60000);
+
+     // Pass the result as an argument instead of concatenating it into the format string:
+     // a '%' inside the result's text would otherwise be parsed as a format specifier.
+     System.out.printf("--- Batch messages send result :%s%n", sr);
+ }
+
+
private void testTransaction() throws MessagingException {
String[] tags = new String[]{"TagA", "TagB", "TagC", "TagD", "TagE"};
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java
b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java
index 0803a7e..6b5d06c 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java
@@ -40,6 +40,9 @@ import org.springframework.messaging.core.MessagePostProcessor;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.Assert;
import org.springframework.util.MimeTypeUtils;
+
+import java.util.ArrayList;
+import java.util.Collection;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
@@ -124,6 +127,43 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String>
imp
}
/**
+ * syncSend batch messages in a given timeout.
+ *
+ * @param destination formats: `topicName:tags`
+ * @param messages Collection of {@link org.springframework.messaging.Message}
+ * @param timeout send timeout with millis
+ * @return {@link SendResult}
+ */
+ public SendResult syncSend(String destination, Collection<Message<?>> messages,
long timeout) {
+ if (Objects.isNull(messages) || messages.size() == 0) {
+ log.error("syncSend with batch failed. destination:{}, messages is empty ", destination);
+ throw new IllegalArgumentException("`messages` can not be empty");
+ }
+
+ try {
+ long now = System.currentTimeMillis();
+ Collection<org.apache.rocketmq.common.message.Message> rmqMsgs = new ArrayList<>();
+ org.apache.rocketmq.common.message.Message rocketMsg;
+ for (Message<?> msg:messages) {
+ if (Objects.isNull(msg) || Objects.isNull(msg.getPayload())) {
+ log.warn("Found a message empty in the batch, skip it");
+ continue;
+ }
+ rocketMsg = RocketMQUtil.convertToRocketMessage(objectMapper, charset, destination,
msg);
+ rmqMsgs.add(rocketMsg);
+ }
+
+ SendResult sendResult = producer.send(rmqMsgs, timeout);
+ long costTime = System.currentTimeMillis() - now;
+ log.debug("send messages cost: {} ms, msgId:{}", costTime, sendResult.getMsgId());
+ return sendResult;
+ } catch (Exception e) {
+ log.error("syncSend with batch failed. destination:{}, messages.size:{} ", destination,
messages.size());
+ throw new MessagingException(e.getMessage(), e);
+ }
+ }
+
+ /**
* Same to {@link #syncSend(String, Message)} with send timeout specified in addition.
*
* @param destination formats: `topicName:tags`
|