| % Licensed under the Apache License, Version 2.0 (the "License"); you may not |
| % use this file except in compliance with the License. You may obtain a copy of |
| % the License at |
| % |
| % http://www.apache.org/licenses/LICENSE-2.0 |
| % |
| % Unless required by applicable law or agreed to in writing, software |
| % distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| % WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| % License for the specific language governing permissions and limitations under |
| % the License. |
| |
| -module(couch_rep_att). |
| |
| -export([convert_stub/2, cleanup/0]). |
| |
| -include("couch_db.hrl"). |
| |
| convert_stub(#att{data=stub} = Attachment, {#http_db{} = Db, Id, Rev}) -> |
| {Pos, [RevId|_]} = Rev, |
| Name = Attachment#att.name, |
| Request = Db#http_db{ |
| resource = lists:flatten([couch_util:url_encode(Id), "/", |
| couch_util:url_encode(Name)]), |
| qs = [{rev, couch_doc:rev_to_str({Pos,RevId})}] |
| }, |
| Ref = make_ref(), |
| RcvFun = fun() -> attachment_receiver(Ref, Request) end, |
| Attachment#att{data=RcvFun}. |
| |
| cleanup() -> |
| receive |
| {ibrowse_async_response, _, _} -> |
| %% TODO maybe log, didn't expect to have data here |
| cleanup(); |
| {ibrowse_async_response_end, _} -> |
| cleanup() |
| after 0 -> |
| erase(), |
| ok |
| end. |
| |
| % internal funs |
| |
| attachment_receiver(Ref, Request) -> |
| case get(Ref) of |
| undefined -> |
| ReqId = start_http_request(Request), |
| put(Ref, ReqId), |
| receive_data(Ref, ReqId); |
| ReqId -> |
| receive_data(Ref, ReqId) |
| end. |
| |
| receive_data(Ref, ReqId) -> |
| receive |
| {ibrowse_async_response, ReqId, {chunk_start,_}} -> |
| receive_data(Ref, ReqId); |
| {ibrowse_async_response, ReqId, chunk_end} -> |
| receive_data(Ref, ReqId); |
| {ibrowse_async_response, ReqId, {error, Err}} -> |
| ?LOG_ERROR("streaming attachment ~p failed with ~p", [ReqId, Err]), |
| throw({attachment_request_failed, Err}); |
| {ibrowse_async_response, ReqId, Data} -> |
| % ?LOG_DEBUG("got ~p bytes for ~p", [size(Data), ReqId]), |
| Data; |
| {ibrowse_async_response_end, ReqId} -> |
| ?LOG_ERROR("streaming att. ended but more data requested ~p", [ReqId]), |
| throw({attachment_request_failed, premature_end}) |
| end. |
| |
| start_http_request(Req) -> |
| %% set stream_to here because self() has changed |
| Req2 = Req#http_db{options = [{stream_to,self()} | Req#http_db.options]}, |
| {ibrowse_req_id, ReqId} = couch_rep_httpc:request(Req2), |
| receive {ibrowse_async_headers, ReqId, Code, Headers} -> |
| case validate_headers(Req2, list_to_integer(Code), Headers) of |
| ok -> |
| ReqId; |
| {ok, NewReqId} -> |
| NewReqId |
| end |
| end. |
| |
| validate_headers(_Req, 200, _Headers) -> |
| ok; |
| validate_headers(Req, Code, Headers) when Code > 299, Code < 400 -> |
| Url = mochiweb_headers:get_value("Location",mochiweb_headers:make(Headers)), |
| NewReq = couch_rep_httpc:redirected_request(Req, Url), |
| {ibrowse_req_id, ReqId} = couch_rep_httpc:request(NewReq), |
| receive {ibrowse_async_headers, ReqId, NewCode, NewHeaders} -> |
| ok = validate_headers(NewReq, list_to_integer(NewCode), NewHeaders) |
| end, |
| {ok, ReqId}; |
| validate_headers(Req, Code, _Headers) -> |
| #http_db{url=Url, resource=Resource} = Req, |
| ?LOG_ERROR("got ~p for ~s~s", [Code, Url, Resource]), |
| throw({attachment_request_failed, {bad_code, Code}}). |