% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
% http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.
-module(fabric_view_changes).
-export([go/5, pack_seqs/1, unpack_seqs/2]).
-export([increment_changes_epoch/0]).
%% exported for upgrade purposes.
-export([keep_sending_changes/8]).
-include_lib("fabric/include/fabric.hrl").
-include_lib("mem3/include/mem3.hrl").
-include_lib("couch/include/couch_db.hrl").
-include_lib("eunit/include/eunit.hrl").
-import(fabric_db_update_listener, [wait_db_updated/1]).
go(DbName, Feed, Options, Callback, Acc0) when Feed == "continuous" orelse
Feed == "longpoll" orelse Feed == "eventsource" ->
Args = make_changes_args(Options),
Since = get_start_seq(DbName, Args),
case validate_start_seq(DbName, Since) of
ok ->
{ok, Acc} = Callback(start, Acc0),
{Timeout, _} = couch_changes:get_changes_timeout(Args, Callback),
Ref = make_ref(),
Parent = self(),
UpdateListener = {spawn_link(fabric_db_update_listener, go,
[Parent, Ref, DbName, Timeout]),
Ref},
put(changes_epoch, get_changes_epoch()),
try
keep_sending_changes(
DbName,
Args,
Callback,
Since,
Acc,
Timeout,
UpdateListener,
os:timestamp()
)
after
fabric_db_update_listener:stop(UpdateListener)
end;
Error ->
Callback(Error, Acc0)
end;
go(DbName, "normal", Options, Callback, Acc0) ->
Args = make_changes_args(Options),
Since = get_start_seq(DbName, Args),
case validate_start_seq(DbName, Since) of
ok ->
{ok, Acc} = Callback(start, Acc0),
{ok, Collector} = send_changes(
DbName,
Args,
Callback,
Since,
Acc,
5000
),
#collector{counters=Seqs, user_acc=AccOut, offset=Offset} = Collector,
Callback({stop, pack_seqs(Seqs), pending_count(Offset)}, AccOut);
Error ->
Callback(Error, Acc0)
end.
keep_sending_changes(DbName, Args, Callback, Seqs, AccIn, Timeout, UpListen, T0) ->
#changes_args{limit=Limit, feed=Feed, heartbeat=Heartbeat} = Args,
{ok, Collector} = send_changes(DbName, Args, Callback, Seqs, AccIn, Timeout),
#collector{
limit = Limit2,
counters = NewSeqs,
offset = Offset,
user_acc = AccOut0
} = Collector,
LastSeq = pack_seqs(NewSeqs),
MaintenanceMode = config:get("couchdb", "maintenance_mode"),
NewEpoch = get_changes_epoch() > erlang:get(changes_epoch),
if Limit > Limit2, Feed == "longpoll";
MaintenanceMode == "true"; MaintenanceMode == "nolb"; NewEpoch ->
Callback({stop, LastSeq, pending_count(Offset)}, AccOut0);
true ->
{ok, AccOut} = Callback(waiting_for_updates, AccOut0),
WaitForUpdate = wait_db_updated(UpListen),
AccumulatedTime = timer:now_diff(os:timestamp(), T0) div 1000,
Max = case config:get("fabric", "changes_duration") of
undefined ->
infinity;
MaxStr ->
list_to_integer(MaxStr)
end,
case {Heartbeat, AccumulatedTime > Max, WaitForUpdate} of
{_, _, changes_feed_died} ->
Callback({stop, LastSeq, pending_count(Offset)}, AccOut);
{undefined, _, timeout} ->
Callback({stop, LastSeq, pending_count(Offset)}, AccOut);
{_, true, timeout} ->
Callback({stop, LastSeq, pending_count(Offset)}, AccOut);
_ ->
{ok, AccTimeout} = Callback(timeout, AccOut),
?MODULE:keep_sending_changes(
DbName,
Args#changes_args{limit=Limit2},
Callback,
LastSeq,
AccTimeout,
Timeout,
UpListen,
T0
)
end
end.
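% Sketch of the config knobs consulted above (section/key names come from the
% code; the values are only examples, not recommendations): setting
% maintenance_mode = "true" or "nolb" in the "couchdb" section stops
% continuous/longpoll feeds at the next iteration, and "changes_duration" in
% the "fabric" section is an accumulated-time budget in milliseconds; once it
% is exceeded, the next wait_db_updated timeout ends the feed.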
send_changes(DbName, ChangesArgs, Callback, PackedSeqs, AccIn, Timeout) ->
LiveNodes = [node() | nodes()],
AllLiveShards = mem3:live_shards(DbName, LiveNodes),
Seqs0 = unpack_seqs(PackedSeqs, DbName),
{WSeqs0, Dead, Reps} = find_replacements(Seqs0, AllLiveShards),
% Start workers which didn't need replacements
WSeqs = lists:map(fun({#shard{name = Name, node = N} = S, Seq}) ->
Ref = rexi:cast(N, {fabric_rpc, changes, [Name, ChangesArgs, Seq]}),
{S#shard{ref = Ref}, Seq}
end, WSeqs0),
% For some dead workers, see if they are the result of split shards. In that
% case make a replacement argument so that local rexi workers can calculate
% (hopefully) a > 0 update sequence.
{WSplitSeqs0, Reps1} = find_split_shard_replacements(Dead, Reps),
WSplitSeqs = lists:map(fun({#shard{name = Name, node = N} = S, Seq}) ->
Arg = make_replacement_arg(N, Seq),
Ref = rexi:cast(N, {fabric_rpc, changes, [Name, ChangesArgs, Arg]}),
{S#shard{ref = Ref}, Seq}
end, WSplitSeqs0),
% For ranges that were not split, start sequences from 0
WReps = lists:map(fun(#shard{name = Name, node = N} = S) ->
Ref = rexi:cast(N, {fabric_rpc, changes, [Name, ChangesArgs, 0]}),
{S#shard{ref = Ref}, 0}
end, Reps1),
Seqs = WSeqs ++ WSplitSeqs ++ WReps,
{Workers0, _} = lists:unzip(Seqs),
Repls = fabric_ring:get_shard_replacements(DbName, Workers0),
StartFun = fun(#shard{name=Name, node=N, range=R0}=Shard) ->
%% Find the original shard copy in the Seqs array
case lists:dropwhile(fun({S, _}) -> S#shard.range =/= R0 end, Seqs) of
[{#shard{}, {replace, _, _, _}} | _] ->
% Don't attempt to replace a replacement
SeqArg = 0;
[{#shard{node = OldNode}, OldSeq} | _] ->
SeqArg = make_replacement_arg(OldNode, OldSeq);
_ ->
% TODO this clause is probably unreachable in the N>2
% case because we compute replacements only if a shard has one
% in the original set.
couch_log:error("Streaming ~s from zero while replacing ~p",
[Name, PackedSeqs]),
SeqArg = 0
end,
Ref = rexi:cast(N, {fabric_rpc, changes, [Name, ChangesArgs, SeqArg]}),
Shard#shard{ref = Ref}
end,
RexiMon = fabric_util:create_monitors(Workers0),
try
case fabric_streams:start(Workers0, #shard.ref, StartFun, Repls) of
{ok, Workers} ->
try
LiveSeqs = lists:map(fun(W) ->
case lists:keyfind(W, 1, Seqs) of
{W, Seq} -> {W, Seq};
_ -> {W, 0}
end
end, Workers),
send_changes(DbName, Workers, LiveSeqs, ChangesArgs,
Callback, AccIn, Timeout)
after
fabric_streams:cleanup(Workers)
end;
{timeout, NewState} ->
DefunctWorkers = fabric_util:remove_done_workers(
NewState#stream_acc.workers,
waiting
),
fabric_util:log_timeout(
DefunctWorkers,
"changes"
),
throw({error, timeout});
{error, Reason} ->
throw({error, Reason});
Else ->
throw({error, Else})
end
after
rexi_monitor:stop(RexiMon)
end.
send_changes(DbName, Workers, Seqs, ChangesArgs, Callback, AccIn, Timeout) ->
State = #collector{
db_name = DbName,
query_args = ChangesArgs,
callback = Callback,
counters = orddict:from_list(Seqs),
user_acc = AccIn,
limit = ChangesArgs#changes_args.limit,
offset = fabric_dict:init(Workers, null),
rows = Seqs % store sequence positions instead
},
%% TODO: errors need to be handled here
receive_results(Workers, State, Timeout, Callback).
receive_results(Workers, State, Timeout, Callback) ->
case rexi_utils:recv(Workers, #shard.ref, fun handle_message/3, State,
Timeout, infinity) of
{timeout, NewState0} ->
{ok, AccOut} = Callback(timeout, NewState0#collector.user_acc),
NewState = NewState0#collector{user_acc = AccOut},
receive_results(Workers, NewState, Timeout, Callback);
{_, NewState} ->
{ok, NewState}
end.
handle_message({rexi_DOWN, _, {_, NodeRef}, _}, _, State) ->
fabric_view:check_down_shards(State, NodeRef);
handle_message({rexi_EXIT, Reason}, Worker, State) ->
fabric_view:handle_worker_exit(State, Worker, Reason);
% Temporary upgrade clause - Case 24236
handle_message({complete, Key}, Worker, State) when is_tuple(Key) ->
handle_message({complete, [{seq, Key}, {pending, 0}]}, Worker, State);
handle_message({change, Props}, {Worker, _}, #collector{limit=0} = State) ->
O0 = State#collector.offset,
O1 = case fabric_dict:lookup_element(Worker, O0) of
null ->
% Use Pending+1 because we're ignoring this row in the response
Pending = couch_util:get_value(pending, Props, 0),
fabric_dict:store(Worker, Pending+1, O0);
_ ->
O0
end,
maybe_stop(State#collector{offset = O1});
handle_message({complete, Props}, Worker, #collector{limit=0} = State) ->
O0 = State#collector.offset,
O1 = case fabric_dict:lookup_element(Worker, O0) of
null ->
fabric_dict:store(Worker, couch_util:get_value(pending,Props), O0);
_ ->
O0
end,
maybe_stop(State#collector{offset = O1});
handle_message({no_pass, Props}, {Worker, From}, #collector{limit=0} = State)
when is_list(Props) ->
#collector{counters = S0, offset = O0} = State,
O1 = case fabric_dict:lookup_element(Worker, O0) of
null ->
fabric_dict:store(Worker, couch_util:get_value(pending, Props), O0);
_ ->
O0
end,
S1 = fabric_dict:store(Worker, couch_util:get_value(seq, Props), S0),
rexi:stream_ack(From),
maybe_stop(State#collector{counters = S1, offset = O1});
handle_message(#change{} = Row, {Worker, From}, St) ->
Change = {change, [
{seq, Row#change.key},
{id, Row#change.id},
{changes, Row#change.value},
{deleted, Row#change.deleted},
{doc, Row#change.doc}
]},
handle_message(Change, {Worker, From}, St);
handle_message({change, Props}, {Worker, From}, St) ->
#collector{
callback = Callback,
counters = S0,
offset = O0,
limit = Limit,
user_acc = AccIn
} = St,
true = fabric_dict:is_key(Worker, S0),
S1 = fabric_dict:store(Worker, couch_util:get_value(seq, Props), S0),
O1 = fabric_dict:store(Worker, couch_util:get_value(pending, Props), O0),
% Temporary hack for FB 23637
Interval = erlang:get(changes_seq_interval),
if (Interval == undefined) orelse (Limit rem Interval == 0) ->
Props2 = lists:keyreplace(seq, 1, Props, {seq, pack_seqs(S1)});
true ->
Props2 = lists:keyreplace(seq, 1, Props, {seq, null})
end,
{Go, Acc} = Callback(changes_row(Props2), AccIn),
rexi:stream_ack(From),
{Go, St#collector{counters=S1, offset=O1, limit=Limit-1, user_acc=Acc}};
%% upgrade clause
handle_message({no_pass, Seq}, From, St) when is_integer(Seq) ->
handle_message({no_pass, [{seq, Seq}]}, From, St);
handle_message({no_pass, Props}, {Worker, From}, St) ->
Seq = couch_util:get_value(seq, Props),
#collector{counters = S0} = St,
true = fabric_dict:is_key(Worker, S0),
S1 = fabric_dict:store(Worker, Seq, S0),
rexi:stream_ack(From),
{ok, St#collector{counters=S1}};
handle_message({complete, Props}, Worker, State) ->
Key = couch_util:get_value(seq, Props),
#collector{
counters = S0,
offset = O0,
total_rows = Completed % override
} = State,
true = fabric_dict:is_key(Worker, S0),
S1 = fabric_dict:store(Worker, Key, S0),
O1 = fabric_dict:store(Worker, couch_util:get_value(pending, Props), O0),
NewState = State#collector{counters=S1, offset=O1, total_rows=Completed+1},
% We're relying on S1 having exactly the number of workers that
% are participating in this response. With the new stream_start
% that's a bit more obvious but historically it wasn't quite
% so clear. The Completed variable is just a hacky override
% of the total_rows field in the #collector{} record.
NumWorkers = fabric_dict:size(S1),
Go = case NumWorkers =:= (Completed+1) of
true -> stop;
false -> ok
end,
{Go, NewState}.
make_replacement_arg(Node, {Seq, Uuid}) ->
{replace, Node, Uuid, Seq};
make_replacement_arg(_Node, {Seq, Uuid, EpochNode}) ->
% The replacement should properly be computed against the node that owned
% the sequence when it was written to disk (the EpochNode) rather than the
% node we're trying to replace.
{replace, EpochNode, Uuid, Seq};
make_replacement_arg(_, _) ->
0.
maybe_stop(#collector{offset = Offset} = State) ->
case fabric_dict:any(null, Offset) of
false ->
{stop, State};
true ->
% Wait till we've heard from everyone to compute pending count
{ok, State}
end.
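% Minimal sketch of the limit=0 draining above (hypothetical workers W1 and
% W2): the offset dict starts out as fabric_dict:init(Workers, null) and
% maybe_stop/1 keeps returning {ok, State} while any entry is still null, e.g.
%   [{W1, 3}, {W2, null}] -> {ok, State}   (still waiting on W2)
%   [{W1, 3}, {W2, 0}]    -> {stop, State} (every pending count is known)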
make_changes_args(#changes_args{style=Style, filter_fun=undefined}=Args) ->
Args#changes_args{filter_fun = {default, Style}};
make_changes_args(Args) ->
Args.
get_start_seq(DbName, #changes_args{dir=Dir, since=Since})
when Dir == rev; Since == "now" ->
{ok, Info} = fabric:get_db_info(DbName),
couch_util:get_value(update_seq, Info);
get_start_seq(_DbName, #changes_args{dir=fwd, since=Since}) ->
Since.
pending_count(Dict) ->
fabric_dict:fold(fun
(_Worker, Count, Acc) when is_integer(Count), is_integer(Acc) ->
Count + Acc;
(_Worker, _Count, _Acc) ->
null
end, 0, Dict).
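% The packed sequence produced below is a string of the form "Sum-Opaque":
% Sum is the integer sum of the per-shard update sequences and Opaque is the
% base64url-encoded, compressed external term of a [{Node, Range, Seq}] list.
% Illustrative sketch (the values are made up, not a real sequence):
%   SeqList = [{'n1@host', [0, 16#7fffffff], 5},
%              {'n2@host', [16#80000000, 16#ffffffff], 7}],
%   %% pack_seqs/1 on workers carrying these entries yields <<"12-g1AAAA...">>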
pack_seqs(Workers) ->
SeqList = [{N,R,S} || {#shard{node=N, range=R}, S} <- Workers],
SeqSum = lists:sum([seq(S) || {_,_,S} <- SeqList]),
Opaque = couch_util:encodeBase64Url(term_to_binary(SeqList, [compressed])),
?l2b([integer_to_list(SeqSum), $-, Opaque]).
seq({Seq, _Uuid, _Node}) -> Seq;
seq({Seq, _Uuid}) -> Seq;
seq(Seq) -> Seq.
unpack_seq_regex_match(Packed) ->
NewPattern = "^\\[[0-9]+\s*,\s*\"(?<opaque>.*)\"\\]$",
OldPattern = "^\"?([0-9]+-)?(?<opaque>.*?)\"?$",
Options = [{capture, [opaque], binary}],
case re:run(Packed, NewPattern, Options) of
{match, Match} ->
Match;
nomatch ->
{match, Match} = re:run(Packed, OldPattern, Options),
Match
end.
unpack_seq_decode_term(Opaque) ->
binary_to_term(couch_util:decodeBase64Url(Opaque)).
unpack_seqs(0, DbName) ->
fabric_dict:init(mem3:shards(DbName), 0);
unpack_seqs("0", DbName) ->
fabric_dict:init(mem3:shards(DbName), 0);
unpack_seqs([_SeqNum, Opaque], DbName) -> % deprecated
do_unpack_seqs(Opaque, DbName);
unpack_seqs(Packed, DbName) ->
Opaque = unpack_seq_regex_match(Packed),
do_unpack_seqs(Opaque, DbName).
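% Illustrative sketch of the terms do_unpack_seqs/2 works with (node names
% and values are hypothetical): the decoded opaque term is a list such as
%   [{'n1@host', [0, 16#7fffffff], {5, <<"uuid1">>, 'n1@host'}},
%    {'n2@host', [16#80000000, 16#ffffffff], {7, <<"uuid2">>}}]
% where the per-shard sequence may be a bare integer, {Seq, Uuid} or
% {Seq, Uuid, EpochNode}. The result is a fabric_dict of {#shard{}, Seq}
% pairs covering the ring.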
do_unpack_seqs(Opaque, DbName) ->
% A preventative fix for FB 13533 to remove duplicate shards.
% This just picks each unique shard and keeps the largest seq
% value recorded.
Decoded = unpack_seq_decode_term(Opaque),
DedupDict = lists:foldl(fun({Node, [A, B], Seq}, Acc) ->
dict:append({Node, [A, B]}, Seq, Acc)
end, dict:new(), Decoded),
Deduped = lists:map(fun({{Node, [A, B]}, SeqList}) ->
{Node, [A, B], lists:max(SeqList)}
end, dict:to_list(DedupDict)),
% Create a fabric_dict of {Shard, Seq} entries
% TODO relies on internal structure of fabric_dict as keylist
Unpacked = lists:flatmap(fun({Node, [A,B], Seq}) ->
case mem3:get_shard(DbName, Node, [A,B]) of
{ok, Shard} ->
[{Shard, Seq}];
{error, not_found} ->
[]
end
end, Deduped),
% This handles the case where the ring in the unpacked sequence we received
% is not complete; in that case, try to fill in the missing ranges with
% shards from the shard map
case fabric_ring:is_progress_possible(Unpacked) of
true ->
Unpacked;
false ->
PotentialWorkers = lists:map(fun({Node, [A, B], Seq}) ->
case mem3:get_shard(DbName, Node, [A, B]) of
{ok, Shard} ->
{Shard, Seq};
{error, not_found} ->
{#shard{node = Node, range = [A, B]}, Seq}
end
end, Deduped),
Shards = mem3:shards(DbName),
{Unpacked1, Dead, Reps} = find_replacements(PotentialWorkers, Shards),
{Splits, Reps1} = find_split_shard_replacements(Dead, Reps),
RepSeqs = lists:map(fun(#shard{} = S) ->
{S, get_old_seq(S, Deduped)}
end, Reps1),
Unpacked1 ++ Splits ++ RepSeqs
end.
get_old_seq(#shard{range=R}=Shard, SinceSeqs) ->
case lists:keyfind(R, 2, SinceSeqs) of
{Node, R, Seq} when is_number(Seq) ->
% Unfortunately we don't have access to the db
% uuid so we can't set a replacement here.
couch_log:warning("~p get_old_seq missing uuid "
"node: ~p, range: ~p, seq: ~p", [?MODULE, Node, R, Seq]),
0;
{Node, R, {Seq, Uuid}} ->
% This update seq is using the old format that
% didn't include the node. This information is
% important for replacement.
{Seq, Uuid, Node};
{_Node, R, {Seq, Uuid, EpochNode}} ->
% This is the newest sequence format that we
% can use for replacement.
{Seq, Uuid, EpochNode};
Error ->
couch_log:warning("~p get_old_seq error: ~p, shard: ~p, seqs: ~p",
[?MODULE, Error, Shard, SinceSeqs]),
0
end.
changes_row(Props0) ->
Props1 = case couch_util:get_value(deleted, Props0) of
true ->
Props0;
_ ->
lists:keydelete(deleted, 1, Props0)
end,
Allowed = [seq, id, changes, deleted, doc, error],
Props2 = lists:filter(fun({K,_V}) -> lists:member(K, Allowed) end, Props1),
{change, {Props2}}.
find_replacements(Workers, AllShards) ->
% Build map [B, E] => [Worker1, Worker2, ...] for all workers
WrkMap = lists:foldl(fun({#shard{range = [B, E]}, _} = W, Acc) ->
maps:update_with({B, E}, fun(Ws) -> [W | Ws] end, [W], Acc)
end, #{}, fabric_dict:to_list(Workers)),
% Build map [B, E] => [Shard1, Shard2, ...] for all shards
AllMap = lists:foldl(fun(#shard{range = [B, E]} = S, Acc) ->
maps:update_with({B, E}, fun(Ss) -> [S | Ss] end, [S], Acc)
end, #{}, AllShards),
% Custom sort function will prioritize workers over other shards.
% The idea is to avoid killing workers unnecessarily.
SortFun = fun
(R1 = {B, E1}, R2 = {B, E2}) ->
case {maps:is_key(R1, WrkMap), maps:is_key(R2, WrkMap)} of
{true, true} ->
% Both are workers, larger interval wins
E1 >= E2;
{true, false} ->
% First element is a worker range, it wins
true;
{false, true} ->
% Second element is a worker range, it wins
false;
{false, false} ->
% Neither one is a worker interval, pick larger one
E1 >= E2
end;
({B1, _}, {B2, _}) ->
B1 =< B2
end,
Ring = mem3_util:get_ring(maps:keys(AllMap), SortFun),
% Keep only workers in the ring and from one of the available nodes
Keep = fun(#shard{range = [B, E], node = N}) ->
lists:member({B, E}, Ring) andalso lists:keyfind(N, #shard.node,
maps:get({B, E}, AllMap)) =/= false
end,
Workers1 = fabric_dict:filter(fun(S, _) -> Keep(S) end, Workers),
Removed = fabric_dict:filter(fun(S, _) -> not Keep(S) end, Workers),
{Rep, _} = lists:foldl(fun(R, {RepAcc, AllMapAcc}) ->
case maps:is_key(R, WrkMap) of
true ->
% This range has at least one worker. Keep it (no replacement needed) only
% if one of its workers passes the Keep/1 predicate from above; otherwise
% pick a replacement from the available shards.
WorkersInRange = maps:get(R, WrkMap),
case lists:any(fun({S, _}) -> Keep(S) end, WorkersInRange) of
true ->
{RepAcc, AllMapAcc};
false ->
[Shard | Rest] = maps:get(R, AllMapAcc),
{[Shard | RepAcc], AllMapAcc#{R := Rest}}
end;
false ->
% No worker for this range. Replace from available shards
[Shard | Rest] = maps:get(R, AllMapAcc),
{[Shard | RepAcc], AllMapAcc#{R := Rest}}
end
end, {[], AllMap}, Ring),
% Return the list of workers that are part of the ring, the list of removed
% workers, and a list of replacement shards that could be used to make sure
% the ring completes.
{Workers1, Removed, Rep}.
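% Minimal sketch of a find_replacements/2 call (hypothetical nodes and
% ranges, mirroring the eunit cases below):
%   Workers = mk_workers([{"n1", 0, 10}, {"n2", 11, ?RING_END}]),
%   AllShards = [mk_shard("n3", 0, ?RING_END)],
%   {InRing, Removed, Reps} = find_replacements(Workers, AllShards)
% Neither worker's copy survives in AllShards, so both land in Removed and
% the single full-range copy on n3 is returned as the replacement.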
% From the list of dead workers determine if any are a result of a split shard.
% In that case perhaps there is a way to not rewind the changes feed back to 0.
% Returns {NewWorkers, Available} where NewWorkers is the list of
% viable workers and Available is the list of still unused input Shards
find_split_shard_replacements(DeadWorkers, Shards) ->
Acc0 = {[], Shards},
AccF = fabric_dict:fold(fun(#shard{node = WN, range = R}, Seq, Acc) ->
[B, E] = R,
{SplitWorkers, Available} = Acc,
ShardsOnSameNode = [S || #shard{node = N} = S <- Available, N =:= WN],
SplitShards = mem3_util:non_overlapping_shards(ShardsOnSameNode, B, E),
RepCount = length(SplitShards),
NewWorkers = [{S, make_split_seq(Seq, RepCount)} || S <- SplitShards],
NewAvailable = [S || S <- Available, not lists:member(S, SplitShards)],
{NewWorkers ++ SplitWorkers, NewAvailable}
end, Acc0, DeadWorkers),
{Workers, Available} = AccF,
{fabric_dict:from_list(Workers), Available}.
make_split_seq({Num, Uuid, Node}, RepCount) when RepCount > 1 ->
{Num, {split, Uuid}, Node};
make_split_seq(Seq, _) ->
Seq.
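% Minimal sketch (hypothetical shards): a dead worker for range [0, 10] on n1
% can be replaced by the split copies [0, 4] and [5, 10] on the same node;
% each replacement inherits the old sequence, with the uuid wrapped as
% {split, Uuid} by make_split_seq/2 above:
%   Dead = mk_workers([{"n1", 0, 10}], {5, <<"uuid1">>, 'n1'}),
%   {NewWorkers, Unused} = find_split_shard_replacements(Dead, LiveShards)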
validate_start_seq(_DbName, "now") ->
ok;
validate_start_seq(_DbName, 0) ->
ok;
validate_start_seq(_DbName, "0") ->
ok;
validate_start_seq(_DbName, Seq) ->
try
case Seq of
[_SeqNum, Opaque] ->
unpack_seq_decode_term(Opaque);
Seq ->
Opaque = unpack_seq_regex_match(Seq),
unpack_seq_decode_term(Opaque)
end,
ok
catch
_:_ ->
Reason = <<"Malformed sequence supplied in 'since' parameter.">>,
{error, {bad_request, Reason}}
end.
get_changes_epoch() ->
case application:get_env(fabric, changes_epoch) of
undefined ->
increment_changes_epoch(),
get_changes_epoch();
{ok, Epoch} ->
Epoch
end.
increment_changes_epoch() ->
application:set_env(fabric, changes_epoch, os:timestamp()).
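% Minimal sketch (assumed remsh usage): bumping the epoch makes feeds started
% before the bump stop at their next iteration, because go/5 stashes the epoch
% in the process dictionary and keep_sending_changes/8 compares that copy with
% the current application env value:
%   fabric_view_changes:increment_changes_epoch().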
unpack_seq_setup() ->
meck:new(mem3),
meck:new(fabric_view),
meck:expect(mem3, get_shard, fun(_, _, _) -> {ok, #shard{}} end),
meck:expect(fabric_ring, is_progress_possible, fun(_) -> true end),
ok.
unpack_seqs_test_() ->
{
setup,
fun unpack_seq_setup/0,
fun (_) -> meck:unload() end,
[
t_unpack_seqs()
]
}.
t_unpack_seqs() ->
?_test(begin
% BigCouch 0.3 style.
assert_shards("23423-g1AAAAE7eJzLYWBg4MhgTmHgS0ktM3QwND"
"LXMwBCwxygOFMiQ5L8____sxIZcKlIUgCSSfZgRUw4FTmAFMWDFTHiVJQAUlSPX1Ee"
"C5BkaABSQHXzsxKZ8StcAFG4H4_bIAoPQBTeJ2j1A4hCUJBkAQC7U1NA"),
% BigCouch 0.4 style.
assert_shards([23423,<<"g1AAAAE7eJzLYWBg4MhgTmHgS0ktM3QwND"
"LXMwBCwxygOFMiQ5L8____sxIZcKlIUgCSSfZgRUw4FTmAFMWDFTHiVJQAUlSPX1Ee"
"C5BkaABSQHXzsxKZ8StcAFG4H4_bIAoPQBTeJ2j1A4hCUJBkAQC7U1NA">>]),
% BigCouch 0.4 style (as string).
assert_shards("[23423,\"g1AAAAE7eJzLYWBg4MhgTmHgS0ktM3QwND"
"LXMwBCwxygOFMiQ5L8____sxIZcKlIUgCSSfZgRUw4FTmAFMWDFTHiVJQAUlSPX1Ee"
"C5BkaABSQHXzsxKZ8StcAFG4H4_bIAoPQBTeJ2j1A4hCUJBkAQC7U1NA\"]"),
assert_shards("[23423 ,\"g1AAAAE7eJzLYWBg4MhgTmHgS0ktM3QwND"
"LXMwBCwxygOFMiQ5L8____sxIZcKlIUgCSSfZgRUw4FTmAFMWDFTHiVJQAUlSPX1Ee"
"C5BkaABSQHXzsxKZ8StcAFG4H4_bIAoPQBTeJ2j1A4hCUJBkAQC7U1NA\"]"),
assert_shards("[23423, \"g1AAAAE7eJzLYWBg4MhgTmHgS0ktM3QwND"
"LXMwBCwxygOFMiQ5L8____sxIZcKlIUgCSSfZgRUw4FTmAFMWDFTHiVJQAUlSPX1Ee"
"C5BkaABSQHXzsxKZ8StcAFG4H4_bIAoPQBTeJ2j1A4hCUJBkAQC7U1NA\"]"),
assert_shards("[23423 , \"g1AAAAE7eJzLYWBg4MhgTmHgS0ktM3QwND"
"LXMwBCwxygOFMiQ5L8____sxIZcKlIUgCSSfZgRUw4FTmAFMWDFTHiVJQAUlSPX1Ee"
"C5BkaABSQHXzsxKZ8StcAFG4H4_bIAoPQBTeJ2j1A4hCUJBkAQC7U1NA\"]"),
% with internal hyphen
assert_shards("651-g1AAAAE7eJzLYWBg4MhgTmHgS0ktM3QwNDLXMwBCwxygOFMiQ"
"5L8____sxJTcalIUgCSSfZgReE4FTmAFMWDFYXgVJQAUlQPVuSKS1EeC5BkaABSQHXz8"
"VgJUbgAonB_VqIPfoUHIArvE7T6AUQh0I1-WQAzp1XB"),
assert_shards([651,"g1AAAAE7eJzLYWBg4MhgTmHgS0ktM3QwNDLXMwBCwxygOFMiQ"
"5L8____sxJTcalIUgCSSfZgReE4FTmAFMWDFYXgVJQAUlQPVuSKS1EeC5BkaABSQHXz8"
"VgJUbgAonB_VqIPfoUHIArvE7T6AUQh0I1-WQAzp1XB"]),
% CouchDB 1.2 style
assert_shards("\"23423-g1AAAAE7eJzLYWBg4MhgTmHgS0ktM3QwND"
"LXMwBCwxygOFMiQ5L8____sxIZcKlIUgCSSfZgRUw4FTmAFMWDFTHiVJQAUlSPX1Ee"
"C5BkaABSQHXzsxKZ8StcAFG4H4_bIAoPQBTeJ2j1A4hCUJBkAQC7U1NA\"")
end).
assert_shards(Packed) ->
?assertMatch([{#shard{},_}|_], unpack_seqs(Packed, <<"foo">>)).
find_replacements_test() ->
% None of the workers are in the live list of shards, but there is a
% replacement on n3 for the full range. It should get picked instead of
% the two smaller ones on n2.
Workers1 = mk_workers([{"n1", 0, 10}, {"n2", 11, ?RING_END}]),
AllShards1 = [
mk_shard("n1", 11, ?RING_END),
mk_shard("n2", 0, 4),
mk_shard("n2", 5, 10),
mk_shard("n3", 0, ?RING_END)
],
{WorkersRes1, Dead1, Reps1} = find_replacements(Workers1, AllShards1),
?assertEqual([], WorkersRes1),
?assertEqual(Workers1, Dead1),
?assertEqual([mk_shard("n3", 0, ?RING_END)], Reps1),
% None of the workers are in the live list of shards and there is a
% split replacement from n2 (range [0, 10] replaced with [0, 4], [5, 10])
Workers2 = mk_workers([{"n1", 0, 10}, {"n2", 11, ?RING_END}]),
AllShards2 = [
mk_shard("n1", 11, ?RING_END),
mk_shard("n2", 0, 4),
mk_shard("n2", 5, 10)
],
{WorkersRes2, Dead2, Reps2} = find_replacements(Workers2, AllShards2),
?assertEqual([], WorkersRes2),
?assertEqual(Workers2, Dead2),
?assertEqual([
mk_shard("n1", 11, ?RING_END),
mk_shard("n2", 0, 4),
mk_shard("n2", 5, 10)
], lists:sort(Reps2)),
% One worker is available and one needs to be replaced. Replacement will be
% from two split shards
Workers3 = mk_workers([{"n1", 0, 10}, {"n2", 11, ?RING_END}]),
AllShards3 = [
mk_shard("n1", 11, ?RING_END),
mk_shard("n2", 0, 4),
mk_shard("n2", 5, 10),
mk_shard("n2", 11, ?RING_END)
],
{WorkersRes3, Dead3, Reps3} = find_replacements(Workers3, AllShards3),
?assertEqual(mk_workers([{"n2", 11, ?RING_END}]), WorkersRes3),
?assertEqual(mk_workers([{"n1", 0, 10}]), Dead3),
?assertEqual([
mk_shard("n2", 0, 4),
mk_shard("n2", 5, 10)
], lists:sort(Reps3)),
% All workers are available. Make sure they are not killed even if there is
% a longer (single) shard to replace them.
Workers4 = mk_workers([{"n1", 0, 10}, {"n1", 11, ?RING_END}]),
AllShards4 = [
mk_shard("n1", 0, 10),
mk_shard("n1", 11, ?RING_END),
mk_shard("n2", 0, 4),
mk_shard("n2", 5, 10),
mk_shard("n3", 0, ?RING_END)
],
{WorkersRes4, Dead4, Reps4} = find_replacements(Workers4, AllShards4),
?assertEqual(Workers4, WorkersRes4),
?assertEqual([], Dead4),
?assertEqual([], Reps4).
mk_workers(NodesRanges) ->
mk_workers(NodesRanges, nil).
mk_workers(NodesRanges, Val) ->
orddict:from_list([{mk_shard(N, B, E), Val} || {N, B, E} <- NodesRanges]).
mk_shard(Name, B, E) ->
Node = list_to_atom(Name),
BName = list_to_binary(Name),
#shard{name = BName, node = Node, range = [B, E]}.
find_split_shard_replacements_test() ->
% One worker can be replaced and one can't
Dead1 = mk_workers([{"n1", 0, 10}, {"n2", 11, ?RING_END}], 42),
Shards1 = [
mk_shard("n1", 0, 4),
mk_shard("n1", 5, 10),
mk_shard("n3", 11, ?RING_END)
],
{Workers1, ShardsLeft1} = find_split_shard_replacements(Dead1, Shards1),
?assertEqual(mk_workers([{"n1", 0, 4}, {"n1", 5, 10}], 42), Workers1),
?assertEqual([mk_shard("n3", 11, ?RING_END)], ShardsLeft1),
% All workers can be replaced - one by 1 shard, another by 3 smaller shards
Dead2 = mk_workers([{"n1", 0, 10}, {"n2", 11, ?RING_END}], 42),
Shards2 = [
mk_shard("n1", 0, 10),
mk_shard("n2", 11, 12),
mk_shard("n2", 13, 14),
mk_shard("n2", 15, ?RING_END)
],
{Workers2, ShardsLeft2} = find_split_shard_replacements(Dead2, Shards2),
?assertEqual(mk_workers([
{"n1", 0, 10},
{"n2", 11, 12},
{"n2", 13, 14},
{"n2", 15, ?RING_END}
], 42), Workers2),
?assertEqual([], ShardsLeft2),
% No workers can be replaced. Ranges match but they are on different nodes
Dead3 = mk_workers([{"n1", 0, 10}, {"n2", 11, ?RING_END}], 42),
Shards3 = [
mk_shard("n2", 0, 10),
mk_shard("n3", 11, ?RING_END)
],
{Workers3, ShardsLeft3} = find_split_shard_replacements(Dead3, Shards3),
?assertEqual([], Workers3),
?assertEqual(Shards3, ShardsLeft3).