Limit direct buffer memory cost when creating consensus regions (#12431)

diff --git a/iotdb-core/datanode/src/assembly/resources/sbin/start-datanode.bat b/iotdb-core/datanode/src/assembly/resources/sbin/start-datanode.bat
index 1289011..569374a 100755
--- a/iotdb-core/datanode/src/assembly/resources/sbin/start-datanode.bat
+++ b/iotdb-core/datanode/src/assembly/resources/sbin/start-datanode.bat
@@ -215,6 +215,7 @@
  -DTSFILE_HOME="%IOTDB_HOME%"^
  -DTSFILE_CONF="%IOTDB_CONF%"^
  -DIOTDB_CONF="%IOTDB_CONF%"^
+ -DOFF_HEAP_MEMORY="%OFF_HEAP_MEMORY%"^
  -Dsun.jnu.encoding=UTF-8^
  -Dfile.encoding=UTF-8
 
diff --git a/iotdb-core/datanode/src/assembly/resources/sbin/start-datanode.sh b/iotdb-core/datanode/src/assembly/resources/sbin/start-datanode.sh
index 81bc9e1..351f6de 100755
--- a/iotdb-core/datanode/src/assembly/resources/sbin/start-datanode.sh
+++ b/iotdb-core/datanode/src/assembly/resources/sbin/start-datanode.sh
@@ -163,6 +163,7 @@
 	iotdb_parms="$iotdb_parms -DTSFILE_CONF=${IOTDB_CONF}"
 	iotdb_parms="$iotdb_parms -Dname=iotdb\.IoTDB"
 	iotdb_parms="$iotdb_parms -DIOTDB_LOG_DIR=${IOTDB_LOG_DIR}"
+	iotdb_parms="$iotdb_parms -DOFF_HEAP_MEMORY=${OFF_HEAP_MEMORY}"
 
 	  if [ "x$pidfile" != "x" ]; then
        iotdb_parms="$iotdb_parms -Diotdb-pidfile=$pidfile"
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 7c5e852..e46cc2e 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -189,6 +189,9 @@
   /** When inserting rejected exceeds this, throw an exception. Unit: millisecond */
   private int maxWaitingTimeWhenInsertBlockedInMs = 10000;
 
+  /** Off-heap memory in bytes, parsed from the OFF_HEAP_MEMORY system property; 0 if unset. */
+  private long maxOffHeapMemoryBytes = 0;
+
   // region Write Ahead Log Configuration
   /** Write mode of wal */
   private volatile WALMode walMode = WALMode.ASYNC;
@@ -211,6 +214,9 @@
   /** Buffer size of each wal node. Unit: byte */
   private int walBufferSize = 32 * 1024 * 1024;
 
+  /** Fraction of off-heap memory that WAL and consensus direct buffers may use in total. */
+  private double maxWalBufferOffHeapMemorySizeProportion = 0.5;
+
   /** Blocking queue capacity of each wal buffer */
   private int walBufferQueueCapacity = 500;
 
@@ -1819,6 +1825,15 @@
     this.walBufferSize = walBufferSize;
   }
 
+  public double getMaxWalBufferOffHeapMemorySizeProportion() {
+    return maxWalBufferOffHeapMemorySizeProportion; // share of off-heap memory for direct buffers
+  }
+
+  public void setMaxWalBufferOffHeapMemorySizeProportion(
+      double maxWalBufferOffHeapMemorySizeProportion) { // stored as-is, no range check
+    this.maxWalBufferOffHeapMemorySizeProportion = maxWalBufferOffHeapMemorySizeProportion;
+  }
+
   public int getWalBufferQueueCapacity() {
     return walBufferQueueCapacity;
   }
@@ -2597,6 +2612,14 @@
     this.maxWaitingTimeWhenInsertBlockedInMs = maxWaitingTimeWhenInsertBlocked;
   }
 
+  public void setMaxOffHeapMemoryBytes(long maxOffHeapMemoryBytes) {
+    this.maxOffHeapMemoryBytes = maxOffHeapMemoryBytes; // stored as-is, no validation
+  }
+
+  public long getMaxOffHeapMemoryBytes() {
+    return maxOffHeapMemoryBytes; // field default is 0 until a value is set
+  }
+
   public long getSlowQueryThreshold() {
     return slowQueryThreshold;
   }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index d4c600c..690ad97 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -46,6 +46,7 @@
 import org.apache.iotdb.db.storageengine.rescon.disk.TierManager;
 import org.apache.iotdb.db.storageengine.rescon.memory.SystemInfo;
 import org.apache.iotdb.db.utils.DateTimeUtils;
