blob: 6090355d56946af4ecdea1bf580b6ffeeb017091 [file] [log] [blame]
% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
% http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.
-module(couch_stats_aggregator).
-behaviour(gen_server).
-export([start/0, start/1, stop/0]).
-export([all/0, all/1, get/1, get/2, get_json/1, get_json/2, collect_sample/0]).
-export([init/1, terminate/2, code_change/3]).
-export([handle_call/3, handle_cast/2, handle_info/2]).
% Running statistics kept for one stat key over one sampling window.
% One record is stored in the ets table per {{Section, Key}, Seconds} pair.
-record(aggregate, {
% Human readable description (binary); init/1 fills it from the
% descriptions file.
description = <<"">>,
% Length of the sampling window in seconds. With 0, add_value/3 keeps no
% per-sample history for the aggregate.
seconds = 0,
% Number of values currently folded into the aggregate (add_value/3
% increments it, rem_value/2 decrements it).
count = 0,
% Running total of the values added so far; handle_call/3 hands it to
% new_value/3 as the previous reading of an incremental stat.
% rem_value/2 does not subtract from it.
current = null,
% Sum and mean of the values currently folded into the aggregate.
sum = null,
mean = null,
% Running sum of squared deviations from the mean; stddev is derived from
% it as sqrt(variance / (count - 1)).
variance = null,
% Standard deviation, or null while fewer than two values are present.
stddev = null,
% Smallest / largest value added. rem_value/2 does not adjust these;
% to_json_term/1 recomputes them from samples when seconds > 0.
min = null,
max = null,
% {Timestamp, Value} pairs, newest first; only filled when seconds is not 0.
samples = []
}).
% Start the aggregator with the "stat_descriptions.cfg" file located in the
% directory returned by couch_util:priv_dir/0.
start() ->
    DescFile = filename:join(couch_util:priv_dir(), "stat_descriptions.cfg"),
    start(DescFile).

% Start the aggregator as a gen_server registered locally under the module
% name and linked to the caller. FileName is handed to init/1 wrapped in a
% one-element list.
start(FileName) ->
    ServerName = {local, ?MODULE},
    InitArg = [FileName],
    gen_server:start_link(ServerName, ?MODULE, InitArg, []).
% Ask the registered aggregator to shut down. The request is an asynchronous
% cast; handle_cast/2 answers it by stopping the server with reason normal.
stop() ->
gen_server:cast(?MODULE, stop).
% Return every aggregate of the 0-second window (see all/1).
all() ->
    ?MODULE:all(0).

% Return every aggregate stored for the window Time as a nested JSON term:
%   {[{Mod, {[{Key, JsonTerm}, ...]}}, ...]}
% Time may be an integer, or a binary or atom spelling an integer.
% When no aggregate matches the result is {[]}.
all(Time) when is_binary(Time) ->
    Secs = list_to_integer(binary_to_list(Time)),
    ?MODULE:all(Secs);
all(Time) when is_atom(Time) ->
    Secs = list_to_integer(atom_to_list(Time)),
    ?MODULE:all(Secs);
all(Time) when is_integer(Time) ->
    % Each row is [{Mod, Key}, Agg] for one aggregate of the requested window.
    Rows = ets:match(?MODULE, {{'$1', Time}, '$2'}),
    Grouped = lists:foldl(fun([{Mod, Key}, Agg], Acc) ->
        Member = {Key, to_json_term(Agg)},
        Members = case proplists:lookup(Mod, Acc) of
            {Mod, {Existing}} -> [Member | Existing];
            none -> [Member]
        end,
        % Move the module's entry to the front with the new member added.
        [{Mod, {Members}} | proplists:delete(Mod, Acc)]
    end, [], Rows),
    {Grouped}.
% Fetch the aggregate of Key for the 0-second window (see get/2).
get(Key) ->
    ?MODULE:get(Key, 0).

% Fetch the #aggregate{} record stored for Key and the window Time.
% Key is normalised with make_key/1. Time may be an integer, or a binary or
% atom spelling an integer. When nothing is stored, an empty aggregate whose
% seconds field is Time is returned.
get(Key, Time) when is_integer(Time) ->
    StatId = {make_key(Key), Time},
    case ets:lookup(?MODULE, StatId) of
        [{_, Agg}] -> Agg;
        [] -> #aggregate{seconds=Time}
    end;
