Merge branch 'improve_pipeline_balance' of https://github.com/benjaminplee/ibrowse into merge_pull_req_123
diff --git a/.gitignore b/.gitignore
index 12f55c8..1491146 100644
--- a/.gitignore
+++ b/.gitignore
@@ -9,3 +9,4 @@
 Emakefile
 *.bat
 .dialyzer_plt
+.rebar
diff --git a/CONTRIBUTORS b/CONTRIBUTORS
index 21e8d06..665e64b 100644
--- a/CONTRIBUTORS
+++ b/CONTRIBUTORS
@@ -9,9 +9,12 @@
 Adam Kocoloski
 Andrew Tunnell-Jones
 Anthony Molinaro
+Benjamin P Lee (https://github.com/benjaminplee)
 Benoit Chesneau (https://github.com/benoitc)
+Brian Richards (http://github.com/richbria)
 Chris Newcombe
 Dan Kelley
+Dan Schwabe (https://github.com/dfschwabe)
 Derek Upham
 Eric Merritt
 Erik Reitsma
diff --git a/Makefile b/Makefile
index b596b64..28dfda8 100644
--- a/Makefile
+++ b/Makefile
@@ -15,9 +15,11 @@
 	mkdir -p $(DESTDIR)/lib/ibrowse-$(IBROWSE_VSN)/
 	cp -r ebin $(DESTDIR)/lib/ibrowse-$(IBROWSE_VSN)/
 
-test: all
+eunit_test: all
 	./rebar eunit
-	erl -noshell -pa .eunit -pa test -s ibrowse -s ibrowse_test unit_tests \
+
+test: all
+	erl -noshell -pa test -pa ebin -s ibrowse_test unit_tests \
 	-s ibrowse_test verify_chunked_streaming \
 	-s ibrowse_test test_chunked_streaming_once \
 	-s erlang halt
diff --git a/include/ibrowse.hrl b/include/ibrowse.hrl
index 150b1b7..5da1f0d 100644
--- a/include/ibrowse.hrl
+++ b/include/ibrowse.hrl
@@ -18,4 +18,9 @@
 
 -record(ibrowse_conf, {key, value}).
 
+-define(CONNECTIONS_LOCAL_TABLE, ibrowse_lb).
+-define(LOAD_BALANCER_NAMED_TABLE, ibrowse_lb).
+-define(CONF_TABLE, ibrowse_conf).
+-define(STREAM_TABLE, ibrowse_stream).
+
 -endif.
diff --git a/rebar b/rebar
index b9c73ff..8e4deb6 100755
--- a/rebar
+++ b/rebar
Binary files differ
diff --git a/src/ibrowse.erl b/src/ibrowse.erl
index fbb4b83..51fcb86 100644
--- a/src/ibrowse.erl
+++ b/src/ibrowse.erl
@@ -651,7 +651,7 @@
     io:format("~80.80.=s~n", [""]),
     Metrics = get_metrics(),
     lists:foreach(
-      fun({Host, Port, Lb_pid, Tid, Size}) ->
+      fun({Host, Port, {Lb_pid, _, Tid, Size, _}}) ->
               io:format("~40.40s | ~-5.5s | ~-5.5s | ~p~n",
                         [Host ++ ":" ++ integer_to_list(Port),
                          integer_to_list(Tid),
@@ -697,7 +697,7 @@
       fun(#lb_pid{host_port = {X_host, X_port}}, X_acc) ->
               case get_metrics(X_host, X_port) of
                   {_, _, _, _, _} = X_res ->
-                      [X_res | X_acc];
+                      [{X_host, X_port, X_res} | X_acc];
                   _X_res ->
                       X_acc
               end
@@ -708,7 +708,12 @@
         [] ->
             no_active_processes;
         [#lb_pid{pid = Lb_pid, ets_tid = Tid}] ->
-            MsgQueueSize = (catch process_info(Lb_pid, message_queue_len)),
+            MsgQueueSize = case (catch process_info(Lb_pid, message_queue_len)) of
+			       {message_queue_len, Msg_q_len} ->
+				   Msg_q_len;
+			       _ ->
+				   -1
+			   end,
             case Tid of
                 undefined ->
                     {Lb_pid, MsgQueueSize, undefined, 0, {{0, 0}, {0, 0}}};
diff --git a/src/ibrowse_http_client.erl b/src/ibrowse_http_client.erl
index db9559a..92e4964 100644
--- a/src/ibrowse_http_client.erl
+++ b/src/ibrowse_http_client.erl
@@ -2021,12 +2021,12 @@
                             proc_state        = Proc_state} = State) when Tid /= undefined,
                                                                           Proc_state /= ?dead_proc_walking ->
     Ts = os:timestamp(),
+    catch ets:insert(Tid, {{Pipe_sz - 1, os:timestamp(), self()}, []}),
     (catch ets:select_delete(Tid, [{{{'_', '$2', '$1'},'_'},
                                     [{'==', '$1', {const,self()}},
                                      {'<',  '$2', {const,Ts}}
                                     ],
                                     [true]}])),
-    catch ets:insert(Tid, {{Pipe_sz - 1, os:timestamp(), self()}, []}),
     State#state{cur_pipeline_size = Pipe_sz - 1};
 dec_pipeline_counter(State) ->
     State.
diff --git a/src/ibrowse_lb.erl b/src/ibrowse_lb.erl
index 88b169b..894d8ad 100644
--- a/src/ibrowse_lb.erl
+++ b/src/ibrowse_lb.erl
@@ -124,17 +124,17 @@
     State_1   = maybe_create_ets(State),
     Tid       = State_1#state.ets_tid,
     Tid_size  = ets:info(Tid, size),
-    case Tid_size > Max_sess of
+    case Tid_size >= Max_sess of
         true ->
-            Reply = find_best_connection(Tid, Max_pipe, Tid_size),
+            Reply = find_best_connection(Tid, Max_pipe),
             {reply, Reply, State_1#state{max_sessions      = Max_sess,
                                          max_pipeline_size = Max_pipe}};
         false ->
             {ok, Pid} = ibrowse_http_client:start({Tid, Url, SSL_options}, Process_options),
             Ts = os:timestamp(),
-            ets:insert(Tid, {{0, Ts, Pid}, []}),
-            {reply, {ok, {0, Ts, Pid}}, State_1#state{max_sessions      = Max_sess,
-                                                  max_pipeline_size = Max_pipe}}
+            ets:insert(Tid, {{1, Ts, Pid}, []}),
+            {reply, {ok, {1, Ts, Pid}}, State_1#state{max_sessions      = Max_sess,
+						      max_pipeline_size = Max_pipe}}
     end;
 
 handle_call(Request, _From, State) ->
@@ -215,18 +215,13 @@
 %%--------------------------------------------------------------------
 %%% Internal functions
 %%--------------------------------------------------------------------
-find_best_connection(Tid, Max_pipe, _Num_cur) ->
+find_best_connection(Tid, Max_pipe) ->
     case ets:first(Tid) of
-        {Spec_size, Ts, Pid} = First ->
-            case Spec_size >= Max_pipe of
-                true ->
-                    {error, retry_later};
-                false ->
-                    ets:delete(Tid, First),
-                    ets:insert(Tid, {{Spec_size + 1, Ts, Pid}, []}),
-                    {ok, First}
-            end;
-        '$end_of_table' ->
+        {Spec_size, Ts, Pid} = First when Spec_size < Max_pipe ->
+	    ets:delete(Tid, First),
+	    ets:insert(Tid, {{Spec_size + 1, Ts, Pid}, []}),
+	    {ok, First};
+        _ ->
             {error, retry_later}
     end.
 
diff --git a/src/ibrowse_lib.erl b/src/ibrowse_lib.erl
index 1098b0f..1f0a61a 100644
--- a/src/ibrowse_lib.erl
+++ b/src/ibrowse_lib.erl
@@ -363,6 +363,7 @@
 parse_url([], State, Url, TmpAcc) ->
     {invalid_uri_2, State, Url, TmpAcc}.
 
+default_port(socks5) -> 1080;
 default_port(http)  -> 80;
 default_port(https) -> 443;
 default_port(ftp)   -> 21.
diff --git a/src/ibrowse_socks5.erl b/src/ibrowse_socks5.erl
index 41d57f2..417f595 100644
--- a/src/ibrowse_socks5.erl
+++ b/src/ibrowse_socks5.erl
@@ -1,47 +1,108 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
 -module(ibrowse_socks5).
 
--export([connect/3]).
+-define(VERSION, 5).
+-define(CONNECT, 1).
 
--define(TIMEOUT, 2000).
+-define(NO_AUTH, 0).
+-define(USERPASS, 2).
+-define(UNACCEPTABLE, 16#FF).
+-define(RESERVED, 0).
 
--define(SOCKS5, 5).
--define(AUTH_METHOD_NO, 0).
--define(AUTH_METHOD_USERPASS, 2).
--define(ADDRESS_TYPE_IP4, 1).
--define(COMMAND_TYPE_TCPIP_STREAM, 1).
--define(RESERVER, 0).
--define(STATUS_GRANTED, 0).
+-define(ATYP_IPV4, 1).
+-define(ATYP_DOMAINNAME, 3).
+-define(ATYP_IPV6, 4).
 
-connect(Host, Port, Options) ->
-    Socks5Host = proplists:get_value(socks5_host, Options),
-    Socks5Port = proplists:get_value(socks5_port, Options),
+-define(SUCCEEDED, 0).
 
-    {ok, Socket} = gen_tcp:connect(Socks5Host, Socks5Port, [binary, {packet, 0}, {keepalive, true}, {active, false}]),
+-export([connect/5]).
 
-    {ok, _Bin} =
-    case proplists:get_value(socks5_user, Options, undefined) of
-        undefined ->
-            ok = gen_tcp:send(Socket, <<?SOCKS5, 1, ?AUTH_METHOD_NO>>),
-            {ok, <<?SOCKS5, ?AUTH_METHOD_NO>>} = gen_tcp:recv(Socket, 2, ?TIMEOUT);
-        _Else ->
-            Socks5User = list_to_binary(proplists:get_value(socks5_user, Options)),
-            Socks5Pass = list_to_binary(proplists:get_value(socks5_pass, Options)),
+-import(ibrowse_lib, [get_value/2, get_value/3]).
 
-            ok = gen_tcp:send(Socket, <<?SOCKS5, 1, ?AUTH_METHOD_USERPASS>>),
-            {ok, <<?SOCKS5, ?AUTH_METHOD_USERPASS>>} = gen_tcp:recv(Socket, 2, ?TIMEOUT),
+connect(Host, Port, Options, SockOptions, Timeout) ->
+    Socks5Host = get_value(socks5_host, Options),
+    Socks5Port = get_value(socks5_port, Options),
+    case gen_tcp:connect(Socks5Host, Socks5Port, SockOptions, Timeout) of
+        {ok, Socket} ->
+            case handshake(Socket, Options) of
+                ok ->
+                    case connect(Host, Port, Socket) of
+                        ok ->
+                            {ok, Socket};
+                        Else ->
+                            gen_tcp:close(Socket),
+                            Else
+                    end;
+                Else ->
+                    gen_tcp:close(Socket),
+                    Else
+            end;
+        Else ->
+            Else
+    end.
 
-            UserLength = byte_size(Socks5User),
-
-            ok = gen_tcp:send(Socket, << 1, UserLength >>),
-            ok = gen_tcp:send(Socket, Socks5User),
-            PassLength = byte_size(Socks5Pass),
-            ok = gen_tcp:send(Socket, << PassLength >>),
-            ok = gen_tcp:send(Socket, Socks5Pass),
-            {ok, <<1, 0>>} = gen_tcp:recv(Socket, 2, ?TIMEOUT)
+handshake(Socket, Options) when is_port(Socket) ->
+    {Handshake, Success} = case get_value(socks5_user, Options, <<>>) of
+        <<>> ->
+            {<<?VERSION, 1, ?NO_AUTH>>, ?NO_AUTH};
+        User ->
+            Password = get_value(socks5_password, Options, <<>>),
+            {<<?VERSION, 1, ?USERPASS, (byte_size(User)), User,
+               (byte_size(Password)), Password>>, ?USERPASS}
     end,
+    ok = gen_tcp:send(Socket, Handshake),
+    case gen_tcp:recv(Socket, 0) of
+        {ok, <<?VERSION, Success>>} ->
+            ok;
+        {ok, <<?VERSION, ?UNACCEPTABLE>>} ->
+            {error, unacceptable};
+        {error, Reason} ->
+            {error, Reason}
+    end.
 
-    {ok, {IP1,IP2,IP3,IP4}} = inet:getaddr(Host, inet),
+connect(Host, Port, Via) when is_list(Host) ->
+    connect(list_to_binary(Host), Port, Via);
+connect(Host, Port, Via) when is_binary(Host), is_integer(Port),
+                              is_port(Via) ->
+    {AddressType, Address} = case inet:parse_address(binary_to_list(Host)) of
+        {ok, {IP1, IP2, IP3, IP4}} ->
+            {?ATYP_IPV4, <<IP1,IP2,IP3,IP4>>};
+        {ok, {IP1, IP2, IP3, IP4, IP5, IP6, IP7, IP8}} ->
+            {?ATYP_IPV6, <<IP1,IP2,IP3,IP4,IP5,IP6,IP7,IP8>>};
+        _ ->
+            HostLength = byte_size(Host),
+            {?ATYP_DOMAINNAME, <<HostLength,Host/binary>>}
+    end,
+    ok = gen_tcp:send(Via,
+        <<?VERSION, ?CONNECT, ?RESERVED,
+          AddressType, Address/binary,
+          (Port):16>>),
+    case gen_tcp:recv(Via, 0) of
+        {ok, <<?VERSION, ?SUCCEEDED, ?RESERVED, _/binary>>} ->
+            ok;
+        {ok, <<?VERSION, Rep, ?RESERVED, _/binary>>} ->
+            {error, rep(Rep)};
+        {error, Reason} ->
+            {error, Reason}
+    end.
 
-    ok = gen_tcp:send(Socket, <<?SOCKS5, ?COMMAND_TYPE_TCPIP_STREAM, ?RESERVER, ?ADDRESS_TYPE_IP4, IP1, IP2, IP3, IP4, Port:16>>),
-    {ok, << ?SOCKS5, ?STATUS_GRANTED, ?RESERVER, ?ADDRESS_TYPE_IP4, IP1, IP2, IP3, IP4, Port:16 >>} = gen_tcp:recv(Socket, 10, ?TIMEOUT),
-    {ok, Socket}.
+rep(0) -> succeeded;
+rep(1) -> server_fail;
+rep(2) -> disallowed_by_ruleset;
+rep(3) -> network_unreachable;
+rep(4) -> host_unreachable;
+rep(5) -> connection_refused;
+rep(6) -> ttl_expired;
+rep(7) -> command_not_supported;
+rep(8) -> address_type_not_supported.
diff --git a/test/ibrowse_functional_tests.erl b/test/ibrowse_functional_tests.erl
new file mode 100644
index 0000000..3517011
--- /dev/null
+++ b/test/ibrowse_functional_tests.erl
@@ -0,0 +1,174 @@
+%%% File    : ibrowse_functional_tests.erl
+%%% Authors : Benjamin Lee <http://github.com/benjaminplee>
+%%%           Dan Schwabe <http://github.com/dfschwabe>
+%%%           Brian Richards <http://github.com/richbria>
+%%% Description : Functional tests of the ibrowse library using a live test HTTP server
+%%% Created : 18 November 2014 by Benjamin Lee <yardspoon@gmail.com>
+
+-module(ibrowse_functional_tests).
+
+-include_lib("eunit/include/eunit.hrl").
+-define(PER_TEST_TIMEOUT_SEC, 60).
+-define(TIMEDTEST(Desc, Fun), {Desc, {timeout, ?PER_TEST_TIMEOUT_SEC, fun Fun/0}}).
+
+-define(SERVER_PORT, 8181).
+-define(BASE_URL, "http://localhost:" ++ integer_to_list(?SERVER_PORT)).
+-define(SHORT_TIMEOUT_MS, 5000).
+-define(LONG_TIMEOUT_MS, 30000).
+-define(PAUSE_FOR_CONNECTIONS_MS, 2000).
+
+-compile(export_all).
+
+setup() ->
+    application:start(crypto),
+    application:start(public_key),
+    application:start(ssl),
+    ibrowse_test_server:start_server(?SERVER_PORT, tcp),
+    ibrowse:start(),
+    ok.
+
+teardown(_) ->
+    ibrowse:stop(),
+    ibrowse_test_server:stop_server(?SERVER_PORT),
+    ok.
+
+running_server_fixture_test_() ->
+    {foreach,
+     fun setup/0,
+     fun teardown/1,
+     [
+        ?TIMEDTEST("Simple request can be honored", simple_request),
+        ?TIMEDTEST("Slow server causes timeout", slow_server_timeout),
+        ?TIMEDTEST("Pipeline depth goes down with responses", pipeline_depth),
+        ?TIMEDTEST("Pipelines refill", pipeline_refill),
+        ?TIMEDTEST("Timeout closes pipe", closing_pipes),
+        ?TIMEDTEST("Requests are balanced over connections", balanced_connections),
+        ?TIMEDTEST("Pipeline too small signals retries", small_pipeline),
+        ?TIMEDTEST("Dest status can be gathered", status)
+     ]
+    }.
+
+simple_request() ->
+    ?assertMatch({ok, "200", _, _}, ibrowse:send_req(?BASE_URL, [], get, [], [])).
+
+slow_server_timeout() ->
+    ?assertMatch({error, req_timedout}, ibrowse:send_req(?BASE_URL ++ "/never_respond", [], get, [], [], 5000)).
+
+pipeline_depth() ->
+    MaxSessions = 2,
+    MaxPipeline = 2,
+    RequestsSent = 2,
+    EmptyPipelineDepth = 0,
+
+    ?assertEqual([], ibrowse_test_server:get_conn_pipeline_depth()),
+
+    Fun = fun() -> ibrowse:send_req(?BASE_URL, [], get, [], [{max_sessions, MaxSessions}, {max_pipeline_size, MaxPipeline}], ?SHORT_TIMEOUT_MS) end,
+    times(RequestsSent, fun() -> spawn_link(Fun) end),
+
+    timer:sleep(?PAUSE_FOR_CONNECTIONS_MS),
+
+    Counts = [Count || {_Pid, Count} <- ibrowse_test_server:get_conn_pipeline_depth()],
+    ?assertEqual(MaxSessions, length(Counts)),
+    ?assertEqual(lists:duplicate(MaxSessions, EmptyPipelineDepth), Counts).
+
+pipeline_refill() ->
+    MaxSessions = 2,
+    MaxPipeline = 2,
+    RequestsToFill = MaxSessions * MaxPipeline,
+
+    %% Send off enough requests to fill sessions and pipelines in rappid succession
+    Fun = fun() -> ibrowse:send_req(?BASE_URL, [], get, [], [{max_sessions, MaxSessions}, {max_pipeline_size, MaxPipeline}], ?SHORT_TIMEOUT_MS) end,
+    times(RequestsToFill, fun() -> spawn_link(Fun) end),
+    timer:sleep(?PAUSE_FOR_CONNECTIONS_MS),
+
+    % Verify that connections properly reported their completed responses and can still accept more
+    ?assertMatch({ok, "200", _, _}, ibrowse:send_req(?BASE_URL, [], get, [], [{max_sessions, MaxSessions}, {max_pipeline_size, MaxPipeline}], ?SHORT_TIMEOUT_MS)),
+
+    % and do it again to make sure we really are clear
+    times(RequestsToFill, fun() -> spawn_link(Fun) end),
+    timer:sleep(?PAUSE_FOR_CONNECTIONS_MS),
+
+    % Verify that connections properly reported their completed responses and can still accept more
+    ?assertMatch({ok, "200", _, _}, ibrowse:send_req(?BASE_URL, [], get, [], [{max_sessions, MaxSessions}, {max_pipeline_size, MaxPipeline}], ?SHORT_TIMEOUT_MS)).
+
+closing_pipes() ->
+    MaxSessions = 2,
+    MaxPipeline = 2,
+    RequestsSent = 2,
+    BalancedNumberOfRequestsPerConnection = 1,
+
+    ?assertEqual([], ibrowse_test_server:get_conn_pipeline_depth()),
+
+    Fun = fun() -> ibrowse:send_req(?BASE_URL ++ "/never_respond", [], get, [], [{max_sessions, MaxSessions}, {max_pipeline_size, MaxPipeline}], ?SHORT_TIMEOUT_MS) end,
+    times(RequestsSent, fun() -> spawn_link(Fun) end),
+
+    timer:sleep(?PAUSE_FOR_CONNECTIONS_MS),
+
+    Counts = [Count || {_Pid, Count} <- ibrowse_test_server:get_conn_pipeline_depth()],
+    ?assertEqual(MaxSessions, length(Counts)),
+    ?assertEqual(lists:duplicate(MaxSessions, BalancedNumberOfRequestsPerConnection), Counts),
+
+    timer:sleep(?SHORT_TIMEOUT_MS),
+
+    ?assertEqual([], ibrowse_test_server:get_conn_pipeline_depth()).
+
+balanced_connections() ->
+    MaxSessions = 4,
+    MaxPipeline = 100,
+    RequestsSent = 80,
+    BalancedNumberOfRequestsPerConnection = 20,
+
+    ?assertEqual([], ibrowse_test_server:get_conn_pipeline_depth()),
+
+    Fun = fun() -> ibrowse:send_req(?BASE_URL ++ "/never_respond", [], get, [], [{max_sessions, MaxSessions}, {max_pipeline_size, MaxPipeline}], ?LONG_TIMEOUT_MS) end,
+    times(RequestsSent, fun() -> spawn_link(Fun) end),
+
+    timer:sleep(?PAUSE_FOR_CONNECTIONS_MS),
+
+    Counts = [Count || {_Pid, Count} <- ibrowse_test_server:get_conn_pipeline_depth()],
+    ?assertEqual(MaxSessions, length(Counts)),
+
+    ?assertEqual(lists:duplicate(MaxSessions, BalancedNumberOfRequestsPerConnection), Counts).
+
+small_pipeline() ->
+    MaxSessions = 10,
+    MaxPipeline = 10,
+    RequestsSent = 100,
+    FullRequestsPerConnection = 10,
+
+    ?assertEqual([], ibrowse_test_server:get_conn_pipeline_depth()),
+
+    Fun = fun() -> ibrowse:send_req(?BASE_URL ++ "/never_respond", [], get, [], [{max_sessions, MaxSessions}, {max_pipeline_size, MaxPipeline}], ?SHORT_TIMEOUT_MS) end,
+    times(RequestsSent, fun() -> spawn(Fun) end),
+
+    timer:sleep(?PAUSE_FOR_CONNECTIONS_MS),  %% Wait for everyone to get in line
+
+    ibrowse:show_dest_status("localhost", 8181),
+    Counts = [Count || {_Pid, Count} <- ibrowse_test_server:get_conn_pipeline_depth()],
+    ?assertEqual(MaxSessions, length(Counts)),
+
+    ?assertEqual(lists:duplicate(MaxSessions, FullRequestsPerConnection), Counts),
+
+    Response = ibrowse:send_req(?BASE_URL ++ "/never_respond", [], get, [], [{max_sessions, MaxSessions}, {max_pipeline_size, MaxPipeline}], ?SHORT_TIMEOUT_MS),
+
+    ?assertEqual({error, retry_later}, Response).
+
+status() ->
+    MaxSessions = 10,
+    MaxPipeline = 10,
+    RequestsSent = 100,
+
+    Fun = fun() -> ibrowse:send_req(?BASE_URL ++ "/never_respond", [], get, [], [{max_sessions, MaxSessions}, {max_pipeline_size, MaxPipeline}], ?SHORT_TIMEOUT_MS) end,
+    times(RequestsSent, fun() -> spawn(Fun) end),
+
+    timer:sleep(?PAUSE_FOR_CONNECTIONS_MS),  %% Wait for everyone to get in line
+
+    ibrowse:show_dest_status(),
+    ibrowse:show_dest_status("http://localhost:8181").
+
+
+times(0, _) ->
+    ok;
+times(X, Fun) ->
+    Fun(),
+    times(X - 1, Fun).
diff --git a/test/ibrowse_test.erl b/test/ibrowse_test.erl
index e216e82..0787493 100644
--- a/test/ibrowse_test.erl
+++ b/test/ibrowse_test.erl
@@ -40,7 +40,7 @@
          test_retry_of_requests/1
 	]).
 
--include("ibrowse.hrl").
+-include_lib("ibrowse/include/ibrowse.hrl").
 
 test_stream_once(Url, Method, Options) ->
     test_stream_once(Url, Method, Options, 5000).
@@ -257,17 +257,17 @@
 		   ] ++ ?LOCAL_TESTS).
 
 local_unit_tests() ->
-    error_logger:tty(false),
-    unit_tests([], ?LOCAL_TESTS),
-    error_logger:tty(true).
+    unit_tests([], ?LOCAL_TESTS).
 
 unit_tests() ->
-    unit_tests([], ?TEST_LIST).
+    error_logger:tty(false),
+    unit_tests([], ?TEST_LIST),
+    error_logger:tty(true).
 
 unit_tests(Options, Test_list) ->
     application:start(crypto),
     application:start(public_key),
-    application:start(ssl),
+    application:ensure_all_started(ssl),
     (catch ibrowse_test_server:start_server(8181, tcp)),
     application:start(ibrowse),
     Options_1 = Options ++ [{connect_timeout, 5000}],
@@ -387,6 +387,8 @@
 	    {'EXIT', Reason};
 	{'DOWN', _, _, _, _} ->
 	    wait_for_resp(Pid);
+	{'EXIT', _, normal} ->
+	    wait_for_resp(Pid);
 	Msg ->
 	    io:format("Recvd unknown message: ~p~n", [Msg]),
 	    wait_for_resp(Pid)
@@ -564,6 +566,7 @@
     test_retry_of_requests("http://localhost:8181/ibrowse_handle_one_request_only_with_delay").
 
 test_retry_of_requests(Url) ->
+    reset_ibrowse(),
     Timeout_1 = 2050,
     Res_1 = test_retry_of_requests(Url, Timeout_1),
     case lists:filter(fun({_Pid, {ok, "200", _, _}}) ->
@@ -586,7 +589,6 @@
         _ ->
             exit({failed, Timeout_1, Res_1})
     end,
-    reset_ibrowse(),
     Timeout_2 = 2200,
     Res_2 = test_retry_of_requests(Url, Timeout_2),
     case lists:filter(fun({_Pid, {ok, "200", _, _}}) ->
@@ -604,10 +606,10 @@
                 true ->
                     ok;
                 false ->
-                    exit({failed, Timeout_2, Res_2})
+                    exit({failed, {?MODULE, ?LINE}, Timeout_2, Res_2})
             end;
         _ ->
-            exit({failed, Timeout_2, Res_2})
+            exit({failed, {?MODULE, ?LINE}, Timeout_2, Res_2})
     end,
     success.
 
diff --git a/test/ibrowse_test_server.erl b/test/ibrowse_test_server.erl
index 1d72210..7025286 100644
--- a/test/ibrowse_test_server.erl
+++ b/test/ibrowse_test_server.erl
@@ -1,49 +1,60 @@
 %%% File    : ibrowse_test_server.erl
 %%% Author  : Chandrashekhar Mullaparthi <chandrashekhar.mullaparthi@t-mobile.co.uk>
+%%%           Benjamin Lee <http://github.com/benjaminplee>
+%%%           Dan Schwabe <http://github.com/dfschwabe>
+%%%           Brian Richards <http://github.com/richbria>
 %%% Description : A server to simulate various test scenarios
 %%% Created : 17 Oct 2010 by Chandrashekhar Mullaparthi <chandrashekhar.mullaparthi@t-mobile.co.uk>
 
 -module(ibrowse_test_server).
 -export([
          start_server/2,
-         stop_server/1
+         stop_server/1,
+         get_conn_pipeline_depth/0
         ]).
 
 -record(request, {method, uri, version, headers = [], body = []}).
 
 -define(dec2hex(X), erlang:integer_to_list(X, 16)).
+-define(ACCEPT_TIMEOUT_MS, 1000).
+-define(CONN_PIPELINE_DEPTH, conn_pipeline_depth).
 
 start_server(Port, Sock_type) ->
     Fun = fun() ->
-        Proc_name = server_proc_name(Port),
-        case whereis(Proc_name) of
-            undefined ->
-                register(Proc_name, self()),
-                case do_listen(Sock_type, Port, [{active, false},
-                    {reuseaddr, true},
-                    {nodelay, true},
-                    {packet, http}]) of
-                    {ok, Sock} ->
-                        do_trace("Server listening on port: ~p~n", [Port]),
-                        accept_loop(Sock, Sock_type);
-                    Err ->
-                        erlang:error(
-                            lists:flatten(
-                                io_lib:format(
-                                    "Failed to start server on port ~p. ~p~n",
-                                    [Port, Err]))),
-                        exit({listen_error, Err})
-                end;
-            _X ->
-                ok
-        end
-    end,
+		  Proc_name = server_proc_name(Port),
+		  case whereis(Proc_name) of
+		      undefined ->
+			  register(Proc_name, self()),
+			  ets:new(?CONN_PIPELINE_DEPTH, [named_table, public, set]),
+			  case do_listen(Sock_type, Port, [{active, false},
+							   {reuseaddr, true},
+							   {nodelay, true},
+							   {packet, http}]) of
+			      {ok, Sock} ->
+				  do_trace("Server listening on port: ~p~n", [Port]),
+				  accept_loop(Sock, Sock_type);
+			      Err ->
+				  erlang:error(
+				    lists:flatten(
+				      io_lib:format(
+					"Failed to start server on port ~p. ~p~n",
+					[Port, Err]))),
+				  exit({listen_error, Err})
+			  end;
+		      _X ->
+			  ok
+		  end
+	  end,
     spawn_link(Fun).
 
 stop_server(Port) ->
     server_proc_name(Port) ! stop,
+    timer:sleep(2000),  % wait for server to receive msg and unregister
     ok.
 
+get_conn_pipeline_depth() ->
+    ets:tab2list(?CONN_PIPELINE_DEPTH).
+
 server_proc_name(Port) ->
     list_to_atom("ibrowse_test_server_"++integer_to_list(Port)).
 
@@ -55,24 +66,36 @@
     ssl:listen(Port, Opts).
 
 do_accept(tcp, Listen_sock) ->
-    gen_tcp:accept(Listen_sock);
+    gen_tcp:accept(Listen_sock, ?ACCEPT_TIMEOUT_MS);
 do_accept(ssl, Listen_sock) ->
-    ssl:ssl_accept(Listen_sock).
+    ssl:ssl_accept(Listen_sock, ?ACCEPT_TIMEOUT_MS).
 
 accept_loop(Sock, Sock_type) ->
     case do_accept(Sock_type, Sock) of
         {ok, Conn} ->
-            Pid = spawn_link(
-              fun() ->
-                      server_loop(Conn, Sock_type, #request{})
-              end),
+            Pid = spawn_link(fun() -> connection(Conn, Sock_type) end),
             set_controlling_process(Conn, Sock_type, Pid),
             Pid ! {setopts, [{active, true}]},
             accept_loop(Sock, Sock_type);
+        {error, timeout} ->
+            receive
+                stop ->
+                    ok
+            after 10 ->
+                accept_loop(Sock, Sock_type)
+            end;
         Err ->
             Err
     end.
 
+connection(Conn, Sock_type) ->
+    catch ets:insert(?CONN_PIPELINE_DEPTH, {self(), 0}),
+    try
+        server_loop(Conn, Sock_type, #request{})
+    after
+        catch ets:delete(?CONN_PIPELINE_DEPTH, self())
+    end.
+
 set_controlling_process(Sock, tcp, Pid) ->
     gen_tcp:controlling_process(Sock, Pid);
 set_controlling_process(Sock, ssl, Pid) ->
@@ -86,6 +109,7 @@
 server_loop(Sock, Sock_type, #request{headers = Headers} = Req) ->
     receive
         {http, Sock, {http_request, HttpMethod, HttpUri, HttpVersion}} ->
+            catch ets:update_counter(?CONN_PIPELINE_DEPTH, self(), 1),
             server_loop(Sock, Sock_type, Req#request{method = HttpMethod,
                                                      uri = HttpUri,
                                                      version = HttpVersion});
@@ -95,9 +119,12 @@
             case process_request(Sock, Sock_type, Req) of
                 close_connection ->
                     gen_tcp:shutdown(Sock, read_write);
+                not_done ->
+                    ok;
                 _ ->
-                    server_loop(Sock, Sock_type, #request{})
-            end;
+                    catch ets:update_counter(?CONN_PIPELINE_DEPTH, self(), -1)
+            end,
+            server_loop(Sock, Sock_type, #request{});
         {http, Sock, {http_error, Err}} ->
             io:format("Error parsing HTTP request:~n"
                       "Req so far : ~p~n"
@@ -109,8 +136,6 @@
         {tcp_closed, Sock} ->
             do_trace("Client closed connection~n", []),
             ok;
-        stop ->
-            ok;
         Other ->
             io:format("Recvd unknown msg: ~p~n", [Other]),
             exit({unknown_msg, Other})
@@ -163,7 +188,6 @@
                          uri = {abs_path, "/ibrowse_head_transfer_enc"}}) ->
     Resp = <<"HTTP/1.1 400 Bad Request\r\nServer: Apache-Coyote/1.1\r\nContent-Length:5\r\nDate: Wed, 04 Apr 2012 16:53:49 GMT\r\n\r\nabcde">>,
     do_send(Sock, Sock_type, Resp);
-
 process_request(Sock, Sock_type,
                 #request{method='GET',
                          headers = Headers,
@@ -215,6 +239,8 @@
     Resp = <<"HTTP/1.1 200 OK\r\nServer: Apache-Coyote/1.1\r\nDate: Wed, 04 Apr 2012 16:53:49 GMT\r\nConnection: close\r\n\r\n">>,
     do_send(Sock, Sock_type, Resp),
     close_connection;
+process_request(_Sock, _Sock_type, #request{uri = {abs_path, "/never_respond"} } ) ->
+    not_done;
 process_request(Sock, Sock_type, Req) ->
     do_trace("Recvd req: ~p~n", [Req]),
     Resp = <<"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n">>,
@@ -226,7 +252,6 @@
 do_send(Sock, ssl, Resp) ->
     ssl:send(Sock, Resp).
 
-
 %%------------------------------------------------------------------------------
 %% Utility functions
 %%------------------------------------------------------------------------------