| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.iotdb.db.query.reader.series; |
| |
| import java.io.IOException; |
| import java.io.Serializable; |
| import java.util.Comparator; |
| import java.util.LinkedList; |
| import java.util.List; |
| import java.util.Objects; |
| import java.util.PriorityQueue; |
| import java.util.Set; |
| import java.util.function.ToLongFunction; |
| import java.util.stream.Collectors; |
| import org.apache.iotdb.db.conf.IoTDBDescriptor; |
| import org.apache.iotdb.db.engine.querycontext.QueryDataSource; |
| import org.apache.iotdb.db.engine.storagegroup.TsFileResource; |
| import org.apache.iotdb.db.metadata.PartialPath; |
| import org.apache.iotdb.db.query.context.QueryContext; |
| import org.apache.iotdb.db.query.control.QueryResourceManager; |
| import org.apache.iotdb.db.query.filter.TsFileFilter; |
| import org.apache.iotdb.db.query.reader.universal.DescPriorityMergeReader; |
| import org.apache.iotdb.db.query.reader.universal.PriorityMergeReader; |
| import org.apache.iotdb.db.utils.FileLoaderUtils; |
| import org.apache.iotdb.db.utils.QueryUtils; |
| import org.apache.iotdb.db.utils.TestOnly; |
| import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata; |
| import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata; |
| import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; |
| import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics; |
| import org.apache.iotdb.tsfile.read.TimeValuePair; |
| import org.apache.iotdb.tsfile.read.common.BatchData; |
| import org.apache.iotdb.tsfile.read.common.BatchDataFactory; |
| import org.apache.iotdb.tsfile.read.filter.basic.Filter; |
| import org.apache.iotdb.tsfile.read.filter.basic.UnaryFilter; |
| import org.apache.iotdb.tsfile.read.reader.IPageReader; |
| |
/**
 * Reads one time series across sequence and unsequence TsFiles, merging data at file,
 * chunk and page granularity. Callers drive the reader through the nested protocol
 * hasNextFile() -> hasNextChunk() -> hasNextPage(); overlapped data is merged point by
 * point via a {@link PriorityMergeReader}.
 */
public class SeriesReader {

  // inner class of SeriesReader for order purpose; encapsulates all asc/desc differences
  private TimeOrderUtils orderUtils;

  private final PartialPath seriesPath;

  // all the sensors in this device; used to prefetch sibling chunk metadata in one pass
  private final Set<String> allSensors;
  private final TSDataType dataType;
  private final QueryContext context;

  /*
   * There is at most one is not null between timeFilter and valueFilter
   *
   * timeFilter is pushed down to all pages (seq, unseq) without correctness problem
   *
   * valueFilter is pushed down to non-overlapped page only
   */
  private final Filter timeFilter;
  private final Filter valueFilter;
  /*
   * file cache: remaining file resources not yet unpacked into TimeseriesMetadata
   */
  private final List<TsFileResource> seqFileResource;
  private final List<TsFileResource> unseqFileResource;

  /*
   * TimeSeriesMetadata cache; firstTimeSeriesMetadata is the current "head" file,
   * seq list keeps file order, unseq queue is ordered by orderUtils
   */
  private TimeseriesMetadata firstTimeSeriesMetadata;
  private final List<TimeseriesMetadata> seqTimeSeriesMetadata = new LinkedList<>();
  private final PriorityQueue<TimeseriesMetadata> unSeqTimeSeriesMetadata;

  /*
   * chunk cache: firstChunkMetadata is the current head chunk, queue holds the rest
   */
  private ChunkMetadata firstChunkMetadata;
  private final PriorityQueue<ChunkMetadata> cachedChunkMetadata;

  /*
   * page cache: firstPageReader is the current head page; seq list keeps order,
   * unseq queue is ordered by orderUtils
   */
  private VersionPageReader firstPageReader;
  private final List<VersionPageReader> seqPageReaders = new LinkedList<>();
  private final PriorityQueue<VersionPageReader> unSeqPageReaders;

  /*
   * point cache: merges individual points of overlapped pages by time and version
   */
  private final PriorityMergeReader mergeReader;

