blob: f337d790af96d475e67d757ff722375dcd8ba6fa [file] [log] [blame]
/*
* Copyright 2026-present Alibaba Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "paimon/common/sst/sst_file_reader.h"
#include "paimon/common/sst/sst_file_utils.h"
#include "paimon/common/utils/crc32c.h"
#include "paimon/common/utils/murmurhash_utils.h"
namespace paimon {
// Opens `file_path` through `fs` and builds an SstFileReader for it.
//
// Steps performed, in order:
//   1. read the fixed-size footer located at the very end of the file;
//   2. if the footer's bloom filter handle is not all zeros, load the bloom filter block;
//   3. read the index block (and the trailer stored right after it), verify/decompress it
//      and wrap it in a BlockReader that uses `comparator`.
//
// @param pool        memory pool handed to the block cache, decompression and the reader.
// @param fs          file system used to open `file_path`.
// @param file_path   path of the SST file.
// @param comparator  key comparator forwarded to the index BlockReader and stored in the reader.
// @return the reader, or an error Status if the file cannot be opened, is too short to hold
//         a footer, a required block cannot be read, or the index block fails verification.
Result<std::shared_ptr<SstFileReader>> SstFileReader::Create(
    const std::shared_ptr<MemoryPool>& pool, const std::shared_ptr<paimon::FileSystem>& fs,
    const std::string& file_path,
    std::function<int32_t(const std::shared_ptr<MemorySlice>&, const std::shared_ptr<MemorySlice>&)>
        comparator) {
    PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<InputStream> in, fs->Open(file_path));
    PAIMON_ASSIGN_OR_RAISE(uint64_t file_len, in->Length());
    // `file_len` is unsigned: without this guard the footer offset computed below would wrap
    // around to a huge value for a truncated or empty file.
    if (file_len < static_cast<uint64_t>(BlockFooter::ENCODED_LENGTH)) {
        return Status::Invalid("File " + file_path + " is too short to contain an SST footer");
    }
    auto block_cache =
        std::make_shared<BlockCache>(file_path, in, pool, std::make_unique<CacheManager>());

    // read footer
    auto segment = block_cache->GetBlock(file_len - BlockFooter::ENCODED_LENGTH,
                                         BlockFooter::ENCODED_LENGTH, true);
    if (!segment.get()) {
        return Status::Invalid("Read footer error");
    }
    auto slice = MemorySlice::Wrap(segment);
    auto input = slice->ToInput();
    PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<paimon::BlockFooter> footer,
                           BlockFooter::ReadBlockFooter(input));

    // read bloom filter directly now; an all-zero handle means the file carries no bloom filter
    auto bloom_filter_handle = footer->GetBloomFilterHandle();
    std::shared_ptr<paimon::BloomFilter> bloom_filter = nullptr;
    if (bloom_filter_handle->ExpectedEntries() || bloom_filter_handle->Size() ||
        bloom_filter_handle->Offset()) {
        bloom_filter = std::make_shared<BloomFilter>(bloom_filter_handle->ExpectedEntries(),
                                                     bloom_filter_handle->Size());
        PAIMON_RETURN_NOT_OK(bloom_filter->SetMemorySegment(block_cache->GetBlock(
            bloom_filter_handle->Offset(), bloom_filter_handle->Size(), true)));
    }

    // create index block reader; the trailer is stored immediately after the block payload
    auto index_block_handle = footer->GetIndexBlockHandle();
    auto trailer_data =
        block_cache->GetBlock(index_block_handle->Offset() + index_block_handle->Size(),
                              BlockTrailer::ENCODED_LENGTH, true);
    if (!trailer_data) {
        return Status::Invalid("Read index block trailer error");
    }
    auto trailer_input = MemorySlice::Wrap(trailer_data)->ToInput();
    auto trailer = BlockTrailer::ReadBlockTrailer(trailer_input);
    auto block_data =
        block_cache->GetBlock(index_block_handle->Offset(), index_block_handle->Size(), true);
    if (!block_data) {
        return Status::Invalid("Read index block error");
    }
    PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<paimon::MemorySegment> uncompressed_data,
                           DecompressBlock(block_data, trailer, pool));
    auto reader = BlockReader::Create(MemorySlice::Wrap(uncompressed_data), comparator);
    // Last use of the by-value comparator: move it into the reader instead of copying again.
    return std::make_shared<SstFileReader>(pool, block_cache, bloom_filter, reader,
                                           std::move(comparator));
}
// Builds a reader from already-prepared collaborators; the constructor itself only stores
// its arguments and performs no reads.
//
// pool, block_cache, bloom_filter and index_block_reader are copied into the corresponding
// members; comparator is taken by value and moved into comparator_.
// bloom_filter may be null: Create() passes nullptr when the footer's bloom filter handle is
// all zeros, and Lookup() skips the filter test in that case.
SstFileReader::SstFileReader(
const std::shared_ptr<MemoryPool>& pool, const std::shared_ptr<BlockCache>& block_cache,
const std::shared_ptr<BloomFilter>& bloom_filter,
const std::shared_ptr<paimon::BlockReader>& index_block_reader,
std::function<int32_t(const std::shared_ptr<MemorySlice>&, const std::shared_ptr<MemorySlice>&)>
comparator)
: pool_(pool),
block_cache_(block_cache),
bloom_filter_(bloom_filter),
index_block_reader_(index_block_reader),
comparator_(std::move(comparator)) {}
// Returns a new SstFileIterator bound to this reader, driven by a fresh iterator over the
// index block.
std::unique_ptr<SstFileIterator> SstFileReader::CreateIterator() {
    auto index_cursor = index_block_reader_->Iterator();
    return std::make_unique<SstFileIterator>(this, std::move(index_cursor));
}
std::shared_ptr<Bytes> SstFileReader::Lookup(std::shared_ptr<Bytes> key) {
if (bloom_filter_.get() && !bloom_filter_->TestHash(MurmurHashUtils::HashBytes(key))) {
return nullptr;
}
auto key_slice = MemorySlice::Wrap(key);
// seek the index to the block containing the key
auto index_block_iterator = index_block_reader_->Iterator();
index_block_iterator->SeekTo(key_slice);
// if indexIterator does not have a next, it means the key does not exist in this iterator
if (index_block_iterator->HasNext()) {
// seek the current iterator to the key
auto current = GetNextBlock(index_block_iterator);
if (!current.ok() || !current.value()) {
return nullptr;
}
auto& it = current.value();
if (it->SeekTo(key_slice)) {
auto ret = it->Next();
if (!ret.ok()) {
return nullptr;
}
return ret.value()->value_->CopyBytes(pool_.get());
}
}
return nullptr;
}
// Advances `index_iterator` by one entry, decodes that entry's value as a block handle,
// reads the referenced block as a data block (index == false) and returns an iterator over
// it. Any failure from the index iterator or from ReadBlock is propagated.
Result<std::unique_ptr<BlockIterator>> SstFileReader::GetNextBlock(
    std::unique_ptr<BlockIterator>& index_iterator) {
    PAIMON_ASSIGN_OR_RAISE(auto index_entry, index_iterator->Next());
    // The value of an index entry is the serialized handle of a data block.
    auto handle_input = index_entry->value_->ToInput();
    PAIMON_ASSIGN_OR_RAISE(auto data_block,
                           ReadBlock(BlockHandle::ReadBlockHandle(handle_input), false));
    return data_block->Iterator();
}
// Convenience overload for temporaries: forwards to the const-reference overload.
// `handle` is a named rvalue reference and therefore an lvalue here, so the call below
// resolves to ReadBlock(const std::shared_ptr<BlockHandle>&, bool) and does not recurse.
Result<std::shared_ptr<BlockReader>> SstFileReader::ReadBlock(std::shared_ptr<BlockHandle>&& handle,
                                                              bool index) {
    const std::shared_ptr<BlockHandle>& borrowed_handle = handle;
    return ReadBlock(borrowed_handle, index);
}
// Reads the block described by `handle`, verifies and decompresses it, and wraps the result
// in a BlockReader that uses this reader's comparator.
//
// @param handle  offset/size of the block payload inside the file.
// @param index   forwarded as the last argument of BlockCache::GetBlock for the payload read.
// @return the block reader, or an error Status if the trailer or payload cannot be read or
//         the block fails checksum verification / decompression.
Result<std::shared_ptr<BlockReader>> SstFileReader::ReadBlock(
    const std::shared_ptr<BlockHandle>& handle, bool index) {
    // The trailer is stored immediately after the block payload.
    auto trailer_data = block_cache_->GetBlock(handle->Offset() + handle->Size(),
                                               BlockTrailer::ENCODED_LENGTH, true);
    // GetBlock can yield a null segment (Create() checks for this when reading the footer);
    // report it instead of dereferencing a null pointer.
    if (!trailer_data) {
        return Status::Invalid("Read block trailer error");
    }
    auto trailer_input = MemorySlice::Wrap(trailer_data)->ToInput();
    auto trailer = BlockTrailer::ReadBlockTrailer(trailer_input);

    auto block_data = block_cache_->GetBlock(handle->Offset(), handle->Size(), index);
    if (!block_data) {
        return Status::Invalid("Read block error");
    }
    PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<paimon::MemorySegment> uncompressed_data,
                           DecompressBlock(block_data, trailer, pool_));
    return BlockReader::Create(MemorySlice::Wrap(uncompressed_data), comparator_);
}
// Verifies the checksum of a raw block and returns its uncompressed content.
//
// The CRC32C is computed over the block bytes followed by one byte holding the trailer's
// compression type, and compared with the value recorded in the trailer; a mismatch yields
// an IOError. If no compression factory is available or the type is NONE, the input segment
// is returned unchanged. Otherwise the leading var-length int of the block is used as the
// size of the output buffer, the rest is decompressed into it, and a mismatch between the
// produced length and that size yields Status::Invalid.
Result<std::shared_ptr<paimon::MemorySegment>> SstFileReader::DecompressBlock(
    const std::shared_ptr<paimon::MemorySegment>& compressed_data,
    const std::unique_ptr<BlockTrailer>& trailer, const std::shared_ptr<MemoryPool>& pool) {
    auto raw_bytes = compressed_data->GetHeapMemory();

    // checksum = crc32c(block bytes) extended with the compression-type byte
    auto actual_crc = CRC32C::calculate(raw_bytes->data(), raw_bytes->size());
    auto type_byte = static_cast<char>(static_cast<int32_t>(trailer->CompressionType()) & 0xFF);
    actual_crc = CRC32C::calculate(&type_byte, 1, actual_crc);
    if (trailer->Crc32c() != static_cast<int32_t>(actual_crc)) {
        auto expected_hex = SstFileUtils::ToHexString(trailer->Crc32c());
        auto actual_hex = SstFileUtils::ToHexString(actual_crc);
        return Status::IOError("Expected crc32c(" + expected_hex + ") but found crc32c(" +
                               actual_hex + ")");
    }

    // pick the codec named by the trailer
    PAIMON_ASSIGN_OR_RAISE(auto factory, BlockCompressionFactory::Create(
                                             SstFileUtils::From(trailer->CompressionType())));
    const bool stored_uncompressed =
        !factory || factory->GetCompressionType() == BlockCompressionType::NONE;
    if (stored_uncompressed) {
        return compressed_data;
    }

    auto decompressor = factory->GetDecompressor();
    auto block_input = MemorySlice::Wrap(compressed_data)->ToInput();
    // The var-length int at the start of the block sizes the output buffer; reading it also
    // advances block_input past the prefix, so Position()/Available() describe the payload.
    auto decoded = MemorySegment::AllocateHeapMemory(block_input->ReadVarLenInt(), pool.get());
    auto decoded_bytes = decoded.GetHeapMemory();
    PAIMON_ASSIGN_OR_RAISE(
        int produced_length,
        decompressor->Decompress(raw_bytes->data() + block_input->Position(),
                                 block_input->Available(), decoded_bytes->data(),
                                 decoded_bytes->size()));
    if (static_cast<size_t>(produced_length) != decoded_bytes->size()) {
        return Status::Invalid("Invalid data");
    }
    return std::make_shared<MemorySegment>(decoded);
}
// Creates an iterator over the file served by `reader`.
// `reader` is stored as a raw pointer and later used by SeekTo() to load data blocks;
// NOTE(review): presumably the reader must outlive this iterator — confirm with callers.
// `index_iterator` is moved into the iterator and used to walk the index block.
SstFileIterator::SstFileIterator(SstFileReader* reader,
std::unique_ptr<BlockIterator> index_iterator)
: reader_(reader), index_iterator_(std::move(index_iterator)) {}
// Positions the iterator at `key`.
//
// The index iterator is sought first. If it has no remaining entry, the current data
// iterator is dropped and OK is returned. Otherwise the data block referenced by the next
// index entry is loaded (errors are propagated) and its iterator is sought to `key`.
Status SstFileIterator::SeekTo(std::shared_ptr<Bytes>& key) {
    auto target = MemorySlice::Wrap(key);
    index_iterator_->SeekTo(target);

    if (!index_iterator_->HasNext()) {
        // Nothing at or after the target in the index: there is no data block to read.
        data_iterator_.reset();
        return Status::OK();
    }

    PAIMON_ASSIGN_OR_RAISE(data_iterator_, reader_->GetNextBlock(index_iterator_));
    // An index entry's key is the last key of its data block. Since some index key is
    // >= target, that data block must also hold a key >= target, so the data iterator
    // is guaranteed to have a next element after this seek.
    data_iterator_->SeekTo(target);
    return Status::OK();
}
} // namespace paimon