blob: 0fc79200b95d0b5cadc1ed89176cfb90e43e245f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.htrace.impl;
import com.twitter.zipkin.gen.LogEntry;
import com.twitter.zipkin.gen.Scribe;
import org.apache.commons.codec.binary.Base64;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.htrace.Transport;
import org.apache.htrace.core.HTraceConfiguration;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
/**
 * {@link Transport} implementation that ships serialized spans to a collector
 * through the Scribe Thrift RPC interface.
 *
 * <p>The collector endpoint is read from the configuration passed to
 * {@link #open(HTraceConfiguration)}: {@link #HOSTNAME_KEY} and {@link #PORT_KEY}
 * are consulted first, then the deprecated keys, then the built-in defaults.
 *
 * <p>This class performs no synchronization of its own.
 */
public class ScribeTransport implements Transport {

  /**
   * Scribe category attached to every {@link LogEntry} sent by this transport;
   * it tells scribe that the entries are for zipkin.
   */
  public static final String CATEGORY = "zipkin";

  private static final Log LOG = LogFactory.getLog(ScribeTransport.class);

  /**
   * Default hostname to fall back on.
   */
  private static final String DEFAULT_COLLECTOR_HOSTNAME = "localhost";
  public static final String DEPRECATED_HOSTNAME_KEY = "zipkin.collector-hostname";
  public static final String HOSTNAME_KEY = "zipkin.scribe.hostname";

  /**
   * Default collector port.
   */
  private static final int DEFAULT_COLLECTOR_PORT = 9410; // trace collector default port.
  public static final String DEPRECATED_PORT_KEY = "zipkin.collector-port";
  public static final String PORT_KEY = "zipkin.scribe.port";

  /** Active Scribe client, or {@code null} while the transport is closed. */
  private Scribe.Iface scribe = null;

  /**
   * Connects to the configured collector unless this transport is already open.
   *
   * @param conf configuration holding the collector hostname and port
   * @throws IOException if the connection to the collector cannot be established
   */
  @Override
  public void open(HTraceConfiguration conf) throws IOException {
    if (isOpen()) {
      LOG.warn("Attempted to open an already opened transport");
      return;
    }
    checkDeprecation(conf, DEPRECATED_HOSTNAME_KEY, HOSTNAME_KEY);
    checkDeprecation(conf, DEPRECATED_PORT_KEY, PORT_KEY);

    // Resolution order: current key, then deprecated key, then built-in default.
    String fallbackHostname = conf.get(DEPRECATED_HOSTNAME_KEY, DEFAULT_COLLECTOR_HOSTNAME);
    String collectorHostname = conf.get(HOSTNAME_KEY, fallbackHostname);
    int fallbackPort = conf.getInt(DEPRECATED_PORT_KEY, DEFAULT_COLLECTOR_PORT);
    int collectorPort = conf.getInt(PORT_KEY, fallbackPort);

    scribe = newScribe(collectorHostname, collectorPort);
    LOG.info("Opened transport " + collectorHostname + ":" + collectorPort);
  }

  /**
   * Logs a warning when a deprecated configuration key is still set.
   *
   * @param conf          configuration to inspect
   * @param deprecatedKey key that should no longer be used
   * @param newKey        key that replaces {@code deprecatedKey}
   */
  private void checkDeprecation(HTraceConfiguration conf, String deprecatedKey,
                                String newKey) {
    if (conf.get(deprecatedKey) != null) {
      LOG.warn("Configuration \"" + deprecatedKey + "\" is deprecated. Use \"" +
          newKey + "\" instead.");
    }
  }

  /**
   * @return {@code true} if a client exists and its underlying Thrift transport
   *         reports itself as open
   */
  @Override
  public boolean isOpen() {
    return scribe != null && clientTransport().isOpen();
  }

  /**
   * The Scribe client which is used for rpc writes a list of
   * LogEntry objects, so the span objects are first transformed into LogEntry objects before
   * sending to the zipkin-collector.
   *
   * Here is a little ascii art which shows the above transformation:
   * <pre>
   *  +------------+   +------------+   +------------+              +-----------------+
   *  | HTrace Span|-->|Zipkin Span |-->| (LogEntry) | ===========> | Zipkin Collector|
   *  +------------+   +------------+   +------------+ (Scribe RPC) +-----------------+
   *  </pre>
   * @param spans to be sent. The raw bytes are being sent, Base64-encoded.
   * @throws IOException if the transport has not been opened (or was closed),
   *                     or if the Scribe RPC fails
   */
  @Override
  public void send(List<byte[]> spans) throws IOException {
    Scribe.Iface client = scribe;
    if (client == null) {
      // Surface misuse as the declared exception type rather than a bare NPE.
      throw new IOException("Cannot send spans: transport is not open");
    }

    List<LogEntry> entries = new ArrayList<LogEntry>(spans.size());
    for (byte[] span : spans) {
      entries.add(new LogEntry(CATEGORY, Base64.encodeBase64String(span)));
    }

    try {
      if (LOG.isTraceEnabled()) {
        LOG.trace("sending " + entries.size() + " entries");
      }
      client.Log(entries); // TODO (clehene) should we instead interpret the return?
    } catch (TException e) {
      throw new IOException("Failed to send " + entries.size() + " entries to scribe", e);
    }
  }

  /**
   * Closes the underlying Thrift transport and forgets the client.
   * Calling this on an already closed transport only logs a warning.
   *
   * @throws IOException declared by the {@link Transport} contract
   */
  @Override
  public void close() throws IOException {
    if (scribe == null) {
      LOG.warn("Attempted to close an already closed transport");
      return;
    }
    try {
      clientTransport().close();
    } finally {
      // Drop the reference even if close() fails so the client is never reused.
      scribe = null;
    }
    LOG.info("Closed transport");
  }

  /**
   * Returns the Thrift transport behind the current client.
   * Must only be called while {@code scribe} is non-null; the client is always
   * the {@link Scribe.Client} created by {@link #newScribe(String, int)}.
   */
  private TTransport clientTransport() {
    return ((Scribe.Client) scribe).getInputProtocol().getTransport();
  }

  /**
   * Creates a Scribe client over a framed, binary-protocol socket connection.
   *
   * @param collectorHostname host of the collector
   * @param collectorPort     port of the collector
   * @return a client bound to an opened transport
   * @throws IOException if the transport cannot be opened
   */
  private Scribe.Iface newScribe(String collectorHostname,
                                 int collectorPort)
      throws IOException {
    TTransport transport = new TFramedTransport(
        new TSocket(collectorHostname, collectorPort));
    try {
      transport.open();
    } catch (TTransportException e) {
      // Best-effort release of anything the failed open may have left behind.
      transport.close();
      throw new IOException("Failed to open transport to " + collectorHostname
          + ":" + collectorPort, e);
    }
    TBinaryProtocol.Factory factory = new TBinaryProtocol.Factory();
    TProtocol protocol = factory.getProtocol(transport);
    return new Scribe.Client(protocol);
  }
}