blob: bb43d81e394cf155edc3c0b94ab27c37932d2178 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "olap/rowset/segment_v2/binary_prefix_page.h"
#include <stddef.h>
#include <stdint.h>
#include <algorithm>
#include <vector>
#include "common/status.h"
#include "util/coding.h"
#include "util/faststring.h"
#include "util/slice.h"
namespace doris {
namespace segment_v2 {
#include "common/compile_check_begin.h"
// Appends up to `*add_count` values to the page using prefix compression.
//
// `vals` is reinterpreted as an array of `Slice`. An entry whose ordinal is a
// multiple of RESTART_POINT_INTERVAL is a restart point: its buffer offset is
// recorded and it is stored with a shared length of 0. Every other entry only
// stores the bytes that follow its common prefix with the previously added
// entry. Each encoded entry is laid out as
//   varint32(shared_len) | varint32(non_shared_len) | non-shared bytes
//
// On return `*add_count` holds the number of values actually consumed; it is
// smaller than requested when is_page_full() became true part-way through.
// The buffer writes are wrapped in RETURN_IF_CATCH_EXCEPTION so that an
// exception raised while growing the buffers is reported through the returned
// Status instead of propagating.
Status BinaryPrefixPageBuilder::add(const uint8_t* vals, size_t* add_count) {
    DCHECK(!_finished);
    if (*add_count == 0) {
        return Status::OK();
    }

    const Slice* src = reinterpret_cast<const Slice*>(vals);
    if (_count == 0) {
        // Keep a private copy of the very first entry of the page.
        _first_entry.assign_copy(reinterpret_cast<const uint8_t*>(src->get_data()),
                                 src->get_size());
    }

    // size_t counters: the bounds they are compared against are size_t.
    size_t consumed = 0;
    for (; consumed < *add_count; ++consumed, ++src) {
        if (is_page_full()) {
            break;
        }
        const char* entry = src->data;
        const size_t entry_len = src->size;
        const size_t old_size = _buffer.size();

        size_t share_len = 0;
        if (_count % RESTART_POINT_INTERVAL == 0) {
            // Restart point: nothing is shared, remember where the entry starts.
            _restart_points_offset.push_back(cast_set<uint32_t>(old_size));
        } else {
            const size_t max_share_len = std::min(_last_entry.size(), entry_len);
            share_len = max_share_len;
            for (size_t j = 0; j < max_share_len; ++j) {
                // Compare as unsigned bytes. Comparing a (possibly signed) `char`
                // with an unsigned byte promotes both to int, so bytes >= 0x80
                // would never be considered equal and the shared prefix would be
                // cut short at the first non-ASCII byte.
                if (static_cast<uint8_t>(entry[j]) != static_cast<uint8_t>(_last_entry[j])) {
                    share_len = j;
                    break;
                }
            }
        }
        const size_t non_share_len = entry_len - share_len;

        // These appends may need a large amount of memory; report a failure
        // through the Status instead of letting the exception escape, to avoid
        // BE OOM.
        RETURN_IF_CATCH_EXCEPTION({
            put_varint32(&_buffer, cast_set<uint32_t>(share_len));
            put_varint32(&_buffer, cast_set<uint32_t>(non_share_len));
            _buffer.append(entry + share_len, non_share_len);
            _last_entry.clear();
            _last_entry.append(entry, entry_len);
        });

        _raw_data_size += entry_len;
        ++_count;
    }

    *add_count = consumed;
    return Status::OK();
}
// Seals the page: appends the footer after the encoded entries and hands the
// built buffer over through `slice`.
//
// Footer layout, in write order:
//   fixed32 LE  number of values
//   1 byte      RESTART_POINT_INTERVAL
//   fixed32 LE  offset of each restart point
//   fixed32 LE  number of restart points
//
// The writes are wrapped in RETURN_IF_CATCH_EXCEPTION so that an exception
// raised while growing the buffer is reported through the returned Status.
Status BinaryPrefixPageBuilder::finish(OwnedSlice* slice) {
    DCHECK(!_finished);
    _finished = true;

    RETURN_IF_CATCH_EXCEPTION({
        put_fixed32_le(&_buffer, static_cast<uint32_t>(_count));

        const uint8_t interval_byte = RESTART_POINT_INTERVAL;
        _buffer.append(&interval_byte, 1);

        for (const auto restart_offset : _restart_points_offset) {
            put_fixed32_le(&_buffer, restart_offset);
        }
        put_fixed32_le(&_buffer, cast_set<uint32_t>(_restart_points_offset.size()));

        *slice = _buffer.build();
    });

    return Status::OK();
}
// Decodes the two varint32 length fields that precede an entry's payload.
//
// `ptr` points at the start of an encoded entry. On success the lengths are
// stored in `*shared` and `*non_shared` and the returned pointer addresses the
// first non-shared byte. Returns nullptr when decode_varint32_ptr fails for
// either field (both are limited by `_footer_start`), or when fewer than
// `*non_shared` bytes remain between the returned position and `_footer_start`.
const uint8_t* BinaryPrefixPageDecoder::_decode_value_lengths(const uint8_t* ptr, uint32_t* shared,
                                                              uint32_t* non_shared) {
    ptr = decode_varint32_ptr(ptr, _footer_start, shared);
    if (ptr == nullptr) {
        return nullptr;
    }

    ptr = decode_varint32_ptr(ptr, _footer_start, non_shared);
    if (ptr == nullptr) {
        return nullptr;
    }

    // The payload must end before the footer begins.
    const bool payload_fits = (_footer_start - ptr) >= *non_shared;
    return payload_fits ? ptr : nullptr;
}
// Decodes the entry located at `_next_ptr` into `_current_value` and advances
// `_next_ptr` past it. `_cur_pos` is not modified here; callers update it.
//
// The new value is rebuilt from the previous one: the first `shared_len` bytes
// of `_current_value` are kept and the entry's non-shared bytes are appended.
//
// Returns EndOfFile when `_cur_pos` is not below `_num_values`, and Corruption
// when the length header cannot be decoded or when the entry claims to share
// more bytes than the previously decoded value contains.
Status BinaryPrefixPageDecoder::_read_next_value() {
    if (_cur_pos >= _num_values) {
        return Status::EndOfFile("no more value to read");
    }

    uint32_t shared_len = 0;
    uint32_t non_shared_len = 0;
    auto data_ptr = _decode_value_lengths(_next_ptr, &shared_len, &non_shared_len);
    if (data_ptr == nullptr) {
        DCHECK(false) << "[BinaryPrefixPageDecoder::_read_next_value] corruption!";
        return Status::Corruption("Failed to decode value at position {}", _cur_pos);
    }

    // A shared prefix can never be longer than the value it is shared with.
    // Without this check resize() below would grow the buffer and expose bytes
    // that were never decoded as part of the value.
    if (shared_len > _current_value.size()) {
        DCHECK(false) << "[BinaryPrefixPageDecoder::_read_next_value] corruption!";
        return Status::Corruption("Invalid shared prefix length at position {}", _cur_pos);
    }

    // Keep the shared prefix of the previous value, then append the new suffix.
    _current_value.resize(shared_len);
    _current_value.append(data_ptr, non_shared_len);
    _next_ptr = data_ptr + non_shared_len;
    return Status::OK();
}
// Positions the decoder on the first entry of restart point
// `restart_point_index` and decodes that entry into `_current_value`.
//
// `_cur_pos` becomes `restart_point_index * _restart_point_internal`, i.e. the
// ordinal of that first entry. Returns whatever _read_next_value() returns.
Status BinaryPrefixPageDecoder::_seek_to_restart_point(size_t restart_point_index) {
    const auto first_ordinal = restart_point_index * _restart_point_internal;
    _cur_pos = cast_set<uint32_t>(first_ordinal);
    _next_ptr = _get_restart_point(restart_point_index);
    return _read_next_value();
}
// Parses the page footer and positions the decoder on the first value.
//
// The footer is read backwards from the end of `_data`:
//   ... entries ... | num_values (fixed32) | restart interval (1 byte)
//                   | restart offsets (fixed32 * num_restarts)
//                   | num_restarts (fixed32)
//
// Returns Corruption when the page is too small to hold the footer it
// describes or when the stored restart interval is zero. Otherwise returns the
// result of decoding the first value; for a page with zero values that is the
// EndOfFile status produced by _read_next_value().
Status BinaryPrefixPageDecoder::init() {
    _cur_pos = 0;
    _next_ptr = reinterpret_cast<const uint8_t*>(_data.get_data());

    // num_values (4) + restart interval (1) + num_restarts (4)
    constexpr size_t kFixedFooterSize = 4 + 1 + 4;
    const size_t page_size = _data.get_size();
    if (page_size < kFixedFooterSize) {
        return Status::Corruption("binary prefix page is too small, size={}", page_size);
    }

    const uint8_t* end = _next_ptr + page_size;
    _num_restarts = decode_fixed32_le(end - 4);
    // The restart offset array must fit in front of the trailing counter
    // without reaching before the start of the page.
    if (_num_restarts > (page_size - kFixedFooterSize) / 4) {
        return Status::Corruption("binary prefix page has invalid restart point count {}",
                                  _num_restarts);
    }

    _restarts_ptr = end - (static_cast<size_t>(_num_restarts) + 1) * 4;
    _footer_start = _restarts_ptr - 4 - 1;
    _num_values = decode_fixed32_le(_footer_start);
    _restart_point_internal = decode_fixed8(_footer_start + 4);
    // The interval is used as a divisor when seeking by position.
    if (_restart_point_internal == 0) {
        return Status::Corruption("binary prefix page has zero restart point interval");
    }

    _parsed = true;
    return _read_next_value();
}
// Moves the decoder so that the value with ordinal `pos` is the current value.
//
// `pos == _num_values` is accepted and only updates `_cur_pos`; no value is
// decoded in that case. For any other position the decoder jumps to the
// restart point that covers `pos` and then decodes forward one entry at a time
// until `_cur_pos` reaches `pos`.
//
// Returns INTERNAL_ERROR when the page has no values and `pos` is non-zero, or
// the first non-OK status produced while decoding.
Status BinaryPrefixPageDecoder::seek_to_position_in_page(size_t pos) {
    DCHECK(_parsed);
    DCHECK_LE(pos, _num_values);

    if (_num_values == 0 && pos != 0) [[unlikely]] {
        return Status::Error<ErrorCode::INTERNAL_ERROR, false>(
                "seek pos {} is larger than total elements {}", pos, _num_values);
    }

    // Seeking to one past the last value is valid.
    if (pos == _num_values) {
        _cur_pos = cast_set<uint32_t>(_num_values);
        return Status::OK();
    }

    // Jump to the covering restart point, then walk forward to `pos`.
    const size_t covering_restart = pos / _restart_point_internal;
    RETURN_IF_ERROR(_seek_to_restart_point(covering_restart));
    while (_cur_pos < pos) {
        ++_cur_pos;
        RETURN_IF_ERROR(_read_next_value());
    }
    return Status::OK();
}
// Positions the decoder on the first value that compares >= the target.
//
// `value` is read as a `Slice`. On success `*exact_match` tells whether the
// value found equals the target. Returns ENTRY_NOT_FOUND when decoding runs off
// the end of the page before such a value is seen, or the first other non-OK
// status produced while decoding.
Status BinaryPrefixPageDecoder::seek_at_or_after_value(const void* value, bool* exact_match) {
    DCHECK(_parsed);
    Slice target = *reinterpret_cast<const Slice*>(value);

    // Binary search over the restart points for the first one whose leading
    // entry is >= target. When the loop ends:
    //   - lo is the index of that restart point, if there is one;
    //   - lo == _num_restarts if every restart point starts below target.
    uint32_t lo = 0;
    uint32_t hi = _num_restarts;
    while (lo < hi) {
        const uint32_t probe = lo + (hi - lo) / 2;
        // Decode the first entry of restart point `probe` into _current_value.
        RETURN_IF_ERROR(_seek_to_restart_point(probe));
        if (Slice(_current_value).compare(target) < 0) {
            lo = probe + 1;
        } else {
            hi = probe;
        }
    }

    // Scan linearly from the last restart point that starts below target, which
    // is lo - 1. When lo == 0 every restart point starts at or above target, so
    // the scan begins at the first one.
    const uint32_t scan_start = (lo == 0) ? 0 : lo - 1;
    RETURN_IF_ERROR(_seek_to_restart_point(scan_start));

    for (;;) {
        const int cmp = Slice(_current_value).compare(target);
        if (cmp >= 0) {
            *exact_match = (cmp == 0);
            return Status::OK();
        }

        ++_cur_pos;
        auto st = _read_next_value();
        if (st.is<ErrorCode::END_OF_FILE>()) {
            return Status::Error<ErrorCode::ENTRY_NOT_FOUND, false>(
                    "all value small than the value");
        }
        if (!st.ok()) {
            return st;
        }
    }
}
// Appends up to `*n` values, starting with the current one, to `dst`.
//
// On a successful return `*n` holds the number of values appended. It is set
// to 0 when nothing was requested or the decoder is already at or past the
// last value. After each append `_cur_pos` advances, and the following entry is
// decoded only while `_cur_pos` is still below `_num_values`.
// Returns the first non-OK status produced while decoding the next entry.
Status BinaryPrefixPageDecoder::next_batch(size_t* n, vectorized::MutableColumnPtr& dst) {
    DCHECK(_parsed);
    if (*n == 0 || _cur_pos >= _num_values) [[unlikely]] {
        *n = 0;
        return Status::OK();
    }

    const size_t values_left = static_cast<size_t>(_num_values - _cur_pos);
    const size_t to_fetch = std::min(*n, values_left);

    size_t fetched = 0;
    while (fetched < to_fetch) {
        // Copy the value that is currently decoded into the destination column.
        dst->insert_data(reinterpret_cast<const char*>(_current_value.data()),
                         _current_value.size());
        ++_cur_pos;
        // At the end of the page there is no further entry to decode.
        if (_cur_pos < _num_values) {
            RETURN_IF_ERROR(_read_next_value());
        }
        ++fetched;
    }

    *n = to_fetch;
    return Status::OK();
}
#include "common/compile_check_end.h"
} // namespace segment_v2
} // namespace doris