blob: a5e6dc863132288284ec4279f98ae2a5828f3721 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hugegraph.pd.service;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.hugegraph.pd.RegistryService;
import org.apache.hugegraph.pd.common.HgAssert;
import org.apache.hugegraph.pd.common.PDException;
import org.apache.hugegraph.pd.config.PDConfig;
import org.apache.hugegraph.pd.grpc.Metapb;
import org.apache.hugegraph.pd.grpc.Pdpb;
import org.apache.hugegraph.pd.grpc.discovery.NodeInfo;
import org.apache.hugegraph.pd.grpc.discovery.NodeInfos;
import org.apache.hugegraph.pd.grpc.discovery.Query;
import org.apache.hugegraph.pd.model.PromTargetsModel;
import org.apache.hugegraph.pd.rest.MemberAPI;
import org.apache.hugegraph.pd.util.HgMapCache;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import lombok.extern.slf4j.Slf4j;
@Service
@Slf4j
public class PromTargetsService {
private final PromTargetsModel pdModel = PromTargetsModel.of()
.addLabel("__app_name", "pd")
.setScheme("http")
.setMetricsPath(
"/actuator/prometheus");
private final PromTargetsModel storeModel = PromTargetsModel.of()
.addLabel("__app_name", "store")
.setScheme("http")
.setMetricsPath(
"/actuator/prometheus");
private final HgMapCache<String, Set<String>> targetsCache =
HgMapCache.expiredOf(24 * 60 * 60 * 1000);// expired after 24H.
@Autowired
private PDConfig pdConfig;
@Autowired
private PDService pdService;
private RegistryService register;
private RegistryService getRegister() {
if (this.register == null) {
this.register = new RegistryService(this.pdConfig);
}
return this.register;
}
public List<PromTargetsModel> getAllTargets() {
List<PromTargetsModel> res = new LinkedList<>();
List<PromTargetsModel> buf =
this.toModels(this.getRegister().getNodes(Query.newBuilder().build()));
if (buf != null) {
res.addAll(buf);
}
res.add(getPdTargets());
res.add(getStoreTargets());
return res;
}
/**
* @param appName
* @return null if it's not existing
*/
public List<PromTargetsModel> getTargets(String appName) {
HgAssert.isArgumentNotNull(appName, "appName");
switch (appName) {
case "pd":
return Collections.singletonList(this.getPdTargets());
case "store":
return Collections.singletonList(this.getStoreTargets());
default:
return this.toModels(this.getRegister()
.getNodes(Query.newBuilder().setAppName(appName).build()));
}
}
private PromTargetsModel getPdTargets() {
return setTargets(pdModel, () -> this.mergeCache("pd", getPdAddresses()));
}
private PromTargetsModel getStoreTargets() {
return setTargets(storeModel, () -> this.mergeCache("store", getStoreAddresses()));
}
private PromTargetsModel setTargets(PromTargetsModel model, Supplier<Set<String>> supplier) {
return model.setTargets(supplier.get())
.setClusterId(String.valueOf(pdConfig.getClusterId()));
}
/* to prevent the failure of connection between pd and store or pd and pd.*/
//TODO: To add a schedule task to refresh targets, not to retrieve in every time.
private Set<String> mergeCache(String key, Set<String> set) {
Set<String> buf = this.targetsCache.get(key);
if (buf == null) {
buf = new HashSet<>();
this.targetsCache.put(key, buf);
}
if (set != null) {
buf.addAll(set);
}
return buf;
}
private List<PromTargetsModel> toModels(NodeInfos info) {
if (info == null) {
return null;
}
List<NodeInfo> nodes = info.getInfoList();
if (nodes == null || nodes.isEmpty()) {
return null;
}
List<PromTargetsModel> res =
nodes.stream().map(e -> {
Map<String, String> labels = e.getLabelsMap();
String target = labels.get("target");
if (HgAssert.isInvalid(target)) {
return null;
}
PromTargetsModel model = PromTargetsModel.of();
model.addTarget(target);
model.addLabel("__app_name", e.getAppName());
labels.forEach((k, v) -> {
k = k.trim();
switch (k) {
case "metrics":
model.setMetricsPath(v.trim());
break;
case "scheme":
model.setScheme(v.trim());
break;
default:
if (k.startsWith("__")) {
model.addLabel(k, v);
}
}
});
return model;
})
.filter(e -> e != null)
.collect(Collectors.toList());
if (res.isEmpty()) {
return null;
}
return res;
}
private Set<String> getPdAddresses() {
MemberAPI.CallStreamObserverWrap<Pdpb.GetMembersResponse> response =
new MemberAPI.CallStreamObserverWrap<>();
pdService.getMembers(Pdpb.GetMembersRequest.newBuilder().build(), response);
List<Metapb.Member> members = null;
try {
members = response.get().get(0).getMembersList();
} catch (Throwable e) {
log.error("Failed to get all pd members.", e);
}
Set<String> res = new HashSet<>();
if (members != null) {
members.stream().forEach(e -> res.add(e.getRestUrl()));
}
return res;
}
private Set<String> getStoreAddresses() {
Set<String> res = new HashSet<>();
List<Metapb.Store> stores = null;
try {
stores = pdService.getStoreNodeService().getStores();
} catch (PDException e) {
log.error("Failed to get all stores.", e);
}
if (stores != null) {
stores.stream().forEach(e -> {
String buf = this.getRestAddress(e);
if (buf != null) {
res.add(buf);
}
});
}
return res;
}
//TODO: optimized store registry data, to add host:port of REST server.
private String getRestAddress(Metapb.Store store) {
String address = store.getAddress();
if (address == null || address.isEmpty()) {
return null;
}
try {
Optional<String> port = store.getLabelsList().stream().map(
e -> {
if ("rest.port".equals(e.getKey())) {
return e.getValue();
}
return null;
}).filter(e -> e != null).findFirst();
if (port.isPresent()) {
address = address.substring(0, address.indexOf(':') + 1);
address = address + port.get();
}
} catch (Throwable t) {
log.error("Failed to extract the REST address of store, cause by:", t);
}
return address;
}
}