| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "vec/olap/vertical_merge_iterator.h" |
| |
| #include <fcntl.h> |
| #include <gen_cpp/olap_file.pb.h> |
| #include <stdlib.h> |
| |
| #include <cstddef> |
| #include <ostream> |
| |
| #include "cloud/config.h" |
| #include "common/compiler_util.h" // IWYU pragma: keep |
| #include "common/config.h" |
| #include "common/logging.h" |
| #include "io/cache/block_file_cache_factory.h" |
| #include "olap/field.h" |
| #include "olap/iterators.h" |
| #include "olap/olap_common.h" |
| #include "vec/columns/column.h" |
| #include "vec/common/string_ref.h" |
| #include "vec/core/column_with_type_and_name.h" |
| #include "vec/core/types.h" |
| #include "vec/data_types/data_type.h" |
| |
| namespace doris { |
| #include "common/compile_check_begin.h" |
| using namespace ErrorCode; |
| |
| namespace vectorized { |
| |
| // -------------- row source ---------------// |
| RowSource::RowSource(uint16_t source_num, bool agg_flag) { |
| _data = (source_num & SOURCE_FLAG) | (source_num & AGG_FLAG); |
| _data = agg_flag ? (_data | AGG_FLAG) : (_data & SOURCE_FLAG); |
| } |
| |
| uint16_t RowSource::get_source_num() const { |
| return _data & SOURCE_FLAG; |
| } |
| |
| bool RowSource::agg_flag() const { |
| return (_data & AGG_FLAG) != 0; |
| } |
| |
| void RowSource::set_agg_flag(bool agg_flag) { |
| _data = agg_flag ? (_data | AGG_FLAG) : (_data & SOURCE_FLAG); |
| } |
| |
// Raw packed representation: source number bits plus the AGG_FLAG bit.
uint16_t RowSource::data() const {
    return _data;
}
| |
| /* -------------- row source buffer ------------- */ |
| |
// current row_sources must save in memory so agg key can update agg flag
//
// Append a batch of packed row sources to the in-memory buffer. When the
// buffer's allocation would exceed the configured memory cap AND the
// already-reserved free space cannot absorb the incoming batch, the
// current contents are first spilled (serialized) to the backing file and
// the in-memory buffer is reset. _total_size tracks all rows ever
// appended, spilled or not.
Status RowSourcesBuffer::append(const std::vector<RowSource>& row_sources) {
    if (_buffer.allocated_bytes() + row_sources.size() * sizeof(UInt16) >
        config::vertical_compaction_max_row_source_memory_mb * 1024 * 1024) {
        if (_buffer.allocated_bytes() - _buffer.size() * sizeof(UInt16) <
            row_sources.size() * sizeof(UInt16)) {
            VLOG_NOTICE << "RowSourceBuffer is too large, serialize and reset buffer: "
                        << _buffer.allocated_bytes() << ", total size: " << _total_size;
            // serialize current buffer
            RETURN_IF_ERROR(_create_buffer_file());
            RETURN_IF_ERROR(_serialize());
            _reset_buffer();
        }
    }
    for (const auto& source : row_sources) {
        _buffer.push_back(source.data());
    }
    _total_size += row_sources.size();
    return Status::OK();
}
| |
| Status RowSourcesBuffer::seek_to_begin() { |
| _buf_idx = 0; |
| if (_fd > 0) { |
| auto offset = lseek(_fd, 0, SEEK_SET); |
| if (offset != 0) { |
| LOG(WARNING) << "failed to seek to 0"; |
| return Status::InternalError("failed to seek to 0"); |
| } |
| _reset_buffer(); |
| } |
| return Status::OK(); |
| } |
| |
| Status RowSourcesBuffer::has_remaining() { |
| if (_buf_idx < _buffer.size()) { |
| return Status::OK(); |
| } |
| DCHECK(_buf_idx == _buffer.size()); |
| if (_fd > 0) { |
| _reset_buffer(); |
| auto st = _deserialize(); |
| if (!st.ok()) { |
| return st; |
| } |
| return Status::OK(); |
| } |
| return Status::EndOfFile("end of row source buffer"); |
| } |
| |
| void RowSourcesBuffer::set_agg_flag(uint64_t index, bool agg) { |
| DCHECK(index < _buffer.size()); |
| RowSource ori(_buffer[index]); |
| ori.set_agg_flag(agg); |
| _buffer[index] = ori.data(); |
| } |
| |
| bool RowSourcesBuffer::get_agg_flag(uint64_t index) { |
| DCHECK(index < _buffer.size()); |
| RowSource ori(_buffer[index]); |
| return ori.agg_flag(); |
| } |
| |
| size_t RowSourcesBuffer::continuous_agg_count(uint64_t index) { |
| size_t result = 1; |
| int64_t start = index + 1; |
| int64_t end = _buffer.size(); |
| while (start < end) { |
| RowSource next(_buffer[start++]); |
| if (next.agg_flag()) { |
| ++result; |
| } else { |
| break; |
| } |
| } |
| return result; |
| } |
| |
| size_t RowSourcesBuffer::same_source_count(uint16_t source, size_t limit) { |
| int result = 1; |
| int64_t start = _buf_idx + 1; |
| int64_t end = _buffer.size(); |
| while (result < limit && start < end) { |
| RowSource next(_buffer[start++]); |
| if (source != next.get_source_num()) { |
| break; |
| } |
| ++result; |
| } |
| return result; |
| } |
| |
// Lazily create the temp file used to spill row sources to disk.
// Idempotent: a no-op once _fd is already open. The file is unlinked
// immediately after creation so it is reclaimed automatically when the
// fd is closed (or the process dies).
Status RowSourcesBuffer::_create_buffer_file() {
    if (_fd != -1) {
        return Status::OK();
    }
    std::stringstream file_path_ss;
    if (config::is_cloud_mode()) {
        // Cloud mode has no local tablet path; place the spill file under
        // one of the file-cache base paths, chosen by hashing the tablet id
        // so different tablets spread across paths.
        std::vector<std::string> paths = io::FileCacheFactory::instance()->get_base_paths();
        if (paths.empty()) {
            return Status::InternalError("fail to create write buffer due to missing cache path");
        }
        std::size_t hash_val = std::hash<int64_t> {}(_tablet_id);
        int64_t idx = hash_val % paths.size();
        file_path_ss << paths[idx] << "/compaction_row_source_" << _tablet_id;
    } else {
        file_path_ss << _tablet_path << "/compaction_row_source_" << _tablet_id;
    }

    // Suffix encodes the compaction type, which helps identify leftovers
    // when debugging.
    if (_reader_type == ReaderType::READER_BASE_COMPACTION) {
        file_path_ss << "_base";
    } else if (_reader_type == ReaderType::READER_CUMULATIVE_COMPACTION ||
               _reader_type == ReaderType::READER_SEGMENT_COMPACTION) {
        file_path_ss << "_cumu";
    } else if (_reader_type == ReaderType::READER_FULL_COMPACTION) {
        file_path_ss << "_full";
    } else if (_reader_type == ReaderType::READER_COLD_DATA_COMPACTION) {
        file_path_ss << "_cold";
    } else {
        DCHECK(false);
        return Status::InternalError("unknown reader type");
    }
    file_path_ss << ".XXXXXX"; // mkstemp template suffix
    std::string file_path = file_path_ss.str();
    LOG(INFO) << "Vertical compaction row sources buffer path: " << file_path;
    _fd = mkstemp(file_path.data());
    if (_fd < 0) {
        LOG(WARNING) << "failed to create tmp file, file_path=" << file_path
                     << ", err: " << strerror(errno);
        return Status::InternalError("failed to create tmp file ");
    }
    // file will be released after fd is close
    unlink(file_path.data());
    return Status::OK();
}
| |
| Status RowSourcesBuffer::flush() { |
| if (_fd > 0 && !_buffer.empty()) { |
| RETURN_IF_ERROR(_serialize()); |
| _reset_buffer(); |
| } |
| return Status::OK(); |
| } |
| |
| Status RowSourcesBuffer::_serialize() { |
| size_t rows = _buffer.size(); |
| if (rows == 0) { |
| return Status::OK(); |
| } |
| // write size |
| ssize_t bytes_written = ::write(_fd, &rows, sizeof(rows)); |
| if (bytes_written != sizeof(size_t)) { |
| LOG(WARNING) << "failed to write buffer size to file, bytes_written=" << bytes_written; |
| return Status::InternalError("fail to write buffer size to file"); |
| } |
| // write data |
| bytes_written = ::write(_fd, _buffer.data(), _buffer.size() * sizeof(UInt16)); |
| if (bytes_written != _buffer.size() * sizeof(UInt16)) { |
| LOG(WARNING) << "failed to write buffer data to file, bytes_written=" << bytes_written |
| << " buffer size=" << _buffer.size() * sizeof(UInt16); |
| return Status::InternalError("fail to write buffer size to file"); |
| } |
| return Status::OK(); |
| } |
| |
| Status RowSourcesBuffer::_deserialize() { |
| size_t rows = 0; |
| ssize_t bytes_read = ::read(_fd, &rows, sizeof(rows)); |
| if (bytes_read == 0) { |
| LOG(WARNING) << "end of row source buffer file"; |
| return Status::EndOfFile("end of row source buffer file"); |
| } else if (bytes_read != sizeof(size_t)) { |
| LOG(WARNING) << "failed to read buffer size from file, bytes_read=" << bytes_read; |
| return Status::InternalError("failed to read buffer size from file"); |
| } |
| _buffer.resize(rows); |
| auto& internal_data = _buffer; |
| bytes_read = ::read(_fd, internal_data.data(), rows * sizeof(UInt16)); |
| if (bytes_read != rows * sizeof(UInt16)) { |
| LOG(WARNING) << "failed to read buffer data from file, bytes_read=" << bytes_read |
| << ", expect bytes=" << rows * sizeof(UInt16); |
| return Status::InternalError("failed to read buffer data from file"); |
| } |
| return Status::OK(); |
| } |
| |
| // ---------- vertical merge iterator context ----------// |
// Prepare `block` to receive the next batch: on first use, materialize one
// column per schema column id (reserving _block_row_max rows up front);
// on reuse, just clear the column data so existing capacity is recycled.
Status VerticalMergeIteratorContext::block_reset(const std::shared_ptr<Block>& block) {
    if (!block->columns()) {
        const Schema& schema = _iter->schema();
        const auto& column_ids = schema.column_ids();
        for (size_t i = 0; i < schema.num_column_ids(); ++i) {
            auto column_desc = schema.column(column_ids[i]);
            auto data_type = Schema::get_data_type_ptr(*column_desc);
            if (data_type == nullptr) {
                return Status::RuntimeError("invalid data type");
            }
            auto column = data_type->create_column();
            column->reserve(_block_row_max);
            block->insert(ColumnWithTypeAndName(std::move(column), data_type, column_desc->name()));
        }
    } else {
        block->clear_column_data();
    }
    return Status::OK();
}
| |
// Ordering predicate used by the merge heap. Compares the key columns
// first (or the cluster-key subset when _key_group_cluster_key_idxes is
// set), then the sequence column (located immediately after the key
// columns when _seq_col_idx != -1), and finally breaks full ties by
// segment order.
// Side effect: on a full key tie, exactly one of the two contexts is
// flagged is_same so the merge loop can dedup/aggregate the duplicate.
bool VerticalMergeIteratorContext::compare(const VerticalMergeIteratorContext& rhs) const {
    int cmp_res;
    if (_key_group_cluster_key_idxes.empty()) {
        cmp_res = _block->compare_at(_index_in_block, rhs._index_in_block, _num_key_columns,
                                     *rhs._block, -1);
    } else {
        cmp_res = _block->compare_at(_index_in_block, rhs._index_in_block,
                                     &_key_group_cluster_key_idxes, *rhs._block, -1);
    }
    // Keys differ: no is_same marking needed.
    if (cmp_res != 0) {
        return cmp_res > 0;
    }
    auto col_cmp_res = 0;
    if (_seq_col_idx != -1) {
        DCHECK(_block->columns() >= _num_key_columns);
        // The sequence column sits right after the key columns.
        auto real_seq_idx = _num_key_columns;
        col_cmp_res = _block->compare_column_at(_index_in_block, rhs._index_in_block, real_seq_idx,
                                                *rhs._block, -1);
    }
    // Tie-break by segment order; mark the side that loses the comparison
    // as "same" so it is recognized as a duplicate when popped.
    auto result = (col_cmp_res == 0) ? (_order < rhs.order()) : (col_cmp_res < 0);
    result ? set_is_same(true) : rhs.set_is_same(true);
    return result;
}
| |
// Bulk-copy `count` rows starting at the current row into `block`.
// Leaves _index_in_block positioned on the LAST copied row (not one past
// it) — callers are expected to call advance() afterwards.
Status VerticalMergeIteratorContext::copy_rows(Block* block, size_t count) {
    Block& src = *_block;
    Block& dst = *block;
    DCHECK(count > 0);

    auto start = _index_in_block;
    _index_in_block += count - 1;
    RETURN_IF_CATCH_EXCEPTION({
        for (size_t i = 0; i < _ori_return_cols; ++i) {
            auto& s_col = src.get_by_position(i);
            auto& d_col = dst.get_by_position(i);

            ColumnPtr& s_cp = s_col.column;
            ColumnPtr& d_cp = d_col.column;

            d_cp->assume_mutable()->insert_range_from(*s_cp, start, count);
        }
    });
    return Status::OK();
}
// `advanced = false` when current block finished
//
// Flush the rows accumulated via add_cur_batch() into `block` and reset
// the batch counter. When `advanced` is true the context has already
// stepped one row past the batch, so the start index shifts back by one
// extra row.
Status VerticalMergeIteratorContext::copy_rows(Block* block, bool advanced) {
    Block& src = *_block;
    Block& dst = *block;
    // Nothing batched — nothing to copy.
    if (_cur_batch_num == 0) {
        return Status::OK();
    }

    // copy a row to dst block column by column
    size_t start = _index_in_block - _cur_batch_num + 1 - advanced;
    RETURN_IF_CATCH_EXCEPTION({
        for (size_t i = 0; i < _ori_return_cols; ++i) {
            auto& s_col = src.get_by_position(i);
            auto& d_col = dst.get_by_position(i);

            ColumnPtr& s_cp = s_col.column;
            ColumnPtr& d_cp = d_col.column;

            d_cp->assume_mutable()->insert_range_from(*s_cp, start, _cur_batch_num);
        }
    });
    _cur_batch_num = 0;
    return Status::OK();
}
| |
| Status VerticalMergeIteratorContext::init(const StorageReadOptions& opts, |
| CompactionSampleInfo* sample_info) { |
| if (LIKELY(_inited)) { |
| return Status::OK(); |
| } |
| _block_row_max = opts.block_row_max; |
| _record_rowids = opts.record_rowids; |
| RETURN_IF_ERROR(_load_next_block()); |
| if (sample_info != nullptr) { |
| sample_info->bytes += bytes(); |
| sample_info->rows += rows(); |
| } |
| if (valid()) { |
| RETURN_IF_ERROR(advance()); |
| } |
| _inited = true; |
| return Status::OK(); |
| } |
| |
| Status VerticalMergeIteratorContext::advance() { |
| // NOTE: we increase _index_in_block directly to valid one check |
| _is_same = false; |
| do { |
| _index_in_block++; |
| if (LIKELY(_index_in_block < _block->rows())) { |
| return Status::OK(); |
| } |
| // current batch has no data, load next batch |
| RETURN_IF_ERROR(_load_next_block()); |
| } while (_valid); |
| return Status::OK(); |
| } |
| |
// Rotate to the next non-empty batch from the underlying iterator.
// Blocks that consumers may still reference are parked in _block_list and
// recycled once their shared_ptr use_count drops back to 1. On EOF both
// the block and the segment iterator are released to free memory early.
Status VerticalMergeIteratorContext::_load_next_block() {
    do {
        if (_block != nullptr) {
            // Park the current block; a downstream consumer may still hold it.
            _block_list.push_back(_block);
            _block = nullptr;
        }
        // Try to recycle a parked block nobody references anymore.
        for (auto it = _block_list.begin(); it != _block_list.end(); it++) {
            if (it->use_count() == 1) {
                RETURN_IF_ERROR(block_reset(*it));
                _block = *it;
                _block_list.erase(it);
                break;
            }
        }
        if (_block == nullptr) {
            _block = std::make_shared<Block>();
            RETURN_IF_ERROR(block_reset(_block));
        }
        Status st = _iter->next_batch(_block.get());
        if (!st.ok()) {
            _valid = false;
            if (st.is<END_OF_FILE>()) {
                // When reading to the end of the segment file, clearing the block did not release the memory.
                // Directly releasing the block to free up memory.
                _block.reset();
                // When reading through segment file for columns that are dictionary encoded,
                // the column iterator in the segment iterator will hold the dictionary.
                // Release the segment iterator to free up the dictionary.
                _iter.reset();
                return Status::OK();
            } else {
                return st;
            }
        }
        // erase delete handler columns
        // NOTE(review): `i` is unsigned, so if _ori_return_cols could be 0
        // the `i >= _ori_return_cols` condition would never fail and `--i`
        // would wrap at zero — this relies on _ori_return_cols >= 1. Confirm.
        if (_block->columns() > _ori_return_cols) {
            for (auto i = _block->columns() - 1; i >= _ori_return_cols; --i) {
                _block->erase(i);
            }
        }
        if (UNLIKELY(_record_rowids)) {
            // Stamp each row location with this context's rowset id so the
            // caller sees global (rowset, segment, row) coordinates.
            RETURN_IF_ERROR(_iter->current_block_row_locations(&_block_row_locations));
            for (auto i = 0; i < _block_row_locations.size(); i++) {
                RowLocation& row_location = _block_row_locations[i];
                _block_row_locations[i] =
                        RowLocation(_rowset_id, row_location.segment_id, row_location.row_id);
            }
        }
    } while (_block->rows() == 0);
    _index_in_block = -1;
    _valid = true;
    return Status::OK();
}
| |
| // ---------------- VerticalHeapMergeIterator ------------- // |
// Pop rows in key order from the merge heap into `block` until it holds
// _block_row_max rows or all inputs are exhausted. For UNIQUE (without
// cluster keys) and AGG tables, rows flagged is_same are skipped and
// counted in _merged_rows. Every row's origin (and agg flag) is appended
// to _row_sources_buf so later column groups can replay this merge.
Status VerticalHeapMergeIterator::next_batch(Block* block) {
    size_t row_idx = 0;
    VerticalMergeIteratorContext* pre_ctx = nullptr;
    std::vector<RowSource> tmp_row_sources;
    if (UNLIKELY(_record_rowids)) {
        _block_row_locations.resize(_block_row_max);
    }
    while (_get_size(block) < _block_row_max) {
        if (_merge_heap.empty()) {
            VLOG_NOTICE << "_merge_heap empty";
            break;
        }

        auto ctx = _merge_heap.top();
        _merge_heap.pop();
        // Record this row's source and whether it duplicated the previous key.
        if (ctx->is_same()) {
            tmp_row_sources.emplace_back(ctx->order(), true);
        } else {
            tmp_row_sources.emplace_back(ctx->order(), false);
        }
        if (ctx->is_same() &&
            ((_keys_type == KeysType::UNIQUE_KEYS && _key_group_cluster_key_idxes.empty()) ||
             _keys_type == KeysType::AGG_KEYS)) {
            // skip cur row, copy pre ctx
            ++_merged_rows;
            if (pre_ctx) {
                RETURN_IF_ERROR(pre_ctx->copy_rows(block));
                pre_ctx = nullptr;
            }
        } else {
            // Batch consecutive rows coming from the same context and copy
            // them in one shot when the context changes or the block fills.
            ctx->add_cur_batch();
            if (pre_ctx != ctx) {
                if (pre_ctx) {
                    RETURN_IF_ERROR(pre_ctx->copy_rows(block));
                }
                pre_ctx = ctx;
            }
            if (UNLIKELY(_record_rowids)) {
                _block_row_locations[row_idx] = ctx->current_row_location();
            }
            row_idx++;
            if (ctx->is_cur_block_finished() || row_idx >= _block_row_max) {
                // current block finished, ctx not advance
                // so copy start_idx = (_index_in_block - _cur_batch_num + 1)
                RETURN_IF_ERROR(ctx->copy_rows(block, false));
                pre_ctx = nullptr;
            }
        }

        RETURN_IF_ERROR(ctx->advance());
        if (ctx->valid()) {
            _merge_heap.push(ctx);
        } else {
            // push next iterator in same rowset into heap
            size_t cur_order = ctx->order();
            for (size_t next_order = cur_order + 1;
                 next_order < _iterator_init_flags.size() && !_iterator_init_flags[next_order];
                 ++next_order) {
                auto& next_ctx = _ori_iter_ctx[next_order];
                DCHECK(next_ctx);
                RETURN_IF_ERROR(next_ctx->init(_opts));
                if (next_ctx->valid()) {
                    _merge_heap.push(next_ctx.get());
                    break;
                }
                // next_ctx is empty segment, move to next
                next_ctx.reset();
            }
            // Release ctx earlier to reduce resource consumed
            _ori_iter_ctx[cur_order].reset();
        }
    }
    RETURN_IF_ERROR(_row_sources_buf->append(tmp_row_sources));
    if (!_merge_heap.empty()) {
        return Status::OK();
    }
    if (UNLIKELY(_record_rowids)) {
        _block_row_locations.resize(row_idx);
    }
    return Status::EndOfFile("no more data in segment");
}
| |
// Build one merge context per input iterator, then initialize and push
// onto the heap only the contexts flagged in _iterator_init_flags (the
// first segment of each rowset); later segments of a rowset are
// initialized lazily by next_batch() when their predecessor is exhausted.
Status VerticalHeapMergeIterator::init(const StorageReadOptions& opts,
                                       CompactionSampleInfo* sample_info) {
    DCHECK(_origin_iters.size() == _iterator_init_flags.size());
    _record_rowids = opts.record_rowids;
    if (_origin_iters.empty()) {
        return Status::OK();
    }
    _schema = &(*_origin_iters.begin())->schema();

    size_t num_iters = _origin_iters.size();
    for (size_t seg_order = 0; seg_order < num_iters; ++seg_order) {
        auto& iter = _origin_iters[seg_order];
        auto ctx = std::make_unique<VerticalMergeIteratorContext>(
                std::move(iter), _rowset_ids[seg_order], _ori_return_cols, seg_order, _seq_col_idx,
                _key_group_cluster_key_idxes);
        _ori_iter_ctx.push_back(std::move(ctx));
    }
    _origin_iters.clear();

    // Init contxt depends on _iterator_init_flags
    // for example, the vector is [1,0,0,1,1], mean that order 0,3,4 iterator needs
    // to be inited and [0-2] is in same rowset.
    // Notice: if iterator[0] is empty it will be invalid when init succeed, but it
    // will not be pushed into heap, we should init next one util we find a valid iter
    // so this rowset can work in heap
    bool pre_iter_invalid = false;
    for (size_t i = 0; i < num_iters; ++i) {
        if (_iterator_init_flags[i] || pre_iter_invalid) {
            auto& ctx = _ori_iter_ctx[i];
            RETURN_IF_ERROR(ctx->init(opts, sample_info));
            if (!ctx->valid()) {
                // Empty segment: keep initializing the following segments so
                // this rowset is still represented in the heap.
                pre_iter_invalid = true;
                continue;
            }
            _merge_heap.push(ctx.get());
            pre_iter_invalid = false;
        }
    }

    _opts = opts;
    _block_row_max = opts.block_row_max;
    return Status::OK();
}
| |
| // ---------------- VerticalFifoMergeIterator ------------- // |
// Drain rows from the current context in plain segment (FIFO) order into
// `block` — no key comparison, so this path is only used for
// duplicate-key tables. Row origins are still appended to
// _row_sources_buf so later column groups replay the same order. When a
// context is exhausted, the next non-empty segment iterator is promoted.
Status VerticalFifoMergeIterator::next_batch(Block* block) {
    size_t row_idx = 0;
    std::vector<RowSource> tmp_row_sources;
    if (UNLIKELY(_record_rowids)) {
        _block_row_locations.resize(_block_row_max);
    }
    while (_get_size(block) < _block_row_max) {
        if (_cur_iter_ctx == nullptr) {
            VLOG_NOTICE << "_merge_list empty";
            break;
        }

        tmp_row_sources.emplace_back(_cur_iter_ctx->order(), false);

        // Fifo only for duplicate no key
        _cur_iter_ctx->add_cur_batch();
        if (UNLIKELY(_record_rowids)) {
            _block_row_locations[row_idx] = _cur_iter_ctx->current_row_location();
        }
        row_idx++;
        if (_cur_iter_ctx->is_cur_block_finished() || row_idx >= _block_row_max) {
            // current block finished, ctx not advance
            // so copy start_idx = (_index_in_block - _cur_batch_num + 1)
            RETURN_IF_ERROR(_cur_iter_ctx->copy_rows(block, false));
        }

        RETURN_IF_ERROR(_cur_iter_ctx->advance());
        if (!_cur_iter_ctx->valid()) {
            // take the ownership of _cur_iter_ctx.
            std::unique_ptr<VerticalMergeIteratorContext> ctx(_cur_iter_ctx.release());
            // push next iterator in same rowset into heap
            for (auto cur_order = ctx->order() + 1; cur_order < _iterator_init_flags.size();
                 cur_order++) {
                auto& next_iter = _origin_iters[cur_order];
                std::unique_ptr<VerticalMergeIteratorContext> next_ctx(
                        new VerticalMergeIteratorContext(std::move(next_iter),
                                                         _rowset_ids[cur_order], _ori_return_cols,
                                                         cur_order, _seq_col_idx));
                RETURN_IF_ERROR(next_ctx->init(_opts));
                if (next_ctx->valid()) {
                    _cur_iter_ctx.swap(next_ctx);
                    break;
                }
            }
            // ctx resource will release automated.
        }
    }
    RETURN_IF_ERROR(_row_sources_buf->append(tmp_row_sources));
    if (_cur_iter_ctx) {
        return Status::OK();
    }
    if (UNLIKELY(_record_rowids)) {
        _block_row_locations.resize(row_idx);
    }
    return Status::EndOfFile("no more data in segment");
}
| |
| Status VerticalFifoMergeIterator::init(const StorageReadOptions& opts, |
| CompactionSampleInfo* sample_info) { |
| DCHECK(_origin_iters.size() == _iterator_init_flags.size()); |
| DCHECK(_keys_type == KeysType::DUP_KEYS); |
| _record_rowids = opts.record_rowids; |
| if (_origin_iters.empty()) { |
| return Status::OK(); |
| } |
| _schema = &(*_origin_iters.begin())->schema(); |
| |
| auto seg_order = 0; |
| // Init contxt depends on _iterator_init_flags |
| // for example, the vector is [1,0,0,1,1], mean that order 0,3,4 iterator needs |
| // to be inited and [0-2] is in same rowset. |
| // Notice: if iterator[0] is empty it will be invalid when init succeed, but it |
| // will not be pushed into heap, we should init next one util we find a valid iter |
| // so this rowset can work in heap |
| for (auto& iter : _origin_iters) { |
| std::unique_ptr<VerticalMergeIteratorContext> ctx( |
| new VerticalMergeIteratorContext(std::move(iter), _rowset_ids[seg_order], |
| _ori_return_cols, seg_order, _seq_col_idx)); |
| RETURN_IF_ERROR(ctx->init(opts, sample_info)); |
| if (!ctx->valid()) { |
| ++seg_order; |
| continue; |
| } |
| ++seg_order; |
| _cur_iter_ctx.swap(ctx); |
| break; |
| } |
| |
| _opts = opts; |
| _block_row_max = opts.block_row_max; |
| return Status::OK(); |
| } |
| |
| // ---------------- VerticalMaskMergeIterator ------------- // |
| Status VerticalMaskMergeIterator::check_all_iter_finished() { |
| for (auto& iter : _origin_iter_ctx) { |
| if (iter->inited()) { |
| if (iter->valid()) { |
| RETURN_IF_ERROR(iter->advance()); |
| } |
| DCHECK(!iter->valid()); |
| } |
| } |
| return Status::OK(); |
| } |
| Status VerticalMaskMergeIterator::next_row(vectorized::IteratorRowRef* ref) { |
| DCHECK(_row_sources_buf); |
| auto st = _row_sources_buf->has_remaining(); |
| if (!st.ok()) { |
| if (st.is<END_OF_FILE>()) { |
| RETURN_IF_ERROR(check_all_iter_finished()); |
| } |
| return st; |
| } |
| |
| auto row_source = _row_sources_buf->current(); |
| uint16_t order = row_source.get_source_num(); |
| auto& ctx = _origin_iter_ctx[order]; |
| // init ctx and this ctx must be valid |
| RETURN_IF_ERROR(ctx->init(_opts, _sample_info)); |
| DCHECK(ctx->valid()); |
| |
| if (UNLIKELY(ctx->is_first_row())) { |
| // first row in block, don't call ctx->advance |
| // Except first row, we call advance first and than get cur row |
| ctx->set_cur_row_ref(ref); |
| ref->is_same = row_source.agg_flag(); |
| if (ref->is_same) { |
| _filtered_rows++; |
| } |
| |
| ctx->set_is_first_row(false); |
| _row_sources_buf->advance(); |
| return Status::OK(); |
| } |
| RETURN_IF_ERROR(ctx->advance()); |
| ctx->set_cur_row_ref(ref); |
| ref->is_same = row_source.agg_flag(); |
| if (ref->is_same) { |
| _filtered_rows++; |
| } |
| |
| _row_sources_buf->advance(); |
| return Status::OK(); |
| } |
| |
// Produce the next surviving row of a unique-key mask merge: entries whose
// recorded source was agg-flagged (superseded by a newer version) are
// skipped and counted in _filtered_rows; the first unflagged row is
// returned in *ref.
Status VerticalMaskMergeIterator::unique_key_next_row(vectorized::IteratorRowRef* ref) {
    DCHECK(_row_sources_buf);
    auto st = _row_sources_buf->has_remaining();
    while (st.ok()) {
        auto row_source = _row_sources_buf->current();
        uint16_t order = row_source.get_source_num();
        auto& ctx = _origin_iter_ctx[order];
        RETURN_IF_ERROR(ctx->init(_opts, _sample_info));
        DCHECK(ctx->valid());
        if (!ctx->valid()) {
            LOG(INFO) << "VerticalMergeIteratorContext not valid";
            return Status::InternalError("VerticalMergeIteratorContext not valid");
        }

        // NOTE(review): when the context's first row is agg-flagged we fall
        // through to advance() below without clearing is_first_row — verify
        // that combination cannot occur or is handled by set_cur_row_ref.
        if (UNLIKELY(ctx->is_first_row()) && !row_source.agg_flag()) {
            // first row in block, don't call ctx->advance
            // Except first row, we call advance first and than get cur row
            ctx->set_cur_row_ref(ref);
            ctx->set_is_first_row(false);
            _row_sources_buf->advance();
            return Status::OK();
        }
        RETURN_IF_ERROR(ctx->advance());
        _row_sources_buf->advance();
        if (!row_source.agg_flag()) {
            ctx->set_cur_row_ref(ref);
            return Status::OK();
        }
        // Row was superseded — skip it and keep scanning.
        _filtered_rows++;
        st = _row_sources_buf->has_remaining();
    }

    // Row sources exhausted: verify every input context is fully consumed.
    if (st.is<END_OF_FILE>()) {
        RETURN_IF_ERROR(check_all_iter_finished());
    }
    return st;
}
| |
// Batched replay of a recorded merge: consume runs of consecutive row
// sources from the same origin and bulk-copy each run from its context
// into `block`, until the block is full or the sources are exhausted.
Status VerticalMaskMergeIterator::next_batch(Block* block) {
    DCHECK(_row_sources_buf);
    size_t rows = 0;
    auto st = _row_sources_buf->has_remaining();
    while (rows < _block_row_max && st.ok()) {
        uint16_t order = _row_sources_buf->current().get_source_num();
        DCHECK(order < _origin_iter_ctx.size());
        auto& ctx = _origin_iter_ctx[order];
        RETURN_IF_ERROR(ctx->init(_opts, _sample_info));
        DCHECK(ctx->valid());
        if (!ctx->valid()) {
            LOG(INFO) << "VerticalMergeIteratorContext not valid";
            return Status::InternalError("VerticalMergeIteratorContext not valid");
        }

        // find max same source count in cur ctx
        // Cap the run by both the context's remaining rows and the space
        // left in the output block.
        size_t limit = std::min(ctx->remain_rows(), _block_row_max - rows);
        auto same_source_cnt = _row_sources_buf->same_source_count(order, limit);
        _row_sources_buf->advance(same_source_cnt);
        // copy rows to block
        RETURN_IF_ERROR(ctx->copy_rows(block, same_source_cnt));
        RETURN_IF_ERROR(ctx->advance());
        rows += same_source_cnt;
        st = _row_sources_buf->has_remaining();
    }
    // Row sources exhausted: verify every input context is fully consumed.
    if (st.is<END_OF_FILE>()) {
        RETURN_IF_ERROR(check_all_iter_finished());
    }
    return st;
}
| |
| Status VerticalMaskMergeIterator::init(const StorageReadOptions& opts, |
| CompactionSampleInfo* sample_info) { |
| if (_origin_iters.empty()) { |
| return Status::OK(); |
| } |
| _schema = &(*_origin_iters.begin())->schema(); |
| _opts = opts; |
| |
| RowsetId rs_id; |
| for (auto& iter : _origin_iters) { |
| auto ctx = std::make_unique<VerticalMergeIteratorContext>(std::move(iter), rs_id, |
| _ori_return_cols, -1, -1); |
| _origin_iter_ctx.push_back(std::move(ctx)); |
| } |
| _origin_iters.clear(); |
| |
| _sample_info = sample_info; |
| _block_row_max = opts.block_row_max; |
| return Status::OK(); |
| } |
| |
| // interfaces to create vertical merge iterator |
| std::shared_ptr<RowwiseIterator> new_vertical_heap_merge_iterator( |
| std::vector<RowwiseIteratorUPtr>&& inputs, const std::vector<bool>& iterator_init_flag, |
| const std::vector<RowsetId>& rowset_ids, size_t ori_return_cols, KeysType keys_type, |
| uint32_t seq_col_idx, RowSourcesBuffer* row_sources, |
| std::vector<uint32_t> key_group_cluster_key_idxes) { |
| return std::make_shared<VerticalHeapMergeIterator>( |
| std::move(inputs), iterator_init_flag, rowset_ids, ori_return_cols, keys_type, |
| seq_col_idx, row_sources, key_group_cluster_key_idxes); |
| } |
| |
| std::shared_ptr<RowwiseIterator> new_vertical_fifo_merge_iterator( |
| std::vector<RowwiseIteratorUPtr>&& inputs, const std::vector<bool>& iterator_init_flag, |
| const std::vector<RowsetId>& rowset_ids, size_t ori_return_cols, KeysType keys_type, |
| uint32_t seq_col_idx, RowSourcesBuffer* row_sources) { |
| return std::make_shared<VerticalFifoMergeIterator>(std::move(inputs), iterator_init_flag, |
| rowset_ids, ori_return_cols, keys_type, |
| seq_col_idx, row_sources); |
| } |
| |
| std::shared_ptr<RowwiseIterator> new_vertical_mask_merge_iterator( |
| std::vector<RowwiseIteratorUPtr>&& inputs, size_t ori_return_cols, |
| RowSourcesBuffer* row_sources) { |
| return std::make_shared<VerticalMaskMergeIterator>(std::move(inputs), ori_return_cols, |
| row_sources); |
| } |
| |
| } // namespace vectorized |
| #include "common/compile_check_end.h" |
| } // namespace doris |