HTRACE-109. fix TestHTracedRESTReceiver unit test failures (cmccabe)
diff --git a/htrace-core/src/go/src/org/apache/htrace/conf/config_keys.go b/htrace-core/src/go/src/org/apache/htrace/conf/config_keys.go
index b22e312..ba63f2d 100644
--- a/htrace-core/src/go/src/org/apache/htrace/conf/config_keys.go
+++ b/htrace-core/src/go/src/org/apache/htrace/conf/config_keys.go
@@ -62,6 +62,11 @@
// The log level to use for the logs in htrace.
const HTRACE_LOG_LEVEL = "log.level"
+// A host:port pair to send information to on startup. This is used in unit
+// tests to determine the (random) port of the htraced process that has been
+// started.
+const HTRACE_STARTUP_NOTIFICATION_ADDRESS = "startup.notification.address"
+
// Default values for HTrace configuration keys.
var DEFAULTS = map[string]string{
HTRACE_WEB_ADDRESS: fmt.Sprintf("0.0.0.0:%d", HTRACE_WEB_ADDRESS_DEFAULT_PORT),
diff --git a/htrace-core/src/go/src/org/apache/htrace/htraced/htraced.go b/htrace-core/src/go/src/org/apache/htrace/htraced/htraced.go
index d2cbafc..191b68e 100644
--- a/htrace-core/src/go/src/org/apache/htrace/htraced/htraced.go
+++ b/htrace-core/src/go/src/org/apache/htrace/htraced/htraced.go
@@ -20,7 +20,9 @@
package main
import (
+ "encoding/json"
"fmt"
+ "net"
"org/apache/htrace/common"
"org/apache/htrace/conf"
"os"
@@ -69,12 +71,54 @@
lg.Errorf("Error creating datastore: %s\n", err.Error())
os.Exit(1)
}
- _, err = CreateRestServer(cnf, store)
+ var rsv *RestServer
+ rsv, err = CreateRestServer(cnf, store)
if err != nil {
lg.Errorf("Error creating REST server: %s\n", err.Error())
os.Exit(1)
}
+ naddr := cnf.Get(conf.HTRACE_STARTUP_NOTIFICATION_ADDRESS)
+ if naddr != "" {
+ notif := StartupNotification{
+ HttpAddr: rsv.Addr().String(),
+ ProcessId: os.Getpid(),
+ }
+ err = sendStartupNotification(naddr, ¬if)
+ if err != nil {
+ fmt.Fprintf(os.Stderr, "Failed to send startup notification: "+
+ "%s\n", err.Error())
+ os.Exit(1)
+ }
+ }
for {
time.Sleep(time.Duration(10) * time.Hour)
}
}
+
+// A startup notification message that we optionally send on startup.
+// Used by unit tests.
+type StartupNotification struct {
+ HttpAddr string
+ ProcessId int
+}
+
+func sendStartupNotification(naddr string, notif *StartupNotification) error {
+ conn, err := net.Dial("tcp", naddr)
+ if err != nil {
+ return err
+ }
+ defer func() {
+ if conn != nil {
+ conn.Close()
+ }
+ }()
+ var buf []byte
+ buf, err = json.Marshal(notif)
+ if err != nil {
+ return err
+ }
+ _, err = conn.Write(buf)
+ conn.Close()
+ conn = nil
+ return nil
+}
diff --git a/htrace-core/src/go/src/org/apache/htrace/htraced/rest.go b/htrace-core/src/go/src/org/apache/htrace/htraced/rest.go
index 9cdab20..495aed0 100644
--- a/htrace-core/src/go/src/org/apache/htrace/htraced/rest.go
+++ b/htrace-core/src/go/src/org/apache/htrace/htraced/rest.go
@@ -112,6 +112,7 @@
if !ok {
return
}
+ hand.lg.Debugf("findSidHandler(sid=%s)\n", common.SpanId(sid))
span := hand.store.FindSpan(sid)
if span == nil {
writeError(hand.lg, w, http.StatusNoContent, fmt.Sprintf("No such span as %s\n",
@@ -139,6 +140,7 @@
if !ok {
return
}
+ hand.lg.Debugf("findChildrenHandler(sid=%s, lim=%d)\n", common.SpanId(sid), lim)
children := hand.store.FindChildren(sid, lim)
jbytes, err := json.Marshal(children)
if err != nil {
@@ -170,6 +172,7 @@
}
spans = append(spans, &span)
}
+ hand.lg.Debugf("writeSpansHandler: received %d span(s).\n", len(spans))
for spanIdx := range spans {
hand.lg.Debugf("writing span %s\n", spans[spanIdx].ToJson())
hand.store.WriteSpan(spans[spanIdx])
@@ -238,6 +241,15 @@
w.Write([]byte(rsc))
}
+type logErrorHandler struct {
+ lg *common.Logger
+}
+
+func (hand *logErrorHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
+ hand.lg.Errorf("Got unknown request %s\n", req.RequestURI)
+ writeError(hand.lg, w, http.StatusBadRequest, "Unknown request.")
+}
+
type RestServer struct {
listener net.Listener
lg *common.Logger
@@ -280,6 +292,9 @@
// Default Handler. This will serve requests for static requests.
r.PathPrefix("/").Handler(&defaultServeHandler{lg: rsv.lg}).Methods("GET")
+ // Log an error message for unknown non-GET requests.
+ r.PathPrefix("/").Handler(&logErrorHandler{lg: rsv.lg})
+
go http.Serve(rsv.listener, r)
rsv.lg.Infof("Started REST server on %s...\n", rsv.listener.Addr().String())
diff --git a/htrace-core/src/main/java/org/apache/htrace/impl/MilliSpan.java b/htrace-core/src/main/java/org/apache/htrace/impl/MilliSpan.java
index be5521a..4467208 100644
--- a/htrace-core/src/main/java/org/apache/htrace/impl/MilliSpan.java
+++ b/htrace-core/src/main/java/org/apache/htrace/impl/MilliSpan.java
@@ -50,6 +50,7 @@
private static Random rand = new Random();
private static ObjectWriter JSON_WRITER = new ObjectMapper().writer();
+ private static final long EMPTY_PARENT_ARRAY[] = new long[0];
private long begin;
private long end;
@@ -74,7 +75,7 @@
private long end;
private String description;
private long traceId;
- private long parents[];
+ private long parents[] = EMPTY_PARENT_ARRAY;
private long spanId;
private Map<String, String> traceInfo = null;
private String processId;
diff --git a/htrace-core/src/test/java/org/apache/htrace/util/TestUtil.java b/htrace-core/src/test/java/org/apache/htrace/util/TestUtil.java
new file mode 100644
index 0000000..7cb4aed
--- /dev/null
+++ b/htrace-core/src/test/java/org/apache/htrace/util/TestUtil.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.htrace.util;
+
+import java.io.File;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Utilities for writing unit tests.
+ */
+public class TestUtil {
+ /**
+ * Get a dump of the stack traces of all threads.
+ */
+ public static String threadDump() {
+ StringBuilder dump = new StringBuilder();
+ Map<Thread, StackTraceElement[]> stackTraces = Thread.getAllStackTraces();
+ for (Map.Entry<Thread, StackTraceElement[]> e : stackTraces.entrySet()) {
+ Thread thread = e.getKey();
+ dump.append(String.format(
+ "\"%s\" %s prio=%d tid=%d %s\njava.lang.Thread.State: %s",
+ thread.getName(),
+ (thread.isDaemon() ? "daemon" : ""),
+ thread.getPriority(),
+ thread.getId(),
+ Thread.State.WAITING.equals(thread.getState()) ?
+ "in Object.wait()" : thread.getState().name().toLowerCase(),
+ Thread.State.WAITING.equals(thread.getState()) ?
+ "WAITING (on object monitor)" : thread.getState()));
+ for (StackTraceElement stackTraceElement : e.getValue()) {
+ dump.append("\n at ");
+ dump.append(stackTraceElement);
+ }
+ dump.append("\n");
+ }
+ return dump.toString();
+ }
+
+ /**
+ * A callback which returns a value of type T.
+ *
+ * TODO: remove this when we're on Java 8, in favor of
+ * java.util.function.Supplier.
+ */
+ public interface Supplier<T> {
+ T get();
+ }
+
+ /**
+ * Wait for a condition to become true for a configurable amount of time.
+ *
+ * @param check The condition to wait for.
+ * @param periodMs How often to check the condition, in milliseconds.
+ * @param timeoutMs How long to wait in total, in milliseconds.
+ */
+ public static void waitFor(Supplier<Boolean> check,
+ long periodMs, long timeoutMs)
+ throws TimeoutException, InterruptedException
+ {
+ long endNs = System.nanoTime() +
+ TimeUnit.NANOSECONDS.convert(timeoutMs, TimeUnit.MILLISECONDS);
+ while (true) {
+ boolean result = check.get();
+ if (result) {
+ return;
+ }
+ long nowNs = System.nanoTime();
+ if (nowNs >= endNs) {
+ throw new TimeoutException("Timed out waiting for test condition. " +
+ "Thread dump:\n" + threadDump());
+ }
+ Thread.sleep(periodMs);
+ }
+ }
+}
diff --git a/htrace-htraced/src/main/java/org/apache/htrace/impl/HTracedRESTReceiver.java b/htrace-htraced/src/main/java/org/apache/htrace/impl/HTracedRESTReceiver.java
index 35cd332..d730a17 100644
--- a/htrace-htraced/src/main/java/org/apache/htrace/impl/HTracedRESTReceiver.java
+++ b/htrace-htraced/src/main/java/org/apache/htrace/impl/HTracedRESTReceiver.java
@@ -20,14 +20,13 @@
import java.io.IOException;
import java.net.URL;
-import java.util.Queue;
-import java.util.concurrent.ArrayBlockingQueue;
+import java.util.ArrayDeque;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -74,19 +73,29 @@
final HttpClient httpClient;
/**
+ * The maximum number of spans to buffer.
+ */
+ private final int capacity;
+
+ /**
* REST URL to use writing Spans.
*/
- private final String writeSpansRESTURL;
+ private final String url;
+
+ /**
+ * The maximum number of spans to send in a single PUT.
+ */
+ private final int maxToSendAtATime;
/**
* Runs background task to do the REST PUT.
*/
- private final ScheduledExecutorService scheduler;
+ private final PostSpans postSpans;
/**
- * Keep around reference so can cancel on close any running scheduled task.
+ * Thread for postSpans
*/
- private final ScheduledFuture<?> scheduledFuture;
+ private final Thread postSpansThread;
/**
* Timeout in milliseconds.
@@ -111,8 +120,8 @@
/**
* Period at which the background thread that does the REST POST to htraced in ms.
*/
- public static final String CLIENT_REST_PERIOD_MS_KEY = "client.reset.period.ms";
- private static final int CLIENT_REST_PERIOD_MS_DEFAULT = 1000;
+ public static final String CLIENT_REST_PERIOD_MS_KEY = "client.rest.period.ms";
+ private static final int CLIENT_REST_PERIOD_MS_DEFAULT = 30000;
/**
* Maximum spans to post to htraced at a time.
@@ -122,15 +131,37 @@
private static final int CLIENT_REST_MAX_SPANS_AT_A_TIME_DEFAULT = 100;
/**
- * Simple bounded queue to hold spans between periodic runs of the httpclient.
+ * Lock protecting the PostSpans data.
*/
- private final Queue<Span> queue;
+ private ReentrantLock lock = new ReentrantLock();
+
+ /**
+ * Condition variable used to wake up the PostSpans thread.
+ */
+ private Condition cond = lock.newCondition();
+
+ /**
+ * True if we should shut down.
+ * Protected by the lock.
+ */
+ private boolean shutdown = false;
+
+ /**
+ * Simple bounded queue to hold spans between periodic runs of the httpclient.
+ * Protected by the lock.
+ */
+ private final ArrayDeque<Span> spans;
/**
* Keep last time we logged we were at capacity; used to prevent flooding of logs with
* "at capacity" messages.
*/
- private volatile long lastAtCapacityWarningLog = 0L;
+ private AtomicLong lastAtCapacityWarningLog = new AtomicLong(0L);
+
+ /**
+ * True if we should flush as soon as possible. Protected by the lock.
+ */
+ private boolean mustStartFlush;
/**
* Constructor.
@@ -146,25 +177,25 @@
int timeout = conf.getInt(CLIENT_REST_TIMEOUT_MS_KEY, CLIENT_REST_TIMEOUT_MS_DEFAULT);
this.httpClient.setConnectTimeout(timeout);
this.httpClient.setIdleTimeout(timeout);
- int capacity = conf.getInt(CLIENT_REST_QUEUE_CAPACITY_KEY, CLIENT_REST_QUEUE_CAPACITY_DEFAULT);
- this.queue = new ArrayBlockingQueue<Span>(capacity, true);
+ this.capacity = conf.getInt(CLIENT_REST_QUEUE_CAPACITY_KEY, CLIENT_REST_QUEUE_CAPACITY_DEFAULT);
+ this.spans = new ArrayDeque<Span>(capacity);
// Build up the writeSpans URL.
URL restServer = new URL(conf.get(HTRACED_REST_URL_KEY, HTRACED_REST_URL_DEFAULT));
- URL url =
- new URL(restServer.getProtocol(), restServer.getHost(), restServer.getPort(), "/writeSpans");
- this.writeSpansRESTURL = url.toString();
- // Make a scheduler with one thread to run our POST of spans on a period.
- this.scheduler = Executors.newScheduledThreadPool(1);
+ URL url = new URL(restServer.getProtocol(), restServer.getHost(), restServer.getPort(), "/writeSpans");
+ this.url = url.toString();
// Period at which we run the background thread that does the REST POST to htraced.
int periodInMs = conf.getInt(CLIENT_REST_PERIOD_MS_KEY, CLIENT_REST_PERIOD_MS_DEFAULT);
// Maximum spans to send in one go
- int maxToSendAtATime =
+ this.maxToSendAtATime =
conf.getInt(CLIENT_REST_MAX_SPANS_AT_A_TIME_KEY, CLIENT_REST_MAX_SPANS_AT_A_TIME_DEFAULT);
- this.scheduledFuture =
- this.scheduler.scheduleAtFixedRate(new PostSpans(this.queue, maxToSendAtATime),
- periodInMs, periodInMs, TimeUnit.MILLISECONDS);
// Start up the httpclient.
this.httpClient.start();
+ // Start the background thread.
+ this.postSpans = new PostSpans(periodInMs);
+ this.postSpansThread = new Thread(postSpans);
+ this.postSpansThread.setDaemon(true);
+ this.postSpansThread.setName("PostSpans");
+ this.postSpansThread.start();
}
/**
@@ -172,81 +203,188 @@
* Run on a period. Services the passed in queue taking spans and sending them to traced via http.
*/
private class PostSpans implements Runnable {
- private final Queue<Span> q;
- private final int maxToSendAtATime;
+ private final long periodInNs;
+ private final ArrayDeque<Span> spanBuf;
- private PostSpans(final Queue<Span> q, final int maxToSendAtATime) {
- this.q = q;
- this.maxToSendAtATime = maxToSendAtATime;
+ private PostSpans(long periodInMs) {
+ this.periodInNs = TimeUnit.NANOSECONDS.
+ convert(periodInMs, TimeUnit.MILLISECONDS);
+ this.spanBuf = new ArrayDeque<Span>(maxToSendAtATime);
}
+ /**
+ * The span sending thread.
+ *
+ * We send a batch of spans for one of two reasons: there are already
+ * maxToSendAtATime spans in the buffer, or the client.rest.period.ms
+ * has elapsed. The idea is that we want to strike a balance between
+ * sending a lot of spans at a time, for efficiency purposes, and
+ * making sure that we don't buffer spans locally for too long.
+ *
+ * The longer we buffer spans locally, the longer we will have to wait
+ * to see the results of our client operations in the GUI, and the higher
+ * the risk of losing them if the client crashes.
+ */
@Override
public void run() {
- Span span = null;
- // Cycle until we drain queue. Send maxToSendAtATime if more than this in queue.
- while ((span = this.q.poll()) != null) {
- // We got a span. Send at least this one span.
- Request request = httpClient.newRequest(writeSpansRESTURL).method(HttpMethod.POST);
- request.header(HttpHeader.CONTENT_TYPE, "application/json");
- int count = 1;
- request.content(new StringContentProvider(span.toJson()));
- // Drain queue or until we have maxToSendAtATime spans, if more than just one.
- while ((span = this.q.poll()) != null) {
- request.content(new StringContentProvider(span.toJson()));
- count++;
- // If we've accumulated sufficient to send, go ahead and send what we have. Can do the
- // rest in out next go around.
- if (count > this.maxToSendAtATime) break;
- }
- try {
- ContentResponse response = request.send();
- if (response.getStatus() == HttpStatus.OK_200) {
- if (LOG.isDebugEnabled()) LOG.debug("POSTED " + count + " spans");
- } else {
- LOG.error("Status: " + response.getStatus());
- LOG.error(response.getHeaders());
- LOG.error(response.getContentAsString());
+ long waitNs;
+ try {
+ waitNs = periodInNs;
+ while (true) {
+ lock.lock();
+ try {
+ if (shutdown) {
+ LOG.info("Shutting down PostSpans thread...");
+ break;
+ }
+ try {
+ waitNs = cond.awaitNanos(waitNs);
+ if (mustStartFlush) {
+ waitNs = 0;
+ mustStartFlush = false;
+ }
+ } catch (InterruptedException e) {
+ LOG.info("Got InterruptedException");
+ waitNs = 0;
+ }
+ if ((spans.size() > maxToSendAtATime) || (waitNs <= 0)) {
+ loadSpanBuf();
+ waitNs = periodInNs;
+ }
+ } finally {
+ lock.unlock();
}
- } catch (InterruptedException e) {
- LOG.error(e);
- } catch (TimeoutException e) {
- LOG.error(e);
- } catch (ExecutionException e) {
- LOG.error(e);
+ // Once the lock has been safely released, we can do some network
+ // I/O without blocking the client process.
+ if (!spanBuf.isEmpty()) {
+ sendSpans();
+ spanBuf.clear();
+ }
}
+ } finally {
+ if (httpClient != null) {
+ try {
+ httpClient.stop();
+ } catch (Exception e) {
+ LOG.error("Error shutting down httpClient", e);
+ }
+ }
+ spans.clear();
+ }
+ }
+
+ private void loadSpanBuf() {
+ for (int loaded = 0; loaded < maxToSendAtATime; loaded++) {
+ Span span = spans.pollFirst();
+ if (span == null) {
+ return;
+ }
+ spanBuf.add(span);
+ }
+ }
+
+ private void sendSpans() {
+ try {
+ Request request = httpClient.newRequest(url).method(HttpMethod.POST);
+ request.header(HttpHeader.CONTENT_TYPE, "application/json");
+ StringBuilder bld = new StringBuilder();
+ for (Span span : spanBuf) {
+ bld.append(span.toJson());
+ }
+ request.content(new StringContentProvider(bld.toString()));
+ ContentResponse response = request.send();
+ if (response.getStatus() == HttpStatus.OK_200) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("POSTED " + spanBuf.size() + " spans");
+ }
+ } else {
+ LOG.error("Status: " + response.getStatus());
+ LOG.error(response.getHeaders());
+ LOG.error(response.getContentAsString());
+ }
+ } catch (InterruptedException e) {
+ LOG.error(e);
+ } catch (TimeoutException e) {
+ LOG.error(e);
+ } catch (ExecutionException e) {
+ LOG.error(e);
}
}
}
@Override
public void close() throws IOException {
- if (this.scheduledFuture != null) this.scheduledFuture.cancel(true);
- if (this.scheduler == null) this.scheduler.shutdown();
- if (this.httpClient != null) {
- try {
- this.httpClient.stop();
- } catch (Exception e) {
- throw new IOException(e);
- }
+ LOG.info("Closing HTracedRESTReceiver.");
+ lock.lock();
+ try {
+ this.shutdown = true;
+ cond.signal();
+ } finally {
+ lock.unlock();
+ }
+ try {
+ postSpansThread.join(30000);
+ } catch (InterruptedException e) {
+ LOG.error("Interrupted while joining postSpans", e);
}
}
- // @VisibleForTesting
- boolean isQueueEmpty() {
- return this.queue.isEmpty();
+ /**
+ * Start flushing the buffered spans.
+ *
+ * Note that even after calling this function, you will still have to wait
+ * for the flush to finish happening. This function just starts the flush;
+ * it does not block until it has completed. You also do not get
+ * "read-after-write consistency" with htraced... the spans that are
+ * written may be buffered for a short period of time prior to being
+ * readable. This is not a problem for production use (since htraced is not
+ * a database), but it means that most unit tests will need a loop in their
+ * "can I read what I wrote" tests.
+ */
+ void startFlushing() {
+ LOG.info("Triggering HTracedRESTReceiver flush.");
+ lock.lock();
+ try {
+ mustStartFlush = true;
+ cond.signal();
+ } finally {
+ lock.unlock();
+ }
}
private static long WARN_TIMEOUT_MS = 300000;
@Override
public void receiveSpan(Span span) {
- if (!this.queue.offer(span)) {
- // TODO: If failed the offer, run the background thread now. I can't block though?
+ boolean added = false;
+ lock.lock();
+ try {
+ if (spans.size() < capacity) {
+ spans.add(span);
+ added = true;
+ if (spans.size() >= maxToSendAtATime) {
+ cond.signal();
+ }
+ } else {
+ cond.signal();
+ }
+ } finally {
+ lock.unlock();
+ }
+ if (!added) {
long now = System.nanoTime() / 1000000L;
- // Only log every 5 minutes. Any more than this for a guest process is obnoxious
- if (now - lastAtCapacityWarningLog > WARN_TIMEOUT_MS) {
- LOG.warn("At capacity");
- this.lastAtCapacityWarningLog = now;
+ long last = lastAtCapacityWarningLog.get();
+ if (now - last > WARN_TIMEOUT_MS) {
+ // Only log every 5 minutes. Any more than this for a guest process
+ // is obnoxious.
+ if (lastAtCapacityWarningLog.compareAndSet(last, now)) {
+ // If the atomic-compare-and-set succeeds, we should log. Otherwise,
+ // we should assume another thread already logged and bumped up the
+ // value of lastAtCapacityWarning sometime between our get and the
+ // "if" statement.
+ LOG.warn("There are too many HTrace spans to buffer! We have " +
+ "already buffered " + capacity + " spans. Dropping spans.");
+ }
}
}
}
diff --git a/htrace-htraced/src/test/java/org/apache/htrace/impl/TestHTracedRESTReceiver.java b/htrace-htraced/src/test/java/org/apache/htrace/impl/TestHTracedRESTReceiver.java
index fe9f1c0..b1f1b11 100644
--- a/htrace-htraced/src/test/java/org/apache/htrace/impl/TestHTracedRESTReceiver.java
+++ b/htrace-htraced/src/test/java/org/apache/htrace/impl/TestHTracedRESTReceiver.java
@@ -18,6 +18,7 @@
package org.apache.htrace.impl;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.io.File;
@@ -29,6 +30,7 @@
import org.apache.htrace.Span;
import org.apache.htrace.util.DataDir;
import org.apache.htrace.util.HTracedProcess;
+import org.apache.htrace.util.TestUtil;
import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.http.HttpStatus;
import org.junit.After;
@@ -36,20 +38,20 @@
import org.junit.Test;
public class TestHTracedRESTReceiver {
- private static final Log LOG = LogFactory.getLog(TestHTracedRESTReceiver.class);
- private URL restServerUrl;;
+ private static final Log LOG =
+ LogFactory.getLog(TestHTracedRESTReceiver.class);
+ private URL restServerUrl;
private DataDir dataDir;
HTracedProcess htraced;
@Before
public void setUp() throws Exception {
this.dataDir = new DataDir();
- // Start on 9097. Would be better to start at port 0 and then ask server what port it managed
- // to come up on.
- this.restServerUrl = new URL("http://localhost:9097/");
File tlDir = DataDir.getTopLevelOfCheckout(this.dataDir.getDataDir());
File pathToHTracedBinary = HTracedProcess.getPathToHTraceBinaryFromTopLevel(tlDir);
- this.htraced = new HTracedProcess(pathToHTracedBinary, dataDir.getDataDir(), restServerUrl);
+ this.htraced = new HTracedProcess(pathToHTracedBinary,
+ dataDir.getDataDir(), "localhost");
+ this.restServerUrl = new URL("http://" + htraced.getHttpAddr() + "/");
}
@After
@@ -75,6 +77,7 @@
@Override
public String get(String key, String defaultValue) {
if (key.equals(HTracedRESTReceiver.HTRACED_REST_URL_KEY)) {
+ LOG.info("WATERMELON2: got request for htraced.rest.url. Returning " + this.restServerUrl.toString());
return this.restServerUrl.toString();
}
return defaultValue;
@@ -90,7 +93,7 @@
HTracedRESTReceiver receiver =
new HTracedRESTReceiver(new TestHTraceConfiguration(this.restServerUrl));
try {
- // Do basic a GET /server/info against localhost:9095 htraced
+ // Do basic a GET /server/info against htraced
ContentResponse response = receiver.httpClient.GET(restServerUrl + "server/info");
assertEquals("application/json", response.getMediaType());
String content = processGET(response);
@@ -111,31 +114,44 @@
* Send 100 spans then confirm they made it in.
* @throws Exception
*/
- @Test (timeout = 10000)
+ @Test (timeout = 60000)
public void testSendingSpans() throws Exception {
- HTracedRESTReceiver receiver =
+ final HTracedRESTReceiver receiver =
new HTracedRESTReceiver(new TestHTraceConfiguration(this.restServerUrl));
+ final int NUM_SPANS = 3;
try {
- // TODO: Fix MilliSpan. Requires a parentid. Shouldn't have to have one else be explicit it
- // is required.
- for (int i = 0; i < 100; i++) {
+ for (int i = 0; i < NUM_SPANS; i++) {
Span span = new MilliSpan.Builder().parents(new long [] {1L}).spanId(i).build();
LOG.info(span.toString());
receiver.receiveSpan(span);
}
- // Wait for the queue to empty before we go to check they made it over.
- while (receiver.isQueueEmpty()) Thread.sleep(1);
- // Read them all back.
- for (int i = 0; i < 100; i++) {
- // This is what the REST server expends when querying for a span id.
- String findSpan = String.format("span/%016x", i);
- ContentResponse response = receiver.httpClient.GET(restServerUrl + findSpan);
- String content = processGET(response);
- assertTrue(content != null && content.length() > 0);
- LOG.info(content);
- }
+ receiver.startFlushing();
+ TestUtil.waitFor(new TestUtil.Supplier<Boolean>() {
+ @Override
+ public Boolean get() {
+ try {
+ for (int i = 0; i < NUM_SPANS; i++) {
+ // This is what the REST server expects when querying for a
+ // span id.
+ String findSpan = String.format("span/%016x", i);
+ ContentResponse response =
+ receiver.httpClient.GET(restServerUrl + findSpan);
+ String content = processGET(response);
+ if ((content == null) || (content.length() == 0)) {
+ LOG.info("Failed to find span " + i);
+ return false;
+ }
+ LOG.info("Got " + content + " for span " + i);
+ }
+ return true;
+ } catch (Throwable t) {
+ LOG.error("Got exception", t);
+ return false;
+ }
+ }
+ }, 10, 20000);
} finally {
receiver.close();
}
}
-}
\ No newline at end of file
+}
diff --git a/htrace-htraced/src/test/java/org/apache/htrace/util/HTracedProcess.java b/htrace-htraced/src/test/java/org/apache/htrace/util/HTracedProcess.java
index 12343f7..e319925 100644
--- a/htrace-htraced/src/test/java/org/apache/htrace/util/HTracedProcess.java
+++ b/htrace-htraced/src/test/java/org/apache/htrace/util/HTracedProcess.java
@@ -16,12 +16,19 @@
*/
package org.apache.htrace.util;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.lang.ProcessBuilder.Redirect;
+import java.net.ServerSocket;
+import java.net.Socket;
import java.net.URL;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
/**
* To get instance of HTraced up and running, create an instance of this class.
@@ -29,30 +36,86 @@
* host data (leveldbs and logs).
* TODO: We expect to find the htraced in a very particular place. Fragile. Will break if stuff
* moves.
- * TODO: What if a port clash? How to have it come up another port then ask the process what port
- * it is running on?
*/
public class HTracedProcess extends Process {
+ private static final Log LOG = LogFactory.getLog(HTracedProcess.class);
private final Process delegate;
- public HTracedProcess(final File pathToHTracedBinary, final File dataDir, final URL url)
- throws IOException {
- // web.address for htraced is hostname ':' port; no 'scheme' yet.
- String webAddress = url.getHost() + ":" + url.getPort();
- // Pass cmdline args to htraced to it uses our test dir for data.
- ProcessBuilder pb = new ProcessBuilder(pathToHTracedBinary.toString(),
- " -Dlog.level=TRACE",
- "-Dweb.address=" + webAddress,
- "-Ddata.store.clear=true",
- "-Ddata.store.directories=" + dataDir.toString());
- pb.redirectErrorStream(true);
- // Inherit STDERR/STDOUT i/o; dumps on console for now. Can add logs later.
- pb.inheritIO();
- pb.directory(dataDir);
- this.delegate = pb.start();
- assert pb.redirectInput() == Redirect.PIPE;
- assert pb.redirectOutput().file() == dataDir;
- assert this.delegate.getInputStream().read() == -1;
+ private final String httpAddr;
+
+ /**
+ * Data send back from the HTraced process on the notification port.
+ */
+ public static class StartupNotificationData {
+ /**
+ * The hostname:port pair which the HTraced process uses for HTTP requests.
+ */
+ @JsonProperty("HttpAddr")
+ String httpAddr;
+
+ /**
+ * The process ID of the HTraced process.
+ */
+ @JsonProperty("ProcessId")
+ long processId;
+ }
+
+ public HTracedProcess(final File binPath, final File dataDir,
+ final String host) throws IOException {
+ // Create a notifier socket bound to a random port.
+ ServerSocket listener = new ServerSocket(0);
+ boolean success = false;
+ Process process = null;
+ try {
+ // Use a random port for the web address. No 'scheme' yet.
+ String webAddress = host + ":0";
+ String logPath = new File(dataDir, "log.txt").getAbsolutePath();
+ // Pass cmdline args to htraced to it uses our test dir for data.
+ ProcessBuilder pb = new ProcessBuilder(binPath.toString(),
+ "-Dlog.level=TRACE",
+ "-Dlog.path=" + logPath,
+ "-Dweb.address=" + webAddress,
+ "-Ddata.store.clear=true",
+ "-Dstartup.notification.address=localhost:" + listener.getLocalPort(),
+ "-Ddata.store.directories=" + dataDir.toString());
+ pb.redirectErrorStream(true);
+ // Inherit STDERR/STDOUT i/o; dumps on console for now. Can add logs later.
+ pb.inheritIO();
+ pb.directory(dataDir);
+ //assert pb.redirectInput() == Redirect.PIPE;
+ //assert pb.redirectOutput().file() == dataDir;
+ process = pb.start();
+ assert process.getInputStream().read() == -1;
+ StartupNotificationData data = readStartupNotification(listener);
+ httpAddr = data.httpAddr;
+ LOG.info("Started htraced process " + data.processId + " with http " +
+ "address " + data.httpAddr + ", logging to " + logPath);
+ success = true;
+ } finally {
+ if (!success) {
+ // Clean up after failure
+ if (process != null) {
+ process.destroy();
+ process = null;
+ }
+ }
+ delegate = process;
+ listener.close();
+ }
+ }
+
+ private static StartupNotificationData
+ readStartupNotification(ServerSocket listener) throws IOException {
+ Socket socket = listener.accept();
+ try {
+ InputStream in = socket.getInputStream();
+ ObjectMapper objectMapper = new ObjectMapper();
+ StartupNotificationData data = objectMapper.
+ readValue(in, StartupNotificationData.class);
+ return data;
+ } finally {
+ socket.close();
+ }
}
public int hashCode() {
@@ -91,6 +154,10 @@
return delegate.toString();
}
+ public String getHttpAddr() {
+ return httpAddr;
+ }
+
/**
* Ugly but how else to do file-math?
* @param topLevel Presumes top-level of the htrace checkout.
@@ -100,4 +167,4 @@
return new File(new File(new File(new File(new File(topLevel, "htrace-core"), "src"), "go"),
"build"), "htraced");
}
-}
\ No newline at end of file
+}
diff --git a/htrace-htraced/src/test/java/org/apache/htrace/util/TestHTracedProcess.java b/htrace-htraced/src/test/java/org/apache/htrace/util/TestHTracedProcess.java
index 38f90e5..67e3a21 100644
--- a/htrace-htraced/src/test/java/org/apache/htrace/util/TestHTracedProcess.java
+++ b/htrace-htraced/src/test/java/org/apache/htrace/util/TestHTracedProcess.java
@@ -25,6 +25,8 @@
import java.net.URL;
import java.net.URLConnection;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.junit.Before;
import org.junit.Test;
@@ -34,6 +36,8 @@
* in methods in the below.
*/
public class TestHTracedProcess {
+ private static final Log LOG =
+ LogFactory.getLog(TestHTracedProcess.class);
private DataDir testDir = null;
private final int TIMEOUT = 10000;
@@ -51,7 +55,8 @@
connection.setReadTimeout(TIMEOUT);
connection.connect();
StringBuffer sb = new StringBuffer();
- BufferedReader reader = new BufferedReader(new InputStreamReader(connection.getInputStream()));
+ BufferedReader reader = new BufferedReader(
+ new InputStreamReader(connection.getInputStream()));
try {
String line = null;
while ((line = reader.readLine()) != null) {
@@ -70,24 +75,26 @@
* @throws InterruptedException
*/
@Test (timeout=10000)
- public void testStartStopHTraced() throws IOException, InterruptedException {
- // TODO: Make the test port random so no classes if concurrent test runs. Anything better
- // I can do here? Pass a zero and have the daemon tell me where it is successfully listening?
- String restURL = "http://localhost:9096/";
- URL restServerURL = new URL(restURL);
+ public void testStartStopHTraced() throws Exception {
HTracedProcess htraced = null;
File dataDir = this.testDir.getDataDir();
File topLevel = DataDir.getTopLevelOfCheckout(dataDir);
try {
- htraced = new HTracedProcess(HTracedProcess.getPathToHTraceBinaryFromTopLevel(topLevel),
- dataDir, restServerURL);
- String str = doGet(new URL(restServerURL + "server/info"));
+ htraced = new HTracedProcess(HTracedProcess.
+ getPathToHTraceBinaryFromTopLevel(topLevel),
+ dataDir, "localhost");
+ LOG.info("Started HTracedProcess with REST server URL " +
+ htraced.getHttpAddr());
+ String str = doGet(new URL(
+ "http://" + htraced.getHttpAddr() + "/server/info"));
// Assert we go something back.
assertTrue(str.contains("ReleaseVersion"));
// Assert that the datadir is not empty.
} finally {
- if (htraced != null) htraced.destroy();
- System.out.println("ExitValue=" + htraced.exitValue());
+ if (htraced != null) {
+ htraced.destroy();
+ System.out.println("ExitValue=" + htraced.waitFor());
+ }
}
}
}
\ No newline at end of file