| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.iotdb.db.queryengine.execution.operator.source; |
| |
| import org.apache.iotdb.commons.path.PartialPath; |
| import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext; |
| import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext; |
| import org.apache.iotdb.db.queryengine.metric.SeriesScanCostMetricSet; |
| import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.SeriesScanOptions; |
| import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering; |
| import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource; |
| import org.apache.iotdb.db.storageengine.dataregion.read.reader.chunk.MemAlignedPageReader; |
| import org.apache.iotdb.db.storageengine.dataregion.read.reader.chunk.MemPageReader; |
| import org.apache.iotdb.db.storageengine.dataregion.read.reader.common.DescPriorityMergeReader; |
| import org.apache.iotdb.db.storageengine.dataregion.read.reader.common.PriorityMergeReader; |
| import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; |
| import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException; |
| import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata; |
| import org.apache.iotdb.tsfile.file.metadata.IDeviceID; |
| import org.apache.iotdb.tsfile.file.metadata.IMetadata; |
| import org.apache.iotdb.tsfile.file.metadata.ITimeSeriesMetadata; |
| import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; |
| import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics; |
| import org.apache.iotdb.tsfile.read.TimeValuePair; |
| import org.apache.iotdb.tsfile.read.common.block.TsBlock; |
| import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder; |
| import org.apache.iotdb.tsfile.read.common.block.TsBlockUtil; |
| import org.apache.iotdb.tsfile.read.filter.basic.Filter; |
| import org.apache.iotdb.tsfile.read.reader.IPageReader; |
| import org.apache.iotdb.tsfile.read.reader.IPointReader; |
| import org.apache.iotdb.tsfile.read.reader.page.AlignedPageReader; |
| import org.apache.iotdb.tsfile.read.reader.series.PaginationController; |
| import org.apache.iotdb.tsfile.utils.TsPrimitiveType; |
| |
| import java.io.IOException; |
| import java.io.Serializable; |
| import java.util.Collections; |
| import java.util.Comparator; |
| import java.util.LinkedList; |
| import java.util.List; |
| import java.util.Objects; |
| import java.util.PriorityQueue; |
| import java.util.function.ToLongFunction; |
| |
| import static com.google.common.base.Preconditions.checkState; |
| import static org.apache.iotdb.db.queryengine.metric.SeriesScanCostMetricSet.BUILD_TSBLOCK_FROM_MERGE_READER_ALIGNED; |
| import static org.apache.iotdb.db.queryengine.metric.SeriesScanCostMetricSet.BUILD_TSBLOCK_FROM_MERGE_READER_NONALIGNED; |
| |
| public class SeriesScanUtil { |
| |
| protected final QueryContext context; |
| |
| // The path of the target series which will be scanned. |
| protected final PartialPath seriesPath; |
| |
| private final IDeviceID deviceID; |
| protected boolean isAligned = false; |
| private final TSDataType dataType; |
| |
| // inner class of SeriesReader for order purpose |
| private final TimeOrderUtils orderUtils; |
| |
| private QueryDataSource dataSource; |
| |
| // file index |
| private int curSeqFileIndex; |
| private int curUnseqFileIndex; |
| |
| // TimeSeriesMetadata cache |
| private ITimeSeriesMetadata firstTimeSeriesMetadata; |
| private final List<ITimeSeriesMetadata> seqTimeSeriesMetadata; |
| private final PriorityQueue<ITimeSeriesMetadata> unSeqTimeSeriesMetadata; |
| |
| // chunk cache |
| private IChunkMetadata firstChunkMetadata; |
| private final PriorityQueue<IChunkMetadata> cachedChunkMetadata; |
| |
| // page cache |
| private VersionPageReader firstPageReader; |
| private final List<VersionPageReader> seqPageReaders; |
| private final PriorityQueue<VersionPageReader> unSeqPageReaders; |
| |
| // point cache |
| private final PriorityMergeReader mergeReader; |
| |
| // result cache |
| private boolean hasCachedNextOverlappedPage; |
| private TsBlock cachedTsBlock; |
| |
| protected SeriesScanOptions scanOptions; |
| private final PaginationController paginationController; |
| |
| private static final SeriesScanCostMetricSet SERIES_SCAN_COST_METRIC_SET = |
| SeriesScanCostMetricSet.getInstance(); |
| |
| public SeriesScanUtil( |
| PartialPath seriesPath, |
| Ordering scanOrder, |
| SeriesScanOptions scanOptions, |
| FragmentInstanceContext context) { |
| this.seriesPath = seriesPath; |
| this.deviceID = seriesPath.getIDeviceID(); |
| this.dataType = seriesPath.getSeriesType(); |
| |
| this.scanOptions = scanOptions; |
| this.paginationController = scanOptions.getPaginationController(); |
| |
| this.context = context; |
| |
| if (scanOrder.isAscending()) { |
| this.orderUtils = new AscTimeOrderUtils(); |
| this.mergeReader = getPriorityMergeReader(); |
| } else { |
| this.orderUtils = new DescTimeOrderUtils(); |
| this.mergeReader = getDescPriorityMergeReader(); |
| } |
| |
| // init TimeSeriesMetadata materializer |
| this.seqTimeSeriesMetadata = new LinkedList<>(); |
| this.unSeqTimeSeriesMetadata = |
| new PriorityQueue<>( |
| orderUtils.comparingLong( |
| timeSeriesMetadata -> orderUtils.getOrderTime(timeSeriesMetadata.getStatistics()))); |
| |
| // init ChunkMetadata materializer |
| this.cachedChunkMetadata = |
| new PriorityQueue<>( |
| orderUtils.comparingLong( |
| chunkMetadata -> orderUtils.getOrderTime(chunkMetadata.getStatistics()))); |
| |
| // init PageReader materializer |
| this.seqPageReaders = new LinkedList<>(); |
| this.unSeqPageReaders = |
| new PriorityQueue<>( |
| orderUtils.comparingLong( |
| versionPageReader -> orderUtils.getOrderTime(versionPageReader.getStatistics()))); |
| } |
| |
| /** |
| * Initialize the query data source. This method should be called <b>before any other methods</b>. |
| * |
| * @param dataSource the query data source |
| */ |
| public void initQueryDataSource(QueryDataSource dataSource) { |
| dataSource.fillOrderIndexes(deviceID, orderUtils.getAscending()); |
| this.dataSource = dataSource; |
| |
| // updated filter concerning TTL |
| scanOptions.setTTL(dataSource.getDataTTL()); |
| |
| // init file index |
| orderUtils.setCurSeqFileIndex(dataSource); |
| curUnseqFileIndex = 0; |
| } |
| |
| protected PriorityMergeReader getPriorityMergeReader() { |
| return new PriorityMergeReader(); |
| } |
| |
| protected DescPriorityMergeReader getDescPriorityMergeReader() { |
| return new DescPriorityMergeReader(); |
| } |
| |
| ///////////////////////////////////////////////////////////////////////////////////////////////// |
| // file level methods |
| ///////////////////////////////////////////////////////////////////////////////////////////////// |
| |
| public boolean hasNextFile() throws IOException { |
| if (!paginationController.hasCurLimit()) { |
| return false; |
| } |
| |
| if (!unSeqPageReaders.isEmpty() |
| || firstPageReader != null |
| || mergeReader.hasNextTimeValuePair()) { |
| throw new IllegalStateException( |
| "all cached pages should be consumed first unSeqPageReaders.isEmpty() is " |
| + unSeqPageReaders.isEmpty() |
| + " firstPageReader != null is " |
| + (firstPageReader != null) |
| + " mergeReader.hasNextTimeValuePair() = " |
| + mergeReader.hasNextTimeValuePair()); |
| } |
| |
| if (firstChunkMetadata != null || !cachedChunkMetadata.isEmpty()) { |
| throw new IllegalStateException("all cached chunks should be consumed first"); |
| } |
| |
| if (firstTimeSeriesMetadata != null) { |
| return true; |
| } |
| |
| while (firstTimeSeriesMetadata == null |
| && (orderUtils.hasNextSeqResource() |
| || orderUtils.hasNextUnseqResource() |
| || !seqTimeSeriesMetadata.isEmpty() |
| || !unSeqTimeSeriesMetadata.isEmpty())) { |
| // init first time series metadata whose startTime is minimum |
| tryToUnpackAllOverlappedFilesToTimeSeriesMetadata(); |
| // filter file based on push-down conditions |
| filterFirstTimeSeriesMetadata(); |
| } |
| |
| return firstTimeSeriesMetadata != null; |
| } |
| |
| private boolean currentFileOverlapped() { |
| Statistics<? extends Serializable> fileStatistics = firstTimeSeriesMetadata.getStatistics(); |
| return !seqTimeSeriesMetadata.isEmpty() |
| && orderUtils.isOverlapped(fileStatistics, seqTimeSeriesMetadata.get(0).getStatistics()) |
| || !unSeqTimeSeriesMetadata.isEmpty() |
| && orderUtils.isOverlapped( |
| fileStatistics, unSeqTimeSeriesMetadata.peek().getStatistics()); |
| } |
| |
| public boolean canUseCurrentFileStatistics() { |
| checkState(firstTimeSeriesMetadata != null, "no first file"); |
| |
| if (currentFileOverlapped() || firstTimeSeriesMetadata.isModified()) { |
| return false; |
| } |
| return filterAllSatisfy(scanOptions.getGlobalTimeFilter(), firstTimeSeriesMetadata) |
| && filterAllSatisfy(scanOptions.getPushDownFilter(), firstTimeSeriesMetadata); |
| } |
| |
| @SuppressWarnings("squid:S3740") |
| public Statistics currentFileTimeStatistics() { |
| return firstTimeSeriesMetadata.getTimeStatistics(); |
| } |
| |
| @SuppressWarnings("squid:S3740") |
| public Statistics currentFileStatistics(int index) { |
| return firstTimeSeriesMetadata.getMeasurementStatistics(index).orElse(null); |
| } |
| |
| public void skipCurrentFile() { |
| firstTimeSeriesMetadata = null; |
| } |
| |
| ///////////////////////////////////////////////////////////////////////////////////////////////// |
| // chunk level methods |
| ///////////////////////////////////////////////////////////////////////////////////////////////// |
| |
| /** |
| * This method should be called after hasNextFile() until no next chunk, make sure that all |
| * overlapped chunks are consumed. |
| * |
| * @throws IllegalStateException illegal state |
| */ |
| public boolean hasNextChunk() throws IOException { |
| if (!paginationController.hasCurLimit()) { |
| return false; |
| } |
| |
| if (!unSeqPageReaders.isEmpty() |
| || firstPageReader != null |
| || mergeReader.hasNextTimeValuePair()) { |
| throw new IllegalStateException( |
| "all cached pages should be consumed first unSeqPageReaders.isEmpty() is " |
| + unSeqPageReaders.isEmpty() |
| + " firstPageReader != null is " |
| + (firstPageReader != null) |
| + " mergeReader.hasNextTimeValuePair() = " |
| + mergeReader.hasNextTimeValuePair()); |
| } |
| |
| if (firstChunkMetadata != null) { |
| return true; |
| // hasNextFile() has not been invoked |
| } else if (firstTimeSeriesMetadata == null && cachedChunkMetadata.isEmpty()) { |
| return false; |
| } |
| |
| while (firstChunkMetadata == null && (!cachedChunkMetadata.isEmpty() || hasNextFile())) { |
| initFirstChunkMetadata(); |
| // filter chunk based on push-down conditions |
| filterFirstChunkMetadata(); |
| } |
| return firstChunkMetadata != null; |
| } |
| |
| private void filterFirstChunkMetadata() { |
| if (firstChunkMetadata == null) { |
| return; |
| } |
| |
| if (currentChunkOverlapped() || firstChunkMetadata.isModified()) { |
| return; |
| } |
| |
| // globalTimeFilter.canSkip() must be FALSE |
| Filter pushDownFilter = scanOptions.getPushDownFilter(); |
| if (pushDownFilter != null && pushDownFilter.canSkip(firstChunkMetadata)) { |
| skipCurrentChunk(); |
| return; |
| } |
| |
| Filter globalTimeFilter = scanOptions.getGlobalTimeFilter(); |
| if (filterAllSatisfy(globalTimeFilter, firstChunkMetadata) |
| && filterAllSatisfy(pushDownFilter, firstChunkMetadata) |
| && timeAllSelected(firstChunkMetadata)) { |
| long rowCount = firstChunkMetadata.getStatistics().getCount(); |
| if (paginationController.hasCurOffset(rowCount)) { |
| skipCurrentChunk(); |
| paginationController.consumeOffset(rowCount); |
| } |
| } |
| } |
| |
| /** construct first chunk metadata */ |
| private void initFirstChunkMetadata() throws IOException { |
| if (firstTimeSeriesMetadata != null) { |
| /* |
| * try to unpack all overlapped TimeSeriesMetadata to cachedChunkMetadata |
| */ |
| unpackAllOverlappedTsFilesToTimeSeriesMetadata( |
| orderUtils.getOverlapCheckTime(firstTimeSeriesMetadata.getStatistics())); |
| unpackAllOverlappedTimeSeriesMetadataToCachedChunkMetadata( |
| orderUtils.getOverlapCheckTime(firstTimeSeriesMetadata.getStatistics()), true); |
| } else { |
| /* |
| * first time series metadata is already unpacked, consume cached ChunkMetadata |
| */ |
| while (!cachedChunkMetadata.isEmpty()) { |
| firstChunkMetadata = cachedChunkMetadata.peek(); |
| unpackAllOverlappedTsFilesToTimeSeriesMetadata( |
| orderUtils.getOverlapCheckTime(firstChunkMetadata.getStatistics())); |
| unpackAllOverlappedTimeSeriesMetadataToCachedChunkMetadata( |
| orderUtils.getOverlapCheckTime(firstChunkMetadata.getStatistics()), false); |
| if (firstChunkMetadata.equals(cachedChunkMetadata.peek())) { |
| firstChunkMetadata = cachedChunkMetadata.poll(); |
| break; |
| } |
| } |
| } |
| } |
| |
| private void unpackAllOverlappedTimeSeriesMetadataToCachedChunkMetadata( |
| long endpointTime, boolean init) { |
| while (!seqTimeSeriesMetadata.isEmpty() |
| && orderUtils.isOverlapped(endpointTime, seqTimeSeriesMetadata.get(0).getStatistics())) { |
| unpackOneTimeSeriesMetadata(seqTimeSeriesMetadata.remove(0)); |
| } |
| while (!unSeqTimeSeriesMetadata.isEmpty() |
| && orderUtils.isOverlapped(endpointTime, unSeqTimeSeriesMetadata.peek().getStatistics())) { |
| unpackOneTimeSeriesMetadata(unSeqTimeSeriesMetadata.poll()); |
| } |
| |
| if (firstTimeSeriesMetadata != null |
| && orderUtils.isOverlapped(endpointTime, firstTimeSeriesMetadata.getStatistics())) { |
| unpackOneTimeSeriesMetadata(firstTimeSeriesMetadata); |
| firstTimeSeriesMetadata = null; |
| } |
| |
| if (init && firstChunkMetadata == null && !cachedChunkMetadata.isEmpty()) { |
| firstChunkMetadata = cachedChunkMetadata.poll(); |
| } |
| } |
| |
| protected void unpackOneTimeSeriesMetadata(ITimeSeriesMetadata timeSeriesMetadata) { |
| List<IChunkMetadata> chunkMetadataList = |
| FileLoaderUtils.loadChunkMetadataList(timeSeriesMetadata); |
| chunkMetadataList.forEach(chunkMetadata -> chunkMetadata.setSeq(timeSeriesMetadata.isSeq())); |
| |
| cachedChunkMetadata.addAll(chunkMetadataList); |
| } |
| |
| private boolean currentChunkOverlapped() { |
| Statistics<? extends Serializable> chunkStatistics = firstChunkMetadata.getStatistics(); |
| return !cachedChunkMetadata.isEmpty() |
| && orderUtils.isOverlapped(chunkStatistics, cachedChunkMetadata.peek().getStatistics()); |
| } |
| |
| public boolean canUseCurrentChunkStatistics() { |
| checkState(firstChunkMetadata != null, "no first chunk"); |
| |
| if (currentChunkOverlapped() || firstChunkMetadata.isModified()) { |
| return false; |
| } |
| return filterAllSatisfy(scanOptions.getGlobalTimeFilter(), firstChunkMetadata) |
| && filterAllSatisfy(scanOptions.getPushDownFilter(), firstChunkMetadata); |
| } |
| |
| @SuppressWarnings("squid:S3740") |
| public Statistics currentChunkTimeStatistics() { |
| return firstChunkMetadata.getTimeStatistics(); |
| } |
| |
| @SuppressWarnings("squid:S3740") |
| public Statistics currentChunkStatistics(int index) { |
| return firstChunkMetadata.getMeasurementStatistics(index).orElse(null); |
| } |
| |
| public void skipCurrentChunk() { |
| firstChunkMetadata = null; |
| } |
| |
| ///////////////////////////////////////////////////////////////////////////////////////////////// |
| // page level methods |
| ///////////////////////////////////////////////////////////////////////////////////////////////// |
| |
| /** |
| * This method should be called after hasNextChunk() until no next page, make sure that all |
| * overlapped pages are consumed |
| */ |
| @SuppressWarnings("squid:S3776") |
| // Suppress high Cognitive Complexity warning |
| public boolean hasNextPage() throws IOException { |
| if (!paginationController.hasCurLimit()) { |
| return false; |
| } |
| |
| /* |
| * has overlapped data before |
| */ |
| if (hasCachedNextOverlappedPage) { |
| return true; |
| } else if (mergeReader.hasNextTimeValuePair() || firstPageOverlapped()) { |
| if (hasNextOverlappedPage()) { |
| cachedTsBlock = nextOverlappedPage(); |
| if (cachedTsBlock != null && !cachedTsBlock.isEmpty()) { |
| hasCachedNextOverlappedPage = true; |
| return true; |
| } |
| } |
| } |
| |
| if (firstPageReader != null) { |
| return true; |
| } |
| |
| /* |
| * construct first page reader |
| */ |
| if (firstChunkMetadata != null) { |
| /* |
| * try to unpack all overlapped ChunkMetadata to cachedPageReaders |
| */ |
| unpackAllOverlappedChunkMetadataToPageReaders( |
| orderUtils.getOverlapCheckTime(firstChunkMetadata.getStatistics()), true); |
| } else { |
| /* |
| * first chunk metadata is already unpacked, consume cached pages |
| */ |
| initFirstPageReader(); |
| } |
| |
| if (isExistOverlappedPage()) { |
| return true; |
| } |
| |
| // make sure firstPageReader won't be null while the unSeqPageReaders has more cached page |
| // readers |
| while (firstPageReader == null && (!seqPageReaders.isEmpty() || !unSeqPageReaders.isEmpty())) { |
| |
| initFirstPageReader(); |
| |
| if (isExistOverlappedPage()) { |
| return true; |
| } |
| } |
| return firstPageReader != null; |
| } |
| |
| private boolean isExistOverlappedPage() throws IOException { |
| if (firstPageOverlapped()) { |
| /* |
| * next page is overlapped, read overlapped data and cache it |
| */ |
| if (hasNextOverlappedPage()) { |
| cachedTsBlock = nextOverlappedPage(); |
| if (cachedTsBlock != null && !cachedTsBlock.isEmpty()) { |
| hasCachedNextOverlappedPage = true; |
| return true; |
| } |
| } |
| } |
| return false; |
| } |
| |
| private boolean firstPageOverlapped() throws IOException { |
| if (firstPageReader == null) { |
| return false; |
| } |
| |
| long endpointTime = orderUtils.getOverlapCheckTime(firstPageReader.getStatistics()); |
| unpackAllOverlappedTsFilesToTimeSeriesMetadata(endpointTime); |
| unpackAllOverlappedTimeSeriesMetadataToCachedChunkMetadata(endpointTime, false); |
| unpackAllOverlappedChunkMetadataToPageReaders(endpointTime, false); |
| |
| return (!seqPageReaders.isEmpty() |
| && orderUtils.isOverlapped( |
| firstPageReader.getStatistics(), seqPageReaders.get(0).getStatistics())) |
| || (!unSeqPageReaders.isEmpty() |
| && orderUtils.isOverlapped( |
| firstPageReader.getStatistics(), unSeqPageReaders.peek().getStatistics()) |
| || (mergeReader.hasNextTimeValuePair() |
| && orderUtils.isOverlapped( |
| mergeReader.currentTimeValuePair().getTimestamp(), |
| firstPageReader.getStatistics()))); |
| } |
| |
| private void unpackAllOverlappedChunkMetadataToPageReaders(long endpointTime, boolean init) |
| throws IOException { |
| if (firstChunkMetadata != null |
| && orderUtils.isOverlapped(endpointTime, firstChunkMetadata.getStatistics())) { |
| unpackOneChunkMetaData(firstChunkMetadata); |
| firstChunkMetadata = null; |
| } |
| // In case unpacking too many sequence chunks |
| boolean hasMeetSeq = false; |
| while (!cachedChunkMetadata.isEmpty() |
| && orderUtils.isOverlapped(endpointTime, cachedChunkMetadata.peek().getStatistics())) { |
| if (cachedChunkMetadata.peek().isSeq() && hasMeetSeq) { |
| break; |
| } else if (cachedChunkMetadata.peek().isSeq()) { |
| hasMeetSeq = true; |
| } |
| unpackOneChunkMetaData(cachedChunkMetadata.poll()); |
| } |
| if (init |
| && firstPageReader == null |
| && (!seqPageReaders.isEmpty() || !unSeqPageReaders.isEmpty())) { |
| initFirstPageReader(); |
| } |
| } |
| |
| private void unpackOneChunkMetaData(IChunkMetadata chunkMetaData) throws IOException { |
| List<IPageReader> pageReaderList = |
| FileLoaderUtils.loadPageReaderList(chunkMetaData, scanOptions.getGlobalTimeFilter()); |
| |
| // init TsBlockBuilder for each page reader |
| pageReaderList.forEach(p -> p.initTsBlockBuilder(getTsDataTypeList())); |
| |
| if (chunkMetaData.isSeq()) { |
| if (orderUtils.getAscending()) { |
| for (IPageReader iPageReader : pageReaderList) { |
| seqPageReaders.add( |
| new VersionPageReader( |
| context, |
| chunkMetaData.getVersion(), |
| chunkMetaData.getOffsetOfChunkHeader(), |
| iPageReader, |
| true)); |
| } |
| } else { |
| for (int i = pageReaderList.size() - 1; i >= 0; i--) { |
| seqPageReaders.add( |
| new VersionPageReader( |
| context, |
| chunkMetaData.getVersion(), |
| chunkMetaData.getOffsetOfChunkHeader(), |
| pageReaderList.get(i), |
| true)); |
| } |
| } |
| } else { |
| pageReaderList.forEach( |
| pageReader -> |
| unSeqPageReaders.add( |
| new VersionPageReader( |
| context, |
| chunkMetaData.getVersion(), |
| chunkMetaData.getOffsetOfChunkHeader(), |
| pageReader, |
| false))); |
| } |
| } |
| |
| @SuppressWarnings("unchecked") |
| private boolean currentPageOverlapped() throws IOException { |
| // This method should be called after calling hasNextPage. |
| // hasNextPage may cache firstPageReader if it is not overlapped or cached a tsBlock if the |
| // first page is overlapped |
| |
| // has cached overlapped page |
| if (hasCachedNextOverlappedPage) { |
| return true; |
| } |
| |
| // has a non-overlapped page in firstPageReader |
| if (mergeReader.hasNextTimeValuePair()) { |
| long mergeReaderTime = mergeReader.currentTimeValuePair().getTimestamp(); |
| if ((orderUtils.getAscending() |
| && mergeReaderTime <= firstPageReader.getStatistics().getEndTime()) |
| || (!orderUtils.getAscending() |
| && mergeReaderTime >= firstPageReader.getStatistics().getStartTime())) { |
| throw new IllegalStateException("overlapped data should be consumed first"); |
| } |
| } |
| |
| Statistics<? extends Serializable> firstPageStatistics = firstPageReader.getStatistics(); |
| return !unSeqPageReaders.isEmpty() |
| && orderUtils.isOverlapped(firstPageStatistics, unSeqPageReaders.peek().getStatistics()); |
| } |
| |
| @SuppressWarnings("unchecked") |
| public boolean canUseCurrentPageStatistics() throws IOException { |
| Statistics<? extends Serializable> currentPageStatistics = currentPageTimeStatistics(); |
| if (currentPageStatistics == null) { |
| return false; |
| } |
| if (currentPageOverlapped() || firstPageReader.isModified()) { |
| return false; |
| } |
| return filterAllSatisfy(scanOptions.getGlobalTimeFilter(), firstPageReader.data) |
| && filterAllSatisfy(scanOptions.getPushDownFilter(), firstPageReader.data); |
| } |
| |
| @SuppressWarnings("squid:S3740") |
| public Statistics currentPageTimeStatistics() { |
| if (firstPageReader == null) { |
| return null; |
| } |
| return firstPageReader.getTimeStatistics(); |
| } |
| |
| @SuppressWarnings("squid:S3740") |
| public Statistics currentPageStatistics(int index) { |
| if (firstPageReader == null) { |
| return null; |
| } |
| return firstPageReader.getMeasurementStatistics(index); |
| } |
| |
| public void skipCurrentPage() { |
| firstPageReader = null; |
| } |
| |
| public TsBlock nextPage() throws IOException { |
| |
| if (hasCachedNextOverlappedPage) { |
| hasCachedNextOverlappedPage = false; |
| TsBlock res = |
| applyPushDownFilterAndLimitOffset( |
| cachedTsBlock, scanOptions.getPushDownFilter(), paginationController); |
| cachedTsBlock = null; |
| |
| // cached tsblock has handled by pagination controller & push down filter, return directly |
| return res; |
| } else { |
| // next page is not overlapped, push down filter & limit offset |
| filterFirstPageReader(); |
| if (firstPageReader == null) { |
| return null; |
| } |
| |
| firstPageReader.addPushDownFilter(scanOptions.getPushDownFilter()); |
| TsBlock tsBlock; |
| if (orderUtils.getAscending()) { |
| firstPageReader.setLimitOffset(paginationController); |
| tsBlock = firstPageReader.getAllSatisfiedPageData(orderUtils.getAscending()); |
| } else { |
| tsBlock = |
| paginationController.applyTsBlock( |
| firstPageReader.getAllSatisfiedPageData(orderUtils.getAscending())); |
| } |
| |
| firstPageReader = null; |
| |
| return tsBlock; |
| } |
| } |
| |
| private TsBlock applyPushDownFilterAndLimitOffset( |
| TsBlock tsBlock, Filter pushDownFilter, PaginationController paginationController) { |
| if (pushDownFilter == null) { |
| return paginationController.applyTsBlock(tsBlock); |
| } |
| return TsBlockUtil.applyFilterAndLimitOffsetToTsBlock( |
| tsBlock, new TsBlockBuilder(getTsDataTypeList()), pushDownFilter, paginationController); |
| } |
| |
| private void filterFirstPageReader() { |
| if (firstPageReader == null) { |
| return; |
| } |
| |
| IPageReader pageReader = firstPageReader.data; |
| if (pageReader.isModified()) { |
| return; |
| } |
| |
| // globalTimeFilter.canSkip() must be FALSE |
| Filter pushDownFilter = scanOptions.getPushDownFilter(); |
| if (pushDownFilter != null && pushDownFilter.canSkip(pageReader)) { |
| skipCurrentPage(); |
| return; |
| } |
| |
| Filter globalTimeFilter = scanOptions.getGlobalTimeFilter(); |
| if (filterAllSatisfy(globalTimeFilter, pageReader) |
| && filterAllSatisfy(pushDownFilter, pageReader) |
| && timeAllSelected(pageReader)) { |
| long rowCount = pageReader.getStatistics().getCount(); |
| if (paginationController.hasCurOffset(rowCount)) { |
| skipCurrentPage(); |
| paginationController.consumeOffset(rowCount); |
| } |
| } |
| } |
| |
| /** |
| * read overlapped data till currentLargestEndTime in mergeReader, if current batch does not |
| * contain data, read till next currentLargestEndTime again |
| */ |
| @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning |
| private boolean hasNextOverlappedPage() throws IOException { |
| long startTime = System.nanoTime(); |
| try { |
| if (hasCachedNextOverlappedPage) { |
| return true; |
| } |
| |
| tryToPutAllDirectlyOverlappedUnseqPageReadersIntoMergeReader(); |
| |
| while (true) { |
| |
| // may has overlapped data |
| if (mergeReader.hasNextTimeValuePair()) { |
| |
| TsBlockBuilder builder = new TsBlockBuilder(getTsDataTypeList()); |
| long currentPageEndPointTime = mergeReader.getCurrentReadStopTime(); |
| while (mergeReader.hasNextTimeValuePair()) { |
| |
| /* |
| * get current first point in mergeReader, this maybe overlapped later |
| */ |
| TimeValuePair timeValuePair = mergeReader.currentTimeValuePair(); |
| |
| if (orderUtils.isExcessEndpoint( |
| timeValuePair.getTimestamp(), currentPageEndPointTime)) { |
| /* |
| * when the merged point excesses the currentPageEndPointTime, we have read all overlapped data before currentPageEndPointTime |
| * 1. has cached batch data, we don't need to read more data, just use the cached data later |
| * 2. has first page reader, which means first page reader last endTime < currentTimeValuePair.getTimestamp(), |
| * we could just use the first page reader later |
| * 3. sequence page reader is not empty, which means first page reader last endTime < currentTimeValuePair.getTimestamp(), |
| * we could use the first sequence page reader later |
| */ |
| if (!builder.isEmpty() || firstPageReader != null || !seqPageReaders.isEmpty()) { |
| break; |
| } |
| // so, we don't have other data except mergeReader |
| currentPageEndPointTime = mergeReader.getCurrentReadStopTime(); |
| } |
| |
| // unpack all overlapped data for the first timeValuePair |
| unpackAllOverlappedTsFilesToTimeSeriesMetadata(timeValuePair.getTimestamp()); |
| unpackAllOverlappedTimeSeriesMetadataToCachedChunkMetadata( |
| timeValuePair.getTimestamp(), false); |
| unpackAllOverlappedChunkMetadataToPageReaders(timeValuePair.getTimestamp(), false); |
| unpackAllOverlappedUnseqPageReadersToMergeReader(timeValuePair.getTimestamp()); |
| |
| // update if there are unpacked unSeqPageReaders |
| timeValuePair = mergeReader.currentTimeValuePair(); |
| |
| // from now, the unsequence reader is all unpacked, so we don't need to consider it |
| // we has first page reader now |
| if (firstPageReader != null) { |
| // if current timeValuePair excesses the first page reader's end time, we just use the |
| // cached data |
| if ((orderUtils.getAscending() |
| && timeValuePair.getTimestamp() |
| > firstPageReader.getStatistics().getEndTime()) |
| || (!orderUtils.getAscending() |
| && timeValuePair.getTimestamp() |
| < firstPageReader.getStatistics().getStartTime())) { |
| hasCachedNextOverlappedPage = !builder.isEmpty(); |
| cachedTsBlock = builder.build(); |
| return hasCachedNextOverlappedPage; |
| } else if (orderUtils.isOverlapped( |
| timeValuePair.getTimestamp(), firstPageReader.getStatistics())) { |
| // current timeValuePair is overlapped with firstPageReader, add it to merged reader |
| // and update endTime to the max end time |
| mergeReader.addReader( |
| getPointReader( |
| firstPageReader.getAllSatisfiedPageData(orderUtils.getAscending())), |
| firstPageReader.version, |
| orderUtils.getOverlapCheckTime(firstPageReader.getStatistics())); |
| context |
| .getQueryStatistics() |
| .pageReaderMaxUsedMemorySize |
| .updateAndGet(v -> Math.max(v, mergeReader.getUsedMemorySize())); |
| currentPageEndPointTime = |
| updateEndPointTime(currentPageEndPointTime, firstPageReader); |
| firstPageReader = null; |
| } |
| } |
| |
| // the seq page readers is not empty, just like first page reader |
| if (!seqPageReaders.isEmpty()) { |
| if ((orderUtils.getAscending() |
| && timeValuePair.getTimestamp() |
| > seqPageReaders.get(0).getStatistics().getEndTime()) |
| || (!orderUtils.getAscending() |
| && timeValuePair.getTimestamp() |
| < seqPageReaders.get(0).getStatistics().getStartTime())) { |
| hasCachedNextOverlappedPage = !builder.isEmpty(); |
| cachedTsBlock = builder.build(); |
| return hasCachedNextOverlappedPage; |
| } else if (orderUtils.isOverlapped( |
| timeValuePair.getTimestamp(), seqPageReaders.get(0).getStatistics())) { |
| VersionPageReader pageReader = seqPageReaders.remove(0); |
| mergeReader.addReader( |
| getPointReader(pageReader.getAllSatisfiedPageData(orderUtils.getAscending())), |
| pageReader.version, |
| orderUtils.getOverlapCheckTime(pageReader.getStatistics())); |
| context |
| .getQueryStatistics() |
| .pageReaderMaxUsedMemorySize |
| .updateAndGet(v -> Math.max(v, mergeReader.getUsedMemorySize())); |
| currentPageEndPointTime = updateEndPointTime(currentPageEndPointTime, pageReader); |
| } |
| } |
| |
| // get the latest first point in mergeReader |
| timeValuePair = mergeReader.nextTimeValuePair(); |
| addTimeValuePairToResult(timeValuePair, builder); |
| } |
| hasCachedNextOverlappedPage = !builder.isEmpty(); |
| cachedTsBlock = builder.build(); |
| /* |
| * if current overlapped page has valid data, return, otherwise read next overlapped page |
| */ |
| if (hasCachedNextOverlappedPage) { |
| return true; |
| } else if (mergeReader.hasNextTimeValuePair()) { |
| // condition: seqPage.endTime < mergeReader.currentTime |
| return false; |
| } |
| } else { |
| return false; |
| } |
| } |
| } finally { |
| SERIES_SCAN_COST_METRIC_SET.recordSeriesScanCost( |
| isAligned |
| ? BUILD_TSBLOCK_FROM_MERGE_READER_ALIGNED |
| : BUILD_TSBLOCK_FROM_MERGE_READER_NONALIGNED, |
| System.nanoTime() - startTime); |
| } |
| } |
| |
| private long updateEndPointTime(long currentPageEndPointTime, VersionPageReader pageReader) { |
| if (orderUtils.getAscending()) { |
| return Math.min(currentPageEndPointTime, pageReader.getStatistics().getEndTime()); |
| } else { |
| return Math.max(currentPageEndPointTime, pageReader.getStatistics().getStartTime()); |
| } |
| } |
| |
| private void tryToPutAllDirectlyOverlappedUnseqPageReadersIntoMergeReader() throws IOException { |
| |
| /* |
| * no cached page readers |
| */ |
| if (firstPageReader == null && unSeqPageReaders.isEmpty() && seqPageReaders.isEmpty()) { |
| return; |
| } |
| |
| /* |
| * init firstPageReader |
| */ |
| if (firstPageReader == null) { |
| initFirstPageReader(); |
| } |
| |
| long currentPageEndpointTime; |
| if (mergeReader.hasNextTimeValuePair()) { |
| currentPageEndpointTime = mergeReader.getCurrentReadStopTime(); |
| } else { |
| currentPageEndpointTime = orderUtils.getOverlapCheckTime(firstPageReader.getStatistics()); |
| } |
| |
| /* |
| * put all currently directly overlapped unseq page reader to merge reader |
| */ |
| unpackAllOverlappedUnseqPageReadersToMergeReader(currentPageEndpointTime); |
| } |
| |
| private void addTimeValuePairToResult(TimeValuePair timeValuePair, TsBlockBuilder builder) { |
| builder.getTimeColumnBuilder().writeLong(timeValuePair.getTimestamp()); |
| switch (dataType) { |
| case BOOLEAN: |
| builder.getColumnBuilder(0).writeBoolean(timeValuePair.getValue().getBoolean()); |
| break; |
| case INT32: |
| builder.getColumnBuilder(0).writeInt(timeValuePair.getValue().getInt()); |
| break; |
| case INT64: |
| builder.getColumnBuilder(0).writeLong(timeValuePair.getValue().getLong()); |
| break; |
| case FLOAT: |
| builder.getColumnBuilder(0).writeFloat(timeValuePair.getValue().getFloat()); |
| break; |
| case DOUBLE: |
| builder.getColumnBuilder(0).writeDouble(timeValuePair.getValue().getDouble()); |
| break; |
| case TEXT: |
| builder.getColumnBuilder(0).writeBinary(timeValuePair.getValue().getBinary()); |
| break; |
| case VECTOR: |
| TsPrimitiveType[] values = timeValuePair.getValue().getVector(); |
| for (int i = 0; i < values.length; i++) { |
| if (values[i] == null) { |
| builder.getColumnBuilder(i).appendNull(); |
| } else { |
| builder.getColumnBuilder(i).writeTsPrimitiveType(values[i]); |
| } |
| } |
| break; |
| default: |
| throw new UnSupportedDataTypeException(String.valueOf(dataType)); |
| } |
| builder.declarePosition(); |
| } |
| |
| private void initFirstPageReader() throws IOException { |
| while (this.firstPageReader == null) { |
| VersionPageReader firstPageReader = getFirstPageReaderFromCachedReaders(); |
| |
| // unpack overlapped page using current page reader |
| if (firstPageReader != null) { |
| long overlapCheckTime = orderUtils.getOverlapCheckTime(firstPageReader.getStatistics()); |
| unpackAllOverlappedTsFilesToTimeSeriesMetadata(overlapCheckTime); |
| unpackAllOverlappedTimeSeriesMetadataToCachedChunkMetadata(overlapCheckTime, false); |
| unpackAllOverlappedChunkMetadataToPageReaders(overlapCheckTime, false); |
| |
| // this page after unpacking must be the first page |
| if (firstPageReader.equals(getFirstPageReaderFromCachedReaders())) { |
| this.firstPageReader = firstPageReader; |
| if (!seqPageReaders.isEmpty() && firstPageReader.equals(seqPageReaders.get(0))) { |
| seqPageReaders.remove(0); |
| break; |
| } else if (!unSeqPageReaders.isEmpty() |
| && firstPageReader.equals(unSeqPageReaders.peek())) { |
| unSeqPageReaders.poll(); |
| break; |
| } |
| } |
| } else { |
| return; |
| } |
| } |
| } |
| |
| // We use get() and peek() here in case it's not the first page reader before unpacking |
| private VersionPageReader getFirstPageReaderFromCachedReaders() { |
| VersionPageReader firstPageReader = null; |
| if (!seqPageReaders.isEmpty() && !unSeqPageReaders.isEmpty()) { |
| if (orderUtils.isTakeSeqAsFirst( |
| seqPageReaders.get(0).getStatistics(), unSeqPageReaders.peek().getStatistics())) { |
| firstPageReader = seqPageReaders.get(0); |
| } else { |
| firstPageReader = unSeqPageReaders.peek(); |
| } |
| } else if (!seqPageReaders.isEmpty()) { |
| firstPageReader = seqPageReaders.get(0); |
| } else if (!unSeqPageReaders.isEmpty()) { |
| firstPageReader = unSeqPageReaders.peek(); |
| } |
| return firstPageReader; |
| } |
| |
| private void unpackAllOverlappedUnseqPageReadersToMergeReader(long endpointTime) |
| throws IOException { |
| while (!unSeqPageReaders.isEmpty() |
| && orderUtils.isOverlapped(endpointTime, unSeqPageReaders.peek().data.getStatistics())) { |
| putPageReaderToMergeReader(unSeqPageReaders.poll()); |
| } |
| if (firstPageReader != null |
| && !firstPageReader.isSeq() |
| && orderUtils.isOverlapped(endpointTime, firstPageReader.getStatistics())) { |
| putPageReaderToMergeReader(firstPageReader); |
| firstPageReader = null; |
| } |
| } |
| |
| private void putPageReaderToMergeReader(VersionPageReader pageReader) throws IOException { |
| mergeReader.addReader( |
| getPointReader(pageReader.getAllSatisfiedPageData(orderUtils.getAscending())), |
| pageReader.version, |
| orderUtils.getOverlapCheckTime(pageReader.getStatistics())); |
| context |
| .getQueryStatistics() |
| .pageReaderMaxUsedMemorySize |
| .updateAndGet(v -> Math.max(v, mergeReader.getUsedMemorySize())); |
| } |
| |
| private TsBlock nextOverlappedPage() throws IOException { |
| if (hasCachedNextOverlappedPage || hasNextOverlappedPage()) { |
| hasCachedNextOverlappedPage = false; |
| return cachedTsBlock; |
| } |
| throw new IOException("No more batch data"); |
| } |
| |
| /** |
| * unpack all overlapped seq/unseq files and find the first TimeSeriesMetadata |
| * |
| * <p>Because there may be too many files in the scenario used by the user, we cannot open all the |
| * chunks at once, which may cause OOM, so we can only unpack one file at a time when needed. This |
| * approach is likely to be ubiquitous, but it keeps the system running smoothly |
| * |
| * @throws IOException exception in unpacking |
| */ |
| @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning |
| private void tryToUnpackAllOverlappedFilesToTimeSeriesMetadata() throws IOException { |
| /* |
| * Fill sequence TimeSeriesMetadata List until it is not empty |
| */ |
| while (seqTimeSeriesMetadata.isEmpty() && orderUtils.hasNextSeqResource()) { |
| unpackSeqTsFileResource(); |
| } |
| |
| /* |
| * Fill unSequence TimeSeriesMetadata Priority Queue until it is not empty |
| */ |
| while (unSeqTimeSeriesMetadata.isEmpty() && orderUtils.hasNextUnseqResource()) { |
| unpackUnseqTsFileResource(); |
| } |
| |
| /* |
| * find end time of the first TimeSeriesMetadata |
| */ |
| long endTime = -1L; |
| if (!seqTimeSeriesMetadata.isEmpty() && unSeqTimeSeriesMetadata.isEmpty()) { |
| // only has seq |
| endTime = orderUtils.getOverlapCheckTime(seqTimeSeriesMetadata.get(0).getStatistics()); |
| } else if (seqTimeSeriesMetadata.isEmpty() && !unSeqTimeSeriesMetadata.isEmpty()) { |
| // only has unseq |
| endTime = orderUtils.getOverlapCheckTime(unSeqTimeSeriesMetadata.peek().getStatistics()); |
| } else if (!seqTimeSeriesMetadata.isEmpty()) { |
| // has seq and unseq |
| endTime = |
| orderUtils.getCurrentEndPoint( |
| seqTimeSeriesMetadata.get(0).getStatistics(), |
| unSeqTimeSeriesMetadata.peek().getStatistics()); |
| } |
| |
| /* |
| * unpack all directly overlapped seq/unseq files with first TimeSeriesMetadata |
| */ |
| if (endTime != -1) { |
| unpackAllOverlappedTsFilesToTimeSeriesMetadata(endTime); |
| } |
| |
| /* |
| * update the first TimeSeriesMetadata |
| */ |
| if (!seqTimeSeriesMetadata.isEmpty() && unSeqTimeSeriesMetadata.isEmpty()) { |
| // only has seq |
| firstTimeSeriesMetadata = seqTimeSeriesMetadata.remove(0); |
| } else if (seqTimeSeriesMetadata.isEmpty() && !unSeqTimeSeriesMetadata.isEmpty()) { |
| // only has unseq |
| firstTimeSeriesMetadata = unSeqTimeSeriesMetadata.poll(); |
| } else if (!seqTimeSeriesMetadata.isEmpty()) { |
| // has seq and unseq |
| if (orderUtils.isTakeSeqAsFirst( |
| seqTimeSeriesMetadata.get(0).getStatistics(), |
| unSeqTimeSeriesMetadata.peek().getStatistics())) { |
| firstTimeSeriesMetadata = seqTimeSeriesMetadata.remove(0); |
| } else { |
| firstTimeSeriesMetadata = unSeqTimeSeriesMetadata.poll(); |
| } |
| } |
| } |
| |
| private void filterFirstTimeSeriesMetadata() { |
| if (firstTimeSeriesMetadata == null) { |
| return; |
| } |
| |
| if (currentFileOverlapped() || firstTimeSeriesMetadata.isModified()) { |
| return; |
| } |
| |
| // globalTimeFilter.canSkip() must be FALSE |
| Filter pushDownFilter = scanOptions.getPushDownFilter(); |
| if (pushDownFilter != null && pushDownFilter.canSkip(firstTimeSeriesMetadata)) { |
| skipCurrentFile(); |
| return; |
| } |
| |
| Filter globalTimeFilter = scanOptions.getGlobalTimeFilter(); |
| if (filterAllSatisfy(globalTimeFilter, firstTimeSeriesMetadata) |
| && filterAllSatisfy(pushDownFilter, firstTimeSeriesMetadata) |
| && timeAllSelected(firstTimeSeriesMetadata)) { |
| long rowCount = firstTimeSeriesMetadata.getStatistics().getCount(); |
| if (paginationController.hasCurOffset(rowCount)) { |
| skipCurrentFile(); |
| paginationController.consumeOffset(rowCount); |
| } |
| } |
| } |
| |
| private void unpackAllOverlappedTsFilesToTimeSeriesMetadata(long endpointTime) |
| throws IOException { |
| while (orderUtils.hasNextUnseqResource() && orderUtils.isCurUnSeqOverlappedWith(endpointTime)) { |
| unpackUnseqTsFileResource(); |
| } |
| while (orderUtils.hasNextSeqResource() && orderUtils.isCurSeqOverlappedWith(endpointTime)) { |
| unpackSeqTsFileResource(); |
| } |
| } |
| |
| private void unpackSeqTsFileResource() throws IOException { |
| ITimeSeriesMetadata timeseriesMetadata = |
| loadTimeSeriesMetadata(orderUtils.getNextSeqFileResource(true), true); |
| // skip if data type is mismatched which may be caused by delete |
| if (timeseriesMetadata != null && timeseriesMetadata.typeMatch(getTsDataTypeList())) { |
| timeseriesMetadata.setSeq(true); |
| seqTimeSeriesMetadata.add(timeseriesMetadata); |
| } |
| } |
| |
| private void unpackUnseqTsFileResource() throws IOException { |
| ITimeSeriesMetadata timeseriesMetadata = |
| loadTimeSeriesMetadata(orderUtils.getNextUnseqFileResource(true), false); |
| // skip if data type is mismatched which may be caused by delete |
| if (timeseriesMetadata != null && timeseriesMetadata.typeMatch(getTsDataTypeList())) { |
| timeseriesMetadata.setSeq(false); |
| unSeqTimeSeriesMetadata.add(timeseriesMetadata); |
| } |
| } |
| |
| protected ITimeSeriesMetadata loadTimeSeriesMetadata(TsFileResource resource, boolean isSeq) |
| throws IOException { |
| return FileLoaderUtils.loadTimeSeriesMetadata( |
| resource, |
| seriesPath, |
| context, |
| scanOptions.getGlobalTimeFilter(), |
| scanOptions.getAllSensors(), |
| isSeq); |
| } |
| |
| public List<TSDataType> getTsDataTypeList() { |
| return Collections.singletonList(dataType); |
| } |
| |
| protected IPointReader getPointReader(TsBlock tsBlock) { |
| return tsBlock.getTsBlockSingleColumnIterator(); |
| } |
| |
| protected boolean timeAllSelected(IMetadata metadata) { |
| return true; |
| } |
| |
| private boolean filterAllSatisfy(Filter filter, IMetadata metadata) { |
| return filter == null || filter.allSatisfy(metadata); |
| } |
| |
| protected static class VersionPageReader { |
| private final QueryContext context; |
| private final PriorityMergeReader.MergeReaderPriority version; |
| private final IPageReader data; |
| |
| private final boolean isSeq; |
| private final boolean isAligned; |
| private final boolean isMem; |
| |
| VersionPageReader( |
| QueryContext context, long version, long offset, IPageReader data, boolean isSeq) { |
| this.context = context; |
| this.version = new PriorityMergeReader.MergeReaderPriority(version, offset); |
| this.data = data; |
| this.isSeq = isSeq; |
| this.isAligned = data instanceof AlignedPageReader || data instanceof MemAlignedPageReader; |
| this.isMem = data instanceof MemPageReader || data instanceof MemAlignedPageReader; |
| } |
| |
| @SuppressWarnings("squid:S3740") |
| Statistics getStatistics() { |
| return data.getStatistics(); |
| } |
| |
| @SuppressWarnings("squid:S3740") |
| Statistics getMeasurementStatistics(int index) { |
| return data.getMeasurementStatistics(index).orElse(null); |
| } |
| |
| @SuppressWarnings("squid:S3740") |
| Statistics getTimeStatistics() { |
| return data.getTimeStatistics(); |
| } |
| |
| TsBlock getAllSatisfiedPageData(boolean ascending) throws IOException { |
| long startTime = System.nanoTime(); |
| try { |
| TsBlock tsBlock = data.getAllSatisfiedData(); |
| if (!ascending) { |
| tsBlock.reverse(); |
| } |
| return tsBlock; |
| } finally { |
| long time = System.nanoTime() - startTime; |
| if (isAligned) { |
| if (isMem) { |
| context.getQueryStatistics().pageReadersDecodeAlignedMemCount.getAndAdd(1); |
| context.getQueryStatistics().pageReadersDecodeAlignedMemTime.getAndAdd(time); |
| } else { |
| context.getQueryStatistics().pageReadersDecodeAlignedDiskCount.getAndAdd(1); |
| context.getQueryStatistics().pageReadersDecodeAlignedDiskTime.getAndAdd(time); |
| } |
| } else { |
| if (isMem) { |
| context.getQueryStatistics().pageReadersDecodeNonAlignedMemCount.getAndAdd(1); |
| context.getQueryStatistics().pageReadersDecodeNonAlignedMemTime.getAndAdd(time); |
| } else { |
| context.getQueryStatistics().pageReadersDecodeNonAlignedDiskCount.getAndAdd(1); |
| context.getQueryStatistics().pageReadersDecodeNonAlignedDiskTime.getAndAdd(time); |
| } |
| } |
| } |
| } |
| |
| void addPushDownFilter(Filter pushDownFilter) { |
| data.addRecordFilter(pushDownFilter); |
| } |
| |
| boolean isModified() { |
| return data.isModified(); |
| } |
| |
| public boolean isSeq() { |
| return isSeq; |
| } |
| |
| public void setLimitOffset(PaginationController paginationController) { |
| data.setLimitOffset(paginationController); |
| } |
| } |
| |
| public interface TimeOrderUtils { |
| |
| long getOrderTime(Statistics<? extends Object> statistics); |
| |
| long getOverlapCheckTime(Statistics<? extends Object> range); |
| |
| boolean isOverlapped(Statistics<? extends Object> left, Statistics<? extends Object> right); |
| |
| boolean isOverlapped(long time, Statistics<? extends Object> right); |
| |
| boolean isCurSeqOverlappedWith(long time); |
| |
| boolean isCurUnSeqOverlappedWith(long time); |
| |
| <T> Comparator<T> comparingLong(ToLongFunction<? super T> keyExtractor); |
| |
| long getCurrentEndPoint(long time, Statistics<? extends Object> statistics); |
| |
| long getCurrentEndPoint( |
| Statistics<? extends Object> seqStatistics, Statistics<? extends Object> unseqStatistics); |
| |
| boolean isExcessEndpoint(long time, long endpointTime); |
| |
| /** Return true if taking first page reader from seq readers */ |
| boolean isTakeSeqAsFirst( |
| Statistics<? extends Object> seqStatistics, Statistics<? extends Object> unseqStatistics); |
| |
| boolean getAscending(); |
| |
| boolean hasNextSeqResource(); |
| |
| boolean hasNextUnseqResource(); |
| |
| TsFileResource getNextSeqFileResource(boolean isDelete); |
| |
| TsFileResource getNextUnseqFileResource(boolean isDelete); |
| |
| void setCurSeqFileIndex(QueryDataSource dataSource); |
| } |
| |
| class DescTimeOrderUtils implements TimeOrderUtils { |
| |
| @SuppressWarnings("squid:S3740") |
| @Override |
| public long getOrderTime(Statistics statistics) { |
| return statistics.getEndTime(); |
| } |
| |
| @SuppressWarnings("squid:S3740") |
| @Override |
| public long getOverlapCheckTime(Statistics range) { |
| return range.getStartTime(); |
| } |
| |
| @SuppressWarnings("squid:S3740") |
| @Override |
| public boolean isOverlapped(Statistics left, Statistics right) { |
| return left.getStartTime() <= right.getEndTime(); |
| } |
| |
| @SuppressWarnings("squid:S3740") |
| @Override |
| public boolean isOverlapped(long time, Statistics right) { |
| return time <= right.getEndTime(); |
| } |
| |
| @Override |
| public boolean isCurSeqOverlappedWith(long time) { |
| return time <= dataSource.getCurrentSeqOrderTime(curSeqFileIndex); |
| } |
| |
| @Override |
| public boolean isCurUnSeqOverlappedWith(long time) { |
| return time <= dataSource.getCurrentUnSeqOrderTime(curUnseqFileIndex); |
| } |
| |
| @Override |
| public <T> Comparator<T> comparingLong(ToLongFunction<? super T> keyExtractor) { |
| Objects.requireNonNull(keyExtractor); |
| return (Comparator<T> & Serializable) |
| (c1, c2) -> Long.compare(keyExtractor.applyAsLong(c2), keyExtractor.applyAsLong(c1)); |
| } |
| |
| @Override |
| public long getCurrentEndPoint(long time, Statistics<? extends Object> statistics) { |
| return Math.max(time, statistics.getStartTime()); |
| } |
| |
| @Override |
| public long getCurrentEndPoint( |
| Statistics<? extends Object> seqStatistics, Statistics<? extends Object> unseqStatistics) { |
| return Math.max(seqStatistics.getStartTime(), unseqStatistics.getStartTime()); |
| } |
| |
| @Override |
| public boolean isExcessEndpoint(long time, long endpointTime) { |
| return time < endpointTime; |
| } |
| |
| @Override |
| public boolean isTakeSeqAsFirst( |
| Statistics<? extends Object> seqStatistics, Statistics<? extends Object> unseqStatistics) { |
| return seqStatistics.getEndTime() > unseqStatistics.getEndTime(); |
| } |
| |
| @Override |
| public boolean getAscending() { |
| return false; |
| } |
| |
| @Override |
| public boolean hasNextSeqResource() { |
| while (dataSource.hasNextSeqResource(curSeqFileIndex, false, deviceID)) { |
| if (dataSource.isSeqSatisfied( |
| deviceID, curSeqFileIndex, scanOptions.getGlobalTimeFilter(), false)) { |
| break; |
| } |
| curSeqFileIndex--; |
| } |
| return dataSource.hasNextSeqResource(curSeqFileIndex, false, deviceID); |
| } |
| |
| @Override |
| public boolean hasNextUnseqResource() { |
| while (dataSource.hasNextUnseqResource(curUnseqFileIndex, false, deviceID)) { |
| if (dataSource.isUnSeqSatisfied( |
| deviceID, curUnseqFileIndex, scanOptions.getGlobalTimeFilter(), false)) { |
| break; |
| } |
| curUnseqFileIndex++; |
| } |
| return dataSource.hasNextUnseqResource(curUnseqFileIndex, false, deviceID); |
| } |
| |
| @Override |
| public TsFileResource getNextSeqFileResource(boolean isDelete) { |
| TsFileResource tsFileResource = dataSource.getSeqResourceByIndex(curSeqFileIndex); |
| if (isDelete) { |
| curSeqFileIndex--; |
| } |
| return tsFileResource; |
| } |
| |
| @Override |
| public TsFileResource getNextUnseqFileResource(boolean isDelete) { |
| TsFileResource tsFileResource = dataSource.getUnseqResourceByIndex(curUnseqFileIndex); |
| if (isDelete) { |
| curUnseqFileIndex++; |
| } |
| return tsFileResource; |
| } |
| |
| @Override |
| public void setCurSeqFileIndex(QueryDataSource dataSource) { |
| curSeqFileIndex = dataSource.getSeqResourcesSize() - 1; |
| } |
| } |
| |
| class AscTimeOrderUtils implements TimeOrderUtils { |
| |
| @SuppressWarnings("squid:S3740") |
| @Override |
| public long getOrderTime(Statistics statistics) { |
| return statistics.getStartTime(); |
| } |
| |
| @SuppressWarnings("squid:S3740") |
| @Override |
| public long getOverlapCheckTime(Statistics range) { |
| return range.getEndTime(); |
| } |
| |
| @SuppressWarnings("squid:S3740") |
| @Override |
| public boolean isOverlapped(Statistics left, Statistics right) { |
| return left.getEndTime() >= right.getStartTime(); |
| } |
| |
| @SuppressWarnings("squid:S3740") |
| @Override |
| public boolean isOverlapped(long time, Statistics right) { |
| return time >= right.getStartTime(); |
| } |
| |
| @Override |
| public boolean isCurSeqOverlappedWith(long time) { |
| return time >= dataSource.getCurrentSeqOrderTime(curSeqFileIndex); |
| } |
| |
| @Override |
| public boolean isCurUnSeqOverlappedWith(long time) { |
| return time >= dataSource.getCurrentUnSeqOrderTime(curUnseqFileIndex); |
| } |
| |
| @Override |
| public <T> Comparator<T> comparingLong(ToLongFunction<? super T> keyExtractor) { |
| Objects.requireNonNull(keyExtractor); |
| return (Comparator<T> & Serializable) |
| (c1, c2) -> Long.compare(keyExtractor.applyAsLong(c1), keyExtractor.applyAsLong(c2)); |
| } |
| |
| @Override |
| public long getCurrentEndPoint(long time, Statistics<? extends Object> statistics) { |
| return Math.min(time, statistics.getEndTime()); |
| } |
| |
| @Override |
| public long getCurrentEndPoint( |
| Statistics<? extends Object> seqStatistics, Statistics<? extends Object> unseqStatistics) { |
| return Math.min(seqStatistics.getEndTime(), unseqStatistics.getEndTime()); |
| } |
| |
| @Override |
| public boolean isExcessEndpoint(long time, long endpointTime) { |
| return time > endpointTime; |
| } |
| |
| @Override |
| public boolean isTakeSeqAsFirst( |
| Statistics<? extends Object> seqStatistics, Statistics<? extends Object> unseqStatistics) { |
| return seqStatistics.getStartTime() < unseqStatistics.getStartTime(); |
| } |
| |
| @Override |
| public boolean getAscending() { |
| return true; |
| } |
| |
| @Override |
| public boolean hasNextSeqResource() { |
| while (dataSource.hasNextSeqResource(curSeqFileIndex, true, deviceID)) { |
| if (dataSource.isSeqSatisfied( |
| deviceID, curSeqFileIndex, scanOptions.getGlobalTimeFilter(), false)) { |
| break; |
| } |
| curSeqFileIndex++; |
| } |
| return dataSource.hasNextSeqResource(curSeqFileIndex, true, deviceID); |
| } |
| |
| @Override |
| public boolean hasNextUnseqResource() { |
| while (dataSource.hasNextUnseqResource(curUnseqFileIndex, true, deviceID)) { |
| if (dataSource.isUnSeqSatisfied( |
| deviceID, curUnseqFileIndex, scanOptions.getGlobalTimeFilter(), false)) { |
| break; |
| } |
| curUnseqFileIndex++; |
| } |
| return dataSource.hasNextUnseqResource(curUnseqFileIndex, true, deviceID); |
| } |
| |
| @Override |
| public TsFileResource getNextSeqFileResource(boolean isDelete) { |
| TsFileResource tsFileResource = dataSource.getSeqResourceByIndex(curSeqFileIndex); |
| if (isDelete) { |
| curSeqFileIndex++; |
| } |
| return tsFileResource; |
| } |
| |
| @Override |
| public TsFileResource getNextUnseqFileResource(boolean isDelete) { |
| TsFileResource tsFileResource = dataSource.getUnseqResourceByIndex(curUnseqFileIndex); |
| if (isDelete) { |
| curUnseqFileIndex++; |
| } |
| return tsFileResource; |
| } |
| |
| @Override |
| public void setCurSeqFileIndex(QueryDataSource dataSource) { |
| curSeqFileIndex = 0; |
| } |
| } |
| } |