| % Licensed under the Apache License, Version 2.0 (the "License"); you may not |
| % use this file except in compliance with the License. You may obtain a copy of |
| % the License at |
| % |
| % http://www.apache.org/licenses/LICENSE-2.0 |
| % |
| % Unless required by applicable law or agreed to in writing, software |
| % distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| % WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| % License for the specific language governing permissions and limitations under |
| % the License. |
| |
| -module(couch_rep_httpc). |
| -include("couch_db.hrl"). |
| -include("../ibrowse/ibrowse.hrl"). |
| |
| -export([db_exists/1, db_exists/2, full_url/1, request/1, redirected_request/2, |
| redirect_url/2, spawn_worker_process/1, spawn_link_worker_process/1]). |
| -export([ssl_options/1]). |
| |
%% @doc Public entry point for issuing a replication HTTP request.
%% Accepts only an #http_db{} record and hands it to do_request/1, whose
%% result (or exception) is passed through unchanged.
request(HttpDb = #http_db{}) ->
    do_request(HttpDb).
| |
%% Sends the request described by an #http_db{} record through ibrowse and
%% passes the raw ibrowse result to process_response/2.
%%
%% A binary URL is converted with ?b2l first and the call restarted, so the
%% rest of the function only ever sees the converted form.
do_request(#http_db{url = BinUrl} = Req) when is_binary(BinUrl) ->
    do_request(Req#http_db{url = ?b2l(BinUrl)});

do_request(Req) ->
    #http_db{
        body = RawBody,
        conn = Conn,
        method = Method,
        options = Opts
    } = Req,
    Target = full_url(Req),
    Hdrs = do_request_headers(Target, Req),
    Payload = do_request_body(RawBody),
    Reply = do_request_send(Conn, Target, Hdrs, Method, Payload, Opts),
    process_response(Reply, Req).

%% Returns the request headers, with an OAuth "Authorization" header
%% prepended when the auth properties contain an <<"oauth">> entry.
do_request_headers(Target, #http_db{auth = Auth, headers = Base,
        method = Method, qs = QS}) ->
    case couch_util:get_value(<<"oauth">>, Auth) of
        undefined ->
            Base;
        {OAuthProps} ->
            [oauth_header(Target, QS, Method, OAuthProps) | Base]
    end.

%% Maps the record's body field onto what is handed to ibrowse:
%% a {Fun, State} pair is passed through untouched, nil becomes an empty
%% body, and anything else is JSON-encoded into a binary.
do_request_body({Fun, _InitialState} = Streamed) when is_function(Fun) ->
    Streamed;
do_request_body(nil) ->
    [];
do_request_body(Term) ->
    iolist_to_binary(?JSON_ENCODE(Term)).

%% Uses the dedicated connection when one is set, otherwise lets ibrowse
%% pick one. Both calls use an infinite timeout.
do_request_send(nil, Target, Hdrs, Method, Payload, Opts) ->
    ibrowse:send_req(Target, Hdrs, Method, Payload, Opts, infinity);
do_request_send(Conn, Target, Hdrs, Method, Payload, Opts) ->
    ibrowse:send_req_direct(Conn, Target, Hdrs, Method, Payload, Opts,
        infinity).
| |
%% @doc Checks that the database behind the request's URL exists, using that
%% same URL as the canonical URL. See db_exists/2 for the result.
db_exists(HttpDb) ->
    OwnUrl = HttpDb#http_db.url,
    db_exists(HttpDb, OwnUrl).
| |
%% @doc Two-argument form of the existence check.
%% When the second argument is a boolean it is the "create the database
%% first" flag and the request's own URL is used as the canonical URL.
%% Any other value is taken as the canonical URL, with creation disabled.
db_exists(HttpDb, CreateDB) when is_boolean(CreateDB) ->
    db_exists(HttpDb, HttpDb#http_db.url, CreateDB);

db_exists(HttpDb, CanonicalUrl) ->
    db_exists(HttpDb, CanonicalUrl, false).
| |
%% Probes the database URL with a HEAD request, optionally issuing a PUT
%% beforehand when CreateDB is true (the PUT result is deliberately ignored).
%%
%% Returns the request record with its url set to CanonicalUrl on "200".
%% Follows "301"/"302" redirects, throws {unauthorized, Url} on "401" and
%% {db_not_found, Url} for every other outcome (including a crashed request,
%% which `catch` turns into a value).
db_exists(Req, CanonicalUrl, CreateDB) ->
    #http_db{options = Options, url = Url} = Req,
    db_exists_maybe_create(CreateDB, Req),
    % The header computation stays inside the catch, as in the HEAD call it
    % belongs to: a failure there is reported as db_not_found.
    HeadReply = (catch ibrowse:send_req(
        Url, db_exists_headers(head, Req), head, [], Options)),
    db_exists_reply(HeadReply, Req, CanonicalUrl).

%% Request headers for Method, with an OAuth "Authorization" header
%% prepended when the auth properties contain an <<"oauth">> entry.
db_exists_headers(Method, #http_db{auth = Auth, headers = Base, url = Url}) ->
    case couch_util:get_value(<<"oauth">>, Auth) of
        undefined ->
            Base;
        {OAuthProps} ->
            [oauth_header(Url, [], Method, OAuthProps) | Base]
    end.

%% Best-effort PUT of the database URL; any result or crash of the request
%% itself is swallowed by `catch`.
db_exists_maybe_create(true, #http_db{options = Options, url = Url} = Req) ->
    PutHeaders = [{"Content-Length", 0} | db_exists_headers(put, Req)],
    catch ibrowse:send_req(Url, PutHeaders, put, [], Options);
db_exists_maybe_create(_Else, _Req) ->
    ok.

%% Dispatches on the outcome of the HEAD request.
db_exists_reply({ok, "200", _, _}, Req, CanonicalUrl) ->
    config_http(CanonicalUrl),
    Req#http_db{url = CanonicalUrl};
db_exists_reply({ok, "301", RespHeaders, _}, Req, _CanonicalUrl) ->
    % On a 301 the redirect target also becomes the canonical URL.
    Moved = redirect_url(RespHeaders, Req#http_db.url),
    db_exists(Req#http_db{url = Moved}, Moved);
db_exists_reply({ok, "302", RespHeaders, _}, Req, CanonicalUrl) ->
    % On a 302 the redirect is followed but the canonical URL is kept.
    Found = redirect_url(RespHeaders, Req#http_db.url),
    db_exists(Req#http_db{url = Found}, CanonicalUrl);
db_exists_reply({ok, "401", _, _}, #http_db{url = Url}, _CanonicalUrl) ->
    throw({unauthorized, ?l2b(Url)});
db_exists_reply(Error, #http_db{url = Url}, _CanonicalUrl) ->
    ?LOG_DEBUG("DB at ~s could not be found because ~p", [Url, Error]),
    throw({db_not_found, ?l2b(Url)}).
| |
%% Applies the replicator's ibrowse limits to the host/port of Url and
%% registers a config callback that re-applies them when either setting
%% changes.
%%
%% Settings read from the "replicator" config section:
%%   max_http_sessions      (default "20")
%%   max_http_pipeline_size (default "50")
config_http(Url) ->
    #url{host = Host, port = Port} = ibrowse_lib:parse_url(Url),
    ApplySessions = fun(Value) ->
        ibrowse:set_max_sessions(Host, Port, list_to_integer(Value))
    end,
    ApplyPipeline = fun(Value) ->
        ibrowse:set_max_pipeline_size(Host, Port, list_to_integer(Value))
    end,
    ok = ApplySessions(
        couch_config:get("replicator", "max_http_sessions", "20")),
    ok = ApplyPipeline(
        couch_config:get("replicator", "max_http_pipeline_size", "50")),
    % The callback only has clauses for the two settings handled here.
    ok = couch_config:register(
        fun("replicator", "max_http_sessions", MaxSessions) ->
            ApplySessions(MaxSessions);
        ("replicator", "max_http_pipeline_size", PipeSize) ->
            ApplyPipeline(PipeSize)
        end).
| |
%% @doc Builds the URL to follow after a redirect.
%% Protocol, host, port and path come from the "Location" response header;
%% the user name and password (when both are present as strings) are carried
%% over from OrigUrl. The result has the form
%% proto://[user:pass@]host:port/path, with IPv6 hosts wrapped in brackets.
redirect_url(RespHeaders, OrigUrl) ->
    Location = mochiweb_headers:get_value(
        "Location", mochiweb_headers:make(RespHeaders)),
    #url{
        host = Host, host_type = HostType, port = Port,
        path = Path, protocol = Proto
    } = ibrowse_lib:parse_url(Location),
    #url{username = User, password = Passwd} = ibrowse_lib:parse_url(OrigUrl),
    lists:append([
        atom_to_list(Proto), "://",
        redirect_credentials(User, Passwd),
        redirect_host(HostType, Host),
        ":", integer_to_list(Port),
        Path
    ]).

%% "user:password@" when both parts are lists, otherwise nothing.
redirect_credentials(User, Passwd) when is_list(User), is_list(Passwd) ->
    User ++ ":" ++ Passwd ++ "@";
redirect_credentials(_User, _Passwd) ->
    [].

%% IPv6 literals must be bracketed inside a URL; other hosts are used as is.
redirect_host(ipv6_address, Host) ->
    "[" ++ Host ++ "]";
redirect_host(_HostType, Host) ->
    Host.
| |
%% @doc Returns the complete request URL as a flat string: the base url,
%% followed by the resource and, when the query list is non-empty,
%% "?" plus the "key=value" pairs joined with "&".
%% Keys and values are inserted as given; no escaping is applied here.
full_url(#http_db{url = BinUrl} = Req) when is_binary(BinUrl) ->
    full_url(Req#http_db{url = ?b2l(BinUrl)});

full_url(#http_db{qs = [], url = Url, resource = Resource}) ->
    Url ++ Resource;

full_url(Req) ->
    #http_db{
        url = Url,
        resource = Resource,
        qs = QS
    } = Req,
    Pairs = lists:map(fun full_url_pair/1, QS),
    lists:flatten([Url, Resource, "?", string:join(Pairs, "&")]).

%% Renders one query parameter as "Key=Value".
full_url_pair({K, V}) ->
    io_lib:format("~s=~s", [couch_util:to_list(K), couch_util:to_list(V)]).
| |
%% Interprets the result of an ibrowse request issued by do_request/1.
%%
%% {ok, Status, Headers, Body}:
%%   200, 201       -> decoded JSON body (gunzipped first when needed)
%%   301, 302       -> the request is re-issued against the redirect target
%%   409            -> throw(conflict)
%%   other 4xx      -> decoded JSON body
%%   500, 502, 503  -> retried after a pause while retries remain, otherwise
%%                     exit({http_request_failed, _})
%%   anything else  -> exit({http_request_failed, _})
%% {ibrowse_req_id, Id}: returned unchanged.
%% {error, Reason}: retried after a pause while retries remain, otherwise
%%   exit({http_request_failed, _}).
%%
%% Every retry decrements `retries` and doubles `pause`. Both retry paths
%% stop once `retries` is no longer positive, so a server that keeps
%% answering with 5xx can no longer be retried without bound, and the
%% counter can no longer slip past zero and skip the give-up clause.
process_response({ok, Status, Headers, Body}, Req) ->
    Code = list_to_integer(Status),
    if
        Code =:= 200; Code =:= 201 ->
            ?JSON_DECODE(maybe_decompress(Headers, Body));
        Code =:= 301; Code =:= 302 ->
            RedirectUrl = redirect_url(Headers, Req#http_db.url),
            do_request(redirected_request(Req, RedirectUrl));
        Code =:= 409 ->
            throw(conflict);
        Code >= 400, Code < 500 ->
            ?JSON_DECODE(maybe_decompress(Headers, Body));
        Code =:= 500; Code =:= 502; Code =:= 503 ->
            retry_after_server_error(Code, Body, Req);
        true ->
            exit({http_request_failed,
                ?l2b(["unhandled response code ", Status])})
    end;

process_response({ibrowse_req_id, Id}, _Req) ->
    {ibrowse_req_id, Id};

process_response({error, _Reason}, #http_db{url = Url, retries = Retries})
        when Retries =< 0 ->
    ?LOG_ERROR("couch_rep_httpc request failed after 10 retries: ~s", [Url]),
    exit({http_request_failed, ?l2b(["failed to replicate ", Url])});
process_response({error, Reason}, Req) ->
    #http_db{
        method = Method,
        retries = Retries,
        pause = Pause
    } = Req,
    % Shorten the noisiest reasons so the log line stays readable.
    ShortReason = case Reason of
        sel_conn_closed ->
            connection_closed;
        {'EXIT', {noproc, _}} ->
            noproc;
        {'EXIT', {normal, _}} ->
            normal;
        Else ->
            Else
    end,
    ?LOG_DEBUG("retrying couch_rep_httpc ~p request in ~p seconds due to " ++
        "{error, ~p}", [Method, Pause/1000, ShortReason]),
    timer:sleep(Pause),
    Retry = Req#http_db{retries = Retries - 1, pause = 2 * Pause},
    case Reason of
        worker_is_dead ->
            % The dedicated connection is gone; retry on a fresh worker.
            NewConn = spawn_link_worker_process(Req),
            do_request(Retry#http_db{conn = NewConn});
        _ ->
            do_request(Retry)
    end.

%% Handles a 500/502/503 answer: gives up when no retries are left,
%% otherwise sleeps for the current pause and re-issues the request.
retry_after_server_error(Code, _Body, #http_db{url = Url, retries = Retries})
        when Retries =< 0 ->
    ?LOG_ERROR("couch_rep_httpc request failed, no retries left after "
        "remote server error ~p: ~s", [Code, Url]),
    exit({http_request_failed, ?l2b(["failed to replicate ", Url])});
retry_after_server_error(Code, Body, Req) ->
    #http_db{pause = Pause, retries = Retries} = Req,
    ?LOG_INFO("retrying couch_rep_httpc request in ~p seconds " ++
        "due to remote server error: ~p Body ~s", [Pause/1000, Code, Body]),
    timer:sleep(Pause),
    do_request(Req#http_db{retries = Retries - 1, pause = 2 * Pause}).
| |
%% @doc Rewrites a request so that it targets RedirectUrl.
%% The redirect URL is split into its base and query string; the base becomes
%% the new url, the parsed query string the new qs, and the resource is
%% cleared. When the request uses OAuth, the first "Authorization" header is
%% dropped from the headers.
redirected_request(Req, RedirectUrl) ->
    {Base, QueryString, _} = mochiweb_util:urlsplit_path(RedirectUrl),
    Query = mochiweb_util:parse_qs(QueryString),
    UsesOAuth =
        couch_util:get_value(<<"oauth">>, Req#http_db.auth) =/= undefined,
    NewHeaders = case UsesOAuth of
        false ->
            Req#http_db.headers;
        true ->
            lists:keydelete("Authorization", 1, Req#http_db.headers)
    end,
    Req#http_db{url = Base, resource = "", qs = Query, headers = NewHeaders}.
| |
%% @doc Starts an ibrowse HTTP client process for the request's URL via
%% ibrowse_http_client:start/1 and returns its pid. Fails with badmatch if
%% the client cannot be started.
spawn_worker_process(HttpDb) ->
    ParsedUrl = ibrowse_lib:parse_url(HttpDb#http_db.url),
    {ok, WorkerPid} = ibrowse_http_client:start(ParsedUrl),
    WorkerPid.
| |
%% @doc Asks ibrowse to spawn a linked worker process for the request's URL
%% and returns its pid. Fails with badmatch if ibrowse does not answer
%% {ok, Pid}.
spawn_link_worker_process(HttpDb) ->
    TargetUrl = HttpDb#http_db.url,
    {ok, WorkerPid} = ibrowse:spawn_link_worker_process(TargetUrl),
    WorkerPid.
| |
%% Returns Body, gunzipped when the response's "Content-Encoding" header
%% value is exactly "gzip". Any other value, or no header at all, leaves
%% the body untouched.
maybe_decompress(Headers, Body) ->
    Encoding = mochiweb_headers:get_value(
        "Content-Encoding", mochiweb_headers:make(Headers)),
    decode_content(Encoding, Body).

decode_content("gzip", Body) ->
    zlib:gunzip(Body);
decode_content(_Other, Body) ->
    Body.
| |
%% Builds the OAuth "Authorization" header for a request.
%%
%% Url    - request URL that gets signed
%% QS     - query parameters as {Key, Value} pairs; they take part in the
%%          signature but are removed from the header parameters
%% Action - HTTP method as an atom (get, post, put, head, delete, ...)
%% Props  - OAuth properties with the binary keys <<"consumer_key">>,
%%          <<"consumer_secret">>, <<"token">>, <<"token_secret">> and,
%%          optionally, <<"signature_method">> (defaults to <<"HMAC-SHA1">>)
%%
%% Returns {"Authorization", HeaderValue}. An unsupported signature method
%% or a missing property crashes the caller.
oauth_header(Url, QS, Action, Props) ->
    % erlang-oauth doesn't like iolists
    QSL = [{couch_util:to_list(K), ?b2l(?l2b(couch_util:to_list(V)))} ||
        {K,V} <- QS],
    ConsumerKey = ?b2l(couch_util:get_value(<<"consumer_key">>, Props)),
    Token = ?b2l(couch_util:get_value(<<"token">>, Props)),
    TokenSecret = ?b2l(couch_util:get_value(<<"token_secret">>, Props)),
    ConsumerSecret = ?b2l(couch_util:get_value(<<"consumer_secret">>, Props)),
    SignatureMethodStr = ?b2l(couch_util:get_value(
        <<"signature_method">>, Props, <<"HMAC-SHA1">>)),
    SignatureMethodAtom = oauth_signature_method(SignatureMethodStr),
    Consumer = {ConsumerKey, ConsumerSecret, SignatureMethodAtom},
    % The signature base string uses the upper-case HTTP method name, so the
    % method atom is simply upper-cased instead of being looked up in a
    % fixed get/post/put/head table.
    Method = string:to_upper(atom_to_list(Action)),
    Params = oauth:signed_params(Method, Url, QSL, Consumer, Token, TokenSecret)
        -- QSL,
    {"Authorization", "OAuth " ++ oauth_uri:params_to_header_string(Params)}.

%% Maps the configured signature method name onto the atom expected in the
%% consumer tuple.
oauth_signature_method("PLAINTEXT") -> plaintext;
oauth_signature_method("HMAC-SHA1") -> hmac_sha1;
oauth_signature_method("RSA-SHA1") -> rsa_sha1.
| |
%% @doc Returns the ibrowse options needed to reach the request's URL:
%% nothing for http, and [{is_ssl, true}, {ssl_options, Opts}] for https.
%% Any other parse result is not handled and crashes with case_clause.
ssl_options(#http_db{url = Url}) ->
    case ibrowse_lib:parse_url(Url) of
        #url{protocol = http} ->
            [];
        #url{protocol = https} ->
            [{is_ssl, true}, {ssl_options, https_ssl_options()}]
    end.

%% SSL options taken from the "replicator" config section: the certificate
%% chain depth (ssl_certificate_max_depth, default "3") followed by the
%% verification options. Verification is on only when
%% verify_ssl_certificates is exactly "true".
https_ssl_options() ->
    MaxDepth = list_to_integer(
        couch_config:get("replicator", "ssl_certificate_max_depth", "3")),
    Verify =
        couch_config:get("replicator", "verify_ssl_certificates") =:= "true",
    [{depth, MaxDepth} | ssl_verify_options(Verify)].
| |
%% Verification options for the OTP release the node is running on;
%% Verify is true to verify the peer certificate, false to skip it.
ssl_verify_options(Verify) ->
    Release = erlang:system_info(otp_release),
    ssl_verify_options(Verify, Release).
| |
%% Returns the ssl `verify` options for the given OTP release string.
%%
%% Verify     - true to verify the peer certificate against the CA file named
%%              by the "replicator" / "ssl_trusted_certificates_file"
%%              setting, false to skip verification
%% OTPVersion - release string as returned by
%%              erlang:system_info(otp_release)
%%
%% Releases older than R14 get the integer verify levels (2 / 0); every
%% other release gets verify_peer / verify_none. Release names stopped
%% carrying the "R" prefix with OTP 17 ("17", "18", ...), and such strings
%% sort below "R14", so a plain string comparison would wrongly classify
%% every modern release as pre-R14.
ssl_verify_options(true, OTPVersion) ->
    CAFile = couch_config:get("replicator", "ssl_trusted_certificates_file"),
    case uses_legacy_verify_levels(OTPVersion) of
        true ->
            [{verify, 2}, {cacertfile, CAFile}];
        false ->
            [{verify, verify_peer}, {cacertfile, CAFile}]
    end;
ssl_verify_options(false, OTPVersion) ->
    case uses_legacy_verify_levels(OTPVersion) of
        true ->
            [{verify, 0}];
        false ->
            [{verify, verify_none}]
    end.

%% True only for "R"-prefixed release names that sort before "R14".
uses_legacy_verify_levels([$R | _] = OTPVersion) ->
    OTPVersion < "R14";
uses_legacy_verify_levels(_OTPVersion) ->
    false.