Fix fabric_db_update_listener rexi_DOWN handling
A recent change that fixed the list comprehension ended up uncovering
the fact that we don't handle rexi_DOWN errors properly. This patch
tracks the shards we are still listening on and, when a worker or node
fails (rexi_DOWN, rexi_EXIT or gen_event_EXIT), drops that node's shards
and uses `fabric_view:is_progress_possible/1` to decide whether we can
continue listening for changes.
Fixes COUCHDB-3036
diff --git a/src/fabric_db_update_listener.erl b/src/fabric_db_update_listener.erl
index ac4d8a2..f0155de 100644
--- a/src/fabric_db_update_listener.erl
+++ b/src/fabric_db_update_listener.erl
@@ -32,11 +32,13 @@
-record(acc, {
parent,
- state
+ state,
+ shards
}).
go(Parent, ParentRef, DbName, Timeout) ->
- Notifiers = start_update_notifiers(DbName),
+ Shards = mem3:shards(DbName),
+ Notifiers = start_update_notifiers(Shards),
MonRefs = lists:usort([rexi_utils:server_pid(N) || #worker{node = N} <- Notifiers]),
RexiMon = rexi_monitor:start(MonRefs),
MonPid = start_cleanup_monitor(self(), Notifiers),
@@ -44,8 +46,13 @@
%% process to communicate via handle_message/3 we "fake" it as a
%% a spawned worker.
Workers = [#worker{ref=ParentRef, pid=Parent} | Notifiers],
+ Acc = #acc{
+ parent = Parent,
+ state = unset,
+ shards = Shards
+ },
Resp = try
- receive_results(Workers, #acc{parent=Parent, state=unset}, Timeout)
+ receive_results(Workers, Acc, Timeout)
after
rexi_monitor:stop(RexiMon),
stop_cleanup_monitor(MonPid)
@@ -56,10 +63,10 @@
Error -> erlang:error(Error)
end.
-start_update_notifiers(DbName) ->
+start_update_notifiers(Shards) ->
EndPointDict = lists:foldl(fun(#shard{node=Node, name=Name}, Acc) ->
dict:append(Node, Name, Acc)
- end, dict:new(), mem3:shards(DbName)),
+ end, dict:new(), Shards),
lists:map(fun({Node, DbNames}) ->
Ref = rexi:cast(Node, {?MODULE, start_update_notifier, [DbNames]}),
#worker{ref=Ref, node=Node}
@@ -134,12 +141,12 @@
end.
-handle_message({rexi_DOWN, _, {_, Node}, _}, _Worker, _Acc) ->
- {error, {nodedown, Node}};
-handle_message({rexi_EXIT, _Reason}, Worker, _Acc) ->
- {error, {worker_exit, Worker}};
-handle_message({gen_event_EXIT, Node, Reason}, _Worker, _Acc) ->
- {error, {gen_event_exit, Node, Reason}};
+handle_message({rexi_DOWN, _, {_, Node}, _}, _Worker, Acc) ->
+ handle_error(Node, {nodedown, Node}, Acc);
+handle_message({rexi_EXIT, _Reason}, Worker, Acc) ->
+ handle_error(Worker#worker.node, {worker_exit, Worker}, Acc);
+handle_message({gen_event_EXIT, Node, Reason}, _Worker, Acc) ->
+ handle_error(Node, {gen_event_EXIT, Node, Reason}, Acc);
handle_message(db_updated, _Worker, #acc{state=waiting}=Acc) ->
% propagate message to calling controller
erlang:send(Acc#acc.parent, {state, self(), updated}),
@@ -155,4 +162,11 @@
{stop, ok}.
-
+handle_error(Node, Reason, #acc{shards = Shards} = Acc) ->
+ Rest = lists:filter(fun(#shard{node = N}) -> N /= Node end, Shards),
+ case fabric_view:is_progress_possible([{R, nil} || R <- Rest]) of
+ true ->
+ {ok, Acc#acc{shards = Rest}};
+ false ->
+ {error, Reason}
+ end.