HTRACE-144. Include IP address in span process description (cmccabe)
diff --git a/htrace-core/src/go/src/org/apache/htrace/htraced/rest.go b/htrace-core/src/go/src/org/apache/htrace/htraced/rest.go
index 58486f1..c113a90 100644
--- a/htrace-core/src/go/src/org/apache/htrace/htraced/rest.go
+++ b/htrace-core/src/go/src/org/apache/htrace/htraced/rest.go
@@ -162,6 +162,7 @@
 	setResponseHeaders(w.Header())
 	dec := json.NewDecoder(req.Body)
 	spans := make([]*common.Span, 0, 32)
+	defaultPid := req.Header.Get("htrace-pid")
 	for {
 		var span common.Span
 		err := dec.Decode(&span)
@@ -173,9 +174,13 @@
 			}
 			break
 		}
+		if span.ProcessId == "" {
+			span.ProcessId = defaultPid
+		}
 		spans = append(spans, &span)
 	}
-	hand.lg.Debugf("writeSpansHandler: received %d span(s).\n", len(spans))
+	hand.lg.Debugf("writeSpansHandler: received %d span(s).  defaultPid = %s\n",
+		len(spans), defaultPid)
 	for spanIdx := range spans {
 		hand.lg.Debugf("writing span %s\n", spans[spanIdx].ToJson())
 		hand.store.WriteSpan(spans[spanIdx])
diff --git a/htrace-core/src/main/java/org/apache/htrace/Span.java b/htrace-core/src/main/java/org/apache/htrace/Span.java
index 71ed872..71164d4 100644
--- a/htrace-core/src/main/java/org/apache/htrace/Span.java
+++ b/htrace-core/src/main/java/org/apache/htrace/Span.java
@@ -63,7 +63,9 @@
   boolean isRunning();
 
   /**
-   * Return a textual description of this span
+   * Return a textual description of this span.<p/>
+   *
+   * Will never be null.
    */
   String getDescription();
 
@@ -120,22 +122,32 @@
   void addTimelineAnnotation(String msg);
 
   /**
-   * Get data associated with this span (read only)
+   * Get data associated with this span (read only)<p/>
+   *
+   * Will never be null.
    */
   Map<String, String> getKVAnnotations();
 
   /**
-   * Get any timeline annotations (read only)
+   * Get any timeline annotations (read only)<p/>
+   *
+   * Will never be null.
    */
   List<TimelineAnnotation> getTimelineAnnotations();
 
   /**
-   * Return a unique id for the node or process from which this Span originated.
-   * IP address is a reasonable choice.
+   * Return a unique id for the process from which this Span originated.<p/>
+   *
+   * Will never be null.
    */
   String getProcessId();
 
   /**
+   * Set the process id of a span.
+   */
+  void setProcessId(String s);
+
+  /**
    * Serialize to Json
    */
   String toJson();
@@ -145,12 +157,25 @@
     public void serialize(Span span, JsonGenerator jgen, SerializerProvider provider)
         throws IOException {
       jgen.writeStartObject();
-      jgen.writeStringField("i", String.format("%016x", span.getTraceId()));
-      jgen.writeStringField("s", String.format("%016x", span.getSpanId()));
-      jgen.writeNumberField("b", span.getStartTimeMillis());
-      jgen.writeNumberField("e", span.getStopTimeMillis());
-      jgen.writeStringField("d", span.getDescription());
-      jgen.writeStringField("r", span.getProcessId());
+      if (span.getTraceId() != 0) {
+        jgen.writeStringField("i", String.format("%016x", span.getTraceId()));
+      }
+      if (span.getSpanId() != 0) {
+        jgen.writeStringField("s", String.format("%016x", span.getSpanId()));
+      }
+      if (span.getStartTimeMillis() != 0) {
+        jgen.writeNumberField("b", span.getStartTimeMillis());
+      }
+      if (span.getStopTimeMillis() != 0) {
+        jgen.writeNumberField("e", span.getStopTimeMillis());
+      }
+      if (!span.getDescription().isEmpty()) {
+        jgen.writeStringField("d", span.getDescription());
+      }
+      String processId = span.getProcessId();
+      if (!processId.isEmpty()) {
+        jgen.writeStringField("r", processId);
+      }
       jgen.writeArrayFieldStart("p");
       for (long parent : span.getParents()) {
         jgen.writeString(String.format("%016x", parent));
diff --git a/htrace-core/src/main/java/org/apache/htrace/Trace.java b/htrace-core/src/main/java/org/apache/htrace/Trace.java
index c7147ae..98d7563 100644
--- a/htrace-core/src/main/java/org/apache/htrace/Trace.java
+++ b/htrace-core/src/main/java/org/apache/htrace/Trace.java
@@ -84,7 +84,6 @@
         traceId(tinfo.traceId).
         spanId(Tracer.nonZeroRandom64()).
         parents(new long[] { tinfo.spanId }).
-        processId(Tracer.getProcessId()).
         build();
     return continueSpan(newSpan);
   }
@@ -134,15 +133,6 @@
   }
 
   /**
-   * Set the processId to be used for all Spans created by this Tracer.
-   *
-   * @see Span
-   */
-  public static void setProcessId(String processId) {
-    Tracer.processId = processId;
-  }
-
-  /**
    * Removes the given SpanReceiver from the list of SpanReceivers.
    */
   public static void removeReceiver(SpanReceiver rcvr) {
diff --git a/htrace-core/src/main/java/org/apache/htrace/Tracer.java b/htrace-core/src/main/java/org/apache/htrace/Tracer.java
index af2d20e..b8c4c1a 100644
--- a/htrace-core/src/main/java/org/apache/htrace/Tracer.java
+++ b/htrace-core/src/main/java/org/apache/htrace/Tracer.java
@@ -49,7 +49,6 @@
   };
   public static final TraceInfo DONT_TRACE = new TraceInfo(-1, -1);
   private static final long EMPTY_PARENT_ARRAY[] = new long[0];
-  protected static String processId = null;
 
   /**
    * Log a client error, and throw an exception.
@@ -84,7 +83,6 @@
           traceId(nonZeroRandom64()).
           parents(EMPTY_PARENT_ARRAY).
           spanId(nonZeroRandom64()).
-          processId(getProcessId()).
           build();
     } else {
       return parent.child(description);
@@ -121,7 +119,6 @@
     return span;
   }
 
-
   public TraceScope continueSpan(Span s) {
     Span oldCurrent = currentSpan();
     setCurrentSpan(s);
@@ -131,18 +128,4 @@
   protected int numReceivers() {
     return receivers.size();
   }
-
-  static String getProcessId() {
-    if (processId == null) {
-      String cmdLine = System.getProperty("sun.java.command");
-      if (cmdLine != null && !cmdLine.isEmpty()) {
-        String fullClassName = cmdLine.split("\\s+")[0];
-        String[] classParts = fullClassName.split("\\.");
-        cmdLine = classParts[classParts.length - 1];
-      }
-
-      processId = (cmdLine == null || cmdLine.isEmpty()) ? "Unknown" : cmdLine;
-    }
-    return processId;
-  }
 }
diff --git a/htrace-core/src/main/java/org/apache/htrace/impl/LocalFileSpanReceiver.java b/htrace-core/src/main/java/org/apache/htrace/impl/LocalFileSpanReceiver.java
index 07e4a81..95da72c 100644
--- a/htrace-core/src/main/java/org/apache/htrace/impl/LocalFileSpanReceiver.java
+++ b/htrace-core/src/main/java/org/apache/htrace/impl/LocalFileSpanReceiver.java
@@ -59,6 +59,7 @@
   private final FileOutputStream stream;
   private final FileChannel channel;
   private final ReentrantLock channelLock = new ReentrantLock();
+  private final ProcessId processId;
 
   public LocalFileSpanReceiver(HTraceConfiguration conf) {
     int capacity = conf.getInt(CAPACITY_KEY, CAPACITY_DEFAULT);
@@ -93,6 +94,7 @@
       LOG.debug("Created new LocalFileSpanReceiver with path = " + path +
                 ", capacity = " + capacity);
     }
+    this.processId = new ProcessId(conf);
   }
 
   /**
@@ -135,6 +137,10 @@
 
   @Override
   public void receiveSpan(Span span) {
+    if (span.getProcessId().isEmpty()) {
+      span.setProcessId(processId.get());
+    }
+
     // Serialize the span data into a byte[].  Note that we're not holding the
     // lock here, to improve concurrency.
     byte jsonBuf[] = null;
diff --git a/htrace-core/src/main/java/org/apache/htrace/impl/MilliSpan.java b/htrace-core/src/main/java/org/apache/htrace/impl/MilliSpan.java
index afd0202..c57eb25 100644
--- a/htrace-core/src/main/java/org/apache/htrace/impl/MilliSpan.java
+++ b/htrace-core/src/main/java/org/apache/htrace/impl/MilliSpan.java
@@ -49,6 +49,7 @@
 public class MilliSpan implements Span {
   private static ObjectWriter JSON_WRITER = new ObjectMapper().writer();
   private static final long EMPTY_PARENT_ARRAY[] = new long[0];
+  private static final String EMPTY_STRING = "";
 
   private long begin;
   private long end;
@@ -57,7 +58,7 @@
   private long parents[];
   private final long spanId;
   private Map<String, String> traceInfo = null;
-  private final String processId;
+  private String processId;
   private List<TimelineAnnotation> timeline = null;
   private final static Random random = new Random();
 
@@ -88,12 +89,12 @@
   public static class Builder {
     private long begin;
     private long end;
-    private String description;
+    private String description = EMPTY_STRING;
     private long traceId;
     private long parents[] = EMPTY_PARENT_ARRAY;
     private long spanId;
     private Map<String, String> traceInfo = null;
-    private String processId;
+    private String processId = EMPTY_STRING;
     private List<TimelineAnnotation> timeline = null;
 
     public Builder() {
@@ -158,6 +159,18 @@
     }
   }
 
+  public MilliSpan() {
+    this.begin = 0;
+    this.end = 0;
+    this.description = EMPTY_STRING;
+    this.traceId = 0;
+    this.parents = EMPTY_PARENT_ARRAY;
+    this.spanId = 0;
+    this.traceInfo = null;
+    this.processId = EMPTY_STRING;
+    this.timeline = null;
+  }
+
   private MilliSpan(Builder builder) {
     this.begin = builder.begin;
     this.end = builder.end;
@@ -285,6 +298,11 @@
   }
 
   @Override
+  public void setProcessId(String processId) {
+    this.processId = processId;
+  }
+
+  @Override
   public String toJson() {
     StringWriter writer = new StringWriter();
     try {
@@ -307,18 +325,38 @@
           throws IOException, JsonProcessingException {
       JsonNode root = jp.getCodec().readTree(jp);
       Builder builder = new Builder();
-      builder.begin(root.get("b").asLong());
-      builder.end(root.get("e").asLong());
-      builder.description(root.get("d").asText());
-      builder.traceId(parseUnsignedHexLong(root.get("i").asText()));
-      builder.spanId(parseUnsignedHexLong(root.get("s").asText()));
-      builder.processId(root.get("r").asText());
+      JsonNode bNode = root.get("b");
+      if (bNode != null) {
+        builder.begin(bNode.asLong());
+      }
+      JsonNode eNode = root.get("e");
+      if (eNode != null) {
+        builder.end(eNode.asLong());
+      }
+      JsonNode dNode = root.get("d");
+      if (dNode != null) {
+        builder.description(dNode.asText());
+      }
+      JsonNode iNode = root.get("i");
+      if (iNode != null) {
+        builder.traceId(parseUnsignedHexLong(iNode.asText()));
+      }
+      JsonNode sNode = root.get("s");
+      if (sNode != null) {
+        builder.spanId(parseUnsignedHexLong(sNode.asText()));
+      }
+      JsonNode rNode = root.get("r");
+      if (rNode != null) {
+        builder.processId(rNode.asText());
+      }
       JsonNode parentsNode = root.get("p");
       LinkedList<Long> parents = new LinkedList<Long>();
-      for (Iterator<JsonNode> iter = parentsNode.elements();
-           iter.hasNext(); ) {
-        JsonNode parentIdNode = iter.next();
-        parents.add(parseUnsignedHexLong(parentIdNode.asText()));
+      if (parentsNode != null) {
+        for (Iterator<JsonNode> iter = parentsNode.elements();
+             iter.hasNext(); ) {
+          JsonNode parentIdNode = iter.next();
+          parents.add(parseUnsignedHexLong(parentIdNode.asText()));
+        }
       }
       builder.parents(parents);
       JsonNode traceInfoNode = root.get("n");
diff --git a/htrace-core/src/main/java/org/apache/htrace/impl/ProcessId.java b/htrace-core/src/main/java/org/apache/htrace/impl/ProcessId.java
new file mode 100644
index 0000000..ad2e5fc
--- /dev/null
+++ b/htrace-core/src/main/java/org/apache/htrace/impl/ProcessId.java
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.htrace.impl;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.htrace.HTraceConfiguration;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.lang.management.ManagementFactory;
+import java.net.InetAddress;
+import java.net.NetworkInterface;
+import java.net.SocketException;
+import java.util.Enumeration;
+import java.util.Locale;
+import java.util.TreeSet;
+
+/**
+ * The HTrace process ID.<p/>
+ *
+ * HTrace process IDs are created from format strings.
+ * Format strings contain variables which the ProcessId class will
+ * replace with the correct values at runtime.<p/>
+ *
+ * <ul>
+ * <li>${ip}: will be replaced with an ip address.</li>
+ * <li>${pname}: will be replaced the process name obtained from java.</li>
+ * </ul><p/>
+ *
+ * For example, the string "${pname}/${ip}" will be replaced with something
+ * like: DataNode/192.168.0.1, assuming that the process' name is DataNode
+ * and its IP address is 192.168.0.1.<p/>
+ *
+ * Process ID strings can contain backslashes as escapes.
+ * For example, "\a" will map to "a".  "\${ip}" will map to the literal
+ * string "${ip}", not the IP address.  A backslash itself can be escaped by a
+ * preceding backslash.
+ */
+public final class ProcessId {
+  public static final Log LOG = LogFactory.getLog(ProcessId.class);
+
+  /**
+   * The configuration key to use for process id
+   */
+  static final String PROCESS_ID_KEY = "process.id";
+
+  /**
+   * The default process ID to use if no other ID is configured.
+   */
+  private static final String DEFAULT_PROCESS_ID = "${pname}/${ip}";
+
+  private final String processId;
+
+  ProcessId(String fmt) {
+    StringBuilder bld = new StringBuilder();
+    StringBuilder varBld = null;
+    boolean escaping = false;
+    int varSeen = 0;
+    for (int i = 0, len = fmt.length() ; i < len; i++) {
+      char c = fmt.charAt(i);
+      if (c == '\\') {
+        if (!escaping) {
+          escaping = true;
+          continue;
+        }
+      }
+      switch (varSeen) {
+        case 0:
+          if (c == '$') {
+            if (!escaping) {
+              varSeen = 1;
+              continue;
+            }
+          }
+          escaping = false;
+          varSeen = 0;
+          bld.append(c);
+          break;
+        case 1:
+          if (c == '{') {
+            if (!escaping) {
+              varSeen = 2;
+              varBld = new StringBuilder();
+              continue;
+            }
+          }
+          escaping = false;
+          varSeen = 0;
+          bld.append("$").append(c);
+          break;
+        default:
+          if (c == '}') {
+            if (!escaping) {
+              String var = varBld.toString();
+              bld.append(processShellVar(var));
+              varBld = null;
+              varSeen = 0;
+              continue;
+            }
+          }
+          escaping = false;
+          varBld.append(c);
+          varSeen++;
+          break;
+      }
+    }
+    if (varSeen > 0) {
+      LOG.warn("Unterminated process ID substitution variable at the end " +
+          "of format string " + fmt);
+    }
+    this.processId = bld.toString();
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("ProcessID(fmt=" + fmt + "): computed process ID of \"" +
+          this.processId + "\"");
+    }
+  }
+
+  public ProcessId(HTraceConfiguration conf) {
+    this(conf.get(PROCESS_ID_KEY, DEFAULT_PROCESS_ID));
+  }
+
+  private String processShellVar(String var) {
+    if (var.equals("pname")) {
+      return getProcessName();
+    } else if (var.equals("ip")) {
+      return getBestIpString();
+    } else if (var.equals("pid")) {
+      return Long.valueOf(getOsPid()).toString();
+    } else {
+      LOG.warn("unknown ProcessID variable " + var);
+      return "";
+    }
+  }
+
+  static String getProcessName() {
+    String cmdLine = System.getProperty("sun.java.command");
+    if (cmdLine != null && !cmdLine.isEmpty()) {
+      String fullClassName = cmdLine.split("\\s+")[0];
+      String[] classParts = fullClassName.split("\\.");
+      cmdLine = classParts[classParts.length - 1];
+    }
+    return (cmdLine == null || cmdLine.isEmpty()) ? "Unknown" : cmdLine;
+  }
+
+  /**
+   * Get the best IP address that represents this node.<p/>
+   *
+   * This is complicated since nodes can have multiple network interfaces,
+   * and each network interface can have multiple IP addresses.  What we're
+   * looking for here is an IP address that will serve to identify this node
+   * to HTrace.  So we prefer site-local addresess (i.e. private ones on the
+   * LAN) to publicly routable interfaces.  If there are multiple addresses
+   * to choose from, we select the one which comes first in textual sort
+   * order.  This should ensure that we at least consistently call each node
+   * by a single name.
+   */
+  static String getBestIpString() {
+    Enumeration<NetworkInterface> ifaces;
+    try {
+      ifaces = NetworkInterface.getNetworkInterfaces();
+    } catch (SocketException e) {
+      LOG.error("Error getting network interfaces", e);
+      return "127.0.0.1";
+    }
+    TreeSet<String> siteLocalCandidates = new TreeSet<String>();
+    TreeSet<String> candidates = new TreeSet<String>();
+    while (ifaces.hasMoreElements()) {
+      NetworkInterface iface = ifaces.nextElement();
+      for (Enumeration<InetAddress> addrs =
+               iface.getInetAddresses(); addrs.hasMoreElements();) {
+        InetAddress addr = addrs.nextElement();
+        if (!addr.isLoopbackAddress()) {
+          if (addr.isSiteLocalAddress()) {
+            siteLocalCandidates.add(addr.getHostAddress());
+          } else {
+            candidates.add(addr.getHostAddress());
+          }
+        }
+      }
+    }
+    if (!siteLocalCandidates.isEmpty()) {
+      return siteLocalCandidates.first();
+    }
+    if (!candidates.isEmpty()) {
+      return candidates.first();
+    }
+    return "127.0.0.1";
+  }
+
+  /**
+   * Get the process id from the operating system.<p/>
+   *
+   * Unfortunately, there is no simple method to get the process id in Java.
+   * The approach we take here is to use the shell method (see
+   * {ProcessId#getOsPidFromShellPpid}) unless we are on Windows, where the
+   * shell is not available.  On Windows, we use
+   * {ProcessId#getOsPidFromManagementFactory}, which depends on some
+   * undocumented features of the JVM, but which doesn't require a shell.
+   */
+  static long getOsPid() {
+    if ((System.getProperty("os.name", "generic").toLowerCase(Locale.ENGLISH)).
+        contains("windows")) {
+      return getOsPidFromManagementFactory();
+    } else {
+      return getOsPidFromShellPpid();
+    }
+  }
+
+  /**
+   * Get the process ID by executing a shell and printing the PPID (parent
+   * process ID).<p/>
+   *
+   * This method of getting the process ID doesn't depend on any undocumented
+   * features of the virtual machine, and should work on almost any UNIX
+   * operating system.
+   */
+  private static long getOsPidFromShellPpid() {
+    Process p = null;
+    StringBuilder sb = new StringBuilder();
+    try {
+      p = new ProcessBuilder("/usr/bin/env", "sh", "-c", "echo $PPID").
+        redirectErrorStream(true).start();
+      BufferedReader reader = new BufferedReader(
+          new InputStreamReader(p.getInputStream()));
+      String line = "";
+      while ((line = reader.readLine()) != null) {
+        sb.append(line.trim());
+      }
+      int exitVal = p.waitFor();
+      if (exitVal != 0) {
+        throw new IOException("Process exited with error code " +
+            Integer.valueOf(exitVal).toString());
+      }
+    } catch (InterruptedException e) {
+      LOG.error("Interrupted while getting operating system pid from " +
+          "the shell.", e);
+      return 0L;
+    } catch (IOException e) {
+      LOG.error("Error getting operating system pid from the shell.", e);
+      return 0L;
+    } finally {
+      if (p != null) {
+        p.destroy();
+      }
+    }
+    try {
+      return Long.parseLong(sb.toString());
+    } catch (NumberFormatException e) {
+      LOG.error("Error parsing operating system pid from the shell.", e);
+      return 0L;
+    }
+  }
+
+  /**
+   * Get the process ID by looking at the name of the managed bean for the
+   * runtime system of the Java virtual machine.<p/>
+   *
+   * Although this is undocumented, in the Oracle JVM this name is of the form
+   * [OS_PROCESS_ID]@[HOSTNAME].
+   */
+  private static long getOsPidFromManagementFactory() {
+    try {
+      return Long.parseLong(ManagementFactory.getRuntimeMXBean().
+          getName().split("@")[0]);
+    } catch (NumberFormatException e) {
+      LOG.error("Failed to get the operating system process ID from the name " +
+          "of the managed bean for the JVM.", e);
+      return 0L;
+    }
+  }
+
+  public String get() {
+    return processId;
+  }
+}
diff --git a/htrace-core/src/test/java/org/apache/htrace/impl/TestLocalFileSpanReceiver.java b/htrace-core/src/test/java/org/apache/htrace/impl/TestLocalFileSpanReceiver.java
index 60b5430..634bef8 100644
--- a/htrace-core/src/test/java/org/apache/htrace/impl/TestLocalFileSpanReceiver.java
+++ b/htrace-core/src/test/java/org/apache/htrace/impl/TestLocalFileSpanReceiver.java
@@ -57,6 +57,7 @@
     confMap.put(LocalFileSpanReceiver.PATH_KEY, traceFileName);
     confMap.put(SpanReceiverBuilder.SPAN_RECEIVER_CONF_KEY,
                 LocalFileSpanReceiver.class.getName());
