| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.iotdb.tsfile.write.chunk; |
| |
| import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor; |
| import org.apache.iotdb.tsfile.encoding.encoder.Encoder; |
| import org.apache.iotdb.tsfile.encoding.encoder.TSEncodingBuilder; |
| import org.apache.iotdb.tsfile.exception.write.PageException; |
| import org.apache.iotdb.tsfile.file.header.PageHeader; |
| import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType; |
| import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; |
| import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; |
| import org.apache.iotdb.tsfile.read.common.block.column.Column; |
| import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn; |
| import org.apache.iotdb.tsfile.utils.Binary; |
| import org.apache.iotdb.tsfile.utils.TsPrimitiveType; |
| import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; |
| import org.apache.iotdb.tsfile.write.schema.VectorMeasurementSchema; |
| import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter; |
| |
| import java.io.IOException; |
| import java.nio.ByteBuffer; |
| import java.nio.charset.StandardCharsets; |
| import java.util.ArrayList; |
| import java.util.List; |
| |
| public class AlignedChunkWriterImpl implements IChunkWriter { |
| |
| private final TimeChunkWriter timeChunkWriter; |
| private final List<ValueChunkWriter> valueChunkWriterList; |
| private int valueIndex; |
| |
| // Used for batch writing |
| private long remainingPointsNumber; |
| |
| // TestOnly |
| public AlignedChunkWriterImpl(VectorMeasurementSchema schema) { |
| timeChunkWriter = |
| new TimeChunkWriter( |
| schema.getMeasurementId(), |
| schema.getCompressor(), |
| schema.getTimeTSEncoding(), |
| schema.getTimeEncoder()); |
| |
| List<String> valueMeasurementIdList = schema.getSubMeasurementsList(); |
| List<TSDataType> valueTSDataTypeList = schema.getSubMeasurementsTSDataTypeList(); |
| List<TSEncoding> valueTSEncodingList = schema.getSubMeasurementsTSEncodingList(); |
| List<Encoder> valueEncoderList = schema.getSubMeasurementsEncoderList(); |
| |
| valueChunkWriterList = new ArrayList<>(valueMeasurementIdList.size()); |
| for (int i = 0; i < valueMeasurementIdList.size(); i++) { |
| valueChunkWriterList.add( |
| new ValueChunkWriter( |
| valueMeasurementIdList.get(i), |
| schema.getCompressor(), |
| valueTSDataTypeList.get(i), |
| valueTSEncodingList.get(i), |
| valueEncoderList.get(i))); |
| } |
| |
| this.valueIndex = 0; |
| this.remainingPointsNumber = timeChunkWriter.getRemainingPointNumberForCurrentPage(); |
| } |
| |
| /** |
| * This is used to rewrite file. The encoding and compression of the time column should be the |
| * same as the source file. |
| * |
| * @param timeSchema time schema |
| * @param valueSchemaList value schema list |
| */ |
| public AlignedChunkWriterImpl( |
| IMeasurementSchema timeSchema, List<IMeasurementSchema> valueSchemaList) { |
| timeChunkWriter = |
| new TimeChunkWriter( |
| timeSchema.getMeasurementId(), |
| timeSchema.getCompressor(), |
| timeSchema.getEncodingType(), |
| timeSchema.getTimeEncoder()); |
| |
| valueChunkWriterList = new ArrayList<>(valueSchemaList.size()); |
| for (int i = 0; i < valueSchemaList.size(); i++) { |
| valueChunkWriterList.add( |
| new ValueChunkWriter( |
| valueSchemaList.get(i).getMeasurementId(), |
| valueSchemaList.get(i).getCompressor(), |
| valueSchemaList.get(i).getType(), |
| valueSchemaList.get(i).getEncodingType(), |
| valueSchemaList.get(i).getValueEncoder())); |
| } |
| |
| this.valueIndex = 0; |
| this.remainingPointsNumber = timeChunkWriter.getRemainingPointNumberForCurrentPage(); |
| } |
| |
| /** |
| * This is used to write 0-level file. The compression of the time column is 'LZ4' in the |
| * configuration by default. The encoding of the time column is 'TS_2DIFF' in the configuration by |
| * default. |
| * |
| * @param schemaList value schema list |
| */ |
| public AlignedChunkWriterImpl(List<IMeasurementSchema> schemaList) { |
| TSEncoding timeEncoding = |
| TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getTimeEncoder()); |
| TSDataType timeType = TSFileDescriptor.getInstance().getConfig().getTimeSeriesDataType(); |
| CompressionType timeCompression = TSFileDescriptor.getInstance().getConfig().getCompressor(); |
| timeChunkWriter = |
| new TimeChunkWriter( |
| "", |
| timeCompression, |
| timeEncoding, |
| TSEncodingBuilder.getEncodingBuilder(timeEncoding).getEncoder(timeType)); |
| |
| valueChunkWriterList = new ArrayList<>(schemaList.size()); |
| for (int i = 0; i < schemaList.size(); i++) { |
| valueChunkWriterList.add( |
| new ValueChunkWriter( |
| schemaList.get(i).getMeasurementId(), |
| schemaList.get(i).getCompressor(), |
| schemaList.get(i).getType(), |
| schemaList.get(i).getEncodingType(), |
| schemaList.get(i).getValueEncoder())); |
| } |
| |
| this.valueIndex = 0; |
| |
| this.remainingPointsNumber = timeChunkWriter.getRemainingPointNumberForCurrentPage(); |
| } |
| |
| public void write(long time, int value, boolean isNull) { |
| valueChunkWriterList.get(valueIndex++).write(time, value, isNull); |
| } |
| |
| public void write(long time, long value, boolean isNull) { |
| valueChunkWriterList.get(valueIndex++).write(time, value, isNull); |
| } |
| |
| public void write(long time, boolean value, boolean isNull) { |
| valueChunkWriterList.get(valueIndex++).write(time, value, isNull); |
| } |
| |
| public void write(long time, float value, boolean isNull) { |
| valueChunkWriterList.get(valueIndex++).write(time, value, isNull); |
| } |
| |
| public void write(long time, double value, boolean isNull) { |
| valueChunkWriterList.get(valueIndex++).write(time, value, isNull); |
| } |
| |
| public void write(long time, Binary value, boolean isNull) { |
| valueChunkWriterList.get(valueIndex++).write(time, value, isNull); |
| } |
| |
| public void write(long time, int value, boolean isNull, int valueIndex) { |
| valueChunkWriterList.get(valueIndex).write(time, value, isNull); |
| } |
| |
| public void write(long time, long value, boolean isNull, int valueIndex) { |
| valueChunkWriterList.get(valueIndex).write(time, value, isNull); |
| } |
| |
| public void write(long time, boolean value, boolean isNull, int valueIndex) { |
| valueChunkWriterList.get(valueIndex).write(time, value, isNull); |
| } |
| |
| public void write(long time, float value, boolean isNull, int valueIndex) { |
| valueChunkWriterList.get(valueIndex).write(time, value, isNull); |
| } |
| |
| public void write(long time, double value, boolean isNull, int valueIndex) { |
| valueChunkWriterList.get(valueIndex).write(time, value, isNull); |
| } |
| |
| public void write(long time, Binary value, boolean isNull, int valueIndex) { |
| valueChunkWriterList.get(valueIndex).write(time, value, isNull); |
| } |
| |
| public void write(long time, TsPrimitiveType[] points) { |
| valueIndex = 0; |
| for (TsPrimitiveType point : points) { |
| ValueChunkWriter writer = valueChunkWriterList.get(valueIndex++); |
| switch (writer.getDataType()) { |
| case INT64: |
| writer.write(time, point != null ? point.getLong() : Long.MAX_VALUE, point == null); |
| break; |
| case INT32: |
| writer.write(time, point != null ? point.getInt() : Integer.MAX_VALUE, point == null); |
| break; |
| case FLOAT: |
| writer.write(time, point != null ? point.getFloat() : Float.MAX_VALUE, point == null); |
| break; |
| case DOUBLE: |
| writer.write(time, point != null ? point.getDouble() : Double.MAX_VALUE, point == null); |
| break; |
| case BOOLEAN: |
| writer.write(time, point != null ? point.getBoolean() : false, point == null); |
| break; |
| case TEXT: |
| writer.write( |
| time, |
| point != null ? point.getBinary() : new Binary("".getBytes(StandardCharsets.UTF_8)), |
| point == null); |
| break; |
| } |
| } |
| write(time); |
| } |
| |
| public void write(long time) { |
| valueIndex = 0; |
| timeChunkWriter.write(time); |
| if (checkPageSizeAndMayOpenANewPage()) { |
| writePageToPageBuffer(); |
| } |
| } |
| |
| public void writeTime(long time) { |
| timeChunkWriter.write(time); |
| } |
| |
| public void write(TimeColumn timeColumn, Column[] valueColumns, int batchSize) { |
| if (remainingPointsNumber < batchSize) { |
| int pointsHasWritten = (int) remainingPointsNumber; |
| batchWrite(timeColumn, valueColumns, pointsHasWritten, 0); |
| batchWrite(timeColumn, valueColumns, batchSize - pointsHasWritten, pointsHasWritten); |
| } else { |
| batchWrite(timeColumn, valueColumns, batchSize, 0); |
| } |
| } |
| |
| private void batchWrite( |
| TimeColumn timeColumn, Column[] valueColumns, int batchSize, int arrayOffset) { |
| valueIndex = 0; |
| long[] times = timeColumn.getTimes(); |
| |
| for (Column column : valueColumns) { |
| ValueChunkWriter chunkWriter = valueChunkWriterList.get(valueIndex++); |
| TSDataType tsDataType = chunkWriter.getDataType(); |
| switch (tsDataType) { |
| case TEXT: |
| chunkWriter.write(times, column.getBinaries(), column.isNull(), batchSize, arrayOffset); |
| break; |
| case DOUBLE: |
| chunkWriter.write(times, column.getDoubles(), column.isNull(), batchSize, arrayOffset); |
| break; |
| case BOOLEAN: |
| chunkWriter.write(times, column.getBooleans(), column.isNull(), batchSize, arrayOffset); |
| break; |
| case INT64: |
| chunkWriter.write(times, column.getLongs(), column.isNull(), batchSize, arrayOffset); |
| break; |
| case INT32: |
| chunkWriter.write(times, column.getInts(), column.isNull(), batchSize, arrayOffset); |
| break; |
| case FLOAT: |
| chunkWriter.write(times, column.getFloats(), column.isNull(), batchSize, arrayOffset); |
| break; |
| default: |
| throw new UnsupportedOperationException("Unknown data type " + tsDataType); |
| } |
| } |
| |
| write(times, batchSize, arrayOffset); |
| } |
| |
| public void write(long[] time, int batchSize, int arrayOffset) { |
| valueIndex = 0; |
| timeChunkWriter.write(time, batchSize, arrayOffset); |
| if (checkPageSizeAndMayOpenANewPage()) { |
| writePageToPageBuffer(); |
| } |
| |
| remainingPointsNumber = timeChunkWriter.getRemainingPointNumberForCurrentPage(); |
| } |
| |
| public void writeByColumn(long time, int value, boolean isNull) { |
| valueChunkWriterList.get(valueIndex).write(time, value, isNull); |
| } |
| |
| public void writeByColumn(long time, long value, boolean isNull) { |
| valueChunkWriterList.get(valueIndex).write(time, value, isNull); |
| } |
| |
| public void writeByColumn(long time, boolean value, boolean isNull) { |
| valueChunkWriterList.get(valueIndex).write(time, value, isNull); |
| } |
| |
| public void writeByColumn(long time, float value, boolean isNull) { |
| valueChunkWriterList.get(valueIndex).write(time, value, isNull); |
| } |
| |
| public void writeByColumn(long time, double value, boolean isNull) { |
| valueChunkWriterList.get(valueIndex).write(time, value, isNull); |
| } |
| |
| public void writeByColumn(long time, Binary value, boolean isNull) { |
| valueChunkWriterList.get(valueIndex).write(time, value, isNull); |
| } |
| |
| public void nextColumn() { |
| valueIndex++; |
| } |
| |
| /** |
| * check occupied memory size, if it exceeds the PageSize threshold, construct a page and put it |
| * to pageBuffer |
| */ |
| private boolean checkPageSizeAndMayOpenANewPage() { |
| if (timeChunkWriter.checkPageSizeAndMayOpenANewPage()) { |
| return true; |
| } |
| for (ValueChunkWriter writer : valueChunkWriterList) { |
| if (writer.checkPageSizeAndMayOpenANewPage()) { |
| return true; |
| } |
| } |
| return false; |
| } |
| |
| private void writePageToPageBuffer() { |
| timeChunkWriter.writePageToPageBuffer(); |
| for (ValueChunkWriter valueChunkWriter : valueChunkWriterList) { |
| valueChunkWriter.writePageToPageBuffer(); |
| } |
| } |
| |
| public void writePageHeaderAndDataIntoTimeBuff(ByteBuffer data, PageHeader header) |
| throws PageException { |
| timeChunkWriter.writePageHeaderAndDataIntoBuff(data, header); |
| } |
| |
| public void writePageHeaderAndDataIntoValueBuff( |
| ByteBuffer data, PageHeader header, int valueIndex) throws PageException { |
| valueChunkWriterList.get(valueIndex).writePageHeaderAndDataIntoBuff(data, header); |
| } |
| |
| @Override |
| public void writeToFileWriter(TsFileIOWriter tsfileWriter) throws IOException { |
| timeChunkWriter.writeToFileWriter(tsfileWriter); |
| for (ValueChunkWriter valueChunkWriter : valueChunkWriterList) { |
| valueChunkWriter.writeToFileWriter(tsfileWriter); |
| } |
| } |
| |
| @Override |
| public long estimateMaxSeriesMemSize() { |
| long estimateMaxSeriesMemSize = timeChunkWriter.estimateMaxSeriesMemSize(); |
| for (ValueChunkWriter valueChunkWriter : valueChunkWriterList) { |
| estimateMaxSeriesMemSize += valueChunkWriter.estimateMaxSeriesMemSize(); |
| } |
| return estimateMaxSeriesMemSize; |
| } |
| |
| public long getSerializedChunkSize() { |
| long currentChunkSize = timeChunkWriter.getCurrentChunkSize(); |
| for (ValueChunkWriter valueChunkWriter : valueChunkWriterList) { |
| currentChunkSize += valueChunkWriter.getCurrentChunkSize(); |
| } |
| return currentChunkSize; |
| } |
| |
| @Override |
| public void sealCurrentPage() { |
| timeChunkWriter.sealCurrentPage(); |
| for (ValueChunkWriter valueChunkWriter : valueChunkWriterList) { |
| valueChunkWriter.sealCurrentPage(); |
| } |
| } |
| |
| public void sealCurrentTimePage() { |
| timeChunkWriter.sealCurrentPage(); |
| } |
| |
| public void sealCurrentValuePage(int valueIndex) { |
| valueChunkWriterList.get(valueIndex).sealCurrentPage(); |
| } |
| |
| @Override |
| public void clearPageWriter() { |
| timeChunkWriter.clearPageWriter(); |
| for (ValueChunkWriter valueChunkWriter : valueChunkWriterList) { |
| valueChunkWriter.clearPageWriter(); |
| } |
| } |
| |
| @Override |
| public boolean checkIsChunkSizeOverThreshold( |
| long size, long pointNum, boolean returnTrueIfChunkEmpty) { |
| if ((returnTrueIfChunkEmpty && timeChunkWriter.getPointNum() == 0) |
| || (timeChunkWriter.getPointNum() >= pointNum |
| || timeChunkWriter.estimateMaxSeriesMemSize() >= size)) { |
| return true; |
| } |
| for (ValueChunkWriter valueChunkWriter : valueChunkWriterList) { |
| if (valueChunkWriter.estimateMaxSeriesMemSize() >= size) { |
| return true; |
| } |
| } |
| return false; |
| } |
| |
| @Override |
| public boolean isEmpty() { |
| return timeChunkWriter.getPointNum() + timeChunkWriter.getPageWriter().getPointNumber() == 0; |
| } |
| |
| @Override |
| public boolean checkIsUnsealedPageOverThreshold( |
| long size, long pointNum, boolean returnTrueIfPageEmpty) { |
| if ((returnTrueIfPageEmpty && timeChunkWriter.getPageWriter().getPointNumber() == 0) |
| || timeChunkWriter.checkIsUnsealedPageOverThreshold(size, pointNum)) { |
| return true; |
| } |
| for (ValueChunkWriter valueChunkWriter : valueChunkWriterList) { |
| if (valueChunkWriter.checkIsUnsealedPageOverThreshold(size)) { |
| return true; |
| } |
| } |
| return false; |
| } |
| |
| public ValueChunkWriter getValueChunkWriterByIndex(int valueIndex) { |
| return valueChunkWriterList.get(valueIndex); |
| } |
| |
| /** Test only */ |
| public TimeChunkWriter getTimeChunkWriter() { |
| return timeChunkWriter; |
| } |
| |
| /** Test only */ |
| public List<ValueChunkWriter> getValueChunkWriterList() { |
| return valueChunkWriterList; |
| } |
| } |