blob: e92dd6a450afdb2b1fc569fe1b0ec2c07f331640 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.phoenix.trace;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.metrics2.*;
import org.apache.hadoop.metrics2.lib.Interns;
import org.apache.phoenix.metrics.MetricInfo;
import org.apache.phoenix.metrics.Metrics;
import org.apache.htrace.HTraceConfiguration;
import org.apache.htrace.Span;
import org.apache.htrace.SpanReceiver;
import org.apache.htrace.TimelineAnnotation;
import org.apache.htrace.impl.MilliSpan;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import static org.apache.phoenix.metrics.MetricInfo.*;
/**
* Sink for request traces ({@link SpanReceiver}) that pushes writes to {@link MetricsSource} in a
* format that we can more easily consume.
* <p>
* <p>
* Rather than write directly to a phoenix table, we drop it into the metrics queue so we can more
* cleanly handle it asyncrhonously.Currently, {@link MilliSpan} submits the span in a synchronized
* block to all the receivers, which could have a lot of overhead if we are submitting to multiple
* receivers.
* <p>
* The format of the generated metrics is this:
* <ol>
* <li>All Metrics from the same span have the same name (allowing correlation in the sink)</li>
* <li>The description of the metric describes what it contains. For instance,
* <ul>
* <li>{@link MetricInfo#PARENT} is the id of the parent of this span. (Root span is
* {@link Span#ROOT_SPAN_ID}).</li>
* <li>{@value MetricInfo#START} is the start time of the span</li>
* <li>{@value MetricInfo#END} is the end time of the span</li>
* </ul></li>
* <li>Each span's messages are contained in a {@link MetricsTag} with the same name as above and a
* generic counter for the number of messages (to differentiate messages and provide timeline
* ordering).</li>
* </ol>
* <p>
* <i>So why even submit to metrics2 framework if we only have a single source?</i>
* <p>
* This allows us to make the updates in batches. We might have spans that finish before other spans
* (for instance in the same parent). By batching the updates we can lessen the overhead on the
* client, which is also busy doing 'real' work. <br>
* We could make our own queue and manage batching and filtering and dropping extra metrics, but
* that starts to get complicated fast (its not as easy as it sounds) so we use metrics2 to abstract
* out that pipeline and also provides us flexibility to dump metrics to other sources.
* <p>
* This is a somewhat rough implementation - we do excessive locking for correctness,
* rather than trying to make it fast, for the moment.
*/
public class TraceMetricSource implements SpanReceiver, MetricsSource {
private static final String EMPTY_STRING = "";
private static final String CONTEXT = "tracing";
private List<Metric> spans = new ArrayList<Metric>();
public TraceMetricSource() {
MetricsSystem manager = Metrics.initialize();
// Register this instance.
// For right now, we ignore the MBean registration issues that show up in DEBUG logs. Basically,
// we need a Jmx MBean compliant name. We'll get to a better name when we want that later
manager.register(CONTEXT, "Phoenix call tracing", this);
}
@Override
public void receiveSpan(Span span) {
Metric builder = new Metric(span);
// add all the metrics for the span
builder.addCounter(Interns.info(SPAN.traceName, EMPTY_STRING), span.getSpanId());
builder.addCounter(Interns.info(PARENT.traceName, EMPTY_STRING), span.getParentId());
builder.addCounter(Interns.info(START.traceName, EMPTY_STRING), span.getStartTimeMillis());
builder.addCounter(Interns.info(END.traceName, EMPTY_STRING), span.getStopTimeMillis());
// add the tags to the span. They were written in order received so we mark them as such
for (TimelineAnnotation ta : span.getTimelineAnnotations()) {
builder.add(new MetricsTag(Interns.info(TAG.traceName, Long.toString(ta.getTime())), ta
.getMessage()));
}
// add the annotations. We assume they are serialized as strings and integers, but that can
// change in the future
Map<byte[], byte[]> annotations = span.getKVAnnotations();
for (Entry<byte[], byte[]> annotation : annotations.entrySet()) {
Pair<String, String> val =
TracingUtils.readAnnotation(annotation.getKey(), annotation.getValue());
builder.add(new MetricsTag(Interns.info(ANNOTATION.traceName, val.getFirst()), val
.getSecond()));
}
// add the span to the list we care about
synchronized (this) {
spans.add(builder);
}
}
@Override
public void getMetrics(MetricsCollector collector, boolean all) {
// add a marker record so we know how many spans are used
// this is also necessary to ensure that we register the metrics source as an MBean (avoiding a
// runtime warning)
MetricsRecordBuilder marker = collector.addRecord(TracingUtils.METRICS_MARKER_CONTEXT);
marker.add(new MetricsTag(new MetricsInfoImpl("stat", "num spans"), Integer
.toString(spans.size())));
// actually convert the known spans into metric records as well
synchronized (this) {
for (Metric span : spans) {
MetricsRecordBuilder builder = collector.addRecord(new MetricsInfoImpl(TracingUtils
.getTraceMetricName(span.id), span.desc));
builder.setContext(TracingUtils.METRICS_CONTEXT);
for (Pair<MetricsInfo, Long> metric : span.counters) {
builder.addCounter(metric.getFirst(), metric.getSecond());
}
for (MetricsTag tag : span.tags) {
builder.add(tag);
}
}
// reset the spans so we don't keep a big chunk of memory around
spans = new ArrayList<Metric>();
}
}
@Override
public void close() throws IOException {
// noop
}
private static class Metric {
List<Pair<MetricsInfo, Long>> counters = new ArrayList<Pair<MetricsInfo, Long>>();
List<MetricsTag> tags = new ArrayList<MetricsTag>();
private String id;
private String desc;
public Metric(Span span) {
this.id = Long.toString(span.getTraceId());
this.desc = span.getDescription();
}
/**
* @param metricsInfoImpl
* @param startTimeMillis
*/
public void addCounter(MetricsInfo metricsInfoImpl, long startTimeMillis) {
counters.add(new Pair<MetricsInfo, Long>(metricsInfoImpl, startTimeMillis));
}
/**
* @param metricsTag
*/
public void add(MetricsTag metricsTag) {
tags.add(metricsTag);
}
}
}