blob: 72f7540f74b4dad038ec47f680b71b0d192485b3 [file] [log] [blame]
% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
% http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.
-module(couch_jobs_tests).
-include_lib("couch/include/couch_db.hrl").
-include_lib("couch/include/couch_eunit.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("fabric/test/fabric2_test.hrl").
% The job creation API accepts `undefined` as the Tx object; in that case it
% starts its own transaction.
-define(TX, undefined).
% EUnit generator for the core couch_jobs test cases. setup_couch/0 and
% teardown_couch/1 wrap the whole group, while setup/0 and teardown/1 wrap
% every individual test case.
couch_jobs_basic_test_() ->
    Tests = [
        ?TDEF_FE(add_remove_pending),
        ?TDEF_FE(add_remove_errors),
        ?TDEF_FE(add_with_the_same_scheduled_time),
        ?TDEF_FE(get_job_data_and_state),
        ?TDEF_FE(resubmit_as_job_creator),
        ?TDEF_FE(type_timeouts_and_server, 15),
        ?TDEF_FE(dead_notifier_restarts_jobs_server),
        ?TDEF_FE(bad_cast_restarts_couch_jobs_server),
        ?TDEF_FE(bad_call_restarts_couch_jobs_server),
        ?TDEF_FE(bad_info_restarts_couch_jobs_server),
        ?TDEF_FE(bad_cast_restarts_notifier),
        ?TDEF_FE(bad_call_restarts_notifier),
        ?TDEF_FE(bad_info_restarts_notifier),
        ?TDEF_FE(bad_cast_restarts_activity_monitor),
        ?TDEF_FE(bad_call_restarts_activity_monitor),
        ?TDEF_FE(bad_info_restarts_activity_monitor),
        ?TDEF_FE(basic_accept_and_finish),
        ?TDEF_FE(accept_blocking),
        ?TDEF_FE(job_processor_update),
        ?TDEF_FE(resubmit_enqueues_job),
        ?TDEF_FE(resubmit_finished_updates_job_data),
        ?TDEF_FE(resubmit_running_does_not_update_job_data),
        ?TDEF_FE(resubmit_custom_schedtime),
        ?TDEF_FE(add_pending_updates_job_data),
        ?TDEF_FE(add_finished_updates_job_data),
        ?TDEF_FE(add_running_does_not_update_job_data),
        ?TDEF_FE(accept_max_schedtime),
        ?TDEF_FE(accept_no_schedule),
        ?TDEF_FE(subscribe),
        ?TDEF_FE(remove_when_subscribed_and_pending),
        ?TDEF_FE(remove_when_subscribed_and_running),
        ?TDEF_FE(subscribe_wait_multiple),
        ?TDEF_FE(enqueue_inactive, 15),
        ?TDEF_FE(remove_running_job),
        ?TDEF_FE(check_get_jobs),
        ?TDEF_FE(use_fabric_transaction_object),
        ?TDEF_FE(metadata_version_bump)
    ],
    PerTest = {foreach, fun setup/0, fun teardown/1, Tests},
    Group = {setup, fun setup_couch/0, fun teardown_couch/1, PerTest},
    {"Test couch jobs basics", Group}.
% EUnit generator that re-runs a subset of the test cases with the batching
% fixture (setup_batch/0 and teardown_batch/1) around each one.
couch_jobs_batching_test_() ->
    Tests = [
        ?TDEF_FE(accept_blocking),
        ?TDEF_FE(resubmit_enqueues_job),
        ?TDEF_FE(accept_max_schedtime),
        ?TDEF_FE(accept_no_schedule),
        ?TDEF_FE(subscribe),
        ?TDEF_FE(remove_when_subscribed_and_pending),
        ?TDEF_FE(remove_when_subscribed_and_running),
        ?TDEF_FE(subscribe_wait_multiple),
        ?TDEF_FE(enqueue_inactive, 15)
    ],
    PerTest = {foreach, fun setup_batch/0, fun teardown_batch/1, Tests},
    Group = {setup, fun setup_couch/0, fun teardown_couch/1, PerTest},
    {"Test couch jobs batching logic", Group}.
% Group-level fixture: installs passthrough mocks and starts the couch
% applications. Returns whatever test_util:start_couch/1 returns.
setup_couch() ->
    MockedMods = [couch_jobs_fdb, couch_jobs_util, fabric2_db_expiration],
    lists:foreach(fun(Mod) -> meck:new(Mod, [passthrough]) end, MockedMods),
    % Because of a circular dependency between `couch_jobs` and `fabric` in
    % the `fabric2_db_expiration` module, disable db expiration so that when
    % `couch_jobs` is stopped in the test, the `fabric` app doesn't get torn
    % down as well and we don't see spurious <<"db_expiration">> jobs show up
    % in test results.
    SwallowTimeout = fun
        (timeout, St) -> {noreply, St};
        (Msg, St) -> meck:passthrough([Msg, St])
    end,
    meck:expect(fabric2_db_expiration, handle_info, SwallowTimeout),
    test_util:start_couch([fabric]).
% Group-level cleanup: unloads every meck mock, then stops the couch
% applications using the context produced by setup_couch/0.
teardown_couch(CouchCtx) ->
    meck:unload(),
    test_util:stop_couch(CouchCtx).
% Per-test fixture: starts the apps, wipes all stored jobs, registers two
% job types with timeouts, and returns the map of values the tests
% destructure.
setup() ->
    application:start(fabric),
    application:start(couch_jobs),
    clear_jobs(),
    % A complex (tuple) term is a valid job type ...
    TupleType = {<<"t1">>, 1024},
    % ... and so is a plain number
    NumType = 42,
    TupleTimeout = 2,
    NumTimeout = 3,
    lists:foreach(
        fun({Type, Timeout}) -> couch_jobs:set_type_timeout(Type, Timeout) end,
        [{TupleType, TupleTimeout}, {NumType, NumTimeout}]
    ),
    #{
        t1 => TupleType,
        t1_timeout => TupleTimeout,
        t2 => NumType,
        j1 => <<"j1">>,
        j2 => <<"j2">>,
        dbname => ?tempdb()
    }.
