Merge branch 'DeadZen-dz-event-stream-processing'
diff --git a/.gitignore b/.gitignore
index ff8fc4b..3bde15b 100644
--- a/.gitignore
+++ b/.gitignore
@@ -6,3 +6,4 @@
 erl_crash.dump
 .project
 log
+deps
diff --git a/Makefile b/Makefile
index 80231c5..2994a71 100644
--- a/Makefile
+++ b/Makefile
@@ -2,11 +2,11 @@
 
 all: deps compile
 
-compile:
+compile: deps
 	./rebar compile
 
 deps:
-	./rebar get-deps
+	test -d deps || ./rebar get-deps
 
 clean:
 	./rebar clean
diff --git a/include/lager.hrl b/include/lager.hrl
index 5a8b35e..ac84d98 100644
--- a/include/lager.hrl
+++ b/include/lager.hrl
@@ -16,6 +16,7 @@
 
 
 -define(DEFAULT_TRUNCATION, 4096).
+-define(DEFAULT_TRACER, lager_default_tracer).
 
 -define(LEVELS,
     [debug, info, notice, warning, error, critical, alert, emergency, none]).
diff --git a/rebar.config b/rebar.config
index 97b3169..ab6b7a2 100644
--- a/rebar.config
+++ b/rebar.config
@@ -1,5 +1,9 @@
 {erl_opts, [debug_info]}.
 {erl_first_files, ["src/lager_util.erl"]}.
+{deps, [
+        {goldrush, ".*",
+            {git, "git://github.com/DeadZen/goldrush.git", {tag, "7ff9b03"}}}
+       ]}.
 
 {cover_enabled, true}.
 {edoc_opts, [{stylesheet_file, "./priv/edoc.css"}]}.
diff --git a/src/lager.app.src b/src/lager.app.src
index eed2a6c..76a9228 100644
--- a/src/lager.app.src
+++ b/src/lager.app.src
@@ -10,6 +10,7 @@
                   stdlib
                  ]},
   {registered, [lager_sup, lager_event, lager_crash_log, lager_handler_watcher_sup]},
+  {included_applications, [syntax_tools, goldrush]},
   {mod, {lager_app, []}},
   {env, [
             %% What handlers to install with what arguments
diff --git a/src/lager.erl b/src/lager.erl
index eff2e9f..298a9a8 100644
--- a/src/lager.erl
+++ b/src/lager.erl
@@ -27,10 +27,11 @@
         log/3, log/4,
         md/0, md/1,
         trace/2, trace/3, trace_file/2, trace_file/3, trace_console/1, trace_console/2,
-        clear_all_traces/0, stop_trace/1, status/0,
+        clear_all_traces/0, stop_trace/1, status/0, 
         get_loglevel/1, set_loglevel/2, set_loglevel/3, get_loglevels/0,
         update_loglevel_config/0, posix_error/1,
-        safe_format/3, safe_format_chop/3, dispatch_log/5, dispatch_log/9, do_log/9, pr/2]).
+        safe_format/3, safe_format_chop/3, dispatch_log/5, dispatch_log/9, 
+        do_log/9, pr/2]).
 
 -type log_level() :: debug | info | notice | warning | error | critical | alert | emergency.
 -type log_level_number() :: 0..7.
@@ -167,6 +168,7 @@
             Error
     end.
 
+
 trace_console(Filter) ->
     trace_console(Filter, debug).
 
@@ -189,6 +191,7 @@
 stop_trace({_Filter, _Level, Target} = Trace) ->
     {Level, Traces} = lager_config:get(loglevel),
     NewTraces =  lists:delete(Trace, Traces),
+    lager_util:trace_filter([ element(1, T) || T <- NewTraces ]),
     %MinLevel = minimum_loglevel(get_loglevels() ++ get_trace_levels(NewTraces)),
     lager_config:set(loglevel, {Level, NewTraces}),
     case get_loglevel(Target) of
@@ -207,6 +210,7 @@
 
 clear_all_traces() ->
     {Level, _Traces} = lager_config:get(loglevel),
