CASSANDRASC-88: Allow DriverUtils to be pluggable
Patch by Francisco Guerrero; Reviewed by Yifan Cai for CASSANDRASC-88
diff --git a/CHANGES.txt b/CHANGES.txt
index 25e76dc..f8a4f87 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
1.0.0
-----
+ * Allow DriverUtils to be pluggable (CASSANDRASC-88)
* Add JMX health checks during the periodic health checks (CASSANDRASC-87)
* Sidecar should be able to load metadata even if the local instance is unavailable (CASSANDRASC-79)
* Expose additional SSL configuration options for the Sidecar Service (CASSANDRASC-82)
diff --git a/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraAdapter.java b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraAdapter.java
index f02ea2f..5caab2e 100644
--- a/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraAdapter.java
+++ b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraAdapter.java
@@ -24,7 +24,6 @@
import org.slf4j.LoggerFactory;
import com.datastax.driver.core.ConsistencyLevel;
-import com.datastax.driver.core.DriverUtils;
import com.datastax.driver.core.Host;
import com.datastax.driver.core.Metadata;
import com.datastax.driver.core.ResultSet;
@@ -38,6 +37,7 @@
import org.apache.cassandra.sidecar.common.StorageOperations;
import org.apache.cassandra.sidecar.common.TableOperations;
import org.apache.cassandra.sidecar.common.dns.DnsResolver;
+import org.apache.cassandra.sidecar.common.utils.DriverUtils;
import org.jetbrains.annotations.Nullable;
/**
@@ -49,18 +49,21 @@
protected final DnsResolver dnsResolver;
protected final JmxClient jmxClient;
private final CQLSessionProvider cqlSessionProvider;
- private final String sidecarVersion;
private final InetSocketAddress localNativeTransportAddress;
+ private final DriverUtils driverUtils;
private volatile Host host;
- public CassandraAdapter(DnsResolver dnsResolver, JmxClient jmxClient, CQLSessionProvider cqlSessionProvider,
- String sidecarVersion, InetSocketAddress localNativeTransportAddress)
+ public CassandraAdapter(DnsResolver dnsResolver,
+ JmxClient jmxClient,
+ CQLSessionProvider cqlSessionProvider,
+ InetSocketAddress localNativeTransportAddress,
+ DriverUtils driverUtils)
{
this.dnsResolver = dnsResolver;
this.jmxClient = jmxClient;
this.cqlSessionProvider = cqlSessionProvider;
- this.sidecarVersion = sidecarVersion;
this.localNativeTransportAddress = localNativeTransportAddress;
+ this.driverUtils = driverUtils;
}
/**
@@ -132,7 +135,7 @@
{
if (host == null)
{
- host = DriverUtils.getHost(metadata, localNativeTransportAddress);
+ host = driverUtils.getHost(metadata, localNativeTransportAddress);
}
}
}
diff --git a/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraFactory.java b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraFactory.java
index 5e505d7..bfb5eb0 100644
--- a/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraFactory.java
+++ b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraFactory.java
@@ -26,6 +26,7 @@
import org.apache.cassandra.sidecar.common.JmxClient;
import org.apache.cassandra.sidecar.common.MinimumVersion;
import org.apache.cassandra.sidecar.common.dns.DnsResolver;
+import org.apache.cassandra.sidecar.common.utils.DriverUtils;
/**
* Factory to produce the 4.0 adapter
@@ -34,12 +35,12 @@
public class CassandraFactory implements ICassandraFactory
{
protected final DnsResolver dnsResolver;
- private final String sidecarVersion;
+ protected final DriverUtils driverUtils;
- public CassandraFactory(DnsResolver dnsResolver, String sidecarVersion)
+ public CassandraFactory(DnsResolver dnsResolver, DriverUtils driverUtils)
{
this.dnsResolver = dnsResolver;
- this.sidecarVersion = sidecarVersion;
+ this.driverUtils = driverUtils;
}
/**
@@ -55,6 +56,6 @@
JmxClient jmxClient,
InetSocketAddress localNativeTransportAddress)
{
- return new CassandraAdapter(dnsResolver, jmxClient, session, sidecarVersion, localNativeTransportAddress);
+ return new CassandraAdapter(dnsResolver, jmxClient, session, localNativeTransportAddress, driverUtils);
}
}
diff --git a/common/src/main/java/com/datastax/driver/core/DriverUtils.java b/common/src/main/java/com/datastax/driver/core/DriverUtils.java
index c28da45..4f65757 100644
--- a/common/src/main/java/com/datastax/driver/core/DriverUtils.java
+++ b/common/src/main/java/com/datastax/driver/core/DriverUtils.java
@@ -28,7 +28,10 @@
public class DriverUtils
{
/**
- * Check if a host has active connections
+ * Check if a host has active connections.
+ *
+ * <p><b>Note:</b> This method should not be used directly, but should be proxied by
+ * an implementation of {@link org.apache.cassandra.sidecar.common.utils.DriverUtils}.
*
* @param host the host to check
* @return true if the host has active connections, false otherwise
@@ -43,6 +46,9 @@
* Start attempting to reconnect to the given host, as hosts with `IGNORED` distance aren't attempted
* and the SidecarLoadBalancingPolicy marks non-selected nodes as IGNORED until they need to rotate in.
*
+ * <p><b>Note:</b> This method should not be used directly, but should be proxied by
+ * an implementation of {@link org.apache.cassandra.sidecar.common.utils.DriverUtils}.
+ *
* @param cluster The cluster object
* @param host the host to which reconnect attempts will be made
*/
@@ -54,6 +60,9 @@
/**
* Gets a Host instance from metadata based on the native transport address
*
+ * <p><b>Note:</b> This method should not be used directly, but should be proxied by
+ * an implementation of {@link org.apache.cassandra.sidecar.common.utils.DriverUtils}.
+ *
* @param metadata the {@link Metadata} instance to search for the host
* @param localNativeTransportAddress the native transport ip address and port for the host to find
* @return the {@link Host} instance if found, else null
diff --git a/common/src/main/java/org/apache/cassandra/sidecar/common/utils/DriverUtils.java b/common/src/main/java/org/apache/cassandra/sidecar/common/utils/DriverUtils.java
new file mode 100644
index 0000000..b070637
--- /dev/null
+++ b/common/src/main/java/org/apache/cassandra/sidecar/common/utils/DriverUtils.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.common.utils;
+
+import java.net.InetSocketAddress;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.Host;
+import com.datastax.driver.core.Metadata;
+
+/**
+ * A shim layer that provides information from the Cassandra driver. Instead of accessing the
+ * {@link com.datastax.driver.core.DriverUtils} directly, this acts as a proxy that can be swapped out
+ * based on the specific Sidecar implementation. This can be useful if a different driver version is used
+ */
+public class DriverUtils
+{
+ /**
+ * Start attempting to reconnect to the given host, as hosts with `IGNORED` distance aren't attempted
+ * and the SidecarLoadBalancingPolicy marks non-selected nodes as IGNORED until they need to rotate in.
+ *
+ * @param cluster The cluster object
+ * @param host the host to which reconnect attempts will be made
+ */
+ public void startPeriodicReconnectionAttempt(Cluster cluster, Host host)
+ {
+ com.datastax.driver.core.DriverUtils.startPeriodicReconnectionAttempt(cluster, host);
+ }
+
+ /**
+ * Gets a Host instance from metadata based on the native transport address
+ *
+ * @param metadata the {@link Metadata} instance to search for the host
+ * @param localNativeTransportAddress the native transport ip address and port for the host to find
+ * @return the {@link Host} instance if found, else null
+ */
+ public Host getHost(Metadata metadata, InetSocketAddress localNativeTransportAddress)
+ {
+ return com.datastax.driver.core.DriverUtils.getHost(metadata, localNativeTransportAddress);
+ }
+}
diff --git a/src/main/java/org/apache/cassandra/sidecar/cluster/CQLSessionProviderImpl.java b/src/main/java/org/apache/cassandra/sidecar/cluster/CQLSessionProviderImpl.java
index 05a0452..f394ca2 100644
--- a/src/main/java/org/apache/cassandra/sidecar/cluster/CQLSessionProviderImpl.java
+++ b/src/main/java/org/apache/cassandra/sidecar/cluster/CQLSessionProviderImpl.java
@@ -38,6 +38,7 @@
import com.datastax.driver.core.policies.LoadBalancingPolicy;
import com.datastax.driver.core.policies.ReconnectionPolicy;
import org.apache.cassandra.sidecar.common.CQLSessionProvider;
+import org.apache.cassandra.sidecar.common.utils.DriverUtils;
import org.apache.cassandra.sidecar.config.DriverConfiguration;
import org.apache.cassandra.sidecar.config.SidecarConfiguration;
import org.jetbrains.annotations.Nullable;
@@ -56,6 +57,7 @@
private final NettyOptions nettyOptions;
private final ReconnectionPolicy reconnectionPolicy;
private final List<InetSocketAddress> localInstances;
+ private final DriverUtils driverUtils;
@Nullable
private volatile Session session;
@@ -73,11 +75,14 @@
this.numConnections = numConnections;
this.nettyOptions = options;
this.reconnectionPolicy = new ExponentialReconnectionPolicy(500, healthCheckFrequencyMillis);
+ this.driverUtils = new DriverUtils();
}
public CQLSessionProviderImpl(SidecarConfiguration configuration,
- NettyOptions options)
+ NettyOptions options,
+ DriverUtils driverUtils)
{
+ this.driverUtils = driverUtils;
DriverConfiguration driverConfiguration = configuration.driverConfiguration();
this.contactPoints = driverConfiguration.contactPoints();
this.localInstances = configuration.cassandraInstances()
@@ -126,7 +131,8 @@
{
logger.info("Connecting to cluster using contact points {}", contactPoints);
- LoadBalancingPolicy lbp = new SidecarLoadBalancingPolicy(localInstances, localDc, numConnections);
+ LoadBalancingPolicy lbp = new SidecarLoadBalancingPolicy(localInstances, localDc, numConnections,
+ driverUtils);
// Prevent spurious reconnects of ignored down nodes on `onUp` events
QueryOptions queryOptions = new QueryOptions().setReprepareOnUp(false);
cluster = Cluster.builder()
diff --git a/src/main/java/org/apache/cassandra/sidecar/cluster/CassandraAdapterDelegate.java b/src/main/java/org/apache/cassandra/sidecar/cluster/CassandraAdapterDelegate.java
index 15cea06..3e904fc 100644
--- a/src/main/java/org/apache/cassandra/sidecar/cluster/CassandraAdapterDelegate.java
+++ b/src/main/java/org/apache/cassandra/sidecar/cluster/CassandraAdapterDelegate.java
@@ -35,7 +35,6 @@
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ConsistencyLevel;
-import com.datastax.driver.core.DriverUtils;
import com.datastax.driver.core.Host;
import com.datastax.driver.core.Metadata;
import com.datastax.driver.core.ResultSet;
@@ -53,6 +52,7 @@
import org.apache.cassandra.sidecar.common.NodeSettings;
import org.apache.cassandra.sidecar.common.StorageOperations;
import org.apache.cassandra.sidecar.common.TableOperations;
+import org.apache.cassandra.sidecar.common.utils.DriverUtils;
import org.apache.cassandra.sidecar.utils.CassandraVersionProvider;
import org.apache.cassandra.sidecar.utils.SimpleCassandraVersion;
import org.jetbrains.annotations.NotNull;
@@ -82,6 +82,7 @@
private final Vertx vertx;
private final int cassandraInstanceId;
+ private final DriverUtils driverUtils;
private final String sidecarVersion;
private final CassandraVersionProvider versionProvider;
private final CQLSessionProvider cqlSessionProvider;
@@ -105,6 +106,7 @@
* @param versionProvider a Cassandra version provider
* @param session the session to the Cassandra database
* @param jmxClient the JMX client used to communicate with the Cassandra instance
+ * @param driverUtils a wrapper that exposes Cassandra driver utilities
* @param sidecarVersion the version of the Sidecar from the current binary
* @param host the Cassandra instance's hostname or ip address as a string
* @param port the Cassandra instance's port number
@@ -114,12 +116,14 @@
CassandraVersionProvider versionProvider,
CQLSessionProvider session,
JmxClient jmxClient,
+ DriverUtils driverUtils,
String sidecarVersion,
String host,
int port)
{
this.vertx = Objects.requireNonNull(vertx);
this.cassandraInstanceId = cassandraInstanceId;
+ this.driverUtils = driverUtils;
this.localNativeTransportAddress = new InetSocketAddress(host, port);
this.sidecarVersion = sidecarVersion;
this.versionProvider = versionProvider;
@@ -331,7 +335,7 @@
{
if (host == null)
{
- host = DriverUtils.getHost(metadata, localNativeTransportAddress);
+ host = driverUtils.getHost(metadata, localNativeTransportAddress);
}
}
}
diff --git a/src/main/java/org/apache/cassandra/sidecar/cluster/SidecarLoadBalancingPolicy.java b/src/main/java/org/apache/cassandra/sidecar/cluster/SidecarLoadBalancingPolicy.java
index f149f15..bd6ae95 100644
--- a/src/main/java/org/apache/cassandra/sidecar/cluster/SidecarLoadBalancingPolicy.java
+++ b/src/main/java/org/apache/cassandra/sidecar/cluster/SidecarLoadBalancingPolicy.java
@@ -34,13 +34,13 @@
import org.slf4j.LoggerFactory;
import com.datastax.driver.core.Cluster;
-import com.datastax.driver.core.DriverUtils;
import com.datastax.driver.core.Host;
import com.datastax.driver.core.HostDistance;
import com.datastax.driver.core.Statement;
import com.datastax.driver.core.policies.DCAwareRoundRobinPolicy;
import com.datastax.driver.core.policies.LoadBalancingPolicy;
import com.datastax.driver.core.policies.RoundRobinPolicy;
+import org.apache.cassandra.sidecar.common.utils.DriverUtils;
/**
* The SidecarLoadBalancingPolicy is designed to ensure that the Cassandra Metadata objects associated with the
@@ -54,6 +54,7 @@
private static final Logger LOGGER = LoggerFactory.getLogger(SidecarLoadBalancingPolicy.class);
private final Set<Host> selectedHosts = new HashSet<>();
private final Set<InetSocketAddress> localHostAddresses;
+ private final DriverUtils driverUtils;
private final LoadBalancingPolicy childPolicy;
private final int totalRequestedConnections;
private final Random random = new Random();
@@ -62,10 +63,12 @@
public SidecarLoadBalancingPolicy(List<InetSocketAddress> localHostAddresses,
String localDc,
- int numAdditionalConnections)
+ int numAdditionalConnections,
+ DriverUtils driverUtils)
{
this.childPolicy = createChildPolicy(localDc);
this.localHostAddresses = new HashSet<>(localHostAddresses);
+ this.driverUtils = driverUtils;
if (numAdditionalConnections < MIN_NON_LOCAL_CONNECTIONS)
{
LOGGER.warn("Additional instances requested was {}, which is less than the minimum of {}. Using {}.",
@@ -138,7 +141,7 @@
// of preventing the driver from trying to reconnect to them
// if we miss the `onUp` event, so we need to schedule reconnects
// for these hosts explicitly unless we have active connections.
- DriverUtils.startPeriodicReconnectionAttempt(cluster, host);
+ driverUtils.startPeriodicReconnectionAttempt(cluster, host);
}
recalculateSelectedHosts();
childPolicy.onDown(host);
diff --git a/src/main/java/org/apache/cassandra/sidecar/server/MainModule.java b/src/main/java/org/apache/cassandra/sidecar/server/MainModule.java
index 9e11cb8..7a72b22 100644
--- a/src/main/java/org/apache/cassandra/sidecar/server/MainModule.java
+++ b/src/main/java/org/apache/cassandra/sidecar/server/MainModule.java
@@ -53,6 +53,7 @@
import org.apache.cassandra.sidecar.common.CQLSessionProvider;
import org.apache.cassandra.sidecar.common.JmxClient;
import org.apache.cassandra.sidecar.common.dns.DnsResolver;
+import org.apache.cassandra.sidecar.common.utils.DriverUtils;
import org.apache.cassandra.sidecar.common.utils.SidecarVersionProvider;
import org.apache.cassandra.sidecar.config.CassandraInputValidationConfiguration;
import org.apache.cassandra.sidecar.config.InstanceConfiguration;
@@ -269,22 +270,32 @@
@Provides
@Singleton
- public CQLSessionProvider cqlSessionProvider(Vertx vertx, SidecarConfiguration sidecarConfiguration)
+ public CQLSessionProvider cqlSessionProvider(Vertx vertx, SidecarConfiguration sidecarConfiguration,
+ DriverUtils driverUtils)
{
CQLSessionProviderImpl cqlSessionProvider = new CQLSessionProviderImpl(sidecarConfiguration,
- new NettyOptions());
+ new NettyOptions(),
+ driverUtils);
vertx.eventBus().localConsumer(ON_SERVER_STOP.address(), message -> cqlSessionProvider.close());
return cqlSessionProvider;
}
@Provides
@Singleton
+ public DriverUtils driverUtils()
+ {
+ return new DriverUtils();
+ }
+
+ @Provides
+ @Singleton
public InstancesConfig instancesConfig(Vertx vertx,
SidecarConfiguration configuration,
CassandraVersionProvider cassandraVersionProvider,
SidecarVersionProvider sidecarVersionProvider,
DnsResolver dnsResolver,
- CQLSessionProvider cqlSessionProvider)
+ CQLSessionProvider cqlSessionProvider,
+ DriverUtils driverUtils)
{
List<InstanceMetadata> instanceMetadataList =
configuration.cassandraInstances()
@@ -296,7 +307,8 @@
cassandraVersionProvider,
sidecarVersionProvider.sidecarVersion(),
jmxConfiguration,
- cqlSessionProvider);
+ cqlSessionProvider,
+ driverUtils);
})
.collect(Collectors.toList());
@@ -305,11 +317,10 @@
@Provides
@Singleton
- public CassandraVersionProvider cassandraVersionProvider(DnsResolver dnsResolver,
- SidecarVersionProvider sidecarVersionProvider)
+ public CassandraVersionProvider cassandraVersionProvider(DnsResolver dnsResolver, DriverUtils driverUtils)
{
CassandraVersionProvider.Builder builder = new CassandraVersionProvider.Builder();
- builder.add(new CassandraFactory(dnsResolver, sidecarVersionProvider.sidecarVersion()));
+ builder.add(new CassandraFactory(dnsResolver, driverUtils));
return builder.build();
}
@@ -396,7 +407,8 @@
CassandraVersionProvider versionProvider,
String sidecarVersion,
JmxConfiguration jmxConfiguration,
- CQLSessionProvider session)
+ CQLSessionProvider session,
+ DriverUtils driverUtils)
{
String host = cassandraInstance.host();
int port = cassandraInstance.port();
@@ -415,6 +427,7 @@
versionProvider,
session,
jmxClient,
+ driverUtils,
sidecarVersion,
host,
port);
diff --git a/src/test/integration/org/apache/cassandra/sidecar/testing/CassandraSidecarTestContext.java b/src/test/integration/org/apache/cassandra/sidecar/testing/CassandraSidecarTestContext.java
index 35a64bb..8729330 100644
--- a/src/test/integration/org/apache/cassandra/sidecar/testing/CassandraSidecarTestContext.java
+++ b/src/test/integration/org/apache/cassandra/sidecar/testing/CassandraSidecarTestContext.java
@@ -44,7 +44,7 @@
import org.apache.cassandra.sidecar.common.CQLSessionProvider;
import org.apache.cassandra.sidecar.common.JmxClient;
import org.apache.cassandra.sidecar.common.dns.DnsResolver;
-import org.apache.cassandra.sidecar.common.utils.SidecarVersionProvider;
+import org.apache.cassandra.sidecar.common.utils.DriverUtils;
import org.apache.cassandra.sidecar.utils.CassandraVersionProvider;
import org.apache.cassandra.sidecar.utils.SimpleCassandraVersion;
import org.apache.cassandra.testing.AbstractCassandraTestContext;
@@ -57,7 +57,6 @@
*/
public class CassandraSidecarTestContext implements AutoCloseable
{
- private static final SidecarVersionProvider svp = new SidecarVersionProvider("/sidecar.version");
public final SimpleCassandraVersion version;
private final CassandraVersionProvider versionProvider;
private final DnsResolver dnsResolver;
@@ -106,7 +105,7 @@
public static CassandraVersionProvider cassandraVersionProvider(DnsResolver dnsResolver)
{
return new CassandraVersionProvider.Builder()
- .add(new CassandraFactory(dnsResolver, svp.sidecarVersion())).build();
+ .add(new CassandraFactory(dnsResolver, new DriverUtils())).build();
}
private static int tryGetIntConfig(IInstanceConfig config, String configName, int defaultValue)
@@ -243,6 +242,7 @@
versionProvider,
sessionProvider,
jmxClient,
+ new DriverUtils(),
"1.0-TEST",
hostName,
nativeTransportPort);
diff --git a/src/test/java/org/apache/cassandra/sidecar/snapshots/SnapshotUtils.java b/src/test/java/org/apache/cassandra/sidecar/snapshots/SnapshotUtils.java
index 28a44b0..994ec01 100644
--- a/src/test/java/org/apache/cassandra/sidecar/snapshots/SnapshotUtils.java
+++ b/src/test/java/org/apache/cassandra/sidecar/snapshots/SnapshotUtils.java
@@ -39,6 +39,7 @@
import org.apache.cassandra.sidecar.common.JmxClient;
import org.apache.cassandra.sidecar.common.MockCassandraFactory;
import org.apache.cassandra.sidecar.common.dns.DnsResolver;
+import org.apache.cassandra.sidecar.common.utils.DriverUtils;
import org.apache.cassandra.sidecar.utils.CassandraVersionProvider;
import static org.assertj.core.api.Assertions.assertThat;
@@ -113,7 +114,7 @@
{
JmxClient mockJmxClient = mock(JmxClient.class);
delegate = new CassandraAdapterDelegate(vertx, 1, versionProvider, cqlSessionProvider1, mockJmxClient,
- null, "localhost1", 9042);
+ new DriverUtils(), null, "localhost1", 9042);
}
InstanceMetadataImpl localhost = InstanceMetadataImpl.builder()