HTRACE-235. htrace-zipkin - add Kafka transport support (Cosmin Lehene via Colin P. McCabe)
diff --git a/htrace-zipkin/README.md b/htrace-zipkin/README.md
new file mode 100644
index 0000000..6fbee95
--- /dev/null
+++ b/htrace-zipkin/README.md
@@ -0,0 +1,50 @@
+HTrace Zipkin Receiver
+======================
+
+Use the HTrace Java library with [Zipkin](https://github.com/openzipkin/zipkin).
+
+To use, set `"span.receiver.classes", "org.apache.htrace.impl.ZipkinSpanReceiver"`
+in the HTraceConfiguration.  
+
+
+Transports
+----------
+
+The Zipkin receiver supports both the Zipkin Scribe (default) and Kafka transports,
+controlled through the `zipkin.transport.class` configuration.  
+
+Scribe (Thrift) Transport
+-------------------------
+
+### Configuration
+
+Configurations are prefixed with `zipkin.scribe`.
+
+* `zipkin.scribe.hostname`, `localhost`
+* `zipkin.scribe.port`, `9410`
+
+Deprecated (backwards compatibility):
+
+* `zipkin.collector-hostname`, `localhost`
+* `zipkin.collector-port`, `9410`
+
+Kafka Transport
+---------------
+
+To use the Kafka transport, add 
+`"zipkin.transport.class", "org.apache.htrace.impl.KafkaTransport"`
+to the configuration.
+
+### Configuration
+
+Configurations are prefixed with `zipkin.kafka`.  
+
+* `zipkin.kafka.topic`, `zipkin`
+
+Producer specific configurations  
+
+* `zipkin.kafka.metadata.broker.list`, `localhost:9092`
+* `zipkin.kafka.request.required.acks`, `0`
+* `zipkin.kafka.producer.type`, `async`
+* `zipkin.kafka.serializer.class`, `kafka.serializer.DefaultEncoder`
+* `zipkin.kafka.compression.codec`, `1`
diff --git a/htrace-zipkin/pom.xml b/htrace-zipkin/pom.xml
index 8e2c8a1..77bba02 100644
--- a/htrace-zipkin/pom.xml
+++ b/htrace-zipkin/pom.xml
@@ -28,6 +28,8 @@
   <properties>
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
     <slf4j.version>1.5.8</slf4j.version>
+    <kafka.version>0.8.2.1</kafka.version>
+    <scala.version>2.11</scala.version>
   </properties>
 
   <build>
@@ -58,30 +60,52 @@
             <phase>package</phase>
             <configuration>
               <relocations>
+
                 <relocation>
                   <pattern>org.apache.commons.logging</pattern>
                   <shadedPattern>org.apache.htrace.commons.logging</shadedPattern>
                 </relocation>
+
                 <relocation>
                   <pattern>org.apache.thrift</pattern>
                   <shadedPattern>org.apache.htrace.thrift</shadedPattern>
                 </relocation>
+
                 <relocation>
                   <pattern>org.slf4j</pattern>
                   <shadedPattern>org.apache.htrace.slf4j</shadedPattern>
                 </relocation>
+
                 <relocation>
                   <pattern>org.apache.commons.codec</pattern>
                   <shadedPattern>org.apache.htrace.commons.codec</shadedPattern>
                 </relocation>
+
                 <relocation>
                   <pattern>org.apache.commons.lang</pattern>
                   <shadedPattern>org.apache.htrace.commons.lang</shadedPattern>
                 </relocation>
+
                 <relocation>
                   <pattern>org.apache.http</pattern>
                   <shadedPattern>org.apache.htrace.http</shadedPattern>
                 </relocation>
+
+                <relocation>
+                  <pattern>org.apache.kafka</pattern>
+                  <shadedPattern>org.apache.htrace.kafka</shadedPattern>
+                </relocation>
+
+                <relocation>
+                  <pattern>kafka</pattern>
+                  <shadedPattern>org.apache.htrace.kafka</shadedPattern>
+                </relocation>
+
+                <relocation>
+                  <pattern>org.I0Itec</pattern>
+                  <shadedPattern>org.apache.htrace.I0Itec</shadedPattern>
+                </relocation>
+
               </relocations>
             </configuration>
             <goals>
@@ -153,6 +177,27 @@
       <artifactId>commons-codec</artifactId>
       <version>1.7</version>
     </dependency>
+
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka_${scala.version}</artifactId>
+      <version>${kafka.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.apache.zookeeper</groupId>
+          <artifactId>zookeeper</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka_${scala.version}</artifactId>
+      <version>${kafka.version}</version>
+      <classifier>test</classifier>
+      <scope>test</scope>
+    </dependency>
+
   </dependencies>
 
   <profiles>
diff --git a/htrace-zipkin/src/main/java/org/apache/htrace/Transport.java b/htrace-zipkin/src/main/java/org/apache/htrace/Transport.java
new file mode 100644
index 0000000..dd55bea
--- /dev/null
+++ b/htrace-zipkin/src/main/java/org/apache/htrace/Transport.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.htrace;
+
+import org.apache.htrace.core.HTraceConfiguration;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Span data transport interface.
+ */
+public interface Transport extends Closeable {
+
+  /**
+   * Open connection to Transport endpoint
+   * @param conf Transport configuration
+   * @throws IOException if an I/O error occurs
+   */
+  void open(HTraceConfiguration conf) throws IOException;
+
+  /**
+   * Checks if the transport in use is open
+   * @return whether the transport is open
+   */
+  boolean isOpen();
+
+  /**
+   * Sends the list of objects to the transport endpoint
+   * @param spans to be sent
+   * @throws IOException if an I/O error occurs
+   */
+  void send(List<byte[]> spans) throws IOException;
+}
diff --git a/htrace-zipkin/src/main/java/org/apache/htrace/impl/KafkaTransport.java b/htrace-zipkin/src/main/java/org/apache/htrace/impl/KafkaTransport.java
new file mode 100644
index 0000000..b352f0c
--- /dev/null
+++ b/htrace-zipkin/src/main/java/org/apache/htrace/impl/KafkaTransport.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.htrace.impl;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.htrace.Transport;
+import org.apache.htrace.core.HTraceConfiguration;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+import kafka.javaapi.producer.Producer;
+import kafka.producer.KeyedMessage;
+import kafka.producer.ProducerConfig;
+
+public class KafkaTransport implements Transport {
+
+  private static final Log LOG = LogFactory.getLog(KafkaTransport.class);
+  private static final String DEFAULT_TOPIC = "zipkin";
+  public static final String TOPIC_KEY = "zipkin.kafka.topic";
+
+  Producer<byte[], byte[]> producer;
+  private boolean isOpen = false;
+  private String topic;
+
+  /**
+   * Opens a new Kafka transport
+   * @param conf Transport configuration. Some Kafka producer configurations
+   *             can be passed by prefixing the config key with zipkin.kafka
+   *             (e.g. zipkin.kafka.producer.type for producer.type)
+   * @throws IOException if an I/O error occurs
+   * @throws IllegalStateException if transport is already open
+   */
+  @Override
+  public void open(HTraceConfiguration conf) throws IOException,
+                                                    IllegalStateException {
+    if (!isOpen()) {
+      topic = conf.get(TOPIC_KEY, DEFAULT_TOPIC);
+      producer = newProducer(conf);
+      isOpen = true;
+    } else {
+      LOG.warn("Attempted to open an already opened transport");
+    }
+  }
+
+  @Override
+  public boolean isOpen() {
+    return isOpen;
+  }
+
+  @Override
+  public void send(List<byte[]> spans) throws IOException {
+
+    List<KeyedMessage<byte[], byte[]>> entries = new ArrayList<>(spans.size());
+
+    for (byte[] span : spans) {
+      entries.add(new KeyedMessage<byte[], byte[]>(topic, span));
+    }
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("sending " + entries.size() + " entries");
+    }
+    producer.send(entries);
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (isOpen) {
+      producer.close();
+      isOpen = false;
+    } else {
+      LOG.warn("Attempted to close already closed transport");
+    }
+  }
+
+  public Producer<byte[], byte[]> newProducer(HTraceConfiguration conf) {
+    // https://kafka.apache.org/083/configuration.html
+    Properties producerProps = new Properties();
+    // Essential producer configurations
+    producerProps.put("metadata.broker.list",
+                      conf.get("zipkin.kafka.metadata.broker.list", "localhost:9092"));
+    producerProps.put("request.required.acks",
+                      conf.get("zipkin.kafka.request.required.acks", "0"));
+    producerProps.put("producer.type",
+                      conf.get("zipkin.kafka.producer.type", "async"));
+    producerProps.put("serializer.class",
+                      conf.get("zipkin.kafka.serializer.class", "kafka.serializer.DefaultEncoder"));
+    producerProps.put("compression.codec",
+                      conf.get("zipkin.kafka.compression.codec", "1"));
+
+    Producer<byte[], byte[]> producer = new Producer<>(new ProducerConfig(producerProps));
+    LOG.info("Connected to Kafka transport \n" +  producerProps);
+    return producer;
+  }
+
+}
diff --git a/htrace-zipkin/src/main/java/org/apache/htrace/impl/ScribeTransport.java b/htrace-zipkin/src/main/java/org/apache/htrace/impl/ScribeTransport.java
new file mode 100644
index 0000000..0fc7920
--- /dev/null
+++ b/htrace-zipkin/src/main/java/org/apache/htrace/impl/ScribeTransport.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.htrace.impl;
+
+import com.twitter.zipkin.gen.LogEntry;
+import com.twitter.zipkin.gen.Scribe;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.htrace.Transport;
+import org.apache.htrace.core.HTraceConfiguration;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.transport.TFramedTransport;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class ScribeTransport implements Transport {
+
+  /**
+   * this is used to tell scribe that the entries are for zipkin..
+   */
+  public static final String CATEGORY = "zipkin";
+
+
+  private static final Log LOG = LogFactory.getLog(ScribeTransport.class);
+  /**
+   * Default hostname to fall back on.
+   */
+  private static final String DEFAULT_COLLECTOR_HOSTNAME = "localhost";
+  public static final String DEPRECATED_HOSTNAME_KEY = "zipkin.collector-hostname";
+  public static final String HOSTNAME_KEY = "zipkin.scribe.hostname";
+
+  /**
+   * Default collector port.
+   */
+  private static final int DEFAULT_COLLECTOR_PORT = 9410; // trace collector default port.
+  public static final String DEPRECATED_PORT_KEY = "zipkin.collector-port";
+  public static final String PORT_KEY = "zipkin.scribe.port";
+
+  private Scribe.Iface scribe = null;
+
+  @Override
+  public void open(HTraceConfiguration conf) throws IOException {
+    if (!isOpen()) {
+      checkDeprecation(conf, DEPRECATED_HOSTNAME_KEY, HOSTNAME_KEY);
+      checkDeprecation(conf, DEPRECATED_PORT_KEY, PORT_KEY);
+
+      String collectorHostname = conf.get(HOSTNAME_KEY,
+                                          conf.get(DEPRECATED_HOSTNAME_KEY,
+                                                   DEFAULT_COLLECTOR_HOSTNAME));
+      int collectorPort = conf.getInt(PORT_KEY,
+                                      conf.getInt(DEPRECATED_PORT_KEY,
+                                                  DEFAULT_COLLECTOR_PORT));
+      scribe = newScribe(collectorHostname, collectorPort);
+      LOG.info("Opened transport " + collectorHostname + ":" + collectorPort);
+    } else {
+      LOG.warn("Attempted to open an already opened transport");
+    }
+  }
+
+  private void checkDeprecation(HTraceConfiguration conf, String deprecatedKey,
+                                String newKey) {
+    if (conf.get(deprecatedKey) != null) {
+      LOG.warn("Configuration \"" + deprecatedKey + "\" is deprecated. Use \"" +
+               newKey + "\" instead.");
+    }
+  }
+
+  @Override
+  public boolean isOpen() {
+    return scribe != null
+           && ((Scribe.Client) scribe).getInputProtocol().getTransport().isOpen();
+  }
+
+  /**
+   * The Scribe client which is used for rpc writes a list of
+   * LogEntry objects, so the span objects are first transformed into LogEntry objects before
+   * sending to the zipkin-collector.
+   *
+   * Here is a little ascii art which shows the above transformation:
+   * <pre>
+   *  +------------+   +------------+   +------------+              +-----------------+
+   *  | HTrace Span|-->|Zipkin Span |-->| (LogEntry) | ===========> | Zipkin Collector|
+   *  +------------+   +------------+   +------------+ (Scribe RPC) +-----------------+
+   *  </pre>
+   * @param spans to be sent. The raw bytes are being sent.
+   * @throws IOException
+   */
+  @Override
+  public void send(List<byte[]> spans) throws IOException {
+
+    ArrayList<LogEntry> entries = new ArrayList<LogEntry>(spans.size());
+    for (byte[] span : spans) {
+      entries.add(new LogEntry(CATEGORY, Base64.encodeBase64String(span)));
+    }
+
+    try {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("sending " + entries.size() + " entries");
+      }
+      scribe.Log(entries); // TODO (clehene) should we instead interpret the return?
+    } catch (TException e) {
+      throw new IOException(e);
+    }
+
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (scribe != null) {
+      ((Scribe.Client) scribe).getInputProtocol().getTransport().close();
+      scribe = null;
+      LOG.info("Closed transport");
+    } else {
+      LOG.warn("Attempted to close an already closed transport");
+    }
+  }
+
+  private Scribe.Iface newScribe(String collectorHostname,
+                                 int collectorPort)
+      throws IOException {
+
+    TTransport transport = new TFramedTransport(
+        new TSocket(collectorHostname, collectorPort));
+    try {
+      transport.open();
+    } catch (TTransportException e) {
+      throw new IOException(e);
+    }
+    TBinaryProtocol.Factory factory = new TBinaryProtocol.Factory();
+    TProtocol protocol = factory.getProtocol(transport);
+    return new Scribe.Client(protocol);
+  }
+
+}
diff --git a/htrace-zipkin/src/main/java/org/apache/htrace/impl/ZipkinSpanReceiver.java b/htrace-zipkin/src/main/java/org/apache/htrace/impl/ZipkinSpanReceiver.java
index c106fa8..2dfe5a6 100644
--- a/htrace-zipkin/src/main/java/org/apache/htrace/impl/ZipkinSpanReceiver.java
+++ b/htrace-zipkin/src/main/java/org/apache/htrace/impl/ZipkinSpanReceiver.java
@@ -17,12 +17,10 @@
 
 package org.apache.htrace.impl;
 