  /*
   * result cache for a fully merged overlapped page
   */
  private boolean hasCachedNextOverlappedPage;
  private BatchData cachedBatchData;
| |
  /**
   * Creates a reader over the given {@link QueryDataSource}.
   *
   * @param seriesPath  path of the time series to read
   * @param allSensors  all sensors of the same device, used when loading chunk metadata
   * @param dataType    data type of the series
   * @param context     query context (query id etc.)
   * @param dataSource  seq/unseq file resources; filtered in place by fileFilter
   * @param timeFilter  pushed down to all pages; may be null
   * @param valueFilter pushed down to non-overlapped pages only; may be null
   * @param fileFilter  optional filter removing whole files from the data source; may be null
   * @param ascending   true for ascending time order, false for descending
   */
  public SeriesReader(PartialPath seriesPath, Set<String> allSensors, TSDataType dataType,
      QueryContext context,
      QueryDataSource dataSource, Filter timeFilter, Filter valueFilter, TsFileFilter fileFilter,
      boolean ascending) {
    this.seriesPath = seriesPath;
    this.allSensors = allSensors;
    this.dataType = dataType;
    this.context = context;
    QueryUtils.filterQueryDataSource(dataSource, fileFilter);
    this.timeFilter = timeFilter;
    this.valueFilter = valueFilter;
    // orderUtils must be set before the sorts/queues below, which all depend on it
    if (ascending) {
      this.orderUtils = new AscTimeOrderUtils();
      mergeReader = new PriorityMergeReader();
    } else {
      this.orderUtils = new DescTimeOrderUtils();
      mergeReader = new DescPriorityMergeReader();
    }

    this.seqFileResource = new LinkedList<>(dataSource.getSeqResources());
    this.unseqFileResource = sortUnSeqFileResources(dataSource.getUnseqResources());
    unSeqTimeSeriesMetadata = new PriorityQueue<>(orderUtils.comparingLong(
        timeSeriesMetadata -> orderUtils.getOrderTime(timeSeriesMetadata.getStatistics())));
    cachedChunkMetadata = new PriorityQueue<>(orderUtils.comparingLong(
        chunkMetadata -> orderUtils.getOrderTime(chunkMetadata.getStatistics())));
    unSeqPageReaders = new PriorityQueue<>(orderUtils.comparingLong(
        versionPageReader -> orderUtils.getOrderTime(versionPageReader.getStatistics())));
  }
| |
  /**
   * Test-only constructor taking explicit seq/unseq resource lists instead of a
   * {@link QueryDataSource}; otherwise identical to the public constructor.
   */
  @TestOnly
  @SuppressWarnings("squid:S107")
  SeriesReader(PartialPath seriesPath, Set<String> allSensors, TSDataType dataType,
      QueryContext context,
      List<TsFileResource> seqFileResource, List<TsFileResource> unseqFileResource,
      Filter timeFilter, Filter valueFilter, boolean ascending) {
    this.seriesPath = seriesPath;
    this.allSensors = allSensors;
    this.dataType = dataType;
    this.context = context;
    this.timeFilter = timeFilter;
    this.valueFilter = valueFilter;
    // orderUtils must be set before the sorts/queues below, which all depend on it
    if (ascending) {
      this.orderUtils = new AscTimeOrderUtils();
      mergeReader = new PriorityMergeReader();
    } else {
      this.orderUtils = new DescTimeOrderUtils();
      mergeReader = new DescPriorityMergeReader();
    }

    this.seqFileResource = new LinkedList<>(seqFileResource);
    this.unseqFileResource = sortUnSeqFileResources(unseqFileResource);
    unSeqTimeSeriesMetadata = new PriorityQueue<>(orderUtils.comparingLong(
        timeSeriesMetadata -> orderUtils.getOrderTime(timeSeriesMetadata.getStatistics())));
    cachedChunkMetadata = new PriorityQueue<>(orderUtils.comparingLong(
        chunkMetadata -> orderUtils.getOrderTime(chunkMetadata.getStatistics())));
    unSeqPageReaders = new PriorityQueue<>(orderUtils.comparingLong(
        versionPageReader -> orderUtils.getOrderTime(versionPageReader.getStatistics())));
  }
| |
| public boolean isEmpty() throws IOException { |
| return !(hasNextPage() || hasNextChunk() || hasNextFile()); |
| } |
| |
  /**
   * Checks whether another file (TimeseriesMetadata) is available, unpacking the next
   * candidate into firstTimeSeriesMetadata if necessary.
   *
   * <p>Must only be called after all cached pages and chunks of the previous file are
   * fully consumed; otherwise data could be silently skipped.
   *
   * @return true if firstTimeSeriesMetadata is (now) non-null
   * @throws IOException if the page or chunk caches are not yet exhausted, or on read failure
   */
  boolean hasNextFile() throws IOException {

    if (!unSeqPageReaders.isEmpty()
        || firstPageReader != null
        || mergeReader.hasNextTimeValuePair()) {
      throw new IOException(
          "all cached pages should be consumed first cachedPageReaders.isEmpty() is "
              + unSeqPageReaders.isEmpty()
              + " firstPageReader != null is "
              + (firstPageReader != null)
              + " mergeReader.hasNextTimeValuePair() = "
              + mergeReader.hasNextTimeValuePair());
    }

    if (firstChunkMetadata != null || !cachedChunkMetadata.isEmpty()) {
      throw new IOException("all cached chunks should be consumed first");
    }

    if (firstTimeSeriesMetadata != null) {
      return true;
    }

    // init first time series metadata whose startTime is minimum
    tryToUnpackAllOverlappedFilesToTimeSeriesMetadata();

    return firstTimeSeriesMetadata != null;
  }
| |
| boolean isFileOverlapped() throws IOException { |
| if (firstTimeSeriesMetadata == null) { |
| throw new IOException("no first file"); |
| } |
| |
| Statistics fileStatistics = firstTimeSeriesMetadata.getStatistics(); |
| return !seqTimeSeriesMetadata.isEmpty() |
| && orderUtils.isOverlapped(fileStatistics, seqTimeSeriesMetadata.get(0).getStatistics()) |
| || !unSeqTimeSeriesMetadata.isEmpty() |
| && orderUtils.isOverlapped(fileStatistics, unSeqTimeSeriesMetadata.peek().getStatistics()); |
| } |
| |
  /** Statistics of the current first file; caller must ensure hasNextFile() returned true. */
  Statistics currentFileStatistics() {
    return firstTimeSeriesMetadata.getStatistics();
  }
| |
  /**
   * Whether the current first file has been modified (e.g. by deletions).
   *
   * @throws IOException if there is no current first file
   */
  boolean currentFileModified() throws IOException {
    if (firstTimeSeriesMetadata == null) {
      throw new IOException("no first file");
    }
    return firstTimeSeriesMetadata.isModified();
  }
| |
  /** Discards the current first file so the next hasNextFile() advances to a new one. */
  void skipCurrentFile() {
    firstTimeSeriesMetadata = null;
  }
| |
| /** |
| * This method should be called after hasNextFile() until no next chunk, make sure that all |
| * overlapped chunks are consumed |
| */ |
| boolean hasNextChunk() throws IOException { |
| if (!unSeqPageReaders.isEmpty() |
| || firstPageReader != null |
| || mergeReader.hasNextTimeValuePair()) { |
| throw new IOException( |
| "all cached pages should be consumed first cachedPageReaders.isEmpty() is " |
| + unSeqPageReaders.isEmpty() |
| + " firstPageReader != null is " |
| + (firstPageReader != null) |
| + " mergeReader.hasNextTimeValuePair() = " |
| + mergeReader.hasNextTimeValuePair()); |
| } |
| |
| if (firstChunkMetadata != null) { |
| return true; |
| } |
| |
| /* |
| * construct first chunk metadata |
| */ |
| if (firstTimeSeriesMetadata != null) { |
| /* |
| * try to unpack all overlapped TimeSeriesMetadata to cachedChunkMetadata |
| */ |
| unpackAllOverlappedTsFilesToTimeSeriesMetadata( |
| orderUtils.getOverlapCheckTime(firstTimeSeriesMetadata.getStatistics())); |
| unpackAllOverlappedTimeSeriesMetadataToCachedChunkMetadata( |
| orderUtils.getOverlapCheckTime(firstTimeSeriesMetadata.getStatistics()), true); |
| } else { |
| /* |
| * first time series metadata is already unpacked, consume cached ChunkMetadata |
| */ |
| if (!cachedChunkMetadata.isEmpty()) { |
| firstChunkMetadata = cachedChunkMetadata.poll(); |
| unpackAllOverlappedTsFilesToTimeSeriesMetadata( |
| orderUtils.getOverlapCheckTime(firstChunkMetadata.getStatistics())); |
| unpackAllOverlappedTimeSeriesMetadataToCachedChunkMetadata( |
| orderUtils.getOverlapCheckTime(firstChunkMetadata.getStatistics()), false); |
| } |
| } |
| |
| return firstChunkMetadata != null; |
| } |
| |
  /**
   * Moves every cached TimeseriesMetadata whose range overlaps endpointTime (including
   * firstTimeSeriesMetadata itself) down into cachedChunkMetadata.
   *
   * @param endpointTime the time bound to check overlap against (order-dependent meaning)
   * @param init         if true and no head chunk is chosen yet, pick one from the queue
   */
  private void unpackAllOverlappedTimeSeriesMetadataToCachedChunkMetadata(
      long endpointTime, boolean init) throws IOException {
    while (!seqTimeSeriesMetadata.isEmpty()
        && orderUtils.isOverlapped(endpointTime, seqTimeSeriesMetadata.get(0).getStatistics())) {
      unpackOneTimeSeriesMetadata(seqTimeSeriesMetadata.remove(0));
    }
    while (!unSeqTimeSeriesMetadata.isEmpty()
        && orderUtils.isOverlapped(endpointTime, unSeqTimeSeriesMetadata.peek().getStatistics())) {
      unpackOneTimeSeriesMetadata(unSeqTimeSeriesMetadata.poll());
    }

    // the head file itself may overlap; once unpacked it is no longer the head
    if (firstTimeSeriesMetadata != null
        && orderUtils.isOverlapped(endpointTime, firstTimeSeriesMetadata.getStatistics())) {
      unpackOneTimeSeriesMetadata(firstTimeSeriesMetadata);
      firstTimeSeriesMetadata = null;
    }

    if (init && firstChunkMetadata == null && !cachedChunkMetadata.isEmpty()) {
      firstChunkMetadata = cachedChunkMetadata.poll();
    }
  }
| |
  /**
   * Loads the chunk metadata list of one TimeseriesMetadata, tags each chunk with its
   * seq/unseq origin, and adds them all to cachedChunkMetadata. When performance tracing
   * is enabled, also accumulates per-query chunk count and point count.
   */
  private void unpackOneTimeSeriesMetadata(TimeseriesMetadata timeSeriesMetadata)
      throws IOException {
    List<ChunkMetadata> chunkMetadataList = FileLoaderUtils
        .loadChunkMetadataList(timeSeriesMetadata);
    // propagate the seq flag so page unpacking routes readers to the right cache
    chunkMetadataList.forEach(chunkMetadata -> chunkMetadata.setSeq(timeSeriesMetadata.isSeq()));
    // try to calculate the total number of chunk and time-value points in chunk
    if (IoTDBDescriptor.getInstance().getConfig().isEnablePerformanceTracing()) {
      QueryResourceManager queryResourceManager = QueryResourceManager.getInstance();
      queryResourceManager.getChunkNumMap()
          .compute(context.getQueryId(),
              (k, v) -> v == null ? chunkMetadataList.size() : v + chunkMetadataList.size());

      long totalChunkSize = chunkMetadataList.stream()
          .mapToLong(chunkMetadata -> chunkMetadata.getStatistics().getCount()).sum();
      queryResourceManager.getChunkSizeMap()
          .compute(context.getQueryId(), (k, v) -> v == null ? totalChunkSize : v + totalChunkSize);
    }

    cachedChunkMetadata.addAll(chunkMetadataList);
  }
| |
| boolean isChunkOverlapped() throws IOException { |
| if (firstChunkMetadata == null) { |
| throw new IOException("no first chunk"); |
| } |
| |
| Statistics chunkStatistics = firstChunkMetadata.getStatistics(); |
| return !cachedChunkMetadata.isEmpty() |
| && orderUtils.isOverlapped(chunkStatistics, cachedChunkMetadata.peek().getStatistics()); |
| } |
| |
  /** Statistics of the current first chunk; caller must ensure hasNextChunk() returned true. */
  Statistics currentChunkStatistics() {
    return firstChunkMetadata.getStatistics();
  }
| |
  /**
   * Whether the current first chunk has been modified (e.g. by deletions).
   *
   * @throws IOException if there is no current first chunk
   */
  boolean currentChunkModified() throws IOException {
    if (firstChunkMetadata == null) {
      throw new IOException("no first chunk");
    }
    return firstChunkMetadata.isModified();
  }
| |
  /** Discards the current first chunk so the next hasNextChunk() advances to a new one. */
  void skipCurrentChunk() {
    firstChunkMetadata = null;
  }
| |
| /** |
| * This method should be called after hasNextChunk() until no next page, make sure that all |
| * overlapped pages are consumed |
| */ |
| @SuppressWarnings("squid:S3776") |
| // Suppress high Cognitive Complexity warning |
| boolean hasNextPage() throws IOException { |
| |
| /* |
| * has overlapped data before |
| */ |
| if (hasCachedNextOverlappedPage) { |
| return true; |
| } else if (mergeReader.hasNextTimeValuePair()) { |
| if (hasNextOverlappedPage()) { |
| cachedBatchData = nextOverlappedPage(); |
| if (cachedBatchData != null && cachedBatchData.hasCurrent()) { |
| hasCachedNextOverlappedPage = true; |
| return true; |
| } |
| } |
| } |
| |
| if (firstPageReader != null) { |
| return true; |
| } |
| |
| /* |
| * construct first page reader |
| */ |
| if (firstChunkMetadata != null) { |
| /* |
| * try to unpack all overlapped ChunkMetadata to cachedPageReaders |
| */ |
| unpackAllOverlappedChunkMetadataToCachedPageReaders( |
| orderUtils.getOverlapCheckTime(firstChunkMetadata.getStatistics()), true); |
| } else { |
| /* |
| * first chunk metadata is already unpacked, consume cached pages |
| */ |
| initFirstPageReader(); |
| if (firstPageReader != null) { |
| long endpointTime = orderUtils.getOverlapCheckTime(firstPageReader.getStatistics()); |
| unpackAllOverlappedTsFilesToTimeSeriesMetadata(endpointTime); |
| unpackAllOverlappedTimeSeriesMetadataToCachedChunkMetadata(endpointTime, false); |
| unpackAllOverlappedChunkMetadataToCachedPageReaders(endpointTime, false); |
| } |
| } |
| |
| if (firstPageOverlapped()) { |
| /* |
| * next page is overlapped, read overlapped data and cache it |
| */ |
| if (hasNextOverlappedPage()) { |
| cachedBatchData = nextOverlappedPage(); |
| if (cachedBatchData != null && cachedBatchData.hasCurrent()) { |
| hasCachedNextOverlappedPage = true; |
| return true; |
| } |
| } |
| } |
| |
| // make sure firstPageReader won't be null while cachedPageReaders has more cached page readers |
| while (firstPageReader == null && (!seqPageReaders.isEmpty() || !unSeqPageReaders.isEmpty())) { |
| |
| initFirstPageReader(); |
| |
| if (firstPageOverlapped()) { |
| /* |
| * next page is overlapped, read overlapped data and cache it |
| */ |
| if (hasNextOverlappedPage()) { |
| cachedBatchData = nextOverlappedPage(); |
| if (cachedBatchData != null && cachedBatchData.hasCurrent()) { |
| hasCachedNextOverlappedPage = true; |
| return true; |
| } |
| } |
| } |
| } |
| return firstPageReader != null; |
| } |
| |
| private boolean firstPageOverlapped() { |
| return firstPageReader != null && (!seqPageReaders.isEmpty() && orderUtils |
| .isOverlapped(firstPageReader.getStatistics(), seqPageReaders.get(0).getStatistics())) || ( |
| !unSeqPageReaders.isEmpty() && orderUtils.isOverlapped(firstPageReader.getStatistics(), |
| unSeqPageReaders.peek().getStatistics())); |
| } |
| |
  /**
   * Unpacks every cached chunk whose range overlaps endpointTime (including the head
   * chunk) into page readers.
   *
   * @param endpointTime time bound for the overlap check (order-dependent meaning)
   * @param init         if true and no head page is chosen yet, pick one from the caches
   */
  private void unpackAllOverlappedChunkMetadataToCachedPageReaders(long endpointTime, boolean init)
      throws IOException {
    while (!cachedChunkMetadata.isEmpty() &&
        orderUtils.isOverlapped(endpointTime, cachedChunkMetadata.peek().getStatistics())) {
      unpackOneChunkMetaData(cachedChunkMetadata.poll());
    }
    // the head chunk itself may overlap; once unpacked it is no longer the head
    if (firstChunkMetadata != null &&
        orderUtils.isOverlapped(endpointTime, firstChunkMetadata.getStatistics())) {
      unpackOneChunkMetaData(firstChunkMetadata);
      firstChunkMetadata = null;
    }
    if (init && firstPageReader == null && (!seqPageReaders.isEmpty() || !unSeqPageReaders
        .isEmpty())) {
      initFirstPageReader();
    }
  }
| |
| private void unpackOneChunkMetaData(ChunkMetadata chunkMetaData) |
| throws IOException { |
| FileLoaderUtils.loadPageReaderList(chunkMetaData, timeFilter).forEach( |
| pageReader -> { |
| if (chunkMetaData.isSeq()) { |
| // addLast for asc; addFirst for desc |
| if (orderUtils.getAscending()) { |
| seqPageReaders |
| .add(new VersionPageReader(chunkMetaData.getVersion(), pageReader, true)); |
| } else { |
| seqPageReaders |
| .add(0, new VersionPageReader(chunkMetaData.getVersion(), pageReader, true)); |
| } |
| |
| } else { |
| unSeqPageReaders |
| .add(new VersionPageReader(chunkMetaData.getVersion(), pageReader, false)); |
| } |
| }); |
| } |
| |
| /** |
| * This method should be called after calling hasNextPage. |
| * |
| * <p>hasNextPage may cache firstPageReader if it is not overlapped or cached a BatchData if the |
| * first page is overlapped |
| */ |
| boolean isPageOverlapped() throws IOException { |
| |
| /* |
| * has an overlapped page |
| */ |
| if (hasCachedNextOverlappedPage) { |
| return true; |
| } |
| |
| /* |
| * has a non-overlapped page in firstPageReader |
| */ |
| if (mergeReader.hasNextTimeValuePair() |
| && mergeReader.currentTimeValuePair().getTimestamp() <= firstPageReader.getStatistics() |
| .getEndTime()) { |
| throw new IOException("overlapped data should be consumed first"); |
| } |
| |
| Statistics firstPageStatistics = firstPageReader.getStatistics(); |
| |
| return !unSeqPageReaders.isEmpty() |
| && orderUtils.isOverlapped(firstPageStatistics, unSeqPageReaders.peek().getStatistics()); |
| } |
| |
| Statistics currentPageStatistics() { |
| if (firstPageReader == null) { |
| return null; |
| } |
| return firstPageReader.getStatistics(); |
| } |
| |
  /**
   * Whether the current first page has been modified (e.g. by deletions).
   *
   * @throws IOException if there is no current first page
   */
  boolean currentPageModified() throws IOException {
    if (firstPageReader == null) {
      throw new IOException("no first page");
    }
    return firstPageReader.isModified();
  }
| |
  /** Discards the current first page so the next hasNextPage() advances to a new one. */
  void skipCurrentPage() {
    firstPageReader = null;
  }
| |
| /** |
| * This method should only be used when the method isPageOverlapped() return true. |
| */ |
| BatchData nextPage() throws IOException { |
| |
| if (!hasNextPage()) { |
| throw new IOException("no next page, neither non-overlapped nor overlapped"); |
| } |
| |
| if (hasCachedNextOverlappedPage) { |
| hasCachedNextOverlappedPage = false; |
| return cachedBatchData; |
| } else { |
| |
| /* |
| * next page is not overlapped, push down value filter if it exists |
| */ |
| if (valueFilter != null) { |
| firstPageReader.setFilter(valueFilter); |
| } |
| BatchData batchData = firstPageReader.getAllSatisfiedPageData(orderUtils.getAscending()); |
| firstPageReader = null; |
| |
| return batchData; |
| } |
| } |
| |
| /** |
| * read overlapped data till currentLargestEndTime in mergeReader, if current batch does not |
| * contain data, read till next currentLargestEndTime again |
| */ |
| @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning |
| private boolean hasNextOverlappedPage() throws IOException { |
| |
| if (hasCachedNextOverlappedPage) { |
| return true; |
| } |
| |
| tryToPutAllDirectlyOverlappedUnseqPageReadersIntoMergeReader(); |
| |
| while (true) { |
| |
| if (mergeReader.hasNextTimeValuePair()) { |
| |
| cachedBatchData = BatchDataFactory.createBatchData(dataType); |
| long currentPageEndPointTime = mergeReader.getCurrentReadStopTime(); |
| if (firstPageReader != null) { |
| currentPageEndPointTime = orderUtils |
| .getCurrentEndPoint(currentPageEndPointTime, firstPageReader.getStatistics()); |
| } |
| if (!seqPageReaders.isEmpty()) { |
| currentPageEndPointTime = orderUtils |
| .getCurrentEndPoint(currentPageEndPointTime, seqPageReaders.get(0).getStatistics()); |
| } |
| while (mergeReader.hasNextTimeValuePair()) { |
| |
| /* |
| * get current first point in mergeReader, this maybe overlapped latter |
| */ |
| TimeValuePair timeValuePair = mergeReader.currentTimeValuePair(); |
| |
| if (orderUtils.isExcessEndpoint(timeValuePair.getTimestamp(), currentPageEndPointTime)) { |
| if (cachedBatchData.hasCurrent() || firstPageReader != null || !seqPageReaders.isEmpty()) { |
| break; |
| } |
| currentPageEndPointTime = mergeReader.getCurrentReadStopTime(); |
| } |
| |
| unpackAllOverlappedTsFilesToTimeSeriesMetadata(timeValuePair.getTimestamp()); |
| unpackAllOverlappedTimeSeriesMetadataToCachedChunkMetadata( |
| timeValuePair.getTimestamp(), false); |
| unpackAllOverlappedChunkMetadataToCachedPageReaders(timeValuePair.getTimestamp(), false); |
| unpackAllOverlappedUnseqPageReadersToMergeReader(timeValuePair.getTimestamp()); |
| |
| if (firstPageReader != null) { |
| if ((orderUtils.getAscending() && timeValuePair.getTimestamp() > firstPageReader |
| .getStatistics().getEndTime()) || (!orderUtils.getAscending() |
| && timeValuePair.getTimestamp() < firstPageReader.getStatistics().getStartTime())) { |
| hasCachedNextOverlappedPage = cachedBatchData.hasCurrent(); |
| return hasCachedNextOverlappedPage; |
| } else { |
| mergeReader |
| .addReader(firstPageReader.getAllSatisfiedPageData(orderUtils.getAscending()) |
| .getBatchDataIterator(), firstPageReader.version, |
| orderUtils.getOverlapCheckTime(firstPageReader.getStatistics())); |
| firstPageReader = null; |
| } |
| } |
| |
| if (!seqPageReaders.isEmpty()) { |
| if ((orderUtils.getAscending() && timeValuePair.getTimestamp() > seqPageReaders.get(0) |
| .getStatistics().getEndTime()) || (!orderUtils.getAscending() |
| && timeValuePair.getTimestamp() < seqPageReaders.get(0).getStatistics() |
| .getStartTime())) { |
| hasCachedNextOverlappedPage = cachedBatchData.hasCurrent(); |
| return hasCachedNextOverlappedPage; |
| } else { |
| VersionPageReader pageReader = seqPageReaders.remove(0); |
| mergeReader.addReader(pageReader.getAllSatisfiedPageData(orderUtils.getAscending()) |
| .getBatchDataIterator(), pageReader.version, |
| orderUtils.getOverlapCheckTime(pageReader.getStatistics())); |
| } |
| } |
| |
| /* |
| * get the latest first point in mergeReader |
| */ |
| timeValuePair = mergeReader.nextTimeValuePair(); |
| |
| if (valueFilter == null || valueFilter |
| .satisfy(timeValuePair.getTimestamp(), timeValuePair.getValue().getValue())) { |
| cachedBatchData.putAnObject( |
| timeValuePair.getTimestamp(), timeValuePair.getValue().getValue()); |
| } |
| } |
| hasCachedNextOverlappedPage = cachedBatchData.hasCurrent(); |
| /* |
| * if current overlapped page has valid data, return, otherwise read next overlapped page |
| */ |
| if (hasCachedNextOverlappedPage) { |
| return true; |
| // condition: seqPage.endTime < mergeReader.currentTime |
| } else if (mergeReader.hasNextTimeValuePair()) { |
| return false; |
| } |
| } else { |
| return false; |
| } |
| } |
| } |
| |
  /**
   * Seeds the merge reader with every unseq page that directly overlaps the current
   * page endpoint. No-op when no page readers are cached at all.
   */
  private void tryToPutAllDirectlyOverlappedUnseqPageReadersIntoMergeReader() throws IOException {

    /*
     * no cached page readers
     */
    if (firstPageReader == null && unSeqPageReaders.isEmpty() && seqPageReaders.isEmpty()) {
      return;
    }

    /*
     * init firstPageReader
     */
    if (firstPageReader == null) {
      initFirstPageReader();
    }

    // endpoint comes from the merge reader when it already has points,
    // otherwise from the head page's own range
    long currentPageEndpointTime;
    if (mergeReader.hasNextTimeValuePair()) {
      currentPageEndpointTime = mergeReader.getCurrentReadStopTime();
    } else {
      currentPageEndpointTime = orderUtils.getOverlapCheckTime(firstPageReader.getStatistics());
    }

    /*
     * put all currently directly overlapped unseq page reader to merge reader
     */
    unpackAllOverlappedUnseqPageReadersToMergeReader(currentPageEndpointTime);
  }
| |
| private void initFirstPageReader() { |
| if (!seqPageReaders.isEmpty() && !unSeqPageReaders.isEmpty()) { |
| if (orderUtils.isTakeSeqAsFirst(seqPageReaders.get(0).getStatistics(), unSeqPageReaders.peek() |
| .getStatistics())) { |
| firstPageReader = seqPageReaders.remove(0); |
| } else { |
| firstPageReader = unSeqPageReaders.poll(); |
| } |
| } else if (!seqPageReaders.isEmpty()) { |
| firstPageReader = seqPageReaders.remove(0); |
| } else if (!unSeqPageReaders.isEmpty()) { |
| firstPageReader = unSeqPageReaders.poll(); |
| } |
| } |
| |
| private void unpackAllOverlappedUnseqPageReadersToMergeReader(long endpointTime) |
| throws IOException { |
| while (!unSeqPageReaders.isEmpty() |
| && orderUtils.isOverlapped(endpointTime, unSeqPageReaders.peek().data.getStatistics())) { |
| putPageReaderToMergeReader(unSeqPageReaders.poll()); |
| } |
| if (firstPageReader != null && !firstPageReader.isSeq() && |
| orderUtils.isOverlapped(endpointTime, firstPageReader.getStatistics())) { |
| putPageReaderToMergeReader(firstPageReader); |
| firstPageReader = null; |
| } |
| } |
| |
  /**
   * Feeds one page's data into the merge reader with its version as merge priority and
   * its order-dependent boundary time as the read stop hint.
   */
  private void putPageReaderToMergeReader(VersionPageReader pageReader) throws IOException {
    mergeReader.addReader(
        pageReader.getAllSatisfiedPageData(orderUtils.getAscending()).getBatchDataIterator(),
        pageReader.version,
        orderUtils.getOverlapCheckTime(pageReader.getStatistics()));
  }
| |
  /**
   * Returns the cached merged page (building it first if necessary) and clears the cache flag.
   *
   * @throws IOException when no merged page can be produced
   */
  private BatchData nextOverlappedPage() throws IOException {
    if (hasCachedNextOverlappedPage || hasNextOverlappedPage()) {
      hasCachedNextOverlappedPage = false;
      return cachedBatchData;
    }
    throw new IOException("No more batch data");
  }
| |
| private LinkedList<TsFileResource> sortUnSeqFileResources(List<TsFileResource> tsFileResources) { |
| return tsFileResources.stream() |
| .sorted( |
| orderUtils.comparingLong(tsFileResource -> orderUtils.getOrderTime(tsFileResource))) |
| .collect(Collectors.toCollection(LinkedList::new)); |
| } |
| |
| /** |
| * unpack all overlapped seq/unseq files and find the first TimeSeriesMetadata |
| * |
| * <p>Because there may be too many files in the scenario used by the user, we cannot open all |
| * the chunks at once, which may cause OOM, so we can only unpack one file at a time when needed. |
| * This approach is likely to be ubiquitous, but it keeps the system running smoothly |
| */ |
| @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning |
| private void tryToUnpackAllOverlappedFilesToTimeSeriesMetadata() throws IOException { |
| /* |
| * Fill sequence TimeSeriesMetadata List until it is not empty |
| */ |
| while (seqTimeSeriesMetadata.isEmpty() && !seqFileResource.isEmpty()) { |
| TimeseriesMetadata timeseriesMetadata = |
| FileLoaderUtils.loadTimeSeriesMetadata( |
| orderUtils.getNextSeqFileResource(seqFileResource, true), seriesPath, context, |
| getAnyFilter(), allSensors); |
| if (timeseriesMetadata != null) { |
| timeseriesMetadata.setSeq(true); |
| seqTimeSeriesMetadata.add(timeseriesMetadata); |
| } |
| } |
| |
| /* |
| * Fill unSequence TimeSeriesMetadata Priority Queue until it is not empty |
| */ |
| while (unSeqTimeSeriesMetadata.isEmpty() && !unseqFileResource.isEmpty()) { |
| TimeseriesMetadata timeseriesMetadata = |
| FileLoaderUtils.loadTimeSeriesMetadata( |
| unseqFileResource.remove(0), seriesPath, context, getAnyFilter(), allSensors); |
| if (timeseriesMetadata != null) { |
| timeseriesMetadata.setModified(true); |
| timeseriesMetadata.setSeq(false); |
| unSeqTimeSeriesMetadata.add(timeseriesMetadata); |
| } |
| } |
| |
| /* |
| * find end time of the first TimeSeriesMetadata |
| */ |
| long endTime = -1L; |
| if (!seqTimeSeriesMetadata.isEmpty() && unSeqTimeSeriesMetadata.isEmpty()) { |
| // only has seq |
| endTime = orderUtils.getOverlapCheckTime(seqTimeSeriesMetadata.get(0).getStatistics()); |
| } else if (seqTimeSeriesMetadata.isEmpty() && !unSeqTimeSeriesMetadata.isEmpty()) { |
| // only has unseq |
| endTime = orderUtils.getOverlapCheckTime(unSeqTimeSeriesMetadata.peek().getStatistics()); |
| } else if (!seqTimeSeriesMetadata.isEmpty()) { |
| // has seq and unseq |
| endTime = orderUtils.getCurrentEndPoint(seqTimeSeriesMetadata.get(0).getStatistics(), |
| unSeqTimeSeriesMetadata.peek().getStatistics()); |
| } |
| |
| /* |
| * unpack all directly overlapped seq/unseq files with first TimeSeriesMetadata |
| */ |
| if (endTime != -1) { |
| unpackAllOverlappedTsFilesToTimeSeriesMetadata(endTime); |
| } |
| |
| /* |
| * update the first TimeSeriesMetadata |
| */ |
| if (!seqTimeSeriesMetadata.isEmpty() && unSeqTimeSeriesMetadata.isEmpty()) { |
| // only has seq |
| firstTimeSeriesMetadata = seqTimeSeriesMetadata.remove(0); |
| } else if (seqTimeSeriesMetadata.isEmpty() && !unSeqTimeSeriesMetadata.isEmpty()) { |
| // only has unseq |
| firstTimeSeriesMetadata = unSeqTimeSeriesMetadata.poll(); |
| } else if (!seqTimeSeriesMetadata.isEmpty()) { |
| // has seq and unseq |
| if (orderUtils.isTakeSeqAsFirst(seqTimeSeriesMetadata.get(0).getStatistics(), |
| unSeqTimeSeriesMetadata.peek().getStatistics())) { |
| firstTimeSeriesMetadata = seqTimeSeriesMetadata.remove(0); |
| } else { |
| firstTimeSeriesMetadata = unSeqTimeSeriesMetadata.poll(); |
| } |
| } |
| } |
| |
  /**
   * Loads the TimeseriesMetadata of every remaining seq/unseq file whose time range
   * overlaps endpointTime, consuming those files from their resource lists. Files that
   * do not contain this series (null metadata) are consumed and skipped.
   */
  private void unpackAllOverlappedTsFilesToTimeSeriesMetadata(long endpointTime)
      throws IOException {
    while (!unseqFileResource.isEmpty()
        && orderUtils.isOverlapped(endpointTime, unseqFileResource.get(0))) {
      TimeseriesMetadata timeseriesMetadata =
          FileLoaderUtils.loadTimeSeriesMetadata(
              unseqFileResource.remove(0), seriesPath, context, getAnyFilter(), allSensors);
      if (timeseriesMetadata != null) {
        // unseq metadata is flagged modified, mirroring the initial unpack path
        timeseriesMetadata.setModified(true);
        timeseriesMetadata.setSeq(false);
        unSeqTimeSeriesMetadata.add(timeseriesMetadata);
      }
    }
    while (!seqFileResource.isEmpty()
        && orderUtils.isOverlapped(endpointTime,
        orderUtils.getNextSeqFileResource(seqFileResource, false))) {
      TimeseriesMetadata timeseriesMetadata =
          FileLoaderUtils.loadTimeSeriesMetadata(
              orderUtils.getNextSeqFileResource(seqFileResource, true), seriesPath, context,
              getAnyFilter(), allSensors);
      if (timeseriesMetadata != null) {
        timeseriesMetadata.setSeq(true);
        seqTimeSeriesMetadata.add(timeseriesMetadata);
      }
    }
  }
| |
| private Filter getAnyFilter() { |
| return timeFilter != null ? timeFilter : valueFilter; |
| } |
| |
  /**
   * Updates the bound of the time filter in place.
   * NOTE(review): assumes timeFilter is always a UnaryFilter here — the unchecked cast
   * will throw ClassCastException otherwise; confirm with callers.
   */
  void setTimeFilter(long timestamp) {
    ((UnaryFilter) timeFilter).setValue(timestamp);
  }
| |
  /** Returns the time filter; may be null. */
  Filter getTimeFilter() {
    return timeFilter;
  }
| |
| private class VersionPageReader { |
| |
| protected long version; |
| protected IPageReader data; |
| |
| protected boolean isSeq; |
| |
| VersionPageReader(long version, IPageReader data, boolean isSeq) { |
| this.version = version; |
| this.data = data; |
| this.isSeq = isSeq; |
| } |
| |
| Statistics getStatistics() { |
| return data.getStatistics(); |
| } |
| |
| BatchData getAllSatisfiedPageData(boolean ascending) throws IOException { |
| return data.getAllSatisfiedPageData(ascending); |
| } |
| |
| void setFilter(Filter filter) { |
| data.setFilter(filter); |
| } |
| |
| boolean isModified() { |
| return data.isModified(); |
| } |
| |
| public boolean isSeq() { |
| return isSeq; |
| } |
| } |
| |
| |
  /**
   * Abstracts every time-order-dependent decision so the reader body is identical for
   * ascending and descending queries: queue ordering, overlap checks, endpoint
   * computation and which end of the seq file list to consume.
   */
  public interface TimeOrderUtils {

