blob: 70c4790ab58063020ddf84e3ae02686fa8f1ef87 [file] [log] [blame]
% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
% http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.
-module(couch_prometheus_server).
-behaviour(gen_server).
-import(couch_prometheus_util, [
couch_to_prom/3,
to_prom/4,
to_prom_summary/2
]).
-export([
scrape/0,
version/0
]).
-export([
start_link/0,
init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
code_change/3,
terminate/2
]).
-include("couch_prometheus.hrl").
%% Start the metrics server linked to the caller and register it locally
%% under the module name, which is how scrape/0 reaches it.
start_link() ->
    Name = {local, ?MODULE},
    gen_server:start_link(Name, ?MODULE, [], []).
%% gen_server state of the metrics server.
-record(st, {
% Cached metrics payload: the value produced by refresh_metrics/0, handed
% back unchanged to `scrape' callers.
metrics,
% Reference of the pending `refresh' tick, as returned by
% update_refresh_timer/0 (i.e. by erlang:send_after/3).
refresh
}).
%% gen_server init: build the first metrics snapshot synchronously, then arm
%% the periodic refresh timer.
init([]) ->
    Initial = #st{metrics = refresh_metrics()},
    {ok, Initial#st{refresh = update_refresh_timer()}}.
%% Return the cached metrics payload held by the registered server.
%% The server does not recompute anything here; it replies with whatever the
%% last refresh produced. Any reply other than {ok, _} crashes with badmatch.
scrape() ->
    {ok, Payload} = gen_server:call(?MODULE, scrape),
    Payload.
%% Return the ?PROMETHEUS_VERSION macro defined in couch_prometheus.hrl
%% (presumably the Prometheus exposition format version - confirm there).
version() ->
?PROMETHEUS_VERSION.
%% Synchronous requests:
%%   scrape  -> {ok, Metrics}: the cached payload, without recomputing it.
%%   refresh -> ok: recompute the metrics now and restart the refresh timer.
%% Any other request stops the server with {unknown_call, Msg}.
handle_call(scrape, _From, #st{metrics = Metrics} = State) ->
    {reply, {ok, Metrics}, State};
handle_call(refresh, _From, State) ->
    {reply, ok, do_refresh(State)};
handle_call(Msg, _From, State) ->
    {stop, {unknown_call, Msg}, error, State}.

%% No casts are supported; any cast stops the server.
handle_cast(Msg, State) ->
    {stop, {unknown_cast, Msg}, State}.

%% `refresh' is the periodic tick armed by update_refresh_timer/0.
%% Any other message stops the server.
handle_info(refresh, State) ->
    {noreply, do_refresh(State)};
handle_info(Msg, State) ->
    {stop, {unknown_info, Msg}, State}.

%% Shared refresh path for the `refresh' call and the periodic tick:
%% cancel the pending timer, rebuild the metrics, arm a new timer.
do_refresh(#st{refresh = OldRT} = State) ->
    %% OldRT comes from erlang:send_after/3, so it has to be cancelled with
    %% erlang:cancel_timer/1. timer:cancel/1 only understands timer-module
    %% references and would leave the old timer armed, starting an extra
    %% chain of refresh ticks on every manual refresh. A tick that already
    %% fired is removed by the drain inside update_refresh_timer/0.
    erlang:cancel_timer(OldRT),
    Metrics = refresh_metrics(),
    RT = update_refresh_timer(),
    State#st{metrics = Metrics, refresh = RT}.
%% Nothing to clean up on shutdown, whatever the reason.
terminate(_Why, _St) ->
    ok.
%% Hot code upgrade: the state layout is carried over unchanged.
code_change(_PrevVsn, St, _Extra) ->
    {ok, St}.
%% Build the full metrics payload: CouchDB stats first, then the Erlang
%% VM/system stats (passed through couch_stats_httpd:to_ejson/1). Each line
%% is terminated with a newline and the result goes through
%% couch_prometheus_util:to_bin/1.
refresh_metrics() ->
    CouchLines = get_couchdb_stats(),
    SystemLines = couch_stats_httpd:to_ejson(get_system_stats()),
    WithNewlines = [io_lib:format("~s~n", [Text]) || Text <- CouchLines ++ SystemLines],
    couch_prometheus_util:to_bin(WithNewlines).
%% Convert every {Path, Info} entry from couch_stats:fetch/0 (sorted) into
%% its Prometheus lines and concatenate them in order. The sorted stats list
%% is also passed to each conversion as context.
get_couchdb_stats() ->
    Sorted = lists:sort(couch_stats:fetch()),
    ToProm = fun({Path, Info}) -> couch_to_prom(Path, Info, Sorted) end,
    lists:append(lists:map(ToProm, Sorted)).
%% Gather all VM/system metric groups into a single flat list of lines.
get_system_stats() ->
    Collectors = [
        fun get_uptime_stat/0,
        fun get_io_stats/0,
        fun get_message_queue_stats/0,
        fun get_run_queue_stats/0,
        fun get_vm_stats/0,
        fun get_ets_stats/0
    ],
    lists:flatten([Collect() || Collect <- Collectors]).
%% Report CouchDB uptime as a counter. couch_app:uptime/0 is presumably in
%% milliseconds; `div 1000' truncates it to whole seconds.
get_uptime_stat() ->
    UptimeSecs = couch_app:uptime() div 1000,
    to_prom(uptime_seconds, counter, "couchdb uptime", UptimeSecs).
%% Erlang VM metrics: memory by type (one labelled sample per
%% erlang:memory/0 entry), GC count and words reclaimed, context switches,
%% reductions, and the current process count and limit.
get_vm_stats() ->
    MemByType = [{[{memory_type, Kind}], Bytes} || {Kind, Bytes} <- erlang:memory()],
    {GcCount, GcWords, _} = erlang:statistics(garbage_collection),
    {Switches, _} = erlang:statistics(context_switches),
    {Reductions, _} = erlang:statistics(reductions),
    Procs = erlang:system_info(process_count),
    MaxProcs = erlang:system_info(process_limit),
    MemoryStat = to_prom(
        erlang_memory_bytes,
        gauge,
        "size of memory (in bytes) dynamically allocated by the Erlang emulator",
        MemByType
    ),
    GcCountStat = to_prom(
        erlang_gc_collections_total,
        counter,
        "number of garbage collections by the Erlang emulator",
        GcCount
    ),
    GcWordsStat = to_prom(
        erlang_gc_words_reclaimed_total,
        counter,
        "number of words reclaimed by garbage collections",
        GcWords
    ),
    SwitchesStat = to_prom(
        erlang_context_switches_total, counter, "total number of context switches", Switches
    ),
    ReductionsStat = to_prom(
        erlang_reductions_total, counter, "total number of reductions", Reductions
    ),
    ProcsStat = to_prom(erlang_processes, gauge, "the number of Erlang processes", Procs),
    MaxProcsStat = to_prom(
        erlang_process_limit,
        gauge,
        "the maximum number of simultaneously existing Erlang processes",
        MaxProcs
    ),
    [
        MemoryStat,
        GcCountStat,
        GcWordsStat,
        SwitchesStat,
        ReductionsStat,
        ProcsStat,
        MaxProcsStat
    ].
%% Total bytes received from and sent to ports, from erlang:statistics(io).
get_io_stats() ->
    {{input, BytesIn}, {output, BytesOut}} = erlang:statistics(io),
    RecvStat = to_prom(
        erlang_io_recv_bytes_total,
        counter,
        "the total number of bytes received through ports",
        BytesIn
    ),
    SentStat = to_prom(
        erlang_io_sent_bytes_total, counter, "the total number of bytes output to ports", BytesOut
    ),
    [RecvStat, SentStat].
%% Summarise mailbox lengths of all locally registered processes as total,
%% minimum and maximum gauges. A name whose process is gone counts as 0.
get_message_queue_stats() ->
    Lengths = [message_queue_len(whereis(Name)) || Name <- registered()],
    Total = lists:sum(Lengths),
    Smallest = lists:min(Lengths),
    Largest = lists:max(Lengths),
    [
        to_prom(erlang_message_queues, gauge, "total size of all message queues", Total),
        to_prom(
            erlang_message_queue_min,
            gauge,
            "minimum size across all message queues",
            Smallest
        ),
        to_prom(
            erlang_message_queue_max,
            gauge,
            "maximum size across all message queues",
            Largest
        )
    ].
%% Number of messages waiting in Pid's mailbox. Returns 0 for `undefined'
%% (e.g. whereis/1 on a name that is no longer registered) and for a
%% process that has already exited.
message_queue_len(Pid) when is_pid(Pid) ->
    queue_len_or_zero(erlang:process_info(Pid, message_queue_len));
message_queue_len(undefined) ->
    0.

%% Unwrap a process_info/2 message_queue_len result; anything else is 0.
queue_len_or_zero({message_queue_len, Len}) ->
    Len;
queue_len_or_zero(_) ->
    0.
%% Report the summed length of the normal scheduler run queues and the
%% length of the dirty CPU scheduler run queue (0 when the VM has no dirty
%% CPU schedulers).
get_run_queue_stats() ->
    %% Workaround for https://bugs.erlang.org/browse/ERL-1355
    %% statistics(run_queue_lengths) lists the normal run queues first and,
    %% when dirty CPU schedulers exist, the dirty CPU run queue last.
    {NormalLen, DirtyCpuLen} =
        case erlang:system_info(dirty_cpu_schedulers) of
            0 ->
                {erlang:statistics(run_queue), 0};
            _ ->
                Lengths = erlang:statistics(run_queue_lengths),
                {lists:sum(lists:droplast(Lengths)), lists:last(Lengths)}
        end,
    NormalStat = to_prom(
        erlang_scheduler_queues, gauge, "the total size of all normal run queues", NormalLen
    ),
    DirtyStat = to_prom(
        erlang_dirty_cpu_scheduler_queues,
        gauge,
        "the total size of all dirty CPU scheduler run queues",
        DirtyCpuLen
    ),
    [NormalStat, DirtyStat].
%% Report how many ETS tables currently exist on this node.
get_ets_stats() ->
    TableCount = length(ets:all()),
    to_prom(erlang_ets_table, gauge, "number of ETS tables", TableCount).
%% Remove every `refresh' message already queued in the caller's mailbox
%% without blocking; other messages are left in place. Always returns ok.
drain_refresh_messages() ->
    Found =
        receive
            refresh -> true
        after 0 -> false
        end,
    case Found of
        true -> drain_refresh_messages();
        false -> ok
    end.
%% Arm the next `refresh' tick to self() and return its timer reference.
%% Queued `refresh' messages are discarded first, so a tick that already
%% arrived does not trigger an extra refresh. The "prometheus"/"interval"
%% setting (default ?REFRESH_INTERVAL) is multiplied by 1000 for
%% erlang:send_after/3, i.e. it is treated as seconds.
update_refresh_timer() ->
    drain_refresh_messages(),
    IntervalSecs = config:get_integer("prometheus", "interval", ?REFRESH_INTERVAL),
    DelayMs = IntervalSecs * 1000,
    erlang:send_after(DelayMs, self(), refresh).
-ifdef(TEST).
-include_lib("couch/include/couch_eunit.hrl").
%% Every system metric line must be a binary that, once trimmed, is either a
%% sample line (`couchdb_' prefix) or a comment line (`# ' prefix).
system_stats_test() ->
    CheckLine = fun(Line) ->
        ?assert(is_binary(Line)),
        Stripped = string:trim(Line),
        ?assert(starts_with(<<"couchdb_">>, Stripped) orelse starts_with(<<"# ">>, Stripped))
    end,
    lists:foreach(CheckLine, get_system_stats()).
%% True when the binary Line begins with the whole binary Prefix.
%% Comparing the common-prefix length with byte_size(Prefix) is required:
%% checking only `> 0' accepts any line that shares just its first byte
%% with Prefix (e.g. <<"count">> vs <<"couchdb_">>). An empty Prefix
%% matches every line.
starts_with(Prefix, Line) when is_binary(Prefix), is_binary(Line) ->
    binary:longest_common_prefix([Prefix, Line]) =:= byte_size(Prefix).
%% message_queue_len/1 reports a non-empty mailbox for a live process and 0
%% for `undefined' and for a process that has exited.
message_queue_len_test() ->
    %% Use a unique token and consume it again, so no stray message (the
    %% original used `refresh') is left in the test process mailbox for
    %% later tests.
    Token = make_ref(),
    self() ! Token,
    ?assert(message_queue_len(self()) >= 1),
    receive
        Token -> ok
    end,
    ?assertEqual(0, message_queue_len(undefined)),
    {Pid, Ref} = spawn_monitor(fun() -> ok end),
    receive
        {'DOWN', Ref, process, Pid, _} ->
            ok
    end,
    ?assertEqual(0, message_queue_len(Pid)).
%% A queued `refresh' message is gone after drain_refresh_messages/0 runs.
drain_refresh_messages_test() ->
    Mailbox = fun() ->
        {messages, Queued} = erlang:process_info(self(), messages),
        Queued
    end,
    self() ! refresh,
    ?assert(lists:member(refresh, Mailbox())),
    drain_refresh_messages(),
    ?assertNot(lists:member(refresh, Mailbox())).
-endif.