blob: d99c6f7308ea434700dcae16861453aa5e52e327 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "exec/iceberg-delete-node.h"
#include <sstream>
#include "exec/blocking-join-node.inline.h"
#include "exec/exec-node-util.h"
#include "runtime/row-batch.h"
#include "runtime/runtime-state.h"
#include "runtime/tuple-row.h"
#include "util/debug-util.h"
#include "util/roaring-bitmap.h"
#include "util/runtime-profile-counters.h"
#include "gen-cpp/PlanNodes_types.h"
#include "common/names.h"
namespace impala {
Status IcebergDeletePlanNode::Init(const TPlanNode& tnode, FragmentState* state) {
  RETURN_IF_ERROR(BlockingJoinPlanNode::Init(tnode, state));
  DCHECK(tnode.__isset.join_node);
  DCHECK(tnode.join_node.__isset.iceberg_delete_node);
  // TODO: IMPALA-12265: create the config only if it is necessary
  // Eagerly create the shared builder config; its status is this node's status.
  return IcebergDeleteBuilderConfig::CreateConfig(state, tnode_->node_id,
      tnode_->join_node.join_op, &build_row_desc(), &id_builder_config_);
}
void IcebergDeletePlanNode::Close() {
  // Release builder-config resources (if one was created) before the base class.
  if (id_builder_config_ != nullptr) {
    id_builder_config_->Close();
  }
  PlanNode::Close();
}
Status IcebergDeletePlanNode::CreateExecNode(RuntimeState* state, ExecNode** node) const {
  // The exec node is owned by the runtime state's object pool, not the caller.
  *node = state->obj_pool()->Add(
      new IcebergDeleteNode(state, *this, state->desc_tbl()));
  return Status::OK();
}
// Thin constructor: all state beyond the base-class setup is established later
// in Prepare()/Open().
IcebergDeleteNode::IcebergDeleteNode(
    RuntimeState* state, const IcebergDeletePlanNode& pnode, const DescriptorTbl& descs)
  : BlockingJoinNode("IcebergDeleteNode", state->obj_pool(), pnode, descs) {}

IcebergDeleteNode::~IcebergDeleteNode() {}
Status IcebergDeleteNode::Prepare(RuntimeState* state) {
  SCOPED_TIMER(runtime_profile_->total_time_counter());
  RETURN_IF_ERROR(BlockingJoinNode::Prepare(state));
  runtime_state_ = state;
  // For an embedded (non-separate) build, create the builder sink here with
  // this node's buffer pool client and hook its profile into ours.
  if (!UseSeparateBuild(state->query_options())) {
    const IcebergDeleteBuilderConfig& builder_config =
        *static_cast<const IcebergDeletePlanNode&>(plan_node_).id_builder_config_;
    builder_ = builder_config.CreateSink(buffer_pool_client(),
        resource_profile_.spillable_buffer_size, resource_profile_.max_row_buffer_size,
        state);
    RETURN_IF_ERROR(builder_->Prepare(state, mem_tracker()));
    runtime_profile()->PrependChild(builder_->profile());
  }
  // Cache the tuple offsets of the virtual columns exposed by the probe side:
  // the row's position within its data file and the data file's path.
  for (const auto& slot : probe_row_desc().tuple_descriptors()[0]->slots()) {
    switch (slot->virtual_column_type()) {
      case TVirtualColumnType::FILE_POSITION:
        pos_offset_ = slot->tuple_offset();
        break;
      case TVirtualColumnType::INPUT_FILE_NAME:
        file_path_offset_ = slot->tuple_offset();
        break;
      default:
        break;
    }
  }
  return Status::OK();
}
Status IcebergDeleteNode::Open(RuntimeState* state) {
  SCOPED_TIMER(runtime_profile_->total_time_counter());
  ScopedOpenEventAdder ea(this);
  JoinBuilder* tmp_builder = nullptr;
  RETURN_IF_ERROR(BlockingJoinNode::OpenImpl(state, &tmp_builder));
  if (builder_ == nullptr) {
    // 'builder_' was not created in Prepare(), so the build must run in a
    // separate fragment and OpenImpl() handed us the shared builder.
    DCHECK(UseSeparateBuild(state->query_options()));
    builder_ = dynamic_cast<IcebergDeleteBuilder*>(tmp_builder);
    DCHECK(builder_ != nullptr);
  }
  // Check for errors and free expr result allocations before opening children.
  RETURN_IF_CANCELLED(state);
  RETURN_IF_ERROR(QueryMaintenance(state));
  // The prepare functions of probe expressions may have made result allocations
  // implicitly (e.g. calling UdfBuiltins::Lower()). The probe expressions' expr result
  // allocations need to be cleared now as they don't get cleared again till probing.
  // Other exprs' result allocations are cleared in QueryMaintenance().
  RETURN_IF_ERROR(BlockingJoinNode::ProcessBuildInputAndOpenProbe(state, builder_));
  RETURN_IF_ERROR(BlockingJoinNode::GetFirstProbeRow(state));
  ResetForProbe();
  // Start the GetNext() state machine in the in-batch probing state.
  probe_state_ = ProbeState::PROBING_IN_BATCH;
  return Status::OK();
}
Status IcebergDeleteNode::AcquireResourcesForBuild(RuntimeState* state) {
  // Claim this node's buffer reservation the first time build resources are
  // needed; later calls are no-ops because the client is already registered.
  if (buffer_pool_client()->is_registered()) return Status::OK();
  return ClaimBufferReservation(state);
}
Status IcebergDeleteNode::Reset(RuntimeState* state, RowBatch* row_batch) {
  // Reset the build side first; nullptr because there is no batch to hand
  // build-side resources to here.
  builder_->Reset(nullptr);
  return BlockingJoinNode::Reset(state, row_batch);
}
void IcebergDeleteNode::Close(RuntimeState* state) {
  if (is_closed()) return;
  // IMPALA-9737: free batches in case attached buffers need to be freed to
  // transfer reservation to 'builder_'.
  if (build_batch_ != nullptr) build_batch_->Reset();
  if (probe_batch_ != nullptr) probe_batch_->Reset();
  if (builder_ != nullptr) {
    bool separate_build = UseSeparateBuild(state->query_options());
    // An embedded builder is always closed from here. With a separate build
    // fragment, only detach if this node actually synchronized with the
    // builder ('waited_for_build_').
    if (!separate_build || waited_for_build_) {
      builder_->CloseFromProbe(state);
      waited_for_build_ = false;
    }
  }
  BlockingJoinNode::Close(state);
}
Status IcebergDeleteNode::NextProbeRowBatch(
    RuntimeState* state, RowBatch* out_batch, bool* eos) {
  // Precondition: the current probe batch is fully consumed (or flushed, -1).
  DCHECK(probe_batch_pos_ == probe_batch_->num_rows() || probe_batch_pos_ == -1);
  return NextProbeRowBatchFromChild(state, out_batch, eos);
}
// Fetches the next non-empty probe batch from child(0) into 'probe_batch_'.
// Sets *eos when the probe side is exhausted. May return without a new batch
// (probe_batch_pos_ == -1) when 'out_batch' fills up with transferred
// resources and must be returned to the caller first.
Status IcebergDeleteNode::NextProbeRowBatchFromChild(
    RuntimeState* state, RowBatch* out_batch, bool* eos) {
  DCHECK_ENUM_EQ(probe_state_, ProbeState::PROBING_END_BATCH);
  DCHECK(probe_batch_pos_ == probe_batch_->num_rows() || probe_batch_pos_ == -1);
  *eos = false;
  do {
    // Loop until we find a non-empty row batch.
    probe_batch_->TransferResourceOwnership(out_batch);
    if (out_batch->AtCapacity()) {
      // This out batch is full. Need to return it before getting the next batch.
      probe_batch_pos_ = -1;
      return Status::OK();
    }
    if (probe_side_eos_) {
      probe_batch_pos_ = -1;
      *eos = true;
      return Status::OK();
    }
    RETURN_IF_ERROR(child(0)->GetNext(state, probe_batch_.get(), &probe_side_eos_));
    COUNTER_ADD(probe_row_counter_, probe_batch_->num_rows());
  } while (probe_batch_->num_rows() == 0);
  // Got a non-empty batch: position probing at the start of it.
  ResetForProbe();
  return Status::OK();
}
Status IcebergDeleteNode::ProcessProbeBatch(RowBatch* out_batch) {
  DCHECK_ENUM_EQ(probe_state_, ProbeState::PROBING_IN_BATCH);
  DCHECK_NE(probe_batch_pos_, -1);
  const TPrefetchMode::type prefetch_mode =
      runtime_state_->query_options().prefetch_mode;
  SCOPED_TIMER(probe_timer_);
  // Delegate to the codegen-friendly overload and commit whatever it produced.
  const int rows_added = ProcessProbeBatch(prefetch_mode, out_batch);
  out_batch->CommitRows(rows_added);
  return Status::OK();
}
Status IcebergDeleteNode::GetNext(RuntimeState* state, RowBatch* out_batch, bool* eos) {
  SCOPED_TIMER(runtime_profile_->total_time_counter());
  ScopedGetNextEventAdder ea(this, eos);
  RETURN_IF_ERROR(ExecDebugAction(TExecNodePhase::GETNEXT, state));
  DCHECK(!out_batch->AtCapacity());
  Status status = Status::OK();
  *eos = false;
  // Save the number of rows in case GetNext() is called with a non-empty batch,
  // which can happen in a subplan.
  int num_rows_before = out_batch->num_rows();
  // This loop executes the 'probe_state_' state machine until either a full batch is
  // produced, resources are attached to 'out_batch' that require flushing, or eos
  // is reached (i.e. all rows are returned). The next call into GetNext() will resume
  // execution of the state machine where the current call into GetNext() left off.
  // See the definition of ProbeState for description of the state machine and states.
  do {
    DCHECK(status.ok());
    RETURN_IF_CANCELLED(state);
    RETURN_IF_ERROR(QueryMaintenance(state));
    switch (probe_state_) {
      case ProbeState::PROBING_IN_BATCH: {
        // Finish processing rows in the current probe batch.
        RETURN_IF_ERROR(ProcessProbeBatch(out_batch));
        if (probe_batch_pos_ == probe_batch_->num_rows()) {
          probe_state_ = ProbeState::PROBING_END_BATCH;
        }
        break;
      }
      case ProbeState::PROBING_END_BATCH: {
        // Try to get the next row batch from the current probe input.
        bool probe_eos;
        RETURN_IF_ERROR(NextProbeRowBatch(state, out_batch, &probe_eos));
        if (probe_batch_pos_ == 0) {
          // Got a batch, need to process it.
          probe_state_ = ProbeState::PROBING_IN_BATCH;
        } else if (probe_eos) {
          DCHECK_EQ(probe_batch_pos_, -1);
          // Finished processing all the probe rows
          RETURN_IF_ERROR(DoneProbing(state, out_batch));
          probe_state_ = ProbeState::EOS;
        } else {
          // Got an empty batch with resources that we need to flush before getting the
          // next batch.
          DCHECK_EQ(probe_batch_pos_, -1);
        }
        break;
      }
      case ProbeState::EOS: {
        // Ensure that all potential sources of output rows are exhausted.
        DCHECK(probe_side_eos_);
        *eos = true;
        break;
      }
      default:
        DCHECK(false) << "invalid probe_state_" << static_cast<int>(probe_state_);
        break;
    }
  } while (!out_batch->AtCapacity() && !*eos);
  int num_rows_added = out_batch->num_rows() - num_rows_before;
  DCHECK_GE(num_rows_added, 0);
  // Enforce LIMIT across GetNext() calls: truncate any overshoot and signal eos.
  if (limit_ != -1 && rows_returned() + num_rows_added > limit_) {
    // Truncate the row batch if we went over the limit.
    num_rows_added = limit_ - rows_returned();
    DCHECK_GE(num_rows_added, 0);
    out_batch->set_num_rows(num_rows_before + num_rows_added);
    probe_batch_->TransferResourceOwnership(out_batch);
    *eos = true;
  }
  IncrementNumRowsReturned(num_rows_added);
  COUNTER_SET(rows_returned_counter_, rows_returned());
  return Status::OK();
}
// Called once when the probe side is exhausted. This node has no buffered
// output to flush, so it only logs debug state.
Status IcebergDeleteNode::DoneProbing(RuntimeState* state, RowBatch* batch) {
  DCHECK_ENUM_EQ(probe_state_, ProbeState::PROBING_END_BATCH);
  DCHECK_EQ(probe_batch_pos_, -1);
  VLOG(2) << "Probe Side Consumed\n" << NodeDebugString();
  return Status::OK();
}
// Human-readable summary of this node (and its builder, when present) for logs.
string IcebergDeleteNode::NodeDebugString() const {
  stringstream out;
  out << "IcebergDeleteNode (id=" << id() << " op=" << join_op_ << ")" << endl;
  if (builder_ == nullptr) return out.str();
  out << "IcebergDeleteBuilder: " << builder_->DebugString();
  return out.str();
}
// Reads the FILE_POSITION virtual-column value of the probe row under 'probe_it'.
uint64_t IR_ALWAYS_INLINE IcebergDeleteNode::ProbeFilePosition(
    const RowBatch::Iterator& probe_it) const {
  DCHECK(!probe_it.AtEnd());
  return *probe_it.Get()->GetTuple(0)->GetBigIntSlot(pos_offset_);
}
// Counts the run of consecutive surviving (non-deleted) probe rows starting at
// '*probe_it', capped by 'remaining_capacity'. 'deletes' is the delete-position
// bitmap for the current file, or nullptr when nothing is deleted. On return
// 'probe_it' has consumed the counted rows and, when the run was terminated by
// a deleted row, that deleted row as well (it is skipped, not copied).
int IR_ALWAYS_INLINE IcebergDeleteNode::CountRowsToCopy(const RoaringBitmap64* deletes,
    int remaining_capacity, RoaringBitmap64::Iterator* deletes_it,
    RowBatch::Iterator* probe_it) {
  DCHECK(!probe_it->AtEnd());
  int rows_to_copy = 0;
  // Never count more rows than remain in the probe batch or fit in the output.
  int rows_to_copy_max = std::min(probe_it->RemainingRows(), remaining_capacity);
  DCHECK_GT(rows_to_copy_max, 0);
  if (deletes == nullptr) {
    // No deletes for this file: every remaining row survives.
    probe_it->Advance(rows_to_copy_max);
    return rows_to_copy_max;
  }
  uint64_t current_probe_pos = ProbeFilePosition(*probe_it);
  probe_it->Next();
  // Position of the first deleted row at or after the current probe position.
  uint64_t next_deleted_pos = deletes_it->GetEqualOrLarger(current_probe_pos);
  while (current_probe_pos < next_deleted_pos) {
    ++rows_to_copy;
    if (rows_to_copy == rows_to_copy_max) break;
    current_probe_pos = ProbeFilePosition(*probe_it);
    probe_it->Next();
    // If the rows are filtered in the SCAN then the position values may increase
    // by more than one, so let's adjust 'next_deleted_pos'.
    if (current_probe_pos > next_deleted_pos) {
      next_deleted_pos = deletes_it->GetEqualOrLarger(current_probe_pos);
    }
  }
  return rows_to_copy;
}
// Copies all surviving (non-deleted) rows of the current probe batch into
// 'out_batch', starting at 'probe_batch_pos_'. Returns the number of rows
// added (not yet committed). 'prefetch_mode' is currently unused here.
int IcebergDeleteNode::ProcessProbeBatch(
    TPrefetchMode::type prefetch_mode, RowBatch* out_batch) {
  DCHECK(!out_batch->AtCapacity());
  DCHECK_GE(probe_batch_pos_, 0);
  const int max_rows = out_batch->capacity() - out_batch->num_rows();
  RowBatch::Iterator probe_batch_iterator(probe_batch_.get(), probe_batch_pos_);
  RowBatch::Iterator out_batch_iterator(out_batch, out_batch->AddRow());
  if (probe_batch_iterator.AtEnd()) return 0;
  int remaining_capacity = max_rows;
  // All rows of the probe batch come from the same data file (enforced by the
  // DCHECK in the loop below), so the file path is read once from the first row.
  StringValue* file_path =
      probe_batch_iterator.Get()->GetTuple(0)->GetStringSlot(file_path_offset_);
  const IcebergDeleteBuilder::DeleteRowHashTable& delete_rows_table =
      builder_->deleted_rows();
  // 'deletes' stays nullptr when the file has no delete entries or the batch's
  // position range cannot overlap them (see NeedToCheckBatch()).
  const RoaringBitmap64* deletes = nullptr;
  RoaringBitmap64::Iterator deletes_it;
  auto it = delete_rows_table.find(*file_path);
  if (it != delete_rows_table.end() && NeedToCheckBatch(it->second)) {
    deletes = &it->second;
    deletes_it.Init(*deletes);
  }
  // Copy maximal runs of surviving rows until the batch or capacity runs out.
  while (!probe_batch_iterator.AtEnd() && remaining_capacity > 0) {
    DCHECK_EQ(*file_path,
        *probe_batch_iterator.Get()->GetTuple(0)->GetStringSlot(file_path_offset_));
    int probe_offset = probe_batch_iterator.RowNum();
    int rows_to_copy = CountRowsToCopy(
        deletes, remaining_capacity, &deletes_it,
        &probe_batch_iterator);
    // Zero means the current row was deleted; CountRowsToCopy() already
    // advanced past it, so just continue with the next run.
    if (rows_to_copy == 0) continue;
    int dst_offset = out_batch_iterator.RowNum();
    out_batch->CopyRows(
        probe_batch_.get(), rows_to_copy, probe_offset, dst_offset);
    out_batch_iterator.Advance(rows_to_copy);
    remaining_capacity -= rows_to_copy;
  }
  // Update where we are in the probe batch.
  probe_batch_pos_ = probe_batch_iterator.RowNum();
  int num_rows_added = max_rows - remaining_capacity;
  DCHECK_GE(probe_batch_pos_, 0);
  DCHECK_LE(probe_batch_pos_, probe_batch_->capacity());
  DCHECK_LE(num_rows_added, max_rows);
  return num_rows_added;
}
// Returns true only when the probe batch's file-position range [first, last]
// can overlap 'deletes'; positions are monotonic within the batch, so the
// first and last rows bound the whole range.
bool IcebergDeleteNode::NeedToCheckBatch(const RoaringBitmap64& deletes) {
  DCHECK_GE(probe_batch_pos_, 0);
  if (deletes.IsEmpty()) return false;
  const int64_t first_pos = *probe_batch_->GetRow(probe_batch_pos_)
      ->GetTuple(0)->GetBigIntSlot(pos_offset_);
  if (first_pos > deletes.Max()) return false;
  const int64_t last_pos = *probe_batch_->GetRow(probe_batch_->num_rows() - 1)
      ->GetTuple(0)->GetBigIntSlot(pos_offset_);
  return last_pos >= deletes.Min();
}
} // namespace impala