+    lager_util:trace_filter(none),
     lager_config:set(loglevel, {Level, []}),
     lists:foreach(fun(Handler) ->
           case get_loglevel(Handler) of
@@ -219,6 +223,10 @@
 
 status() ->
     Handlers = gen_event:which_handlers(lager_event),
+    TraceCount = case length(element(2, lager_config:get(loglevel))) of
+        0 -> 1;
+        N -> N
+    end,
     Status = ["Lager status:\n",
         [begin
                     Level = get_loglevel(Handler),
@@ -244,9 +252,25 @@
                     end,
                     io_lib:format("Tracing messages matching ~p at level ~p to ~p\n",
                         [Filter, LevelName, Destination])
-            end || {Filter, Level, Destination} <- element(2, lager_config:get(loglevel))]],
+            end || {Filter, Level, Destination} <- element(2, lager_config:get(loglevel))],
+         [
+         "Tracing Reductions:\n",
+            case ?DEFAULT_TRACER:info('query') of
+                {null, false} -> "";
+                Query -> io_lib:format("~p~n", [Query])
+            end
+         ],
+         [
+          "Tracing Statistics:\n ",
+              [ begin 
+                    [" ", atom_to_list(Table), ": ",
+                     integer_to_list(?DEFAULT_TRACER:info(Table) div TraceCount),
+                     "\n"]
+                end || Table <- [input, output, filter] ]
+         ]],
     io:put_chars(Status).
 
+
 %% @doc Set the loglevel for a particular backend.
 set_loglevel(Handler, Level) when is_atom(Level) ->
     Reply = gen_event:call(lager_event, Handler, {set_loglevel, Level}, infinity),
@@ -294,6 +318,8 @@
     {MinLevel, Traces} = lager_config:get(loglevel),
     case lists:member(Trace, Traces) of
         false ->
+            NewTraces = [Trace|Traces],
+            lager_util:trace_filter([ element(1, T) || T <- NewTraces]),
             lager_config:set(loglevel, {MinLevel, [Trace|Traces]});
         _ ->
             ok
diff --git a/src/lager_app.erl b/src/lager_app.erl
index 548e25c..99ecd4f 100644
--- a/src/lager_app.erl
+++ b/src/lager_app.erl
@@ -94,6 +94,8 @@
                 end
         end,
 
+    lager_util:trace_filter(none), 
+
     {ok, Pid, SavedHandlers}.
 
 
diff --git a/src/lager_crash_log.erl b/src/lager_crash_log.erl
index 7c2f90e..eab2275 100644
--- a/src/lager_crash_log.erl
+++ b/src/lager_crash_log.erl
@@ -207,7 +207,7 @@
                     Time = [Date, " ", TS," =", ReportStr, "====\n"],
                     NodeSuffix = other_node_suffix(Pid),
                     Msg = io_lib:format("~s~s~s", [Time, MsgStr, NodeSuffix]),
-                    case file:write(NewFD, Msg) of
+                    case file:write(NewFD, unicode:characters_to_binary(Msg)) of
                         {error, Reason} when Flap == false ->
                             ?INT_LOG(error, "Failed to write log message to file ~s: ~s",
                                 [Name, file:format_error(Reason)]),
diff --git a/src/lager_file_backend.erl b/src/lager_file_backend.erl
index 2ca9db1..a46f50f 100644
--- a/src/lager_file_backend.erl
+++ b/src/lager_file_backend.erl
@@ -210,7 +210,7 @@
 
 do_write(#state{fd=FD, name=Name, flap=Flap} = State, Level, Msg) ->
     %% delayed_write doesn't report errors
-    _ = file:write(FD, Msg),
+    _ = file:write(FD, unicode:characters_to_binary(Msg)),
     {mask, SyncLevel} = State#state.sync_on,
     case (Level band SyncLevel) /= 0 of
         true ->
@@ -476,6 +476,26 @@
                         ?assertMatch([_, _, "[error]", Pid, "Test message\n"], re:split(Bin, " ", [{return, list}, {parts, 5}]))
                 end
             },
