% blob: e75cc5a63ab5faf62fdfd94ad292f51136fd9554 [file] [log] [blame]
% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
% http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.
-module(couch_replicator_connection_tests).
-include_lib("couch/include/couch_eunit.hrl").
-include_lib("couch/include/couch_db.hrl").
-define(TIMEOUT, 1000).
% Per-test fixture setup.
% Reads the "bind_address" and "port" keys from the "httpd" config section
% (defaulting to "127.0.0.1" and "5984") and returns them as {Host, Port}.
% Both elements are strings.
setup() ->
    HttpdValue = fun(Key, Default) -> config:get("httpd", Key, Default) end,
    {HttpdValue("bind_address", "127.0.0.1"), HttpdValue("port", "5984")}.
% Per-test fixture cleanup. The value produced by setup/0 is ignored and the
% function always returns ok.
teardown(_HostPort) ->
    ok.
% EUnit test generator for this module.
% The outer `setup` fixture starts couch with couch_replicator once and stops
% it afterwards; the inner `foreach` fixture runs setup/0 and teardown/1 around
% every instantiator, each of which receives the value returned by setup/0.
httpc_pool_test_() ->
    StartCouch = fun() -> test_util:start_couch([couch_replicator]) end,
    StopCouch = fun test_util:stop_couch/1,
    Instantiators = [
        fun connections_shared_after_release/1,
        fun connections_not_shared_after_owner_death/1,
        fun idle_connections_closed/1,
        fun test_owner_monitors/1,
        fun worker_discards_creds_on_create/1,
        fun worker_discards_url_creds_after_request/1,
        fun worker_discards_creds_in_headers_after_request/1,
        fun worker_discards_proxy_creds_after_request/1
    ],
    PerTest = {foreach, fun setup/0, fun teardown/1, Instantiators},
    {
        "replicator connection sharing tests",
        {setup, StartCouch, StopCouch, PerTest}
    }.
% Verifies that a connection which has been released is handed out again:
% the test process acquires and releases a worker, then a second process
% acquires for the same URL and must be given the same worker pid.
%
% {Host, Port} is the pair returned by setup/0 (both strings).
connections_shared_after_release({Host, Port}) ->
    ?_test(begin
        URL = "http://" ++ Host ++ ":" ++ Port,
        Self = self(),
        {ok, Pid} = couch_replicator_connection:acquire(URL),
        couch_replicator_connection:release(Pid),
        % Acquire from a different process and report the result back here.
        spawn(fun() ->
            Self ! couch_replicator_connection:acquire(URL)
        end),
        receive
            {ok, Pid2} ->
                ?assertEqual(Pid, Pid2)
        after ?TIMEOUT ->
            % Fail with an explicit reason instead of blocking until the
            % test framework gives up on the whole test.
            erlang:error({timeout, waiting_for_acquire_result})
        end
    end).
% Verifies that a connection whose owner crashed is not handed out again.
% A helper process acquires a worker, reports it to the test process and then
% crashes. The test then acquires for the same URL, expects a different
% worker pid, and waits for the original worker to go down.
%
% {Host, Port} is the pair returned by setup/0 (both strings).
connections_not_shared_after_owner_death({Host, Port}) ->
    ?_test(begin
        URL = "http://" ++ Host ++ ":" ++ Port,
        Self = self(),
        spawn(fun() ->
            Self ! couch_replicator_connection:acquire(URL),
            error("simulate division by zero without compiler warning")
        end),
        receive
            {ok, Pid} ->
                {ok, Pid2} = couch_replicator_connection:acquire(URL),
                ?assertNotEqual(Pid, Pid2),
                % If Pid is already dead the monitor still delivers a 'DOWN'
                % message immediately, so the receive below cannot miss it.
                MRef = monitor(process, Pid),
                receive
                    {'DOWN', MRef, process, Pid, _Reason} ->
                        ?assert(not is_process_alive(Pid));
                    Other ->
                        throw(Other)
                after ?TIMEOUT ->
                    % Only reached when no message at all arrives in time.
                    erlang:error({timeout, waiting_for_worker_down})
                end
        after ?TIMEOUT ->
            % The helper never reported an acquired connection.
            erlang:error({timeout, waiting_for_acquire_result})
        end
    end).
% Verifies the effect of the close_idle_connections message on the
% couch_replicator_connection process: a connection that is currently held
% must keep its entry in the couch_replicator_connection ETS table, while a
% released connection must be gone after the next sweep.
%
% {Host, Port} is the pair returned by setup/0 (both strings).
idle_connections_closed({Host, Port}) ->
    ?_test(begin
        URL = "http://" ++ Host ++ ":" ++ Port,
        {ok, Pid} = couch_replicator_connection:acquire(URL),
        couch_replicator_connection ! close_idle_connections,
        % block until the close_idle_connections message has been handled;
        % checking the table before this point would not prove that the
        % held connection actually survived the sweep
        sys:get_status(couch_replicator_connection),
        ?assert(ets:member(couch_replicator_connection, Pid)),
        couch_replicator_connection:release(Pid),
        couch_replicator_connection ! close_idle_connections,
        % block until idle connections have closed
        sys:get_status(couch_replicator_connection),
        ?assert(not ets:member(couch_replicator_connection, Pid))
    end).
