blob: a339be265d83ae2f041b7ff90de6c86a2d491bd8 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.cassandra.distributed.impl;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Spliterator;
import java.util.function.BiPredicate;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Stream;
import com.google.common.base.Preconditions;
import com.vdurmont.semver4j.Semver;
import org.apache.cassandra.distributed.UpgradeableCluster;
import org.apache.cassandra.distributed.api.ICluster;
import org.apache.cassandra.distributed.api.ICoordinator;
import org.apache.cassandra.distributed.api.IInstance;
import org.apache.cassandra.distributed.api.IInstanceConfig;
import org.apache.cassandra.distributed.api.IMessage;
import org.apache.cassandra.distributed.api.IMessageFilters;
import org.apache.cassandra.distributed.api.IMessageSink;
import org.apache.cassandra.distributed.api.TokenSupplier;
import org.apache.cassandra.distributed.shared.ClusterUtils;
import org.apache.cassandra.distributed.shared.Versions;
import org.apache.cassandra.testing.ClusterBuilderConfiguration;
import org.apache.cassandra.testing.IClusterExtension;
import org.apache.cassandra.testing.TestTokenSupplier;
import org.jetbrains.annotations.NotNull;
import static org.apache.cassandra.distributed.shared.NetworkTopology.dcAndRack;
import static org.apache.cassandra.distributed.shared.NetworkTopology.networkTopology;
/**
 * Implements the {@link IClusterExtension} interface, delegating the functionality to the
 * {@link AbstractCluster delegate}. This class is meant to be loaded in a different classloader.
 *
 * <p>The delegate cluster is built and started in the constructor via
 * {@link #initializeCluster(String, ClusterBuilderConfiguration)}; every {@code ICluster} method simply
 * forwards to it, while the {@code IClusterExtension} helpers forward either to the delegate or to
 * {@link ClusterUtils}.
 *
 * @param <I> the type of the cluster instance
 */
public class CassandraCluster<I extends IInstance> implements IClusterExtension<I>
{
    /**
     * Additional class names that are shared across the instance classloaders (combined with the
     * builder's own shared-classes predicate in {@link #initializeCluster}).
     */
    private static final Predicate<String> EXTRA = className -> {
        // Those classes can be reached by Spark SizeEstimator, when it estimates the broadcast variable.
        // In the test scenario containing cassandra instance shutdown, there is a chance that it picks the class
        // that is loaded by the closed instance classloader, causing the following exception.
        // java.lang.IllegalStateException: Can't load <CLASS>. Instance class loader is already closed.
        return className.equals("org.apache.cassandra.utils.concurrent.Ref$OnLeak")
               || className.startsWith("org.apache.cassandra.metrics.RestorableMeter")
               || className.equals("org.apache.logging.slf4j.EventDataConverter")
               || (className.startsWith("org.apache.cassandra.analytics.") && className.contains("BBHelper"));
    };

    private final AbstractCluster<I> delegate;

    /**
     * Builds and starts the delegate cluster.
     *
     * <p>NOTE(review): this calls the public, overridable {@link #initializeCluster} from the constructor,
     * so an overriding subclass would run before its own fields are initialized.
     *
     * @param versionString the Cassandra version to look up, parsed as a loose semantic version
     * @param configuration the settings used to build the cluster
     * @throws IOException declared by {@link #initializeCluster}
     */
    public CassandraCluster(String versionString, ClusterBuilderConfiguration configuration) throws IOException
    {
        delegate = initializeCluster(versionString, configuration);
    }

