Monitor changes consumer in the tests
diff --git a/test/couch_changes_tests.erl b/test/couch_changes_tests.erl
index d8adead..4f81a00 100644
--- a/test/couch_changes_tests.erl
+++ b/test/couch_changes_tests.erl
@@ -692,7 +692,7 @@
{ok, Rev} = couch_db:update_doc(Db, Doc, []),
{ok, couch_doc:rev_to_str(Rev)}.
-get_rows(Consumer) ->
+get_rows({Consumer, _}) ->
Ref = make_ref(),
Consumer ! {get_rows, Ref},
Resp = receive
@@ -704,7 +704,7 @@
?assertNotEqual(timeout, Resp),
Resp.
-get_heartbeats(Consumer) ->
+get_heartbeats({Consumer, _}) ->
Ref = make_ref(),
Consumer ! {get_heartbeats, Ref},
Resp = receive
@@ -716,7 +716,7 @@
?assertNotEqual(timeout, Resp),
Resp.
-clear_rows(Consumer) ->
+clear_rows({Consumer, _}) ->
Ref = make_ref(),
Consumer ! {reset, Ref},
Resp = receive
@@ -728,7 +728,7 @@
?assertNotEqual(timeout, Resp),
Resp.
-stop_consumer(Consumer) ->
+stop_consumer({Consumer, _}) ->
Ref = make_ref(),
Consumer ! {stop, Ref},
Resp = receive
@@ -740,7 +740,7 @@
?assertNotEqual(timeout, Resp),
Resp.
-pause(Consumer) ->
+pause({Consumer, _}) ->
Ref = make_ref(),
Consumer ! {pause, Ref},
Resp = receive
@@ -752,7 +752,7 @@
?assertNotEqual(timeout, Resp),
Resp.
-unpause(Consumer) ->
+unpause({Consumer, _}) ->
Ref = make_ref(),
Consumer ! {continue, Ref},
Resp = receive
@@ -764,19 +764,29 @@
?assertNotEqual(timeout, Resp),
Resp.
-wait_finished(_Consumer) ->
- Resp = receive
+wait_finished({_, ConsumerRef}) ->
+ receive
{consumer_finished, Rows, LastSeq} ->
- {Rows, LastSeq}
+ {Rows, LastSeq};
+ {'DOWN', ConsumerRef, _, _, Msg} when Msg == normal; Msg == ok ->
+ ok;
+ {'DOWN', ConsumerRef, _, _, Msg} ->
+ erlang:error({consumer_died, [
+ {module, ?MODULE},
+ {line, ?LINE},
+ {value, Msg}
+ ]})
after ?TIMEOUT ->
- timeout
- end,
- ?assertNotEqual(timeout, Resp),
- Resp.
+ erlang:error({consumer_died, [
+ {module, ?MODULE},
+ {line, ?LINE},
+ {value, timeout}
+ ]})
+ end.
spawn_consumer(DbName, ChangesArgs0, Req) ->
Parent = self(),
- spawn(fun() ->
+ spawn_monitor(fun() ->
put(heartbeat_count, 0),
Callback = fun
({change, {Change}, _}, _, Acc) ->
@@ -804,10 +814,12 @@
FeedFun = couch_changes:handle_db_changes(ChangesArgs, Req, Db),
try
FeedFun({Callback, []})
- catch throw:{stop, _} ->
- ok
- end,
- catch couch_db:close(Db)
+ catch
+ throw:{stop, _} -> ok;
+ _:Error -> Error
+ after
+ couch_db:close(Db)
+ end
end).
maybe_pause(Parent, Acc) ->