Merge branch 'DeadZen-dz-event-stream-processing'
diff --git a/.gitignore b/.gitignore
index ff8fc4b..3bde15b 100644
--- a/.gitignore
+++ b/.gitignore
@@ -6,3 +6,4 @@
erl_crash.dump
.project
log
+deps
diff --git a/Makefile b/Makefile
index 80231c5..2994a71 100644
--- a/Makefile
+++ b/Makefile
@@ -2,11 +2,11 @@
all: deps compile
-compile:
+compile: deps
./rebar compile
deps:
- ./rebar get-deps
+ test -d deps || ./rebar get-deps
clean:
./rebar clean
diff --git a/include/lager.hrl b/include/lager.hrl
index 5a8b35e..ac84d98 100644
--- a/include/lager.hrl
+++ b/include/lager.hrl
@@ -16,6 +16,7 @@
-define(DEFAULT_TRUNCATION, 4096).
+-define(DEFAULT_TRACER, lager_default_tracer).
-define(LEVELS,
[debug, info, notice, warning, error, critical, alert, emergency, none]).
diff --git a/rebar.config b/rebar.config
index 97b3169..ab6b7a2 100644
--- a/rebar.config
+++ b/rebar.config
@@ -1,5 +1,9 @@
{erl_opts, [debug_info]}.
{erl_first_files, ["src/lager_util.erl"]}.
+{deps, [
+ {goldrush, ".*",
+ {git, "git://github.com/DeadZen/goldrush.git", {tag, "7ff9b03"}}}
+ ]}.
{cover_enabled, true}.
{edoc_opts, [{stylesheet_file, "./priv/edoc.css"}]}.
diff --git a/src/lager.app.src b/src/lager.app.src
index eed2a6c..76a9228 100644
--- a/src/lager.app.src
+++ b/src/lager.app.src
@@ -10,6 +10,7 @@
stdlib
]},
{registered, [lager_sup, lager_event, lager_crash_log, lager_handler_watcher_sup]},
+ {included_applications, [syntax_tools, goldrush]},
{mod, {lager_app, []}},
{env, [
%% What handlers to install with what arguments
diff --git a/src/lager.erl b/src/lager.erl
index eff2e9f..298a9a8 100644
--- a/src/lager.erl
+++ b/src/lager.erl
@@ -27,10 +27,11 @@
log/3, log/4,
md/0, md/1,
trace/2, trace/3, trace_file/2, trace_file/3, trace_console/1, trace_console/2,
- clear_all_traces/0, stop_trace/1, status/0,
+ clear_all_traces/0, stop_trace/1, status/0,
get_loglevel/1, set_loglevel/2, set_loglevel/3, get_loglevels/0,
update_loglevel_config/0, posix_error/1,
- safe_format/3, safe_format_chop/3, dispatch_log/5, dispatch_log/9, do_log/9, pr/2]).
+ safe_format/3, safe_format_chop/3, dispatch_log/5, dispatch_log/9,
+ do_log/9, pr/2]).
-type log_level() :: debug | info | notice | warning | error | critical | alert | emergency.
-type log_level_number() :: 0..7.
@@ -167,6 +168,7 @@
Error
end.
+
trace_console(Filter) ->
trace_console(Filter, debug).
@@ -189,6 +191,7 @@
stop_trace({_Filter, _Level, Target} = Trace) ->
{Level, Traces} = lager_config:get(loglevel),
NewTraces = lists:delete(Trace, Traces),
+ lager_util:trace_filter([ element(1, T) || T <- NewTraces ]),
%MinLevel = minimum_loglevel(get_loglevels() ++ get_trace_levels(NewTraces)),
lager_config:set(loglevel, {Level, NewTraces}),
case get_loglevel(Target) of
@@ -207,6 +210,7 @@
clear_all_traces() ->
{Level, _Traces} = lager_config:get(loglevel),
+ lager_util:trace_filter(none),
lager_config:set(loglevel, {Level, []}),
lists:foreach(fun(Handler) ->
case get_loglevel(Handler) of
@@ -219,6 +223,10 @@
status() ->
Handlers = gen_event:which_handlers(lager_event),
+ TraceCount = case length(element(2, lager_config:get(loglevel))) of
+ 0 -> 1;
+ N -> N
+ end,
Status = ["Lager status:\n",
[begin
Level = get_loglevel(Handler),
@@ -244,9 +252,25 @@
end,
io_lib:format("Tracing messages matching ~p at level ~p to ~p\n",
[Filter, LevelName, Destination])
- end || {Filter, Level, Destination} <- element(2, lager_config:get(loglevel))]],
+ end || {Filter, Level, Destination} <- element(2, lager_config:get(loglevel))],
+ [
+ "Tracing Reductions:\n",
+ case ?DEFAULT_TRACER:info('query') of
+ {null, false} -> "";
+ Query -> io_lib:format("~p~n", [Query])
+ end
+ ],
+ [
+ "Tracing Statistics:\n ",
+ [ begin
+ [" ", atom_to_list(Table), ": ",
+ integer_to_list(?DEFAULT_TRACER:info(Table) div TraceCount),
+ "\n"]
+ end || Table <- [input, output, filter] ]
+ ]],
io:put_chars(Status).
+
%% @doc Set the loglevel for a particular backend.
set_loglevel(Handler, Level) when is_atom(Level) ->
Reply = gen_event:call(lager_event, Handler, {set_loglevel, Level}, infinity),
@@ -294,6 +318,8 @@
{MinLevel, Traces} = lager_config:get(loglevel),
case lists:member(Trace, Traces) of
false ->
+ NewTraces = [Trace|Traces],
+ lager_util:trace_filter([ element(1, T) || T <- NewTraces]),
lager_config:set(loglevel, {MinLevel, [Trace|Traces]});
_ ->
ok
diff --git a/src/lager_app.erl b/src/lager_app.erl
index 548e25c..99ecd4f 100644
--- a/src/lager_app.erl
+++ b/src/lager_app.erl
@@ -94,6 +94,8 @@
end
end,
+ lager_util:trace_filter(none),
+
{ok, Pid, SavedHandlers}.
diff --git a/src/lager_crash_log.erl b/src/lager_crash_log.erl
index 7c2f90e..eab2275 100644
--- a/src/lager_crash_log.erl
+++ b/src/lager_crash_log.erl
@@ -207,7 +207,7 @@
Time = [Date, " ", TS," =", ReportStr, "====\n"],
NodeSuffix = other_node_suffix(Pid),
Msg = io_lib:format("~s~s~s", [Time, MsgStr, NodeSuffix]),
- case file:write(NewFD, Msg) of
+ case file:write(NewFD, unicode:characters_to_binary(Msg)) of
{error, Reason} when Flap == false ->
?INT_LOG(error, "Failed to write log message to file ~s: ~s",
[Name, file:format_error(Reason)]),
diff --git a/src/lager_file_backend.erl b/src/lager_file_backend.erl
index 2ca9db1..a46f50f 100644
--- a/src/lager_file_backend.erl
+++ b/src/lager_file_backend.erl
@@ -210,7 +210,7 @@
do_write(#state{fd=FD, name=Name, flap=Flap} = State, Level, Msg) ->
%% delayed_write doesn't report errors
- _ = file:write(FD, Msg),
+ _ = file:write(FD, unicode:characters_to_binary(Msg)),
{mask, SyncLevel} = State#state.sync_on,
case (Level band SyncLevel) /= 0 of
true ->
@@ -476,6 +476,26 @@
?assertMatch([_, _, "[error]", Pid, "Test message\n"], re:split(Bin, " ", [{return, list}, {parts, 5}]))
end
},
+ {"don't choke on unicode",
+ fun() ->
+ gen_event:add_handler(lager_event, lager_file_backend, [{"test.log", info}, {lager_default_formatter}]),
+ lager:log(error, self(),"~ts", [[20013,25991,27979,35797]]),
+ {ok, Bin} = file:read_file("test.log"),
+ Pid = pid_to_list(self()),
+ ?assertMatch([_, _, "[error]", Pid, [228,184,173,230,150,135,230,181,139,232,175,149, $\n]], re:split(Bin, " ", [{return, list}, {parts, 5}]))
+ end
+ },
+ {"don't choke on latin-1",
+ fun() ->
+ %% XXX if this test fails, check that this file is encoded latin-1, not utf-8!
+ gen_event:add_handler(lager_event, lager_file_backend, [{"test.log", info}, {lager_default_formatter}]),
+ lager:log(error, self(),"~ts", ["LÆÝÎN-ï"]),
+ {ok, Bin} = file:read_file("test.log"),
+ Pid = pid_to_list(self()),
+ Res = re:split(Bin, " ", [{return, list}, {parts, 5}]),
+ ?assertMatch([_, _, "[error]", Pid, [76,195,134,195,157,195,142,78,45,195,175,$\n]], Res)
+ end
+ },
{"file can't be opened on startup triggers an error message",
fun() ->
{ok, FInfo} = file:read_file_info("test.log"),
diff --git a/src/lager_util.erl b/src/lager_util.erl
index 029da59..4cd5999 100644
--- a/src/lager_util.erl
+++ b/src/lager_util.erl
@@ -21,7 +21,8 @@
-export([levels/0, level_to_num/1, num_to_level/1, config_to_mask/1, config_to_levels/1, mask_to_levels/1,
open_logfile/2, ensure_logfile/4, rotate_logfile/2, format_time/0, format_time/1,
localtime_ms/0, localtime_ms/1, maybe_utc/1, parse_rotation_date_spec/1,
- calculate_next_rotation/1, validate_trace/1, check_traces/4, is_loggable/3]).
+ calculate_next_rotation/1, validate_trace/1, check_traces/4, is_loggable/3,
+ trace_filter/1, trace_filter/2]).
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
@@ -350,23 +351,34 @@
NewNow = calendar:gregorian_seconds_to_datetime(Seconds),
calculate_next_rotation(T, NewNow).
-validate_trace({Filter, Level, {Destination, ID}}) when is_list(Filter), is_atom(Level), is_atom(Destination) ->
+
+trace_filter(Query) ->
+ trace_filter(?DEFAULT_TRACER, Query).
+
+%% TODO: Support multiple trace modules
+trace_filter(Module, Query) when Query == none; Query == [] ->
+ trace_filter(Module, glc:null(false));
+trace_filter(Module, Query) when is_list(Query) ->
+ trace_filter(Module, glc_lib:reduce(trace_any(Query)));
+trace_filter(Module, Query) ->
+ {ok, _} = glc:compile(Module, Query).
+
+validate_trace({Filter, Level, {Destination, ID}}) when is_tuple(Filter); is_list(Filter), is_atom(Level), is_atom(Destination) ->
case validate_trace({Filter, Level, Destination}) of
{ok, {F, L, D}} ->
{ok, {F, L, {D, ID}}};
Error ->
Error
end;
-validate_trace({Filter, Level, Destination}) when is_list(Filter), is_atom(Level), is_atom(Destination) ->
+validate_trace({Filter, Level, Destination}) when is_tuple(Filter); is_list(Filter), is_atom(Level), is_atom(Destination) ->
+ ValidFilter = validate_trace_filter(Filter),
try config_to_mask(Level) of
+ _ when not ValidFilter ->
+ {error, invalid_trace};
+ L when is_list(Filter) ->
+ {ok, {trace_all(Filter), L, Destination}};
L ->
- case lists:all(fun({Key, _Value}) when is_atom(Key) -> true; (_) ->
- false end, Filter) of
- true ->
- {ok, {Filter, L, Destination}};
- _ ->
- {error, invalid_filter}
- end
+ {ok, {Filter, L, Destination}}
catch
_:_ ->
{error, invalid_level}
@@ -374,6 +386,43 @@
validate_trace(_) ->
{error, invalid_trace}.
+validate_trace_filter(Filter) when is_tuple(Filter), is_atom(element(1, Filter)) =:= false ->
+ false;
+validate_trace_filter(Filter) ->
+ case lists:all(fun({Key, '*'}) when is_atom(Key) -> true;
+ ({Key, _Value}) when is_atom(Key) -> true;
+ ({Key, '=', _Value}) when is_atom(Key) -> true;
+ ({Key, '<', _Value}) when is_atom(Key) -> true;
+ ({Key, '>', _Value}) when is_atom(Key) -> true;
+ (_) -> false end, Filter) of
+ true ->
+ true;
+ _ ->
+ false
+ end.
+
+trace_all(Query) ->
+ glc:all(trace_acc(Query)).
+
+trace_any(Query) ->
+ glc:any(Query).
+
+trace_acc(Query) ->
+ trace_acc(Query, []).
+
+trace_acc([], Acc) ->
+ lists:reverse(Acc);
+trace_acc([{Key, '*'}|T], Acc) ->
+ trace_acc(T, [glc:wc(Key)|Acc]);
+trace_acc([{Key, Val}|T], Acc) ->
+ trace_acc(T, [glc:eq(Key, Val)|Acc]);
+trace_acc([{Key, '=', Val}|T], Acc) ->
+ trace_acc(T, [glc:eq(Key, Val)|Acc]);
+trace_acc([{Key, '>', Val}|T], Acc) ->
+ trace_acc(T, [glc:gt(Key, Val)|Acc]);
+trace_acc([{Key, '<', Val}|T], Acc) ->
+ trace_acc(T, [glc:lt(Key, Val)|Acc]).
+
check_traces(_, _, [], Acc) ->
lists:flatten(Acc);
@@ -384,24 +433,18 @@
check_traces(Attrs, Level, [Flow|Flows], Acc) ->
check_traces(Attrs, Level, Flows, [check_trace(Attrs, Flow)|Acc]).
-check_trace(Attrs, {Filter, _Level, Dest}) ->
- case check_trace_iter(Attrs, Filter) of
- true ->
- Dest;
- false ->
- []
- end.
+check_trace(Attrs, {Filter, _Level, Dest}) when is_list(Filter) ->
+ check_trace(Attrs, {trace_all(Filter), _Level, Dest});
-check_trace_iter(_, []) ->
- true;
-check_trace_iter(Attrs, [{Key, Match}|T]) ->
- case lists:keyfind(Key, 1, Attrs) of
- {Key, _} when Match == '*' ->
- check_trace_iter(Attrs, T);
- {Key, Match} ->
- check_trace_iter(Attrs, T);
- _ ->
- false
+check_trace(Attrs, {Filter, _Level, Dest}) when is_tuple(Filter) ->
+ Made = gre:make(Attrs, [list]),
+ glc:handle(?DEFAULT_TRACER, Made),
+ Match = glc_lib:matches(Filter, Made),
+ case Match of
+ true ->
+ Dest;
+ false ->
+ []
end.
-spec is_loggable(lager_msg:lager_msg(), non_neg_integer()|{'mask', non_neg_integer()}, term()) -> boolean().
@@ -521,6 +564,7 @@
end || N <- lists:seq(0, 20)].
check_trace_test() ->
+ trace_filter(none),
%% match by module
?assertEqual([foo], check_traces([{module, ?MODULE}], ?EMERGENCY, [
{[{module, ?MODULE}], config_to_mask(emergency), foo},
diff --git a/test/lager_test_backend.erl b/test/lager_test_backend.erl
index e98ccb5..8f3e721 100644
--- a/test/lager_test_backend.erl
+++ b/test/lager_test_backend.erl
@@ -353,6 +353,87 @@
ok
end
},
+ {"tracing works with custom attributes and event stream processing",
+ fun() ->
+ lager:set_loglevel(?MODULE, error),
+ ?assertEqual({?ERROR bor ?CRITICAL bor ?ALERT bor ?EMERGENCY, []}, lager_config:get(loglevel)),
+ lager_config:set(loglevel, {element(2, lager_util:config_to_mask(error)), []}),
+ lager:info([{requestid, 6}], "hello world"),
+ ?assertEqual(0, count()),
+ lager:trace(?MODULE, [{requestid, '>', 5}, {requestid, '<', 7}, {foo, bar}], debug),
+ lager:info([{requestid, 5}, {foo, bar}], "hello world"),
+ lager:info([{requestid, 6}, {foo, bar}], "hello world"),
+ ?assertEqual(1, count()),
+ lager:clear_all_traces(),
+ lager:trace(?MODULE, [{requestid, '>', 8}, {foo, bar}]),
+ lager:info([{foo, bar}], "hello world"),
+ lager:info([{requestid, 6}], "hello world"),
+ lager:info([{requestid, 7}], "hello world"),
+ lager:info([{requestid, 8}], "hello world"),
+ lager:info([{requestid, 9}, {foo, bar}], "hello world"),
+ lager:info([{requestid, 10}], "hello world"),
+ ?assertEqual(2, count()),
+ lager:trace(?MODULE, [{requestid, '>', 8}]),
+ lager:info([{foo, bar}], "hello world"),
+ lager:info([{requestid, 6}], "hello world"),
+ lager:info([{requestid, 7}], "hello world"),
+ lager:info([{requestid, 8}], "hello world"),
+ lager:info([{requestid, 9}, {foo, bar}], "hello world"),
+ lager:info([{requestid, 10}], "hello world"),
+ ?assertEqual(4, count()),
+ lager:trace(?MODULE, [{foo, '=', bar}]),
+ lager:info([{foo, bar}], "hello world"),
+ lager:info([{requestid, 6}], "hello world"),
+ lager:info([{requestid, 7}], "hello world"),
+ lager:info([{requestid, 8}], "hello world"),
+ lager:info([{requestid, 9}, {foo, bar}], "hello world"),
+ lager:info([{requestid, 10}], "hello world"),
+ ?assertEqual(7, count()),
+ lager:clear_all_traces(),
+ lager:info([{requestid, 6}], "hello world"),
+ ?assertEqual(7, count()),
+ ok
+ end
+ },
+ {"tracing custom attributes works with event stream processing statistics and reductions",
+ fun() ->
+ lager:set_loglevel(?MODULE, error),
+ ?assertEqual({?ERROR bor ?CRITICAL bor ?ALERT bor ?EMERGENCY, []}, lager_config:get(loglevel)),
+ lager_config:set(loglevel, {element(2, lager_util:config_to_mask(error)), []}),
+ lager:info([{requestid, 6}], "hello world"),
+ ?assertEqual(0, count()),
+ lager:trace(?MODULE, [{beta, '*'}]),
+ lager:trace(?MODULE, [{meta, "data"}]),
+ lager:info([{meta, "data"}], "hello world"),
+ lager:info([{beta, 2}], "hello world"),
+ lager:info([{beta, 2.1}, {foo, bar}], "hello world"),
+ lager:info([{meta, <<"data">>}], "hello world"),
+ ?assertEqual(8, ?DEFAULT_TRACER:info(input)),
+ ?assertEqual(6, ?DEFAULT_TRACER:info(output)),
+ ?assertEqual(2, ?DEFAULT_TRACER:info(filter)),
+ lager:clear_all_traces(),
+ lager:trace(?MODULE, [{meta, "data"}]),
+ lager:trace(?MODULE, [{beta, '>', 2}, {beta, '<', 2.12}]),
+ lager:info([{meta, "data"}], "hello world"),
+ lager:info([{beta, 2}], "hello world"),
+ lager:info([{beta, 2.1}, {foo, bar}], "hello world"),
+ lager:info([{meta, <<"data">>}], "hello world"),
+ ?assertEqual(8, ?DEFAULT_TRACER:info(input)),
+ ?assertEqual(4, ?DEFAULT_TRACER:info(output)),
+ ?assertEqual(4, ?DEFAULT_TRACER:info(filter)),
+ lager:clear_all_traces(),
+ lager:trace_console([{beta, '>', 2}, {meta, "data"}]),
+ lager:trace_console([{beta, '>', 2}, {beta, '<', 2.12}]),
+ Reduced = {all,[{any,[{beta,'<',2.12},{meta,'=',"data"}]},
+ {beta,'>',2}]},
+ ?assertEqual(Reduced, ?DEFAULT_TRACER:info('query')),
+
+ lager:clear_all_traces(),
+ lager:info([{requestid, 6}], "hello world"),
+ ?assertEqual(5, count()),
+ ok
+ end
+ },
{"tracing honors loglevel",
fun() ->
lager:set_loglevel(?MODULE, error),
@@ -1073,10 +1154,11 @@
?assertEqual(true, lager_config:get(async)),
%% put a ton of things in the queue
- Workers = [spawn_monitor(fun() -> [lager:info("hello world") || _ <- lists:seq(1, 1000)] end) || _ <- lists:seq(1, 10)],
+ Workers = [spawn_monitor(fun() -> [lager:info("hello world") || _ <- lists:seq(1, 1000)] end) || _ <- lists:seq(1, 15)],
%% serialize on mailbox
_ = gen_event:which_handlers(lager_event),
+ timer:sleep(500),
%% there should be a ton of outstanding messages now, so async is false
?assertEqual(false, lager_config:get(async)),
%% wait for all the workers to return, meaning that all the messages have been logged (since we're in sync mode)