    /**
     * Spins up a Cassandra cluster using the in-jvm dtest framework.
     *
     * <p>The cluster is started with {@code nodesPerDc * dcCount} nodes; tokens and (for multi-DC) topology are
     * laid out for {@code dcCount * (nodesPerDc + newNodesPerDc)} nodes so that instances can be added later.
     * When more than one node is started, this waits for every instance to report a healthy ring and then
     * raises the replication of the under-replicated system keyspaces.
     *
     * <p>If that post-start setup fails, the already started cluster is closed before the failure is
     * rethrown, so that a failed initialization does not leave running instances behind.
     *
     * @param versionString the Cassandra version to look up, parsed as a loose semantic version
     * @param configuration the settings used to build the cluster
     * @return the started cluster
     * @throws IOException              declared for the cluster build/start path
     * @throws IllegalArgumentException if {@code configuration.newNodesPerDc} is negative
     */
    @SuppressWarnings("unchecked")
    public AbstractCluster<I> initializeCluster(String versionString,
                                                ClusterBuilderConfiguration configuration) throws IOException
    {
        // spin up a C* cluster using the in-jvm dtest
        Versions versions = Versions.find();
        Versions.Version requestedVersion = versions.getLatest(new Semver(versionString, Semver.SemverType.LOOSE));

        int nodesPerDc = configuration.nodesPerDc;
        int dcCount = configuration.dcCount;
        int newNodesPerDc = configuration.newNodesPerDc;
        Preconditions.checkArgument(newNodesPerDc >= 0, "newNodesPerDc cannot be a negative number");

        int originalNodeCount = nodesPerDc * dcCount;
        int finalNodeCount = dcCount * (nodesPerDc + newNodesPerDc);

        TokenSupplier tokenSupplier = TestTokenSupplier.evenlyDistributedTokens(nodesPerDc, newNodesPerDc, dcCount, 1);

        UpgradeableCluster.Builder clusterBuilder = UpgradeableCluster.build(originalNodeCount);
        clusterBuilder.withVersion(requestedVersion)
                      .withDynamicPortAllocation(configuration.dynamicPortAllocation) // to allow parallel test runs
                      .withSharedClasses(EXTRA.or(clusterBuilder.getSharedClasses()))
                      .withDCs(dcCount)
                      .withDataDirCount(configuration.numDataDirsPerInstance)
                      .withConfig(config -> configuration.features.forEach(config::with))
                      .withTokenSupplier(tokenSupplier);

        if (dcCount > 1)
        {
            // Odd node ids go to datacenter1/rack1, even node ids to datacenter2/rack2.
            // NOTE(review): only two datacenter names are assigned here even when dcCount > 2 - confirm intended.
            clusterBuilder.withNodeIdTopology(networkTopology(finalNodeCount,
                                                              (nodeId) -> nodeId % 2 != 0 ?
                                                                          dcAndRack("datacenter1", "rack1") :
                                                                          dcAndRack("datacenter2", "rack2")));
        }

        if (configuration.instanceInitializer != null)
        {
            clusterBuilder.withInstanceInitializer(configuration.instanceInitializer);
        }

        UpgradeableCluster cluster = clusterBuilder.start();
        AbstractCluster<I> startedCluster = (AbstractCluster<I>) cluster;
        try
        {
            if (startedCluster.size() > 1)
            {
                waitForHealthyRing(startedCluster);
                fixDistributedSchemas(startedCluster);
            }
        }
        catch (RuntimeException | Error failure)
        {
            // The caller never receives a reference to the cluster on this path, so it must be closed here
            closeAfterFailure(startedCluster, failure);
            throw failure;
        }
        return startedCluster;
    }

    // IClusterExtension methods

    /** Delegates to {@code AbstractCluster#schemaChangeIgnoringStoppedInstances}. */
    @Override
    public void schemaChangeIgnoringStoppedInstances(String query)
    {
        delegate.schemaChangeIgnoringStoppedInstances(query);
    }

    /** Adds an instance to the delegate cluster through {@link ClusterUtils#addInstance}. */
    @Override
    public I addInstance(String dc, String rack, Consumer<IInstanceConfig> fn)
    {
        return ClusterUtils.addInstance(delegate, dc, rack, fn);
    }

    /** Delegates to {@code AbstractCluster#getFirstRunningInstance}. */
    @Override
    public I getFirstRunningInstance()
    {
        return delegate.getFirstRunningInstance();
    }

    /** Delegates to {@code AbstractCluster#newInstanceConfig}. */
    @Override
    public IInstanceConfig newInstanceConfig()
    {
        return delegate.newInstanceConfig();
    }

    /** @return the underlying cluster every call is forwarded to */
    @Override
    public ICluster<I> delegate()
    {
        return delegate;
    }

    /** Forwards to {@link ClusterUtils#awaitRingState}. */
    @Override
    public void awaitRingState(IInstance instance, IInstance expectedInRing, String state)
    {
        ClusterUtils.awaitRingState(instance, expectedInRing, state);
    }

    /** Forwards to {@link ClusterUtils#awaitRingStatus}. */
    @Override
    public void awaitRingStatus(IInstance instance, IInstance expectedInRing, String status)
    {
        ClusterUtils.awaitRingStatus(instance, expectedInRing, status);
    }

    /** Forwards to {@link ClusterUtils#awaitGossipStatus}. */
    @Override
    public void awaitGossipStatus(IInstance instance, IInstance expectedInGossip, String targetStatus)
    {
        ClusterUtils.awaitGossipStatus(instance, expectedInGossip, targetStatus);
    }

    /** Forwards to {@link ClusterUtils#stopUnchecked}. */
    @Override
    public void stopUnchecked(IInstance instance)
    {
        ClusterUtils.stopUnchecked(instance);
    }

    // ICluster methods

    /** Delegates to the underlying cluster. */
    @Override
    public void startup()
    {
        delegate.startup();
    }

    /** Delegates to the underlying cluster. */
    @Override
    public I bootstrap(IInstanceConfig iInstanceConfig)
    {
        return delegate.bootstrap(iInstanceConfig);
    }

    /** Delegates to the underlying cluster. */
    @Override
    public I get(int i)
    {
        return delegate.get(i);
    }

