rocketmq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dongefore...@apache.org
Subject [1/7] incubator-rocketmq git commit: [ROCKETMQ-121]Support message filtering based on SQL92 closes apache/incubator-rocketmq#82
Date Fri, 21 Apr 2017 10:19:11 GMT
Repository: incubator-rocketmq
Updated Branches:
  refs/heads/develop 42f78c281 -> 58f1574b2


http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/store/src/main/java/org/apache/rocketmq/store/DispatchRequest.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/DispatchRequest.java b/store/src/main/java/org/apache/rocketmq/store/DispatchRequest.java
index 33529da..3d33eaf 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DispatchRequest.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DispatchRequest.java
@@ -16,6 +16,8 @@
  */
 package org.apache.rocketmq.store;
 
+import java.util.Map;
+
 public class DispatchRequest {
     private final String topic;
     private final int queueId;
@@ -30,6 +32,8 @@ public class DispatchRequest {
 
     private final int sysFlag;
     private final long preparedTransactionOffset;
+    private final Map<String, String> propertiesMap;
+    private byte[] bitMap;
 
     public DispatchRequest(
         final String topic,
@@ -42,7 +46,8 @@ public class DispatchRequest {
         final String keys,
         final String uniqKey,
         final int sysFlag,
-        final long preparedTransactionOffset
+        final long preparedTransactionOffset,
+        final Map<String, String> propertiesMap
     ) {
         this.topic = topic;
         this.queueId = queueId;
@@ -57,6 +62,7 @@ public class DispatchRequest {
         this.sysFlag = sysFlag;
         this.preparedTransactionOffset = preparedTransactionOffset;
         this.success = true;
+        this.propertiesMap = propertiesMap;
     }
 
     public DispatchRequest(int size) {
@@ -81,6 +87,7 @@ public class DispatchRequest {
         this.sysFlag = 0;
         this.preparedTransactionOffset = 0;
         this.success = false;
+        this.propertiesMap = null;
     }
 
     public DispatchRequest(int size, boolean success) {
@@ -105,6 +112,7 @@ public class DispatchRequest {
         this.sysFlag = 0;
         this.preparedTransactionOffset = 0;
         this.success = success;
+        this.propertiesMap = null;
     }
 
     public String getTopic() {
@@ -155,4 +163,15 @@ public class DispatchRequest {
         return uniqKey;
     }
 
+    public Map<String, String> getPropertiesMap() {
+        return propertiesMap;
+    }
+
+    public byte[] getBitMap() {
+        return bitMap;
+    }
+
+    public void setBitMap(byte[] bitMap) {
+        this.bitMap = bitMap;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/store/src/main/java/org/apache/rocketmq/store/MappedFile.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/MappedFile.java b/store/src/main/java/org/apache/rocketmq/store/MappedFile.java
index 550e578..a9a00a8 100644
--- a/store/src/main/java/org/apache/rocketmq/store/MappedFile.java
+++ b/store/src/main/java/org/apache/rocketmq/store/MappedFile.java
@@ -245,6 +245,31 @@ public class MappedFile extends ReferenceResource {
     }
 
     /**
+     * Content of data from offset to offset + length will be wrote to file.
+     *
+     * @param data
+     * @param offset The offset of the subarray to be used.
+     * @param length The length of the subarray to be used.
+     * @return
+     */
+    public boolean appendMessage(final byte[] data, final int offset, final int length) {
+        int currentPos = this.wrotePosition.get();
+
+        if ((currentPos + length) <= this.fileSize) {
+            try {
+                this.fileChannel.position(currentPos);
+                this.fileChannel.write(ByteBuffer.wrap(data, offset, length));
+            } catch (Throwable e) {
+                log.error("Error occurred when append message to mappedFile.", e);
+            }
+            this.wrotePosition.addAndGet(length);
+            return true;
+        }
+
+        return false;
+    }
+
+    /**
      * @param flushLeastPages
      * @return The current flushed position
      */

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java b/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java
index 5c6c62c..a8fa364 100644
--- a/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java
+++ b/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java
@@ -121,7 +121,7 @@ public class MappedFileQueue {
         this.deleteExpiredFile(willRemoveFiles);
     }
 
-    private void deleteExpiredFile(List<MappedFile> files) {
+    void deleteExpiredFile(List<MappedFile> files) {
 
         if (!files.isEmpty()) {
 

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/store/src/main/java/org/apache/rocketmq/store/MessageArrivingListener.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/MessageArrivingListener.java b/store/src/main/java/org/apache/rocketmq/store/MessageArrivingListener.java
index 2523c1a..dee1bc7 100644
--- a/store/src/main/java/org/apache/rocketmq/store/MessageArrivingListener.java
+++ b/store/src/main/java/org/apache/rocketmq/store/MessageArrivingListener.java
@@ -17,6 +17,9 @@
 
 package org.apache.rocketmq.store;
 
+import java.util.Map;
+
 public interface MessageArrivingListener {
-    void arriving(String topic, int queueId, long logicOffset, long tagsCode);
+    void arriving(String topic, int queueId, long logicOffset, long tagsCode,
+                  long msgStoreTime, byte[] filterBitMap, Map<String, String> properties);
 }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/store/src/main/java/org/apache/rocketmq/store/MessageFilter.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/MessageFilter.java b/store/src/main/java/org/apache/rocketmq/store/MessageFilter.java
index 859ce99..6b34758 100644
--- a/store/src/main/java/org/apache/rocketmq/store/MessageFilter.java
+++ b/store/src/main/java/org/apache/rocketmq/store/MessageFilter.java
@@ -16,8 +16,30 @@
  */
 package org.apache.rocketmq.store;
 
-import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
+import java.nio.ByteBuffer;
+import java.util.Map;
 
 public interface MessageFilter {
-    boolean isMessageMatched(final SubscriptionData subscriptionData, final Long tagsCode);
+    /**
+     * match by tags code or filter bit map which is calculated when message received
+     * and stored in consume queue ext.
+     *
+     * @param tagsCode         tagsCode
+     * @param cqExtUnit        extend unit of consume queue
+     * @return
+     */
+    boolean isMatchedByConsumeQueue(final Long tagsCode,
+                                    final ConsumeQueueExt.CqExtUnit cqExtUnit);
+
+    /**
+     * match by message content which are stored in commit log.
+     * <br>{@code msgBuffer} and {@code properties} are not all null.If invoked in store,
+     * {@code properties} is null;If invoked in {@code PullRequestHoldService}, {@code msgBuffer} is null.
+     *
+     * @param msgBuffer        message buffer in commit log, may be null if not invoked in store.
+     * @param properties       message properties, should decode from buffer if null by yourself.
+     * @return
+     */
+    boolean isMatchedByCommitLog(final ByteBuffer msgBuffer,
+                                 final Map<String, String> properties);
 }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
index 65c546b..e841c08 100644
--- a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
@@ -17,10 +17,10 @@
 package org.apache.rocketmq.store;
 
 import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.Set;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageExtBatch;
-import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
 
 public interface MessageStore {
 
@@ -37,7 +37,7 @@ public interface MessageStore {
     PutMessageResult putMessages(final MessageExtBatch messageExtBatch);
 
     GetMessageResult getMessage(final String group, final String topic, final int queueId,
-        final long offset, final int maxMsgNums, final SubscriptionData subscriptionData);
+        final long offset, final int maxMsgNums, final MessageFilter messageFilter);
 
     long getMaxOffsetInQuque(final String topic, final int queueId);
 
@@ -105,4 +105,8 @@ public interface MessageStore {
     long lockTimeMills();
 
     boolean isTransientStorePoolDeficient();
+
+    LinkedList<CommitLogDispatcher> getDispatcherList();
+
+    ConsumeQueue getConsumeQueue(String topic, int queueId);
 }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
index 7ae2ab5..29f800c 100644
--- a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
+++ b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
@@ -34,6 +34,13 @@ public class MessageStoreConfig {
     private int mapedFileSizeCommitLog = 1024 * 1024 * 1024;
     // ConsumeQueue file size,default is 30W
     private int mapedFileSizeConsumeQueue = 300000 * ConsumeQueue.CQ_STORE_UNIT_SIZE;
+    // enable consume queue ext
+    private boolean enableConsumeQueueExt = false;
+    // ConsumeQueue extend file size, 48M
+    private int mappedFileSizeConsumeQueueExt = 48 * 1024 * 1024;
+    // Bit count of filter bit map.
+    // this will be set by pipe of calculate filter bit map.
+    private int bitMapLengthConsumeQueueExt = 64;
 
     // CommitLog flush interval
     // flush data to disk
@@ -191,6 +198,30 @@ public class MessageStoreConfig {
         this.mapedFileSizeConsumeQueue = mapedFileSizeConsumeQueue;
     }
 
+    public boolean isEnableConsumeQueueExt() {
+        return enableConsumeQueueExt;
+    }
+
+    public void setEnableConsumeQueueExt(boolean enableConsumeQueueExt) {
+        this.enableConsumeQueueExt = enableConsumeQueueExt;
+    }
+
+    public int getMappedFileSizeConsumeQueueExt() {
+        return mappedFileSizeConsumeQueueExt;
+    }
+
+    public void setMappedFileSizeConsumeQueueExt(int mappedFileSizeConsumeQueueExt) {
+        this.mappedFileSizeConsumeQueueExt = mappedFileSizeConsumeQueueExt;
+    }
+
+    public int getBitMapLengthConsumeQueueExt() {
+        return bitMapLengthConsumeQueueExt;
+    }
+
+    public void setBitMapLengthConsumeQueueExt(int bitMapLengthConsumeQueueExt) {
+        this.bitMapLengthConsumeQueueExt = bitMapLengthConsumeQueueExt;
+    }
+
     public int getFlushIntervalCommitLog() {
         return flushIntervalCommitLog;
     }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/store/src/main/java/org/apache/rocketmq/store/config/StorePathConfigHelper.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/config/StorePathConfigHelper.java b/store/src/main/java/org/apache/rocketmq/store/config/StorePathConfigHelper.java
index aebebaf..ef1d670 100644
--- a/store/src/main/java/org/apache/rocketmq/store/config/StorePathConfigHelper.java
+++ b/store/src/main/java/org/apache/rocketmq/store/config/StorePathConfigHelper.java
@@ -24,6 +24,10 @@ public class StorePathConfigHelper {
         return rootDir + File.separator + "consumequeue";
     }
 
+    public static String getStorePathConsumeQueueExt(final String rootDir) {
+        return rootDir + File.separator + "consumequeue_ext";
+    }
+
     public static String getStorePathIndex(final String rootDir) {
         return rootDir + File.separator + "index";
     }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java b/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java
index e08a6f5..d45b994 100644
--- a/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java
+++ b/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java
@@ -32,6 +32,7 @@ import org.apache.rocketmq.common.message.MessageDecoder;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.running.RunningStats;
 import org.apache.rocketmq.store.ConsumeQueue;
+import org.apache.rocketmq.store.ConsumeQueueExt;
 import org.apache.rocketmq.store.DefaultMessageStore;
 import org.apache.rocketmq.store.MessageExtBrokerInner;
 import org.apache.rocketmq.store.PutMessageResult;
@@ -248,11 +249,24 @@ public class ScheduleMessageService extends ConfigManager {
                     try {
                         long nextOffset = offset;
                         int i = 0;
+                        ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
                         for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
                             long offsetPy = bufferCQ.getByteBuffer().getLong();
                             int sizePy = bufferCQ.getByteBuffer().getInt();
                             long tagsCode = bufferCQ.getByteBuffer().getLong();
 
+                            if (cq.isExtAddr(tagsCode)) {
+                                if (cq.getExt(tagsCode, cqExtUnit)) {
+                                    tagsCode = cqExtUnit.getTagsCode();
+                                } else {
+                                    //can't find ext content.So re compute tags code.
+                                    log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}",
+                                        tagsCode, offsetPy, sizePy);
+                                    long msgStoreTime = defaultMessageStore.getCommitLog().pickupStoreTimestamp(offsetPy, sizePy);
+                                    tagsCode = computeDeliverTimestamp(delayLevel, msgStoreTime);
+                                }
+                            }
+
                             long now = System.currentTimeMillis();
                             long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);
 

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueExtTest.java
----------------------------------------------------------------------
diff --git a/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueExtTest.java b/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueExtTest.java
new file mode 100644
index 0000000..5dbc584
--- /dev/null
+++ b/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueExtTest.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.store;
+
+import org.junit.Test;
+
+import java.io.File;
+import java.util.Random;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class ConsumeQueueExtTest {
+
+    private static final String topic = "abc";
+    private static final int queueId = 0;
+    private static final String storePath = "." + File.separator + "unit_test_store";
+    private static final int bitMapLength = 64;
+    private static final int unitSizeWithBitMap = ConsumeQueueExt.CqExtUnit.MIN_EXT_UNIT_SIZE + bitMapLength / Byte.SIZE;
+    private static final int cqExtFileSize = 10 * unitSizeWithBitMap;
+    private static final int unitCount = 20;
+
+
+    protected ConsumeQueueExt genExt() {
+        return new ConsumeQueueExt(
+            topic, queueId, storePath, cqExtFileSize, bitMapLength
+        );
+    }
+
+    protected byte[] genBitMap(int bitMapLength) {
+        byte[] bytes = new byte[bitMapLength / Byte.SIZE];
+
+        Random random = new Random(System.currentTimeMillis());
+        random.nextBytes(bytes);
+
+        return bytes;
+    }
+
+    protected ConsumeQueueExt.CqExtUnit genUnit(boolean hasBitMap) {
+        ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
+
+        cqExtUnit.setTagsCode(Math.abs((new Random(System.currentTimeMillis())).nextInt()));
+        cqExtUnit.setMsgStoreTime(System.currentTimeMillis());
+        if (hasBitMap) {
+            cqExtUnit.setFilterBitMap(genBitMap(bitMapLength));
+        }
+
+        return cqExtUnit;
+    }
+
+    protected void deleteDirectory(String rootPath) {
+        File file = new File(rootPath);
+        deleteFile(file);
+    }
+
+    protected void deleteFile(File file) {
+        File[] subFiles = file.listFiles();
+        if (subFiles != null) {
+            for (File sub : subFiles) {
+                deleteFile(sub);
+            }
+        }
+
+        file.delete();
+    }
+
+    protected void putSth(ConsumeQueueExt consumeQueueExt, boolean getAfterPut,
+                          boolean unitSameSize, int unitCount) {
+        for (int i = 0; i < unitCount; i++) {
+            ConsumeQueueExt.CqExtUnit putUnit =
+                unitSameSize ? genUnit(true) : genUnit(i % 2 == 0);
+
+            long addr = consumeQueueExt.put(putUnit);
+            assertThat(addr).isLessThan(0);
+
+            if (getAfterPut) {
+                ConsumeQueueExt.CqExtUnit getUnit = consumeQueueExt.get(addr);
+
+                assertThat(getUnit).isNotNull();
+                assertThat(putUnit).isEqualTo(getUnit);
+            }
+
+            try {
+                Thread.sleep(10);
+            } catch (InterruptedException e) {
+                e.printStackTrace();
+                assertThat(false).isTrue();
+            }
+        }
+    }
+
+    @Test
+    public void testPut() {
+        ConsumeQueueExt consumeQueueExt = genExt();
+
+        try {
+            putSth(consumeQueueExt, true, false, unitCount);
+        } finally {
+            consumeQueueExt.destroy();
+            deleteDirectory(storePath);
+        }
+    }
+
+    @Test
+    public void testGet() {
+        ConsumeQueueExt consumeQueueExt = genExt();
+
+        putSth(consumeQueueExt, false, false, unitCount);
+
+        try {
+            // from start.
+            long addr = consumeQueueExt.decorate(0);
+
+            ConsumeQueueExt.CqExtUnit unit = new ConsumeQueueExt.CqExtUnit();
+            while (true) {
+                boolean ret = consumeQueueExt.get(addr, unit);
+
+                if (!ret) {
+                    break;
+                }
+
+                assertThat(unit.getSize()).isGreaterThanOrEqualTo(ConsumeQueueExt.CqExtUnit.MIN_EXT_UNIT_SIZE);
+
+                addr += unit.getSize();
+            }
+        } finally {
+            consumeQueueExt.destroy();
+            deleteDirectory(storePath);
+        }
+    }
+
+    @Test
+    public void testGet_invalidAddress() {
+        ConsumeQueueExt consumeQueueExt = genExt();
+
+        putSth(consumeQueueExt, false, true, unitCount);
+
+        try {
+            ConsumeQueueExt.CqExtUnit unit = consumeQueueExt.get(0);
+
+            assertThat(unit).isNull();
+
+            long addr = (cqExtFileSize / unitSizeWithBitMap) * unitSizeWithBitMap;
+            addr += unitSizeWithBitMap;
+
+            unit = consumeQueueExt.get(addr);
+            assertThat(unit).isNull();
+        } finally {
+            consumeQueueExt.destroy();
+            deleteDirectory(storePath);
+        }
+    }
+
+    @Test
+    public void testRecovery() {
+        ConsumeQueueExt putCqExt = genExt();
+
+        putSth(putCqExt, false, true, unitCount);
+
+        ConsumeQueueExt loadCqExt = genExt();
+
+        loadCqExt.load();
+
+        loadCqExt.recover();
+
+        try {
+            assertThat(loadCqExt.getMinAddress()).isEqualTo(Long.MIN_VALUE);
+
+            // same unit size.
+            int countPerFile = (cqExtFileSize - ConsumeQueueExt.END_BLANK_DATA_LENGTH) / unitSizeWithBitMap;
+
+            int lastFileUnitCount = unitCount % countPerFile;
+
+            int fileCount = unitCount / countPerFile + 1;
+            if (lastFileUnitCount == 0) {
+                fileCount -= 1;
+            }
+
+            if (lastFileUnitCount == 0) {
+                assertThat(loadCqExt.unDecorate(loadCqExt.getMaxAddress()) % cqExtFileSize).isEqualTo(0);
+            } else {
+                assertThat(loadCqExt.unDecorate(loadCqExt.getMaxAddress()))
+                    .isEqualTo(lastFileUnitCount * unitSizeWithBitMap + (fileCount - 1) * cqExtFileSize);
+            }
+        } finally {
+            putCqExt.destroy();
+            loadCqExt.destroy();
+            deleteDirectory(storePath);
+        }
+    }
+
+    @Test
+    public void testTruncateByMinOffset() {
+        ConsumeQueueExt consumeQueueExt = genExt();
+
+        putSth(consumeQueueExt, false, true, unitCount * 2);
+
+        try {
+            // truncate first one file.
+            long address = consumeQueueExt.decorate((long) (cqExtFileSize * 1.5));
+
+            long expectMinAddress = consumeQueueExt.decorate(cqExtFileSize);
+
+            consumeQueueExt.truncateByMinAddress(address);
+
+            long minAddress = consumeQueueExt.getMinAddress();
+
+            assertThat(expectMinAddress).isEqualTo(minAddress);
+        } finally {
+            consumeQueueExt.destroy();
+            deleteDirectory(storePath);
+        }
+    }
+
+    @Test
+    public void testTruncateByMaxOffset() {
+        ConsumeQueueExt consumeQueueExt = genExt();
+
+        putSth(consumeQueueExt, false, true, unitCount * 2);
+
+        try {
+            // truncate, only first 3 files exist.
+            long address = consumeQueueExt.decorate(cqExtFileSize * 2 + unitSizeWithBitMap);
+
+            long expectMaxAddress = address + unitSizeWithBitMap;
+
+            consumeQueueExt.truncateByMaxAddress(address);
+
+            long maxAddress = consumeQueueExt.getMaxAddress();
+
+            assertThat(expectMaxAddress).isEqualTo(maxAddress);
+        } finally {
+            consumeQueueExt.destroy();
+            deleteDirectory(storePath);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java
----------------------------------------------------------------------
diff --git a/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java b/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java
new file mode 100644
index 0000000..9c42fb9
--- /dev/null
+++ b/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.store;
+
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.apache.rocketmq.store.stats.BrokerStatsManager;
+import org.junit.Test;
+
+import java.io.File;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class ConsumeQueueTest {
+
+    private static final String msg = "Once, there was a chance for me!";
+    private static final byte[] msgBody = msg.getBytes();
+
+    private static final String topic = "abc";
+    private static final int queueId = 0;
+    private static final String storePath = "." + File.separator + "unit_test_store";
+    private static final int commitLogFileSize = 1024 * 8;
+    private static final int cqFileSize = 10 * 20;
+    private static final int cqExtFileSize = 10 * (ConsumeQueueExt.CqExtUnit.MIN_EXT_UNIT_SIZE + 64);
+
+    private static SocketAddress BornHost;
+
+    private static SocketAddress StoreHost;
+
+    static {
+        try {
+            StoreHost = new InetSocketAddress(InetAddress.getLocalHost(), 8123);
+        } catch (UnknownHostException e) {
+            e.printStackTrace();
+        }
+        try {
+            BornHost = new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 0);
+        } catch (UnknownHostException e) {
+            e.printStackTrace();
+        }
+    }
+
+    public MessageExtBrokerInner buildMessage() {
+        MessageExtBrokerInner msg = new MessageExtBrokerInner();
+        msg.setTopic(topic);
+        msg.setTags("TAG1");
+        msg.setKeys("Hello");
+        msg.setBody(msgBody);
+        msg.setKeys(String.valueOf(System.currentTimeMillis()));
+        msg.setQueueId(queueId);
+        msg.setSysFlag(0);
+        msg.setBornTimestamp(System.currentTimeMillis());
+        msg.setStoreHost(StoreHost);
+        msg.setBornHost(BornHost);
+        for (int i = 0; i < 1; i++) {
+            msg.putUserProperty(String.valueOf(i), "imagoodperson" + i);
+        }
+        msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
+
+        return msg;
+    }
+
+
+    public MessageStoreConfig buildStoreConfig(int commitLogFileSize, int cqFileSize,
+                                               boolean enableCqExt, int cqExtFileSize) {
+        MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
+        messageStoreConfig.setMapedFileSizeCommitLog(commitLogFileSize);
+        messageStoreConfig.setMapedFileSizeConsumeQueue(cqFileSize);
+        messageStoreConfig.setMappedFileSizeConsumeQueueExt(cqExtFileSize);
+        messageStoreConfig.setMessageIndexEnable(false);
+        messageStoreConfig.setEnableConsumeQueueExt(enableCqExt);
+
+        messageStoreConfig.setStorePathRootDir(storePath);
+        messageStoreConfig.setStorePathCommitLog(storePath + File.separator + "commitlog");
+
+        return messageStoreConfig;
+    }
+
+    protected DefaultMessageStore gen() throws Exception {
+        MessageStoreConfig messageStoreConfig = buildStoreConfig(
+            commitLogFileSize, cqFileSize, true, cqExtFileSize
+        );
+
+        BrokerConfig brokerConfig = new BrokerConfig();
+
+        DefaultMessageStore master = new DefaultMessageStore(
+            messageStoreConfig,
+            new BrokerStatsManager(brokerConfig.getBrokerClusterName()),
+            new MessageArrivingListener() {
+                @Override
+                public void arriving(String topic, int queueId, long logicOffset, long tagsCode,
+                                     long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) {
+                }
+            }
+            , brokerConfig);
+
+        assertThat(master.load()).isTrue();
+
+        master.start();
+
+        return master;
+    }
+
+    protected void putMsg(DefaultMessageStore master) throws Exception {
+        long totalMsgs = 200;
+
+        for (long i = 0; i < totalMsgs; i++) {
+            master.putMessage(buildMessage());
+        }
+    }
+
+    protected void deleteDirectory(String rootPath) {
+        File file = new File(rootPath);
+        deleteFile(file);
+    }
+
+    protected void deleteFile(File file) {
+        File[] subFiles = file.listFiles();
+        if (subFiles != null) {
+            for (File sub : subFiles) {
+                deleteFile(sub);
+            }
+        }
+
+        file.delete();
+    }
+
+    @Test
+    public void testConsumeQueueWithExtendData() {
+        DefaultMessageStore master = null;
+        try {
+            master = gen();
+        } catch (Exception e) {
+            e.printStackTrace();
+            assertThat(Boolean.FALSE).isTrue();
+        }
+
+        master.getDispatcherList().addFirst(new CommitLogDispatcher() {
+
+            @Override
+            public void dispatch(DispatchRequest request) {
+                runCount++;
+            }
+
+            private int runCount = 0;
+        });
+
+        try {
+            try {
+                putMsg(master);
+                // wait build consume queue
+                Thread.sleep(1000);
+            } catch (Exception e) {
+                e.printStackTrace();
+                assertThat(Boolean.FALSE).isTrue();
+            }
+
+            ConsumeQueue cq = master.getConsumeQueueTable().get(topic).get(queueId);
+
+            assertThat(cq).isNotNull();
+
+            long index = 0;
+
+            while (index < cq.getMaxOffsetInQueue()) {
+                SelectMappedBufferResult bufferResult = cq.getIndexBuffer(index);
+
+                assertThat(bufferResult).isNotNull();
+
+                ByteBuffer buffer = bufferResult.getByteBuffer();
+
+                assertThat(buffer).isNotNull();
+                try {
+                    ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
+                    for (int i = 0; i < bufferResult.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
+                        long phyOffset = buffer.getLong();
+                        int size = buffer.getInt();
+                        long tagsCode = buffer.getLong();
+
+                        assertThat(phyOffset).isGreaterThanOrEqualTo(0);
+                        assertThat(size).isGreaterThan(0);
+                        assertThat(tagsCode).isLessThan(0);
+
+                        boolean ret = cq.getExt(tagsCode, cqExtUnit);
+
+                        assertThat(ret).isTrue();
+                        assertThat(cqExtUnit).isNotNull();
+                        assertThat(cqExtUnit.getSize()).isGreaterThan((short) 0);
+                        assertThat(cqExtUnit.getMsgStoreTime()).isGreaterThan(0);
+                        assertThat(cqExtUnit.getTagsCode()).isGreaterThan(0);
+                    }
+
+                } finally {
+                    bufferResult.release();
+                }
+
+                index += cqFileSize / ConsumeQueue.CQ_STORE_UNIT_SIZE;
+            }
+        } finally {
+            master.shutdown();
+            master.destroy();
+            deleteDirectory(storePath);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
----------------------------------------------------------------------
diff --git a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
index 5c9c46f..75f1de9 100644
--- a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
@@ -20,6 +20,7 @@ package org.apache.rocketmq.store;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
+import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.rocketmq.common.BrokerConfig;
 import org.apache.rocketmq.store.config.FlushDiskType;
@@ -124,7 +125,8 @@ public class DefaultMessageStoreTest {
 
     private class MyMessageArrivingListener implements MessageArrivingListener {
         @Override
-        public void arriving(String topic, int queueId, long logicOffset, long tagsCode)
{
+        public void arriving(String topic, int queueId, long logicOffset, long tagsCode,
long msgStoreTime,
+                             byte[] filterBitMap, Map<String, String> properties) {
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
index 19bff89..409ea33 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
@@ -41,6 +41,7 @@ import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo;
 import org.apache.rocketmq.common.protocol.body.GroupList;
 import org.apache.rocketmq.common.protocol.body.KVTable;
 import org.apache.rocketmq.common.protocol.body.ProducerConnection;
+import org.apache.rocketmq.common.protocol.body.QueryConsumeQueueResponseBody;
 import org.apache.rocketmq.common.protocol.body.QueueTimeSpan;
 import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
 import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
@@ -469,4 +470,12 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt
{
         UnsupportedEncodingException {
         return this.defaultMQAdminExtImpl.getNameServerConfig(nameServers);
     }
+
+    /**
+     * Queries consume queue data by delegating to the wrapped {@code defaultMQAdminExtImpl};
+     * every argument is passed through unchanged and the delegate's result is returned as is.
+     */
+    @Override
+    public QueryConsumeQueueResponseBody queryConsumeQueue(String brokerAddr, String topic,
int queueId, long index, int count, String consumerGroup)
+        throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException,
RemotingConnectException, MQClientException {
+        return this.defaultMQAdminExtImpl.queryConsumeQueue(
+            brokerAddr, topic, queueId, index, count, consumerGroup
+        );
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
index a31b69d..157ae21 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
@@ -63,6 +63,7 @@ import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo;
 import org.apache.rocketmq.common.protocol.body.GroupList;
 import org.apache.rocketmq.common.protocol.body.KVTable;
 import org.apache.rocketmq.common.protocol.body.ProducerConnection;
+import org.apache.rocketmq.common.protocol.body.QueryConsumeQueueResponseBody;
 import org.apache.rocketmq.common.protocol.body.QueueTimeSpan;
 import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
 import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
@@ -955,4 +956,11 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner
{
         return this.mqClientInstance.getMQClientAPIImpl().getNameServerConfig(nameServers,
timeoutMillis);
     }
 
+    /**
+     * Queries consume queue data through the client API obtained from {@code mqClientInstance}.
+     * The caller's arguments are forwarded unchanged, with this instance's {@code timeoutMillis}
+     * appended as the last argument.
+     */
+    @Override
+    public QueryConsumeQueueResponseBody queryConsumeQueue(String brokerAddr, String topic,
int queueId, long index, int count, String consumerGroup)
+        throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException,
RemotingConnectException, MQClientException {
+        return this.mqClientInstance.getMQClientAPIImpl().queryConsumeQueue(
+            brokerAddr, topic, queueId, index, count, consumerGroup, timeoutMillis
+        );
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
index 493cf54..82add92 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
@@ -39,6 +39,7 @@ import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo;
 import org.apache.rocketmq.common.protocol.body.GroupList;
 import org.apache.rocketmq.common.protocol.body.KVTable;
 import org.apache.rocketmq.common.protocol.body.ProducerConnection;
+import org.apache.rocketmq.common.protocol.body.QueryConsumeQueueResponseBody;
 import org.apache.rocketmq.common.protocol.body.QueueTimeSpan;
 import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
 import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
@@ -241,4 +242,25 @@ public interface MQAdminExt extends MQAdmin {
     Map<String, Properties> getNameServerConfig(final List<String> nameServers)
throws InterruptedException,
         RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException,
         MQClientException, UnsupportedEncodingException;
+
+    /**
+     * Queries consume queue data from a broker.
+     *
+     * @param brokerAddr address of the broker to query
+     * @param topic topic name
+     * @param queueId id of the queue
+     * @param index start offset (queue index) of the first entry to query
+     * @param count how many entries to query
+     * @param consumerGroup consumer group; NOTE(review): appears to be optional, since the
+     *                      queryCq command passes {@code null} when no group is given - confirm
+     * @return the response body of the consume queue query
+     * @throws InterruptedException
+     * @throws RemotingTimeoutException
+     * @throws RemotingSendRequestException
+     * @throws RemotingConnectException
+     * @throws MQClientException
+     */
+    QueryConsumeQueueResponseBody queryConsumeQueue(final String brokerAddr,
+                                            final String topic, final int queueId,
+                                            final long index, final int count, final String
consumerGroup)
+        throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException,
RemotingConnectException, MQClientException;
 }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
index 9bd37e8..6398291 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
@@ -59,6 +59,7 @@ import org.apache.rocketmq.tools.command.namesrv.UpdateNamesrvConfigCommand;
 import org.apache.rocketmq.tools.command.namesrv.WipeWritePermSubCommand;
 import org.apache.rocketmq.tools.command.offset.CloneGroupOffsetCommand;
 import org.apache.rocketmq.tools.command.offset.ResetOffsetByTimeCommand;
+import org.apache.rocketmq.tools.command.queue.QueryConsumeQueueCommand;
 import org.apache.rocketmq.tools.command.stats.StatsAllSubCommand;
 import org.apache.rocketmq.tools.command.topic.AllocateMQSubCommand;
 import org.apache.rocketmq.tools.command.topic.DeleteTopicSubCommand;
@@ -189,6 +190,8 @@ public class MQAdminStartup {
         initCommand(new GetNamesrvConfigCommand());
         initCommand(new UpdateNamesrvConfigCommand());
         initCommand(new GetBrokerConfigCommand());
+
+        initCommand(new QueryConsumeQueueCommand());
     }
 
     private static void initLogback() throws JoranException {

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/tools/src/main/java/org/apache/rocketmq/tools/command/queue/QueryConsumeQueueCommand.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/queue/QueryConsumeQueueCommand.java
b/tools/src/main/java/org/apache/rocketmq/tools/command/queue/QueryConsumeQueueCommand.java
new file mode 100644
index 0000000..611addd
--- /dev/null
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/queue/QueryConsumeQueueCommand.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.tools.command.queue;
+
+import com.alibaba.fastjson.JSON;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
+import org.apache.rocketmq.common.protocol.body.ConsumeQueueData;
+import org.apache.rocketmq.common.protocol.body.QueryConsumeQueueResponseBody;
+import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.srvutil.ServerUtil;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.apache.rocketmq.tools.command.SubCommand;
+
+/**
+ * {@code mqadmin} sub-command {@code queryCq}: queries a range of consume queue entries from a
+ * broker and prints the result to standard output.
+ */
+public class QueryConsumeQueueCommand implements SubCommand {
+
+    /**
+     * Manual entry point that runs the command with a fixed set of sample arguments.
+     *
+     * @param args ignored; the sample arguments below are used instead
+     */
+    public static void main(String[] args) {
+        QueryConsumeQueueCommand cmd = new QueryConsumeQueueCommand();
+
+        Options options = ServerUtil.buildCommandlineOptions(new Options());
+        String[] subargs = new String[]{"-t TopicTest", "-q 0", "-i 6447", "-b 100.81.165.119:10911"};
+        final CommandLine commandLine =
+            ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options),
+                new PosixParser());
+        cmd.execute(commandLine, options, null);
+    }
+
+    /**
+     * @return the name under which this sub-command is registered, {@code "queryCq"}
+     */
+    @Override
+    public String commandName() {
+        return "queryCq";
+    }
+
+    /**
+     * @return a one-line description of this sub-command
+     */
+    @Override
+    public String commandDesc() {
+        return "Query cq command.";
+    }
+
+    /**
+     * Registers this command's options on {@code options}: {@code -t} topic, {@code -q} queue and
+     * {@code -i} index are required; {@code -c} count, {@code -b} broker and {@code -g} consumer
+     * group are optional.
+     *
+     * @param options the option set to extend
+     * @return the same {@code options} instance with this command's options added
+     */
+    @Override
+    public Options buildCommandlineOptions(Options options) {
+        Option opt = new Option("t", "topic", true, "topic name");
+        opt.setRequired(true);
+        options.addOption(opt);
+
+        opt = new Option("q", "queue", true, "queue num, ie. 1");
+        opt.setRequired(true);
+        options.addOption(opt);
+
+        opt = new Option("i", "index", true, "start queue index.");
+        opt.setRequired(true);
+        options.addOption(opt);
+
+        opt = new Option("c", "count", true, "how many.");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new Option("b", "broker", true, "broker addr.");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new Option("g", "consumer", true, "consumer group.");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        return options;
+    }
+
+    /**
+     * Runs the query and prints subscription data, filter data, the max/min queue index and each
+     * returned queue entry to standard output.
+     *
+     * <p>When no broker address is supplied (option {@code -b} absent or empty) the address is taken
+     * from the topic route: the first broker data entry, address stored under broker id {@code 0L}.
+     * Any exception is printed via {@code printStackTrace()}; the admin client is always shut down.
+     *
+     * @param commandLine parsed command line holding the option values
+     * @param options the option definitions (not used by this method)
+     * @param rpcHook hook handed to the admin client; {@code main} passes {@code null}
+     */
+    @Override
+    public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) {
+        DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
+
+        defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
+
+        try {
+            defaultMQAdminExt.start();
+
+            String topic = commandLine.getOptionValue("t").trim();
+            int queueId = Integer.parseInt(commandLine.getOptionValue("q").trim());
+            long index = Long.parseLong(commandLine.getOptionValue("i").trim());
+            // Count defaults to 10 entries when -c is not given.
+            int count = Integer.parseInt(commandLine.getOptionValue("c", "10").trim());
+            String broker = null;
+            if (commandLine.hasOption("b")) {
+                broker = commandLine.getOptionValue("b").trim();
+            }
+            String consumerGroup = null;
+            if (commandLine.hasOption("g")) {
+                consumerGroup = commandLine.getOptionValue("g").trim();
+            }
+
+            // Compare string content, not references: `broker == ""` is false for an empty string
+            // produced at runtime, so an empty -b value would skip the route lookup below.
+            if (broker == null || broker.isEmpty()) {
+                TopicRouteData topicRouteData = defaultMQAdminExt.examineTopicRouteInfo(topic);
+
+                if (topicRouteData == null || topicRouteData.getBrokerDatas() == null
+                    || topicRouteData.getBrokerDatas().isEmpty()) {
+                    throw new Exception("No topic route data!");
+                }
+
+                broker = topicRouteData.getBrokerDatas().get(0).getBrokerAddrs().get(0L);
+            }
+
+            QueryConsumeQueueResponseBody queryConsumeQueueResponseBody = defaultMQAdminExt.queryConsumeQueue(
+                broker, topic, queueId, index, count, consumerGroup
+            );
+
+            if (queryConsumeQueueResponseBody.getSubscriptionData() != null) {
+                System.out.printf("Subscription data: \n%s\n", JSON.toJSONString(queryConsumeQueueResponseBody.getSubscriptionData(), true));
+                System.out.print("======================================\n");
+            }
+
+            if (queryConsumeQueueResponseBody.getFilterData() != null) {
+                System.out.printf("Filter data: \n%s\n", queryConsumeQueueResponseBody.getFilterData());
+                System.out.print("======================================\n");
+            }
+
+            System.out.printf("Queue data: \nmax: %d, min: %d\n", queryConsumeQueueResponseBody.getMaxQueueIndex(),
+                queryConsumeQueueResponseBody.getMinQueueIndex());
+            System.out.print("======================================\n");
+
+            if (queryConsumeQueueResponseBody.getQueueData() != null) {
+
+                // Entries are labelled with a running index that starts at the requested one.
+                long i = index;
+                for (ConsumeQueueData queueData : queryConsumeQueueResponseBody.getQueueData()) {
+                    StringBuilder stringBuilder = new StringBuilder();
+                    stringBuilder.append("idx: ").append(i).append("\n");
+
+                    stringBuilder.append(queueData.toString()).append("\n");
+
+                    stringBuilder.append("======================================\n");
+
+                    System.out.print(stringBuilder.toString());
+                    i++;
+                }
+
+            }
+
+        } catch (Exception e) {
+            e.printStackTrace();
+        } finally {
+            defaultMQAdminExt.shutdown();
+        }
+    }
+}



Mime
View raw message