blob: 87dba591293514fb4fd6cb6fa1b65a4bb8626eec [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.dht.tokenallocator;
import java.util.*;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.Token;
/**
* A Replication Aware allocator for tokens, that attempts to ensure an even distribution of ownership across
* the known cluster for the provided replication strategy.
*
* A unit is shorthand for a "unit of ownership" which translates roughly to a node, or a disk on the node,
* a CPU on the node, or some other relevant unit of ownership. These units should be the lowest rung over which
* ownership needs to be evenly distributed. At the moment only nodes as a whole are treated as units, but that
* will change with the introduction of token ranges per disk.
*/
class ReplicationAwareTokenAllocator<Unit> extends TokenAllocatorBase<Unit>
{
// Tokens owned by each unit. Seeded in the constructor from the entries of sortedTokens and updated
// whenever this class adds tokens (addUnit) or removes a unit (removeUnit).
final Multimap<Unit, Token> unitToTokens;
// Value of strategy.replicas() captured at construction time.
final int replicas;
ReplicationAwareTokenAllocator(NavigableMap<Token, Unit> sortedTokens, ReplicationStrategy<Unit> strategy, IPartitioner partitioner)
{
super(sortedTokens, strategy, partitioner);
unitToTokens = HashMultimap.create();
for (Map.Entry<Token, Unit> en : sortedTokens.entrySet())
unitToTokens.put(en.getValue(), en.getKey());
this.replicas = strategy.replicas();
}
/**
 * @return the replica count that was read from the replication strategy when this allocator was constructed
 */
public int getReplicas()
{
    return this.replicas;
}
/**
 * Picks {@code numTokens} tokens for {@code newUnit}, records them in {@code sortedTokens} and
 * {@code unitToTokens}, and returns them.
 *
 * Random tokens are generated instead of running the heuristic when there are fewer units than replicas,
 * when more tokens are requested than the ring currently holds, or when fewer groups than replicas are
 * known. Otherwise one candidate is created at the midpoint of every existing token range, candidates are
 * ranked by their expected improvement, and the best ones are accepted one at a time, re-evaluating the
 * head of the queue before every pick.
 *
 * @param newUnit   the unit to add; must not own any tokens yet (checked by assertion)
 * @param numTokens how many tokens to allocate to the unit
 * @return the tokens assigned to {@code newUnit}
 */
public Collection<Token> addUnit(Unit newUnit, int numTokens)
{
    assert !unitToTokens.containsKey(newUnit);

    // Two cases where the heuristic is skipped up front:
    //  - fewer units than replicas: allocation does not matter, everything replicates everywhere;
    //  - more tokens requested than currently exist: some heuristics below can't deal with this, so use
    //    random tokens for now and let later allocations fix any problems this may cause.
    if (unitCount() < replicas || numTokens > sortedTokens.size())
        return generateRandomTokens(newUnit, numTokens);

    // ---- build the initial ring state ----
    final double targetOwnership = optimalTokenOwnership(numTokens);
    final Map<Object, GroupInfo> groups = Maps.newHashMap();
    final Map<Unit, UnitInfo<Unit>> unitInfos = createUnitInfos(groups);

    // At least 'replicas' groups are required for the allocation to be meaningful; fall back to random
    // tokens otherwise. This should only be reachable from the RATATest: StrategyAdapter should disallow
    // token allocation in this case as the algorithm cannot cover NetworkTopologyStrategy's behaviour.
    if (groups.size() < replicas)
        return generateRandomTokens(newUnit, numTokens);

    // The new unit starts with an idealised ownership; the strategy must already know about the unit.
    final UnitInfo<Unit> addedUnit = new UnitInfo<>(newUnit, numTokens * targetOwnership, groups, strategy);
    final TokenInfo<Unit> ring = createTokenInfos(unitInfos, addedUnit.group);
    addedUnit.tokenCount = numTokens;

    // ---- create and rank the candidates ----
    // Candidates are the midpoints between all existing tokens, linked in ring order.
    CandidateInfo<Unit> candidateRing = createCandidates(ring, addedUnit, targetOwnership);

    final PriorityQueue<Weighted<CandidateInfo<Unit>>> ranked = new PriorityQueue<>(sortedTokens.size());
    final double firstPickMult = 1.0 / numTokens;
    CandidateInfo<Unit> cursor = candidateRing;
    do
    {
        final double initialScore = evaluateImprovement(cursor, targetOwnership, firstPickMult);
        ranked.add(new Weighted<>(initialScore, cursor));
        cursor = cursor.next;
    }
    while (cursor != candidateRing);

    // ---- accept the best candidate, then re-rank, until enough tokens were chosen ----
    CandidateInfo<Unit> chosen = ranked.remove().value;
    int accepted = 1;
    while (true)
    {
        candidateRing = chosen.removeFrom(candidateRing);
        confirmCandidate(chosen);
        if (accepted == numTokens)
            break;

        final double unitMult = (accepted + 1.0) / numTokens;
        while (true)
        {
            // The score of the queue head may be out of date (e.g. several candidates may have been good
            // because they addressed the same problem), so evaluate it again before trusting it.
            chosen = ranked.remove().value;
            final double refreshedScore = evaluateImprovement(chosen, targetOwnership, unitMult);
            final Weighted<CandidateInfo<Unit>> runnerUp = ranked.peek();

            // Heuristic: being at least as good as the next queued entry is good enough. It does not give
            // the best results but works well and on average cuts search time by a factor of O(vnodes).
            final boolean goodEnough = runnerUp == null || refreshedScore >= runnerUp.weight;
            if (goodEnough)
                break;

            ranked.add(new Weighted<>(refreshedScore, chosen));
        }
        ++accepted;
    }

    return ImmutableList.copyOf(unitToTokens.get(newUnit));
}
/**
 * Assigns {@code numTokens} random tokens to {@code newUnit}. Tokens that are already present in
 * {@code sortedTokens} are skipped and another one is drawn. Every accepted token is recorded in both
 * {@code sortedTokens} and {@code unitToTokens}.
 *
 * @param newUnit   the unit receiving the tokens
 * @param numTokens number of distinct tokens to generate
 * @return the generated tokens
 */
private Collection<Token> generateRandomTokens(Unit newUnit, int numTokens)
{
    final Set<Token> generated = new HashSet<>(numTokens);
    while (generated.size() < numTokens)
    {
        final Token drawn = partitioner.getRandomToken();
        if (sortedTokens.containsKey(drawn))
            continue; // already taken, draw again

        generated.add(drawn);
        sortedTokens.put(drawn, newUnit);
        unitToTokens.put(newUnit, drawn);
    }
    return generated;
}
/**
 * Builds the token ring as a circular list of TokenInfo, one per entry of {@code sortedTokens} in the
 * map's iteration order, and then populates the replication data of every token, which also adjusts the
 * ownership of the owning UnitInfo.
 *
 * @param units        UnitInfo for every unit appearing in {@code sortedTokens}
 * @param newUnitGroup group of the unit that is being added
 * @return the first element of the circular list
 */
private TokenInfo<Unit> createTokenInfos(Map<Unit, UnitInfo<Unit>> units, GroupInfo newUnitGroup)
{
    // Link a TokenInfo for every existing token, each one after the previously created element.
    TokenInfo<Unit> head = null;
    TokenInfo<Unit> tail = null;
    for (Map.Entry<Token, Unit> assignment : sortedTokens.entrySet())
    {
        final UnitInfo<Unit> owner = units.get(assignment.getValue());
        final TokenInfo<Unit> created = new TokenInfo<>(assignment.getKey(), owner);
        head = created.insertAfter(head, tail);
        tail = created;
    }

    // One full turn around the ring to compute replication spans and unit ownership.
    TokenInfo<Unit> cursor = head;
    while (true)
    {
        populateTokenInfoAndAdjustUnit(cursor, newUnitGroup);
        cursor = cursor.next;
        if (cursor == head)
            break;
    }
    return head;
}
/**
 * Creates one candidate for every token of the ring, placed at the partitioner's midpoint between the
 * token's predecessor and the token itself, and links the candidates into a circular list in ring order.
 *
 * @param tokens                any element of the circular token list
 * @param newUnitInfo           the unit that would own an accepted candidate
 * @param initialTokenOwnership value every candidate's {@code replicatedOwnership} starts with
 * @return the first candidate created
 */
private CandidateInfo<Unit> createCandidates(TokenInfo<Unit> tokens, UnitInfo<Unit> newUnitInfo, double initialTokenOwnership)
{
    CandidateInfo<Unit> head = null;
    CandidateInfo<Unit> tail = null;
    TokenInfo<Unit> ringToken = tokens;
    do
    {
        final Token middle = partitioner.midpoint(ringToken.prev.token, ringToken.token);
        final CandidateInfo<Unit> created = new CandidateInfo<>(middle, ringToken, newUnitInfo);
        head = created.insertAfter(head, tail);
        created.replicatedOwnership = initialTokenOwnership;
        populateCandidate(created);

        tail = created;
        ringToken = ringToken.next;
    }
    while (ringToken != tokens);

    // Make the last candidate point back at the first one.
    tail.next = head;
    return head;
}
/**
 * Fills in the replication data of a candidate. The candidate's own group is passed as the group of the
 * new unit, since a candidate always belongs to the unit being added.
 */
private void populateCandidate(CandidateInfo<Unit> candidate)
{
    // Only finding replication start would do.
    final GroupInfo ownGroup = candidate.owningUnit.group;
    populateTokenInfo(candidate, ownGroup);
}
/**
 * Makes the selected candidate a real token of the ring: registers it in {@code sortedTokens} and
 * {@code unitToTokens}, links a new TokenInfo into the ring in front of {@code candidate.split}, and
 * refreshes the replication data of the following tokens and candidates that may be affected.
 *
 * This recomputes every affected replication span by walking it, rather than deriving the new
 * replicationStart / replicationThreshold from existing data in an O(1) case analysis like
 * evaluateImprovement does. That is acceptable because this method does not dominate processing time.
 */
private void confirmCandidate(CandidateInfo<Unit> candidate)
{
    final UnitInfo<Unit> owner = candidate.owningUnit;
    final Token acceptedToken = candidate.token;

    // Register the accepted token with the allocator's maps.
    sortedTokens.put(acceptedToken, owner.unit);
    unitToTokens.put(owner.unit, acceptedToken);

    // Link it into the ring right after the candidate's predecessor.
    final TokenInfo<Unit> predecessor = candidate.prevInRing();
    final TokenInfo<Unit> inserted = new TokenInfo<>(acceptedToken, owner);
    inserted.replicatedOwnership = candidate.replicatedOwnership;
    inserted.insertAfter(predecessor, predecessor); // List is not empty so the head cannot change.

    // Compute the replication data of the new token itself.
    populateTokenInfoAndAdjustUnit(inserted, owner.group);

    final ReplicationVisitor groupsSeen = new ReplicationVisitor();
    assert inserted.next == candidate.split;

    // Walk forward through the ring until enough distinct groups were met, refreshing the candidate
    // that follows each visited token and the token itself.
    TokenInfo<Unit> ringToken = inserted.next;
    CandidateInfo<Unit> followingCandidate = candidate;
    while (!groupsSeen.visitedAll())
    {
        followingCandidate = followingCandidate.next;
        populateCandidate(followingCandidate);

        // A token whose group was already seen cannot be affected by the insertion.
        if (groupsSeen.add(ringToken.owningUnit.group))
            populateTokenInfoAndAdjustUnit(ringToken, owner.group);

        ringToken = ringToken.next;
    }
    groupsSeen.clean();
}
/**
 * Computes and stores the {@code replicationStart} of a token together with its
 * {@code replicationThreshold}, which is chosen so that {@code findUpdatedReplicationStart} can quickly
 * identify changes in ownership.
 *
 * @param token        the token or candidate to populate
 * @param newUnitGroup group of the unit that is being added
 * @return the computed replication start
 */
private Token populateTokenInfo(BaseTokenInfo<Unit, ?> token, GroupInfo newUnitGroup)
{
    final GroupInfo ownGroup = token.owningUnit.group;
    final PopulateVisitor groupsSeen = new PopulateVisitor();

    // Replication start: the end of a token from the RF'th different group seen before the token.
    Token start;
    // Threshold: the end of a token from the (RF-1)'th different group seen before the token.
    Token threshold = token.token;
    GroupInfo lastGroup;

    // Walk backwards through the ring from the token's predecessor.
    TokenInfo<Unit> walker = token.prevInRing();
    while (true)
    {
        start = walker.token;
        lastGroup = walker.owningUnit.group;
        if (groupsSeen.add(lastGroup)) // tokens of groups that were already seen are stepped over
        {
            if (groupsSeen.visitedAll())
                break;
            threshold = start;
            // Another token of the same group precedes us within the replication range of the ring,
            // so this is where our replication range begins.
            if (lastGroup == ownGroup)
                break;
        }
        walker = walker.prev;
    }

    if (newUnitGroup == ownGroup)
    {
        // A new token is always a boundary (as long as it is closer than the replication start).
        threshold = token.token;
    }
    else if (newUnitGroup != lastGroup && groupsSeen.seen(newUnitGroup))
    {
        // The new group already occurs in the replication span before the last group seen,
        // so this token cannot be affected.
        threshold = start;
    }

    groupsSeen.clean();
    token.replicationThreshold = threshold;
    token.replicationStart = start;
    return start;
}
/**
 * Recomputes the replication data of {@code populate}, stores the size of the span from its replication
 * start to the token as its replicated ownership, and applies the resulting change to the ownership of
 * the owning unit.
 *
 * @param populate     the ring token to refresh
 * @param newUnitGroup group of the unit that is being added
 */
private void populateTokenInfoAndAdjustUnit(TokenInfo<Unit> populate, GroupInfo newUnitGroup)
{
    final Token spanStart = populateTokenInfo(populate, newUnitGroup);
    final double updated = spanStart.size(populate.token);
    final double delta = updated - populate.replicatedOwnership;

    populate.replicatedOwnership = updated;
    populate.owningUnit.ownership += delta;
}
/**
 * Scores a candidate: the negated sum of the token-level and unit-level variance changes that inserting
 * the candidate into the ring would cause. Nothing is inserted; the unit markers used for tracking are
 * reset by {@code calculateUnitChange} and the group markers by {@code clean()} before returning.
 *
 * @param candidate         the candidate to evaluate
 * @param optTokenOwnership the optimal ownership of a single token
 * @param newUnitMult       multiplier applied to the variance change of the candidate's own unit
 * @return {@code -(tokenChange + unitChange)}
 */
private double evaluateImprovement(CandidateInfo<Unit> candidate, double optTokenOwnership, double newUnitMult)
{
    final UnitInfo<Unit> newUnit = candidate.owningUnit;
    final Token insertedAt = candidate.token;

    // Chain of the units touched by the insertion, used to qualify the change of unit ownership.
    // A unit may be affected more than once.
    final UnitAdjustmentTracker<Unit> tracker = new UnitAdjustmentTracker<>(newUnit);

    // Start with the change in ownership of the candidate itself.
    double tokenVariance = 0;
    tokenVariance += applyOwnershipAdjustment(candidate, newUnit, candidate.replicationStart, insertedAt, optTokenOwnership, tracker);

    // Then every vnode that replicates the candidate or the token it splits.
    final ReplicationVisitor groupsSeen = new ReplicationVisitor();
    TokenInfo<Unit> ringToken = candidate.split;
    while (!groupsSeen.visitedAll())
    {
        final UnitInfo<Unit> owner = ringToken.owningUnit;
        // A token whose group was already seen cannot be affected.
        if (groupsSeen.add(owner.group))
        {
            final Token spanEnd = ringToken.token;
            final Token spanStart = findUpdatedReplicationStart(ringToken, candidate);
            tokenVariance += applyOwnershipAdjustment(ringToken, owner, spanStart, spanEnd, optTokenOwnership, tracker);
        }
        ringToken = ringToken.next;
    }
    groupsSeen.clean();

    final double unitVariance = tracker.calculateUnitChange(newUnitMult, optTokenOwnership);
    return -(tokenVariance + unitVariance);
}
/**
 * Determines where the replication span of {@code curr} would start if {@code candidate} were inserted
 * into the ring: whichever of the token's replication threshold and the candidate's token forms the
 * bigger range up to {@code curr}'s token.
 */
private Token findUpdatedReplicationStart(TokenInfo<Unit> curr, CandidateInfo<Unit> candidate)
{
    final Token threshold = curr.replicationThreshold;
    final Token inserted = candidate.token;
    return furtherStartToken(threshold, inserted, curr.token);
}
/**
 * Registers with the tracker the ownership change of {@code currUnit} that results from {@code curr}'s
 * replication span becoming {@code replicationStart}..{@code replicationEnd}, and returns the matching
 * change of token variance.
 *
 * @param curr              token or candidate whose span changes; its stored ownership is not modified
 * @param currUnit          unit owning {@code curr}; must have a positive token count (asserted)
 * @param replicationStart  new start of the replication span
 * @param replicationEnd    end of the replication span
 * @param optTokenOwnership the optimal ownership of a single token
 * @param unitTracker       tracker receiving the unit-level ownership difference
 * @return difference of squared deviations from the optimum (new minus old), divided by the squared
 *         token count of the unit
 */
private double applyOwnershipAdjustment(BaseTokenInfo<Unit, ?> curr, UnitInfo<Unit> currUnit,
                                        Token replicationStart, Token replicationEnd,
                                        double optTokenOwnership, UnitAdjustmentTracker<Unit> unitTracker)
{
    final double before = curr.replicatedOwnership;
    final double after = replicationStart.size(replicationEnd);
    final double tokenCount = currUnit.tokenCount;
    assert tokenCount > 0;

    unitTracker.add(currUnit, after - before);

    final double deviationAfter = sq(after - optTokenOwnership);
    final double deviationBefore = sq(before - optTokenOwnership);
    return (deviationAfter - deviationBefore) / sq(tokenCount);
}
/**
 * Tracks changes of unit ownership. Affected units are linked into a chain through their
 * {@code prevUsed} field while their {@code adjustedOwnership} field accumulates the changes in token
 * ownership seen so far.
 *
 * The chain ends with an element that points to itself. That element must be given to the constructor
 * and must also be the first unit {@code add} is called with; when the variance change is calculated a
 * separate multiplier is applied to it (used to permit more freedom in choosing the first tokens of a
 * unit).
 */
private static class UnitAdjustmentTracker<Unit>
{
    // Most recently linked unit, i.e. the head of the chain.
    UnitInfo<Unit> unitsChain;

    UnitAdjustmentTracker(UnitInfo<Unit> newUnit)
    {
        this.unitsChain = newUnit;
    }

    /**
     * Adds {@code diff} to the adjusted ownership of {@code currUnit}, linking the unit into the chain
     * the first time it is seen.
     */
    void add(UnitInfo<Unit> currUnit, double diff)
    {
        if (currUnit.prevUsed != null)
        {
            // Already part of the chain: just accumulate.
            currUnit.adjustedOwnership += diff;
            return;
        }

        // First sighting: start from the unit's current ownership and make it the new chain head.
        // For the very first call this links the new unit to itself, forming the chain terminator.
        assert unitsChain.prevUsed != null || currUnit == unitsChain;
        currUnit.adjustedOwnership = currUnit.ownership + diff;
        currUnit.prevUsed = unitsChain;
        unitsChain = currUnit;
    }

    /**
     * Walks the chain, sums the per-unit variance changes and clears every unit's {@code prevUsed}
     * link on the way. The change of the terminating (self-linked) unit is scaled by
     * {@code newUnitMult}.
     *
     * @return the accumulated unit-level variance change
     */
    double calculateUnitChange(double newUnitMult, double optTokenOwnership)
    {
        double total = 0;
        UnitInfo<Unit> link = this.unitsChain;
        while (true)
        {
            final double adjusted = link.adjustedOwnership;
            final double current = link.ownership;
            final double tokenCount = link.tokenCount;
            final double diff = (sq(adjusted / tokenCount - optTokenOwnership) - sq(current / tokenCount - optTokenOwnership));

            final UnitInfo<Unit> previous = link.prevUsed;
            link.prevUsed = null;

            if (link == previous)
            {
                // Self-linked terminator: apply the separate multiplier and stop.
                total += diff * newUnitMult;
                break;
            }
            total += diff;
            link = previous;
        }
        // The chain is reduced back to its terminator.
        this.unitsChain = link;
        return total;
    }
}
/**
 * Helper for marking and unmarking a chain of visited groups. The mark is a per-group link field chosen
 * by the subclass; visited groups are chained through it, ending at {@code GroupInfo.TERMINATOR}.
 *
 * Note that {@code clean()} removes the marks but leaves the {@code seen} counter untouched, so an
 * instance is meant for a single traversal.
 */
private abstract class GroupVisitor
{
    // Head of the chain of visited groups.
    GroupInfo groupChain = GroupInfo.TERMINATOR;
    // Number of distinct groups visited so far.
    int seen = 0;

    /** Reads the link field of {@code group}; null when the group is not marked. */
    abstract GroupInfo prevSeen(GroupInfo group);

    /** Writes the link field of {@code group}. */
    abstract void setPrevSeen(GroupInfo group, GroupInfo prevSeen);

    /**
     * Marks {@code group} as visited.
     *
     * @return true iff this is the first time the group is visited
     */
    boolean add(GroupInfo group)
    {
        final boolean firstVisit = prevSeen(group) == null;
        if (firstVisit)
        {
            seen++;
            setPrevSeen(group, groupChain);
            groupChain = group;
        }
        return firstVisit;
    }

    /** @return true once at least {@code replicas} distinct groups have been visited */
    boolean visitedAll()
    {
        return seen >= replicas;
    }

    /** @return true if {@code group} is currently marked */
    boolean seen(GroupInfo group)
    {
        return prevSeen(group) != null;
    }

    /** Removes the marks from all chained groups and empties the chain. */
    void clean()
    {
        for (GroupInfo link = this.groupChain; link != GroupInfo.TERMINATOR; )
        {
            final GroupInfo previous = prevSeen(link);
            setPrevSeen(link, null);
            link = previous;
        }
        this.groupChain = GroupInfo.TERMINATOR;
    }
}
private class ReplicationVisitor extends GroupVisitor
{
GroupInfo prevSeen(GroupInfo group)
{
return group.prevSeen;
}
void setPrevSeen(GroupInfo group, GroupInfo prevSeen)
{
group.prevSeen = prevSeen;
}
}
private class PopulateVisitor extends GroupVisitor
{
GroupInfo prevSeen(GroupInfo group)
{
return group.prevPopulate;
}
void setPrevSeen(GroupInfo group, GroupInfo prevSeen)
{
group.prevPopulate = prevSeen;
}
}
/**
 * @param tokensToAdd number of tokens about to be added to the ring
 * @return {@code replicas} divided by the number of tokens the ring will hold after the addition
 */
private double optimalTokenOwnership(int tokensToAdd)
{
    final int tokensAfterAddition = sortedTokens.size() + tokensToAdd;
    return (double) replicas / tokensAfterAddition;
}
/**
 * Picks from {@code t1} and {@code t2} the token that forms the bigger range with {@code towards} as the
 * upper bound, taking wrapping into account.
 * Unlike Token.size(), a token equal to {@code towards} is taken to mean "same as" rather than covering
 * the whole range, so the other token is returned in that case.
 */
private static Token furtherStartToken(Token t1, Token t2, Token towards)
{
    if (t1.equals(towards))
        return t2;
    if (t2.equals(towards))
        return t1;

    final double spanFromFirst = t1.size(towards);
    final double spanFromSecond = t2.size(towards);
    if (spanFromFirst > spanFromSecond)
        return t1;
    return t2;
}
/**
 * @return {@code d} multiplied by itself
 */
private static double sq(double d)
{
    final double squared = d * d;
    return squared;
}
/**
 * For testing: removes the given unit together with all of its tokens, keeping {@code unitToTokens} and
 * {@code sortedTokens} consistent with each other.
 */
void removeUnit(Unit n)
{
    final Collection<Token> owned = unitToTokens.removeAll(n);
    for (Token token : owned)
        sortedTokens.remove(token);
}
/**
 * @return the number of distinct units that currently own at least one token
 */
public int unitCount()
{
    return unitToTokens.keySet().size();
}
/**
 * @return the simple name of the runtime class
 */
@Override
public String toString()
{
    final Class<?> runtimeClass = getClass();
    return runtimeClass.getSimpleName();
}
/**
* TokenInfo about candidate new tokens/vnodes.
*/
private static class CandidateInfo<Unit> extends BaseTokenInfo<Unit, CandidateInfo<Unit>>
{
// directly preceding token in the current token ring
final TokenInfo<Unit> split;
public CandidateInfo(Token token, TokenInfo<Unit> split, UnitInfo<Unit> owningUnit)
{
super(token, owningUnit);
this.split = split;
}
TokenInfo<Unit> prevInRing()
{
return split.prev;
}
}
/**
 * Debugging aid: prints one line per element of the given list to standard output, following the
 * {@code next} links until the list wraps around to the starting element or a null link is reached.
 *
 * @param lead   prefix printed at the start of every line
 * @param tokens element to start from
 */
static void dumpTokens(String lead, BaseTokenInfo<?, ?> tokens)
{
    BaseTokenInfo<?, ?> cursor = tokens;
    while (true)
    {
        System.out.format("%s%s: rs %s rt %s size %.2e%n", lead, cursor, cursor.replicationStart, cursor.replicationThreshold, cursor.replicatedOwnership);
        cursor = cursor.next;
        if (cursor == null || cursor == tokens)
            break;
    }
}
}