blob: f951d987266e6b8006c0156605732fb3f4038ae2 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.analyzer.provider.trace.parser.listener;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.apm.network.language.agent.v3.SegmentObject;
import org.apache.skywalking.apm.network.language.agent.v3.SegmentReference;
import org.apache.skywalking.apm.network.language.agent.v3.SpanLayer;
import org.apache.skywalking.apm.network.language.agent.v3.SpanObject;
import org.apache.skywalking.apm.network.language.agent.v3.SpanType;
import org.apache.skywalking.oap.server.analyzer.provider.AnalyzerModuleConfig;
import org.apache.skywalking.oap.server.analyzer.provider.trace.parser.SpanTags;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.UnexpectedException;
import org.apache.skywalking.oap.server.core.analysis.IDManager;
import org.apache.skywalking.oap.server.core.analysis.Layer;
import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
import org.apache.skywalking.oap.server.core.analysis.manual.networkalias.NetworkAddressAlias;
import org.apache.skywalking.oap.server.core.cache.NetworkAddressAliasCache;
import org.apache.skywalking.oap.server.core.config.NamingControl;
import org.apache.skywalking.oap.server.core.source.DetectPoint;
import org.apache.skywalking.oap.server.core.source.EndpointRelation;
import org.apache.skywalking.oap.server.core.source.RequestType;
import org.apache.skywalking.oap.server.core.source.ServiceInstanceRelation;
import org.apache.skywalking.oap.server.core.source.SourceReceiver;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.util.StringUtil;
import java.util.ArrayList;
import java.util.List;
import static org.apache.skywalking.oap.server.analyzer.provider.trace.parser.SpanTags.LOGIC_ENDPOINT;
/**
* RPCAnalysisListener detects all RPC relative statistics.
*/
@Slf4j
@RequiredArgsConstructor
public class RPCAnalysisListener extends CommonAnalysisListener implements EntryAnalysisListener, ExitAnalysisListener, LocalAnalysisListener {
private final List<RPCTrafficSourceBuilder> callingInTraffic = new ArrayList<>(10);
private final List<RPCTrafficSourceBuilder> callingOutTraffic = new ArrayList<>(10);
private final List<EndpointSourceBuilder> logicEndpointBuilders = new ArrayList<>(10);
private final Gson gson = new Gson();
private final SourceReceiver sourceReceiver;
private final AnalyzerModuleConfig config;
private final NetworkAddressAliasCache networkAddressAliasCache;
private final NamingControl namingControl;
@Override
public boolean containsPoint(Point point) {
return Point.Entry.equals(point) || Point.Exit.equals(point) || Point.Local.equals(point);
}
/**
* All entry spans are transferred as the Service, Instance and Endpoint related sources. Entry spans are treated on
* the behalf of the observability status of the service reported these spans.
*
* Also, when face the MQ and uninstrumented Gateways, there is different logic to generate the relationship between
* services/instances rather than the normal RPC direct call. The reason is the same, we aren't expecting the agent
* installed in the MQ server, and Gateway may not have suitable agent. Any uninstrumented service if they have the
* capability to forward SkyWalking header through themselves, you could consider the uninstrumented configurations
* to make the topology works to be a whole.
*/
@Override
public void parseEntry(SpanObject span, SegmentObject segmentObject) {
if (span.getSkipAnalysis()) {
return;
}
if (span.getRefsCount() > 0) {
for (int i = 0; i < span.getRefsCount(); i++) {
SegmentReference reference = span.getRefs(i);
RPCTrafficSourceBuilder sourceBuilder = new RPCTrafficSourceBuilder(namingControl);
if (StringUtil.isEmpty(reference.getParentEndpoint())) {
sourceBuilder.setSourceEndpointName(Const.USER_ENDPOINT_NAME);
} else {
sourceBuilder.setSourceEndpointName(reference.getParentEndpoint());
}
final String networkAddressUsedAtPeer = reference.getNetworkAddressUsedAtPeer();
boolean isMQ = span.getSpanLayer().equals(SpanLayer.MQ);
if (isMQ || config.getUninstrumentedGatewaysConfig()
.isAddressConfiguredAsGateway(networkAddressUsedAtPeer)) {
sourceBuilder.setSourceServiceName(networkAddressUsedAtPeer);
sourceBuilder.setSourceEndpointOwnerServiceName(reference.getParentService());
sourceBuilder.setSourceServiceInstanceName(networkAddressUsedAtPeer);
if (isMQ) {
sourceBuilder.setSourceLayer(Layer.VIRTUAL_MQ);
} else {
sourceBuilder.setSourceLayer(Layer.VIRTUAL_GATEWAY);
}
sourceBuilder.setSourceEndpointOwnerServiceLayer(Layer.GENERAL);
} else {
sourceBuilder.setSourceServiceName(reference.getParentService());
sourceBuilder.setSourceServiceInstanceName(reference.getParentServiceInstance());
sourceBuilder.setSourceLayer(Layer.GENERAL);
}
sourceBuilder.setDestEndpointName(span.getOperationName());
sourceBuilder.setDestServiceInstanceName(segmentObject.getServiceInstance());
sourceBuilder.setDestServiceName(segmentObject.getService());
sourceBuilder.setDestLayer(identifyServiceLayer(span.getSpanLayer()));
sourceBuilder.setDetectPoint(DetectPoint.SERVER);
sourceBuilder.setComponentId(span.getComponentId());
setPublicAttrs(sourceBuilder, span);
callingInTraffic.add(sourceBuilder);
}
} else {
RPCTrafficSourceBuilder sourceBuilder = new RPCTrafficSourceBuilder(namingControl);
sourceBuilder.setSourceServiceName(Const.USER_SERVICE_NAME);
sourceBuilder.setSourceServiceInstanceName(Const.USER_INSTANCE_NAME);
sourceBuilder.setSourceEndpointName(Const.USER_ENDPOINT_NAME);
sourceBuilder.setSourceLayer(Layer.UNDEFINED);
sourceBuilder.setDestServiceInstanceName(segmentObject.getServiceInstance());
sourceBuilder.setDestServiceName(segmentObject.getService());
sourceBuilder.setDestLayer(identifyServiceLayer(span.getSpanLayer()));
sourceBuilder.setDestEndpointName(span.getOperationName());
sourceBuilder.setDetectPoint(DetectPoint.SERVER);
sourceBuilder.setComponentId(span.getComponentId());
setPublicAttrs(sourceBuilder, span);
callingInTraffic.add(sourceBuilder);
}
parseLogicEndpoints(span, segmentObject);
}
/**
* The exit span should be transferred to the service, instance and relationships from the client side detect
* point.
*/
@Override
public void parseExit(SpanObject span, SegmentObject segmentObject) {
if (span.getSkipAnalysis()) {
return;
}
RPCTrafficSourceBuilder sourceBuilder = new RPCTrafficSourceBuilder(namingControl);
final String networkAddress = span.getPeer();
if (StringUtil.isEmpty(networkAddress)) {
return;
}
sourceBuilder.setSourceServiceName(segmentObject.getService());
sourceBuilder.setSourceServiceInstanceName(segmentObject.getServiceInstance());
sourceBuilder.setSourceLayer(identifyServiceLayer(span.getSpanLayer()));
final NetworkAddressAlias networkAddressAlias = networkAddressAliasCache.get(networkAddress);
if (networkAddressAlias == null) {
sourceBuilder.setDestServiceName(networkAddress);
sourceBuilder.setDestServiceInstanceName(networkAddress);
sourceBuilder.setDestLayer(identifyRemoteServiceLayer(span.getSpanLayer(), span.getPeer()));
} else {
/*
* If alias exists, mean this network address is representing a real service.
*/
final IDManager.ServiceID.ServiceIDDefinition serviceIDDefinition = IDManager.ServiceID.analysisId(
networkAddressAlias.getRepresentServiceId());
final IDManager.ServiceInstanceID.InstanceIDDefinition instanceIDDefinition = IDManager.ServiceInstanceID
.analysisId(
networkAddressAlias.getRepresentServiceInstanceId());
sourceBuilder.setDestServiceName(serviceIDDefinition.getName());
/*
* Some agents can not have the upstream real network address, such as https://github.com/apache/skywalking-nginx-lua.
* Keeping dest instance name as NULL makes no instance relation generate from this exit span.
*/
if (!config.shouldIgnorePeerIPDue2Virtual(span.getComponentId())) {
sourceBuilder.setDestServiceInstanceName(instanceIDDefinition.getName());
}
sourceBuilder.setDestLayer(Layer.GENERAL);
}
sourceBuilder.setDetectPoint(DetectPoint.CLIENT);
sourceBuilder.setComponentId(span.getComponentId());
setPublicAttrs(sourceBuilder, span);
callingOutTraffic.add(sourceBuilder);
}
private void setPublicAttrs(RPCTrafficSourceBuilder sourceBuilder, SpanObject span) {
long latency = span.getEndTime() - span.getStartTime();
sourceBuilder.setTimeBucket(TimeBucket.getMinuteTimeBucket(span.getStartTime()));
sourceBuilder.setLatency((int) latency);
sourceBuilder.setHttpResponseStatusCode(Const.NONE);
span.getTagsList().forEach(tag -> {
final String tagKey = tag.getKey();
if (SpanTags.STATUS_CODE.equals(tagKey) || SpanTags.HTTP_RESPONSE_STATUS_CODE.equals(tagKey)) {
try {
sourceBuilder.setHttpResponseStatusCode(Integer.parseInt(tag.getValue()));
} catch (NumberFormatException e) {
log.warn("span {} has illegal status code {}", span, tag.getValue());
}
} else if (SpanTags.RPC_RESPONSE_STATUS_CODE.equals(tagKey)) {
sourceBuilder.setRpcStatusCode(tag.getValue());
}
sourceBuilder.setTag(tag);
});
sourceBuilder.setStatus(!span.getIsError());
switch (span.getSpanLayer()) {
case Http:
sourceBuilder.setType(RequestType.HTTP);
break;
case Database:
sourceBuilder.setType(RequestType.DATABASE);
break;
case MQ:
sourceBuilder.setType(RequestType.MQ);
break;
default:
sourceBuilder.setType(RequestType.RPC);
break;
}
}
@Override
public void parseLocal(final SpanObject span, final SegmentObject segmentObject) {
parseLogicEndpoints(span, segmentObject);
}
@Override
public void build() {
callingInTraffic.forEach(callingIn -> {
callingIn.prepare();
sourceReceiver.receive(callingIn.toService());
sourceReceiver.receive(callingIn.toServiceInstance());
sourceReceiver.receive(callingIn.toServiceRelation());
sourceReceiver.receive(callingIn.toServiceInstanceRelation());
// Service is equivalent to endpoint in FaaS (function as a service)
// Don't generate endpoint and endpoint dependency to avoid unnecessary costs.
if (Layer.FAAS != callingIn.getDestLayer()) {
sourceReceiver.receive(callingIn.toEndpoint());
EndpointRelation endpointRelation = callingIn.toEndpointRelation();
/*
* Parent endpoint could be none, because in SkyWalking Cross Process Propagation Headers Protocol v2,
* endpoint in ref could be empty, based on that, endpoint relation maybe can't be established.
* So, I am making this source as optional.
*
* Also, since 6.6.0, source endpoint could be none, if this trace begins by an internal task(local span or exit span), such as Timer,
* rather than, normally begin as an entry span, like a RPC server side.
*/
if (endpointRelation != null) {
sourceReceiver.receive(endpointRelation);
}
}
});
callingOutTraffic.forEach(callingOut -> {
callingOut.prepare();
sourceReceiver.receive(callingOut.toServiceRelation());
/*
* Some of the agent can not have the upstream real network address, such as https://github.com/apache/skywalking-nginx-lua.
*/
final ServiceInstanceRelation serviceInstanceRelation = callingOut.toServiceInstanceRelation();
if (serviceInstanceRelation != null) {
sourceReceiver.receive(serviceInstanceRelation);
}
});
logicEndpointBuilders.forEach(logicEndpointBuilder -> {
logicEndpointBuilder.prepare();
sourceReceiver.receive(logicEndpointBuilder.toEndpoint());
});
}
/**
* Logic endpoint could represent through an entry span or local span. It has special meaning from API
* perspective. But it is an actual RPC call.
*/
private void parseLogicEndpoints(final SpanObject span, final SegmentObject segmentObject) {
span.getTagsList().forEach(tag -> {
switch (tag.getKey()) {
case LOGIC_ENDPOINT:
final JsonObject tagValue = gson.fromJson(tag.getValue(), JsonObject.class);
final boolean isLocalSpan = SpanType.Local.equals(span.getSpanType());
String logicEndpointName;
int latency;
boolean status;
if (isLocalSpan && tagValue.has("logic-span") && tagValue.get("logic-span").getAsBoolean()) {
logicEndpointName = span.getOperationName();
latency = (int) (span.getEndTime() - span.getStartTime());
status = !span.getIsError();
} else if (tagValue.has("name") && tagValue.has("latency") && tagValue.has("status")) {
logicEndpointName = tagValue.get("name").getAsString();
latency = tagValue.get("latency").getAsInt();
status = tagValue.get("status").getAsBoolean();
} else {
break;
}
EndpointSourceBuilder sourceBuilder = new EndpointSourceBuilder(namingControl);
sourceBuilder.setTimeBucket(TimeBucket.getMinuteTimeBucket(span.getStartTime()));
sourceBuilder.setDestServiceName(segmentObject.getService());
sourceBuilder.setDestServiceInstanceName(segmentObject.getServiceInstance());
sourceBuilder.setDestEndpointName(logicEndpointName);
sourceBuilder.setDestLayer(Layer.GENERAL);
sourceBuilder.setDetectPoint(DetectPoint.SERVER);
sourceBuilder.setLatency(latency);
sourceBuilder.setStatus(status);
sourceBuilder.setType(RequestType.LOGIC);
logicEndpointBuilders.add(sourceBuilder);
default:
break;
}
});
}
/**
* Identify the layer of remote service. Such as ${@link Layer#DATABASE} and ${@link Layer#CACHE}.
*/
protected Layer identifyRemoteServiceLayer(SpanLayer spanLayer, String peer) {
switch (spanLayer) {
case Unknown:
return Layer.UNDEFINED;
case Database:
return Layer.VIRTUAL_DATABASE;
case RPCFramework:
return Layer.GENERAL;
case Http:
if (config.getUninstrumentedGatewaysConfig().isAddressConfiguredAsGateway(peer)) {
return Layer.VIRTUAL_GATEWAY;
}
return Layer.GENERAL;
case MQ:
return Layer.VIRTUAL_MQ;
case Cache:
return Layer.VIRTUAL_CACHE;
case UNRECOGNIZED:
return Layer.UNDEFINED;
case FAAS:
return Layer.FAAS;
default:
throw new UnexpectedException("Can't transfer to the Layer. SpanLayer=" + spanLayer);
}
}
public static class Factory implements AnalysisListenerFactory {
private final SourceReceiver sourceReceiver;
private final NetworkAddressAliasCache networkAddressAliasCache;
private final NamingControl namingControl;
public Factory(ModuleManager moduleManager) {
this.sourceReceiver = moduleManager.find(CoreModule.NAME).provider().getService(SourceReceiver.class);
this.networkAddressAliasCache = moduleManager.find(CoreModule.NAME)
.provider()
.getService(NetworkAddressAliasCache.class);
this.namingControl = moduleManager.find(CoreModule.NAME)
.provider()
.getService(NamingControl.class);
}
@Override
public AnalysisListener create(ModuleManager moduleManager, AnalyzerModuleConfig config) {
return new RPCAnalysisListener(
sourceReceiver, config, networkAddressAliasCache, namingControl);
}
}
}