Fix compaction writer size checkpointing (#17941)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractCompactionWriter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractCompactionWriter.java
index 96d4369..52d6512 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractCompactionWriter.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractCompactionWriter.java
@@ -22,12 +22,15 @@
 import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionLastTimeCheckFailedException;
+import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.batch.utils.FollowingBatchCompactionAlignedChunkWriter;
 import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.fast.element.AlignedPageElement;
 import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.fast.element.ChunkMetadataElement;
 import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.writer.flushcontroller.AbstractCompactionFlushController;
 import org.apache.iotdb.db.storageengine.dataregion.compaction.io.CompactionTsFileWriter;
 import org.apache.iotdb.db.storageengine.dataregion.modification.Deletion;
 
+import org.apache.tsfile.block.column.Column;
+import org.apache.tsfile.enums.TSDataType;
 import org.apache.tsfile.exception.write.PageException;
 import org.apache.tsfile.file.header.PageHeader;
 import org.apache.tsfile.file.metadata.ChunkMetadata;
@@ -63,6 +66,13 @@
   // The index of the array corresponds to subTaskId.
   protected int[] chunkPointNumArray = new int[subTaskNum];
 
+  // Estimated bytes written into each sub task's current chunk, updated only when that chunk
+  // has a binary column. The index of the array corresponds to subTaskId.
+  protected long[] writtenPointTotalSizeArray = new long[subTaskNum];
+
+  // Whether each sub task's current chunk writer has a binary (TSDataType#isBinary) column.
+  protected boolean[] hasVariableLengthTypeArray = new boolean[subTaskNum];
+
   // used to control the target chunk size
   protected long targetChunkSize = IoTDBDescriptor.getInstance().getConfig().getTargetChunkSize();
 
@@ -74,7 +84,12 @@
   @SuppressWarnings("squid:S1170")
   private final long checkPoint = (targetChunkPointNum >= 10 ? targetChunkPointNum : 10) / 10;
 
-  private long lastCheckIndex = 0;
+  // Per sub task: chunkPointNumArray / checkPoint, as recorded at the last chunk size check.
+  private final long[] lastCheckIndexArray = new long[subTaskNum];
+  // Binary chunks are also size-checked whenever their byte estimate passes a multiple of this.
+  private final long writtenPointTotalSizeCheckPoint = Math.max(targetChunkSize / 10, 1L);
+  // Per sub task: writtenPointTotalSizeArray / the step above, as of the last chunk size check.
+  private final long[] lastWrittenPointTotalSizeCheckIndexArray = new long[subTaskNum];
 
   // if unsealed chunk size is lower then this, then deserialize next chunk no matter it is
   // overlapped or not
@@ -115,10 +130,33 @@
   }
 
   public void startMeasurement(String measurement, IChunkWriter chunkWriter, int subTaskId) {
-    lastCheckIndex = 0;
+    resetChunkWriterStatistics(subTaskId);
     lastTimeSet[subTaskId] = false;
     chunkWriters[subTaskId] = chunkWriter;
     measurementId[subTaskId] = measurement;
+    hasVariableLengthTypeArray[subTaskId] = containsVariableLengthType(chunkWriter);
+  }
+
+  /**
+   * Returns whether the given chunk writer has a column whose {@link TSDataType} is binary.
+   *
+   * <p>Any writer other than a {@link ChunkWriterImpl} or {@link AlignedChunkWriterImpl}
+   * (including {@code null}) yields {@code false}, so no size estimate is tracked for it.
+   */
+  private boolean containsVariableLengthType(IChunkWriter chunkWriter) {
+    if (chunkWriter instanceof ChunkWriterImpl) {
+      return ((ChunkWriterImpl) chunkWriter).getDataType().isBinary();
+    }
+    if (!(chunkWriter instanceof AlignedChunkWriterImpl)) {
+      return false;
+    }
+    AlignedChunkWriterImpl alignedChunkWriter = (AlignedChunkWriterImpl) chunkWriter;
+    for (ValueChunkWriter valueChunkWriter : alignedChunkWriter.getValueChunkWriterList()) {
+      if (valueChunkWriter.getDataType().isBinary()) {
+        return true;
+      }
+    }
+    return false;
   }
 
   public abstract void endMeasurement(int subTaskId) throws IOException;
