[To rel/1.2] Fix lock bug and improve schema file write policy in PBTree (#11477)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/schema/source/TimeSeriesSchemaSource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/schema/source/TimeSeriesSchemaSource.java
index 3da6a14..38cbaa8 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/schema/source/TimeSeriesSchemaSource.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/schema/source/TimeSeriesSchemaSource.java
@@ -39,6 +39,7 @@
import java.util.Map;
import java.util.stream.Collectors;
+import static org.apache.iotdb.commons.conf.IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD;
import static org.apache.iotdb.db.schemaengine.SchemaConstant.ALL_MATCH_PATTERN;
public class TimeSeriesSchemaSource implements ISchemaSource<ITimeSeriesSchemaInfo> {
@@ -117,7 +118,10 @@
@Override
public boolean hasSchemaStatistic(ISchemaRegion schemaRegion) {
- return pathPattern.equals(ALL_MATCH_PATTERN) && (schemaFilter == null);
+    // A region-wide statistic is only usable when no schema filter is set and the pattern covers
+    // every series of this region: either root.** itself, or <prefix>.** where this region's
+    // database equals <prefix> or lies below it.
+    if (schemaFilter != null) {
+      return false;
+    }
+    if (pathPattern.equals(ALL_MATCH_PATTERN)) {
+      return true;
+    }
+    if (!pathPattern.getMeasurement().equals(MULTI_LEVEL_PATH_WILDCARD)) {
+      return false;
+    }
+    String database = schemaRegion.getDatabaseFullPath();
+    String prefix = pathPattern.getDevice();
+    // Compare on node boundaries: a bare startsWith would let "root.sg.**" claim the statistic
+    // of database "root.sg1", which that pattern does not match.
+    return database.equals(prefix) || database.startsWith(prefix + ".");
}
@Override
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/CachedMTreeStore.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/CachedMTreeStore.java
index 2bf9b3a..1103793 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/CachedMTreeStore.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/CachedMTreeStore.java
@@ -556,6 +556,8 @@
return;
}
}
+
+ long startTime = System.currentTimeMillis();
List<ICachedMNode> nodesToPersist = cacheManager.collectVolatileMNodes();
for (ICachedMNode volatileNode : nodesToPersist) {
try {
@@ -569,6 +571,10 @@
}
cacheManager.updateCacheStatusAfterPersist(volatileNode);
}
+ logger.info(
+ "It takes {}ms to flush MTree in SchemaRegion {}",
+ (System.currentTimeMillis() - startTime),
+ regionStatistics.getSchemaRegionId());
if (updatedStorageGroupMNode != null || !nodesToPersist.isEmpty()) {
flushCallback.run();
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/StampedWriterPreferredLock.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/StampedWriterPreferredLock.java
index 66ef3ab..c48624c 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/StampedWriterPreferredLock.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/StampedWriterPreferredLock.java
@@ -50,12 +50,12 @@
private final Lock lock = new ReentrantLock();
private final Condition okToRead = lock.newCondition();
private final Condition okToWrite = lock.newCondition();
- private long stampAllocator = 0;
+ private volatile long stampAllocator = 0;
private final Map<Long, Integer> readCnt = new HashMap<>();
- private int readWait = 0;
- private int writeCnt = 0;
- private int writeWait = 0;
+ private volatile int readWait = 0;
+ private volatile int writeCnt = 0;
+ private volatile int writeWait = 0;
private final ThreadLocal<Long> sharedOwnerStamp = new ThreadLocal<>();
/**
@@ -115,7 +115,7 @@
* @return read lock stamp
*/
private long acquireReadLockStamp(boolean prior) {
- if ((prior ? writeCnt : writeCnt + writeWait) > 0) {
+ while ((prior ? writeCnt : writeCnt + writeWait) > 0) {
readWait++;
try {
okToRead.await();
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/SchemaFile.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/SchemaFile.java
index 989720f..d475205 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/SchemaFile.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/SchemaFile.java
@@ -25,6 +25,8 @@
import org.apache.iotdb.commons.schema.node.utils.IMNodeFactory;
import org.apache.iotdb.commons.utils.PathUtils;
import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.consensus.ConsensusFactory;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.metadata.schemafile.SchemaFileNotExists;
import org.apache.iotdb.db.schemaengine.SchemaConstant;
import org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.mnode.ICachedMNode;
@@ -141,7 +143,11 @@
return new SchemaFile(
sgName,
schemaRegionId,
- !pmtFile.exists(),
+ !pmtFile.exists()
+ || IoTDBDescriptor.getInstance()
+ .getConfig()
+ .getSchemaRegionConsensusProtocolClass()
+ .equals(ConsensusFactory.RATIS_CONSENSUS),
CommonDescriptor.getInstance().getConfig().getDefaultTTLInMs(),
false);
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/PageManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/PageManager.java
index 7d58cf6..d4065b9 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/PageManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/PageManager.java
@@ -20,6 +20,8 @@
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.consensus.ConsensusFactory;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.metadata.schemafile.SchemaPageOverflowException;
import org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.mnode.ICachedMNode;
import org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.mnode.container.ICachedMNodeContainer;
@@ -89,6 +91,9 @@
private final AtomicInteger logCounter;
private SchemaFileLogWriter logWriter;
+ // flush strategy is dependent on consensus protocol, only check protocol on init
+ protected FlushPageStrategy flushDirtyPagesStrategy;
+
PageManager(FileChannel channel, File pmtFile, int lastPageIndex, String logPath)
throws IOException, MetadataException {
this.pageInstCache =
@@ -107,10 +112,21 @@
this.pmtFile = pmtFile;
this.readChannel = FileChannel.open(pmtFile.toPath(), StandardOpenOption.READ);
- // recover if log exists
- int pageAcc = (int) recoverFromLog(logPath) / SchemaFileConfig.PAGE_LENGTH;
- this.logWriter = new SchemaFileLogWriter(logPath);
- logCounter = new AtomicInteger(pageAcc);
+ if (IoTDBDescriptor.getInstance()
+ .getConfig()
+ .getSchemaRegionConsensusProtocolClass()
+ .equals(ConsensusFactory.RATIS_CONSENSUS)) {
+ // with RATIS enabled, integrity is guaranteed by consensus protocol
+ logCounter = new AtomicInteger();
+ logWriter = null;
+ flushDirtyPagesStrategy = this::flushDirtyPagesWithoutLogging;
+ } else {
+ // without RATIS, utilize physical logging for integrity
+ int pageAcc = (int) recoverFromLog(logPath) / SchemaFileConfig.PAGE_LENGTH;
+ this.logWriter = new SchemaFileLogWriter(logPath);
+ logCounter = new AtomicInteger(pageAcc);
+ flushDirtyPagesStrategy = this::flushDirtyPagesWithLogging;
+ }
// construct first page if file to init
if (lastPageIndex < 0) {
@@ -405,29 +421,41 @@
return lastPageIndex.get();
}
- @Override
- public void flushDirtyPages() throws IOException {
- if (dirtyPages.size() == 0) {
- return;
- }
+ /**
+ * Writes the current dirty pages to the schema file. The implementation is chosen once, in the
+ * constructor, from the schema region consensus protocol: flushDirtyPagesWithoutLogging under
+ * RATIS, flushDirtyPagesWithLogging otherwise.
+ */
+ @FunctionalInterface
+ interface FlushPageStrategy {
+ void apply() throws IOException;
+ }
- // TODO: better performance expected while ensuring integrity when exception interrupts
+ /**
+ * Flush strategy used when the schema region does not run RATIS consensus. Every dirty page is
+ * first appended to the physical log, the log is prepared, the pages are then written to the
+ * schema file channel, and finally the log is committed.
+ *
+ * @throws IOException if writing the log or the schema file fails
+ */
+ private void flushDirtyPagesWithLogging() throws IOException {
+ // roll over to a fresh log once the current one has recorded more than SCHEMA_FILE_LOG_SIZE pages
if (logCounter.get() > SchemaFileConfig.SCHEMA_FILE_LOG_SIZE) {
logWriter = logWriter.renew();
logCounter.set(0);
}
-
+ // logCounter counts pages appended to the current log
logCounter.addAndGet(dirtyPages.size());
for (ISchemaPage page : dirtyPages.values()) {
page.syncPageBuffer();
logWriter.write(page);
}
logWriter.prepare();
-
+ // the schema file is only touched after every dirty page has been written to the log
for (ISchemaPage page : dirtyPages.values()) {
page.flushPageToChannel(channel);
}
logWriter.commit();
+ }
+
+  /**
+   * Flush strategy used when the schema region runs RATIS consensus: each dirty page is synced
+   * and written straight to the schema file channel, with no physical log in between. The
+   * constructor relies on the consensus protocol for integrity in this mode.
+   *
+   * @throws IOException if writing a page to the schema file fails
+   */
+  private void flushDirtyPagesWithoutLogging() throws IOException {
+    for (ISchemaPage dirtyPage : dirtyPages.values()) {
+      // sync the page buffer first so the bytes written below are up to date
+      dirtyPage.syncPageBuffer();
+      dirtyPage.flushPageToChannel(channel);
+    }
+  }
+
+  /**
+   * Persists all dirty pages through the strategy selected at construction time, then clears the
+   * dirty-page bookkeeping. Does nothing when there are no dirty pages.
+   *
+   * @throws IOException if the selected strategy fails; the dirty pages are kept in that case
+   */
+  @Override
+  public void flushDirtyPages() throws IOException {
+    if (dirtyPages.isEmpty()) {
+      return;
+    }
+    // the bookkeeping below is only reset when the strategy completes without throwing
+    flushDirtyPagesStrategy.apply();
dirtyPages.clear();
Arrays.stream(tieredDirtyPageIndex).forEach(LinkedList::clear);
}
@@ -438,7 +466,7 @@
Arrays.stream(tieredDirtyPageIndex).forEach(LinkedList::clear);
pageInstCache.clear();
lastPageIndex.set(0);
- logWriter = logWriter.renew();
+ logWriter = logWriter == null ? null : logWriter.renew();
}
@Override
@@ -454,7 +482,9 @@
@Override
public void close() throws IOException {
- logWriter.close();
+    // logWriter is left null when the RATIS flush strategy is chosen, so there may be no log to close
+    if (logWriter == null) {
+      return;
+    }
+    logWriter.close();
}
// endregion
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/mtree/lock/StampedWriterPreferredLockTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/mtree/lock/StampedWriterPreferredLockTest.java
index 2eb8c38..b005c62 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/mtree/lock/StampedWriterPreferredLockTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/mtree/lock/StampedWriterPreferredLockTest.java
@@ -217,4 +217,47 @@
Awaitility.await().atMost(1, TimeUnit.SECONDS).until(() -> counter.get() == 4);
Assert.assertEquals(4, counter.get());
}
+
+  /**
+   * A reader woken after a writer releases the lock must re-check for writers: if the writer
+   * immediately re-acquires the write lock, the reader has to keep waiting instead of entering
+   * while the writer's critical section is only half done.
+   */
+  @Test
+  public void testConcurrent() throws InterruptedException {
+    StampedWriterPreferredLock lock = new StampedWriterPreferredLock();
+    // released by the writer once it holds the write lock for the first time
+    Semaphore writeLockHeld = new Semaphore(0);
+    // released by the helper thread to let the writer continue
+    Semaphore releaseWriter = new Semaphore(0);
+    AtomicInteger counter1 = new AtomicInteger();
+    AtomicInteger counter2 = new AtomicInteger();
+    Thread writer =
+        new Thread(
+            () -> {
+              lock.writeLock();
+              writeLockHeld.release();
+              try {
+                // keep the write lock until the main thread is (presumably) waiting to read
+                releaseWriter.acquire();
+              } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+              }
+              // unlocking wakes the waiting reader, and re-locking right away races with it
+              lock.unlockWrite();
+              lock.writeLock();
+              counter1.incrementAndGet();
+              try {
+                Thread.sleep(500);
+              } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+              }
+              counter2.incrementAndGet();
+              lock.unlockWrite();
+            });
+    Thread releaser =
+        new Thread(
+            () -> {
+              try {
+                // give the main thread time to block on the read lock
+                Thread.sleep(500);
+              } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+              }
+              releaseWriter.release();
+            });
+    writer.start();
+    // without this handshake the main thread could win the lock outright and test nothing
+    writeLockHeld.acquire();
+    releaser.start();
+    lock.threadReadLock();
+    try {
+      // either the reader got in before the writer's second section (0/0) or after it (1/1);
+      // observing it half-done means the reader entered while the writer held the lock
+      Assert.assertEquals(counter1.get(), counter2.get());
+    } finally {
+      lock.threadReadUnlock();
+    }
+    writer.join();
+    releaser.join();
+    // the writer thread must have run to completion (its exceptions are otherwise invisible)
+    Assert.assertEquals(1, counter1.get());
+    Assert.assertEquals(1, counter2.get());
+  }
}
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/mtree/schemafile/SchemaFileLogTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/mtree/schemafile/SchemaFileLogTest.java
index c994471..8b8bb84 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/mtree/schemafile/SchemaFileLogTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/mtree/schemafile/SchemaFileLogTest.java
@@ -22,6 +22,8 @@
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.schema.node.role.IDatabaseMNode;
import org.apache.iotdb.commons.schema.node.utils.IMNodeFactory;
+import org.apache.iotdb.consensus.ConsensusFactory;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.schemaengine.SchemaConstant;
import org.apache.iotdb.db.schemaengine.SchemaEngineMode;
import org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.mnode.ICachedMNode;
@@ -73,6 +75,13 @@
@Test
public void essentialLogTest() throws IOException, MetadataException {
+ // select SIMPLE consensus to trigger logging
+ String previousConsensus =
+ IoTDBDescriptor.getInstance().getConfig().getSchemaRegionConsensusProtocolClass();
+ IoTDBDescriptor.getInstance()
+ .getConfig()
+ .setSchemaRegionConsensusProtocolClass(ConsensusFactory.SIMPLE_CONSENSUS);
+
SchemaFile sf =
(SchemaFile) SchemaFile.initSchemaFile("root.test.vRoot1", TEST_SCHEMA_REGION_ID);
IDatabaseMNode<ICachedMNode> newSGNode =
@@ -163,5 +172,9 @@
}
Assert.assertEquals(cnt, cnt2);
sf.close();
+
+ IoTDBDescriptor.getInstance()
+ .getConfig()
+ .setSchemaRegionConsensusProtocolClass(previousConsensus);
}
}