// blob: 51b67b2d60c788fa606020e2fb3797ed37293c47
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.sidecar.server;
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import io.vertx.core.CompositeFuture;
import io.vertx.core.DeploymentOptions;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.eventbus.Message;
import io.vertx.core.eventbus.MessageConsumer;
import io.vertx.core.http.HttpServerOptions;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.core.net.SSLOptions;
import io.vertx.core.net.TrafficShapingOptions;
import io.vertx.ext.web.Router;
import org.apache.cassandra.sidecar.cluster.InstancesConfig;
import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
import org.apache.cassandra.sidecar.common.utils.Preconditions;
import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
import org.apache.cassandra.sidecar.config.SidecarConfiguration;
import org.apache.cassandra.sidecar.config.SslConfiguration;
import org.apache.cassandra.sidecar.metrics.ServerMetrics;
import org.apache.cassandra.sidecar.tasks.HealthCheckPeriodicTask;
import org.apache.cassandra.sidecar.tasks.KeyStoreCheckPeriodicTask;
import org.apache.cassandra.sidecar.tasks.PeriodicTaskExecutor;
import org.apache.cassandra.sidecar.utils.SslUtils;
import org.jetbrains.annotations.VisibleForTesting;
import static org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_ALL_CASSANDRA_CQL_READY;
import static org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_CASSANDRA_CQL_READY;
/**
* The Sidecar {@link Server} class that manages the start and stop lifecycle of the service
*/
@Singleton
public class Server
{
// Class-wide SLF4J logger
private static final Logger LOGGER = LoggerFactory.getLogger(Server.class);
// Collaborators below are supplied through the constructor and never reassigned
// Used to deploy/undeploy verticles, publish lifecycle events, and is closed by close()
protected final Vertx vertx;
// The internal pool closes the Cassandra instance delegates; the pools themselves are closed in close()
protected final ExecutorPools executorPools;
// Source of the server verticle count and the SSL configuration
protected final SidecarConfiguration sidecarConfiguration;
// The configured Cassandra instances
protected final InstancesConfig instancesConfig;
// Handed to every ServerVerticle created in start()
protected final Router router;
// Runs the health-check task and, when applicable, the keystore-check task
protected final PeriodicTaskExecutor periodicTaskExecutor;
// Builds the HttpServerOptions used at deployment and configures refreshed SSLOptions
protected final HttpServerOptionsProvider optionsProvider;
// Passed to the HealthCheckPeriodicTask
protected final ServerMetrics serverMetrics;
// Verticles created by the deployment supplier in start(); emptied by stop() and close().
// SSL and traffic-shaping updates are applied to every verticle in this list.
protected final List<ServerVerticle> deployedServerVerticles = new CopyOnWriteArrayList<>();
// Keeps track of all the Cassandra instance identifiers where CQL is ready
private final Set<Integer> cqlReadyInstanceIds = Collections.synchronizedSet(new HashSet<>());
/**
 * Creates the server from its injected collaborators. The constructor only stores the references;
 * nothing is deployed until {@link #start()} is called.
 *
 * @param vertx                the Vert.x instance used to deploy verticles and publish events
 * @param sidecarConfiguration the configuration providing the verticle count and SSL settings
 * @param router               the router handed to every {@link ServerVerticle}
 * @param instancesConfig      the configured Cassandra instances
 * @param executorPools        the executor pools used for blocking work and closed on shutdown
 * @param periodicTaskExecutor the executor that runs the internal periodic tasks
 * @param optionsProvider      the provider of HTTP server options and SSL options
 * @param serverMetrics        the metrics object passed to the health check task
 */
@Inject
public Server(Vertx vertx,
              SidecarConfiguration sidecarConfiguration,
              Router router,
              InstancesConfig instancesConfig,
              ExecutorPools executorPools,
              PeriodicTaskExecutor periodicTaskExecutor,
              HttpServerOptionsProvider optionsProvider,
              ServerMetrics serverMetrics)
{
    // Assignments follow the parameter order
    this.vertx = vertx;
    this.sidecarConfiguration = sidecarConfiguration;
    this.router = router;
    this.instancesConfig = instancesConfig;
    this.executorPools = executorPools;
    this.periodicTaskExecutor = periodicTaskExecutor;
    this.optionsProvider = optionsProvider;
    this.serverMetrics = serverMetrics;
}
/**
 * Prints the banner, validates the SSL configuration and deploys the configured number of
 * {@link ServerVerticle verticles} to {@link Vertx}. After a successful deployment the internal
 * periodic tasks are scheduled and the server start event is published.
 *
 * @return a future completed with the deployment ID
 * @throws RuntimeException when SSL is enabled and the keystore settings fail validation
 */
public Future<String> start()
{
    banner(System.out);
    validate();
    LOGGER.info("Starting Cassandra Sidecar");

    int verticleInstances = sidecarConfiguration.serviceConfiguration().serverVerticleInstances();
    Preconditions.checkArgument(verticleInstances > 0, "Server verticle count can not be less than 1");
    LOGGER.debug("Deploying {} verticles to vertx", verticleInstances);

    DeploymentOptions deployment = new DeploymentOptions().setInstances(verticleInstances);
    HttpServerOptions httpOptions = optionsProvider.apply(sidecarConfiguration);

    // The supplier is invoked for each verticle instance; every created verticle is tracked so
    // that SSL and traffic shaping updates can reach it later
    Future<String> deployed = vertx.deployVerticle(() -> {
        ServerVerticle verticle = new ServerVerticle(sidecarConfiguration, router, httpOptions);
        deployedServerVerticles.add(verticle);
        return verticle;
    }, deployment);

    return deployed.compose(id -> scheduleInternalPeriodicTasks(id))
                   .compose(id -> notifyServerStart(id));
}
/**
 * Undeploy a verticle deployment, stopping all the {@link ServerVerticle verticles}.
 *
 * <p>The server stop event is published before the deployment is undeployed.
 *
 * @param deploymentId the deployment ID, must not be {@code null}
 * @return a future completed with the result of undeploying
 * @throws NullPointerException when {@code deploymentId} is {@code null}; in that case no state
 *                              is modified
 */
public Future<Void> stop(String deploymentId)
{
    // Validate before touching any state: a rejected call must not wipe the tracked verticles
    // of a server that keeps running, otherwise later SSL / traffic shaping updates would
    // silently skip every deployed verticle
    Objects.requireNonNull(deploymentId, "deploymentId must not be null");
    LOGGER.info("Stopping Cassandra Sidecar");
    deployedServerVerticles.clear();
    return notifyServerStopping(deploymentId)
           .compose(v -> vertx.undeploy(deploymentId))
           .onSuccess(v -> LOGGER.info("Successfully stopped Cassandra Sidecar"));
}
/**
 * Shuts the service down: publishes the server stop event, closes the periodic task executor and
 * the delegate of every configured Cassandra instance, then closes the executor pools and finally
 * triggers the close of the {@link Vertx} instance.
 *
 * <p>The instance cannot be used after it has been closed.
 *
 * @return a future completed with the result of closing the executor pools; closing {@link Vertx}
 * is triggered when that future completes, regardless of its outcome
 */
public Future<Void> close()
{
    LOGGER.info("Stopping Cassandra Sidecar");
    deployedServerVerticles.clear();

    List<Future<Void>> pending = new ArrayList<>();
    pending.add(notifyServerStopping(null));

    Promise<Void> periodicTasksClosed = Promise.promise();
    periodicTaskExecutor.close(periodicTasksClosed);
    pending.add(periodicTasksClosed.future());

    // Close each instance delegate on the internal pool
    for (InstanceMetadata instance : instancesConfig.instances())
    {
        Future<Void> delegateClosed = executorPools.internal().executeBlocking(promise -> {
            instance.delegate().close();
            promise.complete(null);
        });
        pending.add(delegateClosed);
    }

    return Future.all(pending)
                 .compose(ignored -> executorPools.close())
                 .onComplete(ignored -> vertx.close())
                 .onSuccess(ignored -> LOGGER.info("Successfully stopped Cassandra Sidecar"));
}
/**
 * Builds fresh SSL options for the given {@code timestamp} of the updated file and applies them
 * to all the currently deployed verticle instances.
 *
 * @param timestamp the timestamp of the updated file
 * @return a future that combines the update results of all the deployed verticles
 */
public Future<CompositeFuture> updateSSLOptions(long timestamp)
{
    SSLOptions refreshed = new SSLOptions();
    // Populate the new options from the current SSL configuration
    optionsProvider.configureSSLOptions(refreshed, sidecarConfiguration.sslConfiguration(), timestamp);

    // Push the same options object to every deployed verticle and collect the outcomes
    List<Future<CompositeFuture>> perVerticleUpdates = new ArrayList<>();
    for (ServerVerticle verticle : deployedServerVerticles)
    {
        perVerticleUpdates.add(verticle.updateSSLOptions(refreshed));
    }
    return Future.all(perVerticleUpdates);
}
/**
 * Applies the given traffic shaping options to all the currently deployed verticle instances.
 *
 * @param options the updated traffic shaping options
 */
public void updateTrafficShapingOptions(TrafficShapingOptions options)
{
    for (ServerVerticle verticle : deployedServerVerticles)
    {
        verticle.updateTrafficShapingOptions(options);
    }
}
/**
 * Expose the port of the first deployed verticle for testing purposes
 *
 * <p>The first verticle is read from a single snapshot of the list, so a concurrent
 * {@link #stop(String)} or {@link #close()} clearing the list cannot cause an
 * {@link IndexOutOfBoundsException} between an emptiness check and the element access.
 *
 * @return the port where the first verticle is deployed, or -1 if the server has not been deployed
 */
@VisibleForTesting
public int actualPort()
{
    return deployedServerVerticles.stream()
                                  .findFirst()
                                  .map(ServerVerticle::actualPort)
                                  .orElse(-1);
}
/**
 * Logs the successful start and publishes the deployment ID to the
 * {@link SidecarServerEvents#ON_SERVER_START} address.
 *
 * @param deploymentId the deployment ID
 * @return a succeeded future with the deployment ID of the server
 */
protected Future<String> notifyServerStart(String deploymentId)
{
    LOGGER.info("Successfully started Cassandra Sidecar");
    String startAddress = SidecarServerEvents.ON_SERVER_START.address();
    vertx.eventBus().publish(startAddress, deploymentId);
    return Future.succeededFuture(deploymentId);
}
/**
 * Publishes the deployment ID to the {@link SidecarServerEvents#ON_SERVER_STOP} address.
 *
 * @param deploymentId the deployment ID; {@link #close()} passes {@code null}
 * @return a succeeded future
 */
protected Future<Void> notifyServerStopping(String deploymentId)
{
    String stopAddress = SidecarServerEvents.ON_SERVER_STOP.address();
    vertx.eventBus().publish(stopAddress, deploymentId);
    return Future.succeededFuture();
}
/**
 * Prints the ASCII-art banner to the given stream.
 *
 * @param out the stream the banner is written to
 */
protected void banner(PrintStream out)
{
    String art = " _____ _ _____ _ _ \n" +
                 "/ __ \\ | | / ___(_) | | \n" +
                 "| / \\/ __ _ ___ ___ __ _ _ __ __| |_ __ __ _ \\ `--. _ __| | ___ ___ __ _ _ __ \n" +
                 "| | / _` / __/ __|/ _` | '_ \\ / _` | '__/ _` | `--. \\ |/ _` |/ _ \\/ __/ _` | '__|\n" +
                 "| \\__/\\ (_| \\__ \\__ \\ (_| | | | | (_| | | | (_| | /\\__/ / | (_| | __/ (_| (_| | | \n" +
                 " \\____/\\__,_|___/___/\\__,_|_| |_|\\__,_|_| \\__,_| \\____/|_|\\__,_|\\___|\\___\\__,_|_|\n" +
                 " \n" +
                 " ";
    out.println(art);
}
/**
 * Validates the SSL settings before the server is deployed. Nothing is checked when SSL is
 * absent or disabled. Otherwise the keystore must be configured and pass validation, and the
 * truststore is validated when it is configured.
 *
 * @throws RuntimeException wrapping the underlying cause when the keystore is not configured or
 *                          when the keystore / truststore options fail validation
 */
protected void validate()
{
    SslConfiguration ssl = sidecarConfiguration.sslConfiguration();
    boolean sslEnabled = ssl != null && ssl.enabled();
    if (sslEnabled)
    {
        try
        {
            if (!ssl.isKeystoreConfigured())
            {
                throw new IllegalArgumentException("keyStorePath and keyStorePassword must be set if ssl enabled");
            }
            SslUtils.validateSslOpts(ssl.keystore());

            if (ssl.isTrustStoreConfigured())
            {
                SslUtils.validateSslOpts(ssl.truststore());
            }
        }
        catch (Exception cause)
        {
            // The missing-keystore error raised above is wrapped here as well
            throw new RuntimeException("Invalid keystore parameters for SSL", cause);
        }
    }
}
/**
 * Schedules internal {@link org.apache.cassandra.sidecar.tasks.PeriodicTask}s: the health check
 * task and, when applicable, the keystore check task. It also registers a local event bus consumer
 * that tracks CQL ready events.
 *
 * @param deploymentId the deployment ID
 * @return a succeeded future with the deployment ID of the server
 */
protected Future<String> scheduleInternalPeriodicTasks(String deploymentId)
{
    HealthCheckPeriodicTask healthCheck = new HealthCheckPeriodicTask(vertx,
                                                                      sidecarConfiguration,
                                                                      instancesConfig,
                                                                      executorPools,
                                                                      serverMetrics);
    periodicTaskExecutor.schedule(healthCheck);
    maybeScheduleKeyStoreCheckPeriodicTask();

    // The consumer is passed to its own handler so that the handler can unregister it
    MessageConsumer<JsonObject> readyEvents = vertx.eventBus().localConsumer(ON_CASSANDRA_CQL_READY.address());
    readyEvents.handler(event -> onCqlReady(readyEvents, event));

    return Future.succeededFuture(deploymentId);
}
/**
 * When the SSL configuration is provided and enabled it schedules a periodic task to check for changes
 * in the keystore file.
 *
 * <p>No task is scheduled when SSL is absent or disabled, when no keystore is present or configured,
 * or when keystore reloading is turned off.
 */
protected void maybeScheduleKeyStoreCheckPeriodicTask()
{
    SslConfiguration ssl = sidecarConfiguration.sslConfiguration();
    if (ssl == null || !ssl.enabled())
    {
        return;
    }

    // Do not rely on validate() having rejected a missing keystore: both methods are overridable,
    // so tolerate a null keystore here instead of failing with a NullPointerException
    if (ssl.keystore() == null
        || !ssl.keystore().isConfigured()
        || !ssl.keystore().reloadStore())
    {
        return;
    }

    // the checks for the keystore changes are initialized here because we need a reference to the
    // server to be able to update the SSL options
    periodicTaskExecutor.schedule(new KeyStoreCheckPeriodicTask(vertx, this, ssl));
}
/**
 * Handles CQL ready events. The instance ID carried in the message is recorded, and once every
 * configured Cassandra instance has been recorded the consumer is unregistered and a notification
 * is published to the {@link SidecarServerEvents#ON_ALL_CASSANDRA_CQL_READY} address.
 *
 * @param cqlReadyConsumer the consumer
 * @param message          the received message
 */
protected void onCqlReady(MessageConsumer<JsonObject> cqlReadyConsumer, Message<JsonObject> message)
{
    cqlReadyInstanceIds.add(message.body().getInteger("cassandraInstanceId"));

    // Keep listening while at least one configured instance has not reported CQL ready
    for (InstanceMetadata instance : instancesConfig.instances())
    {
        if (!cqlReadyInstanceIds.contains(instance.id()))
        {
            return;
        }
    }

    cqlReadyConsumer.unregister(); // stop listening to CQL ready events
    notifyAllCassandraCqlAreReady();
    LOGGER.info("CQL is ready for all Cassandra instances. {}", cqlReadyInstanceIds);
}
/**
 * Publishes a message to the {@link SidecarServerEvents#ON_ALL_CASSANDRA_CQL_READY} address
 * notifying consumers that all the CQL connections are available. The message carries the
 * recorded Cassandra instance IDs under the {@code cassandraInstanceIds} key.
 */
protected void notifyAllCassandraCqlAreReady()
{
    JsonArray readyIds = new JsonArray();
    // forEach on the synchronized set keeps the whole iteration under the set's lock
    cqlReadyInstanceIds.forEach(instanceId -> readyIds.add(instanceId));

    JsonObject payload = new JsonObject();
    payload.put("cassandraInstanceIds", readyIds);
    vertx.eventBus().publish(ON_ALL_CASSANDRA_CQL_READY.address(), payload);
}
}