get(Key, Time) when is_binary(Time) ->
    Secs = list_to_integer(binary_to_list(Time)),
    ?MODULE:get(Key, Secs);
get(Key, Time) when is_atom(Time) ->
    Secs = list_to_integer(atom_to_list(Time)),
    ?MODULE:get(Key, Secs).
% JSON term of the aggregate of Key for the 0-second window.
get_json(Key) ->
    get_json(Key, 0).

% JSON term (see to_json_term/1) of the aggregate that get/2 returns for
% Key and Time.
get_json(Key, Time) ->
    Agg = ?MODULE:get(Key, Time),
    to_json_term(Agg).
% Synchronously ask the aggregator to take a sample now, waiting without a
% timeout for it to finish. This is also the function the sampling timer
% applies (see init/1 and handle_call/3).
collect_sample() ->
gen_server:call(?MODULE, collect_sample, infinity).
% gen_server callback. Creates the named ets table, seeds it with one empty
% aggregate per {description, sample window} pair, registers for config
% changes and arms the first sampling timer.
%
% StatDescsFileName is passed to file:consult/1 as received; every term in
% that file must be a {Section, Key, DescriptionString} tuple.
% The server state is {TimerRef, SampleIntervalMillis}.
init(StatDescsFileName) ->
    ets:new(?MODULE, [named_table, set, protected]),
    SampleStr = couch_config:get("stats", "samples", "[0]"),
    {ok, Samples} = couch_util:parse_term(SampleStr),
    {ok, Descs} = file:consult(StatDescsFileName),

    % Create an aggregate entry for each {description, rate} pair.
    AddEntry = fun(StatKey, DescStr, Secs) ->
        Agg = #aggregate{
            description=list_to_binary(DescStr),
            seconds=Secs
        },
        ets:insert(?MODULE, {{StatKey, Secs}, Agg})
    end,
    lists:foreach(fun({Sect, Key, Value}) ->
        lists:foreach(fun(Secs) ->
            AddEntry({Sect, Key}, Value, Secs)
        end, Samples)
    end, Descs),

    % Any change in the "stats" config section kills this server with
    % reason config_change.
    Server = self(),
    ok = couch_config:register(
        fun("stats", _) -> exit(Server, config_change) end
    ),

    Rate = list_to_integer(couch_config:get("stats", "rate", "1000")),
    % TODO: Add timer_start to kernel start options.
    {ok, TRef} = timer:apply_after(Rate, ?MODULE, collect_sample, []),
    {ok, {TRef, Rate}}.
% gen_server callback. Cancels the pending sampling timer held in the state;
% the result of the cancellation is ignored.
terminate(_Why, {TimerRef, _SampleInterval}) ->
    timer:cancel(TimerRef),
    ok.
% gen_server callback for the collect_sample request.
%
% Re-arms the sampling timer, reads the current incremental and absolute
% values from couch_stats_collector, and folds them into every aggregate in
% the ets table. Aggregates without a new value only have their expired
% samples pruned. Replies ok; the new state carries the fresh timer
% reference and the unchanged sample interval.
handle_call(collect_sample, _From, {OldTRef, SampleInterval}) ->
    timer:cancel(OldTRef),
    {ok, TRef} = timer:apply_after(SampleInterval, ?MODULE, collect_sample, []),
    % Gather new stats values to add.
    Incs = incremental_values(),
    Abs = absolute_values(),
    Values = Incs ++ Abs,
    % os:timestamp/0 yields the {MegaSecs, Secs, MicroSecs} shape that
    % timer:now_diff/2 expects, without relying on the deprecated
    % erlang:now/0.
    Now = os:timestamp(),
    lists:foreach(fun({{Key, _Secs} = Id, Agg}) ->
        ets:insert(?MODULE, {Id, apply_sample(Now, Key, Values, Agg)})
    end, ets:tab2list(?MODULE)),
    {reply, ok, {TRef, SampleInterval}}.

