[Memtable] implement the deffered strategy
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 7850bf6..c534cb1 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -419,6 +419,11 @@
             properties.getProperty(
                 "tvlist_sort_algorithm", conf.getTvListSortAlgorithm().toString())));
 
+    conf.setSeqMemtableTopKSize(
+        (Integer.parseInt(
+            properties.getProperty(
+                "seq_memtable_topk_size", Integer.toString(conf.getSeqMemtableTopKSize())))));
+
     conf.setAvgSeriesPointNumberThreshold(
         Integer.parseInt(
             properties.getProperty(
@@ -1704,7 +1709,7 @@
     logger.info("initial allocateMemoryForWrite = {}", conf.getAllocateMemoryForStorageEngine());
     logger.info("initial allocateMemoryForSchema = {}", conf.getAllocateMemoryForSchema());
     logger.info("initial allocateMemoryForConsensus = {}", conf.getAllocateMemoryForConsensus());
-
+    logger.info("initial seqMemtableTopKSize = {}", conf.getSeqMemtableTopKSize());
     initSchemaMemoryAllocate(properties);
     initStorageEngineAllocate(properties);
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index 75609c2..f33f195 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -1267,7 +1267,6 @@
             version,
             0,
             0);
-
     return getTsFileProcessor(sequence, filePath, timePartitionId);
   }
 
@@ -1393,6 +1392,7 @@
       if (!workSequenceTsFileProcessors.containsKey(tsFileProcessor.getTimeRangeId())) {
         timePartitionIdVersionControllerMap.remove(tsFileProcessor.getTimeRangeId());
       }
+      logger.info("close an unsequence tsfile processor {}", databaseName + "-" + dataRegionId);
     }
   }
 
