Add an index keyed on client reference
This prevents table scans triggered by the kill handler from causing
the server to fall over under very high throughput.
I also took the opportunity to refactor a bit and use #job records
throughout the server instead of raw tuples.
BugzID: 12344
diff --git a/src/rexi_server.erl b/src/rexi_server.erl
index d230533..931e340 100644
--- a/src/rexi_server.erl
+++ b/src/rexi_server.erl
@@ -22,8 +22,15 @@
-include("rexi.hrl").
-include_lib("eunit/include/eunit.hrl").
+-record(job, {
+ client::reference(),
+ worker::reference(),
+ pid::pid()
+}).
+
-record(st, {
- workers = ets:new(workers, [private, {keypos,2}]),
+ workers = ets:new(workers, [private, {keypos, #job.worker}]),
+ clients = ets:new(clients, [private, {keypos, #job.client}]),
errors = queue:new(),
error_limit = 20,
error_count = 0
@@ -61,17 +68,18 @@
handle_cast({doit, From, MFA}, St) ->
handle_cast({doit, From, undefined, MFA}, St);
-handle_cast({doit, From, Nonce, MFA}, #st{workers=Workers} = St) ->
+handle_cast({doit, From, Nonce, MFA}, State) ->
{LocalPid, Ref} = spawn_monitor(?MODULE, init_p, [From, MFA, Nonce]),
- {noreply, St#st{workers = add_worker({LocalPid, Ref, From}, Workers)}};
+ Job = #job{client = element(2, From), worker = Ref, pid = LocalPid},
+ {noreply, add_job(Job, State)};
-handle_cast({kill, FromRef}, #st{workers=Workers} = St) ->
- case find_worker_from(FromRef, Workers) of
- {Pid, KeyRef, {_, FromRef}} ->
+handle_cast({kill, FromRef}, #st{clients = Clients} = St) ->
+ case find_worker(FromRef, Clients) of
+ #job{worker = KeyRef, pid = Pid} = Job ->
erlang:demonitor(KeyRef),
exit(Pid, kill),
- {noreply, St#st{workers = remove_worker(KeyRef, Workers)}};
+ {noreply, remove_job(Job, St)};
false ->
{noreply, St}
end;
@@ -81,18 +89,23 @@
{noreply, St}.
handle_info({'DOWN', Ref, process, _, normal}, #st{workers=Workers} = St) ->
- {noreply, St#st{workers = remove_worker(Ref, Workers)}};
+ case find_worker(Ref, Workers) of
+ #job{} = Job ->
+ {noreply, remove_job(Job, St)};
+ false ->
+ {noreply, St}
+ end;
handle_info({'DOWN', Ref, process, Pid, Error}, #st{workers=Workers} = St) ->
case find_worker(Ref, Workers) of
- {Pid, Ref, From} ->
+ #job{pid=Pid, worker=Ref, client=From} = Job ->
case Error of #error{reason = {_Class, Reason}, stack = Stack} ->
notify_caller(From, {Reason, Stack}),
St1 = save_error(Error, St),
- {noreply, St1#st{workers = remove_worker(Ref, Workers)}};
+ {noreply, remove_job(Job, St1)};
_ ->
notify_caller(From, Error),
- {noreply, St#st{workers = remove_worker(Ref, Workers)}}
+ {noreply, remove_job(Job, St)}
end;
false ->
{noreply, St}
@@ -102,12 +115,21 @@
{noreply, St}.
terminate(_Reason, St) ->
- ets:foldl(fun({Pid, _, _}, _) -> exit(Pid,kill) end, nil, St#st.workers),
+ ets:foldl(fun(#job{pid=Pid},_) -> exit(Pid,kill) end, nil, St#st.workers),
ok.
code_change(_OldVsn, {st, Workers}, _Extra) ->
{ok, #st{workers = Workers}};
+code_change(_OldVsn, {st, Workers0, Errors, Limit, Count}, _Extra) ->
+ Jobs = [#job{pid=A, worker=B, client=C} || {A, B, {_, C}}
+ <- ets:tab2list(Workers0)],
+ ets:delete(Workers0),
+ State = #st{errors = Errors, error_limit = Limit, error_count = Count},
+ ets:insert(State#st.workers, Jobs),
+ ets:insert(State#st.clients, Jobs),
+ {ok, State};
+
code_change(_OldVsn, St, _Extra) ->
{ok, St}.
@@ -144,22 +166,18 @@
lists:map(fun({M,F,A}) when is_list(A) -> {M,F,length(A)}; (X) -> X end,
erlang:get_stacktrace()).
-add_worker(Worker, Tab) ->
- ets:insert(Tab, Worker), Tab.
+add_job(Job, #st{workers = Workers, clients = Clients} = State) ->
+ ets:insert(Workers, Job),
+ ets:insert(Clients, Job),
+ State.
-remove_worker(Ref, Tab) ->
- ets:delete(Tab, Ref), Tab.
+remove_job(Job, #st{workers = Workers, clients = Clients} = State) ->
+ ets:delete_object(Workers, Job),
+ ets:delete_object(Clients, Job),
+ State.
find_worker(Ref, Tab) ->
case ets:lookup(Tab, Ref) of [] -> false; [Worker] -> Worker end.
-find_worker_from(Ref, Tab) ->
- case ets:match_object(Tab, {'_', '_', {'_', Ref}}) of
- [] ->
- false;
- [Worker] ->
- Worker
- end.
-
notify_caller({Caller, Ref}, Reason) ->
Caller ! {Ref, {rexi_EXIT, Reason}}.