http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-store/pom.xml
----------------------------------------------------------------------
diff --git a/rocketmq-store/pom.xml b/rocketmq-store/pom.xml
new file mode 100644
index 0000000..440e521
--- /dev/null
+++ b/rocketmq-store/pom.xml
@@ -0,0 +1,46 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <groupId>com.alibaba.rocketmq</groupId>
+ <artifactId>rocketmq-all</artifactId>
+ <version>4.0.0-SNAPSHOT</version>
+ </parent>
+
+ <modelVersion>4.0.0</modelVersion>
+ <packaging>jar</packaging>
+ <artifactId>rocketmq-store</artifactId>
+ <name>rocketmq-store ${project.version}</name>
+
+ <dependencies>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>rocketmq-common</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>net.java.dev.jna</groupId>
+ <artifactId>jna</artifactId>
+ </dependency>
+ </dependencies>
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/AllocateMappedFileService.java
----------------------------------------------------------------------
diff --git a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/AllocateMappedFileService.java b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/AllocateMappedFileService.java
new file mode 100644
index 0000000..40eee7a
--- /dev/null
+++ b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/AllocateMappedFileService.java
@@ -0,0 +1,342 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.store;
+
+import com.alibaba.rocketmq.common.ServiceThread;
+import com.alibaba.rocketmq.common.UtilAll;
+import com.alibaba.rocketmq.common.constant.LoggerName;
+import com.alibaba.rocketmq.store.config.BrokerRole;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ServiceLoader;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+
+/**
+ * Create MappedFile in advance
+ *
+ * @author shijia.wxr
+ */
+public class AllocateMappedFileService extends ServiceThread {
+ private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
+ private static int waitTimeOut = 1000 * 5;
+ private ConcurrentHashMap<String, AllocateRequest> requestTable =
+ new ConcurrentHashMap<String, AllocateRequest>();
+ private PriorityBlockingQueue<AllocateRequest> requestQueue =
+ new PriorityBlockingQueue<AllocateRequest>();
+ private volatile boolean hasException = false;
+ private DefaultMessageStore messageStore;
+
+
+ /**
+ * Creates the allocation service for the given store.
+ *
+ * @param messageStore store whose configuration and transient store pool are read when
+ * allocation requests are submitted and processed
+ */
+ public AllocateMappedFileService(DefaultMessageStore messageStore) {
+ this.messageStore = messageStore;
+ }
+
+
+ /**
+  * Queues allocation requests for {@code nextFilePath} and {@code nextNextFilePath}, then blocks
+  * until the file for {@code nextFilePath} has been created or the wait times out.
+  *
+  * <p>When the transient store pool is enabled with fast-fail and the broker is not a slave, the
+  * number of requests that may be queued is the number of remaining pool buffers minus the
+  * requests already queued; otherwise up to two requests are queued.
+  *
+  * @param nextFilePath     path of the file the caller waits for
+  * @param nextNextFilePath path of the file that is only pre-allocated
+  * @param fileSize         size used for both allocation requests
+  * @return the mapped file created for {@code nextFilePath}, or {@code null} when the request
+  *         could not be queued, the service is in an error state, the wait timed out or the
+  *         calling thread was interrupted (the interrupt flag is restored in that case)
+  */
+ public MappedFile putRequestAndReturnMappedFile(String nextFilePath, String nextNextFilePath, int fileSize) {
+     int canSubmitRequests = 2;
+     if (this.messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
+         if (this.messageStore.getMessageStoreConfig().isFastFailIfNoBufferInStorePool()
+             && BrokerRole.SLAVE != this.messageStore.getMessageStoreConfig().getBrokerRole()) { //if broker is slave, don't fast fail even no buffer in pool
+             canSubmitRequests = this.messageStore.getTransientStorePool().remainBufferNumbs() - this.requestQueue.size();
+         }
+     }
+
+     AllocateRequest nextReq = new AllocateRequest(nextFilePath, fileSize);
+     // putIfAbsent returns null only when no request for this path was registered before.
+     boolean nextPutOK = this.requestTable.putIfAbsent(nextFilePath, nextReq) == null;
+
+     if (nextPutOK) {
+         if (canSubmitRequests <= 0) {
+             log.warn("[NOTIFYME]TransientStorePool is not enough, so create mapped file error, " +
+                 "RequestQueueSize : {}, StorePoolSize: {}", this.requestQueue.size(), this.messageStore.getTransientStorePool().remainBufferNumbs());
+             this.requestTable.remove(nextFilePath);
+             return null;
+         }
+         boolean offerOK = this.requestQueue.offer(nextReq);
+         if (!offerOK) {
+             log.warn("never expected here, add a request to preallocate queue failed");
+         }
+         canSubmitRequests--;
+     }
+
+     AllocateRequest nextNextReq = new AllocateRequest(nextNextFilePath, fileSize);
+     boolean nextNextPutOK = this.requestTable.putIfAbsent(nextNextFilePath, nextNextReq) == null;
+     if (nextNextPutOK) {
+         if (canSubmitRequests <= 0) {
+             // Pre-allocation is optional: drop the request but keep serving nextFilePath.
+             log.warn("[NOTIFYME]TransientStorePool is not enough, so skip preallocate mapped file, " +
+                 "RequestQueueSize : {}, StorePoolSize: {}", this.requestQueue.size(), this.messageStore.getTransientStorePool().remainBufferNumbs());
+             this.requestTable.remove(nextNextFilePath);
+         } else {
+             boolean offerOK = this.requestQueue.offer(nextNextReq);
+             if (!offerOK) {
+                 log.warn("never expected here, add a request to preallocate queue failed");
+             }
+         }
+     }
+
+     if (hasException) {
+         log.warn(this.getServiceName() + " service has exception. so return null");
+         return null;
+     }
+
+     // May be the request registered by an earlier call for the same path.
+     AllocateRequest result = this.requestTable.get(nextFilePath);
+     try {
+         if (result != null) {
+             boolean waitOK = result.getCountDownLatch().await(waitTimeOut, TimeUnit.MILLISECONDS);
+             if (!waitOK) {
+                 // The request stays in the table, so a later call for this path finds it again.
+                 log.warn("create mmap timeout " + result.getFilePath() + " " + result.getFileSize());
+                 return null;
+             } else {
+                 this.requestTable.remove(nextFilePath);
+                 return result.getMappedFile();
+             }
+         } else {
+             log.error("find preallocate mmap failed, this never happen");
+         }
+     } catch (InterruptedException e) {
+         log.warn(this.getServiceName() + " service has exception. ", e);
+         // Keep the interrupt visible to the caller instead of silently consuming it.
+         Thread.currentThread().interrupt();
+     }
+
+     return null;
+ }
+
+
+ /**
+  * @return the simple class name of this service
+  */
+ @Override
+ public String getServiceName() {
+     final String simpleName = AllocateMappedFileService.class.getSimpleName();
+     return simpleName;
+ }
+
+
+ public void shutdown() {
+ this.stopped = true;
+ this.thread.interrupt();
+
+ try {
+ this.thread.join(this.getJointime());
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+
+ for (AllocateRequest req : this.requestTable.values()) {
+ if (req.mappedFile != null) {
+ log.info("delete pre allocated maped file, {}", req.mappedFile.getFileName());
+ req.mappedFile.destroy(1000);
+ }
+ }
+ }
+
+
+ /**
+  * Service loop: keeps processing allocation requests until the service is stopped or
+  * {@code mmapOperation()} reports that it should end.
+  */
+ public void run() {
+     log.info(this.getServiceName() + " service started");
+
+     for (;;) {
+         if (this.isStopped()) {
+             break;
+         }
+         if (!this.mmapOperation()) {
+             break;
+         }
+     }
+
+     log.info(this.getServiceName() + " service end");
+ }
+
+
+ /**
+ * Takes one request from the queue, creates (and optionally warms) its mapped file and
+ * releases the thread waiting on the request's latch.
+ *
+ * @return {@code false} only when the blocking take is interrupted; {@code true} in every
+ * other case, including an expired request and an {@link IOException} (the request is
+ * re-queued then)
+ */
+ private boolean mmapOperation() {
+ boolean isSuccess = false;
+ AllocateRequest req = null;
+ try {
+ // Blocks until a request is available.
+ req = this.requestQueue.take();
+ AllocateRequest expectedRequest = this.requestTable.get(req.getFilePath());
+ // The table no longer knows this path: the request was removed in the meantime.
+ if (null == expectedRequest) {
+ log.warn("this mmap request expired, maybe cause timeout " + req.getFilePath() + " "
+ + req.getFileSize());
+ return true;
+ }
+ // Identity check: the table holds a different request object for the same path.
+ if (expectedRequest != req) {
+ log.warn("never expected here, maybe cause timeout " + req.getFilePath() + " "
+ + req.getFileSize() + ", req:" + req + ", expectedRequest:" + expectedRequest);
+ return true;
+ }
+
+ if (req.getMappedFile() == null) {
+ long beginTime = System.currentTimeMillis();
+
+ MappedFile mappedFile;
+ if (messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
+ try {
+ // Prefer a MappedFile implementation discovered through ServiceLoader.
+ mappedFile = ServiceLoader.load(MappedFile.class).iterator().next();
+ mappedFile.init(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
+ } catch (RuntimeException e) {
+ // Any RuntimeException from lookup or init falls back to the built-in class.
+ log.warn("Use default implementation.");
+ mappedFile = new MappedFile(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
+ }
+ } else {
+ mappedFile = new MappedFile(req.getFilePath(), req.getFileSize());
+ }
+
+ long eclipseTime = UtilAll.computeEclipseTimeMilliseconds(beginTime);
+ // Report creations that took longer than 10 ms.
+ if (eclipseTime > 10) {
+ int queueSize = this.requestQueue.size();
+ log.warn("create mappedFile spent time(ms) " + eclipseTime + " queue size " + queueSize
+ + " " + req.getFilePath() + " " + req.getFileSize());
+ }
+
+ // pre write mappedFile
+ // Only files at least as large as the configured commit log file size are warmed,
+ // and only when warming is enabled.
+ if (mappedFile.getFileSize() >= this.messageStore.getMessageStoreConfig()
+ .getMapedFileSizeCommitLog()
+ &&
+ this.messageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
+ mappedFile.warmMappedFile(this.messageStore.getMessageStoreConfig().getFlushDiskType(),
+ this.messageStore.getMessageStoreConfig().getFlushLeastPagesWhenWarmMapedFile());
+ }
+
+ req.setMappedFile(mappedFile);
+ this.hasException = false;
+ isSuccess = true;
+ }
+ } catch (InterruptedException e) {
+ log.warn(this.getServiceName() + " service has exception, maybe by shutdown");
+ this.hasException = true;
+ return false;
+ } catch (IOException e) {
+ log.warn(this.getServiceName() + " service has exception. ", e);
+ this.hasException = true;
+ if (null != req) {
+ // Put the request back so it is retried on a later iteration.
+ requestQueue.offer(req);
+ try {
+ Thread.sleep(1);
+ } catch (InterruptedException e1) {
+ // NOTE(review): the interrupt is dropped here without restoring the flag - confirm
+ // that this is intended.
+ }
+ }
+ } finally {
+ // The waiter is released only after a file was created successfully.
+ if (req != null && isSuccess)
+ req.getCountDownLatch().countDown();
+ }
+ return true;
+ }
+
+ static class AllocateRequest implements Comparable<AllocateRequest> {
+ // Full file path
+ private String filePath;
+ private int fileSize;
+ private CountDownLatch countDownLatch = new CountDownLatch(1);
+ private volatile MappedFile mappedFile = null;
+
+
+ public AllocateRequest(String filePath, int fileSize) {
+ this.filePath = filePath;
+ this.fileSize = fileSize;
+ }
+
+
+ public String getFilePath() {
+ return filePath;
+ }
+
+
+ public void setFilePath(String filePath) {
+ this.filePath = filePath;
+ }
+
+
+ public int getFileSize() {
+ return fileSize;
+ }
+
+
+ public void setFileSize(int fileSize) {
+ this.fileSize = fileSize;
+ }
+
+
+ public CountDownLatch getCountDownLatch() {
+ return countDownLatch;
+ }
+
+
+ public void setCountDownLatch(CountDownLatch countDownLatch) {
+ this.countDownLatch = countDownLatch;
+ }
+
+
+ public MappedFile getMappedFile() {
+ return mappedFile;
+ }
+
+
+ public void setMappedFile(MappedFile mappedFile) {
+ this.mappedFile = mappedFile;
+ }
+
+
+ public int compareTo(AllocateRequest other) {
+ if (this.fileSize < other.fileSize)
+ return 1;
+ else if (this.fileSize > other.fileSize) {
+ return -1;
+ } else {
+ int mIndex = this.filePath.lastIndexOf(File.separator);
+ long mName = Long.parseLong(this.filePath.substring(mIndex + 1));
+ int oIndex = other.filePath.lastIndexOf(File.separator);
+ long oName = Long.parseLong(other.filePath.substring(oIndex + 1));
+ if (mName < oName) {
+ return -1;
+ } else if (mName > oName) {
+ return 1;
+ } else {
+ return 0;
+ }
+ }
+ // return this.fileSize < other.fileSize ? 1 : this.fileSize >
+ // other.fileSize ? -1 : 0;
+ }
+
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + ((filePath == null) ? 0 : filePath.hashCode());
+ result = prime * result + fileSize;
+ return result;
+ }
+
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ AllocateRequest other = (AllocateRequest) obj;
+ if (filePath == null) {
+ if (other.filePath != null)
+ return false;
+ } else if (!filePath.equals(other.filePath))
+ return false;
+ if (fileSize != other.fileSize)
+ return false;
+ return true;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/AppendMessageCallback.java
----------------------------------------------------------------------
diff --git a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/AppendMessageCallback.java b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/AppendMessageCallback.java
new file mode 100644
index 0000000..778bd88
--- /dev/null
+++ b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/AppendMessageCallback.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.store;
+
+import java.nio.ByteBuffer;
+
+
+/**
+ * Callback used to write a message into a byte buffer.
+ *
+ * @author shijia.wxr
+ *
+ */
+public interface AppendMessageCallback {
+
+ /**
+ * After message serialization, write MapedByteBuffer
+ *
+ * @param fileFromOffset presumably the start offset of the file being written - TODO confirm
+ * against the implementations
+ * @param byteBuffer buffer the message is written into
+ * @param maxBlank presumably the free space left in the buffer, in bytes - TODO confirm
+ * @param msg message to append
+ *
+ * @return the result of the append
+ */
+ AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer,
+ final int maxBlank, final MessageExtBrokerInner msg);
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/AppendMessageResult.java
----------------------------------------------------------------------
diff --git a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/AppendMessageResult.java b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/AppendMessageResult.java
new file mode 100644
index 0000000..a87a917
--- /dev/null
+++ b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/AppendMessageResult.java
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.store;
+
+/**
+ * Outcome of writing one message to the commit log: a status code plus the values recorded
+ * for the write.
+ *
+ * @author shijia.wxr
+ */
+public class AppendMessageResult {
+    // Return code
+    private AppendMessageStatus status;
+    // Where to start writing
+    private long wroteOffset;
+    // Write Bytes
+    private int wroteBytes;
+    // Message ID
+    private String msgId;
+    // Message storage timestamp
+    private long storeTimestamp;
+    // Consume queue's offset(step by one)
+    private long logicsOffset;
+    private long pagecacheRT = 0;
+
+    /**
+     * Creates a result carrying only a status; numeric fields are 0 and the message id is empty.
+     */
+    public AppendMessageResult(AppendMessageStatus status) {
+        this(status, 0, 0, "", 0, 0, 0);
+    }
+
+    public AppendMessageResult(AppendMessageStatus status, long wroteOffset, int wroteBytes, String msgId,
+        long storeTimestamp, long logicsOffset, long pagecacheRT) {
+        this.status = status;
+        this.wroteOffset = wroteOffset;
+        this.wroteBytes = wroteBytes;
+        this.msgId = msgId;
+        this.storeTimestamp = storeTimestamp;
+        this.logicsOffset = logicsOffset;
+        this.pagecacheRT = pagecacheRT;
+    }
+
+    /**
+     * @return {@code true} only when the status is {@link AppendMessageStatus#PUT_OK}
+     */
+    public boolean isOk() {
+        return AppendMessageStatus.PUT_OK == this.status;
+    }
+
+    public AppendMessageStatus getStatus() {
+        return this.status;
+    }
+
+    public void setStatus(AppendMessageStatus status) {
+        this.status = status;
+    }
+
+    public long getWroteOffset() {
+        return this.wroteOffset;
+    }
+
+    public void setWroteOffset(long wroteOffset) {
+        this.wroteOffset = wroteOffset;
+    }
+
+    public int getWroteBytes() {
+        return this.wroteBytes;
+    }
+
+    public void setWroteBytes(int wroteBytes) {
+        this.wroteBytes = wroteBytes;
+    }
+
+    public String getMsgId() {
+        return this.msgId;
+    }
+
+    public void setMsgId(String msgId) {
+        this.msgId = msgId;
+    }
+
+    public long getStoreTimestamp() {
+        return this.storeTimestamp;
+    }
+
+    public void setStoreTimestamp(long storeTimestamp) {
+        this.storeTimestamp = storeTimestamp;
+    }
+
+    public long getLogicsOffset() {
+        return this.logicsOffset;
+    }
+
+    public void setLogicsOffset(long logicsOffset) {
+        this.logicsOffset = logicsOffset;
+    }
+
+    public long getPagecacheRT() {
+        return this.pagecacheRT;
+    }
+
+    public void setPagecacheRT(final long pagecacheRT) {
+        this.pagecacheRT = pagecacheRT;
+    }
+
+    @Override
+    public String toString() {
+        final StringBuilder text = new StringBuilder("AppendMessageResult{");
+        text.append("status=").append(this.status);
+        text.append(", wroteOffset=").append(this.wroteOffset);
+        text.append(", wroteBytes=").append(this.wroteBytes);
+        text.append(", msgId='").append(this.msgId).append('\'');
+        text.append(", storeTimestamp=").append(this.storeTimestamp);
+        text.append(", logicsOffset=").append(this.logicsOffset);
+        text.append(", pagecacheRT=").append(this.pagecacheRT);
+        text.append('}');
+        return text.toString();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/AppendMessageStatus.java
----------------------------------------------------------------------
diff --git a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/AppendMessageStatus.java b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/AppendMessageStatus.java
new file mode 100644
index 0000000..97234c0
--- /dev/null
+++ b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/AppendMessageStatus.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.store;
+
+/**
+ * Status codes returned when a message is written to the commit log.
+ *
+ * @author shijia.wxr
+ *
+ */
+public enum AppendMessageStatus {
+ // The only status AppendMessageResult#isOk() treats as success.
+ PUT_OK,
+ END_OF_FILE,
+ MESSAGE_SIZE_EXCEEDED,
+ PROPERTIES_SIZE_EXCEEDED,
+ UNKNOWN_ERROR,
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/CommitLog.java
----------------------------------------------------------------------
diff --git a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/CommitLog.java b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/CommitLog.java
new file mode 100644
index 0000000..23f305d
--- /dev/null
+++ b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/CommitLog.java
@@ -0,0 +1,1296 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.store;
+
+import com.alibaba.rocketmq.common.ServiceThread;
+import com.alibaba.rocketmq.common.UtilAll;
+import com.alibaba.rocketmq.common.constant.LoggerName;
+import com.alibaba.rocketmq.common.message.MessageAccessor;
+import com.alibaba.rocketmq.common.message.MessageConst;
+import com.alibaba.rocketmq.common.message.MessageDecoder;
+import com.alibaba.rocketmq.common.message.MessageExt;
+import com.alibaba.rocketmq.common.sysflag.MessageSysFlag;
+import com.alibaba.rocketmq.store.config.BrokerRole;
+import com.alibaba.rocketmq.store.config.FlushDiskType;
+import com.alibaba.rocketmq.store.ha.HAService;
+import com.alibaba.rocketmq.store.schedule.ScheduleMessageService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantLock;
+
+
+/**
+ * Store all metadata downtime for recovery, data protection reliability
+ *
+ * @author shijia.wxr
+ */
+public class CommitLog {
+ // Message's MAGIC CODE daa320a7
+ public final static int MESSAGE_MAGIC_CODE = 0xAABBCCDD ^ 1880681586 + 8;
+ private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
+ // End of file empty MAGIC CODE cbd43194
+ private final static int BLANK_MAGIC_CODE = 0xBBCCDDEE ^ 1880681586 + 8;
+ private final MappedFileQueue mappedFileQueue;
+ private final DefaultMessageStore defaultMessageStore;
+ private final FlushCommitLogService flushCommitLogService;
+
+ //If TransientStorePool enabled, we must flush message to FileChannel at fixed periods
+ private final FlushCommitLogService commitLogService;
+
+ private final AppendMessageCallback appendMessageCallback;
+ private HashMap<String/* topic-queueid */, Long/* offset */> topicQueueTable = new HashMap<String, Long>(1024);
+ private volatile long confirmOffset = -1L;
+
+ private volatile long beginTimeInLock = 0;
+
+ //true: Can lock, false : in lock.
+ private AtomicBoolean putMessageSpinLock = new AtomicBoolean(true);
+
+ private ReentrantLock putMessageNormalLock = new ReentrantLock(); // NonfairSync
+
+ public CommitLog(final DefaultMessageStore defaultMessageStore) {
+ this.mappedFileQueue = new MappedFileQueue(defaultMessageStore.getMessageStoreConfig().getStorePathCommitLog(),
+ defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog(), defaultMessageStore.getAllocateMappedFileService());
+ this.defaultMessageStore = defaultMessageStore;
+
+ if (FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
+ this.flushCommitLogService = new GroupCommitService();
+ } else {
+ this.flushCommitLogService = new FlushRealTimeService();
+ }
+
+ this.commitLogService = new CommitRealTimeService();
+
+ this.appendMessageCallback = new DefaultAppendMessageCallback(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize());
+ }
+
+ /**
+  * Loads the mapped file queue and logs the outcome.
+  *
+  * @return the value returned by the queue's {@code load()}
+  */
+ public boolean load() {
+     final boolean loaded = this.mappedFileQueue.load();
+     final String outcome = loaded ? "OK" : "Failed";
+     log.info("load commit log " + outcome);
+     return loaded;
+ }
+
+ /**
+  * Starts the flush service and, when the transient store pool is enabled, the commit service.
+  */
+ public void start() {
+     this.flushCommitLogService.start();
+
+     final boolean poolEnabled = defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable();
+     if (!poolEnabled) {
+         return;
+     }
+     this.commitLogService.start();
+ }
+
+ /**
+  * Shuts down the commit service (only when the transient store pool is enabled) and then
+  * the flush service.
+  */
+ public void shutdown() {
+     final boolean poolEnabled = defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable();
+     if (poolEnabled) {
+         this.commitLogService.shutdown();
+     }
+
+     this.flushCommitLogService.shutdown();
+ }
+
+ /**
+  * Calls {@code commit(0)} and {@code flush(0)} on the mapped file queue.
+  *
+  * @return the queue's flushed position after both calls
+  */
+ public long flush() {
+     this.mappedFileQueue.commit(0);
+     this.mappedFileQueue.flush(0);
+     final long flushedWhere = this.mappedFileQueue.getFlushedWhere();
+     return flushedWhere;
+ }
+
+ /**
+  * @return the max offset reported by the mapped file queue
+  */
+ public long getMaxOffset() {
+     final long maxOffset = this.mappedFileQueue.getMaxOffset();
+     return maxOffset;
+ }
+
+ /**
+  * @return the amount of data still to commit, as reported by the mapped file queue
+  */
+ public long remainHowManyDataToCommit() {
+     final long pending = this.mappedFileQueue.remainHowManyDataToCommit();
+     return pending;
+ }
+
+ /**
+  * @return the amount of data still to flush, as reported by the mapped file queue
+  */
+ public long remainHowManyDataToFlush() {
+     final long pending = this.mappedFileQueue.remainHowManyDataToFlush();
+     return pending;
+ }
+
+
+ /**
+  * Delegates to the mapped file queue's {@code deleteExpiredFileByTime} with the same arguments.
+  *
+  * @return the value returned by the queue
+  */
+ public int deleteExpiredFile(
+     final long expiredTime,
+     final int deleteFilesInterval,
+     final long intervalForcibly,
+     final boolean cleanImmediately) {
+     final int deleted = this.mappedFileQueue.deleteExpiredFileByTime(
+         expiredTime, deleteFilesInterval, intervalForcibly, cleanImmediately);
+     return deleted;
+ }
+
+
+ /**
+  * Read CommitLog data, use data replication. Falls back to the first file on a miss only
+  * when {@code offset} is 0.
+  *
+  * @param offset commit log offset to read from
+  * @return the selected buffer, or {@code null} when no mapped file is found
+  */
+ public SelectMappedBufferResult getData(final long offset) {
+     final boolean returnFirstOnNotFound = (offset == 0);
+     return this.getData(offset, returnFirstOnNotFound);
+ }
+
+
+ /**
+  * Looks up the mapped file for {@code offset} and selects its buffer starting at
+  * {@code offset} modulo the configured commit log file size.
+  *
+  * @param offset                commit log offset to read from
+  * @param returnFirstOnNotFound passed through to the mapped file queue lookup
+  * @return the selected buffer, or {@code null} when no mapped file is found
+  */
+ public SelectMappedBufferResult getData(final long offset, final boolean returnFirstOnNotFound) {
+     final int fileSize = this.defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog();
+     final MappedFile target = this.mappedFileQueue.findMappedFileByOffset(offset, returnFirstOnNotFound);
+     if (target == null) {
+         return null;
+     }
+
+     final int positionInFile = (int) (offset % fileSize);
+     return target.selectMappedBuffer(positionInFile);
+ }
+
+
+ /**
+ * Recovery after a normal exit, when all in-memory data has already been flushed.
+ *
+ * Scans messages starting at the third-from-last mapped file (or the first file when there
+ * are fewer than three), then sets the queue's flushed and committed positions to the end of
+ * the last valid message and truncates dirty files beyond that offset. Does nothing when
+ * there are no mapped files.
+ */
+ public void recoverNormally() {
+ boolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover();
+ final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
+ if (!mappedFiles.isEmpty()) {
+ // Began to recover from the last third file
+ int index = mappedFiles.size() - 3;
+ if (index < 0)
+ index = 0;
+
+ MappedFile mappedFile = mappedFiles.get(index);
+ ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
+ // Start offset of the file currently being scanned.
+ long processOffset = mappedFile.getFileFromOffset();
+ // Bytes of valid messages consumed so far within the current file.
+ long mappedFileOffset = 0;
+ while (true) {
+ DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover);
+ int size = dispatchRequest.getMsgSize();
+ // Normal data
+ if (dispatchRequest.isSuccess() && size > 0) {
+ mappedFileOffset += size;
+ }
+ // Come the end of the file, switch to the next file Since the
+ // return 0 representatives met last hole,
+ // this can not be included in truncate offset
+ else if (dispatchRequest.isSuccess() && size == 0) {
+ index++;
+ if (index >= mappedFiles.size()) {
+ // Current branch can not happen
+ log.info("recover last 3 physics file over, last maped file " + mappedFile.getFileName());
+ break;
+ } else {
+ mappedFile = mappedFiles.get(index);
+ byteBuffer = mappedFile.sliceByteBuffer();
+ processOffset = mappedFile.getFileFromOffset();
+ mappedFileOffset = 0;
+ log.info("recover next physics file, " + mappedFile.getFileName());
+ }
+ }
+ // Intermediate file read error
+ else if (!dispatchRequest.isSuccess()) {
+ log.info("recover physics file end, " + mappedFile.getFileName());
+ break;
+ }
+ }
+
+ // Absolute offset just past the last valid message.
+ processOffset += mappedFileOffset;
+ this.mappedFileQueue.setFlushedWhere(processOffset);
+ this.mappedFileQueue.setCommittedWhere(processOffset);
+ this.mappedFileQueue.truncateDirtyFiles(processOffset);
+ }
+ }
+
+ /**
+  * Checks the next message in {@code byteBuffer}, always reading the message body.
+  *
+  * @param byteBuffer buffer positioned at the start of a message
+  * @param checkCRC   whether the body CRC is verified
+  * @return the result of the three-argument overload called with {@code readBody = true}
+  */
+ public DispatchRequest checkMessageAndReturnSize(java.nio.ByteBuffer byteBuffer, final boolean checkCRC) {
+     final boolean readBody = true;
+     return this.checkMessageAndReturnSize(byteBuffer, checkCRC, readBody);
+ }
+
+ /**
+  * Touches {@code obj} so that otherwise unused values count as used; at debug level the
+  * object's hash code is logged.
+  */
+ private void doNothingForDeadCode(final Object obj) {
+     if (obj == null) {
+         return;
+     }
+     if (!log.isDebugEnabled()) {
+         return;
+     }
+     log.debug(String.valueOf(obj.hashCode()));
+ }
+
+ /**
+ * check the message and returns the message size
+ *
+ * @return 0 Come the end of the file // >0 Normal messages // -1 Message
+ * checksum failure
+ */
+ public DispatchRequest checkMessageAndReturnSize(java.nio.ByteBuffer byteBuffer, final boolean checkCRC, final boolean readBody) {
+ try {
+ // 1 TOTAL SIZE
+ int totalSize = byteBuffer.getInt();
+
+ // 2 MAGIC CODE
+ int magicCode = byteBuffer.getInt();
+ switch (magicCode) {
+ case MESSAGE_MAGIC_CODE:
+ break;
+ case BLANK_MAGIC_CODE:
+ return new DispatchRequest(0, true /* success */);
+ default:
+ log.warn("found a illegal magic code 0x" + Integer.toHexString(magicCode));
+ return new DispatchRequest(-1, false /* success */);
+ }
+
+ byte[] bytesContent = new byte[totalSize];
+
+ // 3 BODYCRC
+ int bodyCRC = byteBuffer.getInt();
+
+ // 4 QUEUEID
+ int queueId = byteBuffer.getInt();
+
+ // 5 FLAG
+ int flag = byteBuffer.getInt();
+
+ // 6 QUEUEOFFSET
+ long queueOffset = byteBuffer.getLong();
+
+ // 7 PHYSICALOFFSET
+ long physicOffset = byteBuffer.getLong();
+
+ // 8 SYSFLAG
+ int sysFlag = byteBuffer.getInt();
+
+ // 9 BORNTIMESTAMP
+ long bornTimeStamp = byteBuffer.getLong();
+
+ // 10
+ ByteBuffer byteBuffer1 = byteBuffer.get(bytesContent, 0, 8);
+
+ // 11 STORETIMESTAMP
+ long storeTimestamp = byteBuffer.getLong();
+
+ // 12
+ ByteBuffer byteBuffer2 = byteBuffer.get(bytesContent, 0, 8);
+
+ // 13 RECONSUMETIMES
+ int reconsumeTimes = byteBuffer.getInt();
+
+ // 14 Prepared Transaction Offset
+ long preparedTransactionOffset = byteBuffer.getLong();
+
+ // 15 BODY
+ int bodyLen = byteBuffer.getInt();
+ if (bodyLen > 0) {
+ if (readBody) {
+ byteBuffer.get(bytesContent, 0, bodyLen);
+
+ if (checkCRC) {
+ int crc = UtilAll.crc32(bytesContent, 0, bodyLen);
+ if (crc != bodyCRC) {
+ log.warn("CRC check failed. bodyCRC={}, currentCRC={}", crc, bodyCRC);
+ return new DispatchRequest(-1, false/* success */);
+ }
+ }
+ } else {
+ byteBuffer.position(byteBuffer.position() + bodyLen);
+ }
+ }
+
+ // 16 TOPIC
+ byte topicLen = byteBuffer.get();
+ byteBuffer.get(bytesContent, 0, topicLen);
+ String topic = new String(bytesContent, 0, topicLen, MessageDecoder.CHARSET_UTF8);
+
+ long tagsCode = 0;
+ String keys = "";
+ String uniqKey = null;
+
+ // 17 properties
+ short propertiesLength = byteBuffer.getShort();
+ if (propertiesLength > 0) {
+ byteBuffer.get(bytesContent, 0, propertiesLength);
+ String properties = new String(bytesContent, 0, propertiesLength, MessageDecoder.CHARSET_UTF8);
+ Map<String, String> propertiesMap = MessageDecoder.string2messageProperties(properties);
+
+ keys = propertiesMap.get(MessageConst.PROPERTY_KEYS);
+
+ uniqKey = propertiesMap.get(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
+
+ String tags = propertiesMap.get(MessageConst.PROPERTY_TAGS);
+ if (tags != null && tags.length() > 0) {
+ tagsCode = MessageExtBrokerInner.tagsString2tagsCode(MessageExt.parseTopicFilterType(sysFlag), tags);
+ }
+
+ // Timing message processing
+ {
+ String t = propertiesMap.get(MessageConst.PROPERTY_DELAY_TIME_LEVEL);
+ if (ScheduleMessageService.SCHEDULE_TOPIC.equals(topic) && t != null) {
+ int delayLevel = Integer.parseInt(t);
+
+ if (delayLevel > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
+ delayLevel = this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel();
+ }
+
+ if (delayLevel > 0) {
+ tagsCode = this.defaultMessageStore.getScheduleMessageService().computeDeliverTimestamp(delayLevel,
+ storeTimestamp);
+ }
+ }
+ }
+ }
+
+ int readLength = calMsgLength(bodyLen, topicLen, propertiesLength);
+ if (totalSize != readLength) {
+ doNothingForDeadCode(reconsumeTimes);
+ doNothingForDeadCode(flag);
+ doNothingForDeadCode(bornTimeStamp);
+ doNothingForDeadCode(byteBuffer1);
+ doNothingForDeadCode(byteBuffer2);
+ log.error(
+ "[BUG]read total count not equals msg total size. totalSize={}, readTotalCount={}, bodyLen={}, topicLen={}, propertiesLength={}",
+ totalSize, readLength, bodyLen, topicLen, propertiesLength);
+ return new DispatchRequest(totalSize, false/* success */);
+ }
+
+ return new DispatchRequest(//
+ topic, // 1
+ queueId, // 2
+ physicOffset, // 3
+ totalSize, // 4
+ tagsCode, // 5
+ storeTimestamp, // 6
+ queueOffset, // 7
+ keys, // 8
+ uniqKey, //9
+ sysFlag, // 9
+ preparedTransactionOffset// 10
+ );
+ } catch (Exception e) {
+ }
+
+ return new DispatchRequest(-1, false /* success */);
+ }
+
+ /**
+  * Computes the number of bytes one serialized commit log record occupies.
+  * Body and properties lengths that are zero or negative contribute no
+  * payload bytes; the topic length is added exactly as given.
+  *
+  * @param bodyLength       length of the message body in bytes
+  * @param topicLength      length of the encoded topic in bytes
+  * @param propertiesLength length of the encoded properties in bytes
+  * @return total record length in bytes
+  */
+ private int calMsgLength(int bodyLength, int topicLength, int propertiesLength) {
+     // Fixed-width header fields, in storage order.
+     final int fixedHeader = 4 //  1 TOTALSIZE
+         + 4 //  2 MAGICCODE
+         + 4 //  3 BODYCRC
+         + 4 //  4 QUEUEID
+         + 4 //  5 FLAG
+         + 8 //  6 QUEUEOFFSET
+         + 8 //  7 PHYSICALOFFSET
+         + 4 //  8 SYSFLAG
+         + 8 //  9 BORNTIMESTAMP
+         + 8 // 10 BORNHOST
+         + 8 // 11 STORETIMESTAMP
+         + 8 // 12 STOREHOSTADDRESS
+         + 4 // 13 RECONSUMETIMES
+         + 8; // 14 Prepared Transaction Offset
+
+     // Variable-width sections: a length prefix followed by the payload.
+     final int bodySection = 4 + Math.max(bodyLength, 0); // 15 BODY
+     final int topicSection = 1 + topicLength; // 16 TOPIC
+     final int propertiesSection = 2 + Math.max(propertiesLength, 0); // 17 PROPERTIES
+
+     return fixedHeader + bodySection + topicSection + propertiesSection;
+ }
+
+ /**
+  * @return the confirm offset currently recorded by this commit log
+  */
+ public long getConfirmOffset() {
+     return confirmOffset;
+ }
+
+ /**
+  * Records a new confirm offset.
+  *
+  * @param phyOffset the offset to store as the confirm offset
+  */
+ public void setConfirmOffset(long phyOffset) {
+     confirmOffset = phyOffset;
+ }
+
+ /**
+  * Recovers the commit log after an abnormal shutdown.
+  *
+  * Walks the mapped files from newest to oldest until one is accepted by
+  * {@code isMappedFileMatchedRecover}, falling back to the first file when
+  * none matches. From that file onward every record is validated with
+  * {@code checkMessageAndReturnSize} and re-dispatched; scanning stops at the
+  * first invalid record or after the last file. The resulting offset becomes
+  * the flushed and committed position, and everything beyond it is truncated
+  * from both the commit log and the consume queues. When there are no mapped
+  * files, the positions are reset to 0 and the logic queues are destroyed.
+  */
+ public void recoverAbnormally() {
+ // recover by the minimum time stamp
+ boolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover();
+ final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
+ if (!mappedFiles.isEmpty()) {
+ // Search backwards for the file from which recovery should start
+ int index = mappedFiles.size() - 1;
+ MappedFile mappedFile = null;
+ for (; index >= 0; index--) {
+ mappedFile = mappedFiles.get(index);
+ if (this.isMappedFileMatchedRecover(mappedFile)) {
+ log.info("recover from this maped file " + mappedFile.getFileName());
+ break;
+ }
+ }
+
+ // No file matched: start from the very first file
+ if (index < 0) {
+ index = 0;
+ mappedFile = mappedFiles.get(index);
+ }
+
+ ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
+ // processOffset: start offset of the file being scanned;
+ // mappedFileOffset: bytes of valid records read so far within that file
+ long processOffset = mappedFile.getFileFromOffset();
+ long mappedFileOffset = 0;
+ while (true) {
+ DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover);
+ int size = dispatchRequest.getMsgSize();
+
+ // Normal data
+ if (size > 0) {
+ mappedFileOffset += size;
+
+
+ // With duplication enabled, only records below the store's confirm offset are dispatched
+ if (this.defaultMessageStore.getMessageStoreConfig().isDuplicationEnable()) {
+ if (dispatchRequest.getCommitLogOffset() < this.defaultMessageStore.getConfirmOffset()) {
+ this.defaultMessageStore.doDispatch(dispatchRequest);
+ }
+ } else {
+ this.defaultMessageStore.doDispatch(dispatchRequest);
+ }
+ }
+ // Intermediate file read error
+ else if (size == -1) {
+ log.info("recover physics file end, " + mappedFile.getFileName());
+ break;
+ }
+ // Reached the end of the file, switch to the next file.
+ // A size of 0 means the trailing blank of the file was met, so it
+ // must not be counted in the truncate offset
+ else if (size == 0) {
+ index++;
+ if (index >= mappedFiles.size()) {
+ // The current branch under normal circumstances should
+ // not happen
+ log.info("recover physics file over, last maped file " + mappedFile.getFileName());
+ break;
+ } else {
+ mappedFile = mappedFiles.get(index);
+ byteBuffer = mappedFile.sliceByteBuffer();
+ processOffset = mappedFile.getFileFromOffset();
+ mappedFileOffset = 0;
+ log.info("recover next physics file, " + mappedFile.getFileName());
+ }
+ }
+ }
+
+ // Absolute offset just past the last valid record
+ processOffset += mappedFileOffset;
+ this.mappedFileQueue.setFlushedWhere(processOffset);
+ this.mappedFileQueue.setCommittedWhere(processOffset);
+ this.mappedFileQueue.truncateDirtyFiles(processOffset);
+
+ // Clear ConsumeQueue redundant data
+ this.defaultMessageStore.truncateDirtyLogicFiles(processOffset);
+ }
+ // No commit log files exist (e.g. they were deleted)
+ else {
+ this.mappedFileQueue.setFlushedWhere(0);
+ this.mappedFileQueue.setCommittedWhere(0);
+ this.defaultMessageStore.destroyLogics();
+ }
+ }
+
+ /**
+  * Decides whether recovery may start from the given mapped file.
+  *
+  * The file qualifies when its first record carries the message magic code,
+  * has a non-zero store timestamp, and that timestamp is not later than the
+  * checkpoint timestamp (the index-aware minimum when message indexing is
+  * both enabled and marked safe, the plain minimum otherwise).
+  *
+  * @param mappedFile the candidate file
+  * @return {@code true} if recovery can begin at this file
+  */
+ private boolean isMappedFileMatchedRecover(final MappedFile mappedFile) {
+     final ByteBuffer header = mappedFile.sliceByteBuffer();
+
+     if (header.getInt(MessageDecoder.MESSAGE_MAGIC_CODE_POSTION) != MESSAGE_MAGIC_CODE) {
+         return false;
+     }
+
+     final long storeTimestamp = header.getLong(MessageDecoder.MESSAGE_STORE_TIMESTAMP_POSTION);
+     if (storeTimestamp == 0) {
+         return false;
+     }
+
+     final boolean useIndexCheckpoint = this.defaultMessageStore.getMessageStoreConfig().isMessageIndexEnable()
+         && this.defaultMessageStore.getMessageStoreConfig().isMessageIndexSafe();
+     final long checkpointTimestamp = useIndexCheckpoint
+         ? this.defaultMessageStore.getStoreCheckpoint().getMinTimestampIndex()
+         : this.defaultMessageStore.getStoreCheckpoint().getMinTimestamp();
+
+     if (storeTimestamp > checkpointTimestamp) {
+         return false;
+     }
+
+     log.info("find check timestamp, {} {}", storeTimestamp, UtilAll.timeMillisToHumanString(storeTimestamp));
+     return true;
+ }
+
+ // Empty placeholder: the body does nothing, and no caller is visible in
+ // this part of the file. NOTE(review): confirm whether it is still needed.
+ private void notifyMessageArriving() {
+
+ }
+
+ /**
+  * Delegates an offset reset to the underlying mapped file queue.
+  *
+  * @param offset the offset to reset to
+  * @return the result reported by the mapped file queue
+  */
+ public boolean resetOffset(long offset) {
+     final boolean reset = mappedFileQueue.resetOffset(offset);
+     return reset;
+ }
+
+ /**
+  * @return the timestamp recorded when the current put-message lock was
+  *         taken, or 0 when {@code putMessage} has reset it
+  */
+ public long getBeginTimeInLock() {
+     return this.beginTimeInLock;
+ }
+
+ /**
+  * Appends a message to the commit log and then, depending on configuration,
+  * waits for the disk flush and for replication to the slave.
+  *
+  * Steps:
+  * <ol>
+  * <li>Stamp the message (store time, body CRC); for non-transactional or
+  * committed messages with a delay level, redirect it to the schedule topic
+  * and back up the real topic/queue id in its properties.</li>
+  * <li>Under the put-message lock, append to the last mapped file, rolling to
+  * a new file once when the current one reports {@code END_OF_FILE}.</li>
+  * <li>Update statistics, then trigger or wait for flush and HA transfer.</li>
+  * </ol>
+  *
+  * @param msg the message to store; its store timestamp, body CRC and, for
+  *            delayed messages, topic/queue id/properties are modified
+  * @return the outcome; the status is {@code PUT_OK} unless file creation,
+  *         appending, flushing or slave synchronization failed
+  */
+ public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
+     // Provisional storage time; it is overwritten under the lock below
+     msg.setStoreTimestamp(System.currentTimeMillis());
+     // Set the message body CRC (consider the most appropriate setting
+     // on the client)
+     msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
+     AppendMessageResult result = null;
+
+     StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
+
+     String topic = msg.getTopic();
+     int queueId = msg.getQueueId();
+
+     final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
+     if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE//
+         || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
+         // Delay Delivery
+         if (msg.getDelayTimeLevel() > 0) {
+             if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
+                 msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
+             }
+
+             topic = ScheduleMessageService.SCHEDULE_TOPIC;
+             queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
+
+             // Backup real topic, queueId
+             MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
+             MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
+             msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
+
+             msg.setTopic(topic);
+             msg.setQueueId(queueId);
+         }
+     }
+
+     long eclipseTimeInLock = 0;
+     MappedFile unlockMappedFile = null;
+     MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
+
+     lockForPutMessage(); //spin...
+     try {
+         long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
+         this.beginTimeInLock = beginLockTimestamp;
+
+         // The store timestamp is assigned while holding the lock so that
+         // timestamps are globally ordered
+         msg.setStoreTimestamp(beginLockTimestamp);
+
+         if (null == mappedFile || mappedFile.isFull()) {
+             mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
+         }
+         if (null == mappedFile) {
+             log.error("create maped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
+             return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null);
+         }
+
+         result = mappedFile.appendMessage(msg, this.appendMessageCallback);
+         switch (result.getStatus()) {
+             case PUT_OK:
+                 break;
+             case END_OF_FILE:
+                 unlockMappedFile = mappedFile;
+                 // Create a new file, re-write the message
+                 mappedFile = this.mappedFileQueue.getLastMappedFile(0);
+                 if (null == mappedFile) {
+                     // XXX: warn and notify me
+                     log.error("create maped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
+                     return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result);
+                 }
+                 result = mappedFile.appendMessage(msg, this.appendMessageCallback);
+                 break;
+             case MESSAGE_SIZE_EXCEEDED:
+             case PROPERTIES_SIZE_EXCEEDED:
+                 return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result);
+             case UNKNOWN_ERROR:
+             default:
+                 return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
+         }
+
+         eclipseTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
+     } finally {
+         // Reset on every exit path (including exceptions) so lockTimeMills()
+         // never reports a lock that is no longer held
+         beginTimeInLock = 0;
+         releasePutMessageLock();
+     }
+
+     if (eclipseTimeInLock > 500) {
+         log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", eclipseTimeInLock, msg.getBody().length, result);
+     }
+
+     if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
+         this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
+     }
+
+     PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);
+
+     // Statistics
+     storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet();
+     storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes());
+
+     // Offset just past the bytes written by this call
+     final long nextOffset = result.getWroteOffset() + result.getWroteBytes();
+
+     // Synchronization flush
+     if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
+         final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
+         if (msg.isWaitStoreMsgOK()) {
+             GroupCommitRequest flushRequest = new GroupCommitRequest(nextOffset);
+             service.putRequest(flushRequest);
+             boolean flushOK = flushRequest.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
+             if (!flushOK) {
+                 log.error("do groupcommit, wait for flush failed, topic: " + msg.getTopic() + " tags: " + msg.getTags()
+                     + " client address: " + msg.getBornHostString());
+                 putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
+             }
+         } else {
+             service.wakeup();
+         }
+     }
+     // Asynchronous flush
+     else {
+         if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
+             flushCommitLogService.wakeup();
+         } else {
+             commitLogService.wakeup();
+         }
+     }
+
+     // Synchronous double write to the slave
+     if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) {
+         HAService service = this.defaultMessageStore.getHaService();
+         if (msg.isWaitStoreMsgOK()) {
+             // Determine whether to wait
+             if (service.isSlaveOK(nextOffset)) {
+                 // Always use a dedicated request here. A GroupCommitRequest is
+                 // one-shot (single-count latch plus one result flag); reusing
+                 // the request from the sync-flush wait above would report the
+                 // disk flush outcome without waiting for the slave.
+                 GroupCommitRequest haRequest = new GroupCommitRequest(nextOffset);
+                 service.putRequest(haRequest);
+
+                 service.getWaitNotifyObject().wakeupAll();
+
+                 // TODO: the sync flush timeout is reused for the slave wait
+                 boolean flushOK =
+                     haRequest.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
+                 if (!flushOK) {
+                     log.error("do sync transfer other node, wait return, but failed, topic: " + msg.getTopic() + " tags: "
+                         + msg.getTags() + " client address: " + msg.getBornHostString());
+                     putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
+                 }
+             }
+             // Slave problem
+             else {
+                 // Tell the producer, slave not available
+                 putMessageResult.setPutMessageStatus(PutMessageStatus.SLAVE_NOT_AVAILABLE);
+             }
+         }
+     }
+
+     return putMessageResult;
+ }
+
+ /**
+  * Reads the store timestamp of the message located at the given commit log
+  * offset.
+  *
+  * @param offset commit log offset of the message
+  * @param size   number of bytes to select starting at {@code offset}
+  * @return the stored timestamp, or -1 when the offset is below the minimum
+  *         offset or the message cannot be selected
+  */
+ public long pickupStoreTimestamp(final long offset, final int size) {
+     if (offset < this.getMinOffset()) {
+         return -1;
+     }
+
+     final SelectMappedBufferResult selected = this.getMessage(offset, size);
+     if (selected == null) {
+         return -1;
+     }
+
+     try {
+         return selected.getByteBuffer().getLong(MessageDecoder.MESSAGE_STORE_TIMESTAMP_POSTION);
+     } finally {
+         // Always give the selected buffer back, even if the read throws
+         selected.release();
+     }
+ }
+
+ /**
+  * Returns the smallest offset served by this commit log.
+  *
+  * @return the start offset of the first mapped file when it is available,
+  *         the start offset of the following file when it is not, or -1
+  *         when there is no mapped file at all
+  */
+ public long getMinOffset() {
+     final MappedFile first = this.mappedFileQueue.getFirstMappedFile();
+     if (first == null) {
+         return -1;
+     }
+
+     return first.isAvailable()
+         ? first.getFileFromOffset()
+         : this.rollNextFile(first.getFileFromOffset());
+ }
+
+ /**
+  * Selects {@code size} bytes of the commit log starting at {@code offset}.
+  *
+  * @param offset absolute commit log offset
+  * @param size   number of bytes to select
+  * @return the selected buffer, or {@code null} when no mapped file covers
+  *         the offset
+  */
+ public SelectMappedBufferResult getMessage(final long offset, final int size) {
+     final int fileSize = this.defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog();
+     // For offset 0 the lookup is told to fall back to the first file
+     final boolean returnFirstOnNotFound = offset == 0;
+     final MappedFile owner = this.mappedFileQueue.findMappedFileByOffset(offset, returnFirstOnNotFound);
+     if (owner == null) {
+         return null;
+     }
+
+     // Position of the offset relative to the start of its file
+     final int positionInFile = (int) (offset % fileSize);
+     return owner.selectMappedBuffer(positionInFile, size);
+ }
+
+ /**
+  * Computes the start offset of the file that follows the one containing
+  * {@code offset}, based on the configured commit log file size.
+  *
+  * @param offset an offset inside the current file
+  * @return the first offset of the next file
+  */
+ public long rollNextFile(final long offset) {
+     final int fileSize = this.defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog();
+     final long positionInFile = offset % fileSize;
+     // Step back to the start of the current file, then forward one file
+     return offset - positionInFile + fileSize;
+ }
+
+ /**
+  * @return the live table mapping a "topic-queueId" key to its next consume
+  *         queue offset (not a copy)
+  */
+ public HashMap<String, Long> getTopicQueueTable() {
+     return this.topicQueueTable;
+ }
+
+
+ /**
+  * Replaces the topic-queue offset table with the given instance.
+  *
+  * @param topicQueueTable the table to use from now on; stored by reference
+  */
+ public void setTopicQueueTable(HashMap<String, Long> topicQueueTable) {
+     final HashMap<String, Long> replacement = topicQueueTable;
+     this.topicQueueTable = replacement;
+ }
+
+
+ /**
+  * Destroys the commit log by delegating to the mapped file queue.
+  */
+ public void destroy() {
+     mappedFileQueue.destroy();
+ }
+
+
+ /**
+  * Appends raw bytes to the last mapped file while holding the put-message
+  * lock.
+  *
+  * @param startOffset offset passed to the mapped file queue when it looks up
+  *                    the last mapped file
+  * @param data        the bytes to append
+  * @return {@code false} when no mapped file could be obtained, otherwise
+  *         the result of the append
+  */
+ public boolean appendData(long startOffset, byte[] data) {
+     lockForPutMessage(); //spin...
+     try {
+         final MappedFile target = this.mappedFileQueue.getLastMappedFile(startOffset);
+         if (target != null) {
+             return target.appendMessage(data);
+         }
+
+         log.error("appendData getLastMappedFile error " + startOffset);
+         return false;
+     } finally {
+         releasePutMessageLock();
+     }
+ }
+
+
+ /**
+  * Asks the mapped file queue to retry deleting its first file.
+  *
+  * @param intervalForcibly passed through unchanged to the mapped file queue
+  * @return the result reported by the mapped file queue
+  */
+ public boolean retryDeleteFirstFile(final long intervalForcibly) {
+     final boolean deleted = mappedFileQueue.retryDeleteFirstFile(intervalForcibly);
+     return deleted;
+ }
+
+ /**
+  * Removes the offset entry of one queue from the topic-queue table.
+  *
+  * @param topic   topic of the queue
+  * @param queueId id of the queue within the topic
+  */
+ public void removeQueueFromTopicQueueTable(final String topic, final int queueId) {
+     // Same "topic-queueId" key format that is used when messages are appended
+     final String tableKey = new StringBuilder().append(topic).append('-').append(queueId).toString();
+     synchronized (this) {
+         this.topicQueueTable.remove(tableKey);
+     }
+
+     log.info("removeQueueFromTopicQueueTable OK Topic: {} QueueId: {}", topic, queueId);
+ }
+
+ /**
+  * Runs the self check of the underlying mapped file queue.
+  */
+ public void checkSelf() {
+     this.mappedFileQueue.checkSelf();
+ }
+
+ // Common base of the commit log's background commit/flush service threads.
+ abstract class FlushCommitLogService extends ServiceThread {
+ // Maximum number of commit/flush attempts the real-time services in this
+ // file make in their shutdown retry loops.
+ protected static final int RETRY_TIMES_OVER = 10;
+ }
+
+ /**
+  * Background service that periodically asks the mapped file queue to commit
+  * and wakes the flush service whenever a commit call returns {@code false}.
+  */
+ class CommitRealTimeService extends FlushCommitLogService {
+
+     // Time of the last thorough commit, or of the last commit that returned false
+     private long lastCommitTimestamp = 0;
+
+     @Override
+     public String getServiceName() {
+         return CommitRealTimeService.class.getSimpleName();
+     }
+
+     /**
+      * Commits in a loop until the service is stopped, then makes a bounded
+      * number of final commit attempts.
+      */
+     @Override
+     public void run() {
+         CommitLog.log.info(this.getServiceName() + " service started");
+         while (!this.isStopped()) {
+             final int waitMillis = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitIntervalCommitLog();
+             final int configuredLeastPages =
+                 CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogLeastPages();
+             final int thoroughIntervalMillis =
+                 CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogThoroughInterval();
+
+             // Once the thorough interval has elapsed, commit with a
+             // least-pages threshold of 0
+             final long startedAt = System.currentTimeMillis();
+             final boolean thoroughDue = startedAt >= (this.lastCommitTimestamp + thoroughIntervalMillis);
+             if (thoroughDue) {
+                 this.lastCommitTimestamp = startedAt;
+             }
+             final int leastPages = thoroughDue ? 0 : configuredLeastPages;
+
+             try {
+                 final boolean commitResult = CommitLog.this.mappedFileQueue.commit(leastPages);
+                 final long finishedAt = System.currentTimeMillis();
+                 if (!commitResult) {
+                     // result = false means some data committed.
+                     this.lastCommitTimestamp = finishedAt;
+                     // now wake up flush thread.
+                     flushCommitLogService.wakeup();
+                 }
+
+                 final long elapsed = finishedAt - startedAt;
+                 if (elapsed > 500) {
+                     log.info("Commit data to file costs {} ms", elapsed);
+                 }
+                 this.waitForRunning(waitMillis);
+             } catch (Throwable e) {
+                 CommitLog.log.error(this.getServiceName() + " service has exception. ", e);
+             }
+         }
+
+         this.commitRemainingOnShutdown();
+         CommitLog.log.info(this.getServiceName() + " service end");
+     }
+
+     // Retries commit(0) until it returns true or RETRY_TIMES_OVER attempts were made.
+     private void commitRemainingOnShutdown() {
+         int attempt = 0;
+         boolean done = false;
+         while (attempt < RETRY_TIMES_OVER && !done) {
+             done = CommitLog.this.mappedFileQueue.commit(0);
+             attempt++;
+             CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + attempt + " times " + (done ? "OK" : "Not OK"));
+         }
+     }
+ }
+
+ /**
+  * Background service that periodically flushes the mapped file queue and
+  * records the resulting store timestamp in the store checkpoint.
+  */
+ class FlushRealTimeService extends FlushCommitLogService {
+     // Time of the last thorough flush (a flush with a least-pages threshold of 0)
+     private long lastFlushTimestamp = 0;
+     // Number of thorough flushes so far; drives how often progress is printed
+     private long printTimes = 0;
+
+     /**
+      * Flushes in a loop until the service is stopped, then makes a bounded
+      * number of final flush attempts.
+      */
+     @Override
+     public void run() {
+         CommitLog.log.info(this.getServiceName() + " service started");
+
+         while (!this.isStopped()) {
+             boolean flushCommitLogTimed = CommitLog.this.defaultMessageStore.getMessageStoreConfig().isFlushCommitLogTimed();
+
+             int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushIntervalCommitLog();
+             int flushPhysicQueueLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogLeastPages();
+
+             int flushPhysicQueueThoroughInterval =
+                 CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogThoroughInterval();
+
+             boolean printFlushProgress = false;
+
+             // Once the thorough interval has elapsed, flush with a threshold
+             // of 0 and print progress on every tenth such flush
+             long currentTimeMillis = System.currentTimeMillis();
+             if (currentTimeMillis >= (this.lastFlushTimestamp + flushPhysicQueueThoroughInterval)) {
+                 this.lastFlushTimestamp = currentTimeMillis;
+                 flushPhysicQueueLeastPages = 0;
+                 printFlushProgress = (printTimes++ % 10) == 0;
+             }
+
+             try {
+                 // Timed mode sleeps the whole interval; otherwise the wait
+                 // goes through waitForRunning
+                 if (flushCommitLogTimed) {
+                     Thread.sleep(interval);
+                 } else {
+                     this.waitForRunning(interval);
+                 }
+
+                 if (printFlushProgress) {
+                     this.printFlushProgress();
+                 }
+
+                 long begin = System.currentTimeMillis();
+                 CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages);
+                 long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
+                 if (storeTimestamp > 0) {
+                     CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
+                 }
+                 long past = System.currentTimeMillis() - begin;
+                 if (past > 500) {
+                     log.info("Flush data to disk costs {} ms", past);
+                 }
+             } catch (Throwable e) {
+                 CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
+                 this.printFlushProgress();
+             }
+         }
+
+         // Normal shutdown, to ensure that all the flush before exit
+         boolean result = false;
+         for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {
+             result = CommitLog.this.mappedFileQueue.flush(0);
+             CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));
+         }
+
+         this.printFlushProgress();
+
+         CommitLog.log.info(this.getServiceName() + " service end");
+     }
+
+     @Override
+     public String getServiceName() {
+         return FlushRealTimeService.class.getSimpleName();
+     }
+
+     // Currently a no-op: the progress log statement is disabled.
+     private void printFlushProgress() {
+         // CommitLog.log.info("how much disk fall behind memory, "
+         // + CommitLog.this.mappedFileQueue.howMuchFallBehind());
+     }
+
+     // Returns 300000 (five minutes if the unit is milliseconds; the unit is
+     // defined by the base class, which is not visible here).
+     @Override
+     public long getJointime() {
+         return 1000 * 60 * 5;
+     }
+ }
+
+ /**
+  * One-shot handshake between a thread waiting for data up to
+  * {@code nextOffset} to be flushed and the service that performs the flush.
+  * The waiter blocks in {@link #waitForFlush(long)}; the service reports the
+  * outcome through {@link #wakeupCustomer(boolean)}.
+  */
+ public static class GroupCommitRequest {
+     private final long nextOffset;
+     private final CountDownLatch countDownLatch = new CountDownLatch(1);
+     private volatile boolean flushOK = false;
+
+     /**
+      * @param nextOffset the offset this request waits for
+      */
+     public GroupCommitRequest(long nextOffset) {
+         this.nextOffset = nextOffset;
+     }
+
+     /**
+      * @return the offset this request waits for
+      */
+     public long getNextOffset() {
+         return nextOffset;
+     }
+
+     /**
+      * Publishes the outcome and releases the waiting thread.
+      *
+      * @param flushOK whether the awaited operation succeeded
+      */
+     public void wakeupCustomer(final boolean flushOK) {
+         // Store the result before releasing the latch so the waiter sees it
+         this.flushOK = flushOK;
+         this.countDownLatch.countDown();
+     }
+
+     /**
+      * Blocks until the outcome is published or the timeout elapses.
+      *
+      * @param timeout maximum time to wait, in milliseconds
+      * @return the published outcome; {@code false} on timeout or when the
+      *         waiting thread is interrupted (its interrupt status is then
+      *         restored)
+      */
+     public boolean waitForFlush(long timeout) {
+         try {
+             this.countDownLatch.await(timeout, TimeUnit.MILLISECONDS);
+             return this.flushOK;
+         } catch (InterruptedException e) {
+             // Restore the interrupt status instead of dumping a stack trace,
+             // so callers further up can still observe the interruption
+             Thread.currentThread().interrupt();
+             return false;
+         }
+     }
+ }
+
+ /**
+  * GroupCommit Service.
+  *
+  * Collects flush requests from producer threads in a write list, swaps it
+  * with the read list, and then flushes until each request's offset is
+  * covered. Both list references are only exchanged under this object's
+  * monitor, which is the same monitor {@link #putRequest} holds while adding.
+  */
+ class GroupCommitService extends FlushCommitLogService {
+     // Requests added by producers since the last swap
+     private volatile List<GroupCommitRequest> requestsWrite = new ArrayList<GroupCommitRequest>();
+     // Requests currently being processed by the service thread
+     private volatile List<GroupCommitRequest> requestsRead = new ArrayList<GroupCommitRequest>();
+
+     /**
+      * Queues a flush request and notifies the service thread.
+      *
+      * @param request the request to queue
+      */
+     public void putRequest(final GroupCommitRequest request) {
+         synchronized (this) {
+             this.requestsWrite.add(request);
+             if (hasNotified.compareAndSet(false, true)) {
+                 waitPoint.countDown(); // notify
+             }
+         }
+     }
+
+     // Exchanges the write and read lists. Synchronized on the same monitor
+     // as putRequest so a producer can never add to a list that has just
+     // become the read list and is being iterated by doCommit.
+     private void swapRequests() {
+         synchronized (this) {
+             List<GroupCommitRequest> tmp = this.requestsWrite;
+             this.requestsWrite = this.requestsRead;
+             this.requestsRead = tmp;
+         }
+     }
+
+     // Serves every request in the read list, or performs a plain flush when
+     // the list is empty.
+     private void doCommit() {
+         if (!this.requestsRead.isEmpty()) {
+             for (GroupCommitRequest req : this.requestsRead) {
+                 // There may be a message in the next file, so a maximum of
+                 // two times the flush
+                 boolean flushOK = false;
+                 for (int i = 0; i < 2 && !flushOK; i++) {
+                     flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
+
+                     if (!flushOK) {
+                         CommitLog.this.mappedFileQueue.flush(0);
+                     }
+                 }
+
+                 req.wakeupCustomer(flushOK);
+             }
+
+             long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
+             if (storeTimestamp > 0) {
+                 CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
+             }
+
+             this.requestsRead.clear();
+         } else {
+             // Because of individual messages is set to not sync flush, it
+             // will come to this process
+             CommitLog.this.mappedFileQueue.flush(0);
+         }
+     }
+
+     /**
+      * Waits for requests and commits them until stopped, then drains the
+      * requests that arrived during shutdown.
+      */
+     @Override
+     public void run() {
+         CommitLog.log.info(this.getServiceName() + " service started");
+
+         while (!this.isStopped()) {
+             try {
+                 this.waitForRunning(0);
+                 this.doCommit();
+             } catch (Exception e) {
+                 CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
+             }
+         }
+
+         // Under normal circumstances shutdown, wait for the arrival of the
+         // request, and then flush
+         try {
+             Thread.sleep(10);
+         } catch (InterruptedException e) {
+             CommitLog.log.warn("GroupCommitService Exception, ", e);
+         }
+
+         this.swapRequests();
+
+         this.doCommit();
+
+         CommitLog.log.info(this.getServiceName() + " service end");
+     }
+
+     // Invoked when a wait ends: makes the queued requests visible to doCommit.
+     @Override
+     protected void onWaitEnd() {
+         this.swapRequests();
+     }
+
+     @Override
+     public String getServiceName() {
+         return GroupCommitService.class.getSimpleName();
+     }
+
+     // Returns 300000 (five minutes if the unit is milliseconds; the unit is
+     // defined by the base class, which is not visible here).
+     @Override
+     public long getJointime() {
+         return 1000 * 60 * 5;
+     }
+ }
+
+ /**
+  * Serializes a broker message into the commit log record layout and writes
+  * it into the target buffer of a mapped file. When the remaining space
+  * cannot hold the record plus the end-of-file marker, a blank marker is
+  * written instead and {@code END_OF_FILE} is reported.
+  */
+ class DefaultAppendMessageCallback implements AppendMessageCallback {
+     // Minimum blank kept at the end of a file: TOTALSIZE (4) + MAGICCODE (4)
+     private static final int END_FILE_MIN_BLANK_LENGTH = 4 + 4;
+     // Scratch buffer used to build the message id
+     private final ByteBuffer msgIdMemory;
+     // Scratch buffer holding the serialized record before it is copied out
+     private final ByteBuffer msgStoreItemMemory;
+     // The maximum length of the message
+     private final int maxMessageSize;
+     // Reused builder for the "topic-queueId" table key
+     private final StringBuilder keyBuilder = new StringBuilder();
+     // Scratch buffer for the 8-byte host address encodings
+     private final ByteBuffer hostHolder = ByteBuffer.allocate(8);
+
+     /**
+      * @param size maximum total size of a serialized message, in bytes
+      */
+     DefaultAppendMessageCallback(final int size) {
+         this.msgIdMemory = ByteBuffer.allocate(MessageDecoder.MSG_ID_LENGTH);
+         this.msgStoreItemMemory = ByteBuffer.allocate(size + END_FILE_MIN_BLANK_LENGTH);
+         this.maxMessageSize = size;
+     }
+
+     public ByteBuffer getMsgStoreItemMemory() {
+         return msgStoreItemMemory;
+     }
+
+     /**
+      * Serializes {@code msgInner} and appends it to {@code byteBuffer}.
+      *
+      * @param fileFromOffset start offset of the mapped file in the commit log
+      * @param byteBuffer     target buffer, positioned at the write position
+      * @param maxBlank       number of bytes still free in the mapped file
+      * @param msgInner       the message to append
+      * @return {@code PUT_OK} with write details on success;
+      *         {@code END_OF_FILE} when the file cannot hold the record;
+      *         {@code PROPERTIES_SIZE_EXCEEDED} or
+      *         {@code MESSAGE_SIZE_EXCEEDED} when a size limit is violated
+      */
+     public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank, final MessageExtBrokerInner msgInner) {
+         // STORETIMESTAMP + STOREHOSTADDRESS + OFFSET <br>
+
+         // PHY OFFSET
+         long wroteOffset = fileFromOffset + byteBuffer.position();
+
+         this.resetByteBuffer(hostHolder, 8);
+         String msgId = MessageDecoder.createMessageId(this.msgIdMemory, msgInner.getStoreHostBytes(hostHolder), wroteOffset);
+
+         // Record ConsumeQueue information
+         keyBuilder.setLength(0);
+         keyBuilder.append(msgInner.getTopic());
+         keyBuilder.append('-');
+         keyBuilder.append(msgInner.getQueueId());
+         String key = keyBuilder.toString();
+         Long queueOffset = CommitLog.this.topicQueueTable.get(key);
+         if (null == queueOffset) {
+             queueOffset = 0L;
+             CommitLog.this.topicQueueTable.put(key, queueOffset);
+         }
+
+         // Transaction messages that require special handling
+         final int tranType = MessageSysFlag.getTransactionValue(msgInner.getSysFlag());
+         switch (tranType) {
+             // Prepared and Rollback messages are not consumed and will not
+             // enter the consume queue
+             case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
+             case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
+                 queueOffset = 0L;
+                 break;
+             case MessageSysFlag.TRANSACTION_NOT_TYPE:
+             case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
+             default:
+                 break;
+         }
+
+         /**
+          * Serialize message
+          */
+         final byte[] propertiesData =
+             msgInner.getPropertiesString() == null ? null : msgInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8);
+
+         // Keep the length as an int until it has been range-checked: the
+         // on-disk field is a short, and narrowing first would make the
+         // check below impossible to trigger and silently corrupt the length
+         final int propertiesLength = propertiesData == null ? 0 : propertiesData.length;
+
+         if (propertiesLength > Short.MAX_VALUE) {
+             log.warn("putMessage message properties length too long. length={}", propertiesData.length);
+             return new AppendMessageResult(AppendMessageStatus.PROPERTIES_SIZE_EXCEEDED);
+         }
+
+         final byte[] topicData = msgInner.getTopic().getBytes(MessageDecoder.CHARSET_UTF8);
+         final int topicLength = topicData == null ? 0 : topicData.length;
+
+         final int bodyLength = msgInner.getBody() == null ? 0 : msgInner.getBody().length;
+
+         final int msgLen = calMsgLength(bodyLength, topicLength, propertiesLength);
+
+         // Exceeds the maximum message
+         if (msgLen > this.maxMessageSize) {
+             CommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLength
+                 + ", maxMessageSize: " + this.maxMessageSize);
+             return new AppendMessageResult(AppendMessageStatus.MESSAGE_SIZE_EXCEEDED);
+         }
+
+         // Determines whether there is sufficient free space
+         if ((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) {
+             this.resetByteBuffer(this.msgStoreItemMemory, maxBlank);
+             // 1 TOTALSIZE
+             this.msgStoreItemMemory.putInt(maxBlank);
+             // 2 MAGICCODE
+             this.msgStoreItemMemory.putInt(CommitLog.BLANK_MAGIC_CODE);
+             // 3 The remaining space may be any value
+             //
+
+             // The blank record deliberately spans the whole remaining space (maxBlank)
+             final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
+             byteBuffer.put(this.msgStoreItemMemory.array(), 0, maxBlank);
+             return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset, maxBlank, msgId, msgInner.getStoreTimestamp(),
+                 queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
+         }
+
+         // Initialization of storage space
+         this.resetByteBuffer(msgStoreItemMemory, msgLen);
+         // 1 TOTALSIZE
+         this.msgStoreItemMemory.putInt(msgLen);
+         // 2 MAGICCODE
+         this.msgStoreItemMemory.putInt(CommitLog.MESSAGE_MAGIC_CODE);
+         // 3 BODYCRC
+         this.msgStoreItemMemory.putInt(msgInner.getBodyCRC());
+         // 4 QUEUEID
+         this.msgStoreItemMemory.putInt(msgInner.getQueueId());
+         // 5 FLAG
+         this.msgStoreItemMemory.putInt(msgInner.getFlag());
+         // 6 QUEUEOFFSET
+         this.msgStoreItemMemory.putLong(queueOffset);
+         // 7 PHYSICALOFFSET
+         this.msgStoreItemMemory.putLong(fileFromOffset + byteBuffer.position());
+         // 8 SYSFLAG
+         this.msgStoreItemMemory.putInt(msgInner.getSysFlag());
+         // 9 BORNTIMESTAMP
+         this.msgStoreItemMemory.putLong(msgInner.getBornTimestamp());
+         // 10 BORNHOST
+         this.resetByteBuffer(hostHolder, 8);
+         this.msgStoreItemMemory.put(msgInner.getBornHostBytes(hostHolder));
+         // 11 STORETIMESTAMP
+         this.msgStoreItemMemory.putLong(msgInner.getStoreTimestamp());
+         // 12 STOREHOSTADDRESS
+         this.resetByteBuffer(hostHolder, 8);
+         this.msgStoreItemMemory.put(msgInner.getStoreHostBytes(hostHolder));
+         // 13 RECONSUMETIMES
+         this.msgStoreItemMemory.putInt(msgInner.getReconsumeTimes());
+         // 14 Prepared Transaction Offset
+         this.msgStoreItemMemory.putLong(msgInner.getPreparedTransactionOffset());
+         // 15 BODY
+         this.msgStoreItemMemory.putInt(bodyLength);
+         if (bodyLength > 0) {
+             this.msgStoreItemMemory.put(msgInner.getBody());
+         }
+         // 16 TOPIC
+         // NOTE(review): the length is stored in one signed byte; topics are
+         // presumably length-limited before reaching this point - confirm
+         this.msgStoreItemMemory.put((byte) topicLength);
+         this.msgStoreItemMemory.put(topicData);
+         // 17 PROPERTIES (the cast is safe: range-checked above)
+         this.msgStoreItemMemory.putShort((short) propertiesLength);
+         if (propertiesLength > 0) {
+             this.msgStoreItemMemory.put(propertiesData);
+         }
+
+         final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
+         // Write messages to the queue buffer
+         byteBuffer.put(this.msgStoreItemMemory.array(), 0, msgLen);
+
+         AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, msgLen, msgId,
+             msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
+
+         switch (tranType) {
+             case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
+             case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
+                 break;
+             case MessageSysFlag.TRANSACTION_NOT_TYPE:
+             case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
+                 // The next update ConsumeQueue information
+                 CommitLog.this.topicQueueTable.put(key, ++queueOffset);
+                 break;
+             default:
+                 break;
+         }
+         return result;
+     }
+
+     // Rewinds the scratch buffer to position 0 and sets the given limit.
+     private void resetByteBuffer(final ByteBuffer byteBuffer, final int limit) {
+         byteBuffer.flip();
+         byteBuffer.limit(limit);
+     }
+ }
+
+ /**
+  * Reports how long the put-message lock has currently been held.
+  *
+  * @return the time elapsed since the lock was taken, never negative;
+  *         0 when no lock start time is recorded
+  */
+ public long lockTimeMills() {
+     // Read the field once: it is reset to 0 by the thread holding the lock
+     final long lockedSince = this.beginTimeInLock;
+     if (lockedSince <= 0) {
+         return 0;
+     }
+
+     final long held = this.defaultMessageStore.now() - lockedSince;
+     return held < 0 ? 0 : held;
+ }
+
+ /**
+  * Acquires the put-message lock: either the reentrant lock, when so
+  * configured, or the spin lock, which is taken by flipping its flag from
+  * {@code true} to {@code false} and retried until that succeeds.
+  */
+ private void lockForPutMessage() {
+     if (this.defaultMessageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage()) {
+         putMessageNormalLock.lock();
+         return;
+     }
+
+     while (!this.putMessageSpinLock.compareAndSet(true, false)) {
+         // busy-wait until the flag could be flipped
+     }
+ }
+
+ /**
+  * Releases the lock taken by {@code lockForPutMessage}: unlocks the
+  * reentrant lock, or flips the spin lock flag back from {@code false} to
+  * {@code true}.
+  */
+ private void releasePutMessageLock() {
+     final boolean reentrant = this.defaultMessageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage();
+     if (!reentrant) {
+         this.putMessageSpinLock.compareAndSet(false, true);
+         return;
+     }
+
+     putMessageNormalLock.unlock();
+ }
+}
|