blob: f7dde96ecfcb3e30920e4ae704a540346903017f [file] [log] [blame]
% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
% http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.
-module(couch_jobs_fdb).
-export([
add/5,
remove/2,
get_job_state_and_data/2,
get_jobs/2,
get_jobs/3,
accept/4,
finish/3,
resubmit/3,
resubmit/4,
update/3,
set_type_timeout/3,
clear_type_timeout/2,
get_type_timeout/2,
get_types/1,
get_activity_vs/2,
get_activity_vs_and_watch/2,
get_active_since/4,
get_inactive_since/4,
re_enqueue_inactive/3,
init_cache/0,
encode_data/1,
decode_data/1,
get_jtx/0,
get_jtx/1,
tx/2,
get_job/2,
get_jobs/0,
bump_metadata_version/0,
bump_metadata_version/1
]).
-include("couch_jobs.hrl").
-record(jv, {
seq,
jlock,
stime,
resubmit,
data
}).
-define(JOBS_ETS_KEY, jobs).
-define(MD_TIMESTAMP_ETS_KEY, md_timestamp).
-define(MD_VERSION_MAX_AGE_SEC, 10).
-define(PENDING_SEQ, 0).
% Data model
%
% (?JOBS, ?DATA, Type, JobId) = (Sequence, Lock, SchedTime, Resubmit, JobData)
% (?JOBS, ?PENDING, Type, ScheduledTime, JobId) = ""
% (?JOBS, ?WATCHES_PENDING, Type) = Counter
% (?JOBS, ?WATCHES_ACTIVITY, Type) = Sequence
% (?JOBS, ?ACTIVITY_TIMEOUT, Type) = ActivityTimeout
% (?JOBS, ?ACTIVITY, Type, Sequence) = JobId
%
% In the ?DATA row Sequence can have these values:
% 0 - when the job is pending
% null - when the job is finished
% Versionstamp - when the job is running
% Job creation API
% Create a job of Type with JobId, scheduled at STime and carrying Data.
% If the job row already exists, this turns into a resubmit instead.
% Returns {ok, State, Seq, JobData} or {error, Reason}. The type must
% have had a timeout registered via set_type_timeout/3 beforehand.
add(#{jtx := true} = JTx0, Type, JobId, Data, STime) ->
    #{tx := Tx} = JTx = get_jtx(JTx0),
    Job = #{job => true, type => Type, id => JobId},
    case get_type_timeout(JTx, Type) of
        not_found ->
            % Refuse to add jobs for unregistered types; without a
            % timeout the activity monitor could never time them out.
            {error, no_type_timeout};
        Int when is_integer(Int) ->
            Key = job_key(JTx, Job),
            case erlfdb:wait(erlfdb:get(Tx, Key)) of
                <<_/binary>> ->
                    % Job row already exists: treat add as a resubmit.
                    {ok, Job1} = resubmit(JTx, Job, STime, Data),
                    #{seq := Seq, state := State, data := Data1} = Job1,
                    {ok, State, Seq, Data1};
                not_found ->
                    try
                        % Resubmit = true forces the new job into the
                        % pending queue (see maybe_enqueue/6).
                        maybe_enqueue(JTx, Type, JobId, STime, true, Data),
                        {ok, pending, ?PENDING_SEQ, Data}
                    catch
                        error:{json_encoding_error, Error} ->
                            {error, {json_encoding_error, Error}}
                    end
            end
    end.
% Delete a job's main row along with its pending-queue and activity
% entries, then notify watchers. Returns ok or {error, not_found}.
remove(#{jtx := true} = JTx0, #{job := true} = Job) ->
    #{type := Type, id := JobId} = Job,
    #{tx := Tx} = JTx = get_jtx(JTx0),
    JobKey = job_key(JTx, Job),
    case get_job_val(Tx, JobKey) of
        not_found ->
            {error, not_found};
        #jv{stime = ScheduledTime, seq = Seq} ->
            couch_jobs_pending:remove(JTx, Type, JobId, ScheduledTime),
            clear_activity(JTx, Type, Seq),
            erlfdb:clear(Tx, JobKey),
            update_watch(JTx, Type),
            ok
    end.
% Fetch a job's sequence, derived state and (still encoded) data with a
% single row read. Returns {ok, Seq, State, Data} or {error, not_found}.
get_job_state_and_data(#{jtx := true} = JTx, #{job := true} = Job) ->
    case get_job_val(get_jtx(JTx), Job) of
        not_found ->
            {error, not_found};
        #jv{seq = Seq, jlock = Lock, data = Data} ->
            {ok, Seq, job_state(Lock, Seq), Data}
    end.
% List all jobs of Type, applying no filtering at all.
get_jobs(JTx, Type) ->
    AcceptAll = fun(_JobId) -> true end,
    get_jobs(JTx, Type, AcceptAll).
% Build a map of JobId => {Seq, State, EncodedData} for every job of
% Type whose id passes the Filter predicate. Values are only unpacked
% for ids that pass the filter.
get_jobs(#{jtx := true} = JTx, Type, Filter) when is_function(Filter, 1) ->
    #{tx := Tx, jobs_path := Jobs} = get_jtx(JTx),
    Prefix = erlfdb_tuple:pack({?DATA, Type}, Jobs),
    RangeOpts = [{streaming_mode, want_all}],
    Rows = erlfdb:wait(erlfdb:get_range_startswith(Tx, Prefix, RangeOpts)),
    KeyedRows = [{erlfdb_tuple:unpack(K, Prefix), V} || {K, V} <- Rows],
    maps:from_list([
        begin
            {Seq, JLock, _, _, Data} = erlfdb_tuple:unpack(V),
            {JobId, {Seq, job_state(JLock, Seq), Data}}
        end
     || {{JobId}, V} <- KeyedRows, Filter(JobId)
    ]).
% Job processor API
% Try to dequeue a pending job of Type scheduled at or before MaxSTime.
% On success the row is locked with a fresh UUID, its sequence becomes a
% new versionstamp (running state), and an activity row is written so
% the activity monitor tracks it. When nothing is pending, returns
% {not_found, PendingWatch} so the caller can block on the watch.
% NoSched semantics are defined by couch_jobs_pending:dequeue/4.
accept(#{jtx := true} = JTx0, Type, MaxSTime, NoSched) when
    is_integer(MaxSTime), is_boolean(NoSched)
->
    #{jtx := true, tx := Tx} = JTx = get_jtx(JTx0),
    case couch_jobs_pending:dequeue(JTx, Type, MaxSTime, NoSched) of
        {not_found, PendingWatch} ->
            {not_found, PendingWatch};
        {ok, JobId} ->
            JLock = fabric2_util:uuid(),
            Key = job_key(JTx, Type, JobId),
            JV0 = get_job_val(Tx, Key),
            % Assert the job is currently unlocked; a conflicting lock
            % crashes (and thus aborts) the transaction.
            #jv{jlock = null, data = Data} = JV0,
            % ?UNSET_VS makes set_job_val/3 write a versionstamped value,
            % stamping the job as running at this transaction's version.
            JV = JV0#jv{seq = ?UNSET_VS, jlock = JLock, resubmit = false},
            set_job_val(Tx, Key, JV),
            % Seq = null: no previous activity entry needs clearing.
            update_activity(JTx, Type, JobId, null, Data),
            Job = #{
                job => true,
                type => Type,
                id => JobId,
                jlock => JLock
            },
            {ok, Job, decode_data(Data)}
    end.
% Finish a running job owned by the caller (identified by its lock).
% Data = undefined keeps the previously stored job data. Depending on
% the stored resubmit flag the job is either re-enqueued as pending or
% marked finished (see maybe_enqueue/6). Returns {error, halt} when the
% job is gone or is locked by a different worker.
finish(#{jtx := true} = JTx0, #{jlock := <<_/binary>>} = Job, Data) when
    is_map(Data) orelse Data =:= undefined
->
    #{tx := Tx} = JTx = get_jtx(JTx0),
    #{type := Type, jlock := JLock, id := JobId} = Job,
    case get_job_or_halt(Tx, job_key(JTx, Job), JLock) of
        #jv{seq = Seq, stime = STime, resubmit = Resubmit, data = OldData} ->
            NewData =
                case Data =:= undefined of
                    true -> OldData;
                    false -> Data
                end,
            % The `of ok ->` section is not protected by the catch; only
            % maybe_enqueue/6 itself can raise json_encoding_error here.
            try maybe_enqueue(JTx, Type, JobId, STime, Resubmit, NewData) of
                ok ->
                    % Stop activity tracking and wake up any watchers.
                    clear_activity(JTx, Type, Seq),
                    update_watch(JTx, Type)
            catch
                error:{json_encoding_error, Error} ->
                    {error, {json_encoding_error, Error}}
            end;
        halt ->
            {error, halt}
    end.
% Resubmit a job at NewSTime, keeping whatever data it already has.
resubmit(JTx0, Job, NewSTime) ->
    KeepData = undefined,
    resubmit(JTx0, Job, NewSTime, KeepData).
% Resubmit a job so it runs (again). Behavior depends on current state:
%   * finished - re-enqueue it as pending right away
%   * pending  - possibly update its scheduled time and data
%   * running  - set the resubmit flag so finish/3 re-enqueues it later
% NewSTime = undefined keeps the old scheduled time, NewData = undefined
% keeps the old data (see update_job_data/2). Returns {ok, Job1} with
% updated fields, or {error, not_found}.
resubmit(#{jtx := true} = JTx0, #{job := true} = Job, NewSTime, NewData) ->
    #{tx := Tx} = JTx = get_jtx(JTx0),
    #{type := Type, id := JobId} = Job,
    Key = job_key(JTx, Job),
    case get_job_val(Tx, Key) of
        #jv{seq = Seq, jlock = JLock, stime = OldSTime, data = Data} = JV ->
            STime =
                case NewSTime =:= undefined of
                    true -> OldSTime;
                    false -> NewSTime
                end,
            case job_state(JLock, Seq) of
                finished ->
                    ok = maybe_enqueue(JTx, Type, JobId, STime, true, NewData),
                    NewData1 = update_job_data(Data, NewData),
                    Job1 = Job#{
                        seq => ?PENDING_SEQ,
                        state => pending,
                        data => NewData1
                    },
                    {ok, Job1};
                pending when STime == OldSTime ->
                    % If pending and scheduled time doesn't change avoid generating
                    % unnecessary writes by removing and re-adding the jobs into the
                    % pending queue.
                    Job1 = Job#{
                        stime => STime,
                        seq => ?PENDING_SEQ,
                        state => pending,
                        data => Data
                    },
                    {ok, Job1};
                pending ->
                    % Scheduled time changed: rewrite the job row and move
                    % the pending queue entry to the new time slot.
                    JV1 = JV#jv{seq = ?PENDING_SEQ, stime = STime, data = NewData},
                    set_job_val(Tx, Key, JV1),
                    couch_jobs_pending:remove(JTx, Type, JobId, OldSTime),
                    couch_jobs_pending:enqueue(JTx, Type, STime, JobId),
                    NewData1 = update_job_data(Data, NewData),
                    Job1 = Job#{
                        stime => STime,
                        seq => ?PENDING_SEQ,
                        state => pending,
                        data => NewData1
                    },
                    {ok, Job1};
                running ->
                    % Leave the running job alone; just flag it so it is
                    % re-enqueued when the current run finishes.
                    JV1 = JV#jv{stime = STime, resubmit = true},
                    set_job_val(Tx, Key, JV1),
                    {ok, Job#{
                        resubmit => true,
                        stime => STime,
                        state => running,
                        seq => Seq,
                        data => Data
                    }}
            end;
        not_found ->
            {error, not_found}
    end.
% Report progress for a running job owned by the caller, optionally
% replacing its data. Bumps the job's versionstamped sequence and its
% activity row so the activity monitor sees it as alive. Returns
% {error, halt} when the job is gone or locked by another worker.
update(#{jtx := true} = JTx0, #{jlock := <<_/binary>>} = Job, Data0) when
    is_map(Data0) orelse Data0 =:= undefined
->
    #{tx := Tx} = JTx = get_jtx(JTx0),
    #{jlock := JLock, type := Type, id := JobId} = Job,
    Key = job_key(JTx, Job),
    case get_job_or_halt(Tx, Key, JLock) of
        #jv{seq = Seq, stime = STime, resubmit = Resubmit} = JV0 ->
            % Data0 = undefined means "keep the currently stored data".
            Data =
                case Data0 =:= undefined of
                    true -> JV0#jv.data;
                    false -> Data0
                end,
            % ?UNSET_VS: stamp a fresh versionstamp as the new sequence.
            JV = JV0#jv{seq = ?UNSET_VS, data = Data},
            try set_job_val(Tx, Key, JV) of
                ok ->
                    % Move the activity entry from the old Seq to the new
                    % versionstamp and notify watchers.
                    update_activity(JTx, Type, JobId, Seq, Data),
                    {ok, Job#{resubmit => Resubmit, stime => STime}}
            catch
                error:{json_encoding_error, Error} ->
                    {error, {json_encoding_error, Error}}
            end;
        halt ->
            {error, halt}
    end.
% Type and activity monitoring API
% Register the activity timeout for a job type. Jobs of this type that
% go quiet longer than Timeout become eligible for re-enqueueing.
set_type_timeout(#{jtx := true} = JTx, Type, Timeout) ->
    #{tx := Tx, jobs_path := JobsPath} = get_jtx(JTx),
    TimeoutKey = erlfdb_tuple:pack({?ACTIVITY_TIMEOUT, Type}, JobsPath),
    TimeoutVal = erlfdb_tuple:pack({Timeout}),
    erlfdb:set(Tx, TimeoutKey, TimeoutVal).
% Unregister a job type by clearing its activity timeout key.
clear_type_timeout(#{jtx := true} = JTx, Type) ->
    #{tx := Tx, jobs_path := JobsPath} = get_jtx(JTx),
    TimeoutKey = erlfdb_tuple:pack({?ACTIVITY_TIMEOUT, Type}, JobsPath),
    erlfdb:clear(Tx, TimeoutKey).
% Look up the registered timeout for Type. Uses a snapshot read
% (get_ss) so frequent lookups don't create read conflicts. Returns the
% timeout value or not_found.
get_type_timeout(#{jtx := true} = JTx, Type) ->
    #{tx := Tx, jobs_path := JobsPath} = get_jtx(JTx),
    TimeoutKey = erlfdb_tuple:pack({?ACTIVITY_TIMEOUT, Type}, JobsPath),
    case erlfdb:wait(erlfdb:get_ss(Tx, TimeoutKey)) of
        not_found ->
            not_found;
        Packed ->
            {Timeout} = erlfdb_tuple:unpack(Packed),
            Timeout
    end.
% Return all job types that currently have a timeout registered.
get_types(#{jtx := true} = JTx) ->
    #{tx := Tx, jobs_path := JobsPath} = get_jtx(JTx),
    Prefix = erlfdb_tuple:pack({?ACTIVITY_TIMEOUT}, JobsPath),
    RangeOpts = [{streaming_mode, want_all}],
    Rows = erlfdb:wait(erlfdb:get_range_startswith(Tx, Prefix, RangeOpts)),
    [element(1, erlfdb_tuple:unpack(K, Prefix)) || {K, _V} <- Rows].
% Read the current activity versionstamp for Type, or not_found when no
% activity has ever been recorded for it.
get_activity_vs(#{jtx := true} = JTx, Type) ->
    #{tx := Tx, jobs_path := JobsPath} = get_jtx(JTx),
    WatchKey = erlfdb_tuple:pack({?WATCHES_ACTIVITY, Type}, JobsPath),
    case erlfdb:wait(erlfdb:get(Tx, WatchKey)) of
        not_found ->
            not_found;
        Packed ->
            {VS} = erlfdb_tuple:unpack(Packed),
            VS
    end.
% Read the activity versionstamp for Type and also register a watch on
% the same key, so the caller can block until the next activity bump.
% The watch is created before waiting on the read future.
get_activity_vs_and_watch(#{jtx := true} = JTx, Type) ->
    #{tx := Tx, jobs_path := JobsPath} = get_jtx(JTx),
    WatchKey = erlfdb_tuple:pack({?WATCHES_ACTIVITY, Type}, JobsPath),
    GetFuture = erlfdb:get(Tx, WatchKey),
    Watch = erlfdb:watch(Tx, WatchKey),
    case erlfdb:wait(GetFuture) of
        not_found ->
            {not_found, Watch};
        Packed ->
            {VS} = erlfdb_tuple:unpack(Packed),
            {VS, Watch}
    end.
% List jobs of Type with activity entries at or after Versionstamp.
% Returns {#{JobId => EncodedJobData}, LastSeq} where LastSeq is the
% sequence of the newest activity row seen, or Versionstamp itself when
% the range is empty.
get_active_since(#{jtx := true} = JTx, Type, Versionstamp, Opts) ->
    #{tx := Tx, jobs_path := Jobs} = get_jtx(JTx),
    Prefix = erlfdb_tuple:pack({?ACTIVITY}, Jobs),
    StartKey = erlfdb_tuple:pack({Type, Versionstamp}, Prefix),
    StartKeySel = erlfdb_key:first_greater_or_equal(StartKey),
    {_, EndKey} = erlfdb_tuple:range({Type}, Prefix),
    Future = erlfdb:get_range(Tx, StartKeySel, EndKey, Opts),
    {JobIdsData, LastSeq} = lists:mapfoldl(
        fun({K, V}, _PrevSeq) ->
            % Type is already bound here, so this match also asserts the
            % key belongs to the requested type.
            {Type, Seq} = erlfdb_tuple:unpack(K, Prefix),
            {erlfdb_tuple:unpack(V), Seq}
        end,
        Versionstamp,
        erlfdb:wait(Future)
    ),
    {maps:from_list(JobIdsData), LastSeq}.
% List ids of jobs of Type whose activity entries are at or before
% Versionstamp, i.e. jobs that have not reported activity since then.
get_inactive_since(#{jtx := true} = JTx, Type, Versionstamp, Opts) ->
    #{tx := Tx, jobs_path := JobsPath} = get_jtx(JTx),
    Prefix = erlfdb_tuple:pack({?ACTIVITY}, JobsPath),
    {StartKey, _} = erlfdb_tuple:range({Type}, Prefix),
    EndKey = erlfdb_tuple:pack({Type, Versionstamp}, Prefix),
    EndKeySel = erlfdb_key:first_greater_than(EndKey),
    Rows = erlfdb:wait(erlfdb:get_range(Tx, StartKey, EndKeySel, Opts)),
    % Activity values are {JobId, Data} tuples; keep only the id.
    [element(1, erlfdb_tuple:unpack(V)) || {_K, V} <- Rows].
% Force the given (presumed timed-out) jobs back into the pending
% queue: clear each job's activity entry and re-enqueue it, then bump
% the type's watch once if anything might have changed. Jobs that no
% longer exist are skipped silently.
re_enqueue_inactive(#{jtx := true} = JTx, Type, JobIds) when is_list(JobIds) ->
    #{tx := Tx} = get_jtx(JTx),
    lists:foreach(
        fun(JobId) ->
            case get_job_val(Tx, job_key(JTx, Type, JobId)) of
                #jv{seq = Seq, stime = STime, data = Data} ->
                    clear_activity(JTx, Type, Seq),
                    maybe_enqueue(JTx, Type, JobId, STime, true, Data);
                not_found ->
                    ok
            end
        end,
        JobIds
    ),
    % Only touch the activity watch when something may have been
    % re-enqueued. Pattern matching avoids the O(n) length/1 traversal
    % the previous `length(JobIds) > 0` check performed.
    case JobIds of
        [] -> ok;
        [_ | _] -> update_watch(JTx, Type)
    end.
% Cache initialization API. Called from the supervisor just to create the ETS
% table. It returns `ignore` to tell the supervisor that it won't actually
% start any process, which is what we want here.
%
% Create the public ETS table backing the JTx cache. Called from a
% supervisor child spec; returns `ignore` so no process is started.
init_cache() ->
    TableOpts = [
        public,
        named_table,
        {read_concurrency, true},
        {write_concurrency, true}
    ],
    ets:new(?MODULE, TableOpts),
    ignore.
% Functions to encode / decode JobData
%
% Encode job data (a map) as a JSON binary. On failure raises
% error:{json_encoding_error, Reason}, which callers convert into an
% {error, _} tuple at the API boundary.
encode_data(#{} = JobData) ->
    try
        iolist_to_binary(jiffy:encode(JobData, [force_utf8]))
    catch
        throw:{error, Error} ->
            % legacy clause since new versions of jiffy raise error instead
            error({json_encoding_error, Error});
        error:Error ->
            error({json_encoding_error, Error})
    end.
% Decode stored job data. not_found and already-decoded maps pass
% through unchanged; JSON binaries are parsed into maps.
decode_data(not_found) ->
    not_found;
decode_data(#{} = Decoded) ->
    Decoded;
decode_data(<<_/binary>> = Encoded) ->
    jiffy:decode(Encoded, [dedupe_keys, return_maps]).
% Cached job transaction object. This object wraps a transaction, caches the
% directory lookup path, and the metadata version. The function can be used
% from inside or outside the transaction. When used from a transaction it will
% verify if the metadata was changed, and will refresh automatically.
%
% Fetch the cached jobs transaction context from outside a transaction.
get_jtx() ->
    NoTx = undefined,
    get_jtx(NoTx).
% Build/fetch a jobs transaction context from a Db map, a raw erlfdb
% transaction, or from outside a transaction (undefined). Contexts are
% cached in ETS with the tx field blanked out. When called inside a
% transaction the cached copy is validated against the current metadata
% version and rebuilt if stale (see ensure_current/1).
get_jtx(#{tx := Tx} = _TxDb) ->
    get_jtx(Tx);
get_jtx(undefined = _Tx) ->
    case ets:lookup(?MODULE, ?JOBS_ETS_KEY) of
        [{_, #{} = JTx}] ->
            JTx;
        [] ->
            % Cache miss: build the context in a one-off transaction.
            JTx = update_jtx_cache(init_jtx(undefined)),
            JTx#{tx := undefined}
    end;
get_jtx({erlfdb_transaction, _} = Tx) ->
    case ets:lookup(?MODULE, ?JOBS_ETS_KEY) of
        [{_, #{} = JTx}] ->
            ensure_current(JTx#{tx := Tx});
        [] ->
            update_jtx_cache(init_jtx(Tx))
    end.
% Transaction processing to be used with couch jobs' specific transaction
% contexts
%
% Run TxFun inside a transaction using the couch_jobs context.
tx(#{jtx := true} = JTx, TxFun) when is_function(TxFun, 1) ->
    fabric2_fdb:transactional(JTx, TxFun).
% Debug and testing API
% Debug/testing helper: fetch one job as a map with decoded data and
% derived state, or not_found. Runs in its own transaction.
get_job(Type, JobId) ->
    fabric2_fdb:transactional(fun(Tx) ->
        JTx = init_jtx(Tx),
        case get_job_val(Tx, job_key(JTx, Type, JobId)) of
            #jv{seq = Seq, jlock = JLock} = JV ->
                #{
                    job => true,
                    type => Type,
                    id => JobId,
                    seq => Seq,
                    jlock => JLock,
                    stime => JV#jv.stime,
                    resubmit => JV#jv.resubmit,
                    data => decode_data(JV#jv.data),
                    state => job_state(JLock, Seq)
                };
            not_found ->
                not_found
        end
    end).
% Debug/testing helper: list every job of every type as
% {Type, JobId, State, DecodedData} tuples, in its own transaction.
get_jobs() ->
    fabric2_fdb:transactional(fun(Tx) ->
        #{jobs_path := JobsPath} = init_jtx(Tx),
        Prefix = erlfdb_tuple:pack({?DATA}, JobsPath),
        RangeOpts = [{streaming_mode, want_all}],
        Rows = erlfdb:wait(erlfdb:get_range_startswith(Tx, Prefix, RangeOpts)),
        [
            begin
                {Type, JobId} = erlfdb_tuple:unpack(K, Prefix),
                {Seq, JLock, _, _, Data} = erlfdb_tuple:unpack(V),
                {Type, JobId, job_state(JLock, Seq), decode_data(Data)}
            end
         || {K, V} <- Rows
        ]
    end).
% Call this function if the top level "couchdb" FDB directory layer
% changes.
%
% Bump the shared metadata version in a one-off transaction.
bump_metadata_version() ->
    fabric2_fdb:transactional(fun bump_metadata_version/1).
% Stamp the metadata key with a fresh versionstamp so every cached JTx
% (on this and other nodes) notices the change and rebuilds its context.
% The 14-byte zero placeholder (<<0:112>>) is what the versionstamped
% value mutation overwrites — presumably 10 bytes of versionstamp plus
% erlfdb's trailing offset bytes; confirm against the erlfdb docs.
bump_metadata_version(Tx) ->
    erlfdb:set_versionstamped_value(Tx, ?COUCH_JOBS_MD_VERSION, <<0:112>>).
% Private helper functions
% Write a job row that is either finished (Resubmit = false) or pending
% (Resubmit = true). In the pending case the sequence is ?PENDING_SEQ
% and the job is also added to the pending queue; otherwise the
% sequence is null (finished). The stored resubmit flag is always reset
% to false. May raise error:{json_encoding_error, _} via set_job_val/3
% when Data cannot be JSON-encoded.
maybe_enqueue(#{jtx := true} = JTx, Type, JobId, STime, Resubmit, Data) ->
    #{tx := Tx} = JTx,
    Key = job_key(JTx, Type, JobId),
    JV = #jv{
        seq = null,
        jlock = null,
        stime = STime,
        resubmit = false,
        data = Data
    },
    case Resubmit of
        true ->
            set_job_val(Tx, Key, JV#jv{seq = ?PENDING_SEQ}),
            couch_jobs_pending:enqueue(JTx, Type, STime, JobId);
        false ->
            set_job_val(Tx, Key, JV)
    end,
    ok.
% Build the main (?DATA) row key for a job of Type with JobId.
job_key(#{jtx := true, jobs_path := JobsPath}, Type, JobId) ->
    erlfdb_tuple:pack({?DATA, Type, JobId}, JobsPath).
% Build the main row key directly from a job map.
job_key(JTx, #{type := Type, id := JobId} = _Job) ->
    job_key(JTx, Type, JobId).
% Read and unpack a job's main row into a #jv{} record. Accepts either
% a JTx context plus a job map, or a raw transaction plus a packed key.
% Returns not_found when the row does not exist. The data field is left
% JSON-encoded.
get_job_val(#{jtx := true, tx := Tx} = JTx, #{job := true} = Job) ->
    get_job_val(Tx, job_key(JTx, Job));
get_job_val(Tx = {erlfdb_transaction, _}, Key) ->
    case erlfdb:wait(erlfdb:get(Tx, Key)) of
        <<_/binary>> = Val ->
            % Row layout: (Sequence, Lock, SchedTime, Resubmit, JobData)
            {Seq, JLock, STime, Resubmit, Data} = erlfdb_tuple:unpack(Val),
            #jv{
                seq = Seq,
                jlock = JLock,
                stime = STime,
                resubmit = Resubmit,
                data = Data
            };
        not_found ->
            not_found
    end.
% Pack a #jv{} record and write it as the job's main row. Map data is
% JSON-encoded first (may raise error:{json_encoding_error, _}). When
% seq is ?UNSET_VS the row is written as a versionstamped value, which
% is how running jobs get a unique, monotonically increasing sequence.
set_job_val(Tx = {erlfdb_transaction, _}, Key, #jv{} = JV) ->
    #jv{
        seq = Seq,
        jlock = JLock,
        stime = STime,
        resubmit = Resubmit,
        data = Data0
    } = JV,
    Data =
        case Data0 of
            #{} -> encode_data(Data0);
            <<_/binary>> -> Data0
        end,
    case Seq of
        ?UNSET_VS ->
            % pack_vs leaves a placeholder that FDB fills in with the
            % commit versionstamp via set_versionstamped_value.
            Val = erlfdb_tuple:pack_vs({Seq, JLock, STime, Resubmit, Data}),
            erlfdb:set_versionstamped_value(Tx, Key, Val);
        _Other ->
            Val = erlfdb_tuple:pack({Seq, JLock, STime, Resubmit, Data}),
            erlfdb:set(Tx, Key, Val)
    end,
    ok.
% Fetch a job row, returning `halt` when the job vanished or is no
% longer held under the caller's lock.
get_job_or_halt(Tx, Key, JLock) ->
    case get_job_val(Tx, Key) of
        not_found ->
            halt;
        % JLock is bound, so this clause only matches our own lock.
        #jv{jlock = JLock} = JV ->
            JV;
        #jv{} ->
            halt
    end.
% Write a fresh activity row for JobId at the current transaction's
% versionstamp, clearing the previous one (Seq = null means there is
% none), then bump the type's activity watch. This subspace is what
% get_active_since/4 and the activity monitor scan.
update_activity(#{jtx := true} = JTx, Type, JobId, Seq, Data0) ->
    #{tx := Tx, jobs_path := Jobs} = JTx,
    case Seq =/= null of
        true -> clear_activity(JTx, Type, Seq);
        false -> ok
    end,
    Key = erlfdb_tuple:pack_vs({?ACTIVITY, Type, ?UNSET_VS}, Jobs),
    Data =
        case Data0 of
            #{} -> encode_data(Data0);
            <<_/binary>> -> Data0
        end,
    Val = erlfdb_tuple:pack({JobId, Data}),
    erlfdb:set_versionstamped_key(Tx, Key, Val),
    update_watch(JTx, Type).
% Remove the activity row recorded under sequence Seq for Type.
clear_activity(#{jtx := true} = JTx, Type, Seq) ->
    #{tx := Tx, jobs_path := JobsPath} = JTx,
    ActivityKey = erlfdb_tuple:pack({?ACTIVITY, Type, Seq}, JobsPath),
    erlfdb:clear(Tx, ActivityKey).
% Rewrite the per-type activity watch key with a fresh versionstamp so
% outstanding watches on it fire.
update_watch(#{jtx := true} = JTx, Type) ->
    #{tx := Tx, jobs_path := JobsPath} = JTx,
    WatchKey = erlfdb_tuple:pack({?WATCHES_ACTIVITY, Type}, JobsPath),
    WatchVal = erlfdb_tuple:pack_vs({?UNSET_VS}),
    erlfdb:set_versionstamped_value(Tx, WatchKey, WatchVal),
    ok.
% Derive a job's state from its lock and sequence fields:
%   lock == null, seq == null -> finished
%   lock /= null              -> running
%   lock == null, seq /= null -> pending
job_state(null, null) -> finished;
job_state(JLock, _Seq) when JLock =/= null -> running;
job_state(null, Seq) when Seq =/= null -> pending.
% This is a transaction context object similar to the Db = #{} one from
% fabric2_fdb. It is used to cache the jobs path directory (to avoid extra
% lookups on every operation) and to check for metadata changes (in case the
% directory changes).
%
% Build a fresh jobs transaction context: resolve the FDB directory,
% compute the jobs subspace prefix, and record the current metadata
% version. Called with undefined it wraps itself in a one-off
% transaction.
init_jtx(undefined) ->
    fabric2_fdb:transactional(fun(Tx) -> init_jtx(Tx) end);
init_jtx({erlfdb_transaction, _} = Tx) ->
    LayerPrefix = fabric2_fdb:get_dir(Tx),
    Jobs = erlfdb_tuple:pack({?JOBS}, LayerPrefix),
    % layer_prefix, md_version and tx here match db map fields in fabric2_fdb
    % but we also assert that this is a job transaction using the jtx => true
    % field
    #{
        jtx => true,
        tx => Tx,
        layer_prefix => LayerPrefix,
        jobs_path => Jobs,
        md_version => get_metadata_version(Tx)
    }.
% Validate the cached JTx at most once per transaction. The process
% dictionary remembers the last transaction handle that was checked, so
% repeated calls within the same transaction skip update_current/1.
ensure_current(#{jtx := true, tx := Tx} = JTx) ->
    case get(?COUCH_JOBS_CURRENT) of
        Tx ->
            JTx;
        _ ->
            JTx1 = update_current(JTx),
            put(?COUCH_JOBS_CURRENT, Tx),
            JTx1
    end.
% Snapshot-read the global metadata version key (get_ss avoids adding a
% read conflict on this hot key).
get_metadata_version({erlfdb_transaction, _} = Tx) ->
    Future = erlfdb:get_ss(Tx, ?COUCH_JOBS_MD_VERSION),
    erlfdb:wait(Future).
% Re-validate the cached metadata version. If it was confirmed recently
% (within ?MD_VERSION_MAX_AGE_SEC) skip the FDB read entirely;
% otherwise compare against the stored version and rebuild the cached
% context when it changed.
update_current(#{tx := Tx, md_version := Version} = JTx) ->
    case get_md_version_age(Version) of
        Age when Age =< ?MD_VERSION_MAX_AGE_SEC ->
            % Looked it up not too long ago. Avoid looking it up too frequently
            JTx;
        _ ->
            case get_metadata_version(Tx) of
                Version ->
                    % Unchanged: just refresh the confirmation timestamp.
                    update_md_version_timestamp(Version),
                    JTx;
                _NewVersion ->
                    update_jtx_cache(init_jtx(Tx))
            end
    end.
% Store JTx in the ETS cache with the tx handle blanked out and refresh
% the metadata version timestamp. Returns the original JTx unchanged.
update_jtx_cache(#{jtx := true, md_version := Version} = JTx) ->
    ets:insert(?MODULE, {?JOBS_ETS_KEY, JTx#{tx := undefined}}),
    update_md_version_timestamp(Version),
    JTx.
% Seconds since the given metadata version was last confirmed. A
% missing or mismatched cache entry is treated as timestamp 0, which
% yields a large age and forces a re-check.
get_md_version_age(Version) ->
    Now = erlang:system_time(second),
    case ets:lookup(?MODULE, ?MD_TIMESTAMP_ETS_KEY) of
        [{_, Version, Ts}] -> Now - Ts;
        _ -> Now - 0
    end.
% Record "now" as the time this metadata version was last confirmed.
update_md_version_timestamp(Version) ->
    Now = erlang:system_time(second),
    ets:insert(?MODULE, {?MD_TIMESTAMP_ETS_KEY, Version, Now}).
% Pick the effective job data: keep the existing data when no
% replacement was provided (undefined), otherwise take the replacement.
update_job_data(Data, NewData) ->
    case NewData of
        undefined -> Data;
        _ -> NewData
    end.