Fix pipe tree database creation on receiver
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
index 4f101e1..978d924 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
@@ -26,6 +26,8 @@
 import org.apache.iotdb.commons.disk.strategy.DirectoryStrategyType;
 import org.apache.iotdb.commons.exception.DiskSpaceInsufficientException;
 import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.exception.IoTDBException;
+import org.apache.iotdb.commons.exception.IoTDBRuntimeException;
 import org.apache.iotdb.commons.exception.pipe.PipeRuntimeOutOfMemoryCriticalException;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
@@ -89,6 +91,7 @@
 import org.apache.iotdb.db.queryengine.plan.analyze.schema.ClusterSchemaFetcher;
 import org.apache.iotdb.db.queryengine.plan.execution.config.ConfigTaskResult;
 import org.apache.iotdb.db.queryengine.plan.execution.config.executor.ClusterConfigTaskExecutor;
+import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.DatabaseSchemaTask;
 import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.relational.CreateDBTask;
 import org.apache.iotdb.db.queryengine.plan.planner.LocalExecutionPlanner;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metadata.write.view.AlterLogicalViewNode;
@@ -99,6 +102,7 @@
 import org.apache.iotdb.db.queryengine.plan.statement.StatementType;
 import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertBaseStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertMultiTabletsStatement;
+import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsOfOneDeviceStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
@@ -133,6 +137,7 @@
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
@@ -166,7 +171,8 @@
               this::executeStatementForTableModel);
   private final PipeTreeStatementDataTypeConvertExecutionVisitor
       treeStatementDataTypeConvertExecutionVisitor =
-          new PipeTreeStatementDataTypeConvertExecutionVisitor(this::executeStatementForTreeModel);
+          new PipeTreeStatementDataTypeConvertExecutionVisitor(
+              statement -> executeStatementForTreeModel(statement, getTreeDatabaseName(statement)));
   public final PipeTreeStatementToBatchVisitor batchVisitor = new PipeTreeStatementToBatchVisitor();
 
   // Used for data transfer: confignode (cluster A) -> datanode (cluster B) -> confignode (cluster
@@ -186,6 +192,14 @@
   private static final PipeConfig PIPE_CONFIG = PipeConfig.getInstance();
 
   private PipeMemoryBlock allocatedMemoryBlock;
+  private final Set<String> autoCreatedTreeDatabases = ConcurrentHashMap.newKeySet();
+  private final Set<String> conflictedTreeDatabases = ConcurrentHashMap.newKeySet();
+
+  private enum TreeDatabaseCreationResult {
+    SKIPPED, // no creation attempted (see the guard in autoCreateTreeDatabaseIfNecessary)
+    CREATED_OR_EXISTED, // creation returned SUCCESS_STATUS or DATABASE_ALREADY_EXISTS
+    CONFLICTED // creation returned DATABASE_CONFLICT
+  }
 
   static {
     try {
@@ -988,6 +1002,9 @@
           ((InsertBaseStatement) statement).getDatabaseName().isPresent()
               ? ((InsertBaseStatement) statement).getDatabaseName().get()
               : null;
+    } else if (statement instanceof InsertBaseStatement) {
+      isTableModelStatement = false;
+      databaseName = getTreeDatabaseName(statement);
     } else {
       isTableModelStatement = false;
       databaseName = null;
@@ -1038,7 +1055,7 @@
     final TSStatus status =
         isTableModelStatement
             ? executeStatementForTableModel(statement, databaseName)
-            : executeStatementForTreeModel(statement);
+            : executeStatementForTreeModel(statement, getTreeDatabaseName(statement));
 
     // Try to convert data type if the status code is not success. Insert statements normally return
     // above after the first converted execution. The retry path is kept for load and fallback
@@ -1181,7 +1198,84 @@
     }
   }
 
