blob: 7061a6b5c380e6ef0f832d2e53da36da444ee2fe [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.service;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TRegionMaintainTaskStatus;
import org.apache.iotdb.common.rpc.thrift.TRegionMigrateFailedType;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.concurrent.ThreadName;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.commons.consensus.SchemaRegionId;
import org.apache.iotdb.commons.exception.StartupException;
import org.apache.iotdb.commons.service.IService;
import org.apache.iotdb.commons.service.ServiceType;
import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.exception.ConsensusException;
import org.apache.iotdb.consensus.exception.ConsensusGroupNotExistException;
import org.apache.iotdb.consensus.exception.PeerAlreadyInConsensusGroupException;
import org.apache.iotdb.consensus.exception.PeerNotInConsensusGroupException;
import org.apache.iotdb.db.consensus.DataRegionConsensusImpl;
import org.apache.iotdb.db.consensus.SchemaRegionConsensusImpl;
import org.apache.iotdb.db.protocol.thrift.impl.DataNodeRegionManager;
import org.apache.iotdb.db.storageengine.rescon.memory.AbstractPoolManager;
import org.apache.iotdb.mpp.rpc.thrift.TMaintainPeerReq;
import org.apache.iotdb.mpp.rpc.thrift.TRegionMigrateResult;
import org.apache.iotdb.mpp.rpc.thrift.TResetPeerListReq;
import org.apache.iotdb.rpc.TSStatusCode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
public class RegionMigrateService implements IService {
// Logger used by the service itself; the nested pool and task classes declare their own loggers.
private static final Logger LOGGER = LoggerFactory.getLogger(RegionMigrateService.class);
// Prefix placed in front of the region-migration log lines and error messages in this file.
public static final String REGION_MIGRATE_PROCESS = "[REGION_MIGRATE_PROCESS]";
// Upper bound on the number of attempts made by the addPeer / removePeer retry loops.
private static final int MAX_RETRY_NUM = 5;
// Pause in milliseconds (passed to Thread.sleep) before each re-attempt in those retry loops.
private static final int SLEEP_MILLIS = 5000;
// Pool that runs the submitted tasks. Assigned in start(), so it is null before start() is called.
private RegionMigratePool regionMigratePool;
// Map<taskId, taskStatus>
// TODO: Due to the use of procedureId as taskId, it is currently unable to handle the situation
// where different asynchronous tasks are submitted to the same datanode within a single procedure
private static final ConcurrentHashMap<Long, TRegionMigrateResult> taskResultMap =
new ConcurrentHashMap<>();
// Placeholder stored in taskResultMap when a task is registered; getRegionMaintainResult compares
// against this exact instance (by reference) to report a task as still PROCESSING.
private static final TRegionMigrateResult unfinishedResult = new TRegionMigrateResult();
// Private constructor: within this file the only instantiation is Holder.INSTANCE.
private RegionMigrateService() {}
/**
 * Returns the service instance stored in {@code Holder.INSTANCE}.
 *
 * @return the shared RegionMigrateService instance
 */
public static RegionMigrateService getInstance() {
return Holder.INSTANCE;
}
/**
 * Submit AddRegionPeerTask
 *
 * <p>The task id is first registered in {@code taskResultMap} with the unfinished placeholder. A
 * task id that is already registered is not submitted a second time. If handing the task over to
 * the pool fails, the placeholder is removed again; otherwise the task id would be reported as
 * PROCESSING forever and every later retry would be skipped as "already submitted".
 *
 * @param req TMaintainPeerReq
 * @return true if the task was submitted (or had already been submitted before), false if the
 *     submission failed
 */
public synchronized boolean submitAddRegionPeerTask(TMaintainPeerReq req) {
  // Remembers whether this call inserted the placeholder, so only our own entry is rolled back.
  boolean registered = false;
  try {
    registered = addToTaskResultMap(req.getTaskId());
    if (!registered) {
      LOGGER.warn(
          "{} The AddRegionPeerTask {} has already been submitted and will not be submitted again.",
          REGION_MIGRATE_PROCESS,
          req.getTaskId());
      return true;
    }
    regionMigratePool.submit(
        new AddRegionPeerTask(req.getTaskId(), req.getRegionId(), req.getDestNode()));
    return true;
  } catch (Exception e) {
    if (registered) {
      // The task never reached the pool: drop the placeholder so the task can be re-submitted.
      // The two-argument remove only deletes the entry while it still holds the placeholder.
      taskResultMap.remove(req.getTaskId(), unfinishedResult);
    }
    LOGGER.error(
        "{}, Submit AddRegionPeerTask error for Region: {}",
        REGION_MIGRATE_PROCESS,
        req.getRegionId(),
        e);
    return false;
  }
}
/**
 * Submit RemoveRegionPeerTask
 *
 * <p>The task id is first registered in {@code taskResultMap} with the unfinished placeholder. A
 * task id that is already registered is not submitted a second time. If handing the task over to
 * the pool fails, the placeholder is removed again; otherwise the task id would be reported as
 * PROCESSING forever and every later retry would be skipped as "already submitted".
 *
 * @param req TMaintainPeerReq
 * @return true if the task was submitted (or had already been submitted before), false if the
 *     submission failed
 */
public synchronized boolean submitRemoveRegionPeerTask(TMaintainPeerReq req) {
  // Remembers whether this call inserted the placeholder, so only our own entry is rolled back.
  boolean registered = false;
  try {
    registered = addToTaskResultMap(req.getTaskId());
    if (!registered) {
      LOGGER.warn(
          "{} The RemoveRegionPeer {} has already been submitted and will not be submitted again.",
          REGION_MIGRATE_PROCESS,
          req.getTaskId());
      return true;
    }
    regionMigratePool.submit(
        new RemoveRegionPeerTask(req.getTaskId(), req.getRegionId(), req.getDestNode()));
    return true;
  } catch (Exception e) {
    if (registered) {
      // The task never reached the pool: drop the placeholder so the task can be re-submitted.
      // The two-argument remove only deletes the entry while it still holds the placeholder.
      taskResultMap.remove(req.getTaskId(), unfinishedResult);
    }
    LOGGER.error(
        "{}, Submit RemoveRegionPeer task error for Region: {}",
        REGION_MIGRATE_PROCESS,
        req.getRegionId(),
        e);
    return false;
  }
}
/**
 * Submit DeleteOldRegionPeerTask
 *
 * <p>The task id is first registered in {@code taskResultMap} with the unfinished placeholder. A
 * task id that is already registered is not submitted a second time. If handing the task over to
 * the pool fails, the placeholder is removed again; otherwise the task id would be reported as
 * PROCESSING forever and every later retry would be skipped as "already submitted".
 *
 * @param req TMaintainPeerReq
 * @return true if the task was submitted (or had already been submitted before), false if the
 *     submission failed
 */
public synchronized boolean submitDeleteOldRegionPeerTask(TMaintainPeerReq req) {
  // Remembers whether this call inserted the placeholder, so only our own entry is rolled back.
  boolean registered = false;
  try {
    registered = addToTaskResultMap(req.getTaskId());
    if (!registered) {
      LOGGER.warn(
          "{} The DeleteOldRegionPeerTask {} has already been submitted and will not be submitted again.",
          REGION_MIGRATE_PROCESS,
          req.getTaskId());
      return true;
    }
    regionMigratePool.submit(
        new DeleteOldRegionPeerTask(req.getTaskId(), req.getRegionId(), req.getDestNode()));
    return true;
  } catch (Exception e) {
    if (registered) {
      // The task never reached the pool: drop the placeholder so the task can be re-submitted.
      // The two-argument remove only deletes the entry while it still holds the placeholder.
      taskResultMap.remove(req.getTaskId(), unfinishedResult);
    }
    LOGGER.error(
        "{}, Submit DeleteOldRegionPeerTask error for Region: {}",
        REGION_MIGRATE_PROCESS,
        req.getRegionId(),
        e);
    return false;
  }
}
/**
 * Resets the peer list of the consensus group named in the request to the locations the request
 * lists as correct.
 *
 * <p>A {@link ConsensusGroupNotExistException} is only logged as a warning and still yields a
 * success status; any other {@link ConsensusException} yields EXECUTE_STATEMENT_ERROR.
 *
 * @param req request carrying the region id and the correct DataNode locations
 * @return a status holding SUCCESS_STATUS or EXECUTE_STATEMENT_ERROR
 */
public synchronized TSStatus resetPeerList(TResetPeerListReq req) {
  final List<Peer> expectedPeers =
      req.getCorrectLocations().stream()
          .map(dataNode -> Peer.valueOf(req.getRegionId(), dataNode))
          .collect(Collectors.toList());
  final ConsensusGroupId groupId =
      ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getRegionId());

  // Success unless a ConsensusException other than "group does not exist" is raised below.
  TSStatusCode outcome = TSStatusCode.SUCCESS_STATUS;
  try {
    if (groupId instanceof DataRegionId) {
      DataRegionConsensusImpl.getInstance().resetPeerList(groupId, expectedPeers);
    } else {
      SchemaRegionConsensusImpl.getInstance().resetPeerList(groupId, expectedPeers);
    }
  } catch (ConsensusGroupNotExistException e) {
    // Deliberately not treated as a failure: the outcome stays SUCCESS_STATUS.
    LOGGER.warn(
        "Reset peer list fail, this DataNode not contains peer of consensus group {}. Maybe caused by create local peer failure.",
        groupId,
        e);
  } catch (ConsensusException e) {
    LOGGER.error("reset peer list fail", e);
    outcome = TSStatusCode.EXECUTE_STATEMENT_ERROR;
  }
  return new TSStatus(outcome.getStatusCode());
}
/**
 * Registers a task id with the unfinished placeholder unless the id is already present.
 *
 * @param taskId id of the task to register
 * @return true if this call inserted the placeholder, false if the id was already registered
 */
private boolean addToTaskResultMap(long taskId) {
  final TRegionMigrateResult alreadyRegistered =
      taskResultMap.putIfAbsent(taskId, unfinishedResult);
  return alreadyRegistered == null;
}
/**
 * Creates the {@link RegionMigratePool}, starts it and logs that the service has started.
 *
 * @throws StartupException declared by the signature; nothing in this body throws it explicitly
 */
@Override
public void start() throws StartupException {
// A new pool is created on every call; the submit* methods use this field to run their tasks.
regionMigratePool = new RegionMigratePool();
regionMigratePool.start();
LOGGER.info("Region migrate service start");
}
/** Stops the pool if one has been created, then logs that the service has stopped. */
@Override
public void stop() {
// The field is only assigned in start(), so it is still null if stop() is called first.
if (regionMigratePool != null) {
regionMigratePool.stop();
}
LOGGER.info("Region migrate service stop");
}
/**
 * Identifies this service.
 *
 * @return {@link ServiceType#DATA_NODE_REGION_MIGRATE_SERVICE}
 */
@Override
public ServiceType getID() {
return ServiceType.DATA_NODE_REGION_MIGRATE_SERVICE;
}
/**
 * Pool manager used to run the region migrate tasks. The pool is a cached thread pool obtained
 * from {@link IoTDBThreadPoolFactory} and named after {@code ThreadName.REGION_MIGRATE}.
 */
private static class RegionMigratePool extends AbstractPoolManager {

  private final Logger poolLogger = LoggerFactory.getLogger(RegionMigratePool.class);

  private RegionMigratePool() {
    final String threadName = ThreadName.REGION_MIGRATE.getName();
    this.pool = IoTDBThreadPoolFactory.newCachedThreadPool(threadName);
  }

  @Override
  public String getName() {
    return "migrate region";
  }

  @Override
  public Logger getLogger() {
    return poolLogger;
  }

  /** Only logs that the pool is up; the pool itself is created in the constructor. */
  @Override
  public void start() {
    if (this.pool == null) {
      return;
    }
    poolLogger.info("DataNode region migrate pool start");
  }
}
/**
 * Task that adds a DataNode as a remote peer of a region's consensus group and records the
 * outcome through {@code taskSucceed} / {@code taskFail}.
 */
private static class AddRegionPeerTask implements Runnable {

  private static final Logger taskLogger = LoggerFactory.getLogger(AddRegionPeerTask.class);

  private final long taskId;
  // The RegionGroup that shall perform the add peer process
  private final TConsensusGroupId tRegionId;
  // The new DataNode to be added in the RegionGroup
  private final TDataNodeLocation destDataNode;

  public AddRegionPeerTask(
      long taskId, TConsensusGroupId tRegionId, TDataNodeLocation destDataNode) {
    this.taskId = taskId;
    this.tRegionId = tRegionId;
    this.destDataNode = destDataNode;
  }

  /** Runs addPeer and stores either a success or an AddPeerFailed result for the task id. */
  @Override
  public void run() {
    final TSStatus outcome = addPeer();
    if (isSucceed(outcome)) {
      taskSucceed(taskId, tRegionId, "AddPeer");
    } else {
      taskFail(taskId, tRegionId, destDataNode, TRegionMigrateFailedType.AddPeerFailed, outcome);
    }
  }

  /**
   * Tries to add the destination peer, making at most MAX_RETRY_NUM attempts and sleeping
   * SLEEP_MILLIS before each re-attempt. A peer that is already in the group counts as success.
   * An interrupt during the sleep ends the retries early.
   *
   * @return a status with SUCCESS_STATUS, or MIGRATE_REGION_ERROR when no attempt succeeded
   */
  private TSStatus addPeer() {
    taskLogger.info(
        "{}, Start to addPeer {} for region {}", REGION_MIGRATE_PROCESS, destDataNode, tRegionId);
    final ConsensusGroupId regionId =
        ConsensusGroupId.Factory.createFromTConsensusGroupId(tRegionId);
    final TEndPoint destEndpoint = getConsensusEndPoint(destDataNode, regionId);

    boolean succeeded = true;
    Throwable lastError = null;
    for (int attempt = 0; attempt < MAX_RETRY_NUM; attempt++) {
      if (attempt > 0) {
        // A further iteration is only reached after a failed attempt, so wait before retrying.
        try {
          Thread.sleep(SLEEP_MILLIS);
        } catch (InterruptedException e) {
          lastError = e;
          Thread.currentThread().interrupt();
          break;
        }
      }
      try {
        addRegionPeer(regionId, new Peer(regionId, destDataNode.getDataNodeId(), destEndpoint));
        succeeded = true;
      } catch (PeerAlreadyInConsensusGroupException e) {
        // The peer is already a member of the group: treated the same as a successful add.
        succeeded = true;
      } catch (ConsensusException e) {
        succeeded = false;
        lastError = e;
        taskLogger.error(
            "{}, executed addPeer {} for region {} error, retry times: {}",
            REGION_MIGRATE_PROCESS,
            destEndpoint,
            regionId,
            attempt,
            e);
      }
      if (succeeded) {
        break;
      }
    }

    if (succeeded) {
      taskLogger.info(
          "{}, Succeed to addPeer {} for region {}",
          REGION_MIGRATE_PROCESS,
          destEndpoint,
          regionId);
      final TSStatus success = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
      success.setMessage("addPeer " + destEndpoint + " for region " + regionId + " succeed");
      return success;
    }

    final String errorMsg =
        String.format(
            "%s, AddPeer for region error after max retry times, peerId: %s, regionId: %s",
            REGION_MIGRATE_PROCESS, destEndpoint, regionId);
    taskLogger.error(errorMsg, lastError);
    final TSStatus failure = new TSStatus(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
    failure.setMessage(errorMsg);
    return failure;
  }

  /** Dispatches addRemotePeer to the schema or data region consensus layer. */
  private void addRegionPeer(ConsensusGroupId regionId, Peer newPeer) throws ConsensusException {
    if (!(regionId instanceof DataRegionId)) {
      SchemaRegionConsensusImpl.getInstance().addRemotePeer(regionId, newPeer);
      return;
    }
    DataRegionConsensusImpl.getInstance().addRemotePeer(regionId, newPeer);
  }
}
/**
 * Task that removes a DataNode's remote peer from a region's consensus group and records the
 * outcome through {@code taskSucceed} / {@code taskFail}.
 */
private static class RemoveRegionPeerTask implements Runnable {

  private static final Logger taskLogger = LoggerFactory.getLogger(RemoveRegionPeerTask.class);

  private final long taskId;
  private final TConsensusGroupId tRegionId;
  private final TDataNodeLocation destDataNode;

  public RemoveRegionPeerTask(
      long taskId, TConsensusGroupId tRegionId, TDataNodeLocation destDataNode) {
    this.taskId = taskId;
    this.tRegionId = tRegionId;
    this.destDataNode = destDataNode;
  }

  /** Runs removePeer and stores either a success or a RemovePeerFailed result for the task id. */
  @Override
  public void run() {
    final TSStatus outcome = removePeer();
    if (isFailed(outcome)) {
      taskFail(
          taskId, tRegionId, destDataNode, TRegionMigrateFailedType.RemovePeerFailed, outcome);
      return;
    }
    taskSucceed(taskId, tRegionId, "RemovePeer");
  }

  /**
   * Tries to remove the peer, making at most MAX_RETRY_NUM attempts and sleeping SLEEP_MILLIS
   * before each re-attempt. A peer that is not in the group counts as success. An interrupt
   * during the sleep ends the retries early.
   *
   * @return a status with SUCCESS_STATUS, or MIGRATE_REGION_ERROR when no attempt succeeded
   */
  private TSStatus removePeer() {
    final ConsensusGroupId regionId =
        ConsensusGroupId.Factory.createFromTConsensusGroupId(tRegionId);
    final TEndPoint destEndPoint = getConsensusEndPoint(destDataNode, regionId);
    taskLogger.info(
        "{}, Start to removePeer {} for region {}",
        REGION_MIGRATE_PROCESS,
        destEndPoint,
        regionId);

    boolean succeeded = true;
    Throwable lastError = null;
    for (int attempt = 0; attempt < MAX_RETRY_NUM; attempt++) {
      if (attempt > 0) {
        // A further iteration is only reached after a failed attempt, so wait before retrying.
        try {
          Thread.sleep(SLEEP_MILLIS);
        } catch (InterruptedException e) {
          lastError = e;
          Thread.currentThread().interrupt();
          break;
        }
      }
      try {
        removeRegionPeer(
            regionId, new Peer(regionId, destDataNode.getDataNodeId(), destEndPoint));
        succeeded = true;
      } catch (PeerNotInConsensusGroupException e) {
        // The peer is not a member of the group: treated the same as a successful removal.
        succeeded = true;
      } catch (ConsensusException e) {
        succeeded = false;
        lastError = e;
        taskLogger.error(
            "{}, executed removePeer {} for region {} error, retry times: {}",
            REGION_MIGRATE_PROCESS,
            destEndPoint,
            regionId,
            attempt,
            e);
      }
      if (succeeded) {
        break;
      }
    }

    if (succeeded) {
      taskLogger.info(
          "{}, Succeed to removePeer {} for region {}",
          REGION_MIGRATE_PROCESS,
          destEndPoint,
          regionId);
      final TSStatus success = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
      success.setMessage("removePeer " + destEndPoint + " for region " + regionId + " succeed");
      return success;
    }

    final String errorMsg =
        String.format(
            "%s, RemovePeer for region error after max retry times, peerId: %s, regionId: %s",
            REGION_MIGRATE_PROCESS, destEndPoint, regionId);
    taskLogger.error(errorMsg, lastError);
    final TSStatus failure = new TSStatus(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
    failure.setMessage(errorMsg);
    return failure;
  }

  /** Dispatches removeRemotePeer to the schema or data region consensus layer. */
  private void removeRegionPeer(ConsensusGroupId regionId, Peer oldPeer)
      throws ConsensusException {
    if (!(regionId instanceof DataRegionId)) {
      SchemaRegionConsensusImpl.getInstance().removeRemotePeer(regionId, oldPeer);
      return;
    }
    DataRegionConsensusImpl.getInstance().removeRemotePeer(regionId, oldPeer);
  }
}
/**
 * Task that deletes the local peer of a region from its consensus group and then deletes the
 * region itself, recording the outcome through {@code taskSucceed} / {@code taskFail}.
 */
private static class DeleteOldRegionPeerTask implements Runnable {

  private static final Logger taskLogger = LoggerFactory.getLogger(DeleteOldRegionPeerTask.class);

  private final long taskId;
  private final TConsensusGroupId tRegionId;
  private final TDataNodeLocation originalDataNode;

  public DeleteOldRegionPeerTask(
      long taskId, TConsensusGroupId tRegionId, TDataNodeLocation originalDataNode) {
    this.taskId = taskId;
    this.tRegionId = tRegionId;
    this.originalDataNode = originalDataNode;
  }

  /**
   * Runs both steps (deletePeer, then deleteRegion) and records the result.
   *
   * <p>As before, deleteRegion is still attempted when deletePeer failed. The task is recorded as
   * succeeded only when both steps succeeded; previously the success result was always written
   * last and overwrote any recorded failure. When both steps fail, the deleteRegion failure is
   * the one that remains recorded, because it is written last.
   */
  @Override
  public void run() {
    // deletePeer: remove the peer from the consensus group
    final TSStatus deletePeerResult = deletePeer();
    final boolean deletePeerFailed = isFailed(deletePeerResult);
    if (deletePeerFailed) {
      taskFail(
          taskId,
          tRegionId,
          originalDataNode,
          TRegionMigrateFailedType.RemoveConsensusGroupFailed,
          deletePeerResult);
    }

    // deleteRegion: delete region data
    final TSStatus deleteRegionResult = deleteRegion();
    if (isFailed(deleteRegionResult)) {
      taskFail(
          taskId,
          tRegionId,
          originalDataNode,
          TRegionMigrateFailedType.DeleteRegionFailed,
          deleteRegionResult);
      return;
    }

    // Do not overwrite a failure recorded for the first step with a success result.
    if (!deletePeerFailed) {
      taskSucceed(taskId, tRegionId, "DeletePeer");
    }
  }

  /**
   * Deletes the local peer of the region from the data or schema consensus layer.
   *
   * @return a status with SUCCESS_STATUS, or MIGRATE_REGION_ERROR if any exception was thrown
   */
  private TSStatus deletePeer() {
    taskLogger.info(
        "{}, Start to deletePeer {} for region {}",
        REGION_MIGRATE_PROCESS,
        originalDataNode,
        tRegionId);
    ConsensusGroupId regionId = ConsensusGroupId.Factory.createFromTConsensusGroupId(tRegionId);
    TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
    try {
      if (regionId instanceof DataRegionId) {
        DataRegionConsensusImpl.getInstance().deleteLocalPeer(regionId);
      } else {
        SchemaRegionConsensusImpl.getInstance().deleteLocalPeer(regionId);
      }
    } catch (ConsensusException e) {
      String errorMsg =
          String.format(
              "deletePeer error, regionId: %s, errorMessage: %s", regionId, e.getMessage());
      // Pass the exception as well so that the stack trace is not lost.
      taskLogger.error(errorMsg, e);
      status.setCode(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
      status.setMessage(errorMsg);
      return status;
    } catch (Exception e) {
      taskLogger.error("{}, deletePeer error, regionId: {}", REGION_MIGRATE_PROCESS, regionId, e);
      status.setCode(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
      status.setMessage(
          "deletePeer for region: " + regionId + " error. exception: " + e.getMessage());
      return status;
    }
    taskLogger.info(
        "{}, Succeed to deletePeer {} from consensus group", REGION_MIGRATE_PROCESS, regionId);
    // A space before "succeed" was missing in the original message.
    status.setMessage("deletePeer from consensus group " + regionId + " succeed");
    return status;
  }

  /**
   * Deletes the data or schema region through {@link DataNodeRegionManager}.
   *
   * @return a status with SUCCESS_STATUS, or DELETE_REGION_ERROR if any exception was thrown
   */
  private TSStatus deleteRegion() {
    taskLogger.info(
        "{}, Start to deleteRegion {} for datanode {}",
        REGION_MIGRATE_PROCESS,
        tRegionId,
        originalDataNode);
    TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
    ConsensusGroupId regionId = ConsensusGroupId.Factory.createFromTConsensusGroupId(tRegionId);
    try {
      if (regionId instanceof DataRegionId) {
        DataNodeRegionManager.getInstance().deleteDataRegion((DataRegionId) regionId);
      } else {
        DataNodeRegionManager.getInstance().deleteSchemaRegion((SchemaRegionId) regionId);
      }
    } catch (Exception e) {
      taskLogger.error("{}, deleteRegion {} error", REGION_MIGRATE_PROCESS, regionId, e);
      status.setCode(TSStatusCode.DELETE_REGION_ERROR.getStatusCode());
      status.setMessage("deleteRegion " + regionId + " error, " + e.getMessage());
      return status;
    }
    status.setMessage("deleteRegion " + regionId + " succeed");
    taskLogger.info("{}, Succeed to deleteRegion {}", REGION_MIGRATE_PROCESS, regionId);
    return status;
  }
}
// Holds the single RegionMigrateService instance that getInstance() returns.
private static class Holder {
private static final RegionMigrateService INSTANCE = new RegionMigrateService();
// Not meant to be instantiated; it only carries the static INSTANCE field.
private Holder() {}
}
/**
 * Stores a SUCCESS result for the task in {@code taskResultMap}, replacing whatever was stored
 * for that task id before.
 *
 * @param taskId id of the finished task
 * @param tRegionId region the task worked on
 * @param migrateState name of the finished step, used in the status message
 */
private static void taskSucceed(long taskId, TConsensusGroupId tRegionId, String migrateState) {
  final String message =
      String.format("Region: %s, state: %s, executed succeed", tRegionId, migrateState);
  final TSStatus successStatus = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
  successStatus.setMessage(message);

  final TRegionMigrateResult result =
      new TRegionMigrateResult(TRegionMaintainTaskStatus.SUCCESS);
  result.setRegionId(tRegionId);
  result.setMigrateResult(successStatus);
  taskResultMap.put(taskId, result);
}
/**
 * Stores a FAIL result for the task in {@code taskResultMap}, replacing whatever was stored for
 * that task id before.
 *
 * @param taskId id of the failed task
 * @param tRegionId region the task worked on
 * @param failedNode DataNode reported as failed
 * @param failedType reason recorded for that DataNode
 * @param status status describing the failure, stored as the migrate result
 */
private static void taskFail(
    long taskId,
    TConsensusGroupId tRegionId,
    TDataNodeLocation failedNode,
    TRegionMigrateFailedType failedType,
    TSStatus status) {
  final TRegionMigrateResult result = new TRegionMigrateResult(TRegionMaintainTaskStatus.FAIL);
  result.setRegionId(tRegionId);
  result.setMigrateResult(status);

  // Single-entry map: the failed node together with the reason for the failure.
  final Map<TDataNodeLocation, TRegionMigrateFailedType> reasonByNode = new HashMap<>();
  reasonByNode.put(failedNode, failedType);
  result.setFailedNodeAndReason(reasonByNode);

  taskResultMap.put(taskId, result);
}
/**
 * Looks up the state of a previously submitted task.
 *
 * <p>The map is read exactly once, so the decision and the returned value are based on the same
 * snapshot even when a task finishes concurrently.
 *
 * @param taskId id of the task to look up
 * @return a new result with TASK_NOT_EXIST if the id is unknown, a new result with PROCESSING if
 *     the task is registered but has not stored a result yet, otherwise the stored result
 */
public TRegionMigrateResult getRegionMaintainResult(Long taskId) {
  final TRegionMigrateResult recorded = taskResultMap.get(taskId);
  if (recorded == null) {
    TRegionMigrateResult notExist = new TRegionMigrateResult();
    notExist.setTaskStatus(TRegionMaintainTaskStatus.TASK_NOT_EXIST);
    return notExist;
  }
  // Reference comparison on purpose: the placeholder is one shared instance.
  if (recorded == unfinishedResult) {
    TRegionMigrateResult processing = new TRegionMigrateResult();
    processing.setTaskStatus(TRegionMaintainTaskStatus.PROCESSING);
    return processing;
  }
  return recorded;
}
/**
 * @param status status to inspect
 * @return true if the status code equals the SUCCESS_STATUS code
 */
private static boolean isSucceed(TSStatus status) {
  final int actualCode = status.getCode();
  return actualCode == TSStatusCode.SUCCESS_STATUS.getStatusCode();
}
/**
 * @param status status to inspect
 * @return true if the status code differs from the SUCCESS_STATUS code
 */
private static boolean isFailed(TSStatus status) {
  return status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode();
}
/**
 * Picks the consensus endpoint of a DataNode that matches the kind of region.
 *
 * @param nodeLocation location of the DataNode
 * @param regionId region id; a {@link DataRegionId} selects the data region endpoint, anything
 *     else selects the schema region endpoint
 * @return the selected consensus endpoint
 */
private static TEndPoint getConsensusEndPoint(
    TDataNodeLocation nodeLocation, ConsensusGroupId regionId) {
  return regionId instanceof DataRegionId
      ? nodeLocation.getDataRegionConsensusEndPoint()
      : nodeLocation.getSchemaRegionConsensusEndPoint();
}
}