blob: d6502be578b9f8a5ae704196d2e021a516a2c8d0 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "vec/exec/vdata_gen_scan_node.h"
#include <gen_cpp/PlanNodes_types.h>
#include <sstream>
#include "common/logging.h"
#include "common/status.h"
#include "exec/exec_node.h"
#include "exprs/runtime_filter.h"
#include "runtime/descriptors.h"
#include "runtime/query_context.h"
#include "runtime/runtime_state.h"
#include "util/runtime_profile.h"
#include "vec/core/block.h"
#include "vec/exec/data_gen_functions/vdata_gen_function_inf.h"
#include "vec/exec/data_gen_functions/vnumbers_tvf.h"
#include "vec/exprs/vexpr_context.h"
namespace doris {
class ObjectPool;
class TScanRangeParams;
} // namespace doris
namespace doris::vectorized {
VDataGenFunctionScanNode::VDataGenFunctionScanNode(ObjectPool* pool, const TPlanNode& tnode,
const DescriptorTbl& descs)
: ScanNode(pool, tnode, descs),
_is_init(false),
_tuple_id(tnode.data_gen_scan_node.tuple_id),
_tuple_desc(nullptr),
_runtime_filter_descs(tnode.runtime_filters) {
// set _table_func here
switch (tnode.data_gen_scan_node.func_name) {
case TDataGenFunctionName::NUMBERS:
_table_func = std::make_shared<VNumbersTVF>(_tuple_id, _tuple_desc);
break;
default:
LOG(FATAL) << "Unsupported function type";
__builtin_unreachable();
}
}
Status VDataGenFunctionScanNode::prepare(RuntimeState* state) {
VLOG_CRITICAL << "VDataGenFunctionScanNode::Prepare";
if (_is_init) {
return Status::OK();
}
if (nullptr == state) {
return Status::InternalError("input pointer is nullptr.");
}
RETURN_IF_ERROR(ScanNode::prepare(state));
// get tuple desc
_tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id);
_table_func->set_tuple_desc(_tuple_desc);
if (nullptr == _tuple_desc) {
return Status::InternalError("Failed to get tuple descriptor.");
}
// TODO: use runtime filter to filte result block, maybe this node need derive from vscan_node.
for (const auto& filter_desc : _runtime_filter_descs) {
IRuntimeFilter* runtime_filter = nullptr;
RETURN_IF_ERROR(
state->register_consumer_runtime_filter(filter_desc, false, id(), &runtime_filter));
runtime_filter->init_profile(_runtime_profile.get());
}
_is_init = true;
return Status::OK();
}
Status VDataGenFunctionScanNode::open(RuntimeState* state) {
RETURN_IF_ERROR(ExecNode::open(state));
if (nullptr == state) {
return Status::InternalError("input pointer is nullptr.");
}
if (!_is_init) {
return Status::InternalError("used before initialize.");
}
RETURN_IF_CANCELLED(state);
SCOPED_TIMER(_runtime_profile->total_time_counter());
return Status::OK();
}
Status VDataGenFunctionScanNode::get_next(RuntimeState* state, vectorized::Block* block,
bool* eos) {
if (state == nullptr || block == nullptr || eos == nullptr) {
return Status::InternalError("input is NULL pointer");
}
RETURN_IF_CANCELLED(state);
Status res = _table_func->get_next(state, block, eos);
RETURN_IF_ERROR(VExprContext::filter_block(_conjuncts, block, block->columns()));
reached_limit(block, eos);
return res;
}
Status VDataGenFunctionScanNode::close(RuntimeState* state) {
if (is_closed()) {
return Status::OK();
}
static_cast<void>(_table_func->close(state));
SCOPED_TIMER(_runtime_profile->total_time_counter());
return ExecNode::close(state);
}
Status VDataGenFunctionScanNode::set_scan_ranges(RuntimeState* state,
const std::vector<TScanRangeParams>& scan_ranges) {
return _table_func->set_scan_ranges(scan_ranges);
}
} // namespace doris::vectorized