blob: 955ee9a8d0c9c55f16506367228d2dda12274974 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "olap/rowset/segment_v2/page_io.h"
#include <crc32c/crc32c.h>
#include <gen_cpp/segment_v2.pb.h>
#include <stdint.h>
#include <algorithm>
#include <cstring>
#include <memory>
#include <ostream>
#include <string>
#include <utility>
#include "cloud/config.h"
#include "common/logging.h"
#include "cpp/sync_point.h"
#include "io/cache/block_file_cache.h"
#include "io/cache/block_file_cache_factory.h"
#include "io/cache/cached_remote_file_reader.h"
#include "io/fs/file_reader.h"
#include "io/fs/file_writer.h"
#include "olap/olap_common.h"
#include "olap/page_cache.h"
#include "olap/rowset/segment_v2/encoding_info.h"
#include "olap/rowset/segment_v2/page_handle.h"
#include "util/block_compression.h"
#include "util/coding.h"
#include "util/concurrency_stats.h"
#include "util/faststring.h"
#include "util/runtime_profile.h"
namespace doris {
namespace segment_v2 {
#include "common/compile_check_begin.h"
Status PageIO::compress_page_body(BlockCompressionCodec* codec, double min_space_saving,
const std::vector<Slice>& body, OwnedSlice* compressed_body) {
size_t uncompressed_size = Slice::compute_total_size(body);
if (codec != nullptr && !codec->exceed_max_compress_len(uncompressed_size)) {
faststring buf;
RETURN_IF_ERROR_OR_CATCH_EXCEPTION(codec->compress(body, uncompressed_size, &buf));
double space_saving =
1.0 - (cast_set<double>(buf.size()) / cast_set<double>(uncompressed_size));
// return compressed body only when it saves more than min_space_saving
if (space_saving > 0 && space_saving >= min_space_saving) {
// shrink the buf to fit the len size to avoid taking
// up the memory of the size MAX_COMPRESSED_SIZE
RETURN_IF_CATCH_EXCEPTION(*compressed_body = buf.build());
return Status::OK();
}
}
// otherwise, do not compress
OwnedSlice empty;
*compressed_body = std::move(empty);
return Status::OK();
}
Status PageIO::write_page(io::FileWriter* writer, const std::vector<Slice>& body,
const PageFooterPB& footer, PagePointer* result) {
// sanity check of page footer
CHECK(footer.has_type()) << "type must be set";
CHECK(footer.has_uncompressed_size()) << "uncompressed_size must be set";
switch (footer.type()) {
case DATA_PAGE:
CHECK(footer.has_data_page_footer());
break;
case INDEX_PAGE:
CHECK(footer.has_index_page_footer());
break;
case DICTIONARY_PAGE:
CHECK(footer.has_dict_page_footer());
break;
case SHORT_KEY_PAGE:
CHECK(footer.has_short_key_page_footer());
break;
default:
CHECK(false) << "Invalid page footer type: " << footer.type();
break;
}
std::string footer_buf; // serialized footer + footer size
footer.SerializeToString(&footer_buf);
put_fixed32_le(&footer_buf, static_cast<uint32_t>(footer_buf.size()));
std::vector<Slice> page = body;
page.emplace_back(footer_buf);
// checksum
uint8_t checksum_buf[sizeof(uint32_t)];
uint32_t checksum = 0;
for (const auto& slice : page) {
checksum = crc32c::Extend(checksum, (const uint8_t*)slice.data, slice.size);
}
encode_fixed32_le(checksum_buf, checksum);
page.emplace_back(checksum_buf, sizeof(uint32_t));
uint64_t offset = writer->bytes_appended();
RETURN_IF_ERROR(writer->appendv(&page[0], page.size()));
result->offset = offset;
result->size = cast_set<uint32_t>(writer->bytes_appended() - offset);
return Status::OK();
}
io::UInt128Wrapper file_cache_key_from_path(const std::string& seg_path) {
std::string base = seg_path.substr(seg_path.rfind('/') + 1); // tricky: npos + 1 == 0
return io::BlockFileCache::hash(base);
}
std::string file_cache_key_str(const std::string& seg_path) {
return file_cache_key_from_path(seg_path).to_string();
}
Status PageIO::read_and_decompress_page_(const PageReadOptions& opts, PageHandle* handle,
Slice* body, PageFooterPB* footer) {
opts.sanity_check();
opts.stats->total_pages_num++;
auto cache = StoragePageCache::instance();
PageCacheHandle cache_handle;
StoragePageCache::CacheKey cache_key(opts.file_reader->path().native(),
opts.file_reader->size(), opts.page_pointer.offset);
VLOG_DEBUG << fmt::format("Reading page {}:{}:{}", cache_key.fname, cache_key.fsize,
cache_key.offset);
if (opts.use_page_cache && cache && cache->lookup(cache_key, &cache_handle, opts.type)) {
// we find page in cache, use it
*handle = PageHandle(std::move(cache_handle));
opts.stats->cached_pages_num++;
// parse body and footer
Slice page_slice = handle->data();
uint32_t footer_size = decode_fixed32_le((uint8_t*)page_slice.data + page_slice.size - 4);
std::string footer_buf(page_slice.data + page_slice.size - 4 - footer_size, footer_size);
if (!footer->ParseFromString(footer_buf)) {
return Status::Corruption("Bad page: invalid footer, footer_size={}, file={}",
footer_size, opts.file_reader->path().native());
}
*body = Slice(page_slice.data, page_slice.size - 4 - footer_size);
// If read from cache, then should also recorded in uncompressed bytes read counter.
opts.stats->uncompressed_bytes_read += body->size;
return Status::OK();
}
// every page contains 4 bytes footer length and 4 bytes checksum
const uint32_t page_size = opts.page_pointer.size;
if (page_size < 8) {
return Status::Corruption("Bad page: too small size ({}), file={}", page_size,
opts.file_reader->path().native());
}
// hold compressed page at first, reset to decompressed page later
std::unique_ptr<DataPage> page =
std::make_unique<DataPage>(page_size, opts.use_page_cache, opts.type);
Slice page_slice(page->data(), page_size);
{
SCOPED_RAW_TIMER(&opts.stats->io_ns);
size_t bytes_read = 0;
RETURN_IF_ERROR(opts.file_reader->read_at(opts.page_pointer.offset, page_slice, &bytes_read,
&opts.io_ctx));
DCHECK_EQ(bytes_read, page_size);
opts.stats->compressed_bytes_read += page_size;
}
if (opts.verify_checksum) {
uint32_t expect = decode_fixed32_le((uint8_t*)page_slice.data + page_slice.size - 4);
uint32_t actual = crc32c::Crc32c(page_slice.data, page_slice.size - 4);
// here const_cast is used for testing.
InjectionContext ctx = {&actual, const_cast<PageReadOptions*>(&opts)};
(void)ctx;
TEST_INJECTION_POINT_CALLBACK("PageIO::read_and_decompress_page:crc_failure_inj", &ctx);
if (expect != actual) {
return Status::Corruption(
"Bad page: checksum mismatch (actual={} vs expect={}), file={}", actual, expect,
opts.file_reader->path().native());
}
}
// remove checksum suffix
page_slice.size -= 4;
// parse and set footer
uint32_t footer_size = decode_fixed32_le((uint8_t*)page_slice.data + page_slice.size - 4);
if (!footer->ParseFromArray(page_slice.data + page_slice.size - 4 - footer_size, footer_size)) {
return Status::Corruption("Bad page: invalid footer, footer_size={}, file={}", footer_size,
opts.file_reader->path().native());
}
auto body_size = cast_set<uint32_t>(page_slice.size - 4 - footer_size);
if (body_size != footer->uncompressed_size()) { // need decompress body
if (opts.codec == nullptr) {
return Status::Corruption(
"Bad page: page is compressed but codec is NO_COMPRESSION, file={}",
opts.file_reader->path().native());
}
SCOPED_CONCURRENCY_COUNT(ConcurrencyStatsManager::instance().page_io_decompress);
SCOPED_RAW_TIMER(&opts.stats->decompress_ns);
std::unique_ptr<DataPage> decompressed_page = std::make_unique<DataPage>(
footer->uncompressed_size() + footer_size + 4, opts.use_page_cache, opts.type);
// decompress page body
Slice compressed_body(page_slice.data, body_size);
Slice decompressed_body(decompressed_page->data(), footer->uncompressed_size());
RETURN_IF_ERROR(opts.codec->decompress(compressed_body, &decompressed_body));
if (decompressed_body.size != footer->uncompressed_size()) {
return Status::Corruption(
"Bad page: record uncompressed size={} vs real decompressed size={}, file={}",
footer->uncompressed_size(), decompressed_body.size,
opts.file_reader->path().native());
}
// append footer and footer size
memcpy(decompressed_body.data + decompressed_body.size, page_slice.data + body_size,
footer_size + 4);
// free memory of compressed page
page = std::move(decompressed_page);
page_slice = Slice(page->data(), footer->uncompressed_size() + footer_size + 4);
}
if (opts.pre_decode) {
const auto* encoding_info = opts.encoding_info;
if (opts.is_dict_page) {
// for dict page, we need to use encoding_info based on footer->dict_page_footer().encoding()
// to get its pre_decoder
RETURN_IF_ERROR(EncodingInfo::get(FieldType::OLAP_FIELD_TYPE_VARCHAR,
footer->dict_page_footer().encoding(), {},
&encoding_info));
}
if (encoding_info) {
auto* pre_decoder = encoding_info->get_data_page_pre_decoder();
if (pre_decoder) {
SCOPED_CONCURRENCY_COUNT(ConcurrencyStatsManager::instance().page_io_pre_decode);
RETURN_IF_ERROR(pre_decoder->decode(
&page, &page_slice,
footer->data_page_footer().nullmap_size() + footer_size + 4,
opts.use_page_cache, opts.type, opts.file_reader->path().native()));
}
}
}
*body = Slice(page_slice.data, page_slice.size - 4 - footer_size);
page->reset_size(page_slice.size);
// Uncompressed has 2 meanings: uncompress and decode. The buffer in pagecache maybe
// uncompressed or decoded. So that should update the uncompressed_bytes_read counter
// just before add it to pagecache, it will be consistency with reading data from page cache.
opts.stats->uncompressed_bytes_read += body->size;
if (opts.use_page_cache && cache) {
SCOPED_CONCURRENCY_COUNT(ConcurrencyStatsManager::instance().page_io_insert_page_cache);
// insert this page into cache and return the cache handle
cache->insert(cache_key, page.get(), &cache_handle, opts.type, opts.kept_in_memory);
*handle = PageHandle(std::move(cache_handle));
} else {
*handle = PageHandle(page.get());
}
page.release(); // memory now managed by handle
return Status::OK();
}
Status PageIO::read_and_decompress_page(const PageReadOptions& opts, PageHandle* handle,
Slice* body, PageFooterPB* footer) {
// First try to read with file cache
Status st = do_read_and_decompress_page(opts, handle, body, footer);
if (!st.is<ErrorCode::CORRUPTION>() || !config::is_cloud_mode()) {
return st;
}
auto* cached_file_reader = dynamic_cast<io::CachedRemoteFileReader*>(opts.file_reader);
if (cached_file_reader == nullptr) {
return st;
}
// If we get CORRUPTION error and using file cache, clear cache and retry
LOG(WARNING) << "Bad page may be read from file cache, need retry."
<< " error msg: " << st.msg()
<< " file path: " << opts.file_reader->path().native()
<< " offset: " << opts.page_pointer.offset;
// Remove cache if exists
const std::string path = opts.file_reader->path().string();
auto file_key = file_cache_key_from_path(path);
auto* file_cache = io::FileCacheFactory::instance()->get_by_path(file_key);
if (file_cache) {
file_cache->remove_if_cached(file_key);
}
// Retry with file cache
st = do_read_and_decompress_page(opts, handle, body, footer);
if (!st.is<ErrorCode::CORRUPTION>()) {
return st;
}
LOG(WARNING) << "Corruption again with retry downloading cache,"
<< " error msg: " << st.msg()
<< " file path: " << opts.file_reader->path().native()
<< " offset: " << opts.page_pointer.offset;
PageReadOptions new_opts = opts;
new_opts.file_reader = cached_file_reader->get_remote_reader();
st = do_read_and_decompress_page(new_opts, handle, body, footer);
if (!st.ok()) {
LOG(WARNING) << "Corruption again with retry read directly from remote,"
<< " error msg: " << st.msg()
<< " file path: " << opts.file_reader->path().native()
<< " offset: " << opts.page_pointer.offset << " Give up.";
}
return st;
}
#include "common/compile_check_end.h"
} // namespace segment_v2
} // namespace doris