[IOTDB-3656] basic mpp load implement (#6764)

diff --git a/src/main/java/org/apache/iotdb/tsfile/file/header/ChunkHeader.java b/src/main/java/org/apache/iotdb/tsfile/file/header/ChunkHeader.java
index 05b32e8..24a4418 100644
--- a/src/main/java/org/apache/iotdb/tsfile/file/header/ChunkHeader.java
+++ b/src/main/java/org/apache/iotdb/tsfile/file/header/ChunkHeader.java
@@ -121,7 +121,8 @@
 
   /** the exact serialized size of chunk header */
   public static int getSerializedSize(String measurementID, int dataSize) {
-    int measurementIdLength = measurementID.getBytes(TSFileConfig.STRING_CHARSET).length;
+    int measurementIdLength =
+        measurementID == null ? 0 : measurementID.getBytes(TSFileConfig.STRING_CHARSET).length;
     return Byte.BYTES // chunkType
         + ReadWriteForEncodingUtils.varIntSize(measurementIdLength) // measurementID length
         + measurementIdLength // measurementID
diff --git a/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java b/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
index 452ecb9..2c1020c 100644
--- a/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
+++ b/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
@@ -1075,7 +1075,7 @@
    * @param position the offset of the chunk data
    * @return the pages of this chunk
    */
-  private ByteBuffer readChunk(long position, int dataSize) throws IOException {
+  public ByteBuffer readChunk(long position, int dataSize) throws IOException {
     try {
       return readData(position, dataSize);
     } catch (Throwable t) {
diff --git a/src/main/java/org/apache/iotdb/tsfile/write/chunk/AlignedChunkWriterImpl.java b/src/main/java/org/apache/iotdb/tsfile/write/chunk/AlignedChunkWriterImpl.java
index a1fc973..2737317 100644
--- a/src/main/java/org/apache/iotdb/tsfile/write/chunk/AlignedChunkWriterImpl.java
+++ b/src/main/java/org/apache/iotdb/tsfile/write/chunk/AlignedChunkWriterImpl.java
@@ -21,6 +21,8 @@
 import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
 import org.apache.iotdb.tsfile.encoding.encoder.Encoder;
 import org.apache.iotdb.tsfile.encoding.encoder.TSEncodingBuilder;
+import org.apache.iotdb.tsfile.exception.write.PageException;
+import org.apache.iotdb.tsfile.file.header.PageHeader;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.iotdb.tsfile.read.common.block.column.Column;
@@ -32,6 +34,7 @@
 import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.List;
@@ -125,6 +128,30 @@
     valueChunkWriterList.get(valueIndex++).write(time, value, isNull);
   }
 
+  public void write(long time, int value, boolean isNull, int valueIndex) {
+    valueChunkWriterList.get(valueIndex).write(time, value, isNull);
+  }
+
+  public void write(long time, long value, boolean isNull, int valueIndex) {
+    valueChunkWriterList.get(valueIndex).write(time, value, isNull);
+  }
+
+  public void write(long time, boolean value, boolean isNull, int valueIndex) {
+    valueChunkWriterList.get(valueIndex).write(time, value, isNull);
+  }
+
+  public void write(long time, float value, boolean isNull, int valueIndex) {
+    valueChunkWriterList.get(valueIndex).write(time, value, isNull);
+  }
+
+  public void write(long time, double value, boolean isNull, int valueIndex) {
+    valueChunkWriterList.get(valueIndex).write(time, value, isNull);
+  }
+
+  public void write(long time, Binary value, boolean isNull, int valueIndex) {
+    valueChunkWriterList.get(valueIndex).write(time, value, isNull);
+  }
+
   public void write(long time, TsPrimitiveType[] points) {
     valueIndex = 0;
     for (TsPrimitiveType point : points) {
@@ -270,6 +297,16 @@
     }
   }
 
+  public void writePageHeaderAndDataIntoTimeBuff(ByteBuffer data, PageHeader header)
+      throws PageException {
+    timeChunkWriter.writePageHeaderAndDataIntoBuff(data, header);
+  }
+
+  public void writePageHeaderAndDataIntoValueBuff(
+      ByteBuffer data, PageHeader header, int valueIndex) throws PageException {
+    valueChunkWriterList.get(valueIndex).writePageHeaderAndDataIntoBuff(data, header);
+  }
+
   @Override
   public void writeToFileWriter(TsFileIOWriter tsfileWriter) throws IOException {
     timeChunkWriter.writeToFileWriter(tsfileWriter);
diff --git a/src/main/java/org/apache/iotdb/tsfile/write/chunk/TimeChunkWriter.java b/src/main/java/org/apache/iotdb/tsfile/write/chunk/TimeChunkWriter.java
index 59a1fd4..dc2f2d1 100644
--- a/src/main/java/org/apache/iotdb/tsfile/write/chunk/TimeChunkWriter.java
+++ b/src/main/java/org/apache/iotdb/tsfile/write/chunk/TimeChunkWriter.java
@@ -22,6 +22,7 @@
 import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
 import org.apache.iotdb.tsfile.compress.ICompressor;
 import org.apache.iotdb.tsfile.encoding.encoder.Encoder;
+import org.apache.iotdb.tsfile.exception.write.PageException;
 import org.apache.iotdb.tsfile.file.header.ChunkHeader;
 import org.apache.iotdb.tsfile.file.header.PageHeader;
 import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
@@ -30,6 +31,7 @@
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
 import org.apache.iotdb.tsfile.file.metadata.statistics.TimeStatistics;
 import org.apache.iotdb.tsfile.utils.PublicBAOS;
+import org.apache.iotdb.tsfile.utils.ReadWriteForEncodingUtils;
 import org.apache.iotdb.tsfile.write.page.TimePageWriter;
 import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
 
@@ -37,6 +39,9 @@
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
 
 public class TimeChunkWriter {
 
@@ -168,6 +173,53 @@
     }
   }
 
+  public void writePageHeaderAndDataIntoBuff(ByteBuffer data, PageHeader header)
+      throws PageException {
+    // write the page header to pageBuffer
+    try {
+      logger.debug(
+          "start to flush a page header into buffer, buffer position {} ", pageBuffer.size());
+      // serialize pageHeader  see writePageToPageBuffer method
+      if (numOfPages == 0) { // record the firstPageStatistics
+        this.firstPageStatistics = header.getStatistics();
+        this.sizeWithoutStatistic +=
+            ReadWriteForEncodingUtils.writeUnsignedVarInt(header.getUncompressedSize(), pageBuffer);
+        this.sizeWithoutStatistic +=
+            ReadWriteForEncodingUtils.writeUnsignedVarInt(header.getCompressedSize(), pageBuffer);
+      } else if (numOfPages == 1) { // put the firstPageStatistics into pageBuffer
+        byte[] b = pageBuffer.toByteArray();
+        pageBuffer.reset();
+        pageBuffer.write(b, 0, this.sizeWithoutStatistic);
+        firstPageStatistics.serialize(pageBuffer);
+        pageBuffer.write(b, this.sizeWithoutStatistic, b.length - this.sizeWithoutStatistic);
+        ReadWriteForEncodingUtils.writeUnsignedVarInt(header.getUncompressedSize(), pageBuffer);
+        ReadWriteForEncodingUtils.writeUnsignedVarInt(header.getCompressedSize(), pageBuffer);
+        header.getStatistics().serialize(pageBuffer);
+        firstPageStatistics = null;
+      } else {
+        ReadWriteForEncodingUtils.writeUnsignedVarInt(header.getUncompressedSize(), pageBuffer);
+        ReadWriteForEncodingUtils.writeUnsignedVarInt(header.getCompressedSize(), pageBuffer);
+        header.getStatistics().serialize(pageBuffer);
+      }
+      logger.debug(
+          "finish to flush a page header {} of time page into buffer, buffer position {} ",
+          header,
+          pageBuffer.size());
+
+      statistics.mergeStatistics(header.getStatistics());
+
+    } catch (IOException e) {
+      throw new PageException("IO Exception in writeDataPageHeader,ignore this page", e);
+    }
+    numOfPages++;
+    // write page content to temp PBAOS
+    try (WritableByteChannel channel = Channels.newChannel(pageBuffer)) {
+      channel.write(data);
+    } catch (IOException e) {
+      throw new PageException(e);
+    }
+  }
+
   public void writeToFileWriter(TsFileIOWriter tsfileWriter) throws IOException {
     sealCurrentPage();
     writeAllPagesOfChunkToTsFile(tsfileWriter);
diff --git a/src/main/java/org/apache/iotdb/tsfile/write/chunk/ValueChunkWriter.java b/src/main/java/org/apache/iotdb/tsfile/write/chunk/ValueChunkWriter.java
index 8b75269..98d67a6 100644
--- a/src/main/java/org/apache/iotdb/tsfile/write/chunk/ValueChunkWriter.java
+++ b/src/main/java/org/apache/iotdb/tsfile/write/chunk/ValueChunkWriter.java
@@ -22,6 +22,7 @@
 import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
 import org.apache.iotdb.tsfile.compress.ICompressor;
 import org.apache.iotdb.tsfile.encoding.encoder.Encoder;
+import org.apache.iotdb.tsfile.exception.write.PageException;
 import org.apache.iotdb.tsfile.file.header.ChunkHeader;
 import org.apache.iotdb.tsfile.file.header.PageHeader;
 import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
@@ -30,6 +31,7 @@
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
 import org.apache.iotdb.tsfile.utils.Binary;
 import org.apache.iotdb.tsfile.utils.PublicBAOS;
+import org.apache.iotdb.tsfile.utils.ReadWriteForEncodingUtils;
 import org.apache.iotdb.tsfile.write.page.ValuePageWriter;
 import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
 
@@ -38,6 +40,9 @@
 
 import java.io.IOException;
 import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
 
 public class ValueChunkWriter {
 
@@ -187,6 +192,57 @@
     }
   }
 
+  public void writePageHeaderAndDataIntoBuff(ByteBuffer data, PageHeader header)
+      throws PageException {
+    // write the page header to pageBuffer
+    try {
+      logger.debug(
+          "start to flush a page header into buffer, buffer position {} ", pageBuffer.size());
+      // serialize pageHeader  see writePageToPageBuffer method
+      if (numOfPages == 0) { // record the firstPageStatistics
+        if (header.getStatistics() != null) {
+          this.firstPageStatistics = header.getStatistics();
+        }
+        this.sizeWithoutStatistic +=
+            ReadWriteForEncodingUtils.writeUnsignedVarInt(header.getUncompressedSize(), pageBuffer);
+        this.sizeWithoutStatistic +=
+            ReadWriteForEncodingUtils.writeUnsignedVarInt(header.getCompressedSize(), pageBuffer);
+      } else if (numOfPages == 1) { // put the firstPageStatistics into pageBuffer
+        if (firstPageStatistics != null) {
+          byte[] b = pageBuffer.toByteArray();
+          pageBuffer.reset();
+          pageBuffer.write(b, 0, this.sizeWithoutStatistic);
+          firstPageStatistics.serialize(pageBuffer);
+          pageBuffer.write(b, this.sizeWithoutStatistic, b.length - this.sizeWithoutStatistic);
+        }
+        ReadWriteForEncodingUtils.writeUnsignedVarInt(header.getUncompressedSize(), pageBuffer);
+        ReadWriteForEncodingUtils.writeUnsignedVarInt(header.getCompressedSize(), pageBuffer);
+        header.getStatistics().serialize(pageBuffer);
+        firstPageStatistics = null;
+      } else {
+        ReadWriteForEncodingUtils.writeUnsignedVarInt(header.getUncompressedSize(), pageBuffer);
+        ReadWriteForEncodingUtils.writeUnsignedVarInt(header.getCompressedSize(), pageBuffer);
+        header.getStatistics().serialize(pageBuffer);
+      }
+      logger.debug(
+          "finish to flush a page header {} of time page into buffer, buffer position {} ",
+          header,
+          pageBuffer.size());
+
+      statistics.mergeStatistics(header.getStatistics());
+
+    } catch (IOException e) {
+      throw new PageException("IO Exception in writeDataPageHeader,ignore this page", e);
+    }
+    numOfPages++;
+    // write page content to temp PBAOS
+    try (WritableByteChannel channel = Channels.newChannel(pageBuffer)) {
+      channel.write(data);
+    } catch (IOException e) {
+      throw new PageException(e);
+    }
+  }
+
   public void writeToFileWriter(TsFileIOWriter tsfileWriter) throws IOException {
     sealCurrentPage();
     writeAllPagesOfChunkToTsFile(tsfileWriter);
diff --git a/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java b/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
index 2f865f2..22a68bd 100644
--- a/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
+++ b/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
@@ -234,6 +234,19 @@
     }
   }
 
+  public void writeChunk(Chunk chunk) throws IOException {
+    ChunkHeader chunkHeader = chunk.getHeader();
+    currentChunkMetadata =
+        new ChunkMetadata(
+            chunkHeader.getMeasurementID(),
+            chunkHeader.getDataType(),
+            out.getPosition(),
+            chunk.getChunkStatistic());
+    chunkHeader.serializeTo(out.wrapAsStream());
+    out.write(chunk.getData());
+    endCurrentChunk();
+  }
+
   /** end chunk and write some log. */
   public void endCurrentChunk() {
     chunkMetadataList.add(currentChunkMetadata);