@@ -139,7 +168,15 @@
    */
   public abstract void checkAndMayFlushChunkMetadata() throws IOException;
 
-  protected void writeDataPoint(long timestamp, TsPrimitiveType value, IChunkWriter chunkWriter) {
+  /**
+   * Writes one point into {@code chunkWriter} and updates the bookkeeping of {@code subTaskId}:
+   * always the point counter, and the written-size estimate when the chunk has a binary column.
+   *
+   * <p>Callers must not increment {@code chunkPointNumArray} themselves.
+   */
+  protected void writeDataPoint(
+      long timestamp, TsPrimitiveType value, IChunkWriter chunkWriter, int subTaskId) {
+    long writtenPointTotalSize = 0;
     if (chunkWriter instanceof ChunkWriterImpl) {
       ChunkWriterImpl chunkWriterImpl = (ChunkWriterImpl) chunkWriter;
       switch (chunkWriterImpl.getDataType()) {
@@ -147,6 +184,8 @@
         case STRING:
         case BLOB:
           chunkWriterImpl.write(timestamp, value.getBinary());
+          // Include the timestamp, consistent with the aligned and TsBlock size estimates.
+          writtenPointTotalSize += Long.BYTES + value.getBinary().getLength();
           break;
         case DOUBLE:
           chunkWriterImpl.write(timestamp, value.getDouble());
@@ -172,7 +211,94 @@
     } else {
       AlignedChunkWriterImpl alignedChunkWriter = (AlignedChunkWriterImpl) chunkWriter;
       alignedChunkWriter.write(timestamp, value.getVector());
+      if (hasVariableLengthTypeArray[subTaskId]) {
+        writtenPointTotalSize = estimateWrittenPointTotalSize(value);
+      }
     }
+    chunkPointNumArray[subTaskId]++;
+    if (hasVariableLengthTypeArray[subTaskId]) {
+      writtenPointTotalSizeArray[subTaskId] += writtenPointTotalSize;
+    }
+  }
+
+  /**
+   * Estimates the bytes of one aligned row: the timestamp plus each non-null value, binary values
+   * by length and fixed-width values by type width (other types add nothing).
+   */
+  private long estimateWrittenPointTotalSize(TsPrimitiveType value) {
+    long size = Long.BYTES;
+    TsPrimitiveType[] vector = value.getVector();
+    for (TsPrimitiveType tsPrimitiveType : vector) {
+      if (tsPrimitiveType == null) {
+        continue;
+      }
+      TSDataType dataType = tsPrimitiveType.getDataType();
+      switch (dataType) {
+        case TEXT:
+        case STRING:
+        case BLOB:
+          size += tsPrimitiveType.getBinary().getLength();
+          break;
+        case DOUBLE:
+        case INT64:
+        case TIMESTAMP:
+          size += Long.BYTES;
+          break;
+        case INT32:
+        case DATE:
+        case FLOAT:
+          size += Integer.BYTES;
+          break;
+        case BOOLEAN:
+          size += 1;
+          break;
+        default:
+          break;
+      }
+    }
+    return size;
+  }
+
+  /**
+   * Estimates the bytes of a {@link TsBlock}: one timestamp per position plus its value columns.
+   * Binary columns sum the lengths of their non-null cells; fixed-width columns are charged for
+   * every position, nulls included.
+   */
+  protected long estimateWrittenPointTotalSize(TsBlock tsBlock) {
+    int pointCount = tsBlock.getPositionCount();
+    long size = (long) Long.BYTES * pointCount;
+    Column[] columns = tsBlock.getValueColumns();
+    for (Column column : columns) {
+      TSDataType dataType = column.getDataType();
+      if (dataType.isBinary()) {
+        for (int j = 0; j < pointCount; j++) {
+          if (column.isNull(j)) {
+            continue;
+          }
+          size += column.getBinary(j).getLength();
+        }
+        continue;
+      }
+      // This is only used as a checkpoint estimate, so fixed-width values use count directly.
+      switch (dataType) {
+        case DOUBLE:
+        case INT64:
+        case TIMESTAMP:
+          size += (long) Long.BYTES * pointCount;
+          break;
+        case INT32:
+        case DATE:
+        case FLOAT:
+          size += (long) Integer.BYTES * pointCount;
+          break;
+        case BOOLEAN:
+          size += pointCount;
+          break;
+        default:
+          break;
+      }
+    }
+    return size;
   }
 
   @SuppressWarnings("squid:S2445")
@@ -182,7 +292,18 @@
     synchronized (targetWriter) {
       targetWriter.writeChunk(chunkWriter);
     }
+    resetChunkWriterStatistics(subTaskId);
+  }
+
+  /**
+   * Clears the point counter, size estimate and both checkpoint indices of {@code subTaskId}.
+   * Called when a new measurement starts and whenever the sub task's current chunk is sealed.
+   */
+  private void resetChunkWriterStatistics(int subTaskId) {
     chunkPointNumArray[subTaskId] = 0;
+    writtenPointTotalSizeArray[subTaskId] = 0;
+    lastCheckIndexArray[subTaskId] = 0;
+    lastWrittenPointTotalSizeCheckIndexArray[subTaskId] = 0;
   }
 
   public abstract boolean flushNonAlignedChunk(
@@ -204,7 +321,8 @@
     synchronized (targetWriter) {
       // seal last chunk to file writer
       targetWriter.writeChunk(chunkWriters[subTaskId]);
-      chunkPointNumArray[subTaskId] = 0;
+      // Reset every per-chunk statistic (not just the point count) now that the chunk is sealed.
+      resetChunkWriterStatistics(subTaskId);
       targetWriter.writeChunk(chunk, chunkMetadata);
     }
   }
@@ -222,7 +340,8 @@
       AlignedChunkWriterImpl alignedChunkWriter = (AlignedChunkWriterImpl) chunkWriters[subTaskId];
       // seal last chunk to file writer
       targetWriter.writeChunk(alignedChunkWriter);
-      chunkPointNumArray[subTaskId] = 0;
+      // Reset every per-chunk statistic (not just the point count) now that the chunk is sealed.
+      resetChunkWriterStatistics(subTaskId);
 
       targetWriter.markStartingWritingAligned();
 
@@ -269,6 +386,10 @@
     chunkWriter.writePageHeaderAndDataIntoBuff(compressedPageData, pageHeader);
 
     chunkPointNumArray[subTaskId] += pageHeader.getStatistics().getCount();
+    if (hasVariableLengthTypeArray[subTaskId]) {
+      // The page is already serialized, so its serialized size serves as the size estimate.
+      writtenPointTotalSizeArray[subTaskId] += pageHeader.getSerializedPageSize();
+    }
   }
 
   public abstract boolean flushAlignedPage(AlignedPageElement alignedPageElement, int subTaskId)
