| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hugegraph.pd.rest; |
| |
| import java.util.ArrayList; |
| import java.util.Comparator; |
| import java.util.Date; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.concurrent.ExecutionException; |
| |
| import org.apache.commons.lang.time.DateFormatUtils; |
| import org.apache.hugegraph.pd.common.PDException; |
| import org.apache.hugegraph.pd.grpc.Metapb; |
| import org.apache.hugegraph.pd.grpc.Pdpb; |
| import org.apache.hugegraph.pd.model.RestApiResponse; |
| import org.apache.hugegraph.pd.model.TimeRangeRequest; |
| import org.apache.hugegraph.pd.service.PDRestService; |
| import org.apache.hugegraph.pd.util.DateUtil; |
| import org.springframework.beans.factory.annotation.Autowired; |
| import org.springframework.http.MediaType; |
| import org.springframework.web.bind.annotation.GetMapping; |
| import org.springframework.web.bind.annotation.PostMapping; |
| import org.springframework.web.bind.annotation.RequestBody; |
| import org.springframework.web.bind.annotation.RequestMapping; |
| import org.springframework.web.bind.annotation.ResponseBody; |
| import org.springframework.web.bind.annotation.RestController; |
| |
| import com.google.protobuf.util.JsonFormat; |
| |
| import lombok.Data; |
| import lombok.extern.slf4j.Slf4j; |
| |
| @RestController |
| @Slf4j |
| @RequestMapping("/v1") |
| public class PartitionAPI extends API { |
| |
| public static final String DEFAULT_DATETIME_FORMAT = "yyyy-MM-dd HH:mm:ss"; |
| @Autowired |
| PDRestService pdRestService; |
| |
| @GetMapping(value = "/highLevelPartitions", produces = MediaType.APPLICATION_JSON_VALUE) |
| public RestApiResponse getHighLevelPartitions() { |
| // 分区下多个图的信息 |
| Map<Integer, Map<String, GraphStats>> partitions2GraphsMap = new HashMap<>(); |
| Map<Integer, HighLevelPartition> resultPartitionsMap = new HashMap<>(); |
| // 每一个分区的keyCount, 只从leader处取出 |
| Map<Integer, Long> partition2KeyCount = new HashMap<>(); |
| // 每一个分区的dataSize, 只从leader处取出 |
| Map<Integer, Long> partition2DataSize = new HashMap<>(); |
| List<Metapb.Store> stores; |
| Map<Long, Metapb.Store> storesMap = new HashMap<>(); |
| try { |
| stores = pdRestService.getStores(""); |
| } catch (PDException e) { |
| log.error("getStores error", e); |
| return new RestApiResponse(null, e.getErrorCode(), e.getMessage()); |
| } |
| for (Metapb.Store store : stores) { |
| storesMap.put(store.getId(), store); |
| List<Metapb.GraphStats> graphStatsList = store.getStats().getGraphStatsList(); |
| for (Metapb.GraphStats graphStats : graphStatsList) { |
| // 获取分区保存的图信息(只从leader处取出来) |
| if (Metapb.ShardRole.Leader != graphStats.getRole()) { |
| continue; |
| } |
| // 计算分区的keyCount(不区分图) |
| partition2KeyCount.put(graphStats.getPartitionId(), |
| partition2KeyCount.getOrDefault(graphStats.getPartitionId(), |
| graphStats.getApproximateKeys())); |
| // 计算分区的dataSize, 通过累加图的大小实现 |
| partition2DataSize.put(graphStats.getPartitionId(), |
| partition2DataSize.getOrDefault(graphStats.getPartitionId(), |
| 0L) |
| + graphStats.getApproximateSize()); |
| // 构造分区下的图信息 |
| if (partitions2GraphsMap.get(graphStats.getPartitionId()) == null) { |
| partitions2GraphsMap.put(graphStats.getPartitionId(), |
| new HashMap<String, GraphStats>()); |
| } |
| Map<String, GraphStats> partitionGraphsMap = |
| partitions2GraphsMap.get(graphStats.getPartitionId()); |
| partitionGraphsMap.put(graphStats.getGraphName(), new GraphStats(graphStats)); |
| } |
| } |
| // 构造分区的所有需返回的信息 |
| List<Metapb.Partition> partitionList = pdRestService.getPartitions(""); |
| for (Metapb.Partition partition : partitionList) { |
| // 补充分区内图信息的startKey, endKey |
| if (partitions2GraphsMap.get(partition.getId()) != null) { |
| GraphStats graphStats = |
| partitions2GraphsMap.get(partition.getId()).get(partition.getGraphName()); |
| if (graphStats != null) { |
| graphStats.startKey = partition.getStartKey(); |
| graphStats.endKey = partition.getEndKey(); |
| } |
| } |
| // 构造分区整体信息(不区分图) |
| if ((resultPartitionsMap.get(partition.getId()) == null) |
| && (!partition.getGraphName().endsWith("/s")) |
| ) { |
| Metapb.PartitionStats partitionStats; |
| try { |
| partitionStats = pdRestService.getPartitionStats(partition.getGraphName(), |
| partition.getId()); |
| } catch (PDException e) { |
| log.error("getPartitionStats error", e); |
| partitionStats = null; |
| } |
| // 初始化分区信息 |
| HighLevelPartition resultPartition = |
| new HighLevelPartition(partition, partitionStats); |
| resultPartition.keyCount = |
| partition2KeyCount.getOrDefault(resultPartition.partitionId, 0L); |
| resultPartition.dataSize = |
| partition2DataSize.getOrDefault(resultPartition.partitionId, 0L); |
| for (ShardStats shard : resultPartition.shards) { |
| // 对副本的地址,分区信息赋值 |
| shard.address = storesMap.get(shard.storeId).getAddress(); |
| shard.partitionId = partition.getId(); |
| } |
| if ((partitionStats != null) && (partitionStats.getLeader() != null)) { |
| long storeId = partitionStats.getLeader().getStoreId(); // 获取leader的storeId |
| resultPartition.leaderAddress = |
| storesMap.get(storeId).getAddress(); // 获取leader的address |
| } |
| resultPartitionsMap.put(partition.getId(), resultPartition); |
| } |
| } |
| // 构造需返回的分区下的图列表,只返回/g, 且按名称排序 |
| for (Map.Entry<Integer, HighLevelPartition> entry : resultPartitionsMap.entrySet()) { |
| Integer partitionId = entry.getKey(); |
| HighLevelPartition currentPartition = resultPartitionsMap.get(partitionId); |
| Map<String, GraphStats> graphsMap = partitions2GraphsMap |
| .getOrDefault(partitionId, new HashMap<>()); // 避免后面出现空指针异常 |
| ArrayList<GraphStats> graphsList = new ArrayList<>(); |
| for (Map.Entry<String, GraphStats> entry1 : graphsMap.entrySet()) { |
| if (!entry1.getKey().endsWith("/g")) { |
| continue; // 只保留/g的图 |
| } |
| String graphName = entry1.getKey(); |
| GraphStats tmpGraph = graphsMap.get(graphName); |
| final int postfixLength = 2; |
| tmpGraph.graphName = tmpGraph.graphName.substring(0, tmpGraph.graphName.length() - |
| postfixLength); |
| graphsList.add(tmpGraph); |
| } |
| graphsList.sort((o1, o2) -> o1.graphName.compareTo(o2.graphName)); |
| currentPartition.graphs = graphsList; |
| } |
| List<HighLevelPartition> resultPartitionList = new ArrayList<>(); |
| if (!resultPartitionsMap.isEmpty()) { |
| ArrayList<Integer> partitionids = new ArrayList(resultPartitionsMap.keySet()); |
| partitionids.sort((o1, o2) -> o1.intValue() - o2.intValue()); |
| for (Integer partitionId : partitionids) { |
| resultPartitionList.add(resultPartitionsMap.get(partitionId)); |
| } |
| } |
| HashMap<String, Object> dataMap = new HashMap<>(); |
| dataMap.put("partitions", resultPartitionList); |
| return new RestApiResponse(dataMap, Pdpb.ErrorType.OK, Pdpb.ErrorType.OK.name()); |
| } |
| |
| @GetMapping(value = "/partitions", produces = MediaType.APPLICATION_JSON_VALUE) |
| public RestApiResponse getPartitions() { |
| try { |
| List<Partition> partitions = new ArrayList<>();//需返回的分区对象 |
| List<Metapb.Partition> partitionList = pdRestService.getPartitions(""); |
| List<Metapb.Store> stores = pdRestService.getStoreStats(false); |
| //分区的raftNode的状态 |
| HashMap<Long, HashMap<Integer, Metapb.RaftStats>> raftMap = new HashMap<>(); |
| |
| HashMap<Long, HashMap<String, Metapb.GraphStats>> shardIndexMap = new HashMap<>(); |
| String delimiter = "@"; |
| for (int i = 0; i < stores.size(); i++) { |
| Metapb.Store store = stores.get(i); |
| Metapb.StoreStats storeStats = store.getStats(); |
| HashMap<Integer, Metapb.RaftStats> storeRaftStats = new HashMap<>(); |
| List<Metapb.RaftStats> raftStatsList = storeStats.getRaftStatsList(); |
| for (int j = 0; j < raftStatsList.size(); j++) { |
| Metapb.RaftStats raftStats = raftStatsList.get(j); |
| storeRaftStats.put(raftStats.getPartitionId(), raftStats); |
| } |
| |
| HashMap<String, Metapb.GraphStats> partitionShardStats = new HashMap<>(); |
| List<Metapb.GraphStats> graphStatsList = storeStats.getGraphStatsList(); |
| StringBuilder builder = new StringBuilder(); |
| for (int j = 0; j < graphStatsList.size(); j++) { |
| Metapb.GraphStats graphStats = graphStatsList.get(j); |
| String graphName = graphStats.getGraphName(); |
| String partitionId = Integer.toString(graphStats.getPartitionId()); |
| builder.append(graphName).append(delimiter).append(partitionId); |
| partitionShardStats.put(builder.toString(), graphStats); |
| builder.setLength(0); |
| } |
| raftMap.put(store.getId(), storeRaftStats); |
| shardIndexMap.put(store.getId(), partitionShardStats); |
| } |
| |
| for (Metapb.Partition pt : partitionList) { |
| Partition partition = new Partition(pt); |
| String graphName = partition.getGraphName(); |
| partition.getShards().sort(Comparator.comparing(Shard::getStoreId)); |
| Metapb.PartitionStats partitionStats = |
| pdRestService.getPartitionStats(graphName, pt.getId()); |
| Map<Long, Metapb.ShardStats> shardStats = new HashMap<>(); |
| if (partitionStats != null) { |
| String dateTime = DateFormatUtils.format( |
| partitionStats.getTimestamp(), DEFAULT_DATETIME_FORMAT); |
| partition.setTimestamp(dateTime); |
| shardStats = getShardStats(partitionStats); |
| } |
| |
| for (Metapb.Shard shard : pdRestService.getShardList(pt.getId())) { |
| Map<Long, Metapb.ShardStats> finalShardStats = shardStats; |
| partition.getShards().add(new Shard() {{ |
| storeId = Long.toString(shard.getStoreId()); |
| role = shard.getRole(); |
| address = pdRestService.getStore( |
| shard.getStoreId()).getAddress(); |
| if (finalShardStats.containsKey(shard.getStoreId())) { |
| state = finalShardStats.get(shard.getStoreId()).getState().toString(); |
| progress = finalShardStats.get(shard.getStoreId()).getProgress(); |
| role = finalShardStats.get(shard.getStoreId()).getRole(); |
| } |
| |
| HashMap<Integer, Metapb.RaftStats> storeRaftStats = |
| raftMap.get(shard.getStoreId()); |
| if (storeRaftStats != null) { |
| Metapb.RaftStats raftStats = storeRaftStats.get(partition.getId()); |
| if (raftStats != null) { |
| committedIndex = Long.toString(raftStats.getCommittedIndex()); |
| } |
| } |
| }}); |
| } |
| |
| partition.setPartitionStats(partitionStats); |
| |
| partitions.add(partition); |
| } |
| partitions.sort( |
| Comparator.comparing(Partition::getGraphName).thenComparing(Partition::getId)); |
| HashMap<String, Object> dataMap = new HashMap<>(); |
| dataMap.put("partitions", partitions); |
| return new RestApiResponse(dataMap, Pdpb.ErrorType.OK, Pdpb.ErrorType.OK.name()); |
| } catch (PDException e) { |
| log.error("query metric data error", e); |
| return new RestApiResponse(null, e.getErrorCode(), e.getMessage()); |
| } |
| } |
| |
| @GetMapping(value = "/partitionsAndStats", produces = MediaType.APPLICATION_JSON_VALUE) |
| public String getPartitionsAndStats() { |
| //for debug use, return partition && partitionStats |
| try { |
| Map<String, List<Metapb.Partition>> graph2Partitions = new HashMap<>(); |
| Map<String, List<Metapb.PartitionStats>> graph2PartitionStats = new HashMap<>(); |
| for (Metapb.Graph graph : pdRestService.getGraphs()) { |
| List<Metapb.Partition> partitionList = new ArrayList<>(); |
| List<Metapb.PartitionStats> partitionStatsList = new ArrayList<>(); |
| for (Metapb.Partition partition : pdRestService.getPartitions( |
| graph.getGraphName())) { |
| Metapb.PartitionStats partitionStats = pdRestService |
| .getPartitionStats(graph.getGraphName(), partition.getId()); |
| partitionList.add(partition); |
| partitionStatsList.add(partitionStats); |
| } |
| graph2Partitions.put(graph.getGraphName(), partitionList); |
| graph2PartitionStats.put(graph.getGraphName(), partitionStatsList); |
| } |
| String builder = "{\"partitions\":" + toJSON(graph2Partitions) + |
| ",\"partitionStats\":" + toJSON(graph2PartitionStats) + "}"; |
| return builder; |
| } catch (PDException e) { |
| log.error("PD exception:" + e); |
| return toJSON(e); |
| } |
| } |
| |
| private Map<Long, Metapb.ShardStats> getShardStats(Metapb.PartitionStats partitionStats) { |
| Map<Long, Metapb.ShardStats> stats = new HashMap<>(); |
| if (partitionStats.getShardStatsList() != null) { |
| partitionStats.getShardStatsList().forEach(shardStats -> { |
| stats.put(shardStats.getStoreId(), shardStats); |
| }); |
| } |
| return stats; |
| } |
| |
| @PostMapping(value = "/partitions/log", consumes = MediaType.APPLICATION_JSON_VALUE, |
| produces = MediaType.APPLICATION_JSON_VALUE) |
| @ResponseBody |
| public String getPartitionLog(@RequestBody TimeRangeRequest request) { |
| try { |
| Date dateStart = DateUtil.getDate(request.getStartTime()); |
| Date dateEnd = DateUtil.getDate(request.getEndTime()); |
| List<Metapb.LogRecord> changedRecords = |
| pdRestService.getPartitionLog(dateStart.getTime(), |
| dateEnd.getTime()); |
| if (changedRecords != null) { |
| JsonFormat.TypeRegistry registry = JsonFormat.TypeRegistry |
| .newBuilder().add(Pdpb.SplitDataRequest.getDescriptor()).build(); |
| return toJSON(changedRecords, registry); |
| } else { |
| return toJSON(new PDException(Pdpb.ErrorType.NOT_FOUND_VALUE, "error")); |
| } |
| } catch (PDException e) { |
| return toJSON(e); |
| } |
| } |
| |
| @GetMapping(value = "/", produces = MediaType.APPLICATION_JSON_VALUE) |
| @ResponseBody |
| public Statistics getStatistics() throws PDException, ExecutionException, InterruptedException { |
| |
| Statistics statistics = new Statistics(); |
| int partitionId = -1; |
| return statistics; |
| } |
| |
| @Data |
| class Shard { |
| |
| String address; |
| String storeId; |
| Metapb.ShardRole role; |
| String state; |
| int progress; |
| String committedIndex; |
| long partitionId; |
| |
| } |
| |
| @Data |
| class Partition { |
| |
| int id; |
| long version; |
| String graphName; |
| long startKey; |
| long endKey; |
| |
| Metapb.PartitionState workState; |
| List<Shard> shards; |
| String timestamp; |
| |
| Partition(Metapb.Partition pt) { |
| id = pt.getId(); |
| version = pt.getVersion(); |
| graphName = pt.getGraphName(); |
| startKey = pt.getStartKey(); |
| endKey = pt.getEndKey(); |
| workState = pt.getState(); |
| shards = new ArrayList<>(); |
| } |
| |
| public void setPartitionStats(Metapb.PartitionStats stats) { |
| |
| } |
| } |
| |
| @Data |
| class Statistics { |
| |
| } |
| |
| @Data |
| class HighLevelPartition { |
| |
| int partitionId; |
| String state; |
| String leaderAddress; |
| long keyCount; |
| long dataSize; |
| String shardState; |
| int progress; |
| long raftTerm; //任期 |
| List<GraphStats> graphs; |
| List<ShardStats> shards; |
| String failureCause = ""; |
| |
| HighLevelPartition(Metapb.Partition partition, Metapb.PartitionStats partitionStats) { |
| partitionId = partition.getId(); |
| state = String.valueOf(partition.getState()); |
| if (partitionStats != null) { |
| raftTerm = partitionStats.getLeaderTerm(); |
| } |
| Metapb.ShardState tmpShardState = Metapb.ShardState.SState_Normal; |
| if (partitionStats != null) { |
| shards = new ArrayList<>(); |
| for (Metapb.ShardStats shardStats : partitionStats.getShardStatsList()) { |
| if ((shardStats.getState() != Metapb.ShardState.UNRECOGNIZED) |
| && (shardStats.getState().getNumber() > tmpShardState.getNumber())) { |
| tmpShardState = shardStats.getState(); |
| progress = shardStats.getProgress(); |
| } |
| shards.add(new ShardStats(shardStats)); |
| } |
| } else { |
| shards = new ArrayList<>(); |
| try { |
| for (Metapb.Shard shard : pdRestService.getShardList(partition.getId())) { |
| shards.add(new ShardStats(shard)); |
| } |
| } catch (PDException e) { |
| log.error("get shard list failed, {}", e.getMessage()); |
| } |
| } |
| // 综合所有副本的状态,给shardState赋值 |
| shardState = tmpShardState.name(); |
| } |
| } |
| |
| @Data |
| class GraphStats { |
| |
| String graphName; |
| long keyCount; |
| long startKey; |
| long endKey; |
| long dataSize; |
| String workState; |
| long partitionId; |
| |
| GraphStats(Metapb.GraphStats graphStats) { |
| graphName = graphStats.getGraphName(); |
| keyCount = graphStats.getApproximateKeys(); |
| workState = graphStats.getWorkState().toString(); |
| dataSize = graphStats.getApproximateSize(); |
| partitionId = graphStats.getPartitionId(); |
| } |
| } |
| |
| @Data |
| class ShardStats { |
| |
| long storeId; |
| String role; |
| String state; |
| int progress; |
| //额外属性 |
| long partitionId; |
| String address; |
| |
| ShardStats(Metapb.ShardStats shardStats) { |
| storeId = shardStats.getStoreId(); |
| role = String.valueOf(shardStats.getRole()); |
| state = shardStats.getState().toString(); |
| progress = shardStats.getProgress(); |
| } |
| |
| ShardStats(Metapb.Shard shard) { |
| //当没有shardStats的初始化方法 |
| storeId = shard.getStoreId(); |
| role = String.valueOf(shard.getRole()); |
| state = Metapb.ShardState.SState_Normal.name(); |
| progress = 0; |
| } |
| } |
| } |