-  private TSStatus executeStatementForTreeModel(final Statement statement) {
+  /** Auto-creates the tree database if enabled; throws PipeException when creation fails. */
+  private TreeDatabaseCreationResult autoCreateTreeDatabaseIfNecessary(final String database) {
+    if (database == null
+        || LoadTsFileStatement.getDatabaseLevelByTreeDatabase(database) == null
+        || !IoTDBDescriptor.getInstance().getConfig().isAutoCreateSchemaEnabled()) {
+      return TreeDatabaseCreationResult.SKIPPED;
+    }
+    // Outcomes are cached on this receiver; a cached hit returns before the permission check.
+    if (autoCreatedTreeDatabases.contains(database)) {
+      return TreeDatabaseCreationResult.CREATED_OR_EXISTED;
+    }
+    if (conflictedTreeDatabases.contains(database)) {
+      return TreeDatabaseCreationResult.CONFLICTED;
+    }
+
+    try {
+      final TSStatus status =
+          AuthorityChecker.getAccessControl()
+              .checkCanCreateDatabaseForTree(getUserEntity(), new PartialPath(database));
+      if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        throw new PipeException(status.getMessage());
+      }
+
+      final DatabaseSchemaStatement statement =
+          new DatabaseSchemaStatement(DatabaseSchemaStatement.DatabaseSchemaStatementType.CREATE);
+      statement.setDatabasePath(new PartialPath(database));
+      statement.setEnablePrintExceptionLog(false);
+      final DatabaseSchemaTask task = new DatabaseSchemaTask(statement);
+      final ConfigTaskResult result = task.execute(ClusterConfigTaskExecutor.getInstance()).get();
+      final int statusCode = result.getStatusCode().getStatusCode();
+      if (statusCode == TSStatusCode.SUCCESS_STATUS.getStatusCode()
+          || statusCode == TSStatusCode.DATABASE_ALREADY_EXISTS.getStatusCode()) {
+        autoCreatedTreeDatabases.add(database);
+        return TreeDatabaseCreationResult.CREATED_OR_EXISTED;
+      }
+      if (statusCode == TSStatusCode.DATABASE_CONFLICT.getStatusCode()) {
+        conflictedTreeDatabases.add(database);
+        return TreeDatabaseCreationResult.CONFLICTED;
+      }
+      throw new PipeException(
+          String.format(
+              "Auto create tree database failed: %s, status code: %s",
+              database, result.getStatusCode()));
+    } catch (final IllegalPathException e) {
+      throw new PipeException(String.format("Illegal tree database %s.", database), e);
+    } catch (final ExecutionException | InterruptedException e) {
+      if (e instanceof InterruptedException) {
+        Thread.currentThread().interrupt();
+      }
+      final Throwable rootCause = getRootCause(e);
+      final int errorCode;
+      if (rootCause instanceof IoTDBException) {
+        errorCode = ((IoTDBException) rootCause).getErrorCode();
+      } else if (rootCause instanceof IoTDBRuntimeException) {
+        errorCode = ((IoTDBRuntimeException) rootCause).getErrorCode();
+      } else {
+        errorCode = TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode();
+      }
+      if (errorCode == TSStatusCode.DATABASE_ALREADY_EXISTS.getStatusCode()) {
+        autoCreatedTreeDatabases.add(database);
+        return TreeDatabaseCreationResult.CREATED_OR_EXISTED;
+      }
+      if (errorCode == TSStatusCode.DATABASE_CONFLICT.getStatusCode()) {
+        conflictedTreeDatabases.add(database);
+        return TreeDatabaseCreationResult.CONFLICTED;
+      }
+      throw new PipeException(
+          DataNodePipeMessages.AUTO_CREATE_DATABASE_FAILED_BECAUSE + e.getMessage(), e);
+    }
+  }
+
+  private TSStatus executeStatementForTreeModel(
+      final Statement statement, final String databaseName) {
+    if (autoCreateTreeDatabaseIfNecessary(databaseName) == TreeDatabaseCreationResult.CONFLICTED) {
+      // Continue execution, but let partition analysis infer the receiver-side database.
+      clearTreeDatabaseName(statement);
+    }
+
     return Coordinator.getInstance()
         .executeForTreeModel(
             shouldMarkAsPipeRequest.get() ? new PipeEnrichedStatement(statement) : statement,
@@ -1196,6 +1290,53 @@
         .status;
   }
 
+  private IAuditEntity getUserEntity() {
+    return userEntity == null
+        ? AuthorityChecker.createIAuditEntity(username, SESSION_MANAGER.getCurrSession())
+        : userEntity; // reuse the existing entity when one is set
+  }
+
+  /** Returns the database of a load or insert statement, or {@code null} when there is none. */
+  private String getTreeDatabaseName(final Statement statement) {
+    if (statement instanceof LoadTsFileStatement) {
+      return ((LoadTsFileStatement) statement).getDatabase();
+    } else if (statement instanceof InsertBaseStatement) {
+      return ((InsertBaseStatement) statement).getDatabaseName().orElse(null);
+    }
+    return null;
+  }
+
+  /** Clears the statement's database; load statements also get the default database level. */
+  static void clearTreeDatabaseName(final Statement statement) {
+    if (statement instanceof LoadTsFileStatement) {
+      final LoadTsFileStatement load = (LoadTsFileStatement) statement;
+      load.setDatabase(null);
+      load.setDatabaseLevel(IoTDBDescriptor.getInstance().getConfig().getDefaultDatabaseLevel());
+    } else if (statement instanceof InsertBaseStatement) {
+      clearTreeInsertDatabaseName((InsertBaseStatement) statement);
+    }
+  }
+
+  /** Clears the database name on an insert statement and on any child statements it holds. */
+  private static void clearTreeInsertDatabaseName(final InsertBaseStatement statement) {
+    statement.setDatabaseName(null);
+    // Pick the child list for the batch types that wrap other insert statements.
+    final Iterable<? extends InsertBaseStatement> children;
+    if (statement instanceof InsertRowsStatement) {
+      children = ((InsertRowsStatement) statement).getInsertRowStatementList();
+    } else if (statement instanceof InsertRowsOfOneDeviceStatement) {
+      children = ((InsertRowsOfOneDeviceStatement) statement).getInsertRowStatementList();
+    } else if (statement instanceof InsertMultiTabletsStatement) {
+      children = ((InsertMultiTabletsStatement) statement).getInsertTabletStatementList();
+    } else {
+      // Any other insert statement: nothing further to clear.
+      return;
+    }
+    for (final InsertBaseStatement child : children) {
+      child.setDatabaseName(null);
+    }
+  }
+
   private TSStatus executeStatementForTableModelWithPermissionCheck(
       final org.apache.iotdb.commons.queryengine.plan.relational.sql.ast.Statement statement,
       final String databaseName) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeTreeStatementDataTypeConvertExecutionVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeTreeStatementDataTypeConvertExecutionVisitor.java
index e78e273..f8c6ead 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeTreeStatementDataTypeConvertExecutionVisitor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeTreeStatementDataTypeConvertExecutionVisitor.java
@@ -105,12 +105,15 @@
           new TsFileInsertionEventScanParser(
               file, new IoTDBTreePattern(null), Long.MIN_VALUE, Long.MAX_VALUE, null, null, true)) {
         for (final Pair<Tablet, Boolean> tabletWithIsAligned : parser.toTabletWithIsAligneds()) {
+          final InsertTabletStatement insertTabletStatement =
+              PipeTransferTabletRawReq.toTPipeTransferRawReq(
+                      tabletWithIsAligned.getLeft(), tabletWithIsAligned.getRight())
+                  .constructStatement();
+          if (loadTsFileStatement.getDatabase() != null) {
+            insertTabletStatement.setDatabaseName(loadTsFileStatement.getDatabase());
+          }
           final PipeConvertedInsertTabletStatement statement =
-              new PipeConvertedInsertTabletStatement(
-                  PipeTransferTabletRawReq.toTPipeTransferRawReq(
-                          tabletWithIsAligned.getLeft(), tabletWithIsAligned.getRight())
-                      .constructStatement(),
-                  false);
+              new PipeConvertedInsertTabletStatement(insertTabletStatement, false);
 
           TSStatus result;
           try {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
index 6ca3164..e2db158 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
@@ -652,6 +652,7 @@
             .setConvertOnTypeMismatch(true);
     if (database != null) {
       statement.setDatabase(database);
+      statement.updateDatabaseLevelByTreeDatabase();
     }
     if (isGeneratedByPipe) {
       statement.markIsGeneratedByPipe();
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiverTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiverTest.java
index 7f9197d..8f2e86c 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiverTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiverTest.java
@@ -20,6 +20,10 @@
 package org.apache.iotdb.db.pipe.receiver.protocol.thrift;
 
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertMultiTabletsStatement;
+import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowStatement;
+import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement;
+import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
 import org.apache.iotdb.db.storageengine.load.active.ActiveLoadPathHelper;
 import org.apache.iotdb.db.storageengine.load.config.LoadTsFileConfigurator;
@@ -29,6 +33,8 @@
 
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.Map;
 
 public class IoTDBDataNodeReceiverTest {
@@ -150,4 +156,53 @@
       Files.deleteIfExists(tsFile);
     }
   }
+
+  @Test
+  public void testClearTreeDatabaseNameForLoadTsFileStatement() throws Exception {
+    final Path tempTsFile = Files.createTempFile("pipe-load-clear-tree-database", ".tsfile");
+    try {
+      // Build a load statement through the sync helper, then clear its database.
+      final LoadTsFileStatement loadStatement =
+          IoTDBDataNodeReceiver.buildLoadTsFileStatementForSync(
+              "root.test.sg_0", tempTsFile.toString(), true, true);
+      IoTDBDataNodeReceiver.clearTreeDatabaseName(loadStatement);
+
+      // The database must be unset and the level must equal the configured default.
+      final int defaultLevel = IoTDBDescriptor.getInstance().getConfig().getDefaultDatabaseLevel();
+      Assert.assertNull(loadStatement.getDatabase());
+      Assert.assertEquals(defaultLevel, loadStatement.getDatabaseLevel());
+    } finally {
+      Files.deleteIfExists(tempTsFile);
+    }
+  }
+
+  @Test
+  public void testClearTreeDatabaseNameForBatchInsertStatements() {
+    final String database = "root.test.sg_0";
+
+    // InsertRowsStatement: the batch and each child row must lose the database name.
+    final InsertRowStatement firstRow = new InsertRowStatement();
+    firstRow.setDatabaseName(database);
+    final InsertRowStatement secondRow = new InsertRowStatement();
+    secondRow.setDatabaseName(database);
+    final InsertRowsStatement rows = new InsertRowsStatement();
+    rows.setDatabaseName(database);
+    rows.setInsertRowStatementList(Arrays.asList(firstRow, secondRow));
+
+    IoTDBDataNodeReceiver.clearTreeDatabaseName(rows);
+    Assert.assertFalse(rows.getDatabaseName().isPresent());
+    Assert.assertFalse(firstRow.getDatabaseName().isPresent());
+    Assert.assertFalse(secondRow.getDatabaseName().isPresent());
+
+    // InsertMultiTabletsStatement: same expectation for the batch and its tablet child.
+    final InsertTabletStatement tablet = new InsertTabletStatement();
+    tablet.setDatabaseName(database);
+    final InsertMultiTabletsStatement multiTablets = new InsertMultiTabletsStatement();
+    multiTablets.setDatabaseName(database);
+    multiTablets.setInsertTabletStatementList(Collections.singletonList(tablet));
+
+    IoTDBDataNodeReceiver.clearTreeDatabaseName(multiTablets);
+    Assert.assertFalse(multiTablets.getDatabaseName().isPresent());
+    Assert.assertFalse(tablet.getDatabaseName().isPresent());
+  }
 }
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileSchedulerTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileSchedulerTest.java
index 2db41c2..19d9749 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileSchedulerTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileSchedulerTest.java
@@ -28,6 +28,7 @@
 import org.apache.iotdb.db.queryengine.plan.planner.plan.PlanFragment;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.SubPlan;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadSingleTsFileNode;
+import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
 
 import org.junit.Assert;
 import org.junit.Before;
@@ -35,6 +36,9 @@
 import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
 
+import java.io.File;
+import java.lang.reflect.Method;
+
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
@@ -87,4 +91,31 @@
 
     Assert.assertEquals("test", LoadTsFileScheduler.getPartitionQueryDatabase(node, false));
   }
+
+  @Test
+  public void testBuildRetryTreeLoadStatementUpdatesDatabaseLevel() throws Exception {
+    final String database = "root.test.sg_0";
+    final LoadTsFileScheduler loadScheduler =
+        new LoadTsFileScheduler(
+            distributedQueryPlan,
+            mock(MPPQueryContext.class),
+            mock(QueryStateMachine.class),
+            mock(IClientManager.class),
+            mock(IPartitionFetcher.class),
+            true);
+    // Looked up via reflection; setAccessible(true) implies the builder is not public.
+    final Method retryBuilder =
+        LoadTsFileScheduler.class.getDeclaredMethod(
+            "buildRetryTreeLoadStatement", String.class, boolean.class, String.class);
+    retryBuilder.setAccessible(true);
+    final File tempTsFile = File.createTempFile("test", ".tsfile");
+    tempTsFile.deleteOnExit();
+    final LoadTsFileStatement built =
+        (LoadTsFileStatement)
+            retryBuilder.invoke(loadScheduler, tempTsFile.getAbsolutePath(), true, database);
+
+    Assert.assertEquals(database, built.getDatabase());
+    Assert.assertEquals(2, built.getDatabaseLevel());
+    Assert.assertTrue(built.isGeneratedByPipe());
+  }
 }