blob: c5f2a07e13630c3578702645d4e4f35dbbde10bd [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.distributed.test;
import java.io.Closeable;
import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.concurrent.*;
import java.util.concurrent.locks.LockSupport;
import java.util.stream.Collectors;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.Uninterruptibles;
import org.junit.Assert;
import org.junit.Test;
import net.bytebuddy.ByteBuddy;
import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
import net.bytebuddy.implementation.MethodDelegation;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.distributed.Cluster;
import org.apache.cassandra.distributed.api.*;
import org.apache.cassandra.distributed.shared.ClusterUtils;
import org.apache.cassandra.gms.ApplicationState;
import org.apache.cassandra.gms.EndpointState;
import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.service.PendingRangeCalculatorService;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.streaming.StreamPlan;
import org.apache.cassandra.streaming.StreamResultFuture;
import org.apache.cassandra.utils.FBUtilities;
import org.assertj.core.api.Assertions;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
import static org.apache.cassandra.distributed.action.GossipHelper.withProperty;
import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
import static org.apache.cassandra.distributed.api.Feature.NETWORK;
import static org.apache.cassandra.distributed.api.TokenSupplier.evenlyDistributedTokens;
import static org.apache.cassandra.distributed.impl.DistributedTestSnitch.toCassandraInetAddressAndPort;
import static org.apache.cassandra.distributed.shared.ClusterUtils.runAndWaitForLogs;
import static org.apache.cassandra.distributed.shared.NetworkTopology.singleDcNetworkTopology;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
public class GossipTest extends TestBaseImpl
{
@Test
public void nodeDownDuringMove() throws Throwable
{
int liveCount = 1;
try (Cluster cluster = Cluster.build(2 + liveCount)
.withConfig(config -> config.with(NETWORK).with(GOSSIP))
.createWithoutStarting())
{
int fail = liveCount + 1;
int late = fail + 1;
for (int i = 1 ; i <= liveCount ; ++i)
cluster.get(i).startup();
cluster.get(fail).startup();
Collection<String> expectTokens =
cluster.get(fail)
.callsOnInstance(() -> StorageService.instance.getTokenMetadata()
.getTokens(FBUtilities.getBroadcastAddressAndPort())
.stream()
.map(Object::toString)
.collect(Collectors.toList()))
.call();
InetSocketAddress failAddress = cluster.get(fail).broadcastAddress();
// wait for NORMAL state
for (int i = 1 ; i <= liveCount ; ++i)
{
cluster.get(i).acceptsOnInstance((InetSocketAddress address) -> {
EndpointState ep;
InetAddressAndPort endpoint = toCassandraInetAddressAndPort(address);
while (null == (ep = Gossiper.instance.getEndpointStateForEndpoint(endpoint))
|| ep.getApplicationState(ApplicationState.STATUS_WITH_PORT) == null
|| !ep.getApplicationState(ApplicationState.STATUS_WITH_PORT).value.startsWith("NORMAL"))
LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(10L));
}).accept(failAddress);
}
// set ourselves to MOVING, and wait for it to propagate
cluster.get(fail).runOnInstance(() -> {
Token token = Iterables.getFirst(StorageService.instance.getTokenMetadata().getTokens(FBUtilities.getBroadcastAddressAndPort()), null);
Gossiper.instance.addLocalApplicationState(ApplicationState.STATUS_WITH_PORT, StorageService.instance.valueFactory.moving(token));
});
for (int i = 1 ; i <= liveCount ; ++i)
{
cluster.get(i).acceptsOnInstance((InetSocketAddress address) -> {
EndpointState ep;
InetAddressAndPort endpoint = toCassandraInetAddressAndPort(address);
while (null == (ep = Gossiper.instance.getEndpointStateForEndpoint(endpoint))
|| (ep.getApplicationState(ApplicationState.STATUS_WITH_PORT) == null
|| !ep.getApplicationState(ApplicationState.STATUS_WITH_PORT).value.startsWith("MOVING")))
LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(100L));
}).accept(failAddress);
}
ClusterUtils.stopAbrupt(cluster, cluster.get(fail));
cluster.get(late).startup();
cluster.get(late).acceptsOnInstance((InetSocketAddress address) -> {
EndpointState ep;
InetAddressAndPort endpoint = toCassandraInetAddressAndPort(address);
while (null == (ep = Gossiper.instance.getEndpointStateForEndpoint(endpoint))
|| !ep.getApplicationState(ApplicationState.STATUS_WITH_PORT).value.startsWith("MOVING"))
LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(100L));
}).accept(failAddress);
Collection<String> tokens =
cluster.get(late)
.appliesOnInstance((InetSocketAddress address) ->
StorageService.instance.getTokenMetadata()
.getTokens(toCassandraInetAddressAndPort(address))
.stream()
.map(Object::toString)
.collect(Collectors.toList()))
.apply(failAddress);
Assert.assertEquals(expectTokens, tokens);
}
}
public static class BBBootstrapInterceptor
{
final static CountDownLatch bootstrapReady = new CountDownLatch(1);
final static CountDownLatch bootstrapStart = new CountDownLatch(1);
static void install(ClassLoader cl, int nodeNumber)
{
if (nodeNumber != 2)
return;
new ByteBuddy().rebase(StorageService.class)
.method(named("bootstrap").and(takesArguments(2)))
.intercept(MethodDelegation.to(BBBootstrapInterceptor.class))
.make()
.load(cl, ClassLoadingStrategy.Default.INJECTION);
}
public static boolean bootstrap(Collection<Token> tokens, long bootstrapTimeoutMillis)
{
bootstrapStart.countDown();
Uninterruptibles.awaitUninterruptibly(bootstrapReady);
return false; // bootstrap fails
}
}
@Test
public void testPreventStoppingGossipDuringBootstrap() throws Exception
{
ExecutorService es = Executors.newFixedThreadPool(1);
try (Cluster cluster = builder().withNodes(2)
.withConfig(config -> config.with(GOSSIP)
.with(NETWORK)
.set("auto_bootstrap", true))
.withInstanceInitializer(BBBootstrapInterceptor::install)
.createWithoutStarting();
Closeable ignored = es::shutdown)
{
Runnable test = () ->
{
try
{
cluster.get(2).runOnInstance(() -> {
Uninterruptibles.awaitUninterruptibly(BBBootstrapInterceptor.bootstrapStart);
BBBootstrapInterceptor.bootstrapReady.countDown();
try
{
StorageService.instance.stopGossiping();
Assert.fail("stopGossiping did not fail!");
}
catch (Exception ex)
{
Assert.assertSame(ex.getClass(), IllegalStateException.class);
Assert.assertEquals(ex.getMessage(), "Unable to stop gossip because the node is not in the normal state. Try to stop the node instead.");
}
});
}
finally
{
// shut down the node2 to speed up cluster startup.
cluster.get(2).shutdown();
}
};
Future<?> testResult = es.submit(test);
try
{
cluster.startup();
}
catch (Exception ex) {
// ignore exceptions from startup process. More interested in the test result.
}
testResult.get();
}
es.awaitTermination(5, TimeUnit.SECONDS);
}
@Test
public void testPreventEnablingGossipDuringMove() throws Exception
{
try (Cluster cluster = builder().withNodes(2)
.withConfig(config -> config.with(GOSSIP)
.with(NETWORK))
.start())
{
cluster.get(1).runOnInstance(() -> {
StorageService.instance.stopGossiping();
StorageService.instance.setMovingModeUnsafe();
try
{
StorageService.instance.startGossiping();
Assert.fail("startGossiping did not fail!");
}
catch (Exception ex)
{
Assert.assertSame(ex.getClass(), IllegalStateException.class);
Assert.assertEquals(ex.getMessage(), "Unable to start gossip because the node is not in the normal state.");
}
});
}
}
@Test
public void gossipShutdownUpdatesTokenMetadata() throws Exception
{
// TODO: fails with vnode enabled
try (Cluster cluster = Cluster.build(3)
.withConfig(c -> c.with(Feature.GOSSIP, Feature.NETWORK))
.withInstanceInitializer(FailureHelper::installMoveFailure)
.withoutVNodes()
.start())
{
init(cluster, 2);
populate(cluster);
IInvokableInstance node1 = cluster.get(1);
IInvokableInstance node2 = cluster.get(2);
IInvokableInstance node3 = cluster.get(3);
// initiate a move for node2, which will not complete due to the
// ByteBuddy interceptor we injected. Wait for the other two nodes
// to mark node2 as moving before proceeding.
long t2 = Long.parseLong(getLocalToken(node2));
long t3 = Long.parseLong(getLocalToken(node3));
long moveTo = t2 + ((t3 - t2)/2);
String logMsg = "Node " + node2.broadcastAddress() + " state moving, new token " + moveTo;
runAndWaitForLogs(() -> node2.nodetoolResult("move", "--", Long.toString(moveTo)).asserts().failure(),
logMsg,
cluster);
InetSocketAddress movingAddress = node2.broadcastAddress();
// node1 & node3 should now consider some ranges pending for node2
assertPendingRangesForPeer(true, movingAddress, cluster);
// A controlled shutdown causes peers to replace the MOVING status to be with SHUTDOWN, but prior to
// CASSANDRA-16796 this doesn't update TokenMetadata, so they maintain pending ranges for the down node
// indefinitely, even after it has been removed from the ring.
logMsg = "Marked " + node2.broadcastAddress() + " as shutdown";
runAndWaitForLogs(() -> Futures.getUnchecked(node2.shutdown()),
logMsg,
node1, node3);
// node1 & node3 should not consider any ranges as still pending for node2
assertPendingRangesForPeer(false, movingAddress, cluster);
}
}
@Test
public void restartGossipOnGossippingOnlyMember() throws Throwable
{
int originalNodeCount = 1;
int expandedNodeCount = originalNodeCount + 1;
try (Cluster cluster = builder().withNodes(originalNodeCount)
.withTokenSupplier(evenlyDistributedTokens(expandedNodeCount, 1))
.withNodeIdTopology(singleDcNetworkTopology(expandedNodeCount, "dc0", "rack0"))
.withConfig(config -> config.with(NETWORK, GOSSIP))
.start())
{
IInstanceConfig config = cluster.newInstanceConfig();
IInvokableInstance gossippingOnlyMember = cluster.bootstrap(config);
withProperty("cassandra.join_ring", Boolean.toString(false), () -> gossippingOnlyMember.startup(cluster));
assertTrue(gossippingOnlyMember.callOnInstance((IIsolatedExecutor.SerializableCallable<Boolean>)
() -> StorageService.instance.isGossipRunning()));
gossippingOnlyMember.runOnInstance((IIsolatedExecutor.SerializableRunnable) () -> StorageService.instance.stopGossiping());
assertFalse(gossippingOnlyMember.callOnInstance((IIsolatedExecutor.SerializableCallable<Boolean>)
() -> StorageService.instance.isGossipRunning()));
gossippingOnlyMember.runOnInstance((IIsolatedExecutor.SerializableRunnable) () -> StorageService.instance.startGossiping());
assertTrue(gossippingOnlyMember.callOnInstance((IIsolatedExecutor.SerializableCallable<Boolean>)
() -> StorageService.instance.isGossipRunning()));
}
}
private static String getLocalToken(IInvokableInstance node)
{
Collection<String> tokens = ClusterUtils.getLocalTokens(node);
Assertions.assertThat(tokens).hasSize(1);
return tokens.stream().findFirst().get();
}
void assertPendingRangesForPeer(final boolean expectPending, final InetSocketAddress movingAddress, final Cluster cluster)
{
for (IInvokableInstance inst : new IInvokableInstance[]{ cluster.get(1), cluster.get(3)})
{
boolean hasPending = inst.appliesOnInstance((InetSocketAddress address) -> {
InetAddressAndPort peer = toCassandraInetAddressAndPort(address);
PendingRangeCalculatorService.instance.blockUntilFinished();
boolean isMoving = StorageService.instance.getTokenMetadata()
.getMovingEndpoints()
.stream()
.map(pair -> pair.right)
.anyMatch(peer::equals);
return isMoving && !StorageService.instance.getTokenMetadata()
.getPendingRanges(KEYSPACE, peer)
.isEmpty();
}).apply(movingAddress);
assertEquals(String.format("%s should %shave PENDING RANGES for %s",
inst.broadcastAddress().getHostString(),
expectPending ? "" : "not ",
movingAddress),
hasPending, expectPending);
}
}
static void populate(Cluster cluster)
{
cluster.schemaChange("CREATE TABLE IF NOT EXISTS " + KEYSPACE + ".tbl (pk int PRIMARY KEY)");
for (int i = 0; i < 10; i++)
{
cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk) VALUES (?)",
ConsistencyLevel.ALL,
i);
}
}
public static class FailureHelper
{
static void installMoveFailure(ClassLoader cl, int nodeNumber)
{
if (nodeNumber == 2)
{
new ByteBuddy().redefine(StreamPlan.class)
.method(named("execute"))
.intercept(MethodDelegation.to(FailureHelper.class))
.make()
.load(cl, ClassLoadingStrategy.Default.INJECTION);
}
}
public static StreamResultFuture execute()
{
throw new RuntimeException("failing to execute move");
}
}
}