blob: 91a7ab27abfa64ab66c4d8d41034829832e33f06 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.cassandra.service;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.Util;
import org.apache.cassandra.Util.PartitionerSwitcher;
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.RandomPartitioner;
import org.apache.cassandra.dht.RandomPartitioner.BigIntegerToken;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.gms.ApplicationState;
import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.gms.VersionedValue;
import org.apache.cassandra.locator.AbstractReplicationStrategy;
import org.apache.cassandra.locator.SimpleSnitch;
import org.apache.cassandra.locator.TokenMetadata;
import org.apache.cassandra.schema.KeyspaceMetadata;
import org.apache.cassandra.utils.FBUtilities;
import static org.junit.Assert.*;
public class LeaveAndBootstrapTest
{
// Partitioner handed to Util.switchPartitioner in defineSchema and used to build the ring in testStateChangeOnRemovedNode.
private static final IPartitioner partitioner = RandomPartitioner.instance;
// Handle returned by Util.switchPartitioner in defineSchema; closed again in tearDown.
private static PartitionerSwitcher partitionerSwitcher;
// Keyspace names used as keys of the hard-coded expected-endpoint tables in testSimultaneousMove.
private static final String KEYSPACE1 = "LeaveAndBootstrapTestKeyspace1";
private static final String KEYSPACE2 = "LeaveAndBootstrapTestKeyspace2";
private static final String KEYSPACE3 = "LeaveAndBootstrapTestKeyspace3";
private static final String KEYSPACE4 = "LeaveAndBootstrapTestKeyspace4";
/**
 * One-time class setup: switches to the test partitioner (keeping the returned switcher so that
 * tearDown can close it), then loads the schema and the "LeaveAndBootstrapTest" schema definition.
 */
@BeforeClass
public static void defineSchema() throws Exception
{
// Switched before any schema is loaded; presumably so the test keyspaces are set up under
// RandomPartitioner -- TODO confirm against Util.switchPartitioner.
partitionerSwitcher = Util.switchPartitioner(partitioner);
SchemaLoader.loadSchema();
SchemaLoader.schemaDefinition("LeaveAndBootstrapTest");
}
/**
 * One-time class teardown: closes the partitioner switcher opened in defineSchema.
 */
@AfterClass
public static void tearDown()
{
    // JUnit runs @AfterClass methods even when the @BeforeClass method threw. If setup failed
    // before the switcher was assigned, closing it blindly would raise a NullPointerException
    // and bury the original failure under a second, unrelated one.
    if (partitionerSwitcher != null)
        partitionerSwitcher.close();
}
/**
 * Test whether the write endpoints are correct while a node is leaving. The state change is
 * driven through StorageService.onChange; apart from the initial reset, token metadata is not
 * manipulated directly.
 *
 * For every non-local-strategy keyspace and every key token, the write endpoints must be the
 * first RF endpoints in ring order, plus the next endpoint in ring order whenever the leaving
 * node is one of those first RF.
 */
@Test
public void newTestWriteEndpointsDuringLeave() throws Exception
{
    StorageService ss = StorageService.instance;
    final int RING_SIZE = 6;
    final int LEAVING_NODE = 3;

    TokenMetadata tmd = ss.getTokenMetadata();
    tmd.clearUnsafe();
    IPartitioner partitioner = RandomPartitioner.instance;
    VersionedValue.VersionedValueFactory valueFactory = new VersionedValue.VersionedValueFactory(partitioner);

    ArrayList<Token> endpointTokens = new ArrayList<>();
    ArrayList<Token> keyTokens = new ArrayList<>();
    List<InetAddress> hosts = new ArrayList<>();
    List<UUID> hostIds = new ArrayList<>();

    Util.createInitialRing(ss, partitioner, endpointTokens, keyTokens, hosts, hostIds, RING_SIZE);

    // Endpoints in ring order for each key token, captured before any node leaves. This walk
    // does not depend on the keyspace, so it is computed once rather than once per keyspace.
    Map<Token, List<InetAddress>> expectedEndpoints = new HashMap<>();
    for (Token token : keyTokens)
    {
        List<InetAddress> ringOrder = new ArrayList<>();
        Iterator<Token> tokenIter = TokenMetadata.ringIterator(tmd.sortedTokens(), token, false);
        while (tokenIter.hasNext())
            ringOrder.add(tmd.getEndpoint(tokenIter.next()));
        expectedEndpoints.put(token, ringOrder);
    }

    // The node at index LEAVING_NODE starts leaving
    InetAddress leavingHost = hosts.get(LEAVING_NODE);
    ss.onChange(leavingHost,
                ApplicationState.STATUS,
                valueFactory.leaving(Collections.singleton(endpointTokens.get(LEAVING_NODE))));
    assertTrue(tmd.isLeaving(leavingHost));

    Thread.sleep(100); // because there is a tight race between submit and blockUntilFinished
    PendingRangeCalculatorService.instance.blockUntilFinished();

    for (String keyspaceName : Schema.instance.getNonLocalStrategyKeyspaces())
    {
        AbstractReplicationStrategy strategy = getStrategy(keyspaceName, tmd);
        int replicationFactor = strategy.getReplicationFactor();

        for (Token token : keyTokens)
        {
            Set<InetAddress> actual = new HashSet<>(tmd.getWriteEndpoints(token, keyspaceName, strategy.calculateNaturalEndpoints(token, tmd.cloneOnlyTokenMap())));

            List<InetAddress> ringOrder = expectedEndpoints.get(token);
            Set<InetAddress> expected = new HashSet<>(ringOrder.subList(0, replicationFactor));

            // if the leaving node is in the endpoint list,
            // then we should expect it plus one extra for when it's gone
            if (expected.contains(leavingHost))
                expected.add(ringOrder.get(replicationFactor));

            assertEquals("mismatched endpoint sets", expected, actual);
        }
    }
}
/**
 * Test pending ranges and write endpoints when multiple nodes are on the move
 * simultaneously.
 *
 * Builds a 10-node ring, marks nodes 6, 8 and 9 as leaving and starts bootstrapping two new
 * nodes (127.0.1.1 and 127.0.1.2). The write endpoints of every key token are then compared
 * against hard-coded expectations for the four LeaveAndBootstrapTestKeyspace keyspaces.
 * Afterwards nodes 6 and 9 finish leaving and the first new node goes to normal, the
 * expectations are trimmed accordingly, and the same comparison is repeated.
 */
@Test
public void testSimultaneousMove() throws UnknownHostException
{
StorageService ss = StorageService.instance;
final int RING_SIZE = 10;
TokenMetadata tmd = ss.getTokenMetadata();
tmd.clearUnsafe();
IPartitioner partitioner = RandomPartitioner.instance;
VersionedValue.VersionedValueFactory valueFactory = new VersionedValue.VersionedValueFactory(partitioner);
ArrayList<Token> endpointTokens = new ArrayList<Token>();
ArrayList<Token> keyTokens = new ArrayList<Token>();
List<InetAddress> hosts = new ArrayList<InetAddress>();
List<UUID> hostIds = new ArrayList<UUID>();
// create a ring of 10 nodes
Util.createInitialRing(ss, partitioner, endpointTokens, keyTokens, hosts, hostIds, RING_SIZE);
// nodes 6, 8 and 9 leave
final int[] LEAVING = new int[] {6, 8, 9};
for (int leaving : LEAVING)
ss.onChange(hosts.get(leaving),
ApplicationState.STATUS,
valueFactory.leaving(Collections.singleton(endpointTokens.get(leaving))));
// boot two new nodes with keyTokens.get(5) and keyTokens.get(7)
InetAddress boot1 = InetAddress.getByName("127.0.1.1");
Gossiper.instance.initializeNodeUnsafe(boot1, UUID.randomUUID(), 1);
Gossiper.instance.injectApplicationState(boot1, ApplicationState.TOKENS, valueFactory.tokens(Collections.singleton(keyTokens.get(5))));
ss.onChange(boot1,
ApplicationState.STATUS,
valueFactory.bootstrapping(Collections.<Token>singleton(keyTokens.get(5))));
InetAddress boot2 = InetAddress.getByName("127.0.1.2");
Gossiper.instance.initializeNodeUnsafe(boot2, UUID.randomUUID(), 1);
Gossiper.instance.injectApplicationState(boot2, ApplicationState.TOKENS, valueFactory.tokens(Collections.singleton(keyTokens.get(7))));
ss.onChange(boot2,
ApplicationState.STATUS,
valueFactory.bootstrapping(Collections.<Token>singleton(keyTokens.get(7))));
Collection<InetAddress> endpoints = null;
/* don't require test update every time a new keyspace is added to test/conf/cassandra.yaml */
// The names built here ("LeaveAndBootstrapTestKeyspace" + 1..4) are the same strings as the
// KEYSPACE1..KEYSPACE4 constants used as keys of expectedEndpoints below.
Map<String, AbstractReplicationStrategy> keyspaceStrategyMap = new HashMap<String, AbstractReplicationStrategy>();
for (int i=1; i<=4; i++)
{
keyspaceStrategyMap.put("LeaveAndBootstrapTestKeyspace" + i, getStrategy("LeaveAndBootstrapTestKeyspace" + i, tmd));
}
// pre-calculate the results.
// Layout: keyspace name -> (key token -> expected write endpoints while the moves are in progress).
Map<String, Multimap<Token, InetAddress>> expectedEndpoints = new HashMap<String, Multimap<Token, InetAddress>>();
expectedEndpoints.put(KEYSPACE1, HashMultimap.<Token, InetAddress>create());
expectedEndpoints.get(KEYSPACE1).putAll(new BigIntegerToken("5"), makeAddrs("127.0.0.2"));
expectedEndpoints.get(KEYSPACE1).putAll(new BigIntegerToken("15"), makeAddrs("127.0.0.3"));
expectedEndpoints.get(KEYSPACE1).putAll(new BigIntegerToken("25"), makeAddrs("127.0.0.4"));
expectedEndpoints.get(KEYSPACE1).putAll(new BigIntegerToken("35"), makeAddrs("127.0.0.5"));
expectedEndpoints.get(KEYSPACE1).putAll(new BigIntegerToken("45"), makeAddrs("127.0.0.6"));
expectedEndpoints.get(KEYSPACE1).putAll(new BigIntegerToken("55"), makeAddrs("127.0.0.7", "127.0.0.8", "127.0.1.1"));
expectedEndpoints.get(KEYSPACE1).putAll(new BigIntegerToken("65"), makeAddrs("127.0.0.8"));
expectedEndpoints.get(KEYSPACE1).putAll(new BigIntegerToken("75"), makeAddrs("127.0.0.9", "127.0.1.2", "127.0.0.1"));
expectedEndpoints.get(KEYSPACE1).putAll(new BigIntegerToken("85"), makeAddrs("127.0.0.10", "127.0.0.1"));
expectedEndpoints.get(KEYSPACE1).putAll(new BigIntegerToken("95"), makeAddrs("127.0.0.1"));
expectedEndpoints.put(KEYSPACE2, HashMultimap.<Token, InetAddress>create());
expectedEndpoints.get(KEYSPACE2).putAll(new BigIntegerToken("5"), makeAddrs("127.0.0.2"));
expectedEndpoints.get(KEYSPACE2).putAll(new BigIntegerToken("15"), makeAddrs("127.0.0.3"));
expectedEndpoints.get(KEYSPACE2).putAll(new BigIntegerToken("25"), makeAddrs("127.0.0.4"));
expectedEndpoints.get(KEYSPACE2).putAll(new BigIntegerToken("35"), makeAddrs("127.0.0.5"));
expectedEndpoints.get(KEYSPACE2).putAll(new BigIntegerToken("45"), makeAddrs("127.0.0.6"));
expectedEndpoints.get(KEYSPACE2).putAll(new BigIntegerToken("55"), makeAddrs("127.0.0.7", "127.0.0.8", "127.0.1.1"));
expectedEndpoints.get(KEYSPACE2).putAll(new BigIntegerToken("65"), makeAddrs("127.0.0.8"));
expectedEndpoints.get(KEYSPACE2).putAll(new BigIntegerToken("75"), makeAddrs("127.0.0.9", "127.0.1.2", "127.0.0.1"));
expectedEndpoints.get(KEYSPACE2).putAll(new BigIntegerToken("85"), makeAddrs("127.0.0.10", "127.0.0.1"));
expectedEndpoints.get(KEYSPACE2).putAll(new BigIntegerToken("95"), makeAddrs("127.0.0.1"));
expectedEndpoints.put(KEYSPACE3, HashMultimap.<Token, InetAddress>create());
expectedEndpoints.get(KEYSPACE3).putAll(new BigIntegerToken("5"), makeAddrs("127.0.0.2", "127.0.0.3", "127.0.0.4", "127.0.0.5", "127.0.0.6"));
expectedEndpoints.get(KEYSPACE3).putAll(new BigIntegerToken("15"), makeAddrs("127.0.0.3", "127.0.0.4", "127.0.0.5", "127.0.0.6", "127.0.0.7", "127.0.1.1", "127.0.0.8"));
expectedEndpoints.get(KEYSPACE3).putAll(new BigIntegerToken("25"), makeAddrs("127.0.0.4", "127.0.0.5", "127.0.0.6", "127.0.0.7", "127.0.0.8", "127.0.1.2", "127.0.0.1", "127.0.1.1"));
expectedEndpoints.get(KEYSPACE3).putAll(new BigIntegerToken("35"), makeAddrs("127.0.0.5", "127.0.0.6", "127.0.0.7", "127.0.0.8", "127.0.0.9", "127.0.1.2", "127.0.0.1", "127.0.0.2", "127.0.1.1"));
expectedEndpoints.get(KEYSPACE3).putAll(new BigIntegerToken("45"), makeAddrs("127.0.0.6", "127.0.0.7", "127.0.0.8", "127.0.0.9", "127.0.0.10", "127.0.1.2", "127.0.0.1", "127.0.0.2", "127.0.1.1", "127.0.0.3"));
expectedEndpoints.get(KEYSPACE3).putAll(new BigIntegerToken("55"), makeAddrs("127.0.0.7", "127.0.0.8", "127.0.0.9", "127.0.0.10", "127.0.0.1", "127.0.0.2", "127.0.0.3", "127.0.0.4", "127.0.1.1", "127.0.1.2"));
expectedEndpoints.get(KEYSPACE3).putAll(new BigIntegerToken("65"), makeAddrs("127.0.0.8", "127.0.0.9", "127.0.0.10", "127.0.0.1", "127.0.0.2", "127.0.1.2", "127.0.0.3", "127.0.0.4"));
expectedEndpoints.get(KEYSPACE3).putAll(new BigIntegerToken("75"), makeAddrs("127.0.0.9", "127.0.0.10", "127.0.0.1", "127.0.0.2", "127.0.0.3", "127.0.1.2", "127.0.0.4", "127.0.0.5"));
expectedEndpoints.get(KEYSPACE3).putAll(new BigIntegerToken("85"), makeAddrs("127.0.0.10", "127.0.0.1", "127.0.0.2", "127.0.0.3", "127.0.0.4", "127.0.0.5"));
expectedEndpoints.get(KEYSPACE3).putAll(new BigIntegerToken("95"), makeAddrs("127.0.0.1", "127.0.0.2", "127.0.0.3", "127.0.0.4", "127.0.0.5"));
expectedEndpoints.put(KEYSPACE4, HashMultimap.<Token, InetAddress>create());
expectedEndpoints.get(KEYSPACE4).putAll(new BigIntegerToken("5"), makeAddrs("127.0.0.2", "127.0.0.3", "127.0.0.4"));
expectedEndpoints.get(KEYSPACE4).putAll(new BigIntegerToken("15"), makeAddrs("127.0.0.3", "127.0.0.4", "127.0.0.5"));
expectedEndpoints.get(KEYSPACE4).putAll(new BigIntegerToken("25"), makeAddrs("127.0.0.4", "127.0.0.5", "127.0.0.6"));
expectedEndpoints.get(KEYSPACE4).putAll(new BigIntegerToken("35"), makeAddrs("127.0.0.5", "127.0.0.6", "127.0.0.7", "127.0.1.1", "127.0.0.8"));
expectedEndpoints.get(KEYSPACE4).putAll(new BigIntegerToken("45"), makeAddrs("127.0.0.6", "127.0.0.7", "127.0.0.8", "127.0.1.2", "127.0.0.1", "127.0.1.1"));
expectedEndpoints.get(KEYSPACE4).putAll(new BigIntegerToken("55"), makeAddrs("127.0.0.7", "127.0.0.8", "127.0.0.9", "127.0.0.1", "127.0.0.2", "127.0.1.1", "127.0.1.2"));
expectedEndpoints.get(KEYSPACE4).putAll(new BigIntegerToken("65"), makeAddrs("127.0.0.8", "127.0.0.9", "127.0.0.10", "127.0.1.2", "127.0.0.1", "127.0.0.2"));
expectedEndpoints.get(KEYSPACE4).putAll(new BigIntegerToken("75"), makeAddrs("127.0.0.9", "127.0.0.10", "127.0.0.1", "127.0.1.2", "127.0.0.2", "127.0.0.3"));
expectedEndpoints.get(KEYSPACE4).putAll(new BigIntegerToken("85"), makeAddrs("127.0.0.10", "127.0.0.1", "127.0.0.2", "127.0.0.3"));
expectedEndpoints.get(KEYSPACE4).putAll(new BigIntegerToken("95"), makeAddrs("127.0.0.1", "127.0.0.2", "127.0.0.3"));
PendingRangeCalculatorService.instance.blockUntilFinished();
for (Map.Entry<String, AbstractReplicationStrategy> keyspaceStrategy : keyspaceStrategyMap.entrySet())
{
String keyspaceName = keyspaceStrategy.getKey();
AbstractReplicationStrategy strategy = keyspaceStrategy.getValue();
for (int i = 0; i < keyTokens.size(); i++)
{
endpoints = tmd.getWriteEndpoints(keyTokens.get(i), keyspaceName, strategy.getNaturalEndpoints(keyTokens.get(i)));
// Same size plus containsAll: the actual endpoints must match the expected set
// (assuming the returned collection holds no duplicates).
assertEquals(expectedEndpoints.get(keyspaceName).get(keyTokens.get(i)).size(), endpoints.size());
assertTrue(expectedEndpoints.get(keyspaceName).get(keyTokens.get(i)).containsAll(endpoints));
}
// just to be sure that things still work according to the old tests, run them:
// (the explicit per-token checks below only apply to strategies with a replication factor of 3)
if (strategy.getReplicationFactor() != 3)
continue;
// tokens 5, 15 and 25 should go to three nodes
for (int i=0; i<3; ++i)
{
endpoints = tmd.getWriteEndpoints(keyTokens.get(i), keyspaceName, strategy.getNaturalEndpoints(keyTokens.get(i)));
assertEquals(3, endpoints.size());
assertTrue(endpoints.contains(hosts.get(i+1)));
assertTrue(endpoints.contains(hosts.get(i+2)));
assertTrue(endpoints.contains(hosts.get(i+3)));
}
// token 35 should go to nodes 4, 5, 6, 7 and boot1
endpoints = tmd.getWriteEndpoints(keyTokens.get(3), keyspaceName, strategy.getNaturalEndpoints(keyTokens.get(3)));
assertEquals(5, endpoints.size());
assertTrue(endpoints.contains(hosts.get(4)));
assertTrue(endpoints.contains(hosts.get(5)));
assertTrue(endpoints.contains(hosts.get(6)));
assertTrue(endpoints.contains(hosts.get(7)));
assertTrue(endpoints.contains(boot1));
// token 45 should go to nodes 5, 6, 7, 0, boot1 and boot2
endpoints = tmd.getWriteEndpoints(keyTokens.get(4), keyspaceName, strategy.getNaturalEndpoints(keyTokens.get(4)));
assertEquals(6, endpoints.size());
assertTrue(endpoints.contains(hosts.get(5)));
assertTrue(endpoints.contains(hosts.get(6)));
assertTrue(endpoints.contains(hosts.get(7)));
assertTrue(endpoints.contains(hosts.get(0)));
assertTrue(endpoints.contains(boot1));
assertTrue(endpoints.contains(boot2));
// token 55 should go to nodes 6, 7, 8, 0, 1, boot1 and boot2
endpoints = tmd.getWriteEndpoints(keyTokens.get(5), keyspaceName, strategy.getNaturalEndpoints(keyTokens.get(5)));
assertEquals(7, endpoints.size());
assertTrue(endpoints.contains(hosts.get(6)));
assertTrue(endpoints.contains(hosts.get(7)));
assertTrue(endpoints.contains(hosts.get(8)));
assertTrue(endpoints.contains(hosts.get(0)));
assertTrue(endpoints.contains(hosts.get(1)));
assertTrue(endpoints.contains(boot1));
assertTrue(endpoints.contains(boot2));
// token 65 should go to nodes 7, 8, 9, 0, 1 and boot2
endpoints = tmd.getWriteEndpoints(keyTokens.get(6), keyspaceName, strategy.getNaturalEndpoints(keyTokens.get(6)));
assertEquals(6, endpoints.size());
assertTrue(endpoints.contains(hosts.get(7)));
assertTrue(endpoints.contains(hosts.get(8)));
assertTrue(endpoints.contains(hosts.get(9)));
assertTrue(endpoints.contains(hosts.get(0)));
assertTrue(endpoints.contains(hosts.get(1)));
assertTrue(endpoints.contains(boot2));
// token 75 should go to nodes 8, 9, 0, 1, 2 and boot2
endpoints = tmd.getWriteEndpoints(keyTokens.get(7), keyspaceName, strategy.getNaturalEndpoints(keyTokens.get(7)));
assertEquals(6, endpoints.size());
assertTrue(endpoints.contains(hosts.get(8)));
assertTrue(endpoints.contains(hosts.get(9)));
assertTrue(endpoints.contains(hosts.get(0)));
assertTrue(endpoints.contains(hosts.get(1)));
assertTrue(endpoints.contains(hosts.get(2)));
assertTrue(endpoints.contains(boot2));
// token 85 should go to nodes 9, 0, 1 and 2
endpoints = tmd.getWriteEndpoints(keyTokens.get(8), keyspaceName, strategy.getNaturalEndpoints(keyTokens.get(8)));
assertEquals(4, endpoints.size());
assertTrue(endpoints.contains(hosts.get(9)));
assertTrue(endpoints.contains(hosts.get(0)));
assertTrue(endpoints.contains(hosts.get(1)));
assertTrue(endpoints.contains(hosts.get(2)));
// token 95 should go to nodes 0, 1 and 2
endpoints = tmd.getWriteEndpoints(keyTokens.get(9), keyspaceName, strategy.getNaturalEndpoints(keyTokens.get(9)));
assertEquals(3, endpoints.size());
assertTrue(endpoints.contains(hosts.get(0)));
assertTrue(endpoints.contains(hosts.get(1)));
assertTrue(endpoints.contains(hosts.get(2)));
}
// Now finish node 6 and node 9 leaving, as well as boot1 (after this node 8 is still
// leaving and boot2 in progress)
ss.onChange(hosts.get(LEAVING[0]), ApplicationState.STATUS,
valueFactory.left(Collections.singleton(endpointTokens.get(LEAVING[0])), Gossiper.computeExpireTime()));
ss.onChange(hosts.get(LEAVING[2]), ApplicationState.STATUS,
valueFactory.left(Collections.singleton(endpointTokens.get(LEAVING[2])), Gossiper.computeExpireTime()));
ss.onChange(boot1, ApplicationState.STATUS, valueFactory.normal(Collections.singleton(keyTokens.get(5))));
// adjust precalculated results. this changes what the expected endpoints are.
expectedEndpoints.get(KEYSPACE1).get(new BigIntegerToken("55")).removeAll(makeAddrs("127.0.0.7", "127.0.0.8"));
expectedEndpoints.get(KEYSPACE1).get(new BigIntegerToken("85")).removeAll(makeAddrs("127.0.0.10"));
expectedEndpoints.get(KEYSPACE2).get(new BigIntegerToken("55")).removeAll(makeAddrs("127.0.0.7", "127.0.0.8"));
expectedEndpoints.get(KEYSPACE2).get(new BigIntegerToken("85")).removeAll(makeAddrs("127.0.0.10"));
expectedEndpoints.get(KEYSPACE3).get(new BigIntegerToken("15")).removeAll(makeAddrs("127.0.0.7", "127.0.0.8"));
expectedEndpoints.get(KEYSPACE3).get(new BigIntegerToken("25")).removeAll(makeAddrs("127.0.0.7", "127.0.1.2", "127.0.0.1"));
expectedEndpoints.get(KEYSPACE3).get(new BigIntegerToken("35")).removeAll(makeAddrs("127.0.0.7", "127.0.0.2"));
expectedEndpoints.get(KEYSPACE3).get(new BigIntegerToken("45")).removeAll(makeAddrs("127.0.0.7", "127.0.0.10", "127.0.0.3"));
expectedEndpoints.get(KEYSPACE3).get(new BigIntegerToken("55")).removeAll(makeAddrs("127.0.0.7", "127.0.0.10", "127.0.0.4"));
expectedEndpoints.get(KEYSPACE3).get(new BigIntegerToken("65")).removeAll(makeAddrs("127.0.0.10"));
expectedEndpoints.get(KEYSPACE3).get(new BigIntegerToken("75")).removeAll(makeAddrs("127.0.0.10"));
expectedEndpoints.get(KEYSPACE3).get(new BigIntegerToken("85")).removeAll(makeAddrs("127.0.0.10"));
expectedEndpoints.get(KEYSPACE4).get(new BigIntegerToken("35")).removeAll(makeAddrs("127.0.0.7", "127.0.0.8"));
expectedEndpoints.get(KEYSPACE4).get(new BigIntegerToken("45")).removeAll(makeAddrs("127.0.0.7", "127.0.1.2", "127.0.0.1"));
expectedEndpoints.get(KEYSPACE4).get(new BigIntegerToken("55")).removeAll(makeAddrs("127.0.0.2", "127.0.0.7"));
expectedEndpoints.get(KEYSPACE4).get(new BigIntegerToken("65")).removeAll(makeAddrs("127.0.0.10"));
expectedEndpoints.get(KEYSPACE4).get(new BigIntegerToken("75")).removeAll(makeAddrs("127.0.0.10"));
expectedEndpoints.get(KEYSPACE4).get(new BigIntegerToken("85")).removeAll(makeAddrs("127.0.0.10"));
PendingRangeCalculatorService.instance.blockUntilFinished();
for (Map.Entry<String, AbstractReplicationStrategy> keyspaceStrategy : keyspaceStrategyMap.entrySet())
{
String keyspaceName = keyspaceStrategy.getKey();
AbstractReplicationStrategy strategy = keyspaceStrategy.getValue();
for (int i = 0; i < keyTokens.size(); i++)
{
endpoints = tmd.getWriteEndpoints(keyTokens.get(i), keyspaceName, strategy.getNaturalEndpoints(keyTokens.get(i)));
// Same size-plus-containsAll comparison as in the first pass, now against the trimmed expectations.
assertEquals(expectedEndpoints.get(keyspaceName).get(keyTokens.get(i)).size(), endpoints.size());
assertTrue(expectedEndpoints.get(keyspaceName).get(keyTokens.get(i)).containsAll(endpoints));
}
if (strategy.getReplicationFactor() != 3)
continue;
// leave this stuff in to guarantee the old tests work the way they were supposed to.
// tokens 5, 15 and 25 should go to three nodes
for (int i=0; i<3; ++i)
{
endpoints = tmd.getWriteEndpoints(keyTokens.get(i), keyspaceName, strategy.getNaturalEndpoints(keyTokens.get(i)));
assertEquals(3, endpoints.size());
assertTrue(endpoints.contains(hosts.get(i+1)));
assertTrue(endpoints.contains(hosts.get(i+2)));
assertTrue(endpoints.contains(hosts.get(i+3)));
}
// token 35 goes to nodes 4, 5 and boot1
endpoints = tmd.getWriteEndpoints(keyTokens.get(3), keyspaceName, strategy.getNaturalEndpoints(keyTokens.get(3)));
assertEquals(3, endpoints.size());
assertTrue(endpoints.contains(hosts.get(4)));
assertTrue(endpoints.contains(hosts.get(5)));
assertTrue(endpoints.contains(boot1));
// token 45 goes to nodes 5, boot1 and node7
endpoints = tmd.getWriteEndpoints(keyTokens.get(4), keyspaceName, strategy.getNaturalEndpoints(keyTokens.get(4)));
assertEquals(3, endpoints.size());
assertTrue(endpoints.contains(hosts.get(5)));
assertTrue(endpoints.contains(boot1));
assertTrue(endpoints.contains(hosts.get(7)));
// token 55 goes to boot1, 7, boot2, 8 and 0
endpoints = tmd.getWriteEndpoints(keyTokens.get(5), keyspaceName, strategy.getNaturalEndpoints(keyTokens.get(5)));
assertEquals(5, endpoints.size());
assertTrue(endpoints.contains(boot1));
assertTrue(endpoints.contains(hosts.get(7)));
assertTrue(endpoints.contains(boot2));
assertTrue(endpoints.contains(hosts.get(8)));
assertTrue(endpoints.contains(hosts.get(0)));
// token 65 goes to nodes 7, boot2, 8, 0 and 1
endpoints = tmd.getWriteEndpoints(keyTokens.get(6), keyspaceName, strategy.getNaturalEndpoints(keyTokens.get(6)));
assertEquals(5, endpoints.size());
assertTrue(endpoints.contains(hosts.get(7)));
assertTrue(endpoints.contains(boot2));
assertTrue(endpoints.contains(hosts.get(8)));
assertTrue(endpoints.contains(hosts.get(0)));
assertTrue(endpoints.contains(hosts.get(1)));
// token 75 goes to nodes boot2, 8, 0, 1 and 2
endpoints = tmd.getWriteEndpoints(keyTokens.get(7), keyspaceName, strategy.getNaturalEndpoints(keyTokens.get(7)));
assertEquals(5, endpoints.size());
assertTrue(endpoints.contains(boot2));
assertTrue(endpoints.contains(hosts.get(8)));
assertTrue(endpoints.contains(hosts.get(0)));
assertTrue(endpoints.contains(hosts.get(1)));
assertTrue(endpoints.contains(hosts.get(2)));
// token 85 goes to nodes 0, 1 and 2
endpoints = tmd.getWriteEndpoints(keyTokens.get(8), keyspaceName, strategy.getNaturalEndpoints(keyTokens.get(8)));
assertEquals(3, endpoints.size());
assertTrue(endpoints.contains(hosts.get(0)));
assertTrue(endpoints.contains(hosts.get(1)));
assertTrue(endpoints.contains(hosts.get(2)));
// token 95 goes to nodes 0, 1 and 2
endpoints = tmd.getWriteEndpoints(keyTokens.get(9), keyspaceName, strategy.getNaturalEndpoints(keyTokens.get(9)));
assertEquals(3, endpoints.size());
assertTrue(endpoints.contains(hosts.get(0)));
assertTrue(endpoints.contains(hosts.get(1)));
assertTrue(endpoints.contains(hosts.get(2)));
}
}
/**
 * Drives the node at index 2 from leaving straight to bootstrapping (skipping the left state),
 * bootstraps the node at index 3 alongside it, moves the first node's bootstrap token, and
 * finally returns both nodes to normal. Membership, leaving status and bootstrap tokens are
 * checked after every transition.
 */
@Test
public void testStateJumpToBootstrap() throws UnknownHostException
{
    StorageService ss = StorageService.instance;
    TokenMetadata tmd = ss.getTokenMetadata();
    tmd.clearUnsafe();
    IPartitioner partitioner = RandomPartitioner.instance;
    VersionedValue.VersionedValueFactory valueFactory = new VersionedValue.VersionedValueFactory(partitioner);

    ArrayList<Token> endpointTokens = new ArrayList<>();
    ArrayList<Token> keyTokens = new ArrayList<>();
    List<InetAddress> hosts = new ArrayList<>();
    List<UUID> hostIds = new ArrayList<>();

    // create a ring of 7 nodes
    Util.createInitialRing(ss, partitioner, endpointTokens, keyTokens, hosts, hostIds, 7);

    final InetAddress node2 = hosts.get(2);
    final InetAddress node3 = hosts.get(3);
    final Token key1 = keyTokens.get(1);
    final Token key2 = keyTokens.get(2);
    final Token key3 = keyTokens.get(3);
    final Token key4 = keyTokens.get(4);

    // node 2 leaves
    ss.onChange(node2, ApplicationState.STATUS, valueFactory.leaving(Collections.singleton(endpointTokens.get(2))));

    // pending ranges are covered extensively by other tests; here we only check
    // that the node shows up in the appropriate lists.
    assertTrue(tmd.isMember(node2));
    assertTrue(tmd.isLeaving(node2));
    assertTrue(tmd.getBootstrapTokens().isEmpty());

    // node 2 bootstraps immediately at key4 without going through STATE_LEFT
    Gossiper.instance.injectApplicationState(node2, ApplicationState.TOKENS, valueFactory.tokens(Collections.singleton(key4)));
    ss.onChange(node2, ApplicationState.STATUS, valueFactory.bootstrapping(Collections.<Token>singleton(key4)));

    assertFalse(tmd.isMember(node2));
    assertFalse(tmd.isLeaving(node2));
    assertEquals(node2, tmd.getBootstrapTokens().get(key4));

    // node 3 bootstraps at key1
    Gossiper.instance.injectApplicationState(node3, ApplicationState.TOKENS, valueFactory.tokens(Collections.singleton(key1)));
    ss.onChange(node3, ApplicationState.STATUS, valueFactory.bootstrapping(Collections.<Token>singleton(key1)));

    assertFalse(tmd.isMember(node3));
    assertFalse(tmd.isLeaving(node3));
    assertEquals(node2, tmd.getBootstrapTokens().get(key4));
    assertEquals(node3, tmd.getBootstrapTokens().get(key1));

    // node 2 bootstraps further, now at key3; its previous bootstrap token must be gone
    Gossiper.instance.injectApplicationState(node2, ApplicationState.TOKENS, valueFactory.tokens(Collections.singleton(key3)));
    ss.onChange(node2, ApplicationState.STATUS, valueFactory.bootstrapping(Collections.<Token>singleton(key3)));

    assertFalse(tmd.isMember(node2));
    assertFalse(tmd.isLeaving(node2));
    assertEquals(node2, tmd.getBootstrapTokens().get(key3));
    assertNull(tmd.getBootstrapTokens().get(key4));
    assertEquals(node3, tmd.getBootstrapTokens().get(key1));

    // both nodes go back to normal
    Gossiper.instance.injectApplicationState(node3, ApplicationState.TOKENS, valueFactory.tokens(Collections.singleton(key2)));
    Gossiper.instance.injectApplicationState(node2, ApplicationState.TOKENS, valueFactory.tokens(Collections.singleton(key3)));
    ss.onChange(node2, ApplicationState.STATUS, valueFactory.normal(Collections.singleton(key3)));
    ss.onChange(node3, ApplicationState.STATUS, valueFactory.normal(Collections.singleton(key2)));

    assertTrue(tmd.isMember(node2));
    assertFalse(tmd.isLeaving(node2));
    assertEquals(key3, tmd.getToken(node2));
    assertTrue(tmd.isMember(node3));
    assertFalse(tmd.isLeaving(node3));
    assertEquals(key2, tmd.getToken(node3));

    assertTrue(tmd.getBootstrapTokens().isEmpty());
}
/**
 * Checks that a node can jump back to normal: first directly from leaving (at a different
 * token), then after going through leaving and left (again at a different token).
 */
@Test
public void testStateJumpToNormal() throws UnknownHostException
{
    StorageService ss = StorageService.instance;
    TokenMetadata tmd = ss.getTokenMetadata();
    tmd.clearUnsafe();
    IPartitioner partitioner = RandomPartitioner.instance;
    VersionedValue.VersionedValueFactory valueFactory = new VersionedValue.VersionedValueFactory(partitioner);

    ArrayList<Token> endpointTokens = new ArrayList<>();
    ArrayList<Token> keyTokens = new ArrayList<>();
    List<InetAddress> hosts = new ArrayList<>();
    List<UUID> hostIds = new ArrayList<>();

    // create a ring of 6 nodes
    Util.createInitialRing(ss, partitioner, endpointTokens, keyTokens, hosts, hostIds, 6);

    final InetAddress node = hosts.get(2);
    final Token initialToken = endpointTokens.get(2);
    final Token secondToken = keyTokens.get(2);
    final Token thirdToken = keyTokens.get(4);

    // node 2 leaves
    ss.onChange(node, ApplicationState.STATUS, valueFactory.leaving(Collections.singleton(initialToken)));
    assertTrue(tmd.isLeaving(node));
    assertEquals(initialToken, tmd.getToken(node));

    // back to normal, at a different token
    Gossiper.instance.injectApplicationState(node, ApplicationState.TOKENS, valueFactory.tokens(Collections.singleton(secondToken)));
    ss.onChange(node, ApplicationState.STATUS, valueFactory.normal(Collections.singleton(secondToken)));
    assertTrue(tmd.getLeavingEndpoints().isEmpty());
    assertEquals(secondToken, tmd.getToken(node));

    // the same node goes through leaving and left, then jumps to normal at yet another token
    ss.onChange(node, ApplicationState.STATUS, valueFactory.leaving(Collections.singleton(secondToken)));
    ss.onChange(node, ApplicationState.STATUS,
                valueFactory.left(Collections.singleton(secondToken), Gossiper.computeExpireTime()));
    Gossiper.instance.injectApplicationState(node, ApplicationState.TOKENS, valueFactory.tokens(Collections.singleton(thirdToken)));
    ss.onChange(node, ApplicationState.STATUS, valueFactory.normal(Collections.singleton(thirdToken)));

    assertTrue(tmd.getBootstrapTokens().isEmpty());
    assertTrue(tmd.getLeavingEndpoints().isEmpty());
    assertEquals(thirdToken, tmd.getToken(node));
}
/**
 * Checks jumps into the leaving state: leaving with a token other than the one the node was
 * created with, then bootstrapping, then leaving again directly from bootstrapping, and
 * finally left.
 */
@Test
public void testStateJumpToLeaving() throws UnknownHostException
{
    StorageService ss = StorageService.instance;
    TokenMetadata tmd = ss.getTokenMetadata();
    tmd.clearUnsafe();
    IPartitioner partitioner = RandomPartitioner.instance;
    VersionedValue.VersionedValueFactory valueFactory = new VersionedValue.VersionedValueFactory(partitioner);

    ArrayList<Token> endpointTokens = new ArrayList<>();
    ArrayList<Token> keyTokens = new ArrayList<>();
    List<InetAddress> hosts = new ArrayList<>();
    List<UUID> hostIds = new ArrayList<>();

    // create a ring of 6 nodes
    Util.createInitialRing(ss, partitioner, endpointTokens, keyTokens, hosts, hostIds, 6);

    final InetAddress node = hosts.get(2);
    final Token leavingToken = keyTokens.get(0);
    final Token bootstrapToken = keyTokens.get(1);

    // node 2 leaves with a _different_ token; its original token must no longer map to an endpoint
    Gossiper.instance.injectApplicationState(node, ApplicationState.TOKENS, valueFactory.tokens(Collections.singleton(leavingToken)));
    ss.onChange(node, ApplicationState.STATUS, valueFactory.leaving(Collections.singleton(leavingToken)));

    assertEquals(leavingToken, tmd.getToken(node));
    assertTrue(tmd.isLeaving(node));
    assertNull(tmd.getEndpoint(endpointTokens.get(2)));

    // go to bootstrap
    Gossiper.instance.injectApplicationState(node, ApplicationState.TOKENS, valueFactory.tokens(Collections.singleton(bootstrapToken)));
    ss.onChange(node, ApplicationState.STATUS, valueFactory.bootstrapping(Collections.<Token>singleton(bootstrapToken)));

    assertFalse(tmd.isLeaving(node));
    assertEquals(1, tmd.getBootstrapTokens().size());
    assertEquals(node, tmd.getBootstrapTokens().get(bootstrapToken));

    // jump to leaving again
    ss.onChange(node, ApplicationState.STATUS, valueFactory.leaving(Collections.singleton(bootstrapToken)));

    assertEquals(node, tmd.getEndpoint(bootstrapToken));
    assertTrue(tmd.isLeaving(node));
    assertTrue(tmd.getBootstrapTokens().isEmpty());

    // go to state left
    ss.onChange(node, ApplicationState.STATUS,
                valueFactory.left(Collections.singleton(bootstrapToken), Gossiper.computeExpireTime()));

    assertFalse(tmd.isMember(node));
    assertFalse(tmd.isLeaving(node));
}
/**
 * Checks jumps into the left state: the node at index 2 goes straight to left, the node at
 * index 3 starts bootstrapping, and a second left for the node at index 2 carrying the
 * bootstrap token must leave no bootstrap tokens behind.
 */
@Test
public void testStateJumpToLeft() throws UnknownHostException
{
    StorageService ss = StorageService.instance;
    TokenMetadata tmd = ss.getTokenMetadata();
    tmd.clearUnsafe();
    IPartitioner partitioner = RandomPartitioner.instance;
    VersionedValue.VersionedValueFactory valueFactory = new VersionedValue.VersionedValueFactory(partitioner);

    ArrayList<Token> endpointTokens = new ArrayList<>();
    ArrayList<Token> keyTokens = new ArrayList<>();
    List<InetAddress> hosts = new ArrayList<>();
    List<UUID> hostIds = new ArrayList<>();

    // create a ring of 7 nodes
    Util.createInitialRing(ss, partitioner, endpointTokens, keyTokens, hosts, hostIds, 7);

    final InetAddress leftNode = hosts.get(2);
    final InetAddress bootstrappingNode = hosts.get(3);
    final Token bootstrapToken = keyTokens.get(1);

    // node hosts.get(2) jumps to left
    ss.onChange(leftNode, ApplicationState.STATUS,
                valueFactory.left(Collections.singleton(endpointTokens.get(2)), Gossiper.computeExpireTime()));
    assertFalse(tmd.isMember(leftNode));

    // node hosts.get(3) goes to bootstrap
    Gossiper.instance.injectApplicationState(bootstrappingNode, ApplicationState.TOKENS, valueFactory.tokens(Collections.singleton(bootstrapToken)));
    ss.onChange(bootstrappingNode, ApplicationState.STATUS, valueFactory.bootstrapping(Collections.<Token>singleton(bootstrapToken)));

    assertFalse(tmd.isMember(bootstrappingNode));
    assertEquals(1, tmd.getBootstrapTokens().size());
    assertEquals(bootstrappingNode, tmd.getBootstrapTokens().get(bootstrapToken));

    // and then directly to 'left'
    // NOTE(review): this transition is issued for hosts.get(2) with the bootstrap token of
    // hosts.get(3), not for the bootstrapping node itself -- confirm this is intended.
    Gossiper.instance.injectApplicationState(leftNode, ApplicationState.TOKENS, valueFactory.tokens(Collections.singleton(bootstrapToken)));
    ss.onChange(leftNode, ApplicationState.STATUS,
                valueFactory.left(Collections.singleton(bootstrapToken), Gossiper.computeExpireTime()));

    assertTrue(tmd.getBootstrapTokens().size() == 0);
    assertFalse(tmd.isMember(leftNode));
    assertFalse(tmd.isLeaving(leftNode));
}
/**
 * Verifies that the system.peers table is left untouched by state changes that arrive for a
 * node after it has been removed. (See CASSANDRA-6053)
 */
@Test
public void testStateChangeOnRemovedNode() throws UnknownHostException
{
    StorageService storageService = StorageService.instance;
    VersionedValue.VersionedValueFactory values = new VersionedValue.VersionedValueFactory(partitioner);

    // ring of 2 nodes; only the endpoint tokens and host addresses are kept for later use
    ArrayList<Token> endpointTokens = new ArrayList<>();
    List<InetAddress> hosts = new ArrayList<>();
    Util.createInitialRing(storageService, partitioner, endpointTokens, new ArrayList<Token>(), hosts, new ArrayList<UUID>(), 2);

    InetAddress toRemove = hosts.get(1);

    // record dc/rack info for the node and confirm the rack can be read back
    final ExecutorService mutationStage = StageManager.getStage(Stage.MUTATION);
    FBUtilities.waitOnFuture(SystemKeyspace.updatePeerInfo(toRemove, "data_center", "dc42", mutationStage));
    FBUtilities.waitOnFuture(SystemKeyspace.updatePeerInfo(toRemove, "rack", "rack42", mutationStage));
    assertEquals("rack42", SystemKeyspace.loadDcRackInfo().get(toRemove).get("rack"));

    // mark the node as removed by injecting a LEFT status into gossip
    VersionedValue leftStatus = values.left(Collections.singleton(endpointTokens.get(1)), Gossiper.computeExpireTime());
    Gossiper.instance.injectApplicationState(toRemove, ApplicationState.STATUS, leftStatus);
    assertTrue(Gossiper.instance.isDeadState(Gossiper.instance.getEndpointStateForEndpoint(toRemove)));

    // a rack change delivered after the endpoint has left must not alter the stored rack
    storageService.onChange(toRemove, ApplicationState.RACK, values.rack("rack9999"));
    assertEquals("rack42", SystemKeyspace.loadDcRackInfo().get(toRemove).get("rack"));
}
/**
 * Delivers a REMOVING status change for an endpoint that is not a ring member. The test has
 * no assertions; it passes as long as the state change does not throw. (See CASSANDRA-6564)
 */
@Test
public void testRemovingStatusForNonMember() throws UnknownHostException
{
    // ring of 1 node
    StorageService storageService = StorageService.instance;
    VersionedValue.VersionedValueFactory values = new VersionedValue.VersionedValueFactory(partitioner);
    Util.createInitialRing(storageService, partitioner, new ArrayList<Token>(), new ArrayList<Token>(), new ArrayList<InetAddress>(), new ArrayList<UUID>(), 1);

    // REMOVING for a non-member endpoint; without the CASSANDRA-6564 fix, this
    // would result in an ArrayIndexOutOfBoundsException
    InetAddress nonMember = InetAddress.getByName("192.168.1.42");
    VersionedValue removing = values.removingNonlocal(UUID.randomUUID());
    storageService.onChange(nonMember, ApplicationState.STATUS, removing);
}
/**
 * Resolves each given host string with {@link InetAddress#getByName(String)} and returns the
 * resulting addresses in argument order.
 *
 * @param hosts host names or textual IP addresses to resolve
 * @return a new list containing one address per argument
 * @throws UnknownHostException if any entry cannot be resolved
 */
private static Collection<InetAddress> makeAddrs(String... hosts) throws UnknownHostException
{
    final int count = hosts.length;
    ArrayList<InetAddress> resolved = new ArrayList<>(count);
    for (int i = 0; i < count; i++)
    {
        resolved.add(InetAddress.getByName(hosts[i]));
    }
    return resolved;
}
/**
 * Creates a replication strategy for the named keyspace, taking the strategy class and its
 * options from the keyspace metadata held by {@code Schema.instance} and pairing them with
 * the supplied token metadata and a new {@code SimpleSnitch}.
 *
 * @param keyspaceName keyspace whose replication settings are looked up
 * @param tmd token metadata handed to the created strategy
 * @return the strategy returned by {@code AbstractReplicationStrategy.createReplicationStrategy}
 */
private AbstractReplicationStrategy getStrategy(String keyspaceName, TokenMetadata tmd)
{
    KeyspaceMetadata keyspace = Schema.instance.getKSMetaData(keyspaceName);
    SimpleSnitch snitch = new SimpleSnitch();
    return AbstractReplicationStrategy.createReplicationStrategy(keyspaceName,
                                                                 keyspace.params.replication.klass,
                                                                 tmd,
                                                                 snitch,
                                                                 keyspace.params.replication.options);
}
}