| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.htrace.impl; |
| |
| import com.twitter.zipkin.gen.LogEntry; |
| import com.twitter.zipkin.gen.Scribe; |
| |
| import org.apache.commons.codec.binary.Base64; |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.htrace.core.HTraceConfiguration; |
| import org.apache.htrace.core.Span; |
| import org.apache.htrace.core.SpanReceiver; |
| import org.apache.htrace.core.TracerId; |
| import org.apache.htrace.zipkin.HTraceToZipkinConverter; |
| import org.apache.thrift.protocol.TBinaryProtocol; |
| import org.apache.thrift.protocol.TProtocol; |
| import org.apache.thrift.protocol.TProtocolFactory; |
| import org.apache.thrift.transport.TFramedTransport; |
| import org.apache.thrift.transport.TIOStreamTransport; |
| import org.apache.thrift.transport.TSocket; |
| import org.apache.thrift.transport.TTransport; |
| import org.apache.thrift.transport.TTransportException; |
| |
| import java.io.ByteArrayOutputStream; |
| import java.io.IOException; |
| import java.net.InetAddress; |
| import java.net.UnknownHostException; |
| import java.nio.ByteBuffer; |
| import java.util.ArrayList; |
| import java.util.List; |
| import java.util.concurrent.ArrayBlockingQueue; |
| import java.util.concurrent.atomic.AtomicLong; |
| import java.util.concurrent.BlockingQueue; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.ThreadFactory; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| |
| /** |
| * Zipkin is an open source tracing library. This span receiver acts as a bridge between HTrace and |
| * Zipkin, that converts HTrace Span objects into Zipkin Span objects. |
| * <p/> |
| * HTrace spans are queued into a blocking queue. From there background worker threads will |
| * batch the spans together and then send them through to a Zipkin collector. |
| */ |
| public class ZipkinSpanReceiver implements SpanReceiver { |
| private static final Log LOG = LogFactory.getLog(ZipkinSpanReceiver.class); |
| |
| /** |
| * Default hostname to fall back on. |
| */ |
| private static final String DEFAULT_COLLECTOR_HOSTNAME = "localhost"; |
| public static final String HOSTNAME_KEY = "zipkin.collector-hostname"; |
| |
| /** |
| * Default collector port. |
| */ |
| private static final int DEFAULT_COLLECTOR_PORT = 9410; // trace collector default port. |
| public static final String PORT_KEY = "zipkin.collector-port"; |
| |
| /** |
| * Default number of threads to use. |
| */ |
| private static final int DEFAULT_NUM_THREAD = 1; |
| public static final String NUM_THREAD_KEY = "zipkin.num-threads"; |
| |
| /** |
| * this is used to tell scribe that the entries are for zipkin.. |
| */ |
| private static final String CATEGORY = "zipkin"; |
| |
| /** |
| * Whether the service which is traced is in client or a server mode. It is used while creating |
| * the Endpoint. |
| */ |
| private static final boolean DEFAULT_IN_CLIENT_MODE = false; |
| |
| /** |
| * How long this receiver will try and wait for all threads to shutdown. |
| */ |
| private static final int SHUTDOWN_TIMEOUT = 30; |
| |
| /** |
| * How many spans this receiver will try and send in one batch. |
| */ |
| private static final int MAX_SPAN_BATCH_SIZE = 100; |
| |
| /** |
| * How many errors in a row before we start dropping traces on the floor. |
| */ |
| private static final int MAX_ERRORS = 10; |
| |
| /** |
| * The queue that will get all HTrace spans that are to be sent. |
| */ |
| private final BlockingQueue<Span> queue; |
| |
| /** |
| * Factory used to encode a Zipkin Span to bytes. |
| */ |
| private final TProtocolFactory protocolFactory; |
| |
| /** |
| * Boolean used to signal that the threads should end. |
| */ |
| private final AtomicBoolean running = new AtomicBoolean(true); |
| |
| /** |
| * The thread factory used to create new ExecutorService. |
| * <p/> |
| * This will be the same factory for the lifetime of this object so that |
| * no thread names will ever be duplicated. |
| */ |
| private final ThreadFactory tf = new ThreadFactory() { |
| private final AtomicLong receiverIdx = new AtomicLong(0); |
| |
| @Override |
| public Thread newThread(Runnable r) { |
| Thread t = new Thread(r); |
| t.setDaemon(true); |
| t.setName(String.format("zipkinSpanReceiver-%d", |
| receiverIdx.getAndIncrement())); |
| return t; |
| } |
| }; |
| |
| private final TracerId tracerId; |
| |
| //////////////////// |
| /// Variables that will change on each call to configure() |
| /////////////////// |
| private HTraceToZipkinConverter converter; |
| private ExecutorService service; |
| private HTraceConfiguration conf; |
| private String collectorHostname; |
| private int collectorPort; |
| |
| public ZipkinSpanReceiver(HTraceConfiguration conf) { |
| this.queue = new ArrayBlockingQueue<Span>(1000); |
| this.protocolFactory = new TBinaryProtocol.Factory(); |
| this.tracerId = new TracerId(conf); |
| configure(conf); |
| } |
| |
| private void configure(HTraceConfiguration conf) { |
| this.conf = conf; |
| |
| this.collectorHostname = conf.get(HOSTNAME_KEY, DEFAULT_COLLECTOR_HOSTNAME); |
| this.collectorPort = conf.getInt(PORT_KEY, DEFAULT_COLLECTOR_PORT); |
| |
| // initialize the endpoint. This endpoint is used while writing the Span. |
| initConverter(); |
| |
| int numThreads = conf.getInt(NUM_THREAD_KEY, DEFAULT_NUM_THREAD); |
| |
| // If there are already threads runnnig tear them down. |
| if (this.service != null) { |
| this.service.shutdownNow(); |
| this.service = null; |
| } |
| |
| this.service = Executors.newFixedThreadPool(numThreads, tf); |
| |
| for (int i = 0; i < numThreads; i++) { |
| this.service.submit(new WriteSpanRunnable()); |
| } |
| } |
| |
| /** |
| * Set up the HTrace to Zipkin converter. |
| */ |
| private void initConverter() { |
| InetAddress tracedServiceHostname = null; |
| // Try and get the hostname. If it's not configured try and get the local hostname. |
| try { |
| String host = conf.get("zipkin.traced-service-hostname", |
| InetAddress.getLocalHost().getHostAddress()); |
| |
| tracedServiceHostname = InetAddress.getByName(host); |
| } catch (UnknownHostException e) { |
| LOG.error("Couldn't get the localHost address", e); |
| } |
| short tracedServicePort = (short) conf.getInt("zipkin.traced-service-port", -1); |
| byte[] address = tracedServiceHostname != null |
| ? tracedServiceHostname.getAddress() : DEFAULT_COLLECTOR_HOSTNAME.getBytes(); |
| int ipv4 = ByteBuffer.wrap(address).getInt(); |
| this.converter = new HTraceToZipkinConverter(ipv4, tracedServicePort); |
| } |
| |
| |
| private class WriteSpanRunnable implements Runnable { |
| /** |
| * scribe client to push zipkin spans |
| */ |
| private Scribe.Iface scribe = null; |
| private final ByteArrayOutputStream baos; |
| private final TProtocol streamProtocol; |
| |
| public WriteSpanRunnable() { |
| baos = new ByteArrayOutputStream(); |
| streamProtocol = protocolFactory.getProtocol(new TIOStreamTransport(baos)); |
| } |
| |
| /** |
| * This runnable converts a HTrace span to a Zipkin span and sends it across the zipkin |
| * collector as a thrift object. The scribe client which is used for rpc writes a list of |
| * LogEntry objects, so the span objects are first transformed into LogEntry objects before |
| * sending to the zipkin-collector. |
| * <p/> |
| * Here is a little ascii art which shows the above transformation: |
| * <pre> |
| * +------------+ +------------+ +------------+ +-----------------+ |
| * | HTrace Span|-->|Zipkin Span |-->| (LogEntry) | ===========> | Zipkin Collector| |
| * +------------+ +------------+ +------------+ (Scribe rpc) +-----------------+ |
| * </pre> |
| */ |
| @Override |
| public void run() { |
| |
| List<Span> dequeuedSpans = new ArrayList<Span>(MAX_SPAN_BATCH_SIZE); |
| |
| long errorCount = 0; |
| |
| while (running.get() || queue.size() > 0) { |
| Span firstSpan = null; |
| try { |
| // Block for up to a second. to try and get a span. |
| // We only block for a little bit in order to notice if the running value has changed |
| firstSpan = queue.poll(1, TimeUnit.SECONDS); |
| |
| // If the poll was successful then it's possible that there |
| // will be other spans to get. Try and get them. |
| if (firstSpan != null) { |
| // Add the first one that we got |
| dequeuedSpans.add(firstSpan); |
| // Try and get up to 100 queues |
| queue.drainTo(dequeuedSpans, MAX_SPAN_BATCH_SIZE - 1); |
| } |
| |
| } catch (InterruptedException ie) { |
| // Ignored. |
| } |
| |
| if (dequeuedSpans.isEmpty()) continue; |
| |
| // If this is the first time through or there was an error re-connect |
| if (scribe == null) { |
| startClient(); |
| } |
| // Create a new list every time through so that the list doesn't change underneath |
| // thrift as it's sending. |
| List<LogEntry> entries = new ArrayList<LogEntry>(dequeuedSpans.size()); |
| try { |
| // Convert every de-queued span |
| for (Span htraceSpan : dequeuedSpans) { |
| // convert the HTrace span to Zipkin span |
| com.twitter.zipkin.gen.Span zipkinSpan = converter.convert(htraceSpan); |
| // Clear any old data. |
| baos.reset(); |
| // Write the span to a BAOS |
| zipkinSpan.write(streamProtocol); |
| |
| // Do Base64 encoding and put the string into a log entry. |
| LogEntry logEntry = |
| new LogEntry(CATEGORY, Base64.encodeBase64String(baos.toByteArray())); |
| entries.add(logEntry); |
| } |
| |
| // Send the entries |
| scribe.Log(entries); |
| // clear the list for the next time through. |
| dequeuedSpans.clear(); |
| // reset the error counter. |
| errorCount = 0; |
| } catch (Exception e) { |
| LOG.error("Error when writing to the zipkin collector: " + |
| collectorHostname + ":" + collectorPort, e); |
| |
| errorCount += 1; |
| // If there have been ten errors in a row start dropping things. |
| if (errorCount < MAX_ERRORS) { |
| try { |
| queue.addAll(dequeuedSpans); |
| } catch (IllegalStateException ex) { |
| LOG.error("Drop " + dequeuedSpans.size() + " span(s) because queue is full"); |
| } |
| } |
| |
| closeClient(); |
| try { |
| // Since there was an error sleep just a little bit to try and allow the |
| // zipkin collector some time to recover. |
| Thread.sleep(500); |
| } catch (InterruptedException e1) { |
| // Ignored |
| } |
| } |
| } |
| closeClient(); |
| } |
| |
| /** |
| * Close out the connection. |
| */ |
| private void closeClient() { |
| // close out the transport. |
| if (scribe != null && scribe instanceof Scribe.Client) { |
| ((Scribe.Client) scribe).getInputProtocol().getTransport().close(); |
| scribe = null; |
| } |
| } |
| |
| /** |
| * Re-connect to Zipkin. |
| */ |
| private void startClient() { |
| if (this.scribe == null) { |
| this.scribe = newScribe(); |
| } |
| } |
| } |
| |
| // Override for testing |
| Scribe.Iface newScribe() { |
| TTransport transport = new TFramedTransport(new TSocket(collectorHostname, collectorPort)); |
| try { |
| transport.open(); |
| } catch (TTransportException e) { |
| e.printStackTrace(); |
| } |
| TProtocol protocol = protocolFactory.getProtocol(transport); |
| return new Scribe.Client(protocol); |
| } |
| |
| /** |
| * Close the receiver. |
| * <p/> |
| * This tries to shut |
| * |
| * @throws IOException |
| */ |
| @Override |
| public void close() throws IOException { |
| running.set(false); |
| service.shutdown(); |
| try { |
| if (!service.awaitTermination(SHUTDOWN_TIMEOUT, TimeUnit.SECONDS)) { |
| LOG.error("Was not able to process all remaining spans to write upon closing in: " + |
| SHUTDOWN_TIMEOUT + " " + TimeUnit.SECONDS + ". There could be un-sent spans still left." + |
| " They have been dropped."); |
| } |
| } catch (InterruptedException e1) { |
| LOG.warn("Thread interrupted when terminating executor.", e1); |
| } |
| } |
| |
| @Override |
| public void receiveSpan(Span span) { |
| if (running.get()) { |
| try { |
| if (span.getTracerId().isEmpty()) { |
| span.setTracerId(tracerId.get()); |
| } |
| this.queue.add(span); |
| } catch (IllegalStateException e) { |
| LOG.error("Error trying to append span (" + span.getDescription() + ") to the queue." |
| + " Blocking Queue was full."); |
| } |
| } |
| } |
| } |