blob: ab53c59d1e8cadf775334af02f304cfeba9194e5 [file] [log] [blame]
% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
% http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.
-module(couch_jobs_pending).
-export([
enqueue/4,
dequeue/4,
remove/4
]).
-include("couch_jobs.hrl").
-define(RANGE_LIMIT, 1024).
enqueue(#{jtx := true} = JTx, Type, STime, JobId) ->
#{tx := Tx, jobs_path := Jobs} = JTx,
Key = erlfdb_tuple:pack({?PENDING, Type, STime, JobId}, Jobs),
erlfdb:set(Tx, Key, <<>>),
WatchKey = erlfdb_tuple:pack({?WATCHES_PENDING, Type}, Jobs),
erlfdb:add(Tx, WatchKey, 1),
ok.
dequeue(#{jtx := true} = JTx, Type, _, true) ->
#{tx := Tx, jobs_path := Jobs} = JTx,
Prefix = erlfdb_tuple:pack({?PENDING, Type, 0}, Jobs),
case get_random_item(Tx, Prefix) of
{error, not_found} ->
{not_found, get_pending_watch(JTx, Type)};
{ok, PendingKey} ->
erlfdb:clear(Tx, PendingKey),
{JobId} = erlfdb_tuple:unpack(PendingKey, Prefix),
{ok, JobId}
end;
dequeue(#{jtx := true} = JTx, Type, MaxPriority, _) ->
#{tx := Tx, jobs_path := Jobs} = JTx,
Prefix = erlfdb_tuple:pack({?PENDING, Type}, Jobs),
StartKeySel = erlfdb_key:first_greater_than(Prefix),
End = erlfdb_tuple:pack({MaxPriority, <<16#FF>>}, Prefix),
EndKeySel = erlfdb_key:first_greater_or_equal(End),
case clear_random_key_from_range(Tx, StartKeySel, EndKeySel) of
{error, not_found} ->
{not_found, get_pending_watch(JTx, Type)};
{ok, PendingKey} ->
{_, JobId} = erlfdb_tuple:unpack(PendingKey, Prefix),
{ok, JobId}
end.
remove(#{jtx := true} = JTx, Type, JobId, STime) ->
#{tx := Tx, jobs_path := Jobs} = JTx,
Key = erlfdb_tuple:pack({?PENDING, Type, STime, JobId}, Jobs),
erlfdb:clear(Tx, Key).
%% Private functions
% Pick a random item from the range without reading the keys in first. However
% the constraint it that IDs should looks like random UUIDs
get_random_item(Tx, Prefix) ->
Id = fabric2_util:uuid(),
Snapshot = erlfdb:snapshot(Tx),
% Try to be fair and switch evently between trying ids before or after the
% randomly generated one. Otherwise, trying before first, will leave a lot
% of <<"fff...">> IDs in the queue for too long and trying "after" first
% will leave a lot of <"00...">> ones waiting.
case rand:uniform() > 0.5 of
true ->
case get_after(Snapshot, Prefix, Id) of
{error, not_found} -> get_before(Snapshot, Prefix, Id);
{ok, Key} -> {ok, Key}
end;
false ->
case get_before(Snapshot, Prefix, Id) of
{error, not_found} -> get_after(Snapshot, Prefix, Id);
{ok, Key} -> {ok, Key}
end
end.
get_before(Snapshot, Prefix, Id) ->
KSel = erlfdb_key:last_less_or_equal(erlfdb_tuple:pack({Id}, Prefix)),
PrefixSize = byte_size(Prefix),
case erlfdb:wait(erlfdb:get_key(Snapshot, KSel)) of
<<Prefix:PrefixSize/binary, _/binary>> = Key -> {ok, Key};
_ -> {error, not_found}
end.
get_after(Snapshot, Prefix, Id) ->
KSel = erlfdb_key:first_greater_or_equal(erlfdb_tuple:pack({Id}, Prefix)),
PrefixSize = byte_size(Prefix),
case erlfdb:wait(erlfdb:get_key(Snapshot, KSel)) of
<<Prefix:PrefixSize/binary, _/binary>> = Key -> {ok, Key};
_ -> {error, not_found}
end.
% Pick a random key from the range snapshot. Then radomly pick a key to clear.
% Before clearing, ensure there is a read conflict on the key in in case other
% workers have picked the same key.
%
clear_random_key_from_range(Tx, Start, End) ->
Opts = [
{limit, ?RANGE_LIMIT},
{snapshot, true}
],
case erlfdb:wait(erlfdb:get_range(Tx, Start, End, Opts)) of
[] ->
{error, not_found};
[{Key, _}] ->
erlfdb:add_read_conflict_key(Tx, Key),
erlfdb:clear(Tx, Key),
{ok, Key};
[{_, _} | _] = KVs ->
Index = rand:uniform(length(KVs)),
{Key, _} = lists:nth(Index, KVs),
erlfdb:add_read_conflict_key(Tx, Key),
erlfdb:clear(Tx, Key),
{ok, Key}
end.
get_pending_watch(#{jtx := true} = JTx, Type) ->
#{tx := Tx, jobs_path := Jobs} = couch_jobs_fdb:get_jtx(JTx),
Key = erlfdb_tuple:pack({?WATCHES_PENDING, Type}, Jobs),
erlfdb:watch(Tx, Key).