blob: cc3c2636c0fd777997234bee6f59a9cdd6530846 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "olap/rowset/segment_v2/index_file_reader.h"
#include <memory>
#include <utility>
#include "olap/rowset/segment_v2/inverted_index_compound_reader.h"
#include "olap/rowset/segment_v2/inverted_index_fs_directory.h"
#include "olap/tablet_schema.h"
#include "util/debug_points.h"
namespace doris::segment_v2 {
Status IndexFileReader::init(int32_t read_buffer_size, const io::IOContext* io_ctx) {
std::unique_lock<std::shared_mutex> lock(_mutex); // Lock for writing
if (!_inited) {
_read_buffer_size = read_buffer_size;
if (_storage_format >= InvertedIndexStorageFormatPB::V2) {
RETURN_IF_ERROR(_init_from(read_buffer_size, io_ctx));
}
_inited = true;
}
return Status::OK();
}
Status IndexFileReader::_init_from(int32_t read_buffer_size, const io::IOContext* io_ctx) {
auto index_file_full_path = InvertedIndexDescriptor::get_index_file_path_v2(_index_path_prefix);
try {
CLuceneError err;
CL_NS(store)::IndexInput* index_input = nullptr;
// 1. get file size from meta
int64_t file_size = -1;
if (_idx_file_info.has_index_size()) {
file_size = _idx_file_info.index_size();
}
file_size = file_size == 0 ? -1 : file_size;
DBUG_EXECUTE_IF("file_size_not_in_rowset_meta ", {
if (file_size == -1) {
return Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>(
"CLuceneError occur file size = -1, file is {}", index_file_full_path);
}
})
DCHECK(_fs != nullptr) << "file system is nullptr, index_file_full_path: "
<< index_file_full_path;
// 2. open file
auto ok = DorisFSDirectory::FSIndexInput::open(
_fs, index_file_full_path.c_str(), index_input, err, read_buffer_size, file_size);
if (!ok) {
if (err.number() == CL_ERR_FileNotFound) {
return Status::Error<ErrorCode::INVERTED_INDEX_FILE_NOT_FOUND>(
"inverted index file {} is not found.", index_file_full_path);
} else if (err.number() == CL_ERR_EmptyIndexSegment) {
return Status::Error<ErrorCode::INVERTED_INDEX_BYPASS>(
"inverted index file {} is empty.", index_file_full_path);
}
return Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>(
"CLuceneError occur when open idx file {}, error msg: {}", index_file_full_path,
err.what());
}
_stream = std::unique_ptr<CL_NS(store)::IndexInput>(index_input);
_stream->setIoContext(io_ctx);
_stream->setIndexFile(true);
// 3. read file
int32_t version = _stream->readInt(); // Read version number
if (version >= InvertedIndexStorageFormatPB::V2) {
DCHECK(version == _storage_format);
int32_t numIndices = _stream->readInt(); // Read number of indices
for (int32_t i = 0; i < numIndices; ++i) {
int64_t indexId = _stream->readLong(); // Read index ID
int32_t suffix_length = _stream->readInt(); // Read suffix length
std::vector<uint8_t> suffix_data(suffix_length);
_stream->readBytes(suffix_data.data(), suffix_length);
std::string suffix_str(suffix_data.begin(), suffix_data.end());
int32_t numFiles = _stream->readInt(); // Read number of files in the index
auto fileEntries = std::make_unique<EntriesType>();
fileEntries->reserve(numFiles);
for (int32_t j = 0; j < numFiles; ++j) {
int32_t file_name_length = _stream->readInt();
std::string file_name(file_name_length, '\0');
_stream->readBytes(reinterpret_cast<uint8_t*>(file_name.data()),
file_name_length);
auto entry = std::make_unique<ReaderFileEntry>();
entry->file_name = std::move(file_name);
entry->offset = _stream->readLong();
entry->length = _stream->readLong();
fileEntries->emplace(entry->file_name, std::move(entry));
}
_indices_entries.emplace(std::make_pair(indexId, std::move(suffix_str)),
std::move(fileEntries));
}
} else {
return Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>(
"unknown inverted index format {}", version);
}
} catch (CLuceneError& err) {
if (_stream != nullptr) {
try {
_stream->close();
} catch (CLuceneError& err) {
return Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>(
"CLuceneError occur when close idx file {}, error msg: {}",
index_file_full_path, err.what());
}
}
return Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>(
"CLuceneError occur when init idx file {}, error msg: {}", index_file_full_path,
err.what());
}
return Status::OK();
}
Result<InvertedIndexDirectoryMap> IndexFileReader::get_all_directories() {
InvertedIndexDirectoryMap res;
std::shared_lock<std::shared_mutex> lock(_mutex); // Lock for reading
for (auto& [index, _] : _indices_entries) {
auto&& [index_id, index_suffix] = index;
LOG(INFO) << "index_id:" << index_id << " index_suffix:" << index_suffix;
auto ret = _open(index_id, index_suffix);
if (!ret.has_value()) {
return ResultError(ret.error());
}
res.emplace(std::make_pair(index_id, index_suffix), std::move(ret.value()));
}
return res;
}
Result<std::unique_ptr<DorisCompoundReader, DirectoryDeleter>> IndexFileReader::_open(
int64_t index_id, const std::string& index_suffix, const io::IOContext* io_ctx) const {
std::unique_ptr<DorisCompoundReader, DirectoryDeleter> compound_reader;
if (_storage_format == InvertedIndexStorageFormatPB::V1) {
auto index_file_path = InvertedIndexDescriptor::get_index_file_path_v1(
_index_path_prefix, index_id, index_suffix);
try {
CLuceneError err;
CL_NS(store)::IndexInput* index_input = nullptr;
// 1. get file size from meta
int64_t file_size = -1;
if (_idx_file_info.index_info_size() > 0) {
for (const auto& idx_info : _idx_file_info.index_info()) {
if (index_id == idx_info.index_id() &&
index_suffix == idx_info.index_suffix()) {
file_size = idx_info.index_file_size();
break;
}
}
}
file_size = file_size == 0 ? -1 : file_size;
DBUG_EXECUTE_IF("file_size_not_in_rowset_meta ", {
if (file_size == -1) {
return ResultError(Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>(
"CLuceneError occur file size = -1, file is {}", index_file_path));
}
})
DCHECK(_fs != nullptr)
<< "file system is nullptr, index_file_path: " << index_file_path;
// 2. open file
auto ok = DorisFSDirectory::FSIndexInput::open(
_fs, index_file_path.c_str(), index_input, err, _read_buffer_size, file_size);
if (!ok) {
// now index_input = nullptr
if (err.number() == CL_ERR_FileNotFound) {
return ResultError(Status::Error<ErrorCode::INVERTED_INDEX_FILE_NOT_FOUND>(
"inverted index file {} is not found.", index_file_path));
}
return ResultError(Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>(
"CLuceneError occur when open idx file {}, error msg: {}", index_file_path,
err.what()));
}
// 3. read file in DorisCompoundReader
compound_reader.reset(new DorisCompoundReader(index_input, _read_buffer_size));
} catch (CLuceneError& err) {
return ResultError(Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>(
"CLuceneError occur when open idx file {}, error msg: {}", index_file_path,
err.what()));
}
} else {
std::shared_lock<std::shared_mutex> lock(_mutex); // Lock for reading
if (_stream == nullptr) {
return ResultError(Status::Error<ErrorCode::INVERTED_INDEX_FILE_NOT_FOUND>(
"CLuceneError occur when open idx file {}, stream is nullptr",
InvertedIndexDescriptor::get_index_file_path_v2(_index_path_prefix)));
}
// Check if the specified index exists
auto index_it = _indices_entries.find(std::make_pair(index_id, index_suffix));
if (index_it == _indices_entries.end()) {
std::ostringstream errMsg;
errMsg << "No index with id " << index_id << " found";
return ResultError(Status::Error<ErrorCode::INVERTED_INDEX_FILE_NOT_FOUND>(
"CLuceneError occur when open idx file {}, error msg: {}",
InvertedIndexDescriptor::get_index_file_path_v2(_index_path_prefix),
errMsg.str()));
}
// Need to clone resource here, because index searcher cache need it.
compound_reader.reset(new DorisCompoundReader(_stream->clone(), *index_it->second,
_read_buffer_size, io_ctx));
}
return compound_reader;
}
Result<std::unique_ptr<DorisCompoundReader, DirectoryDeleter>> IndexFileReader::open(
const TabletIndex* index_meta, const io::IOContext* io_ctx) const {
auto index_id = index_meta->index_id();
auto index_suffix = index_meta->get_index_suffix();
return _open(index_id, index_suffix, io_ctx);
}
std::string IndexFileReader::get_index_file_cache_key(const TabletIndex* index_meta) const {
return InvertedIndexDescriptor::get_index_file_cache_key(
_index_path_prefix, index_meta->index_id(), index_meta->get_index_suffix());
}
std::string IndexFileReader::get_index_file_path(const TabletIndex* index_meta) const {
if (_storage_format == InvertedIndexStorageFormatPB::V1) {
return InvertedIndexDescriptor::get_index_file_path_v1(
_index_path_prefix, index_meta->index_id(), index_meta->get_index_suffix());
}
return InvertedIndexDescriptor::get_index_file_path_v2(_index_path_prefix);
}
Status IndexFileReader::index_file_exist(const TabletIndex* index_meta, bool* res) const {
if (_storage_format == InvertedIndexStorageFormatPB::V1) {
auto index_file_path = InvertedIndexDescriptor::get_index_file_path_v1(
_index_path_prefix, index_meta->index_id(), index_meta->get_index_suffix());
return _fs->exists(index_file_path, res);
} else {
std::shared_lock<std::shared_mutex> lock(_mutex); // Lock for reading
if (_stream == nullptr) {
*res = false;
return Status::Error<ErrorCode::INVERTED_INDEX_FILE_NOT_FOUND>(
"idx file {} is not opened",
InvertedIndexDescriptor::get_index_file_path_v2(_index_path_prefix));
}
// Check if the specified index exists
auto index_it = _indices_entries.find(
std::make_pair(index_meta->index_id(), index_meta->get_index_suffix()));
if (index_it == _indices_entries.end()) {
*res = false;
} else {
*res = true;
}
}
return Status::OK();
}
Status IndexFileReader::has_null(const TabletIndex* index_meta, bool* res) const {
if (_storage_format == InvertedIndexStorageFormatPB::V1) {
*res = true;
return Status::OK();
}
std::shared_lock<std::shared_mutex> lock(_mutex); // Lock for reading
if (_stream == nullptr) {
return Status::Error<ErrorCode::INVERTED_INDEX_FILE_NOT_FOUND>(
"idx file {} is not opened",
InvertedIndexDescriptor::get_index_file_path_v2(_index_path_prefix));
}
// Check if the specified index exists
auto index_it = _indices_entries.find(
std::make_pair(index_meta->index_id(), index_meta->get_index_suffix()));
if (index_it == _indices_entries.end()) {
*res = false;
} else {
const auto& entries = index_it->second;
auto entry_it =
entries->find(InvertedIndexDescriptor::get_temporary_null_bitmap_file_name());
if (entry_it == entries->end()) {
*res = false;
return Status::OK();
}
const auto& e = entry_it->second;
// roaring bitmap cookie header size is 5
if (e->length <= 5) {
*res = false;
} else {
*res = true;
}
}
return Status::OK();
}
void IndexFileReader::debug_file_entries() {
std::shared_lock<std::shared_mutex> lock(_mutex); // Lock for reading
for (const auto& index : _indices_entries) {
LOG(INFO) << "index_id:" << index.first.first;
const auto& index_entries = index.second;
for (const auto& entry : *index_entries) {
const auto& file_entry = entry.second;
LOG(INFO) << "file entry name:" << file_entry->file_name
<< " length:" << file_entry->length << " offset:" << file_entry->offset;
}
}
}
} // namespace doris::segment_v2