blob: bfe9a69ac9f5e7ca6d8feb1fba6874a35702e13b [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <fmt/format.h>
#include <stddef.h>
#include <atomic>
#include <condition_variable>
#include <list>
#include <memory>
#include <mutex>
#include <string>
#include <utility>
#include "common/status.h"
#include "io/cache/file_cache_common.h"
#include "util/slice.h"
namespace doris {
namespace io {
struct FileBlocksHolder;
class BlockFileCache;
struct FileBlockCell;
class FileBlock {
friend struct FileBlocksHolder;
friend class BlockFileCache;
friend class CachedRemoteFileReader;
friend struct FileBlockCell;
public:
enum class State {
DOWNLOADED,
/**
* When file block is first created and returned to user, it has state EMPTY.
* EMPTY state can become DOWNLOADING when getOrSetDownaloder is called successfully
* by any owner of EMPTY state file block.
*/
EMPTY,
/**
* A newly created file block never has DOWNLOADING state until call to getOrSetDownloader
* because each cache user might acquire multiple file blocks and reads them one by one,
* so only user which actually needs to read this block earlier than others - becomes a downloader.
*/
DOWNLOADING,
SKIP_CACHE,
};
FileBlock(const FileCacheKey& key, size_t size, BlockFileCache* mgr, State download_state);
~FileBlock() = default;
State state() const;
State state_unsafe() const;
static std::string state_to_string(FileBlock::State state);
/// Represents an interval [left, right] including both boundaries.
struct Range {
size_t left;
size_t right;
Range(size_t left, size_t right) : left(left), right(right) {}
[[nodiscard]] size_t size() const { return right - left + 1; }
[[nodiscard]] std::string to_string() const {
return fmt::format("[{}, {}]", std::to_string(left), std::to_string(right));
}
};
const Range& range() const { return _block_range; }
const UInt128Wrapper& get_hash_value() const { return _key.hash; }
size_t offset() const { return range().left; }
State wait();
// append data to cache file
[[nodiscard]] Status append(Slice data);
// read data from cache file
[[nodiscard]] Status read(Slice buffer, size_t read_offset);
// finish write, release the file writer
[[nodiscard]] Status finalize();
// set downloader if state == EMPTY
uint64_t get_or_set_downloader();
uint64_t get_downloader() const;
void reset_downloader(std::lock_guard<std::mutex>& block_lock);
bool is_downloader() const;
FileCacheType cache_type() const { return _key.meta.type; }
int64_t tablet_id() const { return _key.meta.tablet_id; }
void set_tablet_id(int64_t id) { _key.meta.tablet_id = id; }
static uint64_t get_caller_id();
std::string get_info_for_log() const;
[[nodiscard]] Status change_cache_type(FileCacheType new_type);
[[nodiscard]] Status change_cache_type_lock(FileCacheType new_type,
std::lock_guard<std::mutex>&);
uint64_t expiration_time() const { return _key.meta.expiration_time; }
std::string get_cache_file() const;
State state_unlock(std::lock_guard<std::mutex>&) const;
FileBlock& operator=(const FileBlock&) = delete;
FileBlock(const FileBlock&) = delete;
// block is being using by other thread when deleting, so tag it is_deleting and delete later on¬
void set_deleting() { _is_deleting = true; }
bool is_deleting() const { return _is_deleting; };
public:
std::atomic<bool> _owned_by_cached_reader {
false}; // pocessed by CachedRemoteFileReader::_cache_file_readers
private:
std::string get_info_for_log_impl(std::lock_guard<std::mutex>& block_lock) const;
[[nodiscard]] Status set_downloaded(std::lock_guard<std::mutex>& block_lock);
bool is_downloader_impl(std::lock_guard<std::mutex>& block_lock) const;
void complete_unlocked(std::lock_guard<std::mutex>& block_lock);
void reset_downloader_impl(std::lock_guard<std::mutex>& block_lock);
Range _block_range;
State _download_state;
uint64_t _downloader_id {0};
BlockFileCache* _mgr;
/// global locking order rule:
/// 1. cache lock
/// 2. block lock
mutable std::mutex _mutex;
std::condition_variable _cv;
FileCacheKey _key;
size_t _downloaded_size {0};
bool _is_deleting {false};
FileBlockCell* cell;
};
extern std::ostream& operator<<(std::ostream& os, const FileBlock::State& value);
using FileBlockSPtr = std::shared_ptr<FileBlock>;
using FileBlocks = std::list<FileBlockSPtr>;
struct FileBlocksHolder {
explicit FileBlocksHolder(FileBlocks file_blocks) : file_blocks(std::move(file_blocks)) {}
FileBlocksHolder(FileBlocksHolder&& other) noexcept
: file_blocks(std::move(other.file_blocks)) {}
FileBlocksHolder& operator=(const FileBlocksHolder&) = delete;
FileBlocksHolder(const FileBlocksHolder&) = delete;
~FileBlocksHolder();
FileBlocks file_blocks;
};
using FileBlocksHolderPtr = std::unique_ptr<FileBlocksHolder>;
} // namespace io
} // namespace doris