rocketmq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ding...@apache.org
Subject [rocketmq-spring] branch master updated: [ISSUE-59] Sending batch messages with RocketMQTemplate (#60)
Date Wed, 24 Apr 2019 11:55:55 GMT
This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-spring.git


The following commit(s) were added to refs/heads/master by this push:
     new 6b4d51b  [ISSUE-59] Sending batch messages with RocketMQTemplate (#60)
6b4d51b is described below

commit 6b4d51b38e6b8d2ee51cc85b977970678f9fbd4a
Author: Kevin Wang <wiseking.wq@gmail.com>
AuthorDate: Wed Apr 24 19:55:51 2019 +0800

    [ISSUE-59] Sending batch messages with RocketMQTemplate (#60)
    
     [ISSUE-59] Sending batch messages with RocketMQTemplate
---
 README.md                                          |  1 +
 README_zh_CN.md                                    |  1 +
 .../samples/springboot/ProducerApplication.java    | 18 ++++++++++
 .../rocketmq/spring/core/RocketMQTemplate.java     | 40 ++++++++++++++++++++++
 4 files changed, 60 insertions(+)

diff --git a/README.md b/README.md
index 780c5ce..5ee3363 100644
--- a/README.md
+++ b/README.md
@@ -29,6 +29,7 @@ We are always very happy to have contributions, whether for trivial cleanups
or
 
 - [x] synchronous transmission
 - [x] synchronous ordered transmission
+- [x] synchronous batch transmission
 - [x] asynchronous transmission
 - [x] asynchronous ordered transmission
 - [x] orderly consume
diff --git a/README_zh_CN.md b/README_zh_CN.md
index b75a617..e5f075f 100644
--- a/README_zh_CN.md
+++ b/README_zh_CN.md
@@ -20,6 +20,7 @@
 
 - [x] 同步发送
 - [x] 同步顺序发送
+- [x] 同步批量发送
 - [x] 异步发送
 - [x] 异步顺序发送
 - [x] 顺序消费
diff --git a/rocketmq-spring-boot-samples/rocketmq-produce-demo/src/main/java/org/apache/rocketmq/samples/springboot/ProducerApplication.java
b/rocketmq-spring-boot-samples/rocketmq-produce-demo/src/main/java/org/apache/rocketmq/samples/springboot/ProducerApplication.java
index 465658a..9e49170 100644
--- a/rocketmq-spring-boot-samples/rocketmq-produce-demo/src/main/java/org/apache/rocketmq/samples/springboot/ProducerApplication.java
+++ b/rocketmq-spring-boot-samples/rocketmq-produce-demo/src/main/java/org/apache/rocketmq/samples/springboot/ProducerApplication.java
@@ -35,6 +35,8 @@ import org.springframework.messaging.support.MessageBuilder;
 
 import javax.annotation.Resource;
 import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -93,10 +95,26 @@ public class ProducerApplication implements CommandLineRunner {
         rocketMQTemplate.convertAndSend(msgExtTopic + ":tag1", "I'm from tag1");
         System.out.printf("syncSend topic %s tag %s %n", msgExtTopic, "tag1");
 
+
+        // Send a batch of strings
+        testBatchMessages();
+
         // Send transactional messages
         testTransaction();
     }
 
+    private void testBatchMessages() {
+        List<Message> msgs = new ArrayList<Message>();
+        for (int i = 0; i < 10; i++) {
+            msgs.add(MessageBuilder.withPayload("Hello RocketMQ Batch Msg#" + i).
+                    setHeader(RocketMQHeaders.KEYS, "KEY_" + i).build());
+        }
+
+        SendResult sr = rocketMQTemplate.syncSend(springTopic, msgs, 60000);
+
+        System.out.printf("--- Batch messages send result :" + sr);
+    }
+
 
     private void testTransaction() throws MessagingException {
         String[] tags = new String[]{"TagA", "TagB", "TagC", "TagD", "TagE"};
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java
b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java
index 0803a7e..6b5d06c 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java
@@ -40,6 +40,9 @@ import org.springframework.messaging.core.MessagePostProcessor;
 import org.springframework.messaging.support.MessageBuilder;
 import org.springframework.util.Assert;
 import org.springframework.util.MimeTypeUtils;
+
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
@@ -124,6 +127,43 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String>
imp
     }
 
     /**
+     * syncSend batch messages in a given timeout.
+     *
+     * @param destination formats: `topicName:tags`
+     * @param messages    Collection of {@link org.springframework.messaging.Message}
+     * @param timeout     send timeout with millis
+     * @return {@link SendResult}
+     */
+    public SendResult syncSend(String destination, Collection<Message<?>> messages,
long timeout) {
+        if (Objects.isNull(messages) || messages.size() == 0) {
+            log.error("syncSend with batch failed. destination:{}, messages is empty ", destination);
+            throw new IllegalArgumentException("`messages` can not be empty");
+        }
+
+        try {
+            long now = System.currentTimeMillis();
+            Collection<org.apache.rocketmq.common.message.Message> rmqMsgs = new ArrayList<>();
+            org.apache.rocketmq.common.message.Message rocketMsg;
+            for (Message<?> msg:messages) {
+                if (Objects.isNull(msg) || Objects.isNull(msg.getPayload())) {
+                    log.warn("Found a message empty in the batch, skip it");
+                    continue;
+                }
+                rocketMsg = RocketMQUtil.convertToRocketMessage(objectMapper, charset, destination,
msg);
+                rmqMsgs.add(rocketMsg);
+            }
+
+            SendResult sendResult = producer.send(rmqMsgs, timeout);
+            long costTime = System.currentTimeMillis() - now;
+            log.debug("send messages cost: {} ms, msgId:{}", costTime, sendResult.getMsgId());
+            return sendResult;
+        } catch (Exception e) {
+            log.error("syncSend with batch failed. destination:{}, messages.size:{} ", destination,
messages.size());
+            throw new MessagingException(e.getMessage(), e);
+        }
+    }
+
+    /**
      * Same to {@link #syncSend(String, Message)} with send timeout specified in addition.
      *
      * @param destination formats: `topicName:tags`


Mime
View raw message