blob: 68841ea30a369d7dc07596259d25d7bab977e973 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.gms;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import com.google.common.collect.ImmutableMap;
import com.google.common.net.InetAddresses;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.apache.cassandra.Util;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.db.commitlog.CommitLog;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.RandomPartitioner;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.locator.SeedProvider;
import org.apache.cassandra.locator.TokenMetadata;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.CassandraVersion;
import org.apache.cassandra.utils.FBUtilities;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public class GossiperTest
{
static
{
System.setProperty(Gossiper.Props.DISABLE_THREAD_VALIDATION, "true");
DatabaseDescriptor.daemonInitialization();
CommitLog.instance.start();
}
private static final CassandraVersion CURRENT_VERSION = new CassandraVersion(FBUtilities.getReleaseVersionString());
static final IPartitioner partitioner = new RandomPartitioner();
StorageService ss = StorageService.instance;
TokenMetadata tmd = StorageService.instance.getTokenMetadata();
ArrayList<Token> endpointTokens = new ArrayList<>();
ArrayList<Token> keyTokens = new ArrayList<>();
List<InetAddressAndPort> hosts = new ArrayList<>();
List<UUID> hostIds = new ArrayList<>();
private SeedProvider originalSeedProvider;
@Before
public void setup()
{
tmd.clearUnsafe();
originalSeedProvider = DatabaseDescriptor.getSeedProvider();
}
@After
public void tearDown()
{
DatabaseDescriptor.setSeedProvider(originalSeedProvider);
}
@AfterClass
public static void afterClass()
{
Gossiper.instance.stop();
}
@Test
public void testPaddingIntact() throws Exception
{
// sanity check that all 10 pads still exist
assert ApplicationState.X1 == ApplicationState.X1;
assert ApplicationState.X2 == ApplicationState.X2;
assert ApplicationState.X3 == ApplicationState.X3;
assert ApplicationState.X4 == ApplicationState.X4;
assert ApplicationState.X5 == ApplicationState.X5;
assert ApplicationState.X6 == ApplicationState.X6;
assert ApplicationState.X7 == ApplicationState.X7;
assert ApplicationState.X8 == ApplicationState.X8;
assert ApplicationState.X9 == ApplicationState.X9;
assert ApplicationState.X10 == ApplicationState.X10;
}
@Test
public void testHasVersion3Nodes() throws Exception
{
Gossiper.instance.start(0);
Gossiper.instance.expireUpgradeFromVersion();
VersionedValue.VersionedValueFactory factory = new VersionedValue.VersionedValueFactory(null);
EndpointState es = new EndpointState((HeartBeatState) null);
es.addApplicationState(ApplicationState.RELEASE_VERSION, factory.releaseVersion(CURRENT_VERSION.toString()));
Gossiper.instance.endpointStateMap.put(InetAddressAndPort.getByName("127.0.0.1"), es);
Gossiper.instance.liveEndpoints.add(InetAddressAndPort.getByName("127.0.0.1"));
es = new EndpointState((HeartBeatState) null);
es.addApplicationState(ApplicationState.RELEASE_VERSION, factory.releaseVersion("3.11.3"));
Gossiper.instance.endpointStateMap.put(InetAddressAndPort.getByName("127.0.0.2"), es);
Gossiper.instance.liveEndpoints.add(InetAddressAndPort.getByName("127.0.0.2"));
es = new EndpointState((HeartBeatState) null);
es.addApplicationState(ApplicationState.RELEASE_VERSION, factory.releaseVersion("3.0.0"));
Gossiper.instance.endpointStateMap.put(InetAddressAndPort.getByName("127.0.0.3"), es);
Gossiper.instance.liveEndpoints.add(InetAddressAndPort.getByName("127.0.0.3"));
assertFalse(Gossiper.instance.upgradeFromVersionSupplier.get().value().compareTo(new CassandraVersion("3.0")) < 0);
assertTrue(Gossiper.instance.upgradeFromVersionSupplier.get().value().compareTo(new CassandraVersion("3.1")) < 0);
assertTrue(Gossiper.instance.hasMajorVersion3Nodes());
Gossiper.instance.endpointStateMap.remove(InetAddressAndPort.getByName("127.0.0.3"));
Gossiper.instance.liveEndpoints.remove(InetAddressAndPort.getByName("127.0.0.3"));
assertFalse(Gossiper.instance.upgradeFromVersionSupplier.get().value().compareTo(new CassandraVersion("3.0")) < 0);
assertFalse(Gossiper.instance.upgradeFromVersionSupplier.get().value().compareTo(new CassandraVersion("3.1")) < 0);
assertTrue(Gossiper.instance.upgradeFromVersionSupplier.get().value().compareTo(new CassandraVersion("3.12")) < 0);
assertTrue(Gossiper.instance.hasMajorVersion3Nodes());
Gossiper.instance.endpointStateMap.remove(InetAddressAndPort.getByName("127.0.0.2"));
Gossiper.instance.liveEndpoints.remove(InetAddressAndPort.getByName("127.0.0.2"));
assertEquals(SystemKeyspace.CURRENT_VERSION, Gossiper.instance.upgradeFromVersionSupplier.get().value());
}
@Test
public void testLargeGenerationJump() throws UnknownHostException, InterruptedException
{
Util.createInitialRing(ss, partitioner, endpointTokens, keyTokens, hosts, hostIds, 2);
try
{
InetAddressAndPort remoteHostAddress = hosts.get(1);
EndpointState initialRemoteState = Gossiper.instance.getEndpointStateForEndpoint(remoteHostAddress);
HeartBeatState initialRemoteHeartBeat = initialRemoteState.getHeartBeatState();
//Util.createInitialRing should have initialized remoteHost's HeartBeatState's generation to 1
assertEquals(initialRemoteHeartBeat.getGeneration(), 1);
HeartBeatState proposedRemoteHeartBeat = new HeartBeatState(initialRemoteHeartBeat.getGeneration() + Gossiper.MAX_GENERATION_DIFFERENCE + 1);
EndpointState proposedRemoteState = new EndpointState(proposedRemoteHeartBeat);
Gossiper.instance.applyStateLocally(ImmutableMap.of(remoteHostAddress, proposedRemoteState));
//The generation should have been updated because it isn't over Gossiper.MAX_GENERATION_DIFFERENCE in the future
HeartBeatState actualRemoteHeartBeat = Gossiper.instance.getEndpointStateForEndpoint(remoteHostAddress).getHeartBeatState();
assertEquals(proposedRemoteHeartBeat.getGeneration(), actualRemoteHeartBeat.getGeneration());
//Propose a generation 10 years in the future - this should be rejected.
HeartBeatState badProposedRemoteHeartBeat = new HeartBeatState((int) (System.currentTimeMillis() / 1000) + Gossiper.MAX_GENERATION_DIFFERENCE * 10);
EndpointState badProposedRemoteState = new EndpointState(badProposedRemoteHeartBeat);
Gossiper.instance.applyStateLocally(ImmutableMap.of(remoteHostAddress, badProposedRemoteState));
actualRemoteHeartBeat = Gossiper.instance.getEndpointStateForEndpoint(remoteHostAddress).getHeartBeatState();
//The generation should not have been updated because it is over Gossiper.MAX_GENERATION_DIFFERENCE in the future
assertEquals(proposedRemoteHeartBeat.getGeneration(), actualRemoteHeartBeat.getGeneration());
}
finally
{
// clean up the gossip states
Gossiper.instance.endpointStateMap.clear();
}
}
int stateChangedNum = 0;
@Test
public void testDuplicatedStateUpdate() throws Exception
{
VersionedValue.VersionedValueFactory valueFactory =
new VersionedValue.VersionedValueFactory(DatabaseDescriptor.getPartitioner());
SimpleStateChangeListener stateChangeListener = null;
Util.createInitialRing(ss, partitioner, endpointTokens, keyTokens, hosts, hostIds, 2);
try
{
InetAddressAndPort remoteHostAddress = hosts.get(1);
EndpointState initialRemoteState = Gossiper.instance.getEndpointStateForEndpoint(remoteHostAddress);
HeartBeatState initialRemoteHeartBeat = initialRemoteState.getHeartBeatState();
//Util.createInitialRing should have initialized remoteHost's HeartBeatState's generation to 1
assertEquals(initialRemoteHeartBeat.getGeneration(), 1);
HeartBeatState proposedRemoteHeartBeat = new HeartBeatState(initialRemoteHeartBeat.getGeneration());
EndpointState proposedRemoteState = new EndpointState(proposedRemoteHeartBeat);
final Token token = DatabaseDescriptor.getPartitioner().getRandomToken();
VersionedValue tokensValue = valueFactory.tokens(Collections.singletonList(token));
proposedRemoteState.addApplicationState(ApplicationState.TOKENS, tokensValue);
stateChangeListener = new SimpleStateChangeListener();
stateChangeListener.setOnChangeVerifier(onChangeParams -> {
assertEquals(ApplicationState.TOKENS, onChangeParams.state);
stateChangedNum++;
});
Gossiper.instance.register(stateChangeListener);
stateChangedNum = 0;
Gossiper.instance.applyStateLocally(ImmutableMap.of(remoteHostAddress, proposedRemoteState));
assertEquals(1, stateChangedNum);
HeartBeatState actualRemoteHeartBeat = Gossiper.instance.getEndpointStateForEndpoint(remoteHostAddress).getHeartBeatState();
assertEquals(proposedRemoteHeartBeat.getGeneration(), actualRemoteHeartBeat.getGeneration());
// Clone a new HeartBeatState
proposedRemoteHeartBeat = new HeartBeatState(initialRemoteHeartBeat.getGeneration(), proposedRemoteHeartBeat.getHeartBeatVersion());
proposedRemoteState = new EndpointState(proposedRemoteHeartBeat);
// Bump the heartbeat version and use the same TOKENS state
proposedRemoteHeartBeat.updateHeartBeat();
proposedRemoteState.addApplicationState(ApplicationState.TOKENS, tokensValue);
// The following state change should only update heartbeat without updating the TOKENS state
Gossiper.instance.applyStateLocally(ImmutableMap.of(remoteHostAddress, proposedRemoteState));
assertEquals(1, stateChangedNum);
actualRemoteHeartBeat = Gossiper.instance.getEndpointStateForEndpoint(remoteHostAddress).getHeartBeatState();
assertEquals(proposedRemoteHeartBeat.getGeneration(), actualRemoteHeartBeat.getGeneration());
}
finally
{
// clean up the gossip states
Gossiper.instance.endpointStateMap.clear();
if (stateChangeListener != null)
Gossiper.instance.unregister(stateChangeListener);
}
}
// Note: This test might fail if for some reason the node broadcast address is in 127.99.0.0/16
@Test
public void testReloadSeeds() throws UnknownHostException
{
Gossiper gossiper = new Gossiper(false);
List<String> loadedList;
// Initialize the seed list directly to a known set to start with
gossiper.seeds.clear();
InetAddressAndPort addr = InetAddressAndPort.getByAddress(InetAddress.getByName("127.99.1.1"));
int nextSize = 4;
List<InetAddressAndPort> nextSeeds = new ArrayList<>(nextSize);
for (int i = 0; i < nextSize; i ++)
{
gossiper.seeds.add(addr);
nextSeeds.add(addr);
addr = InetAddressAndPort.getByAddress(InetAddresses.increment(addr.getAddress()));
}
Assert.assertEquals(nextSize, gossiper.seeds.size());
// Add another unique address to the list
addr = InetAddressAndPort.getByAddress(InetAddresses.increment(addr.getAddress()));
nextSeeds.add(addr);
nextSize++;
DatabaseDescriptor.setSeedProvider(new TestSeedProvider(nextSeeds));
loadedList = gossiper.reloadSeeds();
// Check that the new entry was added
Assert.assertEquals(nextSize, loadedList.size());
for (InetAddressAndPort a : nextSeeds)
assertTrue(loadedList.contains(a.toString()));
// Check that the return value of the reloadSeeds matches the content of the getSeeds call
// and that they both match the internal contents of the Gossiper seeds list
Assert.assertEquals(loadedList.size(), gossiper.getSeeds().size());
for (InetAddressAndPort a : gossiper.seeds)
{
assertTrue(loadedList.contains(a.toString()));
assertTrue(gossiper.getSeeds().contains(a.toString()));
}
// Add a duplicate of the last address to the seed provider list
int uniqueSize = nextSize;
nextSeeds.add(addr);
nextSize++;
DatabaseDescriptor.setSeedProvider(new TestSeedProvider(nextSeeds));
loadedList = gossiper.reloadSeeds();
// Check that the number of seed nodes reported hasn't increased
Assert.assertEquals(uniqueSize, loadedList.size());
for (InetAddressAndPort a : nextSeeds)
assertTrue(loadedList.contains(a.toString()));
// Create a new list that has no overlaps with the previous list
addr = InetAddressAndPort.getByAddress(InetAddress.getByName("127.99.2.1"));
int disjointSize = 3;
List<InetAddressAndPort> disjointSeeds = new ArrayList<>(disjointSize);
for (int i = 0; i < disjointSize; i ++)
{
disjointSeeds.add(addr);
addr = InetAddressAndPort.getByAddress(InetAddresses.increment(addr.getAddress()));
}
DatabaseDescriptor.setSeedProvider(new TestSeedProvider(disjointSeeds));
loadedList = gossiper.reloadSeeds();
// Check that the list now contains exactly the new other list.
Assert.assertEquals(disjointSize, gossiper.getSeeds().size());
Assert.assertEquals(disjointSize, loadedList.size());
for (InetAddressAndPort a : disjointSeeds)
{
assertTrue(gossiper.getSeeds().contains(a.toString()));
assertTrue(loadedList.contains(a.toString()));
}
// Set the seed node provider to return an empty list
DatabaseDescriptor.setSeedProvider(new TestSeedProvider(new ArrayList<InetAddressAndPort>()));
loadedList = gossiper.reloadSeeds();
// Check that the in memory seed node list was not modified
Assert.assertEquals(disjointSize, loadedList.size());
for (InetAddressAndPort a : disjointSeeds)
assertTrue(loadedList.contains(a.toString()));
// Change the seed provider to one that throws an unchecked exception
DatabaseDescriptor.setSeedProvider(new ErrorSeedProvider());
loadedList = gossiper.reloadSeeds();
// Check for the expected null response from a reload error
assertNull(loadedList);
// Check that the in memory seed node list was not modified and the exception was caught
Assert.assertEquals(disjointSize, gossiper.getSeeds().size());
for (InetAddressAndPort a : disjointSeeds)
assertTrue(gossiper.getSeeds().contains(a.toString()));
}
@Test
public void testNotFireDuplicatedNotificationsWithUpdateContainsOldAndNewState() throws UnknownHostException
{
VersionedValue.VersionedValueFactory valueFactory =
new VersionedValue.VersionedValueFactory(DatabaseDescriptor.getPartitioner());
Util.createInitialRing(ss, partitioner, endpointTokens, keyTokens, hosts, hostIds, 2);
SimpleStateChangeListener stateChangeListener = null;
try
{
InetAddressAndPort remoteHostAddress = hosts.get(1);
EndpointState initialRemoteState = Gossiper.instance.getEndpointStateForEndpoint(remoteHostAddress);
HeartBeatState initialRemoteHeartBeat = initialRemoteState.getHeartBeatState();
//Util.createInitialRing should have initialized remoteHost's HeartBeatState's generation to 1
assertEquals(initialRemoteHeartBeat.getGeneration(), 1);
// Test begins
AtomicInteger notificationCount = new AtomicInteger(0);
HeartBeatState proposedRemoteHeartBeat = new HeartBeatState(initialRemoteHeartBeat.getGeneration());
EndpointState proposedRemoteState = new EndpointState(proposedRemoteHeartBeat);
final Token token = DatabaseDescriptor.getPartitioner().getRandomToken();
proposedRemoteState.addApplicationState(ApplicationState.STATUS, valueFactory.normal(Collections.singletonList(token)));
stateChangeListener = new SimpleStateChangeListener();
Gossiper.instance.register(stateChangeListener);
// STEP 1. register verifier and apply state with just STATUS
// simulate applying gossip state from a v3 peer
stateChangeListener.setOnChangeVerifier(onChangeParams -> {
notificationCount.getAndIncrement();
assertEquals("It should fire notification for STATUS when gossiper local state not yet has STATUS_WITH_PORT",
ApplicationState.STATUS, onChangeParams.state);
});
Gossiper.instance.applyStateLocally(ImmutableMap.of(remoteHostAddress, proposedRemoteState));
// STEP 2. includes both STATUS and STATUS_WITH_PORT. The gossiper knows that the remote peer is now in v4
// update verifier and apply state again
proposedRemoteState.addApplicationState(ApplicationState.STATUS_WITH_PORT, valueFactory.normal(Collections.singletonList(token)));
stateChangeListener.setOnChangeVerifier(onChangeParams -> {
notificationCount.getAndIncrement();
assertEquals("It should only fire notification for STATUS_WITH_PORT",
ApplicationState.STATUS_WITH_PORT, onChangeParams.state);
});
Gossiper.instance.applyStateLocally(ImmutableMap.of(remoteHostAddress, proposedRemoteState));
// STEP 3. somehow, the peer send only the STATUS in the update.
proposedRemoteState = new EndpointState(proposedRemoteHeartBeat);
proposedRemoteState.addApplicationState(ApplicationState.STATUS, valueFactory.normal(Collections.singletonList(token)));
stateChangeListener.setOnChangeVerifier(onChangeParams -> {
notificationCount.getAndIncrement();
fail("It should not fire notification for STATUS");
});
assertEquals("Expect exact 2 notifications with the test setup",
2, notificationCount.get());
}
finally
{
// clean up the gossip states
Gossiper.instance.endpointStateMap.clear();
if (stateChangeListener != null)
Gossiper.instance.unregister(stateChangeListener);
}
}
static class SimpleStateChangeListener implements IEndpointStateChangeSubscriber
{
static class OnChangeParams
{
InetAddressAndPort endpoint;
ApplicationState state;
VersionedValue value;
OnChangeParams(InetAddressAndPort endpoint, ApplicationState state, VersionedValue value)
{
this.endpoint = endpoint;
this.state = state;
this.value = value;
}
}
private volatile Consumer<OnChangeParams> onChangeVerifier;
public void setOnChangeVerifier(Consumer<OnChangeParams> verifier)
{
onChangeVerifier = verifier;
}
public void onJoin(InetAddressAndPort endpoint, EndpointState epState) {}
public void beforeChange(InetAddressAndPort endpoint, EndpointState currentState, ApplicationState newStateKey, VersionedValue newValue) {}
public void onAlive(InetAddressAndPort endpoint, EndpointState state) {}
public void onDead(InetAddressAndPort endpoint, EndpointState state) {}
public void onRemove(InetAddressAndPort endpoint) {}
public void onRestart(InetAddressAndPort endpoint, EndpointState state) {}
public void onChange(InetAddressAndPort endpoint, ApplicationState state, VersionedValue value)
{
onChangeVerifier.accept(new OnChangeParams(endpoint, state, value));
}
}
static class TestSeedProvider implements SeedProvider
{
private List<InetAddressAndPort> seeds;
TestSeedProvider(List<InetAddressAndPort> seeds)
{
this.seeds = seeds;
}
@Override
public List<InetAddressAndPort> getSeeds()
{
return seeds;
}
}
// A seed provider for testing which throws assertion errors when queried
static class ErrorSeedProvider implements SeedProvider
{
@Override
public List<InetAddressAndPort> getSeeds()
{
assert(false);
return new ArrayList<>();
}
}
}