Shed load in ioq_server
This avoids processing any request, found either in our message box or
in our internal queue, that has already timed out and whose client has
exited.
diff --git a/src/ioq_benchmark.erl b/src/ioq_benchmark.erl
index 4e2ea2d..8ef63e9 100644
--- a/src/ioq_benchmark.erl
+++ b/src/ioq_benchmark.erl
@@ -77,7 +77,8 @@
ets:new(?WORKERS, [public, set, named_table]),
create_files(Opts),
%run_static(Opts),
- run_dynamic(Opts).
+ %run_dynamic(Opts).
+ run_static(Opts).
run_static(Opts) ->
@@ -328,7 +329,11 @@
view ->
case rand:uniform() < 0.6 of
true ->
- {pread_iolist, rand:uniform()};
+ Pos = case rand:uniform() of
+ V when V < 0.05 -> 0.0;
+ V -> V
+ end,
+ {pread_iolist, Pos};
false ->
{append_binary, rand:uniform()}
end
diff --git a/src/ioq_server.erl b/src/ioq_server.erl
index 1c8a2c6..4bc6eae 100644
--- a/src/ioq_server.erl
+++ b/src/ioq_server.erl
@@ -319,15 +319,21 @@
append(A, B) ->
[A, B].
-enqueue_channel(#request{channel=Account} = Req, #state{channels=Q} = State) ->
+% Queue Req on its channel; requests that should not be processed are dropped.
+enqueue_channel(#request{channel = Account} = Req, #state{channels = Q} = State) ->
     DD = State#state.dedupe,
-    % ioq_q's are update-in-place
-    case find_channel(Account, State) of
-        {new, Channel} ->
-            update_channel(Channel, Req, DD),
-            ioq_q:in(Channel, Q);
-        Channel ->
-            update_channel(Channel, Req, DD)
+    case should_process_request(Req) of
+        true ->
+            % ioq_q's are update-in-place, so State needs no rebinding
+            case find_channel(Account, State) of
+                {new, Channel} ->
+                    update_channel(Channel, Req, DD),
+                    ioq_q:in(Channel, Q);
+                Channel ->
+                    update_channel(Channel, Req, DD)
+            end;
+        false ->
+            ok
     end,
     State.
@@ -391,8 +397,26 @@
false ->
NewCh2 = ioq_q:in(Channel#channel{qI=QI2, qU=QU2, qV=QV2}, NewCh)
end,
- submit_request(Item2, State#state{channels=NewCh2, qC=NewQC,
- qR=NewQR, qL=NewQL});
+ NewState = State#state{
+ channels = NewCh2,
+ qC = NewQC,
+ qR = NewQR,
+ qL = NewQL
+ },
+ ShouldProcess = should_process_request(Item2),
+ case ShouldProcess of
+ true ->
+ submit_request(Item2, NewState);
+ false ->
+ % We're recursing here because of the logic
+ % in maybe_submit_request where returning
+ % NewState may actually end up being equal
+ % to the old state due to ioq_q instances
+ % now being references. This would be a bug
+ % that prevented us from using all of the
+ % available concurrent request slots.
+ make_next_request(NewState)
+ end;
_ ->
% Item is a background (compaction or internal replication) task
submit_request(Item, State#state{channels=NewCh, qC=NewQC, qR=NewQR,
@@ -423,6 +447,35 @@
update_counter(Counters, Channel, IOClass, RW),
State#state{reqs = [Request#request{tsub=SubmitTime, ref=Ref} | Reqs]}.
+
+% Decide whether a queued request is still worth submitting. An interactive
+% request is shed only when it has waited longer than ?INITIAL_TIMEOUT and
+% none of its client processes is still alive. All other classes are always
+% processed.
+should_process_request(#request{class = interactive} = Request) ->
+    #request{
+        from = Froms,
+        t0 = T0
+    } = Request,
+
+    % Age of the request in milliseconds, measured against the monotonic clock.
+    RawDiff = erlang:monotonic_time() - T0,
+    MilliDiff = erlang:convert_time_unit(RawDiff, native, millisecond),
+    TimedOut = MilliDiff > ?INITIAL_TIMEOUT,
+
+    % The liveness check only runs once the request has timed out.
+    not TimedOut orelse any_client_alive(Froms);
+should_process_request(_Request) ->
+    true.
+
+% True when at least one client of a request is alive. The from field is
+% either a single {Pid, Tag} or a list of them.
+any_client_alive({ClientPid, _Tag}) ->
+    erlang:is_process_alive(ClientPid);
+any_client_alive(Froms) when is_list(Froms) ->
+    ClientPids = [ClientPid || {ClientPid, _Tag} <- Froms],
+    lists:any(fun erlang:is_process_alive/1, ClientPids).
+
update_counter(Tab, Channel, IOClass, RW) ->
upsert(Tab, {Channel, IOClass, RW}, 1).