HTRACE-133. HTracedRESTReceiver drops spans when close() is called (cmccabe)
diff --git a/htrace-htraced/src/main/java/org/apache/htrace/impl/HTracedRESTReceiver.java b/htrace-htraced/src/main/java/org/apache/htrace/impl/HTracedRESTReceiver.java
index ae1cfed..7edc2b8 100644
--- a/htrace-htraced/src/main/java/org/apache/htrace/impl/HTracedRESTReceiver.java
+++ b/htrace-htraced/src/main/java/org/apache/htrace/impl/HTracedRESTReceiver.java
@@ -68,9 +68,10 @@
 public class HTracedRESTReceiver implements SpanReceiver {
   private static final Log LOG = LogFactory.getLog(HTracedRESTReceiver.class);
 
-  // TODO: Take process name and add this to user agent?  Would help debugging?
-  // @VisibleForTesting Protected so accessible from tests.
-  final HttpClient httpClient;
+  /**
+   * The HttpClient to use for this receiver.
+   */
+  private final HttpClient httpClient;
 
   /**
    * The maximum number of spans to buffer.
@@ -98,11 +99,16 @@
   private final Thread postSpansThread;
 
   /**
-   * Timeout in milliseconds.
-   * For now, it is read and connect timeout.
+   * The connection timeout in milliseconds.
    */
-  public static final String CLIENT_REST_TIMEOUT_MS_KEY = "client.rest.timeout.ms";
-  private static final int CLIENT_REST_TIMEOUT_MS_DEFAULT = 60000;
+  public static final String CLIENT_CONNECT_TIMEOUT_MS_KEY = "client.connect.timeout.ms";
+  private static final int CLIENT_CONNECT_TIMEOUT_MS_DEFAULT = 30000;
+
+  /**
+   * The idle timeout in milliseconds.
+   */
+  public static final String CLIENT_IDLE_TIMEOUT_MS_KEY = "client.idle.timeout.ms";
+  private static final int CLIENT_IDLE_TIMEOUT_MS_DEFAULT = 120000;
 
   /**
    * URL of the htraced REST server we are to talk to.
@@ -164,19 +170,32 @@
   private boolean mustStartFlush;
 
   /**
+   * Create an HttpClient instance.
+   *
+   * @param connTimeout         The timeout to use for connecting.
+   * @param idleTimeout         The idle timeout to use.
+   */
+  HttpClient createHttpClient(long connTimeout, long idleTimeout) {
+    HttpClient httpClient = new HttpClient();
+    httpClient.setUserAgentField(new HttpField(HttpHeader.USER_AGENT,
+      this.getClass().getSimpleName()));
+    httpClient.setConnectTimeout(connTimeout);
+    httpClient.setIdleTimeout(idleTimeout);
+    return httpClient;
+  }
+
+  /**
    * Constructor.
    * You must call {@link #close()} post construction when done.
    * @param conf
    * @throws Exception
    */
   public HTracedRESTReceiver(final HTraceConfiguration conf) throws Exception {
-    this.httpClient = new HttpClient();
-    this.httpClient.setUserAgentField(new HttpField(HttpHeader.USER_AGENT,
-      this.getClass().getSimpleName()));
-    // Use same timeout for connection and idle for now.
-    int timeout = conf.getInt(CLIENT_REST_TIMEOUT_MS_KEY, CLIENT_REST_TIMEOUT_MS_DEFAULT);
-    this.httpClient.setConnectTimeout(timeout);
-    this.httpClient.setIdleTimeout(timeout);
+    int connTimeout = conf.getInt(CLIENT_CONNECT_TIMEOUT_MS_KEY,
+                                  CLIENT_CONNECT_TIMEOUT_MS_DEFAULT);
+    int idleTimeout = conf.getInt(CLIENT_IDLE_TIMEOUT_MS_KEY,
+                                  CLIENT_IDLE_TIMEOUT_MS_DEFAULT);
+    this.httpClient = createHttpClient(connTimeout, idleTimeout);
     this.capacity = conf.getInt(CLIENT_REST_QUEUE_CAPACITY_KEY, CLIENT_REST_QUEUE_CAPACITY_DEFAULT);
     this.spans = new ArrayDeque<Span>(capacity);
     // Build up the writeSpans URL.
@@ -197,9 +216,10 @@
     this.postSpansThread.setName("PostSpans");
     this.postSpansThread.start();
     if (LOG.isDebugEnabled()) {
-      LOG.debug("Created new HTracedRESTReceiver with timeout=" + timeout +
-            ", capacity=" + capacity + ", url=" + url +  ", periodInMs=" +
-            periodInMs + ", maxToSendAtATime=" + maxToSendAtATime);
+      LOG.debug("Created new HTracedRESTReceiver with connTimeout=" +
+            connTimeout + ", idleTimeout = " + idleTimeout + ", capacity=" +
+            capacity + ", url=" + url +  ", periodInMs=" + periodInMs +
+            ", maxToSendAtATime=" + maxToSendAtATime);
     }
   }
 
@@ -239,20 +259,24 @@
           lock.lock();
           try {
             if (shutdown) {
-              LOG.info("Shutting down PostSpans thread...");
-              break;
-            }
-            try {
-              waitNs = cond.awaitNanos(waitNs);
-              if (mustStartFlush) {
-                waitNs = 0;
-                mustStartFlush = false;
+              if (spans.isEmpty()) {
+                LOG.debug("Shutting down PostSpans thread...");
+                break;
               }
-            } catch (InterruptedException e) {
-              LOG.info("Got InterruptedException");
-              waitNs = 0;
+            } else {
+              try {
+                waitNs = cond.awaitNanos(waitNs);
+                if (mustStartFlush) {
+                  waitNs = 0;
+                  mustStartFlush = false;
+                }
+              } catch (InterruptedException e) {
+                LOG.info("Got InterruptedException");
+                waitNs = 0;
+              }
             }
-            if ((spans.size() > maxToSendAtATime) || (waitNs <= 0)) {
+            if ((spans.size() > maxToSendAtATime) || (waitNs <= 0) ||
+                    shutdown) {
               loadSpanBuf();
               waitNs = periodInNs;
             }
@@ -319,7 +343,7 @@
 
   @Override
   public void close() throws IOException {
-    LOG.info("Closing HTracedRESTReceiver(" + url + ").");
+    LOG.debug("Closing HTracedRESTReceiver(" + url + ").");
     lock.lock();
     try {
       this.shutdown = true;
@@ -328,7 +352,13 @@
       lock.unlock();
     }
     try {
-      postSpansThread.join(30000);
+      postSpansThread.join(120000);
+      if (postSpansThread.isAlive()) {
+        LOG.error("Timed out without closing HTracedRESTReceiver(" +
+                  url + ").");
+      } else {
+        LOG.debug("Closed HTracedRESTReceiver(" + url + ").");
+      }
     } catch (InterruptedException e) {
       LOG.error("Interrupted while joining postSpans", e);
     }
@@ -364,6 +394,11 @@
     boolean added = false;
     lock.lock();
     try {
+      if (shutdown) {
+        LOG.trace("receiveSpan(span=" + span + "): HTracedRESTReceiver " +
+            "is already shut down.");
+        return;
+      }
       if (spans.size() < capacity) {
         spans.add(span);
         added = true;
diff --git a/htrace-htraced/src/test/java/org/apache/htrace/impl/TestHTracedRESTReceiver.java b/htrace-htraced/src/test/java/org/apache/htrace/impl/TestHTracedRESTReceiver.java
index 676e348..eca6d6d 100644
--- a/htrace-htraced/src/test/java/org/apache/htrace/impl/TestHTracedRESTReceiver.java
+++ b/htrace-htraced/src/test/java/org/apache/htrace/impl/TestHTracedRESTReceiver.java
@@ -31,6 +31,7 @@
 import org.apache.htrace.util.DataDir;
 import org.apache.htrace.util.HTracedProcess;
 import org.apache.htrace.util.TestUtil;
+import org.eclipse.jetty.client.HttpClient;
 import org.eclipse.jetty.client.api.ContentResponse;
 import org.eclipse.jetty.http.HttpStatus;
 import org.junit.After;
@@ -91,14 +92,18 @@
   public void testBasicGet() throws Exception {
     HTracedRESTReceiver receiver =
       new HTracedRESTReceiver(new TestHTraceConfiguration(this.restServerUrl));
+    HttpClient http = receiver.createHttpClient(60000L, 60000L);
+    http.start();
     try {
       // Do basic a GET /server/info against htraced
-      ContentResponse response = receiver.httpClient.GET(restServerUrl + "server/info");
+      ContentResponse response =
+        http.GET(restServerUrl + "server/info");
       assertEquals("application/json", response.getMediaType());
       String content = processGET(response);
       assertTrue(content.contains("ReleaseVersion"));
       System.out.println(content);
     } finally {
+      http.stop();
       receiver.close();
     }
   }
@@ -109,22 +114,25 @@
     return response.getContentAsString();
   }
 
-  /**
-   * Send 100 spans then confirm they made it in.
-   * @throws Exception
-   */
-  @Test (timeout = 60000)
-  public void testSendingSpans() throws Exception {
+  private void testSendingSpansImpl(boolean testClose) throws Exception {
     final HTracedRESTReceiver receiver =
       new HTracedRESTReceiver(new TestHTraceConfiguration(this.restServerUrl));
     final int NUM_SPANS = 3;
+    final HttpClient http = receiver.createHttpClient(60000, 60000);
+    http.start();
     try {
       for (int i = 0; i < NUM_SPANS; i++) {
-        Span span = new MilliSpan.Builder().parents(new long [] {1L}).spanId(i).build();
+        Span span = new MilliSpan.Builder().parents(
+            new long [] {1L}).spanId(i).build();
         LOG.info(span.toString());
         receiver.receiveSpan(span);
       }
-      receiver.startFlushing();
+
+      if (testClose) {
+        receiver.close();
+      } else {
+        receiver.startFlushing();
+      }
       TestUtil.waitFor(new TestUtil.Supplier<Boolean>() {
         @Override
         public Boolean get() {
@@ -134,7 +142,7 @@
               // span id.
               String findSpan = String.format("span/%016x", i);
               ContentResponse response =
-                  receiver.httpClient.GET(restServerUrl + findSpan);
+                  http.GET(restServerUrl + findSpan);
               String content = processGET(response);
               if ((content == null) || (content.length() == 0)) {
                 LOG.info("Failed to find span " + i);
@@ -150,7 +158,30 @@
         }
       }, 10, 20000);
     } finally {
-      receiver.close();
+      http.stop();
+      if (!testClose) {
+        receiver.close();
+      }
     }
   }
+
+  /**
+   * Send 100 spans then confirm they made it in.
+   * @throws Exception
+   */
+  @Test (timeout = 60000)
+  public void testSendingSpans() throws Exception {
+    testSendingSpansImpl(false);
+  }
+
+  /**
+   * Test that the REST receiver blocks during shutdown until all spans are sent
+   * (or a long timeout elapses).  Otherwise, short-lived client processes will
+   * never have a chance to send all their spans and we will have incomplete
+   * information.
+   */
+  @Test (timeout = 60000)
+  public void testShutdownBlocksUntilSpanAreSent() throws Exception {
+    testSendingSpansImpl(true);
+  }
 }