+import org.apache.iotdb.db.utils.MemUtils;
 import org.apache.iotdb.db.utils.datastructure.TVListSortAlgorithm;
 import org.apache.iotdb.external.api.IPropertiesLoader;
 import org.apache.iotdb.metrics.config.MetricConfigDescriptor;
@@ -419,6 +420,9 @@
                 "max_waiting_time_when_insert_blocked",
                 Integer.toString(conf.getMaxWaitingTimeWhenInsertBlocked()))));
 
+    String offHeapMemoryStr = System.getProperty("OFF_HEAP_MEMORY");
+    conf.setMaxOffHeapMemoryBytes(MemUtils.strToBytesCnt(offHeapMemoryStr));
+
     conf.setIoTaskQueueSizeForFlushing(
         Integer.parseInt(
             properties.getProperty(
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java
index b4d05b3..d5b7f58 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java
@@ -85,6 +85,7 @@
 import org.apache.iotdb.db.schemaengine.schemaregion.write.req.view.IPreDeleteLogicalViewPlan;
 import org.apache.iotdb.db.schemaengine.schemaregion.write.req.view.IRollbackPreDeleteLogicalViewPlan;
 import org.apache.iotdb.db.schemaengine.template.Template;
+import org.apache.iotdb.db.storageengine.rescon.memory.SystemInfo;
 import org.apache.iotdb.db.utils.SchemaUtils;
 
 import org.apache.tsfile.enums.TSDataType;
@@ -190,6 +191,18 @@
       return;
     }
 
+    if (config.getSchemaRegionConsensusProtocolClass().equals(ConsensusFactory.RATIS_CONSENSUS)) {
+      long memCost = config.getSchemaRatisConsensusLogAppenderBufferSizeMax();
+      if (!SystemInfo.getInstance()
+          .addDirectBufferMemoryCost(config.getSchemaRatisConsensusLogAppenderBufferSizeMax())) {
+        throw new MetadataException(
+            "Total allocated memory for direct buffer will be "
+                + (SystemInfo.getInstance().getDirectBufferMemoryCost() + memCost)
+                + ", which is greater than limit mem cost: "
+                + SystemInfo.getInstance().getTotalDirectBufferMemorySizeLimit());
+      }
+    }
+
     initDir();
 
     try {
@@ -403,6 +416,10 @@
 
     // delete all the schema region files
     SchemaRegionUtils.deleteSchemaRegionFolder(schemaRegionDirPath, logger);
+    if (config.getSchemaRegionConsensusProtocolClass().equals(ConsensusFactory.RATIS_CONSENSUS)) {
+      SystemInfo.getInstance()
+          .decreaseDirectBufferMemoryCost(config.getSchemaRatisConsensusLogAppenderBufferSizeMax());
+    }
   }
 
   // currently, this method is only used for cluster-ratis mode
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionPBTreeImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionPBTreeImpl.java
index ec16026..ad3ee7a 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionPBTreeImpl.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionPBTreeImpl.java
@@ -85,6 +85,7 @@
 import org.apache.iotdb.db.schemaengine.schemaregion.write.req.view.IAlterLogicalViewPlan;
 import org.apache.iotdb.db.schemaengine.schemaregion.write.req.view.ICreateLogicalViewPlan;
 import org.apache.iotdb.db.schemaengine.template.Template;
+import org.apache.iotdb.db.storageengine.rescon.memory.SystemInfo;
 import org.apache.iotdb.db.utils.SchemaUtils;
 
 import org.apache.tsfile.enums.TSDataType;
@@ -187,6 +188,18 @@
       return;
     }
 
+    if (config.getSchemaRegionConsensusProtocolClass().equals(ConsensusFactory.RATIS_CONSENSUS)) {
+      long memCost = config.getSchemaRatisConsensusLogAppenderBufferSizeMax();
+      if (!SystemInfo.getInstance()
+          .addDirectBufferMemoryCost(config.getSchemaRatisConsensusLogAppenderBufferSizeMax())) {
+        throw new MetadataException(
+            "Total allocated memory for direct buffer will be "
+                + (SystemInfo.getInstance().getDirectBufferMemoryCost() + memCost)
+                + ", which is greater than limit mem cost: "
+                + SystemInfo.getInstance().getTotalDirectBufferMemorySizeLimit());
+      }
+    }
+
     initDir();
 
     try {
@@ -462,6 +475,11 @@
 
     // delete all the schema region files
     SchemaRegionUtils.deleteSchemaRegionFolder(schemaRegionDirPath, logger);
+
+    if (config.getSchemaRegionConsensusProtocolClass().equals(ConsensusFactory.RATIS_CONSENSUS)) {
+      SystemInfo.getInstance()
+          .decreaseDirectBufferMemoryCost(config.getSchemaRatisConsensusLogAppenderBufferSizeMax());
+    }
   }
 
   @Override
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index d193c9f..1ba6c47 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -97,6 +97,7 @@
 import org.apache.iotdb.db.storageengine.dataregion.wal.utils.listener.WALFlushListener;
 import org.apache.iotdb.db.storageengine.dataregion.wal.utils.listener.WALRecoverListener;
 import org.apache.iotdb.db.storageengine.rescon.disk.TierManager;
+import org.apache.iotdb.db.storageengine.rescon.memory.SystemInfo;
 import org.apache.iotdb.db.storageengine.rescon.memory.TimePartitionInfo;
 import org.apache.iotdb.db.storageengine.rescon.memory.TimePartitionManager;
 import org.apache.iotdb.db.storageengine.rescon.memory.TsFileResourceManager;
@@ -292,6 +293,8 @@
    */
   private String insertWriteLockHolder = "";
 
+  private volatile long directBufferMemoryCost = 0;
+
   private final AtomicBoolean isCompactionSelecting = new AtomicBoolean(false);
 
   private static final QueryResourceMetricSet QUERY_RESOURCE_METRIC_SET =
@@ -314,6 +317,7 @@
     this.dataRegionId = dataRegionId;
     this.databaseName = databaseName;
     this.fileFlushPolicy = fileFlushPolicy;
+    acquireDirectBufferMemory();
 
     storageGroupSysDir = SystemFileFactory.INSTANCE.getFile(systemDir, dataRegionId);
     this.tsFileManager =
@@ -3456,12 +3460,48 @@
     writeLock("markDeleted");
     try {
       deleted = true;
+      releaseDirectBufferMemory();
       deletedCondition.signalAll();
     } finally {
       writeUnlock();
     }
   }
 
