% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
% http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.
| |
| -module(fabric_streams). |
| |
| -export([ |
| start/2, |
| start/3, |
| start/4, |
| start/5, |
| cleanup/1 |
| ]). |
| |
| -include_lib("fabric/include/fabric.hrl"). |
| -include_lib("mem3/include/mem3.hrl"). |
| |
| -define(WORKER_CLEANER, fabric_worker_cleaner). |
| |
%% Convenience wrapper: stream with no custom start fun and no replacements.
start(Shards, KeyPos) ->
    start(Shards, KeyPos, undefined, undefined).
| |
%% Convenience wrapper: supply ring options but no start fun / replacements.
start(Shards, KeyPos, RingOpts) ->
    start(Shards, KeyPos, undefined, undefined, RingOpts).
| |
%% Convenience wrapper: supply a start fun and replacements with default
%% (empty) ring options.
start(Shards, KeyPos, StartFun, Replacements) ->
    start(Shards, KeyPos, StartFun, Replacements, []).
| |
%% Kick off a streaming request against Workers0 and block until a full
%% ring of workers has checked in via rexi_STREAM_INIT (handled by
%% handle_stream_start/3). On success each ready worker is acked with
%% rexi:stream_start/1 and {ok, AckedWorkers} is returned; any other
%% result from rexi_utils:recv/6 is passed through unchanged. Also spawns
%% the auxiliary worker cleaner so remote workers are killed if this
%% coordinator process dies abnormally.
start(Workers0, Keypos, StartFun, Replacements, RingOpts) ->
    Acc0 = #stream_acc{
        workers = fabric_dict:init(Workers0, waiting),
        ready = [],
        start_fun = StartFun,
        replacements = Replacements,
        ring_opts = RingOpts
    },
    spawn_worker_cleaner(self(), Workers0),
    Timeout = fabric_util:request_timeout(),
    Callback = fun handle_stream_start/3,
    case rexi_utils:recv(Workers0, Keypos, Callback, Acc0, Timeout, infinity) of
        {ok, #stream_acc{ready = Ready}} ->
            % Ack each ready worker so it starts streaming rows to us.
            Ack = fun(Shard, From, Acked) ->
                rexi:stream_start(From),
                [Shard | Acked]
            end,
            {ok, fabric_dict:fold(Ack, [], Ready)};
        Other ->
            Other
    end.
| |
%% Kill the given workers via the regular fabric_util:cleanup/1 path.
%% First stop the auxiliary cleaner process: since cleanup is now
%% happening in the normal fashion we don't want it to fire as well and
%% send a second, redundant round of kill messages.
cleanup(Workers) ->
    Cleaner = get(?WORKER_CLEANER),
    case is_pid(Cleaner) of
        true ->
            erase(?WORKER_CLEANER),
            exit(Cleaner, kill);
        false ->
            ok
    end,
    fabric_util:cleanup(Workers).
| |
%% rexi_utils:recv/6 callback used by start/5. Accumulates worker
%% responses in the #stream_acc{} until a full ring of workers has sent
%% rexi_STREAM_INIT, handling node-down and worker-exit events (possibly
%% by starting replacement workers) along the way.
handle_stream_start({rexi_DOWN, _, {_, NodeRef}, _}, _, St) ->
    % A remote node went down: drop its workers and stop if the remaining
    % workers can no longer cover the full ring.
    #stream_acc{workers = Workers, ready = Ready, ring_opts = RingOpts} = St,
    case fabric_ring:node_down(NodeRef, Workers, Ready, RingOpts) of
        {ok, Workers1} ->
            {ok, St#stream_acc{workers = Workers1}};
        error ->
            {error, {nodedown, <<"progress not possible">>}}
    end;
handle_stream_start({rexi_EXIT, Reason}, Worker, St) ->
    #stream_acc{
        workers = Workers,
        ready = Ready,
        replacements = Replacements,
        ring_opts = RingOpts
    } = St,
    % Match on {RingResult, Reason} so the maintenance_mode exit reason
    % can be handled specially when the ring would otherwise be broken.
    case {fabric_ring:handle_error(Worker, Workers, Ready, RingOpts), Reason} of
        {{ok, Workers1}, _Reason} ->
            % Ring coverage is still possible without this worker.
            {ok, St#stream_acc{workers = Workers1}};
        {error, {maintenance_mode, _Node}} when Replacements /= undefined ->
            % Check if we have replacements for this range
            % and start the new workers if so.
            case lists:keytake(Worker#shard.range, 1, Replacements) of
                {value, {_Range, WorkerReplacements}, NewReplacements} ->
                    FinalWorkers = lists:foldl(
                        fun(Repl, NewWorkers) ->
                            NewWorker = (St#stream_acc.start_fun)(Repl),
                            % Register the replacement with the cleaner so
                            % it is also killed if the coordinator dies.
                            add_worker_to_cleaner(self(), NewWorker),
                            fabric_dict:store(NewWorker, waiting, NewWorkers)
                        end,
                        Workers,
                        WorkerReplacements
                    ),
                    % Assert that our replaced worker provides us
                    % the opportunity to make progress. Need to make sure
                    % to include already processed responses, since we are
                    % checking the full range and some workers have already
                    % responded and were removed from the workers list
                    ReadyWorkers = [{W, R} || {_, W, R} <- Ready],
                    AllWorkers = FinalWorkers ++ ReadyWorkers,
                    true = fabric_ring:is_progress_possible(AllWorkers),
                    NewRefs = fabric_dict:fetch_keys(FinalWorkers),
                    {new_refs, NewRefs, St#stream_acc{
                        workers = FinalWorkers,
                        replacements = NewReplacements
                    }};
                false ->
                    % If progress isn't possible and we don't have any
                    % replacements then we're dead in the water.
                    {error, {nodedown, <<"progress not possible">>}}
            end;
        {error, _} ->
            {error, fabric_util:error_info(Reason)}
    end;
handle_stream_start(rexi_STREAM_INIT, {Worker, From}, St) ->
    #stream_acc{workers = Workers, ready = Ready, ring_opts = RingOpts} = St,
    case fabric_dict:lookup_element(Worker, Workers) of
        undefined ->
            % This worker lost the race with other partition copies, terminate
            rexi:stream_cancel(From),
            {ok, St};
        waiting ->
            case fabric_ring:handle_response(Worker, From, Workers, Ready, RingOpts) of
                {ok, {Workers1, Ready1}} ->
                    % Don't have a full ring yet. Keep getting responses
                    {ok, St#stream_acc{workers = Workers1, ready = Ready1}};
                {stop, Ready1} ->
                    % Have a full ring of workers. But don't ack the worker
                    % yet so they don't start sending us rows until we're ready
                    {stop, St#stream_acc{workers = [], ready = Ready1}}
            end
    end;
handle_stream_start({ok, ddoc_updated}, _, St) ->
    % Design doc changed mid-start: kill every known worker and surface
    % ddoc_updated to the caller.
    WaitingWorkers = [W || {W, _} <- St#stream_acc.workers],
    ReadyWorkers = [W || {W, _} <- St#stream_acc.ready],
    cleanup(WaitingWorkers ++ ReadyWorkers),
    {stop, ddoc_updated};
handle_stream_start(Else, _, _) ->
    % Any other message at this stage is a protocol violation; crash loudly.
    exit({invalid_stream_start, Else}).
| |
% Spawn an auxiliary rexi worker cleaner. This will be used in cases
% when the coordinator (request) process is forcibly killed and doesn't
% get a chance to process its `after` cleanup clause (see cleanup/1;
% the previously referenced `fabric:clean/1` name appears stale).
%% Spawn (at most once per coordinator process) a helper that monitors
%% the coordinator and, should it die, kills the remote workers on its
%% behalf. The helper pid is cached in the process dictionary under
%% ?WORKER_CLEANER; a subsequent call returns the cached pid.
spawn_worker_cleaner(Coordinator, Workers) ->
    case get(?WORKER_CLEANER) of
        undefined ->
            CleanerFun = fun() ->
                erlang:monitor(process, Coordinator),
                cleaner_loop(Coordinator, Workers)
            end,
            Cleaner = spawn(CleanerFun),
            put(?WORKER_CLEANER, Cleaner),
            Cleaner;
        Cached ->
            Cached
    end.
| |
%% Wait for the monitored coordinator to terminate, accumulating any
%% extra workers registered via add_worker_to_cleaner/2 in the meantime,
%% then kill all known workers.
cleaner_loop(Coordinator, Workers) ->
    receive
        {'DOWN', _, _, Coordinator, _} ->
            fabric_util:cleanup(Workers);
        {add_worker, Coordinator, NewWorker} ->
            cleaner_loop(Coordinator, [NewWorker | Workers])
    end.
| |
%% Ask the coordinator's cleaner process (if one is running) to also
%% track Worker, so it gets killed on abnormal coordinator exit. A no-op
%% when no cleaner has been spawned in this process.
add_worker_to_cleaner(CoordinatorPid, Worker) ->
    Cleaner = get(?WORKER_CLEANER),
    if
        is_pid(Cleaner) -> Cleaner ! {add_worker, CoordinatorPid, Worker};
        true -> ok
    end.
| |
| -ifdef(TEST). |
| |
| -include_lib("eunit/include/eunit.hrl"). |
| |
%% EUnit fixture: meck rexi once, run all cleaner tests, then unload.
worker_cleaner_test_() ->
    Tests = fun(_) ->
        [
            should_clean_workers(),
            does_not_fire_if_cleanup_called(),
            should_clean_additional_worker_too()
        ]
    end,
    {"Fabric spawn_worker_cleaner test", {setup, fun setup/0, fun teardown/1, Tests}}.
| |
%% When the coordinator dies, the cleaner should kill all workers
%% (observed as exactly one rexi:kill_all/1 call) and then exit itself.
should_clean_workers() ->
    ?_test(begin
        meck:reset(rexi),
        erase(?WORKER_CLEANER),
        Shards = [
            #shard{node = 'n1', ref = make_ref()},
            #shard{node = 'n2', ref = make_ref()}
        ],
        % Stand-in coordinator that terminates on demand.
        {Coordinator, _CoordRef} = spawn_monitor(fun() ->
            receive
                die -> ok
            end
        end),
        CleanerPid = spawn_worker_cleaner(Coordinator, Shards),
        CleanerRef = erlang:monitor(process, CleanerPid),
        Coordinator ! die,
        % Wait for the cleaner to finish before counting kill_all calls.
        receive
            {'DOWN', CleanerRef, _, CleanerPid, _} -> ok
        end,
        ?assertEqual(1, meck:num_calls(rexi, kill_all, 1))
    end).
| |
%% Calling cleanup/1 explicitly must stop the background cleaner, so the
%% workers are killed exactly once and not a second time when the
%% coordinator later dies.
does_not_fire_if_cleanup_called() ->
    ?_test(begin
        meck:reset(rexi),
        erase(?WORKER_CLEANER),
        Workers = [
            #shard{node = 'n1', ref = make_ref()},
            #shard{node = 'n2', ref = make_ref()}
        ],
        % Stand-in coordinator that terminates on demand.
        {Coord, _} = spawn_monitor(fun() ->
            receive
                die -> ok
            end
        end),
        Cleaner = spawn_worker_cleaner(Coord, Workers),
        Ref = erlang:monitor(process, Cleaner),
        % Explicit cleanup kills the cleaner before the coordinator dies.
        cleanup(Workers),
        Coord ! die,
        receive
            {'DOWN', Ref, _, _, _} -> ok
        end,
        % The single kill_all call comes from the explicit cleanup/1 above.
        % If the cleaner process had fired too, num_calls would exceed 1.
        ?assertEqual(1, meck:num_calls(rexi, kill_all, 1))
    end).
| |
%% Workers registered via add_worker_to_cleaner/2 after the cleaner was
%% spawned must also be covered by the single kill_all on coordinator death.
should_clean_additional_worker_too() ->
    ?_test(begin
        meck:reset(rexi),
        erase(?WORKER_CLEANER),
        Shards = [
            #shard{node = 'n1', ref = make_ref()}
        ],
        % Stand-in coordinator that terminates on demand.
        {Coordinator, _CoordRef} = spawn_monitor(fun() ->
            receive
                die -> ok
            end
        end),
        CleanerPid = spawn_worker_cleaner(Coordinator, Shards),
        add_worker_to_cleaner(Coordinator, #shard{node = 'n2', ref = make_ref()}),
        CleanerRef = erlang:monitor(process, CleanerPid),
        Coordinator ! die,
        % Wait for the cleaner to finish before counting kill_all calls.
        receive
            {'DOWN', CleanerRef, _, CleanerPid, _} -> ok
        end,
        ?assertEqual(1, meck:num_calls(rexi, kill_all, 1))
    end).
| |
%% Fixture setup: stub rexi:kill_all/1 so tests can count its calls.
setup() ->
    KillAllStub = fun(_) -> ok end,
    ok = meck:expect(rexi, kill_all, KillAllStub).
| |
%% Fixture teardown: unload all meck-mocked modules.
teardown(_) ->
    meck:unload().
| |
| -endif. |