blob: d788f7bc0cea4f1437e909b9b9354f58b070a1e3 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.sidecar;
import java.io.PrintStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.inject.Guice;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import io.vertx.core.Vertx;
import io.vertx.core.http.HttpServer;
import org.apache.cassandra.sidecar.utils.SslUtils;
/**
* Main class for initiating the Cassandra sidecar
* Note: remember to start and stop all delegates of instances
*/
@Singleton
public class CassandraSidecarDaemon
{
private static final Logger logger = LoggerFactory.getLogger(CassandraSidecarDaemon.class);
private final Vertx vertx;
private final HttpServer server;
private final Configuration config;
private long healthCheckTimerId;
@Inject
public CassandraSidecarDaemon(Vertx vertx, HttpServer server, Configuration config)
{
this.vertx = vertx;
this.server = server;
this.config = config;
}
public void start()
{
banner(System.out);
validate();
logger.info("Starting Cassandra Sidecar on {}:{}", config.getHost(), config.getPort());
server.listen(config.getPort(), config.getHost())
.onSuccess(p -> {
// Run a health check after start up
healthCheck();
// Configure the periodic timer to run subsequent health checks configured
// by the config.getHealthCheckFrequencyMillis() interval
updateHealthChecker(config.getHealthCheckFrequencyMillis());
});
}
public void stop()
{
logger.info("Stopping Cassandra Sidecar");
server.close();
vertx.cancelTimer(healthCheckTimerId);
config.getInstancesConfig()
.instances()
.forEach(instanceMetadata ->
vertx.executeBlocking(promise -> instanceMetadata.session().close()));
}
private void banner(PrintStream out)
{
out.println(" _____ _ _____ _ _ \n" +
"/ __ \\ | | / ___(_) | | \n" +
"| / \\/ __ _ ___ ___ __ _ _ __ __| |_ __ __ _ \\ `--. _ __| | ___ ___ __ _ _ __ \n" +
"| | / _` / __/ __|/ _` | '_ \\ / _` | '__/ _` | `--. \\ |/ _` |/ _ \\/ __/ _` | '__|\n" +
"| \\__/\\ (_| \\__ \\__ \\ (_| | | | | (_| | | | (_| | /\\__/ / | (_| | __/ (_| (_| | | \n" +
" \\____/\\__,_|___/___/\\__,_|_| |_|\\__,_|_| \\__,_| \\____/|_|\\__,_|\\___|\\___\\__,_|_|\n" +
" \n" +
" ");
}
private void validate()
{
if (config.isSslEnabled())
{
try
{
if (config.getKeyStorePath() == null || config.getKeystorePassword() == null)
throw new IllegalArgumentException("keyStorePath and keyStorePassword must be set if ssl enabled");
SslUtils.validateSslOpts(config.getKeyStorePath(), config.getKeystorePassword());
if (config.getTrustStorePath() != null && config.getTruststorePassword() != null)
SslUtils.validateSslOpts(config.getTrustStorePath(), config.getTruststorePassword());
}
catch (Exception e)
{
throw new RuntimeException("Invalid keystore parameters for SSL", e);
}
}
}
/**
* Updates the health check frequency to the provided {@code healthCheckFrequencyMillis} value
*
* @param healthCheckFrequencyMillis the new health check frequency in milliseconds
*/
public void updateHealthChecker(long healthCheckFrequencyMillis)
{
if (healthCheckTimerId > 0)
{
// Stop existing timer
vertx.cancelTimer(healthCheckTimerId);
logger.info("Stopped health check timer with timerId={}", healthCheckTimerId);
}
// TODO: when upgrading to latest vertx version, we can set an initial delay, and the periodic delay
healthCheckTimerId = vertx.setPeriodic(healthCheckFrequencyMillis, t -> healthCheck());
logger.info("Started health check with frequency={} and timerId={}",
healthCheckFrequencyMillis, healthCheckTimerId);
}
/**
* Checks the health of every instance configured in the {@link Configuration#getInstancesConfig()}.
* The health check is executed in a blocking thread to prevent the event-loop threads from blocking.
*/
private void healthCheck()
{
config.getInstancesConfig().instances()
.forEach((instanceMetadata) ->
vertx.executeBlocking((promise) -> instanceMetadata.delegate().healthCheck()));
}
public static void main(String[] args)
{
CassandraSidecarDaemon app = Guice.createInjector(new MainModule())
.getInstance(CassandraSidecarDaemon.class);
app.start();
Runtime.getRuntime().addShutdownHook(new Thread(app::stop));
}
}