blob: ce756273d92018f3eaad44e27b922c3604294e6a [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <gen_cpp/types.pb.h>
#include <utility>
#include "runtime/query_context.h"
#include "runtime/runtime_predicate.h"
#include "runtime/runtime_state.h"
#include "vec/core/column_numbers.h"
#include "vec/data_types/data_type.h"
#include "vec/exprs/vexpr.h"
#include "vec/exprs/vslot_ref.h"
#include "vec/functions/simple_function_factory.h"
#include "vec/utils/util.hpp"
namespace doris::vectorized {
#include "common/compile_check_begin.h"
// only used for dynamic topn filter
class VTopNPred : public VExpr {
ENABLE_FACTORY_CREATOR(VTopNPred);
public:
VTopNPred(const TExprNode& node, int source_node_id, VExprContextSPtr target_ctx)
: VExpr(node),
_source_node_id(source_node_id),
_expr_name(fmt::format("VTopNPred(source_node_id={})", _source_node_id)),
_target_ctx(std::move(target_ctx)) {}
static Status create_vtopn_pred(const TExpr& target_expr, int source_node_id,
vectorized::VExprSPtr& expr) {
vectorized::VExprContextSPtr target_ctx;
RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(target_expr, target_ctx));
TExprNode node;
node.__set_node_type(TExprNodeType::FUNCTION_CALL);
node.__set_type(create_type_desc(PrimitiveType::TYPE_BOOLEAN));
node.__set_is_nullable(target_ctx->root()->is_nullable());
expr = vectorized::VTopNPred::create_shared(node, source_node_id, target_ctx);
expr->add_child(target_ctx->root());
return Status::OK();
}
Status prepare(RuntimeState* state, const RowDescriptor& desc, VExprContext* context) override {
_predicate = &state->get_query_ctx()->get_runtime_predicate(_source_node_id);
RETURN_IF_ERROR_OR_PREPARED(VExpr::prepare(state, desc, context));
ColumnsWithTypeAndName argument_template;
argument_template.emplace_back(nullptr, _children[0]->data_type(),
_children[0]->expr_name());
argument_template.emplace_back(nullptr, _children[0]->data_type(), "topn value");
_function = SimpleFunctionFactory::instance().get_function(
_predicate->is_asc() ? "le" : "ge", argument_template, _data_type,
{.enable_decimal256 = state->enable_decimal256()}, state->be_exec_version());
if (!_function) {
return Status::InternalError("get function failed");
}
return Status::OK();
}
Status execute(VExprContext* context, Block* block, int* result_column_id) override {
if (!_predicate->has_value()) {
block->insert({create_always_true_column(block->rows(), _data_type->is_nullable()),
_data_type, _expr_name});
*result_column_id = block->columns() - 1;
return Status::OK();
}
Field field = _predicate->get_value();
auto column_ptr = _children[0]->data_type()->create_column_const(1, field);
size_t row_size = std::max(block->rows(), column_ptr->size());
int topn_value_id = VExpr::insert_param(
block, {column_ptr, _children[0]->data_type(), _expr_name}, row_size);
int slot_id = -1;
RETURN_IF_ERROR(_children[0]->execute(context, block, &slot_id));
// if error(slot_id == -1), will return.
ColumnNumbers arguments = {static_cast<uint32_t>(slot_id),
static_cast<uint32_t>(topn_value_id)};
uint32_t num_columns_without_result = block->columns();
block->insert({nullptr, _data_type, _expr_name});
RETURN_IF_ERROR(_function->execute(nullptr, *block, arguments, num_columns_without_result,
block->rows(), false));
*result_column_id = num_columns_without_result;
if (is_nullable() && _predicate->nulls_first()) {
// null values ​​are always not filtered
change_null_to_true(block->get_by_position(num_columns_without_result).column);
}
return Status::OK();
}
const std::string& expr_name() const override { return _expr_name; }
private:
int _source_node_id;
std::string _expr_name;
RuntimePredicate* _predicate = nullptr;
FunctionBasePtr _function;
VExprContextSPtr _target_ctx;
};
#include "common/compile_check_end.h"
} // namespace doris::vectorized