% Tag every incremental collector value as {Key, {incremental, Value}}.
incremental_values() ->
    lists:map(fun({Key, Value}) ->
        {Key, {incremental, Value}}
    end, couch_stats_collector:all(incremental)).

% Clear every absolute collector key and tag the mean of the values it held
% as {Key, {absolute, Mean}}.
absolute_values() ->
    lists:map(fun({Key, Recorded}) ->
        couch_stats_collector:clear(Key),
        {Key, {absolute, running_mean(Recorded)}}
    end, couch_stats_collector:all(absolute)).

% Incrementally computed mean of a list of numbers; a non-list argument is
% treated as a single value. The mean of an empty list is 0.
running_mean(Recorded) when is_list(Recorded) ->
    {_Count, Mean} = lists:foldl(fun(Val, {Count, Curr}) ->
        {Count + 1, Curr + (Val - Curr) / (Count + 1)}
    end, {0, 0}, Recorded),
    Mean;
running_mean(Single) ->
    running_mean([Single]).

% Fold the new value for Key (if any) into Agg, then drop the samples that
% have fallen out of the aggregate's window.
apply_sample(Now, Key, Values, Agg) ->
    case proplists:lookup(Key, Values) of
        none ->
            rem_values(Now, Agg);
        {Key, {Type, Value}} ->
            NewValue = new_value(Type, Value, Agg#aggregate.current),
            rem_values(Now, add_value(Now, NewValue, Agg))
    end.
% gen_server callback. The only supported cast is stop, which terminates the
% server with reason normal and leaves the state untouched.
handle_cast(stop, St) ->
    {stop, normal, St}.
% gen_server callback. Every out-of-band message is ignored and the state is
% kept as is.
handle_info(_Msg, St) ->
    {noreply, St}.
% gen_server callback. The state needs no conversion on a code upgrade, so it
% is returned unchanged.
code_change(_FromVsn, St, _Extra) ->
    {ok, St}.
% Turn a collector reading into the value to add to an aggregate.
%   absolute    - the reading itself, whatever the previous total was.
%   incremental - the reading minus the previous total, or the reading
%                 itself when there is no previous total yet (null).
new_value(absolute, Reading, _Previous) ->
    Reading;
new_value(incremental, Reading, null) ->
    Reading;
new_value(incremental, Reading, Previous) ->
    Reading - Previous.
% Fold Value, observed at Time, into the aggregate.
%
% The first clause handles an aggregate holding no values: every statistic
% is seeded from Value and any previous sample list is replaced. The second
% clause updates the mean and the running sum of squared deviations
% incrementally. A {Time, Value} sample is recorded only when the
% aggregate's seconds field is not 0.
add_value(Time, Value, #aggregate{count=Count}=Agg) when Count < 1 ->
    Seeded = Agg#aggregate{
        count=1,
        current=Value,
        sum=Value,
        mean=Value,
        variance=0.0,
        stddev=null,
        min=Value,
        max=Value
    },
    case Agg#aggregate.seconds of
        0 -> Seeded#aggregate{samples=[]};
        _ -> Seeded#aggregate{samples=[{Time, Value}]}
    end;
add_value(Time, Value, Agg) ->
    #aggregate{
        seconds=Secs,
        count=PrevCount,
        current=Current,
        sum=Sum,
        mean=PrevMean,
        variance=PrevVariance,
        min=PrevMin,
        max=PrevMax,
        samples=Samples
    } = Agg,
    Count = PrevCount + 1,
    Mean = PrevMean + (Value - PrevMean) / Count,
    Variance = PrevVariance + (Value - PrevMean) * (Value - Mean),
    StdDev = if
        Count > 1 -> math:sqrt(Variance / (Count - 1));
        true -> null
    end,
    Updated = Agg#aggregate{
        count=Count,
        current=Current + Value,
        sum=Sum + Value,
        mean=Mean,
        variance=Variance,
        stddev=StdDev,
        min=erlang:min(PrevMin, Value),
        max=erlang:max(PrevMax, Value)
    },
    case Secs of
        0 -> Updated;
        _ -> Updated#aggregate{samples=[{Time, Value} | Samples]}
    end.
