rocketmq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vongosl...@apache.org
Subject [rocketmq] branch develop updated: [ISSUE# 2330] Store the properties of MessageBatch (#2343)
Date Mon, 19 Oct 2020 00:49:05 GMT
This is an automated email from the ASF dual-hosted git repository.

vongosling pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 3f48c17  [ISSUE# 2330] Store the properties of MessageBatch (#2343)
3f48c17 is described below

commit 3f48c174343223ec0dc5258595c19c8291d5e21d
Author: AVP42 <36760766+AVP42@users.noreply.github.com>
AuthorDate: Mon Oct 19 08:48:45 2020 +0800

    [ISSUE# 2330] Store the properties of MessageBatch (#2343)
    
    * [ISSUE# 2330]store the properties of MessageBatch
    
    * [ISSUE# 2330]fix style problem
    
    * [ISSUE# 2330]Undo newly added property in the MessageBatch
    
    Co-authored-by: wufc <wufc@viomi.com>
---
 .../main/java/org/apache/rocketmq/store/CommitLog.java   | 16 +++++++++++++---
 .../org/apache/rocketmq/store/BatchPutMessageTest.java   | 10 +++++++++-
 2 files changed, 22 insertions(+), 4 deletions(-)

diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
index b6d17da..d489e84 100644
--- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
+++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
@@ -1795,6 +1795,11 @@ public class CommitLog {
             ByteBuffer bornHostHolder = ByteBuffer.allocate(bornHostLength);
             ByteBuffer storeHostHolder = ByteBuffer.allocate(storeHostLength);
 
+            // properties from MessageExtBatch
+            String batchPropStr = MessageDecoder.messageProperties2String(messageExtBatch.getProperties());
+            final byte[] batchPropData = batchPropStr.getBytes(MessageDecoder.CHARSET_UTF8);
+            final short batchPropLen = (short) batchPropData.length;
+
             while (messagesByteBuff.hasRemaining()) {
                 // 1 TOTALSIZE
                 messagesByteBuff.getInt();
@@ -1818,7 +1823,8 @@ public class CommitLog {
 
                 final int topicLength = topicData.length;
 
-                final int msgLen = calMsgLength(messageExtBatch.getSysFlag(), bodyLen, topicLength,
propertiesLen);
+                final int msgLen = calMsgLength(messageExtBatch.getSysFlag(), bodyLen, topicLength,
+                        propertiesLen + batchPropLen);
 
                 // Exceeds the maximum message
                 if (msgLen > this.maxMessageSize) {
@@ -1871,9 +1877,13 @@ public class CommitLog {
                 this.msgBatchMemory.put((byte) topicLength);
                 this.msgBatchMemory.put(topicData);
                 // 17 PROPERTIES
-                this.msgBatchMemory.putShort(propertiesLen);
-                if (propertiesLen > 0)
+                this.msgBatchMemory.putShort((short) (propertiesLen + batchPropLen));
+                if (propertiesLen > 0) {
                     this.msgBatchMemory.put(messagesByteBuff.array(), propertiesPos, propertiesLen);
+                }
+                if (batchPropLen > 0) {
+                    this.msgBatchMemory.put(batchPropData, 0, batchPropLen);
+                }
             }
             msgBatchMemory.flip();
             return msgBatchMemory;
diff --git a/store/src/test/java/org/apache/rocketmq/store/BatchPutMessageTest.java b/store/src/test/java/org/apache/rocketmq/store/BatchPutMessageTest.java
index 8618dbb..b3a7c19 100644
--- a/store/src/test/java/org/apache/rocketmq/store/BatchPutMessageTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/BatchPutMessageTest.java
@@ -35,6 +35,7 @@ import java.io.File;
 import java.net.InetSocketAddress;
 import java.nio.charset.Charset;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -80,6 +81,12 @@ public class BatchPutMessageTest {
 
     @Test
     public void testPutMessages() throws Exception {
+        String batchPropK = "extraKey";
+        String batchPropV = "extraValue";
+        Map<String, String> batchProp = new HashMap<>(1);
+        batchProp.put(batchPropK, batchPropV);
+        short batchPropLen = (short) messageProperties2String(batchProp).getBytes(MessageDecoder.CHARSET_UTF8).length;
+
         List<Message> messages = new ArrayList<>();
         String topic = "batch-write-topic";
         int queue = 0;
@@ -98,7 +105,7 @@ public class BatchPutMessageTest {
             short propertiesLength = (short) propertiesBytes.length;
             final byte[] topicData = msg.getTopic().getBytes(MessageDecoder.CHARSET_UTF8);
             final int topicLength = topicData.length;
-            msgLengthArr[j] = calMsgLength(msg.getBody().length, topicLength, propertiesLength)
+ msgLengthArr[j - 1];
+            msgLengthArr[j] = calMsgLength(msg.getBody().length, topicLength, propertiesLength+batchPropLen)
+ msgLengthArr[j - 1];
             j++;
         }
         byte[] batchMessageBody = MessageDecoder.encodeMessages(messages);
@@ -106,6 +113,7 @@ public class BatchPutMessageTest {
         messageExtBatch.setTopic(topic);
         messageExtBatch.setQueueId(queue);
         messageExtBatch.setBody(batchMessageBody);
+        messageExtBatch.putUserProperty(batchPropK,batchPropV);
         messageExtBatch.setBornTimestamp(System.currentTimeMillis());
         messageExtBatch.setStoreHost(new InetSocketAddress("127.0.0.1", 125));
         messageExtBatch.setBornHost(new InetSocketAddress("127.0.0.1", 126));


Mime
View raw message