| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "vec/sink/vrow_distribution.h" |
| |
| #include <gen_cpp/FrontendService.h> |
| #include <gen_cpp/FrontendService_types.h> |
| #include <glog/logging.h> |
| |
| #include <cstdint> |
| #include <memory> |
| #include <string> |
| |
| #include "common/cast_set.h" |
| #include "common/logging.h" |
| #include "common/status.h" |
| #include "runtime/client_cache.h" |
| #include "runtime/exec_env.h" |
| #include "runtime/runtime_state.h" |
| #include "service/backend_options.h" |
| #include "util/doris_metrics.h" |
| #include "util/thrift_rpc_helper.h" |
| #include "vec/columns/column.h" |
| #include "vec/columns/column_const.h" |
| #include "vec/columns/column_nullable.h" |
| #include "vec/columns/column_vector.h" |
| #include "vec/common/assert_cast.h" |
| #include "vec/data_types/data_type.h" |
| #include "vec/sink/writer/vtablet_writer.h" |
| |
| namespace doris::vectorized { |
| #include "common/compile_check_begin.h" |
| |
| std::pair<vectorized::VExprContextSPtrs, vectorized::VExprSPtrs> |
| VRowDistribution::_get_partition_function() { |
| return {_vpartition->get_part_func_ctx(), _vpartition->get_partition_function()}; |
| } |
| |
| Status VRowDistribution::_save_missing_values( |
| std::vector<std::vector<std::string>>& col_strs, // non-const ref for move |
| int col_size, Block* block, const std::vector<int64_t>& filter, |
| const std::vector<const NullMap*>& col_null_maps) { |
| // de-duplication for new partitions but save all rows. |
| RETURN_IF_ERROR(_batching_block->add_rows(block, filter)); |
| std::vector<TNullableStringLiteral> cur_row_values; |
| for (int row = 0; row < col_strs[0].size(); ++row) { |
| cur_row_values.clear(); |
| for (int col = 0; col < col_size; ++col) { |
| TNullableStringLiteral node; |
| const auto* null_map = col_null_maps[col]; // null map for this col |
| node.__set_is_null((null_map && (*null_map)[filter[row]]) |
| ? true |
| : node.is_null); // if not, dont change(default false) |
| if (!node.is_null) { |
| node.__set_value(col_strs[col][row]); |
| } |
| cur_row_values.push_back(node); |
| } |
| if (!_deduper.contains(cur_row_values)) { |
| _deduper.insert(cur_row_values); |
| _partitions_need_create.emplace_back(cur_row_values); |
| } |
| } |
| |
| // to avoid too large mem use |
| if (_batching_block->rows() > _batch_size) { |
| _deal_batched = true; |
| } |
| |
| VLOG_NOTICE << "pushed some batching lines, now numbers = " << _batching_rows; |
| |
| return Status::OK(); |
| } |
| |
| void VRowDistribution::clear_batching_stats() { |
| _partitions_need_create.clear(); |
| _batching_rows = 0; |
| _batching_bytes = 0; |
| } |
| |
| Status VRowDistribution::automatic_create_partition() { |
| SCOPED_TIMER(_add_partition_request_timer); |
| TCreatePartitionRequest request; |
| TCreatePartitionResult result; |
| std::string be_endpoint = BackendOptions::get_be_endpoint(); |
| request.__set_txn_id(_txn_id); |
| request.__set_db_id(_vpartition->db_id()); |
| request.__set_table_id(_vpartition->table_id()); |
| request.__set_partitionValues(_partitions_need_create); |
| request.__set_be_endpoint(be_endpoint); |
| request.__set_write_single_replica(_write_single_replica); |
| |
| VLOG_NOTICE << "automatic partition rpc begin request " << request; |
| TNetworkAddress master_addr = ExecEnv::GetInstance()->cluster_info()->master_fe_addr; |
| int time_out = _state->execution_timeout() * 1000; |
| RETURN_IF_ERROR(ThriftRpcHelper::rpc<FrontendServiceClient>( |
| master_addr.hostname, master_addr.port, |
| [&request, &result](FrontendServiceConnection& client) { |
| client->createPartition(result, request); |
| }, |
| time_out)); |
| |
| Status status(Status::create(result.status)); |
| VLOG_NOTICE << "automatic partition rpc end response " << result; |
| if (result.status.status_code == TStatusCode::OK) { |
| // add new created partitions |
| RETURN_IF_ERROR(_vpartition->add_partitions(result.partitions)); |
| for (const auto& part : result.partitions) { |
| _new_partition_ids.insert(part.id); |
| VLOG_TRACE << "record new id: " << part.id; |
| } |
| RETURN_IF_ERROR(_create_partition_callback(_caller, &result)); |
| } |
| |
| return status; |
| } |
| |
| // for reuse the same create callback of create-partition |
| static TCreatePartitionResult cast_as_create_result(TReplacePartitionResult& arg) { |
| TCreatePartitionResult result; |
| result.status = arg.status; |
| result.nodes = std::move(arg.nodes); |
| result.partitions = std::move(arg.partitions); |
| result.tablets = std::move(arg.tablets); |
| result.slave_tablets = std::move(arg.slave_tablets); |
| return result; |
| } |
| |
| // use _partitions and replace them |
| Status VRowDistribution::_replace_overwriting_partition() { |
| SCOPED_TIMER(_add_partition_request_timer); // also for replace_partition |
| TReplacePartitionRequest request; |
| TReplacePartitionResult result; |
| request.__set_overwrite_group_id(_vpartition->get_overwrite_group_id()); |
| request.__set_db_id(_vpartition->db_id()); |
| request.__set_table_id(_vpartition->table_id()); |
| request.__set_write_single_replica(_write_single_replica); |
| |
| // only request for partitions not recorded for replacement |
| std::set<int64_t> id_deduper; |
| for (const auto* part : _partitions) { |
| if (part != nullptr) { |
| if (_new_partition_ids.contains(part->id)) { |
| // this is a new partition. dont replace again. |
| VLOG_TRACE << "skip new partition: " << part->id; |
| } else { |
| // request for replacement |
| id_deduper.insert(part->id); |
| } |
| } else if (_missing_map.empty()) { |
| // no origin partition. and not allow to create. |
| return Status::InvalidArgument( |
| "Cannot found origin partitions in auto detect overwriting, stop " |
| "processing"); |
| } // else: part is null and _missing_map is not empty. dealed outside using auto-partition way. nothing to do here. |
| } |
| if (id_deduper.empty()) { |
| return Status::OK(); // no need to request |
| } |
| // de-duplicate. there's no check in FE |
| std::vector<int64_t> request_part_ids(id_deduper.begin(), id_deduper.end()); |
| |
| request.__set_partition_ids(request_part_ids); |
| |
| std::string be_endpoint = BackendOptions::get_be_endpoint(); |
| request.__set_be_endpoint(be_endpoint); |
| |
| VLOG_NOTICE << "auto detect replace partition request: " << request; |
| TNetworkAddress master_addr = ExecEnv::GetInstance()->cluster_info()->master_fe_addr; |
| int time_out = _state->execution_timeout() * 1000; |
| RETURN_IF_ERROR(ThriftRpcHelper::rpc<FrontendServiceClient>( |
| master_addr.hostname, master_addr.port, |
| [&request, &result](FrontendServiceConnection& client) { |
| client->replacePartition(result, request); |
| }, |
| time_out)); |
| |
| Status status(Status::create(result.status)); |
| VLOG_NOTICE << "auto detect replace partition result: " << result; |
| if (result.status.status_code == TStatusCode::OK) { |
| // record new partitions |
| for (const auto& part : result.partitions) { |
| _new_partition_ids.insert(part.id); |
| VLOG_TRACE << "record new id: " << part.id; |
| } |
| // replace data in _partitions |
| RETURN_IF_ERROR(_vpartition->replace_partitions(request_part_ids, result.partitions)); |
| // reuse the function as the args' structure are same. it add nodes/locations and incremental_open |
| auto result_as_create = cast_as_create_result(result); |
| RETURN_IF_ERROR(_create_partition_callback(_caller, &result_as_create)); |
| } |
| |
| return status; |
| } |
| |
| void VRowDistribution::_get_tablet_ids(vectorized::Block* block, int32_t index_idx, |
| std::vector<int64_t>& tablet_ids) { |
| tablet_ids.resize(block->rows()); |
| for (int row_idx = 0; row_idx < block->rows(); row_idx++) { |
| if (_skip[row_idx]) { |
| continue; |
| } |
| auto& partition = _partitions[row_idx]; |
| auto& tablet_index = _tablet_indexes[row_idx]; |
| auto& index = partition->indexes[index_idx]; |
| |
| auto tablet_id = index.tablets[tablet_index]; |
| tablet_ids[row_idx] = tablet_id; |
| } |
| } |
| |
| void VRowDistribution::_filter_block_by_skip(vectorized::Block* block, |
| RowPartTabletIds& row_part_tablet_id) { |
| auto& row_ids = row_part_tablet_id.row_ids; |
| auto& partition_ids = row_part_tablet_id.partition_ids; |
| auto& tablet_ids = row_part_tablet_id.tablet_ids; |
| |
| auto rows = block->rows(); |
| // row count of a block should not exceed UINT32_MAX |
| auto rows_uint32 = cast_set<uint32_t>(rows); |
| for (uint32_t i = 0; i < rows_uint32; i++) { |
| if (!_skip[i]) { |
| row_ids.emplace_back(i); |
| partition_ids.emplace_back(_partitions[i]->id); |
| tablet_ids.emplace_back(_tablet_ids[i]); |
| } |
| } |
| } |
| |
| Status VRowDistribution::_filter_block_by_skip_and_where_clause( |
| vectorized::Block* block, const vectorized::VExprContextSPtr& where_clause, |
| RowPartTabletIds& row_part_tablet_id) { |
| // TODO |
| //SCOPED_RAW_TIMER(&_stat.where_clause_ns); |
| int result_index = -1; |
| size_t column_number = block->columns(); |
| RETURN_IF_ERROR(where_clause->execute(block, &result_index)); |
| |
| auto filter_column = block->get_by_position(result_index).column; |
| |
| auto& row_ids = row_part_tablet_id.row_ids; |
| auto& partition_ids = row_part_tablet_id.partition_ids; |
| auto& tablet_ids = row_part_tablet_id.tablet_ids; |
| if (const auto* nullable_column = |
| vectorized::check_and_get_column<vectorized::ColumnNullable>(*filter_column)) { |
| auto rows = block->rows(); |
| // row count of a block should not exceed UINT32_MAX |
| auto rows_uint32 = cast_set<uint32_t>(rows); |
| for (uint32_t i = 0; i < rows_uint32; i++) { |
| if (nullable_column->get_bool_inline(i) && !_skip[i]) { |
| row_ids.emplace_back(i); |
| partition_ids.emplace_back(_partitions[i]->id); |
| tablet_ids.emplace_back(_tablet_ids[i]); |
| } |
| } |
| } else if (const auto* const_column = |
| vectorized::check_and_get_column<vectorized::ColumnConst>(*filter_column)) { |
| bool ret = const_column->get_bool(0); |
| if (!ret) { |
| return Status::OK(); |
| } |
| // should we optimize? |
| _filter_block_by_skip(block, row_part_tablet_id); |
| } else { |
| const auto& filter = assert_cast<const vectorized::ColumnUInt8&>(*filter_column).get_data(); |
| auto rows = block->rows(); |
| // row count of a block should not exceed UINT32_MAX |
| auto rows_uint32 = cast_set<uint32_t>(rows); |
| for (uint32_t i = 0; i < rows_uint32; i++) { |
| if (filter[i] != 0 && !_skip[i]) { |
| row_ids.emplace_back(i); |
| partition_ids.emplace_back(_partitions[i]->id); |
| tablet_ids.emplace_back(_tablet_ids[i]); |
| } |
| } |
| } |
| |
| for (size_t i = block->columns() - 1; i >= column_number; i--) { |
| block->erase(i); |
| } |
| return Status::OK(); |
| } |
| |
| Status VRowDistribution::_filter_block(vectorized::Block* block, |
| std::vector<RowPartTabletIds>& row_part_tablet_ids) { |
| for (int i = 0; i < _schema->indexes().size(); i++) { |
| _get_tablet_ids(block, i, _tablet_ids); |
| auto& where_clause = _schema->indexes()[i]->where_clause; |
| if (where_clause != nullptr) { |
| RETURN_IF_ERROR(_filter_block_by_skip_and_where_clause(block, where_clause, |
| row_part_tablet_ids[i])); |
| } else { |
| _filter_block_by_skip(block, row_part_tablet_ids[i]); |
| } |
| } |
| return Status::OK(); |
| } |
| |
| Status VRowDistribution::_generate_rows_distribution_for_non_auto_partition( |
| vectorized::Block* block, bool has_filtered_rows, |
| std::vector<RowPartTabletIds>& row_part_tablet_ids) { |
| int num_rows = cast_set<int>(block->rows()); |
| |
| RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block, num_rows, _partitions, |
| _tablet_indexes, _skip)); |
| if (has_filtered_rows) { |
| for (int i = 0; i < num_rows; i++) { |
| _skip[i] = _skip[i] || _block_convertor->filter_map()[i]; |
| } |
| } |
| RETURN_IF_ERROR(_filter_block(block, row_part_tablet_ids)); |
| return Status::OK(); |
| } |
| |
| Status VRowDistribution::_deal_missing_map(vectorized::Block* block, |
| const std::vector<uint16_t>& partition_cols_idx, |
| int64_t& rows_stat_val) { |
| // for missing partition keys, calc the missing partition and save in _partitions_need_create |
| auto [part_ctxs, part_exprs] = _get_partition_function(); |
| int part_col_num = cast_set<int>(part_exprs.size()); |
| // the two vectors are in column-first-order |
| std::vector<std::vector<std::string>> col_strs; |
| std::vector<const NullMap*> col_null_maps; |
| col_strs.resize(part_col_num); |
| col_null_maps.reserve(part_col_num); |
| |
| for (int i = 0; i < part_col_num; ++i) { |
| auto return_type = part_exprs[i]->data_type(); |
| // expose the data column. the return type would be nullable |
| const auto& [range_left_col, col_const] = |
| unpack_if_const(block->get_by_position(partition_cols_idx[i]).column); |
| if (range_left_col->is_nullable()) { |
| col_null_maps.push_back(&( |
| assert_cast<const ColumnNullable*>(range_left_col.get())->get_null_map_data())); |
| } else { |
| col_null_maps.push_back(nullptr); |
| } |
| for (auto row : _missing_map) { |
| col_strs[i].push_back( |
| return_type->to_string(*range_left_col, index_check_const(row, col_const))); |
| } |
| } |
| |
| // calc the end value and save them. in the end of sending, we will create partitions for them and deal them. |
| RETURN_IF_ERROR( |
| _save_missing_values(col_strs, part_col_num, block, _missing_map, col_null_maps)); |
| |
| size_t new_bt_rows = _batching_block->rows(); |
| size_t new_bt_bytes = _batching_block->bytes(); |
| rows_stat_val -= new_bt_rows - _batching_rows; |
| _state->update_num_rows_load_total(_batching_rows - new_bt_rows); |
| _state->update_num_bytes_load_total(_batching_bytes - new_bt_bytes); |
| DorisMetrics::instance()->load_rows->increment(_batching_rows - new_bt_rows); |
| DorisMetrics::instance()->load_bytes->increment(_batching_bytes - new_bt_bytes); |
| _batching_rows = new_bt_rows; |
| _batching_bytes = new_bt_bytes; |
| |
| return Status::OK(); |
| } |
| |
| Status VRowDistribution::_generate_rows_distribution_for_auto_partition( |
| vectorized::Block* block, const std::vector<uint16_t>& partition_cols_idx, |
| bool has_filtered_rows, std::vector<RowPartTabletIds>& row_part_tablet_ids, |
| int64_t& rows_stat_val) { |
| int num_rows = cast_set<int>(block->rows()); |
| std::vector<uint16_t> partition_keys = _vpartition->get_partition_keys(); |
| |
| auto& partition_col = block->get_by_position(partition_keys[0]); |
| _missing_map.clear(); |
| _missing_map.reserve(partition_col.column->size()); |
| |
| RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block, num_rows, _partitions, |
| _tablet_indexes, _skip, &_missing_map)); |
| |
| // the missing vals for auto partition are also skipped. |
| if (has_filtered_rows) { |
| for (int i = 0; i < num_rows; i++) { |
| _skip[i] = _skip[i] || _block_convertor->filter_map()[i]; |
| } |
| } |
| RETURN_IF_ERROR(_filter_block(block, row_part_tablet_ids)); |
| |
| if (!_missing_map.empty()) { |
| RETURN_IF_ERROR(_deal_missing_map(block, partition_cols_idx, rows_stat_val)); |
| } |
| return Status::OK(); |
| } |
| |
| Status VRowDistribution::_generate_rows_distribution_for_auto_overwrite( |
| vectorized::Block* block, const std::vector<uint16_t>& partition_cols_idx, |
| bool has_filtered_rows, std::vector<RowPartTabletIds>& row_part_tablet_ids, |
| int64_t& rows_stat_val) { |
| int num_rows = cast_set<int>(block->rows()); |
| |
| // for non-auto-partition situation, goes into two 'else' branch. just find the origin partitions, replace them by rpc, |
| // and find the new partitions to use. |
| // for auto-partition's, find and save origins in _partitions and replace them. at meanwhile save the missing values for auto |
| // partition. then we find partition again to get replaced partitions in _partitions. this time _missing_map is ignored cuz |
| // we already saved missing values. |
| if (_vpartition->is_auto_partition() && |
| _state->query_options().enable_auto_create_when_overwrite) { |
| // allow auto create partition for missing rows. |
| std::vector<uint16_t> partition_keys = _vpartition->get_partition_keys(); |
| auto partition_col = block->get_by_position(partition_keys[0]); |
| _missing_map.clear(); |
| _missing_map.reserve(partition_col.column->size()); |
| |
| RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block, num_rows, _partitions, |
| _tablet_indexes, _skip, &_missing_map)); |
| |
| // allow and really need to create during auto-detect-overwriting. |
| if (!_missing_map.empty()) { |
| RETURN_IF_ERROR(_deal_missing_map(block, partition_cols_idx, rows_stat_val)); |
| } |
| } else { |
| RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block, num_rows, _partitions, |
| _tablet_indexes, _skip)); |
| } |
| RETURN_IF_ERROR(_replace_overwriting_partition()); |
| |
| // regenerate locations for new partitions & tablets |
| _reset_find_tablets(num_rows); |
| if (_vpartition->is_auto_partition() && |
| _state->query_options().enable_auto_create_when_overwrite) { |
| // here _missing_map is just a placeholder |
| RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block, num_rows, _partitions, |
| _tablet_indexes, _skip, &_missing_map)); |
| if (VLOG_TRACE_IS_ON) { |
| std::string tmp; |
| for (auto v : _missing_map) { |
| tmp += std::to_string(v).append(", "); |
| } |
| VLOG_TRACE << "Trace missing map of " << this << ':' << tmp; |
| } |
| } else { |
| RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block, num_rows, _partitions, |
| _tablet_indexes, _skip)); |
| } |
| if (has_filtered_rows) { |
| for (int i = 0; i < num_rows; i++) { |
| _skip[i] = _skip[i] || _block_convertor->filter_map()[i]; |
| } |
| } |
| RETURN_IF_ERROR(_filter_block(block, row_part_tablet_ids)); |
| return Status::OK(); |
| } |
| |
| void VRowDistribution::_reset_row_part_tablet_ids( |
| std::vector<RowPartTabletIds>& row_part_tablet_ids, int64_t rows) { |
| row_part_tablet_ids.resize(_schema->indexes().size()); |
| for (auto& row_part_tablet_id : row_part_tablet_ids) { |
| auto& row_ids = row_part_tablet_id.row_ids; |
| auto& partition_ids = row_part_tablet_id.partition_ids; |
| auto& tablet_ids = row_part_tablet_id.tablet_ids; |
| |
| row_ids.clear(); |
| partition_ids.clear(); |
| tablet_ids.clear(); |
| // This is important for performance. |
| row_ids.reserve(rows); |
| partition_ids.reserve(rows); |
| tablet_ids.reserve(rows); |
| } |
| } |
| |
| Status VRowDistribution::generate_rows_distribution( |
| vectorized::Block& input_block, std::shared_ptr<vectorized::Block>& block, |
| int64_t& filtered_rows, bool& has_filtered_rows, |
| std::vector<RowPartTabletIds>& row_part_tablet_ids, int64_t& rows_stat_val) { |
| auto input_rows = input_block.rows(); |
| _reset_row_part_tablet_ids(row_part_tablet_ids, input_rows); |
| |
| int64_t prev_filtered_rows = |
| _block_convertor->num_filtered_rows() + _tablet_finder->num_filtered_rows(); |
| RETURN_IF_ERROR(_block_convertor->validate_and_convert_block( |
| _state, &input_block, block, *_vec_output_expr_ctxs, input_rows, has_filtered_rows)); |
| |
| // batching block rows which need new partitions. deal together at finish. |
| if (!_batching_block) [[unlikely]] { |
| std::unique_ptr<Block> tmp_block = block->create_same_struct_block(0); |
| _batching_block = MutableBlock::create_unique(std::move(*tmp_block)); |
| } |
| |
| auto num_rows = block->rows(); |
| _reset_find_tablets(num_rows); |
| |
| // if there's projection of partition calc, we need to calc it first. |
| auto [part_ctxs, part_funcs] = _get_partition_function(); |
| std::vector<uint16_t> partition_cols_idx; |
| if (_vpartition->is_projection_partition()) { |
| // calc the start value of missing partition ranges. |
| auto func_size = part_funcs.size(); |
| for (int i = 0; i < func_size; ++i) { |
| int result_idx = -1; |
| // we just calc left range here. leave right to FE to avoid dup calc. |
| RETURN_IF_ERROR(part_funcs[i]->execute(part_ctxs[i].get(), block.get(), &result_idx)); |
| |
| VLOG_DEBUG << "Partition-calculated block:" << block->dump_data(0, 1); |
| DCHECK(result_idx != -1); |
| |
| partition_cols_idx.push_back(cast_set<uint16_t>(result_idx)); |
| } |
| |
| // change the column to compare to transformed. |
| _vpartition->set_transformed_slots(partition_cols_idx); |
| } |
| |
| Status st = Status::OK(); |
| if (_vpartition->is_auto_detect_overwrite() && !_deal_batched) { |
| // when overwrite, no auto create partition allowed. |
| st = _generate_rows_distribution_for_auto_overwrite(block.get(), partition_cols_idx, |
| has_filtered_rows, row_part_tablet_ids, |
| rows_stat_val); |
| } else if (_vpartition->is_auto_partition() && !_deal_batched) { |
| st = _generate_rows_distribution_for_auto_partition(block.get(), partition_cols_idx, |
| has_filtered_rows, row_part_tablet_ids, |
| rows_stat_val); |
| } else { // not auto partition |
| st = _generate_rows_distribution_for_non_auto_partition(block.get(), has_filtered_rows, |
| row_part_tablet_ids); |
| } |
| |
| filtered_rows = _block_convertor->num_filtered_rows() + _tablet_finder->num_filtered_rows() - |
| prev_filtered_rows; |
| return st; |
| } |
| |
| // reuse vars for find_tablets |
| void VRowDistribution::_reset_find_tablets(int64_t rows) { |
| _tablet_finder->filter_bitmap().Reset(rows); |
| _partitions.assign(rows, nullptr); |
| _skip.assign(rows, false); |
| _tablet_indexes.assign(rows, 0); |
| } |
| |
| } // namespace doris::vectorized |