Optimize region migration log && change region status to removing before transferring leader (#12368)
* improve log
* move leader change && improve log
* use another func
diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/IoTDBRegionMigrateClusterCrashIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/IoTDBRegionMigrateClusterCrashIT.java
index db8ad76..be18a7f 100644
--- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/IoTDBRegionMigrateClusterCrashIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/IoTDBRegionMigrateClusterCrashIT.java
@@ -21,7 +21,6 @@
import org.apache.iotdb.confignode.it.regionmigration.IoTDBRegionMigrateReliabilityITFramework;
import org.apache.iotdb.confignode.procedure.state.AddRegionPeerState;
-import org.apache.iotdb.confignode.procedure.state.RegionTransitionState;
import org.apache.iotdb.confignode.procedure.state.RemoveRegionPeerState;
import org.apache.iotdb.it.framework.IoTDBTestRunner;
@@ -48,7 +47,7 @@
@Test
public void clusterCrash4() throws Exception {
- killClusterTest(buildSet(RegionTransitionState.CHANGE_REGION_LEADER), true);
+ killClusterTest(buildSet(RemoveRegionPeerState.TRANSFER_REGION_LEADER), true);
}
@Test
diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/IoTDBRegionMigrateConfigNodeCrashIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/IoTDBRegionMigrateConfigNodeCrashIT.java
index d9bb2ac..9baeab5 100644
--- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/IoTDBRegionMigrateConfigNodeCrashIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/IoTDBRegionMigrateConfigNodeCrashIT.java
@@ -93,7 +93,7 @@
1,
1,
2,
- buildSet(RegionTransitionState.CHANGE_REGION_LEADER),
+ buildSet(RemoveRegionPeerState.TRANSFER_REGION_LEADER),
noKillPoints(),
KillNode.CONFIG_NODE);
}
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RegionMaintainHandler.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RegionMaintainHandler.java
index cfad4ae..5f44c1f 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RegionMaintainHandler.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RegionMaintainHandler.java
@@ -660,20 +660,13 @@
*
* @param regionId The region to be migrated
* @param originalDataNode The DataNode where the region locates
- * @param migrateDestDataNode The DataNode where the region is to be migrated
*/
- public void changeRegionLeader(
- TConsensusGroupId regionId,
- TDataNodeLocation originalDataNode,
- TDataNodeLocation migrateDestDataNode) {
+ public void changeRegionLeader(TConsensusGroupId regionId, TDataNodeLocation originalDataNode) {
Optional<TDataNodeLocation> newLeaderNode =
filterDataNodeWithOtherRegionReplica(regionId, originalDataNode);
if (TConsensusGroupType.DataRegion.equals(regionId.getType())
&& IOT_CONSENSUS.equals(CONF.getDataRegionConsensusProtocolClass())) {
- if (CONF.getDataReplicationFactor() == 1) {
- newLeaderNode = Optional.of(migrateDestDataNode);
- }
if (newLeaderNode.isPresent()) {
configManager
.getLoadManager()
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/AddRegionPeerProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/AddRegionPeerProcedure.java
index b380c7c..24d629c 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/AddRegionPeerProcedure.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/AddRegionPeerProcedure.java
@@ -24,6 +24,7 @@
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.cluster.RegionStatus;
import org.apache.iotdb.commons.exception.runtime.ThriftSerDeException;
+import org.apache.iotdb.commons.utils.CommonDateTimeUtils;
import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils;
import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
import org.apache.iotdb.confignode.procedure.env.RegionMaintainHandler;
@@ -33,6 +34,7 @@
import org.apache.iotdb.confignode.procedure.impl.StateMachineProcedure;
import org.apache.iotdb.confignode.procedure.state.AddRegionPeerState;
import org.apache.iotdb.confignode.procedure.store.ProcedureType;
+import org.apache.iotdb.db.utils.DateTimeUtils;
import org.apache.iotdb.mpp.rpc.thrift.TRegionMigrateResult;
import org.slf4j.Logger;
@@ -85,6 +87,11 @@
outerSwitch:
switch (state) {
case CREATE_NEW_REGION_PEER:
+ LOGGER.info(
+ "[pid{}][AddRegion] started, region {} will be added to DataNode {}.",
+ getProcId(),
+ consensusGroupId.getId(),
+ destDataNode.getDataNodeId());
handler.addRegionLocation(consensusGroupId, destDataNode, RegionStatus.Adding);
TSStatus status = handler.createNewRegionPeer(consensusGroupId, destDataNode);
setKillPoint(state);
@@ -102,7 +109,10 @@
this.getProcId(), destDataNode, consensusGroupId, coordinator);
setKillPoint(state);
if (tsStatus.getCode() != SUCCESS_STATUS.getStatusCode()) {
- throw new ProcedureException("ADD_REGION_PEER executed failed in DataNode");
+ throw new ProcedureException(
+ String.format(
+ "[pid%d][AddRegion] failed to submit task to DataNode, procedure failed",
+ getProcId()));
}
}
TRegionMigrateResult result = handler.waitTaskFinish(this.getProcId(), coordinator);
@@ -112,7 +122,8 @@
case FAIL:
// maybe some DataNode crash
LOGGER.warn(
- "{} result is {}, procedure failed. Will try to reset peer list automatically...",
+ "[pid{}][AddRegion] {} result is {}, procedure failed. Will try to reset peer list automatically...",
+ getProcId(),
state,
result.getTaskStatus());
rollback(env, handler);
@@ -132,31 +143,39 @@
case UPDATE_REGION_LOCATION_CACHE:
handler.updateRegionCache(consensusGroupId, destDataNode, RegionStatus.Running);
setKillPoint(state);
- LOGGER.info("AddRegionPeer state {} complete", state);
+ LOGGER.info("[pid{}][AddRegion] state {} complete", getProcId(), state);
LOGGER.info(
- "AddRegionPeerProcedure success, region {} has been added to DataNode {}",
+ "[pid{}][AddRegion] success, region {} has been added to DataNode {}. Procedure took {} (start at {}).",
+ getProcId(),
consensusGroupId.getId(),
- destDataNode.getDataNodeId());
+ destDataNode.getDataNodeId(),
+ CommonDateTimeUtils.convertMillisecondToDurationStr(
+ System.currentTimeMillis() - getSubmittedTime()),
+ DateTimeUtils.convertLongToDate(getSubmittedTime(), "ms"));
return Flow.NO_MORE_STATE;
default:
throw new ProcedureException("Unsupported state: " + state.name());
}
} catch (Exception e) {
- LOGGER.error("AddRegionPeer state {} failed", state, e);
+ LOGGER.error("[pid{}][AddRegion] state {} failed", getProcId(), state, e);
return Flow.NO_MORE_STATE;
}
- LOGGER.info("AddRegionPeer state {} complete", state);
+ LOGGER.info("[pid{}][AddRegion] state {} complete", getProcId(), state);
return Flow.HAS_MORE_STATE;
}
- private void rollback(ConfigNodeProcedureEnv env, RegionMaintainHandler handler) {
+ private void rollback(ConfigNodeProcedureEnv env, RegionMaintainHandler handler)
+ throws ProcedureException {
handler.removeRegionLocation(consensusGroupId, destDataNode);
List<TDataNodeLocation> correctDataNodeLocations =
env.getConfigManager().getPartitionManager().getAllReplicaSets().stream()
.filter(tRegionReplicaSet -> tRegionReplicaSet.getRegionId().equals(consensusGroupId))
.findAny()
- .get()
+ .orElseThrow(
+ () ->
+ new ProcedureException(
+ "Cannot roll back, because cannot find the correct locations"))
.getDataNodeLocations();
String correctStr =
@@ -170,7 +189,8 @@
relatedDataNodeLocations.forEach(
location -> relatedDataNodeLocationMap.put(location.dataNodeId, location));
LOGGER.info(
- "Will reset peer list of consensus group {} on DataNode {}",
+ "[pid{}][AddRegion] Will reset peer list of consensus group {} on DataNode {}",
+ getProcId(),
consensusGroupId,
relatedDataNodeLocations.stream()
.map(TDataNodeLocation::getDataNodeId)
@@ -184,14 +204,16 @@
(dataNodeId, resetResult) -> {
if (resetResult.getCode() == SUCCESS_STATUS.getStatusCode()) {
LOGGER.info(
- "reset peer list: peer list of consensus group {} on DataNode {} has been successfully to {}",
+ "[pid{}][AddRegion] reset peer list: peer list of consensus group {} on DataNode {} has been successfully to {}",
+ getProcId(),
consensusGroupId,
dataNodeId,
correctStr);
} else {
// TODO: more precise
LOGGER.warn(
- "reset peer list: peer list of consensus group {} on DataNode {} failed to reset to {}, you may manually reset it",
+ "[pid{}][AddRegion] reset peer list: peer list of consensus group {} on DataNode {} failed to reset to {}, you may manually reset it",
+ getProcId(),
consensusGroupId,
dataNodeId,
correctStr);
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/RegionMigrateProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/RegionMigrateProcedure.java
index 7823a6d..6204559 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/RegionMigrateProcedure.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/RegionMigrateProcedure.java
@@ -22,7 +22,7 @@
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.commons.exception.runtime.ThriftSerDeException;
-import org.apache.iotdb.commons.utils.KillPoint.KillPoint;
+import org.apache.iotdb.commons.utils.CommonDateTimeUtils;
import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils;
import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
import org.apache.iotdb.confignode.procedure.env.RegionMaintainHandler;
@@ -31,6 +31,7 @@
import org.apache.iotdb.confignode.procedure.state.ProcedureLockState;
import org.apache.iotdb.confignode.procedure.state.RegionTransitionState;
import org.apache.iotdb.confignode.procedure.store.ProcedureType;
+import org.apache.iotdb.db.utils.DateTimeUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -45,7 +46,6 @@
extends StateMachineProcedure<ConfigNodeProcedureEnv, RegionTransitionState> {
private static final Logger LOGGER = LoggerFactory.getLogger(RegionMigrateProcedure.class);
- private static final int RETRY_THRESHOLD = 5;
/** Wait region migrate finished */
private TConsensusGroupId consensusGroupId;
@@ -55,8 +55,6 @@
private TDataNodeLocation coordinatorForAddPeer;
private TDataNodeLocation coordinatorForRemovePeer;
- private String migrateResult = "";
-
public RegionMigrateProcedure() {
super();
}
@@ -84,6 +82,12 @@
try {
switch (state) {
case REGION_MIGRATE_PREPARE:
+ LOGGER.info(
+ "[pid{}][MigrateRegion] started, region {} will be migrated from DataNode {} to {}.",
+ getProcId(),
+ consensusGroupId.getId(),
+ originalDataNode.getDataNodeId(),
+ destDataNode.getDataNodeId());
setNextState(RegionTransitionState.ADD_REGION_PEER);
break;
case ADD_REGION_PEER:
@@ -96,15 +100,10 @@
.getPartitionManager()
.isDataNodeContainsRegion(destDataNode.getDataNodeId(), consensusGroupId)) {
LOGGER.warn(
- "sub-procedure AddRegionPeerProcedure fail, RegionMigrateProcedure will not continue");
+ "[pid{}][MigrateRegion] sub-procedure AddRegionPeerProcedure fail, RegionMigrateProcedure will not continue",
+ getProcId());
return Flow.NO_MORE_STATE;
}
- LOGGER.info("sub-procedure AddRegionPeerProcedure success");
- setNextState(RegionTransitionState.CHANGE_REGION_LEADER);
- break;
- case CHANGE_REGION_LEADER:
- handler.changeRegionLeader(consensusGroupId, originalDataNode, destDataNode);
- KillPoint.setKillPoint(state);
setNextState(RegionTransitionState.REMOVE_REGION_PEER);
break;
case REMOVE_REGION_PEER:
@@ -118,24 +117,29 @@
.getPartitionManager()
.isDataNodeContainsRegion(originalDataNode.getDataNodeId(), consensusGroupId)) {
LOGGER.warn(
- "RegionMigrateProcedure success, but you may need to manually clean the old region to make everything works fine");
+ "[pid{}][MigrateRegion] success, but you may need to manually clean the old region to make everything works fine",
+ getProcId());
} else {
LOGGER.info(
- "RegionMigrateProcedure success, region {} has been migrated from DataNode {} to {}",
+ "[pid{}][MigrateRegion] success, region {} has been migrated from DataNode {} to {}. Procedure took {} (started at {})",
+ getProcId(),
consensusGroupId.getId(),
originalDataNode.getDataNodeId(),
- destDataNode.getDataNodeId());
+ destDataNode.getDataNodeId(),
+ CommonDateTimeUtils.convertMillisecondToDurationStr(
+ System.currentTimeMillis() - getSubmittedTime()),
+ DateTimeUtils.convertLongToDate(getSubmittedTime(), "ms"));
}
return Flow.NO_MORE_STATE;
default:
throw new ProcedureException("Unsupported state: " + state.name());
}
} catch (Exception e) {
- LOGGER.error("RegionMigrateProcedure state {} fail", state, e);
+ LOGGER.error("[pid{}][MigrateRegion] state {} fail", getProcId(), state, e);
// meets exception in region migrate process terminate the process
return Flow.NO_MORE_STATE;
}
- LOGGER.info("RegionMigrateProcedure state {} complete", state);
+ LOGGER.info("[pid{}][MigrateRegion] state {} complete", getProcId(), state);
return Flow.HAS_MORE_STATE;
}
@@ -144,11 +148,6 @@
throws IOException, InterruptedException, ProcedureException {}
@Override
- protected boolean isRollbackSupported(RegionTransitionState state) {
- return false;
- }
-
- @Override
protected ProcedureLockState acquireLock(ConfigNodeProcedureEnv configNodeProcedureEnv) {
configNodeProcedureEnv.getSchedulerLock().lock();
try {
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/RemoveRegionPeerProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/RemoveRegionPeerProcedure.java
index ef3a589..e459161 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/RemoveRegionPeerProcedure.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/RemoveRegionPeerProcedure.java
@@ -25,6 +25,8 @@
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.cluster.RegionStatus;
import org.apache.iotdb.commons.exception.runtime.ThriftSerDeException;
+import org.apache.iotdb.commons.utils.CommonDateTimeUtils;
+import org.apache.iotdb.commons.utils.KillPoint.KillPoint;
import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils;
import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
import org.apache.iotdb.confignode.procedure.env.RegionMaintainHandler;
@@ -34,6 +36,7 @@
import org.apache.iotdb.confignode.procedure.impl.StateMachineProcedure;
import org.apache.iotdb.confignode.procedure.state.RemoveRegionPeerState;
import org.apache.iotdb.confignode.procedure.store.ProcedureType;
+import org.apache.iotdb.db.utils.DateTimeUtils;
import org.apache.iotdb.mpp.rpc.thrift.TRegionMigrateResult;
import org.slf4j.Logger;
@@ -46,6 +49,7 @@
import static org.apache.iotdb.commons.utils.KillPoint.KillPoint.setKillPoint;
import static org.apache.iotdb.confignode.procedure.state.RemoveRegionPeerState.DELETE_OLD_REGION_PEER;
import static org.apache.iotdb.confignode.procedure.state.RemoveRegionPeerState.REMOVE_REGION_LOCATION_CACHE;
+import static org.apache.iotdb.confignode.procedure.state.RemoveRegionPeerState.REMOVE_REGION_PEER;
import static org.apache.iotdb.rpc.TSStatusCode.SUCCESS_STATUS;
public class RemoveRegionPeerProcedure
@@ -79,6 +83,17 @@
RegionMaintainHandler handler = env.getRegionMaintainHandler();
try {
switch (state) {
+ case TRANSFER_REGION_LEADER:
+ LOGGER.info(
+ "[pid{}][RemoveRegion] started, region {} will be removed from DataNode {}.",
+ getProcId(),
+ consensusGroupId.getId(),
+ targetDataNode.getDataNodeId());
+ handler.updateRegionCache(consensusGroupId, targetDataNode, RegionStatus.Removing);
+ handler.changeRegionLeader(consensusGroupId, targetDataNode);
+ KillPoint.setKillPoint(state);
+ setNextState(REMOVE_REGION_PEER);
+ break;
case REMOVE_REGION_PEER:
handler.updateRegionCache(consensusGroupId, targetDataNode, RegionStatus.Removing);
tsStatus =
@@ -87,7 +102,8 @@
setKillPoint(state);
if (tsStatus.getCode() != SUCCESS_STATUS.getStatusCode()) {
LOGGER.warn(
- "{} task submitted failed, procedure will continue. You should manually clear peer list.",
+ "[pid{}][RemoveRegion] {} task submitted failed, procedure will continue. You should manually clear peer list.",
+ getProcId(),
state);
setNextState(DELETE_OLD_REGION_PEER);
break;
@@ -96,7 +112,8 @@
handler.waitTaskFinish(this.getProcId(), coordinator);
if (removeRegionPeerResult.getTaskStatus() != TRegionMaintainTaskStatus.SUCCESS) {
LOGGER.warn(
- "{} executed failed, procedure will continue. You should manually clear peer list.",
+ "[pid{}][RemoveRegion] {} executed failed, procedure will continue. You should manually clear peer list.",
+ getProcId(),
state);
}
setNextState(DELETE_OLD_REGION_PEER);
@@ -109,7 +126,8 @@
setKillPoint(state);
if (tsStatus.getCode() != SUCCESS_STATUS.getStatusCode()) {
LOGGER.warn(
- "DELETE_OLD_REGION_PEER task submitted failed, procedure will continue. You should manually delete region file.");
+ "[pid{}][RemoveRegion] DELETE_OLD_REGION_PEER task submitted failed, procedure will continue. You should manually delete region file.",
+ getProcId());
setNextState(REMOVE_REGION_LOCATION_CACHE);
break;
}
@@ -117,7 +135,8 @@
handler.waitTaskFinish(this.getProcId(), targetDataNode);
if (deleteOldRegionPeerResult.getTaskStatus() != TRegionMaintainTaskStatus.SUCCESS) {
LOGGER.warn(
- "DELETE_OLD_REGION_PEER executed failed, procedure will continue. You should manually delete region file.");
+ "[pid{}][RemoveRegion] DELETE_OLD_REGION_PEER executed failed, procedure will continue. You should manually delete region file.",
+ getProcId());
}
setNextState(REMOVE_REGION_LOCATION_CACHE);
break;
@@ -126,9 +145,13 @@
setKillPoint(state);
LOGGER.info("RemoveRegionPeer state {} success", state);
LOGGER.info(
- "RemoveRegionPeerProcedure success, region {} has been removed from DataNode {}",
+ "[pid{}][RemoveRegion] success, region {} has been removed from DataNode {}. Procedure took {} (started at {})",
+ getProcId(),
consensusGroupId.getId(),
- targetDataNode.getDataNodeId());
+ targetDataNode.getDataNodeId(),
+ CommonDateTimeUtils.convertMillisecondToDurationStr(
+ System.currentTimeMillis() - getSubmittedTime()),
+ DateTimeUtils.convertLongToDate(getSubmittedTime(), "ms"));
return Flow.NO_MORE_STATE;
default:
throw new ProcedureException("Unsupported state: " + state.name());
@@ -137,7 +160,7 @@
LOGGER.error("RemoveRegionPeer state {} failed", state, e);
return Flow.NO_MORE_STATE;
}
- LOGGER.info("RemoveRegionPeer state {} success", state);
+ LOGGER.info("[pid{}][RemoveRegion] state {} success", getProcId(), state);
return Flow.HAS_MORE_STATE;
}
@@ -157,7 +180,7 @@
@Override
protected RemoveRegionPeerState getInitialState() {
- return RemoveRegionPeerState.REMOVE_REGION_PEER;
+ return RemoveRegionPeerState.TRANSFER_REGION_LEADER;
}
@Override
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/RegionTransitionState.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/RegionTransitionState.java
index 5c8f7bf..a58bb82 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/RegionTransitionState.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/RegionTransitionState.java
@@ -23,7 +23,6 @@
REGION_MIGRATE_PREPARE,
ADD_REGION_PEER,
CHECK_ADD_REGION_PEER,
- CHANGE_REGION_LEADER,
REMOVE_REGION_PEER,
CHECK_REMOVE_REGION_PEER,
}
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/RemoveRegionPeerState.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/RemoveRegionPeerState.java
index f4b9002..e976797 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/RemoveRegionPeerState.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/RemoveRegionPeerState.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.confignode.procedure.state;
public enum RemoveRegionPeerState {
+ TRANSFER_REGION_LEADER,
REMOVE_REGION_PEER,
DELETE_OLD_REGION_PEER,
REMOVE_REGION_LOCATION_CACHE,