% blob: dd13b45b25548d97b2fba562e6c41d8be2238012 [file] [log] [blame]
% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
% http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.
-module(couch_views_active_tasks_test).
-include_lib("couch/include/couch_eunit.hrl").
-include_lib("couch/include/couch_db.hrl").
-include_lib("couch_views/include/couch_views.hrl").
-include_lib("fabric/test/fabric2_test.hrl").
-define(MAP_FUN1, <<"map_fun1">>).
-define(MAP_FUN2, <<"map_fun2">>).
-define(INDEX_FOO, <<"_design/foo">>).
-define(INDEX_BAR, <<"_design/bar">>).
-define(TOTAL_DOCS, 1000).
% Suite-level fixture: starts the applications listed below through test_util
% and returns the resulting context, which the fixture later passes to cleanup/1.
setup() ->
    Apps = [fabric, couch_jobs, couch_js, couch_views],
    test_util:start_couch(Apps).
% Suite-level teardown: hands the context returned by setup/0 back to
% test_util:stop_couch/1.
cleanup(StartCtx) ->
    test_util:stop_couch(StartCtx).
% Per-test fixture: creates a temporary database as the admin user, writes the
% "foo" design doc together with ?TOTAL_DOCS documents, and mocks
% couch_views_batch:success/2 so that it always returns 100.
% Returns {Db, DDoc}, which is passed to each test and to foreach_teardown/1.
foreach_setup() ->
    DbName = ?tempdb(),
    {ok, Db} = fabric2_db:create(DbName, [{user_ctx, ?ADMIN_USER}]),
    DDoc = create_ddoc(?INDEX_FOO, ?MAP_FUN1),
    AllDocs = [DDoc | make_docs(?TOTAL_DOCS)],
    fabric2_db:update_docs(Db, AllDocs),
    meck:new(couch_views_batch, [passthrough]),
    meck:expect(couch_views_batch, success, fun(_, _) -> 100 end),
    {Db, DDoc}.
% Per-test teardown: unloads every meck mock, runs couch_views:cleanup_indices/2
% with an empty list inside a transaction, then deletes the test database.
foreach_teardown({Db, _DDoc}) ->
    meck:unload(),
    CleanupFun = fun(TxDb) -> couch_views:cleanup_indices(TxDb, []) end,
    fabric2_fdb:transactional(Db, CleanupFun),
    DbName = fabric2_db:name(Db),
    ok = fabric2_db:delete(DbName, []).
% EUnit generator: one suite-wide setup/cleanup wrapping a per-test
% foreach fixture that runs both active-task test cases.
active_tasks_test_() ->
    Cases = [
        ?TDEF_FE(verify_basic_active_tasks),
        ?TDEF_FE(verify_muliple_active_tasks)
    ],
    PerTest = {foreach, fun foreach_setup/0, fun foreach_teardown/1, Cases},
    Suite = {setup, fun setup/0, fun cleanup/1, PerTest},
    {"Active Tasks test", Suite}.
% Builds the "foo" index while the indexer is paused at ?TOTAL_DOCS changes and
% checks that the single reported active task describes that indexer: type,
% database, design document, node, pid and changes_done.
%
% Takes the {Db, DDoc} pair produced by foreach_setup/0. Fails with an EUnit
% assertion error or a badmatch when the reported task does not look as
% expected, and with timeout_in_pause_indexer_for_changes when the indexer
% does not report within 10 seconds.
verify_basic_active_tasks({Db, DDoc}) ->
    pause_indexer_for_changes(self()),
    couch_views:build_indices(Db, [DDoc]),
    {IndexerPid, {changes_done, ChangesDone}} = wait_to_reach_changes(10000),
    try
        [ActiveTask] = fabric2_active_tasks:get_active_tasks(),
        % All of these keys are required; a missing one fails the match.
        #{
            <<"changes_done">> := ChangesDone1,
            <<"type">> := Type,
            <<"database">> := DbName,
            <<"design_document">> := DDocId,
            <<"node">> := Node,
            <<"pid">> := PidBin
        } = ActiveTask,
        Pid = erlang:list_to_pid(binary_to_list(PidBin)),
        ?assertEqual(<<"indexer">>, Type),
        ?assertEqual(fabric2_db:name(Db), DbName),
        ?assertEqual(?INDEX_FOO, DDocId),
        ?assertEqual(atom_to_binary(node(), utf8), Node),
        ?assert(is_pid(Pid)),
        ?assert(is_process_alive(Pid)),
        ?assertEqual(IndexerPid, Pid),
        % We assume the indexer has run for a bit, so the reported count has
        % to be greater than 0.
        ?assert(ChangesDone1 > 0),
        ?assert(ChangesDone1 =< ChangesDone),
        ?assertEqual(?TOTAL_DOCS, ChangesDone)
    after
        % Always release the paused indexer, even when a check above fails;
        % otherwise it stays blocked waiting for `continue`.
        IndexerPid ! continue
    end.
