blob: 801993798e8905a7742ea9060865856e3672cf48 [file] [log] [blame]
-module(jaeger_passage_reporter).
-behaviour(passage_reporter).
-behaviour(gen_server).
-include("constants.hrl").
%%------------------------------------------------------------------------------
%% Exported API
%%------------------------------------------------------------------------------
-export([start/1, start/2]).
-export([stop/1]).
-export([which_reporters/0]).
-export_type([reporter_id/0]).
-export_type([start_option/0, start_options/0]).
%%------------------------------------------------------------------------------
%% Application Internal API
%%------------------------------------------------------------------------------
-export([start_link/2]).
%%------------------------------------------------------------------------------
%% 'passage_reporter' Callback API
%%------------------------------------------------------------------------------
-export([report/2]).
%%------------------------------------------------------------------------------
%% 'gen_server' Callback API
%%------------------------------------------------------------------------------
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
%%------------------------------------------------------------------------------
%% Macros & Records
%%------------------------------------------------------------------------------
-define(STATE, ?MODULE).
-record(?STATE,
{
socket :: gen_udp:socket(),
thrift_format :: thrift_protocol:format(),
agent_host :: inet:hostname(),
agent_port :: inet:port_number(),
service_name :: atom(),
service_tags :: passage:tags()
}).
%%------------------------------------------------------------------------------
%% Exported Types
%%------------------------------------------------------------------------------
-type reporter_id() :: atom().
-type start_options() :: [start_option()].
-type start_option() :: {thrift_format, thrift_protocol:format()}
| {agent_host, inet:hostname()}
| {agent_port, inet:port_number()}
| {service_name, atom()}
| {service_tags, passage:tags()}.
%%------------------------------------------------------------------------------
%% Application Internal Functions
%%------------------------------------------------------------------------------
-spec start_link(reporter_id(), start_options()) -> {ok, pid()} | {error, Reason} when
Reason :: {already_started, pid()} | term().
start_link(ReporterId, Options) ->
Name = jaeger_passage_local_ns:reporter_name(ReporterId),
gen_server:start_link(Name, ?MODULE, {ReporterId, Options}, []).
%%------------------------------------------------------------------------------
%% Exported Functions
%%------------------------------------------------------------------------------
%% @equiv start(ReporterId, [])
-spec start(reporter_id()) -> {ok, passage_reporter:reporter()} | {error, Reason} when
Reason :: {already_started, pid()} | term().
start(ReporterId) ->
start(ReporterId, []).
-spec start(reporter_id(), start_options()) -> {ok, passage_reporter:reporter()} | {error, Reason} when
Reason :: {already_started, pid()} | term().
start(ReporterId, Options) ->
case jaeger_passage_reporter_sup:start_child(ReporterId, Options) of
{error, Reason} -> {error, Reason};
{ok, _Pid} -> {ok, passage_reporter:new(?MODULE, ReporterId)}
end.
-spec stop(reporter_id()) -> ok.
stop(ReporterId) ->
jaeger_passage_reporter_sup:stop_child(ReporterId).
-spec which_reporters() -> [reporter_id()].
which_reporters() ->
jaeger_passage_reporter_sup:which_children().
%%------------------------------------------------------------------------------
%% 'passage_reporter' Callback Functions
%%------------------------------------------------------------------------------
%% @private
report(ReporterId, Span) ->
Server = jaeger_passage_local_ns:reporter_name(ReporterId),
gen_server:cast(Server, {report, Span}).
%%------------------------------------------------------------------------------
%% 'gen_server' Callback Functions
%%------------------------------------------------------------------------------
%% @private
init({ReporterId, Options}) ->
Format = proplists:get_value(thrift_protocol, Options, compact),
DefaultPort =
case Format of
compact -> 6831;
binary -> 6832
end,
AgentHost = proplists:get_value(agent_host, Options, "127.0.0.1"),
AgentPort = proplists:get_value(agent_port, Options, DefaultPort),
ServiceName = proplists:get_value(service_name, Options, ReporterId),
Tags0 = proplists:get_value(service_tags, Options, #{}),
{ok, Hostname} = inet:gethostname(),
{ok, Version} = application:get_key(vsn),
Tags1 =
maps:merge(
Tags0,
#{
?JAEGER_CLIENT_VERSION_TAG_KEY => list_to_binary(["jaeger_passage-", Version]),
?TRACER_HOSTNAME_TAG_KEY => list_to_binary(Hostname)
}),
{ok, Socket} = gen_udp:open(0),
State =
#?STATE{
socket = Socket,
thrift_format = Format,
agent_host = AgentHost,
agent_port = AgentPort,
service_name = ServiceName,
service_tags = Tags1
},
{ok, State}.
%% @private
handle_call(_Request, _From, State) ->
{noreply, State}.
%% @private
handle_cast({report, Span}, State) ->
handle_report(Span, State);
handle_cast(_Request, State) ->
{noreply, State}.
%% @private
handle_info(_Info, State) ->
{noreply, State}.
%% @private
terminate(_Reason, _State) ->
ok.
%% @private
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%------------------------------------------------------------------------------
%% Internal Functions
%%------------------------------------------------------------------------------
-spec handle_report(passage_span:span(), #?STATE{}) -> {noreply, #?STATE{}}.
handle_report(Span, State = #?STATE{service_name = Name, service_tags = Tags}) ->
Message = jaeger_passage_thrift:make_emit_batch_message(Name, Tags, [Span]),
Encoded = thrift_protocol:encode_message(Message, State#?STATE.thrift_format),
ok = gen_udp:send(State#?STATE.socket, State#?STATE.agent_host, State#?STATE.agent_port, Encoded),
{noreply, State}.