Limit direct buffer memory cost when creating consensus regions (#12431)
diff --git a/iotdb-core/datanode/src/assembly/resources/sbin/start-datanode.bat b/iotdb-core/datanode/src/assembly/resources/sbin/start-datanode.bat
index 1289011..569374a 100755
--- a/iotdb-core/datanode/src/assembly/resources/sbin/start-datanode.bat
+++ b/iotdb-core/datanode/src/assembly/resources/sbin/start-datanode.bat
@@ -215,6 +215,7 @@
-DTSFILE_HOME="%IOTDB_HOME%"^
-DTSFILE_CONF="%IOTDB_CONF%"^
-DIOTDB_CONF="%IOTDB_CONF%"^
+ -DOFF_HEAP_MEMORY="%OFF_HEAP_MEMORY%"^
-Dsun.jnu.encoding=UTF-8^
-Dfile.encoding=UTF-8
diff --git a/iotdb-core/datanode/src/assembly/resources/sbin/start-datanode.sh b/iotdb-core/datanode/src/assembly/resources/sbin/start-datanode.sh
index 81bc9e1..351f6de 100755
--- a/iotdb-core/datanode/src/assembly/resources/sbin/start-datanode.sh
+++ b/iotdb-core/datanode/src/assembly/resources/sbin/start-datanode.sh
@@ -163,6 +163,7 @@
iotdb_parms="$iotdb_parms -DTSFILE_CONF=${IOTDB_CONF}"
iotdb_parms="$iotdb_parms -Dname=iotdb\.IoTDB"
iotdb_parms="$iotdb_parms -DIOTDB_LOG_DIR=${IOTDB_LOG_DIR}"
+ iotdb_parms="$iotdb_parms -DOFF_HEAP_MEMORY=${OFF_HEAP_MEMORY}"
if [ "x$pidfile" != "x" ]; then
iotdb_parms="$iotdb_parms -Diotdb-pidfile=$pidfile"
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 7c5e852..e46cc2e 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -189,6 +189,9 @@
/** When inserting rejected exceeds this, throw an exception. Unit: millisecond */
private int maxWaitingTimeWhenInsertBlockedInMs = 10000;
+ /** Maximum off-heap memory in bytes, parsed from the OFF_HEAP_MEMORY system property; 0 if absent */
+ private long maxOffHeapMemoryBytes = 0;
+
// region Write Ahead Log Configuration
/** Write mode of wal */
private volatile WALMode walMode = WALMode.ASYNC;
@@ -211,6 +214,9 @@
/** Buffer size of each wal node. Unit: byte */
private int walBufferSize = 32 * 1024 * 1024;
+ /** Proportion of the maximum off-heap memory that region direct buffers (WAL / Ratis log appender) may reserve in total */
+ private double maxWalBufferOffHeapMemorySizeProportion = 0.5;
+
/** Blocking queue capacity of each wal buffer */
private int walBufferQueueCapacity = 500;
@@ -1819,6 +1825,15 @@
this.walBufferSize = walBufferSize;
}
+ public double getMaxWalBufferOffHeapMemorySizeProportion() {
+ return maxWalBufferOffHeapMemorySizeProportion;
+ }
+
+ public void setMaxWalBufferOffHeapMemorySizeProportion(
+ double maxWalBufferOffHeapMemorySizeProportion) {
+ this.maxWalBufferOffHeapMemorySizeProportion = maxWalBufferOffHeapMemorySizeProportion;
+ }
+
public int getWalBufferQueueCapacity() {
return walBufferQueueCapacity;
}
@@ -2597,6 +2612,14 @@
this.maxWaitingTimeWhenInsertBlockedInMs = maxWaitingTimeWhenInsertBlocked;
}
+ public void setMaxOffHeapMemoryBytes(long maxOffHeapMemoryBytes) {
+ this.maxOffHeapMemoryBytes = maxOffHeapMemoryBytes;
+ }
+
+ public long getMaxOffHeapMemoryBytes() {
+ return maxOffHeapMemoryBytes;
+ }
+
public long getSlowQueryThreshold() {
return slowQueryThreshold;
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index d4c600c..690ad97 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -46,6 +46,7 @@
import org.apache.iotdb.db.storageengine.rescon.disk.TierManager;
import org.apache.iotdb.db.storageengine.rescon.memory.SystemInfo;
import org.apache.iotdb.db.utils.DateTimeUtils;
+import org.apache.iotdb.db.utils.MemUtils;
import org.apache.iotdb.db.utils.datastructure.TVListSortAlgorithm;
import org.apache.iotdb.external.api.IPropertiesLoader;
import org.apache.iotdb.metrics.config.MetricConfigDescriptor;
@@ -419,6 +420,9 @@
"max_waiting_time_when_insert_blocked",
Integer.toString(conf.getMaxWaitingTimeWhenInsertBlocked()))));
+ String offHeapMemoryStr = System.getProperty("OFF_HEAP_MEMORY");
+ conf.setMaxOffHeapMemoryBytes(MemUtils.strToBytesCnt(offHeapMemoryStr));
+
conf.setIoTaskQueueSizeForFlushing(
Integer.parseInt(
properties.getProperty(
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java
index b4d05b3..d5b7f58 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java
@@ -85,6 +85,7 @@
import org.apache.iotdb.db.schemaengine.schemaregion.write.req.view.IPreDeleteLogicalViewPlan;
import org.apache.iotdb.db.schemaengine.schemaregion.write.req.view.IRollbackPreDeleteLogicalViewPlan;
import org.apache.iotdb.db.schemaengine.template.Template;
+import org.apache.iotdb.db.storageengine.rescon.memory.SystemInfo;
import org.apache.iotdb.db.utils.SchemaUtils;
import org.apache.tsfile.enums.TSDataType;
@@ -190,6 +191,18 @@
return;
}
+ if (config.getSchemaRegionConsensusProtocolClass().equals(ConsensusFactory.RATIS_CONSENSUS)) {
+ long memCost = config.getSchemaRatisConsensusLogAppenderBufferSizeMax();
+ if (!SystemInfo.getInstance()
+ .addDirectBufferMemoryCost(config.getSchemaRatisConsensusLogAppenderBufferSizeMax())) {
+ throw new MetadataException(
+ "Total allocated memory for direct buffer will be "
+ + (SystemInfo.getInstance().getDirectBufferMemoryCost() + memCost)
+ + ", which is greater than limit mem cost: "
+ + SystemInfo.getInstance().getTotalDirectBufferMemorySizeLimit());
+ }
+ }
+
initDir();
try {
@@ -403,6 +416,10 @@
// delete all the schema region files
SchemaRegionUtils.deleteSchemaRegionFolder(schemaRegionDirPath, logger);
+ if (config.getSchemaRegionConsensusProtocolClass().equals(ConsensusFactory.RATIS_CONSENSUS)) {
+ SystemInfo.getInstance()
+ .decreaseDirectBufferMemoryCost(config.getSchemaRatisConsensusLogAppenderBufferSizeMax());
+ }
}
// currently, this method is only used for cluster-ratis mode
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionPBTreeImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionPBTreeImpl.java
index ec16026..ad3ee7a 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionPBTreeImpl.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionPBTreeImpl.java
@@ -85,6 +85,7 @@
import org.apache.iotdb.db.schemaengine.schemaregion.write.req.view.IAlterLogicalViewPlan;
import org.apache.iotdb.db.schemaengine.schemaregion.write.req.view.ICreateLogicalViewPlan;
import org.apache.iotdb.db.schemaengine.template.Template;
+import org.apache.iotdb.db.storageengine.rescon.memory.SystemInfo;
import org.apache.iotdb.db.utils.SchemaUtils;
import org.apache.tsfile.enums.TSDataType;
@@ -187,6 +188,18 @@
return;
}
+ if (config.getSchemaRegionConsensusProtocolClass().equals(ConsensusFactory.RATIS_CONSENSUS)) {
+ long memCost = config.getSchemaRatisConsensusLogAppenderBufferSizeMax();
+ if (!SystemInfo.getInstance()
+ .addDirectBufferMemoryCost(config.getSchemaRatisConsensusLogAppenderBufferSizeMax())) {
+ throw new MetadataException(
+ "Total allocated memory for direct buffer will be "
+ + (SystemInfo.getInstance().getDirectBufferMemoryCost() + memCost)
+ + ", which is greater than limit mem cost: "
+ + SystemInfo.getInstance().getTotalDirectBufferMemorySizeLimit());
+ }
+ }
+
initDir();
try {
@@ -462,6 +475,11 @@
// delete all the schema region files
SchemaRegionUtils.deleteSchemaRegionFolder(schemaRegionDirPath, logger);
+
+ if (config.getSchemaRegionConsensusProtocolClass().equals(ConsensusFactory.RATIS_CONSENSUS)) {
+ SystemInfo.getInstance()
+ .decreaseDirectBufferMemoryCost(config.getSchemaRatisConsensusLogAppenderBufferSizeMax());
+ }
}
@Override
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index d193c9f..1ba6c47 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -97,6 +97,7 @@
import org.apache.iotdb.db.storageengine.dataregion.wal.utils.listener.WALFlushListener;
import org.apache.iotdb.db.storageengine.dataregion.wal.utils.listener.WALRecoverListener;
import org.apache.iotdb.db.storageengine.rescon.disk.TierManager;
+import org.apache.iotdb.db.storageengine.rescon.memory.SystemInfo;
import org.apache.iotdb.db.storageengine.rescon.memory.TimePartitionInfo;
import org.apache.iotdb.db.storageengine.rescon.memory.TimePartitionManager;
import org.apache.iotdb.db.storageengine.rescon.memory.TsFileResourceManager;
@@ -292,6 +293,8 @@
*/
private String insertWriteLockHolder = "";
+ private volatile long directBufferMemoryCost = 0;
+
private final AtomicBoolean isCompactionSelecting = new AtomicBoolean(false);
private static final QueryResourceMetricSet QUERY_RESOURCE_METRIC_SET =
@@ -314,6 +317,7 @@
this.dataRegionId = dataRegionId;
this.databaseName = databaseName;
this.fileFlushPolicy = fileFlushPolicy;
+ acquireDirectBufferMemory();
storageGroupSysDir = SystemFileFactory.INSTANCE.getFile(systemDir, dataRegionId);
this.tsFileManager =
@@ -3456,12 +3460,38 @@
writeLock("markDeleted");
try {
deleted = true;
+ releaseDirectBufferMemory();
deletedCondition.signalAll();
} finally {
writeUnlock();
}
}
+  /** Reserves this region's direct buffer budget for the configured consensus protocol. */
+  private void acquireDirectBufferMemory() throws DataRegionException {
+    final String protocolClass = config.getDataRegionConsensusProtocolClass();
+    long requiredBytes = 0;
+    if (protocolClass.equals(ConsensusFactory.IOT_CONSENSUS)) {
+      requiredBytes = config.getWalBufferSize();
+    } else if (protocolClass.equals(ConsensusFactory.RATIS_CONSENSUS)) {
+      requiredBytes = config.getDataRatisConsensusLogAppenderBufferSizeMax();
+    }
+    if (!SystemInfo.getInstance().addDirectBufferMemoryCost(requiredBytes)) {
+      throw new DataRegionException(
+          "Total allocated memory for direct buffer will be "
+              + (SystemInfo.getInstance().getDirectBufferMemoryCost() + requiredBytes)
+              + ", which is greater than limit mem cost: "
+              + SystemInfo.getInstance().getTotalDirectBufferMemorySizeLimit());
+    }
+    this.directBufferMemoryCost = requiredBytes;
+  }
+
+ private void releaseDirectBufferMemory() {
+ SystemInfo.getInstance().decreaseDirectBufferMemoryCost(directBufferMemoryCost);
+ // reset to 0 so that a repeated release cannot give the same reservation back twice
+ this.directBufferMemoryCost = 0;
+ }
+
/* Be careful, the thread that calls this method may not hold the write lock!!*/
public void degradeFlushTimeMap(long timePartitionId) {
lastFlushTimeMap.degradeLastFlushTime(timePartitionId);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/SystemInfo.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/SystemInfo.java
index 51e9fd0e..09bc8d9 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/SystemInfo.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/SystemInfo.java
@@ -53,9 +53,11 @@
private long memorySizeForMemtable;
private long memorySizeForCompaction;
+ private long totalDirectBufferMemorySizeLimit;
private Map<DataRegionInfo, Long> reportedStorageGroupMemCostMap = new HashMap<>();
private long flushingMemTablesCost = 0L;
+ private final AtomicLong directBufferMemoryCost = new AtomicLong(0);
private final AtomicLong compactionMemoryCost = new AtomicLong(0L);
private final AtomicLong seqInnerSpaceCompactionMemoryCost = new AtomicLong(0L);
private final AtomicLong unseqInnerSpaceCompactionMemoryCost = new AtomicLong(0L);
@@ -192,6 +194,30 @@
this.flushingMemTablesCost -= flushingMemTableCost;
}
+  /** Tries to reserve {@code size} bytes of direct buffer memory; false if over the limit. */
+  public boolean addDirectBufferMemoryCost(long size) {
+    long reserved;
+    do {
+      reserved = directBufferMemoryCost.get();
+      if (reserved + size > totalDirectBufferMemorySizeLimit) {
+        return false;
+      }
+    } while (!directBufferMemoryCost.compareAndSet(reserved, reserved + size));
+    return true;
+  }
+
+ public void decreaseDirectBufferMemoryCost(long size) {
+ directBufferMemoryCost.addAndGet(-size);
+ }
+
+ public long getTotalDirectBufferMemorySizeLimit() {
+ return totalDirectBufferMemorySizeLimit;
+ }
+
+ public long getDirectBufferMemoryCost() {
+ return directBufferMemoryCost.get();
+ }
+
public boolean addCompactionFileNum(int fileNum, long timeOutInSecond)
throws InterruptedException, CompactionFileCountExceededException {
if (fileNum > totalFileLimitForCompactionTask) {
@@ -352,6 +378,14 @@
}
public void allocateWriteMemory() {
+ // when we can't get the OffHeapMemory variable from environment, it will be 0
+ // and the limit should not be effective
+ totalDirectBufferMemorySizeLimit =
+ config.getMaxOffHeapMemoryBytes() == 0
+ ? Long.MAX_VALUE
+ : (long)
+ (config.getMaxOffHeapMemoryBytes()
+ * config.getMaxWalBufferOffHeapMemorySizeProportion());
memorySizeForMemtable =
(long)
(config.getAllocateMemoryForStorageEngine() * config.getWriteProportionForMemtable());
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/MemUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/MemUtils.java
index faf8c4d..f6a0143 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/MemUtils.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/MemUtils.java
@@ -201,4 +201,24 @@
cnt = cnt % IoTDBConstant.KB;
return gbs + " GB " + mbs + " MB " + kbs + " KB " + cnt + " B";
}
+
+  /** Parses a size such as "512m", "2GB" or "1024" into bytes; null or digit-free text gives 0. */
+  public static long strToBytesCnt(String str) {
+    if (str == null) {
+      return 0;
+    }
+    String text = str.trim().toLowerCase();
+    long unit = 1;
+    if (text.endsWith("k") || text.endsWith("kb")) {
+      unit = IoTDBConstant.KB;
+    } else if (text.endsWith("m") || text.endsWith("mb")) {
+      unit = IoTDBConstant.MB;
+    } else if (text.endsWith("g") || text.endsWith("gb")) {
+      unit = IoTDBConstant.GB;
+    }
+    String digits = text.replaceAll("\\D", "");
+    // A launcher passing -DOFF_HEAP_MEMORY= with an unset variable delivers "" (not null) here;
+    // report 0 ("not configured") instead of letting Long.parseLong("") abort with an exception.
+    return digits.isEmpty() ? 0 : Long.parseLong(digits) * unit;
+  }
}