Call the start method of CassandraAdaptorDelegate to start periodic health checl
patch by Saranya Krishnakumar; reviewed by Yifan Cai, Dinesh Joshi for CASSANDRASC-32
diff --git a/common/src/main/java/org/apache/cassandra/sidecar/common/CQLSession.java b/common/src/main/java/org/apache/cassandra/sidecar/common/CQLSession.java
index 5ec3147..b3e82b5 100644
--- a/common/src/main/java/org/apache/cassandra/sidecar/common/CQLSession.java
+++ b/common/src/main/java/org/apache/cassandra/sidecar/common/CQLSession.java
@@ -101,11 +101,12 @@
.withNettyOptions(nettyOptions)
.build();
localSession = cluster.connect();
+ logger.info("Successfully connected to Casssandra instance!");
}
}
catch (Exception e)
{
- logger.debug("Failed to reach Cassandra", e);
+ logger.error("Failed to reach Cassandra", e);
if (cluster != null)
{
try
@@ -114,7 +115,7 @@
}
catch (Exception ex)
{
- logger.debug("Failed to close cluster in cleanup", ex);
+ logger.error("Failed to close cluster in cleanup", ex);
}
}
}
diff --git a/common/src/main/java/org/apache/cassandra/sidecar/common/CassandraAdapterDelegate.java b/common/src/main/java/org/apache/cassandra/sidecar/common/CassandraAdapterDelegate.java
index b00faf8..3ed9c04 100644
--- a/common/src/main/java/org/apache/cassandra/sidecar/common/CassandraAdapterDelegate.java
+++ b/common/src/main/java/org/apache/cassandra/sidecar/common/CassandraAdapterDelegate.java
@@ -21,6 +21,7 @@
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import com.google.common.base.Preconditions;
@@ -32,6 +33,7 @@
import com.datastax.driver.core.Host;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.exceptions.NoHostAvailableException;
+import com.sun.istack.internal.NotNull;
/**
@@ -57,6 +59,7 @@
private static final Logger logger = LoggerFactory.getLogger(CassandraAdapterDelegate.class);
private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
+ private ScheduledFuture<?> healthCheckRoutine;
private boolean registered = false;
public CassandraAdapterDelegate(CassandraVersionProvider provider, CQLSession cqlSession)
@@ -74,19 +77,31 @@
public synchronized void start()
{
logger.info("Starting health check");
- executor.scheduleWithFixedDelay(this::healthCheck, 0, refreshRate, TimeUnit.MILLISECONDS);
- maybeRegisterHostListener();
+ // only schedule the health check once.
+ if (healthCheckRoutine == null)
+ {
+ healthCheckRoutine = executor.scheduleWithFixedDelay(this::healthCheck,
+ 0,
+ refreshRate,
+ TimeUnit.MILLISECONDS);
+ }
}
- private synchronized void maybeRegisterHostListener()
+ private synchronized void maybeRegisterHostListener(@NotNull Session session)
{
if (!registered)
{
- checkSession();
- if (session != null)
- {
- session.getCluster().register(this);
- }
+ session.getCluster().register(this);
+ registered = true;
+ }
+ }
+
+ private synchronized void maybeUnregisterHostListener(@NotNull Session session)
+ {
+ if (registered)
+ {
+ session.getCluster().unregister(this);
+ registered = false;
}
}
@@ -97,7 +112,9 @@
}
/**
- * Need to be called before routing the request to the adapter
+ * Make an attempt to obtain the session object.
+ *
+ * It needs to be called before routing the request to the adapter
* We might end up swapping the adapter out because of a server upgrade
*/
public synchronized void checkSession()
@@ -105,7 +122,6 @@
if (session == null)
{
session = cqlSession.getLocalCql();
- start();
}
}
@@ -117,7 +133,17 @@
*/
public synchronized void healthCheck()
{
- Preconditions.checkNotNull(session);
+ checkSession();
+
+ if (session == null)
+ {
+ logger.info("No local CQL session is available. Cassandra is down presumably.");
+ isUp = false;
+ return;
+ }
+
+ maybeRegisterHostListener(session);
+
try
{
String version = session.execute("select release_version from system.local")
@@ -128,15 +154,20 @@
SimpleCassandraVersion newVersion = SimpleCassandraVersion.create(version);
if (!newVersion.equals(currentVersion))
{
- currentVersion = SimpleCassandraVersion.create(version);
+ currentVersion = newVersion;
adapter = versionProvider.getCassandra(version).create(cqlSession);
- logger.info("Cassandra version change detected. New adapter loaded: {}", adapter);
+ logger.info("Cassandra version change detected. New adapter loaded: {}", adapter);
}
- logger.info("Cassandra version {}");
+ logger.info("Cassandra version {}", version);
}
catch (NoHostAvailableException e)
{
+ logger.error("Unexpected error connecting to Cassandra instance.", e);
+ // The cassandra node is down.
+ // Unregister the host listener and nullify the session in order to get a new object.
isUp = false;
+ maybeUnregisterHostListener(session);
+ session = null;
}
}
diff --git a/src/main/java/org/apache/cassandra/sidecar/CassandraSidecarDaemon.java b/src/main/java/org/apache/cassandra/sidecar/CassandraSidecarDaemon.java
index 63e503f..5644d7b 100644
--- a/src/main/java/org/apache/cassandra/sidecar/CassandraSidecarDaemon.java
+++ b/src/main/java/org/apache/cassandra/sidecar/CassandraSidecarDaemon.java
@@ -27,6 +27,7 @@
import com.google.inject.Inject;
import com.google.inject.Singleton;
import io.vertx.core.http.HttpServer;
+import org.apache.cassandra.sidecar.common.CassandraAdapterDelegate;
import org.apache.cassandra.sidecar.utils.SslUtils;
/**
@@ -36,12 +37,14 @@
public class CassandraSidecarDaemon
{
private static final Logger logger = LoggerFactory.getLogger(CassandraSidecarDaemon.class);
+ private final CassandraAdapterDelegate delegate;
private final HttpServer server;
private final Configuration config;
@Inject
- public CassandraSidecarDaemon(HttpServer server, Configuration config)
+ public CassandraSidecarDaemon(CassandraAdapterDelegate delegate, HttpServer server, Configuration config)
{
+ this.delegate = delegate;
this.server = server;
this.config = config;
}
@@ -50,6 +53,7 @@
{
banner(System.out);
validate();
+ delegate.start();
logger.info("Starting Cassandra Sidecar on {}:{}", config.getHost(), config.getPort());
server.listen(config.getPort(), config.getHost());
}
@@ -57,6 +61,7 @@
public void stop()
{
logger.info("Stopping Cassandra Sidecar");
+ delegate.stop();
server.close();
}