+  /**
+   * Reserves direct buffer quota for this region through SystemInfo: the WAL buffer size when
+   * the data region protocol is IOT_CONSENSUS, the Ratis log appender buffer size when it is
+   * RATIS_CONSENSUS, and 0 for any other protocol.
+   *
+   * @throws DataRegionException if SystemInfo rejects the reservation
+   */
+  private void acquireDirectBufferMemory() throws DataRegionException {
+    long requested = 0;
+    if (config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.IOT_CONSENSUS)) {
+      requested = config.getWalBufferSize();
+    } else if (config
+        .getDataRegionConsensusProtocolClass()
+        .equals(ConsensusFactory.RATIS_CONSENSUS)) {
+      requested = config.getDataRatisConsensusLogAppenderBufferSizeMax();
+    }
+    final SystemInfo systemInfo = SystemInfo.getInstance();
+    if (!systemInfo.addDirectBufferMemoryCost(requested)) {
+      throw new DataRegionException(
+          "Total allocated memory for direct buffer will be "
+              + (systemInfo.getDirectBufferMemoryCost() + requested)
+              + ", which is greater than limit mem cost: "
+              + systemInfo.getTotalDirectBufferMemorySizeLimit());
+    }
+    this.directBufferMemoryCost = requested;
+  }
+
+  /** Gives back the quota recorded by acquireDirectBufferMemory(). */
+  private void releaseDirectBufferMemory() {
+    final long reserved = directBufferMemoryCost;
+    SystemInfo.getInstance().decreaseDirectBufferMemoryCost(reserved);
+    // zero the record so that a repeated call gives back nothing the second time
+    this.directBufferMemoryCost = 0;
+  }
+
   /* Be careful, the thread that calls this method may not hold the write lock!!*/
   public void degradeFlushTimeMap(long timePartitionId) {
     lastFlushTimeMap.degradeLastFlushTime(timePartitionId);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/SystemInfo.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/SystemInfo.java
index 51e9fd0e..09bc8d9 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/SystemInfo.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/SystemInfo.java
@@ -53,9 +53,11 @@
 
   private long memorySizeForMemtable;
   private long memorySizeForCompaction;
+  private long totalDirectBufferMemorySizeLimit;
   private Map<DataRegionInfo, Long> reportedStorageGroupMemCostMap = new HashMap<>();
 
   private long flushingMemTablesCost = 0L;
+  private final AtomicLong directBufferMemoryCost = new AtomicLong(0);
   private final AtomicLong compactionMemoryCost = new AtomicLong(0L);
   private final AtomicLong seqInnerSpaceCompactionMemoryCost = new AtomicLong(0L);
   private final AtomicLong unseqInnerSpaceCompactionMemoryCost = new AtomicLong(0L);
@@ -192,6 +194,30 @@
     this.flushingMemTablesCost -= flushingMemTableCost;
   }
 
