rocketmq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jinrongt...@apache.org
Subject [rocketmq] branch develop updated: [ISSUE #690] Support batch msgs in dledger mode (#2406)
Date Tue, 08 Dec 2020 08:59:21 GMT
This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 4730987  [ISSUE #690] Support batch msgs in dledger mode (#2406)
4730987 is described below

commit 4730987c9cc046c6db980a48aaed9eacc13c630d
Author: TerrellChen <39452477+TerrellChen@users.noreply.github.com>
AuthorDate: Tue Dec 8 16:58:59 2020 +0800

    [ISSUE #690] Support batch msgs in dledger mode (#2406)
    
    * implement issue-690
    
    * add unit test
    
    * fix version
    
    * fix wroteOffset;update version;polish
    
    * polish
    
    * fix wrong wroteOffset of AppendMessageResult
    
    * move serialization out of lock in async method
---
 store/pom.xml                                      |   2 +-
 .../rocketmq/store/dledger/DLedgerCommitLog.java   | 376 ++++++++++++++++++++-
 .../org/apache/rocketmq/store/StoreTestBase.java   |  68 +++-
 .../store/dledger/DLedgerCommitlogTest.java        | 122 ++++++-
 4 files changed, 544 insertions(+), 24 deletions(-)

diff --git a/store/pom.xml b/store/pom.xml
index 8f4b44a..bcb3e6a 100644
--- a/store/pom.xml
+++ b/store/pom.xml
@@ -31,7 +31,7 @@
         <dependency>
             <groupId>io.openmessaging.storage</groupId>
             <artifactId>dledger</artifactId>
-            <version>0.2.0</version>
+            <version>0.2.2</version>
             <exclusions>
                 <exclusion>
                     <groupId>org.apache.rocketmq</groupId>
diff --git a/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java b/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java
index 24e0f69..9a6e7a7 100644
--- a/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java
+++ b/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java
@@ -17,11 +17,13 @@
 package org.apache.rocketmq.store.dledger;
 
 import io.openmessaging.storage.dledger.AppendFuture;
+import io.openmessaging.storage.dledger.BatchAppendFuture;
 import io.openmessaging.storage.dledger.DLedgerConfig;
 import io.openmessaging.storage.dledger.DLedgerServer;
 import io.openmessaging.storage.dledger.entry.DLedgerEntry;
 import io.openmessaging.storage.dledger.protocol.AppendEntryRequest;
 import io.openmessaging.storage.dledger.protocol.AppendEntryResponse;
+import io.openmessaging.storage.dledger.protocol.BatchAppendEntryRequest;
 import io.openmessaging.storage.dledger.protocol.DLedgerResponseCode;
 import io.openmessaging.storage.dledger.store.file.DLedgerMmapFileStore;
 import io.openmessaging.storage.dledger.store.file.MmapFile;
@@ -32,6 +34,8 @@ import java.net.Inet6Address;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import org.apache.rocketmq.common.UtilAll;
@@ -74,6 +78,8 @@ public class DLedgerCommitLog extends CommitLog {
 
     private boolean isInrecoveringOldCommitlog = false;
 
+    private final StringBuilder msgIdBuilder = new StringBuilder();
+
     public DLedgerCommitLog(final DefaultMessageStore defaultMessageStore) {
         super(defaultMessageStore);
         dLedgerConfig = new DLedgerConfig();
@@ -507,7 +513,129 @@ public class DLedgerCommitLog extends CommitLog {
 
     @Override
     public PutMessageResult putMessages(final MessageExtBatch messageExtBatch) {
-        return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
+        final int tranType = MessageSysFlag.getTransactionValue(messageExtBatch.getSysFlag());
+
+        if (tranType != MessageSysFlag.TRANSACTION_NOT_TYPE) {
+            return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
+        }
+        if (messageExtBatch.getDelayTimeLevel() > 0) {
+            return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
+        }
+
+        // Set the storage time
+        messageExtBatch.setStoreTimestamp(System.currentTimeMillis());
+
+        StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
+
+        InetSocketAddress bornSocketAddress = (InetSocketAddress) messageExtBatch.getBornHost();
+        if (bornSocketAddress.getAddress() instanceof Inet6Address) {
+            messageExtBatch.setBornHostV6Flag();
+        }
+
+        InetSocketAddress storeSocketAddress = (InetSocketAddress) messageExtBatch.getStoreHost();
+        if (storeSocketAddress.getAddress() instanceof Inet6Address) {
+            messageExtBatch.setStoreHostAddressV6Flag();
+        }
+
+        // Back to Results
+        AppendMessageResult appendResult;
+        BatchAppendFuture<AppendEntryResponse> dledgerFuture;
+        EncodeResult encodeResult;
+
+        putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
+        msgIdBuilder.setLength(0);
+        long elapsedTimeInLock;
+        long queueOffset;
+        long msgNum = 0;
+        try {
+            beginTimeInDledgerLock = this.defaultMessageStore.getSystemClock().now();
+            encodeResult = this.messageSerializer.serialize(messageExtBatch);
+            queueOffset = topicQueueTable.get(encodeResult.queueOffsetKey);
+            if (encodeResult.status != AppendMessageStatus.PUT_OK) {
+                return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, new AppendMessageResult(encodeResult
+                        .status));
+            }
+            BatchAppendEntryRequest request = new BatchAppendEntryRequest();
+            request.setGroup(dLedgerConfig.getGroup());
+            request.setRemoteId(dLedgerServer.getMemberState().getSelfId());
+            request.setBatchMsgs(encodeResult.batchData);
+            dledgerFuture = (BatchAppendFuture<AppendEntryResponse>) dLedgerServer.handleAppend(request);
+            if (dledgerFuture.getPos() == -1) {
+                log.warn("HandleAppend return false due to error code {}", dledgerFuture.get().getCode());
+                return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR));
+            }
+            long wroteOffset = 0;
+
+            int msgIdLength = (messageExtBatch.getSysFlag() & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 + 8 : 16 + 4 + 8;
+            ByteBuffer buffer = ByteBuffer.allocate(msgIdLength);
+
+            boolean isFirstOffset = true;
+            long firstWroteOffset = 0;
+            for (long pos : dledgerFuture.getPositions()) {
+                wroteOffset = pos + DLedgerEntry.BODY_OFFSET;
+                if (isFirstOffset) {
+                    firstWroteOffset = wroteOffset;
+                    isFirstOffset = false;
+                }
+                String msgId = MessageDecoder.createMessageId(buffer, messageExtBatch.getStoreHostBytes(), wroteOffset);
+                if (msgIdBuilder.length() > 0) {
+                    msgIdBuilder.append(',').append(msgId);
+                } else {
+                    msgIdBuilder.append(msgId);
+                }
+                msgNum++;
+            }
+
+            elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginTimeInDledgerLock;
+            appendResult = new AppendMessageResult(AppendMessageStatus.PUT_OK, firstWroteOffset, encodeResult.totalMsgLen,
+                    msgIdBuilder.toString(), System.currentTimeMillis(), queueOffset, elapsedTimeInLock);
+            DLedgerCommitLog.this.topicQueueTable.put(encodeResult.queueOffsetKey, queueOffset + msgNum);
+        } catch (Exception e) {
+            log.error("Put message error", e);
+            return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, new AppendMessageResult(AppendMessageStatus
+                    .UNKNOWN_ERROR));
+        } finally {
+            beginTimeInDledgerLock = 0;
+            putMessageLock.unlock();
+        }
+
+        if (elapsedTimeInLock > 500) {
+            log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}",
+                    elapsedTimeInLock, messageExtBatch.getBody().length, appendResult);
+        }
+
+        PutMessageStatus putMessageStatus = PutMessageStatus.UNKNOWN_ERROR;
+        try {
+            AppendEntryResponse appendEntryResponse = dledgerFuture.get(3, TimeUnit.SECONDS);
+            switch (DLedgerResponseCode.valueOf(appendEntryResponse.getCode())) {
+                case SUCCESS:
+                    putMessageStatus = PutMessageStatus.PUT_OK;
+                    break;
+                case INCONSISTENT_LEADER:
+                case NOT_LEADER:
+                case LEADER_NOT_READY:
+                case DISK_FULL:
+                    putMessageStatus = PutMessageStatus.SERVICE_NOT_AVAILABLE;
+                    break;
+                case WAIT_QUORUM_ACK_TIMEOUT:
+                    //Do not return flush_slave_timeout to the client, for the ons client will ignore it.
+                    putMessageStatus = PutMessageStatus.OS_PAGECACHE_BUSY;
+                    break;
+                case LEADER_PENDING_FULL:
+                    putMessageStatus = PutMessageStatus.OS_PAGECACHE_BUSY;
+                    break;
+            }
+        } catch (Throwable t) {
+            log.error("Failed to get dledger append result", t);
+        }
+
+        PutMessageResult putMessageResult = new PutMessageResult(putMessageStatus, appendResult);
+        if (putMessageStatus == PutMessageStatus.PUT_OK) {
+            // Statistics
+            storeStatsService.getSinglePutMessageTopicTimesTotal(messageExtBatch.getTopic()).addAndGet(msgNum);
+            storeStatsService.getSinglePutMessageTopicSizeTotal(messageExtBatch.getTopic()).addAndGet(encodeResult.totalMsgLen);
+        }
+        return putMessageResult;
     }
 
     @Override
@@ -609,7 +737,125 @@ public class DLedgerCommitLog extends CommitLog {
 
     @Override
     public CompletableFuture<PutMessageResult> asyncPutMessages(MessageExtBatch messageExtBatch) {
-        return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null));
+        final int tranType = MessageSysFlag.getTransactionValue(messageExtBatch.getSysFlag());
+
+        if (tranType != MessageSysFlag.TRANSACTION_NOT_TYPE) {
+            return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null));
+        }
+        if (messageExtBatch.getDelayTimeLevel() > 0) {
+            return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null));
+        }
+
+        // Set the storage time
+        messageExtBatch.setStoreTimestamp(System.currentTimeMillis());
+
+        StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
+
+        InetSocketAddress bornSocketAddress = (InetSocketAddress) messageExtBatch.getBornHost();
+        if (bornSocketAddress.getAddress() instanceof Inet6Address) {
+            messageExtBatch.setBornHostV6Flag();
+        }
+
+        InetSocketAddress storeSocketAddress = (InetSocketAddress) messageExtBatch.getStoreHost();
+        if (storeSocketAddress.getAddress() instanceof Inet6Address) {
+            messageExtBatch.setStoreHostAddressV6Flag();
+        }
+
+        // Back to Results
+        AppendMessageResult appendResult;
+        BatchAppendFuture<AppendEntryResponse> dledgerFuture;
+        EncodeResult encodeResult;
+
+        encodeResult = this.messageSerializer.serialize(messageExtBatch);
+        if (encodeResult.status != AppendMessageStatus.PUT_OK) {
+            return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, new AppendMessageResult(encodeResult
+                    .status)));
+        }
+
+        putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
+        msgIdBuilder.setLength(0);
+        long elapsedTimeInLock;
+        long queueOffset;
+        long msgNum = 0;
+        try {
+            beginTimeInDledgerLock = this.defaultMessageStore.getSystemClock().now();
+            queueOffset = topicQueueTable.get(encodeResult.queueOffsetKey);
+            BatchAppendEntryRequest request = new BatchAppendEntryRequest();
+            request.setGroup(dLedgerConfig.getGroup());
+            request.setRemoteId(dLedgerServer.getMemberState().getSelfId());
+            request.setBatchMsgs(encodeResult.batchData);
+            dledgerFuture = (BatchAppendFuture<AppendEntryResponse>) dLedgerServer.handleAppend(request);
+            if (dledgerFuture.getPos() == -1) {
+                log.warn("HandleAppend return false due to error code {}", dledgerFuture.get().getCode());
+                return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR)));
+            }
+            long wroteOffset = 0;
+
+            int msgIdLength = (messageExtBatch.getSysFlag() & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 + 8 : 16 + 4 + 8;
+            ByteBuffer buffer = ByteBuffer.allocate(msgIdLength);
+
+            boolean isFirstOffset = true;
+            long firstWroteOffset = 0;
+            for (long pos : dledgerFuture.getPositions()) {
+                wroteOffset = pos + DLedgerEntry.BODY_OFFSET;
+                if (isFirstOffset) {
+                    firstWroteOffset = wroteOffset;
+                    isFirstOffset = false;
+                }
+                String msgId = MessageDecoder.createMessageId(buffer, messageExtBatch.getStoreHostBytes(), wroteOffset);
+                if (msgIdBuilder.length() > 0) {
+                    msgIdBuilder.append(',').append(msgId);
+                } else {
+                    msgIdBuilder.append(msgId);
+                }
+                msgNum++;
+            }
+
+            elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginTimeInDledgerLock;
+            appendResult = new AppendMessageResult(AppendMessageStatus.PUT_OK, firstWroteOffset, encodeResult.totalMsgLen,
+                    msgIdBuilder.toString(), System.currentTimeMillis(), queueOffset, elapsedTimeInLock);
+            DLedgerCommitLog.this.topicQueueTable.put(encodeResult.queueOffsetKey, queueOffset + msgNum);
+        } catch (Exception e) {
+            log.error("Put message error", e);
+            return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR)));
+        } finally {
+            beginTimeInDledgerLock = 0;
+            putMessageLock.unlock();
+        }
+
+        if (elapsedTimeInLock > 500) {
+            log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}",
+                    elapsedTimeInLock, messageExtBatch.getBody().length, appendResult);
+        }
+
+        return dledgerFuture.thenApply(appendEntryResponse -> {
+            PutMessageStatus putMessageStatus = PutMessageStatus.UNKNOWN_ERROR;
+            switch (DLedgerResponseCode.valueOf(appendEntryResponse.getCode())) {
+                case SUCCESS:
+                    putMessageStatus = PutMessageStatus.PUT_OK;
+                    break;
+                case INCONSISTENT_LEADER:
+                case NOT_LEADER:
+                case LEADER_NOT_READY:
+                case DISK_FULL:
+                    putMessageStatus = PutMessageStatus.SERVICE_NOT_AVAILABLE;
+                    break;
+                case WAIT_QUORUM_ACK_TIMEOUT:
+                    //Do not return flush_slave_timeout to the client, for the ons client will ignore it.
+                    putMessageStatus = PutMessageStatus.OS_PAGECACHE_BUSY;
+                    break;
+                case LEADER_PENDING_FULL:
+                    putMessageStatus = PutMessageStatus.OS_PAGECACHE_BUSY;
+                    break;
+            }
+            PutMessageResult putMessageResult = new PutMessageResult(putMessageStatus, appendResult);
+            if (putMessageStatus == PutMessageStatus.PUT_OK) {
+                // Statistics
+                storeStatsService.getSinglePutMessageTopicTimesTotal(messageExtBatch.getTopic()).incrementAndGet();
+                storeStatsService.getSinglePutMessageTopicSizeTotal(messageExtBatch.getTopic()).addAndGet(appendResult.getWroteBytes());
+            }
+            return putMessageResult;
+        });
     }
 
     @Override
