rocketmq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dongefore...@apache.org
Subject [7/7] incubator-rocketmq git commit: [ROCKETMQ-121]Support message filtering based on SQL92 closes apache/incubator-rocketmq#82
Date Fri, 21 Apr 2017 10:19:17 GMT
[ROCKETMQ-121]Support message filtering based on SQL92 closes apache/incubator-rocketmq#82


Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/58f1574b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/58f1574b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/58f1574b

Branch: refs/heads/develop
Commit: 58f1574b28bf8bf18a795036545c7a700437ed0b
Parents: 42f78c2
Author: vsair <liuxuedee@gmail.com>
Authored: Fri Apr 21 18:17:58 2017 +0800
Committer: dongeforever <zhendongliu92@yeah.net>
Committed: Fri Apr 21 18:17:58 2017 +0800

----------------------------------------------------------------------
 broker/pom.xml                                  |    4 +
 .../rocketmq/broker/BrokerController.java       |   27 +
 .../rocketmq/broker/BrokerPathConfigHelper.java |    3 +
 .../broker/client/ConsumerGroupEvent.java       |   33 +
 .../client/ConsumerIdsChangeListener.java       |    6 +-
 .../rocketmq/broker/client/ConsumerManager.java |   11 +-
 .../DefaultConsumerIdsChangeListener.java       |   37 +-
 .../filter/CommitLogDispatcherCalcBitMap.java   |  110 ++
 .../broker/filter/ConsumerFilterData.java       |  151 ++
 .../broker/filter/ConsumerFilterManager.java    |  471 ++++++
 .../filter/ExpressionForRetryMessageFilter.java |   97 ++
 .../broker/filter/ExpressionMessageFilter.java  |  162 +++
 .../broker/filter/MessageEvaluationContext.java |   58 +
 .../NotifyMessageArrivingListener.java          |    8 +-
 .../broker/longpolling/PullRequest.java         |   10 +-
 .../longpolling/PullRequestHoldService.java     |   19 +-
 .../rocketmq/broker/out/BrokerOuterAPI.java     |    2 +-
 .../plugin/AbstractPluginMessageStore.java      |   18 +-
 .../broker/processor/AdminBrokerProcessor.java  |   91 ++
 .../broker/processor/ClientManageProcessor.java |   44 +
 .../broker/processor/PullMessageProcessor.java  |   59 +-
 .../CommitLogDispatcherCalcBitMapTest.java      |  192 +++
 .../filter/ConsumerFilterManagerTest.java       |  291 ++++
 .../filter/MessageStoreWithFilterTest.java      |  392 +++++
 .../processor/PullMessageProcessorTest.java     |    9 +-
 .../client/consumer/DefaultMQPushConsumer.java  |   15 +
 .../client/consumer/MQPushConsumer.java         |   21 +
 .../client/consumer/MessageSelector.java        |   77 +
 .../rocketmq/client/impl/FindBrokerResult.java  |   12 +
 .../rocketmq/client/impl/MQClientAPIImpl.java   |   57 +-
 .../consumer/DefaultMQPushConsumerImpl.java     |   40 +-
 .../client/impl/consumer/PullAPIWrapper.java    |   40 +
 .../client/impl/factory/MQClientInstance.java   |   60 +-
 .../apache/rocketmq/common/BrokerConfig.java    |   67 +
 .../java/org/apache/rocketmq/common/MixAll.java |   14 +-
 .../rocketmq/common/constant/LoggerName.java    |    1 +
 .../rocketmq/common/filter/ExpressionType.java  |   67 +
 .../rocketmq/common/filter/FilterAPI.java       |   18 +
 .../apache/rocketmq/common/message/Message.java |    6 +
 .../rocketmq/common/message/MessageDecoder.java |   39 +
 .../rocketmq/common/namesrv/TopAddressing.java  |    2 +-
 .../rocketmq/common/protocol/RequestCode.java   |    4 +
 .../rocketmq/common/protocol/ResponseCode.java  |    4 +
 .../protocol/body/CheckClientRequestBody.java   |   52 +
 .../common/protocol/body/ConsumeQueueData.java  |   98 ++
 .../body/QueryConsumeQueueResponseBody.java     |   72 +
 .../header/PullMessageRequestHeader.java        |    9 +
 .../header/QueryConsumeQueueRequestHeader.java  |   75 +
 .../protocol/heartbeat/SubscriptionData.java    |   17 +-
 .../rocketmq/common/filter/FilterAPITest.java   |   49 +
 .../common/message/MessageDecoderTest.java      |   80 ++
 distribution/conf/logback_broker.xml            |   28 +
 distribution/release.xml                        |    1 +
 .../rocketmq/example/benchmark/Consumer.java    |   31 +-
 .../rocketmq/example/benchmark/Producer.java    |   34 +-
 .../rocketmq/example/filter/SqlConsumer.java    |   62 +
 .../rocketmq/example/filter/SqlProducer.java    |   67 +
 filter/pom.xml                                  |   43 +
 .../apache/rocketmq/filter/FilterFactory.java   |   72 +
 .../org/apache/rocketmq/filter/FilterSpi.java   |   43 +
 .../org/apache/rocketmq/filter/SqlFilter.java   |   43 +
 .../rocketmq/filter/constant/UnaryType.java     |   26 +
 .../filter/expression/BinaryExpression.java     |   91 ++
 .../filter/expression/BooleanExpression.java    |   39 +
 .../filter/expression/ComparisonExpression.java |  413 ++++++
 .../filter/expression/ConstantExpression.java   |  156 ++
 .../expression/EmptyEvaluationContext.java      |   35 +
 .../filter/expression/EvaluationContext.java    |   43 +
 .../rocketmq/filter/expression/Expression.java  |   38 +
 .../filter/expression/LogicExpression.java      |   94 ++
 .../filter/expression/MQFilterException.java    |   46 +
 .../filter/expression/NowExpression.java        |   36 +
 .../filter/expression/PropertyExpression.java   |   70 +
 .../filter/expression/UnaryExpression.java      |  267 ++++
 .../filter/expression/UnaryInExpression.java    |   61 +
 .../rocketmq/filter/parser/ParseException.java  |  204 +++
 .../rocketmq/filter/parser/SelectorParser.java  | 1354 ++++++++++++++++++
 .../rocketmq/filter/parser/SelectorParser.jj    |  524 +++++++
 .../filter/parser/SelectorParserConstants.java  |  140 ++
 .../parser/SelectorParserTokenManager.java      |  919 ++++++++++++
 .../filter/parser/SimpleCharStream.java         |  502 +++++++
 .../apache/rocketmq/filter/parser/Token.java    |  152 ++
 .../rocketmq/filter/parser/TokenMgrError.java   |  174 +++
 .../apache/rocketmq/filter/util/BitsArray.java  |  260 ++++
 .../rocketmq/filter/util/BloomFilter.java       |  338 +++++
 .../rocketmq/filter/util/BloomFilterData.java   |   83 ++
 .../apache/rocketmq/filter/BitsArrayTest.java   |  123 ++
 .../apache/rocketmq/filter/BloomFilterTest.java |  172 +++
 .../apache/rocketmq/filter/ExpressionTest.java  |  594 ++++++++
 .../apache/rocketmq/filter/FilterSpiTest.java   |   84 ++
 .../org/apache/rocketmq/filter/ParserTest.java  |  129 ++
 pom.xml                                         |   11 +
 srvutil/pom.xml                                 |    4 +
 .../org/apache/rocketmq/store/CommitLog.java    |    8 +-
 .../rocketmq/store/CommitLogDispatcher.java     |   26 +
 .../org/apache/rocketmq/store/ConsumeQueue.java |  122 +-
 .../apache/rocketmq/store/ConsumeQueueExt.java  |  638 +++++++++
 .../rocketmq/store/DefaultMessageFilter.java    |   29 +-
 .../rocketmq/store/DefaultMessageStore.java     |  132 +-
 .../apache/rocketmq/store/DispatchRequest.java  |   21 +-
 .../org/apache/rocketmq/store/MappedFile.java   |   25 +
 .../apache/rocketmq/store/MappedFileQueue.java  |    2 +-
 .../rocketmq/store/MessageArrivingListener.java |    5 +-
 .../apache/rocketmq/store/MessageFilter.java    |   26 +-
 .../org/apache/rocketmq/store/MessageStore.java |    8 +-
 .../store/config/MessageStoreConfig.java        |   31 +
 .../store/config/StorePathConfigHelper.java     |    4 +
 .../store/schedule/ScheduleMessageService.java  |   14 +
 .../rocketmq/store/ConsumeQueueExtTest.java     |  251 ++++
 .../apache/rocketmq/store/ConsumeQueueTest.java |  226 +++
 .../rocketmq/store/DefaultMessageStoreTest.java |    4 +-
 .../rocketmq/tools/admin/DefaultMQAdminExt.java |    9 +
 .../tools/admin/DefaultMQAdminExtImpl.java      |    8 +
 .../apache/rocketmq/tools/admin/MQAdminExt.java |   22 +
 .../rocketmq/tools/command/MQAdminStartup.java  |    3 +
 .../command/queue/QueryConsumeQueueCommand.java |  159 ++
 116 files changed, 12552 insertions(+), 128 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/broker/pom.xml
