| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.fast; |
| |
| import org.apache.iotdb.commons.exception.IllegalPathException; |
| import org.apache.iotdb.db.exception.WriteProcessException; |
| import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.subtask.FastCompactionTaskSummary; |
| import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionPathUtils; |
| import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.ModifiedStatus; |
| import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.fast.element.AlignedPageElement; |
| import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.fast.element.ChunkMetadataElement; |
| import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.fast.element.FileElement; |
| import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.fast.element.PageElement; |
| import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.fast.reader.CompactionAlignedChunkReader; |
| import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.fast.reader.CompactionChunkReader; |
| import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.writer.AbstractCompactionWriter; |
| import org.apache.iotdb.db.storageengine.dataregion.compaction.io.CompactionTsFileReader; |
| import org.apache.iotdb.db.storageengine.dataregion.modification.Modification; |
| import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; |
| import org.apache.iotdb.db.utils.ModificationUtils; |
| |
| import org.apache.tsfile.exception.write.PageException; |
| import org.apache.tsfile.file.header.ChunkHeader; |
| import org.apache.tsfile.file.header.PageHeader; |
| import org.apache.tsfile.file.metadata.AlignedChunkMetadata; |
| import org.apache.tsfile.file.metadata.ChunkMetadata; |
| import org.apache.tsfile.file.metadata.IChunkMetadata; |
| import org.apache.tsfile.file.metadata.IDeviceID; |
| import org.apache.tsfile.read.TsFileSequenceReader; |
| import org.apache.tsfile.read.common.Chunk; |
| import org.apache.tsfile.utils.Pair; |
| import org.apache.tsfile.write.schema.IMeasurementSchema; |
| |
| import java.io.IOException; |
| import java.nio.ByteBuffer; |
| import java.util.ArrayList; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| |
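| /** |
| * Compaction executor for one aligned device. It merges the time column and all value columns |
| * of the device across overlapping source files, reusing the file list, chunk metadata queue, |
| * and page queue provided by {@link SeriesCompactionExecutor}. |
| */ |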
| public class AlignedSeriesCompactionExecutor extends SeriesCompactionExecutor { |
| |
| // measurementID -> tsfile resource -> timeseries metadata <startOffset, endOffset>. |
| // It is a linked hash map whose keys follow the same lexicographical measurement order as |
| // measurementSchemas; it is used to read chunk metadata directly from the TsFile according to |
| // the timeseries metadata offsets. |
| private final Map<String, Map<TsFileResource, Pair<Long, Long>>> timeseriesMetadataOffsetMap; |
| |
| private final List<IMeasurementSchema> measurementSchemas; |
| private final IMeasurementSchema timeColumnMeasurementSchema; |
| private final Map<String, IMeasurementSchema> measurementSchemaMap; |
| |
| @SuppressWarnings("squid:S107") |
| public AlignedSeriesCompactionExecutor( |
| AbstractCompactionWriter compactionWriter, |
| Map<String, Map<TsFileResource, Pair<Long, Long>>> timeseriesMetadataOffsetMap, |
| Map<TsFileResource, TsFileSequenceReader> readerCacheMap, |
| Map<TsFileResource, List<Modification>> modificationCacheMap, |
| List<TsFileResource> sortedSourceFiles, |
| IDeviceID deviceId, |
| int subTaskId, |
| List<IMeasurementSchema> measurementSchemas, |
| FastCompactionTaskSummary summary) { |
| super( |
| compactionWriter, readerCacheMap, modificationCacheMap, deviceId, true, subTaskId, summary); |
| this.timeseriesMetadataOffsetMap = timeseriesMetadataOffsetMap; |
| this.measurementSchemas = measurementSchemas; |
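| // the first measurement schema of an aligned device is the time column |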
| this.timeColumnMeasurementSchema = measurementSchemas.get(0); |
| this.measurementSchemaMap = new HashMap<>(); |
| this.measurementSchemas.forEach( |
| schema -> measurementSchemaMap.put(schema.getMeasurementId(), schema)); |
| // the source files are sorted by the start time of the current device from old to new, and |
| // files that do not contain the current device have already been filtered out |
| sortedSourceFiles.forEach(x -> fileList.add(new FileElement(x))); |
| } |
| |
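| /** Compact all columns of the aligned device within one measurement scope of the writer. */ |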
| @Override |
| public void execute() |
| throws PageException, IllegalPathException, IOException, WriteProcessException { |
| compactionWriter.startMeasurement(measurementSchemas, subTaskId); |
| compactFiles(); |
| compactionWriter.endMeasurement(subTaskId); |
| } |
| |
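| /** |
| * Repeatedly take the first remaining file, find all files overlapping with it in time, |
| * deserialize their chunk metadata into the queue, and compact them, until no file is left. |
| */ |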
| @Override |
| protected void compactFiles() |
| throws PageException, IOException, WriteProcessException, IllegalPathException { |
| markStartOfAlignedSeries(); |
| |
| while (!fileList.isEmpty()) { |
| List<FileElement> overlappedFiles = findOverlapFiles(fileList.get(0)); |
| |
| // read chunk metadata from the overlapped files and put it into the chunk metadata queue |
| deserializeFileIntoChunkMetadataQueue(overlappedFiles); |
| |
| compactChunks(); |
| } |
| |
| markEndOfAlignedSeries(); |
| } |
| |
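| /** Notify each CompactionTsFileReader that the following reads belong to an aligned series. */ |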
| private void markStartOfAlignedSeries() { |
| for (TsFileSequenceReader reader : readerCacheMap.values()) { |
| if (reader instanceof CompactionTsFileReader) { |
| ((CompactionTsFileReader) reader).markStartOfAlignedSeries(); |
| } |
| } |
| } |
| |
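| /** Notify each CompactionTsFileReader that the reads of the aligned series have finished. */ |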
| private void markEndOfAlignedSeries() { |
| for (TsFileSequenceReader reader : readerCacheMap.values()) { |
| if (reader instanceof CompactionTsFileReader) { |
| ((CompactionTsFileReader) reader).markEndOfAlignedSeries(); |
| } |
| } |
| } |
| |
| /** |
| * Deserialize the given files into chunk metadata and put it into the chunk metadata queue. |
| * |
| * @throws IOException if an I/O error occurs |
| * @throws IllegalPathException if the file path is illegal |
| */ |
| @SuppressWarnings("squid:S3776") |
| void deserializeFileIntoChunkMetadataQueue(List<FileElement> fileElements) |
| throws IOException, IllegalPathException { |
| for (FileElement fileElement : fileElements) { |
| TsFileResource resource = fileElement.resource; |
| |
| // read the time chunk metadata and value chunk metadata of the current file |
| List<IChunkMetadata> timeChunkMetadatas = new ArrayList<>(); |
| List<List<IChunkMetadata>> valueChunkMetadatas = new ArrayList<>(); |
| for (Map.Entry<String, Map<TsFileResource, Pair<Long, Long>>> entry : |
| timeseriesMetadataOffsetMap.entrySet()) { |
| String measurementID = entry.getKey(); |
| Pair<Long, Long> timeseriesOffsetInCurrentFile = entry.getValue().get(resource); |
| if (measurementID.isEmpty()) { |
| // an empty measurement ID denotes the time column: read the time chunk metadata |
| if (timeseriesOffsetInCurrentFile == null) { |
| // the current file does not contain this aligned device |
| timeChunkMetadatas = null; |
| break; |
| } |
| timeChunkMetadatas = |
| readerCacheMap |
| .get(resource) |
| .getChunkMetadataListByTimeseriesMetadataOffset( |
| timeseriesOffsetInCurrentFile.left, timeseriesOffsetInCurrentFile.right); |
| } else { |
| // read the value chunk metadata |
| if (timeseriesOffsetInCurrentFile == null) { |
| // current file does not contain this aligned timeseries |
| valueChunkMetadatas.add(null); |
| } else { |
| // current file contains this aligned timeseries |
| List<IChunkMetadata> valueColumnChunkMetadataList = |
| readerCacheMap |
| .get(resource) |
| .getChunkMetadataListByTimeseriesMetadataOffset( |
| timeseriesOffsetInCurrentFile.left, timeseriesOffsetInCurrentFile.right); |
| if (isValueChunkDataTypeMatchSchema(valueColumnChunkMetadataList)) { |
| valueChunkMetadatas.add(valueColumnChunkMetadataList); |
| } else { |
| valueChunkMetadatas.add(null); |
| } |
| } |
| } |
| } |
| |
| List<AlignedChunkMetadata> alignedChunkMetadataList = new ArrayList<>(); |
| // if the current file contains this aligned device, then construct aligned chunk metadata |
| if (timeChunkMetadatas != null) { |
| for (int i = 0; i < timeChunkMetadatas.size(); i++) { |
| List<IChunkMetadata> valueChunkMetadataList = new ArrayList<>(); |
| for (List<IChunkMetadata> chunkMetadata : valueChunkMetadatas) { |
| if (chunkMetadata == null) { |
| valueChunkMetadataList.add(null); |
| } else { |
| valueChunkMetadataList.add(chunkMetadata.get(i)); |
| } |
| } |
| AlignedChunkMetadata alignedChunkMetadata = |
| new AlignedChunkMetadata(timeChunkMetadatas.get(i), valueChunkMetadataList); |
| |
| alignedChunkMetadataList.add(alignedChunkMetadata); |
| } |
| |
| // get the value modifications of this file |
| List<List<Modification>> valueModifications = new ArrayList<>(); |
| for (IChunkMetadata valueChunkMetadata : |
| alignedChunkMetadataList.get(0).getValueChunkMetadataList()) { |
| if (valueChunkMetadata == null) { |
| valueModifications.add(null); |
| } else { |
| valueModifications.add( |
| getModificationsFromCache( |
| resource, |
| CompactionPathUtils.getPath(deviceId, valueChunkMetadata.getMeasurementUid()))); |
| } |
| } |
| |
| // apply the modifications to the aligned chunk metadata |
| ModificationUtils.modifyAlignedChunkMetaData(alignedChunkMetadataList, valueModifications); |
| } |
| |
| if (alignedChunkMetadataList.isEmpty()) { |
| // all chunks in this file have been deleted, or the current file does not contain this |
| // aligned device; just remove the file |
| removeFile(fileElement); |
| } |
| |
| // put the aligned chunk metadata into the queue |
| for (int i = 0; i < alignedChunkMetadataList.size(); i++) { |
| chunkMetadataQueue.add( |
| new ChunkMetadataElement( |
| alignedChunkMetadataList.get(i), |
| resource.getVersion(), |
| i == alignedChunkMetadataList.size() - 1, |
| fileElement)); |
| } |
| } |
| } |
| |
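| /** |
| * Check whether the data type recorded in the chunks of one value column matches the data type |
| * in the target measurement schema. A mismatched column (e.g. a timeseries deleted and then |
| * re-created with a different data type) is treated by the caller as if it were fully deleted. |
| */ |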
| private boolean isValueChunkDataTypeMatchSchema( |
| List<IChunkMetadata> chunkMetadataListOfOneValueColumn) { |
| for (IChunkMetadata chunkMetadata : chunkMetadataListOfOneValueColumn) { |
| if (chunkMetadata == null) { |
| continue; |
| } |
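| // chunks of one value column are expected to share the same data type, so checking the |
| // first non-null chunk is sufficient |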
| String measurement = chunkMetadata.getMeasurementUid(); |
| IMeasurementSchema schema = measurementSchemaMap.get(measurement); |
| return schema.getType() == chunkMetadata.getDataType(); |
| } |
| return true; |
| } |
| |
| /** |
| * Deserialize the chunk into pages without decompressing them and put the pages into the page |
| * queue. |
| * |
| * @throws IOException if an I/O error occurs |
| */ |
| @SuppressWarnings("squid:S3776") |
| void deserializeChunkIntoPageQueue(ChunkMetadataElement chunkMetadataElement) throws IOException { |
| updateSummary(chunkMetadataElement, ChunkStatus.DESERIALIZE_CHUNK); |
| |
| // deserialize time chunk |
| Chunk timeChunk = chunkMetadataElement.chunk; |
| |
| CompactionChunkReader chunkReader = new CompactionChunkReader(timeChunk); |
| List<Pair<PageHeader, ByteBuffer>> timePages = chunkReader.readPageDataWithoutUncompressing(); |
| |
| // deserialize value chunks |
| List<List<Pair<PageHeader, ByteBuffer>>> valuePagesList = new ArrayList<>(); |
| List<Chunk> valueChunks = chunkMetadataElement.valueChunks; |
| for (int i = 0; i < valueChunks.size(); i++) { |
| Chunk valueChunk = valueChunks.get(i); |
| if (valueChunk == null) { |
| // value chunk has been deleted completely |
| valuePagesList.add(null); |
| continue; |
| } |
| |
| chunkReader = new CompactionChunkReader(valueChunk); |
| List<Pair<PageHeader, ByteBuffer>> valuePages = |
| chunkReader.readPageDataWithoutUncompressing(); |
| valuePagesList.add(valuePages); |
| } |
| |
| // recombine the i-th time page with the i-th page of each value column and add the aligned |
| // pages into the page queue |
| for (int i = 0; i < timePages.size(); i++) { |
| List<PageHeader> alignedPageHeaders = new ArrayList<>(); |
| List<ByteBuffer> alignedPageDatas = new ArrayList<>(); |
| for (int j = 0; j < valuePagesList.size(); j++) { |
| if (valuePagesList.get(j) == null) { |
| alignedPageHeaders.add(null); |
| alignedPageDatas.add(null); |
| continue; |
| } |
| Pair<PageHeader, ByteBuffer> valuePage = valuePagesList.get(j).get(i); |
| alignedPageHeaders.add(valuePage == null ? null : valuePage.left); |
| alignedPageDatas.add(valuePage == null ? null : valuePage.right); |
| } |
| pageQueue.add( |
| new AlignedPageElement( |
| timePages.get(i).left, |
| alignedPageHeaders, |
| timePages.get(i).right, |
| alignedPageDatas, |
| new CompactionAlignedChunkReader(timeChunk, valueChunks), |
| chunkMetadataElement, |
| i == timePages.size() - 1, |
| chunkMetadataElement.priority)); |
| } |
| chunkMetadataElement.clearChunks(); |
| } |
| |
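| /** |
| * Read the time chunk and all value chunks of the given aligned chunk into memory. Fully |
| * deleted or empty value columns are kept as null placeholders. |
| */ |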
| @Override |
| void readChunk(ChunkMetadataElement chunkMetadataElement) throws IOException { |
| updateSummary(chunkMetadataElement, ChunkStatus.READ_IN); |
| AlignedChunkMetadata alignedChunkMetadata = |
| (AlignedChunkMetadata) chunkMetadataElement.chunkMetadata; |
| chunkMetadataElement.chunk = |
| readerCacheMap |
| .get(chunkMetadataElement.fileElement.resource) |
| .readMemChunk((ChunkMetadata) alignedChunkMetadata.getTimeChunkMetadata()); |
| List<Chunk> valueChunks = new ArrayList<>(); |
| for (IChunkMetadata valueChunkMetadata : alignedChunkMetadata.getValueChunkMetadataList()) { |
| if (valueChunkMetadata == null || valueChunkMetadata.getStatistics().getCount() == 0) { |
| // the value chunk has been deleted completely or is an empty value chunk |
| valueChunks.add(null); |
| continue; |
| } |
| valueChunks.add( |
| readerCacheMap |
| .get(chunkMetadataElement.fileElement.resource) |
| .readMemChunk((ChunkMetadata) valueChunkMetadata)); |
| } |
| chunkMetadataElement.valueChunks = valueChunks; |
| setForceDecoding(chunkMetadataElement); |
| } |
| |
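| /** |
| * Mark the chunk for forced decoding if the compression type or encoding of its time column or |
| * any value column differs from the target measurement schema, so the chunk will be re-encoded |
| * rather than copied as-is. |
| */ |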
| void setForceDecoding(ChunkMetadataElement chunkMetadataElement) { |
| if (timeColumnMeasurementSchema.getCompressor() |
| != chunkMetadataElement.chunk.getHeader().getCompressionType() |
| || timeColumnMeasurementSchema.getEncodingType() |
| != chunkMetadataElement.chunk.getHeader().getEncodingType()) { |
| chunkMetadataElement.needForceDecoding = true; |
| return; |
| } |
| for (Chunk chunk : chunkMetadataElement.valueChunks) { |
| if (chunk == null) { |
| continue; |
| } |
| ChunkHeader header = chunk.getHeader(); |
| String measurementId = header.getMeasurementID(); |
| IMeasurementSchema measurementSchema = measurementSchemaMap.get(measurementId); |
| if (measurementSchema == null) { |
| continue; |
| } |
| if (measurementSchema.getCompressor() != header.getCompressionType() |
| || measurementSchema.getEncodingType() != header.getEncodingType()) { |
| chunkMetadataElement.needForceDecoding = true; |
| return; |
| } |
| } |
| } |
| |
| /** |
| * NONE_DELETED means that no data on this page has been deleted. <br> |
| * PARTIAL_DELETED means that some data on this page has been deleted. <br> |
| * ALL_DELETED means that all data on this page has been deleted. |
| * |
| * <p>Notice: for an aligned page, ALL_DELETED is returned if and only if all value pages are |
| * fully deleted, and NONE_DELETED if and only if no data on any value page has been deleted. |
| */ |
| protected ModifiedStatus isPageModified(PageElement pageElement) { |
| long startTime = pageElement.getStartTime(); |
| long endTime = pageElement.getEndTime(); |
| AlignedChunkMetadata alignedChunkMetadata = |
| (AlignedChunkMetadata) pageElement.getChunkMetadataElement().chunkMetadata; |
| ModifiedStatus lastPageStatus = null; |
| for (IChunkMetadata valueChunkMetadata : alignedChunkMetadata.getValueChunkMetadataList()) { |
| ModifiedStatus currentPageStatus = |
| valueChunkMetadata == null |
| ? ModifiedStatus.ALL_DELETED |
| : checkIsModified(startTime, endTime, valueChunkMetadata.getDeleteIntervalList()); |
| if (currentPageStatus == ModifiedStatus.PARTIAL_DELETED) { |
| // at least one value page has partially deleted data |
| return ModifiedStatus.PARTIAL_DELETED; |
| } |
| if (lastPageStatus == null) { |
| // first page |
| lastPageStatus = currentPageStatus; |
| continue; |
| } |
| if (!lastPageStatus.equals(currentPageStatus)) { |
| // value pages have mixed statuses: some are fully deleted while others have no deletions, |
| // so the aligned page as a whole still contains live data |
| lastPageStatus = ModifiedStatus.NONE_DELETED; |
| } |
| } |
| return lastPageStatus; |
| } |
| } |