HTRACE-235. htrace-zipkin - add Kafka transport support (Cosmin Lehene via Colin P. McCabe)
diff --git a/htrace-zipkin/README.md b/htrace-zipkin/README.md
new file mode 100644
index 0000000..6fbee95
--- /dev/null
+++ b/htrace-zipkin/README.md
@@ -0,0 +1,50 @@
+HTrace Zipkin Receiver
+======================
+
+Use the HTrace Java library with [Zipkin](https://github.com/openzipkin/zipkin).
+
+To use, set `"span.receiver.classes", "org.apache.htrace.impl.ZipkinSpanReceiver"`
+in the HTraceConfiguration.
+
+
+Transports
+----------
+
+The Zipkin receiver supports both the Zipkin Scribe (default) and Kafka transports,
+controlled through the `zipkin.transport.class` configuration.
+
+Scribe (Thrift) Transport
+-------------------------
+
+### Configuration
+
+Configuration keys are prefixed with `zipkin.scribe`. Each key below is listed with its default value.
+
+* `zipkin.scribe.hostname`, `localhost`
+* `zipkin.scribe.port`, `9410`
+
+Deprecated (backwards compatibility):
+
+* `zipkin.collector-hostname`, `localhost`
+* `zipkin.collector-port`, `9410`
+
+Kafka Transport
+---------------
+
+To use the Kafka transport, add
+`"zipkin.transport.class", "org.apache.htrace.impl.KafkaTransport"`
+to the configuration.
+
+### Configuration
+
+Configuration keys are prefixed with `zipkin.kafka`. Each key below is listed with its default value.
+
+* `zipkin.kafka.topic`, `zipkin`
+
+Producer-specific configurations:
+
+* `zipkin.kafka.metadata.broker.list`, `localhost:9092`
+* `zipkin.kafka.request.required.acks`, `0`
+* `zipkin.kafka.producer.type`, `async`
+* `zipkin.kafka.serializer.class`, `kafka.serializer.DefaultEncoder`
+* `zipkin.kafka.compression.codec`, `1`
diff --git a/htrace-zipkin/pom.xml b/htrace-zipkin/pom.xml
index 8e2c8a1..77bba02 100644
--- a/htrace-zipkin/pom.xml
+++ b/htrace-zipkin/pom.xml
@@ -28,6 +28,8 @@
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<slf4j.version>1.5.8</slf4j.version>
+ <kafka.version>0.8.2.1</kafka.version>
+ <scala.version>2.11</scala.version>
</properties>
<build>
@@ -58,30 +60,52 @@
<phase>package</phase>
<configuration>
<relocations>
+
<relocation>
<pattern>org.apache.commons.logging</pattern>
<shadedPattern>org.apache.htrace.commons.logging</shadedPattern>
</relocation>
+
<relocation>
<pattern>org.apache.thrift</pattern>
<shadedPattern>org.apache.htrace.thrift</shadedPattern>
</relocation>
+
<relocation>
<pattern>org.slf4j</pattern>
<shadedPattern>org.apache.htrace.slf4j</shadedPattern>
</relocation>
+
<relocation>
<pattern>org.apache.commons.codec</pattern>
<shadedPattern>org.apache.htrace.commons.codec</shadedPattern>
</relocation>
+
<relocation>
<pattern>org.apache.commons.lang</pattern>
<shadedPattern>org.apache.htrace.commons.lang</shadedPattern>
</relocation>
+
<relocation>
<pattern>org.apache.http</pattern>
<shadedPattern>org.apache.htrace.http</shadedPattern>
</relocation>
+
+ <relocation>
+ <pattern>org.apache.kafka</pattern>
+ <shadedPattern>org.apache.htrace.kafka</shadedPattern>
+ </relocation>
+
+ <relocation>
+ <pattern>kafka</pattern>
+ <shadedPattern>org.apache.htrace.kafka</shadedPattern>
+ </relocation>
+
+ <relocation>
+ <pattern>org.I0Itec</pattern>
+ <shadedPattern>org.apache.htrace.I0Itec</shadedPattern>
+ </relocation>
+
</relocations>
</configuration>
<goals>
@@ -153,6 +177,27 @@
<artifactId>commons-codec</artifactId>
<version>1.7</version>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka_${scala.version}</artifactId>
+ <version>${kafka.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.zookeeper</groupId>
+ <artifactId>zookeeper</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka_${scala.version}</artifactId>
+ <version>${kafka.version}</version>
+ <classifier>test</classifier>
+ <scope>test</scope>
+ </dependency>
+
</dependencies>
<profiles>
diff --git a/htrace-zipkin/src/main/java/org/apache/htrace/Transport.java b/htrace-zipkin/src/main/java/org/apache/htrace/Transport.java
new file mode 100644
index 0000000..dd55bea
--- /dev/null
+++ b/htrace-zipkin/src/main/java/org/apache/htrace/Transport.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.htrace;
+
+import org.apache.htrace.core.HTraceConfiguration;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Span data transport interface.
+ */
+public interface Transport extends Closeable {
+
+ /**
+ * Open connection to Transport endpoint
+ * @param conf Transport configuration
+ * @throws IOException if an I/O error occurs
+ */
+ void open(HTraceConfiguration conf) throws IOException;
+
+ /**
+ * Checks if the transport in use is open
+ * @return whether the transport is open
+ */
+ boolean isOpen();
+
+ /**
+ * Sends the list of objects to the transport endpoint
+ * @param spans to be sent
+ * @throws IOException if an I/O error occurs
+ */
+ void send(List<byte[]> spans) throws IOException;
+}
diff --git a/htrace-zipkin/src/main/java/org/apache/htrace/impl/KafkaTransport.java b/htrace-zipkin/src/main/java/org/apache/htrace/impl/KafkaTransport.java
new file mode 100644
index 0000000..b352f0c
--- /dev/null
+++ b/htrace-zipkin/src/main/java/org/apache/htrace/impl/KafkaTransport.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.htrace.impl;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.htrace.Transport;
+import org.apache.htrace.core.HTraceConfiguration;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+import kafka.javaapi.producer.Producer;
+import kafka.producer.KeyedMessage;
+import kafka.producer.ProducerConfig;
+
+public class KafkaTransport implements Transport {
+
+ private static final Log LOG = LogFactory.getLog(KafkaTransport.class);
+ private static final String DEFAULT_TOPIC = "zipkin";
+ public static final String TOPIC_KEY = "zipkin.kafka.topic";
+
+ Producer<byte[], byte[]> producer;
+ private boolean isOpen = false;
+ private String topic;
+
+ /**
+ * Opens a new Kafka transport
+ * @param conf Transport configuration. Some Kafka producer configurations
+ * can be passed by prefixing the config key with zipkin.kafka
+ * (e.g. zipkin.kafka.producer.type for producer.type)
+ * @throws IOException if an I/O error occurs
+ * @throws IllegalStateException not thrown by this method itself; re-opening an open transport only logs a warning
+ */
+ @Override
+ public void open(HTraceConfiguration conf) throws IOException,
+ IllegalStateException {
+ if (!isOpen()) {
+ topic = conf.get(TOPIC_KEY, DEFAULT_TOPIC);
+ producer = newProducer(conf);
+ isOpen = true;
+ } else {
+ LOG.warn("Attempted to open an already opened transport");
+ }
+ }
+
+ @Override
+ public boolean isOpen() {
+ return isOpen;
+ }
+
+ @Override
+ public void send(List<byte[]> spans) throws IOException {
+
+ List<KeyedMessage<byte[], byte[]>> entries = new ArrayList<>(spans.size());
+
+ for (byte[] span : spans) {
+ entries.add(new KeyedMessage<byte[], byte[]>(topic, span));
+ }
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("sending " + entries.size() + " entries");
+ }
+ producer.send(entries);
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (isOpen) {
+ producer.close();
+ isOpen = false;
+ } else {
+ LOG.warn("Attempted to close already closed transport");
+ }
+ }
+
+ public Producer<byte[], byte[]> newProducer(HTraceConfiguration conf) {
+ // https://kafka.apache.org/083/configuration.html
+ Properties producerProps = new Properties();
+ // Essential producer configurations
+ producerProps.put("metadata.broker.list",
+ conf.get("zipkin.kafka.metadata.broker.list", "localhost:9092"));
+ producerProps.put("request.required.acks",
+ conf.get("zipkin.kafka.request.required.acks", "0"));
+ producerProps.put("producer.type",
+ conf.get("zipkin.kafka.producer.type", "async"));
+ producerProps.put("serializer.class",
+ conf.get("zipkin.kafka.serializer.class", "kafka.serializer.DefaultEncoder"));
+ producerProps.put("compression.codec",
+ conf.get("zipkin.kafka.compression.codec", "1"));
+
+ Producer<byte[], byte[]> producer = new Producer<>(new ProducerConfig(producerProps));
+ LOG.info("Connected to Kafka transport \n" + producerProps);
+ return producer;
+ }
+
+}
diff --git a/htrace-zipkin/src/main/java/org/apache/htrace/impl/ScribeTransport.java b/htrace-zipkin/src/main/java/org/apache/htrace/impl/ScribeTransport.java
new file mode 100644
index 0000000..0fc7920
--- /dev/null
+++ b/htrace-zipkin/src/main/java/org/apache/htrace/impl/ScribeTransport.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.htrace.impl;
+
+import com.twitter.zipkin.gen.LogEntry;
+import com.twitter.zipkin.gen.Scribe;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.htrace.Transport;
+import org.apache.htrace.core.HTraceConfiguration;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.transport.TFramedTransport;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class ScribeTransport implements Transport {
+
+ /**
+ * Category used to tell Scribe that the entries are for Zipkin.
+ */
+ public static final String CATEGORY = "zipkin";
+
+
+ private static final Log LOG = LogFactory.getLog(ScribeTransport.class);
+ /**
+ * Default hostname to fall back on.
+ */
+ private static final String DEFAULT_COLLECTOR_HOSTNAME = "localhost";
+ public static final String DEPRECATED_HOSTNAME_KEY = "zipkin.collector-hostname";
+ public static final String HOSTNAME_KEY = "zipkin.scribe.hostname";
+
+ /**
+ * Default collector port.
+ */
+ private static final int DEFAULT_COLLECTOR_PORT = 9410; // trace collector default port.
+ public static final String DEPRECATED_PORT_KEY = "zipkin.collector-port";
+ public static final String PORT_KEY = "zipkin.scribe.port";
+
+ private Scribe.Iface scribe = null;
+
+ @Override
+ public void open(HTraceConfiguration conf) throws IOException {
+ if (!isOpen()) {
+ checkDeprecation(conf, DEPRECATED_HOSTNAME_KEY, HOSTNAME_KEY);
+ checkDeprecation(conf, DEPRECATED_PORT_KEY, PORT_KEY);
+
+ String collectorHostname = conf.get(HOSTNAME_KEY,
+ conf.get(DEPRECATED_HOSTNAME_KEY,
+ DEFAULT_COLLECTOR_HOSTNAME));
+ int collectorPort = conf.getInt(PORT_KEY,
+ conf.getInt(DEPRECATED_PORT_KEY,
+ DEFAULT_COLLECTOR_PORT));
+ scribe = newScribe(collectorHostname, collectorPort);
+ LOG.info("Opened transport " + collectorHostname + ":" + collectorPort);
+ } else {
+ LOG.warn("Attempted to open an already opened transport");
+ }
+ }
+
+ private void checkDeprecation(HTraceConfiguration conf, String deprecatedKey,
+ String newKey) {
+ if (conf.get(deprecatedKey) != null) {
+ LOG.warn("Configuration \"" + deprecatedKey + "\" is deprecated. Use \"" +
+ newKey + "\" instead.");
+ }
+ }
+
+ @Override
+ public boolean isOpen() {
+ return scribe != null
+ && ((Scribe.Client) scribe).getInputProtocol().getTransport().isOpen();
+ }
+
+ /**
+ * The Scribe client which is used for rpc writes a list of
+ * LogEntry objects, so the span objects are first transformed into LogEntry objects before
+ * sending to the zipkin-collector.
+ *
+ * Here is a little ascii art which shows the above transformation:
+ * <pre>
+ * +------------+ +------------+ +------------+ +-----------------+
+ * | HTrace Span|-->|Zipkin Span |-->| (LogEntry) | ===========> | Zipkin Collector|
+ * +------------+ +------------+ +------------+ (Scribe RPC) +-----------------+
+ * </pre>
+ * @param spans to be sent. The raw bytes are being sent.
+ * @throws IOException if the Scribe RPC fails with a Thrift exception
+ */
+ @Override
+ public void send(List<byte[]> spans) throws IOException {
+
+ ArrayList<LogEntry> entries = new ArrayList<LogEntry>(spans.size());
+ for (byte[] span : spans) {
+ entries.add(new LogEntry(CATEGORY, Base64.encodeBase64String(span)));
+ }
+
+ try {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("sending " + entries.size() + " entries");
+ }
+ scribe.Log(entries); // TODO (clehene) should we instead interpret the return?
+ } catch (TException e) {
+ throw new IOException(e);
+ }
+
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (scribe != null) {
+ ((Scribe.Client) scribe).getInputProtocol().getTransport().close();
+ scribe = null;
+ LOG.info("Closed transport");
+ } else {
+ LOG.warn("Attempted to close an already closed transport");
+ }
+ }
+
+ private Scribe.Iface newScribe(String collectorHostname,
+ int collectorPort)
+ throws IOException {
+
+ TTransport transport = new TFramedTransport(
+ new TSocket(collectorHostname, collectorPort));
+ try {
+ transport.open();
+ } catch (TTransportException e) {
+ throw new IOException(e);
+ }
+ TBinaryProtocol.Factory factory = new TBinaryProtocol.Factory();
+ TProtocol protocol = factory.getProtocol(transport);
+ return new Scribe.Client(protocol);
+ }
+
+}
diff --git a/htrace-zipkin/src/main/java/org/apache/htrace/impl/ZipkinSpanReceiver.java b/htrace-zipkin/src/main/java/org/apache/htrace/impl/ZipkinSpanReceiver.java
index c106fa8..2dfe5a6 100644
--- a/htrace-zipkin/src/main/java/org/apache/htrace/impl/ZipkinSpanReceiver.java
+++ b/htrace-zipkin/src/main/java/org/apache/htrace/impl/ZipkinSpanReceiver.java
@@ -17,12 +17,10 @@
package org.apache.htrace.impl;
-import com.twitter.zipkin.gen.LogEntry;
-import com.twitter.zipkin.gen.Scribe;
-
-import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.htrace.Transport;
import org.apache.htrace.core.HTraceConfiguration;
import org.apache.htrace.core.Span;
import org.apache.htrace.core.SpanReceiver;
@@ -30,27 +28,25 @@
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.protocol.TProtocolFactory;
-import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TIOStreamTransport;
-import org.apache.thrift.transport.TSocket;
-import org.apache.thrift.transport.TTransport;
-import org.apache.thrift.transport.TTransportException;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
/**
* Zipkin is an open source tracing library. This span receiver acts as a bridge between HTrace and
@@ -58,34 +54,22 @@
* <p/>
* HTrace spans are queued into a blocking queue. From there background worker threads will
* batch the spans together and then send them through to a Zipkin collector.
+ *
+ * Pluggable Zipkin transports are supported through the "zipkin.transport.class" configuration.
+ * Implementations for Scribe (ScribeTransport, the default) and Kafka (KafkaTransport) are available.
+ *
*/
public class ZipkinSpanReceiver extends SpanReceiver {
+
private static final Log LOG = LogFactory.getLog(ZipkinSpanReceiver.class);
/**
- * Default hostname to fall back on.
- */
- private static final String DEFAULT_COLLECTOR_HOSTNAME = "localhost";
- public static final String HOSTNAME_KEY = "zipkin.collector-hostname";
-
- /**
- * Default collector port.
- */
- private static final int DEFAULT_COLLECTOR_PORT = 9410; // trace collector default port.
- public static final String PORT_KEY = "zipkin.collector-port";
-
- /**
* Default number of threads to use.
*/
private static final int DEFAULT_NUM_THREAD = 1;
public static final String NUM_THREAD_KEY = "zipkin.num-threads";
/**
- * this is used to tell scribe that the entries are for zipkin..
- */
- private static final String CATEGORY = "zipkin";
-
- /**
* Whether the service which is traced is in client or a server mode. It is used while creating
* the Endpoint.
*/
@@ -106,6 +90,14 @@
*/
private static final int MAX_ERRORS = 10;
+ private static final String DEFAULT_TRANSPORT_CLASS = "org.apache.htrace.impl.ScribeTransport";
+ public static final String TRANSPORT_CLASS_KEY = "zipkin.transport.class";
+
+ /**
+ * The transport that the spans will be sent through
+ */
+ private Transport transport;
+
/**
* The queue that will get all HTrace spans that are to be sent.
*/
@@ -146,20 +138,37 @@
private HTraceToZipkinConverter converter;
private ExecutorService service;
private HTraceConfiguration conf;
- private String collectorHostname;
- private int collectorPort;
public ZipkinSpanReceiver(HTraceConfiguration conf) {
+ this.transport = createTransport(conf);
this.queue = new ArrayBlockingQueue<Span>(1000);
this.protocolFactory = new TBinaryProtocol.Factory();
configure(conf);
}
+ private void logAndThrow(Throwable exception) {
+ LOG.error(ExceptionUtils.getStackTrace(exception));
+ throw new RuntimeException(exception);
+ }
+
+ protected Transport createTransport(HTraceConfiguration conf) {
+ ClassLoader classLoader = Builder.class.getClassLoader();
+ String className = conf.get(TRANSPORT_CLASS_KEY, DEFAULT_TRANSPORT_CLASS);
+ Transport transport = null;
+ try {
+ Class<?> cls = classLoader.loadClass(className);
+ transport = (Transport)cls.newInstance();
+ } catch (ClassNotFoundException
+ | InstantiationException
+ | IllegalAccessException e) {
+ logAndThrow(e);
+ }
+ return transport;
+ }
+
private void configure(HTraceConfiguration conf) {
this.conf = conf;
- this.collectorHostname = conf.get(HOSTNAME_KEY, DEFAULT_COLLECTOR_HOSTNAME);
- this.collectorPort = conf.getInt(PORT_KEY, DEFAULT_COLLECTOR_PORT);
// initialize the endpoint. This endpoint is used while writing the Span.
initConverter();
@@ -186,26 +195,23 @@
InetAddress tracedServiceHostname = null;
// Try and get the hostname. If it's not configured try and get the local hostname.
try {
+ //TODO (clehene) extract conf to constant
+ //TODO (clehene) has this been deprecated?
String host = conf.get("zipkin.traced-service-hostname",
InetAddress.getLocalHost().getHostAddress());
-
tracedServiceHostname = InetAddress.getByName(host);
} catch (UnknownHostException e) {
LOG.error("Couldn't get the localHost address", e);
}
short tracedServicePort = (short) conf.getInt("zipkin.traced-service-port", -1);
byte[] address = tracedServiceHostname != null
- ? tracedServiceHostname.getAddress() : DEFAULT_COLLECTOR_HOSTNAME.getBytes();
+ ? tracedServiceHostname.getAddress() : InetAddress.getLoopbackAddress().getAddress();
int ipv4 = ByteBuffer.wrap(address).getInt();
this.converter = new HTraceToZipkinConverter(ipv4, tracedServicePort);
}
private class WriteSpanRunnable implements Runnable {
- /**
- * scribe client to push zipkin spans
- */
- private Scribe.Iface scribe = null;
private final ByteArrayOutputStream baos;
private final TProtocol streamProtocol;
@@ -215,16 +221,15 @@
}
/**
- * This runnable converts a HTrace span to a Zipkin span and sends it across the zipkin
- * collector as a thrift object. The scribe client which is used for rpc writes a list of
- * LogEntry objects, so the span objects are first transformed into LogEntry objects before
- * sending to the zipkin-collector.
+ *
+ * This runnable converts an HTrace span to a Zipkin span and sends it across the transport
+ * as a Thrift object.
* <p/>
* Here is a little ascii art which shows the above transformation:
* <pre>
- * +------------+ +------------+ +------------+ +-----------------+
- * | HTrace Span|-->|Zipkin Span |-->| (LogEntry) | ===========> | Zipkin Collector|
- * +------------+ +------------+ +------------+ (Scribe rpc) +-----------------+
+ * +------------+ +------------+ +-----------------+
+ * | HTrace Span|-->|Zipkin Span | ===========> | Zipkin Collector|
+ * +------------+ +------------+ (transport) +-----------------+
* </pre>
*/
@Override
@@ -236,6 +241,7 @@
while (running.get() || queue.size() > 0) {
Span firstSpan = null;
+ //TODO (clehene) the following code (try / catch) is duplicated in / from FlumeSpanReceiver
try {
// Block for up to a second. to try and get a span.
// We only block for a little bit in order to notice if the running value has changed
@@ -256,13 +262,17 @@
if (dequeuedSpans.isEmpty()) continue;
- // If this is the first time through or there was an error re-connect
- if (scribe == null) {
- startClient();
+ if (!transport.isOpen()) {
+ try {
+ transport.open(conf);
+ } catch (Throwable e) {
+ logAndThrow(e);
+ }
}
+
// Create a new list every time through so that the list doesn't change underneath
// thrift as it's sending.
- List<LogEntry> entries = new ArrayList<LogEntry>(dequeuedSpans.size());
+ List<byte[]> entries = new ArrayList<>(dequeuedSpans.size());
try {
// Convert every de-queued span
for (Span htraceSpan : dequeuedSpans) {
@@ -273,76 +283,57 @@
// Write the span to a BAOS
zipkinSpan.write(streamProtocol);
- // Do Base64 encoding and put the string into a log entry.
- LogEntry logEntry =
- new LogEntry(CATEGORY, Base64.encodeBase64String(baos.toByteArray()));
- entries.add(logEntry);
+ entries.add(baos.toByteArray());
}
// Send the entries
- scribe.Log(entries);
+ transport.send(entries);
+
// clear the list for the next time through.
dequeuedSpans.clear();
// reset the error counter.
errorCount = 0;
} catch (Exception e) {
- LOG.error("Error when writing to the zipkin collector: " +
- collectorHostname + ":" + collectorPort, e);
-
- errorCount += 1;
- // If there have been ten errors in a row start dropping things.
- if (errorCount < MAX_ERRORS) {
- try {
- queue.addAll(dequeuedSpans);
- } catch (IllegalStateException ex) {
- LOG.error("Drop " + dequeuedSpans.size() + " span(s) because queue is full");
- }
- }
-
- closeClient();
- try {
- // Since there was an error sleep just a little bit to try and allow the
- // zipkin collector some time to recover.
- Thread.sleep(500);
- } catch (InterruptedException e1) {
- // Ignored
- }
+ errorCount = handleException(dequeuedSpans, errorCount, e);
}
}
closeClient();
}
+ private long handleException(List<Span> dequeuedSpans, long errorCount, Exception e) {
+ LOG.error("Error when writing to the zipkin transport: " + transport, e);
+
+ errorCount += 1;
+ // If there have been ten errors in a row start dropping things.
+ if (errorCount < MAX_ERRORS) {
+ try {
+ queue.addAll(dequeuedSpans);
+ } catch (IllegalStateException ex) {
+ LOG.error("Drop " + dequeuedSpans.size() + " span(s) because queue is full");
+ }
+ }
+ closeClient();
+ try {
+ // Since there was an error sleep just a little bit to try and allow the
+ // zipkin collector some time to recover.
+ Thread.sleep(500);
+ } catch (InterruptedException e1) {
+ // Ignored
+ }
+ return errorCount;
+ }
+
/**
* Close out the connection.
*/
- private void closeClient() {
- // close out the transport.
- if (scribe != null && scribe instanceof Scribe.Client) {
- ((Scribe.Client) scribe).getInputProtocol().getTransport().close();
- scribe = null;
+ private void closeClient(){
+ try {
+ transport.close();
+ } catch (IOException e) {
+ LOG.warn("Failed to close transport", e);
}
}
- /**
- * Re-connect to Zipkin.
- */
- private void startClient() {
- if (this.scribe == null) {
- this.scribe = newScribe();
- }
- }
- }
-
- // Override for testing
- Scribe.Iface newScribe() {
- TTransport transport = new TFramedTransport(new TSocket(collectorHostname, collectorPort));
- try {
- transport.open();
- } catch (TTransportException e) {
- e.printStackTrace();
- }
- TProtocol protocol = protocolFactory.getProtocol(transport);
- return new Scribe.Client(protocol);
}
/**
diff --git a/htrace-zipkin/src/test/java/org/apache/htrace/impl/TestZipkinSpanReceiver.java b/htrace-zipkin/src/test/java/org/apache/htrace/impl/TestZipkinSpanReceiver.java
index 6595772..0a22d68 100644
--- a/htrace-zipkin/src/test/java/org/apache/htrace/impl/TestZipkinSpanReceiver.java
+++ b/htrace-zipkin/src/test/java/org/apache/htrace/impl/TestZipkinSpanReceiver.java
@@ -17,14 +17,7 @@
package org.apache.htrace.impl;
-import com.twitter.zipkin.gen.LogEntry;
-import com.twitter.zipkin.gen.ResultCode;
-import com.twitter.zipkin.gen.Scribe;
-import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import org.apache.commons.codec.binary.Base64;
+import org.apache.htrace.Transport;
import org.apache.htrace.core.AlwaysSampler;
import org.apache.htrace.core.HTraceConfiguration;
import org.apache.htrace.core.MilliSpan;
@@ -40,17 +33,22 @@
import org.junit.Assert;
import org.junit.Test;
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+
public class TestZipkinSpanReceiver {
- private Tracer newTracer(final Scribe.Iface scribe) {
+ private Tracer newTracer(final Transport transport) {
TracerPool pool = new TracerPool("newTracer");
pool.addReceiver(new ZipkinSpanReceiver(HTraceConfiguration.EMPTY) {
- @Override Scribe.Iface newScribe() {
- return scribe;
+ @Override
+ protected Transport createTransport(HTraceConfiguration conf) {
+ return transport;
}
});
- return new Tracer.Builder().
- name("ZipkinTracer").
+ return new Tracer.Builder("ZipkinTracer").
tracerPool(pool).
conf(HTraceConfiguration.fromKeyValuePairs(
"sampler.classes", AlwaysSampler.class.getName()
@@ -60,8 +58,8 @@
@Test
public void testSimpleTraces() throws IOException, InterruptedException {
- FakeZipkinScribe scribe = new FakeZipkinScribe();
- Tracer tracer = newTracer(scribe);
+ FakeZipkinTransport transport = new FakeZipkinTransport();
+ Tracer tracer = newTracer(transport);
Span rootSpan = new MilliSpan.Builder().
description("root").
spanId(new SpanId(100, 100)).
@@ -72,43 +70,26 @@
TraceScope innerOne = tracer.newScope("innerOne");
TraceScope innerTwo = tracer.newScope("innerTwo");
innerTwo.close();
- Assert.assertTrue(scribe.nextMessageAsSpan().getName().contains("innerTwo"));
+ Assert.assertTrue(transport.nextMessageAsSpan().getName().contains("innerTwo"));
innerOne.close();
- Assert.assertTrue(scribe.nextMessageAsSpan().getName().contains("innerOne"));
+ Assert.assertTrue(transport.nextMessageAsSpan().getName().contains("innerOne"));
rootSpan.addKVAnnotation("foo", "bar");
rootSpan.addTimelineAnnotation("timeline");
rootScope.close();
- Assert.assertTrue(scribe.nextMessageAsSpan().getName().contains("root"));
+ Assert.assertTrue(transport.nextMessageAsSpan().getName().contains("root"));
tracer.close();
}
@Test
public void testConcurrency() throws IOException {
- Scribe.Iface alwaysOk = new Scribe.Iface() {
- @Override
- public ResultCode Log(List<LogEntry> messages) throws TException {
- return ResultCode.OK;
- }
- };
- Tracer tracer = newTracer(alwaysOk);
+ Tracer tracer = newTracer(new FakeZipkinTransport(){
+ @Override public void send(List<byte[]> spans) throws IOException { /*do nothing*/ }
+ });
TraceCreator traceCreator = new TraceCreator(tracer);
traceCreator.createThreadedTrace();
}
- @Test
- public void testResilience() throws IOException {
- Scribe.Iface alwaysTryLater = new Scribe.Iface() {
- @Override
- public ResultCode Log(List<LogEntry> messages) throws TException {
- return ResultCode.TRY_LATER;
- }
- };
- Tracer tracer = newTracer(alwaysTryLater);
- TraceCreator traceCreator = new TraceCreator(tracer);
- traceCreator.createThreadedTrace();
- }
-
- private static class FakeZipkinScribe implements Scribe.Iface {
+ private static class FakeZipkinTransport implements Transport {
private final BlockingQueue<com.twitter.zipkin.gen.Span> receivedSpans =
new ArrayBlockingQueue<com.twitter.zipkin.gen.Span>(1);
@@ -117,19 +98,35 @@
return receivedSpans.take();
}
- @Override
- public ResultCode Log(List<LogEntry> messages) throws TException {
- for (LogEntry message : messages) {
- Assert.assertEquals("zipkin", message.category);
- byte[] bytes = Base64.decodeBase64(message.message);
- TMemoryBuffer transport = new TMemoryBuffer(bytes.length);
- transport.write(bytes);
- com.twitter.zipkin.gen.Span zSpan = new com.twitter.zipkin.gen.Span();
- zSpan.read(new TBinaryProtocol(transport));
- receivedSpans.add(zSpan);
+ @Override
+ public void open(HTraceConfiguration conf) throws IOException {
+
+ }
+
+ @Override
+ public boolean isOpen() {
+ return false;
+ }
+
+ @Override
+ public void send(List<byte[]> spans) throws IOException {
+ for (byte[] message : spans) {
+ TMemoryBuffer transport = new TMemoryBuffer(message.length);
+ try {
+ transport.write(message);
+ com.twitter.zipkin.gen.Span zSpan = new com.twitter.zipkin.gen.Span();
+ zSpan.read(new TBinaryProtocol(transport));
+ receivedSpans.add(zSpan);
+ } catch (TException e) {
+ throw new IOException(e);
+ }
}
- return ResultCode.OK;
+ }
+
+ @Override
+ public void close() throws IOException {
+
}
}
}
diff --git a/htrace-zipkin/src/test/java/org/apache/htrace/zipkin/ITZipkinReceiver.java b/htrace-zipkin/src/test/java/org/apache/htrace/zipkin/ITZipkinReceiver.java
new file mode 100644
index 0000000..cb50032
--- /dev/null
+++ b/htrace-zipkin/src/test/java/org/apache/htrace/zipkin/ITZipkinReceiver.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.htrace.zipkin;
+
+
+import com.twitter.zipkin.gen.Span;
+
+import org.I0Itec.zkclient.ZkClient;
+import org.apache.htrace.core.HTraceConfiguration;
+import org.apache.htrace.core.TraceScope;
+import org.apache.htrace.core.Tracer;
+import org.apache.htrace.core.TracerPool;
+import org.apache.htrace.impl.KafkaTransport;
+import org.apache.htrace.impl.ZipkinSpanReceiver;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.thrift.TDeserializer;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import kafka.consumer.ConsumerIterator;
+import kafka.consumer.KafkaStream;
+import kafka.javaapi.consumer.ConsumerConnector;
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServer;
+import kafka.utils.MockTime;
+import kafka.utils.TestUtils;
+import kafka.utils.TestZKUtils;
+import kafka.utils.ZKStringSerializer$;
+import kafka.zk.EmbeddedZookeeper;
+import scala.collection.JavaConversions;
+import scala.collection.mutable.Buffer;
+
+/**
+ * Integration test for the Zipkin span receiver's Kafka transport, run against an embedded
+ * ZooKeeper instance and Kafka broker.
+ */
+public class ITZipkinReceiver {
+
+  /**
+   * Closes one trace scope on a tracer configured with {@link KafkaTransport} and checks that
+   * the span read back from the Kafka topic carries the scope's name.
+   *
+   * @throws Exception if the embedded servers, the tracer or the consumer fail
+   */
+  @Test
+  public void testKafkaTransport() throws Exception {
+
+    String topic = "zipkin";
+    // Kafka setup
+    EmbeddedZookeeper zkServer = new EmbeddedZookeeper(TestZKUtils.zookeeperConnect());
+    KafkaServer kafkaServer = null;
+    ConsumerConnector connector = null;
+    // Everything below runs under try/finally so a failed assertion cannot leave the embedded
+    // broker, ZooKeeper or the consumer threads running for the rest of the build.
+    try {
+      Properties props = TestUtils.createBrokerConfig(0, TestUtils.choosePort(), false);
+      KafkaConfig config = new KafkaConfig(props);
+      kafkaServer = TestUtils.createServer(config, new MockTime());
+
+      Buffer<KafkaServer> servers = JavaConversions.asScalaBuffer(Collections.singletonList(kafkaServer));
+      ZkClient zkClient = new ZkClient(zkServer.connectString(), 30000, 30000, ZKStringSerializer$.MODULE$);
+      try {
+        TestUtils.createTopic(zkClient, topic, 1, 1, servers, new Properties());
+      } finally {
+        zkClient.close();
+      }
+      TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0, 5000);
+
+      // HTrace
+      HTraceConfiguration hTraceConfiguration = HTraceConfiguration.fromKeyValuePairs(
+          "sampler.classes", "AlwaysSampler",
+          "span.receiver.classes", ZipkinSpanReceiver.class.getName(),
+          "zipkin.kafka.metadata.broker.list", config.advertisedHostName() + ":" + config.advertisedPort(),
+          "zipkin.kafka.topic", topic,
+          ZipkinSpanReceiver.TRANSPORT_CLASS_KEY, KafkaTransport.class.getName()
+      );
+
+      final Tracer tracer = new Tracer.Builder("test-tracer")
+          .tracerPool(new TracerPool("test-tracer-pool"))
+          .conf(hTraceConfiguration)
+          .build();
+
+      String scopeName = "test-kafka-transport-scope";
+      TraceScope traceScope = tracer.newScope(scopeName);
+      traceScope.close();
+      tracer.close();
+
+      // Kafka consumer
+      Properties consumerProps = new Properties();
+      consumerProps.put("zookeeper.connect", props.getProperty("zookeeper.connect"));
+      consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "testing.group");
+      consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "smallest");
+      // The stream iterator blocks indefinitely by default; with a timeout a missing span makes
+      // hasNext() throw and fail the test instead of hanging the build.
+      consumerProps.put("consumer.timeout.ms", "30000");
+      connector =
+          kafka.consumer.Consumer.createJavaConsumerConnector(new kafka.consumer.ConsumerConfig(consumerProps));
+      Map<String, Integer> topicCountMap = new HashMap<>();
+      topicCountMap.put(topic, 1);
+      Map<String, List<KafkaStream<byte[], byte[]>>> streams = connector.createMessageStreams(topicCountMap);
+      ConsumerIterator<byte[], byte[]> it = streams.get(topic).get(0).iterator();
+
+      // Test
+      Assert.assertTrue("We should have one message in Kafka", it.hasNext());
+      Span span = new Span();
+      new TDeserializer(new TBinaryProtocol.Factory()).deserialize(span, it.next().message());
+      // JUnit expects (message, expected, actual); the reverse order garbles failure reports.
+      Assert.assertEquals("The span name should match our scope description", scopeName, span.getName());
+    } finally {
+      if (connector != null) {
+        connector.shutdown();
+      }
+      if (kafkaServer != null) {
+        kafkaServer.shutdown();
+      }
+      zkServer.shutdown();
+    }
+
+  }
+
+}
diff --git a/htrace-zipkin/src/test/java/org/apache/htrace/TestHTraceSpanToZipkinSpan.java b/htrace-zipkin/src/test/java/org/apache/htrace/zipkin/TestHTraceSpanToZipkinSpan.java
similarity index 100%
rename from htrace-zipkin/src/test/java/org/apache/htrace/TestHTraceSpanToZipkinSpan.java
rename to htrace-zipkin/src/test/java/org/apache/htrace/zipkin/TestHTraceSpanToZipkinSpan.java
diff --git a/htrace-zipkin/src/test/resources/log4j.properties b/htrace-zipkin/src/test/resources/log4j.properties
new file mode 100644
index 0000000..564a77a
--- /dev/null
+++ b/htrace-zipkin/src/test/resources/log4j.properties
@@ -0,0 +1,30 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# By default, everything goes to the console
+log4j.rootLogger=WARN, A1
+
+# A1 is set to be a ConsoleAppender.
+log4j.appender.A1=org.apache.log4j.ConsoleAppender
+log4j.appender.A1.layout=org.apache.log4j.PatternLayout
+log4j.appender.A1.layout.ConversionPattern=%d [%t] %-5p %c - %m%n
+log4j.appender.A1.ImmediateFlush=true
+
+# Levels only: A1 is inherited from the root logger (additivity), so naming it again here would print every event twice.
+log4j.logger.kafka.utils=WARN
+log4j.logger.kafka.consumer=WARN
+log4j.logger.kafka.producer=WARN
+
+log4j.logger.org.apache.htrace=INFO
\ No newline at end of file