Add Zipkin collector support to HTrace.
* Added thrift files to talk to zipkin
* Added ZipkinSpanReceiver.that batches spans to zipkin
* Small changes to Trace
* Changed Process to be closer to Service.
* Bumped version to 2.00 since there were breaking changes.
* Make HTrace multi module
* Created htrace-core
* Created htrace-zipkin
Significant work by himanshu@cloudera.com and eclark@apache.org
diff --git a/htrace-core/pom.xml b/htrace-core/pom.xml
new file mode 100644
index 0000000..031dcf3
--- /dev/null
+++ b/htrace-core/pom.xml
@@ -0,0 +1,103 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <groupId>org.cloudera.htrace</groupId>
+ <artifactId>htrace-core</artifactId>
+ <version>2.00-SNAPSHOT</version>
+ <packaging>jar</packaging>
+
+
+ <parent>
+ <artifactId>htrace</artifactId>
+ <groupId>org.cloudera.htrace</groupId>
+ <version>2.00-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+
+ <name>htrace-core</name>
+ <url>https://github.com/cloudera/htrace</url>
+
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ </properties>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-source-plugin</artifactId>
+ <version>2.1.2</version>
+ <executions>
+ <execution>
+ <id>attach-sources</id>
+ <phase>package</phase>
+ <goals>
+ <goal>jar-no-fork</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <artifactId>maven-javadoc-plugin</artifactId>
+ <version>2.8.1</version>
+ <executions>
+ <execution>
+ <id>attach-javadocs</id>
+ <phase>package</phase>
+ <goals>
+ <goal>jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>2.5.1</version>
+ <configuration>
+ <source>1.6</source>
+ <target>1.6</target>
+ <optimize>true</optimize>
+ <encoding>UTF-8</encoding>
+ </configuration>
+ </plugin>
+ <plugin>
+ <!-- explicitly define maven-deploy-plugin after other to force exec order -->
+ <artifactId>maven-deploy-plugin</artifactId>
+ <version>2.7</version>
+ <executions>
+ <execution>
+ <id>deploy</id>
+ <phase>deploy</phase>
+ <goals>
+ <goal>deploy</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+ <dependencies>
+ <dependency>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>jetty-util</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>commons-logging</groupId>
+ <artifactId>commons-logging</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>commons-codec</groupId>
+ <artifactId>commons-codec</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+</project>
diff --git a/src/main/java/org/cloudera/htrace/HTraceConfiguration.java b/htrace-core/src/main/java/org/cloudera/htrace/HTraceConfiguration.java
similarity index 70%
rename from src/main/java/org/cloudera/htrace/HTraceConfiguration.java
rename to htrace-core/src/main/java/org/cloudera/htrace/HTraceConfiguration.java
index f887dac..7d75199 100644
--- a/src/main/java/org/cloudera/htrace/HTraceConfiguration.java
+++ b/htrace-core/src/main/java/org/cloudera/htrace/HTraceConfiguration.java
@@ -16,6 +16,9 @@
*/
package org.cloudera.htrace;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
import java.util.HashMap;
import java.util.Map;
@@ -24,12 +27,30 @@
* to provide tracing configuration.
*/
public abstract class HTraceConfiguration {
- public abstract String get(String key);
+
+ private static final Log LOG = LogFactory.getLog(HTraceConfiguration.class);
public static HTraceConfiguration fromMap(Map<String, String> conf) {
return new MapConf(conf);
}
+ public abstract String get(String key);
+
+ public abstract String get(String key, String defaultValue);
+
+ public boolean getBoolean(String key, boolean defaultValue) {
+ String value = get(key, String.valueOf(defaultValue)).trim().toLowerCase();
+
+ if ("true".equals(value)) {
+ return true;
+ } else if ("false".equals(value)) {
+ return true;
+ }
+
+ LOG.warn("Expected boolean for key ["+key+"] instead got ["+value+"].");
+ return defaultValue;
+ }
+
public int getInt(String key, int defaultVal) {
String val = get(key);
if (val == null || val.trim().isEmpty()) {
@@ -41,7 +62,7 @@
throw new IllegalArgumentException("Bad value for '" + key + "': should be int");
}
}
-
+
private static class MapConf extends HTraceConfiguration {
private final Map<String, String> conf;
@@ -53,5 +74,11 @@
public String get(String key) {
return conf.get(key);
}
+
+ @Override
+ public String get(String key, String defaultValue) {
+ String value = get(key);
+ return value == null ? defaultValue : value;
+ }
}
}
diff --git a/src/main/java/org/cloudera/htrace/Sampler.java b/htrace-core/src/main/java/org/cloudera/htrace/Sampler.java
similarity index 100%
rename from src/main/java/org/cloudera/htrace/Sampler.java
rename to htrace-core/src/main/java/org/cloudera/htrace/Sampler.java
diff --git a/src/main/java/org/cloudera/htrace/Span.java b/htrace-core/src/main/java/org/cloudera/htrace/Span.java
similarity index 100%
rename from src/main/java/org/cloudera/htrace/Span.java
rename to htrace-core/src/main/java/org/cloudera/htrace/Span.java
diff --git a/src/main/java/org/cloudera/htrace/SpanReceiver.java b/htrace-core/src/main/java/org/cloudera/htrace/SpanReceiver.java
similarity index 100%
rename from src/main/java/org/cloudera/htrace/SpanReceiver.java
rename to htrace-core/src/main/java/org/cloudera/htrace/SpanReceiver.java
diff --git a/src/main/java/org/cloudera/htrace/TimelineAnnotation.java b/htrace-core/src/main/java/org/cloudera/htrace/TimelineAnnotation.java
similarity index 100%
rename from src/main/java/org/cloudera/htrace/TimelineAnnotation.java
rename to htrace-core/src/main/java/org/cloudera/htrace/TimelineAnnotation.java
diff --git a/src/main/java/org/cloudera/htrace/Trace.java b/htrace-core/src/main/java/org/cloudera/htrace/Trace.java
similarity index 86%
rename from src/main/java/org/cloudera/htrace/Trace.java
rename to htrace-core/src/main/java/org/cloudera/htrace/Trace.java
index 18aa8fb..3c9a5e9 100644
--- a/src/main/java/org/cloudera/htrace/Trace.java
+++ b/htrace-core/src/main/java/org/cloudera/htrace/Trace.java
@@ -16,15 +16,15 @@
*/
package org.cloudera.htrace;
-import java.security.SecureRandom;
-import java.util.Random;
-import java.util.concurrent.Callable;
-
import org.cloudera.htrace.impl.MilliSpan;
import org.cloudera.htrace.impl.TrueIfTracingSampler;
import org.cloudera.htrace.wrappers.TraceCallable;
import org.cloudera.htrace.wrappers.TraceRunnable;
+import java.security.SecureRandom;
+import java.util.Random;
+import java.util.concurrent.Callable;
+
/**
* The primary way to interact with the library. Provides methods to start
* spans, as well as set necessary tracing information.
@@ -58,6 +58,7 @@
* @return
*/
public static TraceScope startSpan(String description, Span parent) {
+ if (parent == null) return startSpan(description);
return continueSpan(parent.child(description));
}
@@ -72,6 +73,15 @@
return startSpan(description, s, null);
}
+ public static TraceScope startSpan(String description, Sampler<TraceInfo> s, TraceInfo tinfo) {
+ Span span = null;
+ if (isTracing() || s.next(tinfo)) {
+ span = new MilliSpan(description, tinfo.traceId, tinfo.spanId,
+ random.nextLong(), Tracer.getProcessId());
+ }
+ return continueSpan(span);
+ }
+
public static <T> TraceScope startSpan(String description, Sampler<T> s, T info) {
Span span = null;
if (isTracing() || s.next(info)) {
@@ -184,4 +194,19 @@
return runnable;
}
}
+
+ /**
+ * Wrap the runnable in a TraceRunnable, if tracing
+ *
+ * @param description name of the span to be created.
+ * @param runnable
+ * @return The runnable provided, wrapped if tracing, 'runnable' if not.
+ */
+ public static Runnable wrap(String description, Runnable runnable) {
+ if (isTracing()) {
+ return new TraceRunnable(Trace.currentSpan(), runnable, description);
+ } else {
+ return runnable;
+ }
+ }
}
diff --git a/src/main/java/org/cloudera/htrace/TraceInfo.java b/htrace-core/src/main/java/org/cloudera/htrace/TraceInfo.java
similarity index 100%
rename from src/main/java/org/cloudera/htrace/TraceInfo.java
rename to htrace-core/src/main/java/org/cloudera/htrace/TraceInfo.java
diff --git a/src/main/java/org/cloudera/htrace/TraceScope.java b/htrace-core/src/main/java/org/cloudera/htrace/TraceScope.java
similarity index 100%
rename from src/main/java/org/cloudera/htrace/TraceScope.java
rename to htrace-core/src/main/java/org/cloudera/htrace/TraceScope.java
diff --git a/src/main/java/org/cloudera/htrace/TraceTree.java b/htrace-core/src/main/java/org/cloudera/htrace/TraceTree.java
similarity index 99%
rename from src/main/java/org/cloudera/htrace/TraceTree.java
rename to htrace-core/src/main/java/org/cloudera/htrace/TraceTree.java
index 783e86c..d83091f 100644
--- a/src/main/java/org/cloudera/htrace/TraceTree.java
+++ b/htrace-core/src/main/java/org/cloudera/htrace/TraceTree.java
@@ -16,13 +16,12 @@
*/
package org.cloudera.htrace;
-import java.util.Collection;
-
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Multimap;
+import java.util.Collection;
/**
* Used to create the graph formed by spans.
diff --git a/src/main/java/org/cloudera/htrace/Tracer.java b/htrace-core/src/main/java/org/cloudera/htrace/Tracer.java
similarity index 94%
rename from src/main/java/org/cloudera/htrace/Tracer.java
rename to htrace-core/src/main/java/org/cloudera/htrace/Tracer.java
index 9efe627..4fc1441 100644
--- a/src/main/java/org/cloudera/htrace/Tracer.java
+++ b/htrace-core/src/main/java/org/cloudera/htrace/Tracer.java
@@ -16,16 +16,15 @@
*/
package org.cloudera.htrace;
-import java.lang.management.ManagementFactory;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.cloudera.htrace.impl.MilliSpan;
+
import java.security.SecureRandom;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CopyOnWriteArrayList;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.cloudera.htrace.impl.MilliSpan;
-
/**
* A Tracer provides the implementation for collecting and distributing Spans
* within a process.
@@ -105,7 +104,6 @@
static String getProcessId() {
if (processId == null) {
- String mxBeanName = ManagementFactory.getRuntimeMXBean().getName();
String cmdLine = System.getProperty("sun.java.command");
if (cmdLine != null && !cmdLine.isEmpty()) {
String fullClassName = cmdLine.split("\\s+")[0];
@@ -113,7 +111,7 @@
cmdLine = classParts[classParts.length - 1];
}
- processId = cmdLine + " (" + mxBeanName + ")";
+ processId = (cmdLine == null || cmdLine.isEmpty())?"Unknown":cmdLine;
}
return processId;
}
diff --git a/src/main/java/org/cloudera/htrace/impl/AlwaysSampler.java b/htrace-core/src/main/java/org/cloudera/htrace/impl/AlwaysSampler.java
similarity index 100%
rename from src/main/java/org/cloudera/htrace/impl/AlwaysSampler.java
rename to htrace-core/src/main/java/org/cloudera/htrace/impl/AlwaysSampler.java
diff --git a/src/main/java/org/cloudera/htrace/impl/CountSampler.java b/htrace-core/src/main/java/org/cloudera/htrace/impl/CountSampler.java
similarity index 99%
rename from src/main/java/org/cloudera/htrace/impl/CountSampler.java
rename to htrace-core/src/main/java/org/cloudera/htrace/impl/CountSampler.java
index 2af4623..1e73804 100644
--- a/src/main/java/org/cloudera/htrace/impl/CountSampler.java
+++ b/htrace-core/src/main/java/org/cloudera/htrace/impl/CountSampler.java
@@ -16,10 +16,10 @@
*/
package org.cloudera.htrace.impl;
-import java.util.Random;
-
import org.cloudera.htrace.Sampler;
+import java.util.Random;
+
/**
* Sampler that returns true every N calls.
*
diff --git a/src/main/java/org/cloudera/htrace/impl/LocalFileSpanReceiver.java b/htrace-core/src/main/java/org/cloudera/htrace/impl/LocalFileSpanReceiver.java
similarity index 99%
rename from src/main/java/org/cloudera/htrace/impl/LocalFileSpanReceiver.java
rename to htrace-core/src/main/java/org/cloudera/htrace/impl/LocalFileSpanReceiver.java
index 97aac7a..b60f04a 100644
--- a/src/main/java/org/cloudera/htrace/impl/LocalFileSpanReceiver.java
+++ b/htrace-core/src/main/java/org/cloudera/htrace/impl/LocalFileSpanReceiver.java
@@ -16,6 +16,13 @@
*/
package org.cloudera.htrace.impl;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.cloudera.htrace.HTraceConfiguration;
+import org.cloudera.htrace.Span;
+import org.cloudera.htrace.SpanReceiver;
+import org.mortbay.util.ajax.JSON;
+
import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;
@@ -26,13 +33,6 @@
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.cloudera.htrace.HTraceConfiguration;
-import org.cloudera.htrace.Span;
-import org.cloudera.htrace.SpanReceiver;
-import org.mortbay.util.ajax.JSON;
-
/**
* Writes the spans it receives to a local file. For now I am ignoring the data
* (annotations) portion of the spans. A production LocalFileSpanReceiver should
diff --git a/src/main/java/org/cloudera/htrace/impl/MilliSpan.java b/htrace-core/src/main/java/org/cloudera/htrace/impl/MilliSpan.java
similarity index 97%
rename from src/main/java/org/cloudera/htrace/impl/MilliSpan.java
rename to htrace-core/src/main/java/org/cloudera/htrace/impl/MilliSpan.java
index 396627e..2a60405 100644
--- a/src/main/java/org/cloudera/htrace/impl/MilliSpan.java
+++ b/htrace-core/src/main/java/org/cloudera/htrace/impl/MilliSpan.java
@@ -16,6 +16,9 @@
*/
package org.cloudera.htrace.impl;
+import org.cloudera.htrace.Span;
+import org.cloudera.htrace.TimelineAnnotation;
+
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -23,9 +26,6 @@
import java.util.Map;
import java.util.Random;
-import org.cloudera.htrace.Span;
-import org.cloudera.htrace.TimelineAnnotation;
-
/**
* A Span implementation that stores its information in milliseconds since the
* epoch.
@@ -88,7 +88,7 @@
@Override
public String toString() {
return "start=" + start + "\nstop=" + stop + "\nparentId=" + parentSpanId
- + "\ndescription=" + description + "\nspanId=" + spanId
+ + "\ndescription=" + description + "\nspanId=" + spanId +"\ntraceId=" + traceId
+ "\ntraceInfo=" + traceInfo + "\nprocessId=" + processId
+ "\ntimeline=" + timeline;
}
diff --git a/src/main/java/org/cloudera/htrace/impl/NeverSampler.java b/htrace-core/src/main/java/org/cloudera/htrace/impl/NeverSampler.java
similarity index 100%
rename from src/main/java/org/cloudera/htrace/impl/NeverSampler.java
rename to htrace-core/src/main/java/org/cloudera/htrace/impl/NeverSampler.java
diff --git a/src/main/java/org/cloudera/htrace/impl/POJOSpanReceiver.java b/htrace-core/src/main/java/org/cloudera/htrace/impl/POJOSpanReceiver.java
similarity index 99%
rename from src/main/java/org/cloudera/htrace/impl/POJOSpanReceiver.java
rename to htrace-core/src/main/java/org/cloudera/htrace/impl/POJOSpanReceiver.java
index b08afaf..5ae37f9 100644
--- a/src/main/java/org/cloudera/htrace/impl/POJOSpanReceiver.java
+++ b/htrace-core/src/main/java/org/cloudera/htrace/impl/POJOSpanReceiver.java
@@ -1,13 +1,13 @@
package org.cloudera.htrace.impl;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.HashSet;
-
import org.cloudera.htrace.HTraceConfiguration;
import org.cloudera.htrace.Span;
import org.cloudera.htrace.SpanReceiver;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashSet;
+
/**
* SpanReceiver for testing only that just collects the Span objects it
* receives. The spans it receives can be accessed with getSpans();
diff --git a/src/main/java/org/cloudera/htrace/impl/ProbabilitySampler.java b/htrace-core/src/main/java/org/cloudera/htrace/impl/ProbabilitySampler.java
similarity index 99%
rename from src/main/java/org/cloudera/htrace/impl/ProbabilitySampler.java
rename to htrace-core/src/main/java/org/cloudera/htrace/impl/ProbabilitySampler.java
index 4a0ffe4..3dfb544 100644
--- a/src/main/java/org/cloudera/htrace/impl/ProbabilitySampler.java
+++ b/htrace-core/src/main/java/org/cloudera/htrace/impl/ProbabilitySampler.java
@@ -16,10 +16,10 @@
*/
package org.cloudera.htrace.impl;
-import java.util.Random;
-
import org.cloudera.htrace.Sampler;
+import java.util.Random;
+
public class ProbabilitySampler implements Sampler<Object> {
public final double threshold;
private Random random;
diff --git a/src/main/java/org/cloudera/htrace/impl/StandardOutSpanReceiver.java b/htrace-core/src/main/java/org/cloudera/htrace/impl/StandardOutSpanReceiver.java
similarity index 99%
rename from src/main/java/org/cloudera/htrace/impl/StandardOutSpanReceiver.java
rename to htrace-core/src/main/java/org/cloudera/htrace/impl/StandardOutSpanReceiver.java
index 7b14169..b8030e4 100644
--- a/src/main/java/org/cloudera/htrace/impl/StandardOutSpanReceiver.java
+++ b/htrace-core/src/main/java/org/cloudera/htrace/impl/StandardOutSpanReceiver.java
@@ -1,11 +1,11 @@
package org.cloudera.htrace.impl;
-import java.io.IOException;
-
import org.cloudera.htrace.HTraceConfiguration;
import org.cloudera.htrace.Span;
import org.cloudera.htrace.SpanReceiver;
+import java.io.IOException;
+
/**
* Used for testing. Simply prints to standard out any spans it receives.
*/
diff --git a/src/main/java/org/cloudera/htrace/impl/TrueIfTracingSampler.java b/htrace-core/src/main/java/org/cloudera/htrace/impl/TrueIfTracingSampler.java
similarity index 100%
rename from src/main/java/org/cloudera/htrace/impl/TrueIfTracingSampler.java
rename to htrace-core/src/main/java/org/cloudera/htrace/impl/TrueIfTracingSampler.java
diff --git a/src/main/java/org/cloudera/htrace/wrappers/TraceCallable.java b/htrace-core/src/main/java/org/cloudera/htrace/wrappers/TraceCallable.java
similarity index 81%
rename from src/main/java/org/cloudera/htrace/wrappers/TraceCallable.java
rename to htrace-core/src/main/java/org/cloudera/htrace/wrappers/TraceCallable.java
index 5a58e74..fc32c2c 100644
--- a/src/main/java/org/cloudera/htrace/wrappers/TraceCallable.java
+++ b/htrace-core/src/main/java/org/cloudera/htrace/wrappers/TraceCallable.java
@@ -16,12 +16,12 @@
*/
package org.cloudera.htrace.wrappers;
-import java.util.concurrent.Callable;
-
import org.cloudera.htrace.Span;
import org.cloudera.htrace.Trace;
import org.cloudera.htrace.TraceScope;
+import java.util.concurrent.Callable;
+
/**
* Wrap a Callable with a Span that survives a change in threads.
*
@@ -29,20 +29,26 @@
public class TraceCallable<V> implements Callable<V> {
private final Callable<V> impl;
private final Span parent;
+ private final String description;
public TraceCallable(Callable<V> impl) {
this(Trace.currentSpan(), impl);
}
public TraceCallable(Span parent, Callable<V> impl) {
+ this(parent, impl, null);
+ }
+
+ public TraceCallable(Span parent, Callable<V> impl, String description) {
this.impl = impl;
this.parent = parent;
+ this.description = description;
}
@Override
public V call() throws Exception {
if (parent != null) {
- TraceScope chunk = Trace.startSpan(Thread.currentThread().getName(), parent);
+ TraceScope chunk = Trace.startSpan(getDescription(), parent);
try {
return impl.call();
@@ -57,4 +63,8 @@
public Callable<V> getImpl() {
return impl;
}
+
+ private String getDescription() {
+ return this.description == null ? Thread.currentThread().getName(): description;
+ }
}
diff --git a/src/main/java/org/cloudera/htrace/wrappers/TraceExecutorService.java b/htrace-core/src/main/java/org/cloudera/htrace/wrappers/TraceExecutorService.java
similarity index 100%
rename from src/main/java/org/cloudera/htrace/wrappers/TraceExecutorService.java
rename to htrace-core/src/main/java/org/cloudera/htrace/wrappers/TraceExecutorService.java
diff --git a/src/main/java/org/cloudera/htrace/wrappers/TraceProxy.java b/htrace-core/src/main/java/org/cloudera/htrace/wrappers/TraceProxy.java
similarity index 98%
rename from src/main/java/org/cloudera/htrace/wrappers/TraceProxy.java
rename to htrace-core/src/main/java/org/cloudera/htrace/wrappers/TraceProxy.java
index 92118b3..ab532a7 100644
--- a/src/main/java/org/cloudera/htrace/wrappers/TraceProxy.java
+++ b/htrace-core/src/main/java/org/cloudera/htrace/wrappers/TraceProxy.java
@@ -16,15 +16,14 @@
*/
package org.cloudera.htrace.wrappers;
+import org.cloudera.htrace.Sampler;
+import org.cloudera.htrace.Trace;
+import org.cloudera.htrace.TraceScope;
+
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
-import org.cloudera.htrace.Sampler;
-import org.cloudera.htrace.Span;
-import org.cloudera.htrace.Trace;
-import org.cloudera.htrace.TraceScope;
-
public class TraceProxy {
/**
* Returns an object that will trace all calls to itself.
diff --git a/src/main/java/org/cloudera/htrace/wrappers/TraceRunnable.java b/htrace-core/src/main/java/org/cloudera/htrace/wrappers/TraceRunnable.java
similarity index 80%
rename from src/main/java/org/cloudera/htrace/wrappers/TraceRunnable.java
rename to htrace-core/src/main/java/org/cloudera/htrace/wrappers/TraceRunnable.java
index fdfa895..129f859 100644
--- a/src/main/java/org/cloudera/htrace/wrappers/TraceRunnable.java
+++ b/htrace-core/src/main/java/org/cloudera/htrace/wrappers/TraceRunnable.java
@@ -16,7 +16,6 @@
*/
package org.cloudera.htrace.wrappers;
-import org.cloudera.htrace.Sampler;
import org.cloudera.htrace.Span;
import org.cloudera.htrace.Trace;
import org.cloudera.htrace.TraceScope;
@@ -29,20 +28,26 @@
private final Span parent;
private final Runnable runnable;
+ private final String description;
public TraceRunnable(Runnable runnable) {
this(Trace.currentSpan(), runnable);
}
public TraceRunnable(Span parent, Runnable runnable) {
+ this(parent, runnable, null);
+ }
+
+ public TraceRunnable(Span parent, Runnable runnable, String description) {
this.parent = parent;
this.runnable = runnable;
+ this.description = description;
}
@Override
public void run() {
if (parent != null) {
- TraceScope chunk = Trace.startSpan(Thread.currentThread().getName(), parent);
+ TraceScope chunk = Trace.startSpan(getDescription(), parent);
try {
runnable.run();
@@ -53,4 +58,8 @@
runnable.run();
}
}
+
+ private String getDescription() {
+ return this.description == null ? Thread.currentThread().getName(): description;
+ }
}
diff --git a/src/test/java/org/cloudera/htrace/TestCountSampler.java b/htrace-core/src/test/java/org/cloudera/htrace/TestCountSampler.java
similarity index 91%
rename from src/test/java/org/cloudera/htrace/TestCountSampler.java
rename to htrace-core/src/test/java/org/cloudera/htrace/TestCountSampler.java
index 80d072e..6d635bd 100644
--- a/src/test/java/org/cloudera/htrace/TestCountSampler.java
+++ b/htrace-core/src/test/java/org/cloudera/htrace/TestCountSampler.java
@@ -16,9 +16,8 @@
*/
package org.cloudera.htrace;
-import static org.junit.Assert.assertEquals;
-
import org.cloudera.htrace.impl.CountSampler;
+import org.junit.Assert;
import org.junit.Test;
public class TestCountSampler {
@@ -35,7 +34,7 @@
if (hundred.next(null))
hundredCount++;
}
- assertEquals(2, hundredCount);
- assertEquals(100, halfCount);
+ Assert.assertEquals(2, hundredCount);
+ Assert.assertEquals(100, halfCount);
}
}
diff --git a/src/test/java/org/cloudera/htrace/TestHTrace.java b/htrace-core/src/test/java/org/cloudera/htrace/TestHTrace.java
similarity index 85%
rename from src/test/java/org/cloudera/htrace/TestHTrace.java
rename to htrace-core/src/test/java/org/cloudera/htrace/TestHTrace.java
index aac0a76..c8e3b31 100644
--- a/src/test/java/org/cloudera/htrace/TestHTrace.java
+++ b/htrace-core/src/test/java/org/cloudera/htrace/TestHTrace.java
@@ -16,8 +16,12 @@
*/
package org.cloudera.htrace;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import com.google.common.collect.Multimap;
+import org.cloudera.htrace.impl.LocalFileSpanReceiver;
+import org.cloudera.htrace.impl.POJOSpanReceiver;
+import org.cloudera.htrace.impl.StandardOutSpanReceiver;
+import org.junit.Assert;
+import org.junit.Test;
import java.io.File;
import java.util.Collection;
@@ -25,13 +29,6 @@
import java.util.HashSet;
import java.util.Map;
-import org.cloudera.htrace.impl.LocalFileSpanReceiver;
-import org.cloudera.htrace.impl.POJOSpanReceiver;
-import org.cloudera.htrace.impl.StandardOutSpanReceiver;
-import org.junit.Test;
-
-import com.google.common.collect.Multimap;
-
public class TestHTrace {
public static final String SPAN_FILE_FLAG = "spanFile";
@@ -77,35 +74,35 @@
Collection<Span> spans = psr.getSpans();
TraceTree traceTree = new TraceTree(spans);
Collection<Span> roots = traceTree.getRoots();
- assertEquals(numTraces, roots.size());
+ Assert.assertEquals(numTraces, roots.size());
Map<String,Span> descriptionToRootSpan = new HashMap<String,Span>();
for (Span root : roots) {
descriptionToRootSpan.put(root.getDescription(), root);
}
- assertTrue(descriptionToRootSpan.keySet().contains(
+ Assert.assertTrue(descriptionToRootSpan.keySet().contains(
TraceCreator.RPC_TRACE_ROOT));
- assertTrue(descriptionToRootSpan.keySet().contains(
+ Assert.assertTrue(descriptionToRootSpan.keySet().contains(
TraceCreator.SIMPLE_TRACE_ROOT));
- assertTrue(descriptionToRootSpan.keySet().contains(
+ Assert.assertTrue(descriptionToRootSpan.keySet().contains(
TraceCreator.THREADED_TRACE_ROOT));
Multimap<Long, Span> spansByParentId = traceTree.getSpansByParentIdMap();
Span rpcTraceRoot = descriptionToRootSpan.get(TraceCreator.RPC_TRACE_ROOT);
- assertEquals(1, spansByParentId.get(rpcTraceRoot.getSpanId()).size());
+ Assert.assertEquals(1, spansByParentId.get(rpcTraceRoot.getSpanId()).size());
Span rpcTraceChild1 = spansByParentId.get(rpcTraceRoot.getSpanId())
.iterator().next();
- assertEquals(1, spansByParentId.get(rpcTraceChild1.getSpanId()).size());
+ Assert.assertEquals(1, spansByParentId.get(rpcTraceChild1.getSpanId()).size());
Span rpcTraceChild2 = spansByParentId.get(rpcTraceChild1.getSpanId())
.iterator().next();
- assertEquals(1, spansByParentId.get(rpcTraceChild2.getSpanId()).size());
+ Assert.assertEquals(1, spansByParentId.get(rpcTraceChild2.getSpanId()).size());
Span rpcTraceChild3 = spansByParentId.get(rpcTraceChild2.getSpanId())
.iterator().next();
- assertEquals(0, spansByParentId.get(rpcTraceChild3.getSpanId()).size());
+ Assert.assertEquals(0, spansByParentId.get(rpcTraceChild3.getSpanId()).size());
}
private void runTraceCreatorTraces(TraceCreator tc) {
diff --git a/src/test/java/org/cloudera/htrace/TestSampler.java b/htrace-core/src/test/java/org/cloudera/htrace/TestSampler.java
similarity index 82%
rename from src/test/java/org/cloudera/htrace/TestSampler.java
rename to htrace-core/src/test/java/org/cloudera/htrace/TestSampler.java
index e3b9a7d..3369142 100644
--- a/src/test/java/org/cloudera/htrace/TestSampler.java
+++ b/htrace-core/src/test/java/org/cloudera/htrace/TestSampler.java
@@ -1,7 +1,6 @@
package org.cloudera.htrace;
-import static org.junit.Assert.*;
-
+import org.junit.Assert;
import org.junit.Test;
public class TestSampler {
@@ -9,17 +8,17 @@
public void testParamterizedSampler() {
TestParamSampler sampler = new TestParamSampler();
TraceScope s = Trace.startSpan("test", sampler, 1);
- assertNotNull(s.getSpan());
+ Assert.assertNotNull(s.getSpan());
s.close();
s = Trace.startSpan("test", sampler, -1);
- assertNull(s.getSpan());
+ Assert.assertNull(s.getSpan());
s.close();
}
@Test
public void testAlwaysSampler() {
TraceScope cur = Trace.startSpan("test", new TraceInfo(0, 0));
- assertNotNull(cur);
+ Assert.assertNotNull(cur);
cur.close();
}
diff --git a/src/test/java/org/cloudera/htrace/TraceCreator.java b/htrace-core/src/test/java/org/cloudera/htrace/TraceCreator.java
similarity index 100%
rename from src/test/java/org/cloudera/htrace/TraceCreator.java
rename to htrace-core/src/test/java/org/cloudera/htrace/TraceCreator.java
diff --git a/htrace-zipkin/pom.xml b/htrace-zipkin/pom.xml
new file mode 100644
index 0000000..906add1
--- /dev/null
+++ b/htrace-zipkin/pom.xml
@@ -0,0 +1,111 @@
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns="http://maven.apache.org/POM/4.0.0"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <groupId>org.cloudera.htrace</groupId>
+ <artifactId>htrace-zipkin</artifactId>
+ <version>2.00-SNAPSHOT</version>
+ <packaging>jar</packaging>
+
+
+ <parent>
+ <artifactId>htrace</artifactId>
+ <groupId>org.cloudera.htrace</groupId>
+ <version>2.00-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+
+ <name>htrace-zipkin</name>
+ <url>https://github.com/cloudera/htrace</url>
+
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ </properties>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-source-plugin</artifactId>
+ <version>2.1.2</version>
+ <executions>
+ <execution>
+ <id>attach-sources</id>
+ <phase>package</phase>
+ <goals>
+ <goal>jar-no-fork</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <artifactId>maven-javadoc-plugin</artifactId>
+ <version>2.8.1</version>
+ <executions>
+ <execution>
+ <id>attach-javadocs</id>
+ <phase>package</phase>
+ <goals>
+ <goal>jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>2.5.1</version>
+ <configuration>
+ <source>1.6</source>
+ <target>1.6</target>
+ <optimize>true</optimize>
+ <encoding>UTF-8</encoding>
+ </configuration>
+ </plugin>
+ <plugin>
+ <!-- explicitly define maven-deploy-plugin after other to force exec order -->
+ <artifactId>maven-deploy-plugin</artifactId>
+ <version>2.7</version>
+ <executions>
+ <execution>
+ <id>deploy</id>
+ <phase>deploy</phase>
+ <goals>
+ <goal>deploy</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.thrift.tools</groupId>
+ <artifactId>maven-thrift-plugin</artifactId>
+ <version>0.1.11</version>
+ <executions>
+ <execution>
+ <id>thrift-sources</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>compile</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.cloudera.htrace</groupId>
+ <artifactId>htrace-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.thrift</groupId>
+ <artifactId>libthrift</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+</project>
diff --git a/htrace-zipkin/src/main/java/org/cloudera/htrace/impl/ZipkinSpanReceiver.java b/htrace-zipkin/src/main/java/org/cloudera/htrace/impl/ZipkinSpanReceiver.java
new file mode 100644
index 0000000..764f3d5
--- /dev/null
+++ b/htrace-zipkin/src/main/java/org/cloudera/htrace/impl/ZipkinSpanReceiver.java
@@ -0,0 +1,360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.cloudera.htrace.impl;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.twitter.zipkin.gen.LogEntry;
+import com.twitter.zipkin.gen.Scribe;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.protocol.TProtocolFactory;
+import org.apache.thrift.transport.TFramedTransport;
+import org.apache.thrift.transport.TIOStreamTransport;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+import org.cloudera.htrace.HTraceConfiguration;
+import org.cloudera.htrace.Span;
+import org.cloudera.htrace.SpanReceiver;
+import org.cloudera.htrace.zipkin.HTraceToZipkinConverter;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Zipkin is an open source tracing library. This span receiver acts as a bridge between HTrace and
+ * Zipkin, that converts HTrace Span objects into Zipkin Span objects.
+ *
+ * HTrace spans are queued into a blocking queue. From there background worker threads will
+ * batch the spans together and then send them through to a Zipkin collector.
+ */
+public class ZipkinSpanReceiver implements SpanReceiver {
+ private static final Log LOG = LogFactory.getLog(ZipkinSpanReceiver.class);
+
+ /**
+ * Default hostname to fall back on.
+ */
+ private static final String DEFAULT_COLLECTOR_HOSTNAME = "localhost";
+
+ /**
+ * Default collector port.
+ */
+ private static final int DEFAULT_COLLECTOR_PORT = 9410; // trace collector default port.
+
+ /**
+ * this is used to tell scribe that the entries are for zipkin..
+ */
+ private static final String CATEGORY = "zipkin";
+
+ /**
+ * Whether the service which is traced is in client or a server mode. It is used while creating
+ * the Endpoint.
+ */
+ private static final boolean DEFAULT_IN_CLIENT_MODE = false;
+
+ /**
+ * How long this receiver will try and wait for all threads to shutdown.
+ */
+ private static final int SHUTDOWN_TIMEOUT = 30;
+
+ /**
+ * How many spans this receiver will try and send in one batch.
+ */
+ private static final int MAX_SPAN_BATCH_SIZE = 100;
+
+ /**
+ * How many errors in a row before we start dropping traces on the floor.
+ */
+ private static final int MAX_ERRORS = 10;
+
+ /**
+ * The queue that will get all HTrace spans that are to be sent.
+ */
+ private final BlockingQueue<Span> queue;
+
+ /**
+ * Factory used to encode a Zipkin Span to bytes.
+ */
+ private final TProtocolFactory protocolFactory;
+
+ /**
+ * Boolean used to signal that the threads should end.
+ */
+ private final AtomicBoolean running = new AtomicBoolean(true);
+
+ /**
+ * The thread factory used to create new ExecutorService.
+ *
+ * This will be the same factory for the lifetime of this object so that
+ * no thread names will ever be duplicated.
+ */
+ private final ThreadFactory tf;
+
+ ////////////////////
+ /// Variables that will change on each call to configure()
+ ///////////////////
+ private HTraceToZipkinConverter converter;
+ private ExecutorService service;
+ private HTraceConfiguration conf;
+ private String collectorHostname;
+ private int collectorPort;
+
+ public ZipkinSpanReceiver() {
+ this.queue = new ArrayBlockingQueue<Span>(1000);
+ this.protocolFactory = new TBinaryProtocol.Factory();
+
+ tf = new ThreadFactoryBuilder().setDaemon(true)
+ .setNameFormat("zipkinSpanReceiver-%d")
+ .build();
+ }
+
+ @Override
+ public void configure(HTraceConfiguration conf) {
+ this.conf = conf;
+
+ this.collectorHostname = conf.get("zipkin.collector-hostname",
+ DEFAULT_COLLECTOR_HOSTNAME);
+ this.collectorPort = conf.getInt("zipkin.collector-port",
+ DEFAULT_COLLECTOR_PORT);
+
+ // initialize the endpoint. This endpoint is used while writing the Span.
+ initConverter();
+
+ int numThreads = conf.getInt("zipkin.num-threads", 1);
+
+ // If there are already threads runnnig tear them down.
+ if (this.service != null) {
+ this.service.shutdownNow();
+ this.service = null;
+ }
+
+ this.service = Executors.newFixedThreadPool(numThreads, tf);
+
+ for (int i = 0; i< numThreads; i++) {
+ this.service.submit(new WriteSpanRunnable());
+ }
+ }
+
+ /**
+ * Set up the HTrace to Zipkin converter.
+ */
+ private void initConverter() {
+ boolean inClientMode = conf.getBoolean("zipkin.is-in-client-mode", DEFAULT_IN_CLIENT_MODE);
+ InetAddress tracedServiceHostname = null;
+ // Try and get the hostname. If it's not configured try and get the local hostname.
+ try {
+ String host = conf.get("zipkin.traced-service-hostname",
+ InetAddress.getLocalHost().getHostAddress());
+
+ tracedServiceHostname = InetAddress.getByName(host);
+ } catch (UnknownHostException e) {
+ LOG.error("Couldn't get the localHost address", e);
+ }
+ short tracedServicePort = (short) conf.getInt("zipkin.traced-service-port", 80);
+ byte[] address = tracedServiceHostname != null
+ ? tracedServiceHostname.getAddress() : DEFAULT_COLLECTOR_HOSTNAME.getBytes();
+ int ipv4 = ByteBuffer.wrap(address).getInt();
+ this.converter = new HTraceToZipkinConverter(ipv4, tracedServicePort, inClientMode);
+ }
+
+
+
+ private class WriteSpanRunnable implements Runnable {
+ /**
+ * scribe client to push zipkin spans
+ */
+ private Scribe.Client scribeClient = null;
+ private final ByteArrayOutputStream baos;
+ private final TProtocol streamProtocol;
+
+ public WriteSpanRunnable() {
+ baos = new ByteArrayOutputStream();
+ streamProtocol = protocolFactory.getProtocol(new TIOStreamTransport(baos));
+ }
+
+ /**
+ * This runnable converts a HTrace span to a Zipkin span and sends it across the zipkin
+ * collector as a thrift object. The scribe client which is used for rpc writes a list of
+ * LogEntry objects, so the span objects are first transformed into LogEntry objects before
+ * sending to the zipkin-collector.
+ *
+ * Here is a little ascii art which shows the above transformation:
+ * <pre>
+ * +------------+ +------------+ +------------+ +-----------------+
+ * | HTrace Span|-->|Zipkin Span |-->| (LogEntry) | ===========> | Zipkin Collector|
+ * +------------+ +------------+ +------------+ (Scribe rpc) +-----------------+
+ * </pre>
+ */
+ @Override
+ public void run() {
+
+ List<Span> dequeuedSpans = new ArrayList<Span>(MAX_SPAN_BATCH_SIZE);
+
+ long errorCount = 0;
+
+ while (running.get() || queue.size() > 0) {
+ Span firstSpan = null;
+ try {
+ // Block for up to a second. to try and get a span.
+ // We only block for a little bit in order to notice if the running value has changed
+ firstSpan = queue.poll(1, TimeUnit.SECONDS);
+
+ // If the poll was successful then it's possible that there
+ // will be other spans to get. Try and get them.
+ if (firstSpan != null) {
+ // Add the first one that we got
+ dequeuedSpans.add(firstSpan);
+ // Try and get up to 100 queues
+ queue.drainTo(dequeuedSpans, MAX_SPAN_BATCH_SIZE - 1);
+ }
+
+ } catch (InterruptedException ie) {
+ // Ignored.
+ }
+
+ if (dequeuedSpans.isEmpty()) continue;
+
+ // If this is the first time through or there was an error re-connect
+ if (scribeClient == null) {
+ startClient();
+ }
+ // Create a new list every time through so that the list doesn't change underneath
+ // thrift as it's sending.
+ List<LogEntry> entries = new ArrayList<LogEntry>(dequeuedSpans.size());
+ try {
+ // Convert every de-queued span
+ for (Span htraceSpan : dequeuedSpans) {
+ // convert the HTrace span to Zipkin span
+ com.twitter.zipkin.gen.Span zipkinSpan = converter.toZipkinSpan(htraceSpan);
+ // Clear any old data.
+ baos.reset();
+ // Write the span to a BAOS
+ zipkinSpan.write(streamProtocol);
+
+ // Do Base64 encoding and put the string into a log entry.
+ LogEntry logEntry =
+ new LogEntry(CATEGORY, Base64.encodeBase64String(baos.toByteArray()));
+ entries.add(logEntry);
+ }
+
+ // Send the entries
+ scribeClient.Log(entries);
+ // clear the list for the next time through.
+ dequeuedSpans.clear();
+ // reset the error counter.
+ errorCount = 0;
+ } catch (Exception e) {
+ LOG.error("Error when writing to the zipkin collector: " +
+ collectorHostname + ":" + collectorPort);
+
+ errorCount += 1;
+ // If there have been ten errors in a row start dropping things.
+ if (errorCount < MAX_ERRORS) {
+ dequeuedSpans.addAll(dequeuedSpans);
+ }
+
+ closeClient();
+ try {
+ // Since there was an error sleep just a little bit to try and allow the
+ // zipkin collector some time to recover.
+ Thread.sleep(500);
+ } catch (InterruptedException e1) {
+ // Ignored
+ }
+ }
+ }
+ closeClient();
+ }
+
+ /**
+ * Close out the connection.
+ */
+ private void closeClient() {
+ // close out the transport.
+ if (scribeClient != null) {
+ scribeClient.getInputProtocol().getTransport().close();
+ scribeClient = null;
+ }
+ }
+
+ /**
+ * Re-connect to Zipkin.
+ */
+ private void startClient() {
+ if (this.scribeClient == null) {
+ TTransport transport = new TFramedTransport(new TSocket(collectorHostname, collectorPort));
+ try {
+ transport.open();
+ } catch (TTransportException e) {
+ e.printStackTrace();
+ }
+ TProtocol protocol = protocolFactory.getProtocol(transport);
+ this.scribeClient = new Scribe.Client(protocol);
+ }
+ }
+ }
+
+ /**
+ * Close the receiver.
+ *
+ * This tries to shut
+ * @throws IOException
+ */
+ @Override
+ public void close() throws IOException {
+ running.set(false);
+ service.shutdown();
+ try {
+ if (!service.awaitTermination(SHUTDOWN_TIMEOUT, TimeUnit.SECONDS)) {
+ LOG.error("Was not able to process all remaining spans to write upon closing in: " +
+ SHUTDOWN_TIMEOUT + " " + TimeUnit.SECONDS +". There could be un-sent spans still left." +
+ " They have been dropped.");
+ }
+ } catch (InterruptedException e1) {
+ LOG.warn("Thread interrupted when terminating executor.", e1);
+ }
+ }
+
+ @Override
+ public void receiveSpan(Span span) {
+ if (running.get()) {
+ try {
+ this.queue.add(span);
+ } catch (IllegalStateException e) {
+ LOG.error("Error trying to append span (" + span.getDescription() + ") to the queue."
+ +" Blocking Queue was full.");
+ }
+ }
+ }
+}
diff --git a/htrace-zipkin/src/main/java/org/cloudera/htrace/zipkin/HTraceToZipkinConverter.java b/htrace-zipkin/src/main/java/org/cloudera/htrace/zipkin/HTraceToZipkinConverter.java
new file mode 100644
index 0000000..57bf460
--- /dev/null
+++ b/htrace-zipkin/src/main/java/org/cloudera/htrace/zipkin/HTraceToZipkinConverter.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.cloudera.htrace.zipkin;
+
+import com.twitter.zipkin.gen.Annotation;
+import com.twitter.zipkin.gen.AnnotationType;
+import com.twitter.zipkin.gen.BinaryAnnotation;
+import com.twitter.zipkin.gen.Endpoint;
+import com.twitter.zipkin.gen.Span;
+import com.twitter.zipkin.gen.zipkinCoreConstants;
+import org.cloudera.htrace.TimelineAnnotation;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This class is responsible for converting a HTrace.Span to a Zipkin.Span object. To use the Zipkin
+ * infrastructure (collector, front end), we need to store the Span information in a zipkin specific
+ * format. This class transforms a HTrace:Span object to a Zipkin:Span object.
+ * <p>
+ * This is how both Span objects are related:
+ * <table>
+ * <col width="50%"/> <col width="50%"/> <thead>
+ * <tr>
+ * <th>HTrace:Span</th>
+ * <th>Zipkin:Span</th>
+ * </tr>
+ * <thead> <tbody>
+ * <tr>
+ * <td>TraceId</td>
+ * <td>TraceId</td>
+ * </tr>
+ * <tr>
+ * <td>ParentId</td>
+ * <td>ParentId</td>
+ * </tr>
+ * <tr>
+ * <td>SpanId</td>
+ * <td>id</td>
+ * </tr>
+ * <tr>
+ * <td>Description</td>
+ * <td>Name</td>
+ * </tr>
+ * <tr>
+ * <td>startTime, stopTime</td>
+ * <td>Annotations (cs, cr, sr, ss)</td>
+ * </tr>
+ * <tr>
+ * <td>Other annotations</td>
+ * <td>Annotations</td>
+ * </tr>
+ * </tbody>
+ * </table>
+ * <p>
+ */
+public class HTraceToZipkinConverter {
+
+ private final int ipv4Address;
+ private final short port;
+ private final boolean inClientMode;
+
+ public HTraceToZipkinConverter(int ipv4Address, short port, boolean inClientMode) {
+ this.ipv4Address = ipv4Address;
+ this.port = port;
+ this.inClientMode = inClientMode;
+ }
+
+ /**
+ * Converts a given HTrace span to a Zipkin Span.
+ * <ul>
+ * <li>First set the start annotation. [CS, SR], depending whether it is a client service or not.
+ * <li>Set other id's, etc [TraceId's etc]
+ * <li>Create binary annotations based on data from HTrace Span object.
+ * <li>Set the last annotation. [SS, CR]
+ * </ul>
+ */
+ public Span toZipkinSpan(org.cloudera.htrace.Span hTraceSpan) {
+ Span zipkinSpan = new Span();
+ Endpoint ep = new Endpoint(ipv4Address, port, hTraceSpan.getProcessId());
+ List<Annotation> annotationList = createZipkinAnnotations(hTraceSpan, ep);
+ List<BinaryAnnotation> binaryAnnotationList = createZipkinBinaryAnnotations(hTraceSpan, ep);
+ zipkinSpan.setTrace_id(hTraceSpan.getTraceId());
+ if (hTraceSpan.getParentId() != org.cloudera.htrace.Span.ROOT_SPAN_ID) {
+ zipkinSpan.setParent_id(hTraceSpan.getParentId());
+ }
+ zipkinSpan.setId(hTraceSpan.getSpanId());
+ zipkinSpan.setName(hTraceSpan.getDescription());
+ zipkinSpan.setAnnotations(annotationList);
+ zipkinSpan.setBinary_annotations(binaryAnnotationList);
+ return zipkinSpan;
+ }
+
+ /**
+ * Add annotations from the htrace Span.
+ */
+ private List<Annotation> createZipkinAnnotations(org.cloudera.htrace.Span hTraceSpan,
+ Endpoint ep) {
+ List<Annotation> annotationList = new ArrayList<Annotation>();
+
+ String firstAnno =
+ inClientMode ? zipkinCoreConstants.CLIENT_SEND : zipkinCoreConstants.SERVER_RECV;
+ String lastAnno =
+ inClientMode ? zipkinCoreConstants.CLIENT_RECV : zipkinCoreConstants.SERVER_SEND;
+
+ // add first zipkin annotation.
+ annotationList.add(createZipkinAnnotation(firstAnno, hTraceSpan, ep, true));
+ // add HTrace time annotation
+ for (TimelineAnnotation ta : hTraceSpan.getTimelineAnnotations()) {
+ annotationList.add(createZipkinAnnotation(ta.getMessage(), hTraceSpan, ep, true));
+ }
+ // add last zipkin annotation
+ annotationList.add(createZipkinAnnotation(lastAnno, hTraceSpan, ep, false));
+ return annotationList;
+ }
+
+ /**
+ * Creates a list of Annotations that are present in HTrace Span object.
+ * @return list of Annotations that could be added to Zipkin Span.
+ */
+ private List<BinaryAnnotation> createZipkinBinaryAnnotations(org.cloudera.htrace.Span span,
+ Endpoint ep) {
+ List<BinaryAnnotation> l = new ArrayList<BinaryAnnotation>();
+ for (Map.Entry<byte[], byte[]> e : span.getKVAnnotations().entrySet()) {
+ BinaryAnnotation binaryAnn = new BinaryAnnotation();
+ binaryAnn.setAnnotation_type(AnnotationType.BYTES);
+ binaryAnn.setKey(new String(e.getKey()));
+ binaryAnn.setValue(e.getValue());
+ binaryAnn.setHost(ep);
+ l.add(binaryAnn);
+ }
+ return l;
+ }
+
+ /**
+ * Create an annotation with the correct times and endpoint.
+ * @param value Annotation value
+ * @param span Span from which timestamp will be extracted
+ * @param ep the endopint this annotation will be associated with.
+ * @param sendRequest use the first or last timestamp.
+ */
+ private static Annotation createZipkinAnnotation(String value, org.cloudera.htrace.Span span,
+ Endpoint ep, boolean sendRequest) {
+ Annotation annotation = new Annotation();
+ annotation.setHost(ep);
+
+ // Zipkin is in microseconds
+ if (sendRequest) {
+ annotation.setTimestamp(span.getStartTimeMillis() * 1000);
+ }
+ else {
+ annotation.setTimestamp(span.getStopTimeMillis() * 1000);
+ }
+
+ annotation.setDuration((int) span.getAccumulatedMillis());
+ annotation.setValue(value);
+ return annotation;
+ }
+}
diff --git a/htrace-zipkin/src/main/thrift/scribe.thrift b/htrace-zipkin/src/main/thrift/scribe.thrift
new file mode 100644
index 0000000..1976396
--- /dev/null
+++ b/htrace-zipkin/src/main/thrift/scribe.thrift
@@ -0,0 +1,31 @@
+# Copyright 2012 Twitter Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+namespace java com.twitter.zipkin.gen
+
+enum ResultCode
+{
+ OK,
+ TRY_LATER
+}
+
+struct LogEntry
+{
+ 1: string category,
+ 2: string message
+}
+
+service Scribe
+{
+ ResultCode Log(1: list<LogEntry> messages);
+}
diff --git a/htrace-zipkin/src/main/thrift/zipkinCore.thrift b/htrace-zipkin/src/main/thrift/zipkinCore.thrift
new file mode 100644
index 0000000..a14b888
--- /dev/null
+++ b/htrace-zipkin/src/main/thrift/zipkinCore.thrift
@@ -0,0 +1,58 @@
+# Copyright 2012 Twitter Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+namespace java com.twitter.zipkin.gen
+namespace rb Zipkin
+
+//************** Collection related structs **************
+
+// these are the annotations we always expect to find in a span
+const string CLIENT_SEND = "cs"
+const string CLIENT_RECV = "cr"
+const string SERVER_SEND = "ss"
+const string SERVER_RECV = "sr"
+
+// this represents a host and port in a network
+struct Endpoint {
+ 1: i32 ipv4,
+ 2: i16 port // beware that this will give us negative ports. some conversion needed
+ 3: string service_name // which service did this operation happen on?
+}
+
+// some event took place, either one by the framework or by the user
+struct Annotation {
+ 1: i64 timestamp // microseconds from epoch
+ 2: string value // what happened at the timestamp?
+ 3: optional Endpoint host // host this happened on
+ 4: optional i32 duration // how long did the operation take? microseconds
+}
+
+enum AnnotationType { BOOL, BYTES, I16, I32, I64, DOUBLE, STRING }
+
+struct BinaryAnnotation {
+ 1: string key,
+ 2: binary value,
+ 3: AnnotationType annotation_type,
+ 4: optional Endpoint host
+}
+
+struct Span {
+ 1: i64 trace_id // unique trace id, use for all spans in trace
+ 3: string name, // span name, rpc method for example
+ 4: i64 id, // unique span id, only used for this span
+ 5: optional i64 parent_id, // parent span id
+ 6: list<Annotation> annotations, // list of all annotations/events that occured
+ 8: list<BinaryAnnotation> binary_annotations // any binary annotations
+ 9: optional bool debug = 0 // if true, we DEMAND that this span passes all samplers
+}
+
diff --git a/htrace-zipkin/src/test/java/org/cloudera/htrace/TestHTraceSpanToZipkinSpan.java b/htrace-zipkin/src/test/java/org/cloudera/htrace/TestHTraceSpanToZipkinSpan.java
new file mode 100644
index 0000000..5c5e2d0
--- /dev/null
+++ b/htrace-zipkin/src/test/java/org/cloudera/htrace/TestHTraceSpanToZipkinSpan.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.cloudera.htrace;
+
+import com.twitter.zipkin.gen.Endpoint;
+import org.cloudera.htrace.zipkin.HTraceToZipkinConverter;
+import org.cloudera.htrace.impl.POJOSpanReceiver;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Collection;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Creates HTrace and then convert it to Zipkin trace and checks whether it is a valid span or not.
+ */
+public class TestHTraceSpanToZipkinSpan {
+
+ private static final String ROOT_SPAN_DESC = "ROOT";
+
+ @Test
+ public void testHTraceToZipkin() throws IOException {
+ Endpoint ep = new Endpoint(12345, (short) 12, "test");
+ POJOSpanReceiver psr = new POJOSpanReceiver();
+ Trace.addReceiver(psr);
+ runWorkerMethods();
+ psr.close();
+ Collection<Span> spans = psr.getSpans();
+ for (Span s : spans) {
+ com.twitter.zipkin.gen.Span zs =
+ new HTraceToZipkinConverter(12345, (short) 12, true).toZipkinSpan(s);
+ assertSpansAreEquivalent(s, zs);
+ }
+ }
+
+ private void assertSpansAreEquivalent(Span s, com.twitter.zipkin.gen.Span zs) {
+ assertEquals(s.getTraceId(), zs.getTrace_id());
+ assertEquals(s.getParentId(), zs.getParent_id());
+ assertEquals(s.getSpanId(), zs.getId());
+ Assert.assertNotNull(zs.getAnnotations());
+ if (ROOT_SPAN_DESC.equals(zs.getName())) {
+ assertEquals(3, zs.getAnnotations().size());// two start/stop + one timeline annotation
+ assertEquals(1, zs.getBinary_annotations().size());
+ } else {
+ assertEquals(2, zs.getAnnotations().size());
+ }
+
+ }
+
+ private void runWorkerMethods() {
+ TraceScope root = Trace.startSpan(ROOT_SPAN_DESC, Sampler.ALWAYS);
+ try {
+ doSomeWork();
+ root.getSpan().addKVAnnotation("foo".getBytes(), "bar".getBytes());
+ root.getSpan().addTimelineAnnotation("timeline");
+ } finally {
+ root.close();
+ }
+ }
+
+ private void doSomeWork() {
+ TraceScope tScope = Trace.startSpan("Some good work");
+ try {
+ Thread.sleep((long) (2000 * Math.random()));
+ doSomeMoreWork();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ } finally {
+ tScope.close();
+ }
+ }
+
+ private void doSomeMoreWork() {
+ TraceScope tScope = Trace.startSpan("Some more good work");
+ try {
+ Thread.sleep((long) (2000 * Math.random()));
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ } finally {
+ tScope.close();
+ }
+ }
+}
diff --git a/pom.xml b/pom.xml
index 16356c7..1a2e327 100644
--- a/pom.xml
+++ b/pom.xml
@@ -11,10 +11,16 @@
language governing permissions and limitations under the License. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<groupId>org.cloudera.htrace</groupId>
+ <modules>
+ <module>htrace-core</module>
+ <module>htrace-zipkin</module>
+ </modules>
+
+
<modelVersion>4.0.0</modelVersion>
<artifactId>htrace</artifactId>
<name>htrace</name>
- <version>1.51-SNAPSHOT</version>
+ <version>2.00-SNAPSHOT</version>
<description>Tracing library</description>
<developers>
<developer>
@@ -25,6 +31,14 @@
<organization>Cloudera</organization>
<organizationUrl>http://www.cloudera.com</organizationUrl>
</developer>
+ <developer>
+ <id>eclark</id>
+ <name>Elliott Clark</name>
+ <email>eclark@apache.org</email>
+ <timezone>-7</timezone>
+ <organization>Cloudera</organization>
+ <organizationUrl>http://www.cloudera.com</organizationUrl>
+ </developer>
</developers>
<profiles>
<profile>
@@ -60,7 +74,7 @@
<artifactId>oss-parent</artifactId>
<version>7</version>
</parent>
- <packaging>jar</packaging>
+ <packaging>pom</packaging>
<url>https://github.com/cloudera/htrace</url>
<licenses>
<license>
@@ -75,44 +89,15 @@
<developerConnection>scm:git:git@github.com:cloudera/htrace.git</developerConnection>
<url>scm:git:git@github.com:cloudera/htrace.git</url>
</scm>
+ <pluginRepositories>
+ <pluginRepository>
+ <id>cloudera</id>
+ <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
+ </pluginRepository>
+ </pluginRepositories>
<build>
<plugins>
<plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-source-plugin</artifactId>
- <version>2.1.2</version>
- <executions>
- <execution>
- <id>attach-sources</id>
- <phase>package</phase>
- <goals>
- <goal>jar-no-fork</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- <plugin>
- <artifactId>maven-javadoc-plugin</artifactId>
- <version>2.8.1</version>
- <executions>
- <execution>
- <id>attach-javadocs</id>
- <phase>package</phase>
- <goals><goal>jar</goal></goals>
- </execution>
- </executions>
- </plugin>
- <plugin>
- <artifactId>maven-compiler-plugin</artifactId>
- <version>2.5.1</version>
- <configuration>
- <source>1.6</source>
- <target>1.6</target>
- <optimize>true</optimize>
- <encoding>UTF-8</encoding>
- </configuration>
- </plugin>
- <plugin>
<!-- explicitly define maven-deploy-plugin after other to force exec order -->
<artifactId>maven-deploy-plugin</artifactId>
<version>2.7</version>
@@ -129,27 +114,50 @@
<properties>
<targetJdk>1.6</targetJdk>
</properties>
- <dependencies>
- <dependency>
- <groupId>org.mortbay.jetty</groupId>
- <artifactId>jetty-util</artifactId>
- <version>6.1.26</version>
- </dependency>
- <dependency>
- <groupId>commons-logging</groupId>
- <artifactId>commons-logging</artifactId>
- <version>1.1.1</version>
- </dependency>
- <dependency>
- <groupId>com.google.guava</groupId>
- <artifactId>guava</artifactId>
- <version>12.0.1</version>
- </dependency>
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <version>4.10</version>
- <scope>test</scope>
- </dependency>
- </dependencies>
+ <dependencyManagement>
+ <dependencies>
+ <dependency>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>jetty-util</artifactId>
+ <version>6.1.26</version>
+ </dependency>
+ <dependency>
+ <groupId>commons-logging</groupId>
+ <artifactId>commons-logging</artifactId>
+ <version>1.1.1</version>
+ </dependency>
+ <dependency>
+ <groupId>commons-codec</groupId>
+ <artifactId>commons-codec</artifactId>
+ <version>1.7</version>
+ </dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <version>12.0.1</version>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>4.10</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.thrift</groupId>
+ <artifactId>libthrift</artifactId>
+ <version>0.9.0</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.cloudera.htrace</groupId>
+ <artifactId>htrace-core</artifactId>
+ <version>${version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.cloudera.htrace</groupId>
+ <artifactId>htrace-core</artifactId>
+ <version>${version}</version>
+ </dependency>
+ </dependencies>
+ </dependencyManagement>
</project>