blob: 8b119bd2ef2b0f2e3ece0d14c8341ec4109578ef [file] [log] [blame]
%% Copyright (c) 2012, Magnus Klaar <klaar@ninenines.eu>
%%
%% Permission to use, copy, modify, and/or distribute this software for any
%% purpose with or without fee is hereby granted, provided that the above
%% copyright notice and this permission notice appear in all copies.
%%
%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
%% @doc Event filter implementation.
%%
%% An event query is constructed using the built in operators exported from
%% this module. The filtering operators are used to specify which events
%% should be included in the output of the query. The default output action
%% is to copy all events matching the input filters associated with a query
%% to the output. This makes it possible to construct and compose multiple
%% queries at runtime.
%%
%% === Examples of built in filters ===
%% ```
%% %% Select all events where 'a' exists and is greater than 0.
%% glc:gt(a, 0).
%% %% Select all events where 'a' exists and is equal to 0.
%% glc:eq(a, 0).
%% %% Select all events where 'a' exists and is less than 0.
%% glc:lt(a, 0).
%% %% Select all events where 'a' exists and is anything.
%% glc:wc(a).
%%
%% %% Select no input events. Used as black hole query.
%% glc:null(false).
%% %% Select all input events. Used as passthrough query.
%% glc:null(true).
%% '''
%%
%% === Examples of combining filters ===
%% ```
%% %% Select all events where both 'a' and 'b' exist and are greater than 0.
%% glc:all([glc:gt(a, 0), glc:gt(b, 0)]).
%% %% Select all events where 'a' or 'b' exists and is greater than 0.
%% glc:any([glc:gt(a, 0), glc:gt(b, 0)]).
%% '''
%%
%% === Handling output events ===
%%
%% Once a query has been composed it is possible to override the output action
%% with an erlang function. The function will be applied to each output event
%% from the query. The return value from the function will be ignored.
%%
%% ```
%% %% Write all input events as info reports to the error logger.
%% glc:with(glc:null(true), fun(E) ->
%% error_logger:info_report(gre:pairs(E)) end).
%% '''
%%
-module(glc).
-export([
compile/2,
compile/3,
handle/2,
delete/1,
reset_counters/1,
reset_counters/2
]).
-export([
lt/2,
eq/2,
gt/2,
wc/1,
nf/1
]).
-export([
all/1,
any/1,
null/1,
with/2
]).
-export([
input/1,
output/1,
filter/1,
union/1
]).
%% Description of a query module to generate. Built by module_data/2 and
%% handed to glc_code:compile/2 by compile/3.
%% NOTE(review): glc_code presumably relies on this exact field layout;
%% keep it in sync with any definition there.
-record(module, {
%% The query exactly as passed by the caller, before reduction.
'query' :: term(),
%% Table name -> registered process name pairs, as returned by
%% module_tables/1 (i.e. [{params, ...}, {counters, ...}]).
tables :: [{atom(), atom()}],
%% The query after glc_lib:reduce/1.
qtree :: term()
}).
%% @doc Select events where `Field' exists and its value is less than `Value'.
-spec lt(atom(), term()) -> glc_ops:op().
lt(Field, Value) -> glc_ops:lt(Field, Value).

%% @doc Select events where `Field' exists and its value is equal to `Value'.
-spec eq(atom(), term()) -> glc_ops:op().
eq(Field, Value) -> glc_ops:eq(Field, Value).

%% @doc Select events where `Field' exists and its value is greater than `Value'.
-spec gt(atom(), term()) -> glc_ops:op().
gt(Field, Value) -> glc_ops:gt(Field, Value).

%% @doc Select events where `Field' exists, whatever its value (wildcard).
-spec wc(atom()) -> glc_ops:op().
wc(Field) -> glc_ops:wc(Field).

%% @doc Select events where `Field' does not exist.
-spec nf(atom()) -> glc_ops:op().
nf(Field) -> glc_ops:nf(Field).
%% @doc Filter the input using multiple filters.
%%
%% An input event is only considered valid output when every filter in
%% the list holds for it. The list is expected to be non-empty; passing
%% an empty list results in a `badarg' error.
-spec all([glc_ops:op()]) -> glc_ops:op().
all(Conditions) ->
    glc_ops:all(Conditions).
%% @doc Filter the input using one of multiple filters.
%%
%% An input event is considered valid output when at least one of the
%% filters in the list holds for it. The list is expected to be non-empty;
%% passing an empty list results in a `badarg' error.
-spec any([glc_ops:op()]) -> glc_ops:op().
any(Conditions) ->
    glc_ops:any(Conditions).
