rocketmq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dongefore...@apache.org
Subject [24/51] [abbrv] incubator-rocketmq git commit: [ROCKETMQ-121]Support message filtering based on SQL92 closes apache/incubator-rocketmq#82
Date Tue, 06 Jun 2017 03:38:44 GMT
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9eeb2f7e/common/src/main/java/org/apache/rocketmq/common/message/Message.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/message/Message.java b/common/src/main/java/org/apache/rocketmq/common/message/Message.java
index f3bff83..2c81f5c 100644
--- a/common/src/main/java/org/apache/rocketmq/common/message/Message.java
+++ b/common/src/main/java/org/apache/rocketmq/common/message/Message.java
@@ -81,6 +81,12 @@ public class Message implements Serializable {
             throw new RuntimeException(String.format(
                 "The Property<%s> is used by system, input another please", name));
         }
+        // Compare string content; "==" tests reference identity and lets blank values through.
+        if (value == null || value.trim().isEmpty() || name == null || name.trim().isEmpty()) {
+            throw new IllegalArgumentException(
+                "The name or value of property can not be null or blank string!"
+            );
+        }
         this.putProperty(name, value);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9eeb2f7e/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java b/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java
index 90b837a..e41ec9d 100644
--- a/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java
+++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java
@@ -41,6 +41,20 @@ public class MessageDecoder {
     public final static int MESSAGE_MAGIC_CODE = 0xAABBCCDD ^ 1880681586 + 8;
     public static final char NAME_VALUE_SEPARATOR = 1;
     public static final char PROPERTY_SEPARATOR = 2;
+    public static final int BODY_SIZE_POSITION = 4 // 1 TOTALSIZE
+        + 4 // 2 MAGICCODE
+        + 4 // 3 BODYCRC
+        + 4 // 4 QUEUEID
+        + 4 // 5 FLAG
+        + 8 // 6 QUEUEOFFSET
+        + 8 // 7 PHYSICALOFFSET
+        + 4 // 8 SYSFLAG
+        + 8 // 9 BORNTIMESTAMP
+        + 8 // 10 BORNHOST
+        + 8 // 11 STORETIMESTAMP
+        + 8 // 12 STOREHOSTADDRESS
+        + 4 // 13 RECONSUMETIMES
+        + 8; // 14 Prepared Transaction Offset
 
     public static String createMessageId(final ByteBuffer input, final ByteBuffer addr, final long offset) {
         input.flip();
@@ -80,6 +94,31 @@ public class MessageDecoder {
         return new MessageId(address, offset);
     }
 
+    /**
+     * Decode only the properties of an encoded message. Header fields are read by absolute
+     * index; the buffer's position is then moved to the start of the properties bytes.
+     * @param byteBuffer msg commit log buffer.
+     * @return the decoded properties, or null if the stored properties length is not positive.
+     */
+    public static Map<String, String> decodeProperties(java.nio.ByteBuffer byteBuffer) {
+        int topicLengthPosition = BODY_SIZE_POSITION + 4 + byteBuffer.getInt(BODY_SIZE_POSITION);
+        // Topic length is read as a single (signed) byte.
+        byte topicLength = byteBuffer.get(topicLengthPosition);
+        // The 2-byte properties length is read from just after the topic bytes.
+        short propertiesLength = byteBuffer.getShort(topicLengthPosition + 1 + topicLength);
+        // Move past the properties length so the relative get below reads the properties bytes.
+        byteBuffer.position(topicLengthPosition + 1 + topicLength + 2);
+
+        if (propertiesLength > 0) {
+            byte[] properties = new byte[propertiesLength];
+            byteBuffer.get(properties);
+            String propertiesString = new String(properties, CHARSET_UTF8);
+            Map<String, String> map = string2messageProperties(propertiesString);
+            return map;
+        }
+        return null;
+    }
+
     public static MessageExt decode(java.nio.ByteBuffer byteBuffer) {
         return decode(byteBuffer, true, true, false);
     }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9eeb2f7e/common/src/main/java/org/apache/rocketmq/common/namesrv/TopAddressing.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/namesrv/TopAddressing.java b/common/src/main/java/org/apache/rocketmq/common/namesrv/TopAddressing.java
index 74fd965..990e748 100644
--- a/common/src/main/java/org/apache/rocketmq/common/namesrv/TopAddressing.java
+++ b/common/src/main/java/org/apache/rocketmq/common/namesrv/TopAddressing.java
@@ -88,7 +88,7 @@ public class TopAddressing {
 
         if (verbose) {
             String errorMsg =
-                "connect to " + url + " failed, maybe the domain name " + MixAll.WS_DOMAIN_NAME + " not bind in /etc/hosts";
+                "connect to " + url + " failed, maybe the domain name " + MixAll.getWSAddr() + " not bind in /etc/hosts";
             errorMsg += FAQUrl.suggestTodo(FAQUrl.NAME_SERVER_ADDR_NOT_EXIST_URL);
 
             log.warn(errorMsg);

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9eeb2f7e/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java b/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
index c6b0925..6f132f7 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
@@ -68,6 +68,8 @@ public class RequestCode {
 
     public static final int GET_ALL_DELAY_OFFSET = 45;
 
+    public static final int CHECK_CLIENT_CONFIG = 46;
+
     public static final int PUT_KV_CONFIG = 100;
 
     public static final int GET_KV_CONFIG = 101;
@@ -162,4 +164,6 @@ public class RequestCode {
 
 
     public static final int SEND_BATCH_MESSAGE = 320;
+
+    public static final int QUERY_CONSUME_QUEUE = 321;
 }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9eeb2f7e/common/src/main/java/org/apache/rocketmq/common/protocol/ResponseCode.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/ResponseCode.java b/common/src/main/java/org/apache/rocketmq/common/protocol/ResponseCode.java
index 90b182b..f62c4ea 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/ResponseCode.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/ResponseCode.java
@@ -53,6 +53,10 @@ public class ResponseCode extends RemotingSysResponseCode {
 
     public static final int SUBSCRIPTION_GROUP_NOT_EXIST = 26;
 
+    public static final int FILTER_DATA_NOT_EXIST = 27;
+
+    public static final int FILTER_DATA_NOT_LATEST = 28;
+
     public static final int TRANSACTION_SHOULD_COMMIT = 200;
 
     public static final int TRANSACTION_SHOULD_ROLLBACK = 201;

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9eeb2f7e/common/src/main/java/org/apache/rocketmq/common/protocol/body/CheckClientRequestBody.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/CheckClientRequestBody.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/CheckClientRequestBody.java
new file mode 100644
index 0000000..a78ce55
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/CheckClientRequestBody.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common.protocol.body;
+
+import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
+import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+
+public class CheckClientRequestBody extends RemotingSerializable {
+
+    private String clientId;
+    private String group;
+    private SubscriptionData subscriptionData;
+
+    public String getClientId() {
+        return clientId;
+    }
+
+    public void setClientId(String clientId) {
+        this.clientId = clientId;
+    }
+
+    public String getGroup() {
+        return group;
+    }
+
+    public void setGroup(String group) {
+        this.group = group;
+    }
+
+    public SubscriptionData getSubscriptionData() {
+        return subscriptionData;
+    }
+
+    public void setSubscriptionData(SubscriptionData subscriptionData) {
+        this.subscriptionData = subscriptionData;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9eeb2f7e/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumeQueueData.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumeQueueData.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumeQueueData.java
new file mode 100644
index 0000000..7268dcd
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumeQueueData.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common.protocol.body;
+
+public class ConsumeQueueData {
+
+    private long physicOffset;
+    private int physicSize;
+    private long tagsCode;
+    private String extendDataJson;
+    private String bitMap;
+    private boolean eval;
+    private String msg;
+
+    public long getPhysicOffset() {
+        return physicOffset;
+    }
+
+    public void setPhysicOffset(long physicOffset) {
+        this.physicOffset = physicOffset;
+    }
+
+    public int getPhysicSize() {
+        return physicSize;
+    }
+
+    public void setPhysicSize(int physicSize) {
+        this.physicSize = physicSize;
+    }
+
+    public long getTagsCode() {
+        return tagsCode;
+    }
+
+    public void setTagsCode(long tagsCode) {
+        this.tagsCode = tagsCode;
+    }
+
+    public String getExtendDataJson() {
+        return extendDataJson;
+    }
+
+    public void setExtendDataJson(String extendDataJson) {
+        this.extendDataJson = extendDataJson;
+    }
+
+    public String getBitMap() {
+        return bitMap;
+    }
+
+    public void setBitMap(String bitMap) {
+        this.bitMap = bitMap;
+    }
+
+    public boolean isEval() {
+        return eval;
+    }
+
+    public void setEval(boolean eval) {
+        this.eval = eval;
+    }
+
+    public String getMsg() {
+        return msg;
+    }
+
+    public void setMsg(String msg) {
+        this.msg = msg;
+    }
+
+    @Override
+    public String toString() {
+        return "ConsumeQueueData{" +
+            "physicOffset=" + physicOffset +
+            ", physicSize=" + physicSize +
+            ", tagsCode=" + tagsCode +
+            ", extendDataJson='" + extendDataJson + '\'' +
+            ", bitMap='" + bitMap + '\'' +
+            ", eval=" + eval +
+            ", msg='" + msg + '\'' +
+            '}';
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9eeb2f7e/common/src/main/java/org/apache/rocketmq/common/protocol/body/QueryConsumeQueueResponseBody.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/QueryConsumeQueueResponseBody.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/QueryConsumeQueueResponseBody.java
new file mode 100644
index 0000000..be93da9
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/QueryConsumeQueueResponseBody.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common.protocol.body;
+
+import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
+import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+
+import java.util.List;
+
+public class QueryConsumeQueueResponseBody extends RemotingSerializable {
+
+    private SubscriptionData subscriptionData;
+    private String filterData;
+    private List<ConsumeQueueData> queueData;
+    private long maxQueueIndex;
+    private long minQueueIndex;
+
+    public SubscriptionData getSubscriptionData() {
+        return subscriptionData;
+    }
+
+    public void setSubscriptionData(SubscriptionData subscriptionData) {
+        this.subscriptionData = subscriptionData;
+    }
+
+    public String getFilterData() {
+        return filterData;
+    }
+
+    public void setFilterData(String filterData) {
+        this.filterData = filterData;
+    }
+
+    public List<ConsumeQueueData> getQueueData() {
+        return queueData;
+    }
+
+    public void setQueueData(List<ConsumeQueueData> queueData) {
+        this.queueData = queueData;
+    }
+
+    public long getMaxQueueIndex() {
+        return maxQueueIndex;
+    }
+
+    public void setMaxQueueIndex(long maxQueueIndex) {
+        this.maxQueueIndex = maxQueueIndex;
+    }
+
+    public long getMinQueueIndex() {
+        return minQueueIndex;
+    }
+
+    public void setMinQueueIndex(long minQueueIndex) {
+        this.minQueueIndex = minQueueIndex;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9eeb2f7e/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageRequestHeader.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageRequestHeader.java
index 8a59213..106e89e 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageRequestHeader.java
@@ -46,6 +46,7 @@ public class PullMessageRequestHeader implements CommandCustomHeader {
     private String subscription;
     @CFNotNull
     private Long subVersion;
+    private String expressionType;
 
     @Override
     public void checkFields() throws RemotingCommandException {
@@ -130,4 +131,12 @@ public class PullMessageRequestHeader implements CommandCustomHeader {
     public void setSubVersion(Long subVersion) {
         this.subVersion = subVersion;
     }
+
+    public String getExpressionType() {
+        return expressionType;
+    }
+
+    public void setExpressionType(String expressionType) {
+        this.expressionType = expressionType;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9eeb2f7e/common/src/main/java/org/apache/rocketmq/common/protocol/header/QueryConsumeQueueRequestHeader.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/QueryConsumeQueueRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/QueryConsumeQueueRequestHeader.java
new file mode 100644
index 0000000..642fe17
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/QueryConsumeQueueRequestHeader.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common.protocol.header;
+
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+
+public class QueryConsumeQueueRequestHeader implements CommandCustomHeader {
+
+    private String topic;
+    private int queueId;
+    private long index;
+    private int count;
+    private String consumerGroup;
+
+    public String getTopic() {
+        return topic;
+    }
+
+    public void setTopic(String topic) {
+        this.topic = topic;
+    }
+
+    public int getQueueId() {
+        return queueId;
+    }
+
+    public void setQueueId(int queueId) {
+        this.queueId = queueId;
+    }
+
+    public long getIndex() {
+        return index;
+    }
+
+    public void setIndex(long index) {
+        this.index = index;
+    }
+
+    public int getCount() {
+        return count;
+    }
+
+    public void setCount(int count) {
+        this.count = count;
+    }
+
+    public String getConsumerGroup() {
+        return consumerGroup;
+    }
+
+    public void setConsumerGroup(String consumerGroup) {
+        this.consumerGroup = consumerGroup;
+    }
+
+    @Override
+    public void checkFields() throws RemotingCommandException {
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9eeb2f7e/common/src/main/java/org/apache/rocketmq/common/protocol/heartbeat/SubscriptionData.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/heartbeat/SubscriptionData.java b/common/src/main/java/org/apache/rocketmq/common/protocol/heartbeat/SubscriptionData.java
index 81f5954..e456b7e 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/heartbeat/SubscriptionData.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/heartbeat/SubscriptionData.java
@@ -32,6 +32,7 @@ public class SubscriptionData implements Comparable<SubscriptionData> {
     private Set<String> tagsSet = new HashSet<String>();
     private Set<Integer> codeSet = new HashSet<Integer>();
     private long subVersion = System.currentTimeMillis();
+    private String expressionType;
 
     @JSONField(serialize = false)
     private String filterClassSource;
@@ -102,6 +103,14 @@ public class SubscriptionData implements Comparable<SubscriptionData> {
         this.classFilterMode = classFilterMode;
     }
 
+    public String getExpressionType() {
+        return expressionType;
+    }
+
+    public void setExpressionType(String expressionType) {
+        this.expressionType = expressionType;
+    }
+
     @Override
     public int hashCode() {
         final int prime = 31;
@@ -111,6 +120,7 @@ public class SubscriptionData implements Comparable<SubscriptionData> {
         result = prime * result + ((subString == null) ? 0 : subString.hashCode());
         result = prime * result + ((tagsSet == null) ? 0 : tagsSet.hashCode());
         result = prime * result + ((topic == null) ? 0 : topic.hashCode());
+        result = prime * result + ((expressionType == null) ? 0 : expressionType.hashCode());
         return result;
     }
 
@@ -147,6 +157,11 @@ public class SubscriptionData implements Comparable<SubscriptionData> {
                 return false;
         } else if (!topic.equals(other.topic))
             return false;
+        if (expressionType == null) {
+            if (other.expressionType != null)
+                return false;
+        } else if (!expressionType.equals(other.expressionType))
+            return false;
         return true;
     }
 
@@ -154,7 +169,7 @@ public class SubscriptionData implements Comparable<SubscriptionData> {
     public String toString() {
         return "SubscriptionData [classFilterMode=" + classFilterMode + ", topic=" + topic + ", subString="
             + subString + ", tagsSet=" + tagsSet + ", codeSet=" + codeSet + ", subVersion=" + subVersion
-            + "]";
+            + ", expressionType=" + expressionType + "]";
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9eeb2f7e/common/src/test/java/org/apache/rocketmq/common/filter/FilterAPITest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/rocketmq/common/filter/FilterAPITest.java b/common/src/test/java/org/apache/rocketmq/common/filter/FilterAPITest.java
index 5137f32..c5f8460 100644
--- a/common/src/test/java/org/apache/rocketmq/common/filter/FilterAPITest.java
+++ b/common/src/test/java/org/apache/rocketmq/common/filter/FilterAPITest.java
@@ -42,4 +42,53 @@ public class FilterAPITest {
         }
         assertThat(subscriptionData.getTagsSet()).isEqualTo(tagSet);
     }
+
+    @Test
+    public void testBuildTagSome() {
+        try {
+            SubscriptionData subscriptionData = FilterAPI.build(
+                "TOPIC", "A || B", ExpressionType.TAG
+            );
+            // A TAG expression should be kept verbatim and split into its individual tags.
+            assertThat(subscriptionData).isNotNull();
+            assertThat(subscriptionData.getTopic()).isEqualTo("TOPIC");
+            assertThat(subscriptionData.getSubString()).isEqualTo("A || B");
+            assertThat(ExpressionType.isTagType(subscriptionData.getExpressionType())).isTrue();
+            // tagsSet is a Set with no guaranteed iteration order: assert membership, not order.
+            assertThat(subscriptionData.getTagsSet()).isNotNull();
+            assertThat(subscriptionData.getTagsSet()).containsOnly("A", "B");
+        } catch (Exception e) {
+            e.printStackTrace();
+            assertThat(Boolean.FALSE).isTrue();
+        }
+    }
+
+    @Test
+    public void testBuildSQL() {
+        try {
+            SubscriptionData subscriptionData = FilterAPI.build(
+                "TOPIC", "a is not null", ExpressionType.SQL92
+            );
+
+            assertThat(subscriptionData).isNotNull();
+            assertThat(subscriptionData.getTopic()).isEqualTo("TOPIC");
+            assertThat(subscriptionData.getExpressionType()).isEqualTo(ExpressionType.SQL92);
+        } catch (Exception e) {
+            e.printStackTrace();
+            assertThat(Boolean.FALSE).isTrue();
+        }
+    }
+
+    @Test
+    public void testBuildSQLWithNullSubString() {
+        try {
+            FilterAPI.build(
+                "TOPIC", null, ExpressionType.SQL92
+            );
+
+            assertThat(Boolean.FALSE).isTrue();
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9eeb2f7e/common/src/test/java/org/apache/rocketmq/common/message/MessageDecoderTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/rocketmq/common/message/MessageDecoderTest.java b/common/src/test/java/org/apache/rocketmq/common/message/MessageDecoderTest.java
new file mode 100644
index 0000000..d14d6b0
--- /dev/null
+++ b/common/src/test/java/org/apache/rocketmq/common/message/MessageDecoderTest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common.message;
+
+import org.junit.Test;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class MessageDecoderTest {
+
+    @Test
+    public void testDecodeProperties() {
+        MessageExt messageExt = new MessageExt();
+
+        messageExt.setMsgId("645100FA00002A9F000000489A3AA09E");
+        messageExt.setTopic("abc");
+        messageExt.setBody("hello!q!".getBytes());
+        try {
+            messageExt.setBornHost(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 0));
+        } catch (UnknownHostException e) {
+            e.printStackTrace();
+            assertThat(Boolean.FALSE).isTrue();
+        }
+        messageExt.setBornTimestamp(System.currentTimeMillis());
+        messageExt.setCommitLogOffset(123456);
+        messageExt.setPreparedTransactionOffset(0);
+        messageExt.setQueueId(0);
+        messageExt.setQueueOffset(123);
+        messageExt.setReconsumeTimes(0);
+        try {
+            messageExt.setStoreHost(new InetSocketAddress(InetAddress.getLocalHost(), 0));
+        } catch (UnknownHostException e) {
+            e.printStackTrace();
+            assertThat(Boolean.FALSE).isTrue();
+        }
+
+        messageExt.putUserProperty("a", "123");
+        messageExt.putUserProperty("b", "hello");
+        messageExt.putUserProperty("c", "3.14");
+
+        byte[] msgBytes = new byte[0];
+        try {
+            msgBytes = MessageDecoder.encode(messageExt, false);
+        } catch (Exception e) {
+            e.printStackTrace();
+            assertThat(Boolean.FALSE).isTrue();
+        }
+
+        ByteBuffer byteBuffer = ByteBuffer.allocate(msgBytes.length);
+        byteBuffer.put(msgBytes);
+
+        Map<String, String> properties = MessageDecoder.decodeProperties(byteBuffer);
+
+        assertThat(properties).isNotNull();
+        assertThat("123").isEqualTo(properties.get("a"));
+        assertThat("hello").isEqualTo(properties.get("b"));
+        assertThat("3.14").isEqualTo(properties.get("c"));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9eeb2f7e/distribution/conf/logback_broker.xml
----------------------------------------------------------------------
diff --git a/distribution/conf/logback_broker.xml b/distribution/conf/logback_broker.xml
index 05c0ee4..dd5c63f 100644
--- a/distribution/conf/logback_broker.xml
+++ b/distribution/conf/logback_broker.xml
@@ -222,6 +222,29 @@
         <appender-ref ref="RocketmqRebalanceLockAppender_inner"/>
     </appender>
 
+    <appender name="RocketmqFilterAppender_inner"
+              class="ch.qos.logback.core.rolling.RollingFileAppender">
+        <file>${user.home}/logs/rocketmqlogs/filter.log</file>
+        <append>true</append>
+        <rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">
+            <fileNamePattern>${user.home}/logs/rocketmqlogs/otherdays/filter.%i.log
+            </fileNamePattern>
+            <minIndex>1</minIndex>
+            <maxIndex>10</maxIndex>
+        </rollingPolicy>
+        <triggeringPolicy
+                class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
+            <maxFileSize>100MB</maxFileSize>
+        </triggeringPolicy>
+        <encoder>
+            <pattern>%d{yyy-MM-dd HH:mm:ss,GMT+8} %p %t - %m%n</pattern>
+            <charset class="java.nio.charset.Charset">UTF-8</charset>
+        </encoder>
+    </appender>
+    <appender name="RocketmqFilterAppender" class="ch.qos.logback.classic.AsyncAppender">
+        <appender-ref ref="RocketmqFilterAppender_inner"/>
+    </appender>
+
     <appender name="RocketmqStatsAppender"
               class="ch.qos.logback.core.rolling.RollingFileAppender">
         <file>${user.home}/logs/rocketmqlogs/stats.log</file>
@@ -321,6 +344,11 @@
         <appender-ref ref="RocketmqCommercialAppender"/>
     </logger>
 
+    <logger name="RocketmqFilter" additivity="false">
+        <level value="INFO"/>
+        <appender-ref ref="RocketmqFilterAppender"/>
+    </logger>
+
     <root>
         <level value="INFO"/>
         <appender-ref ref="DefaultAppender"/>

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9eeb2f7e/distribution/release.xml
----------------------------------------------------------------------
diff --git a/distribution/release.xml b/distribution/release.xml
index 2d3ec1e..9e4ef2a 100644
--- a/distribution/release.xml
+++ b/distribution/release.xml
@@ -67,6 +67,7 @@
                 <include>org.apache.rocketmq:rocketmq-namesrv</include>
                 <include>org.apache.rocketmq:rocketmq-filtersrv</include>
                 <include>org.apache.rocketmq:rocketmq-example</include>
+                <include>org.apache.rocketmq:rocketmq-filter</include>
             </includes>
             <binaries>
                 <outputDirectory>lib/</outputDirectory>

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9eeb2f7e/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java
----------------------------------------------------------------------
diff --git a/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java b/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java
index 473e4c7..3e1b79b 100644
--- a/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java
@@ -27,10 +27,13 @@ import org.apache.commons.cli.Option;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.PosixParser;
 import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.MessageSelector;
 import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
 import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
 import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
 import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.filter.ExpressionType;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.srvutil.ServerUtil;
 
@@ -46,12 +49,14 @@ public class Consumer {
         final String topic = commandLine.hasOption('t') ? commandLine.getOptionValue('t').trim() : "BenchmarkTest";
         final String groupPrefix = commandLine.hasOption('g') ? commandLine.getOptionValue('g').trim() : "benchmark_consumer";
         final String isPrefixEnable = commandLine.hasOption('p') ? commandLine.getOptionValue('p').trim() : "true";
+        final String filterType = commandLine.hasOption('f') ? commandLine.getOptionValue('f').trim() : null;
+        final String expression = commandLine.hasOption('e') ? commandLine.getOptionValue('e').trim() : null;
         String group = groupPrefix;
         if (Boolean.parseBoolean(isPrefixEnable)) {
             group = groupPrefix + "_" + Long.toString(System.currentTimeMillis() % 100);
         }
 
-        System.out.printf("topic %s group %s prefix %s%n", topic, group, isPrefixEnable);
+        System.out.printf("topic: %s, group: %s, prefix: %s, filterType: %s, expression: %s%n", topic, group, isPrefixEnable, filterType, expression);
 
         final StatsBenchmarkConsumer statsBenchmarkConsumer = new StatsBenchmarkConsumer();
 
@@ -99,7 +104,21 @@ public class Consumer {
         DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group);
         consumer.setInstanceName(Long.toString(System.currentTimeMillis()));
 
-        consumer.subscribe(topic, "*");
+        if (filterType == null || expression == null) {
+            consumer.subscribe(topic, "*");
+        } else {
+            if (ExpressionType.TAG.equals(filterType)) {
+                String expr = MixAll.file2String(expression);
+                System.out.printf("Expression: %s%n", expr);
+                consumer.subscribe(topic, MessageSelector.byTag(expr));
+            } else if (ExpressionType.SQL92.equals(filterType)) {
+                String expr = MixAll.file2String(expression);
+                System.out.printf("Expression: %s%n", expr);
+                consumer.subscribe(topic, MessageSelector.bySql(expr));
+            } else {
+                throw new IllegalArgumentException("Not support filter type! " + filterType);
+            }
+        }
 
         consumer.registerMessageListener(new MessageListenerConcurrently() {
             @Override
@@ -142,6 +161,14 @@ public class Consumer {
         opt.setRequired(false);
         options.addOption(opt);
 
+        opt = new Option("f", "filterType", true, "TAG, SQL92");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new Option("e", "expression", true, "filter expression content file path.ie: ./test/expr");
+        opt.setRequired(false);
+        options.addOption(opt);
+
         return options;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9eeb2f7e/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java
----------------------------------------------------------------------
diff --git a/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java b/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java
index 50d750d..2d8d0f6 100644
--- a/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java
@@ -18,11 +18,13 @@ package org.apache.rocketmq.example.benchmark;
 
 import java.io.UnsupportedEncodingException;
 import java.util.LinkedList;
+import java.util.Random;
 import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicLong;
+
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.Option;
 import org.apache.commons.cli.Options;
@@ -50,13 +52,12 @@ public class Producer {
         final int threadCount = commandLine.hasOption('w') ? Integer.parseInt(commandLine.getOptionValue('w')) : 64;
         final int messageSize = commandLine.hasOption('s') ? Integer.parseInt(commandLine.getOptionValue('s')) : 128;
         final boolean keyEnable = commandLine.hasOption('k') && Boolean.parseBoolean(commandLine.getOptionValue('k'));
+        final int propertySize = commandLine.hasOption('p') ? Integer.parseInt(commandLine.getOptionValue('p')) : 0;
 
         System.out.printf("topic %s threadCount %d messageSize %d keyEnable %s%n", topic, threadCount, messageSize, keyEnable);
 
         final Logger log = ClientLogger.getLog();
 
-        final Message msg = buildMessage(messageSize, topic);
-
         final ExecutorService sendThreadPool = Executors.newFixedThreadPool(threadCount);
 
         final StatsBenchmarkProducer statsBenchmark = new StatsBenchmarkProducer();
@@ -117,10 +118,37 @@ public class Producer {
                 public void run() {
                     while (true) {
                         try {
+                            final Message msg;
+                            try {
+                                msg = buildMessage(messageSize, topic);
+                            } catch (UnsupportedEncodingException e) {
+                                e.printStackTrace();
+                                return;
+                            }
                             final long beginTimestamp = System.currentTimeMillis();
                             if (keyEnable) {
                                 msg.setKeys(String.valueOf(beginTimestamp / 1000));
                             }
+                            if (propertySize > 0) {
+                                if (msg.getProperties() != null) {
+                                    msg.getProperties().clear();
+                                }
+                                int i = 0;
+                                int startValue = (new Random(System.currentTimeMillis())).nextInt(100);
+                                int size = 0;
+                                while (true) {
+                                    String prop1 = "prop" + i, prop1V = "hello" + startValue;
+                                    String prop2 = "prop" + (i + 1), prop2V = String.valueOf(startValue);
+                                    msg.putUserProperty(prop1, prop1V);
+                                    msg.putUserProperty(prop2, prop2V);
+                                    size += prop1.length() + prop2.length() + prop1V.length() + prop2V.length();
+                                    if (size > propertySize) {
+                                        break;
+                                    }
+                                    i += 2;
+                                    startValue += 2;
+                                }
+                            }
                             producer.send(msg);
                             statsBenchmark.getSendRequestSuccessCount().incrementAndGet();
                             statsBenchmark.getReceiveResponseSuccessCount().incrementAndGet();
@@ -214,7 +242,7 @@ class StatsBenchmarkProducer {
     private final AtomicLong sendMessageMaxRT = new AtomicLong(0L);
 
     public Long[] createSnapshot() {
-        Long[] snap = new Long[] {
+        Long[] snap = new Long[]{
             System.currentTimeMillis(),
             this.sendRequestSuccessCount.get(),
             this.sendRequestFailedCount.get(),

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9eeb2f7e/example/src/main/java/org/apache/rocketmq/example/filter/SqlConsumer.java
----------------------------------------------------------------------
diff --git a/example/src/main/java/org/apache/rocketmq/example/filter/SqlConsumer.java b/example/src/main/java/org/apache/rocketmq/example/filter/SqlConsumer.java
new file mode 100644
index 0000000..9a3b813
--- /dev/null
+++ b/example/src/main/java/org/apache/rocketmq/example/filter/SqlConsumer.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.example.filter;
+
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.MessageSelector;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.message.MessageExt;
+
+import java.util.List;
+
+public class SqlConsumer {
+    /** Subscribes to TopicTest with a SQL92 selector and prints every batch of messages it receives. */
+    public static void main(String[] args) {
+        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
+        // Selector: TAGS must be TagA or TagB, and property "a" must lie between 0 and 3 (BETWEEN needs AND).
+        try {
+            consumer.subscribe("TopicTest",
+                MessageSelector.bySql("(TAGS is not null and TAGS in ('TagA', 'TagB'))" +
+                    "and (a is not null and a between 0 and 3)"));
+        } catch (MQClientException e) {
+            e.printStackTrace();
+            return;
+        }
+
+        consumer.registerMessageListener(new MessageListenerConcurrently() {
+            // Values go in as printf arguments so a '%' inside message content is never read as a format specifier.
+            @Override
+            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
+                                                            ConsumeConcurrentlyContext context) {
+                System.out.printf("%s Receive New Messages: %s%n", Thread.currentThread().getName(), msgs);
+                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+            }
+        });
+
+        try {
+            consumer.start();
+        } catch (MQClientException e) {
+            e.printStackTrace();
+            return;
+        }
+        System.out.printf("Consumer Started.%n");
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9eeb2f7e/example/src/main/java/org/apache/rocketmq/example/filter/SqlProducer.java
----------------------------------------------------------------------
diff --git a/example/src/main/java/org/apache/rocketmq/example/filter/SqlProducer.java b/example/src/main/java/org/apache/rocketmq/example/filter/SqlProducer.java
new file mode 100644
index 0000000..3f3a0e6
--- /dev/null
+++ b/example/src/main/java/org/apache/rocketmq/example/filter/SqlProducer.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.example.filter;
+
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
+
+public class SqlProducer {
+    /** Starts a producer, sends ten tagged messages to TopicTest, then shuts the producer down. */
+    public static void main(String[] args) {
+        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
+        try {
+            producer.start();
+        } catch (MQClientException e) {
+            e.printStackTrace();
+            return;
+        }
+        // Tags cycle TagA, TagB, TagC by i % 3; user property "a" carries the loop index as a string.
+        for (int i = 0; i < 10; i++) {
+            try {
+                String tag;
+                int div = i % 3;
+                if (div == 0) {
+                    tag = "TagA";
+                } else if (div == 1) {
+                    tag = "TagB";
+                } else {
+                    tag = "TagC";
+                }
+                Message msg = new Message("TopicTest",
+                    tag,
+                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
+                );
+                msg.putUserProperty("a", String.valueOf(i));
+
+                SendResult sendResult = producer.send(msg);
+                System.out.printf("%s%n", sendResult);
+            } catch (Exception e) {
+                e.printStackTrace();
+                try {
+                    Thread.sleep(1000); // pause one second before moving on to the next message
+                } catch (InterruptedException e1) {
+                    e1.printStackTrace();
+                }
+            }
+        }
+        producer.shutdown();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9eeb2f7e/filter/pom.xml
----------------------------------------------------------------------
diff --git a/filter/pom.xml b/filter/pom.xml
new file mode 100644
index 0000000..7978f05
--- /dev/null
+++ b/filter/pom.xml
@@ -0,0 +1,43 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>rocketmq-all</artifactId>
+        <groupId>org.apache.rocketmq</groupId>
+        <version>4.1.0-incubating-SNAPSHOT</version>
+    </parent>
+
+    <modelVersion>4.0.0</modelVersion>
+    <packaging>jar</packaging>
+    <artifactId>rocketmq-filter</artifactId>
+    <name>rocketmq-filter ${project.version}</name>
+
+    <dependencies>
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>rocketmq-common</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>rocketmq-srvutil</artifactId>
+        </dependency>
+    </dependencies>
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9eeb2f7e/filter/src/main/java/org/apache/rocketmq/filter/FilterFactory.java
----------------------------------------------------------------------
diff --git a/filter/src/main/java/org/apache/rocketmq/filter/FilterFactory.java b/filter/src/main/java/org/apache/rocketmq/filter/FilterFactory.java
new file mode 100644
index 0000000..a318548
--- /dev/null
+++ b/filter/src/main/java/org/apache/rocketmq/filter/FilterFactory.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.filter;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Registry of {@link FilterSpi} implementations keyed by {@link FilterSpi#ofType()}; a {@link SqlFilter} is registered at class load.
+ */
+public class FilterFactory {
+
+    public static final FilterFactory INSTANCE = new FilterFactory();
+
+    protected static final Map<String, FilterSpi> FILTER_SPI_HOLDER = new HashMap<String, FilterSpi>(4);
+
+    static {
+        FilterFactory.INSTANCE.register(new SqlFilter());
+    }
+
+    /**
+     * Register a filter under the type returned by {@link FilterSpi#ofType()}.
+     * <p>
+     * Note: a registered filter will be used in the broker server, so take care of its reliability and
+     * performance. The backing map is a plain {@code HashMap}; no synchronization is applied in this class.
+     * @param filterSpi filter to register; must not be {@code null}
+     * @throws IllegalArgumentException if a filter with the same type is already registered
+     */
+    public void register(FilterSpi filterSpi) {
+        if (FILTER_SPI_HOLDER.containsKey(filterSpi.ofType())) {
+            throw new IllegalArgumentException(String.format("Filter spi type(%s) already exist!", filterSpi.ofType()));
+        }
+
+        FILTER_SPI_HOLDER.put(filterSpi.ofType(), filterSpi);
+    }
+
+    /**
+     * Unregister the filter stored under the given type.
+     *
+     * @param type filter type key, as returned by {@link FilterSpi#ofType()}
+     * @return the filter that was removed, or {@code null} if none was registered for the type
+     */
+    public FilterSpi unRegister(String type) {
+        return FILTER_SPI_HOLDER.remove(type);
+    }
+
+    /**
+     * Look up the filter registered under the given type.
+     *
+     * @param type filter type key, as returned by {@link FilterSpi#ofType()}
+     * @return the registered filter, or {@code null} if none exists for the type
+     */
+    public FilterSpi get(String type) {
+        return FILTER_SPI_HOLDER.get(type);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9eeb2f7e/filter/src/main/java/org/apache/rocketmq/filter/FilterSpi.java
----------------------------------------------------------------------
diff --git a/filter/src/main/java/org/apache/rocketmq/filter/FilterSpi.java b/filter/src/main/java/org/apache/rocketmq/filter/FilterSpi.java
new file mode 100644
index 0000000..fcc39fa
--- /dev/null
+++ b/filter/src/main/java/org/apache/rocketmq/filter/FilterSpi.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.filter;
+
+import org.apache.rocketmq.filter.expression.Expression;
+import org.apache.rocketmq.filter.expression.MQFilterException;
+
+/**
+ * Service-provider interface for a message filter: compiles expression text and reports its own type key.
+ */
+public interface FilterSpi {
+
+    /**
+     * Compile an expression string into an {@link Expression}.
+     *
+     * @param expr expression text to compile
+     * @return the compiled expression
+     * @throws org.apache.rocketmq.filter.expression.MQFilterException presumably when {@code expr} cannot be compiled - confirm with implementations
+     */
+    Expression compile(final String expr) throws MQFilterException;
+
+    /**
+     * Type identifier of this filter; {@link FilterFactory} uses it as the registration key.
+     *
+     * @return the filter type string
+     */
+    String ofType();
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9eeb2f7e/filter/src/main/java/org/apache/rocketmq/filter/SqlFilter.java
----------------------------------------------------------------------
diff --git a/filter/src/main/java/org/apache/rocketmq/filter/SqlFilter.java b/filter/src/main/java/org/apache/rocketmq/filter/SqlFilter.java
new file mode 100644
index 0000000..0c1ffb8
--- /dev/null
+++ b/filter/src/main/java/org/apache/rocketmq/filter/SqlFilter.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.filter;
+
+import org.apache.rocketmq.common.filter.ExpressionType;
+import org.apache.rocketmq.filter.expression.Expression;
+import org.apache.rocketmq.filter.expression.MQFilterException;
+import org.apache.rocketmq.filter.parser.SelectorParser;
+
+/**
+ * SQL92 filter: a thin wrapper that delegates compilation to
+ * {@link org.apache.rocketmq.filter.parser.SelectorParser#parse}.
+ * <p>
+ * Do not use this filter directly. Use {@link FilterFactory#get} to select a filter.
+ * </p>
+ */
+public class SqlFilter implements FilterSpi {
+    /** Compiles {@code expr} by handing it to {@code SelectorParser.parse} and returning the result unchanged. */
+    @Override
+    public Expression compile(final String expr) throws MQFilterException {
+        return SelectorParser.parse(expr);
+    }
+    /** Returns {@link ExpressionType#SQL92} as this filter's type identifier. */
+    @Override
+    public String ofType() {
+        return ExpressionType.SQL92;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9eeb2f7e/filter/src/main/java/org/apache/rocketmq/filter/constant/UnaryType.java
----------------------------------------------------------------------
diff --git a/filter/src/main/java/org/apache/rocketmq/filter/constant/UnaryType.java b/filter/src/main/java/org/apache/rocketmq/filter/constant/UnaryType.java
new file mode 100644
index 0000000..d2d04cd
--- /dev/null
+++ b/filter/src/main/java/org/apache/rocketmq/filter/constant/UnaryType.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.filter.constant;
+/** Kinds of unary operation. NOTE(review): no code using this enum is visible here - confirm where each constant applies. */
+public enum UnaryType {
+    NEGATE,
+    IN,
+    NOT,
+    BOOLEANCAST,
+    LIKE
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9eeb2f7e/filter/src/main/java/org/apache/rocketmq/filter/expression/BinaryExpression.java
----------------------------------------------------------------------
diff --git a/filter/src/main/java/org/apache/rocketmq/filter/expression/BinaryExpression.java b/filter/src/main/java/org/apache/rocketmq/filter/expression/BinaryExpression.java
new file mode 100644
index 0000000..0f172e3
--- /dev/null
+++ b/filter/src/main/java/org/apache/rocketmq/filter/expression/BinaryExpression.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.filter.expression;
+
+/**
+ * An expression which performs an operation on two expression values.
+ * <p>
+ * This class was taken from ActiveMQ org.apache.activemq.filter.BinaryExpression.
+ * </p>
+ */
+public abstract class BinaryExpression implements Expression {
+    protected Expression left;
+    protected Expression right;
+
+    public BinaryExpression(Expression left, Expression right) {
+        this.left = left;
+        this.right = right;
+    }
+
+    public Expression getLeft() {
+        return left;
+    }
+
+    public Expression getRight() {
+        return right;
+    }
+
+    /**
+     * Renders as {@code (left symbol right)}, using both operands' {@code toString()} and {@link #getExpressionSymbol()}.
+     */
+    public String toString() {
+        return "(" + left.toString() + " " + getExpressionSymbol() + " " + right.toString() + ")";
+    }
+
+    /**
+     * Hash code of the {@link #toString()} rendering, which keeps it consistent with {@link #equals(Object)}.
+     */
+    public int hashCode() {
+        return toString().hashCode();
+    }
+
+    /**
+     * Equal when the other object has exactly the same class and the same {@link #toString()} rendering.
+     */
+    public boolean equals(Object o) {
+
+        if (o == null || !this.getClass().equals(o.getClass())) {
+            return false;
+        }
+        return toString().equals(o.toString());
+
+    }
+
+    /**
+     * Returns the symbol that represents this binary expression.  For example, addition is
+     * represented by "+". The symbol is placed between the operands by {@link #toString()}.
+     *
+     * @return the operator symbol of this expression
+     */
+    public abstract String getExpressionSymbol();
+
+    /**
+     * @param expression the new right-hand operand
+     */
+    public void setRight(Expression expression) {
+        right = expression;
+    }
+
+    /**
+     * @param expression the new left-hand operand
+     */
+    public void setLeft(Expression expression) {
+        left = expression;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9eeb2f7e/filter/src/main/java/org/apache/rocketmq/filter/expression/BooleanExpression.java
----------------------------------------------------------------------
diff --git a/filter/src/main/java/org/apache/rocketmq/filter/expression/BooleanExpression.java b/filter/src/main/java/org/apache/rocketmq/filter/expression/BooleanExpression.java
new file mode 100644
index 0000000..bb54632
--- /dev/null
+++ b/filter/src/main/java/org/apache/rocketmq/filter/expression/BooleanExpression.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.filter.expression;
+
+/**
+ * A BooleanExpression is an expression that always
+ * produces a Boolean result.
+ * <p>
+ * This class was taken from ActiveMQ org.apache.activemq.filter.BooleanExpression,
+ * but the parameter is changed to an interface.
+ * </p>
+ *
+ * @see org.apache.rocketmq.filter.expression.EvaluationContext
+ */
+public interface BooleanExpression extends Expression {
+
+    /**
+     * @param context the evaluation context the expression is matched against
+     * @return true if the expression evaluates to Boolean.TRUE.
+     * @throws Exception NOTE(review): failure conditions depend on the implementation and are not visible here
+     */
+    boolean matches(EvaluationContext context) throws Exception;
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9eeb2f7e/filter/src/main/java/org/apache/rocketmq/filter/expression/ComparisonExpression.java
----------------------------------------------------------------------
diff --git a/filter/src/main/java/org/apache/rocketmq/filter/expression/ComparisonExpression.java b/filter/src/main/java/org/apache/rocketmq/filter/expression/ComparisonExpression.java
new file mode 100644
index 0000000..8b82e57
--- /dev/null
+++ b/filter/src/main/java/org/apache/rocketmq/filter/expression/ComparisonExpression.java
@@ -0,0 +1,413 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.filter.expression;
+
+import java.util.List;
+
+/**
+ * A filter performing a comparison of two objects.
+ * <p>
+ * This class was taken from ActiveMQ org.apache.activemq.filter.ComparisonExpression,
+ * but:
+ * 1. LIKE expressions and their related methods were removed;
+ * 2. A new method __compare, which returns an int, was extracted;
+ * 3. When a BETWEEN expression is created from constants, the left bound is checked to be
+ *    less than or equal to the right bound;
+ * 4. For string values that cannot be converted to a number, only equal and unequal
+ *    comparisons are supported.
+ * </p>
+ */
+public abstract class ComparisonExpression extends BinaryExpression implements BooleanExpression {
+
+    /**
+     * Per-thread switch read once by the constructor: when the current thread holds any
+     * non-null value here, the created expression converts strings to the type of the other
+     * operand before comparing.
+     */
+    public static final ThreadLocal<Boolean> CONVERT_STRING_EXPRESSIONS = new ThreadLocal<Boolean>();
+
+    /** Whether string operands are converted to the other operand's type in {@link #compare}. */
+    boolean convertStringExpressions = false;
+
+    /**
+     * Creates a comparison of two operands and captures the current thread's
+     * {@link #CONVERT_STRING_EXPRESSIONS} setting.
+     *
+     * @param left the left operand
+     * @param right the right operand
+     */
+    public ComparisonExpression(Expression left, Expression right) {
+        super(left, right);
+        convertStringExpressions = CONVERT_STRING_EXPRESSIONS.get() != null;
+    }
+
+    /**
+     * Creates {@code value BETWEEN left AND right} as {@code value >= left AND value <= right}.
+     * <p>
+     * When both bounds are constants they are validated eagerly: neither may be null, and the
+     * left bound must not be greater than the right bound.
+     * </p>
+     *
+     * @param value the expression being tested
+     * @param left the lower bound
+     * @param right the upper bound
+     * @return the combined expression
+     * @throws RuntimeException if the constant bounds are null or out of order
+     */
+    @SuppressWarnings({"rawtypes"})
+    public static BooleanExpression createBetween(Expression value, Expression left, Expression right) {
+        // The bounds can only be validated up front when both are constants.
+        if (left instanceof ConstantExpression && right instanceof ConstantExpression) {
+            Object lv = ((ConstantExpression) left).getValue();
+            Object rv = ((ConstantExpression) right).getValue();
+            if (lv == null || rv == null) {
+                throw new RuntimeException("Illegal values of between, values can not be null!");
+            }
+            if (lv instanceof Comparable && rv instanceof Comparable) {
+                // Compares right against left, so a negative result means right < left.
+                int ret = __compare((Comparable) rv, (Comparable) lv, true);
+                if (ret < 0) {
+                    throw new RuntimeException(
+                        String.format("Illegal values of between, left value(%s) must less than or equal to right value(%s)", lv, rv)
+                    );
+                }
+            }
+        }
+
+        return LogicExpression.createAND(createGreaterThanEqual(value, left), createLessThanEqual(value, right));
+    }
+
+    /**
+     * Creates {@code value NOT BETWEEN left AND right} as {@code value < left OR value > right}.
+     *
+     * @param value the expression being tested
+     * @param left the lower bound
+     * @param right the upper bound
+     * @return the combined expression
+     */
+    public static BooleanExpression createNotBetween(Expression value, Expression left, Expression right) {
+        return LogicExpression.createOR(createLessThan(value, left), createGreaterThan(value, right));
+    }
+
+    /**
+     * Creates {@code left IN (elements)}.
+     *
+     * @param left the tested expression; must be a {@link PropertyExpression}
+     * @param elements the candidate values
+     * @return the IN expression
+     * @throws RuntimeException if {@code left} is not a property expression
+     */
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    public static BooleanExpression createInFilter(Expression left, List elements) {
+
+        if (!(left instanceof PropertyExpression)) {
+            throw new RuntimeException("Expected a property for In expression, got: " + left);
+        }
+        return UnaryExpression.createInExpression((PropertyExpression) left, elements, false);
+
+    }
+
+    /**
+     * Creates {@code left NOT IN (elements)}.
+     *
+     * @param left the tested expression; must be a {@link PropertyExpression}
+     * @param elements the candidate values
+     * @return the NOT IN expression
+     * @throws RuntimeException if {@code left} is not a property expression
+     */
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    public static BooleanExpression createNotInFilter(Expression left, List elements) {
+
+        if (!(left instanceof PropertyExpression)) {
+            throw new RuntimeException("Expected a property for In expression, got: " + left);
+        }
+        return UnaryExpression.createInExpression((PropertyExpression) left, elements, true);
+
+    }
+
+    /**
+     * Creates {@code left IS NULL} as an equality test against the NULL constant.
+     *
+     * @param left the tested expression
+     * @return the IS NULL expression
+     */
+    public static BooleanExpression createIsNull(Expression left) {
+        return doCreateEqual(left, ConstantExpression.NULL);
+    }
+
+    /**
+     * Creates {@code left IS NOT NULL} as the negation of the IS NULL test.
+     *
+     * @param left the tested expression
+     * @return the IS NOT NULL expression
+     */
+    public static BooleanExpression createIsNotNull(Expression left) {
+        return UnaryExpression.createNOT(doCreateEqual(left, ConstantExpression.NULL));
+    }
+
+    /**
+     * Creates {@code left <> right} as the negation of {@link #createEqual}.
+     *
+     * @param left the left operand
+     * @param right the right operand
+     * @return the inequality expression
+     */
+    public static BooleanExpression createNotEqual(Expression left, Expression right) {
+        return UnaryExpression.createNOT(createEqual(left, right));
+    }
+
+    /**
+     * Creates {@code left = right} after validating both operands.
+     *
+     * @param left the left operand
+     * @param right the right operand
+     * @return the equality expression
+     * @throws RuntimeException if an operand is the NULL constant or the operands are incompatible
+     */
+    public static BooleanExpression createEqual(Expression left, Expression right) {
+        checkEqualOperand(left);
+        checkEqualOperand(right);
+        checkEqualOperandCompatability(left, right);
+        return doCreateEqual(left, right);
+    }
+
+    /**
+     * Builds the equality expression without operand validation, so it can also be used for
+     * the IS NULL tests, whose right operand is the NULL constant.
+     */
+    @SuppressWarnings({"rawtypes"})
+    private static BooleanExpression doCreateEqual(Expression left, Expression right) {
+        return new ComparisonExpression(left, right) {
+
+            @Override
+            public Object evaluate(EvaluationContext context) throws Exception {
+                Object lv = left.evaluate(context);
+                Object rv = right.evaluate(context);
+
+                // Exactly one of the values is null: unknown (null) when the left side is
+                // missing, FALSE when only the right side is null.
+                if (lv == null ^ rv == null) {
+                    if (lv == null) {
+                        return null;
+                    }
+                    return Boolean.FALSE;
+                }
+                // Both null (lv == rv) or equal objects.
+                if (lv == rv || lv.equals(rv)) {
+                    return Boolean.TRUE;
+                }
+                // Different types may still be equal after conversion, e.g. Integer vs Long.
+                if (lv instanceof Comparable && rv instanceof Comparable) {
+                    return compare((Comparable) lv, (Comparable) rv);
+                }
+                return Boolean.FALSE;
+            }
+
+            @Override
+            protected boolean asBoolean(int answer) {
+                return answer == 0;
+            }
+
+            @Override
+            public String getExpressionSymbol() {
+                return "==";
+            }
+        };
+    }
+
+    /**
+     * Creates {@code left > right}.
+     *
+     * @param left the left operand
+     * @param right the right operand
+     * @return the comparison expression
+     * @throws RuntimeException if an operand cannot be used in an ordering comparison
+     */
+    public static BooleanExpression createGreaterThan(final Expression left, final Expression right) {
+        checkLessThanOperand(left);
+        checkLessThanOperand(right);
+        return new ComparisonExpression(left, right) {
+            @Override
+            protected boolean asBoolean(int answer) {
+                return answer > 0;
+            }
+
+            @Override
+            public String getExpressionSymbol() {
+                return ">";
+            }
+        };
+    }
+
+    /**
+     * Creates {@code left >= right}.
+     *
+     * @param left the left operand
+     * @param right the right operand
+     * @return the comparison expression
+     * @throws RuntimeException if an operand cannot be used in an ordering comparison
+     */
+    public static BooleanExpression createGreaterThanEqual(final Expression left, final Expression right) {
+        checkLessThanOperand(left);
+        checkLessThanOperand(right);
+        return new ComparisonExpression(left, right) {
+            @Override
+            protected boolean asBoolean(int answer) {
+                return answer >= 0;
+            }
+
+            @Override
+            public String getExpressionSymbol() {
+                return ">=";
+            }
+        };
+    }
+
+    /**
+     * Creates {@code left < right}.
+     *
+     * @param left the left operand
+     * @param right the right operand
+     * @return the comparison expression
+     * @throws RuntimeException if an operand cannot be used in an ordering comparison
+     */
+    public static BooleanExpression createLessThan(final Expression left, final Expression right) {
+        checkLessThanOperand(left);
+        checkLessThanOperand(right);
+        return new ComparisonExpression(left, right) {
+
+            @Override
+            protected boolean asBoolean(int answer) {
+                return answer < 0;
+            }
+
+            @Override
+            public String getExpressionSymbol() {
+                return "<";
+            }
+
+        };
+    }
+
+    /**
+     * Creates {@code left <= right}.
+     *
+     * @param left the left operand
+     * @param right the right operand
+     * @return the comparison expression
+     * @throws RuntimeException if an operand cannot be used in an ordering comparison
+     */
+    public static BooleanExpression createLessThanEqual(final Expression left, final Expression right) {
+        checkLessThanOperand(left);
+        checkLessThanOperand(right);
+        return new ComparisonExpression(left, right) {
+
+            @Override
+            protected boolean asBoolean(int answer) {
+                return answer <= 0;
+            }
+
+            @Override
+            public String getExpressionSymbol() {
+                return "<=";
+            }
+        };
+    }
+
+    /**
+     * Only numeric expressions can be used in >, >=, < or <= expressions.
+     * <p>
+     * Constants must hold a {@link Number}; any other constant, and any boolean expression,
+     * is rejected. Other expressions are accepted and checked at evaluation time.
+     * </p>
+     *
+     * @param expr the operand to validate
+     * @throws RuntimeException if the operand cannot be compared
+     */
+    public static void checkLessThanOperand(Expression expr) {
+        if (expr instanceof ConstantExpression) {
+            Object value = ((ConstantExpression) expr).getValue();
+            if (value instanceof Number) {
+                return;
+            }
+
+            // Else it's boolean or a String..
+            throw new RuntimeException("Value '" + expr + "' cannot be compared.");
+        }
+        if (expr instanceof BooleanExpression) {
+            throw new RuntimeException("Value '" + expr + "' cannot be compared.");
+        }
+    }
+
+    /**
+     * Validates that the expression can be used in an == or <> expression: it must not be a
+     * constant whose value is null (the NULL literal).
+     *
+     * @param expr the operand to validate
+     * @throws RuntimeException if the operand is a null-valued constant
+     */
+    public static void checkEqualOperand(Expression expr) {
+        if (expr instanceof ConstantExpression) {
+            Object value = ((ConstantExpression) expr).getValue();
+            if (value == null) {
+                throw new RuntimeException("'" + expr + "' cannot be compared.");
+            }
+        }
+    }
+
+    /**
+     * Rejects an equality between two constants when the left one is a boolean constant and
+     * the right one is not.
+     *
+     * @param left the left operand
+     * @param right the right operand
+     * @throws RuntimeException if the constants are incompatible
+     */
+    private static void checkEqualOperandCompatability(Expression left, Expression right) {
+        if (left instanceof ConstantExpression && right instanceof ConstantExpression) {
+            if (left instanceof BooleanExpression && !(right instanceof BooleanExpression)) {
+                throw new RuntimeException("'" + left + "' cannot be compared with '" + right + "'");
+            }
+        }
+    }
+
+    /**
+     * Evaluates both operands and compares them.
+     * <p>
+     * For the ordering operators (>, >=, <, <=) two strings are never compared
+     * lexicographically: both are parsed as doubles first, and the evaluation fails if either
+     * is not numeric.
+     * </p>
+     *
+     * @param context the evaluation context
+     * @return Boolean.TRUE or Boolean.FALSE, or null if either operand evaluates to null
+     * @throws Exception if an operand fails to evaluate, or two non-numeric strings are ordered
+     */
+    @Override
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    public Object evaluate(EvaluationContext context) throws Exception {
+        Comparable lv = (Comparable) left.evaluate(context);
+        if (lv == null) {
+            return null;
+        }
+        Comparable rv = (Comparable) right.evaluate(context);
+        if (rv == null) {
+            return null;
+        }
+        String symbol = getExpressionSymbol();
+        boolean orderingOperator = ">=".equals(symbol) || ">".equals(symbol)
+            || "<".equals(symbol) || "<=".equals(symbol);
+        if (orderingOperator && lv instanceof String && rv instanceof String) {
+            // Ordering strings is illegal, so the only way forward is a numeric comparison.
+            Comparable lvC;
+            Comparable rvC;
+            try {
+                lvC = Double.valueOf((String) lv);
+                rvC = Double.valueOf((String) rv);
+            } catch (NumberFormatException e) {
+                throw new RuntimeException("It's illegal to compare string by '>=', '>', '<', '<='. lv=" + lv + ", rv=" + rv, e);
+            }
+            return compare(lvC, rvC);
+        }
+        return compare(lv, rv);
+    }
+
+    /**
+     * Compares two values, first converting one of them when their types differ.
+     * <p>
+     * Numeric operands are widened to a common type. Strings are converted to the type of the
+     * other operand only when {@code convertStringExpressions} is true. Type pairs with no
+     * supported conversion yield -1.
+     * </p>
+     *
+     * @param lv the left value
+     * @param rv the right value
+     * @param convertStringExpressions whether a string may be converted to the other operand's type
+     * @return a negative integer, zero, or a positive integer as {@code lv} is less than, equal
+     * to, or greater than {@code rv}; -1 for unsupported type pairs
+     * @throws RuntimeException if a string operand cannot be parsed as the required number
+     */
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    protected static int __compare(Comparable lv, Comparable rv, boolean convertStringExpressions) {
+        Class<? extends Comparable> lc = lv.getClass();
+        Class<? extends Comparable> rc = rv.getClass();
+        // If the objects are not of the same type,
+        // try to convert up to allow the comparison.
+        if (lc != rc) {
+            try {
+                if (lc == Boolean.class) {
+                    if (convertStringExpressions && rc == String.class) {
+                        // The right operand is the string here; converting lv (a Boolean)
+                        // would fail with a ClassCastException.
+                        rv = Boolean.valueOf((String) rv);
+                    } else {
+                        return -1;
+                    }
+                } else if (lc == Byte.class) {
+                    if (rc == Short.class) {
+                        lv = Short.valueOf(((Number) lv).shortValue());
+                    } else if (rc == Integer.class) {
+                        lv = Integer.valueOf(((Number) lv).intValue());
+                    } else if (rc == Long.class) {
+                        lv = Long.valueOf(((Number) lv).longValue());
+                    } else if (rc == Float.class) {
+                        lv = Float.valueOf(((Number) lv).floatValue());
+                    } else if (rc == Double.class) {
+                        lv = Double.valueOf(((Number) lv).doubleValue());
+                    } else if (convertStringExpressions && rc == String.class) {
+                        rv = Byte.valueOf((String) rv);
+                    } else {
+                        return -1;
+                    }
+                } else if (lc == Short.class) {
+                    if (rc == Integer.class) {
+                        lv = Integer.valueOf(((Number) lv).intValue());
+                    } else if (rc == Long.class) {
+                        lv = Long.valueOf(((Number) lv).longValue());
+                    } else if (rc == Float.class) {
+                        lv = Float.valueOf(((Number) lv).floatValue());
+                    } else if (rc == Double.class) {
+                        lv = Double.valueOf(((Number) lv).doubleValue());
+                    } else if (convertStringExpressions && rc == String.class) {
+                        rv = Short.valueOf((String) rv);
+                    } else {
+                        return -1;
+                    }
+                } else if (lc == Integer.class) {
+                    if (rc == Long.class) {
+                        lv = Long.valueOf(((Number) lv).longValue());
+                    } else if (rc == Float.class) {
+                        lv = Float.valueOf(((Number) lv).floatValue());
+                    } else if (rc == Double.class) {
+                        lv = Double.valueOf(((Number) lv).doubleValue());
+                    } else if (convertStringExpressions && rc == String.class) {
+                        rv = Integer.valueOf((String) rv);
+                    } else {
+                        return -1;
+                    }
+                } else if (lc == Long.class) {
+                    if (rc == Integer.class) {
+                        rv = Long.valueOf(((Number) rv).longValue());
+                    } else if (rc == Float.class) {
+                        lv = Float.valueOf(((Number) lv).floatValue());
+                    } else if (rc == Double.class) {
+                        lv = Double.valueOf(((Number) lv).doubleValue());
+                    } else if (convertStringExpressions && rc == String.class) {
+                        rv = Long.valueOf((String) rv);
+                    } else {
+                        return -1;
+                    }
+                } else if (lc == Float.class) {
+                    if (rc == Integer.class) {
+                        rv = Float.valueOf(((Number) rv).floatValue());
+                    } else if (rc == Long.class) {
+                        rv = Float.valueOf(((Number) rv).floatValue());
+                    } else if (rc == Double.class) {
+                        lv = Double.valueOf(((Number) lv).doubleValue());
+                    } else if (convertStringExpressions && rc == String.class) {
+                        rv = Float.valueOf((String) rv);
+                    } else {
+                        return -1;
+                    }
+                } else if (lc == Double.class) {
+                    if (rc == Integer.class) {
+                        rv = Double.valueOf(((Number) rv).doubleValue());
+                    } else if (rc == Long.class) {
+                        rv = Double.valueOf(((Number) rv).doubleValue());
+                    } else if (rc == Float.class) {
+                        // Widen to Double: Double.compareTo rejects a Float argument with a
+                        // ClassCastException.
+                        rv = Double.valueOf(((Number) rv).doubleValue());
+                    } else if (convertStringExpressions && rc == String.class) {
+                        rv = Double.valueOf((String) rv);
+                    } else {
+                        return -1;
+                    }
+                } else if (convertStringExpressions && lc == String.class) {
+                    if (rc == Boolean.class) {
+                        lv = Boolean.valueOf((String) lv);
+                    } else if (rc == Byte.class) {
+                        lv = Byte.valueOf((String) lv);
+                    } else if (rc == Short.class) {
+                        lv = Short.valueOf((String) lv);
+                    } else if (rc == Integer.class) {
+                        lv = Integer.valueOf((String) lv);
+                    } else if (rc == Long.class) {
+                        lv = Long.valueOf((String) lv);
+                    } else if (rc == Float.class) {
+                        lv = Float.valueOf((String) lv);
+                    } else if (rc == Double.class) {
+                        lv = Double.valueOf((String) lv);
+                    } else {
+                        return -1;
+                    }
+                } else {
+                    return -1;
+                }
+            } catch (NumberFormatException e) {
+                throw new RuntimeException(e);
+            }
+        }
+        return lv.compareTo(rv);
+    }
+
+    /**
+     * Compares two values with {@link #__compare} and maps the result through
+     * {@link #asBoolean(int)}.
+     *
+     * @param lv the left value
+     * @param rv the right value
+     * @return Boolean.TRUE if the comparison satisfies this operator, otherwise Boolean.FALSE
+     */
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    protected Boolean compare(Comparable lv, Comparable rv) {
+        return asBoolean(__compare(lv, rv, convertStringExpressions)) ? Boolean.TRUE : Boolean.FALSE;
+    }
+
+    /**
+     * Decides whether a comparison result satisfies this operator.
+     *
+     * @param answer the result of {@link #__compare}: negative, zero or positive
+     * @return true if the operator holds for that result
+     */
+    protected abstract boolean asBoolean(int answer);
+
+    /**
+     * Evaluates this expression and reports whether the result is true.
+     *
+     * @param context the evaluation context
+     * @return true only if the expression evaluates to a true Boolean; false for FALSE and null
+     * @throws Exception if the evaluation fails
+     */
+    @Override
+    public boolean matches(EvaluationContext context) throws Exception {
+        // equals() instead of ==, so a true Boolean that is not the Boolean.TRUE instance also matches.
+        return Boolean.TRUE.equals(evaluate(context));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9eeb2f7e/filter/src/main/java/org/apache/rocketmq/filter/expression/ConstantExpression.java
----------------------------------------------------------------------
diff --git a/filter/src/main/java/org/apache/rocketmq/filter/expression/ConstantExpression.java b/filter/src/main/java/org/apache/rocketmq/filter/expression/ConstantExpression.java
new file mode 100644
index 0000000..ca70f51
--- /dev/null
+++ b/filter/src/main/java/org/apache/rocketmq/filter/expression/ConstantExpression.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.filter.expression;
+
+/**
+ * Represents a constant expression.
+ * <p>
+ * This class was taken from ActiveMQ org.apache.activemq.filter.ConstantExpression,
+ * but:
+ * 1. For long type constants, the range is bounded by the java Long type;
+ * 2. For float type constants, the range is bounded by the java Double type;
+ * 3. Hex and Octal expressions were removed;
+ * 4. A now expression was added to support getting the current time.
+ * </p>
+ */
+public class ConstantExpression implements Expression {
+
+    /**
+     * A constant that can also be used where a boolean expression is required;
+     * used for the NULL, TRUE and FALSE literals.
+     */
+    static class BooleanConstantExpression extends ConstantExpression implements BooleanExpression {
+        public BooleanConstantExpression(Object value) {
+            super(value);
+        }
+
+        /**
+         * @param context the evaluation context
+         * @return true only if the constant is a true Boolean; false for FALSE and null
+         * @throws Exception if the evaluation fails
+         */
+        @Override
+        public boolean matches(EvaluationContext context) throws Exception {
+            // equals() instead of ==, so a true Boolean that is not the Boolean.TRUE instance also matches.
+            return Boolean.TRUE.equals(evaluate(context));
+        }
+    }
+
+    public static final BooleanConstantExpression NULL = new BooleanConstantExpression(null);
+    public static final BooleanConstantExpression TRUE = new BooleanConstantExpression(Boolean.TRUE);
+    public static final BooleanConstantExpression FALSE = new BooleanConstantExpression(Boolean.FALSE);
+
+    /** The constant value; may be null. */
+    private final Object value;
+
+    /**
+     * @param value the constant value, may be null
+     */
+    public ConstantExpression(Object value) {
+        this.value = value;
+    }
+
+    /**
+     * Creates an integral constant from decimal text with an optional trailing 'l' or 'L'.
+     * <p>
+     * Only values within Long.MIN_VALUE ~ Long.MAX_VALUE are supported. Values that fit in an
+     * int are stored as Integer, all others as Long.
+     * </p>
+     *
+     * @param text the decimal literal
+     * @return the constant expression
+     * @throws NumberFormatException if the text is not a decimal number within the long range
+     */
+    public static ConstantExpression createFromDecimal(String text) {
+
+        // Strip off the 'l' or 'L' if needed.
+        if (text.endsWith("l") || text.endsWith("L")) {
+            text = text.substring(0, text.length() - 1);
+        }
+
+        long parsed = Long.parseLong(text);
+        Number value;
+        if (Integer.MIN_VALUE <= parsed && parsed <= Integer.MAX_VALUE) {
+            value = Integer.valueOf((int) parsed);
+        } else {
+            value = Long.valueOf(parsed);
+        }
+        return new ConstantExpression(value);
+    }
+
+    /**
+     * Creates a floating point constant, stored as Double.
+     *
+     * @param text the floating point literal
+     * @return the constant expression
+     * @throws NumberFormatException if the text is not a parsable number
+     * @throws RuntimeException if the value is outside the finite range of Double
+     */
+    public static ConstantExpression createFloat(String text) {
+        Double value = Double.valueOf(text);
+        // Only an overflow to +Infinity is greater than MAX_VALUE.
+        if (value > Double.MAX_VALUE) {
+            throw new RuntimeException(text + " is greater than " + Double.MAX_VALUE);
+        }
+        // The most negative finite double is -MAX_VALUE. Double.MIN_VALUE is the smallest
+        // POSITIVE double, so using it as the lower bound rejected 0.0 and all negative values.
+        if (value < -Double.MAX_VALUE) {
+            throw new RuntimeException(text + " is less than " + (-Double.MAX_VALUE));
+        }
+        return new ConstantExpression(value);
+    }
+
+    /**
+     * @return a new {@link NowExpression}
+     */
+    public static ConstantExpression createNow() {
+        return new NowExpression();
+    }
+
+    /**
+     * @param context the evaluation context; not used
+     * @return the constant value, may be null
+     */
+    @Override
+    public Object evaluate(EvaluationContext context) throws Exception {
+        return value;
+    }
+
+    /**
+     * @return the constant value, may be null
+     */
+    public Object getValue() {
+        return value;
+    }
+
+    /**
+     * Renders the constant the way it would be written in a selector: NULL, TRUE, FALSE,
+     * a quoted string, or the value's own string form.
+     *
+     * @see Object#toString()
+     */
+    @Override
+    public String toString() {
+        Object value = getValue();
+        if (value == null) {
+            return "NULL";
+        }
+        if (value instanceof Boolean) {
+            return ((Boolean) value).booleanValue() ? "TRUE" : "FALSE";
+        }
+        if (value instanceof String) {
+            return encodeString((String) value);
+        }
+        return value.toString();
+    }
+
+    /**
+     * Hash of the selector text form, consistent with {@link #equals(Object)}.
+     *
+     * @see Object#hashCode()
+     */
+    @Override
+    public int hashCode() {
+        return toString().hashCode();
+    }
+
+    /**
+     * Two constants are equal when they are of the same class and have the same selector
+     * text form.
+     *
+     * @see Object#equals(Object)
+     */
+    @Override
+    public boolean equals(Object o) {
+
+        if (o == null || !this.getClass().equals(o.getClass())) {
+            return false;
+        }
+        return toString().equals(o.toString());
+
+    }
+
+    /**
+     * Encodes a string so that it looks the way it would when written in a selector:
+     * wrapped in single quotes, with every embedded single quote doubled.
+     *
+     * @param s the raw string
+     * @return the quoted selector literal
+     */
+    public static String encodeString(String s) {
+        StringBuilder b = new StringBuilder(s.length() + 2);
+        b.append('\'');
+        for (int i = 0; i < s.length(); i++) {
+            char c = s.charAt(i);
+            if (c == '\'') {
+                b.append(c);
+            }
+            b.append(c);
+        }
+        b.append('\'');
+        return b.toString();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9eeb2f7e/filter/src/main/java/org/apache/rocketmq/filter/expression/EmptyEvaluationContext.java
----------------------------------------------------------------------
diff --git a/filter/src/main/java/org/apache/rocketmq/filter/expression/EmptyEvaluationContext.java b/filter/src/main/java/org/apache/rocketmq/filter/expression/EmptyEvaluationContext.java
new file mode 100644
index 0000000..52af2d0
--- /dev/null
+++ b/filter/src/main/java/org/apache/rocketmq/filter/expression/EmptyEvaluationContext.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.filter.expression;
+
+import java.util.Map;
+
+/**
+ * Empty context: an {@link EvaluationContext} that holds no values.
+ */
+public class EmptyEvaluationContext implements EvaluationContext {
+    /**
+     * Always returns {@code null}, whatever the name.
+     */
+    @Override
+    public Object get(String name) {
+        return null;
+    }
+
+    /**
+     * Always returns {@code null}, not an empty map.
+     */
+    @Override
+    public Map<String, Object> keyValues() {
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9eeb2f7e/filter/src/main/java/org/apache/rocketmq/filter/expression/EvaluationContext.java
----------------------------------------------------------------------
diff --git a/filter/src/main/java/org/apache/rocketmq/filter/expression/EvaluationContext.java b/filter/src/main/java/org/apache/rocketmq/filter/expression/EvaluationContext.java
new file mode 100644
index 0000000..094ef53
--- /dev/null
+++ b/filter/src/main/java/org/apache/rocketmq/filter/expression/EvaluationContext.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.filter.expression;
+
+import java.util.Map;
+
+/**
+ * Context in which an expression is evaluated.
+ *
+ * Compared to org.apache.activemq.filter.MessageEvaluationContext, this is just an interface.
+ */
+public interface EvaluationContext {
+
+    /**
+     * Get value by name from context
+     *
+     * @param name the name of the value to look up
+     * @return the value bound to the name; implementations may return null
+     */
+    Object get(String name);
+
+    /**
+     * Context variables.
+     *
+     * @return the context variables as a name-to-value map; implementations may return null
+     */
+    Map<String, Object> keyValues();
+}



Mime
View raw message