blob: 63cb689a4b63a66b8780dd9c0989a47d2fe368b8 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "exec/paimon/paimon-jni-scan-node.h"
#include "common/status.h"
#include "exec/exec-node-util.h"
#include "exec/exec-node.inline.h"
#include "exec/paimon/paimon-jni-row-reader.h"
#include "exec/paimon/paimon-jni-scanner.h"
#include "exec/paimon/paimon-scan-plan-node.h"
#include "rpc/thrift-util.h"
#include "runtime/exec-env.h"
#include "runtime/runtime-state.h"
#include "runtime/tuple-row.h"
#include "util/jni-util.h"
#include "util/periodic-counter-updater.h"
#include "util/runtime-profile-counters.h"
#include <jni.h>
#include <vector>
#include <arrow/c/bridge.h>
#include <glog/logging.h>
using namespace impala;
/// Constructs the scan node from its plan node. The tuple id and the table name are
/// copied out of the thrift 'paimon_table_scan_node' carried by 'pnode'; all arrow
/// record batch bookkeeping starts out zeroed.
PaimonJniScanNode::PaimonJniScanNode(
ObjectPool* pool, const PaimonScanPlanNode& pnode, const DescriptorTbl& descs)
: ScanNode(pool, pnode, descs),
tuple_id_(pnode.tnode_->paimon_table_scan_node.tuple_id),
table_name_(pnode.tnode_->paimon_table_scan_node.table_name),
/// Starts as false; Prepare() recomputes it from the scan ranges.
splits_empty_(false),
/// No offheap bytes have been reported by the JNI scanner yet.
paimon_last_arrow_record_batch_consumed_bytes_(0),
/// No arrow record batch is loaded yet, so both the row count and the read cursor
/// are zero.
arrow_record_batch_row_count_(0),
arrow_record_batch_row_index_(0) {}
/// Prepares the scan node for execution.
///
/// Registers the profile timers, resolves the tuple descriptor for 'tuple_id_', creates
/// the mem tracker used for arrow batches and builds the TPaimonJniScanParam (table
/// object, projected field ids, memory limit, batch size, fragment instance id and
/// splits). The param is then thrift-serialized into
/// 'paimon_jni_scan_param_serialized_' so that Open() can hand it to PaimonJniScanner.
///
/// Returns an error if the base class Prepare() fails, the tuple descriptor cannot be
/// found, a slot refers to a nested field, or serialization fails.
Status PaimonJniScanNode::Prepare(RuntimeState* state) {
  RETURN_IF_ERROR(ScanNode::Prepare(state));
  DCHECK(scan_range_params_ != nullptr)
      << "Must call SetScanRanges() before calling Prepare()";

  scan_open_timer_ = ADD_TIMER(runtime_profile(), "ScanOpenTime");
  paimon_api_scan_timer_ = ADD_TIMER(runtime_profile(), "PaimonApiScanTime");

  tuple_desc_ = state->desc_tbl().GetTupleDescriptor(tuple_id_);
  if (tuple_desc_ == nullptr) {
    return Status(
        "Failed to get tuple descriptor, tuple id: " + std::to_string(tuple_id_));
  }

  /// Child tracker of this node's mem tracker, used to account for arrow batches.
  /// NOTE(review): the -1 presumably means "no limit of its own" - confirm against
  /// MemTracker.
  arrow_batch_mem_tracker_.reset(
      new MemTracker(-1, "Arrow Batch", this->mem_tracker(), false));

  /// Construct the jni scan param, the param will be used in PaimonJniScanner.
  paimon_jni_scan_param_.__set_paimon_table_obj(
      plan_node_.tnode_->paimon_table_scan_node.paimon_table_obj);

  /// Update the projection: collect the top-level field id of every slot of the tuple.
  std::vector<int32_t> field_ids;
  RETURN_IF_ERROR(CollectProjectionFieldIds(tuple_desc_, field_ids));
  paimon_jni_scan_param_.__set_projection(field_ids);
  /// glog terminates every record itself, so no std::endl here: it would only add an
  /// empty line and a redundant flush.
  LOG(INFO) << table_name_ << " Contains " << field_ids.size() << " field ids.";

  paimon_jni_scan_param_.__set_mem_limit_bytes(
      arrow_batch_mem_tracker_->GetLowestLimit(MemLimit::HARD));
  paimon_jni_scan_param_.__set_batch_size(state->batch_size());
  paimon_jni_scan_param_.__set_fragment_id(state->fragment_instance_id());

  /// Every scan range carries one split in its file metadata.
  std::vector<std::string> scan_range_vector;
  scan_range_vector.reserve(static_cast<size_t>(scan_range_params_->size()));
  for (const ScanRangeParamsPB& params : *scan_range_params_) {
    DCHECK(params.scan_range().has_file_metadata());
    scan_range_vector.push_back(params.scan_range().file_metadata());
  }
  paimon_jni_scan_param_.__set_splits(scan_range_vector);
  /// Remember whether there is anything to scan; Open(), GetNext() and Close() skip
  /// the JNI scanner when there are no splits.
  splits_empty_ = scan_range_vector.empty();

  /// Serialize the jni scan param to binary.
  impala::ThriftSerializer serializer(false);
  RETURN_IF_ERROR(serializer.SerializeToString<TPaimonJniScanParam>(
      &paimon_jni_scan_param_, &paimon_jni_scan_param_serialized_));
  return Status::OK();
}
/// Opens the scan node. Unless there are no splits, this creates and initializes the
/// PaimonJniScanner from the serialized scan param, creates the row reader and starts
/// the table scan. The whole call is charged to 'scan_open_timer_'; the table scan is
/// additionally charged to 'paimon_api_scan_timer_'.
///
/// Returns an error if the base class Open() fails, no JNIEnv is available, or the
/// scanner fails to initialize or to start the scan.
Status PaimonJniScanNode::Open(RuntimeState* state) {
  SCOPED_TIMER(scan_open_timer_);
  RETURN_IF_ERROR(ScanNode::Open(state));

  /// Nothing to scan: no JNI scanner is needed.
  if (splits_empty_) return Status::OK();

  JNIEnv* const jni_env = JniUtil::GetJNIEnv();
  if (jni_env == nullptr) {
    return Status("Failed to get/create JVM");
  }

  jni_scanner_.reset(
      new PaimonJniScanner(paimon_jni_scan_param_serialized_, tuple_desc_, table_name_));
  RETURN_IF_ERROR(jni_scanner_->Init(jni_env));
  paimon_row_reader_.reset(new PaimonJniRowReader());

  /// From here on the time is also accounted as Paimon API scan time.
  SCOPED_TIMER(paimon_api_scan_timer_);
  return jni_scanner_->ScanTable(jni_env);
}
/// Appends to 'projection' the field id of every slot of 'tuple_desc_', in slot order.
/// Only top-level slots (column path of length 1) are supported; the first slot with
/// any other path length aborts the collection with an error, leaving the ids gathered
/// so far in 'projection'.
Status PaimonJniScanNode::CollectProjectionFieldIds(
    const TupleDescriptor* tuple_desc_, vector<int32_t>& projection) {
  for (const SlotDescriptor* slot : tuple_desc_->slots()) {
    const bool is_top_level = slot->col_path().size() == 1;
    if (!is_top_level) {
      // TODO: support the nested field later.
      return Status("Paimon Scanner currently doesn't support nested type now");
    }
    // Top level slots have ColumnDescriptors that store the field ids.
    int field_id = tuple_desc_->table_desc()->GetColumnDesc(slot).field_id();
    DCHECK_NE(field_id, -1);
    projection.push_back(field_id);
  }
  return Status::OK();
}
/// Makes sure an arrow record batch with unread rows is loaded, fetching the next one
/// from the JNI scanner when there is no current batch or it is fully consumed.
///
/// 'is_empty_batch' must not be null. NOTE: despite its name it is set to true when
/// the current batch still has unread rows, and to false when the scanner reported no
/// more rows (row count <= 0). It is false whenever an error is returned.
///
/// Returns an error if no JNIEnv is available, the JNI call fails, the arrow import
/// fails, or tracking the reported offheap memory fails.
Status PaimonJniScanNode::GetNextBatchIfNeeded(bool* is_empty_batch) {
  /// Checked up front because the pointer is written on every path below.
  DCHECK(is_empty_batch != nullptr);
  *is_empty_batch = false;

  const bool need_next_batch = paimon_arrow_record_batch_holder_ == nullptr
      || arrow_record_batch_row_index_ >= arrow_record_batch_row_count_;
  if (need_next_batch) {
    SCOPED_TIMER(paimon_api_scan_timer_);
    JNIEnv* env = JniUtil::GetJNIEnv();
    /// Same guard as in Open(): never hand a null JNIEnv to the scanner.
    if (env == nullptr) return Status("Failed to get/create JVM");

    /// Out-parameters of the JNI call; initialized so that nothing indeterminate is
    /// ever read if the call leaves one of them untouched.
    struct ArrowArray* array = nullptr;
    struct ArrowSchema* schema = nullptr;
    long row_count = 0;
    long offheap_consumed_bytes = 0;
    RETURN_IF_ERROR(jni_scanner_->GetNextBatchDirect(
        env, &array, &schema, &row_count, &offheap_consumed_bytes));

    if (row_count > 0) {
      if (array == nullptr || schema == nullptr) {
        return Status("Paimon JNI scanner returned rows without an arrow batch");
      }
      auto import_result = arrow::ImportRecordBatch(array, schema);
      /// The result carries an arrow::Status, so it has to be checked manually
      /// instead of using the RETURN_IF_ERROR macro.
      if (!import_result.ok()) {
        return Status(import_result.status().message());
      }
      paimon_arrow_record_batch_holder_ = import_result.ValueUnsafe();
      arrow_record_batch_row_count_ = row_count;
      DCHECK_EQ(row_count, paimon_arrow_record_batch_holder_->num_rows());
      arrow_record_batch_row_index_ = 0;
      /// Replace the previously tracked offheap memory with the amount reported by
      /// JNI for this batch. Done for every batch so that hitting the limit of the
      /// mem tracker is detected.
      OffheapTrackFree();
      RETURN_IF_ERROR(OffheapTrackAllocation(offheap_consumed_bytes));
    } else {
      /// No more batches to fetch.
      paimon_arrow_record_batch_holder_ = nullptr;
      arrow_record_batch_row_count_ = 0;
      arrow_record_batch_row_index_ = 0;
      OffheapTrackFree();
    }
  }

  *is_empty_batch = arrow_record_batch_row_count_ > 0
      && arrow_record_batch_row_index_ < arrow_record_batch_row_count_;
  return Status::OK();
}
/// Fills 'row_batch' with rows materialized from the arrow record batches delivered by
/// the JNI scanner.
///
/// Rows are materialized one by one into the tuple buffer of 'row_batch'; a row is
/// committed only if it passes the conjuncts. The loop ends when the batch is at
/// capacity, the limit is reached or the scanner has no more rows.
///
/// 'eos' is set to true when there are no splits, when the limit is reached or when the
/// scanner is exhausted; otherwise it is set to false.
///
/// Returns a non-OK status on cancellation, when query maintenance fails, or when
/// fetching / materializing a row fails. Rows committed before the failure stay in
/// 'row_batch'.
Status PaimonJniScanNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) {
  SCOPED_TIMER(runtime_profile_->total_time_counter());
  ScopedGetNextEventAdder ea(this, eos);
  RETURN_IF_ERROR(ExecDebugAction(TExecNodePhase::GETNEXT, state));
  RETURN_IF_CANCELLED(state);
  RETURN_IF_ERROR(QueryMaintenance(state));

  /// Return eos if empty splits or reached limit.
  if (splits_empty_ || ReachedLimit()) {
    *eos = true;
    return Status::OK();
  }
  /// Set eos to false initially for the batch.
  *eos = false;

  /// Allocate buffer for RowBatch and init the tuple.
  uint8_t* tuple_buffer = nullptr;
  int64_t tuple_buffer_size = 0;
  RETURN_IF_ERROR(
      row_batch->ResizeAndAllocateTupleBuffer(state, &tuple_buffer_size, &tuple_buffer));
  Tuple* tuple = reinterpret_cast<Tuple*>(tuple_buffer);
  tuple->Init(tuple_buffer_size);

  SCOPED_TIMER(materialize_tuple_timer());
  while (!ReachedLimit() && !row_batch->AtCapacity()) {
    /// Propagate cancellation and maintenance failures to the caller. Breaking out
    /// of the loop here would report Status::OK() and silently drop the error.
    RETURN_IF_CANCELLED(state);
    RETURN_IF_ERROR(QueryMaintenance(state));

    int row_idx = row_batch->AddRow();
    TupleRow* tuple_row = row_batch->GetRow(row_idx);
    tuple_row->SetTuple(0, tuple);

    /// Get the next arrow batch from 'org.apache.impala.util.paimon.PaimonJniScanner'
    /// if the current arrow batch is already consumed.
    bool fetched = false;
    RETURN_IF_ERROR(GetNextBatchIfNeeded(&fetched));
    /// When fetched is false, there are no more arrow batches to read.
    if (!fetched) {
      *eos = true;
      break;
    }
    DCHECK(paimon_arrow_record_batch_holder_ != nullptr);
    COUNTER_ADD(rows_read_counter(), 1);
    RETURN_IF_ERROR(paimon_row_reader_->MaterializeTuple(
        *paimon_arrow_record_batch_holder_, arrow_record_batch_row_index_, tuple_desc_,
        tuple, row_batch->tuple_data_pool(), state));

    /// Evaluate conjuncts on this tuple row.
    if (ExecNode::EvalConjuncts(
            conjunct_evals().data(), conjunct_evals().size(), tuple_row)) {
      row_batch->CommitLastRow();
      /// Advance to the next tuple slot in the buffer.
      tuple = reinterpret_cast<Tuple*>(
          reinterpret_cast<uint8_t*>(tuple) + tuple_desc_->byte_size());
      IncrementNumRowsReturned(1);
    } else {
      /// The row was filtered out and its tuple slot is reused. Reset the null bits,
      /// everything else will be overwritten.
      Tuple::ClearNullBits(
          tuple, tuple_desc_->null_bytes_offset(), tuple_desc_->num_null_bytes());
    }
    /// The next iteration processes the next row of the arrow RecordBatch.
    arrow_record_batch_row_index_++;
  }

  if (ReachedLimit()) {
    *eos = true;
  }
  return Status::OK();
}
/// Releases everything held by the scan node: the current arrow record batch, the JNI
/// scanner, the tracked offheap memory and the arrow batch mem tracker, then closes the
/// base class. Does nothing if the node is already closed. Tolerates a node whose
/// Prepare() or Open() did not get far enough to create the JNI scanner.
void PaimonJniScanNode::Close(RuntimeState* state) {
  if (is_closed()) return;

  /// Eagerly release the arrow batch before calling the frontend close method to
  /// avoid a memory leak; a leak was observed when the query was cancelled.
  if (paimon_arrow_record_batch_holder_ != nullptr) {
    paimon_arrow_record_batch_holder_.reset();
    arrow_record_batch_row_index_ = 0;
    arrow_record_batch_row_count_ = 0;
  }

  /// Close the jni scanner if there were splits. The scanner only exists after Open()
  /// created it, so it is still null if Prepare() or Open() failed early (note that
  /// 'splits_empty_' is false until Prepare() computes it); calling Close() on it
  /// then would dereference a null pointer.
  if (!splits_empty_ && jni_scanner_.get() != nullptr) {
    jni_scanner_->Close(state);
  }

  OffheapTrackFree();
  if (arrow_batch_mem_tracker_ != nullptr) {
    arrow_batch_mem_tracker_->Close();
  }
  ScanNode::Close(state);
}