Aggregation query

The main logic of the aggregation query is in the AggregationExecutor class

  • org.apache.iotdb.db.query.executor.AggregationExecutor

Aggregation query without value filter

For aggregate queries without a value filter, the results are obtained by the executeWithoutValueFilter() method, which also constructs the dataSet. First, the mergeSameSeries() method merges aggregations on the same time series. For example, if the query computes count(s1), sum(s2), count(s3), and sum(s1), two aggregation values must be calculated for s1, and the resulting pathToAggrIndexesMap will be: s1 -> 0, 3; s2 -> 1; s3 -> 2.
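The grouping step can be sketched with plain Java collections. Here mergeSameSeries is a simplified stand-in for the real method, using strings instead of path objects; each query column keeps its original index so the user-facing order can be restored later:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class MergeSameSeriesSketch {

  // Groups the queried series paths by series; each column remembers
  // its original index in the SELECT clause.
  static Map<String, List<Integer>> mergeSameSeries(List<String> seriesPaths) {
    Map<String, List<Integer>> pathToAggrIndexes = new LinkedHashMap<>();
    for (int i = 0; i < seriesPaths.size(); i++) {
      pathToAggrIndexes
          .computeIfAbsent(seriesPaths.get(i), k -> new ArrayList<>())
          .add(i);
    }
    return pathToAggrIndexes;
  }

  public static void main(String[] args) {
    // count(s1), sum(s2), count(s3), sum(s1)
    Map<String, List<Integer>> m =
        mergeSameSeries(Arrays.asList("s1", "s2", "s3", "s1"));
    System.out.println(m); // {s1=[0, 3], s2=[1], s3=[2]}
  }
}
```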

Each entry of pathToAggrIndexesMap is then an aggregation query on a single series, so its aggregate values aggregateResults can be calculated by calling the groupAggregationsBySeries() method. Before the result set is created, the results must be restored to the order of the user's query. Finally, the constructDataSet() method creates the result set and returns it.
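The reordering step can be sketched as follows. restoreOrder is a hypothetical helper, not the actual IoTDB code, and strings stand in for AggregateResult objects; it scatters each series' results back to the positions of the original query columns:

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RestoreOrderSketch {

  // Places each per-series result back at the index of the query column
  // it belongs to, restoring the user's original column order.
  static String[] restoreOrder(
      Map<String, List<Integer>> pathToAggrIndexes,
      Map<String, List<String>> resultsPerSeries,
      int totalColumns) {
    String[] ordered = new String[totalColumns];
    for (Map.Entry<String, List<Integer>> entry : pathToAggrIndexes.entrySet()) {
      List<Integer> indexes = entry.getValue();
      List<String> results = resultsPerSeries.get(entry.getKey());
      for (int i = 0; i < indexes.size(); i++) {
        ordered[indexes.get(i)] = results.get(i);
      }
    }
    return ordered;
  }

  public static void main(String[] args) {
    Map<String, List<Integer>> idx = new LinkedHashMap<>();
    idx.put("s1", Arrays.asList(0, 3));
    idx.put("s2", Arrays.asList(1));
    idx.put("s3", Arrays.asList(2));

    Map<String, List<String>> res = new LinkedHashMap<>();
    res.put("s1", Arrays.asList("count(s1)", "sum(s1)"));
    res.put("s2", Arrays.asList("sum(s2)"));
    res.put("s3", Arrays.asList("count(s3)"));

    System.out.println(Arrays.toString(restoreOrder(idx, res, 4)));
    // [count(s1), sum(s2), count(s3), sum(s1)]
  }
}
```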

The groupAggregationsBySeries() method is explained in detail below. First, an IAggregateReader is created:

IAggregateReader seriesReader = new SeriesAggregateReader(
        pathToAggrIndexes.getKey(), tsDataType, context, QueryResourceManager.getInstance()
        .getQueryDataSource(seriesPath, context, timeFilter), timeFilter, null);

For each entry (that is, each series), an AggregateResult is first created for each aggregation on it. A boolean list isCalculatedList records whether each AggregateResult has been fully calculated, and remainingToCalculate tracks how many results are still pending. Together they allow some aggregate functions (such as FIRST_VALUE) to stop early once their result is known, instead of continuing through the entire loop.

Next, each AggregateResult is updated following the aggregateReader usage pattern introduced in Section 5.2:

while (aggregateReader.hasNextChunk()) {
  if (aggregateReader.canUseCurrentChunkStatistics()) {
    Statistics chunkStatistics = aggregateReader.currentChunkStatistics();

    // do some aggregate calculation using chunk statistics
    ...

    aggregateReader.skipCurrentChunk();
    continue;
  }

  while (aggregateReader.hasNextPage()) {
    if (aggregateReader.canUseCurrentPageStatistics()) {
      Statistics pageStatistic = aggregateReader.currentPageStatistics();

      // do some aggregate calculation using page statistics
      ...

      aggregateReader.skipCurrentPage();
      continue;
    } else {
      BatchData batchData = aggregateReader.nextPage();
      // do some aggregate calculation using batch data
      ...
    }
  }
}

Note that before updating each result, you must first check whether it has already been calculated (using the isCalculatedList list); after each update, the isCalculatedAggregationResult() method is called to refresh the boolean values in the list. If all values in the list are true, i.e. remainingToCalculate has reached 0, all aggregate function results have been calculated and can be returned:

if (Boolean.FALSE.equals(isCalculatedList.get(i))) {
  AggregateResult aggregateResult = aggregateResultList.get(i);
  ... // update
  if (aggregateResult.isCalculatedAggregationResult()) {
    isCalculatedList.set(i, true);
    remainingToCalculate--;
    if (remainingToCalculate == 0) {
      return aggregateResultList;
    }
  }
}

When updating from overlapped page data, every aggregate function traverses the same batchData, so the resetBatchData() method must be called after each traversal to rewind the read pointer to the start, allowing the next function to traverse it.
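The rewind pattern can be illustrated with a minimal stand-in, a simplified sketch rather than IoTDB's actual BatchData class. Without the resetBatchData() call between the two functions, the second one would see an exhausted batch and return 0:

```java
public class ResetBatchDataSketch {

  // Minimal stand-in for IoTDB's BatchData: a value array with a read
  // cursor that resetBatchData() rewinds to the start.
  static class BatchData {
    private final long[] values;
    private int readCur = 0;

    BatchData(long[] values) { this.values = values; }

    boolean hasCurrent() { return readCur < values.length; }
    long currentValue() { return values[readCur]; }
    void next() { readCur++; }
    void resetBatchData() { readCur = 0; }
  }

  // Each aggregate function consumes the whole batch ...
  static long count(BatchData batch) {
    long c = 0;
    while (batch.hasCurrent()) { c++; batch.next(); }
    return c;
  }

  static long sum(BatchData batch) {
    long s = 0;
    while (batch.hasCurrent()) { s += batch.currentValue(); batch.next(); }
    return s;
  }

  public static void main(String[] args) {
    BatchData batch = new BatchData(new long[] {1, 2, 3});
    long c = count(batch);
    batch.resetBatchData(); // rewind so the next function sees all rows again
    long s = sum(batch);
    System.out.println(c + " " + s); // 3 6
  }
}
```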

Aggregation query with value filter

For an aggregate query with a value filter, the results are obtained through the executeWithValueFilter() method, which also builds the dataSet. First, a timestampGenerator is created from the expression; then a SeriesReaderByTimestamp is created for each time series and placed in the readersOfSelectedSeries list, and an AggregateResult is created for each aggregation and placed in the aggregateResults list.

After initialization is complete, call the aggregateWithValueFilter() method to update the result:

while (timestampGenerator.hasNext()) {
  // Generate timestamps
  long[] timeArray = new long[aggregateFetchSize];
  int timeArrayLength = 0;
  for (int cnt = 0; cnt < aggregateFetchSize; cnt++) {
    if (!timestampGenerator.hasNext()) {
      break;
    }
    timeArray[timeArrayLength++] = timestampGenerator.next();
  }

  // Calculate aggregate results using timestamps
  for (int i = 0; i < readersOfSelectedSeries.size(); i++) {
    aggregateResults.get(i).updateResultUsingTimestamps(timeArray, timeArrayLength,
        readersOfSelectedSeries.get(i));
  }
}
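The timestamp-batching part of the loop above can be isolated into a self-contained sketch. fillTimeArray is a hypothetical helper, and a plain Iterator stands in for the timestampGenerator; the returned length tells the readers how many slots of the reusable array are valid:

```java
import java.util.Arrays;
import java.util.Iterator;

public class TimestampBatchSketch {

  // Drains up to timeArray.length timestamps from the generator into the
  // reusable array and returns how many were actually filled.
  static int fillTimeArray(Iterator<Long> timestampGenerator, long[] timeArray) {
    int timeArrayLength = 0;
    for (int cnt = 0; cnt < timeArray.length; cnt++) {
      if (!timestampGenerator.hasNext()) {
        break;
      }
      timeArray[timeArrayLength++] = timestampGenerator.next();
    }
    return timeArrayLength;
  }

  public static void main(String[] args) {
    Iterator<Long> gen = Arrays.asList(1L, 2L, 3L, 4L, 5L).iterator();
    long[] timeArray = new long[4];
    System.out.println(fillTimeArray(gen, timeArray)); // 4 (full batch)
    System.out.println(fillTimeArray(gen, timeArray)); // 1 (last timestamp)
    System.out.println(fillTimeArray(gen, timeArray)); // 0 (generator exhausted)
  }
}
```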

Aggregation query with level

After an aggregation query, we can also count the total number of points of each node at a given level in the current metadata tree.

The logic is in the AggregationExecutor class.

  1. First, compute the final paths grouped by level, together with the mapping from each original path index to its final path.

    For example, with level=1, the paths root.sg1.d1.s0 and root.sg1.d2.s1 both map to the final path root.sg1.

  2. Then, obtain the aggregated query result as a RowRecord.

  3. Finally, merge each RowRecord into a new RowRecord whose fields are <final path, count> pairs.

    For example, with level=1, <root.sg1.d1.s0, 3> and <root.sg1.d2.s1, 4> are merged into the new RowRecord <root.sg1, 7>.
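The path truncation and count merging described in the steps above can be sketched as follows. getFinalPathByLevel is a hypothetical helper that mimics the described behavior, not the actual implementation:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class LevelPathSketch {

  // Truncates a full series path to the node at the given level
  // (root is level 0), e.g. root.sg1.d1.s0 with level=1 -> root.sg1.
  static String getFinalPathByLevel(String fullPath, int level) {
    String[] nodes = fullPath.split("\\.");
    StringBuilder sb = new StringBuilder(nodes[0]);
    for (int i = 1; i <= level && i < nodes.length; i++) {
      sb.append('.').append(nodes[i]);
    }
    return sb.toString();
  }

  public static void main(String[] args) {
    // Merge per-series counts under their level-1 final path.
    Map<String, Long> merged = new LinkedHashMap<>();
    merged.merge(getFinalPathByLevel("root.sg1.d1.s0", 1), 3L, Long::sum);
    merged.merge(getFinalPathByLevel("root.sg1.d2.s1", 1), 4L, Long::sum);
    System.out.println(merged); // {root.sg1=7}
  }
}
```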

Attention:

  1. Only the count aggregation is supported.
  2. The root's level is 0.