blob: b9f04ab3f5e2b8e59135590286f67ae2c714485f [file] [log] [blame]
% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
% http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.
-module(couch_rep_httpc).
-include("couch_db.hrl").
-include("../ibrowse/ibrowse.hrl").
-export([db_exists/1, full_url/1, request/1, spawn_worker_process/1,
spawn_link_worker_process/1]).
request(Req) when is_record(Req, http_db) ->
do_request(Req).
do_request(#http_db{url=Url} = Req) when is_binary(Url) ->
do_request(Req#http_db{url = ?b2l(Url)});
do_request(Req) ->
#http_db{
auth = Auth,
body = B,
conn = Conn,
headers = Headers0,
method = Method,
options = Opts,
qs = QS
} = Req,
Url = full_url(Req),
Headers = case proplists:get_value(<<"oauth">>, Auth) of
undefined ->
Headers0;
{OAuthProps} ->
[oauth_header(Url, QS, Method, OAuthProps) | Headers0]
end,
Body = case B of
{Fun, InitialState} when is_function(Fun) ->
{Fun, InitialState};
nil ->
[];
_Else ->
iolist_to_binary(?JSON_ENCODE(B))
end,
Resp = case Conn of
nil ->
ibrowse:send_req(Url, Headers, Method, Body, Opts, infinity);
_ ->
ibrowse:send_req_direct(Conn, Url, Headers, Method, Body, Opts, infinity)
end,
process_response(Resp, Req).
db_exists(Req) ->
db_exists(Req, Req#http_db.url).
db_exists(Req, CanonicalUrl) ->
#http_db{
auth = Auth,
headers = Headers0,
url = Url
} = Req,
Headers = case proplists:get_value(<<"oauth">>, Auth) of
undefined ->
Headers0;
{OAuthProps} ->
[oauth_header(Url, [], head, OAuthProps) | Headers0]
end,
case catch ibrowse:send_req(Url, Headers, head) of
{ok, "200", _, _} ->
Req#http_db{url = CanonicalUrl};
{ok, "301", RespHeaders, _} ->
MochiHeaders = mochiweb_headers:make(RespHeaders),
RedirectUrl = mochiweb_headers:get_value("Location", MochiHeaders),
db_exists(Req#http_db{url = RedirectUrl}, RedirectUrl);
{ok, "302", RespHeaders, _} ->
MochiHeaders = mochiweb_headers:make(RespHeaders),
RedirectUrl = mochiweb_headers:get_value("Location", MochiHeaders),
db_exists(Req#http_db{url = RedirectUrl}, CanonicalUrl);
Error ->
?LOG_DEBUG("DB at ~s could not be found because ~p", [Url, Error]),
throw({db_not_found, ?l2b(Url)})
end.
full_url(#http_db{url=Url} = Req) when is_binary(Url) ->
full_url(Req#http_db{url = ?b2l(Url)});
full_url(#http_db{qs=[]} = Req) ->
Req#http_db.url ++ Req#http_db.resource;
full_url(Req) ->
#http_db{
url = Url,
resource = Resource,
qs = QS
} = Req,
QStr = lists:map(fun({K,V}) -> io_lib:format("~s=~s",
[couch_util:to_list(K), couch_util:to_list(V)]) end, QS),
lists:flatten([Url, Resource, "?", string:join(QStr, "&")]).
process_response({ok, Status, Headers, Body}, Req) ->
Code = list_to_integer(Status),
if Code =:= 200; Code =:= 201 ->
?JSON_DECODE(maybe_decompress(Headers, Body));
Code =:= 301; Code =:= 302 ->
MochiHeaders = mochiweb_headers:make(Headers),
RedirectUrl = mochiweb_headers:get_value("Location", MochiHeaders),
do_request(Req#http_db{url = RedirectUrl});
Code =:= 409 ->
throw(conflict);
Code >= 400, Code < 500 ->
?JSON_DECODE(maybe_decompress(Headers, Body));
Code =:= 500; Code =:= 502 ->
#http_db{pause = Pause, retries = Retries} = Req,
?LOG_INFO("retrying couch_rep_httpc request in ~p seconds " ++
% "due to remote server error: ~s~s", [Pause/1000, Req#http_db.url,
"due to remote server error: ~p Body ~s", [Pause/1000, Code,
Body]),
timer:sleep(Pause),
do_request(Req#http_db{retries = Retries-1, pause = 2*Pause})
end;
process_response({ibrowse_req_id, Id}, _Req) ->
{ibrowse_req_id, Id};
process_response({error, _Reason}, #http_db{url=Url, retries=0}) ->
?LOG_ERROR("couch_rep_httpc request failed after 10 retries: ~s", [Url]),
exit({http_request_failed, ?l2b(["failed to replicate ", Url])});
process_response({error, Reason}, Req) ->
#http_db{
method = Method,
retries = Retries,
pause = Pause
} = Req,
ShortReason = case Reason of
connection_closed ->
connection_closed;
{'EXIT', {noproc, _}} ->
noproc;
{'EXIT', {normal, _}} ->
normal;
Else ->
Else
end,
?LOG_DEBUG("retrying couch_rep_httpc ~p request in ~p seconds due to " ++
"{error, ~p}", [Method, Pause/1000, ShortReason]),
timer:sleep(Pause),
if Reason == worker_is_dead ->
C = spawn_link_worker_process(Req),
do_request(Req#http_db{retries = Retries-1, pause = 2*Pause, conn=C});
true ->
do_request(Req#http_db{retries = Retries-1, pause = 2*Pause})
end.
spawn_worker_process(Req) ->
Url = ibrowse_lib:parse_url(Req#http_db.url),
{ok, Pid} = ibrowse_http_client:start(Url),
Pid.
spawn_link_worker_process(Req) ->
Url = ibrowse_lib:parse_url(Req#http_db.url),
{ok, Pid} = ibrowse_http_client:start_link(Url),
Pid.
maybe_decompress(Headers, Body) ->
MochiHeaders = mochiweb_headers:make(Headers),
case mochiweb_headers:get_value("Content-Encoding", MochiHeaders) of
"gzip" ->
zlib:gunzip(Body);
_ ->
Body
end.
oauth_header(Url, QS, Action, Props) ->
QSL = [{couch_util:to_list(K), couch_util:to_list(V)} || {K,V} <- QS],
ConsumerKey = ?b2l(proplists:get_value(<<"consumer_key">>, Props)),
Token = ?b2l(proplists:get_value(<<"token">>, Props)),
TokenSecret = ?b2l(proplists:get_value(<<"token_secret">>, Props)),
ConsumerSecret = ?b2l(proplists:get_value(<<"consumer_secret">>, Props)),
Consumer = {ConsumerKey, ConsumerSecret, hmac_sha1},
Method = case Action of
get -> "GET";
post -> "POST";
put -> "PUT";
head -> "HEAD"
end,
Params = oauth:signed_params(Method, Url, QSL, Consumer, Token, TokenSecret)
-- QSL,
{"Authorization", "OAuth " ++ oauth_uri:params_to_header_string(Params)}.