    /** Key used to order cached elements (start time for asc, end time for desc). */
    long getOrderTime(Statistics<? extends Object> statistics);

    /** Order key of a whole file for this series' device. */
    long getOrderTime(TsFileResource fileResource);

    /** Boundary of a range used when checking what else overlaps it. */
    long getOverlapCheckTime(Statistics<? extends Object> range);

    /** Whether the two statistics ranges overlap in this read order. */
    boolean isOverlapped(Statistics<? extends Object> left, Statistics<? extends Object> right);

    /** Whether a time bound overlaps the given statistics range. */
    boolean isOverlapped(long time, Statistics<? extends Object> right);

    /** Whether a time bound overlaps the given file's range. */
    boolean isOverlapped(long time, TsFileResource right);

    /** Next seq file in read order; removes it from the list when isDelete is true. */
    TsFileResource getNextSeqFileResource(List<TsFileResource> seqResources, boolean isDelete);

    /** Comparator over the extracted long key, direction-aware. */
    <T> Comparator<T> comparingLong(ToLongFunction<? super T> keyExtractor);

    /** Tightest read stop time given the current bound and another range. */
    long getCurrentEndPoint(long time, Statistics<? extends Object> statistics);

    /** Tightest read stop time between a seq and an unseq range. */
    long getCurrentEndPoint(Statistics<? extends Object> seqStatistics,
        Statistics<? extends Object> unseqStatistics);

