HTRACE-32. Change span timeline annotations map to be a map<string, string> (cmccabe)
diff --git a/htrace-core/src/main/java/org/apache/htrace/Span.java b/htrace-core/src/main/java/org/apache/htrace/Span.java
index 2731583..df28630 100644
--- a/htrace-core/src/main/java/org/apache/htrace/Span.java
+++ b/htrace-core/src/main/java/org/apache/htrace/Span.java
@@ -96,6 +96,12 @@
/**
* Add a data annotation associated with this span
*/
+ void addKVAnnotation(String key, String value);
+
+ /**
+ * Add a data annotation associated with this span
+ */
+ @Deprecated
void addKVAnnotation(byte[] key, byte[] value);
/**
@@ -106,7 +112,7 @@
/**
* Get data associated with this span (read only)
*/
- Map<byte[], byte[]> getKVAnnotations();
+ Map<String, String> getKVAnnotations();
/**
* Get any timeline annotations (read only)
@@ -140,12 +146,11 @@
jgen.writeString(String.format("%016x", span.getParentId()));
}
jgen.writeEndArray();
- Map<byte[], byte[]> traceInfoMap = span.getKVAnnotations();
+ Map<String, String> traceInfoMap = span.getKVAnnotations();
if (!traceInfoMap.isEmpty()) {
jgen.writeObjectFieldStart("n");
- for (Map.Entry<byte[], byte[]> e : traceInfoMap.entrySet()) {
- jgen.writeStringField(new String(e.getKey(), "UTF-8"),
- new String(e.getValue(), "UTF-8"));
+ for (Map.Entry<String, String> e : traceInfoMap.entrySet()) {
+ jgen.writeStringField(e.getKey(), e.getValue());
}
jgen.writeEndObject();
}
diff --git a/htrace-core/src/main/java/org/apache/htrace/impl/LocalFileSpanReceiver.java b/htrace-core/src/main/java/org/apache/htrace/impl/LocalFileSpanReceiver.java
index 7095008..8b037bf 100644
--- a/htrace-core/src/main/java/org/apache/htrace/impl/LocalFileSpanReceiver.java
+++ b/htrace-core/src/main/java/org/apache/htrace/impl/LocalFileSpanReceiver.java
@@ -16,6 +16,8 @@
*/
package org.apache.htrace.impl;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.ObjectWriter;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.htrace.HTraceConfiguration;
@@ -23,8 +25,11 @@
import org.apache.htrace.SpanReceiver;
import java.io.BufferedWriter;
+import java.io.FileOutputStream;
import java.io.FileWriter;
import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
@@ -42,9 +47,9 @@
public static final int CAPACITY_DEFAULT = 5000;
// default timeout duration when calling executor.awaitTermination()
public static final long EXECUTOR_TERMINATION_TIMEOUT_DURATION_DEFAULT = 60;
+ private static ObjectWriter JSON_WRITER = new ObjectMapper().writer();
private String file;
- private FileWriter fwriter;
- private BufferedWriter bwriter;
+ private Writer writer;
private ExecutorService executor;
private long executorTerminationTimeoutDuration;
@@ -57,15 +62,28 @@
}
this.executor = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(capacity));
+ boolean success = false;
+ FileOutputStream fos = null;
try {
- this.fwriter = new FileWriter(this.file, true);
+ fos = new FileOutputStream(file, true);
+ this.writer = new BufferedWriter(
+ new OutputStreamWriter(fos,"UTF-8"));
+ success = true;
} catch (IOException ioe) {
throw new RuntimeException(ioe);
+ } finally {
+ if (!success) {
+ if (fos == null) {
+ try {
+ fos.close();
+ } catch (IOException e) {
+ LOG.error("Error closing output stream for " + file, e);
+ }
+ }
+ }
}
- this.bwriter = new BufferedWriter(fwriter);
}
-
private class WriteSpanRunnable implements Runnable {
public final Span span;
@@ -76,9 +94,8 @@
@Override
public void run() {
try {
- bwriter.write(span.toJson());
- bwriter.newLine();
- bwriter.flush();
+ JSON_WRITER.writeValue(writer, this);
+ writer.write("%n");
} catch (IOException e) {
LOG.error("Error when writing to file: " + file, e);
}
@@ -104,14 +121,9 @@
}
try {
- fwriter.close();
+ writer.close();
} catch (IOException e) {
- LOG.error("Error closing filewriter for file: " + file, e);
- }
- try {
- bwriter.close();
- } catch (IOException e) {
- LOG.error("Error closing bufferedwriter for file: " + file, e);
+ LOG.error("Error closing writer for file: " + file, e);
}
}
-}
\ No newline at end of file
+}
diff --git a/htrace-core/src/main/java/org/apache/htrace/impl/MilliSpan.java b/htrace-core/src/main/java/org/apache/htrace/impl/MilliSpan.java
index 597b566..b34df64 100644
--- a/htrace-core/src/main/java/org/apache/htrace/impl/MilliSpan.java
+++ b/htrace-core/src/main/java/org/apache/htrace/impl/MilliSpan.java
@@ -30,6 +30,7 @@
import java.io.IOException;
import java.io.StringWriter;
+import java.io.UnsupportedEncodingException;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Collections;
@@ -56,7 +57,7 @@
private final long traceId;
private final long parents[];
private final long spanId;
- private Map<byte[], byte[]> traceInfo = null;
+ private Map<String, String> traceInfo = null;
private final String processId;
private List<TimelineAnnotation> timeline = null;
@@ -75,7 +76,7 @@
private long traceId;
private long parents[];
private long spanId;
- private Map<byte[], byte[]> traceInfo = null;
+ private Map<String, String> traceInfo = null;
private String processId;
private List<TimelineAnnotation> timeline = null;
@@ -121,7 +122,7 @@
return this;
}
- public Builder traceInfo(Map<byte[], byte[]> traceInfo) {
+ public Builder traceInfo(Map<String, String> traceInfo) {
this.traceInfo = traceInfo.isEmpty() ? null : traceInfo;
return this;
}
@@ -237,9 +238,19 @@
}
@Override
- public void addKVAnnotation(byte[] key, byte[] value) {
+ public void addKVAnnotation(byte[] key, byte[] value) {
+ // TODO: remove this method
+ try {
+ addKVAnnotation(new String(key, "UTF-8"), new String(value, "UTF-8"));
+ } catch (UnsupportedEncodingException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void addKVAnnotation(String key, String value) {
if (traceInfo == null)
- traceInfo = new HashMap<byte[], byte[]>();
+ traceInfo = new HashMap<String, String>();
traceInfo.put(key, value);
}
@@ -252,7 +263,7 @@
}
@Override
- public Map<byte[], byte[]> getKVAnnotations() {
+ public Map<String, String> getKVAnnotations() {
if (traceInfo == null)
return Collections.emptyMap();
return Collections.unmodifiableMap(traceInfo);
@@ -310,12 +321,11 @@
builder.parents(parents);
JsonNode traceInfoNode = root.get("n");
if (traceInfoNode != null) {
- HashMap<byte[], byte[]> traceInfo = new HashMap<byte[], byte[]>();
+ HashMap<String, String> traceInfo = new HashMap<String, String>();
for (Iterator<String> iter = traceInfoNode.fieldNames();
iter.hasNext(); ) {
String field = iter.next();
- traceInfo.put(field.getBytes("UTF-8"),
- traceInfoNode.get(field).asText().getBytes("UTF-8"));
+ traceInfo.put(field, traceInfoNode.get(field).asText());
}
builder.traceInfo(traceInfo);
}
diff --git a/htrace-core/src/test/java/org/apache/htrace/impl/TestMilliSpan.java b/htrace-core/src/test/java/org/apache/htrace/impl/TestMilliSpan.java
index 677ec61..7ad0482 100644
--- a/htrace-core/src/test/java/org/apache/htrace/impl/TestMilliSpan.java
+++ b/htrace-core/src/test/java/org/apache/htrace/impl/TestMilliSpan.java
@@ -41,24 +41,14 @@
assertEquals(expected.getSpanId(), got.getSpanId());
assertEquals(expected.getProcessId(), got.getProcessId());
assertEquals(expected.getParentId(), got.getParentId());
- Map<byte[], byte[]> expectedT = expected.getKVAnnotations();
- Map<byte[], byte[]> gotT = got.getKVAnnotations();
+ Map<String, String> expectedT = expected.getKVAnnotations();
+ Map<String, String> gotT = got.getKVAnnotations();
if (expectedT == null) {
assertEquals(null, gotT);
} else {
assertEquals(expectedT.size(), gotT.size());
- Map<String, String> expectedTMap = new HashMap<String, String>();
- for (byte[] key : expectedT.keySet()) {
- expectedTMap.put(new String(key, "UTF-8"),
- new String(expectedT.get(key), "UTF-8"));
- }
- Map<String, String> gotTMap = new HashMap<String, String>();
- for (byte[] key : gotT.keySet()) {
- gotTMap.put(new String(key, "UTF-8"),
- new String(gotT.get(key), "UTF-8"));
- }
- for (String key : expectedTMap.keySet()) {
- assertEquals(expectedTMap.get(key), gotTMap.get(key));
+ for (String key : expectedT.keySet()) {
+ assertEquals(expectedT.get(key), gotT.get(key));
}
}
List<TimelineAnnotation> expectedTimeline =
@@ -137,9 +127,9 @@
processId("b2408.halxg.com:8080").
spanId(111111111L).
traceId(4443);
- Map<byte[], byte[]> traceInfo = new HashMap<byte[], byte[]>();
- traceInfo.put("abc".getBytes("UTF-8"), "123".getBytes("UTF-8"));
- traceInfo.put("def".getBytes("UTF-8"), "456".getBytes("UTF-8"));
+ Map<String, String> traceInfo = new HashMap<String, String>();
+ traceInfo.put("abc", "123");
+ traceInfo.put("def", "456");
builder.traceInfo(traceInfo);
List<TimelineAnnotation> timeline = new LinkedList<TimelineAnnotation>();
timeline.add(new TimelineAnnotation(310L, "something happened"));
diff --git a/htrace-hbase/src/test/java/org/apache/htrace/impl/TestHBaseSpanReceiver.java b/htrace-hbase/src/test/java/org/apache/htrace/impl/TestHBaseSpanReceiver.java
index d3cffe2..52b344c 100644
--- a/htrace-hbase/src/test/java/org/apache/htrace/impl/TestHBaseSpanReceiver.java
+++ b/htrace-hbase/src/test/java/org/apache/htrace/impl/TestHBaseSpanReceiver.java
@@ -189,7 +189,7 @@
}
@Override
- public Map<byte[], byte[]> getKVAnnotations() {
+ public Map<String, String> getKVAnnotations() {
return Collections.emptyMap();
}
@@ -202,6 +202,9 @@
public void addKVAnnotation(byte[] key, byte[] value) {}
@Override
+ public void addKVAnnotation(String key, String value) {}
+
+ @Override
public void addTimelineAnnotation(String msg) {}
@Override
diff --git a/htrace-zipkin/src/main/java/org/apache/htrace/zipkin/HTraceToZipkinConverter.java b/htrace-zipkin/src/main/java/org/apache/htrace/zipkin/HTraceToZipkinConverter.java
index 09ab1ea..8bd6442 100644
--- a/htrace-zipkin/src/main/java/org/apache/htrace/zipkin/HTraceToZipkinConverter.java
+++ b/htrace-zipkin/src/main/java/org/apache/htrace/zipkin/HTraceToZipkinConverter.java
@@ -23,8 +23,11 @@
import com.twitter.zipkin.gen.Span;
import com.twitter.zipkin.gen.zipkinCoreConstants;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.htrace.TimelineAnnotation;
+import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -72,6 +75,7 @@
* <p/>
*/
public class HTraceToZipkinConverter {
+ private static final Log LOG = LogFactory.getLog(HTraceToZipkinConverter.class);
private final int ipv4Address;
private final short port;
@@ -145,11 +149,15 @@
private List<BinaryAnnotation> createZipkinBinaryAnnotations(org.apache.htrace.Span span,
Endpoint ep) {
List<BinaryAnnotation> l = new ArrayList<BinaryAnnotation>();
- for (Map.Entry<byte[], byte[]> e : span.getKVAnnotations().entrySet()) {
+ for (Map.Entry<String, String> e : span.getKVAnnotations().entrySet()) {
BinaryAnnotation binaryAnn = new BinaryAnnotation();
binaryAnn.setAnnotation_type(AnnotationType.BYTES);
- binaryAnn.setKey(new String(e.getKey()));
- binaryAnn.setValue(e.getValue());
+ binaryAnn.setKey(e.getKey());
+ try {
+ binaryAnn.setValue(e.getValue().getBytes("UTF-8"));
+ } catch (UnsupportedEncodingException ex) {
+ LOG.error("Error encoding string as UTF-8", ex);
+ }
binaryAnn.setHost(ep);
l.add(binaryAnn);
}