/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.confignode.procedure.impl;

import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.path.PathPatternTree;
import org.apache.iotdb.confignode.client.async.datanode.AsyncDataNodeClientPool;
import org.apache.iotdb.confignode.client.async.task.ConstructSchemaBlackListDataNodeTask;
import org.apache.iotdb.confignode.client.async.task.DeleteDataForDeleteTimeSeriesDataNodeTask;
import org.apache.iotdb.confignode.client.async.task.DeleteTimeSeriesDataNodeTask;
import org.apache.iotdb.confignode.client.async.task.FetchSchemaBlackListDataNodeTask;
import org.apache.iotdb.confignode.client.async.task.InvalidateMatchedSchemaCacheDataNodeTask;
import org.apache.iotdb.confignode.client.async.task.RollbackSchemaBlackListDataNodeTask;
import org.apache.iotdb.confignode.procedure.StateMachineProcedure;
import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
import org.apache.iotdb.confignode.procedure.exception.ProcedureSuspendedException;
import org.apache.iotdb.confignode.procedure.exception.ProcedureYieldException;
import org.apache.iotdb.confignode.procedure.state.DeleteTimeSeriesState;
import org.apache.iotdb.confignode.procedure.store.ProcedureFactory;
import org.apache.iotdb.db.exception.metadata.PathNotExistException;
import org.apache.iotdb.mpp.rpc.thrift.TConstructSchemaBlackListReq;
import org.apache.iotdb.mpp.rpc.thrift.TDeleteDataForDeleteTimeSeriesReq;
import org.apache.iotdb.mpp.rpc.thrift.TDeleteTimeSeriesReq;
import org.apache.iotdb.mpp.rpc.thrift.TFetchSchemaBlackListReq;
import org.apache.iotdb.mpp.rpc.thrift.TFetchSchemaBlackListResp;
import org.apache.iotdb.mpp.rpc.thrift.TInvalidateMatchedSchemaCacheReq;
import org.apache.iotdb.mpp.rpc.thrift.TRollbackSchemaBlackListReq;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
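
/**
 * Procedure for deleting all timeseries matched by the given path patterns.
 *
 * <p>The procedure advances through four states:
 *
 * <ol>
 *   <li>CONSTRUCT_BLACK_LIST: mark the matched timeseries as preDeleted on the related
 *       schemaRegions
 *   <li>CLEAN_DATANODE_SCHEMA_CACHE: invalidate the schema cache of the matched timeseries on all
 *       registered dataNodes
 *   <li>DELETE_DATA: delete the data of the preDeleted timeseries on the related dataRegions
 *   <li>DELETE_TIMESERIES: delete the schema of the preDeleted timeseries on the related
 *       schemaRegions
 * </ol>
 *
 * <p>A minimal construction sketch (assuming the caller has already collected the target patterns;
 * the path {@code root.sg.d1.s1} and the {@code queryId} variable are purely illustrative):
 *
 * <pre>{@code
 * PathPatternTree patternTree = new PathPatternTree();
 * patternTree.appendFullPath(new PartialPath(new String[] {"root", "sg", "d1", "s1"}));
 * patternTree.constructTree();
 * DeleteTimeSeriesProcedure procedure = new DeleteTimeSeriesProcedure(queryId, patternTree);
 * }</pre>
 */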
public class DeleteTimeSeriesProcedure
extends StateMachineProcedure<ConfigNodeProcedureEnv, DeleteTimeSeriesState> {
private static final Logger LOGGER = LoggerFactory.getLogger(DeleteTimeSeriesProcedure.class);
private String queryId;
private PathPatternTree patternTree;
private ByteBuffer patternTreeBytes;
private String requestMessage;
public DeleteTimeSeriesProcedure() {
super();
}
public DeleteTimeSeriesProcedure(String queryId, PathPatternTree patternTree) {
super();
this.queryId = queryId;
setPatternTree(patternTree);
}
@Override
protected Flow executeFromState(ConfigNodeProcedureEnv env, DeleteTimeSeriesState state)
throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
switch (state) {
case CONSTRUCT_BLACK_LIST:
LOGGER.info("Construct schema black list of timeseries {}", requestMessage);
if (constructBlackList(env) > 0) {
setNextState(DeleteTimeSeriesState.CLEAN_DATANODE_SCHEMA_CACHE);
break;
} else {
setFailure(
new ProcedureException(
new PathNotExistException(
patternTree.getAllPathPatterns().stream()
.map(PartialPath::getFullPath)
.collect(Collectors.toList()))));
return Flow.NO_MORE_STATE;
}
case CLEAN_DATANODE_SCHEMA_CACHE:
LOGGER.info("Invalidate cache of timeseries {}", requestMessage);
invalidateCache(env);
break;
case DELETE_DATA:
LOGGER.info("Delete data of timeseries {}", requestMessage);
deleteData(env);
break;
case DELETE_TIMESERIES:
LOGGER.info("Delete timeseries schema of {}", requestMessage);
deleteTimeSeries(env);
return Flow.NO_MORE_STATE;
default:
setFailure(new ProcedureException("Unrecognized state " + state.toString()));
return Flow.NO_MORE_STATE;
}
return Flow.HAS_MORE_STATE;
}

  /**
   * Construct the schema black list, i.e. mark the matched timeseries as preDeleted on the
   * related schemaRegions.
   *
   * @return the total number of preDeleted timeseries, or 0 if no timeseries matched or the
   *     procedure has failed
   */
  private int constructBlackList(ConfigNodeProcedureEnv env) {
Map<TConsensusGroupId, TRegionReplicaSet> targetSchemaRegionGroup =
env.getConfigManager().getRelatedSchemaRegionGroup(patternTree);
if (targetSchemaRegionGroup.isEmpty()) {
return 0;
}
RegionTask<TSStatus> constructBlackListTask =
new RegionTask<TSStatus>("construct schema black list", env, targetSchemaRegionGroup) {
@Override
Map<Integer, TSStatus> sendRequest(
TDataNodeLocation dataNodeLocation, List<TConsensusGroupId> consensusGroupIdList) {
// construct request and send
Map<Integer, TDataNodeLocation> dataNodeLocationMap = new HashMap<>();
dataNodeLocationMap.put(dataNodeLocation.getDataNodeId(), dataNodeLocation);
ConstructSchemaBlackListDataNodeTask dataNodeTask =
new ConstructSchemaBlackListDataNodeTask(dataNodeLocationMap);
AsyncDataNodeClientPool.getInstance()
.sendAsyncRequestToDataNodeWithRetry(
new TConstructSchemaBlackListReq(consensusGroupIdList, patternTreeBytes),
dataNodeTask);
dataNodeTask
.getDataNodeResponseMap()
.forEach(
(k, v) -> {
if (v.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
responseMap.put(k, v);
}
});
return dataNodeTask.getDataNodeResponseMap();
}
};
constructBlackListTask.execute();
if (isFailed()) {
return 0;
}
int preDeletedNum = 0;
for (TSStatus resp : constructBlackListTask.getResponseMap().values()) {
preDeletedNum += Integer.parseInt(resp.getMessage());
}
return preDeletedNum;
}
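
  /**
   * Broadcast a cache invalidation request for the matched timeseries to all registered dataNodes.
   * Every dataNode must clear the related schema cache; otherwise the procedure is marked failed.
   */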
private void invalidateCache(ConfigNodeProcedureEnv env) {
Map<Integer, TDataNodeLocation> dataNodeLocationMap =
env.getConfigManager().getNodeManager().getRegisteredDataNodeLocations();
InvalidateMatchedSchemaCacheDataNodeTask dataNodeTask =
new InvalidateMatchedSchemaCacheDataNodeTask(dataNodeLocationMap);
AsyncDataNodeClientPool.getInstance()
.sendAsyncRequestToDataNodeWithRetry(
new TInvalidateMatchedSchemaCacheReq(patternTreeBytes), dataNodeTask);
Map<Integer, TSStatus> statusMap = dataNodeTask.getDataNodeResponseMap();
for (TSStatus status : statusMap.values()) {
// all dataNodes must clear the related schema cache
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
LOGGER.error("Failed to invalidate schema cache of timeseries {}", requestMessage);
setFailure(new ProcedureException(new MetadataException("Invalidate schema cache failed")));
return;
}
}
setNextState(DeleteTimeSeriesState.DELETE_DATA);
}
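
  /**
   * Delete the data of the preDeleted timeseries. For each dataNode that holds related
   * schemaRegions, the preDeleted timeseries are first resolved from its schema black list, then
   * the matching data is deleted on all replicas of the related dataRegions.
   */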
private void deleteData(ConfigNodeProcedureEnv env) {
Map<TConsensusGroupId, TRegionReplicaSet> relatedSchemaRegionGroup =
env.getConfigManager().getRelatedSchemaRegionGroup(patternTree);
    Map<TDataNodeLocation, List<TConsensusGroupId>> dataNodeSchemaRegionGroupIdMap =
        getLeaderDataNodeRegionGroupMap(
            env.getConfigManager().getPartitionManager().getAllLeadership(),
            relatedSchemaRegionGroup);

    // fetch the schema black list from each dataNode
    for (Map.Entry<TDataNodeLocation, List<TConsensusGroupId>> entry :
        dataNodeSchemaRegionGroupIdMap.entrySet()) {
Map<TConsensusGroupId, TRegionReplicaSet> targetSchemaRegionGroup = new HashMap<>();
entry
.getValue()
.forEach(
consensusGroupId ->
targetSchemaRegionGroup.put(
consensusGroupId, relatedSchemaRegionGroup.get(consensusGroupId)));
      // resolve the original path patterns into the full paths of the preDeleted timeseries
PathPatternTree patternTree =
fetchSchemaBlackListOnTargetDataNode(env, targetSchemaRegionGroup);
if (isFailed()) {
return;
}
if (patternTree == null) {
LOGGER.error(
"Failed to fetch schema black list for delete data of timeseries {} on {}",
requestMessage,
entry.getKey());
setFailure(
new ProcedureException(
new MetadataException("Fetch schema black list forDelete data failed")));
return;
}
if (patternTree.isEmpty()) {
continue;
}
Map<TConsensusGroupId, TRegionReplicaSet> relatedDataRegionGroup =
env.getConfigManager().getRelatedDataRegionGroup(patternTree);
// target timeseries has no data
if (relatedDataRegionGroup.isEmpty()) {
continue;
}
RegionTask<TSStatus> deleteDataTask =
new RegionTask<TSStatus>("delete data", env, relatedDataRegionGroup) {
@Override
Map<Integer, TSStatus> sendRequest(
TDataNodeLocation dataNodeLocation, List<TConsensusGroupId> consensusGroupIdList) {
Map<Integer, TDataNodeLocation> dataNodeLocationMap = new HashMap<>();
dataNodeLocationMap.put(dataNodeLocation.getDataNodeId(), dataNodeLocation);
DeleteDataForDeleteTimeSeriesDataNodeTask dataNodeTask =
new DeleteDataForDeleteTimeSeriesDataNodeTask(dataNodeLocationMap);
AsyncDataNodeClientPool.getInstance()
.sendAsyncRequestToDataNodeWithRetry(
new TDeleteDataForDeleteTimeSeriesReq(
new ArrayList<>(consensusGroupIdList),
preparePatternTreeBytesData(patternTree)),
dataNodeTask);
return dataNodeTask.getDataNodeResponseMap();
}
};
deleteDataTask.setExecuteOnAllReplicaset(true);
deleteDataTask.execute();
if (isFailed()) {
return;
}
}
setNextState(DeleteTimeSeriesState.DELETE_TIMESERIES);
}
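
  /**
   * Fetch the preDeleted timeseries recorded in the schema black list of the given schemaRegions
   * and merge them into one pattern tree. Returns {@code null} if the procedure has been marked
   * failed.
   */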
private PathPatternTree fetchSchemaBlackListOnTargetDataNode(
ConfigNodeProcedureEnv env,
Map<TConsensusGroupId, TRegionReplicaSet> targetSchemaRegionGroup) {
RegionTask<TFetchSchemaBlackListResp> fetchSchemaBlackListTask =
new RegionTask<TFetchSchemaBlackListResp>(
"fetch schema black list", env, targetSchemaRegionGroup) {
@Override
Map<Integer, TSStatus> sendRequest(
TDataNodeLocation dataNodeLocation, List<TConsensusGroupId> consensusGroupIdList) {
Map<Integer, TDataNodeLocation> dataNodeLocationMap = new HashMap<>();
dataNodeLocationMap.put(dataNodeLocation.getDataNodeId(), dataNodeLocation);
FetchSchemaBlackListDataNodeTask dataNodeTask =
new FetchSchemaBlackListDataNodeTask(dataNodeLocationMap);
AsyncDataNodeClientPool.getInstance()
.sendAsyncRequestToDataNodeWithRetry(
new TFetchSchemaBlackListReq(consensusGroupIdList, patternTreeBytes),
dataNodeTask);
Map<Integer, TFetchSchemaBlackListResp> respMap = dataNodeTask.getDataNodeResponseMap();
Map<Integer, TSStatus> statusMap = new HashMap<>();
respMap.forEach(
(k, v) -> {
if (v.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
responseMap.put(k, v);
}
statusMap.put(k, v.getStatus());
});
return statusMap;
}
};
fetchSchemaBlackListTask.execute();
if (isFailed()) {
return null;
}
Map<Integer, TFetchSchemaBlackListResp> respMap = fetchSchemaBlackListTask.getResponseMap();
PathPatternTree patternTree = new PathPatternTree();
for (TFetchSchemaBlackListResp resp : respMap.values()) {
for (PartialPath path :
PathPatternTree.deserialize(ByteBuffer.wrap(resp.getPathPatternTree()))
.getAllPathPatterns()) {
patternTree.appendFullPath(path);
}
}
patternTree.constructTree();
return patternTree;
}
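
  /** Delete the schema of the preDeleted timeseries on the related schemaRegions. */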
private void deleteTimeSeries(ConfigNodeProcedureEnv env) {
RegionTask<TSStatus> deleteTimeSeriesTask =
new RegionTask<TSStatus>(
"delete timeseries",
env,
env.getConfigManager().getRelatedSchemaRegionGroup(patternTree)) {
@Override
Map<Integer, TSStatus> sendRequest(
TDataNodeLocation dataNodeLocation, List<TConsensusGroupId> consensusGroupIdList) {
Map<Integer, TDataNodeLocation> dataNodeLocationMap = new HashMap<>();
dataNodeLocationMap.put(dataNodeLocation.getDataNodeId(), dataNodeLocation);
DeleteTimeSeriesDataNodeTask dataNodeTask =
new DeleteTimeSeriesDataNodeTask(dataNodeLocationMap);
AsyncDataNodeClientPool.getInstance()
.sendAsyncRequestToDataNodeWithRetry(
new TDeleteTimeSeriesReq(consensusGroupIdList, patternTreeBytes), dataNodeTask);
return dataNodeTask.getDataNodeResponseMap();
}
};
deleteTimeSeriesTask.execute();
}

  /**
   * Try to execute the request on the leader of each consensus group when possible. If the leader
   * cannot be determined, select another replica for execution.
   */
private Map<TDataNodeLocation, List<TConsensusGroupId>> getLeaderDataNodeRegionGroupMap(
Map<TConsensusGroupId, Integer> leaderMap,
Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap) {
Map<TDataNodeLocation, List<TConsensusGroupId>> dataNodeConsensusGroupIdMap = new HashMap<>();
regionReplicaSetMap.forEach(
(consensusGroupId, regionReplicaSet) -> {
Integer leaderId = leaderMap.get(consensusGroupId);
TDataNodeLocation leaderDataNodeLocation = null;
if (leaderId == null || leaderId == -1) {
leaderDataNodeLocation = regionReplicaSet.getDataNodeLocations().get(0);
} else {
for (TDataNodeLocation dataNodeLocation : regionReplicaSet.getDataNodeLocations()) {
if (dataNodeLocation.getDataNodeId() == leaderId) {
leaderDataNodeLocation = dataNodeLocation;
break;
}
}
}
dataNodeConsensusGroupIdMap
.computeIfAbsent(leaderDataNodeLocation, k -> new ArrayList<>())
.add(regionReplicaSet.getRegionId());
});
return dataNodeConsensusGroupIdMap;
}

  /**
   * Try to execute the request on all replicas of each consensus group. If some replica fails,
   * execute the request on another replica and let the consensus layer synchronize the result.
   */
private Map<TDataNodeLocation, List<TConsensusGroupId>> getAllReplicaDataNodeRegionGroupMap(
Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap) {
Map<TDataNodeLocation, List<TConsensusGroupId>> dataNodeConsensusGroupIdMap = new HashMap<>();
regionReplicaSetMap.forEach(
(consensusGroupId, regionReplicaSet) -> {
for (TDataNodeLocation dataNodeLocation : regionReplicaSet.getDataNodeLocations()) {
dataNodeConsensusGroupIdMap
.computeIfAbsent(dataNodeLocation, k -> new ArrayList<>())
.add(regionReplicaSet.getRegionId());
}
});
return dataNodeConsensusGroupIdMap;
}
@Override
protected void rollbackState(
ConfigNodeProcedureEnv env, DeleteTimeSeriesState deleteTimeSeriesState)
throws IOException, InterruptedException, ProcedureException {
RegionTask<TSStatus> rollbackStateTask =
new RegionTask<TSStatus>(
"roll back schema black list",
env,
env.getConfigManager().getRelatedSchemaRegionGroup(patternTree)) {
@Override
Map<Integer, TSStatus> sendRequest(
TDataNodeLocation dataNodeLocation, List<TConsensusGroupId> consensusGroupIdList) {
Map<Integer, TDataNodeLocation> dataNodeLocationMap = new HashMap<>();
dataNodeLocationMap.put(dataNodeLocation.getDataNodeId(), dataNodeLocation);
RollbackSchemaBlackListDataNodeTask dataNodeTask =
new RollbackSchemaBlackListDataNodeTask(dataNodeLocationMap);
AsyncDataNodeClientPool.getInstance()
.sendAsyncRequestToDataNodeWithRetry(
new TRollbackSchemaBlackListReq(consensusGroupIdList, patternTreeBytes),
dataNodeTask);
return dataNodeTask.getDataNodeResponseMap();
}
};
rollbackStateTask.execute();
}
@Override
protected boolean isRollbackSupported(DeleteTimeSeriesState deleteTimeSeriesState) {
return true;
}
@Override
protected DeleteTimeSeriesState getState(int stateId) {
return DeleteTimeSeriesState.values()[stateId];
}
@Override
protected int getStateId(DeleteTimeSeriesState deleteTimeSeriesState) {
return deleteTimeSeriesState.ordinal();
}
@Override
protected DeleteTimeSeriesState getInitialState() {
return DeleteTimeSeriesState.CONSTRUCT_BLACK_LIST;
}
public String getQueryId() {
return queryId;
}
public PathPatternTree getPatternTree() {
return patternTree;
}
public void setPatternTree(PathPatternTree patternTree) {
this.patternTree = patternTree;
requestMessage = patternTree.getAllPathPatterns().toString();
patternTreeBytes = preparePatternTreeBytesData(patternTree);
}
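
  /** Serialize the pattern tree into a ByteBuffer so it can be carried in thrift requests. */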
private ByteBuffer preparePatternTreeBytesData(PathPatternTree patternTree) {
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream);
try {
patternTree.serialize(dataOutputStream);
    } catch (IOException ignored) {
      // serializing to an in-memory stream cannot throw an IOException
    }
return ByteBuffer.wrap(byteArrayOutputStream.toByteArray());
}
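
  // The procedure type id is written before the procedure body so that ProcedureFactory can
  // recover the concrete procedure class during deserialization.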
@Override
public void serialize(DataOutputStream stream) throws IOException {
stream.writeInt(ProcedureFactory.ProcedureType.DELETE_TIMESERIES_PROCEDURE.ordinal());
super.serialize(stream);
ReadWriteIOUtils.write(queryId, stream);
patternTree.serialize(stream);
}
@Override
public void deserialize(ByteBuffer byteBuffer) {
super.deserialize(byteBuffer);
queryId = ReadWriteIOUtils.readString(byteBuffer);
setPatternTree(PathPatternTree.deserialize(byteBuffer));
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
DeleteTimeSeriesProcedure that = (DeleteTimeSeriesProcedure) o;
return this.getProcId() == that.getProcId()
&& this.getState() == that.getState()
&& patternTree.equals(that.patternTree);
}
@Override
public int hashCode() {
return Objects.hash(getProcId(), getState(), patternTree);
}
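
  /**
   * A template for tasks that must be executed on a set of region groups. By default the request
   * is sent to the leader replica of each group (or to every replica when {@link
   * #setExecuteOnAllReplicaset(boolean)} is enabled), and requests that fail unexpectedly are
   * retried on the remaining replicas until every group is served or no replica is left.
   *
   * @param <T> the response type collected from the dataNodes
   */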
private abstract class RegionTask<T> {
private final String taskName;
private final ConfigNodeProcedureEnv env;
private final Map<TConsensusGroupId, TRegionReplicaSet> targetSchemaRegionGroup;
private boolean executeOnAllReplicaset = false;
protected Map<Integer, T> responseMap = new ConcurrentHashMap<>();
RegionTask(
String taskName,
ConfigNodeProcedureEnv env,
Map<TConsensusGroupId, TRegionReplicaSet> targetSchemaRegionGroup) {
this.taskName = taskName;
this.env = env;
this.targetSchemaRegionGroup = targetSchemaRegionGroup;
}
private void execute() {
      // organize the target region groups by dataNode
Set<TDataNodeLocation> allFailedDataNodeSet = new HashSet<>();
Map<TDataNodeLocation, List<TConsensusGroupId>> dataNodeConsensusGroupIdMap =
executeOnAllReplicaset
? getAllReplicaDataNodeRegionGroupMap(targetSchemaRegionGroup)
: getLeaderDataNodeRegionGroupMap(
env.getConfigManager().getPartitionManager().getAllLeadership(),
targetSchemaRegionGroup);
while (!dataNodeConsensusGroupIdMap.isEmpty()) {
Map<TDataNodeLocation, List<TConsensusGroupId>> currentFailedDataNodeMap =
sendRegionRequest(dataNodeConsensusGroupIdMap);
if (isFailed()) {
// some dataNode execution failure
return;
}
if (currentFailedDataNodeMap.isEmpty()) {
// all succeeded
break;
}
        // retry requests that failed with unexpected errors on other replicas on other dataNodes
        currentFailedDataNodeMap.forEach(dataNodeConsensusGroupIdMap::remove);

        // record the failed dataNodes so that they won't be selected again for retry
        allFailedDataNodeSet.addAll(currentFailedDataNodeMap.keySet());

        // dataNodes that executed the request successfully are no longer considered failed
        allFailedDataNodeSet.removeAll(dataNodeConsensusGroupIdMap.keySet());
dataNodeConsensusGroupIdMap =
getAvailableDataNodeLocationForRetry(currentFailedDataNodeMap, allFailedDataNodeSet);
if (isFailed()) {
// some consensus group has no available dataNode
return;
}
}
}
private Map<TDataNodeLocation, List<TConsensusGroupId>> sendRegionRequest(
Map<TDataNodeLocation, List<TConsensusGroupId>> dataNodeConsensusGroupIdMap) {
// send request to each dataNode
Map<TDataNodeLocation, List<TConsensusGroupId>> failedDataNodeMap = new HashMap<>();
for (Map.Entry<TDataNodeLocation, List<TConsensusGroupId>> entry :
dataNodeConsensusGroupIdMap.entrySet()) {
        // send the request and check this dataNode's execution result
Map<Integer, TSStatus> dataNodeResponseMap = sendRequest(entry.getKey(), entry.getValue());
TSStatus currentDataNodeResponse = dataNodeResponseMap.get(entry.getKey().getDataNodeId());
if (currentDataNodeResponse.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
if (currentDataNodeResponse.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
// dataNode execution error
LOGGER.error(
"Failed to execute [{}] of delete timeseries {} on {}",
taskName,
requestMessage,
entry.getKey());
setFailure(
new ProcedureException(
new MetadataException(
String.format(
"Delete timeseries %s failed when [%s]", requestMessage, taskName))));
break;
} else {
            // unexpected error, retry on other replicas on other dataNodes
failedDataNodeMap.put(entry.getKey(), entry.getValue());
}
}
}
return failedDataNodeMap;
}
private Map<TDataNodeLocation, List<TConsensusGroupId>> getAvailableDataNodeLocationForRetry(
Map<TDataNodeLocation, List<TConsensusGroupId>> failedDataNodeConsensusGroupIdMap,
Set<TDataNodeLocation> allFailedDataNodeSet) {
Map<TConsensusGroupId, Integer> leaderMap =
env.getConfigManager().getPartitionManager().getAllLeadership();
Map<TDataNodeLocation, List<TConsensusGroupId>> availableDataNodeLocation = new HashMap<>();
for (List<TConsensusGroupId> consensusGroupIdList :
failedDataNodeConsensusGroupIdMap.values()) {
for (TConsensusGroupId consensusGroupId : consensusGroupIdList) {
TRegionReplicaSet regionReplicaSet = targetSchemaRegionGroup.get(consensusGroupId);
TDataNodeLocation selectedDataNode = null;
Integer leaderId = leaderMap.get(consensusGroupId);
if (leaderId == null || leaderId == -1) {
for (TDataNodeLocation candidateDataNode : regionReplicaSet.getDataNodeLocations()) {
if (!allFailedDataNodeSet.contains(candidateDataNode)) {
// since leader of this group is unknown, take the first available one
selectedDataNode = candidateDataNode;
break;
}
}
} else {
for (TDataNodeLocation candidateDataNode : regionReplicaSet.getDataNodeLocations()) {
if (!allFailedDataNodeSet.contains(candidateDataNode)) {
if (leaderId == candidateDataNode.getDataNodeId()) {
                // prefer retrying on the (possibly new) leader
selectedDataNode = candidateDataNode;
break;
}
if (selectedDataNode == null) {
selectedDataNode = candidateDataNode;
}
}
}
}
if (selectedDataNode == null) {
setFailure(
new ProcedureException(
new MetadataException(
String.format(
"Delete timeseries %s failed when [%s] because all replicaset of schemaRegion %s failed.",
requestMessage, taskName, consensusGroupId.id))));
return availableDataNodeLocation;
} else {
          availableDataNodeLocation
              .computeIfAbsent(selectedDataNode, k -> new ArrayList<>())
              .add(consensusGroupId);
}
}
}
return availableDataNodeLocation;
}
Map<Integer, T> getResponseMap() {
return responseMap == null ? Collections.emptyMap() : responseMap;
}
void setExecuteOnAllReplicaset(boolean executeOnAllReplicaset) {
this.executeOnAllReplicaset = executeOnAllReplicaset;
}
abstract Map<Integer, TSStatus> sendRequest(
TDataNodeLocation dataNodeLocation, List<TConsensusGroupId> consensusGroupIdList);
}
}