    /** Whether time lies beyond endpointTime in this read order. */
    boolean isExcessEndpoint(long time, long endpointTime);

    /**
     * Return true if taking first page reader from seq readers
     */
    boolean isTakeSeqAsFirst(Statistics<? extends Object> seqStatistics,
        Statistics<? extends Object> unseqStatistics);

    /** True for ascending reads, false for descending. */
    boolean getAscending();
  }
| |
| |
| class DescTimeOrderUtils implements TimeOrderUtils { |
| |
| @Override |
| public long getOrderTime(Statistics statistics) { |
| return statistics.getEndTime(); |
| } |
| |
| @Override |
| public long getOrderTime(TsFileResource fileResource) { |
| return fileResource.getEndTime(seriesPath.getDevice()); |
| } |
| |
| @Override |
| public long getOverlapCheckTime(Statistics range) { |
| return range.getStartTime(); |
| } |
| |
| @Override |
| public boolean isOverlapped(Statistics left, Statistics right) { |
| return left.getStartTime() <= right.getEndTime(); |
| } |
| |
| @Override |
| public boolean isOverlapped(long time, Statistics right) { |
| return time <= right.getEndTime(); |
| } |
| |
| @Override |
| public boolean isOverlapped(long time, TsFileResource right) { |
| return time <= right.getStartTime(seriesPath.getDevice()); |
| } |
| |
| @Override |
| public TsFileResource getNextSeqFileResource(List<TsFileResource> seqResources, |
| boolean isDelete) { |
| if (isDelete) { |
| return seqResources.remove(seqResources.size() - 1); |
| } |
| return seqResources.get(seqResources.size() - 1); |
| } |
| |
| @Override |
| public <T> Comparator<T> comparingLong(ToLongFunction<? super T> keyExtractor) { |
| Objects.requireNonNull(keyExtractor); |
| return (Comparator<T> & Serializable) |
| (c1, c2) -> Long.compare(keyExtractor.applyAsLong(c2), keyExtractor.applyAsLong(c1)); |
| } |
| |
| @Override |
| public long getCurrentEndPoint(long time, Statistics<? extends Object> statistics) { |
| return Math.max(time, statistics.getStartTime()); |
| } |
| |
| @Override |
| public long getCurrentEndPoint(Statistics<? extends Object> seqStatistics, |
| Statistics<? extends Object> unseqStatistics) { |
| return Math.max(seqStatistics.getStartTime(), unseqStatistics.getStartTime()); |
| } |
| |
| @Override |
| public boolean isExcessEndpoint(long time, long endpointTime) { |
| return time < endpointTime; |
| } |
| |
| @Override |
| public boolean isTakeSeqAsFirst(Statistics<? extends Object> seqStatistics, |
| Statistics<? extends Object> unseqStatistics) { |
| return seqStatistics.getEndTime() > unseqStatistics.getEndTime(); |
| } |
| |
| @Override |
| public boolean getAscending() { |
| return false; |
| } |
| } |
| |
| |
| class AscTimeOrderUtils implements TimeOrderUtils { |
| |
| @Override |
| public long getOrderTime(Statistics statistics) { |
| return statistics.getStartTime(); |
| } |
| |
| @Override |
| public long getOrderTime(TsFileResource fileResource) { |
| return fileResource.getStartTime(seriesPath.getDevice()); |
| } |
| |
| @Override |
| public long getOverlapCheckTime(Statistics range) { |
| return range.getEndTime(); |
| } |
| |
| @Override |
| public boolean isOverlapped(Statistics left, Statistics right) { |
| return left.getEndTime() >= right.getStartTime(); |
| } |
| |
| @Override |
| public boolean isOverlapped(long time, Statistics right) { |
| return time >= right.getStartTime(); |
| } |
| |
| @Override |
| public boolean isOverlapped(long time, TsFileResource right) { |
| return time >= right.getStartTime(seriesPath.getDevice()); |
| } |
| |
| @Override |
| public TsFileResource getNextSeqFileResource(List<TsFileResource> seqResources, |
| boolean isDelete) { |
| if (isDelete) { |
| return seqResources.remove(0); |
| } |
| return seqResources.get(0); |
| } |
| |
| @Override |
| public <T> Comparator<T> comparingLong(ToLongFunction<? super T> keyExtractor) { |
| Objects.requireNonNull(keyExtractor); |
| return (Comparator<T> & Serializable) |
| (c1, c2) -> Long.compare(keyExtractor.applyAsLong(c1), keyExtractor.applyAsLong(c2)); |
| } |
| |
| @Override |
| public long getCurrentEndPoint(long time, Statistics<? extends Object> statistics) { |
| return Math.min(time, statistics.getEndTime()); |
| } |
| |
| @Override |
| public long getCurrentEndPoint(Statistics<? extends Object> seqStatistics, |
| Statistics<? extends Object> unseqStatistics) { |
| return Math.min(seqStatistics.getEndTime(), unseqStatistics.getEndTime()); |
| } |
| |
| @Override |
| public boolean isExcessEndpoint(long time, long endpointTime) { |
| return time > endpointTime; |
| } |
| |
| @Override |
| public boolean isTakeSeqAsFirst(Statistics<? extends Object> seqStatistics, |
| Statistics<? extends Object> unseqStatistics) { |
| return seqStatistics.getStartTime() < unseqStatistics.getStartTime(); |
| } |
| |
| @Override |
| public boolean getAscending() { |
| return true; |
| } |
| } |
| |
| } |