+  /** Tries to reserve {@code size} bytes of direct buffer quota; false when over the limit. */
+  public boolean addDirectBufferMemoryCost(long size) {
+    long current;
+    do {
+      current = directBufferMemoryCost.get();
+      if (current + size > totalDirectBufferMemorySizeLimit) {
+        return false;
+      }
+    } while (!directBufferMemoryCost.compareAndSet(current, current + size));
+    return true;
+  }
+
+  public void decreaseDirectBufferMemoryCost(long size) {
+    directBufferMemoryCost.addAndGet(-size); // unchecked: the counter can go below zero
+  }
+
+  public long getTotalDirectBufferMemorySizeLimit() {
+    return totalDirectBufferMemorySizeLimit; // assigned in allocateWriteMemory()
+  }
+
+  public long getDirectBufferMemoryCost() {
+    return directBufferMemoryCost.get(); // total currently reserved via addDirectBufferMemoryCost
+  }
+
   public boolean addCompactionFileNum(int fileNum, long timeOutInSecond)
       throws InterruptedException, CompactionFileCountExceededException {
     if (fileNum > totalFileLimitForCompactionTask) {
@@ -352,6 +378,14 @@
   }
 
   public void allocateWriteMemory() {
+    // when we can't get the OffHeapMemory variable from environment, it will be 0
+    // and the limit should not be effective
+    totalDirectBufferMemorySizeLimit =
+        config.getMaxOffHeapMemoryBytes() == 0
+            ? Long.MAX_VALUE
+            : (long)
+                (config.getMaxOffHeapMemoryBytes()
+                    * config.getMaxWalBufferOffHeapMemorySizeProportion());
     memorySizeForMemtable =
         (long)
             (config.getAllocateMemoryForStorageEngine() * config.getWriteProportionForMemtable());
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/MemUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/MemUtils.java
index faf8c4d..f6a0143 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/MemUtils.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/MemUtils.java
@@ -201,4 +201,38 @@
     cnt = cnt % IoTDBConstant.KB;
     return gbs + " GB " + mbs + " MB " + kbs + " KB " + cnt + " B";
   }
+
+  /**
+   * Parses a memory size such as {@code "512"}, {@code "64k"}, {@code "128MB"} or {@code "2g"}
+   * into bytes. Matching ignores case and surrounding whitespace, the trailing {@code b} is
+   * optional, and a value without a k/m/g unit is taken as plain bytes. Only the digits of the
+   * value are used, so fractional sizes are not supported.
+   *
+   * @param str the textual size, may be {@code null}
+   * @return the size in bytes, or 0 when {@code str} is {@code null} or contains no digits
+   */
+  public static long strToBytesCnt(String str) {
+    if (str == null) {
+      return 0;
+    }
+    str = str.trim().toLowerCase();
+    if (!str.endsWith("b")) {
+      str += "b";
+    }
+    long unit = 1;
+    if (str.endsWith("kb")) {
+      unit = IoTDBConstant.KB;
+    } else if (str.endsWith("mb")) {
+      unit = IoTDBConstant.MB;
+    } else if (str.endsWith("gb")) {
+      unit = IoTDBConstant.GB;
+    }
+    str = str.replaceAll("\\D", "");
+    if (str.isEmpty()) {
+      // The start scripts pass -DOFF_HEAP_MEMORY= with an empty value when the variable is unset;
+      // treat that like a missing property instead of failing in Long.parseLong("").
+      return 0;
+    }
+    return Long.parseLong(str) * unit;
+  }
 }