----------------------------------------------------------------------
diff --git a/broker/pom.xml b/broker/pom.xml
index 8cdafea..0f8ad0a 100644
--- a/broker/pom.xml
+++ b/broker/pom.xml
@@ -49,6 +49,10 @@
             <artifactId>rocketmq-srvutil</artifactId>
         </dependency>
         <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>rocketmq-filter</artifactId>
+        </dependency>
+        <dependency>
             <groupId>ch.qos.logback</groupId>
             <artifactId>logback-classic</artifactId>
         </dependency>

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index 6acd40c..bacd25c 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -37,6 +37,8 @@ import org.apache.rocketmq.broker.client.DefaultConsumerIdsChangeListener;
 import org.apache.rocketmq.broker.client.ProducerManager;
 import org.apache.rocketmq.broker.client.net.Broker2Client;
 import org.apache.rocketmq.broker.client.rebalance.RebalanceLockManager;
+import org.apache.rocketmq.broker.filter.CommitLogDispatcherCalcBitMap;
+import org.apache.rocketmq.broker.filter.ConsumerFilterManager;
 import org.apache.rocketmq.broker.filtersrv.FilterServerManager;
 import org.apache.rocketmq.broker.latency.BrokerFastFailure;
 import org.apache.rocketmq.broker.latency.BrokerFixedThreadPoolExecutor;
@@ -96,6 +98,7 @@ public class BrokerController {
     private final MessageStoreConfig messageStoreConfig;
     private final ConsumerOffsetManager consumerOffsetManager;
     private final ConsumerManager consumerManager;
+    private final ConsumerFilterManager consumerFilterManager;
     private final ProducerManager producerManager;
     private final ClientHousekeepingService clientHousekeepingService;
     private final PullMessageProcessor pullMessageProcessor;
@@ -149,6 +152,7 @@ public class BrokerController {
         this.messageArrivingListener = new NotifyMessageArrivingListener(this.pullRequestHoldService);
         this.consumerIdsChangeListener = new DefaultConsumerIdsChangeListener(this);
         this.consumerManager = new ConsumerManager(this.consumerIdsChangeListener);
+        this.consumerFilterManager = new ConsumerFilterManager(this);
         this.producerManager = new ProducerManager();
         this.clientHousekeepingService = new ClientHousekeepingService(this);
         this.broker2Client = new Broker2Client(this);
@@ -192,6 +196,7 @@ public class BrokerController {
 
         result = result && this.consumerOffsetManager.load();
         result = result && this.subscriptionGroupManager.load();
+        result = result && this.consumerFilterManager.load();
 
         if (result) {
             try {
@@ -202,6 +207,7 @@ public class BrokerController {
                 //load plugin
                 MessageStorePluginContext context = new MessageStorePluginContext(messageStoreConfig, brokerStatsManager, messageArrivingListener, brokerConfig);
                 this.messageStore = MessageStoreFactory.build(context, this.messageStore);
+                this.messageStore.getDispatcherList().addFirst(new CommitLogDispatcherCalcBitMap(this.brokerConfig, this.consumerFilterManager));
             } catch (IOException e) {
                 result = false;
                 e.printStackTrace();
@@ -278,6 +284,17 @@ public class BrokerController {
                 @Override
                 public void run() {
                     try {
+                        BrokerController.this.consumerFilterManager.persist();
+                    } catch (Throwable e) {
+                        log.error("schedule persist consumer filter error.", e);
+                    }
+                }
+            }, 1000 * 10, 1000 * 10, TimeUnit.MILLISECONDS);
+
+            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+                @Override
+                public void run() {
+                    try {
                         BrokerController.this.protectBroker();
                     } catch (Exception e) {
                         log.error("protectBroker error.", e);
@@ -400,9 +417,11 @@ public class BrokerController {
         ClientManageProcessor clientProcessor = new ClientManageProcessor(this);
         this.remotingServer.registerProcessor(RequestCode.HEART_BEAT, clientProcessor, this.clientManageExecutor);
         this.remotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientProcessor, this.clientManageExecutor);
+        this.remotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientProcessor, this.clientManageExecutor);
 
         this.fastRemotingServer.registerProcessor(RequestCode.HEART_BEAT, clientProcessor, this.clientManageExecutor);
         this.fastRemotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientProcessor, this.clientManageExecutor);
+        this.remotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientProcessor, this.clientManageExecutor);
 
         /**
          * ConsumerManageProcessor
@@ -504,6 +523,10 @@ public class BrokerController {
         return consumerManager;
     }
 
+    public ConsumerFilterManager getConsumerFilterManager() {
+        return consumerFilterManager;
+    }
+
     public ConsumerOffsetManager getConsumerOffsetManager() {
         return consumerOffsetManager;
     }
@@ -590,6 +613,10 @@ public class BrokerController {
         if (this.brokerFastFailure != null) {
             this.brokerFastFailure.shutdown();
         }
+
+        if (this.consumerFilterManager != null) {
+            this.consumerFilterManager.persist();
+        }
     }
 
     private void unregisterBrokerAll() {

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/broker/src/main/java/org/apache/rocketmq/broker/BrokerPathConfigHelper.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerPathConfigHelper.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerPathConfigHelper.java
index 24876df..0a323ee 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerPathConfigHelper.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerPathConfigHelper.java
@@ -44,4 +44,7 @@ public class BrokerPathConfigHelper {
         return rootDir + File.separator + "config" + File.separator + "subscriptionGroup.json";
     }
 
+    public static String getConsumerFilterPath(final String rootDir) {
+        return rootDir + File.separator + "config" + File.separator + "consumerFilter.json";
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerGroupEvent.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerGroupEvent.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerGroupEvent.java
new file mode 100644
index 0000000..717fb70
--- /dev/null
+++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerGroupEvent.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.broker.client;
+
+public enum ConsumerGroupEvent {
+
+    /**
+     * Some consumers in the group are changed.
+     */
+    CHANGE,
+    /**
+     * The group of consumer is unregistered.
+     */
+    UNREGISTER,
+    /**
+     * The group of consumer is registered.
+     */
+    REGISTER
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerIdsChangeListener.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerIdsChangeListener.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerIdsChangeListener.java
index 07d28dc..831e293 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerIdsChangeListener.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerIdsChangeListener.java
@@ -16,9 +16,7 @@
  */
 package org.apache.rocketmq.broker.client;
 
-import io.netty.channel.Channel;
-import java.util.List;
-
 public interface ConsumerIdsChangeListener {
-    void consumerIdsChanged(final String group, final List<Channel> channels);
+
+    void handle(ConsumerGroupEvent event, String group, Object... args);
 }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java
index a2d88d5..a5ddec8 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java
@@ -85,10 +85,11 @@ public class ConsumerManager {
                     if (remove != null) {
                         log.info("unregister consumer ok, no any connection, and remove consumer group, {}",
                             next.getKey());
+                        this.consumerIdsChangeListener.handle(ConsumerGroupEvent.UNREGISTER, next.getKey());
                     }
                 }
 
-                this.consumerIdsChangeListener.consumerIdsChanged(next.getKey(), info.getAllChannel());
+                this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, next.getKey(), info.getAllChannel());
             }
         }
     }
@@ -111,10 +112,12 @@ public class ConsumerManager {
 
         if (r1 || r2) {
             if (isNotifyConsumerIdsChangedEnable) {
-                this.consumerIdsChangeListener.consumerIdsChanged(group, consumerGroupInfo.getAllChannel());
+                this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel());
             }
         }
 
+        this.consumerIdsChangeListener.handle(ConsumerGroupEvent.REGISTER, group, subList);
+
         return r1 || r2;
     }
 
