HTRACE-237. Optimize htraced span receiver (cmccabe)
diff --git a/htrace-core4/src/main/java/org/apache/htrace/core/Span.java b/htrace-core4/src/main/java/org/apache/htrace/core/Span.java
index e63d414..33908db 100644
--- a/htrace-core4/src/main/java/org/apache/htrace/core/Span.java
+++ b/htrace-core4/src/main/java/org/apache/htrace/core/Span.java
@@ -17,6 +17,7 @@
 package org.apache.htrace.core;
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 
@@ -170,8 +171,11 @@
       Map<String, String> traceInfoMap = span.getKVAnnotations();
       if (!traceInfoMap.isEmpty()) {
         jgen.writeObjectFieldStart("n");
-        for (Map.Entry<String, String> e : traceInfoMap.entrySet()) {
-          jgen.writeStringField(e.getKey(), e.getValue());
+        String[] keys = traceInfoMap.keySet().
+            toArray(new String[traceInfoMap.size()]);
+        Arrays.sort(keys);
+        for (String key : keys) {
+          jgen.writeStringField(key, traceInfoMap.get(key));
         }
         jgen.writeEndObject();
       }
diff --git a/htrace-core4/src/test/java/org/apache/htrace/util/TestUtil.java b/htrace-core4/src/test/java/org/apache/htrace/util/TestUtil.java
index 7cb4aed..0869ca0 100644
--- a/htrace-core4/src/test/java/org/apache/htrace/util/TestUtil.java
+++ b/htrace-core4/src/test/java/org/apache/htrace/util/TestUtil.java
@@ -16,8 +16,18 @@
  */
 package org.apache.htrace.util;
 
+import org.apache.htrace.core.MilliSpan;
+import org.apache.htrace.core.Span;
+import org.apache.htrace.core.SpanId;
+import org.apache.htrace.core.TimelineAnnotation;
+
 import java.io.File;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
 import java.util.Map;
+import java.util.Random;
+import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
@@ -88,4 +98,69 @@
       Thread.sleep(periodMs);
     }
   }
+
+  private static long nonZeroRandomLong(Random rand) {
+    long r = 0;
+    do {
+      r = rand.nextLong();
+    } while (r == 0);
+    return r;
+  }
+
+  private static long positiveRandomLong(Random rand) {
+    long r = rand.nextLong();
+    if (r == Long.MIN_VALUE) {
+      // Math.abs can't handle this case
+      return Long.MAX_VALUE;
+    } else if (r > 0) {
+      return r;
+    } else {
+      return -r;
+    }
+  }
+
+  private static String randomString(Random rand) {
+    return new UUID(positiveRandomLong(rand),
+          positiveRandomLong(rand)).toString();
+  }
+
+  public static Span randomSpan(Random rand) {
+    MilliSpan.Builder builder = new MilliSpan.Builder();
+    builder.spanId(
+          new SpanId(nonZeroRandomLong(rand), nonZeroRandomLong(rand)));
+    builder.begin(positiveRandomLong(rand));
+    builder.end(positiveRandomLong(rand));
+    builder.description(randomString(rand));
+    builder.tracerId(randomString(rand));
+    int numParents = rand.nextInt(4);
+    SpanId[] parents = new SpanId[numParents];
+    for (int i = 0; i < numParents; i++) {
+      parents[i] =
+          new SpanId(nonZeroRandomLong(rand), nonZeroRandomLong(rand));
+    }
+    builder.parents(parents);
+    int numTraceInfos = rand.nextInt(4);
+    Map<String, String> traceInfo = new HashMap<String, String>(numTraceInfos);
+    for (int i = 0; i < numTraceInfos; i++) {
+      traceInfo.put(randomString(rand), randomString(rand));
+    }
+    builder.traceInfo(traceInfo);
+    int numTimelineAnnotations = rand.nextInt(4);
+    List<TimelineAnnotation> timeline =
+        new LinkedList<TimelineAnnotation>();
+    for (int i = 0; i < numTimelineAnnotations; i++) {
+      timeline.add(new TimelineAnnotation(positiveRandomLong(rand),
+            randomString(rand)));
+    }
+    builder.timeline(timeline);
+    return builder.build();
+  }
+
+  public static Span[] randomSpans(Random rand, int numSpans) {
+    Span[] spans = new Span[numSpans];
+    for (int i = 0; i < spans.length; i++) {
+      spans[i] = randomSpan(rand);
+    }
+    return spans;
+  }
 }
