blob: 5243615bcc9d8da72cb292c4f584ad93ab527351 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.sidecar.testing;
import java.net.InetSocketAddress;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import com.datastax.driver.core.NettyOptions;
import com.datastax.driver.core.Session;
import org.apache.cassandra.distributed.UpgradeableCluster;
import org.apache.cassandra.distributed.api.IInstanceConfig;
import org.apache.cassandra.distributed.api.IUpgradeableInstance;
import org.apache.cassandra.distributed.shared.JMXUtil;
import org.apache.cassandra.sidecar.adapters.base.CassandraFactory;
import org.apache.cassandra.sidecar.cluster.InstancesConfig;
import org.apache.cassandra.sidecar.cluster.InstancesConfigImpl;
import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadataImpl;
import org.apache.cassandra.sidecar.common.CQLSessionProvider;
import org.apache.cassandra.sidecar.common.CassandraAdapterDelegate;
import org.apache.cassandra.sidecar.common.CassandraVersionProvider;
import org.apache.cassandra.sidecar.common.JmxClient;
import org.apache.cassandra.sidecar.common.SimpleCassandraVersion;
import org.apache.cassandra.sidecar.common.dns.DnsResolver;
import org.apache.cassandra.sidecar.common.utils.SidecarVersionProvider;
import org.apache.cassandra.testing.AbstractCassandraTestContext;
import static org.assertj.core.api.Assertions.assertThat;
/**
* Passed to integration tests.
*/
public class CassandraSidecarTestContext implements AutoCloseable
{
public final SimpleCassandraVersion version;
private final CassandraVersionProvider versionProvider;
private final DnsResolver dnsResolver;
private final AbstractCassandraTestContext abstractCassandraTestContext;
public InstancesConfig instancesConfig;
private List<CQLSessionProvider> sessionProviders;
private List<JmxClient> jmxClients;
private static final SidecarVersionProvider svp = new SidecarVersionProvider("/sidecar.version");
private final List<InstanceConfigListener> instanceConfigListeners;
private CassandraSidecarTestContext(AbstractCassandraTestContext abstractCassandraTestContext,
SimpleCassandraVersion version,
CassandraVersionProvider versionProvider,
DnsResolver dnsResolver)
{
this.instanceConfigListeners = new ArrayList<>();
this.abstractCassandraTestContext = abstractCassandraTestContext;
this.version = version;
this.versionProvider = versionProvider;
this.dnsResolver = dnsResolver;
}
public static CassandraSidecarTestContext from(AbstractCassandraTestContext cassandraTestContext,
DnsResolver dnsResolver)
{
org.apache.cassandra.testing.SimpleCassandraVersion rootVersion = cassandraTestContext.version;
SimpleCassandraVersion versionParsed = SimpleCassandraVersion.create(rootVersion.major,
rootVersion.minor,
rootVersion.patch);
CassandraVersionProvider versionProvider = cassandraVersionProvider(dnsResolver);
return new CassandraSidecarTestContext(cassandraTestContext,
versionParsed,
versionProvider,
dnsResolver);
}
public static CassandraVersionProvider cassandraVersionProvider(DnsResolver dnsResolver)
{
return new CassandraVersionProvider.Builder()
.add(new CassandraFactory(dnsResolver, svp.sidecarVersion())).build();
}
public void registerInstanceConfigListener(InstanceConfigListener listener)
{
this.instanceConfigListeners.add(listener);
}
public AbstractCassandraTestContext cassandraTestContext()
{
return abstractCassandraTestContext;
}
public boolean isClusterBuilt()
{
return abstractCassandraTestContext.cluster() != null;
}
public UpgradeableCluster cluster()
{
UpgradeableCluster cluster = abstractCassandraTestContext.cluster();
if (cluster == null)
{
throw new RuntimeException("The cluster must be built before it can be used");
}
return cluster;
}
public InstancesConfig instancesConfig()
{
if (instancesConfig == null
|| instancesConfig.instances().size() != cluster().size()) // rebuild instances config if cluster changed
{
// clean-up any open sessions or client resources
close();
setInstancesConfig();
}
return this.instancesConfig;
}
public Session session()
{
return session(0);
}
public Session session(int instance)
{
if (sessionProviders == null)
{
setInstancesConfig();
}
return this.sessionProviders.get(instance).localCql();
}
@Override
public String toString()
{
return "CassandraTestContext{" +
", version=" + version +
", cluster=" + abstractCassandraTestContext.cluster() +
'}';
}
@Override
public void close()
{
if (sessionProviders != null)
{
sessionProviders.forEach(CQLSessionProvider::close);
}
if (instancesConfig != null)
{
instancesConfig.instances().forEach(instance -> instance.delegate().close());
}
}
public JmxClient jmxClient()
{
return jmxClient(0);
}
private JmxClient jmxClient(int instance)
{
if (jmxClients == null)
{
setInstancesConfig();
}
return jmxClients.get(instance);
}
/**
* A listener for {@link InstancesConfig} state changes
*/
public interface InstanceConfigListener
{
void onInstancesConfigChange(InstancesConfig instancesConfig);
}
private void setInstancesConfig()
{
this.instancesConfig = buildInstancesConfig(versionProvider, dnsResolver);
for (InstanceConfigListener listener : instanceConfigListeners)
{
listener.onInstancesConfigChange(this.instancesConfig);
}
}
private InstancesConfig buildInstancesConfig(CassandraVersionProvider versionProvider,
DnsResolver dnsResolver)
{
UpgradeableCluster cluster = cluster();
List<InstanceMetadata> metadata = new ArrayList<>();
sessionProviders = new ArrayList<>();
jmxClients = new ArrayList<>();
for (int i = 0; i < cluster.size(); i++)
{
IUpgradeableInstance instance = cluster.get(i + 1); // 1-based indexing to match node names;
IInstanceConfig config = instance.config();
String hostName = JMXUtil.getJmxHost(config);
int nativeTransportPort = tryGetIntConfig(config, "native_transport_port", 9042);
InetSocketAddress address = InetSocketAddress.createUnresolved(hostName,
nativeTransportPort);
CQLSessionProvider sessionProvider = new CQLSessionProvider(address, new NettyOptions());
this.sessionProviders.add(sessionProvider);
// The in-jvm dtest framework sometimes returns a cluster before all the jmx infrastructure is initialized.
// In these cases, we want to wait longer than the default retry/delay settings to connect.
JmxClient jmxClient = new JmxClient(hostName, config.jmxPort(), null, null, false, 20, 1000L);
this.jmxClients.add(jmxClient);
String[] dataDirectories = (String[]) config.get("data_file_directories");
// Use the parent of the first data directory as the staging directory
Path dataDirParentPath = Paths.get(dataDirectories[0]).getParent();
// If the cluster has not started yet, the node's root directory doesn't exist yet
assertThat(dataDirParentPath).isNotNull();
Path stagingPath = dataDirParentPath.resolve("staging");
String stagingDir = stagingPath.toFile().getAbsolutePath();
CassandraAdapterDelegate delegate = new CassandraAdapterDelegate(versionProvider,
sessionProvider,
jmxClient,
"1.0-TEST");
metadata.add(InstanceMetadataImpl.builder()
.id(i + 1)
.host(config.broadcastAddress().getAddress().getHostAddress())
.port(nativeTransportPort)
.dataDirs(Arrays.asList(dataDirectories))
.stagingDir(stagingDir)
.delegate(delegate)
.build());
}
return new InstancesConfigImpl(metadata, dnsResolver);
}
private static int tryGetIntConfig(IInstanceConfig config, String configName, int defaultValue)
{
try
{
return config.getInt(configName);
}
catch (NullPointerException npe)
{
return defaultValue;
}
}
}