| % Licensed under the Apache License, Version 2.0 (the "License"); you may not |
| % use this file except in compliance with the License. You may obtain a copy of |
| % the License at |
| % |
| % http://www.apache.org/licenses/LICENSE-2.0 |
| % |
| % Unless required by applicable law or agreed to in writing, software |
| % distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| % WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| % License for the specific language governing permissions and limitations under |
| % the License. |
| |
| -module(couch_replicator_parse). |
| |
| -export([ |
| parse_rep_doc/1, |
| parse_rep_doc/2, |
| parse_rep_db/3, |
| parse_rep_doc_without_id/1, |
| parse_rep_doc_without_id/2 |
| ]). |
| |
| -include_lib("couch/include/couch_db.hrl"). |
| -include_lib("ibrowse/include/ibrowse.hrl"). |
| -include_lib("couch_replicator/include/couch_replicator_api_wrap.hrl"). |
| -include("couch_replicator.hrl"). |
| |
| -define(DEFAULT_SOCK_OPTS, [{keepalive, true}, {nodelay, false}]). |
| -define(VALID_SOCK_OPTS, [ |
| buffer, |
| keepalive, |
| nodelay, |
| priority, |
| recbuf, |
| sndbuf |
| ]). |
| -define(VALID_PROTOCOLS, #{ |
| endpoint => [http, https], |
| proxy => [http, https, socks5] |
| }). |
| |
| %% erlfmt-ignore |
% Default replication job options. Each entry can be overridden via the
% corresponding key in the "replicator" config section. The returned list
% preserves the historical ordering (it is keysorted by the caller anyway).
default_options() ->
    IntDefaults = [
        {connection_timeout, "connection_timeout", 30000},
        {retries, "retries_per_request", 5},
        {http_connections, "http_connections", 20},
        {worker_batch_size, "worker_batch_size", 500},
        {worker_processes, "worker_processes", 4},
        {checkpoint_interval, "checkpoint_interval", 30000}
    ],
    BoolDefaults = [
        {use_checkpoints, "use_checkpoints", true},
        {use_bulk_get, "use_bulk_get", true}
    ],
    [{Key, cfg_int(CfgKey, Default)} || {Key, CfgKey, Default} <- IntDefaults] ++
        [{Key, cfg_boolean(CfgKey, Default)} || {Key, CfgKey, Default} <- BoolDefaults] ++
        [{socket_options, cfg_sock_opts()}].
| |
% Note: parse_rep_doc can handle filtered replications. During parsing of the
% replication doc it will make possibly remote http requests to the source
% database. If fetching or parsing of filter docs fails, parse_rep_doc throws
% a {filter_fetch_error, Error} exception. This exception should be considered
% transient in respect to the contents of the document itself, since it depends
% on network availability of the source db and other factors.
% Parse a replication document into a #rep{} record, deriving the user
% context from the document itself. Parse failures are rethrown as
% {bad_rep_doc, Reason}; filter fetch errors pass through unchanged since
% they are transient (they depend on source db network availability).
-spec parse_rep_doc({[_]}) -> #rep{}.
parse_rep_doc(RepDoc) ->
    Parsed =
        try
            parse_rep_doc(RepDoc, rep_user_ctx(RepDoc))
        catch
            throw:{filter_fetch_error, FetchErr} ->
                throw({filter_fetch_error, FetchErr});
            throw:{error, ParseErr} ->
                throw({bad_rep_doc, ParseErr});
            Tag:Err ->
                throw({bad_rep_doc, to_binary({Tag, Err})})
        end,
    {ok, Rep} = Parsed,
    Rep.
| |
% Same as parse_rep_doc/1 but without computing the replication id, so no
% filter fetch is attempted; any failure becomes {bad_rep_doc, Reason}.
-spec parse_rep_doc_without_id({[_]}) -> #rep{}.
parse_rep_doc_without_id(RepDoc) ->
    Parsed =
        try
            parse_rep_doc_without_id(RepDoc, rep_user_ctx(RepDoc))
        catch
            throw:{error, ParseErr} ->
                throw({bad_rep_doc, ParseErr});
            Tag:Err ->
                throw({bad_rep_doc, to_binary({Tag, Err})})
        end,
    {ok, Rep} = Parsed,
    Rep.
