Fix pipe tree database creation on receiver
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java index 4f101e1..978d924 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
@@ -26,6 +26,8 @@ import org.apache.iotdb.commons.disk.strategy.DirectoryStrategyType; import org.apache.iotdb.commons.exception.DiskSpaceInsufficientException; import org.apache.iotdb.commons.exception.IllegalPathException; +import org.apache.iotdb.commons.exception.IoTDBException; +import org.apache.iotdb.commons.exception.IoTDBRuntimeException; import org.apache.iotdb.commons.exception.pipe.PipeRuntimeOutOfMemoryCriticalException; import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.commons.pipe.config.PipeConfig; @@ -89,6 +91,7 @@ import org.apache.iotdb.db.queryengine.plan.analyze.schema.ClusterSchemaFetcher; import org.apache.iotdb.db.queryengine.plan.execution.config.ConfigTaskResult; import org.apache.iotdb.db.queryengine.plan.execution.config.executor.ClusterConfigTaskExecutor; +import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.DatabaseSchemaTask; import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.relational.CreateDBTask; import org.apache.iotdb.db.queryengine.plan.planner.LocalExecutionPlanner; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metadata.write.view.AlterLogicalViewNode; @@ -99,6 +102,7 @@ import org.apache.iotdb.db.queryengine.plan.statement.StatementType; import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertBaseStatement; import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertMultiTabletsStatement; +import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsOfOneDeviceStatement; import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement; import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement; import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement; @@ -133,6 +137,7 @@ import java.util.Objects; import java.util.Optional; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -166,7 +171,8 @@ this::executeStatementForTableModel); private final PipeTreeStatementDataTypeConvertExecutionVisitor treeStatementDataTypeConvertExecutionVisitor = - new PipeTreeStatementDataTypeConvertExecutionVisitor(this::executeStatementForTreeModel); + new PipeTreeStatementDataTypeConvertExecutionVisitor( + statement -> executeStatementForTreeModel(statement, getTreeDatabaseName(statement))); public final PipeTreeStatementToBatchVisitor batchVisitor = new PipeTreeStatementToBatchVisitor(); // Used for data transfer: confignode (cluster A) -> datanode (cluster B) -> confignode (cluster @@ -186,6 +192,14 @@ private static final PipeConfig PIPE_CONFIG = PipeConfig.getInstance(); private PipeMemoryBlock allocatedMemoryBlock; + private final Set<String> autoCreatedTreeDatabases = ConcurrentHashMap.newKeySet(); + private final Set<String> conflictedTreeDatabases = ConcurrentHashMap.newKeySet(); + + private enum TreeDatabaseCreationResult { + SKIPPED, + CREATED_OR_EXISTED, + CONFLICTED + } static { try { @@ -988,6 +1002,9 @@ ((InsertBaseStatement) statement).getDatabaseName().isPresent() ? ((InsertBaseStatement) statement).getDatabaseName().get() : null; + } else if (statement instanceof InsertBaseStatement) { + isTableModelStatement = false; + databaseName = getTreeDatabaseName(statement); } else { isTableModelStatement = false; databaseName = null; @@ -1038,7 +1055,7 @@ final TSStatus status = isTableModelStatement ? executeStatementForTableModel(statement, databaseName) - : executeStatementForTreeModel(statement); + : executeStatementForTreeModel(statement, getTreeDatabaseName(statement)); // Try to convert data type if the status code is not success. Insert statements normally return // above after the first converted execution. The retry path is kept for load and fallback @@ -1181,7 +1198,84 @@ } } - private TSStatus executeStatementForTreeModel(final Statement statement) { + private TreeDatabaseCreationResult autoCreateTreeDatabaseIfNecessary(final String database) { + if (database == null + || LoadTsFileStatement.getDatabaseLevelByTreeDatabase(database) == null + || !IoTDBDescriptor.getInstance().getConfig().isAutoCreateSchemaEnabled()) { + return TreeDatabaseCreationResult.SKIPPED; + } + if (autoCreatedTreeDatabases.contains(database)) { + return TreeDatabaseCreationResult.CREATED_OR_EXISTED; + } + if (conflictedTreeDatabases.contains(database)) { + return TreeDatabaseCreationResult.CONFLICTED; + } + + try { + final TSStatus status = + AuthorityChecker.getAccessControl() + .checkCanCreateDatabaseForTree(getUserEntity(), new PartialPath(database)); + if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + throw new PipeException(status.getMessage()); + } + + final DatabaseSchemaStatement statement = + new DatabaseSchemaStatement(DatabaseSchemaStatement.DatabaseSchemaStatementType.CREATE); + statement.setDatabasePath(new PartialPath(database)); + statement.setEnablePrintExceptionLog(false); + final DatabaseSchemaTask task = new DatabaseSchemaTask(statement); + final ListenableFuture<ConfigTaskResult> future = + task.execute(ClusterConfigTaskExecutor.getInstance()); + final ConfigTaskResult result = future.get(); + final int statusCode = result.getStatusCode().getStatusCode(); + if (statusCode == TSStatusCode.SUCCESS_STATUS.getStatusCode() + || statusCode == TSStatusCode.DATABASE_ALREADY_EXISTS.getStatusCode()) { + autoCreatedTreeDatabases.add(database); + return TreeDatabaseCreationResult.CREATED_OR_EXISTED; + } + if (statusCode == TSStatusCode.DATABASE_CONFLICT.getStatusCode()) { + conflictedTreeDatabases.add(database); + return TreeDatabaseCreationResult.CONFLICTED; + } + throw new PipeException( + String.format( + "Auto create tree database failed: %s, status code: %s", + database, result.getStatusCode())); + } catch (final IllegalPathException e) { + throw new PipeException(String.format("Illegal tree database %s.", database), e); + } catch (final ExecutionException | InterruptedException e) { + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } + final Throwable rootCause = getRootCause(e); + final int errorCode; + if (rootCause instanceof IoTDBException) { + errorCode = ((IoTDBException) rootCause).getErrorCode(); + } else if (rootCause instanceof IoTDBRuntimeException) { + errorCode = ((IoTDBRuntimeException) rootCause).getErrorCode(); + } else { + errorCode = TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode(); + } + if (errorCode == TSStatusCode.DATABASE_ALREADY_EXISTS.getStatusCode()) { + autoCreatedTreeDatabases.add(database); + return TreeDatabaseCreationResult.CREATED_OR_EXISTED; + } + if (errorCode == TSStatusCode.DATABASE_CONFLICT.getStatusCode()) { + conflictedTreeDatabases.add(database); + return TreeDatabaseCreationResult.CONFLICTED; + } + throw new PipeException( + DataNodePipeMessages.AUTO_CREATE_DATABASE_FAILED_BECAUSE + e.getMessage()); + } + } + + private TSStatus executeStatementForTreeModel( + final Statement statement, final String databaseName) { + if (autoCreateTreeDatabaseIfNecessary(databaseName) == TreeDatabaseCreationResult.CONFLICTED) { + // Continue execution, but let partition analysis infer the receiver-side database. + clearTreeDatabaseName(statement); + } + return Coordinator.getInstance() .executeForTreeModel( shouldMarkAsPipeRequest.get() ? new PipeEnrichedStatement(statement) : statement, @@ -1196,6 +1290,53 @@ .status; } + private IAuditEntity getUserEntity() { + return userEntity != null + ? userEntity + : AuthorityChecker.createIAuditEntity(username, SESSION_MANAGER.getCurrSession()); + } + + private String getTreeDatabaseName(final Statement statement) { + if (statement instanceof LoadTsFileStatement) { + return ((LoadTsFileStatement) statement).getDatabase(); + } + if (statement instanceof InsertBaseStatement) { + return ((InsertBaseStatement) statement).getDatabaseName().orElse(null); + } + return null; + } + + static void clearTreeDatabaseName(final Statement statement) { + if (statement instanceof LoadTsFileStatement) { + final LoadTsFileStatement loadTsFileStatement = (LoadTsFileStatement) statement; + loadTsFileStatement.setDatabase(null); + loadTsFileStatement.setDatabaseLevel( + IoTDBDescriptor.getInstance().getConfig().getDefaultDatabaseLevel()); + } else if (statement instanceof InsertBaseStatement) { + clearTreeInsertDatabaseName((InsertBaseStatement) statement); + } + } + + private static void clearTreeInsertDatabaseName(final InsertBaseStatement statement) { + statement.setDatabaseName(null); + if (statement instanceof InsertRowsStatement) { + for (final InsertBaseStatement childStatement : + ((InsertRowsStatement) statement).getInsertRowStatementList()) { + childStatement.setDatabaseName(null); + } + } else if (statement instanceof InsertRowsOfOneDeviceStatement) { + for (final InsertBaseStatement childStatement : + ((InsertRowsOfOneDeviceStatement) statement).getInsertRowStatementList()) { + childStatement.setDatabaseName(null); + } + } else if (statement instanceof InsertMultiTabletsStatement) { + for (final InsertBaseStatement childStatement : + ((InsertMultiTabletsStatement) statement).getInsertTabletStatementList()) { + childStatement.setDatabaseName(null); + } + } + } + private TSStatus executeStatementForTableModelWithPermissionCheck( final org.apache.iotdb.commons.queryengine.plan.relational.sql.ast.Statement statement, final String databaseName) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeTreeStatementDataTypeConvertExecutionVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeTreeStatementDataTypeConvertExecutionVisitor.java index e78e273..f8c6ead 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeTreeStatementDataTypeConvertExecutionVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeTreeStatementDataTypeConvertExecutionVisitor.java
@@ -105,12 +105,15 @@ new TsFileInsertionEventScanParser( file, new IoTDBTreePattern(null), Long.MIN_VALUE, Long.MAX_VALUE, null, null, true)) { for (final Pair<Tablet, Boolean> tabletWithIsAligned : parser.toTabletWithIsAligneds()) { + final InsertTabletStatement insertTabletStatement = + PipeTransferTabletRawReq.toTPipeTransferRawReq( + tabletWithIsAligned.getLeft(), tabletWithIsAligned.getRight()) + .constructStatement(); + if (loadTsFileStatement.getDatabase() != null) { + insertTabletStatement.setDatabaseName(loadTsFileStatement.getDatabase()); + } final PipeConvertedInsertTabletStatement statement = - new PipeConvertedInsertTabletStatement( - PipeTransferTabletRawReq.toTPipeTransferRawReq( - tabletWithIsAligned.getLeft(), tabletWithIsAligned.getRight()) - .constructStatement(), - false); + new PipeConvertedInsertTabletStatement(insertTabletStatement, false); TSStatus result; try {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java index 6ca3164..e2db158 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
@@ -652,6 +652,7 @@ .setConvertOnTypeMismatch(true); if (database != null) { statement.setDatabase(database); + statement.updateDatabaseLevelByTreeDatabase(); } if (isGeneratedByPipe) { statement.markIsGeneratedByPipe();
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiverTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiverTest.java index 7f9197d..8f2e86c 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiverTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiverTest.java
@@ -20,6 +20,10 @@ package org.apache.iotdb.db.pipe.receiver.protocol.thrift; import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertMultiTabletsStatement; +import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowStatement; +import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement; +import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement; import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement; import org.apache.iotdb.db.storageengine.load.active.ActiveLoadPathHelper; import org.apache.iotdb.db.storageengine.load.config.LoadTsFileConfigurator; @@ -29,6 +33,8 @@ import java.nio.file.Files; import java.nio.file.Path; +import java.util.Arrays; +import java.util.Collections; import java.util.Map; public class IoTDBDataNodeReceiverTest { @@ -150,4 +156,53 @@ Files.deleteIfExists(tsFile); } } + + @Test + public void testClearTreeDatabaseNameForLoadTsFileStatement() throws Exception { + final Path tsFile = Files.createTempFile("pipe-load-clear-tree-database", ".tsfile"); + try { + final LoadTsFileStatement statement = + IoTDBDataNodeReceiver.buildLoadTsFileStatementForSync( + "root.test.sg_0", tsFile.toString(), true, true); + + IoTDBDataNodeReceiver.clearTreeDatabaseName(statement); + + Assert.assertNull(statement.getDatabase()); + Assert.assertEquals( + IoTDBDescriptor.getInstance().getConfig().getDefaultDatabaseLevel(), + statement.getDatabaseLevel()); + } finally { + Files.deleteIfExists(tsFile); + } + } + + @Test + public void testClearTreeDatabaseNameForBatchInsertStatements() { + final InsertRowStatement rowStatement1 = new InsertRowStatement(); + rowStatement1.setDatabaseName("root.test.sg_0"); + final InsertRowStatement rowStatement2 = new InsertRowStatement(); + rowStatement2.setDatabaseName("root.test.sg_0"); + final InsertRowsStatement insertRowsStatement = new InsertRowsStatement(); + insertRowsStatement.setDatabaseName("root.test.sg_0"); + insertRowsStatement.setInsertRowStatementList(Arrays.asList(rowStatement1, rowStatement2)); + + IoTDBDataNodeReceiver.clearTreeDatabaseName(insertRowsStatement); + + Assert.assertFalse(insertRowsStatement.getDatabaseName().isPresent()); + Assert.assertFalse(rowStatement1.getDatabaseName().isPresent()); + Assert.assertFalse(rowStatement2.getDatabaseName().isPresent()); + + final InsertTabletStatement tabletStatement = new InsertTabletStatement(); + tabletStatement.setDatabaseName("root.test.sg_0"); + final InsertMultiTabletsStatement insertMultiTabletsStatement = + new InsertMultiTabletsStatement(); + insertMultiTabletsStatement.setDatabaseName("root.test.sg_0"); + insertMultiTabletsStatement.setInsertTabletStatementList( + Collections.singletonList(tabletStatement)); + + IoTDBDataNodeReceiver.clearTreeDatabaseName(insertMultiTabletsStatement); + + Assert.assertFalse(insertMultiTabletsStatement.getDatabaseName().isPresent()); + Assert.assertFalse(tabletStatement.getDatabaseName().isPresent()); + } }
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileSchedulerTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileSchedulerTest.java index 2db41c2..19d9749 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileSchedulerTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileSchedulerTest.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.PlanFragment; import org.apache.iotdb.db.queryengine.plan.planner.plan.SubPlan; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadSingleTsFileNode; +import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement; import org.junit.Assert; import org.junit.Before; @@ -35,6 +36,9 @@ import org.mockito.Mock; import org.mockito.MockitoAnnotations; +import java.io.File; +import java.lang.reflect.Method; + import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; @@ -87,4 +91,31 @@ Assert.assertEquals("test", LoadTsFileScheduler.getPartitionQueryDatabase(node, false)); } + + @Test + public void testBuildRetryTreeLoadStatementUpdatesDatabaseLevel() throws Exception { + final LoadTsFileScheduler scheduler = + new LoadTsFileScheduler( + distributedQueryPlan, + mock(MPPQueryContext.class), + mock(QueryStateMachine.class), + mock(IClientManager.class), + mock(IPartitionFetcher.class), + true); + final Method method = + LoadTsFileScheduler.class.getDeclaredMethod( + "buildRetryTreeLoadStatement", String.class, boolean.class, String.class); + method.setAccessible(true); + + final File tsFile = File.createTempFile("test", ".tsfile"); + tsFile.deleteOnExit(); + + final LoadTsFileStatement statement = + (LoadTsFileStatement) + method.invoke(scheduler, tsFile.getAbsolutePath(), true, "root.test.sg_0"); + + Assert.assertEquals("root.test.sg_0", statement.getDatabase()); + Assert.assertEquals(2, statement.getDatabaseLevel()); + Assert.assertTrue(statement.isGeneratedByPipe()); + } }