@@ -126,10 +129,12 @@ public class ConsumerManager {
                 ConsumerGroupInfo remove = this.consumerTable.remove(group);
                 if (remove != null) {
                     log.info("unregister consumer ok, no any connection, and remove consumer group, {}", group);
+
+                    this.consumerIdsChangeListener.handle(ConsumerGroupEvent.UNREGISTER, group);
                 }
             }
             if (isNotifyConsumerIdsChangedEnable) {
-                this.consumerIdsChangeListener.consumerIdsChanged(group, consumerGroupInfo.getAllChannel());
+                this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel());
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/broker/src/main/java/org/apache/rocketmq/broker/client/DefaultConsumerIdsChangeListener.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/DefaultConsumerIdsChangeListener.java b/broker/src/main/java/org/apache/rocketmq/broker/client/DefaultConsumerIdsChangeListener.java
index a1b2d8a..d716a33 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/client/DefaultConsumerIdsChangeListener.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/client/DefaultConsumerIdsChangeListener.java
@@ -17,8 +17,12 @@
 package org.apache.rocketmq.broker.client;
 
 import io.netty.channel.Channel;
+
+import java.util.Collection;
 import java.util.List;
+
 import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
 
 public class DefaultConsumerIdsChangeListener implements ConsumerIdsChangeListener {
     private final BrokerController brokerController;
@@ -28,11 +32,34 @@ public class DefaultConsumerIdsChangeListener implements ConsumerIdsChangeListen
     }
 
     @Override