%% @doc A filter with a constant result.
%%
%% `null(true)' selects every input event (a passthrough query), while
%% `null(false)' selects none (a black hole query).
-spec null(boolean()) -> glc_ops:op().
null(Constant) ->
    glc_ops:null(Constant).
%% @doc Apply a function to each output of a query.
%%
%% Setting the output action finalizes the query. A finalized query can no
%% longer be used to build a new query; attempting to do so raises a
%% `badarg' error.
-spec with(glc_ops:op(), fun((gre:event()) -> term())) -> glc_ops:op().
with(BaseQuery, OutputFun) ->
    glc_ops:with(BaseQuery, OutputFun).
%% @doc Return a union of multiple queries.
%%
%% A union behaves like running each query separately on the same input
%% event, except that filter conditions shared by several of the queries
%% only need to be evaluated once.
%%
%% Every query must be valid and must have an output action other than the
%% default `output' action (i.e. it must have been finalized, e.g. with
%% with/2). Otherwise a `badarg' error is raised.
-spec union([glc_ops:op()]) -> glc_ops:op().
union(Members) ->
    glc_ops:union(Members).
%% @doc Compile a query to a module.
%%
%% On success the module representing the query is returned. The module and
%% the data associated with the query must be released with {@link delete/1}.
%% The name of the query module is expected to be unique.
%%
%% compile/2 always resets the query's counters after compiling. compile/3
%% resets them only when `Reset' is `true'.
-spec compile(atom(), glc_ops:op() | [glc_ops:op()]) -> {ok, atom()}.
compile(Module, Query) ->
    compile(Module, Query, true).

-spec compile(atom(), glc_ops:op() | [glc_ops:op()], boolean()) -> {ok, atom()}.
compile(Module, Query, Reset) ->
    {ok, ModuleData} = module_data(Module, Query),
    case glc_code:compile(Module, ModuleData) of
        {ok, Module} = Compiled ->
            %% Only the atom `true' triggers a reset; any other value skips it.
            case Reset of
                true -> reset_counters(Module);
                _ -> ok
            end,
            Compiled
    end.
%% @doc Feed one event to a compiled query module.
%%
%% `Event' is expected to have been created with {@link gre:make/2}.
%% The call is dispatched to the generated module's handle/1.
-spec handle(atom(), gre:event()) -> ok.
handle(QueryModule, Event) ->
    QueryModule:handle(Event).
%% @doc The number of events received by this query module.
-spec input(atom()) -> non_neg_integer().
input(Module) ->
    counter_value(Module, input).

%% @doc The number of events this query module passed to its output action.
-spec output(atom()) -> non_neg_integer().
output(Module) ->
    counter_value(Module, output).

%% @doc The number of events this query module filtered out.
-spec filter(atom()) -> non_neg_integer().
filter(Module) ->
    counter_value(Module, filter).

%% @private Read one named counter through the generated module's info/1.
counter_value(Module, Counter) ->
    Module:info(Counter).
%% @doc Release a compiled query.
%%
%% This releases all resources allocated by a compiled query. The query name
%% is expected to be associated with an existing query module. The call:
%% terminates the supervised manager processes and the params/counters table
%% processes started for the query, removing their child specs; then deletes
%% the query module and purges its code.
%%
%% A `badmatch' error is raised if any of those processes cannot be
%% terminated or its child spec cannot be deleted (e.g. it does not exist).
-spec delete(atom()) -> ok.
delete(Module) ->
    %% The two managers are stopped before the table processes.
    Children = [{gr_manager_sup, manage_params_name(Module)},
                {gr_manager_sup, manage_counts_name(Module)},
                {gr_param_sup, params_name(Module)},
                {gr_counter_sup, counts_name(Module)}],
    lists:foreach(
        fun({Sup, Name}) ->
            ok = supervisor:terminate_child(Sup, Name),
            ok = supervisor:delete_child(Sup, Name)
        end,
        Children),
    %% code:delete/1 refuses (returns false) while old code for the module
    %% still exists, so drop any leftover old code first.
    _ = code:soft_purge(Module),
    %% code:delete/1 only turns the current code into old code; it stays in
    %% memory until purged. Purge it again so the module is actually
    %% released. soft_purge/1 never kills processes: if one is still running
    %% the old code, it leaves the code in place and returns false.
    _ = code:delete(Module),
    _ = code:soft_purge(Module),
    ok.
%% @doc Reset all counters.
%%
%% Resets every counter of the query module; equivalent to
%% `reset_counters(Module, all)'.
-spec reset_counters(atom()) -> ok.
reset_counters(Module) ->
    reset_counters(Module, all).

