rocketmq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ding...@apache.org
Subject [rocketmq] branch develop updated: [ISSUE #1846] Dledger model change into pipeline manner to improve performance (#1847)
Date Fri, 23 Oct 2020 03:55:54 GMT
This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 845c5fd  [ISSUE #1846] Dledger model change into pipeline manner to improve performance
(#1847)
845c5fd is described below

commit 845c5fd61886aff99185a709ebf69d833a38b9c6
Author: rongtong <jinrongtong5@163.com>
AuthorDate: Fri Oct 23 11:55:31 2020 +0800

    [ISSUE #1846] Dledger model change into pipeline manner to improve performance (#1847)
    
    * enhancement(dledger):implement asyncPutMessage in dledger commitlog
    
    * enhancement(dledger):move serialization out of lock
    
    * fix(dledger):fix the issue that queueOffset is overwritten
    
    * fix(dledger):fix the issue that get wrong queueOffset
    
    * test(dledger):add dledgerCommitlog put messages async unit test
    
    * chore(dledger): fix the issue that cannot find symbol of variable SCHEDULE_TOPIC
---
 .../rocketmq/common/message/MessageDecoder.java    |   1 +
 .../rocketmq/store/dledger/DLedgerCommitLog.java   | 274 ++++++++++++++-------
 .../store/dledger/DLedgerCommitlogTest.java        |  44 ++++
 3 files changed, 231 insertions(+), 88 deletions(-)

diff --git a/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java b/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java
index d048dde..7e86d84 100644
--- a/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java
+++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java
@@ -42,6 +42,7 @@ public class MessageDecoder {
     public static final char NAME_VALUE_SEPARATOR = 1;
     public static final char PROPERTY_SEPARATOR = 2;
     public static final int PHY_POS_POSITION = 4 + 4 + 4 + 4 + 4 + 8;
+    public static final int QUEUE_OFFSET_POSITION = 4 + 4 + 4 + 4 + 4;
     public static final int SYSFLAG_POSITION = 4 + 4 + 4 + 4 + 4 + 8 + 8;
 //    public static final int BODY_SIZE_POSITION = 4 // 1 TOTALSIZE
 //        + 4 // 2 MAGICCODE
diff --git a/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java b/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java
index 400ad78..24e0f69 100644
--- a/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java
+++ b/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java
@@ -28,6 +28,8 @@ import io.openmessaging.storage.dledger.store.file.MmapFile;
 import io.openmessaging.storage.dledger.store.file.MmapFileList;
 import io.openmessaging.storage.dledger.store.file.SelectMmapBufferResult;
 import io.openmessaging.storage.dledger.utils.DLedgerUtils;
+import java.net.Inet6Address;
+import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.concurrent.CompletableFuture;
@@ -364,21 +366,14 @@ public class DLedgerCommitLog extends CommitLog {
         return beginTimeInDledgerLock;
     }
 
-    @Override
-    public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
+    private void setMessageInfo(MessageExtBrokerInner msg, int tranType) {
         // Set the storage time
         msg.setStoreTimestamp(System.currentTimeMillis());
         // Set the message body BODY CRC (consider the most appropriate setting
         // on the client)
         msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
 
-        StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
-
-        String topic = msg.getTopic();
-        int queueId = msg.getQueueId();
-
         //should be consistent with the old version
-        final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
         if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
             || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
             // Delay Delivery
@@ -387,8 +382,9 @@ public class DLedgerCommitLog extends CommitLog {
                     msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
                 }
 
-                topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
-                queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
+
+                String topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
+                int queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
 
                 // Backup real topic, queueId
                 MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
@@ -400,6 +396,25 @@ public class DLedgerCommitLog extends CommitLog {
             }
         }
 
+        InetSocketAddress bornSocketAddress = (InetSocketAddress) msg.getBornHost();
+        if (bornSocketAddress.getAddress() instanceof Inet6Address) {
+            msg.setBornHostV6Flag();
+        }
+
+        InetSocketAddress storeSocketAddress = (InetSocketAddress) msg.getStoreHost();
+        if (storeSocketAddress.getAddress() instanceof Inet6Address) {
+            msg.setStoreHostAddressV6Flag();
+        }
+    }
+
+    @Override
+    public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
+
+        StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
+        final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
+        String topic = msg.getTopic();
+        setMessageInfo(msg,tranType);
+
         // Back to Results
         AppendMessageResult appendResult;
         AppendFuture<AppendEntryResponse> dledgerFuture;
@@ -411,14 +426,15 @@ public class DLedgerCommitLog extends CommitLog {
         try {
             beginTimeInDledgerLock = this.defaultMessageStore.getSystemClock().now();
             encodeResult = this.messageSerializer.serialize(msg);
-            queueOffset = topicQueueTable.get(encodeResult.queueOffsetKey);
+            queueOffset = getQueueOffsetByKey(encodeResult.queueOffsetKey, tranType);
+            encodeResult.setQueueOffsetKey(queueOffset);
             if (encodeResult.status != AppendMessageStatus.PUT_OK) {
                 return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, new AppendMessageResult(encodeResult.status));
             }
             AppendEntryRequest request = new AppendEntryRequest();
             request.setGroup(dLedgerConfig.getGroup());
             request.setRemoteId(dLedgerServer.getMemberState().getSelfId());
-            request.setBody(encodeResult.data);
+            request.setBody(encodeResult.getData());
             dledgerFuture = (AppendFuture<AppendEntryResponse>) dLedgerServer.handleAppend(request);
             if (dledgerFuture.getPos() == -1) {
                 return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR));
@@ -430,7 +446,7 @@ public class DLedgerCommitLog extends CommitLog {
 
             String msgId = MessageDecoder.createMessageId(buffer, msg.getStoreHostBytes(),
wroteOffset);
             elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginTimeInDledgerLock;
-            appendResult = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset,
encodeResult.data.length, msgId, System.currentTimeMillis(), queueOffset, elapsedTimeInLock);
+            appendResult = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset,
encodeResult.getData().length, msgId, System.currentTimeMillis(), queueOffset, elapsedTimeInLock);
             switch (tranType) {
                 case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
                 case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
@@ -496,12 +512,104 @@ public class DLedgerCommitLog extends CommitLog {
 
     @Override
     public CompletableFuture<PutMessageResult> asyncPutMessage(MessageExtBrokerInner
msg) {
-        return CompletableFuture.completedFuture(this.putMessage(msg));
+
+        StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
+
+        final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
+
+        setMessageInfo(msg, tranType);
+
+        final String finalTopic = msg.getTopic();
+
+        // Back to Results
+        AppendMessageResult appendResult;
+        AppendFuture<AppendEntryResponse> dledgerFuture;
+        EncodeResult encodeResult;
+
+        encodeResult = this.messageSerializer.serialize(msg);
+        if (encodeResult.status != AppendMessageStatus.PUT_OK) {
+            return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL,
new AppendMessageResult(encodeResult.status)));
+        }
+        putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
+        long elapsedTimeInLock;
+        long queueOffset;
+        try {
+            beginTimeInDledgerLock = this.defaultMessageStore.getSystemClock().now();
+            queueOffset = getQueueOffsetByKey(encodeResult.queueOffsetKey, tranType);
+            encodeResult.setQueueOffsetKey(queueOffset);
+            AppendEntryRequest request = new AppendEntryRequest();
+            request.setGroup(dLedgerConfig.getGroup());
+            request.setRemoteId(dLedgerServer.getMemberState().getSelfId());
+            request.setBody(encodeResult.getData());
+            dledgerFuture = (AppendFuture<AppendEntryResponse>) dLedgerServer.handleAppend(request);
+            if (dledgerFuture.getPos() == -1) {
+                return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY,
new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR)));
+            }
+            long wroteOffset = dledgerFuture.getPos() + DLedgerEntry.BODY_OFFSET;
+
+            int msgIdLength = (msg.getSysFlag() & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG)
== 0 ? 4 + 4 + 8 : 16 + 4 + 8;
+            ByteBuffer buffer = ByteBuffer.allocate(msgIdLength);
+
+            String msgId = MessageDecoder.createMessageId(buffer, msg.getStoreHostBytes(),
wroteOffset);
+            elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginTimeInDledgerLock;
+            appendResult = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset,
encodeResult.getData().length, msgId, System.currentTimeMillis(), queueOffset, elapsedTimeInLock);
+            switch (tranType) {
+                case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
+                case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
+                    break;
+                case MessageSysFlag.TRANSACTION_NOT_TYPE:
+                case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
+                    // The next update ConsumeQueue information
+                    DLedgerCommitLog.this.topicQueueTable.put(encodeResult.queueOffsetKey,
queueOffset + 1);
+                    break;
+                default:
+                    break;
+            }
+        } catch (Exception e) {
+            log.error("Put message error", e);
+            return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR,
new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR)));
+        } finally {
+            beginTimeInDledgerLock = 0;
+            putMessageLock.unlock();
+        }
+
+        if (elapsedTimeInLock > 500) {
+            log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}",
elapsedTimeInLock, msg.getBody().length, appendResult);
+        }
+
+        return dledgerFuture.thenApply(appendEntryResponse -> {
+            PutMessageStatus putMessageStatus = PutMessageStatus.UNKNOWN_ERROR;
+            switch (DLedgerResponseCode.valueOf(appendEntryResponse.getCode())) {
+                case SUCCESS:
+                    putMessageStatus = PutMessageStatus.PUT_OK;
+                    break;
+                case INCONSISTENT_LEADER:
+                case NOT_LEADER:
+                case LEADER_NOT_READY:
+                case DISK_FULL:
+                    putMessageStatus = PutMessageStatus.SERVICE_NOT_AVAILABLE;
+                    break;
+                case WAIT_QUORUM_ACK_TIMEOUT:
+                    //Do not return flush_slave_timeout to the client, for the ons client
will ignore it.
+                    putMessageStatus = PutMessageStatus.OS_PAGECACHE_BUSY;
+                    break;
+                case LEADER_PENDING_FULL:
+                    putMessageStatus = PutMessageStatus.OS_PAGECACHE_BUSY;
+                    break;
+            }
+            PutMessageResult putMessageResult = new PutMessageResult(putMessageStatus, appendResult);
+            if (putMessageStatus == PutMessageStatus.PUT_OK) {
+                // Statistics
+                storeStatsService.getSinglePutMessageTopicTimesTotal(finalTopic).incrementAndGet();
+                storeStatsService.getSinglePutMessageTopicSizeTotal(msg.getTopic()).addAndGet(appendResult.getWroteBytes());
+            }
+            return putMessageResult;
+        });
     }
 
     @Override
     public CompletableFuture<PutMessageResult> asyncPutMessages(MessageExtBatch messageExtBatch)
{
-        return CompletableFuture.completedFuture(putMessages(messageExtBatch));
+        return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL,
null));
     }
 
     @Override
