// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "exec/union-node.h"
#include "codegen/llvm-codegen.h"
#include "exec/exec-node-util.h"
#include "exprs/scalar-expr-evaluator.h"
#include "exprs/scalar-expr.h"
#include "gen-cpp/PlanNodes_types.h"
#include "runtime/row-batch.h"
#include "runtime/runtime-state.h"
#include "runtime/tuple-row.h"
#include "runtime/tuple.h"
#include "util/runtime-profile-counters.h"
#include "common/names.h"
using namespace impala;
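
// Creates the constant and per-child result expressions from their thrift
// counterparts. Each expression list must produce exactly one value per slot of the
// union's output tuple, which the DCHECKs below verify.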
Status UnionPlanNode::Init(const TPlanNode& tnode, RuntimeState* state) {
  RETURN_IF_ERROR(PlanNode::Init(tnode, state));
  DCHECK(tnode.__isset.union_node);
  DCHECK_EQ(conjuncts_.size(), 0);
  const TupleDescriptor* tuple_desc =
      state->desc_tbl().GetTupleDescriptor(tnode.union_node.tuple_id);
  DCHECK(tuple_desc != nullptr);

  // Create const_exprs_lists_ from thrift exprs.
  const vector<vector<TExpr>>& const_texpr_lists = tnode.union_node.const_expr_lists;
  for (const vector<TExpr>& texprs : const_texpr_lists) {
    vector<ScalarExpr*> const_exprs;
    RETURN_IF_ERROR(ScalarExpr::Create(texprs, *row_descriptor_, state, &const_exprs));
    DCHECK_EQ(const_exprs.size(), tuple_desc->slots().size());
    const_exprs_lists_.push_back(const_exprs);
  }
  // Create child_exprs_lists_ from thrift exprs.
  const vector<vector<TExpr>>& thrift_result_exprs = tnode.union_node.result_expr_lists;
  for (int i = 0; i < thrift_result_exprs.size(); ++i) {
    const vector<TExpr>& texprs = thrift_result_exprs[i];
    vector<ScalarExpr*> child_exprs;
    RETURN_IF_ERROR(
        ScalarExpr::Create(texprs, *children_[i]->row_descriptor_, state, &child_exprs));
    child_exprs_lists_.push_back(child_exprs);
    DCHECK_EQ(child_exprs.size(), tuple_desc->slots().size());
  }
  return Status::OK();
}
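
// Constructs the UnionNode exec node for this plan node. The node is allocated from
// the runtime state's object pool, which owns it.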
Status UnionPlanNode::CreateExecNode(RuntimeState* state, ExecNode** node) const {
  ObjectPool* pool = state->obj_pool();
  *node = pool->Add(new UnionNode(pool, *this, state->desc_tbl()));
  return Status::OK();
}

UnionNode::UnionNode(
    ObjectPool* pool, const UnionPlanNode& pnode, const DescriptorTbl& descs)
  : ExecNode(pool, pnode, descs),
    tuple_id_(pnode.tnode_->union_node.tuple_id),
    tuple_desc_(descs.GetTupleDescriptor(tuple_id_)),
    first_materialized_child_idx_(pnode.tnode_->union_node.first_materialized_child_idx),
    child_idx_(0),
    child_batch_(nullptr),
    child_row_idx_(0),
    child_eos_(false),
    const_exprs_lists_idx_(0),
    to_close_child_idx_(-1) {
  const_exprs_lists_ = pnode.const_exprs_lists_;
  child_exprs_lists_ = pnode.child_exprs_lists_;
}
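
// Creates evaluators for both the constant expression lists and the per-child result
// expression lists. The vector of codegen'd materialization functions is sized here
// and left null; entries are filled in later by Codegen() where codegen succeeds.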
Status UnionNode::Prepare(RuntimeState* state) {
  SCOPED_TIMER(runtime_profile_->total_time_counter());
  RETURN_IF_ERROR(ExecNode::Prepare(state));
  DCHECK(tuple_desc_ != nullptr);
  codegend_union_materialize_batch_fns_.resize(child_exprs_lists_.size());

  // Prepare const expr lists.
  for (const vector<ScalarExpr*>& const_exprs : const_exprs_lists_) {
    vector<ScalarExprEvaluator*> const_expr_evals;
    RETURN_IF_ERROR(ScalarExprEvaluator::Create(const_exprs, state, pool_,
        expr_perm_pool(), expr_results_pool(), &const_expr_evals));
    const_expr_evals_lists_.push_back(const_expr_evals);
  }
  // Prepare result expr lists.
  for (const vector<ScalarExpr*>& child_exprs : child_exprs_lists_) {
    vector<ScalarExprEvaluator*> child_expr_evals;
    RETURN_IF_ERROR(ScalarExprEvaluator::Create(child_exprs, state, pool_,
        expr_perm_pool(), expr_results_pool(), &child_expr_evals));
    child_expr_evals_lists_.push_back(child_expr_evals);
  }
  return Status::OK();
}
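
// Codegens a specialized materialization loop for every child that is not passed
// through: a materialization function is generated for the child's exprs and
// substituted into a copy of the cross-compiled UNION_MATERIALIZE_BATCH function.
// Passthrough children skip materialization entirely, so no function is generated for
// them and their entry in 'codegend_union_materialize_batch_fns_' stays null.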
void UnionNode::Codegen(RuntimeState* state) {
  DCHECK(state->ShouldCodegen());
  ExecNode::Codegen(state);
  if (IsNodeCodegenDisabled()) return;

  LlvmCodeGen* codegen = state->codegen();
  DCHECK(codegen != nullptr);
  std::stringstream codegen_message;
  Status codegen_status;
  for (int i = 0; i < child_exprs_lists_.size(); ++i) {
    if (IsChildPassthrough(i)) continue;

    llvm::Function* tuple_materialize_exprs_fn;
    codegen_status = Tuple::CodegenMaterializeExprs(codegen, false, *tuple_desc_,
        child_exprs_lists_[i], true, &tuple_materialize_exprs_fn);
    if (!codegen_status.ok()) {
      // Codegen may fail in some corner cases. If this happens, abort codegen for this
      // and the remaining children.
      codegen_message << "Codegen failed for child: " << children_[i]->id();
      break;
    }

    // Get a copy of the function. This function will be modified and added to the
    // vector of functions.
    llvm::Function* union_materialize_batch_fn =
        codegen->GetFunction(IRFunction::UNION_MATERIALIZE_BATCH, true);
    DCHECK(union_materialize_batch_fn != nullptr);

    int replaced = codegen->ReplaceCallSites(union_materialize_batch_fn,
        tuple_materialize_exprs_fn, Tuple::MATERIALIZE_EXPRS_SYMBOL);
    DCHECK_REPLACE_COUNT(replaced, 1) << LlvmCodeGen::Print(union_materialize_batch_fn);

    union_materialize_batch_fn = codegen->FinalizeFunction(union_materialize_batch_fn);
    DCHECK(union_materialize_batch_fn != nullptr);

    // Add the function to Jit and to the vector of codegened functions.
    codegen->AddFunctionToJit(union_materialize_batch_fn,
        reinterpret_cast<void**>(&(codegend_union_materialize_batch_fns_.data()[i])));
  }
  runtime_profile()->AddCodegenMsg(
      codegen_status.ok(), codegen_status, codegen_message.str());
}
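
// Opens the expression evaluators, then opens the first child so that rows are
// available to fetch once Open() succeeds. The remaining children are opened lazily
// in GetNext() as the previous child reaches eos.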
Status UnionNode::Open(RuntimeState* state) {
  SCOPED_TIMER(runtime_profile_->total_time_counter());
  ScopedOpenEventAdder ea(this);
  RETURN_IF_ERROR(ExecNode::Open(state));

  // Open const expr lists.
  for (const vector<ScalarExprEvaluator*>& evals : const_expr_evals_lists_) {
    RETURN_IF_ERROR(ScalarExprEvaluator::Open(evals, state));
  }
  // Open result expr lists.
  for (const vector<ScalarExprEvaluator*>& evals : child_expr_evals_lists_) {
    RETURN_IF_ERROR(ScalarExprEvaluator::Open(evals, state));
  }

  // Ensures that rows are available for clients to fetch after this Open() has
  // succeeded.
  if (!children_.empty()) RETURN_IF_ERROR(child(child_idx_)->Open(state));
  return Status::OK();
}
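
// Forwards a batch from the current passthrough child directly into 'row_batch'
// without copying rows or evaluating any exprs. This applies when the child's row
// layout matches the union's output layout (checked by the LayoutEquals() DCHECK
// below), e.g. for a hypothetical query like
//   SELECT c1, c2 FROM t1 UNION ALL SELECT c1, c2 FROM t2
// where a child's output can be forwarded as-is.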
Status UnionNode::GetNextPassThrough(RuntimeState* state, RowBatch* row_batch) {
  DCHECK(!ReachedLimit());
  DCHECK(!IsInSubplan());
  DCHECK_LT(child_idx_, children_.size());
  DCHECK(IsChildPassthrough(child_idx_));
  DCHECK(child(child_idx_)->row_desc()->LayoutEquals(*row_batch->row_desc()));
  if (child_eos_) RETURN_IF_ERROR(child(child_idx_)->Open(state));
  DCHECK_EQ(row_batch->num_rows(), 0);
  RETURN_IF_ERROR(child(child_idx_)->GetNext(state, row_batch, &child_eos_));
  if (child_eos_) {
    // Even though the child is at eos, it's not OK to Close() it here. Once we close
    // the child, the row batches that it produced are invalid. Marking the batch as
    // needing a deep copy lets us safely close the child in the next GetNext() call.
    // TODO: Remove this as part of IMPALA-4179.
    row_batch->MarkNeedsDeepCopy();
    to_close_child_idx_ = child_idx_;
    ++child_idx_;
  }
  return Status::OK();
}
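
// Fills 'row_batch' by evaluating the current child's result exprs over its input rows
// and materializing the results into the union's output tuple. Uses the codegen'd
// batch materialization function for the child if one was generated, otherwise falls
// back to the interpreted MaterializeBatch().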
Status UnionNode::GetNextMaterialized(RuntimeState* state, RowBatch* row_batch) {
  // Fetch from children, evaluate corresponding exprs and materialize.
  DCHECK(!ReachedLimit());
  DCHECK_LT(child_idx_, children_.size());
  int64_t tuple_buf_size;
  uint8_t* tuple_buf;
  RETURN_IF_ERROR(
      row_batch->ResizeAndAllocateTupleBuffer(state, &tuple_buf_size, &tuple_buf));
  memset(tuple_buf, 0, tuple_buf_size);

  while (HasMoreMaterialized() && !row_batch->AtCapacity()) {
    // The loop runs until we are either done iterating over the children that require
    // materialization, or the row batch is at capacity.
    DCHECK(!IsChildPassthrough(child_idx_));
    // Child row batch was either never set or we're moving on to a different child.
    if (child_batch_.get() == nullptr) {
      DCHECK_LT(child_idx_, children_.size());
      child_batch_.reset(new RowBatch(
          child(child_idx_)->row_desc(), state->batch_size(), mem_tracker()));
      child_row_idx_ = 0;
      // Open the current child unless it's the first child, which was already opened in
      // UnionNode::Open().
      if (child_eos_) RETURN_IF_ERROR(child(child_idx_)->Open(state));
      // The first batch from each child is always fetched here.
      RETURN_IF_ERROR(child(child_idx_)->GetNext(
          state, child_batch_.get(), &child_eos_));
    }

    while (!row_batch->AtCapacity()) {
      DCHECK(child_batch_.get() != nullptr);
      DCHECK_LE(child_row_idx_, child_batch_->num_rows());
      if (child_row_idx_ == child_batch_->num_rows()) {
        // Move on to the next child if it is at eos.
        if (child_eos_) break;
        // Fetch more rows from the child.
        child_batch_->Reset();
        child_row_idx_ = 0;
        // All batches except the first batch from each child are fetched here.
        RETURN_IF_ERROR(child(child_idx_)->GetNext(
            state, child_batch_.get(), &child_eos_));
        // If we fetched an empty batch, go back to the beginning of this while loop,
        // and try again.
        if (child_batch_->num_rows() == 0) continue;
      }
      DCHECK_EQ(codegend_union_materialize_batch_fns_.size(), children_.size());
      if (codegend_union_materialize_batch_fns_[child_idx_] == nullptr) {
        MaterializeBatch(row_batch, &tuple_buf);
      } else {
        codegend_union_materialize_batch_fns_[child_idx_](this, row_batch, &tuple_buf);
      }
    }
    // It shouldn't be the case that we reached the limit because we shouldn't have
    // incremented 'num_rows_returned_' yet.
    DCHECK(!ReachedLimit());

    if (child_eos_ && child_row_idx_ == child_batch_->num_rows()) {
      // Unless we are inside a subplan expecting to call Open()/GetNext() on the child
      // again, the child can be closed at this point.
      child_batch_.reset();
      if (!IsInSubplan()) child(child_idx_)->Close(state);
      ++child_idx_;
    } else {
      // If we haven't finished consuming rows from the current child, we must have
      // ended up here because the row batch is at capacity.
      DCHECK(row_batch->AtCapacity());
    }
  }
  DCHECK_LE(child_idx_, children_.size());
  return Status::OK();
}
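
// Materializes the constant expression lists, one output row per list. These come
// from union operands that read no table data, e.g. a hypothetical
//   SELECT 1, 'a' UNION ALL SELECT 2, 'b'
// Only the first fragment instance produces them (enforced by the DCHECK below,
// except inside a subplan), so the constant rows are not duplicated across instances.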
Status UnionNode::GetNextConst(RuntimeState* state, RowBatch* row_batch) {
  DCHECK(state->instance_ctx().per_fragment_instance_idx == 0 || IsInSubplan());
  DCHECK_LT(const_exprs_lists_idx_, const_expr_evals_lists_.size());
  // Create new tuple buffer for row_batch.
  int64_t tuple_buf_size;
  uint8_t* tuple_buf;
  RETURN_IF_ERROR(
      row_batch->ResizeAndAllocateTupleBuffer(state, &tuple_buf_size, &tuple_buf));
  memset(tuple_buf, 0, tuple_buf_size);

  while (const_exprs_lists_idx_ < const_exprs_lists_.size() && !row_batch->AtCapacity()) {
    MaterializeExprs(
        const_expr_evals_lists_[const_exprs_lists_idx_], nullptr, tuple_buf, row_batch);
    tuple_buf += tuple_desc_->byte_size();
    ++const_exprs_lists_idx_;
  }
  return Status::OK();
}
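
// Delivers rows in three phases: passthrough children first, then children whose rows
// must be materialized, and finally the constant expression lists. Also closes any
// passthrough child left over from the previous call and enforces the node's limit by
// truncating the last batch.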
Status UnionNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) {
  SCOPED_TIMER(runtime_profile_->total_time_counter());
  ScopedGetNextEventAdder ea(this, eos);
  RETURN_IF_ERROR(ExecDebugAction(TExecNodePhase::GETNEXT, state));
  RETURN_IF_CANCELLED(state);
  RETURN_IF_ERROR(QueryMaintenance(state));

  if (to_close_child_idx_ != -1) {
    // The previous child needs to be closed if passthrough was enabled for it. In the
    // non-passthrough case, the child was already closed in the previous call to
    // GetNext().
    DCHECK(IsChildPassthrough(to_close_child_idx_));
    DCHECK(!IsInSubplan());
    child(to_close_child_idx_)->Close(state);
    to_close_child_idx_ = -1;
  }

  // Save the number of rows in case GetNext() is called with a non-empty batch, which
  // can happen in a subplan.
  int num_rows_before = row_batch->num_rows();

  if (HasMorePassthrough()) {
    RETURN_IF_ERROR(GetNextPassThrough(state, row_batch));
  } else if (HasMoreMaterialized()) {
    RETURN_IF_ERROR(GetNextMaterialized(state, row_batch));
  } else if (HasMoreConst(state)) {
    RETURN_IF_ERROR(GetNextConst(state, row_batch));
  }

  int num_rows_added = row_batch->num_rows() - num_rows_before;
  DCHECK_GE(num_rows_added, 0);
  if (limit_ != -1 && rows_returned() + num_rows_added > limit_) {
    // Truncate the row batch if we went over the limit.
    num_rows_added = limit_ - rows_returned();
    row_batch->set_num_rows(num_rows_before + num_rows_added);
    DCHECK_GE(num_rows_added, 0);
  }
  IncrementNumRowsReturned(num_rows_added);

  *eos = ReachedLimit() ||
      (!HasMorePassthrough() && !HasMoreMaterialized() && !HasMoreConst(state));
  COUNTER_SET(rows_returned_counter_, rows_returned());
  return Status::OK();
}
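
// Rewinds the node to its state before any rows were fetched so a containing subplan
// can re-execute it. Expression evaluators stay open; only the iteration state over
// the children and the constant expr lists is reset.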
Status UnionNode::Reset(RuntimeState* state, RowBatch* row_batch) {
  child_idx_ = 0;
  child_batch_.reset();
  child_row_idx_ = 0;
  child_eos_ = false;
  const_exprs_lists_idx_ = 0;
  // Since passthrough is disabled in subplans, verify that there is no passthrough
  // child that needs to be closed.
  DCHECK_EQ(to_close_child_idx_, -1);
  return ExecNode::Reset(state, row_batch);
}
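
// Releases the child batch, closes all expression evaluators and exprs, then closes
// the children via ExecNode::Close(). Safe to call more than once.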
void UnionNode::Close(RuntimeState* state) {
  if (is_closed()) return;
  child_batch_.reset();
  for (const vector<ScalarExprEvaluator*>& evals : const_expr_evals_lists_) {
    ScalarExprEvaluator::Close(evals, state);
  }
  for (const vector<ScalarExprEvaluator*>& evals : child_expr_evals_lists_) {
    ScalarExprEvaluator::Close(evals, state);
  }
  for (const vector<ScalarExpr*>& const_exprs : const_exprs_lists_) {
    ScalarExpr::Close(const_exprs);
  }
  for (const vector<ScalarExpr*>& child_exprs : child_exprs_lists_) {
    ScalarExpr::Close(child_exprs);
  }
  ExecNode::Close(state);
}