rocketmq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From huzongt...@apache.org
Subject [rocketmq] branch develop updated: [ISSUE #2883] [Part E] Improve produce performance in M/S mode. (#2889)
Date Wed, 28 Jul 2021 06:50:05 GMT
This is an automated email from the ASF dual-hosted git repository.

huzongtang pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 0325772  [ISSUE #2883] [Part E] Improve produce performance in M/S mode.  (#2889)
0325772 is described below

commit 032577255ecd543908446aa272668c9f882bcce5
Author: huangli <areyouok@gmail.com>
AuthorDate: Wed Jul 28 14:49:45 2021 +0800

    [ISSUE #2883] [Part E] Improve produce performance in M/S mode.  (#2889)
    
    * Remove putMessage/putMessages method in CommitLog which has too many duplicated code.
    
    * Optimise performance of asyncPutMessage (extract some code out of putMessage lock)
    
    * extract generation of msgId out of lock in CommitLog (now only for single message processor)
    
    * extract generation of topicQueueTable key out of sync code
    
    * extract generation of msgId out of lock in CommitLog (for batch)
    
    * fix ipv6 problem introduced in commit "Optimise performance of asyncPutMessage (extract some code out of putMessage lock)"
---
 .../rocketmq/store/AppendMessageCallback.java      |   5 +-
 .../apache/rocketmq/store/AppendMessageResult.java |  17 +
 .../java/org/apache/rocketmq/store/CommitLog.java  | 760 ++++++++-------------
 .../apache/rocketmq/store/DefaultMessageStore.java |  55 +-
 .../java/org/apache/rocketmq/store/MappedFile.java |  20 +-
 .../rocketmq/store/MessageExtBrokerInner.java      |  12 +
 .../rocketmq/store/dledger/DLedgerCommitLog.java   | 232 -------
 .../apache/rocketmq/store/AppendCallbackTest.java  |  28 +-
 8 files changed, 361 insertions(+), 768 deletions(-)

diff --git a/store/src/main/java/org/apache/rocketmq/store/AppendMessageCallback.java b/store/src/main/java/org/apache/rocketmq/store/AppendMessageCallback.java
index d337638..5499c90 100644
--- a/store/src/main/java/org/apache/rocketmq/store/AppendMessageCallback.java
+++ b/store/src/main/java/org/apache/rocketmq/store/AppendMessageCallback.java
@@ -18,6 +18,7 @@ package org.apache.rocketmq.store;
 
 import java.nio.ByteBuffer;
 import org.apache.rocketmq.common.message.MessageExtBatch;
+import org.apache.rocketmq.store.CommitLog.PutMessageContext;
 
 /**
  * Write messages callback interface
@@ -30,7 +31,7 @@ public interface AppendMessageCallback {
      * @return How many bytes to write
      */
     AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer,
-        final int maxBlank, final MessageExtBrokerInner msg);
+        final int maxBlank, final MessageExtBrokerInner msg, PutMessageContext putMessageContext);
 
     /**
      * After batched message serialization, write MapedByteBuffer
@@ -39,5 +40,5 @@ public interface AppendMessageCallback {
      * @return How many bytes to write
      */
     AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer,
-        final int maxBlank, final MessageExtBatch messageExtBatch);
+        final int maxBlank, final MessageExtBatch messageExtBatch, PutMessageContext putMessageContext);
 }
diff --git a/store/src/main/java/org/apache/rocketmq/store/AppendMessageResult.java b/store/src/main/java/org/apache/rocketmq/store/AppendMessageResult.java
index d6d1aa6..de3c03b 100644
--- a/store/src/main/java/org/apache/rocketmq/store/AppendMessageResult.java
+++ b/store/src/main/java/org/apache/rocketmq/store/AppendMessageResult.java
@@ -16,6 +16,8 @@
  */
 package org.apache.rocketmq.store;
 
+import java.util.function.Supplier;
+
 /**
  * When write a message to the commit log, returns results
  */
@@ -28,6 +30,7 @@ public class AppendMessageResult {
     private int wroteBytes;
     // Message ID
     private String msgId;
+    private Supplier<String> msgIdSupplier;
     // Message storage timestamp
     private long storeTimestamp;
     // Consume queue's offset(step by one)
@@ -51,6 +54,17 @@ public class AppendMessageResult {
         this.pagecacheRT = pagecacheRT;
     }
 
+    public AppendMessageResult(AppendMessageStatus status, long wroteOffset, int wroteBytes, Supplier<String> msgIdSupplier,
+            long storeTimestamp, long logicsOffset, long pagecacheRT) {
+        this.status = status;
+        this.wroteOffset = wroteOffset;
+        this.wroteBytes = wroteBytes;
+        this.msgIdSupplier = msgIdSupplier;
+        this.storeTimestamp = storeTimestamp;
+        this.logicsOffset = logicsOffset;
+        this.pagecacheRT = pagecacheRT;
+    }
+
     public long getPagecacheRT() {
         return pagecacheRT;
     }
@@ -88,6 +102,9 @@ public class AppendMessageResult {
     }
 
     public String getMsgId() {
+        if (msgId == null && msgIdSupplier != null) {
+            msgId = msgIdSupplier.get();
+        }
         return msgId;
     }
 
diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
index 57fa363..5e92654 100644
--- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
+++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
@@ -16,17 +16,18 @@
  */
 package org.apache.rocketmq.store;
 
+import java.net.Inet4Address;
 import java.net.Inet6Address;
+import java.net.InetAddress;
 import java.net.InetSocketAddress;
+import java.net.SocketAddress;
 import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
+import java.util.function.Supplier;
 
 import org.apache.rocketmq.common.ServiceThread;
 import org.apache.rocketmq.common.UtilAll;
@@ -62,7 +63,7 @@ public class CommitLog {
     private final FlushCommitLogService commitLogService;
 
     private final AppendMessageCallback appendMessageCallback;
-    private final ThreadLocal<MessageExtBatchEncoder> batchEncoderThreadLocal;
+    private final ThreadLocal<PutMessageThreadLocal> putMessageThreadLocal;
     protected HashMap<String/* topic-queueid */, Long/* offset */> topicQueueTable = new HashMap<String, Long>(1024);
     protected volatile long confirmOffset = -1L;
 
@@ -84,10 +85,10 @@ public class CommitLog {
         this.commitLogService = new CommitRealTimeService();
 
         this.appendMessageCallback = new DefaultAppendMessageCallback(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize());
-        batchEncoderThreadLocal = new ThreadLocal<MessageExtBatchEncoder>() {
+        putMessageThreadLocal = new ThreadLocal<PutMessageThreadLocal>() {
             @Override
-            protected MessageExtBatchEncoder initialValue() {
-                return new MessageExtBatchEncoder(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize());
+            protected PutMessageThreadLocal initialValue() {
+                return new PutMessageThreadLocal(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize());
             }
         };
         this.putMessageLock = defaultMessageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage() ? new PutMessageReentrantLock() : new PutMessageSpinLock();
@@ -555,6 +556,14 @@ public class CommitLog {
         return beginTimeInLock;
     }
 
+    private String generateKey(StringBuilder keyBuilder, MessageExt messageExt) {
+        keyBuilder.setLength(0);
+        keyBuilder.append(messageExt.getTopic());
+        keyBuilder.append('-');
+        keyBuilder.append(messageExt.getQueueId());
+        return keyBuilder.toString();
+    }
+
     public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
         // Set the storage time
         msg.setStoreTimestamp(System.currentTimeMillis());
@@ -591,12 +600,30 @@ public class CommitLog {
             }
         }
 
+        InetSocketAddress bornSocketAddress = (InetSocketAddress) msg.getBornHost();
+        if (bornSocketAddress.getAddress() instanceof Inet6Address) {
+            msg.setBornHostV6Flag();
+        }
+
+        InetSocketAddress storeSocketAddress = (InetSocketAddress) msg.getStoreHost();
+        if (storeSocketAddress.getAddress() instanceof Inet6Address) {
+            msg.setStoreHostAddressV6Flag();
+        }
+
+        PutMessageThreadLocal putMessageThreadLocal = this.putMessageThreadLocal.get();
+        PutMessageResult encodeResult = putMessageThreadLocal.getEncoder().encode(msg);
+        if (encodeResult != null) {
+            return CompletableFuture.completedFuture(encodeResult);
+        }
+        msg.setEncodedBuff(putMessageThreadLocal.getEncoder().encoderBuffer);
+        PutMessageContext putMessageContext = new PutMessageContext(generateKey(putMessageThreadLocal.getKeyBuilder(), msg));
+
         long elapsedTimeInLock = 0;
         MappedFile unlockMappedFile = null;
-        MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
 
         putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
         try {
+            MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
             long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
             this.beginTimeInLock = beginLockTimestamp;
 
@@ -613,7 +640,7 @@ public class CommitLog {
                 return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null));
             }
 
-            result = mappedFile.appendMessage(msg, this.appendMessageCallback);
+            result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);
             switch (result.getStatus()) {
                 case PUT_OK:
                     break;
@@ -627,7 +654,7 @@ public class CommitLog {
                         beginTimeInLock = 0;
                         return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result));
                     }
-                    result = mappedFile.appendMessage(msg, this.appendMessageCallback);
+                    result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);
                     break;
                 case MESSAGE_SIZE_EXCEEDED:
                 case PROPERTIES_SIZE_EXCEEDED:
