blob: 48dd573695a9eee54a62c74bf4524433526f0ea6 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.locator;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.*;
import java.util.stream.Collectors;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableMultimap;
import com.google.common.collect.Multimap;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.dht.Murmur3Partitioner;
import org.apache.cassandra.dht.OrderPreservingPartitioner.StringToken;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.locator.TokenMetadata.Topology;
import org.apache.cassandra.service.StorageService;
public class NetworkTopologyStrategyTest
{
    // Keyspace name passed to every NetworkTopologyStrategy built by these tests;
    // the strategy only uses it for identification, so any name works.
    private String keyspaceName = "Keyspace1";
    private static final Logger logger = LoggerFactory.getLogger(NetworkTopologyStrategyTest.class);

    @BeforeClass
    public static void setupDD()
    {
        DatabaseDescriptor.daemonInitialization();
    }

    /**
     * Verifies that per-DC replication factors supplied via config options are honored,
     * and that the natural endpoints for a token match the total RF and are unique.
     */
    @Test
    public void testProperties() throws IOException, ConfigurationException
    {
        IEndpointSnitch snitch = new PropertyFileSnitch();
        DatabaseDescriptor.setEndpointSnitch(snitch);
        TokenMetadata metadata = new TokenMetadata();
        createDummyTokens(metadata, true);

        Map<String, String> configOptions = new HashMap<>();
        configOptions.put("DC1", "3");
        configOptions.put("DC2", "2");
        configOptions.put("DC3", "1");

        // Set the localhost to the tokenmetadata. Embedded cassandra way?
        NetworkTopologyStrategy strategy = new NetworkTopologyStrategy(keyspaceName, metadata, snitch, configOptions);
        // Use JUnit assertions rather than the 'assert' keyword so these checks
        // run even when the JVM is started without -ea (matches testLargeCluster).
        Assert.assertTrue(strategy.getReplicationFactor("DC1") == 3);
        Assert.assertTrue(strategy.getReplicationFactor("DC2") == 2);
        Assert.assertTrue(strategy.getReplicationFactor("DC3") == 1);
        // Query for the natural hosts
        ArrayList<InetAddress> endpoints = strategy.getNaturalEndpoints(new StringToken("123"));
        Assert.assertEquals(6, endpoints.size());
        Assert.assertEquals(6, new HashSet<>(endpoints).size()); // ensure uniqueness
    }

    /**
     * Same as {@link #testProperties()} but with a datacenter configured with RF 0
     * and no tokens populated in it; total replica count must still add up and be unique.
     */
    @Test
    public void testPropertiesWithEmptyDC() throws IOException, ConfigurationException
    {
        IEndpointSnitch snitch = new PropertyFileSnitch();
        DatabaseDescriptor.setEndpointSnitch(snitch);
        TokenMetadata metadata = new TokenMetadata();
        createDummyTokens(metadata, false);

        Map<String, String> configOptions = new HashMap<>();
        configOptions.put("DC1", "3");
        configOptions.put("DC2", "3");
        configOptions.put("DC3", "0");

        // Set the localhost to the tokenmetadata. Embedded cassandra way?
        NetworkTopologyStrategy strategy = new NetworkTopologyStrategy(keyspaceName, metadata, snitch, configOptions);
        // JUnit assertions instead of the 'assert' keyword so the checks run without -ea.
        Assert.assertTrue(strategy.getReplicationFactor("DC1") == 3);
        Assert.assertTrue(strategy.getReplicationFactor("DC2") == 3);
        Assert.assertTrue(strategy.getReplicationFactor("DC3") == 0);
        // Query for the natural hosts
        ArrayList<InetAddress> endpoints = strategy.getNaturalEndpoints(new StringToken("123"));
        Assert.assertEquals(6, endpoints.size());
        Assert.assertEquals(6, new HashSet<>(endpoints).size()); // ensure uniqueness
    }

