// blob: ac372e482794f0654f4d3f6584e21324bf15eed5 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.dht.tokenallocator;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.UUID;
import com.google.common.collect.Lists;
import org.apache.commons.math3.stat.descriptive.SummaryStatistics;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.Murmur3Partitioner;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.locator.AbstractReplicationStrategy;
import org.apache.cassandra.locator.IEndpointSnitch;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.locator.RackInferringSnitch;
import org.apache.cassandra.locator.TokenMetadata;
import org.apache.cassandra.schema.KeyspaceParams;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.FBUtilities;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
/**
 * Tests for {@link TokenAllocation}.
 *
 * Each test populates a {@link TokenMetadata} with fake endpoints holding random tokens, allocates
 * tokens for one or more additional endpoints, and then checks either that the standard deviation of
 * the allocation ring ownership did not increase, or that per-node ownership stays within a permitted
 * bound.
 */
public class TokenAllocationTest
{
    /** Value returned by {@code StorageService.setPartitionerUnsafe} in {@link #setup()}; restored in {@link #tearDown()}. */
    static IPartitioner oldPartitioner;

    /** Fixed-seed source for the random tokens handed to the fake endpoints. */
    static Random rand = new Random(1);

    /**
     * Initialises the daemon configuration, switches to {@link Murmur3Partitioner}, and loads the
     * "TokenAllocationTest" schema definition.
     */
    @BeforeClass
    public static void setup() throws ConfigurationException
    {
        DatabaseDescriptor.daemonInitialization();
        oldPartitioner = StorageService.instance.setPartitionerUnsafe(Murmur3Partitioner.instance);
        SchemaLoader.startGossiper();
        SchemaLoader.prepareServer();
        SchemaLoader.schemaDefinition("TokenAllocationTest");
    }

    /**
     * Puts back the partitioner remembered by {@link #setup()}.
     */
    @AfterClass
    public static void tearDown()
    {
        // NOTE(review): setup() installs the partitioner through StorageService, while this restores it
        // through DatabaseDescriptor only - confirm whether StorageService keeps partitioner-dependent
        // state that should be restored as well.
        DatabaseDescriptor.setPartitionerUnsafe(oldPartitioner);
    }

    /**
     * Builds a {@link TokenAllocation} for an explicit replica count, using the currently configured snitch.
     *
     * @param tokenMetadata ring state the allocator works on
     * @param replicas      replica count passed to the allocator
     * @param numTokens     number of tokens to allocate per node
     */
    private static TokenAllocation createForTest(TokenMetadata tokenMetadata, int replicas, int numTokens)
    {
        return TokenAllocation.create(DatabaseDescriptor.getEndpointSnitch(), tokenMetadata, replicas, numTokens);
    }

    /**
     * Allocates tokens for the local broadcast address using the replication strategy of an existing keyspace.
     */
    @Test
    public void testAllocateTokensForKeyspace() throws UnknownHostException
    {
        int vn = 16;
        String ks = "TokenAllocationTestKeyspace3";
        TokenMetadata tm = new TokenMetadata();
        generateFakeEndpoints(tm, 10, vn);
        InetAddressAndPort addr = FBUtilities.getBroadcastAddressAndPort();
        allocateTokensForKeyspace(vn, ks, tm, addr);
    }

    /**
     * Allocates tokens for the local broadcast address using an explicit replica count instead of a keyspace.
     */
    @Test
    public void testAllocateTokensForLocalRF() throws UnknownHostException
    {
        int vn = 16;
        int allocateTokensForLocalRf = 3;
        TokenMetadata tm = new TokenMetadata();
        generateFakeEndpoints(tm, 10, vn);
        InetAddressAndPort addr = FBUtilities.getBroadcastAddressAndPort();
        allocateTokensForLocalReplicationFactor(vn, allocateTokensForLocalRf, tm, addr);
    }

