Merge branch 'improve_pipeline_balance' of https://github.com/benjaminplee/ibrowse into merge_pull_req_123
diff --git a/.gitignore b/.gitignore
index 12f55c8..1491146 100644
--- a/.gitignore
+++ b/.gitignore
@@ -9,3 +9,4 @@
Emakefile
*.bat
.dialyzer_plt
+.rebar
diff --git a/CONTRIBUTORS b/CONTRIBUTORS
index 21e8d06..665e64b 100644
--- a/CONTRIBUTORS
+++ b/CONTRIBUTORS
@@ -9,9 +9,12 @@
Adam Kocoloski
Andrew Tunnell-Jones
Anthony Molinaro
+Benjamin P Lee (https://github.com/benjaminplee)
Benoit Chesneau (https://github.com/benoitc)
+Brian Richards (http://github.com/richbria)
Chris Newcombe
Dan Kelley
+Dan Schwabe (https://github.com/dfschwabe)
Derek Upham
Eric Merritt
Erik Reitsma
diff --git a/Makefile b/Makefile
index b596b64..28dfda8 100644
--- a/Makefile
+++ b/Makefile
@@ -15,9 +15,11 @@
mkdir -p $(DESTDIR)/lib/ibrowse-$(IBROWSE_VSN)/
cp -r ebin $(DESTDIR)/lib/ibrowse-$(IBROWSE_VSN)/
-test: all
+eunit_test: all
./rebar eunit
- erl -noshell -pa .eunit -pa test -s ibrowse -s ibrowse_test unit_tests \
+
+test: all
+ erl -noshell -pa test -pa ebin -s ibrowse_test unit_tests \
-s ibrowse_test verify_chunked_streaming \
-s ibrowse_test test_chunked_streaming_once \
-s erlang halt
diff --git a/include/ibrowse.hrl b/include/ibrowse.hrl
index 150b1b7..5da1f0d 100644
--- a/include/ibrowse.hrl
+++ b/include/ibrowse.hrl
@@ -18,4 +18,9 @@
-record(ibrowse_conf, {key, value}).
+-define(CONNECTIONS_LOCAL_TABLE, ibrowse_lb).
+-define(LOAD_BALANCER_NAMED_TABLE, ibrowse_lb).
+-define(CONF_TABLE, ibrowse_conf).
+-define(STREAM_TABLE, ibrowse_stream).
+
-endif.
diff --git a/rebar b/rebar
index b9c73ff..8e4deb6 100755
--- a/rebar
+++ b/rebar
Binary files differ
diff --git a/src/ibrowse.erl b/src/ibrowse.erl
index fbb4b83..51fcb86 100644
--- a/src/ibrowse.erl
+++ b/src/ibrowse.erl
@@ -651,7 +651,7 @@
io:format("~80.80.=s~n", [""]),
Metrics = get_metrics(),
lists:foreach(
- fun({Host, Port, Lb_pid, Tid, Size}) ->
+ fun({Host, Port, {Lb_pid, _, Tid, Size, _}}) ->
io:format("~40.40s | ~-5.5s | ~-5.5s | ~p~n",
[Host ++ ":" ++ integer_to_list(Port),
integer_to_list(Tid),
@@ -697,7 +697,7 @@
fun(#lb_pid{host_port = {X_host, X_port}}, X_acc) ->
case get_metrics(X_host, X_port) of
{_, _, _, _, _} = X_res ->
- [X_res | X_acc];
+ [{X_host, X_port, X_res} | X_acc];
_X_res ->
X_acc
end
@@ -708,7 +708,12 @@
[] ->
no_active_processes;
[#lb_pid{pid = Lb_pid, ets_tid = Tid}] ->
- MsgQueueSize = (catch process_info(Lb_pid, message_queue_len)),
+ MsgQueueSize = case (catch process_info(Lb_pid, message_queue_len)) of
+ {message_queue_len, Msg_q_len} ->
+ Msg_q_len;
+ _ ->
+ -1
+ end,
case Tid of
undefined ->
{Lb_pid, MsgQueueSize, undefined, 0, {{0, 0}, {0, 0}}};
diff --git a/src/ibrowse_http_client.erl b/src/ibrowse_http_client.erl
index db9559a..92e4964 100644
--- a/src/ibrowse_http_client.erl
+++ b/src/ibrowse_http_client.erl
@@ -2021,12 +2021,12 @@
proc_state = Proc_state} = State) when Tid /= undefined,
Proc_state /= ?dead_proc_walking ->
Ts = os:timestamp(),
+ catch ets:insert(Tid, {{Pipe_sz - 1, os:timestamp(), self()}, []}),
(catch ets:select_delete(Tid, [{{{'_', '$2', '$1'},'_'},
[{'==', '$1', {const,self()}},
{'<', '$2', {const,Ts}}
],
[true]}])),
- catch ets:insert(Tid, {{Pipe_sz - 1, os:timestamp(), self()}, []}),
State#state{cur_pipeline_size = Pipe_sz - 1};
dec_pipeline_counter(State) ->
State.
diff --git a/src/ibrowse_lb.erl b/src/ibrowse_lb.erl
index 88b169b..894d8ad 100644
--- a/src/ibrowse_lb.erl
+++ b/src/ibrowse_lb.erl
@@ -124,17 +124,17 @@
State_1 = maybe_create_ets(State),
Tid = State_1#state.ets_tid,
Tid_size = ets:info(Tid, size),
- case Tid_size > Max_sess of
+ case Tid_size >= Max_sess of
true ->
- Reply = find_best_connection(Tid, Max_pipe, Tid_size),
+ Reply = find_best_connection(Tid, Max_pipe),
{reply, Reply, State_1#state{max_sessions = Max_sess,
max_pipeline_size = Max_pipe}};
false ->
{ok, Pid} = ibrowse_http_client:start({Tid, Url, SSL_options}, Process_options),
Ts = os:timestamp(),
- ets:insert(Tid, {{0, Ts, Pid}, []}),
- {reply, {ok, {0, Ts, Pid}}, State_1#state{max_sessions = Max_sess,
- max_pipeline_size = Max_pipe}}
+ ets:insert(Tid, {{1, Ts, Pid}, []}),
+ {reply, {ok, {1, Ts, Pid}}, State_1#state{max_sessions = Max_sess,
+ max_pipeline_size = Max_pipe}}
end;
handle_call(Request, _From, State) ->
@@ -215,18 +215,13 @@
%%--------------------------------------------------------------------
%%% Internal functions
%%--------------------------------------------------------------------
-find_best_connection(Tid, Max_pipe, _Num_cur) ->
+find_best_connection(Tid, Max_pipe) ->
case ets:first(Tid) of
- {Spec_size, Ts, Pid} = First ->
- case Spec_size >= Max_pipe of
- true ->
- {error, retry_later};
- false ->
- ets:delete(Tid, First),
- ets:insert(Tid, {{Spec_size + 1, Ts, Pid}, []}),
- {ok, First}
- end;
- '$end_of_table' ->
+ {Spec_size, Ts, Pid} = First when Spec_size < Max_pipe ->
+ ets:delete(Tid, First),
+ ets:insert(Tid, {{Spec_size + 1, Ts, Pid}, []}),
+ {ok, First};
+ _ ->
{error, retry_later}
end.
diff --git a/src/ibrowse_lib.erl b/src/ibrowse_lib.erl
index 1098b0f..1f0a61a 100644
--- a/src/ibrowse_lib.erl
+++ b/src/ibrowse_lib.erl
@@ -363,6 +363,7 @@
parse_url([], State, Url, TmpAcc) ->
{invalid_uri_2, State, Url, TmpAcc}.
+default_port(socks5) -> 1080;
default_port(http) -> 80;
default_port(https) -> 443;
default_port(ftp) -> 21.
diff --git a/src/ibrowse_socks5.erl b/src/ibrowse_socks5.erl
index 41d57f2..417f595 100644
--- a/src/ibrowse_socks5.erl
+++ b/src/ibrowse_socks5.erl
@@ -1,47 +1,108 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
-module(ibrowse_socks5).
--export([connect/3]).
+-define(VERSION, 5).
+-define(CONNECT, 1).
--define(TIMEOUT, 2000).
+-define(NO_AUTH, 0).
+-define(USERPASS, 2).
+-define(UNACCEPTABLE, 16#FF).
+-define(RESERVED, 0).
--define(SOCKS5, 5).
--define(AUTH_METHOD_NO, 0).
--define(AUTH_METHOD_USERPASS, 2).
--define(ADDRESS_TYPE_IP4, 1).
--define(COMMAND_TYPE_TCPIP_STREAM, 1).
--define(RESERVER, 0).
--define(STATUS_GRANTED, 0).
+-define(ATYP_IPV4, 1).
+-define(ATYP_DOMAINNAME, 3).
+-define(ATYP_IPV6, 4).
-connect(Host, Port, Options) ->
- Socks5Host = proplists:get_value(socks5_host, Options),
- Socks5Port = proplists:get_value(socks5_port, Options),
+-define(SUCCEEDED, 0).
- {ok, Socket} = gen_tcp:connect(Socks5Host, Socks5Port, [binary, {packet, 0}, {keepalive, true}, {active, false}]),
+-export([connect/5]).
- {ok, _Bin} =
- case proplists:get_value(socks5_user, Options, undefined) of
- undefined ->
- ok = gen_tcp:send(Socket, <<?SOCKS5, 1, ?AUTH_METHOD_NO>>),
- {ok, <<?SOCKS5, ?AUTH_METHOD_NO>>} = gen_tcp:recv(Socket, 2, ?TIMEOUT);
- _Else ->
- Socks5User = list_to_binary(proplists:get_value(socks5_user, Options)),
- Socks5Pass = list_to_binary(proplists:get_value(socks5_pass, Options)),
+-import(ibrowse_lib, [get_value/2, get_value/3]).
- ok = gen_tcp:send(Socket, <<?SOCKS5, 1, ?AUTH_METHOD_USERPASS>>),
- {ok, <<?SOCKS5, ?AUTH_METHOD_USERPASS>>} = gen_tcp:recv(Socket, 2, ?TIMEOUT),
+%% Open a TCP connection to the SOCKS5 proxy named by the socks5_host and
+%% socks5_port options, then negotiate a tunnel to Host:Port through it.
+%% Returns {ok, Socket} on success; otherwise returns whatever the failing
+%% step returned. The proxy socket is closed when a step after the TCP
+%% connect fails.
+connect(Host, Port, Options, SockOptions, Timeout) ->
+    ProxyHost = get_value(socks5_host, Options),
+    ProxyPort = get_value(socks5_port, Options),
+    case gen_tcp:connect(ProxyHost, ProxyPort, SockOptions, Timeout) of
+        {ok, Socket} ->
+            negotiate(Host, Port, Options, Socket);
+        ConnectFailure ->
+            ConnectFailure
+    end.
+
+%% Run the handshake followed by the CONNECT request on an open proxy
+%% socket, closing the socket if either step does not return ok.
+negotiate(Host, Port, Options, Socket) ->
+    Outcome = case handshake(Socket, Options) of
+                  ok ->
+                      connect(Host, Port, Socket);
+                  HandshakeFailure ->
+                      HandshakeFailure
+              end,
+    case Outcome of
+        ok ->
+            {ok, Socket};
+        Failure ->
+            gen_tcp:close(Socket),
+            Failure
+    end.
- UserLength = byte_size(Socks5User),
-
- ok = gen_tcp:send(Socket, << 1, UserLength >>),
- ok = gen_tcp:send(Socket, Socks5User),
- PassLength = byte_size(Socks5Pass),
- ok = gen_tcp:send(Socket, << PassLength >>),
- ok = gen_tcp:send(Socket, Socks5Pass),
- {ok, <<1, 0>>} = gen_tcp:recv(Socket, 2, ?TIMEOUT)
+%% Negotiate the SOCKS5 authentication method on Socket (RFC 1928) and, when
+%% the socks5_user option is set, run the username/password sub-negotiation
+%% (RFC 1929) using the socks5_password option.
+%% socks5_user / socks5_password may be binaries or strings.
+%% Returns ok | {error, unacceptable} | {error, Reason}.
+handshake(Socket, Options) when is_port(Socket) ->
+    User = iolist_to_binary(get_value(socks5_user, Options, <<>>)),
+    Method = case User of
+        <<>> -> ?NO_AUTH;
+        _ -> ?USERPASS
end,
+    %% Offer exactly one method; credentials are only sent after the proxy
+    %% has selected USERPASS.
+    ok = gen_tcp:send(Socket, <<?VERSION, 1, Method>>),
+    %% The method-selection reply is always two bytes long.
+    case gen_tcp:recv(Socket, 2) of
+        {ok, <<?VERSION, ?NO_AUTH>>} when Method =:= ?NO_AUTH ->
+            ok;
+        {ok, <<?VERSION, ?USERPASS>>} when Method =:= ?USERPASS ->
+            Password = iolist_to_binary(
+                         get_value(socks5_password, Options, <<>>)),
+            authenticate(Socket, User, Password);
+        {ok, <<?VERSION, ?UNACCEPTABLE>>} ->
+            {error, unacceptable};
+        {ok, Unexpected} ->
+            %% Report instead of crashing so the caller can close the socket.
+            {error, {unexpected_handshake_reply, Unexpected}};
+        {error, Reason} ->
+            {error, Reason}
+    end.
+
+%% RFC 1929 username/password sub-negotiation. Both fields carry a one-byte
+%% length prefix, so neither may exceed 255 bytes.
+authenticate(_Socket, User, Password) when byte_size(User) > 255;
+                                           byte_size(Password) > 255 ->
+    {error, credentials_too_long};
+authenticate(Socket, User, Password) ->
+    ok = gen_tcp:send(Socket, <<1, (byte_size(User)), User/binary,
+                                (byte_size(Password)), Password/binary>>),
+    case gen_tcp:recv(Socket, 2) of
+        {ok, <<1, 0>>} ->
+            ok;
+        {ok, <<1, _Status>>} ->
+            {error, authentication_failed};
+        {ok, Unexpected} ->
+            {error, {unexpected_auth_reply, Unexpected}};
+        {error, Reason} ->
+            {error, Reason}
+    end.
- {ok, {IP1,IP2,IP3,IP4}} = inet:getaddr(Host, inet),
+%% Ask the SOCKS5 proxy behind the socket Via to open a TCP connection to
+%% Host:Port (RFC 1928, CONNECT request).
+%% Host may be a string or a binary. IPv4 and IPv6 literals are sent as raw
+%% addresses; anything else is sent as a domain name for the proxy to resolve.
+%% Returns ok | {error, Reason}.
+connect(Host, Port, Via) when is_list(Host) ->
+    connect(list_to_binary(Host), Port, Via);
+connect(Host, _Port, _Via) when is_binary(Host), byte_size(Host) > 255 ->
+    %% A domain name is sent with a one-byte length prefix and no valid
+    %% address literal is this long, so reject instead of wrapping the length.
+    {error, hostname_too_long};
+connect(Host, Port, Via) when is_binary(Host), is_integer(Port),
+                              is_port(Via) ->
+    {AddressType, Address} = case inet:parse_address(binary_to_list(Host)) of
+        {ok, {IP1, IP2, IP3, IP4}} ->
+            {?ATYP_IPV4, <<IP1,IP2,IP3,IP4>>};
+        {ok, {IP1, IP2, IP3, IP4, IP5, IP6, IP7, IP8}} ->
+            %% Each element of an IPv6 tuple is a 16-bit group; the wire
+            %% format is the full 16-byte address.
+            {?ATYP_IPV6, <<IP1:16,IP2:16,IP3:16,IP4:16,
+                           IP5:16,IP6:16,IP7:16,IP8:16>>};
+        _ ->
+            HostLength = byte_size(Host),
+            {?ATYP_DOMAINNAME, <<HostLength,Host/binary>>}
+    end,
+    ok = gen_tcp:send(Via,
+                      <<?VERSION, ?CONNECT, ?RESERVED,
+                        AddressType, Address/binary,
+                        (Port):16>>),
+    case gen_tcp:recv(Via, 0) of
+        {ok, <<?VERSION, ?SUCCEEDED, ?RESERVED, _/binary>>} ->
+            ok;
+        {ok, <<?VERSION, Rep, ?RESERVED, _/binary>>} ->
+            {error, rep(Rep)};
+        {ok, Unexpected} ->
+            %% Report instead of crashing so the caller can close the socket.
+            {error, {unexpected_connect_reply, Unexpected}};
+        {error, Reason} ->
+            {error, Reason}
+    end.
- ok = gen_tcp:send(Socket, <<?SOCKS5, ?COMMAND_TYPE_TCPIP_STREAM, ?RESERVER, ?ADDRESS_TYPE_IP4, IP1, IP2, IP3, IP4, Port:16>>),
- {ok, << ?SOCKS5, ?STATUS_GRANTED, ?RESERVER, ?ADDRESS_TYPE_IP4, IP1, IP2, IP3, IP4, Port:16 >>} = gen_tcp:recv(Socket, 10, ?TIMEOUT),
- {ok, Socket}.
+%% Translate the REP byte of a SOCKS5 reply (RFC 1928, section 6) into an
+%% atom describing the outcome.
+rep(0) -> succeeded;
+rep(1) -> server_fail;
+rep(2) -> disallowed_by_ruleset;
+rep(3) -> network_unreachable;
+rep(4) -> host_unreachable;
+rep(5) -> connection_refused;
+rep(6) -> ttl_expired;
+rep(7) -> command_not_supported;
+rep(8) -> address_type_not_supported;
+%% The byte comes straight from the proxy; codes above 8 are unassigned, so
+%% report them rather than failing with function_clause on the error path.
+rep(Code) -> {unknown_reply, Code}.
diff --git a/test/ibrowse_functional_tests.erl b/test/ibrowse_functional_tests.erl
new file mode 100644
index 0000000..3517011
--- /dev/null
+++ b/test/ibrowse_functional_tests.erl
@@ -0,0 +1,174 @@
+%%% File : ibrowse_functional_tests.erl
+%%% Authors : Benjamin Lee <http://github.com/benjaminplee>
+%%% Dan Schwabe <http://github.com/dfschwabe>
+%%% Brian Richards <http://github.com/richbria>
+%%% Description : Functional tests of the ibrowse library using a live test HTTP server
+%%% Created : 18 November 2014 by Benjamin Lee <yardspoon@gmail.com>
+
+-module(ibrowse_functional_tests).
+
+-include_lib("eunit/include/eunit.hrl").
+-define(PER_TEST_TIMEOUT_SEC, 60).
+-define(TIMEDTEST(Desc, Fun), {Desc, {timeout, ?PER_TEST_TIMEOUT_SEC, fun Fun/0}}).
+
+-define(SERVER_PORT, 8181).
+-define(BASE_URL, "http://localhost:" ++ integer_to_list(?SERVER_PORT)).
+-define(SHORT_TIMEOUT_MS, 5000).
+-define(LONG_TIMEOUT_MS, 30000).
+-define(PAUSE_FOR_CONNECTIONS_MS, 2000).
+
+-compile(export_all).
+
+setup() ->
+ application:start(crypto),
+ application:start(public_key),
+ application:start(ssl),
+ ibrowse_test_server:start_server(?SERVER_PORT, tcp),
+ ibrowse:start(),
+ ok.
+
+teardown(_) ->
+ ibrowse:stop(),
+ ibrowse_test_server:stop_server(?SERVER_PORT),
+ ok.
+
+running_server_fixture_test_() ->
+ {foreach,
+ fun setup/0,
+ fun teardown/1,
+ [
+ ?TIMEDTEST("Simple request can be honored", simple_request),
+ ?TIMEDTEST("Slow server causes timeout", slow_server_timeout),
+ ?TIMEDTEST("Pipeline depth goes down with responses", pipeline_depth),
+ ?TIMEDTEST("Pipelines refill", pipeline_refill),
+ ?TIMEDTEST("Timeout closes pipe", closing_pipes),
+ ?TIMEDTEST("Requests are balanced over connections", balanced_connections),
+ ?TIMEDTEST("Pipeline too small signals retries", small_pipeline),
+ ?TIMEDTEST("Dest status can be gathered", status)
+ ]
+ }.
+
+simple_request() ->
+ ?assertMatch({ok, "200", _, _}, ibrowse:send_req(?BASE_URL, [], get, [], [])).
+
+%% A request to the test server's /never_respond path must fail with
+%% req_timedout once the request timeout elapses.
+slow_server_timeout() ->
+    ?assertMatch({error, req_timedout}, ibrowse:send_req(?BASE_URL ++ "/never_respond", [], get, [], [], ?SHORT_TIMEOUT_MS)).
+
+pipeline_depth() ->
+ MaxSessions = 2,
+ MaxPipeline = 2,
+ RequestsSent = 2,
+ EmptyPipelineDepth = 0,
+
+ ?assertEqual([], ibrowse_test_server:get_conn_pipeline_depth()),
+
+ Fun = fun() -> ibrowse:send_req(?BASE_URL, [], get, [], [{max_sessions, MaxSessions}, {max_pipeline_size, MaxPipeline}], ?SHORT_TIMEOUT_MS) end,
+ times(RequestsSent, fun() -> spawn_link(Fun) end),
+
+ timer:sleep(?PAUSE_FOR_CONNECTIONS_MS),
+
+ Counts = [Count || {_Pid, Count} <- ibrowse_test_server:get_conn_pipeline_depth()],
+ ?assertEqual(MaxSessions, length(Counts)),
+ ?assertEqual(lists:duplicate(MaxSessions, EmptyPipelineDepth), Counts).
+
+pipeline_refill() ->
+    MaxSessions = 2,
+    MaxPipeline = 2,
+    RequestsToFill = MaxSessions * MaxPipeline,
+
+    %% Send off enough requests (MaxSessions * MaxPipeline) to fill every session's pipeline in rapid succession
+    Fun = fun() -> ibrowse:send_req(?BASE_URL, [], get, [], [{max_sessions, MaxSessions}, {max_pipeline_size, MaxPipeline}], ?SHORT_TIMEOUT_MS) end,
+    times(RequestsToFill, fun() -> spawn_link(Fun) end),
+    timer:sleep(?PAUSE_FOR_CONNECTIONS_MS),
+
+    %% After the pause, one more request with the same limits must still get a 200 response
+    ?assertMatch({ok, "200", _, _}, ibrowse:send_req(?BASE_URL, [], get, [], [{max_sessions, MaxSessions}, {max_pipeline_size, MaxPipeline}], ?SHORT_TIMEOUT_MS)),
+
+    %% Repeat the whole fill-and-pause cycle to make sure the pipelines really are clear
+    times(RequestsToFill, fun() -> spawn_link(Fun) end),
+    timer:sleep(?PAUSE_FOR_CONNECTIONS_MS),
+
+    %% Again, one more request with the same limits must still get a 200 response
+    ?assertMatch({ok, "200", _, _}, ibrowse:send_req(?BASE_URL, [], get, [], [{max_sessions, MaxSessions}, {max_pipeline_size, MaxPipeline}], ?SHORT_TIMEOUT_MS)).
+
+closing_pipes() ->
+ MaxSessions = 2,
+ MaxPipeline = 2,
+ RequestsSent = 2,
+ BalancedNumberOfRequestsPerConnection = 1,
+
+ ?assertEqual([], ibrowse_test_server:get_conn_pipeline_depth()),
+
+ Fun = fun() -> ibrowse:send_req(?BASE_URL ++ "/never_respond", [], get, [], [{max_sessions, MaxSessions}, {max_pipeline_size, MaxPipeline}], ?SHORT_TIMEOUT_MS) end,
+ times(RequestsSent, fun() -> spawn_link(Fun) end),
+
+ timer:sleep(?PAUSE_FOR_CONNECTIONS_MS),
+
+ Counts = [Count || {_Pid, Count} <- ibrowse_test_server:get_conn_pipeline_depth()],
+ ?assertEqual(MaxSessions, length(Counts)),
+ ?assertEqual(lists:duplicate(MaxSessions, BalancedNumberOfRequestsPerConnection), Counts),
+
+ timer:sleep(?SHORT_TIMEOUT_MS),
+
+ ?assertEqual([], ibrowse_test_server:get_conn_pipeline_depth()).
+
+balanced_connections() ->
+ MaxSessions = 4,
+ MaxPipeline = 100,
+ RequestsSent = 80,
+ BalancedNumberOfRequestsPerConnection = 20,
+
+ ?assertEqual([], ibrowse_test_server:get_conn_pipeline_depth()),
+
+ Fun = fun() -> ibrowse:send_req(?BASE_URL ++ "/never_respond", [], get, [], [{max_sessions, MaxSessions}, {max_pipeline_size, MaxPipeline}], ?LONG_TIMEOUT_MS) end,
+ times(RequestsSent, fun() -> spawn_link(Fun) end),
+
+ timer:sleep(?PAUSE_FOR_CONNECTIONS_MS),
+
+ Counts = [Count || {_Pid, Count} <- ibrowse_test_server:get_conn_pipeline_depth()],
+ ?assertEqual(MaxSessions, length(Counts)),
+
+ ?assertEqual(lists:duplicate(MaxSessions, BalancedNumberOfRequestsPerConnection), Counts).
+
+%% Fill MaxSessions connections with MaxPipeline never-answered requests each,
+%% check the server sees every connection at full depth, then check that one
+%% more request is rejected with {error, retry_later}.
+small_pipeline() ->
+    MaxSessions = 10,
+    MaxPipeline = 10,
+    RequestsSent = 100,
+    FullRequestsPerConnection = 10,
+
+    ?assertEqual([], ibrowse_test_server:get_conn_pipeline_depth()),
+
+    Fun = fun() -> ibrowse:send_req(?BASE_URL ++ "/never_respond", [], get, [], [{max_sessions, MaxSessions}, {max_pipeline_size, MaxPipeline}], ?SHORT_TIMEOUT_MS) end,
+    times(RequestsSent, fun() -> spawn(Fun) end),
+
+    timer:sleep(?PAUSE_FOR_CONNECTIONS_MS),  %% Wait for everyone to get in line
+
+    %% Use the shared port macro so the status query follows the test server.
+    ibrowse:show_dest_status("localhost", ?SERVER_PORT),
+    Counts = [Count || {_Pid, Count} <- ibrowse_test_server:get_conn_pipeline_depth()],
+    ?assertEqual(MaxSessions, length(Counts)),
+
+    ?assertEqual(lists:duplicate(MaxSessions, FullRequestsPerConnection), Counts),
+
+    Response = ibrowse:send_req(?BASE_URL ++ "/never_respond", [], get, [], [{max_sessions, MaxSessions}, {max_pipeline_size, MaxPipeline}], ?SHORT_TIMEOUT_MS),
+
+    ?assertEqual({error, retry_later}, Response).
+
+%% Load the test server with never-answered requests, then call both
+%% show_dest_status variants. The test passes if neither call crashes; no
+%% output is asserted.
+status() ->
+    MaxSessions = 10,
+    MaxPipeline = 10,
+    RequestsSent = 100,
+
+    Fun = fun() -> ibrowse:send_req(?BASE_URL ++ "/never_respond", [], get, [], [{max_sessions, MaxSessions}, {max_pipeline_size, MaxPipeline}], ?SHORT_TIMEOUT_MS) end,
+    times(RequestsSent, fun() -> spawn(Fun) end),
+
+    timer:sleep(?PAUSE_FOR_CONNECTIONS_MS),  %% Wait for everyone to get in line
+
+    ibrowse:show_dest_status(),
+    %% Use the shared URL macro so the query follows the test server's port.
+    ibrowse:show_dest_status(?BASE_URL).
+
+
+%% Call Fun/0 exactly X times for its side effects and return ok.
+%% The guard makes a negative or non-integer count fail fast with
+%% function_clause instead of recursing forever.
+times(0, _) ->
+    ok;
+times(X, Fun) when is_integer(X), X > 0 ->
+    Fun(),
+    times(X - 1, Fun).
diff --git a/test/ibrowse_test.erl b/test/ibrowse_test.erl
index e216e82..0787493 100644
--- a/test/ibrowse_test.erl
+++ b/test/ibrowse_test.erl
@@ -40,7 +40,7 @@
test_retry_of_requests/1
]).
--include("ibrowse.hrl").
+-include_lib("ibrowse/include/ibrowse.hrl").
test_stream_once(Url, Method, Options) ->
test_stream_once(Url, Method, Options, 5000).
@@ -257,17 +257,17 @@
] ++ ?LOCAL_TESTS).
local_unit_tests() ->
- error_logger:tty(false),
- unit_tests([], ?LOCAL_TESTS),
- error_logger:tty(true).
+ unit_tests([], ?LOCAL_TESTS).
unit_tests() ->
- unit_tests([], ?TEST_LIST).
+ error_logger:tty(false),
+ unit_tests([], ?TEST_LIST),
+ error_logger:tty(true).
unit_tests(Options, Test_list) ->
application:start(crypto),
application:start(public_key),
- application:start(ssl),
+ application:ensure_all_started(ssl),
(catch ibrowse_test_server:start_server(8181, tcp)),
application:start(ibrowse),
Options_1 = Options ++ [{connect_timeout, 5000}],
@@ -387,6 +387,8 @@
{'EXIT', Reason};
{'DOWN', _, _, _, _} ->
wait_for_resp(Pid);
+ {'EXIT', _, normal} ->
+ wait_for_resp(Pid);
Msg ->
io:format("Recvd unknown message: ~p~n", [Msg]),
wait_for_resp(Pid)
@@ -564,6 +566,7 @@
test_retry_of_requests("http://localhost:8181/ibrowse_handle_one_request_only_with_delay").
test_retry_of_requests(Url) ->
+ reset_ibrowse(),
Timeout_1 = 2050,
Res_1 = test_retry_of_requests(Url, Timeout_1),
case lists:filter(fun({_Pid, {ok, "200", _, _}}) ->
@@ -586,7 +589,6 @@
_ ->
exit({failed, Timeout_1, Res_1})
end,
- reset_ibrowse(),
Timeout_2 = 2200,
Res_2 = test_retry_of_requests(Url, Timeout_2),
case lists:filter(fun({_Pid, {ok, "200", _, _}}) ->
@@ -604,10 +606,10 @@
true ->
ok;
false ->
- exit({failed, Timeout_2, Res_2})
+ exit({failed, {?MODULE, ?LINE}, Timeout_2, Res_2})
end;
_ ->
- exit({failed, Timeout_2, Res_2})
+ exit({failed, {?MODULE, ?LINE}, Timeout_2, Res_2})
end,
success.
diff --git a/test/ibrowse_test_server.erl b/test/ibrowse_test_server.erl
index 1d72210..7025286 100644
--- a/test/ibrowse_test_server.erl
+++ b/test/ibrowse_test_server.erl
@@ -1,49 +1,60 @@
%%% File : ibrowse_test_server.erl
%%% Author : Chandrashekhar Mullaparthi <chandrashekhar.mullaparthi@t-mobile.co.uk>
+%%% Benjamin Lee <http://github.com/benjaminplee>
+%%% Dan Schwabe <http://github.com/dfschwabe>
+%%% Brian Richards <http://github.com/richbria>
%%% Description : A server to simulate various test scenarios
%%% Created : 17 Oct 2010 by Chandrashekhar Mullaparthi <chandrashekhar.mullaparthi@t-mobile.co.uk>
-module(ibrowse_test_server).
-export([
start_server/2,
- stop_server/1
+ stop_server/1,
+ get_conn_pipeline_depth/0
]).
-record(request, {method, uri, version, headers = [], body = []}).
-define(dec2hex(X), erlang:integer_to_list(X, 16)).
+-define(ACCEPT_TIMEOUT_MS, 1000).
+-define(CONN_PIPELINE_DEPTH, conn_pipeline_depth).
start_server(Port, Sock_type) ->
Fun = fun() ->
- Proc_name = server_proc_name(Port),
- case whereis(Proc_name) of
- undefined ->
- register(Proc_name, self()),
- case do_listen(Sock_type, Port, [{active, false},
- {reuseaddr, true},
- {nodelay, true},
- {packet, http}]) of
- {ok, Sock} ->
- do_trace("Server listening on port: ~p~n", [Port]),
- accept_loop(Sock, Sock_type);
- Err ->
- erlang:error(
- lists:flatten(
- io_lib:format(
- "Failed to start server on port ~p. ~p~n",
- [Port, Err]))),
- exit({listen_error, Err})
- end;
- _X ->
- ok
- end
- end,
+ Proc_name = server_proc_name(Port),
+ case whereis(Proc_name) of
+ undefined ->
+ register(Proc_name, self()),
+ ets:new(?CONN_PIPELINE_DEPTH, [named_table, public, set]),
+ case do_listen(Sock_type, Port, [{active, false},
+ {reuseaddr, true},
+ {nodelay, true},
+ {packet, http}]) of
+ {ok, Sock} ->
+ do_trace("Server listening on port: ~p~n", [Port]),
+ accept_loop(Sock, Sock_type);
+ Err ->
+ erlang:error(
+ lists:flatten(
+ io_lib:format(
+ "Failed to start server on port ~p. ~p~n",
+ [Port, Err]))),
+ exit({listen_error, Err})
+ end;
+ _X ->
+ ok
+ end
+ end,
spawn_link(Fun).
stop_server(Port) ->
server_proc_name(Port) ! stop,
+ timer:sleep(2000), % wait for server to receive msg and unregister
ok.
+get_conn_pipeline_depth() ->
+ ets:tab2list(?CONN_PIPELINE_DEPTH).
+
server_proc_name(Port) ->
list_to_atom("ibrowse_test_server_"++integer_to_list(Port)).
@@ -55,24 +66,36 @@
ssl:listen(Port, Opts).
do_accept(tcp, Listen_sock) ->
- gen_tcp:accept(Listen_sock);
+ gen_tcp:accept(Listen_sock, ?ACCEPT_TIMEOUT_MS);
do_accept(ssl, Listen_sock) ->
- ssl:ssl_accept(Listen_sock).
+ ssl:ssl_accept(Listen_sock, ?ACCEPT_TIMEOUT_MS).
accept_loop(Sock, Sock_type) ->
case do_accept(Sock_type, Sock) of
{ok, Conn} ->
- Pid = spawn_link(
- fun() ->
- server_loop(Conn, Sock_type, #request{})
- end),
+ Pid = spawn_link(fun() -> connection(Conn, Sock_type) end),
set_controlling_process(Conn, Sock_type, Pid),
Pid ! {setopts, [{active, true}]},
accept_loop(Sock, Sock_type);
+ {error, timeout} ->
+ receive
+ stop ->
+ ok
+ after 10 ->
+ accept_loop(Sock, Sock_type)
+ end;
Err ->
Err
end.
+connection(Conn, Sock_type) ->
+ catch ets:insert(?CONN_PIPELINE_DEPTH, {self(), 0}),
+ try
+ server_loop(Conn, Sock_type, #request{})
+ after
+ catch ets:delete(?CONN_PIPELINE_DEPTH, self())
+ end.
+
set_controlling_process(Sock, tcp, Pid) ->
gen_tcp:controlling_process(Sock, Pid);
set_controlling_process(Sock, ssl, Pid) ->
@@ -86,6 +109,7 @@
server_loop(Sock, Sock_type, #request{headers = Headers} = Req) ->
receive
{http, Sock, {http_request, HttpMethod, HttpUri, HttpVersion}} ->
+ catch ets:update_counter(?CONN_PIPELINE_DEPTH, self(), 1),
server_loop(Sock, Sock_type, Req#request{method = HttpMethod,
uri = HttpUri,
version = HttpVersion});
@@ -95,9 +119,12 @@
case process_request(Sock, Sock_type, Req) of
close_connection ->
gen_tcp:shutdown(Sock, read_write);
+ not_done ->
+ ok;
_ ->
- server_loop(Sock, Sock_type, #request{})
- end;
+ catch ets:update_counter(?CONN_PIPELINE_DEPTH, self(), -1)
+ end,
+ server_loop(Sock, Sock_type, #request{});
{http, Sock, {http_error, Err}} ->
io:format("Error parsing HTTP request:~n"
"Req so far : ~p~n"
@@ -109,8 +136,6 @@
{tcp_closed, Sock} ->
do_trace("Client closed connection~n", []),
ok;
- stop ->
- ok;
Other ->
io:format("Recvd unknown msg: ~p~n", [Other]),
exit({unknown_msg, Other})
@@ -163,7 +188,6 @@
uri = {abs_path, "/ibrowse_head_transfer_enc"}}) ->
Resp = <<"HTTP/1.1 400 Bad Request\r\nServer: Apache-Coyote/1.1\r\nContent-Length:5\r\nDate: Wed, 04 Apr 2012 16:53:49 GMT\r\n\r\nabcde">>,
do_send(Sock, Sock_type, Resp);
-
process_request(Sock, Sock_type,
#request{method='GET',
headers = Headers,
@@ -215,6 +239,8 @@
Resp = <<"HTTP/1.1 200 OK\r\nServer: Apache-Coyote/1.1\r\nDate: Wed, 04 Apr 2012 16:53:49 GMT\r\nConnection: close\r\n\r\n">>,
do_send(Sock, Sock_type, Resp),
close_connection;
+process_request(_Sock, _Sock_type, #request{uri = {abs_path, "/never_respond"} } ) ->
+ not_done;
process_request(Sock, Sock_type, Req) ->
do_trace("Recvd req: ~p~n", [Req]),
Resp = <<"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n">>,
@@ -226,7 +252,6 @@
do_send(Sock, ssl, Resp) ->
ssl:send(Sock, Resp).
-
%%------------------------------------------------------------------------------
%% Utility functions
%%------------------------------------------------------------------------------