blob: 7741e1078b7de34db330060b989bad107ab409c1 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "olap/delete_bitmap_calculator.h"
#include <cstdint>
#include "common/cast_set.h"
#include "common/status.h"
#include "olap/primary_key_index.h"
#include "vec/data_types/data_type_factory.hpp"
namespace doris {
#include "common/compile_check_begin.h"
// Exposes the primary key at the current row of this context through `slice`.
// Returns END_OF_FILE once every row of the index has been consumed. When the
// buffered block has been used up, the iterator is re-seeked to the current row
// and a new batch is loaded first. The returned slice points into
// `_index_column`, so it presumably stays valid only until the next batch is read.
Status MergeIndexDeleteBitmapCalculatorContext::get_current_key(Slice& slice) {
    if (_cur_row_id >= _num_rows) {
        return Status::EndOfFile("Reach the end of file");
    }

    const bool need_refill = _cur_pos >= _block_size;
    if (need_refill) {
        RETURN_IF_ERROR(_iter->seek_to_ordinal(_cur_row_id));
        RETURN_IF_ERROR(_next_batch(_cur_row_id));
    }

    const auto cell = _index_column->get_data_at(_cur_pos);
    slice = Slice(cell.data, cell.size);
    return Status::OK();
}
// Moves this context one row forward, stepping both the position inside the
// buffered block and the absolute row ordinal. Returns END_OF_FILE when the new
// ordinal is past the last row of the index.
Status MergeIndexDeleteBitmapCalculatorContext::advance() {
    ++_cur_row_id;
    ++_cur_pos;
    if (_cur_row_id < _num_rows) {
        return Status::OK();
    }
    return Status::EndOfFile("Reach the end of file");
}
// Repositions this context at the first index row whose key is at or after `key`.
// If the iterator reports ENTRY_NOT_FOUND, END_OF_FILE is returned instead.
// When the target row is still inside the block read before, only the cursors
// (`_cur_pos`, `_cur_row_id`) are moved; otherwise a new batch is read starting
// at the target row (the seek has already positioned `_iter` there).
Status MergeIndexDeleteBitmapCalculatorContext::seek_at_or_after(Slice const& key) {
    auto seek_status = _iter->seek_at_or_after(&key, &_excat_match);
    if (seek_status.is<ErrorCode::ENTRY_NOT_FOUND>()) {
        return Status::EndOfFile("Reach the end of file");
    }
    RETURN_IF_ERROR(seek_status);

    auto target_ordinal = cast_set<uint32_t>(_iter->get_current_ordinal());
    DCHECK(target_ordinal > _cur_row_id)
            << fmt::format("current_ordinal: {} should be greater than _cur_row_id: {}",
                           target_ordinal, _cur_row_id);

    // Position the target row would occupy inside the currently buffered block.
    auto target_pos = _cur_pos + target_ordinal - _cur_row_id;
    if (target_pos >= _block_size) {
        // Not buffered: load a fresh batch beginning at the target row.
        return _next_batch(target_ordinal);
    }
    _cur_pos = target_pos;
    _cur_row_id = target_ordinal;
    return Status::OK();
}
// Reads up to `_max_batch_size` index rows, starting at ordinal `row_id`, into a
// freshly created `_index_column`, then resets the in-block cursor to the first
// row of that batch.
// Precondition: `_iter` must already be positioned at `row_id` (the callers do this
// via seek_to_ordinal / seek_at_or_after), and `row_id < _num_rows`.
Status MergeIndexDeleteBitmapCalculatorContext::_next_batch(uint32_t row_id) {
    DCHECK(row_id < _num_rows) << fmt::format("row_id: {} should be less than _num_rows: {}",
                                              row_id, _num_rows);
    _index_column = _index_type->create_column();
    auto remaining = _num_rows - row_id;
    size_t num_to_read = std::min(_max_batch_size, remaining);
    size_t num_read = num_to_read;
    RETURN_IF_ERROR(_iter->next_batch(&num_read, _index_column));
    // The whole requested range lies within the index, so the read should be complete.
    DCHECK(num_to_read == num_read) << fmt::format(
            "num_to_read: {} should be equal to num_read: {}", num_to_read, num_read);
    _block_size = num_read;
    _cur_pos = 0;
    _cur_row_id = row_id;
    return Status::OK();
}
// Ordering used by the std::priority_queue of merge contexts. Returns true when
// `lhs` should be popped AFTER `rhs`. Resulting pop order:
//   1. smaller key (excluding the sequence/rowid suffix) first;
//   2. for equal keys, greater sequence value first (only when _sequence_length > 0);
//   3. then greater segment id first.
// When both suffix lengths are 0, the full keys are compared directly and ties are
// broken by segment id as in step 3.
bool MergeIndexDeleteBitmapCalculatorContext::Comparator::operator()(
        MergeIndexDeleteBitmapCalculatorContext* lhs,
        MergeIndexDeleteBitmapCalculatorContext* rhs) const {
    // std::priority_queue is a max heap, and this function must return the result of
    // `lhs < rhs`; so if the function returns true, rhs will be popped before lhs.
    Slice key1, key2;
    // MergeIndexDeleteBitmapCalculatorContext::get_current_key may return a non-OK status
    // (e.g. on memory allocation failure). A comparator cannot return a Status, so the
    // error is propagated by throwing.
    THROW_IF_ERROR(lhs->get_current_key(key1));
    THROW_IF_ERROR(rhs->get_current_key(key2));
    if (_sequence_length == 0 && _rowid_length == 0) {
        auto cmp_result = key1.compare(key2);
        // when key1 is the same as key2,
        // we want the one with greater segment id to be popped first
        return cmp_result ? (cmp_result > 0) : (lhs->segment_id() < rhs->segment_id());
    }
    // Each key must be long enough to hold the sequence and rowid suffix; otherwise the
    // size subtractions below would underflow (sizes are presumably unsigned).
    // These mirror the checks in is_key_same().
    DCHECK(key1.get_size() >= _sequence_length + _rowid_length);
    DCHECK(key2.get_size() >= _sequence_length + _rowid_length);
    // smaller key popped first
    auto key1_without_seq =
            Slice(key1.get_data(), key1.get_size() - _sequence_length - _rowid_length);
    auto key2_without_seq =
            Slice(key2.get_data(), key2.get_size() - _sequence_length - _rowid_length);
    auto cmp_result = key1_without_seq.compare(key2_without_seq);
    if (cmp_result != 0) {
        return cmp_result > 0;
    }
    if (_sequence_length > 0) {
        // greater sequence value popped first. The first byte of the sequence suffix is
        // skipped (presumably an encoding marker — NOTE(review): confirm).
        auto key1_sequence_val =
                Slice(key1.get_data() + key1_without_seq.get_size() + 1, _sequence_length - 1);
        auto key2_sequence_val =
                Slice(key2.get_data() + key2_without_seq.get_size() + 1, _sequence_length - 1);
        cmp_result = key1_sequence_val.compare(key2_sequence_val);
        if (cmp_result != 0) {
            return cmp_result < 0;
        }
    }
    // greater segment id popped first
    return lhs->segment_id() < rhs->segment_id();
}
// Reports whether two encoded keys are equal once their trailing
// `_sequence_length + _rowid_length` suffix bytes are ignored.
bool MergeIndexDeleteBitmapCalculatorContext::Comparator::is_key_same(Slice const& lhs,
                                                                      Slice const& rhs) const {
    const auto suffix_length = _sequence_length + _rowid_length;
    DCHECK(lhs.get_size() >= suffix_length);
    DCHECK(rhs.get_size() >= suffix_length);
    const Slice lhs_prefix(lhs.get_data(), lhs.get_size() - suffix_length);
    const Slice rhs_prefix(rhs.get_data(), rhs.get_size() - suffix_length);
    return lhs_prefix.compare(rhs_prefix) == 0;
}
// Prepares the calculator for `segments` of rowset `rowset_id`.
// For every segment, the primary key index is loaded, an index iterator is opened,
// and a merge context is pushed onto the heap. `seq_col_length` and `rowdid_length`
// are the byte lengths of the sequence and rowid suffixes appended to each key.
// `max_batch_size` bounds how many index rows each context reads per batch.
// When `rowdid_length > 0`, a key coder for decoding the unsigned-int rowid
// suffix is also prepared.
// Exceptions thrown during setup are caught by RETURN_IF_CATCH_EXCEPTION.
Status MergeIndexDeleteBitmapCalculator::init(RowsetId rowset_id,
                                              std::vector<SegmentSharedPtr> const& segments,
                                              size_t seq_col_length, size_t rowdid_length,
                                              size_t max_batch_size) {
    RETURN_IF_CATCH_EXCEPTION({
        _rowset_id = rowset_id;
        _seq_col_length = seq_col_length;
        _rowid_length = rowdid_length;
        _comparator =
                MergeIndexDeleteBitmapCalculatorContext::Comparator(seq_col_length, _rowid_length);
        // The heap stores pointers to elements of _contexts (see the push below), so
        // _contexts must not reallocate while it is being filled; reserve one slot per
        // segment up front.
        _contexts.reserve(segments.size());
        _heap = std::make_unique<Heap>(_comparator);
        for (auto& segment : segments) {
            RETURN_IF_ERROR(segment->load_index(nullptr));
            auto pk_idx = segment->get_primary_key_index();
            std::unique_ptr<segment_v2::IndexedColumnIterator> index;
            RETURN_IF_ERROR(pk_idx->new_iterator(&index, nullptr));
            auto index_type = vectorized::DataTypeFactory::instance().create_data_type(
                    pk_idx->type_info()->type(), 1, 0);
            // Forward the caller-supplied batch size so each context reads at most
            // `max_batch_size` index rows per batch.
            _contexts.emplace_back(std::move(index), index_type, segment->id(),
                                   pk_idx->num_rows(), max_batch_size);
            _heap->push(&_contexts.back());
        }
        if (_rowid_length > 0) {
            _rowid_coder = get_key_coder(
                    get_scalar_type_info<FieldType::OLAP_FIELD_TYPE_UNSIGNED_INT>()->type());
        }
    });
    return Status::OK();
}
// Finds the next out-of-date row and reports it in `loc`.
//
// Contexts are popped from `_heap` in Comparator order: smaller key first, then
// (for equal keys) greater sequence value, then greater segment id. The first row
// popped for a key is remembered in `_last_key`. Any later popped row whose key
// (ignoring the sequence/rowid suffix) equals `_last_key` is reported as out-of-date.
// When the popped key differs from the next one on the heap, the popped context is
// advanced (equal keys) or seeked forward to the next context's key, skipping rows
// that cannot collide. A context that reaches END_OF_FILE is dropped from the heap.
// Returns END_OF_FILE when no further out-of-date row is found.
Status MergeIndexDeleteBitmapCalculator::calculate_one(RowLocation& loc) {
    // get the location of a out-of-date row
    while (!_heap->empty()) {
        auto cur_ctx = _heap->top();
        _heap->pop();
        Slice cur_key;
        RETURN_IF_ERROR(cur_ctx->get_current_key(cur_key));
        if (!_last_key.empty() && _comparator.is_key_same(cur_key, _last_key)) {
            // Same key as a row popped earlier: report this row.
            loc.segment_id = cur_ctx->segment_id();
            loc.row_id = cur_ctx->row_id();
            if (_rowid_length > 0) {
                // When keys carry a rowid suffix (after the sequence suffix), the row id is
                // decoded from it instead of using the index ordinal. The first byte of the
                // suffix is skipped (presumably an encoding marker — NOTE(review): confirm).
                Slice key_without_seq = Slice(cur_key.get_data(),
                                              cur_key.get_size() - _seq_col_length - _rowid_length);
                Slice rowid_slice =
                        Slice(cur_key.get_data() + key_without_seq.get_size() + _seq_col_length + 1,
                              _rowid_length - 1);
                RETURN_IF_ERROR(_rowid_coder->decode_ascending(
                        &rowid_slice, _rowid_length, reinterpret_cast<uint8_t*>(&loc.row_id)));
            }
            auto st = cur_ctx->advance();
            if (st.ok()) {
                _heap->push(cur_ctx);
            } else if (!st.is<ErrorCode::END_OF_FILE>()) {
                return st;
            }
            return Status::OK();
        }
        // Only this context was left: stop without scanning its remaining rows.
        // NOTE(review): this relies on keys being unique within one segment — confirm.
        if (_heap->empty()) {
            break;
        }
        _last_key = cur_key.to_string();
        auto nxt_ctx = _heap->top();
        Slice nxt_key;
        RETURN_IF_ERROR(nxt_ctx->get_current_key(nxt_key));
        // Equal keys: step one row so the duplicate is seen on the next iteration.
        // Different keys: jump straight to the first row at or after the next key prefix.
        Status st = _comparator.is_key_same(cur_key, nxt_key)
                            ? cur_ctx->advance()
                            : cur_ctx->seek_at_or_after(
                                      Slice(nxt_key.get_data(),
                                            nxt_key.get_size() - _seq_col_length - _rowid_length));
        if (st.is<ErrorCode::END_OF_FILE>()) {
            // This context is exhausted; leave it off the heap.
            continue;
        }
        RETURN_IF_ERROR(st);
        _heap->push(cur_ctx);
    }
    return Status::EndOfFile("Reach end of file");
}
// Repeatedly calls calculate_one() and marks every reported row in `delete_bitmap`
// under (`_rowset_id`, segment id, DeleteBitmap::TEMP_VERSION_COMMON).
// Stops quietly at END_OF_FILE. Any other error status is returned, and
// exceptions are handled by RETURN_IF_CATCH_EXCEPTION.
Status MergeIndexDeleteBitmapCalculator::calculate_all(DeleteBitmapPtr delete_bitmap) {
    RETURN_IF_CATCH_EXCEPTION({
        RowLocation loc;
        for (auto st = calculate_one(loc); !st.is<ErrorCode::END_OF_FILE>();
             st = calculate_one(loc)) {
            RETURN_IF_ERROR(st);
            delete_bitmap->add({_rowset_id, loc.segment_id, DeleteBitmap::TEMP_VERSION_COMMON},
                               loc.row_id);
        }
    });
    return Status::OK();
}
#include "common/compile_check_end.h"
} // namespace doris