blob: 3a70c9033b33b5067c690a36b26592d31e23a23d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.spark.model
import org.apache.druid.metadata.DynamicConfigProvider
import org.apache.druid.spark.configuration.{Configuration, DruidConfigurationKeys}
import scala.collection.mutable
class S3DeepStorageConfig extends DeepStorageConfig(DruidConfigurationKeys.s3DeepStorageTypeKey) {
private val optionsMap: mutable.Map[String, String] = mutable.Map[String, String](
DruidConfigurationKeys.deepStorageTypeKey -> deepStorageType
)
def bucket(bucket: String): S3DeepStorageConfig = {
addToOptions(prefix(DruidConfigurationKeys.bucketKey), bucket)
}
def baseKey(baseKey: String): S3DeepStorageConfig = {
addToOptions(prefix(DruidConfigurationKeys.s3BaseKeyKey), baseKey)
}
def maxListingLength(maxListingLength: Int): S3DeepStorageConfig = {
addToOptions(prefix(DruidConfigurationKeys.bucketKey), maxListingLength)
}
def disableAcl(disableAcl: Boolean): S3DeepStorageConfig = {
addToOptions(prefix(DruidConfigurationKeys.s3DisableAclKey), disableAcl)
}
def useS3aSchema(useS3aSchema: Boolean): S3DeepStorageConfig = {
addToOptions(prefix(DruidConfigurationKeys.s3UseS3ASchemaKey), useS3aSchema)
}
/**
* Extracts the S3 access key and secret key from KEYSPROVIDER using ACCESSKEYCONFKEY and SECRETKEYCONFKEY to extract
* the access key and secret key, respectively.
*
* @param keysProvider The DynamicConfigProvider providing the S3 access and secret keys.
* @param accessKeyConfKey The key in the config provided by KEYSPROVIDER whose value is the access key to use.
* @param secretKeyConfKey The key in the config provided by KEYSPROVIDER whose value is the secret key to use.
*/
def keys(
keysProvider: DynamicConfigProvider[String],
accessKeyConfKey: String,
secretKeyConfKey: String
): S3DeepStorageConfig = {
val config = keysProvider.getConfig
val accessKey = config.getOrDefault(accessKeyConfKey, "")
val secretKey = config.getOrDefault(secretKeyConfKey, "")
addToOptions(prefix(DruidConfigurationKeys.s3AccessKeyKey), accessKey)
addToOptions(prefix(DruidConfigurationKeys.s3AccessKeyKey), secretKey)
}
def accessKey(accessKey: String): S3DeepStorageConfig = {
addToOptions(prefix(DruidConfigurationKeys.s3AccessKeyKey), accessKey)
}
def secretKey(secretKey: String): S3DeepStorageConfig = {
addToOptions(prefix(DruidConfigurationKeys.s3AccessKeyKey), secretKey)
}
def fileSessionCredentials(fileSessionCredentials: String): S3DeepStorageConfig = {
addToOptions(prefix(DruidConfigurationKeys.s3FileSessionCredentialsKey), fileSessionCredentials)
}
def proxyHost(host: String): S3DeepStorageConfig = {
addToOptions(prefix(DruidConfigurationKeys.s3ProxyPrefix, DruidConfigurationKeys.s3ProxyHostKey), host)
}
def proxyPort(port: Int): S3DeepStorageConfig = {
addToOptions(prefix(DruidConfigurationKeys.s3ProxyPrefix, DruidConfigurationKeys.s3ProxyPortKey), port.toString)
}
def proxyUsername(username: String): S3DeepStorageConfig = {
addToOptions(prefix(DruidConfigurationKeys.s3ProxyPrefix, DruidConfigurationKeys.s3ProxyUsernameKey), username)
}
/**
* Extracts the proxy password to use in communicating with S3 from PASSWORDPROVIDER using CONFKEY.
*
* @param passwordProvider The DynamicConfigProvider providing the proxy password.
* @param confKey The key in the config provided by PASSWORDPROVIDER whose value is the proxy password to use.
*/
def proxyPassword(passwordProvider: DynamicConfigProvider[String], confKey: String): S3DeepStorageConfig = {
val config = passwordProvider.getConfig
addToOptions(prefix(DruidConfigurationKeys.s3ProxyPrefix, DruidConfigurationKeys.s3ProxyPasswordKey),
config.getOrDefault(confKey, ""))
}
def proxyPassword(password: String): S3DeepStorageConfig = {
addToOptions(prefix(DruidConfigurationKeys.s3ProxyPrefix, DruidConfigurationKeys.s3ProxyPasswordKey), password)
}
def endpointUrl(endpointUrl: String): S3DeepStorageConfig = {
addToOptions(prefix(DruidConfigurationKeys.s3EndpointPrefix, DruidConfigurationKeys.s3EndpointUrlKey), endpointUrl)
}
def endpointSigningRegion(endpointSigningRegion: String): S3DeepStorageConfig = {
addToOptions(prefix(DruidConfigurationKeys.s3EndpointPrefix, DruidConfigurationKeys.s3EndpointSigningRegionKey),
endpointSigningRegion
)
}
def protocol(protocol: String): S3DeepStorageConfig = {
addToOptions(prefix(DruidConfigurationKeys.protocolKey), protocol)
}
def disableChunkedEnconding(disableChunkedEncoding: Boolean): S3DeepStorageConfig = {
addToOptions(prefix(DruidConfigurationKeys.s3DisableChunkedEncodingKey), disableChunkedEncoding)
}
def enablePathStyleAcess(enablePathStyleAccess: Boolean): S3DeepStorageConfig = {
addToOptions(prefix(DruidConfigurationKeys.s3EnablePathStyleAccessKey), enablePathStyleAccess)
}
def forceGlobalBucketAccessEnabled(forceGlobalBucketAccessEnabled: Boolean): S3DeepStorageConfig = {
addToOptions(prefix(DruidConfigurationKeys.s3ForceGlobalBucketAccessEnabledKey), forceGlobalBucketAccessEnabled)
}
def sseType(sseType: String): S3DeepStorageConfig = {
addToOptions(prefix(
DruidConfigurationKeys.s3ServerSideEncryptionPrefix, DruidConfigurationKeys.s3ServerSideEncryptionTypeKey),
sseType
)
}
def sseKmsKeyId(sseKmsKeyId: String): S3DeepStorageConfig = {
addToOptions(prefix(
DruidConfigurationKeys.s3ServerSideEncryptionPrefix, DruidConfigurationKeys.s3ServerSideEncryptionKmsKeyIdKey),
sseKmsKeyId
)
}
def sseCustomKey(sseCustomKey: String): S3DeepStorageConfig = {
addToOptions(prefix(
DruidConfigurationKeys.s3ServerSideEncryptionPrefix, DruidConfigurationKeys.s3ServerSideEncryptionCustomKeyKey),
sseCustomKey
)
}
override def toOptions: Map[String, String] = optionsMap.toMap
private def addToOptions(key: String, value: Any): S3DeepStorageConfig = {
optionsMap.put(key, value.toString)
this
}
private def prefix(keys: String*): String = {
Configuration.toKey(DruidConfigurationKeys.s3DeepStorageTypeKey +: keys:_*)
}
}