[IOTDB-4399] Control text chunk size in memtable (#7320)

diff --git a/server/src/assembly/resources/conf/iotdb-datanode.properties b/server/src/assembly/resources/conf/iotdb-datanode.properties
index cd759c7..6a275d7 100644
--- a/server/src/assembly/resources/conf/iotdb-datanode.properties
+++ b/server/src/assembly/resources/conf/iotdb-datanode.properties
@@ -385,6 +385,11 @@
 # Datatype: int
 # avg_series_point_number_threshold=100000
 
+# When a chunk in memtable reaches this threshold, flush the memtable to disk.
+# The default threshold is 20 MB.
+# Datatype: long
+# max_chunk_raw_size_threshold = 20971520
+
 # How many threads can concurrently flush. When <= 0, use CPU core number.
 # Datatype: int
 # concurrent_flush_thread=0
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 853e37d..c277e65 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -397,6 +397,9 @@
   /** When average series point number reaches this, flush the memtable to disk */
   private int avgSeriesPointNumberThreshold = 100000;
 
+  /** When a chunk in memtable reaches this threshold, flush the memtable to disk */
+  private long maxChunkRawSizeThreshold = 1024 * 1024 * 20L;
+
   /** Enable inner space compaction for sequence files */
   private boolean enableSeqSpaceCompaction = true;
 
@@ -1970,6 +1973,14 @@
     this.avgSeriesPointNumberThreshold = avgSeriesPointNumberThreshold;
   }
 
+  public long getMaxChunkRawSizeThreshold() {
+    return maxChunkRawSizeThreshold;
+  }
+
+  public void setMaxChunkRawSizeThreshold(long maxChunkRawSizeThreshold) {
+    this.maxChunkRawSizeThreshold = maxChunkRawSizeThreshold;
+  }
+
   public long getCrossCompactionFileSelectionTimeBudget() {
     return crossCompactionFileSelectionTimeBudget;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 081b54d..39d590c 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -405,6 +405,12 @@
                 "avg_series_point_number_threshold",
                 Integer.toString(conf.getAvgSeriesPointNumberThreshold()))));
 
+    conf.setMaxChunkRawSizeThreshold(
+        Long.parseLong(
+            properties.getProperty(
+                "max_chunk_raw_size_threshold",
+                Long.toString(conf.getMaxChunkRawSizeThreshold()))));
+
     conf.setCheckPeriodWhenInsertBlocked(
         Integer.parseInt(
             properties.getProperty(
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
index 756f94d..4eaa7df 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
@@ -440,7 +440,9 @@
       Object[] objectValue) {
     IWritableMemChunkGroup memChunkGroup =
         createMemChunkGroupIfNotExistAndGet(deviceId, schemaList);
-    memChunkGroup.write(insertTime, objectValue, schemaList);
+    if (memChunkGroup.writeWithFlushCheck(insertTime, objectValue, schemaList)) {
+      shouldFlush = true;
+    }
   }
 
   @Override
@@ -451,7 +453,9 @@
       Object[] objectValue) {
     IWritableMemChunkGroup memChunkGroup =
         createAlignedMemChunkGroupIfNotExistAndGet(deviceId, schemaList);
-    memChunkGroup.write(insertTime, objectValue, schemaList);
+    if (memChunkGroup.writeWithFlushCheck(insertTime, objectValue, schemaList)) {
+      shouldFlush = true;
+    }
   }
 
   @SuppressWarnings("squid:S3776") // high Cognitive Complexity
@@ -472,13 +476,15 @@
     }
     IWritableMemChunkGroup memChunkGroup =
         createMemChunkGroupIfNotExistAndGet(insertTabletPlan.getDeviceID(), schemaList);
-    memChunkGroup.writeValues(
+    if (memChunkGroup.writeValuesWithFlushCheck(
         insertTabletPlan.getTimes(),
         insertTabletPlan.getColumns(),
         insertTabletPlan.getBitMaps(),
         schemaList,
         start,
-        end);
+        end)) {
+      shouldFlush = true;
+    }
   }
 
   public void write(InsertTabletNode insertTabletNode, int start, int end) {
@@ -498,13 +504,15 @@
     }
     IWritableMemChunkGroup memChunkGroup =
         createMemChunkGroupIfNotExistAndGet(insertTabletNode.getDeviceID(), schemaList);
