blob: 78eab4dfc3ee53b43151f15d086b7fe06cc1596e [file] [log] [blame]
% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
% http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.
-module(couch_epi_tests).
-include_lib("couch/include/couch_eunit.hrl").
-define(DATA_FILE1, ?ABS_PATH("test/fixtures/app_data1.cfg")).
-define(DATA_FILE2, ?ABS_PATH("test/fixtures/app_data2.cfg")).
-export([notify_cb/5, save/3]).
-record(ctx, {file, handle, pid, kv, key}).
-define(TIMEOUT, 5000).
-define(MODULE1(Name), "
-export([inc/2, fail/2]).
inc(KV, A) ->
Reply = A + 1,
couch_epi_tests:save(KV, inc1, Reply),
[KV, Reply].
fail(KV, A) ->
inc(KV, A).
").
-define(MODULE2(Name), "
-export([inc/2, fail/2]).
inc(KV, A) ->
Reply = A + 1,
couch_epi_tests:save(KV, inc2, Reply),
[KV, Reply].
fail(KV, _A) ->
couch_epi_tests:save(KV, inc2, check_error),
throw(check_error).
").
-define(DATA_MODULE1(Name), "
-export([data/0]).
data() ->
[
{[complex, key, 1], [
{type, counter},
{desc, foo}
]}
].
").
-define(DATA_MODULE2(Name), "
-export([data/0]).
data() ->
[
{[complex, key, 2], [
{type, counter},
{desc, bar}
]},
{[complex, key, 1], [
{type, counter},
{desc, updated_foo}
]}
].
").
notify_cb(App, Key, OldData, Data, KV) ->
save(KV, is_called, {App, Key, OldData, Data}).
setup(couch_epi_data_source) ->
error_logger:tty(false),
Key = {test_app, descriptions},
File = ?tempfile(),
{ok, _} = file:copy(?DATA_FILE1, File),
application:start(couch_epi),
{ok, Pid} = couch_epi_data_source:start_link(
test_app, {epi_key, Key}, {file, File}, [{interval, 100}]),
ok = couch_epi_data_source:wait(Pid),
KV = state_storage(),
#ctx{
file = File,
key = Key,
handle = couch_epi:get_handle(Key),
kv = KV,
pid = Pid};
setup(couch_epi_data) ->
error_logger:tty(false),
Key = {test_app, descriptions},
application:start(couch_epi),
ok = generate_module(provider, ?DATA_MODULE1(provider)),
{ok, Pid} = couch_epi_data:start_link(
test_app, {epi_key, Key}, provider, []),
ok = couch_epi_data:wait(Pid),
KV = state_storage(),
#ctx{
key = Key,
handle = couch_epi:get_handle(Key),
kv = KV,
pid = Pid};
setup(couch_epi_functions) ->
Key = my_service,
error_logger:tty(false),
application:start(couch_epi),
ok = generate_module(provider1, ?MODULE1(provider1)),
ok = generate_module(provider2, ?MODULE2(provider2)),
{ok, Pid} = couch_epi_functions:start_link(
test_app, {epi_key, Key}, {modules, [provider1, provider2]},
[{interval, 100}]),
ok = couch_epi_functions:wait(Pid),
KV = state_storage(),
#ctx{
key = Key,
handle = couch_epi:get_handle(Key),
kv = KV,
pid = Pid};
setup(_Opts) ->
setup(couch_epi_functions).
teardown(Module, #ctx{pid = Pid} = Ctx) when is_atom(Module) ->
Module:stop(Pid),
teardown(Ctx);
teardown(_Opts, #ctx{pid = Pid} = Ctx) ->
couch_epi_functions:stop(Pid),
teardown(Ctx).
teardown(#ctx{file = File} = Ctx) when File /= undefined ->
file:delete(File),
teardown(Ctx#ctx{file = undefined});
teardown(#ctx{kv = KV}) ->
call(KV, stop),
application:stop(couch_epi),
ok.
upgrade_release(Pid, Module) ->
sys:suspend(Pid),
'ok' = sys:change_code(Pid, Module, 'undefined', []),
sys:resume(Pid),
ok.
epi_config_update_test_() ->
Funs = [
fun ensure_notified_when_changed/2,
fun ensure_not_notified_when_no_change/2,
fun ensure_not_notified_when_unsubscribed/2
],
Modules= [
couch_epi_data,
couch_epi_data_source,
couch_epi_functions
],
{
"config update tests",
[make_case("Check notifications for: ", Modules, Funs)]
}.
epi_data_source_test_() ->
Funs = [
fun check_dump/2,
fun check_get/2,
fun check_get_value/2,
fun check_by_key/2,
fun check_by_source/2,
fun check_keys/2,
fun check_subscribers/2
],
Modules= [
couch_epi_data,
couch_epi_data_source
],
{
"epi data API tests",
[make_case("Check query API for: ", Modules, Funs)]
}.
epi_apply_test_() ->
{
"epi dispatch tests",
{
foreach,
fun() -> setup(couch_epi_functions) end,
fun teardown/1,
[
fun check_pipe/1,
fun check_broken_pipe/1,
fun ensure_fail/1,
fun ensure_fail_pipe/1
]
}
}.
epi_subscription_test_() ->
Funs = [
fun ensure_unsubscribe_when_caller_die/2
],
Modules= [
couch_epi_data,
couch_epi_data_source,
couch_epi_functions
],
{
"epi subscription tests",
[make_case("Check subscription API for: ", Modules, Funs)]
}.
epi_reload_test_() ->
Modules= [
couch_epi_data,
couch_epi_data_source,
couch_epi_functions
],
Funs = [
fun ensure_reload_if_manually_triggered/2,
fun ensure_reload_if_changed/2,
fun ensure_no_reload_when_no_change/2
],
{
"epi reload tests",
{
foreachx,
fun setup/1,
fun teardown/2,
[{M, Fun} || M <- Modules, Fun <- Funs]
}
}.
apply_options_test_() ->
Funs = [fun ensure_apply_is_called/2],
make_case("Apply with options: ", valid_options_permutations(), Funs).
make_case(Msg, P, Funs) ->
[{format_case_name(Msg, Case), [
{
foreachx, fun setup/1, fun teardown/2,
[
{Case, Fun} || Fun <- Funs
]
}
]} || Case <- P].
format_case_name(Msg, Case) ->
lists:flatten(Msg ++ io_lib:format("~p", [Case])).
valid_options_permutations() ->
[
[],
[ignore_errors],
[pipe],
[pipe, ignore_errors],
[concurrent],
[concurrent, ignore_errors]
].
ensure_notified_when_changed(couch_epi_functions, #ctx{key = Key} = Ctx) ->
?_test(begin
subscribe(Ctx, test_app, Key),
update(couch_epi_functions, Ctx),
timer:sleep(200),
Result = get(Ctx, is_called),
Expected = {test_app, Key,
{modules, [provider1, provider2]},
{modules, [provider1, provider2]}},
?assertMatch({ok, Expected}, Result),
ok
end);
ensure_notified_when_changed(Module, #ctx{key = Key} = Ctx) ->
?_test(begin
subscribe(Ctx, test_app, Key),
update(Module, Ctx),
timer:sleep(200),
ExpectedData = lists:usort([
{[complex, key, 1], [{type, counter}, {desc, updated_foo}]},
{[complex, key, 2], [{type, counter}, {desc, bar}]}
]),
Result = get(Ctx, is_called),
?assertMatch({ok, {test_app, Key, {data, _}, {data, _}}}, Result),
{ok, {test_app, Key, {data, OldData}, {data, Data}}} = Result,
?assertMatch(ExpectedData, lists:usort(Data)),
?assertMatch(
[{[complex, key, 1], [{type, counter}, {desc, foo}]}],
lists:usort(OldData))
end).
ensure_not_notified_when_no_change(_Module, #ctx{key = Key} = Ctx) ->
?_test(begin
subscribe(Ctx, test_app, Key),
timer:sleep(200),
?assertMatch(error, get(Ctx, is_called))
end).
ensure_not_notified_when_unsubscribed(Module, #ctx{key = Key} = Ctx) ->
?_test(begin
SubscriptionId = subscribe(Ctx, test_app, Key),
couch_epi:unsubscribe(SubscriptionId),
timer:sleep(100),
update(Module, Ctx),
timer:sleep(200),
?assertMatch(error, get(Ctx, is_called))
end).
ensure_apply_is_called(Opts, #ctx{handle = Handle, kv = KV, key = Key} = Ctx) ->
?_test(begin
couch_epi:apply(Handle, Key, inc, [KV, 2], Opts),
maybe_wait(Opts),
?assertMatch({ok, _}, get(Ctx, inc1)),
?assertMatch({ok, _}, get(Ctx, inc2)),
ok
end).
check_pipe(#ctx{handle = Handle, kv = KV, key = Key}) ->
?_test(begin
Result = couch_epi:apply(Handle, Key, inc, [KV, 2], [pipe]),
?assertMatch([KV, 4], Result),
ok
end).
check_broken_pipe(#ctx{handle = Handle, kv = KV, key = Key} = Ctx) ->
?_test(begin
Result = couch_epi:apply(Handle, Key, fail, [KV, 2], [pipe, ignore_errors]),
?assertMatch([KV, 3], Result),
?assertMatch([3, check_error], pipe_state(Ctx)),
ok
end).
ensure_fail_pipe(#ctx{handle = Handle, kv = KV, key = Key}) ->
?_test(begin
?assertThrow(check_error,
couch_epi:apply(Handle, Key, fail, [KV, 2], [pipe])),
ok
end).
ensure_fail(#ctx{handle = Handle, kv = KV, key = Key}) ->
?_test(begin
?assertThrow(check_error,
couch_epi:apply(Handle, Key, fail, [KV, 2], [])),
ok
end).
ensure_unsubscribe_when_caller_die(_Module, #ctx{key = Key} = Ctx) ->
?_test(begin
spawn(fun() ->
subscribe(Ctx, test_app, Key)
end),
timer:sleep(200),
?assertMatch(error, get(Ctx, is_called))
end).
pipe_state(Ctx) ->
Trace = [get(Ctx, inc1), get(Ctx, inc2)],
lists:usort([State || {ok, State} <- Trace]).
check_dump(_Module, #ctx{handle = Handle}) ->
?_test(begin
?assertMatch(
[[{type, counter}, {desc, foo}]],
couch_epi:dump(Handle))
end).
check_get(_Module, #ctx{handle = Handle}) ->
?_test(begin
?assertMatch(
[[{type, counter}, {desc, foo}]],
couch_epi:get(Handle, [complex,key, 1]))
end).
check_get_value(_Module, #ctx{handle = Handle}) ->
?_test(begin
?assertMatch(
[{type, counter}, {desc, foo}],
couch_epi:get_value(Handle, test_app, [complex,key, 1]))
end).
check_by_key(_Module, #ctx{handle = Handle}) ->
?_test(begin
?assertMatch(
[{[complex, key, 1],
[{test_app, [{type, counter}, {desc, foo}]}]}],
couch_epi:by_key(Handle)),
?assertMatch(
[{test_app, [{type, counter}, {desc, foo}]}],
couch_epi:by_key(Handle, [complex, key, 1]))
end).
check_by_source(_Module, #ctx{handle = Handle}) ->
?_test(begin
?assertMatch(
[{test_app,
[{[complex,key, 1], [{type, counter}, {desc, foo}]}]}],
couch_epi:by_source(Handle)),
?assertMatch(
[{[complex,key, 1], [{type, counter}, {desc, foo}]}],
couch_epi:by_source(Handle, test_app))
end).
check_keys(_Module, #ctx{handle = Handle}) ->
?_assertMatch([[complex,key,1]], couch_epi:keys(Handle)).
check_subscribers(_Module, #ctx{handle = Handle}) ->
?_assertMatch([test_app], couch_epi:subscribers(Handle)).
ensure_reload_if_manually_triggered(Module, #ctx{pid = Pid, key = Key} = Ctx) ->
?_test(begin
subscribe(Ctx, test_app, Key),
update_definitions(Module, Ctx),
Module:reload(Pid),
timer:sleep(50),
Result = get(Ctx, is_called),
?assertNotMatch(error, Result)
end).
ensure_reload_if_changed(couch_epi_data_source = Module,
#ctx{key = Key, handle = Handle} = Ctx) ->
?_test(begin
Version = Handle:version(),
subscribe(Ctx, test_app, Key),
update_definitions(Module, Ctx),
timer:sleep(250),
?assertNotEqual(Version, Handle:version()),
Result = get(Ctx, is_called),
?assertNotMatch(error, Result)
end);
ensure_reload_if_changed(Module,
#ctx{key = Key, handle = Handle} = Ctx) ->
?_test(begin
Version = Handle:version(),
subscribe(Ctx, test_app, Key),
update(Module, Ctx),
?assertNotEqual(Version, Handle:version()),
timer:sleep(100), %% Allow some time for notify to be called
Result = get(Ctx, is_called),
?assertNotMatch(error, Result)
end).
ensure_no_reload_when_no_change(couch_epi_functions = Module,
#ctx{pid = Pid, key = Key, handle = Handle} = Ctx) ->
?_test(begin
Version = Handle:version(),
subscribe(Ctx, test_app, Key),
upgrade_release(Pid, Module),
?assertEqual(Version, Handle:version()),
Result = get(Ctx, is_called),
?assertMatch(error, Result)
end);
ensure_no_reload_when_no_change(Module,
#ctx{key = Key, handle = Handle} = Ctx) ->
?_test(begin
Version = Handle:version(),
subscribe(Ctx, test_app, Key),
timer:sleep(450),
?assertEqual(Version, Handle:version()),
Result = get(Ctx, is_called),
?assertMatch(error, Result)
end).
%% ------------------------------------------------------------------
%% Internal Function Definitions
%% ------------------------------------------------------------------
generate_module(Name, Body) ->
Tokens = couch_epi_codegen:scan(Body),
couch_epi_codegen:generate(Name, Tokens).
update(Module, #ctx{pid = Pid} = Ctx) ->
update_definitions(Module, Ctx),
upgrade_release(Pid, Module).
update_definitions(couch_epi_data_source, #ctx{file = File}) ->
{ok, _} = file:copy(?DATA_FILE2, File),
ok;
update_definitions(couch_epi_data, #ctx{}) ->
ok = generate_module(provider, ?DATA_MODULE2(provider));
update_definitions(couch_epi_functions, #ctx{}) ->
ok = generate_module(provider1, ?MODULE2(provider1)).
subscribe(#ctx{kv = Kv}, App, Key) ->
{ok, Pid} = couch_epi:subscribe(App, Key, ?MODULE, notify_cb, Kv),
call(Kv, empty),
Pid.
maybe_wait(Opts) ->
case lists:member(concurrent, Opts) of
true ->
timer:sleep(100);
false ->
ok
end.
%% ------------
%% State tracer
save(Kv, Key, Value) ->
call(Kv, {set, Key, Value}).
get(#ctx{kv = Kv}, Key) ->
call(Kv, {get, Key}).
call(Server, Msg) ->
Ref = make_ref(),
Server ! {{Ref, self()}, Msg},
receive
{reply, Ref, Reply} ->
Reply
after ?TIMEOUT ->
{error, {timeout, Msg}}
end.
reply({Ref, From}, Msg) ->
From ! {reply, Ref, Msg}.
state_storage() ->
spawn_link(fun() -> state_storage(dict:new()) end).
state_storage(Dict) ->
receive
{From, {set, Key, Value}} ->
reply(From, ok),
state_storage(dict:store(Key, Value, Dict));
{From, {get, Key}} ->
reply(From, dict:find(Key, Dict)),
state_storage(Dict);
{From, empty} ->
reply(From, ok),
state_storage(dict:new());
{From, stop} ->
reply(From, ok)
end.