| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.iotdb.confignode.manager.partition; |
| |
| import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; |
| import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType; |
| import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; |
| import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; |
| import org.apache.iotdb.common.rpc.thrift.TSStatus; |
| import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot; |
| import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot; |
| import org.apache.iotdb.commons.cluster.RegionRoleType; |
| import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory; |
| import org.apache.iotdb.commons.concurrent.ThreadName; |
| import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil; |
| import org.apache.iotdb.commons.partition.DataPartitionTable; |
| import org.apache.iotdb.commons.partition.SchemaPartitionTable; |
| import org.apache.iotdb.commons.partition.executor.SeriesPartitionExecutor; |
| import org.apache.iotdb.commons.path.PartialPath; |
| import org.apache.iotdb.confignode.client.DataNodeRequestType; |
| import org.apache.iotdb.confignode.client.async.AsyncDataNodeClientPool; |
| import org.apache.iotdb.confignode.client.async.handlers.AsyncClientHandler; |
| import org.apache.iotdb.confignode.conf.ConfigNodeConfig; |
| import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor; |
| import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan; |
| import org.apache.iotdb.confignode.consensus.request.read.partition.CountTimeSlotListPlan; |
| import org.apache.iotdb.confignode.consensus.request.read.partition.GetDataPartitionPlan; |
| import org.apache.iotdb.confignode.consensus.request.read.partition.GetNodePathsPartitionPlan; |
| import org.apache.iotdb.confignode.consensus.request.read.partition.GetOrCreateDataPartitionPlan; |
| import org.apache.iotdb.confignode.consensus.request.read.partition.GetOrCreateSchemaPartitionPlan; |
| import org.apache.iotdb.confignode.consensus.request.read.partition.GetSchemaPartitionPlan; |
| import org.apache.iotdb.confignode.consensus.request.read.partition.GetSeriesSlotListPlan; |
| import org.apache.iotdb.confignode.consensus.request.read.partition.GetTimeSlotListPlan; |
| import org.apache.iotdb.confignode.consensus.request.read.region.GetRegionIdPlan; |
| import org.apache.iotdb.confignode.consensus.request.read.region.GetRegionInfoListPlan; |
| import org.apache.iotdb.confignode.consensus.request.write.database.PreDeleteDatabasePlan; |
| import org.apache.iotdb.confignode.consensus.request.write.partition.AddRegionLocationPlan; |
| import org.apache.iotdb.confignode.consensus.request.write.partition.CreateDataPartitionPlan; |
| import org.apache.iotdb.confignode.consensus.request.write.partition.CreateSchemaPartitionPlan; |
| import org.apache.iotdb.confignode.consensus.request.write.partition.RemoveRegionLocationPlan; |
| import org.apache.iotdb.confignode.consensus.request.write.region.CreateRegionGroupsPlan; |
| import org.apache.iotdb.confignode.consensus.request.write.region.PollSpecificRegionMaintainTaskPlan; |
| import org.apache.iotdb.confignode.consensus.response.partition.CountTimeSlotListResp; |
| import org.apache.iotdb.confignode.consensus.response.partition.DataPartitionResp; |
| import org.apache.iotdb.confignode.consensus.response.partition.GetRegionIdResp; |
| import org.apache.iotdb.confignode.consensus.response.partition.GetSeriesSlotListResp; |
| import org.apache.iotdb.confignode.consensus.response.partition.GetTimeSlotListResp; |
| import org.apache.iotdb.confignode.consensus.response.partition.RegionInfoListResp; |
| import org.apache.iotdb.confignode.consensus.response.partition.SchemaNodeManagementResp; |
| import org.apache.iotdb.confignode.consensus.response.partition.SchemaPartitionResp; |
| import org.apache.iotdb.confignode.exception.DatabaseNotExistsException; |
| import org.apache.iotdb.confignode.exception.NoAvailableRegionGroupException; |
| import org.apache.iotdb.confignode.exception.NotEnoughDataNodeException; |
| import org.apache.iotdb.confignode.manager.IManager; |
| import org.apache.iotdb.confignode.manager.ProcedureManager; |
| import org.apache.iotdb.confignode.manager.consensus.ConsensusManager; |
| import org.apache.iotdb.confignode.manager.load.LoadManager; |
| import org.apache.iotdb.confignode.manager.node.NodeManager; |
| import org.apache.iotdb.confignode.manager.schema.ClusterSchemaManager; |
| import org.apache.iotdb.confignode.persistence.partition.PartitionInfo; |
| import org.apache.iotdb.confignode.persistence.partition.maintainer.RegionCreateTask; |
| import org.apache.iotdb.confignode.persistence.partition.maintainer.RegionDeleteTask; |
| import org.apache.iotdb.confignode.persistence.partition.maintainer.RegionMaintainTask; |
| import org.apache.iotdb.confignode.persistence.partition.maintainer.RegionMaintainType; |
| import org.apache.iotdb.confignode.rpc.thrift.TCountTimeSlotListReq; |
| import org.apache.iotdb.confignode.rpc.thrift.TGetRegionIdReq; |
| import org.apache.iotdb.confignode.rpc.thrift.TGetSeriesSlotListReq; |
| import org.apache.iotdb.confignode.rpc.thrift.TGetTimeSlotListReq; |
| import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList; |
| import org.apache.iotdb.consensus.exception.ConsensusException; |
| import org.apache.iotdb.mpp.rpc.thrift.TCreateDataRegionReq; |
| import org.apache.iotdb.mpp.rpc.thrift.TCreateSchemaRegionReq; |
| import org.apache.iotdb.rpc.RpcUtils; |
| import org.apache.iotdb.rpc.TSStatusCode; |
| |
| import org.apache.tsfile.utils.Pair; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.LinkedList; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Objects; |
| import java.util.Optional; |
| import java.util.Queue; |
| import java.util.Set; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.Future; |
| import java.util.concurrent.ScheduledExecutorService; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import java.util.stream.Collectors; |
| |
| /** The {@link PartitionManager} manages cluster PartitionTable read and write requests. */ |
| public class PartitionManager { |
| |
| private static final Logger LOGGER = LoggerFactory.getLogger(PartitionManager.class); |
| |
| private static final ConfigNodeConfig CONF = ConfigNodeDescriptor.getInstance().getConf(); |
| private static final RegionGroupExtensionPolicy SCHEMA_REGION_GROUP_EXTENSION_POLICY = |
| CONF.getSchemaRegionGroupExtensionPolicy(); |
| private static final RegionGroupExtensionPolicy DATA_REGION_GROUP_EXTENSION_POLICY = |
| CONF.getDataRegionGroupExtensionPolicy(); |
| |
| private final IManager configManager; |
| private final PartitionInfo partitionInfo; |
| |
| private SeriesPartitionExecutor executor; |
| |
| private static final String CONSENSUS_READ_ERROR = |
| "Failed in the read API executing the consensus layer due to: "; |
| |
| private static final String CONSENSUS_WRITE_ERROR = |
| "Failed in the write API executing the consensus layer due to: "; |
| |
| /** Region cleaner. */ |
| // Monitor for leadership change |
| private final Object scheduleMonitor = new Object(); |
| |
| // Try to maintain (create/delete) Regions every 10s |
| private static final int REGION_MAINTAINER_WORK_INTERVAL = 10; |
| private final ScheduledExecutorService regionMaintainer; |
| private Future<?> currentRegionMaintainerFuture; |
| |
| public PartitionManager(IManager configManager, PartitionInfo partitionInfo) { |
| this.configManager = configManager; |
| this.partitionInfo = partitionInfo; |
| this.regionMaintainer = |
| IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor( |
| ThreadName.CONFIG_NODE_REGION_MAINTAINER.getName()); |
| setSeriesPartitionExecutor(); |
| } |
| |
| /** Construct the SeriesPartitionExecutor configured in iotdb-confignode{@literal .}properties. */ |
| private void setSeriesPartitionExecutor() { |
| this.executor = |
| SeriesPartitionExecutor.getSeriesPartitionExecutor( |
| CONF.getSeriesPartitionExecutorClass(), CONF.getSeriesSlotNum()); |
| } |
| |
| // ====================================================== |
| // Consensus read/write interfaces |
| // ====================================================== |
| |
| /** |
| * Thread-safely get SchemaPartition. |
| * |
| * @param req SchemaPartitionPlan with partitionSlotsMap |
| * @return SchemaPartitionResp that contains only the existing SchemaPartitions |
| */ |
| public SchemaPartitionResp getSchemaPartition(GetSchemaPartitionPlan req) { |
| try { |
| return (SchemaPartitionResp) getConsensusManager().read(req); |
| } catch (ConsensusException e) { |
| LOGGER.warn(CONSENSUS_READ_ERROR, e); |
| TSStatus res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()); |
| res.setMessage(e.getMessage()); |
| return new SchemaPartitionResp(res, false, Collections.emptyMap()); |
| } |
| } |
| |
| /** |
| * Thread-safely get DataPartition. |
| * |
| * @param req DataPartitionPlan with Map<StorageGroupName, Map<SeriesPartitionSlot, |
| * TTimeSlotList>> |
| * @return DataPartitionResp that contains only the existing DataPartitions |
| */ |
| public DataPartitionResp getDataPartition(GetDataPartitionPlan req) { |
| try { |
| return (DataPartitionResp) getConsensusManager().read(req); |
| } catch (ConsensusException e) { |
| LOGGER.warn(CONSENSUS_READ_ERROR, e); |
| TSStatus res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()); |
| res.setMessage(e.getMessage()); |
| return new DataPartitionResp(res, false, Collections.emptyMap()); |
| } |
| } |
| |
| /** |
| * Get SchemaPartition and create a new one if it does not exist. |
| * |
| * @param req SchemaPartitionPlan with partitionSlotsMap |
| * @return SchemaPartitionResp with SchemaPartition and TSStatus. SUCCESS_STATUS if all |
| * processes finish. NO_ENOUGH_DATANODE if there are not enough DataNodes to create new |
| * Regions. DATABASE_NOT_EXIST if some Databases don't exist. |
| */ |
| public SchemaPartitionResp getOrCreateSchemaPartition(GetOrCreateSchemaPartitionPlan req) { |
| // Check if the related Databases exist |
| for (String database : req.getPartitionSlotsMap().keySet()) { |
| if (!isDatabaseExist(database)) { |
| return new SchemaPartitionResp( |
| new TSStatus(TSStatusCode.DATABASE_NOT_EXIST.getStatusCode()) |
| .setMessage( |
| String.format( |
| "Create SchemaPartition failed because the database: %s is not exists", |
| database)), |
| false, |
| null); |
| } |
| } |
| |
| // After all the SchemaPartitions are allocated, |
| // all the read requests about SchemaPartitionTable are parallel. |
| SchemaPartitionResp resp = getSchemaPartition(req); |
| if (resp.isAllPartitionsExist()) { |
| return resp; |
| } |
| |
| // We serialize the creation process of SchemaPartitions to |
| // ensure that each SchemaPartition is created by a unique CreateSchemaPartitionReq. |
| // Because the number of SchemaPartitions per database is limited |
| // by the number of SeriesPartitionSlots, |
| // the number of serialized CreateSchemaPartitionReqs is acceptable. |
| synchronized (this) { |
| // Check again under the lock in case other threads |
| // have already created the SchemaPartition |
| resp = getSchemaPartition(req); |
| if (resp.isAllPartitionsExist()) { |
| return resp; |
| } |
| |
| // Filter unassigned SchemaPartitionSlots |
| Map<String, List<TSeriesPartitionSlot>> unassignedSchemaPartitionSlotsMap = |
| partitionInfo.filterUnassignedSchemaPartitionSlots(req.getPartitionSlotsMap()); |
| |
| // Here we ensure that each StorageGroup has at least one SchemaRegion, |
| // and extend SchemaRegions for StorageGroups that own too many slots. |
| |
| // Map<StorageGroup, unassigned SeriesPartitionSlot count> |
| Map<String, Integer> unassignedSchemaPartitionSlotsCountMap = new ConcurrentHashMap<>(); |
| unassignedSchemaPartitionSlotsMap.forEach( |
| (storageGroup, unassignedSchemaPartitionSlots) -> |
| unassignedSchemaPartitionSlotsCountMap.put( |
| storageGroup, unassignedSchemaPartitionSlots.size())); |
| TSStatus status = |
| extendRegionGroupIfNecessary( |
| unassignedSchemaPartitionSlotsCountMap, TConsensusGroupType.SchemaRegion); |
| if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { |
| // Return an error code if Region extension failed |
| resp.setStatus(status); |
| return resp; |
| } |
| |
| Map<String, SchemaPartitionTable> assignedSchemaPartition; |
| try { |
| assignedSchemaPartition = |
| getLoadManager().allocateSchemaPartition(unassignedSchemaPartitionSlotsMap); |
| } catch (NoAvailableRegionGroupException e) { |
| status = getConsensusManager().confirmLeader(); |
| if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { |
| // The allocation might fail due to leadership change |
| resp.setStatus(status); |
| return resp; |
| } |
| |
| LOGGER.error("Create SchemaPartition failed because: ", e); |
| resp.setStatus( |
| new TSStatus(TSStatusCode.NO_AVAILABLE_REGION_GROUP.getStatusCode()) |
| .setMessage(e.getMessage())); |
| return resp; |
| } |
| |
| // Cache allocating result only if the current ConfigNode still holds its leadership |
| CreateSchemaPartitionPlan createPlan = new CreateSchemaPartitionPlan(); |
| createPlan.setAssignedSchemaPartition(assignedSchemaPartition); |
| |
| status = consensusWritePartitionResult(createPlan); |
| if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { |
| // The allocation might fail due to consensus error |
| resp.setStatus(status); |
| return resp; |
| } |
| } |
| |
| resp = getSchemaPartition(req); |
| if (!resp.isAllPartitionsExist()) { |
| // Count the fail rate |
| AtomicInteger totalSlotNum = new AtomicInteger(); |
| req.getPartitionSlotsMap() |
| .forEach((database, partitionSlots) -> totalSlotNum.addAndGet(partitionSlots.size())); |
| |
| AtomicInteger unassignedSlotNum = new AtomicInteger(); |
| Map<String, List<TSeriesPartitionSlot>> unassignedSchemaPartitionSlotsMap = |
| partitionInfo.filterUnassignedSchemaPartitionSlots(req.getPartitionSlotsMap()); |
| unassignedSchemaPartitionSlotsMap.forEach( |
| (database, unassignedSchemaPartitionSlots) -> |
| unassignedSlotNum.addAndGet(unassignedSchemaPartitionSlots.size())); |
| |
| String errMsg = |
| String.format( |
| "Lacked %d/%d SchemaPartition allocation result in the response of getOrCreateSchemaPartition method", |
| unassignedSlotNum.get(), totalSlotNum.get()); |
| LOGGER.error(errMsg); |
| resp.setStatus( |
| new TSStatus(TSStatusCode.LACK_PARTITION_ALLOCATION.getStatusCode()).setMessage(errMsg)); |
| return resp; |
| } |
| return resp; |
| } |
| |
| /** |
| * Get DataPartition and create a new one if it does not exist. |
| * |
| * @param req DataPartitionPlan with Map<StorageGroupName, Map<SeriesPartitionSlot, |
| * List<TimePartitionSlot>>> |
| * @return DataPartitionResp with DataPartition and TSStatus. SUCCESS_STATUS if all |
| * processes finish. NO_ENOUGH_DATANODE if there are not enough DataNodes to create new |
| * Regions. DATABASE_NOT_EXIST if some Databases don't exist. |
| */ |
| public DataPartitionResp getOrCreateDataPartition(GetOrCreateDataPartitionPlan req) { |
| // Check if the related Databases exist |
| for (String database : req.getPartitionSlotsMap().keySet()) { |
| if (!isDatabaseExist(database)) { |
| return new DataPartitionResp( |
| new TSStatus(TSStatusCode.DATABASE_NOT_EXIST.getStatusCode()) |
| .setMessage( |
| String.format( |
| "Create DataPartition failed because the database: %s is not exists", |
| database)), |
| false, |
| null); |
| } |
| } |
| |
| // After all the DataPartitions are allocated, |
| // all the read requests about DataPartitionTable are parallel. |
| DataPartitionResp resp = getDataPartition(req); |
| if (resp.isAllPartitionsExist()) { |
| return resp; |
| } |
| |
| // We serialize the creation process of DataPartitions to |
| // ensure that each DataPartition is created by a unique CreateDataPartitionReq. |
| // Because the number of DataPartitions per database is limited |
| // by the number of SeriesPartitionSlots, |
| // the number of serialized CreateDataPartitionReqs is acceptable. |
| synchronized (this) { |
| // Check again under the lock in case other threads |
| // have already created the DataPartition |
| resp = getDataPartition(req); |
| if (resp.isAllPartitionsExist()) { |
| return resp; |
| } |
| |
| // Filter unassigned DataPartitionSlots |
| Map<String, Map<TSeriesPartitionSlot, TTimeSlotList>> unassignedDataPartitionSlotsMap = |
| partitionInfo.filterUnassignedDataPartitionSlots(req.getPartitionSlotsMap()); |
| |
| // Here we ensure that each StorageGroup has at least one DataRegion, |
| // and extend DataRegions for StorageGroups that own too many slots. |
| |
| // Map<StorageGroup, unassigned SeriesPartitionSlot count> |
| Map<String, Integer> unassignedDataPartitionSlotsCountMap = new ConcurrentHashMap<>(); |
| unassignedDataPartitionSlotsMap.forEach( |
| (storageGroup, unassignedDataPartitionSlots) -> |
| unassignedDataPartitionSlotsCountMap.put( |
| storageGroup, unassignedDataPartitionSlots.size())); |
| TSStatus status = |
| extendRegionGroupIfNecessary( |
| unassignedDataPartitionSlotsCountMap, TConsensusGroupType.DataRegion); |
| if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { |
| // Return an error code if Region extension failed |
| resp.setStatus(status); |
| return resp; |
| } |
| |
| Map<String, DataPartitionTable> assignedDataPartition; |
| try { |
| assignedDataPartition = |
| getLoadManager().allocateDataPartition(unassignedDataPartitionSlotsMap); |
| } catch (DatabaseNotExistsException | NoAvailableRegionGroupException e) { |
| status = getConsensusManager().confirmLeader(); |
| if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { |
| // The allocation might fail due to leadership change |
| resp.setStatus(status); |
| return resp; |
| } |
| |
| LOGGER.error("Create DataPartition failed because: ", e); |
| if (e instanceof DatabaseNotExistsException) { |
| resp.setStatus( |
| new TSStatus(TSStatusCode.DATABASE_NOT_EXIST.getStatusCode()) |
| .setMessage(e.getMessage())); |
| } else { |
| resp.setStatus( |
| new TSStatus(TSStatusCode.NO_AVAILABLE_REGION_GROUP.getStatusCode()) |
| .setMessage(e.getMessage())); |
| } |
| return resp; |
| } |
| |
| // Cache allocating result only if the current ConfigNode still holds its leadership |
| CreateDataPartitionPlan createPlan = new CreateDataPartitionPlan(); |
| createPlan.setAssignedDataPartition(assignedDataPartition); |
| |
| status = consensusWritePartitionResult(createPlan); |
| if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { |
| // The allocation might fail due to consensus error |
| resp.setStatus(status); |
| return resp; |
| } |
| } |
| |
| resp = getDataPartition(req); |
| if (!resp.isAllPartitionsExist()) { |
| // Count the fail rate |
| AtomicInteger totalSlotNum = new AtomicInteger(); |
| req.getPartitionSlotsMap() |
| .forEach( |
| (database, partitionSlots) -> |
| partitionSlots.forEach( |
| (seriesPartitionSlot, timeSlotList) -> |
| totalSlotNum.addAndGet(timeSlotList.getTimePartitionSlots().size()))); |
| |
| AtomicInteger unassignedSlotNum = new AtomicInteger(); |
| Map<String, Map<TSeriesPartitionSlot, TTimeSlotList>> unassignedDataPartitionSlotsMap = |
| partitionInfo.filterUnassignedDataPartitionSlots(req.getPartitionSlotsMap()); |
| unassignedDataPartitionSlotsMap.forEach( |
| (database, unassignedDataPartitionSlots) -> |
| unassignedDataPartitionSlots.forEach( |
| (seriesPartitionSlot, timeSlotList) -> |
| unassignedSlotNum.addAndGet(timeSlotList.getTimePartitionSlots().size()))); |
| |
| String errMsg = |
| String.format( |
| "Lacked %d/%d DataPartition allocation result in the response of getOrCreateDataPartition method", |
| unassignedSlotNum.get(), totalSlotNum.get()); |
| LOGGER.error(errMsg); |
| resp.setStatus( |
| new TSStatus(TSStatusCode.LACK_PARTITION_ALLOCATION.getStatusCode()).setMessage(errMsg)); |
| return resp; |
| } |
| return resp; |
| } |
| |
| private TSStatus consensusWritePartitionResult(ConfigPhysicalPlan plan) { |
| TSStatus status = getConsensusManager().confirmLeader(); |
| if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { |
| // Here we check the leadership a second time |
| // since the RegionGroup creation process might take some time |
| return status; |
| } |
| try { |
| return getConsensusManager().write(plan); |
| } catch (ConsensusException e) { |
| // The allocation might fail due to consensus error |
| LOGGER.error("Write DataPartition allocation result failed because: {}", status); |
| TSStatus res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()); |
| res.setMessage(e.getMessage()); |
| return res; |
| } |
| } |
| |
| // ====================================================== |
| // Leader scheduling interfaces |
| // ====================================================== |
| |
| /** |
| * Allocate more RegionGroups to the specified StorageGroups if necessary. |
| * |
| * @param unassignedPartitionSlotsCountMap Map<StorageGroup, unassigned Partition count> |
| * @param consensusGroupType SchemaRegion or DataRegion |
| * @return SUCCESS_STATUS when the RegionGroup extension is successful; NO_ENOUGH_DATANODE when |
| * there are not enough DataNodes; DATABASE_NOT_EXIST when some Databases don't exist |
| */ |
| private TSStatus extendRegionGroupIfNecessary( |
| Map<String, Integer> unassignedPartitionSlotsCountMap, |
| TConsensusGroupType consensusGroupType) { |
| |
| TSStatus result = new TSStatus(); |
| |
| try { |
| if (TConsensusGroupType.SchemaRegion.equals(consensusGroupType)) { |
| switch (SCHEMA_REGION_GROUP_EXTENSION_POLICY) { |
| case CUSTOM: |
| return customExtendRegionGroupIfNecessary( |
| unassignedPartitionSlotsCountMap, consensusGroupType); |
| case AUTO: |
| default: |
| return autoExtendRegionGroupIfNecessary( |
| unassignedPartitionSlotsCountMap, consensusGroupType); |
| } |
| } else { |
| switch (DATA_REGION_GROUP_EXTENSION_POLICY) { |
| case CUSTOM: |
| return customExtendRegionGroupIfNecessary( |
| unassignedPartitionSlotsCountMap, consensusGroupType); |
| case AUTO: |
| default: |
| return autoExtendRegionGroupIfNecessary( |
| unassignedPartitionSlotsCountMap, consensusGroupType); |
| } |
| } |
| } catch (NotEnoughDataNodeException e) { |
| LOGGER.error(e.getMessage()); |
| result.setCode(TSStatusCode.NO_ENOUGH_DATANODE.getStatusCode()); |
| result.setMessage(e.getMessage()); |
| } catch (DatabaseNotExistsException e) { |
| LOGGER.error(e.getMessage()); |
| result.setCode(TSStatusCode.DATABASE_NOT_EXIST.getStatusCode()); |
| result.setMessage(e.getMessage()); |
| } |
| |
| return result; |
| } |
| |
| private TSStatus customExtendRegionGroupIfNecessary( |
| Map<String, Integer> unassignedPartitionSlotsCountMap, TConsensusGroupType consensusGroupType) |
| throws DatabaseNotExistsException, NotEnoughDataNodeException { |
| |
| // Map<Database, Region allotment> |
| Map<String, Integer> allotmentMap = new ConcurrentHashMap<>(); |
| |
| for (Map.Entry<String, Integer> entry : unassignedPartitionSlotsCountMap.entrySet()) { |
| final String database = entry.getKey(); |
| int minRegionGroupNum = |
| getClusterSchemaManager().getMinRegionGroupNum(database, consensusGroupType); |
| int allocatedRegionGroupCount = |
| partitionInfo.getRegionGroupCount(database, consensusGroupType); |
| |
| // Extend RegionGroups until allocatedRegionGroupCount == minRegionGroupNum |
| if (allocatedRegionGroupCount < minRegionGroupNum) { |
| allotmentMap.put(database, minRegionGroupNum - allocatedRegionGroupCount); |
| } |
| } |
| |
| return generateAndAllocateRegionGroups(allotmentMap, consensusGroupType); |
| } |
| |
| private TSStatus autoExtendRegionGroupIfNecessary( |
| Map<String, Integer> unassignedPartitionSlotsCountMap, TConsensusGroupType consensusGroupType) |
| throws NotEnoughDataNodeException, DatabaseNotExistsException { |
| |
| // Map<Database, Region allotment> |
| Map<String, Integer> allotmentMap = new ConcurrentHashMap<>(); |
| |
| for (Map.Entry<String, Integer> entry : unassignedPartitionSlotsCountMap.entrySet()) { |
| final String database = entry.getKey(); |
| final int unassignedPartitionSlotsCount = entry.getValue(); |
| |
| float allocatedRegionGroupCount = |
| partitionInfo.getRegionGroupCount(database, consensusGroupType); |
| // The slotCount equals the sum of the assigned and unassigned slot counts |
| float slotCount = |
| (float) partitionInfo.getAssignedSeriesPartitionSlotsCount(database) |
| + unassignedPartitionSlotsCount; |
| float maxRegionGroupNum = |
| getClusterSchemaManager().getMaxRegionGroupNum(database, consensusGroupType); |
| float maxSlotCount = CONF.getSeriesSlotNum(); |
| |
| /* RegionGroup extension is required in the following cases */ |
| // 1. The number of current RegionGroup of the Database is less than the minimum number |
| int minRegionGroupNum = |
| getClusterSchemaManager().getMinRegionGroupNum(database, consensusGroupType); |
| if (allocatedRegionGroupCount < minRegionGroupNum |
| // Ensure the number of RegionGroups is enough |
| // for current SeriesPartitionSlots after extension |
| // Otherwise, more RegionGroups should be extended through case 2. |
| && slotCount <= (maxSlotCount / maxRegionGroupNum) * minRegionGroupNum) { |
| |
| // Let the sum of unassignedPartitionSlotsCount and allocatedRegionGroupCount |
| // be no less than the minRegionGroupNum |
| int delta = |
| (int) |
| Math.min( |
| unassignedPartitionSlotsCount, minRegionGroupNum - allocatedRegionGroupCount); |
| allotmentMap.put(database, delta); |
| |
| } else if (allocatedRegionGroupCount < maxRegionGroupNum |
| && slotCount / allocatedRegionGroupCount > maxSlotCount / maxRegionGroupNum) { |
| // 2. The average number of partitions held by each Region will be greater than the |
| // expected average number after the partition allocation is completed. |
| |
| // The delta is the smallest integer solution that satisfies the inequality: |
| // slotCount / (allocatedRegionGroupCount + delta) <= maxSlotCount / maxRegionGroupNum |
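| // Worked example with illustrative numbers (not from any real configuration): |
| // maxSlotCount = 1000, maxRegionGroupNum = 10, allocatedRegionGroupCount = 2 and |
| // slotCount = 350 give an expected average of 1000 / 10 = 100 slots per RegionGroup, |
| // so delta = min(10 - 2, max(1, ceil(350 * 10 / 1000 - 2))) = min(8, 2) = 2, |
| // and after the extension 350 / 4 = 87.5 <= 100 holds. |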
| int delta = |
| Math.min( |
| (int) (maxRegionGroupNum - allocatedRegionGroupCount), |
| Math.max( |
| 1, |
| (int) |
| Math.ceil( |
| slotCount * maxRegionGroupNum / maxSlotCount |
| - allocatedRegionGroupCount))); |
| allotmentMap.put(database, delta); |
| |
| } else if (allocatedRegionGroupCount |
| == filterRegionGroupThroughStatus(database, RegionGroupStatus.Disabled).size() |
| && allocatedRegionGroupCount < maxRegionGroupNum) { |
| // 3. All RegionGroups in the specified Database are disabled currently |
| allotmentMap.put(database, 1); |
| } |
| } |
| |
| return generateAndAllocateRegionGroups(allotmentMap, consensusGroupType); |
| } |
| |
| private TSStatus generateAndAllocateRegionGroups( |
| Map<String, Integer> allotmentMap, TConsensusGroupType consensusGroupType) |
| throws NotEnoughDataNodeException, DatabaseNotExistsException { |
| if (!allotmentMap.isEmpty()) { |
| CreateRegionGroupsPlan createRegionGroupsPlan = |
| getLoadManager().allocateRegionGroups(allotmentMap, consensusGroupType); |
| LOGGER.info("[CreateRegionGroups] Starting to create the following RegionGroups:"); |
| createRegionGroupsPlan.planLog(LOGGER); |
| return getProcedureManager().createRegionGroups(consensusGroupType, createRegionGroupsPlan); |
| } else { |
| return RpcUtils.SUCCESS_STATUS; |
| } |
| } |
| |
| /** |
| * Only the leader uses this interface. Checks whether the specified DataPartition has a |
| * successor and returns it if one exists. |
| * |
| * @param database DatabaseName |
| * @param seriesPartitionSlot Corresponding SeriesPartitionSlot |
| * @param timePartitionSlot Corresponding TimePartitionSlot |
| * @return The specific DataPartition's successor if exists, null otherwise |
| */ |
| public TConsensusGroupId getSuccessorDataPartition( |
| String database, |
| TSeriesPartitionSlot seriesPartitionSlot, |
| TTimePartitionSlot timePartitionSlot) { |
| return partitionInfo.getSuccessorDataPartition( |
| database, seriesPartitionSlot, timePartitionSlot); |
| } |
| |
| /** |
| * Only the leader uses this interface. Checks whether the specified DataPartition has a |
| * predecessor and returns it if one exists. |
| * |
| * @param database DatabaseName |
| * @param seriesPartitionSlot Corresponding SeriesPartitionSlot |
| * @param timePartitionSlot Corresponding TimePartitionSlot |
| * @return The specific DataPartition's predecessor if exists, null otherwise |
| */ |
| public TConsensusGroupId getPredecessorDataPartition( |
| String database, |
| TSeriesPartitionSlot seriesPartitionSlot, |
| TTimePartitionSlot timePartitionSlot) { |
| return partitionInfo.getPredecessorDataPartition( |
| database, seriesPartitionSlot, timePartitionSlot); |
| } |
| |
| /** |
| * Get the DataNodes that contain the specified Database's Schema or Data. |
| * |
| * @param database The specific Database's name |
| * @param type SchemaRegion or DataRegion |
| * @return Set {@literal <}TDataNodeLocation{@literal >}, the related DataNodes |
| */ |
| public Set<TDataNodeLocation> getDatabaseRelatedDataNodes( |
| String database, TConsensusGroupType type) { |
| return partitionInfo.getDatabaseRelatedDataNodes(database, type); |
| } |
| |
| /** |
| * Only the leader uses this interface. |
| * |
| * @param type The specified TConsensusGroupType |
| * @return Deep copy of all Regions' RegionReplicaSets, organized as a Map |
| */ |
| public Map<TConsensusGroupId, TRegionReplicaSet> getAllReplicaSetsMap(TConsensusGroupType type) { |
| return partitionInfo.getAllReplicaSets(type).stream() |
| .collect( |
| Collectors.toMap(TRegionReplicaSet::getRegionId, regionReplicaSet -> regionReplicaSet)); |
| } |
| |
| /** |
| * Only the leader uses this interface. |
| * |
| * @return Deep copy of all Regions' RegionReplicaSet |
| */ |
| public List<TRegionReplicaSet> getAllReplicaSets() { |
| return partitionInfo.getAllReplicaSets(); |
| } |
| |
| /** |
| * Only the leader uses this interface. |
| * |
| * @param type The specified TConsensusGroupType |
| * @return Deep copy of all Regions' RegionReplicaSet with the specified TConsensusGroupType |
| */ |
| public List<TRegionReplicaSet> getAllReplicaSets(TConsensusGroupType type) { |
| return partitionInfo.getAllReplicaSets(type); |
| } |
| |
| /** |
| * Only the leader uses this interface. |
| * |
| * @param database The specified Database |
| * @return All Regions' RegionReplicaSet of the specified Database |
| */ |
| public List<TRegionReplicaSet> getAllReplicaSets(String database) { |
| return partitionInfo.getAllReplicaSets(database); |
| } |
| |
| /** |
| * Only the leader uses this interface. |
| * |
| * @param database The specified Database |
| * @param type SchemaRegion or DataRegion |
| * @return Deep copy of all Regions' RegionReplicaSet with the specified Database and |
| * TConsensusGroupType |
| */ |
| public List<TRegionReplicaSet> getAllReplicaSets(String database, TConsensusGroupType type) { |
| return partitionInfo.getAllReplicaSets(database, type); |
| } |
| |
| /** |
| * Get all RegionGroups currently held by the specified DataNode. |
| * |
| * @param dataNodeId The specified dataNodeId |
| * @return Deep copy of all RegionGroups' RegionReplicaSet with the specified dataNodeId |
| */ |
| public List<TRegionReplicaSet> getAllReplicaSets(int dataNodeId) { |
| return partitionInfo.getAllReplicaSets(dataNodeId); |
| } |
| |
| /** |
| * Only the leader uses this interface. |
| * |
| * @param database The specified Database |
| * @param regionGroupIds The specified RegionGroupIds |
| * @return The RegionReplicaSets of the specified RegionGroups within the specified Database |
| */ |
| public List<TRegionReplicaSet> getReplicaSets( |
| String database, List<TConsensusGroupId> regionGroupIds) { |
| return partitionInfo.getReplicaSets(database, regionGroupIds); |
| } |
| |
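| /** |
| * Check whether the specified DataNode holds a replica of the specified Region. |
| * |
| * @param dataNodeId The specified DataNode |
| * @param regionId The specified Region |
| * @return True if any RegionReplicaSet on the DataNode matches the given regionId |
| */ |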
| public boolean isDataNodeContainsRegion(int dataNodeId, TConsensusGroupId regionId) { |
| return getAllReplicaSets(dataNodeId).stream() |
| .anyMatch(tRegionReplicaSet -> tRegionReplicaSet.getRegionId().equals(regionId)); |
| } |
| |
| /** |
| * Only the leader uses this interface. |
| * |
| * <p>Get the number of Regions currently owned by the specified DataNode |
| * |
| * @param dataNodeId The specified DataNode |
| * @param type SchemaRegion or DataRegion |
| * @return The number of Regions currently owned by the specified DataNode |
| */ |
| public int getRegionCount(int dataNodeId, TConsensusGroupType type) { |
| return partitionInfo.getRegionCount(dataNodeId, type); |
| } |
| |
| /** |
| * Only the leader uses this interface. |
| * |
| * <p>Count the scatter width of the specified DataNode |
| * |
| * @param dataNodeId The specified DataNode |
| * @param type SchemaRegion or DataRegion |
| * @return The schema/data scatter width of the specified DataNode. The scatter width refers to |
| * the number of other DataNodes in the cluster which have at least one identical schema/data |
| * replica as the specified DataNode. |
| */ |
| public int countDataNodeScatterWidth(int dataNodeId, TConsensusGroupType type) { |
| int clusterNodeCount = getNodeManager().getRegisteredNodeCount(); |
| return partitionInfo.countDataNodeScatterWidth(dataNodeId, type, clusterNodeCount); |
| } |
| |
| /** |
| * Only the leader uses this interface. |
| * |
| * <p>Get the number of RegionGroups currently owned by the specified Database |
| * |
| * @param database DatabaseName |
| * @param type SchemaRegion or DataRegion |
| * @return Number of Regions currently owned by the specified Database |
| * @throws DatabaseNotExistsException When the specified Database doesn't exist |
| */ |
| public int getRegionGroupCount(String database, TConsensusGroupType type) |
| throws DatabaseNotExistsException { |
| return partitionInfo.getRegionGroupCount(database, type); |
| } |
| |
| /** |
| * Only the leader uses this interface. |
| * |
| * <p>Get all the RegionGroups currently in the cluster |
| * |
| * @param type SchemaRegion or DataRegion |
| * @return Map<Database, List<RegionGroupIds>> |
| */ |
| public Map<String, List<TConsensusGroupId>> getAllRegionGroupIdMap(TConsensusGroupType type) { |
| return partitionInfo.getAllRegionGroupIdMap(type); |
| } |
| |
| /** |
| * Only the leader uses this interface. |
| * |
| * <p>Get all the RegionGroups currently owned by the specified Database |
| * |
| * @param database DatabaseName |
| * @param type SchemaRegion or DataRegion |
| * @return List of TConsensusGroupId |
| * @throws DatabaseNotExistsException When the specified Database doesn't exist |
| */ |
| public List<TConsensusGroupId> getAllRegionGroupIds(String database, TConsensusGroupType type) |
| throws DatabaseNotExistsException { |
| return partitionInfo.getAllRegionGroupIds(database, type); |
| } |
| |
| /** |
| * Check if the specified Database exists. |
| * |
| * @param database The specified Database |
| * @return True if the DatabaseSchema exists and the Database is not pre-deleted |
| */ |
| public boolean isDatabaseExist(String database) { |
| return partitionInfo.isDatabaseExisted(database); |
| } |
| |
| /** |
| * Filter out the Databases that do not exist. |
| * |
| * @param databases the Databases to check |
| * @return List of PartialPath for the Databases that do not exist |
| */ |
| public List<PartialPath> filterUnExistDatabases(List<PartialPath> databases) { |
| List<PartialPath> unExistDatabases = new ArrayList<>(); |
| if (databases == null) { |
| return unExistDatabases; |
| } |
| for (PartialPath database : databases) { |
| if (!isDatabaseExist(database.getFullPath())) { |
| unExistDatabases.add(database); |
| } |
| } |
| return unExistDatabases; |
| } |
| |
| /** |
| * Only the leader uses this interface. |
| * |
| * <p>Get the assigned SeriesPartitionSlots count in the specified Database |
| * |
| * @param database The specified Database |
| * @return The assigned SeriesPartitionSlots count |
| */ |
| public int getAssignedSeriesPartitionSlotsCount(String database) { |
| return partitionInfo.getAssignedSeriesPartitionSlotsCount(database); |
| } |
| |
| /** |
| * Only the leader uses this interface. |
| * |
| * <p>Get the assigned TimePartitionSlots count in the specified Database |
| * |
| * @param database The specified Database |
| * @return The assigned TimePartitionSlots count |
| */ |
| public long getAssignedTimePartitionSlotsCount(String database) { |
| return partitionInfo.getAssignedTimePartitionSlotsCount(database); |
| } |
| |
| /** |
| * Only the leader uses this interface. |
| * |
| * @param database DatabaseName |
| * @param type SchemaRegion or DataRegion |
| * @return The specified Database's RegionGroups, sorted by the number of allocated slots |
| * @throws NoAvailableRegionGroupException When all RegionGroups within the specified Database |
| * are unavailable currently |
| */ |
| public List<Pair<Long, TConsensusGroupId>> getSortedRegionGroupSlotsCounter( |
| String database, TConsensusGroupType type) throws NoAvailableRegionGroupException { |
| // Collect static data |
| List<Pair<Long, TConsensusGroupId>> regionGroupSlotsCounter = |
| partitionInfo.getRegionGroupSlotsCounter(database, type); |
| |
| // Filter RegionGroups that have Disabled status |
| List<Pair<Long, TConsensusGroupId>> result = new ArrayList<>(); |
| for (Pair<Long, TConsensusGroupId> slotsCounter : regionGroupSlotsCounter) { |
| RegionGroupStatus status = getLoadManager().getRegionGroupStatus(slotsCounter.getRight()); |
| if (!RegionGroupStatus.Disabled.equals(status)) { |
| result.add(slotsCounter); |
| } |
| } |
| |
| if (result.isEmpty()) { |
| throw new NoAvailableRegionGroupException(type); |
| } |
| |
| Map<TConsensusGroupId, RegionGroupStatus> regionGroupStatusMap = |
| getLoadManager() |
| .getRegionGroupStatus(result.stream().map(Pair::getRight).collect(Collectors.toList())); |
| result.sort( |
| (o1, o2) -> { |
| // Use the number of partitions as the first priority |
| if (o1.getLeft() < o2.getLeft()) { |
| return -1; |
| } else if (o1.getLeft() > o2.getLeft()) { |
| return 1; |
| } else { |
| // Use RegionGroup status as second priority, Running > Available > Discouraged |
| return regionGroupStatusMap |
| .get(o1.getRight()) |
| .compare(regionGroupStatusMap.get(o2.getRight())); |
| } |
| }); |
| return result; |
| } |
| |
| /** |
| * Only the leader uses this interface. |
| * |
| * @return Integer set of all SchemaRegion ids |
| */ |
| public Set<Integer> getAllSchemaPartition() { |
| return partitionInfo.getAllSchemaPartition(); |
| } |
| |
| /** |
| * Only the leader uses this interface. |
| * |
| * @return the next RegionGroupId |
| */ |
| public int generateNextRegionGroupId() { |
| return partitionInfo.generateNextRegionGroupId(); |
| } |
| |
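| /** |
| * Resolve the TConsensusGroupId of the given integer regionId by probing the SchemaRegion |
| * type first and the DataRegion type second. |
| * |
| * @param regionId The specified integer regionId |
| * @return The matched TConsensusGroupId, or {@link Optional#empty()} if no RegionGroup with |
| * the given id exists |
| */ |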
| public Optional<TConsensusGroupId> generateTConsensusGroupIdByRegionId(final int regionId) { |
| if (configManager |
| .getPartitionManager() |
| .isRegionGroupExists(new TConsensusGroupId(TConsensusGroupType.SchemaRegion, regionId))) { |
| return Optional.of(new TConsensusGroupId(TConsensusGroupType.SchemaRegion, regionId)); |
| } |
| if (configManager |
| .getPartitionManager() |
| .isRegionGroupExists(new TConsensusGroupId(TConsensusGroupType.DataRegion, regionId))) { |
| return Optional.of(new TConsensusGroupId(TConsensusGroupType.DataRegion, regionId)); |
| } |
| String msg = |
| String.format( |
| "Submit RegionMigrateProcedure failed, because RegionGroup: %s doesn't exist", |
| regionId); |
| LOGGER.warn(msg); |
| return Optional.empty(); |
| } |
| |
| /** |
| * GetNodePathsPartition. |
| * |
| * @param physicalPlan GetNodesPathsPartitionReq |
| * @return SchemaNodeManagementResp that contains only the existing matched SchemaPartitions |
| * and matched child paths above the MTree |
| */ |
| public SchemaNodeManagementResp getNodePathsPartition(GetNodePathsPartitionPlan physicalPlan) { |
| try { |
| return (SchemaNodeManagementResp) getConsensusManager().read(physicalPlan); |
| } catch (ConsensusException e) { |
| LOGGER.warn(CONSENSUS_READ_ERROR, e); |
| TSStatus res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()); |
| res.setMessage(e.getMessage()); |
| SchemaNodeManagementResp resp = new SchemaNodeManagementResp(); |
| resp.setStatus(res); |
| return resp; |
| } |
| } |
| |
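| /** |
| * Mark the specified Database as pre-deleted (or roll that mark back) by writing a |
| * PreDeleteDatabasePlan through the consensus layer. A consensus error is only logged here. |
| * |
| * @param database The specified Database |
| * @param preDeleteType The specified PreDeleteType |
| */ |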
| public void preDeleteDatabase( |
| String database, PreDeleteDatabasePlan.PreDeleteType preDeleteType) { |
| final PreDeleteDatabasePlan preDeleteDatabasePlan = |
| new PreDeleteDatabasePlan(database, preDeleteType); |
| try { |
| getConsensusManager().write(preDeleteDatabasePlan); |
| } catch (ConsensusException e) { |
| LOGGER.warn(CONSENSUS_WRITE_ERROR, e); |
| } |
| } |
| |
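| /** Check whether the specified Database is pre-deleted. */ |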
| public boolean isDatabasePreDeleted(String database) { |
| return partitionInfo.isDatabasePreDeleted(database); |
| } |
| |
| /** |
| * Get TSeriesPartitionSlot. |
| * |
| * @param devicePath Full path ending with device name |
| * @return SeriesPartitionSlot |
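| * |
| * <p>Illustrative usage (a sketch; {@code partitionManager} is an assumed instance, and the |
| * concrete slot value depends on the configured executor class and series slot count): |
| * |
| * <pre>{@code |
| * TSeriesPartitionSlot slot = partitionManager.getSeriesPartitionSlot("root.sg.d1"); |
| * }</pre> |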
| */ |
| public TSeriesPartitionSlot getSeriesPartitionSlot(String devicePath) { |
| return executor.getSeriesPartitionSlot(devicePath); |
| } |
| |
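| /** |
| * Get the RegionInfo list: the static part is read through the consensus layer, and each |
| * RegionInfo is then enriched with the cached RegionStatus and the Leader/Follower role |
| * derived from the LoadManager's RegionLeaderMap. |
| */ |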
| public RegionInfoListResp getRegionInfoList(GetRegionInfoListPlan req) { |
| try { |
| // Get static result |
| RegionInfoListResp regionInfoListResp = (RegionInfoListResp) getConsensusManager().read(req); |
| // Get cached result |
| Map<TConsensusGroupId, Integer> allLeadership = getLoadManager().getRegionLeaderMap(); |
| regionInfoListResp |
| .getRegionInfoList() |
| .forEach( |
| regionInfo -> { |
| regionInfo.setStatus( |
| getLoadManager() |
| .getRegionStatus( |
| regionInfo.getConsensusGroupId(), regionInfo.getDataNodeId()) |
| .getStatus()); |
| |
| String regionType = |
| regionInfo.getDataNodeId() |
| == allLeadership.getOrDefault(regionInfo.getConsensusGroupId(), -1) |
| ? RegionRoleType.Leader.toString() |
| : RegionRoleType.Follower.toString(); |
| regionInfo.setRoleType(regionType); |
| }); |
| |
| return regionInfoListResp; |
| |
| } catch (ConsensusException e) { |
| LOGGER.warn(CONSENSUS_READ_ERROR, e); |
| TSStatus res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()); |
| res.setMessage(e.getMessage()); |
| RegionInfoListResp resp = new RegionInfoListResp(); |
| resp.setStatus(res); |
| return resp; |
| } |
| } |
| |
| /** |
| * Check if the specified RegionGroup exists. |
| * |
| * @param regionGroupId The specified RegionGroup |
| * @return True if the specified RegionGroup exists |
| */ |
| public boolean isRegionGroupExists(TConsensusGroupId regionGroupId) { |
| return partitionInfo.isRegionGroupExisted(regionGroupId); |
| } |
| |
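| /** |
| * Write an AddRegionLocationPlan through the consensus layer to record a newly added Region |
| * replica location. |
| */ |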
| public TSStatus addRegionLocation(AddRegionLocationPlan req) { |
| try { |
| return getConsensusManager().write(req); |
| } catch (ConsensusException e) { |
| LOGGER.warn(CONSENSUS_WRITE_ERROR, e); |
| TSStatus res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()); |
| res.setMessage(e.getMessage()); |
| return res; |
| } |
| } |
| |
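| /** |
| * Write a RemoveRegionLocationPlan through the consensus layer to remove a Region replica |
| * location. |
| */ |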
| public TSStatus removeRegionLocation(RemoveRegionLocationPlan req) { |
| try { |
| return getConsensusManager().write(req); |
| } catch (ConsensusException e) { |
| LOGGER.warn(CONSENSUS_WRITE_ERROR, e); |
| TSStatus res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()); |
| res.setMessage(e.getMessage()); |
| return res; |
| } |
| } |
| |
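| /** |
| * Query RegionIds through the consensus layer. The Database is taken from the request or |
| * derived from the device path, and the time slot range defaults to [0, Long.MAX_VALUE] |
| * when it is not specified. |
| */ |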
| public GetRegionIdResp getRegionId(TGetRegionIdReq req) { |
| GetRegionIdPlan plan = new GetRegionIdPlan(req.getType()); |
| if (req.isSetDatabase()) { |
| plan.setDatabase(req.getDatabase()); |
| } else { |
| plan.setDatabase(getClusterSchemaManager().getDatabaseNameByDevice(req.getDevice())); |
| plan.setSeriesSlotId(executor.getSeriesPartitionSlot(req.getDevice())); |
| } |
| if (Objects.equals(plan.getDatabase(), "")) { |
| // Return empty result if the Database cannot be determined |
| return new GetRegionIdResp(RpcUtils.SUCCESS_STATUS, new ArrayList<>()); |
| } |
| if (req.isSetStartTimeSlot()) { |
| plan.setStartTimeSlotId(req.getStartTimeSlot()); |
| } else { |
| plan.setStartTimeSlotId(new TTimePartitionSlot(0)); |
| } |
| if (req.isSetEndTimeSlot()) { |
| plan.setEndTimeSlotId(req.getEndTimeSlot()); |
| } else { |
| plan.setEndTimeSlotId(new TTimePartitionSlot(Long.MAX_VALUE)); |
| } |
| |
| try { |
| return (GetRegionIdResp) getConsensusManager().read(plan); |
| } catch (ConsensusException e) { |
| LOGGER.warn(CONSENSUS_READ_ERROR, e); |
| TSStatus res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()); |
| res.setMessage(e.getMessage()); |
| return new GetRegionIdResp(res, Collections.emptyList()); |
| } |
| } |
| |
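| /** |
| * List the TimePartitionSlots within the optional [startTime, endTime] range, scoped by the |
| * Database, the device, or the DataRegion id given in the request. |
| */ |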
| public GetTimeSlotListResp getTimeSlotList(TGetTimeSlotListReq req) { |
| long startTime = req.isSetStartTime() ? req.getStartTime() : Long.MIN_VALUE; |
| long endTime = req.isSetEndTime() ? req.getEndTime() : Long.MAX_VALUE; |
| GetTimeSlotListPlan plan = new GetTimeSlotListPlan(startTime, endTime); |
| if (req.isSetDatabase()) { |
| plan.setDatabase(req.getDatabase()); |
| } else if (req.isSetDevice()) { |
| plan.setDatabase(getClusterSchemaManager().getDatabaseNameByDevice(req.getDevice())); |
| plan.setSeriesSlotId(executor.getSeriesPartitionSlot(req.getDevice())); |
| if (Objects.equals(plan.getDatabase(), "")) { |
| // Return empty result if the Database cannot be determined |
| return new GetTimeSlotListResp(RpcUtils.SUCCESS_STATUS, new ArrayList<>()); |
| } |
| } else { |
| plan.setRegionId( |
| new TConsensusGroupId(TConsensusGroupType.DataRegion, (int) req.getRegionId())); |
| } |
| try { |
| return (GetTimeSlotListResp) getConsensusManager().read(plan); |
| } catch (ConsensusException e) { |
| LOGGER.warn(CONSENSUS_READ_ERROR, e); |
| TSStatus res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()); |
| res.setMessage(e.getMessage()); |
| return new GetTimeSlotListResp(res, Collections.emptyList()); |
| } |
| } |
| |
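| /** |
| * Count the TimePartitionSlots within the optional [startTime, endTime] range, scoped by the |
| * Database, the device, or the DataRegion id given in the request. |
| */ |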
| public CountTimeSlotListResp countTimeSlotList(TCountTimeSlotListReq req) { |
| long startTime = req.isSetStartTime() ? req.getStartTime() : Long.MIN_VALUE; |
| long endTime = req.isSetEndTime() ? req.getEndTime() : Long.MAX_VALUE; |
| CountTimeSlotListPlan plan = new CountTimeSlotListPlan(startTime, endTime); |
| if (req.isSetDatabase()) { |
| plan.setDatabase(req.getDatabase()); |
| } else if (req.isSetDevice()) { |
| plan.setDatabase(getClusterSchemaManager().getDatabaseNameByDevice(req.getDevice())); |
| plan.setSeriesSlotId(executor.getSeriesPartitionSlot(req.getDevice())); |
| if (Objects.equals(plan.getDatabase(), "")) { |
| // Return empty result if the Database cannot be determined |
| return new CountTimeSlotListResp(RpcUtils.SUCCESS_STATUS, 0); |
| } |
| } else { |
| plan.setRegionId( |
| new TConsensusGroupId(TConsensusGroupType.DataRegion, (int) req.getRegionId())); |
| } |
| try { |
| return (CountTimeSlotListResp) getConsensusManager().read(plan); |
| } catch (ConsensusException e) { |
| LOGGER.warn(CONSENSUS_READ_ERROR, e); |
| TSStatus res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()); |
| res.setMessage(e.getMessage()); |
| return new CountTimeSlotListResp(res, 0); |
| } |
| } |
| |
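| /** List the SeriesPartitionSlots of the Database and ConsensusGroupType given in the request. */ |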
| public GetSeriesSlotListResp getSeriesSlotList(TGetSeriesSlotListReq req) { |
| GetSeriesSlotListPlan plan = new GetSeriesSlotListPlan(req.getDatabase(), req.getType()); |
| try { |
| return (GetSeriesSlotListResp) getConsensusManager().read(plan); |
| } catch (ConsensusException e) { |
| LOGGER.warn(CONSENSUS_READ_ERROR, e); |
| TSStatus res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()); |
| res.setMessage(e.getMessage()); |
| return new GetSeriesSlotListResp(res, Collections.emptyList()); |
| } |
| } |
| |
| /** |
| * Get database for region. |
| * |
| * @param regionId regionId |
| * @return database name |
| */ |
| public String getRegionStorageGroup(TConsensusGroupId regionId) { |
| return partitionInfo.getRegionStorageGroup(regionId); |
| } |
| |
| /** |
| * Called by {@link PartitionManager#regionMaintainer}. |
| * |
| * <p>Periodically maintain the RegionReplicas to be created or deleted |
| */ |
| public void maintainRegionReplicas() { |
| // The consensusManager of configManager may not be fully initialized at this time |
| Optional.ofNullable(getConsensusManager()) |
| .ifPresent( |
| consensusManager -> { |
| if (getConsensusManager().isLeader()) { |
| List<RegionMaintainTask> regionMaintainTaskList = |
| partitionInfo.getRegionMaintainEntryList(); |
| |
| if (regionMaintainTaskList.isEmpty()) { |
| return; |
| } |
| |
| // Group tasks by region id |
| Map<TConsensusGroupId, Queue<RegionMaintainTask>> regionMaintainTaskMap = |
| new HashMap<>(); |
| for (RegionMaintainTask regionMaintainTask : regionMaintainTaskList) { |
| regionMaintainTaskMap |
| .computeIfAbsent(regionMaintainTask.getRegionId(), k -> new LinkedList<>()) |
| .add(regionMaintainTask); |
| } |
| |
| while (!regionMaintainTaskMap.isEmpty()) { |
| // Select tasks of the same type from each RegionGroup |
| List<RegionMaintainTask> selectedRegionMaintainTask = new ArrayList<>(); |
| RegionMaintainType currentType = null; |
| for (Map.Entry<TConsensusGroupId, Queue<RegionMaintainTask>> entry : |
| regionMaintainTaskMap.entrySet()) { |
| RegionMaintainTask regionMaintainTask = entry.getValue().peek(); |
| if (regionMaintainTask == null) { |
| continue; |
| } |
| |
| if (currentType == null) { |
| currentType = regionMaintainTask.getType(); |
| selectedRegionMaintainTask.add(regionMaintainTask); |
| } else { |
| if (!currentType.equals(regionMaintainTask.getType())) { |
| continue; |
| } |
| |
| if (currentType.equals(RegionMaintainType.DELETE) |
| || entry |
| .getKey() |
| .getType() |
| .equals(selectedRegionMaintainTask.get(0).getRegionId().getType())) { |
| // A DELETE task, or a CREATE task for the same Region type |
| selectedRegionMaintainTask.add(regionMaintainTask); |
| } |
| } |
| } |
| |
| if (selectedRegionMaintainTask.isEmpty()) { |
| break; |
| } |
| |
| Set<TConsensusGroupId> successfulTask = new HashSet<>(); |
| switch (currentType) { |
| case CREATE: |
| // create region |
| switch (selectedRegionMaintainTask.get(0).getRegionId().getType()) { |
| case SchemaRegion: |
| // create SchemaRegion |
| AsyncClientHandler<TCreateSchemaRegionReq, TSStatus> |
| createSchemaRegionHandler = |
| new AsyncClientHandler<>( |
| DataNodeRequestType.CREATE_SCHEMA_REGION); |
| for (RegionMaintainTask regionMaintainTask : selectedRegionMaintainTask) { |
| RegionCreateTask schemaRegionCreateTask = |
| (RegionCreateTask) regionMaintainTask; |
| LOGGER.info( |
| "Start to create Region: {} on DataNode: {}", |
| schemaRegionCreateTask.getRegionReplicaSet().getRegionId(), |
| schemaRegionCreateTask.getTargetDataNode()); |
| createSchemaRegionHandler.putRequest( |
| schemaRegionCreateTask.getRegionId().getId(), |
| new TCreateSchemaRegionReq( |
| schemaRegionCreateTask.getRegionReplicaSet(), |
| schemaRegionCreateTask.getStorageGroup())); |
| createSchemaRegionHandler.putDataNodeLocation( |
| schemaRegionCreateTask.getRegionId().getId(), |
| schemaRegionCreateTask.getTargetDataNode()); |
| } |
| |
| AsyncDataNodeClientPool.getInstance() |
| .sendAsyncRequestToDataNodeWithRetry(createSchemaRegionHandler); |
| |
| for (Map.Entry<Integer, TSStatus> entry : |
| createSchemaRegionHandler.getResponseMap().entrySet()) { |
| if (entry.getValue().getCode() |
| == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { |
| successfulTask.add( |
| new TConsensusGroupId( |
| TConsensusGroupType.SchemaRegion, entry.getKey())); |
| } |
| } |
| break; |
| case DataRegion: |
| // Create DataRegion |
| AsyncClientHandler<TCreateDataRegionReq, TSStatus> |
| createDataRegionHandler = |
| new AsyncClientHandler<>(DataNodeRequestType.CREATE_DATA_REGION); |
| for (RegionMaintainTask regionMaintainTask : selectedRegionMaintainTask) { |
| RegionCreateTask dataRegionCreateTask = |
| (RegionCreateTask) regionMaintainTask; |
| LOGGER.info( |
| "Start to create Region: {} on DataNode: {}", |
| dataRegionCreateTask.getRegionReplicaSet().getRegionId(), |
| dataRegionCreateTask.getTargetDataNode()); |
| createDataRegionHandler.putRequest( |
| dataRegionCreateTask.getRegionId().getId(), |
| new TCreateDataRegionReq( |
| dataRegionCreateTask.getRegionReplicaSet(), |
| dataRegionCreateTask.getStorageGroup()) |
| .setTtl(dataRegionCreateTask.getTTL())); |
| createDataRegionHandler.putDataNodeLocation( |
| dataRegionCreateTask.getRegionId().getId(), |
| dataRegionCreateTask.getTargetDataNode()); |
| } |
| |
| AsyncDataNodeClientPool.getInstance() |
| .sendAsyncRequestToDataNodeWithRetry(createDataRegionHandler); |
| |
| for (Map.Entry<Integer, TSStatus> entry : |
| createDataRegionHandler.getResponseMap().entrySet()) { |
| if (entry.getValue().getCode() |
| == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { |
| successfulTask.add( |
| new TConsensusGroupId( |
| TConsensusGroupType.DataRegion, entry.getKey())); |
| } |
| } |
| break; |
| } |
| break; |
| case DELETE: |
| // delete region |
| AsyncClientHandler<TConsensusGroupId, TSStatus> deleteRegionHandler = |
| new AsyncClientHandler<>(DataNodeRequestType.DELETE_REGION); |
| Map<Integer, TConsensusGroupId> regionIdMap = new HashMap<>(); |
| for (RegionMaintainTask regionMaintainTask : selectedRegionMaintainTask) { |
| RegionDeleteTask regionDeleteTask = (RegionDeleteTask) regionMaintainTask; |
| LOGGER.info( |
| "Start to delete Region: {} on DataNode: {}", |
| regionDeleteTask.getRegionId(), |
| regionDeleteTask.getTargetDataNode()); |
| deleteRegionHandler.putRequest( |
| regionDeleteTask.getRegionId().getId(), regionDeleteTask.getRegionId()); |
| deleteRegionHandler.putDataNodeLocation( |
| regionDeleteTask.getRegionId().getId(), |
| regionDeleteTask.getTargetDataNode()); |
| regionIdMap.put( |
| regionDeleteTask.getRegionId().getId(), regionDeleteTask.getRegionId()); |
| } |
| |
| long startTime = System.currentTimeMillis(); |
| AsyncDataNodeClientPool.getInstance() |
| .sendAsyncRequestToDataNodeWithRetry(deleteRegionHandler); |
| |
| LOGGER.info( |
| "Deleting regions costs {}ms", (System.currentTimeMillis() - startTime)); |
| |
| for (Map.Entry<Integer, TSStatus> entry : |
| deleteRegionHandler.getResponseMap().entrySet()) { |
| if (entry.getValue().getCode() |
| == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { |
| successfulTask.add(regionIdMap.get(entry.getKey())); |
| } |
| } |
| break; |
| } |
| |
| if (successfulTask.isEmpty()) { |
| break; |
| } |
| |
| for (TConsensusGroupId regionId : successfulTask) { |
| regionMaintainTaskMap.compute( |
| regionId, |
| (k, v) -> { |
| if (v == null) { |
| throw new IllegalStateException(); |
| } |
| v.poll(); |
| if (v.isEmpty()) { |
| return null; |
| } else { |
| return v; |
| } |
| }); |
| } |
| |
| // Poll the head entry of each successfully executed task |
| try { |
| getConsensusManager() |
| .write(new PollSpecificRegionMaintainTaskPlan(successfulTask)); |
| } catch (ConsensusException e) { |
| LOGGER.warn(CONSENSUS_WRITE_ERROR, e); |
| } |
| |
| if (successfulTask.size() < selectedRegionMaintainTask.size()) { |
| // Here we just break and wait for the next scheduled task, |
| // because all RegionMaintainEntries should be executed in |
| // the order they were offered |
| break; |
| } |
| } |
| } |
| }); |
| } |
| |
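| /** Start the RegionCleaner service, which periodically runs {@link #maintainRegionReplicas()}. */ |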
| public void startRegionCleaner() { |
| synchronized (scheduleMonitor) { |
| if (currentRegionMaintainerFuture == null) { |
| /* Start the RegionCleaner service */ |
| currentRegionMaintainerFuture = |
| ScheduledExecutorUtil.safelyScheduleAtFixedRate( |
| regionMaintainer, |
| this::maintainRegionReplicas, |
| 0, |
| REGION_MAINTAINER_WORK_INTERVAL, |
| TimeUnit.SECONDS); |
| LOGGER.info("RegionCleaner is started successfully."); |
| } |
| } |
| } |
| |
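| /** Stop the RegionCleaner service by cancelling the scheduled maintenance task. */ |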
| public void stopRegionCleaner() { |
| synchronized (scheduleMonitor) { |
| if (currentRegionMaintainerFuture != null) { |
| /* Stop the RegionCleaner service */ |
| currentRegionMaintainerFuture.cancel(false); |
| currentRegionMaintainerFuture = null; |
| LOGGER.info("RegionCleaner is stopped successfully."); |
| } |
| } |
| } |
| |
| /** |
| * Filter the RegionGroups in the specified Database through the RegionGroupStatus. |
| * |
| * @param database The specified Database |
| * @param status The specified RegionGroupStatus |
| * @return Filtered RegionGroups with the specified RegionGroupStatus |
| */ |
| public List<TRegionReplicaSet> filterRegionGroupThroughStatus( |
| String database, RegionGroupStatus... status) { |
| List<TConsensusGroupId> matchedRegionGroups = |
| getLoadManager().filterRegionGroupThroughStatus(status); |
| return getReplicaSets(database, matchedRegionGroups); |
| } |
| |
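| /** Collect the SchemaRegion ids of the specified Databases into the given schemaRegionIds map. */ |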
| public void getSchemaRegionIds( |
| List<String> databases, Map<String, List<Integer>> schemaRegionIds) { |
| partitionInfo.getSchemaRegionIds(databases, schemaRegionIds); |
| } |
| |
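| /** Collect the DataRegion ids of the specified Databases into the given dataRegionIds map. */ |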
| public void getDataRegionIds(List<String> databases, Map<String, List<Integer>> dataRegionIds) { |
| partitionInfo.getDataRegionIds(databases, dataRegionIds); |
| } |
| |
| /** |
| * Get the {@link TConsensusGroupType} of the given integer regionId. |
| * |
| * @param regionId The specified integer regionId |
| * @return {@link Optional#of(Object)} with the TConsensusGroupType of the given regionId, or |
| * {@link Optional#empty()} if the regionId does not match any RegionGroup. |
| */ |
| public Optional<TConsensusGroupType> getRegionType(int regionId) { |
| return partitionInfo.getRegionType(regionId); |
| } |
| |
| /** |
| * Get the last DataAllotTable of the specified Database. |
| * |
| * @param database The specified Database |
| * @return The last DataAllotTable |
| */ |
| public Map<TSeriesPartitionSlot, TConsensusGroupId> getLastDataAllotTable(String database) { |
| return partitionInfo.getLastDataAllotTable(database); |
| } |
| |
| public ScheduledExecutorService getRegionMaintainer() { |
| return regionMaintainer; |
| } |
| |
| private ConsensusManager getConsensusManager() { |
| return configManager.getConsensusManager(); |
| } |
| |
| private ClusterSchemaManager getClusterSchemaManager() { |
| return configManager.getClusterSchemaManager(); |
| } |
| |
| private LoadManager getLoadManager() { |
| return configManager.getLoadManager(); |
| } |
| |
| private ProcedureManager getProcedureManager() { |
| return configManager.getProcedureManager(); |
| } |
| |
| private NodeManager getNodeManager() { |
| return configManager.getNodeManager(); |
| } |
| } |