CASSANDRASC-44 Refactor health check to use vertx timer
Vertx API offers a periodic timer that integrates with it's internal thead pooling
mechanism. In this commit, we utilize vertx's periodic timer in favor of using a
`Executors.newSingleThreadScheduledExecutor()` on each delegate.
Another benefit of this approach is that if the cluster topology changes, i.e.
node replacement, cluster expansion / shrink, then the health checks will be performed
against the actual nodes in the cluster, assuming we receive an updated view of the
cluster when invoking the `Configuration#getInstancesConfig()#instances()` method.
We no longer need to worry about decommissioning the single thread executors running
on each delegate.
patch by Francisco Guerrero; reviewed by Yifan Cai, Dinesh Joshi for CASSANDRASC-44
diff --git a/common/src/main/java/org/apache/cassandra/sidecar/common/CassandraAdapterDelegate.java b/common/src/main/java/org/apache/cassandra/sidecar/common/CassandraAdapterDelegate.java
index 21a9c8a..ee3b0ce 100644
--- a/common/src/main/java/org/apache/cassandra/sidecar/common/CassandraAdapterDelegate.java
+++ b/common/src/main/java/org/apache/cassandra/sidecar/common/CassandraAdapterDelegate.java
@@ -19,10 +19,7 @@
package org.apache.cassandra.sidecar.common;
import java.util.List;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,87 +36,67 @@
* of the underlying Cassandra adapter. If a server reboots, we can swap out the right Adapter when the driver
* reconnects.
*
- * This delegate *MUST* checkSession() before every call, because:
+ * <p>This delegate <b>MUST</b> invoke {@link #checkSession()} before every call, because:</p>
*
- * 1. The session lazily connects
- * 2. We might need to swap out the adapter if the version has changed
- *
+ * <ol>
+ * <li>The session lazily connects</li>
+ * <li>We might need to swap out the adapter if the version has changed</li>
+ * </ol>
*/
public class CassandraAdapterDelegate implements ICassandraAdapter, Host.StateListener
{
private final CQLSession cqlSession;
private final CassandraVersionProvider versionProvider;
- private Session session;
+ private volatile Session session;
private SimpleCassandraVersion currentVersion;
private ICassandraAdapter adapter;
private volatile boolean isUp = false;
- private final int refreshRate;
private static final Logger logger = LoggerFactory.getLogger(CassandraAdapterDelegate.class);
- private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
- private ScheduledFuture<?> healthCheckRoutine;
- private boolean registered = false;
+ private final AtomicBoolean registered = new AtomicBoolean(false);
+ private final AtomicBoolean isHealthCheckActive = new AtomicBoolean(false);
public CassandraAdapterDelegate(CassandraVersionProvider provider, CQLSession cqlSession)
{
- this(provider, cqlSession, 5000);
- }
-
- public CassandraAdapterDelegate(CassandraVersionProvider provider, CQLSession cqlSession, int refreshRate)
- {
this.cqlSession = cqlSession;
this.versionProvider = provider;
- this.refreshRate = refreshRate;
}
- public synchronized void start()
+ private void maybeRegisterHostListener(@NotNull Session session)
{
- logger.info("Starting health check");
- // only schedule the health check once.
- if (healthCheckRoutine == null)
- {
- healthCheckRoutine = executor.scheduleWithFixedDelay(this::healthCheck,
- 0,
- refreshRate,
- TimeUnit.MILLISECONDS);
- }
- }
-
- private synchronized void maybeRegisterHostListener(@NotNull Session session)
- {
- if (!registered)
+ if (registered.compareAndSet(false, true))
{
session.getCluster().register(this);
- registered = true;
}
}
- private synchronized void maybeUnregisterHostListener(@NotNull Session session)
+ private void maybeUnregisterHostListener(@NotNull Session session)
{
- if (registered)
+ if (registered.compareAndSet(true, false))
{
session.getCluster().unregister(this);
- registered = false;
}
}
- public synchronized void stop()
- {
- logger.info("Stopping health check");
- executor.shutdown();
- }
-
/**
* Make an attempt to obtain the session object.
*
- * It needs to be called before routing the request to the adapter
- * We might end up swapping the adapter out because of a server upgrade
+ * <p>It needs to be called before routing the request to the adapter
+ * We might end up swapping the adapter out because of a server upgrade</p>
*/
- public synchronized void checkSession()
+ public void checkSession()
{
- if (session == null)
+ if (session != null)
{
- session = cqlSession.getLocalCql();
+ return;
+ }
+
+ synchronized (this)
+ {
+ if (session == null)
+ {
+ session = cqlSession.getLocalCql();
+ }
}
}
@@ -127,26 +104,46 @@
* Should be called on initial connect as well as when a server comes back since it might be from an upgrade
* synchronized so we don't flood the DB with version requests
*
- * If the healthcheck determines we've changed versions, it should load the proper adapter
+ * <p>If the healthcheck determines we've changed versions, it should load the proper adapter</p>
*/
- public synchronized void healthCheck()
+ public void healthCheck()
+ {
+ if (isHealthCheckActive.compareAndSet(false, true))
+ {
+ try
+ {
+ healthCheckInternal();
+ }
+ finally
+ {
+ isHealthCheckActive.set(false);
+ }
+ }
+ else
+ {
+ logger.debug("Skipping health check because there's an active check at the moment");
+ }
+ }
+
+ private void healthCheckInternal()
{
checkSession();
- if (session == null)
+ Session activeSession = session;
+ if (activeSession == null)
{
logger.info("No local CQL session is available. Cassandra is down presumably.");
isUp = false;
return;
}
- maybeRegisterHostListener(session);
+ maybeRegisterHostListener(activeSession);
try
{
- String version = session.execute("select release_version from system.local")
- .one()
- .getString("release_version");
+ String version = activeSession.execute("select release_version from system.local")
+ .one()
+ .getString("release_version");
isUp = true;
// this might swap the adapter out
SimpleCassandraVersion newVersion = SimpleCassandraVersion.create(version);
@@ -164,7 +161,7 @@
// The cassandra node is down.
// Unregister the host listener and nullify the session in order to get a new object.
isUp = false;
- maybeUnregisterHostListener(session);
+ maybeUnregisterHostListener(activeSession);
session = null;
}
}
diff --git a/src/main/java/org/apache/cassandra/sidecar/CassandraSidecarDaemon.java b/src/main/java/org/apache/cassandra/sidecar/CassandraSidecarDaemon.java
index 3355d5d..25924fb 100644
--- a/src/main/java/org/apache/cassandra/sidecar/CassandraSidecarDaemon.java
+++ b/src/main/java/org/apache/cassandra/sidecar/CassandraSidecarDaemon.java
@@ -26,6 +26,7 @@
import com.google.inject.Guice;
import com.google.inject.Inject;
import com.google.inject.Singleton;
+import io.vertx.core.Vertx;
import io.vertx.core.http.HttpServer;
import org.apache.cassandra.sidecar.utils.SslUtils;
@@ -37,12 +38,15 @@
public class CassandraSidecarDaemon
{
private static final Logger logger = LoggerFactory.getLogger(CassandraSidecarDaemon.class);
+ private final Vertx vertx;
private final HttpServer server;
private final Configuration config;
+ private long healthCheckTimerId;
@Inject
- public CassandraSidecarDaemon(HttpServer server, Configuration config)
+ public CassandraSidecarDaemon(Vertx vertx, HttpServer server, Configuration config)
{
+ this.vertx = vertx;
this.server = server;
this.config = config;
}
@@ -53,14 +57,14 @@
validate();
logger.info("Starting Cassandra Sidecar on {}:{}", config.getHost(), config.getPort());
server.listen(config.getPort(), config.getHost());
- this.config.getInstancesConfig().instances().forEach(instanceMetadata -> instanceMetadata.delegate().start());
+ healthCheckTimerId = vertx.setPeriodic(config.getHealthCheckFrequencyMillis(), this::healthCheck);
}
public void stop()
{
logger.info("Stopping Cassandra Sidecar");
server.close();
- this.config.getInstancesConfig().instances().forEach(instanceMetadata -> instanceMetadata.delegate().stop());
+ vertx.cancelTimer(healthCheckTimerId);
}
private void banner(PrintStream out)
@@ -97,6 +101,20 @@
}
+ /**
+ * Checks the health of every instance configured in the {@link Configuration#getInstancesConfig()}.
+ * The health check is executed in a blocking thread to prevent the event-loop threads from blocking.
+ *
+ * @param timerId the ID of the periodic timer
+ */
+ private void healthCheck(Long timerId)
+ {
+ config.getInstancesConfig()
+ .instances()
+ .forEach(instanceMetadata ->
+ vertx.executeBlocking(promise -> instanceMetadata.delegate().healthCheck()));
+ }
+
public static void main(String[] args)
{
diff --git a/src/main/java/org/apache/cassandra/sidecar/cluster/instance/InstanceMetadataImpl.java b/src/main/java/org/apache/cassandra/sidecar/cluster/instance/InstanceMetadataImpl.java
index 7e7d31b..f63d613 100644
--- a/src/main/java/org/apache/cassandra/sidecar/cluster/instance/InstanceMetadataImpl.java
+++ b/src/main/java/org/apache/cassandra/sidecar/cluster/instance/InstanceMetadataImpl.java
@@ -45,7 +45,7 @@
this.dataDirs = dataDirs;
this.session = new CQLSession(host, port, healthCheckFrequencyMillis);
- this.delegate = new CassandraAdapterDelegate(versionProvider, session, healthCheckFrequencyMillis);
+ this.delegate = new CassandraAdapterDelegate(versionProvider, session);
}
public int id()
diff --git a/src/test/java/org/apache/cassandra/sidecar/TestModule.java b/src/test/java/org/apache/cassandra/sidecar/TestModule.java
index 5006909..1a6393a 100644
--- a/src/test/java/org/apache/cassandra/sidecar/TestModule.java
+++ b/src/test/java/org/apache/cassandra/sidecar/TestModule.java
@@ -39,7 +39,6 @@
import org.apache.cassandra.sidecar.common.TestValidationConfiguration;
import org.apache.cassandra.sidecar.common.utils.ValidationConfiguration;
-import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -107,7 +106,6 @@
CassandraAdapterDelegate delegate = mock(CassandraAdapterDelegate.class);
when(delegate.isUp()).thenReturn(isUp);
- doNothing().when(delegate).start();
when(instanceMeta.delegate()).thenReturn(delegate);
return instanceMeta;
}