blob: c4d8379039ee71a656c92f51183ebce3d660574c [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "exec/nested-loop-join-node.h"
#include <sstream>
#include <gutil/strings/substitute.h>
#include "common/names.h"
#include "exec/exec-node-util.h"
#include "exprs/scalar-expr-evaluator.h"
#include "exprs/scalar-expr.h"
#include "gen-cpp/PlanNodes_types.h"
#include "runtime/mem-pool.h"
#include "runtime/mem-tracker.h"
#include "runtime/row-batch.h"
#include "runtime/runtime-state.h"
#include "util/bitmap.h"
#include "util/debug-util.h"
#include "util/runtime-profile-counters.h"
using namespace impala;
using namespace strings;
// Initializes the plan node: runs the base-class init, then creates the
// join-conjunct exprs against a row layout that combines both children.
Status NestedLoopJoinPlanNode::Init(const TPlanNode& tnode, RuntimeState* state) {
  RETURN_IF_ERROR(BlockingJoinPlanNode::Init(tnode, state));
  DCHECK(tnode.__isset.nested_loop_join_node);
  // The join conjuncts are evaluated in the context of rows assembled from
  // all inner and outer tuples, so they need a combined row descriptor.
  RowDescriptor combined_row_desc(
      *children_[0]->row_descriptor_, *children_[1]->row_descriptor_);
  RETURN_IF_ERROR(ScalarExpr::Create(tnode.nested_loop_join_node.join_conjuncts,
      combined_row_desc, state, &join_conjuncts_));
  // A cross join must not carry any join conjuncts.
  DCHECK(join_conjuncts_.empty()
      || tnode.nested_loop_join_node.join_op != TJoinOp::CROSS_JOIN)
      << "Join conjuncts in a cross join";
  return Status::OK();
}
// Instantiates the exec node for this plan node in the state's object pool,
// which owns the node for the lifetime of the query.
Status NestedLoopJoinPlanNode::CreateExecNode(
    RuntimeState* state, ExecNode** node) const {
  ObjectPool* obj_pool = state->obj_pool();
  *node = obj_pool->Add(
      new NestedLoopJoinNode(obj_pool, *this, state->desc_tbl()));
  return Status::OK();
}
// Constructor. Build-side pointers and iteration state start out as "no build
// side yet"; the join conjuncts were created by the plan node and are shared.
NestedLoopJoinNode::NestedLoopJoinNode(
ObjectPool* pool, const NestedLoopJoinPlanNode& pnode, const DescriptorTbl& descs)
: BlockingJoinNode("NestedLoopJoinNode", pnode.tnode_->nested_loop_join_node.join_op,
pool, pnode, descs),
build_batches_(NULL),
current_build_row_idx_(0),
process_unmatched_build_rows_(false) {
join_conjuncts_ = pnode.join_conjuncts_;
}
// Destructor. Close() must have been called before destruction.
NestedLoopJoinNode::~NestedLoopJoinNode() {
DCHECK(is_closed());
}
// Opens the join: materializes the build side (with a special fast path for a
// singular row src build inside a subplan), then fetches the first probe row
// and positions the build-row iterator for it.
Status NestedLoopJoinNode::Open(RuntimeState* state) {
SCOPED_TIMER(runtime_profile_->total_time_counter());
ScopedOpenEventAdder ea(this);
RETURN_IF_ERROR(BlockingJoinNode::Open(state));
RETURN_IF_ERROR(ScalarExprEvaluator::Open(join_conjunct_evals_, state));
// Check for errors and free expr result allocations before opening children.
RETURN_IF_CANCELLED(state);
RETURN_IF_ERROR(QueryMaintenance(state));
if (child(1)->type() == TPlanNodeType::type::SINGULAR_ROW_SRC_NODE) {
DCHECK(IsInSubplan());
// When inside a subplan, open the first child before doing the build such that
// UnnestNodes on the probe side are opened and project their unnested collection
// slots. Otherwise, the build might unnecessarily deep-copy those collection slots,
// and this node would return them in GetNext().
// TODO: Remove this special-case behavior for subplans once we have proper
// projection. See UnnestNode for details on the current projection implementation.
RETURN_IF_ERROR(child(0)->Open(state));
RETURN_IF_ERROR(ConstructSingularBuildSide(state));
DCHECK_EQ(builder_->copied_build_batches()->total_num_rows(), 0);
// Nothing was deep-copied, so reference the builder's input batches directly.
build_batches_ = builder_->input_build_batches();
} else {
RETURN_IF_ERROR(BlockingJoinNode::ProcessBuildInputAndOpenProbe(state, builder_));
build_batches_ = builder_->GetFinalBuildBatches();
// Size the match bitmap to the now-known build row count. It is only
// allocated for join modes that track matched build rows; see Prepare().
if (matching_build_rows_ != NULL) {
RETURN_IF_ERROR(ResetMatchingBuildRows(state, build_batches_->total_num_rows()));
}
}
RETURN_IF_ERROR(BlockingJoinNode::GetFirstProbeRow(state));
ResetForProbe();
return Status::OK();
}
// Prepares the join: creates the join-conjunct evaluators, sets up the build
// sink, and allocates the build-row match bitmap for join modes that must
// remember which build rows matched.
Status NestedLoopJoinNode::Prepare(RuntimeState* state) {
SCOPED_TIMER(runtime_profile_->total_time_counter());
RETURN_IF_ERROR(BlockingJoinNode::Prepare(state));
RETURN_IF_ERROR(ScalarExprEvaluator::Create(join_conjuncts_, state,
pool_, expr_perm_pool(), expr_results_pool(), &join_conjunct_evals_));
builder_ = NljBuilder::CreateSink(child(1)->row_desc(), state);
RETURN_IF_ERROR(builder_->Prepare(state, mem_tracker()));
runtime_profile()->PrependChild(builder_->profile());
// For some join modes we need to record the build rows with matches in a bitmap.
if (join_op_ == TJoinOp::RIGHT_ANTI_JOIN || join_op_ == TJoinOp::RIGHT_SEMI_JOIN ||
join_op_ == TJoinOp::RIGHT_OUTER_JOIN || join_op_ == TJoinOp::FULL_OUTER_JOIN) {
if (child(1)->type() == TPlanNodeType::type::SINGULAR_ROW_SRC_NODE) {
// Allocate a fixed-size bitmap with a single element if we have a singular
// row source node as our build child. Memory is charged to the tracker first.
int64_t bitmap_mem_usage = Bitmap::MemUsage(1);
if (!mem_tracker()->TryConsume(bitmap_mem_usage)) {
return mem_tracker()->MemLimitExceeded(state,
"Could not allocate bitmap in nested loop join", bitmap_mem_usage);
}
matching_build_rows_.reset(new Bitmap(1));
} else {
// Allocate empty bitmap to be expanded later, in Open(), once the build
// row count is known.
matching_build_rows_.reset(new Bitmap(0));
}
}
return Status::OK();
}
// Resets the node so it can be re-opened, e.g. for the next subplan iteration.
// Clears per-build and per-probe state, then lets the base class reset the rest.
Status NestedLoopJoinNode::Reset(RuntimeState* state, RowBatch* row_batch) {
  builder_->Reset();
  // Forget the build side; it is rebuilt by the next Open().
  build_batches_ = NULL;
  // Clear probe-side iteration state.
  current_probe_row_ = NULL;
  probe_batch_pos_ = 0;
  matched_probe_ = false;
  process_unmatched_build_rows_ = false;
  return BlockingJoinNode::Reset(state, row_batch);
}
// Releases all resources held by this node. Idempotent: subsequent calls are
// no-ops.
void NestedLoopJoinNode::Close(RuntimeState* state) {
if (is_closed()) return;
ScalarExprEvaluator::Close(join_conjunct_evals_, state);
ScalarExpr::Close(join_conjuncts_);
if (builder_ != NULL) {
// IMPALA-6595: builder must be closed before child.
DCHECK(builder_->is_closed() || !children_[1]->is_closed());
builder_->Close(state);
}
build_batches_ = NULL;
// Release the memory that was charged to the tracker for the match bitmap.
if (matching_build_rows_ != NULL) {
mem_tracker()->Release(matching_build_rows_->MemUsage());
matching_build_rows_.reset();
}
BlockingJoinNode::Close(state);
}
// Builds the single-row build side for the singular-row-src case.
// Optimized path for a common subplan shape with a singular row src node on the build
// side that avoids expensive timers, virtual function calls, and other overhead.
Status NestedLoopJoinNode::ConstructSingularBuildSide(RuntimeState* state) {
DCHECK_EQ(child(1)->type(), TPlanNodeType::type::SINGULAR_ROW_SRC_NODE);
DCHECK(IsInSubplan());
RowBatch* batch = builder_->GetNextEmptyBatch();
bool eos;
// The singular row src must produce exactly one row and hit eos immediately.
RETURN_IF_ERROR(child(1)->GetNext(state, batch, &eos));
DCHECK_EQ(batch->num_rows(), 1);
DCHECK(eos);
DCHECK(!batch->needs_deep_copy());
builder_->AddBuildBatch(batch);
// Clear the one-bit match bitmap for the new build row.
if (matching_build_rows_ != NULL) {
DCHECK_EQ(matching_build_rows_->num_bits(), 1);
matching_build_rows_->SetAllBits(false);
}
return Status::OK();
}
// Prepares 'matching_build_rows_' to track 'num_bits' build rows: clears the
// existing bitmap when it is already large enough, otherwise grows it after
// charging the additional memory to this node's tracker.
Status NestedLoopJoinNode::ResetMatchingBuildRows(RuntimeState* state, int64_t num_bits) {
  if (matching_build_rows_->num_bits() >= num_bits) {
    // Existing bitmap is big enough; just clear all bits.
    matching_build_rows_->SetAllBits(false);
    return Status::OK();
  }
  // Account for the additional memory used by the expanded bitmap.
  int64_t mem_delta = Bitmap::MemUsage(num_bits) - matching_build_rows_->MemUsage();
  if (!mem_tracker()->TryConsume(mem_delta)) {
    return mem_tracker()->MemLimitExceeded(
        state, "Could not expand bitmap in nested loop join", mem_delta);
  }
  matching_build_rows_->Reset(num_bits);
  return Status::OK();
}
// Positions the build-row iterator at the first build row and clears the
// match flag, ready to process a fresh probe row.
void NestedLoopJoinNode::ResetForProbe() {
  DCHECK(build_batches_ != NULL);
  matched_probe_ = false;
  current_build_row_idx_ = 0;
  build_row_iterator_ = build_batches_->Iterator();
}
// Main driver: dispatches to the per-join-mode GetNext* helper, then enforces
// the row limit and, at end of stream, transfers batch resources to the output.
Status NestedLoopJoinNode::GetNext(
RuntimeState* state, RowBatch* output_batch, bool* eos) {
DCHECK(!output_batch->AtCapacity());
SCOPED_TIMER(runtime_profile_->total_time_counter());
ScopedGetNextEventAdder ea(this, eos);
RETURN_IF_ERROR(ExecDebugAction(TExecNodePhase::GETNEXT, state));
RETURN_IF_CANCELLED(state);
RETURN_IF_ERROR(QueryMaintenance(state));
*eos = false;
// Advance to a valid probe row if the previous call left us without one.
if (!HasValidProbeRow()) {
RETURN_IF_ERROR(NextProbeRow(state, output_batch));
// NextProbeRow() may fill the batch by transferring the old probe batch's
// resources into it; if so, skip row production and return the batch.
if (output_batch->AtCapacity()) goto end;
}
switch (join_op_) {
case TJoinOp::INNER_JOIN:
case TJoinOp::CROSS_JOIN:
RETURN_IF_ERROR(GetNextInnerJoin(state, output_batch));
break;
case TJoinOp::LEFT_OUTER_JOIN:
RETURN_IF_ERROR(GetNextLeftOuterJoin(state, output_batch));
break;
case TJoinOp::LEFT_SEMI_JOIN:
RETURN_IF_ERROR(GetNextLeftSemiJoin(state, output_batch));
break;
case TJoinOp::LEFT_ANTI_JOIN:
RETURN_IF_ERROR(GetNextLeftAntiJoin(state, output_batch));
break;
case TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN:
RETURN_IF_ERROR(GetNextNullAwareLeftAntiJoin(state, output_batch));
break;
case TJoinOp::RIGHT_OUTER_JOIN:
RETURN_IF_ERROR(GetNextRightOuterJoin(state, output_batch));
break;
case TJoinOp::RIGHT_SEMI_JOIN:
RETURN_IF_ERROR(GetNextRightSemiJoin(state, output_batch));
break;
case TJoinOp::RIGHT_ANTI_JOIN:
RETURN_IF_ERROR(GetNextRightAntiJoin(state, output_batch));
break;
case TJoinOp::FULL_OUTER_JOIN:
RETURN_IF_ERROR(GetNextFullOuterJoin(state, output_batch));
break;
default:
DCHECK(false) << "Unknown join type: " << join_op_;
}
end:
// Trim any rows emitted past the limit and mark end of stream.
if (ReachedLimit()) {
int64_t extra_rows = rows_returned() - limit_;
DCHECK_GE(extra_rows, 0);
DCHECK_LE(extra_rows, output_batch->num_rows());
output_batch->set_num_rows(output_batch->num_rows() - extra_rows);
SetNumRowsReturned(limit_);
eos_ = true;
}
if (eos_) {
*eos = true;
// Hand off memory still referenced by returned rows to the output batch.
probe_batch_->TransferResourceOwnership(output_batch);
build_batches_->TransferResourceOwnership(output_batch);
}
COUNTER_SET(rows_returned_counter_, rows_returned());
return Status::OK();
}
// Inner/cross join: for each probe row, emit every build row that satisfies
// the conjuncts, until the output batch fills up or input is exhausted.
Status NestedLoopJoinNode::GetNextInnerJoin(
    RuntimeState* state, RowBatch* output_batch) {
  while (!eos_) {
    DCHECK(HasValidProbeRow());
    bool flush_output = false;
    RETURN_IF_ERROR(FindBuildMatches(state, output_batch, &flush_output));
    // The batch is full or the limit was hit; hand it back to the caller.
    if (flush_output) return Status::OK();
    RETURN_IF_ERROR(NextProbeRow(state, output_batch));
    if (output_batch->AtCapacity()) break;
  }
  return Status::OK();
}
// Left outer join: like the inner join, but a probe row with no build match
// is emitted with the build side NULL-extended.
Status NestedLoopJoinNode::GetNextLeftOuterJoin(
    RuntimeState* state, RowBatch* output_batch) {
  while (!eos_) {
    DCHECK(HasValidProbeRow());
    bool flush_output = false;
    RETURN_IF_ERROR(FindBuildMatches(state, output_batch, &flush_output));
    if (flush_output) return Status::OK();
    // Emit the NULL-extended row for a probe row that matched nothing.
    if (!matched_probe_) {
      RETURN_IF_ERROR(ProcessUnmatchedProbeRow(state, output_batch));
    }
    RETURN_IF_ERROR(NextProbeRow(state, output_batch));
    if (output_batch->AtCapacity()) break;
  }
  return Status::OK();
}
// Left semi join: emits each probe row at most once, as soon as a single
// build row satisfies the join conjuncts.
Status NestedLoopJoinNode::GetNextLeftSemiJoin(RuntimeState* state,
RowBatch* output_batch) {
ScalarExprEvaluator* const* join_conjunct_evals = join_conjunct_evals_.data();
size_t num_join_conjuncts = join_conjuncts_.size();
DCHECK_EQ(num_join_conjuncts, join_conjunct_evals_.size());
// N is a power of two so (idx & (N - 1)) == 0 fires once every N build rows.
const int N = BitUtil::RoundUpToPowerOfTwo(state->batch_size());
while (!eos_) {
DCHECK(HasValidProbeRow());
while (!build_row_iterator_.AtEnd()) {
DCHECK(HasValidProbeRow());
// Assemble a probe+build row in the staging row for conjunct evaluation.
CreateOutputRow(semi_join_staging_row_, current_probe_row_,
build_row_iterator_.GetRow());
build_row_iterator_.Next();
++current_build_row_idx_;
// This loop can go on for a long time if the conjuncts are very selective. Do
// expensive query maintenance after every N iterations.
if ((current_build_row_idx_ & (N - 1)) == 0) {
RETURN_IF_CANCELLED(state);
RETURN_IF_ERROR(QueryMaintenance(state));
}
if (!EvalConjuncts(
join_conjunct_evals, num_join_conjuncts, semi_join_staging_row_)) {
continue;
}
// A match is found. Create the output row from the probe row.
TupleRow* output_row = output_batch->GetRow(output_batch->AddRow());
output_batch->CopyRow(current_probe_row_, output_row);
VLOG_ROW << "match row: " << PrintRow(output_row, *row_desc());
output_batch->CommitLastRow();
IncrementNumRowsReturned(1);
if (ReachedLimit()) {
eos_ = true;
return Status::OK();
}
// Stop scanning the build rows for the current probe row. If we reach
// this point, we already have a match for this probe row.
break;
}
RETURN_IF_ERROR(NextProbeRow(state, output_batch));
if (output_batch->AtCapacity()) break;
}
return Status::OK();
}
// Left anti join: emits each probe row that matches no build row; any match
// suppresses the probe row from the output.
Status NestedLoopJoinNode::GetNextLeftAntiJoin(RuntimeState* state,
RowBatch* output_batch) {
ScalarExprEvaluator* const* join_conjunct_evals = join_conjunct_evals_.data();
size_t num_join_conjuncts = join_conjuncts_.size();
DCHECK_EQ(num_join_conjuncts, join_conjunct_evals_.size());
// N is a power of two so (idx & (N - 1)) == 0 fires once every N build rows.
const int N = BitUtil::RoundUpToPowerOfTwo(state->batch_size());
while (!eos_) {
DCHECK(HasValidProbeRow());
while (!build_row_iterator_.AtEnd()) {
DCHECK(current_probe_row_ != NULL);
// Assemble a probe+build row in the staging row for conjunct evaluation.
CreateOutputRow(semi_join_staging_row_, current_probe_row_,
build_row_iterator_.GetRow());
build_row_iterator_.Next();
++current_build_row_idx_;
// This loop can go on for a long time if the conjuncts are very selective. Do
// expensive query maintenance after every N iterations.
if ((current_build_row_idx_ & (N - 1)) == 0) {
RETURN_IF_CANCELLED(state);
RETURN_IF_ERROR(QueryMaintenance(state));
}
if (EvalConjuncts(
join_conjunct_evals, num_join_conjuncts, semi_join_staging_row_)) {
// Found a match for the probe row. This row will not be in the result.
matched_probe_ = true;
break;
}
}
if (!matched_probe_) RETURN_IF_ERROR(ProcessUnmatchedProbeRow(state, output_batch));
RETURN_IF_ERROR(NextProbeRow(state, output_batch));
if (output_batch->AtCapacity()) break;
}
return Status::OK();
}
// Null-aware left anti join is not supported by the nested-loop join operator;
// always returns an error status.
Status NestedLoopJoinNode::GetNextNullAwareLeftAntiJoin(RuntimeState* state,
RowBatch* output_batch) {
return Status("Nested-loop join with null-aware anti join mode is not implemented");
}
// Right outer join: emit matched rows while probe rows remain, then emit
// NULL-extended rows for the build rows that never matched.
Status NestedLoopJoinNode::GetNextRightOuterJoin(
    RuntimeState* state, RowBatch* output_batch) {
  while (!eos_ && HasMoreProbeRows()) {
    DCHECK(HasValidProbeRow());
    bool flush_output = false;
    RETURN_IF_ERROR(FindBuildMatches(state, output_batch, &flush_output));
    if (flush_output) return Status::OK();
    RETURN_IF_ERROR(NextProbeRow(state, output_batch));
    if (output_batch->AtCapacity()) return Status::OK();
  }
  // Probe side exhausted: produce the unmatched build rows.
  return ProcessUnmatchedBuildRows(state, output_batch);
}
// Right semi join: emits each build row at most once, the first time some
// probe row satisfies the join conjuncts with it. 'matching_build_rows_'
// records build rows already emitted so they are skipped for later probes.
Status NestedLoopJoinNode::GetNextRightSemiJoin(RuntimeState* state,
RowBatch* output_batch) {
ScalarExprEvaluator* const* join_conjunct_evals = join_conjunct_evals_.data();
size_t num_join_conjuncts = join_conjuncts_.size();
DCHECK_EQ(num_join_conjuncts, join_conjunct_evals_.size());
DCHECK(matching_build_rows_ != NULL);
// N is a power of two so (idx & (N - 1)) == 0 fires once every N build rows.
const int N = BitUtil::RoundUpToPowerOfTwo(state->batch_size());
while (!eos_) {
DCHECK(HasValidProbeRow());
while (!build_row_iterator_.AtEnd()) {
DCHECK(HasValidProbeRow());
// This loop can go on for a long time if the conjuncts are very selective.
// Do query maintenance every N iterations.
if ((current_build_row_idx_ & (N - 1)) == 0) {
if (ReachedLimit()) {
eos_ = true;
return Status::OK();
}
RETURN_IF_CANCELLED(state);
RETURN_IF_ERROR(QueryMaintenance(state));
}
// Check if we already have a match for the build row.
if (matching_build_rows_->Get(current_build_row_idx_)) {
build_row_iterator_.Next();
++current_build_row_idx_;
continue;
}
CreateOutputRow(semi_join_staging_row_, current_probe_row_,
build_row_iterator_.GetRow());
// Evaluate the join conjuncts on the semi-join staging row.
if (!EvalConjuncts(
join_conjunct_evals, num_join_conjuncts, semi_join_staging_row_)) {
build_row_iterator_.Next();
++current_build_row_idx_;
continue;
}
// Match: emit the build row and mark it so it is never emitted again.
TupleRow* output_row = output_batch->GetRow(output_batch->AddRow());
matching_build_rows_->Set(current_build_row_idx_, true);
output_batch->CopyRow(build_row_iterator_.GetRow(), output_row);
build_row_iterator_.Next();
++current_build_row_idx_;
VLOG_ROW << "match row: " << PrintRow(output_row, *row_desc());
output_batch->CommitLastRow();
IncrementNumRowsReturned(1);
if (output_batch->AtCapacity()) return Status::OK();
}
RETURN_IF_ERROR(NextProbeRow(state, output_batch));
if (output_batch->AtCapacity()) break;
}
return Status::OK();
}
// Right anti join: first scans all probe rows, marking every build row that
// matches some probe row in 'matching_build_rows_'; afterwards emits the
// build rows that never matched via ProcessUnmatchedBuildRows().
Status NestedLoopJoinNode::GetNextRightAntiJoin(RuntimeState* state,
RowBatch* output_batch) {
ScalarExprEvaluator* const* join_conjunct_evals = join_conjunct_evals_.data();
size_t num_join_conjuncts = join_conjuncts_.size();
DCHECK_EQ(num_join_conjuncts, join_conjunct_evals_.size());
DCHECK(matching_build_rows_ != NULL);
// N is a power of two so (idx & (N - 1)) == 0 fires once every N build rows.
const int N = BitUtil::RoundUpToPowerOfTwo(state->batch_size());
while (!eos_ && HasMoreProbeRows()) {
DCHECK(HasValidProbeRow());
while (!build_row_iterator_.AtEnd()) {
DCHECK(current_probe_row_ != NULL);
// This loop can go on for a long time if the conjuncts are very selective.
// Do query maintenance every N iterations.
if ((current_build_row_idx_ & (N - 1)) == 0) {
RETURN_IF_CANCELLED(state);
RETURN_IF_ERROR(QueryMaintenance(state));
}
// Skip build rows already known to have a match.
if (matching_build_rows_->Get(current_build_row_idx_)) {
build_row_iterator_.Next();
++current_build_row_idx_;
continue;
}
CreateOutputRow(semi_join_staging_row_, current_probe_row_,
build_row_iterator_.GetRow());
if (EvalConjuncts(
join_conjunct_evals, num_join_conjuncts, semi_join_staging_row_)) {
matching_build_rows_->Set(current_build_row_idx_, true);
}
build_row_iterator_.Next();
++current_build_row_idx_;
}
RETURN_IF_ERROR(NextProbeRow(state, output_batch));
if (output_batch->AtCapacity()) return Status::OK();
}
// Probe side exhausted: emit the build rows that never matched.
return ProcessUnmatchedBuildRows(state, output_batch);
}
// Full outer join: emit matched rows and NULL-extended rows for unmatched
// probe rows while probe input remains, then NULL-extended rows for
// unmatched build rows.
Status NestedLoopJoinNode::GetNextFullOuterJoin(
    RuntimeState* state, RowBatch* output_batch) {
  while (!eos_ && HasMoreProbeRows()) {
    DCHECK(HasValidProbeRow());
    bool flush_output = false;
    RETURN_IF_ERROR(FindBuildMatches(state, output_batch, &flush_output));
    if (flush_output) return Status::OK();
    // A probe row with no build match is emitted NULL-extended.
    if (!matched_probe_) {
      RETURN_IF_ERROR(ProcessUnmatchedProbeRow(state, output_batch));
    }
    RETURN_IF_ERROR(NextProbeRow(state, output_batch));
    if (output_batch->AtCapacity()) return Status::OK();
  }
  // Probe side exhausted: produce the unmatched build rows.
  return ProcessUnmatchedBuildRows(state, output_batch);
}
// Emits the current probe row, which matched no build row: NULL-extended on
// the build side for outer joins, probe-only for left anti join. The row is
// only committed if it also passes the non-join conjuncts.
Status NestedLoopJoinNode::ProcessUnmatchedProbeRow(RuntimeState* state,
RowBatch* output_batch) {
DCHECK(!matched_probe_);
DCHECK(current_probe_row_ != NULL);
ScalarExprEvaluator* const* conjunct_evals = conjunct_evals_.data();
size_t num_conjuncts = conjuncts_.size();
DCHECK_EQ(num_conjuncts, conjunct_evals_.size());
TupleRow* output_row = output_batch->GetRow(output_batch->AddRow());
if (join_op_ == TJoinOp::LEFT_OUTER_JOIN || join_op_ == TJoinOp::FULL_OUTER_JOIN) {
// Outer joins NULL-extend the build side of the output row.
CreateOutputRow(output_row, current_probe_row_, NULL);
} else {
DCHECK(join_op_ == TJoinOp::LEFT_ANTI_JOIN) << "Unsupported join operator: " <<
join_op_;
output_batch->CopyRow(current_probe_row_, output_row);
}
// Evaluate all the other (non-join) conjuncts.
if (EvalConjuncts(conjunct_evals, num_conjuncts, output_row)) {
VLOG_ROW << "match row:" << PrintRow(output_row, *row_desc());
output_batch->CommitLastRow();
IncrementNumRowsReturned(1);
if (ReachedLimit()) eos_ = true;
}
return Status::OK();
}
// Emits the build rows whose bit in 'matching_build_rows_' was never set:
// NULL-extended on the probe side for outer joins, build-only for right anti
// join. May be re-entered across GetNext() calls; 'process_unmatched_build_rows_'
// records whether iteration has already started.
Status NestedLoopJoinNode::ProcessUnmatchedBuildRows(
RuntimeState* state, RowBatch* output_batch) {
if (!process_unmatched_build_rows_) {
// Reset the build row iterator to start processing the unmatched build rows.
build_row_iterator_ = build_batches_->Iterator();
current_build_row_idx_ = 0;
process_unmatched_build_rows_ = true;
}
ScalarExprEvaluator* const* conjunct_evals = conjunct_evals_.data();
size_t num_conjuncts = conjuncts_.size();
DCHECK_EQ(num_conjuncts, conjunct_evals_.size());
DCHECK(matching_build_rows_ != NULL);
// N is a power of two so (idx & (N - 1)) == 0 fires once every N build rows.
const int N = BitUtil::RoundUpToPowerOfTwo(state->batch_size());
while (!build_row_iterator_.AtEnd()) {
// This loop can go on for a long time if the conjuncts are very selective. Do query
// maintenance every N iterations.
if ((current_build_row_idx_ & (N - 1)) == 0) {
if (ReachedLimit()) {
eos_ = true;
return Status::OK();
}
RETURN_IF_CANCELLED(state);
RETURN_IF_ERROR(QueryMaintenance(state));
}
// Skip build rows that found a match during the probe phase.
if (matching_build_rows_->Get(current_build_row_idx_)) {
build_row_iterator_.Next();
++current_build_row_idx_;
continue;
}
// Current build row is unmatched.
TupleRow* output_row = output_batch->GetRow(output_batch->AddRow());
if (join_op_ == TJoinOp::FULL_OUTER_JOIN || join_op_ == TJoinOp::RIGHT_OUTER_JOIN) {
CreateOutputRow(output_row, NULL, build_row_iterator_.GetRow());
} else {
DCHECK(join_op_ == TJoinOp::RIGHT_ANTI_JOIN) << "Unsupported join operator: " <<
join_op_;
output_batch->CopyRow(build_row_iterator_.GetRow(), output_row);
}
build_row_iterator_.Next();
++current_build_row_idx_;
// Evaluate conjuncts that don't affect the matching rows of the join on the
// result row.
if (EvalConjuncts(conjunct_evals, num_conjuncts, output_row)) {
VLOG_ROW << "match row: " << PrintRow(output_row, *row_desc());
output_batch->CommitLastRow();
IncrementNumRowsReturned(1);
if (output_batch->AtCapacity()) return Status::OK();
}
}
eos_ = true;
return Status::OK();
}
// Scans build rows for the current probe row, appending each combined row
// that passes both the join conjuncts and the other conjuncts to
// 'output_batch'. Sets *return_output_batch when the caller should return the
// batch (full, or limit reached). Updates matched_probe_ and, when present,
// matching_build_rows_.
Status NestedLoopJoinNode::FindBuildMatches(
RuntimeState* state, RowBatch* output_batch, bool* return_output_batch) {
*return_output_batch = false;
ScalarExprEvaluator* const* join_conjunct_evals = join_conjunct_evals_.data();
size_t num_join_conjuncts = join_conjuncts_.size();
DCHECK_EQ(num_join_conjuncts, join_conjunct_evals_.size());
ScalarExprEvaluator* const* conjunct_evals = conjunct_evals_.data();
size_t num_conjuncts = conjuncts_.size();
DCHECK_EQ(num_conjuncts, conjunct_evals_.size());
// N is a power of two so (idx & (N - 1)) == 0 fires once every N build rows.
const int N = BitUtil::RoundUpToPowerOfTwo(state->batch_size());
while (!build_row_iterator_.AtEnd()) {
DCHECK(current_probe_row_ != NULL);
// Tentatively build the combined row in place; it is only committed below
// if all conjuncts pass.
TupleRow* output_row = output_batch->GetRow(output_batch->AddRow());
CreateOutputRow(output_row, current_probe_row_, build_row_iterator_.GetRow());
build_row_iterator_.Next();
++current_build_row_idx_;
// This loop can go on for a long time if the conjuncts are very selective. Do
// expensive query maintenance after every N iterations.
if ((current_build_row_idx_ & (N - 1)) == 0) {
if (ReachedLimit()) {
eos_ = true;
*return_output_batch = true;
return Status::OK();
}
RETURN_IF_CANCELLED(state);
RETURN_IF_ERROR(QueryMaintenance(state));
}
if (!EvalConjuncts(join_conjunct_evals, num_join_conjuncts, output_row)) {
continue;
}
matched_probe_ = true;
// Record the match. The index was already advanced, hence the "- 1".
if (matching_build_rows_ != NULL) {
matching_build_rows_->Set(current_build_row_idx_ - 1, true);
}
if (!EvalConjuncts(conjunct_evals, num_conjuncts, output_row)) continue;
VLOG_ROW << "match row: " << PrintRow(output_row, *row_desc());
output_batch->CommitLastRow();
IncrementNumRowsReturned(1);
if (output_batch->AtCapacity()) {
*return_output_batch = true;
return Status::OK();
}
}
return Status::OK();
}
// Advances to the next probe row, pulling new probe batches from child(0) as
// needed, and resets the build iterator for it. On return, either
// current_probe_row_ is valid, the output batch is at capacity, or the probe
// side hit end of stream.
Status NestedLoopJoinNode::NextProbeRow(RuntimeState* state, RowBatch* output_batch) {
current_probe_row_ = NULL;
matched_probe_ = false;
while (probe_batch_pos_ == probe_batch_->num_rows()) {
probe_batch_->TransferResourceOwnership(output_batch);
probe_batch_pos_ = 0;
// If output_batch_ is at capacity after acquiring probe_batch_'s resources, we
// need to pass it up through the execution before getting a new probe batch.
// Otherwise, subsequent GetNext() calls to the probe input may cause the
// memory referenced by this batch to be deleted (see IMPALA-2191).
if (output_batch->AtCapacity()) return Status::OK();
if (probe_side_eos_) {
// Done, unless this join mode must still emit unmatched build rows.
eos_ = !NeedToProcessUnmatchedBuildRows(join_op_);
return Status::OK();
} else {
RETURN_IF_ERROR(child(0)->GetNext(state, probe_batch_.get(), &probe_side_eos_));
}
}
current_probe_row_ = probe_batch_->GetRow(probe_batch_pos_++);
// We have a valid probe row; reset the build row iterator.
build_row_iterator_ = build_batches_->Iterator();
current_build_row_idx_ = 0;
VLOG_ROW << "left row: " << GetLeftChildRowString(current_probe_row_);
return Status::OK();
}