-    memChunkGroup.writeValues(
+    if (memChunkGroup.writeValuesWithFlushCheck(
         insertTabletNode.getTimes(),
         insertTabletNode.getColumns(),
         insertTabletNode.getBitMaps(),
         schemaList,
         start,
-        end);
+        end)) {
+      shouldFlush = true;
+    }
   }
 
   @Override
@@ -527,13 +535,15 @@
     }
     IWritableMemChunkGroup memChunkGroup =
         createAlignedMemChunkGroupIfNotExistAndGet(insertTabletPlan.getDeviceID(), schemaList);
-    memChunkGroup.writeValues(
+    if (memChunkGroup.writeValuesWithFlushCheck(
         insertTabletPlan.getTimes(),
         insertTabletPlan.getColumns(),
         insertTabletPlan.getBitMaps(),
         schemaList,
         start,
-        end);
+        end)) {
+      shouldFlush = true;
+    }
   }
 
   public void writeAlignedTablet(InsertTabletNode insertTabletNode, int start, int end) {
@@ -556,13 +566,15 @@
     }
     IWritableMemChunkGroup memChunkGroup =
         createAlignedMemChunkGroupIfNotExistAndGet(insertTabletNode.getDeviceID(), schemaList);
-    memChunkGroup.writeValues(
+    if (memChunkGroup.writeValuesWithFlushCheck(
         insertTabletNode.getTimes(),
         insertTabletNode.getColumns(),
         insertTabletNode.getBitMaps(),
         schemaList,
         start,
-        end);
+        end)) {
+      shouldFlush = true;
+    }
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AlignedWritableMemChunk.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AlignedWritableMemChunk.java
index c9cbcbb..845fc8c 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AlignedWritableMemChunk.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AlignedWritableMemChunk.java
@@ -108,7 +108,7 @@
   }
 
   @Override
-  public void putBinary(long t, Binary v) {
+  public boolean putBinaryWithFlushCheck(long t, Binary v) {
     throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + TSDataType.VECTOR);
   }
 
@@ -118,8 +118,9 @@
   }
 
   @Override
-  public void putAlignedValue(long t, Object[] v, int[] columnIndexArray) {
+  public boolean putAlignedValueWithFlushCheck(long t, Object[] v, int[] columnIndexArray) {
     list.putAlignedValue(t, v, columnIndexArray);
+    return list.reachMaxChunkSizeThreshold();
   }
 
   @Override
@@ -143,7 +144,8 @@
   }
 
   @Override
-  public void putBinaries(long[] t, Binary[] v, BitMap bitMap, int start, int end) {
+  public boolean putBinariesWithFlushCheck(
+      long[] t, Binary[] v, BitMap bitMap, int start, int end) {
     throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + TSDataType.VECTOR);
   }
 
@@ -153,31 +155,32 @@
   }
 
   @Override
