% blob: 13812fde63026428c58fc2a6f697f4faa64157ee [file] [log] [blame]
% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
% http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.
-module(couch_stats_aggregator).
-include("couch_stats.hrl").
-behaviour(gen_server).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
-export([start/0, stop/0,
get/1, get/2, get_json/1, get_json/2, all/0,
time_passed/0, clear_aggregates/1]).
% gen_server state of the aggregator.
-record(state, {
% [{Key, [{Time, #aggregates{}}]}] -- one entry per stats key, holding one
% aggregates record per time window ('0' | '60' | '300' | '900')
aggregates = [],
% NOTE(review): never read or written in this module; the descriptions are
% kept in the ets table named after the module instead
descriptions = [],
% [{Type, TimerRef}] -- pending timers keyed by time_passed | '60' | '300' | '900'
timer_refs = []
}).
% module the raw counters are read from (all/1) and reset through (clear/1)
-define(COLLECTOR, couch_stats_collector).
% NOTE(review): QUEUE_MAX_LENGTH is not referenced anywhere in this module
-define(QUEUE_MAX_LENGTH, 900). % maximum number of seconds
% PUBLIC API
%% @doc Starts the aggregator as a linked gen_server registered locally under
%% the module name. Returns whatever gen_server:start_link/4 returns.
start() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
%% @doc Asks the server to stop. The server replies `stopped' and terminates
%% with reason `normal'.
stop() ->
gen_server:call(?MODULE, stop).
%% @doc Returns the #aggregates{} record for Key in the default time window
%% ('0'). An empty record (with description set) is returned if nothing has
%% been aggregated for Key yet.
get(Key) ->
gen_server:call(?MODULE, {get, Key}).
%% @doc Returns the #aggregates{} record for Key in the time window Time,
%% where Time is one of the window atoms '0' | '60' | '300' | '900'.
get(Key, Time) ->
gen_server:call(?MODULE, {get, Key, Time}).
%% @doc Like get/1, but returns the aggregates converted with
%% aggregate_to_json_term/1, i.e. as `{[{Field, Value}]}'.
get_json(Key) ->
gen_server:call(?MODULE, {get_json, Key}).
%% @doc Like get/2, but returns the aggregates converted with
%% aggregate_to_json_term/1, i.e. as `{[{Field, Value}]}'.
get_json(Key, Time) ->
gen_server:call(?MODULE, {get_json, Key, Time}).
%% @doc Tells the server to sample the collector's counters and fold them into
%% the aggregates. Scheduled by the one-second timer (see init_timers/0 and
%% update_timer/2). Waits for the reply without a timeout.
time_passed() ->
gen_server:call(?MODULE, time_passed, infinity).
%% @doc Tells the server to reset the aggregates of the time window Time
%% ('60' | '300' | '900'). Scheduled by the per-window timers. Waits for the
%% reply without a timeout.
clear_aggregates(Time) ->
gen_server:call(?MODULE, {clear_aggregates, Time}, infinity).
%% @doc Returns the default-window aggregates of every known key, grouped by
%% module, in the JSON term form built by do_get_all/1.
all() ->
gen_server:call(?MODULE, all).
% GEN_SERVER
%% @doc gen_server callback. Creates the description table, arms the timers
%% and returns the initial state holding the timer references.
init(_) ->
% the named table is read by get_description/1 and filled by
% init_descriptions/0, so it has to exist before that call
ets:new(?MODULE, [named_table, set, protected]),
TimerRefs = init_timers(),
init_descriptions(),
{ok, #state{timer_refs = TimerRefs}}.
%% @doc gen_server callback for all synchronous requests.
%%
%% {get, Key} | {get, Key, Time}           -> the #aggregates{} record
%% {get_json, Key} | {get_json, Key, Time} -> the record as a JSON term
%% time_passed               -> sample the collector and update all aggregates
%% {clear_aggregates, Time}  -> reset the aggregates of one time window
%% all                       -> JSON term with the aggregates of every key
%% stop                      -> reply `stopped' and stop with reason normal
handle_call({get, Key}, _From, State) ->
    {reply, get_aggregate(Key, State), State};
handle_call({get, Key, Time}, _From, State) ->
    {reply, get_aggregate(Key, State, Time), State};
handle_call({get_json, Key}, _From, State) ->
    Aggregate = get_aggregate(Key, State),
    {reply, aggregate_to_json_term(Aggregate), State};
handle_call({get_json, Key, Time}, _From, State) ->
    Aggregate = get_aggregate(Key, State, Time),
    {reply, aggregate_to_json_term(Aggregate), State};
handle_call(time_passed, _From, State0) ->
    % re-arm the one-second timer before doing any work
    Refs = update_timer(time_passed, State0#state.timer_refs),
    % incremental counters are only read; the delta to the previous sample
    % is worked out while aggregating
    SampleIncremental = fun(Counter, Acc) ->
        {Key, Value} = Counter,
        update_aggregates_loop(Key, Value, Acc, incremental)
    end,
    % absolute counters are cleared in the collector once they are sampled,
    % the important bits end up in the accumulated state
    SampleAbsolute = fun(Counter, Acc) ->
        {Key, Value} = Counter,
        ?COLLECTOR:clear(Key),
        update_aggregates_loop(Key, Value, Acc, absolute)
    end,
    State1 = lists:foldl(SampleIncremental, State0, ?COLLECTOR:all(incremental)),
    State2 = lists:foldl(SampleAbsolute, State1, ?COLLECTOR:all(absolute)),
    {reply, ok, State2#state{timer_refs = Refs}};
handle_call({clear_aggregates, Time}, _From, State) ->
    {reply, ok, do_clear_aggregates(Time, State)};
handle_call(all, _From, State) ->
    {reply, do_get_all(State), State};
handle_call(stop, _From, State) ->
    {stop, normal, stopped, State}.
% PRIVATE API
% Stats = [{Key, TimesProplist}]
% TimesProplist = [{Time, Aggregates}]
% Aggregates = #aggregates{}
%
% [
% {Key, [
% {TimeA, #aggregates{}},
% {TimeB, #aggregates{}},
% {TimeC, #aggregates{}},
% {TimeD, #aggregates{}}
% ]
% },
%
% ]
%% @doc Resets the aggregates of one time window and re-arms its timer.
%%
%% Time is the window atom used as key in the per-key proplist
%% ('60' | '300' | '900'). For every key that already has an entry for Time,
%% that entry is replaced by a fresh `#aggregates{}' (and moved to the head
%% of the proplist); keys without an entry for Time are left untouched.
%%
%% Returns the updated state. The incoming state is updated rather than
%% rebuilt, so fields not touched here (e.g. `descriptions') keep their value
%% instead of silently falling back to the record defaults.
%%
%% NOTE(review): the reset record also resets `current'; for incremental
%% counters the next sample is computed as Value - current, so the first
%% sample after a reset presumably covers the whole counter -- confirm
%% against the #aggregates{} defaults in couch_stats.hrl.
do_clear_aggregates(Time, #state{aggregates=Stats, timer_refs=Refs}=State) ->
    NewTimerRefs = update_timer(Time, Refs),
    NewStats = lists:map(fun({Key, TimesProplist}) ->
        {Key, case proplists:lookup(Time, TimesProplist) of
            % no stats for this window yet: keep the proplist unmodified
            none ->
                TimesProplist;
            % there are stats, reset the ones for Time
            {_Time, _Stat} ->
                [{Time, #aggregates{}} | proplists:delete(Time, TimesProplist)]
        end}
    end, Stats),
    State#state{aggregates=NewStats, timer_refs=NewTimerRefs}.
% Returns the #aggregates{} record for Key in the default time window.
get_aggregate(Key, State) ->
%% default Time is '0', the window that is never cleared by a timer,
%% i.e. it covers everything since CouchDB started
get_aggregate(Key, State, '0').
% Returns the #aggregates{} record stored for Key in the time window Time,
% with its description filled in. If there is no data for Key, or none for
% Time, an otherwise empty record carrying only the description is returned.
get_aggregate(Key, #state{aggregates=StatsList}, Time) ->
    Description = get_description(Key),
    % first narrow down to the key, then to the requested window
    WindowEntry = case proplists:lookup(Key, StatsList) of
        {Key, Windows} -> proplists:lookup(Time, Windows);
        none -> none
    end,
    case WindowEntry of
        {Time, Stat} -> Stat#aggregates{description=Description};
        none -> #aggregates{description=Description}
    end.
% Looks up the human readable description of Key in the module's ets table.
% Keys without a stored description get a placeholder text.
get_description(Key) ->
    case ets:lookup(?MODULE, Key) of
        [{_StoredKey, Text}] ->
            Text;
        [] ->
            <<"No description yet.">>
    end.
%% @doc Folds the sampled value(s) of Key into the aggregates of every time
%% window.
%%
%% Values is either a single sample or a list of samples; CounterType
%% (incremental | absolute) is handed through to update_aggregates/3.
%%
%% Returns State with the entry for Key replaced (it is moved to the head of
%% the aggregates list). All other state fields are kept; previously a fresh
%% #state{} was returned, which dropped `timer_refs' and `descriptions'.
%%
%% Shape of the aggregates list after an update:
%% [{{httpd,requests},
%%   [{'0',   #aggregates{}},
%%    {'60',  #aggregates{}},
%%    {'300', #aggregates{}},
%%    {'900', #aggregates{}}]}]
update_aggregates_loop(Key, Values, State, CounterType) ->
    % if we get called with a single value, wrap it in a list
    ValuesList = case is_list(Values) of
        false -> [Values];
        _True -> Values
    end,
    fold_samples(Key, ValuesList, State, CounterType).

% Applies every sample, in order, to each time window of Key.
fold_samples(_Key, [], State, _CounterType) ->
    % nothing was sampled: there is nothing to aggregate, and storing
    % untouched `empty' placeholders would break readers that expect
    % #aggregates{} records, so the state is left alone
    State;
fold_samples(Key, ValuesList, #state{aggregates=AllStats}=State, CounterType) ->
    % a key seen for the first time starts out with `empty' placeholders for
    % every window; update_aggregates/3 turns them into records
    StatsList = case proplists:lookup(Key, AllStats) of
        none ->
            [{'0', empty}, {'60', empty}, {'300', empty}, {'900', empty}];
        {Key, Existing} ->
            Existing
    end,
    NewStats = lists:map(fun({Time, Stats}) ->
        % the fold accumulator must stay a bare record (or `empty'); it is
        % paired with Time only once all values are folded in. Wrapping it
        % inside the fold made the second value of a list crash.
        Updated = lists:foldl(fun(Value, Stat) ->
            update_aggregates(Value, Stat, CounterType)
        end, Stats, ValuesList),
        {Time, Updated}
    end, StatsList),
    % put the newly calculated aggregates into the state and drop the
    % previous entry for Key
    State#state{
        aggregates=[{Key, NewStats} | proplists:delete(Key, AllStats)]
    }.
% Computes the aggregates record that results from observing Value.
%
% Stat is either the atom `empty' (no sample seen yet) or the previous
% #aggregates{} record. CounterType is incremental | absolute.
%
% Note that the `variance' field holds the running sum of squared deviations
% from the mean; `stddev' is derived from it as sqrt(variance / count).
update_aggregates(Value, empty, _CounterType) ->
    % the first time this is called there is nothing to calculate, every
    % field is seeded from Value
    #aggregates{
        min=Value,
        max=Value,
        mean=Value,
        variance=0,
        stddev=0,
        count=1,
        current=Value
    };
update_aggregates(Value, Previous, CounterType) ->
    #aggregates{
        min=OldMin,
        max=OldMax,
        mean=OldMean,
        variance=OldVariance,
        count=OldCount,
        current=LastRaw
    } = Previous,
    % incremental counters only ever grow, so the sample is the difference
    % to the raw value seen at the last update
    Sample = case CounterType of
        incremental -> Value - LastRaw;
        absolute -> Value
    end,
    % Knuth, The Art of Computer Programming, vol. 2, p. 232.
    Count = OldCount + 1, % never 0
    Mean = OldMean + (Sample - OldMean) / Count,
    Variance = OldVariance + (Sample - OldMean) * (Sample - Mean),
    #aggregates{
        min=erlang:min(Sample, OldMin),
        max=erlang:max(Sample, OldMax),
        mean=Mean,
        variance=Variance,
        stddev=math:sqrt(Variance / Count),
        count=Count,
        % always remember the raw value, not the derived sample
        current=Value
    }.
% Converts an #aggregates{} record into a JSON object term `{Proplist}'.
% The `variance' field is intentionally not part of the output.
aggregate_to_json_term(#aggregates{}=Aggregate) ->
    Fields = [
        {current, Aggregate#aggregates.current},
        {count, Aggregate#aggregates.count},
        {mean, Aggregate#aggregates.mean},
        {min, Aggregate#aggregates.min},
        {max, Aggregate#aggregates.max},
        {stddev, Aggregate#aggregates.stddev},
        {description, Aggregate#aggregates.description}
    ],
    {Fields}.
% Returns the default-window aggregates of Key as a JSON term.
get_stats(Key, State) ->
aggregate_to_json_term(get_aggregate(Key, State)).
% Converts the aggregates list into JSON-erlang-terms, grouped by module:
%   {[{Module, {[{Key, JsonTerm}]}}]}
% Every stats key is expected to be a {Module, Key} tuple. Keys inside a
% module come out in ascending order; the modules themselves are accumulated
% by prepending, so the module that sorts last comes first.
% Thanks to Paul Davis
do_get_all(#state{aggregates=[]}) ->
    {[]};
do_get_all(#state{aggregates=Stats}=State) ->
    Grouped = lists:foldl(fun({{Module, Key}, _Times}, Acc) ->
        Entry = {Key, get_stats({Module, Key}, State)},
        case Acc of
            % same module as the previous key: collect the value
            [{Module, Vals} | Rest] ->
                [{Module, [Entry | Vals]} | Rest];
            % a new module starts: close the previous one, restoring the
            % ascending key order, and open the new one
            [{PrevMod, PrevVals} | Rest] ->
                [{Module, [Entry]}, {PrevMod, {lists:reverse(PrevVals)}} | Rest];
            % very first key
            [] ->
                [{Module, [Entry]}]
        end
    end, [], lists:sort(Stats)),
    % the module seen last is still open and has to be closed as well
    [{LastMod, LastVals} | Closed] = Grouped,
    {[{LastMod, {lists:sort(LastVals)}} | Closed]}.
% Fills the module's ets table with the description of every known stats key.
init_descriptions() ->
    % ets is probably overkill here, but I didn't manage to keep the
    % descriptions in the gen_server state. Which means there is probably
    % a bug in one of the handle_call() functions, most likely the one that
    % handles the time_passed message. But don't tell anyone, the math is
    % correct :) -- Jan

    % Style guide for descriptions: Start with a lowercase letter & do not add
    % a trailing full-stop / period.
    % NOTE(review): the clients_requesting_changes text starts with an
    % uppercase letter; it is left as is because the text is user-visible.

    % please keep this in alphabetical order
    Descriptions = [
        {{couchdb, database_reads}, <<"number of times a document was read from a database">>},
        {{couchdb, database_writes}, <<"number of times a database was changed">>},
        {{couchdb, open_databases}, <<"number of open databases">>},
        {{couchdb, open_os_files}, <<"number of file descriptors CouchDB has open">>},
        {{couchdb, request_time}, <<"length of a request inside CouchDB without MochiWeb">>},

        {{httpd, bulk_requests}, <<"number of bulk requests">>},
        {{httpd, clients_requesting_changes}, <<"Number of clients currently requesting continuous _changes">>},
        {{httpd, requests}, <<"number of HTTP requests">>},
        {{httpd, temporary_view_reads}, <<"number of temporary view reads">>},
        {{httpd, view_reads}, <<"number of view reads">>},

        {{httpd_request_methods, 'COPY'}, <<"number of HTTP COPY requests">>},
        {{httpd_request_methods, 'DELETE'}, <<"number of HTTP DELETE requests">>},
        {{httpd_request_methods, 'GET'}, <<"number of HTTP GET requests">>},
        {{httpd_request_methods, 'HEAD'}, <<"number of HTTP HEAD requests">>},
        {{httpd_request_methods, 'MOVE'}, <<"number of HTTP MOVE requests">>},
        {{httpd_request_methods, 'POST'}, <<"number of HTTP POST requests">>},
        {{httpd_request_methods, 'PUT'}, <<"number of HTTP PUT requests">>},

        {{httpd_status_codes, '200'}, <<"number of HTTP 200 OK responses">>},
        {{httpd_status_codes, '201'}, <<"number of HTTP 201 Created responses">>},
        {{httpd_status_codes, '202'}, <<"number of HTTP 202 Accepted responses">>},
        {{httpd_status_codes, '301'}, <<"number of HTTP 301 Moved Permanently responses">>},
        {{httpd_status_codes, '304'}, <<"number of HTTP 304 Not Modified responses">>},
        {{httpd_status_codes, '400'}, <<"number of HTTP 400 Bad Request responses">>},
        {{httpd_status_codes, '401'}, <<"number of HTTP 401 Unauthorized responses">>},
        {{httpd_status_codes, '403'}, <<"number of HTTP 403 Forbidden responses">>},
        {{httpd_status_codes, '404'}, <<"number of HTTP 404 Not Found responses">>},
        {{httpd_status_codes, '405'}, <<"number of HTTP 405 Method Not Allowed responses">>},
        {{httpd_status_codes, '409'}, <<"number of HTTP 409 Conflict responses">>},
        {{httpd_status_codes, '412'}, <<"number of HTTP 412 Precondition Failed responses">>},
        {{httpd_status_codes, '500'}, <<"number of HTTP 500 Internal Server Error responses">>}
    ],
    % the table is a set and every key occurs once, so a single insert of the
    % whole list leaves the table with the same contents as one insert per key
    ets:insert(?MODULE, Descriptions).
% please keep this in alphabetical order
% Timer
% Arms the initial timers and returns their references as
% [{time_passed, Ref}, {'60', Ref}, {'300', Ref}, {'900', Ref}].
init_timers() ->
    % OTP docs on timer: http://erlang.org/doc/man/timer.html
    % start() -> ok
    % Starts the timer server. Normally, the server does not need to be
    % started explicitly. It is started dynamically if it is needed. This is
    % useful during development, but in a target system the server should be
    % started explicitly. Use configuration parameters for kernel for this.
    %
    % TODO: Add timer_start to kernel start options.

    % one timer per second, minute, five minutes and fifteen minutes.
    % in the rare event of a timer death, couch_stats_aggregator will die
    % too and be restarted by the supervision tree; all stats (for the last
    % fifteen minutes) are gone.
    Schedule = [
        {time_passed, 1000, time_passed, []},
        {'60', 60000, clear_aggregates, ['60']},
        {'300', 300000, clear_aggregates, ['300']},
        {'900', 900000, clear_aggregates, ['900']}
    ],
    Arm = fun({Type, Millis, Function, Args}) ->
        {ok, Ref} = timer:apply_after(Millis, ?MODULE, Function, Args),
        {Type, Ref}
    end,
    [Arm(Spec) || Spec <- Schedule].
% Cancels the timer stored for Type and arms a new one of the same kind.
%
% Type is time_passed (fires after one second) or a window atom such as '60',
% whose name is read as the number of seconds until clear_aggregates/1 is
% called for that window. Returns Refs with the entry for Type replaced and
% moved to the head of the list.
update_timer(Type, Refs) ->
    % the result is ignored on purpose: the old timer may already have fired
    timer:cancel(proplists:get_value(Type, Refs)),
    {Millis, Function, Args} = case Type of
        time_passed ->
            {1000, time_passed, []};
        _Window ->
            Seconds = list_to_integer(atom_to_list(Type)),
            {1000 * Seconds, clear_aggregates, [Type]}
    end,
    {ok, NewRef} = timer:apply_after(Millis, ?MODULE, Function, Args),
    [{Type, NewRef} | proplists:delete(Type, Refs)].
% Unused gen_server behaviour API functions that we need to declare.
%% @doc Unused gen_server callback. This server defines no casts, so any cast
%% message is ignored and the state is returned unchanged. (Matching only a
%% placeholder atom would crash the server with function_clause on every
%% other cast; ignoring them is in line with handle_info/2.)
handle_cast(_Msg, State) ->
    {noreply, State}.
%% @doc Unused gen_server callback. Any non-call, non-cast message is dropped
%% and the state is returned unchanged.
handle_info(_Info, State) ->
{noreply, State}.
%% @doc Unused gen_server callback; nothing is cleaned up.
%% NOTE(review): the timers referenced in the state's timer_refs are not
%% cancelled here, so they may presumably still fire after the server has
%% stopped -- confirm whether that matters on restart.
terminate(_Reason, _State) -> ok.
%% @doc Unused gen_server callback. The state is carried over unchanged on a
%% code upgrade.
code_change(_OldVersion, State, _Extra) -> {ok, State}.
%% Tests
-ifdef(TEST).
% Internal API unit tests go here
-endif.