    /**
     * Allocates {@code vnodes} tokens for {@code addr} using the replication strategy of {@code keyspace}
     * and verifies the outcome.
     *
     * @return the allocated tokens
     */
    private Collection<Token> allocateTokensForKeyspace(int vnodes, String keyspace, TokenMetadata tm, InetAddressAndPort addr)
    {
        AbstractReplicationStrategy rs = Keyspace.open(keyspace).getReplicationStrategy();
        TokenAllocation tokenAllocation = TokenAllocation.create(tm, rs, vnodes);
        return allocateAndVerify(vnodes, addr, tokenAllocation);
    }

    /**
     * Allocates {@code vnodes} tokens for {@code addr} using an allocator built for {@code rf} replicas
     * and verifies the outcome.
     */
    private void allocateTokensForLocalReplicationFactor(int vnodes, int rf, TokenMetadata tm, InetAddressAndPort addr)
    {
        TokenAllocation tokenAllocation = createForTest(tm, rf, vnodes);
        allocateAndVerify(vnodes, addr, tokenAllocation);
    }

    /**
     * Runs one allocation and checks that exactly {@code vnodes} tokens were produced and that the
     * ownership standard deviation did not grow.
     *
     * @return the allocated tokens
     */
    private Collection<Token> allocateAndVerify(int vnodes, InetAddressAndPort addr, TokenAllocation tokenAllocation)
    {
        SummaryStatistics before = tokenAllocation.getAllocationRingOwnership(addr);
        Collection<Token> tokens = tokenAllocation.allocate(addr);
        assertEquals(vnodes, tokens.size());
        SummaryStatistics after = tokenAllocation.getAllocationRingOwnership(addr);
        // Same check and failure message as the multi-keyspace test; keep a single implementation.
        verifyImprovement(before, after);
        return tokens;
    }

    @Test
    public void testAllocateTokensNetworkStrategyOneRack() throws UnknownHostException
    {
        testAllocateTokensNetworkStrategy(1, 3);
    }

    /** Two racks with three replicas is expected to be rejected with a {@link ConfigurationException}. */
    @Test(expected = ConfigurationException.class)
    public void testAllocateTokensNetworkStrategyTwoRacks() throws UnknownHostException
    {
        testAllocateTokensNetworkStrategy(2, 3);
    }

    @Test
    public void testAllocateTokensNetworkStrategyThreeRacks() throws UnknownHostException
    {
        testAllocateTokensNetworkStrategy(3, 3);
    }

    @Test
    public void testAllocateTokensNetworkStrategyFiveRacks() throws UnknownHostException
    {
        testAllocateTokensNetworkStrategy(5, 3);
    }

    @Test
    public void testAllocateTokensNetworkStrategyOneRackOneReplica() throws UnknownHostException
    {
        testAllocateTokensNetworkStrategy(1, 1);
    }

    /**
     * Parameterised body of the NetworkTopologyStrategy tests (not itself annotated as a test).
     *
     * Creates a keyspace with {@code replicas} replicas in datacentre "1", fills {@code rackCount}
     * racks with 10 fake nodes each, and allocates tokens for 127.1.0.99. The endpoint snitch is
     * replaced by a {@link RackInferringSnitch} for the duration of the call and restored afterwards.
     *
     * @param rackCount number of racks to populate in the datacentre
     * @param replicas  replication factor of the datacentre
     */
    public void testAllocateTokensNetworkStrategy(int rackCount, int replicas) throws UnknownHostException
    {
        IEndpointSnitch oldSnitch = DatabaseDescriptor.getEndpointSnitch();
        try
        {
            DatabaseDescriptor.setEndpointSnitch(new RackInferringSnitch());
            int vn = 16;
            String ks = "TokenAllocationTestNTSKeyspace" + rackCount + replicas;
            String dc = "1";

            createNtsKeyspace(ks, dc, replicas);

            // Start from an empty ring again: the peers registered above are not part of the allocation.
            TokenMetadata tm = StorageService.instance.getTokenMetadata();
            tm.clearUnsafe();
            for (int i = 0; i < rackCount; ++i)
                generateFakeEndpoints(tm, 10, vn, dc, Integer.toString(i));

            InetAddressAndPort addr = InetAddressAndPort.getByName("127." + dc + ".0.99");
            allocateTokensForKeyspace(vn, ks, tm, addr);
        }
        finally
        {
            DatabaseDescriptor.setEndpointSnitch(oldSnitch);
        }
    }

