Fix fabric_db_update_listener rexi_DOWN handling

A recent change that fixed the list comprehension ended up uncovering
the fact that we don't handle rexi_DOWN errors properly. This patch just
tracks the shards that are still listening and uses
`fabric_view:is_progress_possible/1` to know if we are still able to
continue listening for changes.

Fixes COUCHDB-3036
diff --git a/src/fabric_db_update_listener.erl b/src/fabric_db_update_listener.erl
index ac4d8a2..f0155de 100644
--- a/src/fabric_db_update_listener.erl
+++ b/src/fabric_db_update_listener.erl
@@ -32,11 +32,13 @@
 
 -record(acc, {
     parent,
-    state
+    state,
+    shards
 }).
 
 go(Parent, ParentRef, DbName, Timeout) ->
-    Notifiers = start_update_notifiers(DbName),
+    Shards = mem3:shards(DbName),
+    Notifiers = start_update_notifiers(Shards),
     MonRefs = lists:usort([rexi_utils:server_pid(N) || #worker{node = N} <- Notifiers]),
     RexiMon = rexi_monitor:start(MonRefs),
     MonPid = start_cleanup_monitor(self(), Notifiers),
@@ -44,8 +46,13 @@
     %% process to communicate via handle_message/3 we "fake" it as a
     %% a spawned worker.
     Workers = [#worker{ref=ParentRef, pid=Parent} | Notifiers],
+    Acc = #acc{
+        parent = Parent,
+        state = unset,
+        shards = Shards
+    },
     Resp = try
-        receive_results(Workers, #acc{parent=Parent, state=unset}, Timeout)
+        receive_results(Workers, Acc, Timeout)
     after
         rexi_monitor:stop(RexiMon),
         stop_cleanup_monitor(MonPid)
@@ -56,10 +63,10 @@
         Error -> erlang:error(Error)
     end.
 
-start_update_notifiers(DbName) ->
+start_update_notifiers(Shards) ->
     EndPointDict = lists:foldl(fun(#shard{node=Node, name=Name}, Acc) ->
         dict:append(Node, Name, Acc)
-    end, dict:new(), mem3:shards(DbName)),
+    end, dict:new(), Shards),
     lists:map(fun({Node, DbNames}) ->
         Ref = rexi:cast(Node, {?MODULE, start_update_notifier, [DbNames]}),
         #worker{ref=Ref, node=Node}
@@ -134,12 +141,12 @@
     end.
 
 
-handle_message({rexi_DOWN, _, {_, Node}, _}, _Worker, _Acc) ->
-    {error, {nodedown, Node}};
-handle_message({rexi_EXIT, _Reason}, Worker, _Acc) ->
-    {error, {worker_exit, Worker}};
-handle_message({gen_event_EXIT, Node, Reason}, _Worker, _Acc) ->
-    {error, {gen_event_exit, Node, Reason}};
+handle_message({rexi_DOWN, _, {_, Node}, _}, _Worker, Acc) ->
+    handle_error(Node, {nodedown, Node}, Acc);
+handle_message({rexi_EXIT, _Reason}, Worker, Acc) ->
+    handle_error(Worker#worker.node, {worker_exit, Worker}, Acc);
+handle_message({gen_event_EXIT, Node, Reason}, _Worker, Acc) ->
+    handle_error(Node, {gen_event_EXIT, Node, Reason}, Acc);
 handle_message(db_updated, _Worker, #acc{state=waiting}=Acc) ->
     % propagate message to calling controller
     erlang:send(Acc#acc.parent, {state, self(), updated}),
@@ -155,4 +162,11 @@
     {stop, ok}.
 
 
-
+handle_error(Node, Reason, #acc{shards = Shards} = Acc) ->
+    Rest = lists:filter(fun(#shard{node = N}) -> N /= Node end, Shards),
+    case fabric_view:is_progress_possible([{R, nil} || R <- Rest]) of
+        true ->
+            {ok, Acc#acc{shards = Rest}};
+        false ->
+            {error, Reason}
+    end.