-    public void consumerIdsChanged(String group, List<Channel> channels) {
-        if (channels != null && brokerController.getBrokerConfig().isNotifyConsumerIdsChangedEnable()) {
-            for (Channel chl : channels) {
-                this.brokerController.getBroker2Client().notifyConsumerIdsChanged(chl, group);
-            }
+    public void handle(ConsumerGroupEvent event, String group, Object... args) {
+        if (event == null) {
+            return;
+        }
+        switch (event) {
+            case CHANGE:
+                if (args == null || args.length < 1) {
+                    return;
+                }
+                List<Channel> channels = (List<Channel>) args[0];
+                if (channels != null && brokerController.getBrokerConfig().isNotifyConsumerIdsChangedEnable()) {
+                    for (Channel chl : channels) {
+                        this.brokerController.getBroker2Client().notifyConsumerIdsChanged(chl, group);
+                    }
+                }
+                break;
+            case UNREGISTER:
+                this.brokerController.getConsumerFilterManager().unRegister(group);
+                break;
+            case REGISTER:
+                if (args == null || args.length < 1) {
+                    return;
+                }
+                Collection<SubscriptionData> subscriptionDataList = (Collection<SubscriptionData>) args[0];
+                this.brokerController.getConsumerFilterManager().register(group, subscriptionDataList);
+                break;
+            default:
+                throw new RuntimeException("Unknown event " + event);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/broker/src/main/java/org/apache/rocketmq/broker/filter/CommitLogDispatcherCalcBitMap.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/filter/CommitLogDispatcherCalcBitMap.java b/broker/src/main/java/org/apache/rocketmq/broker/filter/CommitLogDispatcherCalcBitMap.java
new file mode 100644
index 0000000..85415d6
--- /dev/null
+++ b/broker/src/main/java/org/apache/rocketmq/broker/filter/CommitLogDispatcherCalcBitMap.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.broker.filter;
+
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.filter.util.BitsArray;
+import org.apache.rocketmq.store.CommitLogDispatcher;
+import org.apache.rocketmq.store.DispatchRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.Iterator;
+
+/**
+ * Calculate bit map of filter.
+ */
+public class CommitLogDispatcherCalcBitMap implements CommitLogDispatcher {
+
+    private static final Logger log = LoggerFactory.getLogger(LoggerName.FILTER_LOGGER_NAME);
+
+    protected final BrokerConfig brokerConfig;
+    protected final ConsumerFilterManager consumerFilterManager;
+
+    public CommitLogDispatcherCalcBitMap(BrokerConfig brokerConfig, ConsumerFilterManager consumerFilterManager) {
+        this.brokerConfig = brokerConfig;
+        this.consumerFilterManager = consumerFilterManager;
+    }
+
+    @Override
+    public void dispatch(DispatchRequest request) {
+        if (!this.brokerConfig.isEnableCalcFilterBitMap()) {
+            return;
+        }
+
+        try {
+
+            Collection<ConsumerFilterData> filterDatas = consumerFilterManager.get(request.getTopic());
+
+            if (filterDatas == null || filterDatas.isEmpty()) {
+                return;
+            }
+
+            Iterator<ConsumerFilterData> iterator = filterDatas.iterator();
+            BitsArray filterBitMap = BitsArray.create(
+                this.consumerFilterManager.getBloomFilter().getM()
+            );
+
+            long startTime = System.currentTimeMillis();
+            while (iterator.hasNext()) {
+                ConsumerFilterData filterData = iterator.next();
+
+                if (filterData.getCompiledExpression() == null) {
+                    log.error("[BUG] Consumer in filter manager has no compiled expression! {}", filterData);
+                    continue;
+                }
+
+                if (filterData.getBloomFilterData() == null) {
+                    log.error("[BUG] Consumer in filter manager has no bloom data! {}", filterData);
+                    continue;
+                }
+
+                Object ret = null;
+                try {
+                    MessageEvaluationContext context = new MessageEvaluationContext(request.getPropertiesMap());
+
+                    ret = filterData.getCompiledExpression().evaluate(context);
+                } catch (Throwable e) {
+                    log.error("Calc filter bit map error!commitLogOffset={}, consumer={}, {}", request.getCommitLogOffset(), filterData, e);
+                }
+
+                log.debug("Result of Calc bit map:ret={}, data={}, props={}, offset={}", ret, filterData, request.getPropertiesMap(), request.getCommitLogOffset());
+
+                // eval true
+                if (ret != null && ret instanceof Boolean && (Boolean) ret) {
+                    consumerFilterManager.getBloomFilter().hashTo(
+                        filterData.getBloomFilterData(),
+                        filterBitMap
+                    );
+                }
+            }
+
+            request.setBitMap(filterBitMap.bytes());
+
+            long eclipseTime = System.currentTimeMillis() - startTime;
+            // 1ms
+            if (eclipseTime >= 1) {
+                log.warn("Spend {} ms to calc bit map, consumerNum={}, topic={}", eclipseTime, filterDatas.size(), request.getTopic());
+            }
+        } catch (Throwable e) {
+            log.error("Calc bit map error! topic={}, offset={}, queueId={}, {}", request.getTopic(), request.getCommitLogOffset(), request.getQueueId(), e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/broker/src/main/java/org/apache/rocketmq/broker/filter/ConsumerFilterData.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/filter/ConsumerFilterData.java b/broker/src/main/java/org/apache/rocketmq/broker/filter/ConsumerFilterData.java
new file mode 100644
index 0000000..4db02e2
--- /dev/null
+++ b/broker/src/main/java/org/apache/rocketmq/broker/filter/ConsumerFilterData.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.broker.filter;
+
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.commons.lang3.builder.ReflectionToStringBuilder;
+import org.apache.commons.lang3.builder.ToStringStyle;
+import org.apache.rocketmq.filter.expression.Expression;
+import org.apache.rocketmq.filter.util.BloomFilterData;
+
+import java.util.Collections;
+
+/**
+ * Filter data of consumer.
+ */
+public class ConsumerFilterData {
+
+    private String consumerGroup;
+    private String topic;
+    private String expression;
+    private String expressionType;
+    private transient Expression compiledExpression;
+    private long bornTime;
+    private long deadTime = 0;
+    private BloomFilterData bloomFilterData;
+    private long clientVersion;
+
+    public boolean isDead() {
+        return this.deadTime >= this.bornTime;
+    }
+
+    public long howLongAfterDeath() {
+        if (isDead()) {
+            return System.currentTimeMillis() - getDeadTime();
+        }
+        return -1;
+    }
+
+    /**
+     * Check this filter data has been used to calculate bit map when msg was stored in server.
+     *
+     * @param msgStoreTime
+     * @return
+     */
+    public boolean isMsgInLive(long msgStoreTime) {
+        return msgStoreTime > getBornTime();
+    }
+
+    public String getConsumerGroup() {
+        return consumerGroup;
+    }
+
+    public void setConsumerGroup(final String consumerGroup) {
+        this.consumerGroup = consumerGroup;
+    }
+
+    public String getTopic() {
+        return topic;
+    }
+
+    public void setTopic(final String topic) {
+        this.topic = topic;
+    }
+
+    public String getExpression() {
+        return expression;
+    }
+
+    public void setExpression(final String expression) {
+        this.expression = expression;
+    }
+
+    public String getExpressionType() {
+        return expressionType;
+    }
+
+    public void setExpressionType(final String expressionType) {
+        this.expressionType = expressionType;
+    }
+
+    public Expression getCompiledExpression() {
+        return compiledExpression;
+    }
+
+    public void setCompiledExpression(final Expression compiledExpression) {
+        this.compiledExpression = compiledExpression;
+    }
+
+    public long getBornTime() {
+        return bornTime;
+    }
+
+    public void setBornTime(final long bornTime) {
+        this.bornTime = bornTime;
+    }
+
+    public long getDeadTime() {
+        return deadTime;
+    }
+
+    public void setDeadTime(final long deadTime) {
+        this.deadTime = deadTime;
+    }
+
+    public BloomFilterData getBloomFilterData() {
+        return bloomFilterData;
+    }
+
+    public void setBloomFilterData(final BloomFilterData bloomFilterData) {
+        this.bloomFilterData = bloomFilterData;
+    }
+
+    public long getClientVersion() {
+        return clientVersion;
+    }
+
+    public void setClientVersion(long clientVersion) {
+        this.clientVersion = clientVersion;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        return EqualsBuilder.reflectionEquals(this, o, Collections.<String>emptyList());
+    }
+
+    @Override
+    public int hashCode() {
+        return HashCodeBuilder.reflectionHashCode(this, Collections.<String>emptyList());
+    }
+
+    @Override
+    public String toString() {
+        return ReflectionToStringBuilder.toString(this, ToStringStyle.SHORT_PREFIX_STYLE);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/broker/src/main/java/org/apache/rocketmq/broker/filter/ConsumerFilterManager.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/filter/ConsumerFilterManager.java b/broker/src/main/java/org/apache/rocketmq/broker/filter/ConsumerFilterManager.java
new file mode 100644
index 0000000..7f790af
--- /dev/null
+++ b/broker/src/main/java/org/apache/rocketmq/broker/filter/ConsumerFilterManager.java
@@ -0,0 +1,471 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.broker.filter;
+
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.BrokerPathConfigHelper;
+import org.apache.rocketmq.common.ConfigManager;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
+import org.apache.rocketmq.filter.FilterFactory;
+import org.apache.rocketmq.common.filter.ExpressionType;
+import org.apache.rocketmq.filter.util.BloomFilter;
+import org.apache.rocketmq.filter.util.BloomFilterData;
+import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Consumer filter data manager.Just manage the consumers use expression filter.
+ */
+public class ConsumerFilterManager extends ConfigManager {
+
+    private static final Logger log = LoggerFactory.getLogger(LoggerName.FILTER_LOGGER_NAME);
+
+    private static final long MS_24_HOUR = 24 * 3600 * 1000;
+
+    private ConcurrentHashMap<String/*Topic*/, FilterDataMapByTopic>
+        filterDataByTopic = new ConcurrentHashMap<String/*consumer group*/, FilterDataMapByTopic>(256);
+
+    private transient BrokerController brokerController;
+    private transient BloomFilter bloomFilter;
+
+    public ConsumerFilterManager() {
+        // just for test
+        this.bloomFilter = BloomFilter.createByFn(20, 64);
+    }
+
+    public ConsumerFilterManager(BrokerController brokerController) {
+        this.brokerController = brokerController;
+        this.bloomFilter = BloomFilter.createByFn(
+            brokerController.getBrokerConfig().getMaxErrorRateOfBloomFilter(),
+            brokerController.getBrokerConfig().getExpectConsumerNumUseFilter()
+        );
+        // then set bit map length of store config.
+        brokerController.getMessageStoreConfig().setBitMapLengthConsumeQueueExt(
+            this.bloomFilter.getM()
+        );
+    }
+
+    /**
+     * Build consumer filter data.Be care, bloom filter data is not included.
+     *
+     * @param topic
+     * @param consumerGroup
+     * @param expression
+     * @param type
+     * @param clientVersion
+     * @return maybe null
+     */
+    public static ConsumerFilterData build(final String topic, final String consumerGroup,
+                                           final String expression, final String type,
+                                           final long clientVersion) {
+        if (ExpressionType.isTagType(type)) {
+            return null;
+        }
+
+        ConsumerFilterData consumerFilterData = new ConsumerFilterData();
+        consumerFilterData.setTopic(topic);
+        consumerFilterData.setConsumerGroup(consumerGroup);
+        consumerFilterData.setBornTime(System.currentTimeMillis());
+        consumerFilterData.setDeadTime(0);
+        consumerFilterData.setExpression(expression);
+        consumerFilterData.setExpressionType(type);
+        consumerFilterData.setClientVersion(clientVersion);
+        try {
+            consumerFilterData.setCompiledExpression(
+                FilterFactory.INSTANCE.get(type).compile(expression)
+            );
+        } catch (Throwable e) {
+            log.error("parse error: expr={}, topic={}, group={}, error={}", expression, topic, consumerGroup, e.getMessage());
+            return null;
+        }
+
+        return consumerFilterData;
+    }
+
+    public void register(final String consumerGroup, final Collection<SubscriptionData> subList) {
+        for (SubscriptionData subscriptionData : subList) {
+            register(
+                subscriptionData.getTopic(),
+                consumerGroup,
+                subscriptionData.getSubString(),
+                subscriptionData.getExpressionType(),
+                subscriptionData.getSubVersion()
+            );
+        }
+
+        // make illegal topic dead.
+        Collection<ConsumerFilterData> groupFilterData = getByGroup(consumerGroup);
+
+        Iterator<ConsumerFilterData> iterator = groupFilterData.iterator();
+        while (iterator.hasNext()) {
+            ConsumerFilterData filterData = iterator.next();
+
+            boolean exist = false;
+            for (SubscriptionData subscriptionData : subList) {
+                if (subscriptionData.getTopic().equals(filterData.getTopic())) {
+                    exist = true;
+                    break;
+                }
+            }
+
+            if (!exist && !filterData.isDead()) {
+                filterData.setDeadTime(System.currentTimeMillis());
+                log.info("Consumer filter changed: {}, make illegal topic dead:{}", consumerGroup, filterData);
+            }
+        }
+    }
+
+    public boolean register(final String topic, final String consumerGroup, final String expression,
+                            final String type, final long clientVersion) {
+        if (ExpressionType.isTagType(type)) {
+            return false;
+        }
+
+        if (expression == null || expression.length() == 0) {
+            return false;
+        }
+
+        FilterDataMapByTopic filterDataMapByTopic = this.filterDataByTopic.get(topic);
+
+        if (filterDataMapByTopic == null) {
+            FilterDataMapByTopic temp = new FilterDataMapByTopic(topic);
+            FilterDataMapByTopic prev = this.filterDataByTopic.putIfAbsent(topic, temp);
+            filterDataMapByTopic = prev != null ? prev : temp;
+        }
+
+        BloomFilterData bloomFilterData = bloomFilter.generate(consumerGroup + "#" + topic);
+
+        return filterDataMapByTopic.register(consumerGroup, expression, type, bloomFilterData, clientVersion);
+    }
+
+    public void unRegister(final String consumerGroup) {
+        for (String topic : filterDataByTopic.keySet()) {
+            this.filterDataByTopic.get(topic).unRegister(consumerGroup);
+        }
+    }
+
+    public ConsumerFilterData get(final String topic, final String consumerGroup) {
+        if (!this.filterDataByTopic.containsKey(topic)) {
+            return null;
+        }
+        if (this.filterDataByTopic.get(topic).getGroupFilterData().isEmpty()) {
+            return null;
+        }
+
+        return this.filterDataByTopic.get(topic).getGroupFilterData().get(consumerGroup);
+    }
+
+    public Collection<ConsumerFilterData> getByGroup(final String consumerGroup) {
+        Collection<ConsumerFilterData> ret = new HashSet<ConsumerFilterData>();
+
+        Iterator<FilterDataMapByTopic> topicIterator = this.filterDataByTopic.values().iterator();
+        while (topicIterator.hasNext()) {
+            FilterDataMapByTopic filterDataMapByTopic = topicIterator.next();
+
+            Iterator<ConsumerFilterData> filterDataIterator = filterDataMapByTopic.getGroupFilterData().values().iterator();
+
+            while (filterDataIterator.hasNext()) {
+                ConsumerFilterData filterData = filterDataIterator.next();
+
+                if (filterData.getConsumerGroup().equals(consumerGroup)) {
+                    ret.add(filterData);
+                }
+            }
+        }
+
+        return ret;
+    }
+
+    public final Collection<ConsumerFilterData> get(final String topic) {
+        if (!this.filterDataByTopic.containsKey(topic)) {
+            return null;
+        }
+        if (this.filterDataByTopic.get(topic).getGroupFilterData().isEmpty()) {
+            return null;
+        }
+
+        return this.filterDataByTopic.get(topic).getGroupFilterData().values();
+    }
+
+    public BloomFilter getBloomFilter() {
+        return bloomFilter;
+    }
+
+    @Override
+    public String encode() {
+        return encode(false);
+    }
+
+    @Override
+    public String configFilePath() {
+        if (this.brokerController != null) {
+            return BrokerPathConfigHelper.getConsumerFilterPath(
+                this.brokerController.getMessageStoreConfig().getStorePathRootDir()
+            );
+        }
+        return BrokerPathConfigHelper.getConsumerFilterPath("./unit_test");
+    }
+
+    @Override
+    public void decode(final String jsonString) {
+        ConsumerFilterManager load = RemotingSerializable.fromJson(jsonString, ConsumerFilterManager.class);
+        if (load != null && load.filterDataByTopic != null) {
+            boolean bloomChanged = false;
+            for (String topic : load.filterDataByTopic.keySet()) {
+                FilterDataMapByTopic dataMapByTopic = load.filterDataByTopic.get(topic);
+                if (dataMapByTopic == null) {
+                    continue;
+                }
+
+                for (String group : dataMapByTopic.getGroupFilterData().keySet()) {
+
+                    ConsumerFilterData filterData = dataMapByTopic.getGroupFilterData().get(group);
+
+                    if (filterData == null) {
+                        continue;
+                    }
+
+                    try {
+                        filterData.setCompiledExpression(
+                            FilterFactory.INSTANCE.get(filterData.getExpressionType()).compile(filterData.getExpression())
+                        );
+                    } catch (Exception e) {
+                        log.error("load filter data error, " + filterData, e);
+                    }
+
+                    // check whether bloom filter is changed
+                    // if changed, ignore the bit map calculated before.
+                    if (!this.bloomFilter.isValid(filterData.getBloomFilterData())) {
+                        bloomChanged = true;
+                        log.info("Bloom filter is changed!So ignore all filter data persisted! {}, {}", this.bloomFilter, filterData.getBloomFilterData());
+                        break;
+                    }
+
+                    log.info("load exist consumer filter data: {}", filterData);
+
+                    if (filterData.getDeadTime() == 0) {
+                        // we think all consumers are dead when load
+                        long deadTime = System.currentTimeMillis() - 30 * 1000;
+                        filterData.setDeadTime(
+                            deadTime <= filterData.getBornTime() ? filterData.getBornTime() : deadTime
+                        );
+                    }
+                }
+            }
+
+            if (!bloomChanged) {
+                this.filterDataByTopic = load.filterDataByTopic;
+            }
+        }
+    }
+
+    @Override
+    public String encode(final boolean prettyFormat) {
+        // clean
+        {
+            clean();
+        }
+        return RemotingSerializable.toJson(this, prettyFormat);
+    }
+
+    public void clean() {
+        Iterator<Map.Entry<String, FilterDataMapByTopic>> topicIterator = this.filterDataByTopic.entrySet().iterator();
+        while (topicIterator.hasNext()) {
+            Map.Entry<String, FilterDataMapByTopic> filterDataMapByTopic = topicIterator.next();
+
+            Iterator<Map.Entry<String, ConsumerFilterData>> filterDataIterator
+                = filterDataMapByTopic.getValue().getGroupFilterData().entrySet().iterator();
+
+            while (filterDataIterator.hasNext()) {
+                Map.Entry<String, ConsumerFilterData> filterDataByGroup = filterDataIterator.next();
+
+                ConsumerFilterData filterData = filterDataByGroup.getValue();
+                if (filterData.howLongAfterDeath() >= (this.brokerController == null ? MS_24_HOUR : this.brokerController.getBrokerConfig().getFilterDataCleanTimeSpan())) {
+                    log.info("Remove filter consumer {}, died too long!", filterDataByGroup.getValue());
+                    filterDataIterator.remove();
+                }
+            }
+
+            if (filterDataMapByTopic.getValue().getGroupFilterData().isEmpty()) {
+                log.info("Topic has no consumer, remove it! {}", filterDataMapByTopic.getKey());
+                topicIterator.remove();
+            }
+        }
+    }
+
+    public ConcurrentHashMap<String, FilterDataMapByTopic> getFilterDataByTopic() {
+        return filterDataByTopic;
+    }
+
+    public void setFilterDataByTopic(final ConcurrentHashMap<String, FilterDataMapByTopic> filterDataByTopic) {
+        this.filterDataByTopic = filterDataByTopic;
+    }
+
+    public static class FilterDataMapByTopic {
+
+        private ConcurrentHashMap<String/*consumer group*/, ConsumerFilterData>
+            groupFilterData = new ConcurrentHashMap<String, ConsumerFilterData>();
+
+        private String topic;
+
+        public FilterDataMapByTopic() {
+        }
+
+        public FilterDataMapByTopic(String topic) {
+            this.topic = topic;
+        }
+
+        public void unRegister(String consumerGroup) {
+            if (!this.groupFilterData.containsKey(consumerGroup)) {
+                return;
+            }
+
+            ConsumerFilterData data = this.groupFilterData.get(consumerGroup);
+
+            if (data == null || data.isDead()) {
+                return;
+            }
+
+            long now = System.currentTimeMillis();
+
+            log.info("Unregister consumer filter: {}, deadTime: {}", data, now);
+
+            data.setDeadTime(now);
+        }
+
+        public boolean register(String consumerGroup, String expression, String type, BloomFilterData bloomFilterData, long clientVersion) {
+            ConsumerFilterData old = this.groupFilterData.get(consumerGroup);
+
+            if (old == null) {
+                ConsumerFilterData consumerFilterData = build(topic, consumerGroup, expression, type, clientVersion);
+                if (consumerFilterData == null) {
+                    return false;
+                }
+                consumerFilterData.setBloomFilterData(bloomFilterData);
+
+                old = this.groupFilterData.putIfAbsent(consumerGroup, consumerFilterData);
+                if (old == null) {
+                    log.info("New consumer filter registered: {}", consumerFilterData);
+                    return true;
+                } else {
+                    if (clientVersion <= old.getClientVersion()) {
+                        if (!type.equals(old.getExpressionType()) || !expression.equals(old.getExpression())) {
+                            log.warn("Ignore consumer({} : {}) filter(concurrent), because of version {} <= {}, but maybe info changed!old={}:{}, ignored={}:{}",
+                                consumerGroup, topic,
+                                clientVersion, old.getClientVersion(),
+                                old.getExpressionType(), old.getExpression(),
+                                type, expression);
+                        }
+                        if (clientVersion == old.getClientVersion() && old.isDead()) {
+                            reAlive(old);
+                            return true;
+                        }
+
+                        return false;
+                    } else {
+                        this.groupFilterData.put(consumerGroup, consumerFilterData);
+                        log.info("New consumer filter registered(concurrent): {}, old: {}", consumerFilterData, old);
+                        return true;
+                    }
+                }
+            } else {
+                if (clientVersion <= old.getClientVersion()) {
+                    if (!type.equals(old.getExpressionType()) || !expression.equals(old.getExpression())) {
+                        log.info("Ignore consumer({}:{}) filter, because of version {} <= {}, but maybe info changed!old={}:{}, ignored={}:{}",
+                            consumerGroup, topic,
+                            clientVersion, old.getClientVersion(),
+                            old.getExpressionType(), old.getExpression(),
+                            type, expression);
+                    }
+                    if (clientVersion == old.getClientVersion() && old.isDead()) {
+                        reAlive(old);
+                        return true;
+                    }
+
+                    return false;
+                }
+
+                boolean change = !old.getExpression().equals(expression) || !old.getExpressionType().equals(type);
+                if (old.getBloomFilterData() == null && bloomFilterData != null) {
+                    change = true;
+                }
+                if (old.getBloomFilterData() != null && !old.getBloomFilterData().equals(bloomFilterData)) {
+                    change = true;
+                }
+
+                // if subscribe data is changed, or consumer is died too long.
+                if (change) {
+                    ConsumerFilterData consumerFilterData = build(topic, consumerGroup, expression, type, clientVersion);
+                    if (consumerFilterData == null) {
+                        // new expression compile error, remove old, let client report error.
+                        this.groupFilterData.remove(consumerGroup);
+                        return false;
+                    }
+                    consumerFilterData.setBloomFilterData(bloomFilterData);
+
+                    this.groupFilterData.put(consumerGroup, consumerFilterData);
+
+                    log.info("Consumer filter info change, old: {}, new: {}, change: {}",
+                        old, consumerFilterData, change);
+
+                    return true;
+                } else {
+                    old.setClientVersion(clientVersion);
+                    if (old.isDead()) {
+                        reAlive(old);
+                    }
+                    return true;
+                }
+            }
+        }
+
+        protected void reAlive(ConsumerFilterData filterData) {
+            long oldDeadTime = filterData.getDeadTime();
+            filterData.setDeadTime(0);
+            log.info("Re alive consumer filter: {}, oldDeadTime: {}", filterData, oldDeadTime);
+        }
+
+        public final ConsumerFilterData get(String consumerGroup) {
+            return this.groupFilterData.get(consumerGroup);
+        }
+
+        public final ConcurrentHashMap<String, ConsumerFilterData> getGroupFilterData() {
+            return this.groupFilterData;
+        }
+
+        public void setGroupFilterData(final ConcurrentHashMap<String, ConsumerFilterData> groupFilterData) {
+            this.groupFilterData = groupFilterData;
+        }
+
+        public String getTopic() {
+            return topic;
+        }
+
+        public void setTopic(final String topic) {
+            this.topic = topic;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/broker/src/main/java/org/apache/rocketmq/broker/filter/ExpressionForRetryMessageFilter.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/filter/ExpressionForRetryMessageFilter.java b/broker/src/main/java/org/apache/rocketmq/broker/filter/ExpressionForRetryMessageFilter.java
new file mode 100644
index 0000000..9518178
--- /dev/null
+++ b/broker/src/main/java/org/apache/rocketmq/broker/filter/ExpressionForRetryMessageFilter.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.broker.filter;
+
+
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.filter.ExpressionType;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+
+/**
+ * Support filter to retry topic.
+ * <br>It will decode properties first in order to get real topic.
+ */
+public class ExpressionForRetryMessageFilter extends ExpressionMessageFilter {
+    public ExpressionForRetryMessageFilter(SubscriptionData subscriptionData, ConsumerFilterData consumerFilterData, ConsumerFilterManager consumerFilterManager) {
+        super(subscriptionData, consumerFilterData, consumerFilterManager);
+    }
+
+    @Override
+    public boolean isMatchedByCommitLog(ByteBuffer msgBuffer, Map<String, String> properties) {
+        if (subscriptionData == null) {
+            return true;
+        }
+
+        if (subscriptionData.isClassFilterMode()) {
+            return true;
+        }
+
+        boolean isRetryTopic = subscriptionData.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX);
+
+        if (!isRetryTopic && ExpressionType.isTagType(subscriptionData.getExpressionType())) {
+            return true;
+        }
+
+        ConsumerFilterData realFilterData = this.consumerFilterData;
+        Map<String, String> tempProperties = properties;
+        boolean decoded = false;
+        if (isRetryTopic) {
+            // retry topic, use original filter data.
+            // poor performance to support retry filter.
+            if (tempProperties == null && msgBuffer != null) {
+                decoded = true;
+                tempProperties = MessageDecoder.decodeProperties(msgBuffer);
+            }
+            String realTopic = tempProperties.get(MessageConst.PROPERTY_RETRY_TOPIC);
+            String group = subscriptionData.getTopic().substring(MixAll.RETRY_GROUP_TOPIC_PREFIX.length());
+            realFilterData = this.consumerFilterManager.get(realTopic, group);
+        }
+
+        // no expression
+        if (realFilterData == null || realFilterData.getExpression() == null
+            || realFilterData.getCompiledExpression() == null) {
+            return true;
+        }
+
+        if (!decoded && tempProperties == null && msgBuffer != null) {
+            tempProperties = MessageDecoder.decodeProperties(msgBuffer);
+        }
+
+        Object ret = null;
+        try {
+            MessageEvaluationContext context = new MessageEvaluationContext(tempProperties);
+
+            ret = realFilterData.getCompiledExpression().evaluate(context);
+        } catch (Throwable e) {
+            log.error("Message Filter error, " + realFilterData + ", " + tempProperties, e);
+        }
+
+        log.debug("Pull eval result: {}, {}, {}", ret, realFilterData, tempProperties);
+
+        if (ret == null || !(ret instanceof Boolean)) {
+            return false;
+        }
+
+        return (Boolean) ret;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/broker/src/main/java/org/apache/rocketmq/broker/filter/ExpressionMessageFilter.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/filter/ExpressionMessageFilter.java b/broker/src/main/java/org/apache/rocketmq/broker/filter/ExpressionMessageFilter.java
new file mode 100644
index 0000000..893df0d
--- /dev/null
+++ b/broker/src/main/java/org/apache/rocketmq/broker/filter/ExpressionMessageFilter.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.broker.filter;
+
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.filter.ExpressionType;
+import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
+import org.apache.rocketmq.filter.util.BitsArray;
+import org.apache.rocketmq.filter.util.BloomFilter;
+import org.apache.rocketmq.store.ConsumeQueueExt;
+import org.apache.rocketmq.store.MessageFilter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+
+public class ExpressionMessageFilter implements MessageFilter {
+
+    protected static final Logger log = LoggerFactory.getLogger(LoggerName.FILTER_LOGGER_NAME);
+
+    protected final SubscriptionData subscriptionData;
+    protected final ConsumerFilterData consumerFilterData;
+    protected final ConsumerFilterManager consumerFilterManager;
+    protected final boolean bloomDataValid;
+
+    public ExpressionMessageFilter(SubscriptionData subscriptionData, ConsumerFilterData consumerFilterData,
+                                   ConsumerFilterManager consumerFilterManager) {
+        this.subscriptionData = subscriptionData;
+        this.consumerFilterData = consumerFilterData;
+        this.consumerFilterManager = consumerFilterManager;
+        if (consumerFilterData == null) {
+            bloomDataValid = false;
+            return;
+        }
+        BloomFilter bloomFilter = this.consumerFilterManager.getBloomFilter();
+        if (bloomFilter != null && bloomFilter.isValid(consumerFilterData.getBloomFilterData())) {
+            bloomDataValid = true;
+        } else {
+            bloomDataValid = false;
+        }
+    }
+
+    @Override
+    public boolean isMatchedByConsumeQueue(Long tagsCode, ConsumeQueueExt.CqExtUnit cqExtUnit) {
+        if (null == subscriptionData) {
+            return true;
+        }
+
+        if (subscriptionData.isClassFilterMode()) {
+            return true;
+        }
+
+        // by tags code.
+        if (ExpressionType.isTagType(subscriptionData.getExpressionType())) {
+
+            if (tagsCode == null || tagsCode < 0L) {
+                return true;
+            }
+
+            if (subscriptionData.getSubString().equals(SubscriptionData.SUB_ALL)) {
+                return true;
+            }
+
+            return subscriptionData.getCodeSet().contains(tagsCode.intValue());
+        } else {
+            // no expression or no bloom
+            if (consumerFilterData == null || consumerFilterData.getExpression() == null
+                || consumerFilterData.getCompiledExpression() == null || consumerFilterData.getBloomFilterData() == null) {
+                return true;
+            }
+
+            // message is before consumer
+            if (cqExtUnit == null || !consumerFilterData.isMsgInLive(cqExtUnit.getMsgStoreTime())) {
+                log.debug("Pull matched because not in live: {}, {}", consumerFilterData, cqExtUnit);
+                return true;
+            }
+
+            byte[] filterBitMap = cqExtUnit.getFilterBitMap();
+            BloomFilter bloomFilter = this.consumerFilterManager.getBloomFilter();
+            if (filterBitMap == null || !this.bloomDataValid
+                || filterBitMap.length * Byte.SIZE != consumerFilterData.getBloomFilterData().getBitNum()) {
+                return true;
+            }
+
+            BitsArray bitsArray = null;
+            try {
+                bitsArray = BitsArray.create(filterBitMap);
+                boolean ret = bloomFilter.isHit(consumerFilterData.getBloomFilterData(), bitsArray);
+                log.debug("Pull {} by bit map:{}, {}, {}", ret, consumerFilterData, bitsArray, cqExtUnit);
+                return ret;
+            } catch (Throwable e) {
+                log.error("bloom filter error, sub=" + subscriptionData
+                    + ", filter=" + consumerFilterData + ", bitMap=" + bitsArray, e);
+            }
+        }
+
+        return true;
+    }
+
+    @Override
+    public boolean isMatchedByCommitLog(ByteBuffer msgBuffer, Map<String, String> properties) {
+        if (subscriptionData == null) {
+            return true;
+        }
+
+        if (subscriptionData.isClassFilterMode()) {
+            return true;
+        }
+
+        if (ExpressionType.isTagType(subscriptionData.getExpressionType())) {
+            return true;
+        }
+
+        ConsumerFilterData realFilterData = this.consumerFilterData;
+        Map<String, String> tempProperties = properties;
+
+        // no expression
+        if (realFilterData == null || realFilterData.getExpression() == null
+            || realFilterData.getCompiledExpression() == null) {
+            return true;
+        }
+
+        if (tempProperties == null && msgBuffer != null) {
+            tempProperties = MessageDecoder.decodeProperties(msgBuffer);
+        }
+
+        Object ret = null;
+        try {
+            MessageEvaluationContext context = new MessageEvaluationContext(tempProperties);
+
+            ret = realFilterData.getCompiledExpression().evaluate(context);
+        } catch (Throwable e) {
+            log.error("Message Filter error, " + realFilterData + ", " + tempProperties, e);
+        }
+
+        log.debug("Pull eval result: {}, {}, {}", ret, realFilterData, tempProperties);
+
+        if (ret == null || !(ret instanceof Boolean)) {
+            return false;
+        }
+
+        return (Boolean) ret;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/broker/src/main/java/org/apache/rocketmq/broker/filter/MessageEvaluationContext.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/filter/MessageEvaluationContext.java b/broker/src/main/java/org/apache/rocketmq/broker/filter/MessageEvaluationContext.java
new file mode 100644
index 0000000..879d179
--- /dev/null
+++ b/broker/src/main/java/org/apache/rocketmq/broker/filter/MessageEvaluationContext.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.broker.filter;
+
+import org.apache.rocketmq.filter.expression.EvaluationContext;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Evaluation context from message.
+ */
+public class MessageEvaluationContext implements EvaluationContext {
+
+    private Map<String, String> properties;
+
+    public MessageEvaluationContext(Map<String, String> properties) {
+        this.properties = properties;
+    }
+
+    @Override
+    public Object get(final String name) {
+        if (this.properties == null) {
+            return null;
+        }
+        return this.properties.get(name);
+    }
+
+    @Override
+    public Map<String, Object> keyValues() {
+        if (properties == null) {
+            return null;
+        }
+
+        Map<String, Object> copy = new HashMap<String, Object>(properties.size(), 1);
+
+        for (String key : properties.keySet()) {
+            copy.put(key, properties.get(key));
+        }
+
+        return copy;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/NotifyMessageArrivingListener.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/NotifyMessageArrivingListener.java b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/NotifyMessageArrivingListener.java
index 2dec9f7..fd38c4f 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/NotifyMessageArrivingListener.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/NotifyMessageArrivingListener.java
@@ -19,6 +19,8 @@ package org.apache.rocketmq.broker.longpolling;
 
 import org.apache.rocketmq.store.MessageArrivingListener;
 
+import java.util.Map;
+
 public class NotifyMessageArrivingListener implements MessageArrivingListener {
     private final PullRequestHoldService pullRequestHoldService;
 
@@ -27,7 +29,9 @@ public class NotifyMessageArrivingListener implements MessageArrivingListener {
     }
 
     @Override
-    public void arriving(String topic, int queueId, long logicOffset, long tagsCode) {
-        this.pullRequestHoldService.notifyMessageArriving(topic, queueId, logicOffset, tagsCode);
+    public void arriving(String topic, int queueId, long logicOffset, long tagsCode,
+                         long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) {
+        this.pullRequestHoldService.notifyMessageArriving(topic, queueId, logicOffset, tagsCode,
+            msgStoreTime, filterBitMap, properties);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequest.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequest.java b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequest.java
index b66344f..045ab9b 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequest.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequest.java
@@ -19,6 +19,7 @@ package org.apache.rocketmq.broker.longpolling;
 import io.netty.channel.Channel;
 import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.store.MessageFilter;
 
 public class PullRequest {
     private final RemotingCommand requestCommand;
@@ -27,15 +28,18 @@ public class PullRequest {
     private final long suspendTimestamp;
     private final long pullFromThisOffset;
     private final SubscriptionData subscriptionData;
+    private final MessageFilter messageFilter;
 
     public PullRequest(RemotingCommand requestCommand, Channel clientChannel, long timeoutMillis, long suspendTimestamp,
-        long pullFromThisOffset, SubscriptionData subscriptionData) {
+        long pullFromThisOffset, SubscriptionData subscriptionData,
+        MessageFilter messageFilter) {
         this.requestCommand = requestCommand;
         this.clientChannel = clientChannel;
         this.timeoutMillis = timeoutMillis;
         this.suspendTimestamp = suspendTimestamp;
         this.pullFromThisOffset = pullFromThisOffset;
         this.subscriptionData = subscriptionData;
+        this.messageFilter = messageFilter;
     }
 
     public RemotingCommand getRequestCommand() {
@@ -61,4 +65,8 @@ public class PullRequest {
     public SubscriptionData getSubscriptionData() {
         return subscriptionData;
     }
+
+    public MessageFilter getMessageFilter() {
+        return messageFilter;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequestHoldService.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequestHoldService.java b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequestHoldService.java
index fdba50d..1a53db1 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequestHoldService.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequestHoldService.java
@@ -18,13 +18,13 @@ package org.apache.rocketmq.broker.longpolling;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.common.ServiceThread;
 import org.apache.rocketmq.common.SystemClock;
 import org.apache.rocketmq.common.constant.LoggerName;
-import org.apache.rocketmq.store.DefaultMessageFilter;
-import org.apache.rocketmq.store.MessageFilter;
+import org.apache.rocketmq.store.ConsumeQueueExt;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -33,7 +33,6 @@ public class PullRequestHoldService extends ServiceThread {
     private static final String TOPIC_QUEUEID_SEPARATOR = "@";
     private final BrokerController brokerController;
     private final SystemClock systemClock = new SystemClock();
-    private final MessageFilter messageFilter = new DefaultMessageFilter();
     private ConcurrentHashMap<String/* topic@queueId */, ManyPullRequest> pullRequestTable =
         new ConcurrentHashMap<String, ManyPullRequest>(1024);
 
@@ -110,10 +109,11 @@ public class PullRequestHoldService extends ServiceThread {
     }
 
     public void notifyMessageArriving(final String topic, final int queueId, final long maxOffset) {
-        notifyMessageArriving(topic, queueId, maxOffset, null);
+        notifyMessageArriving(topic, queueId, maxOffset, null, 0, null, null);
     }
 
-    public void notifyMessageArriving(final String topic, final int queueId, final long maxOffset, final Long tagsCode) {
+    public void notifyMessageArriving(final String topic, final int queueId, final long maxOffset, final Long tagsCode,
+                                      long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) {
         String key = this.buildKey(topic, queueId);
         ManyPullRequest mpr = this.pullRequestTable.get(key);
         if (mpr != null) {
@@ -128,7 +128,14 @@ public class PullRequestHoldService extends ServiceThread {
                     }
 
                     if (newestOffset > request.getPullFromThisOffset()) {
-                        if (this.messageFilter.isMessageMatched(request.getSubscriptionData(), tagsCode)) {
+                        boolean match = request.getMessageFilter().isMatchedByConsumeQueue(tagsCode,
+                            new ConsumeQueueExt.CqExtUnit(tagsCode, msgStoreTime, filterBitMap));
+                        // match by bit map, need eval again when properties is not null.
+                        if (match && properties != null) {
+                            match = request.getMessageFilter().isMatchedByCommitLog(null, properties);
+                        }
+
+                        if (match) {
                             try {
                                 this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),
                                     request.getRequestCommand());

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
index 039c942..6c2a987 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
@@ -50,7 +50,7 @@ import org.slf4j.LoggerFactory;
 public class BrokerOuterAPI {
     private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
     private final RemotingClient remotingClient;
-    private final TopAddressing topAddressing = new TopAddressing(MixAll.WS_ADDR);
+    private final TopAddressing topAddressing = new TopAddressing(MixAll.getWSAddr());
     private String nameSrvAddr = null;
 
     public BrokerOuterAPI(final NettyClientConfig nettyClientConfig) {

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java b/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java
index 00257fd..8ded973 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java
@@ -18,11 +18,14 @@
 package org.apache.rocketmq.broker.plugin;
 
 import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.Set;
 import org.apache.rocketmq.common.message.MessageExt;
-import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
+import org.apache.rocketmq.store.CommitLogDispatcher;
+import org.apache.rocketmq.store.ConsumeQueue;
 import org.apache.rocketmq.store.GetMessageResult;
 import org.apache.rocketmq.store.MessageExtBrokerInner;
+import org.apache.rocketmq.store.MessageFilter;
 import org.apache.rocketmq.store.MessageStore;
 import org.apache.rocketmq.store.PutMessageResult;
 import org.apache.rocketmq.store.QueryMessageResult;
@@ -84,8 +87,8 @@ public abstract class AbstractPluginMessageStore implements MessageStore {
 
     @Override
     public GetMessageResult getMessage(String group, String topic, int queueId, long offset,
-        int maxMsgNums, SubscriptionData subscriptionData) {
-        return next.getMessage(group, topic, queueId, offset, maxMsgNums, subscriptionData);
+                                       int maxMsgNums, final MessageFilter messageFilter) {
+        return next.getMessage(group, topic, queueId, offset, maxMsgNums, messageFilter);
     }
 
     @Override
@@ -234,4 +237,13 @@ public abstract class AbstractPluginMessageStore implements MessageStore {
         next.setConfirmOffset(phyOffset);
     }
 
+    @Override
+    public LinkedList<CommitLogDispatcher> getDispatcherList() {
+        return next.getDispatcherList();
+    }
+
+    @Override
+    public ConsumeQueue getConsumeQueue(String topic, int queueId) {
+        return next.getConsumeQueue(topic, queueId);
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index e35316d..daea53c 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -16,6 +16,7 @@
  */
 package org.apache.rocketmq.broker.processor;
 
+import com.alibaba.fastjson.JSON;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandlerContext;
 import java.io.UnsupportedEncodingException;
@@ -32,6 +33,8 @@ import java.util.concurrent.ConcurrentHashMap;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.client.ClientChannelInfo;
 import org.apache.rocketmq.broker.client.ConsumerGroupInfo;
+import org.apache.rocketmq.broker.filter.ConsumerFilterData;
+import org.apache.rocketmq.broker.filter.ExpressionMessageFilter;
 import org.apache.rocketmq.common.MQVersion;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.TopicConfig;
@@ -49,6 +52,7 @@ import org.apache.rocketmq.common.protocol.ResponseCode;
 import org.apache.rocketmq.common.protocol.body.BrokerStatsData;
 import org.apache.rocketmq.common.protocol.body.BrokerStatsItem;
 import org.apache.rocketmq.common.protocol.body.Connection;
+import org.apache.rocketmq.common.protocol.body.ConsumeQueueData;
 import org.apache.rocketmq.common.protocol.body.ConsumeStatsList;
 import org.apache.rocketmq.common.protocol.body.ConsumerConnection;
 import org.apache.rocketmq.common.protocol.body.GroupList;
@@ -56,6 +60,7 @@ import org.apache.rocketmq.common.protocol.body.KVTable;
 import org.apache.rocketmq.common.protocol.body.LockBatchRequestBody;
 import org.apache.rocketmq.common.protocol.body.LockBatchResponseBody;
 import org.apache.rocketmq.common.protocol.body.ProducerConnection;
+import org.apache.rocketmq.common.protocol.body.QueryConsumeQueueResponseBody;
 import org.apache.rocketmq.common.protocol.body.QueryConsumeTimeSpanBody;
 import org.apache.rocketmq.common.protocol.body.QueryCorrectionOffsetBody;
 import org.apache.rocketmq.common.protocol.body.QueueTimeSpan;
@@ -81,6 +86,7 @@ import org.apache.rocketmq.common.protocol.header.GetMinOffsetRequestHeader;
 import org.apache.rocketmq.common.protocol.header.GetMinOffsetResponseHeader;
 import org.apache.rocketmq.common.protocol.header.GetProducerConnectionListRequestHeader;
 import org.apache.rocketmq.common.protocol.header.GetTopicStatsInfoRequestHeader;
+import org.apache.rocketmq.common.protocol.header.QueryConsumeQueueRequestHeader;
 import org.apache.rocketmq.common.protocol.header.QueryConsumeTimeSpanRequestHeader;
 import org.apache.rocketmq.common.protocol.header.QueryCorrectionOffsetHeader;
 import org.apache.rocketmq.common.protocol.header.QueryTopicConsumeByWhoRequestHeader;
@@ -94,6 +100,7 @@ import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
 import org.apache.rocketmq.common.stats.StatsItem;
 import org.apache.rocketmq.common.stats.StatsSnapshot;
 import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
+import org.apache.rocketmq.filter.util.BitsArray;
 import org.apache.rocketmq.remoting.common.RemotingHelper;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
@@ -101,7 +108,10 @@ import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
 import org.apache.rocketmq.remoting.protocol.LanguageCode;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+import org.apache.rocketmq.store.ConsumeQueue;
+import org.apache.rocketmq.store.ConsumeQueueExt;
 import org.apache.rocketmq.store.DefaultMessageStore;
+import org.apache.rocketmq.store.MessageFilter;
 import org.apache.rocketmq.store.SelectMappedBufferResult;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -187,6 +197,8 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
                 return ViewBrokerStatsData(ctx, request);
             case RequestCode.GET_BROKER_CONSUME_STATS:
                 return fetchAllConsumeStatsInBroker(ctx, request);
+            case RequestCode.QUERY_CONSUME_QUEUE:
+                return queryConsumeQueue(ctx, request);
             default:
                 break;
         }
@@ -1244,4 +1256,83 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
         }
     }
 
+    private RemotingCommand queryConsumeQueue(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
+        QueryConsumeQueueRequestHeader requestHeader =
+            (QueryConsumeQueueRequestHeader) request.decodeCommandCustomHeader(QueryConsumeQueueRequestHeader.class);
+
+        RemotingCommand response = RemotingCommand.createResponseCommand(null);
+
+        ConsumeQueue consumeQueue = this.brokerController.getMessageStore().getConsumeQueue(requestHeader.getTopic(),
+            requestHeader.getQueueId());
+        if (consumeQueue == null) {
+            response.setCode(ResponseCode.SYSTEM_ERROR);
+            response.setRemark(String.format("%d@%s is not exist!", requestHeader.getQueueId(), requestHeader.getTopic()));
+            return response;
+        }
+
+        QueryConsumeQueueResponseBody body = new QueryConsumeQueueResponseBody();
+        response.setCode(ResponseCode.SUCCESS);
+        response.setBody(body.encode());
+
+        body.setMaxQueueIndex(consumeQueue.getMaxOffsetInQueue());
+        body.setMinQueueIndex(consumeQueue.getMinOffsetInQueue());
+
+        MessageFilter messageFilter = null;
+        if (requestHeader.getConsumerGroup() != null) {
+            SubscriptionData subscriptionData = this.brokerController.getConsumerManager().findSubscriptionData(
+                requestHeader.getConsumerGroup(), requestHeader.getTopic()
+            );
+            body.setSubscriptionData(subscriptionData);
+            if (subscriptionData == null) {
+                body.setFilterData(String.format("%s@%s is not online!", requestHeader.getConsumerGroup(), requestHeader.getTopic()));
+            } else {
+                ConsumerFilterData filterData = this.brokerController.getConsumerFilterManager()
+                    .get(requestHeader.getTopic(), requestHeader.getConsumerGroup());
+                body.setFilterData(JSON.toJSONString(filterData, true));
+
+                messageFilter = new ExpressionMessageFilter(subscriptionData, filterData,
+                    this.brokerController.getConsumerFilterManager());
+            }
+        }
+
+        SelectMappedBufferResult result = consumeQueue.getIndexBuffer(requestHeader.getIndex());
+        if (result == null) {
+            response.setRemark(String.format("Index %d of %d@%s is not exist!", requestHeader.getIndex(), requestHeader.getQueueId(), requestHeader.getTopic()));
+            return response;
+        }
+        try {
+            List<ConsumeQueueData> queues = new ArrayList<>();
+            for (int i = 0; i < result.getSize() && i < requestHeader.getCount() * ConsumeQueue.CQ_STORE_UNIT_SIZE; i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
+                ConsumeQueueData one = new ConsumeQueueData();
+                one.setPhysicOffset(result.getByteBuffer().getLong());
+                one.setPhysicSize(result.getByteBuffer().getInt());
+                one.setTagsCode(result.getByteBuffer().getLong());
+
+                if (!consumeQueue.isExtAddr(one.getTagsCode())) {
+                    queues.add(one);
+                    continue;
+                }
+
+                ConsumeQueueExt.CqExtUnit cqExtUnit = consumeQueue.getExt(one.getTagsCode());
+                if (cqExtUnit != null) {
+                    one.setExtendDataJson(JSON.toJSONString(cqExtUnit));
+                    if (cqExtUnit.getFilterBitMap() != null) {
+                        one.setBitMap(BitsArray.create(cqExtUnit.getFilterBitMap()).toString());
+                    }
+                    if (messageFilter != null) {
+                        one.setEval(messageFilter.isMatchedByConsumeQueue(cqExtUnit.getTagsCode(), cqExtUnit));
+                    }
+                } else {
+                    one.setMsg("Cq extend not exist!addr: " + one.getTagsCode());
+                }
+
+                queues.add(one);
+            }
+            body.setQueueData(queues);
+        } finally {
+            result.release();
+        }
+
+        return response;
+    }
 }


Mime
View raw message