| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hugegraph.pd; |
| |
| import java.util.ArrayList; |
| import java.util.Comparator; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Objects; |
| import java.util.PriorityQueue; |
| import java.util.concurrent.ScheduledExecutorService; |
| import java.util.concurrent.ScheduledThreadPoolExecutor; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicReference; |
| import java.util.stream.Collectors; |
| |
| import org.apache.hugegraph.pd.common.KVPair; |
| import org.apache.hugegraph.pd.common.PDException; |
| import org.apache.hugegraph.pd.config.PDConfig; |
| import org.apache.hugegraph.pd.grpc.MetaTask; |
| import org.apache.hugegraph.pd.grpc.Metapb; |
| import org.apache.hugegraph.pd.grpc.Pdpb; |
| import org.apache.hugegraph.pd.meta.TaskInfoMeta; |
| import org.apache.hugegraph.pd.raft.RaftEngine; |
| |
| import lombok.extern.slf4j.Slf4j; |
| |
| /** |
| * The task scheduling service checks the status of stores, resources, and partitions on a |
| * regular basis, migrates data in a timely manner, and errors are on nodes |
| * 1. Monitor whether the store is offline |
| * 2. Check whether the replica of the partition is correct |
| * 3. Check whether the working mode of the partition is correct |
| * 4. Monitor whether the partition needs to be split and whether the split is completed |
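 * <p>A minimal usage sketch (variable names are illustrative; the PD config and the
 * store/partition services are assumed to be constructed elsewhere):
 * <pre>{@code
 * TaskScheduleService scheduler =
 *         new TaskScheduleService(pdConfig, storeNodeService, partitionService);
 * scheduler.init();     // start the periodic patrol and balance jobs
 * // ...
 * scheduler.shutDown(); // stop all scheduled tasks
 * }</pre>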
| */ |
| @Slf4j |
| public class TaskScheduleService { |
| |
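    // KV flag marking an in-flight shard balance; it is written with a 180-second TTL
    // (see the put() calls below) so it clears itself if a balance round aborts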
| private static final String BALANCE_SHARD_KEY = "BALANCE_SHARD_KEY"; |
    // Dynamic rebalancing may start only after the store has been offline for 30 minutes
| private final long TurnOffAndBalanceInterval = 30 * 60 * 1000; |
    // Minimum interval between two leader-balancing runs
| private final long BalanceLeaderInterval = 30 * 1000; |
| private final PDConfig pdConfig; |
    private final long clusterStartTime; // used to delay offline detection right after startup
| private final StoreNodeService storeService; |
| private final PartitionService partitionService; |
| private final ScheduledExecutorService executor; |
| private final TaskInfoMeta taskInfoMeta; |
| private final StoreMonitorDataService storeMonitorDataService; |
| private final KvService kvService; |
| private final LogService logService; |
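    // Order (storeId, shardCount) pairs by count, breaking ties by storeId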
| private final Comparator<KVPair<Long, Integer>> kvPairComparatorAsc = (o1, o2) -> { |
| if (o1.getValue() == o2.getValue()) { |
| return o1.getKey().compareTo(o2.getKey()); |
| } |
| return o1.getValue().compareTo(o2.getValue()); |
| }; |
| private final Comparator<KVPair<Long, Integer>> kvPairComparatorDesc = (o1, o2) -> { |
| if (o1.getValue() == o2.getValue()) { |
| return o2.getKey().compareTo(o1.getKey()); |
| } |
| return o2.getValue().compareTo(o1.getValue()); |
| }; |
| private long lastStoreTurnoffTime = 0; |
| private long lastBalanceLeaderTime = 0; |
| |
| public TaskScheduleService(PDConfig config, StoreNodeService storeService, |
| PartitionService partitionService) { |
| this.pdConfig = config; |
| this.storeService = storeService; |
| this.partitionService = partitionService; |
| this.taskInfoMeta = new TaskInfoMeta(config); |
| this.logService = new LogService(pdConfig); |
| this.storeMonitorDataService = new StoreMonitorDataService(pdConfig); |
| this.clusterStartTime = System.currentTimeMillis(); |
| this.kvService = new KvService(pdConfig); |
| this.executor = new ScheduledThreadPoolExecutor(16); |
| } |
| |
| public void init() { |
| executor.scheduleWithFixedDelay(() -> { |
| try { |
| patrolStores(); |
| } catch (Throwable e) { |
| log.error("patrolStores exception: ", e); |
| } |
| |
| }, 60, 60, TimeUnit.SECONDS); |
| executor.scheduleWithFixedDelay(() -> { |
| try { |
| patrolPartitions(); |
| balancePartitionLeader(false); |
| balancePartitionShard(); |
| } catch (Throwable e) { |
| log.error("patrolPartitions exception: ", e); |
| } |
| }, pdConfig.getPatrolInterval(), pdConfig.getPatrolInterval(), TimeUnit.SECONDS); |
| executor.scheduleWithFixedDelay(() -> { |
| if (isLeader()) { |
| kvService.clearTTLData(); |
| } |
| }, 1000, 1000, TimeUnit.MILLISECONDS); |
| executor.scheduleWithFixedDelay( |
| () -> { |
| if (isLeader()) { |
| storeService.getQuotaChecker(); |
| } |
| }, 2, 30, |
| TimeUnit.SECONDS); |
        // Clean expired monitor data every 10 minutes, after an initial delay of 3 minutes.
| if (isLeader() && this.pdConfig.getStore().isMonitorDataEnabled()) { |
| executor.scheduleAtFixedRate(() -> { |
| Long expTill = System.currentTimeMillis() / 1000 - |
| this.pdConfig.getStore().getRetentionPeriod(); |
| log.debug("monitor data keys before " + expTill + " will be deleted"); |
| int records = 0; |
| try { |
| for (Metapb.Store store : storeService.getStores()) { |
| int cnt = |
| this.storeMonitorDataService.removeExpiredMonitorData(store.getId(), |
| expTill); |
| log.debug("store id :{}, records:{}", store.getId(), cnt); |
| records += cnt; |
| } |
| } catch (PDException e) { |
| throw new RuntimeException(e); |
| } |
                log.debug("{} records have been deleted", records);
| }, 180, 600, TimeUnit.SECONDS); |
| } |
| |
| storeService.addStatusListener(new StoreStatusListener() { |
| @Override |
| public void onStoreStatusChanged(Metapb.Store store, Metapb.StoreState old, |
| Metapb.StoreState status) { |
| if (status == Metapb.StoreState.Tombstone) { |
| lastStoreTurnoffTime = System.currentTimeMillis(); |
| } |
| |
| if (status == Metapb.StoreState.Up) { |
| executor.schedule(() -> { |
| try { |
| balancePartitionLeader(false); |
| } catch (PDException e) { |
| log.error("exception {}", e); |
| } |
| }, BalanceLeaderInterval, TimeUnit.MILLISECONDS); |
| |
| } |
| } |
| |
| @Override |
| public void onGraphChange(Metapb.Graph graph, |
| Metapb.GraphState stateOld, |
| Metapb.GraphState stateNew) { |
| |
| } |
| |
| @Override |
| public void onStoreRaftChanged(Metapb.Store store) { |
| |
| } |
| }); |
| } |
| |
| public void shutDown() { |
| executor.shutdownNow(); |
| } |
| |
| private boolean isLeader() { |
| return RaftEngine.getInstance().isLeader(); |
| } |
| |
| /** |
| * Inspect all stores to see if they are online and have enough storage space |
| */ |
| public List<Metapb.Store> patrolStores() throws PDException { |
| if (!isLeader()) { |
| return null; |
| } |
| |
| List<Metapb.Store> changedStores = new ArrayList<>(); |
        // Check the online status of every store
| List<Metapb.Store> stores = storeService.getStores(""); |
| Map<Long, Metapb.Store> activeStores = storeService.getActiveStores("") |
| .stream().collect( |
| Collectors.toMap(Metapb.Store::getId, t -> t)); |
| for (Metapb.Store store : stores) { |
| Metapb.Store changeStore = null; |
| if ((store.getState() == Metapb.StoreState.Up |
| || store.getState() == Metapb.StoreState.Unknown) |
| && !activeStores.containsKey(store.getId())) { |
                // The store is not online, so mark its state as Offline
| changeStore = Metapb.Store.newBuilder(store) |
| .setState(Metapb.StoreState.Offline) |
| .build(); |
| |
| } else if ((store.getState() == Metapb.StoreState.Exiting && |
| !activeStores.containsKey(store.getId())) || |
| (store.getState() == Metapb.StoreState.Offline && |
| (System.currentTimeMillis() - store.getLastHeartbeat() > |
| pdConfig.getStore().getMaxDownTime() * 1000) && |
| (System.currentTimeMillis() - clusterStartTime > |
| pdConfig.getStore().getMaxDownTime() * 1000))) { |
                // The store was manually set to Exiting, or it has stayed Offline longer
                // than max-down-time; mark it Tombstone once checkStoreCanOffline passes
| if (storeService.checkStoreCanOffline(store)) { |
| changeStore = Metapb.Store.newBuilder(store) |
| .setState(Metapb.StoreState.Tombstone).build(); |
| this.logService.insertLog(LogService.NODE_CHANGE, |
| LogService.TASK, changeStore); |
| log.info("patrolStores store {} Offline", changeStore.getId()); |
| } |
| } |
| if (changeStore != null) { |
| storeService.updateStore(changeStore); |
| changedStores.add(changeStore); |
| } |
| } |
| return changedStores; |
| } |
| |
| /** |
     * Inspect all partitions: check whether each shard group has the configured number of
     * replicas, and take offline the shards residing on Tombstone stores
| */ |
| public List<Metapb.Partition> patrolPartitions() throws PDException { |
| if (!isLeader()) { |
| return null; |
| } |
| |
| // If the number of replicas is inconsistent, reallocate replicas |
| for (Metapb.ShardGroup group : storeService.getShardGroups()) { |
| if (group.getShardsCount() != pdConfig.getPartition().getShardCount()) { |
| storeService.reallocShards(group); |
| kvService.put(BALANCE_SHARD_KEY, "DOING", 180 * 1000); |
| } |
| } |
        // Take offline the shards that reside on Tombstone stores.
| Map<Long, Metapb.Store> tombStores = storeService.getTombStores().stream().collect( |
| Collectors.toMap(Metapb.Store::getId, t -> t)); |
| |
| var partIds = new HashSet<Integer>(); |
| |
| for (var pair : tombStores.entrySet()) { |
| for (var partition : partitionService.getPartitionByStore(pair.getValue())) { |
| if (partIds.contains(partition.getId())) { |
| continue; |
| } |
| partIds.add(partition.getId()); |
| |
| storeService.storeTurnoff(pair.getValue()); |
| partitionService.shardOffline(partition, pair.getValue().getId()); |
| } |
| |
| } |
| |
| return null; |
| } |
| |
| /** |
     * Balance the number of partition shards across stores
     * Rebalancing runs only after at least half an hour has passed since the last store
     * went offline
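     * <p>Returns PartitionID -> (source StoreID, target StoreID) for each shard move that
     * was issued, or null when balancing is skipped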
| */ |
| public synchronized Map<Integer, KVPair<Long, Long>> balancePartitionShard() throws |
| PDException { |
| log.info("balancePartitions starting, isleader:{}", isLeader()); |
| |
| if (!isLeader()) { |
| return null; |
| } |
| |
| if (System.currentTimeMillis() - lastStoreTurnoffTime < TurnOffAndBalanceInterval) { |
| return null; |
| } |
| |
| int activeStores = storeService.getActiveStores().size(); |
| if (activeStores == 0) { |
| log.warn("balancePartitionShard non active stores, skip to balancePartitionShard"); |
| return null; |
| } |
| |
| if (Objects.equals(kvService.get(BALANCE_SHARD_KEY), "DOING")) { |
| return null; |
| } |
| |
| int totalShards = pdConfig.getConfigService().getPartitionCount() * |
| pdConfig.getPartition().getShardCount(); |
| int averageCount = totalShards / activeStores; |
| int remainder = totalShards % activeStores; |
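        // e.g. 12 partitions with 3 shards each and 5 active stores: totalShards = 36,
        // averageCount = 7, remainder = 1, so one store targets 8 shards and the rest 7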
| |
        // Count the partitions on each store: StoreId -> (PartitionID -> ShardRole)
| Map<Long, Map<Integer, Metapb.ShardRole>> partitionMap = new HashMap<>(); |
| storeService.getActiveStores().forEach(store -> { |
| partitionMap.put(store.getId(), new HashMap<>()); |
| }); |
| |
        AtomicReference<Boolean> isLearner = new AtomicReference<>(false);
| partitionService.getPartitions().forEach(partition -> { |
| |
| try { |
| storeService.getShardList(partition.getId()).forEach(shard -> { |
| Long storeId = shard.getStoreId(); |
| if (shard.getRole() == Metapb.ShardRole.Learner |
| || partition.getState() != Metapb.PartitionState.PState_Normal) { |
                        isLearner.set(true);
| } |
| if (partitionMap.containsKey(storeId)) { |
| partitionMap.get(storeId).put(partition.getId(), shard.getRole()); |
| } |
| }); |
| } catch (PDException e) { |
| log.error("get partition {} shard list error:{}.", partition.getId(), |
| e.getMessage()); |
| } |
| }); |
| |
        if (isLearner.get()) {
            log.warn("balancePartitionShard found a learner shard or an abnormal " +
                     "partition, skip this balancePartitionShard task");
| return null; |
| } |
| |
        // Sort stores by shard count, from highest to lowest
| List<KVPair<Long, Integer>> sortedList = new ArrayList<>(); |
| partitionMap.forEach((storeId, shards) -> { |
            sortedList.add(new KVPair<>(storeId, shards.size()));
| }); |
| sortedList.sort(((o1, o2) -> o2.getValue().compareTo(o1.getValue()))); |
        // Max heap of move-in candidate stores: storeId -> number of shards still needed
| PriorityQueue<KVPair<Long, Integer>> maxHeap = new PriorityQueue<>(sortedList.size(), |
| (o1, o2) -> o2.getValue() |
| .compareTo( |
| o1.getValue())); |
| |
        // committedIndex of each replica of every partition
| Map<Integer, Map<Long, Long>> committedIndexMap = partitionService.getCommittedIndexStats(); |
        // PartitionID -> (source StoreID, target StoreID)
| Map<Integer, KVPair<Long, Long>> movedPartitions = new HashMap<>(); |
        // Remove redundant shards: traverse stores from most shards to fewest, and give the
        // remainder slots to the stores that already hold more shards, which reduces the
        // number of migrations
| for (int index = 0; index < sortedList.size(); index++) { |
| long storeId = sortedList.get(index).getKey(); |
| if (!partitionMap.containsKey(storeId)) { |
| log.error("cannot found storeId {} in partitionMap", storeId); |
| return null; |
| } |
| Map<Integer, Metapb.ShardRole> shards = partitionMap.get(storeId); |
| int targetCount = index < remainder ? averageCount + 1 : averageCount; |
            // Record the source StoreID for each redundant shard; each partition is moved
            // at most once
| if (shards.size() > targetCount) { |
| int movedCount = shards.size() - targetCount; |
| log.info( |
| "balancePartitionShard storeId {}, shardsSize {}, targetCount {}, " + |
| "moveCount {}", |
| storeId, shards.size(), targetCount, movedCount); |
| for (Iterator<Integer> iterator = shards.keySet().iterator(); |
| movedCount > 0 && iterator.hasNext(); ) { |
| Integer id = iterator.next(); |
| |
| if (!movedPartitions.containsKey(id)) { |
| log.info("store {}, shard of partition {} can be moved", storeId, id); |
| movedPartitions.put(id, new KVPair<>(storeId, 0L)); |
| movedCount--; |
| } |
| } |
| } else if (shards.size() < targetCount) { |
| int addCount = targetCount - shards.size(); |
| log.info( |
| "balancePartitionShard storeId {}, shardsSize {}, targetCount {}, " + |
| "addCount {}", |
| storeId, shards.size(), targetCount, addCount); |
| maxHeap.add(new KVPair<>(storeId, addCount)); |
| } |
| } |
| |
| if (movedPartitions.size() == 0) { |
| log.warn( |
| "movedPartitions is empty, totalShards:{} averageCount:{} remainder:{} " + |
| "sortedList:{}", |
| totalShards, averageCount, remainder, sortedList); |
| } |
| Iterator<Map.Entry<Integer, KVPair<Long, Long>>> moveIterator = |
| movedPartitions.entrySet().iterator(); |
| |
| while (moveIterator.hasNext()) { |
| if (maxHeap.size() == 0) { |
| break; |
| } |
| Map.Entry<Integer, KVPair<Long, Long>> moveEntry = moveIterator.next(); |
| int partitionId = moveEntry.getKey(); |
| long sourceStoreId = moveEntry.getValue().getKey(); |
| |
| List<KVPair<Long, Integer>> tmpList = new ArrayList<>(maxHeap.size()); |
| while (maxHeap.size() > 0) { |
| KVPair<Long, Integer> pair = maxHeap.poll(); |
| long destStoreId = pair.getKey(); |
| boolean destContains = false; |
| if (partitionMap.containsKey(destStoreId)) { |
| destContains = partitionMap.get(destStoreId).containsKey(partitionId); |
| } |
                // Use this store as the destination only if it does not already hold the partition
| if (!destContains) { |
| moveEntry.getValue().setValue(pair.getKey()); |
| log.info( |
| "balancePartitionShard will move partition {} from store {} to store " + |
| "{}", |
| moveEntry.getKey(), |
| moveEntry.getValue().getKey(), |
| moveEntry.getValue().getValue()); |
| if (pair.getValue() > 1) { |
| pair.setValue(pair.getValue() - 1); |
| tmpList.add(pair); |
| } |
| break; |
| } |
| tmpList.add(pair); |
| } |
| maxHeap.addAll(tmpList); |
| } |
| |
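        // Mark balancing as in progress; the 180-second TTL clears the flag automatically
        // if this round aborts before the moves complete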
| kvService.put(BALANCE_SHARD_KEY, "DOING", 180 * 1000); |
| |
| // Start the migration |
| movedPartitions.forEach((partId, storePair) -> { |
            // Move only when both the source and the destination storeID are non-zero
| if (storePair.getKey() > 0 && storePair.getValue() > 0) { |
| partitionService.movePartitionsShard(partId, storePair.getKey(), |
| storePair.getValue()); |
| } else { |
| log.warn("balancePartitionShard key or value is zero, partId:{} storePair:{}", |
| partId, storePair); |
| } |
| }); |
| return movedPartitions; |
| } |
| |
| /** |
     * Balance the number of partition leaders across stores
| */ |
| public synchronized Map<Integer, Long> balancePartitionLeader(boolean immediately) throws |
| PDException { |
| Map<Integer, Long> results = new HashMap<>(); |
| |
| if (!isLeader()) { |
| return results; |
| } |
| |
| if (!immediately && |
| System.currentTimeMillis() - lastBalanceLeaderTime < BalanceLeaderInterval) { |
| return results; |
| } |
| lastBalanceLeaderTime = System.currentTimeMillis(); |
| |
| List<Metapb.ShardGroup> shardGroups = storeService.getShardGroups(); |
| |
        // Abort when a split or move task is in progress
| var taskMeta = storeService.getTaskInfoMeta(); |
| if (taskMeta.hasSplitTaskDoing() || taskMeta.hasMoveTaskDoing()) { |
| throw new PDException(1001, "split or combine task is processing, please try later!"); |
| } |
| |
| if (Objects.equals(kvService.get(BALANCE_SHARD_KEY), "DOING")) { |
| throw new PDException(1001, "balance shard is processing, please try later!"); |
| } |
| |
| if (shardGroups.size() == 0) { |
| return results; |
| } |
| |
| Map<Long, Integer> storeShardCount = new HashMap<>(); |
| |
| shardGroups.forEach(group -> { |
| group.getShardsList().forEach(shard -> { |
| storeShardCount.put(shard.getStoreId(), |
| storeShardCount.getOrDefault(shard.getStoreId(), 0) + 1); |
| }); |
| }); |
| |
| log.info("balancePartitionLeader, shard group size: {}, by store: {}", shardGroups.size(), |
| storeShardCount); |
| |
| PriorityQueue<KVPair<Long, Integer>> targetCount = |
| new PriorityQueue<>(kvPairComparatorDesc); |
| |
| var sortedGroups = storeShardCount.entrySet().stream() |
| .map(entry -> new KVPair<>(entry.getKey(), |
| entry.getValue())) |
| .sorted(kvPairComparatorAsc) |
| .collect(Collectors.toList()); |
| int sum = 0; |
| |
| for (int i = 0; i < sortedGroups.size() - 1; i++) { |
            // every store gets at least one leader
| int v = Math.max( |
| sortedGroups.get(i).getValue() / pdConfig.getPartition().getShardCount(), 1); |
| targetCount.add(new KVPair<>(sortedGroups.get(i).getKey(), v)); |
| sum += v; |
| } |
| targetCount.add(new KVPair<>(sortedGroups.get(sortedGroups.size() - 1).getKey(), |
| shardGroups.size() - sum)); |
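        // e.g. 12 shard groups, shardCount = 3, and three stores holding 12 shards each:
        // the first two stores target max(12 / 3, 1) = 4 leaders apiece, and the last
        // store takes the remaining 12 - 8 = 4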
| log.info("target count: {}", targetCount); |
| |
| for (var group : shardGroups) { |
| var map = group.getShardsList().stream() |
| .collect(Collectors.toMap(Metapb.Shard::getStoreId, shard -> shard)); |
| var tmpList = new ArrayList<KVPair<Long, Integer>>(); |
            // With many stores the polled store may not hold a shard of this group; stash
            // unsuitable stores in a temporary list until a suitable one is found
| while (!targetCount.isEmpty()) { |
| var pair = targetCount.poll(); |
| var storeId = pair.getKey(); |
| if (map.containsKey(storeId)) { |
| if (map.get(storeId).getRole() != Metapb.ShardRole.Leader) { |
| log.info("shard group{}, store id:{}, set to leader", group.getId(), |
| storeId); |
| partitionService.transferLeader(group.getId(), map.get(storeId)); |
| results.put(group.getId(), storeId); |
| } else { |
| log.info("shard group {}, store id :{}, is leader, no need change", |
| group.getId(), storeId); |
| } |
| |
| if (pair.getValue() > 1) { |
                        // decrement the remaining leader quota for this store
| pair.setValue(pair.getValue() - 1); |
| tmpList.add(pair); |
| } |
                // A suitable store was found; this group is done
| break; |
| } else { |
| tmpList.add(pair); |
| } |
| } |
| targetCount.addAll(tmpList); |
| } |
| |
| return results; |
| } |
| |
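    /**
     * Return the gap between the largest and smallest committedIndex among the replicas
     * of the given partition, or Long.MAX_VALUE when no stats are available
     */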
| private long getMaxIndexGap(Map<Integer, Map<Long, Long>> committedIndexMap, int partitionId) { |
| long maxGap = Long.MAX_VALUE; |
| if (committedIndexMap == null || !committedIndexMap.containsKey(partitionId)) { |
| return maxGap; |
| } |
| Map<Long, Long> shardMap = committedIndexMap.get(partitionId); |
| if (shardMap == null || shardMap.size() == 0) { |
| return maxGap; |
| } |
| List<Long> sortedList = new ArrayList<>(); |
| shardMap.forEach((storeId, committedIndex) -> { |
| sortedList.add(committedIndex); |
| }); |
| sortedList.sort(Comparator.reverseOrder()); |
| maxGap = sortedList.get(0) - sortedList.get(sortedList.size() - 1); |
| return maxGap; |
| } |
| |
| /** |
| * Perform partition splitting, which is divided into automatic splitting and manual splitting |
| * |
| * @return |
| * @throws PDException |
| */ |
| public List<Metapb.Partition> splitPartition( |
| Pdpb.OperationMode mode, List<Pdpb.SplitDataParam> params) throws PDException { |
| |
| if (mode == Pdpb.OperationMode.Auto) { |
| return autoSplitPartition(); |
| } |
| |
| var list = params.stream() |
| .map(param -> new KVPair<>(param.getPartitionId(), param.getCount())) |
| .collect(Collectors.toList()); |
| |
| storeService.splitShardGroups(list); |
| return null; |
| } |
| |
| /** |
| * Partition splitting is performed automatically, and each store reaches the maximum number |
| * of partitions |
| * execution conditions |
| * The number of partitions per machine after the split is less than partition |
| * .max-partitions-per-store |
| * |
| * @throws PDException |
| */ |
| public List<Metapb.Partition> autoSplitPartition() throws PDException { |
| if (!isLeader()) { |
| return null; |
| } |
| |
| if (Metapb.ClusterState.Cluster_OK != storeService.getClusterStats().getState()) { |
| if (Metapb.ClusterState.Cluster_Offline == storeService.getClusterStats().getState()) { |
| throw new PDException(Pdpb.ErrorType.Split_Partition_Doing_VALUE, |
| "The data is splitting"); |
| } else { |
| throw new PDException(Pdpb.ErrorType.Cluster_State_Forbid_Splitting_VALUE, |
| "The current state of the cluster prohibits splitting data"); |
| } |
| } |
| |
        // Calculate the maximum split count that the cluster can support
| int splitCount = pdConfig.getPartition().getMaxShardsPerStore() * |
| storeService.getActiveStores().size() / |
| (storeService.getShardGroups().size() * |
| pdConfig.getPartition().getShardCount()); |
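        // e.g. max-shards-per-store = 12, 3 active stores, 4 shard groups, shardCount = 3:
        // splitCount = 12 * 3 / (4 * 3) = 3, so each partition may split into at most 3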
| |
| if (splitCount < 2) { |
| throw new PDException(Pdpb.ErrorType.Too_Many_Partitions_Per_Store_VALUE, |
| "Too many partitions per store, partition.store-max-shard-count" + |
| " = " |
| + pdConfig.getPartition().getMaxShardsPerStore()); |
| } |
| |
        // The per-store partition limit has not been reached, so the split can proceed
| log.info("Start to split partitions..., split count = {}", splitCount); |
| |
| // Set the cluster status to Offline |
| storeService.updateClusterStatus(Metapb.ClusterState.Cluster_Offline); |
| // Modify the default number of partitions |
| // pdConfig.getConfigService().setPartitionCount(storeService.getShardGroups().size() * |
| // splitCount); |
| |
| var list = storeService.getShardGroups().stream() |
| .map(shardGroup -> new KVPair<>(shardGroup.getId(), splitCount)) |
| .collect(Collectors.toList()); |
| storeService.splitShardGroups(list); |
| |
| return null; |
| } |
| |
| /** |
| * Store reports the status of the task |
| * The state of the partition changes, and the state of the ShardGroup, graph, and the entire |
| * cluster where the partition resides |
| * |
| * @param task |
| */ |
| public void reportTask(MetaTask.Task task) { |
| try { |
| switch (task.getType()) { |
| case Split_Partition: |
| partitionService.handleSplitTask(task); |
| break; |
| case Move_Partition: |
| partitionService.handleMoveTask(task); |
| break; |
| case Clean_Partition: |
| partitionService.handleCleanPartitionTask(task); |
| break; |
| default: |
| break; |
| } |
| } catch (Exception e) { |
| log.error("Report task exception {}, {}", e, task); |
| } |
| } |
| |
| /** |
| * Compaction on rocksdb |
| * |
| * @throws PDException |
| */ |
| public Boolean dbCompaction(String tableName) throws PDException { |
| if (!isLeader()) { |
| return false; |
| } |
| |
| for (Metapb.ShardGroup shardGroup : storeService.getShardGroups()) { |
| storeService.shardGroupsDbCompaction(shardGroup.getId(), tableName); |
| } |
| |
| // |
| return true; |
| } |
| |
| /** |
     * Determine whether all partitions of a store can be migrated out, and return the
     * verdict together with a migration plan
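     * <p>The result map stores the verdict under the key "flag" and, when the verdict is
     * positive, the PartitionID -> (source StoreID, target StoreID) plan under
     * "movedPartitions"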
| */ |
| public Map<String, Object> canAllPartitionsMovedOut(Metapb.Store sourceStore) throws |
| PDException { |
| if (!isLeader()) { |
| return null; |
| } |
        // Analyze whether every partition on the store can be moved out
| Map<String, Object> resultMap = new HashMap<>(); |
        // Holds the partitions on the source store: StoreId -> (PartitionID -> ShardRole)
| Map<Long, Map<Integer, Metapb.ShardRole>> sourcePartitionMap = new HashMap<>(); |
| sourcePartitionMap.put(sourceStore.getId(), new HashMap<>()); |
        // Holds the partitions on the other active stores: StoreId -> (PartitionID -> ShardRole)
| Map<Long, Map<Integer, Metapb.ShardRole>> otherPartitionMap = new HashMap<>(); |
| // The amount of disk space remaining for each store |
| Map<Long, Long> availableDiskSpace = new HashMap<>(); |
| // Record the amount of data in the partition to be migrated |
| Map<Integer, Long> partitionDataSize = new HashMap<>(); |
| |
| storeService.getActiveStores().forEach(store -> { |
| if (store.getId() != sourceStore.getId()) { |
| otherPartitionMap.put(store.getId(), new HashMap<>()); |
| // Records the remaining disk space of other stores, in bytes |
| availableDiskSpace.put(store.getId(), store.getStats().getAvailable()); |
| } else { |
| resultMap.put("current_store_is_online", true); |
| } |
| }); |
        // Sum the sizes of the partitions to be migrated (approximateSize in storeStats is in KB)
| for (Metapb.GraphStats graphStats : sourceStore.getStats().getGraphStatsList()) { |
| partitionDataSize.put(graphStats.getPartitionId(), |
| partitionDataSize.getOrDefault(graphStats.getPartitionId(), 0L) |
| + graphStats.getApproximateSize()); |
| } |
| // Assign values to sourcePartitionMap and otherPartitionMap |
| partitionService.getPartitions().forEach(partition -> { |
| try { |
| storeService.getShardList(partition.getId()).forEach(shard -> { |
| long storeId = shard.getStoreId(); |
| if (storeId == sourceStore.getId()) { |
| sourcePartitionMap.get(storeId).put(partition.getId(), shard.getRole()); |
| } else { |
| if (otherPartitionMap.containsKey(storeId)) { |
| otherPartitionMap.get(storeId).put(partition.getId(), shard.getRole()); |
| } |
| } |
| |
| }); |
| } catch (PDException e) { |
| throw new RuntimeException(e); |
| } |
| }); |
| // Count the partitions to be removed: all partitions on the source store |
| Map<Integer, KVPair<Long, Long>> movedPartitions = new HashMap<>(); |
| for (Map.Entry<Integer, Metapb.ShardRole> entry : sourcePartitionMap.get( |
| sourceStore.getId()).entrySet()) { |
| movedPartitions.put(entry.getKey(), new KVPair<>(sourceStore.getId(), 0L)); |
| } |
        // Track the partition count of every other store in a min heap, so that stores
        // with fewer partitions are always tried first
| PriorityQueue<KVPair<Long, Integer>> minHeap = new PriorityQueue<>(otherPartitionMap.size(), |
| (o1, o2) -> o1.getValue() |
| .compareTo( |
| o2.getValue())); |
| otherPartitionMap.forEach((storeId, shards) -> { |
            minHeap.add(new KVPair<>(storeId, shards.size()));
| }); |
| // Traverse the partitions to be migrated, and prioritize the migration to the store with |
| // fewer partitions |
| Iterator<Map.Entry<Integer, KVPair<Long, Long>>> moveIterator = |
| movedPartitions.entrySet().iterator(); |
| while (moveIterator.hasNext()) { |
| Map.Entry<Integer, KVPair<Long, Long>> moveEntry = moveIterator.next(); |
| int partitionId = moveEntry.getKey(); |
            // Records the elements popped from the priority queue
| List<KVPair<Long, Integer>> tmpList = new ArrayList<>(); |
| while (minHeap.size() > 0) { |
                KVPair<Long, Integer> pair = minHeap.poll(); // pop the store with the fewest partitions
| long storeId = pair.getKey(); |
| int partitionCount = pair.getValue(); |
| Map<Integer, Metapb.ShardRole> shards = otherPartitionMap.get(storeId); |
                final int unitRate = 1024; // unit conversion: available space is in bytes, partition sizes in KB
| if ((!shards.containsKey(partitionId)) && ( |
| availableDiskSpace.getOrDefault(storeId, 0L) / unitRate >= |
| partitionDataSize.getOrDefault(partitionId, 0L))) { |
| // If the partition is not included on the destination store and the |
| // remaining space of the destination store can accommodate the partition, |
| // the migration is performed |
| moveEntry.getValue().setValue(storeId); // Set the target store for the move |
| log.info("plan to move partition {} to store {}, " + |
| "available disk space {}, current partitionSize:{}", |
| partitionId, |
| storeId, |
| availableDiskSpace.getOrDefault(storeId, 0L) / unitRate, |
| partitionDataSize.getOrDefault(partitionId, 0L) |
| ); |
| // Update the expected remaining space for the store |
| availableDiskSpace.put(storeId, availableDiskSpace.getOrDefault(storeId, 0L) |
| - partitionDataSize.getOrDefault(partitionId, |
| 0L) * |
| unitRate); |
| // Update the number of partitions for that store in the stat variable |
| partitionCount += 1; |
| pair.setValue(partitionCount); |
| tmpList.add(pair); |
| break; |
| } else { |
| tmpList.add(pair); |
| } |
| } |
| minHeap.addAll(tmpList); |
| } |
        // Collect the partitions that were not assigned a target store
| List<Integer> remainPartitions = new ArrayList<>(); |
| movedPartitions.forEach((partId, storePair) -> { |
| if (storePair.getValue() == 0L) { |
| remainPartitions.add(partId); |
| } |
| }); |
| if (remainPartitions.size() > 0) { |
| resultMap.put("flag", false); |
| resultMap.put("movedPartitions", null); |
| } else { |
| resultMap.put("flag", true); |
| resultMap.put("movedPartitions", movedPartitions); |
| } |
| return resultMap; |
| |
| } |
| |
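    /**
     * Issue the shard moves described in the given plan, typically the "movedPartitions"
     * entry produced by {@link #canAllPartitionsMovedOut(Metapb.Store)}
     */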
| public Map<Integer, KVPair<Long, Long>> movePartitions( |
| Map<Integer, KVPair<Long, Long>> movedPartitions) { |
| if (!isLeader()) { |
| return null; |
| } |
| // Start the migration |
| log.info("begin move partitions:"); |
| movedPartitions.forEach((partId, storePair) -> { |
            // Move only when both the source and the destination storeID are non-zero
| if (storePair.getKey() > 0 && storePair.getValue() > 0) { |
| partitionService.movePartitionsShard(partId, storePair.getKey(), |
| storePair.getValue()); |
| } |
| }); |
| return movedPartitions; |
| } |
| |
| } |