The main logic of the Last query is in LastQueryExecutor.
The Last query executes the calculateLastPairForOneSeries method for each specified time series.
We add a Last data cache to the MNode structure corresponding to the time series that needs to be queried.
For the Last query of a given time series, the calculateLastPairForOneSeries method first tries to read the cached data in the MNode.
    try {
      node = IoTDB.metaManager.getDeviceNodeWithAutoCreateStorageGroup(seriesPath.toString());
    } catch (MetadataException e) {
      throw new QueryProcessException(e);
    }
    if (((LeafMNode) node).getCachedLast() != null) {
      return ((LeafMNode) node).getCachedLast();
    }
If the cache has not been written yet, the standard query process described below is executed to read the TsFile data.
The standard Last query process scans the sequential files and the unsequential files in reverse order to get the query result, and finally writes the result back to the MNode cache. In the algorithm, sequential files and unsequential files are processed separately.
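The overall flow (read the cache, fall back to a file scan on a miss, write the result back) can be illustrated with a minimal, self-contained sketch. The class LastCacheSketch, its Pair type, and the scanFiles callback are hypothetical stand-ins for illustration only; they are not part of the IoTDB code base.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical stand-in for the per-series Last cache kept on the MNode.
final class LastCacheSketch {
    // A (timestamp, value) pair; a simplified stand-in for TimeValuePair.
    static final class Pair {
        final long timestamp;
        final Object value;

        Pair(long timestamp, Object value) {
            this.timestamp = timestamp;
            this.value = value;
        }
    }

    private final Map<String, Pair> cache = new HashMap<>();
    int fileScans = 0; // counts how often the slow path ran

    // Returns the Last pair for a series: cache first, file scan on a miss.
    Pair last(String seriesPath, Function<String, Pair> scanFiles) {
        Pair cached = cache.get(seriesPath);
        if (cached != null) {
            return cached; // cache hit: no file access needed
        }
        fileScans++;
        Pair result = scanFiles.apply(seriesPath); // slow path: scan the files
        if (result != null) {
            cache.put(seriesPath, result); // write back for later queries
        }
        return result;
    }
}
```

In this sketch a second call for the same series is answered from the cache, so the scan callback runs only once.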
The sequential files are sorted by their writing time, so they are scanned from the newest file backwards, calling loadTimeSeriesMetadata() directly to get the last valid TimeseriesMetadata. If that TimeseriesMetadata has not been modified, its statistics are usable and the Last pair can be returned directly. Otherwise, loadChunkMetadataList() is called to get the last ChunkMetadata, and the Last time-value pair is obtained from the statistics of that ChunkMetadata.
    // Scan the sequential files from newest to oldest
    for (int i = seqFileResources.size() - 1; i >= 0; i--) {
      TimeseriesMetadata timeseriesMetadata = FileLoaderUtils.loadTimeSeriesMetadata(
          seqFileResources.get(i), seriesPath, context, null, sensors);
      if (timeseriesMetadata != null) {
        if (!timeseriesMetadata.isModified()) {
          // Not modified: the TimeseriesMetadata statistics can be used directly
          Statistics timeseriesMetadataStats = timeseriesMetadata.getStatistics();
          resultPair = constructLastPair(
              timeseriesMetadataStats.getEndTime(),
              timeseriesMetadataStats.getLastValue(),
              tsDataType);
          break;
        } else {
          // Modified: fall back to the statistics of the last ChunkMetadata
          List<ChunkMetadata> chunkMetadataList = timeseriesMetadata.loadChunkMetadataList();
          if (!chunkMetadataList.isEmpty()) {
            ChunkMetadata lastChunkMetaData = chunkMetadataList.get(chunkMetadataList.size() - 1);
            Statistics chunkStatistics = lastChunkMetaData.getStatistics();
            resultPair = constructLastPair(
                chunkStatistics.getEndTime(), chunkStatistics.getLastValue(), tsDataType);
            break;
          }
        }
      }
    }
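As a rough, self-contained illustration of the reverse scan over sequential files, the sketch below models each file with simplified data. SeqScanSketch, Stats, and FileMeta are hypothetical names introduced here; the modified flag only mirrors the isModified() check in the code above, and a null list entry models a file for which no TimeseriesMetadata is found.

```java
import java.util.List;

final class SeqScanSketch {
    // Simplified stand-in for file-level or chunk-level statistics.
    static final class Stats {
        final long endTime;
        final double lastValue;

        Stats(long endTime, double lastValue) {
            this.endTime = endTime;
            this.lastValue = lastValue;
        }
    }

    // Simplified stand-in for one file's metadata of the queried series.
    static final class FileMeta {
        final boolean modified;   // mirrors isModified() in the code above
        final Stats fileStats;    // file-level statistics
        final List<Stats> chunks; // chunk-level statistics, in time order

        FileMeta(boolean modified, Stats fileStats, List<Stats> chunks) {
            this.modified = modified;
            this.fileStats = fileStats;
            this.chunks = chunks;
        }
    }

    // Walks the files from newest to oldest and returns the first usable
    // statistics, or null if no file yields a result.
    static Stats lastFromSeqFiles(List<FileMeta> filesInWriteOrder) {
        for (int i = filesInWriteOrder.size() - 1; i >= 0; i--) {
            FileMeta meta = filesInWriteOrder.get(i);
            if (meta == null) {
                continue; // this file does not contain the series
            }
            if (!meta.modified) {
                return meta.fileStats; // file-level statistics are usable
            }
            if (!meta.chunks.isEmpty()) {
                return meta.chunks.get(meta.chunks.size() - 1); // last chunk
            }
        }
        return null;
    }
}
```

Because the files are ordered by writing time, the loop can stop at the first file that yields a result; older files cannot contain a later timestamp for the series.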
For unsequential files, we need to traverse all valid TimeseriesMetadata structures and keep updating the current Last pair to find the largest timestamp. Note that when multiple ChunkMetadata have the same end timestamp, we take the data in the ChunkMetadata with the largest version value as the Last result.
    long version = 0;
    for (TsFileResource resource : unseqFileResources) {
      // Skip files whose data ends before the current Last timestamp
      if (resource.getEndTime(seriesPath.getDevice()) < resultPair.getTimestamp()) {
        continue;
      }
      TimeseriesMetadata timeseriesMetadata =
          FileLoaderUtils.loadTimeSeriesMetadata(resource, seriesPath, context, null, sensors);
      if (timeseriesMetadata != null) {
        for (ChunkMetadata chunkMetaData : timeseriesMetadata.loadChunkMetadataList()) {
          // Take a later end time, or the same end time with a larger version
          if (chunkMetaData.getEndTime() > resultPair.getTimestamp()
              || (chunkMetaData.getEndTime() == resultPair.getTimestamp()
                  && chunkMetaData.getVersion() > version)) {
            Statistics chunkStatistics = chunkMetaData.getStatistics();
            resultPair = constructLastPair(
                chunkStatistics.getEndTime(), chunkStatistics.getLastValue(), tsDataType);
            version = chunkMetaData.getVersion();
          }
        }
      }
    }
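The tie-breaking rule for unsequential data can be shown in isolation. The following is a minimal sketch under simplified assumptions: UnseqScanSketch and Chunk are hypothetical names, and each chunk is reduced to its end time, last value, and version.

```java
import java.util.List;

final class UnseqScanSketch {
    // Simplified stand-in for the statistics and version of one chunk.
    static final class Chunk {
        final long endTime;
        final double lastValue;
        final long version;

        Chunk(long endTime, double lastValue, long version) {
            this.endTime = endTime;
            this.lastValue = lastValue;
            this.version = version;
        }
    }

    // Returns the chunk that provides the Last value: the largest end time
    // wins, and equal end times are resolved by the larger version.
    static Chunk pickLast(List<Chunk> chunks) {
        Chunk best = null;
        for (Chunk c : chunks) {
            if (best == null
                    || c.endTime > best.endTime
                    || (c.endTime == best.endTime && c.version > best.version)) {
                best = c;
            }
        }
        return best;
    }
}
```

For example, given chunks with (endTime, lastValue, version) of (100, 1.0, 1), (200, 2.0, 3), (200, 3.0, 5), and (150, 4.0, 9), the sketch selects (200, 3.0, 5): the end time decides first, and the version only matters between the two chunks ending at 200.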
Finally, write the query result to the MNode's Last cache as a low priority update:
    ((LeafMNode) node).updateCachedLast(resultPair, false, Long.MIN_VALUE);
The Last cache update logic is located in the updateCachedLast method of LeafMNode. Two additional parameters, highPriorityUpdate and latestFlushedTime, are introduced here. highPriorityUpdate indicates whether this update is high priority: cache updates caused by newly written data are high priority, while cache updates made by queries default to low priority. latestFlushedTime records the maximum timestamp of the data that has already been written back to disk.
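The cache update strategy is as follows. When the cache is empty, a query result (low priority) is written to the cache directly, while newly written data (high priority) is written to the cache only if its timestamp is greater than or equal to latestFlushedTime. When the cache already holds a value, it is replaced if the new timestamp is greater than the cached timestamp, or if the two timestamps are equal and the update is high priority. The specific code is as follows: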
    public synchronized void updateCachedLast(
        TimeValuePair timeValuePair, boolean highPriorityUpdate, Long latestFlushedTime) {
      if (timeValuePair == null || timeValuePair.getValue() == null) {
        return;
      }

      if (cachedLastValuePair == null) {
        // If no cached last, (1) a last query (2) an unseq insertion or (3) a seq insertion will
        // update cache.
        if (!highPriorityUpdate || latestFlushedTime <= timeValuePair.getTimestamp()) {
          cachedLastValuePair =
              new TimeValuePair(timeValuePair.getTimestamp(), timeValuePair.getValue());
        }
      } else if (timeValuePair.getTimestamp() > cachedLastValuePair.getTimestamp()
          || (timeValuePair.getTimestamp() == cachedLastValuePair.getTimestamp()
              && highPriorityUpdate)) {
        // A newer timestamp, or the same timestamp from a high priority update, replaces the cache
        cachedLastValuePair.setTimestamp(timeValuePair.getTimestamp());
        cachedLastValuePair.setValue(timeValuePair.getValue());
      }
    }
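To make the update rules easier to follow, here is a minimal, self-contained sketch that mirrors the branching of updateCachedLast with simplified types. LastCacheUpdateSketch and its accessors are hypothetical names for illustration; only the two conditions are taken from the code above.

```java
final class LastCacheUpdateSketch {
    private Long cachedTime = null; // null means the cache is empty
    private Object cachedValue = null;

    // Mirrors the two branches of updateCachedLast with simplified types.
    synchronized void update(
            long time, Object value, boolean highPriorityUpdate, long latestFlushedTime) {
        if (value == null) {
            return;
        }
        if (cachedTime == null) {
            // Empty cache: queries always fill it; writes only if the data is
            // not older than what has already been flushed to disk.
            if (!highPriorityUpdate || latestFlushedTime <= time) {
                cachedTime = time;
                cachedValue = value;
            }
        } else if (time > cachedTime || (time == cachedTime && highPriorityUpdate)) {
            // Newer timestamp, or same timestamp from a high priority update.
            cachedTime = time;
            cachedValue = value;
        }
    }

    synchronized Long time() {
        return cachedTime;
    }

    synchronized Object value() {
        return cachedValue;
    }
}
```

A short walk through the sketch: with an empty cache, a write at time 50 with latestFlushedTime 100 is rejected, since newer data may already be on disk. A query result at time 120 then fills the cache. A second query result at time 120 does not overwrite it, but a write at time 120 does, and a later write at time 110 is ignored because it is older than the cached pair.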