blob: 727c8ddb31a124d4a9c1bcbce71bc6094a1246dc [file] [log] [blame]
% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
% http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.
-module(couch_index_indexer).
-export([start_link/2]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
-record(state, {index,
dbname,
threshold,
refresh_interval,
db_updates=0,
tref=nil,
notifier=nil,
locks}).
start_link(Index, DbName) ->
gen_server:start_link(?MODULE, {Index, DbName}, []).
init({Index, DbName}) ->
process_flag(trap_exit, true),
%% register to config events
Self = self(),
ok = couch_config:register(fun
("couch_index", "threshold") ->
gen_server:cast(Self, config_threshold);
("couch_index", "refresh_interval") ->
gen_server:cast(Self, config_refresh)
end),
%% get defaults
Threshold = get_db_threshold(),
Refresh = get_refresh_interval(),
%% delay background index indexing
self() ! start_indexing,
{ok, #state{index=Index,
dbname=DbName,
threshold=Threshold,
refresh_interval=Refresh,
locks=dict:new()}}.
handle_call({acquire, Pid}, _From, #state{locks=Locks}=State) ->
NLocks = case dict:find(Pid, Locks) of
error ->
dict:store(Pid, {erlang:monitor(process, Pid), 1}, Locks);
{ok, {MRef, Refc}} ->
dict:store(Pid, {MRef, Refc+1}, Locks)
end,
{reply, ok, State#state{locks=NLocks}};
handle_call({release, Pid}, _From, #state{locks=Locks}=State) ->
NLocks = case dict:find(Pid, Locks) of
{ok, {MRef, 1}} ->
erlang:demonitor(MRef, [flush]),
dict:erase(Pid, Locks);
{ok, {MRef, Refc}} ->
dict:store(Pid, {MRef, Refc-1}, Locks);
error ->
Locks
end,
NState = State#state{locks=NLocks},
case should_close() of
true -> {stop, normal, ok, NState};
false -> {reply, ok, NState}
end;
handle_call(stop, _From, State) ->
{stop, normal, ok, State}.
handle_cast(config_threshold, State) ->
Threshold = get_db_threshold(),
{noreply, State#state{threshold=Threshold}};
handle_cast(config_refresh, #state{tref=TRef}=State) ->
R = get_refresh_interval(),
%% stop the old timee
if TRef /= nil ->
erlang:cancel_timer(TRef);
true -> ok
end,
%% start the new timer
NTRef = erlang:start_timer(R, self(), refresh_index),
{noreply, State#state{refresh_interval=R, tref=NTRef}};
handle_cast(updated, #state{index=Index, dbname=DbName,
threshold=Threshold,
db_updates=Updates}=State) ->
NUpdates = Updates + 1,
%% we only update if the number of updates is greater than the
%% threshold.
case NUpdates =:= Threshold of
true ->
refresh_index(DbName, Index),
{noreply, State#state{db_updates=0}};
false ->
{noreply, State#state{db_updates=NUpdates}}
end;
handle_cast(_Msg, State) ->
{noreply, State}.
handle_info(start_indexing, #state{dbname=DbName,
refresh_interval=R}=State) ->
%% start the db notifier to watch db update events
{ok, NotifierPid} = start_db_notifier(DbName),
%% start the timer
TRef = erlang:start_timer(R, self(), refresh_index),
{noreply, State#state{tref=TRef, notifier=NotifierPid}};
handle_info({timeout, TRef, refresh_index}, #state{index=Index,
dbname=DbName,
tref=TRef,
db_updates=N}=State) ->
%% only refresh the index if an update happened
case N > 0 of
true ->
refresh_index(DbName, Index);
false ->
ok
end,
{noreply, #state{db_updates=0}=State};
handle_info({'DOWN', MRef, _, Pid, _}, #state{locks=Locks}=State) ->
NLocks = case dict:find(Pid, Locks) of
{ok, {MRef, _}} ->
dict:erase(Pid, Locks);
error ->
Locks
end,
NState = State#state{locks=NLocks},
case should_close() of
true -> {stop, normal, NState};
false -> {noreply, NState}
end;
handle_info({'EXIT', Pid, _Reason}, #state{notifier=Pid}=State) ->
%% db notifier exited
{stop, normal, State#state{notifier=nil}}.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
terminate(_Reason, #state{tref=TRef, notifier=Pid}) ->
if TRef /= nil ->
erlang:cancel_timer(TRef);
true -> ok
end,
case is_pid(Pid) of
true -> couch_util:shutdown_sync(Pid);
_ -> ok
end,
ok.
%% refresh the index to trigger updates.
refresh_index(Db, Index) ->
UpdateSeq = couch_util:with_db(Db, fun(WDb) ->
couch_db:get_update_seq(WDb)
end),
case catch couch_index:get_state(Index, UpdateSeq) of
{ok, _} -> ok;
Error -> {error, Error}
end.
%% if none has acquired us, we could stop the server.
should_close() ->
case process_info(self(), monitors) of
{monitors, []} -> true;
_ -> false
end.
%% number of max updates before refreshing the index. We don't
%% update the index on each db update. Instead we are waiting for a
%% minimum. If the minimum is not acchieved, the update will happen
%% in the next interval.
get_db_threshold() ->
list_to_integer(
couch_config:get("couch_index", "threshold", "200")
).
%% refresh interval in ms, the interval in which the index will be
%% updated
get_refresh_interval() ->
list_to_integer(
couch_config:get("couch_index", "refresh_interval", "1000")
).
%% db notifier
start_db_notifier(DbName) ->
Self = self(),
couch_db_update_notifier:start_link(fun
({updated, Name}) when Name =:= DbName ->
gen_server:cast(Self, updated);
(_) ->
ok
end).