聚合查询的主要逻辑在 AggregateExecutor
对于不带值过滤条件的聚合查询,通过 executeWithoutValueFilter()
方法获得结果并构建 dataSet。首先使用 mergeSameSeries()
方法将对于相同时间序列的聚合查询合并,例如:如果需要计算count(s1), sum(s2), count(s3), sum(s1),即需要计算s1的两个聚合值,那么将会得到pathToAggrIndexesMap结果为:s1 -> 0, 3; s2 -> 1; s3 -> 2。
那么将会得到 pathToAggrIndexesMap
,其中每一个 entry 都是一个 series 的聚合查询,因此可以通过调用 groupAggregationsBySeries()
方法计算出其聚合值 aggregateResults
。在最后创建结果集之前,需要将其顺序还原为用户查询的顺序。最后使用 constructDataSet()
方法创建结果集并返回。
下面详细讲解 groupAggregationsBySeries()
方法。首先创建一个 IAggregateReader
:
IAggregateReader seriesReader = new SeriesAggregateReader( pathToAggrIndexes.getKey(), tsDataType, context, QueryResourceManager.getInstance() .getQueryDataSource(seriesPath, context, timeFilter), timeFilter, null);
对于每一个 entry(即series),首先为其每一种聚合查询创建一个聚合结果 AggregateResult
,同时维护一个布尔值列表 isCalculatedList
,对应每一个 AggregateResult
是否已经计算完成,并记录需要剩余计算的聚合函数数目 remainingToCalculate
。布尔值列表和这个计数值将会使得某些聚合函数(如 FIRST_VALUE
)在获得结果后,不需要再继续进行整个循环过程。
接下来,按照5.2节所介绍的 aggregateReader
使用方法,更新 AggregateResult
:
while (aggregateReader.hasNextChunk()) { if (aggregateReader.canUseCurrentChunkStatistics()) { Statistics chunkStatistics = aggregateReader.currentChunkStatistics(); // do some aggregate calculation using chunk statistics ... aggregateReader.skipCurrentChunk(); continue; } while (aggregateReader.hasNextPage()) { if (aggregateReader.canUseCurrentPageStatistics()) { Statistics pageStatistic = aggregateReader.currentPageStatistics(); // do some aggregate calculation using page statistics ... aggregateReader.skipCurrentPage(); continue; } else { BatchData batchData = aggregateReader.nextPage(); // do some aggregate calculation using batch data ... } } }
需要注意的是,在对于每一个result进行更新之前,需要首先判断其是否已经被计算完(利用 isCalculatedList
列表);每一次更新后,调用 isCalculatedAggregationResult()
方法同时更新列表中的布尔值。如果列表中所有值均为true,即 remainingToCalculate
值为0,证明所有聚合函数结果均已计算完,可以返回。
if (Boolean.FALSE.equals(isCalculatedList.get(i))) { AggregateResult aggregateResult = aggregateResultList.get(i); ... // 更新 if (aggregateResult.isCalculatedAggregationResult()) { isCalculatedList.set(i, true); remainingToCalculate--; if (remainingToCalculate == 0) { return aggregateResultList; } } }
在使用 overlapedPageData
进行更新时,由于获得每一个聚合函数结果都会遍历这个 batchData,因此需要调用 resetBatchData()
方法将指针指向其开始位置,使得下一个函数可以遍历。
对于带值过滤条件的聚合查询,通过 executeWithoutValueFilter()
方法获得结果并构建 dataSet。首先根据表达式创建 timestampGenerator
,然后为每一个时间序列创建一个 SeriesReaderByTimestamp
,放到 readersOfSelectedSeries
列表中;为每一个查询创建一个聚合结果 AggregateResult
,放到 aggregateResults
列表中。
初始化完成后,调用 aggregateWithValueFilter()
方法更新结果:
while (timestampGenerator.hasNext()) { // 生成timestamps long[] timeArray = new long[aggregateFetchSize]; int timeArrayLength = 0; for (int cnt = 0; cnt < aggregateFetchSize; cnt++) { if (!timestampGenerator.hasNext()) { break; } timeArray[timeArrayLength++] = timestampGenerator.next(); } // 利用timestamps计算聚合结果 for (int i = 0; i < readersOfSelectedSeries.size(); i++) { aggregateResults.get(i).updateResultUsingTimestamps(timeArray, timeArrayLength, readersOfSelectedSeries.get(i)); } }
对于count聚合查询,我们也可以使用level关键字来进一步汇总点数。
这个逻辑在 AggregationExecutor
类里。
首先,把所有涉及到的时序按level来进行汇集,最后的路径。
例如把root.sg1.d1.s0,root.sg1.d2.s1按level=1汇集成root.sg1。
然后调用上述的聚合逻辑求出所有时序的总点数信息,这个会返回RowRecord数据结构。
最后,把聚合查询返回的RowRecord按上述的final paths,进行累加,组合成新的RowRecord。
例如,把《root.sg1.d1.s0,3》,《root.sg1.d2.s1,4》聚合成《root.sg1,7》
注意:
- 这里只支持count操作
- root的层级level=0