blob: 1182cfbee0b5515ca57fd75fadf0848ee283ac43 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.writer;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import org.apache.tsfile.exception.write.PageException;
import org.apache.tsfile.file.header.PageHeader;
import org.apache.tsfile.file.metadata.ChunkMetadata;
import org.apache.tsfile.file.metadata.IChunkMetadata;
import org.apache.tsfile.read.common.Chunk;
import org.apache.tsfile.read.common.block.TsBlock;
import org.apache.tsfile.write.chunk.AlignedChunkWriterImpl;
import org.apache.tsfile.write.chunk.ChunkWriterImpl;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
/**
 * Compaction writer used by the fast inner compaction task. Chunks and pages that are already
 * large enough are appended to the target file directly, without being deserialized; smaller or
 * conflicting ones are rejected (the flush methods return {@code false}) so the caller can fall
 * back to deserializing and rewriting them point by point.
 */
public class FastInnerCompactionWriter extends AbstractInnerCompactionWriter {

  public FastInnerCompactionWriter(TsFileResource targetFileResource) throws IOException {
    super(targetFileResource);
  }

  /**
   * Point-by-point writing is not supported by this writer; use the chunk/page level flush
   * methods instead.
   *
   * @throws UnsupportedOperationException always
   */
  @Override
  public void write(TsBlock tsBlock, int subTaskId) throws IOException {
    // UnsupportedOperationException is the idiomatic type for an intentionally unimplemented
    // operation; it subclasses RuntimeException, so existing callers are unaffected.
    throw new UnsupportedOperationException(
        "Does not support this method in FastInnerCompactionWriter");
  }

  /**
   * Flush nonAligned chunk to tsfile directly. Return whether the chunk is flushed to tsfile
   * successfully or not. Return false if there is unsealed chunk or current chunk is not large
   * enough, else return true.
   *
   * @param chunk the compressed chunk to copy into the target file
   * @param chunkMetadata metadata (time range, statistics) of {@code chunk}
   * @param subTaskId index of the compaction sub task performing the flush
   * @return true if the chunk was copied directly, false if the caller must deserialize it
   * @throws IOException if io errors occurred
   */
  @Override
  public boolean flushNonAlignedChunk(Chunk chunk, ChunkMetadata chunkMetadata, int subTaskId)
      throws IOException {
    // Guard against out-of-order data relative to what this sub task already wrote.
    checkPreviousTimestamp(chunkMetadata.getStartTime(), subTaskId);
    if (chunkPointNumArray[subTaskId] != 0
        && chunkWriters[subTaskId].checkIsChunkSizeOverThreshold(
            targetChunkSize, targetChunkPointNum, false)) {
      // if there is unsealed chunk which is large enough, then seal chunk
      sealChunk(fileWriter, chunkWriters[subTaskId], subTaskId);
    }
    if (chunkPointNumArray[subTaskId] != 0 || !checkIsChunkLargeEnough(chunk)) {
      // if there is unsealed chunk or current chunk is not large enough, then deserialize the chunk
      return false;
    }
    flushNonAlignedChunkToFileWriter(fileWriter, chunk, chunkMetadata, subTaskId);
    isEmptyFile = false;
    lastTime[subTaskId] = chunkMetadata.getEndTime();
    return true;
  }

  /**
   * Flush aligned chunk to tsfile directly. Return whether the chunk is flushed to tsfile
   * successfully or not. Return false if there is unsealed chunk or current chunk is not large
   * enough, else return true. Notice: if sub-value measurement is null, then flush empty value
   * chunk.
   *
   * @param timeChunk the compressed time chunk of the aligned series
   * @param timeChunkMetadata metadata of {@code timeChunk}
   * @param valueChunks value chunks of the aligned series; entries may be null
   * @param valueChunkMetadatas metadata matching {@code valueChunks} one-to-one
   * @param subTaskId index of the compaction sub task performing the flush
   * @return true if the chunk group was copied directly, false if the caller must deserialize it
   * @throws IOException if io errors occurred
   */
  @Override
  public boolean flushAlignedChunk(
      Chunk timeChunk,
      IChunkMetadata timeChunkMetadata,
      List<Chunk> valueChunks,
      List<IChunkMetadata> valueChunkMetadatas,
      int subTaskId)
      throws IOException {
    // Guard against out-of-order data relative to what this sub task already wrote.
    checkPreviousTimestamp(timeChunkMetadata.getStartTime(), subTaskId);
    if (chunkPointNumArray[subTaskId] != 0
        && chunkWriters[subTaskId].checkIsChunkSizeOverThreshold(
            targetChunkSize, targetChunkPointNum, false)) {
      // if there is unsealed chunk which is large enough, then seal chunk
      sealChunk(fileWriter, chunkWriters[subTaskId], subTaskId);
    }
    if (chunkPointNumArray[subTaskId] != 0
        || !checkIsAlignedChunkLargeEnough(timeChunk, valueChunks)) {
      // if there is unsealed chunk or current chunk is not large enough, then deserialize the chunk
      return false;
    }
    flushAlignedChunkToFileWriter(
        fileWriter, timeChunk, timeChunkMetadata, valueChunks, valueChunkMetadatas, subTaskId);
    isEmptyFile = false;
    lastTime[subTaskId] = timeChunkMetadata.getEndTime();
    return true;
  }

