blob: 5c68b444f0fea5883be39b18b07d56932db5f816 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "exec/sink/tablet_sink_hash_partitioner.h"
#include <algorithm>
#include <memory>
#include <utility>
#include "exec/operator/operator.h"
namespace doris {
#include "common/compile_check_begin.h"
TabletSinkHashPartitioner::TabletSinkHashPartitioner(uint32_t partition_count, int64_t txn_id,
TOlapTableSchemaParam tablet_sink_schema,
TOlapTablePartitionParam tablet_sink_partition,
TOlapTableLocationParam tablet_sink_location,
const TTupleId& tablet_sink_tuple_id,
ExchangeSinkLocalState* local_state)
: PartitionerBase(partition_count),
_txn_id(txn_id),
_tablet_sink_schema(std::move(tablet_sink_schema)),
_tablet_sink_partition(std::move(tablet_sink_partition)),
_tablet_sink_location(std::move(tablet_sink_location)),
_tablet_sink_tuple_id(tablet_sink_tuple_id),
_local_state(local_state) {}
// No-op: `texprs` is not consulted. Rows are routed by the tablet id computed in
// do_partitioning(), not by evaluating partition expressions.
Status TabletSinkHashPartitioner::init([[maybe_unused]] const std::vector<TExpr>& texprs) {
    return Status::OK();
}
// No-op: all runtime setup for this partitioner happens in open(), which builds its own
// row descriptor from the tablet sink tuple, so `row_desc` is not used.
Status TabletSinkHashPartitioner::prepare([[maybe_unused]] RuntimeState* state,
                                          [[maybe_unused]] const RowDescriptor& row_desc) {
    return Status::OK();
}
// Builds the runtime objects needed to find a tablet for every row: the table schema,
// the partition param, a per-row tablet finder, the tablet sink row descriptor, this
// instance's clones of the parent operator's tablet sink exprs, the block convertor and
// the tablet location. It then opens the row distribution over all of them.
// Returns an error if any of those steps fails or the tablet sink tuple id is unknown.
Status TabletSinkHashPartitioner::open(RuntimeState* state) {
    _schema = std::make_shared<OlapTableSchemaParam>();
    RETURN_IF_ERROR(_schema->init(_tablet_sink_schema));
    _vpartition = std::make_unique<VOlapTablePartitionParam>(_schema, _tablet_sink_partition);
    RETURN_IF_ERROR(_vpartition->init());
    // Rows are routed one by one by their tablet, so look the tablet up for every row.
    auto find_tablet_mode = OlapTabletFinder::FindTabletMode::FIND_TABLET_EVERY_ROW;
    _tablet_finder = std::make_unique<OlapTabletFinder>(_vpartition.get(), find_tablet_mode);

    _tablet_sink_tuple_desc = state->desc_tbl().get_tuple_descriptor(_tablet_sink_tuple_id);
    // get_tuple_descriptor() is assumed to yield nullptr for an unknown id; fail cleanly
    // instead of building a RowDescriptor / block convertor over a null tuple.
    if (_tablet_sink_tuple_desc == nullptr) {
        return Status::InternalError("unknown destination tuple descriptor, id={}",
                                     _tablet_sink_tuple_id);
    }
    _tablet_sink_row_desc = state->obj_pool()->add(new RowDescriptor(_tablet_sink_tuple_desc));

    // Each partitioner instance evaluates its own clones of the parent operator's exprs.
    auto& ctxs = _local_state->parent()->cast<ExchangeSinkOperatorX>().tablet_sink_expr_ctxs();
    _tablet_sink_expr_ctxs.resize(ctxs.size());
    for (size_t i = 0; i < _tablet_sink_expr_ctxs.size(); i++) {
        RETURN_IF_ERROR(ctxs[i]->clone(state, _tablet_sink_expr_ctxs[i]));
    }

    // Auto-increment columns are filled in here, on the exchange side, rather than in
    // the TabletWriter.
    _block_convertor = std::make_unique<OlapTableBlockConvertor>(_tablet_sink_tuple_desc);
    _block_convertor->init_autoinc_info(_schema->db_id(), _schema->table_id(), state->batch_size());
    _location = state->obj_pool()->add(new OlapTableLocationParam(_tablet_sink_location));
    _row_distribution.init(
            {.state = state,
             .block_convertor = _block_convertor.get(),
             .tablet_finder = _tablet_finder.get(),
             .vpartition = _vpartition.get(),
             .add_partition_request_timer = _local_state->add_partition_request_timer(),
             .txn_id = _txn_id,
             .pool = state->obj_pool(),
             .location = _location,
             .vec_output_expr_ctxs = &_tablet_sink_expr_ctxs,
             .schema = _schema,
             .caller = (void*)this,
             .create_partition_callback = &TabletSinkHashPartitioner::empty_callback_function});
    RETURN_IF_ERROR(_row_distribution.open(_tablet_sink_row_desc));
    return Status::OK();
}
// Computes a channel for every row of `block` from the tablet the row maps to.
// On success `_hash_vals` has one entry per row: a channel index in
// [0, partition_count()) or invalid_sentinel() for rows that got no tablet (skipped or
// filtered). `_skipped` is refreshed from the row distribution.
Status TabletSinkHashPartitioner::do_partitioning(RuntimeState* state, Block* block) const {
    _hash_vals.resize(block->rows());
    if (block->empty()) {
        return Status::OK();
    }

    // (x % invalid_val) can never equal invalid_val, so it is safe as a "no tablet" sentinel.
    DCHECK_EQ(invalid_sentinel(), partition_count());
    const auto invalid_val = invalid_sentinel();
    std::ranges::fill(_hash_vals, invalid_val);

    int64_t dummy_stats = 0; // _local_state->rows_input_counter() updated in sink and write.
    std::shared_ptr<Block> convert_block = std::make_shared<Block>();
    RETURN_IF_ERROR(_row_distribution.generate_rows_distribution(
            *block, convert_block, _row_part_tablet_ids, dummy_stats));
    _skipped = _row_distribution.get_skipped();

    // Only the first entry of _row_part_tablet_ids drives routing
    // (presumably the base index — TODO confirm).
    DCHECK(!_row_part_tablet_ids.empty());
    const auto& row_ids = _row_part_tablet_ids[0].row_ids;
    const auto& tablet_ids = _row_part_tablet_ids[0].tablet_ids;
    // row_ids[i] is the block row whose tablet is tablet_ids[i].
    DCHECK_EQ(row_ids.size(), tablet_ids.size());
    for (size_t idx = 0; idx < row_ids.size(); ++idx) {
        // NOTE(review): only sizeof(HashValType) bytes of the tablet id are hashed; if
        // HashValType is narrower than the tablet id type, the remaining bytes are ignored.
        // Kept unchanged so a given tablet keeps mapping to the same channel.
        const auto tablet_id_hash =
                HashUtil::zlib_crc_hash(&tablet_ids[idx], sizeof(HashValType), 0);
        _hash_vals[row_ids[idx]] = tablet_id_hash % invalid_val;
    }

    // _hash_vals[i] == invalid_val => row i is skipped or filtered
#ifndef NDEBUG
    // Every skipped row must still carry the sentinel. Filtered rows may carry it too,
    // so the sentinel count is only bounded below by the skipped count.
    for (size_t i = 0; i < _skipped.size(); ++i) {
        if (_skipped[i]) {
            CHECK_EQ(_hash_vals[i], invalid_val);
        }
    }
    CHECK_LE(std::ranges::count_if(_skipped, [](bool v) { return v; }),
             std::ranges::count_if(_hash_vals, [=](HashValType v) { return v == invalid_val; }));
#endif
    return Status::OK();
}
// When the row distribution reports batched rows (need_deal_batching()), run automatic
// partition creation and then move those batched rows into `prior_block`. The caller can
// then send them ahead of the current block. When nothing is batched, `prior_block` is
// left untouched.
Status TabletSinkHashPartitioner::try_cut_in_line(Block& prior_block) const {
    if (!_row_distribution.need_deal_batching()) {
        return Status::OK();
    }

    {
        // Timer scope covers only the partition-creation request.
        SCOPED_TIMER(_local_state->send_new_partition_timer());
        RETURN_IF_ERROR(_row_distribution.automatic_create_partition());
    }

    // Borrow the batched rows out as an lvalue, then drop the batching block; the row
    // distribution re-constructs it when it needs to batch again.
    prior_block = _row_distribution._batching_block->to_block();
    _row_distribution._batching_block.reset();
    _row_distribution.clear_batching_stats();
    VLOG_DEBUG << "sinking batched block:\n" << prior_block.dump_data();
    return Status::OK();
}
// Produces a new partitioner built from the same thrift descriptors, txn id, tuple id and
// local state. Runtime objects created by open() are not copied, so the clone has to be
// opened on its own. `state` is not used.
Status TabletSinkHashPartitioner::clone(RuntimeState* state,
                                        std::unique_ptr<PartitionerBase>& partitioner) {
    auto fresh = std::make_unique<TabletSinkHashPartitioner>(
            _partition_count, _txn_id, _tablet_sink_schema, _tablet_sink_partition,
            _tablet_sink_location, _tablet_sink_tuple_id, _local_state);
    partitioner = std::move(fresh);
    return Status::OK();
}
// Folds the rows this partitioner dropped into the load counters of `state`. These are
// rows filtered by the block convertor or the tablet finder, and rows rejected because
// their partition is immutable. The load total is then reset to filtered + unselected.
// Does nothing if open() never created the convertor and tablet finder.
Status TabletSinkHashPartitioner::close(RuntimeState* state) {
    if (_block_convertor == nullptr || _tablet_finder == nullptr) {
        return Status::OK();
    }

    const auto dropped_rows =
            _block_convertor->num_filtered_rows() + _tablet_finder->num_filtered_rows();
    state->update_num_rows_load_filtered(dropped_rows);
    state->update_num_rows_load_unselected(
            _tablet_finder->num_immutable_partition_filtered_rows());

    // The sink never sees these rows, so the total is compensated here.
    state->set_num_rows_load_total(state->num_rows_load_filtered() +
                                   state->num_rows_load_unselected());
    return Status::OK();
}
} // namespace doris