-  public void putAlignedValues(
+  public boolean putAlignedValuesWithFlushCheck(
       long[] t, Object[] v, BitMap[] bitMaps, int[] columnIndexArray, int start, int end) {
     list.putAlignedValues(t, v, bitMaps, columnIndexArray, start, end);
+    return list.reachMaxChunkSizeThreshold();
   }
 
   @Override
-  public void write(long insertTime, Object objectValue) {
+  public boolean writeWithFlushCheck(long insertTime, Object objectValue) {
     throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + TSDataType.VECTOR);
   }
 
   @Override
-  public void writeAlignedValue(
+  public boolean writeAlignedValueWithFlushCheck(
       long insertTime, Object[] objectValue, List<IMeasurementSchema> schemaList) {
     int[] columnIndexArray = checkColumnsInInsertPlan(schemaList);
-    putAlignedValue(insertTime, objectValue, columnIndexArray);
+    return putAlignedValueWithFlushCheck(insertTime, objectValue, columnIndexArray);
   }
 
   @Override
-  public void write(
+  public boolean writeWithFlushCheck(
       long[] times, Object valueList, BitMap bitMap, TSDataType dataType, int start, int end) {
     throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + TSDataType.VECTOR);
   }
 
   @Override
-  public void writeAlignedValues(
+  public boolean writeAlignedValuesWithFlushCheck(
       long[] times,
       Object[] valueList,
       BitMap[] bitMaps,
@@ -185,7 +188,7 @@
       int start,
       int end) {
     int[] columnIndexArray = checkColumnsInInsertPlan(schemaList);
-    putAlignedValues(times, valueList, bitMaps, columnIndexArray, start, end);
+    return putAlignedValuesWithFlushCheck(times, valueList, bitMaps, columnIndexArray, start, end);
   }
 
   /**
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AlignedWritableMemChunkGroup.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AlignedWritableMemChunkGroup.java
index 31ed096..611b7a5 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AlignedWritableMemChunkGroup.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AlignedWritableMemChunkGroup.java
@@ -45,14 +45,15 @@
   private AlignedWritableMemChunkGroup() {}
 
   @Override
-  public void writeValues(
+  public boolean writeValuesWithFlushCheck(
       long[] times,
       Object[] columns,
       BitMap[] bitMaps,
       List<IMeasurementSchema> schemaList,
       int start,
       int end) {
-    memChunk.writeAlignedValues(times, columns, bitMaps, schemaList, start, end);
+    return memChunk.writeAlignedValuesWithFlushCheck(
+        times, columns, bitMaps, schemaList, start, end);
   }
 
   @Override
@@ -79,8 +80,9 @@
   }
 
   @Override
-  public void write(long insertTime, Object[] objectValue, List<IMeasurementSchema> schemaList) {
-    memChunk.writeAlignedValue(insertTime, objectValue, schemaList);
+  public boolean writeWithFlushCheck(
+      long insertTime, Object[] objectValue, List<IMeasurementSchema> schemaList) {
+    return memChunk.writeAlignedValueWithFlushCheck(insertTime, objectValue, schemaList);
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java
index 7de03cc..5237e53 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java
@@ -38,11 +38,11 @@
 
   void putDouble(long t, double v);
 
-  void putBinary(long t, Binary v);
+  boolean putBinaryWithFlushCheck(long t, Binary v);
 
   void putBoolean(long t, boolean v);
 
-  void putAlignedValue(long t, Object[] v, int[] columnIndexArray);
+  boolean putAlignedValueWithFlushCheck(long t, Object[] v, int[] columnIndexArray);
 
   void putLongs(long[] t, long[] v, BitMap bitMap, int start, int end);
 
@@ -52,26 +52,26 @@
 
   void putDoubles(long[] t, double[] v, BitMap bitMap, int start, int end);
 
-  void putBinaries(long[] t, Binary[] v, BitMap bitMap, int start, int end);
+  boolean putBinariesWithFlushCheck(long[] t, Binary[] v, BitMap bitMap, int start, int end);
 
   void putBooleans(long[] t, boolean[] v, BitMap bitMap, int start, int end);
 
-  void putAlignedValues(
+  boolean putAlignedValuesWithFlushCheck(
       long[] t, Object[] v, BitMap[] bitMaps, int[] columnIndexArray, int start, int end);
 
-  void write(long insertTime, Object objectValue);
+  boolean writeWithFlushCheck(long insertTime, Object objectValue);
 
-  void writeAlignedValue(
+  boolean writeAlignedValueWithFlushCheck(
       long insertTime, Object[] objectValue, List<IMeasurementSchema> schemaList);
 
   /**
    * write data in the range [start, end). Null value in the valueList will be replaced by the
    * subsequent non-null value, e.g., {1, null, 3, null, 5} will be {1, 3, 5, null, 5}
    */
-  void write(
+  boolean writeWithFlushCheck(
       long[] times, Object valueList, BitMap bitMap, TSDataType dataType, int start, int end);
 
-  void writeAlignedValues(
+  boolean writeAlignedValuesWithFlushCheck(
       long[] times,
       Object[] valueList,
       BitMap[] bitMaps,
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunkGroup.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunkGroup.java
index 73f2abe..e3d089f 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunkGroup.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunkGroup.java
@@ -29,7 +29,7 @@
 
 public interface IWritableMemChunkGroup extends WALEntryValue {
 
-  void writeValues(
+  boolean writeValuesWithFlushCheck(
       long[] times,
       Object[] columns,
       BitMap[] bitMaps,
@@ -43,7 +43,8 @@
 
   boolean contains(String measurement);
 
-  void write(long insertTime, Object[] objectValue, List<IMeasurementSchema> schemaList);
+  boolean writeWithFlushCheck(
+      long insertTime, Object[] objectValue, List<IMeasurementSchema> schemaList);
 
   Map<String, IWritableMemChunk> getMemChunkMap();
 
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java
index 1951e47..c8bd614 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java
@@ -52,7 +52,7 @@
   private WritableMemChunk() {}
 
   @Override
-  public void write(long insertTime, Object objectValue) {
+  public boolean writeWithFlushCheck(long insertTime, Object objectValue) {
     switch (schema.getType()) {
       case BOOLEAN:
         putBoolean(insertTime, (boolean) objectValue);
@@ -70,21 +70,21 @@
         putDouble(insertTime, (double) objectValue);
         break;
       case TEXT:
-        putBinary(insertTime, (Binary) objectValue);
-        break;
+        return putBinaryWithFlushCheck(insertTime, (Binary) objectValue);
       default:
         throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + schema.getType());
     }
+    return false;
   }
 
   @Override
-  public void writeAlignedValue(
+  public boolean writeAlignedValueWithFlushCheck(
       long insertTime, Object[] objectValue, List<IMeasurementSchema> schemaList) {
     throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + list.getDataType());
   }
 
   @Override
-  public void write(
+  public boolean writeWithFlushCheck(
       long[] times, Object valueList, BitMap bitMap, TSDataType dataType, int start, int end) {
     switch (dataType) {
       case BOOLEAN:
@@ -109,15 +109,15 @@
         break;
       case TEXT:
         Binary[] binaryValues = (Binary[]) valueList;
-        putBinaries(times, binaryValues, bitMap, start, end);
-        break;
+        return putBinariesWithFlushCheck(times, binaryValues, bitMap, start, end);
       default:
         throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + dataType);
     }
+    return false;
   }
 
   @Override
-  public void writeAlignedValues(
+  public boolean writeAlignedValuesWithFlushCheck(
       long[] times,
       Object[] valueList,
       BitMap[] bitMaps,
@@ -148,8 +148,9 @@
   }
 
   @Override
-  public void putBinary(long t, Binary v) {
+  public boolean putBinaryWithFlushCheck(long t, Binary v) {
     list.putBinary(t, v);
+    return list.reachMaxChunkSizeThreshold();
   }
 
   @Override
@@ -158,7 +159,7 @@
   }
 
   @Override
-  public void putAlignedValue(long t, Object[] v, int[] columnOrder) {
+  public boolean putAlignedValueWithFlushCheck(long t, Object[] v, int[] columnIndexArray) {
     throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + schema.getType());
   }
 
@@ -183,8 +184,10 @@
   }
 
   @Override
-  public void putBinaries(long[] t, Binary[] v, BitMap bitMap, int start, int end) {
+  public boolean putBinariesWithFlushCheck(
+      long[] t, Binary[] v, BitMap bitMap, int start, int end) {
     list.putBinaries(t, v, bitMap, start, end);
+    return list.reachMaxChunkSizeThreshold();
   }
 
   @Override
@@ -193,8 +196,8 @@
   }
 
   @Override
-  public void putAlignedValues(
-      long[] t, Object[] v, BitMap[] bitMaps, int[] columnOrder, int start, int end) {
+  public boolean putAlignedValuesWithFlushCheck(
+      long[] t, Object[] v, BitMap[] bitMaps, int[] columnIndexArray, int start, int end) {
     throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + schema.getType());
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunkGroup.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunkGroup.java
index 703ac67..75e24d4 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunkGroup.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunkGroup.java
@@ -43,26 +43,29 @@
   }
 
   @Override
-  public void writeValues(
+  public boolean writeValuesWithFlushCheck(
       long[] times,
       Object[] columns,
       BitMap[] bitMaps,
       List<IMeasurementSchema> schemaList,
       int start,
       int end) {
+    boolean flushFlag = false;
     for (int i = 0; i < columns.length; i++) {
       if (columns[i] == null) {
         continue;
       }
       IWritableMemChunk memChunk = createMemChunkIfNotExistAndGet(schemaList.get(i));
-      memChunk.write(
-          times,
-          columns[i],
-          bitMaps == null ? null : bitMaps[i],
-          schemaList.get(i).getType(),
-          start,
-          end);
+      flushFlag |=
+          memChunk.writeWithFlushCheck(
+              times,
+              columns[i],
+              bitMaps == null ? null : bitMaps[i],
+              schemaList.get(i).getType(),
+              start,
+              end);
     }
+    return flushFlag;
   }
 
   private IWritableMemChunk createMemChunkIfNotExistAndGet(IMeasurementSchema schema) {
@@ -92,14 +95,17 @@
   }
 
   @Override
-  public void write(long insertTime, Object[] objectValue, List<IMeasurementSchema> schemaList) {
+  public boolean writeWithFlushCheck(
+      long insertTime, Object[] objectValue, List<IMeasurementSchema> schemaList) {
+    boolean flushFlag = false;
     for (int i = 0; i < objectValue.length; i++) {
       if (objectValue[i] == null) {
         continue;
       }
       IWritableMemChunk memChunk = createMemChunkIfNotExistAndGet(schemaList.get(i));
-      memChunk.write(insertTime, objectValue[i]);
+      flushFlag |= memChunk.writeWithFlushCheck(insertTime, objectValue[i]);
     }
+    return flushFlag;
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
index fb950b7..39cdabb 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
@@ -45,6 +45,7 @@
 
 import static org.apache.iotdb.db.rescon.PrimitiveArrayManager.ARRAY_SIZE;
 import static org.apache.iotdb.db.rescon.PrimitiveArrayManager.TVLIST_SORT_ALGORITHM;
+import static org.apache.iotdb.db.utils.MemUtils.getBinarySize;
 import static org.apache.iotdb.tsfile.utils.RamUsageEstimator.NUM_BYTES_ARRAY_HEADER;
 import static org.apache.iotdb.tsfile.utils.RamUsageEstimator.NUM_BYTES_OBJECT_REF;
 
@@ -54,6 +55,9 @@
   // data types of this aligned tvList
   protected List<TSDataType> dataTypes;
 
+  // record total memory size of binary column
+  protected long[] memoryBinaryChunkSize;
+
   // data type list -> list of TVList, add 1 when expanded -> primitive array of basic type
   // index relation: columnIndex(dataTypeIndex) -> arrayIndex -> elementIndex
   protected List<List<Object>> values;
@@ -68,10 +72,16 @@
   // index relation: columnIndex(dataTypeIndex) -> arrayIndex -> elementIndex
   protected List<List<BitMap>> bitMaps;
 
+  // if a sensor chunk size of Text datatype reaches the threshold, this flag will be set true
+  boolean reachMaxChunkSizeFlag;
+
   AlignedTVList(List<TSDataType> types) {
     super();
     indices = new ArrayList<>(types.size());
     dataTypes = types;
+    memoryBinaryChunkSize = new long[dataTypes.size()];
+    reachMaxChunkSizeFlag = false;
+
     values = new ArrayList<>(types.size());
     for (int i = 0; i < types.size(); i++) {
       values.add(new ArrayList<>());
@@ -89,6 +99,8 @@
   public AlignedTVList clone() {
     AlignedTVList cloneList = AlignedTVList.newAlignedList(dataTypes);
     cloneAs(cloneList);
+    System.arraycopy(
+        memoryBinaryChunkSize, 0, cloneList.memoryBinaryChunkSize, 0, dataTypes.size());
     for (int[] indicesArray : indices) {
       cloneList.indices.add(cloneIndex(indicesArray));
     }
@@ -136,6 +148,13 @@
         case TEXT:
           ((Binary[]) columnValues.get(arrayIndex))[elementIndex] =
               columnValue != null ? (Binary) columnValue : Binary.EMPTY_VALUE;
+          memoryBinaryChunkSize[i] +=
+              columnValue != null
+                  ? getBinarySize((Binary) columnValue)
+                  : getBinarySize(Binary.EMPTY_VALUE);
+          if (memoryBinaryChunkSize[i] >= maxChunkRawSizeThreshold) {
+            reachMaxChunkSizeFlag = true;
+          }
           break;
         case FLOAT:
           ((float[]) columnValues.get(arrayIndex))[elementIndex] =
@@ -312,6 +331,11 @@
     this.bitMaps.add(columnBitMaps);
     this.values.add(columnValue);
     this.dataTypes.add(dataType);
+
+    long[] tmpValueChunkRawSize = memoryBinaryChunkSize;
+    memoryBinaryChunkSize = new long[dataTypes.size()];
+    System.arraycopy(
+        tmpValueChunkRawSize, 0, memoryBinaryChunkSize, 0, tmpValueChunkRawSize.length);
   }
 
   /**
@@ -457,6 +481,10 @@
         int originRowIndex = getValueIndex(i);
         int arrayIndex = originRowIndex / ARRAY_SIZE;
         int elementIndex = originRowIndex % ARRAY_SIZE;
+        if (dataTypes.get(columnIndex) == TSDataType.TEXT) {
+          memoryBinaryChunkSize[columnIndex] -=
+              getBinarySize(((Binary[]) values.get(columnIndex).get(arrayIndex))[elementIndex]);
+        }
         markNullValue(columnIndex, arrayIndex, elementIndex);
         deletedNumber++;
       } else {
@@ -468,6 +496,17 @@
 
   public void deleteColumn(int columnIndex) {
     dataTypes.remove(columnIndex);
+
+    long[] tmpValueChunkRawSize = memoryBinaryChunkSize;
+    memoryBinaryChunkSize = new long[dataTypes.size()];
+    int copyIndex = 0;
+    for (int i = 0; i < tmpValueChunkRawSize.length; i++) {
+      if (i == columnIndex) {
+        continue;
+      }
+      memoryBinaryChunkSize[copyIndex++] = tmpValueChunkRawSize[i];
+    }
+
     for (Object array : values.get(columnIndex)) {
       PrimitiveArrayManager.release(array);
     }
@@ -548,6 +587,7 @@
           columnBitMaps.clear();
         }
       }
+      memoryBinaryChunkSize[i] = 0;
     }
   }
 
@@ -618,6 +658,11 @@
     }
   }
 
+  @Override
+  public boolean reachMaxChunkSizeThreshold() {
+    return reachMaxChunkSizeFlag;
+  }
+
   @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
   @Override
   public void putAlignedValues(
@@ -688,6 +733,15 @@
         case TEXT:
           Binary[] arrayT = ((Binary[]) columnValues.get(arrayIndex));
           System.arraycopy(value[columnIndexArray[i]], idx, arrayT, elementIndex, remaining);
+
+          // update raw size of Text chunk
+          for (int i1 = 0; i1 < remaining; i1++) {
+            memoryBinaryChunkSize[i] +=
+                arrayT[elementIndex + i1] != null ? getBinarySize(arrayT[elementIndex + i1]) : 0;
+          }
+          if (memoryBinaryChunkSize[i] > maxChunkRawSizeThreshold) {
+            reachMaxChunkSizeFlag = true;
+          }
           break;
         case FLOAT:
           float[] arrayF = ((float[]) columnValues.get(arrayIndex));
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/BinaryTVList.java b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/BinaryTVList.java
index 37f1598..d5c7e24 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/BinaryTVList.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/BinaryTVList.java
@@ -38,15 +38,20 @@
 
 import static org.apache.iotdb.db.rescon.PrimitiveArrayManager.ARRAY_SIZE;
 import static org.apache.iotdb.db.rescon.PrimitiveArrayManager.TVLIST_SORT_ALGORITHM;
+import static org.apache.iotdb.db.utils.MemUtils.getBinarySize;
 
 public abstract class BinaryTVList extends TVList {
   // list of primitive array, add 1 when expanded -> Binary primitive array
   // index relation: arrayIndex -> elementIndex
   protected List<Binary[]> values;
 
+  // record total memory size of binary tvlist
+  long memoryBinaryChunkSize;
+
   BinaryTVList() {
     super();
     values = new ArrayList<>();
+    memoryBinaryChunkSize = 0;
   }
 
   public static BinaryTVList newList() {
@@ -60,6 +65,7 @@
   public TimBinaryTVList clone() {
     TimBinaryTVList cloneList = new TimBinaryTVList();
     cloneAs(cloneList);
+    cloneList.memoryBinaryChunkSize = memoryBinaryChunkSize;
     for (Binary[] valueArray : values) {
       cloneList.values.add(cloneValue(valueArray));
     }
@@ -84,6 +90,40 @@
     if (sorted && rowCount > 1 && timestamp < getTime(rowCount - 2)) {
       sorted = false;
     }
+    memoryBinaryChunkSize += getBinarySize(value);
+  }
+
+  @Override
+  public boolean reachMaxChunkSizeThreshold() {
+    return memoryBinaryChunkSize >= maxChunkRawSizeThreshold;
+  }
+
+  @Override
+  public int delete(long lowerBound, long upperBound) {
+    int newSize = 0;
+    minTime = Long.MAX_VALUE;
+    for (int i = 0; i < rowCount; i++) {
+      long time = getTime(i);
+      if (time < lowerBound || time > upperBound) {
+        set(i, newSize++);
+        minTime = Math.min(time, minTime);
+      } else {
+        memoryBinaryChunkSize -= getBinarySize(getBinary(i));
+      }
+    }
+    int deletedNumber = rowCount - newSize;
+    rowCount = newSize;
+    // release primitive arrays that are empty
+    int newArrayNum = newSize / ARRAY_SIZE;
+    if (newSize % ARRAY_SIZE != 0) {
+      newArrayNum++;
+    }
+    int oldArrayNum = timestamps.size();
+    for (int releaseIdx = newArrayNum; releaseIdx < oldArrayNum; releaseIdx++) {
+      releaseLastTimeArray();
+      releaseLastValueArray();
+    }
+    return deletedNumber;
   }
 
   @Override
@@ -114,6 +154,7 @@
       }
       values.clear();
     }
+    memoryBinaryChunkSize = 0;
   }
 
   @Override
@@ -176,6 +217,11 @@
       updateMinTimeAndSorted(time, start, end);
     }
 
+    // update raw size
+    for (int i = idx; i < end; i++) {
+      memoryBinaryChunkSize += getBinarySize(value[i]);
+    }
+
     while (idx < end) {
       int inputRemaining = end - idx;
       int arrayIdx = rowCount / ARRAY_SIZE;
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
index 6786dca..721f9cd 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.utils.datastructure;
 
 import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.rescon.PrimitiveArrayManager;
 import org.apache.iotdb.db.utils.MathUtils;
 import org.apache.iotdb.db.wal.buffer.WALEntryValue;
@@ -48,6 +49,8 @@
 
   protected static final int SMALL_ARRAY_LENGTH = 32;
   protected static final String ERR_DATATYPE_NOT_CONSISTENT = "DataType not consistent";
+  long maxChunkRawSizeThreshold =
+      IoTDBDescriptor.getInstance().getConfig().getMaxChunkRawSizeThreshold();
   // list of timestamp array, add 1 when expanded -> data point timestamp array
   // index relation: arrayIndex -> elementIndex
   protected List<long[]> timestamps;
@@ -147,6 +150,10 @@
     throw new UnsupportedOperationException(ERR_DATATYPE_NOT_CONSISTENT);
   }
 
+  public boolean reachMaxChunkSizeThreshold() {
+    return false;
+  }
+
   public void putBoolean(long time, boolean value) {
     throw new UnsupportedOperationException(ERR_DATATYPE_NOT_CONSISTENT);
   }
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTableTest.java b/server/src/test/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTableTest.java
index fd01cd0..acc6210 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTableTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTableTest.java
@@ -76,7 +76,7 @@
         new WritableMemChunk(new MeasurementSchema("s1", dataType, TSEncoding.PLAIN));
     int count = 1000;
     for (int i = 0; i < count; i++) {
-      series.write(i, i);
+      series.writeWithFlushCheck(i, i);
     }
     IPointReader it =
         series.getSortedTvListForQuery().buildTsBlock().getTsBlockSingleColumnIterator();
@@ -95,11 +95,11 @@
         new WritableMemChunk(new MeasurementSchema("s1", dataType, TSEncoding.PLAIN));
     int count = 100;
     for (int i = 0; i < count; i++) {
-      series.write(i, i);
+      series.writeWithFlushCheck(i, i);
     }
-    series.write(0, 21);
-    series.write(99, 20);
-    series.write(20, 21);
+    series.writeWithFlushCheck(0, 21);
+    series.writeWithFlushCheck(99, 20);
+    series.writeWithFlushCheck(20, 21);
     String str = series.toString();
     Assert.assertFalse(series.getTVList().isSorted());
     Assert.assertEquals(
diff --git a/server/src/test/java/org/apache/iotdb/db/utils/datastructure/BinaryTVListTest.java b/server/src/test/java/org/apache/iotdb/db/utils/datastructure/BinaryTVListTest.java
index 15697f9..a685ce4 100644
--- a/server/src/test/java/org/apache/iotdb/db/utils/datastructure/BinaryTVListTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/utils/datastructure/BinaryTVListTest.java
@@ -107,5 +107,35 @@
       Assert.assertEquals(tvList.getBinary((int) i), clonedTvList.getBinary((int) i));
       Assert.assertEquals(tvList.getTime((int) i), clonedTvList.getTime((int) i));
     }
+    Assert.assertEquals(tvList.memoryBinaryChunkSize, clonedTvList.memoryBinaryChunkSize);
+  }
+
+  @Test
+  public void testCalculateChunkSize() {
+    BinaryTVList tvList = BinaryTVList.newList();
+    for (int i = 0; i < 10; i++) {
+      tvList.putBinary(i, Binary.valueOf(String.valueOf(i)));
+    }
+    Assert.assertEquals(tvList.memoryBinaryChunkSize, 360);
+
+    Binary[] binaryList = new Binary[10];
+    List<Long> timeList = new ArrayList<>();
+    BitMap bitMap = new BitMap(10);
+    for (int i = 0; i < 10; i++) {
+      timeList.add((long) i + 10);
+      binaryList[i] = Binary.valueOf(String.valueOf(i));
+      if (i % 2 == 0) {
+        bitMap.mark(i);
+      }
+    }
+    tvList.putBinaries(
+        ArrayUtils.toPrimitive(timeList.toArray(new Long[0])), binaryList, bitMap, 0, 10);
+    Assert.assertEquals(tvList.memoryBinaryChunkSize, 540);
+
+    tvList.delete(5, 15);
+    Assert.assertEquals(tvList.memoryBinaryChunkSize, 252);
+
+    tvList.clear();
+    Assert.assertEquals(tvList.memoryBinaryChunkSize, 0);
   }
 }
diff --git a/server/src/test/java/org/apache/iotdb/db/utils/datastructure/VectorTVListTest.java b/server/src/test/java/org/apache/iotdb/db/utils/datastructure/VectorTVListTest.java
index 7ddad739..7f660e7 100644
--- a/server/src/test/java/org/apache/iotdb/db/utils/datastructure/VectorTVListTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/utils/datastructure/VectorTVListTest.java
@@ -204,5 +204,72 @@
             tvList.isNullValue((int) i, column), clonedTvList.isNullValue((int) i, column));
       }
     }
+
+    for (int i = 0; i < dataTypes.size(); i++) {
+      Assert.assertEquals(tvList.memoryBinaryChunkSize[i], clonedTvList.memoryBinaryChunkSize[i]);
+    }
+  }
+
+  @Test
+  public void testCalculateChunkSize() {
+    List<TSDataType> dataTypes = new ArrayList<>();
+    dataTypes.add(TSDataType.INT32);
+    dataTypes.add(TSDataType.TEXT);
+    AlignedTVList tvList = AlignedTVList.newAlignedList(dataTypes);
+
+    int[] columnOrder = new int[2];
+    columnOrder[0] = 0;
+    columnOrder[1] = 1;
+    for (int i = 0; i < 10; i++) {
+      Object[] value = new Object[2];
+      value[0] = i;
+      value[1] = new Binary(String.valueOf(i));
+      tvList.putAlignedValue(i, value, columnOrder);
+    }
+
+    Assert.assertEquals(tvList.memoryBinaryChunkSize[0], 0);
+    Assert.assertEquals(tvList.memoryBinaryChunkSize[1], 360);
+
+    Object[] vectorArray = new Object[2];
+    BitMap[] bitMaps = new BitMap[2];
+
+    vectorArray[0] = new int[10];
+    vectorArray[1] = new Binary[10];
+    bitMaps[0] = new BitMap(10);
+    bitMaps[1] = new BitMap(10);
+
+    List<Long> timeList = new ArrayList<>();
+    for (int i = 0; i < 10; i++) {
+      timeList.add((long) i + 10);
+      ((int[]) vectorArray[0])[i] = i;
+      ((Binary[]) vectorArray[1])[i] = new Binary(String.valueOf(i));
+
+      if (i % 2 == 0) {
+        bitMaps[1].mark(i);
+      }
+    }
+
+    tvList.putAlignedValues(
+        ArrayUtils.toPrimitive(timeList.toArray(new Long[0])),
+        vectorArray,
+        bitMaps,
+        columnOrder,
+        0,
+        10);
+    Assert.assertEquals(tvList.memoryBinaryChunkSize[1], 720);
+
+    tvList.delete(5, 15);
+    Assert.assertEquals(tvList.memoryBinaryChunkSize[1], 324);
+
+    tvList.deleteColumn(0);
+    Assert.assertEquals(tvList.memoryBinaryChunkSize.length, 1);
+    Assert.assertEquals(tvList.memoryBinaryChunkSize[0], 324);
+
+    tvList.extendColumn(TSDataType.INT32);
+    Assert.assertEquals(tvList.memoryBinaryChunkSize.length, 2);
+    Assert.assertEquals(tvList.memoryBinaryChunkSize[0], 324);
+
+    tvList.clear();
+    Assert.assertEquals(tvList.memoryBinaryChunkSize[0], 0);
   }
 }