blob: 2a6161d465cfb7c57f4da42909f2c589ed967361 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "vec/olap/vertical_block_reader.h"
#include <assert.h>
#include <gen_cpp/olap_file.pb.h>
#include <algorithm>
#include <boost/iterator/iterator_facade.hpp>
#include <ostream>
#include "cloud/config.h"
#include "olap/compaction.h"
#include "olap/iterators.h"
#include "olap/olap_common.h"
#include "olap/olap_define.h"
#include "olap/rowset/beta_rowset.h"
#include "olap/rowset/rowset.h"
#include "olap/rowset/rowset_reader.h"
#include "olap/rowset/rowset_reader_context.h"
#include "olap/rowset/segment_v2/segment.h"
#include "olap/tablet_schema.h"
#include "util/simd/bits.h"
#include "vec/aggregate_functions/aggregate_function_reader.h"
#include "vec/columns/column_nullable.h"
#include "vec/columns/column_vector.h"
#include "vec/core/column_with_type_and_name.h"
#include "vec/data_types/data_type_number.h"
#include "vec/olap/vertical_merge_iterator.h"
namespace doris::vectorized {
#include "common/compile_check_begin.h"
using namespace ErrorCode;
// Out-of-class definition of the static `nextId` member; the counter starts at 1.
// NOTE(review): presumably the source of the per-reader `_id` printed in the VLOG
// output of _unique_key_next_block() — confirm against the class declaration.
uint64_t VerticalBlockReader::nextId = 1;
// Releases every aggregate state allocated in _init_agg_state(): the state is first
// destroyed through its aggregate function, then its raw char buffer is freed.
VerticalBlockReader::~VerticalBlockReader() {
    const size_t state_count = _agg_functions.size();
    for (size_t pos = 0; pos < state_count; ++pos) {
        auto* state = _agg_places[pos];
        _agg_functions[pos]->destroy(state);
        delete[] state;
    }
}
// Fetches the next block through the member function selected in init().
//
// @param block  destination block, forwarded to the selected implementation
// @param eof    end-of-data flag, forwarded to the selected implementation
// @return the status of the selected implementation. When not running in cloud
//         mode, a failing status is additionally reported to the tablet.
Status VerticalBlockReader::next_block_with_aggregation(Block* block, bool* eof) {
    Status status = (this->*_next_block_func)(block, eof);
    if (!config::is_cloud_mode() && !status.ok()) [[unlikely]] {
        // Outside cloud mode `_tablet` is treated as a local Tablet that collects errors.
        static_cast<Tablet*>(_tablet.get())->report_error(status);
    }
    return status;
}
// Captures the rowset readers for `read_params` and, for every rowset split, appends
// its segment iterators to `segment_iters`. For each segment of each rowset one entry
// is appended to `iterator_init_flag` and one to `rowset_ids`.
//
// @param read_params         reader parameters; `rs_splits` is iterated
// @param segment_iters       out: segment iterators of all rowsets
// @param iterator_init_flag  out: whether the matching iterator is initialised up front
// @param rowset_ids          out: id of the rowset each segment belongs to
// @return the first error from capturing readers or fetching iterators, OK otherwise
Status VerticalBlockReader::_get_segment_iterators(const ReaderParams& read_params,
                                                   std::vector<RowwiseIteratorUPtr>* segment_iters,
                                                   std::vector<bool>* iterator_init_flag,
                                                   std::vector<RowsetId>* rowset_ids) {
    if (auto capture_st = _capture_rs_readers(read_params); !capture_st.ok()) {
        LOG(WARNING) << "fail to init reader when _capture_rs_readers. res:" << capture_st
                     << ", tablet_id:" << read_params.tablet->tablet_id()
                     << ", schema_hash:" << read_params.tablet->schema_hash()
                     << ", reader_type:" << int(read_params.reader_type)
                     << ", version:" << read_params.version;
        return capture_st;
    }
    for (const auto& rs_split : read_params.rs_splits) {
        const auto& reader = rs_split.rs_reader;
        // Segment iterators are created here. In vertical compaction every column group
        // loads the segments again, so segments of non-local rowsets are cached to avoid
        // too many S3 HEAD requests.
        const bool use_cache = !reader->rowset()->is_local();
        RETURN_IF_ERROR(reader->get_segment_iterators(&_reader_context, segment_iters, use_cache));
        // Overlapping segments must all be initialised inside the heap merge iterator.
        // For non-overlapping segments only the first one of the rowset is initialised
        // and pushed to the heap; the others are initialised later, once the current
        // segment reaches its end. This keeps few segments loaded and saves memory.
        const bool init_all_segments = reader->rowset()->is_segments_overlapping();
        for (int seg = 0; seg < reader->rowset()->num_segments(); ++seg) {
            iterator_init_flag->push_back(init_all_segments || seg == 0);
            rowset_ids->push_back(reader->rowset()->rowset_id());
        }
        reader->reset_read_options();
    }
    return Status::OK();
}
// Builds and initialises `_vcollect_iter` for the current column group.
//
// Segment iterators come either from `read_params.segment_iters_ptr` (when provided)
// or from _get_segment_iterators(). Key column groups get a FIFO merge iterator (no key
// columns) or a heap merge iterator; value column groups get a mask merge iterator.
// For value groups of AGG_KEYS tablets the first row is pre-fetched into `_next_row`
// so that _init_agg_state() can use it.
//
// @param read_params  reader parameters
// @param sample_info  forwarded to the collect iterator's init(); may be nullptr
// @return first error encountered, OK otherwise (END_OF_FILE on the pre-fetch only sets `_eof`)
Status VerticalBlockReader::_init_collect_iter(const ReaderParams& read_params,
                                               CompactionSampleInfo* sample_info) {
    std::vector<bool> iterator_init_flag;
    std::vector<RowsetId> rowset_ids;
    std::vector<RowwiseIteratorUPtr> owned_iters;
    std::vector<RowwiseIteratorUPtr>* segment_iters_ptr = read_params.segment_iters_ptr;
    if (segment_iters_ptr == nullptr) {
        RETURN_IF_ERROR(_get_segment_iterators(read_params, &owned_iters, &iterator_init_flag,
                                               &rowset_ids));
        CHECK(owned_iters.size() == iterator_init_flag.size());
        segment_iters_ptr = &owned_iters;
    } else {
        // Caller-supplied iterators: all are initialised up front and carry a
        // default-constructed rowset id. TODO: _record_rowids need it
        const size_t iter_count = segment_iters_ptr->size();
        RowsetId placeholder_id;
        iterator_init_flag.assign(iter_count, true);
        rowset_ids.assign(iter_count, placeholder_id);
        // TODO(zhangzhengyu): is it enough for a context?
        _reader_context.reader_type = read_params.reader_type;
        _reader_context.need_ordered_result = true; // TODO: should it be?
        _reader_context.is_unique = tablet()->keys_type() == UNIQUE_KEYS;
        _reader_context.is_key_column_group = read_params.is_key_column_group;
        _reader_context.record_rowids = read_params.record_rowids;
    }

    // Key column groups are merged through a heap (or FIFO) iterator; value column
    // groups replay the recorded row sources through a mask merge iterator.
    const auto return_col_count = _return_columns.size();
    if (!read_params.is_key_column_group) {
        _vcollect_iter = new_vertical_mask_merge_iterator(std::move(*segment_iters_ptr),
                                                          return_col_count, _row_sources_buffer);
    } else {
        const auto& schema = read_params.tablet->tablet_schema();
        // -1 wraps to the maximum uint32_t and serves as the "no sequence column" value.
        uint32_t seq_col_idx = -1;
        if (schema->has_sequence_col() && schema->cluster_key_uids().empty()) {
            seq_col_idx = schema->sequence_col_idx();
        }
        if (schema->num_key_columns() == 0) {
            _vcollect_iter = new_vertical_fifo_merge_iterator(
                    std::move(*segment_iters_ptr), iterator_init_flag, rowset_ids,
                    return_col_count, read_params.tablet->keys_type(), seq_col_idx,
                    _row_sources_buffer);
        } else {
            _vcollect_iter = new_vertical_heap_merge_iterator(
                    std::move(*segment_iters_ptr), iterator_init_flag, rowset_ids,
                    return_col_count, read_params.tablet->keys_type(), seq_col_idx,
                    _row_sources_buffer, read_params.key_group_cluster_key_idxes);
        }
    }

    // Initialise the collect iterator.
    StorageReadOptions opts;
    opts.record_rowids = read_params.record_rowids;
    if (read_params.batch_size > 0) {
        opts.block_row_max = cast_set<int>(read_params.batch_size);
    }
    RETURN_IF_ERROR(_vcollect_iter->init(opts, sample_info));

    const bool is_agg_value_group = !read_params.is_key_column_group &&
                                    read_params.tablet->keys_type() == KeysType::AGG_KEYS;
    if (!is_agg_value_group) {
        return Status::OK();
    }
    // AGG_KEYS value columns: fetch the first row now, _init_agg_state() relies on it.
    auto first_row_st = _vcollect_iter->next_row(&_next_row);
    const bool reached_end = first_row_st.is<END_OF_FILE>();
    if (!first_row_st.ok() && !reached_end) {
        LOG(WARNING) << "failed to init first row for agg key";
        return first_row_st;
    }
    _eof = reached_end;
    return Status::OK();
}
void VerticalBlockReader::_init_agg_state(const ReaderParams& read_params) {
if (_eof) {
return;
}
DCHECK(_return_columns.size() == _next_row.block->columns());
_stored_data_columns =
_next_row.block->create_same_struct_block(_reader_context.batch_size)->mutate_columns();
_stored_has_null_tag.resize(_stored_data_columns.size());
_stored_has_variable_length_tag.resize(_stored_data_columns.size());
auto& tablet_schema = *_tablet_schema;
for (size_t idx = 0; idx < _return_columns.size(); ++idx) {
AggregateFunctionPtr function =
tablet_schema.column(_return_columns.at(idx))
.get_aggregate_function(vectorized::AGG_READER_SUFFIX,
read_params.get_be_exec_version());
DCHECK(function != nullptr);
_agg_functions.push_back(function);
// create aggregate data
auto* place = new char[function->size_of_data()];
SAFE_CREATE(function->create(place), {
_agg_functions.pop_back();
delete[] place;
});
_agg_places.push_back(place);
// calculate `_has_variable_length_tag` tag. like string, array, map
_stored_has_variable_length_tag[idx] = _stored_data_columns[idx]->is_variable_length();
}
}
// Convenience overload: initialises the reader without compaction sample info.
Status VerticalBlockReader::init(const ReaderParams& read_params) {
    CompactionSampleInfo* const no_sample_info = nullptr;
    return init(read_params, no_sample_info);
}
// Initialises the reader: resolves the batch size, runs the base TabletReader
// initialisation, builds the collect iterator and selects the next-block
// implementation matching the tablet's keys type.
//
// @param read_params  reader parameters
// @param sample_info  optional compaction sample info; stored in `_sample_info` and
//                     forwarded to the collect iterator (may be nullptr)
// @return first error encountered, OK otherwise. Outside cloud mode a failure of
//         _init_collect_iter() is also reported to the tablet.
Status VerticalBlockReader::init(const ReaderParams& read_params,
                                 CompactionSampleInfo* sample_info) {
    // A default-constructed StorageReadOptions supplies the fallback batch size.
    StorageReadOptions default_opts;
    _reader_context.batch_size = read_params.batch_size > 0
                                         ? cast_set<int>(read_params.batch_size)
                                         : default_opts.block_row_max;
    RETURN_IF_ERROR(TabletReader::init(read_params));

    if (auto iter_st = _init_collect_iter(read_params, sample_info); !iter_st.ok()) [[unlikely]] {
        if (!config::is_cloud_mode()) {
            static_cast<Tablet*>(_tablet.get())->report_error(iter_st);
        }
        return iter_st;
    }

    const auto keys_type = tablet()->keys_type();
    if (keys_type == KeysType::DUP_KEYS) {
        _next_block_func = &VerticalBlockReader::_direct_next_block;
    } else if (keys_type == KeysType::UNIQUE_KEYS) {
        if (!tablet()->tablet_meta()->tablet_schema()->cluster_key_uids().empty()) {
            // Tables with cluster keys are read directly.
            _next_block_func = &VerticalBlockReader::_direct_next_block;
        } else {
            _next_block_func = &VerticalBlockReader::_unique_key_next_block;
            if (_filter_delete) {
                _delete_filter_column = ColumnUInt8::create();
            }
        }
    } else if (keys_type == KeysType::AGG_KEYS) {
        _next_block_func = &VerticalBlockReader::_agg_key_next_block;
        if (!read_params.is_key_column_group) {
            _init_agg_state(read_params);
        }
    } else {
        DCHECK(false) << "No next row function for type:" << keys_type;
    }

    // Sparse optimization flag comes from ReaderParams (per the original note it is
    // calculated in merger.cpp based on an avg_row_bytes threshold).
    _enable_sparse_optimization = read_params.enable_sparse_optimization;
    // Keep the sample_info pointer for null count tracking.
    _sample_info = sample_info;
    return Status::OK();
}
// Next-block implementation that forwards batches from the collect iterator as-is.
//
// @param block  destination block filled by the collect iterator
// @param eof    out: set to true when the iterator reports END_OF_FILE
// @return any non-EOF error from the iterator, OK otherwise. When row ids are being
//         recorded for a key column group, `_block_row_locations` is refreshed too.
Status VerticalBlockReader::_direct_next_block(Block* block, bool* eof) {
    auto res = _vcollect_iter->next_batch(block);
    if (UNLIKELY(!res.ok() && !res.is<END_OF_FILE>())) {
        return res;
    }
    *eof = res.is<END_OF_FILE>();
    _eof = *eof;
    if (_reader_context.is_key_column_group && UNLIKELY(_reader_context.record_rowids)) {
        res = _vcollect_iter->current_block_row_locations(&_block_row_locations);
        // END_OF_FILE is tolerated here; test it with is<>() like every other check in
        // this file instead of comparing against a freshly constructed Status.
        if (UNLIKELY(!res.ok() && !res.is<END_OF_FILE>())) {
            return res;
        }
        DCHECK_EQ(_block_row_locations.size(), block->rows());
    }
    return Status::OK();
}
// Queues the current `_next_row` for aggregation and bumps the counter of the key
// group being accumulated. The queued rows are flushed through _update_agg_data() when
// the row is the last one of its source block or when the queue holds `batch_size` rows.
//
// @param columns  destination columns, forwarded to _update_agg_data()
void VerticalBlockReader::_append_agg_data(MutableColumns& columns) {
    _stored_row_ref.push_back(_next_row);
    ++_last_agg_data_counter;
    // Flush once `batch_size` rows are queued or the queued reference is about to
    // become invalid (last row of its block).
    const bool at_block_tail = (_next_row.row_pos + 1 == _next_row.block->rows());
    const bool queue_full = (_stored_row_ref.size() == _reader_context.batch_size);
    if (at_block_tail || queue_full) {
        _update_agg_data(columns);
    }
}
// Copies the queued rows into the staging columns and feeds them to the aggregate
// functions. Every completed key group in `_agg_data_counters` is aggregated and its
// result inserted into `columns`; the trailing, still-open group is aggregated without
// emitting a result.
//
// @param columns  destination columns receiving finished aggregate results
void VerticalBlockReader::_update_agg_data(MutableColumns& columns) {
    // Copy the queued rows into the staging columns.
    const size_t copied_rows = _copy_agg_data();
    // Refresh the has-null tag of every staging column.
    for (size_t col = 0; col < _return_columns.size(); ++col) {
        _stored_has_null_tag[col] = _stored_data_columns[col]->has_null(0, copied_rows);
    }
    // Aggregate and emit every completed key group.
    int range_begin = 0;
    for (const int group_rows : _agg_data_counters) {
        const int range_end = range_begin + group_rows - 1;
        _update_agg_value(columns, range_begin, range_end);
        range_begin += group_rows;
    }
    // The last key may continue in the next block, so aggregate but do not emit.
    if (_last_agg_data_counter != 0) {
        _update_agg_value(columns, range_begin, range_begin + _last_agg_data_counter - 1, false);
        _last_agg_data_counter = 0;
    }
    _agg_data_counters.clear();
}
// Feeds staging rows [begin, end] of every return column to its aggregate function.
//
// @param columns   destination columns; receive the result when `is_close` is true
// @param begin     first staging row (inclusive)
// @param end       last staging row (inclusive); nothing is added when begin > end
// @param is_close  when true the result is inserted into `columns`, the aggregate
//                  state is reset and `_arena` is cleared afterwards
void VerticalBlockReader::_update_agg_value(MutableColumns& columns, int begin, int end,
                                            bool is_close) {
    const bool has_rows = begin <= end;
    const size_t col_count = _return_columns.size();
    for (size_t col = 0; col < col_count; ++col) {
        AggregateFunctionPtr agg_fn = _agg_functions[col];
        AggregateDataPtr state = _agg_places[col];
        auto* stored_col = _stored_data_columns[col].get();
        if (has_rows) {
            agg_fn->add_batch_range(begin, end, state, const_cast<const IColumn**>(&stored_col),
                                    _arena, _stored_has_null_tag[col]);
        }
        if (!is_close) {
            continue;
        }
        agg_fn->insert_result_into(state, *columns[col]);
        // Reset the aggregate state for the next key group.
        agg_fn->reset(state);
    }
    if (is_close) {
        _arena.clear();
    }
}
// Copies every row queued in `_stored_row_ref` into the staging columns and clears
// the queue.
//
// Variable-length columns are rebuilt by inserting the rows in queue order. Other
// columns are overwritten in place, grouped by source block through `_temp_ref_map`
// (source block -> list of (source row, staging row) pairs).
//
// @return number of rows copied
size_t VerticalBlockReader::_copy_agg_data() {
    const size_t copy_size = _stored_row_ref.size();
    for (size_t dst_row = 0; dst_row < copy_size; ++dst_row) {
        auto& queued = _stored_row_ref[dst_row];
        _temp_ref_map[queued.block.get()].emplace_back(queued.row_pos, dst_row);
    }

    for (size_t col = 0; col < _return_columns.size(); ++col) {
        auto& dst_column = _stored_data_columns[col];
        if (!_stored_has_variable_length_tag[col]) {
            for (auto& entry : _temp_ref_map) {
                if (entry.second.empty()) {
                    continue;
                }
                const auto& src_column = *entry.first->get_by_position(col).column;
                for (auto& row_pair : entry.second) {
                    dst_column->replace_column_data(src_column, row_pair.first, row_pair.second);
                }
            }
            continue;
        }
        // Variable-length types cannot be replaced in place: rebuild in queue order.
        dst_column->clear();
        for (auto& queued : _stored_row_ref) {
            dst_column->insert_from(*queued.block->get_by_position(col).column, queued.row_pos);
        }
    }

    // Keep the map entries, only drop their position lists.
    for (auto& entry : _temp_ref_map) {
        entry.second.clear();
    }
    _stored_row_ref.clear();
    return copy_size;
}
// Next-block implementation for AGG_KEYS tablets.
//
// Key column groups simply forward a batch from the collect iterator. Value column
// groups read row by row: rows flagged `is_same` are aggregated into the current key,
// any other row opens a new output row, until `batch_size` output rows are produced
// or the iterator is exhausted.
//
// @param block  destination block
// @param eof    out: set to true once the iterator reported END_OF_FILE
// @return any non-EOF error from the iterator, OK otherwise
Status VerticalBlockReader::_agg_key_next_block(Block* block, bool* eof) {
    if (_reader_context.is_key_column_group) {
        // The collect iterator already filters aggregate keys.
        auto batch_st = _vcollect_iter->next_batch(block);
        const bool reached_end = batch_st.is<END_OF_FILE>();
        if (UNLIKELY(!batch_st.ok() && !reached_end)) {
            return batch_st;
        }
        *eof = reached_end;
        _eof = reached_end;
        return Status::OK();
    }

    // Value column aggregation.
    if (UNLIKELY(_eof)) {
        *eof = true;
        return Status::OK();
    }
    auto target_columns = block->mutate_columns();
    // Start with the row left in `_next_row` (pre-fetched in init or by the last call).
    _append_agg_data(target_columns);
    int target_block_row = 1;
    while (true) {
        Status row_st = _vcollect_iter->next_row(&_next_row);
        if (UNLIKELY(!row_st.ok())) {
            if (!row_st.is<END_OF_FILE>()) {
                LOG(WARNING) << "next failed: " << row_st;
                return row_st;
            }
            *eof = true;
            _eof = true;
            break;
        }
        DCHECK(_next_row.block->columns() == block->columns());
        if (!_next_row.is_same) {
            // A new key: stop if the block is full, the row stays in `_next_row`.
            if (target_block_row == _reader_context.batch_size) {
                break;
            }
            _agg_data_counters.push_back(_last_agg_data_counter);
            _last_agg_data_counter = 0;
            ++target_block_row;
        }
        _append_agg_data(target_columns);
    }
    // Close the last key group and flush everything that is still queued.
    _agg_data_counters.push_back(_last_agg_data_counter);
    _last_agg_data_counter = 0;
    _update_agg_data(target_columns);
    block->set_columns(std::move(target_columns));
    return Status::OK();
}
// Next-block implementation selected by init() for UNIQUE_KEYS tablets without
// cluster keys.
//
// Key column group: pulls a batch from the collect iterator, optionally records the
// row locations, and, when `_delete_sign_available`, filters out rows whose delete
// sign (last column of the block) is non-zero. Filtered rows get their agg flag set
// in `_row_sources_buffer` and are counted in `_stats.rows_del_filtered`.
//
// Value column group: uses the batched sparse path when the collect iterator is a
// VerticalMaskMergeIterator and `_enable_sparse_optimization` is set, otherwise
// copies rows one by one until `batch_size` rows are collected.
//
// @param block  destination block
// @param eof    out: set to true when the iterator is exhausted
// @return any non-EOF error from the iterator or from filtering, OK otherwise
Status VerticalBlockReader::_unique_key_next_block(Block* block, bool* eof) {
    if (_reader_context.is_key_column_group) {
        // Record row_source_buffer current size for key column agg flag.
        // _vcollect_iter->next_batch(block) will fill row_source_buffer but the delete
        // sign is ignored there; we evaluate the delete sign column afterwards and
        // update the agg flags in row_source_buffer once the current block is known.
        VLOG_NOTICE << "reader id: " << _id
                    << ", buffer size: " << _row_sources_buffer->buffered_size();
        uint64_t row_source_idx = _row_sources_buffer->buffered_size();
        uint64_t row_buffer_size_start = row_source_idx;
        uint64_t merged_rows_start = _vcollect_iter->merged_rows();
        uint64_t filtered_rows_start = _stats.rows_del_filtered;
        auto res = _vcollect_iter->next_batch(block);
        if (UNLIKELY(!res.ok() && !res.is<END_OF_FILE>())) {
            return res;
        }
        if (UNLIKELY(_reader_context.record_rowids)) {
            auto ret = _vcollect_iter->current_block_row_locations(&_block_row_locations);
            if (UNLIKELY(!ret.ok() && !ret.is<END_OF_FILE>())) {
                // Propagate the failure of the row-location lookup itself; `res` is
                // OK or END_OF_FILE at this point and would hide the error.
                return ret;
            }
            DCHECK_EQ(_block_row_locations.size(), block->rows());
        }
        // The buffer shrank while fetching the batch: restart counting from its head.
        if (_row_sources_buffer->buffered_size() < row_buffer_size_start) {
            row_buffer_size_start = 0;
            row_source_idx = 0;
        }
        size_t merged_rows_in_rs_buffer = 0;
        for (uint64_t i = row_buffer_size_start; i < _row_sources_buffer->buffered_size(); i++) {
            if (_row_sources_buffer->get_agg_flag(i)) {
                merged_rows_in_rs_buffer++;
            }
        }
        size_t block_rows = block->rows();
        if (_delete_sign_available && block_rows > 0) {
            int ori_delete_sign_idx = _reader_context.tablet_schema->field_index(DELETE_SIGN);
            if (ori_delete_sign_idx < 0) {
                *eof = (res.is<END_OF_FILE>());
                _eof = *eof;
                return Status::OK();
            }
            // delete sign column must be stored in the last column of the block
            int delete_sign_idx = block->columns() - 1;
            DCHECK(delete_sign_idx > 0);
            auto target_columns = block->mutate_columns();
            MutableColumnPtr delete_filter_column = (*std::move(_delete_filter_column)).mutate();
            reinterpret_cast<ColumnUInt8*>(delete_filter_column.get())->resize(block_rows);
            auto* __restrict filter_data =
                    reinterpret_cast<ColumnUInt8*>(delete_filter_column.get())->get_data().data();
            auto* __restrict delete_data =
                    reinterpret_cast<ColumnInt8*>(target_columns[delete_sign_idx].get())
                            ->get_data()
                            .data();
            size_t cur_row = 0;
            int delete_count = 0;
            while (cur_row < block_rows) {
                // Row sources already flagged were merged away and have no row in the block.
                if (_row_sources_buffer->get_agg_flag(row_source_idx)) {
                    row_source_idx++;
                    continue;
                }
                // filter value 1 keeps the row, 0 drops it
                bool sign = (delete_data[cur_row] == 0);
                filter_data[cur_row] = sign;
                if (UNLIKELY(!sign)) {
                    _row_sources_buffer->set_agg_flag(row_source_idx, true);
                    if (UNLIKELY(_reader_context.record_rowids)) {
                        _block_row_locations[cur_row].row_id = -1;
                        delete_count++;
                    }
                }
                cur_row++;
                row_source_idx++;
            }
            ColumnWithTypeAndName column_with_type_and_name {_delete_filter_column,
                                                             std::make_shared<DataTypeUInt8>(),
                                                             "__DORIS_COMPACTION_FILTER__"};
            block->insert(column_with_type_and_name);
            RETURN_IF_ERROR(
                    Block::filter_block(block, target_columns.size(), target_columns.size()));
            _stats.rows_del_filtered += block_rows - block->rows();
            if (UNLIKELY(_reader_context.record_rowids)) {
                DCHECK_EQ(_block_row_locations.size(), block->rows() + delete_count);
            }
        }
        // Cross-check the flags in the row source buffer against the counters.
        size_t filtered_rows_in_rs_buffer = 0;
        for (auto i = row_buffer_size_start; i < _row_sources_buffer->buffered_size(); i++) {
            if (_row_sources_buffer->get_agg_flag(i)) {
                filtered_rows_in_rs_buffer++;
            }
        }
        filtered_rows_in_rs_buffer -= merged_rows_in_rs_buffer;
        auto merged_rows_cur_batch = _vcollect_iter->merged_rows() - merged_rows_start;
        auto filtered_rows_cur_batch = _stats.rows_del_filtered - filtered_rows_start;
        DCHECK_EQ(merged_rows_in_rs_buffer, merged_rows_cur_batch);
        DCHECK_EQ(filtered_rows_in_rs_buffer, filtered_rows_cur_batch);
        *eof = (res.is<END_OF_FILE>());
        _eof = *eof;
        return Status::OK();
    }
    // Value column processing - use batch optimization
    auto target_columns = block->mutate_columns();
    const size_t column_count = block->columns();
    // Try to use batch optimization for value column compaction.
    // Only use batch optimization when sparse optimization is enabled.
    {
        auto* mask_iter = dynamic_cast<VerticalMaskMergeIterator*>(_vcollect_iter.get());
        if (mask_iter != nullptr && _enable_sparse_optimization) {
            // Step 1: Batch fetch row information
            std::vector<RowBatch> batches;
            size_t actual_rows = 0;
            RETURN_IF_ERROR(mask_iter->unique_key_next_batch(&batches, _reader_context.batch_size,
                                                             &actual_rows));
            if (actual_rows == 0) {
                *eof = true;
                _eof = true;
                return Status::OK();
            }
            // Step 2: Prepare columns - pre-fill NULL for fixed-width, reserve for others
            std::vector<ColumnNullable*> nullable_dst_cols;
            std::vector<bool> supports_replace;
            _prepare_sparse_columns(target_columns, actual_rows, nullable_dst_cols,
                                    supports_replace);
            // Step 3: Process each batch
            size_t dst_offset =
                    target_columns.empty() ? 0 : target_columns[0]->size() - actual_rows;
            for (const auto& batch : batches) {
                Block* src_block = batch.block.get();
                DCHECK(src_block != nullptr);
                DCHECK(src_block->columns() == column_count);
                for (size_t col_idx = 0; col_idx < column_count; ++col_idx) {
                    const auto& src_col = src_block->get_by_position(col_idx).column;
                    _process_sparse_column(nullable_dst_cols[col_idx], supports_replace[col_idx],
                                           target_columns[col_idx], src_col, batch, dst_offset);
                }
                dst_offset += batch.count;
            }
            block->set_columns(std::move(target_columns));
            return Status::OK();
        }
    }
    // Fallback: original row-by-row processing
    int target_block_row = 0;
    do {
        Status res = _vcollect_iter->unique_key_next_row(&_next_row);
        if (UNLIKELY(!res.ok())) {
            if (UNLIKELY(res.is<END_OF_FILE>())) {
                *eof = true;
                _eof = true;
                break;
            }
            LOG(WARNING) << "next failed: " << res;
            return res;
        }
        const auto& src_block = _next_row.block;
        assert(src_block->columns() == column_count);
        RETURN_IF_CATCH_EXCEPTION({
            for (size_t i = 0; i < column_count; ++i) {
                target_columns[i]->insert_from(*(src_block->get_by_position(i).column),
                                               _next_row.row_pos);
            }
        });
        ++target_block_row;
    } while (target_block_row < _reader_context.batch_size);
    block->set_columns(std::move(target_columns));
    return Status::OK();
}
// Gets the destination columns ready to receive `actual_rows` more rows in the
// sparse path.
//
// Nullable columns that support replace_column_data_range() are grown right away:
// the new null-map entries are filled with 1 (NULL) and the nested column is resized,
// so that only non-NULL runs need to be written later. All other columns merely
// reserve capacity.
//
// @param columns            destination columns
// @param actual_rows        number of rows about to be appended
// @param nullable_dst_cols  out: per column, the ColumnNullable pointer or nullptr
// @param supports_replace   out: per column, whether the pre-filled path is used
void VerticalBlockReader::_prepare_sparse_columns(MutableColumns& columns, size_t actual_rows,
                                                  std::vector<ColumnNullable*>& nullable_dst_cols,
                                                  std::vector<bool>& supports_replace) {
    const size_t column_count = columns.size();
    nullable_dst_cols.resize(column_count, nullptr);
    supports_replace.resize(column_count, false);
    for (size_t idx = 0; idx < column_count; ++idx) {
        auto& column = columns[idx];
        if (!column->is_nullable()) {
            // Non-nullable column: reserve space only.
            column->reserve(column->size() + actual_rows);
            continue;
        }
        auto* nullable = assert_cast<ColumnNullable*, TypeCheckOnRelease::DISABLE>(column.get());
        nullable_dst_cols[idx] = nullable;
        const bool can_replace = nullable->support_replace_column_data_range();
        supports_replace[idx] = can_replace;
        if (!can_replace) {
            // Variable-length types: reserve space only.
            column->reserve(column->size() + actual_rows);
            continue;
        }
        // Fixed-width types: pre-fill with NULL for the sparse optimization.
        const size_t grown_size = nullable->size() + actual_rows;
        nullable->get_null_map_column().get_data().resize_fill(grown_size, 1);
        nullable->get_nested_column().resize(grown_size);
    }
}
// Copies every maximal run of non-NULL rows of `batch` from the source column into
// the destination column; NULL rows are skipped.
//
// @param nullable_dst  destination column, written at `dst_offset + <row within batch>`
// @param nullable_src  source column
// @param null_map      null map of the source column (non-zero means NULL)
// @param batch         row range [start_row, start_row + count) of the source
// @param dst_offset    destination row matching the first row of the batch
void VerticalBlockReader::_copy_non_null_runs(ColumnNullable* nullable_dst,
                                              const ColumnNullable* nullable_src,
                                              const uint8_t* null_map, const RowBatch& batch,
                                              size_t dst_offset) {
    const uint8_t* batch_nulls = null_map + batch.start_row;
    size_t pos = 0;
    while (pos < batch.count) {
        if (batch_nulls[pos] != 0) {
            // NULL row: nothing to copy.
            ++pos;
            continue;
        }
        // `pos` starts a non-NULL run; extend it as far as it goes.
        const size_t run_begin = pos;
        do {
            ++pos;
        } while (pos < batch.count && batch_nulls[pos] == 0);
        // Copy the whole run in one call.
        nullable_dst->replace_column_data_range(*nullable_src, batch.start_row + run_begin,
                                                pos - run_begin, dst_offset + run_begin);
    }
}
// Transfers the rows of one batch for a single column in the sparse path.
//
// - Non-nullable destination: the range is appended with insert_range_from().
// - Nullable destination without replace support: the range is appended as well;
//   NULLs are counted only when `_sample_info` is set.
// - Nullable destination with replace support: the destination was pre-filled with
//   NULLs, so only non-NULL rows are written at `dst_offset`.
//
// @param nullable_dst      destination as ColumnNullable, or nullptr if not nullable
// @param supports_replace  whether the destination was pre-filled by _prepare_sparse_columns()
// @param target_col        destination column used on the append paths
// @param src_col           source column
// @param batch             row range of the source to transfer
// @param dst_offset        destination row matching the first row of the batch
void VerticalBlockReader::_process_sparse_column(ColumnNullable* nullable_dst,
                                                 bool supports_replace,
                                                 MutableColumnPtr& target_col,
                                                 const ColumnPtr& src_col, const RowBatch& batch,
                                                 size_t dst_offset) {
    if (nullable_dst == nullptr) {
        // Non-nullable column.
        target_col->insert_range_from(*src_col, batch.start_row, batch.count);
        return;
    }

    // Counts the non-NULL rows of `batch` in the given nullable source column.
    const auto count_non_null = [&batch](const ColumnNullable* src) -> size_t {
        const auto& src_null_map = src->get_null_map_data();
        return simd::count_zero_num(
                reinterpret_cast<const int8_t*>(src_null_map.data() + batch.start_row),
                static_cast<size_t>(batch.count));
    };

    if (!supports_replace) {
        // Variable-length nullable types: count NULLs only if someone collects them.
        if (_sample_info != nullptr) {
            const auto* nullable_src = static_cast<const ColumnNullable*>(src_col.get());
            const size_t non_null_count = count_non_null(nullable_src);
            _sample_info->null_count += (batch.count - non_null_count);
        }
        target_col->insert_range_from(*src_col, batch.start_row, batch.count);
        return;
    }

    // Sparse optimization path for fixed-width types.
    const auto* nullable_src = static_cast<const ColumnNullable*>(src_col.get());
    const size_t non_null_count = count_non_null(nullable_src);
    if (_sample_info != nullptr) {
        _sample_info->null_count += (batch.count - non_null_count);
    }
    if (non_null_count == 0) {
        // All NULL: the destination is already pre-filled.
        return;
    }
    if (non_null_count == batch.count) {
        // All non-NULL: copy the whole batch at once.
        nullable_dst->replace_column_data_range(*nullable_src, batch.start_row, batch.count,
                                                dst_offset);
        return;
    }
    // Mixed case: copy the non-NULL runs only.
    _copy_non_null_runs(nullable_dst, nullable_src, nullable_src->get_null_map_data().data(),
                        batch, dst_offset);
}
#include "common/compile_check_end.h"
} // namespace doris::vectorized