% blob: f85a2cf5e470bcec6d866323ea1f71f1d8fb1cbd [file] [log] [blame]
% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
% http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.
-module(fabric_db_update_listener).
-export([go/4, start_update_notifier/1, stop/1, wait_db_updated/1]).
-include_lib("fabric/include/fabric.hrl").
-include_lib("mem3/include/mem3.hrl").
%% One message source tracked by the listener. Entries are looked up by `ref`
%% (receive_results/3 passes #worker.ref as the key position).
-record(worker, {
%% Reference tagging messages from this source: the value returned by
%% rexi:cast/2 for a remote notifier, or the caller-supplied ParentRef for the
%% parent entry built in go/4.
ref,
%% Node the notifier was cast to; left unset on the parent entry.
node,
%% Pid of the calling process; only set on the parent entry built in go/4.
pid
}).
%% Accumulator threaded through receive_results/3 and handle_message/3.
-record(acc, {
%% Pid that receives `{state, ListenerPid, State}` messages.
parent,
%% Current listener state, one of:
%%   unset   - initial value, and the value after a state has been reported;
%%   updated - a db_updated arrived while the parent was not waiting;
%%   waiting - the parent asked for the state and nothing was pending;
%%   timeout - the receive timed out with nothing pending.
state
}).
%% Entry point of the listener process.
%%
%% Starts one update notifier per node hosting a shard of DbName, monitors the
%% rexi servers on those nodes, and then loops in receive_results/3 relaying
%% update/timeout state to Parent until it is told to stop.
%%
%%   Parent    - pid that talks to this process via stop/1 and wait_db_updated/1
%%   ParentRef - reference tagging the messages Parent sends to this process
%%   DbName    - database whose shards are watched
%%   Timeout   - per-message timeout handed to rexi_utils:recv/6
%%
%% Returns ok when receive_results/3 yields {ok, _}; any other result is raised
%% with erlang:error/1. The rexi monitor and the cleanup monitor are always
%% stopped, even when the receive loop raises.
go(Parent, ParentRef, DbName, Timeout) ->
    Notifiers = start_update_notifiers(DbName),
    %% Notifiers are #worker{} records, so the node must be read from the
    %% record. A comprehension generator silently skips elements that do not
    %% match its pattern; matching on a 2-tuple here would therefore produce
    %% an empty list and leave the rexi monitor with nothing to watch.
    MonRefs = lists:usort([{rexi_server, Node} || #worker{node=Node} <- Notifiers]),
    RexiMon = rexi_monitor:start(MonRefs),
    MonPid = start_cleanup_monitor(self(), Notifiers),
    %% This is not a common pattern for rexi but to enable the calling
    %% process to communicate via handle_message/3 we "fake" it as
    %% a spawned worker.
    Workers = [#worker{ref=ParentRef, pid=Parent} | Notifiers],
    Resp = try
        receive_results(Workers, #acc{parent=Parent, state=unset}, Timeout)
    after
        rexi_monitor:stop(RexiMon),
        stop_cleanup_monitor(MonPid)
    end,
    case Resp of
        {ok, _} -> ok;
        {error, Error} -> erlang:error(Error);
        Error -> erlang:error(Error)
    end.
%% Groups the shards of DbName by node and casts one start_update_notifier/1
%% job to each node, handing it the shard names that live there.
%% Returns a list of #worker{} records carrying the rexi reference and node.
start_update_notifiers(DbName) ->
    GroupByNode = fun(#shard{node=Node, name=ShardName}, ByNode) ->
        dict:append(Node, ShardName, ByNode)
    end,
    ShardsByNode = lists:foldl(GroupByNode, dict:new(), mem3:shards(DbName)),
    [
        #worker{
            ref = rexi:cast(Node, {?MODULE, start_update_notifier, [ShardNames]}),
            node = Node
        }
     || {Node, ShardNames} <- dict:to_list(ShardsByNode)
    ].
% rexi endpoint
%% Runs on the remote node. Registers a supervised handler on the
%% couch_db_update event manager and then blocks until that handler is removed,
%% at which point the exit reason is reported back through rexi:reply/1.
%%
%% The handler is given a fun that sends {Ref, db_updated} to the rexi caller
%% whenever the second element of an event tuple is one of DbNames.
%% NOTE(review): presumably couch_db_update_notifier invokes the fun once per
%% event - confirm against that module.
start_update_notifier(DbNames) ->
    %% rexi stores the caller's pid and reference in the process dictionary.
    {Caller, Ref} = get(rexi_from),
    HandlerId = {couch_db_update_notifier, make_ref()},
    Notify = fun({_, UpdatedDb}) ->
        case lists:member(UpdatedDb, DbNames) of
            true -> Caller ! {Ref, db_updated};
            false -> ok
        end
    end,
    ok = gen_event:add_sup_handler(couch_db_update, HandlerId, Notify),
    receive
        {gen_event_EXIT, HandlerId, Reason} ->
            rexi:reply({gen_event_EXIT, node(), Reason})
    end.
%% Spawns an unlinked helper process that monitors Parent and runs
%% cleanup_monitor/3, so the remote notifiers get killed once Parent goes away
%% or asks for a stop. Returns the helper's pid.
start_cleanup_monitor(Parent, Notifiers) ->
    Watcher = fun() ->
        MonRef = erlang:monitor(process, Parent),
        cleanup_monitor(Parent, MonRef, Notifiers)
    end,
    spawn(Watcher).
%% Asks the cleanup monitor to shut the notifiers down by sending it
%% {self(), stop}. Returns the message that was sent.
stop_cleanup_monitor(MonPid) ->
    StopMsg = {self(), stop},
    MonPid ! StopMsg.
%% Body of the cleanup monitor process: waits for a single message and then
%% tears the notifiers down.
%%
%%   Parent    - pid of the listener process being watched
%%   Ref       - monitor reference on Parent
%%   Notifiers - #worker{} records of the remote notifiers to kill
cleanup_monitor(Parent, Ref, Notifiers) ->
    receive
        Msg -> handle_cleanup_msg(Msg, Parent, Ref, Notifiers)
    end.

%% Dispatches on the message taken from the cleanup monitor's mailbox.
%% A 'DOWN' for the monitored parent, or an explicit stop from it, just kills
%% the notifiers. Anything else is logged, the notifiers are killed, and the
%% parent is sent an exit signal as well.
handle_cleanup_msg({'DOWN', Ref, _, _, _}, _Parent, Ref, Notifiers) ->
    stop_update_notifiers(Notifiers);
handle_cleanup_msg({Parent, stop}, Parent, _Ref, Notifiers) ->
    stop_update_notifiers(Notifiers);
handle_cleanup_msg(Unexpected, Parent, _Ref, Notifiers) ->
    couch_log:error("Unkown message in ~w :: ~w", [?MODULE, Unexpected]),
    stop_update_notifiers(Notifiers),
    exit(Parent, {unknown_message, Unexpected}).
%% Kills every remote notifier job via rexi:kill/2, using the node and
%% reference stored in each #worker{} record. Returns the list of results.
stop_update_notifiers(Notifiers) ->
    [rexi:kill(W#worker.node, W#worker.ref) || #worker{} = W <- Notifiers].
%% Tells the listener process to finish by sending it {Ref, done}, which
%% handle_message/3 answers with a stop. Returns the message that was sent.
stop({Listener, Tag}) ->
    Listener ! {Tag, done}.
%% Asks the listener process for its state and blocks until it answers.
%%
%% Returns the State carried by the listener's {state, Pid, State} reply.
%% Throws {changes_feed_died, Reason} if the listener is down or goes down
%% before replying.
wait_db_updated({Listener, Tag}) ->
    MonRef = erlang:monitor(process, Listener),
    Listener ! {Tag, get_state},
    receive
        {'DOWN', MonRef, process, Listener, Reason} ->
            throw({changes_feed_died, Reason});
        {state, Listener, State} ->
            %% flush drops a 'DOWN' that may already be sitting in the mailbox
            erlang:demonitor(MonRef, [flush]),
            State
    end.
%% Receive loop of the listener. Feeds incoming messages to handle_message/3
%% through rexi_utils:recv/6 and keeps looping for as long as the only thing
%% that happens is a timeout.
%%
%% Any non-timeout result is returned as {ok, Acc}, whatever its tag was.
%% NOTE(review): this appears to rewrap error tuples as {ok, Reason} too, so
%% the error clauses in go/4 may never fire - confirm against rexi_utils:recv/6.
receive_results(Workers, Acc0, Timeout) ->
    Handler = fun handle_message/3,
    case rexi_utils:recv(Workers, #worker.ref, Handler, Acc0, infinity, Timeout) of
        {timeout, TimedOutAcc} ->
            receive_results(Workers, acc_after_timeout(TimedOutAcc), Timeout);
        {_, Acc} ->
            {ok, Acc}
    end.

%% Computes the accumulator to continue with after a receive timeout:
%% a pending update is kept, a waiting parent is told about the timeout and
%% the state is reset, and otherwise the timeout itself is recorded.
acc_after_timeout(#acc{state=updated}=Acc) ->
    Acc;
acc_after_timeout(#acc{state=waiting, parent=Parent}=Acc) ->
    Parent ! {state, self(), timeout},
    Acc#acc{state=unset};
acc_after_timeout(Acc) ->
    Acc#acc{state=timeout}.
%% Callback given to rexi_utils:recv/6: maps one incoming message to the next
%% accumulator or to a terminal result.
%%
%%   rexi_DOWN / rexi_EXIT / gen_event_EXIT -> {error, Reason}
%%   db_updated, get_state                  -> {ok, NewAcc}
%%   done                                   -> {stop, ok}
handle_message({rexi_DOWN, _, {_, DownNode}, _}, _Worker, _Acc) ->
    {error, {nodedown, DownNode}};
handle_message({rexi_EXIT, _Reason}, Worker, _Acc) ->
    {error, {worker_exit, Worker}};
handle_message({gen_event_EXIT, Node, Reason}, _Worker, _Acc) ->
    {error, {gen_event_exit, Node, Reason}};
handle_message(db_updated, _Worker, Acc) ->
    note_db_updated(Acc);
handle_message(get_state, _Worker, Acc) ->
    answer_get_state(Acc);
handle_message(done, _Worker, _Acc) ->
    {stop, ok}.

%% A shard was updated: if the parent is already waiting, tell it right away
%% and reset the state; otherwise remember the update for the next get_state.
note_db_updated(#acc{state=waiting, parent=Parent}=Acc) ->
    Parent ! {state, self(), updated},
    {ok, Acc#acc{state=unset}};
note_db_updated(Acc) ->
    {ok, Acc#acc{state=updated}}.

%% The parent asked for the state: with nothing to report, switch to waiting
%% so the next event is pushed to it; otherwise send the stored state and
%% reset it.
answer_get_state(#acc{state=unset}=Acc) ->
    {ok, Acc#acc{state=waiting}};
answer_get_state(Acc) ->
    Acc#acc.parent ! {state, self(), Acc#acc.state},
    {ok, Acc#acc{state=unset}}.