[To region_migration] ConfigNode pulls results from DataNode, improve ITs, coordinator avoids duplicate submission (#12127)
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java index 257d813..569059c 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java
@@ -344,7 +344,7 @@ for (AbstractNodeWrapper nodeWrapper : Stream.concat(this.dataNodeWrapperList.stream(), this.configNodeWrapperList.stream()) .collect(Collectors.toList())) { - nodeWrapper.stop(); + nodeWrapper.stopForcibly(); nodeWrapper.destroyDir(); String lockPath = EnvUtils.getLockFilePath(nodeWrapper.getPort()); if (!new File(lockPath).delete()) {
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java index dbc43f8..a27d57e 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java
@@ -475,15 +475,19 @@ if (this.instance == null) { return; } + logger.info("Node {} will be shutdown soon", this.getPort()); this.instance.destroy(); try { if (!this.instance.waitFor(20, TimeUnit.SECONDS)) { + logger.warn("Node {} will be shutdown forcibly soon", this.getPort()); this.instance.destroyForcibly().waitFor(10, TimeUnit.SECONDS); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); - logger.error("Waiting node to shutdown error. %s", e); + logger.error("Waiting node to shutdown error.", e); + return; } + logger.info("Node {} has been shutdown", this.getPort()); } @Override @@ -492,12 +496,14 @@ return; } try { - this.instance.destroyForcibly().waitFor(5, TimeUnit.SECONDS); - logger.info("Node {} has been successfully forcibly stopped", nodePort); + logger.info("Node {} will be shutdown forcibly soon", this.getPort()); + this.instance.destroyForcibly().waitFor(10, TimeUnit.SECONDS); } catch (InterruptedException e) { Thread.currentThread().interrupt(); - logger.error("Waiting node to shutdown error. %s", e); + logger.error("Waiting node to shutdown error.", e); + return; } + logger.info("Node {} has been shutdown", this.getPort()); } @Override
diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBRegionMigrateReliabilityIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBRegionMigrateReliabilityIT.java index 18ddac9..812f393 100644 --- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBRegionMigrateReliabilityIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBRegionMigrateReliabilityIT.java
@@ -23,12 +23,16 @@ import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient; import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory; import org.apache.iotdb.commons.conf.IoTDBConstant; +import org.apache.iotdb.confignode.procedure.state.AddRegionPeerState; import org.apache.iotdb.confignode.procedure.state.RegionTransitionState; import org.apache.iotdb.consensus.ConsensusFactory; import org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant; import org.apache.iotdb.it.env.EnvFactory; +import org.apache.iotdb.it.env.cluster.node.AbstractNodeWrapper; +import org.apache.iotdb.it.env.cluster.node.ConfigNodeWrapper; import org.awaitility.Awaitility; +import org.awaitility.core.ConditionTimeoutException; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -44,14 +48,18 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; -import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; +import java.util.stream.Stream; public class IoTDBRegionMigrateReliabilityIT { private static final Logger LOGGER = @@ -81,61 +89,95 @@ @Test public void normal1C2DTest() throws Exception { - EnvFactory.getEnv() - .getConfig() - .getCommonConfig() - .setDataReplicationFactor(1) - .setSchemaReplicationFactor(1); - EnvFactory.getEnv().initClusterEnvironment(1, 2); - - try (final Connection connection = EnvFactory.getEnv().getConnection(); - final Statement statement = connection.createStatement()) { - - statement.execute(INSERTION); - - ResultSet result = statement.executeQuery(SHOW_REGIONS); - Map<Integer, Set<Integer>> regionMap = getRegionMap(result); - - result = 
statement.executeQuery(SHOW_DATANODES); - Set<Integer> dataNodeSet = new HashSet<>(); - while (result.next()) { - dataNodeSet.add(result.getInt(ColumnHeaderConstant.NODE_ID)); - } - - final int selectedRegion = selectRegion(regionMap); - final int originalDataNode = selectOriginalDataNode(regionMap, selectedRegion); - final int destDataNode = selectDestDataNode(dataNodeSet, regionMap, selectedRegion); - - // set breakpoint - HashMap<String, Runnable> keywordAction = new HashMap<>(); - Arrays.stream(RegionTransitionState.values()) - .forEach( - state -> - keywordAction.put( - String.valueOf(state), () -> LOGGER.info(String.valueOf(state)))); - ExecutorService service = IoTDBThreadPoolFactory.newCachedThreadPool("regionMigrateIT"); - LOGGER.info("breakpoint setting..."); - service.submit(() -> logBreakpointMonitor(0, keywordAction)); - LOGGER.info("breakpoint set"); - - statement.execute(regionMigrateCommand(selectedRegion, originalDataNode, destDataNode)); - - awaitUntilSuccess(statement, selectedRegion, originalDataNode, destDataNode); - - checkRegionFileClear(originalDataNode); - - LOGGER.info("test pass"); - } + generalTest(1, 1, 1, 2, Collections.emptySet(), Collections.emptySet()); } @Test public void normal3C3DTest() throws Exception { + generalTest(2, 3, 3, 3, Collections.emptySet(), Collections.emptySet()); + } + + // endregion + + // region ConfigNode crash tests + @Test + public void cnCrashDuringPreCheck() throws Exception { + generalTest( + 1, + 1, + 1, + 2, + Stream.of(RegionTransitionState.REGION_MIGRATE_PREPARE.toString()) + .collect(Collectors.toSet()), + Collections.emptySet()); + } + + @Test + public void cnCrashDuringCreatePeer() throws Exception { + generalTest( + 1, + 1, + 1, + 2, + Stream.of(AddRegionPeerState.CREATE_NEW_REGION_PEER.toString()).collect(Collectors.toSet()), + Collections.emptySet()); + } + + @Test + public void cnCrashDuringDoAddPeer() throws Exception { + generalTest( + 1, + 1, + 1, + 2, + 
Stream.of(AddRegionPeerState.DO_ADD_REGION_PEER.toString()).collect(Collectors.toSet()), + Collections.emptySet()); + } + + @Test + public void cnCrashDuring() throws Exception { + generalTest( + 1, + 1, + 1, + 2, + Stream.of(AddRegionPeerState.UPDATE_REGION_LOCATION_CACHE.toString()) + .collect(Collectors.toSet()), + Collections.emptySet()); + } + + // TODO: other cn crash test + + // endregion + + // region DataNode crash tests + + // endregion + + // region Helpers + + public void generalTest( + final int dataReplicateFactor, + final int schemaReplicationFactor, + final int configNodeNum, + final int dataNodeNum, + Set<String> killConfigNodeKeywords, + Set<String> killDataNodeKeywords // TODO:此参数尚未生效 + ) throws Exception { + // prepare env EnvFactory.getEnv() .getConfig() .getCommonConfig() - .setDataReplicationFactor(2) - .setSchemaReplicationFactor(3); - EnvFactory.getEnv().initClusterEnvironment(3, 3); + .setDataReplicationFactor(dataReplicateFactor) + .setSchemaReplicationFactor(schemaReplicationFactor); + EnvFactory.getEnv().initClusterEnvironment(configNodeNum, dataNodeNum); + + ExecutorService service = IoTDBThreadPoolFactory.newCachedThreadPool("regionMigrateIT"); + EnvFactory.getEnv() + .getConfigNodeWrapperList() + .forEach( + configNodeWrapper -> + service.submit(() -> nodeLogKillPoint(configNodeWrapper, killConfigNodeKeywords))); try (final Connection connection = EnvFactory.getEnv().getConnection(); final Statement statement = connection.createStatement(); @@ -157,94 +199,62 @@ final int originalDataNode = selectOriginalDataNode(regionMap, selectedRegion); final int destDataNode = selectDestDataNode(dataNodeSet, regionMap, selectedRegion); - // set breakpoint - HashMap<String, Runnable> keywordAction = new HashMap<>(); - Arrays.stream(RegionTransitionState.values()) - .forEach( - state -> - keywordAction.put( - String.valueOf(state), () -> LOGGER.info(String.valueOf(state)))); - ExecutorService service = 
IoTDBThreadPoolFactory.newCachedThreadPool("regionMigrateIT"); - LOGGER.info("breakpoint setting..."); - service.submit(() -> logBreakpointMonitor(0, keywordAction)); - service.submit(() -> logBreakpointMonitor(1, keywordAction)); - service.submit(() -> logBreakpointMonitor(2, keywordAction)); - LOGGER.info("breakpoint set"); - statement.execute(regionMigrateCommand(selectedRegion, originalDataNode, destDataNode)); awaitUntilSuccess(statement, selectedRegion, originalDataNode, destDataNode); checkRegionFileClear(originalDataNode); - - LOGGER.info("test pass"); } + LOGGER.info("test pass"); } - // endregion - - // region ConfigNode crash tests - @Test - public void cnCrashDuringPreCheck() {} - - @Test - public void cnCrashDuringCreatePeer() {} - - @Test - public void cnCrashDuringAddPeer() {} - - // TODO: other cn crash test - - // endregion - - // region DataNode crash tests - - // endregion - - // region Helpers - /** * Monitor the node's log and do something. * - * @param nodeIndex - * @param keywordAction Map<keyword, action> + * @param nodeWrapper Easy to understand + * @param killNodeKeywords When detect these keywords in node's log, stop the node forcibly */ - private static void logBreakpointMonitor(int nodeIndex, HashMap<String, Runnable> keywordAction) { + private static void nodeLogKillPoint( + AbstractNodeWrapper nodeWrapper, Set<String> killNodeKeywords) { + if (killNodeKeywords.isEmpty()) { + return; + } + final String logFileName; + if (nodeWrapper instanceof ConfigNodeWrapper) { + logFileName = "log_confignode_all.log"; + } else { + logFileName = "log_datanode_all.log"; + } ProcessBuilder builder = new ProcessBuilder( "tail", "-f", - EnvFactory.getEnv().getConfigNodeWrapper(nodeIndex).getNodePath() - + File.separator - + "logs" - + File.separator - + "log_confignode_all.log"); - builder.redirectErrorStream(true); // 将错误输出和标准输出合并 + nodeWrapper.getNodePath() + File.separator + "logs" + File.separator + logFileName); + builder.redirectErrorStream(true); 
try { - Process process = builder.start(); // 开始执行命令 - // 读取命令的输出 + Process process = builder.start(); try (BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()))) { String line; while ((line = reader.readLine()) != null) { - Set<String> detected = new HashSet<>(); + // if trigger more than one keyword at a same time, test code may have mistakes + Assert.assertTrue(killNodeKeywords.stream().filter(line::contains).count() <= 1); String finalLine = line; - keywordAction - .keySet() - .forEach( - k -> { - if (finalLine.contains(k)) { - detected.add(k); - } - }); - detected.forEach( - k -> { - keywordAction.get(k).run(); - // - // EnvFactory.getEnv().getConfigNodeWrapper(nodeIndex).stopForcibly(); - keywordAction.remove(k); - }); + Optional<String> detectedKeyword = + killNodeKeywords.stream() + .filter(keyword -> finalLine.contains("breakpoint:" + keyword)) + .findAny(); + if (detectedKeyword.isPresent()) { + // reboot the node + nodeWrapper.stopForcibly(); + nodeWrapper.start(); + // each keyword only trigger once + killNodeKeywords.remove(detectedKeyword.get()); + } + if (killNodeKeywords.isEmpty()) { + break; + } } } } catch (IOException e) { @@ -279,7 +289,7 @@ Map<Integer, Set<Integer>> regionMap, int selectedRegion) { return regionMap.get(selectedRegion).stream() .findAny() - .orElseThrow(() -> new RuntimeException("gg")); + .orElseThrow(() -> new RuntimeException("cannot find original DataNode")); } private static int selectDestDataNode( @@ -287,20 +297,37 @@ return dataNodeSet.stream() .filter(dataNodeId -> !regionMap.get(selectedRegion).contains(dataNodeId)) .findAny() - .orElseThrow(() -> new RuntimeException("gg")); + .orElseThrow(() -> new RuntimeException("cannot find dest DataNode")); } private static void awaitUntilSuccess( Statement statement, int selectedRegion, int originalDataNode, int destDataNode) { - Awaitility.await() - .atMost(1, TimeUnit.MINUTES) - .until( - () -> { - Map<Integer, Set<Integer>> newRegionMap 
= - getRegionMap(statement.executeQuery(SHOW_REGIONS)); - Set<Integer> dataNodes = newRegionMap.get(selectedRegion); - return !dataNodes.contains(originalDataNode) && dataNodes.contains(destDataNode); - }); + AtomicReference<Set<Integer>> lastTimeDataNodes = new AtomicReference<>(); + try { + Awaitility.await() + .atMost(1, TimeUnit.MINUTES) + .until( + () -> { + try { + Map<Integer, Set<Integer>> newRegionMap = + getRegionMap(statement.executeQuery(SHOW_REGIONS)); + Set<Integer> dataNodes = newRegionMap.get(selectedRegion); + lastTimeDataNodes.set(dataNodes); + return !dataNodes.contains(originalDataNode) && dataNodes.contains(destDataNode); + } catch (Exception e) { + // Any exception can be ignored + return false; + } + }); + } catch (ConditionTimeoutException e) { + // Set<Integer> expectation = new Set<>(lastTimeDataNodes); + String actualSetStr = lastTimeDataNodes.get().toString(); + lastTimeDataNodes.get().remove(originalDataNode); + lastTimeDataNodes.get().add(destDataNode); + String expectSetStr = lastTimeDataNodes.toString(); + LOGGER.info("DataNode Set {} is unexpected, expect {}", actualSetStr, expectSetStr); + throw e; + } } /** Check whether the original DataNode's region file has been deleted. */
diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/procedure/IoTDBProcedureIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/procedure/IoTDBProcedureIT.java index 4d0f388..831879d 100644 --- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/procedure/IoTDBProcedureIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/procedure/IoTDBProcedureIT.java
@@ -79,11 +79,13 @@ @Test public void procedureRecoverAtAnotherConfigNodeTest() throws Exception { recoverTest(3, false); + LOGGER.info("test pass"); } @Test public void procedureRecoverAtTheSameConfigNodeTest() throws Exception { recoverTest(1, true); + LOGGER.info("test pass"); } private void recoverTest(int configNodeNum, boolean needRestartLeader) throws Exception { @@ -114,7 +116,7 @@ Assert.assertTrue(resp.getDatabaseInfoMap().size() < MAX_STATE); // Then shutdown the leader, wait the new leader exist and the procedure continue final int oldLeaderIndex = EnvFactory.getEnv().getLeaderConfigNodeIndex(); - EnvFactory.getEnv().getConfigNodeWrapper(oldLeaderIndex).stop(); + EnvFactory.getEnv().getConfigNodeWrapper(oldLeaderIndex).stopForcibly(); if (needRestartLeader) { EnvFactory.getEnv().getConfigNodeWrapper(oldLeaderIndex).start(); }
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java index 4f45ea5..b4102c3 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -151,7 +151,6 @@ import org.apache.iotdb.confignode.rpc.thrift.TMigrateRegionReq; import org.apache.iotdb.confignode.rpc.thrift.TNodeVersionInfo; import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp; -import org.apache.iotdb.confignode.rpc.thrift.TRegionMigrateResultReportReq; import org.apache.iotdb.confignode.rpc.thrift.TRegionRouteMapResp; import org.apache.iotdb.confignode.rpc.thrift.TSchemaNodeManagementResp; import org.apache.iotdb.confignode.rpc.thrift.TSchemaPartitionTableResp; @@ -412,16 +411,6 @@ } @Override - public TSStatus reportRegionMigrateResult(TRegionMigrateResultReportReq req) { - TSStatus status = confirmLeader(); - if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { - // TODO: 这里需要修改report机制,改为向AddRegionPeerProcedure汇报 - procedureManager.reportRegionMigrateResult(req); - } - return status; - } - - @Override public DataSet getDataNodeConfiguration( GetDataNodeConfigurationPlan getDataNodeConfigurationPlan) { TSStatus status = confirmLeader();
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java index e9e310b..368eaf5 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
@@ -91,7 +91,6 @@ import org.apache.iotdb.confignode.rpc.thrift.TGetUDFTableResp; import org.apache.iotdb.confignode.rpc.thrift.TMigrateRegionReq; import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp; -import org.apache.iotdb.confignode.rpc.thrift.TRegionMigrateResultReportReq; import org.apache.iotdb.confignode.rpc.thrift.TRegionRouteMapResp; import org.apache.iotdb.confignode.rpc.thrift.TSchemaNodeManagementResp; import org.apache.iotdb.confignode.rpc.thrift.TSchemaPartitionTableResp; @@ -243,14 +242,6 @@ TSStatus reportDataNodeShutdown(TDataNodeLocation dataNodeLocation); /** - * DataNode report region migrate result to ConfigNode when remove DataNode. - * - * @param req TRegionMigrateResultReportReq - * @return TSStatus - */ - TSStatus reportRegionMigrateResult(TRegionMigrateResultReportReq req); - - /** * Get DataNode info. * * @return DataNodesConfigurationDataSet
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java index 5d10d8f..5a9b08f 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
@@ -51,7 +51,7 @@ import org.apache.iotdb.confignode.procedure.ProcedureExecutor; import org.apache.iotdb.confignode.procedure.ProcedureMetrics; import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv; -import org.apache.iotdb.confignode.procedure.env.DataNodeRemoveHandler; +import org.apache.iotdb.confignode.procedure.env.RegionMaintainHandler; import org.apache.iotdb.confignode.procedure.impl.CreateManyDatabasesProcedure; import org.apache.iotdb.confignode.procedure.impl.cq.CreateCQProcedure; import org.apache.iotdb.confignode.procedure.impl.node.AddConfigNodeProcedure; @@ -74,10 +74,8 @@ import org.apache.iotdb.confignode.procedure.impl.schema.DeleteTimeSeriesProcedure; import org.apache.iotdb.confignode.procedure.impl.schema.SetTemplateProcedure; import org.apache.iotdb.confignode.procedure.impl.schema.UnsetTemplateProcedure; -import org.apache.iotdb.confignode.procedure.impl.statemachine.AddRegionPeerProcedure; import org.apache.iotdb.confignode.procedure.impl.statemachine.CreateRegionGroupsProcedure; import org.apache.iotdb.confignode.procedure.impl.statemachine.RegionMigrateProcedure; -import org.apache.iotdb.confignode.procedure.impl.statemachine.RemoveRegionPeerProcedure; import org.apache.iotdb.confignode.procedure.impl.sync.AuthOperationProcedure; import org.apache.iotdb.confignode.procedure.impl.trigger.CreateTriggerProcedure; import org.apache.iotdb.confignode.procedure.impl.trigger.DropTriggerProcedure; @@ -96,7 +94,6 @@ import org.apache.iotdb.confignode.rpc.thrift.TDeleteLogicalViewReq; import org.apache.iotdb.confignode.rpc.thrift.TDeleteTimeSeriesReq; import org.apache.iotdb.confignode.rpc.thrift.TMigrateRegionReq; -import org.apache.iotdb.confignode.rpc.thrift.TRegionMigrateResultReportReq; import org.apache.iotdb.db.exception.BatchProcessException; import org.apache.iotdb.db.schemaengine.template.Template; import org.apache.iotdb.rpc.RpcUtils; @@ -641,7 +638,7 @@ } // select coordinator for adding peer - DataNodeRemoveHandler 
handler = new DataNodeRemoveHandler(configManager); + RegionMaintainHandler handler = new RegionMaintainHandler(configManager); Optional<TDataNodeLocation> selectedDataNode = handler.filterDataNodeWithOtherRegionReplica(regionGroupId, destDataNode); if (!selectedDataNode.isPresent()) { @@ -1077,36 +1074,6 @@ this.env = env; } - public void reportRegionMigrateResult(TRegionMigrateResultReportReq req) { - // TODO: ugly, will fix soon - this.executor - .getProcedures() - .values() - .forEach( - procedure1 -> { - if (procedure1 instanceof AddRegionPeerProcedure) { - AddRegionPeerProcedure procedure = (AddRegionPeerProcedure) procedure1; - if (procedure.getConsensusGroupId().equals(req.getRegionId())) { - procedure.notifyAddPeerFinished(req); - } - } - }); - - // TODO: ugly, will fix soon - this.executor - .getProcedures() - .values() - .forEach( - procedure1 -> { - if (procedure1 instanceof RemoveRegionPeerProcedure) { - RemoveRegionPeerProcedure procedure = (RemoveRegionPeerProcedure) procedure1; - if (procedure.getConsensusGroupId().equals(req.getRegionId())) { - procedure.notifyRemovePeerFinished(req); - } - } - }); - } - public void addMetrics() { MetricService.getInstance().addMetricSet(this.procedureMetrics); }
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java index d18f701..aa457e8 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
@@ -62,7 +62,7 @@ import org.apache.iotdb.confignode.manager.pipe.coordinator.PipeManager; import org.apache.iotdb.confignode.manager.schema.ClusterSchemaManager; import org.apache.iotdb.confignode.persistence.node.NodeInfo; -import org.apache.iotdb.confignode.procedure.env.DataNodeRemoveHandler; +import org.apache.iotdb.confignode.procedure.env.RegionMaintainHandler; import org.apache.iotdb.confignode.rpc.thrift.TCQConfig; import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeInfo; import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq; @@ -344,8 +344,8 @@ public DataSet removeDataNode(RemoveDataNodePlan removeDataNodePlan) { LOGGER.info("NodeManager start to remove DataNode {}", removeDataNodePlan); - DataNodeRemoveHandler dataNodeRemoveHandler = - new DataNodeRemoveHandler((ConfigManager) configManager); + RegionMaintainHandler dataNodeRemoveHandler = + new RegionMaintainHandler((ConfigManager) configManager); DataNodeToStatusResp preCheckStatus = dataNodeRemoveHandler.checkRemoveDataNodeRequest(removeDataNodePlan); if (preCheckStatus.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java index 518f3cc..cf4cf20 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
@@ -104,14 +104,14 @@ private final ProcedureScheduler scheduler; - private final DataNodeRemoveHandler dataNodeRemoveHandler; + private final RegionMaintainHandler dataNodeRemoveHandler; private final ReentrantLock removeConfigNodeLock; public ConfigNodeProcedureEnv(ConfigManager configManager, ProcedureScheduler scheduler) { this.configManager = configManager; this.scheduler = scheduler; - this.dataNodeRemoveHandler = new DataNodeRemoveHandler(configManager); + this.dataNodeRemoveHandler = new RegionMaintainHandler(configManager); this.removeConfigNodeLock = new ReentrantLock(); } @@ -735,7 +735,7 @@ return schedulerLock; } - public DataNodeRemoveHandler getDataNodeRemoveHandler() { + public RegionMaintainHandler getDataNodeRemoveHandler() { return dataNodeRemoveHandler; }
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RegionMaintainHandler.java similarity index 92% rename from iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java rename to iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RegionMaintainHandler.java index 95ccf3d..acc95b6 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RegionMaintainHandler.java
@@ -22,8 +22,14 @@ import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType; import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration; import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; +import org.apache.iotdb.common.rpc.thrift.TEndPoint; +import org.apache.iotdb.common.rpc.thrift.TRegionMaintainTaskStatus; +import org.apache.iotdb.common.rpc.thrift.TRegionMigrateResultReportReq; import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.commons.client.ClientPoolFactory; +import org.apache.iotdb.commons.client.IClientManager; +import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient; import org.apache.iotdb.commons.cluster.NodeStatus; import org.apache.iotdb.commons.service.metric.MetricService; import org.apache.iotdb.commons.utils.NodeUrlUtils; @@ -59,9 +65,9 @@ import static org.apache.iotdb.consensus.ConsensusFactory.IOT_CONSENSUS; import static org.apache.iotdb.consensus.ConsensusFactory.SIMPLE_CONSENSUS; -public class DataNodeRemoveHandler { +public class RegionMaintainHandler { - private static final Logger LOGGER = LoggerFactory.getLogger(DataNodeRemoveHandler.class); + private static final Logger LOGGER = LoggerFactory.getLogger(RegionMaintainHandler.class); private static final ConfigNodeConfig CONF = ConfigNodeDescriptor.getInstance().getConf(); @@ -70,8 +76,14 @@ /** region migrate lock */ private final LockQueue regionMigrateLock = new LockQueue(); - public DataNodeRemoveHandler(ConfigManager configManager) { + private final IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> dataNodeClientManager; + + public RegionMaintainHandler(ConfigManager configManager) { this.configManager = configManager; + dataNodeClientManager = + new IClientManager.Factory<TEndPoint, SyncDataNodeInternalServiceClient>() + .createClientManager( + new ClientPoolFactory.SyncDataNodeInternalServiceClientPoolFactory()); } public static String 
getIdWithRpcEndpoint(TDataNodeLocation location) { @@ -234,12 +246,15 @@ * @return TSStatus */ public TSStatus addRegionPeer( - TDataNodeLocation destDataNode, TConsensusGroupId regionId, TDataNodeLocation coordinator) { + long procedureId, + TDataNodeLocation destDataNode, + TConsensusGroupId regionId, + TDataNodeLocation coordinator) { TSStatus status; // Send addRegionPeer request to the selected DataNode, // destDataNode is where the new RegionReplica is created - TMaintainPeerReq maintainPeerReq = new TMaintainPeerReq(regionId, destDataNode); + TMaintainPeerReq maintainPeerReq = new TMaintainPeerReq(regionId, destDataNode, procedureId); status = SyncDataNodeClientPool.getInstance() .sendSyncRequestToDataNodeWithRetry( @@ -268,11 +283,13 @@ public TSStatus removeRegionPeer( TDataNodeLocation originalDataNode, TConsensusGroupId regionId, - TDataNodeLocation coordinator) { + TDataNodeLocation coordinator, + long procedureId) { TSStatus status; // Send removeRegionPeer request to the rpcClientDataNode - TMaintainPeerReq maintainPeerReq = new TMaintainPeerReq(regionId, originalDataNode); + TMaintainPeerReq maintainPeerReq = + new TMaintainPeerReq(regionId, originalDataNode, procedureId); status = SyncDataNodeClientPool.getInstance() .sendSyncRequestToDataNodeWithRetry( @@ -298,10 +315,11 @@ * @return TSStatus */ public TSStatus deleteOldRegionPeer( - TDataNodeLocation originalDataNode, TConsensusGroupId regionId) { + TDataNodeLocation originalDataNode, TConsensusGroupId regionId, long procedureId) { TSStatus status; - TMaintainPeerReq maintainPeerReq = new TMaintainPeerReq(regionId, originalDataNode); + TMaintainPeerReq maintainPeerReq = + new TMaintainPeerReq(regionId, originalDataNode, procedureId); status = configManager.getLoadManager().getNodeStatus(originalDataNode.getDataNodeId()) @@ -325,6 +343,21 @@ return status; } + public TRegionMaintainTaskStatus waitTaskFinish(long taskId, TDataNodeLocation dataNodeLocation) { + while (true) { + try 
(SyncDataNodeInternalServiceClient dataNodeClient = + dataNodeClientManager.borrowClient(dataNodeLocation.getInternalEndPoint())) { + TRegionMigrateResultReportReq report = dataNodeClient.getRegionMaintainResult(taskId); + if (report.getTaskStatus() != TRegionMaintainTaskStatus.PROCESSING) { + return report.getTaskStatus(); + } + Thread.sleep(1000); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + } + public void addRegionLocation(TConsensusGroupId regionId, TDataNodeLocation newLocation) { LOGGER.info( "AddRegionLocation started, add region {} to {}",
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveDataNodeProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveDataNodeProcedure.java index 103ab32..ee4baf4 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveDataNodeProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveDataNodeProcedure.java
@@ -25,7 +25,7 @@ import org.apache.iotdb.commons.exception.runtime.ThriftSerDeException; import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils; import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv; -import org.apache.iotdb.confignode.procedure.env.DataNodeRemoveHandler; +import org.apache.iotdb.confignode.procedure.env.RegionMaintainHandler; import org.apache.iotdb.confignode.procedure.exception.ProcedureException; import org.apache.iotdb.confignode.procedure.impl.statemachine.RegionMigrateProcedure; import org.apache.iotdb.confignode.procedure.state.RemoveDataNodeState; @@ -68,7 +68,7 @@ return Flow.NO_MORE_STATE; } - DataNodeRemoveHandler handler = env.getDataNodeRemoveHandler(); + RegionMaintainHandler handler = env.getDataNodeRemoveHandler(); try { switch (state) { case REGION_REPLICA_CHECK:
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/AddRegionPeerProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/AddRegionPeerProcedure.java index b5b9448..488b85f 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/AddRegionPeerProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/AddRegionPeerProcedure.java
@@ -21,18 +21,17 @@ import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; +import org.apache.iotdb.common.rpc.thrift.TRegionMaintainTaskStatus; import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.exception.runtime.ThriftSerDeException; import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils; import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv; -import org.apache.iotdb.confignode.procedure.env.DataNodeRemoveHandler; +import org.apache.iotdb.confignode.procedure.env.RegionMaintainHandler; import org.apache.iotdb.confignode.procedure.exception.ProcedureException; import org.apache.iotdb.confignode.procedure.exception.ProcedureSuspendedException; import org.apache.iotdb.confignode.procedure.exception.ProcedureYieldException; import org.apache.iotdb.confignode.procedure.state.AddRegionPeerState; import org.apache.iotdb.confignode.procedure.store.ProcedureType; -import org.apache.iotdb.confignode.rpc.thrift.TRegionMigrateResultReportReq; -import org.apache.iotdb.rpc.TSStatusCode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,8 +41,6 @@ import java.nio.ByteBuffer; import static org.apache.iotdb.commons.utils.FileUtils.logBreakpoint; -import static org.apache.iotdb.confignode.conf.ConfigNodeConstant.ADD_REGION_PEER_PROGRESS; -import static org.apache.iotdb.confignode.conf.ConfigNodeConstant.REGION_MIGRATE_PROCESS; import static org.apache.iotdb.confignode.procedure.state.AddRegionPeerState.UPDATE_REGION_LOCATION_CACHE; import static org.apache.iotdb.rpc.TSStatusCode.SUCCESS_STATUS; @@ -56,11 +53,6 @@ private TDataNodeLocation destDataNode; - private boolean addRegionPeerSuccess = true; - private String addRegionPeerResult; - - private final Object addRegionPeerLock = new Object(); - public AddRegionPeerProcedure() { super(); } @@ -81,7 +73,7 @@ if (consensusGroupId == null) { return Flow.NO_MORE_STATE; } - DataNodeRemoveHandler 
handler = env.getDataNodeRemoveHandler(); + RegionMaintainHandler handler = env.getDataNodeRemoveHandler(); try { switch (state) { case CREATE_NEW_REGION_PEER: @@ -90,17 +82,23 @@ setNextState(AddRegionPeerState.DO_ADD_REGION_PEER); break; case DO_ADD_REGION_PEER: - TSStatus tsStatus = handler.addRegionPeer(destDataNode, consensusGroupId, coordinator); + TSStatus tsStatus = + handler.addRegionPeer(this.getProcId(), destDataNode, consensusGroupId, coordinator); + TRegionMaintainTaskStatus result; + logBreakpoint(state.name()); if (tsStatus.getCode() == SUCCESS_STATUS.getStatusCode()) { - waitForOneMigrationStepFinished(consensusGroupId, state); + result = handler.waitTaskFinish(this.getProcId(), coordinator); } else { throw new ProcedureException("ADD_REGION_PEER executed failed in DataNode"); } - logBreakpoint(state.name()); - setNextState(UPDATE_REGION_LOCATION_CACHE); - break; + if (result == TRegionMaintainTaskStatus.SUCCESS) { + setNextState(UPDATE_REGION_LOCATION_CACHE); + break; + } + throw new ProcedureException("ADD_REGION_PEER executed failed in DataNode"); case UPDATE_REGION_LOCATION_CACHE: handler.addRegionLocation(consensusGroupId, destDataNode); + logBreakpoint(state.name()); return Flow.NO_MORE_STATE; default: throw new ProcedureException("Unsupported state: " + state.name()); @@ -112,61 +110,6 @@ } // TODO: Clear all remaining information related to 'migrate' and 'migration' - public TSStatus waitForOneMigrationStepFinished( - TConsensusGroupId consensusGroupId, AddRegionPeerState state) throws Exception { - LOGGER.info( - "{}, Wait for state {} finished, regionId: {}", - REGION_MIGRATE_PROCESS, - state, - consensusGroupId); - - TSStatus status = new TSStatus(SUCCESS_STATUS.getStatusCode()); - synchronized (addRegionPeerLock) { - try { - addRegionPeerLock.wait(); - - if (!addRegionPeerSuccess) { - throw new ProcedureException( - String.format("Region migration failed, regionId: %s", consensusGroupId)); - } - } catch (InterruptedException e) { - 
LOGGER.error( - "{}, region migration {} interrupt", REGION_MIGRATE_PROCESS, consensusGroupId, e); - Thread.currentThread().interrupt(); - status.setCode(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode()); - status.setMessage("Waiting for region migration interruption," + e.getMessage()); - } - } - return status; - } - - public void notifyAddPeerFinished(TRegionMigrateResultReportReq req) { - - LOGGER.info( - "{}, ConfigNode received region migration result reported by DataNode: {}", - ADD_REGION_PEER_PROGRESS, - req); - - // TODO the req is used in roll back - synchronized (addRegionPeerLock) { - TSStatus migrateStatus = req.getMigrateResult(); - // Migration failed - if (migrateStatus.getCode() != SUCCESS_STATUS.getStatusCode()) { - LOGGER.info( - "{}, Region migration failed in DataNode, migrateStatus: {}", - ADD_REGION_PEER_PROGRESS, - migrateStatus); - addRegionPeerSuccess = false; - addRegionPeerResult = migrateStatus.toString(); - } - addRegionPeerLock.notifyAll(); - } - } - - @Override - protected boolean isRollbackSupported(AddRegionPeerState state) { - return false; - } @Override protected void rollbackState( @@ -216,8 +159,4 @@ public TDataNodeLocation getCoordinator() { return coordinator; } - - public TDataNodeLocation getDestDataNode() { - return destDataNode; - } }
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/RegionMigrateProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/RegionMigrateProcedure.java index 35a5100..8b9fc5d 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/RegionMigrateProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/RegionMigrateProcedure.java
@@ -24,7 +24,7 @@ import org.apache.iotdb.commons.exception.runtime.ThriftSerDeException; import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils; import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv; -import org.apache.iotdb.confignode.procedure.env.DataNodeRemoveHandler; +import org.apache.iotdb.confignode.procedure.env.RegionMaintainHandler; import org.apache.iotdb.confignode.procedure.exception.ProcedureException; import org.apache.iotdb.confignode.procedure.state.ProcedureLockState; import org.apache.iotdb.confignode.procedure.state.RegionTransitionState; @@ -40,7 +40,7 @@ import static org.apache.iotdb.commons.utils.FileUtils.logBreakpoint; import static org.apache.iotdb.confignode.conf.ConfigNodeConstant.REGION_MIGRATE_PROCESS; -import static org.apache.iotdb.confignode.procedure.env.DataNodeRemoveHandler.getIdWithRpcEndpoint; +import static org.apache.iotdb.confignode.procedure.env.RegionMaintainHandler.getIdWithRpcEndpoint; /** Region migrate procedure */ public class RegionMigrateProcedure @@ -52,8 +52,6 @@ private static final int RETRY_THRESHOLD = 5; /** Wait region migrate finished */ - private final Object regionMigrateLock = new Object(); - private TConsensusGroupId consensusGroupId; private TDataNodeLocation originalDataNode; @@ -63,8 +61,6 @@ private TDataNodeLocation coordinatorForAddPeer; private TDataNodeLocation coordinatorForRemovePeer; - private boolean migrateSuccess = true; - private String migrateResult = ""; public RegionMigrateProcedure() { @@ -90,7 +86,7 @@ if (consensusGroupId == null) { return Flow.NO_MORE_STATE; } - DataNodeRemoveHandler handler = env.getDataNodeRemoveHandler(); + RegionMaintainHandler handler = env.getDataNodeRemoveHandler(); try { switch (state) { case REGION_MIGRATE_PREPARE:
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/RemoveRegionPeerProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/RemoveRegionPeerProcedure.java index f656653..db58f45 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/RemoveRegionPeerProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/RemoveRegionPeerProcedure.java
@@ -21,18 +21,17 @@ import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; +import org.apache.iotdb.common.rpc.thrift.TRegionMaintainTaskStatus; import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.exception.runtime.ThriftSerDeException; import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils; import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv; -import org.apache.iotdb.confignode.procedure.env.DataNodeRemoveHandler; +import org.apache.iotdb.confignode.procedure.env.RegionMaintainHandler; import org.apache.iotdb.confignode.procedure.exception.ProcedureException; import org.apache.iotdb.confignode.procedure.exception.ProcedureSuspendedException; import org.apache.iotdb.confignode.procedure.exception.ProcedureYieldException; import org.apache.iotdb.confignode.procedure.state.RemoveRegionPeerState; import org.apache.iotdb.confignode.procedure.store.ProcedureType; -import org.apache.iotdb.confignode.rpc.thrift.TRegionMigrateResultReportReq; -import org.apache.iotdb.rpc.TSStatusCode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,7 +41,6 @@ import java.nio.ByteBuffer; import static org.apache.iotdb.commons.utils.FileUtils.logBreakpoint; -import static org.apache.iotdb.confignode.conf.ConfigNodeConstant.REGION_MIGRATE_PROCESS; import static org.apache.iotdb.confignode.procedure.state.RemoveRegionPeerState.DELETE_OLD_REGION_PEER; import static org.apache.iotdb.confignode.procedure.state.RemoveRegionPeerState.REMOVE_REGION_LOCATION_CACHE; import static org.apache.iotdb.rpc.TSStatusCode.SUCCESS_STATUS; @@ -54,10 +52,6 @@ private TDataNodeLocation coordinator; private TDataNodeLocation targetDataNode; - private boolean removeRegionPeerSuccess = true; - private String removeRegionPeerResult; - private final Object removeRegionPeerLock = new Object(); - public RemoveRegionPeerProcedure() { super(); } @@ -79,27 +73,37 @@ return 
Flow.NO_MORE_STATE; } TSStatus tsStatus; - DataNodeRemoveHandler handler = env.getDataNodeRemoveHandler(); + RegionMaintainHandler handler = env.getDataNodeRemoveHandler(); try { switch (state) { case REMOVE_REGION_PEER: - tsStatus = handler.removeRegionPeer(targetDataNode, consensusGroupId, coordinator); + tsStatus = + handler.removeRegionPeer( + targetDataNode, consensusGroupId, coordinator, this.getProcId()); + TRegionMaintainTaskStatus result; + logBreakpoint(state.name()); if (tsStatus.getCode() == SUCCESS_STATUS.getStatusCode()) { - waitForOneMigrationStepFinished(consensusGroupId, state); + result = handler.waitTaskFinish(this.getProcId(), coordinator); } else { throw new ProcedureException("REMOVE_REGION_PEER executed failed in DataNode"); } - logBreakpoint(state.name()); + if (result != TRegionMaintainTaskStatus.SUCCESS) { + throw new ProcedureException("REMOVE_REGION_PEER executed failed in DataNode"); + } setNextState(DELETE_OLD_REGION_PEER); break; case DELETE_OLD_REGION_PEER: - tsStatus = handler.deleteOldRegionPeer(targetDataNode, consensusGroupId); - if (tsStatus.getCode() == SUCCESS_STATUS.getStatusCode()) { - waitForOneMigrationStepFinished(consensusGroupId, state); - } + tsStatus = + handler.deleteOldRegionPeer(targetDataNode, consensusGroupId, this.getProcId()); logBreakpoint(state.name()); - // Remove consensus group after a node stop, which will be failed, but we will - // continuously execute. 
+ if (tsStatus.getCode() == SUCCESS_STATUS.getStatusCode()) { + result = handler.waitTaskFinish(this.getProcId(), targetDataNode); + } else { + throw new ProcedureException("DELETE_OLD_REGION_PEER executed failed in DataNode"); + } + if (result != TRegionMaintainTaskStatus.SUCCESS) { + throw new ProcedureException("DELETE_OLD_REGION_PEER executed failed in DataNode"); + } setNextState(REMOVE_REGION_LOCATION_CACHE); break; case REMOVE_REGION_LOCATION_CACHE: @@ -115,68 +119,11 @@ return Flow.HAS_MORE_STATE; } - public TSStatus waitForOneMigrationStepFinished( - TConsensusGroupId consensusGroupId, RemoveRegionPeerState state) throws Exception { - - LOGGER.info( - "{}, Wait for state {} finished, regionId: {}", - REGION_MIGRATE_PROCESS, - state, - consensusGroupId); - - TSStatus status = new TSStatus(SUCCESS_STATUS.getStatusCode()); - synchronized (removeRegionPeerLock) { - try { - // TODO set timeOut? - removeRegionPeerLock.wait(); - - if (!removeRegionPeerSuccess) { - throw new ProcedureException( - String.format("Region migration failed, regionId: %s", consensusGroupId)); - } - } catch (InterruptedException e) { - LOGGER.error( - "{}, region migration {} interrupt", REGION_MIGRATE_PROCESS, consensusGroupId, e); - Thread.currentThread().interrupt(); - status.setCode(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode()); - status.setMessage("Waiting for region migration interruption," + e.getMessage()); - } - } - return status; - } - - public void notifyRemovePeerFinished(TRegionMigrateResultReportReq req) { - LOGGER.info( - "{}, ConfigNode received region migration result reported by DataNode: {}", - REGION_MIGRATE_PROCESS, - req); - - // TODO the req is used in roll back - synchronized (removeRegionPeerLock) { - TSStatus migrateStatus = req.getMigrateResult(); - // Migration failed - if (migrateStatus.getCode() != SUCCESS_STATUS.getStatusCode()) { - LOGGER.info( - "{}, Region migration failed in DataNode, migrateStatus: {}", - REGION_MIGRATE_PROCESS, - migrateStatus); 
- removeRegionPeerSuccess = false; - removeRegionPeerResult = migrateStatus.toString(); - } - removeRegionPeerLock.notifyAll(); - } - } - @Override protected void rollbackState(ConfigNodeProcedureEnv env, RemoveRegionPeerState state) throws IOException, InterruptedException, ProcedureException {} @Override - protected boolean isRollbackSupported(RemoveRegionPeerState state) { - return false; - } - - @Override protected RemoveRegionPeerState getState(int stateId) { return RemoveRegionPeerState.values()[stateId]; }
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java index 95c82a1..5b77ad2 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
@@ -129,7 +129,6 @@ import org.apache.iotdb.confignode.rpc.thrift.TLoginReq; import org.apache.iotdb.confignode.rpc.thrift.TMigrateRegionReq; import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp; -import org.apache.iotdb.confignode.rpc.thrift.TRegionMigrateResultReportReq; import org.apache.iotdb.confignode.rpc.thrift.TRegionRouteMapResp; import org.apache.iotdb.confignode.rpc.thrift.TSchemaNodeManagementReq; import org.apache.iotdb.confignode.rpc.thrift.TSchemaNodeManagementResp; @@ -271,11 +270,6 @@ } @Override - public TSStatus reportRegionMigrateResult(TRegionMigrateResultReportReq req) { - return configManager.reportRegionMigrateResult(req); - } - - @Override public TShowClusterResp showCluster() { return configManager.showCluster(); }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java index 1387e5f..089047a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
@@ -98,7 +98,6 @@ import org.apache.iotdb.confignode.rpc.thrift.TLoginReq; import org.apache.iotdb.confignode.rpc.thrift.TMigrateRegionReq; import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp; -import org.apache.iotdb.confignode.rpc.thrift.TRegionMigrateResultReportReq; import org.apache.iotdb.confignode.rpc.thrift.TRegionRouteMapResp; import org.apache.iotdb.confignode.rpc.thrift.TSchemaNodeManagementReq; import org.apache.iotdb.confignode.rpc.thrift.TSchemaNodeManagementResp; @@ -418,12 +417,6 @@ } @Override - public TSStatus reportRegionMigrateResult(TRegionMigrateResultReportReq req) throws TException { - return executeRemoteCallWithRetry( - () -> client.reportRegionMigrateResult(req), status -> !updateConfigNodeLeader(status)); - } - - @Override public TShowClusterResp showCluster() throws TException { return executeRemoteCallWithRetry( () -> client.showCluster(), resp -> !updateConfigNodeLeader(resp.status));
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java index 5a62e1d..c4588d5 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.common.rpc.thrift.TFlushReq; +import org.apache.iotdb.common.rpc.thrift.TRegionMigrateResultReportReq; import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.common.rpc.thrift.TSetSpaceQuotaReq; import org.apache.iotdb.common.rpc.thrift.TSetTTLReq; @@ -1618,6 +1619,11 @@ return status; } + @Override + public TRegionMigrateResultReportReq getRegionMaintainResult(long taskId) throws TException { + return RegionMigrateService.getRegionMaintainResult(taskId); + } + private TSStatus createNewRegion(ConsensusGroupId regionId, String storageGroup, long ttl) { return regionManager.createNewRegion(regionId, storageGroup, ttl); }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java index 092394c..a39dd72 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java
@@ -21,9 +21,10 @@ import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; import org.apache.iotdb.common.rpc.thrift.TEndPoint; +import org.apache.iotdb.common.rpc.thrift.TRegionMaintainTaskStatus; import org.apache.iotdb.common.rpc.thrift.TRegionMigrateFailedType; +import org.apache.iotdb.common.rpc.thrift.TRegionMigrateResultReportReq; import org.apache.iotdb.common.rpc.thrift.TSStatus; -import org.apache.iotdb.commons.client.exception.ClientManagerException; import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory; import org.apache.iotdb.commons.concurrent.ThreadName; import org.apache.iotdb.commons.consensus.ConsensusGroupId; @@ -32,28 +33,24 @@ import org.apache.iotdb.commons.exception.StartupException; import org.apache.iotdb.commons.service.IService; import org.apache.iotdb.commons.service.ServiceType; -import org.apache.iotdb.confignode.rpc.thrift.TRegionMigrateResultReportReq; import org.apache.iotdb.consensus.common.Peer; import org.apache.iotdb.consensus.exception.ConsensusException; import org.apache.iotdb.consensus.exception.PeerAlreadyInConsensusGroupException; import org.apache.iotdb.consensus.exception.PeerNotInConsensusGroupException; import org.apache.iotdb.db.consensus.DataRegionConsensusImpl; import org.apache.iotdb.db.consensus.SchemaRegionConsensusImpl; -import org.apache.iotdb.db.protocol.client.ConfigNodeClient; -import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager; -import org.apache.iotdb.db.protocol.client.ConfigNodeInfo; import org.apache.iotdb.db.schemaengine.SchemaEngine; import org.apache.iotdb.db.storageengine.StorageEngine; import org.apache.iotdb.db.storageengine.rescon.memory.AbstractPoolManager; import org.apache.iotdb.mpp.rpc.thrift.TMaintainPeerReq; import org.apache.iotdb.rpc.TSStatusCode; -import org.apache.thrift.TException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.HashMap; import java.util.Map; +import 
java.util.concurrent.ConcurrentHashMap; public class RegionMigrateService implements IService { @@ -67,6 +64,13 @@ private RegionMigratePool regionMigratePool; + // Map<taskId, taskStatus> + // TODO:暂时无法处理一个procedure中向同一个datanode提交多个异步任务的情况 + private static final ConcurrentHashMap<Long, TRegionMigrateResultReportReq> taskResultMap = + new ConcurrentHashMap<>(); + private static final TRegionMigrateResultReportReq unfinishedResult = + new TRegionMigrateResultReportReq(); + private RegionMigrateService() {} public static RegionMigrateService getInstance() { @@ -80,10 +84,13 @@ * @return if the submit task succeed */ public synchronized boolean submitAddRegionPeerTask(TMaintainPeerReq req) { - boolean submitSucceed = true; try { - regionMigratePool.submit(new AddRegionPeerTask(req.getRegionId(), req.getDestNode())); + if (!addToTaskResultMap(req.getTaskId())) { + LOGGER.warn("{} The AddRegionPeerTask {} has already been submitted and will not be submitted again.", REGION_MIGRATE_PROCESS, req.getTaskId()); + } + regionMigratePool.submit( + new AddRegionPeerTask(req.getTaskId(), req.getRegionId(), req.getDestNode())); } catch (Exception e) { LOGGER.error( "{}, Submit AddRegionPeerTask error for Region: {}", @@ -105,7 +112,11 @@ boolean submitSucceed = true; try { - regionMigratePool.submit(new RemoveRegionPeerTask(req.getRegionId(), req.getDestNode())); + if (!addToTaskResultMap(req.getTaskId())) { + LOGGER.warn("{} The RemoveRegionPeer {} has already been submitted and will not be submitted again.", REGION_MIGRATE_PROCESS, req.getTaskId()); + } + regionMigratePool.submit( + new RemoveRegionPeerTask(req.getTaskId(), req.getRegionId(), req.getDestNode())); } catch (Exception e) { LOGGER.error( "{}, Submit RemoveRegionPeer task error for Region: {}", @@ -124,10 +135,13 @@ * @return if the submit task succeed */ public synchronized boolean submitDeleteOldRegionPeerTask(TMaintainPeerReq req) { - boolean submitSucceed = true; try { - regionMigratePool.submit(new 
DeleteOldRegionPeerTask(req.getRegionId(), req.getDestNode())); + if (!addToTaskResultMap(req.getTaskId())) { + LOGGER.warn("{} The DeleteOldRegionPeerTask {} has already been submitted and will not be submitted again.", REGION_MIGRATE_PROCESS, req.getTaskId()); + } + regionMigratePool.submit( + new DeleteOldRegionPeerTask(req.getTaskId(), req.getRegionId(), req.getDestNode())); } catch (Exception e) { LOGGER.error( "{}, Submit DeleteOldRegionPeerTask error for Region: {}", @@ -139,6 +153,14 @@ return submitSucceed; } + private boolean addToTaskResultMap(long taskId) { + if (taskResultMap.containsKey(taskId)) { + return false; + } + taskResultMap.put(taskId, unfinishedResult); + return true; + } + @Override public void start() throws StartupException { regionMigratePool = new RegionMigratePool(); @@ -190,13 +212,17 @@ private static final Logger taskLogger = LoggerFactory.getLogger(AddRegionPeerTask.class); + private final long taskId; + // The RegionGroup that shall perform the add peer process private final TConsensusGroupId tRegionId; // The new DataNode to be added in the RegionGroup private final TDataNodeLocation destDataNode; - public AddRegionPeerTask(TConsensusGroupId tRegionId, TDataNodeLocation destDataNode) { + public AddRegionPeerTask( + long taskId, TConsensusGroupId tRegionId, TDataNodeLocation destDataNode) { + this.taskId = taskId; this.tRegionId = tRegionId; this.destDataNode = destDataNode; } @@ -205,11 +231,12 @@ public void run() { TSStatus runResult = addPeer(); if (isFailed(runResult)) { - reportFailed(tRegionId, destDataNode, TRegionMigrateFailedType.AddPeerFailed, runResult); + taskFail( + taskId, tRegionId, destDataNode, TRegionMigrateFailedType.AddPeerFailed, runResult); return; } - reportSucceed(tRegionId, "AddPeer"); + taskSucceed(taskId, tRegionId, "AddPeer"); } private TSStatus addPeer() { @@ -282,11 +309,15 @@ private static final Logger taskLogger = LoggerFactory.getLogger(RemoveRegionPeerTask.class); + private final long taskId; + 
private final TConsensusGroupId tRegionId; private final TDataNodeLocation destDataNode; - public RemoveRegionPeerTask(TConsensusGroupId tRegionId, TDataNodeLocation destDataNode) { + public RemoveRegionPeerTask( + long taskId, TConsensusGroupId tRegionId, TDataNodeLocation destDataNode) { + this.taskId = taskId; this.tRegionId = tRegionId; this.destDataNode = destDataNode; } @@ -295,9 +326,10 @@ public void run() { TSStatus runResult = removePeer(); if (isSucceed(runResult)) { - reportSucceed(tRegionId, "RemovePeer"); + taskSucceed(taskId, tRegionId, "RemovePeer"); } else { - reportFailed(tRegionId, destDataNode, TRegionMigrateFailedType.RemovePeerFailed, runResult); + taskFail( + taskId, tRegionId, destDataNode, TRegionMigrateFailedType.RemovePeerFailed, runResult); } } @@ -375,13 +407,15 @@ private static class DeleteOldRegionPeerTask implements Runnable { private static final Logger taskLogger = LoggerFactory.getLogger(DeleteOldRegionPeerTask.class); + private final long taskId; private final TConsensusGroupId tRegionId; private final TDataNodeLocation originalDataNode; public DeleteOldRegionPeerTask( - TConsensusGroupId tRegionId, TDataNodeLocation originalDataNode) { + long taskId, TConsensusGroupId tRegionId, TDataNodeLocation originalDataNode) { + this.taskId = taskId; this.tRegionId = tRegionId; this.originalDataNode = originalDataNode; } @@ -391,7 +425,8 @@ // deletePeer: remove the peer from the consensus group TSStatus runResult = deletePeer(); if (isFailed(runResult)) { - reportFailed( + taskFail( + taskId, tRegionId, originalDataNode, TRegionMigrateFailedType.RemoveConsensusGroupFailed, @@ -400,12 +435,17 @@ // deleteRegion: delete region data runResult = deleteRegion(); + if (isFailed(runResult)) { - reportFailed( - tRegionId, originalDataNode, TRegionMigrateFailedType.DeleteRegionFailed, runResult); + taskFail( + taskId, + tRegionId, + originalDataNode, + TRegionMigrateFailedType.DeleteRegionFailed, + runResult); } - reportSucceed(tRegionId, 
"DeletePeer"); + taskSucceed(taskId, tRegionId, "DeletePeer"); } private TSStatus deletePeer() { @@ -476,57 +516,42 @@ private Holder() {} } - private static void reportSucceed(TConsensusGroupId tRegionId, String migrateState) { + private static void taskSucceed(long taskId, TConsensusGroupId tRegionId, String migrateState) { TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); status.setMessage( String.format("Region: %s, state: %s, executed succeed", tRegionId, migrateState)); - TRegionMigrateResultReportReq req = new TRegionMigrateResultReportReq(tRegionId, status); - try { - reportRegionMigrateResultToConfigNode(req); - } catch (Exception e) { - LOGGER.error( - "{}, Report region {} migrate result error in reportSucceed, result: {}", - REGION_MIGRATE_PROCESS, - tRegionId, - req, - e); - } + TRegionMigrateResultReportReq req = + new TRegionMigrateResultReportReq(TRegionMaintainTaskStatus.SUCCESS); + req.setRegionId(tRegionId).setMigrateResult(status); + taskResultMap.put(taskId, req); } - private static void reportFailed( + private static void taskFail( + long taskId, TConsensusGroupId tRegionId, TDataNodeLocation failedNode, TRegionMigrateFailedType failedType, TSStatus status) { Map<TDataNodeLocation, TRegionMigrateFailedType> failedNodeAndReason = new HashMap<>(); failedNodeAndReason.put(failedNode, failedType); - TRegionMigrateResultReportReq req = new TRegionMigrateResultReportReq(tRegionId, status); + TRegionMigrateResultReportReq req = + new TRegionMigrateResultReportReq(TRegionMaintainTaskStatus.FAIL); + req.setRegionId(tRegionId).setMigrateResult(status); req.setFailedNodeAndReason(failedNodeAndReason); - try { - reportRegionMigrateResultToConfigNode(req); - } catch (Exception e) { - LOGGER.error( - "{}, Report region {} migrate error in reportFailed, result:{}", - REGION_MIGRATE_PROCESS, - tRegionId, - req, - e); - } + taskResultMap.put(taskId, req); } - private static void 
reportRegionMigrateResultToConfigNode(TRegionMigrateResultReportReq req) - throws TException, ClientManagerException { - TSStatus status; - try (ConfigNodeClient client = - ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { - status = client.reportRegionMigrateResult(req); - LOGGER.info( - "{}, Report region {} migrate result {} to Config node succeed, result: {}", - REGION_MIGRATE_PROCESS, - req.getRegionId(), - req, - status); + // TODO: 单例模式下static与非static的区别? + public static TRegionMigrateResultReportReq getRegionMaintainResult(Long taskId) { + TRegionMigrateResultReportReq result = new TRegionMigrateResultReportReq(); + if (!taskResultMap.containsKey(taskId)) { + result.setTaskStatus(TRegionMaintainTaskStatus.TASK_NOT_EXIST); + } else if (taskResultMap.get(taskId) == unfinishedResult) { + result.setTaskStatus(TRegionMaintainTaskStatus.PROCESSING); + } else { + result = taskResultMap.get(taskId); } + return result; } private static boolean isSucceed(TSStatus status) {
diff --git a/iotdb-protocol/thrift-commons/src/main/thrift/common.thrift b/iotdb-protocol/thrift-commons/src/main/thrift/common.thrift index ba5a2da..636b058 100644 --- a/iotdb-protocol/thrift-commons/src/main/thrift/common.thrift +++ b/iotdb-protocol/thrift-commons/src/main/thrift/common.thrift
@@ -90,6 +90,7 @@ 2: required TNodeResource resource } +// TODO: deprecated enum TRegionMigrateFailedType { AddPeerFailed, RemovePeerFailed, @@ -98,6 +99,20 @@ CreateRegionFailed } +struct TRegionMigrateResultReportReq { + 1: optional TConsensusGroupId regionId + 2: optional TSStatus migrateResult + 3: optional map<TDataNodeLocation, TRegionMigrateFailedType> failedNodeAndReason + 4: required TRegionMaintainTaskStatus taskStatus +} + +enum TRegionMaintainTaskStatus { + TASK_NOT_EXIST, + PROCESSING, + SUCCESS, + FAIL, +} + struct TFlushReq { 1: optional string isSeq 2: optional list<string> storageGroups
diff --git a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift index 989255c..886e2bb 100644 --- a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift +++ b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
@@ -140,12 +140,6 @@ 2: optional map<common.TDataNodeLocation, common.TSStatus> nodeToStatus } -struct TRegionMigrateResultReportReq { - 1: required common.TConsensusGroupId regionId - 2: required common.TSStatus migrateResult - 3: optional map<common.TDataNodeLocation, common.TRegionMigrateFailedType> failedNodeAndReason -} - struct TDataNodeConfigurationResp { 1: required common.TSStatus status // map<DataNodeId, DataNodeConfiguration> @@ -880,9 +874,6 @@ */ TDataNodeConfigurationResp getDataNodeConfiguration(i32 dataNodeId) - /** Report region migration complete */ - common.TSStatus reportRegionMigrateResult(TRegionMigrateResultReportReq req) - // ====================================================== // Database // ======================================================
diff --git a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift index 8afc6bb..7042f67 100644 --- a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift +++ b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
@@ -51,6 +51,7 @@ struct TMaintainPeerReq { 1: required common.TConsensusGroupId regionId 2: required common.TDataNodeLocation destNode + 3: required i64 taskId } struct TFragmentInstanceId { @@ -558,6 +559,11 @@ common.TSStatus deleteOldRegionPeer(TMaintainPeerReq req); /** + * Get the result of a region maintenance task + */ + common.TRegionMigrateResultReportReq getRegionMaintainResult(i64 taskId) + + /** * Config node will disable the Data node, the Data node will not accept read/write request when disabled * @param data node location */