/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.tsfile.write.chunk;

import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.compress.ICompressor;
import org.apache.iotdb.tsfile.encoding.encoder.SDTEncoder;
import org.apache.iotdb.tsfile.exception.write.PageException;
import org.apache.iotdb.tsfile.file.header.ChunkHeader;
import org.apache.iotdb.tsfile.file.header.PageHeader;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.utils.PublicBAOS;
import org.apache.iotdb.tsfile.utils.ReadWriteForEncodingUtils;
import org.apache.iotdb.tsfile.write.page.PageWriter;
import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
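
/**
 * Writes the data points of one measurement into pages and buffers every sealed page in
 * pageBuffer until the whole chunk is flushed to a TsFileIOWriter.
 *
 * <p>A minimal usage sketch, not an authoritative reference: the MeasurementSchema constructor
 * and the tsFileIOWriter instance below are assumptions taken from the schema and writer APIs,
 * not defined in this file.
 *
 * <pre>{@code
 * IMeasurementSchema schema = new MeasurementSchema("s1", TSDataType.INT64, TSEncoding.RLE);
 * ChunkWriterImpl chunkWriter = new ChunkWriterImpl(schema);
 * for (long t = 1; t <= 100; t++) {
 *   chunkWriter.write(t, t * 10); // (timestamp, long value)
 * }
 * chunkWriter.writeToFileWriter(tsFileIOWriter); // seals the current page and flushes the chunk
 * }</pre>
 */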
public class ChunkWriterImpl implements IChunkWriter {
private static final Logger logger = LoggerFactory.getLogger(ChunkWriterImpl.class);
private final IMeasurementSchema measurementSchema;
private final ICompressor compressor;
/** all pages of this chunk. */
private final PublicBAOS pageBuffer;
private int numOfPages;
  /** writes data into the current page. */
private PageWriter pageWriter;
/** page size threshold. */
private final long pageSizeThreshold;
private final int maxNumberOfPointsInPage;
  /** point count in the current page that triggers the next size check. */
private int valueCountInOnePageForNextCheck;
// initial value for valueCountInOnePageForNextCheck
private static final int MINIMUM_RECORD_COUNT_FOR_CHECK = 1500;
  /** statistics of this chunk. */
private Statistics statistics;
/** SDT parameters */
private boolean isSdtEncoding;
  // When the ChunkWriter is about to write the last data point in the chunk, this is set to true
  // to tell SDT to keep that point.
private boolean isLastPoint;
// do not re-execute SDT compression when merging chunks
private boolean isMerging;
private SDTEncoder sdtEncoder;
private static final String LOSS = "loss";
private static final String SDT = "sdt";
private static final String SDT_COMP_DEV = "compdev";
private static final String SDT_COMP_MIN_TIME = "compmintime";
private static final String SDT_COMP_MAX_TIME = "compmaxtime";
/** first page info */
private int sizeWithoutStatistic;
private Statistics<?> firstPageStatistics;
/** @param schema schema of this measurement */
public ChunkWriterImpl(IMeasurementSchema schema) {
this.measurementSchema = schema;
this.compressor = ICompressor.getCompressor(schema.getCompressor());
this.pageBuffer = new PublicBAOS();
this.pageSizeThreshold = TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
this.maxNumberOfPointsInPage =
TSFileDescriptor.getInstance().getConfig().getMaxNumberOfPointsInPage();
    // delay the first memory-usage check until enough data has accumulated to make an initial
    // size prediction
this.valueCountInOnePageForNextCheck = MINIMUM_RECORD_COUNT_FOR_CHECK;
// init statistics for this chunk and page
this.statistics = Statistics.getStatsByType(measurementSchema.getType());
this.pageWriter = new PageWriter(measurementSchema);
this.pageWriter.setTimeEncoder(measurementSchema.getTimeEncoder());
this.pageWriter.setValueEncoder(measurementSchema.getValueEncoder());
// check if the measurement schema uses SDT
checkSdtEncoding();
}
  public ChunkWriterImpl(IMeasurementSchema schema, boolean isMerging) {
    this(schema);
    this.isMerging = isMerging;
    // checkSdtEncoding() ran inside this(schema) while isMerging was still false, so disable
    // SDT here: SDT compression must not be re-executed when merging chunks.
    if (isMerging) {
      this.isSdtEncoding = false;
    }
  }
private void checkSdtEncoding() {
if (measurementSchema.getProps() != null && !isMerging) {
if (measurementSchema.getProps().getOrDefault(LOSS, "").equals(SDT)) {
isSdtEncoding = true;
sdtEncoder = new SDTEncoder();
}
if (isSdtEncoding && measurementSchema.getProps().containsKey(SDT_COMP_DEV)) {
sdtEncoder.setCompDeviation(
Double.parseDouble(measurementSchema.getProps().get(SDT_COMP_DEV)));
}
if (isSdtEncoding && measurementSchema.getProps().containsKey(SDT_COMP_MIN_TIME)) {
sdtEncoder.setCompMinTime(
Long.parseLong(measurementSchema.getProps().get(SDT_COMP_MIN_TIME)));
}
if (isSdtEncoding && measurementSchema.getProps().containsKey(SDT_COMP_MAX_TIME)) {
sdtEncoder.setCompMaxTime(
Long.parseLong(measurementSchema.getProps().get(SDT_COMP_MAX_TIME)));
}
}
}
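
  // Illustrative SDT props read by checkSdtEncoding() above (keys match the constants; the
  // values here are example assumptions):
  //   loss        = "sdt"   enables SDT lossy encoding
  //   compdev     = "2.0"   compression deviation, parsed as a double
  //   compmintime = "10"    minimum time interval between stored points, parsed as a long
  //   compmaxtime = "100"   maximum time interval between stored points, parsed as a long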
public void write(long time, long value) {
    // The last point is stored for SDT encoding, but it still needs to go through the encoding
    // process: if it exceeds compdev, the second-to-last point must also be stored.
if (!isSdtEncoding || sdtEncoder.encodeLong(time, value)) {
pageWriter.write(
isSdtEncoding ? sdtEncoder.getTime() : time,
isSdtEncoding ? sdtEncoder.getLongValue() : value);
}
if (isSdtEncoding && isLastPoint) {
pageWriter.write(time, value);
}
checkPageSizeAndMayOpenANewPage();
}
public void write(long time, int value) {
if (!isSdtEncoding || sdtEncoder.encodeInt(time, value)) {
pageWriter.write(
isSdtEncoding ? sdtEncoder.getTime() : time,
isSdtEncoding ? sdtEncoder.getIntValue() : value);
}
if (isSdtEncoding && isLastPoint) {
pageWriter.write(time, value);
}
checkPageSizeAndMayOpenANewPage();
}
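
  // SDT applies only to numeric types, so boolean points are written directly.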
public void write(long time, boolean value) {
pageWriter.write(time, value);
checkPageSizeAndMayOpenANewPage();
}
public void write(long time, float value) {
if (!isSdtEncoding || sdtEncoder.encodeFloat(time, value)) {
pageWriter.write(
isSdtEncoding ? sdtEncoder.getTime() : time,
isSdtEncoding ? sdtEncoder.getFloatValue() : value);
}
// store last point for sdt encoding
if (isSdtEncoding && isLastPoint) {
pageWriter.write(time, value);
}
checkPageSizeAndMayOpenANewPage();
}
public void write(long time, double value) {
if (!isSdtEncoding || sdtEncoder.encodeDouble(time, value)) {
pageWriter.write(
isSdtEncoding ? sdtEncoder.getTime() : time,
isSdtEncoding ? sdtEncoder.getDoubleValue() : value);
}
if (isSdtEncoding && isLastPoint) {
pageWriter.write(time, value);
}
checkPageSizeAndMayOpenANewPage();
}
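
  // SDT applies only to numeric types, so Binary (text) points are written directly.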
public void write(long time, Binary value) {
pageWriter.write(time, value);
checkPageSizeAndMayOpenANewPage();
}
public void write(long[] timestamps, int[] values, int batchSize) {
if (isSdtEncoding) {
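      // SDTEncoder.encode compacts the arrays in place and returns the number of points
      // retained, which becomes the effective batch size.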
batchSize = sdtEncoder.encode(timestamps, values, batchSize);
}
pageWriter.write(timestamps, values, batchSize);
checkPageSizeAndMayOpenANewPage();
}
public void write(long[] timestamps, long[] values, int batchSize) {
if (isSdtEncoding) {
batchSize = sdtEncoder.encode(timestamps, values, batchSize);
}
pageWriter.write(timestamps, values, batchSize);
checkPageSizeAndMayOpenANewPage();
}
public void write(long[] timestamps, boolean[] values, int batchSize) {
pageWriter.write(timestamps, values, batchSize);
checkPageSizeAndMayOpenANewPage();
}
public void write(long[] timestamps, float[] values, int batchSize) {
if (isSdtEncoding) {
batchSize = sdtEncoder.encode(timestamps, values, batchSize);
}
pageWriter.write(timestamps, values, batchSize);
checkPageSizeAndMayOpenANewPage();
}
public void write(long[] timestamps, double[] values, int batchSize) {
if (isSdtEncoding) {
batchSize = sdtEncoder.encode(timestamps, values, batchSize);
}
pageWriter.write(timestamps, values, batchSize);
checkPageSizeAndMayOpenANewPage();
}
public void write(long[] timestamps, Binary[] values, int batchSize) {
pageWriter.write(timestamps, values, batchSize);
checkPageSizeAndMayOpenANewPage();
}
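
  // An illustrative run of the adaptive check below (all numbers are assumptions): with
  // pageSizeThreshold = 65536 bytes, the first size check fires once the page holds
  // MINIMUM_RECORD_COUNT_FOR_CHECK = 1500 points; if the page is then estimated at 16384 bytes,
  // the next check is deferred until about 65536 / 16384 * 1500 = 6000 points.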
  /**
   * check the occupied memory size; if it exceeds the page size threshold, seal the current page
   * and append it to pageBuffer
   */
private void checkPageSizeAndMayOpenANewPage() {
if (pageWriter.getPointNumber() == maxNumberOfPointsInPage) {
logger.debug("current line count reaches the upper bound, write page {}", measurementSchema);
writePageToPageBuffer();
} else if (pageWriter.getPointNumber()
        >= valueCountInOnePageForNextCheck) { // time to check the memory size
      // to avoid checking the memory usage for every single value
long currentPageSize = pageWriter.estimateMaxMemSize();
if (currentPageSize > pageSizeThreshold) { // memory size exceeds threshold
// we will write the current page
logger.debug(
"enough size, write page {}, pageSizeThreshold:{}, currentPateSize:{}, valueCountInOnePage:{}",
measurementSchema.getMeasurementId(),
pageSizeThreshold,
currentPageSize,
pageWriter.getPointNumber());
writePageToPageBuffer();
valueCountInOnePageForNextCheck = MINIMUM_RECORD_COUNT_FOR_CHECK;
} else {
// reset the valueCountInOnePageForNextCheck for the next page
valueCountInOnePageForNextCheck =
(int) (((float) pageSizeThreshold / currentPageSize) * pageWriter.getPointNumber());
}
}
}
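
  // A chunk that ends up with a single page stores that page's statistics only once, in the
  // chunk metadata, so the first page is buffered without statistics (its size is tracked in
  // sizeWithoutStatistic); once a second page arrives, the buffered bytes are re-spliced to put
  // the first page's statistics back into its header.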
private void writePageToPageBuffer() {
try {
if (numOfPages == 0) { // record the firstPageStatistics
this.firstPageStatistics = pageWriter.getStatistics();
this.sizeWithoutStatistic = pageWriter.writePageHeaderAndDataIntoBuff(pageBuffer, true);
} else if (numOfPages == 1) { // put the firstPageStatistics into pageBuffer
byte[] b = pageBuffer.toByteArray();
pageBuffer.reset();
pageBuffer.write(b, 0, this.sizeWithoutStatistic);
firstPageStatistics.serialize(pageBuffer);
pageBuffer.write(b, this.sizeWithoutStatistic, b.length - this.sizeWithoutStatistic);
pageWriter.writePageHeaderAndDataIntoBuff(pageBuffer, false);
firstPageStatistics = null;
} else {
pageWriter.writePageHeaderAndDataIntoBuff(pageBuffer, false);
}
// update statistics of this chunk
numOfPages++;
this.statistics.mergeStatistics(pageWriter.getStatistics());
} catch (IOException e) {
logger.error("meet error in pageWriter.writePageHeaderAndDataIntoBuff,ignore this page:", e);
} finally {
      // reset the page writer so it is ready for the next page
pageWriter.reset(measurementSchema);
}
}
@Override
public void writeToFileWriter(TsFileIOWriter tsfileWriter) throws IOException {
sealCurrentPage();
writeAllPagesOfChunkToTsFile(tsfileWriter, statistics);
// reinit this chunk writer
pageBuffer.reset();
numOfPages = 0;
sizeWithoutStatistic = 0;
firstPageStatistics = null;
this.statistics = Statistics.getStatsByType(measurementSchema.getType());
}
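
  // Estimated memory = sealed pages already in pageBuffer + the unsealed page's estimated data
  // size + one page header without statistics + the serialized size of the current statistics.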
@Override
public long estimateMaxSeriesMemSize() {
return pageBuffer.size()
+ pageWriter.estimateMaxMemSize()
+ PageHeader.estimateMaxPageHeaderSizeWithoutStatistics()
+ pageWriter.getStatistics().getSerializedSize();
}
@Override
public long getSerializedChunkSize() {
if (pageBuffer.size() == 0) {
return 0;
}
// return the serialized size of the chunk header + all pages
return ChunkHeader.getSerializedSize(measurementSchema.getMeasurementId(), pageBuffer.size())
+ (long) pageBuffer.size();
}
@Override
public void sealCurrentPage() {
if (pageWriter != null && pageWriter.getPointNumber() > 0) {
writePageToPageBuffer();
}
}
@Override
public void clearPageWriter() {
pageWriter = null;
}
@Override
public boolean checkIsUnsealedPageOverThreshold(
long size, long pointNum, boolean returnTrueIfPageEmpty) {
if (returnTrueIfPageEmpty && pageWriter.getPointNumber() == 0) {
      // return true if the unsealed page contains no points
return true;
}
return pageWriter.getPointNumber() >= pointNum || pageWriter.estimateMaxMemSize() >= size;
}
@Override
public boolean checkIsChunkSizeOverThreshold(
long size, long pointNum, boolean returnTrueIfChunkEmpty) {
if (returnTrueIfChunkEmpty && statistics.getCount() + pageWriter.getPointNumber() == 0) {
      // return true if the chunk is still empty
return true;
}
return estimateMaxSeriesMemSize() >= size
|| statistics.getCount() + pageWriter.getPointNumber() >= pointNum;
}
@Override
public boolean isEmpty() {
return statistics.getCount() + pageWriter.getPointNumber() == 0;
}
public TSDataType getDataType() {
return measurementSchema.getType();
}
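
  // Serialized layout of each page inside pageBuffer (see also writePageToPageBuffer):
  // uncompressedSize (unsigned varint) | compressedSize (unsigned varint)
  // | statistics (omitted for the chunk's first page until a second page arrives) | page data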
  /**
   * write the page header and compressed page data into this chunk's pageBuffer. @NOTE: only used
   * when upgrading 0.11/v2 TsFile to 0.12/v3 TsFile
   */
public void writePageHeaderAndDataIntoBuff(ByteBuffer data, PageHeader header)
throws PageException {
// write the page header to pageBuffer
try {
logger.debug(
"start to flush a page header into buffer, buffer position {} ", pageBuffer.size());
      // serialize the page header; see writePageToPageBuffer for the matching layout
if (numOfPages == 0) { // record the firstPageStatistics
this.firstPageStatistics = header.getStatistics();
this.sizeWithoutStatistic +=
ReadWriteForEncodingUtils.writeUnsignedVarInt(header.getUncompressedSize(), pageBuffer);
this.sizeWithoutStatistic +=
ReadWriteForEncodingUtils.writeUnsignedVarInt(header.getCompressedSize(), pageBuffer);
} else if (numOfPages == 1) { // put the firstPageStatistics into pageBuffer
byte[] b = pageBuffer.toByteArray();
pageBuffer.reset();
pageBuffer.write(b, 0, this.sizeWithoutStatistic);
firstPageStatistics.serialize(pageBuffer);
pageBuffer.write(b, this.sizeWithoutStatistic, b.length - this.sizeWithoutStatistic);
ReadWriteForEncodingUtils.writeUnsignedVarInt(header.getUncompressedSize(), pageBuffer);
ReadWriteForEncodingUtils.writeUnsignedVarInt(header.getCompressedSize(), pageBuffer);
header.getStatistics().serialize(pageBuffer);
firstPageStatistics = null;
} else {
ReadWriteForEncodingUtils.writeUnsignedVarInt(header.getUncompressedSize(), pageBuffer);
ReadWriteForEncodingUtils.writeUnsignedVarInt(header.getCompressedSize(), pageBuffer);
header.getStatistics().serialize(pageBuffer);
}
logger.debug(
"finish to flush a page header {} of {} into buffer, buffer position {} ",
header,
measurementSchema.getMeasurementId(),
pageBuffer.size());
statistics.mergeStatistics(header.getStatistics());
} catch (IOException e) {
throw new PageException("IO Exception in writeDataPageHeader,ignore this page", e);
}
numOfPages++;
    // write the page body into pageBuffer
try (WritableByteChannel channel = Channels.newChannel(pageBuffer)) {
channel.write(data);
} catch (IOException e) {
throw new PageException(e);
}
}
/**
   * write all buffered pages of this chunk to the specified IOWriter.
*
* @param writer the specified IOWriter
* @param statistics the chunk statistics
* @throws IOException exception in IO
*/
private void writeAllPagesOfChunkToTsFile(
TsFileIOWriter writer, Statistics<? extends Serializable> statistics) throws IOException {
if (statistics.getCount() == 0) {
return;
}
// start to write this column chunk
writer.startFlushChunk(
measurementSchema.getMeasurementId(),
compressor.getType(),
measurementSchema.getType(),
measurementSchema.getEncodingType(),
statistics,
pageBuffer.size(),
numOfPages,
0);
long dataOffset = writer.getPos();
// write all pages of this column
writer.writeBytesToStream(pageBuffer);
int dataSize = (int) (writer.getPos() - dataOffset);
if (dataSize != pageBuffer.size()) {
      throw new IOException(
          "Bytes written are inconsistent with the size of data: "
              + dataSize
              + " != "
              + pageBuffer.size());
}
writer.endCurrentChunk();
}
public void setIsMerging(boolean isMerging) {
this.isMerging = isMerging;
}
public boolean isMerging() {
return isMerging;
}
public void setLastPoint(boolean isLastPoint) {
this.isLastPoint = isLastPoint;
}
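
  // Typical caller-side flow with SDT (a sketch; "chunkWriter" is an assumed variable name):
  //   chunkWriter.setLastPoint(true);
  //   chunkWriter.write(lastTime, lastValue); // forces SDT to keep the final point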
/** Only used for tests. */
public PageWriter getPageWriter() {
return pageWriter;
}
}