diff --git a/htrace-htraced/go/src/org/apache/htrace/client/client.go b/htrace-htraced/go/src/org/apache/htrace/client/client.go
index 2ac8a1e..dd3f8a3 100644
--- a/htrace-htraced/go/src/org/apache/htrace/client/client.go
+++ b/htrace-htraced/go/src/org/apache/htrace/client/client.go
@@ -114,32 +114,13 @@
 
 func (hcl *Client) writeSpansHttp(req *common.WriteSpansReq) error {
 	var w bytes.Buffer
-	var err error
-	for i := range req.Spans {
-		var buf []byte
-		buf, err = json.Marshal(req.Spans[i])
-		if err != nil {
-			return errors.New(fmt.Sprintf("Error serializing span: %s",
-				err.Error()))
-		}
-		_, err = w.Write(buf)
-		if err != nil {
-			return errors.New(fmt.Sprintf("Error writing span: %s",
-				err.Error()))
-		}
-		_, err = w.Write([]byte{'\n'})
-		//err = io.WriteString(&w, "\n")
-		if err != nil {
-			return errors.New(fmt.Sprintf("Error writing: %s",
-				err.Error()))
-		}
+	enc := json.NewEncoder(&w)
+	err := enc.Encode(req)
+	if err != nil {
+		return errors.New(fmt.Sprintf("Error serializing span: %s",
+			err.Error()))
 	}
-	customHeaders := make(map[string]string)
-	if req.DefaultTrid != "" {
-		customHeaders["htrace-trid"] = req.DefaultTrid
-	}
-	_, _, err = hcl.makeRestRequest("POST", "writeSpans",
-		&w, customHeaders)
+	_, _, err = hcl.makeRestRequest("POST", "writeSpans", &w)
 	if err != nil {
 		return err
 	}
@@ -182,24 +163,19 @@
 	return spans, nil
 }
 
-var EMPTY = make(map[string]string)
-
 func (hcl *Client) makeGetRequest(reqName string) ([]byte, int, error) {
-	return hcl.makeRestRequest("GET", reqName, nil, EMPTY)
+	return hcl.makeRestRequest("GET", reqName, nil)
 }
 
 // Make a general JSON REST request.
 // Returns the request body, the response code, and the error.
 // Note: if the response code is non-zero, the error will also be non-zero.
-func (hcl *Client) makeRestRequest(reqType string, reqName string, reqBody io.Reader,
-	customHeaders map[string]string) ([]byte, int, error) {
+func (hcl *Client) makeRestRequest(reqType string, reqName string,
+	reqBody io.Reader) ([]byte, int, error) {
 	url := fmt.Sprintf("http://%s/%s",
 		hcl.restAddr, reqName)
 	req, err := http.NewRequest(reqType, url, reqBody)
 	req.Header.Set("Content-Type", "application/json")
-	for k, v := range customHeaders {
-		req.Header.Set(k, v)
-	}
 	client := &http.Client{}
 	resp, err := client.Do(req)
 	if err != nil {
diff --git a/htrace-htraced/go/src/org/apache/htrace/common/rest.go b/htrace-htraced/go/src/org/apache/htrace/common/rest.go
deleted file mode 100644
index b367ed1..0000000
--- a/htrace-htraced/go/src/org/apache/htrace/common/rest.go
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package common
-
-// Info returned by /server/info
-type ServerInfo struct {
-	// The server release version.
-	ReleaseVersion string
-
-	// The git hash that this software was built with.
-	GitVersion string
-}
-
-// Info returned by /server/stats
-type ServerStats struct {
-	Shards []ShardStats
-}
-
-type ShardStats struct {
-	Path string
-
-	// The approximate number of spans present in this shard.  This may be an
-	// underestimate.
-	ApproxNumSpans uint64
-
-	// leveldb.stats information
-	LevelDbStats string
-}
diff --git a/htrace-htraced/go/src/org/apache/htrace/common/rpc.go b/htrace-htraced/go/src/org/apache/htrace/common/rpc.go
index 28521a5..9c7bfad 100644
--- a/htrace-htraced/go/src/org/apache/htrace/common/rpc.go
+++ b/htrace-htraced/go/src/org/apache/htrace/common/rpc.go
@@ -38,14 +38,39 @@
 
 // A request to write spans to htraced.
 type WriteSpansReq struct {
-	DefaultTrid string
+	DefaultTrid string `json:",omitempty"`
 	Spans       []*Span
 }
 
+// Info returned by /server/info
+type ServerInfo struct {
+	// The server release version.
+	ReleaseVersion string
+
+	// The git hash that this software was built with.
+	GitVersion string
+}
+
 // A response to a WriteSpansReq
 type WriteSpansResp struct {
 }
 
+// Info returned by /server/stats
+type ServerStats struct {
+	Shards []ShardStats
+}
+
+type ShardStats struct {
+	Path string
+
+	// The approximate number of spans present in this shard.  This may be an
+	// underestimate.
+	ApproxNumSpans uint64
+
+	// leveldb.stats information
+	LevelDbStats string
+}
+
 // The header which is sent over the wire for HRPC
 type HrpcRequestHeader struct {
 	Magic    uint32
diff --git a/htrace-htraced/go/src/org/apache/htrace/common/span_test.go b/htrace-htraced/go/src/org/apache/htrace/common/span_test.go
index 9de7cee..7fb128d 100644
--- a/htrace-htraced/go/src/org/apache/htrace/common/span_test.go
+++ b/htrace-htraced/go/src/org/apache/htrace/common/span_test.go
@@ -92,7 +92,7 @@
 			End:         5678,
 			Description: "getFileDescriptors",
 			Parents:     []SpanId{},
-			TracerId:   "testTracerId",
+			TracerId:    "testTracerId",
 		}}
 	mh := new(codec.MsgpackHandle)
 	mh.WriteExt = true
diff --git a/htrace-htraced/go/src/org/apache/htrace/conf/config.go b/htrace-htraced/go/src/org/apache/htrace/conf/config.go
index b8f6c84..76af7a6 100644
--- a/htrace-htraced/go/src/org/apache/htrace/conf/config.go
+++ b/htrace-htraced/go/src/org/apache/htrace/conf/config.go
@@ -69,7 +69,7 @@
 }
 
 func getDefaultHTracedConfDir() string {
-	return PATH_SEP + "etc" + PATH_SEP + "htraced" + PATH_SEP + "conf";
+	return PATH_SEP + "etc" + PATH_SEP + "htraced" + PATH_SEP + "conf"
 }
 
 func getHTracedConfDirs(dlog io.Writer) []string {
@@ -78,7 +78,7 @@
 	if len(paths) < 1 {
 		def := getDefaultHTracedConfDir()
 		io.WriteString(dlog, fmt.Sprintf("HTRACED_CONF_DIR defaulting to %s\n", def))
-		return []string{ def }
+		return []string{def}
 	}
 	io.WriteString(dlog, fmt.Sprintf("HTRACED_CONF_DIR=%s\n", confDir))
 	return paths
diff --git a/htrace-htraced/go/src/org/apache/htrace/htrace/cmd.go b/htrace-htraced/go/src/org/apache/htrace/htrace/cmd.go
index 8bc0c64..b5549c5 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htrace/cmd.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htrace/cmd.go
@@ -31,8 +31,8 @@
 	"org/apache/htrace/common"
 	"org/apache/htrace/conf"
 	"os"
-	"time"
 	"strings"
+	"time"
 )
 
 var RELEASE_VERSION string
@@ -126,7 +126,7 @@
 	case serverInfo.FullCommand():
 		os.Exit(printServerInfo(hcl))
 	case serverStats.FullCommand():
-		if (*serverStatsJson) {
+		if *serverStatsJson {
 			os.Exit(printServerStatsJson(hcl))
 		} else {
 			os.Exit(printServerStats(hcl))
@@ -195,7 +195,7 @@
 	}
 	fmt.Printf("HTraced server stats:\n")
 	fmt.Printf("%d leveldb shards.\n", len(stats.Shards))
-	for i := range(stats.Shards) {
+	for i := range stats.Shards {
 		shard := stats.Shards[i]
 		fmt.Printf("==== %s ===\n", shard.Path)
 		fmt.Printf("Approximate number of spans: %d\n", shard.ApproxNumSpans)
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go b/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go
index 0595d36..9fb9920 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go
@@ -953,19 +953,19 @@
 }
 
 func (store *dataStore) ServerStats() *common.ServerStats {
-	serverStats := common.ServerStats {
-		Shards : make([]common.ShardStats, len(store.shards)),
+	serverStats := common.ServerStats{
+		Shards: make([]common.ShardStats, len(store.shards)),
 	}
-	for shardIdx := range(store.shards) {
+	for shardIdx := range store.shards {
 		shard := store.shards[shardIdx]
 		serverStats.Shards[shardIdx].Path = shard.path
-		r := levigo.Range {
-			Start : append([]byte{SPAN_ID_INDEX_PREFIX},
+		r := levigo.Range{
+			Start: append([]byte{SPAN_ID_INDEX_PREFIX},
 				common.INVALID_SPAN_ID.Val()...),
-			Limit : append([]byte{SPAN_ID_INDEX_PREFIX + 1},
+			Limit: append([]byte{SPAN_ID_INDEX_PREFIX + 1},
 				common.INVALID_SPAN_ID.Val()...),
 		}
-		vals := shard.ldb.GetApproximateSizes([]levigo.Range { r })
+		vals := shard.ldb.GetApproximateSizes([]levigo.Range{r})
 		serverStats.Shards[shardIdx].ApproxNumSpans = vals[0]
 		serverStats.Shards[shardIdx].LevelDbStats =
 			shard.ldb.PropertyValue("leveldb.stats")
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/rest.go b/htrace-htraced/go/src/org/apache/htrace/htraced/rest.go
index 97b2bca..cbfc508 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htraced/rest.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/rest.go
@@ -193,37 +193,24 @@
 	} else {
 		dec = json.NewDecoder(req.Body)
 	}
-	spans := make([]*common.Span, 0, 32)
-	defaultTrid := req.Header.Get("htrace-trid")
-	for {
-		var span common.Span
-		err := dec.Decode(&span)
-		if err != nil {
-			if err != io.EOF {
-				writeError(hand.lg, w, http.StatusBadRequest,
-					fmt.Sprintf("Error parsing spans: %s", err.Error()))
-				return
-			}
-			break
-		}
-		spanIdProblem := span.Id.FindProblem()
-		if spanIdProblem != "" {
-			writeError(hand.lg, w, http.StatusBadRequest,
-				fmt.Sprintf("Invalid span ID: %s", spanIdProblem))
-			return
-		}
-		if span.TracerId == "" {
-			span.TracerId = defaultTrid
-		}
-		spans = append(spans, &span)
+	var msg common.WriteSpansReq
+	err := dec.Decode(&msg)
+	if (err != nil) && (err != io.EOF) {
+		writeError(hand.lg, w, http.StatusBadRequest,
+			fmt.Sprintf("Error parsing WriteSpansReq: %s", err.Error()))
+		return
 	}
 	hand.lg.Debugf("writeSpansHandler: received %d span(s).  defaultTrid = %s\n",
-		len(spans), defaultTrid)
-	for spanIdx := range spans {
+		len(msg.Spans), msg.DefaultTrid)
+	for spanIdx := range msg.Spans {
 		if hand.lg.DebugEnabled() {
-			hand.lg.Debugf("writing span %s\n", spans[spanIdx].ToJson())
+			hand.lg.Debugf("writing span %s\n", msg.Spans[spanIdx].ToJson())
 		}
-		hand.store.WriteSpan(spans[spanIdx])
+		span := msg.Spans[spanIdx]
+		if span.TracerId == "" {
+			span.TracerId = msg.DefaultTrid
+		}
+		hand.store.WriteSpan(span)
 	}
 }
 
diff --git a/htrace-htraced/pom.xml b/htrace-htraced/pom.xml
index 88fd2fc..4478dc5 100644
--- a/htrace-htraced/pom.xml
+++ b/htrace-htraced/pom.xml
@@ -86,6 +86,10 @@
                   <pattern>org.eclipse.jetty</pattern>
                   <shadedPattern>org.apache.htrace.shaded.jetty</shadedPattern>
                 </relocation>
+                <dependency>
+                  <pattern>org.msgpack</pattern>
+                  <shadedPattern>org.apache.htrace.msgpack</shadedPattern>
+                </dependency>
               </relocations>
             </configuration>
             <goals>
@@ -200,12 +204,11 @@
       <groupId>com.fasterxml.jackson.core</groupId>
       <artifactId>jackson-databind</artifactId>
     </dependency>
-    <!-- htraced rest client deps. -->
-    <!--Is this too much? Pulls down jetty-http, jetty-server, jetty-io
-     This is new-style jetty client, jetty9 and jdk7 required.
-     It can do async but we will use it synchronously at first.
-     Has nice tutorial: http://www.eclipse.org/jetty/documentation/9.1.5.v20140505/http-client-api.html
-     -->
+    <dependency>
+      <groupId>org.msgpack</groupId>
+      <artifactId>msgpack-core</artifactId>
+      <version>0.7.0-p9</version>
+    </dependency>
     <dependency>
       <groupId>org.eclipse.jetty</groupId>
       <artifactId>jetty-client</artifactId>
diff --git a/htrace-htraced/src/main/java/org/apache/htrace/impl/BufferManager.java b/htrace-htraced/src/main/java/org/apache/htrace/impl/BufferManager.java
new file mode 100644
index 0000000..29816eb
--- /dev/null
+++ b/htrace-htraced/src/main/java/org/apache/htrace/impl/BufferManager.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.htrace.impl;
+
+import java.io.IOException;
+
+import org.apache.htrace.core.Span;
+
+/**
+ * A buffer which contains span data and is able to send it over the network.
+ *
+ * BufferManager functions are not thread-safe.  You must rely on external
+ * synchronization to protect buffers from concurrent operations.
+ */
+interface BufferManager {
+  /**
+   * Write a span to this buffer.
+   *
+   * @param span            The span to write.
+   *
+   * @throws IOException    If the buffer doesn't have enough space to hold the
+   *                          new span.  We will not write a partial span to the
+   *                          buffer in this case.
+   */
+  void writeSpan(Span span) throws IOException;
+
+  /**
+   * Get the amount of content currently in the buffer.
+   */
+  int contentLength();
+
+  /**
+   * Get the number of spans currently buffered.
+   */
+  int getNumberOfSpans();
+
+  /**
+   * Prepare the buffers to be flushed.
+   */
+  void prepare() throws IOException;
+
+  /**
+   * Flush this buffer to htraced.
+   *
+   * This is a blocking operation which will not return until the buffer is
+   * completely flushed.
+   */
+  void flush() throws IOException;
+
+  /**
+   * Clear the data in this buffer.
+   */
+  void clear();
+
+  /**
+   * Closes the buffer manager and frees all resources.
+   */
+  void close();
+}
diff --git a/htrace-htraced/src/main/java/org/apache/htrace/impl/Conf.java b/htrace-htraced/src/main/java/org/apache/htrace/impl/Conf.java
new file mode 100644
index 0000000..cdd176f
--- /dev/null
+++ b/htrace-htraced/src/main/java/org/apache/htrace/impl/Conf.java
@@ -0,0 +1,356 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.htrace.impl;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import org.apache.htrace.core.HTraceConfiguration;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * The configuration of the HTracedSpanReceiver.
+ *
+ * This class extracts all the relevant configuration information for
+ * HTracedSpanReceiver from the HTraceConfiguration.  It performs parsing and
+ * bounds-checking for the configuration keys.
+ *
+ * It is more efficient to store the configuration as final values in this
+ * structure than to access the HTraceConfiguration object directly.  This is
+ * especially true when the HTraceConfiguration object is a thin shim around a
+ * Hadoop Configuration object, which requires synchronization to access.
+ */
+class Conf {
+  private static final Log LOG = LogFactory.getLog(Conf.class);
+
+  /**
+   * Address of the htraced server.
+   */
+  final static String ADDRESS_KEY =
+      "htraced.receiver.address";
+
+  /**
+   * The minimum number of milliseconds to wait for a read or write
+   * operation on the network.
+   */
+  final static String IO_TIMEOUT_MS_KEY =
+      "htraced.receiver.io.timeout.ms";
+  final static int IO_TIMEOUT_MS_DEFAULT = 60000;
+  final static int IO_TIMEOUT_MS_MIN = 50;
+
+  /**
+   * The minimum number of milliseconds to wait for a network
+   * connection attempt.
+   */
+  final static String CONNECT_TIMEOUT_MS_KEY =
+      "htraced.receiver.connect.timeout.ms";
+  final static int CONNECT_TIMEOUT_MS_DEFAULT = 60000;
+  final static int CONNECT_TIMEOUT_MS_MIN = 50;
+
+  /**
+   * The minimum number of milliseconds to keep alive a connection when it's
+   * not in use.
+   */
+  final static String IDLE_TIMEOUT_MS_KEY =
+      "htraced.receiver.idle.timeout.ms";
+  final static int IDLE_TIMEOUT_MS_DEFAULT = 60000;
+  final static int IDLE_TIMEOUT_MS_MIN = 0;
+
+  /**
+   * Configure the retry times to use when an attempt to flush spans to
+   * htraced fails.  This is configured as a comma-separated list of delay
+   * times in milliseconds.  If the configured value is empty, no retries
+   * will be made.
+   */
+  final static String FLUSH_RETRY_DELAYS_KEY =
+      "htraced.flush.retry.delays.key";
+  final static String FLUSH_RETRY_DELAYS_DEFAULT =
+      "1000,30000";
+
+  /**
+   * The maximum length of time to go in between flush attempts.
+   * Once this time elapses, a flush will be triggered even if we don't
+   * have that many spans buffered.
+   */
+  final static String MAX_FLUSH_INTERVAL_MS_KEY =
+      "htraced.receiver.max.flush.interval.ms";
+  final static int MAX_FLUSH_INTERVAL_MS_DEFAULT = 60000;
+  final static int MAX_FLUSH_INTERVAL_MS_MIN = 10;
+
+  /**
+   * Whether or not to use msgpack for span serialization.
+   * If this key is false, JSON over REST will be used.
+   * If this key is true, msgpack over custom RPC will be used.
+   */
+  final static String PACKED_KEY =
+      "htraced.receiver.packed";
+  final static boolean PACKED_DEFAULT = true;
+
+  /**
+   * The size of the span buffers.
+   */
+  final static String BUFFER_SIZE_KEY =
+      "htraced.receiver.buffer.size";
+  final static int BUFFER_SIZE_DEFAULT = 48 * 1024 * 1024;
+  static int BUFFER_SIZE_MIN = 4 * 1024 * 1024;
+  // The maximum buffer size should not be longer than
+  // PackedBuffer.MAX_HRPC_BODY_LENGTH.
+  final static int BUFFER_SIZE_MAX = 63 * 1024 * 1024;
+
+  /**
+   * Set the fraction of the span buffer which needs to fill up before we
+   * will automatically trigger a flush.  This is a fraction, not a percentage.
+   * It is between 0 and 1.
+   */
+  final static String BUFFER_SEND_TRIGGER_FRACTION_KEY =
+      "htraced.receiver.buffer.send.trigger.fraction";
+  final static double BUFFER_SEND_TRIGGER_FRACTION_DEFAULT = 0.5;
+  final static double BUFFER_SEND_TRIGGER_FRACTION_MIN = 0.1;
+
+  /**
+   * The length of time which receiveSpan should wait for a free spot in a
+   * span buffer before giving up and dropping the span
+   */
+  final static String SPAN_DROP_TIMEOUT_MS_KEY =
+      "htraced.max.buffer.full.retry.ms.key";
+  final static int SPAN_DROP_TIMEOUT_MS_DEFAULT = 5000;
+
+  /**
+   * The length of time we should wait between displaying log messages on the
+   * rate-limited loggers.
+   */
+  final static String ERROR_LOG_PERIOD_MS_KEY =
+      "htraced.error.log.period.ms";
+  final static long ERROR_LOG_PERIOD_MS_DEFAULT = 30000L;
+
+  @JsonProperty("ioTimeoutMs")
+  final int ioTimeoutMs;
+
+  @JsonProperty("connectTimeoutMs")
+  final int connectTimeoutMs;
+
+  @JsonProperty("idleTimeoutMs")
+  final int idleTimeoutMs;
+
+  @JsonProperty("flushRetryDelays")
+  final int[] flushRetryDelays;
+
+  @JsonProperty("maxFlushIntervalMs")
+  final int maxFlushIntervalMs;
+
+  @JsonProperty("packed")
+  final boolean packed;
+
+  @JsonProperty("bufferSize")
+  final int bufferSize;
+
+  @JsonProperty("spanDropTimeoutMs")
+  final int spanDropTimeoutMs;
+
+  @JsonProperty("errorLogPeriodMs")
+  final long errorLogPeriodMs;
+
+  @JsonProperty("triggerSize")
+  final int triggerSize;
+
+  @JsonProperty("endpointStr")
+  final String endpointStr;
+
+  @JsonProperty("endpoint")
+  final InetSocketAddress endpoint;
+
+  private static int getBoundedInt(final HTraceConfiguration conf,
+        String key, int defaultValue, int minValue, int maxValue) {
+    int val = conf.getInt(key, defaultValue);
+    if (val < minValue) {
+      LOG.warn("Can't set " + key + " to " + val + ".  Using minimum value " +
+          "of " + minValue + " instead.");
+      return minValue;
+    } else if (val > maxValue) {
+      LOG.warn("Can't set " + key + " to " + val + ".  Using maximum value " +
+          "of " + maxValue + " instead.");
+      return maxValue;
+    }
+    return val;
+  }
+
+  private static long getBoundedLong(final HTraceConfiguration conf,
+        String key, long defaultValue, long minValue, long maxValue) {
+    String strVal = conf.get(key, Long.toString(defaultValue));
+    long val = 0;
+    try {
+      val = Long.parseLong(strVal);
+    } catch (NumberFormatException nfe) {
+      throw new IllegalArgumentException("Bad value for '" + key +
+        "': should be long");
+    }
+    if (val < minValue) {
+      LOG.warn("Can't set " + key + " to " + val + ".  Using minimum value " +
+          "of " + minValue + " instead.");
+      return minValue;
+    } else if (val > maxValue) {
+      LOG.warn("Can't set " + key + " to " + val + ".  Using maximum value " +
+          "of " + maxValue + " instead.");
+      return maxValue;
+    }
+    return val;
+  }
+
+  private static double getBoundedDouble(final HTraceConfiguration conf,
+        String key, double defaultValue, double minValue, double maxValue) {
+    String strVal = conf.get(key, Double.toString(defaultValue));
+    double val = 0;
+    try {
+      val = Double.parseDouble(strVal);
+    } catch (NumberFormatException nfe) {
+      throw new IllegalArgumentException("Bad value for '" + key +
+        "': should be double");
+    }
+    if (val < minValue) {
+      LOG.warn("Can't set " + key + " to " + val + ".  Using minimum value " +
+          "of " + minValue + " instead.");
+      return minValue;
+    }
+    if (val > maxValue) {
+      LOG.warn("Can't set " + key + " to " + val + ".  Using maximum value " +
+          "of " + maxValue + " instead.");
+      return maxValue;
+    }
+    return val;
+  }
+
+  private static int parseColonPort(String portStr) throws IOException {
+    int colonPosition = portStr.indexOf(':');
+    if (colonPosition != 0) {
+      throw new IOException("Invalid port string " + portStr);
+    }
+    int port = Integer.parseInt(portStr.substring(1), 10);
+    if ((port < 0) || (port > 65535)) {
+      throw new IOException("Invalid port number " + port);
+    }
+    return port;
+  }
+
+  /**
+   * Parse a hostname:port or ip:port pair.
+   *
+   * @param str       The string to parse.
+   * @return          The socket address.
+   */
+  InetSocketAddress parseHostPortPair(String str) throws IOException {
+    str = str.trim();
+    if (str.isEmpty()) {
+      throw new IOException("No hostname:port pair given.");
+    }
+    int bracketBegin = str.indexOf('[');
+    if (bracketBegin == 0) {
+      // Parse an ipv6-style address enclosed in square brackets.
+      int bracketEnd = str.indexOf(']');
+      if (bracketEnd < 0) {
+        throw new IOException("Found left bracket, but no corresponding " +
+            "right bracket, in " + str);
+      }
+      String host = str.substring(bracketBegin + 1, bracketEnd);
+      int port = parseColonPort(str.substring(bracketEnd + 1));
+      return InetSocketAddress.createUnresolved(host, port);
+    } else if (bracketBegin > 0) {
+        throw new IOException("Found a left bracket that wasn't at the " +
+          "start of the host:port pair in " + str);
+    } else {
+      int colon = str.indexOf(':');
+      if (colon <= 0) {
+        throw new IOException("No port component found in " + str);
+      }
+      String host = str.substring(0, colon);
+      int port = parseColonPort(str.substring(colon));
+      return InetSocketAddress.createUnresolved(host, port);
+    }
+  }
+
+  static int[] getIntArray(String arrayStr) {
+    String[] array = arrayStr.split(",");
+    int nonEmptyEntries = 0;
+    for (String str : array) {
+      if (!str.trim().isEmpty()) {
+        nonEmptyEntries++;
+      }
+    }
+    int[] ret = new int[nonEmptyEntries];
+    int i = 0;
+    for (String str : array) {
+      if (!str.trim().isEmpty()) {
+        ret[i++] = Integer.parseInt(str);
+      }
+    }
+    return ret;
+  }
+
+  Conf(HTraceConfiguration conf) throws IOException {
+    this.ioTimeoutMs = getBoundedInt(conf, IO_TIMEOUT_MS_KEY,
+              IO_TIMEOUT_MS_DEFAULT,
+              IO_TIMEOUT_MS_MIN, Integer.MAX_VALUE);
+    this.connectTimeoutMs = getBoundedInt(conf, CONNECT_TIMEOUT_MS_KEY,
+              CONNECT_TIMEOUT_MS_DEFAULT,
+              CONNECT_TIMEOUT_MS_MIN, Integer.MAX_VALUE);
+    this.idleTimeoutMs = getBoundedInt(conf, IDLE_TIMEOUT_MS_KEY,
+              IDLE_TIMEOUT_MS_DEFAULT,
+              IDLE_TIMEOUT_MS_MIN, Integer.MAX_VALUE);
+    this.flushRetryDelays = getIntArray(conf.get(FLUSH_RETRY_DELAYS_KEY,
+              FLUSH_RETRY_DELAYS_DEFAULT));
+    this.maxFlushIntervalMs = getBoundedInt(conf, MAX_FLUSH_INTERVAL_MS_KEY,
+              MAX_FLUSH_INTERVAL_MS_DEFAULT,
+              MAX_FLUSH_INTERVAL_MS_MIN, Integer.MAX_VALUE);
+    this.packed = conf.getBoolean(PACKED_KEY, PACKED_DEFAULT);
+    this.bufferSize = getBoundedInt(conf, BUFFER_SIZE_KEY,
+              BUFFER_SIZE_DEFAULT,
+              BUFFER_SIZE_MIN, BUFFER_SIZE_MAX);
+    double triggerFraction = getBoundedDouble(conf,
+              BUFFER_SEND_TRIGGER_FRACTION_KEY,
+              BUFFER_SEND_TRIGGER_FRACTION_DEFAULT,
+              BUFFER_SEND_TRIGGER_FRACTION_MIN, 1.0);
+    this.spanDropTimeoutMs = conf.getInt(SPAN_DROP_TIMEOUT_MS_KEY,
+        SPAN_DROP_TIMEOUT_MS_DEFAULT);
+    this.errorLogPeriodMs = getBoundedLong(conf, ERROR_LOG_PERIOD_MS_KEY,
+        ERROR_LOG_PERIOD_MS_DEFAULT, 0, Long.MAX_VALUE);
+    this.triggerSize = (int)(this.bufferSize * triggerFraction);
+    try {
+      this.endpointStr = conf.get(ADDRESS_KEY, "");
+      this.endpoint = parseHostPortPair(endpointStr);
+    } catch (IOException e) {
+      throw new IOException("Error reading " + ADDRESS_KEY + ": " +
+          e.getMessage());
+    }
+  }
+
+  @Override
+  public String toString() {
+    ObjectMapper mapper = new ObjectMapper();
+    mapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
+    try {
+      return mapper.writeValueAsString(this);
+    } catch (JsonProcessingException e) {
+      throw new RuntimeException(e);
+    }
+  }
+}
diff --git a/htrace-htraced/src/main/java/org/apache/htrace/impl/HTracedRESTReceiver.java b/htrace-htraced/src/main/java/org/apache/htrace/impl/HTracedRESTReceiver.java
deleted file mode 100644
index 643bbd5..0000000
--- a/htrace-htraced/src/main/java/org/apache/htrace/impl/HTracedRESTReceiver.java
+++ /dev/null
@@ -1,431 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.htrace.impl;
-
-import java.io.IOException;
-import java.net.URL;
-import java.util.ArrayDeque;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.ReentrantLock;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.htrace.core.HTraceConfiguration;
-import org.apache.htrace.core.Span;
-import org.apache.htrace.core.SpanReceiver;
-import org.eclipse.jetty.client.HttpClient;
-import org.eclipse.jetty.client.api.ContentResponse;
-import org.eclipse.jetty.client.api.Request;
-import org.eclipse.jetty.client.util.StringContentProvider;
-import org.eclipse.jetty.http.HttpField;
-import org.eclipse.jetty.http.HttpHeader;
-import org.eclipse.jetty.http.HttpMethod;
-import org.eclipse.jetty.http.HttpStatus;
-
-/**
- * A {@link SpanReceiver} that passes Spans to htraced via REST. Implementation minimizes
- * dependencies and aims for small footprint since this client will be the guest of another,
- * the process traced.
- *
- * <p>Logs via commons-logging. Uses jetty client. Jetty has its own logging. To connect, see
- * jetty logging to commons-logging and see https://issues.apache.org/jira/browse/HADOOP-6807
- * and http://blogs.bytecode.com.au/glen/2005/06/21/getting-your-logging-working-in-jetty.html.
- *
- * <p>This client depends on the REST defined in <code>rest.go</code> in the htraced REST server.
- *
- * <p>Create an instance by doing:
- * <code>SpanReceiver receiver = new HTracedRESTReceiver(conf);</code> where conf is an instance
- * of {@link HTraceConfiguration}. See the public keys defined below for what we will look for in
- * the configuration.  For example, set {@link #HTRACED_REST_URL_KEY} if
- * <code>htraced</code> is in a non-standard location. Then call
- * <code>receiver.receiveSpan(Span);</code> to send spans to an htraced
- * instance. This method returns immediately. It sends the spans in background.
- *
- * <p>TODO: Shading works?
- * TODO: Add lazy start; don't start background thread till a span gets queued.
- * TODO: Add some metrics; how many times we've run, how many spans and what size we've sent.
- */
-public class HTracedRESTReceiver extends SpanReceiver {
-  private static final Log LOG = LogFactory.getLog(HTracedRESTReceiver.class);
-
-  /**
-   * The HttpClient to use for this receiver.
-   */
-  private final HttpClient httpClient;
-
-  /**
-   * The maximum number of spans to buffer.
-   */
-  private final int capacity;
-
-  /**
-   * REST URL to use writing Spans.
-   */
-  private final String url;
-
-  /**
-   * The maximum number of spans to send in a single PUT.
-   */
-  private final int maxToSendAtATime;
-
-  /**
-   * Runs background task to do the REST PUT.
-   */
-  private final PostSpans postSpans;
-
-  /**
-   * Thread for postSpans
-   */
-  private final Thread postSpansThread;
-
-  /**
-   * The connection timeout in milliseconds.
-   */
-  public static final String CLIENT_CONNECT_TIMEOUT_MS_KEY = "client.connect.timeout.ms";
-  private static final int CLIENT_CONNECT_TIMEOUT_MS_DEFAULT = 30000;
-
-  /**
-   * The idle timeout in milliseconds.
-   */
-  public static final String CLIENT_IDLE_TIMEOUT_MS_KEY = "client.idle.timeout.ms";
-  private static final int CLIENT_IDLE_TIMEOUT_MS_DEFAULT = 120000;
-
-  /**
-   * URL of the htraced REST server we are to talk to.
-   */
-  public static final String HTRACED_REST_URL_KEY = "htraced.rest.url";
-  private static final String HTRACED_REST_URL_DEFAULT = "http://localhost:9095/";
-
-  /**
-   * Maximum size of the queue to accumulate spans in.
-   * Cleared by the background thread that does the REST POST to htraced.
-   */
-  public static final String CLIENT_REST_QUEUE_CAPACITY_KEY = "client.rest.queue.capacity";
-  private static final int CLIENT_REST_QUEUE_CAPACITY_DEFAULT = 1000000;
-
-  /**
-   * Period at which the background thread that does the REST POST to htraced in ms.
-   */
-  public static final String CLIENT_REST_PERIOD_MS_KEY = "client.rest.period.ms";
-  private static final int CLIENT_REST_PERIOD_MS_DEFAULT = 30000;
-
-  /**
-   * Maximum spans to post to htraced at a time.
-   */
-  public static final String CLIENT_REST_MAX_SPANS_AT_A_TIME_KEY =
-    "client.rest.batch.size";
-  private static final int CLIENT_REST_MAX_SPANS_AT_A_TIME_DEFAULT = 100;
-
-  /**
-   * Lock protecting the PostSpans data.
-   */
-  private ReentrantLock lock = new ReentrantLock();
-
-  /**
-   * Condition variable used to wake up the PostSpans thread.
-   */
-  private Condition cond = lock.newCondition();
-
-  /**
-   * True if we should shut down.
-   * Protected by the lock.
-   */
-  private boolean shutdown = false;
-
-  /**
-   * Simple bounded queue to hold spans between periodic runs of the httpclient.
-   * Protected by the lock.
-   */
-  private final ArrayDeque<Span> spans;
-
-  /**
-   * Keep last time we logged we were at capacity; used to prevent flooding of logs with
-   * "at capacity" messages.
-   */
-  private AtomicLong lastAtCapacityWarningLog = new AtomicLong(0L);
-
-  /**
-   * True if we should flush as soon as possible.  Protected by the lock.
-   */
-  private boolean mustStartFlush;
-
-  /**
-   * Create an HttpClient instance.
-   *
-   * @param connTimeout         The timeout to use for connecting.
-   * @param idleTimeout         The idle timeout to use.
-   */
-  HttpClient createHttpClient(long connTimeout, long idleTimeout) {
-    HttpClient httpClient = new HttpClient();
-    httpClient.setUserAgentField(new HttpField(HttpHeader.USER_AGENT,
-      this.getClass().getSimpleName()));
-    httpClient.setConnectTimeout(connTimeout);
-    httpClient.setIdleTimeout(idleTimeout);
-    return httpClient;
-  }
-
-  /**
-   * Constructor.
-   * You must call {@link #close()} post construction when done.
-   * @param conf
-   * @throws Exception
-   */
-  public HTracedRESTReceiver(final HTraceConfiguration conf) throws Exception {
-    int connTimeout = conf.getInt(CLIENT_CONNECT_TIMEOUT_MS_KEY,
-                                  CLIENT_CONNECT_TIMEOUT_MS_DEFAULT);
-    int idleTimeout = conf.getInt(CLIENT_IDLE_TIMEOUT_MS_KEY,
-                                  CLIENT_IDLE_TIMEOUT_MS_DEFAULT);
-    this.httpClient = createHttpClient(connTimeout, idleTimeout);
-    this.capacity = conf.getInt(CLIENT_REST_QUEUE_CAPACITY_KEY, CLIENT_REST_QUEUE_CAPACITY_DEFAULT);
-    this.spans = new ArrayDeque<Span>(capacity);
-    // Build up the writeSpans URL.
-    URL restServer = new URL(conf.get(HTRACED_REST_URL_KEY, HTRACED_REST_URL_DEFAULT));
-    URL url = new URL(restServer.getProtocol(), restServer.getHost(), restServer.getPort(), "/writeSpans");
-    this.url = url.toString();
-    // Period at which we run the background thread that does the REST POST to htraced.
-    int periodInMs = conf.getInt(CLIENT_REST_PERIOD_MS_KEY, CLIENT_REST_PERIOD_MS_DEFAULT);
-    // Maximum spans to send in one go
-    this.maxToSendAtATime =
-      conf.getInt(CLIENT_REST_MAX_SPANS_AT_A_TIME_KEY, CLIENT_REST_MAX_SPANS_AT_A_TIME_DEFAULT);
-    // Start up the httpclient.
-    this.httpClient.start();
-    // Start the background thread.
-    this.postSpans = new PostSpans(periodInMs);
-    this.postSpansThread = new Thread(postSpans);
-    this.postSpansThread.setDaemon(true);
-    this.postSpansThread.setName("PostSpans");
-    this.postSpansThread.start();
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Created new HTracedRESTReceiver with connTimeout=" +
-            connTimeout + ", idleTimeout = " + idleTimeout + ", capacity=" +
-            capacity + ", url=" + url +  ", periodInMs=" + periodInMs +
-            ", maxToSendAtATime=" + maxToSendAtATime);
-    }
-  }
-
-  /**
-   * POST spans runnable.
-   * Run on a period. Services the passed in queue taking spans and sending them to traced via http.
-   */
-  private class PostSpans implements Runnable {
-    private final long periodInNs;
-    private final ArrayDeque<Span> spanBuf;
-
-    private PostSpans(long periodInMs) {
-      this.periodInNs = TimeUnit.NANOSECONDS.
-          convert(periodInMs, TimeUnit.MILLISECONDS);
-      this.spanBuf = new ArrayDeque<Span>(maxToSendAtATime);
-    }
-
-    /**
-     * The span sending thread.
-     *
-     * We send a batch of spans for one of two reasons: there are already
-     * maxToSendAtATime spans in the buffer, or the client.rest.period.ms
-     * has elapsed.  The idea is that we want to strike a balance between
-     * sending a lot of spans at a time, for efficiency purposes, and
-     * making sure that we don't buffer spans locally for too long.
-     *
-     * The longer we buffer spans locally, the longer we will have to wait
-     * to see the results of our client operations in the GUI, and the higher
-     * the risk of losing them if the client crashes.
-     */
-    @Override
-    public void run() {
-      long waitNs;
-      try {
-        waitNs = periodInNs;
-        while (true) {
-          lock.lock();
-          try {
-            if (shutdown) {
-              if (spans.isEmpty()) {
-                LOG.debug("Shutting down PostSpans thread...");
-                break;
-              }
-            } else {
-              try {
-                waitNs = cond.awaitNanos(waitNs);
-                if (mustStartFlush) {
-                  waitNs = 0;
-                  mustStartFlush = false;
-                }
-              } catch (InterruptedException e) {
-                LOG.info("Got InterruptedException");
-                waitNs = 0;
-              }
-            }
-            if ((spans.size() > maxToSendAtATime) || (waitNs <= 0) ||
-                    shutdown) {
-              loadSpanBuf();
-              waitNs = periodInNs;
-            }
-          } finally {
-            lock.unlock();
-          }
-          // Once the lock has been safely released, we can do some network
-          // I/O without blocking the client process.
-          if (!spanBuf.isEmpty()) {
-            sendSpans();
-            spanBuf.clear();
-          }
-        }
-      } finally {
-        if (httpClient != null) {
-          try {
-            httpClient.stop();
-          } catch (Exception e) {
-            LOG.error("Error shutting down httpClient", e);
-          }
-        }
-        spans.clear();
-      }
-    }
-
-    private void loadSpanBuf() {
-      for (int loaded = 0; loaded < maxToSendAtATime; loaded++) {
-        Span span = spans.pollFirst();
-        if (span == null) {
-          return;
-        }
-        spanBuf.add(span);
-      }
-    }
-
-    private void sendSpans() {
-      try {
-        Request request = httpClient.newRequest(url).method(HttpMethod.POST);
-        request.header(HttpHeader.CONTENT_TYPE, "application/json");
-        StringBuilder bld = new StringBuilder();
-        for (Span span : spanBuf) {
-          bld.append(span.toJson());
-        }
-        request.content(new StringContentProvider(bld.toString()));
-        ContentResponse response = request.send();
-        if (response.getStatus() == HttpStatus.OK_200) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("POSTED " + spanBuf.size() + " spans");
-          }
-        } else {
-          LOG.error("Status: " + response.getStatus());
-          LOG.error(response.getHeaders());
-          LOG.error(response.getContentAsString());
-        }
-      } catch (InterruptedException e) {
-        LOG.error(e);
-      } catch (TimeoutException e) {
-        LOG.error(e);
-      } catch (ExecutionException e) {
-        LOG.error(e);
-      }
-    }
-  }
-
-  @Override
-  public void close() throws IOException {
-    LOG.debug("Closing HTracedRESTReceiver(" + url + ").");
-    lock.lock();
-    try {
-      this.shutdown = true;
-      cond.signal();
-    } finally {
-      lock.unlock();
-    }
-    try {
-      postSpansThread.join(120000);
-      if (postSpansThread.isAlive()) {
-        LOG.error("Timed out without closing HTracedRESTReceiver(" +
-                  url + ").");
-      } else {
-        LOG.debug("Closed HTracedRESTReceiver(" + url + ").");
-      }
-    } catch (InterruptedException e) {
-      LOG.error("Interrupted while joining postSpans", e);
-    }
-  }
-
-  /**
-   * Start flushing the buffered spans.
-   *
-   * Note that even after calling this function, you will still have to wait
-   * for the flush to finish happening.  This function just starts the flush;
-   * it does not block until it has completed.  You also do not get
-   * "read-after-write consistency" with htraced... the spans that are
-   * written may be buffered for a short period of time prior to being
-   * readable.  This is not a problem for production use (since htraced is not
-   * a database), but it means that most unit tests will need a loop in their
-   * "can I read what I wrote" tests.
-   */
-  void startFlushing() {
-    LOG.info("Triggering HTracedRESTReceiver flush.");
-    lock.lock();
-    try {
-      mustStartFlush = true;
-      cond.signal();
-    } finally {
-      lock.unlock();
-    }
-  }
-
-  private static long WARN_TIMEOUT_MS = 300000;
-
-  @Override
-  public void receiveSpan(Span span) {
-    boolean added = false;
-    lock.lock();
-    try {
-      if (shutdown) {
-        LOG.trace("receiveSpan(span=" + span + "): HTracedRESTReceiver " +
-            "is already shut down.");
-        return;
-      }
-      if (spans.size() < capacity) {
-        spans.add(span);
-        added = true;
-        if (spans.size() >= maxToSendAtATime) {
-          cond.signal();
-        }
-      } else {
-        cond.signal();
-      }
-    } finally {
-      lock.unlock();
-    }
-    if (!added) {
-      long now = TimeUnit.MILLISECONDS.convert(System.nanoTime(),
-          TimeUnit.NANOSECONDS);
-      long last = lastAtCapacityWarningLog.get();
-      if (now - last > WARN_TIMEOUT_MS) {
-        // Only log every 5 minutes. Any more than this for a guest process
-        // is obnoxious.
-        if (lastAtCapacityWarningLog.compareAndSet(last, now)) {
-          // If the atomic-compare-and-set succeeds, we should log.  Otherwise,
-          // we should assume another thread already logged and bumped up the
-          // value of lastAtCapacityWarning sometime between our get and the
-          // "if" statement.
-          LOG.warn("There are too many HTrace spans to buffer!  We have " +
-              "already buffered " + capacity + " spans.  Dropping spans.");
-        }
-      }
-    }
-  }
-}
diff --git a/htrace-htraced/src/main/java/org/apache/htrace/impl/HTracedSpanReceiver.java b/htrace-htraced/src/main/java/org/apache/htrace/impl/HTracedSpanReceiver.java
new file mode 100644
index 0000000..f5f493c
--- /dev/null
+++ b/htrace-htraced/src/main/java/org/apache/htrace/impl/HTracedSpanReceiver.java
@@ -0,0 +1,333 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.htrace.impl;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.htrace.core.HTraceConfiguration;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.htrace.core.Span;
+import org.apache.htrace.core.SpanReceiver;
+
+/**
+ * The SpanReceiver which sends spans to htraced.
+ *
+ * HTracedSpanReceiver sends trace spans out to the htraced daemon, where they
+ * are stored and indexed.  It supports two forms of RPC: a JSON/HTTP form, and
+ * an HRPC/msgpack form.  We will use the msgpack form when
+ * htraced.receiver.packed is set to true.
+ *
+ * HTraced buffers are several megabytes in size, and we reuse them to avoid
+ * creating extra garbage on the heap.  They are flushed whenever a timeout
+ * elapses, or when they get more than a configurable percent full.  We allocate
+ * two buffers so that we can continue filling one buffer while the other is
+ * being sent over the wire.  The buffers store serialized spans.  This is
+ * better than storing references to span objects because it minimzes the amount
+ * of pointers we have to follow during a GC.  Buffers are managed by instances
+ * of BufferManager.
+ */
+public class HTracedSpanReceiver extends SpanReceiver {
+  private static final Log LOG = LogFactory.getLog(HTracedSpanReceiver.class);
+
+  private final static int MAX_CLOSING_WAIT_MS = 120000;
+
+  private final FaultInjector faultInjector;
+
+  private final Conf conf;
+
+  private final ReentrantLock lock = new ReentrantLock();
+
+  private final Condition wakePostSpansThread = lock.newCondition();
+
+  private final BufferManager bufferManager[] = new BufferManager[2];
+
+  private final RateLimitedLogger flushErrorLog;
+
+  private final RateLimitedLogger spanDropLog;
+
+  private final PostSpansThread thread;
+
+  private boolean shutdown = false;
+
+  private int activeBuf = 0;
+
+  private int flushingBuf = -1;
+
+  private long lastBufferClearedTimeMs = 0;
+
+  static class FaultInjector {
+    static FaultInjector NO_OP = new FaultInjector();
+    public void handleContentLengthTrigger(int len) { }
+    public void handleThreadStart() throws Exception { }
+    public void handleFlush() throws IOException { }
+  }
+
+  public HTracedSpanReceiver(HTraceConfiguration c) throws Exception {
+    this(c, FaultInjector.NO_OP);
+  }
+
+  HTracedSpanReceiver(HTraceConfiguration c,
+      FaultInjector faultInjector) throws Exception {
+    this.faultInjector = faultInjector;
+    this.conf = new Conf(c);
+    if (this.conf.packed) {
+      for (int i = 0; i < bufferManager.length; i++) {
+        bufferManager[i] = new PackedBufferManager(conf);
+      }
+    } else {
+      for (int i = 0; i < bufferManager.length; i++) {
+        bufferManager[i] = new RestBufferManager(conf);
+      }
+    }
+    this.flushErrorLog = new RateLimitedLogger(LOG, conf.errorLogPeriodMs);
+    this.spanDropLog = new RateLimitedLogger(LOG, conf.errorLogPeriodMs);
+    this.thread = new PostSpansThread();
+    LOG.debug("Created new HTracedSpanReceiver with " + conf.toString());
+  }
+
+  @Override
+  public void receiveSpan(Span span) {
+    long startTimeMs = 0;
+    int numTries = 1;
+    while (true) {
+      lock.lock();
+      try {
+        if (shutdown) {
+          LOG.info("Unable to add span because HTracedSpanReceiver is shutting down.");
+          return;
+        }
+        Throwable exc = null;
+        try {
+          bufferManager[activeBuf].writeSpan(span);
+          int contentLength = bufferManager[activeBuf].contentLength();
+          if (contentLength > conf.triggerSize) {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Triggering buffer #" + activeBuf + " flush because" +
+                  " buffer contains " + contentLength + " bytes, and " +
+                  "triggerSize is " + conf.triggerSize);
+            }
+            faultInjector.handleContentLengthTrigger(contentLength);
+            wakePostSpansThread.signal();
+          }
+          return;
+        } catch (Exception e) {
+          exc = e;
+        } catch (Error e) {
+          exc = e;
+        }
+        if (startTimeMs == 0) {
+          startTimeMs = TimeUtil.nowMs();
+        }
+        long deltaMs = TimeUtil.deltaMs(startTimeMs, TimeUtil.nowMs());
+        if (deltaMs > conf.spanDropTimeoutMs) {
+          spanDropLog.error("Dropping a span after unsuccessfully " +
+              "attempting to add it for " + deltaMs + " ms.  There is not " +
+              "enough buffer space. Please increase " + Conf.BUFFER_SIZE_KEY +
+              " or decrease the rate of spans being generated.");
+          return;
+        } else if (LOG.isDebugEnabled()) {
+          LOG.debug("Unable to write span to buffer #" + activeBuf +
+              " after " + numTries + " attempt(s) and " + deltaMs + " ms" +
+              ".  Buffer already has " +
+                  bufferManager[activeBuf].getNumberOfSpans() + " spans.",
+              exc);
+        }
+        numTries++;
+      } finally {
+        lock.unlock();
+      }
+      try {
+        Thread.sleep(conf.spanDropTimeoutMs / 3);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+      }
+    }
+  }
+
+  @Override
+  public void close() {
+    lock.lock();
+    try {
+      shutdown = true;
+      wakePostSpansThread.signal();
+    } finally {
+      lock.unlock();
+    }
+    try {
+      thread.join(MAX_CLOSING_WAIT_MS);
+    } catch (InterruptedException e) {
+      LOG.error("HTracedSpanReceiver#close was interrupted", e);
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  private class PostSpansThread extends Thread {
+    PostSpansThread() {
+      this.setDaemon(true);
+      this.setName("PostSpans");
+      this.start();
+    }
+
+    private boolean shouldWaitForCond(long timeSinceLastClearedMs) {
+      if (shutdown) {
+        // If we're shutting down, don't wait around.
+        LOG.trace("Should not wait for cond because we're shutting down.");
+        return false;
+      }
+      int contentLength = bufferManager[activeBuf].contentLength();
+      if (contentLength == 0) {
+        // If there is nothing in the buffer, there is nothing to do.
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("Should wait for cond because we have no data buffered " +
+              "in bufferManager " + activeBuf);
+        }
+        lastBufferClearedTimeMs = TimeUtil.nowMs();
+        return true;
+      } else if (contentLength >= conf.triggerSize) {
+        // If the active buffer is filling up, start flushing.
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Should not wait for cond because we have more than " +
+              conf.triggerSize + " bytes buffered in bufferManager " +
+              activeBuf);
+        }
+        return false;
+      }
+      if (timeSinceLastClearedMs > conf.maxFlushIntervalMs) {
+        // If we have let the spans sit in the buffer for too long,
+        // start flushing.
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("Should not wait for cond because it has been " +
+              timeSinceLastClearedMs + " ms since our last flush, and we " +
+              "are overdue for another because maxFlushIntervalMs is " +
+              conf.maxFlushIntervalMs);
+        }
+        return false;
+      }
+      LOG.trace("Should wait for cond.");
+      return true;
+    }
+
+    @Override
+    public void run() {
+      try {
+        faultInjector.handleThreadStart();
+        LOG.debug("Starting HTracedSpanReceiver thread for " +
+            conf.endpointStr);
+        BufferManager flushBufManager = null;
+        while (true) {
+          lock.lock();
+          flushingBuf = -1;
+          try {
+            while (true) {
+              long timeSinceLastClearedMs = TimeUtil.
+                deltaMs(lastBufferClearedTimeMs, TimeUtil.nowMs());
+              if (!shouldWaitForCond(timeSinceLastClearedMs)) {
+                break;
+              }
+              long waitMs = conf.maxFlushIntervalMs -
+                  Math.min(conf.maxFlushIntervalMs, TimeUtil.
+                      deltaMs(TimeUtil.nowMs(), lastBufferClearedTimeMs));
+              if (LOG.isTraceEnabled()) {
+                LOG.trace("Waiting on wakePostSpansThread for " + waitMs +
+                    " ms.");
+              }
+              try {
+                wakePostSpansThread.await(waitMs, TimeUnit.MILLISECONDS);
+              } catch (InterruptedException e) {
+                LOG.info("HTraceSpanReceiver thread was interrupted.", e);
+                throw e;
+              }
+            }
+            if (shutdown && (bufferManager[activeBuf].contentLength() == 0)) {
+              LOG.debug("PostSpansThread shutting down.");
+              return;
+            }
+            flushingBuf = activeBuf;
+            flushBufManager = bufferManager[flushingBuf];
+            activeBuf = (activeBuf == 1) ? 0 : 1;
+          } finally {
+            lock.unlock();
+          }
+          doFlush(flushBufManager);
+          flushBufManager.clear();
+          lastBufferClearedTimeMs = TimeUtil.nowMs();
+          if (LOG.isTraceEnabled()) {
+            LOG.trace("setting lastBufferClearedTimeMs to " + lastBufferClearedTimeMs);
+          }
+        }
+      } catch (Throwable e) {
+        LOG.error("PostSpansThread exiting on unexpected exception", e);
+      } finally {
+        for (int i = 0; i < bufferManager.length; i++) {
+          bufferManager[i].close();
+        }
+      }
+    }
+
+    private void doFlush(BufferManager flushBufManager)
+        throws InterruptedException {
+      try {
+        flushBufManager.prepare();
+      } catch (IOException e) {
+        LOG.error("Failed to prepare buffer containing " +
+            flushBufManager.getNumberOfSpans() + " spans for " +
+            "sending to " + conf.endpointStr + " Discarding " +
+            "all spans.", e);
+        return;
+      }
+      int flushTries = 0;
+      while (true) {
+        Throwable exc;
+        try {
+          faultInjector.handleFlush();
+          flushBufManager.flush();
+          exc = null;
+        } catch (RuntimeException e) {
+          exc = e;
+        } catch (Exception e) {
+          exc = e;
+        }
+        if (exc == null) {
+          return;
+        }
+        int numSpans = flushBufManager.getNumberOfSpans();
+        String excMessage = "Failed to flush " + numSpans  + " htrace " +
+            "spans to " + conf.endpointStr + " on try " + (flushTries + 1);
+        if (flushTries >= conf.flushRetryDelays.length) {
+          excMessage += ".  Discarding all spans.";
+        }
+        if (LOG.isDebugEnabled()) {
+          LOG.error(excMessage, exc);
+        } else {
+          flushErrorLog.error(excMessage, exc);
+        }
+        if (flushTries >= conf.flushRetryDelays.length) {
+          return;
+        }
+        int delayMs = conf.flushRetryDelays[flushTries];
+        Thread.sleep(delayMs);
+        flushTries++;
+      }
+    }
+  }
+}
diff --git a/htrace-htraced/src/main/java/org/apache/htrace/impl/PackedBuffer.java b/htrace-htraced/src/main/java/org/apache/htrace/impl/PackedBuffer.java
new file mode 100644
index 0000000..f867ad7
--- /dev/null
+++ b/htrace-htraced/src/main/java/org/apache/htrace/impl/PackedBuffer.java
@@ -0,0 +1,449 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.htrace.impl;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.htrace.core.MilliSpan;
+import org.apache.htrace.core.TimelineAnnotation;
+import org.msgpack.core.MessagePack;
+import org.msgpack.core.MessagePacker;
+import org.msgpack.core.MessageUnpacker;
+import org.msgpack.core.buffer.MessageBuffer;
+import org.msgpack.core.buffer.MessageBufferOutput;
+
+import org.apache.htrace.core.Span;
+import org.apache.htrace.core.SpanId;
+
+/**
+ * A ByteBuffer which we are writing msgpack data to.
+ */
+class PackedBuffer {
+  /**
+   * A MessageBufferOutput that simply outputs to a ByteBuffer.
+   */
+  private class PackedBufferOutput implements MessageBufferOutput {
+    private MessageBuffer savedBuffer;
+
+    PackedBufferOutput() {
+    }
+
+    @Override
+    public MessageBuffer next(int bufferSize) throws IOException {
+      if (savedBuffer == null || savedBuffer.size() != bufferSize) {
+        savedBuffer = MessageBuffer.newBuffer(bufferSize);
+      }
+      MessageBuffer buffer = savedBuffer;
+      savedBuffer = null;
+      return buffer;
+    }
+
+    @Override
+    public void flush(MessageBuffer buffer) throws IOException {
+      ByteBuffer b = buffer.toByteBuffer();
+      bb.put(b);
+      savedBuffer = buffer;
+    }
+
+    @Override
+    public void close() throws IOException {
+      // no-op
+    }
+  }
+
+  private static final Log LOG = LogFactory.getLog(PackedBuffer.class);
+  private static final Charset UTF8 = StandardCharsets.UTF_8;
+  private static final byte SPANS[] = "Spans".getBytes(UTF8);
+  private static final byte DEFAULT_PID[] = "DefaultPid".getBytes(UTF8);
+  private static final byte A[] = "a".getBytes(UTF8);
+  private static final byte B[] = "b".getBytes(UTF8);
+  private static final byte E[] = "e".getBytes(UTF8);
+  private static final byte D[] = "d".getBytes(UTF8);
+  private static final byte R[] = "r".getBytes(UTF8);
+  private static final byte P[] = "p".getBytes(UTF8);
+  private static final byte N[] = "n".getBytes(UTF8);
+  private static final byte T[] = "t".getBytes(UTF8);
+  private static final byte M[] = "m".getBytes(UTF8);
+  private static final int HRPC_MAGIC = 0x43525448;
+  static final int HRPC_REQ_FRAME_LENGTH = 20;
+  static final int HRPC_RESP_FRAME_LENGTH = 20;
+  static final int MAX_HRPC_ERROR_LENGTH = 4 * 1024 * 1024;
+  static final int MAX_HRPC_BODY_LENGTH = 64 * 1024 * 1024;
+  private static final int SPAN_ID_BYTE_LENGTH = 16;
+  static final MessagePack.Config MSGPACK_CONF =
+      new MessagePack.ConfigBuilder()
+        .readBinaryAsString(false)
+        .readStringAsBinary(false)
+        .build();
+  /**
+   * The array which we are filling.
+   */
+  final ByteBuffer bb;
+
+  /**
+   * Used to tell the MessagePacker to output to our array.
+   */
+  final PackedBufferOutput out;
+
+  /**
+   * A temporary buffer for serializing span ids and other things.
+   */
+  final byte[] temp;
+
+  /**
+   * Generates msgpack output.
+   */
+  final MessagePacker packer;
+
+  /**
+   * Create a new PackedBuffer.
+   *
+   * @param bb        The ByteBuffer to use to create the packed buffer.
+   */
+  PackedBuffer(ByteBuffer bb) {
+    this.bb = bb;
+    this.out = new PackedBufferOutput();
+    this.temp = new byte[SPAN_ID_BYTE_LENGTH];
+    this.packer = new MessagePacker(out, MSGPACK_CONF);
+  }
+
+  /**
+   * Write the fixed-length request frame which starts packed RPC messages.
+   */
+  static void writeReqFrame(ByteBuffer bb, int methodId, long seq, int length)
+      throws IOException {
+    int oldPos = bb.position();
+    boolean success = false;
+    try {
+      bb.order(ByteOrder.LITTLE_ENDIAN);
+      bb.putInt(HRPC_MAGIC);
+      bb.putInt(methodId);
+      bb.putLong(seq);
+      bb.putInt(length);
+      success = true;
+    } finally {
+      if (!success) {
+        bb.position(oldPos);
+      }
+    }
+  }
+
+  /**
+   * Write an 8-byte value to a byte array as little-endian.
+   */
+  private static void longToBigEndian(byte b[], int pos, long val) {
+    b[pos + 0] =(byte) ((val >> 56) & 0xff);
+    b[pos + 1] =(byte) ((val >> 48) & 0xff);
+    b[pos + 2] =(byte) ((val >> 40) & 0xff);
+    b[pos + 3] =(byte) ((val >> 32) & 0xff);
+    b[pos + 4] =(byte) ((val >> 24) & 0xff);
+    b[pos + 5] =(byte) ((val >> 16) & 0xff);
+    b[pos + 6] =(byte) ((val >>  8) & 0xff);
+    b[pos + 7] =(byte) ((val >>  0) & 0xff);
+  }
+
+  private void writeSpanId(SpanId spanId) throws IOException {
+    longToBigEndian(temp, 0, spanId.getHigh());
+    longToBigEndian(temp, 8, spanId.getLow());
+    packer.packBinaryHeader(SPAN_ID_BYTE_LENGTH);
+    packer.writePayload(temp, 0, SPAN_ID_BYTE_LENGTH);
+  }
+
+  /**
+   * Serialize a span to the given OutputStream.
+   */
+  void writeSpan(Span span) throws IOException {
+    boolean success = false;
+    int oldPos = bb.position();
+    try {
+      int mapSize = 0;
+      if (span.getSpanId().isValid()) {
+        mapSize++;
+      }
+      if (span.getStartTimeMillis() != 0) {
+        mapSize++;
+      }
+      if (span.getStopTimeMillis() != 0) {
+        mapSize++;
+      }
+      if (!span.getDescription().isEmpty()) {
+        mapSize++;
+      }
+      if (!span.getTracerId().isEmpty()) {
+        mapSize++;
+      }
+      if (span.getParents().length > 0) {
+        mapSize++;
+      }
+      if (!span.getKVAnnotations().isEmpty()) {
+        mapSize++;
+      }
+      if (!span.getTimelineAnnotations().isEmpty()) {
+        mapSize++;
+      }
+      packer.packMapHeader(mapSize);
+      if (span.getSpanId().isValid()) {
+        packer.packRawStringHeader(1);
+        packer.writePayload(A);
+        writeSpanId(span.getSpanId());
+      }
+      if (span.getStartTimeMillis() != 0) {
+        packer.packRawStringHeader(1);
+        packer.writePayload(B);
+        packer.packLong(span.getStartTimeMillis());
+      }
+      if (span.getStopTimeMillis() != 0) {
+        packer.packRawStringHeader(1);
+        packer.writePayload(E);
+        packer.packLong(span.getStopTimeMillis());
+      }
+      if (!span.getDescription().isEmpty()) {
+        packer.packRawStringHeader(1);
+        packer.writePayload(D);
+        packer.packString(span.getDescription());
+      }
+      if (!span.getTracerId().isEmpty()) {
+        packer.packRawStringHeader(1);
+        packer.writePayload(R);
+        packer.packString(span.getTracerId());
+      }
+      if (span.getParents().length > 0) {
+        packer.packRawStringHeader(1);
+        packer.writePayload(P);
+        packer.packArrayHeader(span.getParents().length);
+        for (int i = 0; i < span.getParents().length; i++) {
+          writeSpanId(span.getParents()[i]);
+        }
+      }
+      if (!span.getKVAnnotations().isEmpty()) {
+        packer.packRawStringHeader(1);
+        packer.writePayload(N);
+        Map<String, String> map = span.getKVAnnotations();
+        packer.packMapHeader(map.size());
+        for (Map.Entry<String, String> entry : map.entrySet()) {
+          packer.packString(entry.getKey());
+          packer.packString(entry.getValue());
+        }
+      }
+      if (!span.getTimelineAnnotations().isEmpty()) {
+        packer.packRawStringHeader(1);
+        packer.writePayload(T);
+        List<TimelineAnnotation> list = span.getTimelineAnnotations();
+        packer.packArrayHeader(list.size());
+        for (TimelineAnnotation annotation : list) {
+          packer.packMapHeader(2);
+          packer.packRawStringHeader(1);
+          packer.writePayload(T);
+          packer.packLong(annotation.getTime());
+          packer.packRawStringHeader(1);
+          packer.writePayload(M);
+          packer.packString(annotation.getMessage());
+        }
+      }
+      packer.flush();
+      success = true;
+    } finally {
+      if (!success) {
+        // If we failed earlier, restore the old position.
+        // This is so that if we run out of space, we don't add a
+        // partial span to the buffer.
+        bb.position(oldPos);
+      }
+    }
+  }
+
+  static SpanId readSpanId(MessageUnpacker unpacker) throws IOException {
+    int alen = unpacker.unpackBinaryHeader();
+    if (alen != SPAN_ID_BYTE_LENGTH) {
+      throw new IOException("Invalid length given for spanID array.  " +
+          "Expected " + SPAN_ID_BYTE_LENGTH + "; got " + alen);
+    }
+    byte[] payload = new byte[SPAN_ID_BYTE_LENGTH];
+    unpacker.readPayload(payload);
+    return new SpanId(
+        ((payload[ 7] & 0xffL) <<  0) |
+        ((payload[ 6] & 0xffL) <<  8) |
+        ((payload[ 5] & 0xffL) << 16) |
+        ((payload[ 4] & 0xffL) << 24) |
+        ((payload[ 3] & 0xffL) << 32) |
+        ((payload[ 2] & 0xffL) << 40) |
+        ((payload[ 1] & 0xffL) << 48) |
+        ((payload[ 0] & 0xffL) << 56),
+        ((payload[15] & 0xffL) <<  0) |
+        ((payload[14] & 0xffL) <<  8) |
+        ((payload[13] & 0xffL) << 16) |
+        ((payload[12] & 0xffL) << 24) |
+        ((payload[11] & 0xffL) << 32) |
+        ((payload[10] & 0xffL) << 40) |
+        ((payload[ 9] & 0xffL) << 48) |
+        ((payload[ 8] & 0xffL) << 56)
+      );
+  }
+
+  /**
+   * Read a span.  Used in unit tests.  Not optimized.
+   */
+  static Span readSpan(MessageUnpacker unpacker) throws IOException {
+    int numEntries = unpacker.unpackMapHeader();
+    MilliSpan.Builder builder = new MilliSpan.Builder();
+    while (--numEntries >= 0) {
+      String key = unpacker.unpackString();
+      if (key.length() != 1) {
+        throw new IOException("Unknown key " + key);
+      }
+      switch (key.charAt(0)) {
+        case 'a':
+          builder.spanId(readSpanId(unpacker));
+          break;
+        case 'b':
+          builder.begin(unpacker.unpackLong());
+          break;
+        case 'e':
+          builder.end(unpacker.unpackLong());
+          break;
+        case 'd':
+          builder.description(unpacker.unpackString());
+          break;
+        case 'r':
+          builder.tracerId(unpacker.unpackString());
+          break;
+        case 'p':
+          int numParents = unpacker.unpackArrayHeader();
+          SpanId[] parents = new SpanId[numParents];
+          for (int i = 0; i < numParents; i++) {
+            parents[i] = readSpanId(unpacker);
+          }
+          builder.parents(parents);
+          break;
+        case 'n':
+          int mapEntries = unpacker.unpackMapHeader();
+          HashMap<String, String> entries =
+              new HashMap<String, String>(mapEntries);
+          for (int i = 0; i < mapEntries; i++) {
+            String k = unpacker.unpackString();
+            String v = unpacker.unpackString();
+            entries.put(k, v);
+          }
+          builder.traceInfo(entries);
+          break;
+        case 't':
+          int listEntries = unpacker.unpackArrayHeader();
+          ArrayList<TimelineAnnotation> list =
+              new ArrayList<TimelineAnnotation>(listEntries);
+          for (int i = 0; i < listEntries; i++) {
+            int timelineObjectSize = unpacker.unpackMapHeader();
+            long time = 0;
+            String msg = "";
+            for (int j = 0; j < timelineObjectSize; j++) {
+              String tlKey = unpacker.unpackString();
+              if (tlKey.length() != 1) {
+                throw new IOException("Unknown timeline map key " + tlKey);
+              }
+              switch (tlKey.charAt(0)) {
+                case 't':
+                  time = unpacker.unpackLong();
+                  break;
+                case 'm':
+                  msg = unpacker.unpackString();
+                  break;
+                default:
+                  throw new IOException("Unknown timeline map key " + tlKey);
+              }
+            }
+            list.add(new TimelineAnnotation(time, msg));
+          }
+          builder.timeline(list);
+          break;
+        default:
+          throw new IOException("Unknown key " + key);
+      }
+    }
+    return builder.build();
+  }
+
+  void beginWriteSpansRequest(String defaultPid, int numSpans)
+      throws IOException {
+    boolean success = false;
+    int oldPos = bb.position();
+    try {
+      int mapSize = 1;
+      if (defaultPid != null) {
+        mapSize++;
+      }
+      packer.packMapHeader(mapSize);
+      if (defaultPid != null) {
+        packer.packRawStringHeader(DEFAULT_PID.length);
+        packer.writePayload(DEFAULT_PID);
+        packer.packString(defaultPid);
+      }
+      packer.packRawStringHeader(SPANS.length);
+      packer.writePayload(SPANS);
+      packer.packArrayHeader(numSpans);
+      packer.flush();
+      success = true;
+    } finally {
+      if (!success) {
+        bb.position(oldPos);
+      }
+    }
+  }
+
+  /**
+   * Get the underlying ByteBuffer.
+   */
+  ByteBuffer getBuffer() {
+    return bb;
+  }
+
+  /**
+   * Reset our position in the array.
+   */
+  void reset() throws IOException {
+    packer.reset(out);
+  }
+
+  void close() {
+    try {
+      packer.close();
+    } catch (IOException e) {
+      LOG.error("Error closing MessagePacker", e);
+    }
+  }
+
+  public String toHexString() {
+    String prefix = "";
+    StringBuilder bld = new StringBuilder();
+    ByteBuffer b = bb.duplicate();
+    b.flip();
+    while (b.hasRemaining()) {
+      bld.append(String.format("%s%02x", prefix, b.get()));
+      prefix = " ";
+    }
+    return bld.toString();
+  }
+}
diff --git a/htrace-htraced/src/main/java/org/apache/htrace/impl/PackedBufferManager.java b/htrace-htraced/src/main/java/org/apache/htrace/impl/PackedBufferManager.java
new file mode 100644
index 0000000..8b59a72
--- /dev/null
+++ b/htrace-htraced/src/main/java/org/apache/htrace/impl/PackedBufferManager.java
@@ -0,0 +1,340 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.htrace.impl;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.CharBuffer;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.SocketChannel;
+import java.nio.channels.spi.SelectorProvider;
+import java.nio.charset.StandardCharsets;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.htrace.core.Span;
+
+class PackedBufferManager implements BufferManager {
+  private static final Log LOG = LogFactory.getLog(PackedBuffer.class);
+  private static final int MAX_PREQUEL_LENGTH = 2048;
+  private static final int METHOD_ID_WRITE_SPANS = 0x1;
+  private final Conf conf;
+  private final ByteBuffer frameBuffer;
+  private final PackedBuffer prequel;
+  private final PackedBuffer spans;
+  private final Selector selector;
+  private int numSpans;
+
+  PackedBufferManager(Conf conf) throws IOException {
+    this.conf = conf;
+    this.frameBuffer = ByteBuffer.allocate(PackedBuffer.HRPC_REQ_FRAME_LENGTH);
+    this.prequel = new PackedBuffer(ByteBuffer.allocate(MAX_PREQUEL_LENGTH));
+    this.spans = new PackedBuffer(ByteBuffer.allocate(conf.bufferSize));
+    this.selector = SelectorProvider.provider().openSelector();
+    clear();
+  }
+
+  @Override
+  public void writeSpan(Span span) throws IOException {
+    spans.writeSpan(span);
+    numSpans++;
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("wrote " + span.toJson() + " to PackedBuffer for " +
+          conf.endpointStr + ". numSpans = " + numSpans +
+          ", buffer position = " + spans.getBuffer().position());
+    }
+  }
+
+  @Override
+  public int contentLength() {
+    return spans.getBuffer().position();
+  }
+
+  @Override
+  public int getNumberOfSpans() {
+    return numSpans;
+  }
+
+  @Override
+  public void prepare() throws IOException {
+    prequel.beginWriteSpansRequest(null, numSpans);
+    long totalLength =
+        prequel.getBuffer().position() + spans.getBuffer().position();
+    if (totalLength > PackedBuffer.MAX_HRPC_BODY_LENGTH) {
+      throw new IOException("Can't send RPC of " + totalLength + " bytes " +
+          "because it is longer than " + PackedBuffer.MAX_HRPC_BODY_LENGTH);
+    }
+    PackedBuffer.writeReqFrame(frameBuffer,
+        METHOD_ID_WRITE_SPANS, 1, (int)totalLength);
+    frameBuffer.flip();
+    prequel.getBuffer().flip();
+    spans.getBuffer().flip();
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Preparing to send RPC of length " +
+          (totalLength + PackedBuffer.HRPC_REQ_FRAME_LENGTH) + " to " +
+          conf.endpointStr + ", containing " + numSpans + " spans.");
+    }
+  }
+
+  @Override
+  public void flush() throws IOException {
+    SelectionKey sockKey = null;
+    IOException ioe = null;
+    frameBuffer.position(0);
+    prequel.getBuffer().position(0);
+    spans.getBuffer().position(0);
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Preparing to flush " + numSpans + " spans to " +
+          conf.endpointStr);
+    }
+    try {
+      sockKey = doConnect();
+      doSend(sockKey, new ByteBuffer[] {
+          frameBuffer, prequel.getBuffer(), spans.getBuffer() });
+      ByteBuffer response = prequel.getBuffer();
+      readAndValidateResponseFrame(sockKey, response,
+          1, METHOD_ID_WRITE_SPANS);
+    } catch (IOException e) {
+      // This LOG message is only at debug level because we also log these
+      // exceptions at error level inside HTracedReceiver.  The logging in
+      // HTracedReceiver is rate-limited to avoid overwhelming the client log
+      // if htraced goes down.  The debug and trace logging is not
+      // rate-limited.
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Got exception during flush", e);
+      }
+      ioe = e;
+    } finally {
+      if (sockKey != null) {
+        sockKey.cancel();
+        try {
+          SocketChannel sock = (SocketChannel)sockKey.attachment();
+          sock.close();
+        } catch (IOException e) {
+          if (ioe != null) {
+            ioe.addSuppressed(e);
+          }
+        }
+      }
+    }
+    if (ioe != null) {
+      throw ioe;
+    }
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Successfully flushed " + numSpans + " spans to " +
+          conf.endpointStr);
+    }
+  }
+
+  private long updateRemainingMs(long startMs, long timeoutMs) {
+    long deltaMs = TimeUtil.deltaMs(startMs, TimeUtil.nowMs());
+    if (deltaMs > timeoutMs) {
+      return 0;
+    }
+    return timeoutMs - deltaMs;
+  }
+
+  private SelectionKey doConnect() throws IOException {
+    SocketChannel sock = SocketChannel.open();
+    SelectionKey sockKey = null;
+    boolean success = false;
+    try {
+      if (sock.isBlocking()) {
+        sock.configureBlocking(false);
+      }
+      InetSocketAddress resolvedEndpoint =
+          new InetSocketAddress(conf.endpoint.getHostString(),
+              conf.endpoint.getPort());
+      resolvedEndpoint.getHostName(); // trigger DNS resolution
+      sock.connect(resolvedEndpoint);
+      sockKey = sock.register(selector, SelectionKey.OP_CONNECT, sock);
+      long startMs = TimeUtil.nowMs();
+      long remainingMs = conf.connectTimeoutMs;
+      while (true) {
+        selector.select(remainingMs);
+        for (SelectionKey key : selector.keys()) {
+          if (key.isConnectable()) {
+            SocketChannel s = (SocketChannel)key.attachment();
+            s.finishConnect();
+            if (LOG.isTraceEnabled()) {
+              LOG.trace("Successfully connected to " + conf.endpointStr + ".");
+            }
+            success = true;
+            return sockKey;
+          }
+        }
+        remainingMs = updateRemainingMs(startMs, conf.connectTimeoutMs);
+        if (remainingMs == 0) {
+          throw new IOException("Attempt to connect to " + conf.endpointStr +
+              " timed out after " +  TimeUtil.deltaMs(startMs, TimeUtil.nowMs()) +
+              " ms.");
+        }
+      }
+    } finally {
+      if (!success) {
+        if (sockKey != null) {
+          sockKey.cancel();
+        }
+        sock.close();
+      }
+    }
+  }
+
+  /**
+   * Send the provided ByteBuffer objects.
+   *
+   * We use non-blocking I/O because Java does not provide write timeouts.
+   * Without a write timeout, the socket could get hung and we'd never recover.
+   * We also use the GatheringByteChannel#write method which calls the pread()
+   * system call under the covers.  This ensures that even if TCP_NODELAY is on,
+   * we send the minimal number of packets.
+   */
+  private void doSend(SelectionKey sockKey, ByteBuffer[] bufs)
+        throws IOException {
+    long totalWritten = 0;
+    sockKey.interestOps(SelectionKey.OP_WRITE);
+    SocketChannel sock = (SocketChannel)sockKey.attachment();
+    long startMs = TimeUtil.nowMs();
+    long remainingMs = conf.ioTimeoutMs;
+    while (true) {
+      selector.select(remainingMs);
+      int firstBuf = 0;
+      for (SelectionKey key : selector.selectedKeys()) {
+        if (key.isWritable()) {
+          long written = sock.write(bufs, firstBuf, bufs.length - firstBuf);
+          if (LOG.isTraceEnabled()) {
+            LOG.trace("Sent " + written + " bytes to " + conf.endpointStr);
+          }
+          totalWritten += written;
+        }
+      }
+      while (true) {
+        if (firstBuf == bufs.length) {
+          if (LOG.isTraceEnabled()) {
+            LOG.trace("Finished sending " + totalWritten + " bytes to " +
+                conf.endpointStr);
+          }
+          return;
+        }
+        if (bufs[firstBuf].remaining() > 0) {
+          break;
+        }
+        firstBuf++;
+      }
+      remainingMs = updateRemainingMs(startMs, conf.ioTimeoutMs);
+      if (remainingMs == 0) {
+        throw new IOException("Attempt to write to " + conf.endpointStr +
+            " timed out after " + TimeUtil.deltaMs(startMs, TimeUtil.nowMs()) +
+            " ms.");
+      }
+    }
+  }
+
+  private void doRecv(SelectionKey sockKey, ByteBuffer response)
+      throws IOException {
+    sockKey.interestOps(SelectionKey.OP_READ);
+    SocketChannel sock = (SocketChannel)sockKey.attachment();
+    int totalRead = response.remaining();
+    long startMs = TimeUtil.nowMs();
+    long remainingMs = conf.ioTimeoutMs;
+    while (remainingMs > 0) {
+      selector.select(remainingMs);
+      for (SelectionKey key : selector.selectedKeys()) {
+        if (key.isReadable()) {
+          sock.read(response);
+        }
+      }
+      if (response.remaining() == 0) {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("Received all " + totalRead + " bytes from " +
+              conf.endpointStr);
+        }
+        return;
+      }
+      remainingMs = updateRemainingMs(startMs, conf.ioTimeoutMs);
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Received " + (totalRead - response.remaining()) +
+                " out of " + totalRead + " bytes from " + conf.endpointStr);
+      }
+      if (remainingMs == 0) {
+        throw new IOException("Attempt to write to " + conf.endpointStr +
+            " timed out after " + TimeUtil.deltaMs(startMs, TimeUtil.nowMs()) +
+            " ms.");
+      }
+    }
+  }
+
+  private void readAndValidateResponseFrame(SelectionKey sockKey,
+        ByteBuffer buf, long expectedSeq, int expectedMethodId)
+          throws IOException {
+    buf.clear();
+    buf.limit(PackedBuffer.HRPC_RESP_FRAME_LENGTH);
+    doRecv(sockKey, buf);
+    buf.flip();
+    buf.order(ByteOrder.LITTLE_ENDIAN);
+    long seq = buf.getLong();
+    if (seq != expectedSeq) {
+      throw new IOException("Expected sequence number " + expectedSeq +
+          ", but got sequence number " + seq);
+    }
+    int methodId = buf.getInt();
+    if (expectedMethodId != methodId) {
+      throw new IOException("Expected method id " + expectedMethodId +
+          ", but got " + methodId);
+    }
+    int errorLength = buf.getInt();
+    buf.getInt();
+    if ((errorLength < 0) ||
+        (errorLength > PackedBuffer.MAX_HRPC_ERROR_LENGTH)) {
+      throw new IOException("Got server error with invalid length " +
+          errorLength);
+    } else if (errorLength > 0) {
+      buf.clear();
+      buf.limit(errorLength);
+      doRecv(sockKey, buf);
+      buf.flip();
+      CharBuffer charBuf = StandardCharsets.UTF_8.decode(buf);
+      String serverErrorStr = charBuf.toString();
+      throw new IOException("Got server error " + serverErrorStr);
+    }
+  }
+
+  @Override
+  public void clear() {
+    frameBuffer.clear();
+    prequel.getBuffer().clear();
+    spans.getBuffer().clear();
+    numSpans = 0;
+  }
+
+  @Override
+  public void close() {
+    clear();
+    prequel.close();
+    spans.close();
+    try {
+      selector.close();
+    } catch (IOException e) {
+      LOG.warn("Error closing selector", e);
+    }
+  }
+}
diff --git a/htrace-htraced/src/main/java/org/apache/htrace/impl/RateLimitedLogger.java b/htrace-htraced/src/main/java/org/apache/htrace/impl/RateLimitedLogger.java
new file mode 100644
index 0000000..ac42ee8
--- /dev/null
+++ b/htrace-htraced/src/main/java/org/apache/htrace/impl/RateLimitedLogger.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.htrace.impl;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+
+/**
+ * A logger which rate-limits its logging to a configurable level.
+ */
+class RateLimitedLogger {
+  private final Log log;
+  private final long timeoutMs;
+  private long lastLogTimeMs;
+
+  public RateLimitedLogger(Log log, long timeoutMs) {
+    this.log = log;
+    this.timeoutMs = timeoutMs;
+    synchronized (this) {
+      this.lastLogTimeMs = 0L;
+    }
+  }
+
+  public void warn(String what) {
+    long now = TimeUnit.MILLISECONDS.convert(System.nanoTime(),
+        TimeUnit.NANOSECONDS);
+    synchronized (this) {
+      if (now >= lastLogTimeMs + timeoutMs) {
+        log.warn(what);
+        lastLogTimeMs = now;
+      }
+    }
+  }
+
+  public void error(String what) {
+    long now = TimeUnit.MILLISECONDS.convert(System.nanoTime(),
+        TimeUnit.NANOSECONDS);
+    synchronized (this) {
+      if (now >= lastLogTimeMs + timeoutMs) {
+        log.error(what);
+        lastLogTimeMs = now;
+      }
+    }
+  }
+
+  public void error(String what, Throwable e) {
+    long now = TimeUnit.MILLISECONDS.convert(System.nanoTime(),
+        TimeUnit.NANOSECONDS);
+    synchronized (this) {
+      if (now >= lastLogTimeMs + timeoutMs) {
+        log.error(what, e);
+        lastLogTimeMs = now;
+      }
+    }
+  }
+}
diff --git a/htrace-htraced/src/main/java/org/apache/htrace/impl/RestBufferManager.java b/htrace-htraced/src/main/java/org/apache/htrace/impl/RestBufferManager.java
new file mode 100644
index 0000000..2e1aa70
--- /dev/null
+++ b/htrace-htraced/src/main/java/org/apache/htrace/impl/RestBufferManager.java
@@ -0,0 +1,225 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.htrace.impl;
+
+import java.io.IOException;
+import java.net.URL;
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.ObjectWriter;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.htrace.core.Span;
+import org.eclipse.jetty.client.HttpClient;
+import org.eclipse.jetty.client.api.ContentProvider;
+import org.eclipse.jetty.client.api.ContentResponse;
+import org.eclipse.jetty.client.api.Request;
+import org.eclipse.jetty.http.HttpField;
+import org.eclipse.jetty.http.HttpHeader;
+import org.eclipse.jetty.http.HttpMethod;
+import org.eclipse.jetty.http.HttpStatus;
+
+class RestBufferManager implements BufferManager {
+  private static final Log LOG = LogFactory.getLog(RestBufferManager.class);
+  private static ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+  private static ObjectWriter JSON_WRITER = OBJECT_MAPPER.writer();
+  private static final Charset UTF8 = Charset.forName("UTF-8");
+  private static final byte COMMA_BYTE = (byte)0x2c;
+  private static final int MAX_PREQUEL_LENGTH = 512;
+  private static final int MAX_EPILOGUE_LENGTH = 32;
+  private final Conf conf;
+  private final HttpClient httpClient;
+  private final String urlString;
+  private final ByteBuffer prequel;
+  private final ByteBuffer spans;
+  private final ByteBuffer epilogue;
+  private int numSpans;
+
+  private static class RestBufferManagerContentProvider
+      implements ContentProvider {
+    private final ByteBuffer[] bufs;
+
+    private class ByteBufferIterator implements Iterator<ByteBuffer> {
+      private int bufIdx = -1;
+
+      @Override
+      public boolean hasNext() {
+        return (bufIdx + 1) < bufs.length;
+      }
+
+      @Override
+      public ByteBuffer next() {
+        if ((bufIdx + 1) >= bufs.length) {
+          throw new NoSuchElementException();
+        }
+        bufIdx++;
+        return bufs[bufIdx];
+      }
+
+      @Override
+      public void remove() {
+        throw new UnsupportedOperationException();
+      }
+    }
+
+    RestBufferManagerContentProvider(ByteBuffer[] bufs) {
+      this.bufs = bufs;
+    }
+
+    @Override
+    public long getLength() {
+      long total = 0;
+      for (int i = 0; i < bufs.length; i++) {
+        total += bufs[i].remaining();
+      }
+      return total;
+    }
+
+    @Override
+    public Iterator<ByteBuffer> iterator() {
+      return new ByteBufferIterator();
+    }
+  }
+
+  /**
+   * Create an HttpClient instance.
+   *
+   * @param connTimeout         The timeout to use for connecting.
+   * @param idleTimeout         The idle timeout to use.
+   */
+  static HttpClient createHttpClient(long connTimeout, long idleTimeout) {
+    HttpClient httpClient = new HttpClient();
+    httpClient.setUserAgentField(
+        new HttpField(HttpHeader.USER_AGENT, "HTracedSpanReceiver"));
+    httpClient.setConnectTimeout(connTimeout);
+    httpClient.setIdleTimeout(idleTimeout);
+    return httpClient;
+  }
+
+  RestBufferManager(Conf conf) throws Exception {
+    this.conf = conf;
+    this.httpClient =
+        createHttpClient(conf.connectTimeoutMs, conf.idleTimeoutMs);
+    this.urlString = new URL("http", conf.endpoint.getHostName(),
+        conf.endpoint.getPort(), "/writeSpans").toString();
+    this.prequel = ByteBuffer.allocate(MAX_PREQUEL_LENGTH);
+    this.spans = ByteBuffer.allocate(conf.bufferSize);
+    this.epilogue = ByteBuffer.allocate(MAX_EPILOGUE_LENGTH);
+    clear();
+    this.httpClient.start();
+  }
+
+  @Override
+  public void writeSpan(Span span) throws IOException {
+    byte[] spanJsonBytes = JSON_WRITER.writeValueAsBytes(span);
+    if ((spans.capacity() - spans.position()) < (spanJsonBytes.length + 1)) {
+      // Make sure we have enough space for the span JSON and a comma.
+      throw new IOException("Not enough space remaining in span buffer.");
+    }
+    spans.put(COMMA_BYTE);
+    spans.put(spanJsonBytes);
+    numSpans++;
+  }
+
+  @Override
+  public int contentLength() {
+    return Math.max(spans.position() - 1, 0);
+  }
+
+  @Override
+  public int getNumberOfSpans() {
+    return numSpans;
+  }
+
+  @Override
+  public void prepare() throws IOException {
+    String prequelString = "{\"Spans\":[";
+    prequel.put(prequelString.getBytes(UTF8));
+    prequel.flip();
+
+    spans.flip();
+
+    String epilogueString = "]}";
+    epilogue.put(epilogueString.toString().getBytes(UTF8));
+    epilogue.flip();
+
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Preparing to send " + contentLength() + " bytes of span " +
+          "data to " + conf.endpointStr + ", containing " + numSpans +
+          " spans.");
+    }
+  }
+
+  @Override
+  public void flush() throws IOException {
+    // Position the buffers at the beginning.
+    prequel.position(0);
+    spans.position(spans.limit() == 0 ? 0 : 1); // Skip the first comma
+    epilogue.position(0);
+
+    RestBufferManagerContentProvider contentProvider =
+        new RestBufferManagerContentProvider(
+            new ByteBuffer[] { prequel, spans, epilogue });
+    long rpcLength = contentProvider.getLength();
+    try {
+      Request request = httpClient.
+          newRequest(urlString).method(HttpMethod.POST);
+      request.header(HttpHeader.CONTENT_TYPE, "application/json");
+      request.content(contentProvider);
+      ContentResponse response = request.send();
+      if (response.getStatus() != HttpStatus.OK_200) {
+        throw new IOException("Got back error response " +
+            response.getStatus() + " from " + conf.endpointStr + "; " +
+            response.getContentAsString());
+      }
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Sent WriteSpansReq of length " + rpcLength + " to " + conf.endpointStr);
+      }
+    } catch (InterruptedException e) {
+      throw new IOException("Interrupted while sending spans via REST", e);
+    } catch (TimeoutException e) {
+      throw new IOException("Timed out sending spans via REST", e);
+    } catch (ExecutionException e) {
+      throw new IOException("Execution exception sending spans via REST", e);
+    }
+  }
+
+  @Override
+  public void clear() {
+    prequel.clear();
+    spans.clear();
+    epilogue.clear();
+    numSpans = 0;
+  }
+
+  @Override
+  public void close() {
+    try {
+      httpClient.stop();
+    } catch (Exception e) {
+      LOG.error("Error stopping HTracedReceiver httpClient", e);
+    }
+  }
+}
diff --git a/htrace-htraced/src/main/java/org/apache/htrace/impl/TimeUtil.java b/htrace-htraced/src/main/java/org/apache/htrace/impl/TimeUtil.java
new file mode 100644
index 0000000..7361c97
--- /dev/null
+++ b/htrace-htraced/src/main/java/org/apache/htrace/impl/TimeUtil.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.htrace.impl;
+
+import java.math.BigInteger;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Utilities for dealing with monotonic time.
+ */
+class TimeUtil {
+  /**
+   * Returns the current monotonic time in milliseconds.
+   */
+  static long nowMs() {
+    return TimeUnit.MILLISECONDS.convert(
+        System.nanoTime(), TimeUnit.NANOSECONDS);
+  }
+
+  /**
+   * Get the approximate delta between two monotonic times.
+   *
+   * This function makes the following assumptions:
+   * 1. We read startMs from the monotonic clock prior to endMs.
+   * 2. The two times are not more than 100 years or so apart.
+   *
+   * With these two assumptions in hand, we can smooth over some of the
+   * unpleasant features of the monotonic clock:
+   * 1. It can return either positive or negative values.
+   * 2. When the number of nanoseconds reaches Long.MAX_VALUE it wraps around
+   * to Long.MIN_VALUE.
+   * 3. On some messed up systems it has been known to jump backwards every
+   * now and then.  Oops.  CPU core synchronization mumble mumble.
+   *
+   * @param startMs  The start time.
+   * @param endMs    The end time.
+   * @return         The delta between the two times.
+   */
+  static long deltaMs(long startMs, long endMs) {
+    BigInteger startNs = BigInteger.valueOf(TimeUnit.NANOSECONDS.
+        convert(startMs, TimeUnit.MILLISECONDS));
+    BigInteger endNs = BigInteger.valueOf(TimeUnit.NANOSECONDS.
+        convert(endMs, TimeUnit.MILLISECONDS));
+    BigInteger deltaNs = endNs.subtract(startNs);
+    if (deltaNs.signum() >= 0) {
+      return TimeUnit.MILLISECONDS.convert(deltaNs.min(
+          BigInteger.valueOf(Long.MAX_VALUE)).longValue(), TimeUnit.NANOSECONDS);
+    }
+    deltaNs = deltaNs.negate();
+    if (deltaNs.compareTo(BigInteger.valueOf(Long.MAX_VALUE / 2)) < 0) {
+      // If the 'startNs' is numerically less than the 'endNs', and the
+      // difference between the two is less than 100 years, it's probably
+      // just clock jitter.  Certain old OSes and CPUs had monotonic clocks
+      // that could go backwards under certain conditions (ironic, given
+      // the name).
+      return 0L;
+    }
+    // Handle rollover.
+    BigInteger revDeltaNs = BigInteger.ONE.shiftLeft(64).subtract(deltaNs);
+    return TimeUnit.MILLISECONDS.convert(revDeltaNs.min(
+        BigInteger.valueOf(Long.MAX_VALUE)).longValue(), TimeUnit.NANOSECONDS);
+  }
+}
diff --git a/htrace-htraced/src/test/java/org/apache/htrace/impl/DataDir.java b/htrace-htraced/src/test/java/org/apache/htrace/impl/DataDir.java
new file mode 100644
index 0000000..f224f6f
--- /dev/null
+++ b/htrace-htraced/src/test/java/org/apache/htrace/impl/DataDir.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.htrace.impl;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.UUID;
+
+/**
+ * Small util for making a data directory for tests to use when running tests. We put it up at
+ * target/test-data/UUID.  Create an instance of this class per unit test run and it will take
+ * care of setting up the dirs for you.  Pass what is returned here as location from which to
+ * have daemons and tests dump data.
+ */
+public class DataDir implements Closeable {
+  private final File dir;
+
+  public DataDir() throws IOException {
+    String baseDir = System.getProperty(
+        "test.data.base.dir", "target");
+    File testData = new File(new File(baseDir), "test-data");
+    this.dir = new File(testData, UUID.randomUUID().toString());
+    Files.createDirectories(this.dir.toPath());
+  }
+
+  public File get() {
+    return dir;
+  }
+
+  @Override
+  public void close() throws IOException {
+    /*for (File file : this.dir.listFiles()) {
+      file.delete();
+    }
+    Files.delete(this.dir.toPath()); */
+  }
+
+  @Override
+  public String toString() {
+    return "DataDir{" + dir.getAbsolutePath() + "}";
+  }
+}
\ No newline at end of file
diff --git a/htrace-htraced/src/test/java/org/apache/htrace/util/HTracedProcess.java b/htrace-htraced/src/test/java/org/apache/htrace/impl/HTracedProcess.java
similarity index 60%
rename from htrace-htraced/src/test/java/org/apache/htrace/util/HTracedProcess.java
rename to htrace-htraced/src/test/java/org/apache/htrace/impl/HTracedProcess.java
index 3e800d2..26c1a10 100644
--- a/htrace-htraced/src/test/java/org/apache/htrace/util/HTracedProcess.java
+++ b/htrace-htraced/src/test/java/org/apache/htrace/impl/HTracedProcess.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.htrace.util;
+package org.apache.htrace.impl;
 
 import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
 import com.fasterxml.jackson.annotation.JsonProperty;
