blob: 783ee96078406ce6fff76b2ec7bef673a10b474e [file] [log] [blame]
% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
% http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.
-module(couch_rep_httpc).
-include("couch_db.hrl").
-include("../ibrowse/ibrowse.hrl").
-export([db_exists/1, db_exists/2, full_url/1, request/1, redirected_request/2,
redirect_url/2, spawn_worker_process/1, spawn_link_worker_process/1]).
-export([ssl_options/1]).
% @doc Public entry point for issuing a replicator HTTP request.
% Only an #http_db{} record is accepted; the call is delegated to
% do_request/1 and its result is returned unchanged.
request(Req) when is_record(Req, http_db) ->
    do_request(Req).
% @doc Sends the HTTP request described by Req through ibrowse and passes the
% raw ibrowse result, together with Req, to process_response/2.
%
% A binary url is first converted to a list. When the auth properties contain
% an <<"oauth">> entry, an OAuth Authorization header is prepended to the
% configured headers. The body is sent as-is when it is a {Fun, State} pair
% whose first element is a function, as an empty list when it is nil, and as
% JSON-encoded binary otherwise.
do_request(#http_db{url = BinUrl} = Req) when is_binary(BinUrl) ->
    do_request(Req#http_db{url = ?b2l(BinUrl)});
do_request(Req) ->
    #http_db{
        auth = Auth,
        body = RawBody,
        conn = Conn,
        headers = BaseHeaders,
        method = Method,
        options = Opts,
        qs = QS
    } = Req,
    Url = full_url(Req),
    Headers =
        case couch_util:get_value(<<"oauth">>, Auth) of
            undefined ->
                BaseHeaders;
            {OAuthProps} ->
                [oauth_header(Url, QS, Method, OAuthProps) | BaseHeaders]
        end,
    Body =
        case RawBody of
            {Fun, _InitialState} = Streaming when is_function(Fun) ->
                Streaming;
            nil ->
                [];
            _Term ->
                iolist_to_binary(?JSON_ENCODE(RawBody))
        end,
    % Without a dedicated connection ibrowse picks one itself; otherwise the
    % request goes straight to the given worker. Both calls use no timeout.
    Resp =
        if
            Conn =:= nil ->
                ibrowse:send_req(Url, Headers, Method, Body, Opts, infinity);
            true ->
                ibrowse:send_req_direct(Conn, Url, Headers, Method, Body,
                    Opts, infinity)
        end,
    process_response(Resp, Req).
% @doc Checks that the database addressed by Req exists, using the url stored
% in Req as the canonical url. See db_exists/2.
db_exists(Req) ->
    CanonicalUrl = Req#http_db.url,
    db_exists(Req, CanonicalUrl).
% @doc Two-argument form of the existence check.
% When the second argument is a boolean it is taken as the "create the
% database first" flag and the url stored in Req is used as canonical url.
% Any other value is taken as the canonical url, and no creation is attempted.
db_exists(Req, CreateDB) when is_boolean(CreateDB) ->
    db_exists(Req, Req#http_db.url, CreateDB);
db_exists(Req, CanonicalUrl) ->
    db_exists(Req, CanonicalUrl, false).
% @doc Verifies that the remote database exists by issuing a HEAD request.
%
% When CreateDB is true a PUT is sent first; its outcome is ignored.
% HEAD results:
%   200 -> ibrowse is configured for CanonicalUrl and Req is returned with its
%          url set to CanonicalUrl;
%   301 -> the check is repeated against the redirect target, which also
%          becomes the canonical url;
%   302 -> the check is repeated against the redirect target, keeping
%          CanonicalUrl;
%   401 -> throws {unauthorized, Url};
%   anything else (including a caught exception) -> throws {db_not_found, Url}.
db_exists(Req, CanonicalUrl, CreateDB) ->
    #http_db{
        auth = Auth,
        headers = BaseHeaders,
        options = Options,
        url = Url
    } = Req,
    % The OAuth signature covers the HTTP method, so headers are built per
    % request rather than once.
    HeadersFor = fun(Method) ->
        case couch_util:get_value(<<"oauth">>, Auth) of
            undefined ->
                BaseHeaders;
            {OAuthProps} ->
                [oauth_header(Url, [], Method, OAuthProps) | BaseHeaders]
        end
    end,
    if
        CreateDB =:= true ->
            PutHeaders = [{"Content-Length", 0} | HeadersFor(put)],
            catch ibrowse:send_req(Url, PutHeaders, put, [], Options);
        true ->
            ok
    end,
    HeadResult =
        (catch ibrowse:send_req(Url, HeadersFor(head), head, [], Options)),
    case HeadResult of
        {ok, "200", _, _} ->
            config_http(CanonicalUrl),
            Req#http_db{url = CanonicalUrl};
        {ok, "301", MovedHeaders, _} ->
            NewUrl = redirect_url(MovedHeaders, Req#http_db.url),
            db_exists(Req#http_db{url = NewUrl}, NewUrl);
        {ok, "302", FoundHeaders, _} ->
            TempUrl = redirect_url(FoundHeaders, Req#http_db.url),
            db_exists(Req#http_db{url = TempUrl}, CanonicalUrl);
        {ok, "401", _, _} ->
            throw({unauthorized, ?l2b(Url)});
        Error ->
            ?LOG_DEBUG("DB at ~s could not be found because ~p", [Url, Error]),
            throw({db_not_found, ?l2b(Url)})
    end.
% @doc Applies the replicator's ibrowse limits to the host and port of Url.
% The "replicator" settings "max_http_sessions" (default "20") and
% "max_http_pipeline_size" (default "50") are read and set in ibrowse. A
% callback is then registered with couch_config that sets the corresponding
% ibrowse limit when called for one of those two settings (presumably on
% configuration changes - confirm against couch_config).
config_http(Url) ->
    #url{host = Host, port = Port} = ibrowse_lib:parse_url(Url),
    SessionLimit = list_to_integer(
        couch_config:get("replicator", "max_http_sessions", "20")),
    ok = ibrowse:set_max_sessions(Host, Port, SessionLimit),
    PipelineLimit = list_to_integer(
        couch_config:get("replicator", "max_http_pipeline_size", "50")),
    ok = ibrowse:set_max_pipeline_size(Host, Port, PipelineLimit),
    OnConfigChange =
        fun("replicator", "max_http_sessions", NewSessions) ->
            ibrowse:set_max_sessions(Host, Port,
                list_to_integer(NewSessions));
        ("replicator", "max_http_pipeline_size", NewPipeSize) ->
            ibrowse:set_max_pipeline_size(Host, Port,
                list_to_integer(NewPipeSize))
        end,
    ok = couch_config:register(OnConfigChange).
% @doc Builds the url to follow for a redirect response.
% Protocol, host, port and path come from the "Location" response header.
% The username and password are taken from OrigUrl and are included only when
% both are lists. An IPv6 host is wrapped in square brackets.
% Returns a string of the form Proto://[User:Pass@]Host:Port/Path.
redirect_url(RespHeaders, OrigUrl) ->
    Location = mochiweb_headers:get_value("Location",
        mochiweb_headers:make(RespHeaders)),
    #url{
        host = Host, host_type = HostType, port = Port,
        path = Path, protocol = Proto
    } = ibrowse_lib:parse_url(Location),
    #url{username = User, password = Passwd} = ibrowse_lib:parse_url(OrigUrl),
    Creds =
        if
            is_list(User), is_list(Passwd) ->
                User ++ ":" ++ Passwd ++ "@";
            true ->
                []
        end,
    HostPart =
        if
            HostType =:= ipv6_address ->
                "[" ++ Host ++ "]";
            true ->
                Host
        end,
    Scheme = atom_to_list(Proto),
    PortStr = integer_to_list(Port),
    Scheme ++ "://" ++ Creds ++ HostPart ++ ":" ++ PortStr ++ Path.
% @doc Returns the complete request url for Req as a flat string.
% A binary url is converted to a list first. With an empty query list the
% result is url ++ resource; otherwise "?" and the "Key=Value" pairs joined
% by "&" are appended. Keys and values are inserted as given, without any
% escaping.
full_url(#http_db{url = BinUrl} = Req) when is_binary(BinUrl) ->
    full_url(Req#http_db{url = ?b2l(BinUrl)});
full_url(#http_db{qs = [], url = Url, resource = Resource}) ->
    Url ++ Resource;
full_url(Req) ->
    #http_db{
        url = Url,
        resource = Resource,
        qs = QS
    } = Req,
    FormatPair = fun({K, V}) ->
        io_lib:format("~s=~s",
            [couch_util:to_list(K), couch_util:to_list(V)])
    end,
    Pairs = lists:map(FormatPair, QS),
    QueryString = string:join(Pairs, "&"),
    lists:flatten([Url, Resource, "?", QueryString]).
% @doc Interprets the result of the ibrowse call made for Req.
%
% {ok, Status, Headers, Body}:
%   200, 201       -> decoded JSON body (gunzipped first when required);
%   301, 302       -> the request is re-issued against the redirect target;
%   409            -> throws conflict;
%   other 4xx      -> decoded JSON body;
%   500, 502, 503  -> retried after sleeping for the current pause, with the
%                     pause doubled and retries decremented; once retries are
%                     exhausted the process exits with
%                     {http_request_failed, Msg};
%   anything else  -> exits with {http_request_failed, Msg}.
% {ibrowse_req_id, Id} is returned unchanged.
% {error, Reason} is retried the same way as a server error; a dead worker is
% replaced by a freshly spawned, linked one before retrying.
process_response({ok, Status, Headers, Body}, Req) ->
    Code = list_to_integer(Status),
    if Code =:= 200; Code =:= 201 ->
        ?JSON_DECODE(maybe_decompress(Headers, Body));
    Code =:= 301; Code =:= 302 ->
        RedirectUrl = redirect_url(Headers, Req#http_db.url),
        do_request(redirected_request(Req, RedirectUrl));
    Code =:= 409 ->
        throw(conflict);
    Code >= 400, Code < 500 ->
        ?JSON_DECODE(maybe_decompress(Headers, Body));
    Code =:= 500; Code =:= 502; Code =:= 503 ->
        retry_after_server_error(Code, Body, Req);
    true ->
        exit({http_request_failed, ?l2b(["unhandled response code ", Status])})
    end;
process_response({ibrowse_req_id, Id}, _Req) ->
    {ibrowse_req_id, Id};
process_response({error, _Reason}, #http_db{url=Url, retries=Retries})
        when Retries =< 0 ->
    ?LOG_ERROR("couch_rep_httpc request failed after 10 retries: ~s", [Url]),
    exit({http_request_failed, ?l2b(["failed to replicate ", Url])});
process_response({error, Reason}, Req) ->
    #http_db{
        method = Method,
        retries = Retries,
        pause = Pause
    } = Req,
    % Shorten noisy exit reasons before logging them.
    ShortReason = case Reason of
    sel_conn_closed ->
        connection_closed;
    {'EXIT', {noproc, _}} ->
        noproc;
    {'EXIT', {normal, _}} ->
        normal;
    Else ->
        Else
    end,
    ?LOG_DEBUG("retrying couch_rep_httpc ~p request in ~p seconds due to " ++
        "{error, ~p}", [Method, Pause/1000, ShortReason]),
    timer:sleep(Pause),
    NextReq = Req#http_db{retries = Retries-1, pause = 2*Pause},
    case Reason of
    worker_is_dead ->
        % The dedicated connection is gone; retry on a new worker.
        C = spawn_link_worker_process(Req),
        do_request(NextReq#http_db{conn=C});
    _ ->
        do_request(NextReq)
    end.

% Retries a request that got a 500/502/503 response, or gives up once the
% retry budget of the request is used up. Without the first clause a server
% that keeps failing would be retried forever with an ever-growing pause.
retry_after_server_error(Code, _Body, #http_db{url = Url, retries = Retries})
        when Retries =< 0 ->
    ?LOG_ERROR("couch_rep_httpc request failed, retries exhausted after "
        "server error ~p: ~s", [Code, Url]),
    exit({http_request_failed, ?l2b(["failed to replicate ", Url])});
retry_after_server_error(Code, Body, Req) ->
    #http_db{pause = Pause, retries = Retries} = Req,
    ?LOG_INFO("retrying couch_rep_httpc request in ~p seconds " ++
        "due to remote server error: ~p Body ~s", [Pause/1000, Code,
        Body]),
    timer:sleep(Pause),
    do_request(Req#http_db{retries = Retries-1, pause = 2*Pause}).
% @doc Rewrites Req so that it targets RedirectUrl.
% The redirect url is split into its path part and query string: the path
% part becomes the new url, the parsed query string becomes qs, and resource
% is cleared. When the request uses OAuth, any "Authorization" header is
% removed from the stored headers.
redirected_request(Req, RedirectUrl) ->
    {Base, QueryString, _Fragment} = mochiweb_util:urlsplit_path(RedirectUrl),
    QS = mochiweb_util:parse_qs(QueryString),
    OAuth = couch_util:get_value(<<"oauth">>, Req#http_db.auth),
    Hdrs =
        if
            OAuth =:= undefined ->
                Req#http_db.headers;
            true ->
                lists:keydelete("Authorization", 1, Req#http_db.headers)
        end,
    Req#http_db{url = Base, resource = "", qs = QS, headers = Hdrs}.
% @doc Starts an ibrowse_http_client process for the parsed url of Req and
% returns its pid. Crashes with badmatch unless the start returns {ok, Pid}.
spawn_worker_process(Req) ->
    {ok, Pid} = ibrowse_http_client:start(
        ibrowse_lib:parse_url(Req#http_db.url)),
    Pid.
% @doc Asks ibrowse to spawn a linked worker process for the url of Req and
% returns its pid. Crashes with badmatch unless ibrowse returns {ok, Pid}.
spawn_link_worker_process(Req) ->
    Url = Req#http_db.url,
    {ok, Pid} = ibrowse:spawn_link_worker_process(Url),
    Pid.
% @doc Returns Body gunzipped when the "Content-Encoding" response header is
% exactly "gzip"; otherwise returns Body unchanged.
maybe_decompress(Headers, Body) ->
    Encoding = mochiweb_headers:get_value("Content-Encoding",
        mochiweb_headers:make(Headers)),
    if
        Encoding =:= "gzip" ->
            zlib:gunzip(Body);
        true ->
            Body
    end.
% @doc Builds the OAuth "Authorization" header for a request.
%
% Url    - request url that gets signed
% QS     - query parameters as {Key, Value} pairs; they take part in the
%          signature but are left out of the header itself
% Action - HTTP method as a lowercase atom (get, put, head, delete, ...)
% Props  - OAuth properties: <<"consumer_key">>, <<"consumer_secret">>,
%          <<"token">>, <<"token_secret">> and optionally
%          <<"signature_method">> (defaults to <<"HMAC-SHA1">>)
%
% Returns {"Authorization", HeaderValue}. Crashes on an unknown signature
% method or when one of the required properties is missing.
oauth_header(Url, QS, Action, Props) ->
    % erlang-oauth doesn't like iolists
    QSL = [{couch_util:to_list(K), ?b2l(?l2b(couch_util:to_list(V)))} ||
        {K,V} <- QS],
    ConsumerKey = ?b2l(couch_util:get_value(<<"consumer_key">>, Props)),
    Token = ?b2l(couch_util:get_value(<<"token">>, Props)),
    TokenSecret = ?b2l(couch_util:get_value(<<"token_secret">>, Props)),
    ConsumerSecret = ?b2l(couch_util:get_value(<<"consumer_secret">>, Props)),
    SignatureMethodStr = ?b2l(couch_util:get_value(<<"signature_method">>,
        Props, <<"HMAC-SHA1">>)),
    SignatureMethodAtom = case SignatureMethodStr of
        "PLAINTEXT" ->
            plaintext;
        "HMAC-SHA1" ->
            hmac_sha1;
        "RSA-SHA1" ->
            rsa_sha1
    end,
    Consumer = {ConsumerKey, ConsumerSecret, SignatureMethodAtom},
    % The signature base string uses the upper-case HTTP method name. Deriving
    % it from the atom keeps get/post/put/head unchanged and also covers other
    % methods (e.g. delete) instead of crashing on them.
    Method = string:to_upper(atom_to_list(Action)),
    Params = oauth:signed_params(Method, Url, QSL, Consumer, Token,
        TokenSecret) -- QSL,
    {"Authorization", "OAuth " ++ oauth_uri:params_to_header_string(Params)}.
% @doc Returns the ibrowse options needed to talk to the url of the request.
% For http urls this is the empty list. For https urls it is
% [{is_ssl, true}, {ssl_options, SslOpts}], where SslOpts carries the
% certificate depth from the "replicator"/"ssl_certificate_max_depth" setting
% (default "3") followed by the verification options; peer verification is
% requested only when "replicator"/"verify_ssl_certificates" is "true".
ssl_options(#http_db{url = Url}) ->
    case ibrowse_lib:parse_url(Url) of
        #url{protocol = http} ->
            [];
        #url{protocol = https} ->
            MaxDepth = list_to_integer(
                couch_config:get("replicator", "ssl_certificate_max_depth",
                    "3")),
            VerifyPeer =
                couch_config:get("replicator", "verify_ssl_certificates")
                    =:= "true",
            SslOpts = [{depth, MaxDepth} | ssl_verify_options(VerifyPeer)],
            [{is_ssl, true}, {ssl_options, SslOpts}]
    end.
% @doc Returns the ssl verification options for the running OTP release.
% Value is true to verify the peer certificate against the file configured in
% "replicator"/"ssl_trusted_certificates_file", false to skip verification.
ssl_verify_options(Value) ->
    ssl_verify_options(Value, erlang:system_info(otp_release)).

% Selects between the legacy integer form of the verify option and the atom
% form, based on the OTP release string.
%
% Releases up to R16 are named "R<n>..."; from OTP 17 on the string is just
% the number ("17", "18", ...). Such a string compares lower than "R14", so a
% plain `OTPVersion >= "R14"` test would wrongly treat every modern release as
% legacy. Only "R"-prefixed releases older than R14 get the integer options.
ssl_verify_options(true, "R" ++ _ = OTPVersion) when OTPVersion < "R14" ->
    CAFile = couch_config:get("replicator", "ssl_trusted_certificates_file"),
    [{verify, 2}, {cacertfile, CAFile}];
ssl_verify_options(false, "R" ++ _ = OTPVersion) when OTPVersion < "R14" ->
    [{verify, 0}];
ssl_verify_options(true, _OTPVersion) ->
    CAFile = couch_config:get("replicator", "ssl_trusted_certificates_file"),
    [{verify, verify_peer}, {cacertfile, CAFile}];
ssl_verify_options(false, _OTPVersion) ->
    [{verify, verify_none}].