Last 查询的主要逻辑在 LastQueryExecutor
Last查询对每个指定的时间序列执行calculateLastPairForOneSeries方法。
我们在需要查询的时间序列所对应的MNode结构中添加Last数据缓存。calculateLastPairForOneSeries方法对于某个时间序列的Last查询,首先尝试读取MNode中的缓存数据。
try {
node = IoTDB.metaManager.getDeviceNodeWithAutoCreateStorageGroup(seriesPath.toString());
} catch (MetadataException e) {
throw new QueryProcessException(e);
}
if (((LeafMNode) node).getCachedLast() != null) {
return ((LeafMNode) node).getCachedLast();
}
如果发现缓存没有被写入过,则执行下面的标准查询流程读取TsFile数据。
Last标准查询流程需要以倒序方式扫描顺序文件和乱序文件,一旦得到满足要求的Last查询结果就将其写回到MNode缓存中并返回。算法中对顺序文件和乱序文件分别进行处理。
顺序文件由于是按照写入时间已经排好序,因此直接使用loadTimeSeriesMetadata()方法取出最后一个不为空的TimeseriesMetadata。若TimeseriesMetadata的统计数据可用即可直接得到Last时间戳和对应的值;如不可用则需要使用loadChunkMetadataList()方法得到下一层的最后一个ChunkMetadata,通过统计数据得到Last结果。
for (int i = seqFileResources.size() - 1; i >= 0; i--) {
TimeseriesMetadata timeseriesMetadata = FileLoaderUtils.loadTimeSeriesMetadata(
seqFileResources.get(i), seriesPath, context, null, sensors);
if (timeseriesMetadata != null) {
if (!timeseriesMetadata.isModified()) {
Statistics timeseriesMetadataStats = timeseriesMetadata.getStatistics();
resultPair = constructLastPair(
timeseriesMetadataStats.getEndTime(),
timeseriesMetadataStats.getLastValue(),
tsDataType);
break;
} else {
List<ChunkMetadata> chunkMetadataList = timeseriesMetadata.loadChunkMetadataList();
if (!chunkMetadataList.isEmpty()) {
ChunkMetadata lastChunkMetaData = chunkMetadataList.get(chunkMetadataList.size() - 1);
Statistics chunkStatistics = lastChunkMetaData.getStatistics();
resultPair =
constructLastPair(
chunkStatistics.getEndTime(), chunkStatistics.getLastValue(), tsDataType);
break;
}
}
}
}
对于乱序文件,需要遍历所有不为空的TimeseriesMetadata结构并更新当前最大时间戳的Last数据,直到扫描完所有乱序文件为止。需要注意的是当多个ChunkMetadata拥有相同的最大时间戳时,我们取version值最大的ChunkMatadata中的数据作为Last的结果。
long version = 0;
for (TsFileResource resource : unseqFileResources) {
if (resource.getEndTime(seriesPath.getDevice()) < resultPair.getTimestamp()) {
continue;
}
TimeseriesMetadata timeseriesMetadata =
FileLoaderUtils.loadTimeSeriesMetadata(resource, seriesPath, context, null, sensors);
if (timeseriesMetadata != null) {
for (ChunkMetadata chunkMetaData : timeseriesMetadata.loadChunkMetadataList()) {
if (chunkMetaData.getEndTime() == resultPair.getTimestamp()
&& chunkMetaData.getVersion() > version) {
Statistics chunkStatistics = chunkMetaData.getStatistics();
resultPair =
constructLastPair(
chunkStatistics.getEndTime(), chunkStatistics.getLastValue(), tsDataType);
version = chunkMetaData.getVersion();
}
}
}
}
最后将查询结果写入到MNode的Last缓存
((LeafMNode) node).updateCachedLast(resultPair, false, Long.MIN_VALUE);
Last缓存更新的逻辑位于LeafMNode的updateCachedLast方法内,这里引入两个额外的参数highPriorityUpdate和latestFlushTime。highPriorityUpdate用来表示本次更新是否是高优先级的,新数据写入而导致的缓存更新都被认为是高优先级更新,而查询时更新缓存默认为低优先级更新。latestFlushTime用来记录当前已被写回到磁盘的数据的最大时间戳。
缓存更新的策略如下:
latestFlushTime,则将写入的数据写入到缓存中。具体代码如下
public synchronized void updateCachedLast(
TimeValuePair timeValuePair, boolean highPriorityUpdate, Long latestFlushedTime) {
if (timeValuePair == null || timeValuePair.getValue() == null) return;
if (cachedLastValuePair == null) {
// If no cached last, (1) a last query (2) an unseq insertion or (3) a seq insertion will update cache.
if (!highPriorityUpdate || latestFlushedTime <= timeValuePair.getTimestamp()) {
cachedLastValuePair =
new TimeValuePair(timeValuePair.getTimestamp(), timeValuePair.getValue());
}
} else if (timeValuePair.getTimestamp() > cachedLastValuePair.getTimestamp()
|| (timeValuePair.getTimestamp() == cachedLastValuePair.getTimestamp()
&& highPriorityUpdate)) {
cachedLastValuePair.setTimestamp(timeValuePair.getTimestamp());
cachedLastValuePair.setValue(timeValuePair.getValue());
}
}