    /**
     * Replication factor equal to the number of racks, with racks of unequal size: after adding five
     * nodes to rack 0, no node in that rack may own more than the permitted share above the optimum.
     */
    @Test
    public void testAllocateTokensRfEqRacks() throws UnknownHostException
    {
        IEndpointSnitch oldSnitch = DatabaseDescriptor.getEndpointSnitch();
        try
        {
            DatabaseDescriptor.setEndpointSnitch(new RackInferringSnitch());
            int vn = 8;
            int replicas = 3;
            int rackCount = replicas;
            String ks = "TokenAllocationTestNTSKeyspaceRfEqRacks";
            String dc = "1";

            TokenMetadata metadata = createNtsKeyspace(ks, dc, replicas);

            int base = 5;
            for (int i = 0; i < rackCount; ++i)
                generateFakeEndpoints(metadata, base << i, vn, dc, Integer.toString(i)); // unbalanced racks

            int cnt = 5;
            for (int i = 0; i < cnt; ++i)
            {
                InetAddressAndPort endpoint = InetAddressAndPort.getByName("127." + dc + ".0." + (99 + i));
                Collection<Token> tokens = allocateTokensForKeyspace(vn, ks, metadata, endpoint);
                metadata.updateNormalTokens(tokens, endpoint);
            }

            // Rack 0 ends up with base + cnt nodes; the test takes an equal split among them as optimal.
            double target = 1.0 / (base + cnt);
            double permittedOver = 1.0 / (2 * vn + 1) + 0.01;

            Map<InetAddress, Float> ownership = StorageService.instance.effectiveOwnership(ks);
            boolean failed = false;
            for (Map.Entry<InetAddress, Float> o : ownership.entrySet())
            {
                // The third address octet is the rack value used when the endpoints were generated;
                // only rack 0 is checked.
                int rack = o.getKey().getAddress()[2];
                if (rack != 0)
                    continue;

                System.out.format("Node %s owns %f ratio to optimal %.2f\n", o.getKey(), o.getValue(), o.getValue() / target);
                if (o.getValue() / target > 1 + permittedOver)
                    failed = true;
            }
            Assert.assertFalse(String.format("One of the nodes in the rack has over %.2f%% overutilization.", permittedOver * 100), failed);
        }
        finally
        {
            DatabaseDescriptor.setEndpointSnitch(oldSnitch);
        }
    }

    /**
     * TODO: This scenario isn't supported very well. Investigate a multi-keyspace version of the algorithm.
     */
    @Test
    public void testAllocateTokensMultipleKeyspaces() throws UnknownHostException
    {
        final int TOKENS = 16;

        TokenMetadata tokenMetadata = new TokenMetadata();
        generateFakeEndpoints(tokenMetadata, 10, TOKENS);

        // Do not clone token metadata so tokens allocated by different allocators are reflected on the parent TokenMetadata
        TokenAllocation rf2Allocator = createForTest(tokenMetadata, 2, TOKENS);
        TokenAllocation rf3Allocator = createForTest(tokenMetadata, 3, TOKENS);

        SummaryStatistics rf2StatsBefore = rf2Allocator.getAllocationRingOwnership(FBUtilities.getBroadcastAddressAndPort());
        SummaryStatistics rf3StatsBefore = rf3Allocator.getAllocationRingOwnership(FBUtilities.getBroadcastAddressAndPort());

        // Alternate between the two allocators, starting with RF=3, one new endpoint per iteration.
        TokenAllocation current = rf3Allocator;
        TokenAllocation next = rf2Allocator;
        for (int i = 11; i <= 20; ++i)
        {
            InetAddressAndPort endpoint = InetAddressAndPort.getByName("127.0.0." + (i + 1));
            Collection<Token> tokens = current.allocate(endpoint);
            // Update tokens on next to verify ownership calculation below
            next.tokenMetadata.updateNormalTokens(tokens, endpoint);

            TokenAllocation tmp = current;
            current = next;
            next = tmp;
        }

        SummaryStatistics rf2StatsAfter = rf2Allocator.getAllocationRingOwnership(FBUtilities.getBroadcastAddressAndPort());
        SummaryStatistics rf3StatsAfter = rf3Allocator.getAllocationRingOwnership(FBUtilities.getBroadcastAddressAndPort());

        verifyImprovement(rf2StatsBefore, rf2StatsAfter);
        verifyImprovement(rf3StatsBefore, rf3StatsAfter);
    }

