HTRACE-144. Include IP address in span process description (cmccabe)
diff --git a/htrace-core/src/go/src/org/apache/htrace/htraced/rest.go b/htrace-core/src/go/src/org/apache/htrace/htraced/rest.go
index 58486f1..c113a90 100644
--- a/htrace-core/src/go/src/org/apache/htrace/htraced/rest.go
+++ b/htrace-core/src/go/src/org/apache/htrace/htraced/rest.go
@@ -162,6 +162,7 @@
setResponseHeaders(w.Header())
dec := json.NewDecoder(req.Body)
spans := make([]*common.Span, 0, 32)
+ defaultPid := req.Header.Get("htrace-pid")
for {
var span common.Span
err := dec.Decode(&span)
@@ -173,9 +174,13 @@
}
break
}
+ if span.ProcessId == "" {
+ span.ProcessId = defaultPid
+ }
spans = append(spans, &span)
}
- hand.lg.Debugf("writeSpansHandler: received %d span(s).\n", len(spans))
+ hand.lg.Debugf("writeSpansHandler: received %d span(s). defaultPid = %s\n",
+ len(spans), defaultPid)
for spanIdx := range spans {
hand.lg.Debugf("writing span %s\n", spans[spanIdx].ToJson())
hand.store.WriteSpan(spans[spanIdx])
diff --git a/htrace-core/src/main/java/org/apache/htrace/Span.java b/htrace-core/src/main/java/org/apache/htrace/Span.java
index 71ed872..71164d4 100644
--- a/htrace-core/src/main/java/org/apache/htrace/Span.java
+++ b/htrace-core/src/main/java/org/apache/htrace/Span.java
@@ -63,7 +63,9 @@
boolean isRunning();
/**
- * Return a textual description of this span
+ * Return a textual description of this span.<p/>
+ *
+ * Will never be null.
*/
String getDescription();
@@ -120,22 +122,32 @@
void addTimelineAnnotation(String msg);
/**
- * Get data associated with this span (read only)
+ * Get data associated with this span (read only)<p/>
+ *
+ * Will never be null.
*/
Map<String, String> getKVAnnotations();
/**
- * Get any timeline annotations (read only)
+ * Get any timeline annotations (read only)<p/>
+ *
+ * Will never be null.
*/
List<TimelineAnnotation> getTimelineAnnotations();
/**
- * Return a unique id for the node or process from which this Span originated.
- * IP address is a reasonable choice.
+ * Return a unique id for the process from which this Span originated.<p/>
+ *
+ * Will never be null.
*/
String getProcessId();
/**
+ * Set the process id of a span.
+ */
+ void setProcessId(String s);
+
+ /**
* Serialize to Json
*/
String toJson();
@@ -145,12 +157,25 @@
public void serialize(Span span, JsonGenerator jgen, SerializerProvider provider)
throws IOException {
jgen.writeStartObject();
- jgen.writeStringField("i", String.format("%016x", span.getTraceId()));
- jgen.writeStringField("s", String.format("%016x", span.getSpanId()));
- jgen.writeNumberField("b", span.getStartTimeMillis());
- jgen.writeNumberField("e", span.getStopTimeMillis());
- jgen.writeStringField("d", span.getDescription());
- jgen.writeStringField("r", span.getProcessId());
+ if (span.getTraceId() != 0) {
+ jgen.writeStringField("i", String.format("%016x", span.getTraceId()));
+ }
+ if (span.getSpanId() != 0) {
+ jgen.writeStringField("s", String.format("%016x", span.getSpanId()));
+ }
+ if (span.getStartTimeMillis() != 0) {
+ jgen.writeNumberField("b", span.getStartTimeMillis());
+ }
+ if (span.getStopTimeMillis() != 0) {
+ jgen.writeNumberField("e", span.getStopTimeMillis());
+ }
+ if (!span.getDescription().isEmpty()) {
+ jgen.writeStringField("d", span.getDescription());
+ }
+ String processId = span.getProcessId();
+ if (!processId.isEmpty()) {
+ jgen.writeStringField("r", processId);
+ }
jgen.writeArrayFieldStart("p");
for (long parent : span.getParents()) {
jgen.writeString(String.format("%016x", parent));
diff --git a/htrace-core/src/main/java/org/apache/htrace/Trace.java b/htrace-core/src/main/java/org/apache/htrace/Trace.java
index c7147ae..98d7563 100644
--- a/htrace-core/src/main/java/org/apache/htrace/Trace.java
+++ b/htrace-core/src/main/java/org/apache/htrace/Trace.java
@@ -84,7 +84,6 @@
traceId(tinfo.traceId).
spanId(Tracer.nonZeroRandom64()).
parents(new long[] { tinfo.spanId }).
- processId(Tracer.getProcessId()).
build();
return continueSpan(newSpan);
}
@@ -134,15 +133,6 @@
}
/**
- * Set the processId to be used for all Spans created by this Tracer.
- *
- * @see Span
- */
- public static void setProcessId(String processId) {
- Tracer.processId = processId;
- }
-
- /**
* Removes the given SpanReceiver from the list of SpanReceivers.
*/
public static void removeReceiver(SpanReceiver rcvr) {
diff --git a/htrace-core/src/main/java/org/apache/htrace/Tracer.java b/htrace-core/src/main/java/org/apache/htrace/Tracer.java
index af2d20e..b8c4c1a 100644
--- a/htrace-core/src/main/java/org/apache/htrace/Tracer.java
+++ b/htrace-core/src/main/java/org/apache/htrace/Tracer.java
@@ -49,7 +49,6 @@
};
public static final TraceInfo DONT_TRACE = new TraceInfo(-1, -1);
private static final long EMPTY_PARENT_ARRAY[] = new long[0];
- protected static String processId = null;
/**
* Log a client error, and throw an exception.
@@ -84,7 +83,6 @@
traceId(nonZeroRandom64()).
parents(EMPTY_PARENT_ARRAY).
spanId(nonZeroRandom64()).
- processId(getProcessId()).
build();
} else {
return parent.child(description);
@@ -121,7 +119,6 @@
return span;
}
-
public TraceScope continueSpan(Span s) {
Span oldCurrent = currentSpan();
setCurrentSpan(s);
@@ -131,18 +128,4 @@
protected int numReceivers() {
return receivers.size();
}
-
- static String getProcessId() {
- if (processId == null) {
- String cmdLine = System.getProperty("sun.java.command");
- if (cmdLine != null && !cmdLine.isEmpty()) {
- String fullClassName = cmdLine.split("\\s+")[0];
- String[] classParts = fullClassName.split("\\.");
- cmdLine = classParts[classParts.length - 1];
- }
-
- processId = (cmdLine == null || cmdLine.isEmpty()) ? "Unknown" : cmdLine;
- }
- return processId;
- }
}
diff --git a/htrace-core/src/main/java/org/apache/htrace/impl/LocalFileSpanReceiver.java b/htrace-core/src/main/java/org/apache/htrace/impl/LocalFileSpanReceiver.java
index 07e4a81..95da72c 100644
--- a/htrace-core/src/main/java/org/apache/htrace/impl/LocalFileSpanReceiver.java
+++ b/htrace-core/src/main/java/org/apache/htrace/impl/LocalFileSpanReceiver.java
@@ -59,6 +59,7 @@
private final FileOutputStream stream;
private final FileChannel channel;
private final ReentrantLock channelLock = new ReentrantLock();
+ private final ProcessId processId;
public LocalFileSpanReceiver(HTraceConfiguration conf) {
int capacity = conf.getInt(CAPACITY_KEY, CAPACITY_DEFAULT);
@@ -93,6 +94,7 @@
LOG.debug("Created new LocalFileSpanReceiver with path = " + path +
", capacity = " + capacity);
}
+ this.processId = new ProcessId(conf);
}
/**
@@ -135,6 +137,10 @@
@Override
public void receiveSpan(Span span) {
+ if (span.getProcessId().isEmpty()) {
+ span.setProcessId(processId.get());
+ }
+
// Serialize the span data into a byte[]. Note that we're not holding the
// lock here, to improve concurrency.
byte jsonBuf[] = null;
diff --git a/htrace-core/src/main/java/org/apache/htrace/impl/MilliSpan.java b/htrace-core/src/main/java/org/apache/htrace/impl/MilliSpan.java
index afd0202..c57eb25 100644
--- a/htrace-core/src/main/java/org/apache/htrace/impl/MilliSpan.java
+++ b/htrace-core/src/main/java/org/apache/htrace/impl/MilliSpan.java
@@ -49,6 +49,7 @@
public class MilliSpan implements Span {
private static ObjectWriter JSON_WRITER = new ObjectMapper().writer();
private static final long EMPTY_PARENT_ARRAY[] = new long[0];
+ private static final String EMPTY_STRING = "";
private long begin;
private long end;
@@ -57,7 +58,7 @@
private long parents[];
private final long spanId;
private Map<String, String> traceInfo = null;
- private final String processId;
+ private String processId;
private List<TimelineAnnotation> timeline = null;
private final static Random random = new Random();
@@ -88,12 +89,12 @@
public static class Builder {
private long begin;
private long end;
- private String description;
+ private String description = EMPTY_STRING;
private long traceId;
private long parents[] = EMPTY_PARENT_ARRAY;
private long spanId;
private Map<String, String> traceInfo = null;
- private String processId;
+ private String processId = EMPTY_STRING;
private List<TimelineAnnotation> timeline = null;
public Builder() {
@@ -158,6 +159,18 @@
}
}
+ public MilliSpan() {
+ this.begin = 0;
+ this.end = 0;
+ this.description = EMPTY_STRING;
+ this.traceId = 0;
+ this.parents = EMPTY_PARENT_ARRAY;
+ this.spanId = 0;
+ this.traceInfo = null;
+ this.processId = EMPTY_STRING;
+ this.timeline = null;
+ }
+
private MilliSpan(Builder builder) {
this.begin = builder.begin;
this.end = builder.end;
@@ -285,6 +298,11 @@
}
@Override
+ public void setProcessId(String processId) {
+ this.processId = processId;
+ }
+
+ @Override
public String toJson() {
StringWriter writer = new StringWriter();
try {
@@ -307,18 +325,38 @@
throws IOException, JsonProcessingException {
JsonNode root = jp.getCodec().readTree(jp);
Builder builder = new Builder();
- builder.begin(root.get("b").asLong());
- builder.end(root.get("e").asLong());
- builder.description(root.get("d").asText());
- builder.traceId(parseUnsignedHexLong(root.get("i").asText()));
- builder.spanId(parseUnsignedHexLong(root.get("s").asText()));
- builder.processId(root.get("r").asText());
+ JsonNode bNode = root.get("b");
+ if (bNode != null) {
+ builder.begin(bNode.asLong());
+ }
+ JsonNode eNode = root.get("e");
+ if (eNode != null) {
+ builder.end(eNode.asLong());
+ }
+ JsonNode dNode = root.get("d");
+ if (dNode != null) {
+ builder.description(dNode.asText());
+ }
+ JsonNode iNode = root.get("i");
+ if (iNode != null) {
+ builder.traceId(parseUnsignedHexLong(iNode.asText()));
+ }
+ JsonNode sNode = root.get("s");
+ if (sNode != null) {
+ builder.spanId(parseUnsignedHexLong(sNode.asText()));
+ }
+ JsonNode rNode = root.get("r");
+ if (rNode != null) {
+ builder.processId(rNode.asText());
+ }
JsonNode parentsNode = root.get("p");
LinkedList<Long> parents = new LinkedList<Long>();
- for (Iterator<JsonNode> iter = parentsNode.elements();
- iter.hasNext(); ) {
- JsonNode parentIdNode = iter.next();
- parents.add(parseUnsignedHexLong(parentIdNode.asText()));
+ if (parentsNode != null) {
+ for (Iterator<JsonNode> iter = parentsNode.elements();
+ iter.hasNext(); ) {
+ JsonNode parentIdNode = iter.next();
+ parents.add(parseUnsignedHexLong(parentIdNode.asText()));
+ }
}
builder.parents(parents);
JsonNode traceInfoNode = root.get("n");
diff --git a/htrace-core/src/main/java/org/apache/htrace/impl/ProcessId.java b/htrace-core/src/main/java/org/apache/htrace/impl/ProcessId.java
new file mode 100644
index 0000000..ad2e5fc
--- /dev/null
+++ b/htrace-core/src/main/java/org/apache/htrace/impl/ProcessId.java
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.htrace.impl;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.htrace.HTraceConfiguration;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.lang.management.ManagementFactory;
+import java.net.InetAddress;
+import java.net.NetworkInterface;
+import java.net.SocketException;
+import java.util.Enumeration;
+import java.util.Locale;
+import java.util.TreeSet;
+
+/**
+ * The HTrace process ID.<p/>
+ *
+ * HTrace process IDs are created from format strings.
+ * Format strings contain variables which the ProcessId class will
+ * replace with the correct values at runtime.<p/>
+ *
+ * <ul>
+ * <li>${ip}: will be replaced with an ip address.</li>
+ * <li>${pname}: will be replaced the process name obtained from java.</li>
+ * </ul><p/>
+ *
+ * For example, the string "${pname}/${ip}" will be replaced with something
+ * like: DataNode/192.168.0.1, assuming that the process' name is DataNode
+ * and its IP address is 192.168.0.1.<p/>
+ *
+ * Process ID strings can contain backslashes as escapes.
+ * For example, "\a" will map to "a". "\${ip}" will map to the literal
+ * string "${ip}", not the IP address. A backslash itself can be escaped by a
+ * preceding backslash.
+ */
+public final class ProcessId {
+ public static final Log LOG = LogFactory.getLog(ProcessId.class);
+
+ /**
+ * The configuration key to use for process id
+ */
+ static final String PROCESS_ID_KEY = "process.id";
+
+ /**
+ * The default process ID to use if no other ID is configured.
+ */
+ private static final String DEFAULT_PROCESS_ID = "${pname}/${ip}";
+
+ private final String processId;
+
+ ProcessId(String fmt) {
+ StringBuilder bld = new StringBuilder();
+ StringBuilder varBld = null;
+ boolean escaping = false;
+ int varSeen = 0;
+ for (int i = 0, len = fmt.length() ; i < len; i++) {
+ char c = fmt.charAt(i);
+ if (c == '\\') {
+ if (!escaping) {
+ escaping = true;
+ continue;
+ }
+ }
+ switch (varSeen) {
+ case 0:
+ if (c == '$') {
+ if (!escaping) {
+ varSeen = 1;
+ continue;
+ }
+ }
+ escaping = false;
+ varSeen = 0;
+ bld.append(c);
+ break;
+ case 1:
+ if (c == '{') {
+ if (!escaping) {
+ varSeen = 2;
+ varBld = new StringBuilder();
+ continue;
+ }
+ }
+ escaping = false;
+ varSeen = 0;
+ bld.append("$").append(c);
+ break;
+ default:
+ if (c == '}') {
+ if (!escaping) {
+ String var = varBld.toString();
+ bld.append(processShellVar(var));
+ varBld = null;
+ varSeen = 0;
+ continue;
+ }
+ }
+ escaping = false;
+ varBld.append(c);
+ varSeen++;
+ break;
+ }
+ }
+ if (varSeen > 0) {
+ LOG.warn("Unterminated process ID substitution variable at the end " +
+ "of format string " + fmt);
+ }
+ this.processId = bld.toString();
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("ProcessID(fmt=" + fmt + "): computed process ID of \"" +
+ this.processId + "\"");
+ }
+ }
+
+ public ProcessId(HTraceConfiguration conf) {
+ this(conf.get(PROCESS_ID_KEY, DEFAULT_PROCESS_ID));
+ }
+
+ private String processShellVar(String var) {
+ if (var.equals("pname")) {
+ return getProcessName();
+ } else if (var.equals("ip")) {
+ return getBestIpString();
+ } else if (var.equals("pid")) {
+ return Long.valueOf(getOsPid()).toString();
+ } else {
+ LOG.warn("unknown ProcessID variable " + var);
+ return "";
+ }
+ }
+
+ static String getProcessName() {
+ String cmdLine = System.getProperty("sun.java.command");
+ if (cmdLine != null && !cmdLine.isEmpty()) {
+ String fullClassName = cmdLine.split("\\s+")[0];
+ String[] classParts = fullClassName.split("\\.");
+ cmdLine = classParts[classParts.length - 1];
+ }
+ return (cmdLine == null || cmdLine.isEmpty()) ? "Unknown" : cmdLine;
+ }
+
+ /**
+ * Get the best IP address that represents this node.<p/>
+ *
+ * This is complicated since nodes can have multiple network interfaces,
+ * and each network interface can have multiple IP addresses. What we're
+ * looking for here is an IP address that will serve to identify this node
+ * to HTrace. So we prefer site-local addresess (i.e. private ones on the
+ * LAN) to publicly routable interfaces. If there are multiple addresses
+ * to choose from, we select the one which comes first in textual sort
+ * order. This should ensure that we at least consistently call each node
+ * by a single name.
+ */
+ static String getBestIpString() {
+ Enumeration<NetworkInterface> ifaces;
+ try {
+ ifaces = NetworkInterface.getNetworkInterfaces();
+ } catch (SocketException e) {
+ LOG.error("Error getting network interfaces", e);
+ return "127.0.0.1";
+ }
+ TreeSet<String> siteLocalCandidates = new TreeSet<String>();
+ TreeSet<String> candidates = new TreeSet<String>();
+ while (ifaces.hasMoreElements()) {
+ NetworkInterface iface = ifaces.nextElement();
+ for (Enumeration<InetAddress> addrs =
+ iface.getInetAddresses(); addrs.hasMoreElements();) {
+ InetAddress addr = addrs.nextElement();
+ if (!addr.isLoopbackAddress()) {
+ if (addr.isSiteLocalAddress()) {
+ siteLocalCandidates.add(addr.getHostAddress());
+ } else {
+ candidates.add(addr.getHostAddress());
+ }
+ }
+ }
+ }
+ if (!siteLocalCandidates.isEmpty()) {
+ return siteLocalCandidates.first();
+ }
+ if (!candidates.isEmpty()) {
+ return candidates.first();
+ }
+ return "127.0.0.1";
+ }
+
+ /**
+ * Get the process id from the operating system.<p/>
+ *
+ * Unfortunately, there is no simple method to get the process id in Java.
+ * The approach we take here is to use the shell method (see
+ * {ProcessId#getOsPidFromShellPpid}) unless we are on Windows, where the
+ * shell is not available. On Windows, we use
+ * {ProcessId#getOsPidFromManagementFactory}, which depends on some
+ * undocumented features of the JVM, but which doesn't require a shell.
+ */
+ static long getOsPid() {
+ if ((System.getProperty("os.name", "generic").toLowerCase(Locale.ENGLISH)).
+ contains("windows")) {
+ return getOsPidFromManagementFactory();
+ } else {
+ return getOsPidFromShellPpid();
+ }
+ }
+
+ /**
+ * Get the process ID by executing a shell and printing the PPID (parent
+ * process ID).<p/>
+ *
+ * This method of getting the process ID doesn't depend on any undocumented
+ * features of the virtual machine, and should work on almost any UNIX
+ * operating system.
+ */
+ private static long getOsPidFromShellPpid() {
+ Process p = null;
+ StringBuilder sb = new StringBuilder();
+ try {
+ p = new ProcessBuilder("/usr/bin/env", "sh", "-c", "echo $PPID").
+ redirectErrorStream(true).start();
+ BufferedReader reader = new BufferedReader(
+ new InputStreamReader(p.getInputStream()));
+ String line = "";
+ while ((line = reader.readLine()) != null) {
+ sb.append(line.trim());
+ }
+ int exitVal = p.waitFor();
+ if (exitVal != 0) {
+ throw new IOException("Process exited with error code " +
+ Integer.valueOf(exitVal).toString());
+ }
+ } catch (InterruptedException e) {
+ LOG.error("Interrupted while getting operating system pid from " +
+ "the shell.", e);
+ return 0L;
+ } catch (IOException e) {
+ LOG.error("Error getting operating system pid from the shell.", e);
+ return 0L;
+ } finally {
+ if (p != null) {
+ p.destroy();
+ }
+ }
+ try {
+ return Long.parseLong(sb.toString());
+ } catch (NumberFormatException e) {
+ LOG.error("Error parsing operating system pid from the shell.", e);
+ return 0L;
+ }
+ }
+
+ /**
+ * Get the process ID by looking at the name of the managed bean for the
+ * runtime system of the Java virtual machine.<p/>
+ *
+ * Although this is undocumented, in the Oracle JVM this name is of the form
+ * [OS_PROCESS_ID]@[HOSTNAME].
+ */
+ private static long getOsPidFromManagementFactory() {
+ try {
+ return Long.parseLong(ManagementFactory.getRuntimeMXBean().
+ getName().split("@")[0]);
+ } catch (NumberFormatException e) {
+ LOG.error("Failed to get the operating system process ID from the name " +
+ "of the managed bean for the JVM.", e);
+ return 0L;
+ }
+ }
+
+ public String get() {
+ return processId;
+ }
+}
diff --git a/htrace-core/src/test/java/org/apache/htrace/impl/TestLocalFileSpanReceiver.java b/htrace-core/src/test/java/org/apache/htrace/impl/TestLocalFileSpanReceiver.java
index 60b5430..634bef8 100644
--- a/htrace-core/src/test/java/org/apache/htrace/impl/TestLocalFileSpanReceiver.java
+++ b/htrace-core/src/test/java/org/apache/htrace/impl/TestLocalFileSpanReceiver.java
@@ -57,6 +57,7 @@
confMap.put(LocalFileSpanReceiver.PATH_KEY, traceFileName);
confMap.put(SpanReceiverBuilder.SPAN_RECEIVER_CONF_KEY,
LocalFileSpanReceiver.class.getName());
+ confMap.put(ProcessId.PROCESS_ID_KEY, "testPid");
SpanReceiver rcvr =
new SpanReceiverBuilder(HTraceConfiguration.fromMap(confMap))
.logErrors(false).build();
@@ -69,5 +70,6 @@
ObjectMapper mapper = new ObjectMapper();
MilliSpan span = mapper.readValue(new File(traceFileName), MilliSpan.class);
assertEquals("testWriteToLocalFile", span.getDescription());
+ assertEquals("testPid", span.getProcessId());
}
}
diff --git a/htrace-core/src/test/java/org/apache/htrace/impl/TestMilliSpan.java b/htrace-core/src/test/java/org/apache/htrace/impl/TestMilliSpan.java
index 857e9ac..41ee108 100644
--- a/htrace-core/src/test/java/org/apache/htrace/impl/TestMilliSpan.java
+++ b/htrace-core/src/test/java/org/apache/htrace/impl/TestMilliSpan.java
@@ -144,4 +144,13 @@
MilliSpan dspan = mapper.readValue(json, MilliSpan.class);
compareSpans(span, dspan);
}
+
+ @Test
+ public void testJsonSerializationWithFieldsNotSet() throws Exception {
+ MilliSpan span = new MilliSpan.Builder().build();
+ String json = span.toJson();
+ ObjectMapper mapper = new ObjectMapper();
+ MilliSpan dspan = mapper.readValue(json, MilliSpan.class);
+ compareSpans(span, dspan);
+ }
}
diff --git a/htrace-core/src/test/java/org/apache/htrace/impl/TestProcessId.java b/htrace-core/src/test/java/org/apache/htrace/impl/TestProcessId.java
new file mode 100644
index 0000000..9e5f6b9
--- /dev/null
+++ b/htrace-core/src/test/java/org/apache/htrace/impl/TestProcessId.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.htrace.impl;
+
+import java.io.IOException;
+import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+
+public class TestProcessId {
+ private void testProcessIdImpl(String expected, String fmt) {
+ assertEquals(expected, new ProcessId(fmt).get());
+ }
+
+ @Test
+ public void testSimpleProcessIds() {
+ testProcessIdImpl("abc", "abc");
+ testProcessIdImpl("abc", "a\\bc");
+ testProcessIdImpl("abc", "ab\\c");
+ testProcessIdImpl("abc", "\\a\\b\\c");
+ testProcessIdImpl("a\\bc", "a\\\\bc");
+ }
+
+ @Test
+ public void testSubstitutionVariables() throws IOException {
+ testProcessIdImpl(ProcessId.getProcessName(), "${pname}");
+ testProcessIdImpl("my." + ProcessId.getProcessName(), "my.${pname}");
+ testProcessIdImpl(ProcessId.getBestIpString() + ".str", "${ip}.str");
+ testProcessIdImpl("${pname}", "\\${pname}");
+ testProcessIdImpl("$cash$money{}", "$cash$money{}");
+ testProcessIdImpl("Foo." + Long.valueOf(ProcessId.getOsPid()).toString(),
+ "Foo.${pid}");
+ }
+}
diff --git a/htrace-flume/src/main/java/org/apache/htrace/impl/FlumeSpanReceiver.java b/htrace-flume/src/main/java/org/apache/htrace/impl/FlumeSpanReceiver.java
index 54b8a14..baa4fa1 100644
--- a/htrace-flume/src/main/java/org/apache/htrace/impl/FlumeSpanReceiver.java
+++ b/htrace-flume/src/main/java/org/apache/htrace/impl/FlumeSpanReceiver.java
@@ -89,10 +89,12 @@
private int maxSpanBatchSize;
private String flumeHostName;
private int flumePort;
+ private final ProcessId processId;
public FlumeSpanReceiver(HTraceConfiguration conf) {
this.queue = new ArrayBlockingQueue<Span>(1000);
this.tf = new SimpleThreadFactory();
+ this.processId = new ProcessId(conf);
configure(conf);
}
@@ -272,6 +274,9 @@
public void receiveSpan(Span span) {
if (running.get()) {
try {
+ if (span.getProcessId().isEmpty()) {
+ span.setProcessId(processId.get());
+ }
this.queue.add(span);
} catch (IllegalStateException e) {
LOG.error("Error trying to append span (" +
diff --git a/htrace-hbase/src/main/java/org/apache/htrace/impl/HBaseSpanReceiver.java b/htrace-hbase/src/main/java/org/apache/htrace/impl/HBaseSpanReceiver.java
index 7a99366..2faf4bb 100644
--- a/htrace-hbase/src/main/java/org/apache/htrace/impl/HBaseSpanReceiver.java
+++ b/htrace-hbase/src/main/java/org/apache/htrace/impl/HBaseSpanReceiver.java
@@ -126,6 +126,7 @@
private final byte[] cf;
private final byte[] icf;
private final int maxSpanBatchSize;
+ private final ProcessId processId;
public HBaseSpanReceiver(HTraceConfiguration conf) {
this.queue = new ArrayBlockingQueue<Span>(1000);
@@ -153,6 +154,7 @@
for (int i = 0; i < numThreads; i++) {
this.service.submit(new WriteSpanRunnable());
}
+ this.processId = new ProcessId(conf);
}
private class WriteSpanRunnable implements Runnable {
@@ -331,6 +333,9 @@
public void receiveSpan(Span span) {
if (running.get()) {
try {
+ if (span.getProcessId().isEmpty()) {
+ span.setProcessId(processId.get());
+ }
this.queue.add(span);
} catch (IllegalStateException e) {
// todo: supress repeating error logs.
diff --git a/htrace-hbase/src/test/java/org/apache/htrace/impl/TestHBaseSpanReceiver.java b/htrace-hbase/src/test/java/org/apache/htrace/impl/TestHBaseSpanReceiver.java
index 7bf7bac..bf93220 100644
--- a/htrace-hbase/src/test/java/org/apache/htrace/impl/TestHBaseSpanReceiver.java
+++ b/htrace-hbase/src/test/java/org/apache/htrace/impl/TestHBaseSpanReceiver.java
@@ -181,6 +181,11 @@
}
@Override
+ public void setProcessId(String processId) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
public String getDescription() {
return span.getDescription();
}
diff --git a/htrace-htraced/src/main/java/org/apache/htrace/impl/HTracedRESTReceiver.java b/htrace-htraced/src/main/java/org/apache/htrace/impl/HTracedRESTReceiver.java
index 7edc2b8..5a4daaf 100644
--- a/htrace-htraced/src/main/java/org/apache/htrace/impl/HTracedRESTReceiver.java
+++ b/htrace-htraced/src/main/java/org/apache/htrace/impl/HTracedRESTReceiver.java
@@ -170,6 +170,11 @@
private boolean mustStartFlush;
/**
+ * The process ID to use for all spans.
+ */
+ private final ProcessId processId;
+
+ /**
* Create an HttpClient instance.
*
* @param connTimeout The timeout to use for connecting.
@@ -221,6 +226,7 @@
capacity + ", url=" + url + ", periodInMs=" + periodInMs +
", maxToSendAtATime=" + maxToSendAtATime);
}
+ processId = new ProcessId(conf);
}
/**
@@ -316,6 +322,7 @@
try {
Request request = httpClient.newRequest(url).method(HttpMethod.POST);
request.header(HttpHeader.CONTENT_TYPE, "application/json");
+ request.header("htrace-pid", processId.get());
StringBuilder bld = new StringBuilder();
for (Span span : spanBuf) {
bld.append(span.toJson());
@@ -412,7 +419,8 @@
lock.unlock();
}
if (!added) {
- long now = System.nanoTime() / 1000000L;
+ long now = TimeUnit.MILLISECONDS.convert(System.nanoTime(),
+ TimeUnit.NANOSECONDS);
long last = lastAtCapacityWarningLog.get();
if (now - last > WARN_TIMEOUT_MS) {
// Only log every 5 minutes. Any more than this for a guest process
diff --git a/htrace-htraced/src/test/java/org/apache/htrace/impl/TestHTracedRESTReceiver.java b/htrace-htraced/src/test/java/org/apache/htrace/impl/TestHTracedRESTReceiver.java
index eca6d6d..9a01005 100644
--- a/htrace-htraced/src/test/java/org/apache/htrace/impl/TestHTracedRESTReceiver.java
+++ b/htrace-htraced/src/test/java/org/apache/htrace/impl/TestHTracedRESTReceiver.java
@@ -24,6 +24,7 @@
import java.io.File;
import java.net.URL;
+import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.htrace.HTraceConfiguration;
@@ -65,6 +66,7 @@
*/
private final class TestHTraceConfiguration extends HTraceConfiguration {
private final URL restServerUrl;
+ final static String PROCESS_ID = "TestHTracedRESTReceiver";
public TestHTraceConfiguration(final URL restServerUrl) {
this.restServerUrl = restServerUrl;
@@ -79,6 +81,8 @@
public String get(String key, String defaultValue) {
if (key.equals(HTracedRESTReceiver.HTRACED_REST_URL_KEY)) {
return this.restServerUrl.toString();
+ } else if (key.equals(ProcessId.PROCESS_ID_KEY)) {
+ return PROCESS_ID;
}
return defaultValue;
}
@@ -120,14 +124,21 @@
final int NUM_SPANS = 3;
final HttpClient http = receiver.createHttpClient(60000, 60000);
http.start();
+ Span spans[] = new Span[NUM_SPANS];
+ for (int i = 0; i < NUM_SPANS; i++) {
+ MilliSpan.Builder builder = new MilliSpan.Builder().
+ parents(new long[]{1L}).
+ spanId(i);
+ if (i == NUM_SPANS - 1) {
+ builder.processId("specialPid");
+ }
+ spans[i] = builder.build();
+ }
try {
for (int i = 0; i < NUM_SPANS; i++) {
- Span span = new MilliSpan.Builder().parents(
- new long [] {1L}).spanId(i).build();
- LOG.info(span.toString());
- receiver.receiveSpan(span);
+ LOG.info("receiving " + spans[i].toString());
+ receiver.receiveSpan(spans[i]);
}
-
if (testClose) {
receiver.close();
} else {
@@ -149,6 +160,18 @@
return false;
}
LOG.info("Got " + content + " for span " + i);
+ ObjectMapper mapper = new ObjectMapper();
+ MilliSpan dspan = mapper.readValue(content, MilliSpan.class);
+ assertEquals((long)i, dspan.getSpanId());
+ // Every span should have the process ID we set in the
+ // configuration... except for the last span, which had
+ // a custom value set.
+ if (i == NUM_SPANS - 1) {
+ assertEquals("specialPid", dspan.getProcessId());
+ } else {
+ assertEquals(TestHTraceConfiguration.PROCESS_ID,
+ dspan.getProcessId());
+ }
}
return true;
} catch (Throwable t) {
diff --git a/htrace-zipkin/src/main/java/org/apache/htrace/impl/ZipkinSpanReceiver.java b/htrace-zipkin/src/main/java/org/apache/htrace/impl/ZipkinSpanReceiver.java
index 06ff0ad..d75c504 100644
--- a/htrace-zipkin/src/main/java/org/apache/htrace/impl/ZipkinSpanReceiver.java
+++ b/htrace-zipkin/src/main/java/org/apache/htrace/impl/ZipkinSpanReceiver.java
@@ -132,6 +132,8 @@
}
};
+ private final ProcessId processId;
+
////////////////////
/// Variables that will change on each call to configure()
///////////////////
@@ -144,6 +146,7 @@
public ZipkinSpanReceiver(HTraceConfiguration conf) {
this.queue = new ArrayBlockingQueue<Span>(1000);
this.protocolFactory = new TBinaryProtocol.Factory();
+ this.processId = new ProcessId(conf);
configure(conf);
}
@@ -360,6 +363,9 @@
public void receiveSpan(Span span) {
if (running.get()) {
try {
+ if (span.getProcessId().isEmpty()) {
+ span.setProcessId(processId.get());
+ }
this.queue.add(span);
} catch (IllegalStateException e) {
LOG.error("Error trying to append span (" + span.getDescription() + ") to the queue."