Change pipeline algorithm to smallest-pipeline-first

Big commit. Switched the algorithm to one that favors the
connection with the smallest pipeline first
(breaking ties by the timestamp of the last finished request,
and then by pid as the ultimate tie-breaker).

Note: this also drastically changes the internal
representation of the connection in ets and depends on a
specific order of operations when changing key values
to limit the risk of race conditions between the load
balancer and a given connection.

Also removed connection reporting of start of request
as this was no longer necessary since the load balancer
tees up the entry into ets with a 1.
diff --git a/src/ibrowse_http_client.erl b/src/ibrowse_http_client.erl
index 1bb95d2..c9161b0 100644
--- a/src/ibrowse_http_client.erl
+++ b/src/ibrowse_http_client.erl
@@ -762,11 +762,10 @@
                 {ok, _Sent_body} ->
                     trace_request_body(Body_1),
                     _ = active_once(State_1),
-                    State_1_1 = inc_pipeline_counter(State_1),
-                    State_2 = State_1_1#state{status     = get_header,
-                                              cur_req    = NewReq,
-                                              proxy_tunnel_setup = in_progress,
-                                              tunnel_setup_queue = [{From, Url, Headers, Method, Body, Options, Timeout}]},
+                    State_2 = State_1#state{status     = get_header,
+                                            cur_req    = NewReq,
+                                            proxy_tunnel_setup = in_progress,
+                                            tunnel_setup_queue = [{From, Url, Headers, Method, Body, Options, Timeout}]},
                     State_3 = set_inac_timer(State_2),
                     {noreply, State_3};
                 Err ->
@@ -853,15 +852,14 @@
                     Raw_req = list_to_binary([Req, Sent_body]),
                     NewReq_1 = NewReq#request{raw_req = Raw_req},
                     State_1 = State#state{reqs=queue:in(NewReq_1, State#state.reqs)},
-                    State_2 = inc_pipeline_counter(State_1),
-                    _ = active_once(State_2),
-                    State_3 = case Status of
+                    _ = active_once(State_1),
+                    State_2 = case Status of
                                   idle ->
-                                      State_2#state{
+                                      State_1#state{
                                         status     = get_header,
                                         cur_req    = NewReq_1};
                                   _ ->
-                                      State_2
+                                      State_1
                               end,
                     case StreamTo of
                         undefined ->
@@ -875,8 +873,8 @@
                                     catch StreamTo ! {ibrowse_async_raw_req, Raw_req}
                             end
                     end,
-                    State_4 = set_inac_timer(State_3),
-                    {noreply, State_4};
+                    State_3 = set_inac_timer(State_2),
+                    {noreply, State_3};
                 Err ->
                     shutting_down(State),
                     do_trace("Send failed... Reason: ~p~n", [Err]),
@@ -1815,13 +1813,13 @@
 do_reply(State, From, undefined, _, Resp_format, {ok, St_code, Headers, Body}) ->
     Msg_1 = {ok, St_code, Headers, format_response_data(Resp_format, Body)},
     gen_server:reply(From, Msg_1),
-    dec_pipeline_counter(State);
+    report_request_complete(State);
 do_reply(State, From, undefined, _, _, Msg) ->
     gen_server:reply(From, Msg),
-    dec_pipeline_counter(State);
+    report_request_complete(State);
 do_reply(#state{prev_req_id = Prev_req_id} = State,
          _From, StreamTo, ReqId, Resp_format, {ok, _, _, Body}) ->
-    State_1 = dec_pipeline_counter(State),
+    State_1 = report_request_complete(State),
     case Body of
         [] ->
             ok;
@@ -1843,7 +1841,7 @@
     ets:delete(?STREAM_TABLE, {req_id_pid, Prev_req_id}),
     State_1#state{prev_req_id = ReqId};
 do_reply(State, _From, StreamTo, ReqId, Resp_format, Msg) ->
-    State_1 = dec_pipeline_counter(State),
+    State_1 = report_request_complete(State),
     Msg_1 = format_response_data(Resp_format, Msg),
     catch StreamTo ! {ibrowse_async_response, ReqId, Msg_1},
     State_1.