    /**
     * Builds a three-DC cluster (2/4/8 racks, 128/256/512 endpoints, RF 2/6/6) using
     * {@link RackInferringSnitch}-compatible addresses, then checks that for several
     * probe tokens the calculated endpoints equal the total RF and are all distinct.
     */
    @Test
    public void testLargeCluster() throws UnknownHostException, ConfigurationException
    {
        int[] dcRacks = new int[]{2, 4, 8};
        int[] dcEndpoints = new int[]{128, 256, 512};
        int[] dcReplication = new int[]{2, 6, 6};

        IEndpointSnitch snitch = new RackInferringSnitch();
        DatabaseDescriptor.setEndpointSnitch(snitch);
        TokenMetadata metadata = new TokenMetadata();
        Map<String, String> configOptions = new HashMap<>();
        Multimap<InetAddress, Token> tokens = HashMultimap.create();

        int totalRF = 0;
        for (int dc = 0; dc < dcRacks.length; ++dc)
        {
            totalRF += dcReplication[dc];
            configOptions.put(Integer.toString(dc), Integer.toString(dcReplication[dc]));
            for (int rack = 0; rack < dcRacks[dc]; ++rack)
            {
                for (int ep = 1; ep <= dcEndpoints[dc]/dcRacks[dc]; ++ep)
                {
                    // RackInferringSnitch derives DC/rack from the 2nd/3rd address octets.
                    byte[] ipBytes = new byte[]{10, (byte)dc, (byte)rack, (byte)ep};
                    InetAddress address = InetAddress.getByAddress(ipBytes);
                    StringToken token = new StringToken(String.format("%02x%02x%02x", ep, rack, dc));
                    logger.debug("adding node {} at {}", address, token);
                    tokens.put(address, token);
                }
            }
        }
        metadata.updateNormalTokens(tokens);

        NetworkTopologyStrategy strategy = new NetworkTopologyStrategy(keyspaceName, metadata, snitch, configOptions);

        for (String testToken : new String[]{"123456", "200000", "000402", "ffffff", "400200"})
        {
            List<InetAddress> endpoints = strategy.calculateNaturalEndpoints(new StringToken(testToken), metadata);
            Set<InetAddress> epSet = new HashSet<>(endpoints);

            Assert.assertEquals(totalRF, endpoints.size());
            Assert.assertEquals(totalRF, epSet.size());
            logger.debug("{}: {}", testToken, endpoints);
        }
    }

    /**
     * Populates {@code metadata} with a small fixed set of tokens spread over three DCs
     * (addresses chosen to match the DC/rack layout the PropertyFileSnitch tests expect).
     *
     * @param metadata    token metadata to populate
     * @param populateDC3 when false, the DC3 tokens are omitted (used by the empty-DC test)
     */
    public void createDummyTokens(TokenMetadata metadata, boolean populateDC3) throws UnknownHostException
    {
        // DC 1
        tokenFactory(metadata, "123", new byte[]{ 10, 0, 0, 10 });
        tokenFactory(metadata, "234", new byte[]{ 10, 0, 0, 11 });
        tokenFactory(metadata, "345", new byte[]{ 10, 0, 0, 12 });
        // Tokens for DC 2
        tokenFactory(metadata, "789", new byte[]{ 10, 20, 114, 10 });
        tokenFactory(metadata, "890", new byte[]{ 10, 20, 114, 11 });
        // Tokens for DC 3
        if (populateDC3)
        {
            tokenFactory(metadata, "456", new byte[]{ 10, 21, 119, 13 });
            tokenFactory(metadata, "567", new byte[]{ 10, 21, 119, 10 });
        }
        // Extra Tokens
        tokenFactory(metadata, "90A", new byte[]{ 10, 0, 0, 13 });
        if (populateDC3)
            tokenFactory(metadata, "0AB", new byte[]{ 10, 21, 119, 14 });
        tokenFactory(metadata, "ABC", new byte[]{ 10, 20, 114, 15 });
    }

    /**
     * Registers a single normal token for the endpoint with the given raw IPv4 bytes.
     */
    public void tokenFactory(TokenMetadata metadata, String token, byte[] bytes) throws UnknownHostException
    {
        Token token1 = new StringToken(token);
        InetAddress add1 = InetAddress.getByAddress(bytes);
        metadata.updateNormalToken(token1, add1);
    }

    /**
     * Randomized equivalence test: builds several random vnode clusters and checks that
     * {@link NetworkTopologyStrategy#calculateNaturalEndpoints} agrees with the reference
     * implementation kept in {@link #calculateNaturalEndpoints}.
     */
    @Test
    public void testCalculateEndpoints() throws UnknownHostException
    {
        final int NODES = 100;
        final int VNODES = 64;
        final int RUNS = 10;
        StorageService.instance.setPartitionerUnsafe(Murmur3Partitioner.instance);
        Map<String, Integer> datacenters = ImmutableMap.of("rf1", 1, "rf3", 3, "rf5_1", 5, "rf5_2", 5, "rf5_3", 5);
        List<InetAddress> nodes = new ArrayList<>(NODES);
        for (byte i=0; i<NODES; ++i)
            nodes.add(InetAddress.getByAddress(new byte[]{127, 0, 0, i}));
        for (int run=0; run<RUNS; ++run)
        {
            Random rand = new Random();
            IEndpointSnitch snitch = generateSnitch(datacenters, nodes, rand);
            DatabaseDescriptor.setEndpointSnitch(snitch);

            TokenMetadata meta = new TokenMetadata();
            for (int i=0; i<NODES; ++i)  // Nodes
                for (int j=0; j<VNODES; ++j) // tokens/vnodes per node
                    meta.updateNormalToken(Murmur3Partitioner.instance.getRandomToken(rand), nodes.get(i));
            testEquivalence(meta, snitch, datacenters, rand);
        }
    }