    /**
     * Fails the test if the standard deviation of {@code ns} is greater than that of {@code os}.
     *
     * @param os ownership statistics taken before allocation
     * @param ns ownership statistics taken after allocation
     */
    private void verifyImprovement(SummaryStatistics os, SummaryStatistics ns)
    {
        if (ns.getStandardDeviation() > os.getStandardDeviation())
        {
            fail(String.format("Token allocation unexpectedly increased standard deviation.\nStats before:\n%s\nStats after:\n%s", os, ns));
        }
    }

    /**
     * Clears the StorageService token metadata, registers host ids for one peer in each of the two
     * datacentres named by the keyspace, and creates the NetworkTopologyStrategy keyspace {@code ks}
     * with a single "Standard1" table.
     *
     * @param ks       name of the keyspace to create
     * @param dc       datacentre that receives {@code replicas} replicas
     * @param replicas replication factor for {@code dc}
     * @return the token metadata instance that was cleared and populated with the two peers
     */
    private static TokenMetadata createNtsKeyspace(String ks, String dc, int replicas) throws UnknownHostException
    {
        // Register peers with expected DC for NetworkTopologyStrategy.
        TokenMetadata metadata = StorageService.instance.getTokenMetadata();
        metadata.clearUnsafe();
        metadata.updateHostId(UUID.randomUUID(), InetAddressAndPort.getByName("127.1.0.99"));
        metadata.updateHostId(UUID.randomUUID(), InetAddressAndPort.getByName("127.15.0.99"));

        // Note: Not matching replication factor in second datacentre, but this should not affect us.
        SchemaLoader.createKeyspace(ks, KeyspaceParams.nts(dc, replicas, "15", 15), SchemaLoader.standardCFMD(ks, "Standard1"));
        return metadata;
    }

    /**
     * Clears {@code tmd} and fills it with {@code numOldNodes} fake nodes in dc "0", rack "0".
     */
    private void generateFakeEndpoints(TokenMetadata tmd, int numOldNodes, int numVNodes) throws UnknownHostException
    {
        tmd.clearUnsafe();
        generateFakeEndpoints(tmd, numOldNodes, numVNodes, "0", "0");
    }

    /**
     * Adds {@code nodes} fake endpoints with addresses {@code 127.<dc>.<rack>.<2..nodes+1>} to
     * {@code tmd}, each owning {@code vnodes} random tokens. Existing entries are left in place.
     */
    private void generateFakeEndpoints(TokenMetadata tmd, int nodes, int vnodes, String dc, String rack) throws UnknownHostException
    {
        System.out.printf("Adding %d nodes to dc=%s, rack=%s.%n", nodes, dc, rack);
        IPartitioner p = tmd.partitioner;

        for (int i = 1; i <= nodes; i++)
        {
            // leave .1 for myEndpoint
            InetAddressAndPort addr = InetAddressAndPort.getByName("127." + dc + '.' + rack + '.' + (i + 1));
            List<Token> tokens = Lists.newArrayListWithCapacity(vnodes);
            for (int j = 0; j < vnodes; ++j)
                tokens.add(p.getRandomToken(rand));
            tmd.updateNormalTokens(tokens, addr);
        }
    }
}