rocketmq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dongefore...@apache.org
Subject [2/7] incubator-rocketmq git commit: [ROCKETMQ-121]Support message filtering based on SQL92 closes apache/incubator-rocketmq#82
Date Fri, 21 Apr 2017 10:19:12 GMT
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/filter/src/test/java/org/apache/rocketmq/filter/BitsArrayTest.java
----------------------------------------------------------------------
diff --git a/filter/src/test/java/org/apache/rocketmq/filter/BitsArrayTest.java b/filter/src/test/java/org/apache/rocketmq/filter/BitsArrayTest.java
new file mode 100644
index 0000000..ef81b29
--- /dev/null
+++ b/filter/src/test/java/org/apache/rocketmq/filter/BitsArrayTest.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.filter;
+
+import org.apache.rocketmq.filter.util.BitsArray;
+import org.junit.Test;
+
+import java.util.Random;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class BitsArrayTest {
+
+    BitsArray gen(int bitCount) {
+        BitsArray bitsArray = BitsArray.create(bitCount);
+
+        for (int i = 0; i < bitCount / Byte.SIZE; i++) {
+            bitsArray.setByte(i, (byte) (new Random(System.currentTimeMillis())).nextInt(0xff));
+            try {
+                Thread.sleep(2);
+            } catch (InterruptedException e) {
+            }
+        }
+
+        return bitsArray;
+    }
+
+    int bitLength = Byte.SIZE;
+
+    @Test
+    public void testConstructor() {
+        BitsArray bitsArray = BitsArray.create(8);
+
+        assertThat(bitsArray.byteLength() == 1 && bitsArray.bitLength() == 8).isTrue();
+
+        bitsArray = BitsArray.create(9);
+
+        assertThat(bitsArray.byteLength() == 2 && bitsArray.bitLength() == 9).isTrue();
+
+        bitsArray = BitsArray.create(7);
+
+        assertThat(bitsArray.byteLength() == 1 && bitsArray.bitLength() == 7).isTrue();
+    }
+
+    @Test
+    public void testSet() {
+        BitsArray bitsArray = gen(bitLength);
+        BitsArray backUp = bitsArray.clone();
+
+        boolean val = bitsArray.getBit(2);
+
+        bitsArray.setBit(2, !val);
+
+        bitsArray.xor(backUp);
+
+        assertThat(bitsArray.getBit(2)).isTrue();
+    }
+
+    @Test
+    public void testAndOr() {
+        BitsArray bitsArray = gen(bitLength);
+
+        boolean val = bitsArray.getBit(2);
+
+        if (val) {
+            bitsArray.and(2, false);
+            assertThat(!bitsArray.getBit(2)).isTrue();
+        } else {
+            bitsArray.or(2, true);
+            assertThat(bitsArray.getBit(2)).isTrue();
+        }
+    }
+
+    @Test
+    public void testXor() {
+        BitsArray bitsArray = gen(bitLength);
+
+        boolean val = bitsArray.getBit(2);
+
+        bitsArray.xor(2, !val);
+
+        assertThat(bitsArray.getBit(2)).isTrue();
+    }
+
+    @Test
+    public void testNot() {
+        BitsArray bitsArray = gen(bitLength);
+        BitsArray backUp = bitsArray.clone();
+
+        bitsArray.not(2);
+
+        bitsArray.xor(backUp);
+
+        assertThat(bitsArray.getBit(2)).isTrue();
+    }
+
+    @Test
+    public void testOr() {
+        BitsArray b1 = BitsArray.create(new byte[]{(byte) 0xff, 0x00});
+        BitsArray b2 = BitsArray.create(new byte[]{0x00, (byte) 0xff});
+
+        b1.or(b2);
+
+        for (int i = 0; i < b1.bitLength(); i++) {
+            assertThat(b1.getBit(i)).isTrue();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/filter/src/test/java/org/apache/rocketmq/filter/BloomFilterTest.java
----------------------------------------------------------------------
diff --git a/filter/src/test/java/org/apache/rocketmq/filter/BloomFilterTest.java b/filter/src/test/java/org/apache/rocketmq/filter/BloomFilterTest.java
new file mode 100644
index 0000000..c6097ee
--- /dev/null
+++ b/filter/src/test/java/org/apache/rocketmq/filter/BloomFilterTest.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.filter;
+
+import org.apache.rocketmq.filter.util.BitsArray;
+import org.apache.rocketmq.filter.util.BloomFilter;
+import org.apache.rocketmq.filter.util.BloomFilterData;
+import org.junit.Test;
+
+import java.util.Random;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class BloomFilterTest {
+
+    @Test
+    public void testEquals() {
+        BloomFilter a = BloomFilter.createByFn(10, 20);
+
+        BloomFilter b = BloomFilter.createByFn(10, 20);
+
+        BloomFilter c = BloomFilter.createByFn(12, 20);
+
+        BloomFilter d = BloomFilter.createByFn(10, 30);
+
+        assertThat(a).isEqualTo(b);
+        assertThat(a).isNotEqualTo(c);
+        assertThat(a).isNotEqualTo(d);
+        assertThat(d).isNotEqualTo(c);
+
+        assertThat(a.hashCode()).isEqualTo(b.hashCode());
+        assertThat(a.hashCode()).isNotEqualTo(c.hashCode());
+        assertThat(a.hashCode()).isNotEqualTo(d.hashCode());
+        assertThat(c.hashCode()).isNotEqualTo(d.hashCode());
+    }
+
+    @Test
+    public void testHashTo() {
+        String cid = "CID_abc_efg";
+
+        BloomFilter bloomFilter = BloomFilter.createByFn(10, 20);
+
+        BitsArray bits = BitsArray.create(bloomFilter.getM());
+
+        int[] bitPos = bloomFilter.calcBitPositions(cid);
+
+        bloomFilter.hashTo(cid, bits);
+
+        for (int bit : bitPos) {
+            assertThat(bits.getBit(bit)).isTrue();
+        }
+    }
+
+    @Test
+    public void testCalcBitPositions() {
+        String cid = "CID_abc_efg";
+
+        BloomFilter bloomFilter = BloomFilter.createByFn(10, 20);
+
+        int[] bitPos = bloomFilter.calcBitPositions(cid);
+
+        assertThat(bitPos).isNotNull();
+        assertThat(bitPos.length).isEqualTo(bloomFilter.getK());
+
+        int[] bitPos2 = bloomFilter.calcBitPositions(cid);
+
+        assertThat(bitPos2).isNotNull();
+        assertThat(bitPos2.length).isEqualTo(bloomFilter.getK());
+
+        assertThat(bitPos).isEqualTo(bitPos2);
+    }
+
+    @Test
+    public void testIsHit() {
+        String cid = "CID_abc_efg";
+        String cid2 = "CID_abc_123";
+
+        BloomFilter bloomFilter = BloomFilter.createByFn(10, 20);
+
+        BitsArray bits = BitsArray.create(bloomFilter.getM());
+
+        bloomFilter.hashTo(cid, bits);
+
+        assertThat(bloomFilter.isHit(cid, bits)).isTrue();
+        assertThat(!bloomFilter.isHit(cid2, bits)).isTrue();
+
+        bloomFilter.hashTo(cid2, bits);
+
+        assertThat(bloomFilter.isHit(cid, bits)).isTrue();
+        assertThat(bloomFilter.isHit(cid2, bits)).isTrue();
+    }
+
+    @Test
+    public void testBloomFilterData() {
+        BloomFilterData bloomFilterData = new BloomFilterData(new int[]{1, 2, 3}, 128);
+        BloomFilterData bloomFilterData1 = new BloomFilterData(new int[]{1, 2, 3}, 128);
+        BloomFilterData bloomFilterData2 = new BloomFilterData(new int[]{1, 2, 3}, 129);
+
+        assertThat(bloomFilterData).isEqualTo(bloomFilterData1);
+        assertThat(bloomFilterData2).isNotEqualTo(bloomFilterData);
+        assertThat(bloomFilterData2).isNotEqualTo(bloomFilterData1);
+
+        assertThat(bloomFilterData.hashCode()).isEqualTo(bloomFilterData1.hashCode());
+        assertThat(bloomFilterData2.hashCode()).isNotEqualTo(bloomFilterData.hashCode());
+        assertThat(bloomFilterData2.hashCode()).isNotEqualTo(bloomFilterData1.hashCode());
+
+        assertThat(bloomFilterData.getBitPos()).isEqualTo(bloomFilterData2.getBitPos());
+        assertThat(bloomFilterData.getBitNum()).isEqualTo(bloomFilterData1.getBitNum());
+        assertThat(bloomFilterData.getBitNum()).isNotEqualTo(bloomFilterData2.getBitNum());
+
+        bloomFilterData2.setBitNum(128);
+
+        assertThat(bloomFilterData).isEqualTo(bloomFilterData2);
+
+        bloomFilterData2.setBitPos(new int[]{1, 2, 3, 4});
+
+        assertThat(bloomFilterData).isNotEqualTo(bloomFilterData2);
+
+        BloomFilterData nullData = new BloomFilterData();
+
+        assertThat(nullData.getBitNum()).isEqualTo(0);
+        assertThat(nullData.getBitPos()).isNull();
+
+        BloomFilter bloomFilter = BloomFilter.createByFn(1, 300);
+
+        assertThat(bloomFilter).isNotNull();
+        assertThat(bloomFilter.isValid(bloomFilterData)).isFalse();
+    }
+
+    @Test
+    public void testCheckFalseHit() {
+        BloomFilter bloomFilter = BloomFilter.createByFn(1, 300);
+        BitsArray bits = BitsArray.create(bloomFilter.getM());
+        int falseHit = 0;
+        for (int i = 0; i < bloomFilter.getN(); i++) {
+            String str = randomString((new Random(System.nanoTime())).nextInt(127) + 10);
+            int[] bitPos = bloomFilter.calcBitPositions(str);
+
+            if (bloomFilter.checkFalseHit(bitPos, bits)) {
+                falseHit++;
+            }
+
+            bloomFilter.hashTo(bitPos, bits);
+        }
+
+        assertThat(falseHit).isLessThanOrEqualTo(bloomFilter.getF() * bloomFilter.getN() / 100);
+    }
+
+    private String randomString(int length) {
+        StringBuilder stringBuilder = new StringBuilder(length);
+        for (int i = 0; i < length; i++) {
+            stringBuilder.append((char) ((new Random(System.nanoTime())).nextInt(123 - 97) + 97));
+        }
+
+        return stringBuilder.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/filter/src/test/java/org/apache/rocketmq/filter/ExpressionTest.java
----------------------------------------------------------------------
diff --git a/filter/src/test/java/org/apache/rocketmq/filter/ExpressionTest.java b/filter/src/test/java/org/apache/rocketmq/filter/ExpressionTest.java
new file mode 100644
index 0000000..0ee81c9
--- /dev/null
+++ b/filter/src/test/java/org/apache/rocketmq/filter/ExpressionTest.java
@@ -0,0 +1,594 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.filter;
+
+import org.apache.rocketmq.filter.expression.ComparisonExpression;
+import org.apache.rocketmq.filter.expression.ConstantExpression;
+import org.apache.rocketmq.filter.expression.EvaluationContext;
+import org.apache.rocketmq.filter.expression.Expression;
+import org.apache.rocketmq.filter.expression.MQFilterException;
+import org.apache.rocketmq.filter.expression.PropertyExpression;
+import org.apache.rocketmq.filter.parser.SelectorParser;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class ExpressionTest {
+
+    private static String andExpression = "a=3 and b<>4 And c>5 AND d<=4";
+    private static String orExpression = "a=3 or b<>4 Or c>5 OR d<=4";
+    private static String inExpression = "a in ('3', '4', '5')";
+    private static String notInExpression = "a not in ('3', '4', '5')";
+    private static String betweenExpression = "a between 2 and 10";
+    private static String notBetweenExpression = "a not between 2 and 10";
+    private static String isNullExpression = "a is null";
+    private static String isNotNullExpression = "a is not null";
+    private static String equalExpression = "a is not null and a='hello'";
+    private static String booleanExpression = "a=TRUE OR b=FALSE";
+    private static String nullOrExpression = "a is null OR a='hello'";
+    private static String stringHasString = "TAGS is not null and TAGS='''''tag'''''";
+
+    @Test
+    public void testEvaluate_stringHasString() {
+        Expression expr = genExp(stringHasString);
+
+        EvaluationContext context = genContext(
+            KeyValue.c("TAGS", "''tag''")
+        );
+
+        eval(expr, context, Boolean.TRUE);
+    }
+
+    @Test
+    public void testEvaluate_now() {
+        EvaluationContext context = genContext(
+            KeyValue.c("a", System.currentTimeMillis())
+        );
+
+        Expression nowExpression = ConstantExpression.createNow();
+        Expression propertyExpression = new PropertyExpression("a");
+
+        Expression expression = ComparisonExpression.createLessThanEqual(propertyExpression,
+            nowExpression);
+
+        eval(expression, context, Boolean.TRUE);
+    }
+
+    @Test
+    public void testEvaluate_stringCompare() {
+        Expression expression = genExp("a between up and low");
+
+        EvaluationContext context = genContext(
+            KeyValue.c("a", "3.14")
+        );
+
+        eval(expression, context, Boolean.FALSE);
+
+        {
+            context = genContext(
+                KeyValue.c("a", "3.14"),
+                KeyValue.c("up", "up"),
+                KeyValue.c("low", "low")
+            );
+
+            eval(expression, context, Boolean.FALSE);
+        }
+
+        {
+            expression = genExp("key is not null and key between 0 and 100");
+
+            context = genContext(
+                KeyValue.c("key", "con")
+            );
+
+            eval(expression, context, Boolean.FALSE);
+        }
+
+        {
+            expression = genExp("a between 0 and 100");
+
+            context = genContext(
+                KeyValue.c("a", "abc")
+            );
+
+            eval(expression, context, Boolean.FALSE);
+        }
+
+        {
+            expression = genExp("a=b");
+
+            context = genContext(
+                KeyValue.c("a", "3.14"),
+                KeyValue.c("b", "3.14")
+            );
+
+            eval(expression, context, Boolean.TRUE);
+        }
+
+        {
+            expression = genExp("a<>b");
+
+            context = genContext(
+                KeyValue.c("a", "3.14"),
+                KeyValue.c("b", "3.14")
+            );
+
+            eval(expression, context, Boolean.FALSE);
+        }
+
+        {
+            expression = genExp("a<>b");
+
+            context = genContext(
+                KeyValue.c("a", "3.14"),
+                KeyValue.c("b", "3.141")
+            );
+
+            eval(expression, context, Boolean.TRUE);
+        }
+    }
+
+    @Test
+    public void testEvaluate_exponent() {
+        Expression expression = genExp("a > 3.1E10");
+
+        EvaluationContext context = genContext(
+            KeyValue.c("a", String.valueOf(3.1415 * Math.pow(10, 10)))
+        );
+
+        eval(expression, context, Boolean.TRUE);
+    }
+
+    @Test
+    public void testEvaluate_floatNumber() {
+        Expression expression = genExp("a > 3.14");
+
+        EvaluationContext context = genContext(
+            KeyValue.c("a", String.valueOf(3.1415))
+        );
+
+        eval(expression, context, Boolean.TRUE);
+    }
+
+    @Test
+    public void testEvaluate_twoVariable() {
+        Expression expression = genExp("a > b");
+
+        EvaluationContext context = genContext(
+            KeyValue.c("a", String.valueOf(10)),
+            KeyValue.c("b", String.valueOf(20))
+        );
+
+        eval(expression, context, Boolean.FALSE);
+
+        context = genContext(
+            KeyValue.c("b", String.valueOf(10)),
+            KeyValue.c("a", String.valueOf(20))
+        );
+
+        eval(expression, context, Boolean.TRUE);
+    }
+
+    @Test
+    public void testEvaluate_nullOr() {
+        Expression expression = genExp(nullOrExpression);
+
+        EvaluationContext context = genContext(
+        );
+
+        eval(expression, context, Boolean.TRUE);
+
+        context = genContext(
+            KeyValue.c("a", "hello")
+        );
+
+        eval(expression, context, Boolean.TRUE);
+
+        context = genContext(
+            KeyValue.c("a", "abc")
+        );
+
+        eval(expression, context, Boolean.FALSE);
+    }
+
+    @Test
+    public void testEvaluate_boolean() {
+        Expression expression = genExp(booleanExpression);
+
+        EvaluationContext context = genContext(
+            KeyValue.c("a", "true"),
+            KeyValue.c("b", "false")
+        );
+
+        eval(expression, context, Boolean.TRUE);
+
+        context = genContext(
+            KeyValue.c("a", "false"),
+            KeyValue.c("b", "true")
+        );
+
+        eval(expression, context, Boolean.FALSE);
+    }
+
+    @Test
+    public void testEvaluate_equal() {
+        Expression expression = genExp(equalExpression);
+
+        EvaluationContext context = genContext(
+            KeyValue.c("a", "hello")
+        );
+
+        eval(expression, context, Boolean.TRUE);
+
+        context = genContext(
+        );
+
+        eval(expression, context, Boolean.FALSE);
+    }
+
+    @Test
+    public void testEvaluate_andTrue() {
+        Expression expression = genExp(andExpression);
+
+        EvaluationContext context = genContext(
+            KeyValue.c("a", 3),
+            KeyValue.c("b", 5),
+            KeyValue.c("c", 6),
+            KeyValue.c("d", 1)
+        );
+
+        for (int i = 0; i < 500; i++) {
+            eval(expression, context, Boolean.TRUE);
+        }
+
+        long start = System.currentTimeMillis();
+        for (int j = 0; j < 100; j++) {
+            for (int i = 0; i < 1000; i++) {
+                eval(expression, context, Boolean.TRUE);
+            }
+        }
+
+        // use string
+        context = genContext(
+            KeyValue.c("a", "3"),
+            KeyValue.c("b", "5"),
+            KeyValue.c("c", "6"),
+            KeyValue.c("d", "1")
+        );
+
+        eval(expression, context, Boolean.TRUE);
+    }
+
+    @Test
+    public void testEvaluate_andFalse() {
+        Expression expression = genExp(andExpression);
+
+        EvaluationContext context = genContext(
+            KeyValue.c("a", 4),
+            KeyValue.c("b", 5),
+            KeyValue.c("c", 6),
+            KeyValue.c("d", 1)
+        );
+
+        eval(expression, context, Boolean.FALSE);
+
+        // use string
+        context = genContext(
+            KeyValue.c("a", "4"),
+            KeyValue.c("b", "5"),
+            KeyValue.c("c", "6"),
+            KeyValue.c("d", "1")
+        );
+
+        eval(expression, context, Boolean.FALSE);
+    }
+
+    @Test
+    public void testEvaluate_orTrue() {
+        Expression expression = genExp(orExpression);
+
+        // first
+        EvaluationContext context = genContext(
+            KeyValue.c("a", 3)
+        );
+        eval(expression, context, Boolean.TRUE);
+
+        // second
+        context = genContext(
+            KeyValue.c("a", 4),
+            KeyValue.c("b", 5)
+        );
+        eval(expression, context, Boolean.TRUE);
+
+        // third
+        context = genContext(
+            KeyValue.c("a", 4),
+            KeyValue.c("b", 4),
+            KeyValue.c("c", 6)
+        );
+        eval(expression, context, Boolean.TRUE);
+
+        // forth
+        context = genContext(
+            KeyValue.c("a", 4),
+            KeyValue.c("b", 4),
+            KeyValue.c("c", 3),
+            KeyValue.c("d", 2)
+        );
+        eval(expression, context, Boolean.TRUE);
+    }
+
+    @Test
+    public void testEvaluate_orFalse() {
+        Expression expression = genExp(orExpression);
+        // forth
+        EvaluationContext context = genContext(
+            KeyValue.c("a", 4),
+            KeyValue.c("b", 4),
+            KeyValue.c("c", 3),
+            KeyValue.c("d", 10)
+        );
+        eval(expression, context, Boolean.FALSE);
+    }
+
+    @Test
+    public void testEvaluate_inTrue() {
+        Expression expression = genExp(inExpression);
+
+        EvaluationContext context = genContext(
+            KeyValue.c("a", "3")
+        );
+        eval(expression, context, Boolean.TRUE);
+
+        context = genContext(
+            KeyValue.c("a", "4")
+        );
+        eval(expression, context, Boolean.TRUE);
+
+        context = genContext(
+            KeyValue.c("a", "5")
+        );
+        eval(expression, context, Boolean.TRUE);
+    }
+
+    @Test
+    public void testEvaluate_inFalse() {
+        Expression expression = genExp(inExpression);
+
+        EvaluationContext context = genContext(
+            KeyValue.c("a", "8")
+        );
+        eval(expression, context, Boolean.FALSE);
+    }
+
+    @Test
+    public void testEvaluate_notInTrue() {
+        Expression expression = genExp(notInExpression);
+
+        EvaluationContext context = genContext(
+            KeyValue.c("a", "8")
+        );
+        eval(expression, context, Boolean.TRUE);
+    }
+
+    @Test
+    public void testEvaluate_notInFalse() {
+        Expression expression = genExp(notInExpression);
+
+        EvaluationContext context = genContext(
+            KeyValue.c("a", "3")
+        );
+        eval(expression, context, Boolean.FALSE);
+
+        context = genContext(
+            KeyValue.c("a", "4")
+        );
+        eval(expression, context, Boolean.FALSE);
+
+        context = genContext(
+            KeyValue.c("a", "5")
+        );
+        eval(expression, context, Boolean.FALSE);
+    }
+
+    @Test
+    public void testEvaluate_betweenTrue() {
+        Expression expression = genExp(betweenExpression);
+
+        EvaluationContext context = genContext(
+            KeyValue.c("a", "2")
+        );
+        eval(expression, context, Boolean.TRUE);
+
+        context = genContext(
+            KeyValue.c("a", "10")
+        );
+        eval(expression, context, Boolean.TRUE);
+
+        context = genContext(
+            KeyValue.c("a", "3")
+        );
+        eval(expression, context, Boolean.TRUE);
+    }
+
+    @Test
+    public void testEvaluate_betweenFalse() {
+        Expression expression = genExp(betweenExpression);
+
+        EvaluationContext context = genContext(
+            KeyValue.c("a", "1")
+        );
+        eval(expression, context, Boolean.FALSE);
+
+        context = genContext(
+            KeyValue.c("a", "11")
+        );
+        eval(expression, context, Boolean.FALSE);
+    }
+
+    @Test
+    public void testEvaluate_notBetweenTrue() {
+        Expression expression = genExp(notBetweenExpression);
+
+        EvaluationContext context = genContext(
+            KeyValue.c("a", "1")
+        );
+        eval(expression, context, Boolean.TRUE);
+
+        context = genContext(
+            KeyValue.c("a", "11")
+        );
+        eval(expression, context, Boolean.TRUE);
+    }
+
+    @Test
+    public void testEvaluate_notBetweenFalse() {
+        Expression expression = genExp(notBetweenExpression);
+
+        EvaluationContext context = genContext(
+            KeyValue.c("a", "2")
+        );
+        eval(expression, context, Boolean.FALSE);
+
+        context = genContext(
+            KeyValue.c("a", "10")
+        );
+        eval(expression, context, Boolean.FALSE);
+
+        context = genContext(
+            KeyValue.c("a", "3")
+        );
+        eval(expression, context, Boolean.FALSE);
+    }
+
+    @Test
+    public void testEvaluate_isNullTrue() {
+        Expression expression = genExp(isNullExpression);
+
+        EvaluationContext context = genContext(
+            KeyValue.c("abc", "2")
+        );
+        eval(expression, context, Boolean.TRUE);
+    }
+
+    @Test
+    public void testEvaluate_isNullFalse() {
+        Expression expression = genExp(isNullExpression);
+
+        EvaluationContext context = genContext(
+            KeyValue.c("a", "2")
+        );
+        eval(expression, context, Boolean.FALSE);
+    }
+
+    @Test
+    public void testEvaluate_isNotNullTrue() {
+        Expression expression = genExp(isNotNullExpression);
+
+        EvaluationContext context = genContext(
+            KeyValue.c("a", "2")
+        );
+        eval(expression, context, Boolean.TRUE);
+    }
+
+    @Test
+    public void testEvaluate_isNotNullFalse() {
+        Expression expression = genExp(isNotNullExpression);
+
+        EvaluationContext context = genContext(
+            KeyValue.c("abc", "2")
+        );
+        eval(expression, context, Boolean.FALSE);
+    }
+
+    protected void eval(Expression expression, EvaluationContext context, Boolean result) {
+        Object ret = null;
+        try {
+            ret = expression.evaluate(context);
+        } catch (Throwable e) {
+            e.printStackTrace();
+        }
+
+        if (ret == null || !(ret instanceof Boolean)) {
+            assertThat(result).isFalse();
+        } else {
+            assertThat(result).isEqualTo(ret);
+        }
+    }
+
+    protected EvaluationContext genContext(KeyValue... keyValues) {
+        if (keyValues == null || keyValues.length < 1) {
+            return new PropertyContext();
+        }
+
+        PropertyContext context = new PropertyContext();
+        for (KeyValue keyValue : keyValues) {
+            context.properties.put(keyValue.key, keyValue.value);
+        }
+
+        return context;
+    }
+
+    protected Expression genExp(String exp) {
+        Expression expression = null;
+
+        try {
+            expression = SelectorParser.parse(exp);
+
+            assertThat(expression).isNotNull();
+        } catch (MQFilterException e) {
+            e.printStackTrace();
+            assertThat(Boolean.FALSE).isTrue();
+        }
+
+        return expression;
+    }
+
+    static class KeyValue {
+        public static KeyValue c(String key, Object value) {
+            return new KeyValue(key, value);
+        }
+
+        public KeyValue(String key, Object value) {
+            this.key = key;
+            this.value = value;
+        }
+
+        public String key;
+        public Object value;
+    }
+
+    class PropertyContext implements EvaluationContext {
+
+        public Map<String, Object> properties = new HashMap<String, Object>(8);
+
+        @Override
+        public Object get(final String name) {
+            return properties.get(name);
+        }
+
+        @Override
+        public Map<String, Object> keyValues() {
+            return properties;
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/filter/src/test/java/org/apache/rocketmq/filter/FilterSpiTest.java
----------------------------------------------------------------------
diff --git a/filter/src/test/java/org/apache/rocketmq/filter/FilterSpiTest.java b/filter/src/test/java/org/apache/rocketmq/filter/FilterSpiTest.java
new file mode 100644
index 0000000..22eeb86
--- /dev/null
+++ b/filter/src/test/java/org/apache/rocketmq/filter/FilterSpiTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.filter;
+
+import org.apache.rocketmq.common.filter.ExpressionType;
+import org.apache.rocketmq.filter.expression.EmptyEvaluationContext;
+import org.apache.rocketmq.filter.expression.EvaluationContext;
+import org.apache.rocketmq.filter.expression.Expression;
+import org.apache.rocketmq.filter.expression.MQFilterException;
+import org.junit.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class FilterSpiTest {
+
+    static class NothingExpression implements Expression {
+
+        @Override
+        public Object evaluate(final EvaluationContext context) throws Exception {
+            return Boolean.TRUE;
+        }
+    }
+
+    static class NothingFilter implements FilterSpi {
+        @Override
+        public Expression compile(final String expr) throws MQFilterException {
+            return new NothingExpression();
+        }
+
+        @Override
+        public String ofType() {
+            return "Nothing";
+        }
+    }
+
+
+    @Test
+    public void testRegister() {
+        FilterFactory.INSTANCE.register(new NothingFilter());
+
+        Expression expr = null;
+        try {
+            expr = FilterFactory.INSTANCE.get("Nothing").compile("abc");
+        } catch (MQFilterException e) {
+            e.printStackTrace();
+            assertThat(Boolean.FALSE).isTrue();
+        }
+
+        assertThat(expr).isNotNull();
+
+        try {
+            assertThat((Boolean) expr.evaluate(new EmptyEvaluationContext())).isTrue();
+        } catch (Exception e) {
+            e.printStackTrace();
+            assertThat(Boolean.FALSE).isTrue();
+        }
+    }
+
+    @Test
+    public void testGet() {
+        try {
+            assertThat((Boolean) FilterFactory.INSTANCE.get(ExpressionType.SQL92).compile("a is not null and a > 0")
+                .evaluate(new EmptyEvaluationContext())).isFalse();
+        } catch (Exception e) {
+            e.printStackTrace();
+            assertThat(Boolean.FALSE).isTrue();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/filter/src/test/java/org/apache/rocketmq/filter/ParserTest.java
----------------------------------------------------------------------
diff --git a/filter/src/test/java/org/apache/rocketmq/filter/ParserTest.java b/filter/src/test/java/org/apache/rocketmq/filter/ParserTest.java
new file mode 100644
index 0000000..36ef271
--- /dev/null
+++ b/filter/src/test/java/org/apache/rocketmq/filter/ParserTest.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.filter;
+
+import org.apache.rocketmq.filter.expression.Expression;
+import org.apache.rocketmq.filter.expression.MQFilterException;
+import org.apache.rocketmq.filter.parser.SelectorParser;
+import org.junit.Test;
+
+import java.util.Arrays;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class ParserTest {
+
+    private static String andExpression = "a=3 and b<>4 And c>5 AND d<=4";
+    private static String andExpressionHasBlank = "a=3  and    b<>4 And c>5 AND d<=4";
+    private static String orExpression = "a=3 or b<>4 Or c>5 OR d<=4";
+    private static String inExpression = "a in ('3', '4', '5')";
+    private static String notInExpression = "(a not in ('6', '4', '5')) or (b in ('3', '4', '5'))";
+    private static String betweenExpression = "(a between 2 and 10) AND (b not between 6 and 9)";
+    private static String equalNullExpression = "a is null";
+    private static String notEqualNullExpression = "a is not null";
+    private static String nowExpression = "a <= now";
+
+    private static String invalidExpression = "a and between 2 and 10";
+    private static String illegalBetween = " a between 10 and 0";
+
+    @Test
+    public void testParse_valid() {
+        for (String expr : Arrays.asList(
+            andExpression, orExpression, inExpression, notInExpression, betweenExpression,
+            equalNullExpression, notEqualNullExpression, nowExpression
+        )) {
+
+            try {
+                Expression expression = SelectorParser.parse(expr);
+                assertThat(expression).isNotNull();
+            } catch (MQFilterException e) {
+                e.printStackTrace();
+                assertThat(Boolean.FALSE).isTrue();
+            }
+
+        }
+    }
+
+    @Test
+    public void testParse_invalid() {
+        try {
+            SelectorParser.parse(invalidExpression);
+
+            assertThat(Boolean.TRUE).isFalse();
+        } catch (MQFilterException e) {
+        }
+    }
+
+    @Test
+    public void testParse_decimalOverFlow() {
+        try {
+            String str = "100000000000000000000000";
+
+            SelectorParser.parse("a > " + str);
+
+            assertThat(Boolean.TRUE).isFalse();
+        } catch (Exception e) {
+        }
+    }
+
+    @Test
+    public void testParse_floatOverFlow() {
+        try {
+            String str = "1";
+            for (int i = 0; i < 2048; i++) {
+                str += "111111111111111111111111111111111111111111111111111";
+            }
+            str += ".";
+            for (int i = 0; i < 2048; i++) {
+                str += "111111111111111111111111111111111111111111111111111";
+            }
+
+            SelectorParser.parse("a > " + str);
+
+            assertThat(Boolean.TRUE).isFalse();
+        } catch (Exception e) {
+        }
+    }
+
+    @Test
+    public void testParse_illegalBetween() {
+        try {
+            SelectorParser.parse(illegalBetween);
+
+            assertThat(Boolean.TRUE).isFalse();
+        } catch (Exception e) {
+        }
+    }
+
+    @Test
+    public void testEquals() {
+        try {
+            Expression expr1 = SelectorParser.parse(andExpression);
+
+            Expression expr2 = SelectorParser.parse(andExpressionHasBlank);
+
+            Expression expr3 = SelectorParser.parse(orExpression);
+
+            assertThat(expr1).isEqualTo(expr2);
+            assertThat(expr1).isNotEqualTo(expr3);
+        } catch (MQFilterException e) {
+            e.printStackTrace();
+            assertThat(Boolean.TRUE).isFalse();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 47df84d..feb8b14 100644
--- a/pom.xml
+++ b/pom.xml
@@ -178,6 +178,7 @@
         <module>example</module>
         <module>filtersrv</module>
         <module>srvutil</module>
+        <module>filter</module>
         <module>test</module>
         <module>distribution</module>
     </modules>
@@ -554,6 +555,11 @@
                 <version>${project.version}</version>
             </dependency>
             <dependency>
+                <groupId>org.apache.rocketmq</groupId>
+                <artifactId>rocketmq-filter</artifactId>
+                <version>${project.version}</version>
+            </dependency>
+            <dependency>
                 <groupId>${project.groupId}</groupId>
                 <artifactId>rocketmq-example</artifactId>
                 <version>${project.version}</version>
@@ -603,6 +609,11 @@
                 <artifactId>commons-lang3</artifactId>
                 <version>3.4</version>
             </dependency>
+            <dependency>
+                <groupId>com.google.guava</groupId>
+                <artifactId>guava</artifactId>
+                <version>19.0</version>
+            </dependency>
         </dependencies>
     </dependencyManagement>
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/srvutil/pom.xml
----------------------------------------------------------------------
diff --git a/srvutil/pom.xml b/srvutil/pom.xml
index 3269903..6dc0377 100644
--- a/srvutil/pom.xml
+++ b/srvutil/pom.xml
@@ -41,5 +41,9 @@
             <groupId>commons-cli</groupId>
             <artifactId>commons-cli</artifactId>
         </dependency>
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+        </dependency>
     </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
index 5be8258..7841feb 100644
--- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
+++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
@@ -314,10 +314,11 @@ public class CommitLog {
 
             // 17 properties
             short propertiesLength = byteBuffer.getShort();
+            Map<String, String> propertiesMap = null;
             if (propertiesLength > 0) {
                 byteBuffer.get(bytesContent, 0, propertiesLength);
                 String properties = new String(bytesContent, 0, propertiesLength, MessageDecoder.CHARSET_UTF8);
-                Map<String, String> propertiesMap = MessageDecoder.string2messageProperties(properties);
+                propertiesMap = MessageDecoder.string2messageProperties(properties);
 
                 keys = propertiesMap.get(MessageConst.PROPERTY_KEYS);
 
@@ -369,8 +370,9 @@ public class CommitLog {
                 queueOffset, // 7
                 keys, // 8
                 uniqKey, //9
-                sysFlag, // 9
-                preparedTransactionOffset// 10
+                sysFlag, // 10
+                preparedTransactionOffset, // 11
+                propertiesMap // 12
             );
         } catch (Exception e) {
         }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/store/src/main/java/org/apache/rocketmq/store/CommitLogDispatcher.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLogDispatcher.java b/store/src/main/java/org/apache/rocketmq/store/CommitLogDispatcher.java
new file mode 100644
index 0000000..e1564a9
--- /dev/null
+++ b/store/src/main/java/org/apache/rocketmq/store/CommitLogDispatcher.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.store;
+
+/**
+ * Dispatcher of commit log.
+ */
+public interface CommitLogDispatcher {
+
+    void dispatch(final DispatchRequest request);
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
index 919c637..d03ff0f 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
@@ -20,6 +20,7 @@ import java.io.File;
 import java.nio.ByteBuffer;
 import java.util.List;
 import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.store.config.StorePathConfigHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -40,6 +41,7 @@ public class ConsumeQueue {
     private final int mappedFileSize;
     private long maxPhysicOffset = -1;
     private volatile long minLogicOffset = 0;
+    private ConsumeQueueExt consumeQueueExt = null;
 
     public ConsumeQueue(
         final String topic,
@@ -61,11 +63,24 @@ public class ConsumeQueue {
         this.mappedFileQueue = new MappedFileQueue(queueDir, mappedFileSize, null);
 
         this.byteBufferIndex = ByteBuffer.allocate(CQ_STORE_UNIT_SIZE);
+
+        if (defaultMessageStore.getMessageStoreConfig().isEnableConsumeQueueExt()) {
+            this.consumeQueueExt = new ConsumeQueueExt(
+                topic,
+                queueId,
+                StorePathConfigHelper.getStorePathConsumeQueueExt(defaultMessageStore.getMessageStoreConfig().getStorePathRootDir()),
+                defaultMessageStore.getMessageStoreConfig().getMappedFileSizeConsumeQueueExt(),
+                defaultMessageStore.getMessageStoreConfig().getBitMapLengthConsumeQueueExt()
+            );
+        }
     }
 
     public boolean load() {
         boolean result = this.mappedFileQueue.load();
         log.info("load consume queue " + this.topic + "-" + this.queueId + " " + (result ? "OK" : "Failed"));
+        if (isExtReadEnable()) {
+            result &= this.consumeQueueExt.load();
+        }
         return result;
     }
 
@@ -82,6 +97,7 @@ public class ConsumeQueue {
             ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
             long processOffset = mappedFile.getFileFromOffset();
             long mappedFileOffset = 0;
+            long maxExtAddr = 1;
             while (true) {
                 for (int i = 0; i < mappedFileSizeLogics; i += CQ_STORE_UNIT_SIZE) {
                     long offset = byteBuffer.getLong();
@@ -91,6 +107,9 @@ public class ConsumeQueue {
                     if (offset >= 0 && size > 0) {
                         mappedFileOffset = i + CQ_STORE_UNIT_SIZE;
                         this.maxPhysicOffset = offset;
+                        if (isExtAddr(tagsCode)) {
+                            maxExtAddr = tagsCode;
+                        }
                     } else {
                         log.info("recover current consume queue file over,  " + mappedFile.getFileName() + " "
                             + offset + " " + size + " " + tagsCode);
@@ -123,6 +142,12 @@ public class ConsumeQueue {
             this.mappedFileQueue.setFlushedWhere(processOffset);
             this.mappedFileQueue.setCommittedWhere(processOffset);
             this.mappedFileQueue.truncateDirtyFiles(processOffset);
+
+            if (isExtReadEnable()) {
+                this.consumeQueueExt.recover();
+                log.info("Truncate consume queue extend file by max {}", maxExtAddr);
+                this.consumeQueueExt.truncateByMaxAddress(maxExtAddr);
+            }
         }
     }
 
@@ -200,7 +225,7 @@ public class ConsumeQueue {
         int logicFileSize = this.mappedFileSize;
 
         this.maxPhysicOffset = phyOffet - 1;
-
+        long maxExtAddr = 1;
         while (true) {
             MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
             if (mappedFile != null) {
@@ -213,7 +238,7 @@ public class ConsumeQueue {
                 for (int i = 0; i < logicFileSize; i += CQ_STORE_UNIT_SIZE) {
                     long offset = byteBuffer.getLong();
                     int size = byteBuffer.getInt();
-                    byteBuffer.getLong();
+                    long tagsCode = byteBuffer.getLong();
 
                     if (0 == i) {
                         if (offset >= phyOffet) {
@@ -225,6 +250,10 @@ public class ConsumeQueue {
                             mappedFile.setCommittedPosition(pos);
                             mappedFile.setFlushedPosition(pos);
                             this.maxPhysicOffset = offset;
+                            // This maybe not take effect, when not every consume queue has extend file.
+                            if (isExtAddr(tagsCode)) {
+                                maxExtAddr = tagsCode;
+                            }
                         }
                     } else {
 
@@ -239,6 +268,9 @@ public class ConsumeQueue {
                             mappedFile.setCommittedPosition(pos);
                             mappedFile.setFlushedPosition(pos);
                             this.maxPhysicOffset = offset;
+                            if (isExtAddr(tagsCode)) {
+                                maxExtAddr = tagsCode;
+                            }
 
                             if (pos == logicFileSize) {
                                 return;
@@ -252,6 +284,10 @@ public class ConsumeQueue {
                 break;
             }
         }
+
+        if (isExtReadEnable()) {
+            this.consumeQueueExt.truncateByMaxAddress(maxExtAddr);
+        }
     }
 
     public long getLastOffset() {
@@ -285,7 +321,12 @@ public class ConsumeQueue {
     }
 
     public boolean flush(final int flushLeastPages) {
-        return this.mappedFileQueue.flush(flushLeastPages);
+        boolean result = this.mappedFileQueue.flush(flushLeastPages);
+        if (isExtReadEnable()) {
+            result = result & this.consumeQueueExt.flush(flushLeastPages);
+        }
+
+        return result;
     }
 
     public int deleteExpiredFile(long offset) {
@@ -296,6 +337,7 @@ public class ConsumeQueue {
 
     public void correctMinOffset(long phyMinOffset) {
         MappedFile mappedFile = this.mappedFileQueue.getFirstMappedFile();
+        long minExtAddr = 1;
         if (mappedFile != null) {
             SelectMappedBufferResult result = mappedFile.selectMappedBuffer(0);
             if (result != null) {
@@ -303,12 +345,16 @@ public class ConsumeQueue {
                     for (int i = 0; i < result.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
                         long offsetPy = result.getByteBuffer().getLong();
                         result.getByteBuffer().getInt();
-                        result.getByteBuffer().getLong();
+                        long tagsCode = result.getByteBuffer().getLong();
 
                         if (offsetPy >= phyMinOffset) {
                             this.minLogicOffset = result.getMappedFile().getFileFromOffset() + i;
                             log.info("Compute logical min offset: {}, topic: {}, queueId: {}",
                                     this.getMinOffsetInQueue(), this.topic, this.queueId);
+                            // This maybe not take effect, when not every consume queue has extend file.
+                            if (isExtAddr(tagsCode)) {
+                                minExtAddr = tagsCode;
+                            }
                             break;
                         }
                     }
@@ -319,24 +365,43 @@ public class ConsumeQueue {
                 }
             }
         }
+
+        if (isExtReadEnable()) {
+            this.consumeQueueExt.truncateByMinAddress(minExtAddr);
+        }
     }
 
     public long getMinOffsetInQueue() {
         return this.minLogicOffset / CQ_STORE_UNIT_SIZE;
     }
 
-    public void putMessagePositionInfoWrapper(long offset, int size, long tagsCode, long storeTimestamp,
-        long logicOffset) {
+    public void putMessagePositionInfoWrapper(DispatchRequest request) {
         final int maxRetries = 30;
         boolean canWrite = this.defaultMessageStore.getRunningFlags().isCQWriteable();
         for (int i = 0; i < maxRetries && canWrite; i++) {
-            boolean result = this.putMessagePositionInfo(offset, size, tagsCode, logicOffset);
+            long tagsCode = request.getTagsCode();
+            if (isExtWriteEnable()) {
+                ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
+                cqExtUnit.setFilterBitMap(request.getBitMap());
+                cqExtUnit.setMsgStoreTime(request.getStoreTimestamp());
+                cqExtUnit.setTagsCode(request.getTagsCode());
+
+                long extAddr = this.consumeQueueExt.put(cqExtUnit);
+                if (isExtAddr(extAddr)) {
+                    tagsCode = extAddr;
+                } else {
+                    log.warn("Save consume queue extend fail, So just save tagsCode! {}, topic:{}, queueId:{}, offset:{}", cqExtUnit,
+                        topic, queueId, request.getCommitLogOffset());
+                }
+            }
+            boolean result = this.putMessagePositionInfo(request.getCommitLogOffset(),
+                request.getMsgSize(), tagsCode, request.getConsumeQueueOffset());
             if (result) {
-                this.defaultMessageStore.getStoreCheckpoint().setLogicsMsgTimestamp(storeTimestamp);
+                this.defaultMessageStore.getStoreCheckpoint().setLogicsMsgTimestamp(request.getStoreTimestamp());
                 return;
             } else {
                 // XXX: warn and notify me
-                log.warn("[BUG]put commit log position info to " + topic + ":" + queueId + " " + offset
+                log.warn("[BUG]put commit log position info to " + topic + ":" + queueId + " " + request.getCommitLogOffset()
                     + " failed, retry " + i + " times");
 
                 try {
@@ -423,6 +488,20 @@ public class ConsumeQueue {
         return null;
     }
 
+    public ConsumeQueueExt.CqExtUnit getExt(final long offset) {
+        if (isExtReadEnable()) {
+            return this.consumeQueueExt.get(offset);
+        }
+        return null;
+    }
+
+    public boolean getExt(final long offset, ConsumeQueueExt.CqExtUnit cqExtUnit) {
+        if (isExtReadEnable()) {
+            return this.consumeQueueExt.get(offset, cqExtUnit);
+        }
+        return false;
+    }
+
     public long getMinLogicOffset() {
         return minLogicOffset;
     }
@@ -457,6 +536,9 @@ public class ConsumeQueue {
         this.maxPhysicOffset = -1;
         this.minLogicOffset = 0;
         this.mappedFileQueue.destroy();
+        if (isExtReadEnable()) {
+            this.consumeQueueExt.destroy();
+        }
     }
 
     public long getMessageTotalInQueue() {
@@ -469,5 +551,27 @@ public class ConsumeQueue {
 
     public void checkSelf() {
         mappedFileQueue.checkSelf();
+        if (isExtReadEnable()) {
+            this.consumeQueueExt.checkSelf();
+        }
+    }
+
+    protected boolean isExtReadEnable() {
+        return this.consumeQueueExt != null;
+    }
+
+    protected boolean isExtWriteEnable() {
+        return this.consumeQueueExt != null
+            && this.defaultMessageStore.getMessageStoreConfig().isEnableConsumeQueueExt();
+    }
+
+    /**
+     * Check {@code tagsCode} is address of extend file or tags code.
+     *
+     * @param tagsCode
+     * @return
+     */
+    public boolean isExtAddr(long tagsCode) {
+        return isExtReadEnable() && this.consumeQueueExt.isExtAddr(tagsCode);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueueExt.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueueExt.java b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueueExt.java
new file mode 100644
index 0000000..1a177e9
--- /dev/null
+++ b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueueExt.java
@@ -0,0 +1,638 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.store;
+
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Extend of consume queue, to store something not important,
+ * such as message store time, filter bit map and etc.
+ * <p/>
+ * <li>1. This class is used only by {@link ConsumeQueue}</li>
+ * <li>2. And is week reliable.</li>
+ * <li>3. Be careful, address returned is always less than 0.</li>
+ * <li>4. Pls keep this file small.</li>
+ */
+public class ConsumeQueueExt {
+    private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
+
+    private final MappedFileQueue mappedFileQueue;
+    private final String topic;
+    private final int queueId;
+
+    private final String storePath;
+    private final int mappedFileSize;
+    private ByteBuffer tempContainer;
+
+    public static final int END_BLANK_DATA_LENGTH = 4;
+
+    /**
+     * Addr can not exceed this value.For compatible.
+     */
+    public static final long MAX_ADDR = Integer.MIN_VALUE - 1L;
+    public static final long MAX_REAL_OFFSET = MAX_ADDR - Long.MIN_VALUE;
+
+    /**
+     * Constructor.
+     *
+     * @param topic          topic
+     * @param queueId        id of queue
+     * @param storePath      root dir of files to store.
+     * @param mappedFileSize file size
+     * @param bitMapLength   bit map length.
+     */
+    public ConsumeQueueExt(final String topic,
+                           final int queueId,
+                           final String storePath,
+                           final int mappedFileSize,
+                           final int bitMapLength) {
+
+        this.storePath = storePath;
+        this.mappedFileSize = mappedFileSize;
+
+        this.topic = topic;
+        this.queueId = queueId;
+
+        String queueDir = this.storePath
+            + File.separator + topic
+            + File.separator + queueId;
+
+        this.mappedFileQueue = new MappedFileQueue(queueDir, mappedFileSize, null);
+
+        if (bitMapLength > 0) {
+            this.tempContainer = ByteBuffer.allocate(
+                bitMapLength / Byte.SIZE
+            );
+        }
+    }
+
+    /**
+     * Check whether {@code address} point to extend file.
+     * <p>
+     * Just test {@code address} is less than 0.
+     * </p>
+     *
+     * @param address
+     * @return
+     */
+    public boolean isExtAddr(final long address) {
+        return address <= MAX_ADDR;
+    }
+
+    /**
+     * Transform {@code address}(decorated by {@link #decorate}) to offset in mapped file.
+     * <p>
+     * if {@code address} is less than 0, return {@code address} - {@link java.lang.Long#MIN_VALUE};
+     * else, just return {@code address}
+     * </p>
+     *
+     * @param address
+     * @return
+     */
+    public long unDecorate(final long address) {
+        if (isExtAddr(address)) {
+            return address - Long.MIN_VALUE;
+        }
+        return address;
+    }
+
+    /**
+     * Decorate {@code offset} from mapped file, in order to distinguish with tagsCode(saved in cq originally).
+     * <p>
+     * if {@code offset} is greater than or equal to 0, then return {@code offset} + {@link java.lang.Long#MIN_VALUE};
+     * else, just return {@code offset}
+     * </p>
+     *
+     * @param offset
+     * @return ext address(value is less than 0)
+     */
+    public long decorate(final long offset) {
+        if (!isExtAddr(offset)) {
+            return offset + Long.MIN_VALUE;
+        }
+        return offset;
+    }
+
+    /**
+     * Get data from buffer.
+     *
+     * @param address less than 0
+     * @return
+     */
+    public CqExtUnit get(final long address) {
+        CqExtUnit cqExtUnit = new CqExtUnit();
+        if (get(address, cqExtUnit)) {
+            return cqExtUnit;
+        }
+
+        return null;
+    }
+
+    /**
+     * Get data from buffer, and set to {@code cqExtUnit}
+     *
+     * @param address   less than 0
+     * @param cqExtUnit
+     * @return
+     */
+    public boolean get(final long address, final CqExtUnit cqExtUnit) {
+        if (!isExtAddr(address)) {
+            return false;
+        }
+
+        final int mappedFileSize = this.mappedFileSize;
+        final long realOffset = unDecorate(address);
+
+        MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(realOffset, realOffset == 0);
+        if (mappedFile == null) {
+            return false;
+        }
+
+        int pos = (int) (realOffset % mappedFileSize);
+
+        SelectMappedBufferResult bufferResult = mappedFile.selectMappedBuffer(pos);
+        if (bufferResult == null) {
+            log.warn("[BUG] Consume queue extend unit({}) is not found!", realOffset);
+            return false;
+        }
+        boolean ret = false;
+        try {
+            ret = cqExtUnit.read(bufferResult.getByteBuffer());
+        } finally {
+            bufferResult.release();
+        }
+
+        return ret;
+    }
+
+    /**
+     * Save to mapped buffer of file and return address.
+     * <p>
+     * Be careful, this method is not thread safe.
+     * </p>
+     *
+     * @param cqExtUnit
+     * @return success: < 0: fail: >=0
+     */
+    public long put(final CqExtUnit cqExtUnit) {
+        final int retryTimes = 3;
+        try {
+            int size = cqExtUnit.calcUnitSize();
+            if (size > CqExtUnit.MAX_EXT_UNIT_SIZE) {
+                log.error("Size of cq ext unit is greater than {}, {}", CqExtUnit.MAX_EXT_UNIT_SIZE, cqExtUnit);
+                return 1;
+            }
+            if (this.mappedFileQueue.getMaxOffset() + size > MAX_REAL_OFFSET) {
+                log.warn("Capacity of ext is maximum!{}, {}", this.mappedFileQueue.getMaxOffset(), size);
+                return 1;
+            }
+            // unit size maybe change.but, the same most of the time.
+            if (this.tempContainer == null || this.tempContainer.capacity() < size) {
+                this.tempContainer = ByteBuffer.allocate(size);
+            }
+
+            for (int i = 0; i < retryTimes; i++) {
+                MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
+
+                if (mappedFile == null || mappedFile.isFull()) {
+                    mappedFile = this.mappedFileQueue.getLastMappedFile(0);
+                }
+
+                if (mappedFile == null) {
+                    log.error("Create mapped file when save consume queue extend, {}", cqExtUnit);
+                    continue;
+                }
+                final int wrotePosition = mappedFile.getWrotePosition();
+                final int blankSize = this.mappedFileSize - wrotePosition - END_BLANK_DATA_LENGTH;
+
+                // check whether has enough space.
+                if (size > blankSize) {
+                    fullFillToEnd(mappedFile, wrotePosition);
+                    log.info("No enough space(need:{}, has:{}) of file {}, so fill to end",
+                        size, blankSize, mappedFile.getFileName());
+                    continue;
+                }
+
+                if (mappedFile.appendMessage(cqExtUnit.write(this.tempContainer), 0, size)) {
+                    return decorate(wrotePosition + mappedFile.getFileFromOffset());
+                }
+            }
+        } catch (Throwable e) {
+            log.error("Save consume queue extend error, " + cqExtUnit, e);
+        }
+
+        return 1;
+    }
+
+    protected void fullFillToEnd(final MappedFile mappedFile, final int wrotePosition) {
+        ByteBuffer mappedFileBuffer = mappedFile.sliceByteBuffer();
+        mappedFileBuffer.position(wrotePosition);
+
+        // ending.
+        mappedFileBuffer.putShort((short) -1);
+
+        mappedFile.setWrotePosition(this.mappedFileSize);
+    }
+
+    /**
+     * Load data from file when startup.
+     *
+     * @return
+     */
+    public boolean load() {
+        boolean result = this.mappedFileQueue.load();
+        log.info("load consume queue extend" + this.topic + "-" + this.queueId + " " + (result ? "OK" : "Failed"));
+        return result;
+    }
+
+    /**
+     * Check whether the step size in mapped file queue is correct.
+     */
+    public void checkSelf() {
+        this.mappedFileQueue.checkSelf();
+    }
+
+    /**
+     * Recover.
+     */
+    public void recover() {
+        final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
+        if (mappedFiles == null || mappedFiles.isEmpty()) {
+            return;
+        }
+
+        // load all files, consume queue will truncate extend files.
+        int index = 0;
+
+        MappedFile mappedFile = mappedFiles.get(index);
+        ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
+        long processOffset = mappedFile.getFileFromOffset();
+        long mappedFileOffset = 0;
+        CqExtUnit extUnit = new CqExtUnit();
+        while (true) {
+            extUnit.readBySkip(byteBuffer);
+
+            // check whether write sth.
+            if (extUnit.getSize() > 0) {
+                mappedFileOffset += extUnit.getSize();
+                continue;
+            }
+
+            index++;
+            if (index < mappedFiles.size()) {
+                mappedFile = mappedFiles.get(index);
+                byteBuffer = mappedFile.sliceByteBuffer();
+                processOffset = mappedFile.getFileFromOffset();
+                mappedFileOffset = 0;
+                log.info("Recover next consume queue extend file, " + mappedFile.getFileName());
+                continue;
+            }
+
+            log.info("All files of consume queue extend has been recovered over, last mapped file "
+                + mappedFile.getFileName());
+            break;
+        }
+
+        processOffset += mappedFileOffset;
+        this.mappedFileQueue.setFlushedWhere(processOffset);
+        this.mappedFileQueue.setCommittedWhere(processOffset);
+        this.mappedFileQueue.truncateDirtyFiles(processOffset);
+    }
+
+    /**
+     * Delete files before {@code minAddress}.
+     *
+     * @param minAddress less than 0
+     */
+    public void truncateByMinAddress(final long minAddress) {
+        if (!isExtAddr(minAddress)) {
+            return;
+        }
+
+        log.info("Truncate consume queue ext by min {}.", minAddress);
+
+        List<MappedFile> willRemoveFiles = new ArrayList<MappedFile>();
+
+        List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
+        final long realOffset = unDecorate(minAddress);
+
+        for (MappedFile file : mappedFiles) {
+            long fileTailOffset = file.getFileFromOffset() + this.mappedFileSize;
+
+            if (fileTailOffset < realOffset) {
+                log.info("Destroy consume queue ext by min: file={}, fileTailOffset={}, minOffset={}", file.getFileName(),
+                    fileTailOffset, realOffset);
+                if (file.destroy(1000)) {
+                    willRemoveFiles.add(file);
+                }
+            }
+        }
+
+        this.mappedFileQueue.deleteExpiredFile(willRemoveFiles);
+    }
+
+    /**
+     * Delete files after {@code maxAddress}, and reset wrote/commit/flush position to last file.
+     *
+     * @param maxAddress less than 0
+     */
+    public void truncateByMaxAddress(final long maxAddress) {
+        if (!isExtAddr(maxAddress)) {
+            return;
+        }
+
+        log.info("Truncate consume queue ext by max {}.", maxAddress);
+
+        CqExtUnit cqExtUnit = get(maxAddress);
+        if (cqExtUnit == null) {
+            log.error("[BUG] address {} of consume queue extend not found!", maxAddress);
+            return;
+        }
+
+        final long realOffset = unDecorate(maxAddress);
+
+        this.mappedFileQueue.truncateDirtyFiles(realOffset + cqExtUnit.getSize());
+    }
+
+    /**
+     * flush buffer to file.
+     *
+     * @param flushLeastPages
+     * @return
+     */
+    public boolean flush(final int flushLeastPages) {
+        return this.mappedFileQueue.flush(flushLeastPages);
+    }
+
+    /**
+     * delete files and directory.
+     */
+    public void destroy() {
+        this.mappedFileQueue.destroy();
+    }
+
+    /**
+     * Max address(value is less than 0).
+     * <p/>
+     * <p>
+     * Be careful: it's an address just when invoking this method.
+     * </p>
+     *
+     * @return
+     */
+    public long getMaxAddress() {
+        MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
+        if (mappedFile == null) {
+            return decorate(0);
+        }
+        return decorate(mappedFile.getFileFromOffset() + mappedFile.getWrotePosition());
+    }
+
+    /**
+     * Minus address saved in file.
+     *
+     * @return
+     */
+    public long getMinAddress() {
+        MappedFile firstFile = this.mappedFileQueue.getFirstMappedFile();
+        if (firstFile == null) {
+            return decorate(0);
+        }
+        return decorate(firstFile.getFileFromOffset());
+    }
+
+    /**
+     * Store unit.
+     */
+    public static class CqExtUnit {
+        public static final short MIN_EXT_UNIT_SIZE
+            = 2 * 1 // size, 32k max
+            + 8 * 2 // msg time + tagCode
+            + 2; // bitMapSize
+
+        public static final int MAX_EXT_UNIT_SIZE = Short.MAX_VALUE;
+
+        public CqExtUnit() {}
+
+        public CqExtUnit(Long tagsCode, long msgStoreTime, byte[] filterBitMap) {
+            this.tagsCode = tagsCode == null ? 0 : tagsCode;
+            this.msgStoreTime = msgStoreTime;
+            this.filterBitMap = filterBitMap;
+            this.bitMapSize = (short) (filterBitMap == null ? 0 : filterBitMap.length);
+            this.size = (short) (MIN_EXT_UNIT_SIZE + this.bitMapSize);
+        }
+
+        /**
+         * unit size
+         */
+        private short size;
+        /**
+         * has code of tags
+         */
+        private long tagsCode;
+        /**
+         * the time to store into commit log of message
+         */
+        private long msgStoreTime;
+        /**
+         * size of bit map
+         */
+        private short bitMapSize;
+        /**
+         * filter bit map
+         */
+        private byte[] filterBitMap;
+
+        /**
+         * build unit from buffer from current position.
+         *
+         * @param buffer
+         * @return
+         */
+        private boolean read(final ByteBuffer buffer) {
+            if (buffer.position() + 2 > buffer.limit()) {
+                return false;
+            }
+
+            this.size = buffer.getShort();
+
+            if (this.size < 1) {
+                return false;
+            }
+
+            this.tagsCode = buffer.getLong();
+            this.msgStoreTime = buffer.getLong();
+            this.bitMapSize = buffer.getShort();
+
+            if (this.bitMapSize < 1) {
+                return true;
+            }
+
+            if (this.filterBitMap == null || this.filterBitMap.length != this.bitMapSize) {
+                this.filterBitMap = new byte[bitMapSize];
+            }
+
+            buffer.get(this.filterBitMap);
+            return true;
+        }
+
+        /**
+         * Only read first 2 byte to get unit size.
+         * <p>
+         * if size > 0, then skip buffer position with size.
+         * </p>
+         * <p>
+         * if size <= 0, nothing to do.
+         * </p>
+         *
+         * @param buffer
+         */
+        private void readBySkip(final ByteBuffer buffer) {
+            ByteBuffer temp = buffer.slice();
+
+            short tempSize = temp.getShort();
+            this.size = tempSize;
+
+            if (tempSize > 0) {
+                buffer.position(buffer.position() + this.size);
+            }
+        }
+
+        /**
+         * Transform unit data to byte array.
+         * <p/>
+         * <li>1. @{code container} can be null, it will be created if null.</li>
+         * <li>2. if capacity of @{code container} is less than unit size, it will be created also.</li>
+         * <li>3. Pls be sure that size of unit is not greater than {@link #MAX_EXT_UNIT_SIZE}</li>
+         *
+         * @param container
+         * @return
+         */
+        private byte[] write(final ByteBuffer container) {
+            this.bitMapSize = (short) (filterBitMap == null ? 0 : filterBitMap.length);
+            this.size = (short) (MIN_EXT_UNIT_SIZE + this.bitMapSize);
+
+            ByteBuffer temp = container;
+
+            if (temp == null || temp.capacity() < this.size) {
+                temp = ByteBuffer.allocate(this.size);
+            }
+
+            temp.flip();
+            temp.limit(this.size);
+
+            temp.putShort(this.size);
+            temp.putLong(this.tagsCode);
+            temp.putLong(this.msgStoreTime);
+            temp.putShort(this.bitMapSize);
+            if (this.bitMapSize > 0) {
+                temp.put(this.filterBitMap);
+            }
+
+            return temp.array();
+        }
+
+        /**
+         * Calculate unit size by current data.
+         *
+         * @return
+         */
+        private int calcUnitSize() {
+            int sizeTemp = MIN_EXT_UNIT_SIZE + (filterBitMap == null ? 0 : filterBitMap.length);
+            return sizeTemp;
+        }
+
+        public long getTagsCode() {
+            return tagsCode;
+        }
+
+        public void setTagsCode(final long tagsCode) {
+            this.tagsCode = tagsCode;
+        }
+
+        public long getMsgStoreTime() {
+            return msgStoreTime;
+        }
+
+        public void setMsgStoreTime(final long msgStoreTime) {
+            this.msgStoreTime = msgStoreTime;
+        }
+
+        public byte[] getFilterBitMap() {
+            if (this.bitMapSize < 1) {
+                return null;
+            }
+            return filterBitMap;
+        }
+
+        public void setFilterBitMap(final byte[] filterBitMap) {
+            this.filterBitMap = filterBitMap;
+            // not safe transform, but size will be calculate by #calcUnitSize
+            this.bitMapSize = (short) (filterBitMap == null ? 0 : filterBitMap.length);
+        }
+
+        public short getSize() {
+            return size;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (!(o instanceof CqExtUnit)) return false;
+
+            CqExtUnit cqExtUnit = (CqExtUnit) o;
+
+            if (bitMapSize != cqExtUnit.bitMapSize) return false;
+            if (msgStoreTime != cqExtUnit.msgStoreTime) return false;
+            if (size != cqExtUnit.size) return false;
+            if (tagsCode != cqExtUnit.tagsCode) return false;
+            if (!Arrays.equals(filterBitMap, cqExtUnit.filterBitMap)) return false;
+
+            return true;
+        }
+
+        @Override
+        public int hashCode() {
+            int result = (int) size;
+            result = 31 * result + (int) (tagsCode ^ (tagsCode >>> 32));
+            result = 31 * result + (int) (msgStoreTime ^ (msgStoreTime >>> 32));
+            result = 31 * result + (int) bitMapSize;
+            result = 31 * result + (filterBitMap != null ? Arrays.hashCode(filterBitMap) : 0);
+            return result;
+        }
+
+        @Override
+        public String toString() {
+            return "CqExtUnit{" +
+                "size=" + size +
+                ", tagsCode=" + tagsCode +
+                ", msgStoreTime=" + msgStoreTime +
+                ", bitMapSize=" + bitMapSize +
+                ", filterBitMap=" + Arrays.toString(filterBitMap) +
+                '}';
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageFilter.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageFilter.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageFilter.java
index 1350026..9db87f3 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageFilter.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageFilter.java
@@ -18,26 +18,33 @@ package org.apache.rocketmq.store;
 
 import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
 
+import java.nio.ByteBuffer;
+import java.util.Map;
+
 public class DefaultMessageFilter implements MessageFilter {
 
-    @Override
-    public boolean isMessageMatched(SubscriptionData subscriptionData, Long tagsCode) {
-        if (tagsCode == null) {
-            return true;
-        }
+    private SubscriptionData subscriptionData;
 
-        if (null == subscriptionData) {
-            return true;
-        }
+    public DefaultMessageFilter(final SubscriptionData subscriptionData) {
+        this.subscriptionData = subscriptionData;
+    }
 
-        if (subscriptionData.isClassFilterMode())
+    @Override
+    public boolean isMatchedByConsumeQueue(Long tagsCode, ConsumeQueueExt.CqExtUnit cqExtUnit) {
+        if (null == tagsCode || null == subscriptionData) {
             return true;
+        }
 
-        if (subscriptionData.getSubString().equals(SubscriptionData.SUB_ALL)) {
+        if (subscriptionData.isClassFilterMode()) {
             return true;
         }
 
-        return subscriptionData.getCodeSet().contains(tagsCode.intValue());
+        return subscriptionData.getSubString().equals(SubscriptionData.SUB_ALL)
+            || subscriptionData.getCodeSet().contains(tagsCode.intValue());
     }
 
+    @Override
+    public boolean isMatchedByCommitLog(ByteBuffer msgBuffer, Map<String, String> properties) {
+        return true;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index 0edfeec..7bed62c 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -23,6 +23,7 @@ import java.nio.ByteBuffer;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
@@ -41,7 +42,6 @@ import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.message.MessageDecoder;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageExtBatch;
-import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
 import org.apache.rocketmq.common.running.RunningStats;
 import org.apache.rocketmq.common.sysflag.MessageSysFlag;
 import org.apache.rocketmq.store.config.BrokerRole;
@@ -60,8 +60,6 @@ import static org.apache.rocketmq.store.config.BrokerRole.SLAVE;
 public class DefaultMessageStore implements MessageStore {
     private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
 
-    private final MessageFilter messageFilter = new DefaultMessageFilter();
-
     private final MessageStoreConfig messageStoreConfig;
     // CommitLog
     private final CommitLog commitLog;
@@ -103,6 +101,8 @@ public class DefaultMessageStore implements MessageStore {
 
     private AtomicLong printTimes = new AtomicLong(0);
 
+    private final LinkedList<CommitLogDispatcher> dispatcherList;
+
     public DefaultMessageStore(final MessageStoreConfig messageStoreConfig, final BrokerStatsManager brokerStatsManager,
         final MessageArrivingListener messageArrivingListener, final BrokerConfig brokerConfig) throws IOException {
         this.messageArrivingListener = messageArrivingListener;
@@ -133,6 +133,10 @@ public class DefaultMessageStore implements MessageStore {
         this.allocateMappedFileService.start();
 
         this.indexService.start();
+
+        this.dispatcherList = new LinkedList<>();
+        this.dispatcherList.addLast(new CommitLogDispatcherBuildConsumeQueue());
+        this.dispatcherList.addLast(new CommitLogDispatcherBuildIndex());
     }
 
     public void truncateDirtyLogicFiles(long phyOffset) {
@@ -409,7 +413,7 @@ public class DefaultMessageStore implements MessageStore {
     }
 
     public GetMessageResult getMessage(final String group, final String topic, final int queueId, final long offset, final int maxMsgNums,
-        final SubscriptionData subscriptionData) {
+                                       final MessageFilter messageFilter) {
         if (this.shutdown) {
             log.warn("message store has shutdown, so getMessage is forbidden");
             return null;
@@ -464,6 +468,7 @@ public class DefaultMessageStore implements MessageStore {
                         int i = 0;
                         final int maxFilterMessageCount = Math.max(16000, maxMsgNums * ConsumeQueue.CQ_STORE_UNIT_SIZE);
                         final boolean diskFallRecorded = this.messageStoreConfig.isDiskFallRecorded();
+                        ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
                         for (; i < bufferConsumeQueue.getSize() && i < maxFilterMessageCount; i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
                             long offsetPy = bufferConsumeQueue.getByteBuffer().getLong();
                             int sizePy = bufferConsumeQueue.getByteBuffer().getInt();
@@ -483,29 +488,51 @@ public class DefaultMessageStore implements MessageStore {
                                 break;
                             }
 
-                            if (this.messageFilter.isMessageMatched(subscriptionData, tagsCode)) {
-                                SelectMappedBufferResult selectResult = this.commitLog.getMessage(offsetPy, sizePy);
-                                if (selectResult != null) {
-                                    this.storeStatsService.getGetMessageTransferedMsgCount().incrementAndGet();
-                                    getResult.addMessage(selectResult);
-                                    status = GetMessageStatus.FOUND;
-                                    nextPhyFileStartOffset = Long.MIN_VALUE;
+                            boolean extRet = false;
+                            if (consumeQueue.isExtAddr(tagsCode)) {
+                                extRet = consumeQueue.getExt(tagsCode, cqExtUnit);
+                                if (extRet) {
+                                    tagsCode = cqExtUnit.getTagsCode();
                                 } else {
-                                    if (getResult.getBufferTotalSize() == 0) {
-                                        status = GetMessageStatus.MESSAGE_WAS_REMOVING;
-                                    }
-
-                                    nextPhyFileStartOffset = this.commitLog.rollNextFile(offsetPy);
+                                    // can't find ext content.Client will filter messages by tag also.
+                                    log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}, topic={}, group={}",
+                                        tagsCode, offsetPy, sizePy, topic, group);
                                 }
-                            } else {
+                            }
+
+                            if (messageFilter != null
+                                && !messageFilter.isMatchedByConsumeQueue(tagsCode, extRet ? cqExtUnit : null)) {
                                 if (getResult.getBufferTotalSize() == 0) {
                                     status = GetMessageStatus.NO_MATCHED_MESSAGE;
                                 }
 
-                                if (log.isDebugEnabled()) {
-                                    log.debug("message type not matched, client: " + subscriptionData + " server: " + tagsCode);
+                                continue;
+                            }
+
+                            SelectMappedBufferResult selectResult = this.commitLog.getMessage(offsetPy, sizePy);
+                            if (null == selectResult) {
+                                if (getResult.getBufferTotalSize() == 0) {
+                                    status = GetMessageStatus.MESSAGE_WAS_REMOVING;
+                                }
+
+                                nextPhyFileStartOffset = this.commitLog.rollNextFile(offsetPy);
+                                continue;
+                            }
+
+                            if (messageFilter != null
+                                && !messageFilter.isMatchedByCommitLog(selectResult.getByteBuffer().slice(), null)) {
+                                if (getResult.getBufferTotalSize() == 0) {
+                                    status = GetMessageStatus.NO_MATCHED_MESSAGE;
                                 }
+                                // release...
+                                selectResult.release();
+                                continue;
                             }
+
+                            this.storeStatsService.getGetMessageTransferedMsgCount().incrementAndGet();
+                            getResult.addMessage(selectResult);
+                            status = GetMessageStatus.FOUND;
+                            nextPhyFileStartOffset = Long.MIN_VALUE;
                         }
 
                         if (diskFallRecorded) {
@@ -1318,27 +1345,14 @@ public class DefaultMessageStore implements MessageStore {
     }
 
     public void doDispatch(DispatchRequest req) {
-        final int tranType = MessageSysFlag.getTransactionValue(req.getSysFlag());
-        switch (tranType) {
-            case MessageSysFlag.TRANSACTION_NOT_TYPE:
-            case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
-                DefaultMessageStore.this.putMessagePositionInfo(req.getTopic(), req.getQueueId(), req.getCommitLogOffset(), req.getMsgSize(),
-                    req.getTagsCode(), req.getStoreTimestamp(), req.getConsumeQueueOffset());
-                break;
-            case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
-            case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
-                break;
-        }
-
-        if (DefaultMessageStore.this.getMessageStoreConfig().isMessageIndexEnable()) {
-            DefaultMessageStore.this.indexService.buildIndex(req);
+        for (CommitLogDispatcher dispatcher : this.dispatcherList) {
+            dispatcher.dispatch(req);
         }
     }
 
-    public void putMessagePositionInfo(String topic, int queueId, long offset, int size, long tagsCode, long storeTimestamp,
-        long logicOffset) {
-        ConsumeQueue cq = this.findConsumeQueue(topic, queueId);
-        cq.putMessagePositionInfoWrapper(offset, size, tagsCode, storeTimestamp, logicOffset);
+    public void putMessagePositionInfo(DispatchRequest dispatchRequest) {
+        ConsumeQueue cq = this.findConsumeQueue(dispatchRequest.getTopic(), dispatchRequest.getQueueId());
+        cq.putMessagePositionInfoWrapper(dispatchRequest);
     }
 
     public BrokerStatsManager getBrokerStatsManager() {
@@ -1354,6 +1368,20 @@ public class DefaultMessageStore implements MessageStore {
         return remainTransientStoreBufferNumbs() == 0;
     }
 
+    @Override
+    public LinkedList<CommitLogDispatcher> getDispatcherList() {
+        return this.dispatcherList;
+    }
+
+    @Override
+    public ConsumeQueue getConsumeQueue(String topic, int queueId) {
+        ConcurrentHashMap<Integer, ConsumeQueue> map = consumeQueueTable.get(topic);
+        if (map == null) {
+            return null;
+        }
+        return map.get(queueId);
+    }
+
     public void unlockMappedFile(final MappedFile mappedFile) {
         this.scheduledExecutorService.schedule(new Runnable() {
             @Override
@@ -1363,6 +1391,33 @@ public class DefaultMessageStore implements MessageStore {
         }, 6, TimeUnit.SECONDS);
     }
 
+    class CommitLogDispatcherBuildConsumeQueue implements CommitLogDispatcher {
+
+        @Override
+        public void dispatch(DispatchRequest request) {
+            final int tranType = MessageSysFlag.getTransactionValue(request.getSysFlag());
+            switch (tranType) {
+                case MessageSysFlag.TRANSACTION_NOT_TYPE:
+                case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
+                    DefaultMessageStore.this.putMessagePositionInfo(request);
+                    break;
+                case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
+                case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
+                    break;
+            }
+        }
+    }
+
+    class CommitLogDispatcherBuildIndex implements CommitLogDispatcher {
+
+        @Override
+        public void dispatch(DispatchRequest request) {
+            if (DefaultMessageStore.this.messageStoreConfig.isMessageIndexEnable()) {
+                DefaultMessageStore.this.indexService.buildIndex(request);
+            }
+        }
+    }
+
     class CleanCommitLogService {
 
         private final static int MAX_MANUAL_DELETE_FILE_TIMES = 20;
@@ -1695,7 +1750,8 @@ public class DefaultMessageStore implements MessageStore {
                                         && DefaultMessageStore.this.brokerConfig.isLongPollingEnable()) {
                                         DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),
                                             dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,
-                                            dispatchRequest.getTagsCode());
+                                            dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(),
+                                            dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());
                                     }
                                     // FIXED BUG By shijia
                                     this.reputFromOffset += size;



Mime
View raw message