% Checks the monitors held by the couch_replicator_connection process as
% connections are acquired and released by the test process: one
% {process, self()} monitor is expected per connection currently held.
%
% {Host, Port} is the pair returned by setup/0 (both strings).
test_owner_monitors({Host, Port}) ->
    ?_test(begin
        URL = "http://" ++ Host ++ ":" ++ Port,
        OwnerMonitor = {process, self()},

        % A single acquire/release round trip first.
        {ok, FirstWorker} = couch_replicator_connection:acquire(URL),
        assert_monitors_equal([OwnerMonitor]),
        couch_replicator_connection:release(FirstWorker),
        assert_monitors_equal([]),

        % Acquire five connections one by one; after the Nth acquire there
        % must be exactly N monitors on this process.
        Held = lists:foldl(fun(Count, Acc) ->
            {ok, Worker} = couch_replicator_connection:acquire(URL),
            assert_monitors_equal(lists:duplicate(Count, OwnerMonitor)),
            [Worker | Acc]
        end, [], lists:seq(1, 5)),

        % Release them again (most recently acquired first); every release
        % must drop exactly one monitor.
        lists:foldl(fun(Worker, Remaining) ->
            StillHeld = Remaining - 1,
            couch_replicator_connection:release(Worker),
            assert_monitors_equal(lists:duplicate(StillHeld, OwnerMonitor)),
            StillHeld
        end, length(Held), Held)
    end).
% Verifies that a freshly acquired worker does not retain credentials that
% were embedded in the URL: neither the password nor the base64 encoded
% "user:pass" string may appear in the text returned by worker_internals/1.
%
% {Host, Port} is the pair returned by setup/0 (both strings).
worker_discards_creds_on_create({Host, Port}) ->
    ?_test(begin
        {User, Pass, B64Auth} = user_pass(),
        URL = lists:append(["http://", User, ":", Pass, "@", Host, ":", Port]),
        {ok, WPid} = couch_replicator_connection:acquire(URL),
        Internals = worker_internals(WPid),
        % string:str/2 returns 0 when the substring is absent.
        lists:foreach(fun(Secret) ->
            ?assert(string:str(Internals, Secret) =:= 0)
        end, [B64Auth, Pass])
    end).
% Verifies that a worker does not retain URL-embedded credentials after it
% has served a request. The request goes to the throw-away TCP server started
% by server/0; afterwards neither the password nor the base64 encoded
% "user:pass" string may appear in the text returned by worker_internals/1.
%
% Only Host from the setup/0 pair is used; the port comes from server/0.
worker_discards_url_creds_after_request({Host, _}) ->
    ?_test(begin
        {User, Pass, B64Auth} = user_pass(),
        {Port, ServerPid} = server(),
        try
            PortStr = integer_to_list(Port),
            URL = "http://" ++ User ++ ":" ++ Pass ++ "@" ++ Host ++ ":" ++ PortStr,
            {ok, WPid} = couch_replicator_connection:acquire(URL),
            try
                ?assertMatch({ok, "200", _, _}, send_req(WPid, URL, [], [])),
                Internals = worker_internals(WPid),
                % string:str/2 returns 0 when the substring is absent.
                ?assert(string:str(Internals, B64Auth) =:= 0),
                ?assert(string:str(Internals, Pass) =:= 0)
            after
                % Give the connection back even when an assertion failed.
                couch_replicator_connection:release(WPid)
            end
        after
            % Always stop the helper server; unlink first so that killing it
            % does not take this process down with it.
            unlink(ServerPid),
            exit(ServerPid, kill)
        end
    end).
% Verifies that a worker does not retain credentials passed in an
% "Authorization: Basic ..." request header after it has served the request.
% The request goes to the throw-away TCP server started by server/0;
% afterwards neither the password nor the base64 encoded "user:pass" string
% may appear in the text returned by worker_internals/1.
%
% Only Host from the setup/0 pair is used; the port comes from server/0.
worker_discards_creds_in_headers_after_request({Host, _}) ->
    ?_test(begin
        {_User, Pass, B64Auth} = user_pass(),
        {Port, ServerPid} = server(),
        try
            PortStr = integer_to_list(Port),
            URL = "http://" ++ Host ++ ":" ++ PortStr,
            {ok, WPid} = couch_replicator_connection:acquire(URL),
            try
                Headers = [{"Authorization", "Basic " ++ B64Auth}],
                ?assertMatch({ok, "200", _, _}, send_req(WPid, URL, Headers, [])),
                Internals = worker_internals(WPid),
                % string:str/2 returns 0 when the substring is absent.
                ?assert(string:str(Internals, B64Auth) =:= 0),
                ?assert(string:str(Internals, Pass) =:= 0)
            after
                % Give the connection back even when an assertion failed.
                couch_replicator_connection:release(WPid)
            end
        after
            % Always stop the helper server; unlink first so that killing it
            % does not take this process down with it.
            unlink(ServerPid),
            exit(ServerPid, kill)
        end
    end).
