| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.writer; |
| |
| import org.apache.iotdb.commons.conf.IoTDBConstant; |
| import org.apache.iotdb.db.conf.IoTDBDescriptor; |
| import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionLastTimeCheckFailedException; |
| import org.apache.iotdb.db.storageengine.dataregion.compaction.io.CompactionTsFileWriter; |
| |
| import org.apache.tsfile.common.constant.TsFileConstant; |
| import org.apache.tsfile.exception.write.PageException; |
| import org.apache.tsfile.file.header.PageHeader; |
| import org.apache.tsfile.file.metadata.ChunkMetadata; |
| import org.apache.tsfile.file.metadata.IChunkMetadata; |
| import org.apache.tsfile.file.metadata.IDeviceID; |
| import org.apache.tsfile.file.metadata.PlainDeviceID; |
| import org.apache.tsfile.file.metadata.statistics.Statistics; |
| import org.apache.tsfile.read.TimeValuePair; |
| import org.apache.tsfile.read.common.Chunk; |
| import org.apache.tsfile.read.common.block.TsBlock; |
| import org.apache.tsfile.utils.TsPrimitiveType; |
| import org.apache.tsfile.write.chunk.AlignedChunkWriterImpl; |
| import org.apache.tsfile.write.chunk.ChunkWriterImpl; |
| import org.apache.tsfile.write.chunk.IChunkWriter; |
| import org.apache.tsfile.write.chunk.ValueChunkWriter; |
| import org.apache.tsfile.write.schema.IMeasurementSchema; |
| |
| import java.io.IOException; |
| import java.nio.ByteBuffer; |
| import java.util.List; |
| |
| public abstract class AbstractCompactionWriter implements AutoCloseable { |
| protected int subTaskNum = IoTDBDescriptor.getInstance().getConfig().getSubCompactionTaskNum(); |
| |
| // check if there is unseq error point during writing |
| protected long[] lastTime = new long[subTaskNum]; |
| |
| // Each sub task has its own chunk writer. |
| // The index of the array corresponds to subTaskId. |
| protected IChunkWriter[] chunkWriters = new IChunkWriter[subTaskNum]; |
| |
| // Each sub task has point count in current measurment, which is used to check size. |
| // The index of the array corresponds to subTaskId. |
| protected int[] chunkPointNumArray = new int[subTaskNum]; |
| |
| // used to control the target chunk size |
| protected long targetChunkSize = IoTDBDescriptor.getInstance().getConfig().getTargetChunkSize(); |
| |
| // used to control the point num of target chunk |
| protected long targetChunkPointNum = |
| IoTDBDescriptor.getInstance().getConfig().getTargetChunkPointNum(); |
| |
| // When num of points writing into target files reaches check point, then check chunk size |
| @SuppressWarnings("squid:S1170") |
| private final long checkPoint = (targetChunkPointNum >= 10 ? targetChunkPointNum : 10) / 10; |
| |
| private long lastCheckIndex = 0; |
| |
| // if unsealed chunk size is lower then this, then deserialize next chunk no matter it is |
| // overlapped or not |
| protected long chunkSizeLowerBoundInCompaction = |
| IoTDBDescriptor.getInstance().getConfig().getChunkSizeLowerBoundInCompaction(); |
| |
| // if point num of unsealed chunk is lower then this, then deserialize next chunk no matter it is |
| // overlapped or not |
| protected long chunkPointNumLowerBoundInCompaction = |
| IoTDBDescriptor.getInstance().getConfig().getChunkPointNumLowerBoundInCompaction(); |
| |
| // if unsealed page size is lower then this, then deserialize next page no matter it is |
| // overlapped or not |
| protected long pageSizeLowerBoundInCompaction = chunkSizeLowerBoundInCompaction / 10; |
| |
| // if point num of unsealed page is lower then this, then deserialize next page no matter it is |
| // overlapped or not |
| protected long pagePointNumLowerBoundInCompaction = chunkPointNumLowerBoundInCompaction / 10; |
| |
| protected boolean isAlign; |
| |
| protected IDeviceID deviceId; |
| |
| protected String[] measurementId = new String[subTaskNum]; |
| |
| public abstract void startChunkGroup(IDeviceID deviceId, boolean isAlign) throws IOException; |
| |
| public abstract void endChunkGroup() throws IOException; |
| |
| public void startMeasurement(List<IMeasurementSchema> measurementSchemaList, int subTaskId) { |
| lastCheckIndex = 0; |
| lastTime[subTaskId] = Long.MIN_VALUE; |
| if (isAlign) { |
| // the first is time metadata and the rest is value metadata list |
| chunkWriters[subTaskId] = |
| new AlignedChunkWriterImpl(measurementSchemaList.remove(0), measurementSchemaList); |
| measurementId[subTaskId] = TsFileConstant.TIME_COLUMN_ID; |
| } else { |
| chunkWriters[subTaskId] = new ChunkWriterImpl(measurementSchemaList.get(0), true); |
| measurementId[subTaskId] = measurementSchemaList.get(0).getMeasurementId(); |
| } |
| } |
| |
| public abstract void endMeasurement(int subTaskId) throws IOException; |
| |
| public abstract void write(TimeValuePair timeValuePair, int subTaskId) throws IOException; |
| |
| public abstract void write(TsBlock tsBlock, int subTaskId) throws IOException; |
| |
| public abstract void endFile() throws IOException; |
| |
| public abstract long getWriterSize() throws IOException; |
| |
| /** |
| * Update startTime and endTime of the current device in each target resources, and check whether |
| * to flush chunk metadatas or not. |
| * |
| * @throws IOException if io errors occurred |
| */ |
| public abstract void checkAndMayFlushChunkMetadata() throws IOException; |
| |
| protected void writeDataPoint(long timestamp, TsPrimitiveType value, IChunkWriter chunkWriter) { |
| if (chunkWriter instanceof ChunkWriterImpl) { |
| ChunkWriterImpl chunkWriterImpl = (ChunkWriterImpl) chunkWriter; |
| switch (chunkWriterImpl.getDataType()) { |
| case TEXT: |
| chunkWriterImpl.write(timestamp, value.getBinary()); |
| break; |
| case DOUBLE: |
| chunkWriterImpl.write(timestamp, value.getDouble()); |
| break; |
| case BOOLEAN: |
| chunkWriterImpl.write(timestamp, value.getBoolean()); |
| break; |
| case INT64: |
| chunkWriterImpl.write(timestamp, value.getLong()); |
| break; |
| case INT32: |
| chunkWriterImpl.write(timestamp, value.getInt()); |
| break; |
| case FLOAT: |
| chunkWriterImpl.write(timestamp, value.getFloat()); |
| break; |
| default: |
| throw new UnsupportedOperationException( |
| "Unknown data type " + chunkWriterImpl.getDataType()); |
| } |
| } else { |
| AlignedChunkWriterImpl alignedChunkWriter = (AlignedChunkWriterImpl) chunkWriter; |
| alignedChunkWriter.write(timestamp, value.getVector()); |
| } |
| } |
| |
| @SuppressWarnings("squid:S2445") |
| protected void sealChunk( |
| CompactionTsFileWriter targetWriter, IChunkWriter chunkWriter, int subTaskId) |
| throws IOException { |
| synchronized (targetWriter) { |
| targetWriter.writeChunk(chunkWriter); |
| } |
| chunkPointNumArray[subTaskId] = 0; |
| } |
| |
| public abstract boolean flushNonAlignedChunk( |
| Chunk chunk, ChunkMetadata chunkMetadata, int subTaskId) throws IOException; |
| |
| public abstract boolean flushAlignedChunk( |
| Chunk timeChunk, |
| IChunkMetadata timeChunkMetadata, |
| List<Chunk> valueChunks, |
| List<IChunkMetadata> valueChunkMetadatas, |
| int subTaskId) |
| throws IOException; |
| |
| @SuppressWarnings("squid:S2445") |
| protected void flushNonAlignedChunkToFileWriter( |
| CompactionTsFileWriter targetWriter, Chunk chunk, ChunkMetadata chunkMetadata, int subTaskId) |
| throws IOException { |
| synchronized (targetWriter) { |
| // seal last chunk to file writer |
| targetWriter.writeChunk(chunkWriters[subTaskId]); |
| chunkPointNumArray[subTaskId] = 0; |
| targetWriter.writeChunk(chunk, chunkMetadata); |
| } |
| } |
| |
| @SuppressWarnings("squid:S2445") |
| protected void flushAlignedChunkToFileWriter( |
| CompactionTsFileWriter targetWriter, |
| Chunk timeChunk, |
| IChunkMetadata timeChunkMetadata, |
| List<Chunk> valueChunks, |
| List<IChunkMetadata> valueChunkMetadatas, |
| int subTaskId) |
| throws IOException { |
| synchronized (targetWriter) { |
| AlignedChunkWriterImpl alignedChunkWriter = (AlignedChunkWriterImpl) chunkWriters[subTaskId]; |
| // seal last chunk to file writer |
| targetWriter.writeChunk(alignedChunkWriter); |
| chunkPointNumArray[subTaskId] = 0; |
| |
| targetWriter.markStartingWritingAligned(); |
| |
| // flush time chunk |
| targetWriter.writeChunk(timeChunk, (ChunkMetadata) timeChunkMetadata); |
| |
| // flush value chunks |
| for (int i = 0; i < valueChunks.size(); i++) { |
| Chunk valueChunk = valueChunks.get(i); |
| if (valueChunk == null) { |
| // sub sensor does not exist in current file or value chunk has been deleted completely |
| ValueChunkWriter valueChunkWriter = alignedChunkWriter.getValueChunkWriterByIndex(i); |
| targetWriter.writeEmptyValueChunk( |
| valueChunkWriter.getMeasurementId(), |
| valueChunkWriter.getCompressionType(), |
| valueChunkWriter.getDataType(), |
| valueChunkWriter.getEncodingType(), |
| Statistics.getStatsByType(valueChunkWriter.getDataType())); |
| continue; |
| } |
| targetWriter.writeChunk(valueChunk, (ChunkMetadata) valueChunkMetadatas.get(i)); |
| } |
| |
| targetWriter.markEndingWritingAligned(); |
| } |
| } |
| |
| public abstract boolean flushNonAlignedPage( |
| ByteBuffer compressedPageData, PageHeader pageHeader, int subTaskId) |
| throws IOException, PageException; |
| |
| protected void flushNonAlignedPageToChunkWriter( |
| ChunkWriterImpl chunkWriter, |
| ByteBuffer compressedPageData, |
| PageHeader pageHeader, |
| int subTaskId) |
| throws PageException { |
| // seal current page |
| chunkWriter.sealCurrentPage(); |
| // flush new page to chunk writer directly |
| chunkWriter.writePageHeaderAndDataIntoBuff(compressedPageData, pageHeader); |
| |
| chunkPointNumArray[subTaskId] += pageHeader.getStatistics().getCount(); |
| } |
| |
| public abstract boolean flushAlignedPage( |
| ByteBuffer compressedTimePageData, |
| PageHeader timePageHeader, |
| List<ByteBuffer> compressedValuePageDatas, |
| List<PageHeader> valuePageHeaders, |
| int subTaskId) |
| throws IOException, PageException; |
| |
| protected void flushAlignedPageToChunkWriter( |
| AlignedChunkWriterImpl alignedChunkWriter, |
| ByteBuffer compressedTimePageData, |
| PageHeader timePageHeader, |
| List<ByteBuffer> compressedValuePageDatas, |
| List<PageHeader> valuePageHeaders, |
| int subTaskId) |
| throws IOException, PageException { |
| // seal current page |
| alignedChunkWriter.sealCurrentPage(); |
| // flush new time page to chunk writer directly |
| alignedChunkWriter.writePageHeaderAndDataIntoTimeBuff(compressedTimePageData, timePageHeader); |
| |
| // flush new value pages to chunk writer directly |
| for (int i = 0; i < valuePageHeaders.size(); i++) { |
| if (valuePageHeaders.get(i) == null) { |
| // sub sensor does not exist in current file or value page has been deleted completely |
| alignedChunkWriter.getValueChunkWriterByIndex(i).writeEmptyPageToPageBuffer(); |
| continue; |
| } |
| alignedChunkWriter.writePageHeaderAndDataIntoValueBuff( |
| compressedValuePageDatas.get(i), valuePageHeaders.get(i), i); |
| } |
| |
| chunkPointNumArray[subTaskId] += timePageHeader.getStatistics().getCount(); |
| } |
| |
| protected void checkChunkSizeAndMayOpenANewChunk( |
| CompactionTsFileWriter fileWriter, IChunkWriter chunkWriter, int subTaskId) |
| throws IOException { |
| if (chunkPointNumArray[subTaskId] >= (lastCheckIndex + 1) * checkPoint) { |
| // if chunk point num reaches the check point, then check if the chunk size over threshold |
| lastCheckIndex = chunkPointNumArray[subTaskId] / checkPoint; |
| if (chunkWriter.checkIsChunkSizeOverThreshold(targetChunkSize, targetChunkPointNum, false)) { |
| sealChunk(fileWriter, chunkWriter, subTaskId); |
| lastCheckIndex = 0; |
| } |
| } |
| } |
| |
| protected long getChunkSize(Chunk chunk) { |
| return (long) chunk.getHeader().getSerializedSize() + chunk.getHeader().getDataSize(); |
| } |
| |
| protected void checkPreviousTimestamp(long currentWritingTimestamp, int subTaskId) { |
| if (currentWritingTimestamp <= lastTime[subTaskId]) { |
| throw new CompactionLastTimeCheckFailedException( |
| ((PlainDeviceID) deviceId).toStringID() |
| + IoTDBConstant.PATH_SEPARATOR |
| + measurementId[subTaskId], |
| currentWritingTimestamp, |
| lastTime[subTaskId]); |
| } |
| } |
| } |