blob: ae1cfedd3a1c20e8b5556705be0a636d1b718f27 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.htrace.impl;
import java.io.IOException;
import java.net.URL;
import java.util.ArrayDeque;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.htrace.HTraceConfiguration;
import org.apache.htrace.Span;
import org.apache.htrace.SpanReceiver;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.util.StringContentProvider;
import org.eclipse.jetty.http.HttpField;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.http.HttpStatus;
/**
 * A {@link SpanReceiver} that passes Spans to htraced via REST. The implementation minimizes
 * dependencies and aims for a small footprint, since this client is a guest inside another
 * process: the one being traced.
 *
 * <p>Logs via commons-logging. Uses the jetty client, which has its own logging. To route jetty
 * logging to commons-logging, see https://issues.apache.org/jira/browse/HADOOP-6807
 * and http://blogs.bytecode.com.au/glen/2005/06/21/getting-your-logging-working-in-jetty.html.
 *
 * <p>This client depends on the REST API defined in <code>rest.go</code> in the htraced REST
 * server.
 *
 * <p>Create an instance by doing:
 * <code>SpanReceiver receiver = new HTracedRESTReceiver(conf);</code> where conf is an instance
 * of {@link HTraceConfiguration}. See the public keys defined below for what we will look for in
 * the configuration. For example, set {@link #HTRACED_REST_URL_KEY} if
 * <code>htraced</code> is in a non-standard location. Then call
 * <code>receiver.receiveSpan(Span);</code> to send spans to an htraced
 * instance. This method returns immediately; spans are sent in the background.
 *
 * <p>TODO: Shading works?
 * TODO: Add lazy start; don't start background thread till a span gets queued.
 * TODO: Add some metrics; how many times we've run, how many spans and what size we've sent.
 */
public class HTracedRESTReceiver implements SpanReceiver {
  private static final Log LOG = LogFactory.getLog(HTracedRESTReceiver.class);

  // TODO: Take process name and add this to user agent? Would help debugging?
  // @VisibleForTesting Package-private so accessible from tests.
  final HttpClient httpClient;

  /**
   * The maximum number of spans to buffer.
   */
  private final int capacity;

  /**
   * REST URL to use writing Spans.
   */
  private final String url;

  /**
   * The maximum number of spans to send in a single POST.
   */
  private final int maxToSendAtATime;

  /**
   * Runs background task to do the REST POST.
   */
  private final PostSpans postSpans;

  /**
   * Thread for postSpans.
   */
  private final Thread postSpansThread;

  /**
   * Timeout in milliseconds.
   * For now, it is used as both the connect timeout and the idle timeout.
   */
  public static final String CLIENT_REST_TIMEOUT_MS_KEY = "client.rest.timeout.ms";
  private static final int CLIENT_REST_TIMEOUT_MS_DEFAULT = 60000;

  /**
   * URL of the htraced REST server we are to talk to.
   */
  public static final String HTRACED_REST_URL_KEY = "htraced.rest.url";
  private static final String HTRACED_REST_URL_DEFAULT = "http://localhost:9095/";

  /**
   * Maximum size of the queue to accumulate spans in.
   * Drained by the background thread that does the REST POST to htraced.
   */
  public static final String CLIENT_REST_QUEUE_CAPACITY_KEY = "client.rest.queue.capacity";
  private static final int CLIENT_REST_QUEUE_CAPACITY_DEFAULT = 1000000;

  /**
   * Period, in milliseconds, at which the background thread POSTs buffered spans to htraced.
   * Must be positive.
   */
  public static final String CLIENT_REST_PERIOD_MS_KEY = "client.rest.period.ms";
  private static final int CLIENT_REST_PERIOD_MS_DEFAULT = 30000;

  /**
   * Maximum spans to post to htraced at a time. Must be positive.
   * Note that, unlike the other keys, this one carries an "htrace." prefix.
   */
  public static final String CLIENT_REST_MAX_SPANS_AT_A_TIME_KEY =
      "htrace.client.rest.batch.size";
  private static final int CLIENT_REST_MAX_SPANS_AT_A_TIME_DEFAULT = 100;