@@ -1946,19 +1944,11 @@
 shutting_down(#state{lb_ets_tid = Tid}) ->
     ibrowse_lb:report_connection_down(Tid).
 
-inc_pipeline_counter(#state{is_closing = true} = State) ->
+report_request_complete(#state{is_closing = true} = State) ->
     State;
-inc_pipeline_counter(#state{lb_ets_tid = undefined} = State) ->
+report_request_complete(#state{lb_ets_tid = undefined} = State) ->
     State;
-inc_pipeline_counter(#state{lb_ets_tid = Tid} = State) ->
-    ibrowse_lb:report_request_underway(Tid),
-    State.
-
-dec_pipeline_counter(#state{is_closing = true} = State) ->
-    State;
-dec_pipeline_counter(#state{lb_ets_tid = undefined} = State) ->
-    State;
-dec_pipeline_counter(#state{lb_ets_tid = Tid} = State) ->
+report_request_complete(#state{lb_ets_tid = Tid} = State) ->
     ibrowse_lb:report_request_complete(Tid),
     State.
 
diff --git a/src/ibrowse_lb.erl b/src/ibrowse_lb.erl
index cc067fc..3d487d4 100644
--- a/src/ibrowse_lb.erl
+++ b/src/ibrowse_lb.erl
@@ -16,7 +16,6 @@
 	 spawn_connection/6,
      stop/1,
      report_connection_down/1,
-     report_request_underway/1,
      report_request_complete/1
 	]).
 
@@ -39,6 +38,9 @@
                 proc_state}).
 
 -define(PIPELINE_MAX, 99999).
+-define(KEY_MATCHSPEC_BY_PID(Pid), [{{{'_', '_', Pid}, '_'}, [], ['$_']}]).
+-define(KEY_MATCHSPEC(Key), [{{Key, '_'}, [], ['$_']}]).
+-define(KEY_MATCHSPEC_FOR_DELETE(Key), [{{Key, '_'}, [], [true]}]).
 
 -include("ibrowse.hrl").
 
@@ -74,13 +76,23 @@
     end.
 
 report_connection_down(Tid) ->
-    catch ets:delete(Tid, self()).
-
-report_request_underway(Tid) ->
-    catch ets:update_counter(Tid, self(), {2, 1, ?PIPELINE_MAX, ?PIPELINE_MAX}).
+    %% Don't cascade errors since Tid is really managed by another process
+    catch ets:select_delete(Tid, ?KEY_MATCHSPEC_BY_PID(self())).
 
 report_request_complete(Tid) ->
-    catch ets:update_counter(Tid, self(), {2, -1, 0, 0}).
+    %% Don't cascade errors since Tid is really managed by another process
+    catch case ets:select(Tid, ?KEY_MATCHSPEC_BY_PID(self())) of
+        [MatchKey] ->
+            case ets:select_delete(Tid, ?KEY_MATCHSPEC_FOR_DELETE(MatchKey)) of
+                1 ->
+                    ets:insert(Tid, {decremented(MatchKey), undefined}),
+                    true;
+                _ ->
+                    false
+            end;
+        _ ->
+            false
+    end.
 
 %%====================================================================
 %% Server functions
@@ -210,23 +222,17 @@
 %%--------------------------------------------------------------------
 %%% Internal functions
 %%--------------------------------------------------------------------
-find_best_connection(Tid, Max_pipe) ->
-    find_best_connection(ets:first(Tid), Tid, Max_pipe).
-
-find_best_connection('$end_of_table', _, _) ->
-    {error, retry_later};
-find_best_connection(Pid, Tid, Max_pipe) ->
-    case ets:lookup(Tid, Pid) of
-        [{Pid, Cur_sz}] when Cur_sz < Max_pipe ->
-            case record_request_for_connection(Tid, Pid) of
-                {'EXIT', _} ->
-                    %% The selected process has shutdown
-                    find_best_connection(ets:next(Tid, Pid), Tid, Max_pipe);
-                _ ->
-                    {ok, Pid}
+find_best_connection(Tid, Max_pipeline_size) ->
+    case ets:first(Tid) of
+        {Size, _Timestamp, Pid} = Key when Size < Max_pipeline_size ->
+            case record_request_for_connection(Tid, Key) of
+                true ->
+                    {ok, Pid};
+                false ->
+                    find_best_connection(Tid, Max_pipeline_size)
             end;
-         _ ->
-            find_best_connection(ets:next(Tid, Pid), Tid, Max_pipe)
+        _ -> 
+            {error, retry_later}
     end.
 
 maybe_create_ets(#state{ets_tid = undefined} = State) ->
@@ -240,10 +246,25 @@
     catch ets:info(Tid, size).
 
 record_new_connection(Tid, Pid) ->
-    catch ets:insert(Tid, {Pid, 0}).
+    catch ets:insert(Tid, {new_key(Pid), undefined}).
 
-record_request_for_connection(Tid, Pid) ->
-    catch ets:update_counter(Tid, Pid, {2, 1, ?PIPELINE_MAX, ?PIPELINE_MAX}).
+record_request_for_connection(Tid, Key) ->
+    case ets:select_delete(Tid, ?KEY_MATCHSPEC_FOR_DELETE(Key)) of
+        1 ->
+            ets:insert(Tid, {incremented(Key), undefined}),
+            true;
+        _ ->
+            false
+    end.
+
+new_key(Pid) ->
+    {1, os:timestamp(), Pid}.
+
+incremented({Size, Timestamp, Pid}) ->
+    {Size + 1, Timestamp, Pid}.
+
+decremented({Size, _Timestamp, Pid}) ->
+    {Size - 1, os:timestamp(), Pid}.
 
 for_each_connection_pid(Tid, Fun) ->
     catch ets:foldl(fun({Pid, _}, _) -> Fun(Pid) end, undefined, Tid),
diff --git a/test/ibrowse_functional_tests.erl b/test/ibrowse_functional_tests.erl
index 0a68e14..fc7afec 100644
--- a/test/ibrowse_functional_tests.erl
+++ b/test/ibrowse_functional_tests.erl
@@ -56,15 +56,10 @@
 
     timer:sleep(1000),
 
-    Diffs = [Count - BalancedNumberOfRequestsPerConnection || {_Pid, Count} <- ibrowse_test_server:get_conn_pipeline_depth()],
-    ?assertEqual(MaxSessions, length(Diffs)),
+    Counts = [Count || {_Pid, Count} <- ibrowse_test_server:get_conn_pipeline_depth()],
+    ?assertEqual(MaxSessions, length(Counts)),
 
-    lists:foreach(fun(X) -> ?assertEqual(yep, close_to_zero(X)) end, Diffs).
-
-close_to_zero(0) -> yep;
-close_to_zero(-1) -> yep;
-close_to_zero(1) -> yep;
-close_to_zero(X) -> {nope, X}.
+    ?assertEqual(lists:duplicate(MaxSessions, BalancedNumberOfRequestsPerConnection), Counts).
 
 times(0, _) ->
     ok;