blob: dbd464e4abd658bb1efa44f420bb22de7c067d0c [file] [log] [blame]
% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
% http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.
-module(couch_views_server).
-behaviour(gen_server).
-include_lib("kernel/include/logger.hrl").
-export([
start_link/0
]).
-export([
accepted/1
]).
-export([
init/1,
terminate/2,
handle_call/3,
handle_cast/2,
handle_info/2,
code_change/3,
format_status/2
]).
-define(MAX_ACCEPTORS, 5).
-define(MAX_WORKERS, 100).
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
accepted(Worker) when is_pid(Worker) ->
gen_server:call(?MODULE, {accepted, Worker}, infinity).
init(_) ->
process_flag(trap_exit, true),
couch_views_jobs:set_timeout(),
St = #{
acceptors => #{},
workers => #{},
max_acceptors => max_acceptors(),
max_workers => max_workers()
},
{ok, spawn_acceptors(St)}.
terminate(_, _St) ->
ok.
handle_call({accepted, Pid}, _From, St) ->
#{
acceptors := Acceptors,
workers := Workers
} = St,
case maps:is_key(Pid, Acceptors) of
true ->
St1 = St#{
acceptors := maps:remove(Pid, Acceptors),
workers := Workers#{Pid => true}
},
{reply, ok, spawn_acceptors(St1)};
false ->
?LOG_ERROR(#{what => unknown_acceptor, pid => Pid}),
LogMsg = "~p : unknown acceptor process ~p",
couch_log:error(LogMsg, [?MODULE, Pid]),
{stop, {unknown_acceptor_pid, Pid}, St}
end;
handle_call(Msg, _From, St) ->
{stop, {bad_call, Msg}, {bad_call, Msg}, St}.
handle_cast(Msg, St) ->
{stop, {bad_cast, Msg}, St}.
handle_info({'EXIT', Pid, Reason}, St) ->
#{
acceptors := Acceptors,
workers := Workers
} = St,
% In Erlang 21+ could check map keys directly in the function head
case {maps:is_key(Pid, Acceptors), maps:is_key(Pid, Workers)} of
{true, false} -> handle_acceptor_exit(St, Pid, Reason);
{false, true} -> handle_worker_exit(St, Pid, Reason);
{false, false} -> handle_unknown_exit(St, Pid, Reason)
end;
handle_info(Msg, St) ->
{stop, {bad_info, Msg}, St}.
code_change(_OldVsn, St, _Extra) ->
{ok, St}.
format_status(_Opt, [_PDict, State]) ->
#{
workers := Workers,
acceptors := Acceptors
} = State,
Scrubbed = State#{
workers => {map_size, maps:size(Workers)},
acceptors => {map_size, maps:size(Acceptors)}
},
[{data, [{"State", Scrubbed}]}].
% Worker process exit handlers
handle_acceptor_exit(#{acceptors := Acceptors} = St, Pid, Reason) ->
St1 = St#{acceptors := maps:remove(Pid, Acceptors)},
?LOG_ERROR(#{what => acceptor_crash, pid => Pid, reason => Reason}),
LogMsg = "~p : acceptor process ~p exited with ~p",
couch_log:error(LogMsg, [?MODULE, Pid, Reason]),
{noreply, spawn_acceptors(St1)}.
handle_worker_exit(#{workers := Workers} = St, Pid, normal) ->
St1 = St#{workers := maps:remove(Pid, Workers)},
{noreply, spawn_acceptors(St1)};
handle_worker_exit(#{workers := Workers} = St, Pid, Reason) ->
St1 = St#{workers := maps:remove(Pid, Workers)},
?LOG_ERROR(#{what => indexer_crash, pid => Pid, reason => Reason}),
LogMsg = "~p : indexer process ~p exited with ~p",
couch_log:error(LogMsg, [?MODULE, Pid, Reason]),
{noreply, spawn_acceptors(St1)}.
handle_unknown_exit(St, Pid, Reason) ->
?LOG_ERROR(#{what => unknown_process_crash, pid => Pid, reason => Reason}),
LogMsg = "~p : unknown process ~p exited with ~p",
couch_log:error(LogMsg, [?MODULE, Pid, Reason]),
{stop, {unknown_pid_exit, Pid}, St}.
spawn_acceptors(St) ->
#{
workers := Workers,
acceptors := Acceptors,
max_acceptors := MaxAcceptors,
max_workers := MaxWorkers
} = St,
ACnt = maps:size(Acceptors),
WCnt = maps:size(Workers),
case ACnt < MaxAcceptors andalso (ACnt + WCnt) < MaxWorkers of
true ->
Pid = couch_views_indexer:spawn_link(),
NewSt = St#{acceptors := Acceptors#{Pid => true}},
spawn_acceptors(NewSt);
false ->
St
end.
max_acceptors() ->
config:get_integer("couch_views", "max_acceptors", ?MAX_ACCEPTORS).
max_workers() ->
config:get_integer("couch_views", "max_workers", ?MAX_WORKERS).