blob: 9df838111283189f02d67a764e1546df0d78debc [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hugegraph.pd.service;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import org.apache.hugegraph.pd.ConfigService;
import org.apache.hugegraph.pd.LogService;
import org.apache.hugegraph.pd.PartitionService;
import org.apache.hugegraph.pd.StoreMonitorDataService;
import org.apache.hugegraph.pd.StoreNodeService;
import org.apache.hugegraph.pd.TaskScheduleService;
import org.apache.hugegraph.pd.common.HgAssert;
import org.apache.hugegraph.pd.common.KVPair;
import org.apache.hugegraph.pd.common.PDException;
import org.apache.hugegraph.pd.grpc.Metapb;
import org.apache.hugegraph.pd.grpc.Pdpb;
import org.apache.hugegraph.pd.grpc.discovery.NodeInfo;
import org.apache.hugegraph.pd.grpc.discovery.NodeInfos;
import org.apache.hugegraph.pd.grpc.discovery.Query;
import org.apache.hugegraph.pd.grpc.discovery.RegisterInfo;
import org.apache.hugegraph.pd.model.RegistryRestRequest;
import org.apache.hugegraph.pd.model.RegistryRestResponse;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import io.grpc.stub.StreamObserver;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@Service
public class PDRestService implements InitializingBean {
private static final String EMPTY_STRING = "";
@Autowired
PDService pdService;
@Autowired
DiscoveryService discoveryService;
private StoreNodeService storeNodeService;
private PartitionService partitionService;
private TaskScheduleService monitorService;
private ConfigService configService;
private LogService logService;
private StoreMonitorDataService storeMonitorDataService;
/**
* initialize
*
* @throws Exception
*/
@Override
public void afterPropertiesSet() throws Exception {
storeNodeService = pdService.getStoreNodeService();
partitionService = pdService.getPartitionService();
monitorService = pdService.getTaskService();
configService = pdService.getConfigService();
logService = pdService.getLogService();
storeMonitorDataService = pdService.getStoreMonitorDataService();
HgAssert.isNotNull(storeNodeService, "storeNodeService does not initialize");
HgAssert.isNotNull(partitionService, "partitionService does not initialize");
}
public List<Metapb.Store> getStores(String graphName) throws PDException {
return storeNodeService.getStores(graphName);
}
public Metapb.Store getStore(long storeId) throws PDException {
return storeNodeService.getStore(storeId);
}
public List<Metapb.ShardGroup> getShardGroups() throws PDException {
return storeNodeService.getShardGroups();
}
public Metapb.Store updateStore(Metapb.Store store) throws PDException {
logService.insertLog(LogService.NODE_CHANGE, LogService.REST, store);
return storeNodeService.updateStore(store);
}
public boolean removeStore(Long storeId) throws PDException {
if (storeId == null) {
return false;
}
return 0 != storeNodeService.removeStore(storeId);
}
public Metapb.GraphSpace setGraphSpace(Metapb.GraphSpace graphSpace) throws PDException {
return configService.setGraphSpace(graphSpace);
}
public List<Metapb.GraphSpace> getGraphSpaces() throws PDException {
return configService.getGraphSpace(EMPTY_STRING);
}
public Metapb.GraphSpace getGraphSpace(String graphSpaceName) throws PDException {
return configService.getGraphSpace(graphSpaceName).get(0);
}
public List<Metapb.Graph> getGraphs() throws PDException {
return partitionService.getGraphs();
}
public Metapb.Graph getGraph(String graphName) throws PDException {
return partitionService.getGraph(graphName);
}
public Metapb.Graph updateGraph(Metapb.Graph graph) throws PDException {
return partitionService.updateGraph(graph);
}
public List<Metapb.Partition> getPartitions(String graphName) {
return partitionService.getPartitions(graphName);
}
public List<Metapb.Store> patrolStores() throws PDException {
return monitorService.patrolStores();
}
public List<Metapb.Partition> patrolPartitions() throws PDException {
return monitorService.patrolPartitions();
}
public Metapb.PartitionStats getPartitionStats(String graphName, int partitionId) throws
PDException {
return partitionService.getPartitionStats(graphName, partitionId);
}
public List<Metapb.PartitionStats> getPartitionStatus(String graphName) throws PDException {
return partitionService.getPartitionStatus(graphName);
}
public Map<Integer, KVPair<Long, Long>> balancePartitions() throws PDException {
return monitorService.balancePartitionShard();
}
public List<Metapb.Partition> splitPartitions() throws PDException {
return monitorService.autoSplitPartition();
}
public List<Metapb.Store> getStoreStats(boolean isActive) throws PDException {
return storeNodeService.getStoreStatus(isActive);
}
public List<Map<String, Long>> getMonitorData(long storeId) throws PDException {
return storeMonitorDataService.getStoreMonitorData(storeId);
}
public String getMonitorDataText(long storeId) throws PDException {
return storeMonitorDataService.getStoreMonitorDataText(storeId);
}
public RegistryRestResponse register(NodeInfo nodeInfo) throws PDException {
CountDownLatch latch = new CountDownLatch(1);
final RegisterInfo[] info = {null};
RegistryRestResponse response = new RegistryRestResponse();
try {
StreamObserver<RegisterInfo> observer = new StreamObserver<RegisterInfo>() {
@Override
public void onNext(RegisterInfo value) {
info[0] = value;
latch.countDown();
}
@Override
public void onError(Throwable t) {
latch.countDown();
}
@Override
public void onCompleted() {
latch.countDown();
}
};
this.discoveryService.register(nodeInfo, observer);
latch.await();
Pdpb.Error error = info[0].getHeader().getError();
response.setErrorType(error.getType());
response.setMessage(error.getMessage());
} catch (InterruptedException e) {
response.setErrorType(Pdpb.ErrorType.UNRECOGNIZED);
response.setMessage(e.getMessage());
}
return response;
}
public ArrayList<RegistryRestRequest> getNodeInfo(Query request) throws PDException {
CountDownLatch latch = new CountDownLatch(1);
final NodeInfos[] info = {null};
RegistryRestResponse response = new RegistryRestResponse();
ArrayList<RegistryRestRequest> registryRestRequests = null;
try {
StreamObserver<NodeInfos> observer = new StreamObserver<NodeInfos>() {
@Override
public void onNext(NodeInfos value) {
info[0] = value;
latch.countDown();
}
@Override
public void onError(Throwable t) {
latch.countDown();
}
@Override
public void onCompleted() {
latch.countDown();
}
};
this.discoveryService.getNodes(request, observer);
latch.await();
List<NodeInfo> infoList = info[0].getInfoList();
registryRestRequests = new ArrayList(infoList.size());
for (int i = 0; i < infoList.size(); i++) {
NodeInfo element = infoList.get(i);
RegistryRestRequest registryRestRequest = new RegistryRestRequest();
registryRestRequest.setAddress(element.getAddress());
registryRestRequest.setAppName(element.getAppName());
registryRestRequest.setVersion(element.getVersion());
registryRestRequest.setInterval(String.valueOf(element.getInterval()));
HashMap<String, String> labels = new HashMap<>();
labels.putAll(element.getLabelsMap());
registryRestRequest.setLabels(labels);
registryRestRequests.add(registryRestRequest);
}
} catch (InterruptedException e) {
response.setErrorType(Pdpb.ErrorType.UNRECOGNIZED);
response.setMessage(e.getMessage());
}
return registryRestRequests;
}
public List<Metapb.LogRecord> getStoreStatusLog(Long start, Long end) throws PDException {
return logService.getLog(LogService.NODE_CHANGE, start, end);
}
public List<Metapb.LogRecord> getPartitionLog(Long start, Long end) throws PDException {
return logService.getLog(LogService.PARTITION_CHANGE, start, end);
}
public Map<Integer, Long> balancePartitionLeader() throws PDException {
return monitorService.balancePartitionLeader(true);
}
public void dbCompaction() throws PDException {
monitorService.dbCompaction("");
}
public List<Metapb.Shard> getShardList(int partitionId) throws PDException {
return storeNodeService.getShardList(partitionId);
}
}