| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "paimon_jni_reader.h" |
| |
| #include <map> |
| |
| #include "core/types.h" |
| #include "format/jni/jni_data_bridge.h" |
| #include "runtime/descriptors.h" |
| #include "runtime/runtime_state.h" |
| |
// Forward declarations for types this translation unit refers to through
// pointers (Block*, RuntimeProfile*, RuntimeState*).
namespace doris {
class Block;
class RuntimeProfile;
class RuntimeState;
} // namespace doris
| |
| namespace doris { |
| #include "common/compile_check_begin.h" |
| |
| const std::string PaimonJniReader::PAIMON_OPTION_PREFIX = "paimon."; |
| const std::string PaimonJniReader::HADOOP_OPTION_PREFIX = "hadoop."; |
| |
| PaimonJniReader::PaimonJniReader(const std::vector<SlotDescriptor*>& file_slot_descs, |
| RuntimeState* state, RuntimeProfile* profile, |
| const TFileRangeDesc& range, |
| const TFileScanRangeParams* range_params) |
| : JniReader( |
| file_slot_descs, state, profile, "org/apache/doris/paimon/PaimonJniScanner", |
| [&]() { |
| std::vector<std::string> column_names; |
| std::vector<std::string> column_types; |
| for (const auto& desc : file_slot_descs) { |
| column_names.emplace_back(desc->col_name()); |
| column_types.emplace_back( |
| JniDataBridge::get_jni_type_with_different_string(desc->type())); |
| } |
| const auto& paimon_params = range.table_format_params.paimon_params; |
| std::map<String, String> params; |
| params["paimon_split"] = paimon_params.paimon_split; |
| if (range_params->__isset.paimon_predicate && |
| !range_params->paimon_predicate.empty()) { |
| params["paimon_predicate"] = range_params->paimon_predicate; |
| } else if (paimon_params.__isset.paimon_predicate) { |
| params["paimon_predicate"] = paimon_params.paimon_predicate; |
| } |
| params["required_fields"] = join(column_names, ","); |
| params["columns_types"] = join(column_types, "#"); |
| params["time_zone"] = state->timezone(); |
| if (range_params->__isset.serialized_table) { |
| params["serialized_table"] = range_params->serialized_table; |
| } |
| if (range_params->__isset.paimon_options && |
| !range_params->paimon_options.empty()) { |
| for (const auto& kv : range_params->paimon_options) { |
| params[PAIMON_OPTION_PREFIX + kv.first] = kv.second; |
| } |
| } else if (paimon_params.__isset.paimon_options) { |
| for (const auto& kv : paimon_params.paimon_options) { |
| params[PAIMON_OPTION_PREFIX + kv.first] = kv.second; |
| } |
| } |
| if (range_params->__isset.properties && !range_params->properties.empty()) { |
| for (const auto& kv : range_params->properties) { |
| params[HADOOP_OPTION_PREFIX + kv.first] = kv.second; |
| } |
| } else if (paimon_params.__isset.hadoop_conf) { |
| for (const auto& kv : paimon_params.hadoop_conf) { |
| params[HADOOP_OPTION_PREFIX + kv.first] = kv.second; |
| } |
| } |
| return params; |
| }(), |
| [&]() { |
| std::vector<std::string> names; |
| for (const auto& desc : file_slot_descs) { |
| names.emplace_back(desc->col_name()); |
| } |
| return names; |
| }(), |
| range.__isset.self_split_weight ? range.self_split_weight : -1) { |
| if (range.table_format_params.__isset.table_level_row_count) { |
| _remaining_table_level_row_count = range.table_format_params.table_level_row_count; |
| } else { |
| _remaining_table_level_row_count = -1; |
| } |
| } |
| |
| Status PaimonJniReader::get_next_block(Block* block, size_t* read_rows, bool* eof) { |
| if (_push_down_agg_type == TPushAggOp::type::COUNT && _remaining_table_level_row_count >= 0) { |
| auto rows = std::min(_remaining_table_level_row_count, |
| (int64_t)_state->query_options().batch_size); |
| _remaining_table_level_row_count -= rows; |
| auto mutate_columns = block->mutate_columns(); |
| for (auto& col : mutate_columns) { |
| col->resize(rows); |
| } |
| block->set_columns(std::move(mutate_columns)); |
| *read_rows = rows; |
| if (_remaining_table_level_row_count == 0) { |
| *eof = true; |
| } |
| |
| return Status::OK(); |
| } |
| return JniReader::get_next_block(block, read_rows, eof); |
| } |
| |
| Status PaimonJniReader::init_reader() { |
| return open(_state, _profile); |
| } |
| #include "common/compile_check_end.h" |
| } // namespace doris |