blob: 4329b5f05539346f5b5d348a0d1c10d2919d6f55 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.confignode.procedure.impl;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils;
import org.apache.iotdb.confignode.consensus.request.write.region.CreateRegionGroupsPlan;
import org.apache.iotdb.confignode.consensus.request.write.region.OfferRegionMaintainTasksPlan;
import org.apache.iotdb.confignode.exception.StorageGroupNotExistsException;
import org.apache.iotdb.confignode.persistence.partition.RegionCreateTask;
import org.apache.iotdb.confignode.persistence.partition.RegionDeleteTask;
import org.apache.iotdb.confignode.procedure.StateMachineProcedure;
import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
import org.apache.iotdb.confignode.procedure.state.CreateRegionGroupsState;
import org.apache.iotdb.confignode.procedure.store.ProcedureFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
public class CreateRegionGroupsProcedure
extends StateMachineProcedure<ConfigNodeProcedureEnv, CreateRegionGroupsState> {
private static final Logger LOGGER = LoggerFactory.getLogger(CreateRegionGroupsProcedure.class);
private CreateRegionGroupsPlan createRegionGroupsPlan = new CreateRegionGroupsPlan();
/** key: TConsensusGroupId value: Failed RegionReplicas */
private Map<TConsensusGroupId, TRegionReplicaSet> failedRegionReplicaSets = new HashMap<>();
public CreateRegionGroupsProcedure() {
super();
}
public CreateRegionGroupsProcedure(CreateRegionGroupsPlan createRegionGroupsPlan) {
this.createRegionGroupsPlan = createRegionGroupsPlan;
}
public CreateRegionGroupsProcedure(
CreateRegionGroupsPlan createRegionGroupsPlan,
Map<TConsensusGroupId, TRegionReplicaSet> failedRegionReplicaSets) {
this.createRegionGroupsPlan = createRegionGroupsPlan;
this.failedRegionReplicaSets = failedRegionReplicaSets;
}
@Override
protected Flow executeFromState(ConfigNodeProcedureEnv env, CreateRegionGroupsState state) {
switch (state) {
case CREATE_REGION_GROUPS:
failedRegionReplicaSets = env.doRegionCreation(createRegionGroupsPlan);
setNextState(CreateRegionGroupsState.SHUNT_REGION_REPLICAS);
break;
case SHUNT_REGION_REPLICAS:
CreateRegionGroupsPlan persistPlan = new CreateRegionGroupsPlan();
OfferRegionMaintainTasksPlan offerPlan = new OfferRegionMaintainTasksPlan();
// Filter those RegionGroups that created successfully
createRegionGroupsPlan
.getRegionGroupMap()
.forEach(
(storageGroup, regionReplicaSets) ->
regionReplicaSets.forEach(
regionReplicaSet -> {
if (!failedRegionReplicaSets.containsKey(
regionReplicaSet.getRegionId())) {
// A RegionGroup was created successfully when
// all RegionReplicas were created successfully
persistPlan.addRegionGroup(storageGroup, regionReplicaSet);
} else {
TRegionReplicaSet failedRegionReplicas =
failedRegionReplicaSets.get(regionReplicaSet.getRegionId());
if (failedRegionReplicas.getDataNodeLocationsSize()
<= (regionReplicaSet.getDataNodeLocationsSize() - 1) / 2) {
// A RegionGroup can provide service as long as there are more than
// half of the RegionReplicas were created successfully
persistPlan.addRegionGroup(storageGroup, regionReplicaSet);
// Build recreate tasks
failedRegionReplicas
.getDataNodeLocations()
.forEach(
targetDataNode -> {
RegionCreateTask createTask =
new RegionCreateTask(
targetDataNode, storageGroup, regionReplicaSet);
if (TConsensusGroupType.DataRegion.equals(
regionReplicaSet.getRegionId().getType())) {
try {
createTask.setTTL(env.getTTL(storageGroup));
} catch (StorageGroupNotExistsException e) {
LOGGER.error("Can't get TTL", e);
}
}
offerPlan.appendRegionMaintainTask(createTask);
});
} else {
// The redundant RegionReplicas should be deleted otherwise
regionReplicaSet
.getDataNodeLocations()
.forEach(
targetDataNode -> {
if (!failedRegionReplicas
.getDataNodeLocations()
.contains(targetDataNode)) {
RegionDeleteTask deleteTask =
new RegionDeleteTask(
targetDataNode, regionReplicaSet.getRegionId());
offerPlan.appendRegionMaintainTask(deleteTask);
}
});
}
}
}));
env.persistAndBroadcastRegionGroup(persistPlan);
env.getConfigManager().getConsensusManager().write(offerPlan);
setNextState(CreateRegionGroupsState.CREATE_REGION_GROUPS_FINISH);
break;
case CREATE_REGION_GROUPS_FINISH:
return Flow.NO_MORE_STATE;
}
return Flow.HAS_MORE_STATE;
}
@Override
protected void rollbackState(
ConfigNodeProcedureEnv configNodeProcedureEnv,
CreateRegionGroupsState createRegionGroupsState) {
// Do nothing
}
@Override
protected CreateRegionGroupsState getState(int stateId) {
return CreateRegionGroupsState.values()[stateId];
}
@Override
protected int getStateId(CreateRegionGroupsState createRegionGroupsState) {
return createRegionGroupsState.ordinal();
}
@Override
protected CreateRegionGroupsState getInitialState() {
return CreateRegionGroupsState.CREATE_REGION_GROUPS;
}
@Override
public void serialize(DataOutputStream stream) throws IOException {
// must serialize CREATE_REGION_GROUPS.ordinal() firstly
stream.writeInt(ProcedureFactory.ProcedureType.CREATE_REGION_GROUPS.ordinal());
super.serialize(stream);
createRegionGroupsPlan.serializeForProcedure(stream);
stream.writeInt(failedRegionReplicaSets.size());
failedRegionReplicaSets.forEach(
(groupId, replica) -> {
ThriftCommonsSerDeUtils.serializeTConsensusGroupId(groupId, stream);
ThriftCommonsSerDeUtils.serializeTRegionReplicaSet(replica, stream);
});
}
@Override
public void deserialize(ByteBuffer byteBuffer) {
super.deserialize(byteBuffer);
try {
createRegionGroupsPlan.deserializeForProcedure(byteBuffer);
failedRegionReplicaSets.clear();
int failedRegionsSize = byteBuffer.getInt();
while (failedRegionsSize-- > 0) {
TConsensusGroupId groupId =
ThriftCommonsSerDeUtils.deserializeTConsensusGroupId(byteBuffer);
TRegionReplicaSet replica =
ThriftCommonsSerDeUtils.deserializeTRegionReplicaSet(byteBuffer);
failedRegionReplicaSets.put(groupId, replica);
}
} catch (Exception e) {
LOGGER.error("Deserialize meets error in CreateRegionGroupsProcedure", e);
throw new RuntimeException(e);
}
}
@Override
public boolean equals(Object that) {
if (that instanceof CreateRegionGroupsProcedure) {
CreateRegionGroupsProcedure thatProc = (CreateRegionGroupsProcedure) that;
return thatProc.getProcId() == this.getProcId()
&& thatProc.getState() == this.getState()
&& thatProc.createRegionGroupsPlan.equals(this.createRegionGroupsPlan)
&& thatProc.failedRegionReplicaSets.equals(this.failedRegionReplicaSets);
}
return false;
}
@Override
public int hashCode() {
int result = createRegionGroupsPlan.hashCode();
result = 31 * result + Objects.hash(failedRegionReplicaSets);
return result;
}
}