最近时间戳 Last 查询

Last 查询的主要逻辑在 LastQueryExecutor

  • org.apache.iotdb.db.query.executor.LastQueryExecutor

Last查询对每个指定的时间序列执行calculateLastPairForOneSeries方法。

读取MNode缓存数据

我们在需要查询的时间序列所对应的MNode结构中添加Last数据缓存。calculateLastPairForOneSeries方法对于某个时间序列的Last查询,首先尝试读取MNode中的缓存数据。

try {
  node = IoTDB.metaManager.getDeviceNodeWithAutoCreateStorageGroup(seriesPath.toString());
} catch (MetadataException e) {
  throw new QueryProcessException(e);
}
if (((LeafMNode) node).getCachedLast() != null) {
  return ((LeafMNode) node).getCachedLast();
}

如果发现缓存没有被写入过,则执行下面的标准查询流程读取TsFile数据。

Last标准查询流程

Last标准查询流程需要以倒序方式扫描顺序文件和乱序文件,一旦得到满足要求的Last查询结果就将其写回到MNode缓存中并返回。算法中对顺序文件和乱序文件分别进行处理。

  • 顺序文件由于是按照写入时间已经排好序,因此直接使用loadTimeSeriesMetadata()方法取出最后一个不为空的TimeseriesMetadata。若TimeseriesMetadata的统计数据可用即可直接得到Last时间戳和对应的值;如不可用则需要使用loadChunkMetadataList()方法得到下一层的最后一个ChunkMetadata,通过统计数据得到Last结果。

    for (int i = seqFileResources.size() - 1; i >= 0; i--) {
        TimeseriesMetadata timeseriesMetadata = FileLoaderUtils.loadTimeSeriesMetadata(
                seqFileResources.get(i), seriesPath, context, null, sensors);
        if (timeseriesMetadata != null) {
          if (!timeseriesMetadata.isModified()) {
            Statistics timeseriesMetadataStats = timeseriesMetadata.getStatistics();
            resultPair = constructLastPair(
                    timeseriesMetadataStats.getEndTime(),
                    timeseriesMetadataStats.getLastValue(),
                    tsDataType);
            break;
          } else {
            List<ChunkMetadata> chunkMetadataList = timeseriesMetadata.loadChunkMetadataList();
            if (!chunkMetadataList.isEmpty()) {
              ChunkMetadata lastChunkMetaData = chunkMetadataList.get(chunkMetadataList.size() - 1);
              Statistics chunkStatistics = lastChunkMetaData.getStatistics();
              resultPair =
                  constructLastPair(
                      chunkStatistics.getEndTime(), chunkStatistics.getLastValue(), tsDataType);
              break;
            }
          }
        }
      }
    
  • 对于乱序文件,需要遍历所有不为空的TimeseriesMetadata结构并更新当前最大时间戳的Last数据,直到扫描完所有乱序文件为止。需要注意的是当多个ChunkMetadata拥有相同的最大时间戳时,我们取version值最大的ChunkMatadata中的数据作为Last的结果。

    long version = 0;
    for (TsFileResource resource : unseqFileResources) {
      if (resource.getEndTime(seriesPath.getDevice()) < resultPair.getTimestamp()) {
        continue;
      }
      TimeseriesMetadata timeseriesMetadata =
          FileLoaderUtils.loadTimeSeriesMetadata(resource, seriesPath, context, null, sensors);
      if (timeseriesMetadata != null) {
        for (ChunkMetadata chunkMetaData : timeseriesMetadata.loadChunkMetadataList()) {
          if (chunkMetaData.getEndTime() == resultPair.getTimestamp()
              && chunkMetaData.getVersion() > version) {
            Statistics chunkStatistics = chunkMetaData.getStatistics();
            resultPair =
                constructLastPair(
                    chunkStatistics.getEndTime(), chunkStatistics.getLastValue(), tsDataType);
            version = chunkMetaData.getVersion();
          }
        }
      }
    }
    
  • 最后将查询结果写入到MNode的Last缓存

    ((LeafMNode) node).updateCachedLast(resultPair, false, Long.MIN_VALUE);
    

Last 缓存更新策略

Last缓存更新的逻辑位于LeafMNodeupdateCachedLast方法内,这里引入两个额外的参数highPriorityUpdatelatestFlushTimehighPriorityUpdate用来表示本次更新是否是高优先级的,新数据写入而导致的缓存更新都被认为是高优先级更新,而查询时更新缓存默认为低优先级更新。latestFlushTime用来记录当前已被写回到磁盘的数据的最大时间戳。

缓存更新的策略如下:

  1. 当缓存中没有记录时,对于查询到的Last数据,将查询的结果直接写入到缓存中。
  2. 当缓存中没有记录时,对于写入的最新数据如果时间戳大于或等于latestFlushTime,则将写入的数据写入到缓存中。
  3. 当缓存中已有记录时,根据查询或写入的数据时间戳与当前缓存中时间戳作对比。写入的数据具有高优先级,时间戳不小于缓存记录则更新缓存;查询出的数据低优先级,必须大于缓存记录的时间戳才更新缓存。

具体代码如下

public synchronized void updateCachedLast(
  TimeValuePair timeValuePair, boolean highPriorityUpdate, Long latestFlushedTime) {
    if (timeValuePair == null || timeValuePair.getValue() == null) return;
    
    if (cachedLastValuePair == null) {
      // If no cached last, (1) a last query (2) an unseq insertion or (3) a seq insertion will update cache.
      if (!highPriorityUpdate || latestFlushedTime <= timeValuePair.getTimestamp()) {
        cachedLastValuePair =
            new TimeValuePair(timeValuePair.getTimestamp(), timeValuePair.getValue());
      }
    } else if (timeValuePair.getTimestamp() > cachedLastValuePair.getTimestamp()
        || (timeValuePair.getTimestamp() == cachedLastValuePair.getTimestamp()
            && highPriorityUpdate)) {
      cachedLastValuePair.setTimestamp(timeValuePair.getTimestamp());
      cachedLastValuePair.setValue(timeValuePair.getValue());
    }
}