% Use pg process groups to reduce the chance of duplicate replication jobs
% running on the same cluster.
% A custom replicator pg group is started via start_link/0. Then, replication
% jobs check if they would be leaders before starting. If, by chance, two jobs
% with the same RepId start anyway, then replication jobs would do an extra
% check before each checkpoint. If they are not leaders any longer, they
% should stop running. The "leader" is just the first sorted element in the
% [node(Pid), ...] list.
% Start the custom pg scope used to track replication jobs. The scope is named
% after this module, which is the same scope join/2, leave/2 and pids/1 use.
% Should be called from the replication supervisor.
%
% Returns the result of pg:start_link/1, i.e. {ok, Pid} on success.
start_link() ->
    pg:start_link(?MODULE).
% Add a replication job pid to the pg group of its RepId. RepId must be a
% two-element tuple; id/1 turns it into the group name. The group lives in
% the pg scope named after this module. Returns the result of pg:join/3.
join({_Base, _Ext} = RepId, Pid) when is_pid(Pid) ->
    Group = id(RepId),
    pg:join(?MODULE, Group, Pid).
% Remove a replication job pid from the pg group of its RepId. Calling this is
% optional: pg monitors group members and drops them automatically when they
% exit. Calling it from terminate/2 simply clears the group a little sooner.
% Returns the result of pg:leave/3.
leave({_Base, _Ext} = RepId, Pid) when is_pid(Pid) ->
    Group = id(RepId),
    pg:leave(?MODULE, Group, Pid).
% Decide whether a replication job for RepId may start on Node. Looks up the
% pids currently registered for RepId and hands them, together with Node, to
% no_other_nodes/2. The result is `yes` when the job may start, or
% `{no, OtherPid}` where `OtherPid` is the pid of the replication job that is
% already running.
should_start({_Base, _Ext} = RepId, Node) when is_atom(Node) ->
    Registered = pids(RepId),
    no_other_nodes(Node, Registered).
% Determine if the replication job should keep running as the main job for that
% RepId. If it is, return yes, otherwise return `{no, OtherPid}`. `OtherPid` is
% the pid of the replication job that should stay running instead of this one.
%
% The "main" job is the first pid in the sorted list returned by pids/1. If the
% group is empty, or Pid itself is first, the answer is `yes`.
should_run({_, _} = RepId, Pid) when is_pid(Pid) ->
    case pids(RepId) of
        [OtherPid | _] when OtherPid =/= Pid -> {no, OtherPid};
        _ -> yes
    end.
% Return the pids registered for RepId, sorted by node first to get some
% deterministic order. For all we know, pids may already sort that way, but
% we're just making it explicit here in case it somehow changes in the future.
% lists:usort/1 also drops duplicate {Node, Pid} entries.
pids({_, _} = RepId) ->
    NodePids = [{node(P), P} || P <- pg:get_members(?MODULE, id(RepId))],
    {_, Pids} = lists:unzip(lists:usort(NodePids)),
    Pids.
% Build the pg group name for a replication id: the two tuple elements are
% concatenated, in order, into a single binary.
id({Base, Ext}) ->
    Parts = [Base, Ext],
    iolist_to_binary(Parts).
% Scan a list of pids for one that lives on a node other than Node.
% Returns `yes` when the list is exhausted without finding such a pid (this
% includes the empty list), or `{no, Pid}` for the first pid found on a
% different node. Pids on Node itself are skipped.
no_other_nodes(_, []) ->
    yes;
no_other_nodes(Node, [Pid | _]) when Node =/= node(Pid) ->
    {no, Pid};
no_other_nodes(Node, [Pid | Pids]) when Node =:= node(Pid) ->
    no_other_nodes(Node, Pids).
% EUnit test generator. Uses a `foreach` fixture so that setup/0 and
% teardown/1 wrap every test case, giving each one a freshly started pg scope.
% Each instantiator receives the value returned by setup/0 and passes it to
% the test function.
couch_replicator_pg_test_() ->
    {
        foreach,
        fun setup/0,
        fun teardown/1,
        [
            fun(PGPid) -> {"t_start_stop", ?_test(t_start_stop(PGPid))} end,
            fun(PGPid) -> {"t_join_leave", ?_test(t_join_leave(PGPid))} end,
            fun(PGPid) -> {"t_should_start", ?_test(t_should_start(PGPid))} end,
            fun(PGPid) -> {"t_should_run", ?_test(t_should_run(PGPid))} end
        ]
    }.
% Test fixture setup: start the pg scope and return its pid so that
% teardown/1 and the test cases receive it.
setup() ->
    {ok, PGPid} = start_link(),
    PGPid.
% Test fixture cleanup: stop the process whose pid was handed in and assert
% that the stop call returned ok.
teardown(PGPid) when is_pid(PGPid) ->
    Stopped = gen_server:stop(PGPid),
    ?assertEqual(ok, Stopped).
% The process handed in by the fixture must be alive, and a freshly started
% scope must have no groups.
t_start_stop(PGPid) ->
    ?assert(is_process_alive(PGPid)),
    ?assertEqual([], pg:which_groups(?MODULE)).
% Joining makes the pid and its group visible; leaving removes both again, and
% a second leave reports not_joined.
t_join_leave(_) ->
    JobId = {"a", "+b"},
    % Nothing is registered before the first join
    ?assertEqual([], pids(JobId)),
    Self = self(),
    ?assertEqual(ok, join(JobId, Self)),
    ?assertEqual([id(JobId)], pg:which_groups(?MODULE)),
    ?assertEqual([Self], pids(JobId)),
    ?assertEqual(ok, leave(JobId, Self)),
    % Leaving a group we are no longer a member of
    ?assertEqual(not_joined, leave(JobId, Self)),
    ?assertEqual([], pids(JobId)),
    ?assertEqual([], pg:which_groups(?MODULE)).
% should_start/2 answers yes while the group is empty. Once a pid has joined,
% only a caller on a different node is told no.
t_should_start(_) ->
    JobId = {"a", "+b"},
    % Empty group: any node may start the job
    ?assertEqual(yes, should_start(JobId, node())),
    ?assertEqual(yes, should_start(JobId, '')),
    Self = self(),
    ok = join(JobId, Self),
    % Same node as the registered job: it is allowed to start; the duplicate
    % will blow up anyway in the supervisor.
    ?assertEqual(yes, should_start(JobId, node())),
    % A different node name ('' here) is told about the registered pid
    ?assertEqual({no, Self}, should_start(JobId, '')).
% should_run/2 answers yes for an unregistered or sole job, and points at the
% first-sorted pid once another member is ahead of the caller.
t_should_run(_) ->
    JobId = {"a", "+b"},
    Self = self(),
    % Odd case: a job asks whether it should run without having registered.
    % We simply let it run.
    ?assertEqual(yes, should_run(JobId, Self)),
    ok = join(JobId, Self),
    % The job itself is the only member of the group
    ?assertEqual(yes, should_run(JobId, Self)),
    % Register the init process as well; the test expects its pid to sort
    % ahead of ours
    Init = whereis(init),
    ok = join(JobId, Init),
    ?assertEqual({no, Init}, should_run(JobId, Self)).