/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl;

import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.WriteProcessException;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionLastTimeCheckFailedException;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.IllegalCompactionTaskSummaryException;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.ICrossCompactionPerformer;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.ISeqCompactionPerformer;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.IUnseqCompactionPerformer;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.CompactionTaskSummary;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.subtask.FastCompactionPerformerSubTask;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.subtask.FastCompactionTaskSummary;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionUtils;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.MultiTsFileDeviceIterator;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.writer.AbstractCompactionWriter;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.writer.FastCrossCompactionWriter;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.writer.FastInnerCompactionWriter;
import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionTaskManager;
import org.apache.iotdb.db.storageengine.dataregion.modification.Modification;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import org.apache.iotdb.tsfile.exception.write.PageException;
import org.apache.iotdb.tsfile.file.metadata.IDeviceID;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

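/**
 * A compaction performer that compacts the source files device by device. For each device it
 * sorts the source files by the device's start time and delegates the actual series
 * compaction to {@link FastCompactionPerformerSubTask}s, which read chunk metadata directly
 * through the cached {@link TsFileSequenceReader}s.
 *
 * <p>A minimal usage sketch ({@code seqFiles}, {@code unseqFiles} and {@code targetFiles}
 * are placeholder lists to be prepared by the caller):
 *
 * <pre>{@code
 * FastCompactionPerformer performer =
 *     new FastCompactionPerformer(seqFiles, unseqFiles, targetFiles);
 * performer.setSummary(new FastCompactionTaskSummary());
 * performer.perform();
 * }</pre>
 */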
public class FastCompactionPerformer
implements ICrossCompactionPerformer, ISeqCompactionPerformer, IUnseqCompactionPerformer {
@SuppressWarnings("squid:S1068")
private final Logger logger = LoggerFactory.getLogger(IoTDBConstant.COMPACTION_LOGGER_NAME);
private List<TsFileResource> seqFiles = Collections.emptyList();
private List<TsFileResource> unseqFiles = Collections.emptyList();
private List<TsFileResource> sortedSourceFiles = new ArrayList<>();
private static final int SUB_TASK_NUM =
IoTDBDescriptor.getInstance().getConfig().getSubCompactionTaskNum();
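// Cache of the sequence readers of all source files, shared by the sub tasks. The readers
// are closed by MultiTsFileDeviceIterator, see the finally block of perform().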
private Map<TsFileResource, TsFileSequenceReader> readerCacheMap = new ConcurrentHashMap<>();
private FastCompactionTaskSummary subTaskSummary;
private List<TsFileResource> targetFiles;
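// Cache of the modifications (deletions) of each source file, shared by the sub tasks.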
private Map<TsFileResource, List<Modification>> modificationCache = new ConcurrentHashMap<>();
private boolean isCrossCompaction;
public FastCompactionPerformer(
List<TsFileResource> seqFiles,
List<TsFileResource> unseqFiles,
List<TsFileResource> targetFiles) {
this.seqFiles = seqFiles;
this.unseqFiles = unseqFiles;
this.targetFiles = targetFiles;
// it is an inner space compaction when either file list is empty
isCrossCompaction = !seqFiles.isEmpty() && !unseqFiles.isEmpty();
}

public FastCompactionPerformer(boolean isCrossCompaction) {
this.isCrossCompaction = isCrossCompaction;
}
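
/**
 * Iterate over all devices of the source files. For each device: sort the source files that
 * may contain it by its start time, start a chunk group in the target writer, compact its
 * series (aligned or non-aligned), then end the chunk group and flush chunk metadata if
 * necessary.
 */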
@Override
public void perform() throws Exception {
this.subTaskSummary.setTemporalFileNum(targetFiles.size());
try (MultiTsFileDeviceIterator deviceIterator =
new MultiTsFileDeviceIterator(seqFiles, unseqFiles, readerCacheMap);
AbstractCompactionWriter compactionWriter =
isCrossCompaction
? new FastCrossCompactionWriter(targetFiles, seqFiles, readerCacheMap)
: new FastInnerCompactionWriter(targetFiles.get(0))) {
while (deviceIterator.hasNextDevice()) {
checkThreadInterrupted();
Pair<IDeviceID, Boolean> deviceInfo = deviceIterator.nextDevice();
IDeviceID device = deviceInfo.left;
// Sort the resources by the start time of the current device, from oldest to newest, and
// remove the resources that definitely do not contain the current device. Note: when the
// time index is at file level, the check may yield false positives, i.e. a device can be
// reported as existing in a file even though it actually does not.
sortedSourceFiles.addAll(seqFiles);
sortedSourceFiles.addAll(unseqFiles);
sortedSourceFiles.removeIf(x -> x.definitelyNotContains(device));
sortedSourceFiles.sort(Comparator.comparingLong(x -> x.getStartTime(device)));
boolean isAligned = deviceInfo.right;
compactionWriter.startChunkGroup(device, isAligned);
if (isAligned) {
compactAlignedSeries(device, deviceIterator, compactionWriter);
} else {
compactNonAlignedSeries(device, deviceIterator, compactionWriter);
}
compactionWriter.endChunkGroup();
// check whether to flush chunk metadata or not
compactionWriter.checkAndMayFlushChunkMetadata();
// Add temp file metrics
subTaskSummary.setTemporalFileSize(compactionWriter.getWriterSize());
sortedSourceFiles.clear();
}
compactionWriter.endFile();
CompactionUtils.updatePlanIndexes(targetFiles, seqFiles, unseqFiles);
} finally {
// readers of source files have been closed in MultiTsFileDeviceIterator
// clean cache
sortedSourceFiles = null;
readerCacheMap = null;
modificationCache = null;
}
}
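
/**
 * Compact all value series of the current aligned device. All value measurements are handled
 * by a single sub task so that the time column and every value column are written into the
 * same chunk group together.
 */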
private void compactAlignedSeries(
IDeviceID deviceId,
MultiTsFileDeviceIterator deviceIterator,
AbstractCompactionWriter fastCrossCompactionWriter)
throws PageException, IOException, WriteProcessException, IllegalPathException {
// measurement -> tsfile resource -> timeseries metadata <startOffset, endOffset>, including
// empty value chunk metadata
Map<String, Map<TsFileResource, Pair<Long, Long>>> timeseriesMetadataOffsetMap =
new LinkedHashMap<>();
List<IMeasurementSchema> measurementSchemas = new ArrayList<>();
// Get all value measurements and their schemas of the current device, as well as the start
// and end offsets of each timeseries metadata, so that chunk metadata can later be read
// directly through these offsets. Unlike the non-aligned case, the chunk metadata must be
// deserialized here to collect the schemas of all value measurements before calling
// startMeasurement(): the compaction processes one batch of overlapped files at a time, and
// there is no guarantee that the first batch contains every value measurement.
for (Map.Entry<String, Pair<MeasurementSchema, Map<TsFileResource, Pair<Long, Long>>>> entry :
deviceIterator.getTimeseriesSchemaAndMetadataOffsetOfCurrentDevice().entrySet()) {
measurementSchemas.add(entry.getValue().left);
timeseriesMetadataOffsetMap.put(entry.getKey(), entry.getValue().right);
}
FastCompactionTaskSummary taskSummary = new FastCompactionTaskSummary();
new FastCompactionPerformerSubTask(
fastCrossCompactionWriter,
timeseriesMetadataOffsetMap,
readerCacheMap,
modificationCache,
sortedSourceFiles,
measurementSchemas,
deviceId,
taskSummary)
.call();
subTaskSummary.increase(taskSummary);
}
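
/**
 * Compact the non-aligned series of the current device. The measurements are sorted by name
 * and assigned round-robin to at most SUB_TASK_NUM sub tasks, which are submitted to the
 * CompactionTaskManager and executed in parallel.
 */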
private void compactNonAlignedSeries(
IDeviceID deviceID,
MultiTsFileDeviceIterator deviceIterator,
AbstractCompactionWriter fastCrossCompactionWriter)
throws IOException, InterruptedException {
// measurement -> tsfile resource -> timeseries metadata <startOffset, endOffset>
// Get all measurements of the current device, as well as the start and end offsets of each
// timeseries metadata, so that chunk metadata can later be read directly through these
// offsets. Unlike the aligned case, the chunk metadata does not need to be deserialized
// here; it can be deserialized later, together with its schema.
Map<String, Map<TsFileResource, Pair<Long, Long>>> timeseriesMetadataOffsetMap =
deviceIterator.getTimeseriesMetadataOffsetOfCurrentDevice();
List<String> allMeasurements = new ArrayList<>(timeseriesMetadataOffsetMap.keySet());
allMeasurements.sort(String::compareTo);
int subTaskNums = Math.min(allMeasurements.size(), SUB_TASK_NUM);
// assign all measurements to the sub tasks in a round-robin manner
@SuppressWarnings("unchecked")
List<String>[] measurementsForEachSubTask = new ArrayList[subTaskNums];
for (int idx = 0; idx < allMeasurements.size(); idx++) {
if (measurementsForEachSubTask[idx % subTaskNums] == null) {
measurementsForEachSubTask[idx % subTaskNums] = new ArrayList<>();
}
measurementsForEachSubTask[idx % subTaskNums].add(allMeasurements.get(idx));
}
// construct sub tasks and start compacting measurements in parallel
List<Future<Void>> futures = new ArrayList<>();
List<FastCompactionTaskSummary> taskSummaryList = new ArrayList<>();
for (int i = 0; i < subTaskNums; i++) {
FastCompactionTaskSummary taskSummary = new FastCompactionTaskSummary();
futures.add(
CompactionTaskManager.getInstance()
.submitSubTask(
new FastCompactionPerformerSubTask(
fastCrossCompactionWriter,
timeseriesMetadataOffsetMap,
readerCacheMap,
modificationCache,
sortedSourceFiles,
measurementsForEachSubTask[i],
deviceID,
taskSummary,
i)));
taskSummaryList.add(taskSummary);
}
// wait for all sub tasks to finish
for (int i = 0; i < subTaskNums; i++) {
try {
futures.get(i).get();
subTaskSummary.increase(taskSummaryList.get(i));
} catch (ExecutionException e) {
if (e.getCause() instanceof CompactionLastTimeCheckFailedException) {
throw (CompactionLastTimeCheckFailedException) e.getCause();
}
throw new IOException("[Compaction] SubCompactionTask meet errors ", e);
}
}
}

@Override
public void setTargetFiles(List<TsFileResource> targetFiles) {
this.targetFiles = targetFiles;
}

@Override
public void setSummary(CompactionTaskSummary summary) {
if (!(summary instanceof FastCompactionTaskSummary)) {
throw new IllegalCompactionTaskSummaryException(
"CompactionTaskSummary for FastCompactionPerformer "
+ "should be FastCompactionTaskSummary");
}
this.subTaskSummary = (FastCompactionTaskSummary) summary;
}

@Override
public void setSourceFiles(List<TsFileResource> seqFiles, List<TsFileResource> unseqFiles) {
this.seqFiles = seqFiles;
this.unseqFiles = unseqFiles;
}
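
/** Throw an InterruptedException if the current thread was interrupted or the task was cancelled. */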
private void checkThreadInterrupted() throws InterruptedException {
if (Thread.interrupted() || subTaskSummary.isCancel()) {
throw new InterruptedException(
String.format("[Compaction] compaction for target files %s aborted", targetFiles));
}
}

public FastCompactionTaskSummary getSubTaskSummary() {
return subTaskSummary;
}

public List<TsFileResource> getUnseqFiles() {
return unseqFiles;
}

public List<TsFileResource> getSeqFiles() {
return seqFiles;
}

public Map<TsFileResource, TsFileSequenceReader> getReaderCacheMap() {
return readerCacheMap;
}

public Map<TsFileResource, List<Modification>> getModificationCache() {
return modificationCache;
}
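
// For unsequence inner-space compaction there is only one source list; it is stored in
// seqFiles so that perform() still iterates over it via MultiTsFileDeviceIterator.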
@Override
public void setSourceFiles(List<TsFileResource> unseqFiles) {
this.seqFiles = unseqFiles;
}
}