| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "service/query-options.h" |
| |
| #include "runtime/runtime-filter.h" |
| #include "util/debug-util.h" |
| #include "util/mem-info.h" |
| #include "util/parse-util.h" |
| #include "util/string-parser.h" |
| #include "exprs/timezone_db.h" |
| #include "gen-cpp/ImpalaInternalService_types.h" |
| |
| #include <zstd.h> |
| #include <sstream> |
| #include <boost/algorithm/string.hpp> |
| #include <gutil/strings/strip.h> |
| #include <gutil/strings/substitute.h> |
| |
| #include "common/names.h" |
| |
| DECLARE_int64(min_buffer_size); |
| |
| using boost::algorithm::iequals; |
| using boost::algorithm::is_any_of; |
| using boost::algorithm::token_compress_on; |
| using boost::algorithm::split; |
| using boost::algorithm::trim; |
| using std::to_string; |
| using beeswax::TQueryOptionLevel; |
| using namespace impala; |
| using namespace strings; |
| |
| DECLARE_int32(idle_session_timeout); |
| |
| // Utility method to wrap ParseUtil::ParseMemSpec() by returning a Status instead of an |
| // int. |
| static Status ParseMemValue(const string& value, const string& key, int64_t* result) { |
| bool is_percent; |
| *result = ParseUtil::ParseMemSpec(value, &is_percent, MemInfo::physical_mem()); |
| if (*result < 0) { |
| return Status("Failed to parse " + key + " from '" + value + "'."); |
| } |
| if (is_percent) { |
| return Status("Invalid " + key + " with percent '" + value + "'."); |
| } |
| return Status::OK(); |
| } |
| |
// Copies into 'dst' each query option that is set in 'src' and whose bit is set in
// 'mask' (the mask is indexed by TImpalaQueryOptions enum value). Options that are unset
// in 'src', or not selected by 'mask', are left untouched in 'dst'. Removed options are
// skipped entirely.
void impala::OverlayQueryOptions(const TQueryOptions& src, const QueryOptionsMask& mask,
    TQueryOptions* dst) {
  // Each TImpalaQueryOptions value is used as a mask index below, so the mask must be
  // strictly larger than the number of known option enum values.
  DCHECK_GT(mask.size(), _TImpalaQueryOptions_VALUES_TO_NAMES.size()) <<
      "Size of QueryOptionsMask must be increased.";
  // QUERY_OPTS_TABLE (defined elsewhere) is assumed to invoke QUERY_OPT_FN for every
  // current option and REMOVED_QUERY_OPT_FN for every removed one.
#define QUERY_OPT_FN(NAME, ENUM, LEVEL)\
  if (src.__isset.NAME && mask[TImpalaQueryOptions::ENUM]) dst->__set_##NAME(src.NAME);
#define REMOVED_QUERY_OPT_FN(NAME, ENUM)
  QUERY_OPTS_TABLE
#undef QUERY_OPT_FN
#undef REMOVED_QUERY_OPT_FN
}
| |
// PrintQueryOptionValue() is an overload set that turns a query option value into a
// string; the overload is picked by the option's type. This overload handles Thrift
// enum types and delegates to PrintThriftEnum().
// TODO: In thrift 0.11.0 operator << is implemented for enums and this indirection can be
// removed.
template <typename T, typename std::enable_if_t<std::is_enum<T>::value>* = nullptr>
string PrintQueryOptionValue(const T& option) {
  string enum_text = PrintThriftEnum(option);
  return enum_text;
}
| |
// Overload for arithmetic option values (integers, floating point and bool), formatted
// with std::to_string(). A bool is promoted to int first and so prints as "0" or "1".
template <typename T, typename std::enable_if_t<std::is_arithmetic<T>::value>* = nullptr>
string PrintQueryOptionValue(const T& option) {
  string number_text = std::to_string(option);
  return number_text;
}
| |
// Overload for string-valued options: the value is already text, so hand back a
// reference to the caller's string instead of making a copy.
const string& PrintQueryOptionValue(const std::string& option) {
  const string& as_is = option;
  return as_is;
}
| |
| const string PrintQueryOptionValue(const impala::TCompressionCodec& compression_codec) { |
| if (compression_codec.codec != THdfsCompression::ZSTD) { |
| return Substitute("$0", PrintThriftEnum(compression_codec.codec)); |
| } else { |
| return Substitute("$0:$1", PrintThriftEnum(compression_codec.codec), |
| compression_codec.compression_level); |
| } |
| } |
| |
// Writes one entry per query option into 'configuration', keyed by the option's
// TImpalaQueryOptions enum name (the stringized ENUM macro argument). Options that are
// set in 'query_options' map to their PrintQueryOptionValue() text. Unset options and
// removed options map to the empty string. Existing entries for these keys are
// overwritten; other keys in 'configuration' are not touched.
void impala::TQueryOptionsToMap(const TQueryOptions& query_options,
    map<string, string>* configuration) {
  // QUERY_OPTS_TABLE (defined elsewhere) is assumed to invoke QUERY_OPT_FN for every
  // current option and REMOVED_QUERY_OPT_FN for every removed one.
#define QUERY_OPT_FN(NAME, ENUM, LEVEL)\
  {\
    if (query_options.__isset.NAME) { \
      (*configuration)[#ENUM] = PrintQueryOptionValue(query_options.NAME); \
    } else { \
      (*configuration)[#ENUM] = ""; \
    }\
  }
#define REMOVED_QUERY_OPT_FN(NAME, ENUM) (*configuration)[#ENUM] = "";
  QUERY_OPTS_TABLE
#undef QUERY_OPT_FN
#undef REMOVED_QUERY_OPT_FN
}
| |
// Resets query_options->option to its default value. Both the value and the __isset flag
// are copied from a default-constructed TQueryOptions. 'option' is a TImpalaQueryOptions
// enum value. Values with no matching case (including removed options, which expand to
// nothing) leave 'query_options' unchanged.
// NOTE(review): this uses the plain Thrift defaults rather than DefaultQueryOptions(), so
// idle_session_timeout is not reset to the --idle_session_timeout flag value here;
// confirm that is intended.
static void ResetQueryOption(const int option, TQueryOptions* query_options) {
  // Constructed on the first call and reused afterwards.
  const static TQueryOptions defaults;
  switch (option) {
  // One case per current option, generated from QUERY_OPTS_TABLE.
#define QUERY_OPT_FN(NAME, ENUM, LEVEL)\
  case TImpalaQueryOptions::ENUM:\
    query_options->__isset.NAME = defaults.__isset.NAME;\
    query_options->NAME = defaults.NAME;\
    break;
#define REMOVED_QUERY_OPT_FN(NAME, ENUM)
  QUERY_OPTS_TABLE
#undef QUERY_OPT_FN
#undef REMOVED_QUERY_OPT_FN
  }
}
| |
| static TQueryOptions DefaultQueryOptions() { |
| TQueryOptions defaults; |
| // default value of idle_session_timeout is set by a command line flag. |
| defaults.__set_idle_session_timeout(FLAGS_idle_session_timeout); |
| return defaults; |
| } |
| |
| inline bool operator!=(const TCompressionCodec& a, |
| const TCompressionCodec& b) { |
| return (a.codec != b.codec || a.compression_level != b.compression_level); |
| } |
| |
// Returns a comma-separated "ENUM_NAME=value" list of the options in 'query_options'
// that are set and differ from DefaultQueryOptions(). An option counts as differing if it
// is unset in the defaults or compares unequal via operator!=. Values are written with
// operator<<. Returns an empty string when no option differs.
string impala::DebugQueryOptions(const TQueryOptions& query_options) {
  // Computed once, on the first call, from the flag value at that time.
  const static TQueryOptions defaults = DefaultQueryOptions();
  // Number of options emitted so far; used to place the ',' separators.
  int i = 0;
  stringstream ss;
  // QUERY_OPTS_TABLE (defined elsewhere) is assumed to invoke QUERY_OPT_FN for every
  // current option and REMOVED_QUERY_OPT_FN for every removed one.
#define QUERY_OPT_FN(NAME, ENUM, LEVEL)\
  if (query_options.__isset.NAME &&\
      (!defaults.__isset.NAME || query_options.NAME != defaults.NAME)) {\
    if (i++ > 0) ss << ",";\
    ss << #ENUM << "=" << query_options.NAME;\
  }
#define REMOVED_QUERY_OPT_FN(NAME, ENUM)
  QUERY_OPTS_TABLE
#undef QUERY_OPT_FN
#undef REMOVED_QUERY_OPT_FN
  return ss.str();
}
| |
| // Returns the TImpalaQueryOptions enum for the given "key". Input is case insensitive. |
| // Return -1 if the input is an invalid option. |
| static int GetQueryOptionForKey(const string& key) { |
| map<int, const char*>::const_iterator itr = |
| _TImpalaQueryOptions_VALUES_TO_NAMES.begin(); |
| for (; itr != _TImpalaQueryOptions_VALUES_TO_NAMES.end(); ++itr) { |
| if (iequals(key, (*itr).second)) { |
| return itr->first; |
| } |
| } |
| return -1; |
| } |
| |
// Return true if we can ignore a reference to this removed query option, i.e. if 'key'
// equals (case-insensitively) the NAME of any option declared with REMOVED_QUERY_OPT_FN
// in QUERY_OPTS_TABLE. Current options never match here.
static bool IsRemovedQueryOption(const string& key) {
#define QUERY_OPT_FN(NAME, ENUM, LEVEL)
#define REMOVED_QUERY_OPT_FN(NAME, ENUM) \
  if (iequals(key, #NAME)) { \
    return true; \
  }
  QUERY_OPTS_TABLE
#undef QUERY_OPT_FN
#undef REMOVED_QUERY_OPT_FN
  return false;
}
| |
// Formats every entry of a Thrift enum's value-to-name map as "NAME(value)" and joins
// them with ", " in ascending value order, e.g. FOO(1), BAR(2), BAZ(3).
// An empty map yields an empty string.
static string GetThriftEnumValues(const map<int, const char*>& enum_values_to_names) {
  stringstream out;
  const char* separator = "";
  for (const auto& entry : enum_values_to_names) {
    out << separator << entry.second << "(" << entry.first << ")";
    separator = ", ";
  }
  return out.str();
}
| |
| // Return false for an invalid Thrift enum value. |
| template<typename ENUM_TYPE> |
| static Status GetThriftEnum(const string& value, const string& key, |
| const map<int, const char*>& enum_values_to_names, ENUM_TYPE* enum_value) { |
| for (const auto& e : enum_values_to_names) { |
| if (iequals(value, to_string(e.first)) || iequals(value, e.second)) { |
| *enum_value = static_cast<ENUM_TYPE>(e.first); |
| return Status::OK(); |
| } |
| } |
| return Status(Substitute("Invalid $0: '$1'. Valid values are $2.", key, value, |
| GetThriftEnumValues(enum_values_to_names))); |
| } |
| |
| // Return true if the given value is true (case-insensitive) or 1. |
| static bool IsTrue(const string& value) { |
| return iequals(value, "true") || iequals(value, "1"); |
| } |
| |
| // Note that we allow numerical values for boolean and enum options. This is because |
| // TQueryOptionsToMap() will output the numerical values, and we need to parse its output |
| // configuration. |
| Status impala::SetQueryOption(const string& key, const string& value, |
| TQueryOptions* query_options, QueryOptionsMask* set_query_options_mask) { |
| int option = GetQueryOptionForKey(key); |
| if (option < 0) { |
| return Status(Substitute("Invalid query option: $0", key)); |
| } else if (value == "") { |
| ResetQueryOption(option, query_options); |
| if (set_query_options_mask != nullptr) { |
| DCHECK_LT(option, set_query_options_mask->size()); |
| set_query_options_mask->reset(option); |
| } |
| } else { |
| switch (option) { |
| case TImpalaQueryOptions::ABORT_ON_ERROR: |
| query_options->__set_abort_on_error(IsTrue(value)); |
| break; |
| case TImpalaQueryOptions::MAX_ERRORS: |
| query_options->__set_max_errors(atoi(value.c_str())); |
| break; |
| case TImpalaQueryOptions::DISABLE_CODEGEN: |
| query_options->__set_disable_codegen(IsTrue(value)); |
| break; |
| case TImpalaQueryOptions::BATCH_SIZE: { |
| StringParser::ParseResult status; |
| int val = StringParser::StringToInt<int>(value.c_str(), |
| static_cast<int>(value.size()), &status); |
| if (status != StringParser::PARSE_SUCCESS || val < 0 || val > 65536) { |
| return Status(Substitute("Invalid batch size '$0'. Valid sizes are in" |
| "[0, 65536]", value)); |
| } |
| query_options->__set_batch_size(val); |
| break; |
| } |
| case TImpalaQueryOptions::MEM_LIMIT: { |
| // Parse the mem limit spec and validate it. |
| int64_t bytes_limit; |
| RETURN_IF_ERROR(ParseMemValue(value, "query memory limit", &bytes_limit)); |
| query_options->__set_mem_limit(bytes_limit); |
| break; |
| } |
| case TImpalaQueryOptions::NUM_NODES: |
| query_options->__set_num_nodes(atoi(value.c_str())); |
| break; |
| case TImpalaQueryOptions::MAX_SCAN_RANGE_LENGTH: { |
| int64_t scan_length = 0; |
| RETURN_IF_ERROR(ParseMemValue(value, "scan range length", &scan_length)); |
| query_options->__set_max_scan_range_length(scan_length); |
| break; |
| } |
| case TImpalaQueryOptions::NUM_SCANNER_THREADS: |
| query_options->__set_num_scanner_threads(atoi(value.c_str())); |
| break; |
| case TImpalaQueryOptions::DEBUG_ACTION: |
| query_options->__set_debug_action(value.c_str()); |
| break; |
| case TImpalaQueryOptions::COMPRESSION_CODEC: { |
| // Acceptable values are: |
| // - zstd:compression_level |
| // - codec |
| vector<string> tokens; |
| split(tokens, value, is_any_of(":"), token_compress_on); |
| if (tokens.size() > 2) return Status("Invalid compression codec value"); |
| |
| string& codec_name = tokens[0]; |
| trim(codec_name); |
| int compression_level = ZSTD_CLEVEL_DEFAULT; |
| THdfsCompression::type enum_type; |
| RETURN_IF_ERROR(GetThriftEnum(codec_name, "compression codec", |
| _THdfsCompression_VALUES_TO_NAMES, &enum_type)); |
| |
| if (tokens.size() == 2) { |
| if (enum_type != THdfsCompression::ZSTD) { |
| return Status("Compression level only supported for ZSTD"); |
| } |
| StringParser::ParseResult status; |
| string& clevel = tokens[1]; |
| trim(clevel); |
| compression_level = StringParser::StringToInt<int>( |
| clevel.c_str(), static_cast<int>(clevel.size()), &status); |
| if (status != StringParser::PARSE_SUCCESS || compression_level < 1 |
| || compression_level > ZSTD_maxCLevel()) { |
| return Status(Substitute("Invalid ZSTD compression level '$0'." |
| " Valid values are in [1,$1]", clevel, ZSTD_maxCLevel())); |
| } |
| } |
| |
| TCompressionCodec compression_codec; |
| compression_codec.__set_codec(enum_type); |
| if (enum_type == THdfsCompression::ZSTD) { |
| compression_codec.__set_compression_level(compression_level); |
| } |
| query_options->__set_compression_codec(compression_codec); |
| break; |
| } |
| case TImpalaQueryOptions::HBASE_CACHING: |
| query_options->__set_hbase_caching(atoi(value.c_str())); |
| break; |
| case TImpalaQueryOptions::HBASE_CACHE_BLOCKS: |
| query_options->__set_hbase_cache_blocks(IsTrue(value)); |
| break; |
| case TImpalaQueryOptions::PARQUET_FILE_SIZE: { |
| int64_t file_size; |
| RETURN_IF_ERROR(ParseMemValue(value, "parquet file size", &file_size)); |
| if (file_size > numeric_limits<int32_t>::max()) { |
| // Do not allow values greater than or equal to 2GB since hdfsOpenFile() from |
| // the HDFS API gets an int32 blocksize parameter (see HDFS-8949). |
| stringstream ss; |
| ss << "The PARQUET_FILE_SIZE query option must be less than 2GB."; |
| return Status(ss.str()); |
| } else { |
| query_options->__set_parquet_file_size(file_size); |
| } |
| break; |
| } |
| case TImpalaQueryOptions::EXPLAIN_LEVEL: { |
| TExplainLevel::type enum_type; |
| RETURN_IF_ERROR(GetThriftEnum(value, "explain level", |
| _TExplainLevel_VALUES_TO_NAMES, &enum_type)); |
| query_options->__set_explain_level(enum_type); |
| break; |
| } |
| case TImpalaQueryOptions::SYNC_DDL: |
| query_options->__set_sync_ddl(IsTrue(value)); |
| break; |
| case TImpalaQueryOptions::REQUEST_POOL: |
| query_options->__set_request_pool(value); |
| break; |
| case TImpalaQueryOptions::DISABLE_OUTERMOST_TOPN: |
| query_options->__set_disable_outermost_topn(IsTrue(value)); |
| break; |
| case TImpalaQueryOptions::QUERY_TIMEOUT_S: { |
| StringParser::ParseResult result; |
| const int32_t timeout_s = |
| StringParser::StringToInt<int32_t>(value.c_str(), value.length(), &result); |
| if (result != StringParser::PARSE_SUCCESS || timeout_s < 0) { |
| return Status( |
| Substitute("Invalid query timeout: '$0'. " |
| "Only non-negative numbers are allowed.", value)); |
| } |
| query_options->__set_query_timeout_s(timeout_s); |
| break; |
| } |
| case TImpalaQueryOptions::BUFFER_POOL_LIMIT: { |
| int64_t mem; |
| RETURN_IF_ERROR(ParseMemValue(value, "buffer pool limit", &mem)); |
| query_options->__set_buffer_pool_limit(mem); |
| break; |
| } |
| case TImpalaQueryOptions::APPX_COUNT_DISTINCT: { |
| query_options->__set_appx_count_distinct(IsTrue(value)); |
| break; |
| } |
| case TImpalaQueryOptions::DISABLE_UNSAFE_SPILLS: { |
| query_options->__set_disable_unsafe_spills(IsTrue(value)); |
| break; |
| } |
| case TImpalaQueryOptions::EXEC_SINGLE_NODE_ROWS_THRESHOLD: |
| query_options->__set_exec_single_node_rows_threshold(atoi(value.c_str())); |
| break; |
| case TImpalaQueryOptions::OPTIMIZE_PARTITION_KEY_SCANS: |
| query_options->__set_optimize_partition_key_scans(IsTrue(value)); |
| break; |
| case TImpalaQueryOptions::REPLICA_PREFERENCE: { |
| map<int, const char *> valid_enums_values = { |
| {0, "CACHE_LOCAL"}, |
| {2, "DISK_LOCAL"}, |
| {4, "REMOTE"} |
| }; |
| TReplicaPreference::type enum_type; |
| RETURN_IF_ERROR(GetThriftEnum(value, "replica memory distance preference", |
| valid_enums_values, &enum_type)); |
| query_options->__set_replica_preference(enum_type); |
| break; |
| } |
| case TImpalaQueryOptions::SCHEDULE_RANDOM_REPLICA: |
| query_options->__set_schedule_random_replica(IsTrue(value)); |
| break; |
| case TImpalaQueryOptions::DISABLE_STREAMING_PREAGGREGATIONS: |
| query_options->__set_disable_streaming_preaggregations(IsTrue(value)); |
| break; |
| case TImpalaQueryOptions::RUNTIME_FILTER_MODE: { |
| TRuntimeFilterMode::type enum_type; |
| RETURN_IF_ERROR(GetThriftEnum(value, "runtime filter mode", |
| _TRuntimeFilterMode_VALUES_TO_NAMES, &enum_type)); |
| query_options->__set_runtime_filter_mode(enum_type); |
| break; |
| } |
| case TImpalaQueryOptions::RUNTIME_FILTER_MAX_SIZE: |
| case TImpalaQueryOptions::RUNTIME_FILTER_MIN_SIZE: |
| case TImpalaQueryOptions::RUNTIME_BLOOM_FILTER_SIZE: { |
| int64_t size; |
| RETURN_IF_ERROR(ParseMemValue(value, "Bloom filter size", &size)); |
| if (size < RuntimeFilterBank::MIN_BLOOM_FILTER_SIZE || |
| size > RuntimeFilterBank::MAX_BLOOM_FILTER_SIZE) { |
| return Status(Substitute("$0 is not a valid Bloom filter size for $1. " |
| "Valid sizes are in [$2, $3].", value, PrintThriftEnum( |
| static_cast<TImpalaQueryOptions::type>(option)), |
| RuntimeFilterBank::MIN_BLOOM_FILTER_SIZE, |
| RuntimeFilterBank::MAX_BLOOM_FILTER_SIZE)); |
| } |
| if (option == TImpalaQueryOptions::RUNTIME_FILTER_MAX_SIZE |
| && size < FLAGS_min_buffer_size |
| // last condition is to unblock the highly improbable case where the |
| // min_buffer_size is greater than RuntimeFilterBank::MAX_BLOOM_FILTER_SIZE. |
| && FLAGS_min_buffer_size <= RuntimeFilterBank::MAX_BLOOM_FILTER_SIZE) { |
| return Status(Substitute("$0 should not be less than $1 which is the minimum " |
| "buffer size that can be allocated by the buffer pool", |
| PrintThriftEnum(static_cast<TImpalaQueryOptions::type>(option)), |
| FLAGS_min_buffer_size)); |
| } |
| if (option == TImpalaQueryOptions::RUNTIME_BLOOM_FILTER_SIZE) { |
| query_options->__set_runtime_bloom_filter_size(size); |
| } else if (option == TImpalaQueryOptions::RUNTIME_FILTER_MIN_SIZE) { |
| query_options->__set_runtime_filter_min_size(size); |
| } else if (option == TImpalaQueryOptions::RUNTIME_FILTER_MAX_SIZE) { |
| query_options->__set_runtime_filter_max_size(size); |
| } |
| break; |
| } |
| case TImpalaQueryOptions::RUNTIME_FILTER_WAIT_TIME_MS: { |
| StringParser::ParseResult result; |
| const int32_t time_ms = |
| StringParser::StringToInt<int32_t>(value.c_str(), value.length(), &result); |
| if (result != StringParser::PARSE_SUCCESS || time_ms < 0) { |
| return Status( |
| Substitute("$0 is not a valid wait time. Valid sizes are in [0, $1].", |
| value, numeric_limits<int32_t>::max())); |
| } |
| query_options->__set_runtime_filter_wait_time_ms(time_ms); |
| break; |
| } |
| case TImpalaQueryOptions::DISABLE_ROW_RUNTIME_FILTERING: |
| query_options->__set_disable_row_runtime_filtering(IsTrue(value)); |
| break; |
| case TImpalaQueryOptions::MAX_NUM_RUNTIME_FILTERS: { |
| StringParser::ParseResult status; |
| int val = StringParser::StringToInt<int>(value.c_str(), value.size(), &status); |
| if (status != StringParser::PARSE_SUCCESS) { |
| return Status(Substitute("Invalid number of runtime filters: '$0'.", value)); |
| } |
| if (val < 0) { |
| return Status(Substitute("Invalid number of runtime filters: '$0'. " |
| "Only positive values are allowed.", val)); |
| } |
| query_options->__set_max_num_runtime_filters(val); |
| break; |
| } |
| case TImpalaQueryOptions::PARQUET_ANNOTATE_STRINGS_UTF8: { |
| query_options->__set_parquet_annotate_strings_utf8(IsTrue(value)); |
| break; |
| } |
| case TImpalaQueryOptions::PARQUET_FALLBACK_SCHEMA_RESOLUTION: { |
| TParquetFallbackSchemaResolution::type enum_type; |
| RETURN_IF_ERROR(GetThriftEnum(value, "parquet fallback schema resolution", |
| _TParquetFallbackSchemaResolution_VALUES_TO_NAMES, &enum_type)); |
| query_options->__set_parquet_fallback_schema_resolution(enum_type); |
| break; |
| } |
| case TImpalaQueryOptions::PARQUET_ARRAY_RESOLUTION: { |
| TParquetArrayResolution::type enum_type; |
| RETURN_IF_ERROR(GetThriftEnum(value, "parquet array resolution", |
| _TParquetArrayResolution_VALUES_TO_NAMES, &enum_type)); |
| query_options->__set_parquet_array_resolution(enum_type); |
| break; |
| } |
| case TImpalaQueryOptions::MT_DOP: { |
| StringParser::ParseResult result; |
| const int32_t dop = |
| StringParser::StringToInt<int32_t>(value.c_str(), value.length(), &result); |
| if (result != StringParser::PARSE_SUCCESS || dop < 0 || dop > 64) { |
| return Status( |
| Substitute("$0 is not valid for mt_dop. Valid values are in " |
| "[0, 64].", value)); |
| } |
| query_options->__set_mt_dop(dop); |
| break; |
| } |
| case TImpalaQueryOptions::S3_SKIP_INSERT_STAGING: { |
| query_options->__set_s3_skip_insert_staging(IsTrue(value)); |
| break; |
| } |
| case TImpalaQueryOptions::PREFETCH_MODE: { |
| TPrefetchMode::type enum_type; |
| RETURN_IF_ERROR(GetThriftEnum(value, "prefetch mode", |
| _TPrefetchMode_VALUES_TO_NAMES, &enum_type)); |
| query_options->__set_prefetch_mode(enum_type); |
| break; |
| } |
| case TImpalaQueryOptions::STRICT_MODE: { |
| query_options->__set_strict_mode(IsTrue(value)); |
| break; |
| } |
| case TImpalaQueryOptions::SCRATCH_LIMIT: { |
| // Parse the scratch limit spec and validate it. |
| if (iequals(value, "-1")) { |
| query_options->__set_scratch_limit(-1); |
| } else { |
| int64_t bytes_limit; |
| RETURN_IF_ERROR(ParseMemValue(value, "Scratch space memory limit", |
| &bytes_limit)); |
| query_options->__set_scratch_limit(bytes_limit); |
| } |
| break; |
| } |
| case TImpalaQueryOptions::ENABLE_EXPR_REWRITES: { |
| query_options->__set_enable_expr_rewrites(IsTrue(value)); |
| break; |
| } |
| case TImpalaQueryOptions::DECIMAL_V2: { |
| query_options->__set_decimal_v2(IsTrue(value)); |
| break; |
| } |
| case TImpalaQueryOptions::PARQUET_DICTIONARY_FILTERING: { |
| query_options->__set_parquet_dictionary_filtering(IsTrue(value)); |
| break; |
| } |
| case TImpalaQueryOptions::PARQUET_READ_STATISTICS: { |
| query_options->__set_parquet_read_statistics(IsTrue(value)); |
| break; |
| } |
| case TImpalaQueryOptions::DEFAULT_JOIN_DISTRIBUTION_MODE: { |
| TJoinDistributionMode::type enum_type; |
| RETURN_IF_ERROR(GetThriftEnum(value, "default join distribution mode", |
| _TJoinDistributionMode_VALUES_TO_NAMES, &enum_type)); |
| query_options->__set_default_join_distribution_mode(enum_type); |
| break; |
| } |
| case TImpalaQueryOptions::DISABLE_CODEGEN_ROWS_THRESHOLD: { |
| StringParser::ParseResult status; |
| int val = StringParser::StringToInt<int>(value.c_str(), value.size(), &status); |
| if (status != StringParser::PARSE_SUCCESS) { |
| return Status(Substitute("Invalid threshold: '$0'.", value)); |
| } |
| if (val < 0) { |
| return Status(Substitute( |
| "Invalid threshold: '$0'. Only positive values are allowed.", val)); |
| } |
| query_options->__set_disable_codegen_rows_threshold(val); |
| break; |
| } |
| case TImpalaQueryOptions::DEFAULT_SPILLABLE_BUFFER_SIZE: { |
| int64_t buffer_size_bytes; |
| RETURN_IF_ERROR( |
| ParseMemValue(value, "Default spillable buffer size", &buffer_size_bytes)); |
| if (!BitUtil::IsPowerOf2(buffer_size_bytes)) { |
| return Status( |
| Substitute("Default spillable buffer size must be a power of two: $0", |
| buffer_size_bytes)); |
| } |
| if (buffer_size_bytes > SPILLABLE_BUFFER_LIMIT) { |
| return Status(Substitute( |
| "Default spillable buffer size must be less than or equal to: $0", |
| SPILLABLE_BUFFER_LIMIT)); |
| } |
| query_options->__set_default_spillable_buffer_size(buffer_size_bytes); |
| break; |
| } |
| case TImpalaQueryOptions::MIN_SPILLABLE_BUFFER_SIZE: { |
| int64_t buffer_size_bytes; |
| RETURN_IF_ERROR( |
| ParseMemValue(value, "Minimum spillable buffer size", &buffer_size_bytes)); |
| if (!BitUtil::IsPowerOf2(buffer_size_bytes)) { |
| return Status( |
| Substitute("Minimum spillable buffer size must be a power of two: $0", |
| buffer_size_bytes)); |
| } |
| if (buffer_size_bytes > SPILLABLE_BUFFER_LIMIT) { |
| return Status(Substitute( |
| "Minimum spillable buffer size must be less than or equal to: $0", |
| SPILLABLE_BUFFER_LIMIT)); |
| } |
| query_options->__set_min_spillable_buffer_size(buffer_size_bytes); |
| break; |
| } |
| case TImpalaQueryOptions::MAX_ROW_SIZE: { |
| int64_t max_row_size_bytes; |
| RETURN_IF_ERROR(ParseMemValue(value, "Max row size", &max_row_size_bytes)); |
| if (max_row_size_bytes <= 0 || max_row_size_bytes > ROW_SIZE_LIMIT) { |
| return Status( |
| Substitute("Invalid max row size of $0. Valid sizes are in [$1, $2]", value, |
| 1, ROW_SIZE_LIMIT)); |
| } |
| query_options->__set_max_row_size(max_row_size_bytes); |
| break; |
| } |
| case TImpalaQueryOptions::IDLE_SESSION_TIMEOUT: { |
| StringParser::ParseResult result; |
| const int32_t requested_timeout = |
| StringParser::StringToInt<int32_t>(value.c_str(), value.length(), &result); |
| if (result != StringParser::PARSE_SUCCESS || requested_timeout < 0) { |
| return Status( |
| Substitute("Invalid idle session timeout: '$0'. " |
| "Only non-negative numbers are allowed.", value)); |
| } |
| query_options->__set_idle_session_timeout(requested_timeout); |
| break; |
| } |
| case TImpalaQueryOptions::COMPUTE_STATS_MIN_SAMPLE_SIZE: { |
| int64_t min_sample_size; |
| RETURN_IF_ERROR(ParseMemValue(value, "Min sample size", &min_sample_size)); |
| if (min_sample_size < 0) { |
| return Status( |
| Substitute("Min sample size must be greater or equal to zero: $0", value)); |
| } |
| query_options->__set_compute_stats_min_sample_size(min_sample_size); |
| break; |
| } |
| case TImpalaQueryOptions::EXEC_TIME_LIMIT_S: { |
| StringParser::ParseResult result; |
| const int32_t time_limit = |
| StringParser::StringToInt<int32_t>(value.c_str(), value.length(), &result); |
| if (result != StringParser::PARSE_SUCCESS || time_limit < 0) { |
| return Status( |
| Substitute("Invalid query time limit: '$0'. " |
| "Only non-negative numbers are allowed.", value)); |
| } |
| query_options->__set_exec_time_limit_s(time_limit); |
| break; |
| } |
| case TImpalaQueryOptions::SHUFFLE_DISTINCT_EXPRS: { |
| query_options->__set_shuffle_distinct_exprs(IsTrue(value)); |
| break; |
| } |
| case TImpalaQueryOptions::MAX_MEM_ESTIMATE_FOR_ADMISSION: { |
| int64_t bytes_limit; |
| RETURN_IF_ERROR(ParseMemValue( |
| value, "max memory estimate for admission", &bytes_limit)); |
| query_options->__set_max_mem_estimate_for_admission(bytes_limit); |
| break; |
| } |
| case TImpalaQueryOptions::THREAD_RESERVATION_LIMIT: |
| case TImpalaQueryOptions::THREAD_RESERVATION_AGGREGATE_LIMIT: { |
| // Parsing logic is identical for these two options. |
| StringParser::ParseResult status; |
| int val = StringParser::StringToInt<int>(value.c_str(), value.size(), &status); |
| if (status != StringParser::PARSE_SUCCESS) { |
| return Status(Substitute("Invalid thread count: '$0'.", value)); |
| } |
| if (val < -1) { |
| return Status(Substitute("Invalid thread count: '$0'. " |
| "Only -1 and non-negative values are allowed.", val)); |
| } |
| if (option == TImpalaQueryOptions::THREAD_RESERVATION_LIMIT) { |
| query_options->__set_thread_reservation_limit(val); |
| } else { |
| DCHECK_EQ(option, TImpalaQueryOptions::THREAD_RESERVATION_AGGREGATE_LIMIT); |
| query_options->__set_thread_reservation_aggregate_limit(val); |
| } |
| break; |
| } |
| case TImpalaQueryOptions::KUDU_READ_MODE: { |
| TKuduReadMode::type enum_type; |
| RETURN_IF_ERROR(GetThriftEnum(value, "Kudu read mode", |
| _TKuduReadMode_VALUES_TO_NAMES, &enum_type)); |
| query_options->__set_kudu_read_mode(enum_type); |
| break; |
| } |
| case TImpalaQueryOptions::ALLOW_ERASURE_CODED_FILES: { |
| query_options->__set_allow_erasure_coded_files(IsTrue(value)); |
| break; |
| } |
| case TImpalaQueryOptions::TIMEZONE: { |
| // Leading/trailing " and ' characters are stripped because the / character |
| // cannot be entered unquoted in some contexts. |
| string timezone = value; |
| TrimString(&timezone, "'\""); |
| timezone = timezone.empty() ? TimezoneDatabase::LocalZoneName() : timezone; |
| if (TimezoneDatabase::FindTimezone(timezone) == nullptr) { |
| return Status(Substitute("Invalid timezone name '$0'.", timezone)); |
| } |
| query_options->__set_timezone(timezone); |
| break; |
| } |
| case TImpalaQueryOptions::SCAN_BYTES_LIMIT: { |
| int64_t bytes_limit; |
| RETURN_IF_ERROR(ParseMemValue(value, "query scan bytes limit", &bytes_limit)); |
| query_options->__set_scan_bytes_limit(bytes_limit); |
| break; |
| } |
| case TImpalaQueryOptions::CPU_LIMIT_S: { |
| StringParser::ParseResult result; |
| const int64_t cpu_limit_s = |
| StringParser::StringToInt<int64_t>(value.c_str(), value.length(), &result); |
| if (result != StringParser::PARSE_SUCCESS || cpu_limit_s < 0) { |
| return Status( |
| Substitute("Invalid CPU limit: '$0'. " |
| "Only non-negative numbers are allowed.", value)); |
| } |
| query_options->__set_cpu_limit_s(cpu_limit_s); |
| break; |
| } |
| case TImpalaQueryOptions::TOPN_BYTES_LIMIT: { |
| int64_t topn_bytes_limit; |
| RETURN_IF_ERROR(ParseMemValue(value, "topn bytes limit", &topn_bytes_limit)); |
| query_options->__set_topn_bytes_limit(topn_bytes_limit); |
| break; |
| } |
| case TImpalaQueryOptions::CLIENT_IDENTIFIER: { |
| query_options->__set_client_identifier(value); |
| break; |
| } |
| case TImpalaQueryOptions::RESOURCE_TRACE_RATIO: { |
| StringParser::ParseResult result; |
| const double val = |
| StringParser::StringToFloat<double>(value.c_str(), value.length(), &result); |
| if (result != StringParser::PARSE_SUCCESS || val < 0 || val > 1) { |
| return Status(Substitute("Invalid resource trace ratio: '$0'. " |
| "Only values from 0 to 1 are allowed.", |
| value)); |
| } |
| query_options->__set_resource_trace_ratio(val); |
| } |
| case TImpalaQueryOptions::PLANNER_TESTCASE_MODE: { |
| query_options->__set_planner_testcase_mode(IsTrue(value)); |
| break; |
| } |
| case TImpalaQueryOptions::NUM_REMOTE_EXECUTOR_CANDIDATES: { |
| StringParser::ParseResult result; |
| const int64_t num_remote_executor_candidates = |
| StringParser::StringToInt<int64_t>(value.c_str(), value.length(), &result); |
| if (result != StringParser::PARSE_SUCCESS || |
| num_remote_executor_candidates < 0 || num_remote_executor_candidates > 16) { |
| return Status( |
| Substitute("$0 is not valid for num_remote_executor_candidates. " |
| "Valid values are in [0, 16].", value)); |
| } |
| query_options->__set_num_remote_executor_candidates( |
| num_remote_executor_candidates); |
| break; |
| } |
| case TImpalaQueryOptions::NUM_ROWS_PRODUCED_LIMIT: { |
| StringParser::ParseResult result; |
| const int64_t num_rows_produced_limit = |
| StringParser::StringToInt<int64_t>(value.c_str(), value.length(), &result); |
| if (result != StringParser::PARSE_SUCCESS || num_rows_produced_limit < 0) { |
| return Status(Substitute("Invalid rows returned limit: '$0'. " |
| "Only non-negative numbers are allowed.", value)); |
| } |
| query_options->__set_num_rows_produced_limit(num_rows_produced_limit); |
| break; |
| } |
| case TImpalaQueryOptions::DEFAULT_FILE_FORMAT: { |
| THdfsFileFormat::type enum_type; |
| RETURN_IF_ERROR(GetThriftEnum(value, "default file format", |
| _THdfsFileFormat_VALUES_TO_NAMES, &enum_type)); |
| query_options->__set_default_file_format(enum_type); |
| break; |
| } |
| case TImpalaQueryOptions::PARQUET_TIMESTAMP_TYPE: { |
| TParquetTimestampType::type enum_type; |
| RETURN_IF_ERROR(GetThriftEnum(value, "Parquet timestamp type", |
| _TParquetTimestampType_VALUES_TO_NAMES, &enum_type)); |
| query_options->__set_parquet_timestamp_type(enum_type); |
| break; |
| } |
| case TImpalaQueryOptions::PARQUET_READ_PAGE_INDEX: { |
| query_options->__set_parquet_read_page_index(IsTrue(value)); |
| break; |
| } |
| case TImpalaQueryOptions::PARQUET_WRITE_PAGE_INDEX: { |
| query_options->__set_parquet_write_page_index(IsTrue(value)); |
| break; |
| } |
| case TImpalaQueryOptions::PARQUET_PAGE_ROW_COUNT_LIMIT: { |
| StringParser::ParseResult result; |
| const int32_t row_count_limit = |
| StringParser::StringToInt<int32_t>(value.c_str(), value.length(), &result); |
| if (result != StringParser::PARSE_SUCCESS || row_count_limit <= 0) { |
| return Status("Parquet page row count limit must be a positive integer."); |
| } |
| query_options->__set_parquet_page_row_count_limit(row_count_limit); |
| break; |
| } |
| case TImpalaQueryOptions::DISABLE_HDFS_NUM_ROWS_ESTIMATE: { |
| query_options->__set_disable_hdfs_num_rows_estimate(IsTrue(value)); |
| break; |
| } |
| case TImpalaQueryOptions::DEFAULT_HINTS_INSERT_STATEMENT: { |
| query_options->__set_default_hints_insert_statement(value); |
| break; |
| } |
| case TImpalaQueryOptions::SPOOL_QUERY_RESULTS: { |
| query_options->__set_spool_query_results(IsTrue(value)); |
| break; |
| } |
| case TImpalaQueryOptions::MAX_RESULT_SPOOLING_MEM: { |
| int64_t max_result_spooling_mem; |
| RETURN_IF_ERROR(ParseMemValue(value, "max result spooling memory", |
| &max_result_spooling_mem)); |
| query_options->__set_max_result_spooling_mem( |
| max_result_spooling_mem); |
| break; |
| } |
| case TImpalaQueryOptions::MAX_SPILLED_RESULT_SPOOLING_MEM: { |
| int64_t max_spilled_result_spooling_mem; |
| RETURN_IF_ERROR(ParseMemValue(value, "max spilled result spooling memory", |
| &max_spilled_result_spooling_mem)); |
| query_options->__set_max_spilled_result_spooling_mem( |
| max_spilled_result_spooling_mem); |
| break; |
| } |
| case TImpalaQueryOptions::DEFAULT_TRANSACTIONAL_TYPE: { |
| TTransactionalType::type enum_type; |
| RETURN_IF_ERROR(GetThriftEnum(value, "default transactional type", |
| _TTransactionalType_VALUES_TO_NAMES, &enum_type)); |
| query_options->__set_default_transactional_type(enum_type); |
| break; |
| } |
| case TImpalaQueryOptions::STATEMENT_EXPRESSION_LIMIT: { |
| StringParser::ParseResult result; |
| const int32_t statement_expression_limit = |
| StringParser::StringToInt<int32_t>(value.c_str(), value.length(), &result); |
| if (result != StringParser::PARSE_SUCCESS || |
| statement_expression_limit < MIN_STATEMENT_EXPRESSION_LIMIT) { |
| return Status(Substitute("Invalid statement expression limit: $0 " |
| "Valid values are in [$1, $2]", value, MIN_STATEMENT_EXPRESSION_LIMIT, |
| std::numeric_limits<int32_t>::max())); |
| } |
| query_options->__set_statement_expression_limit(statement_expression_limit); |
| break; |
| } |
| case TImpalaQueryOptions::MAX_STATEMENT_LENGTH_BYTES: { |
| int64_t max_statement_length_bytes; |
| RETURN_IF_ERROR(ParseMemValue(value, "max statement length bytes", |
| &max_statement_length_bytes)); |
| if (max_statement_length_bytes < MIN_MAX_STATEMENT_LENGTH_BYTES || |
| max_statement_length_bytes > std::numeric_limits<int32_t>::max()) { |
| return Status(Substitute("Invalid maximum statement length: $0 " |
| "Valid values are in [$1, $2]", max_statement_length_bytes, |
| MIN_MAX_STATEMENT_LENGTH_BYTES, std::numeric_limits<int32_t>::max())); |
| } |
| query_options->__set_max_statement_length_bytes(max_statement_length_bytes); |
| break; |
| } |
| case TImpalaQueryOptions::DISABLE_DATA_CACHE: { |
| query_options->__set_disable_data_cache(IsTrue(value)); |
| break; |
| } |
| case TImpalaQueryOptions::DISABLE_HBASE_NUM_ROWS_ESTIMATE: { |
| query_options->__set_disable_hbase_num_rows_estimate(IsTrue(value)); |
| break; |
| } |
| case TImpalaQueryOptions::FETCH_ROWS_TIMEOUT_MS: { |
| StringParser::ParseResult result; |
| const int64_t requested_timeout = |
| StringParser::StringToInt<int64_t>(value.c_str(), value.length(), &result); |
| if (result != StringParser::PARSE_SUCCESS || requested_timeout < 0) { |
| return Status( |
| Substitute("Invalid fetch rows timeout: '$0'. " |
| "Only non-negative numbers are allowed.", value)); |
| } |
| query_options->__set_fetch_rows_timeout_ms(requested_timeout); |
| break; |
| } |
| case TImpalaQueryOptions::NOW_STRING: { |
| query_options->__set_now_string(value); |
| break; |
| } |
| case TImpalaQueryOptions::PARQUET_OBJECT_STORE_SPLIT_SIZE: { |
| int64_t parquet_object_store_split_size; |
| RETURN_IF_ERROR(ParseMemValue( |
| value, "parquet object store split size", &parquet_object_store_split_size)); |
| // The MIN_SYNTHETIC_BLOCK_SIZE from HdfsPartition.java. HdfsScanNode.java forces |
| // the block size to be greater than or equal to this value, so reject any |
| // attempt to set PARQUET_OBJECT_STORE_SPLIT_SIZE to a value lower than |
| // MIN_SYNTHETIC_BLOCK_SIZE. |
| int min_synthetic_block_size = 1024 * 1024; |
| if (parquet_object_store_split_size < min_synthetic_block_size) { |
| return Status(Substitute("Invalid parquet object store split size: '$0'. Must " |
| "be greater than or equal to '$1'.", |
| value, min_synthetic_block_size)); |
| } |
| query_options->__set_parquet_object_store_split_size( |
| parquet_object_store_split_size); |
| break; |
| } |
| case TImpalaQueryOptions::MEM_LIMIT_EXECUTORS: { |
| // Parse the mem limit spec and validate it. |
| int64_t bytes_limit; |
| RETURN_IF_ERROR( |
| ParseMemValue(value, "query memory limit for executors", &bytes_limit)); |
| query_options->__set_mem_limit_executors(bytes_limit); |
| break; |
| } |
| case TImpalaQueryOptions::BROADCAST_BYTES_LIMIT: { |
| // Parse the broadcast_bytes limit and validate it |
| int64_t broadcast_bytes_limit; |
| RETURN_IF_ERROR( |
| ParseMemValue(value, "broadcast bytes limit for join operations", |
| &broadcast_bytes_limit)); |
| query_options->__set_broadcast_bytes_limit(broadcast_bytes_limit); |
| break; |
| } |
| default: |
| if (IsRemovedQueryOption(key)) { |
| LOG(WARNING) << "Ignoring attempt to set removed query option '" << key << "'"; |
| return Status::OK(); |
| } |
| // We hit this DCHECK(false) if we forgot to add the corresponding entry here |
| // when we add a new query option. |
| LOG(ERROR) << "Missing exec option implementation: " << key; |
| DCHECK(false); |
| break; |
| } |
| if (set_query_options_mask != NULL) { |
| DCHECK_LT(option, set_query_options_mask->size()); |
| set_query_options_mask->set(option); |
| } |
| } |
| return Status::OK(); |
| } |
| |
| Status impala::ParseQueryOptions(const string& options, TQueryOptions* query_options, |
| QueryOptionsMask* set_query_options_mask) { |
| if (options.length() == 0) return Status::OK(); |
| vector<string> kv_pairs; |
| split(kv_pairs, options, is_any_of(","), token_compress_on); |
| // Construct an error status which is used to aggregate errors encountered during |
| // parsing. It is only returned if the number of error details is greater than 0. |
| Status errorStatus = Status::Expected("Errors parsing query options"); |
| for (string& kv_string: kv_pairs) { |
| trim(kv_string); |
| if (kv_string.length() == 0) continue; |
| vector<string> key_value; |
| split(key_value, kv_string, is_any_of("="), token_compress_on); |
| if (key_value.size() != 2) { |
| errorStatus.MergeStatus( |
| Status(Substitute("Invalid configuration option '$0'.", kv_string))); |
| continue; |
| } |
| errorStatus.MergeStatus(SetQueryOption(key_value[0], key_value[1], query_options, |
| set_query_options_mask)); |
| } |
| if (errorStatus.msg().details().size() > 0) return errorStatus; |
| return Status::OK(); |
| } |
| |
| Status impala::ValidateQueryOptions(TQueryOptions* query_options) { |
| // Validate that max_result_spooling_mem <= |
| // max_spilled_result_spooling_mem (a value of 0 means memory is unbounded). |
| int64_t max_mem = query_options->max_result_spooling_mem; |
| int64_t max_spilled_mem = query_options->max_spilled_result_spooling_mem; |
| if (max_mem == 0 && max_spilled_mem != 0) { |
| return Status("If max_result_spooling_mem is set to 0 (unbounded) " |
| "max_spilled_result_spooling_mem must be set to 0 (unbounded) as " |
| "well."); |
| } |
| if (max_spilled_mem != 0 && max_spilled_mem < max_mem) { |
| return Status(Substitute("max_spilled_result_spooling_mem '$0' must be greater than " |
| "max_result_spooling_mem '$1'", |
| max_spilled_mem, max_mem)); |
| } |
| return Status::OK(); |
| } |
| |
// Fills 'query_option_levels' with one entry per query option. Each entry is keyed by the
// option's enum name (the stringized ENUM argument of the QUERY_OPTS_TABLE X-macro) and
// maps to the option's TQueryOptionLevel. Removed options map to
// TQueryOptionLevel::REMOVED.
void impala::PopulateQueryOptionLevels(QueryOptionLevels* query_option_levels)
{
  // QUERY_OPTS_TABLE (defined elsewhere) presumably invokes QUERY_OPT_FN once per live
  // option and REMOVED_QUERY_OPT_FN once per removed option. These local definitions turn
  // each invocation into a map assignment. Both are #undef'd at the end of the function
  // so they do not leak into the rest of the file.
#define QUERY_OPT_FN(NAME, ENUM, LEVEL)\
  {\
    (*query_option_levels)[#ENUM] = LEVEL;\
  }
#define REMOVED_QUERY_OPT_FN(NAME, ENUM)\
  {\
    (*query_option_levels)[#ENUM] = TQueryOptionLevel::REMOVED;\
  }
  QUERY_OPTS_TABLE
  // SUPPORT_START_OVER is registered explicitly with level ADVANCED after the table
  // expansion. NOTE(review): presumably because it is not listed in QUERY_OPTS_TABLE.
  // Confirm against the table definition.
  QUERY_OPT_FN(support_start_over, SUPPORT_START_OVER, TQueryOptionLevel::ADVANCED)
#undef QUERY_OPT_FN
#undef REMOVED_QUERY_OPT_FN
}