[To rel/1.2] Fix lock bug and improve schema file write policy in PBTree (#11477)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/schema/source/TimeSeriesSchemaSource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/schema/source/TimeSeriesSchemaSource.java
index 3da6a14..38cbaa8 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/schema/source/TimeSeriesSchemaSource.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/schema/source/TimeSeriesSchemaSource.java
@@ -39,6 +39,7 @@
 import java.util.Map;
 import java.util.stream.Collectors;
 
+import static org.apache.iotdb.commons.conf.IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD;
 import static org.apache.iotdb.db.schemaengine.SchemaConstant.ALL_MATCH_PATTERN;
 
 public class TimeSeriesSchemaSource implements ISchemaSource<ITimeSeriesSchemaInfo> {
@@ -117,7 +118,10 @@
 
   @Override
   public boolean hasSchemaStatistic(ISchemaRegion schemaRegion) {
-    return pathPattern.equals(ALL_MATCH_PATTERN) && (schemaFilter == null);
+    return (pathPattern.equals(ALL_MATCH_PATTERN)
+            || (pathPattern.getMeasurement().equals(MULTI_LEVEL_PATH_WILDCARD)
+                && schemaRegion.getDatabaseFullPath().startsWith(pathPattern.getDevice())))
+        && (schemaFilter == null);
   }
 
   @Override
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/CachedMTreeStore.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/CachedMTreeStore.java
index 2bf9b3a..1103793 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/CachedMTreeStore.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/CachedMTreeStore.java
@@ -556,6 +556,8 @@
           return;
         }
       }
+
+      long startTime = System.currentTimeMillis();
       List<ICachedMNode> nodesToPersist = cacheManager.collectVolatileMNodes();
       for (ICachedMNode volatileNode : nodesToPersist) {
         try {
@@ -569,6 +571,10 @@
         }
         cacheManager.updateCacheStatusAfterPersist(volatileNode);
       }
+      logger.info(
+          "It takes {}ms to flush MTree in SchemaRegion {}",
+          (System.currentTimeMillis() - startTime),
+          regionStatistics.getSchemaRegionId());
       if (updatedStorageGroupMNode != null || !nodesToPersist.isEmpty()) {
         flushCallback.run();
       }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/StampedWriterPreferredLock.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/StampedWriterPreferredLock.java
index 66ef3ab..c48624c 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/StampedWriterPreferredLock.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/StampedWriterPreferredLock.java
@@ -50,12 +50,12 @@
   private final Lock lock = new ReentrantLock();
   private final Condition okToRead = lock.newCondition();
   private final Condition okToWrite = lock.newCondition();
-  private long stampAllocator = 0;
+  private volatile long stampAllocator = 0;
 
   private final Map<Long, Integer> readCnt = new HashMap<>();