    /**
     * Compares 1000 random tokens' endpoints between the strategy under test and the
     * reference algorithm; prints a diff to stderr before failing for easier debugging.
     */
    void testEquivalence(TokenMetadata tokenMetadata, IEndpointSnitch snitch, Map<String, Integer> datacenters, Random rand)
    {
        NetworkTopologyStrategy nts = new NetworkTopologyStrategy("ks", tokenMetadata, snitch,
                                                                  datacenters.entrySet().stream().
                                                                      collect(Collectors.toMap(x -> x.getKey(), x -> Integer.toString(x.getValue()))));
        for (int i=0; i<1000; ++i)
        {
            Token token = Murmur3Partitioner.instance.getRandomToken(rand);
            List<InetAddress> expected = calculateNaturalEndpoints(token, tokenMetadata, datacenters, snitch);
            List<InetAddress> actual = nts.calculateNaturalEndpoints(token, tokenMetadata);
            if (endpointsDiffer(expected, actual))
            {
                System.err.println("Endpoints mismatch for token " + token);
                System.err.println(" expected: " + expected);
                System.err.println(" actual  : " + actual);
                Assert.assertEquals("Endpoints for token " + token + " mismatch.", expected, actual);
            }
        }
    }

    /**
     * Returns true when the two endpoint lists are meaningfully different.
     */
    private boolean endpointsDiffer(List<InetAddress> ep1, List<InetAddress> ep2)
    {
        // Because the old algorithm does not put the nodes in the correct order in the case where more replicas
        // are required than there are racks in a dc, we accept different order as long as the primary
        // replica is the same.
        if (ep1.equals(ep2))
            return false;
        if (!ep1.get(0).equals(ep2.get(0)))
            return true;
        Set<InetAddress> s1 = new HashSet<>(ep1);
        Set<InetAddress> s2 = new HashSet<>(ep2);
        return !s1.equals(s2);
    }

    /**
     * Builds a snitch that assigns each node a random DC (weighted by RF) and a random
     * rack within that DC; rack counts per DC are themselves randomized.
     */
    IEndpointSnitch generateSnitch(Map<String, Integer> datacenters, Collection<InetAddress> nodes, Random rand)
    {
        final Map<InetAddress, String> nodeToRack = new HashMap<>();
        final Map<InetAddress, String> nodeToDC = new HashMap<>();
        Map<String, List<String>> racksPerDC = new HashMap<>();
        datacenters.forEach((dc, rf) -> racksPerDC.put(dc, randomRacks(rf, rand)));
        int rf = datacenters.values().stream().mapToInt(x -> x).sum();
        // Flatten DC names into an array where each DC appears RF times, so picking a
        // uniform random index weights DC assignment by replication factor.
        String[] dcs = new String[rf];
        int pos = 0;
        for (Map.Entry<String, Integer> dce : datacenters.entrySet())
        {
            for (int i = 0; i < dce.getValue(); ++i)
                dcs[pos++] = dce.getKey();
        }

        for (InetAddress node : nodes)
        {
            String dc = dcs[rand.nextInt(rf)];
            List<String> racks = racksPerDC.get(dc);
            String rack = racks.get(rand.nextInt(racks.size()));
            nodeToRack.put(node, rack);
            nodeToDC.put(node, dc);
        }

        return new AbstractNetworkTopologySnitch()
        {
            public String getRack(InetAddress endpoint)
            {
                return nodeToRack.get(endpoint);
            }

            public String getDatacenter(InetAddress endpoint)
            {
                return nodeToDC.get(endpoint);
            }
        };
    }

    /**
     * Returns between 1 and {@code rf * 3 - 1} rack names ("0", "1", ...), chosen at random,
     * so both the fewer-racks-than-RF and more-racks-than-RF cases get exercised.
     */
    private List<String> randomRacks(int rf, Random rand)
    {
        int rc = rand.nextInt(rf * 3 - 1) + 1;
        List<String> racks = new ArrayList<>(rc);
        for (int i=0; i<rc; ++i)
            racks.add(Integer.toString(i));
        return racks;
    }

