| % Licensed under the Apache License, Version 2.0 (the "License"); you may not |
| % use this file except in compliance with the License. You may obtain a copy of |
| % the License at |
| % |
| % http://www.apache.org/licenses/LICENSE-2.0 |
| % |
| % Unless required by applicable law or agreed to in writing, software |
| % distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| % WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| % License for the specific language governing permissions and limitations under |
| % the License. |
| |
| -module(couch_stats_aggregator). |
| -behaviour(gen_server). |
| |
| -export([start/0, start/1, stop/0]). |
| -export([all/0, all/1, get/1, get/2, get_json/1, get_json/2, collect_sample/0]). |
| |
| -export([init/1, terminate/2, code_change/3]). |
| -export([handle_call/3, handle_cast/2, handle_info/2]). |
| |
| -record(aggregate, { |
| description = <<"">>, |
| seconds = 0, |
| count = 0, |
| current = null, |
| sum = null, |
| mean = null, |
| variance = null, |
| stddev = null, |
| min = null, |
| max = null, |
| samples = [] |
| }). |
| |
| |
| start() -> |
| PrivDir = couch_util:priv_dir(), |
| start(filename:join(PrivDir, "stat_descriptions.cfg")). |
| |
| start(FileName) -> |
| gen_server:start_link({local, ?MODULE}, ?MODULE, [FileName], []). |
| |
| stop() -> |
| gen_server:cast(?MODULE, stop). |
| |
| all() -> |
| ?MODULE:all(0). |
| all(Time) when is_binary(Time) -> |
| ?MODULE:all(list_to_integer(binary_to_list(Time))); |
| all(Time) when is_atom(Time) -> |
| ?MODULE:all(list_to_integer(atom_to_list(Time))); |
| all(Time) when is_integer(Time) -> |
| Aggs = ets:match(?MODULE, {{'$1', Time}, '$2'}), |
| Stats = lists:map(fun([Key, Agg]) -> {Key, Agg} end, Aggs), |
| case Stats of |
| [] -> |
| {[]}; |
| _ -> |
| Ret = lists:foldl(fun({{Mod, Key}, Agg}, Acc) -> |
| CurrKeys = case proplists:lookup(Mod, Acc) of |
| none -> []; |
| {Mod, {Keys}} -> Keys |
| end, |
| NewMod = {[{Key, to_json_term(Agg)} | CurrKeys]}, |
| [{Mod, NewMod} | proplists:delete(Mod, Acc)] |
| end, [], Stats), |
| {Ret} |
| end. |
| |
| get(Key) -> |
| ?MODULE:get(Key, 0). |
| get(Key, Time) when is_binary(Time) -> |
| ?MODULE:get(Key, list_to_integer(binary_to_list(Time))); |
| get(Key, Time) when is_atom(Time) -> |
| ?MODULE:get(Key, list_to_integer(atom_to_list(Time))); |
| get(Key, Time) when is_integer(Time) -> |
| case ets:lookup(?MODULE, {make_key(Key), Time}) of |
| [] -> #aggregate{seconds=Time}; |
| [{_, Agg}] -> Agg |
| end. |
| |
| get_json(Key) -> |
| get_json(Key, 0). |
| get_json(Key, Time) -> |
| to_json_term(?MODULE:get(Key, Time)). |
| |
| collect_sample() -> |
| gen_server:call(?MODULE, collect_sample, infinity). |
| |
| |
| init(StatDescsFileName) -> |
| % Create an aggregate entry for each {description, rate} pair. |
| ets:new(?MODULE, [named_table, set, protected]), |
| SampleStr = couch_config:get("stats", "samples", "[0]"), |
| {ok, Samples} = couch_util:parse_term(SampleStr), |
| {ok, Descs} = file:consult(StatDescsFileName), |
| lists:foreach(fun({Sect, Key, Value}) -> |
| lists:foreach(fun(Secs) -> |
| Agg = #aggregate{ |
| description=list_to_binary(Value), |
| seconds=Secs |
| }, |
| ets:insert(?MODULE, {{{Sect, Key}, Secs}, Agg}) |
| end, Samples) |
| end, Descs), |
| |
| Self = self(), |
| ok = couch_config:register( |
| fun("stats", _) -> exit(Self, config_change) end |
| ), |
| |
| Rate = list_to_integer(couch_config:get("stats", "rate", "1000")), |
| % TODO: Add timer_start to kernel start options. |
| {ok, TRef} = timer:apply_after(Rate, ?MODULE, collect_sample, []), |
| {ok, {TRef, Rate}}. |
| |
| terminate(_Reason, {TRef, _Rate}) -> |
| timer:cancel(TRef), |
| ok. |
| |
| handle_call(collect_sample, _, {OldTRef, SampleInterval}) -> |
| timer:cancel(OldTRef), |
| {ok, TRef} = timer:apply_after(SampleInterval, ?MODULE, collect_sample, []), |
| % Gather new stats values to add. |
| Incs = lists:map(fun({Key, Value}) -> |
| {Key, {incremental, Value}} |
| end, couch_stats_collector:all(incremental)), |
| Abs = lists:map(fun({Key, Values}) -> |
| couch_stats_collector:clear(Key), |
| Values2 = case Values of |
| X when is_list(X) -> X; |
| Else -> [Else] |
| end, |
| {_, Mean} = lists:foldl(fun(Val, {Count, Curr}) -> |
| {Count+1, Curr + (Val - Curr) / (Count+1)} |
| end, {0, 0}, Values2), |
| {Key, {absolute, Mean}} |
| end, couch_stats_collector:all(absolute)), |
| |
| Values = Incs ++ Abs, |
| Now = erlang:now(), |
| lists:foreach(fun({{Key, Rate}, Agg}) -> |
| NewAgg = case proplists:lookup(Key, Values) of |
| none -> |
| rem_values(Now, Agg); |
| {Key, {Type, Value}} -> |
| NewValue = new_value(Type, Value, Agg#aggregate.current), |
| Agg2 = add_value(Now, NewValue, Agg), |
| rem_values(Now, Agg2) |
| end, |
| ets:insert(?MODULE, {{Key, Rate}, NewAgg}) |
| end, ets:tab2list(?MODULE)), |
| {reply, ok, {TRef, SampleInterval}}. |
| |
| handle_cast(stop, State) -> |
| {stop, normal, State}. |
| |
| handle_info(_Info, State) -> |
| {noreply, State}. |
| |
| code_change(_OldVersion, State, _Extra) -> |
| {ok, State}. |
| |
| |
| new_value(incremental, Value, null) -> |
| Value; |
| new_value(incremental, Value, Current) -> |
| Value - Current; |
| new_value(absolute, Value, _Current) -> |
| Value. |
| |
| add_value(Time, Value, #aggregate{count=Count, seconds=Secs}=Agg) when Count < 1 -> |
| Samples = case Secs of |
| 0 -> []; |
| _ -> [{Time, Value}] |
| end, |
| Agg#aggregate{ |
| count=1, |
| current=Value, |
| sum=Value, |
| mean=Value, |
| variance=0.0, |
| stddev=null, |
| min=Value, |
| max=Value, |
| samples=Samples |
| }; |
| add_value(Time, Value, Agg) -> |
| #aggregate{ |
| count=Count, |
| current=Current, |
| sum=Sum, |
| mean=Mean, |
| variance=Variance, |
| samples=Samples |
| } = Agg, |
| |
| NewCount = Count + 1, |
| NewMean = Mean + (Value - Mean) / NewCount, |
| NewVariance = Variance + (Value - Mean) * (Value - NewMean), |
| StdDev = case NewCount > 1 of |
| false -> null; |
| _ -> math:sqrt(NewVariance / (NewCount - 1)) |
| end, |
| Agg2 = Agg#aggregate{ |
| count=NewCount, |
| current=Current + Value, |
| sum=Sum + Value, |
| mean=NewMean, |
| variance=NewVariance, |
| stddev=StdDev, |
| min=lists:min([Agg#aggregate.min, Value]), |
| max=lists:max([Agg#aggregate.max, Value]) |
| }, |
| case Agg2#aggregate.seconds of |
| 0 -> Agg2; |
| _ -> Agg2#aggregate{samples=[{Time, Value} | Samples]} |
| end. |
| |
| rem_values(Time, Agg) -> |
| Seconds = Agg#aggregate.seconds, |
| Samples = Agg#aggregate.samples, |
| Pred = fun({When, _Value}) -> |
| timer:now_diff(Time, When) =< (Seconds * 1000000) |
| end, |
| {Keep, Remove} = lists:splitwith(Pred, Samples), |
| Agg2 = lists:foldl(fun({_, Value}, Acc) -> |
| rem_value(Value, Acc) |
| end, Agg, Remove), |
| Agg2#aggregate{samples=Keep}. |
| |
| rem_value(_Value, #aggregate{count=Count, seconds=Secs}) when Count =< 1 -> |
| #aggregate{seconds=Secs}; |
| rem_value(Value, Agg) -> |
| #aggregate{ |
| count=Count, |
| sum=Sum, |
| mean=Mean, |
| variance=Variance |
| } = Agg, |
| |
| OldMean = (Mean * Count - Value) / (Count - 1), |
| OldVariance = Variance - (Value - OldMean) * (Value - Mean), |
| OldCount = Count - 1, |
| StdDev = case OldCount > 1 of |
| false -> null; |
| _ -> math:sqrt(clamp_value(OldVariance / (OldCount - 1))) |
| end, |
| Agg#aggregate{ |
| count=OldCount, |
| sum=Sum-Value, |
| mean=clamp_value(OldMean), |
| variance=clamp_value(OldVariance), |
| stddev=StdDev |
| }. |
| |
| to_json_term(Agg) -> |
| {Min, Max} = case Agg#aggregate.seconds > 0 of |
| false -> |
| {Agg#aggregate.min, Agg#aggregate.max}; |
| _ -> |
| case length(Agg#aggregate.samples) > 0 of |
| true -> |
| Extract = fun({_Time, Value}) -> Value end, |
| Samples = lists:map(Extract, Agg#aggregate.samples), |
| {lists:min(Samples), lists:max(Samples)}; |
| _ -> |
| {null, null} |
| end |
| end, |
| {[ |
| {description, Agg#aggregate.description}, |
| {current, round_value(Agg#aggregate.sum)}, |
| {sum, round_value(Agg#aggregate.sum)}, |
| {mean, round_value(Agg#aggregate.mean)}, |
| {stddev, round_value(Agg#aggregate.stddev)}, |
| {min, Min}, |
| {max, Max} |
| ]}. |
| |
| make_key({Mod, Val}) when is_integer(Val) -> |
| {Mod, list_to_atom(integer_to_list(Val))}; |
| make_key(Key) -> |
| Key. |
| |
| round_value(Val) when not is_number(Val) -> |
| Val; |
| round_value(Val) when Val == 0 -> |
| Val; |
| round_value(Val) -> |
| erlang:round(Val * 1000.0) / 1000.0. |
| |
| clamp_value(Val) when Val > 0.00000000000001 -> |
| Val; |
| clamp_value(_) -> |
| 0.0. |