/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.tsfile.write.chunk;

import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
import org.apache.iotdb.tsfile.compress.ICompressor;
import org.apache.iotdb.tsfile.encoding.encoder.Encoder;
import org.apache.iotdb.tsfile.exception.write.PageException;
import org.apache.iotdb.tsfile.file.header.ChunkHeader;
import org.apache.iotdb.tsfile.file.header.PageHeader;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.utils.PublicBAOS;
import org.apache.iotdb.tsfile.utils.ReadWriteForEncodingUtils;
import org.apache.iotdb.tsfile.write.page.ValuePageWriter;
import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
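
/**
 * Writes the value column of an aligned (vector) time series as a value chunk, marked with
 * {@link TsFileConstant#VALUE_COLUMN_MASK}. The timestamps themselves are written by a separate
 * time chunk writer; this class records, for each row, only the value and a null flag.
 *
 * <p>A minimal usage sketch (illustrative only; it assumes a suitable {@link Encoder} named
 * {@code valueEncoder} and an open {@link TsFileIOWriter} named {@code tsFileIOWriter}):
 *
 * <pre>{@code
 * ValueChunkWriter chunkWriter =
 *     new ValueChunkWriter(
 *         "s1", CompressionType.SNAPPY, TSDataType.INT64, TSEncoding.PLAIN, valueEncoder);
 * chunkWriter.write(1L, 10L, false); // point at t=1 with value 10
 * chunkWriter.write(2L, 0L, true); // null hole at t=2: the value argument is ignored
 * if (chunkWriter.checkPageSizeAndMayOpenANewPage()) {
 *   chunkWriter.writePageToPageBuffer(); // seal the current page
 * }
 * chunkWriter.writeToFileWriter(tsFileIOWriter); // flush the whole chunk
 * }</pre>
 */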
public class ValueChunkWriter {
private static final Logger logger = LoggerFactory.getLogger(ValueChunkWriter.class);
private final String measurementId;
private final TSEncoding encodingType;
private final TSDataType dataType;
private final CompressionType compressionType;
/** all pages of this chunk. */
private final PublicBAOS pageBuffer;
private int numOfPages;
/** writer that accumulates data of the current page. */
private ValuePageWriter pageWriter;
/** page size threshold. */
private final long pageSizeThreshold;
private final int maxNumberOfPointsInPage;
/** point count at which the size of the current page is checked next. */
private int valueCountInOnePageForNextCheck;
// initial value for valueCountInOnePageForNextCheck
private static final int MINIMUM_RECORD_COUNT_FOR_CHECK = 1500;
/** statistics of this chunk. */
private Statistics<? extends Serializable> statistics;
/**
 * first page info: the serialized size of the first page's header and data without its
 * statistics, plus the statistics themselves. A single-page chunk omits the page statistics
 * (the chunk statistics already cover them), so they are kept here and spliced back into the
 * page header once a second page makes the chunk multi-page.
 */
private int sizeWithoutStatistic;
private Statistics<?> firstPageStatistics;
public ValueChunkWriter(
String measurementId,
CompressionType compressionType,
TSDataType dataType,
TSEncoding encodingType,
Encoder valueEncoder) {
this.measurementId = measurementId;
this.encodingType = encodingType;
this.dataType = dataType;
this.compressionType = compressionType;
this.pageBuffer = new PublicBAOS();
this.pageSizeThreshold = TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
this.maxNumberOfPointsInPage =
TSFileDescriptor.getInstance().getConfig().getMaxNumberOfPointsInPage();
this.valueCountInOnePageForNextCheck = MINIMUM_RECORD_COUNT_FOR_CHECK;
// init statistics for this chunk and page
this.statistics = Statistics.getStatsByType(dataType);
this.pageWriter =
new ValuePageWriter(valueEncoder, ICompressor.getCompressor(compressionType), dataType);
}
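
// Single-point write overloads, one per supported data type. When isNull is true, the value
// argument is ignored and only a null mark is recorded, which keeps this value column aligned
// with the corresponding time column.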
public void write(long time, long value, boolean isNull) {
pageWriter.write(time, value, isNull);
}
public void write(long time, int value, boolean isNull) {
pageWriter.write(time, value, isNull);
}
public void write(long time, boolean value, boolean isNull) {
pageWriter.write(time, value, isNull);
}
public void write(long time, float value, boolean isNull) {
pageWriter.write(time, value, isNull);
}
public void write(long time, double value, boolean isNull) {
pageWriter.write(time, value, isNull);
}
public void write(long time, Binary value, boolean isNull) {
pageWriter.write(time, value, isNull);
}
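
// Batch write overloads, one per supported data type; pos and batchSize delimit the range of
// the given arrays to write, and isNull marks the rows to be recorded as null holes.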
public void write(long[] timestamps, int[] values, boolean[] isNull, int batchSize, int pos) {
pageWriter.write(timestamps, values, isNull, batchSize, pos);
}
public void write(long[] timestamps, long[] values, boolean[] isNull, int batchSize, int pos) {
pageWriter.write(timestamps, values, isNull, batchSize, pos);
}
public void write(long[] timestamps, boolean[] values, boolean[] isNull, int batchSize, int pos) {
pageWriter.write(timestamps, values, isNull, batchSize, pos);
}
public void write(long[] timestamps, float[] values, boolean[] isNull, int batchSize, int pos) {
pageWriter.write(timestamps, values, isNull, batchSize, pos);
}
public void write(long[] timestamps, double[] values, boolean[] isNull, int batchSize, int pos) {
pageWriter.write(timestamps, values, isNull, batchSize, pos);
}
public void write(long[] timestamps, Binary[] values, boolean[] isNull, int batchSize, int pos) {
pageWriter.write(timestamps, values, isNull, batchSize, pos);
}
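
/**
 * Appends an empty page (a page whose values are all null) to the page buffer. If the only page
 * so far was a non-empty one, its statistics are first spliced back into its header, because a
 * single-page chunk omits the page statistics.
 */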
public void writeEmptyPageToPageBuffer() throws IOException {
if (numOfPages == 1 && firstPageStatistics != null) {
// the first page was not empty: splice its statistics back into its header first
byte[] b = pageBuffer.toByteArray();
pageBuffer.reset();
pageBuffer.write(b, 0, this.sizeWithoutStatistic);
firstPageStatistics.serialize(pageBuffer);
pageBuffer.write(b, this.sizeWithoutStatistic, b.length - this.sizeWithoutStatistic);
firstPageStatistics = null;
}
pageWriter.writeEmptyPageIntoBuff(pageBuffer);
numOfPages++;
}
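
/**
 * Seals the current page: serializes its header and data into the page buffer and merges its
 * statistics into the chunk statistics. The first page's statistics are withheld from its header
 * until a second page proves the chunk is multi-page; on an IOException the page is dropped with
 * a logged error.
 */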
public void writePageToPageBuffer() {
try {
if (numOfPages == 0) {
if (pageWriter.getStatistics().getCount() != 0) {
// record the firstPageStatistics if the page is not empty
this.firstPageStatistics = pageWriter.getStatistics();
}
this.sizeWithoutStatistic = pageWriter.writePageHeaderAndDataIntoBuff(pageBuffer, true);
} else if (numOfPages == 1) { // put the firstPageStatistics into pageBuffer
if (firstPageStatistics != null) { // null means the previous (first) page was an empty page
byte[] b = pageBuffer.toByteArray();
pageBuffer.reset();
pageBuffer.write(b, 0, this.sizeWithoutStatistic);
firstPageStatistics.serialize(pageBuffer);
pageBuffer.write(b, this.sizeWithoutStatistic, b.length - this.sizeWithoutStatistic);
}
pageWriter.writePageHeaderAndDataIntoBuff(pageBuffer, false);
firstPageStatistics = null;
} else {
pageWriter.writePageHeaderAndDataIntoBuff(pageBuffer, false);
}
// update statistics of this chunk
numOfPages++;
this.statistics.mergeStatistics(pageWriter.getStatistics());
} catch (IOException e) {
logger.error("meet error in pageWriter.writePageHeaderAndDataIntoBuff,ignore this page:", e);
} finally {
// reset the page writer for the next page
pageWriter.reset(dataType);
}
}
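
/**
 * Writes an already-serialized page (header and compressed data) into the page buffer, following
 * the same rule as writePageToPageBuffer: the first page's statistics are withheld from its
 * header until a second page arrives.
 */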
public void writePageHeaderAndDataIntoBuff(ByteBuffer data, PageHeader header)
throws PageException {
// write the page header to pageBuffer
try {
logger.debug(
"start to flush a page header into buffer, buffer position {} ", pageBuffer.size());
// serialize the page header; see the writePageToPageBuffer method for the layout
if (numOfPages == 0) { // record the firstPageStatistics
this.sizeWithoutStatistic +=
ReadWriteForEncodingUtils.writeUnsignedVarInt(header.getUncompressedSize(), pageBuffer);
if (header.getStatistics() == null) {
this.firstPageStatistics = null;
} else {
this.firstPageStatistics = header.getStatistics();
this.sizeWithoutStatistic +=
ReadWriteForEncodingUtils.writeUnsignedVarInt(header.getCompressedSize(), pageBuffer);
}
} else if (numOfPages == 1) { // put the firstPageStatistics into pageBuffer
if (firstPageStatistics != null) {
byte[] b = pageBuffer.toByteArray();
pageBuffer.reset();
pageBuffer.write(b, 0, this.sizeWithoutStatistic);
firstPageStatistics.serialize(pageBuffer);
pageBuffer.write(b, this.sizeWithoutStatistic, b.length - this.sizeWithoutStatistic);
}
ReadWriteForEncodingUtils.writeUnsignedVarInt(header.getUncompressedSize(), pageBuffer);
if (header.getUncompressedSize() != 0) {
ReadWriteForEncodingUtils.writeUnsignedVarInt(header.getCompressedSize(), pageBuffer);
header.getStatistics().serialize(pageBuffer);
}
firstPageStatistics = null;
} else {
ReadWriteForEncodingUtils.writeUnsignedVarInt(header.getUncompressedSize(), pageBuffer);
if (header.getUncompressedSize() != 0) {
ReadWriteForEncodingUtils.writeUnsignedVarInt(header.getCompressedSize(), pageBuffer);
header.getStatistics().serialize(pageBuffer);
}
}
logger.debug(
"finish flushing a page header {} into buffer, buffer position {} ",
header,
pageBuffer.size());
if (header.getStatistics() != null) {
statistics.mergeStatistics(header.getStatistics());
}
} catch (IOException e) {
throw new PageException("IO Exception in writeDataPageHeader,ignore this page", e);
}
numOfPages++;
// write page content to temp PBAOS
try (WritableByteChannel channel = Channels.newChannel(pageBuffer)) {
channel.write(data);
} catch (IOException e) {
throw new PageException(e);
}
}
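
/**
 * Seals the current page, flushes all buffered pages of this chunk to the given writer, and
 * resets this chunk writer for the next chunk.
 */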
public void writeToFileWriter(TsFileIOWriter tsfileWriter) throws IOException {
sealCurrentPage();
writeAllPagesOfChunkToTsFile(tsfileWriter);
// reinit this chunk writer
pageBuffer.reset();
numOfPages = 0;
sizeWithoutStatistic = 0;
firstPageStatistics = null;
this.statistics = Statistics.getStatsByType(dataType);
}
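
/**
 * Estimates the maximum memory footprint of this chunk: the bytes of all sealed pages plus the
 * current in-progress page's data, a maximal page header, and that page's statistics.
 */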
public long estimateMaxSeriesMemSize() {
return pageBuffer.size()
+ pageWriter.estimateMaxMemSize()
+ PageHeader.estimateMaxPageHeaderSizeWithoutStatistics()
+ pageWriter.getStatistics().getSerializedSize();
}
public long getCurrentChunkSize() {
// It may happen that all subsequent write operations are out of order; then the statistics
// count of this chunk stays 0 and the chunk will not be flushed.
if (pageBuffer.size() == 0) {
return 0;
}
// Empty chunk: this may happen if pageBuffer stores only empty-page bits, in which case only
// the chunk header will be flushed.
if (statistics.getCount() == 0) {
return ChunkHeader.getSerializedSize(measurementId, 0);
}
// return the serialized size of the chunk header + all pages
return ChunkHeader.getSerializedSize(measurementId, pageBuffer.size())
+ (long) pageBuffer.size();
}
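
/**
 * Decides whether the current page should be sealed: either the point count has reached
 * maxNumberOfPointsInPage, or the estimated page size exceeds pageSizeThreshold. The size is
 * only estimated once the point count reaches valueCountInOnePageForNextCheck, and the next
 * check point is extrapolated linearly from the current fill ratio. For illustration (numbers
 * assumed, not defaults): with a 64 KB threshold and a page estimated at 10 KB after 1500
 * points, the next check happens around (64 / 10) * 1500 = 9600 points.
 */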
public boolean checkPageSizeAndMayOpenANewPage() {
if (pageWriter.getPointNumber() == maxNumberOfPointsInPage) {
logger.debug("current line count reaches the upper bound, write page {}", measurementId);
return true;
} else if (pageWriter.getPointNumber()
>= valueCountInOnePageForNextCheck) { // need to check memory size
// avoid estimating the memory size for every single value
long currentPageSize = pageWriter.estimateMaxMemSize();
if (currentPageSize > pageSizeThreshold) { // memory size exceeds threshold
// we will write the current page
logger.debug(
"enough size, write page {}, pageSizeThreshold:{}, currentPageSize:{}, valueCountInOnePage:{}",
measurementId,
pageSizeThreshold,
currentPageSize,
pageWriter.getPointNumber());
valueCountInOnePageForNextCheck = MINIMUM_RECORD_COUNT_FOR_CHECK;
return true;
} else {
// extrapolate the point count at which this page's size is checked next
valueCountInOnePageForNextCheck =
(int) (((float) pageSizeThreshold / currentPageSize) * pageWriter.getPointNumber());
}
}
return false;
}
public void sealCurrentPage() {
// even a page with no points (all values null) must be serialized, hence the size check
// instead of a point-count check
if (pageWriter != null && pageWriter.getSize() != 0) {
writePageToPageBuffer();
}
}
public void clearPageWriter() {
pageWriter = null;
}
public int getNumOfPages() {
return numOfPages;
}
public TSDataType getDataType() {
return dataType;
}
/**
 * write all pages of this chunk to the specified IOWriter.
 *
 * @param writer the specified IOWriter
 * @throws IOException exception in IO
 */
public void writeAllPagesOfChunkToTsFile(TsFileIOWriter writer) throws IOException {
if (statistics.getCount() == 0) {
if (pageBuffer.size() == 0) {
return;
}
// A chunk group may contain an empty value chunk during compaction, either to keep the chunk
// groups of a TsFile aligned on the same set of chunks or because all data of this timeseries
// has been deleted. To save disk space, only the chunk header (with a dataSize of 0) is
// serialized for such an empty value chunk.
writer.startFlushChunk(
measurementId,
compressionType,
dataType,
encodingType,
statistics,
0,
0,
TsFileConstant.VALUE_COLUMN_MASK);
writer.endCurrentChunk();
return;
}
// start to write this column chunk
writer.startFlushChunk(
measurementId,
compressionType,
dataType,
encodingType,
statistics,
pageBuffer.size(),
numOfPages,
TsFileConstant.VALUE_COLUMN_MASK);
long dataOffset = writer.getPos();
// write all pages of this column
writer.writeBytesToStream(pageBuffer);
int dataSize = (int) (writer.getPos() - dataOffset);
if (dataSize != pageBuffer.size()) {
throw new IOException(
"Bytes written is inconsistent with the size of data: "
+ dataSize
+ " !="
+ " "
+ pageBuffer.size());
}
writer.endCurrentChunk();
}
public String getMeasurementId() {
return measurementId;
}
public TSEncoding getEncodingType() {
return encodingType;
}
public CompressionType getCompressionType() {
return compressionType;
}
/** only used for tests. */
public PublicBAOS getPageBuffer() {
return pageBuffer;
}
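
/** Returns true if the estimated size of the current unsealed page is at least the given size. */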
public boolean checkIsUnsealedPageOverThreshold(long size) {
return pageWriter.estimateMaxMemSize() >= size;
}
public ValuePageWriter getPageWriter() {
return pageWriter;
}
}