@@ -24,27 +24,76 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
-import java.lang.ProcessBuilder.Redirect;
 import java.net.ServerSocket;
 import java.net.Socket;
-import java.net.URL;
+import java.net.URI;
+import java.nio.file.Paths;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.htrace.core.MilliSpan;
+import org.apache.htrace.core.Span;
+import org.apache.htrace.core.SpanId;
+import org.eclipse.jetty.client.HttpClient;
+import org.eclipse.jetty.client.api.ContentResponse;
+import org.eclipse.jetty.http.HttpStatus;
+import org.junit.Assert;
 
 /**
  * To get instance of HTraced up and running, create an instance of this class.
  * Upon successful construction, htraced is running using <code>dataDir</code> as directory to
  * host data (leveldbs and logs).
- * TODO: We expect to find the htraced in a very particular place. Fragile. Will break if stuff
- * moves.
  */
-public class HTracedProcess extends Process {
+class HTracedProcess extends Process {
   private static final Log LOG = LogFactory.getLog(HTracedProcess.class);
+
+  static class Builder {
+    String host = "localhost";
+
+    Builder() {
+    }
+
+    Builder host(String host) {
+      this.host = host;
+      return this;
+    }
+
+    HTracedProcess build() throws Exception {
+      return new HTracedProcess(this);
+    }
+  }
+
+  /**
+   * Path to the htraced binary.
+   */
+  private final File htracedPath;
+
+  /**
+   * Temporary directory for test files.
+   */
+  private final DataDir dataDir;
+
+  /**
+   * The Java Process object for htraced.
+   */
   private final Process delegate;
 
+  /**
+   * The HTTP host:port returned from htraced.
+   */
   private final String httpAddr;
 
   /**
+   * The HRPC host:port returned from htraced.
+   */
+  private final String hrpcAddr;
+
+  /**
+   * REST client to use to talk to htraced.
+   */
+  private final HttpClient httpClient;
+
+  /**
    * Data send back from the HTraced process on the notification port.
    */
   @JsonIgnoreProperties(ignoreUnknown = true)
@@ -56,42 +105,59 @@
     String httpAddr;
 
     /**
+     * The hostname:port pair which the HTraced process uses for HRPC requests.
+     */
+    @JsonProperty("HrpcAddr")
+    String hrpcAddr;
+
+    /**
      * The process ID of the HTraced process.
      */
     @JsonProperty("ProcessId")
     long processId;
   }
 
-  public HTracedProcess(final File binPath, final File dataDir,
-                        final String host) throws IOException {
+  private HTracedProcess(Builder builder) throws Exception {
+    this.htracedPath = Paths.get(
+        "target", "..", "go", "build", "htraced").toFile();
+    if (!this.htracedPath.exists()) {
+      throw new RuntimeException("No htraced binary exists at " +
+          this.htracedPath);
+    }
+    this.dataDir = new DataDir();
     // Create a notifier socket bound to a random port.
     ServerSocket listener = new ServerSocket(0);
     boolean success = false;
     Process process = null;
+    HttpClient http = null;
     try {
       // Use a random port for the web address.  No 'scheme' yet.
-      String webAddress = host + ":0";
-      String logPath = new File(dataDir, "log.txt").getAbsolutePath();
+      String random = builder.host + ":0";
+      String logPath = new File(dataDir.get(), "log.txt").getAbsolutePath();
       // Pass cmdline args to htraced to it uses our test dir for data.
-      ProcessBuilder pb = new ProcessBuilder(binPath.toString(),
+      ProcessBuilder pb = new ProcessBuilder(htracedPath.getAbsolutePath(),
         "-Dlog.level=TRACE",
         "-Dlog.path=" + logPath,
-        "-Dweb.address=" + webAddress,
+        "-Dweb.address=" + random,
+        "-Dhrpc.address=" + random,
         "-Ddata.store.clear=true",
         "-Dstartup.notification.address=localhost:" + listener.getLocalPort(),
-        "-Ddata.store.directories=" + dataDir.toString());
+        "-Ddata.store.directories=" + dataDir.get().getAbsolutePath());
       pb.redirectErrorStream(true);
       // Inherit STDERR/STDOUT i/o; dumps on console for now.  Can add logs later.
       pb.inheritIO();
-      pb.directory(dataDir);
+      pb.directory(dataDir.get());
       //assert pb.redirectInput() == Redirect.PIPE;
       //assert pb.redirectOutput().file() == dataDir;
       process = pb.start();
       assert process.getInputStream().read() == -1;
       StartupNotificationData data = readStartupNotification(listener);
       httpAddr = data.httpAddr;
+      hrpcAddr = data.hrpcAddr;
       LOG.info("Started htraced process " + data.processId + " with http " +
                "address " + data.httpAddr + ", logging to " + logPath);
+      http = RestBufferManager.createHttpClient(60000L, 60000L);
+      http.start();
       success = true;
     } finally {
       if (!success) {
@@ -100,9 +166,13 @@
           process.destroy();
           process = null;
         }
+        if (http != null) {
+          http.stop();
+        }
       }
       delegate = process;
       listener.close();
+      httpClient = http;
     }
   }
 
@@ -149,7 +219,18 @@
   }
 
   public void destroy() {
+    try {
+      httpClient.stop();
+    } catch (Exception e) {
+      LOG.error("Error stopping httpClient", e);
+    }
     delegate.destroy();
+    try {
+      dataDir.close();
+    } catch (Exception e) {
+      LOG.error("Error closing " + dataDir, e);
+    }
+    LOG.trace("Destroyed htraced process.");
   }
 
   public String toString() {
@@ -160,6 +241,10 @@
     return httpAddr;
   }
 