%% @doc Reset a specific counter.
%%
%% Resets one named counter of the query module, for example `input',
%% `filter' or `output'.
-spec reset_counters(atom(), atom()) -> ok.
reset_counters(Module, Counter) ->
    Module:reset_counters(Counter).
%% @private Map a query to a module data term.
%%
%% Terms in the query that are not valid arguments to erl_syntax:abstract/1
%% are stored in ETS, and are only looked up once they are needed to continue
%% evaluating the query. Query counters are also kept in a table.
%% NOTE: counters could be made optional; they are always enabled, which
%% simplifies tests.
%%
%% The generated code refers to tables by name. `tables' holds the
%% name -> registered process pairs (see module_tables/1), which the
%% generated table/1 function uses to map a table name to the process
%% responsible for it.
-spec module_data(atom(), term()) -> {ok, #module{}}.
module_data(Module, Query) ->
    %% Reduce the query before starting any supervised table processes.
    %% If glc_lib:reduce/1 raises, no processes are left behind without a
    %% query module to release them.
    QTree = glc_lib:reduce(Query),
    Tables = module_tables(Module),
    {ok, #module{'query'=Query, tables=Tables, qtree=QTree}}.
%% @private Start the supervised processes backing a query module's tables.
%%
%% Four transient workers are started, in this order:
%% - a gr_param process for the params table (under gr_param_sup),
%% - a gr_counter process for the counters table (under gr_counter_sup),
%% - a gr_manager for the params table, with no initial data,
%% - a gr_manager for the counters table, seeded with the `input', `filter'
%%   and `output' counters at 0 (both under gr_manager_sup).
%% Results of supervisor:start_child/2 (including `{error, _}') are discarded.
%%
%% Returns the table name -> registered process name pairs used by the
%% generated code.
%%
%% NOTE(review): each child spec lists its registered name, not the callback
%% module, in the `Modules' element. OTP release handling expects the
%% callback module there; confirm before changing.
module_tables(Module) ->
    ParamsName = params_name(Module),
    CountsName = counts_name(Module),
    ParamsMgr = manage_params_name(Module),
    CountsMgr = manage_counts_name(Module),
    InitialCounters = [{input,0}, {filter,0}, {output,0}],
    Children = [
        {gr_param_sup,   ParamsName, gr_param,   [ParamsName]},
        {gr_counter_sup, CountsName, gr_counter, [CountsName]},
        {gr_manager_sup, ParamsMgr,  gr_manager, [ParamsMgr, ParamsName, []]},
        {gr_manager_sup, CountsMgr,  gr_manager, [CountsMgr, CountsName, InitialCounters]}
    ],
    lists:foreach(
        fun({Sup, Id, StartMod, Args}) ->
            Spec = {Id, {StartMod, start_link, Args},
                    transient, brutal_kill, worker, [Id]},
            _ = supervisor:start_child(Sup, Spec)
        end,
        Children),
    [{params, ParamsName}, {counters, CountsName}].
%% @private Build the registered name `gr_<Module><Suffix>' for a helper
%% process of a query module. Module names are already atoms, so the number
%% of atoms created is bounded by the number of distinct query modules.
reg_name(Module, Suffix) ->
    list_to_atom(lists:append(["gr_", atom_to_list(Module), Suffix])).

