HTRACE-109. fix TestHTracedRESTReceiver unit test failures (cmccabe)
diff --git a/htrace-core/src/go/src/org/apache/htrace/conf/config_keys.go b/htrace-core/src/go/src/org/apache/htrace/conf/config_keys.go
index b22e312..ba63f2d 100644
--- a/htrace-core/src/go/src/org/apache/htrace/conf/config_keys.go
+++ b/htrace-core/src/go/src/org/apache/htrace/conf/config_keys.go
@@ -62,6 +62,11 @@
 // The log level to use for the logs in htrace.
 const HTRACE_LOG_LEVEL = "log.level"
 
+// A host:port pair to send information to on startup.  This is used in unit
+// tests to determine the (random) port of the htraced process that has been
+// started.
+const HTRACE_STARTUP_NOTIFICATION_ADDRESS = "startup.notification.address"
+
 // Default values for HTrace configuration keys.
 var DEFAULTS = map[string]string{
 	HTRACE_WEB_ADDRESS: fmt.Sprintf("0.0.0.0:%d", HTRACE_WEB_ADDRESS_DEFAULT_PORT),
diff --git a/htrace-core/src/go/src/org/apache/htrace/htraced/htraced.go b/htrace-core/src/go/src/org/apache/htrace/htraced/htraced.go
index d2cbafc..191b68e 100644
--- a/htrace-core/src/go/src/org/apache/htrace/htraced/htraced.go
+++ b/htrace-core/src/go/src/org/apache/htrace/htraced/htraced.go
@@ -20,7 +20,9 @@
 package main
 
 import (
+	"encoding/json"
 	"fmt"
+	"net"
 	"org/apache/htrace/common"
 	"org/apache/htrace/conf"
 	"os"
@@ -69,12 +71,54 @@
 		lg.Errorf("Error creating datastore: %s\n", err.Error())
 		os.Exit(1)
 	}
-	_, err = CreateRestServer(cnf, store)
+	var rsv *RestServer
+	rsv, err = CreateRestServer(cnf, store)
 	if err != nil {
 		lg.Errorf("Error creating REST server: %s\n", err.Error())
 		os.Exit(1)
 	}
+	naddr := cnf.Get(conf.HTRACE_STARTUP_NOTIFICATION_ADDRESS)
+	if naddr != "" {
+		notif := StartupNotification{
+			HttpAddr:  rsv.Addr().String(),
+			ProcessId: os.Getpid(),
+		}
+		err = sendStartupNotification(naddr, &notif)
+		if err != nil {
+			fmt.Fprintf(os.Stderr, "Failed to send startup notification: "+
+				"%s\n", err.Error())
+			os.Exit(1)
+		}
+	}
 	for {
 		time.Sleep(time.Duration(10) * time.Hour)
 	}
 }
+
+// StartupNotification is the message htraced optionally sends on startup.
+// It is JSON-encoded and used by unit tests to find the HTTP address.
+type StartupNotification struct {
+	HttpAddr  string
+	ProcessId int
+}
+
+// sendStartupNotification connects to naddr over TCP and writes notif to the
+// connection as JSON.  Encoding, dial, write and close errors are returned.
+func sendStartupNotification(naddr string, notif *StartupNotification) error {
+	buf, err := json.Marshal(notif)
+	if err != nil {
+		return err
+	}
+	conn, err := net.Dial("tcp", naddr)
+	if err != nil {
+		return err
+	}
+	if _, err = conn.Write(buf); err != nil {
+		// The write error is the one worth reporting; drop any close error.
+		conn.Close()
+		return err
+	}
+	// Close explicitly (not in a defer) so that a failed close is reported
+	// to the caller instead of being silently ignored.
+	return conn.Close()
+}
diff --git a/htrace-core/src/go/src/org/apache/htrace/htraced/rest.go b/htrace-core/src/go/src/org/apache/htrace/htraced/rest.go
index 9cdab20..495aed0 100644
--- a/htrace-core/src/go/src/org/apache/htrace/htraced/rest.go
+++ b/htrace-core/src/go/src/org/apache/htrace/htraced/rest.go
@@ -112,6 +112,7 @@
 	if !ok {
 		return
 	}