-  private int readWait = 0;
-  private int writeCnt = 0;
-  private int writeWait = 0;
+  private volatile int readWait = 0;
+  private volatile int writeCnt = 0;
+  private volatile int writeWait = 0;
 
   private final ThreadLocal<Long> sharedOwnerStamp = new ThreadLocal<>();
   /**
@@ -115,7 +115,7 @@
    * @return read lock stamp
    */
   private long acquireReadLockStamp(boolean prior) {
-    if ((prior ? writeCnt : writeCnt + writeWait) > 0) {
+    while ((prior ? writeCnt : writeCnt + writeWait) > 0) {
       readWait++;
       try {
         okToRead.await();
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/SchemaFile.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/SchemaFile.java
index 989720f..d475205 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/SchemaFile.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/SchemaFile.java
@@ -25,6 +25,8 @@
 import org.apache.iotdb.commons.schema.node.utils.IMNodeFactory;
 import org.apache.iotdb.commons.utils.PathUtils;
 import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.consensus.ConsensusFactory;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.metadata.schemafile.SchemaFileNotExists;
 import org.apache.iotdb.db.schemaengine.SchemaConstant;
 import org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.mnode.ICachedMNode;
@@ -141,7 +143,11 @@
     return new SchemaFile(
         sgName,
         schemaRegionId,
-        !pmtFile.exists(),
+        !pmtFile.exists()
+            || IoTDBDescriptor.getInstance()
+                .getConfig()
+                .getSchemaRegionConsensusProtocolClass()
+                .equals(ConsensusFactory.RATIS_CONSENSUS),
         CommonDescriptor.getInstance().getConfig().getDefaultTTLInMs(),
         false);
   }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/PageManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/PageManager.java
index 7d58cf6..d4065b9 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/PageManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/PageManager.java
@@ -20,6 +20,8 @@
 
 import org.apache.iotdb.commons.exception.MetadataException;
 import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.consensus.ConsensusFactory;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.metadata.schemafile.SchemaPageOverflowException;
 import org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.mnode.ICachedMNode;
 import org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.mnode.container.ICachedMNodeContainer;
@@ -89,6 +91,9 @@
   private final AtomicInteger logCounter;
   private SchemaFileLogWriter logWriter;
 
+  // the flush strategy depends on the consensus protocol, which is checked only once at init
+  protected FlushPageStrategy flushDirtyPagesStrategy;
+
   PageManager(FileChannel channel, File pmtFile, int lastPageIndex, String logPath)
       throws IOException, MetadataException {
     this.pageInstCache =
@@ -107,10 +112,21 @@
     this.pmtFile = pmtFile;
     this.readChannel = FileChannel.open(pmtFile.toPath(), StandardOpenOption.READ);
 
-    // recover if log exists
-    int pageAcc = (int) recoverFromLog(logPath) / SchemaFileConfig.PAGE_LENGTH;
-    this.logWriter = new SchemaFileLogWriter(logPath);
-    logCounter = new AtomicInteger(pageAcc);
+    if (IoTDBDescriptor.getInstance()
+        .getConfig()
+        .getSchemaRegionConsensusProtocolClass()
+        .equals(ConsensusFactory.RATIS_CONSENSUS)) {
+      // with RATIS enabled, integrity is guaranteed by consensus protocol
+      logCounter = new AtomicInteger();
+      logWriter = null;
+      flushDirtyPagesStrategy = this::flushDirtyPagesWithoutLogging;
+    } else {
+      // without RATIS, utilize physical logging for integrity
+      int pageAcc = (int) recoverFromLog(logPath) / SchemaFileConfig.PAGE_LENGTH;
+      this.logWriter = new SchemaFileLogWriter(logPath);
+      logCounter = new AtomicInteger(pageAcc);
+      flushDirtyPagesStrategy = this::flushDirtyPagesWithLogging;
+    }
 
     // construct first page if file to init
     if (lastPageIndex < 0) {
@@ -405,29 +421,41 @@
     return lastPageIndex.get();
   }
 
-  @Override
-  public void flushDirtyPages() throws IOException {
-    if (dirtyPages.size() == 0) {
-      return;
-    }
+  @FunctionalInterface
+  interface FlushPageStrategy {
+    void apply() throws IOException;
+  }
 
-    // TODO: better performance expected while ensuring integrity when exception interrupts
+  private void flushDirtyPagesWithLogging() throws IOException {
     if (logCounter.get() > SchemaFileConfig.SCHEMA_FILE_LOG_SIZE) {
       logWriter = logWriter.renew();
       logCounter.set(0);
     }
-
     logCounter.addAndGet(dirtyPages.size());
     for (ISchemaPage page : dirtyPages.values()) {
       page.syncPageBuffer();
       logWriter.write(page);
     }
     logWriter.prepare();
-
     for (ISchemaPage page : dirtyPages.values()) {
       page.flushPageToChannel(channel);
     }
     logWriter.commit();
+  }
+
+  private void flushDirtyPagesWithoutLogging() throws IOException {
+    for (ISchemaPage page : dirtyPages.values()) {
+      page.syncPageBuffer();
+      page.flushPageToChannel(channel);
+    }
+  }
+
+  @Override
+  public void flushDirtyPages() throws IOException {
+    if (dirtyPages.size() == 0) {
+      return;
+    }
+    flushDirtyPagesStrategy.apply();
     dirtyPages.clear();
     Arrays.stream(tieredDirtyPageIndex).forEach(LinkedList::clear);
   }
@@ -438,7 +466,7 @@
     Arrays.stream(tieredDirtyPageIndex).forEach(LinkedList::clear);
     pageInstCache.clear();
     lastPageIndex.set(0);
-    logWriter = logWriter.renew();
+    logWriter = logWriter == null ? null : logWriter.renew();
   }
 
   @Override
@@ -454,7 +482,9 @@
 
   @Override
   public void close() throws IOException {
-    logWriter.close();
+    if (logWriter != null) {
+      logWriter.close();
+    }
   }
 
   // endregion
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/mtree/lock/StampedWriterPreferredLockTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/mtree/lock/StampedWriterPreferredLockTest.java
index 2eb8c38..b005c62 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/mtree/lock/StampedWriterPreferredLockTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/mtree/lock/StampedWriterPreferredLockTest.java
@@ -217,4 +217,47 @@
     Awaitility.await().atMost(1, TimeUnit.SECONDS).until(() -> counter.get() == 4);
     Assert.assertEquals(4, counter.get());
   }
+
+  @Test
+  public void testConcurrent() throws InterruptedException {
+    StampedWriterPreferredLock lock = new StampedWriterPreferredLock();
+    Semaphore semaphore = new Semaphore(0);
+    AtomicInteger counter1 = new AtomicInteger();
+    AtomicInteger counter2 = new AtomicInteger();
+    // writer thread; the main thread takes the thread read lock after both threads are started
+    new Thread(
+            () -> {
+              // hold the write lock until the semaphore is released, then release and re-acquire it
+              lock.writeLock();
+              try {
+                semaphore.acquire();
+              } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+              }
+              lock.unlockWrite();
+              lock.writeLock();
+              counter1.incrementAndGet();
+              try {
+                Thread.sleep(500);
+              } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+              }
+              counter2.incrementAndGet();
+              lock.unlockWrite();
+            })
+        .start();
+    new Thread(
+            () -> {
+              try {
+                Thread.sleep(500);
+              } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+              }
+              semaphore.release();
+            })
+        .start();
+    lock.threadReadLock();
+    Assert.assertEquals(counter2.get(), counter1.get());
+    lock.threadReadUnlock();
+  }
 }
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/mtree/schemafile/SchemaFileLogTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/mtree/schemafile/SchemaFileLogTest.java
index c994471..8b8bb84 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/mtree/schemafile/SchemaFileLogTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/mtree/schemafile/SchemaFileLogTest.java
@@ -22,6 +22,8 @@
 import org.apache.iotdb.commons.exception.MetadataException;
 import org.apache.iotdb.commons.schema.node.role.IDatabaseMNode;
 import org.apache.iotdb.commons.schema.node.utils.IMNodeFactory;
+import org.apache.iotdb.consensus.ConsensusFactory;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.schemaengine.SchemaConstant;
 import org.apache.iotdb.db.schemaengine.SchemaEngineMode;
 import org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.mnode.ICachedMNode;
@@ -73,6 +75,13 @@
 
   @Test
   public void essentialLogTest() throws IOException, MetadataException {
+    // select SIMPLE consensus to trigger logging
+    String previousConsensus =
+        IoTDBDescriptor.getInstance().getConfig().getSchemaRegionConsensusProtocolClass();
+    IoTDBDescriptor.getInstance()
+        .getConfig()
+        .setSchemaRegionConsensusProtocolClass(ConsensusFactory.SIMPLE_CONSENSUS);
+
     SchemaFile sf =
         (SchemaFile) SchemaFile.initSchemaFile("root.test.vRoot1", TEST_SCHEMA_REGION_ID);
     IDatabaseMNode<ICachedMNode> newSGNode =
@@ -163,5 +172,9 @@
     }
     Assert.assertEquals(cnt, cnt2);
     sf.close();
+
+    IoTDBDescriptor.getInstance()
+        .getConfig()
+        .setSchemaRegionConsensusProtocolClass(previousConsensus);
   }
 }