blob: b1317ddda11d9394798125bc743f575a29a67304 [file] [log] [blame]
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.accumulo.core.trace;
import java.io.IOException;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.ClientProperty;
import org.apache.accumulo.core.conf.ConfigurationTypeHelper;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.trace.thrift.TInfo;
import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.htrace.HTraceConfiguration;
import org.apache.htrace.NullScope;
import org.apache.htrace.Span;
import org.apache.htrace.SpanReceiver;
import org.apache.htrace.SpanReceiverBuilder;
import org.apache.htrace.Trace;
import org.apache.htrace.TraceInfo;
import org.apache.htrace.TraceScope;
import org.apache.htrace.impl.CountSampler;
import org.apache.htrace.impl.ProbabilitySampler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
* Utility class for tracing within Accumulo. Not intended for client use!
public class TraceUtil {
private static final Logger log = LoggerFactory.getLogger(TraceUtil.class);
public static final String TRACE_HOST_PROPERTY = "";
public static final String TRACE_SERVICE_PROPERTY = "trace.service";
public static final String TRACER_ZK_HOST = "";
public static final String TRACER_ZK_TIMEOUT = "tracer.zookeeper.timeout";
public static final String TRACER_ZK_PATH = "tracer.zookeeper.path";
private static final HashSet<SpanReceiver> receivers = new HashSet<>();
/**
 * Enable tracing by setting up SpanReceivers for the current process. If host name is null, it
 * will be determined. If service name is null, the simple name of the class will be used.
 * Properties required in the client configuration include
 * {@link org.apache.accumulo.core.conf.ClientProperty#TRACE_SPAN_RECEIVERS} and any properties
 * specific to the span receiver.
 */
// Client-side entry point: forwards hostname, service and values looked up in the client
// Properties to enableTracing.
public static void enableClientTraces(String hostname, String service, Properties properties) {
// @formatter:off
// NOTE(review): enableTracing declares seven parameters, but only the first two arguments and
// what looks like the last one are present here, and the trailing ")))" has more closers than
// openers. The argument lines in between appear to be missing from this copy of the file, as does
// the method's closing brace -- restore them from version control rather than guessing.
enableTracing(hostname, service,
ClientProperty.getPrefix(properties, ClientProperty.TRACE_SPAN_RECEIVER_PREFIX)));
// @formatter:on
/**
 * Enable tracing by setting up SpanReceivers for the current process. If host name is null, it
 * will be determined. If service name is null, the simple name of the class will be used.
 */
// Server-side entry point: forwards hostname and service to enableTracing; the remaining
// arguments are presumably derived from conf -- TODO confirm against version control.
public static void enableServerTraces(String hostname, String service,
AccumuloConfiguration conf) {
// @formatter:off
// NOTE(review): the call below stops after its second argument. enableTracing declares seven
// parameters, so five argument lines, the closing ");" and the method's closing brace appear to
// be missing from this copy of the file.
enableTracing(hostname, service,
// @formatter:on
private static void enableTracing(String hostname, String service, String spanReceivers,
String zookeepers, long timeout, String zkPath, Map<String,String> spanReceiverProps) {
String> htraceConfigProps = spanReceiverProps.entrySet().stream().collect(Collectors.toMap(
k -> String.valueOf(k).substring(Property.TRACE_SPAN_RECEIVER_PREFIX.getKey().length()),
String::valueOf, (a, b) -> {
throw new AssertionError("duplicate can't happen");
}, HashMap::new));
htraceConfigProps.put(TRACER_ZK_HOST, zookeepers);
htraceConfigProps.put(TRACER_ZK_TIMEOUT, Long.toString(timeout));
htraceConfigProps.put(TRACER_ZK_PATH, zkPath);
if (hostname != null) {
htraceConfigProps.put(TRACE_HOST_PROPERTY, hostname);
if (service != null) {
htraceConfigProps.put(TRACE_SERVICE_PROPERTY, service);
ShutdownHookManager.get().addShutdownHook(TraceUtil::disable, 0);
synchronized (receivers) {
if (!receivers.isEmpty()) {"Already loaded span receivers, enable tracing does not need to be called again");
if (spanReceivers == null) {
Stream.of(spanReceivers.split(",")).map(String::trim).filter(s -> !s.isEmpty())
.forEach(className -> {
HTraceConfiguration htraceConf = HTraceConfiguration.fromMap(htraceConfigProps);
SpanReceiverBuilder builder = new SpanReceiverBuilder(htraceConf);
SpanReceiver rcvr = builder.spanReceiverClass(className.trim()).build();
if (rcvr == null) {
log.warn("Failed to load SpanReceiver {}", className);
} else {
log.debug("SpanReceiver {} was loaded successfully.", className);
for (SpanReceiver rcvr : receivers) {
* Disable tracing by closing SpanReceivers for the current process.
public static void disable() {
synchronized (receivers) {
receivers.forEach(rcvr -> {
try {
} catch (IOException e) {
log.warn("Unable to close SpanReceiver correctly: {}", e.getMessage(), e);
* Continue a trace by starting a new span with a given parent and description.
public static TraceScope trace(TInfo info, String description) {
return info.traceId == 0 ? NullScope.INSTANCE
: Trace.startSpan(description, new TraceInfo(info.traceId, info.parentId));
private static final TInfo DONT_TRACE = new TInfo(0, 0);
* Obtain {@link org.apache.accumulo.core.trace.thrift.TInfo} for the current span.
public static TInfo traceInfo() {
Span span = Trace.currentSpan();
if (span != null) {
return new TInfo(span.getTraceId(), span.getSpanId());
return DONT_TRACE;
public static CountSampler countSampler(long frequency) {
return new CountSampler(HTraceConfiguration.fromMap(Collections
.singletonMap(CountSampler.SAMPLER_FREQUENCY_CONF_KEY, Long.toString(frequency))));
public static ProbabilitySampler probabilitySampler(double fraction) {
return new ProbabilitySampler(HTraceConfiguration.fromMap(Collections
.singletonMap(ProbabilitySampler.SAMPLER_FRACTION_CONF_KEY, Double.toString(fraction))));
* To move trace data from client to server, the RPC call must be annotated to take a TInfo object
* as its first argument. The user can simply pass null, so long as they wrap their Client and
* Service objects with these functions.
* <pre>
* Trace.on(&quot;remoteMethod&quot;);
* Iface c = new Client();
* c = TraceWrap.client(c);
* c.remoteMethod(null, arg2, arg3);
* </pre>
* The wrapper will see the annotated method and send or re-establish the trace information.
* Note that the result of these calls is a Proxy object that conforms to the basic interfaces,
* but is not your concrete instance.
public static <T> T wrapClient(final T instance) {
InvocationHandler handler = (obj, method, args) -> {
if (args == null || args.length < 1 || args[0] != null) {
return method.invoke(instance, args);
if (TInfo.class.isAssignableFrom(method.getParameterTypes()[0])) {
args[0] = traceInfo();
try (TraceScope span = Trace.startSpan("client:" + method.getName())) {
return method.invoke(instance, args);
} catch (InvocationTargetException ex) {
throw ex.getCause();
return wrapRpc(handler, instance);
public static <T> T wrapService(final T instance) {
InvocationHandler handler = (obj, method, args) -> {
try {
if (args == null || args.length < 1 || args[0] == null || !(args[0] instanceof TInfo)) {
return method.invoke(instance, args);
try (TraceScope span = trace((TInfo) args[0], method.getName())) {
return method.invoke(instance, args);
} catch (InvocationTargetException ex) {
throw ex.getCause();
return wrapRpc(handler, instance);
private static <T> T wrapRpc(final InvocationHandler handler, final T instance) {
T proxiedInstance = (T) Proxy.newProxyInstance(instance.getClass().getClassLoader(),
instance.getClass().getInterfaces(), handler);
return proxiedInstance;