% Copyright 2013 Cloudant
%
% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
% http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.
-module(mem3_rebalance).
-export([
contract/0,
contract/1,
contract/3,
expand/0,
expand/1,
expand/3,
fix_zoning/0,
fix_zoning/1,
fix_zoning/2,
print/1
]).
% Exposed for debugging purposes
-export([
shard_count_by_node/1,
shard_count_view/0
]).
-include("mem3.hrl").
-record(gacc, {
donors,
targets,
moves,
limit,
target_level
}).
%% @equiv expand(1000)
-spec expand() -> [{atom(), #shard{}, node()}].
expand() ->
expand(1000).
%% @doc Expands a cluster. The global variant only balances overall shard
%% counts, without requiring each DB to be optimally balanced.
-spec expand(integer() | global | binary() | string()) -> [{atom(), #shard{}, node()}].
expand(global) ->
{ok, FD} = file:open("/tmp/rebalance_plan.txt", [write]),
erlang:put(fd, FD),
global_expand(surviving_nodes(), [], 1000);
%% @doc Expands all databases in the cluster, stopping at Limit operations.
expand(Limit) when is_integer(Limit), Limit > 0 ->
{ok, FD} = file:open("/tmp/rebalance_plan.txt", [write]),
erlang:put(fd, FD),
TargetNodes = surviving_nodes(),
LocalBalanceFun = fun(Db, Moves) -> expand(Db, TargetNodes, Moves) end,
LocalBalanceOps = apply_to_cluster(LocalBalanceFun, Limit),
% Now apply additional operations as needed to achieve global balance.
global_expand(TargetNodes, LocalBalanceOps, Limit);
expand(DbName) when is_binary(DbName); is_list(DbName) ->
TargetNodes = surviving_nodes(),
expand(DbName, TargetNodes, []).
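%% Usage sketch (hypothetical database name; each call returns a list of
%% {move, #shard{}, node()} operations):
%%   mem3_rebalance:expand().            % per-db + global passes, <= 1000 ops
%%   mem3_rebalance:expand(global).      % global pass only
%%   mem3_rebalance:expand(<<"mydb">>).  % plan for a single database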
%% @doc Computes a plan to balance the shards across the target nodes.
-spec expand(DbName::iolist(), [node()], [{atom(), #shard{}, node()}]) ->
[{atom(), #shard{}, node()}].
expand(DbName, Nodes, PrevMoves) ->
Shards = mem3:shards(DbName),
Floor = length(Shards) div length(Nodes),
% Ensure every target node reaches the floor
{NewShards, Moves0} = rebalance2(Floor, Shards, Nodes, Nodes, PrevMoves),
% Now look for any nodes with more than floor+1 shards
{_, Moves} = rebalance2(Floor+1, NewShards, Nodes, Nodes, Moves0),
Moves.
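%% Worked example: with 12 shard copies and 5 target nodes, Floor = 12 div 5
%% = 2. The first rebalance2/5 pass raises every node to at least 2 copies;
%% the second pass targets Floor+1 = 3 and only takes from nodes holding
%% more than 3, so every node ends up with either 2 or 3 copies.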
%% @equiv contract(1000)
-spec contract() -> [{atom(), #shard{}, node()}].
contract() ->
contract(1000).
%% @doc Computes a plan to remove up to Limit shards from decommissioned
%% nodes, i.e. nodes whose "decom" attribute is set to true.
-spec contract(integer() | binary() | string()) -> [{atom(), #shard{}, node()}].
contract(Limit) when is_integer(Limit), Limit > 0 ->
{ok, FD} = file:open("/tmp/rebalance_plan.txt", [write]),
erlang:put(fd, FD),
TargetNodes = surviving_nodes(),
apply_to_cluster(fun(Db, Moves) -> contract(Db, TargetNodes, Moves) end, Limit);
contract(DbName) when is_binary(DbName); is_list(DbName) ->
TargetNodes = surviving_nodes(),
contract(DbName, TargetNodes, []).
%% @doc Computes a plan to consolidate shards from a single database onto the
%% supplied set of nodes.
-spec contract(DbName::iolist(), [node()], [{atom(), #shard{}, node()}]) ->
[{atom(), #shard{}, node()}].
contract(DbName, TargetNodes, PrevMoves) ->
{OK, MoveThese} = lists:partition(fun(#shard{node=Node}) ->
lists:member(Node, TargetNodes)
end, mem3:shards(DbName)),
find_homes(MoveThese, shards_by_node(OK, TargetNodes), PrevMoves).
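%% For example (hypothetical nodes): with TargetNodes = [n1, n2] and copies
%% spread over [n1, n2, n3], the partition above keeps the n1/n2 copies in
%% OK and queues n3's copies in MoveThese; find_homes/3 then parks each of
%% them on the least-loaded target node in the same zone.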
%% @equiv fix_zoning(1000)
-spec fix_zoning() -> [{atom(), #shard{}, node()}].
fix_zoning() ->
fix_zoning(1000).
%% @doc Computes a plan containing up to Limit operations to repair replica
%% levels and improper zoning.
-spec fix_zoning(integer() | binary() | string()) -> [{atom(), #shard{}, node()}].
fix_zoning(Limit) when is_integer(Limit), Limit > 0 ->
{ok, FD} = file:open("/tmp/rebalance_plan.txt", [write]),
erlang:put(fd, FD),
apply_to_cluster(fun fix_zoning/2, Limit);
fix_zoning(DbName) when is_binary(DbName); is_list(DbName) ->
fix_zoning(DbName, []).
%% @doc Computes a plan to repair replica levels and improper zoning for a
%% single database.
-spec fix_zoning(DbName::iolist(), [{atom(), #shard{}, node()}]) ->
[{atom(), #shard{}, node()}].
fix_zoning(DbName, PrevMoves) ->
IdealZoning = orddict:from_list(mem3:get_placement([])),
ByRange = shards_by_range(mem3:shards(DbName)),
orddict:fold(fun(_Range, Shards, Acc) ->
compute_moves(IdealZoning, computed_zoning(Shards), Shards, Acc)
end, PrevMoves, ByRange).
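%% For example, a placement of "z1:2,z2:1" yields the ideal zoning orddict
%% [{<<"z1">>,2},{<<"z2">>,1}]. For each shard range, compute_moves/4
%% compares that ideal against the observed zone counts and schedules a
%% copy (replica missing) or a move (replica in the wrong zone).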
%% Internal functions.
global_expand(TargetNodes0, LocalOps, Limit) ->
TargetNodes = [couch_util:to_binary(Node) || Node <- TargetNodes0],
CountByNode = lists:filter(fun({Node, _Count}) ->
lists:member(Node, TargetNodes)
end, shard_count_by_node(LocalOps)),
TotalCount = lists:foldl(fun({_, C}, Sum) -> Sum + C end, 0, CountByNode),
TargetLevel = TotalCount div length(TargetNodes),
Donors = [{list_to_existing_atom(binary_to_list(N)), C - TargetLevel} ||
{N, C} <- CountByNode, C > TargetLevel],
InternalAcc0 = #gacc{
donors = orddict:from_list(Donors),
targets = TargetNodes0,
moves = LocalOps,
limit = Limit - length(LocalOps),
target_level = TargetLevel
},
try mem3_shards:fold(fun donate_fold/2, InternalAcc0) of
#gacc{moves = Moves} ->
Moves
catch
{complete, Moves} ->
Moves
end.
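%% Worked example (hypothetical counts): with TotalCount = 100 copies across
%% 4 surviving nodes, TargetLevel = 25. A node reporting 31 copies becomes a
%% donor with surplus 6, and donate_fold/2 below keeps donating until every
%% surplus is consumed, the move limit is reached, or the fold completes.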
donate_fold(_Shard, #gacc{limit = 0, moves = Moves}) ->
throw({complete, Moves});
donate_fold(#shard{node = Node} = Shard, Acc0) ->
#gacc{
donors = Donors,
targets = Nodes,
moves = Moves,
limit = DC,
target_level = TargetLevel
} = Acc0,
Zone = mem3:node_info(Node, <<"zone">>),
Shards = apply_shard_moves(mem3:shards(Shard#shard.dbname), Moves),
InZone = filter_map_by_zone(shards_by_node(Shards, Nodes), Zone),
SortedByCount = lists:sort(smallest_first(Moves), InZone),
SourceCount = get_shard_count(Node, SortedByCount),
GlobalShardCounts = shard_count_by_node(Moves),
TotalSource = get_global_shard_count(Node, GlobalShardCounts),
Fun = fun({CandidateNode, OwnShards}) ->
HasRange = lists:keymember(Shard#shard.range, #shard.range, OwnShards),
TargetCount = get_shard_count(CandidateNode, SortedByCount),
TotalTarget = get_global_shard_count(CandidateNode, GlobalShardCounts),
if
CandidateNode =:= Node ->
% Can't move a shard to ourselves
true;
HasRange ->
% The candidate already has this shard
true;
TargetCount >= SourceCount ->
% Executing this move would create a local imbalance in the DB
true;
TotalTarget > TargetLevel ->
% The candidate has already exceeded the target level
true;
(TotalSource - TotalTarget) < 2 ->
% Donating here is wasted work
true;
true ->
false
end
end,
case {lists:member(Shard, Shards), lists:keymember(Node, 1, Donors)} of
{true, true} ->
case lists:dropwhile(Fun, SortedByCount) of
[{Target, _} | _] ->
NewMoves = [{move, Shard, Target} | Moves],
print({move, Shard, Target}),
Acc0#gacc{
moves = NewMoves,
limit = DC - 1,
donors = update_donors(Node, Donors, NewMoves)
};
[] ->
Acc0
end;
_ ->
Acc0
end;
donate_fold(_Shard, Acc) ->
Acc.
update_donors(Node, Donors, Moves) ->
NewDonors = case orddict:fetch(Node, Donors) of
1 ->
orddict:erase(Node, Donors);
X ->
orddict:store(Node, X-1, Donors)
end,
case orddict:size(NewDonors) of
0 ->
throw({complete, Moves});
_ ->
NewDonors
end.
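%% E.g., with Donors = [{n1,2},{n2,1}], a move off n2 erases its entry,
%% leaving [{n1,2}]; when the last donor entry is erased the plan is done
%% and the throw above short-circuits the fold with the accumulated moves.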
get_shard_count(AtomKey, ShardsByNode) when is_atom(AtomKey) ->
length(couch_util:get_value(AtomKey, ShardsByNode, [])).
get_global_shard_count(Node, Counts) when is_atom(Node) ->
get_global_shard_count(couch_util:to_binary(Node), Counts);
get_global_shard_count(Node, Counts) when is_binary(Node) ->
couch_util:get_value(Node, Counts, 0).
compute_moves(IdealZoning, IdealZoning, _Copies, OtherMoves) ->
OtherMoves;
compute_moves(IdealZoning, ActualZoning, Copies, OtherMoves) ->
{Donor, Recipient} = find_donor_and_recipient(IdealZoning, ActualZoning),
pair_up(Donor, Recipient, Copies, OtherMoves).
find_donor_and_recipient(IdealZoning, ActualZoning) ->
lists:foldl(fun({Zone, IdealCopies}, {D,R}) ->
case couch_util:get_value(Zone, ActualZoning, 0) of
Actual when Actual < IdealCopies ->
{D, Zone};
Actual when Actual > IdealCopies ->
{Zone, R};
_ ->
{D, R}
end
end, {nil, nil}, IdealZoning).
pair_up(_, nil, _Copies, Moves) ->
Moves;
pair_up(nil, Recipient, Copies, Moves) ->
% We've got an insufficient replica level -- a recipient but no donor
Candidate = hd(Copies),
TargetNode = choose_node_in_target_zone(Candidate, Recipient, Moves),
print({copy, Candidate, TargetNode}),
[{copy, Candidate, TargetNode}|Moves];
pair_up(Donor, Recipient, Copies, Moves) ->
Candidate = hd(lists:filter(fun(#shard{node = Node}) ->
mem3:node_info(Node, <<"zone">>) =:= Donor
end, Copies)),
TargetNode = choose_node_in_target_zone(Candidate, Recipient, Moves),
print({move, Candidate, TargetNode}),
[{move, Candidate, TargetNode}|Moves].
choose_node_in_target_zone(#shard{dbname = DbName} = Candidate, Take, Moves) ->
TargetNodes = allowed_nodes(fun(Zone) -> Zone =:= Take end),
CurrentShards = apply_shard_moves(mem3:shards(DbName), Moves),
ByTargetNode = shards_by_node(CurrentShards, TargetNodes),
InZone = filter_map_by_zone(ByTargetNode, Take),
{TargetNode, _} = find_home(Candidate, InZone, Moves),
TargetNode.
-spec find_homes([#shard{}], [{node(), [#shard{}]}], [{atom(), #shard{}, node()}]) ->
[{atom(), #shard{}, node()}].
find_homes([], _ShardsByTargetNode, Result) ->
Result;
find_homes([#shard{node = Node0} = Shard | Rest], ShardsByNode, PrevMoves) ->
InZone = filter_map_by_zone(ShardsByNode, mem3:node_info(Node0, <<"zone">>)),
{TargetNode, NewMap} = find_home(Shard, InZone, PrevMoves),
print({move, Shard, TargetNode}),
MergedMap = orddict:merge(fun(_, V1, _) -> V1 end, NewMap, ShardsByNode),
find_homes(Rest, MergedMap, [{move, Shard, TargetNode} | PrevMoves]).
find_home(Shard, ShardsByNode, PrevMoves) ->
SortedByCount = lists:sort(smallest_first(PrevMoves), ShardsByNode),
% Ensure that the target node is not already an owner of this range
[{TargetNode, _} | _] = lists:dropwhile(fun({_Node, Shards}) ->
lists:keymember(Shard#shard.range, #shard.range, Shards)
end, SortedByCount),
NewMap = orddict:append(TargetNode, Shard#shard{node=TargetNode}, ShardsByNode),
{TargetNode, NewMap}.
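%% E.g., if the least-loaded node in SortedByCount already hosts a copy of
%% the candidate's range, dropwhile skips it and the shard lands on the next
%% node; the returned map already includes the tentative placement so that
%% subsequent placements account for it.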
rebalance2(_TargetLevel, Shards, _Nodes, [], Moves) ->
{Shards, Moves};
rebalance2(TargetLevel, Shards, Nodes, [Node | Rest], Moves) ->
ShardsForNode = [S || S <- Shards, S#shard.node =:= Node],
CurrentLevel = length(ShardsForNode),
case CurrentLevel < TargetLevel of
true ->
case victim(TargetLevel, Shards, Nodes, Node, Moves) of
{ok, Victim} ->
print({move, Victim, Node}),
rebalance2(TargetLevel,
replace(Victim, Victim#shard{node=Node}, Shards),
Nodes, [Node|Rest], [{move, Victim, Node}|Moves]);
false ->
rebalance2(TargetLevel, Shards, Nodes, Rest, Moves)
end;
false ->
rebalance2(TargetLevel, Shards, Nodes, Rest, Moves)
end.
victim(TargetLevel, Shards, Nodes, TargetNode, Moves) ->
% Build a map of shards owned by nodes in the target zone.
TargetZone = mem3:node_info(TargetNode, <<"zone">>),
ShardsByNode0 = filter_map_by_zone(shards_by_node(Shards, Nodes), TargetZone),
% Filter nodes that would drop below target level (including TargetNode).
ShardsByNode1 = [{N, SS} || {N, SS} <- ShardsByNode0, length(SS) > TargetLevel],
% Prefer to take from a node with more shards than others.
ShardsByNode2 = lists:sort(largest_first(Moves), ShardsByNode1),
% Don't take a shard for a range already hosted by the target.
TargetRanges = [S#shard.range || S <- Shards, S#shard.node =:= TargetNode],
ShardsByNode3 = lists:map(fun({N, SS}) ->
{N, [S || S <- SS, not lists:member(S#shard.range, TargetRanges)]}
end, ShardsByNode2),
% Find the first node that still owns a candidate shard.
case lists:dropwhile(fun({_, SS}) -> SS =:= [] end, ShardsByNode3) of
[] ->
false;
[{_SourceNode, [Victim | _OtherShards]} | _] ->
{ok, Victim}
end.
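%% E.g., with TargetLevel = 2 and zone peers n1 (3 copies) and n2 (4
%% copies), both remain candidate donors (> 2); n2 sorts first, and its
%% first copy whose range the target node does not already host becomes
%% the victim.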
largest_first(PrevMoves) ->
% use the global shard count on each node to break the tie
Global = shard_count_by_node(PrevMoves),
fun(A, B) -> sort_by_count(A, B, Global) >= 0 end.
smallest_first(PrevMoves) ->
% use the global shard count on each node to break the tie
Global = shard_count_by_node(PrevMoves),
fun(A, B) -> sort_by_count(A, B, Global) =< 0 end.
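%% Both comparators order nodes by their shard count within the current db
%% and use the cluster-wide counts from shard_count_by_node/1 to break ties,
%% e.g. two nodes each holding 2 copies here are ordered by global totals.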
sort_by_count({NodeA, SA}, {NodeB, SB}, Global) when length(SA) =:= length(SB) ->
CountA = get_global_shard_count(NodeA, Global),
CountB = get_global_shard_count(NodeB, Global),
cmp(CountA, CountB);
sort_by_count({_, A}, {_, B}, _) ->
cmp(length(A), length(B)).
cmp(A, B) when A < B ->
-1;
cmp(A, B) when A > B ->
1;
cmp(_, _) ->
0.
replace(A, B, List) ->
replace(A, B, List, []).
replace(_A, _B, [], Acc) ->
Acc;
replace(A, B, [A | Rest], Acc) ->
replace(A, B, Rest, [B | Acc]);
replace(A, B, [C | Rest], Acc) ->
replace(A, B, Rest, [C | Acc]).
%% @doc Takes a list of copy/move operations and applies them to the current
%% set of shards. Any moves that reference a shard not in the current set
%% will be ignored.
apply_shard_moves(Shards, []) ->
Shards;
apply_shard_moves(Shards, [{move, Shard, Node}| Rest]) ->
NewShards = replace(Shard, Shard#shard{node = Node}, Shards, []),
apply_shard_moves(NewShards, Rest);
apply_shard_moves(Shards, [{copy, Shard, Node}| Rest]) ->
case lists:member(Shard, Shards) of
true ->
apply_shard_moves([Shard#shard{node = Node} | Shards], Rest);
false ->
apply_shard_moves(Shards, Rest)
end.
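%% E.g., applying [{move, S1, n2}] to [S1, S2] yields S1 re-homed to n2
%% (list order is not preserved by replace/4); a {copy, S, N} whose source
%% S is absent from the current set is skipped, per the @doc above.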
allowed_nodes(Fun) ->
lists:filter(fun(Node) ->
Fun(mem3:node_info(Node, <<"zone">>))
end, surviving_nodes()).
surviving_nodes() ->
lists:filter(fun(Node) ->
mem3:node_info(Node, <<"decom">>) =/= true
end, mem3:nodes()).
shards_by_node(Shards, Nodes) ->
% Ensure every target node is present in the orddict
ShardsByNode0 = orddict:from_list([{N,[]} || N <- Nodes]),
lists:foldl(fun(#shard{node = Node} = Shard, Acc) ->
orddict:append(Node, Shard, Acc)
end, ShardsByNode0, Shards).
filter_map_by_zone(ShardsByNode, Zone) ->
Result = orddict:filter(fun(Node, _Shards) ->
mem3:node_info(Node, <<"zone">>) =:= Zone
end, ShardsByNode),
if Result =:= [] ->
erlang:error({empty_zone, Zone});
true ->
Result
end.
shards_by_range(Shards) ->
lists:foldl(fun(#shard{range = Range} = Shard, OD) ->
orddict:append(Range, Shard, OD)
end, orddict:new(), Shards).
computed_zoning(Shards) ->
lists:foldl(fun(#shard{node = Node}, OD) ->
orddict:update_counter(mem3:node_info(Node, <<"zone">>), 1, OD)
end, orddict:new(), Shards).
shard_count_by_node(PrevMoves) ->
Map0 = case erlang:get(shard_count_by_node) of
undefined ->
try shard_count_view() catch _:_ -> [] end;
{T0, Map} ->
case timer:now_diff(os:timestamp(), T0) div 1000 of
Delta when Delta < 5000 ->
Map;
_Else ->
try shard_count_view() catch _:_ -> [] end
end
end,
% Incorporate the operations we've already scheduled into the total counts
lists:foldl(fun
({copy, _, TargetNode}, OD0) ->
orddict:update_counter(couch_util:to_binary(TargetNode), 1, OD0);
({move, #shard{node = SourceNode}, TargetNode}, OD0) ->
OD1 = orddict:update_counter(couch_util:to_binary(SourceNode), -1, OD0),
orddict:update_counter(couch_util:to_binary(TargetNode), 1, OD1)
end, orddict:from_list(Map0), PrevMoves).
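%% The raw view counts are cached in the process dictionary and reused for
%% 5 seconds (timer:now_diff/2 is in microseconds, so Delta is in
%% milliseconds); any scheduled moves are then replayed on top so the
%% totals reflect work already planned in this session.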
shard_count_view() ->
%% TODO rewrite CouchDB's internal view API. Wow!
{ok, Db} = couch_db:open(<<"dbs">>, []),
DDocId = <<"_design/rebalance">>,
Fold = fun view_cb/2,
Args = [{group_level, exact}],
{ok, Map} = couch_mrview:query_view(
Db, DDocId, <<"count_by_node">>, Args, Fold, []),
erlang:put(shard_count_by_node, {os:timestamp(), Map}),
Map.
view_cb({meta, _}, Acc) ->
{ok, Acc};
view_cb({row, Row}, Acc) ->
{key, Node} = lists:keyfind(key, 1, Row),
{value, Count} = lists:keyfind(value, 1, Row),
{ok, [{Node, Count} | Acc]};
view_cb(complete, Acc) ->
{ok, lists:reverse(Acc)}.
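%% Each {row, Row} arrives as a proplist, e.g. [{key, NodeNameBin},
%% {value, Count}, ...], and the fold accumulates {NodeNameBin, Count}
%% pairs in view order for shard_count_by_node/1.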
print({Op, Shard, TargetNode} = Operation) ->
{match, [SourceId, Cluster]} = re:run(
atom_to_list(Shard#shard.node),
"dbcore@db(?<node>[0-9]+)\.(?<cluster>[a-z0-9]+)\.cloudant.net",
[{capture, all_but_first, binary}]
),
{match, [TargetId, Cluster]} = re:run(
atom_to_list(TargetNode),
"dbcore@db(?<node>[0-9]+)\.(?<cluster>[a-z0-9]+)\.cloudant.net",
[{capture, all_but_first, binary}]
),
{match, [Range, Account, DbName]} = re:run(
Shard#shard.name,
"shards/(?<range>[0-9a-f\-]+)/(?<account>.+)/(?<dbname>[a-z\\_][a-z0-9\\_\\$()\\+\\-\\/]+)\.[0-9]{8}",
[{capture, all_but_first, binary}]
),
OpName = case Op of move -> move2; _ -> Op end,
case get(fd) of
undefined ->
io:format("clou shard ~s ~s ~s ~s ~s ~s ~s~n", [OpName,
Cluster, Account, DbName, Range, SourceId, TargetId]);
FD ->
io:format(FD, "clou shard ~s ~s ~s ~s ~s ~s ~s~n", [OpName,
Cluster, Account, DbName, Range, SourceId, TargetId])
end,
Operation;
print(Operations) when is_list(Operations) ->
[print(Operation) || Operation <- Operations].
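%% E.g. (hypothetical names matching the patterns above): moving
%% shards/00000000-1fffffff/acme/invoices.20130101 from
%% dbcore@db1.testy001.cloudant.net to dbcore@db2.testy001.cloudant.net
%% emits:
%%   clou shard move2 testy001 acme invoices 00000000-1fffffff 1 2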
apply_to_cluster(UserFun, Limit) ->
try mem3_shards:fold(cluster_fold_fun(UserFun, Limit), {nil, []}) of
{_LastDb, Moves} ->
Moves
catch
{complete, Moves} ->
Moves
end.
cluster_fold_fun(UserFun, Limit) ->
fun
(#shard{dbname = DbName}, {DbName, PrevMoves}) ->
{DbName, PrevMoves};
(#shard{dbname = DbName}, {_PrevName, PrevMoves}) ->
Moves = UserFun(DbName, PrevMoves),
check_limit(Moves, Limit),
{DbName, Moves}
end.
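%% mem3_shards:fold/2 visits shards grouped by database, so the first
%% clause skips the remaining copies of a db that was just processed;
%% e.g. a db with 12 shard copies invokes UserFun once, not 12 times.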
check_limit(Moves, Limit) when length(Moves) >= Limit ->
throw({complete, Moves});
check_limit(_, _) ->
ok.