/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.confignode.manager.partition;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
import org.apache.iotdb.commons.cluster.RegionRoleType;
import org.apache.iotdb.commons.cluster.RegionStatus;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
import org.apache.iotdb.commons.partition.DataPartitionTable;
import org.apache.iotdb.commons.partition.SchemaPartitionTable;
import org.apache.iotdb.commons.partition.executor.SeriesPartitionExecutor;
import org.apache.iotdb.confignode.client.DataNodeRequestType;
import org.apache.iotdb.confignode.client.sync.datanode.SyncDataNodeClientPool;
import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.confignode.consensus.request.read.GetDataPartitionPlan;
import org.apache.iotdb.confignode.consensus.request.read.GetNodePathsPartitionPlan;
import org.apache.iotdb.confignode.consensus.request.read.GetOrCreateDataPartitionPlan;
import org.apache.iotdb.confignode.consensus.request.read.GetOrCreateSchemaPartitionPlan;
import org.apache.iotdb.confignode.consensus.request.read.GetRegionInfoListPlan;
import org.apache.iotdb.confignode.consensus.request.read.GetSchemaPartitionPlan;
import org.apache.iotdb.confignode.consensus.request.write.UpdateRegionLocationPlan;
import org.apache.iotdb.confignode.consensus.request.write.partition.CreateDataPartitionPlan;
import org.apache.iotdb.confignode.consensus.request.write.partition.CreateSchemaPartitionPlan;
import org.apache.iotdb.confignode.consensus.request.write.region.CreateRegionGroupsPlan;
import org.apache.iotdb.confignode.consensus.request.write.region.PollRegionMaintainTaskPlan;
import org.apache.iotdb.confignode.consensus.request.write.storagegroup.PreDeleteStorageGroupPlan;
import org.apache.iotdb.confignode.consensus.response.DataPartitionResp;
import org.apache.iotdb.confignode.consensus.response.RegionInfoListResp;
import org.apache.iotdb.confignode.consensus.response.SchemaNodeManagementResp;
import org.apache.iotdb.confignode.consensus.response.SchemaPartitionResp;
import org.apache.iotdb.confignode.exception.NotAvailableRegionGroupException;
import org.apache.iotdb.confignode.exception.NotEnoughDataNodeException;
import org.apache.iotdb.confignode.exception.StorageGroupNotExistsException;
import org.apache.iotdb.confignode.manager.ClusterSchemaManager;
import org.apache.iotdb.confignode.manager.ConsensusManager;
import org.apache.iotdb.confignode.manager.IManager;
import org.apache.iotdb.confignode.manager.ProcedureManager;
import org.apache.iotdb.confignode.manager.load.LoadManager;
import org.apache.iotdb.confignode.persistence.metric.PartitionInfoMetrics;
import org.apache.iotdb.confignode.persistence.partition.PartitionInfo;
import org.apache.iotdb.confignode.persistence.partition.RegionCreateTask;
import org.apache.iotdb.confignode.persistence.partition.RegionDeleteTask;
import org.apache.iotdb.confignode.persistence.partition.RegionMaintainTask;
import org.apache.iotdb.consensus.ConsensusFactory;
import org.apache.iotdb.consensus.common.DataSet;
import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
import org.apache.iotdb.db.service.metrics.MetricService;
import org.apache.iotdb.mpp.rpc.thrift.TCreateDataRegionReq;
import org.apache.iotdb.mpp.rpc.thrift.TCreateSchemaRegionReq;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.tsfile.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/** The PartitionManager manages cluster PartitionTable read and write requests. */
public class PartitionManager {
private static final Logger LOGGER = LoggerFactory.getLogger(PartitionManager.class);
private final IManager configManager;
private final PartitionInfo partitionInfo;
private SeriesPartitionExecutor executor;
// Region cleaner
// Monitor for leadership change
private final Object scheduleMonitor = new Object();
// Execute Region maintenance tasks every 10s
private static final int REGION_MAINTAINER_WORK_INTERVAL = 10;
private final ScheduledExecutorService regionMaintainer;
private Future<?> currentRegionMaintainerFuture;
// Map<RegionId, RegionGroupCache>
private final Map<TConsensusGroupId, RegionGroupCache> regionGroupCacheMap;
public PartitionManager(IManager configManager, PartitionInfo partitionInfo) {
this.configManager = configManager;
this.partitionInfo = partitionInfo;
this.regionMaintainer =
IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("IoTDB-Region-Maintainer");
this.regionGroupCacheMap = new ConcurrentHashMap<>();
setSeriesPartitionExecutor();
}
/** Construct the SeriesPartitionExecutor from the parameters in iotdb-confignode.properties */
private void setSeriesPartitionExecutor() {
ConfigNodeConfig conf = ConfigNodeDescriptor.getInstance().getConf();
this.executor =
SeriesPartitionExecutor.getSeriesPartitionExecutor(
conf.getSeriesPartitionExecutorClass(), conf.getSeriesPartitionSlotNum());
}
// ======================================================
// Consensus read/write interfaces
// ======================================================
/**
* Thread-safely get the SchemaPartition
*
* @param req SchemaPartitionPlan with partitionSlotsMap
* @return SchemaPartitionDataSet that contains only the existing SchemaPartitions
*/
public DataSet getSchemaPartition(GetSchemaPartitionPlan req) {
return getConsensusManager().read(req).getDataset();
}
/**
* Thread-safely get the DataPartition
*
* @param req DataPartitionPlan with Map<StorageGroupName, Map<SeriesPartitionSlot,
* List<TimePartitionSlot>>>
* @return DataPartitionDataSet that contains only the existing DataPartitions
*/
public DataSet getDataPartition(GetDataPartitionPlan req) {
return getConsensusManager().read(req).getDataset();
}
/**
* Get the SchemaPartition and create a new one if it does not exist
*
* @param req SchemaPartitionPlan with partitionSlotsMap
* @return SchemaPartitionResp with the SchemaPartition and TSStatus. SUCCESS_STATUS if the
* whole process finishes. NOT_ENOUGH_DATA_NODE if there are not enough DataNodes to create
* new Regions. STORAGE_GROUP_NOT_EXIST if some StorageGroups don't exist.
*/
public DataSet getOrCreateSchemaPartition(GetOrCreateSchemaPartitionPlan req) {
// After all the SchemaPartitions are allocated,
// all the read requests about SchemaPartitionTable are parallel.
SchemaPartitionResp resp = (SchemaPartitionResp) getSchemaPartition(req);
if (resp.isAllPartitionsExist()) {
return resp;
}
// We serialize the creation process of SchemaPartitions to
// ensure that each SchemaPartition is created by a unique CreateSchemaPartitionReq.
// Because the number of SchemaPartitions per storage group is limited
// by the number of SeriesPartitionSlots,
// the number of serialized CreateSchemaPartitionReqs is acceptable.
synchronized (this) {
// Filter unassigned SchemaPartitionSlots
Map<String, List<TSeriesPartitionSlot>> unassignedSchemaPartitionSlotsMap =
partitionInfo.filterUnassignedSchemaPartitionSlots(req.getPartitionSlotsMap());
// Here we ensure that each StorageGroup has at least one SchemaRegion.
// And if some StorageGroups own too many slots, extend SchemaRegion for them.
// Map<StorageGroup, unassigned SeriesPartitionSlot count>
Map<String, Integer> unassignedSchemaPartitionSlotsCountMap = new ConcurrentHashMap<>();
unassignedSchemaPartitionSlotsMap.forEach(
(storageGroup, unassignedSchemaPartitionSlots) ->
unassignedSchemaPartitionSlotsCountMap.put(
storageGroup, unassignedSchemaPartitionSlots.size()));
TSStatus status =
extendRegionsIfNecessary(
unassignedSchemaPartitionSlotsCountMap, TConsensusGroupType.SchemaRegion);
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
// Return an error code if Region extension failed
resp.setStatus(status);
return resp;
}
status = getConsensusManager().confirmLeader();
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
// Here we check the leadership second time
// since the RegionGroup creating process might take some time
resp.setStatus(status);
return resp;
} else {
// Allocate SchemaPartitions only if
// the current ConfigNode still holds its leadership
Map<String, SchemaPartitionTable> assignedSchemaPartition;
try {
assignedSchemaPartition =
getLoadManager().allocateSchemaPartition(unassignedSchemaPartitionSlotsMap);
} catch (NotAvailableRegionGroupException e) {
LOGGER.error(e.getMessage());
resp.setStatus(
new TSStatus(TSStatusCode.NOT_AVAILABLE_REGION_GROUP.getStatusCode())
.setMessage(e.getMessage()));
return resp;
}
// Cache allocating result
CreateSchemaPartitionPlan createPlan = new CreateSchemaPartitionPlan();
createPlan.setAssignedSchemaPartition(assignedSchemaPartition);
getConsensusManager().write(createPlan);
}
}
return getSchemaPartition(req);
}
/**
* Get the DataPartition and create a new one if it does not exist
*
* @param req DataPartitionPlan with Map<StorageGroupName, Map<SeriesPartitionSlot,
* List<TimePartitionSlot>>>
* @return DataPartitionResp with the DataPartition and TSStatus. SUCCESS_STATUS if the whole
* process finishes. NOT_ENOUGH_DATA_NODE if there are not enough DataNodes to create new
* Regions. STORAGE_GROUP_NOT_EXIST if some StorageGroups don't exist.
*/
public DataSet getOrCreateDataPartition(GetOrCreateDataPartitionPlan req) {
// After all the DataPartitions are allocated,
// all the read requests about DataPartitionTable are parallel.
DataPartitionResp resp = (DataPartitionResp) getDataPartition(req);
if (resp.isAllPartitionsExist()) {
return resp;
}
// We serialize the creation process of DataPartitions to
// ensure that each DataPartition is created by a unique CreateDataPartitionReq.
// Because the number of DataPartitions per storage group is limited
// by the number of SeriesPartitionSlots,
// the number of serialized CreateDataPartitionReqs is acceptable.
synchronized (this) {
// Filter unassigned DataPartitionSlots
Map<String, Map<TSeriesPartitionSlot, List<TTimePartitionSlot>>>
unassignedDataPartitionSlotsMap =
partitionInfo.filterUnassignedDataPartitionSlots(req.getPartitionSlotsMap());
// Here we ensure that each StorageGroup has at least one DataRegion.
// And if some StorageGroups own too many slots, extend DataRegion for them.
// Map<StorageGroup, unassigned SeriesPartitionSlot count>
Map<String, Integer> unassignedDataPartitionSlotsCountMap = new ConcurrentHashMap<>();
unassignedDataPartitionSlotsMap.forEach(
(storageGroup, unassignedDataPartitionSlots) ->
unassignedDataPartitionSlotsCountMap.put(
storageGroup, unassignedDataPartitionSlots.size()));
TSStatus status =
extendRegionsIfNecessary(
unassignedDataPartitionSlotsCountMap, TConsensusGroupType.DataRegion);
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
// Return an error code if Region extension failed
resp.setStatus(status);
return resp;
}
status = getConsensusManager().confirmLeader();
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
// Here we check the leadership second time
// since the RegionGroup creating process might take some time
resp.setStatus(status);
return resp;
} else {
// Allocate DataPartitions only if
// the current ConfigNode still holds its leadership
Map<String, DataPartitionTable> assignedDataPartition;
try {
assignedDataPartition =
getLoadManager().allocateDataPartition(unassignedDataPartitionSlotsMap);
} catch (NotAvailableRegionGroupException e) {
LOGGER.error(e.getMessage());
resp.setStatus(
new TSStatus(TSStatusCode.NOT_AVAILABLE_REGION_GROUP.getStatusCode())
.setMessage(e.getMessage()));
return resp;
}
// Cache allocating result
CreateDataPartitionPlan createPlan = new CreateDataPartitionPlan();
createPlan.setAssignedDataPartition(assignedDataPartition);
getConsensusManager().write(createPlan);
}
}
return getDataPartition(req);
}
// ======================================================
// Leader scheduling interfaces
// ======================================================
/**
* Allocate more Regions to StorageGroups that have too many slots.
*
* @param unassignedPartitionSlotsCountMap Map<StorageGroup, unassigned Partition count>
* @param consensusGroupType SchemaRegion or DataRegion
* @return SUCCESS_STATUS when the Region extension is successful; NOT_ENOUGH_DATA_NODE when
* there are not enough DataNodes; STORAGE_GROUP_NOT_EXIST when some StorageGroups don't exist
*/
private TSStatus extendRegionsIfNecessary(
Map<String, Integer> unassignedPartitionSlotsCountMap,
TConsensusGroupType consensusGroupType) {
TSStatus result = new TSStatus();
try {
// Map<StorageGroup, Region allotment>
Map<String, Integer> allotmentMap = new ConcurrentHashMap<>();
for (Map.Entry<String, Integer> entry : unassignedPartitionSlotsCountMap.entrySet()) {
final String storageGroup = entry.getKey();
final int unassignedPartitionSlotsCount = entry.getValue();
float allocatedRegionCount = partitionInfo.getRegionCount(storageGroup, consensusGroupType);
// The slotCount equals the sum of the assigned and unassigned slot counts
float slotCount =
(float) partitionInfo.getAssignedSeriesPartitionSlotsCount(storageGroup)
+ unassignedPartitionSlotsCount;
float maxRegionCount =
getClusterSchemaManager().getMaxRegionGroupCount(storageGroup, consensusGroupType);
float maxSlotCount =
ConfigNodeDescriptor.getInstance().getConf().getSeriesPartitionSlotNum();
/* Region extension is required in the following cases */
// 1. No Region has been created for the current StorageGroup yet
if (allocatedRegionCount == 0) {
// The delta is the smallest positive integer that satisfies the inequality:
// slotCount / delta <= maxSlotCount / maxRegionCount
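// Illustrative example (assumed values): with maxSlotCount = 10000, maxRegionCount = 5
// and slotCount = 4200, delta = min(5, max(1, ceil(4200 * 5 / 10000))) = min(5, 3) = 3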
int delta =
Math.min(
(int) maxRegionCount,
Math.max(1, (int) Math.ceil(slotCount * maxRegionCount / maxSlotCount)));
allotmentMap.put(storageGroup, delta);
continue;
}
// 2. The average number of partitions held by each Region will be greater than the
// expected average number after the partition allocation is completed
if (allocatedRegionCount < maxRegionCount
&& slotCount / allocatedRegionCount > maxSlotCount / maxRegionCount) {
// The delta is the smallest positive integer that satisfies the inequality:
// slotCount / (allocatedRegionCount + delta) <= maxSlotCount / maxRegionCount
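// Illustrative example (assumed values): with maxSlotCount = 10000, maxRegionCount = 5,
// slotCount = 4200 and allocatedRegionCount = 1, we have 4200 / 1 > 10000 / 5, so
// delta = min(5 - 1, max(1, ceil(4200 * 5 / 10000 - 1))) = min(4, 2) = 2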
int delta =
Math.min(
(int) (maxRegionCount - allocatedRegionCount),
Math.max(
1,
(int)
Math.ceil(
slotCount * maxRegionCount / maxSlotCount - allocatedRegionCount)));
allotmentMap.put(storageGroup, delta);
continue;
}
// 3. All RegionGroups of the specified StorageGroup are currently Disabled
if (allocatedRegionCount
== filterRegionGroupThroughStatus(storageGroup, RegionGroupStatus.Disabled).size()
&& allocatedRegionCount < maxRegionCount) {
allotmentMap.put(storageGroup, 1);
}
}
if (!allotmentMap.isEmpty()) {
CreateRegionGroupsPlan createRegionGroupsPlan =
getLoadManager().allocateRegionGroups(allotmentMap, consensusGroupType);
result = getProcedureManager().createRegionGroups(createRegionGroupsPlan);
} else {
result = RpcUtils.SUCCESS_STATUS;
}
} catch (NotEnoughDataNodeException e) {
String prompt = "ConfigNode failed to extend Region because there are not enough DataNodes";
LOGGER.error(prompt, e);
result.setCode(TSStatusCode.NOT_ENOUGH_DATA_NODE.getStatusCode());
result.setMessage(prompt);
} catch (StorageGroupNotExistsException e) {
String prompt = "ConfigNode failed to extend Region because some StorageGroup doesn't exist.";
LOGGER.error(prompt, e);
result.setCode(TSStatusCode.STORAGE_GROUP_NOT_EXIST.getStatusCode());
result.setMessage(prompt);
}
return result;
}
/**
* Only the leader uses this interface. Checks whether the specified DataPartition has a
* predecessor and returns it if it does.
*
* @param storageGroup StorageGroupName
* @param seriesPartitionSlot Corresponding SeriesPartitionSlot
* @param timePartitionSlot Corresponding TimePartitionSlot
* @param timePartitionInterval Time partition interval
* @return The specified DataPartition's predecessor if it exists, null otherwise
*/
public TConsensusGroupId getPrecededDataPartition(
String storageGroup,
TSeriesPartitionSlot seriesPartitionSlot,
TTimePartitionSlot timePartitionSlot,
long timePartitionInterval) {
return partitionInfo.getPrecededDataPartition(
storageGroup, seriesPartitionSlot, timePartitionSlot, timePartitionInterval);
}
/**
* Get the DataNodes that contain the specified StorageGroup's Schema or Data
*
* @param storageGroup The specific StorageGroup's name
* @param type SchemaRegion or DataRegion
* @return Set<TDataNodeLocation>, the related DataNodes
*/
public Set<TDataNodeLocation> getStorageGroupRelatedDataNodes(
String storageGroup, TConsensusGroupType type) {
return partitionInfo.getStorageGroupRelatedDataNodes(storageGroup, type);
}
/**
* Only the leader uses this interface
*
* @return Deep copy of all Regions' RegionReplicaSet
*/
public List<TRegionReplicaSet> getAllReplicaSets() {
return partitionInfo.getAllReplicaSets();
}
/**
* Only the leader uses this interface.
*
* @param storageGroup The specified StorageGroup
* @return All Regions' RegionReplicaSet of the specified StorageGroup
*/
public List<TRegionReplicaSet> getAllReplicaSets(String storageGroup) {
return partitionInfo.getAllReplicaSets(storageGroup);
}
/**
* Only the leader uses this interface. Get the number of Regions currently owned by the specific
* StorageGroup
*
* @param storageGroup StorageGroupName
* @param type SchemaRegion or DataRegion
* @return Number of Regions currently owned by the specific StorageGroup
* @throws StorageGroupNotExistsException When the specific StorageGroup doesn't exist
*/
public int getRegionCount(String storageGroup, TConsensusGroupType type)
throws StorageGroupNotExistsException {
return partitionInfo.getRegionCount(storageGroup, type);
}
/**
* Only the leader uses this interface.
*
* @param storageGroup StorageGroupName
* @param type SchemaRegion or DataRegion
* @return The specified StorageGroup's RegionGroups, sorted by the number of allocated slots
* @throws NotAvailableRegionGroupException When all RegionGroups within the specified
* StorageGroup are unavailable currently
*/
public List<Pair<Long, TConsensusGroupId>> getSortedRegionGroupSlotsCounter(
String storageGroup, TConsensusGroupType type) throws NotAvailableRegionGroupException {
// Collect static data
List<Pair<Long, TConsensusGroupId>> regionGroupSlotsCounter =
partitionInfo.getRegionGroupSlotsCounter(storageGroup, type);
// Filter RegionGroups that have Disabled status
List<Pair<Long, TConsensusGroupId>> result = new ArrayList<>();
for (Pair<Long, TConsensusGroupId> slotsCounter : regionGroupSlotsCounter) {
// Use Running or Available RegionGroups
RegionGroupStatus status = getRegionGroupStatus(slotsCounter.getRight());
if (RegionGroupStatus.Running.equals(status) || RegionGroupStatus.Available.equals(status)) {
result.add(slotsCounter);
}
}
if (result.isEmpty()) {
throw new NotAvailableRegionGroupException();
}
result.sort(Comparator.comparingLong(Pair::getLeft));
return result;
}
/**
* Only the leader uses this interface
*
* @return the next RegionGroupId
*/
public int generateNextRegionGroupId() {
return partitionInfo.generateNextRegionGroupId();
}
/**
* GetNodePathsPartition
*
* @param physicalPlan GetNodesPathsPartitionReq
* @return SchemaNodeManagementPartitionDataSet that contains only the existing matched
* SchemaPartitions and the matched child paths above the MTree
*/
public DataSet getNodePathsPartition(GetNodePathsPartitionPlan physicalPlan) {
ConsensusReadResponse consensusReadResponse = getConsensusManager().read(physicalPlan);
return (SchemaNodeManagementResp) consensusReadResponse.getDataset();
}
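/**
* Mark the specified StorageGroup as pre-deleted by writing a PreDeleteStorageGroupPlan
* through consensus
*
* @param storageGroup The specified StorageGroup
* @param preDeleteType The specified PreDeleteType
*/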
public void preDeleteStorageGroup(
String storageGroup, PreDeleteStorageGroupPlan.PreDeleteType preDeleteType) {
final PreDeleteStorageGroupPlan preDeleteStorageGroupPlan =
new PreDeleteStorageGroupPlan(storageGroup, preDeleteType);
getConsensusManager().write(preDeleteStorageGroupPlan);
}
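/** Register the PartitionInfo related metrics to the MetricService */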
public void addMetrics() {
MetricService.getInstance().addMetricSet(new PartitionInfoMetrics(partitionInfo));
}
/**
* Get the TSeriesPartitionSlot that the specified device belongs to
*
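* <p>A minimal usage sketch (the device path below is illustrative):
*
* <pre>{@code
* TSeriesPartitionSlot slot = partitionManager.getSeriesPartitionSlot("root.sg1.d1");
* }</pre>
*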
* @param devicePath Full path ending with device name
* @return SeriesPartitionSlot
*/
public TSeriesPartitionSlot getSeriesPartitionSlot(String devicePath) {
return executor.getSeriesPartitionSlot(devicePath);
}
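/**
* Get the RegionInfoList described by the specified GetRegionInfoListPlan, with each
* RegionInfo's status and role filled in from the heartbeat caches
*
* @param req GetRegionInfoListPlan
* @return RegionInfoListResp
*/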
public DataSet getRegionInfoList(GetRegionInfoListPlan req) {
// Get static result
RegionInfoListResp regionInfoListResp =
(RegionInfoListResp) getConsensusManager().read(req).getDataset();
Map<TConsensusGroupId, Integer> allLeadership = getAllLeadership();
// Get cached result
regionInfoListResp
.getRegionInfoList()
.forEach(
regionInfo -> {
regionInfo.setStatus(
getRegionStatus(regionInfo.getConsensusGroupId(), regionInfo.getDataNodeId())
.getStatus());
String regionType =
regionInfo.getDataNodeId()
== allLeadership.getOrDefault(regionInfo.getConsensusGroupId(), -1)
? RegionRoleType.Leader.toString()
: RegionRoleType.Follower.toString();
regionInfo.setRoleType(regionType);
});
return regionInfoListResp;
}
/**
* Update the location of the specified Region
*
* @param req UpdateRegionLocationReq
* @return TSStatus
*/
public TSStatus updateRegionLocation(UpdateRegionLocationPlan req) {
// Remove heartbeat cache if exists
if (regionGroupCacheMap.containsKey(req.getRegionId())) {
regionGroupCacheMap
.get(req.getRegionId())
.removeCacheIfExists(req.getOldNode().getDataNodeId());
}
return getConsensusManager().write(req).getStatus();
}
/**
* Get the StorageGroup that the specified Region belongs to
*
* @param regionId The specified RegionId
* @return StorageGroup name
*/
public String getRegionStorageGroup(TConsensusGroupId regionId) {
return partitionInfo.getRegionStorageGroup(regionId);
}
/**
* Called by {@link PartitionManager#regionMaintainer}
*
* <p>Periodically maintain the RegionReplicas to be created or deleted
*/
public void maintainRegionReplicas() {
// The ConsensusManager of the ConfigManager may not be fully initialized at this time
Optional.ofNullable(getConsensusManager())
.ifPresent(
consensusManager -> {
if (consensusManager.isLeader()) {
List<RegionMaintainTask> regionMaintainTaskList =
partitionInfo.getRegionMaintainEntryList();
if (!regionMaintainTaskList.isEmpty()) {
for (RegionMaintainTask entry : regionMaintainTaskList) {
TSStatus status;
switch (entry.getType()) {
case CREATE:
RegionCreateTask createEntry = (RegionCreateTask) entry;
LOGGER.info(
"Start to create Region: {} on DataNode: {}",
createEntry.getRegionReplicaSet().getRegionId(),
createEntry.getTargetDataNode());
switch (createEntry.getRegionReplicaSet().getRegionId().getType()) {
case SchemaRegion:
// Create SchemaRegion
status =
SyncDataNodeClientPool.getInstance()
.sendSyncRequestToDataNodeWithRetry(
createEntry.getTargetDataNode().getInternalEndPoint(),
new TCreateSchemaRegionReq(
createEntry.getRegionReplicaSet(),
createEntry.getStorageGroup()),
DataNodeRequestType.CREATE_SCHEMA_REGION);
break;
case DataRegion:
default:
// Create DataRegion
status =
SyncDataNodeClientPool.getInstance()
.sendSyncRequestToDataNodeWithRetry(
createEntry.getTargetDataNode().getInternalEndPoint(),
new TCreateDataRegionReq(
createEntry.getRegionReplicaSet(),
createEntry.getStorageGroup())
.setTtl(createEntry.getTTL()),
DataNodeRequestType.CREATE_DATA_REGION);
}
break;
case DELETE:
default:
// Delete Region
RegionDeleteTask deleteEntry = (RegionDeleteTask) entry;
LOGGER.info(
"Start to delete Region: {} on DataNode: {}",
deleteEntry.getRegionId(),
deleteEntry.getTargetDataNode());
status =
SyncDataNodeClientPool.getInstance()
.sendSyncRequestToDataNodeWithRetry(
deleteEntry.getTargetDataNode().getInternalEndPoint(),
deleteEntry.getRegionId(),
DataNodeRequestType.DELETE_REGION);
}
if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
// Poll the head entry on success
getConsensusManager().write(new PollRegionMaintainTaskPlan());
} else {
// Here we just break and wait until the next scheduled task,
// because all RegionMaintainTasks should be executed in the
// order in which they were offered
break;
}
}
}
}
});
}
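/** Start the RegionCleaner service, which periodically invokes {@link #maintainRegionReplicas()} */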
public void startRegionCleaner() {
synchronized (scheduleMonitor) {
if (currentRegionMaintainerFuture == null) {
/* Start the RegionCleaner service */
currentRegionMaintainerFuture =
ScheduledExecutorUtil.safelyScheduleAtFixedRate(
regionMaintainer,
this::maintainRegionReplicas,
0,
REGION_MAINTAINER_WORK_INTERVAL,
TimeUnit.SECONDS);
LOGGER.info("RegionCleaner is started successfully.");
}
}
}
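/** Stop the RegionCleaner service and clear all RegionGroups' heartbeat caches */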
public void stopRegionCleaner() {
synchronized (scheduleMonitor) {
if (currentRegionMaintainerFuture != null) {
/* Stop the RegionCleaner service */
currentRegionMaintainerFuture.cancel(false);
currentRegionMaintainerFuture = null;
regionGroupCacheMap.clear();
LOGGER.info("RegionCleaner is stopped successfully.");
}
}
}
public Map<TConsensusGroupId, RegionGroupCache> getRegionGroupCacheMap() {
return regionGroupCacheMap;
}
public void removeRegionGroupCache(TConsensusGroupId consensusGroupId) {
regionGroupCacheMap.remove(consensusGroupId);
}
/**
* Get the leadership of each RegionGroup.
*
* @return Map<RegionGroupId, DataNodeId where the leader is located>
* <p>Some RegionGroups that are supposed to appear in the result map might be missing, and
* some leaderIds might be -1 (leader unknown yet) due to heartbeat latency
*/
public Map<TConsensusGroupId, Integer> getAllLeadership() {
// TODO: Will be optimized by IOTDB-4341
Map<TConsensusGroupId, Integer> result = new ConcurrentHashMap<>();
if (ConfigNodeDescriptor.getInstance()
.getConf()
.getDataRegionConsensusProtocolClass()
.equals(ConsensusFactory.MultiLeaderConsensus)) {
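// Under the MultiLeader consensus protocol, only SchemaRegion leadership is read from
// the heartbeat caches; DataRegion leadership is assigned by the RouteBalancer below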
regionGroupCacheMap.forEach(
(consensusGroupId, regionGroupCache) -> {
if (consensusGroupId.getType().equals(TConsensusGroupType.SchemaRegion)) {
int leaderDataNodeId = regionGroupCache.getLeaderDataNodeId();
if (configManager.getNodeManager().isNodeRemoving(leaderDataNodeId)) {
result.put(consensusGroupId, -1);
} else {
result.put(consensusGroupId, leaderDataNodeId);
}
}
});
getLoadManager()
.getRouteBalancer()
.getRouteMap()
.forEach(
(consensusGroupId, regionReplicaSet) ->
result.put(
consensusGroupId,
regionReplicaSet.getDataNodeLocations().get(0).getDataNodeId()));
} else {
regionGroupCacheMap.forEach(
(consensusGroupId, regionGroupCache) -> {
int leaderDataNodeId = regionGroupCache.getLeaderDataNodeId();
if (configManager.getNodeManager().isNodeRemoving(leaderDataNodeId)) {
result.put(consensusGroupId, -1);
} else {
result.put(consensusGroupId, leaderDataNodeId);
}
});
}
return result;
}
/**
* Filter the RegionGroups within the specified StorageGroup by RegionGroupStatus
*
* @param storageGroup The specified StorageGroup
* @param status The specified RegionGroupStatus
* @return The RegionGroups that match any of the specified RegionGroupStatus
*/
public List<TRegionReplicaSet> filterRegionGroupThroughStatus(
String storageGroup, RegionGroupStatus... status) {
return getAllReplicaSets(storageGroup).stream()
.filter(
regionReplicaSet -> {
TConsensusGroupId regionGroupId = regionReplicaSet.getRegionId();
return regionGroupCacheMap.containsKey(regionGroupId)
&& Arrays.stream(status)
.anyMatch(
s ->
s.equals(
regionGroupCacheMap.get(regionGroupId).getRegionGroupStatus()));
})
.collect(Collectors.toList());
}
/**
* Safely get RegionStatus
*
* @param consensusGroupId Specified RegionGroupId
* @param dataNodeId The DataNodeId of the specified Region replica
* @return Corresponding RegionStatus if cache exists, Unknown otherwise
*/
public RegionStatus getRegionStatus(TConsensusGroupId consensusGroupId, int dataNodeId) {
return regionGroupCacheMap.containsKey(consensusGroupId)
? regionGroupCacheMap.get(consensusGroupId).getRegionStatus(dataNodeId)
: RegionStatus.Unknown;
}
/**
* Safely get RegionGroupStatus
*
* @param consensusGroupId Specified RegionGroupId
* @return Corresponding RegionGroupStatus if cache exists, Disabled otherwise
*/
public RegionGroupStatus getRegionGroupStatus(TConsensusGroupId consensusGroupId) {
return regionGroupCacheMap.containsKey(consensusGroupId)
? regionGroupCacheMap.get(consensusGroupId).getRegionGroupStatus()
: RegionGroupStatus.Disabled;
}
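/**
* Cache the latest heartbeat sample of the specified RegionGroup and update its statistics
*
* @param regionGroupId The specified RegionGroupId
* @param regionHeartbeatSample The latest RegionHeartbeatSample of the RegionGroup
*/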
public void cacheHeartbeatSample(
TConsensusGroupId regionGroupId, RegionHeartbeatSample regionHeartbeatSample) {
RegionGroupCache regionGroupCache =
regionGroupCacheMap.computeIfAbsent(regionGroupId, empty -> new RegionGroupCache(regionGroupId));
regionGroupCache.cacheHeartbeatSample(regionHeartbeatSample);
regionGroupCache.updateRegionStatistics();
}
public ScheduledExecutorService getRegionMaintainer() {
return regionMaintainer;
}
private ConsensusManager getConsensusManager() {
return configManager.getConsensusManager();
}
private ClusterSchemaManager getClusterSchemaManager() {
return configManager.getClusterSchemaManager();
}
private LoadManager getLoadManager() {
return configManager.getLoadManager();
}
private ProcedureManager getProcedureManager() {
return configManager.getProcedureManager();
}
}