blob: 3e248d0c30489fc548eff3218b980537e22d537b [file]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "exec/unpivot-node.h"
#include "common/status.h"
#include "exec/exec-node.inline.h"
#include "exprs/scalar-expr-evaluator.h"
#include "exprs/scalar-expr.h"
#include "runtime/fragment-state.h"
#include "runtime/raw-value.h"
#include "runtime/row-batch.h"
#include "runtime/runtime-state.h"
#include "runtime/tuple-row.h"
using std::make_unique;
using std::vector;
namespace impala {
// Initializes this plan node from its thrift description. Looks up the single output
// tuple descriptor and creates the source expressions plus, when the corresponding
// slot ids are set in the thrift node, the data and header expressions. All
// expressions are created against the row descriptor of the first child.
Status UnpivotPlanNode::Init(const TPlanNode& tnode, FragmentState* state) {
  DCHECK(tnode.__isset.unpivot_node);
  RETURN_IF_ERROR(PlanNode::Init(tnode, state));

  // Exactly one output tuple is expected.
  DCHECK(tnode.row_tuples.size() == 1);
  tuple_desc_ = state->desc_tbl().GetTupleDescriptor(tnode.row_tuples[0]);

  const auto& unpivot = tnode.unpivot_node;
  const auto& input_row_desc = *(children_[0]->row_descriptor_);

  RETURN_IF_ERROR(
      ScalarExpr::Create(unpivot.source_exprs, input_row_desc, state, &source_exprs_));
  // There is one source expression per slot of the output tuple.
  DCHECK_EQ(tuple_desc_->slots().size(), source_exprs_.size());
  num_unpivot_columns_ = unpivot.num_unpivot_columns;

  // The data slot and its expressions are optional, but must be set together.
  if (unpivot.__isset.data_slot_id) {
    data_slot_id_ = unpivot.data_slot_id;
    DCHECK(unpivot.__isset.data_exprs);
    RETURN_IF_ERROR(
        ScalarExpr::Create(unpivot.data_exprs, input_row_desc, state, &data_exprs_));
  }

  // Likewise for the header slot and its expressions.
  if (unpivot.__isset.header_slot_id) {
    header_slot_id_ = unpivot.header_slot_id;
    DCHECK(unpivot.__isset.header_exprs);
    RETURN_IF_ERROR(
        ScalarExpr::Create(unpivot.header_exprs, input_row_desc, state, &header_exprs_));
  }
  return Status::OK();
}
// Closes every source, data and header expression owned by this plan node, then
// closes the base PlanNode.
void UnpivotPlanNode::Close() {
  const auto close_exprs = [](const auto& exprs) {
    for (auto* e : exprs) e->Close();
  };
  close_exprs(source_exprs_);
  close_exprs(data_exprs_);
  close_exprs(header_exprs_);
  PlanNode::Close();
}
// Creates the UnpivotNode for this plan node, registers it with the runtime state's
// object pool and returns it through 'node'.
Status UnpivotPlanNode::CreateExecNode(RuntimeState* state, ExecNode** node) const {
  ObjectPool* const obj_pool = state->obj_pool();
  auto* const exec_node = new UnpivotNode(obj_pool, *this, state->desc_tbl());
  *node = obj_pool->Add(exec_node);
  return Status::OK();
}
// Constructs the exec node by forwarding the object pool, the plan node and the
// descriptor table to the ExecNode base class; no additional state is set up here.
UnpivotNode::UnpivotNode(
ObjectPool* pool, const UnpivotPlanNode& pnode, const DescriptorTbl& descs)
: ExecNode(pool, pnode, descs) {}
// Prepares the base ExecNode and then creates one evaluator for every source, data
// and header expression of the plan node. Returns the first error encountered.
Status UnpivotNode::Prepare(RuntimeState* state) {
  SCOPED_TIMER(runtime_profile_->total_time_counter());
  RETURN_IF_ERROR(ExecNode::Prepare(state));

  // Creates an evaluator for each expression in 'exprs' and appends it to 'evals',
  // preserving the order of the expressions.
  const auto create_evaluators = [this, state](
                                     const auto& exprs, auto& evals) -> Status {
    for (auto* const expr : exprs) {
      ScalarExprEvaluator* evaluator = nullptr;
      RETURN_IF_ERROR(ScalarExprEvaluator::Create(
          *expr, state, pool_, expr_perm_pool(), expr_results_pool(), &evaluator));
      evals.push_back(evaluator);
    }
    return Status::OK();
  };

  const auto& unpivot_plan = static_cast<const UnpivotPlanNode&>(plan_node_);
  RETURN_IF_ERROR(create_evaluators(unpivot_plan.source_exprs_, source_evals_));
  RETURN_IF_ERROR(create_evaluators(unpivot_plan.data_exprs_, data_evals_));
  RETURN_IF_ERROR(create_evaluators(unpivot_plan.header_exprs_, header_evals_));
  return Status::OK();
}
// Opens the base ExecNode and the child node, then opens the source, data and
// header evaluators in that order. Stops at and returns the first error.
Status UnpivotNode::Open(RuntimeState* state) {
  SCOPED_TIMER(runtime_profile_->total_time_counter());
  RETURN_IF_ERROR(ExecNode::Open(state));
  RETURN_IF_ERROR(child(0)->Open(state));

  const auto open_evaluators = [state](const auto& evals) -> Status {
    for (auto* const evaluator : evals) {
      RETURN_IF_ERROR(evaluator->Open(state));
    }
    return Status::OK();
  };
  RETURN_IF_ERROR(open_evaluators(source_evals_));
  RETURN_IF_ERROR(open_evaluators(data_evals_));
  RETURN_IF_ERROR(open_evaluators(header_evals_));
  return Status::OK();
}
// Fills 'output_tuple' from the current child row (child_row_batch_ at
// child_row_idx_). For every slot of the output tuple descriptor:
//  - the data slot takes the value of the data evaluator at unpivot_slot_idx_,
//  - the header slot takes the value of the header evaluator at unpivot_slot_idx_,
//  - any other slot takes the value of the source evaluator at the slot's own index.
// Values are written with RawValue::Write(), which is handed the tuple data pool of
// 'row_batch'.
void UnpivotNode::MaterializeOutputTuple(Tuple* output_tuple, RowBatch* row_batch) {
  const auto& plan_node = static_cast<const UnpivotPlanNode&>(plan_node_);
  // Fetch the slot vector once instead of on every iteration.
  const auto& slots = plan_node.tuple_desc_->slots();
  auto* const child_row = child_row_batch_->GetRow(child_row_idx_);
  // Use an unsigned index so the comparison with size() does not mix signedness.
  for (size_t slot_idx = 0; slot_idx < slots.size(); ++slot_idx) {
    auto* const output_slot = slots[slot_idx];
    void* src = nullptr;
    if (output_slot->id() == plan_node.data_slot_id_) {
      src = data_evals_[unpivot_slot_idx_]->GetValue(child_row);
    } else if (output_slot->id() == plan_node.header_slot_id_) {
      src = header_evals_[unpivot_slot_idx_]->GetValue(child_row);
    } else {
      src = source_evals_[slot_idx]->GetValue(child_row);
    }
    RawValue::Write(src, output_tuple, output_slot, row_batch->tuple_data_pool());
  }
}
// Like a UnionNode, an UnpivotNode materializes tuples by evaluating expressions.
// Unlike a UnionNode, for each input tuple, an UnpivotNode might produce multiple
// output tuples: one tuple is materialized per unpivot column (num_unpivot_columns_)
// for every child row, and only tuples that pass the conjuncts are committed to
// 'row_batch'.
//
// Sets '*eos' to true once the limit has been reached or the child is exhausted and
// all of its rows have been processed. Returns an error if the query was cancelled,
// if query maintenance or the tuple buffer allocation fails, or if the child's
// GetNext() fails.
Status UnpivotNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) {
  SCOPED_TIMER(runtime_profile_->total_time_counter());
  RETURN_IF_CANCELLED(state);
  RETURN_IF_ERROR(QueryMaintenance(state));
  const auto& plan_node = static_cast<const UnpivotPlanNode&>(plan_node_);
  int64_t tuple_buf_size = -1;
  uint8_t* tuple_buf = nullptr;
  RETURN_IF_ERROR(
      row_batch->ResizeAndAllocateTupleBuffer(state, &tuple_buf_size, &tuple_buf));
  memset(tuple_buf, 0, tuple_buf_size);
  // Points at the next free tuple in 'tuple_buf'; only advanced when a row is
  // committed, so a tuple rejected by the conjuncts is overwritten by the next one.
  auto output_tuple = tuple_buf;
  DCHECK_GT(plan_node.num_unpivot_columns_, 0);
  // Materializes one row in each iteration if *eos is false.
  do {
    // All unpivot columns of the current child row were emitted: move to the next row.
    if (unpivot_slot_idx_ == plan_node.num_unpivot_columns_) {
      ++child_row_idx_;
      unpivot_slot_idx_ = 0;
    }
    // The child batch is missing or fully consumed: (re)create it and refill it from
    // the child unless the child already reported end-of-stream.
    if (child_row_batch_ == nullptr || child_row_idx_ == child_row_batch_->num_rows()) {
      if (child_row_batch_ == nullptr) {
        child_row_batch_ = make_unique<RowBatch>(
            child(0)->row_desc(), state->batch_size(), mem_tracker());
      } else {
        child_row_batch_->Reset();
      }
      if (!child_eos_) {
        RETURN_IF_ERROR(child(0)->GetNext(state, child_row_batch_.get(), &child_eos_));
      }
      child_row_idx_ = 0;
    }
    *eos = ReachedLimit()
        || (child_eos_ && (child_row_idx_ == child_row_batch_->num_rows()));
    if (*eos) {
      return Status::OK();
    }
    // The child returned an empty batch without end-of-stream: fetch another one.
    if (child_row_batch_->num_rows() == 0) {
      continue;
    }
    MaterializeOutputTuple(reinterpret_cast<Tuple*>(output_tuple), row_batch);
    ++unpivot_slot_idx_;
    auto output_row = row_batch->GetRow(row_batch->AddRow());
    output_row->SetTuple(0, reinterpret_cast<Tuple*>(output_tuple));
    if (EvalConjuncts(conjunct_evals_.data(), conjunct_evals_.size(), output_row)) {
      row_batch->CommitLastRow();
      IncrementNumRowsReturned(1);
      // Report end-of-stream as soon as the limit is satisfied. Looping again could
      // pull another batch from the child for no reason, and if the output batch is
      // also full the caller would need an extra GetNext() call just to learn about
      // eos.
      if (ReachedLimit()) {
        *eos = true;
        return Status::OK();
      }
      output_tuple += plan_node.tuple_desc_->byte_size();
    }
    // '*eos' is always false here because every eos path above returns directly.
  } while (!row_batch->AtCapacity());
  return Status::OK();
}
// Returns the node to its initial state: resets the buffered child batch if one
// exists, rewinds the child row and unpivot column cursors, clears the child
// end-of-stream flag and finally delegates to ExecNode::Reset().
Status UnpivotNode::Reset(RuntimeState* state, RowBatch* row_batch) {
  if (child_row_batch_) child_row_batch_->Reset();
  child_eos_ = false;
  unpivot_slot_idx_ = 0;
  child_row_idx_ = 0;
  return ExecNode::Reset(state, row_batch);
}
// Releases the buffered child batch, closes the source, data and header evaluators
// and then closes the base ExecNode. Does nothing if the node is already closed.
void UnpivotNode::Close(RuntimeState* state) {
  if (is_closed()) return;
  child_row_batch_.reset();

  const auto close_evaluators = [state](const auto& evals) {
    for (auto* const evaluator : evals) evaluator->Close(state);
  };
  close_evaluators(source_evals_);
  close_evaluators(data_evals_);
  close_evaluators(header_evals_);

  ExecNode::Close(state);
}
} // namespace impala