blob: e692ab25abcb2e127fd318f653755cc578098d0b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.cassandra.clients;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import o.a.c.sidecar.client.shaded.common.response.NodeSettings;
import o.a.c.sidecar.client.shaded.io.vertx.core.Vertx;
import o.a.c.sidecar.client.shaded.io.vertx.core.VertxOptions;
import org.apache.cassandra.secrets.SecretsProvider;
import org.apache.cassandra.sidecar.client.HttpClientConfig;
import org.apache.cassandra.sidecar.client.SidecarClient;
import org.apache.cassandra.sidecar.client.SidecarClientConfig;
import org.apache.cassandra.sidecar.client.SidecarClientConfigImpl;
import org.apache.cassandra.sidecar.client.SidecarInstance;
import org.apache.cassandra.sidecar.client.SidecarInstanceImpl;
import org.apache.cassandra.sidecar.client.SidecarInstancesProvider;
import org.apache.cassandra.sidecar.client.VertxHttpClient;
import org.apache.cassandra.sidecar.client.VertxRequestExecutor;
import org.apache.cassandra.sidecar.client.retry.ExponentialBackoffRetryPolicy;
import org.apache.cassandra.sidecar.client.retry.RetryPolicy;
import org.apache.cassandra.spark.bulkwriter.BulkSparkConf;
import org.apache.cassandra.spark.bulkwriter.DataTransport;
import org.apache.cassandra.spark.common.model.CassandraInstance;
import org.apache.cassandra.spark.data.FileType;
import org.apache.cassandra.spark.utils.BuildInfo;
import org.apache.cassandra.spark.utils.MapUtils;
import org.apache.cassandra.spark.validation.KeyStoreValidation;
import org.apache.cassandra.spark.validation.SslValidation;
import org.apache.cassandra.spark.validation.StartupValidator;
import org.apache.cassandra.spark.validation.TrustStoreValidation;
import static org.apache.cassandra.spark.utils.Properties.DEFAULT_CHUNK_BUFFER_OVERRIDE;
import static org.apache.cassandra.spark.utils.Properties.DEFAULT_CHUNK_BUFFER_SIZE;
import static org.apache.cassandra.spark.utils.Properties.DEFAULT_MAX_BUFFER_OVERRIDE;
import static org.apache.cassandra.spark.utils.Properties.DEFAULT_MAX_BUFFER_SIZE;
import static org.apache.cassandra.spark.utils.Properties.DEFAULT_MAX_MILLIS_TO_SLEEP;
import static org.apache.cassandra.spark.utils.Properties.DEFAULT_MAX_POOL_SIZE;
import static org.apache.cassandra.spark.utils.Properties.DEFAULT_MAX_RETRIES;
import static org.apache.cassandra.spark.utils.Properties.DEFAULT_MILLIS_TO_SLEEP;
import static org.apache.cassandra.spark.utils.Properties.DEFAULT_SIDECAR_PORT;
import static org.apache.cassandra.spark.utils.Properties.DEFAULT_TIMEOUT_SECONDS;
/**
* A helper class that encapsulates configuration for the Spark Bulk Reader and Writer and helper methods to build the
* {@link SidecarClient}
*/
public final class Sidecar
{
private static final Logger LOGGER = LoggerFactory.getLogger(Sidecar.class);
private Sidecar()
{
throw new IllegalStateException(getClass() + " is static utility class and shall not be instantiated");
}
public static SidecarClient from(SidecarInstancesProvider sidecarInstancesProvider,
ClientConfig config,
SecretsProvider secretsProvider) throws IOException
{
Vertx vertx = Vertx.vertx(new VertxOptions().setUseDaemonThread(true).setWorkerPoolSize(config.maxPoolSize()));
HttpClientConfig.Builder<?> builder = new HttpClientConfig.Builder<>()
.ssl(false)
.timeoutMillis(TimeUnit.SECONDS.toMillis(config.timeoutSeconds()))
.idleTimeoutMillis((int) TimeUnit.SECONDS.toMillis(config.timeoutSeconds()))
.receiveBufferSize((int) config.chunkBufferSize())
.maxChunkSize((int) config.maxBufferSize())
.userAgent(BuildInfo.READER_USER_AGENT);
if (secretsProvider != null)
{
builder = builder
.ssl(true)
.keyStoreInputStream(secretsProvider.keyStoreInputStream())
.keyStorePassword(String.valueOf(secretsProvider.keyStorePassword()))
.keyStoreType(secretsProvider.keyStoreType())
.trustStoreInputStream(secretsProvider.trustStoreInputStream())
.trustStorePassword(String.valueOf(secretsProvider.trustStorePassword()))
.trustStoreType(secretsProvider.trustStoreType());
StartupValidator.instance().register(new KeyStoreValidation(secretsProvider));
StartupValidator.instance().register(new TrustStoreValidation(secretsProvider));
}
HttpClientConfig httpClientConfig = builder.build();
SidecarClientConfig sidecarConfig = SidecarClientConfigImpl.builder()
.maxRetries(config.maxRetries())
.retryDelayMillis(config.millisToSleep())
.maxRetryDelayMillis(config.maxMillisToSleep())
.build();
return buildClient(sidecarConfig, vertx, httpClientConfig, sidecarInstancesProvider);
}
static String transportModeBasedWriterUserAgent(DataTransport transport)
{
switch (transport)
{
case S3_COMPAT:
return BuildInfo.WRITER_S3_USER_AGENT;
case DIRECT:
default:
return BuildInfo.WRITER_USER_AGENT;
}
}
public static SidecarClient from(SidecarInstancesProvider sidecarInstancesProvider, BulkSparkConf conf)
{
Vertx vertx = Vertx.vertx(new VertxOptions().setUseDaemonThread(true)
.setWorkerPoolSize(conf.getMaxHttpConnections()));
String userAgent = transportModeBasedWriterUserAgent(conf.getTransportInfo().getTransport());
HttpClientConfig httpClientConfig = new HttpClientConfig.Builder<>()
.timeoutMillis(conf.getHttpResponseTimeoutMs())
.idleTimeoutMillis(conf.getHttpConnectionTimeoutMs())
.userAgent(userAgent)
.keyStoreInputStream(conf.getKeyStore())
.keyStorePassword(conf.getKeyStorePassword())
.keyStoreType(conf.getKeyStoreTypeOrDefault())
.trustStoreInputStream(conf.getTrustStore())
.trustStorePassword(conf.getTrustStorePasswordOrDefault())
.trustStoreType(conf.getTrustStoreTypeOrDefault())
.ssl(conf.hasKeystoreAndKeystorePassword())
.build();
StartupValidator.instance().register(new SslValidation(conf));
StartupValidator.instance().register(new KeyStoreValidation(conf));
StartupValidator.instance().register(new TrustStoreValidation(conf));
SidecarClientConfig sidecarConfig =
SidecarClientConfigImpl.builder()
.maxRetries(conf.getSidecarRequestRetries())
.retryDelayMillis(conf.getSidecarRequestRetryDelayMillis())
.maxRetryDelayMillis(conf.getSidecarRequestMaxRetryDelayMillis())
.build();
return buildClient(sidecarConfig, vertx, httpClientConfig, sidecarInstancesProvider);
}
public static SidecarClient buildClient(SidecarClientConfig sidecarConfig,
Vertx vertx,
HttpClientConfig httpClientConfig,
SidecarInstancesProvider clusterConfig)
{
RetryPolicy defaultRetryPolicy = new ExponentialBackoffRetryPolicy(sidecarConfig.maxRetries(),
sidecarConfig.retryDelayMillis(),
sidecarConfig.maxRetryDelayMillis());
VertxHttpClient vertxHttpClient = new VertxHttpClient(vertx, httpClientConfig);
VertxRequestExecutor requestExecutor = new VertxRequestExecutor(vertxHttpClient);
return new SidecarClient(clusterConfig, requestExecutor, sidecarConfig, defaultRetryPolicy);
}
public static List<CompletableFuture<NodeSettings>> allNodeSettings(SidecarClient client,
Set<? extends SidecarInstance> instances)
{
return instances.stream()
.map(instance -> client
.nodeSettings(instance)
.exceptionally(throwable -> {
LOGGER.warn(String.format("Failed to execute node settings on instance=%s",
instance), throwable);
return null;
}))
.collect(Collectors.toList());
}
public static SidecarInstance toSidecarInstance(CassandraInstance instance, int sidecarPort)
{
return new SidecarInstanceImpl(instance.nodeName(), sidecarPort);
}
public static final class ClientConfig
{
public static final String SIDECAR_PORT = "sidecar_port";
public static final String MAX_RETRIES_KEY = "maxRetries";
public static final String DEFAULT_MILLIS_TO_SLEEP_KEY = "defaultMillisToSleep";
public static final String MAX_MILLIS_TO_SLEEP_KEY = "maxMillisToSleep";
public static final String MAX_BUFFER_SIZE_BYTES_KEY = "maxBufferSizeBytes";
public static final String CHUNK_BUFFER_SIZE_BYTES_KEY = "chunkBufferSizeBytes";
public static final String MAX_POOL_SIZE_KEY = "maxPoolSize";
public static final String TIMEOUT_SECONDS_KEY = "timeoutSeconds";
private final int userProvidedPort;
private final int maxRetries;
private final int maxPoolSize;
private final int timeoutSeconds;
private final long millisToSleep;
private final long maxMillisToSleep;
private final long maxBufferSize;
private final long chunkSize;
private final Map<FileType, Long> maxBufferOverride;
private final Map<FileType, Long> chunkBufferOverride;
// CHECKSTYLE IGNORE: Constructor with many parameters
private ClientConfig(int userProvidedPort,
int maxRetries,
long millisToSleep,
long maxMillisToSleep,
long maxBufferSize,
long chunkSize,
int maxPoolSize,
int timeoutSeconds,
Map<FileType, Long> maxBufferOverride,
Map<FileType, Long> chunkBufferOverride)
{
this.userProvidedPort = userProvidedPort;
this.maxRetries = maxRetries;
this.millisToSleep = millisToSleep;
this.maxMillisToSleep = maxMillisToSleep;
this.maxBufferSize = maxBufferSize;
this.chunkSize = chunkSize;
this.maxPoolSize = maxPoolSize;
this.timeoutSeconds = timeoutSeconds;
this.maxBufferOverride = maxBufferOverride;
this.chunkBufferOverride = chunkBufferOverride;
}
public int userProvidedPort()
{
return userProvidedPort;
}
public int effectivePort()
{
return userProvidedPort == -1 ? DEFAULT_SIDECAR_PORT : userProvidedPort;
}
public int maxRetries()
{
return maxRetries;
}
public long millisToSleep()
{
return millisToSleep;
}
public long maxMillisToSleep()
{
return maxMillisToSleep;
}
public long maxBufferSize()
{
return maxBufferSize(FileType.DATA);
}
public long maxBufferSize(FileType fileType)
{
return maxBufferOverride.getOrDefault(fileType, maxBufferSize);
}
public Map<FileType, Long> maxBufferOverride()
{
return maxBufferOverride;
}
public long chunkBufferSize()
{
return chunkBufferSize(FileType.DATA);
}
public long chunkBufferSize(FileType fileType)
{
return chunkBufferOverride.getOrDefault(fileType, chunkSize);
}
public Map<FileType, Long> chunkBufferOverride()
{
return chunkBufferOverride;
}
public int maxPoolSize()
{
return maxPoolSize;
}
public int timeoutSeconds()
{
return timeoutSeconds;
}
public static ClientConfig create()
{
return ClientConfig.create(-1, DEFAULT_MAX_RETRIES, DEFAULT_MILLIS_TO_SLEEP);
}
public static ClientConfig create(int userProvidedPort, int effectivePort)
{
return ClientConfig.create(userProvidedPort, DEFAULT_MAX_RETRIES, DEFAULT_MILLIS_TO_SLEEP);
}
public static ClientConfig create(int userProvidedPort, int maxRetries, long millisToSleep)
{
return ClientConfig.create(userProvidedPort,
maxRetries,
millisToSleep,
DEFAULT_MAX_MILLIS_TO_SLEEP,
DEFAULT_MAX_BUFFER_SIZE,
DEFAULT_CHUNK_BUFFER_SIZE,
DEFAULT_MAX_POOL_SIZE,
DEFAULT_TIMEOUT_SECONDS,
DEFAULT_MAX_BUFFER_OVERRIDE,
DEFAULT_CHUNK_BUFFER_OVERRIDE);
}
public static ClientConfig create(Map<String, String> options)
{
Optional<Integer> userProvidedPort = MapUtils.getOptionalInt(options, SIDECAR_PORT, SIDECAR_PORT);
return create(userProvidedPort.orElse(-1),
MapUtils.getInt(options, MAX_RETRIES_KEY, DEFAULT_MAX_RETRIES),
MapUtils.getLong(options, DEFAULT_MILLIS_TO_SLEEP_KEY, DEFAULT_MILLIS_TO_SLEEP),
MapUtils.getLong(options, MAX_MILLIS_TO_SLEEP_KEY, DEFAULT_MAX_MILLIS_TO_SLEEP),
MapUtils.getLong(options, MAX_BUFFER_SIZE_BYTES_KEY, DEFAULT_MAX_BUFFER_SIZE),
MapUtils.getLong(options, CHUNK_BUFFER_SIZE_BYTES_KEY, DEFAULT_CHUNK_BUFFER_SIZE),
MapUtils.getInt(options, MAX_POOL_SIZE_KEY, DEFAULT_MAX_POOL_SIZE),
MapUtils.getInt(options, TIMEOUT_SECONDS_KEY, DEFAULT_TIMEOUT_SECONDS),
buildMaxBufferOverride(options, DEFAULT_MAX_BUFFER_OVERRIDE),
buildChunkBufferOverride(options, DEFAULT_CHUNK_BUFFER_OVERRIDE)
);
}
public static Map<FileType, Long> buildMaxBufferOverride(Map<String, String> options,
Map<FileType, Long> defaultValue)
{
return buildOverrideMap(MAX_BUFFER_SIZE_BYTES_KEY, options, defaultValue);
}
public static Map<FileType, Long> buildChunkBufferOverride(Map<String, String> options,
Map<FileType, Long> defaultValue)
{
return buildOverrideMap(CHUNK_BUFFER_SIZE_BYTES_KEY, options, defaultValue);
}
private static Map<FileType, Long> buildOverrideMap(String keyPrefix,
Map<String, String> options,
Map<FileType, Long> defaultValue)
{
Map<FileType, Long> result = new HashMap<>(defaultValue);
for (FileType type : FileType.values())
{
// Override with DataSourceOptions if set, e.g. maxBufferSizeBytes_Index.db
String key = MapUtils.lowerCaseKey(String.format("%s_%s", keyPrefix, type.getFileSuffix()));
Optional.ofNullable(options.get(key)).map(Long::parseLong).ifPresent(s -> result.put(type, s));
}
return result;
}
// CHECKSTYLE IGNORE: Method with many parameters
public static ClientConfig create(int userProvidedPort,
int maxRetries,
long millisToSleep,
long maxMillisToSleep,
long maxBufferSizeBytes,
long chunkSizeBytes,
int maxPoolSize,
int timeoutSeconds,
Map<FileType, Long> maxBufferOverride,
Map<FileType, Long> chunkBufferOverride)
{
return new ClientConfig(userProvidedPort,
maxRetries,
millisToSleep,
maxMillisToSleep,
maxBufferSizeBytes,
chunkSizeBytes,
maxPoolSize,
timeoutSeconds,
maxBufferOverride,
chunkBufferOverride);
}
}
}