% Adds a second design doc ("bar"), builds both indexes with the indexers
% paused at ?TOTAL_DOCS changes, and checks that two active tasks are reported
% and that both indexers reached ?TOTAL_DOCS changes.
%
% Takes the {Db, DDoc} pair produced by foreach_setup/0. Fails with an EUnit
% assertion error when the counts differ, and with
% timeout_in_pause_indexer_for_changes when an indexer does not report within
% 10 seconds.
verify_muliple_active_tasks({Db, DDoc}) ->
    DDoc2 = create_ddoc(?INDEX_BAR, ?MAP_FUN2),
    % Fail right here if the second design doc could not be stored.
    {ok, _} = fabric2_db:update_doc(Db, DDoc2, []),
    pause_indexer_for_changes(self()),
    couch_views:build_indices(Db, [DDoc, DDoc2]),
    {IndexerPid, {changes_done, ChangesDone}} = wait_to_reach_changes(10000),
    {IndexerPid2, {changes_done, ChangesDone2}} = wait_to_reach_changes(10000),
    try
        ActiveTasks = fabric2_active_tasks:get_active_tasks(),
        ?assertEqual(2, length(ActiveTasks)),
        ?assertEqual(?TOTAL_DOCS, ChangesDone),
        ?assertEqual(?TOTAL_DOCS, ChangesDone2)
    after
        % Always release both paused indexers, even when a check above fails;
        % otherwise they stay blocked waiting for `continue`.
        IndexerPid ! continue,
        IndexerPid2 ! continue
    end.
% Returns the result of couch_doc:from_json_obj/1 for a design document with
% id DDocId that holds a single view named IndexName. The view's map function
% emits doc.val as both key and value.
create_ddoc(DDocId, IndexName) ->
    MapSrc = <<"function(doc) {emit(doc.val, doc.val);}">>,
    View = {[{<<"map">>, MapSrc}]},
    Views = {[{IndexName, View}]},
    couch_doc:from_json_obj({[
        {<<"_id">>, DDocId},
        {<<"views">>, Views}
    ]}).
% Returns the result of couch_doc:from_json_obj/1 for a document whose "_id" is
% the decimal text of the integer Id and whose "val" field is Val.
doc(Id, Val) ->
    Props = [
        {<<"_id">>, integer_to_binary(Id)},
        {<<"val">>, Val}
    ],
    couch_doc:from_json_obj({Props}).
% Builds Count documents with ids 1..Count. Every document gets Count (not its
% own id) as its "val" field.
make_docs(Count) ->
    lists:map(fun(Id) -> doc(Id, Count) end, lists:seq(1, Count)).
% Mocks couch_views_util:active_tasks_info/5 so that a call made with
% ChangesDone equal to ?TOTAL_DOCS first sends {CallerPid, {changes_done, N}}
% to ParentPid and then blocks until the calling process receives `continue`.
% Every call, paused or not, is finally forwarded to the real function.
pause_indexer_for_changes(ParentPid) ->
    meck:new(couch_views_util, [passthrough]),
    Intercept = fun(ChangesDone, DbName, DDocId, LastSeq, DBSeq) ->
        notify_and_block(ParentPid, ChangesDone),
        meck:passthrough([ChangesDone, DbName, DDocId, LastSeq, DBSeq])
    end,
    meck:expect(couch_views_util, active_tasks_info, Intercept).

% Reports to ParentPid and waits for `continue` when exactly ?TOTAL_DOCS
% changes are done; does nothing for any other count.
notify_and_block(ParentPid, ?TOTAL_DOCS = ChangesDone) ->
    ParentPid ! {self(), {changes_done, ChangesDone}},
    receive
        continue -> ok
    end;
notify_and_block(_ParentPid, _ChangesDone) ->
    ok.
% Waits up to Timeout milliseconds for a {Pid, {changes_done, N}} message from
% a process and returns that message unchanged. Raises
% error(timeout_in_pause_indexer_for_changes) when none arrives in time.
wait_to_reach_changes(Timeout) ->
    receive
        {Pid, {changes_done, _ChangesDone}} = Report when is_pid(Pid) ->
            Report
    after Timeout ->
        error(timeout_in_pause_indexer_for_changes)
    end.