| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hugegraph.pd.client; |
| |
| import static org.apache.hugegraph.pd.watch.NodeEvent.EventType.NODE_PD_LEADER_CHANGE; |
| |
| import java.util.ArrayList; |
| import java.util.LinkedList; |
| import java.util.List; |
| import java.util.Objects; |
| import java.util.concurrent.CopyOnWriteArrayList; |
| import java.util.concurrent.TimeUnit; |
| |
| import org.apache.hugegraph.pd.common.KVPair; |
| import org.apache.hugegraph.pd.common.PDException; |
| import org.apache.hugegraph.pd.common.PartitionUtils; |
| import org.apache.hugegraph.pd.grpc.MetaTask; |
| import org.apache.hugegraph.pd.grpc.Metapb; |
| import org.apache.hugegraph.pd.grpc.Metapb.ShardGroup; |
| import org.apache.hugegraph.pd.grpc.PDGrpc; |
| import org.apache.hugegraph.pd.grpc.Pdpb; |
| import org.apache.hugegraph.pd.grpc.Pdpb.CachePartitionResponse; |
| import org.apache.hugegraph.pd.grpc.Pdpb.CacheResponse; |
| import org.apache.hugegraph.pd.grpc.Pdpb.GetGraphRequest; |
| import org.apache.hugegraph.pd.grpc.Pdpb.GetPartitionByCodeRequest; |
| import org.apache.hugegraph.pd.grpc.Pdpb.GetPartitionRequest; |
| import org.apache.hugegraph.pd.grpc.Pdpb.GetPartitionResponse; |
| import org.apache.hugegraph.pd.grpc.watch.WatchResponse; |
| import org.apache.hugegraph.pd.watch.NodeEvent; |
| import org.apache.hugegraph.pd.watch.PartitionEvent; |
| |
| import com.google.protobuf.ByteString; |
| |
| import io.grpc.ManagedChannel; |
| import io.grpc.MethodDescriptor; |
| import io.grpc.StatusRuntimeException; |
| import io.grpc.stub.AbstractBlockingStub; |
| import lombok.extern.slf4j.Slf4j; |
| |
| /** |
| * PD client implementation class |
| */ |
| @Slf4j |
| public class PDClient { |
| |
    // Client configuration: PD server list, gRPC timeout, cache switch.
    private final PDConfig config;
    // Default request header attached to every PD RPC.
    private final Pdpb.RequestHeader header;
    // Local cache of partitions / stores / shard groups.
    private final ClientCache cache;
    // Round-robin proxy over the configured PD hosts; holds the leader stub.
    private final StubProxy stubProxy;
    // Listeners notified on watch events; CopyOnWriteArrayList so iteration
    // is safe while listeners are registered concurrently.
    private final List<PDEventListener> eventListeners;
    // Watch streams, created lazily by connectPdWatch().
    private PDWatch.Watcher partitionWatcher;
    private PDWatch.Watcher storeWatcher;
    private PDWatch.Watcher graphWatcher;
    private PDWatch.Watcher shardGroupWatcher;
    private PDWatch pdWatch;

    // Private: instances are obtained through create(PDConfig).
    private PDClient(PDConfig config) {
        this.config = config;
        this.header = Pdpb.RequestHeader.getDefaultInstance();
        this.stubProxy = new StubProxy(config.getServerHost().split(","));
        this.eventListeners = new CopyOnWriteArrayList<>();
        this.cache = new ClientCache(this);
    }
| |
    /**
     * Creates a PD client bound to the given configuration. The gRPC stub and
     * the watch streams are created lazily on first use, not here.
     *
     * @param config client configuration, including the PD server list
     * @return a new {@link PDClient} instance
     */
    public static PDClient create(PDConfig config) {
        return new PDClient(config);
    }
| |
    /**
     * Lazily resolves the PD leader and installs the blocking stub, then binds
     * the watch streams to that leader. No-op when a stub already exists.
     *
     * @throws PDException when none of the configured PD hosts is reachable
     */
    private synchronized void newBlockingStub() throws PDException {
        if (stubProxy.get() != null) {
            return;
        }

        String host = newLeaderStub();
        if (host.isEmpty()) {
            throw new PDException(Pdpb.ErrorType.PD_UNREACHABLE_VALUE,
                                  "PD unreachable, pd.peers=" + config.getServerHost());
        }

        log.info("PDClient enable cache, init PDWatch object");
        connectPdWatch(host);
    }
| |
    /**
     * (Re)connects the PDWatch streams to the given PD leader and registers
     * the partition / node / graph / shard-group watchers. No-op when the
     * existing watch already points at the same host with a healthy channel.
     *
     * @param leader host address of the current PD leader
     */
    public void connectPdWatch(String leader) {

        // Reuse the existing watch connection when it is already bound to the
        // leader and its channel is still usable.
        if (pdWatch != null && Objects.equals(pdWatch.getCurrentHost(), leader) &&
            pdWatch.checkChannel()) {
            return;
        }

        log.info("PDWatch client connect host:{}", leader);
        pdWatch = new PDWatchImpl(leader);

        // Partition events: drop the cache entry first, then notify listeners.
        partitionWatcher = pdWatch.watchPartition(new PDWatch.Listener<>() {
            @Override
            public void onNext(PartitionEvent response) {
                invalidPartitionCache(response.getGraph(), response.getPartitionId());

                if (response.getChangeType() == PartitionEvent.ChangeType.DEL) {
                    // All cached partitions of the graph are stale after a delete.
                    cache.removeAll(response.getGraph());
                }

                eventListeners.forEach(listener -> {
                    listener.onPartitionChanged(response);
                });
            }

            @Override
            public void onError(Throwable throwable) {
                log.error("watchPartition exception {}", throwable.getMessage());
                // Drop the stub so the next RPC re-resolves the leader;
                // keep the watchers themselves alive.
                closeStub(false);
            }
        });

        // Node (store / PD member) events.
        storeWatcher = pdWatch.watchNode(new PDWatch.Listener<>() {
            @Override
            public void onNext(NodeEvent response) {
                log.info("PDClient receive store event {} {}",
                         response.getEventType(), Long.toHexString(response.getNodeId()));

                if (response.getEventType() == NODE_PD_LEADER_CHANGE) {
                    // PD raft leader changed; the new leader address is carried
                    // in the event's graph field.
                    var leaderIp = response.getGraph();
                    log.info("watchNode: pd leader changed to {}, current watch:{}",
                             leaderIp, pdWatch.getCurrentHost());
                    // Close the watchers only when the watch is not already on
                    // the new leader, then rebind the watch streams to it.
                    closeStub(!Objects.equals(pdWatch.getCurrentHost(), leaderIp));
                    connectPdWatch(leaderIp);
                }

                invalidStoreCache(response.getNodeId());
                eventListeners.forEach(listener -> {
                    listener.onStoreChanged(response);
                });
            }

            @Override
            public void onError(Throwable throwable) {
                log.error("watchNode exception {}", throwable.getMessage());
                closeStub(false);
            }

        });

        // Graph events are forwarded to listeners untouched.
        graphWatcher = pdWatch.watchGraph(new PDWatch.Listener<>() {
            @Override
            public void onNext(WatchResponse response) {
                eventListeners.forEach(listener -> {
                    listener.onGraphChanged(response);
                });
            }

            @Override
            public void onError(Throwable throwable) {
                log.warn("graphWatcher exception {}", throwable.getMessage());
            }
        });

        // Shard-group events: keep the local shard-group cache in sync before
        // notifying listeners.
        shardGroupWatcher = pdWatch.watchShardGroup(new PDWatch.Listener<>() {
            @Override
            public void onNext(WatchResponse response) {
                var shardResponse = response.getShardGroupResponse();
                if (config.isEnableCache()) {
                    switch (shardResponse.getType()) {
                        case WATCH_CHANGE_TYPE_DEL:
                            cache.deleteShardGroup(shardResponse.getShardGroupId());
                            break;
                        case WATCH_CHANGE_TYPE_ALTER:
                            cache.updateShardGroup(
                                    response.getShardGroupResponse().getShardGroup());
                            break;
                        default:
                            break;
                    }
                }
                eventListeners.forEach(listener -> listener.onShardGroupChanged(response));
            }

            @Override
            public void onError(Throwable throwable) {
                log.warn("shardGroupWatcher exception {}", throwable.getMessage());
            }
        });

    }
| |
    /**
     * Clears the current leader stub and resets the local cache, forcing the
     * next RPC to re-resolve the PD leader.
     *
     * @param closeWatcher when true, also close and discard every watch stream
     *                     (used when the watch must rebind to a new leader)
     */
    private synchronized void closeStub(boolean closeWatcher) {
        stubProxy.set(null);
        cache.reset();

        if (closeWatcher) {
            if (partitionWatcher != null) {
                partitionWatcher.close();
                partitionWatcher = null;
            }
            if (storeWatcher != null) {
                storeWatcher.close();
                storeWatcher = null;
            }
            if (graphWatcher != null) {
                graphWatcher.close();
                graphWatcher = null;
            }

            if (shardGroupWatcher != null) {
                shardGroupWatcher.close();
                shardGroupWatcher = null;
            }

            pdWatch = null;
        }
    }
| |
    /**
     * Returns the cached leader stub (creating it on first use) with a fresh
     * per-call deadline applied.
     */
    private PDGrpc.PDBlockingStub getStub() throws PDException {
        if (stubProxy.get() == null) {
            newBlockingStub();
        }
        return stubProxy.get().withDeadlineAfter(config.getGrpcTimeOut(), TimeUnit.MILLISECONDS);
    }

    /**
     * Creates a brand-new stub on the current leader channel (rather than
     * reusing the cached stub instance), with a fresh per-call deadline.
     */
    private PDGrpc.PDBlockingStub newStub() throws PDException {
        if (stubProxy.get() == null) {
            newBlockingStub();
        }
        return PDGrpc.newBlockingStub(stubProxy.get().getChannel())
                     .withDeadlineAfter(config.getGrpcTimeOut(),
                                        TimeUnit.MILLISECONDS);
    }
| |
    /**
     * Probes the configured PD hosts in order until one answers a member
     * query, then installs a stub and records the reported leader.
     *
     * @return the host that was connected, or "" when every host failed
     */
    private String newLeaderStub() {
        String leaderHost = "";
        for (int i = 0; i < stubProxy.getHostCount(); i++) {
            String host = stubProxy.nextHost();
            ManagedChannel channel = Channels.getChannel(host);

            PDGrpc.PDBlockingStub stub = PDGrpc.newBlockingStub(channel)
                                               .withDeadlineAfter(config.getGrpcTimeOut(),
                                                                  TimeUnit.MILLISECONDS);
            try {
                var leaderIp = getLeaderIp(stub);
                if (!leaderIp.equalsIgnoreCase(host)) {
                    // The probed host is not the leader: remember the leader
                    // address it reported.
                    // NOTE(review): the installed stub still uses the probed
                    // host's channel — confirm requests reach the leader.
                    leaderHost = leaderIp;
                    stubProxy.set(PDGrpc.newBlockingStub(channel)
                                        .withDeadlineAfter(config.getGrpcTimeOut(),
                                                           TimeUnit.MILLISECONDS));
                } else {
                    stubProxy.set(stub);
                    leaderHost = host;
                }
                stubProxy.setLeader(leaderIp);

                log.info("PDClient connect to host = {} success", leaderHost);
                break;
            } catch (Exception e) {
                // This host failed; fall through and try the next one.
                log.error("PDClient connect to {} exception {}, {}", host, e.getMessage(),
                          e.getCause() != null ? e.getCause().getMessage() : "");
            }
        }
        return leaderHost;
    }
| |
    /**
     * Returns the grpc URL of the current PD leader, resolving a stub first
     * when none exists yet.
     */
    public String getLeaderIp() {

        return getLeaderIp(stubProxy.get());
    }

    /**
     * Asks the given stub for the cluster members and returns the leader's
     * grpc URL. When {@code stub} is null, falls back to initializing the
     * stub and returning the leader recorded on the proxy.
     *
     * @throws RuntimeException wrapping a PDException when stub creation fails
     */
    private String getLeaderIp(PDGrpc.PDBlockingStub stub) {
        if (stub == null) {
            try {
                getStub();
                return stubProxy.getLeader();
            } catch (PDException e) {
                throw new RuntimeException(e);
            }
        }

        Pdpb.GetMembersRequest request = Pdpb.GetMembersRequest.newBuilder()
                                                               .setHeader(header)
                                                               .build();
        Metapb.Member leader = stub.getMembers(request).getLeader();
        return leader.getGrpcUrl();
    }
| |
| /** |
| * Store registration, the store ID will be returned, and the initial registration will |
| * return a new ID |
| * |
| * @param store |
| * @return |
| */ |
| public long registerStore(Metapb.Store store) throws PDException { |
| Pdpb.RegisterStoreRequest request = Pdpb.RegisterStoreRequest.newBuilder() |
| .setHeader(header) |
| .setStore(store).build(); |
| |
| Pdpb.RegisterStoreResponse response = |
| blockingUnaryCall(PDGrpc.getRegisterStoreMethod(), request); |
| handleResponseError(response.getHeader()); |
| return response.getStoreId(); |
| } |
| |
| /** |
| * Returns the Store object based on the store ID |
| * |
| * @param storeId |
| * @return |
| * @throws PDException |
| */ |
| public Metapb.Store getStore(long storeId) throws PDException { |
| Metapb.Store store = cache.getStoreById(storeId); |
| if (store == null) { |
| Pdpb.GetStoreRequest request = Pdpb.GetStoreRequest.newBuilder() |
| .setHeader(header) |
| .setStoreId(storeId).build(); |
| Pdpb.GetStoreResponse response = getStub().getStore(request); |
| handleResponseError(response.getHeader()); |
| store = response.getStore(); |
| if (config.isEnableCache()) { |
| cache.addStore(storeId, store); |
| } |
| } |
| return store; |
| } |
| |
| /** |
| * Update the store information, including online and offline |
| * |
| * @param store |
| * @return |
| */ |
| public Metapb.Store updateStore(Metapb.Store store) throws PDException { |
| Pdpb.SetStoreRequest request = Pdpb.SetStoreRequest.newBuilder() |
| .setHeader(header) |
| .setStore(store).build(); |
| |
| Pdpb.SetStoreResponse response = getStub().setStore(request); |
| handleResponseError(response.getHeader()); |
| store = response.getStore(); |
| if (config.isEnableCache()) { |
| cache.addStore(store.getId(), store); |
| } |
| return store; |
| } |
| |
    /**
     * Returns the leader store of every partition of the graph, walking the
     * partition ring from hash code 0 via each partition's end key.
     *
     * @param graphName graph whose partition leader stores are collected
     * @return one store per partition (a store may appear more than once if
     *         it leads several partitions)
     * @throws PDException if any partition or store lookup fails
     */
    public List<Metapb.Store> getActiveStores(String graphName) throws PDException {
        List<Metapb.Store> stores = new ArrayList<>();
        KVPair<Metapb.Partition, Metapb.Shard> ptShard = this.getPartitionByCode(graphName, 0);
        while (ptShard != null) {
            stores.add(this.getStore(ptShard.getValue().getStoreId()));
            // Advance to the next partition; stop once the ring is covered.
            if (ptShard.getKey().getEndKey() < PartitionUtils.MAX_VALUE) {
                ptShard = this.getPartitionByCode(graphName, ptShard.getKey().getEndKey());
            } else {
                ptShard = null;
            }
        }
        return stores;
    }
| |
| public List<Metapb.Store> getActiveStores() throws PDException { |
| Pdpb.GetAllStoresRequest request = Pdpb.GetAllStoresRequest.newBuilder() |
| .setHeader(header) |
| .setGraphName("") |
| .setExcludeOfflineStores(true) |
| .build(); |
| Pdpb.GetAllStoresResponse response = getStub().getAllStores(request); |
| handleResponseError(response.getHeader()); |
| return response.getStoresList(); |
| |
| } |
| |
    /**
     * Returns all stores known to PD for the given graph, including offline
     * stores (note: unlike getActiveStores, nothing is excluded here).
     *
     * @param graphName graph to query stores for
     * @return every store, offline ones included
     * @throws PDException if the PD response carries an error header
     */
    public List<Metapb.Store> getAllStores(String graphName) throws PDException {
        Pdpb.GetAllStoresRequest request = Pdpb.GetAllStoresRequest.newBuilder()
                                                                   .setHeader(header)
                                                                   .setGraphName(graphName)
                                                                   .setExcludeOfflineStores(false)
                                                                   .build();
        Pdpb.GetAllStoresResponse response = getStub().getAllStores(request);
        handleResponseError(response.getHeader());
        return response.getStoresList();

    }
| |
| /** |
| * Store heartbeat, call regularly, stay online |
| * |
| * @param stats |
| * @throws PDException |
| */ |
| public Metapb.ClusterStats storeHeartbeat(Metapb.StoreStats stats) throws PDException { |
| Pdpb.StoreHeartbeatRequest request = Pdpb.StoreHeartbeatRequest.newBuilder() |
| .setHeader(header) |
| .setStats(stats).build(); |
| Pdpb.StoreHeartbeatResponse response = getStub().storeHeartbeat(request); |
| handleResponseError(response.getHeader()); |
| return response.getClusterStats(); |
| } |
| |
    /**
     * Returns {@code partShard} as-is when already resolved (cache hit);
     * otherwise asks PD for the partition owning {@code key} and records the
     * result in the partition cache.
     *
     * @param graphName graph name
     * @param key       raw key bytes used for the PD lookup on a cache miss
     * @param partShard previously cached partition/leader pair, may be null
     * @return the resolved partition/leader pair
     * @throws PDException if the PD lookup fails
     */
    private KVPair<Metapb.Partition, Metapb.Shard> getKvPair(String graphName, byte[] key,
                                                             KVPair<Metapb.Partition,
                                                                     Metapb.Shard> partShard) throws
            PDException {
        if (partShard == null) {
            GetPartitionRequest request = GetPartitionRequest.newBuilder()
                                                             .setHeader(header)
                                                             .setGraphName(graphName)
                                                             .setKey(ByteString.copyFrom(key))
                                                             .build();
            GetPartitionResponse response =
                    blockingUnaryCall(PDGrpc.getGetPartitionMethod(), request);
            handleResponseError(response.getHeader());
            partShard = new KVPair<>(response.getPartition(), response.getLeader());
            // NOTE(review): the cache is updated here without checking
            // config.isEnableCache(), unlike getPartitionById() — confirm
            // whether that asymmetry is intentional.
            cache.update(graphName, partShard.getKey().getId(), partShard.getKey());
        }
        return partShard;
    }
| |
| /** |
| * Query the partition to which the key belongs |
| * |
| * @param graphName |
| * @param key |
| * @return |
| * @throws PDException |
| */ |
| public KVPair<Metapb.Partition, Metapb.Shard> getPartition(String graphName, byte[] key) throws |
| PDException { |
| KVPair<Metapb.Partition, Metapb.Shard> partShard = cache.getPartitionByKey(graphName, key); |
| partShard = getKvPair(graphName, key, partShard); |
| return partShard; |
| } |
| |
| public KVPair<Metapb.Partition, Metapb.Shard> getPartition(String graphName, byte[] key, |
| int code) throws |
| PDException { |
| KVPair<Metapb.Partition, Metapb.Shard> partShard = |
| cache.getPartitionByCode(graphName, code); |
| partShard = getKvPair(graphName, key, partShard); |
| return partShard; |
| } |
| |
    /**
     * Resolves the partition (and leader shard) owning the given hash code,
     * consulting the cache first and filling a missing leader from the shard
     * group.
     *
     * @param graphName graph name
     * @param hashCode  partition hash code of the key
     * @return partition / leader-shard pair; the shard may remain null when
     *         the shard group contains no leader
     * @throws PDException if the PD lookup fails
     */
    public KVPair<Metapb.Partition, Metapb.Shard> getPartitionByCode(String graphName,
                                                                     long hashCode)
            throws PDException {
        KVPair<Metapb.Partition, Metapb.Shard> partShard =
                cache.getPartitionByCode(graphName, hashCode);
        if (partShard == null) {
            // Cache miss: fetch from PD and cache both the partition and its
            // shard group.
            GetPartitionByCodeRequest request = GetPartitionByCodeRequest.newBuilder()
                                                                         .setHeader(header)
                                                                         .setGraphName(graphName)
                                                                         .setCode(hashCode).build();
            GetPartitionResponse response =
                    blockingUnaryCall(PDGrpc.getGetPartitionByCodeMethod(), request);
            handleResponseError(response.getHeader());
            partShard = new KVPair<>(response.getPartition(), response.getLeader());
            cache.update(graphName, partShard.getKey().getId(), partShard.getKey());
            cache.updateShardGroup(getShardGroup(partShard.getKey().getId()));
        }

        if (partShard.getValue() == null) {
            // No leader returned: look one up in the shard group.
            ShardGroup shardGroup = getShardGroup(partShard.getKey().getId());
            if (shardGroup != null) {
                for (var shard : shardGroup.getShardsList()) {
                    if (shard.getRole() == Metapb.ShardRole.Leader) {
                        partShard.setValue(shard);
                    }
                }
            } else {
                log.error("getPartitionByCode: get shard group failed, {}",
                          partShard.getKey().getId());
            }
        }
        return partShard;
    }
| |
    /**
     * Computes the partition hash code of a key.
     * NOTE(review): {@code graphName} is currently unused; it appears to be
     * kept for API symmetry with the other partition lookups — confirm.
     */
    public int keyToCode(String graphName, byte[] key) {
        return PartitionUtils.calcHashcode(key);
    }
| |
    /**
     * Resolves a partition by its id, consulting the cache first and filling
     * a missing leader shard from the shard group.
     *
     * @param graphName graph name
     * @param partId    partition id
     * @return partition / leader-shard pair; the shard may remain null when
     *         the shard group contains no leader
     * @throws PDException if the PD lookup fails
     */
    public KVPair<Metapb.Partition, Metapb.Shard> getPartitionById(String graphName,
                                                                   int partId) throws PDException {
        KVPair<Metapb.Partition, Metapb.Shard> partShard =
                cache.getPartitionById(graphName, partId);
        if (partShard == null) {
            // Cache miss: fetch the partition from PD by id.
            Pdpb.GetPartitionByIDRequest request = Pdpb.GetPartitionByIDRequest.newBuilder()
                                                                               .setHeader(header)
                                                                               .setGraphName(
                                                                                       graphName)
                                                                               .setPartitionId(
                                                                                       partId)
                                                                               .build();
            GetPartitionResponse response =
                    blockingUnaryCall(PDGrpc.getGetPartitionByIDMethod(), request);
            handleResponseError(response.getHeader());
            partShard = new KVPair<>(response.getPartition(), response.getLeader());
            if (config.isEnableCache()) {
                cache.update(graphName, partShard.getKey().getId(), partShard.getKey());
                cache.updateShardGroup(getShardGroup(partShard.getKey().getId()));
            }
        }
        if (partShard.getValue() == null) {
            // No leader returned: look one up in the shard group.
            var shardGroup = getShardGroup(partShard.getKey().getId());
            if (shardGroup != null) {
                for (var shard : shardGroup.getShardsList()) {
                    if (shard.getRole() == Metapb.ShardRole.Leader) {
                        partShard.setValue(shard);
                    }
                }
            } else {
                log.error("getPartitionById: get shard group failed, {}",
                          partShard.getKey().getId());
            }
        }
        return partShard;
    }
| |
| public ShardGroup getShardGroup(int partId) throws PDException { |
| ShardGroup group = cache.getShardGroup(partId); |
| if (group == null) { |
| Pdpb.GetShardGroupRequest request = Pdpb.GetShardGroupRequest.newBuilder() |
| .setHeader(header) |
| .setGroupId(partId) |
| .build(); |
| Pdpb.GetShardGroupResponse response = |
| blockingUnaryCall(PDGrpc.getGetShardGroupMethod(), request); |
| handleResponseError(response.getHeader()); |
| group = response.getShardGroup(); |
| if (config.isEnableCache()) { |
| cache.updateShardGroup(group); |
| } |
| } |
| return group; |
| } |
| |
| public void updateShardGroup(ShardGroup shardGroup) throws PDException { |
| Pdpb.UpdateShardGroupRequest request = Pdpb.UpdateShardGroupRequest.newBuilder() |
| .setHeader(header) |
| .setShardGroup( |
| shardGroup) |
| .build(); |
| Pdpb.UpdateShardGroupResponse response = |
| blockingUnaryCall(PDGrpc.getUpdateShardGroupMethod(), request); |
| handleResponseError(response.getHeader()); |
| |
| if (config.isEnableCache()) { |
| cache.updateShardGroup(shardGroup); |
| } |
| } |
| |
    /**
     * Collects every partition spanned by the key range [startKey, endKey],
     * walking forward from the start partition via each partition's end key.
     *
     * @param graphName graph name
     * @param startKey  inclusive range start
     * @param endKey    range end
     * @return the spanned partitions in ring order, or null when either
     *         endpoint cannot be resolved
     * @throws PDException if any partition lookup fails
     */
    public List<KVPair<Metapb.Partition, Metapb.Shard>> scanPartitions(String graphName,
                                                                       byte[] startKey,
                                                                       byte[] endKey) throws
            PDException {
        List<KVPair<Metapb.Partition, Metapb.Shard>> partitions = new ArrayList<>();
        KVPair<Metapb.Partition, Metapb.Shard> startPartShard = getPartition(graphName, startKey);
        KVPair<Metapb.Partition, Metapb.Shard> endPartShard = getPartition(graphName, endKey);
        if (startPartShard == null || endPartShard == null) {
            return null;
        }

        partitions.add(startPartShard);
        // Walk forward until the end partition is reached or the ring ends.
        while (startPartShard.getKey().getEndKey() < endPartShard.getKey().getEndKey()
               && startPartShard.getKey().getEndKey() < PartitionUtils.MAX_VALUE) {
            startPartShard = getPartitionByCode(graphName, startPartShard.getKey().getEndKey());
            partitions.add(startPartShard);
        }
        return partitions;
    }
| |
| /** |
| * Query partition information based on conditions |
| * |
| * @return |
| * @throws PDException |
| */ |
| public List<Metapb.Partition> getPartitionsByStore(long storeId) throws PDException { |
| |
| Metapb.PartitionQuery query = Metapb.PartitionQuery.newBuilder() |
| .setStoreId(storeId) |
| .build(); |
| Pdpb.QueryPartitionsRequest request = Pdpb.QueryPartitionsRequest.newBuilder() |
| .setQuery(query).build(); |
| Pdpb.QueryPartitionsResponse response = |
| blockingUnaryCall(PDGrpc.getQueryPartitionsMethod(), request); |
| |
| handleResponseError(response.getHeader()); |
| return response.getPartitionsList(); |
| } |
| |
| public List<Metapb.Partition> queryPartitions(long storeId, int partitionId) throws |
| PDException { |
| |
| Metapb.PartitionQuery query = Metapb.PartitionQuery.newBuilder() |
| .setStoreId(storeId) |
| .setPartitionId(partitionId) |
| .build(); |
| Pdpb.QueryPartitionsRequest request = Pdpb.QueryPartitionsRequest.newBuilder() |
| .setQuery(query).build(); |
| Pdpb.QueryPartitionsResponse response = |
| blockingUnaryCall(PDGrpc.getQueryPartitionsMethod(), request); |
| |
| handleResponseError(response.getHeader()); |
| return response.getPartitionsList(); |
| } |
| |
| public List<Metapb.Partition> getPartitions(long storeId, String graphName) throws PDException { |
| |
| Metapb.PartitionQuery query = Metapb.PartitionQuery.newBuilder() |
| .setStoreId(storeId) |
| .setGraphName(graphName).build(); |
| Pdpb.QueryPartitionsRequest request = Pdpb.QueryPartitionsRequest.newBuilder() |
| .setQuery(query).build(); |
| Pdpb.QueryPartitionsResponse response = |
| blockingUnaryCall(PDGrpc.getQueryPartitionsMethod(), request); |
| |
| handleResponseError(response.getHeader()); |
| return response.getPartitionsList(); |
| |
| } |
| |
| public Metapb.Graph setGraph(Metapb.Graph graph) throws PDException { |
| Pdpb.SetGraphRequest request = Pdpb.SetGraphRequest.newBuilder() |
| .setGraph(graph) |
| .build(); |
| Pdpb.SetGraphResponse response = |
| blockingUnaryCall(PDGrpc.getSetGraphMethod(), request); |
| |
| handleResponseError(response.getHeader()); |
| return response.getGraph(); |
| } |
| |
| public Metapb.Graph getGraph(String graphName) throws PDException { |
| GetGraphRequest request = GetGraphRequest.newBuilder() |
| .setGraphName(graphName) |
| .build(); |
| Pdpb.GetGraphResponse response = |
| blockingUnaryCall(PDGrpc.getGetGraphMethod(), request); |
| |
| handleResponseError(response.getHeader()); |
| return response.getGraph(); |
| } |
| |
| public Metapb.Graph getGraphWithOutException(String graphName) throws |
| PDException { |
| GetGraphRequest request = GetGraphRequest.newBuilder() |
| .setGraphName( |
| graphName) |
| .build(); |
| Pdpb.GetGraphResponse response = blockingUnaryCall( |
| PDGrpc.getGetGraphMethod(), request); |
| return response.getGraph(); |
| } |
| |
| public Metapb.Graph delGraph(String graphName) throws PDException { |
| Pdpb.DelGraphRequest request = Pdpb.DelGraphRequest.newBuilder() |
| .setGraphName(graphName) |
| .build(); |
| Pdpb.DelGraphResponse response = |
| blockingUnaryCall(PDGrpc.getDelGraphMethod(), request); |
| |
| handleResponseError(response.getHeader()); |
| return response.getGraph(); |
| } |
| |
| public List<Metapb.Partition> updatePartition(List<Metapb.Partition> partitions) throws |
| PDException { |
| |
| Pdpb.UpdatePartitionRequest request = Pdpb.UpdatePartitionRequest.newBuilder() |
| .addAllPartition( |
| partitions) |
| .build(); |
| Pdpb.UpdatePartitionResponse response = |
| blockingUnaryCall(PDGrpc.getUpdatePartitionMethod(), request); |
| handleResponseError(response.getHeader()); |
| invalidPartitionCache(); |
| |
| return response.getPartitionList(); |
| } |
| |
| public Metapb.Partition delPartition(String graphName, int partitionId) throws PDException { |
| |
| Pdpb.DelPartitionRequest request = Pdpb.DelPartitionRequest.newBuilder() |
| .setGraphName(graphName) |
| .setPartitionId(partitionId) |
| .build(); |
| Pdpb.DelPartitionResponse response = |
| blockingUnaryCall(PDGrpc.getDelPartitionMethod(), request); |
| |
| handleResponseError(response.getHeader()); |
| invalidPartitionCache(graphName, partitionId); |
| return response.getPartition(); |
| } |
| |
    /**
     * Drops a single partition entry from the local cache, if present.
     */
    public void invalidPartitionCache(String graphName, int partitionId) {
        if (null != cache.getPartitionById(graphName, partitionId)) {
            cache.removePartition(graphName, partitionId);
        }
    }

    /**
     * Drops every partition entry from the local cache.
     */
    public void invalidPartitionCache() {
        cache.removePartitions();
    }

    /**
     * Drops a store entry from the local cache.
     */
    public void invalidStoreCache(long storeId) {
        cache.removeStore(storeId);
    }
| |
| /** |
| * Update the cache |
| */ |
| public void updatePartitionLeader(String graphName, int partId, long leaderStoreId) { |
| KVPair<Metapb.Partition, Metapb.Shard> partShard = null; |
| try { |
| partShard = this.getPartitionById(graphName, partId); |
| |
| if (partShard != null && partShard.getValue().getStoreId() != leaderStoreId) { |
| var shardGroup = this.getShardGroup(partId); |
| Metapb.Shard shard = null; |
| List<Metapb.Shard> shards = new ArrayList<>(); |
| |
| for (Metapb.Shard s : shardGroup.getShardsList()) { |
| if (s.getStoreId() == leaderStoreId) { |
| shard = s; |
| shards.add(Metapb.Shard.newBuilder(s) |
| .setStoreId(s.getStoreId()) |
| .setRole(Metapb.ShardRole.Leader).build()); |
| } else { |
| shards.add(Metapb.Shard.newBuilder(s) |
| .setStoreId(s.getStoreId()) |
| .setRole(Metapb.ShardRole.Follower).build()); |
| } |
| } |
| |
| if (config.isEnableCache()) { |
| if (shard == null) { |
| cache.removePartition(graphName, partId); |
| } |
| } |
| } |
| } catch (PDException e) { |
| log.error("getPartitionException: {}", e.getMessage()); |
| } |
| } |
| |
| /** |
| * Update the cache |
| * |
| * @param partition |
| */ |
| public void updatePartitionCache(Metapb.Partition partition, Metapb.Shard leader) { |
| if (config.isEnableCache()) { |
| cache.update(partition.getGraphName(), partition.getId(), partition); |
| cache.updateLeader(partition.getId(), leader); |
| } |
| } |
| |
| public Pdpb.GetIdResponse getIdByKey(String key, int delta) throws PDException { |
| Pdpb.GetIdRequest request = Pdpb.GetIdRequest.newBuilder() |
| .setHeader(header) |
| .setKey(key) |
| .setDelta(delta) |
| .build(); |
| Pdpb.GetIdResponse response = blockingUnaryCall(PDGrpc.getGetIdMethod(), request); |
| handleResponseError(response.getHeader()); |
| return response; |
| } |
| |
| public Pdpb.ResetIdResponse resetIdByKey(String key) throws PDException { |
| Pdpb.ResetIdRequest request = Pdpb.ResetIdRequest.newBuilder() |
| .setHeader(header) |
| .setKey(key) |
| .build(); |
| Pdpb.ResetIdResponse response = blockingUnaryCall(PDGrpc.getResetIdMethod(), request); |
| handleResponseError(response.getHeader()); |
| return response; |
| } |
| |
| public Metapb.Member getLeader() throws PDException { |
| Pdpb.GetMembersRequest request = Pdpb.GetMembersRequest.newBuilder() |
| .setHeader(header) |
| .build(); |
| Pdpb.GetMembersResponse response = blockingUnaryCall(PDGrpc.getGetMembersMethod(), request); |
| handleResponseError(response.getHeader()); |
| return response.getLeader(); |
| } |
| |
| public Pdpb.GetMembersResponse getMembers() throws PDException { |
| Pdpb.GetMembersRequest request = Pdpb.GetMembersRequest.newBuilder() |
| .setHeader(header) |
| .build(); |
| Pdpb.GetMembersResponse response = blockingUnaryCall(PDGrpc.getGetMembersMethod(), request); |
| handleResponseError(response.getHeader()); |
| return response; |
| } |
| |
| public Metapb.ClusterStats getClusterStats() throws PDException { |
| Pdpb.GetClusterStatsRequest request = Pdpb.GetClusterStatsRequest.newBuilder() |
| .setHeader(header) |
| .build(); |
| Pdpb.GetClusterStatsResponse response = |
| blockingUnaryCall(PDGrpc.getGetClusterStatsMethod(), request); |
| handleResponseError(response.getHeader()); |
| return response.getCluster(); |
| } |
| |
| private <ReqT, RespT, StubT extends AbstractBlockingStub<StubT>> RespT |
| blockingUnaryCall(MethodDescriptor<ReqT, RespT> method, ReqT req) throws PDException { |
| return blockingUnaryCall(method, req, 1); |
| } |
| |
| private <ReqT, RespT, StubT extends AbstractBlockingStub<StubT>> RespT |
| blockingUnaryCall(MethodDescriptor<ReqT, RespT> method, ReqT req, int retry) throws |
| PDException { |
| io.grpc.stub.AbstractBlockingStub<StubT> stub = (AbstractBlockingStub<StubT>) getStub(); |
| try { |
| RespT resp = io.grpc.stub.ClientCalls.blockingUnaryCall(stub.getChannel(), method, |
| stub.getCallOptions(), req); |
| return resp; |
| } catch (Exception e) { |
| log.error(method.getFullMethodName() + " exception, {}", e.getMessage()); |
| if (e instanceof StatusRuntimeException) { |
| if (retry < stubProxy.getHostCount()) { |
| closeStub(true); |
| return blockingUnaryCall(method, req, ++retry); |
| } |
| } |
| } |
| return null; |
| } |
| |
| private void handleResponseError(Pdpb.ResponseHeader header) throws |
| PDException { |
| var errorType = header.getError().getType(); |
| if (header.hasError() && errorType != Pdpb.ErrorType.OK) { |
| |
| throw new PDException(header.getError().getTypeValue(), |
| String.format( |
| "PD request error, error code = %d, msg = %s", |
| header.getError().getTypeValue(), |
| header.getError().getMessage())); |
| } |
| } |
| |
    /**
     * Registers a listener for partition / store / graph / shard-group events
     * delivered by the watch streams.
     */
    public void addEventListener(PDEventListener listener) {
        eventListeners.add(listener);
    }

    /**
     * Creates a new, independent PDWatch client bound to the current stub
     * host; the caller owns its lifecycle.
     */
    public PDWatch getWatchClient() {
        return new PDWatchImpl(stubProxy.getHost());
    }
| |
| /** |
| * Returns the store status information |
| */ |
| public List<Metapb.Store> getStoreStatus(boolean offlineExcluded) throws PDException { |
| Pdpb.GetAllStoresRequest request = Pdpb.GetAllStoresRequest.newBuilder() |
| .setHeader(header) |
| .setExcludeOfflineStores( |
| offlineExcluded) |
| .build(); |
| Pdpb.GetAllStoresResponse response = getStub().getStoreStatus(request); |
| handleResponseError(response.getHeader()); |
| List<Metapb.Store> stores = response.getStoresList(); |
| return stores; |
| } |
| |
| public void setGraphSpace(String graphSpaceName, long storageLimit) throws PDException { |
| Metapb.GraphSpace graphSpace = Metapb.GraphSpace.newBuilder().setName(graphSpaceName) |
| .setStorageLimit(storageLimit) |
| .setTimestamp(System.currentTimeMillis()) |
| .build(); |
| Pdpb.SetGraphSpaceRequest request = Pdpb.SetGraphSpaceRequest.newBuilder() |
| .setHeader(header) |
| .setGraphSpace(graphSpace) |
| .build(); |
| Pdpb.SetGraphSpaceResponse response = getStub().setGraphSpace(request); |
| handleResponseError(response.getHeader()); |
| } |
| |
| public List<Metapb.GraphSpace> getGraphSpace(String graphSpaceName) throws |
| PDException { |
| Pdpb.GetGraphSpaceRequest.Builder builder = Pdpb.GetGraphSpaceRequest.newBuilder(); |
| Pdpb.GetGraphSpaceRequest request; |
| builder.setHeader(header); |
| if (graphSpaceName != null && graphSpaceName.length() > 0) { |
| builder.setGraphSpaceName(graphSpaceName); |
| } |
| request = builder.build(); |
| Pdpb.GetGraphSpaceResponse response = getStub().getGraphSpace(request); |
| List<Metapb.GraphSpace> graphSpaceList = response.getGraphSpaceList(); |
| handleResponseError(response.getHeader()); |
| return graphSpaceList; |
| } |
| |
| public void setPDConfig(int partitionCount, String peerList, int shardCount, |
| long version) throws PDException { |
| Metapb.PDConfig pdConfig = Metapb.PDConfig.newBuilder().setPartitionCount(partitionCount) |
| .setPeersList(peerList).setShardCount(shardCount) |
| .setVersion(version) |
| .setTimestamp(System.currentTimeMillis()) |
| .build(); |
| Pdpb.SetPDConfigRequest request = Pdpb.SetPDConfigRequest.newBuilder() |
| .setHeader(header) |
| .setPdConfig(pdConfig) |
| .build(); |
| Pdpb.SetPDConfigResponse response = getStub().setPDConfig(request); |
| handleResponseError(response.getHeader()); |
| } |
| |
| public Metapb.PDConfig getPDConfig() throws PDException { |
| Pdpb.GetPDConfigRequest request = Pdpb.GetPDConfigRequest.newBuilder() |
| .setHeader(header) |
| .build(); |
| Pdpb.GetPDConfigResponse response = getStub().getPDConfig(request); |
| handleResponseError(response.getHeader()); |
| return response.getPdConfig(); |
| } |
| |
| public void setPDConfig(Metapb.PDConfig pdConfig) throws PDException { |
| Pdpb.SetPDConfigRequest request = Pdpb.SetPDConfigRequest.newBuilder() |
| .setHeader(header) |
| .setPdConfig(pdConfig) |
| .build(); |
| Pdpb.SetPDConfigResponse response = getStub().setPDConfig(request); |
| handleResponseError(response.getHeader()); |
| } |
| |
| public Metapb.PDConfig getPDConfig(long version) throws PDException { |
| Pdpb.GetPDConfigRequest request = Pdpb.GetPDConfigRequest.newBuilder().setHeader( |
| header).setVersion(version).build(); |
| Pdpb.GetPDConfigResponse response = getStub().getPDConfig(request); |
| handleResponseError(response.getHeader()); |
| return response.getPdConfig(); |
| } |
| |
| public void changePeerList(String peerList) throws PDException { |
| Pdpb.ChangePeerListRequest request = Pdpb.ChangePeerListRequest.newBuilder() |
| .setPeerList(peerList) |
| .setHeader(header).build(); |
| Pdpb.getChangePeerListResponse response = |
| blockingUnaryCall(PDGrpc.getChangePeerListMethod(), request); |
| handleResponseError(response.getHeader()); |
| } |
| |
| /** |
| * Working mode |
| * Auto:If the number of partitions on each store reaches the maximum value, you need to |
| * specify the store group id. The store group id is 0, which is the default partition |
| * splitData(ClusterOp.OperationMode mode, int storeGroupId, List<ClusterOp.SplitDataParam> |
| * params) |
| * mode = Auto storeGroupId, params |
| * |
| * @throws PDException |
| */ |
| public void splitData() throws PDException { |
| Pdpb.SplitDataRequest request = Pdpb.SplitDataRequest.newBuilder() |
| .setHeader(header) |
| .setMode(Pdpb.OperationMode.Auto) |
| .build(); |
| Pdpb.SplitDataResponse response = getStub().splitData(request); |
| handleResponseError(response.getHeader()); |
| } |
| |
| /** |
| * Working mode |
| * Auto:If the number of partitions on each store reaches the maximum value, you need to |
| * specify the store group id. The store group id is 0, which is the default partition |
| * Expert:Expert Mode,Specifier is required splitParams, limit SplitDataParam in the same |
| * store group |
| * |
| * @param mode |
| * @param params |
| * @throws PDException |
| */ |
| public void splitData(Pdpb.OperationMode mode, List<Pdpb.SplitDataParam> params) throws |
| PDException { |
| Pdpb.SplitDataRequest request = Pdpb.SplitDataRequest.newBuilder() |
| .setHeader(header) |
| .setMode(mode) |
| .addAllParam(params).build(); |
| Pdpb.SplitDataResponse response = getStub().splitData(request); |
| handleResponseError(response.getHeader()); |
| } |
| |
| public void splitGraphData(String graphName, int toCount) throws PDException { |
| Pdpb.SplitGraphDataRequest request = Pdpb.SplitGraphDataRequest.newBuilder() |
| .setHeader(header) |
| .setGraphName(graphName) |
| .setToCount(toCount) |
| .build(); |
| Pdpb.SplitDataResponse response = getStub().splitGraphData(request); |
| handleResponseError(response.getHeader()); |
| } |
| |
| /** |
| * To automatically transfer to the same number of partitions on each Store, it is |
| * recommended to use balancePartition(int storeGroupId) to specify the storeGroupId |
| * |
| * @throws PDException |
| */ |
| public void balancePartition() throws PDException { |
| Pdpb.MovePartitionRequest request = Pdpb.MovePartitionRequest.newBuilder() |
| .setHeader(header) |
| .setMode( |
| Pdpb.OperationMode.Auto) |
| .build(); |
| Pdpb.MovePartitionResponse response = getStub().movePartition(request); |
| handleResponseError(response.getHeader()); |
| } |
| |
| /** |
| * Migrate partitions in manual mode |
| * //Working mode |
| * // Auto:Automatic transfer to the same number of partitions per Store |
| * // Expert:Expert Mode,Specifier is required transferParams |
| * |
| * @param params Designation transferParams, expert mode,request source store / target store |
| * in the same store group |
| * @throws PDException |
| */ |
| public void movePartition(Pdpb.OperationMode mode, List<Pdpb.MovePartitionParam> params) throws |
| PDException { |
| Pdpb.MovePartitionRequest request = Pdpb.MovePartitionRequest.newBuilder() |
| .setHeader(header) |
| .setMode(mode) |
| .addAllParam(params).build(); |
| Pdpb.MovePartitionResponse response = getStub().movePartition(request); |
| handleResponseError(response.getHeader()); |
| } |
| |
| public void reportTask(MetaTask.Task task) throws PDException { |
| Pdpb.ReportTaskRequest request = Pdpb.ReportTaskRequest.newBuilder() |
| .setHeader(header) |
| .setTask(task).build(); |
| Pdpb.ReportTaskResponse response = blockingUnaryCall(PDGrpc.getReportTaskMethod(), request); |
| handleResponseError(response.getHeader()); |
| } |
| |
| public Metapb.PartitionStats getPartitionsStats(String graph, int partId) throws PDException { |
| Pdpb.GetPartitionStatsRequest request = Pdpb.GetPartitionStatsRequest.newBuilder() |
| .setHeader(header) |
| .setGraphName(graph) |
| .setPartitionId(partId) |
| .build(); |
| Pdpb.GetPartitionStatsResponse response = getStub().getPartitionStats(request); |
| handleResponseError(response.getHeader()); |
| return response.getPartitionStats(); |
| } |
| |
| /** |
| * Balance the number of leaders in different stores |
| */ |
| public void balanceLeaders() throws PDException { |
| Pdpb.BalanceLeadersRequest request = Pdpb.BalanceLeadersRequest.newBuilder() |
| .setHeader(header) |
| .build(); |
| Pdpb.BalanceLeadersResponse response = getStub().balanceLeaders(request); |
| handleResponseError(response.getHeader()); |
| } |
| |
| /** |
| * Remove the store from the PD |
| */ |
| public Metapb.Store delStore(long storeId) throws PDException { |
| Pdpb.DetStoreRequest request = Pdpb.DetStoreRequest.newBuilder() |
| .setHeader(header) |
| .setStoreId(storeId) |
| .build(); |
| Pdpb.DetStoreResponse response = getStub().delStore(request); |
| handleResponseError(response.getHeader()); |
| return response.getStore(); |
| } |
| |
| /** |
| * Compaction on rocksdb as a whole |
| * |
| * @throws PDException |
| */ |
| public void dbCompaction() throws PDException { |
| Pdpb.DbCompactionRequest request = Pdpb.DbCompactionRequest |
| .newBuilder() |
| .setHeader(header) |
| .build(); |
| Pdpb.DbCompactionResponse response = getStub().dbCompaction(request); |
| handleResponseError(response.getHeader()); |
| } |
| |
| /** |
| * Compaction on rocksdb specified tables |
| * |
| * @param tableName |
| * @throws PDException |
| */ |
| public void dbCompaction(String tableName) throws PDException { |
| Pdpb.DbCompactionRequest request = Pdpb.DbCompactionRequest |
| .newBuilder() |
| .setHeader(header) |
| .setTableName(tableName) |
| .build(); |
| Pdpb.DbCompactionResponse response = getStub().dbCompaction(request); |
| handleResponseError(response.getHeader()); |
| } |
| |
| /** |
| * Merge partitions to reduce the current partition to toCount |
| * |
| * @param toCount The number of partitions that can be scaled down |
| * @throws PDException |
| */ |
| public void combineCluster(int toCount) throws PDException { |
| Pdpb.CombineClusterRequest request = Pdpb.CombineClusterRequest |
| .newBuilder() |
| .setHeader(header) |
| .setToCount(toCount) |
| .build(); |
| Pdpb.CombineClusterResponse response = getStub().combineCluster(request); |
| handleResponseError(response.getHeader()); |
| } |
| |
| /** |
| * Scaling a single image to toCount is similar to splitting to ensure that the number of |
| * partitions in the same store group is the same. |
| * If you have special requirements, you can consider migrating to other groups |
| * |
| * @param graphName graph name |
| * @param toCount target count |
| * @throws PDException |
| */ |
| public void combineGraph(String graphName, int toCount) throws PDException { |
| Pdpb.CombineGraphRequest request = Pdpb.CombineGraphRequest |
| .newBuilder() |
| .setHeader(header) |
| .setGraphName(graphName) |
| .setToCount(toCount) |
| .build(); |
| Pdpb.CombineGraphResponse response = getStub().combineGraph(request); |
| handleResponseError(response.getHeader()); |
| } |
| |
| public void deleteShardGroup(int groupId) throws PDException { |
| Pdpb.DeleteShardGroupRequest request = Pdpb.DeleteShardGroupRequest |
| .newBuilder() |
| .setHeader(header) |
| .setGroupId(groupId) |
| .build(); |
| Pdpb.DeleteShardGroupResponse response = |
| blockingUnaryCall(PDGrpc.getDeleteShardGroupMethod(), request); |
| |
| handleResponseError(response.getHeader()); |
| } |
| |
| /** |
| * Used for the store's shard list rebuild |
| * |
| * @param groupId shard group id |
| * @param shards shard list,delete when shards size is 0 |
| */ |
| public void updateShardGroupOp(int groupId, List<Metapb.Shard> shards) throws PDException { |
| Pdpb.ChangeShardRequest request = Pdpb.ChangeShardRequest.newBuilder() |
| .setHeader(header) |
| .setGroupId(groupId) |
| .addAllShards(shards) |
| .build(); |
| Pdpb.ChangeShardResponse response = getStub().updateShardGroupOp(request); |
| handleResponseError(response.getHeader()); |
| } |
| |
| /** |
| * invoke fireChangeShard command |
| * |
| * @param groupId shard group id |
| * @param shards shard list |
| */ |
| public void changeShard(int groupId, List<Metapb.Shard> shards) throws PDException { |
| Pdpb.ChangeShardRequest request = Pdpb.ChangeShardRequest.newBuilder() |
| .setHeader(header) |
| .setGroupId(groupId) |
| .addAllShards(shards) |
| .build(); |
| Pdpb.ChangeShardResponse response = getStub().changeShard(request); |
| handleResponseError(response.getHeader()); |
| } |
| |
/**
 * Returns the client-side cache instance held by this client.
 */
public ClientCache getCache() {
    return cache;
}
| |
| public CacheResponse getClientCache() throws PDException { |
| GetGraphRequest request = GetGraphRequest.newBuilder().setHeader(header).build(); |
| CacheResponse cache = getStub().getCache(request); |
| handleResponseError(cache.getHeader()); |
| return cache; |
| } |
| |
| public CachePartitionResponse getPartitionCache(String graph) throws PDException { |
| GetGraphRequest request = |
| GetGraphRequest.newBuilder().setHeader(header).setGraphName(graph).build(); |
| CachePartitionResponse ps = getStub().getPartitions(request); |
| handleResponseError(ps.getHeader()); |
| return ps; |
| } |
| |
| public void updatePdRaft(String raftConfig) throws PDException { |
| Pdpb.UpdatePdRaftRequest request = Pdpb.UpdatePdRaftRequest.newBuilder() |
| .setHeader(header) |
| .setConfig(raftConfig) |
| .build(); |
| Pdpb.UpdatePdRaftResponse response = getStub().updatePdRaft(request); |
| handleResponseError(response.getHeader()); |
| } |
| |
/**
 * Listener for PD-side change notifications delivered over the watch channel.
 */
public interface PDEventListener {

    /** Called when a store's state changes (e.g. online/offline, leader change). */
    void onStoreChanged(NodeEvent event);

    /** Called when a partition's placement or state changes. */
    void onPartitionChanged(PartitionEvent event);

    /** Called when graph metadata changes. */
    void onGraphChanged(WatchResponse event);

    /** Called when a shard group changes; default is a no-op for listeners that don't care. */
    default void onShardGroupChanged(WatchResponse event) {
    }

}
| |
| static class StubProxy { |
| |
| private final LinkedList<String> hostList = new LinkedList<>(); |
| private volatile PDGrpc.PDBlockingStub stub; |
| private String leader; |
| |
| public StubProxy(String[] hosts) { |
| for (String host : hosts) { |
| if (!host.isEmpty()) { |
| hostList.offer(host); |
| } |
| } |
| } |
| |
| public String nextHost() { |
| String host = hostList.poll(); |
| hostList.offer(host); |
| return host; |
| } |
| |
| public void set(PDGrpc.PDBlockingStub stub) { |
| this.stub = stub; |
| } |
| |
| public PDGrpc.PDBlockingStub get() { |
| return this.stub; |
| } |
| |
| public String getHost() { |
| return hostList.peek(); |
| } |
| |
| public int getHostCount() { |
| return hostList.size(); |
| } |
| |
| public String getLeader() { |
| return leader; |
| } |
| |
| public void setLeader(String leader) { |
| this.leader = leader; |
| } |
| } |
| } |