+            {"don't choke on unicode",
+                fun() ->
+                        gen_event:add_handler(lager_event, lager_file_backend, [{"test.log", info}, {lager_default_formatter}]),
+                        lager:log(error, self(),"~ts", [[20013,25991,27979,35797]]),
+                        {ok, Bin} = file:read_file("test.log"),
+                        Pid = pid_to_list(self()),
+                        ?assertMatch([_, _, "[error]", Pid,  [228,184,173,230,150,135,230,181,139,232,175,149, $\n]], re:split(Bin, " ", [{return, list}, {parts, 5}]))
+                end
+            },
+            {"don't choke on latin-1",
+                fun() ->
+                        %% XXX if this test fails, check that this file is encoded latin-1, not utf-8!
+                        gen_event:add_handler(lager_event, lager_file_backend, [{"test.log", info}, {lager_default_formatter}]),
+                        lager:log(error, self(),"~ts", ["LÆÝÎN-ï"]),
+                        {ok, Bin} = file:read_file("test.log"),
+                        Pid = pid_to_list(self()),
+                        Res = re:split(Bin, " ", [{return, list}, {parts, 5}]),
+                        ?assertMatch([_, _, "[error]", Pid,  [76,195,134,195,157,195,142,78,45,195,175,$\n]], Res)
+                end
+            },
             {"file can't be opened on startup triggers an error message",
                 fun() ->
                         {ok, FInfo} = file:read_file_info("test.log"),
diff --git a/src/lager_util.erl b/src/lager_util.erl
index 029da59..4cd5999 100644
--- a/src/lager_util.erl
+++ b/src/lager_util.erl
@@ -21,7 +21,8 @@
 -export([levels/0, level_to_num/1, num_to_level/1, config_to_mask/1, config_to_levels/1, mask_to_levels/1,
         open_logfile/2, ensure_logfile/4, rotate_logfile/2, format_time/0, format_time/1,
         localtime_ms/0, localtime_ms/1, maybe_utc/1, parse_rotation_date_spec/1,
-        calculate_next_rotation/1, validate_trace/1, check_traces/4, is_loggable/3]).
+        calculate_next_rotation/1, validate_trace/1, check_traces/4, is_loggable/3,
+        trace_filter/1, trace_filter/2]).
 
 -ifdef(TEST).
 -include_lib("eunit/include/eunit.hrl").
@@ -350,23 +351,34 @@
     NewNow = calendar:gregorian_seconds_to_datetime(Seconds),
     calculate_next_rotation(T, NewNow).
 
-validate_trace({Filter, Level, {Destination, ID}}) when is_list(Filter), is_atom(Level), is_atom(Destination) ->
+
+trace_filter(Query) ->
+    trace_filter(?DEFAULT_TRACER, Query).
+
+%% TODO: Support multiple trace modules 
+trace_filter(Module, Query) when Query == none; Query == [] ->
+    trace_filter(Module, glc:null(false));
+trace_filter(Module, Query) when is_list(Query) ->
+    trace_filter(Module, glc_lib:reduce(trace_any(Query)));
+trace_filter(Module, Query) ->
+    {ok, _} = glc:compile(Module, Query).
+
+validate_trace({Filter, Level, {Destination, ID}}) when is_tuple(Filter); is_list(Filter), is_atom(Level), is_atom(Destination) ->
     case validate_trace({Filter, Level, Destination}) of
         {ok, {F, L, D}} ->
             {ok, {F, L, {D, ID}}};
         Error ->
             Error
     end;
-validate_trace({Filter, Level, Destination}) when is_list(Filter), is_atom(Level), is_atom(Destination) ->
+validate_trace({Filter, Level, Destination}) when is_tuple(Filter); is_list(Filter), is_atom(Level), is_atom(Destination) ->
+    ValidFilter = validate_trace_filter(Filter),
     try config_to_mask(Level) of
+        _ when not ValidFilter ->
+            {error, invalid_trace};
+        L when is_list(Filter)  ->
+            {ok, {trace_all(Filter), L, Destination}};
         L ->
-            case lists:all(fun({Key, _Value}) when is_atom(Key) -> true; (_) ->
-                            false end, Filter) of
-                true ->
-                    {ok, {Filter, L, Destination}};
-                _ ->
-                    {error, invalid_filter}
-            end
+            {ok, {Filter, L, Destination}}
     catch
         _:_ ->
             {error, invalid_level}
@@ -374,6 +386,43 @@
 validate_trace(_) ->
     {error, invalid_trace}.
 
+validate_trace_filter(Filter) when is_tuple(Filter), is_atom(element(1, Filter)) =:= false ->
+    false;
+validate_trace_filter(Filter) ->
+        case lists:all(fun({Key, '*'}) when is_atom(Key) -> true; 
+                          ({Key, _Value})      when is_atom(Key) -> true;
+                          ({Key, '=', _Value}) when is_atom(Key) -> true;
+                          ({Key, '<', _Value}) when is_atom(Key) -> true;
+                          ({Key, '>', _Value}) when is_atom(Key) -> true;
+                          (_) -> false end, Filter) of
+            true ->
+                true;
+            _ ->
+                false
+        end.
+
+trace_all(Query) -> 
+	glc:all(trace_acc(Query)).
+
+trace_any(Query) -> 
+	glc:any(Query).
+
+trace_acc(Query) ->
+    trace_acc(Query, []).
+
+trace_acc([], Acc) -> 
+	lists:reverse(Acc);
+trace_acc([{Key, '*'}|T], Acc) ->
+	trace_acc(T, [glc:wc(Key)|Acc]);
+trace_acc([{Key, Val}|T], Acc) ->
+	trace_acc(T, [glc:eq(Key, Val)|Acc]);
+trace_acc([{Key, '=', Val}|T], Acc) ->
+	trace_acc(T, [glc:eq(Key, Val)|Acc]);
+trace_acc([{Key, '>', Val}|T], Acc) ->
+	trace_acc(T, [glc:gt(Key, Val)|Acc]);
+trace_acc([{Key, '<', Val}|T], Acc) ->
+	trace_acc(T, [glc:lt(Key, Val)|Acc]).
+	
 
 check_traces(_, _,  [], Acc) ->
     lists:flatten(Acc);