%% @private Registered name of the params table process.
params_name(Module) -> reg_name(Module, "_params").
%% @private Registered name of the counters table process.
counts_name(Module) -> reg_name(Module, "_counters").
%% @private Registered name of the params table manager.
manage_params_name(Module) -> reg_name(Module, "_params_mgr").
%% @private Registered name of the counters table manager.
manage_counts_name(Module) -> reg_name(Module, "_counters_mgr").
%% @todo Move comment.
%% @private Map a query to a simplified query tree term.
%%
%% The simplified query tree is used to combine multiple queries into one
%% query module. The goal of this is to reduce the filtering and dispatch
%% overhead when multiple concurrent queries are executed.
%%
%% A fixed selection condition may be used to specify a property that an event
%% must have in order to be considered part of the input stream for a query.
%%
%% For the sake of simplicity it is only possible to define selection
%% conditions using the fields present in the context and identifiers
%% of an event. The fields in the context are bound to the reserved
%% names:
%%
%% - '$n': node name
%% - '$a': application name
%% - '$p': process identifier
%% - '$t': timestamp
%%
%%
%% If an event must be selected based on the runtime state of an event handler
%% this must be done in the body of the handler.
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
%% Compile `Query' into `Module' for a test, asserting that `Module' was not
%% loaded beforehand and that the generated module exports table/1 and
%% handle/1. If compile/2 crashes, the caught exit term is printed with
%% ?debugFmt before the equality assertion fails.
setup_query(Module, Query) ->
    ?assertNot(erlang:module_loaded(Module)),
    Outcome = case (catch compile(Module, Query)) of
                  {'EXIT', _} = Crash ->
                      ?debugFmt("~p", [Crash]),
                      Crash;
                  Other ->
                      Other
              end,
    ?assertEqual({ok, Module}, Outcome),
    ?assert(erlang:function_exported(Module, table, 1)),
    ?assert(erlang:function_exported(Module, handle, 1)),
    {compiled, Module}.
