The result set of a downsampling query inherits from GroupByEngineDataSet. Among its fields:
- startTime and endTime describe the entire query; the time range is left-closed and right-open, i.e. [startTime, endTime).
- interval and slidingStep give the length of a segment and the distance between the start times of consecutive segments.
- curStartTime, curEndTime, usedIndex and hasCachedTimeInterval describe the current segment; its time range is also left-closed and right-open, i.e. [curStartTime, curEndTime).
The core method of GroupByEngineDataSet, hasNextWithoutConstraint(), is straightforward. If a time interval is already cached, a next segment exists and the method returns true. Otherwise it calculates the start time of the next segment and increases usedIndex by 1. If that start time has reached or passed the query end time, it returns false; otherwise it calculates the segment end time (capped at the query end time), sets hasCachedTimeInterval to true, and returns true:
    protected boolean hasNextWithoutConstraint() {
        // A segment has already been computed but not yet consumed
        if (hasCachedTimeInterval) {
            return true;
        }

        // Start time of the next segment
        curStartTime = usedIndex * slidingStep + startTime;
        usedIndex++;
        if (curStartTime < endTime) {
            hasCachedTimeInterval = true;
            // The last segment is cut off at the query end time
            curEndTime = Math.min(curStartTime + interval, endTime);
            return true;
        } else {
            return false;
        }
    }
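To make the segmentation rule concrete, here is a minimal standalone sketch that lists every segment produced by the same arithmetic. The class name SegmentSketch and the method segments are hypothetical and not part of the code base; the sketch also assumes slidingStep is positive, since the loop would not terminate otherwise.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, for illustration only: enumerates the
// [curStartTime, curEndTime) segments of a downsampling query.
final class SegmentSketch {

    /** Returns each segment as a two-element array {curStartTime, curEndTime}. */
    static List<long[]> segments(long startTime, long endTime, long interval, long slidingStep) {
        List<long[]> out = new ArrayList<>();
        for (int usedIndex = 0; ; usedIndex++) {
            long curStartTime = usedIndex * slidingStep + startTime;
            if (curStartTime >= endTime) {
                break; // the segment start has reached the end of the query
            }
            // The last segment is cut off at the query end time
            long curEndTime = Math.min(curStartTime + interval, endTime);
            out.add(new long[] {curStartTime, curEndTime});
        }
        return out;
    }

    public static void main(String[] args) {
        // Query range [0, 25), interval 10, sliding step 10
        for (long[] s : segments(0, 25, 10, 10)) {
            System.out.println("[" + s[0] + ", " + s[1] + ")");
        }
    }
}
```

With a query range of [0, 25) and both interval and slidingStep set to 10, the segments are [0, 10), [10, 20) and [20, 25). When slidingStep is smaller than interval, consecutive segments overlap.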
The downsampling query logic without a value filter is mainly in the GroupByWithoutValueFilterDataSet class, which inherits from GroupByEngineDataSet.
This class has the following key fields:
- private Map<Path, GroupByExecutor> pathExecutors: groups the aggregate functions of the same Path and encapsulates them in a GroupByExecutor. A GroupByExecutor holds the data calculation logic and methods for one Path and is described later.
- private TimeRange timeRange: encapsulates the time interval of each calculation in an object, which is used to determine whether Statistics can take part in the calculation directly.
- private Filter timeFilter: represents the user-defined query interval as a Filter object, which is used to filter the available files, chunks, and pages.
First, the initGroupBy() initialization method calculates the timeFilter from the expression and creates a GroupByExecutor for each path. The following code converts the result list into a RowRecord. Note that when an entry in the list has no result, null is added to the RowRecord:
    for (AggregateResult res : fields) {
        // No result for this aggregation: keep the column but leave it empty
        if (res == null) {
            record.addField(null);
            continue;
        }
        record.addField(res.getResult(), res.getResultDataType());
    }
GroupByExecutor encapsulates the calculation of all aggregate functions on the same path. This class has the following key fields:
- reader: the SeriesAggregateReader used to read the data of the current Path.
- preCachedData: each read from the reader returns a batch, which is likely to extend beyond the current time period. This BatchData is cached for the next use.
- results: stores all aggregate functions on the current Path. For example, in select count(a), sum(a), avg(b), count and sum can be stored together. The Integer on the right of each pair is used to restore the order of the user query before the result set is converted to a RowRecord.

The main methods are:

    // Read data from the reader and calculate; this is the main method of the class
    private List<Pair<AggregateResult, Integer>> calcResult() throws IOException, QueryProcessException;

    // Add an aggregation operation for the current path
    private void addAggregateResult(AggregateResult aggrResult, int index);

    // Determine whether the current path has completed all aggregation calculations
    private boolean isEndCalc();

    // Calculate results from the cached BatchData that the previous calculation did not use up
    private boolean calcFromCacheData() throws IOException;

    // Calculate using BatchData
    private void calcFromBatch(BatchData batchData) throws IOException;

    // Calculate results directly from the Statistics of a page or chunk
    private void calcFromStatistics(Statistics statistics) throws QueryProcessException;

    // Clear all calculation results
    private void resetAggregateResults();

    // Iterate through the data in the pages and calculate
    private boolean readAndCalcFromPage() throws IOException, QueryProcessException;
In GroupByExecutor, different aggregate functions on the same path use the same data, so the entry method calcResult is responsible for reading all the data of the Path. Each batch of retrieved data is then passed to calcFromBatch, which applies every aggregate function to that BatchData.
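The idea of reading a path once and feeding every aggregate function from the same batch can be sketched as follows. This is a simplified illustration, not the actual class: Agg, Count, Sum, PathExecutorSketch and fillRow are hypothetical stand-ins, and plain double arrays replace BatchData.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified stand-in for a per-path executor.
final class PathExecutorSketch {

    /** Hypothetical stand-in for an aggregate result: consumes values, then reports a result. */
    interface Agg {
        void update(double value);
        double result();
    }

    static final class Count implements Agg {
        private long n;
        public void update(double value) { n++; }
        public double result() { return n; }
    }

    static final class Sum implements Agg {
        private double total;
        public void update(double value) { total += value; }
        public double result() { return total; }
    }

    private final List<Agg> aggs = new ArrayList<>();
    // Position of each aggregate in the user's select list
    private final List<Integer> queryPositions = new ArrayList<>();

    void addAggregateResult(Agg agg, int index) {
        aggs.add(agg);
        queryPositions.add(index);
    }

    /** Feeds each value to every aggregate, so the series data is traversed only once. */
    void calcFromBatch(double[] batch) {
        for (double v : batch) {
            for (Agg agg : aggs) {
                agg.update(v);
            }
        }
    }

    /** Writes each result into its position in the user's query order. */
    void fillRow(Double[] row) {
        for (int i = 0; i < aggs.size(); i++) {
            row[queryPositions.get(i)] = aggs.get(i).result();
        }
    }

    public static void main(String[] args) {
        // select count(a), avg(b), sum(a): path "a" owns positions 0 and 2
        PathExecutorSketch a = new PathExecutorSketch();
        a.addAggregateResult(new Count(), 0);
        a.addAggregateResult(new Sum(), 2);
        a.calcFromBatch(new double[] {1.0, 2.0, 3.0});

        Double[] row = new Double[3]; // position 1 belongs to path "b" and stays null here
        a.fillRow(row);
        System.out.println(Arrays.toString(row));
    }
}
```

Storing the select-list position next to each aggregate is what lets results from different paths be interleaved back into the order the user wrote them.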
The calcResult method returns all AggregateResult objects under the current Path, each paired with the position of its aggregated value in the user query order. Its main logic is:
    // Calculate the data left over from the last call, and finish if the results are already complete
    if (calcFromCacheData()) {
        return results;
    }

    // A chunk contains multiple pages, so the pages of the current chunk must be used up before the next chunk is opened
    if (readAndCalcFromPage()) {
        return results;
    }

    // Once the remaining data has been calculated, open new chunks to continue the calculation
    while (reader.hasNextChunk()) {
        Statistics chunkStatistics = reader.currentChunkStatistics();
        // Determine whether the Statistics can be used and, if so, perform the calculation
        ....
            // Skip the current chunk
            reader.skipCurrentChunk();
            // End the calculation if all results have been obtained
            if (isEndCalc()) {
                return results;
            }
            continue;
        }
        // If chunkStatistics cannot be used, calculate from page data
        if (readAndCalcFromPage()) {
            return results;
        }
    }
The readAndCalcFromPage method obtains page data from the currently opened chunk and calculates the aggregate results. It returns true when all calculations are complete, and false otherwise. Its main logic is:
    while (reader.hasNextPage()) {
        Statistics pageStatistics = reader.currentPageStatistics();
        // pageStatistics can only be used if the page does not intersect with other pages
        if (pageStatistics != null) {
            // Determine whether the Statistics can be used and, if so, perform the calculation
            ....
                // Skip the current page
                reader.skipCurrentPage();
                // End the calculation if all results have been obtained
                if (isEndCalc()) {
                    return true;
                }
                continue;
            }
        }
        // When the Statistics cannot be used, all data must be fetched for the calculation
        BatchData batchData = reader.nextPage();
        if (batchData == null || !batchData.hasCurrent()) {
            continue;
        }
        // If the page just opened starts beyond the time range, cache the data and end the calculation
        if (batchData.currentTime() >= curEndTime) {
            preCachedData = batchData;
            return true;
        }
        // Perform the calculation
        calcFromBatch(batchData);
        ...
    }
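The check that decides whether Statistics can be used is elided in the listing above. One plausible form, shown here purely as an assumption, is a containment test: precomputed statistics are safe to use only when every point of the page lies inside the current segment. PageSummary, canUseStatistics and countInSegment are hypothetical names, and a count is the only statistic modeled.

```java
// Hypothetical sketch: uses a precomputed page count when the page lies
// entirely inside the segment, and falls back to scanning points otherwise.
final class StatisticsShortcutSketch {

    /** Hypothetical page summary: closed time range plus the raw timestamps. */
    static final class PageSummary {
        final long startTime;    // timestamp of the first point
        final long endTime;      // timestamp of the last point
        final long count;        // precomputed number of points
        final long[] timestamps; // raw points, needed only when the summary cannot be used

        PageSummary(long[] timestamps) {
            this.timestamps = timestamps;
            this.startTime = timestamps[0];
            this.endTime = timestamps[timestamps.length - 1];
            this.count = timestamps.length;
        }
    }

    /** True when every point of the page falls inside [curStartTime, curEndTime). */
    static boolean canUseStatistics(PageSummary page, long curStartTime, long curEndTime) {
        return page.startTime >= curStartTime && page.endTime < curEndTime;
    }

    static long countInSegment(PageSummary[] pages, long curStartTime, long curEndTime) {
        long total = 0;
        for (PageSummary page : pages) {
            if (canUseStatistics(page, curStartTime, curEndTime)) {
                total += page.count; // no need to read the individual points
            } else {
                for (long t : page.timestamps) {
                    if (t >= curStartTime && t < curEndTime) {
                        total++;
                    }
                }
            }
        }
        return total;
    }

    public static void main(String[] args) {
        PageSummary[] pages = {
            new PageSummary(new long[] {0, 1, 2, 3, 4}),
            new PageSummary(new long[] {8, 9, 10, 11, 12})
        };
        System.out.println(countInSegment(pages, 0, 10));
    }
}
```

For the segment [0, 10), the first page is fully contained and contributes its stored count of 5, while the second page straddles the boundary and is scanned, contributing 2.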
The calcFromBatch method applies every aggregate function to the retrieved BatchData. Its main logic is:
    for (Pair<AggregateResult, Integer> result : results) {
        // If a function already has its final result (for example, the minimum), skip it
        if (result.left.isCalculatedAggregationResult()) {
            continue;
        }
        // Perform the calculation
        ....
    }
    // If the current BatchData still holds data for the next segment, cache it
    if (batchData.getMaxTimestamp() >= curEndTime) {
        preCachedData = batchData;
    }
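The effect of caching a batch that reaches past the current segment can be illustrated with a small standalone sketch. It is an assumption-laden simplification: batches are plain sorted arrays of timestamps, only a count is computed, segments are requested in ascending order, and the names BatchCarryOverSketch, countSegment and cachedPos are hypothetical.

```java
// Hypothetical sketch: counts points per segment while carrying over the
// unread tail of a batch to the next segment.
final class BatchCarryOverSketch {
    private final long[][] batches; // each batch holds ascending timestamps
    private int nextBatch = 0;
    private long[] preCachedData;   // batch not yet fully consumed, or null
    private int cachedPos;          // read position inside preCachedData

    BatchCarryOverSketch(long[][] batches) {
        this.batches = batches;
    }

    /** Counts points in [curStartTime, curEndTime); call with ascending segments. */
    long countSegment(long curStartTime, long curEndTime) {
        long count = 0;
        while (true) {
            if (preCachedData == null) {
                if (nextBatch == batches.length) {
                    return count; // no more data
                }
                preCachedData = batches[nextBatch++];
                cachedPos = 0;
            }
            while (cachedPos < preCachedData.length && preCachedData[cachedPos] < curEndTime) {
                if (preCachedData[cachedPos] >= curStartTime) {
                    count++;
                }
                cachedPos++;
            }
            if (cachedPos < preCachedData.length) {
                return count; // the rest belongs to a later segment: keep it cached
            }
            preCachedData = null; // batch fully consumed, fetch the next one
        }
    }

    public static void main(String[] args) {
        BatchCarryOverSketch reader =
            new BatchCarryOverSketch(new long[][] {{1, 5, 9, 12}, {15, 21}});
        System.out.println(reader.countSegment(0, 10));
        System.out.println(reader.countSegment(10, 20));
        System.out.println(reader.countSegment(20, 30));
    }
}
```

Here the first batch contains the point 12, which lies beyond the first segment [0, 10). Because the batch stays cached, that point is counted in the second segment without being read again.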
The downsampling query logic with value filtering conditions is mainly in the GroupByWithValueFilterDataSet class, which inherits from GroupByEngineDataSet.
This class has the following key fields:
First, the initGroupBy() initialization method creates a timestampGenerator based on the expression, then creates a SeriesReaderByTimestamp for each time series and places it in the allDataReaderList list. After initialization is complete, the nextWithoutConstraint() method is called to update the result. If a timestamp is cached for the next group by partition and it falls within the current time range, it is added to timestampArray; otherwise the aggregateResultList result is returned directly. If no timestamp is cached for the next group by partition, timestampGenerator is used to traverse:
    while (timestampGenerator.hasNext()) {
        // Call constructTimeArrayForOneCal() to get a list of timestamps
        timeArrayLength = constructTimeArrayForOneCal(timestampArray, timeArrayLength);

        // Call updateResultUsingTimestamps() to calculate the aggregate results from the timestamp list
        for (int i = 0; i < paths.size(); i++) {
            aggregateResultList.get(i).updateResultUsingTimestamps(
                timestampArray, timeArrayLength, allDataReaderList.get(i));
        }

        timeArrayLength = 0;
        // Stop once the timestamp has reached the end of the current segment
        if (timestamp >= curEndTime) {
            hasCachedTimestamp = true;
            break;
        }
    }
The constructTimeArrayForOneCal() method traverses timestampGenerator to build a list of timestamps:
    for (int cnt = 1; cnt < timeStampFetchSize && timestampGenerator.hasNext(); cnt++) {
        timestamp = timestampGenerator.next();
        if (timestamp < curEndTime) {
            timestampArray[timeArrayLength++] = timestamp;
        } else {
            // The timestamp belongs to a later segment: keep it cached and stop
            hasCachedTimestamp = true;
            break;
        }
    }
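The batching and carry-over behavior described above can be tried out in isolation with the following sketch. It is a simplified model under stated assumptions: a plain Iterator<Long> stands in for timestampGenerator, fetchSize is at least 1, and the names TimestampBatchSketch and nextBatch are hypothetical.

```java
import java.util.Arrays;
import java.util.Iterator;

// Hypothetical sketch: pulls timestamps in bounded batches and carries the
// first timestamp of the next segment over to the following call.
final class TimestampBatchSketch {
    private final Iterator<Long> timestampGenerator; // stand-in for the real generator
    private final int fetchSize;                     // maximum timestamps per batch, >= 1
    private long cachedTimestamp;
    private boolean hasCachedTimestamp;

    TimestampBatchSketch(Iterator<Long> timestampGenerator, int fetchSize) {
        this.timestampGenerator = timestampGenerator;
        this.fetchSize = fetchSize;
    }

    /** Returns up to fetchSize timestamps below curEndTime; empty when the segment is done. */
    long[] nextBatch(long curEndTime) {
        long[] buffer = new long[fetchSize];
        int length = 0;
        if (hasCachedTimestamp) {
            if (cachedTimestamp >= curEndTime) {
                return new long[0]; // the cached timestamp belongs to a later segment
            }
            buffer[length++] = cachedTimestamp;
            hasCachedTimestamp = false;
        }
        while (length < fetchSize && timestampGenerator.hasNext()) {
            long t = timestampGenerator.next();
            if (t >= curEndTime) {
                cachedTimestamp = t; // keep it for the next segment
                hasCachedTimestamp = true;
                break;
            }
            buffer[length++] = t;
        }
        return Arrays.copyOf(buffer, length);
    }

    public static void main(String[] args) {
        Iterator<Long> gen = Arrays.asList(1L, 3L, 7L, 12L, 15L).iterator();
        TimestampBatchSketch batches = new TimestampBatchSketch(gen, 2);
        System.out.println(Arrays.toString(batches.nextBatch(10)));
        System.out.println(Arrays.toString(batches.nextBatch(10)));
        System.out.println(Arrays.toString(batches.nextBatch(20)));
    }
}
```

Bounding each batch keeps memory use fixed regardless of how many timestamps satisfy the filter, and caching the one timestamp that crosses the segment boundary ensures it is not lost when the next segment starts.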
After a downsampling query, we can also count the total number of points under each node at a given level of the current metadata tree.
The logic is in the GroupByTimeDataSet class.
First, group the paths by level to get the final paths, along with a mapping from each original path index to its final path.
For example, from root.sg1.d1.s0, root.sg1.d2.s1 and level=1 we get the final path root.sg1.
Then, get the downsampling query result as a RowRecord.
Finally, merge each RowRecord into a new RowRecord whose fields have the form <final path, count>.
For example, from <root.sg1.d1.s0, 3>, <root.sg1.d2.s1, 4> and level=1 we get the new RowRecord <root.sg1, 7>.
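The merge step can be sketched in a few lines. This is an illustration only: GroupByLevelSketch, pathAtLevel and mergeCounts are hypothetical names, and a plain map of path to count stands in for a RowRecord.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch: merges per-series counts into per-level counts.
final class GroupByLevelSketch {

    /** Keeps path nodes 0..level, e.g. ("root.sg1.d1.s0", 1) becomes "root.sg1". */
    static String pathAtLevel(String fullPath, int level) {
        String[] nodes = fullPath.split("\\.");
        StringBuilder sb = new StringBuilder(nodes[0]); // root is level 0
        for (int i = 1; i <= level && i < nodes.length; i++) {
            sb.append('.').append(nodes[i]);
        }
        return sb.toString();
    }

    /** Sums the counts of all series that share the same final path. */
    static Map<String, Long> mergeCounts(Map<String, Long> countsByPath, int level) {
        Map<String, Long> merged = new LinkedHashMap<>();
        for (Map.Entry<String, Long> e : countsByPath.entrySet()) {
            merged.merge(pathAtLevel(e.getKey(), level), e.getValue(), Long::sum);
        }
        return merged;
    }

    public static void main(String[] args) {
        Map<String, Long> counts = new LinkedHashMap<>();
        counts.put("root.sg1.d1.s0", 3L);
        counts.put("root.sg1.d2.s1", 4L);
        System.out.println(mergeCounts(counts, 1));
    }
}
```

Summing is valid here because count is additive across series, which is consistent with count being the only aggregation supported for this feature.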
Attention:
- Only the count aggregation is supported.
- The level of root is 0.