HTRACE-213 Add test for ZipkinSpanReceiver (Adrian Cole)
diff --git a/htrace-core/src/main/java/org/apache/htrace/HTraceConfiguration.java b/htrace-core/src/main/java/org/apache/htrace/HTraceConfiguration.java
index f4896e9..4580dff 100644
--- a/htrace-core/src/main/java/org/apache/htrace/HTraceConfiguration.java
+++ b/htrace-core/src/main/java/org/apache/htrace/HTraceConfiguration.java
@@ -47,7 +47,7 @@
     return new MapConf(conf);
   }
 
-  static HTraceConfiguration fromKeyValuePairs(String... pairs) {
+  public static HTraceConfiguration fromKeyValuePairs(String... pairs) {
     if ((pairs.length % 2) != 0) {
       throw new RuntimeException("You must specify an equal number of keys " +
           "and values.");
diff --git a/htrace-core/src/test/java/org/apache/htrace/TestHTrace.java b/htrace-core/src/test/java/org/apache/htrace/TestHTrace.java
index 0ab8f35..13338e9 100644
--- a/htrace-core/src/test/java/org/apache/htrace/TestHTrace.java
+++ b/htrace-core/src/test/java/org/apache/htrace/TestHTrace.java
@@ -16,25 +16,24 @@
  */
 package org.apache.htrace;
 
-import org.apache.htrace.HTraceConfiguration;
-import org.apache.htrace.Span;
-import org.apache.htrace.SpanReceiver;
-import org.apache.htrace.TraceTree;
 import org.apache.htrace.TraceTree.SpansByParent;
 import org.apache.htrace.impl.LocalFileSpanReceiver;
 import org.apache.htrace.impl.POJOSpanReceiver;
 import org.apache.htrace.impl.StandardOutSpanReceiver;
 import org.junit.Assert;
+import org.junit.Rule;
 import org.junit.Test;
 
 import java.io.File;
 import java.util.Collection;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Map;
 
 public class TestHTrace {
 
+  @Rule
+  public TraceCreator traceCreator = new TraceCreator();
+
   public static final String SPAN_FILE_FLAG = "spanFile";
 
   /**
@@ -47,8 +46,6 @@
     final int numTraces = 3;
     String fileName = System.getProperty(SPAN_FILE_FLAG);
 
-    Collection<SpanReceiver> rcvrs = new HashSet<SpanReceiver>();
-
     // writes spans to a file if one is provided to maven with
     // -DspanFile="FILENAME", otherwise writes to standard out.
     if (fileName != null) {
@@ -62,58 +59,52 @@
       conf.put("local-file-span-receiver.path", fileName);
       LocalFileSpanReceiver receiver =
           new LocalFileSpanReceiver(HTraceConfiguration.fromMap(conf));
-      rcvrs.add(receiver);
+      traceCreator.addReceiver(receiver);
     } else {
-      rcvrs.add(new StandardOutSpanReceiver(HTraceConfiguration.EMPTY));
+      traceCreator.addReceiver(new StandardOutSpanReceiver(HTraceConfiguration.EMPTY));
     }
 
-    POJOSpanReceiver psr = new POJOSpanReceiver(HTraceConfiguration.EMPTY);
-    rcvrs.add(psr);
-    runTraceCreatorTraces(new TraceCreator(rcvrs));
+    traceCreator.addReceiver(new POJOSpanReceiver(HTraceConfiguration.EMPTY){
+      @Override
+      public void close() {
+        TraceTree traceTree = new TraceTree(getSpans());
+        Collection<Span> roots = traceTree.getSpansByParent().find(0);
+        Assert.assertTrue("Trace tree must have roots", !roots.isEmpty());
+        Assert.assertEquals(numTraces, roots.size());
 
-    for (SpanReceiver receiver : rcvrs) {
-      receiver.close();
-    }
+        Map<String, Span> descriptionToRootSpan = new HashMap<String, Span>();
+        for (Span root : roots) {
+          descriptionToRootSpan.put(root.getDescription(), root);
+        }
 
-    Collection<Span> spans = psr.getSpans();
-    TraceTree traceTree = new TraceTree(spans);
-    Collection<Span> roots = traceTree.getSpansByParent().find(0);
-    Assert.assertTrue("Trace tree must have roots", !roots.isEmpty());
-    Assert.assertEquals(numTraces, roots.size());
+        Assert.assertTrue(descriptionToRootSpan.keySet().contains(
+            TraceCreator.RPC_TRACE_ROOT));
+        Assert.assertTrue(descriptionToRootSpan.keySet().contains(
+            TraceCreator.SIMPLE_TRACE_ROOT));
+        Assert.assertTrue(descriptionToRootSpan.keySet().contains(
+            TraceCreator.THREADED_TRACE_ROOT));
 
-    Map<String, Span> descriptionToRootSpan = new HashMap<String, Span>();
-    for (Span root : roots) {
-      descriptionToRootSpan.put(root.getDescription(), root);
-    }
+        SpansByParent spansByParentId = traceTree.getSpansByParent();
+        Span rpcTraceRoot = descriptionToRootSpan.get(TraceCreator.RPC_TRACE_ROOT);
+        Assert.assertEquals(1, spansByParentId.find(rpcTraceRoot.getSpanId()).size());
 
-    Assert.assertTrue(descriptionToRootSpan.keySet().contains(
-        TraceCreator.RPC_TRACE_ROOT));
-    Assert.assertTrue(descriptionToRootSpan.keySet().contains(
-        TraceCreator.SIMPLE_TRACE_ROOT));
-    Assert.assertTrue(descriptionToRootSpan.keySet().contains(
-        TraceCreator.THREADED_TRACE_ROOT));
+        Span rpcTraceChild1 = spansByParentId.find(rpcTraceRoot.getSpanId())
+            .iterator().next();
+        Assert.assertEquals(1, spansByParentId.find(rpcTraceChild1.getSpanId()).size());
 
-    SpansByParent spansByParentId = traceTree.getSpansByParent();
-    Span rpcTraceRoot = descriptionToRootSpan.get(TraceCreator.RPC_TRACE_ROOT);
-    Assert.assertEquals(1, spansByParentId.find(rpcTraceRoot.getSpanId()).size());
+        Span rpcTraceChild2 = spansByParentId.find(rpcTraceChild1.getSpanId())
+            .iterator().next();
+        Assert.assertEquals(1, spansByParentId.find(rpcTraceChild2.getSpanId()).size());
 
-    Span rpcTraceChild1 = spansByParentId.find(rpcTraceRoot.getSpanId())
-        .iterator().next();
-    Assert.assertEquals(1, spansByParentId.find(rpcTraceChild1.getSpanId()).size());
+        Span rpcTraceChild3 = spansByParentId.find(rpcTraceChild2.getSpanId())
+            .iterator().next();
+        Assert.assertEquals(0, spansByParentId.find(rpcTraceChild3.getSpanId()).size());
+      }
+    });
 
-    Span rpcTraceChild2 = spansByParentId.find(rpcTraceChild1.getSpanId())
-        .iterator().next();
-    Assert.assertEquals(1, spansByParentId.find(rpcTraceChild2.getSpanId()).size());
-
-    Span rpcTraceChild3 = spansByParentId.find(rpcTraceChild2.getSpanId())
-        .iterator().next();
-    Assert.assertEquals(0, spansByParentId.find(rpcTraceChild3.getSpanId()).size());
-  }
-
-  private void runTraceCreatorTraces(TraceCreator tc) {
-    tc.createThreadedTrace();
-    tc.createSimpleTrace();
-    tc.createSampleRpcTrace();
+    traceCreator.createThreadedTrace();
+    traceCreator.createSimpleTrace();
+    traceCreator.createSampleRpcTrace();
   }
 
   @Test(timeout=60000)
diff --git a/htrace-core/src/test/java/org/apache/htrace/TraceCreator.java b/htrace-core/src/test/java/org/apache/htrace/TraceCreator.java
index 7ec6309..565ba05 100644
--- a/htrace-core/src/test/java/org/apache/htrace/TraceCreator.java
+++ b/htrace-core/src/test/java/org/apache/htrace/TraceCreator.java
@@ -16,45 +16,50 @@
  */
 package org.apache.htrace;
 
+import org.junit.rules.TestRule;
+import org.junit.runner.Description;
+import org.junit.runners.model.Statement;
+
 import java.util.Collection;
 import java.util.Random;
 import java.util.concurrent.ThreadLocalRandom;
-
-import org.apache.htrace.Sampler;
-import org.apache.htrace.SpanReceiver;
-import org.apache.htrace.Trace;
-import org.apache.htrace.TraceInfo;
-import org.apache.htrace.TraceScope;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
 
 /**
  * Does some stuff and traces it.
  */
-public class TraceCreator {
+public class TraceCreator implements TestRule {
+  private final List<SpanReceiver> receivers = new ArrayList<SpanReceiver>();
 
   public static final String RPC_TRACE_ROOT = "createSampleRpcTrace";
   public static final String THREADED_TRACE_ROOT = "createThreadedTrace";
   public static final String SIMPLE_TRACE_ROOT = "createSimpleTrace";
 
-  /**
-   * Takes as input the SpanReceiver that should used as the sink for Spans when
-   * createDemoTrace() is called.
-   *
-   * @param receiver
-   */
-  public TraceCreator(SpanReceiver receiver) {
+  public TraceCreator addReceiver(SpanReceiver receiver) {
     Trace.addReceiver(receiver);
+    this.receivers.add(receiver);
+    return this;
   }
 
-  /**
-   * Takes as input the SpanReceivers that should used as the sink for Spans
-   * when createDemoTrace() is called.
-   *
-   * @param receivers
-   */
-  public TraceCreator(Collection<SpanReceiver> receivers) {
-    for (SpanReceiver receiver : receivers) {
-      Trace.addReceiver(receiver);
-    }
+  @Override
+  public Statement apply(final Statement base, Description description) {
+    return new Statement() {
+      @Override
+      public void evaluate() throws Throwable {
+        try {
+          base.evaluate();
+          for (SpanReceiver receiver : receivers) {
+            receiver.close();
+          }
+        } finally {
+          for (SpanReceiver receiver : receivers) {
+            Trace.removeReceiver(receiver);
+          }
+        }
+      }
+    };
   }
 
   public void createSampleRpcTrace() {
diff --git a/htrace-flume/src/test/java/org/apache/htrace/impl/TestFlumeSpanReceiver.java b/htrace-flume/src/test/java/org/apache/htrace/impl/TestFlumeSpanReceiver.java
index f7d3840..b7f8863 100644
--- a/htrace-flume/src/test/java/org/apache/htrace/impl/TestFlumeSpanReceiver.java
+++ b/htrace-flume/src/test/java/org/apache/htrace/impl/TestFlumeSpanReceiver.java
@@ -17,166 +17,69 @@
 
 package org.apache.htrace.impl;
 
-import java.io.IOException;
-import java.nio.charset.Charset;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import junit.framework.Assert;
-
-import org.apache.avro.AvroRemoteException;
-import org.apache.avro.ipc.Server;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.flume.EventDeliveryException;
-import org.apache.flume.FlumeException;
-import org.apache.flume.api.RpcTestUtils;
-import org.apache.flume.source.avro.AvroFlumeEvent;
-import org.apache.flume.source.avro.AvroSourceProtocol;
-import org.apache.flume.source.avro.Status;
 import org.apache.htrace.HTraceConfiguration;
 import org.apache.htrace.Span;
-import org.apache.htrace.SpanReceiver;
-import org.apache.htrace.Trace;
 import org.apache.htrace.TraceCreator;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
+import org.junit.Assert;
+import org.junit.Rule;
 import org.junit.Test;
 
+import java.io.IOException;
+
 public class TestFlumeSpanReceiver {
-  private static final Log LOG = LogFactory.getLog(TestFlumeSpanReceiver.class);
+  @Rule
+  public TraceCreator traceCreator = new TraceCreator();
+  @Rule
+  public FakeFlume flumeServer = new FakeFlume();
 
-  private static final String ROOT_SPAN_DESC = "ROOT";
+  @Test
+  public void testSimpleTraces() throws IOException, InterruptedException {
+    traceCreator.addReceiver(new FlumeSpanReceiver(
+        HTraceConfiguration.fromKeyValuePairs(
+            FlumeSpanReceiver.FLUME_PORT_KEY, Integer.toString(flumeServer.getPort())
+        )
+    ));
 
-  private SpanReceiver spanReceiver;
-  private Server flumeServer;
-  private TraceCreator traceCreator;
-
-  @BeforeClass
-  public static void setUpBeforeClass() throws Exception {
-  }
-
-  @AfterClass
-  public static void tearDownAfterClass() throws Exception {
+    Span rootSpan = new MilliSpan.Builder().
+        description("root").
+        traceId(1).
+        spanId(100).
+        processId("test").
+        begin(System.currentTimeMillis()).
+        build();
+    Span innerOne = rootSpan.child("Some good work");
+    Span innerTwo = innerOne.child("Some more good work");
+    innerTwo.stop();
+    Assert.assertTrue(flumeServer.nextEventBodyAsString().contains(innerTwo.getDescription()));
+    innerOne.stop();
+    Assert.assertTrue(flumeServer.nextEventBodyAsString().contains(innerOne.getDescription()));
+    rootSpan.addKVAnnotation("foo", "bar");
+    rootSpan.addTimelineAnnotation("timeline");
+    rootSpan.stop();
+    Assert.assertTrue(flumeServer.nextEventBodyAsString().contains(rootSpan.getDescription()));
   }
 
   @Test
-  public void testSimpleTraces() throws FlumeException,
-      EventDeliveryException, IOException {
-    AvroHandler avroHandler = null;
-    List<Span> spans = null;
-    try {
-      avroHandler = new AvroHandler();
-      startReceiver(null, avroHandler);
-      
-      spans = new ArrayList<Span>();
-      Span rootSpan = new MilliSpan.Builder().
-                  description(ROOT_SPAN_DESC).
-                  traceId(1).
-                  spanId(100).
-                  processId("test").
-                  begin(System.currentTimeMillis()).
-                  build();
-      Span innerOne = rootSpan.child("Some good work");
-      Span innerTwo = innerOne.child("Some more good work");
-      innerTwo.stop();
-      spans.add(innerTwo);
-      innerOne.stop();
-      spans.add(innerOne);
-      rootSpan.addKVAnnotation("foo".getBytes(), "bar".getBytes());
-      rootSpan.addTimelineAnnotation("timeline");
-      rootSpan.stop();
-      spans.add(rootSpan);
+  public void testConcurrency() throws IOException {
+    traceCreator.addReceiver(new FlumeSpanReceiver(
+        HTraceConfiguration.fromKeyValuePairs(
+            FlumeSpanReceiver.FLUME_PORT_KEY, Integer.toString(flumeServer.getPort())
+        )
+    ));
 
-    } finally {
-      stopReceiver();
-    }
-    List<AvroFlumeEvent> events = avroHandler.getAllEvents();
-    Assert.assertEquals(spans.size(), events.size());
-    for (int i = 0; i < spans.size(); i ++) {
-      String json = new String(events.get(i).getBody().array(), Charset.forName("UTF-8"));
-      Assert.assertTrue(json.contains(spans.get(i).getDescription()));
-    }
+    flumeServer.alwaysOk();
+    traceCreator.createThreadedTrace();
   }
 
   @Test
-  public void testConcurrency() throws FlumeException,
-      EventDeliveryException, IOException {
-    try {
-      Map<String, String> extraConf = new HashMap<String, String>();
-      extraConf.put(FlumeSpanReceiver.NUM_THREADS_KEY, "5");
-      startReceiver(extraConf, new RpcTestUtils.OKAvroHandler());
-      traceCreator.createThreadedTrace();
-    } finally {
-      stopReceiver();
-    }
-  }
+  public void testResilience() throws IOException {
+    traceCreator.addReceiver(new FlumeSpanReceiver(
+        HTraceConfiguration.fromKeyValuePairs(
+            FlumeSpanReceiver.FLUME_PORT_KEY, Integer.toString(flumeServer.getPort())
+        )
+    ));
 
-  @Test
-  public void testResilience() throws FlumeException,
-      EventDeliveryException, IOException {
-    try {
-      startReceiver(null, new RpcTestUtils.FailedAvroHandler());
-      traceCreator.createThreadedTrace();
-    } finally {
-      stopReceiver();
-    }
-  }
-
-  private void startReceiver(Map<String, String> extraConf, AvroSourceProtocol avroHandler) {
-    // Start Flume server
-    Assert.assertNull(flumeServer);
-    flumeServer = RpcTestUtils.startServer(avroHandler);
-
-    // Create and configure span receiver
-    Map<String, String> conf = new HashMap<String, String>();
-    conf.put(FlumeSpanReceiver.FLUME_HOSTNAME_KEY, "127.0.0.1");
-    conf.put(FlumeSpanReceiver.FLUME_PORT_KEY, Integer.toString(flumeServer.getPort()));
-    if (extraConf != null) {
-      conf.putAll(extraConf);
-    }
-    
-    spanReceiver = new FlumeSpanReceiver(HTraceConfiguration.fromMap(conf));
-
-    // Create trace creator, it will register our receiver
-    traceCreator = new TraceCreator(spanReceiver);
-  }
-
-  private void stopReceiver() throws IOException {
-    // Close span receiver
-    if (spanReceiver != null) {
-      Trace.removeReceiver(spanReceiver);
-      spanReceiver.close();
-      spanReceiver = null;
-    }
-
-    // Close Flume server
-    if (flumeServer != null) {
-      RpcTestUtils.stopServer(flumeServer);
-      flumeServer = null;
-    }
-  }
-  
-  private static class AvroHandler implements AvroSourceProtocol {
-    private ArrayList<AvroFlumeEvent> all_events = new ArrayList<AvroFlumeEvent>();
-    
-    public List<AvroFlumeEvent> getAllEvents() {
-      return new ArrayList<AvroFlumeEvent>(all_events);
-    }
-    
-    @Override
-    public Status append(AvroFlumeEvent event) throws AvroRemoteException {
-      all_events.add(event);
-      return Status.OK;
-    }
-
-    @Override
-    public Status appendBatch(List<AvroFlumeEvent> events) throws
-        AvroRemoteException {
-      all_events.addAll(events);
-      return Status.OK;
-    }
+    flumeServer.alwaysFail();
+    traceCreator.createThreadedTrace();
   }
 }
diff --git a/htrace-hbase/src/test/java/org/apache/htrace/impl/TestHBaseSpanReceiver.java b/htrace-hbase/src/test/java/org/apache/htrace/impl/TestHBaseSpanReceiver.java
index bf93220..549fddb 100644
--- a/htrace-hbase/src/test/java/org/apache/htrace/impl/TestHBaseSpanReceiver.java
+++ b/htrace-hbase/src/test/java/org/apache/htrace/impl/TestHBaseSpanReceiver.java
@@ -69,7 +69,7 @@
   public void testHBaseSpanReceiver() {
     Table htable = HBaseTestUtil.createTable(UTIL);
     SpanReceiver receiver = HBaseTestUtil.startReceiver(UTIL);
-    TraceCreator tc = new TraceCreator(receiver);
+    TraceCreator tc = new TraceCreator().addReceiver(receiver);
     tc.createThreadedTrace();
     tc.createSimpleTrace();
     tc.createSampleRpcTrace();
diff --git a/htrace-zipkin/pom.xml b/htrace-zipkin/pom.xml
index 53c21ea..ea10480 100644
--- a/htrace-zipkin/pom.xml
+++ b/htrace-zipkin/pom.xml
@@ -110,6 +110,18 @@
       <groupId>org.apache.htrace</groupId>
       <artifactId>htrace-core</artifactId>
       <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.htrace</groupId>
+      <artifactId>htrace-core</artifactId>
+      <version>${project.version}</version>
+      <classifier>tests</classifier>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+      <version>1.7.7</version>
       <scope>provided</scope>
     </dependency>
     <!-- Global deps. -->
diff --git a/htrace-zipkin/src/main/java/org/apache/htrace/impl/ZipkinSpanReceiver.java b/htrace-zipkin/src/main/java/org/apache/htrace/impl/ZipkinSpanReceiver.java
index f39b753..c0bacd7 100644
--- a/htrace-zipkin/src/main/java/org/apache/htrace/impl/ZipkinSpanReceiver.java
+++ b/htrace-zipkin/src/main/java/org/apache/htrace/impl/ZipkinSpanReceiver.java
@@ -208,7 +208,7 @@
     /**
      * scribe client to push zipkin spans
      */
-    private Scribe.Client scribeClient = null;
+    private Scribe.Iface scribe = null;
     private final ByteArrayOutputStream baos;
     private final TProtocol streamProtocol;
 
@@ -260,7 +260,7 @@
         if (dequeuedSpans.isEmpty()) continue;
 
         // If this is the first time through or there was an error re-connect
-        if (scribeClient == null) {
+        if (scribe == null) {
           startClient();
         }
         // Create a new list every time through so that the list doesn't change underneath
@@ -283,7 +283,7 @@
           }
 
           // Send the entries
-          scribeClient.Log(entries);
+          scribe.Log(entries);
           // clear the list for the next time through.
           dequeuedSpans.clear();
           // reset the error counter.
@@ -320,9 +320,9 @@
      */
     private void closeClient() {
       // close out the transport.
-      if (scribeClient != null) {
-        scribeClient.getInputProtocol().getTransport().close();
-        scribeClient = null;
+      if (scribe != null && scribe instanceof Scribe.Client) {
+        ((Scribe.Client) scribe).getInputProtocol().getTransport().close();
+        scribe = null;
       }
     }
 
@@ -330,19 +330,24 @@
      * Re-connect to Zipkin.
      */
     private void startClient() {
-      if (this.scribeClient == null) {
-        TTransport transport = new TFramedTransport(new TSocket(collectorHostname, collectorPort));
-        try {
-          transport.open();
-        } catch (TTransportException e) {
-          e.printStackTrace();
-        }
-        TProtocol protocol = protocolFactory.getProtocol(transport);
-        this.scribeClient = new Scribe.Client(protocol);
+      if (this.scribe == null) {
+        this.scribe = newScribe();
       }
     }
   }
 
+  // Override for testing
+  Scribe.Iface newScribe() {
+    TTransport transport = new TFramedTransport(new TSocket(collectorHostname, collectorPort));
+    try {
+      transport.open();
+    } catch (TTransportException e) {
+      e.printStackTrace();
+    }
+    TProtocol protocol = protocolFactory.getProtocol(transport);
+    return new Scribe.Client(protocol);
+  }
+
   /**
    * Close the receiver.
    * <p/>
diff --git a/htrace-zipkin/src/test/java/org/apache/htrace/TestHTraceSpanToZipkinSpan.java b/htrace-zipkin/src/test/java/org/apache/htrace/TestHTraceSpanToZipkinSpan.java
index de4a8cd..a6790f5 100644
--- a/htrace-zipkin/src/test/java/org/apache/htrace/TestHTraceSpanToZipkinSpan.java
+++ b/htrace-zipkin/src/test/java/org/apache/htrace/TestHTraceSpanToZipkinSpan.java
@@ -15,10 +15,11 @@
  * limitations under the License.
  */
 
-package org.apache.htrace;
+package org.apache.htrace.zipkin;
 
 import com.twitter.zipkin.gen.zipkinCoreConstants;
 
+import org.apache.htrace.HTraceConfiguration;
 import org.apache.htrace.Span;
 import org.apache.htrace.Trace;
 import org.apache.htrace.impl.MilliSpan;
@@ -57,7 +58,7 @@
     Span innerTwo = innerOne.child("Some more good work");
     innerTwo.stop();
     innerOne.stop();
-    rootSpan.addKVAnnotation("foo".getBytes(), "bar".getBytes());
+    rootSpan.addKVAnnotation("foo", "bar");
     rootSpan.addTimelineAnnotation("timeline");
     rootSpan.stop();