| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hugegraph.pd; |
| |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Objects; |
| import java.util.Random; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| |
| import org.apache.commons.lang3.StringUtils; |
| import org.apache.hugegraph.pd.common.KVPair; |
| import org.apache.hugegraph.pd.common.PDException; |
| import org.apache.hugegraph.pd.config.PDConfig; |
| import org.apache.hugegraph.pd.grpc.Metapb; |
| import org.apache.hugegraph.pd.grpc.Metapb.GraphMode; |
| import org.apache.hugegraph.pd.grpc.Metapb.GraphModeReason; |
| import org.apache.hugegraph.pd.grpc.Metapb.GraphState; |
| import org.apache.hugegraph.pd.grpc.Pdpb; |
| import org.apache.hugegraph.pd.grpc.Pdpb.CacheResponse; |
| import org.apache.hugegraph.pd.grpc.pulse.ConfChangeType; |
| import org.apache.hugegraph.pd.meta.MetadataFactory; |
| import org.apache.hugegraph.pd.meta.MetadataKeyHelper; |
| import org.apache.hugegraph.pd.meta.StoreInfoMeta; |
| import org.apache.hugegraph.pd.meta.TaskInfoMeta; |
| |
| import com.google.gson.Gson; |
| |
| import lombok.extern.slf4j.Slf4j; |
| |
| /** |
| * Hg Store registration and keep-alive management |
| */ |
| @Slf4j |
| public class StoreNodeService { |
| |
| private static final Long STORE_HEART_BEAT_INTERVAL = 30000L; |
| private static final String graphSpaceConfPrefix = "HUGEGRAPH/hg/GRAPHSPACE/CONF/"; |
| private final List<StoreStatusListener> statusListeners; |
| private final List<ShardGroupStatusListener> shardGroupStatusListeners; |
| private final StoreInfoMeta storeInfoMeta; |
| private final TaskInfoMeta taskInfoMeta; |
| private final Random random = new Random(System.currentTimeMillis()); |
| private final KvService kvService; |
| private final ConfigService configService; |
| private final PDConfig pdConfig; |
| private PartitionService partitionService; |
| private final Runnable quotaChecker = () -> { |
| try { |
| getQuota(); |
| } catch (Exception e) { |
| log.error( |
| "obtaining and sending graph space quota information with error: ", |
| e); |
| } |
| }; |
| private Metapb.ClusterStats clusterStats; |
| |
| public StoreNodeService(PDConfig config) { |
| this.pdConfig = config; |
| storeInfoMeta = MetadataFactory.newStoreInfoMeta(pdConfig); |
| taskInfoMeta = MetadataFactory.newTaskInfoMeta(pdConfig); |
| shardGroupStatusListeners = Collections.synchronizedList(new ArrayList<>()); |
| statusListeners = Collections.synchronizedList(new ArrayList<StoreStatusListener>()); |
| clusterStats = Metapb.ClusterStats.newBuilder() |
| .setState(Metapb.ClusterState.Cluster_Not_Ready) |
| .setTimestamp(System.currentTimeMillis()) |
| .build(); |
| kvService = new KvService(pdConfig); |
| configService = new ConfigService(pdConfig); |
| } |
| |
| public void init(PartitionService partitionService) { |
| this.partitionService = partitionService; |
| partitionService.addStatusListener(new PartitionStatusListener() { |
| @Override |
| public void onPartitionChanged(Metapb.Partition old, Metapb.Partition partition) { |
| if (old != null && old.getState() != partition.getState()) { |
| try { |
| List<Metapb.Partition> partitions = |
| partitionService.getPartitionById(partition.getId()); |
| Metapb.PartitionState state = Metapb.PartitionState.PState_Normal; |
| for (Metapb.Partition pt : partitions) { |
| if (pt.getState().getNumber() > state.getNumber()) { |
| state = pt.getState(); |
| } |
| } |
| updateShardGroupState(partition.getId(), state); |
| |
| for (Metapb.ShardGroup group : getShardGroups()) { |
| if (group.getState().getNumber() > state.getNumber()) { |
| state = group.getState(); |
| } |
| } |
| updateClusterStatus(state); |
| } catch (PDException e) { |
| log.error("onPartitionChanged exception: ", e); |
| } |
| } |
| } |
| |
| @Override |
| public void onPartitionRemoved(Metapb.Partition partition) { |
| |
| } |
| }); |
| } |
| |
| /** |
| * Whether the cluster is ready or not |
| * |
| * @return |
| */ |
| public boolean isOK() { |
| return this.clusterStats.getState().getNumber() < |
| Metapb.ClusterState.Cluster_Offline.getNumber(); |
| } |
| |
| /** |
| * Store registration, record the IP address of the Store, and the first registration needs |
| * to generate a store_ID |
| * |
| * @param store |
| */ |
| public Metapb.Store register(Metapb.Store store) throws PDException { |
| if (store.getId() == 0) { |
| // Initial registration, generate a new ID, and ensure that the ID is not duplicated. |
| store = newStoreNode(store); |
| } |
| |
| if (!storeInfoMeta.storeExists(store.getId())) { |
| log.error("Store id {} does not belong to this PD, address = {}", store.getId(), |
| store.getAddress()); |
| // storeId does not exist, an exception is thrown |
| throw new PDException(Pdpb.ErrorType.STORE_ID_NOT_EXIST_VALUE, |
| String.format("Store id %d doest not exist.", store.getId())); |
| } |
| |
| // If the store status is Tombstone, the registration is denied. |
| Metapb.Store lastStore = storeInfoMeta.getStore(store.getId()); |
| if (lastStore.getState() == Metapb.StoreState.Tombstone) { |
| log.error("Store id {} has been removed, Please reinitialize, address = {}", |
| store.getId(), store.getAddress()); |
| // storeId does not exist, an exception is thrown |
| throw new PDException(Pdpb.ErrorType.STORE_HAS_BEEN_REMOVED_VALUE, |
| String.format("Store id %d has been removed. %s", store.getId(), |
| store.getAddress())); |
| } |
| |
| // offline or up, or in the initial activation list, go live automatically |
| Metapb.StoreState storeState = lastStore.getState(); |
| if (storeState == Metapb.StoreState.Offline || storeState == Metapb.StoreState.Up |
| || inInitialStoreList(store)) { |
| storeState = Metapb.StoreState.Up; |
| } else { |
| storeState = Metapb.StoreState.Pending; |
| } |
| |
| store = Metapb.Store.newBuilder(lastStore) |
| .setAddress(store.getAddress()) |
| .setRaftAddress(store.getRaftAddress()) |
| .setDataVersion(store.getDataVersion()) |
| .setDeployPath(store.getDeployPath()) |
| .setVersion(store.getVersion()) |
| .setDataPath(store.getDataPath()) |
| .setState(storeState).setCores(store.getCores()) |
| .clearLabels().addAllLabels(store.getLabelsList()) |
| .setLastHeartbeat(System.currentTimeMillis()).build(); |
| |
| long current = System.currentTimeMillis(); |
| boolean raftChanged = false; |
| // On-line status Raft Address there has been a change |
| if (!Objects.equals(lastStore.getRaftAddress(), store.getRaftAddress()) && |
| storeState == Metapb.StoreState.Up) { |
| // If the time interval is too short and the raft changes, it is considered an |
| // invalid store |
| if (current - lastStore.getLastHeartbeat() < STORE_HEART_BEAT_INTERVAL * 0.8) { |
| throw new PDException(Pdpb.ErrorType.STORE_PROHIBIT_DUPLICATE_VALUE, |
| String.format("Store id %d may be duplicate. addr: %s", |
| store.getId(), store.getAddress())); |
| } else if (current - lastStore.getLastHeartbeat() > STORE_HEART_BEAT_INTERVAL * 1.2) { |
| // It is considered that a change has occurred |
| raftChanged = true; |
| } else { |
| // Wait for the next registration |
| return Metapb.Store.newBuilder(store).setId(0L).build(); |
| } |
| } |
| |
| // Store information |
| storeInfoMeta.updateStore(store); |
| if (storeState == Metapb.StoreState.Up) { |
| // Update the store active status |
| storeInfoMeta.keepStoreAlive(store); |
| onStoreStatusChanged(store, Metapb.StoreState.Offline, Metapb.StoreState.Up); |
| checkStoreStatus(); |
| } |
| |
| // Wait for the store information to be saved before sending the changes |
| if (raftChanged) { |
| onStoreRaftAddressChanged(store); |
| } |
| |
| log.info("Store register, id = {} {}", store.getId(), store); |
| return store; |
| } |
| |
| private boolean inInitialStoreList(Metapb.Store store) { |
| return this.pdConfig.getInitialStoreMap().containsKey(store.getAddress()); |
| } |
| |
| /** |
| * Creates a new store object |
| * |
| * @param store |
| * @return |
| * @throws PDException |
| */ |
| private synchronized Metapb.Store newStoreNode(Metapb.Store store) throws PDException { |
| long id = random.nextLong() & Long.MAX_VALUE; |
| while (id == 0 || storeInfoMeta.storeExists(id)) { |
| id = random.nextLong() & Long.MAX_VALUE; |
| } |
| store = Metapb.Store.newBuilder(store) |
| .setId(id) |
| .setState(Metapb.StoreState.Pending) |
| .setStartTimestamp(System.currentTimeMillis()).build(); |
| storeInfoMeta.updateStore(store); |
| return store; |
| } |
| |
| /** |
| * Returns Store information based on store_id |
| * |
| * @param id |
| * @return |
| * @throws PDException |
| */ |
| public Metapb.Store getStore(long id) throws PDException { |
| Metapb.Store store = storeInfoMeta.getStore(id); |
| if (store == null) { |
| throw new PDException(Pdpb.ErrorType.STORE_ID_NOT_EXIST_VALUE, |
| String.format("Store id %x doest not exist.", id)); |
| } |
| return store; |
| } |
| |
| /** |
| * Update the store information, detect the change of store status, and notify Hugestore |
| */ |
| public synchronized Metapb.Store updateStore(Metapb.Store store) throws PDException { |
| log.info("updateStore storeId: {}, address: {}, state: {}", store.getId(), |
| store.getAddress(), store.getState()); |
| Metapb.Store lastStore = storeInfoMeta.getStore(store.getId()); |
| if (lastStore == null) { |
| return null; |
| } |
| Metapb.Store.Builder builder = |
| Metapb.Store.newBuilder(lastStore).clearLabels().clearStats(); |
| store = builder.mergeFrom(store).build(); |
| if (store.getState() == Metapb.StoreState.Tombstone) { |
| List<Metapb.Store> activeStores = getStores(); |
| if (lastStore.getState() == Metapb.StoreState.Up |
| && activeStores.size() - 1 < pdConfig.getMinStoreCount()) { |
| throw new PDException(Pdpb.ErrorType.LESS_ACTIVE_STORE_VALUE, |
| "The number of active stores is less then " + |
| pdConfig.getMinStoreCount()); |
| } |
| } |
| |
| storeInfoMeta.updateStore(store); |
| if (store.getState() != Metapb.StoreState.Unknown && |
| store.getState() != lastStore.getState()) { |
| // If you want to take the store offline |
| if (store.getState() == Metapb.StoreState.Exiting) { |
| if (lastStore.getState() == Metapb.StoreState.Exiting) { |
| // If it is already in the offline state, no further processing will be made |
| return lastStore; |
| } |
| |
| List<Metapb.Store> activeStores = this.getActiveStores(); |
| Map<Long, Metapb.Store> storeMap = new HashMap<>(); |
| activeStores.forEach(s -> { |
| storeMap.put(s.getId(), s); |
| }); |
| // If the store is offline, delete it directly from active, and if the store is |
| // online, temporarily delete it from active, and then delete it when the status |
| // is set to Tombstone |
| if (!storeMap.containsKey(store.getId())) { |
| log.info("updateStore removeActiveStores store {}", store.getId()); |
| storeInfoMeta.removeActiveStore(store); |
| } |
| storeTurnoff(store); |
| } else if (store.getState() == Metapb.StoreState.Offline) { |
| // Monitor that the store has gone offline and is removed from the active |
| storeInfoMeta.removeActiveStore(store); |
| } else if (store.getState() == Metapb.StoreState.Tombstone) { |
| // When the status changes, the store is shut down, the shardGroup is modified, |
| // and the replica is migrated |
| log.info("updateStore removeActiveStores store {}", store.getId()); |
| storeInfoMeta.removeActiveStore(store); |
| // Storage goes offline |
| storeTurnoff(store); |
| } else if (store.getState() == Metapb.StoreState.Up) { |
| storeInfoMeta.keepStoreAlive(store); |
| checkStoreStatus(); |
| } |
| onStoreStatusChanged(lastStore, lastStore.getState(), store.getState()); |
| } |
| return store; |
| } |
| |
| /** |
| * The shard of the shardGroup is reassigned |
| * |
| * @param store |
| * @throws PDException |
| */ |
| public synchronized void storeTurnoff(Metapb.Store store) throws PDException { |
| // Traverse ShardGroup,redistribution |
| for (Metapb.ShardGroup group : getShardGroupsByStore(store.getId())) { |
| Metapb.ShardGroup.Builder builder = Metapb.ShardGroup.newBuilder(group); |
| builder.clearShards(); |
| group.getShardsList().forEach(shard -> { |
| if (shard.getStoreId() != store.getId()) { |
| builder.addShards(shard); |
| } |
| }); |
| reallocShards(builder.build()); |
| } |
| } |
| |
| /** |
| * Returns stores information based on the graph name, and if graphName is empty, all store |
| * information is returned |
| * |
| * @throws PDException |
| */ |
| public List<Metapb.Store> getStores() throws PDException { |
| return storeInfoMeta.getStores(null); |
| } |
| |
| public List<Metapb.Store> getStores(String graphName) throws PDException { |
| return storeInfoMeta.getStores(graphName); |
| } |
| |
| public List<Metapb.Store> getStoreStatus(boolean isActive) throws PDException { |
| return storeInfoMeta.getStoreStatus(isActive); |
| } |
| |
| public List<Metapb.ShardGroup> getShardGroups() throws PDException { |
| return storeInfoMeta.getShardGroups(); |
| } |
| |
| public Metapb.ShardGroup getShardGroup(int groupId) throws PDException { |
| return storeInfoMeta.getShardGroup(groupId); |
| } |
| |
| public List<Metapb.Shard> getShardList(int groupId) throws PDException { |
| var shardGroup = getShardGroup(groupId); |
| if (shardGroup != null) { |
| return shardGroup.getShardsList(); |
| } |
| return new ArrayList<>(); |
| } |
| |
| public List<Metapb.ShardGroup> getShardGroupsByStore(long storeId) throws PDException { |
| List<Metapb.ShardGroup> shardGroups = new ArrayList<>(); |
| storeInfoMeta.getShardGroups().forEach(shardGroup -> { |
| shardGroup.getShardsList().forEach(shard -> { |
| if (shard.getStoreId() == storeId) { |
| shardGroups.add(shardGroup); |
| } |
| }); |
| }); |
| return shardGroups; |
| } |
| |
| /** |
| * Returns the active store |
| * |
| * @param graphName |
| * @return |
| * @throws PDException |
| */ |
| public List<Metapb.Store> getActiveStores(String graphName) throws PDException { |
| return storeInfoMeta.getActiveStores(graphName); |
| } |
| |
| public List<Metapb.Store> getActiveStores() throws PDException { |
| return storeInfoMeta.getActiveStores(); |
| } |
| |
| public List<Metapb.Store> getTombStores() throws PDException { |
| List<Metapb.Store> stores = new ArrayList<>(); |
| for (Metapb.Store store : this.getStores()) { |
| if (store.getState() == Metapb.StoreState.Tombstone) { |
| stores.add(store); |
| } |
| } |
| return stores; |
| } |
| |
| public long removeStore(Long storeId) throws PDException { |
| return storeInfoMeta.removeStore(storeId); |
| } |
| |
| /** |
| * todo : New logic |
| * Assign a store to the partition and decide how many peers to allocate according to the |
| * configuration of the graph |
| * After allocating all the shards, save the ShardGroup object (store does not change, only |
| * executes once) |
| */ |
| public synchronized List<Metapb.Shard> allocShards(Metapb.Graph graph, int partId) throws |
| PDException { |
| // Multiple graphs share raft grouping, so assigning shard only depends on partitionId. |
| // The number of partitions can be set based on the size of the data, but the total |
| // number cannot exceed the number of raft groups |
| if (storeInfoMeta.getShardGroup(partId) == null) { |
| // Get active store key |
| List<Metapb.Store> stores = storeInfoMeta.getActiveStores(); |
| |
| if (stores.size() == 0) { |
| throw new PDException(Pdpb.ErrorType.NO_ACTIVE_STORE_VALUE, |
| "There is no any online store"); |
| } |
| |
| if (stores.size() < pdConfig.getMinStoreCount()) { |
| throw new PDException(Pdpb.ErrorType.LESS_ACTIVE_STORE_VALUE, |
| "The number of active stores is less then " + |
| pdConfig.getMinStoreCount()); |
| } |
| |
| int shardCount = pdConfig.getPartition().getShardCount(); |
| shardCount = Math.min(shardCount, stores.size()); |
| // Two shards could not elect a leader |
| // It cannot be 0 |
| |
| if (shardCount == 2 || shardCount < 1) { |
| shardCount = 1; |
| } |
| |
| // All ShardGroups are created at one time to ensure that the initial groupIDs are |
| // orderly and easy for humans to read |
| for (int groupId = 0; groupId < pdConfig.getConfigService().getPartitionCount(); |
| groupId++) { |
| int storeIdx = groupId % stores.size(); // Assignment rules, simplified to modulo |
| List<Metapb.Shard> shards = new ArrayList<>(); |
| for (int i = 0; i < shardCount; i++) { |
| Metapb.Shard shard = |
| Metapb.Shard.newBuilder().setStoreId(stores.get(storeIdx).getId()) |
| .setRole(i == 0 ? Metapb.ShardRole.Leader : |
| Metapb.ShardRole.Follower) // |
| .build(); |
| shards.add(shard); |
| storeIdx = (storeIdx + 1) >= stores.size() ? 0 : ++storeIdx; // Sequential |
| } |
| |
| Metapb.ShardGroup group = Metapb.ShardGroup.newBuilder() |
| .setId(groupId) |
| .setState( |
| Metapb.PartitionState.PState_Normal) |
| .addAllShards(shards).build(); |
| |
| // new group |
| storeInfoMeta.updateShardGroup(group); |
| partitionService.updateShardGroupCache(group); |
| onShardGroupStatusChanged(group, group); |
| log.info("alloc shard group: id {}", groupId); |
| } |
| } |
| return storeInfoMeta.getShardGroup(partId).getShardsList(); |
| } |
| |
| /** |
| * Based on the shard_count of the graph, reallocate shards |
| * Send change shard |
| */ |
| public synchronized List<Metapb.Shard> reallocShards(Metapb.ShardGroup shardGroup) throws |
| PDException { |
| List<Metapb.Store> stores = storeInfoMeta.getActiveStores(); |
| |
| if (stores.size() == 0) { |
| throw new PDException(Pdpb.ErrorType.NO_ACTIVE_STORE_VALUE, |
| "There is no any online store"); |
| } |
| |
| if (stores.size() < pdConfig.getMinStoreCount()) { |
| throw new PDException(Pdpb.ErrorType.LESS_ACTIVE_STORE_VALUE, |
| "The number of active stores is less then " + |
| pdConfig.getMinStoreCount()); |
| } |
| |
| int shardCount = pdConfig.getPartition().getShardCount(); |
| shardCount = Math.min(shardCount, stores.size()); |
| if (shardCount == 2 || shardCount < 1) { |
| // Two shards could not elect a leader |
| // It cannot be 0 |
| shardCount = 1; |
| } |
| |
| List<Metapb.Shard> shards = new ArrayList<>(); |
| shards.addAll(shardGroup.getShardsList()); |
| |
| if (shardCount > shards.size()) { |
| // Need to add shards |
| log.info("reallocShards ShardGroup {}, add shards from {} to {}", |
| shardGroup.getId(), shards.size(), shardCount); |
| int storeIdx = shardGroup.getId() % stores.size(); |
| for (int addCount = shardCount - shards.size(); addCount > 0; ) { |
| // Check if it already exists |
| if (!isStoreInShards(shards, stores.get(storeIdx).getId())) { |
| Metapb.Shard shard = Metapb.Shard.newBuilder() |
| .setStoreId(stores.get(storeIdx).getId()) |
| .build(); |
| shards.add(shard); |
| addCount--; |
| } |
| storeIdx = (storeIdx + 1) >= stores.size() ? 0 : ++storeIdx; |
| } |
| } else if (shardCount < shards.size()) { |
| // Need to reduce shard |
| log.info("reallocShards ShardGroup {}, remove shards from {} to {}", |
| shardGroup.getId(), shards.size(), shardCount); |
| |
| int subCount = shards.size() - shardCount; |
| Iterator<Metapb.Shard> iterator = shards.iterator(); |
| while (iterator.hasNext() && subCount > 0) { |
| if (iterator.next().getRole() != Metapb.ShardRole.Leader) { |
| iterator.remove(); |
| subCount--; |
| } |
| } |
| } else { |
| return shards; |
| } |
| |
| Metapb.ShardGroup group = Metapb.ShardGroup.newBuilder(shardGroup) |
| .clearShards() |
| .addAllShards(shards).build(); |
| storeInfoMeta.updateShardGroup(group); |
| partitionService.updateShardGroupCache(group); |
| // change shard group |
| onShardGroupStatusChanged(shardGroup, group); |
| |
| var partitions = partitionService.getPartitionById(shardGroup.getId()); |
| if (partitions.size() > 0) { |
| // send one message, change shard is regardless with partition/graph |
| partitionService.fireChangeShard(partitions.get(0), shards, |
| ConfChangeType.CONF_CHANGE_TYPE_ADJUST); |
| } |
| |
| log.info("reallocShards ShardGroup {}, shards: {}", group.getId(), group.getShardsList()); |
| return shards; |
| } |
| |
| /** |
| * According to the number of partitions,distribute group shard |
| * |
| * @param groups list of (partition id, count) |
| * @return total groups |
| */ |
| public synchronized int splitShardGroups(List<KVPair<Integer, Integer>> groups) throws |
| PDException { |
| int sum = groups.stream().map(pair -> pair.getValue()).reduce(0, Integer::sum); |
| // shard group is too big |
| if (sum > getActiveStores().size() * pdConfig.getPartition().getMaxShardsPerStore()) { |
| throw new PDException(Pdpb.ErrorType.Too_Many_Partitions_Per_Store_VALUE, |
| "can't satisfy target shard group count"); |
| } |
| |
| partitionService.splitPartition(groups); |
| |
| return sum; |
| } |
| |
| /** |
| * Alloc shard group, prepare for the split |
| * |
| * @param |
| * @return true |
| * @throws PDException |
| */ |
| private boolean isStoreInShards(List<Metapb.Shard> shards, long storeId) { |
| AtomicBoolean exist = new AtomicBoolean(false); |
| shards.forEach(s -> { |
| if (s.getStoreId() == storeId) { |
| exist.set(true); |
| } |
| }); |
| return exist.get(); |
| } |
| |
| /** |
| * update shard group and cache. |
| * send shard group change message. |
| * |
| * @param groupId : shard group |
| * @param shards : shard lists |
| * @param version: term version, ignored if less than 0 |
| * @param confVersion : conf version, ignored if less than 0 |
| * @return |
| */ |
| public synchronized Metapb.ShardGroup updateShardGroup(int groupId, List<Metapb.Shard> shards, |
| long version, long confVersion) throws |
| PDException { |
| Metapb.ShardGroup group = this.storeInfoMeta.getShardGroup(groupId); |
| |
| if (group == null) { |
| return null; |
| } |
| |
| var builder = Metapb.ShardGroup.newBuilder(group); |
| if (version >= 0) { |
| builder.setVersion(version); |
| } |
| |
| if (confVersion >= 0) { |
| builder.setConfVer(confVersion); |
| } |
| |
| var newGroup = builder.clearShards().addAllShards(shards).build(); |
| |
| storeInfoMeta.updateShardGroup(newGroup); |
| partitionService.updateShardGroupCache(newGroup); |
| onShardGroupStatusChanged(group, newGroup); |
| log.info("Raft {} updateShardGroup {}", groupId, newGroup); |
| return group; |
| } |
| |
| /** |
| * Notify the Store to rebuild the shard group |
| * |
| * @param groupId raft group id |
| * @param shards shard list: If it is empty, delete the corresponding one partition engine |
| */ |
| public void shardGroupOp(int groupId, List<Metapb.Shard> shards) throws PDException { |
| |
| var shardGroup = getShardGroup(groupId); |
| |
| if (shardGroup == null) { |
| return; |
| } |
| |
| var newGroup = shardGroup.toBuilder().clearShards().addAllShards(shards).build(); |
| if (shards.size() == 0) { |
| var partitions = partitionService.getPartitionById(groupId); |
| for (var partition : partitions) { |
| partitionService.removePartition(partition.getGraphName(), groupId); |
| } |
| deleteShardGroup(groupId); |
| } |
| |
| onShardGroupOp(newGroup); |
| } |
| |
| /** |
| * Delete shard group |
| * |
| * @param groupId shard group id |
| */ |
| public synchronized void deleteShardGroup(int groupId) throws PDException { |
| Metapb.ShardGroup group = this.storeInfoMeta.getShardGroup(groupId); |
| if (group != null) { |
| storeInfoMeta.deleteShardGroup(groupId); |
| } |
| |
| onShardGroupStatusChanged(group, null); |
| |
| // Fix the number of partitions for the store. (Result from partition merge) |
| var shardGroups = getShardGroups(); |
| if (shardGroups != null) { |
| var count1 = pdConfig.getConfigService().getPDConfig().getPartitionCount(); |
| var maxGroupId = |
| getShardGroups().stream().map(Metapb.ShardGroup::getId).max(Integer::compareTo); |
| if (maxGroupId.get() < count1) { |
| pdConfig.getConfigService().setPartitionCount(maxGroupId.get() + 1); |
| } |
| } |
| } |
| |
| public synchronized void updateShardGroupState(int groupId, Metapb.PartitionState state) throws |
| PDException { |
| Metapb.ShardGroup shardGroup = storeInfoMeta.getShardGroup(groupId) |
| .toBuilder() |
| .setState(state).build(); |
| storeInfoMeta.updateShardGroup(shardGroup); |
| partitionService.updateShardGroupCache(shardGroup); |
| } |
| |
| /** |
| * Receive the heartbeat of the Store |
| * |
| * @param storeStats |
| * @throws PDException |
| */ |
| public Metapb.ClusterStats heartBeat(Metapb.StoreStats storeStats) throws PDException { |
| this.storeInfoMeta.updateStoreStats(storeStats); |
| Metapb.Store lastStore = this.getStore(storeStats.getStoreId()); |
| if (lastStore == null) { |
| // store does not exist |
| throw new PDException(Pdpb.ErrorType.STORE_ID_NOT_EXIST_VALUE, |
| String.format("Store id %d does not exist.", |
| storeStats.getStoreId())); |
| } |
| if (lastStore.getState() == Metapb.StoreState.Tombstone) { |
| throw new PDException(Pdpb.ErrorType.STORE_HAS_BEEN_REMOVED_VALUE, |
| String.format( |
| "Store id %d is useless since it's state is Tombstone", |
| storeStats.getStoreId())); |
| } |
| Metapb.Store nowStore; |
| // If you are going to take the store offline |
| if (lastStore.getState() == Metapb.StoreState.Exiting) { |
| List<Metapb.Store> activeStores = this.getActiveStores(); |
| Map<Long, Metapb.Store> storeMap = new HashMap<>(); |
| activeStores.forEach(store -> { |
| storeMap.put(store.getId(), store); |
| }); |
| // If the partition of the offline store is 0, it means that the migration has been |
| // completed and can be taken offline, if it is not 0, the migration is still in |
| // progress and you need to wait |
| if (storeStats.getPartitionCount() > 0 && |
| storeMap.containsKey(storeStats.getStoreId())) { |
| nowStore = Metapb.Store.newBuilder(lastStore) |
| .setStats(storeStats) |
| .setLastHeartbeat(System.currentTimeMillis()) |
| .setState(Metapb.StoreState.Exiting).build(); |
| this.storeInfoMeta.updateStore(nowStore); |
| return this.clusterStats; |
| } else { |
| nowStore = Metapb.Store.newBuilder(lastStore) |
| .setStats(storeStats) |
| .setLastHeartbeat(System.currentTimeMillis()) |
| .setState(Metapb.StoreState.Tombstone).build(); |
| this.storeInfoMeta.updateStore(nowStore); |
| storeInfoMeta.removeActiveStore(nowStore); |
| return this.clusterStats; |
| } |
| } |
| |
| if (lastStore.getState() == Metapb.StoreState.Pending) { |
| nowStore = Metapb.Store.newBuilder(lastStore) |
| .setStats(storeStats) |
| .setLastHeartbeat(System.currentTimeMillis()) |
| .setState(Metapb.StoreState.Pending).build(); |
| this.storeInfoMeta.updateStore(nowStore); |
| return this.clusterStats; |
| } else { |
| if (lastStore.getState() == Metapb.StoreState.Offline) { |
| this.updateStore( |
| Metapb.Store.newBuilder(lastStore).setState(Metapb.StoreState.Up).build()); |
| } |
| nowStore = Metapb.Store.newBuilder(lastStore) |
| .setState(Metapb.StoreState.Up) |
| .setStats(storeStats) |
| .setLastHeartbeat(System.currentTimeMillis()).build(); |
| this.storeInfoMeta.updateStore(nowStore); |
| this.storeInfoMeta.keepStoreAlive(nowStore); |
| this.checkStoreStatus(); |
| return this.clusterStats; |
| } |
| } |
| |
| public synchronized Metapb.ClusterStats updateClusterStatus(Metapb.ClusterState state) { |
| this.clusterStats = clusterStats.toBuilder().setState(state).build(); |
| return this.clusterStats; |
| } |
| |
| public Metapb.ClusterStats updateClusterStatus(Metapb.PartitionState state) { |
| Metapb.ClusterState cstate = Metapb.ClusterState.Cluster_OK; |
| switch (state) { |
| case PState_Normal: |
| cstate = Metapb.ClusterState.Cluster_OK; |
| break; |
| case PState_Warn: |
| cstate = Metapb.ClusterState.Cluster_Warn; |
| break; |
| case PState_Fault: |
| cstate = Metapb.ClusterState.Cluster_Fault; |
| break; |
| case PState_Offline: |
| cstate = Metapb.ClusterState.Cluster_Offline; |
| break; |
| } |
| return updateClusterStatus(cstate); |
| } |
| |
| public Metapb.ClusterStats getClusterStats() { |
| return this.clusterStats; |
| } |
| |
| /** |
| * Check the cluster health status |
| * Whether the number of active machines is greater than the minimum threshold |
| * The number of partition shards online has exceeded half |
| */ |
| public synchronized void checkStoreStatus() { |
| Metapb.ClusterStats.Builder builder = Metapb.ClusterStats.newBuilder() |
| .setState( |
| Metapb.ClusterState.Cluster_OK); |
| try { |
| List<Metapb.Store> activeStores = this.getActiveStores(); |
| if (activeStores.size() < pdConfig.getMinStoreCount()) { |
| builder.setState(Metapb.ClusterState.Cluster_Not_Ready); |
| builder.setMessage("The number of active stores is " + activeStores.size() |
| + ", less than pd.initial-store-count:" + |
| pdConfig.getMinStoreCount()); |
| } |
| Map<Long, Metapb.Store> storeMap = new HashMap<>(); |
| activeStores.forEach(store -> { |
| storeMap.put(store.getId(), store); |
| }); |
| |
| if (builder.getState() == Metapb.ClusterState.Cluster_OK) { |
| // Check whether the number of online shards for each partition is greater than half |
| for (Metapb.ShardGroup group : this.getShardGroups()) { |
| int count = 0; |
| for (Metapb.Shard shard : group.getShardsList()) { |
| count += storeMap.containsKey(shard.getStoreId()) ? 1 : 0; |
| } |
| if (count * 2 < group.getShardsList().size()) { |
| builder.setState(Metapb.ClusterState.Cluster_Not_Ready); |
| builder.setMessage( |
| "Less than half of active shard, partitionId is " + group.getId()); |
| break; |
| } |
| } |
| } |
| |
| } catch (PDException e) { |
| log.error("StoreNodeService updateClusterStatus exception {}", e); |
| } |
| this.clusterStats = builder.setTimestamp(System.currentTimeMillis()).build(); |
| if (this.clusterStats.getState() != Metapb.ClusterState.Cluster_OK) { |
| log.error("The cluster is not ready, {}", this.clusterStats); |
| } |
| } |
| |
| public void addStatusListener(StoreStatusListener listener) { |
| statusListeners.add(listener); |
| } |
| |
| protected void onStoreRaftAddressChanged(Metapb.Store store) { |
| log.info("onStoreRaftAddressChanged storeId = {}, new raft addr:", store.getId(), |
| store.getRaftAddress()); |
| statusListeners.forEach(e -> { |
| e.onStoreRaftChanged(store); |
| }); |
| } |
| |
| public void addShardGroupStatusListener(ShardGroupStatusListener listener) { |
| shardGroupStatusListeners.add(listener); |
| } |
| |
| protected void onStoreStatusChanged(Metapb.Store store, Metapb.StoreState old, |
| Metapb.StoreState stats) { |
| log.info("onStoreStatusChanged storeId = {} from {} to {}", store.getId(), old, stats); |
| statusListeners.forEach(e -> { |
| e.onStoreStatusChanged(store, old, stats); |
| }); |
| } |
| |
| protected void onShardGroupStatusChanged(Metapb.ShardGroup group, Metapb.ShardGroup newGroup) { |
| log.info("onShardGroupStatusChanged, groupId: {}, from {} to {}", group.getId(), group, |
| newGroup); |
| shardGroupStatusListeners.forEach(e -> e.onShardListChanged(group, newGroup)); |
| } |
| |
| protected void onShardGroupOp(Metapb.ShardGroup shardGroup) { |
| log.info("onShardGroupOp, group id: {}, shard group:{}", shardGroup.getId(), shardGroup); |
| shardGroupStatusListeners.forEach(e -> e.onShardListOp(shardGroup)); |
| } |
| |
| /** |
| * Check whether the current store can be discontinued |
| * If the number of active machines is less than or equal to the minimum threshold, they |
| * cannot be taken offline |
| * If the number of shards in the partition is not more than half, it cannot be offline |
| */ |
| public boolean checkStoreCanOffline(Metapb.Store currentStore) { |
| try { |
| long currentStoreId = currentStore.getId(); |
| List<Metapb.Store> activeStores = this.getActiveStores(); |
| Map<Long, Metapb.Store> storeMap = new HashMap<>(); |
| activeStores.forEach(store -> { |
| if (store.getId() != currentStoreId) { |
| storeMap.put(store.getId(), store); |
| } |
| }); |
| |
| if (storeMap.size() < pdConfig.getMinStoreCount()) { |
| return false; |
| } |
| |
| // Check whether the number of online shards for each partition is greater than half |
| for (Metapb.ShardGroup group : this.getShardGroups()) { |
| int count = 0; |
| for (Metapb.Shard shard : group.getShardsList()) { |
| long storeId = shard.getStoreId(); |
| count += storeMap.containsKey(storeId) ? 1 : 0; |
| } |
| if (count * 2 < group.getShardsList().size()) { |
| return false; |
| } |
| } |
| } catch (PDException e) { |
| log.error("StoreNodeService checkStoreCanOffline exception {}", e); |
| return false; |
| } |
| |
| return true; |
| } |
| |
| /** |
| * Compaction on rocksdb on the store |
| * |
| * @param groupId |
| * @param tableName |
| * @return |
| */ |
| public synchronized void shardGroupsDbCompaction(int groupId, String tableName) throws |
| PDException { |
| |
| // Notify all stores to compaction rocksdb |
| partitionService.fireDbCompaction(groupId, tableName); |
| // TODO How to deal with exceptions? |
| } |
| |
| public Map getQuota() throws PDException { |
| List<Metapb.Graph> graphs = partitionService.getGraphs(); |
| String delimiter = String.valueOf(MetadataKeyHelper.DELIMITER); |
| HashMap<String, Long> storages = new HashMap<>(); |
| for (Metapb.Graph g : graphs) { |
| String graphName = g.getGraphName(); |
| String[] splits = graphName.split(delimiter); |
| if (!graphName.endsWith("/g") || splits.length < 2) { |
| continue; |
| } |
| String graphSpace = splits[0]; |
| storages.putIfAbsent(graphSpace, 0L); |
| List<Metapb.Store> stores = getStores(graphName); |
| long dataSize = 0; |
| for (Metapb.Store store : stores) { |
| List<Metapb.GraphStats> gss = store.getStats() |
| .getGraphStatsList(); |
| for (Metapb.GraphStats gs : gss) { |
| boolean nameEqual = graphName.equals(gs.getGraphName()); |
| boolean roleEqual = Metapb.ShardRole.Leader.equals( |
| gs.getRole()); |
| if (nameEqual && roleEqual) { |
| dataSize += gs.getApproximateSize(); |
| } |
| } |
| } |
| Long size = storages.get(graphSpace); |
| size += dataSize; |
| storages.put(graphSpace, size); |
| |
| } |
| Metapb.GraphSpace.Builder spaceBuilder = Metapb.GraphSpace.newBuilder(); |
| HashMap<String, Boolean> limits = new HashMap<>(); |
| for (Map.Entry<String, Long> item : storages.entrySet()) { |
| String spaceName = item.getKey(); |
| String value = kvService.get(graphSpaceConfPrefix + spaceName); |
| if (!StringUtils.isEmpty(value)) { |
| HashMap config = new Gson().fromJson(value, HashMap.class); |
| Long size = item.getValue(); |
| int limit = ((Double) config.get("storage_limit")).intValue(); |
| long limitByLong = limit * 1024L * 1024L; |
| try { |
| spaceBuilder.setName(spaceName).setStorageLimit(limitByLong).setUsedSize(size); |
| Metapb.GraphSpace graphSpace = spaceBuilder.build(); |
| configService.setGraphSpace(graphSpace); |
| } catch (Exception e) { |
| log.error("update graph space with error:", e); |
| } |
| // KB and GB * 1024L * 1024L |
| if (size > limitByLong) { |
| limits.put(spaceName, true); |
| continue; |
| } |
| } |
| limits.put(spaceName, false); |
| |
| } |
| GraphState.Builder stateBuilder = GraphState.newBuilder() |
| .setMode(GraphMode.ReadOnly) |
| .setReason( |
| GraphModeReason.Quota); |
| for (Metapb.Graph g : graphs) { |
| String graphName = g.getGraphName(); |
| String[] splits = graphName.split(delimiter); |
| if (!graphName.endsWith("/g") || splits.length < 2) { |
| continue; |
| } |
| String graphSpace = splits[0]; |
| Metapb.GraphState gsOld = g.getGraphState(); |
| GraphMode gmOld = gsOld != null ? gsOld.getMode() : GraphMode.ReadWrite; |
| GraphMode gmNew = limits.get( |
| graphSpace) ? GraphMode.ReadOnly : GraphMode.ReadWrite; |
| if (gmOld == null || gmOld.getNumber() != gmNew.getNumber()) { |
| stateBuilder.setMode(gmNew); |
| if (gmNew.getNumber() == GraphMode.ReadOnly.getNumber()) { |
| stateBuilder.setReason(GraphModeReason.Quota); |
| } |
| GraphState gsNew = stateBuilder.build(); |
| Metapb.Graph newGraph = g.toBuilder().setGraphState(gsNew) |
| .build(); |
| partitionService.updateGraph(newGraph); |
| statusListeners.forEach(listener -> { |
| listener.onGraphChange(newGraph, gsOld, gsNew); |
| }); |
| } |
| } |
| |
| return limits; |
| } |
| |
| public Runnable getQuotaChecker() { |
| return quotaChecker; |
| } |
| |
| public TaskInfoMeta getTaskInfoMeta() { |
| return taskInfoMeta; |
| } |
| |
| public StoreInfoMeta getStoreInfoMeta() { |
| return storeInfoMeta; |
| } |
| |
| /** |
| * Get the leader of the partition |
| * |
| * @param partition |
| * @param initIdx |
| * @return |
| */ |
| public Metapb.Shard getLeader(Metapb.Partition partition, int initIdx) { |
| Metapb.Shard leader = null; |
| try { |
| var shardGroup = this.getShardGroup(partition.getId()); |
| for (Metapb.Shard shard : shardGroup.getShardsList()) { |
| if (shard.getRole() == Metapb.ShardRole.Leader) { |
| leader = shard; |
| } |
| } |
| } catch (Exception e) { |
| log.error("get leader error: group id:{}, error: {}", |
| partition.getId(), e.getMessage()); |
| } |
| return leader; |
| } |
| |
| public CacheResponse getCache() throws PDException { |
| |
| List<Metapb.Store> stores = getStores(); |
| List<Metapb.ShardGroup> groups = getShardGroups(); |
| List<Metapb.Graph> graphs = partitionService.getGraphs(); |
| CacheResponse cache = CacheResponse.newBuilder().addAllGraphs(graphs) |
| .addAllShards(groups) |
| .addAllStores(stores) |
| .build(); |
| return cache; |
| } |
| } |