blob: 47f25424338a48810ca566d02bca7be6612d31da [file] [log] [blame]
% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
% http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.
%% -*- erlang-indent-level: 4;indent-tabs-mode: nil -*-
-module(dreyfus_index_manager).
-behaviour(gen_server).
-vsn(1).
-include_lib("couch/include/couch_db.hrl").
-include("dreyfus.hrl").
-define(BY_SIG, dreyfus_by_sig).
-define(BY_PID, dreyfus_by_pid).
% public api.
-export([start_link/0, get_index/2, get_disk_size/2]).
% gen_server api.
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3]).
-export([handle_db_event/3]).
% public functions.
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
get_index(DbName, Index) ->
gen_server:call(?MODULE, {get_index, DbName, Index}, infinity).
get_disk_size(DbName, #index{sig=Sig}) ->
Path = <<DbName/binary, "/", Sig/binary>>,
clouseau_rpc:disk_size(Path).
% gen_server functions.
init([]) ->
ets:new(?BY_SIG, [set, private, named_table]),
ets:new(?BY_PID, [set, private, named_table]),
couch_event:link_listener(?MODULE, handle_db_event, nil, [all_dbs]),
process_flag(trap_exit, true),
{ok, nil}.
handle_call({get_index, DbName, #index{sig=Sig}=Index}, From, State) ->
case ets:lookup(?BY_SIG, {DbName, Sig}) of
[] ->
Pid = spawn_link(fun() -> new_index(DbName, Index) end),
ets:insert(?BY_PID, {Pid, opening, {DbName, Sig}}),
ets:insert(?BY_SIG, {{DbName,Sig}, [From]}),
{noreply, State};
[{_, WaitList}] when is_list(WaitList) ->
ets:insert(?BY_SIG, {{DbName, Sig}, [From | WaitList]}),
{noreply, State};
[{_, ExistingPid}] ->
{reply, {ok, ExistingPid}, State}
end;
handle_call({open_ok, DbName, Sig, NewPid}, {OpenerPid, _}, State) ->
link(NewPid),
[{_, WaitList}] = ets:lookup(?BY_SIG, {DbName, Sig}),
[gen_server:reply(From, {ok, NewPid}) || From <- WaitList],
ets:delete(?BY_PID, OpenerPid),
add_to_ets(NewPid, DbName, Sig),
{reply, ok, State};
handle_call({open_error, DbName, Sig, Error}, {OpenerPid, _}, State) ->
[{_, WaitList}] = ets:lookup(?BY_SIG, {DbName, Sig}),
[gen_server:reply(From, Error) || From <- WaitList],
ets:delete(?BY_PID, OpenerPid),
ets:delete(?BY_SIG, {DbName, Sig}),
{reply, ok, State}.
handle_cast({cleanup, DbName}, State) ->
clouseau_rpc:cleanup(DbName),
{noreply, State};
handle_cast({rename, DbName}, State) ->
clouseau_rpc:rename(DbName),
{noreply, State}.
handle_info({'EXIT', FromPid, Reason}, State) ->
case ets:lookup(?BY_PID, FromPid) of
[] ->
if Reason =/= normal ->
couch_log:error("Exit on non-updater process: ~p", [Reason]),
exit(Reason);
true -> ok
end;
% Using Reason /= normal to force a match error
% if we didn't delete the Pid in a handle_call
% message for some reason.
[{_, opening, {DbName, Sig}}] when Reason /= normal ->
Msg = {open_error, DbName, Sig, Reason},
{reply, ok, _} = handle_call(Msg, {FromPid, nil}, State);
[{_, {DbName, Sig}}] ->
delete_from_ets(FromPid, DbName, Sig)
end,
{noreply, State}.
terminate(_Reason, _State) ->
ok.
code_change(_OldVsn, nil, _Extra) ->
{ok, nil}.
% private functions
handle_db_event(DbName, created, _St) ->
gen_server:cast(?MODULE, {cleanup, DbName}),
{ok, nil};
handle_db_event(DbName, deleted, _St) ->
RecoveryEnabled = config:get_boolean("couchdb",
"enable_database_recovery", false),
case RecoveryEnabled of
true ->
gen_server:cast(?MODULE, {rename, DbName});
false ->
gen_server:cast(?MODULE, {cleanup, DbName})
end,
{ok, nil};
handle_db_event(_DbName, _Event, _St) ->
{ok, nil}.
new_index(DbName, #index{sig=Sig}=Index) ->
case (catch dreyfus_index:start_link(DbName, Index)) of
{ok, NewPid} ->
Msg = {open_ok, DbName, Sig, NewPid},
ok = gen_server:call(?MODULE, Msg, infinity),
unlink(NewPid);
Error ->
Msg = {open_error, DbName, Sig, Error},
ok = gen_server:call(?MODULE, Msg, infinity)
end.
add_to_ets(Pid, DbName, Sig) ->
true = ets:insert(?BY_PID, {Pid, {DbName, Sig}}),
true = ets:insert(?BY_SIG, {{DbName, Sig}, Pid}).
delete_from_ets(Pid, DbName, Sig) ->
true = ets:delete(?BY_PID, Pid),
true = ets:delete(?BY_SIG, {DbName, Sig}).