blob: dd0a4b9830f81eac960d31d833373730449679aa [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.htrace.impl;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.htrace.core.MilliSpan;
import org.apache.htrace.core.TimelineAnnotation;
import org.msgpack.core.MessagePack;
import org.msgpack.core.MessagePacker;
import org.msgpack.core.MessageUnpacker;
import org.msgpack.core.buffer.MessageBuffer;
import org.msgpack.core.buffer.MessageBufferOutput;
import org.apache.htrace.core.Span;
import org.apache.htrace.core.SpanId;
/**
* A ByteBuffer which we are writing msgpack data to.
*/
class PackedBuffer {
/**
* A MessageBufferOutput that simply outputs to a ByteBuffer.
*/
private class PackedBufferOutput implements MessageBufferOutput {
private MessageBuffer savedBuffer;
PackedBufferOutput() {
}
@Override
public MessageBuffer next(int bufferSize) throws IOException {
if (savedBuffer == null || savedBuffer.size() != bufferSize) {
savedBuffer = MessageBuffer.newBuffer(bufferSize);
}
MessageBuffer buffer = savedBuffer;
savedBuffer = null;
return buffer;
}
@Override
public void flush(MessageBuffer buffer) throws IOException {
ByteBuffer b = buffer.toByteBuffer();
bb.put(b);
savedBuffer = buffer;
}
@Override
public void close() throws IOException {
// no-op
}
}
private static final Log LOG = LogFactory.getLog(PackedBuffer.class);
private static final Charset UTF8 = StandardCharsets.UTF_8;
private static final byte NUM_SPANS[] = "NumSpans".getBytes(UTF8);
private static final byte DEFAULT_PID[] = "DefaultPid".getBytes(UTF8);
private static final byte A[] = "a".getBytes(UTF8);
private static final byte B[] = "b".getBytes(UTF8);
private static final byte E[] = "e".getBytes(UTF8);
private static final byte D[] = "d".getBytes(UTF8);
private static final byte R[] = "r".getBytes(UTF8);
private static final byte P[] = "p".getBytes(UTF8);
private static final byte N[] = "n".getBytes(UTF8);
private static final byte T[] = "t".getBytes(UTF8);
private static final byte M[] = "m".getBytes(UTF8);
private static final int HRPC_MAGIC = 0x43525448;
static final int HRPC_REQ_FRAME_LENGTH = 20;
static final int HRPC_RESP_FRAME_LENGTH = 20;
static final int MAX_HRPC_ERROR_LENGTH = 4 * 1024 * 1024;
static final int MAX_HRPC_BODY_LENGTH = 64 * 1024 * 1024;
private static final int SPAN_ID_BYTE_LENGTH = 16;
static final MessagePack.Config MSGPACK_CONF =
new MessagePack.ConfigBuilder()
.readBinaryAsString(false)
.readStringAsBinary(false)
.build();
/**
* The array which we are filling.
*/
final ByteBuffer bb;
/**
* Used to tell the MessagePacker to output to our array.
*/
final PackedBufferOutput out;
/**
* A temporary buffer for serializing span ids and other things.
*/
final byte[] temp;
/**
* Generates msgpack output.
*/
final MessagePacker packer;
/**
* Create a new PackedBuffer.
*
* @param bb The ByteBuffer to use to create the packed buffer.
*/
PackedBuffer(ByteBuffer bb) {
this.bb = bb;
this.out = new PackedBufferOutput();
this.temp = new byte[SPAN_ID_BYTE_LENGTH];
this.packer = new MessagePacker(out, MSGPACK_CONF);
}
/**
* Write the fixed-length request frame which starts packed RPC messages.
*/
static void writeReqFrame(ByteBuffer bb, int methodId, long seq, int length)
throws IOException {
int oldPos = bb.position();
boolean success = false;
try {
bb.order(ByteOrder.LITTLE_ENDIAN);
bb.putInt(HRPC_MAGIC);
bb.putInt(methodId);
bb.putLong(seq);
bb.putInt(length);
success = true;
} finally {
if (!success) {
bb.position(oldPos);
}
}
}
/**
* Write an 8-byte value to a byte array as little-endian.
*/
private static void longToBigEndian(byte b[], int pos, long val) {
b[pos + 0] =(byte) ((val >> 56) & 0xff);
b[pos + 1] =(byte) ((val >> 48) & 0xff);
b[pos + 2] =(byte) ((val >> 40) & 0xff);
b[pos + 3] =(byte) ((val >> 32) & 0xff);
b[pos + 4] =(byte) ((val >> 24) & 0xff);
b[pos + 5] =(byte) ((val >> 16) & 0xff);
b[pos + 6] =(byte) ((val >> 8) & 0xff);
b[pos + 7] =(byte) ((val >> 0) & 0xff);
}
private void writeSpanId(SpanId spanId) throws IOException {
longToBigEndian(temp, 0, spanId.getHigh());
longToBigEndian(temp, 8, spanId.getLow());
packer.packBinaryHeader(SPAN_ID_BYTE_LENGTH);
packer.writePayload(temp, 0, SPAN_ID_BYTE_LENGTH);
}
/**
* Serialize a span to the given OutputStream.
*/
void writeSpan(Span span) throws IOException {
boolean success = false;
int oldPos = bb.position();
try {
int mapSize = 0;
if (span.getSpanId().isValid()) {
mapSize++;
}
if (span.getStartTimeMillis() != 0) {
mapSize++;
}
if (span.getStopTimeMillis() != 0) {
mapSize++;
}
if (!span.getDescription().isEmpty()) {
mapSize++;
}
if (!span.getTracerId().isEmpty()) {
mapSize++;
}
if (span.getParents().length > 0) {
mapSize++;
}
if (!span.getKVAnnotations().isEmpty()) {
mapSize++;
}
if (!span.getTimelineAnnotations().isEmpty()) {
mapSize++;
}
packer.packMapHeader(mapSize);
if (span.getSpanId().isValid()) {
packer.packRawStringHeader(1);
packer.writePayload(A);
writeSpanId(span.getSpanId());
}
if (span.getStartTimeMillis() != 0) {
packer.packRawStringHeader(1);
packer.writePayload(B);
packer.packLong(span.getStartTimeMillis());
}
if (span.getStopTimeMillis() != 0) {
packer.packRawStringHeader(1);
packer.writePayload(E);
packer.packLong(span.getStopTimeMillis());
}
if (!span.getDescription().isEmpty()) {
packer.packRawStringHeader(1);
packer.writePayload(D);
packer.packString(span.getDescription());
}
if (!span.getTracerId().isEmpty()) {
packer.packRawStringHeader(1);
packer.writePayload(R);
packer.packString(span.getTracerId());
}
if (span.getParents().length > 0) {
packer.packRawStringHeader(1);
packer.writePayload(P);
packer.packArrayHeader(span.getParents().length);
for (int i = 0; i < span.getParents().length; i++) {
writeSpanId(span.getParents()[i]);
}
}
if (!span.getKVAnnotations().isEmpty()) {
packer.packRawStringHeader(1);
packer.writePayload(N);
Map<String, String> map = span.getKVAnnotations();
packer.packMapHeader(map.size());
for (Map.Entry<String, String> entry : map.entrySet()) {
packer.packString(entry.getKey());
packer.packString(entry.getValue());
}
}
if (!span.getTimelineAnnotations().isEmpty()) {
packer.packRawStringHeader(1);
packer.writePayload(T);
List<TimelineAnnotation> list = span.getTimelineAnnotations();
packer.packArrayHeader(list.size());
for (TimelineAnnotation annotation : list) {
packer.packMapHeader(2);
packer.packRawStringHeader(1);
packer.writePayload(T);
packer.packLong(annotation.getTime());
packer.packRawStringHeader(1);
packer.writePayload(M);
packer.packString(annotation.getMessage());
}
}
packer.flush();
success = true;
} finally {
if (!success) {
// If we failed earlier, restore the old position.
// This is so that if we run out of space, we don't add a
// partial span to the buffer.
bb.position(oldPos);
}
}
}
static SpanId readSpanId(MessageUnpacker unpacker) throws IOException {
int alen = unpacker.unpackBinaryHeader();
if (alen != SPAN_ID_BYTE_LENGTH) {
throw new IOException("Invalid length given for spanID array. " +
"Expected " + SPAN_ID_BYTE_LENGTH + "; got " + alen);
}
byte[] payload = new byte[SPAN_ID_BYTE_LENGTH];
unpacker.readPayload(payload);
return new SpanId(
((payload[ 7] & 0xffL) << 0) |
((payload[ 6] & 0xffL) << 8) |
((payload[ 5] & 0xffL) << 16) |
((payload[ 4] & 0xffL) << 24) |
((payload[ 3] & 0xffL) << 32) |
((payload[ 2] & 0xffL) << 40) |
((payload[ 1] & 0xffL) << 48) |
((payload[ 0] & 0xffL) << 56),
((payload[15] & 0xffL) << 0) |
((payload[14] & 0xffL) << 8) |
((payload[13] & 0xffL) << 16) |
((payload[12] & 0xffL) << 24) |
((payload[11] & 0xffL) << 32) |
((payload[10] & 0xffL) << 40) |
((payload[ 9] & 0xffL) << 48) |
((payload[ 8] & 0xffL) << 56)
);
}
/**
* Read a span. Used in unit tests. Not optimized.
*/
static Span readSpan(MessageUnpacker unpacker) throws IOException {
int numEntries = unpacker.unpackMapHeader();
MilliSpan.Builder builder = new MilliSpan.Builder();
while (--numEntries >= 0) {
String key = unpacker.unpackString();
if (key.length() != 1) {
throw new IOException("Unknown key " + key);
}
switch (key.charAt(0)) {
case 'a':
builder.spanId(readSpanId(unpacker));
break;
case 'b':
builder.begin(unpacker.unpackLong());
break;
case 'e':
builder.end(unpacker.unpackLong());
break;
case 'd':
builder.description(unpacker.unpackString());
break;
case 'r':
builder.tracerId(unpacker.unpackString());
break;
case 'p':
int numParents = unpacker.unpackArrayHeader();
SpanId[] parents = new SpanId[numParents];
for (int i = 0; i < numParents; i++) {
parents[i] = readSpanId(unpacker);
}
builder.parents(parents);
break;
case 'n':
int mapEntries = unpacker.unpackMapHeader();
HashMap<String, String> entries =
new HashMap<String, String>(mapEntries);
for (int i = 0; i < mapEntries; i++) {
String k = unpacker.unpackString();
String v = unpacker.unpackString();
entries.put(k, v);
}
builder.traceInfo(entries);
break;
case 't':
int listEntries = unpacker.unpackArrayHeader();
ArrayList<TimelineAnnotation> list =
new ArrayList<TimelineAnnotation>(listEntries);
for (int i = 0; i < listEntries; i++) {
int timelineObjectSize = unpacker.unpackMapHeader();
long time = 0;
String msg = "";
for (int j = 0; j < timelineObjectSize; j++) {
String tlKey = unpacker.unpackString();
if (tlKey.length() != 1) {
throw new IOException("Unknown timeline map key " + tlKey);
}
switch (tlKey.charAt(0)) {
case 't':
time = unpacker.unpackLong();
break;
case 'm':
msg = unpacker.unpackString();
break;
default:
throw new IOException("Unknown timeline map key " + tlKey);
}
}
list.add(new TimelineAnnotation(time, msg));
}
builder.timeline(list);
break;
default:
throw new IOException("Unknown key " + key);
}
}
return builder.build();
}
void beginWriteSpansRequest(String defaultPid, int numSpans)
throws IOException {
boolean success = false;
int oldPos = bb.position();
try {
int mapSize = 1;
if (defaultPid != null) {
mapSize++;
}
packer.packMapHeader(mapSize);
if (defaultPid != null) {
packer.packRawStringHeader(DEFAULT_PID.length);
packer.writePayload(DEFAULT_PID);
packer.packString(defaultPid);
}
packer.packRawStringHeader(NUM_SPANS.length);
packer.writePayload(NUM_SPANS);
packer.packInt(numSpans);
packer.flush();
success = true;
} finally {
if (!success) {
bb.position(oldPos);
}
}
}
/**
* Get the underlying ByteBuffer.
*/
ByteBuffer getBuffer() {
return bb;
}
/**
* Reset our position in the array.
*/
void reset() throws IOException {
packer.reset(out);
}
void close() {
try {
packer.close();
} catch (IOException e) {
LOG.error("Error closing MessagePacker", e);
}
}
public String toHexString() {
String prefix = "";
StringBuilder bld = new StringBuilder();
ByteBuffer b = bb.duplicate();
b.flip();
while (b.hasRemaining()) {
bld.append(String.format("%s%02x", prefix, b.get()));
prefix = " ";
}
return bld.toString();
}
}