  /**
   * Minimum interval, in milliseconds, between two "at capacity" warnings.
   */
  private static final long WARN_TIMEOUT_MS = 300000;

  /**
   * Lock protecting the PostSpans data.
   */
  private final ReentrantLock lock = new ReentrantLock();

  /**
   * Condition variable used to wake up the PostSpans thread.
   */
  private final Condition cond = lock.newCondition();

  /**
   * True if we should shut down. Also set by the PostSpans thread when it exits, so that
   * receiveSpan stops queueing spans that will never be sent.
   * Protected by the lock.
   */
  private boolean shutdown = false;

  /**
   * Simple bounded queue to hold spans between periodic runs of the httpclient.
   * Protected by the lock.
   */
  private final ArrayDeque<Span> spans;

  /**
   * Keep last time we logged we were at capacity; used to prevent flooding of logs with
   * "at capacity" messages.
   */
  private final AtomicLong lastAtCapacityWarningLog = new AtomicLong(0L);

  /**
   * True if we should flush as soon as possible. Protected by the lock.
   */
  private boolean mustStartFlush;

  /**
   * Constructor.
   * You must call {@link #close()} post construction when done.
   *
   * @param conf configuration; see the public {@code *_KEY} constants for the keys read.
   * @throws IllegalArgumentException if the configured period or batch size is not positive.
   * @throws Exception if the htraced URL is malformed or the http client fails to start.
   */
  public HTracedRESTReceiver(final HTraceConfiguration conf) throws Exception {
    // Read and validate all configuration before starting anything, so that a bad
    // configuration cannot leave a started http client or thread behind.
    int timeout = conf.getInt(CLIENT_REST_TIMEOUT_MS_KEY, CLIENT_REST_TIMEOUT_MS_DEFAULT);
    this.capacity = conf.getInt(CLIENT_REST_QUEUE_CAPACITY_KEY, CLIENT_REST_QUEUE_CAPACITY_DEFAULT);
    // Build up the writeSpans URL.
    URL restServer = new URL(conf.get(HTRACED_REST_URL_KEY, HTRACED_REST_URL_DEFAULT));
    URL writeSpansUrl = new URL(restServer.getProtocol(), restServer.getHost(),
        restServer.getPort(), "/writeSpans");
    this.url = writeSpansUrl.toString();
    // Period at which we run the background thread that does the REST POST to htraced.
    int periodInMs = conf.getInt(CLIENT_REST_PERIOD_MS_KEY, CLIENT_REST_PERIOD_MS_DEFAULT);
    if (periodInMs <= 0) {
      // A non-positive period would make the sender thread spin without ever sleeping.
      throw new IllegalArgumentException(CLIENT_REST_PERIOD_MS_KEY +
          " must be positive, but was " + periodInMs);
    }
    // Maximum spans to send in one go.
    this.maxToSendAtATime =
        conf.getInt(CLIENT_REST_MAX_SPANS_AT_A_TIME_KEY, CLIENT_REST_MAX_SPANS_AT_A_TIME_DEFAULT);
    if (this.maxToSendAtATime <= 0) {
      // A non-positive batch size means no span could ever be sent.
      throw new IllegalArgumentException(CLIENT_REST_MAX_SPANS_AT_A_TIME_KEY +
          " must be positive, but was " + this.maxToSendAtATime);
    }
    this.spans = new ArrayDeque<Span>(capacity);

    this.httpClient = new HttpClient();
    this.httpClient.setUserAgentField(new HttpField(HttpHeader.USER_AGENT,
        this.getClass().getSimpleName()));
    // Use same timeout for connection and idle for now.
    this.httpClient.setConnectTimeout(timeout);
    this.httpClient.setIdleTimeout(timeout);
    // Start up the httpclient.
    this.httpClient.start();

    // Start the background thread.
    this.postSpans = new PostSpans(periodInMs);
    this.postSpansThread = new Thread(postSpans);
    this.postSpansThread.setDaemon(true);
    this.postSpansThread.setName("PostSpans");
    this.postSpansThread.start();
    if (LOG.isDebugEnabled()) {
      LOG.debug("Created new HTracedRESTReceiver with timeout=" + timeout +
          ", capacity=" + capacity + ", url=" + this.url + ", periodInMs=" +
          periodInMs + ", maxToSendAtATime=" + maxToSendAtATime);
    }
  }

