rocketmq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dongefore...@apache.org
Subject [32/51] [abbrv] incubator-rocketmq git commit: Add javadoc to message store.
Date Tue, 06 Jun 2017 03:38:52 GMT
Add javadoc to message store.


Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/482def9e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/482def9e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/482def9e

Branch: refs/heads/master
Commit: 482def9e6a9032111d259f2ac82d37e5111d9b8b
Parents: 06416b0
Author: Zhanhui Li <lizhanhui@apache.org>
Authored: Sun May 7 23:07:03 2017 +0800
Committer: dongeforever <zhendongliu92@yeah.net>
Committed: Tue Jun 6 11:37:29 2017 +0800

----------------------------------------------------------------------
 .../broker/client/net/Broker2Client.java        |   2 +-
 .../longpolling/PullRequestHoldService.java     |   4 +-
 .../broker/offset/ConsumerOffsetManager.java    |   4 +-
 .../plugin/AbstractPluginMessageStore.java      |  20 +-
 .../broker/processor/AdminBrokerProcessor.java  |  16 +-
 .../processor/ConsumerManageProcessor.java      |   2 +-
 .../rocketmq/store/DefaultMessageStore.java     |  16 +-
 .../org/apache/rocketmq/store/MessageStore.java | 239 ++++++++++++++++++-
 .../store/schedule/ScheduleMessageService.java  |   2 +-
 9 files changed, 259 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/482def9e/broker/src/main/java/org/apache/rocketmq/broker/client/net/Broker2Client.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/net/Broker2Client.java
b/broker/src/main/java/org/apache/rocketmq/broker/client/net/Broker2Client.java
index c00898c..863da62 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/client/net/Broker2Client.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/client/net/Broker2Client.java
@@ -149,7 +149,7 @@ public class Broker2Client {
             long timeStampOffset;
             if (timeStamp == -1) {
 
-                timeStampOffset = this.brokerController.getMessageStore().getMaxOffsetInQuque(topic,
i);
+                timeStampOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic,
i);
             } else {
                 timeStampOffset = this.brokerController.getMessageStore().getOffsetInQueueByTime(topic,
i, timeStamp);
             }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/482def9e/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequestHoldService.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequestHoldService.java
