rocketmq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From yu...@apache.org
Subject [12/43] incubator-rocketmq git commit: Finish code dump. Reviewed by: @yukon @vongosling @stevenschew @vintagewang @lollipop @zander
Date Mon, 19 Dec 2016 09:40:29 GMT
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-store/pom.xml
----------------------------------------------------------------------
diff --git a/rocketmq-store/pom.xml b/rocketmq-store/pom.xml
new file mode 100644
index 0000000..440e521
--- /dev/null
+++ b/rocketmq-store/pom.xml
@@ -0,0 +1,46 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <groupId>com.alibaba.rocketmq</groupId>
+        <artifactId>rocketmq-all</artifactId>
+        <version>4.0.0-SNAPSHOT</version>
+    </parent>
+
+    <modelVersion>4.0.0</modelVersion>
+    <packaging>jar</packaging>
+    <artifactId>rocketmq-store</artifactId>
+    <name>rocketmq-store ${project.version}</name>
+
+    <dependencies>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>rocketmq-common</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>net.java.dev.jna</groupId>
+            <artifactId>jna</artifactId>
+        </dependency>
+    </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/AllocateMappedFileService.java
----------------------------------------------------------------------
diff --git a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/AllocateMappedFileService.java b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/AllocateMappedFileService.java
new file mode 100644
index 0000000..40eee7a
--- /dev/null
+++ b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/AllocateMappedFileService.java
@@ -0,0 +1,342 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.store;
+
+import com.alibaba.rocketmq.common.ServiceThread;
+import com.alibaba.rocketmq.common.UtilAll;
+import com.alibaba.rocketmq.common.constant.LoggerName;
+import com.alibaba.rocketmq.store.config.BrokerRole;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ServiceLoader;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+
+/**
+ * Create MappedFile in advance
+ *
+ * @author shijia.wxr
+ */
+public class AllocateMappedFileService extends ServiceThread {
+    private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
+    private static int waitTimeOut = 1000 * 5; // milliseconds a caller waits for its mapped file to be created
+    private ConcurrentHashMap<String, AllocateRequest> requestTable =
+            new ConcurrentHashMap<String, AllocateRequest>(); // outstanding requests, keyed by file path
+    private PriorityBlockingQueue<AllocateRequest> requestQueue =
+            new PriorityBlockingQueue<AllocateRequest>(); // work queue drained by mmapOperation()
+    private volatile boolean hasException = false; // set when mmapOperation() catches an exception, cleared on success
+    private DefaultMessageStore messageStore; // source of the store configuration and the transient store pool
+
+    /**
+     * Creates the service for the given message store.
+     *
+     * @param messageStore store whose configuration and transient store pool this service reads
+     */
+    public AllocateMappedFileService(DefaultMessageStore messageStore) {
+        this.messageStore = messageStore;
+    }
+
+    /**
+     * Queues creation of {@code nextFilePath} (and, as read-ahead, {@code nextNextFilePath}) and waits for the
+     * first of the two to become available.
+     * <p>
+     * A path that is already present in the request table is not queued again. The call then blocks for up to
+     * {@code waitTimeOut} milliseconds on the latch of the request registered for {@code nextFilePath}.
+     *
+     * @param nextFilePath path of the file the caller needs now
+     * @param nextNextFilePath path of the file to pre-allocate after it; this request is never waited on here
+     * @param fileSize size passed to both allocation requests
+     * @return the mapped file for {@code nextFilePath}, or {@code null} when the request was rejected for lack
+     * of store-pool buffers, the service is flagged as failed, the wait timed out or was interrupted, or no
+     * request is registered for the path
+     */
+    public MappedFile putRequestAndReturnMappedFile(String nextFilePath, String nextNextFilePath, int fileSize) {
+        int canSubmitRequests = 2; // default budget: the needed file plus one read-ahead file
+        if (this.messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
+            if (this.messageStore.getMessageStoreConfig().isFastFailIfNoBufferInStorePool()
+                    && BrokerRole.SLAVE != this.messageStore.getMessageStoreConfig().getBrokerRole()) { //if broker is slave, don't fast fail even no buffer in pool
+                canSubmitRequests = this.messageStore.getTransientStorePool().remainBufferNumbs() - this.requestQueue.size();
+            }
+        }
+        // Register the request for the file that is needed now; without budget the call fails.
+        AllocateRequest nextReq = new AllocateRequest(nextFilePath, fileSize);
+        boolean nextPutOK = this.requestTable.putIfAbsent(nextFilePath, nextReq) == null;
+
+        if (nextPutOK) {
+            if (canSubmitRequests <= 0) {
+                log.warn("[NOTIFYME]TransientStorePool is not enough, so create mapped file error, " +
+                        "RequestQueueSize : {}, StorePoolSize: {}", this.requestQueue.size(), this.messageStore.getTransientStorePool().remainBufferNumbs());
+                this.requestTable.remove(nextFilePath); // undo the registration made above
+                return null;
+            }
+            boolean offerOK = this.requestQueue.offer(nextReq);
+            if (!offerOK) {
+                log.warn("never expected here, add a request to preallocate queue failed");
+            }
+            canSubmitRequests--;
+        }
+        // Register the read-ahead request; without budget it is only skipped, the call goes on.
+        AllocateRequest nextNextReq = new AllocateRequest(nextNextFilePath, fileSize);
+        boolean nextNextPutOK = this.requestTable.putIfAbsent(nextNextFilePath, nextNextReq) == null;
+        if (nextNextPutOK) {
+            if (canSubmitRequests <= 0) {
+                log.warn("[NOTIFYME]TransientStorePool is not enough, so skip preallocate mapped file, " +
+                        "RequestQueueSize : {}, StorePoolSize: {}", this.requestQueue.size(), this.messageStore.getTransientStorePool().remainBufferNumbs());
+                this.requestTable.remove(nextNextFilePath);
+            } else {
+                boolean offerOK = this.requestQueue.offer(nextNextReq);
+                if (!offerOK) {
+                    log.warn("never expected here, add a request to preallocate queue failed");
+                }
+            }
+        }
+
+        if (hasException) { // the allocation thread reported a failure on its last attempt
+            log.warn(this.getServiceName() + " service has exception. so return null");
+            return null;
+        }
+
+        AllocateRequest result = this.requestTable.get(nextFilePath);
+        try {
+            if (result != null) {
+                boolean waitOK = result.getCountDownLatch().await(waitTimeOut, TimeUnit.MILLISECONDS);
+                if (!waitOK) {
+                    log.warn("create mmap timeout " + result.getFilePath() + " " + result.getFileSize());
+                    return null; // the request is left in requestTable on this path
+                } else {
+                    this.requestTable.remove(nextFilePath); // handed over, forget the request
+                    return result.getMappedFile();
+                }
+            } else {
+                log.error("find preallocate mmap failed, this never happen");
+            }
+        } catch (InterruptedException e) {
+            log.warn(this.getServiceName() + " service has exception. ", e);
+        }
+
+        return null;
+    }
+
+
+    /**
+     * @return the simple class name of this service
+     */
+    @Override
+    public String getServiceName() {
+        final Class<AllocateMappedFileService> serviceClass = AllocateMappedFileService.class;
+        return serviceClass.getSimpleName();
+    }
+
+
+    /**
+     * Stops the service thread and destroys the mapped files of all requests still held in the request table.
+     * <p>
+     * Marks the service as stopped, interrupts the thread and waits for it for at most {@code getJointime()}.
+     * The clean-up of the request table runs even when that wait is interrupted.
+     */
+    public void shutdown() {
+        this.stopped = true;
+        this.thread.interrupt();
+
+        try {
+            this.thread.join(this.getJointime());
+        } catch (InterruptedException e) {
+            // Report through the store logger rather than stderr so the event shows up in the store log.
+            log.warn(this.getServiceName() + " interrupted while waiting for the service thread to exit", e);
+        }
+
+        for (AllocateRequest req : this.requestTable.values()) {
+            if (req.mappedFile != null) {
+                log.info("delete pre allocated maped file, {}", req.mappedFile.getFileName());
+                req.mappedFile.destroy(1000);
+            }
+        }
+    }
+
+
+    public void run() {
+        log.info(this.getServiceName() + " service started");
+
+        while (!this.isStopped() && this.mmapOperation()) {
+
+        }
+        log.info(this.getServiceName() + " service end");
+    }
+
+
+    /**
+     * Takes one request from the queue and creates the mapped file it asks for.
+     * <p>
+     * A request that is no longer registered in the request table, or that is registered under a different
+     * instance, is skipped. Otherwise the mapped file is created, optionally warmed, stored on the request, and
+     * the request's latch is counted down so the waiting caller can continue. An {@code IOException} puts the
+     * request back on the queue and flags the service as failed.
+     *
+     * @return {@code false} only when an {@code InterruptedException} reaches this method (for example while
+     * blocked on the queue); {@code true} in every other case, including failures that were logged
+     */
+    private boolean mmapOperation() {
+        boolean isSuccess = false;
+        AllocateRequest req = null;
+        try {
+            req = this.requestQueue.take();
+            AllocateRequest expectedRequest = this.requestTable.get(req.getFilePath());
+            if (null == expectedRequest) {
+                log.warn("this mmap request expired, maybe cause timeout " + req.getFilePath() + " "
+                        + req.getFileSize());
+                return true;
+            }
+            if (expectedRequest != req) { // identity check: the table holds another instance for this path
+                log.warn("never expected here,  maybe cause timeout " + req.getFilePath() + " "
+                        + req.getFileSize() + ", req:" + req + ", expectedRequest:" + expectedRequest);
+                return true;
+            }
+
+            if (req.getMappedFile() == null) {
+                long beginTime = System.currentTimeMillis();
+
+                MappedFile mappedFile;
+                if (messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
+                    try {
+                        mappedFile = ServiceLoader.load(MappedFile.class).iterator().next(); // first registered provider, if any
+                        mappedFile.init(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
+                    } catch (RuntimeException e) { // any runtime failure above falls back to the built-in class
+                        log.warn("Use default implementation.");
+                        mappedFile = new MappedFile(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
+                    }
+                } else {
+                    mappedFile = new MappedFile(req.getFilePath(), req.getFileSize());
+                }
+
+                long eclipseTime = UtilAll.computeEclipseTimeMilliseconds(beginTime);
+                if (eclipseTime > 10) { // report creations slower than 10 ms
+                    int queueSize = this.requestQueue.size();
+                    log.warn("create mappedFile spent time(ms) " + eclipseTime + " queue size " + queueSize
+                            + " " + req.getFilePath() + " " + req.getFileSize());
+                }
+
+                // pre write mappedFile: only for files at least as large as a commit log file, and only when enabled
+                if (mappedFile.getFileSize() >= this.messageStore.getMessageStoreConfig()
+                        .getMapedFileSizeCommitLog()
+                        &&
+                        this.messageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
+                    mappedFile.warmMappedFile(this.messageStore.getMessageStoreConfig().getFlushDiskType(),
+                            this.messageStore.getMessageStoreConfig().getFlushLeastPagesWhenWarmMapedFile());
+                }
+
+                req.setMappedFile(mappedFile);
+                this.hasException = false;
+                isSuccess = true;
+            }
+        } catch (InterruptedException e) {
+            log.warn(this.getServiceName() + " service has exception, maybe by shutdown");
+            this.hasException = true;
+            return false;
+        } catch (IOException e) {
+            log.warn(this.getServiceName() + " service has exception. ", e);
+            this.hasException = true;
+            if (null != req) {
+                requestQueue.offer(req); // put the request back so a later iteration retries it
+                try {
+                    Thread.sleep(1);
+                } catch (InterruptedException e1) { // NOTE(review): an interrupt during this pause is dropped silently
+                }
+            }
+        } finally {
+            if (req != null && isSuccess) // the latch is released only after a successful creation
+                req.getCountDownLatch().countDown();
+        }
+        return true;
+    }
+
+    /**
+     * One pending request to create a mapped file.
+     * <p>
+     * Requests order by file size (larger first) and, for equal sizes, by the number that forms the last path
+     * segment (smaller first). Equality and hash code are based on file path and file size.
+     */
+    static class AllocateRequest implements Comparable<AllocateRequest> {
+        // Full file path
+        private String filePath;
+        private int fileSize;
+        // Released once the mapped file has been created
+        private CountDownLatch countDownLatch = new CountDownLatch(1);
+        private volatile MappedFile mappedFile = null;
+
+
+        public AllocateRequest(String filePath, int fileSize) {
+            this.filePath = filePath;
+            this.fileSize = fileSize;
+        }
+
+
+        /**
+         * Parses the text after the last file separator of {@code path} as a long.
+         */
+        private static long trailingNumber(String path) {
+            final int separatorAt = path.lastIndexOf(File.separator);
+            return Long.parseLong(path.substring(separatorAt + 1));
+        }
+
+
+        public String getFilePath() {
+            return this.filePath;
+        }
+
+
+        public void setFilePath(String filePath) {
+            this.filePath = filePath;
+        }
+
+
+        public int getFileSize() {
+            return this.fileSize;
+        }
+
+
+        public void setFileSize(int fileSize) {
+            this.fileSize = fileSize;
+        }
+
+
+        public CountDownLatch getCountDownLatch() {
+            return this.countDownLatch;
+        }
+
+
+        public void setCountDownLatch(CountDownLatch countDownLatch) {
+            this.countDownLatch = countDownLatch;
+        }
+
+
+        public MappedFile getMappedFile() {
+            return this.mappedFile;
+        }
+
+
+        public void setMappedFile(MappedFile mappedFile) {
+            this.mappedFile = mappedFile;
+        }
+
+
+        public int compareTo(AllocateRequest other) {
+            // Different sizes: the larger file sorts first.
+            if (this.fileSize != other.fileSize) {
+                return this.fileSize < other.fileSize ? 1 : -1;
+            }
+
+            // Same size: the smaller file number sorts first.
+            final long ownNumber = trailingNumber(this.filePath);
+            final long otherNumber = trailingNumber(other.filePath);
+            if (ownNumber == otherNumber) {
+                return 0;
+            }
+            return ownNumber < otherNumber ? -1 : 1;
+        }
+
+
+        @Override
+        public int hashCode() {
+            final int pathHash = (filePath == null) ? 0 : filePath.hashCode();
+            return 31 * (31 + pathHash) + fileSize;
+        }
+
+
+        @Override
+        public boolean equals(Object obj) {
+            if (this == obj) {
+                return true;
+            }
+            if (obj == null || getClass() != obj.getClass()) {
+                return false;
+            }
+            final AllocateRequest that = (AllocateRequest) obj;
+            final boolean samePath = (filePath == null) ? that.filePath == null : filePath.equals(that.filePath);
+            return samePath && fileSize == that.fileSize;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/AppendMessageCallback.java
----------------------------------------------------------------------
diff --git a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/AppendMessageCallback.java b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/AppendMessageCallback.java
new file mode 100644
index 0000000..778bd88
--- /dev/null
+++ b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/AppendMessageCallback.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.store;
+
+import java.nio.ByteBuffer;
+
+
+/**
+ * Callback used to write a message into a buffer.
+ *
+ * @author shijia.wxr
+ *
+ */
+public interface AppendMessageCallback {
+
+    /**
+     * After message serialization, writes the message into the given byte buffer.
+     *
+     * @param fileFromOffset NOTE(review): presumably the starting offset of the file behind
+     * {@code byteBuffer} — confirm against the implementation
+     * @param byteBuffer buffer the message is written into
+     * @param maxBlank NOTE(review): presumably the number of bytes still free in the buffer — confirm
+     * @param msg the message to write
+     *
+     * @return the outcome of the write as an {@link AppendMessageResult}
+     */
+    AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer,
+                                 final int maxBlank, final MessageExtBrokerInner msg);
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/AppendMessageResult.java
----------------------------------------------------------------------
diff --git a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/AppendMessageResult.java b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/AppendMessageResult.java
new file mode 100644
index 0000000..a87a917
--- /dev/null
+++ b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/AppendMessageResult.java
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.store;
+
+/**
+ * Result returned when a message is written to the commit log.
+ *
+ * @author shijia.wxr
+ */
+public class AppendMessageResult {
+    /** Return code. */
+    private AppendMessageStatus status;
+    /** Where to start writing. */
+    private long wroteOffset;
+    /** Number of bytes written. */
+    private int wroteBytes;
+    /** Message ID. */
+    private String msgId;
+    /** Message storage timestamp. */
+    private long storeTimestamp;
+    /** Consume queue's offset (step by one). */
+    private long logicsOffset;
+    private long pagecacheRT = 0;
+
+    /**
+     * Creates a result that carries only a status; numeric fields are zero and the message ID is empty.
+     */
+    public AppendMessageResult(AppendMessageStatus status) {
+        this(status, 0L, 0, "", 0L, 0L, 0L);
+    }
+
+    /**
+     * Creates a fully populated result.
+     */
+    public AppendMessageResult(AppendMessageStatus status, long wroteOffset, int wroteBytes, String msgId,
+                               long storeTimestamp, long logicsOffset, long pagecacheRT) {
+        this.status = status;
+        this.wroteOffset = wroteOffset;
+        this.wroteBytes = wroteBytes;
+        this.msgId = msgId;
+        this.storeTimestamp = storeTimestamp;
+        this.logicsOffset = logicsOffset;
+        this.pagecacheRT = pagecacheRT;
+    }
+
+    /**
+     * @return {@code true} when the status is {@code PUT_OK}
+     */
+    public boolean isOk() {
+        return AppendMessageStatus.PUT_OK == this.status;
+    }
+
+    public long getPagecacheRT() {
+        return this.pagecacheRT;
+    }
+
+    public void setPagecacheRT(final long pagecacheRT) {
+        this.pagecacheRT = pagecacheRT;
+    }
+
+
+    public AppendMessageStatus getStatus() {
+        return this.status;
+    }
+
+
+    public void setStatus(AppendMessageStatus status) {
+        this.status = status;
+    }
+
+
+    public long getWroteOffset() {
+        return this.wroteOffset;
+    }
+
+
+    public void setWroteOffset(long wroteOffset) {
+        this.wroteOffset = wroteOffset;
+    }
+
+
+    public int getWroteBytes() {
+        return this.wroteBytes;
+    }
+
+
+    public void setWroteBytes(int wroteBytes) {
+        this.wroteBytes = wroteBytes;
+    }
+
+
+    public String getMsgId() {
+        return this.msgId;
+    }
+
+
+    public void setMsgId(String msgId) {
+        this.msgId = msgId;
+    }
+
+
+    public long getStoreTimestamp() {
+        return this.storeTimestamp;
+    }
+
+
+    public void setStoreTimestamp(long storeTimestamp) {
+        this.storeTimestamp = storeTimestamp;
+    }
+
+
+    public long getLogicsOffset() {
+        return this.logicsOffset;
+    }
+
+
+    public void setLogicsOffset(long logicsOffset) {
+        this.logicsOffset = logicsOffset;
+    }
+
+    @Override
+    public String toString() {
+        final StringBuilder text = new StringBuilder("AppendMessageResult{");
+        text.append("status=").append(status);
+        text.append(", wroteOffset=").append(wroteOffset);
+        text.append(", wroteBytes=").append(wroteBytes);
+        text.append(", msgId='").append(msgId).append('\'');
+        text.append(", storeTimestamp=").append(storeTimestamp);
+        text.append(", logicsOffset=").append(logicsOffset);
+        text.append(", pagecacheRT=").append(pagecacheRT);
+        text.append('}');
+        return text.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/AppendMessageStatus.java
----------------------------------------------------------------------
diff --git a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/AppendMessageStatus.java b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/AppendMessageStatus.java
new file mode 100644
index 0000000..97234c0
--- /dev/null
+++ b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/AppendMessageStatus.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.store;
+
+/**
+ * Status code returned when a message is written to the commit log.
+ * <p>
+ * {@code PUT_OK} is the only value that {@code AppendMessageResult#isOk()} reports as success.
+ *
+ * @author shijia.wxr
+ *
+ */
+public enum AppendMessageStatus {
+    PUT_OK,
+    END_OF_FILE,
+    MESSAGE_SIZE_EXCEEDED,
+    PROPERTIES_SIZE_EXCEEDED,
+    UNKNOWN_ERROR,
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/CommitLog.java
----------------------------------------------------------------------
diff --git a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/CommitLog.java b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/CommitLog.java
new file mode 100644
index 0000000..23f305d
--- /dev/null
+++ b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/CommitLog.java
@@ -0,0 +1,1296 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.store;
+
+import com.alibaba.rocketmq.common.ServiceThread;
+import com.alibaba.rocketmq.common.UtilAll;
+import com.alibaba.rocketmq.common.constant.LoggerName;
+import com.alibaba.rocketmq.common.message.MessageAccessor;
+import com.alibaba.rocketmq.common.message.MessageConst;
+import com.alibaba.rocketmq.common.message.MessageDecoder;
+import com.alibaba.rocketmq.common.message.MessageExt;
+import com.alibaba.rocketmq.common.sysflag.MessageSysFlag;
+import com.alibaba.rocketmq.store.config.BrokerRole;
+import com.alibaba.rocketmq.store.config.FlushDiskType;
+import com.alibaba.rocketmq.store.ha.HAService;
+import com.alibaba.rocketmq.store.schedule.ScheduleMessageService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantLock;
+
+
+/**
+ * Stores all metadata so that it can be recovered after downtime, protecting data reliability
+ *
+ * @author shijia.wxr
+ */
+public class CommitLog {
+    // Message's MAGIC CODE daa320a7 ('+' binds tighter than '^': 0xAABBCCDD ^ (1880681586 + 8))
+    public final static int MESSAGE_MAGIC_CODE = 0xAABBCCDD ^ 1880681586 + 8;
+    private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
+    // End of file empty MAGIC CODE cbd43194 (same precedence: 0xBBCCDDEE ^ (1880681586 + 8))
+    private final static int BLANK_MAGIC_CODE = 0xBBCCDDEE ^ 1880681586 + 8;
+    private final MappedFileQueue mappedFileQueue;
+    private final DefaultMessageStore defaultMessageStore;
+    private final FlushCommitLogService flushCommitLogService;
+
+    //If TransientStorePool enabled, we must flush message to FileChannel at fixed periods
+    private final FlushCommitLogService commitLogService;
+
+    private final AppendMessageCallback appendMessageCallback;
+    private HashMap<String/* topic-queueid */, Long/* offset */> topicQueueTable = new HashMap<String, Long>(1024);
+    private volatile long confirmOffset = -1L;
+
+    private volatile long beginTimeInLock = 0;
+
+    //true: Can lock, false : in lock.
+    private AtomicBoolean putMessageSpinLock = new AtomicBoolean(true);
+
+    private ReentrantLock putMessageNormalLock = new ReentrantLock(); // NonfairSync
+
+    /**
+     * Wires the commit log to its message store: builds the mapped file queue, picks the flush service that
+     * matches the configured flush disk type, and creates the commit service and the append callback.
+     *
+     * @param defaultMessageStore owning store; its configuration and allocate service are read here
+     */
+    public CommitLog(final DefaultMessageStore defaultMessageStore) {
+        this.mappedFileQueue = new MappedFileQueue(
+                defaultMessageStore.getMessageStoreConfig().getStorePathCommitLog(),
+                defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog(),
+                defaultMessageStore.getAllocateMappedFileService());
+        this.defaultMessageStore = defaultMessageStore;
+
+        // Anything other than SYNC_FLUSH gets the real-time flusher.
+        if (FlushDiskType.SYNC_FLUSH != defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
+            this.flushCommitLogService = new FlushRealTimeService();
+        } else {
+            this.flushCommitLogService = new GroupCommitService();
+        }
+
+        this.commitLogService = new CommitRealTimeService();
+
+        this.appendMessageCallback = new DefaultAppendMessageCallback(
+                defaultMessageStore.getMessageStoreConfig().getMaxMessageSize());
+    }
+
+    /**
+     * Loads the mapped file queue and logs the outcome.
+     *
+     * @return the result reported by the mapped file queue
+     */
+    public boolean load() {
+        final boolean loaded = this.mappedFileQueue.load();
+        final String outcome = loaded ? "OK" : "Failed";
+        log.info("load commit log " + outcome);
+        return loaded;
+    }
+
+    /**
+     * Starts the flush service and, when the transient store pool is enabled, the commit service as well.
+     */
+    public void start() {
+        this.flushCommitLogService.start();
+
+        final boolean poolEnabled = defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable();
+        if (poolEnabled) {
+            this.commitLogService.start();
+        }
+    }
+
+    /**
+     * Shuts down the commit service (only when the transient store pool is enabled) and then the flush
+     * service.
+     */
+    public void shutdown() {
+        final boolean poolEnabled = defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable();
+        if (poolEnabled) {
+            this.commitLogService.shutdown();
+        }
+
+        this.flushCommitLogService.shutdown();
+    }
+
+    /**
+     * Commits and then flushes the mapped file queue, both with an argument of 0.
+     *
+     * @return the flushed position reported by the queue afterwards
+     */
+    public long flush() {
+        this.mappedFileQueue.commit(0);
+        this.mappedFileQueue.flush(0);
+        final long flushedWhere = this.mappedFileQueue.getFlushedWhere();
+        return flushedWhere;
+    }
+
+    /**
+     * @return the maximum offset reported by the mapped file queue
+     */
+    public long getMaxOffset() {
+        final long maxOffset = this.mappedFileQueue.getMaxOffset();
+        return maxOffset;
+    }
+
+    /**
+     * @return the amount of data still to be committed, as reported by the mapped file queue
+     */
+    public long remainHowManyDataToCommit() {
+        final long remaining = this.mappedFileQueue.remainHowManyDataToCommit();
+        return remaining;
+    }
+
+    /**
+     * @return the amount of data still to be flushed, as reported by the mapped file queue
+     */
+    public long remainHowManyDataToFlush() {
+        final long remaining = this.mappedFileQueue.remainHowManyDataToFlush();
+        return remaining;
+    }
+
+
+    /**
+     * Forwards to {@code MappedFileQueue#deleteExpiredFileByTime} with the same arguments, in the same order.
+     *
+     * @return the value returned by the mapped file queue
+     */
+    public int deleteExpiredFile(final long expiredTime, final int deleteFilesInterval,
+                                 final long intervalForcibly, final boolean cleanImmediately) {
+        final int result = this.mappedFileQueue.deleteExpiredFileByTime(
+                expiredTime, deleteFilesInterval, intervalForcibly, cleanImmediately);
+        return result;
+    }
+
+
+    /**
+     * Reads CommitLog data starting at the given offset; used for data replication.
+     * <p>
+     * Falls back to the first file on a miss only when {@code offset} is 0.
+     */
+    public SelectMappedBufferResult getData(final long offset) {
+        final boolean returnFirstOnNotFound = (0 == offset);
+        return this.getData(offset, returnFirstOnNotFound);
+    }
+
+
+    /**
+     * Looks up the mapped file that holds {@code offset} and selects its buffer from the position of that
+     * offset inside the file.
+     *
+     * @param offset offset into the commit log
+     * @param returnFirstOnNotFound passed through to the mapped file lookup
+     * @return the selected buffer, or {@code null} when no mapped file was found
+     */
+    public SelectMappedBufferResult getData(final long offset, final boolean returnFirstOnNotFound) {
+        final int fileSize = this.defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog();
+        final MappedFile target = this.mappedFileQueue.findMappedFileByOffset(offset, returnFirstOnNotFound);
+        if (target == null) {
+            return null;
+        }
+
+        // Position of the offset relative to the start of its file.
+        final int positionInFile = (int) (offset % fileSize);
+        return target.selectMappedBuffer(positionInFile);
+    }
+
+
+    /**
+     * Recovers the commit log position after a normal exit, when all in-memory data has already been flushed.
+     * <p>
+     * Scans messages starting from the third-to-last mapped file (or the first file when there are fewer than
+     * three), advancing until a message fails validation or the last file has been read to its end. The
+     * resulting offset becomes both the flushed and the committed position of the queue and is then passed to
+     * {@code truncateDirtyFiles}. Nothing happens when the queue holds no files.
+     */
+    public void recoverNormally() {
+        boolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover();
+        final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
+        if (!mappedFiles.isEmpty()) {
+            // Start recovering from the third-to-last file
+            int index = mappedFiles.size() - 3;
+            if (index < 0)
+                index = 0;
+
+            MappedFile mappedFile = mappedFiles.get(index);
+            ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
+            long processOffset = mappedFile.getFileFromOffset(); // start offset of the file being scanned
+            long mappedFileOffset = 0; // bytes of valid messages seen so far in that file
+            while (true) { // NOTE(review): a successful request with a negative size matches no branch below and would loop forever — confirm checkMessageAndReturnSize never returns one
+                DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover);
+                int size = dispatchRequest.getMsgSize();
+                // Normal data
+                if (dispatchRequest.isSuccess() && size > 0) {
+                    mappedFileOffset += size;
+                }
+                // Reached the end of the file: switch to the next file. A size of
+                // 0 means the trailing hole of the file was met,
+                // which must not be included in the truncate offset
+                else if (dispatchRequest.isSuccess() && size == 0) {
+                    index++;
+                    if (index >= mappedFiles.size()) {
+                        // Current branch can not happen
+                        log.info("recover last 3 physics file over, last maped file " + mappedFile.getFileName());
+                        break;
+                    } else {
+                        mappedFile = mappedFiles.get(index);
+                        byteBuffer = mappedFile.sliceByteBuffer();
+                        processOffset = mappedFile.getFileFromOffset();
+                        mappedFileOffset = 0;
+                        log.info("recover next physics file, " + mappedFile.getFileName());
+                    }
+                }
+                // Intermediate file read error
+                else if (!dispatchRequest.isSuccess()) {
+                    log.info("recover physics file end, " + mappedFile.getFileName());
+                    break;
+                }
+            }
+
+            processOffset += mappedFileOffset; // absolute offset just past the last valid message
+            this.mappedFileQueue.setFlushedWhere(processOffset);
+            this.mappedFileQueue.setCommittedWhere(processOffset);
+            this.mappedFileQueue.truncateDirtyFiles(processOffset);
+        }
+    }
+
+    /**
+     * Convenience overload of the three-argument variant that always reads the body
+     * ({@code readBody = true}).
+     *
+     * @param byteBuffer buffer positioned at the start of a message record
+     * @param checkCRC   whether the body CRC is verified
+     * @return the result of the three-argument overload
+     */
+    public DispatchRequest checkMessageAndReturnSize(java.nio.ByteBuffer byteBuffer, final boolean checkCRC) {
+        final boolean readBody = true;
+        return checkMessageAndReturnSize(byteBuffer, checkCRC, readBody);
+    }
+
+    /**
+     * Touches {@code obj} so that otherwise unused locals count as "used"; at debug
+     * level it logs the object's hash code, otherwise it does nothing.
+     *
+     * @param obj any value, may be {@code null}
+     */
+    private void doNothingForDeadCode(final Object obj) {
+        if (obj == null || !log.isDebugEnabled()) {
+            return;
+        }
+        log.debug(String.valueOf(obj.hashCode()));
+    }
+
+    /**
+     * check the message and returns the message size
+     *
+     * @return 0 Come the end of the file // >0 Normal messages // -1 Message
+     * checksum failure
+     */
+    public DispatchRequest checkMessageAndReturnSize(java.nio.ByteBuffer byteBuffer, final boolean checkCRC, final boolean readBody) {
+        try {
+            // 1 TOTAL SIZE
+            int totalSize = byteBuffer.getInt();
+
+            // 2 MAGIC CODE
+            int magicCode = byteBuffer.getInt();
+            switch (magicCode) {
+                case MESSAGE_MAGIC_CODE:
+                    break;
+                case BLANK_MAGIC_CODE:
+                    return new DispatchRequest(0, true /* success */);
+                default:
+                    log.warn("found a illegal magic code 0x" + Integer.toHexString(magicCode));
+                    return new DispatchRequest(-1, false /* success */);
+            }
+
+            byte[] bytesContent = new byte[totalSize];
+
+            // 3 BODYCRC
+            int bodyCRC = byteBuffer.getInt();
+
+            // 4 QUEUEID
+            int queueId = byteBuffer.getInt();
+
+            // 5 FLAG
+            int flag = byteBuffer.getInt();
+
+            // 6 QUEUEOFFSET
+            long queueOffset = byteBuffer.getLong();
+
+            // 7 PHYSICALOFFSET
+            long physicOffset = byteBuffer.getLong();
+
+            // 8 SYSFLAG
+            int sysFlag = byteBuffer.getInt();
+
+            // 9 BORNTIMESTAMP
+            long bornTimeStamp = byteBuffer.getLong();
+
+            // 10
+            ByteBuffer byteBuffer1 = byteBuffer.get(bytesContent, 0, 8);
+
+            // 11 STORETIMESTAMP
+            long storeTimestamp = byteBuffer.getLong();
+
+            // 12
+            ByteBuffer byteBuffer2 = byteBuffer.get(bytesContent, 0, 8);
+
+            // 13 RECONSUMETIMES
+            int reconsumeTimes = byteBuffer.getInt();
+
+            // 14 Prepared Transaction Offset
+            long preparedTransactionOffset = byteBuffer.getLong();
+
+            // 15 BODY
+            int bodyLen = byteBuffer.getInt();
+            if (bodyLen > 0) {
+                if (readBody) {
+                    byteBuffer.get(bytesContent, 0, bodyLen);
+
+                    if (checkCRC) {
+                        int crc = UtilAll.crc32(bytesContent, 0, bodyLen);
+                        if (crc != bodyCRC) {
+                            log.warn("CRC check failed. bodyCRC={}, currentCRC={}", crc, bodyCRC);
+                            return new DispatchRequest(-1, false/* success */);
+                        }
+                    }
+                } else {
+                    byteBuffer.position(byteBuffer.position() + bodyLen);
+                }
+            }
+
+            // 16 TOPIC
+            byte topicLen = byteBuffer.get();
+            byteBuffer.get(bytesContent, 0, topicLen);
+            String topic = new String(bytesContent, 0, topicLen, MessageDecoder.CHARSET_UTF8);
+
+            long tagsCode = 0;
+            String keys = "";
+            String uniqKey = null;
+
+            // 17 properties
+            short propertiesLength = byteBuffer.getShort();
+            if (propertiesLength > 0) {
+                byteBuffer.get(bytesContent, 0, propertiesLength);
+                String properties = new String(bytesContent, 0, propertiesLength, MessageDecoder.CHARSET_UTF8);
+                Map<String, String> propertiesMap = MessageDecoder.string2messageProperties(properties);
+
+                keys = propertiesMap.get(MessageConst.PROPERTY_KEYS);
+
+                uniqKey = propertiesMap.get(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
+
+                String tags = propertiesMap.get(MessageConst.PROPERTY_TAGS);
+                if (tags != null && tags.length() > 0) {
+                    tagsCode = MessageExtBrokerInner.tagsString2tagsCode(MessageExt.parseTopicFilterType(sysFlag), tags);
+                }
+
+                // Timing message processing
+                {
+                    String t = propertiesMap.get(MessageConst.PROPERTY_DELAY_TIME_LEVEL);
+                    if (ScheduleMessageService.SCHEDULE_TOPIC.equals(topic) && t != null) {
+                        int delayLevel = Integer.parseInt(t);
+
+                        if (delayLevel > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
+                            delayLevel = this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel();
+                        }
+
+                        if (delayLevel > 0) {
+                            tagsCode = this.defaultMessageStore.getScheduleMessageService().computeDeliverTimestamp(delayLevel,
+                                    storeTimestamp);
+                        }
+                    }
+                }
+            }
+
+            int readLength = calMsgLength(bodyLen, topicLen, propertiesLength);
+            if (totalSize != readLength) {
+                doNothingForDeadCode(reconsumeTimes);
+                doNothingForDeadCode(flag);
+                doNothingForDeadCode(bornTimeStamp);
+                doNothingForDeadCode(byteBuffer1);
+                doNothingForDeadCode(byteBuffer2);
+                log.error(
+                        "[BUG]read total count not equals msg total size. totalSize={}, readTotalCount={}, bodyLen={}, topicLen={}, propertiesLength={}",
+                        totalSize, readLength, bodyLen, topicLen, propertiesLength);
+                return new DispatchRequest(totalSize, false/* success */);
+            }
+
+            return new DispatchRequest(//
+                    topic, // 1
+                    queueId, // 2
+                    physicOffset, // 3
+                    totalSize, // 4
+                    tagsCode, // 5
+                    storeTimestamp, // 6
+                    queueOffset, // 7
+                    keys, // 8
+                    uniqKey, //9
+                    sysFlag, // 9
+                    preparedTransactionOffset// 10
+            );
+        } catch (Exception e) {
+        }
+
+        return new DispatchRequest(-1, false /* success */);
+    }
+
+    /**
+     * Computes the on-disk length of a message record from its variable-length parts.
+     * Non-positive body and properties lengths contribute only their length prefix.
+     *
+     * @param bodyLength       body length in bytes
+     * @param topicLength      topic length in bytes
+     * @param propertiesLength properties length in bytes
+     * @return total record length in bytes
+     */
+    private int calMsgLength(int bodyLength, int topicLength, int propertiesLength) {
+        // Fixed-width header fields, in record order.
+        final int fixedHeader = 4 // 1 TOTALSIZE
+                + 4 // 2 MAGICCODE
+                + 4 // 3 BODYCRC
+                + 4 // 4 QUEUEID
+                + 4 // 5 FLAG
+                + 8 // 6 QUEUEOFFSET
+                + 8 // 7 PHYSICALOFFSET
+                + 4 // 8 SYSFLAG
+                + 8 // 9 BORNTIMESTAMP
+                + 8 // 10 BORNHOST
+                + 8 // 11 STORETIMESTAMP
+                + 8 // 12 STOREHOSTADDRESS
+                + 4 // 13 RECONSUMETIMES
+                + 8; // 14 Prepared Transaction Offset
+
+        // Variable-length fields: a length prefix followed by the payload.
+        final int bodyPart = 4 + Math.max(bodyLength, 0); // 15 BODY
+        final int topicPart = 1 + topicLength; // 16 TOPIC
+        final int propertiesPart = 2 + Math.max(propertiesLength, 0); // 17 properties
+
+        return fixedHeader + bodyPart + topicPart + propertiesPart;
+    }
+
+    /**
+     * @return the current value of the {@code confirmOffset} field
+     */
+    public long getConfirmOffset() {
+        return confirmOffset;
+    }
+
+    /**
+     * Stores a new confirm offset.
+     *
+     * @param phyOffset value assigned to the {@code confirmOffset} field
+     */
+    public void setConfirmOffset(long phyOffset) {
+        confirmOffset = phyOffset;
+    }
+
+    /**
+     * Recovery after an abnormal exit. Walks the mapped files backwards to find the
+     * newest file accepted by {@link #isMappedFileMatchedRecover(MappedFile)} (falling
+     * back to the first file), then re-reads and re-dispatches messages from there.
+     * Afterwards the flushed/committed positions are set to the end of the last valid
+     * message and dirty commit log and consume queue data past it is truncated.
+     *
+     * <p>When no commit log file exists, the positions are reset to 0 and the logic
+     * (consume queue) files are destroyed.
+     *
+     * <p>Any record whose parse did not succeed ends the scan, regardless of the size
+     * it reports. This matches {@code recoverNormally()} and keeps a length-mismatch
+     * result (failure with a positive size) from being dispatched as a valid message.
+     */
+    public void recoverAbnormally() {
+        // recover by the minimum time stamp
+        boolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover();
+        final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
+        if (!mappedFiles.isEmpty()) {
+            // Looking beginning to recover from which file
+            int index = mappedFiles.size() - 1;
+            MappedFile mappedFile = null;
+            for (; index >= 0; index--) {
+                mappedFile = mappedFiles.get(index);
+                if (this.isMappedFileMatchedRecover(mappedFile)) {
+                    log.info("recover from this maped file " + mappedFile.getFileName());
+                    break;
+                }
+            }
+
+            // No file matched: start from the very first one.
+            if (index < 0) {
+                index = 0;
+                mappedFile = mappedFiles.get(index);
+            }
+
+            ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
+            long processOffset = mappedFile.getFileFromOffset();
+            long mappedFileOffset = 0;
+            while (true) {
+                DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover);
+                int size = dispatchRequest.getMsgSize();
+
+                // Intermediate file read error: stop at the first record that failed
+                // to parse, whatever size it carries.
+                if (!dispatchRequest.isSuccess() || size < 0) {
+                    log.info("recover physics file end, " + mappedFile.getFileName());
+                    break;
+                }
+
+                // Normal data
+                if (size > 0) {
+                    mappedFileOffset += size;
+
+                    if (this.defaultMessageStore.getMessageStoreConfig().isDuplicationEnable()) {
+                        // With duplication enabled only messages below the confirm
+                        // offset are dispatched.
+                        if (dispatchRequest.getCommitLogOffset() < this.defaultMessageStore.getConfirmOffset()) {
+                            this.defaultMessageStore.doDispatch(dispatchRequest);
+                        }
+                    } else {
+                        this.defaultMessageStore.doDispatch(dispatchRequest);
+                    }
+                }
+                // Come the end of the file, switch to the next file
+                // Since the return 0 representatives met last hole, this can
+                // not be included in truncate offset
+                else {
+                    index++;
+                    if (index >= mappedFiles.size()) {
+                        // The current branch under normal circumstances should
+                        // not happen
+                        log.info("recover physics file over, last maped file " + mappedFile.getFileName());
+                        break;
+                    } else {
+                        mappedFile = mappedFiles.get(index);
+                        byteBuffer = mappedFile.sliceByteBuffer();
+                        processOffset = mappedFile.getFileFromOffset();
+                        mappedFileOffset = 0;
+                        log.info("recover next physics file, " + mappedFile.getFileName());
+                    }
+                }
+            }
+
+            processOffset += mappedFileOffset;
+            this.mappedFileQueue.setFlushedWhere(processOffset);
+            this.mappedFileQueue.setCommittedWhere(processOffset);
+            this.mappedFileQueue.truncateDirtyFiles(processOffset);
+
+            // Clear ConsumeQueue redundant data
+            this.defaultMessageStore.truncateDirtyLogicFiles(processOffset);
+        }
+        // Commitlog case files are deleted
+        else {
+            this.mappedFileQueue.setFlushedWhere(0);
+            this.mappedFileQueue.setCommittedWhere(0);
+            this.defaultMessageStore.destroyLogics();
+        }
+    }
+
+    private boolean isMappedFileMatchedRecover(final MappedFile mappedFile) {
+        ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
+
+        int magicCode = byteBuffer.getInt(MessageDecoder.MESSAGE_MAGIC_CODE_POSTION);
+        if (magicCode != MESSAGE_MAGIC_CODE) {
+            return false;
+        }
+
+        long storeTimestamp = byteBuffer.getLong(MessageDecoder.MESSAGE_STORE_TIMESTAMP_POSTION);
+        if (0 == storeTimestamp) {
+            return false;
+        }
+
+        if (this.defaultMessageStore.getMessageStoreConfig().isMessageIndexEnable()//
+                && this.defaultMessageStore.getMessageStoreConfig().isMessageIndexSafe()) {
+            if (storeTimestamp <= this.defaultMessageStore.getStoreCheckpoint().getMinTimestampIndex()) {
+                log.info("find check timestamp, {} {}", //
+                        storeTimestamp, //
+                        UtilAll.timeMillisToHumanString(storeTimestamp));
+                return true;
+            }
+        } else {
+            if (storeTimestamp <= this.defaultMessageStore.getStoreCheckpoint().getMinTimestamp()) {
+                log.info("find check timestamp, {} {}", //
+                        storeTimestamp, //
+                        UtilAll.timeMillisToHumanString(storeTimestamp));
+                return true;
+            }
+        }
+
+        return false;
+    }
+
+    /**
+     * Intentionally empty: this method currently performs no work.
+     */
+    private void notifyMessageArriving() {
+
+    }
+
+    /**
+     * Forwards an offset reset to the mapped file queue.
+     *
+     * @param offset offset handed to {@code MappedFileQueue#resetOffset}
+     * @return the queue's result
+     */
+    public boolean resetOffset(long offset) {
+        final boolean reset = this.mappedFileQueue.resetOffset(offset);
+        return reset;
+    }
+
+    /**
+     * @return the current value of {@code beginTimeInLock}; {@code putMessage} sets it
+     * to the lock acquisition time while appending and to 0 otherwise
+     */
+    public long getBeginTimeInLock() {
+        return this.beginTimeInLock;
+    }
+
+    /**
+     * Appends a message to the commit log and then applies the configured durability
+     * steps.
+     *
+     * <ol>
+     * <li>Stamps the message with store time and body CRC.</li>
+     * <li>For non-transactional or commit messages with a delay level, redirects the
+     * message to the schedule topic and saves the real topic/queue id as properties.</li>
+     * <li>Appends under the put-message lock, rolling to a new mapped file once when
+     * the current one reports end of file.</li>
+     * <li>With SYNC_FLUSH, waits for the disk flush if the message asks for it;
+     * otherwise wakes the flush (or commit) service.</li>
+     * <li>With SYNC_MASTER, waits for the slave to catch up if the message asks for it.</li>
+     * </ol>
+     *
+     * <p>The slave wait always uses its own {@link GroupCommitRequest}. A request that
+     * has already completed for the disk flush has its latch at zero, so reusing it
+     * would make the slave wait return immediately with the disk-flush outcome.
+     *
+     * @param msg message to store; its topic, queue id, properties, store timestamp
+     *            and body CRC may be modified
+     * @return the put result; status is PUT_OK unless appending, flushing or slave
+     * synchronization reported a problem
+     */
+    public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
+        // Set the storage time
+        msg.setStoreTimestamp(System.currentTimeMillis());
+        // Set the message body BODY CRC (consider the most appropriate setting
+        // on the client)
+        msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
+        // Back to Results
+        AppendMessageResult result = null;
+
+        StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
+
+        String topic = msg.getTopic();
+        int queueId = msg.getQueueId();
+
+        final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
+        if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE//
+                || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
+            // Delay Delivery
+            if (msg.getDelayTimeLevel() > 0) {
+                if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
+                    msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
+                }
+
+                topic = ScheduleMessageService.SCHEDULE_TOPIC;
+                queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
+
+                // Backup real topic, queueId
+                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
+                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
+                msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
+
+                msg.setTopic(topic);
+                msg.setQueueId(queueId);
+            }
+        }
+
+        long elapsedTimeInLock = 0;
+        MappedFile unlockMappedFile = null;
+        MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
+
+        lockForPutMessage(); //spin...
+        try {
+            long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
+            this.beginTimeInLock = beginLockTimestamp;
+
+            // Here settings are stored timestamp, in order to ensure an orderly
+            // global
+            msg.setStoreTimestamp(beginLockTimestamp);
+
+            if (null == mappedFile || mappedFile.isFull()) {
+                mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
+            }
+            if (null == mappedFile) {
+                log.error("create maped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
+                return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null);
+            }
+
+            result = mappedFile.appendMessage(msg, this.appendMessageCallback);
+            switch (result.getStatus()) {
+                case PUT_OK:
+                    break;
+                case END_OF_FILE:
+                    unlockMappedFile = mappedFile;
+                    // Create a new file, re-write the message
+                    mappedFile = this.mappedFileQueue.getLastMappedFile(0);
+                    if (null == mappedFile) {
+                        // XXX: warn and notify me
+                        log.error("create maped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
+                        return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result);
+                    }
+                    result = mappedFile.appendMessage(msg, this.appendMessageCallback);
+                    break;
+                case MESSAGE_SIZE_EXCEEDED:
+                case PROPERTIES_SIZE_EXCEEDED:
+                    return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result);
+                case UNKNOWN_ERROR:
+                default:
+                    return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
+            }
+
+            elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
+        } finally {
+            // Reset on every exit path (including exceptions) so that a failed append
+            // cannot leave a stale lock start time behind.
+            this.beginTimeInLock = 0;
+            releasePutMessageLock();
+        }
+
+        if (elapsedTimeInLock > 500) {
+            log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", elapsedTimeInLock, msg.getBody().length, result);
+        }
+
+        if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
+            this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
+        }
+
+
+        PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);
+
+        // Statistics
+        storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet();
+        storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes());
+
+        // Synchronization flush
+        if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
+            final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
+            if (msg.isWaitStoreMsgOK()) {
+                GroupCommitRequest flushRequest = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
+                service.putRequest(flushRequest);
+                boolean flushOK = flushRequest.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
+                if (!flushOK) {
+                    log.error("do groupcommit, wait for flush failed, topic: " + msg.getTopic() + " tags: " + msg.getTags()
+                            + " client address: " + msg.getBornHostString());
+                    putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
+                }
+            } else {
+                service.wakeup();
+            }
+        }
+        // Asynchronous flush
+        else {
+            if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
+                flushCommitLogService.wakeup();
+            } else {
+                commitLogService.wakeup();
+            }
+        }
+
+        // Synchronous write double
+        if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) {
+            HAService service = this.defaultMessageStore.getHaService();
+            if (msg.isWaitStoreMsgOK()) {
+                // Determine whether to wait
+                if (service.isSlaveOK(result.getWroteOffset() + result.getWroteBytes())) {
+                    // Always a fresh request: a request already completed by the disk
+                    // flush above would not block here at all.
+                    GroupCommitRequest haRequest = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
+                    service.putRequest(haRequest);
+
+                    service.getWaitNotifyObject().wakeupAll();
+
+                    boolean flushOK =
+                            // TODO
+                            haRequest.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
+                    if (!flushOK) {
+                        log.error("do sync transfer other node, wait return, but failed, topic: " + msg.getTopic() + " tags: "
+                                + msg.getTags() + " client address: " + msg.getBornHostString());
+                        putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
+                    }
+                }
+                // Slave problem
+                else {
+                    // Tell the producer, slave not available
+                    putMessageResult.setPutMessageStatus(PutMessageStatus.SLAVE_NOT_AVAILABLE);
+                }
+            }
+        }
+
+        return putMessageResult;
+    }
+
+    /**
+     * Reads the store timestamp of the message located at {@code offset}.
+     *
+     * @param offset physical offset of the message
+     * @param size   size of the message in bytes
+     * @return the store timestamp, or -1 when the offset is below the minimum offset
+     * or the message cannot be selected
+     */
+    public long pickupStoreTimestamp(final long offset, final int size) {
+        if (offset < this.getMinOffset()) {
+            return -1;
+        }
+
+        final SelectMappedBufferResult selected = this.getMessage(offset, size);
+        if (selected == null) {
+            return -1;
+        }
+
+        try {
+            return selected.getByteBuffer().getLong(MessageDecoder.MESSAGE_STORE_TIMESTAMP_POSTION);
+        } finally {
+            // Always give the selected buffer back.
+            selected.release();
+        }
+    }
+
+    /**
+     * Returns the smallest offset currently served by the commit log.
+     *
+     * @return the start offset of the first mapped file when it is available, the
+     * start of the following file when it is not, or -1 when there is no file
+     */
+    public long getMinOffset() {
+        final MappedFile first = this.mappedFileQueue.getFirstMappedFile();
+        if (first == null) {
+            return -1;
+        }
+
+        return first.isAvailable()
+                ? first.getFileFromOffset()
+                : this.rollNextFile(first.getFileFromOffset());
+    }
+
+    /**
+     * Selects {@code size} bytes starting at {@code offset} from the mapped file that
+     * contains the offset.
+     *
+     * @param offset physical offset of the message
+     * @param size   number of bytes to select
+     * @return the selected buffer, or {@code null} when no mapped file is found
+     */
+    public SelectMappedBufferResult getMessage(final long offset, final int size) {
+        final int fileSize = this.defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog();
+        final MappedFile target = this.mappedFileQueue.findMappedFileByOffset(offset, offset == 0);
+        if (target == null) {
+            return null;
+        }
+
+        final int positionInFile = (int) (offset % fileSize);
+        return target.selectMappedBuffer(positionInFile, size);
+    }
+
+    /**
+     * Computes the start offset of the file that follows the one containing
+     * {@code offset}.
+     *
+     * @param offset any physical offset
+     * @return {@code offset} rounded up to the next multiple of the configured file size
+     * (an offset already on a boundary advances by one full file)
+     */
+    public long rollNextFile(final long offset) {
+        final int fileSize = this.defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog();
+        final long positionInFile = offset % fileSize;
+        return offset + fileSize - positionInFile;
+    }
+
+    /**
+     * @return the live {@code topicQueueTable} map (not a copy)
+     */
+    public HashMap<String, Long> getTopicQueueTable() {
+        return this.topicQueueTable;
+    }
+
+
+    /**
+     * Replaces the topic queue table with the given map (stored by reference).
+     *
+     * @param topicQueueTable new table
+     */
+    public void setTopicQueueTable(HashMap<String, Long> topicQueueTable) {
+        final HashMap<String, Long> replacement = topicQueueTable;
+        this.topicQueueTable = replacement;
+    }
+
+
+    /**
+     * Destroys the underlying mapped file queue.
+     */
+    public void destroy() {
+        mappedFileQueue.destroy();
+    }
+
+
+    /**
+     * Appends raw bytes to the last mapped file while holding the put-message lock.
+     *
+     * @param startOffset offset passed to {@code getLastMappedFile} when looking up
+     *                    the target file
+     * @param data        bytes to append
+     * @return {@code false} when no mapped file could be obtained, otherwise the
+     * result of {@code MappedFile#appendMessage}
+     */
+    public boolean appendData(long startOffset, byte[] data) {
+        lockForPutMessage(); //spin...
+        try {
+            final MappedFile target = this.mappedFileQueue.getLastMappedFile(startOffset);
+            if (target != null) {
+                return target.appendMessage(data);
+            }
+
+            log.error("appendData getLastMappedFile error  " + startOffset);
+            return false;
+        } finally {
+            releasePutMessageLock();
+        }
+    }
+
+
+    /**
+     * Forwards a retry of the first-file deletion to the mapped file queue.
+     *
+     * @param intervalForcibly interval handed to the queue
+     * @return the queue's result
+     */
+    public boolean retryDeleteFirstFile(final long intervalForcibly) {
+        final boolean deleted = this.mappedFileQueue.retryDeleteFirstFile(intervalForcibly);
+        return deleted;
+    }
+
+    /**
+     * Removes the entry keyed {@code "<topic>-<queueId>"} from the topic queue table
+     * while synchronized on this instance, then logs the removal.
+     *
+     * @param topic   topic name
+     * @param queueId queue id
+     */
+    public void removeQueueFromTopicQueueTable(final String topic, final int queueId) {
+        final StringBuilder keyBuilder = new StringBuilder();
+        keyBuilder.append(topic).append("-").append(queueId);
+        final String tableKey = keyBuilder.toString();
+
+        synchronized (this) {
+            this.topicQueueTable.remove(tableKey);
+        }
+
+        log.info("removeQueueFromTopicQueueTable OK Topic: {} QueueId: {}", topic, queueId);
+    }
+
+    /**
+     * Runs the mapped file queue's self check.
+     */
+    public void checkSelf() {
+        this.mappedFileQueue.checkSelf();
+    }
+
+    /**
+     * Common base for the commit log's background commit/flush service threads.
+     */
+    abstract class FlushCommitLogService extends ServiceThread {
+        // Upper bound on the final commit/flush attempts made by the subclasses
+        // visible in this file when they shut down.
+        protected static final int RETRY_TIMES_OVER = 10;
+    }
+
+    /**
+     * Background service that periodically calls {@code mappedFileQueue.commit(...)}
+     * and wakes the flush service whenever a commit reported that data was committed.
+     */
+    class CommitRealTimeService extends FlushCommitLogService {
+
+        // Time of the last thorough commit, or of the last commit that moved data.
+        private long lastCommitTimestamp = 0;
+
+        @Override
+        public String getServiceName() {
+            return CommitRealTimeService.class.getSimpleName();
+        }
+
+        /**
+         * Main loop: commit, optionally wake the flusher, then wait for the configured
+         * interval. After the service is stopped, retries a full commit up to
+         * {@code RETRY_TIMES_OVER} times.
+         */
+        @Override
+        public void run() {
+            CommitLog.log.info(this.getServiceName() + " service started");
+            while (!this.isStopped()) {
+                // Settings are re-read from the store config on every iteration.
+                int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitIntervalCommitLog();
+
+                int commitDataLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogLeastPages();
+
+                int commitDataThoroughInterval =
+                        CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogThoroughInterval();
+
+                // Once the thorough interval has elapsed, drop the least-pages
+                // threshold to 0 for this round.
+                long begin = System.currentTimeMillis();
+                if (begin >= (this.lastCommitTimestamp + commitDataThoroughInterval)) {
+                    this.lastCommitTimestamp = begin;
+                    commitDataLeastPages = 0;
+                }
+
+                try {
+                    boolean result = CommitLog.this.mappedFileQueue.commit(commitDataLeastPages);
+                    long end = System.currentTimeMillis();
+                    if (!result) {
+                        this.lastCommitTimestamp = end; // result = false means some data committed.
+                        //now wake up flush thread.
+                        flushCommitLogService.wakeup();
+                    }
+
+                    // Report commits slower than 500 ms.
+                    if (end - begin > 500) {
+                        log.info("Commit data to file costs {} ms", end - begin);
+                    }
+                    this.waitForRunning(interval);
+                } catch (Throwable e) {
+                    // Keep the service alive; note that no wait happens on this path.
+                    CommitLog.log.error(this.getServiceName() + " service has exception. ", e);
+                }
+            }
+
+            // Shutdown: retry a commit with threshold 0 until it returns true or the
+            // retry budget is used up.
+            boolean result = false;
+            for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {
+                result = CommitLog.this.mappedFileQueue.commit(0);
+                CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));
+            }
+            CommitLog.log.info(this.getServiceName() + " service end");
+        }
+    }
+
+    class FlushRealTimeService extends FlushCommitLogService {
+        private long lastFlushTimestamp = 0;
+        private long printTimes = 0;
+
+
+        public void run() {
+            CommitLog.log.info(this.getServiceName() + " service started");
+
+            while (!this.isStopped()) {
+                boolean flushCommitLogTimed = CommitLog.this.defaultMessageStore.getMessageStoreConfig().isFlushCommitLogTimed();
+
+                int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushIntervalCommitLog();
+                int flushPhysicQueueLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogLeastPages();
+
+                int flushPhysicQueueThoroughInterval =
+                        CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogThoroughInterval();
+
+                boolean printFlushProgress = false;
+
+                // Print flush progress
+                long currentTimeMillis = System.currentTimeMillis();
+                if (currentTimeMillis >= (this.lastFlushTimestamp + flushPhysicQueueThoroughInterval)) {
+                    this.lastFlushTimestamp = currentTimeMillis;
+                    flushPhysicQueueLeastPages = 0;
+                    printFlushProgress = (printTimes++ % 10) == 0;
+                }
+
+                try {
+                    if (flushCommitLogTimed) {
+                        Thread.sleep(interval);
+                    } else {
+                        this.waitForRunning(interval);
+                    }
+
+                    if (printFlushProgress) {
+                        this.printFlushProgress();
+                    }
+
+                    long begin = System.currentTimeMillis();
+                    CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages);
+                    long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
+                    if (storeTimestamp > 0) {
+                        CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
+                    }
+                    long past = System.currentTimeMillis() - begin;
+                    if (past > 500) {
+                        log.info("Flush data to disk costs {} ms", past);
+                    }
+                } catch (Throwable e) {
+                    CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
+                    this.printFlushProgress();
+                }
+            }
+
+            // Normal shutdown, to ensure that all the flush before exit
+            boolean result = false;
+            for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {
+                result = CommitLog.this.mappedFileQueue.flush(0);
+                CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));
+            }
+
+            this.printFlushProgress();
+
+            CommitLog.log.info(this.getServiceName() + " service end");
+        }
+
+
+        /**
+         * Returns the name this service uses to identify itself in its log messages.
+         *
+         * @return the simple class name of {@code FlushRealTimeService}
+         */
+        @Override
+        public String getServiceName() {
+            final String serviceName = FlushRealTimeService.class.getSimpleName();
+            return serviceName;
+        }
+
+
+        /**
+         * Progress-reporting hook invoked from the flush loop. It currently does nothing:
+         * the only statement it ever contained is commented out below.
+         */
+        private void printFlushProgress() {
+            // Disabled report of how far the on-disk data lags behind memory:
+            // CommitLog.log.info("how much disk fall behind memory, "
+            // + CommitLog.this.mappedFileQueue.howMuchFallBehind());
+        }
+
+
+        /**
+         * Returns the join time configured for this service.
+         * NOTE(review): presumably the maximum number of milliseconds to wait for this
+         * service thread to terminate on shutdown - confirm against the base class.
+         *
+         * @return 300000 (five minutes expressed in milliseconds)
+         */
+        @Override
+        public long getJointime() {
+            final long fiveMinutes = 5L * 60L * 1000L;
+            return fiveMinutes;
+        }
+    }
+
+    /**
+     * A request to have the commit log flushed up to a given offset, together with a latch
+     * the requesting thread can block on until the flush outcome is known.
+     */
+    public static class GroupCommitRequest {
+        // Offset that must be flushed before this request counts as satisfied.
+        private final long nextOffset;
+        // Released exactly once, by wakeupCustomer().
+        private final CountDownLatch countDownLatch = new CountDownLatch(1);
+        // Written by the flushing thread, read by the waiting thread.
+        private volatile boolean flushOK = false;
+
+
+        /**
+         * @param nextOffset the offset up to which data must be flushed
+         */
+        public GroupCommitRequest(long nextOffset) {
+            this.nextOffset = nextOffset;
+        }
+
+
+        /**
+         * @return the offset up to which data must be flushed
+         */
+        public long getNextOffset() {
+            return nextOffset;
+        }
+
+
+        /**
+         * Records the flush outcome and releases the thread blocked in {@link #waitForFlush(long)}.
+         *
+         * @param flushOK whether the flush reached {@link #getNextOffset()}
+         */
+        public void wakeupCustomer(final boolean flushOK) {
+            // Publish the result before releasing the latch so the waiter observes it.
+            this.flushOK = flushOK;
+            this.countDownLatch.countDown();
+        }
+
+
+        /**
+         * Blocks until the outcome has been published or the timeout elapses.
+         *
+         * @param timeout maximum time to wait, in milliseconds
+         * @return {@code true} only if the flush was reported successful in time; {@code false}
+         *         on failure, timeout or interruption
+         */
+        public boolean waitForFlush(long timeout) {
+            try {
+                this.countDownLatch.await(timeout, TimeUnit.MILLISECONDS);
+                return this.flushOK;
+            } catch (InterruptedException e) {
+                // Report through the store logger instead of stderr, and restore the interrupt
+                // status so code further up the stack can still react to the interruption.
+                CommitLog.log.warn("Interrupted while waiting for flush, nextOffset=" + this.nextOffset, e);
+                Thread.currentThread().interrupt();
+                return false;
+            }
+        }
+    }
+
+    /**
+     * GroupCommit Service.
+     * <p>
+     * Producers hand in {@link GroupCommitRequest}s through {@link #putRequest(GroupCommitRequest)};
+     * the service thread swaps the write and read lists, flushes until each request's offset is
+     * covered (or the retries are used up) and then wakes the waiting producer.
+     * <p>
+     * Both {@code putRequest} and the list swap run under this object's monitor, so a producer can
+     * never append to the list the service thread is currently draining.
+     */
+    class GroupCommitService extends FlushCommitLogService {
+        // List producers append to; guarded by the monitor of this object.
+        private volatile List<GroupCommitRequest> requestsWrite = new ArrayList<GroupCommitRequest>();
+        // List the service thread drains in doCommit(); only touched by that thread after a swap.
+        private volatile List<GroupCommitRequest> requestsRead = new ArrayList<GroupCommitRequest>();
+
+
+        /**
+         * Queues a flush request and signals the service thread.
+         *
+         * @param request the request to queue
+         */
+        public void putRequest(final GroupCommitRequest request) {
+            synchronized (this) {
+                this.requestsWrite.add(request);
+                if (hasNotified.compareAndSet(false, true)) {
+                    waitPoint.countDown(); // notify
+                }
+            }
+        }
+
+
+        /**
+         * Exchanges the write and read lists. Takes the same monitor as
+         * {@link #putRequest(GroupCommitRequest)}; without it a producer could add to a list
+         * while {@link #doCommit()} iterates or clears it.
+         */
+        private void swapRequests() {
+            synchronized (this) {
+                List<GroupCommitRequest> tmp = this.requestsWrite;
+                this.requestsWrite = this.requestsRead;
+                this.requestsRead = tmp;
+            }
+        }
+
+
+        /**
+         * Serves every request in the read list, updates the checkpoint timestamp and clears the
+         * list. When the list is empty a plain flush is performed instead.
+         */
+        private void doCommit() {
+            if (!this.requestsRead.isEmpty()) {
+                for (GroupCommitRequest req : this.requestsRead) {
+                    // There may be a message in the next file, so a maximum of
+                    // two times the flush
+                    boolean flushOK = false;
+                    for (int i = 0; i < 2 && !flushOK; i++) {
+                        flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
+
+                        if (!flushOK) {
+                            CommitLog.this.mappedFileQueue.flush(0);
+                        }
+                    }
+
+                    req.wakeupCustomer(flushOK);
+                }
+
+                long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
+                if (storeTimestamp > 0) {
+                    CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
+                }
+
+                this.requestsRead.clear();
+            } else {
+                // Because of individual messages is set to not sync flush, it
+                // will come to this process
+                CommitLog.this.mappedFileQueue.flush(0);
+            }
+        }
+
+
+        /**
+         * Service loop: wait, commit, repeat until stopped; then serve whatever is still queued.
+         */
+        public void run() {
+            CommitLog.log.info(this.getServiceName() + " service started");
+
+            while (!this.isStopped()) {
+                try {
+                    // NOTE(review): whether an interval of 0 blocks until notified or returns
+                    // immediately depends on waitForRunning, which is not visible here - confirm
+                    // that this loop does not spin.
+                    this.waitForRunning(0);
+                    this.doCommit();
+                } catch (Exception e) {
+                    CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
+                }
+            }
+
+            // Under normal circumstances shutdown, wait for the arrival of the
+            // request, and then flush
+            try {
+                Thread.sleep(10);
+            } catch (InterruptedException e) {
+                CommitLog.log.warn("GroupCommitService Exception, ", e);
+            }
+
+            // swapRequests() acquires the monitor itself.
+            this.swapRequests();
+
+            this.doCommit();
+
+            CommitLog.log.info(this.getServiceName() + " service end");
+        }
+
+
+        /**
+         * Swaps the request lists so that {@link #doCommit()} sees the queued requests.
+         */
+        @Override
+        protected void onWaitEnd() {
+            this.swapRequests();
+        }
+
+
+        /**
+         * @return the simple class name of {@code GroupCommitService}
+         */
+        @Override
+        public String getServiceName() {
+            return GroupCommitService.class.getSimpleName();
+        }
+
+
+        /**
+         * @return 300000 (five minutes expressed in milliseconds)
+         */
+        @Override
+        public long getJointime() {
+            return 1000 * 60 * 5;
+        }
+    }
+
+    /**
+     * Serializes a message into a reusable scratch buffer and copies it into the target buffer
+     * of the commit log, or writes an end-of-file marker when the remaining space is too small.
+     * <p>
+     * The scratch buffers and the key builder are reused between calls, so one instance must not
+     * be used by several threads at the same time.
+     */
+    class DefaultAppendMessageCallback implements AppendMessageCallback {
+        // File at the end of the minimum fixed length empty
+        private static final int END_FILE_MIN_BLANK_LENGTH = 4 + 4;
+        // Scratch buffer used to build the message id
+        private final ByteBuffer msgIdMemory;
+        // Store the message content
+        private final ByteBuffer msgStoreItemMemory;
+        // The maximum length of the message
+        private final int maxMessageSize;
+        // Build Message Key
+        private final StringBuilder keyBuilder = new StringBuilder();
+        // Scratch buffer for the 8-byte host addresses
+        private final ByteBuffer hostHolder = ByteBuffer.allocate(8);
+
+
+        /**
+         * @param size the maximum size of a single serialized message, in bytes
+         */
+        DefaultAppendMessageCallback(final int size) {
+            this.msgIdMemory = ByteBuffer.allocate(MessageDecoder.MSG_ID_LENGTH);
+            this.msgStoreItemMemory = ByteBuffer.allocate(size + END_FILE_MIN_BLANK_LENGTH);
+            this.maxMessageSize = size;
+        }
+
+
+        /**
+         * @return the scratch buffer that holds the most recently serialized message
+         */
+        public ByteBuffer getMsgStoreItemMemory() {
+            return msgStoreItemMemory;
+        }
+
+
+        /**
+         * Serializes {@code msgInner} and appends it to {@code byteBuffer}.
+         *
+         * @param fileFromOffset offset added to the buffer position to obtain the physical offset
+         * @param byteBuffer target buffer, positioned where the message is to be written
+         * @param maxBlank number of bytes still free in the target
+         * @param msgInner the message to append
+         * @return {@code PUT_OK} on success; {@code END_OF_FILE} when the message does not fit and
+         *         a blank marker was written instead; {@code PROPERTIES_SIZE_EXCEEDED} or
+         *         {@code MESSAGE_SIZE_EXCEEDED} when the message is rejected without writing
+         */
+        public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank, final MessageExtBrokerInner msgInner) {
+            // PHY OFFSET
+            long wroteOffset = fileFromOffset + byteBuffer.position();
+
+            // The message id is built from the store host address and the physical offset
+            this.resetByteBuffer(hostHolder, 8);
+            String msgId = MessageDecoder.createMessageId(this.msgIdMemory, msgInner.getStoreHostBytes(hostHolder), wroteOffset);
+
+            // Record ConsumeQueue information
+            keyBuilder.setLength(0);
+            keyBuilder.append(msgInner.getTopic());
+            keyBuilder.append('-');
+            keyBuilder.append(msgInner.getQueueId());
+            String key = keyBuilder.toString();
+            Long queueOffset = CommitLog.this.topicQueueTable.get(key);
+            if (null == queueOffset) {
+                queueOffset = 0L;
+                CommitLog.this.topicQueueTable.put(key, queueOffset);
+            }
+
+            // Transaction messages that require special handling
+            final int tranType = MessageSysFlag.getTransactionValue(msgInner.getSysFlag());
+            switch (tranType) {
+                // Prepared and Rollback message is not consumed, will not enter the
+                // consumer queue
+                case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
+                case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
+                    queueOffset = 0L;
+                    break;
+                case MessageSysFlag.TRANSACTION_NOT_TYPE:
+                case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
+                default:
+                    break;
+            }
+
+            // Serialize message
+            final byte[] propertiesData =
+                    msgInner.getPropertiesString() == null ? null : msgInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8);
+
+            // Keep the length as an int until it has been validated: narrowing to short first
+            // would wrap oversized values and make the range check below impossible to trigger.
+            final int propertiesLength = propertiesData == null ? 0 : propertiesData.length;
+
+            if (propertiesLength > Short.MAX_VALUE) {
+                log.warn("putMessage message properties length too long. length={}", propertiesData.length);
+                return new AppendMessageResult(AppendMessageStatus.PROPERTIES_SIZE_EXCEEDED);
+            }
+
+            final byte[] topicData = msgInner.getTopic().getBytes(MessageDecoder.CHARSET_UTF8);
+            final int topicLength = topicData.length;
+
+            final int bodyLength = msgInner.getBody() == null ? 0 : msgInner.getBody().length;
+
+            final int msgLen = calMsgLength(bodyLength, topicLength, propertiesLength);
+
+            // Exceeds the maximum message
+            if (msgLen > this.maxMessageSize) {
+                CommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLength
+                        + ", maxMessageSize: " + this.maxMessageSize);
+                return new AppendMessageResult(AppendMessageStatus.MESSAGE_SIZE_EXCEEDED);
+            }
+
+            // Determines whether there is sufficient free space
+            if ((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) {
+                this.resetByteBuffer(this.msgStoreItemMemory, maxBlank);
+                // 1 TOTALSIZE
+                this.msgStoreItemMemory.putInt(maxBlank);
+                // 2 MAGICCODE
+                this.msgStoreItemMemory.putInt(CommitLog.BLANK_MAGIC_CODE);
+                // 3 The remaining space may be any value
+
+                // Here the length of the specially set maxBlank
+                final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
+                byteBuffer.put(this.msgStoreItemMemory.array(), 0, maxBlank);
+                return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset, maxBlank, msgId, msgInner.getStoreTimestamp(),
+                        queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
+            }
+
+            // Initialization of storage space
+            this.resetByteBuffer(msgStoreItemMemory, msgLen);
+            // 1 TOTALSIZE
+            this.msgStoreItemMemory.putInt(msgLen);
+            // 2 MAGICCODE
+            this.msgStoreItemMemory.putInt(CommitLog.MESSAGE_MAGIC_CODE);
+            // 3 BODYCRC
+            this.msgStoreItemMemory.putInt(msgInner.getBodyCRC());
+            // 4 QUEUEID
+            this.msgStoreItemMemory.putInt(msgInner.getQueueId());
+            // 5 FLAG
+            this.msgStoreItemMemory.putInt(msgInner.getFlag());
+            // 6 QUEUEOFFSET
+            this.msgStoreItemMemory.putLong(queueOffset);
+            // 7 PHYSICALOFFSET
+            this.msgStoreItemMemory.putLong(fileFromOffset + byteBuffer.position());
+            // 8 SYSFLAG
+            this.msgStoreItemMemory.putInt(msgInner.getSysFlag());
+            // 9 BORNTIMESTAMP
+            this.msgStoreItemMemory.putLong(msgInner.getBornTimestamp());
+            // 10 BORNHOST
+            this.resetByteBuffer(hostHolder, 8);
+            this.msgStoreItemMemory.put(msgInner.getBornHostBytes(hostHolder));
+            // 11 STORETIMESTAMP
+            this.msgStoreItemMemory.putLong(msgInner.getStoreTimestamp());
+            // 12 STOREHOSTADDRESS
+            this.resetByteBuffer(hostHolder, 8);
+            this.msgStoreItemMemory.put(msgInner.getStoreHostBytes(hostHolder));
+            // 13 RECONSUMETIMES
+            this.msgStoreItemMemory.putInt(msgInner.getReconsumeTimes());
+            // 14 Prepared Transaction Offset
+            this.msgStoreItemMemory.putLong(msgInner.getPreparedTransactionOffset());
+            // 15 BODY
+            this.msgStoreItemMemory.putInt(bodyLength);
+            if (bodyLength > 0) {
+                this.msgStoreItemMemory.put(msgInner.getBody());
+            }
+            // 16 TOPIC
+            this.msgStoreItemMemory.put((byte) topicLength);
+            this.msgStoreItemMemory.put(topicData);
+            // 17 PROPERTIES (the cast is safe: the length was range-checked above)
+            this.msgStoreItemMemory.putShort((short) propertiesLength);
+            if (propertiesLength > 0) {
+                this.msgStoreItemMemory.put(propertiesData);
+            }
+
+            final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
+            // Write messages to the queue buffer
+            byteBuffer.put(this.msgStoreItemMemory.array(), 0, msgLen);
+
+            AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, msgLen, msgId,
+                    msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
+
+            switch (tranType) {
+                case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
+                case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
+                    break;
+                case MessageSysFlag.TRANSACTION_NOT_TYPE:
+                case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
+                    // The next update ConsumeQueue information
+                    CommitLog.this.topicQueueTable.put(key, ++queueOffset);
+                    break;
+                default:
+                    break;
+            }
+            return result;
+        }
+
+
+        /**
+         * Rewinds {@code byteBuffer} to position 0 and sets its limit to {@code limit}, making
+         * it ready to be filled again from the start.
+         */
+        private void resetByteBuffer(final ByteBuffer byteBuffer, final int limit) {
+            byteBuffer.flip();
+            byteBuffer.limit(limit);
+        }
+    }
+
+    /**
+     * Returns the time elapsed since {@code beginTimeInLock}, measured with the message
+     * store's clock.
+     *
+     * @return the elapsed time, or 0 when {@code beginTimeInLock} is not positive or the
+     *         computed difference would be negative
+     */
+    public long lockTimeMills() {
+        // Read the field once so the check and the subtraction use the same value.
+        final long lockedSince = this.beginTimeInLock;
+        if (lockedSince <= 0) {
+            return 0;
+        }
+
+        final long elapsed = this.defaultMessageStore.now() - lockedSince;
+        return Math.max(elapsed, 0);
+    }
+
+    /**
+     * Spin util acquired the lock.
+     */
+    private void lockForPutMessage() {
+        if (this.defaultMessageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage()) {
+            putMessageNormalLock.lock();
+        } else {
+            boolean flag;
+            do {
+                flag = this.putMessageSpinLock.compareAndSet(true, false);
+            } while (!flag);
+        }
+    }
+
+    private void releasePutMessageLock() {
+        if (this.defaultMessageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage()) {
+            putMessageNormalLock.unlock();
+        } else {
+            this.putMessageSpinLock.compareAndSet(false, true);
+        }
+    }
+}


Mime
View raw message