title: "Filter Messages By SQL92 In RocketMQ"
categories:
So far, RocketMQ only supports message filtering by TAG, but a message can own only one tag, which is too limited to meet complex business requirements.
So, we want to define and implement a reasonable filter language based on a subset of the SQL92 expression syntax to support customized message filtering.
The purpose of this issue is to give RocketMQ this message filtering ability. SQL92 is widely used and most people are familiar with it, so it is reasonable to select it as RocketMQ's filter grammar.
As far as I know, ActiveMQ already implements this functionality based on JavaCC, and it is simple and extensible. So I extracted it and integrated it into RocketMQ, supporting only part of the grammar:
- Numeric comparison: >, >=, <, <=, BETWEEN, =
- Character comparison: =, <>, IN
- IS NULL or IS NOT NULL
- Logical AND, OR, NOT

Constant types are:
- NULL, a special constant
- Boolean: TRUE or FALSE
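The following minimal Java sketch illustrates how some of these operators could evaluate against a message's user properties. It builds expression trees by hand instead of parsing text, so it is not the JavaCC-based ActiveMQ/RocketMQ evaluator, and the names Expr and Sql92Subset are hypothetical. It assumes properties are strings, parses them as numbers for numeric comparisons, and applies SQL92 three-valued logic (a missing property yields UNKNOWN, represented as null); the real evaluator may treat UNKNOWN differently in edge cases.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.DoublePredicate;

// A node evaluates to TRUE, FALSE, or null (SQL UNKNOWN) against one message's properties.
interface Expr {
    Boolean eval(Map<String, String> props);
}

// Hypothetical builders for a few operators from the list above (no parser involved).
final class Sql92Subset {
    private Sql92Subset() {}

    // Numeric operators parse the property; a missing or non-numeric value yields UNKNOWN.
    private static Boolean numeric(Map<String, String> props, String key, DoublePredicate test) {
        String raw = props.get(key);
        if (raw == null) return null;
        try {
            return test.test(Double.parseDouble(raw));
        } catch (NumberFormatException e) {
            return null;
        }
    }

    static Expr gt(String key, double c) { return p -> numeric(p, key, v -> v > c); }
    static Expr lt(String key, double c) { return p -> numeric(p, key, v -> v < c); }
    static Expr between(String key, double lo, double hi) {
        return p -> numeric(p, key, v -> v >= lo && v <= hi);
    }

    // Character operators compare strings; a missing property yields UNKNOWN.
    static Expr eq(String key, String c) {
        return p -> p.get(key) == null ? null : p.get(key).equals(c);
    }
    static Expr in(String key, String... values) {
        Set<String> set = new HashSet<>(Arrays.asList(values));
        return p -> p.get(key) == null ? null : set.contains(p.get(key));
    }

    // IS NULL / IS NOT NULL never yield UNKNOWN.
    static Expr isNull(String key) { return p -> p.get(key) == null; }
    static Expr isNotNull(String key) { return p -> p.get(key) != null; }

    // Three-valued AND: FALSE wins, otherwise any UNKNOWN makes the result UNKNOWN.
    static Expr and(Expr a, Expr b) {
        return p -> {
            Boolean x = a.eval(p), y = b.eval(p);
            if (Boolean.FALSE.equals(x) || Boolean.FALSE.equals(y)) return false;
            return (x == null || y == null) ? null : Boolean.TRUE;
        };
    }

    // Three-valued OR: TRUE wins, otherwise any UNKNOWN makes the result UNKNOWN.
    static Expr or(Expr a, Expr b) {
        return p -> {
            Boolean x = a.eval(p), y = b.eval(p);
            if (Boolean.TRUE.equals(x) || Boolean.TRUE.equals(y)) return true;
            return (x == null || y == null) ? null : Boolean.FALSE;
        };
    }

    // NOT UNKNOWN stays UNKNOWN.
    static Expr not(Expr a) {
        return p -> {
            Boolean x = a.eval(p);
            return x == null ? null : !x;
        };
    }

    // Only TRUE selects a message; FALSE and UNKNOWN both filter it out.
    static boolean matches(Expr e, Map<String, String> props) {
        return Boolean.TRUE.equals(e.eval(props));
    }
}

final class Sql92SubsetDemo {
    public static void main(String[] args) {
        // a BETWEEN 0 AND 3 AND b IN ('x', 'y')
        Expr e = Sql92Subset.and(Sql92Subset.between("a", 0, 3), Sql92Subset.in("b", "x", "y"));
        System.out.println(Sql92Subset.matches(e, Map.of("a", "2", "b", "x"))); // true
        System.out.println(Sql92Subset.matches(e, Map.of("a", "5", "b", "x"))); // false
        System.out.println(Sql92Subset.matches(e, Map.of("b", "x")));           // false: a is missing
    }
}
```

Because only TRUE selects a message, a message without property a is filtered out both by a BETWEEN 0 AND 3 and by its negation.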
On the broker, ConsumerFilterManager and MessageFilter (an interface), built with the compiled expression and subscription data, are used to select matched messages in CommitLog. The main logic is simple.
The implementation of the SQL92 language is placed in this module, which depends on the common module.
The broker compiles or evaluates expressions through the FilterSpi interface, obtained from FilterFactory, which manages all FilterSpi implementations and also supports registering new ones.
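A minimal sketch of this registry pattern, assuming simplified signatures: a FilterSpi compiles an expression (rejecting illegal text) and reports its language name through ofType, and FilterFactory maps language names to implementations. The names echo the text but these are not RocketMQ's actual interfaces; CompiledExpression and the toy KEY_EQUALS language are hypothetical.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// A compiled, reusable expression evaluated against message properties (hypothetical).
interface CompiledExpression {
    boolean evaluate(Map<String, String> properties);
}

// One filter language plugged into the broker.
interface FilterSpi {
    // Parses and validates the text; throws IllegalArgumentException if it is illegal.
    CompiledExpression compile(String expression);

    // The language name a consumer declares, e.g. "SQL92".
    String ofType();
}

// Holds every registered FilterSpi and lets new languages be added at runtime.
final class FilterFactory {
    private final Map<String, FilterSpi> spis = new ConcurrentHashMap<>();

    void register(FilterSpi spi) {
        if (spis.putIfAbsent(spi.ofType(), spi) != null) {
            throw new IllegalArgumentException("Filter type already registered: " + spi.ofType());
        }
    }

    // Returns null when no implementation is registered for the type.
    FilterSpi get(String type) {
        return spis.get(type);
    }
}

// Hypothetical toy language that only understands "key = value".
final class KeyEqualsFilterSpi implements FilterSpi {
    @Override
    public CompiledExpression compile(String expression) {
        String[] parts = expression.split("=", -1);
        if (parts.length != 2 || parts[0].trim().isEmpty()) {
            throw new IllegalArgumentException("Illegal expression: " + expression);
        }
        String key = parts[0].trim();
        String value = parts[1].trim();
        return props -> value.equals(props.get(key));
    }

    @Override
    public String ofType() {
        return "KEY_EQUALS";
    }
}

final class FilterFactoryDemo {
    public static void main(String[] args) {
        FilterFactory factory = new FilterFactory();
        factory.register(new KeyEqualsFilterSpi());
        // Compile once (illegal text is rejected here), then evaluate many times.
        CompiledExpression compiled = factory.get("KEY_EQUALS").compile("region = hz");
        System.out.println(compiled.evaluate(Map.of("region", "hz"))); // true
        System.out.println(compiled.evaluate(Map.of("region", "sh"))); // false
    }
}
```

Keeping compile separate from evaluate lets the broker reject an illegal expression once, at subscription time, instead of failing on every message.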
Unlike tag filtering, a SQL92 expression must first be compiled to check whether it is legal, and the compiled expression is then used for evaluation. This procedure is designed to take place on the broker.
ConsumerManager manages the subscriptions of push consumers, and ConsumerFilterManager manages the expression info of push consumers that wish to filter messages by a special language. The info includes the data version, expression, compiled expression, alive time, etc.
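A sketch of the kind of bookkeeping described here, under assumed names (FilterData, FilterDataRegistry) and an assumed versioning rule: a newer version replaces the stored expression, an older or equal one only refreshes liveness, and entries of departed consumers are purged after a retention period. The compiled expression is modeled as a plain Predicate; none of this is RocketMQ's actual ConsumerFilterManager code.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Predicate;

// Filter state kept per consumer group and topic (illustrative field names).
final class FilterData {
    final long version;                            // bumped by the client when its expression changes
    final String expression;                       // raw SQL92 text
    final Predicate<Map<String, String>> compiled; // compiled once at registration
    volatile long deadTime;                        // 0 while the consumer is alive

    FilterData(long version, String expression, Predicate<Map<String, String>> compiled) {
        this.version = version;
        this.expression = expression;
        this.compiled = compiled;
    }
}

final class FilterDataRegistry {
    private final Map<String, FilterData> byKey = new ConcurrentHashMap<>();

    private static String key(String group, String topic) {
        return group + "#" + topic;
    }

    // Stores the data only if it is newer, so a stale registration cannot roll the expression back.
    // Returns true when the given data became the stored entry.
    boolean register(String group, String topic, FilterData data) {
        FilterData current = byKey.compute(key(group, topic), (k, old) -> {
            if (old == null || data.version > old.version) {
                return data;
            }
            old.deadTime = 0; // an old or equal version still shows the consumer is alive
            return old;
        });
        return current == data;
    }

    FilterData get(String group, String topic) {
        return byKey.get(key(group, topic));
    }

    // Marks the consumer as gone; the entry is kept for a while in case it comes back.
    void markDead(String group, String topic, long nowMillis) {
        FilterData d = byKey.get(key(group, topic));
        if (d != null && d.deadTime == 0) {
            d.deadTime = nowMillis;
        }
    }

    // Drops entries that have been dead for longer than the retention period.
    void purge(long nowMillis, long retentionMillis) {
        byKey.values().removeIf(d -> d.deadTime != 0 && nowMillis - d.deadTime > retentionMillis);
    }
}

final class FilterDataRegistryDemo {
    public static void main(String[] args) {
        FilterDataRegistry registry = new FilterDataRegistry();
        registry.register("group", "topic", new FilterData(2, "a > 2", p -> false));
        boolean accepted = registry.register("group", "topic", new FilterData(1, "a > 1", p -> true));
        System.out.println(accepted);                                  // false: version 1 is older
        System.out.println(registry.get("group", "topic").expression); // a > 2
    }
}
```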
I redesigned the getMessage interface of MessageStore by replacing its last parameter, SubscriptionData, with MessageFilter, which has also been refactored. The purpose is to make the rocketmq-store module independent of the protocol.
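A sketch of what this decoupling buys, using simplified, hypothetical types: the store's query method only sees a MessageFilter, which offers a cheap check on consume-queue data and a more expensive check on properties read from the commit log. RocketMQ's MessageFilter is likewise split into a consume-queue check and a commit-log check, but with different parameters; ToyMessageStore, StoredMessage and TagFilter are illustrative only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Store-side filter contract: no protocol types such as SubscriptionData appear here.
interface MessageFilter {
    // Cheap check using only what the consume queue holds (here, the tag hash code).
    boolean isMatchedByConsumeQueue(long tagsCode);

    // Expensive check that needs the message properties read from the commit log.
    boolean isMatchedByCommitLog(Map<String, String> properties);
}

// One stored message, reduced to the fields this sketch needs.
final class StoredMessage {
    final long tagsCode;
    final Map<String, String> properties;

    StoredMessage(String tag, Map<String, String> properties) {
        this.tagsCode = tag.hashCode();
        this.properties = properties;
    }
}

// Hypothetical in-memory store: getMessages depends only on the MessageFilter interface.
final class ToyMessageStore {
    private final List<StoredMessage> log = new ArrayList<>();

    void put(StoredMessage m) {
        log.add(m);
    }

    List<StoredMessage> getMessages(MessageFilter filter) {
        List<StoredMessage> out = new ArrayList<>();
        for (StoredMessage m : log) {
            // Properties are only consulted when the consume-queue check passes.
            if (filter.isMatchedByConsumeQueue(m.tagsCode) && filter.isMatchedByCommitLog(m.properties)) {
                out.add(m);
            }
        }
        return out;
    }
}

// Tag filtering through the same interface; hash collisions are ignored in this sketch.
final class TagFilter implements MessageFilter {
    private final long wanted;

    TagFilter(String tag) {
        this.wanted = tag.hashCode();
    }

    @Override
    public boolean isMatchedByConsumeQueue(long tagsCode) {
        return tagsCode == wanted;
    }

    @Override
    public boolean isMatchedByCommitLog(Map<String, String> properties) {
        return true; // tag filtering needs no properties
    }
}

final class ToyMessageStoreDemo {
    public static void main(String[] args) {
        ToyMessageStore store = new ToyMessageStore();
        store.put(new StoredMessage("TagA", Map.of("a", "1")));
        store.put(new StoredMessage("TagB", Map.of("a", "2")));
        System.out.println(store.getMessages(new TagFilter("TagA")).size()); // 1
    }
}
```

Tag filtering does all its work in the consume-queue stage, while an expression filter relies on the commit-log stage, yet the store code is the same for both.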
When getting messages, the implementation ExpressionMessageFilter checks whether a message matches, either by the BitsArray (described later) or by evaluating the expression, just like the tag filtering mechanism.
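This two-stage check can be sketched together with the BloomFilter pre-calculation described next. The sketch assumes a BitsArray of M = 64 bits, K = 3 positions per consumer derived by double hashing of a "group#topic" key, and compiled expressions modeled as Predicates. The hashing scheme, sizes, and names (FilteringConsumer, ExpressionFilterSketch) are assumptions; RocketMQ's own BloomFilter may hash differently.

```java
import java.util.BitSet;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// One subscribed consumer: its Bloom positions plus its already compiled expression.
final class FilteringConsumer {
    static final int M = 64; // bits per BitsArray (illustrative)
    static final int K = 3;  // positions per consumer (illustrative)

    final int[] positions;
    final Predicate<Map<String, String>> expression;

    FilteringConsumer(String groupAndTopic, Predicate<Map<String, String>> expression) {
        this.positions = positionsOf(groupAndTopic);
        this.expression = expression;
    }

    // Double hashing: position_i = h1 + i * h2 (mod M). An assumed scheme for the sketch.
    private static int[] positionsOf(String key) {
        int h1 = key.hashCode();
        int h2 = Integer.reverse(h1) | 1; // odd, so with M a power of two the K positions differ
        int[] pos = new int[K];
        for (int i = 0; i < K; i++) {
            pos[i] = Math.floorMod(h1 + i * h2, M);
        }
        return pos;
    }
}

final class ExpressionFilterSketch {
    // Dispatch time: set the positions of every consumer whose expression matches the message.
    static BitSet precalculate(List<FilteringConsumer> consumers, Map<String, String> props) {
        BitSet bits = new BitSet(FilteringConsumer.M);
        for (FilteringConsumer c : consumers) {
            if (c.expression.test(props)) {
                for (int p : c.positions) {
                    bits.set(p);
                }
            }
        }
        return bits;
    }

    // Pull time. A null BitsArray means the optimization is off, so only evaluation is used.
    static boolean isMatched(FilteringConsumer c, BitSet bitsArray, Map<String, String> props) {
        if (bitsArray != null) {
            for (int p : c.positions) {
                if (!bitsArray.get(p)) {
                    return false; // a clear bit: definitely not matched, no decoding needed
                }
            }
        }
        // All bits set may be a collision with another consumer, so confirm by evaluation.
        return c.expression.test(props);
    }
}

final class ExpressionFilterDemo {
    public static void main(String[] args) {
        FilteringConsumer wantsOne = new FilteringConsumer("g1#TopicTest", p -> "1".equals(p.get("a")));
        FilteringConsumer wantsTwo = new FilteringConsumer("g2#TopicTest", p -> "2".equals(p.get("a")));
        Map<String, String> msg = Map.of("a", "1");
        BitSet bits = ExpressionFilterSketch.precalculate(List.of(wantsOne, wantsTwo), msg);
        System.out.println(ExpressionFilterSketch.isMatched(wantsOne, bits, msg)); // true
        System.out.println(ExpressionFilterSketch.isMatched(wantsTwo, bits, msg)); // false
    }
}
```

A clear bit proves the message does not match, so its properties never need decoding; when all bits are set, the expression is still evaluated because another consumer's positions may overlap.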
Filtering while pulling messages performs poorly, so BloomFilter and pre-calculation are adopted to optimize it:
1. Each consumer is assigned bit positions in a BloomFilter when it registers to the broker.
2. When messages are dispatched from CommitLog, each consumer's filtering result is calculated, and all results are assembled into a BitsArray saved in ConsumeQueueExt.
3. ConsumeQueueExt is a store file linked to ConsumeQueue. ConsumeQueue finds the data through the tagsCode, which is replaced by the address generated by ConsumeQueueExt (for compatibility, the address range is Long.MIN_VALUE to Integer.MIN_VALUE).
4. ExpressionMessageFilter uses the BitsArray to check whether a message matches. Because BloomFilter collisions are possible, the properties of a message that passes this check still have to be decoded and the expression evaluated (this could be reduced by checking for collisions, which is not included in this edition).

This optimization is suitable for:
This optimization is off by default; to switch it on, some configs need to be set when the broker starts:
Only push consumers can filter messages by SQL92 expressions in this edition. The interface is:
public void subscribe(final String topic, final MessageSelector messageSelector)
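With the real RocketMQ client, a subscription looks like consumer.subscribe("TopicTest", MessageSelector.bySql("a between 0 and 3")), with the producer setting user properties through Message.putUserProperty. Since that requires the RocketMQ client library, the stdlib-only sketch below models just the idea behind the interface: a selector carries a filter language plus an expression, and only SQL92 selectors need compiling on the broker. Selector and ToyPushConsumer are hypothetical stand-ins, not the client's classes.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical selector: which filter language, plus the expression text.
final class Selector {
    static final String TAG = "TAG";
    static final String SQL92 = "SQL92";

    final String type;
    final String expression;

    private Selector(String type, String expression) {
        if (expression == null || expression.trim().isEmpty()) {
            throw new IllegalArgumentException("Empty expression");
        }
        this.type = type;
        this.expression = expression;
    }

    static Selector bySql(String sql) {
        return new Selector(SQL92, sql);
    }

    static Selector byTag(String tag) {
        return new Selector(TAG, tag);
    }
}

// Hypothetical push consumer that records one selector per topic.
final class ToyPushConsumer {
    private final Map<String, Selector> subscriptions = new ConcurrentHashMap<>();

    void subscribe(String topic, Selector selector) {
        subscriptions.put(topic, selector);
    }

    Selector selectorOf(String topic) {
        return subscriptions.get(topic);
    }

    // SQL92 selectors are compiled and validated by the broker; tag selectors are matched by hash code.
    boolean needsBrokerCompilation(String topic) {
        Selector s = subscriptions.get(topic);
        return s != null && Selector.SQL92.equals(s.type);
    }
}

final class ToyPushConsumerDemo {
    public static void main(String[] args) {
        ToyPushConsumer consumer = new ToyPushConsumer();
        consumer.subscribe("TopicTest", Selector.bySql("a between 0 and 3"));
        System.out.println(consumer.needsBrokerCompilation("TopicTest")); // true
    }
}
```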
Broker machine configuration: 32 cores, 128 GB memory, dual 1000 Mb/s full-duplex network.
The producer sends messages with a 1 KB body and 1 KB of properties.
Five consumers consume messages in push mode, each receiving 1/5 of the total messages.
CPU usage and GC frequency are about 30% lower when filtering results are pre-calculated.