/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.spark.utils
import com.fasterxml.jackson.databind.MapperFeature
import com.fasterxml.jackson.databind.introspect.AnnotatedClass
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.apache.druid.common.aws.{AWSClientConfig, AWSCredentialsConfig, AWSEndpointConfig,
AWSModule, AWSProxyConfig}
import org.apache.druid.common.gcp.GcpModule
import org.apache.druid.java.util.common.StringUtils
import org.apache.druid.segment.loading.LocalDataSegmentPusherConfig
import org.apache.druid.spark.MAPPER
import org.apache.druid.spark.configuration.{Configuration, DruidConfigurationKeys}
import org.apache.druid.spark.mixins.TryWithResources
import org.apache.druid.storage.azure.{AzureAccountConfig, AzureDataSegmentConfig,
AzureInputDataConfig, AzureStorage, AzureStorageDruidModule}
import org.apache.druid.storage.google.{GoogleAccountConfig, GoogleInputDataConfig, GoogleStorage,
GoogleStorageDruidModule}
import org.apache.druid.storage.hdfs.HdfsDataSegmentPusherConfig
import org.apache.druid.storage.s3.{NoopServerSideEncryption, S3DataSegmentPusherConfig,
S3InputDataConfig, S3SSECustomConfig, S3SSEKmsConfig, S3StorageConfig, S3StorageDruidModule,
ServerSideEncryptingAmazonS3, ServerSideEncryption}
import org.apache.hadoop.conf.{Configuration => HConf}
import java.io.{ByteArrayInputStream, DataInputStream}
import scala.collection.JavaConverters.collectionAsScalaIterableConverter
object DeepStorageConstructorHelpers extends TryWithResources {
/*
* Spark DataSourceOption property maps are case insensitive; in practice this means all keys are lower-cased. Since
* all user-provided property keys will come to us via a DataSourceOption, we need to use a case-insensitive Jackson
* mapper to deserialize property maps into objects. We want to stay case-aware in the rest of our code, so we create
* a private, case-insensitive copy of our mapper here.
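* For example, a user-supplied option key like `useS3aSchema` reaches us as `uses3aschema`; the case-insensitive
* mapper will still bind it to the S3DataSegmentPusherConfig property it was meant for.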
*/
private val caseInsensitiveMapper = MAPPER.copy()
.configure(MapperFeature.ACCEPT_CASE_INSENSITIVE_PROPERTIES, true)
.registerModule(DefaultScalaModule)
// Local Storage Helpers
def createLocalDataSegmentPusherConfig(conf: Configuration): LocalDataSegmentPusherConfig = {
convertConfToInstance(conf, classOf[LocalDataSegmentPusherConfig])
}
// HDFS Storage Helpers
def createHdfsDataSegmentPusherConfig(conf: Configuration): HdfsDataSegmentPusherConfig = {
convertConfToInstance(conf, classOf[HdfsDataSegmentPusherConfig])
}
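/**
* Reconstruct a Hadoop Configuration from the relevant property in CONF. The value stored under the Hadoop conf key
* is expected to be a base64-encoded byte stream produced by writing a Hadoop Configuration out via its Writable
* interface, e.g. (a caller-side sketch, not part of this helper):
*
* ```
* val baos = new java.io.ByteArrayOutputStream()
* hadoopConf.write(new java.io.DataOutputStream(baos))
* val encoded = StringUtils.encodeBase64String(baos.toByteArray)
* ```
*
* @param conf The Configuration object containing the serialized Hadoop Configuration to deserialize.
* @return A Hadoop Configuration reconstructed from the properties specified in CONF.
*/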
def createHadoopConfiguration(conf: Configuration): HConf = {
val hadoopConf = new HConf()
val confByteStream = new ByteArrayInputStream(
StringUtils.decodeBase64String(conf.getString(DruidConfigurationKeys.hdfsHadoopConfKey))
)
tryWithResources(confByteStream, new DataInputStream(confByteStream)){
case (_, inputStream: DataInputStream) => hadoopConf.readFields(inputStream)
}
hadoopConf
}
// S3 Storage Helpers
/**
* Create an S3DataSegmentPusherConfig from the relevant properties in CONF.
*
* *** Note that we explicitly override the default for `useS3aSchema`! ***
* Almost all users will want to use s3a, not s3n, and we have no backwards-compatibility to maintain.
*
* @param conf The Configuration object specifying the S3DataSegmentPusherConfig to create.
* @return An S3DataSegmentPusherConfig derived from the properties specified in CONF.
*/
def createS3DataSegmentPusherConfig(conf: Configuration): S3DataSegmentPusherConfig = {
if (!conf.isPresent(DruidConfigurationKeys.s3UseS3ASchemaKey)) {
convertConfToInstance(conf.merge(
Configuration.fromKeyValue(DruidConfigurationKeys.s3UseS3ASchemaKey, "true")
), classOf[S3DataSegmentPusherConfig])
} else {
convertConfToInstance(conf, classOf[S3DataSegmentPusherConfig])
}
}
def createS3InputDataConfig(conf: Configuration): S3InputDataConfig = {
convertConfToInstance(conf, classOf[S3InputDataConfig])
}
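/**
* Create a ServerSideEncryptingAmazonS3 client from the relevant properties in CONF, reusing Druid's AWSModule and
* S3StorageDruidModule factories rather than duplicating their wiring here.
*
* @param conf The Configuration object specifying the S3 client to create.
* @return A ServerSideEncryptingAmazonS3 client derived from the properties specified in CONF.
*/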
def createServerSideEncryptingAmazonS3(conf: Configuration): ServerSideEncryptingAmazonS3 = {
val (credentialsConfig, proxyConfig, endpointConfig, clientConfig, s3StorageConfig) =
createConfigsForServerSideEncryptingAmazonS3(conf)
val awsModule = new AWSModule
val s3Module = new S3StorageDruidModule
val credentialsProvider = awsModule.getAWSCredentialsProvider(credentialsConfig)
s3Module.getAmazonS3Client(
s3Module.getServerSideEncryptingAmazonS3Builder(
credentialsProvider,
proxyConfig,
endpointConfig,
clientConfig,
s3StorageConfig
)
)
}
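/**
* Build the individual AWS and S3 config objects needed to construct a ServerSideEncryptingAmazonS3 client. The
* credentials config is read from the top level of CONF, while the proxy, endpoint, client, and server-side
* encryption configs are read from the corresponding nested prefixes.
*
* @param conf The Configuration object specifying the configs to create.
* @return A tuple of the credentials, proxy, endpoint, client, and S3 storage configs derived from CONF.
*/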
def createConfigsForServerSideEncryptingAmazonS3(conf: Configuration):
(AWSCredentialsConfig, AWSProxyConfig, AWSEndpointConfig, AWSClientConfig, S3StorageConfig) = {
val credentialsConfig = convertConfToInstance(conf, classOf[AWSCredentialsConfig])
val proxyConfig = convertConfToInstance(conf.dive("proxy"), classOf[AWSProxyConfig])
val endpointConfig = convertConfToInstance(conf.dive("endpoint"), classOf[AWSEndpointConfig])
val clientConfig = convertConfToInstance(conf.dive("client"), classOf[AWSClientConfig])
val s3StorageConfig = createS3StorageConfig(conf.dive(DruidConfigurationKeys.s3ServerSideEncryptionPrefix))
(credentialsConfig, proxyConfig, endpointConfig, clientConfig, s3StorageConfig)
}
/**
* A helper method for creating S3StorageConfig instances from a Configuration. While I'm sure there's a simpler
* solution I'm missing, I would have expected something like the following to work:
*
* ```
* val kmsConfig = convertConfToInstance(conf.dive("kms"), classOf[S3SSEKmsConfig])
* caseInsensitiveMapper.setInjectableValues(new InjectableValues.Std().addValue(classOf[S3SSEKmsConfig], kmsConfig))
* val ser = caseInsensitiveMapper.writeValueAsString(Map[String, String]("type" -> "kms"))
* caseInsensitiveMapper.readValue[ServerSideEncryption](ser, new TypeReference[ServerSideEncryption] {})
* ```
*
* However, the code above throws a com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Invalid
* definition for property `config` (of type `org.apache.druid.storage.s3.KmsServerSideEncryption`): Could not find
* creator property with name 'config' (known Creator properties: [])
*
* I _think_ that the root cause is that ServerSideEncryption is abstract, but the error message above isn't
* what I would expect. Nevertheless, the simple solution would be to deserialize to a KmsServerSideEncryption
* instance and then cast to the base ServerSideEncryption to assign. Unfortunately, KmsServerSideEncryption
* is package-private, so we can't access the class here. Since we already have the config object and we'd
* need to muck about with visibility anyway, we take the shortcut and just make the constructor accessible. This
* solution generalizes to the CustomServerSideEncryption case as well.
*/
def createS3StorageConfig(conf: Configuration): S3StorageConfig = {
// There's probably a more elegant way to do this that would allow us to transparently support new SSE types, but
// this will work for now.
val sseType = conf.get(DruidConfigurationKeys.s3ServerSideEncryptionTypeKey)
// Get the list of registered subtypes, since we'll need it to grab references to the package-private implementations.
val config = caseInsensitiveMapper.getDeserializationConfig
val ac = AnnotatedClass.constructWithoutSuperTypes(classOf[ServerSideEncryption], config)
val subtypes = caseInsensitiveMapper.getSubtypeResolver.collectAndResolveSubtypesByClass(config, ac)
val serverSideEncryption: ServerSideEncryption = sseType match {
case Some("s3") =>
val clazz = subtypes.asScala.filter(_.getName == "s3").head.getType
val constructor = clazz.getDeclaredConstructor()
constructor.setAccessible(true)
constructor.newInstance().asInstanceOf[ServerSideEncryption]
case Some("kms") =>
val kmsConfig = convertConfToInstance(conf.dive("kms"), classOf[S3SSEKmsConfig])
val clazz = subtypes.asScala.filter(_.getName == "kms").head.getType
val constructor = clazz.getDeclaredConstructor(classOf[S3SSEKmsConfig])
constructor.setAccessible(true)
constructor.newInstance(kmsConfig).asInstanceOf[ServerSideEncryption]
case Some("custom") =>
val customConfig = convertConfToInstance(conf.dive("custom"), classOf[S3SSECustomConfig])
val clazz = subtypes.asScala.filter(_.getName == "custom").head.getType
val constructor = clazz.getDeclaredConstructor(classOf[S3SSECustomConfig])
constructor.setAccessible(true)
constructor.newInstance(customConfig).asInstanceOf[ServerSideEncryption]
case _ => new NoopServerSideEncryption
}
new S3StorageConfig(serverSideEncryption)
}
// GCS Storage Helpers
def createGoogleAcountConfig(conf: Configuration): GoogleAccountConfig = {
convertConfToInstance(conf, classOf[GoogleAccountConfig])
}
def createGoogleInputDataConfig(conf: Configuration): GoogleInputDataConfig = {
convertConfToInstance(conf, classOf[GoogleInputDataConfig])
}
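/**
* Create a GoogleStorage client by hand-wiring the providers that Druid's GcpModule and GoogleStorageDruidModule
* would normally supply via Guice.
*
* @return A GoogleStorage client built from the default GCP HTTP transport, JSON factory, and request initializer.
*/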
def createGoogleStorage(): GoogleStorage = {
val gcpModule = new GcpModule
val gcpStorageModule = new GoogleStorageDruidModule
val httpTransport = gcpModule.getHttpTransport
val jsonFactory = gcpModule.getJsonFactory
val requestInitializer = gcpModule.getHttpRequestInitializer(httpTransport, jsonFactory)
gcpStorageModule.getGoogleStorage(httpTransport, jsonFactory, requestInitializer)
}
// Azure Storage Helpers
def createAzureDataSegmentConfig(conf: Configuration): AzureDataSegmentConfig = {
convertConfToInstance(conf, classOf[AzureDataSegmentConfig])
}
def createAzureInputDataConfig(conf: Configuration): AzureInputDataConfig = {
convertConfToInstance(conf, classOf[AzureInputDataConfig])
}
def createAzureAccountConfig(conf: Configuration): AzureAccountConfig = {
convertConfToInstance(conf, classOf[AzureAccountConfig])
}
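/**
* Create an AzureStorage client from the relevant properties in CONF, delegating the actual client construction to
* Druid's AzureStorageDruidModule.
*
* @param conf The Configuration object specifying the Azure account to connect to.
* @return An AzureStorage client derived from the properties specified in CONF.
*/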
def createAzureStorage(conf: Configuration): AzureStorage = {
val accountConfig = convertConfToInstance(conf, classOf[AzureAccountConfig])
val azureModule = new AzureStorageDruidModule
val cloudBlobClient = azureModule.getCloudBlobClient(accountConfig)
azureModule.getAzureStorageContainer(cloudBlobClient)
}
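/**
* Deserialize an instance of CLAZZ from CONF by converting the configuration's backing map with the
* case-insensitive mapper defined above.
*/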
private def convertConfToInstance[T](conf: Configuration, clazz: Class[T]): T = {
caseInsensitiveMapper.convertValue(conf.toMap, clazz)
}
}