/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hugegraph.pd.rest;

import java.util.ArrayList;
import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;

import org.apache.commons.lang.time.DateFormatUtils;
import org.apache.hugegraph.pd.common.PDException;
import org.apache.hugegraph.pd.grpc.Metapb;
import org.apache.hugegraph.pd.grpc.Pdpb;
import org.apache.hugegraph.pd.model.RestApiResponse;
import org.apache.hugegraph.pd.model.TimeRangeRequest;
import org.apache.hugegraph.pd.service.PDRestService;
import org.apache.hugegraph.pd.util.DateUtil;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;

import com.google.protobuf.util.JsonFormat;

import lombok.Data;
import lombok.extern.slf4j.Slf4j;

@RestController
@Slf4j
@RequestMapping("/v1")
public class PartitionAPI extends API {
public static final String DEFAULT_DATETIME_FORMAT = "yyyy-MM-dd HH:mm:ss";
@Autowired
PDRestService pdRestService;
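
    /**
     * Query the aggregated, cross-graph view of all partitions.
     *
     * Illustrative shape of one element of the returned "partitions" list
     * (example values only, derived from the HighLevelPartition DTO below,
     * not from a real deployment):
     * <pre>
     * {
     *   "partitionId": 0,
     *   "state": "PState_Normal",
     *   "leaderAddress": "127.0.0.1:8500",
     *   "keyCount": 12345,
     *   "dataSize": 67890,
     *   "shardState": "SState_Normal",
     *   "raftTerm": 1,
     *   "graphs": [],
     *   "shards": []
     * }
     * </pre>
     */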
@GetMapping(value = "/highLevelPartitions", produces = MediaType.APPLICATION_JSON_VALUE)
public RestApiResponse getHighLevelPartitions() {
        // Per-graph info under each partition
        Map<Integer, Map<String, GraphStats>> partitions2GraphsMap = new HashMap<>();
        Map<Integer, HighLevelPartition> resultPartitionsMap = new HashMap<>();
        // keyCount of each partition, taken from the leader replicas only
        Map<Integer, Long> partition2KeyCount = new HashMap<>();
        // dataSize of each partition, taken from the leader replicas only
        Map<Integer, Long> partition2DataSize = new HashMap<>();
List<Metapb.Store> stores;
Map<Long, Metapb.Store> storesMap = new HashMap<>();
try {
stores = pdRestService.getStores("");
} catch (PDException e) {
log.error("getStores error", e);
return new RestApiResponse(null, e.getErrorCode(), e.getMessage());
}
for (Metapb.Store store : stores) {
storesMap.put(store.getId(), store);
List<Metapb.GraphStats> graphStatsList = store.getStats().getGraphStatsList();
for (Metapb.GraphStats graphStats : graphStatsList) {
                // Collect per-graph stats of the partition (leader replicas only)
                if (Metapb.ShardRole.Leader != graphStats.getRole()) {
                    continue;
                }
                // Accumulate the partition's keyCount across graphs
                partition2KeyCount.merge(graphStats.getPartitionId(),
                                         graphStats.getApproximateKeys(), Long::sum);
                // Accumulate the partition's dataSize by summing the sizes of its graphs
                partition2DataSize.merge(graphStats.getPartitionId(),
                                         graphStats.getApproximateSize(), Long::sum);
                // Build the per-graph info under this partition
                Map<String, GraphStats> partitionGraphsMap = partitions2GraphsMap
                        .computeIfAbsent(graphStats.getPartitionId(), k -> new HashMap<>());
                partitionGraphsMap.put(graphStats.getGraphName(), new GraphStats(graphStats));
}
}
        // Build all the partition info to be returned
List<Metapb.Partition> partitionList = pdRestService.getPartitions("");
for (Metapb.Partition partition : partitionList) {
            // Fill in startKey/endKey of the graph info within the partition
if (partitions2GraphsMap.get(partition.getId()) != null) {
GraphStats graphStats =
partitions2GraphsMap.get(partition.getId()).get(partition.getGraphName());
if (graphStats != null) {
graphStats.startKey = partition.getStartKey();
graphStats.endKey = partition.getEndKey();
}
}
            // Build the partition-level info (across graphs), skipping "/s" graphs
            if (resultPartitionsMap.get(partition.getId()) == null
                && !partition.getGraphName().endsWith("/s")) {
Metapb.PartitionStats partitionStats;
try {
partitionStats = pdRestService.getPartitionStats(partition.getGraphName(),
partition.getId());
} catch (PDException e) {
log.error("getPartitionStats error", e);
partitionStats = null;
}
                // Initialize the partition info
HighLevelPartition resultPartition =
new HighLevelPartition(partition, partitionStats);
resultPartition.keyCount =
partition2KeyCount.getOrDefault(resultPartition.partitionId, 0L);
resultPartition.dataSize =
partition2DataSize.getOrDefault(resultPartition.partitionId, 0L);
for (ShardStats shard : resultPartition.shards) {
                    // Fill in the address and partition id of each replica
shard.address = storesMap.get(shard.storeId).getAddress();
shard.partitionId = partition.getId();
}
                if (partitionStats != null && partitionStats.hasLeader()) {
                    long storeId = partitionStats.getLeader().getStoreId();
                    // resolve the leader's address via its storeId
                    resultPartition.leaderAddress = storesMap.get(storeId).getAddress();
                }
resultPartitionsMap.put(partition.getId(), resultPartition);
}
}
        // Build the graph list of each partition: keep only "/g" graphs, sorted by name
        for (Map.Entry<Integer, HighLevelPartition> entry : resultPartitionsMap.entrySet()) {
            Integer partitionId = entry.getKey();
            HighLevelPartition currentPartition = entry.getValue();
            Map<String, GraphStats> graphsMap = partitions2GraphsMap
                    .getOrDefault(partitionId, new HashMap<>()); // avoid NPE below
ArrayList<GraphStats> graphsList = new ArrayList<>();
            for (Map.Entry<String, GraphStats> graphEntry : graphsMap.entrySet()) {
                if (!graphEntry.getKey().endsWith("/g")) {
                    continue; // keep only "/g" graphs
                }
                GraphStats tmpGraph = graphEntry.getValue();
                final int postfixLength = 2; // length of the "/g" suffix
                tmpGraph.graphName = tmpGraph.graphName.substring(
                        0, tmpGraph.graphName.length() - postfixLength);
                graphsList.add(tmpGraph);
            }
            graphsList.sort(Comparator.comparing(graph -> graph.graphName));
currentPartition.graphs = graphsList;
}
List<HighLevelPartition> resultPartitionList = new ArrayList<>();
if (!resultPartitionsMap.isEmpty()) {
            ArrayList<Integer> partitionIds = new ArrayList<>(resultPartitionsMap.keySet());
            partitionIds.sort(Comparator.naturalOrder());
            for (Integer partitionId : partitionIds) {
resultPartitionList.add(resultPartitionsMap.get(partitionId));
}
}
HashMap<String, Object> dataMap = new HashMap<>();
dataMap.put("partitions", resultPartitionList);
return new RestApiResponse(dataMap, Pdpb.ErrorType.OK, Pdpb.ErrorType.OK.name());
}
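
    /**
     * Query detailed info of every partition of every graph, including the
     * shard list with raft state (committedIndex) per store. Results are
     * sorted by graph name, then partition id.
     */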
@GetMapping(value = "/partitions", produces = MediaType.APPLICATION_JSON_VALUE)
public RestApiResponse getPartitions() {
try {
            List<Partition> partitions = new ArrayList<>(); // partitions to be returned
List<Metapb.Partition> partitionList = pdRestService.getPartitions("");
List<Metapb.Store> stores = pdRestService.getStoreStats(false);
            // per-store raft state: storeId -> partitionId -> RaftStats
            HashMap<Long, HashMap<Integer, Metapb.RaftStats>> raftMap = new HashMap<>();
            // per-store graph stats, keyed by "graphName@partitionId"
            HashMap<Long, HashMap<String, Metapb.GraphStats>> shardIndexMap = new HashMap<>();
            String delimiter = "@";
            for (Metapb.Store store : stores) {
                Metapb.StoreStats storeStats = store.getStats();
                HashMap<Integer, Metapb.RaftStats> storeRaftStats = new HashMap<>();
                for (Metapb.RaftStats raftStats : storeStats.getRaftStatsList()) {
                    storeRaftStats.put(raftStats.getPartitionId(), raftStats);
                }
                HashMap<String, Metapb.GraphStats> partitionShardStats = new HashMap<>();
                for (Metapb.GraphStats graphStats : storeStats.getGraphStatsList()) {
                    String key = graphStats.getGraphName() + delimiter +
                                 graphStats.getPartitionId();
                    partitionShardStats.put(key, graphStats);
                }
                raftMap.put(store.getId(), storeRaftStats);
                shardIndexMap.put(store.getId(), partitionShardStats);
            }
for (Metapb.Partition pt : partitionList) {
Partition partition = new Partition(pt);
String graphName = partition.getGraphName();
partition.getShards().sort(Comparator.comparing(Shard::getStoreId));
Metapb.PartitionStats partitionStats =
pdRestService.getPartitionStats(graphName, pt.getId());
Map<Long, Metapb.ShardStats> shardStats = new HashMap<>();
if (partitionStats != null) {
String dateTime = DateFormatUtils.format(
partitionStats.getTimestamp(), DEFAULT_DATETIME_FORMAT);
partition.setTimestamp(dateTime);
shardStats = getShardStats(partitionStats);
}
for (Metapb.Shard shard : pdRestService.getShardList(pt.getId())) {
Map<Long, Metapb.ShardStats> finalShardStats = shardStats;
partition.getShards().add(new Shard() {{
storeId = Long.toString(shard.getStoreId());
role = shard.getRole();
address = pdRestService.getStore(
shard.getStoreId()).getAddress();
                    Metapb.ShardStats stats = finalShardStats.get(shard.getStoreId());
                    if (stats != null) {
                        state = stats.getState().toString();
                        progress = stats.getProgress();
                        role = stats.getRole();
                    }
HashMap<Integer, Metapb.RaftStats> storeRaftStats =
raftMap.get(shard.getStoreId());
if (storeRaftStats != null) {
Metapb.RaftStats raftStats = storeRaftStats.get(partition.getId());
if (raftStats != null) {
committedIndex = Long.toString(raftStats.getCommittedIndex());
}
}
}});
}
partition.setPartitionStats(partitionStats);
partitions.add(partition);
}
partitions.sort(
Comparator.comparing(Partition::getGraphName).thenComparing(Partition::getId));
HashMap<String, Object> dataMap = new HashMap<>();
dataMap.put("partitions", partitions);
return new RestApiResponse(dataMap, Pdpb.ErrorType.OK, Pdpb.ErrorType.OK.name());
} catch (PDException e) {
log.error("query metric data error", e);
return new RestApiResponse(null, e.getErrorCode(), e.getMessage());
}
}
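
    /**
     * Debug endpoint: returns the raw partitions and partitionStats of every
     * graph as a single JSON document with the keys "partitions" and
     * "partitionStats".
     */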
@GetMapping(value = "/partitionsAndStats", produces = MediaType.APPLICATION_JSON_VALUE)
public String getPartitionsAndStats() {
        // For debugging: returns both partitions and partitionStats
try {
Map<String, List<Metapb.Partition>> graph2Partitions = new HashMap<>();
Map<String, List<Metapb.PartitionStats>> graph2PartitionStats = new HashMap<>();
for (Metapb.Graph graph : pdRestService.getGraphs()) {
List<Metapb.Partition> partitionList = new ArrayList<>();
List<Metapb.PartitionStats> partitionStatsList = new ArrayList<>();
for (Metapb.Partition partition : pdRestService.getPartitions(
graph.getGraphName())) {
Metapb.PartitionStats partitionStats = pdRestService
.getPartitionStats(graph.getGraphName(), partition.getId());
partitionList.add(partition);
partitionStatsList.add(partitionStats);
}
graph2Partitions.put(graph.getGraphName(), partitionList);
graph2PartitionStats.put(graph.getGraphName(), partitionStatsList);
}
String builder = "{\"partitions\":" + toJSON(graph2Partitions) +
",\"partitionStats\":" + toJSON(graph2PartitionStats) + "}";
return builder;
} catch (PDException e) {
log.error("PD exception:" + e);
return toJSON(e);
}
}
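
    // Index the shard stats of a partition by store id for quick lookup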
private Map<Long, Metapb.ShardStats> getShardStats(Metapb.PartitionStats partitionStats) {
Map<Long, Metapb.ShardStats> stats = new HashMap<>();
        // The repeated-field getter of protobuf never returns null
        partitionStats.getShardStatsList().forEach(
                shardStats -> stats.put(shardStats.getStoreId(), shardStats));
return stats;
}
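
    /**
     * Query the partition change log within a time range.
     *
     * Illustrative request body (field names follow TimeRangeRequest; the
     * date format is assumed to be the one accepted by DateUtil.getDate,
     * e.g. "yyyy-MM-dd HH:mm:ss"):
     * <pre>
     * {"startTime": "2023-06-01 00:00:00", "endTime": "2023-06-02 00:00:00"}
     * </pre>
     */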
@PostMapping(value = "/partitions/log", consumes = MediaType.APPLICATION_JSON_VALUE,
produces = MediaType.APPLICATION_JSON_VALUE)
@ResponseBody
public String getPartitionLog(@RequestBody TimeRangeRequest request) {
try {
Date dateStart = DateUtil.getDate(request.getStartTime());
Date dateEnd = DateUtil.getDate(request.getEndTime());
List<Metapb.LogRecord> changedRecords =
pdRestService.getPartitionLog(dateStart.getTime(),
dateEnd.getTime());
if (changedRecords != null) {
JsonFormat.TypeRegistry registry = JsonFormat.TypeRegistry
.newBuilder().add(Pdpb.SplitDataRequest.getDescriptor()).build();
return toJSON(changedRecords, registry);
} else {
                return toJSON(new PDException(Pdpb.ErrorType.NOT_FOUND_VALUE,
                                              "no partition log found in the given time range"));
}
} catch (PDException e) {
return toJSON(e);
}
}
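
    /**
     * Overall statistics endpoint. Currently a stub that returns an empty object.
     */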
@GetMapping(value = "/", produces = MediaType.APPLICATION_JSON_VALUE)
@ResponseBody
public Statistics getStatistics() throws PDException, ExecutionException, InterruptedException {
        // Not implemented yet: returns an empty Statistics object
        return new Statistics();
}
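
    // Response DTO: one shard (replica) of a partition, as returned by /partitions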
@Data
class Shard {
String address;
String storeId;
Metapb.ShardRole role;
String state;
int progress;
String committedIndex;
long partitionId;
}
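
    // Response DTO: one partition of a graph, as returned by /partitions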
@Data
class Partition {
int id;
long version;
String graphName;
long startKey;
long endKey;
Metapb.PartitionState workState;
List<Shard> shards;
String timestamp;
Partition(Metapb.Partition pt) {
id = pt.getId();
version = pt.getVersion();
graphName = pt.getGraphName();
startKey = pt.getStartKey();
endKey = pt.getEndKey();
workState = pt.getState();
shards = new ArrayList<>();
}
        public void setPartitionStats(Metapb.PartitionStats stats) {
            // No-op: the raw PartitionStats is accepted but not exposed in the response
        }
}
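
    // Response DTO for "/": currently empty, see getStatistics()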
@Data
class Statistics {
}
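
    // Response DTO: aggregated per-partition view returned by /highLevelPartitions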
@Data
class HighLevelPartition {
int partitionId;
String state;
String leaderAddress;
long keyCount;
long dataSize;
String shardState;
int progress;
        long raftTerm; // raft term of the leader
List<GraphStats> graphs;
List<ShardStats> shards;
String failureCause = "";
HighLevelPartition(Metapb.Partition partition, Metapb.PartitionStats partitionStats) {
partitionId = partition.getId();
state = String.valueOf(partition.getState());
            Metapb.ShardState tmpShardState = Metapb.ShardState.SState_Normal;
            if (partitionStats != null) {
                raftTerm = partitionStats.getLeaderTerm();
                shards = new ArrayList<>();
for (Metapb.ShardStats shardStats : partitionStats.getShardStatsList()) {
if ((shardStats.getState() != Metapb.ShardState.UNRECOGNIZED)
&& (shardStats.getState().getNumber() > tmpShardState.getNumber())) {
tmpShardState = shardStats.getState();
progress = shardStats.getProgress();
}
shards.add(new ShardStats(shardStats));
}
} else {
shards = new ArrayList<>();
try {
for (Metapb.Shard shard : pdRestService.getShardList(partition.getId())) {
shards.add(new ShardStats(shard));
}
} catch (PDException e) {
log.error("get shard list failed, {}", e.getMessage());
}
}
            // Aggregate shardState over all replicas (the highest-numbered state wins)
shardState = tmpShardState.name();
}
}
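
    // Response DTO: stats of a single graph on one partition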
@Data
class GraphStats {
String graphName;
long keyCount;
long startKey;
long endKey;
long dataSize;
String workState;
long partitionId;
GraphStats(Metapb.GraphStats graphStats) {
graphName = graphStats.getGraphName();
keyCount = graphStats.getApproximateKeys();
workState = graphStats.getWorkState().toString();
dataSize = graphStats.getApproximateSize();
partitionId = graphStats.getPartitionId();
}
}
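
    // Response DTO: stats of one shard replica inside HighLevelPartition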
@Data
class ShardStats {
long storeId;
String role;
String state;
int progress;
        // extra properties, filled in by the caller
long partitionId;
String address;
ShardStats(Metapb.ShardStats shardStats) {
storeId = shardStats.getStoreId();
role = String.valueOf(shardStats.getRole());
state = shardStats.getState().toString();
progress = shardStats.getProgress();
}
        ShardStats(Metapb.Shard shard) {
            // Fallback constructor used when no ShardStats is available
storeId = shard.getStoreId();
role = String.valueOf(shard.getRole());
state = Metapb.ShardState.SState_Normal.name();
progress = 0;
}
}
}