% Drop the samples that have left the aggregate's window as of Time.
%
% The sample list is newest first, so the leading run of samples that are at
% most `seconds` old is kept. Everything after it is removed from the running
% statistics through rem_value/2, in list order.
rem_values(Time, Agg) ->
    Window = Agg#aggregate.seconds,
    InWindow = fun({SampledAt, _}) ->
        timer:now_diff(Time, SampledAt) =< (Window * 1000000)
    end,
    {Fresh, Expired} = lists:splitwith(InWindow, Agg#aggregate.samples),
    Forget = fun({_, Value}, Acc) -> rem_value(Value, Acc) end,
    Pruned = lists:foldl(Forget, Agg, Expired),
    Pruned#aggregate{samples=Fresh}.
% Remove one previously added Value from the running statistics.
%
% When the aggregate holds at most one value, removing it empties the
% aggregate: all statistics (including current) go back to their record
% defaults. The description and the window length are static metadata set
% up in init/1, so both are carried over; dropping the description here
% would make the stat report an empty description from then on.
%
% Otherwise the incremental mean/variance update of add_value/3 is reversed.
% Floating point error can push the results slightly below zero, so mean,
% variance and the stddev input are passed through clamp_value/1.
% min, max, current and samples are left as they are.
rem_value(_Value, #aggregate{count=Count, seconds=Secs, description=Desc})
        when Count =< 1 ->
    #aggregate{description=Desc, seconds=Secs};
rem_value(Value, Agg) ->
    #aggregate{
        count=Count,
        sum=Sum,
        mean=Mean,
        variance=Variance
    } = Agg,
    OldMean = (Mean * Count - Value) / (Count - 1),
    OldVariance = Variance - (Value - OldMean) * (Value - Mean),
    OldCount = Count - 1,
    StdDev = case OldCount > 1 of
        false -> null;
        _ -> math:sqrt(clamp_value(OldVariance / (OldCount - 1)))
    end,
    Agg#aggregate{
        count=OldCount,
        sum=Sum-Value,
        mean=clamp_value(OldMean),
        variance=clamp_value(OldVariance),
        stddev=StdDev
    }.
% Convert an aggregate into the JSON term served to clients.
%
% min and max come from the record fields when the aggregate has no window
% (seconds not greater than 0). With a window they are recomputed from the
% retained samples, or null when there are none. Numeric statistics are
% rounded with round_value/1.
to_json_term(Agg) ->
    HasWindow = Agg#aggregate.seconds > 0,
    {Min, Max} = case {HasWindow, Agg#aggregate.samples} of
        {false, _} ->
            {Agg#aggregate.min, Agg#aggregate.max};
        {_, [_ | _] = Samples} ->
            {_Times, Values} = lists:unzip(Samples),
            {lists:min(Values), lists:max(Values)};
        {_, _} ->
            {null, null}
    end,
    % Note: "current" is reported from the sum field, exactly like "sum".
    Total = round_value(Agg#aggregate.sum),
    {[
        {description, Agg#aggregate.description},
        {current, Total},
        {sum, Total},
        {mean, round_value(Agg#aggregate.mean)},
        {stddev, round_value(Agg#aggregate.stddev)},
        {min, Min},
        {max, Max}
    ]}.
% Normalise a stat key for table lookups: in a {Mod, Val} pair an integer
% Val is replaced by the atom spelling the same digits (200 becomes '200').
% Any other key is returned unchanged.
make_key({Mod, Val}) when is_integer(Val) ->
    Digits = integer_to_list(Val),
    {Mod, list_to_atom(Digits)};
make_key(Other) ->
    Other.
% Round a number to three decimal places. Non-numbers (such as null) and
% values equal to zero are returned untouched.
round_value(Val) when is_number(Val), Val /= 0 ->
    Thousandths = erlang:round(Val * 1000.0),
    Thousandths / 1000.0;
round_value(Val) ->
    Val.
% Return Val when it is greater than 1.0e-14, otherwise 0.0. Used to absorb
% tiny or negative results caused by floating point error.
clamp_value(Val) ->
    case Val > 0.00000000000001 of
        true -> Val;
        false -> 0.0
    end.