HTRACE-133. HTracedRESTReceiver drops spans when close() is called (cmccabe)
diff --git a/htrace-htraced/src/main/java/org/apache/htrace/impl/HTracedRESTReceiver.java b/htrace-htraced/src/main/java/org/apache/htrace/impl/HTracedRESTReceiver.java
index ae1cfed..7edc2b8 100644
--- a/htrace-htraced/src/main/java/org/apache/htrace/impl/HTracedRESTReceiver.java
+++ b/htrace-htraced/src/main/java/org/apache/htrace/impl/HTracedRESTReceiver.java
@@ -68,9 +68,10 @@
public class HTracedRESTReceiver implements SpanReceiver {
private static final Log LOG = LogFactory.getLog(HTracedRESTReceiver.class);
- // TODO: Take process name and add this to user agent? Would help debugging?
- // @VisibleForTesting Protected so accessible from tests.
- final HttpClient httpClient;
+ /**
+ * The HttpClient to use for this receiver.
+ */
+ private final HttpClient httpClient;
/**
* The maximum number of spans to buffer.
@@ -98,11 +99,16 @@
private final Thread postSpansThread;
/**
- * Timeout in milliseconds.
- * For now, it is read and connect timeout.
+ * The connection timeout in milliseconds.
*/
- public static final String CLIENT_REST_TIMEOUT_MS_KEY = "client.rest.timeout.ms";
- private static final int CLIENT_REST_TIMEOUT_MS_DEFAULT = 60000;
+ public static final String CLIENT_CONNECT_TIMEOUT_MS_KEY = "client.connect.timeout.ms";
+ private static final int CLIENT_CONNECT_TIMEOUT_MS_DEFAULT = 30000;
+
+ /**
+ * The idle timeout in milliseconds.
+ */
+ public static final String CLIENT_IDLE_TIMEOUT_MS_KEY = "client.idle.timeout.ms";
+ private static final int CLIENT_IDLE_TIMEOUT_MS_DEFAULT = 120000;
/**
* URL of the htraced REST server we are to talk to.
@@ -164,19 +170,32 @@
private boolean mustStartFlush;
/**
+ * Create an HttpClient instance.
+ *
+ * @param connTimeout The timeout to use for connecting.
+ * @param idleTimeout The idle timeout to use.
+ */
+ HttpClient createHttpClient(long connTimeout, long idleTimeout) {
+ HttpClient httpClient = new HttpClient();
+ httpClient.setUserAgentField(new HttpField(HttpHeader.USER_AGENT,
+ this.getClass().getSimpleName()));
+ httpClient.setConnectTimeout(connTimeout);
+ httpClient.setIdleTimeout(idleTimeout);
+ return httpClient;
+ }
+
+ /**
* Constructor.
* You must call {@link #close()} post construction when done.
* @param conf
* @throws Exception
*/
public HTracedRESTReceiver(final HTraceConfiguration conf) throws Exception {
- this.httpClient = new HttpClient();
- this.httpClient.setUserAgentField(new HttpField(HttpHeader.USER_AGENT,
- this.getClass().getSimpleName()));
- // Use same timeout for connection and idle for now.
- int timeout = conf.getInt(CLIENT_REST_TIMEOUT_MS_KEY, CLIENT_REST_TIMEOUT_MS_DEFAULT);
- this.httpClient.setConnectTimeout(timeout);
- this.httpClient.setIdleTimeout(timeout);
+ int connTimeout = conf.getInt(CLIENT_CONNECT_TIMEOUT_MS_KEY,
+ CLIENT_CONNECT_TIMEOUT_MS_DEFAULT);
+ int idleTimeout = conf.getInt(CLIENT_IDLE_TIMEOUT_MS_KEY,
+ CLIENT_IDLE_TIMEOUT_MS_DEFAULT);
+ this.httpClient = createHttpClient(connTimeout, idleTimeout);
this.capacity = conf.getInt(CLIENT_REST_QUEUE_CAPACITY_KEY, CLIENT_REST_QUEUE_CAPACITY_DEFAULT);
this.spans = new ArrayDeque<Span>(capacity);
// Build up the writeSpans URL.
@@ -197,9 +216,10 @@
this.postSpansThread.setName("PostSpans");
this.postSpansThread.start();
if (LOG.isDebugEnabled()) {
- LOG.debug("Created new HTracedRESTReceiver with timeout=" + timeout +
- ", capacity=" + capacity + ", url=" + url + ", periodInMs=" +
- periodInMs + ", maxToSendAtATime=" + maxToSendAtATime);
+ LOG.debug("Created new HTracedRESTReceiver with connTimeout=" +
+ connTimeout + ", idleTimeout = " + idleTimeout + ", capacity=" +
+ capacity + ", url=" + url + ", periodInMs=" + periodInMs +
+ ", maxToSendAtATime=" + maxToSendAtATime);
}
}
@@ -239,20 +259,24 @@
lock.lock();
try {
if (shutdown) {
- LOG.info("Shutting down PostSpans thread...");
- break;
- }
- try {
- waitNs = cond.awaitNanos(waitNs);
- if (mustStartFlush) {
- waitNs = 0;
- mustStartFlush = false;
+ if (spans.isEmpty()) {
+ LOG.debug("Shutting down PostSpans thread...");
+ break;
}
- } catch (InterruptedException e) {
- LOG.info("Got InterruptedException");
- waitNs = 0;
+ } else {
+ try {
+ waitNs = cond.awaitNanos(waitNs);
+ if (mustStartFlush) {
+ waitNs = 0;
+ mustStartFlush = false;
+ }
+ } catch (InterruptedException e) {
+ LOG.info("Got InterruptedException");
+ waitNs = 0;
+ }
}
- if ((spans.size() > maxToSendAtATime) || (waitNs <= 0)) {
+ if ((spans.size() > maxToSendAtATime) || (waitNs <= 0) ||
+ shutdown) {
loadSpanBuf();
waitNs = periodInNs;
}
@@ -319,7 +343,7 @@
@Override
public void close() throws IOException {
- LOG.info("Closing HTracedRESTReceiver(" + url + ").");
+ LOG.debug("Closing HTracedRESTReceiver(" + url + ").");
lock.lock();
try {
this.shutdown = true;
@@ -328,7 +352,13 @@
lock.unlock();
}
try {
- postSpansThread.join(30000);
+ postSpansThread.join(120000);
+ if (postSpansThread.isAlive()) {
+ LOG.error("Timed out without closing HTracedRESTReceiver(" +
+ url + ").");
+ } else {
+ LOG.debug("Closed HTracedRESTReceiver(" + url + ").");
+ }
} catch (InterruptedException e) {
LOG.error("Interrupted while joining postSpans", e);
}
@@ -364,6 +394,11 @@
boolean added = false;
lock.lock();
try {
+ if (shutdown) {
+ LOG.trace("receiveSpan(span=" + span + "): HTracedRESTReceiver " +
+ "is already shut down.");
+ return;
+ }
if (spans.size() < capacity) {
spans.add(span);
added = true;
diff --git a/htrace-htraced/src/test/java/org/apache/htrace/impl/TestHTracedRESTReceiver.java b/htrace-htraced/src/test/java/org/apache/htrace/impl/TestHTracedRESTReceiver.java
index 676e348..eca6d6d 100644
--- a/htrace-htraced/src/test/java/org/apache/htrace/impl/TestHTracedRESTReceiver.java
+++ b/htrace-htraced/src/test/java/org/apache/htrace/impl/TestHTracedRESTReceiver.java
@@ -31,6 +31,7 @@
import org.apache.htrace.util.DataDir;
import org.apache.htrace.util.HTracedProcess;
import org.apache.htrace.util.TestUtil;
+import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.http.HttpStatus;
import org.junit.After;
@@ -91,14 +92,18 @@
public void testBasicGet() throws Exception {
HTracedRESTReceiver receiver =
new HTracedRESTReceiver(new TestHTraceConfiguration(this.restServerUrl));
+ HttpClient http = receiver.createHttpClient(60000L, 60000L);
+ http.start();
try {
// Do basic a GET /server/info against htraced
- ContentResponse response = receiver.httpClient.GET(restServerUrl + "server/info");
+ ContentResponse response =
+ http.GET(restServerUrl + "server/info");
assertEquals("application/json", response.getMediaType());
String content = processGET(response);
assertTrue(content.contains("ReleaseVersion"));
System.out.println(content);
} finally {
+ http.stop();
receiver.close();
}
}
@@ -109,22 +114,25 @@
return response.getContentAsString();
}
- /**
- * Send 100 spans then confirm they made it in.
- * @throws Exception
- */
- @Test (timeout = 60000)
- public void testSendingSpans() throws Exception {
+ private void testSendingSpansImpl(boolean testClose) throws Exception {
final HTracedRESTReceiver receiver =
new HTracedRESTReceiver(new TestHTraceConfiguration(this.restServerUrl));
final int NUM_SPANS = 3;
+ final HttpClient http = receiver.createHttpClient(60000, 60000);
+ http.start();
try {
for (int i = 0; i < NUM_SPANS; i++) {
- Span span = new MilliSpan.Builder().parents(new long [] {1L}).spanId(i).build();
+ Span span = new MilliSpan.Builder().parents(
+ new long [] {1L}).spanId(i).build();
LOG.info(span.toString());
receiver.receiveSpan(span);
}
- receiver.startFlushing();
+
+ if (testClose) {
+ receiver.close();
+ } else {
+ receiver.startFlushing();
+ }
TestUtil.waitFor(new TestUtil.Supplier<Boolean>() {
@Override
public Boolean get() {
@@ -134,7 +142,7 @@
// span id.
String findSpan = String.format("span/%016x", i);
ContentResponse response =
- receiver.httpClient.GET(restServerUrl + findSpan);
+ http.GET(restServerUrl + findSpan);
String content = processGET(response);
if ((content == null) || (content.length() == 0)) {
LOG.info("Failed to find span " + i);
@@ -150,7 +158,30 @@
}
}, 10, 20000);
} finally {
- receiver.close();
+ http.stop();
+ if (!testClose) {
+ receiver.close();
+ }
}
}
+
+ /**
+ * Send 100 spans then confirm they made it in.
+ * @throws Exception
+ */
+ @Test (timeout = 60000)
+ public void testSendingSpans() throws Exception {
+ testSendingSpansImpl(false);
+ }
+
+ /**
+ * Test that the REST receiver blocks during shutdown until all spans are sent
+ * (or a long timeout elapses). Otherwise, short-lived client processes will
+ * never have a chance to send all their spans and we will have incomplete
+ * information.
+ */
+ @Test (timeout = 60000)
+ public void testShutdownBlocksUntilSpanAreSent() throws Exception {
+ testSendingSpansImpl(true);
+ }
}