/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.distributed.impl;
import java.io.File;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.BiPredicate;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import com.google.common.collect.Sets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.distributed.Constants;
import org.apache.cassandra.distributed.api.ConsistencyLevel;
import org.apache.cassandra.distributed.api.Feature;
import org.apache.cassandra.distributed.api.ICluster;
import org.apache.cassandra.distributed.api.ICoordinator;
import org.apache.cassandra.distributed.api.IInstance;
import org.apache.cassandra.distributed.api.IInstanceConfig;
import org.apache.cassandra.distributed.api.IInvokableInstance;
import org.apache.cassandra.distributed.api.IIsolatedExecutor;
import org.apache.cassandra.distributed.api.IListen;
import org.apache.cassandra.distributed.api.IMessage;
import org.apache.cassandra.distributed.api.IMessageFilters;
import org.apache.cassandra.distributed.api.IUpgradeableInstance;
import org.apache.cassandra.distributed.api.LogAction;
import org.apache.cassandra.distributed.api.NodeToolResult;
import org.apache.cassandra.distributed.api.TokenSupplier;
import org.apache.cassandra.distributed.shared.AbstractBuilder;
import org.apache.cassandra.distributed.shared.InstanceClassLoader;
import org.apache.cassandra.distributed.shared.MessageFilters;
import org.apache.cassandra.distributed.shared.Metrics;
import org.apache.cassandra.distributed.shared.NetworkTopology;
import org.apache.cassandra.distributed.shared.Shared;
import org.apache.cassandra.distributed.shared.ShutdownException;
import org.apache.cassandra.distributed.shared.Versions;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.concurrent.SimpleCondition;
import org.reflections.Reflections;
/**
* AbstractCluster creates, initializes and manages Cassandra instances ({@link Instance}).
*
* All instances created under the same cluster will have a shared ClassLoader that'll preload
* common classes required for configuration and communication (byte buffers, primitives, config
* objects etc). Shared classes are listed in {@link InstanceClassLoader}.
*
* Each instance has its own class loader that will load logging, yaml libraries and all non-shared
* Cassandra package classes. The rule of thumb is that we'd like to have all Cassandra-specific things
* (unless explicitly shared through the common classloader) on a per-classloader basis in order to
* allow creating more than one instance of DatabaseDescriptor and other Cassandra singletons.
*
* All actions (reading, writing, schema changes, etc) are executed by serializing lambda/runnables,
* transferring them to instance-specific classloaders, deserializing and running them there. Most
* things can simply be captured in the closure or passed through the `apply` method of the wrapped
* serializable function/callable. You can use {@link Instance#{applies|runs|consumes}OnInstance} for
* executing code on a specific instance.
*
* Each instance has its own logger. Each instance log line will contain INSTANCE{instance_id}.
*
* As of today, messaging is faked by hooking into MessagingService, so we are not using the usual
* Cassandra internode handlers; this gives us more control over it. Messaging is wired by passing verbs manually.
* Coordinator-handling code and hooks into the callbacks can be found in {@link Coordinator}.
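*
* A minimal usage sketch, assuming a concrete cluster whose instances are invokable (e.g. the in-JVM
* {@code Cluster}) has been built and assigned to {@code cluster}; the keyspace, table, statements and
* the flush call are illustrative only, not part of this class:
* <pre>{@code
* cluster.schemaChange("CREATE KEYSPACE ks WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}");
* cluster.schemaChange("CREATE TABLE ks.tbl (pk int PRIMARY KEY, v int)");
* cluster.coordinator(1).execute("INSERT INTO ks.tbl (pk, v) VALUES (?, ?)", ConsistencyLevel.ALL, 1, 1);
* cluster.get(1).runOnInstance(() -> Keyspace.open("ks").getColumnFamilyStore("tbl").forceBlockingFlush());
* }</pre>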
*/
public abstract class AbstractCluster<I extends IInstance> implements ICluster<I>, AutoCloseable
{
public static Versions.Version CURRENT_VERSION = new Versions.Version(FBUtilities.getReleaseVersionString(), Versions.getClassPath());
// WARNING: we have this logger not (necessarily) for logging, but
// to ensure we have instantiated the main classloader's LoggerFactory (and any LogbackStatusListener)
// before we instantiate any for a new instance
private static final Logger logger = LoggerFactory.getLogger(AbstractCluster.class);
private static final AtomicInteger GENERATION = new AtomicInteger();
// include byteman so tests can use it
private static final Set<String> SHARED_CLASSES = findClassesMarkedForSharedClassLoader();
private static final Predicate<String> SHARED_PREDICATE = s ->
SHARED_CLASSES.contains(s) ||
InstanceClassLoader.getDefaultLoadSharedFilter().test(s) ||
s.startsWith("org.jboss.byteman");
private final UUID clusterId = UUID.randomUUID();
private final File root;
private final ClassLoader sharedClassLoader;
private final int subnet;
private final TokenSupplier tokenSupplier;
private final Map<Integer, NetworkTopology.DcAndRack> nodeIdTopology;
private final Consumer<IInstanceConfig> configUpdater;
private final int broadcastPort;
// mutated by starting/stopping a node
private final List<I> instances;
private final Map<InetSocketAddress, I> instanceMap;
private final Versions.Version initialVersion;
// mutated by user-facing API
private final MessageFilters filters;
private final BiConsumer<ClassLoader, Integer> instanceInitializer;
private final int datadirCount;
private volatile BiPredicate<Integer, Throwable> ignoreUncaughtThrowable = null;
private final List<Throwable> uncaughtExceptions = new CopyOnWriteArrayList<>();
private volatile Thread.UncaughtExceptionHandler previousHandler = null;
protected class Wrapper extends DelegatingInvokableInstance implements IUpgradeableInstance
{
private final int generation;
private final IInstanceConfig config;
private volatile IInvokableInstance delegate;
private volatile Versions.Version version;
private volatile boolean isShutdown = true;
protected IInvokableInstance delegate()
{
if (delegate == null)
throw new IllegalStateException("Can't use shut down instances, delegate is null");
return delegate;
}
protected IInvokableInstance delegateForStartup()
{
if (delegate == null)
delegate = newInstance(generation);
return delegate;
}
public Wrapper(int generation, Versions.Version version, IInstanceConfig config)
{
this.generation = generation;
this.config = config;
this.version = version;
// we ensure there is always a non-null delegate, so that the executor may be used while the node is offline
this.delegate = newInstance(generation);
}
private IInvokableInstance newInstance(int generation)
{
ClassLoader classLoader = new InstanceClassLoader(generation, config.num(), version.classpath, sharedClassLoader, SHARED_PREDICATE);
if (instanceInitializer != null)
instanceInitializer.accept(classLoader, config.num());
return Instance.transferAdhoc((SerializableBiFunction<IInstanceConfig, ClassLoader, IInvokableInstance>)Instance::new, classLoader)
.apply(config, classLoader);
}
public IInstanceConfig config()
{
return config;
}
public boolean isShutdown()
{
return isShutdown;
}
@Override
public synchronized void startup()
{
startup(AbstractCluster.this);
}
@Override
public synchronized void startup(ICluster cluster)
{
if (!isShutdown)
throw new IllegalStateException();
delegateForStartup().startup(cluster);
isShutdown = false;
updateMessagingVersions();
}
@Override
public synchronized Future<Void> shutdown()
{
return shutdown(true);
}
@Override
public synchronized Future<Void> shutdown(boolean graceful)
{
if (isShutdown)
throw new IllegalStateException();
isShutdown = true;
Future<Void> future = delegate.shutdown(graceful);
delegate = null;
return future;
}
public int liveMemberCount()
{
if (!isShutdown && delegate != null)
return delegate().liveMemberCount();
throw new IllegalStateException("Cannot get live member count on shutdown instance");
}
public Metrics metrics()
{
if (isShutdown)
throw new IllegalStateException();
return delegate.metrics();
}
public NodeToolResult nodetoolResult(boolean withNotifications, String... commandAndArgs)
{
return delegate().nodetoolResult(withNotifications, commandAndArgs);
}
public long killAttempts()
{
IInvokableInstance local = delegate;
// if shutdown cleared the delegate, then we no longer know how many kill attempts happened, so return -1
if (local == null)
return -1;
return local.killAttempts();
}
@Override
public void receiveMessage(IMessage message)
{
IInvokableInstance delegate = this.delegate;
if (!isShutdown && delegate != null) // since we sync directly on the other node, we drop messages immediately if we are shutdown
delegate.receiveMessage(message);
}
@Override
public boolean getLogsEnabled()
{
return delegate().getLogsEnabled();
}
@Override
public LogAction logs()
{
return delegate().logs();
}
@Override
public synchronized void setVersion(Versions.Version version)
{
if (!isShutdown)
throw new IllegalStateException("Must be shutdown before version can be modified");
// re-initialise
this.version = version;
if (delegate != null)
{
// we can have a non-null delegate even though we are shutdown, if delegate() has been invoked since shutdown.
delegate.shutdown();
delegate = null;
}
}
public void uncaughtException(Thread thread, Throwable throwable)
{
IInvokableInstance delegate = this.delegate;
if (delegate != null)
delegate.uncaughtException(thread, throwable);
else
logger.error("uncaught exception in thread {}", thread, throwable);
}
}
protected AbstractCluster(AbstractBuilder<I, ? extends ICluster<I>, ?> builder)
{
this.root = builder.getRoot();
this.sharedClassLoader = builder.getSharedClassLoader();
this.subnet = builder.getSubnet();
this.tokenSupplier = builder.getTokenSupplier();
this.nodeIdTopology = builder.getNodeIdTopology();
this.configUpdater = builder.getConfigUpdater();
this.broadcastPort = builder.getBroadcastPort();
this.instances = new ArrayList<>();
this.instanceMap = new HashMap<>();
this.initialVersion = builder.getVersion();
this.filters = new MessageFilters();
this.instanceInitializer = builder.getInstanceInitializer();
this.datadirCount = builder.getDatadirCount();
int generation = GENERATION.incrementAndGet();
for (int i = 0; i < builder.getNodeCount(); ++i)
{
int nodeNum = i + 1;
InstanceConfig config = createInstanceConfig(nodeNum);
I instance = newInstanceWrapperInternal(generation, initialVersion, config);
instances.add(instance);
// we use config().broadcastAddress() here because we have not initialised the Instance
I prev = instanceMap.put(instance.config().broadcastAddress(), instance);
if (null != prev)
throw new IllegalStateException("Cluster cannot have multiple nodes with same InetAddressAndPort: " + instance.broadcastAddress() + " vs " + prev.broadcastAddress());
}
}
public InstanceConfig newInstanceConfig()
{
return createInstanceConfig(size() + 1);
}
private InstanceConfig createInstanceConfig(int nodeNum)
{
String ipPrefix = "127.0." + subnet + ".";
String seedIp = ipPrefix + "1";
String ipAddress = ipPrefix + nodeNum;
long token = tokenSupplier.token(nodeNum);
NetworkTopology topology = NetworkTopology.build(ipPrefix, broadcastPort, nodeIdTopology);
InstanceConfig config = InstanceConfig.generate(nodeNum, ipAddress, topology, root, String.valueOf(token), seedIp, datadirCount);
config.set(Constants.KEY_DTEST_API_CLUSTER_ID, clusterId.toString());
if (configUpdater != null)
configUpdater.accept(config);
return config;
}
protected abstract I newInstanceWrapper(int generation, Versions.Version version, IInstanceConfig config);
protected I newInstanceWrapperInternal(int generation, Versions.Version version, IInstanceConfig config)
{
config.validate();
return newInstanceWrapper(generation, version, config);
}
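/**
* Registers a new, not-yet-started node wrapper for the given config; starting it is left to the caller.
* A hedged sketch of typical use (the {@code with(Feature...)} call is assumed from the dtest config API
* and is shown for illustration only):
* <pre>{@code
* IInstanceConfig config = cluster.newInstanceConfig().with(Feature.GOSSIP, Feature.NETWORK);
* IInstance newNode = cluster.bootstrap(config);
* newNode.startup();
* }</pre>
*/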
public I bootstrap(IInstanceConfig config)
{
if (!config.has(Feature.GOSSIP) || !config.has(Feature.NETWORK))
throw new IllegalStateException("New nodes can only be bootstrapped when gossip and networking is enabled.");
I instance = newInstanceWrapperInternal(0, initialVersion, config);
instances.add(instance);
I prev = instanceMap.put(config.broadcastAddress(), instance);
if (null != prev)
{
throw new IllegalStateException(String.format("This cluster already contains a node (%d) with with same address and port: %s",
config.num(),
instance));
}
return instance;
}
/**
* WARNING: we index from 1 here, for consistency with inet address!
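*
* A hedged usage sketch (the query text, consistency level and bind value are illustrative):
* <pre>{@code
* Object[][] rows = cluster.coordinator(1).execute("SELECT v FROM ks.tbl WHERE pk = ?", ConsistencyLevel.QUORUM, 1);
* }</pre>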
*/
public ICoordinator coordinator(int node)
{
return instances.get(node - 1).coordinator();
}
/**
* WARNING: we index from 1 here, for consistency with inet address!
*/
public I get(int node)
{
return instances.get(node - 1);
}
public I get(InetSocketAddress addr)
{
return instanceMap.get(addr);
}
public I getFirstRunningInstance()
{
return stream().filter(i -> !i.isShutdown()).findFirst().orElseThrow(
() -> new IllegalStateException("All instances are shutdown"));
}
public int size()
{
return instances.size();
}
public Stream<I> stream()
{
return instances.stream();
}
public Stream<I> stream(String dcName)
{
return instances.stream().filter(i -> i.config().localDatacenter().equals(dcName));
}
public Stream<I> stream(String dcName, String rackName)
{
return instances.stream().filter(i -> i.config().localDatacenter().equals(dcName) &&
i.config().localRack().equals(rackName));
}
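/**
* Runs the given serializable runnable synchronously on every instance, inside each instance's own
* classloader. A minimal sketch, assuming keyspace {@code ks} and table {@code tbl} already exist
* (the names and the compaction call are illustrative):
* <pre>{@code
* cluster.forEach(() -> Keyspace.open("ks").getColumnFamilyStore("tbl").forceMajorCompaction());
* }</pre>
*/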
public void forEach(IIsolatedExecutor.SerializableRunnable runnable)
{
forEach(i -> i.sync(runnable).run());
}
public void forEach(Consumer<? super I> consumer)
{
forEach(instances, consumer);
}
public void forEach(List<I> instancesForOp, Consumer<? super I> consumer)
{
instancesForOp.forEach(consumer);
}
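/**
* Runs the given serializable consumer on every instance concurrently and waits up to the given
* timeout for all of them to finish. A hedged sketch (the per-instance flush, keyspace name and
* timeout are illustrative):
* <pre>{@code
* cluster.parallelForEach(i -> i.flush("ks"), 1, TimeUnit.MINUTES);
* }</pre>
*/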
public void parallelForEach(IIsolatedExecutor.SerializableConsumer<? super I> consumer, long timeout, TimeUnit unit)
{
parallelForEach(instances, consumer, timeout, unit);
}
public void parallelForEach(List<I> instances, IIsolatedExecutor.SerializableConsumer<? super I> consumer, long timeout, TimeUnit unit)
{
FBUtilities.waitOnFutures(instances.stream()
.map(i -> i.async(consumer).apply(i))
.collect(Collectors.toList()),
timeout, unit);
}
public IMessageFilters filters()
{
return filters;
}
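/**
* Shorthand for building a message filter keyed on verb ordinals. A hedged sketch of dropping reads
* sent from node 1 to node 2 (the from/to/drop chain is assumed from the dtest-api filter builder and
* is shown for illustration):
* <pre>{@code
* cluster.verbs(MessagingService.Verb.READ).from(1).to(2).drop();
* }</pre>
*/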
public IMessageFilters.Builder verbs(MessagingService.Verb... verbs)
{
int[] ids = new int[verbs.length];
for (int i = 0; i < verbs.length; ++i)
ids[i] = verbs[i].ordinal();
return filters.verbs(ids);
}
public void disableAutoCompaction(String keyspace)
{
forEach(() -> {
for (ColumnFamilyStore cs : Keyspace.open(keyspace).getColumnFamilyStores())
cs.disableAutoCompaction();
});
}
public void schemaChange(String query)
{
schemaChange(query, false);
}
/**
* Change the schema of the cluster, tolerating stopped nodes. N.B. the schema
* will not automatically be updated when stopped nodes are restarted; individual tests need to
* re-synchronize somehow (by gossip or some other mechanism).
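*
* A hedged usage sketch (the shutdown call and DDL statement are illustrative):
* <pre>{@code
* cluster.get(2).shutdown().get();
* cluster.schemaChangeIgnoringStoppedInstances("ALTER TABLE ks.tbl ADD v2 int");
* }</pre>
*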
* @param query Schema altering statement
*/
public void schemaChangeIgnoringStoppedInstances(String query)
{
schemaChange(query, true);
}
private void schemaChange(String query, boolean ignoreStoppedInstances)
{
I instance = ignoreStoppedInstances ? getFirstRunningInstance() : get(1);
schemaChange(query, ignoreStoppedInstances, instance);
}
public void schemaChange(String query, boolean ignoreStoppedInstances, I instance)
{
instance.sync(() -> {
try (SchemaChangeMonitor monitor = new SchemaChangeMonitor())
{
if (ignoreStoppedInstances)
monitor.ignoreStoppedInstances();
monitor.startPolling();
// execute the schema change
instance.coordinator().execute(query, ConsistencyLevel.ALL);
monitor.waitForCompletion();
}
}).run();
}
public void schemaChange(String statement, int instance)
{
get(instance).schemaChangeInternal(statement);
}
private void updateMessagingVersions()
{
for (IInstance reportTo : instances)
{
if (reportTo.isShutdown())
continue;
for (IInstance reportFrom : instances)
{
if (reportFrom == reportTo || reportFrom.isShutdown())
continue;
int minVersion = Math.min(reportFrom.getMessagingVersion(), reportTo.getMessagingVersion());
reportTo.setMessagingVersion(reportFrom.broadcastAddress(), minVersion);
}
}
}
public abstract class ChangeMonitor implements AutoCloseable
{
final List<IListen.Cancel> cleanup;
final SimpleCondition completed;
private final long timeOut;
private final TimeUnit timeoutUnit;
protected Predicate<IInstance> instanceFilter;
volatile boolean initialized;
public ChangeMonitor(long timeOut, TimeUnit timeoutUnit)
{
this.timeOut = timeOut;
this.timeoutUnit = timeoutUnit;
this.instanceFilter = i -> true;
this.cleanup = new ArrayList<>(instances.size());
this.completed = new SimpleCondition();
}
public void ignoreStoppedInstances()
{
instanceFilter = instanceFilter.and(i -> !i.isShutdown());
}
protected void signal()
{
if (initialized && !completed.isSignaled() && isCompleted())
completed.signalAll();
}
@Override
public void close()
{
for (IListen.Cancel cancel : cleanup)
cancel.cancel();
}
public void waitForCompletion()
{
initialized = true;
signal();
try
{
if (!completed.await(timeOut, timeoutUnit))
throw new IllegalStateException(getMonitorTimeoutMessage());
}
catch (InterruptedException e)
{
throw new IllegalStateException("Caught exception while waiting for completion", e);
}
}
protected void startPolling()
{
instances.stream().filter(instanceFilter).forEach(instance -> cleanup.add(startPolling(instance)));
}
protected abstract IListen.Cancel startPolling(IInstance instance);
protected abstract boolean isCompleted();
protected abstract String getMonitorTimeoutMessage();
}
/**
* Waits for a schema change AND agreement that occurs after the monitor is created
* (and precedes the invocation of waitForCompletion).
* <p>
* Works by simply checking whether all schema version UUIDs agree after any schema version change
* event, so long as the waitForCompletion method has been entered (indicating the change has
* taken place on the coordinator).
* <p>
* This could perhaps be made a little more robust, but this should more than suffice.
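* <p>
* A hedged sketch of how the monitor is driven, mirroring the schemaChange(String, boolean, I)
* helper above ({@code instance} is the node the statement is executed through, and the DDL is
* illustrative):
* <pre>{@code
* try (SchemaChangeMonitor monitor = new SchemaChangeMonitor())
* {
*     monitor.startPolling();
*     instance.coordinator().execute("ALTER TABLE ks.tbl ADD v2 int", ConsistencyLevel.ALL);
*     monitor.waitForCompletion();
* }
* }</pre>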
*/
public class SchemaChangeMonitor extends ChangeMonitor
{
public SchemaChangeMonitor()
{
super(70, TimeUnit.SECONDS);
}
protected IListen.Cancel startPolling(IInstance instance)
{
return instance.listen().schema(this::signal);
}
protected boolean isCompleted()
{
return 1 == instances.stream().filter(instanceFilter).map(IInstance::schemaVersion).distinct().count();
}
protected String getMonitorTimeoutMessage()
{
return String.format("Schema agreement not reached. Schema versions of the instances: %s",
instances.stream().map(IInstance::schemaVersion).collect(Collectors.toList()));
}
}
public class AllMembersAliveMonitor extends ChangeMonitor
{
public AllMembersAliveMonitor()
{
super(60, TimeUnit.SECONDS);
}
protected IListen.Cancel startPolling(IInstance instance)
{
return instance.listen().liveMembers(this::signal);
}
protected boolean isCompleted()
{
return instances.stream().allMatch(i -> !i.config().has(Feature.GOSSIP) || i.liveMemberCount() == instances.size());
}
protected String getMonitorTimeoutMessage()
{
return "Live member count did not converge across all instances";
}
}
public void startup()
{
previousHandler = Thread.getDefaultUncaughtExceptionHandler();
Thread.setDefaultUncaughtExceptionHandler(this::uncaughtExceptions);
try (AllMembersAliveMonitor monitor = new AllMembersAliveMonitor())
{
monitor.startPolling();
// Start the first node, plus any instances with auto_bootstrap enabled, sequentially, to avoid
// issues with multiple nodes bootstrapping while consistent range movement is enabled; then start
// the remaining instances (auto_bootstrap disabled) in parallel.
List<I> startSequentially = new ArrayList<>();
List<I> startParallel = new ArrayList<>();
for (int i = 0; i < instances.size(); i++)
{
I instance = instances.get(i);
if (i == 0 || (boolean) instance.config().get("auto_bootstrap"))
startSequentially.add(instance);
else
startParallel.add(instance);
}
forEach(startSequentially, I::startup);
parallelForEach(startParallel, I::startup, 0, null);
monitor.waitForCompletion();
}
}
private void uncaughtExceptions(Thread thread, Throwable error)
{
if (!(thread.getContextClassLoader() instanceof InstanceClassLoader))
{
Thread.UncaughtExceptionHandler handler = previousHandler;
if (null != handler)
handler.uncaughtException(thread, error);
return;
}
InstanceClassLoader cl = (InstanceClassLoader) thread.getContextClassLoader();
get(cl.getInstanceId()).uncaughtException(thread, error);
BiPredicate<Integer, Throwable> ignore = ignoreUncaughtThrowable;
I instance = get(cl.getInstanceId());
if ((ignore == null || !ignore.test(cl.getInstanceId(), error)) && instance != null && !instance.isShutdown())
uncaughtExceptions.add(error);
}
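/**
* A hedged sketch of installing a filter so that expected errors on a node do not fail the test
* (the exception type checked here is illustrative):
* <pre>{@code
* cluster.setUncaughtExceptionsFilter((node, throwable) -> throwable instanceof java.util.concurrent.RejectedExecutionException);
* }</pre>
*/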
@Override
public void setUncaughtExceptionsFilter(BiPredicate<Integer, Throwable> ignoreUncaughtThrowable)
{
this.ignoreUncaughtThrowable = ignoreUncaughtThrowable;
}
@Override
public void close()
{
FBUtilities.waitOnFutures(instances.stream()
.filter(i -> !i.isShutdown())
.map(IInstance::shutdown)
.collect(Collectors.toList()),
1L, TimeUnit.MINUTES);
instances.clear();
instanceMap.clear();
// Make sure to only delete the directory once instance threads are stopped
if (root.exists())
FileUtils.deleteRecursive(root);
Thread.setDefaultUncaughtExceptionHandler(previousHandler);
previousHandler = null;
checkAndResetUncaughtExceptions();
//withThreadLeakCheck(futures);
}
@Override
public void checkAndResetUncaughtExceptions()
{
List<Throwable> drain = new ArrayList<>(uncaughtExceptions.size());
uncaughtExceptions.removeIf(e -> {
drain.add(e);
return true;
});
if (!drain.isEmpty())
throw new ShutdownException(drain);
}
// We do not want this check to run every time until we fix problems with thread stops
private void withThreadLeakCheck(List<Future<?>> futures)
{
FBUtilities.waitOnFutures(futures);
Set<Thread> threadSet = Thread.getAllStackTraces().keySet();
threadSet = Sets.difference(threadSet, Collections.singletonMap(Thread.currentThread(), null).keySet());
if (!threadSet.isEmpty())
{
for (Thread thread : threadSet)
{
System.out.println(thread);
System.out.println(Arrays.toString(thread.getStackTrace()));
}
throw new RuntimeException(String.format("Not all threads have shut down. %d threads are still running: %s", threadSet.size(), threadSet));
}
}
public List<Token> tokens()
{
return stream()
.map(i ->
{
try
{
IPartitioner partitioner = ((IPartitioner)Class.forName(i.config().getString("partitioner")).newInstance());
return partitioner.getTokenFactory().fromString(i.config().getString("initial_token"));
}
catch (Throwable t)
{
throw new RuntimeException(t);
}
})
.collect(Collectors.toList());
}
private static Set<String> findClassesMarkedForSharedClassLoader()
{
return new Reflections("org.apache.cassandra").getTypesAnnotatedWith(Shared.class).stream()
.map(Class::getName)
.collect(Collectors.toSet());
}
}