title: "Filter Messages By SQL92 In RocketMQ"
categories:
  • RocketMQ
tags:
  • RocketMQ
  • Filter

So far, RocketMQ supports message filtering only by TAG, and a message can carry only one tag. This is too limited to meet complex business requirements.

So we want to define and implement a reasonable filter language, based on a subset of the SQL92 expression syntax, to support customized message filtering.

Why Subset Of SQL92

The purpose of this issue is to give RocketMQ the ability to filter messages by expression. SQL92 is widely used and most people are familiar with it, so it is reasonable to choose it as RocketMQ's grammar.

As far as I know, ActiveMQ has already implemented this functionality based on JavaCC; it is simple and extensible. So I extracted it and integrated it into RocketMQ, keeping only part of the grammar:

  1. Numeric comparison, like >, >=, <, <=, BETWEEN, =;
  2. Character comparison, like =, <>, IN;
  3. IS NULL or IS NOT NULL;
  4. Logical AND, logical OR, logical NOT;

Constant types are:

  1. Numeric, like 123, 3.1415;
  2. Character, like 'abc', must be enclosed in single quotes;
  3. NULL, special constant;
  4. Boolean, TRUE or FALSE;
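
To make the grammar concrete, here are a few expressions that use only the operators and constants listed above, followed by a hand-written Java equivalent of one of them. This is a sketch for illustration: the property names (a, price, region, vip, coupon) are hypothetical, and treating a missing or non-numeric property as "not matched" is an assumption of this example, not a statement about the actual evaluator.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;

final class FilterExpressionExamples {
    // Expressions built only from the operators and constants listed above.
    // All property names are hypothetical.
    static final List<String> EXAMPLES = Arrays.asList(
        "a > 5 AND a <= 10",
        "price BETWEEN 10 AND 99.5",
        "region IN ('hangzhou', 'shanghai')",
        "region <> 'beijing' OR vip = TRUE",
        "coupon IS NOT NULL AND NOT (price < 1)");

    // Hand-written equivalent of: a > 5 AND region IN ('hangzhou', 'shanghai')
    // Assumption: a missing or non-numeric property makes the comparison fail.
    static boolean matches(Map<String, String> props) {
        String a = props.get("a");
        String region = props.get("region");
        if (a == null || region == null) {
            return false;
        }
        double value;
        try {
            value = Double.parseDouble(a);
        } catch (NumberFormatException e) {
            return false;
        }
        return value > 5 && (region.equals("hangzhou") || region.equals("shanghai"));
    }
}
```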

Design

  • Structure

screenshot

  1. The broker collects each consumer's expression through the heartbeat request and saves it in ConsumerFilterManager.
  2. When a consumer pulls messages, the broker constructs a MessageFilter (an interface) from the compiled expression and the subscription data, and uses it to select matching messages from the CommitLog.
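
The two steps above can be sketched roughly as follows. Everything here is a simplified stand-in rather than the broker's real code: the single-method MessageFilter, the class ConsumerFilterManagerSketch, the toy "compiler" that only understands `<property> = '<value>'`, and the rule that a consumer without a registered expression matches everything are all assumptions made for the example.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Predicate;

// Simplified stand-in for the broker-side filter interface.
interface MessageFilter {
    boolean isMatched(Map<String, String> properties);
}

// Simplified stand-in for ConsumerFilterManager: one compiled expression per (topic, group).
final class ConsumerFilterManagerSketch {
    private final Map<String, Predicate<Map<String, String>>> compiled = new ConcurrentHashMap<>();

    // Step 1: called when a heartbeat carries a consumer's expression.
    // The toy compiler only accepts "<property> = '<value>'"; it is a placeholder
    // for the real SQL92 compiler.
    void register(String topic, String group, String expression) {
        String[] parts = expression.split("=", 2);
        if (parts.length != 2) {
            throw new IllegalArgumentException("unsupported expression: " + expression);
        }
        String name = parts[0].trim();
        String value = parts[1].trim().replaceAll("^'|'$", "");
        compiled.put(topic + "@" + group, props -> value.equals(props.get(name)));
    }

    // Step 2: called on pull; builds a filter bound to the consumer's compiled expression.
    MessageFilter filterFor(String topic, String group) {
        Predicate<Map<String, String>> p = compiled.get(topic + "@" + group);
        // Assumption: without a registered expression, every message matches.
        return p == null ? props -> true : p::test;
    }
}
```

Compiling once at heartbeat time means an illegal expression is rejected before any pull happens, and each pull only pays for evaluation.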

The main logic is simple.

  • New Module, rocketmq-filter

The implementation of the SQL92 language is placed in this module, which depends on the common module.

The broker compiles and evaluates expressions through the FilterSpi interface. FilterFactory manages all FilterSpi implementations and also allows new ones to be registered.
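
A plug-in registry of this kind might look like the sketch below. The names FilterSpiSketch, FilterFactorySketch, and CompiledExpression, the method signatures, and the choice to reject duplicate registrations are assumptions for illustration; the real interfaces in rocketmq-filter may differ.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// A compiled expression that can be evaluated against message properties.
interface CompiledExpression {
    boolean evaluate(Map<String, String> properties);
}

// Simplified shape of a filter plug-in: one implementation per expression language.
interface FilterSpiSketch {
    // Name of the expression language, e.g. "SQL92".
    String ofType();

    // Compiles the expression; expected to throw IllegalArgumentException if it is illegal.
    CompiledExpression compile(String expression);
}

// Registry that maps a language name to its plug-in.
final class FilterFactorySketch {
    private final Map<String, FilterSpiSketch> filters = new ConcurrentHashMap<>();

    // Registers a new plug-in; a second plug-in for the same type is rejected.
    void register(FilterSpiSketch spi) {
        if (filters.putIfAbsent(spi.ofType(), spi) != null) {
            throw new IllegalArgumentException("filter type already registered: " + spi.ofType());
        }
    }

    // Returns the plug-in for the given type, or null if none is registered.
    FilterSpiSketch get(String type) {
        return filters.get(type);
    }
}
```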

  • How to manage consumer's expression data

Unlike tag filtering, an SQL92 expression must be compiled first to check that it is legal; the compiled expression is then used for evaluation. This procedure is designed to take place at the broker.

ConsumerManager manages the subscriptions of push consumers, while ConsumerFilterManager manages the expression info of push consumers that want to filter messages with a special language. This info includes the data version, the expression, the compiled expression, the alive time, and so on.

  • How to filter message by expression

I redesigned the getMessage interface of MessageStore by replacing its last parameter, SubscriptionData, with MessageFilter, which is also refactored. The purpose is to keep the rocketmq-store module independent of the protocol.

When getting messages, the implementation ExpressionMessageFilter checks whether a message matches, either by the BitsArray (described later) or by evaluating the expression, following the same mechanism as tag filtering.

  • Optimization, pre-calculate the filtering result when building the consume queue

Filtering at pull time performs poorly, because the following work is repeated for every consumer that subscribes to the same topic and pulls messages:

  1. Copying the message from off-heap to heap.
  2. Decoding the message properties.

BloomFilter and pre-calculation are adopted to optimize the situation:

screenshot

  1. Every consumer is assigned some bit positions of the BloomFilter when it registers with the broker.
  2. When the broker builds the consume queue after a message is written to the CommitLog, each consumer's filtering result is calculated, and all results are assembled into a BitsArray that is saved in ConsumeQueueExt.
  3. ConsumeQueueExt is a store file linked to ConsumeQueue. ConsumeQueue finds the data through the tagsCode field, which is replaced by the address generated by ConsumeQueueExt (for compatibility, the address range is Long.MIN_VALUE to Integer.MIN_VALUE).
  4. ExpressionMessageFilter can use the BitsArray to check whether a message matches. Because BloomFilter has collisions, it still needs to decode the properties and evaluate the expression for messages that the BitsArray reports as matched (this could be reduced by checking for collisions, which is not included in this edition).
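
Steps 1, 2, and 4 above can be sketched with java.util.BitSet. This is a minimal illustration, not the broker's code: the class name BloomBitsSketch, the double-hashing scheme used to pick positions, and the textbook sizing formulas (k = ceil(log2(1/p)) positions per consumer, m = ceil(n * log2(1/p) / ln 2) bits in total) are assumptions chosen for the example.

```java
import java.util.BitSet;

final class BloomBitsSketch {
    final int k; // bit positions assigned to each consumer
    final int m; // total number of bits stored per message

    // Textbook Bloom filter sizing for n expected consumers and false-positive rate p.
    BloomBitsSketch(int n, double p) {
        double log2InvP = Math.log(1.0 / p) / Math.log(2.0);
        this.k = (int) Math.ceil(log2InvP);
        this.m = (int) Math.ceil(n * log2InvP / Math.log(2.0));
    }

    // Step 1: derive k bit positions for a consumer from a hash of its identity.
    int[] positionsFor(String consumerKey) {
        int[] positions = new int[k];
        int h1 = consumerKey.hashCode();
        int h2 = (h1 >>> 16) | 1; // odd second hash for double hashing
        for (int i = 0; i < k; i++) {
            positions[i] = Math.floorMod(h1 + i * h2, m);
        }
        return positions;
    }

    // Step 2: when the queue entry is built, set the bits of a consumer whose expression matched.
    void markMatched(BitSet bits, int[] positions) {
        for (int pos : positions) {
            bits.set(pos);
        }
    }

    // Step 4: at pull time all of the consumer's bits must be set.
    // A positive answer may still be a collision, so the expression is evaluated again afterwards.
    boolean mayMatch(BitSet bits, int[] positions) {
        for (int pos : positions) {
            if (!bits.get(pos)) {
                return false;
            }
        }
        return true;
    }
}
```

Under these textbook formulas, 32 expected consumers with a 20% error rate give 3 positions per consumer and 108 bits per message. A negative answer from mayMatch is definitive, which is what lets the broker skip decoding the properties for most non-matching messages.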

This optimization is suitable for:

  1. High subscription ratio.
  2. Large properties.

This optimization is off by default. To switch it on, set the following configs when the broker starts:

  1. enableCalcFilterBitMap = true: calculate the bitmap when building the consume queue.
  2. expectConsumerNumUseFilter = XX (Integer, default is 32): the estimated number of consumers subscribing to the same topic.
  3. maxErrorRateOfBloomFilter = XX (1~100, default is 20): the error rate of the bloom filter, as a percentage.
  4. enableConsumeQueueExt = true: construct the consume queue extension file.
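
As a sketch, the four switches above might appear in the broker's properties file like this. The file name broker.conf is an assumption, and the numeric values are simply the defaults listed above.

```properties
# broker.conf (fragment; file name assumed)
enableCalcFilterBitMap=true
expectConsumerNumUseFilter=32
maxErrorRateOfBloomFilter=20
enableConsumeQueueExt=true
```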

Interface

In this edition, only push consumers can filter messages by SQL92 expression. The interface is:

public void subscribe(final String topic, final MessageSelector messageSelector)
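
A call to this method might look like the sketch below. To keep the snippet compilable without the RocketMQ client jar, MessageSelectorSketch and PushConsumerSketch are stand-ins written for this example; they only mirror the shape of the client's MessageSelector (which offers a bySql factory) and of the subscribe signature above. The topic name and the property a are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Stand-in for the client's MessageSelector: carries an expression and its language type.
final class MessageSelectorSketch {
    final String type;
    final String expression;

    private MessageSelectorSketch(String type, String expression) {
        this.type = type;
        this.expression = expression;
    }

    // Mirrors the idea of selecting by an SQL92 expression.
    static MessageSelectorSketch bySql(String sql) {
        return new MessageSelectorSketch("SQL92", sql);
    }
}

// Stand-in for a push consumer exposing the subscribe signature shown above.
final class PushConsumerSketch {
    final Map<String, MessageSelectorSketch> subscriptions = new HashMap<>();

    public void subscribe(final String topic, final MessageSelectorSketch messageSelector) {
        subscriptions.put(topic, messageSelector);
    }
}

final class SubscribeUsage {
    static PushConsumerSketch example() {
        PushConsumerSketch consumer = new PushConsumerSketch();
        // Ask for messages whose property "a" lies between 0 and 3 (inclusive).
        consumer.subscribe("TopicTest", MessageSelectorSketch.bySql("a BETWEEN 0 AND 3"));
        return consumer;
    }
}
```

For the expression to have anything to match, the producer has to attach the referenced properties to each message before sending it.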

Performance Comparison

Configuration of the broker machine: 32 cores, 128 GB memory, 1000 Mb/s full-duplex dual network.

The producer sends messages with a 1 KB body and 1 KB of properties.

Five consumers consume messages through the push model; each consumer gets 1/5 of the total messages.

CPU usage and GC frequency are about 30% lower when the filtering result is pre-calculated.