/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.fast;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.exception.WriteProcessException;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.subtask.FastCompactionTaskSummary;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.ModifiedStatus;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.fast.element.AlignedPageElement;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.fast.element.ChunkMetadataElement;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.fast.element.FileElement;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.fast.element.NonAlignedPageElement;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.fast.element.PageElement;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.reader.PointPriorityReader;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.writer.AbstractCompactionWriter;
import org.apache.iotdb.db.storageengine.dataregion.modification.Modification;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import org.apache.tsfile.exception.write.PageException;
import org.apache.tsfile.file.metadata.AlignedChunkMetadata;
import org.apache.tsfile.file.metadata.ChunkMetadata;
import org.apache.tsfile.file.metadata.IChunkMetadata;
import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.read.TimeValuePair;
import org.apache.tsfile.read.TsFileSequenceReader;
import org.apache.tsfile.read.common.TimeRange;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
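/**
* Base executor for compacting one time series in the fast compaction performer. It merges the
* series across the selected source files by flushing non-overlapping chunks and pages directly
* and by deserializing overlapping or modified ones and rewriting their data points.
*/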
public abstract class SeriesCompactionExecutor {
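/**
* Callback invoked when a page has been fully consumed, used here to check whether the source
* file of the page can be removed from the file list.
*/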
@FunctionalInterface
public interface RemovePage {
void call(PageElement pageElement)
throws WriteProcessException, IOException, IllegalPathException;
}
private final FastCompactionTaskSummary summary;
// Source files sorted by the start time of the current device, from old to new. Notice: if the
// type of timeIndex is FileTimeIndex, the list may contain resources in which the current device
// does not exist.
protected List<FileElement> fileList = new ArrayList<>();
protected final PriorityQueue<ChunkMetadataElement> chunkMetadataQueue;
protected final PriorityQueue<PageElement> pageQueue;
protected AbstractCompactionWriter compactionWriter;
protected int subTaskId;
protected Map<TsFileResource, TsFileSequenceReader> readerCacheMap;
private final Map<TsFileResource, List<Modification>> modificationCacheMap;
private final PointPriorityReader pointPriorityReader;
protected IDeviceID deviceId;
// Pages in this list are checked one by one for real overlap, in order to decide whether to put
// them into the point priority reader for deserialization or to flush them to the chunk writer
// directly. While compacting overlapped pages, new overlapped pages may be added to this list.
@SuppressWarnings("squid:S1068")
private final List<PageElement> candidateOverlappedPages = new ArrayList<>();
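// Start time of the next element in the chunk metadata queue / page queue, or Long.MAX_VALUE if
// the queue is empty. Used to detect whether the element currently being compacted overlaps with
// the next one.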
private long nextChunkStartTime = Long.MAX_VALUE;
private long nextPageStartTime = Long.MAX_VALUE;
protected boolean isAligned;
protected SeriesCompactionExecutor(
AbstractCompactionWriter compactionWriter,
Map<TsFileResource, TsFileSequenceReader> readerCacheMap,
Map<TsFileResource, List<Modification>> modificationCacheMap,
IDeviceID deviceId,
boolean isAligned,
int subTaskId,
FastCompactionTaskSummary summary) {
this.compactionWriter = compactionWriter;
this.subTaskId = subTaskId;
this.deviceId = deviceId;
this.readerCacheMap = readerCacheMap;
this.modificationCacheMap = modificationCacheMap;
this.summary = summary;
pointPriorityReader = new PointPriorityReader(this::checkShouldRemoveFile, isAligned);
this.isAligned = isAligned;
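// Order chunk metadata by start time; when start times are equal, the element with the higher
// priority is polled first.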
chunkMetadataQueue =
new PriorityQueue<>(
(o1, o2) -> {
int timeCompare = Long.compare(o1.startTime, o2.startTime);
return timeCompare != 0 ? timeCompare : Long.compare(o2.priority, o1.priority);
});
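// Pages are ordered the same way: by start time, then by descending priority.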
pageQueue =
new PriorityQueue<>(
(o1, o2) -> {
int timeCompare = Long.compare(o1.getStartTime(), o2.getStartTime());
return timeCompare != 0
? timeCompare
: Long.compare(o2.getPriority(), o1.getPriority());
});
}
public abstract void execute()
throws PageException, IllegalPathException, IOException, WriteProcessException;
protected abstract void compactFiles()
throws PageException, IOException, WriteProcessException, IllegalPathException;
/** Compact chunks in chunk metadata queue. */
protected void compactChunks()
throws IOException, PageException, WriteProcessException, IllegalPathException {
while (!chunkMetadataQueue.isEmpty()) {
ChunkMetadataElement firstChunkMetadataElement = chunkMetadataQueue.poll();
nextChunkStartTime =
chunkMetadataQueue.isEmpty() ? Long.MAX_VALUE : chunkMetadataQueue.peek().startTime;
boolean isChunkOverlap =
firstChunkMetadataElement.chunkMetadata.getEndTime() >= nextChunkStartTime;
boolean isModified = firstChunkMetadataElement.chunkMetadata.isModified();
// read current chunk
readChunk(firstChunkMetadataElement);
boolean forceDecodingChunk = firstChunkMetadataElement.needForceDecoding;
if (isChunkOverlap || isModified || forceDecodingChunk) {
// the chunk overlaps with other chunks, is modified, or needs force decoding, so deserialize it
summary.chunkOverlapOrModified++;
compactWithOverlapChunks(firstChunkMetadataElement);
} else {
// the chunk neither overlaps nor is modified, so flush it to the file writer directly
summary.chunkNoneOverlap += 1;
compactWithNonOverlapChunk(firstChunkMetadataElement);
}
}
}
/**
* Deserialize the chunk and start compacting pages. This compacts a series of chunks that overlap
* with each other transitively. E.g.: the parameter is chunk 1, which only overlaps with chunk 2,
* while chunk 2 overlaps with chunk 3, chunk 3 overlaps with chunk 4, and so on, for 10 chunks in
* total; this method will merge all 10 chunks.
*/
private void compactWithOverlapChunks(ChunkMetadataElement overlappedChunkMetadata)
throws IOException, PageException, WriteProcessException, IllegalPathException {
deserializeChunkIntoPageQueue(overlappedChunkMetadata);
compactPages();
}
/**
* Flush the chunk to the target file directly. If the end time of the chunk exceeds the end time
* of the file, or the unsealed chunk is too small, deserialize it instead.
*/
private void compactWithNonOverlapChunk(ChunkMetadataElement chunkMetadataElement)
throws IOException, PageException, WriteProcessException, IllegalPathException {
boolean success;
if (isAligned) {
success =
compactionWriter.flushAlignedChunk(
chunkMetadataElement.chunk,
((AlignedChunkMetadata) chunkMetadataElement.chunkMetadata).getTimeChunkMetadata(),
chunkMetadataElement.valueChunks,
((AlignedChunkMetadata) chunkMetadataElement.chunkMetadata)
.getValueChunkMetadataList(),
subTaskId);
} else {
success =
compactionWriter.flushNonAlignedChunk(
chunkMetadataElement.chunk,
(ChunkMetadata) chunkMetadataElement.chunkMetadata,
subTaskId);
}
if (success) {
// the chunk was flushed successfully, so remove it
updateSummary(chunkMetadataElement, ChunkStatus.DIRECTORY_FLUSH);
checkShouldRemoveFile(chunkMetadataElement);
} else {
// unsealed chunk is not large enough or chunk.endTime > file.endTime, then deserialize chunk
summary.chunkNoneOverlapButDeserialize += 1;
deserializeChunkIntoPageQueue(chunkMetadataElement);
compactPages();
}
}
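/** Deserialize the chunk into pages and put them into the page queue. */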
abstract void deserializeChunkIntoPageQueue(ChunkMetadataElement chunkMetadataElement)
throws IOException;
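/** Read the chunk data of the element from the source file into memory. */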
abstract void readChunk(ChunkMetadataElement chunkMetadataElement) throws IOException;
/**
* Deserialize the given files into chunk metadata and put them into the chunk metadata queue.
*
* @throws IOException if io errors occurred
* @throws IllegalPathException if file path is illegal
*/
abstract void deserializeFileIntoChunkMetadataQueue(List<FileElement> fileElements)
throws IOException, IllegalPathException;
/** Compact pages in page queue. */
private void compactPages()
throws IOException, PageException, WriteProcessException, IllegalPathException {
while (!pageQueue.isEmpty()) {
PageElement firstPageElement = getPageFromPageQueue(pageQueue.peek().getStartTime());
ModifiedStatus modifiedStatus = isPageModified(firstPageElement);
if (modifiedStatus == ModifiedStatus.ALL_DELETED) {
// all data on this page has been deleted, remove it
checkShouldRemoveFile(firstPageElement);
continue;
}
boolean isPageOverlap =
firstPageElement.getEndTime() >= nextPageStartTime
|| firstPageElement.getEndTime() >= nextChunkStartTime;
if (isPageOverlap
|| modifiedStatus == ModifiedStatus.PARTIAL_DELETED
|| firstPageElement.needForceDecoding()) {
// the page overlaps with other pages, is modified, or needs force decoding, so deserialize it
summary.pageOverlapOrModified += 1;
if (pointPriorityReader.addNewPageIfPageNotEmpty(firstPageElement)) {
compactWithOverlapPages();
}
} else {
// the page neither overlaps nor is modified, so flush it to the chunk writer directly
summary.pageNoneOverlap += 1;
compactWithNonOverlapPage(firstPageElement);
}
}
}
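/**
* Flush the page to the chunk writer directly. If the writer refuses it (the unsealed page is not
* large enough, or the page end time exceeds the file end time), deserialize the page and write
* its data points one by one instead.
*/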
private void compactWithNonOverlapPage(PageElement pageElement)
throws PageException, IOException, WriteProcessException, IllegalPathException {
boolean success;
if (isAligned) {
AlignedPageElement alignedPageElement = (AlignedPageElement) pageElement;
success =
compactionWriter.flushAlignedPage(
alignedPageElement.getTimePageData(),
alignedPageElement.getTimePageHeader(),
alignedPageElement.getValuePageDataList(),
alignedPageElement.getValuePageHeaders(),
subTaskId);
} else {
NonAlignedPageElement nonAlignedPageElement = (NonAlignedPageElement) pageElement;
success =
compactionWriter.flushNonAlignedPage(
nonAlignedPageElement.getPageData(),
nonAlignedPageElement.getPageHeader(),
subTaskId);
}
if (success) {
// the page was flushed successfully, so remove it
checkShouldRemoveFile(pageElement);
} else {
// unsealed page is not large enough or page.endTime > file.endTime, then deserialize it
summary.pageNoneOverlapButDeserialize += 1;
if (!pointPriorityReader.addNewPageIfPageNotEmpty(pageElement)) {
return;
}
// write data points of the current page into chunk writer
TimeValuePair point;
while (pointPriorityReader.hasNext()) {
point = pointPriorityReader.currentPoint();
if (point.getTimestamp() > pageElement.getEndTime()) {
// finish writing this page
break;
}
compactionWriter.write(point, subTaskId);
pointPriorityReader.next();
}
}
}
/**
* Compact a series of pages that overlap with each other transitively. E.g.: page 1 only overlaps
* with page 2, page 2 overlaps with page 3, page 3 overlaps with page 4, and so on, for 10 pages
* in total; this method will compact all 10 pages. Pages in the candidate overlapped pages list
* are checked one by one for real overlap: if a page really overlaps, it is put into the point
* priority reader and deserialized; if not, the page is located in a gap inside another page and
* can be flushed to the chunk writer directly. New overlapped pages may be added to the list
* while the overlapped pages are being compacted. Notice: a really overlapping page is removed
* from the candidate list after it has been added to the point priority reader and deserialized,
* while a fake-overlapping page is removed after it has been completely flushed to the chunk
* writer.
*/
private void compactWithOverlapPages()
throws IOException, PageException, WriteProcessException, IllegalPathException {
while (pointPriorityReader.hasNext()) {
TimeValuePair currentPoint = pointPriorityReader.currentPoint();
long currentTime = currentPoint.getTimestamp();
while (currentTime >= nextChunkStartTime || currentTime >= nextPageStartTime) {
// current point overlaps with next chunk or next page, then deserialize next chunk if
// necessary and get next page
PageElement nextPageElement = getPageFromPageQueue(currentTime);
// check whether next page is fake overlap or not
checkAndCompactOverlapPage(nextPageElement, currentPoint);
// get new current point
currentPoint = pointPriorityReader.currentPoint();
currentTime = currentPoint.getTimestamp();
}
// write data point into chunk writer
compactionWriter.write(currentPoint, subTaskId);
pointPriorityReader.next();
}
}
/**
* Check whether the page really overlaps or is a fake overlap. If the page is located in a gap of
* another page, it is a fake overlap and can be flushed to the chunk writer directly. Otherwise,
* deserialize the page into the point priority reader.
*/
private void checkAndCompactOverlapPage(PageElement nextPageElement, TimeValuePair currentPoint)
throws IOException, IllegalPathException, PageException, WriteProcessException {
ModifiedStatus nextPageModifiedStatus = isPageModified(nextPageElement);
if (nextPageModifiedStatus == ModifiedStatus.ALL_DELETED) {
// all data on next page has been deleted, remove it
checkShouldRemoveFile(nextPageElement);
} else {
// check whether the next page is a fake overlap (located in a gap) or not
boolean isNextPageOverlap =
currentPoint.getTimestamp() <= nextPageElement.getEndTime()
|| nextPageElement.getEndTime() >= nextPageStartTime
|| nextPageElement.getEndTime() >= nextChunkStartTime;
if (isNextPageOverlap
|| nextPageModifiedStatus == ModifiedStatus.PARTIAL_DELETED
|| nextPageElement.needForceDecoding()) {
// the next page overlaps, is modified, or needs force decoding, so deserialize it
summary.pageOverlapOrModified++;
pointPriorityReader.addNewPageIfPageNotEmpty(nextPageElement);
} else {
// the next page neither overlaps nor is modified, so flush it to the chunk writer directly
summary.pageFakeOverlap += 1;
compactWithNonOverlapPage(nextPageElement);
}
}
}
/**
* Find overlapped files that have not been selected yet. Notice: the returned list must be
* ordered by the start time of the current device in each file, from small to large, so that the
* files can be compacted in order.
*/
protected List<FileElement> findOverlapFiles(FileElement file) {
List<FileElement> overlappedFiles = new ArrayList<>();
long endTime = file.resource.getEndTime(deviceId);
for (FileElement fileElement : fileList) {
if (fileElement.resource.getStartTime(deviceId) <= endTime) {
if (!fileElement.isSelected) {
overlappedFiles.add(fileElement);
fileElement.isSelected = true;
}
} else {
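// fileList is sorted by the start time of the current device, so the remaining files cannot
// overlap either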
break;
}
}
return overlappedFiles;
}
/**
* NONE_DELETED means that no data on this page has been deleted. <br>
* PARTIAL_DELETED means that some data on this page has been deleted. <br>
* ALL_DELETED means that all data on this page has been deleted.
*
* <p>Notice: for an aligned page, return ALL_DELETED if and only if all value pages are deleted,
* and return NONE_DELETED if and only if no data on any value page is deleted.
*/
protected abstract ModifiedStatus isPageModified(PageElement pageElement);
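/**
* Check the deletion status of the time range [startTime, endTime] against the given deletions:
* ALL_DELETED if some deletion covers the whole range, PARTIAL_DELETED if some deletion overlaps
* with part of it, NONE_DELETED otherwise.
*/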
protected ModifiedStatus checkIsModified(
long startTime, long endTime, Collection<TimeRange> deletions) {
ModifiedStatus status = ModifiedStatus.NONE_DELETED;
if (deletions != null) {
for (TimeRange range : deletions) {
if (range.contains(startTime, endTime)) {
// all data on this page or chunk has been deleted
return ModifiedStatus.ALL_DELETED;
}
if (range.overlaps(new TimeRange(startTime, endTime))) {
// some data on this page or chunk has been deleted
status = ModifiedStatus.PARTIAL_DELETED;
}
}
}
return status;
}
/**
* Before getting a page from the page queue, check whether the given time overlaps with the next
* chunk, which has not been read into memory yet. If it does, read the next chunk into memory and
* deserialize it into the page queue first.
*
* @throws IOException if io errors occurred
*/
private PageElement getPageFromPageQueue(long curTime) throws IOException {
if (curTime >= nextChunkStartTime) {
// overlap with next chunk, then read it into memory and deserialize it into page queue
summary.chunkOverlapOrModified++;
ChunkMetadataElement chunkMetadataElement = chunkMetadataQueue.poll();
nextChunkStartTime =
chunkMetadataQueue.isEmpty() ? Long.MAX_VALUE : chunkMetadataQueue.peek().startTime;
readChunk(chunkMetadataElement);
deserializeChunkIntoPageQueue(chunkMetadataElement);
}
PageElement page = pageQueue.poll();
nextPageStartTime = pageQueue.isEmpty() ? Long.MAX_VALUE : pageQueue.peek().getStartTime();
return page;
}
/**
* Check whether the file should be removed. If this is the last page of the chunk and the chunk
* is the last chunk of the file, the file has finished compacting and needs to be removed.
*
* @throws IOException if io errors occurred
* @throws IllegalPathException if file path is illegal
*/
private void checkShouldRemoveFile(PageElement pageElement)
throws IOException, IllegalPathException {
if (pageElement.isLastPage() && pageElement.getChunkMetadataElement().isLastChunk) {
// finish compacting the file, remove it from list
removeFile(pageElement.getChunkMetadataElement().fileElement);
}
}
/**
* Check whether this is the last chunk of the file. If it is, the file has finished compacting
* and needs to be removed.
*
* @throws IOException if io errors occurred
* @throws IllegalPathException if file path is illegal
*/
private void checkShouldRemoveFile(ChunkMetadataElement chunkMetadataElement)
throws IOException, IllegalPathException {
if (chunkMetadataElement.isLastChunk) {
// finish compacting the file, remove it from list
removeFile(chunkMetadataElement.fileElement);
}
}
/**
* Remove the file from the sorted list. If the removed file was the first file, find the files
* that overlap with the new first file in the list, deserialize them into chunk metadata and put
* them into the chunk metadata queue.
*
* @throws IllegalPathException if file path is illegal
* @throws IOException if io errors occurred
*/
protected void removeFile(FileElement fileElement) throws IllegalPathException, IOException {
boolean isFirstFile = fileList.get(0).equals(fileElement);
fileList.remove(fileElement);
if (isFirstFile && !fileList.isEmpty()) {
// find new overlapped files and deserialize them into chunk metadata queue
List<FileElement> newOverlappedFiles = findOverlapFiles(fileList.get(0));
deserializeFileIntoChunkMetadataQueue(newOverlappedFiles);
nextChunkStartTime =
chunkMetadataQueue.isEmpty() ? Long.MAX_VALUE : chunkMetadataQueue.peek().startTime;
}
}
/**
* Get the modifications of a timeseries in the ModificationFile of a TsFile.
*
* @param path name of the time series
*/
protected List<Modification> getModificationsFromCache(
TsFileResource tsFileResource, PartialPath path) {
// copy from TsFileResource so queries are not affected
List<Modification> modifications =
modificationCacheMap.computeIfAbsent(
tsFileResource, resource -> new ArrayList<>(resource.getModFile().getModifications()));
List<Modification> pathModifications = new ArrayList<>();
Iterator<Modification> modificationIterator = modifications.iterator();
while (modificationIterator.hasNext()) {
Modification modification = modificationIterator.next();
if (modification.getPath().matchFullPath(path)) {
pathModifications.add(modification);
}
}
return pathModifications;
}
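/**
* Update the task summary according to how the chunk is handled. For aligned series, the time
* chunk and all of its value chunks are counted together.
*/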
@SuppressWarnings("squid:S3776")
protected void updateSummary(ChunkMetadataElement chunkMetadataElement, ChunkStatus status) {
switch (status) {
case READ_IN:
summary.increaseProcessChunkNum(
isAligned
? ((AlignedChunkMetadata) chunkMetadataElement.chunkMetadata)
.getValueChunkMetadataList()
.size()
+ 1
: 1);
if (isAligned) {
for (IChunkMetadata valueChunkMetadata :
((AlignedChunkMetadata) chunkMetadataElement.chunkMetadata)
.getValueChunkMetadataList()) {
if (valueChunkMetadata == null) {
continue;
}
summary.increaseProcessPointNum(valueChunkMetadata.getStatistics().getCount());
}
} else {
summary.increaseProcessPointNum(
chunkMetadataElement.chunkMetadata.getStatistics().getCount());
}
break;
case DIRECTORY_FLUSH:
if (isAligned) {
summary.increaseDirectlyFlushChunkNum(
((AlignedChunkMetadata) (chunkMetadataElement.chunkMetadata))
.getValueChunkMetadataList()
.size()
+ 1);
} else {
summary.increaseDirectlyFlushChunkNum(1);
}
break;
case DESERIALIZE_CHUNK:
if (isAligned) {
summary.increaseDeserializedChunkNum(
((AlignedChunkMetadata) (chunkMetadataElement.chunkMetadata))
.getValueChunkMetadataList()
.size()
+ 1);
} else {
summary.increaseDeserializedChunkNum(1);
}
break;
default:
break;
}
}
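/** How a chunk is handled during compaction: read into memory, flushed directly, or deserialized. */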
enum ChunkStatus {
READ_IN,
DIRECTORY_FLUSH,
DESERIALIZE_CHUNK
}
}