rocketmq-commits mailing list archives

From dongefore...@apache.org
Subject incubator-rocketmq-site git commit: [ROCKETMQ-121]Docs of filtering messages based on SQL92 closes apache/incubator-rocketmq-site#11
Date Wed, 26 Apr 2017 09:10:13 GMT
Repository: incubator-rocketmq-site
Updated Branches:
  refs/heads/master c87ce9e8b -> 42a8b8c9e


[ROCKETMQ-121]Docs of filtering messages based on SQL92 closes apache/incubator-rocketmq-site#11


Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-site/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-site/commit/42a8b8c9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-site/tree/42a8b8c9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-site/diff/42a8b8c9

Branch: refs/heads/master
Commit: 42a8b8c9e4294081d9b37737019cf152cd64a025
Parents: c87ce9e
Author: vsair <liuxuedee@gmail.com>
Authored: Wed Apr 26 17:07:36 2017 +0800
Committer: dongeforever <dongeforever@apache.org>
Committed: Wed Apr 26 17:07:36 2017 +0800

----------------------------------------------------------------------
 _data/navigation.yml                            |   2 +
 _docs/19-filter-by-sql92-example.md             | 109 +++++++++++++++++++
 ...4-26-filter-messages-by-sql92-in-rocketmq.md | 105 ++++++++++++++++++
 assets/images/blog/filter_build_cq_apache.png   | Bin 0 -> 16104 bytes
 assets/images/blog/filter_structure_apach.png   | Bin 0 -> 21027 bytes
 5 files changed, 216 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-site/blob/42a8b8c9/_data/navigation.yml
----------------------------------------------------------------------
diff --git a/_data/navigation.yml b/_data/navigation.yml
index b239dd5..7efeea7 100644
--- a/_data/navigation.yml
+++ b/_data/navigation.yml
@@ -33,6 +33,8 @@ docs:
         url: /docs/schedule-example/
       - title: "Batch Example"
         url: /docs/batch-example/
+      - title: "Filter By SQL92 Example"
+        url: /docs/filter-by-sql92-example/
 
   - title: Deployment & Operations
     children:

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-site/blob/42a8b8c9/_docs/19-filter-by-sql92-example.md
----------------------------------------------------------------------
diff --git a/_docs/19-filter-by-sql92-example.md b/_docs/19-filter-by-sql92-example.md
new file mode 100644
index 0000000..111d616
--- /dev/null
+++ b/_docs/19-filter-by-sql92-example.md
@@ -0,0 +1,109 @@
+---
+title: "Filter By SQL92 Example"
+permalink: /docs/filter-by-sql92-example/
+excerpt: "How to filter messages by SQL92 in Apache RocketMQ."
+modified: 2017-04-26T16:35:00-04:00
+---
+
+
+{% include toc %}
+
+In most cases, a tag is a simple and useful way to select the messages you want. For example:
+
+```java
+DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_EXAMPLE");
+consumer.subscribe("TOPIC", "TAGA || TAGB || TAGC");
+```
+
+The consumer will receive messages that contain TAGA, TAGB, or TAGC. The limitation is that a message can have only one tag, which may not suit more sophisticated scenarios. In such cases, you can use an SQL expression to select messages.
+
+### Principle
+
+The SQL feature evaluates an expression over the properties you put into messages when sending them. Within the grammar defined by RocketMQ, you can implement fairly flexible selection logic. Here is an example:
+
+<pre>
+------------
+| message  |
+|----------|  a > 5 AND b = 'abc'
+| a = 10   |  --------------------> Gotten
+| b = 'abc'|
+| c = true |
+------------
+------------
+| message  |
+|----------|   a > 5 AND b = 'abc'
+| a = 1    |  --------------------> Missed
+| b = 'abc'|
+| c = true |
+------------
+</pre>
+
+### Grammars
+
+RocketMQ defines only a basic grammar to support this feature. If that is not enough, you can extend it easily.
+
+1. Numeric comparison, like `>`, `>=`, `<`, `<=`, `BETWEEN`, `=`;
+2. Character comparison, like `=`, `<>`, `IN`;
+3. `IS NULL` or `IS NOT NULL`;
+4. Logical `AND`, logical `OR`, logical `NOT`;
+
+Constant types are:
+
+1. Numeric, like 123, 3.1415;
+2. Character, like 'abc', which must be enclosed in single quotes;
+3. `NULL`, special constant;
+4. Boolean, `TRUE` or `FALSE`;
+
+### Interface
+
+Only the push consumer can select messages by SQL92. The interface is:
+
+`public void subscribe(final String topic, final MessageSelector messageSelector)`
+
+### Examples
+
+You can attach properties to a message with the `putUserProperty` method when sending.
+
+```java
+DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
+try {
+    producer.start();
+} catch (MQClientException e) {
+    e.printStackTrace();
+    return;
+}
+
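+// `tag` and `i` are assumed to be defined by the surrounding code (for example, a send loop).
+Message msg = new Message("TopicTest",
+    tag,
+    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
+);
+// Attach a user property that the SQL92 expression can reference.
+msg.putUserProperty("a", String.valueOf(i));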
+
+SendResult sendResult = producer.send(msg);
+   
+producer.shutdown();
+```
+
+Use `MessageSelector.bySql` to select messages through SQL92 when consuming.
+
+```java
+DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
+
+try {
+    // Only subscribe to messages that have property a, with a >= 0 and a <= 3.
+    consumer.subscribe("TopicTest", MessageSelector.bySql("a between 0 and 3"));
+} catch (MQClientException e) {
+    e.printStackTrace();
+    return;
+}
+
+consumer.registerMessageListener(new MessageListenerConcurrently() {
+    @Override
+    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
+        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+    }
+});
+consumer.start();
+```
+ 
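
For readers of this diff, the matching rule drawn in the Principle section can be sketched in plain Java without a broker. This is only an illustration: the class `PropertyFilterSketch` and its methods `matches` and `message` are hypothetical names, the rule `a > 5 AND b = 'abc'` is hard-coded rather than parsed, and treating a missing or non-numeric property as "not matched" is an assumption of the sketch, not a statement about RocketMQ's evaluator.

```java
import java.util.HashMap;
import java.util.Map;

/** Illustrative only: hard-codes the rule "a > 5 AND b = 'abc'" instead of parsing SQL92. */
public class PropertyFilterSketch {

    // User properties are set as strings, so "a" is parsed before the numeric comparison.
    static boolean matches(Map<String, String> props) {
        String a = props.get("a");
        String b = props.get("b");
        if (a == null || b == null) {
            return false; // assumption: a missing property never satisfies the rule
        }
        try {
            return Double.parseDouble(a) > 5 && "abc".equals(b);
        } catch (NumberFormatException e) {
            return false; // assumption: a non-numeric "a" is treated as not matching
        }
    }

    // Builds the property map of one message from the diagram.
    static Map<String, String> message(String a, String b, String c) {
        Map<String, String> props = new HashMap<>();
        props.put("a", a);
        props.put("b", b);
        props.put("c", c);
        return props;
    }

    public static void main(String[] args) {
        System.out.println(matches(message("10", "abc", "true"))); // prints "true"  (Gotten)
        System.out.println(matches(message("1", "abc", "true")));  // prints "false" (Missed)
    }
}
```

With the real client, the same selection would be requested by passing the expression string to `MessageSelector.bySql`, as the consumer example in the diff does.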

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-site/blob/42a8b8c9/_posts/2017-04-26-filter-messages-by-sql92-in-rocketmq.md
----------------------------------------------------------------------
diff --git a/_posts/2017-04-26-filter-messages-by-sql92-in-rocketmq.md b/_posts/2017-04-26-filter-messages-by-sql92-in-rocketmq.md
new file mode 100644
index 0000000..1a49d22
--- /dev/null
+++ b/_posts/2017-04-26-filter-messages-by-sql92-in-rocketmq.md
@@ -0,0 +1,105 @@
+---
+title: "Filter Messages By SQL92 In RocketMQ"
+categories:
+  - RocketMQ
+tags:
+  - RocketMQ
+  - Filter
+---
+
+
+So far, RocketMQ has supported message filtering only by `TAG`, and a message can carry only one tag, which is too limited to meet complex business requirements.
+
+So we want to define and implement a reasonable filter language, based on a subset of the SQL92 expression syntax, to support customized message filtering.
+
+### Why Subset Of SQL92
+
+The purpose of this issue is to give RocketMQ a richer message filtering capability. SQL92 is widely used and most people are familiar with it, so it is reasonable to choose it as RocketMQ's filter grammar.
+
+As far as I know, ActiveMQ has already implemented this functionality based on JavaCC, and the implementation is simple and extensible. So I extracted it and integrated it into RocketMQ, keeping only part of the grammar:
+
+1. Numeric comparison, like `>`, `>=`, `<`, `<=`, `BETWEEN`, `=`;
+2. Character comparison, like `=`, `<>`, `IN`;
+3. `IS NULL` or `IS NOT NULL`;
+4. Logical `AND`, logical `OR`, logical `NOT`;
+
+Constant types are:
+
+1. Numeric, like 123, 3.1415;
+2. Character, like 'abc', which must be enclosed in single quotes;
+3. `NULL`, special constant;
+4. Boolean, `TRUE` or `FALSE`;
+
+### Design
+ - Structure
+
+![screenshot](/assets/images/blog/filter_structure_apach.png)
+
+
+1. The broker collects each consumer's expression through the heartbeat request and saves it in `ConsumerFilterManager`.
+2. When a consumer pulls messages, the broker constructs a `MessageFilter` (an interface) from the compiled expression and the subscription data to select matching messages in the `CommitLog`.
+
+The main logic is simple.
+
+ - New Module, rocketmq-filter
+
+The implementation of the SQL92 language is placed in this module, which depends on the common module.
+
+The broker compiles and evaluates expressions through the `FilterSpi` interface. `FilterFactory` manages all `FilterSpi` implementations and allows new ones to be registered.
+
+ - How to manage consumer's expression data
+
+Unlike tag filtering, an SQL92 expression must be compiled first to check whether it is legal; the compiled expression is then used for evaluation. This procedure is designed to take place at the broker.
+
+`ConsumerManager` manages the subscriptions of push consumers, and `ConsumerFilterManager` manages the expression info of push consumers that wish to filter messages with a special language. The info includes the data version, expression, compiled expression, alive time, and so on.
+
+ - How to filter message by expression
+
+I redesigned the `getMessage` interface of `MessageStore` by replacing the last parameter, `SubscriptionData`, with `MessageFilter`, which was also refactored. The purpose is to keep the `rocketmq-store` module independent of the protocol.
+
+When getting messages, the implementation `ExpressionMessageFilter` checks whether a message matches, either by the `BitsArray` (described later) or by evaluating the expression, following the same mechanism as tag filtering.
+
+ - Optimization, pre-calculate the filtering result when build consume queue
+
+Filtering at pull time performs poorly, because each of the following costs is paid once for every consumer that subscribes to the same topic and pulls the message:
+
+1. Copying the message from off-heap memory to the heap.
+2. Decoding the message properties.
+
+`BloomFilter` and pre-calculation are adopted to optimize the situation:
+
+
+![screenshot](/assets/images/blog/filter_build_cq_apache.png)
+
+1. Every consumer is assigned some bit positions of the `BloomFilter` when it registers with the broker.
+2. When the broker builds the consume queue after a message is written into the `CommitLog`, each consumer's filtering result is calculated, and all results are assembled into a `BitsArray` saved in `ConsumeQueueExt`.
+3. `ConsumeQueueExt` is a store file linked to `ConsumeQueue`. `ConsumeQueue` finds the data through the `tagsCode` field, which is replaced by the address generated by `ConsumeQueueExt` (for compatibility, the address range is Long.MIN\_VALUE to Integer.MIN\_VALUE).
+4. `ExpressionMessageFilter` uses the `BitsArray` to check whether a message matches. Because a BloomFilter can produce collisions, it still needs to decode the properties and evaluate the expression for messages that the bits report as matched (this could be reduced by checking for collisions, which is not included in this edition).
+
+This optimization is suitable for:
+
+1. High subscription ratio.
+2. Large properties.
+
+This optimization is off by default. To switch it on, set the following configs when the broker starts:
+
+1. enableCalcFilterBitMap = true, which means calculating the bitmap when building the consume queue.
+2. expectConsumerNumUseFilter = XX (Integer, default is 32), which is the estimated number of consumers subscribing to the same topic.
+3. maxErrorRateOfBloomFilter = XX (1~100, default is 20), which is the error rate of the bloom filter.
+4. enableConsumeQueueExt = true, which means constructing the consume queue extension file.
+
+### Interface
+
+Only the push consumer can filter messages by SQL92 expression in this edition. The interface is:
+
+`public void subscribe(final String topic, final MessageSelector messageSelector)`
+
+### Performance Comparison
+
+Configuration of the broker machine: 32 cores, 128G memory, 1000Mb/s full-duplex dual network
+
+The producer sends messages with a 1k body and 1k of properties.
+
+Five consumers consume messages through the push model; each consumer gets 1/5 of the total messages.
+
+CPU usage and GC frequency are about 30% lower when the filtering result is pre-calculated.
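
The pre-calculation described in the Optimization section can be pictured with a small standalone sketch. Everything in it is an assumption made for illustration: the class `FilterBitsSketch`, the 64-bit filter width, the three bits per consumer, and the `hashCode`-based position assignment are hypothetical and are not taken from the RocketMQ source. It only shows the general idea: bits are set at build time for consumers whose expression matched, and at pull time a clear bit means "definitely not matched" while all bits set means "possibly matched, evaluate the expression to be sure".

```java
import java.util.Arrays;
import java.util.BitSet;
import java.util.List;

/** Illustrative only: Bloom-filter style pre-calculation of per-consumer filter results. */
public class FilterBitsSketch {
    static final int NUM_BITS = 64;          // hypothetical filter width
    static final int BITS_PER_CONSUMER = 3;  // hypothetical number of positions per consumer

    // Registration time: derive the bit positions assigned to one consumer.
    static int[] assignPositions(String consumerKey) {
        int[] positions = new int[BITS_PER_CONSUMER];
        for (int i = 0; i < BITS_PER_CONSUMER; i++) {
            positions[i] = Math.floorMod((consumerKey + "#" + i).hashCode(), NUM_BITS);
        }
        return positions;
    }

    // Build time: set the bits of every consumer whose expression matched the message.
    static BitSet precalculate(List<String> matchedConsumers) {
        BitSet bits = new BitSet(NUM_BITS);
        for (String consumer : matchedConsumers) {
            for (int p : assignPositions(consumer)) {
                bits.set(p);
            }
        }
        return bits;
    }

    // Pull time: false means definitely not matched; true means possibly matched,
    // so the caller still evaluates the expression to rule out a collision.
    static boolean mayMatch(BitSet bits, String consumerKey) {
        for (int p : assignPositions(consumerKey)) {
            if (!bits.get(p)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        BitSet bits = precalculate(Arrays.asList("groupA@TopicTest"));
        System.out.println(mayMatch(bits, "groupA@TopicTest"));                 // prints "true"
        System.out.println(mayMatch(new BitSet(NUM_BITS), "groupB@TopicTest")); // prints "false"
    }
}
```

A consumer that matched at build time always passes the bit check, so the only cost of a collision is an unnecessary property decode, which is consistent with step 4 of the list in the post.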

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-site/blob/42a8b8c9/assets/images/blog/filter_build_cq_apache.png
----------------------------------------------------------------------
diff --git a/assets/images/blog/filter_build_cq_apache.png b/assets/images/blog/filter_build_cq_apache.png
new file mode 100644
index 0000000..1a0a29a
Binary files /dev/null and b/assets/images/blog/filter_build_cq_apache.png differ

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-site/blob/42a8b8c9/assets/images/blog/filter_structure_apach.png
----------------------------------------------------------------------
diff --git a/assets/images/blog/filter_structure_apach.png b/assets/images/blog/filter_structure_apach.png
new file mode 100644
index 0000000..aae5172
Binary files /dev/null and b/assets/images/blog/filter_structure_apach.png differ