%% EUnit fixture for the query tests. Before each test, syntax_tools,
%% compiler and goldrush are started and error_logger tty output is disabled;
%% after each test they are stopped and tty output is re-enabled. Each test
%% compiles its own uniquely named module (testmod1..testmod17), because
%% setup_query/2 asserts the module is not already loaded.
events_test_() ->
    {foreach,
        fun() ->
            error_logger:tty(false),
            application:start(syntax_tools),
            application:start(compiler),
            application:start(goldrush)
        end,
        fun(_) ->
            application:stop(goldrush),
            application:stop(compiler),
            application:stop(syntax_tools),
            error_logger:tty(true)
        end,
        [
            {"null query compiles",
                fun() ->
                    {compiled, Mod} = setup_query(testmod1, glc:null(false)),
                    ?assertError(badarg, Mod:table(noexists))
                end
            },
            {"params table exists",
                fun() ->
                    {compiled, Mod} = setup_query(testmod2, glc:null(false)),
                    ?assert(is_atom(Mod:table(params))),
                    ?assertMatch([_|_], gr_param:info(Mod:table(params)))
                end
            },
            {"null query exists",
                fun() ->
                    {compiled, Mod} = setup_query(testmod3, glc:null(false)),
                    ?assert(erlang:function_exported(Mod, info, 1)),
                    ?assertError(badarg, Mod:info(invalid)),
                    ?assertEqual({null, false}, Mod:info('query'))
                end
            },
            {"init counters test",
                fun() ->
                    {compiled, Mod} = setup_query(testmod4, glc:null(false)),
                    ?assertEqual(0, Mod:info(input)),
                    ?assertEqual(0, Mod:info(filter)),
                    ?assertEqual(0, Mod:info(output))
                end
            },
            {"filtered events test",
                fun() ->
                    %% If no selection condition is specified no inputs can match.
                    {compiled, Mod} = setup_query(testmod5, glc:null(false)),
                    glc:handle(Mod, gre:make([], [list])),
                    ?assertEqual(1, Mod:info(input)),
                    ?assertEqual(1, Mod:info(filter)),
                    ?assertEqual(0, Mod:info(output))
                end
            },
            {"nomatch event test",
                fun() ->
                    %% If a selection condition but no body is specified the event
                    %% is expected to count as filtered out if the condition does
                    %% not hold.
                    {compiled, Mod} = setup_query(testmod6, glc:eq('$n', 'noexists@nohost')),
                    glc:handle(Mod, gre:make([{'$n', 'noexists2@nohost'}], [list])),
                    ?assertEqual(1, Mod:info(input)),
                    ?assertEqual(1, Mod:info(filter)),
                    ?assertEqual(0, Mod:info(output))
                end
            },
            {"opfilter equal test",
                fun() ->
                    %% If a selection condition but no body is specified the event
                    %% counts as input to the query, but not as filtered out.
                    {compiled, Mod} = setup_query(testmod7, glc:eq('$n', 'noexists@nohost')),
                    glc:handle(Mod, gre:make([{'$n', 'noexists@nohost'}], [list])),
                    ?assertEqual(1, Mod:info(input)),
                    ?assertEqual(0, Mod:info(filter)),
                    ?assertEqual(1, Mod:info(output))
                end
            },
            {"opfilter wildcard test",
                fun() ->
                    {compiled, Mod} = setup_query(testmod8, glc:wc(a)),
                    glc:handle(Mod, gre:make([{b, 2}], [list])),
                    ?assertEqual(1, Mod:info(input)),
                    ?assertEqual(1, Mod:info(filter)),
                    ?assertEqual(0, Mod:info(output)),
                    glc:handle(Mod, gre:make([{a, 2}], [list])),
                    ?assertEqual(2, Mod:info(input)),
                    ?assertEqual(1, Mod:info(filter)),
                    ?assertEqual(1, Mod:info(output))
                end
            },
            {"opfilter notfound test",
                fun() ->
                    {compiled, Mod} = setup_query(testmod9, glc:nf(a)),
                    glc:handle(Mod, gre:make([{a, 2}], [list])),
                    ?assertEqual(1, Mod:info(input)),
                    ?assertEqual(1, Mod:info(filter)),
                    ?assertEqual(0, Mod:info(output)),
                    glc:handle(Mod, gre:make([{b, 2}], [list])),
                    ?assertEqual(2, Mod:info(input)),
                    ?assertEqual(1, Mod:info(filter)),
                    ?assertEqual(1, Mod:info(output))
                end
            },
            {"opfilter greater than test",
                fun() ->
                    {compiled, Mod} = setup_query(testmod10, glc:gt(a, 1)),
                    glc:handle(Mod, gre:make([{'a', 2}], [list])),
                    ?assertEqual(1, Mod:info(input)),
                    ?assertEqual(0, Mod:info(filter)),
                    %% The matching event must also have reached the output.
                    ?assertEqual(1, Mod:info(output)),
                    glc:handle(Mod, gre:make([{'a', 0}], [list])),
                    ?assertEqual(2, Mod:info(input)),
                    ?assertEqual(1, Mod:info(filter)),
                    ?assertEqual(1, Mod:info(output))
                end
            },
            {"opfilter less than test",
                fun() ->
                    {compiled, Mod} = setup_query(testmod11, glc:lt(a, 1)),
                    glc:handle(Mod, gre:make([{'a', 0}], [list])),
                    ?assertEqual(1, Mod:info(input)),
                    ?assertEqual(0, Mod:info(filter)),
                    ?assertEqual(1, Mod:info(output)),
                    glc:handle(Mod, gre:make([{'a', 2}], [list])),
                    ?assertEqual(2, Mod:info(input)),
                    ?assertEqual(1, Mod:info(filter)),
                    ?assertEqual(1, Mod:info(output))
                end
            },
            {"allholds op test",
                fun() ->
                    {compiled, Mod} = setup_query(testmod12,
                        glc:all([glc:eq(a, 1), glc:eq(b, 2)])),
                    glc:handle(Mod, gre:make([{'a', 1}], [list])),
                    glc:handle(Mod, gre:make([{'a', 2}], [list])),
                    ?assertEqual(2, Mod:info(input)),
                    ?assertEqual(2, Mod:info(filter)),
                    glc:handle(Mod, gre:make([{'b', 1}], [list])),
                    glc:handle(Mod, gre:make([{'b', 2}], [list])),
                    ?assertEqual(4, Mod:info(input)),
                    ?assertEqual(4, Mod:info(filter)),
                    glc:handle(Mod, gre:make([{'a', 1},{'b', 2}], [list])),
                    ?assertEqual(5, Mod:info(input)),
                    ?assertEqual(4, Mod:info(filter)),
                    ?assertEqual(1, Mod:info(output))
                end
            },
            {"anyholds op test",
                fun() ->
                    {compiled, Mod} = setup_query(testmod13,
                        glc:any([glc:eq(a, 1), glc:eq(b, 2)])),
                    glc:handle(Mod, gre:make([{'a', 2}], [list])),
                    glc:handle(Mod, gre:make([{'b', 1}], [list])),
                    ?assertEqual(2, Mod:info(input)),
                    ?assertEqual(2, Mod:info(filter)),
                    glc:handle(Mod, gre:make([{'a', 1}], [list])),
                    glc:handle(Mod, gre:make([{'b', 2}], [list])),
                    ?assertEqual(4, Mod:info(input)),
                    ?assertEqual(2, Mod:info(filter)),
                    %% The two matching events must have reached the output.
                    ?assertEqual(2, Mod:info(output))
                end
            },
            {"with function test",
                fun() ->
                    Self = self(),
                    {compiled, Mod} = setup_query(testmod14,
                        glc:with(glc:eq(a, 1), fun(Event) -> Self ! gre:fetch(a, Event) end)),
                    glc:handle(Mod, gre:make([{a,1}], [list])),
                    ?assertEqual(1, Mod:info(output)),
                    ?assertEqual(1, receive Msg -> Msg after 0 -> notcalled end)
                end
            },
            {"delete test",
                fun() ->
                    {compiled, Mod} = setup_query(testmod15, glc:null(false)),
                    ?assert(is_atom(Mod:table(params))),
                    ?assertMatch([_|_], gr_param:info(Mod:table(params))),
                    ?assert(is_list(code:which(Mod))),
                    ?assert(is_pid(whereis(params_name(Mod)))),
                    ?assert(is_pid(whereis(counts_name(Mod)))),
                    ?assert(is_pid(whereis(manage_params_name(Mod)))),
                    ?assert(is_pid(whereis(manage_counts_name(Mod)))),
                    glc:delete(Mod),
                    ?assertEqual(non_existing, code:which(Mod)),
                    ?assertEqual(undefined, whereis(params_name(Mod))),
                    ?assertEqual(undefined, whereis(counts_name(Mod))),
                    ?assertEqual(undefined, whereis(manage_params_name(Mod))),
                    ?assertEqual(undefined, whereis(manage_counts_name(Mod)))
                end
            },
            {"reset counters test",
                fun() ->
                    {compiled, Mod} = setup_query(testmod16,
                        glc:any([glc:eq(a, 1), glc:eq(b, 2)])),
                    glc:handle(Mod, gre:make([{'a', 2}], [list])),
                    glc:handle(Mod, gre:make([{'b', 1}], [list])),
                    ?assertEqual(2, Mod:info(input)),
                    ?assertEqual(2, Mod:info(filter)),
                    glc:handle(Mod, gre:make([{'a', 1}], [list])),
                    glc:handle(Mod, gre:make([{'b', 2}], [list])),
                    ?assertEqual(4, Mod:info(input)),
                    ?assertEqual(2, Mod:info(filter)),
                    ?assertEqual(2, Mod:info(output)),
                    glc:reset_counters(Mod, input),
                    ?assertEqual(0, Mod:info(input)),
                    ?assertEqual(2, Mod:info(filter)),
                    ?assertEqual(2, Mod:info(output)),
                    glc:reset_counters(Mod, filter),
                    ?assertEqual(0, Mod:info(input)),
                    ?assertEqual(0, Mod:info(filter)),
                    ?assertEqual(2, Mod:info(output)),
                    glc:reset_counters(Mod),
                    ?assertEqual(0, Mod:info(input)),
                    ?assertEqual(0, Mod:info(filter)),
                    ?assertEqual(0, Mod:info(output))
                end
            },
            {"ets data recovery test",
                fun() ->
                    Self = self(),
                    {compiled, Mod} = setup_query(testmod17,
                        glc:with(glc:eq(a, 1), fun(Event) -> Self ! gre:fetch(a, Event) end)),
                    glc:handle(Mod, gre:make([{a,1}], [list])),
                    ?assertEqual(1, Mod:info(output)),
                    ?assertEqual(1, receive Msg -> Msg after 0 -> notcalled end),
                    ?assertEqual(1, length(gr_param:list(Mod:table(params)))),
                    %% The counters table is served by gr_counter, not gr_param.
                    ?assertEqual(3, length(gr_counter:list(Mod:table(counters)))),
                    true = exit(whereis(Mod:table(params)), kill),
                    true = exit(whereis(Mod:table(counters)), kill),
                    ?assertEqual(1, Mod:info(input)),
                    glc:handle(Mod, gre:make([{'a', 1}], [list])),
                    ?assertEqual(2, Mod:info(input)),
                    ?assertEqual(2, Mod:info(output)),
                    ?assertEqual(1, length(gr_param:list(Mod:table(params)))),
                    ?assertEqual(3, length(gr_counter:list(Mod:table(counters))))
                end
            }
        ]
    }.
%% union/1 rejects queries that still have the default `output' action.
union_error_test() ->
    ?assertException(error, badarg,
                     glc:union([glc:eq(a, 1)])),
    done.
-endif.