@@ -1557,7 +1557,8 @@
       for (TsFileProcessor tsFileProcessor : tsFileProcessors) {
         if (tsFileProcessor.getWorkMemTableCreatedTime() < timeLowerBound) {
           logger.info(
-              "Exceed sequence memtable flush interval, so flush working memtable of time partition {} in database {}[{}]",
+              "Exceed sequence memtable {} flush interval, so flush working memtable of time partition {} in database {}[{}]",
+              tsFileProcessor.getWorkMemTable().getMemTableId(),
               tsFileProcessor.getTimeRangeId(),
               databaseName,
               dataRegionId);
@@ -1580,7 +1581,8 @@
       for (TsFileProcessor tsFileProcessor : tsFileProcessors) {
         if (tsFileProcessor.getWorkMemTableCreatedTime() < timeLowerBound) {
           logger.info(
-              "Exceed unsequence memtable flush interval, so flush working memtable of time partition {} in database {}[{}]",
+              "Exceed unsequence memtable {} flush interval, so flush working memtable of time partition {} in database {}[{}]",
+              tsFileProcessor.getWorkMemTable().getMemTableId(),
               tsFileProcessor.getTimeRangeId(),
               databaseName,
               dataRegionId);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java
index 19450cb..aa84bfa 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java
@@ -111,7 +111,8 @@
             ? 0
             : memTable.getTotalPointsNum() / memTable.getSeriesNumber();
     LOGGER.info(
-        "The memTable size of SG {} is {}, the avg series points num in chunk is {}, total timeseries number is {}",
+        "The memTable {} size of SG {} is {}, the avg series points num in chunk is {}, total timeseries number is {}",
+        memTable.getMemTableId(),
         storageGroup,
         memTable.memSize(),
         avgSeriesPointsNum,
@@ -215,8 +216,9 @@
             "flush");
 
     LOGGER.info(
-        "Database {} memtable {} flushing a memtable has finished! Time consumption: {}ms",
+        "Database {} flushing memtable {} with {} has finished! Time consumption: {}ms",
         storageGroup,
+        memTable.getMemTableId(),
         memTable,
         System.currentTimeMillis() - start);
   }
@@ -286,8 +288,13 @@
           recordFlushPointsMetric();
 
           LOGGER.info(
-              "Database {}, flushing memtable {} into disk: Encoding data cost " + "{} ms.",
+              "Database {}, flushing memtable {} with size {} and points {} in file "
+                  + "{} into disk: Encoding data cost "
+                  + "{} ms.",
               storageGroup,
+              memTable.getMemTableId(),
+              memTable.memSize(),
+              memTable.getTotalPointsNum(),
               writer.getFile().getName(),
               memSerializeTime);
           WRITING_METRICS.recordFlushCost(WritingMetrics.FLUSH_STAGE_ENCODING, memSerializeTime);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
index bdba167..3af36c6 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
@@ -36,6 +36,7 @@
 import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.IWALByteBufferView;
 import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALWriteUtils;
 import org.apache.iotdb.db.utils.MemUtils;
+import org.apache.iotdb.db.utils.datastructure.TopkDivideMemoryNotEnoughException;
 import org.apache.iotdb.metrics.utils.MetricLevel;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.utils.Pair;
@@ -85,24 +86,48 @@
 
   private long totalPointsNum = 0;
 
+  public long getTotalPointsNumThreshold() {
+    return totalPointsNumThreshold;
+  }
+
   private long totalPointsNumThreshold = 0;
 
   private long maxPlanIndex = Long.MIN_VALUE;
 
   private long minPlanIndex = Long.MAX_VALUE;
 
-  private final long memTableId = memTableIdCounter.incrementAndGet();
+  private long memTableId;
 
-  private final long createdTime = System.currentTimeMillis();
+  private long createdTime;
 
   private static final String METRIC_POINT_IN = "pointsIn";
 
   protected AbstractMemTable() {
     this.memTableMap = new HashMap<>();
+    createdTime = System.currentTimeMillis();
+    memTableId = memTableIdCounter.incrementAndGet();
   }
 
-  protected AbstractMemTable(Map<IDeviceID, IWritableMemChunkGroup> memTableMap) {
+  protected AbstractMemTable(
+      Map<IDeviceID, IWritableMemChunkGroup> memTableMap, AbstractMemTable lastMemTable) {
     this.memTableMap = memTableMap;
+    for (IDeviceID key : memTableMap.keySet()) {
+      Map<String, IWritableMemChunk> memChunkMap = memTableMap.get(key).getMemChunkMap();
+      for (String schema : memChunkMap.keySet()) {
+        IWritableMemChunk memChunk = memChunkMap.get(schema);
+        totalPointsNum += memChunk.getTVList().rowCount();
+        memSize +=
+            (long) memChunk.getTVList().rowCount()
+                * memChunk.getSchema().getType().getDataTypeSize();
+      }
+    }
+    totalPointsNumThreshold = lastMemTable.getTotalPointsNumThreshold();
+    createdTime = System.currentTimeMillis();
+    memTableId = memTableIdCounter.incrementAndGet();
+    seriesNumber = lastMemTable.getSeriesNumber();
+    tvListRamCost = lastMemTable.getTVListsRamCost() * totalPointsNum / lastMemTable.totalPointsNum;
+    shouldFlush = false;
+    flushStatus = FlushStatus.WORKING;
   }
 
   @Override
@@ -403,6 +428,16 @@
   }
 
   @Override
+  public void setMemSize(long m) {
+    memSize = memSize + m;
+  }
+
+  @Override
+  public void setTotalPointsNum(long m) {
+    totalPointsNum = totalPointsNum + m;
+  }
+
+  @Override
   public boolean reachTotalPointNumThreshold() {
     if (totalPointsNum == 0) {
       return false;
@@ -490,6 +525,20 @@
   }
 
   @Override
+  public IMemTable divide() throws TopkDivideMemoryNotEnoughException {
+    Map<IDeviceID, IWritableMemChunkGroup> topkMemTableMap = new HashMap<>();
+    for (IDeviceID key : memTableMap.keySet()) {
+      IWritableMemChunkGroup chunkGroupTemp = memTableMap.get(key).divide();
+      topkMemTableMap.put(key, chunkGroupTemp);
+    }
+    PrimitiveMemTable topkMemtable = new PrimitiveMemTable(topkMemTableMap, this);
+    memSize -= topkMemtable.memSize();
+    totalPointsNum -= topkMemtable.getTotalPointsNum();
+    tvListRamCost -= topkMemtable.getTVListsRamCost();
+    return topkMemtable;
+  }
+
+  @Override
   public void addTVListRamCost(long cost) {
     this.tvListRamCost += cost;
   }
@@ -634,6 +683,15 @@
     return latestTimeForEachDevice;
   }
 
+  @Override
+  public Map<String, Long> getTopKTime() {
+    Map<String, Long> latestTimeForEachDevice = new HashMap<>();
+    for (Entry<IDeviceID, IWritableMemChunkGroup> entry : memTableMap.entrySet()) {
+      latestTimeForEachDevice.put(entry.getKey().toStringID(), entry.getValue().getTopKTime());
+    }
+    return latestTimeForEachDevice;
+  }
+
   public static class Factory {
     private Factory() {
       // Empty constructor
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
index aa4b3c4..1046ec0 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
@@ -23,6 +23,7 @@
 import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALWriteUtils;
 import org.apache.iotdb.db.utils.datastructure.AlignedTVList;
 import org.apache.iotdb.db.utils.datastructure.TVList;
+import org.apache.iotdb.db.utils.datastructure.TopkDivideMemoryNotEnoughException;
 import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
 import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -252,11 +253,19 @@
   }
 
   @Override
+  public void setSchema(IMeasurementSchema s) {}
+
+  @Override
   public long getMaxTime() {
     return list.getMaxTime();
   }
 
   @Override
+  public long getTopKTime() {
+    return list.getTopKTime();
+  }
+
+  @Override
   public synchronized TVList getSortedTvListForQuery() {
     sortTVList();
     // increase reference count
@@ -465,6 +474,11 @@
   }
 
   @Override
+  public IWritableMemChunk divide() throws TopkDivideMemoryNotEnoughException {
+    return new AlignedWritableMemChunk(schemaList, (AlignedTVList) list.divide());
+  }
+
+  @Override
   public boolean isEmpty() {
     return list.rowCount() == 0;
   }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunkGroup.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunkGroup.java
index 17b12a7..875c113 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunkGroup.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunkGroup.java
@@ -23,6 +23,7 @@
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.path.PathPatternUtil;
 import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.IWALByteBufferView;
+import org.apache.iotdb.db.utils.datastructure.TopkDivideMemoryNotEnoughException;
 import org.apache.iotdb.tsfile.utils.BitMap;
 import org.apache.iotdb.tsfile.utils.Pair;
 import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
@@ -143,6 +144,18 @@
     return memChunk.getMaxTime();
   }
 
+  @Override
+  public long getTopKTime() {
+    return memChunk.getTopKTime();
+  }
+
+  @Override
+  public IWritableMemChunkGroup divide() throws TopkDivideMemoryNotEnoughException {
+    AlignedWritableMemChunkGroup topkMemChunkGroup = new AlignedWritableMemChunkGroup();
+    topkMemChunkGroup.memChunk = (AlignedWritableMemChunk) memChunk.divide();
+    return topkMemChunkGroup;
+  }
+
   public AlignedWritableMemChunk getAlignedMemChunk() {
     return memChunk;
   }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IMemTable.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IMemTable.java
index b4b0204..1f898c0 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IMemTable.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IMemTable.java
@@ -27,6 +27,7 @@
 import org.apache.iotdb.db.storageengine.dataregion.flush.FlushStatus;
 import org.apache.iotdb.db.storageengine.dataregion.modification.Modification;
 import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntryValue;
+import org.apache.iotdb.db.utils.datastructure.TopkDivideMemoryNotEnoughException;
 import org.apache.iotdb.tsfile.utils.Pair;
 import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
 
@@ -64,6 +65,10 @@
   /** @return memory usage */
   long memSize();
 
+  void setMemSize(long m);
+
+  void setTotalPointsNum(long m);
+
   /** only used when mem control enabled */
   void addTVListRamCost(long cost);
 
@@ -128,6 +133,8 @@
    */
   void delete(PartialPath path, PartialPath devicePath, long startTimestamp, long endTimestamp);
 
+  /** divide the memtable into the topk and regular ones @ return the topk memtable. */
+  IMemTable divide() throws TopkDivideMemoryNotEnoughException;
   /**
    * Make a copy of this MemTable.
    *
@@ -169,4 +176,6 @@
   void setFlushStatus(FlushStatus flushStatus);
 
   Map<String, Long> getMaxTime();
+
+  Map<String, Long> getTopKTime();
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunk.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunk.java
index 4c07c3e..4b643cb 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunk.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunk.java
@@ -20,6 +20,7 @@
 
 import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntryValue;
 import org.apache.iotdb.db.utils.datastructure.TVList;
+import org.apache.iotdb.db.utils.datastructure.TopkDivideMemoryNotEnoughException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.utils.Binary;
 import org.apache.iotdb.tsfile.utils.BitMap;
@@ -83,6 +84,8 @@
 
   IMeasurementSchema getSchema();
 
+  void setSchema(IMeasurementSchema s);
+
   /**
    * served for read requests.
    *
@@ -128,6 +131,10 @@
     return Long.MAX_VALUE;
   }
 
+  default long getTopKTime() {
+    return Long.MAX_VALUE;
+  }
+
   /** @return how many points are deleted */
   int delete(long lowerBound, long upperBound);
 
@@ -141,5 +148,7 @@
 
   long getLastPoint();
 
+  IWritableMemChunk divide() throws TopkDivideMemoryNotEnoughException;
+
   boolean isEmpty();
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunkGroup.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunkGroup.java
index ecaf8b2..eb3c1f0 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunkGroup.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunkGroup.java
@@ -21,6 +21,7 @@
 
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntryValue;
+import org.apache.iotdb.db.utils.datastructure.TopkDivideMemoryNotEnoughException;
 import org.apache.iotdb.tsfile.utils.BitMap;
 import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
 
@@ -53,5 +54,9 @@
 
   long getCurrentTVListSize(String measurement);
 
+  IWritableMemChunkGroup divide() throws TopkDivideMemoryNotEnoughException;
+
   long getMaxTime();
+
+  long getTopKTime();
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/PrimitiveMemTable.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/PrimitiveMemTable.java
index d4bf9c1..70f29dd 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/PrimitiveMemTable.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/PrimitiveMemTable.java
@@ -30,15 +30,16 @@
     this.disableMemControl = !enableMemControl;
   }
 
-  public PrimitiveMemTable(Map<IDeviceID, IWritableMemChunkGroup> memTableMap) {
-    super(memTableMap);
+  public PrimitiveMemTable(
+      Map<IDeviceID, IWritableMemChunkGroup> memTableMap, AbstractMemTable lastMemtable) {
+    super(memTableMap, lastMemtable);
   }
 
   @Override
   public IMemTable copy() {
     Map<IDeviceID, IWritableMemChunkGroup> newMap = new HashMap<>(getMemTableMap());
 
-    return new PrimitiveMemTable(newMap);
+    return new PrimitiveMemTable(newMap, this);
   }
 
   @Override
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
index 3cd09e7..28ca6cd 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
@@ -62,6 +62,7 @@
 import org.apache.iotdb.db.utils.MemUtils;
 import org.apache.iotdb.db.utils.datastructure.AlignedTVList;
 import org.apache.iotdb.db.utils.datastructure.TVList;
+import org.apache.iotdb.db.utils.datastructure.TopkDivideMemoryNotEnoughException;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
@@ -91,6 +92,7 @@
 import static org.apache.iotdb.db.queryengine.metric.QueryExecutionMetricSet.GET_QUERY_RESOURCE_FROM_MEM;
 import static org.apache.iotdb.db.queryengine.metric.QueryResourceMetricSet.FLUSHING_MEMTABLE;
 import static org.apache.iotdb.db.queryengine.metric.QueryResourceMetricSet.WORKING_MEMTABLE;
+import static org.apache.iotdb.db.storageengine.rescon.memory.PrimitiveArrayManager.MEMTABLE_TOPK_SIZE;
 
 @SuppressWarnings("java:S1135") // ignore todos
 public class TsFileProcessor {
@@ -114,6 +116,8 @@
 
   /** sync this object in read() and asyncTryToFlush(). */
   private final ConcurrentLinkedDeque<IMemTable> flushingMemTables = new ConcurrentLinkedDeque<>();
+  // private final ConcurrentLinkedDeque<IMemTable> waitingMemTables = new
+  // ConcurrentLinkedDeque<>();
 
   /** modification to memtable mapping. */
   private final List<Pair<Modification, IMemTable>> modsToMemtable = new ArrayList<>();
@@ -121,11 +125,14 @@
   /** writer for restore tsfile and flushing. */
   private RestorableTsFileIOWriter writer;
 
+  private RestorableTsFileIOWriter writerTemp;
+
   /** tsfile resource for index this tsfile. */
   private final TsFileResource tsFileResource;
 
   /** time range index to indicate this processor belongs to which time range */
   private long timeRangeId;
+
   /**
    * Whether the processor is in the queue of the FlushManager or being flushed by a flush thread.
    */
@@ -175,6 +182,8 @@
   private static final PerformanceOverviewMetrics PERFORMANCE_OVERVIEW_METRICS =
       PerformanceOverviewMetrics.getInstance();
 
+  private File tsfileTemp;
+
   @SuppressWarnings("squid:S107")
   public TsFileProcessor(
       String storageGroupName,
@@ -308,6 +317,39 @@
   private void createNewWorkingMemTable() throws WriteProcessException {
     workMemTable = MemTableManager.getInstance().getAvailableMemTable(storageGroupName);
     walNode.onMemTableCreated(workMemTable, tsFileResource.getTsFilePath());
+    logger.info(
+        "Succeed to create {} memTable {}",
+        sequence ? "Sequence" : "Unsequence",
+        workMemTable.getMemTableId());
+  }
+
+  private void createNewWorkingMemTable(IMemTable tobeFlushed) throws WriteProcessException {
+    if (MEMTABLE_TOPK_SIZE != 0) {
+      try {
+        logger.info(
+            "Try to divide memTable {} with size {} and points {}",
+            tobeFlushed.getMemTableId(),
+            tobeFlushed.memSize(),
+            tobeFlushed.getTotalPointsNum());
+        workMemTable = tobeFlushed.divide();
+        walNode.onMemTableCreated(workMemTable, tsFileResource.getTsFilePath());
+        logger.info(
+            "Succeed to divide {} memTable {} with size {} and points {} to "
+                + "new workMemTable {} with size {} and points {} to",
+            sequence ? "Sequence" : "Unsequence",
+            tobeFlushed.getMemTableId(),
+            tobeFlushed.memSize(),
+            tobeFlushed.getTotalPointsNum(),
+            workMemTable.getMemTableId(),
+            workMemTable.memSize(),
+            workMemTable.getTotalPointsNum());
+      } catch (TopkDivideMemoryNotEnoughException e) {
+        logger.warn(e.getMessage());
+        createNewWorkingMemTable();
+      }
+    } else {
+      createNewWorkingMemTable();
+    }
   }
 
   /**
@@ -858,15 +900,16 @@
       if (logger.isInfoEnabled()) {
         if (workMemTable != null) {
           logger.info(
-              "{}: flush a working memtable in async close tsfile {}, memtable size: {}, tsfile "
-                  + "size: {}, plan index: [{}, {}], progress index: {}",
+              "{}: flush working memtable {} with size: {}, tsfile "
+                  + "size: {}, plan index: [{}, {}], progress index: {} in async close tsfile {}",
               storageGroupName,
-              tsFileResource.getTsFile().getAbsolutePath(),
+              workMemTable.getMemTableId(),
               workMemTable.memSize(),
               tsFileResource.getTsFileSize(),
               workMemTable.getMinPlanIndex(),
               workMemTable.getMaxPlanIndex(),
-              tsFileResource.getMaxProgressIndex());
+              tsFileResource.getMaxProgressIndex(),
+              tsFileResource.getTsFile().getAbsolutePath());
         } else {
           logger.info(
               "{}: flush a NotifyFlushMemTable in async close tsfile {}, tsfile size: {}",
@@ -900,7 +943,12 @@
         // When invoke closing TsFile after insert data to memTable, we shouldn't flush until invoke
         // flushing memTable in System module.
         addAMemtableIntoFlushingList(tmpMemTable);
-        logger.info("Memtable {} has been added to flushing list", tmpMemTable);
+        logger.info(
+            "Memtable {}: {} with size {} and points {} has been added to flushing list",
+            tmpMemTable.getMemTableId(),
+            tmpMemTable,
+            tmpMemTable.memSize(),
+            tmpMemTable.getTotalPointsNum());
         shouldClose = true;
       } catch (Exception e) {
         logger.error(
@@ -982,7 +1030,9 @@
         return;
       }
       logger.info(
-          "Async flush a memtable to tsfile: {}", tsFileResource.getTsFile().getAbsolutePath());
+          "Async flush a memtable {} to tsfile: {}",
+          workMemTable.getMemTableId(),
+          tsFileResource.getTsFile().getAbsolutePath());
       addAMemtableIntoFlushingList(workMemTable);
     } catch (Exception e) {
       logger.error(
@@ -1007,7 +1057,9 @@
   private void addAMemtableIntoFlushingList(IMemTable tobeFlushed) throws IOException {
     Map<String, Long> lastTimeForEachDevice = new HashMap<>();
     if (sequence) {
-      lastTimeForEachDevice = tobeFlushed.getMaxTime();
+      // TODO:修改lastFlushTime的取法
+      lastTimeForEachDevice = tobeFlushed.getTopKTime();
+      // lastTimeForEachDevice = tobeFlushed.getMaxTime();
       // If some devices have been removed in MemTable, the number of device in MemTable and
       // tsFileResource will not be the same. And the endTime of these devices in resource will be
       // Long.minValue.
@@ -1029,7 +1081,19 @@
     if (enableMemControl) {
       SystemInfo.getInstance().addFlushingMemTableCost(tobeFlushed.getTVListsRamCost());
     }
-    flushingMemTables.addLast(tobeFlushed);
+    // TODO:如果是顺序区并且k>0,拆分之后分别进入不同的waitinglist中,否则的话仍然按照原方式刷盘
+    if (!sequence || MEMTABLE_TOPK_SIZE == 0) {
+      flushingMemTables.addLast(tobeFlushed);
+      workMemTable = null;
+    } else {
+      try {
+        createNewWorkingMemTable(tobeFlushed);
+      } catch (WriteProcessException e) {
+        throw new RuntimeException(e);
+      }
+      flushingMemTables.addLast(tobeFlushed);
+    }
+
     if (logger.isDebugEnabled()) {
       logger.debug(
           "{}: {} Memtable (signal = {}) is added into the flushing Memtable, queue size = {}",
@@ -1042,7 +1106,6 @@
     if (!(tobeFlushed.isSignalMemTable() || tobeFlushed.isEmpty())) {
       totalMemTableSize += tobeFlushed.memSize();
     }
-    workMemTable = null;
     FlushManager.getInstance().registerTsFileProcessor(this);
   }
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java
index 21ba125..d954fe3 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java
@@ -20,6 +20,7 @@
 
 import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.IWALByteBufferView;
 import org.apache.iotdb.db.utils.datastructure.TVList;
+import org.apache.iotdb.db.utils.datastructure.TopkDivideMemoryNotEnoughException;
 import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.utils.Binary;
@@ -49,7 +50,13 @@
     this.list = TVList.newList(schema.getType());
   }
 
-  private WritableMemChunk() {}
+  public WritableMemChunk(IMeasurementSchema schema, TVList ls) {
+    this.schema = schema;
+    this.list = ls;
+  }
+
+  // private WritableMemChunk() {}
+  public WritableMemChunk() {}
 
   @Override
   public boolean writeWithFlushCheck(long insertTime, Object objectValue) {
@@ -246,11 +253,21 @@
   }
 
   @Override
+  public void setSchema(IMeasurementSchema s) {
+    this.schema = s;
+  }
+
+  @Override
   public long getMaxTime() {
     return list.getMaxTime();
   }
 
   @Override
+  public long getTopKTime() {
+    return list.getTopKTime();
+  }
+
+  @Override
   public long getFirstPoint() {
     if (list.rowCount() == 0) {
       return Long.MAX_VALUE;
@@ -269,6 +286,11 @@
   }
 
   @Override
+  public IWritableMemChunk divide() throws TopkDivideMemoryNotEnoughException {
+    return new WritableMemChunk(this.schema, list.divide());
+  }
+
+  @Override
   public boolean isEmpty() {
     return list.rowCount() == 0;
   }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunkGroup.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunkGroup.java
index 707eace..dbe03f1 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunkGroup.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunkGroup.java
@@ -23,6 +23,7 @@
 import org.apache.iotdb.commons.path.PathPatternUtil;
 import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.IWALByteBufferView;
 import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALWriteUtils;
+import org.apache.iotdb.db.utils.datastructure.TopkDivideMemoryNotEnoughException;
 import org.apache.iotdb.tsfile.utils.BitMap;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
@@ -173,6 +174,25 @@
   }
 
   @Override
+  public long getTopKTime() {
+    long maxTime = Long.MIN_VALUE;
+    for (IWritableMemChunk memChunk : memChunkMap.values()) {
+      maxTime = Math.max(maxTime, memChunk.getTopKTime());
+    }
+    return maxTime;
+  }
+
+  @Override
+  public IWritableMemChunkGroup divide() throws TopkDivideMemoryNotEnoughException {
+    WritableMemChunkGroup topkMemChunkGroup = new WritableMemChunkGroup();
+    for (String key : memChunkMap.keySet()) {
+      IWritableMemChunk topkMemChunk = memChunkMap.get(key).divide();
+      topkMemChunkGroup.memChunkMap.put(key, topkMemChunk);
+    }
+    return topkMemChunkGroup;
+  }
+
+  @Override
   public int serializedSize() {
     int size = 0;
     size += Integer.BYTES;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/PrimitiveArrayManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/PrimitiveArrayManager.java
index 33264fc..4f1edd8 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/PrimitiveArrayManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/PrimitiveArrayManager.java
@@ -40,6 +40,7 @@
   private static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig();
 
   public static final int ARRAY_SIZE = CONFIG.getPrimitiveArraySize();
+  public static final int MEMTABLE_TOPK_SIZE = CONFIG.getSeqMemtableTopKSize();
 
   public static final TVListSortAlgorithm TVLIST_SORT_ALGORITHM = CONFIG.getTvListSortAlgorithm();
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
index 95bbd23..b84aec8 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
@@ -43,8 +43,7 @@
 import java.util.List;
 import java.util.Objects;
 
-import static org.apache.iotdb.db.storageengine.rescon.memory.PrimitiveArrayManager.ARRAY_SIZE;
-import static org.apache.iotdb.db.storageengine.rescon.memory.PrimitiveArrayManager.TVLIST_SORT_ALGORITHM;
+import static org.apache.iotdb.db.storageengine.rescon.memory.PrimitiveArrayManager.*;
 import static org.apache.iotdb.db.utils.MemUtils.getBinarySize;
 import static org.apache.iotdb.tsfile.utils.RamUsageEstimator.NUM_BYTES_ARRAY_HEADER;
 import static org.apache.iotdb.tsfile.utils.RamUsageEstimator.NUM_BYTES_OBJECT_REF;
@@ -643,6 +642,37 @@
     }
   }
 
+  @Override
+  public TVList divide() throws TopkDivideMemoryNotEnoughException {
+    if (MEMTABLE_TOPK_SIZE > rowCount || MEMTABLE_TOPK_SIZE < ARRAY_SIZE) {
+      throw new TopkDivideMemoryNotEnoughException(
+          String.format(
+              "WARNING: MEMTABLE_TOPK_SIZE is bigger than%d "
+                  + "the TVList's row count  %d"
+                  + "or is smaller than ARRAY_SIZE %d",
+              rowCount, MEMTABLE_TOPK_SIZE, ARRAY_SIZE));
+    }
+    AlignedTVList topkTVList = AlignedTVList.newAlignedList(dataTypes);
+    int truncatedIndex = (rowCount - MEMTABLE_TOPK_SIZE + ARRAY_SIZE - 1) / ARRAY_SIZE;
+
+    for (int i = truncatedIndex + 1; i <= rowCount / ARRAY_SIZE; i++) {
+      topkTVList.timestamps.add(timestamps.get(i));
+      for (int colIndex = 0; colIndex < values.size(); colIndex++) {
+        topkTVList.values.get(colIndex).add(values.get(colIndex).get(i));
+        topkTVList.bitMaps.get(colIndex).add(bitMaps.get(colIndex).get(i));
+      }
+    }
+    for (int i = rowCount / ARRAY_SIZE; i >= truncatedIndex + 1; i--) {
+      timestamps.remove(timestamps.size() - 1);
+      for (int colIndex = 0; colIndex < values.size(); colIndex++) {
+        topkTVList.values.get(colIndex).remove(values.get(colIndex).size() - 1);
+        topkTVList.bitMaps.get(colIndex).remove(bitMaps.get(colIndex).size() - 1);
+      }
+    }
+    rowCount = truncatedIndex * ARRAY_SIZE;
+    topKTime = timestamps.get(truncatedIndex - 1)[ARRAY_SIZE - 1];
+    return topkTVList;
+  }
   /**
    * Get the row index value in index column.
    *
@@ -745,6 +775,8 @@
         checkExpansion();
       }
     }
+    sort(Math.max(0, start - MEMTABLE_TOPK_SIZE), end);
+    topKTime = Math.max(topKTime, getTime(Math.max(0, end - MEMTABLE_TOPK_SIZE)));
   }
 
   private void arrayCopy(Object[] value, int idx, int arrayIndex, int elementIndex, int remaining) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BackwardSort.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BackwardSort.java
index 6271a31..21783bc 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BackwardSort.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BackwardSort.java
@@ -53,7 +53,7 @@
 
   /**
    * check block-inversions to find the proper block_size, which is a multiple of array_size. For
-   * totally ordered, the block_size will equals to array_size For totally reverse ordered, the
+   * totally ordered, the block_size will equal to array_size For totally reverse ordered, the
    * block_size will equals to the rowCount. INVERSION_RATIOS_THRESHOLD=0.005 is a empiric value.
    *
    * @param timestamps
@@ -132,6 +132,6 @@
    * @param hi
    */
   default void sortBlock(int lo, int hi) {
-    qsort(lo, hi);
+    qSort(lo, hi);
   }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/IntTVList.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/IntTVList.java
index c64dbc9..1865efe 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/IntTVList.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/IntTVList.java
@@ -34,8 +34,7 @@
 import java.util.ArrayList;
 import java.util.List;
 
-import static org.apache.iotdb.db.storageengine.rescon.memory.PrimitiveArrayManager.ARRAY_SIZE;
-import static org.apache.iotdb.db.storageengine.rescon.memory.PrimitiveArrayManager.TVLIST_SORT_ALGORITHM;
+import static org.apache.iotdb.db.storageengine.rescon.memory.PrimitiveArrayManager.*;
 
 public abstract class IntTVList extends TVList {
   // list of primitive array, add 1 when expanded -> int primitive array
@@ -77,14 +76,59 @@
   @Override
   public void putInt(long timestamp, int value) {
     checkExpansion();
-    int arrayIndex = rowCount / ARRAY_SIZE;
-    int elementIndex = rowCount % ARRAY_SIZE;
-    maxTime = Math.max(maxTime, timestamp);
-    timestamps.get(arrayIndex)[elementIndex] = timestamp;
-    values.get(arrayIndex)[elementIndex] = value;
-    rowCount++;
-    if (sorted && rowCount > 1 && timestamp < getTime(rowCount - 2)) {
-      sorted = false;
+    if (MEMTABLE_TOPK_SIZE == 0) {
+      int arrayIndex = rowCount / ARRAY_SIZE;
+      int elementIndex = rowCount % ARRAY_SIZE;
+      maxTime = Math.max(maxTime, timestamp);
+      topKTime = Math.max(topKTime, timestamp);
+      timestamps.get(arrayIndex)[elementIndex] = timestamp;
+      values.get(arrayIndex)[elementIndex] = value;
+      rowCount++;
+      if (sorted && rowCount > 1 && timestamp < getTime(rowCount - 2)) {
+        sorted = false;
+      }
+    } else {
+      int arrayIndex;
+      int elementIndex;
+      int waitlen = Math.min(rowCount, MEMTABLE_TOPK_SIZE);
+      int tempRowCount = rowCount - 1; // 记录向前比较的截止位置
+      while (waitlen > 0) {
+        waitlen--;
+        arrayIndex = tempRowCount / ARRAY_SIZE;
+        elementIndex = tempRowCount % ARRAY_SIZE;
+        if (timestamps.get(arrayIndex)[elementIndex] > timestamp) {
+          tempRowCount--;
+          continue;
+        }
+        break;
+      }
+      arrayIndex = rowCount / ARRAY_SIZE;
+      elementIndex = rowCount % ARRAY_SIZE;
+      int arrayIndexLeft = (rowCount - 1) / ARRAY_SIZE;
+      int elementIndexLeft = (rowCount - 1) % ARRAY_SIZE;
+      for (int i = rowCount - 1; i > tempRowCount; i--) {
+        timestamps.get(arrayIndex)[elementIndex] = timestamps.get(arrayIndexLeft)[elementIndexLeft];
+        values.get(arrayIndex)[elementIndex] = values.get(arrayIndexLeft)[elementIndexLeft];
+        arrayIndex = arrayIndexLeft;
+        elementIndex = elementIndexLeft;
+        arrayIndexLeft = (i - 1) / ARRAY_SIZE;
+        elementIndexLeft = (i - 1) % ARRAY_SIZE;
+        sortCount++;
+      }
+      arrayIndex = (tempRowCount + 1) / ARRAY_SIZE;
+      elementIndex = (tempRowCount + 1) % ARRAY_SIZE;
+      maxTime = Math.max(maxTime, timestamp);
+      timestamps.get(arrayIndex)[elementIndex] = timestamp;
+      values.get(arrayIndex)[elementIndex] = value;
+      if (rowCount > MEMTABLE_TOPK_SIZE) {
+        arrayIndex = (rowCount - MEMTABLE_TOPK_SIZE - 1) / ARRAY_SIZE;
+        elementIndex = (rowCount - MEMTABLE_TOPK_SIZE - 1) % ARRAY_SIZE;
+        topKTime = Math.max(topKTime, timestamps.get(arrayIndex)[elementIndex]);
+      }
+      rowCount++;
+      if (sorted && rowCount > 1 && waitlen == 0) {
+        sorted = false;
+      }
     }
   }
 
@@ -124,6 +168,37 @@
   }
 
   @Override
+  public TVList divide() throws TopkDivideMemoryNotEnoughException {
+    if (MEMTABLE_TOPK_SIZE > rowCount || MEMTABLE_TOPK_SIZE < ARRAY_SIZE) {
+      throw new TopkDivideMemoryNotEnoughException(
+          String.format(
+              "WARNING: MEMTABLE_TOPK_SIZE %d is bigger than "
+                  + "the TVList's row count %d, "
+                  + "or is smaller than ARRAY_SIZE %d",
+              MEMTABLE_TOPK_SIZE, rowCount, ARRAY_SIZE));
+    }
+    IntTVList topkTVList = IntTVList.newList();
+    int truncatedIndex = rowCount - MEMTABLE_TOPK_SIZE;
+    int truncatedArrayIndex =
+        truncatedIndex / ARRAY_SIZE; // no matter truncatedIndex in or not in the block
+    truncatedIndex = truncatedArrayIndex * ARRAY_SIZE;
+    for (int i = truncatedArrayIndex; i < timestamps.size(); i++) {
+      topkTVList.timestamps.add(timestamps.get(i));
+      topkTVList.values.add(values.get(i));
+    }
+    for (int i = timestamps.size() - 1; i >= truncatedArrayIndex; i--) {
+      timestamps.remove(timestamps.size() - 1);
+      values.remove(values.size() - 1);
+    }
+    topkTVList.rowCount = rowCount - truncatedIndex;
+    topkTVList.sorted = true;
+    topkTVList.topKTime = topkTVList.getTime(0);
+    rowCount = truncatedIndex;
+    topKTime = topkTVList.getTopKTime();
+    return topkTVList;
+  }
+
+  @Override
   public TimeValuePair getTimeValuePair(int index) {
     return new TimeValuePair(
         getTime(index), TsPrimitiveType.getByType(TSDataType.INT32, getInt(index)));
@@ -201,6 +276,11 @@
         checkExpansion();
       }
     }
+    if (MEMTABLE_TOPK_SIZE != 0) {
+      sort(Math.max(0, start - MEMTABLE_TOPK_SIZE), rowCount);
+      int index = Math.max(0, rowCount - MEMTABLE_TOPK_SIZE - 1);
+      topKTime = Math.max(topKTime, getTime(index));
+    }
   }
 
   // move null values to the end of time array and value array, then return number of null values
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickAlignedTVList.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickAlignedTVList.java
index 5e6a9de..d60067a 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickAlignedTVList.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickAlignedTVList.java
@@ -30,12 +30,17 @@
   @Override
   public void sort() {
     if (!sorted) {
-      qsort(0, rowCount - 1);
+      qSort(0, rowCount - 1);
     }
     sorted = true;
   }
 
   @Override
+  public void sort(int lo, int hi) {
+    qSort(lo, hi);
+  }
+
+  @Override
   protected void set(int src, int dest) {
     long srcT = getTime(src);
     int srcV = getValueIndex(src);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickBinaryTVList.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickBinaryTVList.java
index dcdcbc2..ee4e6f9 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickBinaryTVList.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickBinaryTVList.java
@@ -42,12 +42,17 @@
   @Override
   public void sort() {
     if (!sorted) {
-      qsort(0, rowCount - 1);
+      qSort(0, rowCount - 1);
     }
     sorted = true;
   }
 
   @Override
+  public void sort(int lo, int hi) {
+    qSort(lo, hi);
+  }
+
+  @Override
   protected void set(int src, int dest) {
     long srcT = getTime(src);
     Binary srcV = getBinary(src);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickBooleanTVList.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickBooleanTVList.java
index e4fbfae..c7b4347 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickBooleanTVList.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickBooleanTVList.java
@@ -39,12 +39,17 @@
   @Override
   public void sort() {
     if (!sorted) {
-      qsort(0, rowCount - 1);
+      qSort(0, rowCount - 1);
     }
     sorted = true;
   }
 
   @Override
+  public void sort(int lo, int hi) {
+    qSort(lo, hi);
+  }
+
+  @Override
   protected void set(int src, int dest) {
     long srcT = getTime(src);
     boolean srcV = getBoolean(src);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickDoubleTVList.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickDoubleTVList.java
index 852844f..584c8f8 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickDoubleTVList.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickDoubleTVList.java
@@ -39,12 +39,17 @@
   @Override
   public void sort() {
     if (!sorted) {
-      qsort(0, rowCount - 1);
+      qSort(0, rowCount - 1);
     }
     sorted = true;
   }
 
   @Override
+  public void sort(int lo, int hi) {
+    qSort(lo, hi);
+  }
+
+  @Override
   protected void set(int src, int dest) {
     long srcT = getTime(src);
     double srcV = getDouble(src);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickFloatTVList.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickFloatTVList.java
index 409a00a..9048525 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickFloatTVList.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickFloatTVList.java
@@ -39,12 +39,17 @@
   @Override
   public void sort() {
     if (!sorted) {
-      qsort(0, rowCount - 1);
+      qSort(0, rowCount - 1);
     }
     sorted = true;
   }
 
   @Override
+  public void sort(int lo, int hi) {
+    qSort(lo, hi);
+  }
+
+  @Override
   protected void set(int src, int dest) {
     long srcT = getTime(src);
     float srcV = getFloat(src);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickIntTVList.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickIntTVList.java
index bc44cf7..d24842b 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickIntTVList.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickIntTVList.java
@@ -22,12 +22,17 @@
   @Override
   public void sort() {
     if (!sorted) {
-      qsort(0, rowCount - 1);
+      qSort(0, rowCount - 1);
     }
     sorted = true;
   }
 
   @Override
+  public void sort(int lo, int hi) {
+    qSort(lo, hi);
+  }
+
+  @Override
   public void swap(int p, int q) {
     int valueP = getInt(p);
     long timeP = getTime(p);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickLongTVList.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickLongTVList.java
index 16d8fab..3d4f1ab 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickLongTVList.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickLongTVList.java
@@ -39,12 +39,17 @@
   @Override
   public void sort() {
     if (!sorted) {
-      qsort(0, rowCount - 1);
+      qSort(0, rowCount - 1);
     }
     sorted = true;
   }
 
   @Override
+  public void sort(int lo, int hi) {
+    qSort(lo, hi);
+  }
+
+  @Override
   protected void set(int src, int dest) {
     long srcT = getTime(src);
     long srcV = getLong(src);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickSort.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickSort.java
index 8229781..9bad94d 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickSort.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickSort.java
@@ -58,7 +58,7 @@
   //        }
   //    }
 
-  default void qsort(int lo, int hi) {
+  default void qSort(int lo, int hi) {
     if (lo < hi) {
       // TODO: use insertion sort in smaller array
       // if(hi - lo <= 32) {
@@ -66,8 +66,8 @@
       // }
       // partition
       int pivotIndex = partition(lo, hi);
-      qsort(lo, pivotIndex - 1);
-      qsort(pivotIndex + 1, hi);
+      qSort(lo, pivotIndex - 1);
+      qSort(pivotIndex + 1, hi);
     }
   }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
index 246102b..788af81 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
@@ -42,6 +42,7 @@
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.apache.iotdb.db.storageengine.rescon.memory.PrimitiveArrayManager.ARRAY_SIZE;
+import static org.apache.iotdb.db.storageengine.rescon.memory.PrimitiveArrayManager.MEMTABLE_TOPK_SIZE;
 import static org.apache.iotdb.tsfile.utils.RamUsageEstimator.NUM_BYTES_ARRAY_HEADER;
 import static org.apache.iotdb.tsfile.utils.RamUsageEstimator.NUM_BYTES_OBJECT_REF;
 
@@ -58,6 +59,8 @@
 
   protected boolean sorted = true;
   protected long maxTime;
+  protected long topKTime;
+  protected int sortCount;
   // record reference count of this tv list
   // currently this reference will only be increase because we can't know when to decrease it
   protected AtomicInteger referenceCount;
@@ -66,7 +69,9 @@
   protected TVList() {
     timestamps = new ArrayList<>();
     rowCount = 0;
+    sortCount = 0;
     maxTime = Long.MIN_VALUE;
+    topKTime = Long.MIN_VALUE;
     referenceCount = new AtomicInteger();
   }
 
@@ -109,6 +114,8 @@
 
   public abstract void sort();
 
+  public abstract void sort(int lo, int hi);
+
   public void increaseReferenceCount() {
     referenceCount.incrementAndGet();
   }
@@ -214,6 +221,10 @@
     throw new UnsupportedOperationException(ERR_DATATYPE_NOT_CONSISTENT);
   }
 
+  public TVList divide() throws TopkDivideMemoryNotEnoughException {
+    throw new UnsupportedOperationException(ERR_DATATYPE_NOT_CONSISTENT);
+  }
+
   public Object getAlignedValue(int index) {
     throw new UnsupportedOperationException(ERR_DATATYPE_NOT_CONSISTENT);
   }
@@ -231,6 +242,10 @@
     return maxTime;
   }
 
+  public long getTopKTime() {
+    return topKTime;
+  }
+
   public long getVersion() {
     return version;
   }
@@ -330,6 +345,7 @@
     for (int i = start; i < end; i++) {
       inPutMinTime = Math.min(inPutMinTime, time[i]);
       maxTime = Math.max(maxTime, time[i]);
+      topKTime = Math.max(topKTime, time[Math.max(0, i - MEMTABLE_TOPK_SIZE)]);
       if (inputSorted && i < length - 1 && time[i] > time[i + 1]) {
         inputSorted = false;
       }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimAlignedTVList.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimAlignedTVList.java
index 7c6fa7f..819b00e 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimAlignedTVList.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimAlignedTVList.java
@@ -50,7 +50,7 @@
           (int[][]) PrimitiveArrayManager.createDataListsByType(TSDataType.INT32, rowCount);
     }
     if (!sorted) {
-      sort(0, rowCount);
+      this.timSort(0, rowCount);
     }
     clearSortedValue();
     clearSortedTime();
@@ -58,7 +58,12 @@
   }
 
   @Override
-  public void tim_set(int src, int dest) {
+  public void sort(int lo, int hi) {
+    timSort(lo, hi);
+  }
+
+  @Override
+  public void timSet(int src, int dest) {
     set(src, dest);
   }
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimBinaryTVList.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimBinaryTVList.java
index 03760fd..fccb64a 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimBinaryTVList.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimBinaryTVList.java
@@ -44,14 +44,19 @@
       sortedValues =
           (Binary[][]) PrimitiveArrayManager.createDataListsByType(TSDataType.TEXT, rowCount);
     }
-    sort(0, rowCount);
+    timSort(0, rowCount);
     clearSortedValue();
     clearSortedTime();
     sorted = true;
   }
 
   @Override
-  public void tim_set(int src, int dest) {
+  public void sort(int lo, int hi) {
+    timSort(lo, hi);
+  }
+
+  @Override
+  public void timSet(int src, int dest) {
     set(src, dest);
   }
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimBooleanTVList.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimBooleanTVList.java
index 22b5647..04d43f3 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimBooleanTVList.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimBooleanTVList.java
@@ -43,7 +43,7 @@
           (boolean[][]) PrimitiveArrayManager.createDataListsByType(TSDataType.BOOLEAN, rowCount);
     }
     if (!sorted) {
-      sort(0, rowCount);
+      timSort(0, rowCount);
     }
     clearSortedValue();
     clearSortedTime();
@@ -51,7 +51,12 @@
   }
 
   @Override
-  public void tim_set(int src, int dest) {
+  public void sort(int lo, int hi) {
+    timSort(lo, hi);
+  }
+
+  @Override
+  public void timSet(int src, int dest) {
     set(src, dest);
   }
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimDoubleTVList.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimDoubleTVList.java
index 3c0aa82..543a4e2 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimDoubleTVList.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimDoubleTVList.java
@@ -43,7 +43,7 @@
           (double[][]) PrimitiveArrayManager.createDataListsByType(TSDataType.DOUBLE, rowCount);
     }
     if (!sorted) {
-      sort(0, rowCount);
+      timSort(0, rowCount);
     }
     clearSortedValue();
     clearSortedTime();
@@ -51,7 +51,12 @@
   }
 
   @Override
-  public void tim_set(int src, int dest) {
+  public void sort(int lo, int hi) {
+    timSort(lo, hi);
+  }
+
+  @Override
+  public void timSet(int src, int dest) {
     set(src, dest);
   }
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimFloatTVList.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimFloatTVList.java
index 2b1fb44..d360959 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimFloatTVList.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimFloatTVList.java
@@ -44,7 +44,7 @@
           (float[][]) PrimitiveArrayManager.createDataListsByType(TSDataType.FLOAT, rowCount);
     }
     if (!sorted) {
-      sort(0, rowCount);
+      timSort(0, rowCount);
     }
     clearSortedValue();
     clearSortedTime();
@@ -52,7 +52,12 @@
   }
 
   @Override
-  public void tim_set(int src, int dest) {
+  public void sort(int lo, int hi) {
+    timSort(lo, hi);
+  }
+
+  @Override
+  public void timSet(int src, int dest) {
     set(src, dest);
   }
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimIntTVList.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimIntTVList.java
index b3ae939..d2f153e 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimIntTVList.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimIntTVList.java
@@ -33,18 +33,22 @@
 
   @Override
   public void sort() {
+    sort(0, rowCount);
+  }
+
+  @Override
+  public void sort(int lo, int hi) {
+    int len = hi - lo;
     if (sortedTimestamps == null
-        || sortedTimestamps.length < PrimitiveArrayManager.getArrayRowCount(rowCount)) {
+        || sortedTimestamps.length < PrimitiveArrayManager.getArrayRowCount(len)) {
       sortedTimestamps =
-          (long[][]) PrimitiveArrayManager.createDataListsByType(TSDataType.INT64, rowCount);
+          (long[][]) PrimitiveArrayManager.createDataListsByType(TSDataType.INT64, len);
     }
-    if (sortedValues == null
-        || sortedValues.length < PrimitiveArrayManager.getArrayRowCount(rowCount)) {
-      sortedValues =
-          (int[][]) PrimitiveArrayManager.createDataListsByType(TSDataType.INT32, rowCount);
+    if (sortedValues == null || sortedValues.length < PrimitiveArrayManager.getArrayRowCount(len)) {
+      sortedValues = (int[][]) PrimitiveArrayManager.createDataListsByType(TSDataType.INT32, len);
     }
     if (!sorted) {
-      sort(0, rowCount);
+      timSort(0, len);
     }
     clearSortedValue();
     clearSortedTime();
@@ -52,7 +56,7 @@
   }
 
   @Override
-  public void tim_set(int src, int dest) {
+  public void timSet(int src, int dest) {
     set(src, dest);
   }
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimLongTVList.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimLongTVList.java
index e3a02e4..9096cd4 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimLongTVList.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimLongTVList.java
@@ -43,7 +43,7 @@
           (long[][]) PrimitiveArrayManager.createDataListsByType(TSDataType.INT64, rowCount);
     }
     if (!sorted) {
-      sort(0, rowCount);
+      timSort(0, rowCount);
     }
     clearSortedValue();
     clearSortedTime();
@@ -51,7 +51,12 @@
   }
 
   @Override
-  public void tim_set(int src, int dest) {
+  public void sort(int lo, int hi) {
+    timSort(lo, hi);
+  }
+
+  @Override
+  public void timSet(int src, int dest) {
     set(src, dest);
   }
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimSort.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimSort.java
index 6358442..0426fda 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimSort.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimSort.java
@@ -27,7 +27,7 @@
   int SMALL_ARRAY_LENGTH = 32;
 
   /** the same as the 'set' function in TVList, the reason is to avoid two equal functions. */
-  void tim_set(int src, int dest);
+  void timSet(int src, int dest);
 
   void setFromSorted(int src, int dest);
 
@@ -55,7 +55,7 @@
    * the entrance of tim_sort; 1. array_size <= 32, use binary sort. 2. recursively invoke merge
    * sort.
    */
-  default void sort(int lo, int hi) {
+  default void timSort(int lo, int hi) {
     if (lo == hi) {
       return;
     }
@@ -65,8 +65,8 @@
       return;
     }
     int mid = (lo + hi) >>> 1;
-    sort(lo, mid);
-    sort(mid, hi);
+    timSort(lo, mid);
+    timSort(mid, hi);
     merge(lo, mid, hi);
   }
 
@@ -127,7 +127,7 @@
        */
       int n = start - left; // The number of elements to move
       for (int i = n; i >= 1; i--) {
-        tim_set(left + i - 1, left + i);
+        timSet(left + i - 1, left + i);
       }
       setPivotTo(left);
     }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TopkDivideMemoryNotEnoughException.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TopkDivideMemoryNotEnoughException.java
new file mode 100644
index 0000000..e1d8d72
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TopkDivideMemoryNotEnoughException.java
@@ -0,0 +1,8 @@
+package org.apache.iotdb.db.utils.datastructure;
+
+public class TopkDivideMemoryNotEnoughException extends Exception {
+
+  public TopkDivideMemoryNotEnoughException(String message) {
+    super(message);
+  }
+}
diff --git a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
index 277e7c3..6e971ec 100644
--- a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
+++ b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
@@ -516,6 +516,11 @@
 # BACKWARD: backward sort
 # tvlist_sort_algorithm=TIM
 
+# The number of points stay in the next tvlist
+# The default waiting size is 0
+# Datatype: int
+# seq_memtable_topk_size=0
+
 # When the average point number of timeseries in memtable exceeds this, the memtable is flushed to disk. The default threshold is 100000.
 # Datatype: int
 # avg_series_point_number_threshold=100000