@@ -701,7 +947,9 @@ public class DLedgerCommitLog extends CommitLog {
     class EncodeResult {
         private String queueOffsetKey;
         private ByteBuffer data;
+        private List<byte[]> batchData;
         private AppendMessageStatus status;
+        private int totalMsgLen;
 
         public EncodeResult(AppendMessageStatus status, ByteBuffer data, String queueOffsetKey) {
             this.data = data;
@@ -716,6 +964,13 @@ public class DLedgerCommitLog extends CommitLog {
         public byte[] getData() {
             return data.array();
         }
+
+        public EncodeResult(AppendMessageStatus status, String queueOffsetKey, List<byte[]> batchData, int totalMsgLen) {
+            this.batchData = batchData;
+            this.status = status;
+            this.queueOffsetKey = queueOffsetKey;
+            this.totalMsgLen = totalMsgLen;
+        }
     }
 
     class MessageSerializer {
@@ -823,6 +1078,123 @@ public class DLedgerCommitLog extends CommitLog {
             return new EncodeResult(AppendMessageStatus.PUT_OK, msgStoreItemMemory, key);
         }
 
+        public EncodeResult serialize(final MessageExtBatch messageExtBatch) {
+            keyBuilder.setLength(0);
+            keyBuilder.append(messageExtBatch.getTopic());
+            keyBuilder.append('-');
+            keyBuilder.append(messageExtBatch.getQueueId());
+            String key = keyBuilder.toString();
+
+            Long queueOffset = DLedgerCommitLog.this.topicQueueTable.get(key);
+            if (null == queueOffset) {
+                queueOffset = 0L;
+                DLedgerCommitLog.this.topicQueueTable.put(key, queueOffset);
+            }
+
+            int totalMsgLen = 0;
+            ByteBuffer messagesByteBuff = messageExtBatch.wrap();
+            List<byte[]> batchBody = new LinkedList<>();
+
+            int sysFlag = messageExtBatch.getSysFlag();
+            int bornHostLength = (sysFlag & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 4 + 4 : 16 + 4;
+            int storeHostLength = (sysFlag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 : 16 + 4;
+            ByteBuffer bornHostHolder = ByteBuffer.allocate(bornHostLength);
+            ByteBuffer storeHostHolder = ByteBuffer.allocate(storeHostLength);
+
+            while (messagesByteBuff.hasRemaining()) {
+                // 1 TOTALSIZE
+                messagesByteBuff.getInt();
+                // 2 MAGICCODE
+                messagesByteBuff.getInt();
+                // 3 BODYCRC
+                messagesByteBuff.getInt();
+                // 4 FLAG
+                int flag = messagesByteBuff.getInt();
+                // 5 BODY
+                int bodyLen = messagesByteBuff.getInt();
+                int bodyPos = messagesByteBuff.position();
+                int bodyCrc = UtilAll.crc32(messagesByteBuff.array(), bodyPos, bodyLen);
+                messagesByteBuff.position(bodyPos + bodyLen);
+                // 6 properties
+                short propertiesLen = messagesByteBuff.getShort();
+                int propertiesPos = messagesByteBuff.position();
+                messagesByteBuff.position(propertiesPos + propertiesLen);
+
+                final byte[] topicData = messageExtBatch.getTopic().getBytes(MessageDecoder.CHARSET_UTF8);
+
+                final int topicLength = topicData.length;
+
+                final int msgLen = calMsgLength(messageExtBatch.getSysFlag(), bodyLen, topicLength, propertiesLen);
+                ByteBuffer msgStoreItemMemory = ByteBuffer.allocate(msgLen);
+
+                // Exceeds the maximum message
+                if (msgLen > this.maxMessageSize) {
+                    CommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " +
+                            bodyLen
+                            + ", maxMessageSize: " + this.maxMessageSize);
+                    throw new RuntimeException("message size exceeded");
+                }
+
+                totalMsgLen += msgLen;
+                // Determines whether there is sufficient free space
+                if (totalMsgLen > maxMessageSize) {
+                    throw new RuntimeException("message size exceeded");
+                }
+
+                // Initialization of storage space
+                this.resetByteBuffer(msgStoreItemMemory, msgLen);
+                // 1 TOTALSIZE
+                msgStoreItemMemory.putInt(msgLen);
+                // 2 MAGICCODE
+                msgStoreItemMemory.putInt(DLedgerCommitLog.MESSAGE_MAGIC_CODE);
+                // 3 BODYCRC
+                msgStoreItemMemory.putInt(bodyCrc);
+                // 4 QUEUEID
+                msgStoreItemMemory.putInt(messageExtBatch.getQueueId());
+                // 5 FLAG
+                msgStoreItemMemory.putInt(flag);
+                // 6 QUEUEOFFSET
+                msgStoreItemMemory.putLong(queueOffset++);
+                // 7 PHYSICALOFFSET
+                msgStoreItemMemory.putLong(0);
+                // 8 SYSFLAG
+                msgStoreItemMemory.putInt(messageExtBatch.getSysFlag());
+                // 9 BORNTIMESTAMP
+                msgStoreItemMemory.putLong(messageExtBatch.getBornTimestamp());
+                // 10 BORNHOST
+                resetByteBuffer(bornHostHolder, bornHostLength);
+                msgStoreItemMemory.put(messageExtBatch.getBornHostBytes(bornHostHolder));
+                // 11 STORETIMESTAMP
+                msgStoreItemMemory.putLong(messageExtBatch.getStoreTimestamp());
+                // 12 STOREHOSTADDRESS
+                resetByteBuffer(storeHostHolder, storeHostLength);
+                msgStoreItemMemory.put(messageExtBatch.getStoreHostBytes(storeHostHolder));
+                // 13 RECONSUMETIMES
+                msgStoreItemMemory.putInt(messageExtBatch.getReconsumeTimes());
+                // 14 Prepared Transaction Offset
+                msgStoreItemMemory.putLong(0);
+                // 15 BODY
+                msgStoreItemMemory.putInt(bodyLen);
+                if (bodyLen > 0) {
+                    msgStoreItemMemory.put(messagesByteBuff.array(), bodyPos, bodyLen);
+                }
+                // 16 TOPIC
+                msgStoreItemMemory.put((byte) topicLength);
+                msgStoreItemMemory.put(topicData);
+                // 17 PROPERTIES
+                msgStoreItemMemory.putShort(propertiesLen);
+                if (propertiesLen > 0) {
+                    msgStoreItemMemory.put(messagesByteBuff.array(), propertiesPos, propertiesLen);
+                }
+                byte[] data = new byte[msgLen];
+                msgStoreItemMemory.clear();
+                msgStoreItemMemory.get(data);
+                batchBody.add(data);
+            }
+
+            return new EncodeResult(AppendMessageStatus.PUT_OK, key, batchBody, totalMsgLen);
+        }
+
         private void resetByteBuffer(final ByteBuffer byteBuffer, final int limit) {
             byteBuffer.flip();
             byteBuffer.limit(limit);
diff --git a/store/src/test/java/org/apache/rocketmq/store/StoreTestBase.java b/store/src/test/java/org/apache/rocketmq/store/StoreTestBase.java
index a736754..5660de1 100644
--- a/store/src/test/java/org/apache/rocketmq/store/StoreTestBase.java
+++ b/store/src/test/java/org/apache/rocketmq/store/StoreTestBase.java
@@ -16,17 +16,19 @@
  */
 package org.apache.rocketmq.store;
 
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.common.message.MessageExtBatch;
+import org.junit.After;
+
 import java.io.File;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.net.UnknownHostException;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.UUID;
+import java.util.*;
 import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.rocketmq.common.UtilAll;
-import org.junit.After;
 
 public class StoreTestBase {
 
@@ -44,6 +46,28 @@ public class StoreTestBase {
         return port.addAndGet(5);
     }
 
+    protected MessageExtBatch buildBatchMessage(int size) {
+        MessageExtBatch messageExtBatch = new MessageExtBatch();
+        messageExtBatch.setTopic("StoreTest");
+        messageExtBatch.setTags("TAG1");
+        messageExtBatch.setKeys("Hello");
+        messageExtBatch.setQueueId(Math.abs(QueueId.getAndIncrement()) % QUEUE_TOTAL);
+        messageExtBatch.setSysFlag(0);
+
+        messageExtBatch.setBornTimestamp(System.currentTimeMillis());
+        messageExtBatch.setBornHost(BornHost);
+        messageExtBatch.setStoreHost(StoreHost);
+
+        List<Message> messageList = new ArrayList<>(size);
+        for (int i = 0; i < size; i++) {
+            messageList.add(buildMessage());
+        }
+
+        messageExtBatch.setBody(MessageDecoder.encodeMessages(messageList));
+
+        return messageExtBatch;
+    }
+
     protected MessageExtBrokerInner buildMessage() {
         MessageExtBrokerInner msg = new MessageExtBrokerInner();
         msg.setTopic("StoreTest");
@@ -59,6 +83,40 @@ public class StoreTestBase {
         return msg;
     }
 
+    protected MessageExtBatch buildIPv6HostBatchMessage(int size) {
+        MessageExtBatch messageExtBatch = new MessageExtBatch();
+        messageExtBatch.setTopic("StoreTest");
+        messageExtBatch.setTags("TAG1");
+        messageExtBatch.setKeys("Hello");
+        messageExtBatch.setBody(MessageBody);
+        messageExtBatch.setMsgId("24084004018081003FAA1DDE2B3F898A00002A9F0000000000000CA0");
+        messageExtBatch.setKeys(String.valueOf(System.currentTimeMillis()));
+        messageExtBatch.setQueueId(Math.abs(QueueId.getAndIncrement()) % QUEUE_TOTAL);
+        messageExtBatch.setSysFlag(0);
+        messageExtBatch.setBornHostV6Flag();
+        messageExtBatch.setStoreHostAddressV6Flag();
+        messageExtBatch.setBornTimestamp(System.currentTimeMillis());
+        try {
+            messageExtBatch.setBornHost(new InetSocketAddress(InetAddress.getByName("1050:0000:0000:0000:0005:0600:300c:326b"), 8123));
+        } catch (UnknownHostException e) {
+            e.printStackTrace();
+        }
+
+        try {
+            messageExtBatch.setStoreHost(new InetSocketAddress(InetAddress.getByName("::1"), 8123));
+        } catch (UnknownHostException e) {
+            e.printStackTrace();
+        }
+
+        List<Message> messageList = new ArrayList<>(size);
+        for (int i = 0; i < size; i++) {
+            messageList.add(buildIPv6HostMessage());
+        }
+
+        messageExtBatch.setBody(MessageDecoder.encodeMessages(messageList));
+        return messageExtBatch;
+    }
+
     protected MessageExtBrokerInner buildIPv6HostMessage() {
         MessageExtBrokerInner msg = new MessageExtBrokerInner();
         msg.setTopic("StoreTest");
diff --git a/store/src/test/java/org/apache/rocketmq/store/dledger/DLedgerCommitlogTest.java b/store/src/test/java/org/apache/rocketmq/store/dledger/DLedgerCommitlogTest.java
index e31d834..8ab8a23 100644
--- a/store/src/test/java/org/apache/rocketmq/store/dledger/DLedgerCommitlogTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/dledger/DLedgerCommitlogTest.java
@@ -19,14 +19,17 @@ package org.apache.rocketmq.store.dledger;
 import io.openmessaging.storage.dledger.DLedgerServer;
 import io.openmessaging.storage.dledger.store.file.DLedgerMmapFileStore;
 import io.openmessaging.storage.dledger.store.file.MmapFileList;
+
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
+
 import org.apache.rocketmq.common.message.MessageDecoder;
 import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageExtBatch;
 import org.apache.rocketmq.store.DefaultMessageStore;
 import org.apache.rocketmq.store.GetMessageResult;
 import org.apache.rocketmq.store.GetMessageStatus;
@@ -41,7 +44,7 @@ public class DLedgerCommitlogTest extends MessageStoreTestBase {
 
     @Test
     public void testTruncateCQ() throws Exception {
-        String base =  createBaseDir();
+        String base = createBaseDir();
         String peers = String.format("n0-localhost:%d", nextPort());
         String group = UUID.randomUUID().toString();
         String topic = UUID.randomUUID().toString();
@@ -94,10 +97,9 @@ public class DLedgerCommitlogTest extends MessageStoreTestBase {
     }
 
 
-
     @Test
     public void testRecover() throws Exception {
-        String base =  createBaseDir();
+        String base = createBaseDir();
         String peers = String.format("n0-localhost:%d", nextPort());
         String group = UUID.randomUUID().toString();
         String topic = UUID.randomUUID().toString();
@@ -135,10 +137,9 @@ public class DLedgerCommitlogTest extends MessageStoreTestBase {
     }
 
 
-
     @Test
     public void testPutAndGetMessage() throws Exception {
-        String base =  createBaseDir();
+        String base = createBaseDir();
         String peers = String.format("n0-localhost:%d", nextPort());
         String group = UUID.randomUUID().toString();
         DefaultMessageStore messageStore = createDledgerMessageStore(base, group, "n0", peers, null, false, 0);
@@ -148,7 +149,7 @@ public class DLedgerCommitlogTest extends MessageStoreTestBase {
         List<PutMessageResult> results = new ArrayList<>();
         for (int i = 0; i < 10; i++) {
             MessageExtBrokerInner msgInner =
-                i < 5 ? buildMessage() : buildIPv6HostMessage();
+                    i < 5 ? buildMessage() : buildIPv6HostMessage();
             msgInner.setTopic(topic);
             msgInner.setQueueId(0);
             PutMessageResult putMessageResult = messageStore.putMessage(msgInner);
@@ -160,7 +161,7 @@ public class DLedgerCommitlogTest extends MessageStoreTestBase {
         Assert.assertEquals(0, messageStore.getMinOffsetInQueue(topic, 0));
         Assert.assertEquals(10, messageStore.getMaxOffsetInQueue(topic, 0));
         Assert.assertEquals(0, messageStore.dispatchBehindBytes());
-        GetMessageResult getMessageResult =  messageStore.getMessage("group", topic, 0, 0, 32, null);
+        GetMessageResult getMessageResult = messageStore.getMessage("group", topic, 0, 0, 32, null);
         Assert.assertEquals(GetMessageStatus.FOUND, getMessageResult.getStatus());
 
         Assert.assertEquals(10, getMessageResult.getMessageBufferList().size());
@@ -178,8 +179,52 @@ public class DLedgerCommitlogTest extends MessageStoreTestBase {
     }
 
     @Test
+    public void testBatchPutAndGetMessage() throws Exception {
+        String base = createBaseDir();
+        String peers = String.format("n0-localhost:%d", nextPort());
+        String group = UUID.randomUUID().toString();
+        DefaultMessageStore messageStore = createDledgerMessageStore(base, group, "n0", peers, null, false, 0);
+        Thread.sleep(1000);
+        String topic = UUID.randomUUID().toString();
+        // should be less than 4
+        int batchMessageSize = 2;
+        int repeat = 10;
+        List<PutMessageResult> results = new ArrayList<>();
+        for (int i = 0; i < repeat; i++) {
+            MessageExtBatch messageExtBatch =
+                    i < repeat / 10 ? buildBatchMessage(batchMessageSize) : buildIPv6HostBatchMessage(batchMessageSize);
+            messageExtBatch.setTopic(topic);
+            messageExtBatch.setQueueId(0);
+            PutMessageResult putMessageResult = messageStore.putMessages(messageExtBatch);
+            results.add(putMessageResult);
+            Assert.assertEquals(PutMessageStatus.PUT_OK, putMessageResult.getPutMessageStatus());
+            Assert.assertEquals(i * batchMessageSize, putMessageResult.getAppendMessageResult().getLogicsOffset());
+        }
+        Thread.sleep(100);
+        Assert.assertEquals(0, messageStore.getMinOffsetInQueue(topic, 0));
+        Assert.assertEquals(repeat * batchMessageSize, messageStore.getMaxOffsetInQueue(topic, 0));
+        Assert.assertEquals(0, messageStore.dispatchBehindBytes());
+        GetMessageResult getMessageResult = messageStore.getMessage("group", topic, 0, 0, 100, null);
+        Assert.assertEquals(GetMessageStatus.FOUND, getMessageResult.getStatus());
+
+        Assert.assertEquals(repeat * batchMessageSize > 32 ? 32 : repeat * batchMessageSize, getMessageResult.getMessageBufferList().size());
+        Assert.assertEquals(repeat * batchMessageSize > 32 ? 32 : repeat * batchMessageSize, getMessageResult.getMessageMapedList().size());
+        Assert.assertEquals(repeat * batchMessageSize, getMessageResult.getMaxOffset());
+
+        for (int i = 0; i < results.size(); i++) {
+            ByteBuffer buffer = getMessageResult.getMessageBufferList().get(i * batchMessageSize);
+            MessageExt messageExt = MessageDecoder.decode(buffer);
+            Assert.assertEquals(i * batchMessageSize, messageExt.getQueueOffset());
+            Assert.assertEquals(results.get(i).getAppendMessageResult().getMsgId().split(",").length, batchMessageSize);
+            Assert.assertEquals(results.get(i).getAppendMessageResult().getWroteOffset(), messageExt.getCommitLogOffset());
+        }
+        messageStore.destroy();
+        messageStore.shutdown();
+    }
+
+    @Test
     public void testAsyncPutAndGetMessage() throws Exception {
-        String base =  createBaseDir();
+        String base = createBaseDir();
         String peers = String.format("n0-localhost:%d", nextPort());
         String group = UUID.randomUUID().toString();
         DefaultMessageStore messageStore = createDledgerMessageStore(base, group, "n0", peers, null, false, 0);
@@ -189,7 +234,7 @@ public class DLedgerCommitlogTest extends MessageStoreTestBase {
         List<PutMessageResult> results = new ArrayList<>();
         for (int i = 0; i < 10; i++) {
             MessageExtBrokerInner msgInner =
-                i < 5 ? buildMessage() : buildIPv6HostMessage();
+                    i < 5 ? buildMessage() : buildIPv6HostMessage();
             msgInner.setTopic(topic);
             msgInner.setQueueId(0);
             CompletableFuture<PutMessageResult> futureResult = messageStore.asyncPutMessage(msgInner);
@@ -202,7 +247,7 @@ public class DLedgerCommitlogTest extends MessageStoreTestBase {
         Assert.assertEquals(0, messageStore.getMinOffsetInQueue(topic, 0));
         Assert.assertEquals(10, messageStore.getMaxOffsetInQueue(topic, 0));
         Assert.assertEquals(0, messageStore.dispatchBehindBytes());
-        GetMessageResult getMessageResult =  messageStore.getMessage("group", topic, 0, 0, 32, null);
+        GetMessageResult getMessageResult = messageStore.getMessage("group", topic, 0, 0, 32, null);
         Assert.assertEquals(GetMessageStatus.FOUND, getMessageResult.getStatus());
 
         Assert.assertEquals(10, getMessageResult.getMessageBufferList().size());
@@ -219,15 +264,60 @@ public class DLedgerCommitlogTest extends MessageStoreTestBase {
         messageStore.shutdown();
     }
 
+    @Test
+    public void testAsyncBatchPutAndGetMessage() throws Exception {
+        String base = createBaseDir();
+        String peers = String.format("n0-localhost:%d", nextPort());
+        String group = UUID.randomUUID().toString();
+        DefaultMessageStore messageStore = createDledgerMessageStore(base, group, "n0", peers, null, false, 0);
+        Thread.sleep(1000);
+        String topic = UUID.randomUUID().toString();
+        // should be less than 4
+        int batchMessageSize = 2;
+        int repeat = 10;
+
+        List<PutMessageResult> results = new ArrayList<>();
+        for (int i = 0; i < repeat; i++) {
+            MessageExtBatch messageExtBatch =
+                    i < 5 ? buildBatchMessage(batchMessageSize) : buildIPv6HostBatchMessage(batchMessageSize);
+            messageExtBatch.setTopic(topic);
+            messageExtBatch.setQueueId(0);
+            CompletableFuture<PutMessageResult> futureResult = messageStore.asyncPutMessages(messageExtBatch);
+            PutMessageResult putMessageResult = futureResult.get(3000, TimeUnit.MILLISECONDS);
+            results.add(putMessageResult);
+            Assert.assertEquals(PutMessageStatus.PUT_OK, putMessageResult.getPutMessageStatus());
+            Assert.assertEquals(i * batchMessageSize, putMessageResult.getAppendMessageResult().getLogicsOffset());
+        }
+        Thread.sleep(100);
+        Assert.assertEquals(0, messageStore.getMinOffsetInQueue(topic, 0));
+        Assert.assertEquals(repeat * batchMessageSize, messageStore.getMaxOffsetInQueue(topic, 0));
+        Assert.assertEquals(0, messageStore.dispatchBehindBytes());
+        GetMessageResult getMessageResult = messageStore.getMessage("group", topic, 0, 0, 32, null);
+        Assert.assertEquals(GetMessageStatus.FOUND, getMessageResult.getStatus());
+
+        Assert.assertEquals(repeat * batchMessageSize > 32 ? 32 : repeat * batchMessageSize, getMessageResult.getMessageBufferList().size());
+        Assert.assertEquals(repeat * batchMessageSize > 32 ? 32 : repeat * batchMessageSize, getMessageResult.getMessageMapedList().size());
+        Assert.assertEquals(repeat * batchMessageSize, getMessageResult.getMaxOffset());
+
+        for (int i = 0; i < results.size(); i++) {
+            ByteBuffer buffer = getMessageResult.getMessageBufferList().get(i * batchMessageSize);
+            MessageExt messageExt = MessageDecoder.decode(buffer);
+            Assert.assertEquals(i * batchMessageSize, messageExt.getQueueOffset());
+            Assert.assertEquals(results.get(i).getAppendMessageResult().getMsgId().split(",").length, batchMessageSize);
+            Assert.assertEquals(results.get(i).getAppendMessageResult().getWroteOffset(), messageExt.getCommitLogOffset());
+        }
+        messageStore.destroy();
+        messageStore.shutdown();
+    }
 
     @Test
     public void testCommittedPos() throws Exception {
         String peers = String.format("n0-localhost:%d;n1-localhost:%d", nextPort(), nextPort());
         String group = UUID.randomUUID().toString();
-        DefaultMessageStore leaderStore = createDledgerMessageStore(createBaseDir(), group,"n0", peers, "n0", false, 0);
+        DefaultMessageStore leaderStore = createDledgerMessageStore(createBaseDir(), group, "n0", peers, "n0", false, 0);
 
         String topic = UUID.randomUUID().toString();
-        MessageExtBrokerInner msgInner =  buildMessage();
+        MessageExtBrokerInner msgInner = buildMessage();
         msgInner.setTopic(topic);
         msgInner.setQueueId(0);
         PutMessageResult putMessageResult = leaderStore.putMessage(msgInner);
@@ -239,7 +329,7 @@ public class DLedgerCommitlogTest extends MessageStoreTestBase {
         Assert.assertEquals(0, leaderStore.getMaxOffsetInQueue(topic, 0));
 
 
-        DefaultMessageStore followerStore = createDledgerMessageStore(createBaseDir(), group,"n1", peers, "n0", false, 0);
+        DefaultMessageStore followerStore = createDledgerMessageStore(createBaseDir(), group, "n1", peers, "n0", false, 0);
         Thread.sleep(2000);
 
         Assert.assertEquals(1, leaderStore.getMaxOffsetInQueue(topic, 0));
@@ -258,10 +348,10 @@ public class DLedgerCommitlogTest extends MessageStoreTestBase {
     public void testIPv6HostMsgCommittedPos() throws Exception {
         String peers = String.format("n0-localhost:%d;n1-localhost:%d", nextPort(), nextPort());
         String group = UUID.randomUUID().toString();
-        DefaultMessageStore leaderStore = createDledgerMessageStore(createBaseDir(), group,"n0", peers, "n0", false, 0);
+        DefaultMessageStore leaderStore = createDledgerMessageStore(createBaseDir(), group, "n0", peers, "n0", false, 0);
 
         String topic = UUID.randomUUID().toString();
-        MessageExtBrokerInner msgInner =  buildIPv6HostMessage();
+        MessageExtBrokerInner msgInner = buildIPv6HostMessage();
         msgInner.setTopic(topic);
         msgInner.setQueueId(0);
         PutMessageResult putMessageResult = leaderStore.putMessage(msgInner);
@@ -273,7 +363,7 @@ public class DLedgerCommitlogTest extends MessageStoreTestBase {
         Assert.assertEquals(0, leaderStore.getMaxOffsetInQueue(topic, 0));
 
 
-        DefaultMessageStore followerStore = createDledgerMessageStore(createBaseDir(), group,"n1", peers, "n0", false, 0);
+        DefaultMessageStore followerStore = createDledgerMessageStore(createBaseDir(), group, "n1", peers, "n0", false, 0);
         Thread.sleep(2000);
 
         Assert.assertEquals(1, leaderStore.getMaxOffsetInQueue(topic, 0));


Mime
View raw message