blob: 7ac1027e14a3ab602587d203276d203ec6ced13d [file] [log] [blame] [view]
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
# 最近时间戳 Last 查询
Last 查询的主要逻辑在 LastQueryExecutor
* org.apache.iotdb.db.query.executor.LastQueryExecutor
Last查询对每个指定的时间序列执行`calculateLastPairForOneSeries`方法。
## 读取MNode缓存数据
我们在需要查询的时间序列所对应的MNode结构中添加Last数据缓存。`calculateLastPairForOneSeries`方法对于某个时间序列的Last查询,首先尝试读取MNode中的缓存数据。
```
try {
node = IoTDB.metaManager.getDeviceNodeWithAutoCreateStorageGroup(seriesPath.toString());
} catch (MetadataException e) {
throw new QueryProcessException(e);
}
if (((LeafMNode) node).getCachedLast() != null) {
return ((LeafMNode) node).getCachedLast();
}
```
如果发现缓存没有被写入过,则执行下面的标准查询流程读取TsFile数据。
## Last标准查询流程
Last标准查询流程需要以倒序方式扫描顺序文件和乱序文件,一旦得到满足要求的Last查询结果就将其写回到MNode缓存中并返回。算法中对顺序文件和乱序文件分别进行处理。
- 顺序文件由于是按照写入时间已经排好序,因此直接使用`loadTimeSeriesMetadata()`方法取出最后一个不为空的`TimeseriesMetadata`。若`TimeseriesMetadata`的统计数据可用即可直接得到Last时间戳和对应的值;如不可用则需要使用`loadChunkMetadataList()`方法得到下一层的最后一个`ChunkMetadata`,通过统计数据得到Last结果。
```
for (int i = seqFileResources.size() - 1; i >= 0; i--) {
TimeseriesMetadata timeseriesMetadata = FileLoaderUtils.loadTimeSeriesMetadata(
seqFileResources.get(i), seriesPath, context, null, sensors);
if (timeseriesMetadata != null) {
if (!timeseriesMetadata.isModified()) {
Statistics timeseriesMetadataStats = timeseriesMetadata.getStatistics();
resultPair = constructLastPair(
timeseriesMetadataStats.getEndTime(),
timeseriesMetadataStats.getLastValue(),
tsDataType);
break;
} else {
List<ChunkMetadata> chunkMetadataList = timeseriesMetadata.loadChunkMetadataList();
if (!chunkMetadataList.isEmpty()) {
ChunkMetadata lastChunkMetaData = chunkMetadataList.get(chunkMetadataList.size() - 1);
Statistics chunkStatistics = lastChunkMetaData.getStatistics();
resultPair =
constructLastPair(
chunkStatistics.getEndTime(), chunkStatistics.getLastValue(), tsDataType);
break;
}
}
}
}
```
- 对于乱序文件,需要遍历所有不为空的`TimeseriesMetadata`结构并更新当前最大时间戳的Last数据,直到扫描完所有乱序文件为止。需要注意的是当多个`ChunkMetadata`拥有相同的最大时间戳时,我们取`version`值最大的`ChunkMatadata`中的数据作为Last的结果。
```
long version = 0;
for (TsFileResource resource : unseqFileResources) {
if (resource.getEndTime(seriesPath.getDevice()) < resultPair.getTimestamp()) {
continue;
}
TimeseriesMetadata timeseriesMetadata =
FileLoaderUtils.loadTimeSeriesMetadata(resource, seriesPath, context, null, sensors);
if (timeseriesMetadata != null) {
for (ChunkMetadata chunkMetaData : timeseriesMetadata.loadChunkMetadataList()) {
if (chunkMetaData.getEndTime() == resultPair.getTimestamp()
&& chunkMetaData.getVersion() > version) {
Statistics chunkStatistics = chunkMetaData.getStatistics();
resultPair =
constructLastPair(
chunkStatistics.getEndTime(), chunkStatistics.getLastValue(), tsDataType);
version = chunkMetaData.getVersion();
}
}
}
}
```
- 最后将查询结果写入到MNode的Last缓存
```
((LeafMNode) node).updateCachedLast(resultPair, false, Long.MIN_VALUE);
```
## Last 缓存更新策略
Last缓存更新的逻辑位于`LeafMNode`的`updateCachedLast`方法内,这里引入两个额外的参数`highPriorityUpdate`和`latestFlushTime`。`highPriorityUpdate`用来表示本次更新是否是高优先级的,新数据写入而导致的缓存更新都被认为是高优先级更新,而查询时更新缓存默认为低优先级更新。`latestFlushTime`用来记录当前已被写回到磁盘的数据的最大时间戳。
缓存更新的策略如下:
1. 当缓存中没有记录时,对于查询到的Last数据,将查询的结果直接写入到缓存中。
2. 当缓存中没有记录时,对于写入的最新数据如果时间戳大于或等于`latestFlushTime`,则将写入的数据写入到缓存中。
3. 当缓存中已有记录时,根据查询或写入的数据时间戳与当前缓存中时间戳作对比。写入的数据具有高优先级,时间戳不小于缓存记录则更新缓存;查询出的数据低优先级,必须大于缓存记录的时间戳才更新缓存。
具体代码如下
```
public synchronized void updateCachedLast(
TimeValuePair timeValuePair, boolean highPriorityUpdate, Long latestFlushedTime) {
if (timeValuePair == null || timeValuePair.getValue() == null) return;
if (cachedLastValuePair == null) {
// If no cached last, (1) a last query (2) an unseq insertion or (3) a seq insertion will update cache.
if (!highPriorityUpdate || latestFlushedTime <= timeValuePair.getTimestamp()) {
cachedLastValuePair =
new TimeValuePair(timeValuePair.getTimestamp(), timeValuePair.getValue());
}
} else if (timeValuePair.getTimestamp() > cachedLastValuePair.getTimestamp()
|| (timeValuePair.getTimestamp() == cachedLastValuePair.getTimestamp()
&& highPriorityUpdate)) {
cachedLastValuePair.setTimestamp(timeValuePair.getTimestamp());
cachedLastValuePair.setValue(timeValuePair.getValue());
}
}
```