% Verifies that a worker does not retain proxy credentials after it has
% served a request. The request is sent with proxy_host/proxy_port pointing
% at the throw-away TCP server started by server/0 and with proxy_user /
% proxy_pass set; afterwards neither the password nor the base64 encoded
% "user:pass" string may appear in the text returned by worker_internals/1.
%
% Only Host from the setup/0 pair is used; the port comes from server/0.
worker_discards_proxy_creds_after_request({Host, _}) ->
    ?_test(begin
        {User, Pass, B64Auth} = user_pass(),
        {Port, ServerPid} = server(),
        try
            PortStr = integer_to_list(Port),
            URL = "http://" ++ Host ++ ":" ++ PortStr,
            {ok, WPid} = couch_replicator_connection:acquire(URL),
            try
                Opts = [
                    {proxy_host, Host},
                    {proxy_port, Port},
                    {proxy_user, User},
                    {proxy_pass, Pass}
                ],
                ?assertMatch({ok, "200", _, _}, send_req(WPid, URL, [], Opts)),
                Internals = worker_internals(WPid),
                % string:str/2 returns 0 when the substring is absent.
                ?assert(string:str(Internals, B64Auth) =:= 0),
                ?assert(string:str(Internals, Pass) =:= 0)
            after
                % Give the connection back even when an assertion failed.
                couch_replicator_connection:release(WPid)
            end
        after
            % Always stop the helper server; unlink first so that killing it
            % does not take this process down with it.
            unlink(ServerPid),
            exit(ServerPid, kill)
        end
    end).
% Sends a GET request with an empty body through the given worker process
% and returns whatever ibrowse:send_req_direct/6 returns.
send_req(WPid, URL, Headers, Opts) ->
    Method = get,
    Body = [],
    ibrowse:send_req_direct(WPid, URL, Headers, Method, Body, Opts).
% Fixed credentials used by the credential-scrubbing tests.
% Returns {User, Pass, B64Auth} where B64Auth is "User:Pass" encoded with
% ibrowse_lib:encode_base64/1.
user_pass() ->
    User = "specialuser",
    Pass = "averysecretpassword",
    Credentials = lists:append([User, ":", Pass]),
    {User, Pass, ibrowse_lib:encode_base64(Credentials)}.
% Returns a flat string containing the pretty-printed process dictionary of
% Pid followed by its pretty-printed state as reported by sys:get_state/1.
% The two terms are formatted separately and then concatenated.
worker_internals(Pid) ->
    Render = fun(Term) -> io_lib:format("~p", [Term]) end,
    DictText = Render(erlang:process_info(Pid, dictionary)),
    StateText = Render(sys:get_state(Pid)),
    lists:append(lists:flatten(DictText), lists:flatten(StateText)).
% Starts a minimal TCP server for the request tests.
% Opens a passive-mode listening socket on port 0 with a 256 byte receive
% buffer, then spawns a process linked to the caller that runs
% server_responder/1 on that socket.
% Returns {Port, Pid}: the port actually bound and the responder's pid.
server() ->
    ListenOpts = [{recbuf, 256}, {active, false}],
    {ok, ListenSock} = gen_tcp:listen(0, ListenOpts),
    {ok, BoundPort} = inet:port(ListenSock),
    Responder = spawn_link(fun() -> server_responder(ListenSock) end),
    {BoundPort, Responder}.
% Accept loop of the test TCP server: accepts one connection, performs a
% single recv on it, answers, and then goes back to accepting.
% The accepted socket is not closed after a successful reply.
% NOTE(review): presumably left open so the client's connection stays up
% while the test inspects the worker - confirm.
server_responder(LSock) ->
    {ok, Sock} = gen_tcp:accept(LSock),
    server_handle_recv(Sock, gen_tcp:recv(Sock, 0)),
    server_responder(LSock).

% Handles the result of the single recv done by server_responder/1.
% On {ok, Data} the request is checked and an empty 200 response is sent;
% any other result closes the socket and throws.
server_handle_recv(Sock, {ok, Data}) ->
    % sanity check that all the request data was received
    ?assert(lists:prefix("GET ", Data)),
    ?assert(lists:suffix("\r\n\r\n", Data)),
    ResponseLines = ["HTTP/1.1 200 OK", "Content-Length: 0", "\r\n"],
    ok = gen_tcp:send(Sock, string:join(ResponseLines, "\r\n"));
server_handle_recv(Sock, Other) ->
    gen_tcp:close(Sock),
    throw({replication_eunit_tcp_server_crashed, Other}).
% Asserts that the monitors currently held by the registered
% couch_replicator_connection process are exactly ShouldBe
% (a list as returned by process_info(Pid, monitors)).
assert_monitors_equal(ShouldBe) ->
    % Synchronous round trip to the process: once it returns, messages this
    % process sent to it earlier have been handled.
    sys:get_status(couch_replicator_connection),
    {monitors, Monitors} = process_info(whereis(couch_replicator_connection), monitors),
    % Expected value goes first so failure reports label the two sides
    % correctly.
    ?assertEqual(ShouldBe, Monitors).