b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequestHoldService.java
index 1a53db1..71f56a4 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequestHoldService.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequestHoldService.java
@@ -98,7 +98,7 @@ public class PullRequestHoldService extends ServiceThread {
             if (2 == kArray.length) {
                 String topic = kArray[0];
                 int queueId = Integer.parseInt(kArray[1]);
-                final long offset = this.brokerController.getMessageStore().getMaxOffsetInQuque(topic,
queueId);
+                final long offset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic,
queueId);
                 try {
                     this.notifyMessageArriving(topic, queueId, offset);
                 } catch (Throwable e) {
@@ -124,7 +124,7 @@ public class PullRequestHoldService extends ServiceThread {
                 for (PullRequest request : requestList) {
                     long newestOffset = maxOffset;
                     if (newestOffset <= request.getPullFromThisOffset()) {
-                        newestOffset = this.brokerController.getMessageStore().getMaxOffsetInQuque(topic,
queueId);
+                        newestOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic,
queueId);
                     }
 
                     if (newestOffset > request.getPullFromThisOffset()) {

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/482def9e/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java
b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java
index bdcf30c..769c4ad 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java
@@ -73,7 +73,7 @@ public class ConsumerOffsetManager extends ConfigManager {
 
         while (it.hasNext() && result) {
             Entry<Integer, Long> next = it.next();
-            long minOffsetInStore = this.brokerController.getMessageStore().getMinOffsetInQuque(topic,
next.getKey());
+            long minOffsetInStore = this.brokerController.getMessageStore().getMinOffsetInQueue(topic,
next.getKey());
             long offsetInPersist = next.getValue();
             result = offsetInPersist <= minOffsetInStore;
         }
@@ -201,7 +201,7 @@ public class ConsumerOffsetManager extends ConfigManager {
             String[] topicGroupArr = topicGroup.split(TOPIC_GROUP_SEPARATOR);
             if (topic.equals(topicGroupArr[0])) {
                 for (Entry<Integer, Long> entry : offSetEntry.getValue().entrySet())
{
-                    long minOffset = this.brokerController.getMessageStore().getMinOffsetInQuque(topic,
entry.getKey());
+                    long minOffset = this.brokerController.getMessageStore().getMinOffsetInQueue(topic,
entry.getKey());
                     if (entry.getValue() >= minOffset) {
                         Long offset = queueMinOffset.get(entry.getKey());
                         if (offset == null) {

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/482def9e/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java
b/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java
index 8ded973..690f70b 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java
@@ -92,18 +92,18 @@ public abstract class AbstractPluginMessageStore implements MessageStore
{
     }
 
     @Override
-    public long getMaxOffsetInQuque(String topic, int queueId) {
-        return next.getMaxOffsetInQuque(topic, queueId);
+    public long getMaxOffsetInQueue(String topic, int queueId) {
+        return next.getMaxOffsetInQueue(topic, queueId);
     }
 
     @Override
-    public long getMinOffsetInQuque(String topic, int queueId) {
-        return next.getMinOffsetInQuque(topic, queueId);
+    public long getMinOffsetInQueue(String topic, int queueId) {
+        return next.getMinOffsetInQueue(topic, queueId);
     }
 
     @Override
-    public long getCommitLogOffsetInQueue(String topic, int queueId, long cqOffset) {
-        return next.getCommitLogOffsetInQueue(topic, queueId, cqOffset);
+    public long getCommitLogOffsetInQueue(String topic, int queueId, long consumeQueueOffset)
{
+        return next.getCommitLogOffsetInQueue(topic, queueId, consumeQueueOffset);
     }
 
     @Override
@@ -152,8 +152,8 @@ public abstract class AbstractPluginMessageStore implements MessageStore
{
     }
 
     @Override
-    public long getMessageStoreTimeStamp(String topic, int queueId, long offset) {
-        return next.getMessageStoreTimeStamp(topic, queueId, offset);
+    public long getMessageStoreTimeStamp(String topic, int queueId, long consumeQueueOffset)
{
+        return next.getMessageStoreTimeStamp(topic, queueId, consumeQueueOffset);
     }
 
     @Override
@@ -172,8 +172,8 @@ public abstract class AbstractPluginMessageStore implements MessageStore
{
     }
 
     @Override
-    public void excuteDeleteFilesManualy() {
-        next.excuteDeleteFilesManualy();
+    public void executeDeleteFilesManually() {
+        next.executeDeleteFilesManually();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/482def9e/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index daea53c..f59d295 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -376,7 +376,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
         final GetMaxOffsetRequestHeader requestHeader =
             (GetMaxOffsetRequestHeader) request.decodeCommandCustomHeader(GetMaxOffsetRequestHeader.class);
 
-        long offset = this.brokerController.getMessageStore().getMaxOffsetInQuque(requestHeader.getTopic(),
requestHeader.getQueueId());
+        long offset = this.brokerController.getMessageStore().getMaxOffsetInQueue(requestHeader.getTopic(),
requestHeader.getQueueId());
 
         responseHeader.setOffset(offset);
 
@@ -391,7 +391,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
         final GetMinOffsetRequestHeader requestHeader =
             (GetMinOffsetRequestHeader) request.decodeCommandCustomHeader(GetMinOffsetRequestHeader.class);
 
-        long offset = this.brokerController.getMessageStore().getMinOffsetInQuque(requestHeader.getTopic(),
requestHeader.getQueueId());
+        long offset = this.brokerController.getMessageStore().getMinOffsetInQueue(requestHeader.getTopic(),
requestHeader.getQueueId());
 
         responseHeader.setOffset(offset);
         response.setCode(ResponseCode.SUCCESS);
@@ -537,11 +537,11 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
             mq.setQueueId(i);
 
             TopicOffset topicOffset = new TopicOffset();
-            long min = this.brokerController.getMessageStore().getMinOffsetInQuque(topic,
i);
+            long min = this.brokerController.getMessageStore().getMinOffsetInQueue(topic,
i);
             if (min < 0)
                 min = 0;
 
-            long max = this.brokerController.getMessageStore().getMaxOffsetInQuque(topic,
i);
+            long max = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic,
i);
             if (max < 0)
                 max = 0;
 
@@ -679,7 +679,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
 
                 OffsetWrapper offsetWrapper = new OffsetWrapper();
 
-                long brokerOffset = this.brokerController.getMessageStore().getMaxOffsetInQuque(topic,
i);
+                long brokerOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic,
i);
                 if (brokerOffset < 0)
                     brokerOffset = 0;
 
@@ -862,7 +862,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
             long minTime = this.brokerController.getMessageStore().getEarliestMessageTime(topic,
i);
             timeSpan.setMinTimeStamp(minTime);
 
-            long max = this.brokerController.getMessageStore().getMaxOffsetInQuque(topic,
i);
+            long max = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic,
i);
             long maxTime = this.brokerController.getMessageStore().getMessageStoreTimeStamp(topic,
i, max - 1);
             timeSpan.setMaxTimeStamp(maxTime);
 
@@ -876,7 +876,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
             }
             timeSpan.setConsumeTimeStamp(consumeTime);
 
-            long maxBrokerOffset = this.brokerController.getMessageStore().getMaxOffsetInQuque(requestHeader.getTopic(),
i);
+            long maxBrokerOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(requestHeader.getTopic(),
i);
             if (consumerOffset < maxBrokerOffset) {
                 long nextTime = this.brokerController.getMessageStore().getMessageStoreTimeStamp(topic,
i, consumerOffset);
                 timeSpan.setDelayTime(System.currentTimeMillis() - nextTime);
@@ -1126,7 +1126,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
                     mq.setBrokerName(this.brokerController.getBrokerConfig().getBrokerName());
                     mq.setQueueId(i);
                     OffsetWrapper offsetWrapper = new OffsetWrapper();
-                    long brokerOffset = this.brokerController.getMessageStore().getMaxOffsetInQuque(topic,
i);
+                    long brokerOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic,
i);
                     if (brokerOffset < 0)
                         brokerOffset = 0;
                     long consumerOffset = this.brokerController.getConsumerOffsetManager().queryOffset(//

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/482def9e/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java
b/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java
index 2c1029c..bb42705 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java
@@ -135,7 +135,7 @@ public class ConsumerManageProcessor implements NettyRequestProcessor
{
             response.setRemark(null);
         } else {
             long minOffset =
-                this.brokerController.getMessageStore().getMinOffsetInQuque(requestHeader.getTopic(),
+                this.brokerController.getMessageStore().getMinOffsetInQueue(requestHeader.getTopic(),
                     requestHeader.getQueueId());
             if (minOffset <= 0
                 && !this.brokerController.getMessageStore().checkInDiskByConsumeOffset(

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/482def9e/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index 7bed62c..931edc7 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -580,7 +580,7 @@ public class DefaultMessageStore implements MessageStore {
     /**
 
      */
-    public long getMaxOffsetInQuque(String topic, int queueId) {
+    public long getMaxOffsetInQueue(String topic, int queueId) {
         ConsumeQueue logic = this.findConsumeQueue(topic, queueId);
         if (logic != null) {
             long offset = logic.getMaxOffsetInQueue();
@@ -593,7 +593,7 @@ public class DefaultMessageStore implements MessageStore {
     /**
 
      */
-    public long getMinOffsetInQuque(String topic, int queueId) {
+    public long getMinOffsetInQueue(String topic, int queueId) {
         ConsumeQueue logic = this.findConsumeQueue(topic, queueId);
         if (logic != null) {
             return logic.getMinOffsetInQueue();
@@ -603,10 +603,10 @@ public class DefaultMessageStore implements MessageStore {
     }
 
     @Override
-    public long getCommitLogOffsetInQueue(String topic, int queueId, long cqOffset) {
+    public long getCommitLogOffsetInQueue(String topic, int queueId, long consumeQueueOffset)
{
         ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId);
         if (consumeQueue != null) {
-            SelectMappedBufferResult bufferConsumeQueue = consumeQueue.getIndexBuffer(cqOffset);
+            SelectMappedBufferResult bufferConsumeQueue = consumeQueue.getIndexBuffer(consumeQueueOffset);
             if (bufferConsumeQueue != null) {
                 try {
                     long offsetPy = bufferConsumeQueue.getByteBuffer().getLong();
@@ -740,10 +740,10 @@ public class DefaultMessageStore implements MessageStore {
     }
 
     @Override
-    public long getMessageStoreTimeStamp(String topic, int queueId, long offset) {
+    public long getMessageStoreTimeStamp(String topic, int queueId, long consumeQueueOffset)
{
         ConsumeQueue logicQueue = this.findConsumeQueue(topic, queueId);
         if (logicQueue != null) {
-            SelectMappedBufferResult result = logicQueue.getIndexBuffer(offset);
+            SelectMappedBufferResult result = logicQueue.getIndexBuffer(consumeQueueOffset);
             if (result != null) {
                 try {
                     final long phyOffset = result.getByteBuffer().getLong();
@@ -798,7 +798,7 @@ public class DefaultMessageStore implements MessageStore {
     }
 
     @Override
-    public void excuteDeleteFilesManualy() {
+    public void executeDeleteFilesManually() {
         this.cleanCommitLogService.excuteDeleteFilesManualy();
     }
 
@@ -1434,7 +1434,7 @@ public class DefaultMessageStore implements MessageStore {
 
         public void excuteDeleteFilesManualy() {
             this.manualDeleteFileSeveralTimes = MAX_MANUAL_DELETE_FILE_TIMES;
-            DefaultMessageStore.log.info("excuteDeleteFilesManualy was invoked");
+            DefaultMessageStore.log.info("executeDeleteFilesManually was invoked");
         }
 
         public void run() {

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/482def9e/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
index e841c08..55572ce 100644
--- a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
@@ -22,91 +22,304 @@ import java.util.Set;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageExtBatch;
 
+/**
+ * This class defines contracting interfaces to implement, allowing third-party vendor to
use customized message store.
+ */
 public interface MessageStore {
 
+    /**
+     * Load previously stored messages.
+     * @return true if success; false otherwise.
+     */
     boolean load();
 
+    /**
+     * Launch this message store.
+     * @throws Exception if there is any error.
+     */
     void start() throws Exception;
 
+    /**
+     * Shutdown this message store.
+     */
     void shutdown();
 
+    /**
+     * Destroy this message store. Generally, all persistent files should be removed after
invocation.
+     */
     void destroy();
 
+    /**
+     * Store a message into store.
+     * @param msg Message instance to store
+     * @return result of store operation.
+     */
     PutMessageResult putMessage(final MessageExtBrokerInner msg);
 
+    /**
+     * Store a batch of messages.
+     * @param messageExtBatch Message batch.
+     * @return result of storing batch messages.
+     */
     PutMessageResult putMessages(final MessageExtBatch messageExtBatch);
 
+    /**
+     * Query at most <code>maxMsgNums</code> messages belonging to <code>topic</code>
at <code>queueId</code> starting
+     * from given <code>offset</code>. Resulting messages will further be screened
using provided message filter.
+     *
+     * @param group Consumer group that launches this query.
+     * @param topic Topic to query.
+     * @param queueId Queue ID to query.
+     * @param offset Logical offset to start from.
+     * @param maxMsgNums Maximum count of messages to query.
+     * @param messageFilter Message filter used to screen desired messages.
+     * @return Matched messages.
+     */
     GetMessageResult getMessage(final String group, final String topic, final int queueId,
         final long offset, final int maxMsgNums, final MessageFilter messageFilter);
 
-    long getMaxOffsetInQuque(final String topic, final int queueId);
-
-    long getMinOffsetInQuque(final String topic, final int queueId);
-
-    long getCommitLogOffsetInQueue(final String topic, final int queueId, final long cqOffset);
-
+    /**
+     * Get maximum offset of the topic queue.
+     * @param topic Topic name.
+     * @param queueId Queue ID.
+     * @return Maximum offset at present.
+     */
+    long getMaxOffsetInQueue(final String topic, final int queueId);
+
+    /**
+     * Get the minimum offset of the topic queue.
+     * @param topic Topic name.
+     * @param queueId Queue ID.
+     * @return Minimum offset at present.
+     */
+    long getMinOffsetInQueue(final String topic, final int queueId);
+
+    /**
+     * Get the offset of the message in the commit log, which is also known as physical offset.
+     * @param topic Topic of the message to lookup.
+     * @param queueId Queue ID.
+     * @param consumeQueueOffset offset of consume queue.
+     * @return physical offset.
+     */
+    long getCommitLogOffsetInQueue(final String topic, final int queueId, final long consumeQueueOffset);
+
+    /**
+     * Look up the physical offset of the message whose store timestamp is as specified.
+     * @param topic Topic of the message.
+     * @param queueId Queue ID.
+     * @param timestamp Timestamp to look up.
+     * @return physical offset which matches.
+     */
     long getOffsetInQueueByTime(final String topic, final int queueId, final long timestamp);
 
+    /**
+     * Look up the message by given commit log offset.
+     * @param commitLogOffset physical offset.
+     * @return Message whose physical offset is as specified.
+     */
     MessageExt lookMessageByOffset(final long commitLogOffset);
 
+    /**
+     * Get one message from the specified commit log offset.
+     * @param commitLogOffset commit log offset.
+     * @return wrapped result of the message.
+     */
     SelectMappedBufferResult selectOneMessageByOffset(final long commitLogOffset);
 
+    /**
+     * Get one message from the specified commit log offset.
+     * @param commitLogOffset commit log offset.
+     * @param msgSize message size.
+     * @return wrapped result of the message.
+     */
     SelectMappedBufferResult selectOneMessageByOffset(final long commitLogOffset, final int
msgSize);
 
+    /**
+     * Get the running information of this store.
+     * @return message store running info.
+     */
     String getRunningDataInfo();
 
+    /**
+     * Message store runtime information, which should generally contains various statistical
information.
+     * @return runtime information of the message store in format of key-value pairs.
+     */
     HashMap<String, String> getRuntimeInfo();
 
+    /**
+     * Get the maximum commit log offset.
+     * @return maximum commit log offset.
+     */
     long getMaxPhyOffset();
 
+    /**
+     * Get the minimum commit log offset.
+     * @return minimum commit log offset.
+     */
     long getMinPhyOffset();
 
+    /**
+     * Get the store time of the earliest message in the given queue.
+     * @param topic Topic of the messages to query.
+     * @param queueId Queue ID to find.
+     * @return store time of the earliest message.
+     */
     long getEarliestMessageTime(final String topic, final int queueId);
 
+    /**
+     * Get the store time of the earliest message in this store.
+     * @return timestamp of the earliest message in this store.
+     */
     long getEarliestMessageTime();
 
-    long getMessageStoreTimeStamp(final String topic, final int queueId, final long offset);
-
+    /**
+     * Get the store time of the message specified.
+     * @param topic message topic.
+     * @param queueId queue ID.
+     * @param consumeQueueOffset consume queue offset.
+     * @return store timestamp of the message.
+     */
+    long getMessageStoreTimeStamp(final String topic, final int queueId, final long consumeQueueOffset);
+
+    /**
+     * Get the total number of the messages in the specified queue.
+     * @param topic Topic
+     * @param queueId Queue ID.
+     * @return total number.
+     */
     long getMessageTotalInQueue(final String topic, final int queueId);
 
+    /**
+     * Get the raw commit log data starting from the given offset, which should used for
replication purpose.
+     * @param offset starting offset.
+     * @return commit log data.
+     */
     SelectMappedBufferResult getCommitLogData(final long offset);
 
+    /**
+     * Append data to commit log.
+     * @param startOffset starting offset.
+     * @param data data to append.
+     * @return true if success; false otherwise.
+     */
     boolean appendToCommitLog(final long startOffset, final byte[] data);
 
-    void excuteDeleteFilesManualy();
-
-    QueryMessageResult queryMessage(final String topic, final String key, final int maxNum,
-        final long begin, final long end);
-
+    /**
+     * Execute file deletion manually.
+     */
+    void executeDeleteFilesManually();
+
+    /**
+     * Query messages by given key.
+     * @param topic topic of the message.
+     * @param key message key.
+     * @param maxNum maximum number of the messages possible.
+     * @param begin begin timestamp.
+     * @param end end timestamp.
+     * @return
+     */
+    QueryMessageResult queryMessage(final String topic, final String key, final int maxNum,
final long begin,
+        final long end);
+
+    /**
+     * Update HA master address.
+     * @param newAddr new address.
+     */
     void updateHaMasterAddress(final String newAddr);
 
+    /**
+     * Return how much the slave falls behind.
+     * @return number of bytes that slave falls behind.
+     */
     long slaveFallBehindMuch();
 
+    /**
+     * Return the current timestamp of the store.
+     * @return current time in milliseconds since 1970-01-01.
+     */
     long now();
 
+    /**
+     * Clean unused topics.
+     * @param topics all valid topics.
+     * @return number of the topics deleted.
+     */
     int cleanUnusedTopic(final Set<String> topics);
 
+    /**
+     * Clean expired consume queues.
+     */
     void cleanExpiredConsumerQueue();
 
+    /**
+     * Check if the given message has been swapped out of the memory.
+     * @param topic topic.
+     * @param queueId queue ID.
+     * @param consumeOffset consume queue offset.
+     * @return true if the message is no longer in memory; false otherwise.
+     */
     boolean checkInDiskByConsumeOffset(final String topic, final int queueId, long consumeOffset);
 
+    /**
+     * Get number of the bytes that have been stored in commit log and not yet dispatched
to consume queue.
+     * @return number of the bytes to dispatch.
+     */
     long dispatchBehindBytes();
 
+    /**
+     * Flush the message store to persist all data.
+     * @return maximum offset flushed to persistent storage device.
+     */
     long flush();
 
+    /**
+     * Reset written offset.
+     * @param phyOffset new offset.
+     * @return true if success; false otherwise.
+     */
     boolean resetWriteOffset(long phyOffset);
 
+    /**
+     * Get confirm offset.
+     * @return confirm offset.
+     */
     long getConfirmOffset();
 
+    /**
+     * Set confirm offset.
+     * @param phyOffset confirm offset to set.
+     */
     void setConfirmOffset(long phyOffset);
 
+    /**
+     * Check if the operation system page cache is busy or not.
+     * @return true if the OS page cache is busy; false otherwise.
+     */
     boolean isOSPageCacheBusy();
 
+    /**
+     * Get lock time in milliseconds of the store by far.
+     * @return lock time in milliseconds.
+     */
     long lockTimeMills();
 
+    /**
+     * Check if the transient store pool is deficient.
+     * @return true if the transient store pool is running out; false otherwise.
+     */
     boolean isTransientStorePoolDeficient();
 
+    /**
+     * Get the dispatcher list.
+     * @return list of the dispatcher.
+     */
     LinkedList<CommitLogDispatcher> getDispatcherList();
 
+    /**
+     * Get consume queue of the topic/queue.
+     * @param topic Topic.
+     * @param queueId Queue ID.
+     * @return Consume queue.
+     */
     ConsumeQueue getConsumeQueue(String topic, int queueId);
 }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/482def9e/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java
b/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java
index d45b994..501876e 100644
--- a/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java
+++ b/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java
@@ -79,7 +79,7 @@ public class ScheduleMessageService extends ConfigManager {
             Entry<Integer, Long> next = it.next();
             int queueId = delayLevel2QueueId(next.getKey());
             long delayOffset = next.getValue();
-            long maxOffset = this.defaultMessageStore.getMaxOffsetInQuque(SCHEDULE_TOPIC,
queueId);
+            long maxOffset = this.defaultMessageStore.getMaxOffsetInQueue(SCHEDULE_TOPIC,
queueId);
             String value = String.format("%d,%d", delayOffset, maxOffset);
             String key = String.format("%s_%d", RunningStats.scheduleMessageOffset.name(),
next.getKey());
             stats.put(key, value);


Mime
View raw message