blob: 377d19f7c71c4f0aa7552820d0de6bb8bab42e9c [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.htrace.impl;
import java.io.IOException;
import java.net.URL;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.htrace.core.Span;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.api.ContentProvider;
import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.http.HttpField;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.http.HttpStatus;
/**
 * A {@link BufferManager} that accumulates spans as JSON text and sends them
 * with an HTTP POST to the <code>/writeSpans</code> path of the configured
 * endpoint.
 *
 * The request body is assembled from three buffers which are sent back to
 * back without copying: a prequel (the opening of the JSON document), the
 * span buffer (each span preceded by a comma), and an epilogue (the closing
 * of the JSON document).  The leading comma of the span buffer is skipped
 * when the request is sent.
 *
 * The expected call sequence, as far as this class is concerned, is
 * {@link #writeSpan} any number of times, then {@link #prepare}, then
 * {@link #flush}, then {@link #clear}.
 */
class RestBufferManager implements BufferManager {
  private static final Log LOG = LogFactory.getLog(RestBufferManager.class);
  private static final Charset UTF8 = Charset.forName("UTF-8");

  /** The ASCII comma which is written in front of every span. */
  private static final byte COMMA_BYTE = (byte)0x2c;

  /** Capacity, in bytes, of the buffer holding the start of the request. */
  private static final int MAX_PREQUEL_LENGTH = 512;

  /** Capacity, in bytes, of the buffer holding the end of the request. */
  private static final int MAX_EPILOGUE_LENGTH = 32;

  /** The text placed in front of the span data in the request body. */
  private static final String PREQUEL_STRING = "{\"Spans\":[";

  /** The text placed after the span data in the request body. */
  private static final String EPILOGUE_STRING = "]}";

  private final Conf conf;
  private final HttpClient httpClient;
  private final String urlString;
  private final ByteBuffer prequel;
  private final ByteBuffer spans;
  private final ByteBuffer epilogue;

  /** The number of spans written since the last call to {@link #clear}. */
  private int numSpans;

  /**
   * A Jetty content provider which hands out a fixed array of buffers, in
   * order, as the body of a request.
   */
  private static class RestBufferManagerContentProvider
      implements ContentProvider {
    private final ByteBuffer[] bufs;

    /** Iterates over the buffers of the enclosing provider, in array order. */
    private class ByteBufferIterator implements Iterator<ByteBuffer> {
      /** Index of the most recently returned buffer, or -1 if none yet. */
      private int bufIdx = -1;

      @Override
      public boolean hasNext() {
        return (bufIdx + 1) < bufs.length;
      }

      @Override
      public ByteBuffer next() {
        if (!hasNext()) {
          throw new NoSuchElementException();
        }
        bufIdx++;
        return bufs[bufIdx];
      }

      @Override
      public void remove() {
        throw new UnsupportedOperationException();
      }
    }

    RestBufferManagerContentProvider(ByteBuffer[] bufs) {
      this.bufs = bufs;
    }

    /**
     * @return The sum of the bytes remaining in all of the buffers.
     */
    @Override
    public long getLength() {
      long total = 0;
      for (ByteBuffer buf : bufs) {
        total += buf.remaining();
      }
      return total;
    }

    @Override
    public Iterator<ByteBuffer> iterator() {
      return new ByteBufferIterator();
    }
  }

  /**
   * Create an HttpClient instance.  The returned client is not started.
   *
   * @param connTimeout The timeout to use for connecting.
   * @param idleTimeout The idle timeout to use.
   *
   * @return The new, configured client.
   */
  static HttpClient createHttpClient(long connTimeout, long idleTimeout) {
    HttpClient httpClient = new HttpClient();
    httpClient.setUserAgentField(
        new HttpField(HttpHeader.USER_AGENT, "HTracedSpanReceiver"));
    httpClient.setConnectTimeout(connTimeout);
    httpClient.setIdleTimeout(idleTimeout);
    return httpClient;
  }

  /**
   * Create a buffer manager, allocate its buffers and start its HTTP client.
   *
   * @param conf The configuration supplying the endpoint, the timeouts and
   *             the size of the span buffer.
   *
   * @throws Exception If the URL could not be built or the HTTP client
   *                   could not be started.
   */
  RestBufferManager(Conf conf) throws Exception {
    this.conf = conf;
    this.httpClient =
        createHttpClient(conf.connectTimeoutMs, conf.idleTimeoutMs);
    this.urlString = new URL("http", conf.endpoint.getHostName(),
        conf.endpoint.getPort(), "/writeSpans").toString();
    this.prequel = ByteBuffer.allocate(MAX_PREQUEL_LENGTH);
    this.spans = ByteBuffer.allocate(conf.bufferSize);
    this.epilogue = ByteBuffer.allocate(MAX_EPILOGUE_LENGTH);
    clear();
    this.httpClient.start();
  }

  /**
   * Append a span to the span buffer as a comma followed by the UTF-8 bytes
   * of <code>span.toString()</code>.
   *
   * @param span The span to append.
   *
   * @throws IOException If the span buffer cannot hold the comma and the
   *                     span.  Nothing is written in that case.
   */
  @Override
  public void writeSpan(Span span) throws IOException {
    byte[] spanJsonBytes = span.toString().getBytes(UTF8);
    // Make sure we have enough space for the span JSON and a comma.
    // remaining() honours the buffer limit, so a write attempted after
    // prepare() has flipped the buffer fails here with an IOException rather
    // than with an unchecked BufferOverflowException from put().
    if (spans.remaining() < (spanJsonBytes.length + 1)) {
      throw new IOException("Not enough space remaining in span buffer.");
    }
    spans.put(COMMA_BYTE);
    spans.put(spanJsonBytes);
    numSpans++;
  }

  /**
   * @return The number of bytes of span data written so far, not counting
   *         the leading comma.  This is computed from the write position of
   *         the span buffer, so it is only meaningful before
   *         {@link #prepare} flips that buffer.
   */
  @Override
  public int contentLength() {
    return Math.max(spans.position() - 1, 0);
  }

  @Override
  public int getNumberOfSpans() {
    return numSpans;
  }

  /**
   * Fill in the prequel and epilogue and flip all three buffers so that
   * they are ready to be read by {@link #flush}.
   */
  @Override
  public void prepare() throws IOException {
    // Capture the length before flipping: flip() resets the position of the
    // span buffer to zero, after which contentLength() would report 0.
    int spanDataLength = contentLength();

    prequel.put(PREQUEL_STRING.getBytes(UTF8));
    prequel.flip();

    spans.flip();

    epilogue.put(EPILOGUE_STRING.getBytes(UTF8));
    epilogue.flip();

    if (LOG.isTraceEnabled()) {
      LOG.trace("Preparing to send " + spanDataLength + " bytes of span " +
          "data to " + conf.endpointStr + ", containing " + numSpans +
          " spans.");
    }
  }

  /**
   * POST the prepared buffers to the endpoint as a single JSON document.
   *
   * @throws IOException If the response status is not 200, if the request
   *                     times out or fails, or if the calling thread is
   *                     interrupted while waiting.  In the last case the
   *                     interrupt status of the thread is set again before
   *                     the exception is thrown.
   */
  @Override
  public void flush() throws IOException {
    // Position the buffers at the beginning.
    prequel.position(0);
    spans.position(spans.limit() == 0 ? 0 : 1); // Skip the first comma
    epilogue.position(0);

    RestBufferManagerContentProvider contentProvider =
        new RestBufferManagerContentProvider(
            new ByteBuffer[] { prequel, spans, epilogue });
    // Measure the length now; sending the request consumes the buffers.
    long rpcLength = contentProvider.getLength();
    try {
      Request request = httpClient.
          newRequest(urlString).method(HttpMethod.POST);
      request.header(HttpHeader.CONTENT_TYPE, "application/json");
      request.content(contentProvider);
      ContentResponse response = request.send();
      if (response.getStatus() != HttpStatus.OK_200) {
        throw new IOException("Got back error response " +
            response.getStatus() + " from " + conf.endpointStr + "; " +
            response.getContentAsString());
      }
      if (LOG.isDebugEnabled()) {
        LOG.debug("Sent WriteSpansReq of length " + rpcLength + " to " +
            conf.endpointStr);
      }
    } catch (InterruptedException e) {
      // Restore the interrupt status so that code further up the stack can
      // still observe that this thread was asked to stop.
      Thread.currentThread().interrupt();
      throw new IOException("Interrupted while sending spans via REST", e);
    } catch (TimeoutException e) {
      throw new IOException("Timed out sending spans via REST", e);
    } catch (ExecutionException e) {
      throw new IOException("Execution exception sending spans via REST", e);
    }
  }

  /**
   * Reset all three buffers for writing and reset the span count to zero.
   */
  @Override
  public void clear() {
    prequel.clear();
    spans.clear();
    epilogue.clear();
    numSpans = 0;
  }

  /**
   * Stop the HTTP client.  A failure to stop is logged and not rethrown.
   */
  @Override
  public void close() {
    try {
      httpClient.stop();
    } catch (Exception e) {
      LOG.error("Error stopping HTracedReceiver httpClient", e);
    }
  }
}