rocketmq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From huzongt...@apache.org
Subject [rocketmq] branch develop updated: Eliminate array copy (#2886)
Date Tue, 06 Jul 2021 13:47:18 GMT
This is an automated email from the ASF dual-hosted git repository.

huzongtang pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 3183122  Eliminate array copy (#2886)
3183122 is described below

commit 3183122c01cc2d0dc005634d028ccbcb62495bf6
Author: huangli <areyouok@gmail.com>
AuthorDate: Tue Jul 6 21:47:12 2021 +0800

    Eliminate array copy (#2886)
    
    [Part C] Improve produce performance in M/S mode.
---
 .../rocketmq/broker/plugin/AbstractPluginMessageStore.java     |  4 ++--
 store/src/main/java/org/apache/rocketmq/store/CommitLog.java   |  4 ++--
 .../java/org/apache/rocketmq/store/DefaultMessageStore.java    |  4 ++--
 .../src/main/java/org/apache/rocketmq/store/MessageStore.java  |  4 +++-
 .../org/apache/rocketmq/store/dledger/DLedgerCommitLog.java    |  2 +-
 .../src/main/java/org/apache/rocketmq/store/ha/HAService.java  | 10 ++++------
 6 files changed, 14 insertions(+), 14 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java b/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java
index 1db019b..b95bab6 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java
@@ -180,8 +180,8 @@ public abstract class AbstractPluginMessageStore implements MessageStore {
     }
 
     @Override
-    public boolean appendToCommitLog(long startOffset, byte[] data) {
-        return next.appendToCommitLog(startOffset, data);
+    public boolean appendToCommitLog(long startOffset, byte[] data, int dataStart, int dataLength) {
+        return next.appendToCommitLog(startOffset, data, dataStart, dataLength);
     }
 
     @Override
diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
index 43b01f0..57fa363 100644
--- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
+++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
@@ -1185,7 +1185,7 @@ public class CommitLog {
         this.mappedFileQueue.destroy();
     }
 
-    public boolean appendData(long startOffset, byte[] data) {
+    public boolean appendData(long startOffset, byte[] data, int dataStart, int dataLength) {
         putMessageLock.lock();
         try {
             MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(startOffset);
@@ -1194,7 +1194,7 @@ public class CommitLog {
                 return false;
             }
 
-            return mappedFile.appendMessage(data);
+            return mappedFile.appendMessage(data, dataStart, dataLength);
         } finally {
             putMessageLock.unlock();
         }
diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index b8ecdee..7dd5a32 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -921,13 +921,13 @@ public class DefaultMessageStore implements MessageStore {
     }
 
     @Override
-    public boolean appendToCommitLog(long startOffset, byte[] data) {
+    public boolean appendToCommitLog(long startOffset, byte[] data, int dataStart, int dataLength) {
         if (this.shutdown) {
             log.warn("message store has shutdown, so appendToPhyQueue is forbidden");
             return false;
         }
 
-        boolean result = this.commitLog.appendData(startOffset, data);
+        boolean result = this.commitLog.appendData(startOffset, data, dataStart, dataLength);
         if (result) {
             this.reputMessageService.wakeup();
         } else {
diff --git a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
index 64eb525..a8c658b 100644
--- a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
@@ -245,9 +245,11 @@ public interface MessageStore {
      *
      * @param startOffset starting offset.
      * @param data data to append.
+     * @param dataStart the start index of data array
+     * @param dataLength the length of data array
      * @return true if success; false otherwise.
      */
-    boolean appendToCommitLog(final long startOffset, final byte[] data);
+    boolean appendToCommitLog(final long startOffset, final byte[] data, int dataStart, int dataLength);
 
     /**
      * Execute file deletion manually.
diff --git a/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java b/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java
index 3b98760..ea791bd 100644
--- a/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java
+++ b/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java
@@ -905,7 +905,7 @@ public class DLedgerCommitLog extends CommitLog {
     }
 
     @Override
-    public boolean appendData(long startOffset, byte[] data) {
+    public boolean appendData(long startOffset, byte[] data, int dataStart, int dataLength) {
         //the old ha service will invoke method, here to prevent it
         return false;
     }
diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java b/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java
index d4d4109..845935b 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java
@@ -440,7 +440,6 @@ public class HAService {
 
         private boolean dispatchReadRequest() {
             final int msgHeaderSize = 8 + 4; // phyoffset + size
-            int readSocketPos = this.byteBufferRead.position();
 
             while (true) {
                 int diff = this.byteBufferRead.position() - this.dispatchPosition;
@@ -459,13 +458,12 @@ public class HAService {
                     }
 
                     if (diff >= (msgHeaderSize + bodySize)) {
-                        byte[] bodyData = new byte[bodySize];
-                        this.byteBufferRead.position(this.dispatchPosition + msgHeaderSize);
-                        this.byteBufferRead.get(bodyData);
+                        byte[] bodyData = byteBufferRead.array();
+                        int dataStart = this.dispatchPosition + msgHeaderSize;
 
-                        HAService.this.defaultMessageStore.appendToCommitLog(masterPhyOffset, bodyData);
+                        HAService.this.defaultMessageStore.appendToCommitLog(
+                                masterPhyOffset, bodyData, dataStart, bodySize);
 
-                        this.byteBufferRead.position(readSocketPos);
                         this.dispatchPosition += msgHeaderSize + bodySize;
 
                         if (!reportSlaveMaxOffsetPlus()) {

Mime
View raw message