| |
% Parse the doc body, then decide whether a replication id must be
% computed. A cancel request that already carries an explicit id can skip
% the (possibly remote) id calculation entirely.
-spec parse_rep_doc({[_]}, #user_ctx{}) -> {ok, #rep{}}.
parse_rep_doc(Doc, UserCtx) ->
    {ok, Rep} = parse_rep_doc_without_id(Doc, UserCtx),
    Cancel = get_value(cancel, Rep#rep.options, false),
    HasId = get_value(id, Rep#rep.options, nil) =/= nil,
    case Cancel andalso HasId of
        true ->
            % Cancel request with an id specified: no need to parse the body.
            {ok, Rep};
        false ->
            % Regular replication doc, or a cancel request whose id must be
            % derived from the body contents.
            {ok, update_rep_id(Rep)}
    end.
| |
% Parse a replication doc body into a #rep{} record without computing the
% replication id. For cancel requests carrying an explicit id, endpoint
% parsing is skipped entirely. Throws {bad_request, _} for invalid view
% types and {error, _} for unparsable filters.
-spec parse_rep_doc_without_id({[_]}, #user_ctx{}) -> {ok, #rep{}}.
parse_rep_doc_without_id({Props}, UserCtx) ->
    {SrcProxy, TgtProxy} = parse_proxy_settings(Props),
    Opts = make_options(Props),
    % A cancel request with an explicit id needs no source/target parsing.
    case
        get_value(cancel, Opts, false) andalso
            (get_value(id, Opts, nil) =/= nil)
    of
        true ->
            {ok, #rep{options = Opts, user_ctx = UserCtx}};
        false ->
            Source = parse_rep_db(get_value(<<"source">>, Props), SrcProxy, Opts),
            Target = parse_rep_db(get_value(<<"target">>, Props), TgtProxy, Opts),
            % Determine whether this is a regular or a view-based replication.
            {Type, View} =
                case couch_replicator_filters:view_type(Props, Opts) of
                    {error, Error} ->
                        throw({bad_request, Error});
                    Result ->
                        Result
                end,
            Rep = #rep{
                source = Source,
                target = Target,
                options = Opts,
                user_ctx = UserCtx,
                type = Type,
                view = View,
                doc_id = get_value(<<"_id">>, Props, null)
            },
            % Check if can parse filter code, if not throw exception
            case couch_replicator_filters:parse(Opts) of
                {error, FilterError} ->
                    throw({error, FilterError});
                {ok, _Filter} ->
                    ok
            end,
            {ok, Rep}
    end.
| |
% Extract proxy settings from the doc properties. A single "proxy" value
% applies to both endpoints and may not be combined with the per-endpoint
% "source_proxy"/"target_proxy" settings.
parse_proxy_settings(Props) when is_list(Props) ->
    Proxy = get_value(<<"proxy">>, Props, <<>>),
    SrcProxy = get_value(<<"source_proxy">>, Props, <<>>),
    TgtProxy = get_value(<<"target_proxy">>, Props, <<>>),
    case {Proxy, SrcProxy, TgtProxy} of
        {<<>>, _, _} ->
            % No global proxy: per-endpoint settings (possibly empty) apply.
            {SrcProxy, TgtProxy};
        {_, <<>>, <<>>} ->
            % Global proxy only: use it for both endpoints.
            {Proxy, Proxy};
        {_, Src, _} when Src =/= <<>> ->
            throw({bad_request, "`proxy` is mutually exclusive with `source_proxy`"});
        {_, _, _} ->
            throw({bad_request, "`proxy` is mutually exclusive with `target_proxy`"})
    end.
| |
% Update a #rep{} record with a replication_id. Calculating the id might involve
% fetching a filter from the source db, and so it could fail intermittently.
% In case of a failure to fetch the filter this function will throw a
% `{filter_fetch_error, Reason}` exception.
% Fill in the replication id on a parsed #rep{} record.
update_rep_id(Rep) ->
    Rep#rep{id = couch_replicator_ids:replication_id(Rep)}.
| |
% Build a #user_ctx{} from the doc's optional "user_ctx" object. An absent
% object yields the default (anonymous) context.
-spec rep_user_ctx({[_]}) -> #user_ctx{}.
rep_user_ctx({RepDoc}) ->
    case get_json_value(<<"user_ctx">>, RepDoc) of
        {UserCtx} ->
            Name = get_json_value(<<"name">>, UserCtx, null),
            Roles = get_json_value(<<"roles">>, UserCtx, []),
            #user_ctx{name = Name, roles = Roles};
        undefined ->
            #user_ctx{}
    end.
| |
% Parse an endpoint specification — either a url binary or a {Props} object
% with "url" and optional "auth"/"headers" — plus proxy settings into an
% #httpdb{} record. Throws {error, Reason} for invalid, local or missing
% endpoints.
-spec parse_rep_db({[_]} | binary(), [_] | binary(), [_]) -> #httpdb{} | no_return().
parse_rep_db({Props}, Proxy, Options) ->
    Url0 = get_value(<<"url">>, Props),
    ok = check_url(Url0, endpoint),
    Url = maybe_add_trailing_slash(Url0),
    ProxyParams = parse_proxy_params(Proxy),
    % Keep the original proxy url string around only when a proxy is in use.
    ProxyURL =
        case ProxyParams of
            [] -> undefined;
            _ -> binary_to_list(Proxy)
        end,
    {AuthProps} = get_value(<<"auth">>, Props, {[]}),
    {BinHeaders} = get_value(<<"headers">>, Props, {[]}),
    Headers = lists:ukeysort(1, [{?b2l(K), ?b2l(V)} || {K, V} <- BinHeaders]),
    DefaultHeaders = (#httpdb{})#httpdb.headers,
    HttpDb = #httpdb{
        url = Url,
        auth_props = AuthProps,
        % ukeymerge keeps the first occurrence, so user-supplied headers win
        % over the record defaults.
        headers = lists:ukeymerge(1, Headers, DefaultHeaders),
        ibrowse_options = lists:keysort(
            1,
            [
                {socket_options, get_value(socket_options, Options)}
                | ProxyParams ++ ssl_params(Url)
            ]
        ),
        timeout = get_value(connection_timeout, Options),
        http_connections = get_value(http_connections, Options),
        retries = get_value(retries, Options),
        proxy_url = ProxyURL
    },
    % Normalize basic auth credentials embedded in the url (see
    % couch_replicator_utils).
    couch_replicator_utils:normalize_basic_auth(HttpDb);
parse_rep_db(<<"http://", _/binary>> = Url, Proxy, Options) ->
    parse_rep_db({[{<<"url">>, Url}]}, Proxy, Options);
parse_rep_db(<<"https://", _/binary>> = Url, Proxy, Options) ->
    parse_rep_db({[{<<"url">>, Url}]}, Proxy, Options);
parse_rep_db(<<_/binary>>, _Proxy, _Options) ->
    % Any other binary would be a local db name, which is not supported.
    throw({error, local_endpoints_not_supported});
parse_rep_db(undefined, _Proxy, _Options) ->
    throw({error, <<"Missing replicator database">>}).
| |
% Validate that Url is a binary url whose protocol is allowed for the given
% Type (endpoint | proxy); throws {error, Reason} otherwise.
check_url(Url, Type) when is_binary(Url), is_atom(Type) ->
    case ibrowse_lib:parse_url(?b2l(Url)) of
        #url{protocol = Protocol} ->
            check_protocol(Protocol, Type);
        {error, _} ->
            throw_invalid_url(Type)
    end;
check_url(_, Type) when is_atom(Type) ->
    throw_invalid_url(Type).

% Shared error for malformed or non-binary urls.
throw_invalid_url(Type) ->
    BinType = atom_to_binary(Type),
    throw({error, <<BinType/binary, " has an invalid url">>}).
| |
% Resolve the allowed protocol list for Type; the compiled-in default may be
% overridden via config, e.g. "valid_endpoint_protocols".
check_protocol(Protocol, Type) ->
    CfgName = "valid_" ++ atom_to_list(Type) ++ "_protocols",
    Defaults = maps:get(Type, ?VALID_PROTOCOLS),
    check_protocol(Protocol, Type, cfg_atoms(CfgName, Defaults)).
| |
% Throw a descriptive error unless Protocol is one of the Allowed atoms.
check_protocol(Protocol, Type, Allowed) ->
    case lists:member(Protocol, Allowed) of
        false ->
            BinType = atom_to_binary(Type),
            throw({error, <<BinType/binary, " has an invalid url">>});
        true ->
            ok
    end.
| |
% Return Url as a string, guaranteed to end in "/" unless it carries query
% parameters, in which case it is left untouched.
-spec maybe_add_trailing_slash(binary() | list()) -> list().
maybe_add_trailing_slash(Url) when is_binary(Url) ->
    maybe_add_trailing_slash(binary_to_list(Url));
maybe_add_trailing_slash(Url) ->
    maybe_add_trailing_slash(Url, lists:member($?, Url)).

% Second argument: whether the url contains query parameters.
maybe_add_trailing_slash(Url, true) ->
    % Skip if there are query params.
    Url;
maybe_add_trailing_slash(Url, false) ->
    case lists:last(Url) of
        $/ -> Url;
        _ -> Url ++ "/"
    end.
| |
% Build the final options proplist: user-supplied options (deduplicated and
% validated) take precedence over the configured defaults when merged.
-spec make_options([_]) -> [_].
make_options(Props) ->
    UserOpts = check_options(lists:ukeysort(1, convert_options(Props))),
    Defaults = lists:keysort(1, default_options()),
    lists:ukeymerge(1, UserOpts, Defaults).
| |
% Integer setting from the "replicator" config section.
cfg_int(Key, Default) ->
    config:get_integer("replicator", Key, Default).
| |
% Boolean setting from the "replicator" config section.
cfg_boolean(Key, Default) ->
    config:get_boolean("replicator", Key, Default).
| |
% Read a comma-separated config value as a list of atoms, falling back to
% Default when unset. Config values are operator-controlled, so
% list_to_atom/1 on them is acceptable here.
cfg_atoms(Cfg, Default) ->
    case cfg(Cfg) of
        undefined ->
            Default;
        V when is_list(V) ->
            % Split on every comma: string:split/2 without the `all` option
            % only splits at the first occurrence, which would mangle lists
            % of three or more entries (e.g. "http, https, socks5").
            [list_to_atom(string:strip(S)) || S <- string:split(V, ",", all)]
    end.
| |
% Default socket options from config. Route through sock_opts/1 so the
% operator-configurable "valid_socket_options" allow-list is honored for
% the defaults too, consistent with the per-doc "socket_options" path in
% convert_options/1 (previously this path hard-coded ?VALID_SOCK_OPTS).
cfg_sock_opts() ->
    sock_opts(cfg("socket_options")).
| |
% Raw string setting from the "replicator" config section, or undefined.
cfg(Key) ->
    config:get("replicator", Key).
| |
% Parse a socket options term string, keeping only options from ValidOpts.
% An unset value yields the compiled-in Defaults; an unparsable term yields
% the empty list.
parse_sock_opts(undefined, Defaults, _ValidOpts) ->
    Defaults;
parse_sock_opts(Term, _Defaults, ValidOpts) ->
    ParsedOpts =
        case couch_util:parse_term(Term) of
            {ok, OptList} -> OptList;
            {error, _Reason} -> []
        end,
    IsValid = fun({Key, _Val}) -> lists:member(Key, ValidOpts) end,
    lists:filter(IsValid, ParsedOpts).
| |
% Parse a socket options term, honoring the operator-configurable list of
% allowed option names ("valid_socket_options").
sock_opts(CfgTerm) ->
    Allowed = cfg_atoms("valid_socket_options", ?VALID_SOCK_OPTS),
    parse_sock_opts(CfgTerm, ?DEFAULT_SOCK_OPTS, Allowed).
| |
% Convert raw binary-keyed doc properties into an internal options proplist
% with atom keys, validating value types along the way. Unknown options are
% silently dropped. Throws {bad_request, Msg} on invalid values. Note:
% clause order matters — each rejecting clause (guarded with
% "when not is_...") must precede its accepting counterpart.
-spec convert_options([_]) -> [_].
convert_options([]) ->
    [];
convert_options([{<<"cancel">>, V} | _R]) when not is_boolean(V) ->
    throw({bad_request, <<"parameter `cancel` must be a boolean">>});
convert_options([{<<"cancel">>, V} | R]) ->
    [{cancel, V} | convert_options(R)];
% All three id aliases collapse into the single internal `id` option.
convert_options([{IdOpt, V} | R]) when
    IdOpt =:= <<"_local_id">>;
    IdOpt =:= <<"replication_id">>;
    IdOpt =:= <<"id">>
->
    [{id, couch_replicator_ids:convert(V)} | convert_options(R)];
convert_options([{<<"create_target">>, V} | _R]) when not is_boolean(V) ->
    throw({bad_request, <<"parameter `create_target` must be a boolean">>});
convert_options([{<<"create_target">>, V} | R]) ->
    [{create_target, V} | convert_options(R)];
convert_options([{<<"create_target_params">>, V} | _R]) when not is_tuple(V) ->
    throw({bad_request, <<"parameter `create_target_params` must be a JSON object">>});
convert_options([{<<"create_target_params">>, V} | R]) ->
    [{create_target_params, V} | convert_options(R)];
convert_options([{<<"winning_revs_only">>, V} | _R]) when not is_boolean(V) ->
    throw({bad_request, <<"parameter `winning_revs_only` must be a boolean">>});
convert_options([{<<"winning_revs_only">>, V} | R]) ->
    [{winning_revs_only, V} | convert_options(R)];
convert_options([{<<"continuous">>, V} | _R]) when not is_boolean(V) ->
    throw({bad_request, <<"parameter `continuous` must be a boolean">>});
convert_options([{<<"continuous">>, V} | R]) ->
    [{continuous, V} | convert_options(R)];
convert_options([{<<"filter">>, V} | R]) ->
    [{filter, V} | convert_options(R)];
convert_options([{<<"query_params">>, V} | _R]) when not is_tuple(V) ->
    throw({bad_request, <<"parameter `query_params` must be an object">>});
convert_options([{<<"query_params">>, V} | R]) ->
    [{query_params, V} | convert_options(R)];
% A null doc_ids is treated the same as an absent one.
convert_options([{<<"doc_ids">>, null} | R]) ->
    convert_options(R);
convert_options([{<<"doc_ids">>, V} | _R]) when not is_list(V) ->
    throw({bad_request, <<"parameter `doc_ids` must be an array">>});
convert_options([{<<"doc_ids">>, V} | R]) ->
    % Ensure same behaviour as old replicator: accept a list of percent
    % encoded doc IDs.
    DocIds = lists:usort([?l2b(couch_httpd:unquote(Id)) || Id <- V]),
    [{doc_ids, DocIds} | convert_options(R)];
convert_options([{<<"selector">>, V} | _R]) when not is_tuple(V) ->
    throw({bad_request, <<"parameter `selector` must be a JSON object">>});
convert_options([{<<"selector">>, V} | R]) ->
    [{selector, V} | convert_options(R)];
convert_options([{<<"worker_processes">>, V} | R]) ->
    [{worker_processes, couch_util:to_integer(V)} | convert_options(R)];
convert_options([{<<"worker_batch_size">>, V} | R]) ->
    [{worker_batch_size, couch_util:to_integer(V)} | convert_options(R)];
convert_options([{<<"http_connections">>, V} | R]) ->
    [{http_connections, couch_util:to_integer(V)} | convert_options(R)];
convert_options([{<<"connection_timeout">>, V} | R]) ->
    [{connection_timeout, couch_util:to_integer(V)} | convert_options(R)];
convert_options([{<<"retries_per_request">>, V} | R]) ->
    [{retries, couch_util:to_integer(V)} | convert_options(R)];
convert_options([{<<"socket_options">>, V} | R]) ->
    [{socket_options, sock_opts(V)} | convert_options(R)];
convert_options([{<<"since_seq">>, V} | R]) ->
    [{since_seq, V} | convert_options(R)];
convert_options([{<<"use_checkpoints">>, V} | R]) ->
    [{use_checkpoints, V} | convert_options(R)];
convert_options([{<<"use_bulk_get">>, V} | _R]) when not is_boolean(V) ->
    throw({bad_request, <<"parameter `use_bulk_get` must be a boolean">>});
convert_options([{<<"use_bulk_get">>, V} | R]) ->
    [{use_bulk_get, V} | convert_options(R)];
convert_options([{<<"checkpoint_interval">>, V} | R]) ->
    [{checkpoint_interval, couch_util:to_integer(V)} | convert_options(R)];
% skip unknown option
convert_options([_ | R]) ->
    convert_options(R).
| |
% At most one of doc_ids, filter and selector may be present; they are
% mutually exclusive ways of selecting the documents to replicate.
-spec check_options([_]) -> [_].
check_options(Options) ->
    Exclusive = [doc_ids, filter, selector],
    Found = [Key || Key <- Exclusive, lists:keyfind(Key, 1, Options) =/= false],
    case Found of
        [] ->
            Options;
        [_] ->
            Options;
        [_ | _] ->
            throw({bad_request, "`doc_ids`,`filter`,`selector` are mutually exclusive"})
    end.
| |
% Convert a proxy url into ibrowse proxy options. Returns [] when no proxy
% is configured. For socks5 proxies the option names are rewritten to the
% socks5_* variants. Throws {error, Reason} for invalid urls or disallowed
% protocols.
-spec parse_proxy_params(binary() | [_]) -> [_].
parse_proxy_params(ProxyUrl) when is_binary(ProxyUrl) ->
    parse_proxy_params(?b2l(ProxyUrl));
parse_proxy_params([]) ->
    [];
parse_proxy_params(ProxyUrl) when is_list(ProxyUrl) ->
    #url{
        host = Host,
        port = Port,
        username = User,
        password = Passwd,
        protocol = Protocol
    } = ibrowse_lib:parse_url(ProxyUrl),
    % Credentials are added only when both user and password are present.
    Params =
        [
            {proxy_host, Host},
            {proxy_port, Port}
        ] ++
            case is_list(User) andalso is_list(Passwd) of
                false ->
                    [];
                true ->
                    [{proxy_user, User}, {proxy_password, Passwd}]
            end,
    % Validate the protocol only after the url parsed successfully.
    ok = check_protocol(Protocol, proxy),
    case Protocol of
        socks5 ->
            [proxy_to_socks5(Param) || Param <- Params];
        _ ->
            Params
    end;
parse_proxy_params(_) ->
    throw({error, <<"Invalid proxy url">>}).
| |
% Rename a generic proxy_* ibrowse option to its socks5_* counterpart.
-spec proxy_to_socks5({atom(), string()}) -> {atom(), string()}.
proxy_to_socks5({Key, Val}) ->
    Renamed =
        case Key of
            proxy_host -> socks5_host;
            proxy_port -> socks5_port;
            proxy_user -> socks5_user;
            proxy_password -> socks5_password
        end,
    {Renamed, Val}.
| |
% Build the ibrowse ssl options for https endpoints from config settings;
% plain http endpoints need none.
-spec ssl_params([_]) -> [_].
ssl_params(Url) ->
    case ibrowse_lib:parse_url(Url) of
        #url{protocol = http} ->
            [];
        #url{protocol = https} ->
            VerifyCerts = cfg_boolean("verify_ssl_certificates", false),
            BaseOpts = [
                {depth, cfg_int("ssl_certificate_max_depth", 3)}
                | ssl_verify_options(VerifyCerts)
            ],
            CertFile = cfg("cert_file"),
            KeyFile = cfg("key_file"),
            Password = cfg("password"),
            % Client certificate options apply only when both the cert and
            % the key are configured; the key password is optional.
            CertOpts =
                case CertFile =/= undefined andalso KeyFile =/= undefined of
                    false ->
                        [];
                    true ->
                        [{certfile, CertFile}, {keyfile, KeyFile}] ++
                            [{password, Password} || Password =/= undefined]
                end,
            [{is_ssl, true}, {ssl_options, CertOpts ++ BaseOpts}]
    end.
| |
% Peer verification ssl options: verify against the configured CA bundle
% when enabled, otherwise disable verification entirely.
-spec ssl_verify_options(true | false) -> [_].
ssl_verify_options(true) ->
    [{verify, verify_peer}, {cacertfile, cfg("ssl_trusted_certificates_file")}];
ssl_verify_options(false) ->
    [{verify, verify_none}].
| |
% Thin local shims over couch_util / couch_replicator_utils accessors, kept
% so call sites in this module stay short.

get_value(Key, Props) ->
    couch_util:get_value(Key, Props).

get_value(Key, Props, Default) ->
    couch_util:get_value(Key, Props, Default).

to_binary(Term) ->
    couch_util:to_binary(Term).

get_json_value(Key, Obj) ->
    couch_replicator_utils:get_json_value(Key, Obj).

get_json_value(Key, Obj, Default) ->
    couch_replicator_utils:get_json_value(Key, Obj, Default).
| |
| -ifdef(TEST). |
| |
| -include_lib("couch/include/couch_eunit.hrl"). |
| |
% Zero or one of the exclusive selector options must pass through untouched.
% Note: arguments follow the EUnit convention ?assertEqual(Expected, Actual),
% matching the other tests in this file (the original had them swapped,
% which produced confusing failure diagnostics).
check_options_pass_values_test() ->
    ?assertEqual([], check_options([])),
    ?assertEqual([baz, {other, fiz}], check_options([baz, {other, fiz}])),
    ?assertEqual([{doc_ids, x}], check_options([{doc_ids, x}])),
    ?assertEqual([{filter, x}], check_options([{filter, x}])),
    ?assertEqual([{selector, x}], check_options([{selector, x}])).
| |
% Any combination of two or more exclusive selector options must throw.
check_options_fail_values_test() ->
    BadCombos = [
        [{doc_ids, x}, {filter, y}],
        [{doc_ids, x}, {selector, y}],
        [{filter, x}, {selector, y}],
        [{doc_ids, x}, {selector, y}, {filter, z}]
    ],
    lists:foreach(
        fun(Opts) -> ?assertThrow({bad_request, _}, check_options(Opts)) end,
        BadCombos
    ).
| |
% Valid option values convert to atom-keyed proplist entries; unknown keys
% (e.g. <<"random">>) are silently dropped.
check_convert_options_pass_test() ->
    ?assertEqual([], convert_options([])),
    ?assertEqual([], convert_options([{<<"random">>, 42}])),
    ?assertEqual(
        [{cancel, true}],
        convert_options([{<<"cancel">>, true}])
    ),
    ?assertEqual(
        [{create_target, true}],
        convert_options([{<<"create_target">>, true}])
    ),
    ?assertEqual(
        [{winning_revs_only, true}],
        convert_options([{<<"winning_revs_only">>, true}])
    ),
    ?assertEqual(
        [{continuous, true}],
        convert_options([{<<"continuous">>, true}])
    ),
    ?assertEqual(
        [{doc_ids, [<<"id">>]}],
        convert_options([{<<"doc_ids">>, [<<"id">>]}])
    ),
    ?assertEqual(
        [{selector, {key, value}}],
        convert_options([{<<"selector">>, {key, value}}])
    ),
    ?assertEqual(
        [{query_params, {[{<<"x">>, 1}]}}],
        convert_options([{<<"query_params">>, {[{<<"x">>, 1}]}}])
    ).
| |
% Wrongly-typed option values must be rejected with a bad_request throw.
check_convert_options_fail_test() ->
    ?assertThrow(
        {bad_request, _},
        convert_options([{<<"cancel">>, <<"true">>}])
    ),
    ?assertThrow(
        {bad_request, _},
        convert_options([{<<"create_target">>, <<"true">>}])
    ),
    ?assertThrow(
        {bad_request, _},
        convert_options([{<<"winning_revs_only">>, <<"foo">>}])
    ),
    ?assertThrow(
        {bad_request, _},
        convert_options([{<<"continuous">>, <<"true">>}])
    ),
    ?assertThrow(
        {bad_request, _},
        convert_options([{<<"doc_ids">>, not_a_list}])
    ),
    % A selector must be a {Props} tuple, not a bare proplist.
    ?assertThrow(
        {bad_request, _},
        convert_options([{<<"selector">>, [{key, value}]}])
    ),
    ?assertThrow(
        {bad_request, _},
        convert_options([{<<"query_params">>, 42}])
    ).
| |
% An empty "user_ctx" object maps to the default name/roles/handler.
rep_user_ctx_test() ->
    RepDoc = {[{<<"user_ctx">>, {[]}}]},
    Expect = #user_ctx{name = null, roles = [], handler = undefined},
    ?assertEqual(Expect, rep_user_ctx(RepDoc)).
| |
% Endpoint/proxy/socket-option parsing tests wrapped in a foreach fixture
% that stubs the config server before each test and unloads all mecks
% afterwards.
local_replication_endpoint_error_test_() ->
    {
        foreach,
        fun meck_config/0,
        fun(_) -> meck:unload() end,
        [
            ?TDEF_FE(t_error_on_local_endpoint),
            ?TDEF_FE(t_proxy_params_default),
            ?TDEF_FE(t_parse_db_string),
            ?TDEF_FE(t_parse_db_url),
            ?TDEF_FE(t_parse_db_invalid_protocol),
            ?TDEF_FE(t_parse_proxy_invalid_protocol),
            ?TDEF_FE(t_parse_sock_opts),
            ?TDEF_FE(t_parse_sock_opts_invalid)
        ]
    }.
| |
% Stub config:get/3 so every lookup falls back to its supplied default.
meck_config() ->
    meck:expect(config, get, fun(_Section, _Key, Default) -> Default end).
| |
% Local (non-http) endpoint names are rejected outright.
t_error_on_local_endpoint(_) ->
    RepDoc =
        {[
            {<<"_id">>, <<"someid">>},
            {<<"source">>, <<"localdb">>},
            {<<"target">>, <<"http://somehost.local/tgt">>}
        ]},
    ?assertThrow(
        {bad_rep_doc, local_endpoints_not_supported},
        parse_rep_doc_without_id(RepDoc)
    ).
| |
% http(s) proxy urls produce proxy_* options while socks5 urls produce the
% socks5_* variants; ports default to the scheme defaults (443 / 1080).
t_proxy_params_default(_) ->
    ?assertEqual(
        [
            {proxy_host, "foo.com"},
            {proxy_port, 443},
            {proxy_user, "u"},
            {proxy_password, "p"}
        ],
        parse_proxy_params("https://u:p@foo.com")
    ),
    ?assertEqual(
        [
            {socks5_host, "foo.com"},
            {socks5_port, 1080},
            {socks5_user, "u"},
            {socks5_password, "p"}
        ],
        parse_proxy_params("socks5://u:p@foo.com")
    ).
| |
% Plain url binaries parse into #httpdb{} records with a trailing slash
% added; bare (local) names and undefined endpoints must throw.
t_parse_db_string(_) ->
    ?assertMatch(
        #httpdb{
            url = "http://a/",
            proxy_url = "http://x"
        },
        parse_rep_db(<<"http://a">>, <<"http://x">>, [])
    ),
    ?assertMatch(
        #httpdb{
            url = "https://a/",
            proxy_url = "https://x"
        },
        parse_rep_db(<<"https://a">>, <<"https://x">>, [])
    ),
    ?assertThrow({error, _}, parse_rep_db(<<"abc">>, <<"foo">>, [])),
    ?assertThrow({error, _}, parse_rep_db(undefined, <<"foo">>, [])).
| |
% Object-form endpoints ({"url": ...}) parse like string endpoints; urls
% with query params keep their shape (no trailing slash added). Invalid
% urls, unknown protocols and a missing "url" key must throw.
t_parse_db_url(_) ->
    ?assertMatch(
        #httpdb{
            url = "http://a?foo",
            proxy_url = "http://x"
        },
        parse_rep_db({[{<<"url">>, <<"http://a?foo">>}]}, <<"http://x">>, [])
    ),
    ?assertMatch(
        #httpdb{
            url = "https://a/",
            proxy_url = "https://x"
        },
        parse_rep_db({[{<<"url">>, <<"https://a">>}]}, <<"https://x">>, [])
    ),
    PUrl = <<"http://x">>,
    ?assertThrow({error, _}, parse_rep_db({[{<<"url">>, <<"abc">>}]}, PUrl, [])),
    ?assertThrow({error, _}, parse_rep_db({[{<<"url">>, <<"httpx://a">>}]}, PUrl, [])),
    ?assertThrow({error, _}, parse_rep_db({[]}, PUrl, [])).
