blob: 98437f74102a9c50bdf5b78b163125dcb996de5b [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "exec/exchange/exchange_writer.h"

#include <glog/logging.h>

#include <algorithm>
#include <cstdint>
#include <numeric>
#include <vector>

#include "common/logging.h"
#include "common/status.h"
#include "core/assert_cast.h"
#include "core/block/block.h"
#include "exec/operator/exchange_sink_operator.h"
#include "exec/sink/tablet_sink_hash_partitioner.h"
namespace doris {
#include "common/compile_check_begin.h"
ExchangeWriterBase::ExchangeWriterBase(ExchangeSinkLocalState& local_state)
: _local_state(local_state), _partitioner(local_state.partitioner()) {}
// Reacts to a channel whose downstream receiver reported EOF.
//
// @param state    runtime state forwarded to Channel::close.
// @param channel  the channel that hit receiver EOF.
// @param st       the status reported for that channel; recorded via set_receiver_eof.
// @return         whatever Channel::close returns.
template <typename ChannelPtrType>
Status ExchangeWriterBase::_handle_eof_channel(RuntimeState* state, ChannelPtrType channel,
                                               Status st) const {
    // Record the EOF on the channel (presumably what is_receiver_eof() reports later).
    channel->set_receiver_eof(st);
    // An EOF'd channel issues no further RPC downstream, so a plain close is enough here;
    // the close result, not `st`, is what the caller sees.
    Status close_status = channel->close(state);
    return close_status;
}
// NOLINTBEGIN(readability-function-cognitive-complexity)
// Sends the rows of `block` to their destination channels.
//
// Reads the per-block layout prepared by the callers in this file:
//   _channel_rows_histogram[i]  number of rows destined for channel i
//   _origin_row_idx             row indices grouped by channel, channel 0 first
// Channel i receives the `_channel_rows_histogram[i]` indices starting at the running offset.
// Channels already at receiver EOF are skipped, but their slice is still stepped over so the
// following channels read the correct range. When `eos` is set, every channel not at receiver
// EOF additionally receives an empty add_rows call flagged as eos.
// Channel statuses are routed through HANDLE_CHANNEL_STATUS (defined elsewhere; presumably it
// sends receiver-EOF to _handle_eof_channel and returns other errors — confirm in the header).
Status ExchangeWriterBase::_add_rows_impl(RuntimeState* state,
                                          std::vector<std::shared_ptr<Channel>>& channels,
                                          size_t channel_count, Block* block, bool eos) {
    Status status = Status::OK();
    uint32_t offset = 0;
    for (size_t i = 0; i < channel_count; ++i) {
        uint32_t size = _channel_rows_histogram[i];
        if (!channels[i]->is_receiver_eof() && size > 0) {
            VLOG_DEBUG << fmt::format("partition {} of {}, block:\n{}, start: {}, size: {}", i,
                                      channel_count, block->dump_data(), offset, size);
            status = channels[i]->add_rows(block, _origin_row_idx.data(), offset, size, false);
            HANDLE_CHANNEL_STATUS(state, channels[i], status);
        }
        // Advance even for skipped channels so the next channel's slice is located correctly.
        offset += size;
    }
    if (eos) {
        // size_t index (as in the loop above): a signed `int` compared against the size_t
        // `channel_count` is a signed/unsigned comparison.
        for (size_t i = 0; i < channel_count; ++i) {
            if (!channels[i]->is_receiver_eof()) {
                VLOG_DEBUG << fmt::format("EOS partition {} of {}, block:\n{}", i, channel_count,
                                          block->dump_data());
                status = channels[i]->add_rows(block, _origin_row_idx.data(), 0, 0, true);
                HANDLE_CHANNEL_STATUS(state, channels[i], status);
            }
        }
    }
    return Status::OK();
}
// NOLINTEND(readability-function-cognitive-complexity)
// Writes one block through the tablet-hash partitioner.
//
// Rows the partitioner is holding back (batched) and offers via try_cut_in_line are written
// before `block`. On `eos`, the partitioner is told this is the last block, any remaining
// batched rows are drained, and the eos signal travels with them. If nothing remains, it
// travels with an empty block that has `block`'s schema.
Status ExchangeOlapWriter::write(RuntimeState* state, Block* block, bool eos) {
    auto* hash_partitioner = assert_cast<TabletSinkHashPartitioner*>(_partitioner);

    // Batched rows that want to cut in line go out first.
    Block pending_block;
    RETURN_IF_ERROR(hash_partitioner->try_cut_in_line(pending_block));
    if (!pending_block.empty()) {
        RETURN_IF_ERROR(_write_impl(state, &pending_block));
        hash_partitioner->finish_cut_in_line();
    }

    RETURN_IF_ERROR(_write_impl(state, block));
    if (!eos) {
        return Status::OK();
    }

    // Final call: drain whatever is still batched and attach eos to it.
    hash_partitioner->mark_last_block();
    Block remaining_block;
    RETURN_IF_ERROR(hash_partitioner->try_cut_in_line(remaining_block));
    if (remaining_block.empty()) {
        // Nothing left over; carry eos on an empty block with the same structure.
        Block eos_carrier = block->clone_empty();
        RETURN_IF_ERROR(_write_impl(state, &eos_carrier, true));
    } else {
        RETURN_IF_ERROR(_write_impl(state, &remaining_block, true));
    }
    return Status::OK();
}
// Partitions `block` with the tablet-hash partitioner and distributes its rows to the channels.
//
// Rows the partitioner tags with its invalid sentinel are not sent. Their count is subtracted
// from the local state's rows-input counter. `eos` is forwarded to the distribution step.
Status ExchangeOlapWriter::_write_impl(RuntimeState* state, Block* block, bool eos) {
    const auto num_rows = block->rows();
    auto* hash_partitioner = assert_cast<TabletSinkHashPartitioner*>(_partitioner);

    {
        // Compute the destination channel of every row.
        SCOPED_TIMER(_local_state.split_block_hash_compute_timer());
        RETURN_IF_ERROR(hash_partitioner->do_partitioning(state, block));
    }

    {
        SCOPED_TIMER(_local_state.distribute_rows_into_channels_timer());
        const auto& row_channels = hash_partitioner->get_channel_ids();
        const auto skip_marker = hash_partitioner->invalid_sentinel();
        DCHECK_EQ(row_channels.size(), num_rows);
        // Rows carrying the sentinel will not be sunk: take them back out of the input count.
        COUNTER_UPDATE(_local_state.rows_input_counter(),
                       -1LL * std::ranges::count(row_channels, skip_marker));
        auto& all_channels = _local_state.channels;
        RETURN_IF_ERROR(_channel_add_rows(state, all_channels, all_channels.size(), row_channels,
                                          num_rows, block, eos, skip_marker));
    }

    return Status::OK();
}
// Partitions `block` with the generic partitioner and hands every row to its channel.
// `eos` is forwarded to the distribution step.
Status ExchangeTrivialWriter::write(RuntimeState* state, Block* block, bool eos) {
    const auto num_rows = block->rows();

    {
        // Compute the destination channel of every row.
        SCOPED_TIMER(_local_state.split_block_hash_compute_timer());
        RETURN_IF_ERROR(_partitioner->do_partitioning(state, block));
    }

    {
        SCOPED_TIMER(_local_state.distribute_rows_into_channels_timer());
        const auto& row_channels = _partitioner->get_channel_ids();
        auto& all_channels = _local_state.channels;
        RETURN_IF_ERROR(_channel_add_rows(state, all_channels, all_channels.size(), row_channels,
                                          num_rows, block, eos));
    }

    return Status::OK();
}
// Groups the rows of `block` by destination channel (OLAP path) and forwards them through
// _add_rows_impl.
//
// channel_ids[i] is the destination channel of row i, for i < rows. Rows whose id equals
// `invalid_val` are dropped and sent to no channel. The function performs a counting sort
// that keeps row order within each channel and fills:
//   _channel_rows_histogram[c]  number of kept rows destined for channel c
//   _origin_row_idx             indices of the kept rows, grouped by channel (channel 0 first)
// `_channel_pos_offsets` is scratch space used by the scatter pass.
Status ExchangeOlapWriter::_channel_add_rows(RuntimeState* state,
                                             std::vector<std::shared_ptr<Channel>>& channels,
                                             size_t channel_count,
                                             const std::vector<HashValType>& channel_ids,
                                             size_t rows, Block* block, bool eos,
                                             HashValType invalid_val) {
    DCHECK_LE(rows, channel_ids.size());
    _channel_rows_histogram.assign(channel_count, 0U);

    // Pass 1: per-channel counts. Counting the kept rows here, instead of in a separate
    // count_if over channel_ids, sizes the index buffer from exactly the rows scattered below.
    size_t effective_rows = 0;
    for (size_t i = 0; i < rows; ++i) {
        const auto cid = channel_ids[i];
        if (cid == invalid_val) {
            continue;
        }
        DCHECK_LT(static_cast<size_t>(cid), channel_count);
        ++_channel_rows_histogram[cid];
        ++effective_rows;
    }
    _origin_row_idx.resize(effective_rows);

    // Exclusive prefix sum gives the first slot of every channel. Unlike writing
    // `_channel_pos_offsets[0]` unconditionally, this stays in bounds when channel_count == 0.
    _channel_pos_offsets.resize(channel_count);
    std::exclusive_scan(_channel_rows_histogram.begin(), _channel_rows_histogram.end(),
                        _channel_pos_offsets.begin(), 0U);

    // Pass 2: scatter each kept row index into its channel's slice, preserving row order.
    for (uint32_t i = 0; i < rows; ++i) {
        const auto cid = channel_ids[i];
        if (cid == invalid_val) {
            continue;
        }
        _origin_row_idx[_channel_pos_offsets[cid]++] = i;
    }
    return _add_rows_impl(state, channels, channel_count, block, eos);
}
// Groups the rows of `block` by destination channel and forwards them through _add_rows_impl.
//
// channel_ids[i] is the destination channel of row i, for i < rows. No row is filtered out
// here. The function performs a counting sort that keeps row order within each channel and
// fills:
//   _channel_rows_histogram[c]  number of rows destined for channel c
//   _origin_row_idx             all row indices, grouped by channel (channel 0 first)
// `_channel_pos_offsets` is scratch space used by the scatter pass.
Status ExchangeTrivialWriter::_channel_add_rows(RuntimeState* state,
                                                std::vector<std::shared_ptr<Channel>>& channels,
                                                size_t channel_count,
                                                const std::vector<HashValType>& channel_ids,
                                                size_t rows, Block* block, bool eos) {
    DCHECK_LE(rows, channel_ids.size());
    _origin_row_idx.resize(rows);
    _channel_rows_histogram.assign(channel_count, 0U);

    // Pass 1: per-channel row counts.
    for (size_t i = 0; i < rows; ++i) {
        const auto cid = channel_ids[i];
        DCHECK_LT(static_cast<size_t>(cid), channel_count);
        ++_channel_rows_histogram[cid];
    }

    // Exclusive prefix sum gives the first slot of every channel. Unlike writing
    // `_channel_pos_offsets[0]` unconditionally, this stays in bounds when channel_count == 0.
    _channel_pos_offsets.resize(channel_count);
    std::exclusive_scan(_channel_rows_histogram.begin(), _channel_rows_histogram.end(),
                        _channel_pos_offsets.begin(), 0U);

    // Pass 2: scatter each row index into its channel's slice, preserving row order.
    for (uint32_t i = 0; i < rows; ++i) {
        _origin_row_idx[_channel_pos_offsets[channel_ids[i]]++] = i;
    }
    return _add_rows_impl(state, channels, channel_count, block, eos);
}
} // namespace doris