blob: c6241c99a9cd77338a3cd05b1122334887d334f4 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.storageengine.dataregion.compaction.io.CompactionTsFileReader;
import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.constant.CompactionType;
import org.apache.iotdb.db.storageengine.dataregion.modification.Modification;
import org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile;
import org.apache.iotdb.db.storageengine.dataregion.read.control.FileReaderManager;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import org.apache.iotdb.db.utils.ModificationUtils;
import org.apache.iotdb.tsfile.file.metadata.AlignedChunkMetadata;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
import org.apache.iotdb.tsfile.file.metadata.IDeviceID;
import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.TsFileDeviceIterator;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
public class MultiTsFileDeviceIterator implements AutoCloseable {
// sort from the newest to the oldest by version (Used by FastPerformer and ReadPointPerformer)
private final List<TsFileResource> tsFileResourcesSortedByDesc;
// sort from the oldest to the newest by version (Used by ReadChunkPerformer)
private List<TsFileResource> tsFileResourcesSortedByAsc;
private Map<TsFileResource, TsFileSequenceReader> readerMap = new HashMap<>();
private final Map<TsFileResource, TsFileDeviceIterator> deviceIteratorMap = new HashMap<>();
private final Map<TsFileResource, List<Modification>> modificationCache = new HashMap<>();
private Pair<IDeviceID, Boolean> currentDevice = null;
/**
 * Used for compaction with read chunk performer.
 *
 * <p>Keeps two sorted copies of the given resources (ascending by file name and descending by
 * creation order), then opens one {@link CompactionTsFileReader} per resource with {@link
 * CompactionType#INNER_SEQ_COMPACTION} and registers its device iterator.
 *
 * <p>If anything fails while the readers are being opened, every reader that has already been
 * registered is closed before the failure is rethrown. A failure to close one reader does not
 * prevent the others from being closed and does not replace the original failure; it is attached
 * to it as a suppressed exception.
 *
 * @param tsFileResources source files to iterate; the list itself is copied, not modified
 * @throws IOException if io error occurred
 */
public MultiTsFileDeviceIterator(List<TsFileResource> tsFileResources) throws IOException {
  this.tsFileResourcesSortedByDesc = new ArrayList<>(tsFileResources);
  this.tsFileResourcesSortedByAsc = new ArrayList<>(tsFileResources);
  // sort the files from the oldest to the newest
  Collections.sort(this.tsFileResourcesSortedByAsc, TsFileResource::compareFileName);
  // sort the files from the newest to the oldest
  Collections.sort(
      this.tsFileResourcesSortedByDesc, TsFileResource::compareFileCreationOrderByDesc);
  try {
    for (TsFileResource tsFileResource : this.tsFileResourcesSortedByDesc) {
      CompactionTsFileReader reader =
          new CompactionTsFileReader(
              tsFileResource.getTsFilePath(), CompactionType.INNER_SEQ_COMPACTION);
      // register the reader first so that it is closed below if creating its iterator fails
      readerMap.put(tsFileResource, reader);
      deviceIteratorMap.put(tsFileResource, reader.getAllDevicesIteratorWithIsAligned());
    }
  } catch (Exception e) {
    // release every reader opened so far; keep going even if one of them fails to close
    for (TsFileSequenceReader reader : readerMap.values()) {
      try {
        reader.close();
      } catch (Exception closeFailure) {
        // do not mask the root cause with a cleanup failure
        e.addSuppressed(closeFailure);
      }
    }
    throw e;
  }
}
/**
* Used for compaction with read point performer.
*
* @throws IOException if io errors occurred
*/
public MultiTsFileDeviceIterator(
List<TsFileResource> seqResources, List<TsFileResource> unseqResources) throws IOException {
this.tsFileResourcesSortedByDesc = new ArrayList<>(seqResources);
tsFileResourcesSortedByDesc.addAll(unseqResources);
// sort the files from the newest to the oldest
Collections.sort(
this.tsFileResourcesSortedByDesc, TsFileResource::compareFileCreationOrderByDesc);
for (TsFileResource tsFileResource : tsFileResourcesSortedByDesc) {
TsFileSequenceReader reader =
FileReaderManager.getInstance().get(tsFileResource.getTsFilePath(), true);
readerMap.put(tsFileResource, reader);
deviceIteratorMap.put(tsFileResource, reader.getAllDevicesIteratorWithIsAligned());
}
}
/**
 * Used for compaction with fast performer.
 *
 * <p>The compaction type passed to each new {@link CompactionTsFileReader} is derived from the
 * inputs: cross compaction when both lists are non-empty, inner unsequence compaction when there
 * are no sequence files, inner sequence compaction otherwise.
 *
 * <p>The readers created here are stored in the caller-supplied {@code readerMap}, which also
 * becomes the reader map of this iterator. This constructor does not close them if it fails part
 * way through.
 *
 * @param seqResources sequence source files
 * @param unseqResources unsequence source files
 * @param readerMap map that receives one reader per source file
 * @throws IOException if io errors occurred
 */
public MultiTsFileDeviceIterator(
    List<TsFileResource> seqResources,
    List<TsFileResource> unseqResources,
    Map<TsFileResource, TsFileSequenceReader> readerMap)
    throws IOException {
  List<TsFileResource> allResources = new ArrayList<>(seqResources);
  allResources.addAll(unseqResources);
  // sort tsfiles from the newest to the oldest
  Collections.sort(allResources, TsFileResource::compareFileCreationOrderByDesc);
  this.tsFileResourcesSortedByDesc = allResources;
  this.readerMap = readerMap;

  final CompactionType compactionType;
  if (seqResources.isEmpty()) {
    compactionType = CompactionType.INNER_UNSEQ_COMPACTION;
  } else if (unseqResources.isEmpty()) {
    compactionType = CompactionType.INNER_SEQ_COMPACTION;
  } else {
    compactionType = CompactionType.CROSS_COMPACTION;
  }

  for (TsFileResource resource : allResources) {
    TsFileSequenceReader compactionReader =
        new CompactionTsFileReader(resource.getTsFilePath(), compactionType);
    readerMap.put(resource, compactionReader);
    deviceIteratorMap.put(resource, compactionReader.getAllDevicesIteratorWithIsAligned());
  }
}
/**
 * Tells whether any of the remaining per-file device iterators can still yield a device.
 *
 * <p>An iterator counts as having more devices when it reports {@code hasNext()}, or when the
 * device it is currently positioned on differs from the device returned by the last call to
 * {@link #nextDevice()}.
 *
 * @return true if at least one file still has a device to visit
 */
public boolean hasNextDevice() {
  for (TsFileDeviceIterator deviceIterator : deviceIteratorMap.values()) {
    if (deviceIterator.hasNext()) {
      return true;
    }
    Pair<IDeviceID, Boolean> positionedOn = deviceIterator.current();
    if (positionedOn != null && !positionedOn.left.equals(currentDevice.left)) {
      // this file is already positioned on a device that has not been returned yet
      return true;
    }
  }
  return false;
}
/**
 * Advance to the next device, i.e. the smallest device (by {@code IDeviceID.compareTo}) among
 * the devices the per-file iterators are positioned on.
 *
 * <p>Iterators that are not positioned yet, or that are still positioned on the previously
 * returned device, are moved forward first. Iterators with nothing left are dropped from the
 * iterator map once the scan is finished.
 *
 * @return Pair of device full path and whether this device is aligned, or null if no file has a
 *     device left
 */
@SuppressWarnings("squid:S135")
public Pair<IDeviceID, Boolean> nextDevice() {
  List<TsFileResource> exhaustedResources = new LinkedList<>();
  Pair<IDeviceID, Boolean> smallestDevice = null;
  // visit the source files from the newest to the oldest by version
  for (TsFileResource resource : tsFileResourcesSortedByDesc) {
    TsFileDeviceIterator deviceIterator = deviceIteratorMap.get(resource);
    if (deviceIterator == null) {
      // this file ran out of devices in an earlier call
      continue;
    }
    Pair<IDeviceID, Boolean> candidate = deviceIterator.current();
    boolean mustAdvance = candidate == null || candidate.left.equals(currentDevice.left);
    if (mustAdvance) {
      if (!deviceIterator.hasNext()) {
        // nothing left in this file; drop its iterator after the loop
        exhaustedResources.add(resource);
        continue;
      }
      deviceIterator.next();
      candidate = deviceIterator.current();
    }
    if (smallestDevice == null || smallestDevice.left.compareTo(candidate.left) > 0) {
      smallestDevice = candidate;
    }
  }
  currentDevice = smallestDevice;
  // remove the iterators with no device remaining
  for (TsFileResource exhausted : exhaustedResources) {
    deviceIteratorMap.remove(exhausted);
  }
  return currentDevice;
}
/**
 * Collect the measurement schemas of the current device from all source files that contain it.
 * Files are visited from the newest to the oldest, and for each file the index tree is read
 * starting from the first measurement node of the current device. The first schema found for a
 * measurement wins; timeseries without any chunk metadata are ignored.
 *
 * <p>The measurements collected so far are handed to the reader on every call (presumably so
 * that it can skip them; confirm against {@code getDeviceTimeseriesMetadata}).
 *
 * @return measurement id -> schema for the current device
 * @throws IOException if io errors occurred
 */
public Map<String, MeasurementSchema> getAllSchemasOfCurrentDevice() throws IOException {
  Map<String, MeasurementSchema> schemasByMeasurement = new ConcurrentHashMap<>();
  // get schemas from the newest file to the oldest file
  for (TsFileResource resource : tsFileResourcesSortedByDesc) {
    TsFileDeviceIterator deviceIterator = deviceIteratorMap.get(resource);
    if (deviceIterator == null || !deviceIterator.current().equals(currentDevice)) {
      // this tsfile has no more device, or it is positioned on another device, which means it
      // does not contain the current device
      continue;
    }
    TsFileSequenceReader reader = readerMap.get(resource);
    List<TimeseriesMetadata> metadataOfDevice = new ArrayList<>();
    reader.getDeviceTimeseriesMetadata(
        metadataOfDevice,
        deviceIterator.getFirstMeasurementNodeOfCurrentDevice(),
        schemasByMeasurement.keySet(),
        true);
    for (TimeseriesMetadata metadata : metadataOfDevice) {
      String measurementId = metadata.getMeasurementId();
      if (schemasByMeasurement.containsKey(measurementId)
          || metadata.getChunkMetadataList().isEmpty()) {
        continue;
      }
      schemasByMeasurement.put(
          measurementId, reader.getMeasurementSchema(metadata.getChunkMetadataList()));
    }
  }
  return schemasByMeasurement;
}
/**
 * Get all measurements of the current device and the offset of their timeseries metadata in each
 * source file. It is used for new fast compaction to compact nonAligned timeseries.
 *
 * <p>Files are visited from the newest to the oldest. The data type seen first for a measurement
 * is treated as the correct one; a file whose timeseries metadata reports a different data type
 * for that measurement is left out of the measurement's entry.
 *
 * <p>Every reader involved is cast to {@link CompactionTsFileReader}.
 *
 * @return measurement -> tsfile resource -> timeseries metadata <startOffset, endOffset>
 * @throws IOException if io errors occurred
 */
public Map<String, Map<TsFileResource, Pair<Long, Long>>>
    getTimeseriesMetadataOffsetOfCurrentDevice() throws IOException {
  Map<String, Map<TsFileResource, Pair<Long, Long>>> offsetsByMeasurement = new HashMap<>();
  Map<String, TSDataType> firstSeenDataTypes = new HashMap<>();
  for (TsFileResource resource : tsFileResourcesSortedByDesc) {
    TsFileDeviceIterator deviceIterator = deviceIteratorMap.get(resource);
    if (deviceIterator == null || !deviceIterator.current().equals(currentDevice)) {
      // this tsfile has no more device, or it is positioned on another device, which means it
      // does not contain the current device
      continue;
    }
    CompactionTsFileReader compactionReader = (CompactionTsFileReader) readerMap.get(resource);
    for (Map.Entry<String, Pair<TimeseriesMetadata, Pair<Long, Long>>> metadataAndOffset :
        compactionReader
            .getTimeseriesMetadataAndOffsetByDevice(
                deviceIterator.getFirstMeasurementNodeOfCurrentDevice(),
                Collections.emptySet(),
                false)
            .entrySet()) {
      String measurementId = metadataAndOffset.getKey();
      TSDataType dataTypeInThisFile = metadataAndOffset.getValue().left.getTsDataType();
      TSDataType dataTypeSeenFirst =
          firstSeenDataTypes.putIfAbsent(measurementId, dataTypeInThisFile);
      if (dataTypeSeenFirst == null || dataTypeSeenFirst == dataTypeInThisFile) {
        offsetsByMeasurement
            .computeIfAbsent(measurementId, key -> new HashMap<>())
            .put(resource, metadataAndOffset.getValue().right);
      }
      // otherwise the data type is not consistent with the first one seen: skip this entry
    }
  }
  return offsetsByMeasurement;
}
/**
 * Get all measurements of the current device together with their schema and the offset of their
 * timeseries metadata in each source file. It is used for new fast compaction to compact aligned
 * timeseries.
 *
 * <p>Files are visited from the newest to the oldest; the schema of a measurement is built from
 * the first file in which the measurement is encountered. The result keeps the order in which
 * measurements were first encountered.
 *
 * @return measurement -> pair of (schema, tsfile resource -> timeseries metadata <startOffset,
 *     endOffset>)
 * @throws IOException if io errors occurred
 */
@SuppressWarnings({"checkstyle:AtclauseOrderCheck", "squid:S3824"})
public Map<String, Pair<MeasurementSchema, Map<TsFileResource, Pair<Long, Long>>>>
    getTimeseriesSchemaAndMetadataOffsetOfCurrentDevice() throws IOException {
  Map<String, Pair<MeasurementSchema, Map<TsFileResource, Pair<Long, Long>>>>
      schemaAndOffsetsByMeasurement = new LinkedHashMap<>();
  for (TsFileResource resource : tsFileResourcesSortedByDesc) {
    TsFileDeviceIterator deviceIterator = deviceIteratorMap.get(resource);
    if (deviceIterator == null || !deviceIterator.current().equals(currentDevice)) {
      // this tsfile has no more device, or it is positioned on another device, which means it
      // does not contain the current device
      continue;
    }
    TsFileSequenceReader reader = readerMap.get(resource);
    for (Map.Entry<String, Pair<List<IChunkMetadata>, Pair<Long, Long>>> metadataAndOffset :
        reader
            .getTimeseriesMetadataOffsetByDevice(
                deviceIterator.getFirstMeasurementNodeOfCurrentDevice(),
                schemaAndOffsetsByMeasurement.keySet(),
                true)
            .entrySet()) {
      String measurementId = metadataAndOffset.getKey();
      Pair<MeasurementSchema, Map<TsFileResource, Pair<Long, Long>>> schemaAndOffsets =
          schemaAndOffsetsByMeasurement.get(measurementId);
      if (schemaAndOffsets == null) {
        // first time this measurement shows up: derive its schema from this file
        MeasurementSchema schema = reader.getMeasurementSchema(metadataAndOffset.getValue().left);
        schemaAndOffsets = new Pair<>(schema, new HashMap<>());
        schemaAndOffsetsByMeasurement.put(measurementId, schemaAndOffsets);
      }
      schemaAndOffsets.right.put(resource, metadataAndOffset.getValue().right);
    }
  }
  return schemaAndOffsetsByMeasurement;
}
/**
 * Return a MeasurementIterator, which iterates the measurements of a not aligned device. The
 * iterator shares the reader map of this device iterator.
 *
 * @param device the full path of the device to be iterated
 * @param derserializeTimeseriesMetadata forwarded to the MeasurementIterator constructor; when
 *     true, the constructor opens a chunk metadata iterator on every source file for the device,
 *     when false it opens none
 * @return measurement iterator of not aligned device
 * @throws IOException if io errors occurred
 */
public MeasurementIterator iterateNotAlignedSeries(
IDeviceID device, boolean derserializeTimeseriesMetadata) throws IOException {
return new MeasurementIterator(readerMap, device, derserializeTimeseriesMetadata);
}
/**
 * Return a list of the tsfile readers and their aligned chunk metadata lists for the aligned
 * device which this iterator is visiting. If there is any modification for this device, it is
 * applied to the AlignedChunkMetadata, so that the caller can read chunks directly using the
 * reader and the chunk metadata returned. Notice, if the TsFile corresponding to a
 * TsFileSequenceReader does not contain the current device, the TsFileSequenceReader will not
 * appear in the returned list.
 *
 * <p>Files are visited from the oldest to the newest. The list is empty when there is no current
 * device or the current device is not aligned.
 *
 * @return a list of pair(TsFileSequenceReader, the list of AlignedChunkMetadata for current
 *     device)
 * @throws IOException if io errors occurred
 * @throws IllegalPathException if a series path of the current device is illegal
 */
@SuppressWarnings({"squid:S1319", "squid:S135"})
public LinkedList<Pair<TsFileSequenceReader, List<AlignedChunkMetadata>>>
    getReaderAndChunkMetadataForCurrentAlignedSeries() throws IOException, IllegalPathException {
  LinkedList<Pair<TsFileSequenceReader, List<AlignedChunkMetadata>>> readersWithMetadata =
      new LinkedList<>();
  if (currentDevice == null || !currentDevice.right) {
    return readersWithMetadata;
  }
  for (TsFileResource tsFileResource : tsFileResourcesSortedByAsc) {
    TsFileDeviceIterator deviceIterator = deviceIteratorMap.get(tsFileResource);
    boolean containsCurrentDevice =
        deviceIterator != null && currentDevice.equals(deviceIterator.current());
    if (!containsCurrentDevice) {
      continue;
    }
    TsFileSequenceReader reader = readerMap.get(tsFileResource);
    List<AlignedChunkMetadata> metadataOfCurrentDevice =
        reader.getAlignedChunkMetadata(currentDevice.left);
    applyModificationForAlignedChunkMetadataList(tsFileResource, metadataOfCurrentDevice);
    readersWithMetadata.add(new Pair<>(reader, metadataOfCurrentDevice));
  }
  return readersWithMetadata;
}
/**
 * Collect the modifications for the current device and apply them to the
 * alignedChunkMetadataList.
 *
 * <p>Nothing is done when the list is empty or when the resource has no normal modification
 * file. Otherwise the modifications of the resource are loaded once and cached, grouped per value
 * column by matching each modification path against the full path of that column's series, and
 * handed to {@link ModificationUtils#modifyAlignedChunkMetaData}.
 *
 * <p>The series path of a value column is taken from the first non-null value chunk metadata of
 * that column anywhere in the list, not only from the first AlignedChunkMetadata. Otherwise a
 * column whose value chunk is missing in the first entry would never receive its modifications,
 * even though later entries contain data for it.
 *
 * <p>NOTE(review): this relies on every AlignedChunkMetadata in the list exposing its value
 * columns in the same order, which the index-based modification list passed to
 * modifyAlignedChunkMetaData already presumes; confirm against the reader.
 *
 * @param tsFileResource tsfile resource
 * @param alignedChunkMetadataList list of aligned chunk metadata
 * @throws IllegalPathException if the path of a value series is illegal
 */
private void applyModificationForAlignedChunkMetadataList(
    TsFileResource tsFileResource, List<AlignedChunkMetadata> alignedChunkMetadataList)
    throws IllegalPathException {
  if (alignedChunkMetadataList.isEmpty()) {
    // all the value chunks is empty chunk
    return;
  }
  ModificationFile modificationFile = ModificationFile.getNormalMods(tsFileResource);
  if (!modificationFile.exists()) {
    return;
  }
  List<Modification> modifications =
      modificationCache.computeIfAbsent(
          tsFileResource, r -> new ArrayList<>(modificationFile.getModifications()));

  // construct the input params List<List<Modification>> for
  // ModificationUtils.modifyAlignedChunkMetaData: one modification list per value column
  int valueColumnCount = alignedChunkMetadataList.get(0).getValueChunkMetadataList().size();
  List<List<Modification>> modificationForCurDevice = new ArrayList<>(valueColumnCount);
  List<PartialPath> valueSeriesPaths = new ArrayList<>(valueColumnCount);
  for (int i = 0; i < valueColumnCount; ++i) {
    modificationForCurDevice.add(new ArrayList<>());
    IChunkMetadata valueChunkMetadata =
        findFirstNonNullValueChunkMetadata(alignedChunkMetadataList, i);
    // a column without any value chunk in this file has no path and collects no modification
    valueSeriesPaths.add(
        valueChunkMetadata == null
            ? null
            : CompactionPathUtils.getPath(
                currentDevice.left, valueChunkMetadata.getMeasurementUid()));
  }

  for (Modification modification : modifications) {
    for (int i = 0; i < valueColumnCount; ++i) {
      PartialPath path = valueSeriesPaths.get(i);
      if (path != null && modification.getPath().matchFullPath(path)) {
        modificationForCurDevice.get(i).add(modification);
      }
    }
  }
  ModificationUtils.modifyAlignedChunkMetaData(
      alignedChunkMetadataList, modificationForCurDevice);
}

/**
 * Find the first non-null value chunk metadata of the given value column.
 *
 * @param alignedChunkMetadataList aligned chunk metadata to search, in list order
 * @param columnIndex index of the value column
 * @return the first non-null value chunk metadata at that index, or null if there is none
 */
private static IChunkMetadata findFirstNonNullValueChunkMetadata(
    List<AlignedChunkMetadata> alignedChunkMetadataList, int columnIndex) {
  for (AlignedChunkMetadata alignedChunkMetadata : alignedChunkMetadataList) {
    if (alignedChunkMetadata == null) {
      continue;
    }
    List<IChunkMetadata> valueChunkMetadataList =
        alignedChunkMetadata.getValueChunkMetadataList();
    if (columnIndex >= valueChunkMetadataList.size()) {
      continue;
    }
    IChunkMetadata valueChunkMetadata = valueChunkMetadataList.get(columnIndex);
    if (valueChunkMetadata != null) {
      return valueChunkMetadata;
    }
  }
  return null;
}
/**
 * Close every reader held in the reader map.
 *
 * <p>All readers are attempted even if some of them fail to close, so that one failure does not
 * leave the remaining readers open. The first {@link IOException} is rethrown after the loop,
 * with any later ones attached to it as suppressed exceptions.
 *
 * @throws IOException if at least one reader failed to close
 */
@Override
public void close() throws IOException {
  IOException firstFailure = null;
  for (TsFileSequenceReader reader : readerMap.values()) {
    try {
      reader.close();
    } catch (IOException e) {
      if (firstFailure == null) {
        firstFailure = e;
      } else {
        firstFailure.addSuppressed(e);
      }
    }
  }
  if (firstFailure != null) {
    throw firstFailure;
  }
}
/*
NonAligned measurement iterator.
*/
public class MeasurementIterator {
private Map<TsFileResource, TsFileSequenceReader> readerMap;
private IDeviceID device;
private String currentCompactingSeries = null;
private LinkedList<String> seriesInThisIteration = new LinkedList<>();
// tsfile sequence reader -> series -> list<ChunkMetadata>
private Map<TsFileSequenceReader, Map<String, List<ChunkMetadata>>> chunkMetadataCacheMap =
new HashMap<>();
// this map cache the chunk metadata list iterator for each tsfile
// the iterator return a batch of series and all chunk metadata of these series in this tsfile
private Map<TsFileResource, Iterator<Map<String, List<ChunkMetadata>>>>
chunkMetadataIteratorMap = new HashMap<>();
private MeasurementIterator(
Map<TsFileResource, TsFileSequenceReader> readerMap,
IDeviceID device,
boolean needDeserializeTimeseries)
throws IOException {
this.readerMap = readerMap;
this.device = device;
if (needDeserializeTimeseries) {
for (TsFileResource resource : tsFileResourcesSortedByAsc) {
TsFileSequenceReader reader = readerMap.get(resource);
chunkMetadataIteratorMap.put(
resource, reader.getMeasurementChunkMetadataListMapIterator(device));
chunkMetadataCacheMap.put(reader, new TreeMap<>());
}
}
}
/**
 * Collect series from files using the per-file iterators; the collected series are appended to
 * seriesInThisIteration. To ensure that each series is compacted once, when the iterator of each
 * file returns a batch of series, we find the max of it, and take the min among the max series
 * of all batches as the `last series`.
 *
 * <p>That is, lastSeries = min([max(series return in file 1),..., max(series return in file
 * n)]). Only the series that are not greater than the lastSeries in lexicographical order are
 * collected. When no file has a further batch, lastSeries is the max of all series seen, so
 * everything that is cached gets collected.
 *
 * <p>The entry with the empty measurement name is dropped from every fetched batch. If that
 * leaves the batch empty, the next batch of the same file is fetched instead of computing the
 * max of an empty key set, which would throw NoSuchElementException.
 *
 * <p>NOTE(review): series names are gathered per file without de-duplication, so a series
 * present in several files is appended more than once.
 *
 * @return true if there is any series is collected, else false.
 */
@SuppressWarnings("squid:S3776")
private boolean collectSeries() {
  String lastSeries = null;
  List<String> tempCollectedSeries = new ArrayList<>();
  for (TsFileResource resource : tsFileResourcesSortedByAsc) {
    TsFileSequenceReader reader = readerMap.get(resource);
    Map<String, List<ChunkMetadata>> chunkMetadataListMap = chunkMetadataCacheMap.get(reader);
    Iterator<Map<String, List<ChunkMetadata>>> batchIterator =
        chunkMetadataIteratorMap.get(resource);
    // refill the cache of this file; skip batches that are empty once "" has been removed
    while (chunkMetadataListMap.isEmpty() && batchIterator.hasNext()) {
      chunkMetadataListMap = batchIterator.next();
      // encounter deleted aligned series, then remove it
      chunkMetadataListMap.remove("");
      chunkMetadataCacheMap.put(reader, chunkMetadataListMap);
    }
    if (chunkMetadataListMap.isEmpty()) {
      // this file has nothing left to offer
      continue;
    }
    // get the min last series in the current chunk metadata
    String maxSeries = Collections.max(chunkMetadataListMap.keySet());
    if (lastSeries == null || maxSeries.compareTo(lastSeries) < 0) {
      lastSeries = maxSeries;
    }
    tempCollectedSeries.addAll(chunkMetadataListMap.keySet());
  }
  if (tempCollectedSeries.isEmpty()) {
    return false;
  }
  if (!hasRemainingSeries()) {
    // no further batch in any file: everything cached can be compacted in this round
    lastSeries = Collections.max(tempCollectedSeries);
  }
  String finalLastSeries = lastSeries;
  List<String> finalCollectedSeriesInThisIteration =
      tempCollectedSeries.stream()
          .filter(series -> series.compareTo(finalLastSeries) <= 0)
          .collect(Collectors.toList());
  seriesInThisIteration.addAll(finalCollectedSeriesInThisIteration);
  return true;
}
/**
 * Tells whether any per-file chunk metadata iterator can still return another batch.
 *
 * @return true if at least one iterator reports {@code hasNext()}
 */
private boolean hasRemainingSeries() {
  for (Iterator<Map<String, List<ChunkMetadata>>> batchIterator :
      chunkMetadataIteratorMap.values()) {
    if (batchIterator.hasNext()) {
      return true;
    }
  }
  return false;
}
/**
 * Tells whether another series can be returned by {@link #nextSeries()}. When the series
 * collected earlier are used up, this tries to collect a new round of series from the files.
 *
 * @return true if a series is pending or a new round of series could be collected
 */
public boolean hasNextSeries() {
  if (!seriesInThisIteration.isEmpty()) {
    return true;
  }
  return collectSeries();
}
/**
 * Take the next pending series and make it the series currently being compacted.
 *
 * @return the name of the next series, or null if there is no series left (in which case the
 *     current series is left unchanged)
 */
public String nextSeries() {
  if (hasNextSeries()) {
    currentCompactingSeries = seriesInThisIteration.removeFirst();
    return currentCompactingSeries;
  }
  return null;
}
/**
 * Collect all the chunk metadata of the current series from the source files, visiting the files
 * from the oldest to the newest. The chunk metadata of the series is removed from the per-file
 * cache as it is collected.
 *
 * <p>If there are any modifications for these chunks, they are applied to the metadata. Use
 * `ChunkMetadata.getDeleteIntervalList() == null` to judge if the chunk is modified.
 *
 * @return all the chunk metadata of current series, paired with the reader of the file they come
 *     from; empty if no series has been selected yet
 * @throws IllegalPathException if path is illegal
 */
@SuppressWarnings("squid:S1319")
public LinkedList<Pair<TsFileSequenceReader, List<ChunkMetadata>>>
    getMetadataListForCurrentSeries() throws IllegalPathException {
  LinkedList<Pair<TsFileSequenceReader, List<ChunkMetadata>>> readersWithMetadata =
      new LinkedList<>();
  if (currentCompactingSeries == null) {
    return readersWithMetadata;
  }
  PartialPath seriesPath = CompactionPathUtils.getPath(device, currentCompactingSeries);
  for (TsFileResource resource : tsFileResourcesSortedByAsc) {
    TsFileSequenceReader reader = readerMap.get(resource);
    Map<String, List<ChunkMetadata>> cachedBatch = chunkMetadataCacheMap.get(reader);
    if (!cachedBatch.containsKey(currentCompactingSeries)) {
      // the cached batch of this file does not hold the current series
      continue;
    }
    // take the chunk metadata of the current series out of the cache of this file
    List<ChunkMetadata> metadataInThisFile = cachedBatch.remove(currentCompactingSeries);

    List<Modification> modificationsOfFile =
        modificationCache.computeIfAbsent(
            resource,
            r -> new LinkedList<>(ModificationFile.getNormalMods(r).getModifications()));
    // keep only the modifications that target the current series
    LinkedList<Modification> modificationsOfSeries = new LinkedList<>();
    for (Modification candidate : modificationsOfFile) {
      if (candidate.getPath().matchFullPath(seriesPath)) {
        modificationsOfSeries.add(candidate);
      }
    }
    if (!modificationsOfSeries.isEmpty()) {
      ModificationUtils.modifyChunkMetaData(metadataInThisFile, modificationsOfSeries);
    }
    readersWithMetadata.add(new Pair<>(reader, metadataInThisFile));
  }
  return readersWithMetadata;
}
}
}