HTRACE-213 Add test for ZipkinSpanReceiver (Adrian Cole)
diff --git a/htrace-core/src/main/java/org/apache/htrace/HTraceConfiguration.java b/htrace-core/src/main/java/org/apache/htrace/HTraceConfiguration.java
index f4896e9..4580dff 100644
--- a/htrace-core/src/main/java/org/apache/htrace/HTraceConfiguration.java
+++ b/htrace-core/src/main/java/org/apache/htrace/HTraceConfiguration.java
@@ -47,7 +47,7 @@
return new MapConf(conf);
}
- static HTraceConfiguration fromKeyValuePairs(String... pairs) {
+ public static HTraceConfiguration fromKeyValuePairs(String... pairs) {
if ((pairs.length % 2) != 0) {
throw new RuntimeException("You must specify an equal number of keys " +
"and values.");
diff --git a/htrace-core/src/test/java/org/apache/htrace/TestHTrace.java b/htrace-core/src/test/java/org/apache/htrace/TestHTrace.java
index 0ab8f35..13338e9 100644
--- a/htrace-core/src/test/java/org/apache/htrace/TestHTrace.java
+++ b/htrace-core/src/test/java/org/apache/htrace/TestHTrace.java
@@ -16,25 +16,24 @@
*/
package org.apache.htrace;
-import org.apache.htrace.HTraceConfiguration;
-import org.apache.htrace.Span;
-import org.apache.htrace.SpanReceiver;
-import org.apache.htrace.TraceTree;
import org.apache.htrace.TraceTree.SpansByParent;
import org.apache.htrace.impl.LocalFileSpanReceiver;
import org.apache.htrace.impl.POJOSpanReceiver;
import org.apache.htrace.impl.StandardOutSpanReceiver;
import org.junit.Assert;
+import org.junit.Rule;
import org.junit.Test;
import java.io.File;
import java.util.Collection;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.Map;
public class TestHTrace {
+ @Rule
+ public TraceCreator traceCreator = new TraceCreator();
+
public static final String SPAN_FILE_FLAG = "spanFile";
/**
@@ -47,8 +46,6 @@
final int numTraces = 3;
String fileName = System.getProperty(SPAN_FILE_FLAG);
- Collection<SpanReceiver> rcvrs = new HashSet<SpanReceiver>();
-
// writes spans to a file if one is provided to maven with
// -DspanFile="FILENAME", otherwise writes to standard out.
if (fileName != null) {
@@ -62,58 +59,52 @@
conf.put("local-file-span-receiver.path", fileName);
LocalFileSpanReceiver receiver =
new LocalFileSpanReceiver(HTraceConfiguration.fromMap(conf));
- rcvrs.add(receiver);
+ traceCreator.addReceiver(receiver);
} else {
- rcvrs.add(new StandardOutSpanReceiver(HTraceConfiguration.EMPTY));
+ traceCreator.addReceiver(new StandardOutSpanReceiver(HTraceConfiguration.EMPTY));
}
- POJOSpanReceiver psr = new POJOSpanReceiver(HTraceConfiguration.EMPTY);
- rcvrs.add(psr);
- runTraceCreatorTraces(new TraceCreator(rcvrs));
+ traceCreator.addReceiver(new POJOSpanReceiver(HTraceConfiguration.EMPTY){
+ @Override
+ public void close() {
+ TraceTree traceTree = new TraceTree(getSpans());
+ Collection<Span> roots = traceTree.getSpansByParent().find(0);
+ Assert.assertTrue("Trace tree must have roots", !roots.isEmpty());
+ Assert.assertEquals(numTraces, roots.size());
- for (SpanReceiver receiver : rcvrs) {
- receiver.close();
- }
+ Map<String, Span> descriptionToRootSpan = new HashMap<String, Span>();
+ for (Span root : roots) {
+ descriptionToRootSpan.put(root.getDescription(), root);
+ }
- Collection<Span> spans = psr.getSpans();
- TraceTree traceTree = new TraceTree(spans);
- Collection<Span> roots = traceTree.getSpansByParent().find(0);
- Assert.assertTrue("Trace tree must have roots", !roots.isEmpty());
- Assert.assertEquals(numTraces, roots.size());
+ Assert.assertTrue(descriptionToRootSpan.keySet().contains(
+ TraceCreator.RPC_TRACE_ROOT));
+ Assert.assertTrue(descriptionToRootSpan.keySet().contains(
+ TraceCreator.SIMPLE_TRACE_ROOT));
+ Assert.assertTrue(descriptionToRootSpan.keySet().contains(
+ TraceCreator.THREADED_TRACE_ROOT));
- Map<String, Span> descriptionToRootSpan = new HashMap<String, Span>();
- for (Span root : roots) {
- descriptionToRootSpan.put(root.getDescription(), root);
- }
+ SpansByParent spansByParentId = traceTree.getSpansByParent();
+ Span rpcTraceRoot = descriptionToRootSpan.get(TraceCreator.RPC_TRACE_ROOT);
+ Assert.assertEquals(1, spansByParentId.find(rpcTraceRoot.getSpanId()).size());
- Assert.assertTrue(descriptionToRootSpan.keySet().contains(
- TraceCreator.RPC_TRACE_ROOT));
- Assert.assertTrue(descriptionToRootSpan.keySet().contains(
- TraceCreator.SIMPLE_TRACE_ROOT));
- Assert.assertTrue(descriptionToRootSpan.keySet().contains(
- TraceCreator.THREADED_TRACE_ROOT));
+ Span rpcTraceChild1 = spansByParentId.find(rpcTraceRoot.getSpanId())
+ .iterator().next();
+ Assert.assertEquals(1, spansByParentId.find(rpcTraceChild1.getSpanId()).size());
- SpansByParent spansByParentId = traceTree.getSpansByParent();
- Span rpcTraceRoot = descriptionToRootSpan.get(TraceCreator.RPC_TRACE_ROOT);
- Assert.assertEquals(1, spansByParentId.find(rpcTraceRoot.getSpanId()).size());
+ Span rpcTraceChild2 = spansByParentId.find(rpcTraceChild1.getSpanId())
+ .iterator().next();
+ Assert.assertEquals(1, spansByParentId.find(rpcTraceChild2.getSpanId()).size());
- Span rpcTraceChild1 = spansByParentId.find(rpcTraceRoot.getSpanId())
- .iterator().next();
- Assert.assertEquals(1, spansByParentId.find(rpcTraceChild1.getSpanId()).size());
+ Span rpcTraceChild3 = spansByParentId.find(rpcTraceChild2.getSpanId())
+ .iterator().next();
+ Assert.assertEquals(0, spansByParentId.find(rpcTraceChild3.getSpanId()).size());
+ }
+ });
- Span rpcTraceChild2 = spansByParentId.find(rpcTraceChild1.getSpanId())
- .iterator().next();
- Assert.assertEquals(1, spansByParentId.find(rpcTraceChild2.getSpanId()).size());
-
- Span rpcTraceChild3 = spansByParentId.find(rpcTraceChild2.getSpanId())
- .iterator().next();
- Assert.assertEquals(0, spansByParentId.find(rpcTraceChild3.getSpanId()).size());
- }
-
- private void runTraceCreatorTraces(TraceCreator tc) {
- tc.createThreadedTrace();
- tc.createSimpleTrace();
- tc.createSampleRpcTrace();
+ traceCreator.createThreadedTrace();
+ traceCreator.createSimpleTrace();
+ traceCreator.createSampleRpcTrace();
}
@Test(timeout=60000)
diff --git a/htrace-core/src/test/java/org/apache/htrace/TraceCreator.java b/htrace-core/src/test/java/org/apache/htrace/TraceCreator.java
index 7ec6309..565ba05 100644
--- a/htrace-core/src/test/java/org/apache/htrace/TraceCreator.java
+++ b/htrace-core/src/test/java/org/apache/htrace/TraceCreator.java
@@ -16,45 +16,50 @@
*/
package org.apache.htrace;
+import org.junit.rules.TestRule;
+import org.junit.runner.Description;
+import org.junit.runners.model.Statement;
+
import java.util.Collection;
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;
-
-import org.apache.htrace.Sampler;
-import org.apache.htrace.SpanReceiver;
-import org.apache.htrace.Trace;
-import org.apache.htrace.TraceInfo;
-import org.apache.htrace.TraceScope;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
/**
* Does some stuff and traces it.
*/
-public class TraceCreator {
+public class TraceCreator implements TestRule {
+ private final List<SpanReceiver> receivers = new ArrayList<SpanReceiver>();
public static final String RPC_TRACE_ROOT = "createSampleRpcTrace";
public static final String THREADED_TRACE_ROOT = "createThreadedTrace";
public static final String SIMPLE_TRACE_ROOT = "createSimpleTrace";
- /**
- * Takes as input the SpanReceiver that should used as the sink for Spans when
- * createDemoTrace() is called.
- *
- * @param receiver
- */
- public TraceCreator(SpanReceiver receiver) {
+ public TraceCreator addReceiver(SpanReceiver receiver) {
Trace.addReceiver(receiver);
+ this.receivers.add(receiver);
+ return this;
}
- /**
- * Takes as input the SpanReceivers that should used as the sink for Spans
- * when createDemoTrace() is called.
- *
- * @param receivers
- */
- public TraceCreator(Collection<SpanReceiver> receivers) {
- for (SpanReceiver receiver : receivers) {
- Trace.addReceiver(receiver);
- }
+ @Override
+ public Statement apply(final Statement base, Description description) {
+ return new Statement() {
+ @Override
+ public void evaluate() throws Throwable {
+ try {
+ base.evaluate();
+ for (SpanReceiver receiver : receivers) {
+ receiver.close();
+ }
+ } finally {
+ for (SpanReceiver receiver : receivers) {
+ Trace.removeReceiver(receiver);
+ }
+ }
+ }
+ };
}
public void createSampleRpcTrace() {
diff --git a/htrace-flume/src/test/java/org/apache/htrace/impl/TestFlumeSpanReceiver.java b/htrace-flume/src/test/java/org/apache/htrace/impl/TestFlumeSpanReceiver.java
index f7d3840..b7f8863 100644
--- a/htrace-flume/src/test/java/org/apache/htrace/impl/TestFlumeSpanReceiver.java
+++ b/htrace-flume/src/test/java/org/apache/htrace/impl/TestFlumeSpanReceiver.java
@@ -17,166 +17,69 @@
package org.apache.htrace.impl;
-import java.io.IOException;
-import java.nio.charset.Charset;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import junit.framework.Assert;
-
-import org.apache.avro.AvroRemoteException;
-import org.apache.avro.ipc.Server;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.flume.EventDeliveryException;
-import org.apache.flume.FlumeException;
-import org.apache.flume.api.RpcTestUtils;
-import org.apache.flume.source.avro.AvroFlumeEvent;
-import org.apache.flume.source.avro.AvroSourceProtocol;
-import org.apache.flume.source.avro.Status;
import org.apache.htrace.HTraceConfiguration;
import org.apache.htrace.Span;
-import org.apache.htrace.SpanReceiver;
-import org.apache.htrace.Trace;
import org.apache.htrace.TraceCreator;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
+import org.junit.Assert;
+import org.junit.Rule;
import org.junit.Test;
+import java.io.IOException;
+
public class TestFlumeSpanReceiver {
- private static final Log LOG = LogFactory.getLog(TestFlumeSpanReceiver.class);
+ @Rule
+ public TraceCreator traceCreator = new TraceCreator();
+ @Rule
+ public FakeFlume flumeServer = new FakeFlume();
- private static final String ROOT_SPAN_DESC = "ROOT";
+ @Test
+ public void testSimpleTraces() throws IOException, InterruptedException {
+ traceCreator.addReceiver(new FlumeSpanReceiver(
+ HTraceConfiguration.fromKeyValuePairs(
+ FlumeSpanReceiver.FLUME_PORT_KEY, Integer.toString(flumeServer.getPort())
+ )
+ ));
- private SpanReceiver spanReceiver;
- private Server flumeServer;
- private TraceCreator traceCreator;
-
- @BeforeClass
- public static void setUpBeforeClass() throws Exception {
- }
-
- @AfterClass
- public static void tearDownAfterClass() throws Exception {
+ Span rootSpan = new MilliSpan.Builder().
+ description("root").
+ traceId(1).
+ spanId(100).
+ processId("test").
+ begin(System.currentTimeMillis()).
+ build();
+ Span innerOne = rootSpan.child("Some good work");
+ Span innerTwo = innerOne.child("Some more good work");
+ innerTwo.stop();
+ Assert.assertTrue(flumeServer.nextEventBodyAsString().contains(innerTwo.getDescription()));
+ innerOne.stop();
+ Assert.assertTrue(flumeServer.nextEventBodyAsString().contains(innerOne.getDescription()));
+ rootSpan.addKVAnnotation("foo", "bar");
+ rootSpan.addTimelineAnnotation("timeline");
+ rootSpan.stop();
+ Assert.assertTrue(flumeServer.nextEventBodyAsString().contains(rootSpan.getDescription()));
}
@Test
- public void testSimpleTraces() throws FlumeException,
- EventDeliveryException, IOException {
- AvroHandler avroHandler = null;
- List<Span> spans = null;
- try {
- avroHandler = new AvroHandler();
- startReceiver(null, avroHandler);
-
- spans = new ArrayList<Span>();
- Span rootSpan = new MilliSpan.Builder().
- description(ROOT_SPAN_DESC).
- traceId(1).
- spanId(100).
- processId("test").
- begin(System.currentTimeMillis()).
- build();
- Span innerOne = rootSpan.child("Some good work");
- Span innerTwo = innerOne.child("Some more good work");
- innerTwo.stop();
- spans.add(innerTwo);
- innerOne.stop();
- spans.add(innerOne);
- rootSpan.addKVAnnotation("foo".getBytes(), "bar".getBytes());
- rootSpan.addTimelineAnnotation("timeline");
- rootSpan.stop();
- spans.add(rootSpan);
+ public void testConcurrency() throws IOException {
+ traceCreator.addReceiver(new FlumeSpanReceiver(
+ HTraceConfiguration.fromKeyValuePairs(
+ FlumeSpanReceiver.FLUME_PORT_KEY, Integer.toString(flumeServer.getPort())
+ )
+ ));
- } finally {
- stopReceiver();
- }
- List<AvroFlumeEvent> events = avroHandler.getAllEvents();
- Assert.assertEquals(spans.size(), events.size());
- for (int i = 0; i < spans.size(); i ++) {
- String json = new String(events.get(i).getBody().array(), Charset.forName("UTF-8"));
- Assert.assertTrue(json.contains(spans.get(i).getDescription()));
- }
+ flumeServer.alwaysOk();
+ traceCreator.createThreadedTrace();
}
@Test
- public void testConcurrency() throws FlumeException,
- EventDeliveryException, IOException {
- try {
- Map<String, String> extraConf = new HashMap<String, String>();
- extraConf.put(FlumeSpanReceiver.NUM_THREADS_KEY, "5");
- startReceiver(extraConf, new RpcTestUtils.OKAvroHandler());
- traceCreator.createThreadedTrace();
- } finally {
- stopReceiver();
- }
- }
+ public void testResilience() throws IOException {
+ traceCreator.addReceiver(new FlumeSpanReceiver(
+ HTraceConfiguration.fromKeyValuePairs(
+ FlumeSpanReceiver.FLUME_PORT_KEY, Integer.toString(flumeServer.getPort())
+ )
+ ));
- @Test
- public void testResilience() throws FlumeException,
- EventDeliveryException, IOException {
- try {
- startReceiver(null, new RpcTestUtils.FailedAvroHandler());
- traceCreator.createThreadedTrace();
- } finally {
- stopReceiver();
- }
- }
-
- private void startReceiver(Map<String, String> extraConf, AvroSourceProtocol avroHandler) {
- // Start Flume server
- Assert.assertNull(flumeServer);
- flumeServer = RpcTestUtils.startServer(avroHandler);
-
- // Create and configure span receiver
- Map<String, String> conf = new HashMap<String, String>();
- conf.put(FlumeSpanReceiver.FLUME_HOSTNAME_KEY, "127.0.0.1");
- conf.put(FlumeSpanReceiver.FLUME_PORT_KEY, Integer.toString(flumeServer.getPort()));
- if (extraConf != null) {
- conf.putAll(extraConf);
- }
-
- spanReceiver = new FlumeSpanReceiver(HTraceConfiguration.fromMap(conf));
-
- // Create trace creator, it will register our receiver
- traceCreator = new TraceCreator(spanReceiver);
- }
-
- private void stopReceiver() throws IOException {
- // Close span receiver
- if (spanReceiver != null) {
- Trace.removeReceiver(spanReceiver);
- spanReceiver.close();
- spanReceiver = null;
- }
-
- // Close Flume server
- if (flumeServer != null) {
- RpcTestUtils.stopServer(flumeServer);
- flumeServer = null;
- }
- }
-
- private static class AvroHandler implements AvroSourceProtocol {
- private ArrayList<AvroFlumeEvent> all_events = new ArrayList<AvroFlumeEvent>();
-
- public List<AvroFlumeEvent> getAllEvents() {
- return new ArrayList<AvroFlumeEvent>(all_events);
- }
-
- @Override
- public Status append(AvroFlumeEvent event) throws AvroRemoteException {
- all_events.add(event);
- return Status.OK;
- }
-
- @Override
- public Status appendBatch(List<AvroFlumeEvent> events) throws
- AvroRemoteException {
- all_events.addAll(events);
- return Status.OK;
- }
+ flumeServer.alwaysFail();
+ traceCreator.createThreadedTrace();
}
}
diff --git a/htrace-hbase/src/test/java/org/apache/htrace/impl/TestHBaseSpanReceiver.java b/htrace-hbase/src/test/java/org/apache/htrace/impl/TestHBaseSpanReceiver.java
index bf93220..549fddb 100644
--- a/htrace-hbase/src/test/java/org/apache/htrace/impl/TestHBaseSpanReceiver.java
+++ b/htrace-hbase/src/test/java/org/apache/htrace/impl/TestHBaseSpanReceiver.java
@@ -69,7 +69,7 @@
public void testHBaseSpanReceiver() {
Table htable = HBaseTestUtil.createTable(UTIL);
SpanReceiver receiver = HBaseTestUtil.startReceiver(UTIL);
- TraceCreator tc = new TraceCreator(receiver);
+ TraceCreator tc = new TraceCreator().addReceiver(receiver);
tc.createThreadedTrace();
tc.createSimpleTrace();
tc.createSampleRpcTrace();
diff --git a/htrace-zipkin/pom.xml b/htrace-zipkin/pom.xml
index 53c21ea..ea10480 100644
--- a/htrace-zipkin/pom.xml
+++ b/htrace-zipkin/pom.xml
@@ -110,6 +110,18 @@
<groupId>org.apache.htrace</groupId>
<artifactId>htrace-core</artifactId>
<version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.htrace</groupId>
+ <artifactId>htrace-core</artifactId>
+ <version>${project.version}</version>
+ <classifier>tests</classifier>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ <version>1.7.7</version>
<scope>provided</scope>
</dependency>
<!-- Global deps. -->
diff --git a/htrace-zipkin/src/main/java/org/apache/htrace/impl/ZipkinSpanReceiver.java b/htrace-zipkin/src/main/java/org/apache/htrace/impl/ZipkinSpanReceiver.java
index f39b753..c0bacd7 100644
--- a/htrace-zipkin/src/main/java/org/apache/htrace/impl/ZipkinSpanReceiver.java
+++ b/htrace-zipkin/src/main/java/org/apache/htrace/impl/ZipkinSpanReceiver.java
@@ -208,7 +208,7 @@
/**
* scribe client to push zipkin spans
*/
- private Scribe.Client scribeClient = null;
+ private Scribe.Iface scribe = null;
private final ByteArrayOutputStream baos;
private final TProtocol streamProtocol;
@@ -260,7 +260,7 @@
if (dequeuedSpans.isEmpty()) continue;
// If this is the first time through or there was an error re-connect
- if (scribeClient == null) {
+ if (scribe == null) {
startClient();
}
// Create a new list every time through so that the list doesn't change underneath
@@ -283,7 +283,7 @@
}
// Send the entries
- scribeClient.Log(entries);
+ scribe.Log(entries);
// clear the list for the next time through.
dequeuedSpans.clear();
// reset the error counter.
@@ -320,9 +320,9 @@
*/
private void closeClient() {
// close out the transport.
- if (scribeClient != null) {
- scribeClient.getInputProtocol().getTransport().close();
- scribeClient = null;
+ if (scribe != null && scribe instanceof Scribe.Client) {
+ ((Scribe.Client) scribe).getInputProtocol().getTransport().close();
+ scribe = null;
}
}
@@ -330,19 +330,24 @@
* Re-connect to Zipkin.
*/
private void startClient() {
- if (this.scribeClient == null) {
- TTransport transport = new TFramedTransport(new TSocket(collectorHostname, collectorPort));
- try {
- transport.open();
- } catch (TTransportException e) {
- e.printStackTrace();
- }
- TProtocol protocol = protocolFactory.getProtocol(transport);
- this.scribeClient = new Scribe.Client(protocol);
+ if (this.scribe == null) {
+ this.scribe = newScribe();
}
}
}
+ // Override for testing
+ Scribe.Iface newScribe() {
+ TTransport transport = new TFramedTransport(new TSocket(collectorHostname, collectorPort));
+ try {
+ transport.open();
+ } catch (TTransportException e) {
+ e.printStackTrace();
+ }
+ TProtocol protocol = protocolFactory.getProtocol(transport);
+ return new Scribe.Client(protocol);
+ }
+
/**
* Close the receiver.
* <p/>
diff --git a/htrace-zipkin/src/test/java/org/apache/htrace/TestHTraceSpanToZipkinSpan.java b/htrace-zipkin/src/test/java/org/apache/htrace/TestHTraceSpanToZipkinSpan.java
index de4a8cd..a6790f5 100644
--- a/htrace-zipkin/src/test/java/org/apache/htrace/TestHTraceSpanToZipkinSpan.java
+++ b/htrace-zipkin/src/test/java/org/apache/htrace/TestHTraceSpanToZipkinSpan.java
@@ -15,10 +15,11 @@
* limitations under the License.
*/
-package org.apache.htrace;
+package org.apache.htrace.zipkin;
import com.twitter.zipkin.gen.zipkinCoreConstants;
+import org.apache.htrace.HTraceConfiguration;
import org.apache.htrace.Span;
import org.apache.htrace.Trace;
import org.apache.htrace.impl.MilliSpan;
@@ -57,7 +58,7 @@
Span innerTwo = innerOne.child("Some more good work");
innerTwo.stop();
innerOne.stop();
- rootSpan.addKVAnnotation("foo".getBytes(), "bar".getBytes());
+ rootSpan.addKVAnnotation("foo", "bar");
rootSpan.addTimelineAnnotation("timeline");
rootSpan.stop();