-import com.twitter.zipkin.gen.LogEntry;
-import com.twitter.zipkin.gen.Scribe;
-
-import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.htrace.Transport;
 import org.apache.htrace.core.HTraceConfiguration;
 import org.apache.htrace.core.Span;
 import org.apache.htrace.core.SpanReceiver;
@@ -30,27 +28,25 @@
 import org.apache.thrift.protocol.TBinaryProtocol;
 import org.apache.thrift.protocol.TProtocol;
 import org.apache.thrift.protocol.TProtocolFactory;
-import org.apache.thrift.transport.TFramedTransport;
 import org.apache.thrift.transport.TIOStreamTransport;
-import org.apache.thrift.transport.TSocket;
-import org.apache.thrift.transport.TTransport;
-import org.apache.thrift.transport.TTransportException;
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * Zipkin is an open source tracing library. This span receiver acts as a bridge between HTrace and
@@ -58,34 +54,22 @@
  * <p/>
  * HTrace spans are queued into a blocking queue.  From there background worker threads will
  * batch the spans together and then send them through to a Zipkin collector.
+ *
+ * Pluggable Zipkin transports are supported through the "zipkin.transport.class" configuration
+ * Implementations for Scribe (ScribeTransport) (default) and Kafka (KafkaTransport) are available
+ *
  */
 public class ZipkinSpanReceiver extends SpanReceiver {
+
   private static final Log LOG = LogFactory.getLog(ZipkinSpanReceiver.class);
 
   /**
-   * Default hostname to fall back on.
-   */
-  private static final String DEFAULT_COLLECTOR_HOSTNAME = "localhost";
-  public static final String HOSTNAME_KEY = "zipkin.collector-hostname";
-
-  /**
-   * Default collector port.
-   */
-  private static final int DEFAULT_COLLECTOR_PORT = 9410; // trace collector default port.
-  public static final String PORT_KEY = "zipkin.collector-port";
-
-  /**
    * Default number of threads to use.
    */
   private static final int DEFAULT_NUM_THREAD = 1;
   public static final String NUM_THREAD_KEY = "zipkin.num-threads";
 
   /**
-   * this is used to tell scribe that the entries are for zipkin..
-   */
-  private static final String CATEGORY = "zipkin";
-
-  /**
    * Whether the service which is traced is in client or a server mode. It is used while creating
    * the Endpoint.
    */
@@ -106,6 +90,14 @@
    */
   private static final int MAX_ERRORS = 10;
 
+  private static final String DEFAULT_TRANSPORT_CLASS = "org.apache.htrace.impl.ScribeTransport";
+  public static final String TRANSPORT_CLASS_KEY = "zipkin.transport.class";
+
+  /**
+   * The transport that the spans will be sent trough
+   */
+  private Transport transport;
+
   /**
    * The queue that will get all HTrace spans that are to be sent.
    */
@@ -146,20 +138,37 @@
   private HTraceToZipkinConverter converter;
   private ExecutorService service;
   private HTraceConfiguration conf;
-  private String collectorHostname;
-  private int collectorPort;
 
   public ZipkinSpanReceiver(HTraceConfiguration conf) {
+    this.transport = createTransport(conf);
     this.queue = new ArrayBlockingQueue<Span>(1000);
     this.protocolFactory = new TBinaryProtocol.Factory();
     configure(conf);
   }
 
+  private void logAndThrow(Throwable exception) {
+    LOG.error(ExceptionUtils.getStackTrace(exception));
+    throw new RuntimeException(exception);
+  }
+
+  protected Transport createTransport(HTraceConfiguration conf) {
+    ClassLoader classLoader = Builder.class.getClassLoader();
+    String className = conf.get(TRANSPORT_CLASS_KEY, DEFAULT_TRANSPORT_CLASS);
+    Transport transport = null;
+    try {
+      Class<?> cls = classLoader.loadClass(className);
+      transport = (Transport)cls.newInstance();
+    } catch (ClassNotFoundException
+        | InstantiationException
+        | IllegalAccessException e) {
+      logAndThrow(e);
+    }
+    return transport;
+  }
+
   private void configure(HTraceConfiguration conf) {
     this.conf = conf;
 
-    this.collectorHostname = conf.get(HOSTNAME_KEY, DEFAULT_COLLECTOR_HOSTNAME);
-    this.collectorPort = conf.getInt(PORT_KEY, DEFAULT_COLLECTOR_PORT);
 
     // initialize the endpoint. This endpoint is used while writing the Span.
     initConverter();
@@ -186,26 +195,23 @@
     InetAddress tracedServiceHostname = null;
     // Try and get the hostname.  If it's not configured try and get the local hostname.
     try {
+      //TODO (clehene) extract conf to constant
+      //TODO (clehene) has this been deprecated?
       String host = conf.get("zipkin.traced-service-hostname",
           InetAddress.getLocalHost().getHostAddress());
-
       tracedServiceHostname = InetAddress.getByName(host);
     } catch (UnknownHostException e) {
       LOG.error("Couldn't get the localHost address", e);
     }
     short tracedServicePort = (short) conf.getInt("zipkin.traced-service-port", -1);
     byte[] address = tracedServiceHostname != null
-        ? tracedServiceHostname.getAddress() : DEFAULT_COLLECTOR_HOSTNAME.getBytes();
+        ? tracedServiceHostname.getAddress() : InetAddress.getLoopbackAddress().getAddress();
     int ipv4 = ByteBuffer.wrap(address).getInt();
     this.converter = new HTraceToZipkinConverter(ipv4, tracedServicePort);
   }
 
 
   private class WriteSpanRunnable implements Runnable {
-    /**
-     * scribe client to push zipkin spans
-     */
-    private Scribe.Iface scribe = null;
     private final ByteArrayOutputStream baos;
     private final TProtocol streamProtocol;
 
@@ -215,16 +221,15 @@
     }
 
     /**
-     * This runnable converts a HTrace span to a Zipkin span and sends it across the zipkin
-     * collector as a thrift object. The scribe client which is used for rpc writes a list of
-     * LogEntry objects, so the span objects are first transformed into LogEntry objects before
-     * sending to the zipkin-collector.
+     *
+     * This runnable converts an HTrace span to a Zipkin span and sends it across the transport
+     * as a Thrift object.
      * <p/>
      * Here is a little ascii art which shows the above transformation:
      * <pre>
-     *  +------------+   +------------+   +------------+              +-----------------+
-     *  | HTrace Span|-->|Zipkin Span |-->| (LogEntry) | ===========> | Zipkin Collector|
-     *  +------------+   +------------+   +------------+ (Scribe rpc) +-----------------+
+     *  +------------+   +------------+              +-----------------+
+     *  | HTrace Span|-->|Zipkin Span | ===========> | Zipkin Collector|
+     *  +------------+   +------------+ (transport)  +-----------------+
      *  </pre>
      */
     @Override
@@ -236,6 +241,7 @@
 
       while (running.get() || queue.size() > 0) {
         Span firstSpan = null;
+        //TODO (clenene) the following code (try / catch) is duplicated in / from FlumeSpanReceiver
         try {
           // Block for up to a second. to try and get a span.
           // We only block for a little bit in order to notice if the running value has changed
@@ -256,13 +262,17 @@
 
         if (dequeuedSpans.isEmpty()) continue;
 
-        // If this is the first time through or there was an error re-connect
-        if (scribe == null) {
-          startClient();
+        if (!transport.isOpen()) {
+          try {
+            transport.open(conf);
+          } catch (Throwable e) {
+            logAndThrow(e);
+          }
         }
+
         // Create a new list every time through so that the list doesn't change underneath
         // thrift as it's sending.
-        List<LogEntry> entries = new ArrayList<LogEntry>(dequeuedSpans.size());
+        List<byte[]> entries = new ArrayList<>(dequeuedSpans.size());
         try {
           // Convert every de-queued span
           for (Span htraceSpan : dequeuedSpans) {
@@ -273,76 +283,57 @@
             // Write the span to a BAOS
             zipkinSpan.write(streamProtocol);
 
-            // Do Base64 encoding and put the string into a log entry.
-            LogEntry logEntry =
-                new LogEntry(CATEGORY, Base64.encodeBase64String(baos.toByteArray()));
-            entries.add(logEntry);
+            entries.add(baos.toByteArray());
           }
 
           // Send the entries
-          scribe.Log(entries);
+          transport.send(entries);
+
           // clear the list for the next time through.
           dequeuedSpans.clear();
           // reset the error counter.
           errorCount = 0;
         } catch (Exception e) {
-          LOG.error("Error when writing to the zipkin collector: " +
-              collectorHostname + ":" + collectorPort, e);
-
-          errorCount += 1;
-          // If there have been ten errors in a row start dropping things.
-          if (errorCount < MAX_ERRORS) {
-            try {
-              queue.addAll(dequeuedSpans);
-            } catch (IllegalStateException ex) {
-              LOG.error("Drop " + dequeuedSpans.size() + " span(s) because queue is full");
-            }
-          }
-
-          closeClient();
-          try {
-            // Since there was an error sleep just a little bit to try and allow the
-            // zipkin collector some time to recover.
-            Thread.sleep(500);
-          } catch (InterruptedException e1) {
-            // Ignored
-          }
+          errorCount = handleException(dequeuedSpans, errorCount, e);
         }
       }
       closeClient();
     }
 
+    private long handleException(List<Span> dequeuedSpans, long errorCount, Exception e) {
+      LOG.error("Error when writing to the zipkin transport: " + transport, e);
+
+      errorCount += 1;
+      // If there have been ten errors in a row start dropping things.
+      if (errorCount < MAX_ERRORS) {
+        try {
+          queue.addAll(dequeuedSpans);
+        } catch (IllegalStateException ex) {
+          LOG.error("Drop " + dequeuedSpans.size() + " span(s) because queue is full");
+        }
+      }
+      closeClient();
+      try {
+        // Since there was an error sleep just a little bit to try and allow the
+        // zipkin collector some time to recover.
+        Thread.sleep(500);
+      } catch (InterruptedException e1) {
+        // Ignored
+      }
+      return errorCount;
+    }
+
     /**
      * Close out the connection.
      */
-    private void closeClient() {
-      // close out the transport.
-      if (scribe != null && scribe instanceof Scribe.Client) {
-        ((Scribe.Client) scribe).getInputProtocol().getTransport().close();
-        scribe = null;
+    private void closeClient(){
+      try {
+        transport.close();
+      } catch (IOException e) {
+        LOG.warn("Failed to close transport", e);
       }
     }
 
-    /**
-     * Re-connect to Zipkin.
-     */
-    private void startClient() {
-      if (this.scribe == null) {
-        this.scribe = newScribe();
-      }
-    }
-  }
-
-  // Override for testing
-  Scribe.Iface newScribe() {
-    TTransport transport = new TFramedTransport(new TSocket(collectorHostname, collectorPort));
-    try {
-      transport.open();
-    } catch (TTransportException e) {
-      e.printStackTrace();
-    }
-    TProtocol protocol = protocolFactory.getProtocol(transport);
-    return new Scribe.Client(protocol);
   }
 
   /**
diff --git a/htrace-zipkin/src/test/java/org/apache/htrace/impl/TestZipkinSpanReceiver.java b/htrace-zipkin/src/test/java/org/apache/htrace/impl/TestZipkinSpanReceiver.java
index 6595772..0a22d68 100644
--- a/htrace-zipkin/src/test/java/org/apache/htrace/impl/TestZipkinSpanReceiver.java
+++ b/htrace-zipkin/src/test/java/org/apache/htrace/impl/TestZipkinSpanReceiver.java
@@ -17,14 +17,7 @@
 
 package org.apache.htrace.impl;
 
-import com.twitter.zipkin.gen.LogEntry;
-import com.twitter.zipkin.gen.ResultCode;
-import com.twitter.zipkin.gen.Scribe;
-import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import org.apache.commons.codec.binary.Base64;
+import org.apache.htrace.Transport;
 import org.apache.htrace.core.AlwaysSampler;
 import org.apache.htrace.core.HTraceConfiguration;
 import org.apache.htrace.core.MilliSpan;
@@ -40,17 +33,22 @@
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+
 public class TestZipkinSpanReceiver {
 
-  private Tracer newTracer(final Scribe.Iface scribe) {
+  private Tracer newTracer(final Transport transport) {
     TracerPool pool = new TracerPool("newTracer");
     pool.addReceiver(new ZipkinSpanReceiver(HTraceConfiguration.EMPTY) {
-      @Override Scribe.Iface newScribe() {
-        return scribe;
+      @Override
+      protected Transport createTransport(HTraceConfiguration conf) {
+        return transport;
       }
     });
-    return new Tracer.Builder().
-        name("ZipkinTracer").
+    return new Tracer.Builder("ZipkinTracer").
         tracerPool(pool).
         conf(HTraceConfiguration.fromKeyValuePairs(
             "sampler.classes", AlwaysSampler.class.getName()
@@ -60,8 +58,8 @@
 
   @Test
   public void testSimpleTraces() throws IOException, InterruptedException {
-    FakeZipkinScribe scribe = new FakeZipkinScribe();
-    Tracer tracer = newTracer(scribe);
+    FakeZipkinTransport transport = new FakeZipkinTransport();
+    Tracer tracer = newTracer(transport);
     Span rootSpan = new MilliSpan.Builder().
         description("root").
         spanId(new SpanId(100, 100)).
@@ -72,43 +70,26 @@
     TraceScope innerOne = tracer.newScope("innerOne");
     TraceScope innerTwo = tracer.newScope("innerTwo");
     innerTwo.close();
-    Assert.assertTrue(scribe.nextMessageAsSpan().getName().contains("innerTwo"));
+    Assert.assertTrue(transport.nextMessageAsSpan().getName().contains("innerTwo"));
     innerOne.close();
-    Assert.assertTrue(scribe.nextMessageAsSpan().getName().contains("innerOne"));
+    Assert.assertTrue(transport.nextMessageAsSpan().getName().contains("innerOne"));
     rootSpan.addKVAnnotation("foo", "bar");
     rootSpan.addTimelineAnnotation("timeline");
     rootScope.close();
-    Assert.assertTrue(scribe.nextMessageAsSpan().getName().contains("root"));
+    Assert.assertTrue(transport.nextMessageAsSpan().getName().contains("root"));
     tracer.close();
   }
 
   @Test
   public void testConcurrency() throws IOException {
-    Scribe.Iface alwaysOk = new Scribe.Iface() {
-      @Override
-      public ResultCode Log(List<LogEntry> messages) throws TException {
-        return ResultCode.OK;
-      }
-    };
-    Tracer tracer = newTracer(alwaysOk);
+    Tracer tracer = newTracer(new FakeZipkinTransport(){
+      @Override public void send(List<byte[]> spans) throws IOException { /*do nothing*/ }
+    });
     TraceCreator traceCreator = new TraceCreator(tracer);
     traceCreator.createThreadedTrace();
   }
 
