blob: 8b59a724bc86ba2bd159f62641a47f96a3e5fc4c [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.htrace.impl;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.CharBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.channels.spi.SelectorProvider;
import java.nio.charset.StandardCharsets;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.htrace.core.Span;
/**
 * A {@link BufferManager} which accumulates spans in a {@link PackedBuffer}
 * and sends them to the configured endpoint as a single framed RPC, using a
 * non-blocking socket driven by a private {@link Selector}.
 *
 * A new connection is opened for every call to {@link #flush()} and closed
 * again before flush returns.
 */
class PackedBufferManager implements BufferManager {
  private static final Log LOG = LogFactory.getLog(PackedBufferManager.class);

  /** Size in bytes of the buffer holding the request prequel. */
  private static final int MAX_PREQUEL_LENGTH = 2048;

  /** Method id placed in the request frame and expected in the response. */
  private static final int METHOD_ID_WRITE_SPANS = 0x1;

  private final Conf conf;

  /** Holds the request frame written by PackedBuffer#writeReqFrame. */
  private final ByteBuffer frameBuffer;

  /** Holds the request prequel; its buffer is reused to read the response. */
  private final PackedBuffer prequel;

  /** Holds the serialized spans. */
  private final PackedBuffer spans;

  /** Selector used for all connect / write / read readiness waits. */
  private final Selector selector;

  /** Number of spans written since the last call to {@link #clear()}. */
  private int numSpans;

  /**
   * Allocate the buffers and open the selector.
   *
   * @param conf          The configuration to use.
   * @throws IOException  If the selector could not be opened.
   */
  PackedBufferManager(Conf conf) throws IOException {
    this.conf = conf;
    this.frameBuffer = ByteBuffer.allocate(PackedBuffer.HRPC_REQ_FRAME_LENGTH);
    this.prequel = new PackedBuffer(ByteBuffer.allocate(MAX_PREQUEL_LENGTH));
    this.spans = new PackedBuffer(ByteBuffer.allocate(conf.bufferSize));
    this.selector = SelectorProvider.provider().openSelector();
    clear();
  }

  /**
   * Append a span to the span buffer and bump the span count.
   *
   * @param span          The span to serialize.
   * @throws IOException  If PackedBuffer#writeSpan fails.
   */
  @Override
  public void writeSpan(Span span) throws IOException {
    spans.writeSpan(span);
    numSpans++;
    if (LOG.isTraceEnabled()) {
      LOG.trace("wrote " + span.toJson() + " to PackedBuffer for " +
          conf.endpointStr + ". numSpans = " + numSpans +
          ", buffer position = " + spans.getBuffer().position());
    }
  }

  /**
   * @return The current position of the span buffer.
   */
  @Override
  public int contentLength() {
    return spans.getBuffer().position();
  }

  /**
   * @return The number of spans written since the last clear.
   */
  @Override
  public int getNumberOfSpans() {
    return numSpans;
  }

  /**
   * Write the prequel and the request frame, then flip all three buffers so
   * that they are ready to be sent by {@link #flush()}.
   *
   * @throws IOException  If the request body would be longer than
   *                      PackedBuffer.MAX_HRPC_BODY_LENGTH.
   */
  @Override
  public void prepare() throws IOException {
    prequel.beginWriteSpansRequest(null, numSpans);
    // Widen before adding so that two large positions cannot overflow int
    // and slip past the length check below.
    long totalLength =
        (long) prequel.getBuffer().position() + spans.getBuffer().position();
    if (totalLength > PackedBuffer.MAX_HRPC_BODY_LENGTH) {
      throw new IOException("Can't send RPC of " + totalLength + " bytes " +
          "because it is longer than " + PackedBuffer.MAX_HRPC_BODY_LENGTH);
    }
    PackedBuffer.writeReqFrame(frameBuffer,
        METHOD_ID_WRITE_SPANS, 1, (int) totalLength);
    frameBuffer.flip();
    prequel.getBuffer().flip();
    spans.getBuffer().flip();
    if (LOG.isTraceEnabled()) {
      LOG.trace("Preparing to send RPC of length " +
          (totalLength + PackedBuffer.HRPC_REQ_FRAME_LENGTH) + " to " +
          conf.endpointStr + ", containing " + numSpans + " spans.");
    }
  }

  /**
   * Connect to the endpoint, send the prepared buffers, and read and validate
   * the response frame.  The buffers are rewound first, so flush may be
   * called again after a failure to resend the same data.
   *
   * @throws IOException  On connect, send, receive or validation failure.
   */
  @Override
  public void flush() throws IOException {
    SelectionKey sockKey = null;
    IOException ioe = null;
    frameBuffer.position(0);
    prequel.getBuffer().position(0);
    spans.getBuffer().position(0);
    if (LOG.isTraceEnabled()) {
      LOG.trace("Preparing to flush " + numSpans + " spans to " +
          conf.endpointStr);
    }
    try {
      sockKey = doConnect();
      doSend(sockKey, new ByteBuffer[] {
          frameBuffer, prequel.getBuffer(), spans.getBuffer() });
      // The prequel has been fully sent at this point, so its buffer can be
      // reused to receive the response.
      ByteBuffer response = prequel.getBuffer();
      readAndValidateResponseFrame(sockKey, response,
          1, METHOD_ID_WRITE_SPANS);
    } catch (IOException e) {
      // This LOG message is only at debug level because we also log these
      // exceptions at error level inside HTracedReceiver.  The logging in
      // HTracedReceiver is rate-limited to avoid overwhelming the client log
      // if htraced goes down.  The debug and trace logging is not
      // rate-limited.
      if (LOG.isDebugEnabled()) {
        LOG.debug("Got exception during flush", e);
      }
      ioe = e;
    } finally {
      if (sockKey != null) {
        sockKey.cancel();
        try {
          SocketChannel sock = (SocketChannel) sockKey.attachment();
          sock.close();
        } catch (IOException e) {
          if (ioe != null) {
            ioe.addSuppressed(e);
          } else if (LOG.isDebugEnabled()) {
            // Closing is best-effort: the data was already acknowledged, so
            // don't fail the flush, but don't hide the problem entirely.
            LOG.debug("Error closing socket to " + conf.endpointStr, e);
          }
        }
      }
    }
    if (ioe != null) {
      throw ioe;
    }
    if (LOG.isTraceEnabled()) {
      LOG.trace("Successfully flushed " + numSpans + " spans to " +
          conf.endpointStr);
    }
  }

  /**
   * Compute how much of a timeout is left.
   *
   * @param startMs    The time at which the operation started.
   * @param timeoutMs  The total timeout for the operation.
   * @return           The remaining milliseconds, or 0 if none remain.
   */
  private long updateRemainingMs(long startMs, long timeoutMs) {
    long deltaMs = TimeUtil.deltaMs(startMs, TimeUtil.nowMs());
    if (deltaMs > timeoutMs) {
      return 0;
    }
    return timeoutMs - deltaMs;
  }

  /**
   * Open a non-blocking connection to the endpoint and register it with the
   * selector.  The returned key has the SocketChannel as its attachment.
   * On failure the channel is closed and the key cancelled before the
   * exception propagates.
   *
   * @return              The selection key of the connected channel.
   * @throws IOException  If the address cannot be resolved, the connection
   *                      fails, or conf.connectTimeoutMs elapses.
   */
  private SelectionKey doConnect() throws IOException {
    SocketChannel sock = SocketChannel.open();
    SelectionKey sockKey = null;
    boolean success = false;
    try {
      if (sock.isBlocking()) {
        sock.configureBlocking(false);
      }
      InetSocketAddress resolvedEndpoint =
          new InetSocketAddress(conf.endpoint.getHostString(),
              conf.endpoint.getPort());
      resolvedEndpoint.getHostName(); // trigger DNS resolution
      if (resolvedEndpoint.isUnresolved()) {
        // SocketChannel#connect would throw the unchecked
        // UnresolvedAddressException; report it as an I/O failure instead so
        // that it goes through the normal error path.
        throw new IOException("Unable to resolve the address of " +
            conf.endpointStr);
      }
      // A non-blocking connect may complete immediately (e.g. for a local
      // endpoint).  In that case OP_CONNECT need not ever be reported, so
      // we must not wait for it.
      boolean connected = sock.connect(resolvedEndpoint);
      sockKey = sock.register(selector, SelectionKey.OP_CONNECT, sock);
      long startMs = TimeUtil.nowMs();
      long remainingMs = conf.connectTimeoutMs;
      while (!connected) {
        selector.select(remainingMs);
        boolean connectable = sockKey.isConnectable();
        // Reset the selected-key set so that the next select reports fresh
        // readiness rather than accumulating stale ready-ops.
        selector.selectedKeys().clear();
        if (connectable && sock.finishConnect()) {
          connected = true;
          break;
        }
        remainingMs = updateRemainingMs(startMs, conf.connectTimeoutMs);
        if (remainingMs == 0) {
          throw new IOException("Attempt to connect to " + conf.endpointStr +
              " timed out after " + TimeUtil.deltaMs(startMs, TimeUtil.nowMs()) +
              " ms.");
        }
      }
      if (LOG.isTraceEnabled()) {
        LOG.trace("Successfully connected to " + conf.endpointStr + ".");
      }
      success = true;
      return sockKey;
    } finally {
      if (!success) {
        if (sockKey != null) {
          sockKey.cancel();
        }
        sock.close();
      }
    }
  }

  /**
   * Send the provided ByteBuffer objects.
   *
   * We use non-blocking I/O because Java does not provide write timeouts.
   * Without a write timeout, the socket could get hung and we'd never recover.
   * We also use the GatheringByteChannel#write method, which is typically
   * implemented with the writev() system call.  This lets several buffers be
   * handed to the kernel in one call, so that even if TCP_NODELAY is on, we
   * send the minimal number of packets.
   *
   * @param sockKey       The key returned by doConnect.
   * @param bufs          The buffers to send, in order.
   * @throws IOException  If the write fails or conf.ioTimeoutMs elapses
   *                      before all buffers have been drained.
   */
  private void doSend(SelectionKey sockKey, ByteBuffer[] bufs)
      throws IOException {
    long totalWritten = 0;
    sockKey.interestOps(SelectionKey.OP_WRITE);
    SocketChannel sock = (SocketChannel) sockKey.attachment();
    long startMs = TimeUtil.nowMs();
    long remainingMs = conf.ioTimeoutMs;
    int firstBuf = 0; // index of the first buffer that still has data
    while (true) {
      selector.select(remainingMs);
      boolean writable = sockKey.isWritable();
      selector.selectedKeys().clear();
      if (writable) {
        long written = sock.write(bufs, firstBuf, bufs.length - firstBuf);
        if (LOG.isTraceEnabled()) {
          LOG.trace("Sent " + written + " bytes to " + conf.endpointStr);
        }
        totalWritten += written;
      }
      // Skip over the buffers which have been completely sent.
      while ((firstBuf < bufs.length) && (bufs[firstBuf].remaining() == 0)) {
        firstBuf++;
      }
      if (firstBuf == bufs.length) {
        if (LOG.isTraceEnabled()) {
          LOG.trace("Finished sending " + totalWritten + " bytes to " +
              conf.endpointStr);
        }
        return;
      }
      remainingMs = updateRemainingMs(startMs, conf.ioTimeoutMs);
      if (remainingMs == 0) {
        throw new IOException("Attempt to write to " + conf.endpointStr +
            " timed out after " + TimeUtil.deltaMs(startMs, TimeUtil.nowMs()) +
            " ms.");
      }
    }
  }

  /**
   * Read from the socket until the given buffer has no space remaining.
   *
   * @param sockKey       The key returned by doConnect.
   * @param response      The buffer to fill, from its position to its limit.
   * @throws IOException  If the read fails, the remote end closes the
   *                      connection early, or conf.ioTimeoutMs elapses.
   */
  private void doRecv(SelectionKey sockKey, ByteBuffer response)
      throws IOException {
    sockKey.interestOps(SelectionKey.OP_READ);
    SocketChannel sock = (SocketChannel) sockKey.attachment();
    int totalRead = response.remaining();
    long startMs = TimeUtil.nowMs();
    long remainingMs = conf.ioTimeoutMs;
    // Same loop shape as doSend, so that both directions treat the timeout
    // identically and this method never returns with a partial read.
    while (true) {
      selector.select(remainingMs);
      boolean readable = sockKey.isReadable();
      selector.selectedKeys().clear();
      if (readable && (sock.read(response) < 0)) {
        // End-of-stream.  The channel stays readable forever after EOF, so
        // fail now rather than spinning until the timeout expires.
        throw new IOException("Connection to " + conf.endpointStr +
            " was closed by the remote end after receiving " +
            (totalRead - response.remaining()) + " out of " + totalRead +
            " bytes.");
      }
      if (response.remaining() == 0) {
        if (LOG.isTraceEnabled()) {
          LOG.trace("Received all " + totalRead + " bytes from " +
              conf.endpointStr);
        }
        return;
      }
      remainingMs = updateRemainingMs(startMs, conf.ioTimeoutMs);
      if (LOG.isTraceEnabled()) {
        LOG.trace("Received " + (totalRead - response.remaining()) +
            " out of " + totalRead + " bytes from " + conf.endpointStr);
      }
      if (remainingMs == 0) {
        throw new IOException("Attempt to read from " + conf.endpointStr +
            " timed out after " + TimeUtil.deltaMs(startMs, TimeUtil.nowMs()) +
            " ms.");
      }
    }
  }

  /**
   * Read a response frame and check it against what we expect.  If the frame
   * announces an error message, read that message and throw it.
   *
   * @param sockKey           The key returned by doConnect.
   * @param buf               Scratch buffer used to receive the frame; its
   *                          previous contents are discarded.
   * @param expectedSeq       The sequence number the response must carry.
   * @param expectedMethodId  The method id the response must carry.
   * @throws IOException      If the read fails, the frame does not match, or
   *                          the server reported an error.
   */
  private void readAndValidateResponseFrame(SelectionKey sockKey,
      ByteBuffer buf, long expectedSeq, int expectedMethodId)
      throws IOException {
    buf.clear();
    buf.limit(PackedBuffer.HRPC_RESP_FRAME_LENGTH);
    doRecv(sockKey, buf);
    buf.flip();
    buf.order(ByteOrder.LITTLE_ENDIAN);
    long seq = buf.getLong();
    if (seq != expectedSeq) {
      throw new IOException("Expected sequence number " + expectedSeq +
          ", but got sequence number " + seq);
    }
    int methodId = buf.getInt();
    if (expectedMethodId != methodId) {
      throw new IOException("Expected method id " + expectedMethodId +
          ", but got " + methodId);
    }
    int errorLength = buf.getInt();
    // Skip the final 4-byte field of the frame; it is not used here
    // (presumably the response body length -- confirm against PackedBuffer).
    buf.getInt();
    if ((errorLength < 0) ||
        (errorLength > PackedBuffer.MAX_HRPC_ERROR_LENGTH)) {
      throw new IOException("Got server error with invalid length " +
          errorLength);
    } else if (errorLength > 0) {
      // The scratch buffer may be smaller than the announced error text;
      // setting a limit beyond its capacity would throw an unchecked
      // exception, so fall back to a buffer of the required size.
      ByteBuffer errorBuf = (errorLength <= buf.capacity()) ?
          buf : ByteBuffer.allocate(errorLength);
      errorBuf.clear();
      errorBuf.limit(errorLength);
      doRecv(sockKey, errorBuf);
      errorBuf.flip();
      CharBuffer charBuf = StandardCharsets.UTF_8.decode(errorBuf);
      String serverErrorStr = charBuf.toString();
      throw new IOException("Got server error " + serverErrorStr);
    }
  }

  /**
   * Reset all three buffers and the span count, discarding any buffered data.
   */
  @Override
  public void clear() {
    frameBuffer.clear();
    prequel.getBuffer().clear();
    spans.getBuffer().clear();
    numSpans = 0;
  }

  /**
   * Discard any buffered data and release the packed buffers and the
   * selector.  A failure to close the selector is logged, not thrown.
   */
  @Override
  public void close() {
    clear();
    prequel.close();
    spans.close();
    try {
      selector.close();
    } catch (IOException e) {
      LOG.warn("Error closing selector", e);
    }
  }
}