@@ -566,51 +674,69 @@ public class DLedgerCommitLog extends CommitLog {
         return diff;
     }
 
+    private long getQueueOffsetByKey(String key, int tranType) {
+        Long queueOffset = DLedgerCommitLog.this.topicQueueTable.get(key);
+        if (null == queueOffset) {
+            queueOffset = 0L;
+            DLedgerCommitLog.this.topicQueueTable.put(key, queueOffset);
+        }
+
+        // Transaction messages that require special handling
+        switch (tranType) {
+            // Prepared and Rollback message is not consumed, will not enter the
+            // consumer queuec
+            case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
+            case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
+                queueOffset = 0L;
+                break;
+            case MessageSysFlag.TRANSACTION_NOT_TYPE:
+            case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
+            default:
+                break;
+        }
+        return queueOffset;
+    }
+
+
     class EncodeResult {
         private String queueOffsetKey;
-        private byte[] data;
+        private ByteBuffer data;
         private AppendMessageStatus status;
 
-        public EncodeResult(AppendMessageStatus status, byte[] data, String queueOffsetKey)
{
+        public EncodeResult(AppendMessageStatus status, ByteBuffer data, String queueOffsetKey)
{
             this.data = data;
             this.status = status;
             this.queueOffsetKey = queueOffsetKey;
         }
+
+        public void setQueueOffsetKey(long offset) {
+            data.putLong(MessageDecoder.QUEUE_OFFSET_POSITION, offset);
+        }
+
+        public byte[] getData() {
+            return data.array();
+        }
     }
 
     class MessageSerializer {
-        // File at the end of the minimum fixed length empty
-        private static final int END_FILE_MIN_BLANK_LENGTH = 4 + 4;
-        private final ByteBuffer msgIdMemory;
-        private final ByteBuffer msgIdV6Memory;
-        // Store the message content
-        private final ByteBuffer msgStoreItemMemory;
+
         // The maximum length of the message
         private final int maxMessageSize;
         // Build Message Key
         private final StringBuilder keyBuilder = new StringBuilder();
 
-        private final StringBuilder msgIdBuilder = new StringBuilder();
-
-//        private final ByteBuffer hostHolder = ByteBuffer.allocate(8);
-
         MessageSerializer(final int size) {
-            this.msgIdMemory = ByteBuffer.allocate(4 + 4 + 8);
-            this.msgIdV6Memory = ByteBuffer.allocate(16 + 4 + 8);
-            this.msgStoreItemMemory = ByteBuffer.allocate(size + END_FILE_MIN_BLANK_LENGTH);
             this.maxMessageSize = size;
         }
 
-        public ByteBuffer getMsgStoreItemMemory() {
-            return msgStoreItemMemory;
-        }
-
         public EncodeResult serialize(final MessageExtBrokerInner msgInner) {
             // STORETIMESTAMP + STOREHOSTADDRESS + OFFSET <br>
 
             // PHY OFFSET
             long wroteOffset = 0;
 
+            long queueOffset = 0;
+
             int sysflag = msgInner.getSysFlag();
 
             int bornHostLength = (sysflag & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 4
+ 4 : 16 + 4;
@@ -618,33 +744,7 @@ public class DLedgerCommitLog extends CommitLog {
             ByteBuffer bornHostHolder = ByteBuffer.allocate(bornHostLength);
             ByteBuffer storeHostHolder = ByteBuffer.allocate(storeHostLength);
 
-            // Record ConsumeQueue information
-            keyBuilder.setLength(0);
-            keyBuilder.append(msgInner.getTopic());
-            keyBuilder.append('-');
-            keyBuilder.append(msgInner.getQueueId());
-            String key = keyBuilder.toString();
-
-            Long queueOffset = DLedgerCommitLog.this.topicQueueTable.get(key);
-            if (null == queueOffset) {
-                queueOffset = 0L;
-                DLedgerCommitLog.this.topicQueueTable.put(key, queueOffset);
-            }
-
-            // Transaction messages that require special handling
-            final int tranType = MessageSysFlag.getTransactionValue(msgInner.getSysFlag());
-            switch (tranType) {
-                // Prepared and Rollback message is not consumed, will not enter the
-                // consumer queuec
-                case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
-                case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
-                    queueOffset = 0L;
-                    break;
-                case MessageSysFlag.TRANSACTION_NOT_TYPE:
-                case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
-                default:
-                    break;
-            }
+            String key = msgInner.getTopic() + "-" + msgInner.getQueueId();
 
             /**
              * Serialize message
@@ -666,6 +766,8 @@ public class DLedgerCommitLog extends CommitLog {
 
             final int msgLen = calMsgLength(msgInner.getSysFlag(), bodyLength, topicLength,
propertiesLength);
 
+            ByteBuffer msgStoreItemMemory = ByteBuffer.allocate(msgLen);
+
             // Exceeds the maximum message
             if (msgLen > this.maxMessageSize) {
                 DLedgerCommitLog.log.warn("message size exceeded, msg total size: " + msgLen
+ ", msg body size: " + bodyLength
@@ -675,60 +777,56 @@ public class DLedgerCommitLog extends CommitLog {
             // Initialization of storage space
             this.resetByteBuffer(msgStoreItemMemory, msgLen);
             // 1 TOTALSIZE
-            this.msgStoreItemMemory.putInt(msgLen);
+            msgStoreItemMemory.putInt(msgLen);
             // 2 MAGICCODE
-            this.msgStoreItemMemory.putInt(DLedgerCommitLog.MESSAGE_MAGIC_CODE);
+            msgStoreItemMemory.putInt(DLedgerCommitLog.MESSAGE_MAGIC_CODE);
             // 3 BODYCRC
-            this.msgStoreItemMemory.putInt(msgInner.getBodyCRC());
+            msgStoreItemMemory.putInt(msgInner.getBodyCRC());
             // 4 QUEUEID
-            this.msgStoreItemMemory.putInt(msgInner.getQueueId());
+            msgStoreItemMemory.putInt(msgInner.getQueueId());
             // 5 FLAG
-            this.msgStoreItemMemory.putInt(msgInner.getFlag());
+            msgStoreItemMemory.putInt(msgInner.getFlag());
             // 6 QUEUEOFFSET
-            this.msgStoreItemMemory.putLong(queueOffset);
+            msgStoreItemMemory.putLong(queueOffset);
             // 7 PHYSICALOFFSET
-            this.msgStoreItemMemory.putLong(wroteOffset);
+            msgStoreItemMemory.putLong(wroteOffset);
             // 8 SYSFLAG
-            this.msgStoreItemMemory.putInt(msgInner.getSysFlag());
+            msgStoreItemMemory.putInt(msgInner.getSysFlag());
             // 9 BORNTIMESTAMP
-            this.msgStoreItemMemory.putLong(msgInner.getBornTimestamp());
+            msgStoreItemMemory.putLong(msgInner.getBornTimestamp());
             // 10 BORNHOST
-            this.resetByteBuffer(bornHostHolder, bornHostLength);
-            this.msgStoreItemMemory.put(msgInner.getBornHostBytes(bornHostHolder));
+            resetByteBuffer(bornHostHolder, bornHostLength);
+            msgStoreItemMemory.put(msgInner.getBornHostBytes(bornHostHolder));
             // 11 STORETIMESTAMP
-            this.msgStoreItemMemory.putLong(msgInner.getStoreTimestamp());
+            msgStoreItemMemory.putLong(msgInner.getStoreTimestamp());
             // 12 STOREHOSTADDRESS
-            this.resetByteBuffer(storeHostHolder, storeHostLength);
-            this.msgStoreItemMemory.put(msgInner.getStoreHostBytes(storeHostHolder));
+            resetByteBuffer(storeHostHolder, storeHostLength);
+            msgStoreItemMemory.put(msgInner.getStoreHostBytes(storeHostHolder));
             //this.msgBatchMemory.put(msgInner.getStoreHostBytes());
             // 13 RECONSUMETIMES
-            this.msgStoreItemMemory.putInt(msgInner.getReconsumeTimes());
+            msgStoreItemMemory.putInt(msgInner.getReconsumeTimes());
             // 14 Prepared Transaction Offset
-            this.msgStoreItemMemory.putLong(msgInner.getPreparedTransactionOffset());
+            msgStoreItemMemory.putLong(msgInner.getPreparedTransactionOffset());
             // 15 BODY
-            this.msgStoreItemMemory.putInt(bodyLength);
+            msgStoreItemMemory.putInt(bodyLength);
             if (bodyLength > 0) {
-                this.msgStoreItemMemory.put(msgInner.getBody());
+                msgStoreItemMemory.put(msgInner.getBody());
             }
             // 16 TOPIC
-            this.msgStoreItemMemory.put((byte) topicLength);
-            this.msgStoreItemMemory.put(topicData);
+            msgStoreItemMemory.put((byte) topicLength);
+            msgStoreItemMemory.put(topicData);
             // 17 PROPERTIES
-            this.msgStoreItemMemory.putShort((short) propertiesLength);
+            msgStoreItemMemory.putShort((short) propertiesLength);
             if (propertiesLength > 0) {
-                this.msgStoreItemMemory.put(propertiesData);
+                msgStoreItemMemory.put(propertiesData);
             }
-            byte[] data = new byte[msgLen];
-            this.msgStoreItemMemory.clear();
-            this.msgStoreItemMemory.get(data);
-            return new EncodeResult(AppendMessageStatus.PUT_OK, data, key);
+            return new EncodeResult(AppendMessageStatus.PUT_OK, msgStoreItemMemory, key);
         }
 
         private void resetByteBuffer(final ByteBuffer byteBuffer, final int limit) {
             byteBuffer.flip();
             byteBuffer.limit(limit);
         }
-
     }
 
     public static class DLedgerSelectMappedBufferResult extends SelectMappedBufferResult
{
diff --git a/store/src/test/java/org/apache/rocketmq/store/dledger/DLedgerCommitlogTest.java
b/store/src/test/java/org/apache/rocketmq/store/dledger/DLedgerCommitlogTest.java
index f0b9205..e31d834 100644
--- a/store/src/test/java/org/apache/rocketmq/store/dledger/DLedgerCommitlogTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/dledger/DLedgerCommitlogTest.java
@@ -23,6 +23,8 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
 import org.apache.rocketmq.common.message.MessageDecoder;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.store.DefaultMessageStore;
@@ -175,6 +177,48 @@ public class DLedgerCommitlogTest extends MessageStoreTestBase {
         messageStore.shutdown();
     }
 
+    @Test
+    public void testAsyncPutAndGetMessage() throws Exception {
+        String base =  createBaseDir();
+        String peers = String.format("n0-localhost:%d", nextPort());
+        String group = UUID.randomUUID().toString();
+        DefaultMessageStore messageStore = createDledgerMessageStore(base, group, "n0", peers,
null, false, 0);
+        Thread.sleep(1000);
+        String topic = UUID.randomUUID().toString();
+
+        List<PutMessageResult> results = new ArrayList<>();
+        for (int i = 0; i < 10; i++) {
+            MessageExtBrokerInner msgInner =
+                i < 5 ? buildMessage() : buildIPv6HostMessage();
+            msgInner.setTopic(topic);
+            msgInner.setQueueId(0);
+            CompletableFuture<PutMessageResult> futureResult = messageStore.asyncPutMessage(msgInner);
+            PutMessageResult putMessageResult = futureResult.get(3000, TimeUnit.MILLISECONDS);
+            results.add(putMessageResult);
+            Assert.assertEquals(PutMessageStatus.PUT_OK, putMessageResult.getPutMessageStatus());
+            Assert.assertEquals(i, putMessageResult.getAppendMessageResult().getLogicsOffset());
+        }
+        Thread.sleep(100);
+        Assert.assertEquals(0, messageStore.getMinOffsetInQueue(topic, 0));
+        Assert.assertEquals(10, messageStore.getMaxOffsetInQueue(topic, 0));
+        Assert.assertEquals(0, messageStore.dispatchBehindBytes());
+        GetMessageResult getMessageResult =  messageStore.getMessage("group", topic, 0, 0,
32, null);
+        Assert.assertEquals(GetMessageStatus.FOUND, getMessageResult.getStatus());
+
+        Assert.assertEquals(10, getMessageResult.getMessageBufferList().size());
+        Assert.assertEquals(10, getMessageResult.getMessageMapedList().size());
+
+        for (int i = 0; i < results.size(); i++) {
+            ByteBuffer buffer = getMessageResult.getMessageBufferList().get(i);
+            MessageExt messageExt = MessageDecoder.decode(buffer);
+            Assert.assertEquals(i, messageExt.getQueueOffset());
+            Assert.assertEquals(results.get(i).getAppendMessageResult().getMsgId(), messageExt.getMsgId());
+            Assert.assertEquals(results.get(i).getAppendMessageResult().getWroteOffset(),
messageExt.getCommitLogOffset());
+        }
+        messageStore.destroy();
+        messageStore.shutdown();
+    }
+
 
     @Test
     public void testCommittedPos() throws Exception {


Mime
View raw message