blob: b6906dc41c26e3157f1425253cbc54e771d477a4 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.spark.utils
import org.apache.druid.spark.configuration.{Configuration, DruidConfigurationKeys}
import org.scalatest.funsuite.AnyFunSuite
import org.scalatest.matchers.should.Matchers
import java.io.File
class DeepStorageConstructorHelpersSuite extends AnyFunSuite with Matchers {
private val sampleLocalConf: Configuration = Configuration(Map[String, String](
"deepStorageType" -> "local",
"local.storageDirectory" -> "/tmp/working/directory"
))
private val sampleHdfsConf: Configuration = Configuration(Map[String, String](
"deepStorageType" -> "hdfs",
"hdfs.storageDirectory" -> "/tmp/working/directory"
))
private val sampleS3Conf: Configuration = Configuration(Map[String, String](
"deepStorageType" -> "s3",
"s3.bucket" -> "testBucket",
"s3.baseKey" -> "prefix/to/druid",
"s3.disableAcl" -> "true",
"s3.maxListingLength" -> "999",
"s3.useS3aSchema" -> "false",
"s3.client.protocol" -> "https",
"s3.accessKey" -> "my access key",
"s3.secretKey" -> "my secret key",
"s3.proxy.host" -> "proxy.host",
"s3.proxy.port" -> "1234",
"s3.proxy.username" -> "druid",
"s3.proxy.password" -> "swordfish",
"s3.client.disableChunkedEncoding" -> "true",
"s3.endpoint.signingRegion" -> "us-west-1",
"s3.sse.type" -> "kms",
"s3.sse.kms.keyId" -> "key"
))
private val sampleGoogleConfig: Configuration = Configuration(Map[String, String](
"deepStorageType" -> "google",
"google.bucket" -> "testBucket",
"google.prefix" -> "prefix/to/druid",
"google.maxListingLength" -> "1023"
))
private val sampleAzureConfig: Configuration = Configuration(Map[String, String](
"deepStorageType" -> "azure",
"azure.account" -> "testAccount",
"azure.key" -> "12345ABCDEF",
"azure.container" -> "testContainer",
"azure.prefix" -> "prefix/to/druid",
"azure.maxListingLength" -> "1001"
))
test("createLocalDataSegmentPusherConfig should construct a LocalDataSegmentPusherConfig") {
val pusherConfig =DeepStorageConstructorHelpers.createLocalDataSegmentPusherConfig(
sampleLocalConf.dive(DruidConfigurationKeys.localDeepStorageTypeKey)
)
pusherConfig.getStorageDirectory should equal(new File("/tmp/working/directory"))
}
test("createHdfsDataSegmentPusherConfig should construct an HdfsDataSegmentPusherConfig") {
val pusherConfig = DeepStorageConstructorHelpers.createHdfsDataSegmentPusherConfig(
sampleHdfsConf.dive(DruidConfigurationKeys.hdfsDeepStorageTypeKey)
)
pusherConfig.getStorageDirectory should equal("/tmp/working/directory")
}
test("createS3DataSegmentPusherConfig should construct an S3DataSegmentPusherConfig from a Configuration") {
val pusherConfig = DeepStorageConstructorHelpers.createS3DataSegmentPusherConfig(
sampleS3Conf.dive(DruidConfigurationKeys.s3DeepStorageTypeKey)
)
pusherConfig.getBucket should equal("testBucket")
pusherConfig.getBaseKey should equal("prefix/to/druid")
pusherConfig.getDisableAcl should be(true)
pusherConfig.getMaxListingLength should equal(999)
pusherConfig.isUseS3aSchema should be(false)
val prunedMap = sampleS3Conf.dive(DruidConfigurationKeys.s3DeepStorageTypeKey).toMap - "uses3aschema" - "disableacl"
val prunedConf = DeepStorageConstructorHelpers.createS3DataSegmentPusherConfig(Configuration(prunedMap))
prunedConf.getDisableAcl should be(false)
prunedConf.isUseS3aSchema should be(true)
}
test("createS3InputDataConfig should construct an S3InputDataConfig from a Configuration") {
val inputDataConfig = DeepStorageConstructorHelpers.createS3InputDataConfig(
sampleS3Conf.dive(DruidConfigurationKeys.s3DeepStorageTypeKey)
)
inputDataConfig.getMaxListingLength should equal(999)
}
test("prepareServerSideEncryptingAmazonS3 should correctly parse a Configuration") {
val (credentialsConfig, proxyConfig, endpointConfig, clientConfig, _) =
DeepStorageConstructorHelpers.createConfigsForServerSideEncryptingAmazonS3(
sampleS3Conf.dive(DruidConfigurationKeys.s3DeepStorageTypeKey)
)
credentialsConfig.getAccessKey.getPassword should equal("my access key")
proxyConfig.getPort should equal(1234)
endpointConfig.getUrl should equal(null) // scalastyle:ignore null
clientConfig.getProtocol should equal("https")
}
test("createS3StorageConfig should create objects of the correct type") {
val noopSSEConfig = Configuration(Map[String, String]())
val s3SSEConfig = Configuration(Map[String, String]("type" -> "s3"))
val kmsSSEConfig = Configuration(Map[String, String]("type" -> "kms", "keyId" -> "key"))
val customSSEConfig = Configuration(Map[String, String](
"type" -> "custom",
"custom.base64EncodedKey" -> "0123456789abcdef"
))
val noopStorageConfig = DeepStorageConstructorHelpers.createS3StorageConfig(noopSSEConfig)
val s3StorageConfig = DeepStorageConstructorHelpers.createS3StorageConfig(s3SSEConfig)
val kmsStorageConfig = DeepStorageConstructorHelpers.createS3StorageConfig(kmsSSEConfig)
val customStorageConfig = DeepStorageConstructorHelpers.createS3StorageConfig(customSSEConfig)
// Just confirming the class names because the class is package-private and so not visible here.
noopStorageConfig.getServerSideEncryption.getClass.getName should
equal("org.apache.druid.storage.s3.NoopServerSideEncryption")
s3StorageConfig.getServerSideEncryption.getClass.getName should
equal("org.apache.druid.storage.s3.S3ServerSideEncryption")
kmsStorageConfig.getServerSideEncryption.getClass.getName should
equal("org.apache.druid.storage.s3.KmsServerSideEncryption")
customStorageConfig.getServerSideEncryption.getClass.getName should
equal("org.apache.druid.storage.s3.CustomServerSideEncryption")
}
test("createGoogleAcountConfig should correctly parse a Configuration") {
val accountConfig = DeepStorageConstructorHelpers.createGoogleAcountConfig(
sampleGoogleConfig.dive(DruidConfigurationKeys.googleDeepStorageTypeKey)
)
accountConfig.getBucket should equal("testBucket")
accountConfig.getPrefix should equal("prefix/to/druid")
}
test("createGoogleInputDataConfig should correctly parse a Configuration") {
val inputDataConfig = DeepStorageConstructorHelpers.createGoogleInputDataConfig(
sampleGoogleConfig.dive(DruidConfigurationKeys.googleDeepStorageTypeKey)
)
inputDataConfig.getMaxListingLength should equal(1023)
}
test("createAzureDataSegmentConfig should correctly parse a Configuration") {
val dataSegmentConfig = DeepStorageConstructorHelpers.createAzureDataSegmentConfig(
sampleAzureConfig.dive(DruidConfigurationKeys.azureDeepStorageTypeKey)
)
dataSegmentConfig.getContainer should equal("testContainer")
dataSegmentConfig.getPrefix should equal("prefix/to/druid")
}
test("createAzureInputDataConfig should correctly parse a Configuration") {
val inputDataConfig = DeepStorageConstructorHelpers.createAzureInputDataConfig(
sampleAzureConfig.dive(DruidConfigurationKeys.azureDeepStorageTypeKey)
)
inputDataConfig.getMaxListingLength should equal(1001)
}
test("createAzureAccountConfig should correctly parse a Configuration") {
val accountConfig = DeepStorageConstructorHelpers.createAzureAccountConfig(
sampleAzureConfig.dive(DruidConfigurationKeys.azureDeepStorageTypeKey)
)
accountConfig.getKey should equal("12345ABCDEF")
accountConfig.getAccount should equal("testAccount")
accountConfig.getProtocol should equal("https")
}
}