| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hugegraph.pd.client; |
| |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Map.Entry; |
| import java.util.Objects; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| |
| import org.apache.hugegraph.pd.common.GraphCache; |
| import org.apache.hugegraph.pd.common.KVPair; |
| import org.apache.hugegraph.pd.common.PDException; |
| import org.apache.hugegraph.pd.common.PartitionUtils; |
| import org.apache.hugegraph.pd.grpc.Metapb; |
| import org.apache.hugegraph.pd.grpc.Metapb.Partition; |
| import org.apache.hugegraph.pd.grpc.Metapb.Shard; |
| import org.apache.hugegraph.pd.grpc.Metapb.ShardGroup; |
| import org.apache.hugegraph.pd.grpc.Pdpb.CachePartitionResponse; |
| import org.apache.hugegraph.pd.grpc.Pdpb.CacheResponse; |
| |
| import com.google.common.collect.Range; |
| import com.google.common.collect.RangeMap; |
| |
| import lombok.extern.slf4j.Slf4j; |
| |
| @Slf4j |
| public class ClientCache { |
| |
| private final AtomicBoolean initialized = new AtomicBoolean(false); |
| private final org.apache.hugegraph.pd.client.PDClient client; |
| private volatile Map<Integer, KVPair<ShardGroup, Shard>> groups; |
| private volatile Map<Long, Metapb.Store> stores; |
| private volatile Map<String, GraphCache> caches = new ConcurrentHashMap<>(); |
| |
| public ClientCache(org.apache.hugegraph.pd.client.PDClient pdClient) { |
| groups = new ConcurrentHashMap<>(); |
| stores = new ConcurrentHashMap<>(); |
| client = pdClient; |
| } |
| |
| private GraphCache getGraphCache(String graphName) { |
| GraphCache graph; |
| if ((graph = caches.get(graphName)) == null) { |
| synchronized (caches) { |
| if ((graph = caches.get(graphName)) == null) { |
| graph = new GraphCache(); |
| caches.put(graphName, graph); |
| } |
| } |
| } |
| return graph; |
| } |
| |
| public KVPair<Partition, Shard> getPartitionById(String graphName, int partId) { |
| try { |
| GraphCache graph = initGraph(graphName); |
| Partition partition = graph.getPartition(partId); |
| if (partition == null) { |
| return null; |
| } |
| KVPair<ShardGroup, Shard> group = groups.get(partId); |
| if (group == null) { |
| return null; |
| } |
| Shard shard = group.getValue(); |
| if (shard == null) { |
| return null; |
| } |
| return new KVPair<>(partition, shard); |
| } catch (Exception e) { |
| throw new RuntimeException(e); |
| } |
| } |
| |
| private KVPair<Partition, Shard> getPair(int partId, GraphCache graph) { |
| Partition p = graph.getPartition(partId); |
| KVPair<ShardGroup, Shard> pair = groups.get(partId); |
| if (p != null && pair != null) { |
| Shard s = pair.getValue(); |
| if (s == null) { |
| pair.setValue(getLeader(partId)); |
| return new KVPair<>(p, pair.getValue()); |
| } else { |
| return new KVPair<>(p, s); |
| } |
| } |
| return null; |
| } |
| |
| public KVPair<Partition, Shard> getPartitionByCode(String graphName, long code) { |
| try { |
| GraphCache graph = initGraph(graphName); |
| RangeMap<Long, Integer> range = graph.getRange(); |
| Integer pId = range.get(code); |
| if (pId != null) { |
| return getPair(pId, graph); |
| } |
| return null; |
| } catch (PDException e) { |
| throw new RuntimeException(e); |
| } |
| } |
| |
| private GraphCache initGraph(String graphName) throws PDException { |
| initCache(); |
| GraphCache graph = getGraphCache(graphName); |
| if (!graph.getInitialized().get()) { |
| synchronized (graph) { |
| if (!graph.getInitialized().get()) { |
| CachePartitionResponse pc = client.getPartitionCache(graphName); |
| RangeMap<Long, Integer> range = graph.getRange(); |
| List<Partition> ps = pc.getPartitionsList(); |
| HashMap<Integer, Partition> gps = new HashMap<>(ps.size(), 1); |
| for (Partition p : ps) { |
| gps.put(p.getId(), p); |
| range.put(Range.closedOpen(p.getStartKey(), p.getEndKey()), p.getId()); |
| } |
| graph.setPartitions(gps); |
| graph.getInitialized().set(true); |
| } |
| } |
| } |
| return graph; |
| } |
| |
| private void initCache() throws PDException { |
| if (!initialized.get()) { |
| synchronized (this) { |
| if (!initialized.get()) { |
| CacheResponse cache = client.getClientCache(); |
| List<ShardGroup> shardGroups = cache.getShardsList(); |
| for (ShardGroup s : shardGroups) { |
| this.groups.put(s.getId(), new KVPair<>(s, getLeader(s.getId()))); |
| } |
| List<Metapb.Store> stores = cache.getStoresList(); |
| for (Metapb.Store store : stores) { |
| this.stores.put(store.getId(), store); |
| } |
| List<Metapb.Graph> graphs = cache.getGraphsList(); |
| for (Metapb.Graph g : graphs) { |
| GraphCache c = new GraphCache(g); |
| caches.put(g.getGraphName(), c); |
| } |
| initialized.set(true); |
| } |
| } |
| } |
| } |
| |
| public KVPair<Partition, Shard> getPartitionByKey(String graphName, byte[] key) { |
| int code = PartitionUtils.calcHashcode(key); |
| return getPartitionByCode(graphName, code); |
| } |
| |
| public boolean update(String graphName, int partId, Partition partition) { |
| GraphCache graph = getGraphCache(graphName); |
| try { |
| Partition p = graph.getPartition(partId); |
| if (p != null && p.equals(partition)) { |
| return false; |
| } |
| RangeMap<Long, Integer> range = graph.getRange(); |
| graph.addPartition(partId, partition); |
| if (p != null) { |
| if (Objects.equals(partition.getId(), range.get(partition.getStartKey())) && |
| Objects.equals(partition.getId(), range.get(partition.getEndKey() - 1))) { |
| range.remove(range.getEntry(partition.getStartKey()).getKey()); |
| } |
| } |
| range.put(Range.closedOpen(partition.getStartKey(), partition.getEndKey()), partId); |
| } catch (Exception e) { |
| throw new RuntimeException(e); |
| } |
| return true; |
| } |
| |
| public void removePartition(String graphName, int partId) { |
| GraphCache graph = getGraphCache(graphName); |
| Partition p = graph.removePartition(partId); |
| if (p != null) { |
| RangeMap<Long, Integer> range = graph.getRange(); |
| if (Objects.equals(p.getId(), range.get(p.getStartKey())) && |
| Objects.equals(p.getId(), range.get(p.getEndKey() - 1))) { |
| range.remove(range.getEntry(p.getStartKey()).getKey()); |
| } |
| } |
| } |
| |
| /** |
| * remove all partitions |
| */ |
| public void removePartitions() { |
| for (Entry<String, GraphCache> entry : caches.entrySet()) { |
| removePartitions(entry.getValue()); |
| } |
| } |
| |
| private void removePartitions(GraphCache graph) { |
| graph.getState().clear(); |
| graph.getRange().clear(); |
| } |
| |
| /** |
| * remove partition cache of graphName |
| * |
| * @param graphName |
| */ |
| public void removeAll(String graphName) { |
| GraphCache graph = caches.get(graphName); |
| if (graph != null) { |
| removePartitions(graph); |
| } |
| } |
| |
| public boolean updateShardGroup(ShardGroup shardGroup) { |
| KVPair<ShardGroup, Shard> old = groups.get(shardGroup.getId()); |
| Shard leader = getLeader(shardGroup); |
| if (old != null) { |
| old.setKey(shardGroup); |
| old.setValue(leader); |
| return false; |
| } |
| groups.put(shardGroup.getId(), new KVPair<>(shardGroup, leader)); |
| return true; |
| } |
| |
| public void deleteShardGroup(int shardGroupId) { |
| groups.remove(shardGroupId); |
| } |
| |
| public ShardGroup getShardGroup(int groupId) { |
| KVPair<ShardGroup, Shard> pair = groups.get(groupId); |
| if (pair != null) { |
| return pair.getKey(); |
| } |
| return null; |
| } |
| |
| public boolean addStore(Long storeId, Metapb.Store store) { |
| Metapb.Store oldStore = stores.get(storeId); |
| if (oldStore != null && oldStore.equals(store)) { |
| return false; |
| } |
| stores.put(storeId, store); |
| return true; |
| } |
| |
| public Metapb.Store getStoreById(Long storeId) { |
| return stores.get(storeId); |
| } |
| |
| public void removeStore(Long storeId) { |
| stores.remove(storeId); |
| } |
| |
| public void reset() { |
| groups = new ConcurrentHashMap<>(); |
| stores = new ConcurrentHashMap<>(); |
| caches = new ConcurrentHashMap<>(); |
| } |
| |
| public Shard getLeader(int partitionId) { |
| KVPair<ShardGroup, Shard> pair = groups.get(partitionId); |
| if (pair != null) { |
| if (pair.getValue() != null) { |
| return pair.getValue(); |
| } |
| for (Shard shard : pair.getKey().getShardsList()) { |
| if (shard.getRole() == Metapb.ShardRole.Leader) { |
| pair.setValue(shard); |
| return shard; |
| } |
| } |
| } |
| |
| return null; |
| } |
| |
| public Shard getLeader(ShardGroup shardGroup) { |
| if (shardGroup != null) { |
| for (Shard shard : shardGroup.getShardsList()) { |
| if (shard.getRole() == Metapb.ShardRole.Leader) { |
| return shard; |
| } |
| } |
| } |
| |
| return null; |
| } |
| |
| public void updateLeader(int partitionId, Shard leader) { |
| KVPair<ShardGroup, Shard> pair = groups.get(partitionId); |
| if (pair != null && leader != null) { |
| Shard l = getLeader(partitionId); |
| if (l == null || leader.getStoreId() != l.getStoreId()) { |
| ShardGroup shardGroup = pair.getKey(); |
| ShardGroup.Builder builder = ShardGroup.newBuilder(shardGroup).clearShards(); |
| for (var shard : shardGroup.getShardsList()) { |
| builder.addShards( |
| Shard.newBuilder() |
| .setStoreId(shard.getStoreId()) |
| .setRole(shard.getStoreId() == leader.getStoreId() ? |
| Metapb.ShardRole.Leader : Metapb.ShardRole.Follower) |
| .build() |
| ); |
| } |
| pair.setKey(builder.build()); |
| pair.setValue(leader); |
| } |
| } |
| } |
| } |