---
title: "Streaming Aggregation Optimization"
nav-parent_id: tableapi
nav-pos: 110
---

SQL is the de-facto standard for data analytics. For streaming analytics, SQL enables a larger pool of people to build applications on data streams in less time, because SQL is declarative and can be optimized effectively. This page introduces several useful optimizations for streaming aggregation that can greatly improve the performance of stream processing.

* This will be replaced by the TOC
{:toc}

## GroupBy Aggregation Optimization

Generally, the group aggregate function processes input records one by one: it reads the accumulator from state, accumulates the record into the accumulator, and writes the accumulator back into state. This processing pattern can incur significant state overhead. In addition, data skew is very common in production and has a large impact on the performance of stream processing. Several effective measures have therefore been proposed to optimize group aggregation.

### MiniBatch Aggregation

The main idea of MiniBatch aggregation is to cache a bundle of inputs in a buffer inside the aggregation operator. When the bundle is processed, only one state access (one read and one write of the accumulator) is needed for all inputs with the same key, which can reduce the state overhead significantly. The following figure provides a visual representation of MiniBatch aggregation.

MiniBatch optimization is disabled by default. To enable this optimization, the following configuration should be properly set.
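A minimal sketch of that configuration, assuming a Flink version whose SQL Client accepts `SET 'key' = 'value';` statements. The option keys are Flink table configuration options; the latency and size values are illustrative placeholders to be tuned per job. In a Java, Scala, or Python program the same keys can be set on the table environment's configuration instead.

{% highlight sql %}
-- enable MiniBatch aggregation (disabled by default)
SET 'table.exec.mini-batch.enabled' = 'true';
-- buffer input records for at most 5 seconds before processing a bundle
SET 'table.exec.mini-batch.allow-latency' = '5 s';
-- maximum number of records buffered by each aggregate operator task
SET 'table.exec.mini-batch.size' = '5000';
{% endhighlight %}

A bundle is processed when the allowed latency elapses or when the buffer reaches the maximum size, so both limits should be set when MiniBatch is enabled. Buffering trades a little extra latency for fewer state accesses.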

### Local-Global Aggregation

Local-Global is proposed to solve the data skew problem by dividing a group aggregation into two stages: a local aggregation upstream first, followed by a global aggregation downstream. This is similar to the Combine + Reduce pattern in MapReduce. For example, consider the following SQL:

{% highlight sql %}
SELECT color, sum(id)
FROM T
GROUP BY color
{% endhighlight %}

The records in the stream may be skewed, so some instances of the aggregation operator have to process many more records than others, which creates a hotspot. With local aggregation, a large number of inputs under the same key is accumulated into a few accumulators upstream, which relieves the burden on the global aggregation downstream. See this figure for a better understanding of the Local-Global pattern.

Local-Global Aggregation is enabled by default as long as MiniBatch optimization is turned on. The related configuration is shown below.
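A minimal sketch of that configuration, assuming a Flink version whose SQL Client accepts `SET 'key' = 'value';` statements. Local-Global only applies when MiniBatch is on, so the MiniBatch options appear here too; the latency and size values are illustrative. `table.optimizer.agg-phase-strategy` defaults to `AUTO`, which lets the planner decide; `TWO_PHASE` requests local-global aggregation and `ONE_PHASE` disables it.

{% highlight sql %}
-- Local-Global aggregation builds on MiniBatch, so MiniBatch must be enabled
SET 'table.exec.mini-batch.enabled' = 'true';
SET 'table.exec.mini-batch.allow-latency' = '5 s';
SET 'table.exec.mini-batch.size' = '5000';
-- request two-phase (local-global) aggregation instead of the AUTO default
SET 'table.optimizer.agg-phase-strategy' = 'TWO_PHASE';
{% endhighlight %}

Two-phase aggregation needs every aggregate function in the query to support merging partial accumulators. Built-in functions such as SUM, COUNT, MAX, and MIN do; a user-defined aggregate function would need to implement `merge`.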

### Distinct-agg split

Local-Global optimization is effective at eliminating data skew for normal aggregations such as SUM, COUNT, MAX, and MIN, but its performance is not satisfactory for distinct aggregation, because the local aggregation cannot reduce the number of distinct values much and the global aggregation of a hot key remains a bottleneck. Distinct-agg split is proposed to solve this problem by automatically splitting a distinct group aggregation into two layers of aggregation (Agg1 and Agg2; the splitting results of built-in aggregate functions are shown in the following table). The records under a hot key are broken up by adding a bucket number of the distinct key to the group key of the inner aggregation. The bucket number is calculated as `hash_code(distinct_key) % BUCKET_NUM`. See the example below for better understanding.

{% highlight sql %}
-- original SQL
SELECT color, COUNT(DISTINCT id)
FROM T
GROUP BY color

-- the Distinct-agg split result is equivalent to the following SQL
SELECT color, SUM(cnt)
FROM (
  SELECT color, COUNT(DISTINCT id) AS cnt
  FROM T
  GROUP BY color, MOD(HASH_CODE(id), 1024)
)
GROUP BY color
{% endhighlight %}

The execution graph of the above SQL with Local-Global or Distinct-agg split enabled is shown below. Data under a hot key is evenly redistributed and accumulated among the operator instances of Agg1, so the hotspot is effectively eliminated.

Distinct-Agg Split optimization is disabled by default. It is recommended only when distinct aggregation suffers from data skew, because it may incur extra overhead, such as an additional network shuffle. The related configuration is shown below.
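A minimal sketch of that configuration, assuming a Flink version whose SQL Client accepts `SET 'key' = 'value';` statements. `table.optimizer.distinct-agg.split.bucket-num` corresponds to `BUCKET_NUM` in the bucket formula above and defaults to 1024, the same value used in the example SQL.

{% highlight sql %}
-- split distinct aggregations into two levels (disabled by default)
SET 'table.optimizer.distinct-agg.split.enabled' = 'true';
-- number of buckets the distinct keys are scattered into (BUCKET_NUM)
SET 'table.optimizer.distinct-agg.split.bucket-num' = '1024';
{% endhighlight %}

A larger bucket number spreads a hot key across more Agg1 instances, at the cost of more partial results for Agg2 to combine.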

### Incremental Aggregation

When both Local-Global and Distinct-agg split are enabled, a distinct aggregation is optimized into four aggregations: Local-Agg1, Global-Agg1, Local-Agg2, and Global-Agg2 (Agg1 and Agg2 are the results of splitting a distinct aggregation). As a result, additional resource and state overhead is introduced. Incremental optimization is proposed to solve this problem by merging Global-Agg1 and Local-Agg2 into an equivalent Incremental-Agg.

Consider the following SQL:

{% highlight sql %}
SELECT color, count(distinct id), count(id)
FROM T
GROUP BY color
{% endhighlight %}

The execution graphs with Incremental optimization enabled and disabled are shown below:

Incremental Aggregation is enabled by default when both Local-Global Aggregation and Distinct-Agg Split optimization are enabled. The following configuration turns this optimization on or off.
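A minimal sketch, assuming a Flink version whose SQL Client accepts `SET 'key' = 'value';` and `EXPLAIN PLAN FOR` statements, and assuming the option key `table.optimizer.incremental-agg-enabled` (worth checking against the configuration reference of your Flink version). The prerequisites are enabled first, Incremental aggregation is then switched off, and `EXPLAIN` prints the optimized plan of the example query over table `T` so the generated aggregation stages can be inspected. Operator names in the printed plan differ between versions.

{% highlight sql %}
-- prerequisites: MiniBatch (for Local-Global) and Distinct-agg split
SET 'table.exec.mini-batch.enabled' = 'true';
SET 'table.exec.mini-batch.allow-latency' = '5 s';
SET 'table.exec.mini-batch.size' = '5000';
SET 'table.optimizer.distinct-agg.split.enabled' = 'true';
-- turn Incremental aggregation off ('true', the default, turns it back on)
SET 'table.optimizer.incremental-agg-enabled' = 'false';

-- print the optimized plan to see which aggregation stages were generated
EXPLAIN PLAN FOR
SELECT color, COUNT(DISTINCT id), COUNT(id)
FROM T
GROUP BY color;
{% endhighlight %}

Running the `EXPLAIN` statement once with the option set to `'false'` and once with `'true'` makes it easy to compare the four-stage plan with the plan that contains the merged Incremental-Agg.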

{% top %}