Lift the storage limit for tag and attribute management (#12447)
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java
index 496e9e5..f6f291a 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java
@@ -426,6 +426,24 @@
return this;
}
+ @Override
+ public CommonConfig setTagAttributeTotalSize(int tagAttributeTotalSize) {
+ setProperty("tag_attribute_total_size", String.valueOf(tagAttributeTotalSize));
+ return this;
+ }
+
+ @Override
+ public CommonConfig setTagAttributeMaxNum(int tagAttributeMaxNum) {
+ setProperty("tag_attribute_max_num", String.valueOf(tagAttributeMaxNum));
+ return this;
+ }
+
+ @Override
+ public CommonConfig setTagAttributeEntryMaxSize(int tagAttributeEntryMaxSize) {
+ setProperty("tag_attribute_entry_max_size", String.valueOf(tagAttributeEntryMaxSize));
+ return this;
+ }
+
// For part of the log directory
public String getClusterConfigStr() {
return fromConsensusFullNameToAbbr(properties.getProperty(CONFIG_NODE_CONSENSUS_PROTOCOL_CLASS))
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java
index cfbe181..fd1d7b2 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java
@@ -431,4 +431,25 @@
cnConfig.setWalMode(walMode);
return this;
}
+
+ @Override
+ public CommonConfig setTagAttributeTotalSize(int tagAttributeTotalSize) {
+ dnConfig.setTagAttributeTotalSize(tagAttributeTotalSize);
+ cnConfig.setTagAttributeTotalSize(tagAttributeTotalSize);
+ return this;
+ }
+
+ @Override
+ public CommonConfig setTagAttributeMaxNum(int tagAttributeMaxNum) {
+ dnConfig.setTagAttributeMaxNum(tagAttributeMaxNum);
+ cnConfig.setTagAttributeMaxNum(tagAttributeMaxNum);
+ return this;
+ }
+
+ @Override
+ public CommonConfig setTagAttributeEntryMaxSize(int tagAttributeEntryMaxSize) {
+ dnConfig.setTagAttributeEntryMaxSize(tagAttributeEntryMaxSize);
+ cnConfig.setTagAttributeEntryMaxSize(tagAttributeEntryMaxSize);
+ return this;
+ }
}
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java
index 0495edb..665e59e 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java
@@ -303,4 +303,19 @@
public CommonConfig setWalMode(String walMode) {
return this;
}
+
+ @Override
+ public CommonConfig setTagAttributeTotalSize(int tagAttributeTotalSize) {
+ return this;
+ }
+
+ @Override
+ public CommonConfig setTagAttributeMaxNum(int tagAttributeMaxNum) {
+ return this;
+ }
+
+ @Override
+ public CommonConfig setTagAttributeEntryMaxSize(int tagAttributeEntryMaxSize) {
+ return this;
+ }
}
diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
index fa0e699..28c37f1 100644
--- a/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
+++ b/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
@@ -135,4 +135,10 @@
CommonConfig setDriverTaskExecutionTimeSliceInMs(long driverTaskExecutionTimeSliceInMs);
CommonConfig setWalMode(String walMode);
+
+ CommonConfig setTagAttributeTotalSize(int tagAttributeTotalSize);
+
+ CommonConfig setTagAttributeMaxNum(int tagAttributeMaxNum);
+
+ CommonConfig setTagAttributeEntryMaxSize(int tagAttributeEntryMaxSize);
}
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/schema/IoTDBTagLimitIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/schema/IoTDBTagLimitIT.java
new file mode 100644
index 0000000..5a9f7d5
--- /dev/null
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/schema/IoTDBTagLimitIT.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.it.schema;
+
+import org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant;
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.ClusterIT;
+import org.apache.iotdb.itbase.category.LocalStandaloneIT;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.util.AbstractSchemaIT;
+
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.iotdb.db.it.utils.TestUtils.prepareData;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({LocalStandaloneIT.class, ClusterIT.class})
+public class IoTDBTagLimitIT extends AbstractSchemaIT {
+
+ public IoTDBTagLimitIT(SchemaTestMode schemaTestMode) {
+ super(schemaTestMode);
+ }
+
+ protected static final String[] SQLs =
+ new String[] {
+ "create database root.turbine",
+ };
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ EnvFactory.getEnv().getConfig().getCommonConfig().setTagAttributeTotalSize(50);
+ EnvFactory.getEnv().getConfig().getCommonConfig().setTagAttributeMaxNum(5);
+ EnvFactory.getEnv().getConfig().getCommonConfig().setTagAttributeEntryMaxSize(30);
+ EnvFactory.getEnv().initClusterEnvironment();
+ prepareData(SQLs);
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ EnvFactory.getEnv().cleanClusterEnvironment();
+ }
+
+ @Test
+ public void testBasicFunctionInMultiTagAttributeTotalSize() throws Exception {
+ List<List<String>> rets = new java.util.ArrayList<>();
+ List<String> ret1 =
+ Collections.singletonList(
+ "root.turbine.d1.s1,temperature,root.turbine,FLOAT,RLE,SNAPPY,"
+ + "{\"oldTag1\":\"oldV1\",\"tag2\":\"v2\"},{\"att2\":\"a2\",\"att1\":\"a1\"}");
+ rets.add(ret1);
+ List<String> ret2 =
+ Collections.singletonList(
+ "root.turbine.d1.s1,temperature,root.turbine,FLOAT,RLE,SNAPPY,"
+ + "{\"tag1\":\"oldV1\",\"tag2\":\"v2\"},{\"att2\":\"a2\",\"att1\":\"a1\"}");
+ rets.add(ret2);
+ List<String> ret3 =
+ Collections.singletonList(
+ "root.turbine.d1.s1,temperature,root.turbine,FLOAT,RLE,SNAPPY,"
+ + "{\"tag1\":\"v1\",\"tag2\":\"v2\"},{\"att2\":\"a2\",\"att1\":\"a1\"}");
+ rets.add(ret3);
+ List<String> ret4 =
+ Collections.singletonList(
+ "root.turbine.d1.s1,temperature,root.turbine,FLOAT,RLE,SNAPPY,"
+ + "{\"tag1\":\"v1\",\"tag4\":\"v4\",\"tag5\":\"v5\",\"tag2\":\"v2\",\"tag3\":\"v3\"},{\"att2\":\"a2\",\"att1\":\"a1\"}");
+ rets.add(ret4);
+ List<String> ret5 =
+ Collections.singletonList(
+ "root.turbine.d1.s1,temperature,root.turbine,FLOAT,RLE,SNAPPY,"
+ + "{\"tag4\":\"v4\",\"tag1\":\"v1\",\"tag5\":\"v5\"},{\"att2\":\"a2\",\"att1\":\"a1\"}");
+ rets.add(ret5);
+
+ List<String> sqls = new java.util.ArrayList<>();
+ String sql1 =
+ "create timeseries root.turbine.d1.s1(temperature) with datatype=FLOAT, encoding=RLE, compression=SNAPPY "
+ + "tags('oldTag1'='oldV1', 'tag2'='v2') "
+ + "attributes('att1'='a1', 'att2'='a2')";
+ sqls.add(sql1);
+ String sql2 = "ALTER timeseries root.turbine.d1.s1 RENAME oldTag1 TO tag1";
+ sqls.add(sql2);
+ String sql3 = "ALTER timeseries root.turbine.d1.s1 SET tag1=v1";
+ sqls.add(sql3);
+ String sql4 = "ALTER timeseries root.turbine.d1.s1 ADD TAGS tag3=v3, tag4=v4, tag5=v5";
+ sqls.add(sql4);
+ String sql5 = "ALTER timeseries root.turbine.d1.s1 DROP tag2, tag3";
+ sqls.add(sql5);
+
+ try (Connection connection = EnvFactory.getEnv().getConnection();
+ Statement statement = connection.createStatement()) {
+ for (int i = 0; i < sqls.size(); i++) {
+ statement.execute(sqls.get(i));
+ ResultSet resultSet = statement.executeQuery("show timeseries");
+ int count = 0;
+ try {
+ while (resultSet.next()) {
+ String ans =
+ resultSet.getString(ColumnHeaderConstant.TIMESERIES)
+ + ","
+ + resultSet.getString(ColumnHeaderConstant.ALIAS)
+ + ","
+ + resultSet.getString(ColumnHeaderConstant.DATABASE)
+ + ","
+ + resultSet.getString(ColumnHeaderConstant.DATATYPE)
+ + ","
+ + resultSet.getString(ColumnHeaderConstant.ENCODING)
+ + ","
+ + resultSet.getString(ColumnHeaderConstant.COMPRESSION)
+ + ","
+ + resultSet.getString(ColumnHeaderConstant.TAGS)
+ + ","
+ + resultSet.getString(ColumnHeaderConstant.ATTRIBUTES);
+ assertTrue(rets.get(i).contains(ans));
+ count++;
+ }
+ } finally {
+ resultSet.close();
+ }
+ assertEquals(rets.get(i).size(), count);
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail();
+ }
+ clearSchema();
+ }
+
+ @Test
+ public void testMapSizeAndEntryLimit() throws Exception {
+ List<String> sqls = new java.util.ArrayList<>();
+ String sql1 =
+ "create timeseries root.turbine.d1.s1 with datatype=FLOAT, encoding=RLE, compression=SNAPPY "
+ + "tags('tag1'='v1', 'tag2'='v2', 'tag3'='v3', 'tag4'='v4', 'tag5'='v5', 'tag6'='v6') "
+ + "attributes('att1'='a1', 'att2'='a2')";
+ sqls.add(sql1);
+ String sql2 =
+ "create timeseries root.turbine.d1.s2 with datatype=FLOAT, encoding=RLE, compression=SNAPPY "
+ + "tags('tag1tag2tag3tag4tag5'='v1v2v3v4v5') "
+ + "attributes('att1'='a1', 'att2'='a2')";
+ sqls.add(sql2);
+
+ List<String> errorMessages = new java.util.ArrayList<>();
+ String errorMessage1 =
+ TSStatusCode.METADATA_ERROR.getStatusCode()
+ + ": the emtry num of the map is over the tagAttributeMaxNum";
+ errorMessages.add(errorMessage1);
+ String errorMessage2 =
+ TSStatusCode.METADATA_ERROR.getStatusCode()
+ + ": An emtry of the map has a size which is over the tagAttributeMaxSize";
+ errorMessages.add(errorMessage2);
+
+ try (Connection connection = EnvFactory.getEnv().getConnection();
+ Statement statement = connection.createStatement()) {
+ for (int i = 0; i < sqls.size(); i++) {
+ try {
+ statement.execute(sqls.get(i));
+ fail();
+ } catch (SQLException e) {
+ Assert.assertEquals(errorMessages.get(i), e.getMessage());
+ }
+ }
+ }
+ clearSchema();
+ }
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java
index 5e6cb55..b4d05b3 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java
@@ -196,7 +196,7 @@
// do not write log when recover
isRecovering = true;
- tagManager = new TagManager(schemaRegionDirPath);
+ tagManager = new TagManager(schemaRegionDirPath, regionStatistics);
mtree =
new MTreeBelowSGMemoryImpl(
new PartialPath(storageGroupFullPath),
@@ -455,7 +455,8 @@
isRecovering = true;
long tagSnapshotStartTime = System.currentTimeMillis();
- tagManager = TagManager.loadFromSnapshot(latestSnapshotRootDir, schemaRegionDirPath);
+ tagManager =
+ TagManager.loadFromSnapshot(latestSnapshotRootDir, schemaRegionDirPath, regionStatistics);
logger.info(
"Tag snapshot loading of schemaRegion {} costs {}ms.",
schemaRegionId,
@@ -992,6 +993,10 @@
throws MetadataException, IOException {
// upsert alias
upsertAlias(alias, fullPath);
+ if (tagsMap != null && !regionStatistics.isAllowToCreateNewSeries()) {
+ throw new SeriesOverflowException(
+ regionStatistics.getGlobalMemoryUsage(), regionStatistics.getGlobalSeriesNumber());
+ }
IMeasurementMNode<IMemMNode> leafMNode = mtree.getMeasurementMNode(fullPath);
if (tagsMap == null && attributesMap == null) {
return;
@@ -1014,9 +1019,15 @@
private void upsertAlias(String alias, PartialPath fullPath)
throws MetadataException, IOException {
- if (mtree.changeAlias(alias, fullPath)) {
- // persist to WAL
- writeToMLog(SchemaRegionWritePlanFactory.getChangeAliasPlan(fullPath, alias));
+ if (alias != null) {
+ if (!regionStatistics.isAllowToCreateNewSeries()) {
+ throw new SeriesOverflowException(
+ regionStatistics.getGlobalMemoryUsage(), regionStatistics.getGlobalSeriesNumber());
+ }
+ if (mtree.changeAlias(alias, fullPath)) {
+ // persist to WAL
+ writeToMLog(SchemaRegionWritePlanFactory.getChangeAliasPlan(fullPath, alias));
+ }
}
}
@@ -1053,6 +1064,10 @@
@Override
public void addTags(Map<String, String> tagsMap, PartialPath fullPath)
throws MetadataException, IOException {
+ if (!regionStatistics.isAllowToCreateNewSeries()) {
+ throw new SeriesOverflowException(
+ regionStatistics.getGlobalMemoryUsage(), regionStatistics.getGlobalSeriesNumber());
+ }
IMeasurementMNode<IMemMNode> leafMNode = mtree.getMeasurementMNode(fullPath);
// no tag or attribute, we need to add a new record in log
if (leafMNode.getOffset() < 0) {
@@ -1096,6 +1111,10 @@
@SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
public void setTagsOrAttributesValue(Map<String, String> alterMap, PartialPath fullPath)
throws MetadataException, IOException {
+ if (!regionStatistics.isAllowToCreateNewSeries()) {
+ throw new SeriesOverflowException(
+ regionStatistics.getGlobalMemoryUsage(), regionStatistics.getGlobalSeriesNumber());
+ }
IMeasurementMNode<IMemMNode> leafMNode = mtree.getMeasurementMNode(fullPath);
if (leafMNode.getOffset() < 0) {
throw new MetadataException(
@@ -1119,6 +1138,10 @@
@SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
public void renameTagOrAttributeKey(String oldKey, String newKey, PartialPath fullPath)
throws MetadataException, IOException {
+ if (!regionStatistics.isAllowToCreateNewSeries()) {
+ throw new SeriesOverflowException(
+ regionStatistics.getGlobalMemoryUsage(), regionStatistics.getGlobalSeriesNumber());
+ }
IMeasurementMNode<IMemMNode> leafMNode = mtree.getMeasurementMNode(fullPath);
if (leafMNode.getOffset() < 0) {
throw new MetadataException(
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionPBTreeImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionPBTreeImpl.java
index 070594d..ec16026 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionPBTreeImpl.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionPBTreeImpl.java
@@ -193,7 +193,7 @@
// do not write log when recover
isRecovering = true;
- tagManager = new TagManager(schemaRegionDirPath);
+ tagManager = new TagManager(schemaRegionDirPath, regionStatistics);
mtree =
new MTreeBelowSGCachedImpl(
new PartialPath(storageGroupFullPath),
@@ -513,7 +513,8 @@
isRecovering = true;
long tagSnapshotStartTime = System.currentTimeMillis();
- tagManager = TagManager.loadFromSnapshot(latestSnapshotRootDir, schemaRegionDirPath);
+ tagManager =
+ TagManager.loadFromSnapshot(latestSnapshotRootDir, schemaRegionDirPath, regionStatistics);
logger.info(
"Tag snapshot loading of schemaRegion {} costs {}ms.",
schemaRegionId,
@@ -1076,6 +1077,9 @@
throws MetadataException, IOException {
// upsert alias
upsertAlias(alias, fullPath);
+ while (tagsMap != null && !regionStatistics.isAllowToCreateNewSeries()) {
+ ReleaseFlushMonitor.getInstance().waitIfReleasing();
+ }
IMeasurementMNode<ICachedMNode> leafMNode = mtree.getMeasurementMNode(fullPath);
try {
if (tagsMap == null && attributesMap == null) {
@@ -1102,9 +1106,14 @@
private void upsertAlias(String alias, PartialPath fullPath)
throws MetadataException, IOException {
- if (mtree.changeAlias(alias, fullPath)) {
- // persist to WAL
- writeToMLog(SchemaRegionWritePlanFactory.getChangeAliasPlan(fullPath, alias));
+ if (alias != null) {
+ while (!regionStatistics.isAllowToCreateNewSeries()) {
+ ReleaseFlushMonitor.getInstance().waitIfReleasing();
+ }
+ if (mtree.changeAlias(alias, fullPath)) {
+ // persist to WAL
+ writeToMLog(SchemaRegionWritePlanFactory.getChangeAliasPlan(fullPath, alias));
+ }
}
}
@@ -1144,6 +1153,9 @@
@Override
public void addTags(Map<String, String> tagsMap, PartialPath fullPath)
throws MetadataException, IOException {
+ while (!regionStatistics.isAllowToCreateNewSeries()) {
+ ReleaseFlushMonitor.getInstance().waitIfReleasing();
+ }
IMeasurementMNode<ICachedMNode> leafMNode = mtree.getMeasurementMNode(fullPath);
try {
// no tag or attribute, we need to add a new record in log
@@ -1198,6 +1210,9 @@
@SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
public void setTagsOrAttributesValue(Map<String, String> alterMap, PartialPath fullPath)
throws MetadataException, IOException {
+ while (!regionStatistics.isAllowToCreateNewSeries()) {
+ ReleaseFlushMonitor.getInstance().waitIfReleasing();
+ }
IMeasurementMNode<ICachedMNode> leafMNode = mtree.getMeasurementMNode(fullPath);
try {
if (leafMNode.getOffset() < 0) {
@@ -1225,6 +1240,9 @@
@SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
public void renameTagOrAttributeKey(String oldKey, String newKey, PartialPath fullPath)
throws MetadataException, IOException {
+ while (!regionStatistics.isAllowToCreateNewSeries()) {
+ ReleaseFlushMonitor.getInstance().waitIfReleasing();
+ }
IMeasurementMNode<ICachedMNode> leafMNode = mtree.getMeasurementMNode(fullPath);
try {
if (leafMNode.getOffset() < 0) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/tag/TagLogFile.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/tag/TagLogFile.java
index 7e2a7a0..2739648 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/tag/TagLogFile.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/tag/TagLogFile.java
@@ -37,7 +37,9 @@
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.FileChannel;
import java.nio.file.StandardOpenOption;
+import java.util.ArrayList;
import java.util.Collections;
+import java.util.List;
import java.util.Map;
public class TagLogFile implements AutoCloseable {
@@ -52,6 +54,11 @@
private static final int MAX_LENGTH =
CommonDescriptor.getInstance().getConfig().getTagAttributeTotalSize();
+ private static final int MAX_ENTRY_NUM =
+ CommonDescriptor.getInstance().getConfig().getTagAttributeMaxNum();
+ private static final int MAX_ENTRY_Size =
+ CommonDescriptor.getInstance().getConfig().getTagAttributeEntryMaxSize();
+
private static final int RECORD_FLUSH_INTERVAL =
IoTDBDescriptor.getInstance().getConfig().getTagAttributeFlushInterval();
private int unFlushedRecordNum = 0;
@@ -95,22 +102,83 @@
* @return tags map, attributes map
* @throws IOException error occurred when reading disk
*/
- public Pair<Map<String, String>, Map<String, String>> read(int size, long position)
- throws IOException {
+ public Pair<Map<String, String>, Map<String, String>> read(long position) throws IOException {
if (position < 0) {
return new Pair<>(Collections.emptyMap(), Collections.emptyMap());
}
- ByteBuffer byteBuffer = ByteBuffer.allocate(size);
- fileChannel.read(byteBuffer, position);
- byteBuffer.flip();
+ ByteBuffer byteBuffer = parseByteBuffer(fileChannel, position);
return new Pair<>(ReadWriteIOUtils.readMap(byteBuffer), ReadWriteIOUtils.readMap(byteBuffer));
}
- public Map<String, String> readTag(int size, long position) throws IOException {
- ByteBuffer byteBuffer = ByteBuffer.allocate(size);
+ public Map<String, String> readTag(long position) throws IOException {
+ ByteBuffer byteBuffer = parseByteBuffer(fileChannel, position);
+ return ReadWriteIOUtils.readMap(byteBuffer);
+ }
+
+ public static ByteBuffer parseByteBuffer(FileChannel fileChannel, long position)
+ throws IOException {
+ // Read the first block
+ ByteBuffer byteBuffer = ByteBuffer.allocate(MAX_LENGTH);
fileChannel.read(byteBuffer, position);
byteBuffer.flip();
- return ReadWriteIOUtils.readMap(byteBuffer);
+ if (byteBuffer.limit() > 0) { // This indicates that there is data at this position
+ int firstInt = ReadWriteIOUtils.readInt(byteBuffer); // first int
+ byteBuffer.position(0);
+ if (firstInt < -1) { // This position is blockNum, the original data occupies multiple blocks
+ int blockNum = -firstInt;
+ ByteBuffer byteBuffers = ByteBuffer.allocate(blockNum * MAX_LENGTH);
+ byteBuffers.put(byteBuffer);
+ byteBuffers.position(4); // Skip blockNum
+ for (int i = 1; i < blockNum; i++) {
+ long nextPosition = ReadWriteIOUtils.readLong(byteBuffers);
+ // read one offset, then use filechannel's read to read it
+ byteBuffers.position(MAX_LENGTH * i);
+ byteBuffers.limit(MAX_LENGTH * (i + 1));
+ fileChannel.read(byteBuffers, nextPosition);
+ byteBuffers.position(4 + i * Long.BYTES);
+ }
+ byteBuffers.limit(byteBuffers.capacity());
+ return byteBuffers;
+ }
+ }
+ return byteBuffer;
+ }
+
+ private List<Long> parseOffsetList(long position) throws IOException {
+ List<Long> blockOffset = new ArrayList<>();
+ blockOffset.add(position);
+ // Read the first block
+ ByteBuffer byteBuffer = ByteBuffer.allocate(MAX_LENGTH);
+ fileChannel.read(byteBuffer, position);
+ byteBuffer.flip();
+ if (byteBuffer.limit() > 0) { // This indicates that there is data at this position
+ int firstInt = ReadWriteIOUtils.readInt(byteBuffer); // first int
+ byteBuffer.position(0);
+ if (firstInt < -1) { // This position is blockNum, the original data occupies multiple blocks
+ int blockNum = -firstInt;
+ int blockOffsetStoreLen =
+ (((blockNum - 1) * Long.BYTES + 4) / MAX_LENGTH + 1)
+ * MAX_LENGTH; // blockOffset storage length
+ ByteBuffer blockBuffer = ByteBuffer.allocate(blockOffsetStoreLen);
+ blockBuffer.put(byteBuffer);
+ blockBuffer.position(4); // Skip blockNum
+
+ for (int i = 1; i < blockNum; i++) {
+ blockOffset.add(ReadWriteIOUtils.readLong(blockBuffer));
+ // Every time you read an offset, use filechannel's read to read it
+ if (MAX_LENGTH * (i + 1)
+ <= blockOffsetStoreLen) { // Compared with directly reading bytebuffer, some reading
+ // operations are reduced, only the content of offset is
+ // read
+ blockBuffer.position(MAX_LENGTH * i);
+ blockBuffer.limit(MAX_LENGTH * (i + 1));
+ fileChannel.read(blockBuffer, blockOffset.get(i));
+ blockBuffer.position(4 + i * Long.BYTES);
+ }
+ }
+ }
+ }
+ return blockOffset;
}
public long write(Map<String, String> tagMap, Map<String, String> attributeMap)
@@ -137,11 +205,85 @@
* @return beginning position of the record in tagFile
*/
private synchronized long write(ByteBuffer byteBuffer, long position) throws IOException {
+ // Correct the initial offset of the write
if (position < 0) {
// append the record to file tail
position = fileChannel.size();
}
- fileChannel.write(byteBuffer, position);
+ // Read the original data to get the original space offset
+ List<Long> blockOffset = parseOffsetList(position);
+ // write read data
+ int blockNumReal = byteBuffer.capacity() / MAX_LENGTH;
+ if (blockNumReal < 1) {
+ throw new RuntimeException(
+ "ByteBuffer capacity is smaller than tagAttributeTotalSize, which is not allowed.");
+ }
+ if (blockNumReal == 1 && blockOffset.size() == 1) {
+ // If the original data occupies only one block and the new data occupies only one block, the
+ // original space is used
+ fileChannel.write(byteBuffer, blockOffset.get(0));
+ } else {
+ // if the original space is larger than the new space, the original space is used
+ if (blockOffset.size() >= blockNumReal) {
+ ByteBuffer byteBufferFinal = ByteBuffer.allocate(blockOffset.size() * MAX_LENGTH);
+ byteBufferFinal.putInt(-blockOffset.size());
+ for (int i = 1; i < blockOffset.size(); i++) {
+ byteBufferFinal.putLong(blockOffset.get(i));
+ }
+ if (blockNumReal > 1) { // real data occupies multiple blocks
+ byteBuffer.position((blockNumReal - 1) * Long.BYTES + 4);
+ } else { // real data occupies only one block
+ byteBuffer.position(0);
+ }
+ byteBufferFinal.put(byteBuffer);
+ for (int i = 0; i < blockOffset.size(); i++) {
+ byteBufferFinal.position(i * MAX_LENGTH);
+ byteBufferFinal.limit((i + 1) * MAX_LENGTH);
+ fileChannel.write(byteBufferFinal, blockOffset.get(i));
+ }
+ } else {
+ // if the original space is smaller than the new space, the new space is used
+ // prepare the bytebuffer to store the offset of each block
+ int blockOffsetStoreLen = (blockNumReal - 1) * Long.BYTES + 4;
+ ByteBuffer byteBufferOffset = ByteBuffer.allocate(blockOffsetStoreLen);
+ byteBufferOffset.putInt(-blockNumReal);
+
+ // write the data to the file
+ for (int i = 0; i < blockNumReal; i++) {
+ byteBuffer.position(i * MAX_LENGTH);
+ byteBuffer.limit((i + 1) * MAX_LENGTH);
+
+ // what we want to write is the original space
+ if (i < blockOffset.size()) {
+ if (i > 0) { // first block has been written
+ byteBufferOffset.putLong(blockOffset.get(i));
+ }
+ fileChannel.write(byteBuffer, blockOffset.get(i));
+ } else {
+ // what we want to write is the new space
+ // TODO
+ byteBufferOffset.putLong(fileChannel.size());
+ blockOffset.add(fileChannel.size());
+ fileChannel.write(byteBuffer, fileChannel.size());
+ }
+ }
+
+ // write the offset of each block to the file
+ byteBufferOffset.flip();
+ for (int i = 0; i < blockOffset.size(); i++) {
+ byteBufferOffset.position(i * MAX_LENGTH);
+ if ((i + 1) * MAX_LENGTH > byteBufferOffset.capacity()) {
+ byteBufferOffset.limit(byteBufferOffset.capacity());
+ fileChannel.write(byteBufferOffset, blockOffset.get(i));
+ break;
+ } else {
+ byteBufferOffset.limit((i + 1) * MAX_LENGTH);
+ fileChannel.write(byteBufferOffset, blockOffset.get(i));
+ }
+ }
+ }
+ }
+
unFlushedRecordNum++;
if (unFlushedRecordNum >= RECORD_FLUSH_INTERVAL) {
fileChannel.force(true);
@@ -152,15 +294,68 @@
private ByteBuffer convertMapToByteBuffer(
Map<String, String> tagMap, Map<String, String> attributeMap) throws MetadataException {
- ByteBuffer byteBuffer = ByteBuffer.allocate(MAX_LENGTH);
+ int totalMapSize = calculateMapSize(tagMap) + calculateMapSize(attributeMap);
+ ByteBuffer byteBuffer;
+ if (totalMapSize <= MAX_LENGTH) {
+ byteBuffer = ByteBuffer.allocate(MAX_LENGTH);
+ } else {
+ // get the minSolution from Num*MAX_LENGTH < TotalMapSize + 4 + Long.BYTES*Num <=
+ // MAX_LENGTH*(Num + 1)
+ double NumMinLimit = (totalMapSize + 4 - MAX_LENGTH) / (double) (MAX_LENGTH - Long.BYTES);
+ // blockNum = num + 1
+ int blockNum = (int) Math.ceil(NumMinLimit) + 1;
+
+ byteBuffer = ByteBuffer.allocate(blockNum * MAX_LENGTH);
+ // 4 bytes for blockNumSize, blockNum*Long.BYTES for blockOffset
+ byteBuffer.position(4 + (blockNum - 1) * Long.BYTES);
+ }
serializeMap(tagMap, byteBuffer);
serializeMap(attributeMap, byteBuffer);
-
// set position to 0 and the content in this buffer could be read
byteBuffer.position(0);
return byteBuffer;
}
+ public static int calculateMapSize(Map<String, String> map) throws MetadataException {
+ int length = 0;
+ if (map != null) {
+ if (map.size() > MAX_ENTRY_NUM) {
+ throw new MetadataException("the emtry num of the map is over the tagAttributeMaxNum");
+ }
+ length += 4; // mapSize is 4 byte
+ // while mapSize is 0, this for loop will not be executed
+ for (Map.Entry<String, String> entry : map.entrySet()) {
+ int entryLength = 0;
+ String key = entry.getKey();
+ String value = entry.getValue();
+
+ entryLength += 4; // keySize is 4 byte
+ if (key != null) {
+ entryLength +=
+ key.getBytes()
+ .length; // only key is not null then add key length, while key is null, only
+ // store the keySize marker which is -1 (4 bytes)
+ }
+
+ entryLength += 4; // valueSize is 4 byte
+ if (value != null) {
+ entryLength +=
+ value.getBytes()
+ .length; // only value is not null then add value length, while value is null,
+ // only store the valueSize marker which is -1 (4 bytes)
+ }
+ if (entryLength > MAX_ENTRY_Size) {
+ throw new MetadataException(
+ "An emtry of the map has a size which is over the tagAttributeMaxSize");
+ }
+ length += entryLength;
+ }
+ } else {
+ length += 4; // while map is null, the mapSize is writed to -1 which is 4 byte
+ }
+ return length;
+ }
+
private void serializeMap(Map<String, String> map, ByteBuffer byteBuffer)
throws MetadataException {
try {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/tag/TagManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/tag/TagManager.java
index 1c770e7..2643635 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/tag/TagManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/tag/TagManager.java
@@ -32,6 +32,7 @@
import org.apache.iotdb.commons.schema.node.role.IMeasurementMNode;
import org.apache.iotdb.commons.schema.tree.SchemaIterator;
import org.apache.iotdb.commons.utils.FileUtils;
+import org.apache.iotdb.db.schemaengine.rescon.MemSchemaRegionStatistics;
import org.apache.iotdb.db.schemaengine.schemaregion.read.req.IShowTimeSeriesPlan;
import org.apache.iotdb.db.schemaengine.schemaregion.read.resp.info.ITimeSeriesSchemaInfo;
import org.apache.iotdb.db.schemaengine.schemaregion.read.resp.info.impl.ShowTimeSeriesResult;
@@ -40,6 +41,7 @@
import org.apache.iotdb.db.schemaengine.schemaregion.read.resp.reader.impl.TimeseriesReaderWithViewFetch;
import org.apache.tsfile.utils.Pair;
+import org.apache.tsfile.utils.RamUsageEstimator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -76,8 +78,12 @@
private final Map<String, Map<String, Set<IMeasurementMNode<?>>>> tagIndex =
new ConcurrentHashMap<>();
- public TagManager(String sgSchemaDirPath) throws IOException {
+ private final MemSchemaRegionStatistics regionStatistics;
+
+ public TagManager(String sgSchemaDirPath, MemSchemaRegionStatistics regionStatistics)
+ throws IOException {
tagLogFile = new TagLogFile(sgSchemaDirPath, SchemaConstant.TAG_LOG);
+ this.regionStatistics = regionStatistics;
}
public synchronized boolean createSnapshot(File targetDir) {
@@ -120,7 +126,8 @@
}
}
- public static TagManager loadFromSnapshot(File snapshotDir, String sgSchemaDirPath)
+ public static TagManager loadFromSnapshot(
+ File snapshotDir, String sgSchemaDirPath, MemSchemaRegionStatistics regionStatistics)
throws IOException {
File tagSnapshot =
SystemFileFactory.INSTANCE.getFile(snapshotDir, SchemaConstant.TAG_LOG_SNAPSHOT);
@@ -131,7 +138,7 @@
try {
org.apache.commons.io.FileUtils.copyFile(tagSnapshot, tagFile);
- return new TagManager(sgSchemaDirPath);
+ return new TagManager(sgSchemaDirPath, regionStatistics);
} catch (IOException e) {
if (!tagFile.delete()) {
logger.warn(
@@ -143,7 +150,7 @@
public boolean recoverIndex(long offset, IMeasurementMNode<?> measurementMNode)
throws IOException {
- Map<String, String> tags = tagLogFile.readTag(COMMON_CONFIG.getTagAttributeTotalSize(), offset);
+ Map<String, String> tags = tagLogFile.readTag(offset);
if (tags == null || tags.isEmpty()) {
return false;
} else {
@@ -156,10 +163,35 @@
if (tagKey == null || tagValue == null || measurementMNode == null) {
return;
}
- tagIndex
- .computeIfAbsent(tagKey, k -> new ConcurrentHashMap<>())
- .computeIfAbsent(tagValue, v -> Collections.synchronizedSet(new HashSet<>()))
- .add(measurementMNode);
+
+ int tagIndexOldSize = tagIndex.size();
+ Map<String, Set<IMeasurementMNode<?>>> tagValueMap =
+ tagIndex.computeIfAbsent(tagKey, k -> new ConcurrentHashMap<>());
+ int tagIndexNewSize = tagIndex.size();
+
+ int tagValueMapOldSize = tagValueMap.size();
+ Set<IMeasurementMNode<?>> measurementsSet =
+ tagValueMap.computeIfAbsent(tagValue, v -> Collections.synchronizedSet(new HashSet<>()));
+ int tagValueMapNewSize = tagValueMap.size();
+
+ int measurementsSetOldSize = measurementsSet.size();
+ measurementsSet.add(measurementMNode);
+ int measurementsSetNewSize = measurementsSet.size();
+
+ long memorySize = 0;
+ if (tagIndexNewSize - tagIndexOldSize == 1) {
+ // the last 4 is the memory occupied by the size of tagvaluemap
+ memorySize += RamUsageEstimator.sizeOf(tagKey) + 4;
+ }
+ if (tagValueMapNewSize - tagValueMapOldSize == 1) {
+ // the last 4 is the memory occupied by the size of measurementsSet
+ memorySize += RamUsageEstimator.sizeOf(tagValue) + 4;
+ }
+ if (measurementsSetNewSize - measurementsSetOldSize == 1) {
+ // 8 is the memory occupied by the length of the IMeasurementMNode
+ memorySize += RamUsageEstimator.NUM_BYTES_OBJECT_REF + 4;
+ }
+ requestMemory(memorySize);
}
public void addIndex(Map<String, String> tagsMap, IMeasurementMNode<?> measurementMNode) {
@@ -171,10 +203,27 @@
}
public void removeIndex(String tagKey, String tagValue, IMeasurementMNode<?> measurementMNode) {
- tagIndex.get(tagKey).get(tagValue).remove(measurementMNode);
- if (tagIndex.get(tagKey).get(tagValue).isEmpty()) {
- tagIndex.get(tagKey).remove(tagValue);
+ if (tagKey == null || tagValue == null || measurementMNode == null) {
+ return;
}
+ // init memory size
+ long memorySize = 0;
+ if (tagIndex.get(tagKey).get(tagValue).remove(measurementMNode)) {
+ memorySize += RamUsageEstimator.NUM_BYTES_OBJECT_REF + 4;
+ }
+ if (tagIndex.get(tagKey).get(tagValue).isEmpty()) {
+ if (tagIndex.get(tagKey).remove(tagValue) != null) {
+ // the last 4 is the memory occupied by the size of IMeasurementMNodeSet
+ memorySize += RamUsageEstimator.sizeOf(tagValue) + 4;
+ }
+ }
+ if (tagIndex.get(tagKey).isEmpty()) {
+ if (tagIndex.remove(tagKey) != null) {
+ // the last 4 is the memory occupied by the size of tagValueMap
+ memorySize += RamUsageEstimator.sizeOf(tagKey) + 4;
+ }
+ }
+ releaseMemory(memorySize);
}
private List<IMeasurementMNode<?>> getMatchedTimeseriesInIndex(TagFilter tagFilter) {
@@ -310,8 +359,7 @@
if (node.getOffset() < 0) {
return;
}
- Map<String, String> tagMap =
- tagLogFile.readTag(COMMON_CONFIG.getTagAttributeTotalSize(), node.getOffset());
+ Map<String, String> tagMap = tagLogFile.readTag(node.getOffset());
if (tagMap != null) {
for (Map.Entry<String, String> entry : tagMap.entrySet()) {
if (tagIndex.containsKey(entry.getKey())
@@ -324,13 +372,9 @@
entry.getValue(),
node.getOffset()));
}
- tagIndex.get(entry.getKey()).get(entry.getValue()).remove(node);
- if (tagIndex.get(entry.getKey()).get(entry.getValue()).isEmpty()) {
- tagIndex.get(entry.getKey()).remove(entry.getValue());
- if (tagIndex.get(entry.getKey()).isEmpty()) {
- tagIndex.remove(entry.getKey());
- }
- }
+
+ removeIndex(entry.getKey(), entry.getValue(), node);
+
} else {
if (logger.isDebugEnabled()) {
logger.debug(
@@ -359,8 +403,7 @@
IMeasurementMNode<?> leafMNode)
throws MetadataException, IOException {
- Pair<Map<String, String>, Map<String, String>> pair =
- tagLogFile.read(COMMON_CONFIG.getTagAttributeTotalSize(), leafMNode.getOffset());
+ Pair<Map<String, String>, Map<String, String>> pair = tagLogFile.read(leafMNode.getOffset());
if (tagsMap != null) {
for (Map.Entry<String, String> entry : tagsMap.entrySet()) {
@@ -383,6 +426,7 @@
}
removeIndex(key, beforeValue, leafMNode);
+
} else {
if (logger.isDebugEnabled()) {
logger.debug(
@@ -424,8 +468,7 @@
Map<String, String> attributesMap, PartialPath fullPath, IMeasurementMNode<?> leafMNode)
throws MetadataException, IOException {
- Pair<Map<String, String>, Map<String, String>> pair =
- tagLogFile.read(COMMON_CONFIG.getTagAttributeTotalSize(), leafMNode.getOffset());
+ Pair<Map<String, String>, Map<String, String>> pair = tagLogFile.read(leafMNode.getOffset());
for (Map.Entry<String, String> entry : attributesMap.entrySet()) {
String key = entry.getKey();
@@ -453,8 +496,7 @@
Map<String, String> tagsMap, PartialPath fullPath, IMeasurementMNode<?> leafMNode)
throws MetadataException, IOException {
- Pair<Map<String, String>, Map<String, String>> pair =
- tagLogFile.read(COMMON_CONFIG.getTagAttributeTotalSize(), leafMNode.getOffset());
+ Pair<Map<String, String>, Map<String, String>> pair = tagLogFile.read(leafMNode.getOffset());
for (Map.Entry<String, String> entry : tagsMap.entrySet()) {
String key = entry.getKey();
@@ -484,8 +526,7 @@
public void dropTagsOrAttributes(
Set<String> keySet, PartialPath fullPath, IMeasurementMNode<?> leafMNode)
throws MetadataException, IOException {
- Pair<Map<String, String>, Map<String, String>> pair =
- tagLogFile.read(COMMON_CONFIG.getTagAttributeTotalSize(), leafMNode.getOffset());
+ Pair<Map<String, String>, Map<String, String>> pair = tagLogFile.read(leafMNode.getOffset());
Map<String, String> deleteTag = new HashMap<>();
for (String key : keySet) {
@@ -505,16 +546,10 @@
// persist the change to disk
tagLogFile.write(pair.left, pair.right, leafMNode.getOffset());
- Map<String, Set<IMeasurementMNode<?>>> tagVal2LeafMNodeSet;
- Set<IMeasurementMNode<?>> nodeSet;
- for (Map.Entry<String, String> entry : deleteTag.entrySet()) {
- String key = entry.getKey();
- String value = entry.getValue();
- // change the tag inverted index map
- tagVal2LeafMNodeSet = tagIndex.get(key);
- if (tagVal2LeafMNodeSet != null) {
- nodeSet = tagVal2LeafMNodeSet.get(value);
- if (nodeSet != null) {
+ if (!deleteTag.isEmpty()) {
+ for (Map.Entry<String, String> entry : deleteTag.entrySet()) {
+ if (tagIndex.containsKey((entry.getKey()))
+ && tagIndex.get(entry.getKey()).containsKey(entry.getValue())) {
if (logger.isDebugEnabled()) {
logger.debug(
String.format(
@@ -524,24 +559,20 @@
leafMNode.getOffset()));
}
- nodeSet.remove(leafMNode);
- if (nodeSet.isEmpty()) {
- tagVal2LeafMNodeSet.remove(value);
- if (tagVal2LeafMNodeSet.isEmpty()) {
- tagIndex.remove(key);
- }
+ removeIndex(entry.getKey(), entry.getValue(), leafMNode);
+
+ } else {
+ if (logger.isDebugEnabled()) {
+ logger.debug(
+ String.format(
+ String.format(
+ DEBUG_MSG_1, "Drop" + PREVIOUS_CONDITION, leafMNode.getFullPath()),
+ entry.getKey(),
+ entry.getValue(),
+ leafMNode.getOffset(),
+ tagIndex.containsKey(entry.getKey())));
}
}
- } else {
- if (logger.isDebugEnabled()) {
- logger.debug(
- String.format(
- String.format(DEBUG_MSG_1, "Drop" + PREVIOUS_CONDITION, leafMNode.getFullPath()),
- key,
- value,
- leafMNode.getOffset(),
- tagIndex.containsKey(key)));
- }
}
}
}
@@ -557,8 +588,7 @@
Map<String, String> alterMap, PartialPath fullPath, IMeasurementMNode<?> leafMNode)
throws MetadataException, IOException {
// tags, attributes
- Pair<Map<String, String>, Map<String, String>> pair =
- tagLogFile.read(COMMON_CONFIG.getTagAttributeTotalSize(), leafMNode.getOffset());
+ Pair<Map<String, String>, Map<String, String>> pair = tagLogFile.read(leafMNode.getOffset());
Map<String, String> oldTagValue = new HashMap<>();
Map<String, String> newTagValue = new HashMap<>();
@@ -599,7 +629,8 @@
leafMNode.getOffset()));
}
- tagIndex.get(key).get(beforeValue).remove(leafMNode);
+ removeIndex(key, beforeValue, leafMNode);
+
} else {
if (logger.isDebugEnabled()) {
logger.debug(
@@ -628,8 +659,7 @@
String oldKey, String newKey, PartialPath fullPath, IMeasurementMNode<?> leafMNode)
throws MetadataException, IOException {
// tags, attributes
- Pair<Map<String, String>, Map<String, String>> pair =
- tagLogFile.read(COMMON_CONFIG.getTagAttributeTotalSize(), leafMNode.getOffset());
+ Pair<Map<String, String>, Map<String, String>> pair = tagLogFile.read(leafMNode.getOffset());
// current name has existed
if (pair.left.containsKey(newKey) || pair.right.containsKey(newKey)) {
@@ -657,7 +687,7 @@
leafMNode.getOffset()));
}
- tagIndex.get(oldKey).get(value).remove(leafMNode);
+ removeIndex(oldKey, value, leafMNode);
} else {
if (logger.isDebugEnabled()) {
@@ -691,7 +721,7 @@
public Pair<Map<String, String>, Map<String, String>> readTagFile(long tagFileOffset)
throws IOException {
- return tagLogFile.read(COMMON_CONFIG.getTagAttributeTotalSize(), tagFileOffset);
+ return tagLogFile.read(tagFileOffset);
}
/**
@@ -716,4 +746,16 @@
tagLogFile = null;
}
}
+
+ private void requestMemory(long size) {
+ if (regionStatistics != null) {
+ regionStatistics.requestMemory(size);
+ }
+ }
+
+ private void releaseMemory(long size) {
+ if (regionStatistics != null) {
+ regionStatistics.releaseMemory(size);
+ }
+ }
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/tools/schema/SRStatementGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/tools/schema/SRStatementGenerator.java
index cd2517d..199cde6 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/tools/schema/SRStatementGenerator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/tools/schema/SRStatementGenerator.java
@@ -62,6 +62,7 @@
import static org.apache.iotdb.commons.schema.SchemaConstant.STORAGE_GROUP_ENTITY_MNODE_TYPE;
import static org.apache.iotdb.commons.schema.SchemaConstant.STORAGE_GROUP_MNODE_TYPE;
import static org.apache.iotdb.commons.schema.SchemaConstant.isStorageGroupType;
+import static org.apache.iotdb.db.schemaengine.schemaregion.tag.TagLogFile.parseByteBuffer;
public class SRStatementGenerator implements Iterator<Statement>, Iterable<Statement> {
@@ -385,9 +386,7 @@
private Pair<Map<String, String>, Map<String, String>> getTagsAndAttributes(long offset)
throws IOException {
if (tagFileChannel != null) {
- ByteBuffer byteBuffer = ByteBuffer.allocate(COMMON_CONFIG.getTagAttributeTotalSize());
- tagFileChannel.read(byteBuffer, offset);
- byteBuffer.flip();
+ ByteBuffer byteBuffer = parseByteBuffer(tagFileChannel, offset);
Pair<Map<String, String>, Map<String, String>> tagsAndAttributes =
new Pair<>(ReadWriteIOUtils.readMap(byteBuffer), ReadWriteIOUtils.readMap(byteBuffer));
return tagsAndAttributes;
diff --git a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
index 7d72edc..400b58b 100644
--- a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
+++ b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
@@ -251,11 +251,23 @@
# It is possible to lose at most tag_attribute_flush_interval records
# tag_attribute_flush_interval=1000
-# max size for tag and attribute of one time series
+# max size for a storage block for tags and attributes of one time series. If the combined size of tags and
+# attributes exceeds the tag_attribute_total_size, a new storage block will be allocated to continue storing
+# the excess data.
# the unit is byte
# Datatype: int
# tag_attribute_total_size=700
+# Maximum number of map entries allowed for each of tagMap and attributeMap separately
+# the unit is byte
+# Datatype: int
+# tag_attribute_max_num = 20
+
+# Maximum size allowed for each of map entry separately
+# the unit is byte
+# Datatype: int
+# tag_attribute_entry_max_size = 100
+
# max measurement num of internal request
# When creating timeseries with Session.createMultiTimeseries, the user input plan, the timeseries num of
# which exceeds this num, will be split to several plans with timeseries no more than this num.
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 4e4e76e..b1948d1 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -251,6 +251,8 @@
// Max size for tag and attribute of one time series
private int tagAttributeTotalSize = 700;
+ private int tagAttributeMaxNum = 20;
+ private int tagAttributeEntryMaxSize = 100;
// maximum number of Cluster Databases allowed
private int databaseLimitThreshold = -1;
@@ -1097,6 +1099,22 @@
this.tagAttributeTotalSize = tagAttributeTotalSize;
}
+ public int getTagAttributeMaxNum() {
+ return tagAttributeMaxNum;
+ }
+
+ public void setTagAttributeMaxNum(int tagAttributeMaxNum) {
+ this.tagAttributeMaxNum = tagAttributeMaxNum;
+ }
+
+ public int getTagAttributeEntryMaxSize() {
+ return tagAttributeEntryMaxSize;
+ }
+
+ public void setTagAttributeEntryMaxSize(int tagAttributeEntryMaxSize) {
+ this.tagAttributeEntryMaxSize = tagAttributeEntryMaxSize;
+ }
+
public int getDatabaseLimitThreshold() {
return databaseLimitThreshold;
}
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index 04bd5f1..c2fd7f7 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -220,10 +220,22 @@
properties.getProperty(
"tag_attribute_total_size", String.valueOf(config.getTagAttributeTotalSize()))));
+ config.setTagAttributeMaxNum(
+ Integer.parseInt(
+ properties.getProperty(
+ "tag_attribute_max_num", String.valueOf(config.getTagAttributeMaxNum()))));
+
+ config.setTagAttributeEntryMaxSize(
+ Integer.parseInt(
+ properties.getProperty(
+ "tag_attribute_entry_max_size",
+ String.valueOf(config.getTagAttributeEntryMaxSize()))));
+
config.setTimePartitionInterval(
Long.parseLong(
properties.getProperty(
"time_partition_interval", String.valueOf(config.getTimePartitionInterval()))));
+
config.setDatabaseLimitThreshold(
Integer.parseInt(
properties.getProperty(