@@ -384,24 +433,18 @@
 check_traces(Attrs, Level, [Flow|Flows], Acc) ->
     check_traces(Attrs, Level, Flows, [check_trace(Attrs, Flow)|Acc]).
 
-check_trace(Attrs, {Filter, _Level, Dest}) ->
-    case check_trace_iter(Attrs, Filter) of
-        true ->
-            Dest;
-        false ->
-            []
-    end.
+check_trace(Attrs, {Filter, _Level, Dest}) when is_list(Filter) ->
+    check_trace(Attrs, {trace_all(Filter), _Level, Dest});
 
-check_trace_iter(_, []) ->
-    true;
-check_trace_iter(Attrs, [{Key, Match}|T]) ->
-    case lists:keyfind(Key, 1, Attrs) of
-        {Key, _} when Match == '*' ->
-            check_trace_iter(Attrs, T);
-        {Key, Match} ->
-            check_trace_iter(Attrs, T);
-        _ ->
-            false
+check_trace(Attrs, {Filter, _Level, Dest}) when is_tuple(Filter) ->
+    Made = gre:make(Attrs, [list]),
+    glc:handle(?DEFAULT_TRACER, Made),
+    Match = glc_lib:matches(Filter, Made),
+    case Match of
+	true ->
+	    Dest;
+	false ->
+	    []
     end.
 
 -spec is_loggable(lager_msg:lager_msg(), non_neg_integer()|{'mask', non_neg_integer()}, term()) -> boolean().
@@ -521,6 +564,7 @@
     end || N <- lists:seq(0, 20)].
 
 check_trace_test() ->