-  @Test
-  public void testResilience() throws IOException {
-    Scribe.Iface alwaysTryLater = new Scribe.Iface() {
-      @Override
-      public ResultCode Log(List<LogEntry> messages) throws TException {
-        return ResultCode.TRY_LATER;
-      }
-    };
-    Tracer tracer = newTracer(alwaysTryLater);
-    TraceCreator traceCreator = new TraceCreator(tracer);
-    traceCreator.createThreadedTrace();
-  }
-
-  private static class FakeZipkinScribe implements Scribe.Iface {
+  private static class FakeZipkinTransport implements Transport {
 
     private final BlockingQueue<com.twitter.zipkin.gen.Span> receivedSpans =
         new ArrayBlockingQueue<com.twitter.zipkin.gen.Span>(1);
@@ -117,19 +98,35 @@
       return receivedSpans.take();
     }
 
-    @Override
-    public ResultCode Log(List<LogEntry> messages) throws TException {
-      for (LogEntry message : messages) {
-        Assert.assertEquals("zipkin", message.category);
-        byte[] bytes = Base64.decodeBase64(message.message);
 
-        TMemoryBuffer transport = new TMemoryBuffer(bytes.length);
-        transport.write(bytes);
-        com.twitter.zipkin.gen.Span zSpan = new com.twitter.zipkin.gen.Span();
-        zSpan.read(new TBinaryProtocol(transport));
-        receivedSpans.add(zSpan);
+    @Override
+    public void open(HTraceConfiguration conf) throws IOException {
+
+    }
+
+    @Override
+    public boolean isOpen() {
+      return false;
+    }
+
+    @Override
+    public void send(List<byte[]> spans) throws IOException {
+      for (byte[] message : spans) {
+        TMemoryBuffer transport = new TMemoryBuffer(message.length);
+        try {
+          transport.write(message);
+          com.twitter.zipkin.gen.Span zSpan = new com.twitter.zipkin.gen.Span();
+          zSpan.read(new TBinaryProtocol(transport));
+          receivedSpans.add(zSpan);
+        } catch (TException e) {
+          throw new IOException(e);
+        }
       }
-      return ResultCode.OK;
+    }
+
+    @Override
+    public void close() throws IOException {
+
     }
   }
 }
diff --git a/htrace-zipkin/src/test/java/org/apache/htrace/zipkin/ITZipkinReceiver.java b/htrace-zipkin/src/test/java/org/apache/htrace/zipkin/ITZipkinReceiver.java
new file mode 100644
index 0000000..cb50032
--- /dev/null
+++ b/htrace-zipkin/src/test/java/org/apache/htrace/zipkin/ITZipkinReceiver.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.htrace.zipkin;
+
+
+import com.twitter.zipkin.gen.Span;
+
+import org.I0Itec.zkclient.ZkClient;
+import org.apache.htrace.core.HTraceConfiguration;
+import org.apache.htrace.core.TraceScope;
+import org.apache.htrace.core.Tracer;
+import org.apache.htrace.core.TracerPool;
+import org.apache.htrace.impl.KafkaTransport;
+import org.apache.htrace.impl.ZipkinSpanReceiver;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.thrift.TDeserializer;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import kafka.consumer.ConsumerIterator;
+import kafka.consumer.KafkaStream;
+import kafka.javaapi.consumer.ConsumerConnector;
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServer;
+import kafka.utils.MockTime;
+import kafka.utils.TestUtils;
+import kafka.utils.TestZKUtils;
+import kafka.utils.ZKStringSerializer$;
+import kafka.zk.EmbeddedZookeeper;
+import scala.collection.JavaConversions;
+import scala.collection.mutable.Buffer;
+
+public class ITZipkinReceiver {
+
+  @Test
+  public void testKafkaTransport() throws Exception {
+
+    String topic = "zipkin";
+    // Kafka setup
+    EmbeddedZookeeper zkServer = new EmbeddedZookeeper(TestZKUtils.zookeeperConnect());
+    ZkClient zkClient = new ZkClient(zkServer.connectString(), 30000, 30000, ZKStringSerializer$.MODULE$);
+    Properties props = TestUtils.createBrokerConfig(0, TestUtils.choosePort(), false);
+    KafkaConfig config = new KafkaConfig(props);
+    KafkaServer kafkaServer = TestUtils.createServer(config, new MockTime());
+
+    Buffer<KafkaServer> servers = JavaConversions.asScalaBuffer(Collections.singletonList(kafkaServer));
+    TestUtils.createTopic(zkClient, topic, 1, 1, servers, new Properties());
+    zkClient.close();
+    TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0, 5000);
+
+    // HTrace
+    HTraceConfiguration hTraceConfiguration = HTraceConfiguration.fromKeyValuePairs(
+        "sampler.classes", "AlwaysSampler",
+        "span.receiver.classes", ZipkinSpanReceiver.class.getName(),
+        "zipkin.kafka.metadata.broker.list", config.advertisedHostName() + ":" + config.advertisedPort(),
+        "zipkin.kafka.topic", topic,
+        ZipkinSpanReceiver.TRANSPORT_CLASS_KEY, KafkaTransport.class.getName()
+    );
+
+    final Tracer tracer = new Tracer.Builder("test-tracer")
+        .tracerPool(new TracerPool("test-tracer-pool"))
+        .conf(hTraceConfiguration)
+        .build();
+
+    String scopeName = "test-kafka-transport-scope";
+    TraceScope traceScope = tracer.newScope(scopeName);
+    traceScope.close();
+    tracer.close();
+
+    // Kafka consumer
+    Properties consumerProps = new Properties();
+    consumerProps.put("zookeeper.connect", props.getProperty("zookeeper.connect"));
+    consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "testing.group");
+    consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "smallest");
+    ConsumerConnector connector =
+        kafka.consumer.Consumer.createJavaConsumerConnector(new kafka.consumer.ConsumerConfig(consumerProps));
+    Map<String, Integer> topicCountMap = new HashMap<>();
+    topicCountMap.put(topic, 1);
+    Map<String, List<KafkaStream<byte[], byte[]>>> streams = connector.createMessageStreams(topicCountMap);
+    ConsumerIterator<byte[], byte[]> it = streams.get(topic).get(0).iterator();
+
+    // Test
+    Assert.assertTrue("We should have one message in Kafka", it.hasNext());
+    Span span = new Span();
+    new TDeserializer(new TBinaryProtocol.Factory()).deserialize(span, it.next().message());
+    Assert.assertEquals("The span name should match our scope description", span.getName(), scopeName);
+
+    kafkaServer.shutdown();
+
+  }
+
+}
diff --git a/htrace-zipkin/src/test/java/org/apache/htrace/TestHTraceSpanToZipkinSpan.java b/htrace-zipkin/src/test/java/org/apache/htrace/zipkin/TestHTraceSpanToZipkinSpan.java
similarity index 100%
rename from htrace-zipkin/src/test/java/org/apache/htrace/TestHTraceSpanToZipkinSpan.java
rename to htrace-zipkin/src/test/java/org/apache/htrace/zipkin/TestHTraceSpanToZipkinSpan.java
diff --git a/htrace-zipkin/src/test/resources/log4j.properties b/htrace-zipkin/src/test/resources/log4j.properties
new file mode 100644
index 0000000..564a77a
--- /dev/null
+++ b/htrace-zipkin/src/test/resources/log4j.properties
@@ -0,0 +1,30 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# By default, everything goes to console and file
+log4j.rootLogger=WARN, A1
+
+# A1 is set to be a ConsoleAppender.
+log4j.appender.A1=org.apache.log4j.ConsoleAppender
+log4j.appender.A1.layout=org.apache.log4j.PatternLayout
+log4j.appender.A1.layout.ConversionPattern=%d [%t] %-5p %c - %m%n
+log4j.appender.A1.ImmediateFlush=true
+
+
+log4j.logger.kafka.utils=WARN, A1
+log4j.logger.kafka.consumer=WARN, A1
+log4j.logger.kafka.producer=WARN, A1
+
+log4j.logger.org.apache.htrace=INFO, A1
\ No newline at end of file