+    confMap.put(ProcessId.PROCESS_ID_KEY, "testPid");
     SpanReceiver rcvr =
         new SpanReceiverBuilder(HTraceConfiguration.fromMap(confMap))
             .logErrors(false).build();
@@ -69,5 +70,6 @@
     ObjectMapper mapper = new ObjectMapper();
     MilliSpan span = mapper.readValue(new File(traceFileName), MilliSpan.class);
     assertEquals("testWriteToLocalFile", span.getDescription());
+    assertEquals("testPid", span.getProcessId());
   }
 }
diff --git a/htrace-core/src/test/java/org/apache/htrace/impl/TestMilliSpan.java b/htrace-core/src/test/java/org/apache/htrace/impl/TestMilliSpan.java
index 857e9ac..41ee108 100644
--- a/htrace-core/src/test/java/org/apache/htrace/impl/TestMilliSpan.java
+++ b/htrace-core/src/test/java/org/apache/htrace/impl/TestMilliSpan.java
@@ -144,4 +144,13 @@
     MilliSpan dspan = mapper.readValue(json, MilliSpan.class);
     compareSpans(span, dspan);
   }
+
+  @Test
+  public void testJsonSerializationWithFieldsNotSet() throws Exception {
+    MilliSpan span = new MilliSpan.Builder().build();
+    String json = span.toJson();
+    ObjectMapper mapper = new ObjectMapper();
+    MilliSpan dspan = mapper.readValue(json, MilliSpan.class);
+    compareSpans(span, dspan);
+  }
 }