@@ -293,29 +413,60 @@
     // flush new time page to chunk writer directly
     alignedChunkWriter.writePageHeaderAndDataIntoTimeBuff(compressedTimePageData, timePageHeader);
 
+    long writtenValuePageSize = 0;
     // flush new value pages to chunk writer directly
     for (int i = 0; i < valuePageHeaders.size(); i++) {
-      if (valuePageHeaders.get(i) == null) {
+      PageHeader valuePageHeader = valuePageHeaders.get(i);
+      if (valuePageHeader == null) {
         // sub sensor does not exist in current file or value page has been deleted completely
         alignedChunkWriter.getValueChunkWriterByIndex(i).writeEmptyPageToPageBuffer();
         continue;
       }
       alignedChunkWriter.writePageHeaderAndDataIntoValueBuff(
-          compressedValuePageDatas.get(i), valuePageHeaders.get(i), i);
+          compressedValuePageDatas.get(i), valuePageHeader, i);
+      if (hasVariableLengthTypeArray[subTaskId]) {
+        writtenValuePageSize += valuePageHeader.getSerializedPageSize();
+      }
     }
 
     chunkPointNumArray[subTaskId] += timePageHeader.getStatistics().getCount();
+    if (hasVariableLengthTypeArray[subTaskId]) {
+      // Direct-flushed pages are already serialized, so use page size as checkpoint estimate.
+      writtenPointTotalSizeArray[subTaskId] +=
+          timePageHeader.getSerializedPageSize() + writtenValuePageSize;
+    }
   }
 
