rocketmq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From yu...@apache.org
Subject [09/43] incubator-rocketmq git commit: Finish code dump. Reviewed by: @yukon @vongosling @stevenschew @vintagewang @lollipop @zander
Date Mon, 19 Dec 2016 09:40:26 GMT
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/MappedFile.java
----------------------------------------------------------------------
diff --git a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/MappedFile.java b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/MappedFile.java
new file mode 100644
index 0000000..ed2afa9
--- /dev/null
+++ b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/MappedFile.java
@@ -0,0 +1,591 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.store;
+
+import com.alibaba.rocketmq.common.UtilAll;
+import com.alibaba.rocketmq.common.constant.LoggerName;
+import com.alibaba.rocketmq.store.config.FlushDiskType;
+import com.alibaba.rocketmq.store.util.LibC;
+import com.sun.jna.NativeLong;
+import com.sun.jna.Pointer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import sun.nio.ch.DirectBuffer;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.lang.reflect.Method;
+import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.channels.FileChannel.MapMode;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class MappedFile extends ReferenceResource {
+    public static final int OS_PAGE_SIZE = 1024 * 4;
+    protected static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
+
+    private static final AtomicLong TOTAL_MAPED_VITUAL_MEMORY = new AtomicLong(0);
+
+    private static final AtomicInteger TOTAL_MAPED_FILES = new AtomicInteger(0);
+
+    private String fileName;
+
+    private long fileFromOffset;
+
+    protected int fileSize;
+
+    private File file;
+
+    private MappedByteBuffer mappedByteBuffer;
+
+    protected final AtomicInteger wrotePosition = new AtomicInteger(0);
+
+    private final AtomicInteger flushedPosition = new AtomicInteger(0);
+    //ADD BY ChenYang
+    protected final AtomicInteger committedPosition = new AtomicInteger(0);
+
+
+    protected FileChannel fileChannel;
+
+    private volatile long storeTimestamp = 0;
+    private boolean firstCreateInQueue = false;
+
+    /**
+     * Message will put to here first, and then reput to FileChannel if writeBuffer is not null.
+     */
+    protected ByteBuffer writeBuffer = null;
+    protected TransientStorePool transientStorePool = null;
+
+    public MappedFile() {
+    }
+
+    public MappedFile(final String fileName, final int fileSize) throws IOException {
+        init(fileName, fileSize);
+    }
+
+    public MappedFile(final String fileName, final int fileSize, final TransientStorePool transientStorePool) throws IOException {
+        init(fileName, fileSize, transientStorePool);
+    }
+
+    /**
+     * Maps the file through {@code init(fileName, fileSize)} and then borrows a write buffer
+     * from the given pool. The mapping happens first, so an I/O failure there propagates
+     * before any buffer is borrowed.
+     *
+     * @param fileName path of the file to map
+     * @param fileSize number of bytes to map
+     * @param transientStorePool pool the write buffer is borrowed from; dereferenced, so must not be null
+     * @throws IOException if opening or mapping the file fails
+     */
+    public void init(final String fileName, final int fileSize, final TransientStorePool transientStorePool) throws IOException {
+        init(fileName, fileSize);
+        this.writeBuffer = transientStorePool.borrowBuffer();
+        this.transientStorePool = transientStorePool;
+    }
+
+    /**
+     * Opens {@code fileName} in read/write mode, maps {@code fileSize} bytes of it into memory
+     * and updates the global mapped-file counters.
+     *
+     * @param fileName path of the file; its last path segment is parsed as the file's starting offset
+     * @param fileSize number of bytes to map
+     * @throws IOException if the file cannot be opened or mapped (logged, then rethrown)
+     */
+    private void init(final String fileName, final int fileSize) throws IOException {
+        this.fileName = fileName;
+        this.fileSize = fileSize;
+        this.file = new File(fileName);
+        // The bare file name is the starting offset; a non-numeric name makes parseLong throw
+        // NumberFormatException, which is not handled here.
+        this.fileFromOffset = Long.parseLong(this.file.getName());
+        boolean ok = false;
+
+        ensureDirOK(this.file.getParent());
+
+        try {
+            this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();
+            this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);
+            TOTAL_MAPED_VITUAL_MEMORY.addAndGet(fileSize);
+            TOTAL_MAPED_FILES.incrementAndGet();
+            ok = true;
+        } catch (FileNotFoundException e) {
+            log.error("create file channel " + this.fileName + " Failed. ", e);
+            throw e;
+        } catch (IOException e) {
+            log.error("map file " + this.fileName + " Failed. ", e);
+            throw e;
+        } finally {
+            // Only close the channel when setup did not complete; on success it stays open
+            // for the lifetime of this MappedFile.
+            if (!ok && this.fileChannel != null) {
+                this.fileChannel.close();
+            }
+        }
+    }
+
+
+    /**
+     * Creates the directory (including missing parents) when it does not exist yet and logs
+     * whether the creation succeeded. A {@code null} name or an existing path is left alone.
+     *
+     * @param dirName directory path to ensure; may be {@code null}
+     */
+    public static void ensureDirOK(final String dirName) {
+        if (dirName == null) {
+            return;
+        }
+
+        final File dir = new File(dirName);
+        if (dir.exists()) {
+            return;
+        }
+
+        final boolean created = dir.mkdirs();
+        log.info(dirName + " mkdir " + (created ? "OK" : "Failed"));
+    }
+
+
+    /**
+     * Reflectively calls {@code cleaner().clean()} on the innermost buffer behind {@code buffer}.
+     * Does nothing for {@code null}, non-direct or zero-capacity buffers.
+     *
+     * @param buffer the buffer to clean; may be {@code null}
+     * @throws IllegalStateException if any reflective call fails
+     */
+    public static void clean(final ByteBuffer buffer) {
+        final boolean nothingToClean = buffer == null || !buffer.isDirect() || buffer.capacity() == 0;
+        if (nothingToClean) {
+            return;
+        }
+
+        final Object cleaner = invoke(viewed(buffer), "cleaner");
+        invoke(cleaner, "clean");
+    }
+
+
+    private static Object invoke(final Object target, final String methodName, final Class<?>... args) {
+        return AccessController.doPrivileged(new PrivilegedAction<Object>() {
+            public Object run() {
+                try {
+                    Method method = method(target, methodName, args);
+                    method.setAccessible(true);
+                    return method.invoke(target);
+                } catch (Exception e) {
+                    throw new IllegalStateException(e);
+                }
+            }
+        });
+    }
+
+
+    private static Method method(Object target, String methodName, Class<?>[] args)
+            throws NoSuchMethodException {
+        try {
+            return target.getClass().getMethod(methodName, args);
+        } catch (NoSuchMethodException e) {
+            return target.getClass().getDeclaredMethod(methodName, args);
+        }
+    }
+
+
+    private static ByteBuffer viewed(ByteBuffer buffer) {
+        String methodName = "viewedBuffer";
+
+
+        Method[] methods = buffer.getClass().getMethods();
+        for (int i = 0; i < methods.length; i++) {
+            if (methods[i].getName().equals("attachment")) {
+                methodName = "attachment";
+                break;
+            }
+        }
+
+        ByteBuffer viewedBuffer = (ByteBuffer) invoke(buffer, methodName);
+        if (viewedBuffer == null)
+            return buffer;
+        else
+            return viewed(viewedBuffer);
+    }
+
+
+    public static int getTotalmapedfiles() {
+        return TOTAL_MAPED_FILES.get();
+    }
+
+
+    public static long getTotalMapedVitualMemory() {
+        return TOTAL_MAPED_VITUAL_MEMORY.get();
+    }
+
+
+    public long getLastModifiedTimestamp() {
+        return this.file.lastModified();
+    }
+
+    public int getFileSize() {
+        return fileSize;
+    }
+
+    public FileChannel getFileChannel() {
+        return fileChannel;
+    }
+
+    public AppendMessageResult appendMessage(final MessageExtBrokerInner msg, final AppendMessageCallback cb) {
+        assert msg != null;
+        assert cb != null;
+
+        int currentPos = this.wrotePosition.get();
+
+
+        if (currentPos < this.fileSize) {
+            ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
+            byteBuffer.position(currentPos);
+            AppendMessageResult result =
+                    cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, msg);
+            this.wrotePosition.addAndGet(result.getWroteBytes());
+            this.storeTimestamp = result.getStoreTimestamp();
+            return result;
+        }
+
+
+        log.error("MappedFile.appendMessage return null, wrotePosition: " + currentPos + " fileSize: "
+                + this.fileSize);
+        return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
+    }
+
+    /**
+     * @return the starting offset of this file, parsed from the file name when the file was initialised
+     */
+    public long getFileFromOffset() {
+        return this.fileFromOffset;
+    }
+
+    /**
+
+     *
+
+     */
+    public boolean appendMessage(final byte[] data) {
+        int currentPos = this.wrotePosition.get();
+
+
+        if ((currentPos + data.length) <= this.fileSize) {
+            try {
+                this.fileChannel.position(currentPos);
+                this.fileChannel.write(ByteBuffer.wrap(data));
+            } catch (Throwable e) {
+                log.error("Error occurred when append message to mappedFile.", e);
+            }
+            this.wrotePosition.addAndGet(data.length);
+            return true;
+        }
+
+        return false;
+    }
+
+    /**
+     * Forces written data to disk when {@code isAbleToFlush(flushLeastPages)} allows it and
+     * records the new flushed position.
+     *
+     * @param flushLeastPages minimum number of whole OS pages that must be pending before a flush
+     *                        is performed; a value &lt;= 0 flushes whenever anything is pending
+     *
+     * @return The current flushed position
+     */
+    public int flush(final int flushLeastPages) {
+        if (this.isAbleToFlush(flushLeastPages)) {
+            if (this.hold()) {
+                // Captured before forcing, so the recorded position never exceeds what was
+                // readable when the force started.
+                int value = getReadPosition();
+
+                try {
+                    //We only append data to fileChannel or mappedByteBuffer, never both.
+                    if (writeBuffer != null || this.fileChannel.position() != 0) {
+                        this.fileChannel.force(false);
+                    } else {
+                        this.mappedByteBuffer.force();
+                    }
+                } catch (Throwable e) {
+                    log.error("Error occurred when force data to disk.", e);
+                }
+
+                // NOTE(review): the flushed position is advanced even when force() threw above.
+                this.flushedPosition.set(value);
+                this.release();
+            } else {
+                // hold() failed: nothing is forced, but the flushed position is still moved
+                // to the current read position.
+                log.warn("in flush, hold failed, flush offset = " + this.flushedPosition.get());
+                this.flushedPosition.set(getReadPosition());
+            }
+        }
+        return this.getFlushedPosition();
+    }
+
+    /**
+     * Copies pending data from the write buffer to the file channel when
+     * {@code isAbleToCommit(commitLeastPages)} allows it.
+     * <p>
+     * Without a write buffer there is nothing to commit and the write position is returned.
+     * Once the committed position reaches the file size, the write buffer is handed back to
+     * the pool and dropped.
+     *
+     * @param commitLeastPages minimum number of whole OS pages that must be pending before committing;
+     *                         a value &lt;= 0 commits whenever anything is pending
+     * @return the committed position, or the write position when no write buffer is in use
+     */
+    public int commit(final int commitLeastPages) {
+        if (writeBuffer == null) {
+            //no need to commit data to file channel, so just regard wrotePosition as committedPosition.
+            return this.wrotePosition.get();
+        }
+        if (this.isAbleToCommit(commitLeastPages)) {
+            // hold()/release() come from ReferenceResource; presumably they guard against
+            // the file being cleaned up during the commit - TODO confirm.
+            if (this.hold()) {
+                commit0(commitLeastPages);
+                this.release();
+            } else {
+                log.warn("in commit, hold failed, commit offset = " + this.committedPosition.get());
+            }
+        }
+
+        // All dirty data has been committed to FileChannel.
+        if (writeBuffer != null && this.transientStorePool != null && this.fileSize == this.committedPosition.get()) {
+            this.transientStorePool.returnBuffer(writeBuffer);
+            this.writeBuffer = null;
+        }
+
+        return this.committedPosition.get();
+    }
+
+    protected void commit0(final int commitLeastPages) {
+        int writePos = this.wrotePosition.get();
+        int lastCommittedPosition = this.committedPosition.get();
+
+        if (writePos - this.committedPosition.get() > 0) {
+            try {
+                ByteBuffer byteBuffer = writeBuffer.slice();
+                byteBuffer.position(lastCommittedPosition);
+                byteBuffer.limit(writePos);
+                this.fileChannel.position(lastCommittedPosition);
+                this.fileChannel.write(byteBuffer);
+                this.committedPosition.set(writePos);
+            } catch (Throwable e) {
+                log.error("Error occurred when commit data to FileChannel.", e);
+            }
+        }
+    }
+
+    private boolean isAbleToFlush(final int flushLeastPages) {
+        int flush = this.flushedPosition.get();
+        int write = getReadPosition();
+
+        if (this.isFull()) {
+            return true;
+        }
+
+        if (flushLeastPages > 0) {
+            return ((write / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE)) >= flushLeastPages;
+        }
+
+        return write > flush;
+    }
+
+    protected boolean isAbleToCommit(final int commitLeastPages) {
+        int flush = this.committedPosition.get();
+        int write = this.wrotePosition.get();
+
+        if (this.isFull()) {
+            return true;
+        }
+
+        if (commitLeastPages > 0) {
+            return ((write / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE)) >= commitLeastPages;
+        }
+
+        return write > flush;
+    }
+
+    public int getFlushedPosition() {
+        return flushedPosition.get();
+    }
+
+
+    public void setFlushedPosition(int pos) {
+        this.flushedPosition.set(pos);
+    }
+
+
+    public boolean isFull() {
+        return this.fileSize == this.wrotePosition.get();
+    }
+
+    /**
+     * Returns a view of {@code size} bytes of the mapped buffer starting at {@code pos}.
+     * <p>
+     * The range must be non-negative and lie entirely below the current read position.
+     * Arguments are validated before {@code hold()} is called, so an invalid request can no
+     * longer throw from the buffer operations while a reference is held. On success the
+     * reference taken by {@code hold()} is passed on with the result; releasing it is
+     * presumably the responsibility of the result's consumer - TODO confirm.
+     *
+     * @param pos start position inside this file, {@code >= 0}
+     * @param size number of bytes to expose, {@code >= 0}
+     * @return the selected region, or {@code null} if the range is invalid or {@code hold()} fails
+     */
+    public SelectMappedBufferResult selectMappedBuffer(int pos, int size) {
+        int readPosition = getReadPosition();
+        // long arithmetic: pos + size must not wrap around to a negative int
+        if (pos >= 0 && size >= 0 && ((long) pos + size) <= readPosition) {
+
+            if (this.hold()) {
+                ByteBuffer byteBuffer = this.mappedByteBuffer.slice();
+                byteBuffer.position(pos);
+                ByteBuffer byteBufferNew = byteBuffer.slice();
+                byteBufferNew.limit(size);
+                return new SelectMappedBufferResult(this.fileFromOffset + pos, byteBufferNew, size, this);
+            } else {
+                log.warn("matched, but hold failed, request pos: " + pos + ", fileFromOffset: "
+                        + this.fileFromOffset);
+            }
+        } else {
+            log.warn("selectMappedBuffer request pos invalid, request pos: " + pos + ", size: " + size
+                    + ", fileFromOffset: " + this.fileFromOffset);
+        }
+
+
+        return null;
+    }
+
+    /**
+     * Returns a view of the mapped buffer from {@code pos} up to the current read position.
+     *
+     * @param pos start position inside this file
+     * @return the selected region, or {@code null} when {@code pos} is outside
+     *         {@code [0, readPosition)} or {@code hold()} fails
+     */
+    public SelectMappedBufferResult selectMappedBuffer(int pos) {
+        final int readPosition = getReadPosition();
+        final boolean inRange = pos < readPosition && pos >= 0;
+        if (!inRange || !this.hold()) {
+            return null;
+        }
+
+        final ByteBuffer fromPos = this.mappedByteBuffer.slice();
+        fromPos.position(pos);
+        final int size = readPosition - pos;
+        final ByteBuffer region = fromPos.slice();
+        region.limit(size);
+        return new SelectMappedBufferResult(this.fileFromOffset + pos, region, size, this);
+    }
+
+    @Override
+    public boolean cleanup(final long currentRef) {
+        if (this.isAvailable()) {
+            log.error("this file[REF:" + currentRef + "] " + this.fileName
+                    + " have not shutdown, stop unmaping.");
+            return false;
+        }
+
+        if (this.isCleanupOver()) {
+            log.error("this file[REF:" + currentRef + "] " + this.fileName
+                    + " have cleanup, do not do it again.");
+            return true;
+        }
+
+        clean(this.mappedByteBuffer);
+        TOTAL_MAPED_VITUAL_MEMORY.addAndGet(this.fileSize * (-1));
+        TOTAL_MAPED_FILES.decrementAndGet();
+        log.info("unmap file[REF:" + currentRef + "] " + this.fileName + " OK");
+        return true;
+    }
+
+    /**
+     * Shuts this mapped file down and, once cleanup is reported as over, closes the file
+     * channel and deletes the file from disk.
+     *
+     * @param intervalForcibly passed through to {@code shutdown}; presumably the time after
+     *                         which shutdown is forced - TODO confirm against ReferenceResource
+     * @return {@code true} if cleanup was over (even when closing or deleting failed),
+     *         {@code false} if the file could not be destroyed yet
+     */
+    public boolean destroy(final long intervalForcibly) {
+        this.shutdown(intervalForcibly);
+
+        if (this.isCleanupOver()) {
+            try {
+                this.fileChannel.close();
+                log.info("close file channel " + this.fileName + " OK");
+
+                long beginTime = System.currentTimeMillis();
+                boolean result = this.file.delete();
+                log.info("delete file[REF:" + this.getRefCount() + "] " + this.fileName
+                        + (result ? " OK, " : " Failed, ") + "W:" + this.getWrotePosition() + " M:"
+                        + this.getFlushedPosition() + ", "
+                        + UtilAll.computeEclipseTimeMilliseconds(beginTime));
+            } catch (Exception e) {
+                // NOTE(review): if close() throws, the file is never deleted and the
+                // method still returns true below.
+                log.warn("close file channel " + this.fileName + " Failed. ", e);
+            }
+
+            return true;
+        } else {
+            log.warn("destroy maped file[REF:" + this.getRefCount() + "] " + this.fileName
+                    + " Failed. cleanupOver: " + this.cleanupOver);
+        }
+
+        return false;
+    }
+
+    public int getWrotePosition() {
+        return wrotePosition.get();
+    }
+
+    /**
+     * Returns the position up to which data may be read from the mapped buffer: the write
+     * position when no write buffer is in use, otherwise the committed position.
+     *
+     * @return The max position which have valid data
+     */
+    public int getReadPosition() {
+        return this.writeBuffer == null ? this.wrotePosition.get() : this.committedPosition.get();
+    }
+
+    public void setWrotePosition(int pos) {
+        this.wrotePosition.set(pos);
+    }
+
+    public void setCommittedPosition(int pos) {
+        this.committedPosition.set(pos);
+    }
+
+    /**
+     * Touches one byte in every OS page of the mapped buffer and then calls {@link #mlock()}.
+     * <p>
+     * With {@code SYNC_FLUSH} the buffer is forced to disk every time at least {@code pages}
+     * pages have been touched since the previous force, and once more when the loop is done.
+     * An interruption during warm-up does not abort it: it is logged, and the thread's
+     * interrupt status is restored before returning so callers can still observe it.
+     *
+     * @param type flush mode; only {@code SYNC_FLUSH} triggers forcing
+     * @param pages number of touched pages between two forced flushes in {@code SYNC_FLUSH} mode
+     */
+    public void warmMappedFile(FlushDiskType type, int pages) {
+        long beginTime = System.currentTimeMillis();
+        ByteBuffer byteBuffer = this.mappedByteBuffer.slice();
+        int flush = 0;
+        long time = System.currentTimeMillis();
+        boolean interrupted = false;
+        for (int i = 0, j = 0; i < this.fileSize; i += MappedFile.OS_PAGE_SIZE, j++) {
+            byteBuffer.put(i, (byte) 0);
+            // force flush when flush disk type is sync
+            if (type == FlushDiskType.SYNC_FLUSH) {
+                if ((i / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE) >= pages) {
+                    flush = i;
+                    mappedByteBuffer.force();
+                }
+            }
+
+            // prevent gc
+            if (j % 1000 == 0) {
+                log.info("j={}, costTime={}", j, System.currentTimeMillis() - time);
+                time = System.currentTimeMillis();
+                try {
+                    Thread.sleep(0);
+                } catch (InterruptedException e) {
+                    // sleep() cleared the interrupt status; remember it and restore it at the end
+                    // instead of dumping the stack trace to stderr.
+                    interrupted = true;
+                    log.warn("Interrupted while warming mapped file " + this.getFileName(), e);
+                }
+            }
+        }
+
+        // force flush when prepare load finished
+        if (type == FlushDiskType.SYNC_FLUSH) {
+            log.info("mapped file worm up done, force to disk, mappedFile={}, costTime={}",
+                    this.getFileName(), System.currentTimeMillis() - beginTime);
+            mappedByteBuffer.force();
+        }
+        log.info("mapped file worm up done. mappedFile={}, costTime={}", this.getFileName(),
+                System.currentTimeMillis() - beginTime);
+
+        this.mlock();
+
+        if (interrupted) {
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    public String getFileName() {
+        return fileName;
+    }
+
+    public MappedByteBuffer getMappedByteBuffer() {
+        return mappedByteBuffer;
+    }
+
+    public ByteBuffer sliceByteBuffer() {
+        return this.mappedByteBuffer.slice();
+    }
+
+
+    public long getStoreTimestamp() {
+        return storeTimestamp;
+    }
+
+
+    public boolean isFirstCreateInQueue() {
+        return firstCreateInQueue;
+    }
+
+
+    public void setFirstCreateInQueue(boolean firstCreateInQueue) {
+        this.firstCreateInQueue = firstCreateInQueue;
+    }
+
+
+    /**
+     * Calls the native {@code mlock} and then {@code madvise(MADV_WILLNEED)} on the whole
+     * mapped region, logging the return code and elapsed time of each call.
+     */
+    public void mlock() {
+        final long beginTime = System.currentTimeMillis();
+        final long address = ((DirectBuffer) (this.mappedByteBuffer)).address();
+        final Pointer region = new Pointer(address);
+
+        final int lockRet = LibC.INSTANCE.mlock(region, new NativeLong(this.fileSize));
+        log.info("mlock {} {} {} ret = {} time consuming = {}", address, this.fileName, this.fileSize, lockRet, System.currentTimeMillis() - beginTime);
+
+        final int adviseRet = LibC.INSTANCE.madvise(region, new NativeLong(this.fileSize), LibC.MADV_WILLNEED);
+        log.info("madvise {} {} {} ret = {} time consuming = {}", address, this.fileName, this.fileSize, adviseRet, System.currentTimeMillis() - beginTime);
+    }
+
+    /**
+     * Calls the native {@code munlock} on the whole mapped region and logs the return code
+     * and elapsed time.
+     */
+    public void munlock() {
+        final long beginTime = System.currentTimeMillis();
+        final long address = ((DirectBuffer) (this.mappedByteBuffer)).address();
+        final Pointer region = new Pointer(address);
+        final NativeLong length = new NativeLong(this.fileSize);
+        final int unlockRet = LibC.INSTANCE.munlock(region, length);
+        log.info("munlock {} {} {} ret = {} time consuming = {}", address, this.fileName, this.fileSize, unlockRet, System.currentTimeMillis() - beginTime);
+    }
+
+    @Override
+    public String toString() {
+        return this.fileName;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/MappedFileQueue.java
----------------------------------------------------------------------
diff --git a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/MappedFileQueue.java b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/MappedFileQueue.java
new file mode 100644
index 0000000..2b006c0
--- /dev/null
+++ b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/MappedFileQueue.java
@@ -0,0 +1,606 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.store;
+
+import com.alibaba.rocketmq.common.UtilAll;
+import com.alibaba.rocketmq.common.constant.LoggerName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class MappedFileQueue {
+    private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
+    private static final Logger LOG_ERROR = LoggerFactory.getLogger(LoggerName.STORE_ERROR_LOGGER_NAME);
+
+    private static final int DELETE_FILES_BATCH_MAX = 10;
+
+    private final String storePath;
+
+    private final int mappedFileSize;
+
+    private final CopyOnWriteArrayList<MappedFile> mappedFiles = new CopyOnWriteArrayList<MappedFile>();
+
+    private final AllocateMappedFileService allocateMappedFileService;
+
+    private long flushedWhere = 0;
+    private long committedWhere = 0;
+
+    private volatile long storeTimestamp = 0;
+
+
+    public MappedFileQueue(final String storePath, int mappedFileSize,
+                           AllocateMappedFileService allocateMappedFileService) {
+        this.storePath = storePath;
+        this.mappedFileSize = mappedFileSize;
+        this.allocateMappedFileService = allocateMappedFileService;
+    }
+
+
+    /**
+     * Verifies that the starting offsets of neighbouring mapped files differ by exactly one
+     * file size, logging an error for every pair that does not.
+     */
+    public void checkSelf() {
+        MappedFile previous = null;
+        for (MappedFile current : this.mappedFiles) {
+            if (previous != null
+                    && current.getFileFromOffset() - previous.getFileFromOffset() != this.mappedFileSize) {
+                LOG_ERROR.error("[BUG]The mappedFile queue's data is damaged, the adjacent mappedFile's offset don't match. pre file {}, cur file {}",
+                        previous.getFileName(), current.getFileName());
+            }
+            previous = current;
+        }
+    }
+
+
+    /**
+     * Finds the first mapped file whose last-modified time is at or after {@code timestamp}.
+     *
+     * @param timestamp lower bound for the file's last-modified time, in milliseconds
+     * @return the first matching file, the last file if none matches, or {@code null} when
+     *         the queue is empty
+     */
+    public MappedFile getMappedFileByTime(final long timestamp) {
+        final Object[] snapshot = this.copyMappedFiles(0);
+        if (snapshot == null) {
+            return null;
+        }
+
+        for (Object element : snapshot) {
+            final MappedFile candidate = (MappedFile) element;
+            if (candidate.getLastModifiedTimestamp() >= timestamp) {
+                return candidate;
+            }
+        }
+
+        return (MappedFile) snapshot[snapshot.length - 1];
+    }
+
+
+    private Object[] copyMappedFiles(final int reservedMappedFiles) {
+        Object[] mfs;
+
+        if (this.mappedFiles.size() <= reservedMappedFiles) {
+            return null;
+        }
+
+        mfs = this.mappedFiles.toArray();
+        return mfs;
+    }
+
+
+    /**
+     * Discards everything at or beyond {@code offset}: the file containing the offset gets its
+     * write, committed and flushed positions reset to it, and every file that starts after the
+     * offset is destroyed and removed from the queue.
+     *
+     * @param offset first offset that is considered dirty
+     */
+    public void truncateDirtyFiles(long offset) {
+        final List<MappedFile> dirtyFiles = new ArrayList<MappedFile>();
+
+        for (MappedFile candidate : this.mappedFiles) {
+            final long headOffset = candidate.getFileFromOffset();
+            final long tailOffset = headOffset + this.mappedFileSize;
+            if (tailOffset <= offset) {
+                // entirely before the truncation point
+                continue;
+            }
+
+            if (offset < headOffset) {
+                candidate.destroy(1000);
+                dirtyFiles.add(candidate);
+                continue;
+            }
+
+            final int keepUpTo = (int) (offset % this.mappedFileSize);
+            candidate.setWrotePosition(keepUpTo);
+            candidate.setCommittedPosition(keepUpTo);
+            candidate.setFlushedPosition(keepUpTo);
+        }
+
+        this.deleteExpiredFile(dirtyFiles);
+    }
+
+
+    private void deleteExpiredFile(List<MappedFile> files) {
+
+        if (!files.isEmpty()) {
+
+            Iterator<MappedFile> iterator = files.iterator();
+            while (iterator.hasNext()) {
+                MappedFile cur = iterator.next();
+                if (!this.mappedFiles.contains(cur)) {
+                    iterator.remove();
+                    log.info("This mappedFile {} is not contained by mappedFiles, so skip it.", cur.getFileName());
+                }
+            }
+
+            try {
+                if (!this.mappedFiles.removeAll(files)) {
+                    log.error("deleteExpiredFile remove failed.");
+                }
+            } catch (Exception e) {
+                log.error("deleteExpiredFile has exception.", e);
+            }
+        }
+    }
+
+
+    /**
+     * Maps every file found in the store directory, in ascending path order, and appends it
+     * to the queue with its write, flushed and committed positions set to the full file size.
+     *
+     * @return {@code false} if mapping a file fails with an {@code IOException}; {@code true}
+     *         otherwise, including when the directory cannot be listed
+     */
+    public boolean load() {
+        File dir = new File(this.storePath);
+        File[] files = dir.listFiles();
+        if (files != null) {
+            // ascending order
+            Arrays.sort(files);
+            for (File file : files) {
+
+                // NOTE(review): a size mismatch ends loading with "true", so all files sorted
+                // after this one are skipped too, although the message only says "ignore it".
+                if (file.length() != this.mappedFileSize) {
+                    log.warn(file + "\t" + file.length()
+                            + " length not matched message store config value, ignore it");
+                    return true;
+                }
+
+
+                try {
+                    MappedFile mappedFile = new MappedFile(file.getPath(), mappedFileSize);
+
+                    // Loaded files are marked completely written; presumably a later recovery
+                    // step corrects the positions - TODO confirm.
+                    mappedFile.setWrotePosition(this.mappedFileSize);
+                    mappedFile.setFlushedPosition(this.mappedFileSize);
+                    mappedFile.setCommittedPosition(this.mappedFileSize);
+                    this.mappedFiles.add(mappedFile);
+                    log.info("load " + file.getPath() + " OK");
+                } catch (IOException e) {
+                    log.error("load file " + file + " error", e);
+                    return false;
+                }
+            }
+        }
+
+        return true;
+    }
+
+
+    /**
+     * Computes how many bytes have been written to the last file beyond the queue's flushed offset.
+     *
+     * @return the distance in bytes, or 0 when the queue is empty, nothing has been flushed
+     *         yet or no last file is available
+     */
+    public long howMuchFallBehind() {
+        if (this.mappedFiles.isEmpty()) {
+            return 0;
+        }
+
+        final long flushed = this.flushedWhere;
+        if (flushed == 0) {
+            return 0;
+        }
+
+        final MappedFile tail = this.getLastMappedFile(0, false);
+        if (tail == null) {
+            return 0;
+        }
+
+        return (tail.getFileFromOffset() + tail.getWrotePosition()) - flushed;
+    }
+
+
+    /**
+     * Returns the last mapped file, creating a new one when the queue is empty or the last
+     * file is full and {@code needCreate} is set.
+     *
+     * @param startOffset offset used to derive the first file's starting offset when the queue
+     *                    is empty (rounded down to a multiple of the file size)
+     * @param needCreate whether a new file may be created
+     * @return the existing last file; or, when a file had to be created, the new file or
+     *         {@code null} if creation failed
+     */
+    public MappedFile getLastMappedFile(final long startOffset, boolean needCreate) {
+        final MappedFile mappedFileLast = getLastMappedFile();
+
+        long createOffset = -1;
+        if (mappedFileLast == null) {
+            createOffset = startOffset - (startOffset % this.mappedFileSize);
+        } else if (mappedFileLast.isFull()) {
+            createOffset = mappedFileLast.getFileFromOffset() + this.mappedFileSize;
+        }
+
+        if (createOffset == -1 || !needCreate) {
+            return mappedFileLast;
+        }
+
+        final String nextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset);
+        final String nextNextFilePath = this.storePath + File.separator
+                + UtilAll.offset2FileName(createOffset + this.mappedFileSize);
+
+        MappedFile created = null;
+        if (this.allocateMappedFileService == null) {
+            try {
+                created = new MappedFile(nextFilePath, this.mappedFileSize);
+            } catch (IOException e) {
+                log.error("create mappedFile exception", e);
+            }
+        } else {
+            created = this.allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath,
+                    nextNextFilePath, this.mappedFileSize);
+        }
+
+        if (created != null) {
+            if (this.mappedFiles.isEmpty()) {
+                created.setFirstCreateInQueue(true);
+            }
+            this.mappedFiles.add(created);
+        }
+
+        return created;
+    }
+
+    public MappedFile getLastMappedFile(final long startOffset) {
+        return getLastMappedFile(startOffset, true);
+    }
+
+    /**
+     * Returns the last file in the queue without creating one.
+     *
+     * @return the last mapped file, or {@code null} if the queue is empty or an unexpected
+     *         exception occurred
+     */
+    public MappedFile getLastMappedFile() {
+        MappedFile mappedFileLast = null;
+
+        while (!this.mappedFiles.isEmpty()) {
+            try {
+                mappedFileLast = this.mappedFiles.get(this.mappedFiles.size() - 1);
+                break;
+            } catch (IndexOutOfBoundsException e) {
+                // size() and get() are two separate calls; presumably the list shrank in
+                // between, so the loop simply tries again.
+                //continue;
+            } catch (Exception e) {
+                log.error("getLastMappedFile has exception.", e);
+                break;
+            }
+        }
+
+        return mappedFileLast;
+    }
+
+    /**
+     * Rolls the queue back to {@code offset}.
+     * <p>
+     * Walking from the newest file to the oldest, every file that starts after {@code offset}
+     * is removed from the queue; the first file that starts at or before it gets its flushed,
+     * write and committed positions set to the offset's position inside that file.
+     * The request is rejected when it would roll back more than two file sizes.
+     * <p>
+     * NOTE(review): removed files are only dropped from the queue, matching the original
+     * code's intent; they are not destroyed here.
+     *
+     * @param offset the offset to roll back to
+     * @return {@code false} if the rollback distance exceeds two file sizes, {@code true} otherwise
+     */
+    public boolean resetOffset(long offset) {
+        MappedFile mappedFileLast = getLastMappedFile();
+
+        if (mappedFileLast != null) {
+            long lastOffset = mappedFileLast.getFileFromOffset() +
+                    mappedFileLast.getWrotePosition();
+            long diff = lastOffset - offset;
+
+            // long arithmetic: "mappedFileSize * 2" overflows int for files of 1 GiB or more
+            final long maxDiff = this.mappedFileSize * 2L;
+            if (diff > maxDiff) {
+                return false;
+            }
+        }
+
+        // Work on a snapshot and walk it backwards. The original used listIterator(), which
+        // starts at index 0 so hasPrevious() was never true, and CopyOnWriteArrayList
+        // iterators do not support remove().
+        List<MappedFile> snapshot = new ArrayList<MappedFile>(this.mappedFiles);
+        List<MappedFile> toRemove = new ArrayList<MappedFile>();
+
+        for (int i = snapshot.size() - 1; i >= 0; i--) {
+            MappedFile candidate = snapshot.get(i);
+            if (offset >= candidate.getFileFromOffset()) {
+                int where = (int) (offset % candidate.getFileSize());
+                candidate.setFlushedPosition(where);
+                candidate.setWrotePosition(where);
+                candidate.setCommittedPosition(where);
+                break;
+            }
+            toRemove.add(candidate);
+        }
+
+        if (!toRemove.isEmpty()) {
+            this.mappedFiles.removeAll(toRemove);
+        }
+        return true;
+    }
+
+    /**
+     * Reports the start offset of the first file in {@code mappedFiles}.
+     *
+     * @return the first file's {@code getFileFromOffset()}, or {@code -1} when the list
+     *         is empty or the read fails
+     */
+    public long getMinOffset() {
+        if (this.mappedFiles.isEmpty()) {
+            return -1;
+        }
+
+        try {
+            return this.mappedFiles.get(0).getFileFromOffset();
+        } catch (IndexOutOfBoundsException e) {
+            // the list became empty after the check; report "no offset" below
+        } catch (Exception e) {
+            log.error("getMinOffset has exception.", e);
+        }
+        return -1;
+    }
+
+
+    /**
+     * Computes the last file's start offset plus its read position.
+     *
+     * @return that sum, or {@code 0} when there is no last file
+     */
+    public long getMaxOffset() {
+        final MappedFile last = getLastMappedFile();
+        if (last == null) {
+            return 0;
+        }
+        return last.getFileFromOffset() + last.getReadPosition();
+    }
+
+    /**
+     * Computes the last file's start offset plus its wrote position.
+     *
+     * @return that sum, or {@code 0} when there is no last file
+     */
+    public long getMaxWrotePosition() {
+        final MappedFile last = getLastMappedFile();
+        if (last == null) {
+            return 0;
+        }
+        return last.getFileFromOffset() + last.getWrotePosition();
+    }
+
+    /**
+     * @return {@code getMaxWrotePosition()} minus {@code committedWhere}
+     */
+    public long remainHowManyDataToCommit() {
+        final long maxWrote = getMaxWrotePosition();
+        return maxWrote - committedWhere;
+    }
+
+    /**
+     * @return {@code getMaxOffset()} minus {@code flushedWhere}
+     */
+    public long remainHowManyDataToFlush() {
+        final long maxOffset = getMaxOffset();
+        return maxOffset - flushedWhere;
+    }
+
+    /**
+     * Destroys the last mapped file (with a 1000 interval argument), removes it from
+     * {@code mappedFiles} and logs the file name. Does nothing when there is no last file.
+     */
+    public void deleteLastMappedFile() {
+        final MappedFile tail = getLastMappedFile();
+        if (tail == null) {
+            return;
+        }
+
+        tail.destroy(1000);
+        this.mappedFiles.remove(tail);
+        log.info("on recover, destroy a logic mapped file " + tail.getFileName());
+    }
+
+    /**
+     * Destroys files whose last-modified timestamp plus {@code expiredTime} is not in the
+     * future, or every candidate when {@code cleanImmediately} is set.
+     * <p>
+     * Works on the snapshot returned by {@code copyMappedFiles(0)}; the last element of
+     * the snapshot is never considered. Processing stops at the first file that cannot be
+     * destroyed or once {@code DELETE_FILES_BATCH_MAX} files have been destroyed. The
+     * destroyed files are then handed to {@code deleteExpiredFile}.
+     *
+     * @param expiredTime         added to each file's last-modified timestamp to get its deadline
+     * @param deleteFilesInterval milliseconds to sleep after each destroyed file (skipped
+     *                            when not positive or after the last candidate)
+     * @param intervalForcibly    passed through to {@code MappedFile.destroy}
+     * @param cleanImmediately    when {@code true} the deadline is ignored
+     * @return the number of files destroyed
+     */
+    public int deleteExpiredFileByTime(final long expiredTime,
+                                       final int deleteFilesInterval,
+                                       final long intervalForcibly,
+                                       final boolean cleanImmediately) {
+        final Object[] snapshot = this.copyMappedFiles(0);
+        if (snapshot == null) {
+            return 0;
+        }
+
+        // The last file in the snapshot is excluded from the scan.
+        final int candidateCount = snapshot.length - 1;
+        final List<MappedFile> destroyed = new ArrayList<MappedFile>();
+        int deleteCount = 0;
+
+        for (int idx = 0; idx < candidateCount; idx++) {
+            final MappedFile candidate = (MappedFile) snapshot[idx];
+            final long liveMaxTimestamp = candidate.getLastModifiedTimestamp() + expiredTime;
+            final boolean due = System.currentTimeMillis() >= liveMaxTimestamp || cleanImmediately;
+            if (!due) {
+                continue;
+            }
+
+            if (!candidate.destroy(intervalForcibly)) {
+                break;
+            }
+
+            destroyed.add(candidate);
+            deleteCount++;
+
+            if (destroyed.size() >= DELETE_FILES_BATCH_MAX) {
+                break;
+            }
+
+            final boolean moreCandidates = (idx + 1) < candidateCount;
+            if (deleteFilesInterval > 0 && moreCandidates) {
+                try {
+                    Thread.sleep(deleteFilesInterval);
+                } catch (InterruptedException e) {
+                    // interruption only cuts the pause short
+                }
+            }
+        }
+
+        deleteExpiredFile(destroyed);
+
+        return deleteCount;
+    }
+
+
+    /**
+     * Destroys leading files whose last unit refers to an offset below {@code offset}.
+     * <p>
+     * For every file of the {@code copyMappedFiles(0)} snapshot except the last one, the
+     * long stored at position {@code mappedFileSize - unitSize} is read; the file is
+     * destroyed when that value is smaller than {@code offset}. The scan stops at the
+     * first file that is not eligible, cannot be read, or cannot be destroyed. The
+     * destroyed files are then handed to {@code deleteExpiredFile}.
+     *
+     * @param offset   threshold compared against the value read from each file
+     * @param unitSize size of one unit; locates the last unit of a file
+     * @return the number of files destroyed
+     */
+    public int deleteExpiredFileByOffset(long offset, int unitSize) {
+        final Object[] snapshot = this.copyMappedFiles(0);
+
+        final List<MappedFile> destroyed = new ArrayList<MappedFile>();
+        int deleteCount = 0;
+        if (snapshot != null) {
+            // The last file in the snapshot is excluded from the scan.
+            final int candidateCount = snapshot.length - 1;
+
+            for (int idx = 0; idx < candidateCount; idx++) {
+                final MappedFile candidate = (MappedFile) snapshot[idx];
+                final SelectMappedBufferResult lastUnit =
+                        candidate.selectMappedBuffer(this.mappedFileSize - unitSize);
+                if (lastUnit == null) {
+                    log.warn("this being not executed forever.");
+                    break;
+                }
+
+                final long maxOffsetInLogicQueue = lastUnit.getByteBuffer().getLong();
+                lastUnit.release();
+
+                if (maxOffsetInLogicQueue >= offset) {
+                    break;
+                }
+                log.info("physic min offset " + offset + ", logics in current mappedFile max offset "
+                        + maxOffsetInLogicQueue + ", delete it");
+
+                if (!candidate.destroy(1000 * 60)) {
+                    break;
+                }
+                destroyed.add(candidate);
+                deleteCount++;
+            }
+        }
+
+        deleteExpiredFile(destroyed);
+
+        return deleteCount;
+    }
+
+
+    /**
+     * Flushes the file that contains {@code flushedWhere} and advances {@code flushedWhere}
+     * to that file's start offset plus the position returned by its {@code flush}.
+     * When {@code flushLeastPages} is {@code 0}, {@code storeTimestamp} is set to the
+     * file's store timestamp as read before the flush.
+     *
+     * @param flushLeastPages passed through to {@code MappedFile.flush}
+     * @return {@code true} when no file was found or {@code flushedWhere} did not move,
+     *         {@code false} when it advanced
+     */
+    public boolean flush(final int flushLeastPages) {
+        final MappedFile target = this.findMappedFileByOffset(this.flushedWhere, false);
+        if (target == null) {
+            return true;
+        }
+
+        final long timestampBeforeFlush = target.getStoreTimestamp();
+        final int flushedPos = target.flush(flushLeastPages);
+        final long newWhere = target.getFileFromOffset() + flushedPos;
+        final boolean unchanged = newWhere == this.flushedWhere;
+        this.flushedWhere = newWhere;
+        if (flushLeastPages == 0) {
+            this.storeTimestamp = timestampBeforeFlush;
+        }
+
+        return unchanged;
+    }
+
+    /**
+     * Commits the file that contains {@code committedWhere} and advances
+     * {@code committedWhere} to that file's start offset plus the position returned by
+     * its {@code commit}.
+     *
+     * @param commitLeastPages passed through to {@code MappedFile.commit}
+     * @return {@code true} when no file was found or {@code committedWhere} did not move,
+     *         {@code false} when it advanced
+     */
+    public boolean commit(final int commitLeastPages) {
+        final MappedFile target = this.findMappedFileByOffset(this.committedWhere, false);
+        if (target == null) {
+            return true;
+        }
+
+        final int committedPos = target.commit(commitLeastPages);
+        final long newWhere = target.getFileFromOffset() + committedPos;
+        final boolean unchanged = newWhere == this.committedWhere;
+        this.committedWhere = newWhere;
+
+        return unchanged;
+    }
+
+
+    /**
+     * Locates the file that holds {@code offset}.
+     * <p>
+     * The list index is derived from the distance, in whole file sizes, between
+     * {@code offset} and the first file's start offset. An out-of-range index is logged
+     * with a stack trace; the subsequent failed list read is caught.
+     *
+     * @param offset                absolute offset to look up
+     * @param returnFirstOnNotFound when the indexed read fails, return the first file
+     *                              instead of {@code null}
+     * @return the matching file, the first file (see above), or {@code null}
+     */
+    public MappedFile findMappedFileByOffset(final long offset, final boolean returnFirstOnNotFound) {
+        try {
+            final MappedFile first = this.getFirstMappedFile();
+            if (first == null) {
+                return null;
+            }
+
+            final long requestedSlot = offset / this.mappedFileSize;
+            final long firstSlot = first.getFileFromOffset() / this.mappedFileSize;
+            final int index = (int) (requestedSlot - firstSlot);
+
+            final boolean outOfRange = index < 0 || index >= this.mappedFiles.size();
+            if (outOfRange) {
+                LOG_ERROR.warn("findMappedFileByOffset offset not matched, request Offset: {}, index: {}, mappedFileSize: {}, mappedFiles count: {}, StackTrace: {}",
+                        offset,
+                        index,
+                        this.mappedFileSize,
+                        this.mappedFiles.size(),
+                        UtilAll.currentStackTrace());
+            }
+
+            try {
+                return this.mappedFiles.get(index);
+            } catch (Exception e) {
+                // index did not resolve to a file; optionally fall back to the first one
+                if (returnFirstOnNotFound) {
+                    return first;
+                }
+            }
+        } catch (Exception e) {
+            log.error("findMappedFileByOffset Exception", e);
+        }
+
+        return null;
+    }
+
+
+    /**
+     * Fetches the file at the head of {@code mappedFiles}.
+     *
+     * @return the first mapped file, or {@code null} when the list is empty or the read fails
+     */
+    public MappedFile getFirstMappedFile() {
+        if (this.mappedFiles.isEmpty()) {
+            return null;
+        }
+
+        try {
+            return this.mappedFiles.get(0);
+        } catch (IndexOutOfBoundsException e) {
+            // the list became empty after the check
+        } catch (Exception e) {
+            log.error("getFirstMappedFile has exception.", e);
+        }
+
+        return null;
+    }
+
+    /**
+     * Convenience overload that delegates to {@code findMappedFileByOffset(offset, false)}.
+     *
+     * @param offset absolute offset to look up
+     * @return whatever the two-argument overload returns
+     */
+    public MappedFile findMappedFileByOffset(final long offset) {
+        return findMappedFileByOffset(offset, false);
+    }
+
+
+    /**
+     * Adds up {@code mappedFileSize} once for every file in the {@code copyMappedFiles(0)}
+     * snapshot that reports {@code isAvailable()}.
+     *
+     * @return the accumulated size, or {@code 0} when the snapshot is {@code null}
+     */
+    public long getMappedMemorySize() {
+        final Object[] snapshot = this.copyMappedFiles(0);
+        if (snapshot == null) {
+            return 0;
+        }
+
+        long total = 0;
+        for (int i = 0; i < snapshot.length; i++) {
+            final ReferenceResource resource = (ReferenceResource) snapshot[i];
+            if (resource.isAvailable()) {
+                total += this.mappedFileSize;
+            }
+        }
+
+        return total;
+    }
+
+
+    /**
+     * Tries again to destroy the first file when it is no longer available.
+     * <p>
+     * On success the file is handed to {@code deleteExpiredFile}.
+     *
+     * @param intervalForcibly passed through to {@code MappedFile.destroy}
+     * @return the result of {@code destroy}; {@code false} when there is no first file
+     *         or it is still available
+     */
+    public boolean retryDeleteFirstFile(final long intervalForcibly) {
+        final MappedFile first = this.getFirstMappedFile();
+        if (first == null || first.isAvailable()) {
+            return false;
+        }
+
+        log.warn("the mappedFile was destroyed once, but still alive, " + first.getFileName());
+        final boolean destroyed = first.destroy(intervalForcibly);
+        if (!destroyed) {
+            log.warn("the mappedFile re delete failed, " + first.getFileName());
+            return false;
+        }
+
+        log.info("the mappedFile re delete OK, " + first.getFileName());
+        final List<MappedFile> single = new ArrayList<MappedFile>();
+        single.add(first);
+        this.deleteExpiredFile(single);
+        return true;
+    }
+
+
+    /**
+     * Calls {@code shutdown(intervalForcibly)} on every file in {@code mappedFiles}.
+     *
+     * @param intervalForcibly passed through to each file's {@code shutdown}
+     */
+    public void shutdown(final long intervalForcibly) {
+        for (MappedFile mf : this.mappedFiles) {
+            mf.shutdown(intervalForcibly);
+        }
+    }
+
+
+    /**
+     * Destroys every file in {@code mappedFiles} (interval argument 3000), clears the
+     * list, resets {@code flushedWhere} to 0 and finally tries to delete the
+     * {@code storePath} directory.
+     * <p>
+     * {@code committedWhere} is not reset here, and the results of the individual
+     * {@code destroy} calls and of {@code File.delete()} are ignored.
+     */
+    public void destroy() {
+        for (MappedFile mf : this.mappedFiles) {
+            mf.destroy(1000 * 3);
+        }
+        this.mappedFiles.clear();
+        this.flushedWhere = 0;
+
+        // delete parent directory
+        File file = new File(storePath);
+        if (file.isDirectory()) {
+            file.delete();
+        }
+    }
+
+
+    /**
+     * @return the current value of {@code flushedWhere}
+     */
+    public long getFlushedWhere() {
+        return flushedWhere;
+    }
+
+
+    /**
+     * @param flushedWhere new value for the {@code flushedWhere} field
+     */
+    public void setFlushedWhere(long flushedWhere) {
+        this.flushedWhere = flushedWhere;
+    }
+
+
+    /**
+     * @return the current value of {@code storeTimestamp}
+     */
+    public long getStoreTimestamp() {
+        return storeTimestamp;
+    }
+
+
+    /**
+     * @return the internal {@code mappedFiles} list itself, not a copy
+     */
+    public List<MappedFile> getMappedFiles() {
+        return mappedFiles;
+    }
+
+
+    /**
+     * @return the current value of {@code mappedFileSize}
+     */
+    public int getMappedFileSize() {
+        return mappedFileSize;
+    }
+
+    /**
+     * @return the current value of {@code committedWhere}
+     */
+    public long getCommittedWhere() {
+        return committedWhere;
+    }
+
+    /**
+     * @param committedWhere new value for the {@code committedWhere} field
+     */
+    public void setCommittedWhere(final long committedWhere) {
+        this.committedWhere = committedWhere;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/MessageArrivingListener.java
----------------------------------------------------------------------
diff --git a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/MessageArrivingListener.java b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/MessageArrivingListener.java
new file mode 100644
index 0000000..6d227e8
--- /dev/null
+++ b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/MessageArrivingListener.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package com.alibaba.rocketmq.store;
+
+/**
+ * Listener with a single callback, {@link #arriving}.
+ * <p>
+ * NOTE(review): presumably the store invokes this when a message becomes available
+ * in a queue; confirm against the call sites.
+ */
+public interface MessageArrivingListener {
+    /**
+     * @param topic       topic name
+     * @param queueId     queue id
+     * @param logicOffset logic offset (presumably the offset within the queue; confirm)
+     * @param tagsCode    tags code of the message
+     */
+    void arriving(String topic, int queueId, long logicOffset, long tagsCode);
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/MessageExtBrokerInner.java
----------------------------------------------------------------------
diff --git a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/MessageExtBrokerInner.java b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/MessageExtBrokerInner.java
new file mode 100644
index 0000000..1c54956
--- /dev/null
+++ b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/MessageExtBrokerInner.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.store;
+
+import com.alibaba.rocketmq.common.TopicFilterType;
+import com.alibaba.rocketmq.common.message.MessageExt;
+
+
+/**
+ * A {@link MessageExt} that additionally carries a properties string and a tags code.
+ *
+ * @author shijia.wxr
+ */
+public class MessageExtBrokerInner extends MessageExt {
+    private static final long serialVersionUID = 7256001576878700634L;
+    private String propertiesString;
+    private long tagsCode;
+
+    /**
+     * Derives a tags code from a tags string.
+     *
+     * @param filter not consulted by this implementation
+     * @param tags   tags string, may be {@code null}
+     * @return {@code 0} for a {@code null} or empty string, otherwise {@code tags.hashCode()}
+     */
+    public static long tagsString2tagsCode(final TopicFilterType filter, final String tags) {
+        final boolean noTags = tags == null || tags.isEmpty();
+        if (noTags) {
+            return 0;
+        }
+        return tags.hashCode();
+    }
+
+
+    /**
+     * @return the stored properties string
+     */
+    public String getPropertiesString() {
+        return this.propertiesString;
+    }
+
+
+    /**
+     * @param propertiesString properties string to store
+     */
+    public void setPropertiesString(String propertiesString) {
+        this.propertiesString = propertiesString;
+    }
+
+
+    /**
+     * @return the stored tags code
+     */
+    public long getTagsCode() {
+        return this.tagsCode;
+    }
+
+
+    /**
+     * @param tagsCode tags code to store
+     */
+    public void setTagsCode(long tagsCode) {
+        this.tagsCode = tagsCode;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/MessageFilter.java
----------------------------------------------------------------------
diff --git a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/MessageFilter.java b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/MessageFilter.java
new file mode 100644
index 0000000..a10e607
--- /dev/null
+++ b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/MessageFilter.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.store;
+
+import com.alibaba.rocketmq.common.protocol.heartbeat.SubscriptionData;
+
+
+/**
+ * Predicate over a subscription and a message's tags code.
+ *
+ * @author shijia.wxr
+ */
+public interface MessageFilter {
+    /**
+     * @param subscriptionData subscription to test against
+     * @param tagsCode         tags code of the message; a boxed {@code Long}
+     *                         (NOTE(review): whether {@code null} is permitted is not
+     *                         visible here; confirm with implementers)
+     * @return {@code true} if the message matches
+     */
+    boolean isMessageMatched(final SubscriptionData subscriptionData, final Long tagsCode);
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/MessageStore.java
----------------------------------------------------------------------
diff --git a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/MessageStore.java b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/MessageStore.java
new file mode 100644
index 0000000..30f7bf7
--- /dev/null
+++ b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/MessageStore.java
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.store;
+
+import com.alibaba.rocketmq.common.message.MessageExt;
+import com.alibaba.rocketmq.common.protocol.heartbeat.SubscriptionData;
+
+import java.util.HashMap;
+import java.util.Set;
+
+
+/**
+ * Contract of the message store: lifecycle ({@code load}/{@code start}/{@code shutdown}/
+ * {@code destroy}), writing ({@code putMessage}, {@code appendToCommitLog}), reading
+ * and querying, offset/timestamp lookups, and housekeeping operations.
+ * <p>
+ * NOTE(review): several method names are misspelled ({@code getMaxOffsetInQuque},
+ * {@code getMinOffsetInQuque}, {@code excuteDeleteFilesManualy}); they are part of the
+ * interface, so renaming them would break every implementer and caller.
+ *
+ * @author shijia.wxr
+ */
+public interface MessageStore {
+
+    boolean load();
+
+
+    void start() throws Exception;
+
+
+    void shutdown();
+
+
+    void destroy();
+
+    PutMessageResult putMessage(final MessageExtBrokerInner msg);
+
+
+    GetMessageResult getMessage(final String group, final String topic, final int queueId,
+                                final long offset, final int maxMsgNums, final SubscriptionData subscriptionData);
+
+
+    long getMaxOffsetInQuque(final String topic, final int queueId);
+
+
+    long getMinOffsetInQuque(final String topic, final int queueId);
+
+
+    long getCommitLogOffsetInQueue(final String topic, final int queueId, final long cqOffset);
+
+
+    long getOffsetInQueueByTime(final String topic, final int queueId, final long timestamp);
+
+
+    MessageExt lookMessageByOffset(final long commitLogOffset);
+
+
+    SelectMappedBufferResult selectOneMessageByOffset(final long commitLogOffset);
+
+
+    SelectMappedBufferResult selectOneMessageByOffset(final long commitLogOffset, final int msgSize);
+
+    String getRunningDataInfo();
+
+
+    HashMap<String, String> getRuntimeInfo();
+
+
+    long getMaxPhyOffset();
+
+
+    long getMinPhyOffset();
+
+
+    long getEarliestMessageTime(final String topic, final int queueId);
+    long getEarliestMessageTime();
+
+
+    long getMessageStoreTimeStamp(final String topic, final int queueId, final long offset);
+
+
+    long getMessageTotalInQueue(final String topic, final int queueId);
+
+    SelectMappedBufferResult getCommitLogData(final long offset);
+
+
+    boolean appendToCommitLog(final long startOffset, final byte[] data);
+
+    void excuteDeleteFilesManualy();
+
+
+    QueryMessageResult queryMessage(final String topic, final String key, final int maxNum,
+                                    final long begin, final long end);
+
+
+    void updateHaMasterAddress(final String newAddr);
+
+
+    long slaveFallBehindMuch();
+
+
+    long now();
+
+
+    int cleanUnusedTopic(final Set<String> topics);
+
+
+    void cleanExpiredConsumerQueue();
+
+
+    boolean checkInDiskByConsumeOffset(final String topic, final int queueId, long consumeOffset);
+
+
+    long dispatchBehindBytes();
+
+    long flush();
+
+    boolean resetWriteOffset(long phyOffset);
+
+    long getConfirmOffset();
+
+    void setConfirmOffset(long phyOffset);
+
+    boolean isOSPageCacheBusy();
+
+    long lockTimeMills();
+
+    boolean isTransientStorePoolDeficient();
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/PutMessageResult.java
----------------------------------------------------------------------
diff --git a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/PutMessageResult.java b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/PutMessageResult.java
new file mode 100644
index 0000000..b36ae58
--- /dev/null
+++ b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/PutMessageResult.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.store;
+
+/**
+ * Pairs a {@link PutMessageStatus} with the {@link AppendMessageResult} of a put operation.
+ *
+ * @author shijia.wxr
+ */
+public class PutMessageResult {
+    private PutMessageStatus putMessageStatus;
+    private AppendMessageResult appendMessageResult;
+
+
+    /**
+     * @param putMessageStatus    status of the put
+     * @param appendMessageResult result of the append, may be {@code null}
+     */
+    public PutMessageResult(PutMessageStatus putMessageStatus, AppendMessageResult appendMessageResult) {
+        this.putMessageStatus = putMessageStatus;
+        this.appendMessageResult = appendMessageResult;
+    }
+
+
+    /**
+     * @return {@code true} only when an append result is present and it reports
+     *         {@code isOk()}; the status field is not consulted
+     */
+    public boolean isOk() {
+        if (this.appendMessageResult == null) {
+            return false;
+        }
+        return this.appendMessageResult.isOk();
+    }
+
+
+    /**
+     * @return the append result, possibly {@code null}
+     */
+    public AppendMessageResult getAppendMessageResult() {
+        return this.appendMessageResult;
+    }
+
+
+    /**
+     * @param appendMessageResult new append result
+     */
+    public void setAppendMessageResult(AppendMessageResult appendMessageResult) {
+        this.appendMessageResult = appendMessageResult;
+    }
+
+
+    /**
+     * @return the put status
+     */
+    public PutMessageStatus getPutMessageStatus() {
+        return this.putMessageStatus;
+    }
+
+
+    /**
+     * @param putMessageStatus new put status
+     */
+    public void setPutMessageStatus(PutMessageStatus putMessageStatus) {
+        this.putMessageStatus = putMessageStatus;
+    }
+
+
+    @Override
+    public String toString() {
+        final StringBuilder text = new StringBuilder("PutMessageResult [putMessageStatus=");
+        text.append(putMessageStatus);
+        text.append(", appendMessageResult=");
+        text.append(appendMessageResult);
+        text.append("]");
+        return text.toString();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/PutMessageStatus.java
----------------------------------------------------------------------
diff --git a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/PutMessageStatus.java b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/PutMessageStatus.java
new file mode 100644
index 0000000..4758789
--- /dev/null
+++ b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/PutMessageStatus.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.store;
+
+/**
+ * Status codes carried by {@code PutMessageResult}.
+ * <p>
+ * NOTE(review): {@code CREATE_MAPEDFILE_FAILED} is spelled with a single "P"; the
+ * constant name is part of the public API and must not be renamed casually.
+ *
+ * @author shijia.wxr
+ */
+public enum PutMessageStatus {
+    PUT_OK,
+    FLUSH_DISK_TIMEOUT,
+    FLUSH_SLAVE_TIMEOUT,
+    SLAVE_NOT_AVAILABLE,
+    SERVICE_NOT_AVAILABLE,
+    CREATE_MAPEDFILE_FAILED,
+    MESSAGE_ILLEGAL,
+    PROPERTIES_SIZE_EXCEEDED,
+    OS_PAGECACHE_BUSY,
+    UNKNOWN_ERROR,
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/QueryMessageResult.java
----------------------------------------------------------------------
diff --git a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/QueryMessageResult.java b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/QueryMessageResult.java
new file mode 100644
index 0000000..a8e3fed
--- /dev/null
+++ b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/QueryMessageResult.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.store;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+
+/**
+ * Accumulates the buffers found by a message query, together with two index
+ * bookkeeping values.
+ * <p>
+ * Each added {@code SelectMappedBufferResult} is remembered so that {@link #release()}
+ * can release all of them; the class itself performs no synchronization.
+ *
+ * @author shijia.wxr
+ */
+public class QueryMessageResult {
+
+    // Every result passed to addMessage, kept so release() can release them.
+    private final List<SelectMappedBufferResult> messageMapedList =
+            new ArrayList<SelectMappedBufferResult>(100);
+
+    // The ByteBuffer of each added result, in insertion order.
+    private final List<ByteBuffer> messageBufferList = new ArrayList<ByteBuffer>(100);
+    private long indexLastUpdateTimestamp;
+    private long indexLastUpdatePhyoffset;
+
+    // Running sum of getSize() over all added results.
+    private int bufferTotalSize = 0;
+
+
+    /**
+     * Records one result: stores it, stores its byte buffer, and adds its size to the total.
+     *
+     * @param mapedBuffer result to record
+     */
+    public void addMessage(final SelectMappedBufferResult mapedBuffer) {
+        this.messageMapedList.add(mapedBuffer);
+        this.messageBufferList.add(mapedBuffer.getByteBuffer());
+        this.bufferTotalSize += mapedBuffer.getSize();
+    }
+
+
+    /**
+     * Calls {@code release()} on every recorded result. The lists are not cleared.
+     */
+    public void release() {
+        for (SelectMappedBufferResult select : this.messageMapedList) {
+            select.release();
+        }
+    }
+
+
+    public long getIndexLastUpdateTimestamp() {
+        return indexLastUpdateTimestamp;
+    }
+
+
+    public void setIndexLastUpdateTimestamp(long indexLastUpdateTimestamp) {
+        this.indexLastUpdateTimestamp = indexLastUpdateTimestamp;
+    }
+
+
+    public long getIndexLastUpdatePhyoffset() {
+        return indexLastUpdatePhyoffset;
+    }
+
+
+    public void setIndexLastUpdatePhyoffset(long indexLastUpdatePhyoffset) {
+        this.indexLastUpdatePhyoffset = indexLastUpdatePhyoffset;
+    }
+
+
+    /**
+     * @return the internal buffer list itself, not a copy
+     */
+    public List<ByteBuffer> getMessageBufferList() {
+        return messageBufferList;
+    }
+
+
+    /**
+     * @return the sum of the sizes of all recorded results
+     */
+    public int getBufferTotalSize() {
+        return bufferTotalSize;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/ReferenceResource.java
----------------------------------------------------------------------
diff --git a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/ReferenceResource.java b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/ReferenceResource.java
new file mode 100644
index 0000000..7a50f3a
--- /dev/null
+++ b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/ReferenceResource.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.store;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+
+/**
+ * Base class for a reference-counted resource with a deferred cleanup step.
+ * <p>
+ * The counter starts at 1. {@link #hold()} takes a reference, {@link #release()} gives
+ * one back, and {@link #cleanup(long)} is invoked from {@code release()} once the
+ * counter is no longer positive. {@link #shutdown(long)} marks the resource unavailable
+ * and gives back one reference.
+ *
+ * @author shijia.wxr
+ */
+public abstract class ReferenceResource {
+    // Starts at 1; shutdown() performs the release() that balances this initial count.
+    protected final AtomicLong refCount = new AtomicLong(1);
+    protected volatile boolean available = true;
+    // Result of the most recent cleanup(...) call made from release().
+    protected volatile boolean cleanupOver = false;
+    // Wall-clock time of the first shutdown() call; 0 until then.
+    private volatile long firstShutdownTimestamp = 0;
+
+
+    /**
+     * Tries to take a reference.
+     *
+     * @return {@code true} if the resource is available and the counter was positive
+     *         before the increment; otherwise {@code false} (an increment made on a
+     *         non-positive counter is undone)
+     */
+    public synchronized boolean hold() {
+        if (this.isAvailable()) {
+            if (this.refCount.getAndIncrement() > 0) {
+                return true;
+            } else {
+                this.refCount.getAndDecrement();
+            }
+        }
+
+        return false;
+    }
+
+
+    public boolean isAvailable() {
+        return this.available;
+    }
+
+
+
+    /**
+     * Marks the resource unavailable and releases one reference.
+     * <p>
+     * The first call flips {@code available}, records the time and releases once. A later
+     * call made while the counter is still positive forces the counter negative
+     * ({@code -1000 - current}) and releases again, but only once at least
+     * {@code intervalForcibly} milliseconds have passed since the first call.
+     *
+     * @param intervalForcibly minimum age in milliseconds of the first shutdown before
+     *                         outstanding references are overridden
+     */
+    public void shutdown(final long intervalForcibly) {
+        if (this.available) {
+            this.available = false;
+            this.firstShutdownTimestamp = System.currentTimeMillis();
+            this.release();
+        }
+
+        else if (this.getRefCount() > 0) {
+            if ((System.currentTimeMillis() - this.firstShutdownTimestamp) >= intervalForcibly) {
+                this.refCount.set(-1000 - this.getRefCount());
+                this.release();
+            }
+        }
+    }
+
+    /**
+     * Gives back one reference. When the counter is not positive afterwards,
+     * {@link #cleanup(long)} is run while holding this object's monitor and its result
+     * is stored in {@code cleanupOver}.
+     */
+    public void release() {
+        long value = this.refCount.decrementAndGet();
+        if (value > 0)
+            return;
+
+        synchronized (this) {
+
+            this.cleanupOver = this.cleanup(value);
+        }
+    }
+
+    public long getRefCount() {
+        return this.refCount.get();
+    }
+
+    /**
+     * Performs the actual cleanup; called from {@link #release()}.
+     *
+     * @param currentRef counter value observed right after the decrement (zero or negative)
+     * @return value to store in {@code cleanupOver}
+     */
+    public abstract boolean cleanup(final long currentRef);
+
+
+    /**
+     * @return {@code true} when the counter is not positive and the last cleanup
+     *         returned {@code true}
+     */
+    public boolean isCleanupOver() {
+        return this.refCount.get() <= 0 && this.cleanupOver;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/RunningFlags.java
----------------------------------------------------------------------
diff --git a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/RunningFlags.java b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/RunningFlags.java
new file mode 100644
index 0000000..9343ed9
--- /dev/null
+++ b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/RunningFlags.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.store;
+
+/**
+ * Bit set of store status flags (not readable, not writeable, logic-queue write error,
+ * index-file write error, disk full) kept in a single {@code volatile int}.
+ * <p>
+ * Queries are plain volatile reads. Every method that changes the bits is
+ * {@code synchronized}, because {@code flagBits |= x} and {@code flagBits &= x} are
+ * read-modify-write sequences that {@code volatile} alone does not make atomic: without the
+ * lock, two threads updating different flags at the same time could lose one of the updates.
+ *
+ * @author shijia.wxr
+ */
+public class RunningFlags {
+
+    private static final int NOT_READABLE_BIT = 1;
+
+    private static final int NOT_WRITEABLE_BIT = 1 << 1;
+
+    private static final int WRITE_LOGICS_QUEUE_ERROR_BIT = 1 << 2;
+
+    private static final int WRITE_INDEX_FILE_ERROR_BIT = 1 << 3;
+
+    private static final int DISK_FULL_BIT = 1 << 4;
+
+    /** Any one of these bits being set makes {@link #isWriteable()} return {@code false}. */
+    private static final int WRITE_BLOCKING_MASK =
+        NOT_WRITEABLE_BIT | WRITE_LOGICS_QUEUE_ERROR_BIT | DISK_FULL_BIT | WRITE_INDEX_FILE_ERROR_BIT;
+
+    private volatile int flagBits = 0;
+
+
+    public RunningFlags() {
+    }
+
+
+    /**
+     * @return the raw flag bits
+     */
+    public int getFlagBits() {
+        return flagBits;
+    }
+
+
+    /**
+     * Clears the not-readable bit if it is set.
+     *
+     * @return whether the store was readable before this call
+     */
+    public synchronized boolean getAndMakeReadable() {
+        final boolean wasReadable = this.isReadable();
+        if (!wasReadable) {
+            this.flagBits &= ~NOT_READABLE_BIT;
+        }
+        return wasReadable;
+    }
+
+
+    /**
+     * @return {@code true} when the not-readable bit is clear
+     */
+    public boolean isReadable() {
+        return (this.flagBits & NOT_READABLE_BIT) == 0;
+    }
+
+
+    /**
+     * Sets the not-readable bit if it is clear.
+     *
+     * @return whether the store was readable before this call
+     */
+    public synchronized boolean getAndMakeNotReadable() {
+        final boolean wasReadable = this.isReadable();
+        if (wasReadable) {
+            this.flagBits |= NOT_READABLE_BIT;
+        }
+        return wasReadable;
+    }
+
+
+    /**
+     * Clears the not-writeable bit when the store is currently not writeable. The error and
+     * disk-full bits are left untouched, so the store may still be non-writeable afterwards.
+     *
+     * @return whether the store was writeable before this call
+     */
+    public synchronized boolean getAndMakeWriteable() {
+        final boolean wasWriteable = this.isWriteable();
+        if (!wasWriteable) {
+            this.flagBits &= ~NOT_WRITEABLE_BIT;
+        }
+        return wasWriteable;
+    }
+
+
+    /**
+     * @return {@code true} when none of the not-writeable, logic-queue error, index-file error
+     *         and disk-full bits is set
+     */
+    public boolean isWriteable() {
+        return (this.flagBits & WRITE_BLOCKING_MASK) == 0;
+    }
+
+
+    /**
+     * Sets the not-writeable bit when the store is currently writeable.
+     *
+     * @return whether the store was writeable before this call
+     */
+    public synchronized boolean getAndMakeNotWriteable() {
+        final boolean wasWriteable = this.isWriteable();
+        if (wasWriteable) {
+            this.flagBits |= NOT_WRITEABLE_BIT;
+        }
+        return wasWriteable;
+    }
+
+
+    /**
+     * Sets the logic-queue write error bit. Nothing in this class clears it again.
+     */
+    public synchronized void makeLogicsQueueError() {
+        this.flagBits |= WRITE_LOGICS_QUEUE_ERROR_BIT;
+    }
+
+
+    /**
+     * @return {@code true} when the logic-queue write error bit is set
+     */
+    public boolean isLogicsQueueError() {
+        return (this.flagBits & WRITE_LOGICS_QUEUE_ERROR_BIT) == WRITE_LOGICS_QUEUE_ERROR_BIT;
+    }
+
+
+    /**
+     * Sets the index-file write error bit. Nothing in this class clears it again.
+     */
+    public synchronized void makeIndexFileError() {
+        this.flagBits |= WRITE_INDEX_FILE_ERROR_BIT;
+    }
+
+
+    /**
+     * @return {@code true} when the index-file write error bit is set
+     */
+    public boolean isIndexFileError() {
+        return (this.flagBits & WRITE_INDEX_FILE_ERROR_BIT) == WRITE_INDEX_FILE_ERROR_BIT;
+    }
+
+
+    /**
+     * Sets the disk-full bit.
+     *
+     * @return {@code true} when the disk-full bit was NOT set before this call
+     */
+    public synchronized boolean getAndMakeDiskFull() {
+        final boolean wasDiskOk = (this.flagBits & DISK_FULL_BIT) != DISK_FULL_BIT;
+        this.flagBits |= DISK_FULL_BIT;
+        return wasDiskOk;
+    }
+
+
+    /**
+     * Clears the disk-full bit.
+     *
+     * @return {@code true} when the disk-full bit was NOT set before this call
+     */
+    public synchronized boolean getAndMakeDiskOK() {
+        final boolean wasDiskOk = (this.flagBits & DISK_FULL_BIT) != DISK_FULL_BIT;
+        this.flagBits &= ~DISK_FULL_BIT;
+        return wasDiskOk;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/SelectMappedBufferResult.java
----------------------------------------------------------------------
diff --git a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/SelectMappedBufferResult.java b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/SelectMappedBufferResult.java
new file mode 100644
index 0000000..dba8072
--- /dev/null
+++ b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/SelectMappedBufferResult.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.store;
+
+import java.nio.ByteBuffer;
+
+
+/**
+ * Bundles a {@link ByteBuffer} with its start offset, its size and the {@link MappedFile}
+ * it refers to, so the caller can give the mapped file back through {@link #release()}.
+ *
+ * @author shijia.wxr
+ */
+public class SelectMappedBufferResult {
+
+    private final long startOffset;
+    private final ByteBuffer byteBuffer;
+    private int size;
+    private MappedFile mappedFile;
+
+
+    /**
+     * @param startOffset offset reported by {@link #getStartOffset()}
+     * @param byteBuffer  buffer exposed by {@link #getByteBuffer()}
+     * @param size        initial value reported by {@link #getSize()}
+     * @param mappedFile  file released by {@link #release()}; may be {@code null}, in which
+     *                    case {@code release()} does nothing
+     */
+    public SelectMappedBufferResult(long startOffset, ByteBuffer byteBuffer, int size, MappedFile mappedFile) {
+        this.mappedFile = mappedFile;
+        this.size = size;
+        this.byteBuffer = byteBuffer;
+        this.startOffset = startOffset;
+    }
+
+
+    public long getStartOffset() {
+        return startOffset;
+    }
+
+
+    public ByteBuffer getByteBuffer() {
+        return byteBuffer;
+    }
+
+
+    public MappedFile getMappedFile() {
+        return mappedFile;
+    }
+
+
+    public int getSize() {
+        return size;
+    }
+
+
+    /**
+     * Records the new size and applies it as the limit of the wrapped buffer.
+     *
+     * @param s new size, also used as the buffer limit
+     */
+    public void setSize(final int s) {
+        this.size = s;
+        this.byteBuffer.limit(this.size);
+    }
+
+
+    /**
+     * Releases the mapped file at most once: after the first call the reference is cleared,
+     * so further calls do nothing.
+     */
+    public synchronized void release() {
+        final MappedFile held = this.mappedFile;
+        if (held == null) {
+            return;
+        }
+        held.release();
+        this.mappedFile = null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/StoreCheckpoint.java
----------------------------------------------------------------------
diff --git a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/StoreCheckpoint.java b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/StoreCheckpoint.java
new file mode 100644
index 0000000..2d75fd5
--- /dev/null
+++ b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/StoreCheckpoint.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.store;
+
+import com.alibaba.rocketmq.common.UtilAll;
+import com.alibaba.rocketmq.common.constant.LoggerName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.channels.FileChannel.MapMode;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class StoreCheckpoint {
+    private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
+    private final RandomAccessFile randomAccessFile;
+    private final FileChannel fileChannel;
+    private final MappedByteBuffer mappedByteBuffer;
+    private volatile long physicMsgTimestamp = 0;
+    private volatile long logicsMsgTimestamp = 0;
+    private volatile long indexMsgTimestamp = 0;
+
+
+    public StoreCheckpoint(final String scpPath) throws IOException {
+        File file = new File(scpPath);
+        MappedFile.ensureDirOK(file.getParent());
+        boolean fileExists = file.exists();
+
+        this.randomAccessFile = new RandomAccessFile(file, "rw");
+        this.fileChannel = this.randomAccessFile.getChannel();
+        this.mappedByteBuffer = fileChannel.map(MapMode.READ_WRITE, 0, MappedFile.OS_PAGE_SIZE);
+
+        if (fileExists) {
+            log.info("store checkpoint file exists, " + scpPath);
+            this.physicMsgTimestamp = this.mappedByteBuffer.getLong(0);
+            this.logicsMsgTimestamp = this.mappedByteBuffer.getLong(8);
+            this.indexMsgTimestamp = this.mappedByteBuffer.getLong(16);
+
+            log.info("store checkpoint file physicMsgTimestamp " + this.physicMsgTimestamp + ", "
+                    + UtilAll.timeMillisToHumanString(this.physicMsgTimestamp));
+            log.info("store checkpoint file logicsMsgTimestamp " + this.logicsMsgTimestamp + ", "
+                    + UtilAll.timeMillisToHumanString(this.logicsMsgTimestamp));
+            log.info("store checkpoint file indexMsgTimestamp " + this.indexMsgTimestamp + ", "
+                    + UtilAll.timeMillisToHumanString(this.indexMsgTimestamp));
+        } else {
+            log.info("store checkpoint file not exists, " + scpPath);
+        }
+    }
+
+
+    public void shutdown() {
+        this.flush();
+
+        // unmap mappedByteBuffer
+        MappedFile.clean(this.mappedByteBuffer);
+
+        try {
+            this.fileChannel.close();
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+    }
+
+
+    public void flush() {
+        this.mappedByteBuffer.putLong(0, this.physicMsgTimestamp);
+        this.mappedByteBuffer.putLong(8, this.logicsMsgTimestamp);
+        this.mappedByteBuffer.putLong(16, this.indexMsgTimestamp);
+        this.mappedByteBuffer.force();
+    }
+
+
+    public long getPhysicMsgTimestamp() {
+        return physicMsgTimestamp;
+    }
+
+
+    public void setPhysicMsgTimestamp(long physicMsgTimestamp) {
+        this.physicMsgTimestamp = physicMsgTimestamp;
+    }
+
+
+    public long getLogicsMsgTimestamp() {
+        return logicsMsgTimestamp;
+    }
+
+
+    public void setLogicsMsgTimestamp(long logicsMsgTimestamp) {
+        this.logicsMsgTimestamp = logicsMsgTimestamp;
+    }
+
+
+    public long getMinTimestampIndex() {
+        return Math.min(this.getMinTimestamp(), this.indexMsgTimestamp);
+    }
+
+
+    public long getMinTimestamp() {
+        long min = Math.min(this.physicMsgTimestamp, this.logicsMsgTimestamp);
+
+
+        // fixed https://github.com/alibaba/RocketMQ/issues/467
+        min -= 1000 * 3;
+        if (min < 0)
+            min = 0;
+
+        return min;
+    }
+
+
+    public long getIndexMsgTimestamp() {
+        return indexMsgTimestamp;
+    }
+
+
+    public void setIndexMsgTimestamp(long indexMsgTimestamp) {
+        this.indexMsgTimestamp = indexMsgTimestamp;
+    }
+
+}


Mime
View raw message