blob: 79120a2fedefa5d4a9874b53e948d008707d6cde [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.distributed.impl;
import java.io.IOException;
import java.lang.annotation.Annotation;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.net.InetSocketAddress;
import java.nio.file.FileSystem;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.BiPredicate;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import javax.annotation.concurrent.GuardedBy;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import org.junit.Assume;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.CassandraRelevantProperties;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.distributed.Constants;
import org.apache.cassandra.distributed.api.ConsistencyLevel;
import org.apache.cassandra.distributed.api.Feature;
import org.apache.cassandra.distributed.api.IClassTransformer;
import org.apache.cassandra.distributed.api.ICluster;
import org.apache.cassandra.distributed.api.ICoordinator;
import org.apache.cassandra.distributed.api.IInstance;
import org.apache.cassandra.distributed.api.IInstanceConfig;
import org.apache.cassandra.distributed.api.IInstanceInitializer;
import org.apache.cassandra.distributed.api.IInvokableInstance;
import org.apache.cassandra.distributed.api.IIsolatedExecutor;
import org.apache.cassandra.distributed.api.IListen;
import org.apache.cassandra.distributed.api.IMessage;
import org.apache.cassandra.distributed.api.IMessageFilters;
import org.apache.cassandra.distributed.api.IMessageSink;
import org.apache.cassandra.distributed.api.IUpgradeableInstance;
import org.apache.cassandra.distributed.api.LogAction;
import org.apache.cassandra.distributed.api.NodeToolResult;
import org.apache.cassandra.distributed.api.TokenSupplier;
import org.apache.cassandra.distributed.shared.InstanceClassLoader;
import org.apache.cassandra.distributed.shared.Metrics;
import org.apache.cassandra.distributed.shared.NetworkTopology;
import org.apache.cassandra.distributed.shared.ShutdownException;
import org.apache.cassandra.distributed.shared.Versions;
import org.apache.cassandra.io.util.PathUtils;
import org.apache.cassandra.net.Verb;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.Isolated;
import org.apache.cassandra.utils.Shared;
import org.apache.cassandra.utils.Shared.Recursive;
import org.apache.cassandra.utils.concurrent.Condition;
import org.reflections.Reflections;
import org.reflections.scanners.Scanners;
import org.reflections.util.ConfigurationBuilder;
import org.reflections.util.NameHelper;
import static java.util.stream.Stream.of;
import static org.apache.cassandra.distributed.impl.IsolatedExecutor.DEFAULT_SHUTDOWN_EXECUTOR;
import static org.apache.cassandra.distributed.shared.NetworkTopology.addressAndPort;
import static org.apache.cassandra.utils.Shared.Recursive.ALL;
import static org.apache.cassandra.utils.Shared.Recursive.NONE;
import static org.apache.cassandra.utils.Shared.Scope.ANY;
import static org.apache.cassandra.utils.concurrent.Condition.newOneTimeCondition;
/**
* AbstractCluster creates, initializes and manages Cassandra instances ({@link Instance}.
*
* All instances created under the same cluster will have a shared ClassLoader that'll preload
* common classes required for configuration and communication (byte buffers, primitives, config
* objects etc). Shared classes are listed in {@link InstanceClassLoader}.
*
* Each instance has its own class loader that will load logging, yaml libraries and all non-shared
* Cassandra package classes. The rule of thumb is that we'd like to have all Cassandra-specific things
* (unless explitily shared through the common classloader) on a per-classloader basis in order to
* allow creating more than one instance of DatabaseDescriptor and other Cassandra singletones.
*
* All actions (reading, writing, schema changes, etc) are executed by serializing lambda/runnables,
* transferring them to instance-specific classloaders, deserializing and running them there. Most of
* the things can be simply captured in closure or passed through `apply` method of the wrapped serializable
* function/callable. You can use {@link Instance#{applies|runs|consumes}OnInstance} for executing
* code on specific instance.
*
* Each instance has its own logger. Each instance log line will contain INSTANCE{instance_id}.
*
* As of today, messaging is faked by hooking into MessagingService, so we're not using usual Cassandra
* handlers for internode to have more control over it. Messaging is wired by passing verbs manually.
* coordinator-handling code and hooks to the callbacks can be found in {@link Coordinator}.
*/
public abstract class AbstractCluster<I extends IInstance> implements ICluster<I>, AutoCloseable
{
public static Versions.Version CURRENT_VERSION = new Versions.Version(FBUtilities.getReleaseVersionString(), Versions.getClassPath());
// WARNING: we have this logger not (necessarily) for logging, but
// to ensure we have instantiated the main classloader's LoggerFactory (and any LogbackStatusListener)
// before we instantiate any for a new instance
private static final Logger logger = LoggerFactory.getLogger(AbstractCluster.class);
// include byteman so tests can use
public static final Predicate<String> SHARED_PREDICATE = getSharedClassPredicate(ANY);
private final UUID clusterId = UUID.randomUUID();
private final Path root;
private final ClassLoader sharedClassLoader;
private final Predicate<String> sharedClassPredicate;
private final IClassTransformer classTransformer;
private final int subnet;
private final TokenSupplier tokenSupplier;
private final Map<Integer, NetworkTopology.DcAndRack> nodeIdTopology;
private final Consumer<IInstanceConfig> configUpdater;
private final int broadcastPort;
// mutated by starting/stopping a node
private final List<I> instances;
private final Map<InetSocketAddress, I> instanceMap;
private final Versions.Version initialVersion;
// mutated by user-facing API
private final MessageFilters filters;
private final INodeProvisionStrategy.Strategy nodeProvisionStrategy;
private final IInstanceInitializer instanceInitializer;
private final int datadirCount;
private volatile Thread.UncaughtExceptionHandler previousHandler = null;
private volatile BiPredicate<Integer, Throwable> ignoreUncaughtThrowable = null;
private final List<Throwable> uncaughtExceptions = new CopyOnWriteArrayList<>();
private final ThreadGroup clusterThreadGroup = new ThreadGroup(clusterId.toString());
private final ShutdownExecutor shutdownExecutor;
private volatile IMessageSink messageSink;
/**
* Common builder, add methods that are applicable to both Cluster and Upgradable cluster here.
*/
public static abstract class AbstractBuilder<I extends IInstance, C extends ICluster, B extends AbstractBuilder<I, C, B>>
extends org.apache.cassandra.distributed.shared.AbstractBuilder<I, C, B>
{
private INodeProvisionStrategy.Strategy nodeProvisionStrategy = INodeProvisionStrategy.Strategy.MultipleNetworkInterfaces;
private ShutdownExecutor shutdownExecutor = DEFAULT_SHUTDOWN_EXECUTOR;
{
// Indicate that we are running in the in-jvm dtest environment
CassandraRelevantProperties.DTEST_IS_IN_JVM_DTEST.setBoolean(true);
// those properties may be set for unit-test optimizations; those should not be used when running dtests
CassandraRelevantProperties.TEST_FLUSH_LOCAL_SCHEMA_CHANGES.reset();
CassandraRelevantProperties.NON_GRACEFUL_SHUTDOWN.reset();
}
public AbstractBuilder(Factory<I, C, B> factory)
{
super(factory);
withSharedClasses(SHARED_PREDICATE);
}
public B withNodeProvisionStrategy(INodeProvisionStrategy.Strategy nodeProvisionStrategy)
{
this.nodeProvisionStrategy = nodeProvisionStrategy;
return (B) this;
}
public B withShutdownExecutor(ShutdownExecutor shutdownExecutor)
{
this.shutdownExecutor = shutdownExecutor;
return (B) this;
}
@Override
public C createWithoutStarting() throws IOException
{
// if running as vnode but test sets withoutVNodes(), then skip the test
// AbstractCluster.createInstanceConfig has similar logic, but handles the cases where the test
// attempts to control tokens via config
// when token supplier is defined, use getTokenCount() to see if vnodes is supported or not
if (isVnode())
{
Assume.assumeTrue("vnode is not supported", isVNodeAllowed());
// if token count > 1 and isVnode, then good
Assume.assumeTrue("no-vnode is requested but not supported", getTokenCount() > 1);
}
else
{
Assume.assumeTrue("single-token is not supported", isSingleTokenAllowed());
// if token count == 1 and isVnode == false, then goodAbstractClusterTest
Assume.assumeTrue("vnode is requested but not supported", getTokenCount() == 1);
}
return super.createWithoutStarting();
}
private boolean isVnode()
{
TokenSupplier ts = getTokenSupplier();
return ts == null
? getTokenCount() > 1 // token supplier wasn't defined yet, so rely on getTokenCount()
: ts.tokens(1).size() > 1; // token supplier is defined... check the first instance to see what tokens are used
}
}
protected class Wrapper extends DelegatingInvokableInstance implements IUpgradeableInstance
{
private final IInstanceConfig config;
private volatile IInvokableInstance delegate;
private volatile Versions.Version version;
@GuardedBy("this")
private volatile boolean isShutdown = true;
@GuardedBy("this")
private InetSocketAddress broadcastAddress;
private int generation = -1;
protected IInvokableInstance delegate()
{
if (delegate == null)
throw new IllegalStateException("Can't use shutdown instances, delegate is null");
return delegate;
}
protected IInvokableInstance delegateForStartup()
{
if (delegate == null)
delegate = newInstance();
return delegate;
}
public Wrapper(Versions.Version version, IInstanceConfig config)
{
this.config = config;
this.version = version;
// we ensure there is always a non-null delegate, so that the executor may be used while the node is offline
this.delegate = newInstance();
this.broadcastAddress = config.broadcastAddress();
}
private IInvokableInstance newInstance()
{
++generation;
IClassTransformer transformer = classTransformer == null ? null : classTransformer.initialise();
ClassLoader classLoader = new InstanceClassLoader(generation, config.num(), version.classpath, sharedClassLoader, sharedClassPredicate, transformer);
ThreadGroup threadGroup = new ThreadGroup(clusterThreadGroup, "node" + config.num() + (generation > 1 ? "_" + generation : ""));
if (instanceInitializer != null)
instanceInitializer.initialise(classLoader, threadGroup, config.num(), generation);
IInvokableInstance instance;
try
{
instance = Instance.transferAdhocPropagate((SerializableQuadFunction<IInstanceConfig, ClassLoader, FileSystem, ShutdownExecutor, Instance>)Instance::new, classLoader)
.apply(config.forVersion(version.version), classLoader, root.getFileSystem(), shutdownExecutor);
}
catch (InvocationTargetException e)
{
try
{
instance = Instance.transferAdhocPropagate((SerializableTriFunction<IInstanceConfig, ClassLoader, FileSystem, Instance>)Instance::new, classLoader)
.apply(config.forVersion(version.version), classLoader, root.getFileSystem());
}
catch (InvocationTargetException e2)
{
instance = Instance.transferAdhoc((SerializableBiFunction<IInstanceConfig, ClassLoader, Instance>)Instance::new, classLoader)
.apply(config.forVersion(version.version), classLoader);
}
catch (IllegalAccessException e2)
{
throw new RuntimeException(e);
}
}
catch (IllegalAccessException e)
{
throw new RuntimeException(e);
}
if (instanceInitializer != null)
instanceInitializer.beforeStartup(instance);
return instance;
}
public Executor executorFor(int verb)
{
if (isShutdown)
throw new IllegalStateException();
// this method must be lock-free to avoid Simulator deadlock
return delegate().executorFor(verb);
}
public IInstanceConfig config()
{
return config;
}
public boolean isShutdown()
{
IInvokableInstance delegate = this.delegate;
// if the instance shuts down on its own, detect that
return isShutdown || (delegate != null && delegate.isShutdown());
}
private boolean isRunning()
{
return !isShutdown();
}
@Override
public boolean isValid()
{
return delegate != null;
}
@Override
public synchronized void startup()
{
startup(AbstractCluster.this);
postStartup();
}
public synchronized void startup(ICluster cluster)
{
if (cluster != AbstractCluster.this)
throw new IllegalArgumentException("Only the owning cluster can be used for startup");
if (isRunning())
throw new IllegalStateException("Can not start a instance that is already running");
isShutdown = false;
// if the delegate isn't running, remove so it can be recreated
if (delegate != null && delegate.isShutdown())
delegate = null;
if (!broadcastAddress.equals(config.broadcastAddress()))
{
// previous address != desired address, so cleanup
InetSocketAddress previous = broadcastAddress;
InetSocketAddress newAddress = config.broadcastAddress();
instanceMap.put(newAddress, (I) this); // if the broadcast address changes, update
instanceMap.remove(previous);
broadcastAddress = newAddress;
// remove delegate to make sure static state is reset
delegate = null;
}
try
{
delegateForStartup().startup(cluster);
}
catch (Throwable t)
{
if (config.get(Constants.KEY_DTEST_API_STARTUP_FAILURE_AS_SHUTDOWN) == null)
{
// its possible that the failure happens after listening and threads are started up
// but without knowing the start up phase it isn't safe to call shutdown, so assume
// that a failed to start instance was shutdown (which would be true if each instance
// was its own JVM).
isShutdown = true;
}
else
{
// user was explict about the desired behavior, respect it
// the most common reason to set this is to set 'false', this will leave the
// instance marked as running, which will have .close shut it down.
isShutdown = (boolean) config.get(Constants.KEY_DTEST_API_STARTUP_FAILURE_AS_SHUTDOWN);
}
throw t;
}
// This duplicates work done in Instance startup, but keeping as other Instance implementations
// do not, so to permit older releases to be tested, repeat the setup
updateMessagingVersions();
if (instanceInitializer != null)
instanceInitializer.afterStartup(this);
}
@Override
public synchronized Future<Void> shutdown()
{
return shutdown(true);
}
@Override
public synchronized Future<Void> shutdown(boolean graceful)
{
if (isShutdown())
throw new IllegalStateException("Instance is not running, so can not be shutdown");
isShutdown = true;
Future<Void> future = delegate.shutdown(graceful);
delegate = null;
return future;
}
public int liveMemberCount()
{
if (isRunning() && delegate != null)
return delegate().liveMemberCount();
throw new IllegalStateException("Cannot get live member count on shutdown instance: " + config.num());
}
public Metrics metrics()
{
if (isShutdown)
throw new IllegalStateException();
return delegate.metrics();
}
public NodeToolResult nodetoolResult(boolean withNotifications, String... commandAndArgs)
{
return delegate().nodetoolResult(withNotifications, commandAndArgs);
}
public long killAttempts()
{
IInvokableInstance local = delegate;
// if shutdown cleared the delegate, then no longer know how many kill attempts happened, so return -1
if (local == null)
return -1;
return local.killAttempts();
}
@Override
public void receiveMessage(IMessage message)
{
IInvokableInstance delegate = this.delegate;
if (isRunning() && delegate != null) // since we sync directly on the other node, we drop messages immediately if we are shutdown
delegate.receiveMessage(message);
}
@Override
public void receiveMessageWithInvokingThread(IMessage message)
{
IInvokableInstance delegate = this.delegate;
if (isRunning() && delegate != null) // since we sync directly on the other node, we drop messages immediately if we are shutdown
delegate.receiveMessageWithInvokingThread(message);
}
@Override
public boolean getLogsEnabled()
{
return delegate().getLogsEnabled();
}
@Override
public LogAction logs()
{
return delegate().logs();
}
@Override
public synchronized void setVersion(Versions.Version version)
{
if (isRunning())
throw new IllegalStateException("Must be shutdown before version can be modified");
// re-initialise
this.version = version;
if (delegate != null)
{
// we can have a non-null delegate even thought we are shutdown, if delegate() has been invoked since shutdown.
delegate.shutdown();
delegate = null;
}
}
@Override
public void uncaughtException(Thread thread, Throwable throwable)
{
IInvokableInstance delegate = this.delegate;
if (delegate != null)
delegate.uncaughtException(thread, throwable);
else
logger.error("uncaught exception in thread {}", thread, throwable);
}
@Override
public String toString()
{
IInvokableInstance delegate = this.delegate;
return delegate == null ? "node" + config.num() : delegate.toString();
}
}
protected AbstractCluster(AbstractBuilder<I, ? extends ICluster<I>, ?> builder)
{
this.root = builder.getRootPath();
this.sharedClassLoader = builder.getSharedClassLoader();
this.sharedClassPredicate = builder.getSharedClasses();
this.classTransformer = builder.getClassTransformer();
this.subnet = builder.getSubnet();
this.tokenSupplier = builder.getTokenSupplier();
this.nodeIdTopology = builder.getNodeIdTopology();
this.configUpdater = builder.getConfigUpdater();
this.broadcastPort = builder.getBroadcastPort();
this.nodeProvisionStrategy = builder.nodeProvisionStrategy;
this.shutdownExecutor = builder.shutdownExecutor;
this.instances = new ArrayList<>();
this.instanceMap = new ConcurrentHashMap<>();
this.initialVersion = builder.getVersion();
this.filters = new MessageFilters();
this.instanceInitializer = builder.getInstanceInitializer2();
this.datadirCount = builder.getDatadirCount();
for (int i = 0; i < builder.getNodeCount(); ++i)
{
int nodeNum = i + 1;
InstanceConfig config = createInstanceConfig(nodeNum);
I instance = newInstanceWrapperInternal(initialVersion, config);
instances.add(instance);
// we use the config().broadcastAddressAndPort() here because we have not initialised the Instance
I prev = instanceMap.put(instance.config().broadcastAddress(), instance);
if (null != prev)
throw new IllegalStateException("Cluster cannot have multiple nodes with same InetAddressAndPort: " + instance.broadcastAddress() + " vs " + prev.broadcastAddress());
}
}
public InstanceConfig newInstanceConfig()
{
return createInstanceConfig(size() + 1);
}
@VisibleForTesting
InstanceConfig createInstanceConfig(int nodeNum)
{
INodeProvisionStrategy provisionStrategy = nodeProvisionStrategy.create(subnet);
Collection<String> tokens = tokenSupplier.tokens(nodeNum);
NetworkTopology topology = buildNetworkTopology(provisionStrategy, nodeIdTopology);
InstanceConfig config = InstanceConfig.generate(nodeNum, provisionStrategy, topology, root, tokens, datadirCount);
config.set(Constants.KEY_DTEST_API_CLUSTER_ID, clusterId.toString());
// if a test sets num_tokens directly, then respect it and only run if vnode or no-vnode is defined
int defaultTokenCount = config.getInt("num_tokens");
assert tokens.size() == defaultTokenCount : String.format("num_tokens=%d but tokens are %s; size does not match", defaultTokenCount, tokens);
String defaultTokens = config.getString("initial_token");
if (configUpdater != null)
{
configUpdater.accept(config);
int testTokenCount = config.getInt("num_tokens");
if (defaultTokenCount != testTokenCount)
{
if (testTokenCount == 1)
{
// test is no-vnode, but running with vnode, so skip
Assume.assumeTrue("vnode is not supported", false);
}
else
{
Assume.assumeTrue("no-vnode is requested but not supported", defaultTokenCount > 1);
// if the test controls initial_token or GOSSIP is enabled, then the test is safe to run
if (defaultTokens.equals(config.getString("initial_token")))
{
// test didn't define initial_token
Assume.assumeTrue("vnode is enabled and num_tokens is defined in test without GOSSIP or setting initial_token", config.has(Feature.GOSSIP));
config.remove("initial_token");
}
else
{
// test defined initial_token; trust it
}
}
}
}
return config;
}
public static NetworkTopology buildNetworkTopology(INodeProvisionStrategy provisionStrategy,
Map<Integer, NetworkTopology.DcAndRack> nodeIdTopology)
{
NetworkTopology topology = NetworkTopology.build("", 0, Collections.emptyMap());
IntStream.rangeClosed(1, nodeIdTopology.size()).forEach(nodeId -> {
InetSocketAddress addressAndPort = addressAndPort(provisionStrategy.ipAddress(nodeId), provisionStrategy.storagePort(nodeId));
NetworkTopology.DcAndRack dcAndRack = nodeIdTopology.get(nodeId);
topology.put(addressAndPort, dcAndRack);
});
return topology;
}
protected abstract I newInstanceWrapper(Versions.Version version, IInstanceConfig config);
protected I newInstanceWrapperInternal(Versions.Version version, IInstanceConfig config)
{
config.validate();
return newInstanceWrapper(version, config);
}
public I bootstrap(IInstanceConfig config)
{
I instance = newInstanceWrapperInternal(initialVersion, config);
instances.add(instance);
I prev = instanceMap.put(config.broadcastAddress(), instance);
if (null != prev)
{
throw new IllegalStateException(String.format("This cluster already contains a node (%d) with with same address and port: %s",
config.num(),
instance));
}
return instance;
}
/**
* WARNING: we index from 1 here, for consistency with inet address!
*/
public ICoordinator coordinator(int node)
{
return instances.get(node - 1).coordinator();
}
public Stream<ICoordinator> coordinators()
{
return stream().map(IInstance::coordinator);
}
public List<I> get(int... nodes)
{
if (nodes == null || nodes.length == 0)
throw new IllegalArgumentException("No nodes provided");
List<I> list = new ArrayList<>(nodes.length);
for (int i : nodes)
list.add(get(i));
return list;
}
/**
* WARNING: we index from 1 here, for consistency with inet address!
*/
public I get(int node)
{
return instances.get(node - 1);
}
public I get(InetSocketAddress addr)
{
return instanceMap.get(addr);
}
public I getFirstRunningInstance()
{
return stream().filter(i -> !i.isShutdown()).findFirst().orElseThrow(
() -> new IllegalStateException("All instances are shutdown"));
}
public int size()
{
return instances.size();
}
public Stream<I> stream()
{
return instances.stream();
}
public Stream<I> stream(String dcName)
{
return instances.stream().filter(i -> i.config().localDatacenter().equals(dcName));
}
public Stream<I> stream(String dcName, String rackName)
{
return instances.stream().filter(i -> i.config().localDatacenter().equals(dcName) &&
i.config().localRack().equals(rackName));
}
public void run(Consumer<? super I> action, Predicate<I> filter)
{
run(Collections.singletonList(action), filter);
}
public void run(Collection<Consumer<? super I>> actions, Predicate<I> filter)
{
stream().forEach(instance -> {
for (Consumer<? super I> action : actions)
{
if (filter.test(instance))
action.accept(instance);
}
});
}
public void run(Consumer<? super I> action, int instanceId, int... moreInstanceIds)
{
run(Collections.singletonList(action), instanceId, moreInstanceIds);
}
public void run(List<Consumer<? super I>> actions, int instanceId, int... moreInstanceIds)
{
int[] instanceIds = new int[moreInstanceIds.length + 1];
instanceIds[0] = instanceId;
System.arraycopy(moreInstanceIds, 0, instanceIds, 1, moreInstanceIds.length);
for (int idx : instanceIds)
{
for (Consumer<? super I> action : actions)
action.accept(this.get(idx));
}
}
public void forEach(IIsolatedExecutor.SerializableRunnable runnable)
{
forEach(i -> i.sync(runnable));
}
public void forEach(Consumer<? super I> consumer)
{
forEach(instances, consumer);
}
public void forEach(List<I> instancesForOp, Consumer<? super I> consumer)
{
instancesForOp.forEach(consumer);
}
public void parallelForEach(IIsolatedExecutor.SerializableConsumer<? super I> consumer, long timeout, TimeUnit unit)
{
parallelForEach(instances, consumer, timeout, unit);
}
public void parallelForEach(List<I> instances, IIsolatedExecutor.SerializableConsumer<? super I> consumer, long timeout, TimeUnit unit)
{
FBUtilities.waitOnFutures(instances.stream()
.map(i -> i.async(consumer).apply(i))
.collect(Collectors.toList()),
timeout, unit);
}
public IMessageFilters filters()
{
return filters;
}
public synchronized void setMessageSink(IMessageSink sink)
{
if (messageSink != null && sink != null)
throw new IllegalStateException();
this.messageSink = sink;
}
public void deliverMessage(InetSocketAddress to, IMessage message)
{
IMessageSink sink = messageSink;
if (sink == null) get(to).receiveMessage(message);
else sink.accept(to, message);
}
public IMessageFilters.Builder verbs(Verb... verbs)
{
int[] ids = new int[verbs.length];
for (int i = 0; i < verbs.length; ++i)
ids[i] = verbs[i].id;
return filters.verbs(ids);
}
public void disableAutoCompaction(String keyspace)
{
forEach(() -> {
for (ColumnFamilyStore cs : Keyspace.open(keyspace).getColumnFamilyStores())
cs.disableAutoCompaction();
});
}
public void schemaChange(String query)
{
schemaChange(query, false);
}
/**
* Change the schema of the cluster, tolerating stopped nodes. N.B. the schema
* will not automatically be updated when stopped nodes are restarted, individual tests need to
* re-synchronize somehow (by gossip or some other mechanism).
* @param query Schema altering statement
*/
public void schemaChangeIgnoringStoppedInstances(String query)
{
schemaChange(query, true);
}
private void schemaChange(String query, boolean ignoreStoppedInstances)
{
I instance = ignoreStoppedInstances ? getFirstRunningInstance() : get(1);
schemaChange(query, ignoreStoppedInstances, instance);
}
public void schemaChange(String query, boolean ignoreStoppedInstances, I instance)
{
instance.sync(() -> {
try (SchemaChangeMonitor monitor = new SchemaChangeMonitor())
{
if (ignoreStoppedInstances)
monitor.ignoreStoppedInstances();
monitor.startPolling();
// execute the schema change
instance.coordinator().execute(query, ConsistencyLevel.ALL);
monitor.waitForCompletion();
}
}).run();
}
public void schemaChange(String statement, int instance)
{
get(instance).schemaChangeInternal(statement);
}
private void updateMessagingVersions()
{
for (IInstance reportTo : instances)
{
if (reportTo.isShutdown())
continue;
for (IInstance reportFrom : instances)
{
if (reportFrom == reportTo || reportFrom.isShutdown())
continue;
int minVersion = Math.min(reportFrom.getMessagingVersion(), reportTo.getMessagingVersion());
reportTo.setMessagingVersion(reportFrom.broadcastAddress(), minVersion);
}
}
}
public abstract class ChangeMonitor implements AutoCloseable
{
final List<IListen.Cancel> cleanup;
final Condition completed;
private final long timeOut;
private final TimeUnit timeoutUnit;
protected Predicate<IInstance> instanceFilter;
volatile boolean initialized;
public ChangeMonitor(long timeOut, TimeUnit timeoutUnit)
{
this.timeOut = timeOut;
this.timeoutUnit = timeoutUnit;
this.instanceFilter = i -> true;
this.cleanup = new ArrayList<>(instances.size());
this.completed = newOneTimeCondition();
}
public void ignoreStoppedInstances()
{
instanceFilter = instanceFilter.and(i -> !i.isShutdown());
}
protected void signal()
{
if (initialized && !completed.isSignalled() && isCompleted())
completed.signalAll();
}
@Override
public void close()
{
for (IListen.Cancel cancel : cleanup)
cancel.cancel();
}
public void waitForCompletion()
{
initialized = true;
signal();
try
{
if (!completed.await(timeOut, timeoutUnit))
throw new IllegalStateException(getMonitorTimeoutMessage());
}
catch (InterruptedException e)
{
throw new IllegalStateException("Caught exception while waiting for completion", e);
}
}
protected void startPolling()
{
instances.stream().filter(instanceFilter).forEach(instance -> cleanup.add(startPolling(instance)));
}
protected abstract IListen.Cancel startPolling(IInstance instance);
protected abstract boolean isCompleted();
protected abstract String getMonitorTimeoutMessage();
}
/**
* Will wait for a schema change AND agreement that occurs after it is created
* (and precedes the invocation to waitForAgreement)
* <p>
* Works by simply checking if all UUIDs agree after any schema version change event,
* so long as the waitForAgreement method has been entered (indicating the change has
* taken place on the coordinator)
* <p>
* This could perhaps be made a little more robust, but this should more than suffice.
*/
public class SchemaChangeMonitor extends ChangeMonitor
{
public SchemaChangeMonitor()
{
super(70, TimeUnit.SECONDS);
}
protected IListen.Cancel startPolling(IInstance instance)
{
return instance.listen().schema(this::signal);
}
protected boolean isCompleted()
{
return 1 == instances.stream().filter(instanceFilter).map(IInstance::schemaVersion).distinct().count();
}
protected String getMonitorTimeoutMessage()
{
return String.format("Schema agreement not reached. Schema versions of the instances: %s",
instances.stream().map(IInstance::schemaVersion).collect(Collectors.toList()));
}
}
public class AllMembersAliveMonitor extends ChangeMonitor
{
public AllMembersAliveMonitor()
{
super(60, TimeUnit.SECONDS);
}
protected IListen.Cancel startPolling(IInstance instance)
{
return instance.listen().liveMembers(this::signal);
}
protected boolean isCompleted()
{
return instances.stream().allMatch(i -> !i.config().has(Feature.GOSSIP) || i.liveMemberCount() == instances.size());
}
protected String getMonitorTimeoutMessage()
{
return "Live member count did not converge across all instances";
}
}
public void startup()
{
previousHandler = Thread.getDefaultUncaughtExceptionHandler();
Thread.setDefaultUncaughtExceptionHandler(this::uncaughtExceptions);
try (AllMembersAliveMonitor monitor = new AllMembersAliveMonitor())
{
monitor.startPolling();
// Start any instances with auto_bootstrap enabled first, and in series to avoid issues
// with multiple nodes bootstrapping with consistent range movement enabled,
// and then start any instances with it disabled in parallel.
List<I> startSequentially = new ArrayList<>();
List<I> startParallel = new ArrayList<>();
for (int i = 0; i < instances.size(); i++)
{
I instance = instances.get(i);
if (i == 0 || (boolean) instance.config().get("auto_bootstrap"))
startSequentially.add(instance);
else
startParallel.add(instance);
}
forEach(startSequentially, i -> {
i.startup(this);
i.postStartup();
});
parallelForEach(startParallel, i -> {
i.startup(this);
i.postStartup();
}, 0, null);
monitor.waitForCompletion();
}
}
private void uncaughtExceptions(Thread thread, Throwable error)
{
if (!(thread.getContextClassLoader() instanceof InstanceClassLoader))
{
Thread.UncaughtExceptionHandler handler = previousHandler;
if (null != handler)
handler.uncaughtException(thread, error);
return;
}
InstanceClassLoader cl = (InstanceClassLoader) thread.getContextClassLoader();
get(cl.getInstanceId()).uncaughtException(thread, error);
BiPredicate<Integer, Throwable> ignore = ignoreUncaughtThrowable;
I instance = get(cl.getInstanceId());
if ((ignore == null || !ignore.test(cl.getInstanceId(), error)) && instance != null && !instance.isShutdown())
uncaughtExceptions.add(error);
}
@Override
public void setUncaughtExceptionsFilter(BiPredicate<Integer, Throwable> ignoreUncaughtThrowable)
{
this.ignoreUncaughtThrowable = ignoreUncaughtThrowable;
}
@Override
public void close()
{
logger.info("Closing cluster {}", this.clusterId);
FBUtilities.closeQuietly(instanceInitializer);
FBUtilities.waitOnFutures(instances.stream()
.filter(i -> !i.isShutdown())
.map(IInstance::shutdown)
.collect(Collectors.toList()),
1L, TimeUnit.MINUTES);
instances.clear();
instanceMap.clear();
PathUtils.setDeletionListener(ignore -> {});
// Make sure to only delete directory when threads are stopped
if (Files.exists(root))
PathUtils.deleteRecursive(root);
Thread.setDefaultUncaughtExceptionHandler(previousHandler);
previousHandler = null;
checkAndResetUncaughtExceptions();
//checkForThreadLeaks();
//withThreadLeakCheck(futures);
}
@Override
public void checkAndResetUncaughtExceptions()
{
List<Throwable> drain = new ArrayList<>(uncaughtExceptions.size());
uncaughtExceptions.removeIf(e -> {
drain.add(e);
return true;
});
if (!drain.isEmpty())
throw new ShutdownException(drain);
}
private void checkForThreadLeaks()
{
//This is an alternate version of the thread leak check that just checks to see if any threads are still alive
// with the context classloader.
Set<Thread> threadSet = Thread.getAllStackTraces().keySet();
threadSet.stream().filter(t->t.getContextClassLoader() instanceof InstanceClassLoader).forEach(t->{
t.setContextClassLoader(null);
throw new RuntimeException("Unterminated thread detected " + t.getName() + " in group " + t.getThreadGroup().getName());
});
}
// We do not want this check to run every time until we fix problems with tread stops
private void withThreadLeakCheck(List<Future<?>> futures)
{
FBUtilities.waitOnFutures(futures);
Set<Thread> threadSet = Thread.getAllStackTraces().keySet();
threadSet = Sets.difference(threadSet, Collections.singletonMap(Thread.currentThread(), null).keySet());
if (!threadSet.isEmpty())
{
for (Thread thread : threadSet)
{
System.out.println(thread);
System.out.println(Arrays.toString(thread.getStackTrace()));
}
throw new RuntimeException(String.format("Not all threads have shut down. %d threads are still running: %s", threadSet.size(), threadSet));
}
}
public List<Token> tokens()
{
return stream()
.flatMap(i ->
{
try
{
IPartitioner partitioner = ((IPartitioner)Class.forName(i.config().getString("partitioner")).newInstance());
return Stream.of(i.config().getString("initial_token").split(",")).map(partitioner.getTokenFactory()::fromString);
}
catch (Throwable t)
{
throw new RuntimeException(t);
}
})
.collect(Collectors.toList());
}
private static Set<String> findClassesMarkedForSharedClassLoader(Class<?>[] share, Shared.Scope ... scopes)
{
return findClassesMarkedForSharedClassLoader(share, ImmutableSet.copyOf(scopes)::contains);
}
private static Set<String> findClassesMarkedForSharedClassLoader(Class<?>[] share, Predicate<Shared.Scope> scopes)
{
Set<Class<?>> classes = findClassesMarkedWith(Shared.class, a -> of(a.scope()).anyMatch(scopes));
Collections.addAll(classes, share);
assertTransitiveClosure(classes);
return toNames(classes);
}
private static Set<String> findClassesMarkedForInstanceClassLoader(Class<?>[] isolate)
{
Set<Class<?>> classes = findClassesMarkedWith(Isolated.class, ignore -> true);
Collections.addAll(classes, isolate);
return toNames(classes);
}
public static Predicate<String> getSharedClassPredicate(Shared.Scope ... scopes)
{
return getSharedClassPredicate(new Class[0], new Class[0], scopes);
}
public static Predicate<String> getSharedClassPredicate(Class<?>[] isolate, Class<?>[] share, Shared.Scope ... scopes)
{
Set<String> shared = findClassesMarkedForSharedClassLoader(share, scopes);
Set<String> isolated = findClassesMarkedForInstanceClassLoader(isolate);
return s -> {
if (isolated.contains(s))
return false;
return shared.contains(s) ||
InstanceClassLoader.getDefaultLoadSharedFilter().test(s) ||
s.startsWith("org.jboss.byteman");
};
}
private static <A extends Annotation> Set<Class<?>> findClassesMarkedWith(Class<A> annotation, Predicate<A> testAnnotation)
{
Reflections reflections = new Reflections(ConfigurationBuilder.build("org.apache.cassandra").setExpandSuperTypes(false));
return Utils.INSTANCE.forNames(reflections.get(Scanners.TypesAnnotated.get(annotation.getName())),
reflections.getConfiguration().getClassLoaders())
.stream()
.filter(testAnnotation(annotation, testAnnotation))
.flatMap(expander())
.collect(Collectors.toSet());
}
private static Set<String> toNames(Set<Class<?>> classes)
{
return classes.stream().map(Class::getName).collect(Collectors.toSet());
}
private static <A extends Annotation> Predicate<Class<?>> testAnnotation(Class<A> annotation, Predicate<A> test)
{
return clazz -> {
A[] annotations = clazz.getDeclaredAnnotationsByType(annotation);
for (A a : annotations)
{
if (!test.test(a))
return false;
}
return true;
};
}
private static void assertTransitiveClosure(Set<Class<?>> classes)
{
Set<Class<?>> tested = new HashSet<>();
for (Class<?> clazz : classes)
{
forEach(test -> {
if (!classes.contains(test))
throw new AssertionError(clazz.getName() + " is shared, but its dependency " + test + " is not");
}, new SharedParams(ALL, ALL, NONE), clazz, tested);
}
}
private static class SharedParams
{
final Recursive ancestors, members, inner;
private SharedParams(Recursive ancestors, Recursive members, Recursive inner)
{
this.ancestors = ancestors;
this.members = members;
this.inner = inner;
}
private SharedParams(Shared shared)
{
this.ancestors = shared.ancestors();
this.members = shared.members();
this.inner = shared.inner();
}
}
private static void forEach(Consumer<Class<?>> forEach, SharedParams shared, Class<?> cur, Set<Class<?>> done)
{
if (null == (cur = consider(cur, done)))
return;
forEach.accept(cur);
switch (shared.ancestors)
{
case ALL:
forEach(forEach, shared, cur.getSuperclass(), done);
case INTERFACES:
for (Class<?> i : cur.getInterfaces())
forEach(forEach, shared, i, done);
}
if (shared.members != NONE)
{
for (Field field : cur.getDeclaredFields())
{
if ((field.getModifiers() & Modifier.PRIVATE) == 0)
forEachMatch(shared.members, forEach, shared, field.getType(), done);
}
for (Method method : cur.getDeclaredMethods())
{
if ((method.getModifiers() & Modifier.PRIVATE) == 0)
{
forEachMatch(shared.members, forEach, shared, method.getReturnType(), done);
forEachMatch(shared.members, forEach, shared, method.getParameterTypes(), done);
}
}
}
if (shared.inner != NONE)
forEachMatch(shared.inner, forEach, shared, cur.getDeclaredClasses(), done);
}
private static void forEachMatch(Recursive ifMatches, Consumer<Class<?>> forEach, SharedParams shared, Class<?>[] classes, Set<Class<?>> done)
{
for (Class<?> cur : classes)
forEachMatch(ifMatches, forEach, shared, cur, done);
}
private static void forEachMatch(Recursive ifMatches, Consumer<Class<?>> forEach, SharedParams shared, Class<?> cur, Set<Class<?>> done)
{
if (ifMatches == ALL || isInterface(cur))
forEach(forEach, shared, cur, done);
}
private static boolean isInterface(Class<?> test)
{
return test.isInterface() || test.isEnum() || Throwable.class.isAssignableFrom(test);
}
private static Function<Class<?>, Stream<Class<?>>> expander()
{
Set<Class<?>> done = new HashSet<>();
return clazz -> expand(clazz, done);
}
private static Stream<Class<?>> expand(Class<?> clazz, Set<Class<?>> done)
{
Optional<Shared> maybeShared = of(clazz.getDeclaredAnnotationsByType(Shared.class)).findFirst();
if (!maybeShared.isPresent())
return Stream.of(clazz);
Shared shared = maybeShared.get();
if (shared.inner() == NONE && shared.members() == NONE && shared.ancestors() == NONE)
return Stream.of(clazz);
Set<Class<?>> closure = new HashSet<>();
forEach(closure::add, new SharedParams(shared), clazz, done);
return closure.stream();
}
private static Class<?> consider(Class<?> consider, Set<Class<?>> considered)
{
if (consider == null) return null;
while (consider.isArray()) // TODO (future): this is inadequate handling of array types (fine for now)
consider = consider.getComponentType();
if (consider.isPrimitive()) return null;
if (consider.getPackage() != null && consider.getPackage().getName().startsWith("java.")) return null;
if (!considered.add(consider)) return null;
if (InstanceClassLoader.getDefaultLoadSharedFilter().test(consider.getName())) return null;
return consider;
}
// 3.0 and earlier clusters must have unique InetAddressAndPort for each InetAddress
public static <I extends IInstance> Map<InetSocketAddress, I> getUniqueAddressLookup(ICluster<I> cluster)
{
return getUniqueAddressLookup(cluster, Function.identity());
}
public static <I extends IInstance, V> Map<InetSocketAddress, V> getUniqueAddressLookup(ICluster<I> cluster, Function<I, V> function)
{
Map<InetSocketAddress, V> lookup = new HashMap<>();
cluster.stream().forEach(instance -> {
InetSocketAddress address = instance.broadcastAddress();
if (!address.equals(instance.config().broadcastAddress()))
throw new IllegalStateException("addressAndPort mismatch: " + address + " vs " + instance.config().broadcastAddress());
V prev = lookup.put(address, function.apply(instance));
if (null != prev)
throw new IllegalStateException("This version of Cassandra does not support multiple nodes with the same InetAddress: " + address + " vs " + prev);
});
return lookup;
}
// after upgrading a static function became an interface method, so need this class to mimic old behavior
private enum Utils implements NameHelper
{
INSTANCE;
}
}