| |
% The endpoint protocol allow-list can be restricted via the
% "valid_endpoint_protocols" config setting; protocols outside it throw.
t_parse_db_invalid_protocol(_) ->
    MeckFun = fun
        ("replicator", "valid_endpoint_protocols", _) -> "https";
        (_, _, Default) -> Default
    end,
    meck:expect(config, get, MeckFun),
    PUrl = <<"http://x">>,
    ?assertMatch(
        #httpdb{
            url = "https://a/",
            proxy_url = "http://x"
        },
        parse_rep_db({[{<<"url">>, <<"https://a">>}]}, PUrl, [])
    ),
    ?assertThrow({error, _}, parse_rep_db({[{<<"url">>, <<"http://a">>}]}, PUrl, [])).
| |
% The proxy protocol allow-list can be restricted via the
% "valid_proxy_protocols" config setting; proxies outside it throw.
t_parse_proxy_invalid_protocol(_) ->
    MeckFun = fun
        ("replicator", "valid_proxy_protocols", _) -> "socks5";
        (_, _, Default) -> Default
    end,
    meck:expect(config, get, MeckFun),
    Url = <<"http://a">>,
    ?assertMatch(
        #httpdb{
            url = "https://a/",
            proxy_url = "socks5://x"
        },
        parse_rep_db({[{<<"url">>, <<"https://a">>}]}, <<"socks5://x">>, [])
    ),
    ?assertThrow({error, _}, parse_rep_db({[{<<"url">>, Url}]}, <<"http://x">>, [])).
