% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
% http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.
-module(ken_server).
% gen_server boilerplate
-behaviour(gen_server).
-vsn(1).
-export([init/1, terminate/2]).
-export([handle_call/3, handle_cast/2, handle_info/2, code_change/3]).
% Public interface
-export([start_link/0]).
-export([add/1]).
-export([remove/1]).
-export([add_all_shards/1]).
-export([set_batch_size/1]).
-export([set_delay/1]).
-export([set_limit/1]).
-export([set_prune_interval/1]).
% exports for spawn
-export([update_db_indexes/2]).
-record(job, {
    name, % {DbName, GroupId} for views; {DbName, DDocId, IndexId} for search; {DbName, DDocId, hastings} for spatial (hastings) indexes
server, % Pid of either view group or search index
worker_pid = nil,
seq = 0,
lru = erlang:monotonic_time()
}).
-record(state, {
    q = queue:new(), % pending shard names waiting to be scanned
    dbworker = nil, % {DbName, Pid} of the running update_db_indexes/2 worker, or nil
    limit = 20, % see set_limit/1
    delay = 5000, % milliseconds, see set_delay/1
    batch_size = 1, % see set_batch_size/1
    prune_interval = 60000, % milliseconds, see set_prune_interval/1
    pruned_last % monotonic time of the last ken_workers prune
}).
-include_lib("couch/include/couch_db.hrl").
-include_lib("mem3/include/mem3.hrl").
-ifdef(HAVE_DREYFUS).
-include_lib("dreyfus/include/dreyfus.hrl").
-endif.
-ifdef(HAVE_HASTINGS).
-include_lib("hastings/src/hastings.hrl").
-endif.
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
%% @doc Adds a database shard to be indexed
-spec add(binary()) -> ok.
add(DbName) ->
gen_server:cast(?MODULE, {add, DbName}).
%% @doc Removes all the pending jobs for a database shard.
-spec remove(binary()) -> ok.
remove(DbName) ->
gen_server:cast(?MODULE, {remove, DbName}).
%% @doc Adds all the shards of a clustered database to be indexed, by casting
%% ken_server:add/1 on the node that owns each shard.
-spec add_all_shards(binary()) -> ok.
add_all_shards(DbName) ->
try
Shards = mem3:shards(mem3:dbname(DbName)),
lists:map(fun(Shard) ->
rexi:cast(Shard#shard.node, {ken_server, add, [Shard#shard.name]})
end, Shards)
catch error:database_does_not_exist ->
ok
end.
%% @doc Changes the configured value for a batch size.
%% Returns previous value.
-spec set_batch_size(pos_integer()) -> pos_integer().
set_batch_size(BS) when is_integer(BS), BS > 0 ->
gen_server:call(?MODULE, {set_batch_size, BS}).
%% @doc Changes the configured value for a delay between batches.
%% Returns previous value.
-spec set_delay(non_neg_integer()) -> non_neg_integer().
set_delay(Delay) when is_integer(Delay), Delay >= 0 ->
gen_server:call(?MODULE, {set_delay, Delay}).
%% @doc Changes the configured value for a limit.
%% Returns previous value.
-spec set_limit(pos_integer()) -> pos_integer().
set_limit(Limit) when is_integer(Limit), Limit > 0 ->
gen_server:call(?MODULE, {set_limit, Limit}).
%% @doc Changes the configured value for a prune interval.
%% Returns previous value.
-spec set_prune_interval(pos_integer()) -> pos_integer().
set_prune_interval(Interval) when is_integer(Interval), Interval > 1000 ->
gen_server:call(?MODULE, {set_prune_interval, Interval}).
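%% Illustrative usage (e.g. from a remote shell); the shard name below is
%% hypothetical:
%%   ken_server:add_all_shards(<<"mydb">>),
%%   ken_server:add(<<"shards/00000000-7fffffff/mydb.1565450705">>),
%%   OldBS = ken_server:set_batch_size(10), % setters return the previous value
%%   ken_server:set_delay(1000).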
%% gen_server callbacks
init(_) ->
erlang:send(self(), start_event_handler),
    ets:new(ken_pending, [named_table]), % dbs queued for an index scan
    ets:new(ken_resubmit, [named_table]), % dbs with a delayed resubmit in flight
    ets:new(ken_workers, [named_table, public, {keypos, #job.name}]), % one #job per index
Limit = list_to_integer(config("limit", "20")),
{ok, #state{pruned_last = erlang:monotonic_time(), limit = Limit}}.
terminate(_Reason, _State) ->
ok.
handle_call({set_batch_size, BS}, _From, #state{batch_size = Old} = State) ->
{reply, Old, State#state{batch_size = BS}, 0};
handle_call({set_delay, Delay}, _From, #state{delay = Old} = State) ->
{reply, Old, State#state{delay = Delay}, 0};
handle_call({set_limit, Limit}, _From, #state{limit = Old} = State) ->
{reply, Old, State#state{limit = Limit}, 0};
handle_call({set_prune_interval, Interval}, _From, State) ->
Old = State#state.prune_interval,
{reply, Old, State#state{prune_interval = Interval}, 0};
handle_call(Msg, From, State) ->
{stop, {unknown_call, Msg, From}, State}.
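% Call and cast handlers return with a 0 timeout, so a 'timeout' message is
% delivered as soon as the mailbox drains. handle_info(timeout, ...) below
% uses that message to prune the worker table and start the next queued job.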
% Queues a DB to (maybe) have indexing jobs spawned.
handle_cast({add, DbName}, State) ->
case ets:insert_new(ken_pending, {DbName}) of
true ->
{noreply, State#state{q = queue:in(DbName, State#state.q)}, 0};
false ->
{noreply, State, 0}
end;
handle_cast({remove, DbName}, State) ->
Q2 = queue:filter(fun(X) -> X =/= DbName end, State#state.q),
ets:delete(ken_pending, DbName),
% Delete search index workers
ets:match_delete(ken_workers, #job{name={DbName,'_','_'}, _='_'}),
% Delete view index workers
ets:match_delete(ken_workers, #job{name={DbName,'_'}, _='_'}),
% TODO kill off active jobs for this DB as well
{noreply, State#state{q = Q2}, 0};
handle_cast({resubmit, DbName}, State) ->
ets:delete(ken_resubmit, DbName),
handle_cast({add, DbName}, State);
% Spatial (st) index job names have 3 elements, the 3rd being 'hastings'. See the job record definition.
handle_cast({trigger_update, #job{name={_, _, hastings}, server=GPid, seq=Seq} = Job}, State) ->
% hastings_index:await will trigger a hastings index update
{Pid, _} = erlang:spawn_monitor(hastings_index, await,
[GPid, Seq]),
Now = erlang:monotonic_time(),
ets:insert(ken_workers, Job#job{worker_pid = Pid, lru = Now}),
{noreply, State, 0};
% search index job names have 3 elements. See job record definition.
handle_cast({trigger_update, #job{name={_,_,_}, server=GPid, seq=Seq} = Job}, State) ->
% dreyfus_index:await will trigger a search index update.
{Pid, _} = erlang:spawn_monitor(dreyfus_index, await,
[GPid, Seq]),
Now = erlang:monotonic_time(),
ets:insert(ken_workers, Job#job{worker_pid = Pid, lru = Now}),
{noreply, State, 0};
handle_cast({trigger_update, #job{name={_,_}, server=SrvPid, seq=Seq} = Job}, State) ->
% couch_index:get_state/2 will trigger a view group index update.
{Pid, _} = erlang:spawn_monitor(couch_index, get_state, [SrvPid, Seq]),
Now = erlang:monotonic_time(),
ets:insert(ken_workers, Job#job{worker_pid = Pid, lru = Now}),
{noreply, State, 0};
handle_cast(Msg, State) ->
{stop, {unknown_cast, Msg}, State}.
handle_info({gen_event_EXIT, ken_event_handler, Reason}, State) ->
couch_log:error("ken_event_handler terminated: ~w", [Reason]),
erlang:send_after(5000, self(), start_event_handler),
    {noreply, State, 0};
handle_info(start_event_handler, State) ->
case ken_event_handler:start_link() of
{ok, _Pid} ->
ok;
Error ->
couch_log:error("ken_event_handler init: ~w", [Error]),
erlang:send_after(5000, self(), start_event_handler)
end,
{noreply, State, 0};
handle_info(timeout, #state{prune_interval = I, pruned_last = Last} = State) ->
Now = erlang:monotonic_time(),
    Interval = erlang:convert_time_unit(
        State#state.prune_interval, millisecond, native),
case Now - Last > Interval of
true ->
NewState = prune_worker_table(State);
_ ->
NewState = State
end,
{noreply, maybe_start_next_queued_job(NewState), I};
handle_info({'DOWN', _, _, Pid, Reason}, #state{dbworker = {Name,Pid}} = St) ->
maybe_resubmit(Name, Reason),
{noreply, St#state{dbworker=nil}, 0};
handle_info({'DOWN', _, _, Pid, Reason}, State) ->
debrief_worker(Pid, Reason, State),
{noreply, State, 0};
handle_info(Msg, State) ->
{stop, {unknown_info, Msg}, State}.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%% private functions
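% Pops the next pending database off the queue and spawns a monitored
% update_db_indexes/2 worker for it, unless one is already in flight or
% every channel is busy.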
maybe_start_next_queued_job(#state{dbworker = {_,_}} = State) ->
State;
maybe_start_next_queued_job(#state{q=Q} = State) ->
IncrementalChannels = list_to_integer(config("incremental_channels", "80")),
BatchChannels = list_to_integer(config("batch_channels", "20")),
TotalChannels = IncrementalChannels + BatchChannels,
case queue:out(Q) of
{{value, DbName}, Q2} ->
case skip_job(DbName) of
true ->
% job is either being resubmitted or ignored, skip it
ets:delete(ken_pending, DbName),
maybe_start_next_queued_job(State#state{q = Q2});
false ->
case get_active_count() of A when A < TotalChannels ->
Args = [DbName, State],
{Pid, _} = spawn_monitor(?MODULE, update_db_indexes, Args),
ets:delete(ken_pending, DbName),
State#state{dbworker = {DbName,Pid}, q = Q2};
_ ->
State#state{q = queue:in_r(DbName, Q2)}
end
end;
{empty, Q} ->
State
end.
skip_job(DbName) ->
ets:member(ken_resubmit, DbName) orelse ignore_db(DbName).
ignore_db(DbName) ->
case config:get("ken.ignore", ?b2l(DbName), false) of
"true" ->
true;
_ ->
false
end.
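% Number of ken_workers entries with a live worker pid, i.e. occupied channels.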
get_active_count() ->
MatchSpec = [{#job{worker_pid='$1', _='_'}, [{is_pid, '$1'}], [true]}],
ets:select_count(ken_workers, MatchSpec).
% If any indexing job fails, resubmit requests for all indexes.
update_db_indexes(Name, State) ->
{ok, DDocs} = design_docs(Name),
    % Visit the design docs in a random order by sorting on a random key.
    RandomSorted = lists:sort([{rand:uniform(), D} || D <- DDocs]),
Resubmit = lists:foldl(fun({_, DDoc}, Acc) ->
JsonDDoc = couch_doc:from_json_obj(DDoc),
case update_ddoc_indexes(Name, JsonDDoc, State) of
ok -> Acc;
_ -> true
end
end, false, RandomSorted),
if Resubmit -> exit(resubmit); true -> ok end.
design_docs(Name) ->
try
case fabric:design_docs(mem3:dbname(Name)) of
{error, {maintenance_mode, _, _Node}} ->
{ok, []};
Else ->
Else
end
catch error:database_does_not_exist ->
{ok, []}
end.
% Returns an error if any job creation fails.
update_ddoc_indexes(Name, #doc{}=Doc, State) ->
{ok, Db} = case couch_db:open_int(Name, []) of
{ok, _} = Resp -> Resp;
Else -> exit(Else)
end,
Seq = couch_db:get_update_seq(Db),
couch_db:close(Db),
ViewUpdated = case should_update(Doc, <<"views">>) of true ->
try couch_mrview_util:ddoc_to_mrst(Name, Doc) of
{ok, MRSt} -> update_ddoc_views(Name, MRSt, Seq, State)
catch _:_ ->
ok
end;
false ->
ok
end,
SearchUpdated = search_updated(Name, Doc, Seq, State),
STUpdated = st_updated(Name, Doc, Seq, State),
case {ViewUpdated, SearchUpdated, STUpdated} of
{ok, ok, ok} -> ok;
_ -> resubmit
end.
-ifdef(HAVE_DREYFUS).
search_updated(Name, Doc, Seq, State) ->
case should_update(Doc, <<"indexes">>) of true ->
try dreyfus_index:design_doc_to_indexes(Doc) of
SIndexes -> update_ddoc_search_indexes(Name, SIndexes, Seq, State)
catch _:_ ->
ok
end;
false ->
ok
end.
-else.
search_updated(_Name, _Doc, _Seq, _State) ->
ok.
-endif.
-ifdef(HAVE_HASTINGS).
st_updated(Name, Doc, Seq, State) ->
case should_update(Doc, <<"st_indexes">>) of true ->
try
hastings_index:design_doc_to_indexes(Doc) of
STIndexes -> update_ddoc_st_indexes(Name, STIndexes, Seq, State)
catch _:_ ->
ok
end;
false ->
ok
end.
-else.
st_updated(_Name, _Doc, _Seq, _State) ->
ok.
-endif.
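% For reference, example design doc bodies and the decision should_update/2
% makes for the <<"views">> index type (<<"indexes">> and <<"st_indexes">>
% behave the same way):
%   no "autoupdate" field              -> update
%   "autoupdate": false                -> skip
%   "autoupdate": {"views": false}     -> skip
%   "autoupdate": {"views": true}      -> update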
should_update(#doc{body={Props}}, IndexType) ->
case couch_util:get_value(<<"autoupdate">>, Props) of
false ->
false;
{AUProps} ->
case couch_util:get_value(IndexType, AUProps) of
false ->
false;
_ ->
true
end;
_ ->
true
end.
update_ddoc_views(Name, MRSt, Seq, State) ->
Language = couch_mrview_index:get(language, MRSt),
Allowed = lists:member(Language, allowed_languages()),
Views = couch_mrview_index:get(views, MRSt),
if Allowed andalso Views =/= [] ->
{ok, Pid} = couch_index_server:get_index(couch_mrview_index, MRSt),
GroupName = couch_mrview_index:get(idx_name, MRSt),
maybe_start_job({Name, GroupName}, Pid, Seq, State);
true -> ok end.
-ifdef(HAVE_DREYFUS).
update_ddoc_search_indexes(DbName, Indexes, Seq, State) ->
if Indexes =/= [] ->
% Spawn a job for each search index in the ddoc
lists:foldl(fun(#index{name=IName, ddoc_id=DDocName}=Index, Acc) ->
case dreyfus_index_manager:get_index(DbName, Index) of
{ok, Pid} ->
case maybe_start_job({DbName, DDocName, IName}, Pid, Seq, State) of
resubmit -> resubmit;
_ -> Acc
end;
_ ->
% If any job fails, retry the db.
resubmit
end end, ok, Indexes);
true -> ok end.
-endif.
-ifdef(HAVE_HASTINGS).
update_ddoc_st_indexes(DbName, Indexes, Seq, State) ->
if Indexes =/= [] ->
% The record name in hastings is #h_idx rather than #index as it is for dreyfus
% Spawn a job for each spatial index in the ddoc
lists:foldl(fun(#h_idx{ddoc_id=DDocName}=Index, Acc) ->
case hastings_index_manager:get_index(DbName, Index) of
{ok, Pid} ->
case maybe_start_job({DbName, DDocName, hastings}, Pid, Seq, State) of
resubmit -> resubmit;
_ -> Acc
end;
_ ->
% If any job fails, retry the db.
resubmit
end end, ok, Indexes);
true -> ok end.
-endif.
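% Rough summary of the scheduling policy implemented below: the first
% batch_channels active jobs may be arbitrarily far behind, the remaining
% incremental_channels are reserved for indexes that are fewer than
% max_incremental_updates seqs behind, and a job that ran recently is only
% restarted once batch_size seqs have accumulated or the configured delay
% has elapsed.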
should_start_job(#job{name = Name, seq = Seq, server = Pid}, State) ->
Threshold = list_to_integer(config("max_incremental_updates", "1000")),
IncrementalChannels = list_to_integer(config("incremental_channels", "80")),
BatchChannels = list_to_integer(config("batch_channels", "20")),
TotalChannels = IncrementalChannels + BatchChannels,
A = get_active_count(),
#state{delay = Delay, batch_size = BS} = State,
case ets:lookup(ken_workers, Name) of
[] ->
if
A < BatchChannels ->
true;
A < TotalChannels ->
case Name of
% st_index name has three elements
{_, _, hastings} ->
{ok, CurrentSeq} = hastings_index:await(Pid, 0),
(Seq - CurrentSeq) < Threshold;
% View name has two elements.
{_,_} ->
% Since seq is 0, couch_index:get_state/2 won't
% spawn an index update.
{ok, MRSt} = couch_index:get_state(Pid, 0),
CurrentSeq = couch_mrview_index:get(update_seq, MRSt),
(Seq - CurrentSeq) < Threshold;
% Search name has three elements.
{_,_,_} ->
{ok, _IndexPid, CurrentSeq} = dreyfus_index:await(Pid, 0),
(Seq - CurrentSeq) < Threshold;
_ -> % Should never happen, but if it does, ignore.
false
end;
true ->
false
end;
[#job{worker_pid = nil, lru = LRU, seq = OldSeq}] ->
Now = erlang:monotonic_time(),
DeltaT = erlang:convert_time_unit(Now - LRU, native, millisecond),
if
A < BatchChannels, (Seq - OldSeq) >= BS ->
true;
A < BatchChannels, DeltaT > Delay ->
true;
A < TotalChannels, (Seq - OldSeq) < Threshold, DeltaT > Delay ->
true;
true ->
false
end;
_ ->
false
end.
maybe_start_job(JobName, IndexPid, Seq, State) ->
Job = #job{
name = JobName,
server = IndexPid,
seq = Seq
},
case should_start_job(Job, State) of
true ->
gen_server:cast(?MODULE, {trigger_update, Job});
false ->
resubmit
end.
debrief_worker(Pid, Reason, _State) ->
case ets:match_object(ken_workers, #job{worker_pid=Pid, _='_'}) of
[#job{name = Name} = Job] ->
case Name of
{DbName,_} ->
maybe_resubmit(DbName, Reason);
{DbName,_,_} ->
maybe_resubmit(DbName, Reason)
end,
ets:insert(ken_workers, Job#job{worker_pid = nil});
[] -> % should never happen, but if it does, ignore
ok
end.
maybe_resubmit(_DbName, normal) ->
ok;
maybe_resubmit(_DbName, {database_does_not_exist, _}) ->
ok;
maybe_resubmit(_DbName, {not_found, no_db_file}) ->
ok;
maybe_resubmit(DbName, resubmit) ->
resubmit(60000, DbName);
maybe_resubmit(DbName, _) ->
resubmit(5000, DbName).
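% erlang:send_after/3 can only send a plain message, so the cast is
% pre-wrapped in the {'$gen_cast', ...} envelope that gen_server:cast/2
% would otherwise add; the net effect is a delayed cast to ourselves.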
resubmit(Delay, DbName) ->
case ets:insert_new(ken_resubmit, {DbName}) of
true ->
erlang:send_after(Delay, ?MODULE, {'$gen_cast', {resubmit, DbName}});
false ->
ok
end.
prune_worker_table(State) ->
% remove all entries older than specified `delay` in milliseconds
Delay = erlang:convert_time_unit(State#state.delay, millisecond, native),
C = erlang:monotonic_time() - Delay,
    %% fun(#job{worker_pid=nil, lru=A}) when A < C -> true end
MatchHead = #job{worker_pid=nil, lru='$1', _='_'},
Guard = {'<', '$1', C},
ets:select_delete(ken_workers, [{MatchHead, [Guard], [true]}]),
State#state{pruned_last = erlang:monotonic_time()}.
allowed_languages() ->
Config = couch_proc_manager:get_servers_from_env("COUCHDB_QUERY_SERVER_") ++
couch_proc_manager:get_servers_from_env("COUCHDB_NATIVE_QUERY_SERVER_"),
Allowed = [list_to_binary(string:to_lower(Lang)) || {Lang, _Cmd} <- Config],
[<<"query">> | Allowed].
config(Key, Default) ->
config:get("ken", Key, Default).
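% Config keys read from the "ken" section by this module, with their defaults:
%   limit = 20, batch_channels = 20, incremental_channels = 80,
%   max_incremental_updates = 1000
% Individual databases can also be skipped via the [ken.ignore] section
% (dbname = true).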
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
prune_old_entries_test_() ->
{
setup,
fun() ->
ets:new(ken_workers, [named_table, public, {keypos, #job.name}])
end,
fun(_) ->
catch ets:delete(ken_workers)
end,
?_test(begin
lists:foreach(fun(Idx) ->
ets:insert(ken_workers, #job{name=Idx}),
timer:sleep(100)
end, lists:seq(1, 3)),
prune_worker_table(#state{delay=250}),
?assertEqual(
[2, 3],
lists:usort(
[N || #job{name = N} <- ets:tab2list(ken_workers)])
),
ok
end)
}.
-endif.