Shed load in ioq_server

This avoids processing any request that we find either in our message
box or in our internal queuing system that has already timedout and had
the client exit.
diff --git a/src/ioq_benchmark.erl b/src/ioq_benchmark.erl
index 4e2ea2d..8ef63e9 100644
--- a/src/ioq_benchmark.erl
+++ b/src/ioq_benchmark.erl
@@ -77,7 +77,8 @@
     ets:new(?WORKERS, [public, set, named_table]),
     create_files(Opts),
     %run_static(Opts),
-    run_dynamic(Opts).
+    %run_dynamic(Opts).
+    run_static(Opts).
 
 
 run_static(Opts) ->
@@ -328,7 +329,11 @@
         view ->
             case rand:uniform() < 0.6 of
                 true ->
-                    {pread_iolist, rand:uniform()};
+                    Pos = case rand:uniform() of
+                        V when V < 0.05 -> 0.0;
+                        V -> V
+                    end,
+                    {pread_iolist, Pos};
                 false ->
                     {append_binary, rand:uniform()}
             end
diff --git a/src/ioq_server.erl b/src/ioq_server.erl
index 1c8a2c6..4bc6eae 100644
--- a/src/ioq_server.erl
+++ b/src/ioq_server.erl
@@ -319,15 +319,21 @@
 append(A, B) ->
     [A, B].
 
-enqueue_channel(#request{channel=Account} = Req, #state{channels=Q} = State) ->
+enqueue_channel(#request{name = Account} = Req, #state{channels = Q} = State) ->
     DD = State#state.dedupe,
-    % ioq_q's are update-in-place
-    case find_channel(Account, State) of
-        {new, Channel} ->
-            update_channel(Channel, Req, DD),
-            ioq_q:in(Channel, Q);
-        Channel ->
-            update_channel(Channel, Req, DD)
+    ShouldProcess = should_process_request(Req),
+    case should_process_request(Req) of
+        true ->
+            % ioq_q's are update-in-place
+            case find_channel(Account, State) of
+                {new, Channel} ->
+                    update_channel(Channel, Req, DD),
+                    ioq_q:in(Channel, Q);
+                Channel ->
+                    update_channel(Channel, Req, DD)
+            end;
+        false ->
+            ok
     end,
     State.
 
@@ -391,8 +397,26 @@
         false ->
             NewCh2 = ioq_q:in(Channel#channel{qI=QI2, qU=QU2, qV=QV2}, NewCh)
         end,
-        submit_request(Item2, State#state{channels=NewCh2, qC=NewQC,
-            qR=NewQR, qL=NewQL});
+        NewState = State#state{
+            channels = NewCh2,
+            qC = NewQC,
+            qR = NewQR,
+            qL = NewQL
+        },
+        ShouldProcess = should_process_request(Item2),
+        case ShouldProcess of
+            true ->
+                submit_request(Item2, NewState);
+            false ->
+                % We're recursing here because of the logic
+                % in maybe_submit_request where returning
+                % NewState may actually end up being equal
+                % to the old state due to ioq_q instances
+                % now being references. This would be a bug
+                % that prevented from using all of the
+                % available concurrent request slots.
+                make_next_request(NewState)
+        end;
     _ ->
         % Item is a background (compaction or internal replication) task
         submit_request(Item, State#state{channels=NewCh, qC=NewQC, qR=NewQR,
@@ -423,6 +447,35 @@
     update_counter(Counters, Channel, IOClass, RW),
     State#state{reqs = [Request#request{tsub=SubmitTime, ref=Ref} | Reqs]}.
 
+
+should_process_request(#request{class = interactive} = Request) ->
+    #request{
+        from = Froms,
+        t0 = T0
+    } = Req,
+
+    % Calculate our timeout
+    RawDiff = erlang:monotonic_time() - T0,
+    MilliDiff = erlang:convert_time_unit(Diff, native, millisecond),
+    TimedOut = MilliDiff > ?INITIAL_TIMEOUT,
+
+    if not TimedOut -> true; true ->
+        % Gather the list of pids
+        ClientPids = case Froms of
+            {ClientPid, _Tag} ->
+                [ClientPid];
+            Froms when is_list(Froms) ->
+                [ClientPid || {ClientPid | _From} <- Froms]
+        end,
+
+        AnyAlive = lists:any(fun erlang:is_process_alive/1, ClientPids),
+        if AnyAlive -> true; true ->
+            false
+        end
+    end;
+should_process_request(_Request) ->
+    true.
+
 update_counter(Tab, Channel, IOClass, RW) ->
     upsert(Tab, {Channel, IOClass, RW}, 1).