  /**
   * POST spans runnable.
   * Runs on a period. Services the shared queue, taking spans and sending them to htraced
   * via http.
   */
  private class PostSpans implements Runnable {
    private final long periodInNs;

    /**
     * Batch being sent. Only touched by the PostSpans thread, so it needs no lock.
     */
    private final ArrayDeque<Span> spanBuf;

    private PostSpans(long periodInMs) {
      this.periodInNs = TimeUnit.MILLISECONDS.toNanos(periodInMs);
      this.spanBuf = new ArrayDeque<Span>(maxToSendAtATime);
    }

    /**
     * The span sending thread.
     *
     * We send a batch of spans for one of three reasons: there are already
     * maxToSendAtATime spans in the buffer, a flush was requested via
     * startFlushing(), or the client.rest.period.ms has elapsed. The idea is
     * that we want to strike a balance between sending a lot of spans at a
     * time, for efficiency purposes, and making sure that we don't buffer
     * spans locally for too long.
     *
     * The longer we buffer spans locally, the longer we will have to wait
     * to see the results of our client operations in the GUI, and the higher
     * the risk of losing them if the client crashes.
     */
    @Override
    public void run() {
      long waitNs = periodInNs;
      try {
        while (true) {
          lock.lock();
          try {
            if (shutdown) {
              LOG.info("Shutting down PostSpans thread...");
              break;
            }
            // A signal sent while we were busy sending (outside the lock) is not
            // remembered by the condition, so only sleep if nothing is already
            // asking for a send. Otherwise we could sleep a whole period with a
            // pending flush request or a full batch.
            if (!mustStartFlush && (spans.size() < maxToSendAtATime)) {
              try {
                waitNs = cond.awaitNanos(waitNs);
              } catch (InterruptedException e) {
                LOG.info("Got InterruptedException");
                waitNs = 0;
              }
            }
            if (mustStartFlush) {
              waitNs = 0;
              mustStartFlush = false;
            }
            if ((spans.size() >= maxToSendAtATime) || (waitNs <= 0)) {
              loadSpanBuf();
              waitNs = periodInNs;
            }
          } finally {
            lock.unlock();
          }
          // Once the lock has been safely released, we can do some network
          // I/O without blocking the client process.
          if (!spanBuf.isEmpty()) {
            try {
              sendSpans();
            } finally {
              spanBuf.clear();
            }
          }
        }
      } finally {
        try {
          httpClient.stop();
        } catch (Exception e) {
          LOG.error("Error shutting down httpClient", e);
        }
        lock.lock();
        try {
          // Whether we exit normally or because of an unexpected error, nobody
          // will drain the queue any more: stop receiveSpan from filling it and
          // release what is buffered. The queue is only safe to touch under the lock.
          shutdown = true;
          spans.clear();
        } finally {
          lock.unlock();
        }
      }
    }

    /**
     * Moves up to maxToSendAtATime spans from the shared queue into spanBuf.
     * Must be called with the lock held.
     */
    private void loadSpanBuf() {
      for (int loaded = 0; loaded < maxToSendAtATime; loaded++) {
        Span span = spans.pollFirst();
        if (span == null) {
          return;
        }
        spanBuf.add(span);
      }
    }

