blob: c0aecb804fcfe0ebecfeddf7ed6d22bdc1863471 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.sidecar;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.UnaryOperator;
import org.apache.commons.configuration2.HierarchicalConfiguration;
import org.apache.commons.configuration2.YAMLConfiguration;
import org.apache.commons.configuration2.ex.ConfigurationException;
import org.apache.commons.configuration2.tree.ImmutableNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.vertx.core.VertxOptions;
import org.apache.cassandra.sidecar.cluster.InstancesConfig;
import org.apache.cassandra.sidecar.cluster.InstancesConfigImpl;
import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadataImpl;
import org.apache.cassandra.sidecar.common.CQLSessionProvider;
import org.apache.cassandra.sidecar.common.CassandraVersionProvider;
import org.apache.cassandra.sidecar.common.JmxClient;
import org.apache.cassandra.sidecar.common.utils.ValidationConfiguration;
import org.apache.cassandra.sidecar.common.utils.YAMLValidationConfiguration;
import org.apache.cassandra.sidecar.config.CacheConfiguration;
import org.apache.cassandra.sidecar.config.WorkerPoolConfiguration;
import org.jetbrains.annotations.Nullable;
import static org.apache.cassandra.sidecar.utils.SidecarYaml.ALLOWABLE_SKEW_IN_MINUTES;
import static org.apache.cassandra.sidecar.utils.SidecarYaml.CACHE_EXPIRE_AFTER_ACCESS_MILLIS;
import static org.apache.cassandra.sidecar.utils.SidecarYaml.CACHE_MAXIMUM_SIZE;
import static org.apache.cassandra.sidecar.utils.SidecarYaml.CASSANDRA_ALLOWED_CHARS_FOR_COMPONENT_NAME;
import static org.apache.cassandra.sidecar.utils.SidecarYaml.CASSANDRA_ALLOWED_CHARS_FOR_DIRECTORY;
import static org.apache.cassandra.sidecar.utils.SidecarYaml.CASSANDRA_ALLOWED_CHARS_FOR_RESTRICTED_COMPONENT_NAME;
import static org.apache.cassandra.sidecar.utils.SidecarYaml.CASSANDRA_FORBIDDEN_KEYSPACES;
import static org.apache.cassandra.sidecar.utils.SidecarYaml.CASSANDRA_INPUT_VALIDATION;
import static org.apache.cassandra.sidecar.utils.SidecarYaml.CASSANDRA_INSTANCE;
import static org.apache.cassandra.sidecar.utils.SidecarYaml.CASSANDRA_INSTANCES;
import static org.apache.cassandra.sidecar.utils.SidecarYaml.CASSANDRA_INSTANCE_DATA_DIRS;
import static org.apache.cassandra.sidecar.utils.SidecarYaml.CASSANDRA_INSTANCE_HOST;
import static org.apache.cassandra.sidecar.utils.SidecarYaml.CASSANDRA_INSTANCE_ID;
import static org.apache.cassandra.sidecar.utils.SidecarYaml.CASSANDRA_INSTANCE_PORT;
import static org.apache.cassandra.sidecar.utils.SidecarYaml.CASSANDRA_INSTANCE_STAGING_DIR;
import static org.apache.cassandra.sidecar.utils.SidecarYaml.CASSANDRA_JMX_HOST;
import static org.apache.cassandra.sidecar.utils.SidecarYaml.CASSANDRA_JMX_PORT;
import static org.apache.cassandra.sidecar.utils.SidecarYaml.CASSANDRA_JMX_ROLE;
import static org.apache.cassandra.sidecar.utils.SidecarYaml.CASSANDRA_JMX_ROLE_PASSWORD;
import static org.apache.cassandra.sidecar.utils.SidecarYaml.CASSANDRA_JMX_SSL_ENABLED;
import static org.apache.cassandra.sidecar.utils.SidecarYaml.CONCURRENT_UPLOAD_LIMIT;
import static org.apache.cassandra.sidecar.utils.SidecarYaml.HEALTH_CHECK_INTERVAL;
import static org.apache.cassandra.sidecar.utils.SidecarYaml.HOST;
import static org.apache.cassandra.sidecar.utils.SidecarYaml.KEYSTORE_PASSWORD;
import static org.apache.cassandra.sidecar.utils.SidecarYaml.KEYSTORE_PATH;
import static org.apache.cassandra.sidecar.utils.SidecarYaml.MIN_FREE_SPACE_PERCENT_FOR_UPLOAD;
import static org.apache.cassandra.sidecar.utils.SidecarYaml.PORT;
import static org.apache.cassandra.sidecar.utils.SidecarYaml.REQUEST_IDLE_TIMEOUT_MILLIS;
import static org.apache.cassandra.sidecar.utils.SidecarYaml.REQUEST_TIMEOUT_MILLIS;
import static org.apache.cassandra.sidecar.utils.SidecarYaml.SSL_ENABLED;
import static org.apache.cassandra.sidecar.utils.SidecarYaml.SSTABLE_IMPORT_CACHE_CONFIGURATION;
import static org.apache.cassandra.sidecar.utils.SidecarYaml.SSTABLE_IMPORT_POLL_INTERVAL_MILLIS;
import static org.apache.cassandra.sidecar.utils.SidecarYaml.STREAM_REQUESTS_PER_SEC;
import static org.apache.cassandra.sidecar.utils.SidecarYaml.THROTTLE_DELAY_SEC;
import static org.apache.cassandra.sidecar.utils.SidecarYaml.THROTTLE_TIMEOUT_SEC;
import static org.apache.cassandra.sidecar.utils.SidecarYaml.TRUSTSTORE_PASSWORD;
import static org.apache.cassandra.sidecar.utils.SidecarYaml.TRUSTSTORE_PATH;
import static org.apache.cassandra.sidecar.utils.SidecarYaml.WORKER_POOL_FOR_INTERNAL;
import static org.apache.cassandra.sidecar.utils.SidecarYaml.WORKER_POOL_FOR_SERVICE;
import static org.apache.cassandra.sidecar.utils.SidecarYaml.WORKER_POOL_MAX_EXECUTION_TIME_MILLIS;
import static org.apache.cassandra.sidecar.utils.SidecarYaml.WORKER_POOL_NAME;
import static org.apache.cassandra.sidecar.utils.SidecarYaml.WORKER_POOL_SIZE;
/**
* A {@link Configuration} that is built from a YAML configuration file for Sidecar
*/
public class YAMLSidecarConfiguration extends Configuration
{
private static final Logger logger = LoggerFactory.getLogger(YAMLSidecarConfiguration.class);
private YAMLSidecarConfiguration(InstancesConfig instancesConfig,
String host,
Integer port,
int healthCheckFrequencyMillis,
boolean isSslEnabled,
@Nullable String keyStorePath,
@Nullable String keyStorePassword,
@Nullable String trustStorePath,
@Nullable String trustStorePassword,
long rateLimitStreamRequestsPerSecond,
long throttleTimeoutInSeconds,
long throttleDelayInSeconds,
int allowableSkewInMinutes,
int requestIdleTimeoutMillis,
long requestTimeoutMillis,
float minFreeSpacePercentRequiredForUpload,
int concurrentUploadsLimit,
int ssTableImportPollIntervalMillis,
ValidationConfiguration validationConfiguration,
CacheConfiguration ssTableImportCacheConfiguration,
WorkerPoolConfiguration serverWorkerPoolConfiguration,
WorkerPoolConfiguration serverInternalWorkerPoolConfiguration)
{
super(instancesConfig,
host,
port,
healthCheckFrequencyMillis,
isSslEnabled,
keyStorePath,
keyStorePassword,
trustStorePath,
trustStorePassword,
rateLimitStreamRequestsPerSecond,
throttleTimeoutInSeconds,
throttleDelayInSeconds,
allowableSkewInMinutes,
requestIdleTimeoutMillis,
requestTimeoutMillis,
minFreeSpacePercentRequiredForUpload,
concurrentUploadsLimit,
ssTableImportPollIntervalMillis,
validationConfiguration,
ssTableImportCacheConfiguration,
serverWorkerPoolConfiguration,
serverInternalWorkerPoolConfiguration);
}
/**
* Returns a new {@link Configuration} built from the provided {@code confPath} YAML file and a
* {@code versionProvider}
*
* @param confPath the path to the Sidecar YAML configuration file
* @param versionProvider a Cassandra version provider
* @param sidecarVersion the version of the Sidecar from the current binary
* @return the {@link YAMLConfiguration} parsed from the YAML file
* @throws IOException when reading the configuration from file fails
*/
public static Configuration of(String confPath,
CassandraVersionProvider versionProvider,
String sidecarVersion) throws IOException
{
YAMLConfiguration yamlConf = yamlConfiguration(confPath);
int healthCheckFrequencyMillis = yamlConf.getInt(HEALTH_CHECK_INTERVAL, 1000);
ValidationConfiguration validationConfiguration = validationConfiguration(yamlConf);
InstancesConfig instancesConfig = instancesConfig(yamlConf,
versionProvider,
healthCheckFrequencyMillis,
sidecarVersion);
CacheConfiguration ssTableImportCacheConfiguration = cacheConfig(yamlConf,
SSTABLE_IMPORT_CACHE_CONFIGURATION,
TimeUnit.HOURS.toMillis(2),
10_000);
WorkerPoolConfiguration serverWorkerPoolConf = workerPoolConfiguration(yamlConf,
WORKER_POOL_FOR_SERVICE,
"sidecar-worker-pool",
VertxOptions.DEFAULT_WORKER_POOL_SIZE,
TimeUnit.SECONDS.toMillis(60));
WorkerPoolConfiguration internalWorkerPoolConf = workerPoolConfiguration(yamlConf,
WORKER_POOL_FOR_INTERNAL,
"sidecar-internal-worker-pool",
VertxOptions.DEFAULT_WORKER_POOL_SIZE,
TimeUnit.SECONDS.toMillis(60));
return new YAMLSidecarConfiguration(instancesConfig,
yamlConf.get(String.class, HOST),
yamlConf.get(Integer.class, PORT),
healthCheckFrequencyMillis,
yamlConf.get(Boolean.class, SSL_ENABLED, false),
yamlConf.get(String.class, KEYSTORE_PATH, null),
yamlConf.get(String.class, KEYSTORE_PASSWORD, null),
yamlConf.get(String.class, TRUSTSTORE_PATH, null),
yamlConf.get(String.class, TRUSTSTORE_PASSWORD, null),
yamlConf.getLong(STREAM_REQUESTS_PER_SEC, 5000L),
yamlConf.getLong(THROTTLE_TIMEOUT_SEC, 10),
yamlConf.getLong(THROTTLE_DELAY_SEC, 5),
yamlConf.getInt(ALLOWABLE_SKEW_IN_MINUTES, 60),
yamlConf.getInt(REQUEST_IDLE_TIMEOUT_MILLIS, 300_000),
yamlConf.getLong(REQUEST_TIMEOUT_MILLIS, 300_000L),
yamlConf.getFloat(MIN_FREE_SPACE_PERCENT_FOR_UPLOAD, 10),
yamlConf.getInt(CONCURRENT_UPLOAD_LIMIT, 80),
yamlConf.getInt(SSTABLE_IMPORT_POLL_INTERVAL_MILLIS, 100),
validationConfiguration,
ssTableImportCacheConfiguration,
serverWorkerPoolConf,
internalWorkerPoolConf);
}
/**
* Returns an object to read the YAML file from {@code confPath}.
*
* @param confPath the YAML file that provides the Sidecar {@link Configuration}
* @return an object to read the YAML file from {@code confPath}
* @throws IOException when reading the configuration from file fails
*/
private static YAMLConfiguration yamlConfiguration(String confPath) throws IOException
{
logger.info("Reading configuration from {}", confPath);
try
{
URL url = new URL(confPath);
YAMLConfiguration yamlConf = new YAMLConfiguration();
InputStream stream = url.openStream();
yamlConf.read(stream);
return yamlConf;
}
catch (ConfigurationException | IOException e)
{
throw new IOException(String.format("Unable to parse cluster information from file='%s'", confPath), e);
}
}
/**
* Parses the {@link InstancesConfig} from the {@link YAMLConfiguration yamlConf}, the {@code versionProvider}, and
* the {@code healthCheckFrequencyMillis}.
*
* @param yamlConf the object used to parse the YAML file
* @param versionProvider a Cassandra version provider
* @param healthCheckFrequencyMillis the health check frequency configuration in milliseconds
* @param sidecarVersion the version of the Sidecar from the current binary
* @return the parsed {@link InstancesConfig} from the {@code yamlConf} object
*/
private static InstancesConfig instancesConfig(YAMLConfiguration yamlConf,
CassandraVersionProvider versionProvider,
int healthCheckFrequencyMillis,
String sidecarVersion)
{
/* Since we are supporting handling multiple instances in Sidecar optionally, we prefer reading single instance
* data over reading multiple instances section
*/
org.apache.commons.configuration2.Configuration singleInstanceConf = yamlConf.subset(CASSANDRA_INSTANCE);
if (singleInstanceConf != null && !singleInstanceConf.isEmpty())
{
InstanceMetadata instanceMetadata = buildInstanceMetadata(singleInstanceConf,
versionProvider,
healthCheckFrequencyMillis,
sidecarVersion);
return new InstancesConfigImpl(instanceMetadata);
}
List<HierarchicalConfiguration<ImmutableNode>> instances = yamlConf.configurationsAt(CASSANDRA_INSTANCES);
final List<InstanceMetadata> instanceMetas = new ArrayList<>();
for (HierarchicalConfiguration<ImmutableNode> instance : instances)
{
InstanceMetadata instanceMetadata = buildInstanceMetadata(instance,
versionProvider,
healthCheckFrequencyMillis,
sidecarVersion);
instanceMetas.add(instanceMetadata);
}
return new InstancesConfigImpl(instanceMetas);
}
private static CacheConfiguration cacheConfig(YAMLConfiguration yamlConf, String prefix,
long defaultExpireAfterAccessMillis, int defaultMaximumSize)
{
org.apache.commons.configuration2.Configuration cacheConf = yamlConf.subset(prefix);
return new CacheConfiguration(
cacheConf.getLong(CACHE_EXPIRE_AFTER_ACCESS_MILLIS, defaultExpireAfterAccessMillis),
cacheConf.getInt(CACHE_MAXIMUM_SIZE, defaultMaximumSize)
);
}
private static WorkerPoolConfiguration workerPoolConfiguration(YAMLConfiguration yamlConf, String prefix,
String defaultName, int defaultSize,
long defaultMaxExecutionTimeMillis)
{
org.apache.commons.configuration2.Configuration conf = yamlConf.subset(prefix);
return new WorkerPoolConfiguration(
conf.getString(WORKER_POOL_NAME, defaultName),
conf.getInt(WORKER_POOL_SIZE, defaultSize),
conf.getLong(WORKER_POOL_MAX_EXECUTION_TIME_MILLIS, defaultMaxExecutionTimeMillis)
);
}
/**
* Parses the {@link ValidationConfiguration} from the {@link YAMLConfiguration yamlConf}.
*
* @param yamlConf the object used to parse the YAML file
* @return the parsed {@link ValidationConfiguration} from the {@code yamlConf} object
*/
private static ValidationConfiguration validationConfiguration(YAMLConfiguration yamlConf)
{
org.apache.commons.configuration2.Configuration validation = yamlConf.subset(CASSANDRA_INPUT_VALIDATION);
Set<String> forbiddenKeyspaces = new HashSet<>(validation.getList(String.class,
CASSANDRA_FORBIDDEN_KEYSPACES,
Collections.emptyList()));
UnaryOperator<String> readString = key -> validation.get(String.class, key);
String allowedPatternForDirectory = readString.apply(CASSANDRA_ALLOWED_CHARS_FOR_DIRECTORY);
String allowedPatternForComponentName = readString.apply(CASSANDRA_ALLOWED_CHARS_FOR_COMPONENT_NAME);
String allowedPatternForRestrictedComponentName = readString
.apply(CASSANDRA_ALLOWED_CHARS_FOR_RESTRICTED_COMPONENT_NAME);
return new YAMLValidationConfiguration(forbiddenKeyspaces,
allowedPatternForDirectory,
allowedPatternForComponentName,
allowedPatternForRestrictedComponentName);
}
/**
* Builds the {@link InstanceMetadata} from the {@link org.apache.commons.configuration2.Configuration},
* a provided {@code versionProvider} and {@code healthCheckFrequencyMillis}.
*
* @param instance the object that allows reading from the YAML file
* @param versionProvider a Cassandra version provider
* @param healthCheckFrequencyMillis the health check frequency configuration in milliseconds
* @param sidecarVersion the version of the Sidecar from the current binary
* @return the parsed {@link InstanceMetadata} from YAML
*/
private static InstanceMetadata buildInstanceMetadata(org.apache.commons.configuration2.Configuration instance,
CassandraVersionProvider versionProvider,
int healthCheckFrequencyMillis,
String sidecarVersion)
{
int id = instance.get(Integer.class, CASSANDRA_INSTANCE_ID, 1);
String host = instance.get(String.class, CASSANDRA_INSTANCE_HOST);
int port = instance.get(Integer.class, CASSANDRA_INSTANCE_PORT);
String dataDirs = instance.get(String.class, CASSANDRA_INSTANCE_DATA_DIRS);
String stagingDir = instance.get(String.class, CASSANDRA_INSTANCE_STAGING_DIR);
String jmxHost = instance.get(String.class, CASSANDRA_JMX_HOST, "127.0.0.1");
int jmxPort = instance.get(Integer.class, CASSANDRA_JMX_PORT, 7199);
String jmxRole = instance.get(String.class, CASSANDRA_JMX_ROLE, null);
String jmxRolePassword = instance.get(String.class, CASSANDRA_JMX_ROLE_PASSWORD, null);
boolean jmxSslEnabled = instance.get(Boolean.class, CASSANDRA_JMX_SSL_ENABLED, false);
CQLSessionProvider session = new CQLSessionProvider(host, port, healthCheckFrequencyMillis);
JmxClient jmxClient = new JmxClient(jmxHost, jmxPort, jmxRole, jmxRolePassword, jmxSslEnabled);
return new InstanceMetadataImpl(id,
host,
port,
Collections.unmodifiableList(Arrays.asList(dataDirs.split(","))),
stagingDir,
session,
jmxClient,
versionProvider,
sidecarVersion);
}
}