% Source blob: 4c3407a3b31ba1835de1a8a25e75da6c8bee002a
% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
% http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.
-module(fabric_streams).
-export([
start/2,
start/3,
start/4,
start/5,
cleanup/1,
enable_watchdog/0,
kick_watchdog/0
]).
-include_lib("fabric/include/fabric.hrl").
-include_lib("mem3/include/mem3.hrl").
% Process dictionary key under which the coordinator stores the pid of its
% watchdog process.
-define(WORKER_WATCHDOG, fabric_worker_watchdog).
% Tag of the message that registers an extra worker with the watchdog.
-define(ADD_WORKER, add_worker).
% Process dictionary key set by enable_watchdog/0.
-define(WATCHDOG_ENABLE, watchdog_enable).
% Message the watchdog sends to itself (via a timer) at the end of each
% interval.
-define(WATCHDOG_TIMEOUT, watchdog_timeout).
% Message sent to the watchdog by kick_watchdog/0.
-define(WATCHDOG_KICK, watchdog_kick).
% Process dictionary key holding the monotonic time of the last kick sent.
-define(WATCHDOG_LAST_KICK, watchdog_last_kick).
% Guard fragments. Both expand to an expression on a variable named `St`, so
% they can only be used where `St` is bound to a #watchdog_state{} record.
-define(WATCHDOG_WAS_KICKED, St#watchdog_state.kicked).
-define(WATCHDOG_IS_ENABLED, St#watchdog_state.timer /= undefined).
% State carried through watchdog_loop/1.
%   coordinator - pid of the process the watchdog monitors (and may kill)
%   workers     - workers handed to fabric_util:cleanup/1 when the
%                 coordinator goes down
%   timer       - timer reference from erlang:send_after/3, or `undefined`
%                 while the watchdog is disabled
%   kicked      - whether a kick was received since the timer was last armed
-record(watchdog_state, {
coordinator,
workers,
timer,
kicked = false
}).
% Start a stream without a replacement start fun or replacement workers.
% Delegates to start/4 with both set to `undefined`.
start(Workers, Keypos) ->
    StartFun = undefined,
    Replacements = undefined,
    start(Workers, Keypos, StartFun, Replacements).
% Start a stream with caller-supplied ring options and neither a replacement
% start fun nor replacement workers (both `undefined`).
start(Workers, Keypos, RingOpts) ->
    NoStartFun = undefined,
    NoReplacements = undefined,
    start(Workers, Keypos, NoStartFun, NoReplacements, RingOpts).
% Start a stream with replacement handling and empty ring options.
start(Workers, Keypos, StartFun, Replacements) ->
    DefaultRingOpts = [],
    start(Workers, Keypos, StartFun, Replacements, DefaultRingOpts).
% Collect stream-start responses from Workers0 and, once the receive loop
% yields a #stream_acc{}, acknowledge every ready worker.
%
% A watchdog is spawned for the calling process before any response is
% awaited. Responses are dispatched to handle_stream_start/3 through
% rexi_utils:recv/6, using fabric_util:request_timeout() as the overall
% timeout and `infinity` as the final argument.
%
% Returns {ok, AckedWorkers} when the receive loop returns a #stream_acc{};
% any other result of rexi_utils:recv/6 is passed through unchanged.
start(Workers0, Keypos, StartFun, Replacements, RingOpts) ->
    InitAcc = #stream_acc{
        workers = fabric_dict:init(Workers0, waiting),
        ready = [],
        start_fun = StartFun,
        replacements = Replacements,
        ring_opts = RingOpts
    },
    spawn_worker_watchdog(self(), Workers0),
    Timeout = fabric_util:request_timeout(),
    Handler = fun handle_stream_start/3,
    Result = rexi_utils:recv(Workers0, Keypos, Handler, InitAcc, Timeout, infinity),
    case Result of
        {ok, #stream_acc{ready = Ready}} ->
            % Workers were deliberately left un-acked while the ring was
            % being assembled; ack them now and collect them.
            Ack = fun(Worker, From, Acked) ->
                rexi:stream_start(From),
                [Worker | Acked]
            end,
            {ok, fabric_dict:fold(Ack, [], Ready)};
        Other ->
            Other
    end.
% Regular cleanup path: stop the watchdog (if this process has one) and then
% clean up Workers via fabric_util:cleanup/1.
cleanup(Workers) ->
    % The watchdog is killed first because cleanup is happening in the
    % regular fashion; letting it run as well would send the kill messages
    % twice.
    Watchdog = get(?WORKER_WATCHDOG),
    case is_pid(Watchdog) of
        true ->
            erase(?WORKER_WATCHDOG),
            exit(Watchdog, kill);
        false ->
            ok
    end,
    fabric_util:cleanup(Workers).
% Callback passed to rexi_utils:recv/6 by start/5. Handles one message from
% a worker while the stream is being started.
%
% Arguments: the message, the worker it relates to (for rexi_STREAM_INIT a
% {Worker, From} pair), and the current #stream_acc{}.
%
% Returns one of:
%   {ok, St}               - keep waiting for more responses
%   {new_refs, Refs, St}   - replacement workers were started; Refs are the
%                            keys of the updated workers dict
%   {stop, St}             - a full ring of ready workers is available
%   {stop, Error}          - a worker reported ddoc_updated or
%                            insufficient_storage
%   {error, Reason}        - progress is no longer possible
% Any unrecognised message makes the calling process exit with
% {invalid_stream_start, Msg}.
handle_stream_start({rexi_DOWN, _, {_, NodeRef}, _}, _, St) ->
    #stream_acc{workers = Workers, ready = Ready, ring_opts = RingOpts} = St,
    case fabric_ring:node_down(NodeRef, Workers, Ready, RingOpts) of
        {ok, Workers1} ->
            {ok, St#stream_acc{workers = Workers1}};
        error ->
            {error, {nodedown, <<"progress not possible">>}}
    end;
handle_stream_start({rexi_EXIT, Reason}, Worker, St) ->
    #stream_acc{
        workers = Workers,
        ready = Ready,
        replacements = Replacements,
        ring_opts = RingOpts
    } = St,
    case {fabric_ring:handle_error(Worker, Workers, Ready, RingOpts), Reason} of
        {{ok, Workers1}, _Reason} ->
            {ok, St#stream_acc{workers = Workers1}};
        {error, {maintenance_mode, _Node}} when Replacements /= undefined ->
            % Check if we have replacements for this range
            % and start the new workers if so.
            case lists:keytake(Worker#shard.range, 1, Replacements) of
                {value, {_Range, WorkerReplacements}, NewReplacements} ->
                    FinalWorkers = lists:foldl(
                        fun(Repl, NewWorkers) ->
                            NewWorker = (St#stream_acc.start_fun)(Repl),
                            add_worker_to_watchdog(self(), NewWorker),
                            fabric_dict:store(NewWorker, waiting, NewWorkers)
                        end,
                        Workers,
                        WorkerReplacements
                    ),
                    % Assert that our replaced worker provides us
                    % the opportunity to make progress. Need to make sure
                    % to include already processed responses, since we are
                    % checking the full range and some workers have already
                    % responded and were removed from the workers list
                    ReadyWorkers = [{W, R} || {_, W, R} <- Ready],
                    AllWorkers = FinalWorkers ++ ReadyWorkers,
                    true = fabric_ring:is_progress_possible(AllWorkers),
                    NewRefs = fabric_dict:fetch_keys(FinalWorkers),
                    {new_refs, NewRefs, St#stream_acc{
                        workers = FinalWorkers,
                        replacements = NewReplacements
                    }};
                false ->
                    % If progress isn't possible and we don't have any
                    % replacements then we're dead in the water.
                    {error, {nodedown, <<"progress not possible">>}}
            end;
        {error, _} ->
            {error, fabric_util:error_info(Reason)}
    end;
handle_stream_start(rexi_STREAM_INIT, {Worker, From}, St) ->
    #stream_acc{workers = Workers, ready = Ready, ring_opts = RingOpts} = St,
    case fabric_dict:lookup_element(Worker, Workers) of
        undefined ->
            % This worker lost the race with other partition copies, terminate
            rexi:stream_cancel(From),
            {ok, St};
        waiting ->
            case fabric_ring:handle_response(Worker, From, Workers, Ready, RingOpts) of
                {ok, {Workers1, Ready1}} ->
                    % Don't have a full ring yet. Keep getting responses
                    {ok, St#stream_acc{workers = Workers1, ready = Ready1}};
                {stop, Ready1} ->
                    % Have a full ring of workers. But don't ack the worker
                    % yet so they don't start sending us rows until we're ready
                    {stop, St#stream_acc{workers = [], ready = Ready1}}
            end
    end;
handle_stream_start({ok, Error}, _, St) when Error == ddoc_updated; Error == insufficient_storage ->
    #stream_acc{workers = Waiting, ready = Ready} = St,
    WaitingWorkers = [W || {W, _} <- Waiting],
    % While the ring is still being assembled the ready list holds
    % {Range, Worker, From} triples (see the replacement handling above), so
    % matching only {Worker, From} pairs would silently skip every ready
    % worker and leave it running. Accept both shapes; a list only ever
    % contains one of them.
    ReadyWorkers = [W || {_, W, _} <- Ready] ++ [W || {W, _} <- Ready],
    cleanup(WaitingWorkers ++ ReadyWorkers),
    {stop, Error};
handle_stream_start(Else, _, _) ->
    exit({invalid_stream_start, Else}).
% Spawn an auxiliary rexi worker watchdog which triggers cleanup if:
%  * nothing has been streamed for the configured timeout duration, or
%  * the coordinator (request) process is forcibly killed and doesn't get a
%    chance to run its own `after` cleanup clause.
%
% The watchdog is initially disabled. Clients enable it by calling
% enable_watchdog/0 before starting a fabric_stream, and are then responsible
% for calling kick_watchdog/0 on activity to prevent the watchdog from acting.
% If kick_watchdog/0 is not called at least once in each watchdog interval,
% the stream coordinator is killed and the workers are cleaned up.
%
% At most one watchdog is kept per calling process: its pid is remembered in
% the process dictionary and returned as-is on subsequent calls.
spawn_worker_watchdog(Coordinator, Workers) ->
    case get(?WORKER_WATCHDOG) of
        undefined ->
            % Read the enable flag here, in the caller, because the process
            % dictionary is not visible from inside the spawned process.
            Enabled = get(?WATCHDOG_ENABLE),
            Initial = #watchdog_state{
                coordinator = Coordinator,
                workers = Workers
            },
            Body = fun() ->
                erlang:monitor(process, Coordinator),
                Armed =
                    case Enabled of
                        true ->
                            reset_watchdog(Initial);
                        undefined ->
                            Initial
                    end,
                watchdog_loop(Armed)
            end,
            Watchdog = spawn(Body),
            put(?WORKER_WATCHDOG, Watchdog),
            Watchdog;
        Existing ->
            Existing
    end.
% Main loop of the watchdog process.
%
% Handles:
%  * {add_worker, Coordinator, Worker} - track an extra worker for cleanup
%  * watchdog_kick    - record activity (ignored while disabled)
%  * watchdog_timeout - end of an interval: if no kick was recorded the
%                       coordinator is sent an exit signal with reason
%                       {shutdown, watchdog_fired}; otherwise the timer is
%                       re-armed
%  * 'DOWN' for the coordinator - clean up all tracked workers and finish
%
% The loop only terminates via the 'DOWN' clause, returning the result of
% fabric_util:cleanup/1.
%
% Note: the guard macros below expand to expressions on `St`, so the state
% variable must keep that name.
watchdog_loop(#watchdog_state{} = St) ->
    receive
        {?ADD_WORKER, Pid, Worker} when Pid == St#watchdog_state.coordinator ->
            Workers = St#watchdog_state.workers,
            watchdog_loop(St#watchdog_state{workers = [Worker | Workers]});
        ?WATCHDOG_KICK when ?WATCHDOG_IS_ENABLED ->
            watchdog_loop(St#watchdog_state{kicked = true});
        ?WATCHDOG_KICK ->
            watchdog_loop(St);
        ?WATCHDOG_TIMEOUT when ?WATCHDOG_IS_ENABLED, not ?WATCHDOG_WAS_KICKED ->
            exit(St#watchdog_state.coordinator, {shutdown, watchdog_fired}),
            % Keep looping so the workers are cleaned up once the
            % coordinator's 'DOWN' message arrives.
            watchdog_loop(St);
        ?WATCHDOG_TIMEOUT when ?WATCHDOG_IS_ENABLED ->
            watchdog_loop(reset_watchdog(St));
        ?WATCHDOG_TIMEOUT ->
            % Stray timeout while disabled: ignore it, as a stray kick is
            % ignored. Returning here would end the watchdog process and
            % drop the guarantee that workers are cleaned up when the
            % coordinator dies.
            watchdog_loop(St);
        {'DOWN', _, _, Pid, _} when Pid == St#watchdog_state.coordinator ->
            fabric_util:cleanup(St#watchdog_state.workers)
    end.
% Ask the calling process's watchdog (if any) to also track Worker. The
% watchdog only accepts the request when CoordinatorPid is the coordinator
% it monitors.
add_worker_to_watchdog(CoordinatorPid, Worker) ->
    Request = {?ADD_WORKER, CoordinatorPid, Worker},
    send_to_watchdog(Request).
% Mark the calling process so that the next watchdog it spawns starts with
% its idle timer armed. Must be called before the stream is started, because
% the flag is only read when the watchdog is spawned. Returns whatever put/2
% returns (the previous value of the flag).
enable_watchdog() ->
    Enabled = true,
    put(?WATCHDOG_ENABLE, Enabled).
% Arm (or re-arm) the watchdog timer for one more interval and clear the
% `kicked` flag. Must run inside the watchdog process, since the timeout
% message is addressed to self(). When the configured timeout is `infinity`
% the state is returned untouched and no timer is started.
reset_watchdog(#watchdog_state{} = St) ->
    Interval = watchdog_timeout(),
    if
        Interval =:= infinity ->
            St;
        true ->
            Timer = erlang:send_after(Interval, self(), ?WATCHDOG_TIMEOUT),
            St#watchdog_state{timer = Timer, kicked = false}
    end.
% Signal activity to the calling process's watchdog. Kicks are throttled by
% should_kick/0; when one is actually sent, the time of the kick is recorded
% in the process dictionary. Returns `ok` when the kick was skipped.
kick_watchdog() ->
    ShouldKick = should_kick(),
    case ShouldKick of
        false ->
            ok;
        true ->
            send_to_watchdog(?WATCHDOG_KICK),
            update_last_kick()
    end.
% Decide whether kick_watchdog/0 should actually send a kick message.
%
% Returns `false` when the watchdog timeout is `infinity` (nothing to kick),
% `true` when this process has never kicked, and otherwise `true` only when
% enough time has passed since the last kick.
%
% The throttle interval is a quarter of the watchdog interval, not the whole
% interval. The watchdog needs at least one kick inside each of its timer
% windows, and those windows are not aligned with the kicks. If kicks were
% only sent more than one full interval apart, two consecutive kicks could
% straddle an entire window and an active stream would be killed.
should_kick() ->
    case watchdog_timeout() of
        infinity ->
            false;
        Timeout when is_integer(Timeout) ->
            case get(?WATCHDOG_LAST_KICK) of
                undefined ->
                    true;
                LastKick when is_integer(LastKick) ->
                    Now = erlang:monotonic_time(),
                    Delta = erlang:convert_time_unit(
                        Now - LastKick, native, millisecond
                    ),
                    MinGap = Timeout div 4,
                    Delta >= MinGap
            end
    end.
% Remember, in the process dictionary, the monotonic time (native units) at
% which the most recent kick was sent.
update_last_kick() ->
    Now = erlang:monotonic_time(),
    put(?WATCHDOG_LAST_KICK, Now).
% Deliver Msg to the watchdog registered in the calling process's dictionary.
% Returns Msg when it was sent, or `ok` when no watchdog pid is registered.
send_to_watchdog(Msg) ->
    Watchdog = get(?WORKER_WATCHDOG),
    case is_pid(Watchdog) of
        true ->
            Watchdog ! Msg;
        false ->
            ok
    end.
% Watchdog interval, obtained from fabric_util:timeout/2 for the
% "idle_stream" setting with "60000" as the default. Callers in this module
% treat the result as either `infinity` or an integer number of milliseconds.
watchdog_timeout() ->
    Setting = "idle_stream",
    Default = "60000",
    fabric_util:timeout(Setting, Default).
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
% EUnit generator: runs the watchdog tests inside a single setup/teardown
% fixture that mocks rexi:kill_all/1.
worker_watchdog_test_() ->
    Tests = fun(_) ->
        [
            should_clean_workers(),
            does_not_fire_if_cleanup_called(),
            should_clean_additional_worker_too()
        ]
    end,
    Fixture = {setup, fun setup/0, fun teardown/1, Tests},
    {"Fabric spawn_worker_watchdog test", Fixture}.
% When the coordinator dies, the watchdog must clean up the workers, which
% shows up as exactly one rexi:kill_all/1 call.
should_clean_workers() ->
    ?_test(begin
        meck:reset(rexi),
        erase(?WORKER_WATCHDOG),
        Shards = [#shard{node = Node, ref = make_ref()} || Node <- ['n1', 'n2']],
        {Coordinator, _} = spawn_monitor(fun() ->
            receive
                die -> ok
            end
        end),
        WatchdogPid = spawn_worker_watchdog(Coordinator, Shards),
        MonRef = erlang:monitor(process, WatchdogPid),
        Coordinator ! die,
        % Wait for the watchdog to finish its cleanup and exit.
        receive
            {'DOWN', MonRef, _, WatchdogPid, _} -> ok
        end,
        ?assertEqual(1, meck:num_calls(rexi, kill_all, 1))
    end).
% Once cleanup/1 has run, the watchdog must already be gone, so the
% coordinator's death must not trigger a second cleanup.
does_not_fire_if_cleanup_called() ->
    ?_test(begin
        meck:reset(rexi),
        erase(?WORKER_WATCHDOG),
        Shards = [#shard{node = Node, ref = make_ref()} || Node <- ['n1', 'n2']],
        {Coordinator, _} = spawn_monitor(fun() ->
            receive
                die -> ok
            end
        end),
        WatchdogPid = spawn_worker_watchdog(Coordinator, Shards),
        MonRef = erlang:monitor(process, WatchdogPid),
        cleanup(Shards),
        Coordinator ! die,
        receive
            {'DOWN', MonRef, _, _, _} -> ok
        end,
        % The single kill_all call comes from cleanup/1. Had the watchdog
        % fired as well, there would have been 2 calls in total.
        ?assertEqual(1, meck:num_calls(rexi, kill_all, 1))
    end).
% A worker registered after the watchdog was spawned is handled by the same
% single cleanup (one rexi:kill_all/1 call) when the coordinator dies.
should_clean_additional_worker_too() ->
    ?_test(begin
        meck:reset(rexi),
        erase(?WORKER_WATCHDOG),
        Initial = [#shard{node = 'n1', ref = make_ref()}],
        {Coordinator, _} = spawn_monitor(fun() ->
            receive
                die -> ok
            end
        end),
        WatchdogPid = spawn_worker_watchdog(Coordinator, Initial),
        Extra = #shard{node = 'n2', ref = make_ref()},
        add_worker_to_watchdog(Coordinator, Extra),
        MonRef = erlang:monitor(process, WatchdogPid),
        Coordinator ! die,
        receive
            {'DOWN', MonRef, _, WatchdogPid, _} -> ok
        end,
        ?assertEqual(1, meck:num_calls(rexi, kill_all, 1))
    end).
% Fixture setup: stub rexi:kill_all/1 so that cleanup calls can be counted
% without touching real workers.
setup() ->
    KillAllStub = fun(_) -> ok end,
    ok = meck:expect(rexi, kill_all, KillAllStub).
% Fixture teardown: unload every module mocked by meck.
teardown(_SetupResult) ->
    meck:unload().
-endif.