blob: e3e81dc1163f95772385c0c7dee5027fc84d5539 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.dht.tokenallocator;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.commons.math3.stat.descriptive.SummaryStatistics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.locator.AbstractReplicationStrategy;
import org.apache.cassandra.locator.IEndpointSnitch;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.locator.NetworkTopologyStrategy;
import org.apache.cassandra.locator.SimpleStrategy;
import org.apache.cassandra.locator.TokenMetadata;
import org.apache.cassandra.locator.TokenMetadata.Topology;
public class TokenAllocation
{
public static final double WARN_STDEV_GROWTH = 0.05;
private static final Logger logger = LoggerFactory.getLogger(TokenAllocation.class);
final TokenMetadata tokenMetadata;
final AbstractReplicationStrategy replicationStrategy;
final int numTokens;
final Map<String, Map<String, StrategyAdapter>> strategyByRackDc = new HashMap<>();
private TokenAllocation(TokenMetadata tokenMetadata, AbstractReplicationStrategy replicationStrategy, int numTokens)
{
this.tokenMetadata = tokenMetadata.cloneOnlyTokenMap();
this.replicationStrategy = replicationStrategy;
this.numTokens = numTokens;
}
public static Collection<Token> allocateTokens(final TokenMetadata tokenMetadata,
final AbstractReplicationStrategy rs,
final InetAddressAndPort endpoint,
int numTokens)
{
return create(tokenMetadata, rs, numTokens).allocate(endpoint);
}
public static Collection<Token> allocateTokens(final TokenMetadata tokenMetadata,
final int replicas,
final InetAddressAndPort endpoint,
int numTokens)
{
return create(DatabaseDescriptor.getEndpointSnitch(), tokenMetadata, replicas, numTokens).allocate(endpoint);
}
static TokenAllocation create(IEndpointSnitch snitch, TokenMetadata tokenMetadata, int replicas, int numTokens)
{
// We create a fake NTS replication strategy with the specified RF in the local DC
HashMap<String, String> options = new HashMap<>();
options.put(snitch.getLocalDatacenter(), Integer.toString(replicas));
NetworkTopologyStrategy fakeReplicationStrategy = new NetworkTopologyStrategy(null, tokenMetadata, snitch, options);
TokenAllocation allocator = new TokenAllocation(tokenMetadata, fakeReplicationStrategy, numTokens);
return allocator;
}
static TokenAllocation create(TokenMetadata tokenMetadata, AbstractReplicationStrategy rs, int numTokens)
{
return new TokenAllocation(tokenMetadata, rs, numTokens);
}
Collection<Token> allocate(InetAddressAndPort endpoint)
{
StrategyAdapter strategy = getOrCreateStrategy(endpoint);
Collection<Token> tokens = strategy.createAllocator().addUnit(endpoint, numTokens);
tokens = strategy.adjustForCrossDatacenterClashes(tokens);
SummaryStatistics os = strategy.replicatedOwnershipStats();
tokenMetadata.updateNormalTokens(tokens, endpoint);
SummaryStatistics ns = strategy.replicatedOwnershipStats();
logger.info("Selected tokens {}", tokens);
logger.debug("Replicated node load in datacenter before allocation {}", statToString(os));
logger.debug("Replicated node load in datacenter after allocation {}", statToString(ns));
double stdDevGrowth = ns.getStandardDeviation() - os.getStandardDeviation();
if (stdDevGrowth > TokenAllocation.WARN_STDEV_GROWTH)
{
logger.warn(String.format("Growth of %.2f%% in token ownership standard deviation after allocation above warning threshold of %d%%",
stdDevGrowth * 100, (int)(TokenAllocation.WARN_STDEV_GROWTH * 100)));
}
return tokens;
}
static String statToString(SummaryStatistics stat)
{
return String.format("max %.2f min %.2f stddev %.4f", stat.getMax() / stat.getMean(), stat.getMin() / stat.getMean(), stat.getStandardDeviation());
}
SummaryStatistics getAllocationRingOwnership(String datacenter, String rack)
{
return getOrCreateStrategy(datacenter, rack).replicatedOwnershipStats();
}
SummaryStatistics getAllocationRingOwnership(InetAddressAndPort endpoint)
{
return getOrCreateStrategy(endpoint).replicatedOwnershipStats();
}
abstract class StrategyAdapter implements ReplicationStrategy<InetAddressAndPort>
{
// return true iff the provided endpoint occurs in the same virtual token-ring we are allocating for
// i.e. the set of the nodes that share ownership with the node we are allocating
// alternatively: return false if the endpoint's ownership is independent of the node we are allocating tokens for
abstract boolean inAllocationRing(InetAddressAndPort other);
final TokenAllocator<InetAddressAndPort> createAllocator()
{
NavigableMap<Token, InetAddressAndPort> sortedTokens = new TreeMap<>();
for (Map.Entry<Token, InetAddressAndPort> en : tokenMetadata.getNormalAndBootstrappingTokenToEndpointMap().entrySet())
{
if (inAllocationRing(en.getValue()))
sortedTokens.put(en.getKey(), en.getValue());
}
return TokenAllocatorFactory.createTokenAllocator(sortedTokens, this, tokenMetadata.partitioner);
}
final Collection<Token> adjustForCrossDatacenterClashes(Collection<Token> tokens)
{
List<Token> filtered = Lists.newArrayListWithCapacity(tokens.size());
for (Token t : tokens)
{
while (tokenMetadata.getEndpoint(t) != null)
{
InetAddressAndPort other = tokenMetadata.getEndpoint(t);
if (inAllocationRing(other))
throw new ConfigurationException(String.format("Allocated token %s already assigned to node %s. Is another node also allocating tokens?", t, other));
t = t.increaseSlightly();
}
filtered.add(t);
}
return filtered;
}
final SummaryStatistics replicatedOwnershipStats()
{
SummaryStatistics stat = new SummaryStatistics();
for (Map.Entry<InetAddressAndPort, Double> en : evaluateReplicatedOwnership().entrySet())
{
// Filter only in the same allocation ring
if (inAllocationRing(en.getKey()))
stat.addValue(en.getValue() / tokenMetadata.getTokens(en.getKey()).size());
}
return stat;
}
// return the ratio of ownership for each endpoint
private Map<InetAddressAndPort, Double> evaluateReplicatedOwnership()
{
Map<InetAddressAndPort, Double> ownership = Maps.newHashMap();
List<Token> sortedTokens = tokenMetadata.sortedTokens();
if (sortedTokens.isEmpty())
return ownership;
Iterator<Token> it = sortedTokens.iterator();
Token current = it.next();
while (it.hasNext())
{
Token next = it.next();
addOwnership(current, next, ownership);
current = next;
}
addOwnership(current, sortedTokens.get(0), ownership);
return ownership;
}
private void addOwnership(Token current, Token next, Map<InetAddressAndPort, Double> ownership)
{
double size = current.size(next);
Token representative = current.getPartitioner().midpoint(current, next);
for (InetAddressAndPort n : replicationStrategy.calculateNaturalReplicas(representative, tokenMetadata).endpoints())
{
Double v = ownership.get(n);
ownership.put(n, v != null ? v + size : size);
}
}
}
private StrategyAdapter getOrCreateStrategy(InetAddressAndPort endpoint)
{
String dc = replicationStrategy.snitch.getDatacenter(endpoint);
String rack = replicationStrategy.snitch.getRack(endpoint);
return getOrCreateStrategy(dc, rack);
}
private StrategyAdapter getOrCreateStrategy(String dc, String rack)
{
return strategyByRackDc.computeIfAbsent(dc, k -> new HashMap<>()).computeIfAbsent(rack, k -> createStrategy(dc, rack));
}
private StrategyAdapter createStrategy(String dc, String rack)
{
if (replicationStrategy instanceof NetworkTopologyStrategy)
return createStrategy(tokenMetadata, (NetworkTopologyStrategy) replicationStrategy, dc, rack);
if (replicationStrategy instanceof SimpleStrategy)
return createStrategy((SimpleStrategy) replicationStrategy);
throw new ConfigurationException("Token allocation does not support replication strategy " + replicationStrategy.getClass().getSimpleName());
}
private StrategyAdapter createStrategy(final SimpleStrategy rs)
{
return createStrategy(rs.snitch, null, null, rs.getReplicationFactor().allReplicas, false);
}
private StrategyAdapter createStrategy(TokenMetadata tokenMetadata, NetworkTopologyStrategy strategy, String dc, String rack)
{
int replicas = strategy.getReplicationFactor(dc).allReplicas;
Topology topology = tokenMetadata.getTopology();
// if topology hasn't been setup yet for this dc+rack then treat it as a separate unit
int racks = topology.getDatacenterRacks().get(dc) != null && topology.getDatacenterRacks().get(dc).containsKey(rack)
? topology.getDatacenterRacks().get(dc).asMap().size()
: 1;
if (replicas <= 1)
{
// each node is treated as separate and replicates once
return createStrategy(strategy.snitch, dc, null, 1, false);
}
else if (racks == replicas)
{
// each node is treated as separate and replicates once, with separate allocation rings for each rack
return createStrategy(strategy.snitch, dc, rack, 1, false);
}
else if (racks > replicas)
{
// group by rack
return createStrategy(strategy.snitch, dc, null, replicas, true);
}
else if (racks == 1)
{
return createStrategy(strategy.snitch, dc, null, replicas, false);
}
throw new ConfigurationException(String.format("Token allocation failed: the number of racks %d in datacenter %s is lower than its replication factor %d.",
racks, dc, replicas));
}
// a null dc will always return true for inAllocationRing(..)
// a null rack will return true for inAllocationRing(..) for all nodes in the same dc
private StrategyAdapter createStrategy(IEndpointSnitch snitch, String dc, String rack, int replicas, boolean groupByRack)
{
return new StrategyAdapter()
{
@Override
public int replicas()
{
return replicas;
}
@Override
public Object getGroup(InetAddressAndPort unit)
{
return groupByRack ? snitch.getRack(unit) : unit;
}
@Override
public boolean inAllocationRing(InetAddressAndPort other)
{
return (dc == null || dc.equals(snitch.getDatacenter(other))) && (rack == null || rack.equals(snitch.getRack(other)));
}
};
}
}