    /** Delegates to the underlying cluster. */
    @Override
    public I get(InetSocketAddress inetSocketAddress)
    {
        return delegate.get(inetSocketAddress);
    }

    /** Delegates to the underlying cluster. */
    @Override
    public ICoordinator coordinator(int i)
    {
        return delegate.coordinator(i);
    }

    /** Delegates to the underlying cluster. */
    @Override
    public void schemaChange(String s)
    {
        delegate.schemaChange(s);
    }

    /** Delegates to the underlying cluster. */
    @Override
    public void schemaChange(String s, int i)
    {
        delegate.schemaChange(s, i);
    }

    /** Delegates to the underlying cluster. */
    @Override
    public int size()
    {
        return delegate.size();
    }

    /** Delegates to the underlying cluster. */
    @Override
    public Stream<I> stream()
    {
        return delegate.stream();
    }

    /** Delegates to the underlying cluster. */
    @Override
    public Stream<I> stream(String s)
    {
        return delegate.stream(s);
    }

    /** Delegates to the underlying cluster. */
    @Override
    public Stream<I> stream(String s, String s1)
    {
        return delegate.stream(s, s1);
    }

    /** Delegates to the underlying cluster. */
    @NotNull
    @Override
    public Iterator<I> iterator()
    {
        return delegate.iterator();
    }

    /** Delegates to the underlying cluster. */
    @Override
    public IMessageFilters filters()
    {
        return delegate.filters();
    }

    /** Delegates to the underlying cluster. */
    @Override
    public void setMessageSink(IMessageSink messageSink)
    {
        delegate.setMessageSink(messageSink);
    }

    /** Delegates to the underlying cluster. */
    @Override
    public void deliverMessage(InetSocketAddress to, IMessage msg)
    {
        delegate.deliverMessage(to, msg);
    }

    /** Delegates to the underlying cluster. */
    @Override
    public void setUncaughtExceptionsFilter(BiPredicate<Integer, Throwable> ignoreThrowable)
    {
        delegate.setUncaughtExceptionsFilter(ignoreThrowable);
    }

    /** Delegates to the underlying cluster. */
    @Override
    public void setUncaughtExceptionsFilter(Predicate<Throwable> ignoreThrowable)
    {
        delegate.setUncaughtExceptionsFilter(ignoreThrowable);
    }

    /** Delegates to the underlying cluster. */
    @Override
    public void checkAndResetUncaughtExceptions()
    {
        delegate.checkAndResetUncaughtExceptions();
    }

    /** Closes the underlying cluster. */
    @Override
    public void close() throws Exception
    {
        delegate.close();
    }

    /** Delegates to the underlying cluster. */
    @Override
    public void forEach(Consumer<? super I> action)
    {
        delegate.forEach(action);
    }

    /** Delegates to the underlying cluster. */
    @Override
    public Spliterator<I> spliterator()
    {
        return delegate.spliterator();
    }

    // Utility methods

    /**
     * Alters the {@code system_auth} and {@code system_traces} keyspaces to use {@code SimpleStrategy} with a
     * replication factor of {@code min(cluster.size(), 3)}, issuing the statement through the first running
     * instance.
     *
     * @param cluster the cluster whose distributed system keyspaces are altered
     */
    void fixDistributedSchemas(AbstractCluster<I> cluster)
    {
        // These keyspaces are under replicated by default, so must be updated when doing a multi-node cluster;
        // else bootstrap will fail with 'Unable to find sufficient sources for streaming range <range> in keyspace <name>'
        for (String ks : Arrays.asList("system_auth", "system_traces"))
        {
            // NOTE(review): the boolean argument presumably means "ignore stopped instances" - confirm
            cluster.schemaChange("ALTER KEYSPACE " + ks +
                                 " WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': "
                                 + Math.min(cluster.size(), 3) + "}",
                                 true,
                                 cluster.getFirstRunningInstance());
        }
        // in real life repair is needed in this case, but in the test case it doesn't matter if the tables lose
        // anything, so ignoring repair to speed up the tests.
    }

    /**
     * Calls {@link ClusterUtils#awaitRingHealthy} for every instance of the cluster, in iteration order.
     *
     * @param cluster the cluster whose instances are checked
     */
    void waitForHealthyRing(ICluster<?> cluster)
    {
        for (IInstance inst : cluster)
        {
            ClusterUtils.awaitRingHealthy(inst);
        }
    }

    /**
     * Closes a cluster whose initialization failed. A failure while closing is attached to the original
     * failure as a suppressed exception, so the root cause is the one that propagates.
     *
     * @param cluster the started cluster to shut down
     * @param failure the failure that aborted initialization
     */
    private static void closeAfterFailure(AbstractCluster<?> cluster, Throwable failure)
    {
        try
        {
            cluster.close();
        }
        catch (Exception closeFailure)
        {
            failure.addSuppressed(closeFailure);
        }
    }
}