blob: 2b7fd8a9136c19a93ec9a46102f3872f184e021b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// This File includes common helper functions with Arrow dependency.
#include "ConfigExtractor.h"
#include <stdexcept>
#include "config/VeloxConfig.h"
#include "utils/Exception.h"
#include "utils/Macros.h"
#include "velox/connectors/hive/HiveConfig.h"
#include "velox/connectors/hive/storage_adapters/s3fs/S3Config.h"
#include "velox/dwio/parquet/writer/Writer.h"
namespace gluten {
namespace {
void getS3HiveConfig(
std::shared_ptr<facebook::velox::config::ConfigBase> conf,
FileSystemType fsType,
std::unordered_map<std::string, std::string>& hiveConfMap) {
#ifdef ENABLE_S3
using namespace facebook::velox::filesystems;
std::string_view kSparkHadoopS3Prefix = "spark.hadoop.fs.s3a.";
std::string_view kSparkHadoopS3BucketPrefix = "spark.hadoop.fs.s3a.bucket.";
// Log granularity of AWS C++ SDK
const std::string kVeloxAwsSdkLogLevel = "spark.gluten.velox.awsSdkLogLevel";
const std::string kVeloxAwsSdkLogLevelDefault = "FATAL";
// Whether to use proxy from env for s3 c++ client
const std::string kVeloxS3UseProxyFromEnv = "spark.gluten.velox.s3UseProxyFromEnv";
const std::string kVeloxS3UseProxyFromEnvDefault = "false";
// Payload signing policy
const std::string kVeloxS3PayloadSigningPolicy = "spark.gluten.velox.s3PayloadSigningPolicy";
const std::string kVeloxS3PayloadSigningPolicyDefault = "Never";
// Log location of AWS C++ SDK
const std::string kVeloxS3LogLocation = "spark.gluten.velox.s3LogLocation";
const std::unordered_map<S3Config::Keys, std::pair<std::string, std::optional<std::string>>> sparkSuffixes = {
{S3Config::Keys::kAccessKey, std::make_pair("access.key", std::nullopt)},
{S3Config::Keys::kSecretKey, std::make_pair("secret.key", std::nullopt)},
{S3Config::Keys::kEndpoint, std::make_pair("endpoint", std::nullopt)},
{S3Config::Keys::kSSLEnabled, std::make_pair("connection.ssl.enabled", "false")},
{S3Config::Keys::kPathStyleAccess, std::make_pair("path.style.access", "false")},
{S3Config::Keys::kMaxAttempts, std::make_pair("retry.limit", std::nullopt)},
{S3Config::Keys::kRetryMode, std::make_pair("retry.mode", "legacy")},
{S3Config::Keys::kMaxConnections, std::make_pair("connection.maximum", "15")},
{S3Config::Keys::kSocketTimeout, std::make_pair("connection.timeout", "200s")},
{S3Config::Keys::kConnectTimeout, std::make_pair("connection.establish.timeout", "30s")},
{S3Config::Keys::kUseInstanceCredentials, std::make_pair("instance.credentials", "false")},
{S3Config::Keys::kIamRole, std::make_pair("iam.role", std::nullopt)},
{S3Config::Keys::kIamRoleSessionName, std::make_pair("iam.role.session.name", "gluten-session")},
{S3Config::Keys::kEndpointRegion, std::make_pair("endpoint.region", std::nullopt)},
{S3Config::Keys::kIMDSEnabled, std::make_pair("aws.imds.enabled", "true")},
};
// get Velox S3 config key from Spark Suffix.
auto getVeloxKey = [&](std::string_view suffix) {
for (const auto& [key, value] : sparkSuffixes) {
if (value.first == suffix) {
return std::optional<S3Config::Keys>(key);
}
}
return std::optional<S3Config::Keys>(std::nullopt);
};
auto sparkBaseConfigValue = [&](S3Config::Keys key) {
std::stringstream ss;
auto keyValue = sparkSuffixes.find(key)->second;
ss << kSparkHadoopS3Prefix << keyValue.first;
auto sparkKey = ss.str();
if (conf->valueExists(sparkKey)) {
return static_cast<std::optional<std::string>>(conf->get<std::string>(sparkKey));
}
// Return default value.
return keyValue.second;
};
auto setConfigIfPresent = [&](S3Config::Keys key) {
auto sparkConfig = sparkBaseConfigValue(key);
if (sparkConfig.has_value()) {
hiveConfMap[S3Config::baseConfigKey(key)] = sparkConfig.value();
}
};
auto setFromEnvOrConfigIfPresent = [&](std::string_view envName, S3Config::Keys key) {
const char* envValue = std::getenv(envName.data());
if (envValue != nullptr) {
hiveConfMap[S3Config::baseConfigKey(key)] = std::string(envValue);
} else {
setConfigIfPresent(key);
}
};
setFromEnvOrConfigIfPresent("AWS_ENDPOINT", S3Config::Keys::kEndpoint);
setFromEnvOrConfigIfPresent("AWS_MAX_ATTEMPTS", S3Config::Keys::kMaxAttempts);
setFromEnvOrConfigIfPresent("AWS_RETRY_MODE", S3Config::Keys::kRetryMode);
setFromEnvOrConfigIfPresent("AWS_ACCESS_KEY_ID", S3Config::Keys::kAccessKey);
setFromEnvOrConfigIfPresent("AWS_SECRET_ACCESS_KEY", S3Config::Keys::kSecretKey);
setConfigIfPresent(S3Config::Keys::kUseInstanceCredentials);
setConfigIfPresent(S3Config::Keys::kIamRole);
setConfigIfPresent(S3Config::Keys::kIamRoleSessionName);
setConfigIfPresent(S3Config::Keys::kSSLEnabled);
setConfigIfPresent(S3Config::Keys::kPathStyleAccess);
setConfigIfPresent(S3Config::Keys::kMaxConnections);
setConfigIfPresent(S3Config::Keys::kSocketTimeout);
setConfigIfPresent(S3Config::Keys::kConnectTimeout);
setConfigIfPresent(S3Config::Keys::kEndpointRegion);
setConfigIfPresent(S3Config::Keys::kIMDSEnabled);
hiveConfMap[S3Config::kS3LogLevel] = conf->get<std::string>(kVeloxAwsSdkLogLevel, kVeloxAwsSdkLogLevelDefault);
hiveConfMap[S3Config::baseConfigKey(S3Config::Keys::kUseProxyFromEnv)] =
conf->get<std::string>(kVeloxS3UseProxyFromEnv, kVeloxS3UseProxyFromEnvDefault);
hiveConfMap[S3Config::kS3PayloadSigningPolicy] =
conf->get<std::string>(kVeloxS3PayloadSigningPolicy, kVeloxS3PayloadSigningPolicyDefault);
auto logLocation = conf->get<std::string>(kVeloxS3LogLocation);
if (logLocation.has_value()) {
hiveConfMap[S3Config::kS3LogLocation] = logLocation.value();
};
// Convert all Spark bucket configs to Velox bucket configs.
for (const auto& [key, value] : conf->rawConfigs()) {
if (key.find(kSparkHadoopS3BucketPrefix) == 0) {
std::string_view skey = key;
auto remaining = skey.substr(kSparkHadoopS3BucketPrefix.size());
int dot = remaining.find(".");
auto bucketName = remaining.substr(0, dot);
auto suffix = remaining.substr(dot + 1);
auto veloxKey = getVeloxKey(suffix);
if (veloxKey.has_value()) {
hiveConfMap[S3Config::bucketConfigKey(veloxKey.value(), bucketName)] = value;
}
}
}
#endif
}
void getGcsHiveConfig(
std::shared_ptr<facebook::velox::config::ConfigBase> conf,
FileSystemType fsType,
std::unordered_map<std::string, std::string>& hiveConfMap) {
#ifdef ENABLE_GCS
// https://github.com/GoogleCloudDataproc/hadoop-connectors/blob/master/gcs/CONFIGURATION.md#api-client-configuration
auto gsStorageRootUrl = conf->get<std::string>("spark.hadoop.fs.gs.storage.root.url");
if (gsStorageRootUrl.has_value()) {
std::string gcsEndpoint = gsStorageRootUrl.value();
if (!gcsEndpoint.empty()) {
hiveConfMap[facebook::velox::connector::hive::HiveConfig::kGcsEndpoint] = gcsEndpoint;
}
}
// https://github.com/GoogleCloudDataproc/hadoop-connectors/blob/master/gcs/CONFIGURATION.md#http-transport-configuration
// https://cloud.google.com/cpp/docs/reference/storage/latest/classgoogle_1_1cloud_1_1storage_1_1LimitedErrorCountRetryPolicy
auto gsMaxRetryCount = conf->get<std::string>("spark.hadoop.fs.gs.http.max.retry");
if (gsMaxRetryCount.has_value()) {
hiveConfMap[facebook::velox::connector::hive::HiveConfig::kGcsMaxRetryCount] = gsMaxRetryCount.value();
}
// https://cloud.google.com/cpp/docs/reference/storage/latest/classgoogle_1_1cloud_1_1storage_1_1LimitedTimeRetryPolicy
auto gsMaxRetryTime = conf->get<std::string>("spark.hadoop.fs.gs.http.max.retry-time");
if (gsMaxRetryTime.has_value()) {
hiveConfMap[facebook::velox::connector::hive::HiveConfig::kGcsMaxRetryTime] = gsMaxRetryTime.value();
}
// https://github.com/GoogleCloudDataproc/hadoop-connectors/blob/master/gcs/CONFIGURATION.md#authentication
auto gsAuthType = conf->get<std::string>("spark.hadoop.fs.gs.auth.type");
auto gsAuthServiceAccountJsonKeyfile = conf->get<std::string>("spark.hadoop.fs.gs.auth.service.account.json.keyfile");
if (gsAuthType.has_value() && gsAuthType.value() == "SERVICE_ACCOUNT_JSON_KEYFILE") {
if (gsAuthServiceAccountJsonKeyfile.has_value()) {
hiveConfMap[facebook::velox::connector::hive::HiveConfig::kGcsCredentialsPath] =
gsAuthServiceAccountJsonKeyfile.value();
} else {
LOG(WARNING) << "STARTUP: conf spark.hadoop.fs.gs.auth.type is set to SERVICE_ACCOUNT_JSON_KEYFILE, "
"however conf spark.hadoop.fs.gs.auth.service.account.json.keyfile is not set";
throw GlutenException("Conf spark.hadoop.fs.gs.auth.service.account.json.keyfile is not set");
}
} else if (gsAuthServiceAccountJsonKeyfile.has_value()) {
LOG(WARNING) << "STARTUP: conf spark.hadoop.fs.gs.auth.service.account.json.keyfile is set, "
"but conf spark.hadoop.fs.gs.auth.type is not SERVICE_ACCOUNT_JSON_KEYFILE";
throw GlutenException("Conf spark.hadoop.fs.gs.auth.type is missing or incorrect");
}
#endif
}
void getAbfsHiveConfig(
std::shared_ptr<facebook::velox::config::ConfigBase> conf,
FileSystemType fsType,
std::unordered_map<std::string, std::string>& hiveConfMap) {
#ifdef ENABLE_ABFS
std::string_view kSparkHadoopPrefix = "spark.hadoop.";
std::string_view kSparkHadoopAbfsPrefix = "spark.hadoop.fs.azure.";
for (const auto& [key, value] : conf->rawConfigs()) {
if (key.find(kSparkHadoopAbfsPrefix) == 0) {
// Remove the SparkHadoopPrefix
hiveConfMap[key.substr(kSparkHadoopPrefix.size())] = value;
}
}
#endif
}
} // namespace
std::shared_ptr<facebook::velox::config::ConfigBase> createHiveConnectorSessionConfig(
const std::shared_ptr<facebook::velox::config::ConfigBase>& conf) {
// The configs below are used at session level.
std::unordered_map<std::string, std::string> configs = {};
// The semantics of reading as lower case is opposite with case-sensitive.
configs[facebook::velox::connector::hive::HiveConfig::kFileColumnNamesReadAsLowerCaseSession] =
!conf->get<bool>(kCaseSensitive, false) ? "true" : "false";
configs[facebook::velox::connector::hive::HiveConfig::kPartitionPathAsLowerCaseSession] = "false";
configs[facebook::velox::parquet::WriterOptions::kParquetSessionWriteTimestampUnit] = std::string("6");
configs[facebook::velox::connector::hive::HiveConfig::kReadTimestampUnitSession] = std::string("6");
configs[facebook::velox::connector::hive::HiveConfig::kMaxPartitionsPerWritersSession] =
conf->get<std::string>(kMaxPartitions, "10000");
configs[facebook::velox::connector::hive::HiveConfig::kMaxTargetFileSize] =
conf->get<std::string>(kMaxTargetFileSize, "0B"); // 0 means no limit on target file size
configs[facebook::velox::connector::hive::HiveConfig::kIgnoreMissingFilesSession] =
conf->get<bool>(kIgnoreMissingFiles, false) ? "true" : "false";
configs[facebook::velox::connector::hive::HiveConfig::kParquetUseColumnNamesSession] =
conf->get<bool>(kParquetUseColumnNames, true) ? "true" : "false";
configs[facebook::velox::connector::hive::HiveConfig::kOrcUseColumnNamesSession] =
conf->get<bool>(kOrcUseColumnNames, true) ? "true" : "false";
overwriteVeloxConf(conf.get(), configs, kDynamicBackendConfPrefix);
return std::make_shared<facebook::velox::config::ConfigBase>(std::move(configs));
}
std::string getConfigValue(
const std::unordered_map<std::string, std::string>& confMap,
const std::string& key,
const std::optional<std::string>& fallbackValue) {
auto got = confMap.find(key);
if (got == confMap.end()) {
if (fallbackValue == std::nullopt) {
throw std::runtime_error("No such config key: " + key);
}
return fallbackValue.value();
}
return got->second;
}
std::shared_ptr<facebook::velox::config::ConfigBase> createHiveConnectorConfig(
const std::shared_ptr<facebook::velox::config::ConfigBase>& conf,
FileSystemType fsType) {
std::unordered_map<std::string, std::string> hiveConfMap;
switch (fsType) {
case FileSystemType::kS3:
getS3HiveConfig(conf, fsType, hiveConfMap);
break;
case FileSystemType::kAbfs:
getAbfsHiveConfig(conf, fsType, hiveConfMap);
break;
case FileSystemType::kGcs:
getGcsHiveConfig(conf, fsType, hiveConfMap);
break;
case FileSystemType::kHdfs:
break;
case FileSystemType::kAll:
getS3HiveConfig(conf, fsType, hiveConfMap);
getAbfsHiveConfig(conf, fsType, hiveConfMap);
getGcsHiveConfig(conf, fsType, hiveConfMap);
break;
default:
GLUTEN_UNREACHABLE();
}
hiveConfMap[facebook::velox::connector::hive::HiveConfig::kEnableFileHandleCache] =
conf->get<bool>(kVeloxFileHandleCacheEnabled, kVeloxFileHandleCacheEnabledDefault) ? "true" : "false";
hiveConfMap[facebook::velox::connector::hive::HiveConfig::kMaxCoalescedBytes] =
conf->get<std::string>(kMaxCoalescedBytes, "67108864"); // 64M
hiveConfMap[facebook::velox::connector::hive::HiveConfig::kMaxCoalescedDistance] =
conf->get<std::string>(kMaxCoalescedDistance, "512KB"); // 512KB
hiveConfMap[facebook::velox::connector::hive::HiveConfig::kPrefetchRowGroups] =
conf->get<std::string>(kPrefetchRowGroups, "1");
hiveConfMap[facebook::velox::connector::hive::HiveConfig::kLoadQuantum] =
conf->get<std::string>(kLoadQuantum, "268435456"); // 256M
hiveConfMap[facebook::velox::connector::hive::HiveConfig::kFilePreloadThreshold] =
conf->get<std::string>(kFilePreloadThreshold, "1048576"); // 1M
// read as UTC
hiveConfMap[facebook::velox::connector::hive::HiveConfig::kReadTimestampPartitionValueAsLocalTime] = "false";
overwriteVeloxConf(conf.get(), hiveConfMap, kStaticBackendConfPrefix);
return std::make_shared<facebook::velox::config::ConfigBase>(std::move(hiveConfMap));
}
void overwriteVeloxConf(
const facebook::velox::config::ConfigBase* from,
std::unordered_map<std::string, std::string>& to,
const std::string& prefix) {
for (const auto& [k, v] : from->rawConfigs()) {
if (k.starts_with(prefix)) {
to[k.substr(prefix.size())] = v;
}
}
}
} // namespace gluten