% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
% http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.
-module(couch_index_server).
-behaviour(gen_server).
-behaviour(config_listener).
-vsn(2).
-export([start_link/1, validate/2, get_index/4, get_index/3, get_index/2]).
-export([init/1, terminate/2, code_change/3]).
-export([handle_call/3, handle_cast/2, handle_info/2]).
% Sharding functions
-export([num_servers/0, server_name/1, by_sig/1, by_pid/1, by_db/1]).
-export([aggregate_queue_len/0, names/0]).
% Exported for callbacks
-export([
handle_config_change/5,
handle_config_terminate/3,
handle_db_event/3
]).
-include_lib("couch/include/couch_db.hrl").
-define(RELISTEN_DELAY, 5000).
-record(st, {
root_dir,
server_name,
by_sig,
by_pid,
by_db
}).
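% Start index server shard N, registered locally under server_name(N). Each
% shard owns its own set of ETS tables (see init/1 and the by_* helpers below).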
start_link(N) ->
gen_server:start_link({local, server_name(N)}, ?MODULE, [N], []).
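% Run Module:validate(Db, DDoc) for every indexer module that is enabled
% ("true") in the [indexers] config section. Entries whose module name does
% not resolve to an existing atom are skipped.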
validate(Db, DDoc) ->
LoadModFun = fun
({ModNameList, "true"}) ->
try
[list_to_existing_atom(ModNameList)]
catch
error:badarg ->
[]
end;
({_ModNameList, _Enabled}) ->
[]
end,
ValidateFun = fun(ModName) ->
ModName:validate(Db, DDoc)
end,
EnabledIndexers = lists:flatmap(LoadModFun, config:get("indexers")),
lists:foreach(ValidateFun, EnabledIndexers).
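% get_index/3: resolve the design doc if necessary, then delegate to
% get_index/4. For clustered ("shards/...") database names the design doc is
% fetched via fabric in a monitored helper process, with a 61 second timeout.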
get_index(Module, <<"shards/", _/binary>> = DbName, DDoc) when
is_record(DDoc, doc)
->
get_index(Module, DbName, DDoc, nil);
get_index(Module, <<"shards/", _/binary>> = DbName, DDoc) ->
{Pid, Ref} = spawn_monitor(fun() ->
exit(fabric:open_doc(mem3:dbname(DbName), DDoc, [ejson_body, ?ADMIN_CTX]))
end),
receive
{'DOWN', Ref, process, Pid, {ok, Doc}} ->
get_index(Module, DbName, Doc, nil);
{'DOWN', Ref, process, Pid, Error} ->
Error
after 61000 ->
erlang:demonitor(Ref, [flush]),
{error, timeout}
end;
get_index(Module, DbName, DDoc) when is_binary(DbName) ->
get_index(Module, DbName, DDoc, nil);
get_index(Module, Db, DDoc) ->
get_index(Module, couch_db:name(Db), DDoc).
get_index(Module, DbName, DDoc, Fun) when is_binary(DbName) ->
couch_util:with_db(DbName, fun(Db) ->
get_index(Module, Db, DDoc, Fun)
end);
get_index(Module, Db, DDoc, Fun) when is_binary(DDoc) ->
case couch_db:open_doc(Db, DDoc, [ejson_body, ?ADMIN_CTX]) of
{ok, Doc} -> get_index(Module, Db, Doc, Fun);
Error -> Error
end;
get_index(Module, Db, DDoc, Fun) when is_function(Fun, 1) ->
{ok, InitState} = Module:init(Db, DDoc),
{ok, FunResp} = Fun(InitState),
{ok, Pid} = get_index(Module, InitState),
{ok, Pid, FunResp};
get_index(Module, Db, DDoc, _Fun) ->
{ok, InitState} = Module:init(Db, DDoc),
get_index(Module, InitState).
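% Fast path: look the index pid up in this shard's by_sig table. If the pid is
% already registered, only (re-)add the by_db entry when it is missing;
% otherwise fall through to a gen_server call so the server can open the index.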
get_index(Module, IdxState) ->
DbName = Module:get(db_name, IdxState),
Sig = Module:get(signature, IdxState),
case ets:lookup(by_sig(DbName), {DbName, Sig}) of
[{_, Pid}] when is_pid(Pid) ->
DDocId = Module:get(idx_name, IdxState),
case ets:match_object(by_db(DbName), {DbName, {DDocId, Sig}}) of
[] ->
Args = [Pid, DbName, DDocId, Sig],
gen_server:cast(server_name(DbName), {add_to_ets, Args});
_ ->
ok
end,
{ok, Pid};
_ ->
Args = {Module, IdxState, DbName, Sig},
gen_server:call(server_name(DbName), {get_index, Args}, infinity)
end.
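% ETS layout for shard N:
%   by_sig: {DbName, Sig} -> index pid, or a list of waiting callers while opening
%   by_pid: index pid -> {DbName, Sig}
%   by_db:  bag of DbName -> {DDocId, Sig}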
init([N]) ->
process_flag(trap_exit, true),
ets:new(by_sig(N), [protected, set, named_table]),
ets:new(by_pid(N), [private, set, named_table]),
ets:new(by_db(N), [protected, bag, named_table]),
RootDir = couch_index_util:root_dir(),
% We only need one of the index servers to nuke this on startup.
case N of
1 -> couch_file:init_delete_dir(RootDir);
_ -> ok
end,
St = #st{
root_dir = RootDir,
server_name = server_name(N),
by_sig = by_sig(N),
by_pid = by_pid(N),
by_db = by_db(N)
},
ok = config:listen_for_changes(?MODULE, St),
couch_event:link_listener(?MODULE, handle_db_event, St, [all_dbs]),
{ok, St}.
terminate(_Reason, State) ->
Pids = [Pid || {Pid, _} <- ets:tab2list(State#st.by_pid)],
lists:map(fun couch_util:shutdown_sync/1, Pids),
ok.
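% While an index is being opened, the by_sig entry holds the list of callers
% waiting on it. new_index/1 reports back with async_open or async_error, which
% replies to all waiters and either records the pid or drops the entry.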
handle_call({get_index, {_Mod, _IdxState, DbName, Sig} = Args}, From, State) ->
case ets:lookup(State#st.by_sig, {DbName, Sig}) of
[] ->
spawn_link(fun() -> new_index(Args) end),
ets:insert(State#st.by_sig, {{DbName, Sig}, [From]}),
{noreply, State};
[{_, Waiters}] when is_list(Waiters) ->
ets:insert(State#st.by_sig, {{DbName, Sig}, [From | Waiters]}),
{noreply, State};
[{_, Pid}] when is_pid(Pid) ->
{reply, {ok, Pid}, State}
end;
handle_call({async_open, {DbName, DDocId, Sig}, {ok, Pid}}, _From, State) ->
[{_, Waiters}] = ets:lookup(State#st.by_sig, {DbName, Sig}),
[gen_server:reply(From, {ok, Pid}) || From <- Waiters],
link(Pid),
add_to_ets(DbName, Sig, DDocId, Pid, State),
{reply, ok, State};
handle_call({async_error, {DbName, _DDocId, Sig}, Error}, _From, State) ->
[{_, Waiters}] = ets:lookup(State#st.by_sig, {DbName, Sig}),
[gen_server:reply(From, Error) || From <- Waiters],
ets:delete(State#st.by_sig, {DbName, Sig}),
{reply, ok, State};
handle_call({reset_indexes, DbName}, _From, State) ->
reset_indexes(DbName, State),
{reply, ok, State}.
handle_cast({reset_indexes, DbName}, State) ->
reset_indexes(DbName, State),
{noreply, State};
handle_cast({add_to_ets, [Pid, DbName, DDocId, Sig]}, State) ->
% check if Pid still exists
case ets:lookup(State#st.by_pid, Pid) of
[{Pid, {DbName, Sig}}] when is_pid(Pid) ->
ets:insert(State#st.by_db, {DbName, {DDocId, Sig}});
_ ->
ok
end,
{noreply, State};
handle_cast({rem_from_ets, [DbName, DDocId, Sig]}, State) ->
ets:delete_object(State#st.by_db, {DbName, {DDocId, Sig}}),
{noreply, State}.
handle_info({'EXIT', Pid, Reason}, Server) ->
case ets:lookup(Server#st.by_pid, Pid) of
[{Pid, {DbName, Sig}}] ->
DDocIds = [
DDocId
|| {_, {DDocId, _}} <-
ets:match_object(Server#st.by_db, {DbName, {'$1', Sig}})
],
rem_from_ets(DbName, Sig, DDocIds, Pid, Server);
[] when Reason /= normal ->
exit(Reason);
_Else ->
ok
end,
{noreply, Server};
handle_info(restart_config_listener, State) ->
    ok = config:listen_for_changes(?MODULE, State),
    {noreply, State};
handle_info(Msg, State) ->
couch_log:warning("~p did not expect ~p", [?MODULE, Msg]),
{noreply, State}.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
handle_config_change("couchdb", "index_dir", RootDir, _, #st{root_dir = RootDir} = St) ->
{ok, St};
handle_config_change("couchdb", "view_index_dir", RootDir, _, #st{root_dir = RootDir} = St) ->
{ok, St};
handle_config_change("couchdb", "index_dir", _, _, St) ->
exit(whereis(St#st.server_name), config_change),
remove_handler;
handle_config_change("couchdb", "view_index_dir", _, _, St) ->
exit(whereis(St#st.server_name), config_change),
remove_handler;
handle_config_change(_, _, _, _, St) ->
{ok, St}.
handle_config_terminate(_, stop, _) ->
ok;
handle_config_terminate(_Server, _Reason, State) ->
erlang:send_after(?RELISTEN_DELAY, whereis(State#st.server_name), restart_config_listener),
{ok, State}.
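% Runs in a process spawn_link'ed from handle_call({get_index, ...}); reports
% the result back to the owning shard so it can reply to the queued waiters.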
new_index({Mod, IdxState, DbName, Sig}) ->
DDocId = Mod:get(idx_name, IdxState),
case couch_index:start_link({Mod, IdxState}) of
{ok, Pid} ->
ok = gen_server:call(
server_name(DbName), {async_open, {DbName, DDocId, Sig}, {ok, Pid}}
),
unlink(Pid);
Error ->
ok = gen_server:call(
server_name(DbName), {async_error, {DbName, DDocId, Sig}, Error}
)
end.
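% The database was created or deleted: shut down every index process for it,
% drop its ETS entries, and (from shard 1 only) delete its index files on disk.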
reset_indexes(DbName, #st{} = State) ->
% shutdown all the updaters and clear the files, the db got changed
SigDDocIds = lists:foldl(
fun({_, {DDocId, Sig}}, DDict) ->
dict:append(Sig, DDocId, DDict)
end,
dict:new(),
ets:lookup(State#st.by_db, DbName)
),
Fun = fun({Sig, DDocIds}) ->
[{_, Pid}] = ets:lookup(State#st.by_sig, {DbName, Sig}),
unlink(Pid),
gen_server:cast(Pid, delete),
receive
{'EXIT', Pid, _} ->
ok
after 0 ->
ok
end,
rem_from_ets(DbName, Sig, DDocIds, Pid, State)
end,
lists:foreach(Fun, dict:to_list(SigDDocIds)),
% We only need one of the index servers to do this.
case State#st.server_name == server_name(1) of
true ->
Path = couch_index_util:index_dir("", DbName),
couch_file:nuke_dir(State#st.root_dir, Path);
false ->
ok
end.
add_to_ets(DbName, Sig, DDocId, Pid, #st{} = St) ->
ets:insert(St#st.by_sig, {{DbName, Sig}, Pid}),
ets:insert(St#st.by_pid, {Pid, {DbName, Sig}}),
ets:insert(St#st.by_db, {DbName, {DDocId, Sig}}).
rem_from_ets(DbName, Sig, DDocIds, Pid, #st{} = St) ->
ets:delete(St#st.by_sig, {DbName, Sig}),
ets:delete(St#st.by_pid, Pid),
lists:foreach(
fun(DDocId) ->
ets:delete_object(St#st.by_db, {DbName, {DDocId, Sig}})
end,
DDocIds
).
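% couch_event callbacks. created/deleted reset all indexes for the database;
% ddoc_updated notifies the affected couch_index processes, taking care not to
% close an index whose signature is still shared by another design doc.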
handle_db_event(DbName, created, St) ->
gen_server:cast(St#st.server_name, {reset_indexes, DbName}),
{ok, St};
handle_db_event(DbName, deleted, St) ->
gen_server:cast(St#st.server_name, {reset_indexes, DbName}),
{ok, St};
handle_db_event(<<"shards/", _/binary>> = DbName, {ddoc_updated, DDocId}, St) ->
DDocResult = couch_util:with_db(DbName, fun(Db) ->
couch_db:open_doc(Db, DDocId, [ejson_body, ?ADMIN_CTX])
end),
LocalShards =
try
mem3:local_shards(mem3:dbname(DbName))
catch
error:database_does_not_exist ->
[];
Class:Msg ->
couch_log:warning(
"~p got ~p:~p when fetching local shards for ~p",
[?MODULE, Class, Msg, DbName]
),
[]
end,
DbShards = [mem3:name(Sh) || Sh <- LocalShards],
lists:foreach(
fun(DbShard) ->
lists:foreach(
fun({_DbShard, {_DDocId, Sig}}) ->
% check if there are other ddocs with the same Sig for the same db
SigDDocs = ets:match_object(St#st.by_db, {DbShard, {'$1', Sig}}),
if
length(SigDDocs) > 1 ->
% remove records from by_db for this DDoc
Args = [DbShard, DDocId, Sig],
gen_server:cast(St#st.server_name, {rem_from_ets, Args});
true ->
% single DDoc with this Sig - close couch_index processes
case ets:lookup(St#st.by_sig, {DbShard, Sig}) of
[{_, IndexPid}] ->
(catch gen_server:cast(IndexPid, {ddoc_updated, DDocResult}));
[] ->
[]
end
end
end,
ets:match_object(St#st.by_db, {DbShard, {DDocId, '$1'}})
)
end,
DbShards
),
{ok, St};
handle_db_event(DbName, {ddoc_updated, DDocId}, St) ->
lists:foreach(
fun({_DbName, {_DDocId, Sig}}) ->
case ets:lookup(St#st.by_sig, {DbName, Sig}) of
[{_, IndexPid}] ->
(catch gen_server:cast(IndexPid, ddoc_updated));
[] ->
ok
end
end,
ets:match_object(St#st.by_db, {DbName, {DDocId, '$1'}})
),
{ok, St};
handle_db_event(_DbName, _Event, St) ->
{ok, St}.
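% Sharding helpers: one index server (and one set of ETS tables) per scheduler.
% Database names are mapped to a shard with erlang:phash2/2, so all indexes of
% a given database land on the same shard.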
num_servers() ->
erlang:system_info(schedulers).
server_name(Arg) ->
name("index_server", Arg).
by_sig(Arg) ->
name("couchdb_indexes_by_sig", Arg).
by_pid(Arg) ->
name("couchdb_indexes_by_pid", Arg).
by_db(Arg) ->
name("couchdb_indexes_by_db", Arg).
name(BaseName, Arg) when is_list(Arg) ->
name(BaseName, ?l2b(Arg));
name(BaseName, Arg) when is_binary(Arg) ->
N = 1 + erlang:phash2(Arg, num_servers()),
name(BaseName, N);
name(BaseName, N) when is_integer(N), N > 0 ->
list_to_atom(BaseName ++ "_" ++ integer_to_list(N)).
names() ->
N = num_servers(),
[server_name(I) || I <- lists:seq(1, N)].
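% Total message queue length across all index server shards.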
aggregate_queue_len() ->
N = num_servers(),
Names = [server_name(I) || I <- lists:seq(1, N)],
MQs = [
process_info(whereis(Name), message_queue_len)
|| Name <- Names
],
lists:sum([X || {_, X} <- MQs]).