+	hand.lg.Debugf("findSidHandler(sid=%s)\n", common.SpanId(sid))
 	span := hand.store.FindSpan(sid)
 	if span == nil {
 		writeError(hand.lg, w, http.StatusNoContent, fmt.Sprintf("No such span as %s\n",
@@ -139,6 +140,7 @@
 	if !ok {
 		return
 	}
+	hand.lg.Debugf("findChildrenHandler(sid=%s, lim=%d)\n", common.SpanId(sid), lim)
 	children := hand.store.FindChildren(sid, lim)
 	jbytes, err := json.Marshal(children)
 	if err != nil {
@@ -170,6 +172,7 @@
 		}
 		spans = append(spans, &span)
 	}
+	hand.lg.Debugf("writeSpansHandler: received %d span(s).\n", len(spans))
 	for spanIdx := range spans {
 		hand.lg.Debugf("writing span %s\n", spans[spanIdx].ToJson())
 		hand.store.WriteSpan(spans[spanIdx])
@@ -238,6 +241,15 @@
 	w.Write([]byte(rsc))
 }
 
+type logErrorHandler struct {
+	lg *common.Logger
+}
+
+func (hand *logErrorHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
+	hand.lg.Errorf("Got unknown request %s\n", req.RequestURI)
+	writeError(hand.lg, w, http.StatusBadRequest, "Unknown request.")
+}
+
 type RestServer struct {
 	listener net.Listener
 	lg       *common.Logger
@@ -280,6 +292,9 @@
 	// Default Handler. This will serve requests for static requests.
 	r.PathPrefix("/").Handler(&defaultServeHandler{lg: rsv.lg}).Methods("GET")
 
+	// Log an error message for unknown non-GET requests.
+	r.PathPrefix("/").Handler(&logErrorHandler{lg: rsv.lg})
+
 	go http.Serve(rsv.listener, r)
 
 	rsv.lg.Infof("Started REST server on %s...\n", rsv.listener.Addr().String())
diff --git a/htrace-core/src/main/java/org/apache/htrace/impl/MilliSpan.java b/htrace-core/src/main/java/org/apache/htrace/impl/MilliSpan.java
index be5521a..4467208 100644
--- a/htrace-core/src/main/java/org/apache/htrace/impl/MilliSpan.java
+++ b/htrace-core/src/main/java/org/apache/htrace/impl/MilliSpan.java
@@ -50,6 +50,7 @@
 
   private static Random rand = new Random();
   private static ObjectWriter JSON_WRITER = new ObjectMapper().writer();
+  private static final long EMPTY_PARENT_ARRAY[] = new long[0];
 
   private long begin;
   private long end;
@@ -74,7 +75,7 @@
     private long end;
     private String description;
     private long traceId;
-    private long parents[];
+    private long parents[] = EMPTY_PARENT_ARRAY;
     private long spanId;
     private Map<String, String> traceInfo = null;
     private String processId;
diff --git a/htrace-core/src/test/java/org/apache/htrace/util/TestUtil.java b/htrace-core/src/test/java/org/apache/htrace/util/TestUtil.java
new file mode 100644
index 0000000..7cb4aed
--- /dev/null
+++ b/htrace-core/src/test/java/org/apache/htrace/util/TestUtil.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.htrace.util;
+
+import java.io.File;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Utilities for writing unit tests.
+ */
+public class TestUtil {
+  /**
+   * Get a dump of the stack traces of all threads.
+   */
+  public static String threadDump() {
+    StringBuilder dump = new StringBuilder();
+    Map<Thread, StackTraceElement[]> stackTraces = Thread.getAllStackTraces();
+    for (Map.Entry<Thread, StackTraceElement[]> e : stackTraces.entrySet()) {
+      Thread thread = e.getKey();
+      dump.append(String.format(
+          "\"%s\" %s prio=%d tid=%d %s\njava.lang.Thread.State: %s",
+          thread.getName(),
+          (thread.isDaemon() ? "daemon" : ""),
+          thread.getPriority(),
+          thread.getId(),
+          Thread.State.WAITING.equals(thread.getState()) ?
+              "in Object.wait()" : thread.getState().name().toLowerCase(),
+          Thread.State.WAITING.equals(thread.getState()) ?
+              "WAITING (on object monitor)" : thread.getState()));
+      for (StackTraceElement stackTraceElement : e.getValue()) {
+        dump.append("\n        at ");
+        dump.append(stackTraceElement);
+      }
+      dump.append("\n");
+    }
+    return dump.toString();
+  }
+
+  /**
+   * A callback which returns a value of type T.
+   *
+   * TODO: remove this when we're on Java 8, in favor of
+   * java.util.function.Supplier.
+   */
+  public interface Supplier<T> {
+    T get();
+  }
+
+  /**
+   * Wait for a condition to become true for a configurable amount of time.
+   *
+   * @param check           The condition to wait for.
+   * @param periodMs        How often to check the condition, in milliseconds.
+   * @param timeoutMs       How long to wait in total, in milliseconds.
+   */
+  public static void waitFor(Supplier<Boolean> check, 
+      long periodMs, long timeoutMs)
+          throws TimeoutException, InterruptedException
+  {
+    long endNs = System.nanoTime() +
+        TimeUnit.NANOSECONDS.convert(timeoutMs, TimeUnit.MILLISECONDS);
+    while (true) {
+      boolean result = check.get();
+      if (result) {
+        return;
+      }
+      long nowNs = System.nanoTime();
+      if (nowNs >= endNs) {
+        throw new TimeoutException("Timed out waiting for test condition. " +
+            "Thread dump:\n" + threadDump());
+      }
+      Thread.sleep(periodMs);
+    }
+  }
+}
diff --git a/htrace-htraced/src/main/java/org/apache/htrace/impl/HTracedRESTReceiver.java b/htrace-htraced/src/main/java/org/apache/htrace/impl/HTracedRESTReceiver.java
index 35cd332..d730a17 100644
--- a/htrace-htraced/src/main/java/org/apache/htrace/impl/HTracedRESTReceiver.java
+++ b/htrace-htraced/src/main/java/org/apache/htrace/impl/HTracedRESTReceiver.java
@@ -20,14 +20,13 @@
 
 import java.io.IOException;
 import java.net.URL;
-import java.util.Queue;
-import java.util.concurrent.ArrayBlockingQueue;
+import java.util.ArrayDeque;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -74,19 +73,29 @@
   final HttpClient httpClient;
 
   /**
+   * The maximum number of spans to buffer.
+   */
+  private final int capacity;
+
+  /**
    * REST URL to use writing Spans.
    */
-  private final String writeSpansRESTURL;
+  private final String url;
+
+  /**
+   * The maximum number of spans to send in a single PUT.
+   */
+  private final int maxToSendAtATime;
 
   /**
    * Runs background task to do the REST PUT.
    */
-  private final ScheduledExecutorService scheduler;
+  private final PostSpans postSpans;
 
   /**
-   * Keep around reference so can cancel on close any running scheduled task.
+   * Thread for postSpans
    */
-  private final ScheduledFuture<?> scheduledFuture;
+  private final Thread postSpansThread;
 
   /**
    * Timeout in milliseconds.
@@ -111,8 +120,8 @@
   /**
    * Period at which the background thread that does the REST POST to htraced in ms.
    */
-  public static final String CLIENT_REST_PERIOD_MS_KEY = "client.reset.period.ms";
-  private static final int CLIENT_REST_PERIOD_MS_DEFAULT = 1000;
+  public static final String CLIENT_REST_PERIOD_MS_KEY = "client.rest.period.ms";
+  private static final int CLIENT_REST_PERIOD_MS_DEFAULT = 30000;
 
   /**
    * Maximum spans to post to htraced at a time.
@@ -122,15 +131,37 @@
   private static final int CLIENT_REST_MAX_SPANS_AT_A_TIME_DEFAULT = 100;
 
   /**
-   * Simple bounded queue to hold spans between periodic runs of the httpclient.
+   * Lock protecting the PostSpans data.
    */
-  private final Queue<Span> queue;
+  private ReentrantLock lock = new ReentrantLock();
+
+  /**
+   * Condition variable used to wake up the PostSpans thread.
+   */
+  private Condition cond = lock.newCondition();
+
+  /**
+   * True if we should shut down.
+   * Protected by the lock.
+   */
+  private boolean shutdown = false;
+
+  /**
+   * Simple bounded queue to hold spans between periodic runs of the httpclient.
+   * Protected by the lock.
+   */
+  private final ArrayDeque<Span> spans;
 
   /**
    * Keep last time we logged we were at capacity; used to prevent flooding of logs with
    * "at capacity" messages.
    */
-  private volatile long lastAtCapacityWarningLog = 0L;
+  private AtomicLong lastAtCapacityWarningLog = new AtomicLong(0L);
+
+  /**
+   * True if we should flush as soon as possible.  Protected by the lock.
+   */
+  private boolean mustStartFlush;
 
   /**
    * Constructor.
@@ -146,25 +177,25 @@
     int timeout = conf.getInt(CLIENT_REST_TIMEOUT_MS_KEY, CLIENT_REST_TIMEOUT_MS_DEFAULT);
     this.httpClient.setConnectTimeout(timeout);
     this.httpClient.setIdleTimeout(timeout);
-    int capacity = conf.getInt(CLIENT_REST_QUEUE_CAPACITY_KEY, CLIENT_REST_QUEUE_CAPACITY_DEFAULT);
-    this.queue = new ArrayBlockingQueue<Span>(capacity, true);
+    this.capacity = conf.getInt(CLIENT_REST_QUEUE_CAPACITY_KEY, CLIENT_REST_QUEUE_CAPACITY_DEFAULT);
+    this.spans = new ArrayDeque<Span>(capacity);
     // Build up the writeSpans URL.
     URL restServer = new URL(conf.get(HTRACED_REST_URL_KEY, HTRACED_REST_URL_DEFAULT));
-    URL url =
-      new URL(restServer.getProtocol(), restServer.getHost(), restServer.getPort(), "/writeSpans");
-    this.writeSpansRESTURL = url.toString();
-    // Make a scheduler with one thread to run our POST of spans on a period.
-    this.scheduler = Executors.newScheduledThreadPool(1);
+    URL url = new URL(restServer.getProtocol(), restServer.getHost(), restServer.getPort(), "/writeSpans");
+    this.url = url.toString();
     // Period at which we run the background thread that does the REST POST to htraced.
     int periodInMs = conf.getInt(CLIENT_REST_PERIOD_MS_KEY, CLIENT_REST_PERIOD_MS_DEFAULT);
     // Maximum spans to send in one go
-    int maxToSendAtATime =
+    this.maxToSendAtATime =
       conf.getInt(CLIENT_REST_MAX_SPANS_AT_A_TIME_KEY, CLIENT_REST_MAX_SPANS_AT_A_TIME_DEFAULT);
-    this.scheduledFuture =
-      this.scheduler.scheduleAtFixedRate(new PostSpans(this.queue, maxToSendAtATime),
-          periodInMs, periodInMs, TimeUnit.MILLISECONDS);
     // Start up the httpclient.
     this.httpClient.start();
+    // Start the background thread.
+    this.postSpans = new PostSpans(periodInMs);
+    this.postSpansThread = new Thread(postSpans);
+    this.postSpansThread.setDaemon(true);
+    this.postSpansThread.setName("PostSpans");
+    this.postSpansThread.start();
   }
 
   /**
@@ -172,81 +203,188 @@
    * Run on a period. Services the passed in queue taking spans and sending them to traced via http.
    */
   private class PostSpans implements Runnable {
-    private final Queue<Span> q;
-    private final int maxToSendAtATime;
+    private final long periodInNs;
+    private final ArrayDeque<Span> spanBuf;
 
-    private PostSpans(final Queue<Span> q, final int maxToSendAtATime) {
-      this.q = q;
-      this.maxToSendAtATime = maxToSendAtATime;
+    private PostSpans(long periodInMs) {
+      this.periodInNs = TimeUnit.NANOSECONDS.
+          convert(periodInMs, TimeUnit.MILLISECONDS);
+      this.spanBuf = new ArrayDeque<Span>(maxToSendAtATime);
     }
 
+    /**
+     * The span sending thread.
+     *
+     * We send a batch of spans for one of two reasons: there are already
+     * maxToSendAtATime spans in the buffer, or the client.rest.period.ms
+     * has elapsed.  The idea is that we want to strike a balance between
+     * sending a lot of spans at a time, for efficiency purposes, and
+     * making sure that we don't buffer spans locally for too long.
+     *
+     * The longer we buffer spans locally, the longer we will have to wait
+     * to see the results of our client operations in the GUI, and the higher
+     * the risk of losing them if the client crashes.
+     */
     @Override
     public void run() {
-      Span span = null;
-      // Cycle until we drain queue. Send maxToSendAtATime if more than this in queue.
-      while ((span = this.q.poll()) != null) {
-        // We got a span. Send at least this one span.
-        Request request = httpClient.newRequest(writeSpansRESTURL).method(HttpMethod.POST);
-        request.header(HttpHeader.CONTENT_TYPE, "application/json");
-        int count = 1;
-        request.content(new StringContentProvider(span.toJson()));
-        // Drain queue or until we have maxToSendAtATime spans, if more than just one.
-        while ((span = this.q.poll()) != null) {
-          request.content(new StringContentProvider(span.toJson()));
-          count++;
-          // If we've accumulated sufficient to send, go ahead and send what we have. Can do the
-          // rest in out next go around.
-          if (count > this.maxToSendAtATime) break;
-        }
-        try {
-          ContentResponse response = request.send();
-          if (response.getStatus() == HttpStatus.OK_200) {
-            if (LOG.isDebugEnabled()) LOG.debug("POSTED " + count + " spans");
-          } else {
-            LOG.error("Status: " + response.getStatus());
-            LOG.error(response.getHeaders());
-            LOG.error(response.getContentAsString());
+      long waitNs;
+      try {
+        waitNs = periodInNs;
+        while (true) {
+          lock.lock();
+          try {
+            if (shutdown) {
+              LOG.info("Shutting down PostSpans thread...");
+              break;
+            }
+            try {
+              waitNs = cond.awaitNanos(waitNs);
+              if (mustStartFlush) {
+                waitNs = 0;
+                mustStartFlush = false;
+              }
+            } catch (InterruptedException e) {
+              LOG.info("Got InterruptedException");
+              waitNs = 0;
+            }
+            if ((spans.size() > maxToSendAtATime) || (waitNs <= 0)) {
+              loadSpanBuf();
+              waitNs = periodInNs;
+            }
+          } finally {
+            lock.unlock();
           }
-        } catch (InterruptedException e) {
-          LOG.error(e);
-        } catch (TimeoutException e) {
-          LOG.error(e);
-        } catch (ExecutionException e) {
-          LOG.error(e);
+          // Once the lock has been safely released, we can do some network
+          // I/O without blocking the client process.
+          if (!spanBuf.isEmpty()) {
+            sendSpans();
+            spanBuf.clear();
+          }
         }
+      } finally {
+        if (httpClient != null) {
+          try {
+            httpClient.stop();
+          } catch (Exception e) {
+            LOG.error("Error shutting down httpClient", e);
+          }
+        }
+        spans.clear();
+      }
+    }
+
+    /** Moves at most maxToSendAtATime spans from spans to spanBuf. */
+    private void loadSpanBuf() {
+      int remaining = maxToSendAtATime;
+      Span next;
+      while (remaining > 0 && (next = spans.pollFirst()) != null) {
+        spanBuf.add(next);
+        remaining--;
+      }
+    }
+
+    private void sendSpans() {
+      try {
+        Request request = httpClient.newRequest(url).method(HttpMethod.POST);
+        request.header(HttpHeader.CONTENT_TYPE, "application/json");
+        StringBuilder bld = new StringBuilder();
+        for (Span span : spanBuf) {
+          bld.append(span.toJson());
+        }
+        request.content(new StringContentProvider(bld.toString()));
+        ContentResponse response = request.send();
+        if (response.getStatus() == HttpStatus.OK_200) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("POSTED " + spanBuf.size() + " spans");
+          }
+        } else {
+          LOG.error("Status: " + response.getStatus());
+          LOG.error(response.getHeaders());
+          LOG.error(response.getContentAsString());
+        }
+      } catch (InterruptedException e) {
+        LOG.error(e);
+      } catch (TimeoutException e) {
+        LOG.error(e);
+      } catch (ExecutionException e) {
+        LOG.error(e);
       }
     }
   }
 
   @Override
   public void close() throws IOException {
-    if (this.scheduledFuture != null) this.scheduledFuture.cancel(true);
-    if (this.scheduler == null) this.scheduler.shutdown();
-    if (this.httpClient != null) {
-      try {
-        this.httpClient.stop();
-      } catch (Exception e) {
-        throw new IOException(e);
-      }
+    LOG.info("Closing HTracedRESTReceiver.");
+    lock.lock();
+    try {
+      this.shutdown = true;
+      cond.signal();
+    } finally {
+      lock.unlock();
+    }
+    try {
+      postSpansThread.join(30000);
+    } catch (InterruptedException e) {
+      LOG.error("Interrupted while joining postSpans", e);
     }
   }
 
-  // @VisibleForTesting
-  boolean isQueueEmpty() {
-    return this.queue.isEmpty();
+  /**
+   * Start flushing the buffered spans.
+   *
+   * Note that even after calling this function, you will still have to wait
+   * for the flush to finish happening.  This function just starts the flush;
+   * it does not block until it has completed.  You also do not get
+   * "read-after-write consistency" with htraced... the spans that are
+   * written may be buffered for a short period of time prior to being
+   * readable.  This is not a problem for production use (since htraced is not
+   * a database), but it means that most unit tests will need a loop in their
+   * "can I read what I wrote" tests.
+   */
+  void startFlushing() {
+    LOG.info("Triggering HTracedRESTReceiver flush.");
+    lock.lock();
+    try {
+      mustStartFlush = true;
+      cond.signal();
+    } finally {
+      lock.unlock();
+    }
   }
 
   private static long WARN_TIMEOUT_MS = 300000;
 
   @Override
   public void receiveSpan(Span span) {
-    if (!this.queue.offer(span)) {
-      // TODO: If failed the offer, run the background thread now. I can't block though?
+    boolean added = false;
+    lock.lock();
+    try {
+      if (spans.size() < capacity) {
+        spans.add(span);
+        added = true;
+        if (spans.size() >= maxToSendAtATime) {
+          cond.signal();
+        }
+      } else {
+        cond.signal();
+      }
+    } finally {
+      lock.unlock();
+    }
+    if (!added) {
       long now = System.nanoTime() / 1000000L;
-      // Only log every 5 minutes. Any more than this for a guest process is obnoxious
-      if (now - lastAtCapacityWarningLog > WARN_TIMEOUT_MS) {
-        LOG.warn("At capacity");
-        this.lastAtCapacityWarningLog = now;
+      long last = lastAtCapacityWarningLog.get();
+      if (now - last > WARN_TIMEOUT_MS) {
+        // Only log every 5 minutes. Any more than this for a guest process
+        // is obnoxious.
+        if (lastAtCapacityWarningLog.compareAndSet(last, now)) {
+          // If the atomic-compare-and-set succeeds, we should log.  Otherwise,
+          // we should assume another thread already logged and bumped up the
+          // value of lastAtCapacityWarning sometime between our get and the
+          // "if" statement.
+          LOG.warn("There are too many HTrace spans to buffer!  We have " +
+              "already buffered " + capacity + " spans.  Dropping spans.");
+        }
       }
     }
   }
diff --git a/htrace-htraced/src/test/java/org/apache/htrace/impl/TestHTracedRESTReceiver.java b/htrace-htraced/src/test/java/org/apache/htrace/impl/TestHTracedRESTReceiver.java
index fe9f1c0..b1f1b11 100644
--- a/htrace-htraced/src/test/java/org/apache/htrace/impl/TestHTracedRESTReceiver.java
+++ b/htrace-htraced/src/test/java/org/apache/htrace/impl/TestHTracedRESTReceiver.java
@@ -18,6 +18,7 @@
 package org.apache.htrace.impl;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
 import java.io.File;
@@ -29,6 +30,7 @@
 import org.apache.htrace.Span;
 import org.apache.htrace.util.DataDir;
 import org.apache.htrace.util.HTracedProcess;
+import org.apache.htrace.util.TestUtil;
 import org.eclipse.jetty.client.api.ContentResponse;
 import org.eclipse.jetty.http.HttpStatus;
 import org.junit.After;
@@ -36,20 +38,20 @@
 import org.junit.Test;
 
 public class TestHTracedRESTReceiver {
-  private static final Log LOG = LogFactory.getLog(TestHTracedRESTReceiver.class);
-  private URL restServerUrl;;
+  private static final Log LOG =
+      LogFactory.getLog(TestHTracedRESTReceiver.class);
+  private URL restServerUrl;
   private DataDir dataDir;
   HTracedProcess htraced;
 
   @Before
   public void setUp() throws Exception {
     this.dataDir = new DataDir();
-    // Start on 9097. Would be better to start at port 0 and then ask server what port it managed
-    // to come up on.
-    this.restServerUrl = new URL("http://localhost:9097/");
     File tlDir = DataDir.getTopLevelOfCheckout(this.dataDir.getDataDir());
     File pathToHTracedBinary = HTracedProcess.getPathToHTraceBinaryFromTopLevel(tlDir);
-    this.htraced = new HTracedProcess(pathToHTracedBinary, dataDir.getDataDir(), restServerUrl);
+    this.htraced = new HTracedProcess(pathToHTracedBinary,
+        dataDir.getDataDir(), "localhost");
+    this.restServerUrl = new URL("http://" + htraced.getHttpAddr() + "/");
   }
 
   @After
@@ -75,6 +77,7 @@
     @Override
     public String get(String key, String defaultValue) {
       if (key.equals(HTracedRESTReceiver.HTRACED_REST_URL_KEY)) {
+        LOG.info("WATERMELON2: got request for htraced.rest.url.  Returning " + this.restServerUrl.toString());
         return this.restServerUrl.toString();
       }
       return defaultValue;
@@ -90,7 +93,7 @@
     HTracedRESTReceiver receiver =
       new HTracedRESTReceiver(new TestHTraceConfiguration(this.restServerUrl));
     try {
-      // Do basic a GET /server/info against localhost:9095 htraced
+      // Do a basic GET /server/info against htraced
       ContentResponse response = receiver.httpClient.GET(restServerUrl + "server/info");
       assertEquals("application/json", response.getMediaType());
       String content = processGET(response);
@@ -111,31 +114,44 @@
    * Send 100 spans then confirm they made it in.
    * @throws Exception
    */
-  @Test (timeout = 10000)
+  @Test (timeout = 60000)
   public void testSendingSpans() throws Exception {
-    HTracedRESTReceiver receiver =
+    final HTracedRESTReceiver receiver =
       new HTracedRESTReceiver(new TestHTraceConfiguration(this.restServerUrl));
+    final int NUM_SPANS = 3;
     try {
-      // TODO: Fix MilliSpan. Requires a parentid.  Shouldn't have to have one else be explicit it
-      // is required.
-      for (int i = 0; i < 100; i++) {
+      for (int i = 0; i < NUM_SPANS; i++) {
         Span span = new MilliSpan.Builder().parents(new long [] {1L}).spanId(i).build();
         LOG.info(span.toString());
         receiver.receiveSpan(span);
       }
-      // Wait for the queue to empty before we go to check they made it over.
-      while (receiver.isQueueEmpty()) Thread.sleep(1);
-      // Read them all back.
-      for (int i = 0; i < 100; i++) {
-        // This is what the REST server expends when querying for a span id.
-        String findSpan = String.format("span/%016x", i);
-        ContentResponse response = receiver.httpClient.GET(restServerUrl + findSpan);
-        String content = processGET(response);
-        assertTrue(content != null && content.length() > 0);
-        LOG.info(content);
-      }
+      receiver.startFlushing();
+      TestUtil.waitFor(new TestUtil.Supplier<Boolean>() {
+        @Override
+        public Boolean get() {
+          try {
+            for (int i = 0; i < NUM_SPANS; i++) {
+              // This is what the REST server expects when querying for a
+              // span id.
+              String findSpan = String.format("span/%016x", i);
+              ContentResponse response =
+                  receiver.httpClient.GET(restServerUrl + findSpan);
+              String content = processGET(response);
+              if ((content == null) || (content.length() == 0)) {
+                LOG.info("Failed to find span " + i);
+                return false;
+              }
+              LOG.info("Got " + content + " for span " + i);
+            }
+            return true;
+          } catch (Throwable t) {
+            LOG.error("Got exception", t);
+            return false;
+          }
+        }
+      }, 10, 20000);
     } finally {
       receiver.close();
     }
   }
-}
\ No newline at end of file
+}
diff --git a/htrace-htraced/src/test/java/org/apache/htrace/util/HTracedProcess.java b/htrace-htraced/src/test/java/org/apache/htrace/util/HTracedProcess.java
index 12343f7..e319925 100644
--- a/htrace-htraced/src/test/java/org/apache/htrace/util/HTracedProcess.java
+++ b/htrace-htraced/src/test/java/org/apache/htrace/util/HTracedProcess.java
@@ -16,12 +16,19 @@
  */
 package org.apache.htrace.util;
 
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.lang.ProcessBuilder.Redirect;
+import java.net.ServerSocket;
+import java.net.Socket;
 import java.net.URL;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 /**
  * To get instance of HTraced up and running, create an instance of this class.
@@ -29,30 +36,86 @@
  * host data (leveldbs and logs).
  * TODO: We expect to find the htraced in a very particular place. Fragile. Will break if stuff
  * moves.
- * TODO: What if a port clash? How to have it come up another port then ask the process what port
- * it is running on?
  */
 public class HTracedProcess extends Process {
+  private static final Log LOG = LogFactory.getLog(HTracedProcess.class);
   private final Process delegate;
 
-  public HTracedProcess(final File pathToHTracedBinary, final File dataDir, final URL url)
-  throws IOException {
-    // web.address for htraced is hostname ':' port; no 'scheme' yet.
-    String webAddress = url.getHost() + ":" + url.getPort();
-    // Pass cmdline args to htraced to it uses our test dir for data.
-    ProcessBuilder pb = new ProcessBuilder(pathToHTracedBinary.toString(),
-      " -Dlog.level=TRACE",
-      "-Dweb.address=" + webAddress,
-      "-Ddata.store.clear=true",
-      "-Ddata.store.directories=" + dataDir.toString());
-    pb.redirectErrorStream(true);
-    // Inherit STDERR/STDOUT i/o; dumps on console for now.  Can add logs later.
-    pb.inheritIO();
-    pb.directory(dataDir);
-    this.delegate = pb.start();
-    assert pb.redirectInput() == Redirect.PIPE;
-    assert pb.redirectOutput().file() == dataDir;
-    assert this.delegate.getInputStream().read() == -1;
+  private final String httpAddr;
+
+  /**
+   * Data sent back from the HTraced process on the notification port.
+   */
+  public static class StartupNotificationData {
+    /**
+     * The hostname:port pair which the HTraced process uses for HTTP requests.
+     */
+    @JsonProperty("HttpAddr")
+    String httpAddr;
+
+    /**
+     * The process ID of the HTraced process.
+     */
+    @JsonProperty("ProcessId")
+    long processId;
+  }
+
+  /**
+   * Launch htraced on a random port and wait for its startup notification.
+   */
+  public HTracedProcess(final File binPath, final File dataDir,
+                        final String host) throws IOException {
+    // Create a notifier socket bound to a random port.
+    ServerSocket listener = new ServerSocket(0);
+    boolean success = false;
+    Process process = null;
+    try {
+      // Give up after 30 seconds rather than hang if htraced never reports in.
+      listener.setSoTimeout(30000);
+      // Use a random port for the web address.  No 'scheme' yet.
+      String webAddress = host + ":0";
+      String logPath = new File(dataDir, "log.txt").getAbsolutePath();
+      // Pass cmdline args to htraced so it uses our test dir for data.
+      ProcessBuilder pb = new ProcessBuilder(binPath.toString(),
+        "-Dlog.level=TRACE",
+        "-Dlog.path=" + logPath,
+        "-Dweb.address=" + webAddress,
+        "-Ddata.store.clear=true",
+        "-Dstartup.notification.address=localhost:" + listener.getLocalPort(),
+        "-Ddata.store.directories=" + dataDir.toString());
+      pb.redirectErrorStream(true);
+      // Inherit STDERR/STDOUT i/o; dumps on console for now.  Can add logs later.
+      pb.inheritIO();
+      pb.directory(dataDir);
+      process = pb.start();
+      assert process.getInputStream().read() == -1;
+      StartupNotificationData data = readStartupNotification(listener);
+      httpAddr = data.httpAddr;
+      LOG.info("Started htraced process " + data.processId + " with http " +
+               "address " + data.httpAddr + ", logging to " + logPath);
+      success = true;
+    } finally {
+      if (!success && process != null) {
+        process.destroy();
+        process = null;
+      }
+      delegate = process;
+      listener.close();
+    }
+  }
+
+  private static StartupNotificationData
+      readStartupNotification(ServerSocket listener) throws IOException {
+    Socket socket = listener.accept();
+    try {
+      InputStream in = socket.getInputStream();
+      ObjectMapper objectMapper = new ObjectMapper();
+      StartupNotificationData data = objectMapper.
+          readValue(in, StartupNotificationData.class);
+      return data;
+    } finally {
+      socket.close();
+    }
   }
 
   public int hashCode() {
@@ -91,6 +154,10 @@
     return delegate.toString();
   }
 
+  public String getHttpAddr() { // address taken from the startup notification received in the constructor
+    return httpAddr; // used by callers as "http://" + addr, so it presumably carries no scheme -- confirm
+  }
+
   /**
    * Ugly but how else to do file-math?
    * @param topLevel Presumes top-level of the htrace checkout.
@@ -100,4 +167,4 @@
     return new File(new File(new File(new File(new File(topLevel, "htrace-core"), "src"), "go"),
       "build"), "htraced");
   }
-}
\ No newline at end of file
+}
diff --git a/htrace-htraced/src/test/java/org/apache/htrace/util/TestHTracedProcess.java b/htrace-htraced/src/test/java/org/apache/htrace/util/TestHTracedProcess.java
index 38f90e5..67e3a21 100644
--- a/htrace-htraced/src/test/java/org/apache/htrace/util/TestHTracedProcess.java
+++ b/htrace-htraced/src/test/java/org/apache/htrace/util/TestHTracedProcess.java
@@ -25,6 +25,8 @@
 import java.net.URL;
 import java.net.URLConnection;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -34,6 +36,8 @@
  * in methods in the below.
  */
 public class TestHTracedProcess {
+  private static final Log LOG =
+      LogFactory.getLog(TestHTracedProcess.class);
   private DataDir testDir = null;
   private final int TIMEOUT = 10000;
 
@@ -51,7 +55,8 @@
     connection.setReadTimeout(TIMEOUT);
     connection.connect();
     StringBuffer sb = new StringBuffer();
-    BufferedReader reader = new BufferedReader(new InputStreamReader(connection.getInputStream()));
+    BufferedReader reader = new BufferedReader(
+        new InputStreamReader(connection.getInputStream()));
     try {
       String line = null;
       while ((line = reader.readLine()) != null) {
@@ -70,24 +75,26 @@
    * @throws InterruptedException
    */
   @Test (timeout=10000)
-  public void testStartStopHTraced() throws IOException, InterruptedException {
-    // TODO: Make the test port random so no classes if concurrent test runs. Anything better
-    // I can do here?  Pass a zero and have the daemon tell me where it is successfully listening?
-    String restURL = "http://localhost:9096/";
-    URL restServerURL = new URL(restURL);
+  public void testStartStopHTraced() throws Exception {
     HTracedProcess htraced = null;
     File dataDir = this.testDir.getDataDir();
     File topLevel = DataDir.getTopLevelOfCheckout(dataDir);
     try {
-      htraced = new HTracedProcess(HTracedProcess.getPathToHTraceBinaryFromTopLevel(topLevel),
-        dataDir, restServerURL);
-      String str = doGet(new URL(restServerURL + "server/info"));
+      htraced = new HTracedProcess(HTracedProcess.
+          getPathToHTraceBinaryFromTopLevel(topLevel),
+          dataDir, "localhost"); // host only: the process is launched with web.address=<host>:0
+      LOG.info("Started HTracedProcess with REST server URL " +
+          htraced.getHttpAddr());
+      String str = doGet(new URL(
+          "http://" + htraced.getHttpAddr() + "/server/info")); // the reported address has no scheme, so one is prepended
       // Assert we go something back.
       assertTrue(str.contains("ReleaseVersion"));
       // Assert that the datadir is not empty.
     } finally {
-      if (htraced != null) htraced.destroy();
-      System.out.println("ExitValue=" + htraced.exitValue());
+      if (htraced != null) { // guard: the constructor may have thrown, leaving htraced null
+        htraced.destroy();
+        System.out.println("ExitValue=" + htraced.waitFor()); // presumably blocks until htraced exits and yields its status -- confirm
+      }
     }
   }
 }
\ No newline at end of file