% blob: 78675a03fa46522e994128a465e824747835f976 [file] [log] [blame]
% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
% http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.
-module(fabric2_index).
-behaviour(gen_server).
-export([
register_index/1,
db_updated/1,
cleanup/1,
start_link/0
]).
-export([
init/1,
terminate/2,
handle_call/3,
handle_cast/2,
handle_info/2,
code_change/3
]).
-include_lib("couch/include/couch_db.hrl").
-include_lib("kernel/include/logger.hrl").
% Callbacks implemented by index modules registered via register_index/1.
%
% build_indices/2 is called by this module's build_indices/2 for every
% registered module, with a transactional db handle and a non-empty list of
% design docs. The lists returned by all modules are concatenated.
-callback build_indices(Db :: map(), DDocs :: list(#doc{})) ->
[{ok, JobId :: binary()} | {error, any()}].
% cleanup_indices/2 is called by this module's cleanup_indices/2 for every
% registered module, with a transactional db handle and the design docs
% returned by fabric2_db:get_design_docs/1. This module ignores the result.
-callback cleanup_indices(Db :: map(), DDocs :: list(#doc{})) ->
[ok | {error, any()}].
% Number of ETS tables that updated database names are hashed across; one
% process_loop/1 process is spawned per table.
-define(SHARDS, 32).
% Default for the "index_updater_delay_msec" setting: how old an update entry
% must be before process_updates/2 picks it up.
-define(DEFAULT_DELAY_MSEC, 60000).
% Default for the "index_updater_resolution_msec" setting: base sleep time
% between two passes of process_loop/1 (random jitter is added on top).
-define(DEFAULT_RESOLUTION_MSEC, 10000).
% Add `Mod` to the list of index modules stored in the `fabric` application
% environment under `indices`. The stored list is kept sorted and free of
% duplicates, so registering the same module twice is harmless.
register_index(Mod) when is_atom(Mod) ->
    Current = registrations(),
    Updated = lists:usort([Mod | Current]),
    application:set_env(fabric, indices, Updated).
% Record that `DbName` was updated. The name is hashed to one of the ?SHARDS
% tables and stored together with the current monotonic time. insert_new is
% used, so an entry that is already queued keeps its original timestamp.
% Returns the boolean result of ets:insert_new/2.
db_updated(DbName) when is_binary(DbName) ->
    ShardId = erlang:phash2(DbName) rem ?SHARDS,
    Timestamp = now_msec(),
    ets:insert_new(table(ShardId), {DbName, Timestamp}).
% Run index cleanup for `Db`: inside one transaction, fetch the database's
% design docs and pass them to cleanup_indices/2 of every registered index
% module.
%
% A `database_does_not_exist` error is ignored and yields `ok`. Any other
% exception is logged (through both logger and couch_log) and is not
% re-raised, so callers never see a crash from this function.
cleanup(Db) ->
    try
        fabric2_fdb:transactional(Db, fun(TxDb) ->
            % Read the design docs through TxDb, not the outer Db, so the
            % read uses the same transaction handle as the cleanup itself.
            % process_db/1 does the same.
            DDocs = fabric2_db:get_design_docs(TxDb),
            cleanup_indices(TxDb, DDocs)
        end)
    catch
        error:database_does_not_exist ->
            ok;
        Tag:Reason:Stack ->
            DbName = fabric2_db:name(Db),
            ?LOG_ERROR(#{
                what => index_cleanup_failure,
                db => DbName,
                tag => Tag,
                details => Reason,
                stacktrace => Stack
            }),
            LogMsg = "~p failed to cleanup indices for `~s` ~p:~p ~p",
            couch_log:error(LogMsg, [?MODULE, DbName, Tag, Reason, Stack])
    end.
% Start the index updater gen_server, registered locally under the module
% name.
start_link() ->
    ServerName = {local, ?MODULE},
    gen_server:start_link(ServerName, ?MODULE, [], []).
% gen_server init: create the shard tables and spawn one linked
% process_loop/1 worker per table. The server itself keeps no state.
init(_) ->
    Tables = create_tables(),
    StartWorker = fun(Table) ->
        spawn_link(fun() -> process_loop(Table) end)
    end,
    lists:foreach(StartWorker, Tables),
    {ok, nil}.
% gen_server terminate callback: nothing to release.
terminate(_Reason, _State) ->
    ok.
% The server accepts no calls: any call is answered with `{bad_call, Msg}`
% and stops the server with that same term as the reason.
handle_call(Msg, _From, St) ->
    Unexpected = {bad_call, Msg},
    {stop, Unexpected, Unexpected, St}.
% The server accepts no casts: any cast stops it with `{bad_cast, Msg}`.
handle_cast(Msg, St) ->
    StopReason = {bad_cast, Msg},
    {stop, StopReason, St}.
% The server expects no plain messages: any message stops it with
% `{bad_info, Msg}`.
handle_info(Msg, St) ->
    StopReason = {bad_info, Msg},
    {stop, StopReason, St}.
% gen_server code_change callback: the state is carried over unchanged.
code_change(_OldVsn, State, _Extra) ->
    {ok, State}.
% Create one named, public ETS table per shard (ids 0 .. ?SHARDS - 1) and
% return the list of ets:new/2 results in shard order.
create_tables() ->
    Opts = [
        named_table,
        public,
        {write_concurrency, true},
        {read_concurrency, true}
    ],
    ShardIds = lists:seq(0, ?SHARDS - 1),
    lists:map(fun(Id) -> ets:new(table(Id), Opts) end, ShardIds).
% Map a shard id in the range 0 .. ?SHARDS - 1 to the name of its ETS table,
% e.g. 3 -> fabric2_index_3. The guard keeps the set of generated atoms
% bounded.
table(Id) when is_integer(Id), Id >= 0, Id < ?SHARDS ->
    Suffix = integer_to_list(Id),
    list_to_atom("fabric2_index_" ++ Suffix).
% Endless worker loop for one shard table. Each pass either processes and
% then cleans entries older than the configured delay (updater enabled), or
% just drops everything queued before now (updater disabled). It then sleeps
% for the configured resolution plus random jitter before the next pass.
process_loop(Table) ->
    Now = now_msec(),
    Since = Now - delay_msec(),
    run_pass(is_enabled(), Table, Now, Since),
    Resolution = resolution_msec(),
    % Jitter spreads the wake-ups of the per-shard workers.
    Jitter = rand:uniform(1 + Resolution div 2),
    timer:sleep(Resolution + Jitter),
    process_loop(Table).

% One pass over `Table`; the first argument says whether the updater is
% enabled.
run_pass(true, Table, _Now, Since) ->
    process_updates(Table, Since),
    clean_stale(Table, Since);
run_pass(false, Table, Now, _Since) ->
    clean_stale(Table, Now).
% Delete every entry of `Table` whose timestamp is strictly older than
% `Since`; returns the number of deleted entries.
%
% Monotonic time is not strictly monotonic, so entries are processed with
% `=<` but removed with `<`: an update that arrives with the same timestamp
% after processing at that timestamp has started must not be dropped.
clean_stale(Table, Since) ->
    MatchSpec = [{{'_', '$1'}, [{'<', '$1', Since}], [true]}],
    ets:select_delete(Table, MatchSpec).
% Process every database in `Table` whose timestamp is at or before `Since`.
% Names are fetched from ETS in chunks of 25 and handed to
% process_updates_iter/2. Returns `ok`.
%
% NOTE(review): the chunked traversal runs without ets:safe_fixtable/2 while
% other processes insert into the table; confirm that a skipped or repeated
% entry is acceptable here.
process_updates(Table, Since) ->
    MatchSpec = [{{'$1', '$2'}, [{'=<', '$2', Since}], ['$1']}],
    case ets:select(Table, MatchSpec, 25) of
        {DbNames, Cont} -> process_updates_iter(DbNames, Cont);
        '$end_of_table' -> ok
    end.
% Work through one chunk of database names, then fetch the next chunk via the
% ETS continuation until the table is exhausted.
%
% A failure for one database never aborts the pass: a missing database is
% skipped silently, any other exception is logged and the next name is tried.
process_updates_iter([DbName | Remaining], Cont) ->
    try
        process_db(DbName)
    catch
        error:database_does_not_exist ->
            ok;
        Tag:Reason:Stack ->
            ?LOG_ERROR(#{
                what => index_build_failure,
                db => DbName,
                tag => Tag,
                details => Reason,
                stacktrace => Stack
            }),
            LogMsg = "~p failed to build indices for `~s` ~p:~p ~p",
            couch_log:error(LogMsg, [?MODULE, DbName, Tag, Reason, Stack])
    end,
    process_updates_iter(Remaining, Cont);
process_updates_iter([], Cont) ->
    % Current chunk is done; ask ETS for the next one.
    case ets:select(Cont) of
        {NextChunk, NextCont} -> process_updates_iter(NextChunk, NextCont);
        '$end_of_table' -> ok
    end.
% Open `DbName` with an admin context and, inside one transaction:
%   1. load the design docs,
%   2. keep those for which should_update/1 holds, in random order,
%   3. pass them to build_indices/2,
%   4. if auto cleanup is configured, run cleanup_indices/2 with the full
%      (unfiltered) design doc list.
process_db(DbName) when is_binary(DbName) ->
    {ok, Db} = fabric2_db:open(DbName, [?ADMIN_CTX]),
    UpdateIndices = fun(TxDb) ->
        AllDDocs = fabric2_db:get_design_docs(TxDb),
        ToBuild = shuffle(lists:filter(fun should_update/1, AllDDocs)),
        build_indices(TxDb, ToBuild),
        case auto_cleanup() of
            false -> ok;
            true -> cleanup_indices(TxDb, AllDDocs)
        end
    end,
    fabric2_fdb:transactional(Db, UpdateIndices).
% Ask every registered index module to build indices for `DDocs` and return
% the concatenation of their result lists. With no design docs nothing is
% called and the result is [].
build_indices(_TxDb, []) ->
    [];
build_indices(TxDb, DDocs) ->
    PerModule = [Mod:build_indices(TxDb, DDocs) || Mod <- registrations()],
    lists:append(PerModule).
% Call cleanup_indices/2 of every registered index module with `DDocs`.
% The modules' results are discarded; returns `ok`.
cleanup_indices(TxDb, DDocs) ->
    Cleanup = fun(Mod) -> Mod:cleanup_indices(TxDb, DDocs) end,
    lists:foreach(Cleanup, registrations()).
% Index modules currently registered in the `fabric` application environment;
% [] when none have been registered.
registrations() ->
    case application:get_env(fabric, indices) of
        {ok, Indices} -> Indices;
        undefined -> []
    end.
% Value of the design doc's "autoupdate" property, defaulting to `true` when
% the property is absent. Used as the predicate that selects design docs for
% background index building.
should_update(#doc{body = {Props}}) ->
    Key = <<"autoupdate">>,
    couch_util:get_value(Key, Props, true).
% Return `Items` in random order: pair every item with a random float, sort
% the pairs, then strip the floats again.
shuffle(Items) ->
    Keyed = lists:map(fun(Item) -> {rand:uniform(), Item} end, Items),
    [Item || {_Key, Item} <- lists:sort(Keyed)].
% Current Erlang monotonic time in milliseconds. Only differences between two
% readings are meaningful.
now_msec() ->
    Unit = millisecond,
    erlang:monotonic_time(Unit).
% Whether background index updating is switched on
% ("fabric" / "index_updater_enabled", default true).
is_enabled() ->
    Default = true,
    config:get_boolean("fabric", "index_updater_enabled", Default).
% Minimum age in milliseconds an update entry must reach before it is
% processed ("fabric" / "index_updater_delay_msec").
delay_msec() ->
    Key = "index_updater_delay_msec",
    config:get_integer("fabric", Key, ?DEFAULT_DELAY_MSEC).
% Base sleep time in milliseconds between two passes of a worker loop
% ("fabric" / "index_updater_resolution_msec").
resolution_msec() ->
    Key = "index_updater_resolution_msec",
    config:get_integer("fabric", Key, ?DEFAULT_RESOLUTION_MSEC).
% Whether each update pass also runs index cleanup
% ("fabric" / "index_updater_remove_old_indices", default false).
auto_cleanup() ->
    Default = false,
    config:get_boolean("fabric", "index_updater_remove_old_indices", Default).