| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "runtime/fold_constant_executor.h" |
| |
| #include <fmt/format.h> |
| #include <gen_cpp/Descriptors_types.h> |
| #include <gen_cpp/Exprs_types.h> |
| #include <gen_cpp/PaloInternalService_types.h> |
| #include <gen_cpp/internal_service.pb.h> |
| #include <gen_cpp/types.pb.h> |
| #include <glog/logging.h> |
| #include <stdint.h> |
| |
#include <algorithm>
#include <boost/iterator/iterator_facade.hpp>
#include <map>
#include <ostream>
#include <string>
#include <unordered_set>
#include <utility>
| |
| #include "common/compiler_util.h" // IWYU pragma: keep |
| #include "common/signal_handler.h" |
| #include "common/status.h" |
| #include "runtime/decimalv2_value.h" |
| #include "runtime/define_primitive_type.h" |
| #include "runtime/descriptors.h" |
| #include "runtime/exec_env.h" |
| #include "runtime/large_int_value.h" |
| #include "runtime/memory/mem_tracker.h" |
| #include "runtime/primitive_type.h" |
| #include "runtime/runtime_state.h" |
| #include "runtime/thread_context.h" |
| #include "util/binary_cast.hpp" |
| #include "util/defer_op.h" |
| #include "util/runtime_profile.h" |
| #include "util/uid_util.h" |
| #include "vec/columns/column.h" |
| #include "vec/columns/column_array.h" |
| #include "vec/columns/column_vector.h" |
| #include "vec/common/string_ref.h" |
| #include "vec/core/block.h" |
| #include "vec/core/column_with_type_and_name.h" |
| #include "vec/core/field.h" |
| #include "vec/core/types.h" |
| #include "vec/data_types/data_type_array.h" |
| #include "vec/data_types/data_type_number.h" |
| #include "vec/exprs/vexpr.h" |
| #include "vec/exprs/vexpr_context.h" |
| #include "vec/functions/cast/cast_to_string.h" |
| #include "vec/runtime/timestamptz_value.h" |
| #include "vec/runtime/vdatetime_value.h" |
| |
| namespace doris { |
| |
| static std::unordered_set<PrimitiveType> PRIMITIVE_TYPE_SET { |
| TYPE_BOOLEAN, TYPE_TINYINT, TYPE_SMALLINT, TYPE_INT, TYPE_BIGINT, |
| TYPE_LARGEINT, TYPE_FLOAT, TYPE_DOUBLE, TYPE_TIMEV2, TYPE_CHAR, |
| TYPE_VARCHAR, TYPE_STRING, TYPE_HLL, TYPE_BITMAP, TYPE_DATE, |
| TYPE_DATETIME, TYPE_DATEV2, TYPE_DATETIMEV2, TYPE_DECIMALV2, TYPE_TIMESTAMPTZ}; |
| |
| Status FoldConstantExecutor::fold_constant_vexpr(const TFoldConstantParams& params, |
| PConstantExprResult* response) { |
| const auto& expr_map = params.expr_map; |
| auto* expr_result_map = response->mutable_expr_result_map(); |
| |
| TQueryGlobals query_globals = params.query_globals; |
| _query_id = params.query_id; |
| LOG(INFO) << "fold_query_id: " << print_id(_query_id); |
| // init |
| RETURN_IF_ERROR(_init(query_globals, params.query_options)); |
| // only after init operation, _mem_tracker is ready |
| SCOPED_ATTACH_TASK(_mem_tracker); |
| signal::SignalTaskIdKeeper keeper(_query_id); |
| |
| vectorized::DataTypeSerDe::FormatOptions format_options = |
| vectorized::DataTypeSerDe::get_default_format_options(); |
| format_options.timezone = &_runtime_state->timezone_obj(); |
| |
| for (const auto& m : expr_map) { |
| PExprResultMap pexpr_result_map; |
| for (const auto& n : m.second) { |
| vectorized::VExprContextSPtr ctx; |
| const TExpr& texpr = n.second; |
| // create expr tree from TExpr |
| RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(texpr, ctx)); |
| // prepare and open context |
| RETURN_IF_ERROR(_prepare_and_open(ctx.get())); |
| |
| vectorized::ColumnWithTypeAndName tmp_data; |
| // calc vexpr |
| RETURN_IF_ERROR(ctx->execute_const_expr(tmp_data)); |
| // covert to thrift type |
| const auto& res_type = ctx->root()->data_type(); |
| TPrimitiveType::type t_type = doris::to_thrift(res_type->get_primitive_type()); |
| // collect result |
| PExprResult expr_result; |
| std::string result; |
| const auto& column_ptr = tmp_data.column; |
| const auto& column_type = tmp_data.type; |
| // 4 from fe: Config.be_exec_version maybe need remove after next version, now in 2.1 |
| if (_runtime_state->be_exec_version() >= 4 && params.__isset.is_nereids && |
| params.is_nereids) { |
| auto* p_type_desc = expr_result.mutable_type_desc(); |
| auto* p_values = expr_result.mutable_result_content(); |
| res_type->to_protobuf(p_type_desc); |
| auto datatype_serde = column_type->get_serde(); |
| RETURN_IF_ERROR(datatype_serde->write_column_to_pb( |
| *column_ptr->convert_to_full_column_if_const(), *p_values, 0, 1)); |
| expr_result.set_success(true); |
| // after refactor, this field is useless, but it's required |
| expr_result.set_content("ERROR"); |
| expr_result.mutable_type()->set_type(t_type); |
| pexpr_result_map.mutable_map()->insert({n.first, expr_result}); |
| } else { |
| if (column_ptr->is_null_at(0)) { |
| expr_result.set_success(false); |
| } else { |
| expr_result.set_success(true); |
| StringRef string_ref; |
| auto type = ctx->root()->data_type()->get_primitive_type(); |
| //eg: strcut, array, map VARIANT... will not impl get_data_at, so could use column->to_string() |
| if (PRIMITIVE_TYPE_SET.contains(type)) { |
| string_ref = column_ptr->get_data_at(0); |
| } |
| RETURN_IF_ERROR(_get_result((void*)string_ref.data, string_ref.size, |
| ctx->root()->data_type(), column_ptr, column_type, |
| result, format_options)); |
| } |
| expr_result.set_content(std::move(result)); |
| expr_result.mutable_type()->set_type(t_type); |
| expr_result.mutable_type()->set_scale(res_type->get_scale()); |
| expr_result.mutable_type()->set_precision(res_type->get_precision()); |
| expr_result.mutable_type()->set_len( |
| res_type->get_primitive_type() == PrimitiveType::TYPE_STRING |
| ? assert_cast<const vectorized::DataTypeString*>( |
| vectorized::remove_nullable(res_type).get()) |
| ->len() |
| : -1); |
| pexpr_result_map.mutable_map()->insert({n.first, expr_result}); |
| } |
| } |
| expr_result_map->insert({m.first, pexpr_result_map}); |
| } |
| return Status::OK(); |
| } |
| |
| Status FoldConstantExecutor::_init(const TQueryGlobals& query_globals, |
| const TQueryOptions& query_options) { |
| // init runtime state, runtime profile |
| TPlanFragmentExecParams params; |
| params.fragment_instance_id = _query_id; |
| params.query_id = _query_id; |
| _mem_tracker = MemTrackerLimiter::create_shared( |
| MemTrackerLimiter::Type::OTHER, |
| fmt::format("FoldConstant:query_id={}", print_id(_query_id))); |
| _runtime_state = RuntimeState::create_unique(params, query_options, query_globals, |
| ExecEnv::GetInstance(), nullptr, _mem_tracker); |
| DescriptorTbl* desc_tbl = nullptr; |
| Status status = |
| DescriptorTbl::create(_runtime_state->obj_pool(), TDescriptorTable(), &desc_tbl); |
| if (UNLIKELY(!status.ok())) { |
| LOG(WARNING) << "Failed to create descriptor table, msg: " << status; |
| return status; |
| } |
| _runtime_state->set_desc_tbl(desc_tbl); |
| |
| _runtime_profile = _runtime_state->runtime_profile(); |
| _runtime_profile->set_name("FoldConstantExpr"); |
| |
| return Status::OK(); |
| } |
| |
| template <typename Context> |
| Status FoldConstantExecutor::_prepare_and_open(Context* ctx) { |
| RETURN_IF_ERROR(ctx->prepare(_runtime_state.get(), RowDescriptor())); |
| return ctx->open(_runtime_state.get()); |
| } |
| |
| Status FoldConstantExecutor::_get_result(void* src, size_t size, |
| const vectorized::DataTypePtr& type, |
| const vectorized::ColumnPtr column_ptr, |
| const vectorized::DataTypePtr column_type, |
| std::string& result, |
| const vectorized::DataTypeSerDe::FormatOptions& options) { |
| switch (type->get_primitive_type()) { |
| case TYPE_BOOLEAN: { |
| bool val = *reinterpret_cast<const bool*>(src); |
| result = val ? "true" : "false"; |
| break; |
| } |
| case TYPE_TINYINT: { |
| int8_t val = *reinterpret_cast<const int8_t*>(src); |
| result = fmt::format(FMT_COMPILE("{}"), val); |
| break; |
| } |
| case TYPE_SMALLINT: { |
| int16_t val = *reinterpret_cast<const int16_t*>(src); |
| result = fmt::format(FMT_COMPILE("{}"), val); |
| break; |
| } |
| case TYPE_INT: { |
| int32_t val = *reinterpret_cast<const int32_t*>(src); |
| result = fmt::format(FMT_COMPILE("{}"), val); |
| break; |
| } |
| case TYPE_BIGINT: { |
| int64_t val = *reinterpret_cast<const int64_t*>(src); |
| result = fmt::format(FMT_COMPILE("{}"), val); |
| break; |
| } |
| case TYPE_LARGEINT: { |
| result = LargeIntValue::to_string(*reinterpret_cast<__int128*>(src)); |
| break; |
| } |
| case TYPE_FLOAT: { |
| float val = *reinterpret_cast<const float*>(src); |
| result = fmt::format(FMT_COMPILE("{}"), val); |
| break; |
| } |
| case TYPE_DOUBLE: { |
| double val = *reinterpret_cast<double*>(src); |
| result = fmt::format(FMT_COMPILE("{}"), val); |
| break; |
| } |
| case TYPE_TIMEV2: { |
| constexpr static auto ratio_to_time = (1000 * 1000); |
| double val = *reinterpret_cast<double*>(src); |
| result = fmt::format(FMT_COMPILE("{}"), val / ratio_to_time); |
| break; |
| } |
| case TYPE_CHAR: |
| case TYPE_VARCHAR: |
| case TYPE_STRING: |
| case TYPE_HLL: |
| case TYPE_BITMAP: { |
| result = std::string((char*)src, size); |
| break; |
| } |
| case TYPE_DATE: |
| case TYPE_DATETIME: { |
| auto* date_value = reinterpret_cast<VecDateTimeValue*>(src); |
| result = vectorized::CastToString::from_date_or_datetime(*date_value); |
| break; |
| } |
| case TYPE_DATEV2: { |
| DateV2Value<DateV2ValueType> value = |
| binary_cast<uint32_t, DateV2Value<DateV2ValueType>>(*(int32_t*)src); |
| result = vectorized::CastToString::from_datev2(value); |
| break; |
| } |
| case TYPE_DATETIMEV2: { |
| DateV2Value<DateTimeV2ValueType> value = |
| binary_cast<uint64_t, DateV2Value<DateTimeV2ValueType>>(*(int64_t*)src); |
| result = vectorized::CastToString::from_datetimev2(value, type->get_scale()); |
| break; |
| } |
| case TYPE_TIMESTAMPTZ: { |
| auto value = binary_cast<uint64_t, TimestampTzValue>(*(int64_t*)src); |
| result = vectorized::CastToString::from_timestamptz(value, type->get_scale(), |
| options.timezone); |
| break; |
| } |
| case TYPE_DECIMALV2: { |
| result = reinterpret_cast<DecimalV2Value*>(src)->to_string(type->get_scale()); |
| break; |
| } |
| case TYPE_DECIMAL32: |
| case TYPE_DECIMAL64: |
| case TYPE_DECIMAL128I: |
| case TYPE_DECIMAL256: |
| case TYPE_ARRAY: |
| case TYPE_JSONB: |
| case TYPE_MAP: |
| case TYPE_STRUCT: |
| case TYPE_VARIANT: |
| case TYPE_QUANTILE_STATE: |
| case TYPE_IPV4: |
| case TYPE_IPV6: { |
| result = column_type->to_string(*column_ptr, 0, options); |
| break; |
| } |
| default: |
| auto error_msg = |
| fmt::format("Type not implemented:{} need check it, and exec_query_id is: {}.", |
| type->get_name(), query_id_string()); |
| return Status::InternalError(error_msg); |
| } |
| return Status::OK(); |
| } |
| |
| } // namespace doris |