/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hugegraph.pd;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.hugegraph.pd.common.KVPair;
import org.apache.hugegraph.pd.common.PDException;
import org.apache.hugegraph.pd.common.PartitionUtils;
import org.apache.hugegraph.pd.config.PDConfig;
import org.apache.hugegraph.pd.grpc.MetaTask;
import org.apache.hugegraph.pd.grpc.Metapb;
import org.apache.hugegraph.pd.grpc.Pdpb;
import org.apache.hugegraph.pd.grpc.pulse.ChangeShard;
import org.apache.hugegraph.pd.grpc.pulse.CleanPartition;
import org.apache.hugegraph.pd.grpc.pulse.CleanType;
import org.apache.hugegraph.pd.grpc.pulse.ConfChangeType;
import org.apache.hugegraph.pd.grpc.pulse.DbCompaction;
import org.apache.hugegraph.pd.grpc.pulse.MovePartition;
import org.apache.hugegraph.pd.grpc.pulse.PartitionKeyRange;
import org.apache.hugegraph.pd.grpc.pulse.SplitPartition;
import org.apache.hugegraph.pd.grpc.pulse.TransferLeader;
import org.apache.hugegraph.pd.meta.MetadataFactory;
import org.apache.hugegraph.pd.meta.PartitionMeta;
import org.apache.hugegraph.pd.meta.TaskInfoMeta;
import org.apache.hugegraph.pd.raft.RaftStateListener;
import lombok.extern.slf4j.Slf4j;
/**
* Partition management
*/
@Slf4j
public class PartitionService implements RaftStateListener {
private final long Partition_Version_Skip = 0x0F;
private final StoreNodeService storeService;
private final PartitionMeta partitionMeta;
private final PDConfig pdConfig;
// Partition instruction listeners
private final List<PartitionInstructionListener> instructionListeners;
// Partition status listeners
private final List<PartitionStatusListener> statusListeners;
public PartitionService(PDConfig config, StoreNodeService storeService) {
this.pdConfig = config;
this.storeService = storeService;
partitionMeta = MetadataFactory.newPartitionMeta(config);
instructionListeners =
Collections.synchronizedList(new ArrayList<PartitionInstructionListener>());
statusListeners = Collections.synchronizedList(new ArrayList<PartitionStatusListener>());
}
public void init() throws PDException {
partitionMeta.init();
storeService.addStatusListener(new StoreStatusListener() {
@Override
public void onStoreStatusChanged(Metapb.Store store, Metapb.StoreState old,
Metapb.StoreState status) {
if (status == Metapb.StoreState.Tombstone) {
// When the store is stopped, notify all partitions of the store and migrate
// the data
storeOffline(store);
}
}
@Override
public void onGraphChange(Metapb.Graph graph,
Metapb.GraphState stateOld,
Metapb.GraphState stateNew) {
}
@Override
public void onStoreRaftChanged(Metapb.Store store) {
}
});
}
/**
* Returns the partition (with its leader shard) that owns the given key
*
* @param graphName
* @param key
* @return
*/
public Metapb.PartitionShard getPartitionShard(String graphName, byte[] key) throws
PDException {
long code = PartitionUtils.calcHashcode(key);
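// The key is hashed to a code (expected to fall in [0, PartitionUtils.MAX_VALUE)), which is
// then mapped to the partition whose [startKey, endKey) range contains it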
return getPartitionByCode(graphName, code);
}
/**
* Returns the partition to which it belongs based on the hashcode
*
* @param graphName
* @param code
* @return
*/
public Metapb.PartitionShard getPartitionByCode(String graphName, long code) throws
PDException {
if (code < 0 || code >= PartitionUtils.MAX_VALUE) {
throw new PDException(Pdpb.ErrorType.NOT_FOUND_VALUE, "code error");
}
// Look up the partition by code; if it does not exist yet, create a new one
Metapb.Partition partition = partitionMeta.getPartitionByCode(graphName, code);
if (partition == null) {
synchronized (this) {
// Re-read inside the lock so that only one thread creates the partition
partition = partitionMeta.getPartitionByCode(graphName, code);
if (partition == null) {
partition = newPartition(graphName, code);
}
}
}
Metapb.PartitionShard partShard = Metapb.PartitionShard.newBuilder()
.setPartition(partition)
.setLeader(storeService.getLeader(
partition, 0))
.build();
log.debug(
"{} Partition get code = {}, partition id = {}, start = {}, end = {}, leader = {}",
graphName, (code), partition.getId(), partition.getStartKey(),
partition.getEndKey(), partShard.getLeader());
return partShard;
}
/**
* Returns partition information based on ID
*
* @param graphName
* @param partId
* @return
* @throws PDException
*/
public Metapb.PartitionShard getPartitionShardById(String graphName, int partId) throws
PDException {
Metapb.Partition partition = partitionMeta.getPartitionById(graphName, partId);
if (partition == null) {
return null;
}
Metapb.PartitionShard partShard = Metapb.PartitionShard.newBuilder()
.setPartition(partition)
.setLeader(storeService.getLeader(
partition, 0))
.build();
return partShard;
}
public Metapb.Partition getPartitionById(String graphName, int partId) throws PDException {
return partitionMeta.getPartitionById(graphName, partId);
}
public List<Metapb.Partition> getPartitionById(int partId) throws PDException {
return partitionMeta.getPartitionById(partId);
}
/**
* Get all partitions of all graphs
*/
public List<Metapb.Partition> getPartitions() {
return partitionMeta.getPartitions();
}
public List<Metapb.Partition> getPartitions(String graphName) {
if (StringUtils.isAllEmpty(graphName)) {
return partitionMeta.getPartitions();
}
return partitionMeta.getPartitions(graphName);
}
/**
* Find all the partitions on the store
*
* @param store
* @return
*/
public List<Metapb.Partition> getPartitionByStore(Metapb.Store store) throws PDException {
List<Metapb.Partition> partitions = new ArrayList<>();
getGraphs().forEach(graph -> {
getPartitions(graph.getGraphName()).forEach(partition -> {
try {
storeService.getShardGroup(partition.getId()).getShardsList().forEach(shard -> {
if (shard.getStoreId() == store.getId()) {
partitions.add(partition);
}
});
} catch (PDException e) {
throw new RuntimeException(e);
}
});
});
return partitions;
}
/**
* Creates a new partition
*
* @param graphName
* @return
*/
private Metapb.Partition newPartition(String graphName, long code) throws PDException {
Metapb.Graph graph = partitionMeta.getAndCreateGraph(graphName);
int partitionSize = PartitionUtils.MAX_VALUE / graph.getPartitionCount();
if (PartitionUtils.MAX_VALUE % graph.getPartitionCount() != 0) {
// Round up when there is a remainder so the partitions cover the whole code range
partitionSize++;
}
int partitionId = (int) (code / partitionSize);
long startKey = (long) partitionSize * partitionId;
long endKey = (long) partitionSize * (partitionId + 1);
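// Worked example (illustrative values only): if MAX_VALUE were 65536 and the graph had
// 4 partitions, partitionSize would be 16384, so code 20000 would map to partition 1
// with key range [16384, 32768)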
// Check Local
Metapb.Partition partition = partitionMeta.getPartitionById(graphName, partitionId);
if (partition == null) {
storeService.allocShards(null, partitionId);
// Assign a store
partition = Metapb.Partition.newBuilder()
.setId(partitionId)
.setVersion(0)
.setState(Metapb.PartitionState.PState_Normal)
.setStartKey(startKey)
.setEndKey(endKey)
.setGraphName(graphName)
.build();
log.info("Create newPartition {}", partition);
}
partitionMeta.updatePartition(partition);
return partition;
}
/**
* Compute the partition id that owns the given key within a graph
*
* @param graphName graph name
* @param key       key to locate
* @return the partition id, or -1 if no partition covers the key
* @throws PDException
*/
protected int getPartitionId(String graphName, byte[] key) throws PDException {
int code = PartitionUtils.calcHashcode(key);
Metapb.Partition partition = partitionMeta.getPartitionByCode(graphName, code);
return partition != null ? partition.getId() : -1;
}
/**
* Gets all partitions spanned by the key range
* For the time being, hashcode is used for calculation, and the normal practice is to query
* based on the key
*
* @param graphName
* @param startKey
* @param endKey
*/
public List<Metapb.PartitionShard> scanPartitions(String graphName, byte[] startKey,
byte[] endKey)
throws PDException {
int startPartId = getPartitionId(graphName, startKey);
int endPartId = getPartitionId(graphName, endKey);
List<Metapb.PartitionShard> partShards = new ArrayList<>();
for (int id = startPartId; id <= endPartId; id++) {
Metapb.Partition partition = partitionMeta.getPartitionById(graphName, id);
partShards.add(
Metapb.PartitionShard.newBuilder()
.setPartition(partition)
// Here you need to return the correct leader, and
// temporarily default to the first one
.setLeader(storeService.getLeader(partition, 0))
.build()
);
}
return partShards;
}
public synchronized long updatePartition(List<Metapb.Partition> partitions) throws PDException {
for (Metapb.Partition pt : partitions) {
Metapb.Partition oldPt = getPartitionById(pt.getGraphName(), pt.getId());
partitionMeta.updatePartition(pt);
onPartitionChanged(oldPt, pt);
}
return partitions.size();
}
/**
* Update the status of partitions and graphs
*
* @param graph
* @param partId
* @param state
* @throws PDException
*/
public synchronized void updatePartitionState(String graph, int partId,
Metapb.PartitionState state) throws PDException {
Metapb.Partition partition = getPartitionById(graph, partId);
if (partition.getState() != state) {
Metapb.Partition newPartition = partitionMeta.updatePartition(partition.toBuilder()
.setState(state)
.build());
onPartitionChanged(partition, newPartition);
}
}
public synchronized void updateGraphState(String graphName, Metapb.PartitionState state) throws
PDException {
Metapb.Graph graph = getGraph(graphName);
if (graph != null) {
partitionMeta.updateGraph(graph.toBuilder()
.setState(state).build());
}
}
public synchronized long removePartition(String graphName, int partId) throws PDException {
log.info("Partition {}-{} removePartition", graphName, partId);
Metapb.Partition partition = partitionMeta.getPartitionById(graphName, partId);
var ret = partitionMeta.removePartition(graphName, partId);
partitionMeta.reload();
onPartitionRemoved(partition);
try {
Metapb.PartitionState state = Metapb.PartitionState.PState_Normal;
for (Metapb.Partition pt : partitionMeta.getPartitions(partition.getGraphName())) {
if (pt.getState().getNumber() > state.getNumber()) {
state = pt.getState();
}
}
updateGraphState(partition.getGraphName(), state);
state = Metapb.PartitionState.PState_Normal;
for (Metapb.ShardGroup group : storeService.getShardGroups()) {
if (group.getState().getNumber() > state.getNumber()) {
state = group.getState();
}
}
storeService.updateClusterStatus(state);
} catch (PDException e) {
log.error("onPartitionChanged", e);
}
return ret;
}
public Metapb.PartitionStats getPartitionStats(String graphName, int partitionId) throws
PDException {
return partitionMeta.getPartitionStats(graphName, partitionId);
}
/**
* Get the partition statistics of the graph
*/
public List<Metapb.PartitionStats> getPartitionStatus(String graphName)
throws PDException {
return partitionMeta.getPartitionStats(graphName);
}
/**
* Returns the information of all graphs
*/
public List<Metapb.Graph> getGraphs() throws PDException {
return partitionMeta.getGraphs();
}
public Metapb.Graph getGraph(String graphName) throws PDException {
return partitionMeta.getGraph(graphName);
}
/**
* Delete the graph and all of its partitions
*/
public Metapb.Graph delGraph(String graphName) throws PDException {
log.info("delGraph {}", graphName);
Metapb.Graph graph = getGraph(graphName);
getPartitions(graphName).forEach(partition -> {
onPartitionRemoved(partition);
});
partitionMeta.removeAllPartitions(graphName);
partitionMeta.removeGraph(graphName);
return graph;
}
/**
* Modify the graph information; the stores need to be notified
*/
public synchronized Metapb.Graph updateGraph(Metapb.Graph graph) throws PDException {
Metapb.Graph lastGraph = partitionMeta.getAndCreateGraph(graph.getGraphName());
log.info("updateGraph graph: {}, last: {}", graph, lastGraph);
int partCount =
(graph.getGraphName().endsWith("/s") || graph.getGraphName().endsWith("/m")) ?
1 : pdConfig.getPartition().getTotalCount();
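// Graph names ending with "/s" or "/m" (presumably schema and meta data) only need a
// single partition; other graphs use the configured total partition count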
// set the partition count to specified if legal.
if (graph.getPartitionCount() <= partCount && graph.getPartitionCount() > 0) {
partCount = graph.getPartitionCount();
}
if (partCount == 0) {
throw new PDException(10010, "update graph error, partition count = 0");
}
graph = lastGraph.toBuilder()
.mergeFrom(graph)
.setPartitionCount(partCount)
.build();
partitionMeta.updateGraph(graph);
// The number of partitions has changed
if (lastGraph.getPartitionCount() != graph.getPartitionCount()) {
log.info("updateGraph graph: {}, partition count changed from {} to {}",
graph.getGraphName(), lastGraph.getPartitionCount(),
graph.getPartitionCount());
}
return graph;
}
// partitionId -> (storeId -> shard committedIndex)
public Map<Integer, Map<Long, Long>> getCommittedIndexStats() throws PDException {
Map<Integer, Map<Long, Long>> map = new HashMap<>();
for (Metapb.Store store : storeService.getActiveStores()) {
for (Metapb.RaftStats raftStats : store.getStats().getRaftStatsList()) {
int partitionID = raftStats.getPartitionId();
Map<Long, Long> storeMap = map.computeIfAbsent(partitionID, k -> new HashMap<>());
storeMap.putIfAbsent(store.getId(), raftStats.getCommittedIndex());
}
}
return map;
}
/**
* The storage is taken offline and the partition data is migrated
*
* @param store
*/
public void storeOffline(Metapb.Store store) {
try {
log.info("storeOffline store id: {}, address: {}, state: {}",
store.getId(), store.getAddress(), store.getState());
List<Metapb.Partition> partitions = getPartitionByStore(store);
var partIds = new HashSet<Integer>();
for (Metapb.Partition p : partitions) {
if (partIds.contains(p.getId())) {
continue;
}
shardOffline(p, store.getId());
partIds.add(p.getId());
}
} catch (PDException e) {
log.error("storeOffline exception: ", e);
}
}
/**
* The storage is taken offline and the partition data is migrated
*/
public synchronized void shardOffline(Metapb.Partition partition, long storeId) {
try {
log.info("shardOffline Partition {} - {} shardOffline store : {}",
partition.getGraphName(), partition.getId(), storeId);
// partition = getPartitionById(partition.getGraphName(), partition.getId());
// Metapb.Partition.Builder builder = Metapb.Partition.newBuilder(partition);
// builder.clearShards();
// partition.getShardsList().forEach(shard -> {
// if (shard.getStoreId() != storeId)
// builder.addShards(shard);
// });
// partition = builder.build();
Metapb.Graph graph = getGraph(partition.getGraphName());
reallocPartitionShards(graph, partition);
} catch (PDException e) {
log.error("storeOffline exception: ", e);
}
}
private boolean isShardListEquals(List<Metapb.Shard> list1, List<Metapb.Shard> list2) {
if (list1 == list2) {
return true;
} else if (list1 != null && list2 != null) {
var s1 = list1.stream().map(Metapb.Shard::getStoreId).sorted(Long::compare)
.collect(Collectors.toList());
var s2 = list2.stream().map(Metapb.Shard::getStoreId).sorted(Long::compare)
.collect(Collectors.toList());
if (s1.size() == s2.size()) {
for (int i = 0; i < s1.size(); i++) {
if (!s1.get(i).equals(s2.get(i))) {
return false;
}
}
return true;
}
}
return false;
}
/**
* Reassign shards
*
* @param graph
* @param partition
* @throws PDException
*/
public void reallocPartitionShards(Metapb.Graph graph, Metapb.Partition partition) throws
PDException {
if (partition == null) {
return;
}
List<Metapb.Shard> originalShards = storeService.getShardList(partition.getId());
var shardGroup = storeService.getShardGroup(partition.getId());
List<Metapb.Shard> shards = storeService.reallocShards(shardGroup);
// Only notify the stores when the shard allocation actually changed
if (!isShardListEquals(originalShards, shards)) {
log.info("reallocPartitionShards:{} vs {}", shardGroup, shards);
// partition = Metapb.Partition.newBuilder(partition)
// .clearShards().addAllShards(shards)
// .build();
// partitionMeta.updatePartition(partition);
fireChangeShard(partition, shards, ConfChangeType.CONF_CHANGE_TYPE_ADJUST);
}
}
public synchronized void reallocPartitionShards(String graphName, int partitionId) throws
PDException {
reallocPartitionShards(partitionMeta.getGraph(graphName),
partitionMeta.getPartitionById(graphName, partitionId));
}
/**
* Migrate a partition replica (shard) from one store to another
*/
public synchronized void movePartitionsShard(Integer partitionId, long fromStore,
long toStore) {
try {
log.info("movePartitionsShard partitionId {} from store {} to store {}", partitionId,
fromStore, toStore);
for (Metapb.Graph graph : getGraphs()) {
Metapb.Partition partition =
this.getPartitionById(graph.getGraphName(), partitionId);
if (partition == null) {
continue;
}
var shardGroup = storeService.getShardGroup(partitionId);
List<Metapb.Shard> shards = new ArrayList<>();
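// Rebuild the shard list: keep every shard except the one on fromStore and add a new
// follower on toStore; the raft membership change itself is driven by fireChangeShard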
shardGroup.getShardsList().forEach(shard -> {
if (shard.getStoreId() != fromStore) {
shards.add(shard);
}
});
shards.add(Metapb.Shard.newBuilder().setStoreId(toStore)
.setRole(Metapb.ShardRole.Follower).build());
// storeService.updateShardGroup(partitionId, shards, -1, -1);
// storeService.onShardGroupStatusChanged(shardGroup, newShardGroup);
fireChangeShard(partition, shards, ConfChangeType.CONF_CHANGE_TYPE_ADJUST);
// Shard groups are shared across graphs, so handling a single graph is enough
break;
}
} catch (PDException e) {
log.error("Partition {} movePartitionsShard exception", partitionId, e);
}
}
/**
* Split the specified partitions of the cluster
*
* @param splits pairs of (partition id, number of pieces to split into)
*/
public synchronized void splitPartition(List<KVPair<Integer, Integer>> splits) throws
PDException {
var tasks = new HashMap<String, List<KVPair<Integer, Integer>>>();
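// Group the split requests by graph: the same partition id can exist in several graphs,
// and each graph's partitions are split together below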
for (var pair : splits) {
for (var partition : getPartitionById(pair.getKey())) {
tasks.computeIfAbsent(partition.getGraphName(), k -> new ArrayList<>()).add(pair);
}
}
for (var entry : tasks.entrySet()) {
splitPartition(getGraph(entry.getKey()), entry.getValue());
}
}
/**
* Partition splitting, splitting a graph into N pieces
*
* @param graph graph
* @param toCount target count
* @throws PDException
*/
public synchronized void splitPartition(Metapb.Graph graph, int toCount) throws PDException {
var partitionCount = getPartitions(graph.getGraphName()).size();
var maxShardsPerStore = pdConfig.getPartition().getMaxShardsPerStore();
var shardCount = pdConfig.getPartition().getShardCount();
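// Capacity check example (illustrative numbers): with shardCount = 3, 4 active stores and
// maxShardsPerStore = 12 the cluster can hold 48 shards, so toCount may not exceed 16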
if (shardCount * toCount > storeService.getActiveStores().size() * maxShardsPerStore) {
throw new PDException(Pdpb.ErrorType.Too_Many_Partitions_Per_Store_VALUE,
"can't satisfy target shard group count, reached the upper " +
"limit of the cluster");
}
if (toCount % partitionCount != 0 || toCount <= partitionCount) {
throw new PDException(Pdpb.ErrorType.Invalid_Split_Partition_Count_VALUE,
"invalid split partition count, toCount must be an integral " +
"multiple of the current partition count");
}
// Since toCount is an integral multiple, each partition is split into toCount / partitionCount pieces
var splitCount = toCount / partitionCount;
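// e.g. going from 4 partitions to toCount = 12 gives splitCount = 3: every existing
// partition is split into 3 new key ranges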
var list = new ArrayList<KVPair<Integer, Integer>>();
for (int i = 0; i < partitionCount; i++) {
list.add(new KVPair<>(i, splitCount));
}
splitPartition(graph, list);
}
private synchronized void splitPartition(Metapb.Graph graph,
List<KVPair<Integer, Integer>> splits)
throws PDException {
var taskInfoMeta = storeService.getTaskInfoMeta();
if (taskInfoMeta.scanSplitTask(graph.getGraphName()).size() > 0) {
return;
}
splits.sort(Comparator.comparing(KVPair::getKey));
log.info("split partition, graph: {}, splits:{}", graph, splits);
// New partition ids start right after the last existing partition index
var i = getPartitions(graph.getGraphName()).size();
for (var pair : splits) {
Metapb.Partition partition =
partitionMeta.getPartitionById(graph.getGraphName(), pair.getKey());
if (partition != null) {
var splitCount = pair.getValue();
long splitLen = (partition.getEndKey() - partition.getStartKey()) / splitCount;
List<Metapb.Partition> newPartitions = new ArrayList<>();
// The first partition is the original partition
newPartitions.add(partition.toBuilder()
.setStartKey(partition.getStartKey())
.setEndKey(partition.getStartKey() + splitLen)
.setId(partition.getId())
.setState(Metapb.PartitionState.PState_Offline)
.build());
int idx = 0;
for (; idx < splitCount - 2; idx++) {
newPartitions.add(partition.toBuilder()
.setStartKey(newPartitions.get(idx).getEndKey())
.setEndKey(newPartitions.get(idx).getEndKey() +
splitLen)
.setId(i)
.setState(Metapb.PartitionState.PState_Offline)
.build());
i += 1;
}
newPartitions.add(partition.toBuilder()
.setStartKey(newPartitions.get(idx).getEndKey())
.setEndKey(partition.getEndKey())
.setId(i)
.setState(Metapb.PartitionState.PState_Offline)
.build());
i += 1;
// try to save new partitions, and repair shard group
for (int j = 0; j < newPartitions.size(); j++) {
var newPartition = newPartitions.get(j);
if (j != 0) {
partitionMeta.updatePartition(newPartition);
}
// If the shard group does not exist, create it from the source partition's
// shard group so the new partition stays on the same machines.
// If it already exists (graphs may have different partition counts), the
// store side is responsible for copying the data to the other machines.
var shardGroup = storeService.getShardGroup(newPartition.getId());
if (shardGroup == null) {
shardGroup = storeService.getShardGroup(partition.getId()).toBuilder()
.setId(newPartition.getId())
.build();
storeService.getStoreInfoMeta().updateShardGroup(shardGroup);
updateShardGroupCache(shardGroup);
}
// check shard list
if (shardGroup.getShardsCount() != pdConfig.getPartition().getShardCount()) {
storeService.reallocShards(shardGroup);
}
}
SplitPartition splitPartition = SplitPartition.newBuilder()
.addAllNewPartition(newPartitions)
.build();
fireSplitPartition(partition, splitPartition);
// Change the partition state to Offline; the state is restored after the
// split task completes
updatePartitionState(partition.getGraphName(), partition.getId(),
Metapb.PartitionState.PState_Offline);
// Record transactions
var task = MetaTask.Task.newBuilder().setPartition(partition)
.setSplitPartition(splitPartition)
.build();
taskInfoMeta.addSplitTask(pair.getKey(), task.getPartition(),
task.getSplitPartition());
}
}
}
/**
* Transfer the leader to another shard.
* Only a single partition needs to be transferred
*/
public void transferLeader(Integer partId, Metapb.Shard shard) {
try {
var partitions = getPartitionById(partId);
if (partitions.size() > 0) {
fireTransferLeader(partitions.get(0),
TransferLeader.newBuilder().setShard(shard).build());
}
// for (Metapb.Graph graph : getGraphs()) {
// Metapb.Partition partition = this.getPartitionById(graph.getGraphName(), partId);
// if (partition != null) {
// fireTransferLeader(partition, TransferLeader.newBuilder().setShard(shard)
// .build());
// }
// }
} catch (PDException e) {
log.error("Partition {} transferLeader exception", partId, e);
}
}
/**
* // todo : Check the corresponding store group and check the logic
* Partition merging: merge the partitions of the entire cluster down to toCount
*
* @param toCount the target number of partitions
* @throws PDException when query errors
*/
public void combinePartition(int toCount) throws PDException {
int shardsTotalCount = getShardGroupCount();
for (var graph : getGraphs()) {
// Every graph with more than toCount partitions is scaled in
if (graph.getPartitionCount() > toCount) {
combineGraphPartition(graph, toCount, shardsTotalCount);
}
}
}
/**
* For a single graph, perform partition merging
*
* @param graphName the name of the graph
* @param toCount the target partition count
* @throws PDException when query errors
*/
public void combineGraphPartition(String graphName, int toCount) throws PDException {
combineGraphPartition(getGraph(graphName), toCount, getShardGroupCount());
}
/**
* Internal implementation of single-graph merging
*
* @param graph the graph to merge
* @param toCount the target partition count
* @param shardCount the shard count of the clusters
* @throws PDException when query errors
*/
private synchronized void combineGraphPartition(Metapb.Graph graph, int toCount, int shardCount)
throws PDException {
if (graph == null) {
throw new PDException(1,
"Graph not exists, try to use full graph name, like " +
"/DEFAULT/GRAPH_NAME/g");
}
log.info("Combine graph {} partition, from {}, to {}, with shard count:{}",
graph.getGraphName(), graph.getPartitionCount(), toCount, shardCount);
if (!checkTargetCount(graph.getPartitionCount(), toCount, shardCount)) {
log.error("Combine partition, illegal toCount:{}, graph:{}", toCount,
graph.getGraphName());
throw new PDException(2,
"illegal partition toCount, it should be between 1 and the " +
"current partition count, divide the current partition count " +
"evenly, and be less than the shard group count");
}
var taskInfoMeta = storeService.getTaskInfoMeta();
if (taskInfoMeta.scanMoveTask(graph.getGraphName()).size() > 0) {
throw new PDException(3, "Graph Combine process exists");
}
// Sort partitions by start key
var partitions = getPartitions(graph.getGraphName()).stream()
.sorted(Comparator.comparing(
Metapb.Partition::getStartKey))
.collect(Collectors.toList());
// Partition numbers do not have to be sequential
var sortPartitions = getPartitions(graph.getGraphName())
.stream()
.sorted(Comparator.comparing(Metapb.Partition::getId))
.collect(Collectors.toList());
var groupSize = partitions.size() / toCount; // merge group size
// e.g. 12 partitions (0~11) merged into 4
// scheme: 0,1,2 -> 0, 3,4,5 -> 1, 6,7,8 -> 2, 9,10,11 -> 3
// Ensure the continuity of partitions
for (int i = 0; i < toCount; i++) {
var startKey = partitions.get(i * groupSize).getStartKey();
var endKey = partitions.get(i * groupSize + groupSize - 1).getEndKey();
// Compose the target key range;
// the new start and end key take effect only if the combine succeeds
var targetPartition = Metapb.Partition.newBuilder(sortPartitions.get(i))
.setStartKey(startKey)
.setEndKey(endKey)
.build();
for (int j = 0; j < groupSize; j++) {
var partition = partitions.get(i * groupSize + j);
// Skip when the source partition is the merge target itself
if (targetPartition.getId() == partition.getId()) {
continue;
}
log.info("combine partition of graph :{}, from part id {} to {}",
partition.getGraphName(),
partition.getId(), targetPartition.getId());
MovePartition movePartition = MovePartition.newBuilder()
.setTargetPartition(targetPartition)
.setKeyStart(partition.getStartKey())
.setKeyEnd(partition.getEndKey())
.build();
taskInfoMeta.addMovePartitionTask(partition, movePartition);
// source is offline
updatePartitionState(partition.getGraphName(), partition.getId(),
Metapb.PartitionState.PState_Offline);
fireMovePartition(partition, movePartition);
}
// target offline
updatePartitionState(targetPartition.getGraphName(), targetPartition.getId(),
Metapb.PartitionState.PState_Offline);
}
storeService.updateClusterStatus(Metapb.ClusterState.Cluster_Offline);
}
/**
* get raft group count from storeService
*
* @return the count of raft groups
*/
private int getShardGroupCount() {
try {
return Optional.ofNullable(storeService.getShardGroups()).orElseGet(ArrayList::new)
.size();
} catch (PDException e) {
log.error("get shard group failed", e);
}
return 0;
}
/**
* Determine whether the partition count of a graph can be reduced from fromCount to toCount
*
* @param fromCount  the current number of partitions
* @param toCount    the target number of partitions
* @param shardCount the number of shard groups in the cluster
* @return true if the merge is allowed, false otherwise
*/
private boolean checkTargetCount(int fromCount, int toCount, int shardCount) {
// It should be between 1 ~ N and divisible
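// e.g. with fromCount = 12 and 12 shard groups, the valid values of toCount are 1, 2, 3, 4 and 6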
return toCount >= 1 && toCount < fromCount && fromCount % toCount == 0 &&
toCount < shardCount;
}
/**
* Process partition heartbeats and record leader information
* Check the term and version to see if it's the latest message
*
* @param stats
*/
public void partitionHeartbeat(Metapb.PartitionStats stats) throws PDException {
Metapb.ShardGroup shardGroup = storeService.getShardGroup(stats.getId());
// The shard group version or leader has changed.
// (The shard group is managed by PD; after operations such as splitting there may be
// brief inconsistencies, in which case PD is authoritative.)
// The store uploads the latest raft group data.
if (shardGroup != null &&
(shardGroup.getVersion() < stats.getLeaderTerm() ||
shardGroup.getConfVer() < stats.getConfVer())) {
storeService.updateShardGroup(stats.getId(),
stats.getShardList(), stats.getLeaderTerm(),
stats.getConfVer());
}
List<Metapb.Partition> partitions = getPartitionById(stats.getId());
for (Metapb.Partition partition : partitions) {
// partitionMeta.getAndCreateGraph(partition.getGraphName());
checkShardState(partition, stats);
}
// statistics
partitionMeta.updatePartitionStats(stats.toBuilder()
.setTimestamp(System.currentTimeMillis()).build());
}
/**
* Check the shard status, offline shard affects the partition status
*
* @param stats
*/
private void checkShardState(Metapb.Partition partition, Metapb.PartitionStats stats) {
try {
int offCount = 0;
for (Metapb.ShardStats shard : stats.getShardStatsList()) {
if (shard.getState() == Metapb.ShardState.SState_Offline) {
offCount++;
}
}
if (partition.getState() != Metapb.PartitionState.PState_Offline) {
if (offCount == 0) {
updatePartitionState(partition.getGraphName(), partition.getId(),
Metapb.PartitionState.PState_Normal);
} else if (offCount * 2 < stats.getShardStatsCount()) {
updatePartitionState(partition.getGraphName(), partition.getId(),
Metapb.PartitionState.PState_Warn);
} else {
updatePartitionState(partition.getGraphName(), partition.getId(),
Metapb.PartitionState.PState_Warn);
}
}
} catch (Exception e) {
log.error("Partition {}-{} checkShardState exception",
partition.getGraphName(), partition.getId(), e);
}
}
public void addInstructionListener(PartitionInstructionListener event) {
instructionListeners.add(event);
}
public void addStatusListener(PartitionStatusListener listener) {
statusListeners.add(listener);
}
/**
* Initiates the Change Shard command
*
* @param changeType
*/
protected void fireChangeShard(Metapb.Partition partition, List<Metapb.Shard> shards,
ConfChangeType changeType) {
log.info("fireChangeShard partition: {}-{}, changeType:{} {}", partition.getGraphName(),
partition.getId(), changeType, shards);
instructionListeners.forEach(cmd -> {
try {
cmd.changeShard(partition, ChangeShard.newBuilder()
.addAllShard(shards).setChangeType(changeType)
.build());
} catch (Exception e) {
log.error("fireChangeShard", e);
}
});
}
public void changeShard(int groupId, List<Metapb.Shard> shards) throws PDException {
var partitions = getPartitionById(groupId);
if (partitions.size() == 0) {
return;
}
fireChangeShard(partitions.get(0), shards, ConfChangeType.CONF_CHANGE_TYPE_ADJUST);
}
/**
* Send a partition split message
*
* @param partition
*/
protected void fireSplitPartition(Metapb.Partition partition, SplitPartition splitPartition) {
log.info("fireSplitPartition partition: {}-{}, split :{}",
partition.getGraphName(), partition.getId(), splitPartition);
instructionListeners.forEach(cmd -> {
try {
cmd.splitPartition(partition, splitPartition);
} catch (Exception e) {
log.error("fireSplitPartition", e);
}
});
}
/**
* Send a Leader Switchover message
*/
protected void fireTransferLeader(Metapb.Partition partition, TransferLeader transferLeader) {
log.info("fireTransferLeader partition: {}-{}, leader :{}",
partition.getGraphName(), partition.getId(), transferLeader);
instructionListeners.forEach(cmd -> {
try {
cmd.transferLeader(partition, transferLeader);
} catch (Exception e) {
log.error("fireTransferLeader", e);
}
});
}
/**
* Send a message to the partition to move data
*
* @param partition Original partition
* @param movePartition the target partition, including the key range to move
*/
protected void fireMovePartition(Metapb.Partition partition, MovePartition movePartition) {
log.info("fireMovePartition partition: {} -> {}",
partition, movePartition);
instructionListeners.forEach(cmd -> {
try {
cmd.movePartition(partition, movePartition);
} catch (Exception e) {
log.error("fireMovePartition", e);
}
});
}
protected void fireCleanPartition(Metapb.Partition partition, CleanPartition cleanPartition) {
log.info("fireCleanPartition partition: {}, key range: {}->{}, clean type: {}",
partition.getId(), cleanPartition.getKeyStart(), cleanPartition.getKeyEnd(),
cleanPartition.getCleanType());
instructionListeners.forEach(cmd -> {
try {
cmd.cleanPartition(partition, cleanPartition);
} catch (Exception e) {
log.error("cleanPartition", e);
}
});
}
protected void fireChangePartitionKeyRange(Metapb.Partition partition,
PartitionKeyRange partitionKeyRange) {
log.info("fireChangePartitionKeyRange partition: {}-{} -> key range {}",
partition.getGraphName(), partition.getId(), partitionKeyRange);
instructionListeners.forEach(cmd -> {
try {
cmd.changePartitionKeyRange(partition, partitionKeyRange);
} catch (Exception e) {
log.error("fireChangePartitionKeyRange", e);
}
});
}
/**
* Handle graph migration tasks
*
* @param task
*/
public synchronized void handleMoveTask(MetaTask.Task task) throws PDException {
var taskInfoMeta = storeService.getTaskInfoMeta();
var partition = task.getPartition();
var movePartition = task.getMovePartition();
MetaTask.Task pdMetaTask = taskInfoMeta.getMovePartitionTask(partition.getGraphName(),
movePartition.getTargetPartition()
.getId(),
partition.getId());
log.info("report move task, graph:{}, pid : {}->{}, state: {}",
task.getPartition().getGraphName(),
task.getPartition().getId(), task.getMovePartition().getTargetPartition().getId(),
task.getState());
// Only handle move tasks that PD recorded when the merge was initiated
if (pdMetaTask != null) {
var newTask = pdMetaTask.toBuilder().setState(task.getState()).build();
taskInfoMeta.updateMovePartitionTask(newTask);
List<MetaTask.Task> subTasks = taskInfoMeta.scanMoveTask(partition.getGraphName());
var finished = subTasks.stream().allMatch(t ->
t.getState() ==
MetaTask.TaskState.Task_Success ||
t.getState() ==
MetaTask.TaskState.Task_Failure);
if (finished) {
var allSuccess = subTasks.stream().allMatch(
t -> t.getState() == MetaTask.TaskState.Task_Success);
if (allSuccess) {
log.info("graph:{} combine task all success!", partition.getGraphName());
handleMoveTaskAllSuccess(subTasks, partition.getGraphName(), taskInfoMeta);
} else {
log.info("graph:{} combine task failed!", partition.getGraphName());
handleMoveTaskIfFailed(partition.getGraphName(), taskInfoMeta);
}
}
}
}
/**
* When all migration subtasks succeed:
* 1. Send clean-up directives for the source partitions
* 2. Bring the target partitions online, update their key ranges and the graph's partition count
* 3. Delete the move tasks, ending the transaction
*
* @param subTasks all move sub tasks
* @param graphName graph name
* @param taskInfoMeta task info meta
* @throws PDException if writing to the db fails
*/
private void handleMoveTaskAllSuccess(List<MetaTask.Task> subTasks, String graphName,
TaskInfoMeta taskInfoMeta) throws PDException {
var targetPartitionIds = new HashSet<Integer>();
var targetPartitions = new ArrayList<Metapb.Partition>();
var deleteFlags =
subTasks.stream().map(task -> task.getMovePartition().getTargetPartition().getId())
.collect(Collectors.toSet());
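// deleteFlags holds the ids of partitions that are themselves merge targets: such a
// source only has its data cleaned below, the partition itself is not deleted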
for (MetaTask.Task subTask : subTasks) {
var source = subTask.getPartition();
var targetPartition = subTask.getMovePartition().getTargetPartition();
// Skip target partitions that have already been handled
if (!targetPartitionIds.contains(targetPartition.getId())) {
// Update the key range
var old = getPartitionById(targetPartition.getGraphName(), targetPartition.getId());
var newPartition = Metapb.Partition.newBuilder(old)
.setStartKey(targetPartition.getStartKey())
.setEndKey(targetPartition.getEndKey())
.setState(Metapb.PartitionState.PState_Normal)
.build();
// Persist the update before sending the key range change, so that a store missing
// the partition can still resolve it from the PD
updatePartition(List.of(newPartition));
targetPartitions.add(newPartition);
// Send key range change messages
PartitionKeyRange partitionKeyRange = PartitionKeyRange.newBuilder()
.setPartitionId(old.getId())
.setKeyStart(
targetPartition.getStartKey())
.setKeyEnd(
targetPartition.getEndKey())
.build();
// Notify the store
fireChangePartitionKeyRange(
old.toBuilder().setState(Metapb.PartitionState.PState_Normal).build(),
partitionKeyRange);
// Bring the target online. The source may already be deleted, so it is not
// handled here
updatePartitionState(newPartition.getGraphName(), newPartition.getId(),
Metapb.PartitionState.PState_Normal);
targetPartitionIds.add(targetPartition.getId());
}
CleanPartition cleanPartition = CleanPartition.newBuilder()
.setKeyStart(source.getStartKey())
.setKeyEnd(source.getEndKey())
.setCleanType(
CleanType.CLEAN_TYPE_EXCLUDE_RANGE)
// A source that is itself a merge
// target only has its data cleaned;
// the partition is not deleted
.setDeletePartition(!deleteFlags.contains(
source.getId()))
.build();
log.info("pd clean data: {}-{}, key range:{}-{}, type:{}, delete partition:{}",
source.getGraphName(),
source.getId(),
cleanPartition.getKeyStart(),
cleanPartition.getKeyEnd(),
CleanType.CLEAN_TYPE_EXCLUDE_RANGE,
cleanPartition.getDeletePartition());
// Clean up the data of the partition to be moved
fireCleanPartition(source, cleanPartition);
}
// Key ranges have already been updated locally and pushed to clients above
// updatePartition(targetPartitions);
// Update the target partition states; the source may be deleted, so it is not handled
targetPartitions.forEach(p -> {
try {
updatePartitionState(p.getGraphName(), p.getId(),
Metapb.PartitionState.PState_Normal);
} catch (PDException e) {
throw new RuntimeException(e);
}
});
partitionMeta.reload();
// Update the graph's partition count
var graph = getGraph(graphName).toBuilder()
.setPartitionCount(targetPartitionIds.size())
.build();
updateGraph(graph);
// The transaction is complete
taskInfoMeta.removeMoveTaskPrefix(graphName);
}
/**
* If the scale-in task fails, roll back the merge operation
* 1. Clean up the original target partition and delete the migrated data
* 2. Bring the source/target partitions back online
* 3. Delete the tasks, ending the transaction
*
* @param graphName graph name
* @param taskInfoMeta task info meta
* @throws PDException if writing to the db fails
*/
private void handleMoveTaskIfFailed(String graphName, TaskInfoMeta taskInfoMeta) throws
PDException {
// Send clean-up tasks for the target partitions to roll them back
var targetPartitionIds = new HashSet<Integer>();
for (var metaTask : taskInfoMeta.scanMoveTask(graphName)) {
var source = metaTask.getPartition();
// Bring the source partition back online
updatePartitionState(source.getGraphName(), source.getId(),
Metapb.PartitionState.PState_Normal);
var movedPartition = metaTask.getMovePartition().getTargetPartition();
if (targetPartitionIds.contains(movedPartition.getId())) {
continue;
}
var targetPartition = getPartitionById(graphName, movedPartition.getId());
CleanPartition cleanPartition = CleanPartition.newBuilder()
.setKeyStart(
targetPartition.getStartKey())
.setKeyEnd(targetPartition.getEndKey())
.setCleanType(
CleanType.CLEAN_TYPE_KEEP_RANGE)
.setDeletePartition(false)
.build();
fireCleanPartition(targetPartition, cleanPartition);
targetPartitionIds.add(targetPartition.getId());
// Bring the target partition back online
updatePartitionState(targetPartition.getGraphName(), targetPartition.getId(),
Metapb.PartitionState.PState_Normal);
}
// Clean up the task list
taskInfoMeta.removeMoveTaskPrefix(graphName);
}
/**
* Handle a clean-task report from the store
*
* @param task clean task
*/
public void handleCleanPartitionTask(MetaTask.Task task) {
log.info("clean task {} -{}, key range:{}~{}, report: {}",
task.getPartition().getGraphName(),
task.getPartition().getId(),
task.getCleanPartition().getKeyStart(),
task.getCleanPartition().getKeyEnd(),
task.getState()
);
// If it fails, try again?
}
public synchronized void handleSplitTask(MetaTask.Task task) throws PDException {
var taskInfoMeta = storeService.getTaskInfoMeta();
var partition = task.getPartition();
MetaTask.Task pdMetaTask =
taskInfoMeta.getSplitTask(partition.getGraphName(), partition.getId());
log.info("report split task, graph:{}, pid : {}, state: {}",
task.getPartition().getGraphName(),
task.getPartition().getId(), task.getState());
if (pdMetaTask != null) {
var newTask = pdMetaTask.toBuilder().setState(task.getState()).build();
taskInfoMeta.updateSplitTask(newTask);
List<MetaTask.Task> subTasks = taskInfoMeta.scanSplitTask(partition.getGraphName());
var finished = subTasks.stream().allMatch(t ->
t.getState() ==
MetaTask.TaskState.Task_Success ||
t.getState() ==
MetaTask.TaskState.Task_Failure);
if (finished) {
var allSuccess = subTasks.stream().allMatch(
t -> t.getState() == MetaTask.TaskState.Task_Success);
if (allSuccess) {
log.info("graph:{} split task all success!", partition.getGraphName());
handleSplitTaskAllSuccess(subTasks, partition.getGraphName(), taskInfoMeta);
} else {
handleSplitTaskIfFailed(subTasks, partition.getGraphName(), taskInfoMeta);
}
}
}
}
private void handleSplitTaskAllSuccess(List<MetaTask.Task> subTasks, String graphName,
TaskInfoMeta taskInfoMeta)
throws PDException {
int addedPartitions = 0;
var partitions = new ArrayList<Metapb.Partition>();
for (MetaTask.Task subTask : subTasks) {
var source = subTask.getPartition();
var newPartition = subTask.getSplitPartition().getNewPartitionList().get(0);
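// The first entry of the new partition list keeps the original partition id, so the
// source partition simply shrinks to this new key range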
// Send key range change messages
PartitionKeyRange partitionKeyRange = PartitionKeyRange.newBuilder()
.setPartitionId(source.getId())
.setKeyStart(
newPartition.getStartKey())
.setKeyEnd(
newPartition.getEndKey())
.build();
// Notify the store
fireChangePartitionKeyRange(source, partitionKeyRange);
// Clean up the source partition, keeping only the data inside its retained key range
CleanPartition cleanPartition = CleanPartition.newBuilder()
.setKeyStart(newPartition.getStartKey())
.setKeyEnd(newPartition.getEndKey())
.setCleanType(
CleanType.CLEAN_TYPE_KEEP_RANGE)
// Only the data is cleaned up; the
// source partition itself is not
// deleted
.setDeletePartition(false)
.build();
log.info("pd clean data: {}-{}, key range:{}-{}, type:{}, delete partition:{}",
source.getGraphName(),
source.getId(),
cleanPartition.getKeyStart(),
cleanPartition.getKeyEnd(),
cleanPartition.getCleanType(),
cleanPartition.getDeletePartition());
fireCleanPartition(source, cleanPartition);
// Update the new partition states
for (var sp : subTask.getSplitPartition().getNewPartitionList()) {
partitions.add(
sp.toBuilder().setState(Metapb.PartitionState.PState_Normal).build());
}
addedPartitions += subTask.getSplitPartition().getNewPartitionCount() - 1;
}
updatePartition(partitions);
partitionMeta.reload();
var graph = getGraph(graphName);
// set partition count
if (pdConfig.getConfigService().getPartitionCount() !=
storeService.getShardGroups().size()) {
pdConfig.getConfigService().setPartitionCount(storeService.getShardGroups().size());
log.info("set the partition count of config server to {}",
storeService.getShardGroups().size());
}
// Update the graph's partition count
var newGraph = graph.toBuilder()
.setPartitionCount(graph.getPartitionCount() + addedPartitions)
.build();
updateGraph(newGraph);
// The transaction is complete
taskInfoMeta.removeSplitTaskPrefix(graphName);
}
private void handleSplitTaskIfFailed(List<MetaTask.Task> subTasks, String graphName,
TaskInfoMeta taskInfoMeta)
throws PDException {
for (var metaTask : subTasks) {
var splitPartitions = metaTask.getSplitPartition().getNewPartitionList();
for (int i = 1; i < splitPartitions.size(); i++) {
var split = splitPartitions.get(i);
CleanPartition cleanPartition = CleanPartition.newBuilder()
.setKeyStart(split.getStartKey())
.setKeyEnd(split.getEndKey())
.setCleanType(
CleanType.CLEAN_TYPE_EXCLUDE_RANGE)
.setDeletePartition(true)
.build();
fireCleanPartition(split, cleanPartition);
}
// set partition state normal
var partition = metaTask.getPartition();
updatePartitionState(partition.getGraphName(), partition.getId(),
Metapb.PartitionState.PState_Normal);
}
// Clean up the task list
taskInfoMeta.removeSplitTaskPrefix(graphName);
}
/**
* todo : What is the impact of partition changes??
* Called when a partition has changed:
* update the graph state and notify the partition status listeners
*/
protected void onPartitionChanged(Metapb.Partition old, Metapb.Partition partition) {
log.info("onPartitionChanged partition: {}", partition);
if (old != null && old.getState() != partition.getState()) {
Metapb.PartitionState state = Metapb.PartitionState.PState_Normal;
for (Metapb.Partition pt : partitionMeta.getPartitions(partition.getGraphName())) {
if (pt.getState().getNumber() > state.getNumber()) {
state = pt.getState();
}
}
try {
updateGraphState(partition.getGraphName(), state);
} catch (PDException e) {
log.error("onPartitionChanged", e);
}
}
statusListeners.forEach(e -> {
e.onPartitionChanged(old, partition);
});
}
protected void onPartitionRemoved(Metapb.Partition partition) {
log.info("onPartitionRemoved partition: {}", partition);
statusListeners.forEach(e -> {
e.onPartitionRemoved(partition);
});
}
/**
* The leader of the PD has changed and the data needs to be reloaded
*/
@Override
public void onRaftLeaderChanged() {
log.info("Partition service reload cache from rocksdb, due to leader change");
try {
partitionMeta.reload();
} catch (PDException e) {
log.error("Partition meta reload exception", e);
}
}
public void onPartitionStateChanged(String graph, int partId,
Metapb.PartitionState state) throws PDException {
updatePartitionState(graph, partId, state);
}
public void onShardStateChanged(String graph, int partId, Metapb.PartitionState state) {
}
/**
* Send rocksdb compaction message
*
* @param partId
* @param tableName
*/
public void fireDbCompaction(int partId, String tableName) {
try {
for (Metapb.Graph graph : getGraphs()) {
Metapb.Partition partition =
partitionMeta.getPartitionById(graph.getGraphName(), partId);
DbCompaction dbCompaction = DbCompaction.newBuilder()
.setTableName(tableName)
.build();
instructionListeners.forEach(cmd -> {
try {
cmd.dbCompaction(partition, dbCompaction);
} catch (Exception e) {
log.error("fireDbCompaction", e);
}
});
}
} catch (PDException e) {
log.error("fireDbCompaction", e);
}
}
public void updateShardGroupCache(Metapb.ShardGroup group) {
partitionMeta.getPartitionCache().updateShardGroup(group);
}
}