+  /**
+   * Seals the current chunk of {@code subTaskId} once {@code chunkWriter} reports it over the
+   * {@code targetChunkSize}/{@code targetChunkPointNum} threshold. It is only asked when the
+   * point count or, for chunks with a binary column, the written-size estimate crosses its next
+   * checkpoint; following-batch aligned writers are additionally asked on every call.
+   */
   protected void checkChunkSizeAndMayOpenANewChunk(
       CompactionTsFileWriter fileWriter, IChunkWriter chunkWriter, int subTaskId)
       throws IOException {
-    if (chunkPointNumArray[subTaskId] >= (lastCheckIndex + 1) * checkPoint) {
-      // if chunk point num reaches the check point, then check if the chunk size over threshold
-      lastCheckIndex = chunkPointNumArray[subTaskId] / checkPoint;
+    // Following-batch aligned writers are asked on every call. If not over threshold they fall
+    // through to the checkpoint logic below and may be asked a second time in the same call.
+    if (chunkWriter instanceof FollowingBatchCompactionAlignedChunkWriter
+        && chunkWriter.checkIsChunkSizeOverThreshold(targetChunkSize, targetChunkPointNum, false)) {
+      sealChunk(fileWriter, chunkWriter, subTaskId);
+      return;
+    }
+    boolean reachesPointCheckPoint =
+        chunkPointNumArray[subTaskId] >= (lastCheckIndexArray[subTaskId] + 1) * checkPoint;
+    boolean reachesSizeCheckPoint =
+        hasVariableLengthTypeArray[subTaskId]
+            && writtenPointTotalSizeArray[subTaskId]
+                >= (lastWrittenPointTotalSizeCheckIndexArray[subTaskId] + 1)
+                    * writtenPointTotalSizeCheckPoint;
+    // Whichever checkpoint fired, record both indices so neither fires again until crossed anew.
+    if (reachesPointCheckPoint || reachesSizeCheckPoint) {
+      lastCheckIndexArray[subTaskId] = chunkPointNumArray[subTaskId] / checkPoint;
+      lastWrittenPointTotalSizeCheckIndexArray[subTaskId] =
+          writtenPointTotalSizeArray[subTaskId] / writtenPointTotalSizeCheckPoint;
       if (chunkWriter.checkIsChunkSizeOverThreshold(targetChunkSize, targetChunkPointNum, false)) {
         sealChunk(fileWriter, chunkWriter, subTaskId);
-        lastCheckIndex = 0;
       }
     }
   }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractCrossCompactionWriter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractCrossCompactionWriter.java
index d630d5e..777e77a 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractCrossCompactionWriter.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractCrossCompactionWriter.java
@@ -133,8 +133,8 @@
 
     checkTimeAndMayFlushChunkToCurrentFile(timestamp, subTaskId);
     int fileIndex = seqFileIndexArray[subTaskId];
-    writeDataPoint(timestamp, value, chunkWriters[subTaskId]);
-    chunkPointNumArray[subTaskId]++;
+    // writeDataPoint also advances the point counter and, for binary chunks, the size estimate.
+    writeDataPoint(timestamp, value, chunkWriters[subTaskId], subTaskId);
     checkChunkSizeAndMayOpenANewChunk(
         targetFileWriters.get(fileIndex), chunkWriters[subTaskId], subTaskId);
     isDeviceExistedInTargetFiles[fileIndex] = true;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractInnerCompactionWriter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractInnerCompactionWriter.java
index c460f7e..77b7ea4 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractInnerCompactionWriter.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractInnerCompactionWriter.java
@@ -115,8 +115,9 @@
   @Override
   public void write(TimeValuePair timeValuePair, int subTaskId) throws IOException {
     checkPreviousTimestamp(timeValuePair.getTimestamp(), subTaskId);
-    writeDataPoint(timeValuePair.getTimestamp(), timeValuePair.getValue(), chunkWriters[subTaskId]);
-    chunkPointNumArray[subTaskId]++;
+    // writeDataPoint also advances the point counter and, for binary chunks, the size estimate.
+    writeDataPoint(
+        timeValuePair.getTimestamp(), timeValuePair.getValue(), chunkWriters[subTaskId], subTaskId);
     checkChunkSizeAndMayOpenANewChunk(fileWriter, chunkWriters[subTaskId], subTaskId);
     lastTime[subTaskId] = timeValuePair.getTimestamp();
     lastTimeSet[subTaskId] = true;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/FastInnerCompactionWriter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/FastInnerCompactionWriter.java
index 6519101..daf66ef 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/FastInnerCompactionWriter.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/FastInnerCompactionWriter.java
@@ -182,6 +182,9 @@
         valuePageHeaders,
         subTaskId);
 
+    // Directly flushed pages grow the chunk too, so apply the same size check as point writes.
+    checkChunkSizeAndMayOpenANewChunk(fileWriter, chunkWriters[subTaskId], subTaskId);
+
     lastTime[subTaskId] = timePageHeader.getEndTime();
     lastTimeSet[subTaskId] = true;
     return true;
@@ -218,6 +221,9 @@
         valuePageHeaders,
         subTaskId);
 