+  public String getHrpcAddr() {
+    return hrpcAddr;
+  }
+
   /**
    * Ugly but how else to do file-math?
    * @param topLevel Presumes top-level of the htrace checkout.
@@ -169,4 +254,24 @@
     return new File(new File(new File(new File(topLevel, "htrace-htraced"), "go"), "build"),
       "htraced");
   }
+
+  public String getServerInfoJson() throws Exception {
+    ContentResponse response = httpClient.GET(
+        new URI(String.format("http://%s/server/info", httpAddr)));
+    Assert.assertEquals("application/json", response.getMediaType());
+    Assert.assertEquals(HttpStatus.OK_200, response.getStatus());
+    return response.getContentAsString();
+  }
+
+  public Span getSpan(SpanId spanId) throws Exception {
+    ContentResponse response = httpClient.GET(
+        new URI(String.format("http://%s/span/%s",
+            httpAddr, spanId.toString())));
+    Assert.assertEquals("application/json", response.getMediaType());
+    String responseJson = response.getContentAsString().trim();
+    if (responseJson.isEmpty()) {
+      return null;
+    }
+    return MilliSpan.fromJson(responseJson);
+  }
 }
diff --git a/htrace-htraced/src/test/java/org/apache/htrace/impl/TestHTracedRESTReceiver.java b/htrace-htraced/src/test/java/org/apache/htrace/impl/TestHTracedRESTReceiver.java
deleted file mode 100644
index d52f071..0000000
--- a/htrace-htraced/src/test/java/org/apache/htrace/impl/TestHTracedRESTReceiver.java
+++ /dev/null
@@ -1,215 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.htrace.impl;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-
-import java.io.File;
-import java.net.URL;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.htrace.core.HTraceConfiguration;
-import org.apache.htrace.core.MilliSpan;
-import org.apache.htrace.core.Span;
-import org.apache.htrace.core.SpanId;
-import org.apache.htrace.core.TracerId;
-import org.apache.htrace.util.DataDir;
-import org.apache.htrace.util.HTracedProcess;
-import org.apache.htrace.util.TestUtil;
-import org.eclipse.jetty.client.HttpClient;
-import org.eclipse.jetty.client.api.ContentResponse;
-import org.eclipse.jetty.http.HttpStatus;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-public class TestHTracedRESTReceiver {
-  private static final Log LOG =
-      LogFactory.getLog(TestHTracedRESTReceiver.class);
-  private URL restServerUrl;
-  private DataDir dataDir;
-  HTracedProcess htraced;
-
-  @Before
-  public void setUp() throws Exception {
-    this.dataDir = new DataDir();
-    File tlDir = DataDir.getTopLevelOfCheckout(this.dataDir.getDataDir());
-    File pathToHTracedBinary = HTracedProcess.getPathToHTraceBinaryFromTopLevel(tlDir);
-    this.htraced = new HTracedProcess(pathToHTracedBinary,
-        dataDir.getDataDir(), "localhost");
-    this.restServerUrl = new URL("http://" + htraced.getHttpAddr() + "/");
-  }
-
-  @After
-  public void tearDown() throws Exception {
-    if (this.htraced != null) this.htraced.destroy();
-  }
-
-  /**
-   * Our simple version of htrace configuration for testing.
-   */
-  private final class TestHTraceConfiguration extends HTraceConfiguration {
-    private final URL restServerUrl;
-    final static String TRACER_ID = "TestHTracedRESTReceiver";
-
-    public TestHTraceConfiguration(final URL restServerUrl) {
-      this.restServerUrl = restServerUrl;
-    }
-
-    @Override
-    public String get(String key) {
-      return null;
-    }
-
-    @Override
-    public String get(String key, String defaultValue) {
-      if (key.equals(HTracedRESTReceiver.HTRACED_REST_URL_KEY)) {
-        return this.restServerUrl.toString();
-      } else if (key.equals(TracerId.TRACER_ID_KEY)) {
-        return TRACER_ID;
-      }
-      return defaultValue;
-    }
-  }
-
-  /**
-   * Make sure the REST server basically works.
-   * @throws Exception
-   */
-  @Test (timeout = 10000)
-  public void testBasicGet() throws Exception {
-    HTracedRESTReceiver receiver =
-      new HTracedRESTReceiver(new TestHTraceConfiguration(this.restServerUrl));
-    HttpClient http = receiver.createHttpClient(60000L, 60000L);
-    http.start();
-    try {
-      // Do basic a GET /server/info against htraced
-      ContentResponse response =
-        http.GET(restServerUrl + "server/info");
-      assertEquals("application/json", response.getMediaType());
-      String content = processGET(response);
-      assertTrue(content.contains("ReleaseVersion"));
-      System.out.println(content);
-    } finally {
-      http.stop();
-      receiver.close();
-    }
-  }
-
-  private String processGET(final ContentResponse response) {
-    assertTrue("" + response.getStatus(), HttpStatus.OK_200 <= response.getStatus() &&
-      response.getStatus() <= HttpStatus.NO_CONTENT_204);
-    return response.getContentAsString();
-  }
-
-  private void testSendingSpansImpl(boolean testClose) throws Exception {
-    final HTracedRESTReceiver receiver =
-      new HTracedRESTReceiver(new TestHTraceConfiguration(this.restServerUrl));
-    final int NUM_SPANS = 3;
-    final HttpClient http = receiver.createHttpClient(60000, 60000);
-    http.start();
-    Span spans[] = new Span[NUM_SPANS];
-    for (int i = 0; i < NUM_SPANS; i++) {
-      MilliSpan.Builder builder = new MilliSpan.Builder().
-          parents(new SpanId[] { new SpanId(1L, 1L) }).
-          spanId(new SpanId(1L, i));
-      if (i == NUM_SPANS - 1) {
-        builder.tracerId("specialTrid");
-      } else {
-        builder.tracerId(TestHTraceConfiguration.TRACER_ID);
-      }
-      spans[i] = builder.build();
-    }
-    try {
-      for (int i = 0; i < NUM_SPANS; i++) {
-        LOG.info("receiving " + spans[i].toString());
-        receiver.receiveSpan(spans[i]);
-      }
-      if (testClose) {
-        receiver.close();
-      } else {
-        receiver.startFlushing();
-      }
-      TestUtil.waitFor(new TestUtil.Supplier<Boolean>() {
-        @Override
-        public Boolean get() {
-          try {
-            for (int i = 0; i < NUM_SPANS; i++) {
-              // This is what the REST server expects when querying for a
-              // span id.
-              String findSpan = String.format("span/%s",
-                  new SpanId(1L, i).toString());
-              ContentResponse response =
-                  http.GET(restServerUrl + findSpan);
-              String content = processGET(response);
-              if ((content == null) || (content.length() == 0)) {
-                LOG.info("Failed to find span " + i);
-                return false;
-              }
-              LOG.info("Got " + content + " for span " + i);
-              MilliSpan dspan = MilliSpan.fromJson(content);
-              assertEquals(new SpanId(1, i).toString(),
-                dspan.getSpanId().toString());
-              // Every span should have the tracer ID we set in the
-              // configuration... except for the last span, which had
-              // a custom value set.
-              if (i == NUM_SPANS - 1) {
-                assertEquals("specialTrid", dspan.getTracerId());
-              } else {
-                assertEquals(TestHTraceConfiguration.TRACER_ID,
-                    dspan.getTracerId());
-              }
-            }
-            return true;
-          } catch (Throwable t) {
-            LOG.error("Got exception", t);
-            return false;
-          }
-        }
-      }, 10, 20000);
-    } finally {
-      http.stop();
-      if (!testClose) {
-        receiver.close();
-      }
-    }
-  }
-
-  /**
-   * Send 100 spans then confirm they made it in.
-   * @throws Exception
-   */
-  @Test (timeout = 60000)
-  public void testSendingSpans() throws Exception {
-    testSendingSpansImpl(false);
-  }
-
-  /**
-   * Test that the REST receiver blocks during shutdown until all spans are sent
-   * (or a long timeout elapses).  Otherwise, short-lived client processes will
-   * never have a chance to send all their spans and we will have incomplete
-   * information.
-   */
-  @Test (timeout = 60000)
-  public void testShutdownBlocksUntilSpanAreSent() throws Exception {
-    testSendingSpansImpl(true);
-  }
-}
diff --git a/htrace-htraced/src/test/java/org/apache/htrace/impl/TestHTracedReceiver.java b/htrace-htraced/src/test/java/org/apache/htrace/impl/TestHTracedReceiver.java
new file mode 100644
index 0000000..99f00a1
--- /dev/null
+++ b/htrace-htraced/src/test/java/org/apache/htrace/impl/TestHTracedReceiver.java
@@ -0,0 +1,572 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.htrace.impl;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Random;
+import java.util.concurrent.Semaphore;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.htrace.core.HTraceConfiguration;
+import org.apache.htrace.core.Span;
+import org.apache.htrace.core.SpanId;
+import org.apache.htrace.core.TracerId;
+import org.apache.htrace.impl.HTracedSpanReceiver.FaultInjector;
+import org.apache.htrace.util.TestUtil;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestRule;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
+
+public class TestHTracedReceiver {
+  private static final Log LOG = LogFactory.getLog(TestHTracedReceiver.class);
+
+  @BeforeClass
+  public static void beforeClass() {
+    // Allow setting really small buffer sizes for testing purposes.
+    // We do not allow setting such small sizes in production.
+    Conf.BUFFER_SIZE_MIN = 0;
+  }
+
+  @Rule
+  public TestRule watcher = new TestWatcher() {
+    protected void starting(Description description) {
+      LOG.info("*** Starting junit test: " + description.getMethodName());
+    }
+
+    protected void finished(Description description) {
+      LOG.info("*** Finished junit test: " + description.getMethodName());
+    }
+  };
+
+  @Test(timeout = 60000)
+  public void testGetServerInfoJson() throws Exception {
+    HTracedProcess ht = new HTracedProcess.Builder().build();
+    try {
+      String response = ht.getServerInfoJson();
+      assertTrue(response.contains("ReleaseVersion"));
+    } finally {
+      ht.destroy();
+    }
+  }
+
+  private void waitForSpans(final HTracedProcess ht, Span[] spans)
+      throws Exception {
+    waitForSpans(ht, spans, spans.length);
+  }
+
+  private void waitForSpans(final HTracedProcess ht, Span[] spans,
+      int numSpans) throws Exception {
+    final LinkedList<SpanId> spanIds = new LinkedList<SpanId>();
+    for (int i = 0; i < numSpans; i++) {
+      spanIds.add(spans[i].getSpanId());
+    }
+    boolean success = false;
+    try {
+      TestUtil.waitFor(new TestUtil.Supplier<Boolean>() {
+        @Override
+        public Boolean get() {
+          for (Iterator<SpanId> iter = spanIds.iterator();
+               iter.hasNext(); ) {
+            SpanId spanId = iter.next();
+            try {
+              if (ht.getSpan(spanId) == null) {
+                return false;
+              }
+            } catch (InterruptedException e) {
+              LOG.error("Got InterruptedException while looking for " +
+                  "span ID " + spanId, e);
+              Thread.currentThread().interrupt();
+            } catch (Exception e) {
+              LOG.error("Got error looking for span ID " + spanId, e);
+              return false;
+            }
+            iter.remove();
+          }
+          return true;
+        }
+      }, 10, 30000);
+      success = true;
+    } finally {
+      if (!success) {
+        String prefix = "";
+        StringBuilder idStringBld = new StringBuilder();
+        for (Iterator<SpanId> iter = spanIds.iterator();
+             iter.hasNext(); ) {
+          idStringBld.append(prefix);
+          idStringBld.append(iter.next());
+          prefix = ",";
+        }
+        LOG.error("Unable to find span IDs " + idStringBld.toString());
+      }
+    }
+  }
+
+  /**
+   * Test that we can send spans via the HRPC interface.
+   */
+  @Test(timeout = 10000) //60000)
+  public void testSendSpansViaPacked() throws Exception {
+    final Random rand = new Random(123);
+    final HTracedProcess ht = new HTracedProcess.Builder().build();
+    try {
+      HTraceConfiguration conf = HTraceConfiguration.fromMap(
+          new HashMap<String, String>() {{
+            put(TracerId.TRACER_ID_KEY, "testSendSpansViaPacked");
+            put(Conf.ADDRESS_KEY, ht.getHrpcAddr());
+            put(Conf.PACKED_KEY, "true");
+            put(Conf.MAX_FLUSH_INTERVAL_MS_KEY, "100");
+            put(Conf.ERROR_LOG_PERIOD_MS_KEY, "0");
+          }});
+      HTracedSpanReceiver rcvr = new HTracedSpanReceiver(conf);
+      Span[] spans = TestUtil.randomSpans(rand, 10);
+      for (Span span : spans) {
+        rcvr.receiveSpan(span);
+      }
+      waitForSpans(ht, spans);
+      rcvr.close();
+    } finally {
+      ht.destroy();
+    }
+  }
+
+  /**
+   * Test that when the SpanReceiver is closed, we send any spans we have
+   * buffered via the HRPC interface.
+   */
+  @Test(timeout = 60000)
+  public void testSendSpansViaPackedAndClose() throws Exception {
+    final Random rand = new Random(456);
+    final HTracedProcess ht = new HTracedProcess.Builder().build();
+    try {
+      HTraceConfiguration conf = HTraceConfiguration.fromMap(
+          new HashMap<String, String>() {{
+            put(TracerId.TRACER_ID_KEY, "testSendSpansViaPackedAndClose");
+            put(Conf.ADDRESS_KEY, ht.getHrpcAddr());
+            put(Conf.PACKED_KEY, "true");
+            put(Conf.MAX_FLUSH_INTERVAL_MS_KEY, "60000");
+          }});
+      HTracedSpanReceiver rcvr = new HTracedSpanReceiver(conf);
+      Span[] spans = TestUtil.randomSpans(rand, 10);
+      for (Span span : spans) {
+        rcvr.receiveSpan(span);
+      }
+      rcvr.close();
+      waitForSpans(ht, spans);
+    } finally {
+      ht.destroy();
+    }
+  }
+
+  /**
+   * Test that we can send spans via the REST interface.
+   */
+  @Test(timeout = 60000)
+  public void testSendSpansViaRest() throws Exception {
+    final Random rand = new Random(789);
+    final HTracedProcess ht = new HTracedProcess.Builder().build();
+    try {
+      HTraceConfiguration conf = HTraceConfiguration.fromMap(
+          new HashMap<String, String>() {{
+            put(TracerId.TRACER_ID_KEY, "testSendSpansViaRest");
+            put(Conf.ADDRESS_KEY, ht.getHttpAddr());
+            put(Conf.PACKED_KEY, "false");
+            put(Conf.MAX_FLUSH_INTERVAL_MS_KEY, "100");
+          }});
+      HTracedSpanReceiver rcvr = new HTracedSpanReceiver(conf);
+      Span[] spans = TestUtil.randomSpans(rand, 10);
+      for (Span span : spans) {
+        rcvr.receiveSpan(span);
+      }
+      waitForSpans(ht, spans);
+      rcvr.close();
+    } finally {
+      ht.destroy();
+    }
+  }
+
+  /**
+   * Test that when the SpanReceiver is closed, we send any spans we have
+   * buffered via the REST interface.
+   */
+  @Test(timeout = 60000)
+  public void testSendSpansViaRestAndClose() throws Exception {
+    final Random rand = new Random(321);
+    final HTracedProcess ht = new HTracedProcess.Builder().build();
+    try {
+      HTraceConfiguration conf = HTraceConfiguration.fromMap(
+          new HashMap<String, String>() {{
+            put(TracerId.TRACER_ID_KEY, "testSendSpansViaRestAndClose");
+            put(Conf.ADDRESS_KEY, ht.getHttpAddr());
+            put(Conf.PACKED_KEY, "false");
+            put(Conf.MAX_FLUSH_INTERVAL_MS_KEY, "60000");
+          }});
+      HTracedSpanReceiver rcvr = new HTracedSpanReceiver(conf);
+      Span[] spans = TestUtil.randomSpans(rand, 10);
+      for (Span span : spans) {
+        rcvr.receiveSpan(span);
+      }
+      rcvr.close();
+      waitForSpans(ht, spans);
+    } finally {
+      ht.destroy();
+    }
+  }
+
+  private static class Mutable<T> {
+    private T t;
+
+    Mutable(T t) {
+      this.t = t;
+    }
+
+    void set(T t) {
+      this.t = t;
+    }
+
+    T get() {
+      return this.t;
+    }
+  }
+
+  private static class TestHandleContentLengthTriggerInjector
+      extends HTracedSpanReceiver.FaultInjector {
+    final Semaphore threadStartSem = new Semaphore(0);
+    int contentLengthOnTrigger = 0;
+
+    @Override
+    public synchronized void handleContentLengthTrigger(int len) {
+      contentLengthOnTrigger = len;
+    }
+    @Override
+    public void handleThreadStart() throws Exception {
+      threadStartSem.acquire();
+    }
+
+    public synchronized int getContentLengthOnTrigger() {
+      return contentLengthOnTrigger;
+    }
+  }
+
+  /**
+   * Test that filling up one of the buffers causes us to trigger a flush and
+   * start using the other buffer, when using PackedBufferManager.
+   * This also tests that PackedBufferManager can correctly handle a buffer
+   * getting full.
+   */
+  @Test(timeout = 60000)
+  public void testFullBufferCausesPackedThreadTrigger() throws Exception {
+    final Random rand = new Random(321);
+    final HTracedProcess ht = new HTracedProcess.Builder().build();
+    try {
+      HTraceConfiguration conf = HTraceConfiguration.fromMap(
+          new HashMap<String, String>() {{
+            put(TracerId.TRACER_ID_KEY,
+                "testFullBufferCausesPackedThreadTrigger");
+            put(Conf.ADDRESS_KEY, ht.getHrpcAddr());
+            put(Conf.PACKED_KEY, "true");
+            put(Conf.BUFFER_SIZE_KEY, "16384");
+            put(Conf.BUFFER_SEND_TRIGGER_FRACTION_KEY, "0.95");
+          }});
+      TestHandleContentLengthTriggerInjector injector =
+          new TestHandleContentLengthTriggerInjector();
+      HTracedSpanReceiver rcvr = new HTracedSpanReceiver(conf, injector);
+      Span[] spans = TestUtil.randomSpans(rand, 47);
+      for (Span span : spans) {
+        rcvr.receiveSpan(span);
+      }
+      Assert.assertTrue("The wakePostSpansThread should have been " +
+          "triggered by the spans added so far.  " +
+          "contentLengthOnTrigger = " + injector.getContentLengthOnTrigger(),
+          injector.getContentLengthOnTrigger() > 16000);
+      injector.threadStartSem.release();
+      rcvr.close();
+      waitForSpans(ht, spans, 45);
+    } finally {
+      ht.destroy();
+    }
+  }
+
+  /**
+   * Test that filling up one of the buffers causes us to trigger a flush and
+   * start using the other buffer, when using RestBufferManager.
+   * This also tests that RestBufferManager can correctly handle a buffer
+   * getting full.
+   */
+  @Test(timeout = 60000)
+  public void testFullBufferCausesRestThreadTrigger() throws Exception {
+    final Random rand = new Random(321);
+    final HTracedProcess ht = new HTracedProcess.Builder().build();
+    try {
+      HTraceConfiguration conf = HTraceConfiguration.fromMap(
+          new HashMap<String, String>() {{
+            put(TracerId.TRACER_ID_KEY,
+                "testFullBufferCausesRestThreadTrigger");
+            put(Conf.ADDRESS_KEY, ht.getHttpAddr());
+            put(Conf.PACKED_KEY, "false");
+            put(Conf.BUFFER_SIZE_KEY, "16384");
+            put(Conf.BUFFER_SEND_TRIGGER_FRACTION_KEY, "0.95");
+          }});
+      TestHandleContentLengthTriggerInjector injector =
+          new TestHandleContentLengthTriggerInjector();
+      HTracedSpanReceiver rcvr = new HTracedSpanReceiver(conf, injector);
+      Span[] spans = TestUtil.randomSpans(rand, 34);
+      for (Span span : spans) {
+        rcvr.receiveSpan(span);
+      }
+      Assert.assertTrue("The wakePostSpansThread should have been " +
+              "triggered by the spans added so far.  " +
+              "contentLengthOnTrigger = " + injector.getContentLengthOnTrigger(),
+          injector.getContentLengthOnTrigger() > 16000);
+      injector.threadStartSem.release();
+      rcvr.close();
+      waitForSpans(ht, spans, 33);
+    } finally {
+      ht.destroy();
+    }
+  }
+
+  /**
+   * A FaultInjector that causes all flushes to fail until a specified
+   * number of milliseconds have passed.
+   */
+  private static class TestInjectFlushFaults
+      extends HTracedSpanReceiver.FaultInjector {
+    private long remainingFaults;
+
+    TestInjectFlushFaults(long remainingFaults) {
+      this.remainingFaults = remainingFaults;
+    }
+
+    @Override
+    public synchronized void handleFlush() throws IOException {
+      if (remainingFaults > 0) {
+        remainingFaults--;
+        throw new IOException("Injected IOException into flush " +
+            "code path.");
+      }
+    }
+  }
+
+  /**
+   * Test that even if the flush fails, the system stays stable and we can
+   * still close the span receiver.
+   */
+  @Test(timeout = 60000)
+  public void testPackedThreadHandlesFlushFailure() throws Exception {
+    final Random rand = new Random(321);
+    final HTracedProcess ht = new HTracedProcess.Builder().build();
+    try {
+      HTraceConfiguration conf = HTraceConfiguration.fromMap(
+          new HashMap<String, String>() {{
+            put(TracerId.TRACER_ID_KEY, "testPackedThreadHandlesFlushFailure");
+            put(Conf.ADDRESS_KEY, ht.getHrpcAddr());
+            put(Conf.PACKED_KEY, "true");
+          }});
+      TestInjectFlushFaults injector = new TestInjectFlushFaults(Long.MAX_VALUE);
+      HTracedSpanReceiver rcvr = new HTracedSpanReceiver(conf, injector);
+      Span[] spans = TestUtil.randomSpans(rand, 15);
+      for (Span span : spans) {
+        rcvr.receiveSpan(span);
+      }
+      rcvr.close();
+    } finally {
+      ht.destroy();
+    }
+  }
+
+  /**
+   * Test that even if the flush fails, the system stays stable and we can
+   * still close the span receiver.
+   */
+  @Test(timeout = 60000)
+  public void testRestThreadHandlesFlushFailure() throws Exception {
+    final Random rand = new Random(321);
+    final HTracedProcess ht = new HTracedProcess.Builder().build();
+    try {
+      HTraceConfiguration conf = HTraceConfiguration.fromMap(
+          new HashMap<String, String>() {{
+            put(TracerId.TRACER_ID_KEY, "testRestThreadHandlesFlushFailure");
+            put(Conf.ADDRESS_KEY, ht.getHttpAddr());
+            put(Conf.PACKED_KEY, "false");
+          }});
+      TestInjectFlushFaults injector = new TestInjectFlushFaults(Long.MAX_VALUE);
+      HTracedSpanReceiver rcvr = new HTracedSpanReceiver(conf, injector);
+      Span[] spans = TestUtil.randomSpans(rand, 15);
+      for (Span span : spans) {
+        rcvr.receiveSpan(span);
+      }
+      rcvr.close();
+    } finally {
+      ht.destroy();
+    }
+  }
+
+  /**
+   * A FaultInjector that causes all flushes to fail until a specified
+   * number of milliseconds have passed.
+   */
+  private static class WaitForFlushes
+      extends HTracedSpanReceiver.FaultInjector {
+    final Semaphore flushSem;
+
+    WaitForFlushes(int numFlushes) {
+      this.flushSem = new Semaphore(-numFlushes);
+    }
+
+    @Override
+    public void handleFlush() throws IOException {
+      flushSem.release();
+    }
+  }
+
+  /**
+   * Test that the packed code works when performing multiple flushes.
+   */
+  @Test(timeout = 60000)
+  public void testMultiplePackedFlushes() throws Exception {
+    final Random rand = new Random(123);
+    final HTracedProcess ht = new HTracedProcess.Builder().build();
+    try {
+      HTraceConfiguration conf = HTraceConfiguration.fromMap(
+          new HashMap<String, String>() {{
+            put(TracerId.TRACER_ID_KEY, "testMultiplePackedFlushes");
+            put(Conf.ADDRESS_KEY, ht.getHrpcAddr());
+            put(Conf.PACKED_KEY, "true");
+            put(Conf.MAX_FLUSH_INTERVAL_MS_KEY, "1");
+          }});
+      WaitForFlushes injector = new WaitForFlushes(5);
+      HTracedSpanReceiver rcvr = new HTracedSpanReceiver(conf, injector);
+      Span[] spans = TestUtil.randomSpans(rand, 3);
+      while (true) {
+        for (Span span : spans) {
+          rcvr.receiveSpan(span);
+        }
+        if (injector.flushSem.availablePermits() >= 0) {
+          break;
+        }
+        Thread.sleep(1);
+      }
+      waitForSpans(ht, spans, 3);
+      rcvr.close();
+    } finally {
+      ht.destroy();
+    }
+  }
+
+  /**
+   * Test that the REST code works when performing multiple flushes.
+   */
+  @Test(timeout = 60000)
+  public void testMultipleRestFlushes() throws Exception {
+    final Random rand = new Random(123);
+    final HTracedProcess ht = new HTracedProcess.Builder().build();
+    try {
+      HTraceConfiguration conf = HTraceConfiguration.fromMap(
+          new HashMap<String, String>() {{
+            put(TracerId.TRACER_ID_KEY, "testMultipleRestFlushes");
+            put(Conf.ADDRESS_KEY, ht.getHttpAddr());
+            put(Conf.PACKED_KEY, "false");
+            put(Conf.MAX_FLUSH_INTERVAL_MS_KEY, "1");
+          }});
+      WaitForFlushes injector = new WaitForFlushes(5);
+      HTracedSpanReceiver rcvr = new HTracedSpanReceiver(conf, injector);
+      Span[] spans = TestUtil.randomSpans(rand, 3);
+      while (true) {
+        for (Span span : spans) {
+          rcvr.receiveSpan(span);
+        }
+        if (injector.flushSem.availablePermits() >= 0) {
+          break;
+        }
+        Thread.sleep(1);
+      }
+      waitForSpans(ht, spans, 3);
+      rcvr.close();
+    } finally {
+      ht.destroy();
+    }
+  }
+
+  /**
+   * Test that the packed code works when performing multiple flushes.
+   */
+  @Test(timeout = 60000)
+  public void testPackedRetryAfterFlushError() throws Exception {
+    final Random rand = new Random(123);
+    final HTracedProcess ht = new HTracedProcess.Builder().build();
+    try {
+      HTraceConfiguration conf = HTraceConfiguration.fromMap(
+          new HashMap<String, String>() {{
+            put(TracerId.TRACER_ID_KEY, "testPackedRetryAfterFlushError");
+            put(Conf.ADDRESS_KEY, ht.getHrpcAddr());
+            put(Conf.PACKED_KEY, "true");
+            put(Conf.MAX_FLUSH_INTERVAL_MS_KEY, "1000");
+            put(Conf.FLUSH_RETRY_DELAYS_KEY, "100,100,100,100,100,100,100");
+          }});
+      TestInjectFlushFaults injector = new TestInjectFlushFaults(5);
+      HTracedSpanReceiver rcvr = new HTracedSpanReceiver(conf, injector);
+      Span[] spans = TestUtil.randomSpans(rand, 3);
+      for (Span span : spans) {
+        rcvr.receiveSpan(span);
+      }
+      waitForSpans(ht, spans);
+      rcvr.close();
+    } finally {
+      ht.destroy();
+    }
+  }
+
+  /**
+   * Test that the REST code works when performing multiple flushes.
+   */
+  @Test(timeout = 60000)
+  public void testRestRetryAfterFlushError() throws Exception {
+    final Random rand = new Random(123);
+    final HTracedProcess ht = new HTracedProcess.Builder().build();
+    try {
+      HTraceConfiguration conf = HTraceConfiguration.fromMap(
+          new HashMap<String, String>() {{
+            put(TracerId.TRACER_ID_KEY, "testRestRetryAfterFlushError");
+            put(Conf.ADDRESS_KEY, ht.getHttpAddr());
+            put(Conf.PACKED_KEY, "false");
+            put(Conf.MAX_FLUSH_INTERVAL_MS_KEY, "1000");
+            put(Conf.FLUSH_RETRY_DELAYS_KEY, "100,100,100,100,100,100,100");
+          }});
+      TestInjectFlushFaults injector = new TestInjectFlushFaults(5);
+      HTracedSpanReceiver rcvr = new HTracedSpanReceiver(conf, injector);
+      Span[] spans = TestUtil.randomSpans(rand, 3);
+      for (Span span : spans) {
+        rcvr.receiveSpan(span);
+      }
+      waitForSpans(ht, spans);
+      rcvr.close();
+    } finally {
+      ht.destroy();
+    }
+  }
+}
diff --git a/htrace-htraced/src/test/java/org/apache/htrace/impl/TestHTracedReceiverConf.java b/htrace-htraced/src/test/java/org/apache/htrace/impl/TestHTracedReceiverConf.java
new file mode 100644
index 0000000..bf038f1
--- /dev/null
+++ b/htrace-htraced/src/test/java/org/apache/htrace/impl/TestHTracedReceiverConf.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.htrace.impl;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.htrace.core.HTraceConfiguration;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestHTracedReceiverConf {
+  private static final Log LOG =
+      LogFactory.getLog(TestHTracedReceiverConf.class);
+
+  @Test(timeout = 60000)
+  public void testParseHostPort() throws Exception {
+    InetSocketAddress addr = new Conf(
+        HTraceConfiguration.fromKeyValuePairs(
+          Conf.ADDRESS_KEY, "example.com:8080")).endpoint;
+    Assert.assertEquals("example.com", addr.getHostName());
+    Assert.assertEquals(8080, addr.getPort());
+
+    addr = new Conf(
+        HTraceConfiguration.fromKeyValuePairs(
+          Conf.ADDRESS_KEY, "127.0.0.1:8081")).endpoint;
+    Assert.assertEquals("127.0.0.1", addr.getHostName());
+    Assert.assertEquals(8081, addr.getPort());
+
+    addr = new Conf(
+        HTraceConfiguration.fromKeyValuePairs(
+          Conf.ADDRESS_KEY, "[ff02:0:0:0:0:0:0:12]:9095")).endpoint;
+    Assert.assertEquals("ff02:0:0:0:0:0:0:12", addr.getHostName());
+    Assert.assertEquals(9095, addr.getPort());
+  }
+
+  private static void verifyFail(String hostPort) {
+    try {
+      new Conf(HTraceConfiguration.fromKeyValuePairs(
+            Conf.ADDRESS_KEY, hostPort));
+      Assert.fail("Expected bad host:port configuration " + hostPort +
+          " to fail, but it succeeded.");
+    } catch (IOException e) {
+      // expected
+    }
+  }
+
+  @Test(timeout = 60000)
+  public void testFailToParseHostPort() throws Exception {
+    verifyFail("localhost"); // no port
+    verifyFail("127.0.0.1"); // no port
+    verifyFail(":8080"); // no hostname
+    verifyFail("bob[ff02:0:0:0:0:0:0:12]:9095"); // bracket at incorrect place
+  }
+
+  @Test(timeout = 60000)
+  public void testGetIntArray() throws Exception {
+    int[] arr = Conf.getIntArray("");
+    Assert.assertEquals(0, arr.length);
+    arr = Conf.getIntArray("123");
+    Assert.assertEquals(1, arr.length);
+    Assert.assertEquals(123, arr[0]);
+    arr = Conf.getIntArray("1,2,3");
+    Assert.assertEquals(3, arr.length);
+    Assert.assertEquals(1, arr[0]);
+    Assert.assertEquals(2, arr[1]);
+    Assert.assertEquals(3, arr[2]);
+    arr = Conf.getIntArray(",-4,5,66,");
+    Assert.assertEquals(3, arr.length);
+    Assert.assertEquals(-4, arr[0]);
+    Assert.assertEquals(5, arr[1]);
+    Assert.assertEquals(66, arr[2]);
+  }
+}
diff --git a/htrace-htraced/src/test/java/org/apache/htrace/impl/TestPackedBuffer.java b/htrace-htraced/src/test/java/org/apache/htrace/impl/TestPackedBuffer.java
new file mode 100644
index 0000000..ed7d904
--- /dev/null
+++ b/htrace-htraced/src/test/java/org/apache/htrace/impl/TestPackedBuffer.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.htrace.impl;
+
+import java.nio.ByteBuffer;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.htrace.core.Span;
+import org.apache.htrace.util.TestUtil;
+import org.junit.Assert;
+import org.junit.Test;
+import org.msgpack.core.MessagePack;
+import org.msgpack.core.MessageUnpacker;
+
+public class TestPackedBuffer {
+  private static final Log LOG = LogFactory.getLog(TestPackedBuffer.class);
+
+  @Test(timeout = 60000)
+  public void testWriteReqFrame() throws Exception {
+    byte[] arr = new byte[PackedBuffer.HRPC_REQ_FRAME_LENGTH];
+    ByteBuffer bb = ByteBuffer.wrap(arr);
+    PackedBuffer buf = new PackedBuffer(bb);
+    PackedBuffer.writeReqFrame(bb, 1, 123, 456);
+    Assert.assertEquals(PackedBuffer.HRPC_REQ_FRAME_LENGTH, bb.position());
+    Assert.assertEquals("48 54 52 43 " +
+        "01 00 00 00 " +
+        "7b 00 00 00 00 00 00 00 " +
+        "c8 01 00 00",
+        buf.toHexString());
+  }
+
+  @Test(timeout = 60000)
+  public void testPackSpans() throws Exception {
+    Random rand = new Random(123);
+    byte[] arr = new byte[16384];
+    ByteBuffer bb = ByteBuffer.wrap(arr);
+    bb.limit(bb.capacity());
+    PackedBuffer buf = new PackedBuffer(bb);
+    final int NUM_TEST_SPANS = 5;
+    Span[] spans = new Span[NUM_TEST_SPANS];
+    for (int i = 0; i < NUM_TEST_SPANS; i++) {
+      spans[i] = TestUtil.randomSpan(rand);
+    }
+    for (int i = 0; i < NUM_TEST_SPANS; i++) {
+      buf.writeSpan(spans[i]);
+    }
+    LOG.info("wrote " + buf.toHexString());
+    MessagePack msgpack = new MessagePack(PackedBuffer.MSGPACK_CONF);
+    MessageUnpacker unpacker = msgpack.newUnpacker(arr, 0, bb.position());
+    Span[] respans = new Span[NUM_TEST_SPANS];
+    for (int i = 0; i < NUM_TEST_SPANS; i++) {
+      respans[i] = PackedBuffer.readSpan(unpacker);
+    }
+    for (int i = 0; i < NUM_TEST_SPANS; i++) {
+      Assert.assertEquals("Failed to read back span " + i,
+          spans[i].toJson(), respans[i].toJson());
+    }
+  }
+}
diff --git a/htrace-htraced/src/test/java/org/apache/htrace/impl/TestTimeUtil.java b/htrace-htraced/src/test/java/org/apache/htrace/impl/TestTimeUtil.java
new file mode 100644
index 0000000..630a02a
--- /dev/null
+++ b/htrace-htraced/src/test/java/org/apache/htrace/impl/TestTimeUtil.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.htrace.impl;
+
+import static org.junit.Assert.assertTrue;
+
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestTimeUtil {
+  /**
+   * Test that our deltaMs function can compute the time difference between any
+   * two monotonic times in milliseconds.
+   */
+  @Test(timeout = 60000)
+  public void testDeltaMs() throws Exception {
+    Assert.assertEquals(0, TimeUtil.deltaMs(0, 0));
+    Assert.assertEquals(1, TimeUtil.deltaMs(0, 1));
+    Assert.assertEquals(0, TimeUtil.deltaMs(1, 0));
+    Assert.assertEquals(10, TimeUtil.deltaMs(1000, 1010));
+    long minMs = TimeUnit.MILLISECONDS.convert(Long.MIN_VALUE,
+        TimeUnit.NANOSECONDS);
+    long maxMs = TimeUnit.MILLISECONDS.convert(Long.MAX_VALUE,
+        TimeUnit.NANOSECONDS);
+    Assert.assertEquals(10, TimeUtil.deltaMs(minMs, minMs + 10));
+    Assert.assertEquals(maxMs, TimeUtil.deltaMs(minMs, maxMs));
+    Assert.assertEquals(11, TimeUtil.deltaMs(maxMs - 10, minMs));
+  }
+}
diff --git a/htrace-htraced/src/test/java/org/apache/htrace/util/DataDir.java b/htrace-htraced/src/test/java/org/apache/htrace/util/DataDir.java
deleted file mode 100644
index 74731fa..0000000
--- a/htrace-htraced/src/test/java/org/apache/htrace/util/DataDir.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.htrace.util;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.UUID;
-
-/**
- * Small util for making a data directory for tests to use when running tests. We put it up at
- * target/test-data/UUID.  Create an instance of this class per unit test run and it will take
- * care of setting up the dirs for you.  Pass what is returned here as location from which to
- * have daemons and tests dump data.
- * TODO: Add close on exit.
- */
-public class DataDir {
-  private File baseTestDir = null;
-  private File testDir = null;
-
-  /**
-   * System property key to get base test directory value
-   */
-  public static final String TEST_BASE_DIRECTORY_KEY = "test.data.base.dir";
-
-  /**
-   * Default base directory for test output.
-   */
-  public static final String TEST_BASE_DIRECTORY_DEFAULT = "target";
-
-  public static final String TEST_BASE_DIRECTORY_NAME = "test-data";
-
-  /**
-   * @return Where to write test data on local filesystem; usually
-   * {@link #TEST_BASE_DIRECTORY_DEFAULT}
-   * Should not be used directly by the unit tests, hence its's private.
-   * Unit test will use a subdirectory of this directory.
-   * @see #setupDataTestDir()
-   */
-  private synchronized File getBaseTestDir() {
-    if (this.baseTestDir != null) return this.baseTestDir;
-    String testHome = System.getProperty(TEST_BASE_DIRECTORY_KEY, TEST_BASE_DIRECTORY_DEFAULT);
-    this.baseTestDir = new File(testHome, TEST_BASE_DIRECTORY_NAME);
-    return this.baseTestDir;
-  }
-
-  /**
-   * @return Absolute path to the dir created by this instance.
-   * @throws IOException 
-   */
-  public synchronized File getDataDir() throws IOException {
-    if (this.testDir != null) return this.testDir;
-    this.testDir = new File(getBaseTestDir(), UUID.randomUUID().toString());
-    if (!this.testDir.exists()) {
-      if (!this.testDir.mkdirs()) throw new IOException("Failed mkdirs for " + this.testDir);
-    }
-    // Return absolute path. A relative passed to htraced will have it create data dirs relative
-    // to its data dir rather than in it.
-    return this.testDir.getAbsoluteFile();
-  }
-
-  /**
-   * Fragile. Ugly. Presumes paths. Best we can do for now until htraced comes local to this module
-   * and is moved out of src dir.
-   * @param dataDir A datadir gotten from {@link #getDataDir()}
-   * @return Top-level of the checkout.
-   */
-  public static File getTopLevelOfCheckout(final File dataDir) {
-    // Need absolute else we run out of road when dir is relative to this module.
-    File absolute = dataDir.getAbsoluteFile();
-    // Check we are where we think we are.
-    File testDataDir = absolute.getParentFile();
-    if (!testDataDir.getName().equals(TEST_BASE_DIRECTORY_NAME)) {
-      throw new IllegalArgumentException(dataDir.toString());
-    }
-    // Do another check.
-    File targetDir = testDataDir.getParentFile();
-    if (!targetDir.getName().equals(TEST_BASE_DIRECTORY_DEFAULT)) {
-      throw new IllegalArgumentException(dataDir.toString());
-    }
-    // Back up last two dirs out of the htrace-htraced dir.
-    return targetDir.getParentFile().getParentFile();
-  }
-}
\ No newline at end of file
diff --git a/htrace-htraced/src/test/java/org/apache/htrace/util/TestHTracedProcess.java b/htrace-htraced/src/test/java/org/apache/htrace/util/TestHTracedProcess.java
deleted file mode 100644
index 67e3a21..0000000
--- a/htrace-htraced/src/test/java/org/apache/htrace/util/TestHTracedProcess.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.htrace.util;
-
-import static org.junit.Assert.assertTrue;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.net.URL;
-import java.net.URLConnection;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.junit.Before;
-import org.junit.Test;
-
-/**
- * Test putting up an htraced and making sure it basically works.
- * Makes presumption about paths; where data is relative to the htraced binary, etc., encoded
- * in methods in the below.
- */
-public class TestHTracedProcess {
-  private static final Log LOG =
-      LogFactory.getLog(TestHTracedProcess.class);
-  private DataDir testDir = null;
-  private final int TIMEOUT = 10000;
-
-  @Before
-  public void setupTest() {
-    this.testDir = new DataDir();
-  }
-
-  /*
-   * Do a basic GET of the server info from the running htraced instance.
-   */
-  private String doGet(final URL url) throws IOException {
-    URLConnection connection = url.openConnection();
-    connection.setConnectTimeout(TIMEOUT);
-    connection.setReadTimeout(TIMEOUT);
-    connection.connect();
-    StringBuffer sb = new StringBuffer();
-    BufferedReader reader = new BufferedReader(
-        new InputStreamReader(connection.getInputStream()));
-    try {
-      String line = null;
-      while ((line = reader.readLine()) != null) {
-        System.out.println(line);
-        sb.append(line);
-      }
-    } finally {
-      reader.close();
-    }
-    return sb.toString();
-  }
-
-  /**
-   * Put up an htraced instance and do a Get against /server/info.
-   * @throws IOException
-   * @throws InterruptedException
-   */
-  @Test (timeout=10000)
-  public void testStartStopHTraced() throws Exception {
-    HTracedProcess htraced = null;
-    File dataDir = this.testDir.getDataDir();
-    File topLevel = DataDir.getTopLevelOfCheckout(dataDir);
-    try {
-      htraced = new HTracedProcess(HTracedProcess.
-          getPathToHTraceBinaryFromTopLevel(topLevel),
-          dataDir, "localhost");
-      LOG.info("Started HTracedProcess with REST server URL " +
-          htraced.getHttpAddr());
-      String str = doGet(new URL(
-          "http://" + htraced.getHttpAddr() + "/server/info"));
-      // Assert we go something back.
-      assertTrue(str.contains("ReleaseVersion"));
-      // Assert that the datadir is not empty.
-    } finally {
-      if (htraced != null) {
-        htraced.destroy();
-        System.out.println("ExitValue=" + htraced.waitFor());
-      }
-    }
-  }
-}
\ No newline at end of file