| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "exec/cardinality-check-node.h" |
| #include "runtime/row-batch.h" |
| #include "runtime/runtime-state.h" |
| #include "util/runtime-profile-counters.h" |
| #include "gen-cpp/PlanNodes_types.h" |
| |
| #include "common/names.h" |
| |
| namespace impala { |
| |
| CardinalityCheckNode::CardinalityCheckNode( |
| ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs) |
| : ExecNode(pool, tnode, descs), |
| display_statement_(tnode.cardinality_check_node.display_statement) { |
| } |
| |
| Status CardinalityCheckNode::Prepare(RuntimeState* state) { |
| DCHECK(conjuncts_.empty()); |
| DCHECK_EQ(limit_, 1); |
| |
| SCOPED_TIMER(runtime_profile_->total_time_counter()); |
| RETURN_IF_ERROR(ExecNode::Prepare(state)); |
| return Status::OK(); |
| } |
| |
| Status CardinalityCheckNode::Open(RuntimeState* state) { |
| SCOPED_TIMER(runtime_profile_->total_time_counter()); |
| RETURN_IF_ERROR(ExecNode::Open(state)); |
| RETURN_IF_ERROR(child(0)->Open(state)); |
| row_batch_.reset( |
| new RowBatch(row_desc(), 1, mem_tracker())); |
| |
| // Read rows from the child, raise error if there are more rows than one |
| RowBatch child_batch(child(0)->row_desc(), state->batch_size(), mem_tracker()); |
| bool child_eos = false; |
| int rows_collected = 0; |
| do { |
| RETURN_IF_CANCELLED(state); |
| RETURN_IF_ERROR(QueryMaintenance(state)); |
| RETURN_IF_ERROR(child(0)->GetNext(state, &child_batch, &child_eos)); |
| |
| rows_collected += child_batch.num_rows(); |
| if (rows_collected > 1) { |
| return Status(Substitute("Subquery must not return more than one row: $0", |
| display_statement_)); |
| } |
| if (child_batch.num_rows() != 0) child_batch.DeepCopyTo(row_batch_.get()); |
| child_batch.Reset(); |
| } while (!child_eos); |
| |
| DCHECK(rows_collected == 0 || rows_collected == 1); |
| |
| // If we are inside a subplan we can expect a call to Open()/GetNext() |
| // on the child again. |
| if (!IsInSubplan()) child(0)->Close(state); |
| return Status::OK(); |
| } |
| |
| Status CardinalityCheckNode::GetNext(RuntimeState* state, RowBatch* output_row_batch, |
| bool* eos) { |
| SCOPED_TIMER(runtime_profile_->total_time_counter()); |
| RETURN_IF_ERROR(ExecDebugAction(TExecNodePhase::GETNEXT, state)); |
| RETURN_IF_CANCELLED(state); |
| RETURN_IF_ERROR(QueryMaintenance(state)); |
| DCHECK_LE(row_batch_->num_rows(), 1); |
| |
| if (row_batch_->num_rows() == 1) { |
| TupleRow* src_row = row_batch_->GetRow(0); |
| TupleRow* dst_row = output_row_batch->GetRow(output_row_batch->AddRow()); |
| output_row_batch->CopyRow(src_row, dst_row); |
| output_row_batch->CommitLastRow(); |
| row_batch_->TransferResourceOwnership(output_row_batch); |
| num_rows_returned_ = 1; |
| COUNTER_SET(rows_returned_counter_, num_rows_returned_); |
| } |
| *eos = true; |
| row_batch_->Reset(); |
| return Status::OK(); |
| } |
| |
| Status CardinalityCheckNode::Reset(RuntimeState* state) { |
| row_batch_->Reset(); |
| return ExecNode::Reset(state); |
| } |
| |
| void CardinalityCheckNode::Close(RuntimeState* state) { |
| if (is_closed()) return; |
| // Need to call destructor to release resources before calling ExecNode::Close(). |
| row_batch_.reset(); |
| ExecNode::Close(state); |
| } |
| |
| } |