Be more explicit on values of ?STREAM_STATUS
Also I should add a note about how the changes ending due to a throw
when processing the last_seq leads to the un-consumed stream messages.
This is a cherry-pick of:
https://github.com/cloudant/couch_replicator/commit/20d11c7d342ea77ffd5384d75e9cd570cbcbf5ba
diff --git a/src/couch_replicator_httpc.erl b/src/couch_replicator_httpc.erl
index e61fba5..544c2ef 100644
--- a/src/couch_replicator_httpc.erl
+++ b/src/couch_replicator_httpc.erl
@@ -36,7 +36,7 @@
send_req(HttpDb, Params1, Callback) ->
- put(STREAM_STATUS, init),
+ put(?STREAM_STATUS, init),
couch_stats:increment_counter([couch_replicator, requests]),
Params2 = ?replace(Params1, qs,
[{K, ?b2l(iolist_to_binary(V))} || {K, V} <- get_value(qs, Params1, [])]),
@@ -134,7 +134,7 @@
StreamDataFun = fun() ->
stream_data_self(HttpDb, Params, Worker, ReqId, Callback)
end,
- put(STREAM_STATUS, streaming),
+ put(?STREAM_STATUS, streaming),
ibrowse:stream_next(ReqId),
try
Ret = Callback(Ok, Headers, StreamDataFun),
@@ -170,22 +170,22 @@
% messages for the given ReqId on the floor since we're
% no longer in the HTTP request.
clean_mailbox({ibrowse_req_id, ReqId}) ->
- case get(STREAM_STATUS) of
+ case get(?STREAM_STATUS) of
streaming ->
ibrowse:stream_next(ReqId),
receive
{ibrowse_async_response, ReqId, _} ->
clean_mailbox({ibrowse_req_id, ReqId});
{ibrowse_async_response_end, ReqId} ->
- put(STREAM_STATUS, ended),
+ put(?STREAM_STATUS, ended),
ok
end;
- _ ->
+ Status when Status == init; Status == ended ->
receive
{ibrowse_async_response, ReqId, _} ->
clean_mailbox({ibrowse_req_id, ReqId});
{ibrowse_async_response_end, ReqId} ->
- put(STREAM_STATUS, ended),
+ put(?STREAM_STATUS, ended),
ok
end
end;
@@ -236,7 +236,7 @@
ibrowse:stream_next(ReqId),
{Data, fun() -> stream_data_self(HttpDb, Params, Worker, ReqId, Cb) end};
{Data, ibrowse_async_response_end} ->
- put(STREAM_STATUS, ended),
+ put(?STREAM_STATUS, ended),
{Data, fun() -> throw({maybe_retry_req, more_data_expected}) end}
end.