+    // Directly flushed pages grow the chunk too, so apply the same size check as point writes.
+    checkChunkSizeAndMayOpenANewChunk(fileWriter, chunkWriters[subTaskId], subTaskId);
+
     lastTime[subTaskId] = timePageHeader.getEndTime();
     lastTimeSet[subTaskId] = true;
     return true;
@@ -228,10 +234,12 @@
    * successfully or not. Return false if the unsealed page is too small or the end time of page
    * exceeds the end time of file, else return true.
    *
+   * @throws IOException if an I/O error occurs, e.g. while sealing an oversized chunk
    * @throws PageException if errors occurred when write data page header
    */
   public boolean flushNonAlignedPage(
-      ByteBuffer compressedPageData, PageHeader pageHeader, int subTaskId) throws PageException {
+      ByteBuffer compressedPageData, PageHeader pageHeader, int subTaskId)
+      throws IOException, PageException {
     checkPreviousTimestamp(pageHeader.getStartTime(), subTaskId);
     boolean isUnsealedPageOverThreshold =
         chunkWriters[subTaskId].checkIsUnsealedPageOverThreshold(
@@ -244,6 +252,9 @@
     flushNonAlignedPageToChunkWriter(
         (ChunkWriterImpl) chunkWriters[subTaskId], compressedPageData, pageHeader, subTaskId);
 
+    // Directly flushed pages grow the chunk too, so apply the same size check as point writes.
+    checkChunkSizeAndMayOpenANewChunk(fileWriter, chunkWriters[subTaskId], subTaskId);
+
     lastTime[subTaskId] = pageHeader.getEndTime();
     lastTimeSet[subTaskId] = true;
     return true;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/ReadPointCrossCompactionWriter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/ReadPointCrossCompactionWriter.java
index 231bd75..e5812b1 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/ReadPointCrossCompactionWriter.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/ReadPointCrossCompactionWriter.java
@@ -56,13 +56,17 @@
     checkTimeAndMayFlushChunkToCurrentFile(timestamps.getStartTime(), subTaskId);
     AlignedChunkWriterImpl chunkWriter = (AlignedChunkWriterImpl) this.chunkWriters[subTaskId];
     chunkWriter.write(timestamps, columns, batchSize);
+    // Count exactly the batchSize rows written above; binary chunks also add a size estimate.
+    chunkPointNumArray[subTaskId] += batchSize;
+    if (hasVariableLengthTypeArray[subTaskId]) {
+      writtenPointTotalSizeArray[subTaskId] += estimateWrittenPointTotalSize(tsBlock);
+    }
     synchronized (this) {
       // we need to synchronized here to avoid multi-thread competition in sub-task
       TsFileResource resource = targetResources.get(seqFileIndexArray[subTaskId]);
       resource.updateStartTime(deviceId, timestamps.getStartTime());
       resource.updateEndTime(deviceId, timestamps.getEndTime());
     }
-    chunkPointNumArray[subTaskId] += timestamps.getTimes().length;
     checkChunkSizeAndMayOpenANewChunk(
         targetFileWriters.get(seqFileIndexArray[subTaskId]), chunkWriter, subTaskId);
     isDeviceExistedInTargetFiles[seqFileIndexArray[subTaskId]] = true;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/ReadPointInnerCompactionWriter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/ReadPointInnerCompactionWriter.java
index 2e85124..a67a0b56 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/ReadPointInnerCompactionWriter.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/ReadPointInnerCompactionWriter.java
@@ -54,7 +54,11 @@
     int batchSize = tsBlock.getPositionCount();
     AlignedChunkWriterImpl chunkWriter = (AlignedChunkWriterImpl) this.chunkWriters[subTaskId];
     chunkWriter.write(timestamps, columns, batchSize);
-    chunkPointNumArray[subTaskId] += timestamps.getTimes().length;
+    // Count exactly the batchSize rows written above; binary chunks also add a size estimate.
+    chunkPointNumArray[subTaskId] += batchSize;
+    if (hasVariableLengthTypeArray[subTaskId]) {
+      writtenPointTotalSizeArray[subTaskId] += estimateWrittenPointTotalSize(tsBlock);
+    }
     checkChunkSizeAndMayOpenANewChunk(fileWriter, chunkWriter, subTaskId);
   }
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/RepairUnsortedFileCompactionWriter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/RepairUnsortedFileCompactionWriter.java
index 5d46eb5..d8465ee 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/RepairUnsortedFileCompactionWriter.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/RepairUnsortedFileCompactionWriter.java
@@ -74,8 +74,9 @@
   }
 
   private void writeToChunkWriter(TimeValuePair timeValuePair, int subTaskId) throws IOException {
-    writeDataPoint(timeValuePair.getTimestamp(), timeValuePair.getValue(), chunkWriters[subTaskId]);
-    chunkPointNumArray[subTaskId]++;
+    // writeDataPoint also advances the point counter and, for binary chunks, the size estimate.
+    writeDataPoint(
+        timeValuePair.getTimestamp(), timeValuePair.getValue(), chunkWriters[subTaskId], subTaskId);
     checkChunkSizeAndMayOpenANewChunk(fileWriter, chunkWriters[subTaskId], subTaskId);
   }
 
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractCompactionWriterTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractCompactionWriterTest.java
new file mode 100644
index 0000000..ee8a54d
--- /dev/null
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractCompactionWriterTest.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.writer;
+
+import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.fast.element.AlignedPageElement;
+import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.fast.element.ChunkMetadataElement;
+import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.writer.flushcontroller.AbstractCompactionFlushController;
+
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.exception.write.PageException;
+import org.apache.tsfile.file.header.PageHeader;
+import org.apache.tsfile.file.metadata.ChunkMetadata;
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.file.metadata.statistics.Statistics;
+import org.apache.tsfile.read.TimeValuePair;
+import org.apache.tsfile.read.common.Chunk;
+import org.apache.tsfile.read.common.block.TsBlock;
+import org.apache.tsfile.write.chunk.ChunkWriterImpl;
+import org.apache.tsfile.write.schema.MeasurementSchema;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/** Tests the chunk size checkpointing of {@link AbstractCompactionWriter}. */
+public class AbstractCompactionWriterTest {
+
+  private static final int SUB_TASK_ID = 0;
+
+  @Test
+  public void testBinarySizeCheckpointTriggersChunkSizeCheckBeforePointCheckpoint()
+      throws IOException, PageException {
+    TestCompactionWriter compactionWriter = new TestCompactionWriter();
+    CountingChunkWriter chunkWriter = new CountingChunkWriter(TSDataType.TEXT);
+    // The page carries no points, so only the size checkpoint can trigger the check.
+    PageHeader pageHeader =
+        createPageHeader(
+            TSDataType.TEXT, compactionWriter.getCompressedSizeToReachSizeCheckpoint());
+
+    compactionWriter.startMeasurement("s1", chunkWriter, SUB_TASK_ID);
+    compactionWriter.flushNonAlignedPageToChunkWriter(
+        chunkWriter, ByteBuffer.allocate(0), pageHeader, SUB_TASK_ID);
+    compactionWriter.checkChunkSizeAndMayOpenANewChunk(null, chunkWriter, SUB_TASK_ID);
+    Assert.assertEquals(1, chunkWriter.chunkSizeCheckCount);
+
+    // No new writes: no checkpoint is crossed again, so the writer must not be asked again.
+    compactionWriter.checkChunkSizeAndMayOpenANewChunk(null, chunkWriter, SUB_TASK_ID);
+    Assert.assertEquals(1, chunkWriter.chunkSizeCheckCount);
+  }
+
+  @Test
+  public void testSizeCheckpointIsIgnoredForFixedWidthChunk() throws IOException, PageException {
+    TestCompactionWriter compactionWriter = new TestCompactionWriter();
+    CountingChunkWriter chunkWriter = new CountingChunkWriter(TSDataType.INT32);
+    PageHeader pageHeader =
+        createPageHeader(
+            TSDataType.INT32, compactionWriter.getCompressedSizeToReachSizeCheckpoint());
+
+    compactionWriter.startMeasurement("s1", chunkWriter, SUB_TASK_ID);
+    compactionWriter.flushNonAlignedPageToChunkWriter(
+        chunkWriter, ByteBuffer.allocate(0), pageHeader, SUB_TASK_ID);
+    compactionWriter.checkChunkSizeAndMayOpenANewChunk(null, chunkWriter, SUB_TASK_ID);
+
+    // No size estimate is kept without a binary column, and the page adds no points.
+    Assert.assertEquals(0, chunkWriter.chunkSizeCheckCount);
+  }
+
+  private static PageHeader createPageHeader(TSDataType dataType, int compressedSize) {
+    Statistics<?> statistics = Statistics.getStatsByType(dataType);
+    return new PageHeader(compressedSize, compressedSize, statistics);
+  }
+
+  /** Chunk writer that never reports over threshold; counts how often it is asked. */
+  private static class CountingChunkWriter extends ChunkWriterImpl {
+
+    private int chunkSizeCheckCount;
+
+    private CountingChunkWriter(TSDataType dataType) {
+      super(new MeasurementSchema("s1", dataType));
+    }
+
+    @Override
+    public boolean checkIsChunkSizeOverThreshold(
+        long size, long pointNum, boolean returnTrueIfChunkEmpty) {
+      chunkSizeCheckCount++;
+      return false;
+    }
+  }
+
+  /** No-op writer exposing the checkpoint logic inherited from AbstractCompactionWriter. */
+  private static class TestCompactionWriter extends AbstractCompactionWriter {
+
+    private int getCompressedSizeToReachSizeCheckpoint() {
+      return (int) Math.max(targetChunkSize / 10, 1L);
+    }
+
+    @Override
+    public void startChunkGroup(IDeviceID deviceId, boolean isAlign) {}
+
+    @Override
+    public void endChunkGroup() {}
+
+    @Override
+    public void endMeasurement(int subTaskId) {}
+
+    @Override
+    public void write(TimeValuePair timeValuePair, int subTaskId) {}
+
+    @Override
+    public void write(TsBlock tsBlock, int subTaskId) {}
+
+    @Override
+    public void endFile() {}
+
+    @Override
+    public long getWriterSize() {
+      return 0;
+    }
+
+    @Override
+    public void checkAndMayFlushChunkMetadata() {}
+
+    @Override
+    public boolean flushNonAlignedChunk(Chunk chunk, ChunkMetadata chunkMetadata, int subTaskId) {
+      return false;
+    }
+
+    @Override
+    public boolean flushAlignedChunk(ChunkMetadataElement chunkMetadataElement, int subTaskId) {
+      return false;
+    }
+
+    @Override
+    public boolean flushBatchedValueChunk(
+        ChunkMetadataElement chunkMetadataElement,
+        int subTaskId,
+        AbstractCompactionFlushController flushController) {
+      return false;
+    }
+
+    @Override
+    public boolean flushNonAlignedPage(
+        ByteBuffer compressedPageData, PageHeader pageHeader, int subTaskId) {
+      return false;
+    }
+
+    @Override
+    public boolean flushAlignedPage(AlignedPageElement alignedPageElement, int subTaskId) {
+      return false;
+    }
+
+    @Override
+    public boolean flushBatchedValuePage(
+        AlignedPageElement alignedPageElement,
+        int subTaskId,
+        AbstractCompactionFlushController flushController) {
+      return false;
+    }
+
+    @Override
+    public void close() {}
+  }
+}