| % Licensed under the Apache License, Version 2.0 (the "License"); you may not |
| % use this file except in compliance with the License. You may obtain a copy of |
| % the License at |
| % |
| % http://www.apache.org/licenses/LICENSE-2.0 |
| % |
| % Unless required by applicable law or agreed to in writing, software |
| % distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| % WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| % License for the specific language governing permissions and limitations under |
| % the License. |
| |
| -module(couch_jobs). |
| |
| -export([ |
| % Job creation |
| add/4, |
| add/5, |
| remove/3, |
| get_job_data/3, |
| get_job_state/3, |
| |
| % Job processing |
| accept/1, |
| accept/2, |
| finish/2, |
| finish/3, |
| resubmit/2, |
| resubmit/3, |
| is_resubmitted/1, |
| update/2, |
| update/3, |
| |
| % Subscriptions |
| subscribe/2, |
| subscribe/3, |
| unsubscribe/1, |
| wait/2, |
| wait/3, |
| |
| % Type timeouts |
| set_type_timeout/2, |
| clear_type_timeout/1, |
| get_type_timeout/1 |
| ]). |
| |
| |
| -include("couch_jobs.hrl"). |
| |
| |
| -define(MIN_ACCEPT_WAIT_MSEC, 100). |
| |
| |
| %% Job Creation API |
| |
| -spec add(jtx(), job_type(), job_id(), job_data()) -> ok | {error, any()}. |
| add(Tx, Type, JobId, JobData) -> |
| add(Tx, Type, JobId, JobData, 0). |
| |
| |
| -spec add(jtx(), job_type(), job_id(), job_data(), scheduled_time()) -> |
| ok | {error, any()}. |
| add(Tx, Type, JobId, JobData, ScheduledTime) when is_binary(JobId), |
| is_map(JobData), is_integer(ScheduledTime) -> |
| couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(Tx), fun(JTx) -> |
| case couch_jobs_fdb:add(JTx, Type, JobId, JobData, ScheduledTime) of |
| {ok, _, _, _} -> ok; |
| {error, Error} -> {error, Error} |
| end |
| end). |
| |
| |
| -spec remove(jtx(), job_type(), job_id()) -> ok | {error, any()}. |
| remove(Tx, Type, JobId) when is_binary(JobId) -> |
| couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(Tx), fun(JTx) -> |
| couch_jobs_fdb:remove(JTx, job(Type, JobId)) |
| end). |
| |
| |
| -spec get_job_data(jtx(), job_type(), job_id()) -> {ok, job_data()} | {error, |
| any()}. |
| get_job_data(Tx, Type, JobId) when is_binary(JobId) -> |
| couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(Tx), fun(JTx) -> |
| case couch_jobs_fdb:get_job_state_and_data(JTx, job(Type, JobId)) of |
| {ok, _Seq, _State, Data} -> |
| {ok, couch_jobs_fdb:decode_data(Data)}; |
| {error, Error} -> |
| {error, Error} |
| end |
| end). |
| |
| |
| -spec get_job_state(jtx(), job_type(), job_id()) -> {ok, job_state()} | {error, |
| any()}. |
| get_job_state(Tx, Type, JobId) when is_binary(JobId) -> |
| couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(Tx), fun(JTx) -> |
| case couch_jobs_fdb:get_job_state_and_data(JTx, job(Type, JobId)) of |
| {ok, _Seq, State, _Data} -> |
| {ok, State}; |
| {error, Error} -> |
| {error, Error} |
| end |
| end). |
| |
| |
| %% Job processor API |
| |
| -spec accept(job_type()) -> {ok, job(), job_data()} | {error, any()}. |
| accept(Type) -> |
| accept(Type, #{}). |
| |
| |
| -spec accept(job_type(), job_accept_opts()) -> {ok, job()} | {error, any()}. |
| accept(Type, #{} = Opts) -> |
| NoSched = maps:get(no_schedule, Opts, false), |
| MaxSchedTimeDefault = case NoSched of |
| true -> 0; |
| false -> ?UNDEFINED_MAX_SCHEDULED_TIME |
| end, |
| MaxSchedTime = maps:get(max_sched_time, Opts, MaxSchedTimeDefault), |
| Timeout = maps:get(timeout, Opts, infinity), |
| case NoSched andalso MaxSchedTime =/= 0 of |
| true -> |
| {error, no_schedule_require_0_max_sched_time}; |
| false -> |
| accept_loop(Type, NoSched, MaxSchedTime, Timeout) |
| end. |
| |
| |
| -spec finish(jtx(), job()) -> ok | {error, any()}. |
| finish(Tx, Job) -> |
| finish(Tx, Job, undefined). |
| |
| |
| -spec finish(jtx(), job(), job_data()) -> ok | {error, any()}. |
| finish(Tx, #{jlock := <<_/binary>>} = Job, JobData) -> |
| couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(Tx), fun(JTx) -> |
| couch_jobs_fdb:finish(JTx, Job, JobData) |
| end). |
| |
| |
| -spec resubmit(jtx(), job()) -> {ok, job()} | {error, any()}. |
| resubmit(Tx, Job) -> |
| resubmit(Tx, Job, ?UNDEFINED_MAX_SCHEDULED_TIME). |
| |
| |
| -spec resubmit(jtx(), job(), scheduled_time()) -> {ok, job()} | {error, any()}. |
| resubmit(Tx, #{jlock := <<_/binary>>} = Job, SchedTime) -> |
| couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(Tx), fun(JTx) -> |
| couch_jobs_fdb:resubmit(JTx, Job, SchedTime) |
| end). |
| |
| |
| -spec is_resubmitted(job()) -> true | false. |
| is_resubmitted(#{job := true} = Job) -> |
| maps:get(resubmit, Job, false). |
| |
| |
| -spec update(jtx(), job()) -> {ok, job()} | {error, any()}. |
| update(Tx, Job) -> |
| update(Tx, Job, undefined). |
| |
| |
| -spec update(jtx(), job(), job_data()) -> {ok, job()} | {error, any()}. |
| update(Tx, #{jlock := <<_/binary>>} = Job, JobData) -> |
| couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(Tx), fun(JTx) -> |
| couch_jobs_fdb:update(JTx, Job, JobData) |
| end). |
| |
| |
| %% Subscription API |
| |
| % Receive events as messages. Wait for them using `wait/2,3` |
| % functions. |
| % |
| |
| -spec subscribe(job_type(), job_id()) -> {ok, job_subscription(), job_state(), |
| job_data()} | {ok, finished, job_data()} | {error, any()}. |
| subscribe(Type, JobId) -> |
| subscribe(undefined, Type, JobId). |
| |
| |
| -spec subscribe(jtx(), job_type(), job_id()) -> {ok, job_subscription(), |
| job_state(), job_data()} | {ok, finished, job_data()} | {error, any()}. |
| subscribe(Tx, Type, JobId) -> |
| StateData = couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(Tx), fun(JTx) -> |
| Job = #{job => true, type => Type, id => JobId}, |
| couch_jobs_fdb:get_job_state_and_data(JTx, Job) |
| end), |
| case StateData of |
| {ok, _Seq, finished, Data} -> |
| {ok, finished, couch_jobs_fdb:decode_data(Data)}; |
| {ok, Seq, State, Data} -> |
| case couch_jobs_notifier:subscribe(Type, JobId, State, Seq) of |
| {ok, SubRef} -> |
| Data1 = couch_jobs_fdb:decode_data(Data), |
| {ok, SubRef, State, Data1}; |
| {error, Error} -> |
| {error, Error} |
| end; |
| {error, Error} -> |
| {error, Error} |
| end. |
| |
| |
| % Unsubscribe from getting notifications based on a particular subscription. |
| % Each subscription should be followed by its own unsubscription call. However, |
| % subscriber processes are also monitored and auto-unsubscribed if they exit. |
| % If subscribing process is exiting, calling this function is optional. |
| % |
| -spec unsubscribe(job_subscription()) -> ok. |
| unsubscribe({Server, Ref}) when is_pid(Server), is_reference(Ref) -> |
| try |
| couch_jobs_notifier:unsubscribe(Server, Ref) |
| after |
| flush_notifications(Ref) |
| end. |
| |
| |
| % Wait to receive job state updates |
| % |
| -spec wait(job_subscription() | [job_subscription()], timeout()) -> |
| {job_type(), job_id(), job_state(), job_data()} | timeout. |
| wait({_, Ref}, Timeout) -> |
| receive |
| {?COUCH_JOBS_EVENT, Ref, Type, Id, State, Data} -> |
| {Type, Id, State, couch_jobs_fdb:decode_data(Data)} |
| after |
| Timeout -> timeout |
| end; |
| |
| wait(Subs, Timeout) when is_list(Subs) -> |
| {Result, ResendQ} = wait_any(Subs, Timeout, []), |
| lists:foreach(fun(Msg) -> self() ! Msg end, ResendQ), |
| Result. |
| |
| |
| -spec wait(job_subscription() | [job_subscription()], job_state(), timeout()) |
| -> {job_type(), job_id(), job_state(), job_data()} | timeout. |
| wait({_, Ref} = Sub, State, Timeout) when is_atom(State) -> |
| receive |
| {?COUCH_JOBS_EVENT, Ref, Type, Id, MsgState, Data0} -> |
| case MsgState =:= State of |
| true -> |
| Data = couch_jobs_fdb:decode_data(Data0), |
| {Type, Id, State, Data}; |
| false -> |
| wait(Sub, State, Timeout) |
| end |
| after |
| Timeout -> timeout |
| end; |
| |
| wait(Subs, State, Timeout) when is_list(Subs), |
| is_atom(State) -> |
| {Result, ResendQ} = wait_any(Subs, State, Timeout, []), |
| lists:foreach(fun(Msg) -> self() ! Msg end, ResendQ), |
| Result. |
| |
| |
| %% Job type timeout API |
| |
| % These functions manipulate the activity timeout for each job type. |
| |
| -spec set_type_timeout(job_type(), timeout()) -> ok. |
| set_type_timeout(Type, Timeout) -> |
| couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(), fun(JTx) -> |
| couch_jobs_fdb:set_type_timeout(JTx, Type, Timeout) |
| end). |
| |
| |
| -spec clear_type_timeout(job_type()) -> ok. |
| clear_type_timeout(Type) -> |
| couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(), fun(JTx) -> |
| couch_jobs_fdb:clear_type_timeout(JTx, Type) |
| end). |
| |
| |
| -spec get_type_timeout(job_type()) -> timeout(). |
| get_type_timeout(Type) -> |
| couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(), fun(JTx) -> |
| couch_jobs_fdb:get_type_timeout(JTx, Type) |
| end). |
| |
| |
| %% Private utilities |
| |
| accept_loop(Type, NoSched, MaxSchedTime, Timeout) -> |
| TxFun = fun(JTx) -> |
| couch_jobs_fdb:accept(JTx, Type, MaxSchedTime, NoSched) |
| end, |
| case couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(), TxFun) of |
| {ok, Job, Data} -> |
| {ok, Job, Data}; |
| {not_found, PendingWatch} -> |
| case wait_pending(PendingWatch, MaxSchedTime, Timeout) of |
| {error, not_found} -> |
| {error, not_found}; |
| ok -> |
| accept_loop(Type, NoSched, MaxSchedTime, Timeout) |
| end |
| end. |
| |
| |
| job(Type, JobId) -> |
| #{job => true, type => Type, id => JobId}. |
| |
| |
| wait_pending(PendingWatch, _MaxSTime, 0) -> |
| erlfdb:cancel(PendingWatch, [flush]), |
| {error, not_found}; |
| |
| wait_pending(PendingWatch, MaxSTime, UserTimeout) -> |
| NowMSec = erlang:system_time(millisecond), |
| Timeout0 = max(?MIN_ACCEPT_WAIT_MSEC, MaxSTime * 1000 - NowMSec), |
| Timeout = min(limit_timeout(Timeout0), UserTimeout), |
| try |
| erlfdb:wait(PendingWatch, [{timeout, Timeout}]), |
| ok |
| catch |
| error:{timeout, _} -> |
| erlfdb:cancel(PendingWatch, [flush]), |
| {error, not_found} |
| end. |
| |
| |
| wait_any(Subs, Timeout0, ResendQ) when is_list(Subs) -> |
| Timeout = limit_timeout(Timeout0), |
| receive |
| {?COUCH_JOBS_EVENT, Ref, Type, Id, State, Data0} = Msg -> |
| case lists:keyfind(Ref, 2, Subs) of |
| false -> |
| wait_any(Subs, Timeout, [Msg | ResendQ]); |
| {_, Ref} -> |
| Data = couch_jobs_fdb:decode_data(Data0), |
| {{Type, Id, State, Data}, ResendQ} |
| end |
| after |
| Timeout -> {timeout, ResendQ} |
| end. |
| |
| |
| wait_any(Subs, State, Timeout0, ResendQ) when |
| is_list(Subs) -> |
| Timeout = limit_timeout(Timeout0), |
| receive |
| {?COUCH_JOBS_EVENT, Ref, Type, Id, MsgState, Data0} = Msg -> |
| case lists:keyfind(Ref, 2, Subs) of |
| false -> |
| wait_any(Subs, Timeout, [Msg | ResendQ]); |
| {_, Ref} -> |
| case MsgState =:= State of |
| true -> |
| Data = couch_jobs_fdb:decode_data(Data0), |
| {{Type, Id, State, Data}, ResendQ}; |
| false -> |
| wait_any(Subs, Timeout, ResendQ) |
| end |
| end |
| after |
| Timeout -> {timeout, ResendQ} |
| end. |
| |
| |
| limit_timeout(Timeout) when is_integer(Timeout), Timeout < 16#FFFFFFFF -> |
| Timeout; |
| |
| limit_timeout(_Timeout) -> |
| infinity. |
| |
| |
| flush_notifications(Ref) -> |
| receive |
| {?COUCH_JOBS_EVENT, Ref, _, _, _} -> |
| flush_notifications(Ref) |
| after |
| 0 -> ok |
| end. |