| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "meta_scanner.h" |
| |
| #include <fmt/format.h> |
| #include <gen_cpp/FrontendService.h> |
| #include <gen_cpp/FrontendService_types.h> |
| #include <gen_cpp/HeartbeatService_types.h> |
| #include <gen_cpp/PaloInternalService_types.h> |
| #include <gen_cpp/PlanNodes_types.h> |
| |
| #include <ostream> |
| #include <string> |
| #include <unordered_map> |
| |
| #include "common/cast_set.h" |
| #include "common/logging.h" |
| #include "runtime/client_cache.h" |
| #include "runtime/define_primitive_type.h" |
| #include "runtime/descriptors.h" |
| #include "runtime/exec_env.h" |
| #include "runtime/runtime_state.h" |
| #include "util/thrift_rpc_helper.h" |
| #include "vec/columns/column.h" |
| #include "vec/columns/column_nullable.h" |
| #include "vec/columns/column_string.h" |
| #include "vec/columns/column_vector.h" |
| #include "vec/core/block.h" |
| #include "vec/core/types.h" |
| #include "vec/exec/format/table/iceberg_sys_table_jni_reader.h" |
| #include "vec/exec/format/table/paimon_sys_table_jni_reader.h" |
| |
| namespace doris { |
| class RuntimeProfile; |
| namespace vectorized { |
| class VExprContext; |
| } // namespace vectorized |
| } // namespace doris |
| |
| namespace doris::vectorized { |
| #include "common/compile_check_begin.h" |
| |
| MetaScanner::MetaScanner(RuntimeState* state, pipeline::ScanLocalStateBase* local_state, |
| TupleId tuple_id, const TScanRangeParams& scan_range, int64_t limit, |
| RuntimeProfile* profile, TUserIdentity user_identity) |
| : Scanner(state, local_state, limit, profile), |
| _meta_eos(false), |
| _tuple_id(tuple_id), |
| _user_identity(user_identity), |
| _scan_range(scan_range.scan_range) {} |
| |
| Status MetaScanner::open(RuntimeState* state) { |
| VLOG_CRITICAL << "MetaScanner::open"; |
| RETURN_IF_ERROR(Scanner::open(state)); |
| if (_scan_range.meta_scan_range.metadata_type == TMetadataType::ICEBERG) { |
| // TODO: refactor this code |
| auto reader = IcebergSysTableJniReader::create_unique(_tuple_desc->slots(), state, _profile, |
| _scan_range.meta_scan_range); |
| const std::unordered_map<std::string, ColumnValueRangeType> colname_to_value_range; |
| RETURN_IF_ERROR(reader->init_reader(&colname_to_value_range)); |
| _reader = std::move(reader); |
| } else if (_scan_range.meta_scan_range.metadata_type == TMetadataType::PAIMON) { |
| auto reader = PaimonSysTableJniReader::create_unique(_tuple_desc->slots(), state, _profile, |
| _scan_range.meta_scan_range); |
| const std::unordered_map<std::string, ColumnValueRangeType> colname_to_value_range; |
| RETURN_IF_ERROR(reader->init_reader(&colname_to_value_range)); |
| _reader = std::move(reader); |
| } else { |
| RETURN_IF_ERROR(_fetch_metadata(_scan_range.meta_scan_range)); |
| } |
| return Status::OK(); |
| } |
| |
| Status MetaScanner::init(RuntimeState* state, const VExprContextSPtrs& conjuncts) { |
| VLOG_CRITICAL << "MetaScanner::init"; |
| RETURN_IF_ERROR(Scanner::init(_state, conjuncts)); |
| _tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id); |
| return Status::OK(); |
| } |
| |
| Status MetaScanner::_get_block_impl(RuntimeState* state, Block* block, bool* eof) { |
| VLOG_CRITICAL << "MetaScanner::_get_block_impl"; |
| if (nullptr == state || nullptr == block || nullptr == eof) { |
| return Status::InternalError("input is NULL pointer"); |
| } |
| if (_reader) { |
| // TODO: This is a temporary workaround; the code is planned to be refactored later. |
| size_t read_rows = 0; |
| return _reader->get_next_block(block, &read_rows, eof); |
| } |
| |
| if (_meta_eos == true) { |
| *eof = true; |
| return Status::OK(); |
| } |
| |
| auto column_size = _tuple_desc->slots().size(); |
| std::vector<MutableColumnPtr> columns(column_size); |
| bool mem_reuse = block->mem_reuse(); |
| do { |
| RETURN_IF_CANCELLED(state); |
| |
| columns.resize(column_size); |
| for (auto i = 0; i < column_size; i++) { |
| if (mem_reuse) { |
| columns[i] = block->get_by_position(i).column->assume_mutable(); |
| } else { |
| columns[i] = _tuple_desc->slots()[i]->get_empty_mutable_column(); |
| } |
| } |
| // fill block |
| RETURN_IF_ERROR(_fill_block_with_remote_data(columns)); |
| if (_meta_eos == true) { |
| if (block->rows() == 0) { |
| *eof = true; |
| } |
| break; |
| } |
| // Before really use the Block, must clear other ptr of column in block |
| // So here need do std::move and clear in `columns` |
| if (!mem_reuse) { |
| int column_index = 0; |
| for (const auto slot_desc : _tuple_desc->slots()) { |
| block->insert(ColumnWithTypeAndName(std::move(columns[column_index++]), |
| slot_desc->get_data_type_ptr(), |
| slot_desc->col_name())); |
| } |
| } else { |
| columns.clear(); |
| } |
| VLOG_ROW << "VMetaScanNode output rows: " << block->rows(); |
| } while (block->rows() == 0 && !(*eof)); |
| return Status::OK(); |
| } |
| |
| Status MetaScanner::_fill_block_with_remote_data(const std::vector<MutableColumnPtr>& columns) { |
| VLOG_CRITICAL << "MetaScanner::_fill_block_with_remote_data"; |
| for (int col_idx = 0; col_idx < columns.size(); col_idx++) { |
| auto slot_desc = _tuple_desc->slots()[col_idx]; |
| // because the fe planner filter the non_materialize column |
| if (!slot_desc->is_materialized()) { |
| continue; |
| } |
| |
| for (int _row_idx = 0; _row_idx < _batch_data.size(); _row_idx++) { |
| vectorized::IColumn* col_ptr = columns[col_idx].get(); |
| TCell& cell = _batch_data[_row_idx].column_value[col_idx]; |
| if (cell.__isset.isNull && cell.isNull) { |
| DCHECK(slot_desc->is_nullable()) |
| << "cell is null but column is not nullable: " << slot_desc->col_name(); |
| auto& null_col = reinterpret_cast<ColumnNullable&>(*col_ptr); |
| null_col.get_nested_column().insert_default(); |
| null_col.get_null_map_data().push_back(1); |
| } else { |
| if (slot_desc->is_nullable()) { |
| auto& null_col = reinterpret_cast<ColumnNullable&>(*col_ptr); |
| null_col.get_null_map_data().push_back(0); |
| col_ptr = null_col.get_nested_column_ptr().get(); |
| } |
| switch (slot_desc->type()->get_primitive_type()) { |
| case TYPE_BOOLEAN: { |
| bool data = cell.boolVal; |
| assert_cast<vectorized::ColumnBool*>(col_ptr)->insert_value((uint8_t)data); |
| break; |
| } |
| case TYPE_TINYINT: { |
| int8_t data = (int8_t)cell.intVal; |
| assert_cast<vectorized::ColumnInt8*>(col_ptr)->insert_value(data); |
| break; |
| } |
| case TYPE_SMALLINT: { |
| int16_t data = (int16_t)cell.intVal; |
| assert_cast<vectorized::ColumnInt16*>(col_ptr)->insert_value(data); |
| break; |
| } |
| case TYPE_INT: { |
| int32_t data = cell.intVal; |
| assert_cast<vectorized::ColumnInt32*>(col_ptr)->insert_value(data); |
| break; |
| } |
| case TYPE_BIGINT: { |
| int64_t data = cell.longVal; |
| assert_cast<vectorized::ColumnInt64*>(col_ptr)->insert_value(data); |
| break; |
| } |
| case TYPE_FLOAT: { |
| auto data = static_cast<float>(cell.doubleVal); |
| assert_cast<vectorized::ColumnFloat32*>(col_ptr)->insert_value(data); |
| break; |
| } |
| case TYPE_DOUBLE: { |
| double data = cell.doubleVal; |
| assert_cast<vectorized::ColumnFloat64*>(col_ptr)->insert_value(data); |
| break; |
| } |
| case TYPE_DATEV2: { |
| uint32_t data = (uint32_t)cell.longVal; |
| assert_cast<vectorized::ColumnDateV2*>(col_ptr)->insert_value(data); |
| break; |
| } |
| case TYPE_DATETIMEV2: { |
| uint64_t data = cell.longVal; |
| assert_cast<vectorized::ColumnDateTimeV2*>(col_ptr)->insert_value(data); |
| break; |
| } |
| case TYPE_STRING: |
| case TYPE_CHAR: |
| case TYPE_VARCHAR: { |
| std::string data = cell.stringVal; |
| assert_cast<vectorized::ColumnString*>(col_ptr)->insert_data(data.c_str(), |
| data.length()); |
| break; |
| } |
| default: { |
| std::string error_msg = |
| fmt::format("Invalid column type {} on column: {}.", |
| slot_desc->type()->get_name(), slot_desc->col_name()); |
| return Status::InternalError(std::string(error_msg)); |
| } |
| } |
| } |
| } |
| } |
| _meta_eos = true; |
| return Status::OK(); |
| } |
| |
| Status MetaScanner::_fetch_metadata(const TMetaScanRange& meta_scan_range) { |
| VLOG_CRITICAL << "MetaScanner::_fetch_metadata"; |
| TFetchSchemaTableDataRequest request; |
| switch (meta_scan_range.metadata_type) { |
| case TMetadataType::HUDI: |
| RETURN_IF_ERROR(_build_hudi_metadata_request(meta_scan_range, &request)); |
| break; |
| case TMetadataType::BACKENDS: |
| RETURN_IF_ERROR(_build_backends_metadata_request(meta_scan_range, &request)); |
| break; |
| case TMetadataType::FRONTENDS: |
| RETURN_IF_ERROR(_build_frontends_metadata_request(meta_scan_range, &request)); |
| break; |
| case TMetadataType::FRONTENDS_DISKS: |
| RETURN_IF_ERROR(_build_frontends_disks_metadata_request(meta_scan_range, &request)); |
| break; |
| case TMetadataType::WORKLOAD_SCHED_POLICY: |
| RETURN_IF_ERROR(_build_workload_sched_policy_metadata_request(meta_scan_range, &request)); |
| break; |
| case TMetadataType::CATALOGS: |
| RETURN_IF_ERROR(_build_catalogs_metadata_request(meta_scan_range, &request)); |
| break; |
| case TMetadataType::MATERIALIZED_VIEWS: |
| RETURN_IF_ERROR(_build_materialized_views_metadata_request(meta_scan_range, &request)); |
| break; |
| case TMetadataType::PARTITIONS: |
| RETURN_IF_ERROR(_build_partitions_metadata_request(meta_scan_range, &request)); |
| break; |
| case TMetadataType::JOBS: |
| RETURN_IF_ERROR(_build_jobs_metadata_request(meta_scan_range, &request)); |
| break; |
| case TMetadataType::TASKS: |
| RETURN_IF_ERROR(_build_tasks_metadata_request(meta_scan_range, &request)); |
| break; |
| case TMetadataType::PARTITION_VALUES: |
| RETURN_IF_ERROR(_build_partition_values_metadata_request(meta_scan_range, &request)); |
| break; |
| default: |
| _meta_eos = true; |
| return Status::OK(); |
| } |
| |
| // set filter columns |
| std::vector<std::string> filter_columns; |
| for (const auto& slot : _tuple_desc->slots()) { |
| filter_columns.emplace_back(slot->col_name_lower_case()); |
| } |
| request.metada_table_params.__set_columns_name(filter_columns); |
| |
| // _state->execution_timeout() is seconds, change to milliseconds |
| int time_out = _state->execution_timeout() * 1000; |
| TNetworkAddress master_addr = ExecEnv::GetInstance()->cluster_info()->master_fe_addr; |
| TFetchSchemaTableDataResult result; |
| RETURN_IF_ERROR(ThriftRpcHelper::rpc<FrontendServiceClient>( |
| master_addr.hostname, master_addr.port, |
| [&request, &result](FrontendServiceConnection& client) { |
| client->fetchSchemaTableData(result, request); |
| }, |
| time_out)); |
| |
| Status status(Status::create(result.status)); |
| if (!status.ok()) { |
| LOG(WARNING) << "fetch schema table data from master failed, errmsg=" << status; |
| return status; |
| } |
| _batch_data = std::move(result.data_batch); |
| return Status::OK(); |
| } |
| |
| Status MetaScanner::_build_hudi_metadata_request(const TMetaScanRange& meta_scan_range, |
| TFetchSchemaTableDataRequest* request) { |
| VLOG_CRITICAL << "MetaScanner::_build_hudi_metadata_request"; |
| if (!meta_scan_range.__isset.hudi_params) { |
| return Status::InternalError("Can not find THudiMetadataParams from meta_scan_range."); |
| } |
| |
| // create request |
| request->__set_cluster_name(""); |
| request->__set_schema_table_name(TSchemaTableName::METADATA_TABLE); |
| |
| // create TMetadataTableRequestParams |
| TMetadataTableRequestParams metadata_table_params; |
| metadata_table_params.__set_metadata_type(TMetadataType::HUDI); |
| metadata_table_params.__set_hudi_metadata_params(meta_scan_range.hudi_params); |
| |
| request->__set_metada_table_params(metadata_table_params); |
| return Status::OK(); |
| } |
| |
| Status MetaScanner::_build_backends_metadata_request(const TMetaScanRange& meta_scan_range, |
| TFetchSchemaTableDataRequest* request) { |
| VLOG_CRITICAL << "MetaScanner::_build_backends_metadata_request"; |
| if (!meta_scan_range.__isset.backends_params) { |
| return Status::InternalError("Can not find TBackendsMetadataParams from meta_scan_range."); |
| } |
| // create request |
| request->__set_cluster_name(""); |
| request->__set_schema_table_name(TSchemaTableName::METADATA_TABLE); |
| |
| // create TMetadataTableRequestParams |
| TMetadataTableRequestParams metadata_table_params; |
| metadata_table_params.__set_metadata_type(TMetadataType::BACKENDS); |
| metadata_table_params.__set_backends_metadata_params(meta_scan_range.backends_params); |
| |
| request->__set_metada_table_params(metadata_table_params); |
| return Status::OK(); |
| } |
| |
| Status MetaScanner::_build_frontends_metadata_request(const TMetaScanRange& meta_scan_range, |
| TFetchSchemaTableDataRequest* request) { |
| VLOG_CRITICAL << "MetaScanner::_build_frontends_metadata_request"; |
| if (!meta_scan_range.__isset.frontends_params) { |
| return Status::InternalError("Can not find TFrontendsMetadataParams from meta_scan_range."); |
| } |
| // create request |
| request->__set_cluster_name(""); |
| request->__set_schema_table_name(TSchemaTableName::METADATA_TABLE); |
| |
| // create TMetadataTableRequestParams |
| TMetadataTableRequestParams metadata_table_params; |
| metadata_table_params.__set_metadata_type(TMetadataType::FRONTENDS); |
| metadata_table_params.__set_frontends_metadata_params(meta_scan_range.frontends_params); |
| |
| request->__set_metada_table_params(metadata_table_params); |
| return Status::OK(); |
| } |
| |
| Status MetaScanner::_build_frontends_disks_metadata_request(const TMetaScanRange& meta_scan_range, |
| TFetchSchemaTableDataRequest* request) { |
| VLOG_CRITICAL << "MetaScanner::_build_frontends_metadata_request"; |
| if (!meta_scan_range.__isset.frontends_params) { |
| return Status::InternalError("Can not find TFrontendsMetadataParams from meta_scan_range."); |
| } |
| // create request |
| request->__set_cluster_name(""); |
| request->__set_schema_table_name(TSchemaTableName::METADATA_TABLE); |
| |
| // create TMetadataTableRequestParams |
| TMetadataTableRequestParams metadata_table_params; |
| metadata_table_params.__set_metadata_type(TMetadataType::FRONTENDS_DISKS); |
| metadata_table_params.__set_frontends_metadata_params(meta_scan_range.frontends_params); |
| |
| request->__set_metada_table_params(metadata_table_params); |
| return Status::OK(); |
| } |
| |
| Status MetaScanner::_build_workload_sched_policy_metadata_request( |
| const TMetaScanRange& meta_scan_range, TFetchSchemaTableDataRequest* request) { |
| VLOG_CRITICAL << "MetaScanner::_build_workload_sched_policy_metadata_request"; |
| |
| // create request |
| request->__set_cluster_name(""); |
| request->__set_schema_table_name(TSchemaTableName::METADATA_TABLE); |
| |
| // create TMetadataTableRequestParams |
| TMetadataTableRequestParams metadata_table_params; |
| metadata_table_params.__set_metadata_type(TMetadataType::WORKLOAD_SCHED_POLICY); |
| metadata_table_params.__set_current_user_ident(_user_identity); |
| |
| request->__set_metada_table_params(metadata_table_params); |
| return Status::OK(); |
| } |
| |
| Status MetaScanner::_build_catalogs_metadata_request(const TMetaScanRange& meta_scan_range, |
| TFetchSchemaTableDataRequest* request) { |
| VLOG_CRITICAL << "MetaScanner::_build_catalogs_metadata_request"; |
| |
| // create request |
| request->__set_schema_table_name(TSchemaTableName::METADATA_TABLE); |
| |
| // create TMetadataTableRequestParams |
| TMetadataTableRequestParams metadata_table_params; |
| metadata_table_params.__set_metadata_type(TMetadataType::CATALOGS); |
| metadata_table_params.__set_current_user_ident(_user_identity); |
| |
| request->__set_metada_table_params(metadata_table_params); |
| return Status::OK(); |
| } |
| |
| Status MetaScanner::_build_materialized_views_metadata_request( |
| const TMetaScanRange& meta_scan_range, TFetchSchemaTableDataRequest* request) { |
| VLOG_CRITICAL << "MetaScanner::_build_materialized_views_metadata_request"; |
| if (!meta_scan_range.__isset.materialized_views_params) { |
| return Status::InternalError( |
| "Can not find TMaterializedViewsMetadataParams from meta_scan_range."); |
| } |
| |
| // create request |
| request->__set_schema_table_name(TSchemaTableName::METADATA_TABLE); |
| |
| // create TMetadataTableRequestParams |
| TMetadataTableRequestParams metadata_table_params; |
| metadata_table_params.__set_metadata_type(TMetadataType::MATERIALIZED_VIEWS); |
| metadata_table_params.__set_materialized_views_metadata_params( |
| meta_scan_range.materialized_views_params); |
| |
| request->__set_metada_table_params(metadata_table_params); |
| return Status::OK(); |
| } |
| |
| Status MetaScanner::_build_partitions_metadata_request(const TMetaScanRange& meta_scan_range, |
| TFetchSchemaTableDataRequest* request) { |
| VLOG_CRITICAL << "MetaScanner::_build_partitions_metadata_request"; |
| if (!meta_scan_range.__isset.partitions_params) { |
| return Status::InternalError( |
| "Can not find TPartitionsMetadataParams from meta_scan_range."); |
| } |
| |
| // create request |
| request->__set_schema_table_name(TSchemaTableName::METADATA_TABLE); |
| |
| // create TMetadataTableRequestParams |
| TMetadataTableRequestParams metadata_table_params; |
| metadata_table_params.__set_metadata_type(TMetadataType::PARTITIONS); |
| metadata_table_params.__set_partitions_metadata_params(meta_scan_range.partitions_params); |
| |
| request->__set_metada_table_params(metadata_table_params); |
| return Status::OK(); |
| } |
| |
| Status MetaScanner::_build_jobs_metadata_request(const TMetaScanRange& meta_scan_range, |
| TFetchSchemaTableDataRequest* request) { |
| VLOG_CRITICAL << "MetaScanner::_build_jobs_metadata_request"; |
| if (!meta_scan_range.__isset.jobs_params) { |
| return Status::InternalError("Can not find TJobsMetadataParams from meta_scan_range."); |
| } |
| |
| // create request |
| request->__set_schema_table_name(TSchemaTableName::METADATA_TABLE); |
| |
| // create TMetadataTableRequestParams |
| TMetadataTableRequestParams metadata_table_params; |
| metadata_table_params.__set_metadata_type(TMetadataType::JOBS); |
| metadata_table_params.__set_jobs_metadata_params(meta_scan_range.jobs_params); |
| |
| request->__set_metada_table_params(metadata_table_params); |
| return Status::OK(); |
| } |
| |
| Status MetaScanner::_build_tasks_metadata_request(const TMetaScanRange& meta_scan_range, |
| TFetchSchemaTableDataRequest* request) { |
| VLOG_CRITICAL << "MetaScanner::_build_tasks_metadata_request"; |
| if (!meta_scan_range.__isset.tasks_params) { |
| return Status::InternalError("Can not find TTasksMetadataParams from meta_scan_range."); |
| } |
| |
| // create request |
| request->__set_schema_table_name(TSchemaTableName::METADATA_TABLE); |
| |
| // create TMetadataTableRequestParams |
| TMetadataTableRequestParams metadata_table_params; |
| metadata_table_params.__set_metadata_type(TMetadataType::TASKS); |
| metadata_table_params.__set_tasks_metadata_params(meta_scan_range.tasks_params); |
| |
| request->__set_metada_table_params(metadata_table_params); |
| return Status::OK(); |
| } |
| |
| Status MetaScanner::_build_partition_values_metadata_request( |
| const TMetaScanRange& meta_scan_range, TFetchSchemaTableDataRequest* request) { |
| VLOG_CRITICAL << "MetaScanner::_build_partition_values_metadata_request"; |
| if (!meta_scan_range.__isset.partition_values_params) { |
| return Status::InternalError( |
| "Can not find TPartitionValuesMetadataParams from meta_scan_range."); |
| } |
| |
| // create request |
| request->__set_schema_table_name(TSchemaTableName::METADATA_TABLE); |
| |
| // create TMetadataTableRequestParams |
| TMetadataTableRequestParams metadata_table_params; |
| metadata_table_params.__set_metadata_type(TMetadataType::PARTITION_VALUES); |
| metadata_table_params.__set_partition_values_metadata_params( |
| meta_scan_range.partition_values_params); |
| |
| request->__set_metada_table_params(metadata_table_params); |
| return Status::OK(); |
| } |
| |
| Status MetaScanner::close(RuntimeState* state) { |
| VLOG_CRITICAL << "MetaScanner::close"; |
| if (_reader) { |
| RETURN_IF_ERROR(_reader->close()); |
| } |
| RETURN_IF_ERROR(Scanner::close(state)); |
| return Status::OK(); |
| } |
| |
| #include "common/compile_check_end.h" |
| } // namespace doris::vectorized |