blob: ebefc733994b8542a56129980c1abfee61b26d6d [file] [log] [blame]
% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
% http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.
-module(ioq_opener).
-behavior(gen_server).
-export([
init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2,
code_change/3
]).
-export([
start_link/0,
fetch_pid_for/1,
fetch_pid_for/2,
fetch_pid_for/3,
get_pid_for/1,
set_pid_for/2,
get_ioq_pids/0,
get_pid_idx/0,
get_monitor_idx/0
]).
-include_lib("couch/include/couch_db.hrl").
-include_lib("ioq/include/ioq.hrl").
-define(BY_USER, by_user).
-define(BY_SHARD, by_shard).
-define(BY_CLASS, by_class).
-define(BY_FD, by_fd).
-define(BY_DB, by_db).
-define(DEFAULT_DISPATCH, ?BY_SHARD).
-define(PDICT_MARKER, ioq_pid_for).
-record(st, {
idle = [] :: [{erlang:timestamp(), pid()}],
pid_idx :: khash:khash(),
monitors :: khash:khash(),
dispatch :: ?BY_SHARD | ?BY_DB | ?BY_USER | ?BY_CLASS | ?BY_FD | undefined
}).
%% HACK: experiment to allow for spawning IOQ2 pids prior to the spawning
%% the associated couch_file pids
%%fetch_pid_for(DbName) when is_binary(DbName) ->
%% fetch_pid_for(DbName, self()).
fetch_pid_for(#ioq_request{}=Req) ->
gen_server:call(?MODULE, {fetch, Req}, infinity).
%% TODO: cleanup the overloaded arity once experiments concluded
%%fetch_pid_for(DbName, undefined) when is_binary(DbName) ->
%% fetch_pid_for(DbName, undefined, self());
%%fetch_pid_for(DbName, #user_ctx{}=Ctx) when is_binary(DbName) ->
%% fetch_pid_for(DbName, Ctx, self());
fetch_pid_for(DbName, FdPid) when is_binary(DbName), is_pid(FdPid) ->
fetch_pid_for(DbName, undefined, FdPid).
fetch_pid_for(DbName, UserCtx, FdPid) when is_binary(DbName), is_pid(FdPid) ->
gen_server:call(?MODULE, {fetch, DbName, UserCtx, FdPid}, infinity).
get_pid_for(undefined) ->
undefined;
get_pid_for(DbName) when is_binary(DbName) ->
%% HACK: use the same shard format as per #ioq_request{} to post facto
%% associate an IOQ pid with a dbname for when we set the IOQ2 pid prior
%% to having a db handle
erlang:get({?PDICT_MARKER, filename:rootname(DbName)});
get_pid_for(FdPid) when is_pid(FdPid) ->
erlang:get({?PDICT_MARKER, FdPid}).
set_pid_for(_, undefined) ->
ok;
set_pid_for(DbName, IOQPid) when is_binary(DbName), is_pid(IOQPid) ->
%% HACK: use the same shard format as per #ioq_request{} to post facto
%% associate an IOQ pid with a dbname for when we set the IOQ2 pid prior
%% to having a db handle
erlang:put({?PDICT_MARKER, filename:rootname(DbName)}, IOQPid),
ok;
set_pid_for(FdPid, IOQPid) when is_pid(FdPid), is_pid(IOQPid) ->
erlang:put({?PDICT_MARKER, FdPid}, IOQPid),
ok.
get_pid_idx() ->
gen_server:call(?MODULE, get_pid_idx, infinity).
get_monitor_idx() ->
gen_server:call(?MODULE, get_monitor_idx, infinity).
get_ioq_pids() ->
lists:foldl(
fun
({K, _V}, Acc) when is_pid(K) ->
[K | Acc];
(_, Acc) ->
Acc
end, [], get_pid_idx()).
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
init([]) ->
process_flag(trap_exit, true),
{ok, PidIdx} = khash:new(),
{ok, Monitors} = khash:new(),
Dispatch = case config:get("ioq.opener", "dispatch", undefined) of
"by_shard" -> ?BY_SHARD;
"by_db" -> ?BY_DB;
"by_user" -> ?BY_USER;
"by_class" -> ?BY_CLASS;
"by_fd" -> ?BY_FD;
_ -> ?DEFAULT_DISPATCH
end,
St = #st{
pid_idx = PidIdx,
monitors = Monitors,
dispatch = Dispatch
},
{ok, St}.
handle_call({fetch, #ioq_request{}=Req}, _From, #st{dispatch=Dispatch}=St) ->
Key = case Dispatch of
?BY_SHARD ->
Req#ioq_request.shard;
?BY_DB ->
Req#ioq_request.db;
?BY_USER ->
Req#ioq_request.user;
?BY_CLASS ->
Req#ioq_request.class;
?BY_FD ->
{fd, Req#ioq_request.fd}
end,
IOQPid = case khash:get(St#st.pid_idx, Key, not_found) of
not_found ->
{ok, Pid} = ioq_server2:start_link({Dispatch, Key}),
khash:put(St#st.pid_idx, Key, Pid),
khash:put(St#st.pid_idx, Pid, Key),
Pid;
Pid ->
Pid
end,
ok = add_monitor(St#st.monitors, Req#ioq_request.fd, IOQPid),
{reply, IOQPid, St};
handle_call({fetch, _DbName, UserCtx, FdPid}, From, #st{dispatch=?BY_USER}=St) ->
Caller = case FdPid of
undefined -> From;
_ when is_pid(FdPid) -> FdPid
end,
Name = case UserCtx of
#user_ctx{name=Name0} -> Name0;
%% TODO: support unknown user
undefined -> throw(unknown_user)
end,
IOQPid = case khash:get(St#st.pid_idx, Name, not_found) of
not_found ->
{ok, Pid} = ioq_server2:start_link({?BY_USER, Name}),
khash:put(St#st.pid_idx, Name, Pid),
khash:put(St#st.pid_idx, Pid, Name),
Pid;
Pid ->
Pid
end,
ok = add_monitor(St#st.monitors, Caller, IOQPid),
{reply, IOQPid, St};
handle_call({fetch, DbName, _UserCtx, FdPid}, From, #st{dispatch=?BY_SHARD}=St) ->
Caller = case FdPid of
undefined -> From;
_ when is_pid(FdPid) -> FdPid
end,
%% TODO: DbName = drop_compact_ext(DbName0),
IOQPid = case khash:get(St#st.pid_idx, DbName, not_found) of
not_found ->
{ok, Pid} = ioq_server2:start_link({?BY_SHARD, DbName}),
khash:put(St#st.pid_idx, DbName, Pid),
khash:put(St#st.pid_idx, Pid, DbName),
Pid;
Pid ->
Pid
end,
ok = add_monitor(St#st.monitors, Caller, IOQPid),
{reply, IOQPid, St};
handle_call(get_pid_idx, _From, #st{}=St) ->
{reply, khash:to_list(St#st.pid_idx), St};
handle_call(get_monitor_idx, _From, #st{}=St) ->
{reply, khash:to_list(St#st.monitors), St};
handle_call(_, _From, St) ->
{reply, ok, St}.
handle_cast(_Msg, St) ->
{noreply, St}.
handle_info({'DOWN', Ref, process, _Pid, _Reason}, St) ->
case drop_monitor(St#st.monitors, Ref) of
{IOQPid, []} ->
Name = khash:get(St#st.pid_idx, IOQPid), %% TODO: assert found?
khash:del(St#st.pid_idx, IOQPid),
khash:del(St#st.pid_idx, Name);
{_IOQPid, _Refs} ->
ok
end,
{noreply, St};
handle_info({'EXIT', Pid, _}, St) ->
case khash:get(St#st.pid_idx, Pid, not_found) of
not_found ->
%% TODO: shouldn't happen, throw error?
ok;
Name ->
khash:del(St#st.pid_idx, Pid),
khash:del(St#st.pid_idx, Name)
end,
{noreply, St};
handle_info(_Info, St) ->
{noreply, St}.
terminate(_Reason, _St) ->
ok.
code_change(_OldVsn, St, _Extra) ->
{ok, St}.
add_monitor(Mons, FdPid, IOQPid) ->
PidKey = {FdPid, IOQPid},
Ref = case khash:get(Mons, PidKey, not_found) of
not_found ->
Ref0 = erlang:monitor(process, FdPid),
khash:put(Mons, Ref0, PidKey),
khash:put(Mons, PidKey, Ref0),
khash:put(Mons, FdPid, Ref0);
Ref0 ->
Ref0
end,
case khash:get(Mons, IOQPid, not_found) of
not_found ->
khash:put(Mons, IOQPid, [Ref]);
Refs ->
case lists:member(Ref, Refs) of
true ->
ok;
false ->
khash:put(Mons, IOQPid, [Ref | Refs])
end
end,
ok.
drop_monitor(Mons, Ref) when is_reference(Ref) ->
case khash:get(Mons, Ref, not_found) of
not_found ->
%% TODO: shouldn't happen
throw(unexpected);
{FdPid, IOQPid}=PidKey ->
case khash:get(Mons, IOQPid, not_found) of
not_found ->
%% TODO: shouldn't happen
throw(unexpected);
Refs ->
khash:del(Mons, FdPid),
khash:del(Mons, Ref),
khash:del(Mons, PidKey),
case lists:delete(Ref, Refs) of
[] ->
unlink(IOQPid),
khash:del(Mons, IOQPid),
exit(IOQPid, idle),
{IOQPid, []};
Refs1 ->
khash:put(Mons, IOQPid, Refs1),
{IOQPid, Refs1}
end
end
end.