blob: bb4b4c9b2a40d2f1d6b70fff20b7de74d6c45cba [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "exec/iceberg-metadata/iceberg-metadata-scan-node.h"
#include "exec/exec-node.inline.h"
#include "runtime/exec-env.h"
#include "runtime/runtime-state.h"
#include "runtime/tuple-row.h"
#include "service/frontend.h"
#include "util/jni-util.h"
using namespace impala;
// Creates the IcebergMetadataScanNode that executes this plan node. The new node is
// added to the ObjectPool of 'state' and returned through 'node'. Always returns OK.
Status IcebergMetadataScanPlanNode::CreateExecNode(
    RuntimeState* state, ExecNode** node) const {
  ObjectPool* obj_pool = state->obj_pool();
  IcebergMetadataScanNode* scan_node =
      new IcebergMetadataScanNode(obj_pool, *this, state->desc_tbl());
  // Hand the freshly created node to the pool and publish the pointer Add() returns.
  *node = obj_pool->Add(scan_node);
  return Status::OK();
}
// Forwards 'pool', 'pnode' and 'descs' to ScanNode and copies the tuple id, the table
// name and the metadata table name out of 'pnode.tnode_->iceberg_scan_metadata_node'.
IcebergMetadataScanNode::IcebergMetadataScanNode(
    ObjectPool* pool,
    const IcebergMetadataScanPlanNode& pnode,
    const DescriptorTbl& descs)
  : ScanNode(pool, pnode, descs),
    tuple_id_(pnode.tnode_->iceberg_scan_metadata_node.tuple_id),
    table_name_(pnode.tnode_->iceberg_scan_metadata_node.table_name),
    metadata_table_name_(
        pnode.tnode_->iceberg_scan_metadata_node.metadata_table_name) {
}
// Prepares the node: runs ScanNode::Prepare(), registers the "ScanPrepareTime" and
// "IcebergApiScanTime" timers on the runtime profile and looks up the tuple descriptor
// for 'tuple_id_'. Returns an error if the base class fails or the descriptor table has
// no descriptor for that id.
Status IcebergMetadataScanNode::Prepare(RuntimeState* state) {
  RETURN_IF_ERROR(ScanNode::Prepare(state));

  // Profile timers.
  scan_prepare_timer_ = ADD_TIMER(runtime_profile(), "ScanPrepareTime");
  iceberg_api_scan_timer_ = ADD_TIMER(runtime_profile(), "IcebergApiScanTime");

  // Resolve the descriptor of the tuple this node materializes.
  tuple_desc_ = state->desc_tbl().GetTupleDescriptor(tuple_id_);
  if (tuple_desc_ != nullptr) return Status::OK();

  return Status(
      "Failed to get tuple descriptor, tuple id: " + std::to_string(tuple_id_));
}
// Opens the node: fetches the catalog table object, creates and initializes the
// IcebergMetadataScanner and the IcebergRowReader, then starts the metadata table scan.
// Returns the first error encountered, or OK once ScanMetadataTable() succeeds.
//
// NOTE(review): both SCOPED_TIMERs below are declared in the function scope, so
// 'scan_prepare_timer_' presumably keeps running while ScanMetadataTable() executes and
// therefore also includes the time measured by 'iceberg_api_scan_timer_' - confirm that
// this overlap is intended.
Status IcebergMetadataScanNode::Open(RuntimeState* state) {
  SCOPED_TIMER(scan_prepare_timer_);
  RETURN_IF_ERROR(ScanNode::Open(state));

  JNIEnv* jni_env = JniUtil::GetJNIEnv();
  if (jni_env == nullptr) return Status("Failed to get/create JVM");

  // Ask the Frontend for the FeTable object of 'table_name_'.
  jobject catalog_table;
  RETURN_IF_ERROR(GetCatalogTable(&catalog_table));

  // Set up the scanner first; the row reader is constructed on top of it.
  metadata_scanner_.reset(new IcebergMetadataScanner(
      catalog_table, metadata_table_name_.c_str(), tuple_desc_));
  RETURN_IF_ERROR(metadata_scanner_->Init(jni_env));
  iceberg_row_reader_.reset(new IcebergRowReader(this, metadata_scanner_.get()));

  SCOPED_TIMER(iceberg_api_scan_timer_);
  return metadata_scanner_->ScanMetadataTable(jni_env);
}
// Fills 'row_batch' with rows read from the Iceberg metadata scanner.
//
// A tuple buffer is allocated from 'row_batch' and each row returned by
// 'metadata_scanner_' is materialized into the next tuple slot. A row is committed to
// the batch only if it passes the conjuncts; otherwise the same tuple slot is reused for
// the following row. The loop stops when the limit is reached, the batch is at capacity
// or the scanner returns a null row.
//
// '*eos' is set to true when the scanner is exhausted or the limit has been reached; it
// is not written otherwise.
//
// Returns an error if the query was cancelled, the JNIEnv cannot be obtained, the tuple
// buffer cannot be allocated, or the scanner / row reader reports a failure.
Status IcebergMetadataScanNode::GetNext(RuntimeState* state, RowBatch* row_batch,
    bool* eos) {
  RETURN_IF_CANCELLED(state);
  JNIEnv* env = JniUtil::GetJNIEnv();
  if (env == nullptr) return Status("Failed to get/create JVM");
  SCOPED_TIMER(materialize_tuple_timer());
  // Allocate buffer for RowBatch and init the tuple
  uint8_t* tuple_buffer;
  int64_t tuple_buffer_size;
  RETURN_IF_ERROR(row_batch->ResizeAndAllocateTupleBuffer(state, &tuple_buffer_size,
      &tuple_buffer));
  Tuple* tuple = reinterpret_cast<Tuple*>(tuple_buffer);
  tuple->Init(tuple_buffer_size);
  while (!ReachedLimit() && !row_batch->AtCapacity()) {
    int row_idx = row_batch->AddRow();
    TupleRow* tuple_row = row_batch->GetRow(row_idx);
    tuple_row->SetTuple(0, tuple);
    // Get the next row from 'org.apache.impala.util.IcebergMetadataScanner'
    jobject struct_like_row;
    RETURN_IF_ERROR(metadata_scanner_->GetNext(env, &struct_like_row));
    // When 'struct_like_row' is null, there are no more rows to read. The row added
    // above was never committed, so it is not part of the returned batch.
    if (struct_like_row == nullptr) {
      *eos = true;
      return Status::OK();
    }
    // Translate a StructLikeRow from Iceberg to Tuple
    Status materialize_status = iceberg_row_reader_->MaterializeTuple(
        env, struct_like_row, tuple_desc_, tuple, row_batch->tuple_data_pool(), state);
    // Release the local reference before inspecting the status, so that it is not
    // leaked when materialization fails. DeleteLocalRef() may be called while a Java
    // exception is pending.
    env->DeleteLocalRef(struct_like_row);
    RETURN_IF_ERROR(materialize_status);
    RETURN_ERROR_IF_EXC(env);
    COUNTER_ADD(rows_read_counter(), 1);
    // Evaluate conjuncts on this tuple row
    if (ExecNode::EvalConjuncts(conjunct_evals().data(),
        conjunct_evals().size(), tuple_row)) {
      row_batch->CommitLastRow();
      // Advance to the next tuple slot in the buffer.
      tuple = reinterpret_cast<Tuple*>(
          reinterpret_cast<uint8_t*>(tuple) + tuple_desc_->byte_size());
      IncrementNumRowsReturned(1);
    } else {
      // Reset the null bits, everything else will be overwritten
      Tuple::ClearNullBits(tuple, tuple_desc_->null_bytes_offset(),
          tuple_desc_->num_null_bytes());
    }
  }
  if (ReachedLimit()) *eos = true;
  return Status::OK();
}
// Closes the metadata scanner (if one was created) and then the ScanNode base class.
// Does nothing when the node is already closed.
void IcebergMetadataScanNode::Close(RuntimeState* state) {
  if (is_closed()) return;
  // Open() can return early (base class failure, no JNIEnv, catalog table lookup error)
  // before 'metadata_scanner_' is created, so it may still be null here.
  if (metadata_scanner_.get() != nullptr) metadata_scanner_->Close(state);
  ScanNode::Close(state);
}
// Looks up 'table_name_' through the Frontend of the process-wide ExecEnv and stores
// the resulting table object in 'jtable'. Returns the status reported by the Frontend.
Status IcebergMetadataScanNode::GetCatalogTable(jobject* jtable) {
  return ExecEnv::GetInstance()->frontend()->GetCatalogTable(table_name_, jtable);
}