Downsampling query

  • org.apache.iotdb.db.query.dataset.groupby.GroupByEngineDataSet

The result set of a downsampling query inherits from GroupByEngineDataSet, which contains the following fields:

  • protected long queryId
  • private long interval
  • private long slidingStep

The following two fields describe the entire query; the time period is left-closed and right-open, i.e. [startTime, endTime):

  • private long startTime
  • private long endTime

The following fields describe the current segment; the time period is left-closed and right-open, i.e. [curStartTime, curEndTime):

  • protected long curStartTime
  • protected long curEndTime
  • private int usedIndex
  • protected boolean hasCachedTimeInterval

The core method of GroupByEngineDataSet is straightforward. If a time interval is already cached, a next segment exists, so return true directly. Otherwise, compute the next segment's start time from usedIndex and increment usedIndex. If the segment start time has reached the query end time, return false; otherwise compute the segment end time, set hasCachedTimeInterval to true, and return true:

protected boolean hasNextWithoutConstraint() {
  if (hasCachedTimeInterval) {
    return true;
  }

  curStartTime = usedIndex * slidingStep + startTime;
  usedIndex++;
  if (curStartTime < endTime) {
    hasCachedTimeInterval = true;
    curEndTime = Math.min(curStartTime + interval, endTime);
    return true;
  } else {
    return false;
  }
}
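
As a concrete illustration, here is a minimal, self-contained sketch (not IoTDB code) that replays this loop for a query over [0, 10) with interval=3 and slidingStep=4; it prints the segments [0, 3), [4, 7) and [8, 10):

public class SegmentSketch {
  public static void main(String[] args) {
    long startTime = 0, endTime = 10;   // whole query: [0, 10)
    long interval = 3, slidingStep = 4; // group by ([0, 10), 3ms, 4ms)

    for (int usedIndex = 0; ; usedIndex++) {
      long curStartTime = usedIndex * slidingStep + startTime;
      if (curStartTime >= endTime) {
        break; // no next segment: hasNextWithoutConstraint() would return false
      }
      long curEndTime = Math.min(curStartTime + interval, endTime);
      System.out.println("[" + curStartTime + ", " + curEndTime + ")");
    }
  }
}

Note how the last segment [8, 10) is truncated by the query end time, which is exactly what the Math.min call achieves.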

Downsampling query without value filter

The logic of a downsampling query without a value filter lives mainly in the GroupByWithoutValueFilterDataSet class, which inherits GroupByEngineDataSet.

This class has the following key fields:

  • private Map<Path, GroupByExecutor> pathExecutors groups the aggregate functions that refer to the same Path and wraps each group in a GroupByExecutor. GroupByExecutor encapsulates the data-reading and calculation logic for one Path and is described later (see the sketch after this list).

  • private TimeRange timeRange wraps the time interval of the current calculation in an object; it is used to determine whether Statistics can participate in the calculation directly.

  • private Filter timeFilter expresses the user-defined query interval as a Filter object; it is used to filter out unavailable files, chunks, and pages.
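
As a rough sketch of the classification done by pathExecutors (the stub types below are hypothetical, not the real IoTDB classes): for select count(a), sum(a), avg(b), the two aggregations on a share one executor, so the series a is read only once:

import java.util.*;

public class PathExecutorGroupingSketch {
  // Stub standing in for the real GroupByExecutor
  static class GroupByExecutor {
    final String path;
    final List<String> aggregations = new ArrayList<>();
    GroupByExecutor(String path) { this.path = path; }
    void addAggregateResult(String aggregation, int indexInQuery) {
      aggregations.add(aggregation + "#" + indexInQuery);
    }
  }

  public static void main(String[] args) {
    // select count(a), sum(a), avg(b): three aggregations over two paths
    String[] paths = {"root.sg.d.a", "root.sg.d.a", "root.sg.d.b"};
    String[] aggrs = {"count", "sum", "avg"};

    Map<String, GroupByExecutor> pathExecutors = new LinkedHashMap<>();
    for (int i = 0; i < paths.length; i++) {
      // aggregations on the same path are added to the same executor
      pathExecutors
          .computeIfAbsent(paths[i], GroupByExecutor::new)
          .addAggregateResult(aggrs[i], i);
    }
    // root.sg.d.a -> [count#0, sum#1], root.sg.d.b -> [avg#2]
    pathExecutors.forEach((p, e) -> System.out.println(p + " -> " + e.aggregations));
  }
}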

First, in the initialization method initGroupBy(), the timeFilter is computed from the expression, and a GroupByExecutor is created for each path. The following loop converts the result list into a RowRecord; note that when an entry has no result, null is added to the RowRecord:

for (AggregateResult res : fields) {
  if (res == null) {
    record.addField(null);
    continue;
  }
  record.addField(res.getResult(), res.getResultDataType());
}

GroupByExecutor

GroupByExecutor encapsulates the calculation of all aggregate functions under the same path. It has the following key fields:

  • private IAggregateReader reader the SeriesAggregateReader used to read the current Path's data
  • private BatchData preCachedData data is read from the reader in batches, and a batch may extend beyond the current time period; the surplus is cached here for the next calculation
  • private List<Pair<AggregateResult, Integer>> results holds all aggregations on the current Path; for example, in select count(a), sum(a), avg(b), count(a) and sum(a) are stored together. The Integer on the right records each aggregation's position in the user's query, so the result set can be restored to query order before it is converted to a RowRecord (as shown in the sketch below).
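
The following self-contained sketch (with stub types standing in for IoTDB's AggregateResult and Pair) shows how the Integer on the right restores the user's query order when the per-path results are assembled:

import java.util.*;

public class ResultOrderSketch {
  static class AggregateResult {
    final String name;
    AggregateResult(String name) { this.name = name; }
    public String toString() { return name; }
  }
  static class Pair<L, R> {
    final L left; final R right;
    Pair(L left, R right) { this.left = left; this.right = right; }
  }

  public static void main(String[] args) {
    // results of the executor for path a in: select count(a), sum(a), avg(b)
    List<Pair<AggregateResult, Integer>> results = Arrays.asList(
        new Pair<>(new AggregateResult("count(a)"), 0),
        new Pair<>(new AggregateResult("sum(a)"), 1));

    AggregateResult[] fields = new AggregateResult[3]; // 3 aggregations in the query
    for (Pair<AggregateResult, Integer> p : results) {
      fields[p.right] = p.left; // place each result at its position in the query
    }
    // fields[2] stays null until the executor for path b fills it
    System.out.println(Arrays.toString(fields)); // [count(a), sum(a), null]
  }
}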

Main methods

//The main method of this class: reads data from the reader and computes the aggregate results
private List<Pair<AggregateResult, Integer>> calcResult() throws IOException, QueryProcessException;

//Add an aggregation for the current path
private void addAggregateResult(AggregateResult aggrResult, int index);

//Determine whether the current path has completed all aggregate calculations
private boolean isEndCalc();

//Calculate results from the BatchData cached but not exhausted in the previous calculation
private boolean calcFromCacheData() throws IOException;

//Calculate results from a BatchData
private void calcFromBatch(BatchData batchData) throws IOException;

//Calculate results directly from a page's or chunk's Statistics
private void calcFromStatistics(Statistics statistics) throws QueryProcessException;

//Clear all calculation results
private void resetAggregateResults();

//Iterate over and calculate the data in the pages
private boolean readAndCalcFromPage() throws IOException, QueryProcessException;

In GroupByExecutor, because the different aggregate functions of the same path consume the same data, the entry method calcResult is responsible for reading all of the Path's data. The retrieved data is then passed to the calcFromBatch method, which runs every aggregate function over the BatchData.

The calcResult method returns all AggregateResult objects of the current Path together with each value's position in the user's query order. Its main logic is:

//Calculate the data left over from the last batch; end the calculation if the results can be obtained from it alone
if (calcFromCacheData()) {
    return results;
}

//Because a chunk contains multiple pages, the pages of the currently opened chunk must be used up before the next chunk is opened
if (readAndCalcFromPage()) {
    return results;
}

//Once the leftover data has been consumed, open new chunks to continue the calculation
while (reader.hasNextChunk()) {
    Statistics chunkStatistics = reader.currentChunkStatistics();
    if (/* Statistics is available */) {
      //Perform the calculation directly from the chunk's Statistics
      ....
      //Skip the current chunk
      reader.skipCurrentChunk();
      //End the calculation if all results have been obtained
      if (isEndCalc()) {
        return results;
      }
      continue;
    }
    //If chunkStatistics cannot be used, the page data must be used for the calculation
    if (readAndCalcFromPage()) {
      return results;
    }
}
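
The availability check elided above can be pictured as follows. This is a hedged sketch of the idea, not the exact IoTDB predicate (the real code also requires the chunk not to overlap other chunks): a chunk's or page's Statistics can replace a point-by-point scan only when its whole time span lies inside the current segment [curStartTime, curEndTime):

public class StatisticsCheckSketch {
  // statistics' end time is inclusive; the segment is left-closed, right-open
  static boolean containedInSegment(long statsStartTime, long statsEndTime,
                                    long curStartTime, long curEndTime) {
    return curStartTime <= statsStartTime && statsEndTime < curEndTime;
  }

  public static void main(String[] args) {
    // a chunk spanning [4, 6] fits into segment [4, 7): Statistics usable
    System.out.println(containedInSegment(4, 6, 4, 7)); // true
    // a chunk spanning [6, 9] crosses the segment border: pages must be read
    System.out.println(containedInSegment(6, 9, 4, 7)); // false
  }
}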

The readAndCalcFromPage method fetches page data from the currently opened chunk and computes the aggregate results. It returns true when all calculations are complete, and false otherwise. Its main logic:

while (reader.hasNextPage()) {
    Statistics pageStatistics = reader.currentPageStatistics();
    //pageStatistics is non-null only if the page does not overlap with other pages
    if (pageStatistics != null) {
        //Determine whether Statistics is available and, if so, perform the calculation
        ....
        //Skip the current page
        reader.skipCurrentPage();
        //End the calculation if all results have been obtained
        if (isEndCalc()) {
          return true;
        }
        continue;
    }
    //When Statistics is not available, all of the page's data must be fetched for the calculation
    BatchData batchData = reader.nextPage();
    if (batchData == null || !batchData.hasCurrent()) {
      continue;
    }
    //If the newly opened page already exceeds the time range, cache the retrieved data and end the calculation
    if (batchData.currentTime() >= curEndTime) {
      preCachedData = batchData;
      return true;
    }
    //Perform the calculation
    calcFromBatch(batchData);
    ...
}

The calcFromBatch method iterates over all the aggregate functions and applies each one to the retrieved BatchData. Its main logic is:

for (Pair<AggregateResult, Integer> result : results) {
    //Skip functions whose results are already final, such as a minimum calculation that has finished
    if (result.left.isCalculatedAggregationResult()) {
      continue;
    }
    //Perform the calculation
    ....
}
//If the data in the current batchData can still be used for the next segment, add it to the cache
if (batchData.getMaxTimestamp() >= curEndTime) {
    preCachedData = batchData;
}

Downsampling query with value filter

The logic of a downsampling query with a value filter lives mainly in the GroupByWithValueFilterDataSet class, which inherits GroupByEngineDataSet.

This class has the following key fields:

  • private List<IReaderByTimestamp> allDataReaderList
  • private GroupByPlan groupByTimePlan
  • private TimeGenerator timestampGenerator
  • private long timestamp caches the timestamp for the next group by partition
  • private boolean hasCachedTimestamp indicates whether a timestamp has been cached for the next group by partition
  • private int timeStampFetchSize is the batch size used when fetching timestamps for one group by calculation

First, in the initialization method initGroupBy(), a timestampGenerator is created from the expression; then a SeriesReaderByTimestamp is created for each time series and placed in the allDataReaderList. After initialization, the nextWithoutConstraint() method is called to update the result. If a timestamp has been cached for the next group by partition and it falls within the current segment, it is added to timestampArray; otherwise the aggregateResultList is returned directly. If no timestamp has been cached, timestampGenerator is used to iterate:

while (timestampGenerator.hasNext()) {
  // Call the constructTimeArrayForOneCal() method to get a list of timestamps
  timeArrayLength = constructTimeArrayForOneCal(timestampArray, timeArrayLength);

  // Call the updateResultUsingTimestamps() method to calculate the aggregate results using the timestamp list
  for (int i = 0; i < paths.size(); i++) {
    aggregateResultList.get(i).updateResultUsingTimestamps(
        timestampArray, timeArrayLength, allDataReaderList.get(i));
  }

  timeArrayLength = 0;
  // Determine whether the current segment is finished
  if (timestamp >= curEndTime) {
    hasCachedTimestamp = true;
    break;
  }
}

The constructTimeArrayForOneCal() method traverses timestampGenerator to build the list of timestamps for one calculation:

for (int cnt = 1; cnt < timeStampFetchSize && timestampGenerator.hasNext(); cnt++) {
  timestamp = timestampGenerator.next();
  if (timestamp < curEndTime) {
    timestampArray[timeArrayLength++] = timestamp;
  } else {
    hasCachedTimestamp = true;
    break;
  }
}
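
The loop starts at cnt = 1 because index 0 of timestampArray may already hold the timestamp cached from the previous segment. The following self-contained sketch (a plain Iterator stands in for IoTDB's TimeGenerator) shows the batching behaviour: timestamps inside the current segment are collected, and the first timestamp beyond curEndTime is cached for the next partition:

import java.util.*;

public class TimestampBatchSketch {
  public static void main(String[] args) {
    Iterator<Long> timestampGenerator =
        Arrays.asList(1L, 2L, 5L, 8L, 11L).iterator();
    long curEndTime = 10;       // current segment ends (exclusively) at 10
    int timeStampFetchSize = 8;

    long[] timestampArray = new long[timeStampFetchSize];
    int timeArrayLength = 0;
    long timestamp = -1;
    boolean hasCachedTimestamp = false;

    while (timeArrayLength < timeStampFetchSize && timestampGenerator.hasNext()) {
      timestamp = timestampGenerator.next();
      if (timestamp < curEndTime) {
        timestampArray[timeArrayLength++] = timestamp;
      } else {
        hasCachedTimestamp = true; // 11 >= 10: keep it for the next segment
        break;
      }
    }

    // [1, 2, 5, 8] is used for this segment; 11 is cached
    System.out.println(Arrays.toString(Arrays.copyOf(timestampArray, timeArrayLength)));
    System.out.println(hasCachedTimestamp + " " + timestamp); // true 11
  }
}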

Aggregated query with level

After a downsampling query, we can also count the total number of points under each node at the given level of the current metadata tree.

The logic is in the GroupByTimeDataSet class.

  1. First, compute the final paths grouped by the given level, and a mapping from each original path to its final path (see the sketch after this list).

    For example, with level=1, root.sg1.d1.s0 and root.sg1.d2.s1 both map to the final path root.sg1.

  2. Then, obtain the downsampling query result as RowRecords.

  3. Finally, merge each RowRecord into a new record whose fields are <final path, count> pairs.

    For example, with level=1, <root.sg1.d1.s0, 3> and <root.sg1.d2.s1, 4> merge into the new RowRecord <root.sg1, 7>.
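
A minimal sketch of steps 1 and 3 (the helper below is hypothetical, not the real GroupByTimeDataSet code): each series path is mapped to its ancestor at the given level, then the counts of one RowRecord are merged by that final path:

import java.util.*;

public class LevelMergeSketch {
  // root.sg1.d1.s0 with level=1 -> root.sg1 (root itself is level 0)
  static String finalPath(String path, int level) {
    String[] nodes = path.split("\\.");
    return String.join(".", Arrays.copyOf(nodes, level + 1));
  }

  public static void main(String[] args) {
    // one time interval's results: <root.sg1.d1.s0, 3>, <root.sg1.d2.s1, 4>
    String[] paths = {"root.sg1.d1.s0", "root.sg1.d2.s1"};
    long[] counts = {3, 4};
    int level = 1;

    Map<String, Long> merged = new LinkedHashMap<>();
    for (int i = 0; i < paths.length; i++) {
      merged.merge(finalPath(paths[i], level), counts[i], Long::sum);
    }
    System.out.println(merged); // {root.sg1=7}
  }
}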

Attention:

  1. only the count aggregation is supported
  2. the level of root is 0