blob: 9b98cbbb0332530969ed17799da32709d322d9ba [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hugegraph.pd.common;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.hugegraph.pd.grpc.Metapb;
import com.google.common.collect.Range;
import com.google.common.collect.RangeMap;
import com.google.common.collect.TreeRangeMap;
/**
 * Partition cache. The earlier copy-on-write design was abandoned because:
 * 1. when (number of graphs x number of partitions) grows very large, copying the
 *    whole structure on every write degrades performance so severely that the
 *    cache becomes unusable.
 */
public class PartitionCache {

    // Guards bulk operations (removePartitions / reset) that replace or clear
    // the per-graph lock registry.
    private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
    // Per-graph busy flags: true while some thread is mutating that graph's caches.
    // Backed by ConcurrentHashMap so getOrCreateGraphLock can create entries
    // atomically without grabbing the global write lock (the original unlocked
    // HashMap read raced with locked writers).
    private final Map<String, AtomicBoolean> locks = new ConcurrentHashMap<>();
    Lock writeLock = readWriteLock.writeLock();
    // One key-range -> partition-id RangeMap per graph
    private volatile Map<String, RangeMap<Long, Integer>> keyToPartIdCache;
    // graphName -> (partition id -> partition)
    private volatile Map<String, Map<Integer, Metapb.Partition>> partitionCache;
    private volatile Map<Integer, Metapb.ShardGroup> shardGroupCache;
    private volatile Map<Long, Metapb.Store> storeCache;
    private volatile Map<String, Metapb.Graph> graphCache;

    public PartitionCache() {
        keyToPartIdCache = new HashMap<>();
        partitionCache = new HashMap<>();
        shardGroupCache = new ConcurrentHashMap<>();
        storeCache = new ConcurrentHashMap<>();
        graphCache = new ConcurrentHashMap<>();
    }

    /**
     * Returns the busy flag for {@code graphName}, creating it atomically on
     * first use.
     */
    private AtomicBoolean getOrCreateGraphLock(String graphName) {
        return locks.computeIfAbsent(graphName, k -> new AtomicBoolean());
    }

    /**
     * Spins until no writer holds the lock of {@code graphName}.
     * Does NOT acquire the lock; readers may still observe a writer that
     * starts immediately afterwards.
     */
    public void waitGraphLock(String graphName) {
        var lock = getOrCreateGraphLock(graphName);
        while (lock.get()) {
            Thread.onSpinWait();
        }
    }

    /**
     * Acquires the per-graph write lock, spinning while another thread holds it.
     *
     * <p>Fixes the original inverted condition
     * {@code while (lock.compareAndSet(false, true))}, which returned
     * immediately when the lock was already held (CAS failure ended the loop),
     * letting two writers mutate the same graph concurrently.
     */
    public void lockGraph(String graphName) {
        var lock = getOrCreateGraphLock(graphName);
        while (!lock.compareAndSet(false, true)) {
            Thread.onSpinWait();
        }
    }

    /**
     * Releases the per-graph write lock taken by {@link #lockGraph(String)}.
     */
    public void unlockGraph(String graphName) {
        var lock = getOrCreateGraphLock(graphName);
        lock.set(false);
    }

    /**
     * Returns partition information based on partitionId.
     *
     * @param graphName graph the partition belongs to
     * @param partId    partition id
     * @return pair of (partition, current leader shard), or null when unknown
     */
    public KVPair<Metapb.Partition, Metapb.Shard> getPartitionById(String graphName, int partId) {
        waitGraphLock(graphName);
        var graphs = partitionCache.get(graphName);
        if (graphs != null) {
            var partition = graphs.get(partId);
            if (partition != null) {
                return new KVPair<>(partition, getLeaderShard(partId));
            }
        }
        return null;
    }

    /**
     * Returns the partition information where the key is located.
     *
     * @param graphName graph the key belongs to
     * @param key       raw key bytes; hashed via {@link PartitionUtils#calcHashcode(byte[])}
     * @return pair of (partition, leader shard), or null when no range covers the key
     */
    public KVPair<Metapb.Partition, Metapb.Shard> getPartitionByKey(String graphName, byte[] key) {
        int code = PartitionUtils.calcHashcode(key);
        return getPartitionByCode(graphName, code);
    }

    /**
     * Returns partition information based on the hashcode of the key.
     *
     * @param graphName graph to look up
     * @param code      hash code of the key
     * @return pair of (partition, leader shard), or null when no range covers the code
     */
    public KVPair<Metapb.Partition, Metapb.Shard> getPartitionByCode(String graphName, long code) {
        waitGraphLock(graphName);
        RangeMap<Long, Integer> rangeMap = keyToPartIdCache.get(graphName);
        if (rangeMap != null) {
            Integer partId = rangeMap.get(code);
            if (partId != null) {
                return getPartitionById(graphName, partId);
            }
        }
        return null;
    }

    /**
     * Returns all cached partitions of {@code graphName}; empty list when the
     * graph is unknown (never null).
     */
    public List<Metapb.Partition> getPartitions(String graphName) {
        waitGraphLock(graphName);
        List<Metapb.Partition> partitions = new ArrayList<>();
        var graphs = partitionCache.get(graphName);
        if (graphs == null) {
            return partitions;
        }
        graphs.forEach((k, v) -> partitions.add(v));
        return partitions;
    }

    /**
     * Adds (or replaces) a partition and registers its key range.
     *
     * @return false when an identical partition is already cached, true otherwise
     */
    public boolean addPartition(String graphName, int partId, Metapb.Partition partition) {
        waitGraphLock(graphName);
        Metapb.Partition old = null;
        if (partitionCache.containsKey(graphName)) {
            old = partitionCache.get(graphName).get(partId);
        }
        if (old != null && old.equals(partition)) {
            return false;
        }
        try {
            lockGraph(graphName);
            partitionCache.computeIfAbsent(graphName, k -> new HashMap<>()).put(partId, partition);
            if (old != null) {
                // old [1-3] is covered by [2-3]. When [1-3) becomes [1-2], the original [1-3]
                // should not be deleted
                // When you confirm that the old start and end are your own, you can delete the
                // old ones. (i.e. not covered yet)
                var graphRange = keyToPartIdCache.get(graphName);
                if (Objects.equals(partition.getId(), graphRange.get(partition.getStartKey())) &&
                    Objects.equals(partition.getId(), graphRange.get(partition.getEndKey() - 1))) {
                    graphRange.remove(graphRange.getEntry(partition.getStartKey()).getKey());
                }
            }
            keyToPartIdCache.computeIfAbsent(graphName, k -> TreeRangeMap.create())
                            .put(Range.closedOpen(partition.getStartKey(),
                                                  partition.getEndKey()), partId);
        } finally {
            unlockGraph(graphName);
        }
        return true;
    }

    /**
     * Unconditionally stores {@code partition} and refreshes its key range,
     * first removing the stale range entry when the old partition still owns
     * both ends of the range.
     */
    public void updatePartition(String graphName, int partId, Metapb.Partition partition) {
        try {
            lockGraph(graphName);
            Metapb.Partition old = null;
            var graphs = partitionCache.get(graphName);
            if (graphs != null) {
                old = graphs.get(partId);
            }
            if (old != null) {
                var graphRange = keyToPartIdCache.get(graphName);
                // Only drop the old range when both its endpoints still map to this
                // partition; otherwise a neighbouring partition already covers it.
                if (Objects.equals(partition.getId(), graphRange.get(partition.getStartKey())) &&
                    Objects.equals(partition.getId(), graphRange.get(partition.getEndKey() - 1))) {
                    graphRange.remove(graphRange.getEntry(partition.getStartKey()).getKey());
                }
            }
            partitionCache.computeIfAbsent(graphName, k -> new HashMap<>()).put(partId, partition);
            keyToPartIdCache.computeIfAbsent(graphName, k -> TreeRangeMap.create())
                            .put(Range.closedOpen(partition.getStartKey(), partition.getEndKey()),
                                 partId);
        } finally {
            unlockGraph(graphName);
        }
    }

    /**
     * Stores {@code partition} under its own graph name and id.
     *
     * @return false when the cache already holds an equal partition
     */
    public boolean updatePartition(Metapb.Partition partition) {
        var graphName = partition.getGraphName();
        var partitionId = partition.getId();
        var old = getPartitionById(graphName, partitionId);
        if (old != null && Objects.equals(partition, old.getKey())) {
            return false;
        }
        updatePartition(graphName, partitionId, partition);
        return true;
    }

    /**
     * Removes the partition and its key range. No-op when the graph or the
     * partition is not cached (the original threw NPE for an unknown graph).
     */
    public void removePartition(String graphName, int partId) {
        try {
            lockGraph(graphName);
            var graphs = partitionCache.get(graphName);
            if (graphs == null) {
                return;
            }
            var partition = graphs.remove(partId);
            if (partition != null) {
                var graphRange = keyToPartIdCache.get(graphName);
                if (Objects.equals(partition.getId(), graphRange.get(partition.getStartKey())) &&
                    Objects.equals(partition.getId(), graphRange.get(partition.getEndKey() - 1))) {
                    graphRange.remove(graphRange.getEntry(partition.getStartKey()).getKey());
                }
            }
        } finally {
            unlockGraph(graphName);
        }
    }

    /**
     * remove partition id of graph name
     *
     * @param graphName graph to update
     * @param id        partition id to remove
     */
    public void remove(String graphName, int id) {
        removePartition(graphName, id);
    }

    /**
     * remove all partitions
     */
    public void removePartitions() {
        writeLock.lock();
        try {
            partitionCache = new HashMap<>();
            keyToPartIdCache = new HashMap<>();
            locks.clear();
        } finally {
            writeLock.unlock();
        }
    }

    /**
     * remove partition cache of graphName
     *
     * @param graphName graph whose cached partitions and ranges are dropped
     */
    public void removeAll(String graphName) {
        try {
            lockGraph(graphName);
            partitionCache.remove(graphName);
            keyToPartIdCache.remove(graphName);
            locks.remove(graphName);
        } finally {
            unlockGraph(graphName);
        }
    }

    // Currently unused helper kept for API stability of the cache-key format.
    private String makePartitionKey(String graphName, int partId) {
        return graphName + "/" + partId;
    }

    /**
     * Caches {@code shardGroup}, replacing any previous version.
     *
     * @return false when an equal group is already cached
     */
    public boolean updateShardGroup(Metapb.ShardGroup shardGroup) {
        Metapb.ShardGroup oldShardGroup = shardGroupCache.get(shardGroup.getId());
        if (oldShardGroup != null && oldShardGroup.equals(shardGroup)) {
            return false;
        }
        shardGroupCache.put(shardGroup.getId(), shardGroup);
        return true;
    }

    public void deleteShardGroup(int shardGroupId) {
        shardGroupCache.remove(shardGroupId);
    }

    public Metapb.ShardGroup getShardGroup(int groupId) {
        return shardGroupCache.get(groupId);
    }

    /**
     * Caches {@code store} under {@code storeId}.
     *
     * @return false when an equal store is already cached
     */
    public boolean addStore(Long storeId, Metapb.Store store) {
        Metapb.Store oldStore = storeCache.get(storeId);
        if (oldStore != null && oldStore.equals(store)) {
            return false;
        }
        storeCache.put(storeId, store);
        return true;
    }

    public Metapb.Store getStoreById(Long storeId) {
        return storeCache.get(storeId);
    }

    public void removeStore(Long storeId) {
        storeCache.remove(storeId);
    }

    /**
     * @return true when at least one partition of {@code graphName} is cached
     */
    public boolean hasGraph(String graphName) {
        return !getPartitions(graphName).isEmpty();
    }

    public void updateGraph(Metapb.Graph graph) {
        if (Objects.equals(graph, getGraph(graph.getGraphName()))) {
            return;
        }
        graphCache.put(graph.getGraphName(), graph);
    }

    public Metapb.Graph getGraph(String graphName) {
        return graphCache.get(graphName);
    }

    /**
     * @return snapshot list of all cached graphs (never null)
     */
    public List<Metapb.Graph> getGraphs() {
        List<Metapb.Graph> graphs = new ArrayList<>();
        graphCache.forEach((k, v) -> graphs.add(v));
        return graphs;
    }

    /**
     * Discards every cached entry and all per-graph locks.
     */
    public void reset() {
        writeLock.lock();
        try {
            partitionCache = new HashMap<>();
            keyToPartIdCache = new HashMap<>();
            shardGroupCache = new ConcurrentHashMap<>();
            storeCache = new ConcurrentHashMap<>();
            graphCache = new ConcurrentHashMap<>();
            locks.clear();
        } finally {
            writeLock.unlock();
        }
    }

    public void clear() {
        reset();
    }

    /**
     * Renders a human-readable dump of one graph's range map, partitions and
     * graph metadata for debugging/logging.
     */
    public String debugCacheByGraphName(String graphName) {
        StringBuilder builder = new StringBuilder();
        builder.append("Graph:").append(graphName).append(", cache info: range info: {");
        var rangeMap = keyToPartIdCache.get(graphName);
        builder.append(rangeMap == null ? "" : rangeMap).append("}");
        if (rangeMap != null) {
            builder.append(", partition info : {");
            rangeMap.asMapOfRanges().forEach((k, v) -> {
                var partition = partitionCache.get(graphName).get(v);
                builder.append("[part_id:").append(v);
                if (partition != null) {
                    builder.append(", start_key:").append(partition.getStartKey())
                           .append(", end_key:").append(partition.getEndKey())
                           .append(", state:").append(partition.getState().name());
                }
                builder.append("], ");
            });
            builder.append("}");
        }
        builder.append(", graph info:{");
        var graph = graphCache.get(graphName);
        if (graph != null) {
            builder.append("partition_count:").append(graph.getPartitionCount())
                   .append(", state:").append(graph.getState().name());
        }
        builder.append("}]");
        return builder.toString();
    }

    /**
     * @return the shard with role Leader in the shard group of
     * {@code partitionId}, or null when unknown / leaderless
     */
    public Metapb.Shard getLeaderShard(int partitionId) {
        var shardGroup = shardGroupCache.get(partitionId);
        if (shardGroup != null) {
            for (Metapb.Shard shard : shardGroup.getShardsList()) {
                if (shard.getRole() == Metapb.ShardRole.Leader) {
                    return shard;
                }
            }
        }
        return null;
    }

    /**
     * Rewrites the shard group of {@code partitionId} so that {@code leader}'s
     * store becomes Leader and every other shard becomes Follower. No-op when
     * the group is unknown, {@code leader} is null, or it already leads.
     */
    public void updateShardGroupLeader(int partitionId, Metapb.Shard leader) {
        if (shardGroupCache.containsKey(partitionId) && leader != null) {
            if (!Objects.equals(getLeaderShard(partitionId), leader)) {
                var shardGroup = shardGroupCache.get(partitionId);
                var builder = Metapb.ShardGroup.newBuilder(shardGroup).clearShards();
                for (var shard : shardGroup.getShardsList()) {
                    builder.addShards(
                            Metapb.Shard.newBuilder()
                                        .setStoreId(shard.getStoreId())
                                        .setRole(shard.getStoreId() == leader.getStoreId() ?
                                                 Metapb.ShardRole.Leader :
                                                 Metapb.ShardRole.Follower)
                                        .build()
                    );
                }
                shardGroupCache.put(partitionId, builder.build());
            }
        }
    }

    /**
     * Renders a human-readable dump of the whole shard-group cache.
     */
    public String debugShardGroup() {
        StringBuilder builder = new StringBuilder();
        builder.append("shard group cache:{");
        shardGroupCache.forEach((partitionId, shardGroup) -> {
            builder.append(partitionId).append("::{")
                   .append("version:").append(shardGroup.getVersion())
                   .append(", conf_version:").append(shardGroup.getConfVer())
                   .append(", state:").append(shardGroup.getState().name())
                   .append(", shards:[");
            for (var shard : shardGroup.getShardsList()) {
                builder.append("{store_id:").append(shard.getStoreId())
                       .append(", role:").append(shard.getRole().name())
                       .append("},");
            }
            builder.append("], ");
        });
        builder.append("}");
        return builder.toString();
    }
}