diff --git a/htrace-core/src/test/java/org/apache/htrace/impl/TestProcessId.java b/htrace-core/src/test/java/org/apache/htrace/impl/TestProcessId.java
new file mode 100644
index 0000000..9e5f6b9
--- /dev/null
+++ b/htrace-core/src/test/java/org/apache/htrace/impl/TestProcessId.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.htrace.impl;
+
+import java.io.IOException;
+import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+
+public class TestProcessId {
+  private void testProcessIdImpl(String expected, String fmt) {
+    assertEquals(expected, new ProcessId(fmt).get());
+  }
+
+  @Test
+  public void testSimpleProcessIds() {
+    testProcessIdImpl("abc", "abc");
+    testProcessIdImpl("abc", "a\\bc");
+    testProcessIdImpl("abc", "ab\\c");
+    testProcessIdImpl("abc", "\\a\\b\\c");
+    testProcessIdImpl("a\\bc", "a\\\\bc");
+  }
+
+  @Test
+  public void testSubstitutionVariables() throws IOException {
+    testProcessIdImpl(ProcessId.getProcessName(), "${pname}");
+    testProcessIdImpl("my." + ProcessId.getProcessName(), "my.${pname}");
+    testProcessIdImpl(ProcessId.getBestIpString() + ".str", "${ip}.str");
+    testProcessIdImpl("${pname}", "\\${pname}");
+    testProcessIdImpl("$cash$money{}", "$cash$money{}");
+    testProcessIdImpl("Foo." + Long.valueOf(ProcessId.getOsPid()).toString(),
+        "Foo.${pid}");
+  }
+}
diff --git a/htrace-flume/src/main/java/org/apache/htrace/impl/FlumeSpanReceiver.java b/htrace-flume/src/main/java/org/apache/htrace/impl/FlumeSpanReceiver.java
index 54b8a14..baa4fa1 100644
--- a/htrace-flume/src/main/java/org/apache/htrace/impl/FlumeSpanReceiver.java
+++ b/htrace-flume/src/main/java/org/apache/htrace/impl/FlumeSpanReceiver.java
@@ -89,10 +89,12 @@
   private int maxSpanBatchSize;
   private String flumeHostName;
   private int flumePort;
