blob: 5190ac43be8531d847b5fbc14a9cfdbb37e2bb44 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.storage.plugin.banyandb.measure;
import com.google.common.collect.ImmutableSet;
import org.apache.skywalking.banyandb.v1.client.MeasureQuery;
import org.apache.skywalking.banyandb.v1.client.MeasureQueryResponse;
import org.apache.skywalking.banyandb.v1.client.TimestampRange;
import org.apache.skywalking.oap.server.core.UnexpectedException;
import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
import org.apache.skywalking.oap.server.core.analysis.manual.relation.endpoint.EndpointRelationServerSideMetrics;
import org.apache.skywalking.oap.server.core.analysis.manual.relation.instance.ServiceInstanceRelationClientSideMetrics;
import org.apache.skywalking.oap.server.core.analysis.manual.relation.instance.ServiceInstanceRelationServerSideMetrics;
import org.apache.skywalking.oap.server.core.analysis.manual.relation.process.ProcessRelationClientSideMetrics;
import org.apache.skywalking.oap.server.core.analysis.manual.relation.process.ProcessRelationServerSideMetrics;
import org.apache.skywalking.oap.server.core.analysis.manual.relation.service.ServiceRelationClientSideMetrics;
import org.apache.skywalking.oap.server.core.analysis.manual.relation.service.ServiceRelationServerSideMetrics;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.query.input.Duration;
import org.apache.skywalking.oap.server.core.query.type.Call;
import org.apache.skywalking.oap.server.core.source.DetectPoint;
import org.apache.skywalking.oap.server.core.storage.query.ITopologyQueryDAO;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.skywalking.oap.server.library.util.CollectionUtils;
import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
import org.apache.skywalking.oap.server.storage.plugin.banyandb.stream.AbstractBanyanDBDAO;
import static java.util.Objects.nonNull;
public class BanyanDBTopologyQueryDAO extends AbstractBanyanDBDAO implements ITopologyQueryDAO {
public BanyanDBTopologyQueryDAO(final BanyanDBStorageClient client) {
super(client);
}
@Override
public List<Call.CallDetail> loadServiceRelationsDetectedAtServerSide(Duration duration, List<String> serviceIds) throws IOException {
if (CollectionUtils.isEmpty(serviceIds)) {
throw new UnexpectedException("Service id is empty");
}
List<QueryBuilder<MeasureQuery>> queryBuilderList = buildServiceRelationsQueries(serviceIds);
return queryServiceRelation(duration, queryBuilderList, DetectPoint.SERVER);
}
@Override
public List<Call.CallDetail> loadServiceRelationDetectedAtClientSide(Duration duration, List<String> serviceIds) throws IOException {
if (CollectionUtils.isEmpty(serviceIds)) {
throw new UnexpectedException("Service id is empty");
}
List<QueryBuilder<MeasureQuery>> queryBuilderList = buildServiceRelationsQueries(serviceIds);
return queryServiceRelation(duration, queryBuilderList, DetectPoint.CLIENT);
}
@Override
public List<Call.CallDetail> loadServiceRelationsDetectedAtServerSide(Duration duration) throws IOException {
return queryServiceRelation(duration, Collections.singletonList(emptyMeasureQuery()), DetectPoint.SERVER);
}
@Override
public List<Call.CallDetail> loadServiceRelationDetectedAtClientSide(Duration duration) throws IOException {
return queryServiceRelation(duration, Collections.singletonList(emptyMeasureQuery()), DetectPoint.CLIENT);
}
private List<QueryBuilder<MeasureQuery>> buildServiceRelationsQueries(List<String> serviceIds) {
List<QueryBuilder<MeasureQuery>> queryBuilderList = new ArrayList<>(serviceIds.size());
for (final String serviceId : serviceIds) {
queryBuilderList.add(new QueryBuilder<MeasureQuery>() {
@Override
protected void apply(MeasureQuery query) {
query.and(eq(ServiceRelationServerSideMetrics.SOURCE_SERVICE_ID, serviceId));
}
});
queryBuilderList.add(new QueryBuilder<MeasureQuery>() {
@Override
protected void apply(MeasureQuery query) {
query.and(eq(ServiceRelationServerSideMetrics.DEST_SERVICE_ID, serviceId));
}
});
}
return queryBuilderList;
}
List<Call.CallDetail> queryServiceRelation(Duration duration, List<QueryBuilder<MeasureQuery>> queryBuilderList, DetectPoint detectPoint) throws IOException {
long startTB = 0;
long endTB = 0;
if (nonNull(duration)) {
startTB = duration.getStartTimeBucketInSec();
endTB = duration.getEndTimeBucketInSec();
}
TimestampRange timestampRange = null;
if (startTB > 0 && endTB > 0) {
timestampRange = new TimestampRange(TimeBucket.getTimestamp(startTB), TimeBucket.getTimestamp(endTB));
}
final String modelName = detectPoint == DetectPoint.SERVER ? ServiceRelationServerSideMetrics.INDEX_NAME :
ServiceRelationClientSideMetrics.INDEX_NAME;
final Map<String, Call.CallDetail> callMap = new HashMap<>();
for (final QueryBuilder<MeasureQuery> q : queryBuilderList) {
MeasureQueryResponse resp = query(modelName,
ImmutableSet.of(ServiceRelationClientSideMetrics.COMPONENT_ID,
ServiceRelationClientSideMetrics.SOURCE_SERVICE_ID,
ServiceRelationClientSideMetrics.DEST_SERVICE_ID,
Metrics.ENTITY_ID),
Collections.emptySet(), timestampRange, q);
if (resp.size() == 0) {
continue;
}
final Call.CallDetail call = new Call.CallDetail();
final String entityId = resp.getDataPoints().get(0).getTagValue(Metrics.ENTITY_ID);
final int componentId = ((Number) resp.getDataPoints().get(0).getTagValue(ServiceRelationClientSideMetrics.COMPONENT_ID)).intValue();
call.buildFromServiceRelation(entityId, componentId, detectPoint);
callMap.putIfAbsent(entityId, call);
}
return new ArrayList<>(callMap.values());
}
@Override
public List<Call.CallDetail> loadInstanceRelationDetectedAtServerSide(String clientServiceId, String serverServiceId, Duration duration) throws IOException {
List<QueryBuilder<MeasureQuery>> queryBuilderList = buildInstanceRelationsQueries(clientServiceId, serverServiceId);
return queryInstanceRelation(duration, queryBuilderList, DetectPoint.SERVER);
}
@Override
public List<Call.CallDetail> loadInstanceRelationDetectedAtClientSide(String clientServiceId, String serverServiceId, Duration duration) throws IOException {
List<QueryBuilder<MeasureQuery>> queryBuilderList = buildInstanceRelationsQueries(clientServiceId, serverServiceId);
return queryInstanceRelation(duration, queryBuilderList, DetectPoint.CLIENT);
}
private List<QueryBuilder<MeasureQuery>> buildInstanceRelationsQueries(String clientServiceId, String serverServiceId) {
List<QueryBuilder<MeasureQuery>> queryBuilderList = new ArrayList<>(2);
queryBuilderList.add(new QueryBuilder<MeasureQuery>() {
@Override
protected void apply(MeasureQuery query) {
query.and(eq(ServiceInstanceRelationServerSideMetrics.SOURCE_SERVICE_ID, clientServiceId))
.and(eq(ServiceInstanceRelationServerSideMetrics.DEST_SERVICE_ID, serverServiceId));
}
});
queryBuilderList.add(new QueryBuilder<MeasureQuery>() {
@Override
protected void apply(MeasureQuery query) {
query.and(eq(ServiceInstanceRelationServerSideMetrics.DEST_SERVICE_ID, clientServiceId))
.and(eq(ServiceInstanceRelationServerSideMetrics.SOURCE_SERVICE_ID, serverServiceId));
}
});
return queryBuilderList;
}
List<Call.CallDetail> queryInstanceRelation(Duration duration, List<QueryBuilder<MeasureQuery>> queryBuilderList, DetectPoint detectPoint) throws IOException {
long startTB = 0;
long endTB = 0;
if (nonNull(duration)) {
startTB = duration.getStartTimeBucketInSec();
endTB = duration.getEndTimeBucketInSec();
}
TimestampRange timestampRange = null;
if (startTB > 0 && endTB > 0) {
timestampRange = new TimestampRange(TimeBucket.getTimestamp(startTB), TimeBucket.getTimestamp(endTB));
}
final String modelName = detectPoint == DetectPoint.SERVER ? ServiceInstanceRelationServerSideMetrics.INDEX_NAME :
ServiceInstanceRelationClientSideMetrics.INDEX_NAME;
final Map<String, Call.CallDetail> callMap = new HashMap<>();
for (final QueryBuilder<MeasureQuery> q : queryBuilderList) {
MeasureQueryResponse resp = query(modelName,
ImmutableSet.of(ServiceInstanceRelationServerSideMetrics.COMPONENT_ID,
ServiceInstanceRelationServerSideMetrics.SOURCE_SERVICE_ID,
ServiceInstanceRelationServerSideMetrics.DEST_SERVICE_ID,
Metrics.ENTITY_ID),
Collections.emptySet(), timestampRange, q);
if (resp.size() == 0) {
continue;
}
final Call.CallDetail call = new Call.CallDetail();
final String entityId = resp.getDataPoints().get(0).getTagValue(Metrics.ENTITY_ID);
final int componentId = ((Number) resp.getDataPoints().get(0).getTagValue(ServiceRelationClientSideMetrics.COMPONENT_ID)).intValue();
call.buildFromInstanceRelation(entityId, componentId, detectPoint);
callMap.putIfAbsent(entityId, call);
}
return new ArrayList<>(callMap.values());
}
@Override
public List<Call.CallDetail> loadEndpointRelation(Duration duration, String destEndpointId) throws IOException {
List<QueryBuilder<MeasureQuery>> queryBuilderList = buildEndpointRelationsQueries(destEndpointId);
return queryEndpointRelation(duration, queryBuilderList, DetectPoint.SERVER);
}
@Override
public List<Call.CallDetail> loadProcessRelationDetectedAtClientSide(String serviceInstanceId, Duration duration) throws IOException {
return queryProcessRelation(duration, serviceInstanceId, DetectPoint.CLIENT);
}
@Override
public List<Call.CallDetail> loadProcessRelationDetectedAtServerSide(String serviceInstanceId, Duration duration) throws IOException {
return queryProcessRelation(duration, serviceInstanceId, DetectPoint.SERVER);
}
private List<QueryBuilder<MeasureQuery>> buildEndpointRelationsQueries(String destEndpointId) {
List<QueryBuilder<MeasureQuery>> queryBuilderList = new ArrayList<>(2);
queryBuilderList.add(new QueryBuilder<MeasureQuery>() {
@Override
protected void apply(MeasureQuery query) {
query.and(eq(EndpointRelationServerSideMetrics.SOURCE_ENDPOINT, destEndpointId));
}
});
queryBuilderList.add(new QueryBuilder<MeasureQuery>() {
@Override
protected void apply(MeasureQuery query) {
query.and(eq(EndpointRelationServerSideMetrics.DEST_ENDPOINT, destEndpointId));
}
});
return queryBuilderList;
}
List<Call.CallDetail> queryEndpointRelation(Duration duration, List<QueryBuilder<MeasureQuery>> queryBuilderList, DetectPoint detectPoint) throws IOException {
long startTB = 0;
long endTB = 0;
if (nonNull(duration)) {
startTB = duration.getStartTimeBucketInSec();
endTB = duration.getEndTimeBucketInSec();
}
TimestampRange timestampRange = null;
if (startTB > 0 && endTB > 0) {
timestampRange = new TimestampRange(TimeBucket.getTimestamp(startTB), TimeBucket.getTimestamp(endTB));
}
final Map<String, Call.CallDetail> callMap = new HashMap<>();
for (final QueryBuilder<MeasureQuery> q : queryBuilderList) {
MeasureQueryResponse resp = query(EndpointRelationServerSideMetrics.INDEX_NAME,
ImmutableSet.of(EndpointRelationServerSideMetrics.DEST_ENDPOINT,
EndpointRelationServerSideMetrics.SOURCE_ENDPOINT,
Metrics.ENTITY_ID),
Collections.emptySet(), timestampRange, q);
if (resp.size() == 0) {
continue;
}
final Call.CallDetail call = new Call.CallDetail();
final String entityId = resp.getDataPoints().get(0).getTagValue(Metrics.ENTITY_ID);
call.buildFromEndpointRelation(entityId, detectPoint);
callMap.putIfAbsent(entityId, call);
}
return new ArrayList<>(callMap.values());
}
List<Call.CallDetail> queryProcessRelation(Duration duration, String serviceInstanceId, DetectPoint detectPoint) throws IOException {
long startTB = 0;
long endTB = 0;
if (nonNull(duration)) {
startTB = duration.getStartTimeBucketInSec();
endTB = duration.getEndTimeBucketInSec();
}
TimestampRange timestampRange = null;
if (startTB > 0 && endTB > 0) {
timestampRange = new TimestampRange(TimeBucket.getTimestamp(startTB), TimeBucket.getTimestamp(endTB));
}
final String modelName = detectPoint == DetectPoint.SERVER ? ProcessRelationServerSideMetrics.INDEX_NAME :
ProcessRelationClientSideMetrics.INDEX_NAME;
final Map<String, Call.CallDetail> callMap = new HashMap<>();
MeasureQueryResponse resp = query(modelName,
ImmutableSet.of(Metrics.ENTITY_ID, ProcessRelationClientSideMetrics.COMPONENT_ID),
Collections.emptySet(), timestampRange, new QueryBuilder<MeasureQuery>() {
@Override
protected void apply(MeasureQuery query) {
query.and(eq(ProcessRelationServerSideMetrics.SERVICE_INSTANCE_ID, serviceInstanceId));
}
});
final Call.CallDetail call = new Call.CallDetail();
final String entityId = resp.getDataPoints().get(0).getTagValue(Metrics.ENTITY_ID);
final int componentId = ((Number) resp.getDataPoints().get(0).getTagValue(ProcessRelationClientSideMetrics.COMPONENT_ID)).intValue();
call.buildProcessRelation(entityId, componentId, detectPoint);
callMap.putIfAbsent(entityId, call);
return new ArrayList<>(callMap.values());
}
}