A _find request can run for a very long time (on a large database when no
index matches the selector) and it keeps running even if the client
disconnects.

We want to stop the fabric work when there is no client left to receive the
result. fabric_streams already kills the workers if the coordinating process
dies, but in this circumstance the coordinator does not die.

This commit enhances (and renames) the existing cleanup process into a
watchdog. If enabled, the watchdog must be kicked regularly (by whatever
activity we think indicates it is worth continuing) or it will terminate the
process it is watching and kill the worker processes as well.
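
For illustration, assuming fabric_util:timeout/2 resolves "idle_stream"
against the [fabric] config section (as it does for other fabric timeouts;
the section name is an assumption, the key comes from watchdog_timeout/0
below), the interval can be tuned, or the watchdog disabled, at runtime:

    %% Sketch: adjust the watchdog interval from a remsh.
    ok = config:set("fabric", "idle_stream", "30000").    % 30s per kick
    %% or disable the watchdog timer entirely:
    ok = config:set("fabric", "idle_stream", "infinity").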

Currently only mango_httpd:handle_find_req enables the watchdog, and it kicks
the watchdog only when it enqueues a row to be returned (i.e., only on
selector matches).
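
In sketch form, the protocol a client follows mirrors the mango_httpd hunks
below (handle_req/1, run_query/1, emit_row/2 and send_chunk/2 are
hypothetical stand-ins for the real request handler and row callback):

    handle_req(Req) ->
        %% must run in the coordinator process, before the stream starts
        fabric_streams:enable_watchdog(),
        run_query(Req).

    emit_row(Row, Acc) ->
        %% each kick marks activity; go a full interval without one and
        %% the watchdog exits the coordinator and kills the rexi workers
        fabric_streams:kick_watchdog(),
        send_chunk(Row, Acc).
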
diff --git a/src/fabric/src/fabric_streams.erl b/src/fabric/src/fabric_streams.erl
index 3188248..4c3407a 100644
--- a/src/fabric/src/fabric_streams.erl
+++ b/src/fabric/src/fabric_streams.erl
@@ -17,13 +17,30 @@
     start/3,
     start/4,
     start/5,
-    cleanup/1
+    cleanup/1,
+    enable_watchdog/0,
+    kick_watchdog/0
 ]).
 
 -include_lib("fabric/include/fabric.hrl").
 -include_lib("mem3/include/mem3.hrl").
 
--define(WORKER_CLEANER, fabric_worker_cleaner).
+-define(WORKER_WATCHDOG, fabric_worker_watchdog).
+-define(ADD_WORKER, add_worker).
+-define(WATCHDOG_ENABLE, watchdog_enable).
+-define(WATCHDOG_TIMEOUT, watchdog_timeout).
+-define(WATCHDOG_KICK, watchdog_kick).
+-define(WATCHDOG_LAST_KICK, watchdog_last_kick).
+
+-define(WATCHDOG_WAS_KICKED, St#watchdog_state.kicked).
+-define(WATCHDOG_IS_ENABLED, St#watchdog_state.timer /= undefined).
+
+-record(watchdog_state, {
+    coordinator,
+    workers,
+    timer,
+    kicked = false
+}).
 
 start(Workers, Keypos) ->
     start(Workers, Keypos, undefined, undefined).
@@ -43,7 +60,7 @@
         replacements = Replacements,
         ring_opts = RingOpts
     },
-    spawn_worker_cleaner(self(), Workers0),
+    spawn_worker_watchdog(self(), Workers0),
     Timeout = fabric_util:request_timeout(),
     case rexi_utils:recv(Workers0, Keypos, Fun, Acc, Timeout, infinity) of
         {ok, #stream_acc{ready = Workers}} ->
@@ -61,13 +78,13 @@
     end.
 
 cleanup(Workers) ->
-    % Stop the auxiliary cleaner process as we got to the point where cleanup
-    % happesn in the regular fashion so we don't want to send 2x the number kill
+    % Stop the auxiliary watchdog process as we got to the point where cleanup
+    % happens in the regular fashion so we don't want to send 2x the number kill
     % messages
-    case get(?WORKER_CLEANER) of
-        CleanerPid when is_pid(CleanerPid) ->
-            erase(?WORKER_CLEANER),
-            exit(CleanerPid, kill);
+    case get(?WORKER_WATCHDOG) of
+        WatchdogPid when is_pid(WatchdogPid) ->
+            erase(?WORKER_WATCHDOG),
+            exit(WatchdogPid, kill);
         _ ->
             ok
     end,
@@ -99,7 +116,7 @@
                     FinalWorkers = lists:foldl(
                         fun(Repl, NewWorkers) ->
                             NewWorker = (St#stream_acc.start_fun)(Repl),
-                            add_worker_to_cleaner(self(), NewWorker),
+                            add_worker_to_watchdog(self(), NewWorker),
                             fabric_dict:store(NewWorker, waiting, NewWorkers)
                         end,
                         Workers,
@@ -152,45 +169,123 @@
 handle_stream_start(Else, _, _) ->
     exit({invalid_stream_start, Else}).
 
-% Spawn an auxiliary rexi worker cleaner. This will be used in cases
-% when the coordinator (request) process is forceably killed and doesn't
+% Spawn an auxiliary rexi worker watchdog which triggers cleanup if:
+% * nothing has been streamed for the watchdog timeout duration, or
+% * the coordinator (request) process is forcibly killed and doesn't
 % get a chance to process its `after` fabric:clean/1 clause.
-spawn_worker_cleaner(Coordinator, Workers) ->
-    case get(?WORKER_CLEANER) of
+
+% The watchdog is initially disabled. Clients can enable it by calling
+% enable_watchdog/0 before starting a fabric_stream. That client is responsible
+% for calling kick_watchdog/0 on activity to prevent the watchdog from acting.
+% If kick_watchdog/0 is not called at least once in each watchdog interval,
+% the stream coordinator is killed and the workers are cleaned up.
+spawn_worker_watchdog(Coordinator, Workers) ->
+    case get(?WORKER_WATCHDOG) of
         undefined ->
+            State0 = #watchdog_state{
+                coordinator = Coordinator,
+                workers = Workers
+            },
+            Enabled = get(?WATCHDOG_ENABLE),
             Pid = spawn(fun() ->
                 erlang:monitor(process, Coordinator),
-                cleaner_loop(Coordinator, Workers)
+                State1 =
+                    case Enabled of
+                        true ->
+                            reset_watchdog(State0);
+                        undefined ->
+                            State0
+                    end,
+                watchdog_loop(State1)
             end),
-            put(?WORKER_CLEANER, Pid),
+            put(?WORKER_WATCHDOG, Pid),
             Pid;
-        ExistingCleaner ->
-            ExistingCleaner
+        ExistingWatchdog ->
+            ExistingWatchdog
     end.
 
-cleaner_loop(Pid, Workers) ->
+watchdog_loop(#watchdog_state{} = St) ->
     receive
-        {add_worker, Pid, Worker} ->
-            cleaner_loop(Pid, [Worker | Workers]);
-        {'DOWN', _, _, Pid, _} ->
-            fabric_util:cleanup(Workers)
+        {?ADD_WORKER, Pid, Worker} when Pid == St#watchdog_state.coordinator ->
+            Workers = St#watchdog_state.workers,
+            watchdog_loop(St#watchdog_state{workers = [Worker | Workers]});
+        ?WATCHDOG_KICK when ?WATCHDOG_IS_ENABLED ->
+            watchdog_loop(St#watchdog_state{kicked = true});
+        ?WATCHDOG_KICK ->
+            watchdog_loop(St);
+        ?WATCHDOG_TIMEOUT when ?WATCHDOG_IS_ENABLED, not ?WATCHDOG_WAS_KICKED ->
+            exit(St#watchdog_state.coordinator, {shutdown, watchdog_fired}),
+            watchdog_loop(St);
+        ?WATCHDOG_TIMEOUT when ?WATCHDOG_IS_ENABLED ->
+            watchdog_loop(reset_watchdog(St));
+        ?WATCHDOG_TIMEOUT ->
+            St;
+        {'DOWN', _, _, Pid, _} when Pid == St#watchdog_state.coordinator ->
+            fabric_util:cleanup(St#watchdog_state.workers)
     end.
 
-add_worker_to_cleaner(CoordinatorPid, Worker) ->
-    case get(?WORKER_CLEANER) of
-        CleanerPid when is_pid(CleanerPid) ->
-            CleanerPid ! {add_worker, CoordinatorPid, Worker};
+add_worker_to_watchdog(CoordinatorPid, Worker) ->
+    send_to_watchdog({?ADD_WORKER, CoordinatorPid, Worker}).
+
+enable_watchdog() ->
+    put(?WATCHDOG_ENABLE, true).
+
+reset_watchdog(#watchdog_state{} = St) ->
+    case watchdog_timeout() of
+        infinity ->
+            St;
+        Timeout ->
+            TRef = erlang:send_after(Timeout, self(), ?WATCHDOG_TIMEOUT),
+            St#watchdog_state{timer = TRef, kicked = false}
+    end.
+
+kick_watchdog() ->
+    case should_kick() of
+        true ->
+            send_to_watchdog(?WATCHDOG_KICK),
+            update_last_kick();
+        false ->
+            ok
+    end.
+
+should_kick() ->
+    case watchdog_timeout() of
+        infinity ->
+            false;
+        Timeout when is_integer(Timeout) ->
+            case get(?WATCHDOG_LAST_KICK) of
+                undefined ->
+                    true;
+                LastKick when is_integer(LastKick) ->
+                    Now = erlang:monotonic_time(),
+                    Delta = erlang:convert_time_unit(
+                        Now - LastKick, native, millisecond
+                    ),
+                    Delta > Timeout
+            end
+    end.
+
+update_last_kick() ->
+    put(?WATCHDOG_LAST_KICK, erlang:monotonic_time()).
+
+send_to_watchdog(Msg) ->
+    case get(?WORKER_WATCHDOG) of
+        WatchdogPid when is_pid(WatchdogPid) ->
+            WatchdogPid ! Msg;
         _ ->
             ok
     end.
 
+watchdog_timeout() ->
+    fabric_util:timeout("idle_stream", "60000").
+
 -ifdef(TEST).
 
 -include_lib("eunit/include/eunit.hrl").
 
-worker_cleaner_test_() ->
+worker_watchdog_test_() ->
     {
-        "Fabric spawn_worker_cleaner test",
+        "Fabric spawn_worker_watchdog test",
         {
             setup,
             fun setup/0,
@@ -208,7 +304,7 @@
 should_clean_workers() ->
     ?_test(begin
         meck:reset(rexi),
-        erase(?WORKER_CLEANER),
+        erase(?WORKER_WATCHDOG),
         Workers = [
             #shard{node = 'n1', ref = make_ref()},
             #shard{node = 'n2', ref = make_ref()}
@@ -218,11 +314,11 @@
                 die -> ok
             end
         end),
-        Cleaner = spawn_worker_cleaner(Coord, Workers),
-        Ref = erlang:monitor(process, Cleaner),
+        Watchdog = spawn_worker_watchdog(Coord, Workers),
+        Ref = erlang:monitor(process, Watchdog),
         Coord ! die,
         receive
-            {'DOWN', Ref, _, Cleaner, _} -> ok
+            {'DOWN', Ref, _, Watchdog, _} -> ok
         end,
         ?assertEqual(1, meck:num_calls(rexi, kill_all, 1))
     end).
@@ -230,7 +326,7 @@
 does_not_fire_if_cleanup_called() ->
     ?_test(begin
         meck:reset(rexi),
-        erase(?WORKER_CLEANER),
+        erase(?WORKER_WATCHDOG),
         Workers = [
             #shard{node = 'n1', ref = make_ref()},
             #shard{node = 'n2', ref = make_ref()}
@@ -240,8 +336,8 @@
                 die -> ok
             end
         end),
-        Cleaner = spawn_worker_cleaner(Coord, Workers),
-        Ref = erlang:monitor(process, Cleaner),
+        Watchdog = spawn_worker_watchdog(Coord, Workers),
+        Ref = erlang:monitor(process, Watchdog),
         cleanup(Workers),
         Coord ! die,
         receive
@@ -255,7 +351,7 @@
 should_clean_additional_worker_too() ->
     ?_test(begin
         meck:reset(rexi),
-        erase(?WORKER_CLEANER),
+        erase(?WORKER_WATCHDOG),
         Workers = [
             #shard{node = 'n1', ref = make_ref()}
         ],
@@ -264,12 +360,12 @@
                 die -> ok
             end
         end),
-        Cleaner = spawn_worker_cleaner(Coord, Workers),
-        add_worker_to_cleaner(Coord, #shard{node = 'n2', ref = make_ref()}),
-        Ref = erlang:monitor(process, Cleaner),
+        Watchdog = spawn_worker_watchdog(Coord, Workers),
+        add_worker_to_watchdog(Coord, #shard{node = 'n2', ref = make_ref()}),
+        Ref = erlang:monitor(process, Watchdog),
         Coord ! die,
         receive
-            {'DOWN', Ref, _, Cleaner, _} -> ok
+            {'DOWN', Ref, _, Watchdog, _} -> ok
         end,
         ?assertEqual(1, meck:num_calls(rexi, kill_all, 1))
     end).
diff --git a/src/mango/src/mango_httpd.erl b/src/mango/src/mango_httpd.erl
index bd91652..0f19f54 100644
--- a/src/mango/src/mango_httpd.erl
+++ b/src/mango/src/mango_httpd.erl
@@ -278,6 +278,7 @@
     end.
 
 start_find_resp(Req) ->
+    fabric_streams:enable_watchdog(),
     chttpd:start_delayed_json_response(Req, 200, [], "{\"docs\":[").
 
 end_find_resp(Acc0) ->
@@ -310,6 +311,7 @@
     NewKVs = lists:keystore(Key, 1, KVs, {Key, Value}),
     {ok, Acc0#vacc{kvs = NewKVs}};
 handle_doc({row, Doc}, Acc0) ->
+    fabric_streams:kick_watchdog(),
     #vacc{prepend = Prepend} = Acc0,
     Chunk = [Prepend, ?JSON_ENCODE(Doc)],
     maybe_flush_response(Acc0, Chunk, iolist_size(Chunk)).