CASSANDRASC-44 Refactor health check to use vertx timer

Vertx API offers a periodic timer that integrates with it's internal thead pooling
mechanism. In this commit, we utilize vertx's periodic timer in favor of using a
`Executors.newSingleThreadScheduledExecutor()` on each delegate.

Another benefit of this approach is that if the cluster topology changes, i.e.
node replacement, cluster expansion / shrink, then the health checks will be performed
against the actual nodes in the cluster, assuming we receive an updated view of the
cluster when invoking the `Configuration#getInstancesConfig()#instances()` method.
We no longer need to worry about decommissioning the single thread executors running
on each delegate.

patch by Francisco Guerrero; reviewed by Yifan Cai, Dinesh Joshi for CASSANDRASC-44
diff --git a/common/src/main/java/org/apache/cassandra/sidecar/common/CassandraAdapterDelegate.java b/common/src/main/java/org/apache/cassandra/sidecar/common/CassandraAdapterDelegate.java
index 21a9c8a..ee3b0ce 100644
--- a/common/src/main/java/org/apache/cassandra/sidecar/common/CassandraAdapterDelegate.java
+++ b/common/src/main/java/org/apache/cassandra/sidecar/common/CassandraAdapterDelegate.java
@@ -19,10 +19,7 @@
 package org.apache.cassandra.sidecar.common;
 
 import java.util.List;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -39,87 +36,67 @@
  * of the underlying Cassandra adapter.  If a server reboots, we can swap out the right Adapter when the driver
  * reconnects.
  *