  /**
   * Flush aligned page to tsfile directly. Return whether the page is flushed to tsfile
   * successfully or not. Return false if the unsealed page is too small or the end time of page
   * exceeds the end time of file, else return true. Notice: if sub-value measurement is null, then
   * flush empty value page.
   *
   * @param compressedTimePageData still-compressed time page payload
   * @param timePageHeader header (time range, statistics) of the time page
   * @param compressedValuePageDatas compressed value page payloads; entries may be null
   * @param valuePageHeaders headers matching {@code compressedValuePageDatas} one-to-one
   * @param subTaskId index of the compaction sub task performing the flush
   * @return true if the page group was copied directly, false if the caller must deserialize it
   * @throws IOException if io errors occurred
   * @throws PageException if errors occurred when write data page header
   */
  public boolean flushAlignedPage(
      ByteBuffer compressedTimePageData,
      PageHeader timePageHeader,
      List<ByteBuffer> compressedValuePageDatas,
      List<PageHeader> valuePageHeaders,
      int subTaskId)
      throws IOException, PageException {
    // Guard against out-of-order data relative to what this sub task already wrote.
    checkPreviousTimestamp(timePageHeader.getStartTime(), subTaskId);
    boolean isUnsealedPageOverThreshold =
        chunkWriters[subTaskId].checkIsUnsealedPageOverThreshold(
            pageSizeLowerBoundInCompaction, pagePointNumLowerBoundInCompaction, true);
    if (!isUnsealedPageOverThreshold
        || !checkIsAlignedPageLargeEnough(timePageHeader, valuePageHeaders)) {
      // there is unsealed page or current page is not large enough , then deserialize the page
      return false;
    }
    flushAlignedPageToChunkWriter(
        (AlignedChunkWriterImpl) chunkWriters[subTaskId],
        compressedTimePageData,
        timePageHeader,
        compressedValuePageDatas,
        valuePageHeaders,
        subTaskId);
    isEmptyFile = false;
    lastTime[subTaskId] = timePageHeader.getEndTime();
    return true;
  }

  /**
   * Flush nonAligned page to tsfile directly. Return whether the page is flushed to tsfile
   * successfully or not. Return false if the unsealed page is too small or the end time of page
   * exceeds the end time of file, else return true.
   *
   * @param compressedPageData still-compressed page payload
   * @param pageHeader header (time range, statistics) of the page
   * @param subTaskId index of the compaction sub task performing the flush
   * @return true if the page was copied directly, false if the caller must deserialize it
   * @throws PageException if errors occurred when write data page header
   */
  public boolean flushNonAlignedPage(
      ByteBuffer compressedPageData, PageHeader pageHeader, int subTaskId) throws PageException {
    // Guard against out-of-order data relative to what this sub task already wrote.
    checkPreviousTimestamp(pageHeader.getStartTime(), subTaskId);
    boolean isUnsealedPageOverThreshold =
        chunkWriters[subTaskId].checkIsUnsealedPageOverThreshold(
            pageSizeLowerBoundInCompaction, pagePointNumLowerBoundInCompaction, true);
    if (!isUnsealedPageOverThreshold || !checkIsPageLargeEnough(pageHeader)) {
      // there is unsealed page or current page is not large enough , then deserialize the page
      return false;
    }
    flushNonAlignedPageToChunkWriter(
        (ChunkWriterImpl) chunkWriters[subTaskId], compressedPageData, pageHeader, subTaskId);
    isEmptyFile = false;
    lastTime[subTaskId] = pageHeader.getEndTime();
    return true;
  }

  /**
   * Returns true if the time chunk or any non-null value chunk of the aligned series meets the
   * size/point-count threshold for direct flushing.
   */
  private boolean checkIsAlignedChunkLargeEnough(Chunk timeChunk, List<Chunk> valueChunks) {
    if (checkIsChunkLargeEnough(timeChunk)) {
      return true;
    }
    for (Chunk valueChunk : valueChunks) {
      if (valueChunk == null) {
        // null means an empty value column for this measurement; nothing to measure
        continue;
      }
      if (checkIsChunkLargeEnough(valueChunk)) {
        return true;
      }
    }
    return false;
  }

  /** Returns true if the chunk reaches either the target point count or the target byte size. */
  private boolean checkIsChunkLargeEnough(Chunk chunk) {
    return chunk.getChunkStatistic().getCount() >= targetChunkPointNum
        || getChunkSize(chunk) >= targetChunkSize;
  }

  /**
   * Returns true if the time page or any non-null value page of the aligned series meets the
   * size/point-count lower bound for direct flushing.
   */
  private boolean checkIsAlignedPageLargeEnough(
      PageHeader timePageHeader, List<PageHeader> valuePageHeaders) {
    if (checkIsPageLargeEnough(timePageHeader)) {
      return true;
    }
    for (PageHeader valuePageHeader : valuePageHeaders) {
      if (valuePageHeader == null) {
        // null means an empty value column for this measurement; nothing to measure
        continue;
      }
      if (checkIsPageLargeEnough(valuePageHeader)) {
        return true;
      }
    }
    return false;
  }

  /** Returns true if the page reaches either the point-count or the byte-size lower bound. */
  private boolean checkIsPageLargeEnough(PageHeader pageHeader) {
    return pageHeader.getStatistics().getCount() >= pagePointNumLowerBoundInCompaction
        || pageHeader.getSerializedPageSize() >= pageSizeLowerBoundInCompaction;
  }
}