| |
% Per-doc socket options are filtered against the configurable
% "valid_socket_options" allow-list: the disallowed `potato` entry is
% dropped, and the remaining options merge with the defaults (keysorted).
t_parse_sock_opts(_) ->
    Allowed = "priority, sndbuf",
    MeckFun = fun
        ("replicator", "valid_socket_options", _) -> Allowed;
        (_, _, Default) -> Default
    end,
    meck:expect(config, get, MeckFun),
    RepDoc =
        {[
            {<<"source">>, <<"http://a">>},
            {<<"target">>, <<"http://b/">>},
            {<<"socket_options">>, <<"[{priority, 3}, {potato, true}, {sndbuf, 10000}]">>}
        ]},
    Rep = parse_rep_doc_without_id(RepDoc),
    ?assertMatch(
        #rep{
            source = #httpdb{url = "http://a/"},
            target = #httpdb{url = "http://b/"},
            options = [{_, _} | _]
        },
        Rep
    ),
    Options = Rep#rep.options,
    ?assertEqual(
        [
            {checkpoint_interval, 30000},
            {connection_timeout, 30000},
            {http_connections, 20},
            {retries, 5},
            {socket_options, [
                {priority, 3},
                {sndbuf, 10000}
            ]},
            {use_bulk_get, true},
            {use_checkpoints, true},
            {worker_batch_size, 500},
            {worker_processes, 4}
        ],
        Options
    ).
| |
% Unparsable socket option terms degrade to an empty option list.
t_parse_sock_opts_invalid(_) ->
    ?assertEqual([], parse_sock_opts(<<"<}garbage]][">>, [], [])).
| |
| -endif. |