blob: 28537fdd930eb9898ef5f20fa075f5547528b4ae [file] [log] [blame]
%% @copyright 2017 Takeru Ohta <phjgt308@gmail.com>
%%
%% @doc Thrift messages for jaeger
%%
%% IDL:
%% - https://github.com/jaegertracing/jaeger-idl/blob/84a83104/thrift/agent.thrift
%% - https://github.com/jaegertracing/jaeger-idl/blob/84a83104/thrift/jaeger.thrift
%%
%% @private
-module(jaeger_passage_thrift).
-include_lib("thrift_protocol/include/thrift_protocol.hrl").
-include("constants.hrl").
%%------------------------------------------------------------------------------
%% Application Internal API
%%------------------------------------------------------------------------------
-export([make_emit_batch_message/3]).
%%------------------------------------------------------------------------------
%% Macros
%%------------------------------------------------------------------------------
-define(TAG_TYPE_STRING, 0).
-define(TAG_TYPE_DOUBLE, 1).
-define(TAG_TYPE_BOOL, 2).
-define(TAG_TYPE_LONG, 3).
-define(TAG_TYPE_BINARY, 4).
-define(REF_TYPE_CHILD_OF, 0).
-define(REF_TYPE_FOLLOWS_FROM, 1).
-define(STRUCT(F1), #thrift_protocol_struct{fields = #{1 => F1}}).
-define(STRUCT(F1, F2), #thrift_protocol_struct{fields = #{1 => F1, 2 => F2}}).
-define(LIST(Es), #thrift_protocol_list{element_type = struct, elements = Es}).
%%------------------------------------------------------------------------------
%% Application Internal API
%%------------------------------------------------------------------------------
-spec make_emit_batch_message(atom(), passage:tags(), [passage_span:span()]) ->
thrift_protocol:message().
make_emit_batch_message(ServiceName, ServiceTags, Spans) ->
Process = make_process(ServiceName, ServiceTags),
Batch = ?STRUCT(Process, make_spans(Spans)),
#thrift_protocol_message{
method_name = <<"emitBatch">>,
message_type = oneway,
sequence_id = 0,
body = ?STRUCT(Batch)
}.
%%------------------------------------------------------------------------------
%% Internal Functions
%%------------------------------------------------------------------------------
-spec make_process(atom(), passage:tags()) -> thrift_protocol:struct().
make_process(ServiceName, ServiceTags) ->
?STRUCT(atom_to_binary(ServiceName, utf8), make_tags(ServiceTags)).
-spec make_tags(Tags) -> thrift_protocol:thrift_list() when
Tags :: #{atom() => term()}.
make_tags(Tags) ->
?LIST(maps:fold(fun (K, V, Acc) -> [make_tag(K, V) | Acc] end, [], Tags)).
-spec make_tag(passage:tag_name(), passage:tag_value()) -> thrift_protocol:struct().
make_tag(Key, Value) ->
{ValueType, FieldId, TagValue} = make_tag_value(Value),
#thrift_protocol_struct{
fields = #{
1 => atom_to_binary(Key, utf8),
2 => {i32, ValueType},
FieldId => TagValue
}
}.
-spec make_tag_value(term()) -> {ValueType, FieldId, Value} when
ValueType :: non_neg_integer(),
FieldId :: thrift_protocol:field_id(),
Value :: thrift_protocol:data().
make_tag_value(X) when is_boolean(X) -> {?TAG_TYPE_BOOL, 5, X};
make_tag_value(X) when is_atom(X) -> {?TAG_TYPE_STRING, 3, atom_to_binary(X, utf8)};
make_tag_value(X) when is_binary(X) -> {?TAG_TYPE_STRING, 3, X};
make_tag_value(X) when is_float(X) -> {?TAG_TYPE_DOUBLE, 4, X};
make_tag_value(X) when is_integer(X) -> {?TAG_TYPE_LONG, 6, {i64, X}};
make_tag_value(X) ->
Size = erlang:external_size(X),
case Size > 1024 of
true ->
{?TAG_TYPE_STRING, 3,
list_to_binary(io_lib:format("...~p bytes... (ommitted by ~p)",
[Size, ?MODULE]))};
false ->
try list_to_binary(X) of
Binary -> {?TAG_TYPE_STRING, 3, Binary}
catch
error:badarg ->
Binary = list_to_binary(io_lib:format("~1024p", [X])),
{?TAG_TYPE_STRING, 3, Binary}
end
end.
-spec make_spans([passage_span:span()]) -> thrift_protocol:thrift_list().
make_spans(Spans) ->
?LIST(lists:map(fun make_span/1, Spans)).
-spec make_span(passage_span:span()) -> thrift_protocol:struct().
make_span(Span) ->
Context = passage_span:get_context(Span),
TraceId = jaeger_passage_span_context:get_trace_id(Context),
Refs =
lists:filter(
fun ({_, S}) ->
0 =/= jaeger_passage_span_context:get_span_id(passage_span:get_context(S))
end,
passage_span:get_refs(Span)),
ParentSpanId =
case Refs of
[{_, Parent} | _] ->
jaeger_passage_span_context:get_span_id(
passage_span:get_context(Parent));
_ -> 0
end,
Tags0 = passage_span:get_tags(Span),
Tags1 =
case jaeger_passage_span_context:get_debug_id(Context) of
error -> Tags0;
{ok, DebugId} -> maps:put(?JAEGER_DEBUG_HEADER, DebugId, Tags0)
end,
#thrift_protocol_struct{
fields = #{
1 => {i64, to_i64(TraceId)},
2 => {i64, to_i64(TraceId bsr 64)},
3 => {i64, to_i64(jaeger_passage_span_context:get_span_id(Context))},
4 => {i64, to_i64(ParentSpanId)},
5 => atom_to_binary(passage_span:get_operation_name(Span), utf8),
6 => make_references(Refs),
7 => {i32, jaeger_passage_span_context:get_flags(Context)},
8 => {i64, timestamp_to_us(passage_span:get_start_time(Span))},
9 => {i64, get_duration_us(Span)},
10 => make_tags(Tags1),
11 => make_logs(passage_span:get_logs(Span))
}
}.
-spec make_references(passage:refs()) -> thrift_protocol:thrift_list().
make_references(Refs) ->
Elements =
lists:filtermap(
fun (Ref = {_, Span}) ->
Context = passage_span:get_context(Span),
case jaeger_passage_span_context:get_span_id(Context) of
0 -> false;
_ -> {true, make_reference(Ref)}
end
end,
Refs),
?LIST(Elements).
-spec make_reference(passage:ref()) -> thrift_protocol:struct().
make_reference(Ref) ->
RefType =
case Ref of
{child_of, Span} -> ?REF_TYPE_CHILD_OF;
{follows_from, Span} -> ?REF_TYPE_FOLLOWS_FROM
end,
Context = passage_span:get_context(Span),
TraceId = jaeger_passage_span_context:get_trace_id(Context),
SpanId = jaeger_passage_span_context:get_span_id(Context),
#thrift_protocol_struct{
fields = #{
1 => {i32, RefType},
2 => {i64, to_i64(TraceId)},
3 => {i64, to_i64(TraceId bsr 64)},
4 => {i64, to_i64(SpanId)}
}
}.
-spec make_logs([passage_span:log()]) -> thrift_protocol:thrift_list().
make_logs(Logs) ->
?LIST(lists:map(fun make_log/1, Logs)).
-spec make_log(passage_span:log()) -> thrift_protocol:struct().
make_log({Fields, Time}) ->
?STRUCT({i64, timestamp_to_us(Time)}, make_tags(Fields)).
-spec timestamp_to_us(erlang:timestamp()) -> non_neg_integer().
timestamp_to_us(Timestamp) ->
timer:now_diff(Timestamp, {0, 0, 0}).
-spec get_duration_us(passage_span:span()) -> non_neg_integer().
get_duration_us(Span) ->
Start = passage_span:get_start_time(Span),
{ok, Finish} = passage_span:get_finish_time(Span),
timer:now_diff(Finish, Start).
-spec to_i64(non_neg_integer()) -> integer().
to_i64(N) ->
<<S:64/signed>> = <<N:64>>,
S.