blob: dad363695222e6415853abdc821aad853649d7f1 [file] [log] [blame]
% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
% http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.
-module(couch_multidb_changes).
-behaviour(gen_server).
-export([
start_link/4
]).
-export([
init/1,
terminate/2,
handle_call/3,
handle_info/2,
handle_cast/2,
code_change/3
]).
-export([
changes_reader/3,
changes_reader_cb/3
]).
-include_lib("couch/include/couch_db.hrl").
-include_lib("mem3/include/mem3.hrl").
-define(CTX, {user_ctx, #user_ctx{roles=[<<"_admin">>, <<"_replicator">>]}}).
-define(AVG_DELAY_MSEC, 10).
-define(MAX_DELAY_MSEC, 120000).
% Server state.
%   tid          - ETS table holding {DbName, Seq, Rescan} checkpoint rows
%   mod          - callback module implementing this behaviour
%   ctx          - opaque context threaded through the callbacks
%   suffix       - db name suffix used to select which dbs are followed
%   event_server - monitor reference for the couch_event_server process
%   scanner      - pid of the initial db scanner; nil once it exited normally
%   pids         - running change feed readers as {DbName, Pid}
%   skip_ddocs   - when true, design doc changes are not passed to db_change/3
-record(state, {
tid :: ets:tid(),
mod :: atom(),
ctx :: term(),
suffix :: binary(),
event_server :: reference(),
scanner :: nil | pid(),
pids :: [{binary(), pid()}],
skip_ddocs :: boolean()
}).
% Behavior API
% For each db shard with a matching suffix, report created,
% deleted, found (discovered) and change events.
% Every callback receives the current Context and returns the Context that
% the server keeps in its state and hands to the next callback.
-callback db_created(DbName :: binary(), Context :: term()) ->
Context :: term().
-callback db_deleted(DbName :: binary(), Context :: term()) ->
Context :: term().
-callback db_found(DbName :: binary(), Context :: term()) ->
Context :: term().
-callback db_change(DbName :: binary(), Change :: term(), Context :: term()) ->
Context :: term().
% External API

% Start a linked, unregistered multi-db changes server.
%   DbSuffix - suffix of the db names to follow
%   Module   - callback module implementing this behaviour
%   Context  - initial context handed to the callbacks
%   Opts     - option list, which can contain:
%                - `skip_ddocs` : Skip design docs
-spec start_link(binary(), module(), term(), list()) ->
    {ok, pid()} | ignore | {error, term()}.
start_link(DbSuffix, Module, Context, Opts) when
    is_binary(DbSuffix), is_atom(Module), is_list(Opts)
->
    InitArgs = [DbSuffix, Module, Context, Opts],
    gen_server:start_link(?MODULE, InitArgs, []).
% gen_server callbacks

% Build the initial server state.
% Traps exits so that exits of the linked scanner and change feed readers
% arrive as 'EXIT' messages in handle_info/2, creates the checkpoint ETS
% table, registers for couch_event notifications and spawns the scanner
% that discovers already existing dbs.
init([DbSuffix, Module, Context, Opts]) ->
    process_flag(trap_exit, true),
    Server = self(),
    Tid = ets:new(?MODULE, [set, protected]),
    EventRef = register_with_event_server(Server),
    Scanner = spawn_link(fun() -> scan_all_dbs(Server, DbSuffix) end),
    {ok, #state{
        tid = Tid,
        mod = Module,
        ctx = Context,
        suffix = DbSuffix,
        event_server = EventRef,
        scanner = Scanner,
        pids = [],
        % get_bool/2 rather than is_defined/2: both the bare atom and
        % {skip_ddocs, true} enable skipping, while {skip_ddocs, false}
        % no longer switches it on by accident.
        skip_ddocs = proplists:get_bool(skip_ddocs, Opts)
    }}.
% No explicit cleanup is performed on shutdown; always returns ok.
terminate(_Why, _St) ->
    ok.
% Synchronous requests issued by the change feed readers
% (see changes_reader_cb/3).
%   {change, DbName, Change}     - hand the change row to Mod:db_change/3 and
%                                  keep the returned context, unless design
%                                  docs are skipped and the row is one
%   {checkpoint, DbName, EndSeq} - store EndSeq for DbName in ETS, keeping
%                                  the rescan flag of an existing row
handle_call({change, DbName, Change}, _From, #state{} = State) ->
    IsDDoc = is_design_doc(Change),
    case State#state.skip_ddocs of
        true when IsDDoc ->
            {reply, ok, State};
        _ ->
            #state{mod = Mod, ctx = Ctx} = State,
            NewCtx = Mod:db_change(DbName, Change, Ctx),
            {reply, ok, State#state{ctx = NewCtx}}
    end;
handle_call({checkpoint, DbName, EndSeq}, _From, #state{tid = Ets} = State) ->
    Rescan =
        case ets:lookup(Ets, DbName) of
            [{DbName, _PrevSeq, Flag}] -> Flag;
            [] -> false
        end,
    true = ets:insert(Ets, {DbName, EndSeq, Rescan}),
    {reply, ok, State}.
% {resume_scan, DbName} is cast by the scanner for every db shard it finds;
% it starts or flags a changes scan for that shard (see resume_scan/2).
handle_cast({resume_scan, DbName}, State) ->
    NewState = resume_scan(DbName, State),
    {noreply, NewState}.
% Raw messages:
%   '$couch_event' - db events; only dbs whose suffix matches are dispatched
%                    to db_callback/3
%   'DOWN'         - the monitored couch_event_server went away, stop
%   'EXIT'         - from the scanner or from a change feed reader (the
%                    server traps exits, see init/1)
handle_info({'$couch_event', DbName, Event}, #state{suffix = Suffix} = State) ->
    case couch_db:dbname_suffix(DbName) of
        Suffix ->
            {noreply, db_callback(Event, DbName, State)};
        _ ->
            {noreply, State}
    end;
handle_info({'DOWN', MonRef, _, _, Info}, #state{event_server = MonRef} = State) ->
    {stop, {couch_event_server_died, Info}, State};
handle_info({'EXIT', Pid, Reason}, #state{scanner = Pid} = State) ->
    % A scanner that finished normally is simply forgotten; anything else
    % takes the server down.
    case Reason of
        normal ->
            {noreply, State#state{scanner = nil}};
        _ ->
            {stop, {scanner_died, Reason}, State}
    end;
handle_info({'EXIT', Pid, Reason}, #state{} = State) ->
    #state{pids = Pids, suffix = Suffix, tid = Ets} = State,
    couch_log:debug("~p change feed exited ~p", [Suffix, Pid]),
    case lists:keytake(Pid, 2, Pids) of
        false when Reason == normal ->
            {noreply, State};
        false ->
            UnknownFmt = "~s(~p) : Unknown pid ~w died :: ~w",
            couch_log:error(UnknownFmt, [?MODULE, Suffix, Pid, Reason]),
            {stop, {unexpected_exit, Pid, Reason}, State};
        {value, {DbName, Pid}, RestPids} ->
            case Reason of
                normal ->
                    ok;
                _ ->
                    KnownFmt = "~s : Known change feed ~w died :: ~w",
                    couch_log:error(KnownFmt, [?MODULE, Pid, Reason])
            end,
            NewState = State#state{pids = RestPids},
            % Restart the feed right away if a rescan was requested while
            % the reader that just exited was still running.
            case ets:lookup(Ets, DbName) of
                [{DbName, _EndSeq, true}] ->
                    {noreply, resume_scan(DbName, NewState)};
                _ ->
                    {noreply, NewState}
            end
    end;
handle_info(_Msg, State) ->
    {noreply, State}.
% The state is carried over unchanged on code upgrade.
code_change(_FromVsn, State, _Extra) ->
    {ok, State}.
% Private functions

% Monitor couch_event_server and subscribe Server to all db events.
% Returns the monitor reference.
-spec register_with_event_server(pid()) -> reference().
register_with_event_server(Server) ->
    MonRef = erlang:monitor(process, couch_event_server),
    couch_event:register_all(Server),
    MonRef.
% Dispatch a couch_event notification for a db with a matching suffix.
%   created - notify Mod:db_created/2, then start or flag a changes scan
%   deleted - notify Mod:db_deleted/2
%   updated - start or flag a changes scan
% Any other event leaves the state untouched. The event comes straight from
% the '$couch_event' message, so the spec accepts any term and not just the
% three events that are acted upon.
-spec db_callback(Event :: term(), binary(), #state{}) -> #state{}.
db_callback(created, DbName, #state{mod = Mod, ctx = Ctx} = State) ->
    NewState = State#state{ctx = Mod:db_created(DbName, Ctx)},
    resume_scan(DbName, NewState);
db_callback(deleted, DbName, #state{mod = Mod, ctx = Ctx} = State) ->
    State#state{ctx = Mod:db_deleted(DbName, Ctx)};
db_callback(updated, DbName, State) ->
    resume_scan(DbName, State);
db_callback(_Other, _DbName, State) ->
    State.
% Start a change feed reader for DbName, or, when one is already running,
% flag in ETS that another scan is needed once it exits.
-spec resume_scan(binary(), #state{}) -> #state{}.
resume_scan(DbName, #state{pids = Pids, tid = Ets} = State) ->
    FeedRunning = lists:keyfind(DbName, 1, Pids) =/= false,
    case {FeedRunning, ets:lookup(Ets, DbName)} of
        {true, Rows} ->
            % A change feed is already running. Flag the need to rescan:
            % from the last ETS checkpoint if there is one, otherwise
            % from the beginning.
            Seq =
                case Rows of
                    [] -> 0;
                    [{DbName, Checkpointed, _}] -> Checkpointed
                end,
            true = ets:insert(Ets, {DbName, Seq, true}),
            State;
        {false, []} ->
            % No change feed running and no ETS entry: first time this
            % db shard is seen. Insert a checkpoint entry starting at 0,
            % notify the user with a found callback and start a change
            % feed from 0.
            true = ets:insert(Ets, {DbName, 0, false}),
            #state{mod = Mod, ctx = Ctx0} = State,
            Ctx = Mod:db_found(DbName, Ctx0),
            Reader = start_changes_reader(DbName, 0),
            State#state{ctx = Ctx, pids = [{DbName, Reader} | Pids]};
        {false, [{DbName, EndSeq, _}]} ->
            % No change feed running but a checkpoint exists: clear the
            % rescan flag and start a new reader from that checkpoint.
            true = ets:insert(Ets, {DbName, EndSeq, false}),
            Reader = start_changes_reader(DbName, EndSeq),
            State#state{pids = [{DbName, Reader} | Pids]}
    end.
% Spawn a linked changes_reader/3 process that reports back to the caller.
start_changes_reader(DbName, Since) ->
    Server = self(),
    spawn_link(?MODULE, changes_reader, [Server, DbName, Since]).
% Body of a change feed reader process: opens DbName and runs a "normal"
% changes feed (docs included) starting at Since, feeding every row to
% changes_reader_cb/3 with {Server, DbName} as the accumulator.
changes_reader(Server, DbName, Since) ->
    {ok, Db} = couch_db:open_int(DbName, [?CTX, sys_db]),
    Args = #changes_args{
        include_docs = true,
        since = Since,
        feed = "normal",
        timeout = infinity
    },
    Callback = {fun ?MODULE:changes_reader_cb/3, {Server, DbName}},
    RunFeed = couch_changes:handle_db_changes(Args, {json_req, null}, Db),
    RunFeed(Callback).
% Changes feed callback. Change rows and the final sequence are sent to the
% server with synchronous calls; every other feed event is ignored. The
% accumulator is always returned unchanged.
changes_reader_cb({change, Change, _}, _, {Server, DbName} = Acc) ->
    ok = gen_server:call(Server, {change, DbName, Change}, infinity),
    Acc;
changes_reader_cb({stop, EndSeq}, _, {Server, DbName} = Acc) ->
    ok = gen_server:call(Server, {checkpoint, DbName, EndSeq}, infinity),
    Acc;
changes_reader_cb(_, _, Acc) ->
    Acc.
% Body of the scanner process: first checks for a local db named after the
% suffix, then walks the changes of the shards db and lets
% scan_changes_cb/3 notify Server about every matching db.
scan_all_dbs(Server, DbSuffix) when is_pid(Server) ->
    ok = scan_local_db(Server, DbSuffix),
    ShardsDbName = config:get("mem3", "shards_db", "_dbs"),
    {ok, ShardsDb} = mem3_util:ensure_exists(ShardsDbName),
    RunFeed = couch_changes:handle_db_changes(#changes_args{}, nil, ShardsDb),
    % The third element counts notifications and drives the jitter.
    InitAcc = {Server, DbSuffix, 1},
    RunFeed({fun scan_changes_cb/3, InitAcc}),
    couch_db:close(ShardsDb).
% Changes callback used while scanning the shards db. The row id is a db
% name: design docs, names with a different suffix and deleted entries are
% skipped; for the rest every local shard is reported through notify_fold/2.
scan_changes_cb({change, {Props}, _}, _, {_Server, DbSuffix, _Count} = Acc) ->
    case couch_util:get_value(<<"id">>, Props) of
        <<"_design/", _/binary>> ->
            Acc;
        DbName ->
            IsMatch = DbSuffix =:= couch_db:dbname_suffix(DbName),
            IsDeleted = couch_replicator_utils:is_deleted(Props),
            case {IsMatch, IsDeleted} of
                {true, false} ->
                    lists:foldl(fun notify_fold/2, Acc, local_shards(DbName));
                {true, true} ->
                    Acc;
                {false, _} ->
                    Acc
            end
    end;
scan_changes_cb(_, _, Acc) ->
    Acc.
% Names of the shards of DbName that mem3 reports as local; an empty list
% when mem3 raises database_does_not_exist.
local_shards(DbName) ->
    try mem3:local_shards(DbName) of
        Shards ->
            [Name || #shard{name = Name} <- Shards]
    catch
        error:database_does_not_exist ->
            []
    end.
% Fold step over shard names: after a jittered delay, a short-lived linked
% process casts resume_scan for DbName to Server. Returns the accumulator
% with the notification counter bumped.
notify_fold(DbName, {Server, DbSuffix, Count}) ->
    Delay = jitter(Count),
    Notify = fun() ->
        timer:sleep(Delay),
        gen_server:cast(Server, {resume_scan, DbName})
    end,
    spawn_link(Notify),
    {Server, DbSuffix, Count + 1}.
% The jitter range grows with the number of shards found so far (capped at
% ?MAX_DELAY_MSEC). Spreading the notifications out avoids a stampede of
% back to back callback invocations for a large number of shards during
% startup.
jitter(N) ->
    Uncapped = 2 * N * ?AVG_DELAY_MSEC,
    couch_rand:uniform(min(Uncapped, ?MAX_DELAY_MSEC)).
% If a db named exactly like the suffix can be opened, ask Server to scan
% it. Any other open result is ignored. Always returns ok.
scan_local_db(Server, DbSuffix) when is_pid(Server) ->
    OpenOpts = [?CTX, sys_db, nologifmissing],
    case couch_db:open_int(DbSuffix, OpenOpts) of
        {ok, Db} ->
            gen_server:cast(Server, {resume_scan, DbSuffix}),
            ok = couch_db:close(Db);
        _NotOpened ->
            ok
    end.
% True when the change row carries an <<"id">> that is a design doc id;
% false when the id is something else or missing.
is_design_doc({Props}) ->
    case lists:keyfind(<<"id">>, 1, Props) of
        {_, DocId} ->
            is_design_doc_id(DocId);
        false ->
            false
    end.
% True for binaries starting with ?DESIGN_DOC_PREFIX, false for anything else.
is_design_doc_id(Id) ->
    case Id of
        <<?DESIGN_DOC_PREFIX, _/binary>> -> true;
        _ -> false
    end.
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
-include_lib("couch/include/couch_eunit.hrl").
-define(MOD, multidb_test_module).
-define(SUFFIX, <<"suff">>).
-define(DBNAME, <<"shards/40000000-5fffffff/acct/suff.0123456789">>).
% EUnit generator for the mocked unit tests. Each listed test runs inside
% its own setup/0 + teardown/1 fixture (foreach).
couch_multidb_changes_test_() ->
{
foreach,
fun setup/0,
fun teardown/1,
[
t_handle_call_change(),
t_handle_call_change_filter_design_docs(),
t_handle_call_checkpoint_new(),
t_handle_call_checkpoint_existing(),
t_handle_info_created(),
t_handle_info_deleted(),
t_handle_info_updated(),
t_handle_info_other_event(),
t_handle_info_created_other_db(),
t_handle_info_scanner_exit_normal(),
t_handle_info_scanner_crashed(),
t_handle_info_event_server_exited(),
t_handle_info_unknown_pid_exited(),
t_handle_info_change_feed_exited(),
t_handle_info_change_feed_exited_and_need_rescan(),
t_spawn_changes_reader(),
t_changes_reader_cb_change(),
t_changes_reader_cb_stop(),
t_changes_reader_cb_other(),
t_handle_call_resume_scan_no_chfeed_no_ets_entry(),
t_handle_call_resume_scan_chfeed_no_ets_entry(),
t_handle_call_resume_scan_chfeed_ets_entry(),
t_handle_call_resume_scan_no_chfeed_ets_entry(),
t_start_link(),
t_start_link_no_ddocs(),
t_misc_gen_server_callbacks()
]
}.
% Fixture setup for the mocked unit tests: mocks logging, the callback
% module and the couch/mem3/config calls made by the server, then registers
% a stand-in process under the couch_event_server name. Returns the pid of
% that stand-in, which teardown/1 receives.
setup() ->
mock_logs(),
mock_callback_mod(),
meck:expect(couch_event, register_all, 1, ok),
meck:expect(config, get, ["mem3", "shards_db", '_'], "_dbs"),
meck:expect(mem3_util, ensure_exists, 1, {ok, dbs}),
ChangesFun = meck:val(fun(_) -> ok end),
meck:expect(couch_changes, handle_db_changes, 3, ChangesFun),
% Only ?DBNAME opened with exactly [?CTX, sys_db] "exists"; every other
% open is answered with {not_found, no_db_file}.
meck:expect(couch_db, open_int,
fun(?DBNAME, [?CTX, sys_db]) -> {ok, db};
(_, _) -> {not_found, no_db_file}
end),
meck:expect(couch_db, close, 1, ok),
% NOTE(review): mock_changes_reader/0 sets another expectation on
% couch_changes:handle_db_changes/3 after the one above - confirm which of
% the two is meant to be in effect for the scanner's call.
mock_changes_reader(),
% create process to stand in for couch_event_server
% mocking erlang:monitor doesn't work, so give it real process to monitor
EvtPid = spawn_link(fun() -> receive looper -> ok end end),
true = register(couch_event_server, EvtPid),
EvtPid.
% Fixture cleanup: kill the couch_event_server stand-in and unload all mocks.
% exit/2 only sends the kill signal, so wait for the 'DOWN' message before
% returning; otherwise the next setup/0 could try to register the
% couch_event_server name while the old process still holds it.
teardown(EvtPid) ->
    MonRef = erlang:monitor(process, EvtPid),
    unlink(EvtPid),
    exit(EvtPid, kill),
    receive
        {'DOWN', MonRef, process, EvtPid, _Reason} ->
            ok
    after 5000 ->
        % Should not happen for a kill signal; do not hang the test run.
        erlang:demonitor(MonRef, [flush])
    end,
    meck:unload().
% A change row for a regular document is handed to Mod:db_change/3.
t_handle_call_change() ->
    ?_test(begin
        St = mock_state(),
        Row = change_row(<<"blah">>),
        handle_call_ok({change, ?DBNAME, Row}, St),
        ?assert(meck:validate(?MOD)),
        ?assert(meck:called(?MOD, db_change, [?DBNAME, Row, zig]))
    end).
% With skip_ddocs enabled a design doc change must not reach the callback.
t_handle_call_change_filter_design_docs() ->
    ?_test(begin
        St = (mock_state())#state{skip_ddocs = true},
        Row = change_row(<<"_design/blah">>),
        handle_call_ok({change, ?DBNAME, Row}, St),
        ?assert(meck:validate(?MOD)),
        ?assertNot(meck:called(?MOD, db_change, [?DBNAME, Row, zig]))
    end).
% A checkpoint for an unknown db creates a row with rescan = false.
t_handle_call_checkpoint_new() ->
    ?_test(begin
        Table = mock_ets(),
        St = mock_state(Table),
        handle_call_ok({checkpoint, ?DBNAME, 1}, St),
        ?assertEqual([{?DBNAME, 1, false}], ets:tab2list(Table)),
        ets:delete(Table)
    end).
% A checkpoint for a known db updates the sequence and keeps the rescan flag.
t_handle_call_checkpoint_existing() ->
    ?_test(begin
        Table = mock_ets(),
        St = mock_state(Table),
        true = ets:insert(Table, {?DBNAME, 1, true}),
        handle_call_ok({checkpoint, ?DBNAME, 2}, St),
        ?assertEqual([{?DBNAME, 2, true}], ets:tab2list(Table)),
        ets:delete(Table)
    end).
% A created event for a matching db triggers Mod:db_created/2.
t_handle_info_created() ->
    ?_test(begin
        Table = mock_ets(),
        St = mock_state(Table),
        handle_info_check({'$couch_event', ?DBNAME, created}, St),
        ?assert(meck:validate(?MOD)),
        ?assert(meck:called(?MOD, db_created, [?DBNAME, zig]))
    end).
% A deleted event for a matching db triggers Mod:db_deleted/2.
t_handle_info_deleted() ->
    ?_test(begin
        St = mock_state(),
        handle_info_check({'$couch_event', ?DBNAME, deleted}, St),
        ?assert(meck:validate(?MOD)),
        ?assert(meck:called(?MOD, db_deleted, [?DBNAME, zig]))
    end).
% An updated event for a db seen for the first time triggers Mod:db_found/2.
t_handle_info_updated() ->
    ?_test(begin
        Table = mock_ets(),
        St = mock_state(Table),
        handle_info_check({'$couch_event', ?DBNAME, updated}, St),
        ?assert(meck:validate(?MOD)),
        ?assert(meck:called(?MOD, db_found, [?DBNAME, zig]))
    end).
% An event other than created/deleted/updated is accepted and none of the
% checked callback invocations is recorded.
t_handle_info_other_event() ->
    ?_test(begin
        St = mock_state(),
        handle_info_check({'$couch_event', ?DBNAME, somethingelse}, St),
        lists:foreach(
            fun(Callback) ->
                ?assertNot(meck:called(?MOD, Callback, [?DBNAME, somethingelse]))
            end,
            [db_created, db_deleted, db_found]
        )
    end).
% A created event for a db without the suffix does not reach the callback.
t_handle_info_created_other_db() ->
    ?_test(begin
        St = mock_state(),
        handle_info_check({'$couch_event', <<"otherdb">>, created}, St),
        ?assertNot(meck:called(?MOD, db_created, [?DBNAME, zig]))
    end).
% A normal scanner exit keeps the server running and clears the scanner pid.
t_handle_info_scanner_exit_normal() ->
    ?_test(begin
        Result = handle_info({'EXIT', spid, normal}, mock_state()),
        ?assertMatch({noreply, _}, Result),
        {noreply, After} = Result,
        ?assertEqual(nil, After#state.scanner)
    end).
% An abnormal scanner exit stops the server with scanner_died.
t_handle_info_scanner_crashed() ->
    ?_test(begin
        Result = handle_info({'EXIT', spid, oops}, mock_state()),
        ?assertMatch({stop, {scanner_died, oops}, _State}, Result)
    end).
% A 'DOWN' for the event server monitor stops the server.
t_handle_info_event_server_exited() ->
    ?_test(begin
        Down = {'DOWN', esref, type, espid, reason},
        Result = handle_info(Down, mock_state()),
        ?assertMatch({stop, {couch_event_server_died, reason}, _}, Result)
    end).
% Exits from unknown pids: a normal exit is ignored, anything else stops
% the server with unexpected_exit.
t_handle_info_unknown_pid_exited() ->
    ?_test(begin
        NormalState = mock_state(),
        NormalRes = handle_info({'EXIT', somepid, normal}, NormalState),
        ?assertMatch({noreply, NormalState}, NormalRes),
        CrashState = mock_state(),
        CrashRes = handle_info({'EXIT', somepid, oops}, CrashState),
        ?assertMatch({stop, {unexpected_exit, somepid, oops}, CrashState}, CrashRes)
    end).
% A known change feed exiting, normally or not, is removed from the pid
% list while the server keeps running.
t_handle_info_change_feed_exited() ->
    ?_test(begin
        lists:foreach(
            fun(Reason) ->
                Table = mock_ets(),
                St = mock_state(Table, cpid),
                Result = handle_info({'EXIT', cpid, Reason}, St),
                ?assertMatch({noreply, _}, Result),
                {noreply, After} = Result,
                ?assertEqual([], After#state.pids),
                ets:delete(Table)
            end,
            [normal, oops]
        )
    end).
% When a change feed exits while its rescan flag is set, a new reader is
% started and the flag is cleared.
t_handle_info_change_feed_exited_and_need_rescan() ->
    ?_test(begin
        Table = mock_ets(),
        true = ets:insert(Table, {?DBNAME, 1, true}),
        St = mock_state(Table, cpid),
        Result = handle_info({'EXIT', cpid, normal}, St),
        ?assertMatch({noreply, _}, Result),
        {noreply, After} = Result,
        % rescan flag should have been reset to false
        ?assertEqual([{?DBNAME, 1, false}], ets:tab2list(Table)),
        % a mock change feed process should be running
        [{?DBNAME, Reader}] = After#state.pids,
        ?assert(is_pid(Reader)),
        ReaderArgs = kill_mock_changes_reader_and_get_its_args(Reader),
        ?assertEqual({self(), ?DBNAME}, ReaderArgs),
        ets:delete(Table)
    end).
% start_changes_reader/2 spawns a live process that opens the db and starts
% a changes feed with the expected arguments.
t_spawn_changes_reader() ->
    ?_test(begin
        Reader = start_changes_reader(?DBNAME, 3),
        ?assert(erlang:is_process_alive(Reader)),
        ReaderArgs = kill_mock_changes_reader_and_get_its_args(Reader),
        ?assertEqual({self(), ?DBNAME}, ReaderArgs),
        ?assert(meck:validate(couch_db)),
        ?assert(meck:validate(couch_changes)),
        ?assert(meck:called(couch_db, open_int, [?DBNAME, [?CTX, sys_db]])),
        ExpectedArgs = #changes_args{
            include_docs = true,
            since = 3,
            feed = "normal",
            timeout = infinity
        },
        ?assert(
            meck:called(couch_changes, handle_db_changes, [
                ExpectedArgs, {json_req, null}, db
            ])
        )
    end).
% The change callback forwards the row to a running server, which then
% invokes Mod:db_change/3.
t_changes_reader_cb_change() ->
    ?_test(begin
        {ok, Server} = start_link(?SUFFIX, ?MOD, zig, []),
        Row = change_row(<<"blah">>),
        FeedEvent = {change, Row, ignore},
        {Server, ?DBNAME} = changes_reader_cb(FeedEvent, chtype, {Server, ?DBNAME}),
        ?assert(meck:called(?MOD, db_change, [?DBNAME, Row, zig])),
        unlink(Server),
        exit(Server, kill)
    end).
% The stop callback checkpoints the end sequence in the server's ETS table.
t_changes_reader_cb_stop() ->
    ?_test(begin
        {ok, Server} = start_link(?SUFFIX, ?MOD, zig, []),
        FeedEvent = {stop, 11},
        {Server, ?DBNAME} = changes_reader_cb(FeedEvent, chtype, {Server, ?DBNAME}),
        % We checkpoint on stop, check if checkpointed at correct sequence
        #state{tid = Table} = sys:get_state(Server),
        ?assertEqual([{?DBNAME, 11, false}], ets:tab2list(Table)),
        unlink(Server),
        exit(Server, kill)
    end).
% Any other feed event leaves the accumulator untouched.
t_changes_reader_cb_other() ->
    ?_test(begin
        Result = changes_reader_cb(other, chtype, acc),
        ?assertEqual(acc, Result)
    end).
% resume_scan/2 with no reader and no checkpoint: first sighting of the db.
t_handle_call_resume_scan_no_chfeed_no_ets_entry() ->
    ?_test(begin
        Table = mock_ets(),
        St = mock_state(Table),
        After = resume_scan(?DBNAME, St),
        % Check if inserted checkpoint entry in ets starting at 0
        ?assertEqual([{?DBNAME, 0, false}], ets:tab2list(Table)),
        % Check if called db_found callback
        ?assert(meck:called(?MOD, db_found, [?DBNAME, zig])),
        % Check if started a change reader
        [{?DBNAME, Reader}] = After#state.pids,
        ReaderArgs = kill_mock_changes_reader_and_get_its_args(Reader),
        ?assertEqual({self(), ?DBNAME}, ReaderArgs),
        ExpectedArgs = #changes_args{
            include_docs = true,
            since = 0,
            feed = "normal",
            timeout = infinity
        },
        ?assert(
            meck:called(couch_changes, handle_db_changes, [
                ExpectedArgs, {json_req, null}, db
            ])
        ),
        ets:delete(Table)
    end).
% resume_scan/2 with a running reader but no checkpoint row.
t_handle_call_resume_scan_chfeed_no_ets_entry() ->
    ?_test(begin
        Table = mock_ets(),
        Reader = start_changes_reader(?DBNAME, 0),
        St = mock_state(Table, Reader),
        resume_scan(?DBNAME, St),
        % Check ets checkpoint is set to 0 and rescan = true
        ?assertEqual([{?DBNAME, 0, true}], ets:tab2list(Table)),
        ets:delete(Table),
        kill_mock_changes_reader_and_get_its_args(Reader)
    end).
% resume_scan/2 with a running reader and an existing checkpoint row.
t_handle_call_resume_scan_chfeed_ets_entry() ->
    ?_test(begin
        Table = mock_ets(),
        true = ets:insert(Table, [{?DBNAME, 2, false}]),
        Reader = start_changes_reader(?DBNAME, 1),
        St = mock_state(Table, Reader),
        resume_scan(?DBNAME, St),
        % Check ets checkpoint is set to same endseq but rescan = true
        ?assertEqual([{?DBNAME, 2, true}], ets:tab2list(Table)),
        ets:delete(Table),
        kill_mock_changes_reader_and_get_its_args(Reader)
    end).
% resume_scan/2 with no reader but an existing checkpoint row.
t_handle_call_resume_scan_no_chfeed_ets_entry() ->
    ?_test(begin
        Table = mock_ets(),
        true = ets:insert(Table, [{?DBNAME, 1, true}]),
        St = mock_state(Table),
        After = resume_scan(?DBNAME, St),
        % Check if reset rescan to false but kept same endseq
        ?assertEqual([{?DBNAME, 1, false}], ets:tab2list(Table)),
        % Check if started a change reader
        [{?DBNAME, Reader}] = After#state.pids,
        ReaderArgs = kill_mock_changes_reader_and_get_its_args(Reader),
        ?assertEqual({self(), ?DBNAME}, ReaderArgs),
        ExpectedArgs = #changes_args{
            include_docs = true,
            since = 1,
            feed = "normal",
            timeout = infinity
        },
        ?assert(
            meck:called(couch_changes, handle_db_changes, [
                ExpectedArgs, {json_req, null}, db
            ])
        ),
        ets:delete(Table)
    end).
% start_link/4 without options: state is initialised, design docs are not
% skipped and the server registered itself with couch_event.
t_start_link() ->
    ?_test(begin
        {ok, Server} = start_link(?SUFFIX, ?MOD, nil, []),
        ?assert(is_pid(Server)),
        ServerState = sys:get_state(Server),
        ?assertMatch(
            #state{
                mod = ?MOD,
                suffix = ?SUFFIX,
                ctx = nil,
                pids = [],
                skip_ddocs = false
            },
            ServerState
        ),
        unlink(Server),
        exit(Server, kill),
        ?assert(meck:called(couch_event, register_all, [Server]))
    end).
% start_link/4 with the skip_ddocs option sets skip_ddocs = true in the state.
t_start_link_no_ddocs() ->
    ?_test(begin
        {ok, Server} = start_link(?SUFFIX, ?MOD, nil, [skip_ddocs]),
        ?assert(is_pid(Server)),
        ServerState = sys:get_state(Server),
        ?assertMatch(
            #state{
                mod = ?MOD,
                suffix = ?SUFFIX,
                ctx = nil,
                pids = [],
                skip_ddocs = true
            },
            ServerState
        ),
        unlink(Server),
        exit(Server, kill)
    end).
% terminate/2 and code_change/3 are no-ops.
t_misc_gen_server_callbacks() ->
    ?_test(begin
        TerminateRes = terminate(reason, state),
        ?assertEqual(ok, TerminateRes),
        UpgradeRes = code_change(old, state, extra),
        ?assertEqual({ok, state}, UpgradeRes)
    end).
% EUnit generator for the scanner helper tests. Unlike the mocked suite
% above, each test here runs against applications started with
% test_util:start_couch([mem3, fabric]) and stopped again afterwards.
scan_dbs_test_() ->
{
foreach,
fun() -> test_util:start_couch([mem3, fabric]) end,
fun(Ctx) -> test_util:stop_couch(Ctx) end,
[
t_find_shard(),
t_shard_not_found(),
t_pass_local(),
t_fail_local()
]
}.
% local_shards/1 reports two shard names for a freshly created clustered db.
t_find_shard() ->
    ?_test(begin
        ClusteredDb = ?tempdb(),
        ok = fabric:create_db(ClusteredDb, [?CTX]),
        ShardNames = local_shards(ClusteredDb),
        ?assertEqual(2, length(ShardNames)),
        fabric:delete_db(ClusteredDb, [?CTX])
    end).
% local_shards/1 yields an empty list for a db that was never created.
t_shard_not_found() ->
    ?_test(begin
        MissingDb = ?tempdb(),
        ?assertEqual([], local_shards(MissingDb))
    end).
% scan_local_db/2 casts resume_scan to the server when the local db exists.
% The test process plays the server, so the cast shows up in its mailbox
% as a '$gen_cast' message.
t_pass_local() ->
    ?_test(begin
        LocalDb = ?tempdb(),
        {ok, Db} = couch_db:create(LocalDb, [?CTX]),
        ok = couch_db:close(Db),
        scan_local_db(self(), LocalDb),
        receive
            {'$gen_cast', Msg} ->
                % Expected value goes first so that a failure report
                % labels expected and actual correctly.
                ?assertEqual({resume_scan, LocalDb}, Msg)
        after 0 ->
            erlang:error(resume_scan_cast_not_received)
        end
    end).
% scan_local_db/2 must not cast resume_scan when the db named by the suffix
% cannot be opened. The scan targets <<"some_other_db">>, so the check is
% that no resume_scan cast arrives at all; comparing against LocalDb could
% never fail.
t_fail_local() ->
    ?_test(begin
        LocalDb = ?tempdb(),
        {ok, Db} = couch_db:create(LocalDb, [?CTX]),
        ok = couch_db:close(Db),
        scan_local_db(self(), <<"some_other_db">>),
        receive
            {'$gen_cast', Msg} ->
                ?assertNotMatch({resume_scan, _}, Msg)
        after 0 ->
            ok
        end
    end).
% Test helper functions

% Silence couch_log: the two-argument error, notice, info and debug calls
% all return ok.
mock_logs() ->
    lists:foreach(
        fun(Level) -> meck:expect(couch_log, Level, 2, ok) end,
        [error, notice, info, debug]
    ).
% Create the callback module mock; every callback returns the context it
% was given.
mock_callback_mod() ->
    meck:new(?MOD, [non_strict]),
    KeepCtx = fun(_DbName, Ctx) -> Ctx end,
    lists:foreach(
        fun(Callback) -> meck:expect(?MOD, Callback, KeepCtx) end,
        [db_created, db_deleted, db_found]
    ),
    meck:expect(?MOD, db_change, fun(_DbName, _Change, Ctx) -> Ctx end).
% Stand-in for a running changes feed: blocks until it receives `die`, then
% exits with the {Server, DbName} it was started with so the test can
% inspect them.
mock_changes_reader_loop({_Callback, {_Server, _DbName} = ExitArgs}) ->
    receive
        die ->
            exit(ExitArgs)
    end.
% Tell a mock reader to die and return the {Server, DbName} it exits with.
% Raises spawn_change_reader_timeout if no 'DOWN' arrives within a second.
kill_mock_changes_reader_and_get_its_args(Pid) ->
    MonRef = monitor(process, Pid),
    unlink(Pid),
    Pid ! die,
    receive
        {'DOWN', MonRef, _, Pid, {_Server, _DbName} = Args} ->
            Args
    after 1000 ->
        erlang:error(spawn_change_reader_timeout)
    end.
% Make couch_changes:handle_db_changes/3 (for the mock db `db`) return the
% blocking mock reader loop instead of a real changes feed.
mock_changes_reader() ->
    ReaderLoop = fun mock_changes_reader_loop/1,
    meck:expect(
        couch_changes,
        handle_db_changes,
        fun(_ChArgs, _Req, db) -> ReaderLoop end
    ).
% Public set table standing in for the server's checkpoint table.
mock_ets() ->
    TableOpts = [set, public],
    ets:new(multidb_test_ets, TableOpts).
% Server state for tests: mocked callback module with context `zig`,
% placeholder atoms for the event server reference and the scanner pid,
% and no running change feeds.
mock_state() ->
    Callbacks = #state{mod = ?MOD, ctx = zig},
    Callbacks#state{
        suffix = ?SUFFIX,
        event_server = esref,
        scanner = spid,
        pids = []
    }.
% mock_state/0 plus the given ETS table.
mock_state(Ets) ->
    (mock_state())#state{tid = Ets}.
% mock_state/1 plus Pid registered as the change feed reader for ?DBNAME.
mock_state(Ets, Pid) ->
    (mock_state(Ets))#state{pids = [{?DBNAME, Pid}]}.
% Build a changes feed row (EJSON) for document Id at seq 1 with a single
% revision <<"1-f00">> and the doc body included.
change_row(Id) when is_binary(Id) ->
    Rev = <<"1-f00">>,
    Revs = [{[{<<"rev">>, Rev}]}],
    Doc = {[{<<"_id">>, Id}, {<<"_rev">>, Rev}]},
    {[
        {<<"seq">>, 1},
        {<<"id">>, Id},
        {<<"changes">>, Revs},
        {doc, Doc}
    ]}.
% Run handle_call/3 and assert that it replies ok.
handle_call_ok(Msg, State) ->
    Result = handle_call(Msg, from, State),
    ?assertMatch({reply, ok, _}, Result).
% Run handle_info/2 and assert that the server keeps running.
handle_info_check(Msg, State) ->
    Result = handle_info(Msg, State),
    ?assertMatch({noreply, _}, Result).
-endif.