[IOTDB-3656] basic mpp load implement (#6764)
diff --git a/src/main/java/org/apache/iotdb/tsfile/file/header/ChunkHeader.java b/src/main/java/org/apache/iotdb/tsfile/file/header/ChunkHeader.java
index 05b32e8..24a4418 100644
--- a/src/main/java/org/apache/iotdb/tsfile/file/header/ChunkHeader.java
+++ b/src/main/java/org/apache/iotdb/tsfile/file/header/ChunkHeader.java
@@ -121,7 +121,8 @@
/** the exact serialized size of chunk header */
public static int getSerializedSize(String measurementID, int dataSize) {
- int measurementIdLength = measurementID.getBytes(TSFileConfig.STRING_CHARSET).length;
+ int measurementIdLength =
+ measurementID == null ? 0 : measurementID.getBytes(TSFileConfig.STRING_CHARSET).length;
return Byte.BYTES // chunkType
+ ReadWriteForEncodingUtils.varIntSize(measurementIdLength) // measurementID length
+ measurementIdLength // measurementID
diff --git a/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java b/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
index 452ecb9..2c1020c 100644
--- a/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
+++ b/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
@@ -1075,7 +1075,7 @@
* @param position the offset of the chunk data
* @return the pages of this chunk
*/
- private ByteBuffer readChunk(long position, int dataSize) throws IOException {
+ public ByteBuffer readChunk(long position, int dataSize) throws IOException {
try {
return readData(position, dataSize);
} catch (Throwable t) {
diff --git a/src/main/java/org/apache/iotdb/tsfile/write/chunk/AlignedChunkWriterImpl.java b/src/main/java/org/apache/iotdb/tsfile/write/chunk/AlignedChunkWriterImpl.java
index a1fc973..2737317 100644
--- a/src/main/java/org/apache/iotdb/tsfile/write/chunk/AlignedChunkWriterImpl.java
+++ b/src/main/java/org/apache/iotdb/tsfile/write/chunk/AlignedChunkWriterImpl.java
@@ -21,6 +21,8 @@
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.encoding.encoder.Encoder;
import org.apache.iotdb.tsfile.encoding.encoder.TSEncodingBuilder;
+import org.apache.iotdb.tsfile.exception.write.PageException;
+import org.apache.iotdb.tsfile.file.header.PageHeader;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.read.common.block.column.Column;
@@ -32,6 +34,7 @@
import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
@@ -125,6 +128,30 @@
valueChunkWriterList.get(valueIndex++).write(time, value, isNull);
}
+ public void write(long time, int value, boolean isNull, int valueIndex) {
+ valueChunkWriterList.get(valueIndex).write(time, value, isNull);
+ }
+
+ public void write(long time, long value, boolean isNull, int valueIndex) {
+ valueChunkWriterList.get(valueIndex).write(time, value, isNull);
+ }
+
+ public void write(long time, boolean value, boolean isNull, int valueIndex) {
+ valueChunkWriterList.get(valueIndex).write(time, value, isNull);
+ }
+
+ public void write(long time, float value, boolean isNull, int valueIndex) {
+ valueChunkWriterList.get(valueIndex).write(time, value, isNull);
+ }
+
+ public void write(long time, double value, boolean isNull, int valueIndex) {
+ valueChunkWriterList.get(valueIndex).write(time, value, isNull);
+ }
+
+ public void write(long time, Binary value, boolean isNull, int valueIndex) {
+ valueChunkWriterList.get(valueIndex).write(time, value, isNull);
+ }
+
public void write(long time, TsPrimitiveType[] points) {
valueIndex = 0;
for (TsPrimitiveType point : points) {
@@ -270,6 +297,16 @@
}
}
+ public void writePageHeaderAndDataIntoTimeBuff(ByteBuffer data, PageHeader header)
+ throws PageException {
+ timeChunkWriter.writePageHeaderAndDataIntoBuff(data, header);
+ }
+
+ public void writePageHeaderAndDataIntoValueBuff(
+ ByteBuffer data, PageHeader header, int valueIndex) throws PageException {
+ valueChunkWriterList.get(valueIndex).writePageHeaderAndDataIntoBuff(data, header);
+ }
+
@Override
public void writeToFileWriter(TsFileIOWriter tsfileWriter) throws IOException {
timeChunkWriter.writeToFileWriter(tsfileWriter);
diff --git a/src/main/java/org/apache/iotdb/tsfile/write/chunk/TimeChunkWriter.java b/src/main/java/org/apache/iotdb/tsfile/write/chunk/TimeChunkWriter.java
index 59a1fd4..dc2f2d1 100644
--- a/src/main/java/org/apache/iotdb/tsfile/write/chunk/TimeChunkWriter.java
+++ b/src/main/java/org/apache/iotdb/tsfile/write/chunk/TimeChunkWriter.java
@@ -22,6 +22,7 @@
import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
import org.apache.iotdb.tsfile.compress.ICompressor;
import org.apache.iotdb.tsfile.encoding.encoder.Encoder;
+import org.apache.iotdb.tsfile.exception.write.PageException;
import org.apache.iotdb.tsfile.file.header.ChunkHeader;
import org.apache.iotdb.tsfile.file.header.PageHeader;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
@@ -30,6 +31,7 @@
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.file.metadata.statistics.TimeStatistics;
import org.apache.iotdb.tsfile.utils.PublicBAOS;
+import org.apache.iotdb.tsfile.utils.ReadWriteForEncodingUtils;
import org.apache.iotdb.tsfile.write.page.TimePageWriter;
import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
@@ -37,6 +39,9 @@
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
public class TimeChunkWriter {
@@ -168,6 +173,53 @@
}
}
+ public void writePageHeaderAndDataIntoBuff(ByteBuffer data, PageHeader header)
+ throws PageException {
+ // write the page header to pageBuffer
+ try {
+ logger.debug(
+ "start to flush a page header into buffer, buffer position {} ", pageBuffer.size());
+ // serialize pageHeader see writePageToPageBuffer method
+ if (numOfPages == 0) { // record the firstPageStatistics
+ this.firstPageStatistics = header.getStatistics();
+ this.sizeWithoutStatistic +=
+ ReadWriteForEncodingUtils.writeUnsignedVarInt(header.getUncompressedSize(), pageBuffer);
+ this.sizeWithoutStatistic +=
+ ReadWriteForEncodingUtils.writeUnsignedVarInt(header.getCompressedSize(), pageBuffer);
+ } else if (numOfPages == 1) { // put the firstPageStatistics into pageBuffer
+ byte[] b = pageBuffer.toByteArray();
+ pageBuffer.reset();
+ pageBuffer.write(b, 0, this.sizeWithoutStatistic);
+ firstPageStatistics.serialize(pageBuffer);
+ pageBuffer.write(b, this.sizeWithoutStatistic, b.length - this.sizeWithoutStatistic);
+ ReadWriteForEncodingUtils.writeUnsignedVarInt(header.getUncompressedSize(), pageBuffer);
+ ReadWriteForEncodingUtils.writeUnsignedVarInt(header.getCompressedSize(), pageBuffer);
+ header.getStatistics().serialize(pageBuffer);
+ firstPageStatistics = null;
+ } else {
+ ReadWriteForEncodingUtils.writeUnsignedVarInt(header.getUncompressedSize(), pageBuffer);
+ ReadWriteForEncodingUtils.writeUnsignedVarInt(header.getCompressedSize(), pageBuffer);
+ header.getStatistics().serialize(pageBuffer);
+ }
+ logger.debug(
+ "finish to flush a page header {} of time page into buffer, buffer position {} ",
+ header,
+ pageBuffer.size());
+
+ statistics.mergeStatistics(header.getStatistics());
+
+ } catch (IOException e) {
+ throw new PageException("IO Exception in writeDataPageHeader,ignore this page", e);
+ }
+ numOfPages++;
+ // write page content to temp PBAOS
+ try (WritableByteChannel channel = Channels.newChannel(pageBuffer)) {
+ channel.write(data);
+ } catch (IOException e) {
+ throw new PageException(e);
+ }
+ }
+
public void writeToFileWriter(TsFileIOWriter tsfileWriter) throws IOException {
sealCurrentPage();
writeAllPagesOfChunkToTsFile(tsfileWriter);
diff --git a/src/main/java/org/apache/iotdb/tsfile/write/chunk/ValueChunkWriter.java b/src/main/java/org/apache/iotdb/tsfile/write/chunk/ValueChunkWriter.java
index 8b75269..98d67a6 100644
--- a/src/main/java/org/apache/iotdb/tsfile/write/chunk/ValueChunkWriter.java
+++ b/src/main/java/org/apache/iotdb/tsfile/write/chunk/ValueChunkWriter.java
@@ -22,6 +22,7 @@
import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
import org.apache.iotdb.tsfile.compress.ICompressor;
import org.apache.iotdb.tsfile.encoding.encoder.Encoder;
+import org.apache.iotdb.tsfile.exception.write.PageException;
import org.apache.iotdb.tsfile.file.header.ChunkHeader;
import org.apache.iotdb.tsfile.file.header.PageHeader;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
@@ -30,6 +31,7 @@
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.utils.PublicBAOS;
+import org.apache.iotdb.tsfile.utils.ReadWriteForEncodingUtils;
import org.apache.iotdb.tsfile.write.page.ValuePageWriter;
import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
@@ -38,6 +40,9 @@
import java.io.IOException;
import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
public class ValueChunkWriter {
@@ -187,6 +192,57 @@
}
}
+ public void writePageHeaderAndDataIntoBuff(ByteBuffer data, PageHeader header)
+ throws PageException {
+ // write the page header to pageBuffer
+ try {
+ logger.debug(
+ "start to flush a page header into buffer, buffer position {} ", pageBuffer.size());
+ // serialize pageHeader see writePageToPageBuffer method
+ if (numOfPages == 0) { // record the firstPageStatistics
+ if (header.getStatistics() != null) {
+ this.firstPageStatistics = header.getStatistics();
+ }
+ this.sizeWithoutStatistic +=
+ ReadWriteForEncodingUtils.writeUnsignedVarInt(header.getUncompressedSize(), pageBuffer);
+ this.sizeWithoutStatistic +=
+ ReadWriteForEncodingUtils.writeUnsignedVarInt(header.getCompressedSize(), pageBuffer);
+ } else if (numOfPages == 1) { // put the firstPageStatistics into pageBuffer
+ if (firstPageStatistics != null) {
+ byte[] b = pageBuffer.toByteArray();
+ pageBuffer.reset();
+ pageBuffer.write(b, 0, this.sizeWithoutStatistic);
+ firstPageStatistics.serialize(pageBuffer);
+ pageBuffer.write(b, this.sizeWithoutStatistic, b.length - this.sizeWithoutStatistic);
+ }
+ ReadWriteForEncodingUtils.writeUnsignedVarInt(header.getUncompressedSize(), pageBuffer);
+ ReadWriteForEncodingUtils.writeUnsignedVarInt(header.getCompressedSize(), pageBuffer);
+ header.getStatistics().serialize(pageBuffer);
+ firstPageStatistics = null;
+ } else {
+ ReadWriteForEncodingUtils.writeUnsignedVarInt(header.getUncompressedSize(), pageBuffer);
+ ReadWriteForEncodingUtils.writeUnsignedVarInt(header.getCompressedSize(), pageBuffer);
+ header.getStatistics().serialize(pageBuffer);
+ }
+ logger.debug(
+ "finish to flush a page header {} of time page into buffer, buffer position {} ",
+ header,
+ pageBuffer.size());
+
+ statistics.mergeStatistics(header.getStatistics());
+
+ } catch (IOException e) {
+ throw new PageException("IO Exception in writeDataPageHeader,ignore this page", e);
+ }
+ numOfPages++;
+ // write page content to temp PBAOS
+ try (WritableByteChannel channel = Channels.newChannel(pageBuffer)) {
+ channel.write(data);
+ } catch (IOException e) {
+ throw new PageException(e);
+ }
+ }
+
public void writeToFileWriter(TsFileIOWriter tsfileWriter) throws IOException {
sealCurrentPage();
writeAllPagesOfChunkToTsFile(tsfileWriter);
diff --git a/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java b/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
index 2f865f2..22a68bd 100644
--- a/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
+++ b/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
@@ -234,6 +234,19 @@
}
}
+ public void writeChunk(Chunk chunk) throws IOException {
+ ChunkHeader chunkHeader = chunk.getHeader();
+ currentChunkMetadata =
+ new ChunkMetadata(
+ chunkHeader.getMeasurementID(),
+ chunkHeader.getDataType(),
+ out.getPosition(),
+ chunk.getChunkStatistic());
+ chunkHeader.serializeTo(out.wrapAsStream());
+ out.write(chunk.getData());
+ endCurrentChunk();
+ }
+
/** end chunk and write some log. */
public void endCurrentChunk() {
chunkMetadataList.add(currentChunkMetadata);