% Per-test cleanup: stops couch_jobs first, then fabric.
teardown(#{}) ->
    lists:foreach(fun application:stop/1, [couch_jobs, fabric]),
    ok.
% Per-test fixture for the batching suite: runs the regular setup/0 and then
% makes selected couch_jobs_fdb calls fail intermittently and shrinks the
% batching parameters. Returns the context produced by setup/0.
setup_batch() ->
    Ctx = setup(),
    % Builds a meck return spec that loops between raising the given erlfdb
    % error code and passing the call through to the real function.
    FlakyFdb = fun(ErrorCode) ->
        meck:loop([
            meck:raise(error, {erlfdb_error, ErrorCode}),
            meck:passthrough()
        ])
    end,
    % Simulate having too many jobs to fit in a 10Mb transaction
    meck:expect(couch_jobs_fdb, re_enqueue_inactive, 3, FlakyFdb(2101)),
    % Simulate get_inactive_since GRV timing out
    meck:expect(couch_jobs_fdb, get_inactive_since, 4, FlakyFdb(1007)),
    % Simulate get_active_since transaction timing out
    meck:expect(couch_jobs_fdb, get_active_since, 4, FlakyFdb(1031)),
    % Set up batching parameters to test small batches down to size 1
    IncrementSpecs = [
        {[notifier_batch_increment, '_'], 1},
        {[activity_monitor_batch_increment, '_'], 1},
        {2, meck:passthrough()}
    ],
    meck:expect(couch_jobs_util, get_non_neg_int, IncrementSpecs),
    FactorSpecs = [
        {[notifier_batch_factor, '_'], 0.0001},
        {[activity_monitor_batch_factor, '_'], 0.0001},
        {2, meck:passthrough()}
    ],
    meck:expect(couch_jobs_util, get_float_0_1, FactorSpecs),
    Ctx.
% Per-test cleanup for the batching suite: runs the regular teardown/1, resets
% the mocked modules, and restores plain passthrough behaviour for every
% function that setup_batch/0 overrode.
teardown_batch(Ctx) ->
    teardown(Ctx),
    meck:reset(couch_jobs_fdb),
    meck:reset(couch_jobs_util),
    Overridden = [
        {couch_jobs_fdb, re_enqueue_inactive, 3},
        {couch_jobs_fdb, get_active_since, 4},
        {couch_jobs_fdb, get_inactive_since, 4},
        {couch_jobs_util, get_non_neg_int, 2},
        {couch_jobs_util, get_float_0_1, 2}
    ],
    lists:foreach(
        fun({Mod, Fun, Arity}) ->
            meck:expect(Mod, Fun, Arity, meck:passthrough())
        end,
        Overridden
    ),
    ok.
% Clears every key stored under the jobs path, inside a couch_jobs_fdb
% transaction.
clear_jobs() ->
    ClearAll = fun(JTx) ->
        #{jobs_path := JobsPrefix, tx := Tx} = JTx,
        erlfdb:clear_range_startswith(Tx, JobsPrefix)
    end,
    couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(), ClearAll).
% Shorthand for couch_jobs_fdb:get_job/2, used throughout the assertions.
get_job(JobType, Id) ->
    couch_jobs_fdb:get_job(JobType, Id).