    // Copy of older endpoints calculation algorithm for comparison
    public static List<InetAddress> calculateNaturalEndpoints(Token searchToken, TokenMetadata tokenMetadata, Map<String, Integer> datacenters, IEndpointSnitch snitch)
    {
        // we want to preserve insertion order so that the first added endpoint becomes primary
        Set<InetAddress> replicas = new LinkedHashSet<>();
        // replicas we have found in each DC
        Map<String, Set<InetAddress>> dcReplicas = new HashMap<>(datacenters.size());
        for (Map.Entry<String, Integer> dc : datacenters.entrySet())
            dcReplicas.put(dc.getKey(), new HashSet<>(dc.getValue()));

        Topology topology = tokenMetadata.getTopology();
        // all endpoints in each DC, so we can check when we have exhausted all the members of a DC
        Multimap<String, InetAddress> allEndpoints = topology.getDatacenterEndpoints();
        // all racks in a DC so we can check when we have exhausted all racks in a DC
        Map<String, ImmutableMultimap<String, InetAddress>> racks = topology.getDatacenterRacks();
        assert !allEndpoints.isEmpty() && !racks.isEmpty() : "not aware of any cluster members";

        // tracks the racks we have already placed replicas in
        Map<String, Set<String>> seenRacks = new HashMap<>(datacenters.size());
        for (Map.Entry<String, Integer> dc : datacenters.entrySet())
            seenRacks.put(dc.getKey(), new HashSet<>());

        // tracks the endpoints that we skipped over while looking for unique racks
        // when we relax the rack uniqueness we can append this to the current result so we don't have to wind back the iterator
        Map<String, Set<InetAddress>> skippedDcEndpoints = new HashMap<>(datacenters.size());
        for (Map.Entry<String, Integer> dc : datacenters.entrySet())
            skippedDcEndpoints.put(dc.getKey(), new LinkedHashSet<>());

        Iterator<Token> tokenIter = TokenMetadata.ringIterator(tokenMetadata.sortedTokens(), searchToken, false);
        while (tokenIter.hasNext() && !hasSufficientReplicas(dcReplicas, allEndpoints, datacenters))
        {
            Token next = tokenIter.next();
            InetAddress ep = tokenMetadata.getEndpoint(next);
            String dc = snitch.getDatacenter(ep);
            // have we already found all replicas for this dc?
            if (!datacenters.containsKey(dc) || hasSufficientReplicas(dc, dcReplicas, allEndpoints, datacenters))
                continue;
            // can we skip checking the rack?
            if (seenRacks.get(dc).size() == racks.get(dc).keySet().size())
            {
                dcReplicas.get(dc).add(ep);
                replicas.add(ep);
            }
            else
            {
                String rack = snitch.getRack(ep);
                // is this a new rack?
                if (seenRacks.get(dc).contains(rack))
                {
                    skippedDcEndpoints.get(dc).add(ep);
                }
                else
                {
                    dcReplicas.get(dc).add(ep);
                    replicas.add(ep);
                    seenRacks.get(dc).add(rack);
                    // if we've run out of distinct racks, add the hosts we skipped past already (up to RF)
                    if (seenRacks.get(dc).size() == racks.get(dc).keySet().size())
                    {
                        Iterator<InetAddress> skippedIt = skippedDcEndpoints.get(dc).iterator();
                        while (skippedIt.hasNext() && !hasSufficientReplicas(dc, dcReplicas, allEndpoints, datacenters))
                        {
                            InetAddress nextSkipped = skippedIt.next();
                            dcReplicas.get(dc).add(nextSkipped);
                            replicas.add(nextSkipped);
                        }
                    }
                }
            }
        }

        return new ArrayList<>(replicas);
    }

    /**
     * True when {@code dc} already holds its full replica count (capped at the number
     * of endpoints actually present in that DC).
     */
    private static boolean hasSufficientReplicas(String dc, Map<String, Set<InetAddress>> dcReplicas, Multimap<String, InetAddress> allEndpoints, Map<String, Integer> datacenters)
    {
        return dcReplicas.get(dc).size() >= Math.min(allEndpoints.get(dc).size(), getReplicationFactor(dc, datacenters));
    }

    /**
     * True when every configured DC has reached its replica quota.
     */
    private static boolean hasSufficientReplicas(Map<String, Set<InetAddress>> dcReplicas, Multimap<String, InetAddress> allEndpoints, Map<String, Integer> datacenters)
    {
        for (String dc : datacenters.keySet())
            if (!hasSufficientReplicas(dc, dcReplicas, allEndpoints, datacenters))
                return false;
        return true;
    }

    /**
     * RF configured for {@code dc}, or 0 when the DC is not in the map.
     */
    public static int getReplicationFactor(String dc, Map<String, Integer> datacenters)
    {
        Integer replicas = datacenters.get(dc);
        return replicas == null ? 0 : replicas;
    }
}