| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "exec/iceberg-merge-node.h" |
| |
| #include <string> |
| #include <string_view> |
| #include <vector> |
| |
| #include "common/status.h" |
| #include "exec/exec-node-util.h" |
| #include "exec/exec-node.inline.h" |
| #include "exprs/scalar-expr-evaluator.h" |
| #include "exprs/scalar-expr.h" |
| #include "gen-cpp/DataSinks_types.h" |
| #include "gen-cpp/PlanNodes_types.h" |
| #include "runtime/descriptors.h" |
| #include "runtime/fragment-state.h" |
| #include "runtime/raw-value.h" |
| #include "runtime/row-batch.h" |
| #include "runtime/runtime-state.h" |
| #include "runtime/tuple-row.h" |
| #include "runtime/tuple.h" |
| #include "util/debug-util.h" |
| #include "util/runtime-profile-counters.h" |
| |
| namespace impala { |
| |
| Status IcebergMergePlanNode::Init(const TPlanNode& tnode, FragmentState* state) { |
| RETURN_IF_ERROR(PlanNode::Init(tnode, state)); |
| ObjectPool* pool = state->obj_pool(); |
| |
| for (auto& tmerge_case : tnode.merge_node.cases) { |
| auto* merge_case_plan = pool->Add(new IcebergMergeCasePlan()); |
| RETURN_IF_ERROR(merge_case_plan->Init(tmerge_case, state, row_descriptor_)); |
| merge_case_plans_.push_back(merge_case_plan); |
| } |
| RETURN_IF_ERROR(ScalarExpr::Create( |
| tnode.merge_node.row_present, *row_descriptor_, state, pool, &row_present_)); |
| |
| RETURN_IF_ERROR(ScalarExpr::Create(tnode.merge_node.delete_meta_exprs, |
| *row_descriptor_, state, pool, &delete_meta_exprs_)); |
| |
| RETURN_IF_ERROR(ScalarExpr::Create(tnode.merge_node.partition_meta_exprs, |
| *row_descriptor_, state, pool, &partition_meta_exprs_)); |
| |
| merge_action_tuple_id_ = tnode.merge_node.merge_action_tuple_id; |
| target_tuple_id_ = tnode.merge_node.target_tuple_id; |
| |
| return Status::OK(); |
| } |
| |
| Status IcebergMergeCasePlan::Init(const TIcebergMergeCase& tmerge_case, |
| FragmentState* state, const RowDescriptor* row_desc) { |
| ObjectPool* pool = state->obj_pool(); |
| RETURN_IF_ERROR(ScalarExpr::Create( |
| tmerge_case.output_expressions, *row_desc, state, pool, &output_exprs_)); |
| RETURN_IF_ERROR(ScalarExpr::Create( |
| tmerge_case.filter_conjuncts, *row_desc, state, pool, &filter_conjuncts_)); |
| case_type_ = tmerge_case.type; |
| match_type_ = tmerge_case.match_type; |
| return Status::OK(); |
| } |
| |
| void IcebergMergeCasePlan::Close() { |
| ScalarExpr::Close(output_exprs_); |
| ScalarExpr::Close(filter_conjuncts_); |
| } |
| |
| Status IcebergMergePlanNode::CreateExecNode(RuntimeState* state, ExecNode** node) const { |
| ObjectPool* pool = state->obj_pool(); |
| *node = pool->Add(new IcebergMergeNode(pool, *this, state->desc_tbl())); |
| return Status::OK(); |
| } |
| |
| void IcebergMergePlanNode::Close() { |
| for (IcebergMergeCasePlan* merge_case_plan : merge_case_plans_) { |
| merge_case_plan->Close(); |
| } |
| PlanNode::Close(); |
| } |
| |
| IcebergMergeNode::IcebergMergeNode( |
| ObjectPool* pool, const IcebergMergePlanNode& pnode, const DescriptorTbl& descs) |
| : ExecNode(pool, pnode, descs), |
| child_row_batch_(nullptr), |
| child_row_idx_(0), |
| child_eos_(false), |
| row_present_(pnode.row_present_), |
| delete_meta_exprs_(pnode.delete_meta_exprs_), |
| partition_meta_exprs_(pnode.partition_meta_exprs_) { |
| DCHECK(pnode.merge_action_tuple_id_ != -1); |
| DCHECK(pnode.target_tuple_id_ != -1); |
| |
| merge_action_tuple_idx_ = row_descriptor_.GetTupleIdx(pnode.merge_action_tuple_id_); |
| target_tuple_idx_ = row_descriptor_.GetTupleIdx(pnode.target_tuple_id_); |
| |
| for (auto* merge_case_plan : pnode.merge_case_plans_) { |
| auto merge_case = pool->Add(new IcebergMergeCase(merge_case_plan)); |
| all_cases_.push_back(merge_case); |
| switch (merge_case->match_type_) { |
| case TMergeMatchType::MATCHED: |
| matched_cases_.push_back(merge_case); |
| break; |
| case TMergeMatchType::NOT_MATCHED_BY_TARGET: |
| not_matched_by_target_cases_.push_back(merge_case); |
| break; |
| case TMergeMatchType::NOT_MATCHED_BY_SOURCE: |
| not_matched_by_source_cases_.push_back(merge_case); |
| break; |
| } |
| } |
| } |
| |
| Status IcebergMergeNode::Prepare(RuntimeState* state) { |
| SCOPED_TIMER(runtime_profile_->total_time_counter()); |
| RETURN_IF_ERROR(ExecNode::Prepare(state)); |
| |
| RETURN_IF_ERROR(ScalarExprEvaluator::Create(*row_present_, state, state->obj_pool(), |
| expr_perm_pool_.get(), expr_results_pool_.get(), &row_present_evaluator_)); |
| |
| RETURN_IF_ERROR( |
| ScalarExprEvaluator::Create(delete_meta_exprs_, state, state->obj_pool(), |
| expr_perm_pool_.get(), expr_results_pool_.get(), &delete_meta_evaluators_)); |
| |
| RETURN_IF_ERROR( |
| ScalarExprEvaluator::Create(partition_meta_exprs_, state, state->obj_pool(), |
| expr_perm_pool_.get(), expr_results_pool_.get(), &partition_meta_evaluators_)); |
| |
| for (auto* merge_case : all_cases_) { |
| RETURN_IF_ERROR(merge_case->Prepare(state, *this)); |
| } |
| |
| return Status::OK(); |
| } |
| |
| Status IcebergMergeNode::Open(RuntimeState* state) { |
| SCOPED_TIMER(runtime_profile_->total_time_counter()); |
| ScopedOpenEventAdder ea(this); |
| RETURN_IF_ERROR(ExecNode::Open(state)); |
| RETURN_IF_ERROR(child(0)->Open(state)); |
| child_row_batch_.reset( |
| new RowBatch(child(0)->row_desc(), state->batch_size(), mem_tracker())); |
| |
| RETURN_IF_ERROR(row_present_evaluator_->Open(state)); |
| RETURN_IF_ERROR(ScalarExprEvaluator::Open(delete_meta_evaluators_, state)); |
| RETURN_IF_ERROR(ScalarExprEvaluator::Open(partition_meta_evaluators_, state)); |
| |
| for (auto* merge_case : all_cases_) { |
| RETURN_IF_ERROR(merge_case->Open(state)); |
| } |
| |
| return Status::OK(); |
| } |
| |
| Status IcebergMergeNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) { |
| SCOPED_TIMER(runtime_profile_->total_time_counter()); |
| ScopedGetNextEventAdder ea(this, eos); |
| RETURN_IF_ERROR(ExecDebugAction(TExecNodePhase::GETNEXT, state)); |
| do { |
| RETURN_IF_CANCELLED(state); |
| RETURN_IF_ERROR(QueryMaintenance(state)); |
| if (child_row_batch_->num_rows() == 0) { |
| RETURN_IF_ERROR(child(0)->GetNext(state, child_row_batch_.get(), &child_eos_)); |
| } |
| RETURN_IF_ERROR(EvaluateCases(row_batch)); |
| COUNTER_SET(rows_returned_counter_, rows_returned()); |
| *eos = |
| ReachedLimit() || (child_row_idx_ == child_row_batch_->num_rows() && child_eos_); |
| if (*eos || child_row_idx_ == child_row_batch_->num_rows()) { |
| child_row_idx_ = 0; |
| if (previous_row_ != nullptr) { |
| // Materialize it as a string |
| previous_row_file_path_materialized_ = std::string(previous_row_file_path_); |
| previous_row_file_path_ = std::string_view(previous_row_file_path_materialized_); |
| } |
| child_row_batch_->Reset(); |
| } |
| } while (!*eos && !row_batch->AtCapacity()); |
| return Status::OK(); |
| } |
| |
| Status IcebergMergeNode::EvaluateCases(RowBatch* output_batch) { |
| FOREACH_ROW(child_row_batch_.get(), child_row_idx_, iter) { |
| ++child_row_idx_; |
| auto row = iter.Get(); |
| |
| if (IsDuplicateTargetTuplePtr(row) && IsDuplicateTargetRowIdent(row)) { |
| return Status( |
| "Duplicate row found: one target table row matched more than one source row"); |
| } |
| SavePreviousRowPtrAndIdent(row); |
| |
| auto row_present = row_present_evaluator_->GetTinyIntVal(row).val; |
| IcebergMergeCase* selected_case = nullptr; |
| |
| |
| std::vector<IcebergMergeCase*>* cases = nullptr; |
| switch (row_present) { |
| case TIcebergMergeRowPresent::BOTH: { |
| cases = &matched_cases_; |
| break; |
| } |
| case TIcebergMergeRowPresent::SOURCE: { |
| cases = ¬_matched_by_target_cases_; |
| break; |
| } |
| case TIcebergMergeRowPresent::TARGET: { |
| cases = ¬_matched_by_source_cases_; |
| break; |
| } |
| default: |
| return Status("Invalid row presence value in MERGE statement's result set."); |
| } |
| for (auto* merge_case : *cases) { |
| if (CheckCase(merge_case, row)) { |
| selected_case = merge_case; |
| break; |
| } |
| } |
| |
| if (!selected_case) continue; |
| // Add a new row to output_batch |
| AddRow(output_batch, selected_case, row); |
| if (ReachedLimit() || output_batch->AtCapacity()) return Status::OK(); |
| } |
| return Status::OK(); |
| } |
| |
| bool IcebergMergeNode::CheckCase(const IcebergMergeCase* merge_case, TupleRow* row) { |
| return EvalConjuncts( |
| merge_case->filter_evaluators_.data(), merge_case->filter_evaluators_.size(), row); |
| } |
| |
| void IcebergMergeNode::AddRow( |
| RowBatch* output_batch, IcebergMergeCase* merge_case, TupleRow* row) { |
| TupleRow* dst_row = output_batch->GetRow(output_batch->AddRow()); |
| |
| auto* target_tuple = |
| Tuple::Create(row_descriptor_.tuple_descriptors()[target_tuple_idx_]->byte_size(), |
| output_batch->tuple_data_pool()); |
| auto* merge_action_tuple = Tuple::Create( |
| row_descriptor_.tuple_descriptors()[merge_action_tuple_idx_]->byte_size(), |
| output_batch->tuple_data_pool()); |
| |
| dst_row->SetTuple(target_tuple_idx_, target_tuple); |
| dst_row->SetTuple(merge_action_tuple_idx_, merge_action_tuple); |
| |
| for (int i = 0; i < row_descriptor_.tuple_descriptors().size(); i++) { |
| if (i != target_tuple_idx_ && i != merge_action_tuple_idx_) { |
| dst_row->SetTuple(i, nullptr); |
| } |
| } |
| |
| target_tuple->MaterializeExprs<false, false>(row, |
| *row_descriptor_.tuple_descriptors()[target_tuple_idx_], |
| merge_case->combined_evaluators_, output_batch->tuple_data_pool()); |
| |
| TIcebergMergeSinkAction::type action = merge_case->SinkAction(); |
| |
| RawValue::WriteNonNullPrimitive( |
| &action, merge_action_tuple, merge_action_tuple_type_, nullptr); |
| |
| output_batch->CommitLastRow(); |
| IncrementNumRowsReturned(1); |
| } |
| |
| bool IcebergMergeNode::IsDuplicateTargetTuplePtr(TupleRow* actual_row) { |
| if (previous_row_ == nullptr) { return false; } |
| auto previous_row_target_tuple = previous_row_->GetTuple(target_tuple_idx_); |
| if (previous_row_target_tuple == nullptr) { return false; } |
| auto actual_row_target_tuple = actual_row->GetTuple(target_tuple_idx_); |
| return previous_row_target_tuple == actual_row_target_tuple; |
| } |
| |
| bool IcebergMergeNode::IsDuplicateTargetRowIdent(TupleRow* actual_row) { |
| auto file_path_sv = delete_meta_evaluators_[0]->GetStringVal(actual_row); |
| auto file_pos = delete_meta_evaluators_[1]->GetBigIntVal(actual_row); |
| if (previous_row_file_pos_ != file_pos.val) { return false; } |
| auto file_path = |
| std::string_view(reinterpret_cast<const char*>(file_path_sv.ptr), file_path_sv.len); |
| return file_path == previous_row_file_path_; |
| } |
| |
| void IcebergMergeNode::SavePreviousRowPtrAndIdent(TupleRow* actual_row) { |
| impala_udf::StringVal file_path_sv = |
| delete_meta_evaluators_[0]->GetStringVal(actual_row); |
| auto file_pos = delete_meta_evaluators_[1]->GetBigIntVal(actual_row); |
| previous_row_file_pos_ = file_pos.val; |
| previous_row_file_path_ = |
| std::string_view(reinterpret_cast<char*>(file_path_sv.ptr), file_path_sv.len); |
| previous_row_ = actual_row; |
| } |
| |
| Status IcebergMergeNode::Reset(RuntimeState* state, RowBatch* row_batch) { |
| child_row_batch_->TransferResourceOwnership(row_batch); |
| child_row_idx_ = 0; |
| child_eos_ = false; |
| previous_row_ = nullptr; |
| previous_row_file_path_ = {}; |
| previous_row_file_path_materialized_ = {}; |
| previous_row_file_pos_ = -1; |
| return ExecNode::Reset(state, row_batch); |
| } |
| |
| void IcebergMergeNode::Close(RuntimeState* state) { |
| if (is_closed()) return; |
| child_row_batch_.reset(); |
| for (auto merge_case : all_cases_) { |
| merge_case->Close(state); |
| } |
| row_present_evaluator_->Close(state); |
| ScalarExprEvaluator::Close(delete_meta_evaluators_, state); |
| ScalarExprEvaluator::Close(partition_meta_evaluators_, state); |
| |
| row_present_->Close(); |
| ScalarExpr::Close(delete_meta_exprs_); |
| ScalarExpr::Close(partition_meta_exprs_); |
| |
| ExecNode::Close(state); |
| } |
| |
| const std::vector<ScalarExprEvaluator*>& IcebergMergeNode::DeleteMetaEvals() { |
| return delete_meta_evaluators_; |
| } |
| |
| const std::vector<ScalarExprEvaluator*>& IcebergMergeNode::PartitionMetaEvals() { |
| return partition_meta_evaluators_; |
| } |
| |
| IcebergMergeCase::IcebergMergeCase(const IcebergMergeCasePlan* merge_case_plan) |
| : filter_conjuncts_(merge_case_plan->filter_conjuncts_), |
| output_exprs_(merge_case_plan->output_exprs_), |
| case_type_(merge_case_plan->case_type_), |
| match_type_(merge_case_plan->match_type_) {} |
| |
| Status IcebergMergeCase::Prepare(RuntimeState* state, IcebergMergeNode& parent) { |
| RETURN_IF_ERROR(ScalarExprEvaluator::Create(output_exprs_, state, state->obj_pool(), |
| parent.expr_perm_pool(), parent.expr_results_pool(), &output_evaluators_)); |
| RETURN_IF_ERROR(ScalarExprEvaluator::Create(filter_conjuncts_, state, state->obj_pool(), |
| parent.expr_perm_pool(), parent.expr_results_pool(), &filter_evaluators_)); |
| /// Combining output expression, Iceberg position related columns, and |
| /// Iceberg partition related columns into one vector. The order of the evaluators |
| /// matches the order of expressions in the merge query. |
| combined_evaluators_.insert( |
| combined_evaluators_.end(), output_evaluators_.begin(), output_evaluators_.end()); |
| combined_evaluators_.insert(combined_evaluators_.end(), |
| parent.DeleteMetaEvals().begin(), parent.DeleteMetaEvals().end()); |
| combined_evaluators_.insert(combined_evaluators_.end(), |
| parent.PartitionMetaEvals().begin(), parent.PartitionMetaEvals().end()); |
| return Status::OK(); |
| } |
| |
| Status IcebergMergeCase::Open(RuntimeState* state) { |
| RETURN_IF_ERROR(ScalarExprEvaluator::Open(filter_evaluators_, state)); |
| RETURN_IF_ERROR(ScalarExprEvaluator::Open(output_evaluators_, state)); |
| return Status::OK(); |
| } |
| |
| void IcebergMergeCase::Close(RuntimeState* state) { |
| ScalarExprEvaluator::Close(filter_evaluators_, state); |
| ScalarExprEvaluator::Close(output_evaluators_, state); |
| } |
| |
| } // namespace impala |