- * This delegate *MUST* checkSession() before every call, because:
+ * <p>This delegate <b>MUST</b> invoke {@link #checkSession()} before every call, because:</p>
  *
- * 1. The session lazily connects
- * 2. We might need to swap out the adapter if the version has changed
- *
+ * <ol>
+ * <li>The session lazily connects</li>
+ * <li>We might need to swap out the adapter if the version has changed</li>
+ * </ol>
  */
 public class CassandraAdapterDelegate implements ICassandraAdapter, Host.StateListener
 {
     private final CQLSession cqlSession;
     private final CassandraVersionProvider versionProvider;
-    private Session session;
+    private volatile Session session;
     private SimpleCassandraVersion currentVersion;
     private ICassandraAdapter adapter;
     private volatile boolean isUp = false;
-    private final int refreshRate;
 
     private static final Logger logger = LoggerFactory.getLogger(CassandraAdapterDelegate.class);
-    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
-    private ScheduledFuture<?> healthCheckRoutine;
-    private boolean registered = false;
+    private final AtomicBoolean registered = new AtomicBoolean(false);
+    private final AtomicBoolean isHealthCheckActive = new AtomicBoolean(false);
 
     public CassandraAdapterDelegate(CassandraVersionProvider provider, CQLSession cqlSession)
     {
-        this(provider, cqlSession, 5000);
-    }
-
-    public CassandraAdapterDelegate(CassandraVersionProvider provider, CQLSession cqlSession, int refreshRate)
-    {
         this.cqlSession = cqlSession;
         this.versionProvider = provider;
-        this.refreshRate = refreshRate;
     }
 
-    public synchronized void start()
+    private void maybeRegisterHostListener(@NotNull Session session)
     {
-        logger.info("Starting health check");
-        // only schedule the health check once.
-        if (healthCheckRoutine == null)
-        {
-            healthCheckRoutine = executor.scheduleWithFixedDelay(this::healthCheck,
-                                                                 0,
-                                                                 refreshRate,
-                                                                 TimeUnit.MILLISECONDS);
-        }
-    }
-
-    private synchronized void maybeRegisterHostListener(@NotNull Session session)
-    {
-        if (!registered)
+        if (registered.compareAndSet(false, true))
         {
             session.getCluster().register(this);
-            registered = true;
         }
     }
 
-    private synchronized void maybeUnregisterHostListener(@NotNull Session session)
+    private void maybeUnregisterHostListener(@NotNull Session session)
     {
-        if (registered)
+        if (registered.compareAndSet(true, false))
         {
             session.getCluster().unregister(this);
-            registered = false;
         }
     }
 
-    public synchronized void stop()
-    {
-        logger.info("Stopping health check");
-        executor.shutdown();
-    }
-
     /**
      * Make an attempt to obtain the session object.
      *
-     * It needs to be called before routing the request to the adapter
-     * We might end up swapping the adapter out because of a server upgrade
+     * <p>It needs to be called before routing the request to the adapter
+     * We might end up swapping the adapter out because of a server upgrade</p>
      */
-    public synchronized void checkSession()
+    public void checkSession()
     {
-        if (session == null)
+        if (session != null)
         {
-            session = cqlSession.getLocalCql();
+            return;
+        }
+
+        synchronized (this)
+        {
+            if (session == null)
+            {
+                session = cqlSession.getLocalCql();
+            }
         }
     }
 
@@ -127,26 +104,46 @@
      * Should be called on initial connect as well as when a server comes back since it might be from an upgrade
      * synchronized so we don't flood the DB with version requests
      *
-     * If the healthcheck determines we've changed versions, it should load the proper adapter
+     * <p>If the healthcheck determines we've changed versions, it should load the proper adapter</p>
      */
-    public synchronized void healthCheck()
+    public void healthCheck()
+    {
+        if (isHealthCheckActive.compareAndSet(false, true))
+        {
+            try
+            {
+                healthCheckInternal();
+            }
+            finally
+            {
+                isHealthCheckActive.set(false);
+            }
+        }
+        else
+        {
+            logger.debug("Skipping health check because there's an active check at the moment");
+        }
+    }
+
+    private void healthCheckInternal()
     {
         checkSession();
 
-        if (session == null)
+        Session activeSession = session;
+        if (activeSession == null)
         {
             logger.info("No local CQL session is available. Cassandra is down presumably.");
             isUp = false;
             return;
         }
 
-        maybeRegisterHostListener(session);
+        maybeRegisterHostListener(activeSession);
 
         try
         {
-            String version = session.execute("select release_version from system.local")
-                    .one()
-                    .getString("release_version");
+            String version = activeSession.execute("select release_version from system.local")
+                                          .one()
+                                          .getString("release_version");
             isUp = true;
             // this might swap the adapter out
             SimpleCassandraVersion newVersion = SimpleCassandraVersion.create(version);
@@ -164,7 +161,7 @@
             // The cassandra node is down.
             // Unregister the host listener and nullify the session in order to get a new object.
             isUp = false;
-            maybeUnregisterHostListener(session);
+            maybeUnregisterHostListener(activeSession);
             session = null;
         }
     }
diff --git a/src/main/java/org/apache/cassandra/sidecar/CassandraSidecarDaemon.java b/src/main/java/org/apache/cassandra/sidecar/CassandraSidecarDaemon.java
index 3355d5d..25924fb 100644
--- a/src/main/java/org/apache/cassandra/sidecar/CassandraSidecarDaemon.java
+++ b/src/main/java/org/apache/cassandra/sidecar/CassandraSidecarDaemon.java
@@ -26,6 +26,7 @@
 import com.google.inject.Guice;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
+import io.vertx.core.Vertx;
 import io.vertx.core.http.HttpServer;
 import org.apache.cassandra.sidecar.utils.SslUtils;
 
@@ -37,12 +38,15 @@
 public class CassandraSidecarDaemon
 {
     private static final Logger logger = LoggerFactory.getLogger(CassandraSidecarDaemon.class);
+    private final Vertx vertx;
     private final HttpServer server;
     private final Configuration config;
+    private long healthCheckTimerId;
 
     @Inject
-    public CassandraSidecarDaemon(HttpServer server, Configuration config)
+    public CassandraSidecarDaemon(Vertx vertx, HttpServer server, Configuration config)
     {
+        this.vertx = vertx;
         this.server = server;
         this.config = config;
     }
@@ -53,14 +57,14 @@
         validate();
         logger.info("Starting Cassandra Sidecar on {}:{}", config.getHost(), config.getPort());
         server.listen(config.getPort(), config.getHost());
-        this.config.getInstancesConfig().instances().forEach(instanceMetadata -> instanceMetadata.delegate().start());
+        healthCheckTimerId = vertx.setPeriodic(config.getHealthCheckFrequencyMillis(), this::healthCheck);
     }
 
     public void stop()
     {
         logger.info("Stopping Cassandra Sidecar");
         server.close();
-        this.config.getInstancesConfig().instances().forEach(instanceMetadata -> instanceMetadata.delegate().stop());
+        vertx.cancelTimer(healthCheckTimerId);
     }
 
     private void banner(PrintStream out)
@@ -97,6 +101,20 @@
 
     }
 
+    /**
+     * Checks the health of every instance configured in the {@link Configuration#getInstancesConfig()}.
+     * The health check is executed in a blocking thread to prevent the event-loop threads from blocking.
+     *
+     * @param timerId the ID of the periodic timer
+     */
+    private void healthCheck(Long timerId)
+    {
+        config.getInstancesConfig()
+              .instances()
+              .forEach(instanceMetadata ->
+                       vertx.executeBlocking(promise -> instanceMetadata.delegate().healthCheck()));
+    }
+
 
     public static void main(String[] args)
     {
diff --git a/src/main/java/org/apache/cassandra/sidecar/cluster/instance/InstanceMetadataImpl.java b/src/main/java/org/apache/cassandra/sidecar/cluster/instance/InstanceMetadataImpl.java
index 7e7d31b..f63d613 100644
--- a/src/main/java/org/apache/cassandra/sidecar/cluster/instance/InstanceMetadataImpl.java
+++ b/src/main/java/org/apache/cassandra/sidecar/cluster/instance/InstanceMetadataImpl.java
@@ -45,7 +45,7 @@
         this.dataDirs = dataDirs;
 
         this.session = new CQLSession(host, port, healthCheckFrequencyMillis);
-        this.delegate = new CassandraAdapterDelegate(versionProvider, session, healthCheckFrequencyMillis);
+        this.delegate = new CassandraAdapterDelegate(versionProvider, session);
     }
 
     public int id()
diff --git a/src/test/java/org/apache/cassandra/sidecar/TestModule.java b/src/test/java/org/apache/cassandra/sidecar/TestModule.java
index 5006909..1a6393a 100644
--- a/src/test/java/org/apache/cassandra/sidecar/TestModule.java
+++ b/src/test/java/org/apache/cassandra/sidecar/TestModule.java
@@ -39,7 +39,6 @@
 import org.apache.cassandra.sidecar.common.TestValidationConfiguration;
 import org.apache.cassandra.sidecar.common.utils.ValidationConfiguration;
 
-import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -107,7 +106,6 @@
 
         CassandraAdapterDelegate delegate = mock(CassandraAdapterDelegate.class);
         when(delegate.isUp()).thenReturn(isUp);
-        doNothing().when(delegate).start();
         when(instanceMeta.delegate()).thenReturn(delegate);
         return instanceMeta;
     }