| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.iotdb.db.schemaengine.schemaregion.utils; |
| |
| import org.apache.iotdb.commons.path.AlignedPath; |
| import org.apache.iotdb.commons.path.MeasurementPath; |
| import org.apache.iotdb.commons.path.PartialPath; |
| import org.apache.iotdb.db.exception.query.QueryProcessException; |
| import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext; |
| import org.apache.iotdb.db.storageengine.dataregion.memtable.AlignedReadOnlyMemChunk; |
| import org.apache.iotdb.db.storageengine.dataregion.memtable.AlignedWritableMemChunk; |
| import org.apache.iotdb.db.storageengine.dataregion.memtable.AlignedWritableMemChunkGroup; |
| import org.apache.iotdb.db.storageengine.dataregion.memtable.DeviceIDFactory; |
| import org.apache.iotdb.db.storageengine.dataregion.memtable.IMemTable; |
| import org.apache.iotdb.db.storageengine.dataregion.memtable.IWritableMemChunk; |
| import org.apache.iotdb.db.storageengine.dataregion.memtable.IWritableMemChunkGroup; |
| import org.apache.iotdb.db.storageengine.dataregion.memtable.ReadOnlyMemChunk; |
| import org.apache.iotdb.db.storageengine.dataregion.modification.Deletion; |
| import org.apache.iotdb.db.storageengine.dataregion.modification.Modification; |
| import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; |
| import org.apache.iotdb.db.utils.ModificationUtils; |
| import org.apache.iotdb.db.utils.datastructure.TVList; |
| |
| import org.apache.tsfile.enums.TSDataType; |
| import org.apache.tsfile.file.metadata.AlignedChunkMetadata; |
| import org.apache.tsfile.file.metadata.AlignedTimeSeriesMetadata; |
| import org.apache.tsfile.file.metadata.ChunkMetadata; |
| import org.apache.tsfile.file.metadata.IChunkMetadata; |
| import org.apache.tsfile.file.metadata.IDeviceID; |
| import org.apache.tsfile.file.metadata.ITimeSeriesMetadata; |
| import org.apache.tsfile.file.metadata.TimeseriesMetadata; |
| import org.apache.tsfile.file.metadata.enums.TSEncoding; |
| import org.apache.tsfile.file.metadata.statistics.Statistics; |
| import org.apache.tsfile.read.common.TimeRange; |
| import org.apache.tsfile.utils.Pair; |
| import org.apache.tsfile.write.schema.IMeasurementSchema; |
| import org.apache.tsfile.write.schema.VectorMeasurementSchema; |
| import org.apache.tsfile.write.writer.RestorableTsFileIOWriter; |
| |
| import java.io.IOException; |
| import java.io.Serializable; |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.List; |
| import java.util.Map; |
| |
| import static org.apache.iotdb.commons.path.AlignedPath.VECTOR_PLACEHOLDER; |
| |
| /** |
| * Obtain required resources through path, such as readers and writers and etc. AlignedPath and |
| * MeasurementPath have different implementations, and the default PartialPath should not use it. |
| */ |
| public abstract class ResourceByPathUtils { |
| |
| public static ResourceByPathUtils getResourceInstance(PartialPath path) { |
| if (path instanceof AlignedPath) { |
| return new AlignedResourceByPathUtils(path); |
| } else if (path instanceof MeasurementPath) { |
| return new MeasurementResourceByPathUtils(path); |
| } |
| throw new UnsupportedOperationException("Should call exact sub class!"); |
| } |
| |
| public abstract ITimeSeriesMetadata generateTimeSeriesMetadata( |
| List<ReadOnlyMemChunk> readOnlyMemChunk, List<IChunkMetadata> chunkMetadataList) |
| throws IOException; |
| |
| public abstract ReadOnlyMemChunk getReadOnlyMemChunkFromMemTable( |
| QueryContext context, |
| IMemTable memTable, |
| List<Pair<Modification, IMemTable>> modsToMemtable, |
| long timeLowerBound) |
| throws QueryProcessException, IOException; |
| |
| public abstract List<IChunkMetadata> getVisibleMetadataListFromWriter( |
| RestorableTsFileIOWriter writer, TsFileResource tsFileResource, QueryContext context); |
| |
| /** get modifications from a memtable. */ |
| protected List<Modification> getModificationsForMemtable( |
| IMemTable memTable, List<Pair<Modification, IMemTable>> modsToMemtable) { |
| List<Modification> modifications = new ArrayList<>(); |
| boolean foundMemtable = false; |
| for (Pair<Modification, IMemTable> entry : modsToMemtable) { |
| if (foundMemtable || entry.right.equals(memTable)) { |
| modifications.add(entry.left); |
| foundMemtable = true; |
| } |
| } |
| return modifications; |
| } |
| } |
| |
| class AlignedResourceByPathUtils extends ResourceByPathUtils { |
| |
| AlignedPath partialPath; |
| |
| public AlignedResourceByPathUtils(PartialPath partialPath) { |
| this.partialPath = (AlignedPath) partialPath; |
| } |
| |
| /** |
| * Because the unclosed tsfile don't have TimeSeriesMetadata and memtables in the memory don't |
| * have chunkMetadata, but query will use these, so we need to generate it for them. |
| */ |
| @Override |
| public AlignedTimeSeriesMetadata generateTimeSeriesMetadata( |
| List<ReadOnlyMemChunk> readOnlyMemChunk, List<IChunkMetadata> chunkMetadataList) { |
| TimeseriesMetadata timeTimeSeriesMetadata = new TimeseriesMetadata(); |
| timeTimeSeriesMetadata.setDataSizeOfChunkMetaDataList(-1); |
| timeTimeSeriesMetadata.setMeasurementId(""); |
| timeTimeSeriesMetadata.setTsDataType(TSDataType.VECTOR); |
| |
| Statistics<? extends Serializable> timeStatistics = |
| Statistics.getStatsByType(timeTimeSeriesMetadata.getTsDataType()); |
| |
| // init each value time series meta |
| List<TimeseriesMetadata> valueTimeSeriesMetadataList = new ArrayList<>(); |
| for (IMeasurementSchema valueChunkMetadata : (partialPath.getSchemaList())) { |
| TimeseriesMetadata valueMetadata = new TimeseriesMetadata(); |
| valueMetadata.setDataSizeOfChunkMetaDataList(-1); |
| valueMetadata.setMeasurementId(valueChunkMetadata.getMeasurementId()); |
| valueMetadata.setTsDataType(valueChunkMetadata.getType()); |
| valueMetadata.setStatistics(Statistics.getStatsByType(valueChunkMetadata.getType())); |
| valueTimeSeriesMetadataList.add(valueMetadata); |
| } |
| |
| boolean[] exist = new boolean[partialPath.getSchemaList().size()]; |
| boolean modified = false; |
| for (IChunkMetadata chunkMetadata : chunkMetadataList) { |
| AlignedChunkMetadata alignedChunkMetadata = (AlignedChunkMetadata) chunkMetadata; |
| modified = (modified || alignedChunkMetadata.isModified()); |
| timeStatistics.mergeStatistics(alignedChunkMetadata.getTimeChunkMetadata().getStatistics()); |
| for (int i = 0; i < valueTimeSeriesMetadataList.size(); i++) { |
| if (alignedChunkMetadata.getValueChunkMetadataList().get(i) != null) { |
| exist[i] = true; |
| valueTimeSeriesMetadataList |
| .get(i) |
| .getStatistics() |
| .mergeStatistics( |
| alignedChunkMetadata.getValueChunkMetadataList().get(i).getStatistics()); |
| } |
| } |
| } |
| |
| for (ReadOnlyMemChunk memChunk : readOnlyMemChunk) { |
| if (!memChunk.isEmpty()) { |
| AlignedChunkMetadata alignedChunkMetadata = |
| (AlignedChunkMetadata) memChunk.getChunkMetaData(); |
| timeStatistics.mergeStatistics(alignedChunkMetadata.getTimeChunkMetadata().getStatistics()); |
| for (int i = 0; i < valueTimeSeriesMetadataList.size(); i++) { |
| if (alignedChunkMetadata.getValueChunkMetadataList().get(i) != null) { |
| exist[i] = true; |
| valueTimeSeriesMetadataList |
| .get(i) |
| .getStatistics() |
| .mergeStatistics( |
| alignedChunkMetadata.getValueChunkMetadataList().get(i).getStatistics()); |
| } |
| } |
| } |
| } |
| |
| timeTimeSeriesMetadata.setStatistics(timeStatistics); |
| timeTimeSeriesMetadata.setModified(modified); |
| |
| for (int i = 0; i < valueTimeSeriesMetadataList.size(); i++) { |
| if (!exist[i]) { |
| valueTimeSeriesMetadataList.set(i, null); |
| } |
| } |
| |
| return new AlignedTimeSeriesMetadata(timeTimeSeriesMetadata, valueTimeSeriesMetadataList); |
| } |
| |
| @Override |
| public ReadOnlyMemChunk getReadOnlyMemChunkFromMemTable( |
| QueryContext context, |
| IMemTable memTable, |
| List<Pair<Modification, IMemTable>> modsToMemtable, |
| long timeLowerBound) |
| throws QueryProcessException, IOException { |
| Map<IDeviceID, IWritableMemChunkGroup> memTableMap = memTable.getMemTableMap(); |
| IDeviceID deviceID = DeviceIDFactory.getInstance().getDeviceID(partialPath); |
| |
| // check If memtable contains this path |
| if (!memTableMap.containsKey(deviceID)) { |
| return null; |
| } |
| AlignedWritableMemChunk alignedMemChunk = |
| ((AlignedWritableMemChunkGroup) memTableMap.get(deviceID)).getAlignedMemChunk(); |
| boolean containsMeasurement = false; |
| for (String measurement : partialPath.getMeasurementList()) { |
| if (alignedMemChunk.containsMeasurement(measurement)) { |
| containsMeasurement = true; |
| break; |
| } |
| } |
| if (!containsMeasurement) { |
| return null; |
| } |
| // get sorted tv list is synchronized so different query can get right sorted list reference |
| TVList alignedTvListCopy = alignedMemChunk.getSortedTvListForQuery(partialPath.getSchemaList()); |
| List<List<TimeRange>> deletionList = null; |
| if (modsToMemtable != null) { |
| deletionList = constructDeletionList(memTable, modsToMemtable, timeLowerBound); |
| } |
| return new AlignedReadOnlyMemChunk( |
| context, getMeasurementSchema(), alignedTvListCopy, deletionList); |
| } |
| |
| public VectorMeasurementSchema getMeasurementSchema() { |
| List<String> measurementList = partialPath.getMeasurementList(); |
| TSDataType[] types = new TSDataType[measurementList.size()]; |
| TSEncoding[] encodings = new TSEncoding[measurementList.size()]; |
| |
| for (int i = 0; i < measurementList.size(); i++) { |
| types[i] = partialPath.getSchemaList().get(i).getType(); |
| encodings[i] = partialPath.getSchemaList().get(i).getEncodingType(); |
| } |
| String[] array = new String[measurementList.size()]; |
| for (int i = 0; i < array.length; i++) { |
| array[i] = measurementList.get(i); |
| } |
| return new VectorMeasurementSchema( |
| VECTOR_PLACEHOLDER, |
| array, |
| types, |
| encodings, |
| partialPath.getSchemaList().get(0).getCompressor()); |
| } |
| |
| private List<List<TimeRange>> constructDeletionList( |
| IMemTable memTable, List<Pair<Modification, IMemTable>> modsToMemtable, long timeLowerBound) { |
| List<List<TimeRange>> deletionList = new ArrayList<>(); |
| for (String measurement : partialPath.getMeasurementList()) { |
| List<TimeRange> columnDeletionList = new ArrayList<>(); |
| columnDeletionList.add(new TimeRange(Long.MIN_VALUE, timeLowerBound)); |
| for (Modification modification : getModificationsForMemtable(memTable, modsToMemtable)) { |
| if (modification instanceof Deletion) { |
| Deletion deletion = (Deletion) modification; |
| PartialPath fullPath = partialPath.concatNode(measurement); |
| if (deletion.getPath().matchFullPath(fullPath) |
| && deletion.getEndTime() > timeLowerBound) { |
| long lowerBound = Math.max(deletion.getStartTime(), timeLowerBound); |
| columnDeletionList.add(new TimeRange(lowerBound, deletion.getEndTime())); |
| } |
| } |
| } |
| deletionList.add(TimeRange.sortAndMerge(columnDeletionList)); |
| } |
| return deletionList; |
| } |
| |
| @Override |
| public List<IChunkMetadata> getVisibleMetadataListFromWriter( |
| RestorableTsFileIOWriter writer, TsFileResource tsFileResource, QueryContext context) { |
| List<List<Modification>> modifications = |
| context.getPathModifications(tsFileResource, partialPath); |
| |
| List<AlignedChunkMetadata> chunkMetadataList = new ArrayList<>(); |
| List<ChunkMetadata> timeChunkMetadataList = |
| writer.getVisibleMetadataList(partialPath.getIDeviceID(), "", partialPath.getSeriesType()); |
| List<List<ChunkMetadata>> valueChunkMetadataList = new ArrayList<>(); |
| for (int i = 0; i < partialPath.getMeasurementList().size(); i++) { |
| valueChunkMetadataList.add( |
| writer.getVisibleMetadataList( |
| partialPath.getIDeviceID(), |
| partialPath.getMeasurementList().get(i), |
| partialPath.getSchemaList().get(i).getType())); |
| } |
| |
| for (int i = 0; i < timeChunkMetadataList.size(); i++) { |
| // only need time column |
| if (partialPath.getMeasurementList().isEmpty()) { |
| chunkMetadataList.add( |
| new AlignedChunkMetadata(timeChunkMetadataList.get(i), Collections.emptyList())); |
| } else { |
| List<IChunkMetadata> valueChunkMetadata = new ArrayList<>(); |
| // if all the sub sensors doesn't exist, it will be false |
| boolean exits = false; |
| for (List<ChunkMetadata> chunkMetadata : valueChunkMetadataList) { |
| boolean currentExist = i < chunkMetadata.size(); |
| exits = (exits || currentExist); |
| valueChunkMetadata.add(currentExist ? chunkMetadata.get(i) : null); |
| } |
| if (exits) { |
| chunkMetadataList.add( |
| new AlignedChunkMetadata(timeChunkMetadataList.get(i), valueChunkMetadata)); |
| } |
| } |
| } |
| |
| ModificationUtils.modifyAlignedChunkMetaData(chunkMetadataList, modifications); |
| chunkMetadataList.removeIf(context::chunkNotSatisfy); |
| return new ArrayList<>(chunkMetadataList); |
| } |
| } |
| |
| class MeasurementResourceByPathUtils extends ResourceByPathUtils { |
| |
| MeasurementPath partialPath; |
| |
| protected MeasurementResourceByPathUtils(PartialPath partialPath) { |
| this.partialPath = (MeasurementPath) partialPath; |
| } |
| |
| /** |
| * Because the unclosed tsfile don't have TimeSeriesMetadata and memtables in the memory don't |
| * have chunkMetadata, but query will use these, so we need to generate it for them. |
| */ |
| @Override |
| public ITimeSeriesMetadata generateTimeSeriesMetadata( |
| List<ReadOnlyMemChunk> readOnlyMemChunk, List<IChunkMetadata> chunkMetadataList) { |
| TimeseriesMetadata timeSeriesMetadata = new TimeseriesMetadata(); |
| timeSeriesMetadata.setMeasurementId(partialPath.getMeasurementSchema().getMeasurementId()); |
| timeSeriesMetadata.setTsDataType(partialPath.getMeasurementSchema().getType()); |
| timeSeriesMetadata.setDataSizeOfChunkMetaDataList(-1); |
| |
| Statistics<? extends Serializable> seriesStatistics = |
| Statistics.getStatsByType(timeSeriesMetadata.getTsDataType()); |
| // flush chunkMetadataList one by one |
| boolean isModified = false; |
| for (IChunkMetadata chunkMetadata : chunkMetadataList) { |
| isModified = (isModified || chunkMetadata.isModified()); |
| seriesStatistics.mergeStatistics(chunkMetadata.getStatistics()); |
| } |
| |
| for (ReadOnlyMemChunk memChunk : readOnlyMemChunk) { |
| if (!memChunk.isEmpty()) { |
| seriesStatistics.mergeStatistics(memChunk.getChunkMetaData().getStatistics()); |
| } |
| } |
| timeSeriesMetadata.setStatistics(seriesStatistics); |
| timeSeriesMetadata.setModified(isModified); |
| return timeSeriesMetadata; |
| } |
| |
| @Override |
| public ReadOnlyMemChunk getReadOnlyMemChunkFromMemTable( |
| QueryContext context, |
| IMemTable memTable, |
| List<Pair<Modification, IMemTable>> modsToMemtable, |
| long timeLowerBound) |
| throws QueryProcessException, IOException { |
| Map<IDeviceID, IWritableMemChunkGroup> memTableMap = memTable.getMemTableMap(); |
| IDeviceID deviceID = DeviceIDFactory.getInstance().getDeviceID(partialPath.getDevicePath()); |
| // check If Memtable Contains this path |
| if (!memTableMap.containsKey(deviceID) |
| || !memTableMap.get(deviceID).contains(partialPath.getMeasurement())) { |
| return null; |
| } |
| IWritableMemChunk memChunk = |
| memTableMap.get(deviceID).getMemChunkMap().get(partialPath.getMeasurement()); |
| // get sorted tv list is synchronized so different query can get right sorted list reference |
| TVList chunkCopy = memChunk.getSortedTvListForQuery(); |
| List<TimeRange> deletionList = null; |
| if (modsToMemtable != null) { |
| deletionList = constructDeletionList(memTable, modsToMemtable, timeLowerBound); |
| } |
| return new ReadOnlyMemChunk( |
| context, |
| partialPath.getMeasurement(), |
| partialPath.getMeasurementSchema().getType(), |
| partialPath.getMeasurementSchema().getEncodingType(), |
| chunkCopy, |
| partialPath.getMeasurementSchema().getProps(), |
| deletionList); |
| } |
| /** |
| * construct a deletion list from a memtable. |
| * |
| * @param memTable memtable |
| * @param timeLowerBound time watermark |
| */ |
| private List<TimeRange> constructDeletionList( |
| IMemTable memTable, List<Pair<Modification, IMemTable>> modsToMemtable, long timeLowerBound) { |
| List<TimeRange> deletionList = new ArrayList<>(); |
| deletionList.add(new TimeRange(Long.MIN_VALUE, timeLowerBound)); |
| for (Modification modification : getModificationsForMemtable(memTable, modsToMemtable)) { |
| if (modification instanceof Deletion) { |
| Deletion deletion = (Deletion) modification; |
| if (deletion.getPath().matchFullPath(partialPath) |
| && deletion.getEndTime() > timeLowerBound) { |
| long lowerBound = Math.max(deletion.getStartTime(), timeLowerBound); |
| deletionList.add(new TimeRange(lowerBound, deletion.getEndTime())); |
| } |
| } |
| } |
| return TimeRange.sortAndMerge(deletionList); |
| } |
| /** get modifications from a memtable. */ |
| @Override |
| protected List<Modification> getModificationsForMemtable( |
| IMemTable memTable, List<Pair<Modification, IMemTable>> modsToMemtable) { |
| List<Modification> modifications = new ArrayList<>(); |
| boolean foundMemtable = false; |
| for (Pair<Modification, IMemTable> entry : modsToMemtable) { |
| if (foundMemtable || entry.right.equals(memTable)) { |
| modifications.add(entry.left); |
| foundMemtable = true; |
| } |
| } |
| return modifications; |
| } |
| |
| @Override |
| public List<IChunkMetadata> getVisibleMetadataListFromWriter( |
| RestorableTsFileIOWriter writer, TsFileResource tsFileResource, QueryContext context) { |
| List<Modification> modifications = context.getPathModifications(tsFileResource, partialPath); |
| |
| List<IChunkMetadata> chunkMetadataList = |
| new ArrayList<>( |
| writer.getVisibleMetadataList( |
| partialPath.getIDeviceID(), |
| partialPath.getMeasurement(), |
| partialPath.getSeriesType())); |
| |
| ModificationUtils.modifyChunkMetaData(chunkMetadataList, modifications); |
| chunkMetadataList.removeIf(context::chunkNotSatisfy); |
| return chunkMetadataList; |
| } |
| } |