+  private final ProcessId processId;
 
   public FlumeSpanReceiver(HTraceConfiguration conf) {
     this.queue = new ArrayBlockingQueue<Span>(1000);
     this.tf = new SimpleThreadFactory();
+    this.processId = new ProcessId(conf);
     configure(conf);
   }
 
@@ -272,6 +274,9 @@
   public void receiveSpan(Span span) {
     if (running.get()) {
       try {
+        if (span.getProcessId().isEmpty()) {
+          span.setProcessId(processId.get());
+        }
         this.queue.add(span);
       } catch (IllegalStateException e) {
         LOG.error("Error trying to append span (" +
diff --git a/htrace-hbase/src/main/java/org/apache/htrace/impl/HBaseSpanReceiver.java b/htrace-hbase/src/main/java/org/apache/htrace/impl/HBaseSpanReceiver.java
index 7a99366..2faf4bb 100644
--- a/htrace-hbase/src/main/java/org/apache/htrace/impl/HBaseSpanReceiver.java
+++ b/htrace-hbase/src/main/java/org/apache/htrace/impl/HBaseSpanReceiver.java
@@ -126,6 +126,7 @@
   private final byte[] cf;
   private final byte[] icf;
   private final int maxSpanBatchSize;
+  private final ProcessId processId;
 
   public HBaseSpanReceiver(HTraceConfiguration conf) {
     this.queue = new ArrayBlockingQueue<Span>(1000);
@@ -153,6 +154,7 @@
     for (int i = 0; i < numThreads; i++) {
       this.service.submit(new WriteSpanRunnable());
     }
+    this.processId = new ProcessId(conf);
   }
 
   private class WriteSpanRunnable implements Runnable {
@@ -331,6 +333,9 @@
   public void receiveSpan(Span span) {
     if (running.get()) {
       try {
+        if (span.getProcessId().isEmpty()) {
+          span.setProcessId(processId.get());
+        }
         this.queue.add(span);
       } catch (IllegalStateException e) {
         // todo: supress repeating error logs.
diff --git a/htrace-hbase/src/test/java/org/apache/htrace/impl/TestHBaseSpanReceiver.java b/htrace-hbase/src/test/java/org/apache/htrace/impl/TestHBaseSpanReceiver.java
index 7bf7bac..bf93220 100644
--- a/htrace-hbase/src/test/java/org/apache/htrace/impl/TestHBaseSpanReceiver.java
+++ b/htrace-hbase/src/test/java/org/apache/htrace/impl/TestHBaseSpanReceiver.java
@@ -181,6 +181,11 @@
     }
 
     @Override
+    public void setProcessId(String processId) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
     public String getDescription() {
       return span.getDescription();
     }
diff --git a/htrace-htraced/src/main/java/org/apache/htrace/impl/HTracedRESTReceiver.java b/htrace-htraced/src/main/java/org/apache/htrace/impl/HTracedRESTReceiver.java
index 7edc2b8..5a4daaf 100644
--- a/htrace-htraced/src/main/java/org/apache/htrace/impl/HTracedRESTReceiver.java
+++ b/htrace-htraced/src/main/java/org/apache/htrace/impl/HTracedRESTReceiver.java
@@ -170,6 +170,11 @@
   private boolean mustStartFlush;
 
   /**
+   * The process ID to use for all spans.
+   */
+  private final ProcessId processId;
+
+  /**
    * Create an HttpClient instance.
    *
    * @param connTimeout         The timeout to use for connecting.
@@ -221,6 +226,7 @@
             capacity + ", url=" + url +  ", periodInMs=" + periodInMs +
             ", maxToSendAtATime=" + maxToSendAtATime);
     }
+    processId = new ProcessId(conf);
   }
 
   /**
@@ -316,6 +322,7 @@
       try {
         Request request = httpClient.newRequest(url).method(HttpMethod.POST);
         request.header(HttpHeader.CONTENT_TYPE, "application/json");
+        request.header("htrace-pid", processId.get());
         StringBuilder bld = new StringBuilder();
         for (Span span : spanBuf) {
           bld.append(span.toJson());
@@ -412,7 +419,8 @@
       lock.unlock();
     }
     if (!added) {
-      long now = System.nanoTime() / 1000000L;
+      long now = TimeUnit.MILLISECONDS.convert(System.nanoTime(),
+          TimeUnit.NANOSECONDS);
       long last = lastAtCapacityWarningLog.get();
       if (now - last > WARN_TIMEOUT_MS) {
         // Only log every 5 minutes. Any more than this for a guest process
diff --git a/htrace-htraced/src/test/java/org/apache/htrace/impl/TestHTracedRESTReceiver.java b/htrace-htraced/src/test/java/org/apache/htrace/impl/TestHTracedRESTReceiver.java
index eca6d6d..9a01005 100644
--- a/htrace-htraced/src/test/java/org/apache/htrace/impl/TestHTracedRESTReceiver.java
+++ b/htrace-htraced/src/test/java/org/apache/htrace/impl/TestHTracedRESTReceiver.java
@@ -24,6 +24,7 @@
 import java.io.File;
 import java.net.URL;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.htrace.HTraceConfiguration;
@@ -65,6 +66,7 @@
    */
   private final class TestHTraceConfiguration extends HTraceConfiguration {
     private final URL restServerUrl;
+    final static String PROCESS_ID = "TestHTracedRESTReceiver";
 
     public TestHTraceConfiguration(final URL restServerUrl) {
       this.restServerUrl = restServerUrl;
@@ -79,6 +81,8 @@
     public String get(String key, String defaultValue) {
       if (key.equals(HTracedRESTReceiver.HTRACED_REST_URL_KEY)) {
         return this.restServerUrl.toString();
+      } else if (key.equals(ProcessId.PROCESS_ID_KEY)) {
+        return PROCESS_ID;
       }
       return defaultValue;
     }
@@ -120,14 +124,21 @@
     final int NUM_SPANS = 3;
     final HttpClient http = receiver.createHttpClient(60000, 60000);
     http.start();
+    Span spans[] = new Span[NUM_SPANS];
+    for (int i = 0; i < NUM_SPANS; i++) {
+      MilliSpan.Builder builder = new MilliSpan.Builder().
+          parents(new long[]{1L}).
+          spanId(i);
+      if (i == NUM_SPANS - 1) {
+        builder.processId("specialPid");
+      }
+      spans[i] = builder.build();
+    }
     try {
       for (int i = 0; i < NUM_SPANS; i++) {
-        Span span = new MilliSpan.Builder().parents(
-            new long [] {1L}).spanId(i).build();
-        LOG.info(span.toString());
-        receiver.receiveSpan(span);
+        LOG.info("receiving " + spans[i].toString());
+        receiver.receiveSpan(spans[i]);
       }
-
       if (testClose) {
         receiver.close();
       } else {
@@ -149,6 +160,18 @@
                 return false;
               }
               LOG.info("Got " + content + " for span " + i);
+              ObjectMapper mapper = new ObjectMapper();
+              MilliSpan dspan = mapper.readValue(content, MilliSpan.class);
+              assertEquals((long)i, dspan.getSpanId());
+              // Every span should have the process ID we set in the
+              // configuration... except for the last span, which had
+              // a custom value set.
+              if (i == NUM_SPANS - 1) {
+                assertEquals("specialPid", dspan.getProcessId());
+              } else {
+                assertEquals(TestHTraceConfiguration.PROCESS_ID,
+                    dspan.getProcessId());
+              }
             }
             return true;
           } catch (Throwable t) {
diff --git a/htrace-zipkin/src/main/java/org/apache/htrace/impl/ZipkinSpanReceiver.java b/htrace-zipkin/src/main/java/org/apache/htrace/impl/ZipkinSpanReceiver.java
index 06ff0ad..d75c504 100644
--- a/htrace-zipkin/src/main/java/org/apache/htrace/impl/ZipkinSpanReceiver.java
+++ b/htrace-zipkin/src/main/java/org/apache/htrace/impl/ZipkinSpanReceiver.java
@@ -132,6 +132,8 @@
     }
   };
 
+  private final ProcessId processId;
+
   ////////////////////
   /// Variables that will change on each call to configure()
   ///////////////////
@@ -144,6 +146,7 @@
   public ZipkinSpanReceiver(HTraceConfiguration conf) {
     this.queue = new ArrayBlockingQueue<Span>(1000);
     this.protocolFactory = new TBinaryProtocol.Factory();
+    this.processId = new ProcessId(conf);
     configure(conf);
   }
 
@@ -360,6 +363,9 @@
   public void receiveSpan(Span span) {
     if (running.get()) {
       try {
+        if (span.getProcessId().isEmpty()) {
+          span.setProcessId(processId.get());
+        }
         this.queue.add(span);
       } catch (IllegalStateException e) {
         LOG.error("Error trying to append span (" + span.getDescription() + ") to the queue."