@@ -693,14 +720,26 @@ public class CommitLog {
             return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null));
         }
 
+        InetSocketAddress bornSocketAddress = (InetSocketAddress) messageExtBatch.getBornHost();
+        if (bornSocketAddress.getAddress() instanceof Inet6Address) {
+            messageExtBatch.setBornHostV6Flag();
+        }
+
+        InetSocketAddress storeSocketAddress = (InetSocketAddress) messageExtBatch.getStoreHost();
+        if (storeSocketAddress.getAddress() instanceof Inet6Address) {
+            messageExtBatch.setStoreHostAddressV6Flag();
+        }
+
         long elapsedTimeInLock = 0;
         MappedFile unlockMappedFile = null;
         MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
 
         //fine-grained lock instead of the coarse-grained
-        MessageExtBatchEncoder batchEncoder = batchEncoderThreadLocal.get();
+        PutMessageThreadLocal pmThreadLocal = this.putMessageThreadLocal.get();
+        MessageExtEncoder batchEncoder = pmThreadLocal.getEncoder();
 
-        messageExtBatch.setEncodedBuff(batchEncoder.encode(messageExtBatch));
+        PutMessageContext putMessageContext = new PutMessageContext(generateKey(pmThreadLocal.getKeyBuilder(), messageExtBatch));
+        messageExtBatch.setEncodedBuff(batchEncoder.encode(messageExtBatch, putMessageContext));
 
         putMessageLock.lock();
         try {
@@ -720,7 +759,7 @@ public class CommitLog {
                 return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null));
             }
 
-            result = mappedFile.appendMessages(messageExtBatch, this.appendMessageCallback);
+            result = mappedFile.appendMessages(messageExtBatch, this.appendMessageCallback, putMessageContext);
             switch (result.getStatus()) {
                 case PUT_OK:
                     break;
@@ -734,7 +773,7 @@ public class CommitLog {
                         beginTimeInLock = 0;
                         return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result));
                     }
-                    result = mappedFile.appendMessages(messageExtBatch, this.appendMessageCallback);
+                    result = mappedFile.appendMessages(messageExtBatch, this.appendMessageCallback, putMessageContext);
                     break;
                 case MESSAGE_SIZE_EXCEEDED:
                 case PROPERTIES_SIZE_EXCEEDED:
@@ -784,129 +823,6 @@ public class CommitLog {
 
     }
 
-    public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
-        // Set the storage time
-        msg.setStoreTimestamp(System.currentTimeMillis());
-        // Set the message body BODY CRC (consider the most appropriate setting
-        // on the client)
-        msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
-        // Back to Results
-        AppendMessageResult result = null;
-
-        StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
-
-        String topic = msg.getTopic();
-        int queueId = msg.getQueueId();
-
-        final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
-        if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
-            || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
-            // Delay Delivery
-            if (msg.getDelayTimeLevel() > 0) {
-                if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
-                    msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
-                }
-
-                topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
-                queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
-
-                // Backup real topic, queueId
-                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
-                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
-                msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
-
-                msg.setTopic(topic);
-                msg.setQueueId(queueId);
-            }
-        }
-
-        InetSocketAddress bornSocketAddress = (InetSocketAddress) msg.getBornHost();
-        if (bornSocketAddress.getAddress() instanceof Inet6Address) {
-            msg.setBornHostV6Flag();
-        }
-
-        InetSocketAddress storeSocketAddress = (InetSocketAddress) msg.getStoreHost();
-        if (storeSocketAddress.getAddress() instanceof Inet6Address) {
-            msg.setStoreHostAddressV6Flag();
-        }
-
-        long elapsedTimeInLock = 0;
-
-        MappedFile unlockMappedFile = null;
-        MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
-
-        putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
-        try {
-            long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
-            this.beginTimeInLock = beginLockTimestamp;
-
-            // Here settings are stored timestamp, in order to ensure an orderly
-            // global
-            msg.setStoreTimestamp(beginLockTimestamp);
-
-            if (null == mappedFile || mappedFile.isFull()) {
-                mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
-            }
-            if (null == mappedFile) {
-                log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
-                beginTimeInLock = 0;
-                return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null);
-            }
-
-            result = mappedFile.appendMessage(msg, this.appendMessageCallback);
-            switch (result.getStatus()) {
-                case PUT_OK:
-                    break;
-                case END_OF_FILE:
-                    unlockMappedFile = mappedFile;
-                    // Create a new file, re-write the message
-                    mappedFile = this.mappedFileQueue.getLastMappedFile(0);
-                    if (null == mappedFile) {
-                        // XXX: warn and notify me
-                        log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
-                        beginTimeInLock = 0;
-                        return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result);
-                    }
-                    result = mappedFile.appendMessage(msg, this.appendMessageCallback);
-                    break;
-                case MESSAGE_SIZE_EXCEEDED:
-                case PROPERTIES_SIZE_EXCEEDED:
-                    beginTimeInLock = 0;
-                    return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result);
-                case UNKNOWN_ERROR:
-                    beginTimeInLock = 0;
-                    return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
-                default:
-                    beginTimeInLock = 0;
-                    return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
-            }
-
-            elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
-            beginTimeInLock = 0;
-        } finally {
-            putMessageLock.unlock();
-        }
-
-        if (elapsedTimeInLock > 500) {
-            log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", elapsedTimeInLock, msg.getBody().length, result);
-        }
-
-        if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
-            this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
-        }
-
-        PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);
-
-        // Statistics
-        storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet();
-        storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes());
-
-        handleDiskFlush(result, putMessageResult, msg);
-        handleHA(result, putMessageResult, msg);
-
-        return putMessageResult;
-    }
-
     public CompletableFuture<PutMessageStatus> submitFlushRequest(AppendMessageResult result, MessageExt messageExt) {
         // Synchronization flush
         if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
@@ -951,179 +867,6 @@ public class CommitLog {
         return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
     }
 
-
-    public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
-        // Synchronization flush
-        if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
-            final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
-            if (messageExt.isWaitStoreMsgOK()) {
-                GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
-                service.putRequest(request);
-                CompletableFuture<PutMessageStatus> flushOkFuture = request.future();
-                PutMessageStatus flushStatus = null;
-                try {
-                    flushStatus = flushOkFuture.get(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout(),
-                            TimeUnit.MILLISECONDS);
-                } catch (InterruptedException | ExecutionException | TimeoutException e) {
-                    //flushOK=false;
-                }
-                if (flushStatus != PutMessageStatus.PUT_OK) {
-                    log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags()
-                        + " client address: " + messageExt.getBornHostString());
-                    putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
-                }
-            } else {
-                service.wakeup();
-            }
-        }
-        // Asynchronous flush
-        else {
-            if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
-                flushCommitLogService.wakeup();
-            } else {
-                commitLogService.wakeup();
-            }
-        }
-    }
-
-    public void handleHA(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
-        if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) {
-            HAService service = this.defaultMessageStore.getHaService();
-            if (messageExt.isWaitStoreMsgOK()) {
-                // Determine whether to wait
-                if (service.isSlaveOK(result.getWroteOffset() + result.getWroteBytes())) {
-                    GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
-                    service.putRequest(request);
-                    service.getWaitNotifyObject().wakeupAll();
-                    PutMessageStatus replicaStatus = null;
-                    try {
-                        replicaStatus = request.future().get(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout(),
-                                TimeUnit.MILLISECONDS);
-                    } catch (InterruptedException | ExecutionException | TimeoutException e) {
-                    }
-                    if (replicaStatus != PutMessageStatus.PUT_OK) {
-                        log.error("do sync transfer other node, wait return, but failed, topic: " + messageExt.getTopic() + " tags: "
-                            + messageExt.getTags() + " client address: " + messageExt.getBornHostNameString());
-                        putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
-                    }
-                }
-                // Slave problem
-                else {
-                    // Tell the producer, slave not available
-                    putMessageResult.setPutMessageStatus(PutMessageStatus.SLAVE_NOT_AVAILABLE);
-                }
-            }
-        }
-
-    }
-
-    public PutMessageResult putMessages(final MessageExtBatch messageExtBatch) {
-        messageExtBatch.setStoreTimestamp(System.currentTimeMillis());
-        AppendMessageResult result;
-
-        StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
-
-        final int tranType = MessageSysFlag.getTransactionValue(messageExtBatch.getSysFlag());
-
-        if (tranType != MessageSysFlag.TRANSACTION_NOT_TYPE) {
-            return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
-        }
-        if (messageExtBatch.getDelayTimeLevel() > 0) {
-            return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
-        }
-
-        InetSocketAddress bornSocketAddress = (InetSocketAddress) messageExtBatch.getBornHost();
-        if (bornSocketAddress.getAddress() instanceof Inet6Address) {
-            messageExtBatch.setBornHostV6Flag();
-        }
-
-        InetSocketAddress storeSocketAddress = (InetSocketAddress) messageExtBatch.getStoreHost();
-        if (storeSocketAddress.getAddress() instanceof Inet6Address) {
-            messageExtBatch.setStoreHostAddressV6Flag();
-        }
-
-        long elapsedTimeInLock = 0;
-        MappedFile unlockMappedFile = null;
-        MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
-
-        //fine-grained lock instead of the coarse-grained
-        MessageExtBatchEncoder batchEncoder = batchEncoderThreadLocal.get();
-
-        messageExtBatch.setEncodedBuff(batchEncoder.encode(messageExtBatch));
-
-        putMessageLock.lock();
-        try {
-            long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
-            this.beginTimeInLock = beginLockTimestamp;
-
-            // Here settings are stored timestamp, in order to ensure an orderly
-            // global
-            messageExtBatch.setStoreTimestamp(beginLockTimestamp);
-
-            if (null == mappedFile || mappedFile.isFull()) {
-                mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
-            }
-            if (null == mappedFile) {
-                log.error("Create mapped file1 error, topic: {} clientAddr: {}", messageExtBatch.getTopic(), messageExtBatch.getBornHostString());
-                beginTimeInLock = 0;
-                return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null);
-            }
-
-            result = mappedFile.appendMessages(messageExtBatch, this.appendMessageCallback);
-            switch (result.getStatus()) {
-                case PUT_OK:
-                    break;
-                case END_OF_FILE:
-                    unlockMappedFile = mappedFile;
-                    // Create a new file, re-write the message
-                    mappedFile = this.mappedFileQueue.getLastMappedFile(0);
-                    if (null == mappedFile) {
-                        // XXX: warn and notify me
-                        log.error("Create mapped file2 error, topic: {} clientAddr: {}", messageExtBatch.getTopic(), messageExtBatch.getBornHostString());
-                        beginTimeInLock = 0;
-                        return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result);
-                    }
-                    result = mappedFile.appendMessages(messageExtBatch, this.appendMessageCallback);
-                    break;
-                case MESSAGE_SIZE_EXCEEDED:
-                case PROPERTIES_SIZE_EXCEEDED:
-                    beginTimeInLock = 0;
-                    return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result);
-                case UNKNOWN_ERROR:
-                    beginTimeInLock = 0;
-                    return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
-                default:
-                    beginTimeInLock = 0;
-                    return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
-            }
-
-            elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
-            beginTimeInLock = 0;
-        } finally {
-            putMessageLock.unlock();
-        }
-
-        if (elapsedTimeInLock > 500) {
-            log.warn("[NOTIFYME]putMessages in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", elapsedTimeInLock, messageExtBatch.getBody().length, result);
-        }
-
-        if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
-            this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
-        }
-
-        PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);
-
-        // Statistics
-        storeStatsService.getSinglePutMessageTopicTimesTotal(messageExtBatch.getTopic()).addAndGet(result.getMsgNum());
-        storeStatsService.getSinglePutMessageTopicSizeTotal(messageExtBatch.getTopic()).addAndGet(result.getWroteBytes());
-
-        handleDiskFlush(result, putMessageResult, messageExtBatch);
-
-        handleHA(result, putMessageResult, messageExtBatch);
-
-        return putMessageResult;
-    }
-
     /**
      * According to receive certain message or offset storage time if an error occurs, it returns -1
      */
