blob: e666929a762ef28680c66a4a03c0ee8647f83aee [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.storage.plugin.banyandb.measure;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableSet;
import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import org.apache.skywalking.banyandb.v1.client.DataPoint;
import org.apache.skywalking.banyandb.v1.client.MeasureQuery;
import org.apache.skywalking.banyandb.v1.client.MeasureQueryResponse;
import org.apache.skywalking.oap.server.core.analysis.IDManager;
import org.apache.skywalking.oap.server.core.analysis.Layer;
import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
import org.apache.skywalking.oap.server.core.analysis.manual.endpoint.EndpointTraffic;
import org.apache.skywalking.oap.server.core.analysis.manual.instance.InstanceTraffic;
import org.apache.skywalking.oap.server.core.analysis.manual.process.ProcessDetectType;
import org.apache.skywalking.oap.server.core.analysis.manual.process.ProcessTraffic;
import org.apache.skywalking.oap.server.core.analysis.manual.service.ServiceTraffic;
import org.apache.skywalking.oap.server.core.query.enumeration.Language;
import org.apache.skywalking.oap.server.core.query.enumeration.ProfilingSupportStatus;
import org.apache.skywalking.oap.server.core.query.input.Duration;
import org.apache.skywalking.oap.server.core.query.type.Attribute;
import org.apache.skywalking.oap.server.core.query.type.Endpoint;
import org.apache.skywalking.oap.server.core.query.type.Process;
import org.apache.skywalking.oap.server.core.query.type.Service;
import org.apache.skywalking.oap.server.core.query.type.ServiceInstance;
import org.apache.skywalking.oap.server.core.storage.query.IMetadataQueryDAO;
import org.apache.skywalking.oap.server.library.util.StringUtil;
import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
import org.apache.skywalking.oap.server.storage.plugin.banyandb.stream.AbstractBanyanDBDAO;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import static org.apache.skywalking.oap.server.core.analysis.manual.instance.InstanceTraffic.PropertyUtil.LANGUAGE;
/**
 * {@link IMetadataQueryDAO} implementation that answers metadata lookups with BanyanDB measure queries.
 *
 * <p>Every lookup queries the index named by the matching traffic entity ({@link ServiceTraffic},
 * {@link InstanceTraffic}, {@link EndpointTraffic} or {@link ProcessTraffic}) with a fixed set of projected
 * tags and converts the returned {@link DataPoint}s into the query-layer types.
 */
public class BanyanDBMetadataQueryDAO extends AbstractBanyanDBDAO implements IMetadataQueryDAO {
    /** Tags projected by the service queries; these are the tags {@link #buildService(DataPoint)} reads. */
    private static final Set<String> SERVICE_TRAFFIC_TAGS = ImmutableSet.of(ServiceTraffic.NAME,
        ServiceTraffic.SHORT_NAME, ServiceTraffic.GROUP, ServiceTraffic.LAYER, ServiceTraffic.SERVICE_ID);

    /** Tags projected when listing instances, including the tags used as query conditions. */
    private static final Set<String> INSTANCE_TRAFFIC_TAGS = ImmutableSet.of(InstanceTraffic.NAME,
        InstanceTraffic.PROPERTIES, InstanceTraffic.LAST_PING_TIME_BUCKET, InstanceTraffic.SERVICE_ID);

    /** Tags projected when a single instance is fetched by ID. */
    private static final Set<String> INSTANCE_TRAFFIC_COMPACT_TAGS = ImmutableSet.of(InstanceTraffic.NAME,
        InstanceTraffic.PROPERTIES);

    /** Tags projected by the endpoint search. */
    private static final Set<String> ENDPOINT_TRAFFIC_TAGS = ImmutableSet.of(EndpointTraffic.NAME,
        EndpointTraffic.SERVICE_ID);

    /** Tags projected when listing or counting processes, including the tags used as query conditions. */
    private static final Set<String> PROCESS_TRAFFIC_TAGS = ImmutableSet.of(ProcessTraffic.NAME,
        ProcessTraffic.SERVICE_ID, ProcessTraffic.INSTANCE_ID, ProcessTraffic.AGENT_ID, ProcessTraffic.DETECT_TYPE,
        ProcessTraffic.PROPERTIES, ProcessTraffic.LABELS_JSON, ProcessTraffic.LAST_PING_TIME_BUCKET,
        ProcessTraffic.PROFILING_SUPPORT_STATUS);

    /**
     * Tags projected when a single process is fetched by ID.
     *
     * <p>This set has to contain every tag that {@link #buildProcess(DataPoint)} dereferences
     * unconditionally. {@code PROFILING_SUPPORT_STATUS} is one of them: the builder casts it to
     * {@link Number} and calls {@code intValue()}, so leaving it out of the projection would make
     * {@link #getProcess(String)} fail as soon as a process is found.
     */
    private static final Set<String> PROCESS_TRAFFIC_COMPACT_TAGS = ImmutableSet.of(ProcessTraffic.NAME,
        ProcessTraffic.SERVICE_ID, ProcessTraffic.INSTANCE_ID, ProcessTraffic.AGENT_ID, ProcessTraffic.DETECT_TYPE,
        ProcessTraffic.PROPERTIES, ProcessTraffic.LABELS_JSON, ProcessTraffic.PROFILING_SUPPORT_STATUS);

    /** Shared JSON parser for the serialized properties and labels tags. */
    private static final Gson GSON = new Gson();

    public BanyanDBMetadataQueryDAO(BanyanDBStorageClient client) {
        super(client);
    }

    /**
     * Lists services, optionally restricted by layer and/or group.
     *
     * @param layer name of a {@link Layer} constant; no layer condition is added when null or empty
     * @param group service group; no group condition is added when null or empty
     * @return the matching services, possibly empty
     * @throws IOException if the underlying query fails
     */
    @Override
    public List<Service> listServices(String layer, String group) throws IOException {
        final MeasureQueryResponse resp = query(ServiceTraffic.INDEX_NAME,
            SERVICE_TRAFFIC_TAGS,
            Collections.emptySet(),
            new QueryBuilder<MeasureQuery>() {
                @Override
                protected void apply(MeasureQuery query) {
                    if (StringUtil.isNotEmpty(group)) {
                        query.and(eq(ServiceTraffic.GROUP, group));
                    }
                    if (StringUtil.isNotEmpty(layer)) {
                        // The layer is matched by its numeric value, not by its name.
                        query.and(eq(ServiceTraffic.LAYER, Layer.valueOf(layer).value()));
                    }
                }
            });
        return buildServices(resp);
    }

    /**
     * Fetches the service records carrying the given service ID.
     *
     * @param serviceId service ID to match; no condition is added when null or empty
     * @return the matching services, possibly empty
     * @throws IOException if the underlying query fails
     */
    @Override
    public List<Service> getServices(String serviceId) throws IOException {
        final MeasureQueryResponse resp = query(ServiceTraffic.INDEX_NAME,
            SERVICE_TRAFFIC_TAGS,
            Collections.emptySet(),
            new QueryBuilder<MeasureQuery>() {
                @Override
                protected void apply(MeasureQuery query) {
                    if (StringUtil.isNotEmpty(serviceId)) {
                        query.and(eq(ServiceTraffic.SERVICE_ID, serviceId));
                    }
                }
            });
        return buildServices(resp);
    }

    /**
     * Lists the instances whose last ping time bucket is at or after the minute time bucket of the
     * duration's start timestamp.
     *
     * @param duration  only its start timestamp is used
     * @param serviceId owning service; no service condition is added when null or empty
     * @return the matching instances, possibly empty
     * @throws IOException if the underlying query fails
     */
    @Override
    public List<ServiceInstance> listInstances(Duration duration, String serviceId) throws IOException {
        final MeasureQueryResponse resp = query(InstanceTraffic.INDEX_NAME,
            INSTANCE_TRAFFIC_TAGS,
            Collections.emptySet(),
            new QueryBuilder<MeasureQuery>() {
                @Override
                protected void apply(MeasureQuery query) {
                    if (StringUtil.isNotEmpty(serviceId)) {
                        query.and(eq(InstanceTraffic.SERVICE_ID, serviceId));
                    }
                    final long minuteTimeBucket = TimeBucket.getMinuteTimeBucket(duration.getStartTimestamp());
                    query.and(gte(InstanceTraffic.LAST_PING_TIME_BUCKET, minuteTimeBucket));
                }
            });
        final List<ServiceInstance> instances = new ArrayList<>();
        for (final DataPoint dataPoint : resp.getDataPoints()) {
            instances.add(buildInstance(dataPoint));
        }
        return instances;
    }

    /**
     * Fetches a single instance by ID.
     *
     * @param instanceId instance ID; no ID condition is added when null or empty
     * @return the first instance returned by the query, or {@code null} when the query returns nothing
     * @throws IOException if the underlying query fails
     */
    @Override
    public ServiceInstance getInstance(String instanceId) throws IOException {
        final MeasureQueryResponse resp = query(InstanceTraffic.INDEX_NAME,
            INSTANCE_TRAFFIC_COMPACT_TAGS,
            Collections.emptySet(),
            new QueryBuilder<MeasureQuery>() {
                @Override
                protected void apply(MeasureQuery query) {
                    if (StringUtil.isNotEmpty(instanceId)) {
                        query.andWithID(instanceId);
                    }
                }
            });
        return resp.size() > 0 ? buildInstance(resp.getDataPoints().get(0)) : null;
    }

    /**
     * Searches endpoints by service and by a substring of the endpoint name.
     *
     * <p>Only the service condition is part of the query. The keyword match and the result cap are
     * applied in memory to the returned data points.
     *
     * @param keyword   substring the endpoint name must contain; no name filtering when null or empty
     * @param serviceId owning service; no service condition is added when null or empty
     * @param limit     maximum number of endpoints to return; a non-positive value means no cap
     * @return the matching endpoints, possibly empty
     * @throws IOException if the underlying query fails
     */
    @Override
    public List<Endpoint> findEndpoint(String keyword, String serviceId, int limit) throws IOException {
        final MeasureQueryResponse resp = query(EndpointTraffic.INDEX_NAME,
            ENDPOINT_TRAFFIC_TAGS,
            Collections.emptySet(),
            new QueryBuilder<MeasureQuery>() {
                @Override
                protected void apply(MeasureQuery query) {
                    if (StringUtil.isNotEmpty(serviceId)) {
                        query.and(eq(EndpointTraffic.SERVICE_ID, serviceId));
                    }
                }
            });

        // The filter is driven by the keyword itself, so a null keyword never reaches String#contains
        // and a keyword is honoured even when no service ID was given.
        final boolean filterByKeyword = StringUtil.isNotEmpty(keyword);
        // Stream#limit rejects negative sizes, so a non-positive limit is mapped to "no cap".
        final long maxSize = limit > 0 ? limit : Long.MAX_VALUE;

        return resp.getDataPoints()
                   .stream()
                   .map(this::buildEndpoint)
                   .filter(endpoint -> !filterByKeyword
                       || (endpoint.getName() != null && endpoint.getName().contains(keyword)))
                   .limit(maxSize)
                   .collect(Collectors.toList());
    }

    /**
     * Lists the non-virtual processes of a service that have the given profiling support status and a
     * last ping time bucket at or after {@code lastPingStartTimeBucket}.
     *
     * @param serviceId               owning service
     * @param supportStatus           required profiling support status
     * @param lastPingStartTimeBucket lower bound (inclusive) of the last ping time bucket
     * @param lastPingEndTimeBucket   not applied to the query by this implementation
     * @return the matching processes, possibly empty
     * @throws IOException if the underlying query fails
     */
    @Override
    public List<Process> listProcesses(String serviceId, ProfilingSupportStatus supportStatus, long lastPingStartTimeBucket, long lastPingEndTimeBucket) throws IOException {
        final MeasureQueryResponse resp = query(ProcessTraffic.INDEX_NAME,
            PROCESS_TRAFFIC_TAGS,
            Collections.emptySet(),
            new QueryBuilder<MeasureQuery>() {
                @Override
                protected void apply(MeasureQuery query) {
                    query.and(eq(ProcessTraffic.SERVICE_ID, serviceId));
                    query.and(gte(ProcessTraffic.LAST_PING_TIME_BUCKET, lastPingStartTimeBucket));
                    query.and(eq(ProcessTraffic.PROFILING_SUPPORT_STATUS, supportStatus.value()));
                    query.and(ne(ProcessTraffic.DETECT_TYPE, ProcessDetectType.VIRTUAL.value()));
                }
            });
        return buildProcesses(resp);
    }

    /**
     * Lists the processes of a service instance whose last ping time bucket is at or after the
     * duration's start time bucket.
     *
     * @param serviceInstanceId owning service instance
     * @param duration          only its start time bucket is used
     * @param includeVirtual    when {@code false}, processes of detect type {@code VIRTUAL} are excluded
     * @return the matching processes, possibly empty
     * @throws IOException if the underlying query fails
     */
    @Override
    public List<Process> listProcesses(String serviceInstanceId, Duration duration, boolean includeVirtual) throws IOException {
        final long lastPingStartTimeBucket = duration.getStartTimeBucket();
        final MeasureQueryResponse resp = query(ProcessTraffic.INDEX_NAME,
            PROCESS_TRAFFIC_TAGS,
            Collections.emptySet(),
            new QueryBuilder<MeasureQuery>() {
                @Override
                protected void apply(MeasureQuery query) {
                    query.and(eq(ProcessTraffic.INSTANCE_ID, serviceInstanceId));
                    query.and(gte(ProcessTraffic.LAST_PING_TIME_BUCKET, lastPingStartTimeBucket));
                    if (!includeVirtual) {
                        query.and(ne(ProcessTraffic.DETECT_TYPE, ProcessDetectType.VIRTUAL.value()));
                    }
                }
            });
        return buildProcesses(resp);
    }

    /**
     * Lists the non-virtual processes reported by the given agent.
     *
     * @param agentId agent ID to match
     * @return the matching processes, possibly empty
     * @throws IOException if the underlying query fails
     */
    @Override
    public List<Process> listProcesses(String agentId) throws IOException {
        final MeasureQueryResponse resp = query(ProcessTraffic.INDEX_NAME,
            PROCESS_TRAFFIC_TAGS,
            Collections.emptySet(),
            new QueryBuilder<MeasureQuery>() {
                @Override
                protected void apply(MeasureQuery query) {
                    query.and(eq(ProcessTraffic.AGENT_ID, agentId));
                    query.and(ne(ProcessTraffic.DETECT_TYPE, ProcessDetectType.VIRTUAL.value()));
                }
            });
        return buildProcesses(resp);
    }

    /**
     * Counts the non-virtual processes of a service that have the given profiling support status and a
     * last ping time bucket at or after {@code lastPingStartTimeBucket}.
     *
     * <p>The count is the number of distinct {@code PROPERTIES} tag values among the matching data points.
     *
     * @param serviceId               owning service
     * @param profilingSupportStatus  required profiling support status
     * @param lastPingStartTimeBucket lower bound (inclusive) of the last ping time bucket
     * @param lastPingEndTimeBucket   not applied to the query by this implementation
     * @return the number of distinct properties values among the matching processes
     * @throws IOException if the underlying query fails
     */
    @Override
    public long getProcessCount(String serviceId, ProfilingSupportStatus profilingSupportStatus, long lastPingStartTimeBucket, long lastPingEndTimeBucket) throws IOException {
        final MeasureQueryResponse resp = query(ProcessTraffic.INDEX_NAME,
            PROCESS_TRAFFIC_TAGS,
            Collections.emptySet(),
            new QueryBuilder<MeasureQuery>() {
                @Override
                protected void apply(MeasureQuery query) {
                    query.and(eq(ProcessTraffic.SERVICE_ID, serviceId));
                    query.and(gte(ProcessTraffic.LAST_PING_TIME_BUCKET, lastPingStartTimeBucket));
                    query.and(eq(ProcessTraffic.PROFILING_SUPPORT_STATUS, profilingSupportStatus.value()));
                    query.and(ne(ProcessTraffic.DETECT_TYPE, ProcessDetectType.VIRTUAL.value()));
                }
            });
        return countDistinctProperties(resp);
    }

    /**
     * Counts the non-virtual processes of a service instance.
     *
     * <p>The count is the number of distinct {@code PROPERTIES} tag values among the matching data points.
     *
     * @param instanceId owning service instance
     * @return the number of distinct properties values among the matching processes
     * @throws IOException if the underlying query fails
     */
    @Override
    public long getProcessCount(String instanceId) throws IOException {
        final MeasureQueryResponse resp = query(ProcessTraffic.INDEX_NAME,
            PROCESS_TRAFFIC_TAGS,
            Collections.emptySet(),
            new QueryBuilder<MeasureQuery>() {
                @Override
                protected void apply(MeasureQuery query) {
                    query.and(eq(ProcessTraffic.INSTANCE_ID, instanceId));
                    query.and(ne(ProcessTraffic.DETECT_TYPE, ProcessDetectType.VIRTUAL.value()));
                }
            });
        return countDistinctProperties(resp);
    }

    /**
     * Fetches a single process by ID.
     *
     * @param processId process ID; no ID condition is added when null or empty
     * @return the first process returned by the query, or {@code null} when the query returns nothing
     * @throws IOException if the underlying query fails
     */
    @Override
    public Process getProcess(String processId) throws IOException {
        final MeasureQueryResponse resp = query(ProcessTraffic.INDEX_NAME,
            PROCESS_TRAFFIC_COMPACT_TAGS,
            Collections.emptySet(),
            new QueryBuilder<MeasureQuery>() {
                @Override
                protected void apply(MeasureQuery query) {
                    if (StringUtil.isNotEmpty(processId)) {
                        query.andWithID(processId);
                    }
                }
            });
        return resp.size() > 0 ? buildProcess(resp.getDataPoints().get(0)) : null;
    }

    /** Converts every data point of the response into a {@link Service}, preserving response order. */
    private List<Service> buildServices(MeasureQueryResponse resp) {
        final List<Service> services = new ArrayList<>();
        for (final DataPoint dataPoint : resp.getDataPoints()) {
            services.add(buildService(dataPoint));
        }
        return services;
    }

    /** Converts every data point of the response into a {@link Process}, preserving response order. */
    private List<Process> buildProcesses(MeasureQueryResponse resp) {
        final List<Process> processes = new ArrayList<>();
        for (final DataPoint dataPoint : resp.getDataPoints()) {
            processes.add(buildProcess(dataPoint));
        }
        return processes;
    }

    /**
     * Counts the distinct {@code PROPERTIES} tag values in the response.
     *
     * <p>{@code distinct()} accepts null elements, whereas {@code Collectors.groupingBy} throws on a
     * null key, so a data point without a properties value is counted as one more distinct value
     * instead of aborting the whole count.
     */
    private long countDistinctProperties(MeasureQueryResponse resp) {
        final Function<DataPoint, String> propertiesOf =
            dataPoint -> dataPoint.getTagValue(ProcessTraffic.PROPERTIES);
        return resp.getDataPoints()
                   .stream()
                   .map(propertiesOf)
                   .distinct()
                   .count();
    }

    /** Builds a {@link Service} from the service traffic tags; the numeric layer tag is mapped to its name. */
    private Service buildService(DataPoint dataPoint) {
        final Service service = new Service();
        service.setId(dataPoint.getTagValue(ServiceTraffic.SERVICE_ID));
        service.setName(dataPoint.getTagValue(ServiceTraffic.NAME));
        service.setShortName(dataPoint.getTagValue(ServiceTraffic.SHORT_NAME));
        service.setGroup(dataPoint.getTagValue(ServiceTraffic.GROUP));
        service.getLayers().add(Layer.valueOf(((Number) dataPoint.getTagValue(ServiceTraffic.LAYER)).intValue()).name());
        return service;
    }

    /**
     * Builds a {@link ServiceInstance} from an instance traffic data point.
     *
     * <p>The data point ID is used both as the instance ID and as the instance UUID. The properties tag
     * is parsed as a JSON object: the {@code LANGUAGE} entry sets the language and every other entry
     * becomes an attribute. The language is set to {@link Language#UNKNOWN} only when there are no
     * properties at all.
     */
    private ServiceInstance buildInstance(DataPoint dataPoint) {
        final ServiceInstance serviceInstance = new ServiceInstance();
        serviceInstance.setId(dataPoint.getId());
        serviceInstance.setName(dataPoint.getTagValue(InstanceTraffic.NAME));
        serviceInstance.setInstanceUUID(dataPoint.getId());

        final String propString = dataPoint.getTagValue(InstanceTraffic.PROPERTIES);
        JsonObject properties = null;
        if (StringUtil.isNotEmpty(propString)) {
            properties = GSON.fromJson(propString, JsonObject.class);
        }
        if (properties != null) {
            for (Map.Entry<String, JsonElement> property : properties.entrySet()) {
                final String key = property.getKey();
                final String value = property.getValue().getAsString();
                if (key.equals(LANGUAGE)) {
                    serviceInstance.setLanguage(Language.value(value));
                } else {
                    serviceInstance.getAttributes().add(new Attribute(key, value));
                }
            }
        } else {
            serviceInstance.setLanguage(Language.UNKNOWN);
        }
        return serviceInstance;
    }

    /** Builds an {@link Endpoint} from the data point ID and the endpoint name tag. */
    private Endpoint buildEndpoint(DataPoint dataPoint) {
        final Endpoint endpoint = new Endpoint();
        endpoint.setId(dataPoint.getId());
        endpoint.setName(dataPoint.getTagValue(EndpointTraffic.NAME));
        return endpoint;
    }

    /**
     * Builds a {@link Process} from a process traffic data point.
     *
     * <p>Service and instance names are derived from their IDs through {@link IDManager}. The
     * {@code DETECT_TYPE} and {@code PROFILING_SUPPORT_STATUS} tags are dereferenced unconditionally,
     * so both must be part of the tag projection of any query whose result is passed here. The
     * properties tag is parsed as a JSON object into attributes, and the labels tag as a JSON list
     * into labels; each is skipped when null or empty.
     */
    private Process buildProcess(DataPoint dataPoint) {
        final Process process = new Process();
        process.setId(dataPoint.getId());
        process.setName(dataPoint.getTagValue(ProcessTraffic.NAME));

        final String serviceId = dataPoint.getTagValue(ProcessTraffic.SERVICE_ID);
        process.setServiceId(serviceId);
        process.setServiceName(IDManager.ServiceID.analysisId(serviceId).getName());

        final String instanceId = dataPoint.getTagValue(ProcessTraffic.INSTANCE_ID);
        process.setInstanceId(instanceId);
        process.setInstanceName(IDManager.ServiceInstanceID.analysisId(instanceId).getName());

        process.setAgentId(dataPoint.getTagValue(ProcessTraffic.AGENT_ID));
        process.setDetectType(ProcessDetectType.valueOf(((Number) dataPoint.getTagValue(ProcessTraffic.DETECT_TYPE)).intValue()).name());
        process.setProfilingSupportStatus(ProfilingSupportStatus.valueOf(((Number) dataPoint.getTagValue(ProcessTraffic.PROFILING_SUPPORT_STATUS)).intValue()).name());

        final String propString = dataPoint.getTagValue(ProcessTraffic.PROPERTIES);
        if (!Strings.isNullOrEmpty(propString)) {
            final JsonObject properties = GSON.fromJson(propString, JsonObject.class);
            for (Map.Entry<String, JsonElement> property : properties.entrySet()) {
                final String key = property.getKey();
                final String value = property.getValue().getAsString();
                process.getAttributes().add(new Attribute(key, value));
            }
        }

        final String labelJson = dataPoint.getTagValue(ProcessTraffic.LABELS_JSON);
        if (!Strings.isNullOrEmpty(labelJson)) {
            final List<String> labels = GSON.<List<String>>fromJson(labelJson, ArrayList.class);
            process.getLabels().addAll(labels);
        }
        return process;
    }
}