+    trace_filter(none),
     %% match by module
     ?assertEqual([foo], check_traces([{module, ?MODULE}], ?EMERGENCY, [
                 {[{module, ?MODULE}], config_to_mask(emergency), foo},
diff --git a/test/lager_test_backend.erl b/test/lager_test_backend.erl
index e98ccb5..8f3e721 100644
--- a/test/lager_test_backend.erl
+++ b/test/lager_test_backend.erl
@@ -353,6 +353,87 @@
                         ok
                 end
             },
+            {"tracing works with custom attributes and event stream processing",
+                fun() ->
+                        lager:set_loglevel(?MODULE, error),
+                        ?assertEqual({?ERROR bor ?CRITICAL bor ?ALERT bor ?EMERGENCY, []}, lager_config:get(loglevel)),
+                        lager_config:set(loglevel, {element(2, lager_util:config_to_mask(error)), []}),
+                        lager:info([{requestid, 6}], "hello world"),
+                        ?assertEqual(0, count()),
+                        lager:trace(?MODULE, [{requestid, '>', 5}, {requestid, '<', 7}, {foo, bar}], debug),
+                        lager:info([{requestid, 5}, {foo, bar}], "hello world"),
+                        lager:info([{requestid, 6}, {foo, bar}], "hello world"),
+                        ?assertEqual(1, count()),
+                        lager:clear_all_traces(),
+                        lager:trace(?MODULE, [{requestid, '>', 8}, {foo, bar}]),
+                        lager:info([{foo, bar}], "hello world"),
+                        lager:info([{requestid, 6}], "hello world"),
+                        lager:info([{requestid, 7}], "hello world"),
+                        lager:info([{requestid, 8}], "hello world"),
+                        lager:info([{requestid, 9}, {foo, bar}], "hello world"),
+                        lager:info([{requestid, 10}], "hello world"),
+                        ?assertEqual(2, count()),
+                        lager:trace(?MODULE, [{requestid, '>', 8}]),
+                        lager:info([{foo, bar}], "hello world"),
+                        lager:info([{requestid, 6}], "hello world"),
+                        lager:info([{requestid, 7}], "hello world"),
+                        lager:info([{requestid, 8}], "hello world"),
+                        lager:info([{requestid, 9}, {foo, bar}], "hello world"),
+                        lager:info([{requestid, 10}], "hello world"),
+                        ?assertEqual(4, count()),
+                        lager:trace(?MODULE, [{foo, '=', bar}]),
+                        lager:info([{foo, bar}], "hello world"),
+                        lager:info([{requestid, 6}], "hello world"),
+                        lager:info([{requestid, 7}], "hello world"),
+                        lager:info([{requestid, 8}], "hello world"),
+                        lager:info([{requestid, 9}, {foo, bar}], "hello world"),
+                        lager:info([{requestid, 10}], "hello world"),
+                        ?assertEqual(7, count()),
+                        lager:clear_all_traces(),
+                        lager:info([{requestid, 6}], "hello world"),
+                        ?assertEqual(7, count()),
+                        ok
+                end
+            },
+            {"tracing custom attributes works with event stream processing statistics and reductions",
+                fun() ->
+                        lager:set_loglevel(?MODULE, error),
+                        ?assertEqual({?ERROR bor ?CRITICAL bor ?ALERT bor ?EMERGENCY, []}, lager_config:get(loglevel)),
+                        lager_config:set(loglevel, {element(2, lager_util:config_to_mask(error)), []}),
+                        lager:info([{requestid, 6}], "hello world"),
+                        ?assertEqual(0, count()),
+                        lager:trace(?MODULE, [{beta, '*'}]),
+                        lager:trace(?MODULE, [{meta, "data"}]),
+                        lager:info([{meta, "data"}], "hello world"),
+                        lager:info([{beta, 2}], "hello world"),
+                        lager:info([{beta, 2.1}, {foo, bar}], "hello world"),
+                        lager:info([{meta, <<"data">>}], "hello world"),
+                        ?assertEqual(8, ?DEFAULT_TRACER:info(input)),
+                        ?assertEqual(6, ?DEFAULT_TRACER:info(output)),
+                        ?assertEqual(2, ?DEFAULT_TRACER:info(filter)),
+                        lager:clear_all_traces(),
+                        lager:trace(?MODULE, [{meta, "data"}]),
+                        lager:trace(?MODULE, [{beta, '>', 2}, {beta, '<', 2.12}]),
+                        lager:info([{meta, "data"}], "hello world"),
+                        lager:info([{beta, 2}], "hello world"),
+                        lager:info([{beta, 2.1}, {foo, bar}], "hello world"),
+                        lager:info([{meta, <<"data">>}], "hello world"),
+                        ?assertEqual(8, ?DEFAULT_TRACER:info(input)),
+                        ?assertEqual(4, ?DEFAULT_TRACER:info(output)),
+                        ?assertEqual(4, ?DEFAULT_TRACER:info(filter)),
+                        lager:clear_all_traces(),
+                        lager:trace_console([{beta, '>', 2}, {meta, "data"}]),
+                        lager:trace_console([{beta, '>', 2}, {beta, '<', 2.12}]),
+                        Reduced = {all,[{any,[{beta,'<',2.12},{meta,'=',"data"}]},
+                                              {beta,'>',2}]},
+                        ?assertEqual(Reduced, ?DEFAULT_TRACER:info('query')),
+
+                        lager:clear_all_traces(),
+                        lager:info([{requestid, 6}], "hello world"),
+                        ?assertEqual(5, count()),
+                        ok
+                end
+            },
             {"tracing honors loglevel",
                 fun() ->
                         lager:set_loglevel(?MODULE, error),
@@ -1073,10 +1154,11 @@
                         ?assertEqual(true, lager_config:get(async)),
 
                         %% put a ton of things in the queue
-                        Workers = [spawn_monitor(fun() -> [lager:info("hello world") || _ <- lists:seq(1, 1000)] end) || _ <- lists:seq(1, 10)],
+                        Workers = [spawn_monitor(fun() -> [lager:info("hello world") || _ <- lists:seq(1, 1000)] end) || _ <- lists:seq(1, 15)],
 
                         %% serialize on mailbox
                         _ = gen_event:which_handlers(lager_event),
+                        timer:sleep(500),
                         %% there should be a ton of outstanding messages now, so async is false
                         ?assertEqual(false, lager_config:get(async)),
                         %% wait for all the workers to return, meaning that all the messages have been logged (since we're in sync mode)