blob: d469ed41a952fb248d1f29d959c8e7c28410a606 [file] [log] [blame]
% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
% http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.
-module(couch_jobs).
-export([
% Job creation
add/4,
add/5,
remove/3,
get_job_data/3,
get_job_state/3,
% Job processing
accept/1,
accept/2,
finish/2,
finish/3,
resubmit/2,
resubmit/3,
is_resubmitted/1,
update/2,
update/3,
% Subscriptions
subscribe/2,
subscribe/3,
unsubscribe/1,
wait/2,
wait/3,
% Type timeouts
set_type_timeout/2,
clear_type_timeout/1,
get_type_timeout/1
]).
-include("couch_jobs.hrl").
-define(MIN_ACCEPT_WAIT_MSEC, 100).
%% Job Creation API
-spec add(jtx(), job_type(), job_id(), job_data()) -> ok | {error, any()}.
add(Tx, Type, JobId, JobData) ->
add(Tx, Type, JobId, JobData, 0).
-spec add(jtx(), job_type(), job_id(), job_data(), scheduled_time()) ->
ok | {error, any()}.
add(Tx, Type, JobId, JobData, ScheduledTime) when is_binary(JobId),
is_map(JobData), is_integer(ScheduledTime) ->
couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(Tx), fun(JTx) ->
case couch_jobs_fdb:add(JTx, Type, JobId, JobData, ScheduledTime) of
{ok, _, _, _} -> ok;
{error, Error} -> {error, Error}
end
end).
-spec remove(jtx(), job_type(), job_id()) -> ok | {error, any()}.
remove(Tx, Type, JobId) when is_binary(JobId) ->
couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(Tx), fun(JTx) ->
couch_jobs_fdb:remove(JTx, job(Type, JobId))
end).
-spec get_job_data(jtx(), job_type(), job_id()) -> {ok, job_data()} | {error,
any()}.
get_job_data(Tx, Type, JobId) when is_binary(JobId) ->
couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(Tx), fun(JTx) ->
case couch_jobs_fdb:get_job_state_and_data(JTx, job(Type, JobId)) of
{ok, _Seq, _State, Data} ->
{ok, couch_jobs_fdb:decode_data(Data)};
{error, Error} ->
{error, Error}
end
end).
-spec get_job_state(jtx(), job_type(), job_id()) -> {ok, job_state()} | {error,
any()}.
get_job_state(Tx, Type, JobId) when is_binary(JobId) ->
couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(Tx), fun(JTx) ->
case couch_jobs_fdb:get_job_state_and_data(JTx, job(Type, JobId)) of
{ok, _Seq, State, _Data} ->
{ok, State};
{error, Error} ->
{error, Error}
end
end).
%% Job processor API
-spec accept(job_type()) -> {ok, job(), job_data()} | {error, any()}.
accept(Type) ->
accept(Type, #{}).
-spec accept(job_type(), job_accept_opts()) -> {ok, job()} | {error, any()}.
accept(Type, #{} = Opts) ->
NoSched = maps:get(no_schedule, Opts, false),
MaxSchedTimeDefault = case NoSched of
true -> 0;
false -> ?UNDEFINED_MAX_SCHEDULED_TIME
end,
MaxSchedTime = maps:get(max_sched_time, Opts, MaxSchedTimeDefault),
Timeout = maps:get(timeout, Opts, infinity),
case NoSched andalso MaxSchedTime =/= 0 of
true ->
{error, no_schedule_require_0_max_sched_time};
false ->
accept_loop(Type, NoSched, MaxSchedTime, Timeout)
end.
-spec finish(jtx(), job()) -> ok | {error, any()}.
finish(Tx, Job) ->
finish(Tx, Job, undefined).
-spec finish(jtx(), job(), job_data()) -> ok | {error, any()}.
finish(Tx, #{jlock := <<_/binary>>} = Job, JobData) ->
couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(Tx), fun(JTx) ->
couch_jobs_fdb:finish(JTx, Job, JobData)
end).
-spec resubmit(jtx(), job()) -> {ok, job()} | {error, any()}.
resubmit(Tx, Job) ->
resubmit(Tx, Job, ?UNDEFINED_MAX_SCHEDULED_TIME).
-spec resubmit(jtx(), job(), scheduled_time()) -> {ok, job()} | {error, any()}.
resubmit(Tx, #{jlock := <<_/binary>>} = Job, SchedTime) ->
couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(Tx), fun(JTx) ->
couch_jobs_fdb:resubmit(JTx, Job, SchedTime)
end).
-spec is_resubmitted(job()) -> true | false.
is_resubmitted(#{job := true} = Job) ->
maps:get(resubmit, Job, false).
-spec update(jtx(), job()) -> {ok, job()} | {error, any()}.
update(Tx, Job) ->
update(Tx, Job, undefined).
-spec update(jtx(), job(), job_data()) -> {ok, job()} | {error, any()}.
update(Tx, #{jlock := <<_/binary>>} = Job, JobData) ->
couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(Tx), fun(JTx) ->
couch_jobs_fdb:update(JTx, Job, JobData)
end).
%% Subscription API
% Receive events as messages. Wait for them using `wait/2,3`
% functions.
%
-spec subscribe(job_type(), job_id()) -> {ok, job_subscription(), job_state(),
job_data()} | {ok, finished, job_data()} | {error, any()}.
subscribe(Type, JobId) ->
subscribe(undefined, Type, JobId).
-spec subscribe(jtx(), job_type(), job_id()) -> {ok, job_subscription(),
job_state(), job_data()} | {ok, finished, job_data()} | {error, any()}.
subscribe(Tx, Type, JobId) ->
StateData = couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(Tx), fun(JTx) ->
Job = #{job => true, type => Type, id => JobId},
couch_jobs_fdb:get_job_state_and_data(JTx, Job)
end),
case StateData of
{ok, _Seq, finished, Data} ->
{ok, finished, couch_jobs_fdb:decode_data(Data)};
{ok, Seq, State, Data} ->
case couch_jobs_notifier:subscribe(Type, JobId, State, Seq) of
{ok, SubRef} ->
Data1 = couch_jobs_fdb:decode_data(Data),
{ok, SubRef, State, Data1};
{error, Error} ->
{error, Error}
end;
{error, Error} ->
{error, Error}
end.
% Unsubscribe from getting notifications based on a particular subscription.
% Each subscription should be followed by its own unsubscription call. However,
% subscriber processes are also monitored and auto-unsubscribed if they exit.
% If subscribing process is exiting, calling this function is optional.
%
-spec unsubscribe(job_subscription()) -> ok.
unsubscribe({Server, Ref}) when is_pid(Server), is_reference(Ref) ->
try
couch_jobs_notifier:unsubscribe(Server, Ref)
after
flush_notifications(Ref)
end.
% Wait to receive job state updates
%
-spec wait(job_subscription() | [job_subscription()], timeout()) ->
{job_type(), job_id(), job_state(), job_data()} | timeout.
wait({_, Ref}, Timeout) ->
receive
{?COUCH_JOBS_EVENT, Ref, Type, Id, State, Data} ->
{Type, Id, State, couch_jobs_fdb:decode_data(Data)}
after
Timeout -> timeout
end;
wait(Subs, Timeout) when is_list(Subs) ->
{Result, ResendQ} = wait_any(Subs, Timeout, []),
lists:foreach(fun(Msg) -> self() ! Msg end, ResendQ),
Result.
-spec wait(job_subscription() | [job_subscription()], job_state(), timeout())
-> {job_type(), job_id(), job_state(), job_data()} | timeout.
wait({_, Ref} = Sub, State, Timeout) when is_atom(State) ->
receive
{?COUCH_JOBS_EVENT, Ref, Type, Id, MsgState, Data0} ->
case MsgState =:= State of
true ->
Data = couch_jobs_fdb:decode_data(Data0),
{Type, Id, State, Data};
false ->
wait(Sub, State, Timeout)
end
after
Timeout -> timeout
end;
wait(Subs, State, Timeout) when is_list(Subs),
is_atom(State) ->
{Result, ResendQ} = wait_any(Subs, State, Timeout, []),
lists:foreach(fun(Msg) -> self() ! Msg end, ResendQ),
Result.
%% Job type timeout API
% These functions manipulate the activity timeout for each job type.
-spec set_type_timeout(job_type(), timeout()) -> ok.
set_type_timeout(Type, Timeout) ->
couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(), fun(JTx) ->
couch_jobs_fdb:set_type_timeout(JTx, Type, Timeout)
end).
-spec clear_type_timeout(job_type()) -> ok.
clear_type_timeout(Type) ->
couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(), fun(JTx) ->
couch_jobs_fdb:clear_type_timeout(JTx, Type)
end).
-spec get_type_timeout(job_type()) -> timeout().
get_type_timeout(Type) ->
couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(), fun(JTx) ->
couch_jobs_fdb:get_type_timeout(JTx, Type)
end).
%% Private utilities
accept_loop(Type, NoSched, MaxSchedTime, Timeout) ->
TxFun = fun(JTx) ->
couch_jobs_fdb:accept(JTx, Type, MaxSchedTime, NoSched)
end,
case couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(), TxFun) of
{ok, Job, Data} ->
{ok, Job, Data};
{not_found, PendingWatch} ->
case wait_pending(PendingWatch, MaxSchedTime, Timeout) of
{error, not_found} ->
{error, not_found};
ok ->
accept_loop(Type, NoSched, MaxSchedTime, Timeout)
end
end.
job(Type, JobId) ->
#{job => true, type => Type, id => JobId}.
wait_pending(PendingWatch, _MaxSTime, 0) ->
erlfdb:cancel(PendingWatch, [flush]),
{error, not_found};
wait_pending(PendingWatch, MaxSTime, UserTimeout) ->
NowMSec = erlang:system_time(millisecond),
Timeout0 = max(?MIN_ACCEPT_WAIT_MSEC, MaxSTime * 1000 - NowMSec),
Timeout = min(limit_timeout(Timeout0), UserTimeout),
try
erlfdb:wait(PendingWatch, [{timeout, Timeout}]),
ok
catch
error:{timeout, _} ->
erlfdb:cancel(PendingWatch, [flush]),
{error, not_found}
end.
wait_any(Subs, Timeout0, ResendQ) when is_list(Subs) ->
Timeout = limit_timeout(Timeout0),
receive
{?COUCH_JOBS_EVENT, Ref, Type, Id, State, Data0} = Msg ->
case lists:keyfind(Ref, 2, Subs) of
false ->
wait_any(Subs, Timeout, [Msg | ResendQ]);
{_, Ref} ->
Data = couch_jobs_fdb:decode_data(Data0),
{{Type, Id, State, Data}, ResendQ}
end
after
Timeout -> {timeout, ResendQ}
end.
wait_any(Subs, State, Timeout0, ResendQ) when
is_list(Subs) ->
Timeout = limit_timeout(Timeout0),
receive
{?COUCH_JOBS_EVENT, Ref, Type, Id, MsgState, Data0} = Msg ->
case lists:keyfind(Ref, 2, Subs) of
false ->
wait_any(Subs, Timeout, [Msg | ResendQ]);
{_, Ref} ->
case MsgState =:= State of
true ->
Data = couch_jobs_fdb:decode_data(Data0),
{{Type, Id, State, Data}, ResendQ};
false ->
wait_any(Subs, Timeout, ResendQ)
end
end
after
Timeout -> {timeout, ResendQ}
end.
limit_timeout(Timeout) when is_integer(Timeout), Timeout < 16#FFFFFFFF ->
Timeout;
limit_timeout(_Timeout) ->
infinity.
flush_notifications(Ref) ->
receive
{?COUCH_JOBS_EVENT, Ref, _, _, _} ->
flush_notifications(Ref)
after
0 -> ok
end.