| == Abstract Syntax Tree == |
| LogicalProject(category=[$0], shopId=[$1], cnt_num=[$2], rank_num=[$3]) |
| +- LogicalFilter(condition=[<=($3, 2)]) |
| +- LogicalProject(category=[$0], shopId=[$1], cnt_num=[$2], rank_num=[ROW_NUMBER() OVER (PARTITION BY $0 ORDER BY $2 DESC NULLS LAST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)]) |
| +- LogicalTableScan(table=[[builtin, default, MyView]]) |
| |
| LogicalProject(category=[$0], shopId=[$1], max_num=[$2], rank_num=[$3]) |
| +- LogicalFilter(condition=[<=($3, 2)]) |
| +- LogicalProject(category=[$0], shopId=[$1], max_num=[$3], rank_num=[ROW_NUMBER() OVER (PARTITION BY $0 ORDER BY $3 DESC NULLS LAST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)]) |
| +- LogicalTableScan(table=[[builtin, default, MyView]]) |
| |
| == Optimized Logical Plan == |
| StreamExecGroupAggregate(groupBy=[category, shopId], select=[category, shopId, COUNT(num) AS cnt_num, MAX(num) AS max_num], retract=[false], accMode=[Acc], reuse_id=[1]) |
| +- StreamExecExchange(distribution=[hash[category, shopId]], retract=[true], accMode=[Acc]) |
| +- StreamExecDataStreamScan(table=[[builtin, default, _DataStreamTable_0]], retract=[true], accMode=[Acc]) |
| |
| StreamExecSink(fields=[category, shopId, cnt_num, rank_num], retract=[false], accMode=[AccRetract]) |
| +- StreamExecCalc(select=[category, shopId, cnt_num, w0$o0], retract=[true], accMode=[AccRetract]) |
| +- StreamExecRank(rankFunction=[ROW_NUMBER], partitionBy=[category], orderBy=[cnt_num DESC], rankRange=[rankStart=1, rankEnd=2], strategy=[UpdateFastRank[0,1]], select=[*, rowNum], retract=[true], accMode=[AccRetract]) |
| +- StreamExecExchange(distribution=[hash[category]], retract=[false], accMode=[Acc]) |
| +- Reused(reference_id=[1]) |
| |
| StreamExecSink(fields=[category, shopId, max_num, rank_num], retract=[false], accMode=[AccRetract]) |
| +- StreamExecCalc(select=[category, shopId, max_num, w0$o0], retract=[true], accMode=[AccRetract]) |
| +- StreamExecRank(rankFunction=[ROW_NUMBER], partitionBy=[category], orderBy=[max_num DESC], rankRange=[rankStart=1, rankEnd=2], strategy=[UpdateFastRank[0,1]], select=[*, rowNum], retract=[true], accMode=[AccRetract]) |
| +- StreamExecExchange(distribution=[hash[category]], retract=[false], accMode=[Acc]) |
| +- Reused(reference_id=[1]) |
| |
| == Physical Execution Plan == |
| : Data Source |
| content : collect elements with CollectionInputFormat |
| |
| : Operator |
| content : SourceConversion(table:[builtin, default, _DataStreamTable_0], fields:(category, shopId, num)) |
| ship_strategy : FORWARD |
| |
| : Operator |
| content : GroupAggregate(groupBy: (category, shopId), select: (category, shopId, COUNT(num) AS cnt_num, MAX(num) AS max_num)) |
| ship_strategy : HASH |
| |
| : Operator |
| content : UpdateFastRank[0,1](orderBy: (cnt_num DESC), partitionBy: (category), *, rowNum, rankStart=1, rankEnd=2) |
| ship_strategy : HASH |
| |
| : Operator |
| content : Calc(select: (category, shopId, cnt_num, w0$o0)) |
| ship_strategy : FORWARD |
| |
| : Operator |
| content : SinkConversion to Tuple2 |
| ship_strategy : FORWARD |
| |
| : Operator |
| content : Map |
| ship_strategy : FORWARD |
| |
| : Operator |
| content : UpdateFastRank[0,1](orderBy: (max_num DESC), partitionBy: (category), *, rowNum, rankStart=1, rankEnd=2) |
| ship_strategy : HASH |
| |
| : Operator |
| content : Calc(select: (category, shopId, max_num, w0$o0)) |
| ship_strategy : FORWARD |
| |
| : Operator |
| content : SinkConversion to Tuple2 |
| ship_strategy : FORWARD |
| |
| : Operator |
| content : Map |
| ship_strategy : FORWARD |
| |
| : Data Sink |
| content : Sink: TestingRetractTableSink |
| ship_strategy : FORWARD |
| |
| : Data Sink |
| content : Sink: TestingRetractTableSink |
| ship_strategy : FORWARD |
| |