blob: 23495be5cf0057126e7ebb90ba53092857652021 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.zipkin;
import java.io.Closeable;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import brave.Span;
import brave.Tracing;
import brave.context.slf4j.MDCScopeDecorator;
import brave.propagation.B3Propagation;
import brave.propagation.Propagation.Getter;
import brave.propagation.Propagation.Setter;
import brave.propagation.ThreadLocalCurrentTraceContext;
import brave.propagation.TraceContext;
import brave.propagation.TraceContext.Extractor;
import brave.propagation.TraceContext.Injector;
import brave.propagation.TraceContextOrSamplingFlags;
import brave.sampler.Sampler;
import org.apache.camel.CamelContext;
import org.apache.camel.CamelContextAware;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Expression;
import org.apache.camel.Message;
import org.apache.camel.NamedNode;
import org.apache.camel.Route;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.StaticService;
import org.apache.camel.api.management.ManagedAttribute;
import org.apache.camel.api.management.ManagedResource;
import org.apache.camel.spi.CamelEvent;
import org.apache.camel.spi.CamelEvent.ExchangeSendingEvent;
import org.apache.camel.spi.CamelEvent.ExchangeSentEvent;
import org.apache.camel.spi.RouteContext;
import org.apache.camel.spi.RoutePolicy;
import org.apache.camel.spi.RoutePolicyFactory;
import org.apache.camel.spi.UnitOfWork;
import org.apache.camel.support.EndpointHelper;
import org.apache.camel.support.EventNotifierSupport;
import org.apache.camel.support.PatternHelper;
import org.apache.camel.support.RoutePolicySupport;
import org.apache.camel.support.service.ServiceHelper;
import org.apache.camel.support.service.ServiceSupport;
import org.apache.camel.util.IOHelper;
import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.URISupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import zipkin2.reporter.AsyncReporter;
import zipkin2.reporter.Reporter;
import zipkin2.reporter.libthrift.LibthriftSender;
import zipkin2.reporter.urlconnection.URLConnectionSender;
/**
* To use Zipkin with Camel then setup this {@link ZipkinTracer} in your Camel
* application.
* <p/>
* Events (span) are captured for incoming and outgoing messages being sent
* to/from Camel. This means you need to configure which which Camel endpoints
* that maps to zipkin service names. The mapping can be configured using
* <ul>
* <li>route id - A Camel route id</li>
* <li>endpoint url - A Camel endpoint url</li>
* </ul>
* For both kinds you can use wildcards and regular expressions to match, which
* is using the rules from {@link PatternHelper#matchPattern(String, String)}
* and {@link EndpointHelper#matchEndpoint(CamelContext, String, String)}
* <p/>
* To match all Camel messages you can use <tt>*</tt> in the pattern and
* configure that to the same service name. <br/>
* If no mapping has been configured then Camel will fallback and use endpoint
* uri's as service names. However its recommended to configure service mappings
* so you can use human logic names instead of Camel endpoint uris in the names.
* <p/>
* Camel will auto-configure a {@link Reporter span reporter} one hasn't been
* explicitly configured, and if the hostname and port to a zipkin collector has
* been configured as environment variables
* <ul>
* <li>ZIPKIN_COLLECTOR_HTTP_SERVICE_HOST - The http hostname</li>
* <li>ZIPKIN_COLLECTOR_HTTP_SERVICE_PORT - The port number</li>
* </ul>
* or
* <ul>
* <li>ZIPKIN_COLLECTOR_THRIFT_SERVICE_HOST - The Scribe (Thrift RPC)
* hostname</li>
* <li>ZIPKIN_COLLECTOR_THRIFT_SERVICE_PORT - The port number</li>
* </ul>
* <p/>
* This class is implemented as both an
* {@link org.apache.camel.spi.EventNotifier} and {@link RoutePolicy} that
* allows to trap when Camel starts/ends an {@link Exchange} being routed using
* the {@link RoutePolicy} and during the routing if the {@link Exchange} sends
* messages, then we track them using the
* {@link org.apache.camel.spi.EventNotifier}.
*/
// NOTE: this implementation currently only does explicit propagation, meaning
// that non-camel
// components will not see the current trace context, and therefore will be
// unassociated. This can
// be fixed by using CurrentTraceContext to scope a span where user code is
// invoked.
// If this is desirable, an instance variable of
// CurrentTraceContext.Default.create() could do the
// trick.
@ManagedResource(description = "ZipkinTracer")
public class ZipkinTracer extends ServiceSupport implements RoutePolicyFactory, StaticService, CamelContextAware {
private static final Logger LOG = LoggerFactory.getLogger(ZipkinTracer.class);
private static final String ZIPKIN_COLLECTOR_HTTP_SERVICE = "zipkin-collector-http";
private static final String ZIPKIN_COLLECTOR_THRIFT_SERVICE = "zipkin-collector-thrift";
private static final Getter<Message, String> GETTER = new Getter<Message, String>() {
@Override
public String get(Message message, String key) {
return message.getHeader(key, String.class);
}
};
private static final Setter<Message, String> SETTER = new Setter<Message, String>() {
@Override
public void put(Message message, String key, String value) {
message.setHeader(key, value);
}
};
private static final Extractor<Message> EXTRACTOR = B3Propagation.B3_STRING.extractor(GETTER);
private static final Injector<Message> INJECTOR = B3Propagation.B3_STRING.injector(SETTER);
private final ZipkinEventNotifier eventNotifier = new ZipkinEventNotifier();
private final Map<String, Tracing> braves = new HashMap<>();
private transient boolean useFallbackServiceNames;
private CamelContext camelContext;
private String endpoint;
private String hostName;
private int port;
private float rate = 1.0f;
private Reporter<zipkin2.Span> spanReporter;
private Map<String, String> clientServiceMappings = new HashMap<>();
private Map<String, String> serverServiceMappings = new HashMap<>();
private Set<String> excludePatterns = new HashSet<>();
private boolean includeMessageBody;
private boolean includeMessageBodyStreams;
public ZipkinTracer() {
}
@Override
public RoutePolicy createRoutePolicy(CamelContext camelContext, String routeId, NamedNode route) {
// ensure this zipkin tracer gets initialized when Camel starts
init(camelContext);
return new ZipkinRoutePolicy();
}
/**
* Registers this {@link ZipkinTracer} on the {@link CamelContext} if not
* already registered.
*/
public void init(CamelContext camelContext) {
if (!camelContext.hasService(this)) {
try {
// start this service eager so we init before Camel is starting
// up
camelContext.addService(this, true, true);
} catch (Exception e) {
throw RuntimeCamelException.wrapRuntimeCamelException(e);
}
}
}
@Override
public CamelContext getCamelContext() {
return camelContext;
}
@Override
public void setCamelContext(CamelContext camelContext) {
this.camelContext = camelContext;
}
@ManagedAttribute(description = "The POST URL for zipkin's v2 api.")
public String getEndpoint() {
return endpoint;
}
/**
* Sets the POST URL for zipkin's
* <a href="http://zipkin.io/zipkin-api/#/">v2 api</a>, usually
* "http://zipkinhost:9411/api/v2/spans"
*/
public void setEndpoint(String endpoint) {
this.endpoint = endpoint;
}
@ManagedAttribute(description = "The hostname for the remote zipkin scribe collector.")
public String getHostName() {
return hostName;
}
/**
* Sets the hostname for the remote zipkin scribe collector.
*/
public void setHostName(String hostName) {
this.hostName = hostName;
}
@ManagedAttribute(description = "The port number for the remote zipkin scribe collector.")
public int getPort() {
return port;
}
/**
* Sets the port number for the remote zipkin scribe collector.
*/
public void setPort(int port) {
this.port = port;
}
@ManagedAttribute(description = "Rates how many events should be traced by zipkin. The rate is expressed as a percentage (1.0f = 100%, 0.5f is 50%, 0.1f is 10%).")
public float getRate() {
return rate;
}
/**
* Configures a rate that decides how many events should be traced by
* zipkin. The rate is expressed as a percentage (1.0f = 100%, 0.5f is 50%,
* 0.1f is 10%).
*
* @param rate minimum sample rate is 0.0001, or 0.01% of traces
*/
public void setRate(float rate) {
this.rate = rate;
}
/**
* Sets the reporter used to send timing data (spans) to the zipkin server.
*/
public void setSpanReporter(Reporter<zipkin2.Span> spanReporter) {
this.spanReporter = spanReporter;
}
/**
* Returns the reporter used to send timing data (spans) to the zipkin
* server.
*/
public Reporter<zipkin2.Span> getSpanReporter() {
return spanReporter;
}
public String getServiceName() {
return clientServiceMappings.get("*");
}
/**
* To use a global service name that matches all Camel events
*/
public void setServiceName(String serviceName) {
clientServiceMappings.put("*", serviceName);
serverServiceMappings.put("*", serviceName);
}
public Map<String, String> getClientServiceMappings() {
return clientServiceMappings;
}
public void setClientServiceMappings(Map<String, String> clientServiceMappings) {
this.clientServiceMappings = clientServiceMappings;
}
/**
* Adds a client service mapping that matches Camel events to the given
* zipkin service name. See more details at the class javadoc.
*
* @param pattern the pattern such as route id, endpoint url
* @param serviceName the zipkin service name
*/
public void addClientServiceMapping(String pattern, String serviceName) {
clientServiceMappings.put(pattern, serviceName);
}
public Map<String, String> getServerServiceMappings() {
return serverServiceMappings;
}
public void setServerServiceMappings(Map<String, String> serverServiceMappings) {
this.serverServiceMappings = serverServiceMappings;
}
/**
* Adds a server service mapping that matches Camel events to the given
* zipkin service name. See more details at the class javadoc.
*
* @param pattern the pattern such as route id, endpoint url
* @param serviceName the zipkin service name
*/
public void addServerServiceMapping(String pattern, String serviceName) {
serverServiceMappings.put(pattern, serviceName);
}
public Set<String> getExcludePatterns() {
return excludePatterns;
}
public void setExcludePatterns(Set<String> excludePatterns) {
this.excludePatterns = excludePatterns;
}
/**
* Adds an exclude pattern that will disable tracing with zipkin for Camel
* messages that matches the pattern.
*
* @param pattern the pattern such as route id, endpoint url
*/
public void addExcludePattern(String pattern) {
excludePatterns.add(pattern);
}
@ManagedAttribute(description = "Whether to include the Camel message body in the zipkin traces")
public boolean isIncludeMessageBody() {
return includeMessageBody;
}
/**
* Whether to include the Camel message body in the zipkin traces.
* <p/>
* This is not recommended for production usage, or when having big
* payloads. You can limit the size by configuring the <a href=
* "http://camel.apache.org/how-do-i-set-the-max-chars-when-debug-logging-messages-in-camel.html">max
* debug log size</a>.
* <p/>
* By default message bodies that are stream based are <b>not</b> included.
* You can use the option {@link #setIncludeMessageBodyStreams(boolean)} to
* turn that on.
*/
@ManagedAttribute(description = "Whether to include the Camel message body in the zipkin traces")
public void setIncludeMessageBody(boolean includeMessageBody) {
this.includeMessageBody = includeMessageBody;
}
@ManagedAttribute(description = "Whether to include stream based Camel message bodies in the zipkin traces")
public boolean isIncludeMessageBodyStreams() {
return includeMessageBodyStreams;
}
/**
* Whether to include message bodies that are stream based in the zipkin
* traces.
* <p/>
* This requires enabling
* <a href="http://camel.apache.org/stream-caching.html">stream caching</a>
* on the routes or globally on the CamelContext.
* <p/>
* This is not recommended for production usage, or when having big
* payloads. You can limit the size by configuring the <a href=
* "http://camel.apache.org/how-do-i-set-the-max-chars-when-debug-logging-messages-in-camel.html">max
* debug log size</a>.
*/
@ManagedAttribute(description = "Whether to include stream based Camel message bodies in the zipkin traces")
public void setIncludeMessageBodyStreams(boolean includeMessageBodyStreams) {
this.includeMessageBodyStreams = includeMessageBodyStreams;
}
@Override
protected void doStart() throws Exception {
ObjectHelper.notNull(camelContext, "CamelContext", this);
camelContext.getManagementStrategy().addEventNotifier(eventNotifier);
if (!camelContext.getRoutePolicyFactories().contains(this)) {
camelContext.addRoutePolicyFactory(this);
}
if (spanReporter == null) {
if (endpoint != null) {
LOG.info("Configuring Zipkin URLConnectionSender using endpoint: {}", endpoint);
spanReporter = AsyncReporter.create(URLConnectionSender.create(endpoint));
} else if (hostName != null && port > 0) {
LOG.info("Configuring Zipkin ScribeSpanCollector using host: {} and port: {}", hostName, port);
LibthriftSender sender = LibthriftSender.newBuilder().host(hostName).port(port).build();
spanReporter = AsyncReporter.create(sender);
} else {
// is there a zipkin service setup as ENV variable to auto
// register a span reporter
String host = ServiceHostFunction.apply(ZIPKIN_COLLECTOR_HTTP_SERVICE);
String port = ServicePortFunction.apply(ZIPKIN_COLLECTOR_HTTP_SERVICE);
if (ObjectHelper.isNotEmpty(host) && ObjectHelper.isNotEmpty(port)) {
LOG.info("Auto-configuring Zipkin URLConnectionSender using host: {} and port: {}", host, port);
int num = camelContext.getTypeConverter().mandatoryConvertTo(Integer.class, port);
String implicitEndpoint = "http://" + host + ":" + num + "/api/v2/spans";
spanReporter = AsyncReporter.create(URLConnectionSender.create(implicitEndpoint));
} else {
host = ServiceHostFunction.apply(ZIPKIN_COLLECTOR_THRIFT_SERVICE);
port = ServicePortFunction.apply(ZIPKIN_COLLECTOR_THRIFT_SERVICE);
if (ObjectHelper.isNotEmpty(host) && ObjectHelper.isNotEmpty(port)) {
LOG.info("Auto-configuring Zipkin ScribeSpanCollector using host: {} and port: {}", host, port);
int num = camelContext.getTypeConverter().mandatoryConvertTo(Integer.class, port);
LibthriftSender sender = LibthriftSender.newBuilder().host(host).port(num).build();
spanReporter = AsyncReporter.create(sender);
}
}
}
}
if (spanReporter == null) {
// Try to lookup the span reporter from the registry if only one
// instance is present
Set<Reporter> reporters = camelContext.getRegistry().findByType(Reporter.class);
if (reporters.size() == 1) {
spanReporter = reporters.iterator().next();
}
}
ObjectHelper.notNull(spanReporter, "Reporter<zipkin2.Span>", this);
if (clientServiceMappings.isEmpty() && serverServiceMappings.isEmpty()) {
LOG.warn("No service name(s) has been mapped in clientServiceMappings or serverServiceMappings. Camel will fallback and use endpoint uris as service names.");
useFallbackServiceNames = true;
}
// create braves mapped per service name
for (Map.Entry<String, String> entry : clientServiceMappings.entrySet()) {
String pattern = entry.getKey();
String serviceName = entry.getValue();
createTracingForService(pattern, serviceName);
}
for (Map.Entry<String, String> entry : serverServiceMappings.entrySet()) {
String pattern = entry.getKey();
String serviceName = entry.getValue();
createTracingForService(pattern, serviceName);
}
ServiceHelper.startService(spanReporter, eventNotifier);
}
@Override
protected void doStop() throws Exception {
// stop event notifier
camelContext.getManagementStrategy().removeEventNotifier(eventNotifier);
ServiceHelper.stopService(eventNotifier);
// stop and close collector
ServiceHelper.stopAndShutdownService(spanReporter);
if (spanReporter instanceof Closeable) {
IOHelper.close((Closeable)spanReporter);
}
// clear braves
braves.clear();
// remove route policy
camelContext.getRoutePolicyFactories().remove(this);
}
private String getServiceName(Exchange exchange, Endpoint endpoint, boolean server, boolean client) {
if (client) {
return getServiceName(exchange, endpoint, clientServiceMappings);
} else if (server) {
return getServiceName(exchange, endpoint, serverServiceMappings);
} else {
return null;
}
}
private String getServiceName(Exchange exchange, Endpoint endpoint, Map<String, String> serviceMappings) {
String answer = null;
// endpoint takes precedence over route
if (endpoint != null) {
String url = endpoint.getEndpointUri();
if (url != null) {
// exclude patterns take precedence
for (String pattern : excludePatterns) {
if (EndpointHelper.matchEndpoint(exchange.getContext(), url, pattern)) {
return null;
}
}
for (Map.Entry<String, String> entry : serviceMappings.entrySet()) {
String pattern = entry.getKey();
if (EndpointHelper.matchEndpoint(exchange.getContext(), url, pattern)) {
answer = entry.getValue();
break;
}
}
}
}
// route
if (answer == null) {
String id = routeIdExpression().evaluate(exchange, String.class);
if (id != null) {
// exclude patterns take precedence
for (String pattern : excludePatterns) {
if (PatternHelper.matchPattern(id, pattern)) {
return null;
}
}
for (Map.Entry<String, String> entry : serviceMappings.entrySet()) {
String pattern = entry.getKey();
if (PatternHelper.matchPattern(id, pattern)) {
answer = entry.getValue();
break;
}
}
}
}
if (answer == null) {
String id = exchange.getFromRouteId();
if (id != null) {
// exclude patterns take precedence
for (String pattern : excludePatterns) {
if (PatternHelper.matchPattern(id, pattern)) {
return null;
}
}
for (Map.Entry<String, String> entry : serviceMappings.entrySet()) {
String pattern = entry.getKey();
if (PatternHelper.matchPattern(id, pattern)) {
answer = entry.getValue();
break;
}
}
}
}
if (answer == null && useFallbackServiceNames) {
String key = null;
if (endpoint != null) {
key = endpoint.getEndpointKey();
} else if (exchange.getFromEndpoint() != null) {
key = exchange.getFromEndpoint().getEndpointKey();
}
// exclude patterns take precedence
for (String pattern : excludePatterns) {
if (PatternHelper.matchPattern(key, pattern)) {
return null;
}
}
String sanitizedKey = URISupport.sanitizeUri(key);
if (LOG.isTraceEnabled() && sanitizedKey != null) {
LOG.trace("Using serviceName: {} as fallback", sanitizedKey);
}
return sanitizedKey;
} else {
if (LOG.isTraceEnabled() && answer != null) {
LOG.trace("Using serviceName: {}", answer);
}
return answer;
}
}
private void createTracingForService(String pattern, String serviceName) {
Tracing brave = braves.get(pattern);
if (brave == null && !braves.containsKey(serviceName)) {
brave = newTracing(serviceName);
braves.put(serviceName, brave);
}
}
private Tracing newTracing(String serviceName) {
Tracing brave = null;
if (camelContext.isUseMDCLogging()) {
brave = Tracing.newBuilder().currentTraceContext(ThreadLocalCurrentTraceContext.newBuilder().addScopeDecorator(MDCScopeDecorator.create()).build())
.localServiceName(serviceName).sampler(Sampler.create(rate)).spanReporter(spanReporter).build();
} else {
brave = Tracing.newBuilder().localServiceName(serviceName).sampler(Sampler.create(rate)).spanReporter(spanReporter).build();
}
return brave;
}
private Tracing getTracing(String serviceName) {
Tracing brave = null;
if (serviceName != null) {
brave = braves.get(serviceName);
if (brave == null && useFallbackServiceNames) {
LOG.debug("Creating Tracing assigned to serviceName: {}", serviceName + " as fallback");
brave = newTracing(serviceName);
braves.put(serviceName, brave);
}
}
return brave;
}
private void clientRequest(Tracing brave, String serviceName, ExchangeSendingEvent event) {
// reuse existing span if we do multiple requests from the same
ZipkinState state = event.getExchange().getProperty(ZipkinState.KEY, ZipkinState.class);
if (state == null) {
state = new ZipkinState();
event.getExchange().setProperty(ZipkinState.KEY, state);
}
// if we started from a server span then lets reuse that when we call a
// downstream service
Span last = state.peekServerSpan();
Span span;
if (last != null) {
span = brave.tracer().newChild(last.context());
} else {
span = brave.tracer().nextSpan();
}
span.kind(Span.Kind.CLIENT).start();
ZipkinClientRequestAdapter parser = new ZipkinClientRequestAdapter(this, event.getEndpoint());
INJECTOR.inject(span.context(), event.getExchange().getIn());
parser.onRequest(event.getExchange(), span.customizer());
// store span after request
state.pushClientSpan(span);
TraceContext context = span.context();
String traceId = "" + context.traceIdString();
String spanId = "" + context.spanId();
String parentId = context.parentId() != null ? "" + context.parentId() : null;
if (camelContext.isUseMDCLogging()) {
MDC.put("traceId", traceId);
MDC.put("spanId", spanId);
MDC.put("parentId", parentId);
}
if (LOG.isDebugEnabled()) {
if (parentId != null) {
LOG.debug(String.format("clientRequest [service=%s, traceId=%20s, spanId=%20s, parentId=%20s]", serviceName, traceId, spanId, parentId));
} else {
LOG.debug(String.format("clientRequest [service=%s, traceId=%20s, spanId=%20s]", serviceName, traceId, spanId));
}
}
}
private void clientResponse(Tracing brave, String serviceName, ExchangeSentEvent event) {
Span span = null;
ZipkinState state = event.getExchange().getProperty(ZipkinState.KEY, ZipkinState.class);
if (state != null) {
// only process if it was a zipkin client event
span = state.popClientSpan();
}
if (span != null) {
ZipkinClientResponseAdaptor parser = new ZipkinClientResponseAdaptor(this, event.getEndpoint());
parser.onResponse(event.getExchange(), span.customizer());
span.finish();
TraceContext context = span.context();
String traceId = "" + context.traceIdString();
String spanId = "" + context.spanId();
String parentId = context.parentId() != null ? "" + context.parentId() : null;
if (camelContext.isUseMDCLogging()) {
MDC.put("traceId", traceId);
MDC.put("spanId", spanId);
MDC.put("parentId", parentId);
}
if (LOG.isDebugEnabled()) {
if (parentId != null) {
LOG.debug(String.format("clientResponse[service=%s, traceId=%20s, spanId=%20s, parentId=%20s]", serviceName, traceId, spanId, parentId));
} else {
LOG.debug(String.format("clientResponse[service=%s, traceId=%20s, spanId=%20s]", serviceName, traceId, spanId));
}
}
}
}
private Span serverRequest(Tracing brave, String serviceName, Exchange exchange) {
// reuse existing span if we do multiple requests from the same
ZipkinState state = exchange.getProperty(ZipkinState.KEY, ZipkinState.class);
if (state == null) {
state = new ZipkinState();
exchange.setProperty(ZipkinState.KEY, state);
}
Span span = null;
TraceContextOrSamplingFlags sampleFlag = EXTRACTOR.extract(exchange.getIn());
if (ObjectHelper.isEmpty(sampleFlag)) {
span = brave.tracer().nextSpan();
INJECTOR.inject(span.context(), exchange.getIn());
} else {
span = brave.tracer().nextSpan(sampleFlag);
}
span.kind(Span.Kind.SERVER).start();
ZipkinServerRequestAdapter parser = new ZipkinServerRequestAdapter(this, exchange);
parser.onRequest(exchange, span.customizer());
// store span after request
state.pushServerSpan(span);
TraceContext context = span.context();
String traceId = "" + context.traceIdString();
String spanId = "" + context.spanId();
String parentId = context.parentId() != null ? "" + context.parentId() : null;
if (camelContext.isUseMDCLogging()) {
MDC.put("traceId", traceId);
MDC.put("spanId", spanId);
MDC.put("parentId", parentId);
}
if (LOG.isDebugEnabled()) {
if (parentId != null) {
LOG.debug(String.format("serverRequest [service=%s, traceId=%20s, spanId=%20s, parentId=%20s]", serviceName, traceId, spanId, parentId));
} else {
LOG.debug(String.format("serverRequest [service=%s, traceId=%20s, spanId=%20s]", serviceName, traceId, spanId));
}
}
return span;
}
private void serverResponse(Tracing brave, String serviceName, Exchange exchange) {
Span span = null;
ZipkinState state = exchange.getProperty(ZipkinState.KEY, ZipkinState.class);
if (state != null) {
// only process if it was a zipkin server event
span = state.popServerSpan();
}
if (span != null) {
ZipkinServerResponseAdapter parser = new ZipkinServerResponseAdapter(this, exchange);
parser.onResponse(exchange, span.customizer());
span.finish();
TraceContext context = span.context();
String traceId = "" + context.traceIdString();
String spanId = "" + context.spanId();
String parentId = context.parentId() != null ? "" + context.parentId() : null;
if (camelContext.isUseMDCLogging()) {
MDC.put("traceId", traceId);
MDC.put("spanId", spanId);
MDC.put("parentId", parentId);
}
if (LOG.isDebugEnabled()) {
if (parentId != null) {
LOG.debug(String.format("serverResponse[service=%s, traceId=%20s, spanId=%20s, parentId=%20s]", serviceName, traceId, spanId, parentId));
} else {
LOG.debug(String.format("serverResponse[service=%s, traceId=%20s, spanId=%20s]", serviceName, traceId, spanId));
}
}
}
}
private final class ZipkinEventNotifier extends EventNotifierSupport {
@Override
public void notify(CamelEvent event) throws Exception {
// use event notifier to track events when Camel messages to
// endpoints
// these events corresponds to Zipkin client events
// client events
if (event instanceof ExchangeSendingEvent) {
ExchangeSendingEvent ese = (ExchangeSendingEvent)event;
String serviceName = getServiceName(ese.getExchange(), ese.getEndpoint(), false, true);
Tracing brave = getTracing(serviceName);
if (brave != null) {
clientRequest(brave, serviceName, ese);
}
} else if (event instanceof ExchangeSentEvent) {
ExchangeSentEvent ese = (ExchangeSentEvent)event;
String serviceName = getServiceName(ese.getExchange(), ese.getEndpoint(), false, true);
Tracing brave = getTracing(serviceName);
if (brave != null) {
clientResponse(brave, serviceName, ese);
}
}
}
@Override
public boolean isEnabled(CamelEvent event) {
switch (event.getType()) {
case ExchangeSending:
case ExchangeSent:
case ExchangeCreated:
case ExchangeCompleted:
case ExchangeFailed:
return true;
default:
return false;
}
}
@Override
public String toString() {
return "ZipkinEventNotifier";
}
}
private final class ZipkinRoutePolicy extends RoutePolicySupport {
@Override
public void onExchangeBegin(Route route, Exchange exchange) {
// use route policy to track events when Camel a Camel route
// begins/end the lifecycle of an Exchange
// these events corresponds to Zipkin server events
String serviceName = getServiceName(exchange, route.getEndpoint(), true, false);
Tracing brave = getTracing(serviceName);
if (brave != null) {
serverRequest(brave, serviceName, exchange);
}
}
// Report Server send after route has completed processing of the
// exchange.
@Override
public void onExchangeDone(Route route, Exchange exchange) {
String serviceName = getServiceName(exchange, route.getEndpoint(), true, false);
Tracing brave = getTracing(serviceName);
if (brave != null) {
serverResponse(brave, serviceName, exchange);
}
}
}
private static Expression routeIdExpression() {
return new Expression() {
@Override
public <T> T evaluate(Exchange exchange, Class<T> type) {
String answer = null;
UnitOfWork uow = exchange.getUnitOfWork();
RouteContext rc = uow != null ? uow.getRouteContext() : null;
if (rc != null) {
answer = rc.getRouteId();
}
if (answer == null) {
// fallback and get from route id on the exchange
answer = exchange.getFromRouteId();
}
return type.cast(answer);
}
};
}
}