Lift the storage limit for tag and attribute management (#12447)

diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java
index 496e9e5..f6f291a 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java
@@ -426,6 +426,24 @@
     return this;
   }
 
+  @Override
+  public CommonConfig setTagAttributeTotalSize(int tagAttributeTotalSize) {
+    setProperty("tag_attribute_total_size", String.valueOf(tagAttributeTotalSize));
+    return this;
+  }
+
+  @Override
+  public CommonConfig setTagAttributeMaxNum(int tagAttributeMaxNum) {
+    setProperty("tag_attribute_max_num", String.valueOf(tagAttributeMaxNum));
+    return this;
+  }
+
+  @Override
+  public CommonConfig setTagAttributeEntryMaxSize(int tagAttributeEntryMaxSize) {
+    setProperty("tag_attribute_entry_max_size", String.valueOf(tagAttributeEntryMaxSize));
+    return this;
+  }
+
   // For part of the log directory
   public String getClusterConfigStr() {
     return fromConsensusFullNameToAbbr(properties.getProperty(CONFIG_NODE_CONSENSUS_PROTOCOL_CLASS))
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java
index cfbe181..fd1d7b2 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java
@@ -431,4 +431,25 @@
     cnConfig.setWalMode(walMode);
     return this;
   }
+
+  @Override
+  public CommonConfig setTagAttributeTotalSize(int tagAttributeTotalSize) {
+    dnConfig.setTagAttributeTotalSize(tagAttributeTotalSize);
+    cnConfig.setTagAttributeTotalSize(tagAttributeTotalSize);
+    return this;
+  }
+
+  @Override
+  public CommonConfig setTagAttributeMaxNum(int tagAttributeMaxNum) {
+    dnConfig.setTagAttributeMaxNum(tagAttributeMaxNum);
+    cnConfig.setTagAttributeMaxNum(tagAttributeMaxNum);
+    return this;
+  }
+
+  @Override
+  public CommonConfig setTagAttributeEntryMaxSize(int tagAttributeEntryMaxSize) {
+    dnConfig.setTagAttributeEntryMaxSize(tagAttributeEntryMaxSize);
+    cnConfig.setTagAttributeEntryMaxSize(tagAttributeEntryMaxSize);
+    return this;
+  }
 }
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java
index 0495edb..665e59e 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java
@@ -303,4 +303,19 @@
   public CommonConfig setWalMode(String walMode) {
     return this;
   }
+
+  @Override
+  public CommonConfig setTagAttributeTotalSize(int tagAttributeTotalSize) {
+    return this;
+  }
+
+  @Override
+  public CommonConfig setTagAttributeMaxNum(int tagAttributeMaxNum) {
+    return this;
+  }
+
+  @Override
+  public CommonConfig setTagAttributeEntryMaxSize(int tagAttributeEntryMaxSize) {
+    return this;
+  }
 }
diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
index fa0e699..28c37f1 100644
--- a/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
+++ b/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
@@ -135,4 +135,10 @@
   CommonConfig setDriverTaskExecutionTimeSliceInMs(long driverTaskExecutionTimeSliceInMs);
 
   CommonConfig setWalMode(String walMode);
+
+  CommonConfig setTagAttributeTotalSize(int tagAttributeTotalSize);
+
+  CommonConfig setTagAttributeMaxNum(int tagAttributeMaxNum);
+
+  CommonConfig setTagAttributeEntryMaxSize(int tagAttributeEntryMaxSize);
 }
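
The three new hooks let a test lower the limits before cluster startup. A minimal usage sketch, mirroring the setUp of the new IoTDBTagLimitIT below (the values 50/5/30 are that test's choices, not shipped defaults):

```java
// shrink the tag/attribute limits for a test class, then boot the cluster
EnvFactory.getEnv().getConfig().getCommonConfig().setTagAttributeTotalSize(50);
EnvFactory.getEnv().getConfig().getCommonConfig().setTagAttributeMaxNum(5);
EnvFactory.getEnv().getConfig().getCommonConfig().setTagAttributeEntryMaxSize(30);
EnvFactory.getEnv().initClusterEnvironment();
```
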
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/schema/IoTDBTagLimitIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/schema/IoTDBTagLimitIT.java
new file mode 100644
index 0000000..5a9f7d5
--- /dev/null
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/schema/IoTDBTagLimitIT.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.it.schema;
+
+import org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant;
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.ClusterIT;
+import org.apache.iotdb.itbase.category.LocalStandaloneIT;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.util.AbstractSchemaIT;
+
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.iotdb.db.it.utils.TestUtils.prepareData;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({LocalStandaloneIT.class, ClusterIT.class})
+public class IoTDBTagLimitIT extends AbstractSchemaIT {
+
+  public IoTDBTagLimitIT(SchemaTestMode schemaTestMode) {
+    super(schemaTestMode);
+  }
+
+  protected static final String[] SQLs =
+      new String[] {
+        "create database root.turbine",
+      };
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    EnvFactory.getEnv().getConfig().getCommonConfig().setTagAttributeTotalSize(50);
+    EnvFactory.getEnv().getConfig().getCommonConfig().setTagAttributeMaxNum(5);
+    EnvFactory.getEnv().getConfig().getCommonConfig().setTagAttributeEntryMaxSize(30);
+    EnvFactory.getEnv().initClusterEnvironment();
+    prepareData(SQLs);
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    EnvFactory.getEnv().cleanClusterEnvironment();
+  }
+
+  @Test
+  public void testBasicFunctionInMultiTagAttributeTotalSize() throws Exception {
+    List<List<String>> rets = new ArrayList<>();
+    List<String> ret1 =
+        Collections.singletonList(
+            "root.turbine.d1.s1,temperature,root.turbine,FLOAT,RLE,SNAPPY,"
+                + "{\"oldTag1\":\"oldV1\",\"tag2\":\"v2\"},{\"att2\":\"a2\",\"att1\":\"a1\"}");
+    rets.add(ret1);
+    List<String> ret2 =
+        Collections.singletonList(
+            "root.turbine.d1.s1,temperature,root.turbine,FLOAT,RLE,SNAPPY,"
+                + "{\"tag1\":\"oldV1\",\"tag2\":\"v2\"},{\"att2\":\"a2\",\"att1\":\"a1\"}");
+    rets.add(ret2);
+    List<String> ret3 =
+        Collections.singletonList(
+            "root.turbine.d1.s1,temperature,root.turbine,FLOAT,RLE,SNAPPY,"
+                + "{\"tag1\":\"v1\",\"tag2\":\"v2\"},{\"att2\":\"a2\",\"att1\":\"a1\"}");
+    rets.add(ret3);
+    List<String> ret4 =
+        Collections.singletonList(
+            "root.turbine.d1.s1,temperature,root.turbine,FLOAT,RLE,SNAPPY,"
+                + "{\"tag1\":\"v1\",\"tag4\":\"v4\",\"tag5\":\"v5\",\"tag2\":\"v2\",\"tag3\":\"v3\"},{\"att2\":\"a2\",\"att1\":\"a1\"}");
+    rets.add(ret4);
+    List<String> ret5 =
+        Collections.singletonList(
+            "root.turbine.d1.s1,temperature,root.turbine,FLOAT,RLE,SNAPPY,"
+                + "{\"tag4\":\"v4\",\"tag1\":\"v1\",\"tag5\":\"v5\"},{\"att2\":\"a2\",\"att1\":\"a1\"}");
+    rets.add(ret5);
+
+    List<String> sqls = new ArrayList<>();
+    String sql1 =
+        "create timeseries root.turbine.d1.s1(temperature) with datatype=FLOAT, encoding=RLE, compression=SNAPPY "
+            + "tags('oldTag1'='oldV1', 'tag2'='v2') "
+            + "attributes('att1'='a1', 'att2'='a2')";
+    sqls.add(sql1);
+    String sql2 = "ALTER timeseries root.turbine.d1.s1 RENAME oldTag1 TO tag1";
+    sqls.add(sql2);
+    String sql3 = "ALTER timeseries root.turbine.d1.s1 SET tag1=v1";
+    sqls.add(sql3);
+    String sql4 = "ALTER timeseries root.turbine.d1.s1 ADD TAGS tag3=v3, tag4=v4, tag5=v5";
+    sqls.add(sql4);
+    String sql5 = "ALTER timeseries root.turbine.d1.s1 DROP tag2, tag3";
+    sqls.add(sql5);
+
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      for (int i = 0; i < sqls.size(); i++) {
+        statement.execute(sqls.get(i));
+        ResultSet resultSet = statement.executeQuery("show timeseries");
+        int count = 0;
+        try {
+          while (resultSet.next()) {
+            String ans =
+                resultSet.getString(ColumnHeaderConstant.TIMESERIES)
+                    + ","
+                    + resultSet.getString(ColumnHeaderConstant.ALIAS)
+                    + ","
+                    + resultSet.getString(ColumnHeaderConstant.DATABASE)
+                    + ","
+                    + resultSet.getString(ColumnHeaderConstant.DATATYPE)
+                    + ","
+                    + resultSet.getString(ColumnHeaderConstant.ENCODING)
+                    + ","
+                    + resultSet.getString(ColumnHeaderConstant.COMPRESSION)
+                    + ","
+                    + resultSet.getString(ColumnHeaderConstant.TAGS)
+                    + ","
+                    + resultSet.getString(ColumnHeaderConstant.ATTRIBUTES);
+            assertTrue(rets.get(i).contains(ans));
+            count++;
+          }
+        } finally {
+          resultSet.close();
+        }
+        assertEquals(rets.get(i).size(), count);
+      }
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail();
+    }
+    clearSchema();
+  }
+
+  @Test
+  public void testMapSizeAndEntryLimit() throws Exception {
+    List<String> sqls = new ArrayList<>();
+    String sql1 =
+        "create timeseries root.turbine.d1.s1 with datatype=FLOAT, encoding=RLE, compression=SNAPPY "
+            + "tags('tag1'='v1', 'tag2'='v2', 'tag3'='v3', 'tag4'='v4', 'tag5'='v5', 'tag6'='v6') "
+            + "attributes('att1'='a1', 'att2'='a2')";
+    sqls.add(sql1);
+    String sql2 =
+        "create timeseries root.turbine.d1.s2 with datatype=FLOAT, encoding=RLE, compression=SNAPPY "
+            + "tags('tag1tag2tag3tag4tag5'='v1v2v3v4v5') "
+            + "attributes('att1'='a1', 'att2'='a2')";
+    sqls.add(sql2);
+
+    List<String> errorMessages = new ArrayList<>();
+    String errorMessage1 =
+        TSStatusCode.METADATA_ERROR.getStatusCode()
+            + ": the emtry num of the map is over the tagAttributeMaxNum";
+    errorMessages.add(errorMessage1);
+    String errorMessage2 =
+        TSStatusCode.METADATA_ERROR.getStatusCode()
+            + ": An emtry of the map has a size which is over the tagAttributeMaxSize";
+    errorMessages.add(errorMessage2);
+
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      for (int i = 0; i < sqls.size(); i++) {
+        try {
+          statement.execute(sqls.get(i));
+          fail();
+        } catch (SQLException e) {
+          Assert.assertEquals(errorMessages.get(i), e.getMessage());
+        }
+      }
+    }
+    clearSchema();
+  }
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java
index 5e6cb55..b4d05b3 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java
@@ -196,7 +196,7 @@
       // do not write log when recover
       isRecovering = true;
 
-      tagManager = new TagManager(schemaRegionDirPath);
+      tagManager = new TagManager(schemaRegionDirPath, regionStatistics);
       mtree =
           new MTreeBelowSGMemoryImpl(
               new PartialPath(storageGroupFullPath),
@@ -455,7 +455,8 @@
       isRecovering = true;
 
       long tagSnapshotStartTime = System.currentTimeMillis();
-      tagManager = TagManager.loadFromSnapshot(latestSnapshotRootDir, schemaRegionDirPath);
+      tagManager =
+          TagManager.loadFromSnapshot(latestSnapshotRootDir, schemaRegionDirPath, regionStatistics);
       logger.info(
           "Tag snapshot loading of schemaRegion {} costs {}ms.",
           schemaRegionId,
@@ -992,6 +993,10 @@
       throws MetadataException, IOException {
     // upsert alias
     upsertAlias(alias, fullPath);
+    if (tagsMap != null && !regionStatistics.isAllowToCreateNewSeries()) {
+      throw new SeriesOverflowException(
+          regionStatistics.getGlobalMemoryUsage(), regionStatistics.getGlobalSeriesNumber());
+    }
     IMeasurementMNode<IMemMNode> leafMNode = mtree.getMeasurementMNode(fullPath);
     if (tagsMap == null && attributesMap == null) {
       return;
@@ -1014,9 +1019,15 @@
 
   private void upsertAlias(String alias, PartialPath fullPath)
       throws MetadataException, IOException {
-    if (mtree.changeAlias(alias, fullPath)) {
-      // persist to WAL
-      writeToMLog(SchemaRegionWritePlanFactory.getChangeAliasPlan(fullPath, alias));
+    if (alias != null) {
+      if (!regionStatistics.isAllowToCreateNewSeries()) {
+        throw new SeriesOverflowException(
+            regionStatistics.getGlobalMemoryUsage(), regionStatistics.getGlobalSeriesNumber());
+      }
+      if (mtree.changeAlias(alias, fullPath)) {
+        // persist to WAL
+        writeToMLog(SchemaRegionWritePlanFactory.getChangeAliasPlan(fullPath, alias));
+      }
     }
   }
 
@@ -1053,6 +1064,10 @@
   @Override
   public void addTags(Map<String, String> tagsMap, PartialPath fullPath)
       throws MetadataException, IOException {
+    if (!regionStatistics.isAllowToCreateNewSeries()) {
+      throw new SeriesOverflowException(
+          regionStatistics.getGlobalMemoryUsage(), regionStatistics.getGlobalSeriesNumber());
+    }
     IMeasurementMNode<IMemMNode> leafMNode = mtree.getMeasurementMNode(fullPath);
     // no tag or attribute, we need to add a new record in log
     if (leafMNode.getOffset() < 0) {
@@ -1096,6 +1111,10 @@
   @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
   public void setTagsOrAttributesValue(Map<String, String> alterMap, PartialPath fullPath)
       throws MetadataException, IOException {
+    if (!regionStatistics.isAllowToCreateNewSeries()) {
+      throw new SeriesOverflowException(
+          regionStatistics.getGlobalMemoryUsage(), regionStatistics.getGlobalSeriesNumber());
+    }
     IMeasurementMNode<IMemMNode> leafMNode = mtree.getMeasurementMNode(fullPath);
     if (leafMNode.getOffset() < 0) {
       throw new MetadataException(
@@ -1119,6 +1138,10 @@
   @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
   public void renameTagOrAttributeKey(String oldKey, String newKey, PartialPath fullPath)
       throws MetadataException, IOException {
+    if (!regionStatistics.isAllowToCreateNewSeries()) {
+      throw new SeriesOverflowException(
+          regionStatistics.getGlobalMemoryUsage(), regionStatistics.getGlobalSeriesNumber());
+    }
     IMeasurementMNode<IMemMNode> leafMNode = mtree.getMeasurementMNode(fullPath);
     if (leafMNode.getOffset() < 0) {
       throw new MetadataException(
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionPBTreeImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionPBTreeImpl.java
index 070594d..ec16026 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionPBTreeImpl.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionPBTreeImpl.java
@@ -193,7 +193,7 @@
       // do not write log when recover
       isRecovering = true;
 
-      tagManager = new TagManager(schemaRegionDirPath);
+      tagManager = new TagManager(schemaRegionDirPath, regionStatistics);
       mtree =
           new MTreeBelowSGCachedImpl(
               new PartialPath(storageGroupFullPath),
@@ -513,7 +513,8 @@
       isRecovering = true;
 
       long tagSnapshotStartTime = System.currentTimeMillis();
-      tagManager = TagManager.loadFromSnapshot(latestSnapshotRootDir, schemaRegionDirPath);
+      tagManager =
+          TagManager.loadFromSnapshot(latestSnapshotRootDir, schemaRegionDirPath, regionStatistics);
       logger.info(
           "Tag snapshot loading of schemaRegion {} costs {}ms.",
           schemaRegionId,
@@ -1076,6 +1077,9 @@
       throws MetadataException, IOException {
     // upsert alias
     upsertAlias(alias, fullPath);
+    while (tagsMap != null && !regionStatistics.isAllowToCreateNewSeries()) {
+      ReleaseFlushMonitor.getInstance().waitIfReleasing();
+    }
     IMeasurementMNode<ICachedMNode> leafMNode = mtree.getMeasurementMNode(fullPath);
     try {
       if (tagsMap == null && attributesMap == null) {
@@ -1102,9 +1106,14 @@
 
   private void upsertAlias(String alias, PartialPath fullPath)
       throws MetadataException, IOException {
-    if (mtree.changeAlias(alias, fullPath)) {
-      // persist to WAL
-      writeToMLog(SchemaRegionWritePlanFactory.getChangeAliasPlan(fullPath, alias));
+    if (alias != null) {
+      while (!regionStatistics.isAllowToCreateNewSeries()) {
+        ReleaseFlushMonitor.getInstance().waitIfReleasing();
+      }
+      if (mtree.changeAlias(alias, fullPath)) {
+        // persist to WAL
+        writeToMLog(SchemaRegionWritePlanFactory.getChangeAliasPlan(fullPath, alias));
+      }
     }
   }
 
@@ -1144,6 +1153,9 @@
   @Override
   public void addTags(Map<String, String> tagsMap, PartialPath fullPath)
       throws MetadataException, IOException {
+    while (!regionStatistics.isAllowToCreateNewSeries()) {
+      ReleaseFlushMonitor.getInstance().waitIfReleasing();
+    }
     IMeasurementMNode<ICachedMNode> leafMNode = mtree.getMeasurementMNode(fullPath);
     try {
       // no tag or attribute, we need to add a new record in log
@@ -1198,6 +1210,9 @@
   @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
   public void setTagsOrAttributesValue(Map<String, String> alterMap, PartialPath fullPath)
       throws MetadataException, IOException {
+    while (!regionStatistics.isAllowToCreateNewSeries()) {
+      ReleaseFlushMonitor.getInstance().waitIfReleasing();
+    }
     IMeasurementMNode<ICachedMNode> leafMNode = mtree.getMeasurementMNode(fullPath);
     try {
       if (leafMNode.getOffset() < 0) {
@@ -1225,6 +1240,9 @@
   @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
   public void renameTagOrAttributeKey(String oldKey, String newKey, PartialPath fullPath)
       throws MetadataException, IOException {
+    while (!regionStatistics.isAllowToCreateNewSeries()) {
+      ReleaseFlushMonitor.getInstance().waitIfReleasing();
+    }
     IMeasurementMNode<ICachedMNode> leafMNode = mtree.getMeasurementMNode(fullPath);
     try {
       if (leafMNode.getOffset() < 0) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/tag/TagLogFile.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/tag/TagLogFile.java
index 7e2a7a0..2739648 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/tag/TagLogFile.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/tag/TagLogFile.java
@@ -37,7 +37,9 @@
 import java.nio.channels.ClosedByInterruptException;
 import java.nio.channels.FileChannel;
 import java.nio.file.StandardOpenOption;
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 
 public class TagLogFile implements AutoCloseable {
@@ -52,6 +54,11 @@
   private static final int MAX_LENGTH =
       CommonDescriptor.getInstance().getConfig().getTagAttributeTotalSize();
 
+  private static final int MAX_ENTRY_NUM =
+      CommonDescriptor.getInstance().getConfig().getTagAttributeMaxNum();
+  private static final int MAX_ENTRY_SIZE =
+      CommonDescriptor.getInstance().getConfig().getTagAttributeEntryMaxSize();
+
   private static final int RECORD_FLUSH_INTERVAL =
       IoTDBDescriptor.getInstance().getConfig().getTagAttributeFlushInterval();
   private int unFlushedRecordNum = 0;
@@ -95,22 +102,83 @@
    * @return tags map, attributes map
    * @throws IOException error occurred when reading disk
    */
-  public Pair<Map<String, String>, Map<String, String>> read(int size, long position)
-      throws IOException {
+  public Pair<Map<String, String>, Map<String, String>> read(long position) throws IOException {
     if (position < 0) {
       return new Pair<>(Collections.emptyMap(), Collections.emptyMap());
     }
-    ByteBuffer byteBuffer = ByteBuffer.allocate(size);
-    fileChannel.read(byteBuffer, position);
-    byteBuffer.flip();
+    ByteBuffer byteBuffer = parseByteBuffer(fileChannel, position);
     return new Pair<>(ReadWriteIOUtils.readMap(byteBuffer), ReadWriteIOUtils.readMap(byteBuffer));
   }
 
-  public Map<String, String> readTag(int size, long position) throws IOException {
-    ByteBuffer byteBuffer = ByteBuffer.allocate(size);
+  public Map<String, String> readTag(long position) throws IOException {
+    ByteBuffer byteBuffer = parseByteBuffer(fileChannel, position);
+    return ReadWriteIOUtils.readMap(byteBuffer);
+  }
+
+  public static ByteBuffer parseByteBuffer(FileChannel fileChannel, long position)
+      throws IOException {
+    // Read the first block
+    ByteBuffer byteBuffer = ByteBuffer.allocate(MAX_LENGTH);
     fileChannel.read(byteBuffer, position);
     byteBuffer.flip();
-    return ReadWriteIOUtils.readMap(byteBuffer);
+    if (byteBuffer.limit() > 0) { // there is data at this position
+      int firstInt = ReadWriteIOUtils.readInt(byteBuffer); // peek at the first int
+      byteBuffer.position(0);
+      if (firstInt < -1) { // the first int stores -blockNum: the record spans multiple blocks
+        int blockNum = -firstInt;
+        ByteBuffer fullBuffer = ByteBuffer.allocate(blockNum * MAX_LENGTH);
+        fullBuffer.put(byteBuffer);
+        fullBuffer.position(4); // Skip blockNum
+        for (int i = 1; i < blockNum; i++) {
+          long nextPosition = ReadWriteIOUtils.readLong(fullBuffer);
+          // after reading each offset, fetch that block via fileChannel
+          fullBuffer.position(MAX_LENGTH * i);
+          fullBuffer.limit(MAX_LENGTH * (i + 1));
+          fileChannel.read(fullBuffer, nextPosition);
+          fullBuffer.position(4 + i * Long.BYTES);
+        }
+        fullBuffer.limit(fullBuffer.capacity());
+        return fullBuffer;
+      }
+    }
+    return byteBuffer;
+  }
+
+  private List<Long> parseOffsetList(long position) throws IOException {
+    List<Long> blockOffset = new ArrayList<>();
+    blockOffset.add(position);
+    // Read the first block
+    ByteBuffer byteBuffer = ByteBuffer.allocate(MAX_LENGTH);
+    fileChannel.read(byteBuffer, position);
+    byteBuffer.flip();
+    if (byteBuffer.limit() > 0) { // there is data at this position
+      int firstInt = ReadWriteIOUtils.readInt(byteBuffer); // peek at the first int
+      byteBuffer.position(0);
+      if (firstInt < -1) { // the first int stores -blockNum: the record spans multiple blocks
+        int blockNum = -firstInt;
+        int blockOffsetStoreLen =
+            (((blockNum - 1) * Long.BYTES + 4) / MAX_LENGTH + 1)
+                * MAX_LENGTH; // blockOffset storage length
+        ByteBuffer blockBuffer = ByteBuffer.allocate(blockOffsetStoreLen);
+        blockBuffer.put(byteBuffer);
+        blockBuffer.position(4); // Skip blockNum
+
+        for (int i = 1; i < blockNum; i++) {
+          blockOffset.add(ReadWriteIOUtils.readLong(blockBuffer));
+          // after reading each offset, fetch the corresponding block via fileChannel
+          if (MAX_LENGTH * (i + 1)
+              <= blockOffsetStoreLen) {
+            // only blocks that still hold offset data are fetched, which saves
+            // reads compared with loading the whole record
+            blockBuffer.position(MAX_LENGTH * i);
+            blockBuffer.limit(MAX_LENGTH * (i + 1));
+            fileChannel.read(blockBuffer, blockOffset.get(i));
+            blockBuffer.position(4 + i * Long.BYTES);
+          }
+        }
+      }
+    }
+    return blockOffset;
   }
 
   public long write(Map<String, String> tagMap, Map<String, String> attributeMap)
@@ -137,11 +205,85 @@
    * @return beginning position of the record in tagFile
    */
   private synchronized long write(ByteBuffer byteBuffer, long position) throws IOException {
+    // Correct the initial offset of the write
     if (position < 0) {
       // append the record to file tail
       position = fileChannel.size();
     }
-    fileChannel.write(byteBuffer, position);
+    // read the existing record to collect the offsets of the blocks it already occupies
+    List<Long> blockOffset = parseOffsetList(position);
+    // number of blocks the new record occupies
+    int blockNumReal = byteBuffer.capacity() / MAX_LENGTH;
+    if (blockNumReal < 1) {
+      throw new RuntimeException(
+          "ByteBuffer capacity is smaller than tagAttributeTotalSize, which is not allowed.");
+    }
+    if (blockNumReal == 1 && blockOffset.size() == 1) {
+      // If the original data occupies only one block and the new data occupies only one block, the
+      // original space is used
+      fileChannel.write(byteBuffer, blockOffset.get(0));
+    } else {
+      // if the original space is at least as large as the new data, reuse the original blocks
+      if (blockOffset.size() >= blockNumReal) {
+        ByteBuffer byteBufferFinal = ByteBuffer.allocate(blockOffset.size() * MAX_LENGTH);
+        byteBufferFinal.putInt(-blockOffset.size());
+        for (int i = 1; i < blockOffset.size(); i++) {
+          byteBufferFinal.putLong(blockOffset.get(i));
+        }
+        if (blockNumReal > 1) { // real data occupies multiple blocks
+          byteBuffer.position((blockNumReal - 1) * Long.BYTES + 4);
+        } else { // real data occupies only one block
+          byteBuffer.position(0);
+        }
+        byteBufferFinal.put(byteBuffer);
+        for (int i = 0; i < blockOffset.size(); i++) {
+          byteBufferFinal.position(i * MAX_LENGTH);
+          byteBufferFinal.limit((i + 1) * MAX_LENGTH);
+          fileChannel.write(byteBufferFinal, blockOffset.get(i));
+        }
+      } else {
+        // if the original space is smaller than the new data, append extra blocks at the file tail
+        // prepare the byte buffer that stores the offset of each block
+        int blockOffsetStoreLen = (blockNumReal - 1) * Long.BYTES + 4;
+        ByteBuffer byteBufferOffset = ByteBuffer.allocate(blockOffsetStoreLen);
+        byteBufferOffset.putInt(-blockNumReal);
+
+        // write the data to the file
+        for (int i = 0; i < blockNumReal; i++) {
+          byteBuffer.position(i * MAX_LENGTH);
+          byteBuffer.limit((i + 1) * MAX_LENGTH);
+
+          // this block can reuse the original space
+          if (i < blockOffset.size()) {
+            if (i > 0) { // the header does not store the first block's offset
+              byteBufferOffset.putLong(blockOffset.get(i));
+            }
+            fileChannel.write(byteBuffer, blockOffset.get(i));
+          } else {
+            // this block goes into newly appended space at the file tail
+            // TODO
+            byteBufferOffset.putLong(fileChannel.size());
+            blockOffset.add(fileChannel.size());
+            fileChannel.write(byteBuffer, fileChannel.size());
+          }
+        }
+
+        // write the offset of each block to the file
+        byteBufferOffset.flip();
+        for (int i = 0; i < blockOffset.size(); i++) {
+          byteBufferOffset.position(i * MAX_LENGTH);
+          if ((i + 1) * MAX_LENGTH > byteBufferOffset.capacity()) {
+            byteBufferOffset.limit(byteBufferOffset.capacity());
+            fileChannel.write(byteBufferOffset, blockOffset.get(i));
+            break;
+          } else {
+            byteBufferOffset.limit((i + 1) * MAX_LENGTH);
+            fileChannel.write(byteBufferOffset, blockOffset.get(i));
+          }
+        }
+      }
+    }
+
     unFlushedRecordNum++;
     if (unFlushedRecordNum >= RECORD_FLUSH_INTERVAL) {
       fileChannel.force(true);
@@ -152,15 +294,68 @@
 
   private ByteBuffer convertMapToByteBuffer(
       Map<String, String> tagMap, Map<String, String> attributeMap) throws MetadataException {
-    ByteBuffer byteBuffer = ByteBuffer.allocate(MAX_LENGTH);
+    int totalMapSize = calculateMapSize(tagMap) + calculateMapSize(attributeMap);
+    ByteBuffer byteBuffer;
+    if (totalMapSize <= MAX_LENGTH) {
+      byteBuffer = ByteBuffer.allocate(MAX_LENGTH);
+    } else {
+      // find the smallest num satisfying
+      // num * MAX_LENGTH < totalMapSize + 4 + Long.BYTES * num <= MAX_LENGTH * (num + 1)
+      double numMinLimit = (totalMapSize + 4 - MAX_LENGTH) / (double) (MAX_LENGTH - Long.BYTES);
+      // blockNum = num + 1
+      int blockNum = (int) Math.ceil(numMinLimit) + 1;
+
+      byteBuffer = ByteBuffer.allocate(blockNum * MAX_LENGTH);
+      // 4 bytes for blockNum, (blockNum - 1) * Long.BYTES for the block offsets
+      byteBuffer.position(4 + (blockNum - 1) * Long.BYTES);
+    }
     serializeMap(tagMap, byteBuffer);
     serializeMap(attributeMap, byteBuffer);
-
     // set position to 0 and the content in this buffer could be read
     byteBuffer.position(0);
     return byteBuffer;
   }
 
+  public static int calculateMapSize(Map<String, String> map) throws MetadataException {
+    int length = 0;
+    if (map != null) {
+      if (map.size() > MAX_ENTRY_NUM) {
+        throw new MetadataException("the emtry num of the map is over the tagAttributeMaxNum");
+      }
+      length += 4; // mapSize is 4 bytes
+      // when the map is empty, this loop body is never executed
+      for (Map.Entry<String, String> entry : map.entrySet()) {
+        int entryLength = 0;
+        String key = entry.getKey();
+        String value = entry.getValue();
+
+        entryLength += 4; // keySize is 4 bytes
+        if (key != null) {
+          // add the key length only for a non-null key;
+          // a null key stores nothing beyond the keySize
+          // marker -1 (4 bytes)
+          entryLength += key.getBytes().length;
+        }
+
+        entryLength += 4; // valueSize is 4 bytes
+        if (value != null) {
+          // add the value length only for a non-null value;
+          // a null value stores nothing beyond the valueSize
+          // marker -1 (4 bytes)
+          entryLength += value.getBytes().length;
+        }
+        if (entryLength > MAX_ENTRY_SIZE) {
+          throw new MetadataException(
+              "an entry of the map exceeds the tagAttributeEntryMaxSize");
+        }
+        length += entryLength;
+      }
+    } else {
+      length += 4; // when the map is null, the mapSize is written as -1, which takes 4 bytes
+    }
+    return length;
+  }
+
   private void serializeMap(Map<String, String> map, ByteBuffer byteBuffer)
       throws MetadataException {
     try {
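
With this change, a record whose serialized tag and attribute maps outgrow tag_attribute_total_size is chained across fixed-size blocks: the first int of a chained record stores -blockNum, followed by (blockNum - 1) longs holding the offsets of the follow-up blocks. A standalone sketch of the sizing arithmetic in convertMapToByteBuffer (TagBlockMath is a hypothetical class, not part of the patch):

```java
public class TagBlockMath {
  // maxLength stands for tag_attribute_total_size; a chained record spends
  // 4 bytes on -blockNum plus (blockNum - 1) * Long.BYTES on follow-up offsets
  static int blockNumFor(int totalMapSize, int maxLength) {
    if (totalMapSize <= maxLength) {
      return 1; // fits in a single block, no chain header needed
    }
    // smallest num with totalMapSize + 4 + num * Long.BYTES <= (num + 1) * maxLength
    double numMinLimit = (totalMapSize + 4 - maxLength) / (double) (maxLength - Long.BYTES);
    return (int) Math.ceil(numMinLimit) + 1; // blockNum = num + 1
  }

  public static void main(String[] args) {
    // with the default tag_attribute_total_size=700, a 1500-byte payload needs 3 blocks:
    // header 4 + 2 * 8 = 20 bytes and 1500 + 20 <= 3 * 700, while two blocks leave
    // only 2 * 700 - 12 = 1388 bytes for the payload
    System.out.println(blockNumFor(1500, 700)); // prints 3
  }
}
```
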
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/tag/TagManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/tag/TagManager.java
index 1c770e7..2643635 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/tag/TagManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/tag/TagManager.java
@@ -32,6 +32,7 @@
 import org.apache.iotdb.commons.schema.node.role.IMeasurementMNode;
 import org.apache.iotdb.commons.schema.tree.SchemaIterator;
 import org.apache.iotdb.commons.utils.FileUtils;
+import org.apache.iotdb.db.schemaengine.rescon.MemSchemaRegionStatistics;
 import org.apache.iotdb.db.schemaengine.schemaregion.read.req.IShowTimeSeriesPlan;
 import org.apache.iotdb.db.schemaengine.schemaregion.read.resp.info.ITimeSeriesSchemaInfo;
 import org.apache.iotdb.db.schemaengine.schemaregion.read.resp.info.impl.ShowTimeSeriesResult;
@@ -40,6 +41,7 @@
 import org.apache.iotdb.db.schemaengine.schemaregion.read.resp.reader.impl.TimeseriesReaderWithViewFetch;
 
 import org.apache.tsfile.utils.Pair;
+import org.apache.tsfile.utils.RamUsageEstimator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -76,8 +78,12 @@
   private final Map<String, Map<String, Set<IMeasurementMNode<?>>>> tagIndex =
       new ConcurrentHashMap<>();
 
-  public TagManager(String sgSchemaDirPath) throws IOException {
+  private final MemSchemaRegionStatistics regionStatistics;
+
+  public TagManager(String sgSchemaDirPath, MemSchemaRegionStatistics regionStatistics)
+      throws IOException {
     tagLogFile = new TagLogFile(sgSchemaDirPath, SchemaConstant.TAG_LOG);
+    this.regionStatistics = regionStatistics;
   }
 
   public synchronized boolean createSnapshot(File targetDir) {
@@ -120,7 +126,8 @@
     }
   }
 
-  public static TagManager loadFromSnapshot(File snapshotDir, String sgSchemaDirPath)
+  public static TagManager loadFromSnapshot(
+      File snapshotDir, String sgSchemaDirPath, MemSchemaRegionStatistics regionStatistics)
       throws IOException {
     File tagSnapshot =
         SystemFileFactory.INSTANCE.getFile(snapshotDir, SchemaConstant.TAG_LOG_SNAPSHOT);
@@ -131,7 +138,7 @@
 
     try {
       org.apache.commons.io.FileUtils.copyFile(tagSnapshot, tagFile);
-      return new TagManager(sgSchemaDirPath);
+      return new TagManager(sgSchemaDirPath, regionStatistics);
     } catch (IOException e) {
       if (!tagFile.delete()) {
         logger.warn(
@@ -143,7 +150,7 @@
 
   public boolean recoverIndex(long offset, IMeasurementMNode<?> measurementMNode)
       throws IOException {
-    Map<String, String> tags = tagLogFile.readTag(COMMON_CONFIG.getTagAttributeTotalSize(), offset);
+    Map<String, String> tags = tagLogFile.readTag(offset);
     if (tags == null || tags.isEmpty()) {
       return false;
     } else {
@@ -156,10 +163,35 @@
     if (tagKey == null || tagValue == null || measurementMNode == null) {
       return;
     }
-    tagIndex
-        .computeIfAbsent(tagKey, k -> new ConcurrentHashMap<>())
-        .computeIfAbsent(tagValue, v -> Collections.synchronizedSet(new HashSet<>()))
-        .add(measurementMNode);
+
+    int tagIndexOldSize = tagIndex.size();
+    Map<String, Set<IMeasurementMNode<?>>> tagValueMap =
+        tagIndex.computeIfAbsent(tagKey, k -> new ConcurrentHashMap<>());
+    int tagIndexNewSize = tagIndex.size();
+
+    int tagValueMapOldSize = tagValueMap.size();
+    Set<IMeasurementMNode<?>> measurementsSet =
+        tagValueMap.computeIfAbsent(tagValue, v -> Collections.synchronizedSet(new HashSet<>()));
+    int tagValueMapNewSize = tagValueMap.size();
+
+    int measurementsSetOldSize = measurementsSet.size();
+    measurementsSet.add(measurementMNode);
+    int measurementsSetNewSize = measurementsSet.size();
+
+    long memorySize = 0;
+    if (tagIndexNewSize - tagIndexOldSize == 1) {
+      // the last 4 is the memory occupied by the size of the tagValueMap
+      memorySize += RamUsageEstimator.sizeOf(tagKey) + 4;
+    }
+    if (tagValueMapNewSize - tagValueMapOldSize == 1) {
+      // the last 4 is the memory occupied by the size of measurementsSet
+      memorySize += RamUsageEstimator.sizeOf(tagValue) + 4;
+    }
+    if (measurementsSetNewSize - measurementsSetOldSize == 1) {
+      // NUM_BYTES_OBJECT_REF is the reference to the IMeasurementMNode, the last 4 is the set size
+      memorySize += RamUsageEstimator.NUM_BYTES_OBJECT_REF + 4;
+    }
+    requestMemory(memorySize);
   }
 
   public void addIndex(Map<String, String> tagsMap, IMeasurementMNode<?> measurementMNode) {
@@ -171,10 +203,27 @@
   }
 
   public void removeIndex(String tagKey, String tagValue, IMeasurementMNode<?> measurementMNode) {
-    tagIndex.get(tagKey).get(tagValue).remove(measurementMNode);
-    if (tagIndex.get(tagKey).get(tagValue).isEmpty()) {
-      tagIndex.get(tagKey).remove(tagValue);
+    if (tagKey == null || tagValue == null || measurementMNode == null) {
+      return;
     }
+    // init memory size
+    long memorySize = 0;
+    if (tagIndex.get(tagKey).get(tagValue).remove(measurementMNode)) {
+      memorySize += RamUsageEstimator.NUM_BYTES_OBJECT_REF + 4;
+    }
+    if (tagIndex.get(tagKey).get(tagValue).isEmpty()) {
+      if (tagIndex.get(tagKey).remove(tagValue) != null) {
+        // the last 4 is the memory occupied by the size of IMeasurementMNodeSet
+        memorySize += RamUsageEstimator.sizeOf(tagValue) + 4;
+      }
+    }
+    if (tagIndex.get(tagKey).isEmpty()) {
+      if (tagIndex.remove(tagKey) != null) {
+        // the last 4 is the memory occupied by the size of tagValueMap
+        memorySize += RamUsageEstimator.sizeOf(tagKey) + 4;
+      }
+    }
+    releaseMemory(memorySize);
   }
 
   private List<IMeasurementMNode<?>> getMatchedTimeseriesInIndex(TagFilter tagFilter) {
@@ -310,8 +359,7 @@
     if (node.getOffset() < 0) {
       return;
     }
-    Map<String, String> tagMap =
-        tagLogFile.readTag(COMMON_CONFIG.getTagAttributeTotalSize(), node.getOffset());
+    Map<String, String> tagMap = tagLogFile.readTag(node.getOffset());
     if (tagMap != null) {
       for (Map.Entry<String, String> entry : tagMap.entrySet()) {
         if (tagIndex.containsKey(entry.getKey())
@@ -324,13 +372,9 @@
                     entry.getValue(),
                     node.getOffset()));
           }
-          tagIndex.get(entry.getKey()).get(entry.getValue()).remove(node);
-          if (tagIndex.get(entry.getKey()).get(entry.getValue()).isEmpty()) {
-            tagIndex.get(entry.getKey()).remove(entry.getValue());
-            if (tagIndex.get(entry.getKey()).isEmpty()) {
-              tagIndex.remove(entry.getKey());
-            }
-          }
+
+          removeIndex(entry.getKey(), entry.getValue(), node);
+
         } else {
           if (logger.isDebugEnabled()) {
             logger.debug(
@@ -359,8 +403,7 @@
       IMeasurementMNode<?> leafMNode)
       throws MetadataException, IOException {
 
-    Pair<Map<String, String>, Map<String, String>> pair =
-        tagLogFile.read(COMMON_CONFIG.getTagAttributeTotalSize(), leafMNode.getOffset());
+    Pair<Map<String, String>, Map<String, String>> pair = tagLogFile.read(leafMNode.getOffset());
 
     if (tagsMap != null) {
       for (Map.Entry<String, String> entry : tagsMap.entrySet()) {
@@ -383,6 +426,7 @@
             }
 
             removeIndex(key, beforeValue, leafMNode);
+
           } else {
             if (logger.isDebugEnabled()) {
               logger.debug(
@@ -424,8 +468,7 @@
       Map<String, String> attributesMap, PartialPath fullPath, IMeasurementMNode<?> leafMNode)
       throws MetadataException, IOException {
 
-    Pair<Map<String, String>, Map<String, String>> pair =
-        tagLogFile.read(COMMON_CONFIG.getTagAttributeTotalSize(), leafMNode.getOffset());
+    Pair<Map<String, String>, Map<String, String>> pair = tagLogFile.read(leafMNode.getOffset());
 
     for (Map.Entry<String, String> entry : attributesMap.entrySet()) {
       String key = entry.getKey();
@@ -453,8 +496,7 @@
       Map<String, String> tagsMap, PartialPath fullPath, IMeasurementMNode<?> leafMNode)
       throws MetadataException, IOException {
 
-    Pair<Map<String, String>, Map<String, String>> pair =
-        tagLogFile.read(COMMON_CONFIG.getTagAttributeTotalSize(), leafMNode.getOffset());
+    Pair<Map<String, String>, Map<String, String>> pair = tagLogFile.read(leafMNode.getOffset());
 
     for (Map.Entry<String, String> entry : tagsMap.entrySet()) {
       String key = entry.getKey();
@@ -484,8 +526,7 @@
   public void dropTagsOrAttributes(
       Set<String> keySet, PartialPath fullPath, IMeasurementMNode<?> leafMNode)
       throws MetadataException, IOException {
-    Pair<Map<String, String>, Map<String, String>> pair =
-        tagLogFile.read(COMMON_CONFIG.getTagAttributeTotalSize(), leafMNode.getOffset());
+    Pair<Map<String, String>, Map<String, String>> pair = tagLogFile.read(leafMNode.getOffset());
 
     Map<String, String> deleteTag = new HashMap<>();
     for (String key : keySet) {
@@ -505,16 +546,10 @@
     // persist the change to disk
     tagLogFile.write(pair.left, pair.right, leafMNode.getOffset());
 
-    Map<String, Set<IMeasurementMNode<?>>> tagVal2LeafMNodeSet;
-    Set<IMeasurementMNode<?>> nodeSet;
-    for (Map.Entry<String, String> entry : deleteTag.entrySet()) {
-      String key = entry.getKey();
-      String value = entry.getValue();
-      // change the tag inverted index map
-      tagVal2LeafMNodeSet = tagIndex.get(key);
-      if (tagVal2LeafMNodeSet != null) {
-        nodeSet = tagVal2LeafMNodeSet.get(value);
-        if (nodeSet != null) {
+    if (!deleteTag.isEmpty()) {
+      for (Map.Entry<String, String> entry : deleteTag.entrySet()) {
+        if (tagIndex.containsKey(entry.getKey())
+            && tagIndex.get(entry.getKey()).containsKey(entry.getValue())) {
           if (logger.isDebugEnabled()) {
             logger.debug(
                 String.format(
@@ -524,24 +559,20 @@
                     leafMNode.getOffset()));
           }
 
-          nodeSet.remove(leafMNode);
-          if (nodeSet.isEmpty()) {
-            tagVal2LeafMNodeSet.remove(value);
-            if (tagVal2LeafMNodeSet.isEmpty()) {
-              tagIndex.remove(key);
-            }
+          removeIndex(entry.getKey(), entry.getValue(), leafMNode);
+
+        } else {
+          if (logger.isDebugEnabled()) {
+            logger.debug(
+                String.format(
+                    String.format(
+                        DEBUG_MSG_1, "Drop" + PREVIOUS_CONDITION, leafMNode.getFullPath()),
+                    entry.getKey(),
+                    entry.getValue(),
+                    leafMNode.getOffset(),
+                    tagIndex.containsKey(entry.getKey())));
           }
         }
-      } else {
-        if (logger.isDebugEnabled()) {
-          logger.debug(
-              String.format(
-                  String.format(DEBUG_MSG_1, "Drop" + PREVIOUS_CONDITION, leafMNode.getFullPath()),
-                  key,
-                  value,
-                  leafMNode.getOffset(),
-                  tagIndex.containsKey(key)));
-        }
       }
     }
   }
@@ -557,8 +588,7 @@
       Map<String, String> alterMap, PartialPath fullPath, IMeasurementMNode<?> leafMNode)
       throws MetadataException, IOException {
     // tags, attributes
-    Pair<Map<String, String>, Map<String, String>> pair =
-        tagLogFile.read(COMMON_CONFIG.getTagAttributeTotalSize(), leafMNode.getOffset());
+    Pair<Map<String, String>, Map<String, String>> pair = tagLogFile.read(leafMNode.getOffset());
     Map<String, String> oldTagValue = new HashMap<>();
     Map<String, String> newTagValue = new HashMap<>();
 
@@ -599,7 +629,8 @@
                   leafMNode.getOffset()));
         }
 
-        tagIndex.get(key).get(beforeValue).remove(leafMNode);
+        removeIndex(key, beforeValue, leafMNode);
+
       } else {
         if (logger.isDebugEnabled()) {
           logger.debug(
@@ -628,8 +659,7 @@
       String oldKey, String newKey, PartialPath fullPath, IMeasurementMNode<?> leafMNode)
       throws MetadataException, IOException {
     // tags, attributes
-    Pair<Map<String, String>, Map<String, String>> pair =
-        tagLogFile.read(COMMON_CONFIG.getTagAttributeTotalSize(), leafMNode.getOffset());
+    Pair<Map<String, String>, Map<String, String>> pair = tagLogFile.read(leafMNode.getOffset());
 
     // current name has existed
     if (pair.left.containsKey(newKey) || pair.right.containsKey(newKey)) {
@@ -657,7 +687,7 @@
                   leafMNode.getOffset()));
         }
 
-        tagIndex.get(oldKey).get(value).remove(leafMNode);
+        removeIndex(oldKey, value, leafMNode);
 
       } else {
         if (logger.isDebugEnabled()) {
@@ -691,7 +721,7 @@
 
   public Pair<Map<String, String>, Map<String, String>> readTagFile(long tagFileOffset)
       throws IOException {
-    return tagLogFile.read(COMMON_CONFIG.getTagAttributeTotalSize(), tagFileOffset);
+    return tagLogFile.read(tagFileOffset);
   }
 
   /**
@@ -716,4 +746,16 @@
       tagLogFile = null;
     }
   }
+
+  private void requestMemory(long size) {
+    if (regionStatistics != null) {
+      regionStatistics.requestMemory(size);
+    }
+  }
+
+  private void releaseMemory(long size) {
+    if (regionStatistics != null) {
+      regionStatistics.releaseMemory(size);
+    }
+  }
 }
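
TagManager now charges the schema-region memory budget when the inverted index grows and refunds it symmetrically on removal. A hedged sketch of the cost model (estimateIndexDelta is a hypothetical helper, not in the patch): each level of the index that actually gained an entry contributes its string size plus 4 bytes of size bookkeeping, and a newly indexed node contributes one object reference.

```java
import org.apache.tsfile.utils.RamUsageEstimator;

public class TagIndexCost {
  // the flags say whether the corresponding level of the
  // tagKey -> tagValue -> measurement-node index actually gained an entry
  static long estimateIndexDelta(
      String tagKey, String tagValue, boolean keyAdded, boolean valueAdded, boolean nodeAdded) {
    long size = 0;
    if (keyAdded) {
      size += RamUsageEstimator.sizeOf(tagKey) + 4; // key string + value-map size field
    }
    if (valueAdded) {
      size += RamUsageEstimator.sizeOf(tagValue) + 4; // value string + node-set size field
    }
    if (nodeAdded) {
      size += RamUsageEstimator.NUM_BYTES_OBJECT_REF + 4; // node reference + size field
    }
    return size; // passed to requestMemory on add, releaseMemory on the mirror-image remove
  }
}
```
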
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/tools/schema/SRStatementGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/tools/schema/SRStatementGenerator.java
index cd2517d..199cde6 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/tools/schema/SRStatementGenerator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/tools/schema/SRStatementGenerator.java
@@ -62,6 +62,7 @@
 import static org.apache.iotdb.commons.schema.SchemaConstant.STORAGE_GROUP_ENTITY_MNODE_TYPE;
 import static org.apache.iotdb.commons.schema.SchemaConstant.STORAGE_GROUP_MNODE_TYPE;
 import static org.apache.iotdb.commons.schema.SchemaConstant.isStorageGroupType;
+import static org.apache.iotdb.db.schemaengine.schemaregion.tag.TagLogFile.parseByteBuffer;
 
 public class SRStatementGenerator implements Iterator<Statement>, Iterable<Statement> {
 
@@ -385,9 +386,7 @@
   private Pair<Map<String, String>, Map<String, String>> getTagsAndAttributes(long offset)
       throws IOException {
     if (tagFileChannel != null) {
-      ByteBuffer byteBuffer = ByteBuffer.allocate(COMMON_CONFIG.getTagAttributeTotalSize());
-      tagFileChannel.read(byteBuffer, offset);
-      byteBuffer.flip();
+      ByteBuffer byteBuffer = parseByteBuffer(tagFileChannel, offset);
       Pair<Map<String, String>, Map<String, String>> tagsAndAttributes =
           new Pair<>(ReadWriteIOUtils.readMap(byteBuffer), ReadWriteIOUtils.readMap(byteBuffer));
       return tagsAndAttributes;
diff --git a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
index 7d72edc..400b58b 100644
--- a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
+++ b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
@@ -251,11 +251,23 @@
 # It is possible to lose at most tag_attribute_flush_interval records
 # tag_attribute_flush_interval=1000
 
-# max size for tag and attribute of one time series
+# max size of a storage block for the tags and attributes of one time series. If the combined size of tags
+# and attributes exceeds tag_attribute_total_size, additional storage blocks are allocated to store
+# the excess data.
 # the unit is byte
 # Datatype: int
 # tag_attribute_total_size=700
 
+# Maximum number of entries allowed in each of the tag map and the attribute map of one time series
+# this is a count of entries, not a byte size
+# Datatype: int
+# tag_attribute_max_num=20
+
+# Maximum serialized size allowed for a single tag or attribute entry (key plus value)
+# the unit is byte
+# Datatype: int
+# tag_attribute_entry_max_size=100
+
 # max measurement num of internal request
 # When creating timeseries with Session.createMultiTimeseries, the user input plan, the timeseries num of
 # which exceeds this num, will be split to several plans with timeseries no more than this num.
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 4e4e76e..b1948d1 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -251,6 +251,8 @@
 
   // Max size for tag and attribute of one time series
   private int tagAttributeTotalSize = 700;
+  private int tagAttributeMaxNum = 20; // max entries per tag map / attribute map
+  private int tagAttributeEntryMaxSize = 100; // max bytes per single entry
 
   // maximum number of Cluster Databases allowed
   private int databaseLimitThreshold = -1;
@@ -1097,6 +1099,22 @@
     this.tagAttributeTotalSize = tagAttributeTotalSize;
   }
 
+  public int getTagAttributeMaxNum() {
+    return tagAttributeMaxNum;
+  }
+
+  public void setTagAttributeMaxNum(int tagAttributeMaxNum) {
+    this.tagAttributeMaxNum = tagAttributeMaxNum;
+  }
+
+  public int getTagAttributeEntryMaxSize() {
+    return tagAttributeEntryMaxSize;
+  }
+
+  public void setTagAttributeEntryMaxSize(int tagAttributeEntryMaxSize) {
+    this.tagAttributeEntryMaxSize = tagAttributeEntryMaxSize;
+  }
+
   public int getDatabaseLimitThreshold() {
     return databaseLimitThreshold;
   }
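
At runtime the two new knobs resolve the same way as tag_attribute_total_size: TagLogFile reads them once from CommonDescriptor into static fields. A short lookup sketch (the printed values are the shipped defaults):

```java
import org.apache.iotdb.commons.conf.CommonConfig;
import org.apache.iotdb.commons.conf.CommonDescriptor;

public class TagLimitLookup {
  public static void main(String[] args) {
    CommonConfig config = CommonDescriptor.getInstance().getConfig();
    System.out.println(config.getTagAttributeTotalSize());    // block size in bytes, default 700
    System.out.println(config.getTagAttributeMaxNum());       // entries per map, default 20
    System.out.println(config.getTagAttributeEntryMaxSize()); // bytes per entry, default 100
  }
}
```
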
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index 04bd5f1..c2fd7f7 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -220,10 +220,22 @@
             properties.getProperty(
                 "tag_attribute_total_size", String.valueOf(config.getTagAttributeTotalSize()))));
 
+    config.setTagAttributeMaxNum(
+        Integer.parseInt(
+            properties.getProperty(
+                "tag_attribute_max_num", String.valueOf(config.getTagAttributeMaxNum()))));
+
+    config.setTagAttributeEntryMaxSize(
+        Integer.parseInt(
+            properties.getProperty(
+                "tag_attribute_entry_max_size",
+                String.valueOf(config.getTagAttributeEntryMaxSize()))));
+
     config.setTimePartitionInterval(
         Long.parseLong(
             properties.getProperty(
                 "time_partition_interval", String.valueOf(config.getTimePartitionInterval()))));
+
     config.setDatabaseLimitThreshold(
         Integer.parseInt(
             properties.getProperty(