@@ -1509,50 +1252,33 @@ public class CommitLog {
         private final ByteBuffer msgStoreItemMemory;
         // The maximum length of the message
         private final int maxMessageSize;
-        // Build Message Key
-        private final StringBuilder keyBuilder = new StringBuilder();
-
-        private final StringBuilder msgIdBuilder = new StringBuilder();
 
         DefaultAppendMessageCallback(final int size) {
             this.msgIdMemory = ByteBuffer.allocate(4 + 4 + 8);
             this.msgIdV6Memory = ByteBuffer.allocate(16 + 4 + 8);
-            this.msgStoreItemMemory = ByteBuffer.allocate(size + END_FILE_MIN_BLANK_LENGTH);
+            this.msgStoreItemMemory = ByteBuffer.allocate(END_FILE_MIN_BLANK_LENGTH);
             this.maxMessageSize = size;
         }
 
-        public ByteBuffer getMsgStoreItemMemory() {
-            return msgStoreItemMemory;
-        }
-
         public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank,
-            final MessageExtBrokerInner msgInner) {
+            final MessageExtBrokerInner msgInner, PutMessageContext putMessageContext) {
             // STORETIMESTAMP + STOREHOSTADDRESS + OFFSET <br>
 
             // PHY OFFSET
             long wroteOffset = fileFromOffset + byteBuffer.position();
 
-            int sysflag = msgInner.getSysFlag();
-
-            int bornHostLength = (sysflag & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 4 + 4 : 16 + 4;
-            int storeHostLength = (sysflag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 : 16 + 4;
-            ByteBuffer bornHostHolder = ByteBuffer.allocate(bornHostLength);
-            ByteBuffer storeHostHolder = ByteBuffer.allocate(storeHostLength);
-
-            this.resetByteBuffer(storeHostHolder, storeHostLength);
-            String msgId;
-            if ((sysflag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0) {
-                msgId = MessageDecoder.createMessageId(this.msgIdMemory, msgInner.getStoreHostBytes(storeHostHolder), wroteOffset);
-            } else {
-                msgId = MessageDecoder.createMessageId(this.msgIdV6Memory, msgInner.getStoreHostBytes(storeHostHolder), wroteOffset);
-            }
+            Supplier<String> msgIdSupplier = () -> {
+                int sysflag = msgInner.getSysFlag();
+                int msgIdLen = (sysflag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 + 8 : 16 + 4 + 8;
+                ByteBuffer msgIdBuffer = ByteBuffer.allocate(msgIdLen);
+                MessageExt.socketAddress2ByteBuffer(msgInner.getStoreHost(), msgIdBuffer);
+                msgIdBuffer.clear();//because socketAddress2ByteBuffer flip the buffer
+                msgIdBuffer.putLong(msgIdLen - 8, wroteOffset);
+                return UtilAll.bytes2string(msgIdBuffer.array());
+            };
 
             // Record ConsumeQueue information
-            keyBuilder.setLength(0);
-            keyBuilder.append(msgInner.getTopic());
-            keyBuilder.append('-');
-            keyBuilder.append(msgInner.getQueueId());
-            String key = keyBuilder.toString();
+            String key = putMessageContext.getTopicQueueTableKey();
             Long queueOffset = CommitLog.this.topicQueueTable.get(key);
             if (null == queueOffset) {
                 queueOffset = 0L;
@@ -1574,36 +1300,12 @@ public class CommitLog {
                     break;
             }
 
-            /**
-             * Serialize message
-             */
-            final byte[] propertiesData =
-                msgInner.getPropertiesString() == null ? null : msgInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8);
-
-            final int propertiesLength = propertiesData == null ? 0 : propertiesData.length;
-
-            if (propertiesLength > Short.MAX_VALUE) {
-                log.warn("putMessage message properties length too long. length={}", propertiesData.length);
-                return new AppendMessageResult(AppendMessageStatus.PROPERTIES_SIZE_EXCEEDED);
-            }
-
-            final byte[] topicData = msgInner.getTopic().getBytes(MessageDecoder.CHARSET_UTF8);
-            final int topicLength = topicData.length;
-
-            final int bodyLength = msgInner.getBody() == null ? 0 : msgInner.getBody().length;
-
-            final int msgLen = calMsgLength(msgInner.getSysFlag(), bodyLength, topicLength, propertiesLength);
-
-            // Exceeds the maximum message
-            if (msgLen > this.maxMessageSize) {
-                CommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLength
-                    + ", maxMessageSize: " + this.maxMessageSize);
-                return new AppendMessageResult(AppendMessageStatus.MESSAGE_SIZE_EXCEEDED);
-            }
+            ByteBuffer preEncodeBuffer = msgInner.getEncodedBuff();
+            final int msgLen = preEncodeBuffer.getInt(0);
 
             // Determines whether there is sufficient free space
             if ((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) {
-                this.resetByteBuffer(this.msgStoreItemMemory, maxBlank);
+                this.msgStoreItemMemory.clear();
                 // 1 TOTALSIZE
                 this.msgStoreItemMemory.putInt(maxBlank);
                 // 2 MAGICCODE
@@ -1611,60 +1313,31 @@ public class CommitLog {
                 // 3 The remaining space may be any value
                 // Here the length of the specially set maxBlank
                 final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
-                byteBuffer.put(this.msgStoreItemMemory.array(), 0, maxBlank);
-                return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset, maxBlank, msgId, msgInner.getStoreTimestamp(),
-                    queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
+                byteBuffer.put(this.msgStoreItemMemory.array(), 0, 8);
+                return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset,
+                        maxBlank, /* only wrote 8 bytes, but declare wrote maxBlank for compute write position */
+                        msgIdSupplier, msgInner.getStoreTimestamp(),
+                        queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
             }
 
-            // Initialization of storage space
-            this.resetByteBuffer(msgStoreItemMemory, msgLen);
-            // 1 TOTALSIZE
-            this.msgStoreItemMemory.putInt(msgLen);
-            // 2 MAGICCODE
-            this.msgStoreItemMemory.putInt(CommitLog.MESSAGE_MAGIC_CODE);
-            // 3 BODYCRC
-            this.msgStoreItemMemory.putInt(msgInner.getBodyCRC());
-            // 4 QUEUEID
-            this.msgStoreItemMemory.putInt(msgInner.getQueueId());
-            // 5 FLAG
-            this.msgStoreItemMemory.putInt(msgInner.getFlag());
+            int pos = 4 + 4 + 4 + 4 + 4;
             // 6 QUEUEOFFSET
-            this.msgStoreItemMemory.putLong(queueOffset);
+            preEncodeBuffer.putLong(pos, queueOffset);
+            pos += 8;
             // 7 PHYSICALOFFSET
-            this.msgStoreItemMemory.putLong(fileFromOffset + byteBuffer.position());
-            // 8 SYSFLAG
-            this.msgStoreItemMemory.putInt(msgInner.getSysFlag());
-            // 9 BORNTIMESTAMP
-            this.msgStoreItemMemory.putLong(msgInner.getBornTimestamp());
-            // 10 BORNHOST
-            this.resetByteBuffer(bornHostHolder, bornHostLength);
-            this.msgStoreItemMemory.put(msgInner.getBornHostBytes(bornHostHolder));
-            // 11 STORETIMESTAMP
-            this.msgStoreItemMemory.putLong(msgInner.getStoreTimestamp());
-            // 12 STOREHOSTADDRESS
-            this.resetByteBuffer(storeHostHolder, storeHostLength);
-            this.msgStoreItemMemory.put(msgInner.getStoreHostBytes(storeHostHolder));
-            // 13 RECONSUMETIMES
-            this.msgStoreItemMemory.putInt(msgInner.getReconsumeTimes());
-            // 14 Prepared Transaction Offset
-            this.msgStoreItemMemory.putLong(msgInner.getPreparedTransactionOffset());
-            // 15 BODY
-            this.msgStoreItemMemory.putInt(bodyLength);
-            if (bodyLength > 0)
-                this.msgStoreItemMemory.put(msgInner.getBody());
-            // 16 TOPIC
-            this.msgStoreItemMemory.put((byte) topicLength);
-            this.msgStoreItemMemory.put(topicData);
-            // 17 PROPERTIES
-            this.msgStoreItemMemory.putShort((short) propertiesLength);
-            if (propertiesLength > 0)
-                this.msgStoreItemMemory.put(propertiesData);
+            preEncodeBuffer.putLong(pos, fileFromOffset + byteBuffer.position());
+            int ipLen = (msgInner.getSysFlag() & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 4 + 4 : 16 + 4;
+            // 8 SYSFLAG, 9 BORNTIMESTAMP, 10 BORNHOST, 11 STORETIMESTAMP
+            pos += 8 + 4 + 8 + ipLen;
+            // refresh store time stamp in lock
+            preEncodeBuffer.putLong(pos, msgInner.getStoreTimestamp());
+
 
             final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
             // Write messages to the queue buffer
-            byteBuffer.put(this.msgStoreItemMemory.array(), 0, msgLen);
-
-            AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, msgLen, msgId,
+            byteBuffer.put(preEncodeBuffer);
+            msgInner.setEncodedBuff(null);
+            AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, msgLen, msgIdSupplier,
                 msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
 
             switch (tranType) {
@@ -1683,16 +1356,12 @@ public class CommitLog {
         }
 
         public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank,
-            final MessageExtBatch messageExtBatch) {
+            final MessageExtBatch messageExtBatch, PutMessageContext putMessageContext) {
             byteBuffer.mark();
             //physical offset
             long wroteOffset = fileFromOffset + byteBuffer.position();
             // Record ConsumeQueue information
-            keyBuilder.setLength(0);
-            keyBuilder.append(messageExtBatch.getTopic());
-            keyBuilder.append('-');
-            keyBuilder.append(messageExtBatch.getQueueId());
-            String key = keyBuilder.toString();
+            String key = putMessageContext.getTopicQueueTableKey();
             Long queueOffset = CommitLog.this.topicQueueTable.get(key);
             if (null == queueOffset) {
                 queueOffset = 0L;
@@ -1701,17 +1370,35 @@ public class CommitLog {
             long beginQueueOffset = queueOffset;
             int totalMsgLen = 0;
             int msgNum = 0;
-            msgIdBuilder.setLength(0);
+
             final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
             ByteBuffer messagesByteBuff = messageExtBatch.getEncodedBuff();
 
             int sysFlag = messageExtBatch.getSysFlag();
+            int bornHostLength = (sysFlag & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 4 + 4 : 16 + 4;
             int storeHostLength = (sysFlag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 : 16 + 4;
-            ByteBuffer storeHostHolder = ByteBuffer.allocate(storeHostLength);
+            Supplier<String> msgIdSupplier = () -> {
+                int msgIdLen = storeHostLength + 8;
+                int batchCount = putMessageContext.getBatchSize();
+                long[] phyPosArray = putMessageContext.getPhyPos();
+                ByteBuffer msgIdBuffer = ByteBuffer.allocate(msgIdLen);
+                MessageExt.socketAddress2ByteBuffer(messageExtBatch.getStoreHost(), msgIdBuffer);
+                msgIdBuffer.clear();//because socketAddress2ByteBuffer flip the buffer
+
+                StringBuilder buffer = new StringBuilder(batchCount * msgIdLen * 2 + batchCount - 1);
+                for (int i = 0; i < phyPosArray.length; i++) {
+                    msgIdBuffer.putLong(msgIdLen - 8, phyPosArray[i]);
+                    String msgId = UtilAll.bytes2string(msgIdBuffer.array());
+                    if (i != 0) {
+                        buffer.append(',');
+                    }
+                    buffer.append(msgId);
+                }
+                return buffer.toString();
+            };
 
-            this.resetByteBuffer(storeHostHolder, storeHostLength);
-            ByteBuffer storeHostBytes = messageExtBatch.getStoreHostBytes(storeHostHolder);
             messagesByteBuff.mark();
+            int index = 0;
             while (messagesByteBuff.hasRemaining()) {
                 // 1 TOTALSIZE
                 final int msgPos = messagesByteBuff.position();
@@ -1726,7 +1413,7 @@ public class CommitLog {
                 totalMsgLen += msgLen;
                 // Determines whether there is sufficient free space
                 if ((totalMsgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) {
-                    this.resetByteBuffer(this.msgStoreItemMemory, 8);
+                    this.msgStoreItemMemory.clear();
                     // 1 TOTALSIZE
                     this.msgStoreItemMemory.putInt(maxBlank);
                     // 2 MAGICCODE
@@ -1737,27 +1424,20 @@ public class CommitLog {
                     // Here the length of the specially set maxBlank
                     byteBuffer.reset(); //ignore the previous appended messages
                     byteBuffer.put(this.msgStoreItemMemory.array(), 0, 8);
-                    return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset, maxBlank, msgIdBuilder.toString(), messageExtBatch.getStoreTimestamp(),
+                    return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset, maxBlank, msgIdSupplier, messageExtBatch.getStoreTimestamp(),
                         beginQueueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
                 }
                 //move to add queue offset and commitlog offset
-                messagesByteBuff.position(msgPos + 20);
-                messagesByteBuff.putLong(queueOffset);
-                messagesByteBuff.putLong(wroteOffset + totalMsgLen - msgLen);
-
-                storeHostBytes.rewind();
-                String msgId;
-                if ((sysFlag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0) {
-                    msgId = MessageDecoder.createMessageId(this.msgIdMemory, storeHostBytes, wroteOffset + totalMsgLen - msgLen);
-                } else {
-                    msgId = MessageDecoder.createMessageId(this.msgIdV6Memory, storeHostBytes, wroteOffset + totalMsgLen - msgLen);
-                }
-
-                if (msgIdBuilder.length() > 0) {
-                    msgIdBuilder.append(',').append(msgId);
-                } else {
-                    msgIdBuilder.append(msgId);
-                }
+                int pos = msgPos + 20;
+                messagesByteBuff.putLong(pos, queueOffset);
+                pos += 8;
+                messagesByteBuff.putLong(pos, wroteOffset + totalMsgLen - msgLen);
+                // 8 SYSFLAG, 9 BORNTIMESTAMP, 10 BORNHOST, 11 STORETIMESTAMP
+                pos += 8 + 4 + 8 + bornHostLength;
+                // refresh store time stamp in lock
+                messagesByteBuff.putLong(pos, messageExtBatch.getStoreTimestamp());
+
+                putMessageContext.getPhyPos()[index++] = wroteOffset + totalMsgLen - msgLen;
                 queueOffset++;
                 msgNum++;
                 messagesByteBuff.position(msgPos + msgLen);
@@ -1767,7 +1447,7 @@ public class CommitLog {
             messagesByteBuff.limit(totalMsgLen);
             byteBuffer.put(messagesByteBuff);
             messageExtBatch.setEncodedBuff(null);
-            AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, totalMsgLen, msgIdBuilder.toString(),
+            AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, totalMsgLen, msgIdSupplier,
                 messageExtBatch.getStoreTimestamp(), beginQueueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
             result.setMsgNum(msgNum);
             CommitLog.this.topicQueueTable.put(key, queueOffset);
@@ -1782,19 +1462,104 @@ public class CommitLog {
 
     }
 
-    public static class MessageExtBatchEncoder {
+    public static class MessageExtEncoder {
         // Store the message content
-        private final ByteBuffer msgBatchMemory;
+        private final ByteBuffer encoderBuffer;
         // The maximum length of the message
         private final int maxMessageSize;
 
-        MessageExtBatchEncoder(final int size) {
-            this.msgBatchMemory = ByteBuffer.allocateDirect(size);
+        MessageExtEncoder(final int size) {
+            this.encoderBuffer = ByteBuffer.allocateDirect(size);
             this.maxMessageSize = size;
         }
 
-        public ByteBuffer encode(final MessageExtBatch messageExtBatch) {
-            msgBatchMemory.clear(); //not thread-safe
+        private void socketAddress2ByteBuffer(final SocketAddress socketAddress, final ByteBuffer byteBuffer) {
+            InetSocketAddress inetSocketAddress = (InetSocketAddress) socketAddress;
+            InetAddress address = inetSocketAddress.getAddress();
+            if (address instanceof Inet4Address) {
+                byteBuffer.put(inetSocketAddress.getAddress().getAddress(), 0, 4);
+            } else {
+                byteBuffer.put(inetSocketAddress.getAddress().getAddress(), 0, 16);
+            }
+            byteBuffer.putInt(inetSocketAddress.getPort());
+        }
+
+        protected PutMessageResult encode(MessageExtBrokerInner msgInner) {
+            /**
+             * Serialize message
+             */
+            final byte[] propertiesData =
+                    msgInner.getPropertiesString() == null ? null : msgInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8);
+
+            final int propertiesLength = propertiesData == null ? 0 : propertiesData.length;
+
+            if (propertiesLength > Short.MAX_VALUE) {
+                log.warn("putMessage message properties length too long. length={}", propertiesData.length);
+                return new PutMessageResult(PutMessageStatus.PROPERTIES_SIZE_EXCEEDED, null);
+            }
+
+            final byte[] topicData = msgInner.getTopic().getBytes(MessageDecoder.CHARSET_UTF8);
+            final int topicLength = topicData.length;
+
+            final int bodyLength = msgInner.getBody() == null ? 0 : msgInner.getBody().length;
+
+            final int msgLen = calMsgLength(msgInner.getSysFlag(), bodyLength, topicLength, propertiesLength);
+
+            // Exceeds the maximum message
+            if (msgLen > this.maxMessageSize) {
+                CommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLength
+                        + ", maxMessageSize: " + this.maxMessageSize);
+                return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
+            }
+
+            // Initialization of storage space
+            this.resetByteBuffer(encoderBuffer, msgLen);
+            // 1 TOTALSIZE
+            this.encoderBuffer.putInt(msgLen);
+            // 2 MAGICCODE
+            this.encoderBuffer.putInt(CommitLog.MESSAGE_MAGIC_CODE);
+            // 3 BODYCRC
+            this.encoderBuffer.putInt(msgInner.getBodyCRC());
+            // 4 QUEUEID
+            this.encoderBuffer.putInt(msgInner.getQueueId());
+            // 5 FLAG
+            this.encoderBuffer.putInt(msgInner.getFlag());
+            // 6 QUEUEOFFSET, need update later
+            this.encoderBuffer.putLong(0);
+            // 7 PHYSICALOFFSET, need update later
+            this.encoderBuffer.putLong(0);
+            // 8 SYSFLAG
+            this.encoderBuffer.putInt(msgInner.getSysFlag());
+            // 9 BORNTIMESTAMP
+            this.encoderBuffer.putLong(msgInner.getBornTimestamp());
+            // 10 BORNHOST
+            socketAddress2ByteBuffer(msgInner.getBornHost() ,this.encoderBuffer);
+            // 11 STORETIMESTAMP
+            this.encoderBuffer.putLong(msgInner.getStoreTimestamp());
+            // 12 STOREHOSTADDRESS
+            socketAddress2ByteBuffer(msgInner.getStoreHost() ,this.encoderBuffer);
+            // 13 RECONSUMETIMES
+            this.encoderBuffer.putInt(msgInner.getReconsumeTimes());
+            // 14 Prepared Transaction Offset
+            this.encoderBuffer.putLong(msgInner.getPreparedTransactionOffset());
+            // 15 BODY
+            this.encoderBuffer.putInt(bodyLength);
+            if (bodyLength > 0)
+                this.encoderBuffer.put(msgInner.getBody());
+            // 16 TOPIC
+            this.encoderBuffer.put((byte) topicLength);
+            this.encoderBuffer.put(topicData);
+            // 17 PROPERTIES
+            this.encoderBuffer.putShort((short) propertiesLength);
+            if (propertiesLength > 0)
+                this.encoderBuffer.put(propertiesData);
+
+            encoderBuffer.flip();
+            return null;
+        }
+
+        protected ByteBuffer encode(final MessageExtBatch messageExtBatch, PutMessageContext putMessageContext) {
+            encoderBuffer.clear(); //not thread-safe
             int totalMsgLen = 0;
             ByteBuffer messagesByteBuff = messageExtBatch.wrap();
 
@@ -1809,7 +1574,9 @@ public class CommitLog {
             final byte[] batchPropData = batchPropStr.getBytes(MessageDecoder.CHARSET_UTF8);
             final short batchPropLen = (short) batchPropData.length;
 
+            int batchSize = 0;
             while (messagesByteBuff.hasRemaining()) {
+                batchSize++;
                 // 1 TOTALSIZE
                 messagesByteBuff.getInt();
                 // 2 MAGICCODE
@@ -1849,53 +1616,55 @@ public class CommitLog {
                 }
 
                 // 1 TOTALSIZE
-                this.msgBatchMemory.putInt(msgLen);
+                this.encoderBuffer.putInt(msgLen);
                 // 2 MAGICCODE
-                this.msgBatchMemory.putInt(CommitLog.MESSAGE_MAGIC_CODE);
+                this.encoderBuffer.putInt(CommitLog.MESSAGE_MAGIC_CODE);
                 // 3 BODYCRC
-                this.msgBatchMemory.putInt(bodyCrc);
+                this.encoderBuffer.putInt(bodyCrc);
                 // 4 QUEUEID
-                this.msgBatchMemory.putInt(messageExtBatch.getQueueId());
+                this.encoderBuffer.putInt(messageExtBatch.getQueueId());
                 // 5 FLAG
-                this.msgBatchMemory.putInt(flag);
+                this.encoderBuffer.putInt(flag);
                 // 6 QUEUEOFFSET
-                this.msgBatchMemory.putLong(0);
+                this.encoderBuffer.putLong(0);
                 // 7 PHYSICALOFFSET
-                this.msgBatchMemory.putLong(0);
+                this.encoderBuffer.putLong(0);
                 // 8 SYSFLAG
-                this.msgBatchMemory.putInt(messageExtBatch.getSysFlag());
+                this.encoderBuffer.putInt(messageExtBatch.getSysFlag());
                 // 9 BORNTIMESTAMP
-                this.msgBatchMemory.putLong(messageExtBatch.getBornTimestamp());
+                this.encoderBuffer.putLong(messageExtBatch.getBornTimestamp());
                 // 10 BORNHOST
                 this.resetByteBuffer(bornHostHolder, bornHostLength);
-                this.msgBatchMemory.put(messageExtBatch.getBornHostBytes(bornHostHolder));
+                this.encoderBuffer.put(messageExtBatch.getBornHostBytes(bornHostHolder));
                 // 11 STORETIMESTAMP
-                this.msgBatchMemory.putLong(messageExtBatch.getStoreTimestamp());
+                this.encoderBuffer.putLong(messageExtBatch.getStoreTimestamp());
                 // 12 STOREHOSTADDRESS
                 this.resetByteBuffer(storeHostHolder, storeHostLength);
-                this.msgBatchMemory.put(messageExtBatch.getStoreHostBytes(storeHostHolder));
+                this.encoderBuffer.put(messageExtBatch.getStoreHostBytes(storeHostHolder));
                 // 13 RECONSUMETIMES
-                this.msgBatchMemory.putInt(messageExtBatch.getReconsumeTimes());
+                this.encoderBuffer.putInt(messageExtBatch.getReconsumeTimes());
                 // 14 Prepared Transaction Offset, batch does not support transaction
-                this.msgBatchMemory.putLong(0);
+                this.encoderBuffer.putLong(0);
                 // 15 BODY
-                this.msgBatchMemory.putInt(bodyLen);
+                this.encoderBuffer.putInt(bodyLen);
                 if (bodyLen > 0)
-                    this.msgBatchMemory.put(messagesByteBuff.array(), bodyPos, bodyLen);
+                    this.encoderBuffer.put(messagesByteBuff.array(), bodyPos, bodyLen);
                 // 16 TOPIC
-                this.msgBatchMemory.put((byte) topicLength);
-                this.msgBatchMemory.put(topicData);
+                this.encoderBuffer.put((byte) topicLength);
+                this.encoderBuffer.put(topicData);
                 // 17 PROPERTIES
-                this.msgBatchMemory.putShort((short) (propertiesLen + batchPropLen));
+                this.encoderBuffer.putShort((short) (propertiesLen + batchPropLen));
                 if (propertiesLen > 0) {
-                    this.msgBatchMemory.put(messagesByteBuff.array(), propertiesPos, propertiesLen);
+                    this.encoderBuffer.put(messagesByteBuff.array(), propertiesPos, propertiesLen);
                 }
                 if (batchPropLen > 0) {
-                    this.msgBatchMemory.put(batchPropData, 0, batchPropLen);
+                    this.encoderBuffer.put(batchPropData, 0, batchPropLen);
                 }
             }
-            msgBatchMemory.flip();
-            return msgBatchMemory;
+            putMessageContext.setBatchSize(batchSize);
+            putMessageContext.setPhyPos(new long[batchSize]);
+            encoderBuffer.flip();
+            return encoderBuffer;
         }
 
         private void resetByteBuffer(final ByteBuffer byteBuffer, final int limit) {
@@ -1904,4 +1673,51 @@ public class CommitLog {
         }
 
     }
+
+    static class PutMessageThreadLocal {
+        private MessageExtEncoder encoder;
+        private StringBuilder keyBuilder;
+        PutMessageThreadLocal(int size) {
+            encoder = new MessageExtEncoder(size);
+            keyBuilder = new StringBuilder();
+        }
+
+        public MessageExtEncoder getEncoder() {
+            return encoder;
+        }
+
+        public StringBuilder getKeyBuilder() {
+            return keyBuilder;
+        }
+    }
+
+    static class PutMessageContext {
+        private String topicQueueTableKey;
+        private long[] phyPos;
+        private int batchSize;
+
+        public PutMessageContext(String topicQueueTableKey) {
+            this.topicQueueTableKey = topicQueueTableKey;
+        }
+
+        public String getTopicQueueTableKey() {
+            return topicQueueTableKey;
+        }
+
+        public long[] getPhyPos() {
+            return phyPos;
+        }
+
+        public void setPhyPos(long[] phyPos) {
+            this.phyPos = phyPos;
+        }
+
+        public int getBatchSize() {
+            return batchSize;
+        }
+
+        public void setBatchSize(int batchSize) {
+            this.batchSize = batchSize;
+        }
+    }
 }
diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index 7dd5a32..69019c1 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -34,6 +34,7 @@ import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -476,58 +477,20 @@ public class DefaultMessageStore implements MessageStore {
 
     @Override
     public PutMessageResult putMessage(MessageExtBrokerInner msg) {
-        PutMessageStatus checkStoreStatus = this.checkStoreStatus();
-        if (checkStoreStatus != PutMessageStatus.PUT_OK) {
-            return new PutMessageResult(checkStoreStatus, null);
-        }
-
-        PutMessageStatus msgCheckStatus = this.checkMessage(msg);
-        if (msgCheckStatus == PutMessageStatus.MESSAGE_ILLEGAL) {
-            return new PutMessageResult(msgCheckStatus, null);
-        }
-
-        long beginTime = this.getSystemClock().now();
-        PutMessageResult result = this.commitLog.putMessage(msg);
-        long elapsedTime = this.getSystemClock().now() - beginTime;
-        if (elapsedTime > 500) {
-            log.warn("not in lock elapsed time(ms)={}, bodyLength={}", elapsedTime, msg.getBody().length);
-        }
-
-        this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime);
-
-        if (null == result || !result.isOk()) {
-            this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();
+        try {
+            return asyncPutMessage(msg).get();
+        } catch (InterruptedException | ExecutionException e) {
+            return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, null);
         }
-
-        return result;
     }
 
     @Override
     public PutMessageResult putMessages(MessageExtBatch messageExtBatch) {
-        PutMessageStatus checkStoreStatus = this.checkStoreStatus();
-        if (checkStoreStatus != PutMessageStatus.PUT_OK) {
-            return new PutMessageResult(checkStoreStatus, null);
-        }
-
-        PutMessageStatus msgCheckStatus = this.checkMessages(messageExtBatch);
-        if (msgCheckStatus == PutMessageStatus.MESSAGE_ILLEGAL) {
-            return new PutMessageResult(msgCheckStatus, null);
-        }
-
-        long beginTime = this.getSystemClock().now();
-        PutMessageResult result = this.commitLog.putMessages(messageExtBatch);
-        long elapsedTime = this.getSystemClock().now() - beginTime;
-        if (elapsedTime > 500) {
-            log.warn("not in lock elapsed time(ms)={}, bodyLength={}", elapsedTime, messageExtBatch.getBody().length);
-        }
-
-        this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime);
-
-        if (null == result || !result.isOk()) {
-            this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();
+        try {
+            return asyncPutMessages(messageExtBatch).get();
+        } catch (InterruptedException | ExecutionException e) {
+            return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, null);
         }
-
-        return result;
     }
 
     @Override
diff --git a/store/src/main/java/org/apache/rocketmq/store/MappedFile.java b/store/src/main/java/org/apache/rocketmq/store/MappedFile.java
index 25f0e39..297271d 100644
--- a/store/src/main/java/org/apache/rocketmq/store/MappedFile.java
+++ b/store/src/main/java/org/apache/rocketmq/store/MappedFile.java
@@ -37,6 +37,7 @@ import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageExtBatch;
+import org.apache.rocketmq.store.CommitLog.PutMessageContext;
 import org.apache.rocketmq.store.config.FlushDiskType;
 import org.apache.rocketmq.store.util.LibC;
 import sun.nio.ch.DirectBuffer;
@@ -188,15 +189,18 @@ public class MappedFile extends ReferenceResource {
         return fileChannel;
     }
 
-    public AppendMessageResult appendMessage(final MessageExtBrokerInner msg, final AppendMessageCallback cb) {
-        return appendMessagesInner(msg, cb);
+    public AppendMessageResult appendMessage(final MessageExtBrokerInner msg, final AppendMessageCallback cb,
+            PutMessageContext putMessageContext) {
+        return appendMessagesInner(msg, cb, putMessageContext);
     }
 
-    public AppendMessageResult appendMessages(final MessageExtBatch messageExtBatch, final AppendMessageCallback cb) {
-        return appendMessagesInner(messageExtBatch, cb);
+    public AppendMessageResult appendMessages(final MessageExtBatch messageExtBatch, final AppendMessageCallback cb,
+            PutMessageContext putMessageContext) {
+        return appendMessagesInner(messageExtBatch, cb, putMessageContext);
     }
 
-    public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb) {
+    public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb,
+            PutMessageContext putMessageContext) {
         assert messageExt != null;
         assert cb != null;
 
@@ -207,9 +211,11 @@ public class MappedFile extends ReferenceResource {
             byteBuffer.position(currentPos);
             AppendMessageResult result;
             if (messageExt instanceof MessageExtBrokerInner) {
-                result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBrokerInner) messageExt);
+                result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos,
+                        (MessageExtBrokerInner) messageExt, putMessageContext);
             } else if (messageExt instanceof MessageExtBatch) {
-                result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBatch) messageExt);
+                result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos,
+                        (MessageExtBatch) messageExt, putMessageContext);
             } else {
                 return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
             }
diff --git a/store/src/main/java/org/apache/rocketmq/store/MessageExtBrokerInner.java b/store/src/main/java/org/apache/rocketmq/store/MessageExtBrokerInner.java
index e5f087b..df7e6e5 100644
--- a/store/src/main/java/org/apache/rocketmq/store/MessageExtBrokerInner.java
+++ b/store/src/main/java/org/apache/rocketmq/store/MessageExtBrokerInner.java
@@ -16,6 +16,8 @@
  */
 package org.apache.rocketmq.store;
 
+import java.nio.ByteBuffer;
+
 import org.apache.rocketmq.common.TopicFilterType;
 import org.apache.rocketmq.common.message.MessageExt;
 
@@ -24,6 +26,16 @@ public class MessageExtBrokerInner extends MessageExt {
     private String propertiesString;
     private long tagsCode;
 
+    private ByteBuffer encodedBuff;
+
+    public ByteBuffer getEncodedBuff() {
+        return encodedBuff;
+    }
+
+    public void setEncodedBuff(ByteBuffer encodedBuff) {
+        this.encodedBuff = encodedBuff;
+    }
+
     public static long tagsString2tagsCode(final TopicFilterType filter, final String tags) {
         if (null == tags || tags.length() == 0) { return 0; }
 
diff --git a/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java b/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java
index ea791bd..011cbe1 100644
--- a/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java
+++ b/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java
@@ -37,7 +37,6 @@ import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
 import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.message.MessageAccessor;
 import org.apache.rocketmq.common.message.MessageConst;
@@ -414,237 +413,6 @@ public class DLedgerCommitLog extends CommitLog {
     }
 
     @Override
-    public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
-
-        StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
-        final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
-        String topic = msg.getTopic();
-        setMessageInfo(msg,tranType);
-
-        // Back to Results
-        AppendMessageResult appendResult;
-        AppendFuture<AppendEntryResponse> dledgerFuture;
-        EncodeResult encodeResult;
-
-        encodeResult = this.messageSerializer.serialize(msg);
-        if (encodeResult.status != AppendMessageStatus.PUT_OK) {
-            return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, new AppendMessageResult(encodeResult.status));
-        }
-
-        putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
-        long elapsedTimeInLock;
-        long queueOffset;
-        try {
-            beginTimeInDledgerLock = this.defaultMessageStore.getSystemClock().now();
-            queueOffset = getQueueOffsetByKey(encodeResult.queueOffsetKey, tranType);
-            encodeResult.setQueueOffsetKey(queueOffset, false);
-            AppendEntryRequest request = new AppendEntryRequest();
-            request.setGroup(dLedgerConfig.getGroup());
-            request.setRemoteId(dLedgerServer.getMemberState().getSelfId());
-            request.setBody(encodeResult.getData());
-            dledgerFuture = (AppendFuture<AppendEntryResponse>) dLedgerServer.handleAppend(request);
-            if (dledgerFuture.getPos() == -1) {
-                return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR));
-            }
-            long wroteOffset = dledgerFuture.getPos() + DLedgerEntry.BODY_OFFSET;
-
-            int msgIdLength = (msg.getSysFlag() & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 + 8 : 16 + 4 + 8;
-            ByteBuffer buffer = ByteBuffer.allocate(msgIdLength);
-
-            String msgId = MessageDecoder.createMessageId(buffer, msg.getStoreHostBytes(), wroteOffset);
-            elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginTimeInDledgerLock;
-            appendResult = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, encodeResult.getData().length, msgId, System.currentTimeMillis(), queueOffset, elapsedTimeInLock);
-            switch (tranType) {
-                case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
-                case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
-                    break;
-                case MessageSysFlag.TRANSACTION_NOT_TYPE:
-                case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
-                    // The next update ConsumeQueue information
-                    DLedgerCommitLog.this.topicQueueTable.put(encodeResult.queueOffsetKey, queueOffset + 1);
-                    break;
-                default:
-                    break;
-            }
-        } catch (Exception e) {
-            log.error("Put message error", e);
-            return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR));
-        } finally {
-            beginTimeInDledgerLock = 0;
-            putMessageLock.unlock();
-        }
-
-        if (elapsedTimeInLock > 500) {
-            log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", elapsedTimeInLock, msg.getBody().length, appendResult);
-        }
-
-        PutMessageStatus putMessageStatus = PutMessageStatus.UNKNOWN_ERROR;
-        try {
-            AppendEntryResponse appendEntryResponse = dledgerFuture.get(3, TimeUnit.SECONDS);
-            switch (DLedgerResponseCode.valueOf(appendEntryResponse.getCode())) {
-                case SUCCESS:
-                    putMessageStatus = PutMessageStatus.PUT_OK;
-                    break;
-                case INCONSISTENT_LEADER:
-                case NOT_LEADER:
-                case LEADER_NOT_READY:
-                case DISK_FULL:
-                    putMessageStatus = PutMessageStatus.SERVICE_NOT_AVAILABLE;
-                    break;
-                case WAIT_QUORUM_ACK_TIMEOUT:
-                    //Do not return flush_slave_timeout to the client, for the ons client will ignore it.
-                    putMessageStatus = PutMessageStatus.OS_PAGECACHE_BUSY;
-                    break;
-                case LEADER_PENDING_FULL:
-                    putMessageStatus = PutMessageStatus.OS_PAGECACHE_BUSY;
-                    break;
-            }
-        } catch (Throwable t) {
-            log.error("Failed to get dledger append result", t);
-        }
-
-        PutMessageResult putMessageResult = new PutMessageResult(putMessageStatus, appendResult);
-        if (putMessageStatus == PutMessageStatus.PUT_OK) {
-            // Statistics
-            storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet();
-            storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(appendResult.getWroteBytes());
-        }
-        return putMessageResult;
-    }
-
-    @Override
-    public PutMessageResult putMessages(final MessageExtBatch messageExtBatch) {
-        final int tranType = MessageSysFlag.getTransactionValue(messageExtBatch.getSysFlag());
-
-        if (tranType != MessageSysFlag.TRANSACTION_NOT_TYPE) {
-            return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
-        }
-        if (messageExtBatch.getDelayTimeLevel() > 0) {
-            return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
-        }
-
-        // Set the storage time
-        messageExtBatch.setStoreTimestamp(System.currentTimeMillis());
-
-        StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
-
-        InetSocketAddress bornSocketAddress = (InetSocketAddress) messageExtBatch.getBornHost();
-        if (bornSocketAddress.getAddress() instanceof Inet6Address) {
-            messageExtBatch.setBornHostV6Flag();
-        }
-
-        InetSocketAddress storeSocketAddress = (InetSocketAddress) messageExtBatch.getStoreHost();
-        if (storeSocketAddress.getAddress() instanceof Inet6Address) {
-            messageExtBatch.setStoreHostAddressV6Flag();
-        }
-
-        // Back to Results
-        AppendMessageResult appendResult;
-        BatchAppendFuture<AppendEntryResponse> dledgerFuture;
-        EncodeResult encodeResult;
-
-        encodeResult = this.messageSerializer.serialize(messageExtBatch);
-        if (encodeResult.status != AppendMessageStatus.PUT_OK) {
-            return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, new AppendMessageResult(encodeResult
-                    .status));
-        }
-
-        putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
-        msgIdBuilder.setLength(0);
-        long elapsedTimeInLock;
-        long queueOffset;
-        int msgNum = 0;
-        try {
-            beginTimeInDledgerLock = this.defaultMessageStore.getSystemClock().now();
-            queueOffset = getQueueOffsetByKey(encodeResult.queueOffsetKey, tranType);
-            encodeResult.setQueueOffsetKey(queueOffset, true);
-            BatchAppendEntryRequest request = new BatchAppendEntryRequest();
-            request.setGroup(dLedgerConfig.getGroup());
-            request.setRemoteId(dLedgerServer.getMemberState().getSelfId());
-            request.setBatchMsgs(encodeResult.batchData);
-            AppendFuture<AppendEntryResponse> appendFuture = (AppendFuture<AppendEntryResponse>) dLedgerServer.handleAppend(request);
-            if (appendFuture.getPos() == -1) {
-                log.warn("HandleAppend return false due to error code {}", appendFuture.get().getCode());
-                return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR));
-            }
-            dledgerFuture = (BatchAppendFuture<AppendEntryResponse>) appendFuture;
-
-            long wroteOffset = 0;
-
-            int msgIdLength = (messageExtBatch.getSysFlag() & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 + 8 : 16 + 4 + 8;
-            ByteBuffer buffer = ByteBuffer.allocate(msgIdLength);
-
-            boolean isFirstOffset = true;
-            long firstWroteOffset = 0;
-            for (long pos : dledgerFuture.getPositions()) {
-                wroteOffset = pos + DLedgerEntry.BODY_OFFSET;
-                if (isFirstOffset) {
-                    firstWroteOffset = wroteOffset;
-                    isFirstOffset = false;
-                }
-                String msgId = MessageDecoder.createMessageId(buffer, messageExtBatch.getStoreHostBytes(), wroteOffset);
-                if (msgIdBuilder.length() > 0) {
-                    msgIdBuilder.append(',').append(msgId);
-                } else {
-                    msgIdBuilder.append(msgId);
-                }
-                msgNum++;
-            }
-
-            elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginTimeInDledgerLock;
-            appendResult = new AppendMessageResult(AppendMessageStatus.PUT_OK, firstWroteOffset, encodeResult.totalMsgLen,
-                    msgIdBuilder.toString(), System.currentTimeMillis(), queueOffset, elapsedTimeInLock);
-            appendResult.setMsgNum(msgNum);
-            DLedgerCommitLog.this.topicQueueTable.put(encodeResult.queueOffsetKey, queueOffset + msgNum);
-        } catch (Exception e) {
-            log.error("Put message error", e);
-            return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, new AppendMessageResult(AppendMessageStatus
-                    .UNKNOWN_ERROR));
-        } finally {
-            beginTimeInDledgerLock = 0;
-            putMessageLock.unlock();
-        }
-
-        if (elapsedTimeInLock > 500) {
-            log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}",
-                    elapsedTimeInLock, messageExtBatch.getBody().length, appendResult);
-        }
-
-        PutMessageStatus putMessageStatus = PutMessageStatus.UNKNOWN_ERROR;
-        try {
-            AppendEntryResponse appendEntryResponse = dledgerFuture.get(3, TimeUnit.SECONDS);
-            switch (DLedgerResponseCode.valueOf(appendEntryResponse.getCode())) {
-                case SUCCESS:
-                    putMessageStatus = PutMessageStatus.PUT_OK;
-                    break;
-                case INCONSISTENT_LEADER:
-                case NOT_LEADER:
-                case LEADER_NOT_READY:
-                case DISK_FULL:
-                    putMessageStatus = PutMessageStatus.SERVICE_NOT_AVAILABLE;
-                    break;
-                case WAIT_QUORUM_ACK_TIMEOUT:
-                    //Do not return flush_slave_timeout to the client, for the ons client will ignore it.
-                    putMessageStatus = PutMessageStatus.OS_PAGECACHE_BUSY;
-                    break;
-                case LEADER_PENDING_FULL:
-                    putMessageStatus = PutMessageStatus.OS_PAGECACHE_BUSY;
-                    break;
-            }
-        } catch (Throwable t) {
-            log.error("Failed to get dledger append result", t);
-        }
-
-        PutMessageResult putMessageResult = new PutMessageResult(putMessageStatus, appendResult);
-        if (putMessageStatus == PutMessageStatus.PUT_OK) {
-            // Statistics
-            storeStatsService.getSinglePutMessageTopicTimesTotal(messageExtBatch.getTopic()).addAndGet(msgNum);
-            storeStatsService.getSinglePutMessageTopicSizeTotal(messageExtBatch.getTopic()).addAndGet(encodeResult.totalMsgLen);
-        }
-        return putMessageResult;
-    }
-
-    @Override
     public CompletableFuture<PutMessageResult> asyncPutMessage(MessageExtBrokerInner msg) {
 
         StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
diff --git a/store/src/test/java/org/apache/rocketmq/store/AppendCallbackTest.java b/store/src/test/java/org/apache/rocketmq/store/AppendCallbackTest.java
index f46b3be..715c9d3 100644
--- a/store/src/test/java/org/apache/rocketmq/store/AppendCallbackTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/AppendCallbackTest.java
@@ -30,6 +30,8 @@ import org.apache.rocketmq.common.message.Message;
 import org.apache.rocketmq.common.message.MessageDecoder;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageExtBatch;
+import org.apache.rocketmq.store.CommitLog.MessageExtEncoder;
+import org.apache.rocketmq.store.CommitLog.PutMessageContext;
 import org.apache.rocketmq.store.config.MessageStoreConfig;
 import org.junit.After;
 import org.junit.Before;
@@ -42,7 +44,7 @@ public class AppendCallbackTest {
 
     AppendMessageCallback callback;
 
-    CommitLog.MessageExtBatchEncoder batchEncoder = new CommitLog.MessageExtBatchEncoder(10 * 1024 * 1024);
+    MessageExtEncoder batchEncoder = new MessageExtEncoder(10 * 1024 * 1024);
 
     @Before
     public void init() throws Exception {
@@ -84,10 +86,12 @@ public class AppendCallbackTest {
         messageExtBatch.setStoreHost(new InetSocketAddress("127.0.0.1", 124));
         messageExtBatch.setBody(MessageDecoder.encodeMessages(messages));
 
-        messageExtBatch.setEncodedBuff(batchEncoder.encode(messageExtBatch));
+        PutMessageContext putMessageContext = new PutMessageContext(topic + "-" + queue);
+        messageExtBatch.setEncodedBuff(batchEncoder.encode(messageExtBatch, putMessageContext));
         ByteBuffer buff = ByteBuffer.allocate(1024 * 10);
         //encounter end of file when append half of the data
-        AppendMessageResult result = callback.doAppend(0, buff, 1000, messageExtBatch);
+        AppendMessageResult result =
+                callback.doAppend(0, buff, 1000, messageExtBatch, putMessageContext);
         assertEquals(AppendMessageStatus.END_OF_FILE, result.getStatus());
         assertEquals(0, result.getWroteOffset());
         assertEquals(0, result.getLogicsOffset());
@@ -121,10 +125,12 @@ public class AppendCallbackTest {
         messageExtBatch.setStoreHost(new InetSocketAddress("::1", 124));
         messageExtBatch.setBody(MessageDecoder.encodeMessages(messages));
 
-        messageExtBatch.setEncodedBuff(batchEncoder.encode(messageExtBatch));
+        PutMessageContext putMessageContext = new PutMessageContext(topic + "-" + queue);
+        messageExtBatch.setEncodedBuff(batchEncoder.encode(messageExtBatch, putMessageContext));
         ByteBuffer buff = ByteBuffer.allocate(1024 * 10);
         //encounter end of file when append half of the data
-        AppendMessageResult result = callback.doAppend(0, buff, 1000, messageExtBatch);
+        AppendMessageResult result =
+                callback.doAppend(0, buff, 1000, messageExtBatch, putMessageContext);
         assertEquals(AppendMessageStatus.END_OF_FILE, result.getStatus());
         assertEquals(0, result.getWroteOffset());
         assertEquals(0, result.getLogicsOffset());
@@ -154,9 +160,11 @@ public class AppendCallbackTest {
         messageExtBatch.setStoreHost(new InetSocketAddress("127.0.0.1", 124));
         messageExtBatch.setBody(MessageDecoder.encodeMessages(messages));
 
-        messageExtBatch.setEncodedBuff(batchEncoder.encode(messageExtBatch));
+        PutMessageContext putMessageContext = new PutMessageContext(topic + "-" + queue);
+        messageExtBatch.setEncodedBuff(batchEncoder.encode(messageExtBatch, putMessageContext));
         ByteBuffer buff = ByteBuffer.allocate(1024 * 10);
-        AppendMessageResult allresult = callback.doAppend(0, buff, 1024 * 10, messageExtBatch);
+        AppendMessageResult allresult =
+                callback.doAppend(0, buff, 1024 * 10, messageExtBatch, putMessageContext);
 
         assertEquals(AppendMessageStatus.PUT_OK, allresult.getStatus());
         assertEquals(0, allresult.getWroteOffset());
@@ -214,9 +222,11 @@ public class AppendCallbackTest {
         messageExtBatch.setStoreHost(new InetSocketAddress("::1", 124));
         messageExtBatch.setBody(MessageDecoder.encodeMessages(messages));
 
-        messageExtBatch.setEncodedBuff(batchEncoder.encode(messageExtBatch));
+        PutMessageContext putMessageContext = new PutMessageContext(topic + "-" + queue);
+        messageExtBatch.setEncodedBuff(batchEncoder.encode(messageExtBatch, putMessageContext));
         ByteBuffer buff = ByteBuffer.allocate(1024 * 10);
-        AppendMessageResult allresult = callback.doAppend(0, buff, 1024 * 10, messageExtBatch);
+        AppendMessageResult allresult =
+                callback.doAppend(0, buff, 1024 * 10, messageExtBatch, putMessageContext);
 
         assertEquals(AppendMessageStatus.PUT_OK, allresult.getStatus());
         assertEquals(0, allresult.getWroteOffset());

Mime
View raw message