    /**
     * POSTs the contents of spanBuf to htraced. Failures are logged, not thrown,
     * so that one bad batch does not kill the sending thread.
     */
    private void sendSpans() {
      try {
        Request request = httpClient.newRequest(url).method(HttpMethod.POST);
        request.header(HttpHeader.CONTENT_TYPE, "application/json");
        StringBuilder bld = new StringBuilder();
        for (Span span : spanBuf) {
          bld.append(span.toJson());
        }
        request.content(new StringContentProvider(bld.toString()));
        ContentResponse response = request.send();
        if (response.getStatus() == HttpStatus.OK_200) {
          if (LOG.isDebugEnabled()) {
            LOG.debug("POSTED " + spanBuf.size() + " spans");
          }
        } else {
          LOG.error("Status: " + response.getStatus());
          LOG.error(response.getHeaders());
          LOG.error(response.getContentAsString());
        }
      } catch (InterruptedException e) {
        LOG.error("Interrupted while " + describeBatch(), e);
      } catch (TimeoutException e) {
        LOG.error("Timed out while " + describeBatch(), e);
      } catch (ExecutionException e) {
        LOG.error("Error while " + describeBatch(), e);
      } catch (RuntimeException e) {
        // This is the boundary of a background thread: an unexpected error must
        // not stop all future span delivery.
        LOG.error("Unexpected error while " + describeBatch(), e);
      }
    }

    /**
     * Describes the batch being sent, for log messages.
     */
    private String describeBatch() {
      return "POSTing " + spanBuf.size() + " spans to " + url;
    }
  }

  /**
   * Asks the PostSpans thread to shut down and waits up to 30 seconds for it to exit.
   * Spans still buffered at that point are discarded.
   */
  @Override
  public void close() throws IOException {
    LOG.info("Closing HTracedRESTReceiver(" + url + ").");
    lock.lock();
    try {
      this.shutdown = true;
      cond.signal();
    } finally {
      lock.unlock();
    }
    try {
      postSpansThread.join(30000);
    } catch (InterruptedException e) {
      LOG.error("Interrupted while joining postSpans", e);
      // Preserve the caller's interrupt status.
      Thread.currentThread().interrupt();
    }
  }

  /**
   * Start flushing the buffered spans.
   *
   * Note that even after calling this function, you will still have to wait
   * for the flush to finish happening. This function just starts the flush;
   * it does not block until it has completed. You also do not get
   * "read-after-write consistency" with htraced... the spans that are
   * written may be buffered for a short period of time prior to being
   * readable. This is not a problem for production use (since htraced is not
   * a database), but it means that most unit tests will need a loop in their
   * "can I read what I wrote" tests.
   */
  void startFlushing() {
    LOG.info("Triggering HTracedRESTReceiver flush.");
    lock.lock();
    try {
      mustStartFlush = true;
      cond.signal();
    } finally {
      lock.unlock();
    }
  }

  /**
   * Queues a span for sending in the background. Never blocks on network I/O.
   * If the queue is full the span is dropped (with a rate-limited warning); after
   * close() or after the sender thread has exited, spans are dropped silently.
   */
  @Override
  public void receiveSpan(Span span) {
    boolean added = false;
    lock.lock();
    try {
      if (shutdown) {
        // Nothing will ever drain the queue again; don't let spans pile up.
        return;
      }
      if (spans.size() < capacity) {
        spans.add(span);
        added = true;
        if (spans.size() >= maxToSendAtATime) {
          cond.signal();
        }
      } else {
        cond.signal();
      }
    } finally {
      lock.unlock();
    }
    if (!added) {
      long now = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
      long last = lastAtCapacityWarningLog.get();
      if (now - last > WARN_TIMEOUT_MS) {
        // Only log every 5 minutes. Any more than this for a guest process
        // is obnoxious.
        if (lastAtCapacityWarningLog.compareAndSet(last, now)) {
          // If the atomic-compare-and-set succeeds, we should log. Otherwise,
          // we should assume another thread already logged and bumped up the
          // value of lastAtCapacityWarning sometime between our get and the
          // "if" statement.
          LOG.warn("There are too many HTrace spans to buffer! We have " +
              "already buffered " + capacity + " spans. Dropping spans.");
        }
      }
    }
  }
}