% Adds and removes a pending job for each of the two configured job types.
add_remove_pending(#{t1 := TupleType, j1 := Id1, t2 := NumType, j2 := Id2}) ->
    ?assertEqual(ok, couch_jobs:add(?TX, TupleType, Id1, #{})),
    ?assertMatch(#{state := pending, data := #{}}, get_job(TupleType, Id1)),
    ?assertEqual(ok, couch_jobs:remove(?TX, TupleType, Id1)),
    % Non-empty data and a numeric type should work as well. This time the
    % job is added inside an explicit transaction.
    Payload = #{<<"x">> => 42},
    AddInTx = fun(Tx) -> couch_jobs:add(Tx, NumType, Id2, Payload) end,
    ?assertEqual(ok, fabric2_fdb:transactional(AddInTx)),
    ?assertMatch(#{state := pending, data := Payload}, get_job(NumType, Id2)),
    ?assertEqual(ok, couch_jobs:remove(?TX, NumType, Id2)).
% Job data and state are readable while the job exists and both lookups
% return {error, not_found} once it has been removed.
get_job_data_and_state(#{t1 := Type, j1 := Id}) ->
    Payload = #{<<"x">> => 42},
    ok = couch_jobs:add(?TX, Type, Id, Payload),
    ?assertEqual({ok, Payload}, couch_jobs:get_job_data(?TX, Type, Id)),
    ?assertEqual({ok, pending}, couch_jobs:get_job_state(?TX, Type, Id)),
    ?assertEqual(ok, couch_jobs:remove(?TX, Type, Id)),
    Missing = {error, not_found},
    ?assertEqual(Missing, couch_jobs:get_job_data(?TX, Type, Id)),
    ?assertEqual(Missing, couch_jobs:get_job_state(?TX, Type, Id)).
% Error paths of add/remove: unknown job, data that cannot be JSON encoded,
% and a type without a configured timeout. Adding the same job twice is
% expected to succeed both times.
add_remove_errors(#{t1 := Type, j1 := Id}) ->
    ?assertEqual({error, not_found}, couch_jobs:remove(?TX, 999, <<"x">>)),
    % Integer map keys are not valid JSON
    BadData = #{1 => 2},
    ?assertMatch(
        {error, {json_encoding_error, _}},
        couch_jobs:add(?TX, Type, Id, BadData)
    ),
    % No timeout was ever set for type <<"x">>
    ?assertEqual(
        {error, no_type_timeout},
        couch_jobs:add(?TX, <<"x">>, Id, #{})
    ),
    ?assertEqual(ok, couch_jobs:add(?TX, Type, Id, #{})),
    ?assertEqual(ok, couch_jobs:add(?TX, Type, Id, #{})),
    ?assertEqual(ok, couch_jobs:remove(?TX, Type, Id)).
% Re-adding an identical job inside a transaction should succeed and leave
% that transaction read-only.
add_with_the_same_scheduled_time(#{t1 := Type, j1 := Id}) ->
    ?assertEqual(ok, couch_jobs:add(?TX, Type, Id, #{})),
    ReAdd = fun(Tx) ->
        ?assertEqual(ok, couch_jobs:add(Tx, Type, Id, #{})),
        ?assert(erlfdb:is_read_only(Tx))
    end,
    fabric2_fdb:transactional(ReAdd).
% Walks a job through pending -> running -> finished while the creator keeps
% calling add/5 with a new scheduled time, checking state and stime at each
% step.
resubmit_as_job_creator(#{t1 := Type, j1 := Id}) ->
    Payload = #{<<"x">> => 42},
    ok = couch_jobs:add(?TX, Type, Id, Payload, 15),

    % Job was pending, doesn't get resubmitted
    ok = couch_jobs:add(?TX, Type, Id, Payload, 16),
    ?assertMatch(#{state := pending, stime := 16}, get_job(Type, Id)),

    {ok, FirstRun, Payload} = couch_jobs:accept(Type),

    % If it is running, it gets flagged to be resubmitted
    ok = couch_jobs:add(?TX, Type, Id, Payload, 17),
    ?assertMatch(#{state := running, stime := 17}, get_job(Type, Id)),
    ?assertEqual(true, couch_jobs:is_resubmitted(get_job(Type, Id))),

    ?assertEqual(ok, couch_jobs:finish(?TX, FirstRun)),
    % It should be pending according to the resubmit flag
    ?assertMatch(#{state := pending, stime := 17}, get_job(Type, Id)),

    % A finished job will be re-enqueued
    {ok, SecondRun, _} = couch_jobs:accept(Type),
    ?assertEqual(ok, couch_jobs:finish(?TX, SecondRun)),
    ?assertMatch(#{state := finished, stime := 17}, get_job(Type, Id)),
    ok = couch_jobs:add(?TX, Type, Id, Payload, 18),
    ?assertMatch(#{state := pending, stime := 18}, get_job(Type, Id)).
% Checks that the number of activity monitor and notifier children follows
% the set of configured job types as a third type is added and then cleared.
type_timeouts_and_server(#{t1 := Type, t1_timeout := ExpectedTimeout}) ->
    % Polls GetPids via test_util:wait/1 until it returns exactly Count pids
    WaitForCount = fun(GetPids, Count) ->
        test_util:wait(fun() ->
            case length(GetPids()) of
                Count -> ok;
                _ -> wait
            end
        end)
    end,
    MonitorPids = fun couch_jobs_activity_monitor_sup:get_child_pids/0,
    NotifierPids = fun couch_jobs_notifier_sup:get_child_pids/0,

    % Initially the two types created in setup/0 are present
    couch_jobs_server:force_check_types(),
    ?assertEqual(ExpectedTimeout, couch_jobs:get_type_timeout(Type)),

    WaitForCount(MonitorPids, 2),
    ?assertEqual(2, length(MonitorPids())),
    WaitForCount(NotifierPids, 2),
    ?assertEqual(2, length(NotifierPids())),
    ?assertMatch({ok, _}, couch_jobs_server:get_notifier_server(Type)),

    % Adding a third type should add one child to each supervisor
    ?assertEqual(ok, couch_jobs:set_type_timeout(<<"t3">>, 8)),
    couch_jobs_server:force_check_types(),

    WaitForCount(MonitorPids, 3),
    ?assertEqual(3, length(MonitorPids())),
    WaitForCount(NotifierPids, 3),
    ?assertEqual(3, length(NotifierPids())),

    % Clearing it should bring the counts back down to two
    ?assertEqual(ok, couch_jobs:clear_type_timeout(<<"t3">>)),
    couch_jobs_server:force_check_types(),

    WaitForCount(MonitorPids, 2),
    ?assertEqual(2, length(MonitorPids())),
    WaitForCount(NotifierPids, 2),
    ?assertEqual(2, length(NotifierPids())),

    ?assertMatch(
        {error, _},
        couch_jobs_server:get_notifier_server(<<"t3">>)
    ),
    ?assertEqual(not_found, couch_jobs:get_type_timeout(<<"t3">>)).
% Kills one of the two notifier processes and waits for couch_jobs_server to
% go down as a consequence. The receive has no `after` clause, so it blocks
% until the 'DOWN' message arrives.
dead_notifier_restarts_jobs_server(#{}) ->
    couch_jobs_server:force_check_types(),

    Server = whereis(couch_jobs_server),
    MonRef = erlang:monitor(process, Server),

    [Victim, _Survivor] = couch_jobs_notifier_sup:get_child_pids(),
    exit(Victim, kill),

    % Killing a notifier should kill the server as well
    receive
        {'DOWN', MonRef, _, _, _} -> ok
    end.
% Sends an unexpected cast to couch_jobs_server and waits for the process to
% exit. Blocks until the monitor's 'DOWN' message arrives.
bad_cast_restarts_couch_jobs_server(#{}) ->
    Server = whereis(couch_jobs_server),
    MonRef = erlang:monitor(process, Server),
    gen_server:cast(Server, bad_cast),
    receive
        {'DOWN', MonRef, _, _, _} -> ok
    end.
% Sends an unexpected call to couch_jobs_server and waits for the process to
% exit. Blocks until the monitor's 'DOWN' message arrives.
bad_call_restarts_couch_jobs_server(#{}) ->
    Server = whereis(couch_jobs_server),
    MonRef = erlang:monitor(process, Server),
    % The outcome of the call itself is irrelevant here: whether it returns
    % or raises, only the server going down is checked.
    catch gen_server:call(Server, bad_call),
    receive
        {'DOWN', MonRef, _, _, _} -> ok
    end.
% Sends an unexpected raw message to couch_jobs_server and waits for the
% process to exit. Blocks until the monitor's 'DOWN' message arrives.
bad_info_restarts_couch_jobs_server(#{}) ->
    Server = whereis(couch_jobs_server),
    MonRef = erlang:monitor(process, Server),
    erlang:send(Server, a_random_message),
    receive
        {'DOWN', MonRef, _, _, _} -> ok
    end.
% Sends an unexpected cast to the first notifier process and waits for it to
% exit. Blocks until the monitor's 'DOWN' message arrives.
bad_cast_restarts_notifier(#{}) ->
    couch_jobs_server:force_check_types(),

    [Notifier, _] = couch_jobs_notifier_sup:get_child_pids(),
    MonRef = erlang:monitor(process, Notifier),
    gen_server:cast(Notifier, bad_cast),
    receive
        {'DOWN', MonRef, _, _, _} -> ok
    end.
% Sends an unexpected call to the first notifier process and waits for it to
% exit. Blocks until the monitor's 'DOWN' message arrives.
bad_call_restarts_notifier(#{}) ->
    couch_jobs_server:force_check_types(),

    [Notifier, _] = couch_jobs_notifier_sup:get_child_pids(),
    MonRef = erlang:monitor(process, Notifier),
    % The outcome of the call itself is irrelevant; only the exit is checked
    catch gen_server:call(Notifier, bad_call),
    receive
        {'DOWN', MonRef, _, _, _} -> ok
    end.
% Sends an unexpected raw message to the first notifier process and waits for
% it to exit. Blocks until the monitor's 'DOWN' message arrives.
bad_info_restarts_notifier(#{}) ->
    couch_jobs_server:force_check_types(),

    [Notifier, _] = couch_jobs_notifier_sup:get_child_pids(),
    MonRef = erlang:monitor(process, Notifier),
    erlang:send(Notifier, a_bad_message),
    receive
        {'DOWN', MonRef, _, _, _} -> ok
    end.
% Sends an unexpected cast to the first activity monitor process and waits
% for it to exit. Blocks until the monitor's 'DOWN' message arrives.
bad_cast_restarts_activity_monitor(#{}) ->
    couch_jobs_server:force_check_types(),

    [ActivityMon, _] = couch_jobs_activity_monitor_sup:get_child_pids(),
    MonRef = erlang:monitor(process, ActivityMon),
    gen_server:cast(ActivityMon, bad_cast),
    receive
        {'DOWN', MonRef, _, _, _} -> ok
    end.
% Sends an unexpected call to the first activity monitor process and waits
% for it to exit. Blocks until the monitor's 'DOWN' message arrives.
bad_call_restarts_activity_monitor(#{}) ->
    couch_jobs_server:force_check_types(),

    [ActivityMon, _] = couch_jobs_activity_monitor_sup:get_child_pids(),
    MonRef = erlang:monitor(process, ActivityMon),
    % The outcome of the call itself is irrelevant; only the exit is checked
    catch gen_server:call(ActivityMon, bad_call),
    receive
        {'DOWN', MonRef, _, _, _} -> ok
    end.
% Sends an unexpected raw message to the first activity monitor process and
% waits for it to exit. Blocks until the monitor's 'DOWN' message arrives.
bad_info_restarts_activity_monitor(#{}) ->
    couch_jobs_server:force_check_types(),

    % A bad info message kills the activity monitor
    [ActivityMon, _] = couch_jobs_activity_monitor_sup:get_child_pids(),
    MonRef = erlang:monitor(process, ActivityMon),
    erlang:send(ActivityMon, a_bad_message),
    receive
        {'DOWN', MonRef, _, _, _} -> ok
    end.
% Accepts a job and finishes it: first with data that cannot be JSON encoded
% (rejected), then with valid data.
basic_accept_and_finish(#{t1 := Type, j1 := Id}) ->
    ok = couch_jobs:add(?TX, Type, Id, #{}),
    {ok, Job, #{}} = couch_jobs:accept(Type),
    ?assertMatch(#{state := running}, get_job(Type, Id)),

    % Runs couch_jobs:finish/3 for this job inside its own transaction
    FinishInTx = fun(Result) ->
        fabric2_fdb:transactional(fun(Tx) ->
            couch_jobs:finish(Tx, Job, Result)
        end)
    end,

    % check json validation for bad data in finish
    ?assertMatch({error, {json_encoding_error, _}}, FinishInTx(#{1 => 1})),

    Payload = #{<<"x">> => 42},
    ?assertEqual(ok, FinishInTx(Payload)),
    ?assertMatch(#{state := finished, data := Payload}, get_job(Type, Id)).
% couch_jobs:accept/1 is run in helper processes that exit with its result.
% The test checks that an acceptor only completes once a job has been added.
accept_blocking(#{t1 := Type, j1 := Id1, j2 := Id2}) ->
    % The accept result is delivered as the exit reason of the process
    Acceptor = fun() -> exit(couch_jobs:accept(Type)) end,
    % Returns the acceptor's exit reason, or `timeout` after 500 ms
    AwaitResult = fun(MonRef) ->
        receive
            {'DOWN', MonRef, _, _, Reason} -> Reason
        after 500 ->
            timeout
        end
    end,

    % Acceptor started before the job exists
    {_, FirstRef} = spawn_monitor(Acceptor),
    ok = couch_jobs:add(?TX, Type, Id1, #{}),
    ?assertMatch({ok, #{id := Id1}, #{}}, AwaitResult(FirstRef)),

    % With no pending job the second acceptor must still be waiting ...
    {_, SecondRef} = spawn_monitor(Acceptor),
    ?assertEqual(timeout, AwaitResult(SecondRef)),
    % ... until another job is added
    ok = couch_jobs:add(?TX, Type, Id2, #{}),
    ?assertMatch({ok, #{id := Id2}, #{}}, AwaitResult(SecondRef)).
% Exercises couch_jobs:update/2,3 from the job processor side and checks the
% stored data after each step, ending with a finish/3 that also sets data.
job_processor_update(#{t1 := Type, j1 := Id}) ->
    ok = couch_jobs:add(?TX, Type, Id, #{}),
    {ok, Job, #{}} = couch_jobs:accept(Type),

    % Use proper transactions in a few places here instead of passing in
    % ?TX. This is mostly to increase code coverage.
    UpdateInTx = fun(NewData) ->
        fabric2_fdb:transactional(fun(Tx) ->
            couch_jobs:update(Tx, Job, NewData)
        end)
    end,

    ?assertMatch({ok, #{job := true}}, UpdateInTx(#{<<"x">> => 1})),
    ?assertMatch(
        #{data := #{<<"x">> := 1}, state := running},
        get_job(Type, Id)
    ),

    % An update without data leaves the stored data as it was
    TouchInTx = fun(Tx) -> couch_jobs:update(Tx, Job) end,
    ?assertMatch({ok, #{job := true}}, fabric2_fdb:transactional(TouchInTx)),
    ?assertMatch(
        #{data := #{<<"x">> := 1}, state := running},
        get_job(Type, Id)
    ),

    ?assertMatch({ok, #{job := true}}, UpdateInTx(#{<<"x">> => 2})),

    % check json validation for bad data in update
    ?assertMatch({error, {json_encoding_error, _}}, UpdateInTx(#{1 => 1})),
    ?assertMatch(
        #{data := #{<<"x">> := 2}, state := running},
        get_job(Type, Id)
    ),

    % Finish may update the data as well
    ?assertEqual(ok, couch_jobs:finish(?TX, Job, #{<<"x">> => 3})),
    ?assertMatch(
        #{data := #{<<"x">> := 3}, state := finished},
        get_job(Type, Id)
    ).
% A job resubmitted while running goes back to pending (with the requested
% stime) once it is finished, and can then be accepted and finished again.
resubmit_enqueues_job(#{t1 := Type, j1 := Id}) ->
    ok = couch_jobs:add(?TX, Type, Id, #{}),

    {ok, FirstRun, #{}} = couch_jobs:accept(Type),
    ?assertMatch({ok, _}, couch_jobs:resubmit(?TX, FirstRun, 6)),
    ?assertEqual(ok, couch_jobs:finish(?TX, FirstRun)),
    ?assertMatch(#{state := pending, stime := 6}, get_job(Type, Id)),

    {ok, SecondRun, #{}} = couch_jobs:accept(Type),
    ?assertEqual(ok, couch_jobs:finish(?TX, SecondRun)),
    ?assertMatch(#{state := finished}, get_job(Type, Id)).
% Resubmitting an already finished job with new data replaces the job data:
% the next accept returns the new data.
resubmit_finished_updates_job_data(#{t1 := Type, j1 := Id}) ->
    OldData = #{<<"test">> => 1},
    NewData = #{<<"test">> => 2},
    ok = couch_jobs:add(?TX, Type, Id, OldData),
    {ok, Job, #{}} = couch_jobs:accept(Type),
    ?assertEqual(ok, couch_jobs:finish(?TX, Job)),
    ?assertMatch({ok, _}, couch_jobs:resubmit(?TX, Job, 6, NewData)),
    ?assertMatch({ok, _, NewData}, couch_jobs:accept(Type)).
% Resubmitting a job with new data while it is still running does not replace
% the job data: the next accept still returns the original data.
resubmit_running_does_not_update_job_data(#{t1 := Type, j1 := Id}) ->
    OldData = #{<<"test">> => 1},
    NewData = #{<<"test">> => 2},
    ok = couch_jobs:add(?TX, Type, Id, OldData),
    {ok, Job, #{}} = couch_jobs:accept(Type),
    ?assertMatch({ok, _}, couch_jobs:resubmit(?TX, Job, 6, NewData)),
    ?assertEqual(ok, couch_jobs:finish(?TX, Job)),
    ?assertMatch({ok, _, OldData}, couch_jobs:accept(Type)).
% The scheduled time given to resubmit/3 is the one the job carries once it
% is pending again.
resubmit_custom_schedtime(#{t1 := Type, j1 := Id}) ->
    ?assertEqual(ok, couch_jobs:add(?TX, Type, Id, #{}, 7)),
    {ok, Running, #{}} = couch_jobs:accept(Type),
    ?assertMatch({ok, _}, couch_jobs:resubmit(?TX, Running, 9)),
    ?assertEqual(ok, couch_jobs:finish(?TX, Running)),
    ?assertMatch(#{stime := 9, state := pending}, get_job(Type, Id)).
% Adding a job that is still pending replaces its data: accept returns the
% data from the second add.
add_pending_updates_job_data(#{t1 := Type, j1 := Id}) ->
    OldData = #{<<"test">> => 1},
    NewData = #{<<"test">> => 2},
    ok = couch_jobs:add(?TX, Type, Id, OldData),
    ?assertEqual(ok, couch_jobs:add(?TX, Type, Id, NewData, 6)),
    ?assertMatch({ok, _, NewData}, couch_jobs:accept(Type)).
% Adding a job that has already finished replaces its data: the next accept
% returns the data from the second add.
add_finished_updates_job_data(#{t1 := Type, j1 := Id}) ->
    OldData = #{<<"test">> => 1},
    NewData = #{<<"test">> => 2},
    ok = couch_jobs:add(?TX, Type, Id, OldData),
    {ok, Job, #{}} = couch_jobs:accept(Type),
    ?assertEqual(ok, couch_jobs:finish(?TX, Job)),
    ?assertEqual(ok, couch_jobs:add(?TX, Type, Id, NewData, 6)),
    ?assertMatch({ok, _, NewData}, couch_jobs:accept(Type)).
% Adding a job that is currently running does not replace its data: after it
% finishes, the next accept still returns the original data.
add_running_does_not_update_job_data(#{t1 := Type, j1 := Id}) ->
    OldData = #{<<"test">> => 1},
    NewData = #{<<"test">> => 2},
    ok = couch_jobs:add(?TX, Type, Id, OldData),
    {ok, Job, #{}} = couch_jobs:accept(Type),
    ?assertEqual(ok, couch_jobs:add(?TX, Type, Id, NewData, 6)),
    ?assertEqual(ok, couch_jobs:finish(?TX, Job)),
    ?assertMatch({ok, _, OldData}, couch_jobs:accept(Type)).
% accept/2 with max_sched_time only hands out jobs scheduled at or before
% the given time.
accept_max_schedtime(#{t1 := Type, j1 := Id1, j2 := Id2}) ->
    ok = couch_jobs:add(?TX, Type, Id1, #{}, 5000),
    ok = couch_jobs:add(?TX, Type, Id2, #{}, 3000),

    AcceptUpTo = fun(MaxSchedTime) ->
        couch_jobs:accept(Type, #{max_sched_time => MaxSchedTime})
    end,

    % Both jobs are scheduled later than 1000
    ?assertEqual({error, not_found}, AcceptUpTo(1000)),
    % Only the job scheduled at 3000 qualifies
    ?assertMatch({ok, #{id := Id2}, _}, AcceptUpTo(3000)),
    % The remaining job is scheduled at 5000
    ?assertMatch({ok, #{id := Id1}, _}, AcceptUpTo(9000)).
% Adds a batch of jobs with random ids and accepts them all with the
% `no_schedule` option, checking that every added job is accepted exactly
% once. Combining `no_schedule` with `max_sched_time` must be rejected.
accept_no_schedule(#{t1 := T}) ->
    JobCount = 25,
    Jobs = [fabric2_util:uuid() || _ <- lists:seq(1, JobCount)],
    % Assert every add so a failed insert is reported here, rather than
    % surfacing later as a mismatch between added and accepted jobs.
    lists:foreach(
        fun(J) -> ?assertEqual(ok, couch_jobs:add(?TX, T, J, #{})) end,
        Jobs
    ),
    InvalidOpts = #{no_schedule => true, max_sched_time => 1},
    ?assertMatch({error, _}, couch_jobs:accept(T, InvalidOpts)),
    AcceptOpts = #{no_schedule => true},
    Accepted = [
        begin
            {ok, #{id := J}, _} = couch_jobs:accept(T, AcceptOpts),
            J
        end
     || _ <- lists:seq(1, JobCount)
    ],
    % Acceptance order is not asserted, only the set of ids
    ?assertEqual(lists:sort(Jobs), lists:sort(Accepted)).
% End-to-end subscription test: subscribe/unsubscribe on a pending job, then
% follow the job through running updates and finish via couch_jobs:wait.
subscribe(#{t1 := Type, j1 := Id}) ->
    ok = couch_jobs:add(?TX, Type, Id, #{<<"z">> => 1}),

    % Unknown type or unknown job id
    ?assertEqual({error, not_found}, couch_jobs:subscribe(<<"xyz">>, Id)),
    ?assertEqual({error, not_found}, couch_jobs:subscribe(Type, <<"j5">>)),

    FirstRes = couch_jobs:subscribe(Type, Id),
    ?assertMatch({ok, {_, _}, pending, #{<<"z">> := 1}}, FirstRes),
    {ok, FirstSubId, pending, _} = FirstRes,

    % Subscribing again gives back the very same result
    SecondRes = couch_jobs:subscribe(Type, Id),
    ?assertEqual(FirstRes, SecondRes),

    ?assertEqual(ok, couch_jobs:unsubscribe(FirstSubId)),

    SubRes = couch_jobs:subscribe(Type, Id),
    ?assertMatch({ok, {_, _}, pending, #{<<"z">> := 1}}, SubRes),
    {ok, SubId, pending, _} = SubRes,

    {ok, Job, _} = couch_jobs:accept(Type),
    ?assertMatch(
        {Type, Id, running, #{<<"z">> := 1}},
        couch_jobs:wait(SubId, 5000)
    ),

    % Make sure we get intermediate `running` updates
    ?assertMatch({ok, _}, couch_jobs:update(?TX, Job, #{<<"z">> => 2})),
    ?assertMatch(
        {Type, Id, running, #{<<"z">> := 2}},
        couch_jobs:wait(SubId, 5000)
    ),

    ?assertEqual(ok, couch_jobs:finish(?TX, Job, #{<<"z">> => 3})),
    ?assertMatch(
        {Type, Id, finished, #{<<"z">> := 3}},
        couch_jobs:wait(SubId, finished, 5000)
    ),

    % Nothing more is delivered after the finished notification
    ?assertEqual(timeout, couch_jobs:wait(SubId, 50)),

    % Subscribing to a finished job returns a result without a subscription id
    ?assertEqual(
        {ok, finished, #{<<"z">> => 3}},
        couch_jobs:subscribe(Type, Id)
    ),

    ?assertEqual(ok, couch_jobs:remove(?TX, Type, Id)),
    ?assertEqual({error, not_found}, couch_jobs:subscribe(Type, Id)).
% Removing a pending job notifies its subscriber with a single `not_found`
% update, after which nothing more is delivered.
remove_when_subscribed_and_pending(#{t1 := T, j1 := J}) ->
    ok = couch_jobs:add(?TX, T, J, #{<<"x">> => 1}),
    {ok, SId, pending, _} = couch_jobs:subscribe(T, J),

    % Assert the removal itself so that a failed remove is reported directly
    % instead of as a wait timeout below.
    ?assertEqual(ok, couch_jobs:remove(?TX, T, J)),

    ?assertMatch({T, J, not_found, not_found}, couch_jobs:wait(SId, 5000)),
    ?assertEqual(timeout, couch_jobs:wait(SId, 50)).
% Removing a running job notifies its subscriber with a single `not_found`
% update (after the earlier `running` update), and nothing more afterwards.
remove_when_subscribed_and_running(#{t1 := T, j1 := J}) ->
    ok = couch_jobs:add(?TX, T, J, #{<<"z">> => 2}),
    {ok, SId, pending, _} = couch_jobs:subscribe(T, J),

    {ok, #{}, _} = couch_jobs:accept(T),
    ?assertMatch({_, _, running, _}, couch_jobs:wait(SId, 5000)),

    % Assert the removal itself so that a failed remove is reported directly
    % instead of as a wait timeout below.
    ?assertEqual(ok, couch_jobs:remove(?TX, T, J)),

    ?assertMatch({T, J, not_found, not_found}, couch_jobs:wait(SId, 5000)),
    ?assertEqual(timeout, couch_jobs:wait(SId, 50)).
% Waits on two subscriptions at once and checks that each state change of
% either job produces exactly one notification.
subscribe_wait_multiple(#{t1 := Type, j1 := Id1, j2 := Id2}) ->
    ok = couch_jobs:add(?TX, Type, Id1, #{}),
    ok = couch_jobs:add(?TX, Type, Id2, #{}),

    {ok, Sub1, pending, #{}} = couch_jobs:subscribe(Type, Id1),
    {ok, Sub2, pending, #{}} = couch_jobs:subscribe(Type, Id2),
    BothSubs = [Sub1, Sub2],

    % Accept one job. Only one running update is expected. AcceptedA and
    % AcceptedB do not necessarily correspond to the first and second job
    % added; they could be accepted in either order.
    {ok, AcceptedA, _} = couch_jobs:accept(Type),
    ?assertMatch({_, _, running, _}, couch_jobs:wait(BothSubs, 5000)),
    ?assertMatch(timeout, couch_jobs:wait(BothSubs, 50)),

    % Accept another job. Expect another update.
    {ok, AcceptedB, _} = couch_jobs:accept(Type),
    ?assertMatch({_, _, running, _}, couch_jobs:wait(BothSubs, 5000)),
    ?assertMatch(timeout, couch_jobs:wait(BothSubs, 50)),

    ?assertMatch({ok, _}, couch_jobs:update(?TX, AcceptedA, #{<<"q">> => 5})),
    ?assertMatch({ok, _}, couch_jobs:update(?TX, AcceptedB, #{<<"r">> => 6})),

    % Each job was updated once, expect two running updates.
    ?assertMatch({_, _, running, _}, couch_jobs:wait(BothSubs, 5000)),
    ?assertMatch({_, _, running, _}, couch_jobs:wait(BothSubs, 5000)),

    % Finish one job. Expect one finished update only.
    ?assertEqual(ok, couch_jobs:finish(?TX, AcceptedA)),
    ?assertMatch(
        {_, _, finished, #{<<"q">> := 5}},
        couch_jobs:wait(BothSubs, finished, 5000)
    ),
    ?assertMatch(timeout, couch_jobs:wait(BothSubs, finished, 50)),

    % Finish another job. However, unsubscribe should flush the
    % message and we should not get it.
    ?assertEqual(ok, couch_jobs:finish(?TX, AcceptedB)),
    ?assertEqual(ok, couch_jobs:unsubscribe(Sub1)),
    ?assertEqual(ok, couch_jobs:unsubscribe(Sub2)),
    ?assertMatch(timeout, couch_jobs:wait(BothSubs, finished, 50)).
% An accepted job that is never updated is expected to go back to pending;
% afterwards the processor that originally accepted it can no longer update
% or finish it.
enqueue_inactive(#{t1 := Type, j1 := Id, t1_timeout := TypeTimeout}) ->
    couch_jobs_server:force_check_types(),

    ok = couch_jobs:add(?TX, Type, Id, #{<<"y">> => 1}),
    {ok, StaleJob, _} = couch_jobs:accept(Type),

    {ok, SubId, running, #{<<"y">> := 1}} = couch_jobs:subscribe(Type, Id),

    % Wait for up to three times the type timeout, scaled by 1000
    MaxWait = 3 * TypeTimeout * 1000,
    ?assertEqual(
        {Type, Id, pending, #{<<"y">> => 1}},
        couch_jobs:wait(SubId, pending, MaxWait)
    ),
    ?assertMatch(#{state := pending}, get_job(Type, Id)),

    % After job was re-enqueued, old job processor can't update it anymore
    Halted = {error, halt},
    ?assertEqual(Halted, couch_jobs:update(?TX, StaleJob)),
    ?assertEqual(Halted, couch_jobs:finish(?TX, StaleJob)).
% A running job can be removed once; after that a second remove reports
% not_found and the job processor gets {error, halt} on update and finish.
remove_running_job(#{t1 := Type, j1 := Id}) ->
    ok = couch_jobs:add(?TX, Type, Id, #{}),
    {ok, Running, _} = couch_jobs:accept(Type),
    ?assertEqual(ok, couch_jobs:remove(?TX, Type, Id)),
    ?assertEqual({error, not_found}, couch_jobs:remove(?TX, Type, Id)),
    Halted = {error, halt},
    ?assertEqual(Halted, couch_jobs:update(?TX, Running)),
    ?assertEqual(Halted, couch_jobs:finish(?TX, Running)).
% couch_jobs_fdb:get_jobs/0 lists jobs of every type together with their
% current state. The result is sorted before matching, so the expected lists
% are written in sorted order (the t2 entry first).
check_get_jobs(#{t1 := TypeA, j1 := IdA, t2 := TypeB, j2 := IdB}) ->
    ok = couch_jobs:add(?TX, TypeA, IdA, #{}),
    ok = couch_jobs:add(?TX, TypeB, IdB, #{}),

    BothPending = lists:sort(couch_jobs_fdb:get_jobs()),
    ?assertMatch(
        [
            {TypeB, IdB, pending, #{}},
            {TypeA, IdA, pending, #{}}
        ],
        BothPending
    ),

    {ok, _, _} = couch_jobs:accept(TypeA),

    OneRunning = lists:sort(couch_jobs_fdb:get_jobs()),
    ?assertMatch(
        [
            {TypeB, IdB, pending, #{}},
            {TypeA, IdA, running, #{}}
        ],
        OneRunning
    ).
% The couch_jobs API also accepts a fabric2 db handle in place of a
% transaction: a job is added with the db handle, then read and finished in
% the same transaction as two document updates.
use_fabric_transaction_object(#{t1 := Type, j1 := Id, dbname := DbName}) ->
    {ok, Db} = fabric2_db:create(DbName, []),
    ?assertEqual(ok, couch_jobs:add(Db, Type, Id, #{})),
    ?assertMatch(#{state := pending, data := #{}}, get_job(Type, Id)),
    {ok, Job, _} = couch_jobs:accept(Type),

    % Mixes job calls and doc updates in one transaction; returns the
    % result of couch_jobs:finish/3
    FinishWithDocs = fun(TxDb) ->
        {ok, #{}} = couch_jobs:get_job_data(TxDb, Type, Id),
        FirstDoc = #doc{id = <<"1">>, body = {[]}},
        {ok, {_, _}} = fabric2_db:update_doc(TxDb, FirstDoc),
        SecondDoc = #doc{id = <<"2">>, body = {[]}},
        {ok, {_, _}} = fabric2_db:update_doc(TxDb, SecondDoc),
        couch_jobs:finish(TxDb, Job, #{<<"d">> => 1})
    end,
    ?assertEqual(ok, fabric2_fdb:transactional(Db, FinishWithDocs)),

    % A map whose tx is undefined is accepted too
    ok = couch_jobs:remove(#{tx => undefined}, Type, Id),
    ok = fabric2_db:delete(DbName, []).
% The md_version of a jobs transaction starts out as not_found, becomes a
% binary after the first bump, and compares greater after a second bump.
metadata_version_bump(_) ->
    % Opens a jobs transaction and returns the transaction map itself
    FetchJTx = fun() ->
        couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(), fun(Tx) -> Tx end)
    end,
    % Bumps the metadata version and empties the couch_jobs_fdb ets table
    BumpAndFlush = fun() ->
        couch_jobs_fdb:bump_metadata_version(),
        ets:delete_all_objects(couch_jobs_fdb)
    end,

    Initial = FetchJTx(),
    ?assertMatch(#{md_version := not_found}, Initial),

    BumpAndFlush(),
    AfterFirst = FetchJTx(),
    ?assertMatch(#{md_version := Bin} when is_binary(Bin), AfterFirst),

    BumpAndFlush(),
    AfterSecond = FetchJTx(),
    OldMdv = maps:get(md_version, AfterFirst),
    NewMdv = maps:get(md_version, AfterSecond),
    ?assert(NewMdv > OldMdv).