blob: 80e36434f0880b1d9c7e7300b8ce6a958862b310 [file]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <glog/logging.h>
#include <parallel_hashmap/phmap.h>
#include <stddef.h>
#include <stdint.h>
#include <memory>
#include <utility>
#include <vector>
#include "common/compiler_util.h" // IWYU pragma: keep
#include "common/status.h"
#include "core/block/block.h"
#include "core/column/column.h"
#include "core/data_type/data_type.h"
#include "exprs/aggregate/aggregate_function.h"
#include "storage/iterators.h"
#include "storage/tablet/tablet.h"
#include "storage/tablet/tablet_reader.h"
#include "storage/utils.h"
#pragma once
namespace doris {
// Forward declarations of types that this header refers to only through
// pointers, references, or as the element type of a pointed-to vector:
//  - RowsetId:         element type of the `rowset_ids` out-parameter of
//                      VerticalBlockReader::_get_segment_iterators().
//  - RowSourcesBuffer: pointer passed to the VerticalBlockReader constructor
//                      and stored in `_row_sources_buffer`.
//  - RowBatch:         const-reference parameter of the sparse-column helpers.
struct RowsetId;
class RowSourcesBuffer;
struct RowBatch;
// A TabletReader specialization that produces blocks through
// next_block_with_aggregation().
//
// The class declares three block-producing strategies (_direct_next_block,
// _agg_key_next_block, _unique_key_next_block) together with a
// pointer-to-member `_next_block_func`. The strategy is presumably chosen in
// init() -- the implementation is not visible in this header; confirm in the
// corresponding .cpp.
class VerticalBlockReader final : public TabletReader {
public:
    // Creates a reader bound to `row_sources_buffer` and assigns it the next
    // value of the class-wide `nextId` counter.
    //
    // The pointer is stored as-is; this header does not show any ownership
    // transfer, so the caller presumably keeps the buffer alive -- confirm.
    //
    // `explicit` prevents a bare RowSourcesBuffer* from being implicitly
    // converted into a whole reader object.
    //
    // NOTE(review): `nextId++` is a plain (non-atomic) read-modify-write on a
    // static; if readers can be constructed concurrently this needs external
    // serialization -- confirm against callers.
    explicit VerticalBlockReader(RowSourcesBuffer* row_sources_buffer)
            : _id(nextId++), _row_sources_buffer(row_sources_buffer) {}

    ~VerticalBlockReader() override;

    // Initialize VerticalBlockReader with tablet, data version and fetch range.
    Status init(const ReaderParams& read_params) override;

    // Overload of init() that additionally receives a CompactionSampleInfo
    // pointer (see `_sample_info` below).
    Status init(const ReaderParams& read_params, CompactionSampleInfo* sample_info);

    // Fills `block` with the next batch of rows and reports end-of-data
    // through `eof`.
    Status next_block_with_aggregation(Block* block, bool* eof) override;

    // Returns the merged-row count reported by the underlying collect iterator.
    // `_vcollect_iter` must already be set (checked with DCHECK).
    uint64_t merged_rows() const override {
        DCHECK(_vcollect_iter);
        return _vcollect_iter->merged_rows();
    }

    // Returns a copy of the row locations recorded in `_block_row_locations`.
    std::vector<RowLocation> current_block_row_locations() { return _block_row_locations; }

    // Class-wide counter used by the constructor to hand out `_id` values.
    // Defined out of line.
    static uint64_t nextId;

private:
    // Directly read row from rowset and pass to upper caller. No need to do aggregation.
    // This is usually used for DUPLICATE KEY tables
    Status _direct_next_block(Block* block, bool* eof);

    // For normal AGGREGATE KEY tables, read data by a merge heap.
    Status _agg_key_next_block(Block* block, bool* eof);

    // For UNIQUE KEY tables, read data by a merge heap.
    // The difference from _agg_key_next_block is that it will read the data from high version to low version,
    // to minimize the comparison time in merge heap.
    Status _unique_key_next_block(Block* block, bool* eof);

    // Sets up the collect iterator from `read_params`; `sample_info` is
    // forwarded from init().
    Status _init_collect_iter(const ReaderParams& read_params, CompactionSampleInfo* sample_info);

    // Collects segment iterators for `read_params` into the three
    // out-parameters (iterators, per-iterator init flags, rowset ids).
    Status _get_segment_iterators(const ReaderParams& read_params,
                                  std::vector<RowwiseIteratorUPtr>* segment_iters,
                                  std::vector<bool>* iterator_init_flag,
                                  std::vector<RowsetId>* rowset_ids);

    // Helpers for aggregate mode; they operate on the "for agg mode" members
    // declared below.
    void _init_agg_state(const ReaderParams& read_params);
    void _append_agg_data(MutableColumns& columns);
    void _update_agg_data(MutableColumns& columns);
    size_t _copy_agg_data();
    void _update_agg_value(MutableColumns& columns, int begin, int end, bool is_close = true);

    // Helper functions for sparse column optimization
    void _prepare_sparse_columns(MutableColumns& columns, size_t actual_rows,
                                 std::vector<ColumnNullable*>& nullable_dst_cols,
                                 std::vector<bool>& supports_replace);
    void _process_sparse_column(ColumnNullable* nullable_dst, bool supports_replace,
                                MutableColumnPtr& target_col, const ColumnPtr& src_col,
                                const RowBatch& batch, size_t dst_offset);
    void _copy_non_null_runs(ColumnNullable* nullable_dst, const ColumnNullable* nullable_src,
                             const uint8_t* null_map, const RowBatch& batch, size_t dst_offset);

private:
    // Value taken from `nextId` at construction time.
    size_t _id;

    // Underlying row-wise iterator; merged_rows() delegates to it.
    std::shared_ptr<RowwiseIterator> _vcollect_iter;
    IteratorRowRef _next_row {{}, -1, false};

    bool _eof = false;

    // Pointer to one of the *_next_block member functions above; null until set.
    Status (VerticalBlockReader::*_next_block_func)(Block* block, bool* eof) = nullptr;

    // Buffer pointer supplied to the constructor.
    RowSourcesBuffer* _row_sources_buffer;
    ColumnPtr _delete_filter_column;

    // for agg mode
    std::vector<AggregateFunctionPtr> _agg_functions;
    std::vector<AggregateDataPtr> _agg_places;
    Arena _arena;
    std::vector<int> _normal_columns_idx;
    std::vector<int> _agg_columns_idx;
    std::vector<int> _agg_data_counters;
    int _last_agg_data_counter = 0;
    MutableColumns _stored_data_columns;
    std::vector<IteratorRowRef> _stored_row_ref;
    std::vector<bool> _stored_has_null_tag;
    std::vector<bool> _stored_has_variable_length_tag;
    phmap::flat_hash_map<const Block*, std::vector<std::pair<int, int>>> _temp_ref_map;

    // Row locations returned (by copy) from current_block_row_locations().
    std::vector<RowLocation> _block_row_locations;

    // For sparse column compaction optimization
    // Set from reader_params.enable_sparse_optimization (calculated in Merger::vertical_merge_rowsets)
    bool _enable_sparse_optimization = false;

    // For tracking NULL cell count during compaction (used for sparse optimization threshold)
    CompactionSampleInfo* _sample_info = nullptr;
};
} // namespace doris