blob: 3c09b90a0ee41a42310919eb6b3b8c5317ba8c1e [file] [log] [blame]
% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
% http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.
-module(couch_rep_att).
-export([convert_stub/2, cleanup/0]).
-include("couch_db.hrl").
convert_stub(#att{data=stub, name=Name} = Attachment,
{#http_db{} = Db, Id, Rev}) ->
{Pos, [RevId|_]} = Rev,
Request = Db#http_db{
resource = lists:flatten([couch_util:url_encode(Id), "/",
couch_util:url_encode(Name)]),
qs = [{rev, couch_doc:rev_to_str({Pos,RevId})}]
},
Ref = make_ref(),
RcvFun = fun() -> attachment_receiver(Ref, Request) end,
Attachment#att{data=RcvFun}.
cleanup() ->
receive
{ibrowse_async_response, _, _} ->
%% TODO maybe log, didn't expect to have data here
cleanup();
{ibrowse_async_response_end, _} ->
cleanup();
{ibrowse_async_headers, _, _, _} ->
cleanup()
after 0 ->
erase(),
ok
end.
% internal funs
attachment_receiver(Ref, Request) ->
try case get(Ref) of
undefined ->
{ReqId, ContentEncoding} = start_http_request(Request),
put(Ref, {ReqId, ContentEncoding}),
receive_data(Ref, ReqId, ContentEncoding);
{ReqId, ContentEncoding} ->
receive_data(Ref, ReqId, ContentEncoding)
end
catch
throw:{attachment_request_failed, _} ->
case {Request#http_db.retries, Request#http_db.pause} of
{0, _} ->
?LOG_INFO("request for ~p failed", [Request#http_db.resource]),
throw({attachment_request_failed, max_retries_reached});
{N, Pause} when N > 0 ->
?LOG_INFO("request for ~p timed out, retrying in ~p seconds",
[Request#http_db.resource, Pause/1000]),
timer:sleep(Pause),
cleanup(),
attachment_receiver(Ref, Request#http_db{retries = N-1})
end
end.
receive_data(Ref, ReqId, ContentEncoding) ->
receive
{ibrowse_async_response, ReqId, {chunk_start,_}} ->
receive_data(Ref, ReqId, ContentEncoding);
{ibrowse_async_response, ReqId, chunk_end} ->
receive_data(Ref, ReqId, ContentEncoding);
{ibrowse_async_response, ReqId, {error, Err}} ->
?LOG_ERROR("streaming attachment ~p failed with ~p", [ReqId, Err]),
throw({attachment_request_failed, Err});
{ibrowse_async_response, ReqId, Data} ->
% ?LOG_DEBUG("got ~p bytes for ~p", [size(Data), ReqId]),
Data;
{ibrowse_async_response_end, ReqId} ->
?LOG_ERROR("streaming att. ended but more data requested ~p", [ReqId]),
throw({attachment_request_failed, premature_end})
after 31000 ->
throw({attachment_request_failed, timeout})
end.
start_http_request(Req) ->
%% set stream_to here because self() has changed
Req2 = Req#http_db{options = [{stream_to,self()} | Req#http_db.options]},
{ibrowse_req_id, ReqId} = couch_rep_httpc:request(Req2),
receive {ibrowse_async_headers, ReqId, Code, Headers} ->
case validate_headers(Req2, list_to_integer(Code), Headers) of
{ok, ContentEncoding} ->
{ReqId, ContentEncoding};
{ok, ContentEncoding, NewReqId} ->
{NewReqId, ContentEncoding}
end
after 10000 ->
throw({attachment_request_failed, timeout})
end.
validate_headers(_Req, 200, Headers) ->
MochiHeaders = mochiweb_headers:make(Headers),
{ok, mochiweb_headers:get_value("Content-Encoding", MochiHeaders)};
validate_headers(Req, Code, Headers) when Code > 299, Code < 400 ->
Url = couch_rep_httpc:redirect_url(Headers, Req#http_db.url),
NewReq = couch_rep_httpc:redirected_request(Req, Url),
{ibrowse_req_id, ReqId} = couch_rep_httpc:request(NewReq),
receive {ibrowse_async_headers, ReqId, NewCode, NewHeaders} ->
{ok, Encoding} = validate_headers(NewReq, list_to_integer(NewCode),
NewHeaders)
end,
{ok, Encoding, ReqId};
validate_headers(Req, Code, _Headers) ->
#http_db{url=Url, resource=Resource} = Req,
?LOG_ERROR("got ~p for ~s~s", [Code, Url, Resource]),
throw({attachment_request_failed, {bad_code, Code}}).