blob: 2f825a2748ce65154ddbdb6ae26613b8cbc2d272 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "olap/fs/file_block_manager.h"
#include <atomic>
#include <cstddef>
#include <memory>
#include <numeric>
#include <string>
#include <utility>
#include "common/logging.h"
#include "common/config.h"
#include "env/env.h"
#include "env/env_util.h"
#include "gutil/strings/substitute.h"
#include "olap/fs/block_id.h"
#include "olap/fs/block_manager_metrics.h"
#include "runtime/mem_tracker.h"
#include "util/doris_metrics.h"
#include "util/file_cache.h"
#include "util/metrics.h"
#include "util/path_util.h"
#include "util/slice.h"
using std::accumulate;
using std::shared_ptr;
using std::string;
using std::unique_ptr;
using strings::Substitute;
namespace doris {
namespace fs {
namespace internal {
////////////////////////////////////////////////////////////
// FileWritableBlock
////////////////////////////////////////////////////////////
// A file-backed block that has been opened for writing.
//
// Holds a pointer to the block manager as well as the file path so that
// dirty metadata (the parent directory of the file) can be synced via
// FileBlockManager::_sync_metadata() at close() time. Embedding a file path
// (rather than a simpler BlockId) consumes more memory, but the number of
// outstanding FileWritableBlock instances is expected to be low.
class FileWritableBlock : public WritableBlock {
public:
FileWritableBlock(FileBlockManager* block_manager,
string path,
shared_ptr<WritableFile> writer);
virtual ~FileWritableBlock();
virtual Status close() override;
virtual Status abort() override;
virtual BlockManager* block_manager() const override;
virtual const BlockId& id() const override;
virtual const std::string& path() const override;
virtual Status append(const Slice& data) override;
virtual Status appendv(const Slice* data, size_t data_cnt) override;
virtual Status finalize() override;
virtual size_t bytes_appended() const override;
virtual State state() const override;
void handle_error(const Status& s) const;
// Starts an asynchronous flush of dirty block data to disk.
Status flush_data_async();
private:
DISALLOW_COPY_AND_ASSIGN(FileWritableBlock);
enum SyncMode {
SYNC,
NO_SYNC
};
// Close the block, optionally synchronizing dirty data and metadata.
Status _close(SyncMode mode);
// Back pointer to the block manager.
//
// Should remain alive for the lifetime of this block.
FileBlockManager* _block_manager;
const BlockId _block_id;
const string _path;
// The underlying opened file backing this block.
shared_ptr<WritableFile> _writer;
State _state;
// The number of bytes successfully appended to the block.
size_t _bytes_appended;
};
// Wraps an already-opened writer. The block starts CLEAN with zero bytes
// appended. When metrics are enabled, it is counted as currently open for
// writing and added to the lifetime total of writable blocks.
FileWritableBlock::FileWritableBlock(FileBlockManager* block_manager, string path,
                                     shared_ptr<WritableFile> writer)
        : _block_manager(block_manager),
          _path(std::move(path)),
          _writer(std::move(writer)),
          _state(CLEAN),
          _bytes_appended(0) {
    auto* metrics = _block_manager->_metrics.get();
    if (metrics == nullptr) {
        return;
    }
    metrics->blocks_open_writing->increment(1);
    metrics->total_writable_blocks->increment(1);
}
// A block destroyed without an explicit close() is treated as abandoned:
// it is aborted (closed without syncing, file deleted). Failures are only
// logged because a destructor cannot report them.
FileWritableBlock::~FileWritableBlock() {
    if (_state == CLOSED) {
        return;
    }
    WARN_IF_ERROR(abort(), Substitute("Failed to close block $0", _path));
}
// Durably closes the block: data and directory metadata are synced before
// the underlying file is closed.
Status FileWritableBlock::close() {
    return this->_close(SyncMode::SYNC);
}
// Discards the block: closes the file without syncing and then deletes it.
// If closing fails, the error is returned and the file is not deleted.
Status FileWritableBlock::abort() {
    Status close_status = _close(SyncMode::NO_SYNC);
    if (!close_status.ok()) {
        return close_status;
    }
    return _block_manager->_delete_block(_path);
}
// Returns the (non-owned) manager that created this block.
BlockManager* FileWritableBlock::block_manager() const {
    return this->_block_manager;
}
// Not supported yet: CHECK(false) is fatal, so this never returns normally.
// _block_id is default-constructed and never assigned; the return statement
// only satisfies the function's signature.
const BlockId& FileWritableBlock::id() const {
CHECK(false) << "Not support Block.id(). (TODO)";
return _block_id;
}
// Returns the path of the file backing this block.
const string& FileWritableBlock::path() const {
    return this->_path;
}
// Appends a single slice; a one-element appendv().
Status FileWritableBlock::append(const Slice& data) {
    const Slice* single_slice = &data;
    return this->appendv(single_slice, 1);
}
// Appends `data_cnt` slices, in order, to the underlying file.
//
// Only valid while the block is CLEAN or DIRTY (DCHECKed). On success the
// block becomes DIRTY and bytes_appended() grows by the total slice size.
// If the writer fails, its error is returned and neither the state nor the
// byte count is changed.
Status FileWritableBlock::appendv(const Slice* data, size_t data_cnt) {
    DCHECK(_state == CLEAN || _state == DIRTY)
            << "path=" << _path
            << " invalid state=" << _state;
    RETURN_IF_ERROR(_writer->appendv(data, data_cnt));
    _state = DIRTY;
    // Accumulate in size_t. The previous `int` lambda parameter narrowed the
    // running sum, truncating it once a batch exceeded INT_MAX bytes.
    const size_t bytes_written =
            accumulate(data, data + data_cnt, static_cast<size_t>(0),
                       [](size_t sum, const Slice& curr) { return sum + curr.size; });
    _bytes_appended += bytes_written;
    return Status::OK();
}
// Asks the writer to start flushing buffered data without waiting for it,
// and returns the writer's status.
Status FileWritableBlock::flush_data_async() {
    VLOG(3) << "Flushing block " << _path;
    return _writer->flush(WritableFile::FLUSH_ASYNC);
}
// Marks the block FINALIZED; appendv() DCHECKs against appends after this.
//
// Finalizing an already-finalized block is a no-op. If the block is DIRTY and
// BlockManager::block_manager_preflush_control is "finalize", an asynchronous
// flush is started as a best-effort hint. A failure there is logged rather
// than returned, because a later close() still syncs the data.
Status FileWritableBlock::finalize() {
    DCHECK(_state == CLEAN || _state == DIRTY || _state == FINALIZED)
            << "path=" << _path
            << " invalid state=" << _state;
    if (_state == FINALIZED) {
        return Status::OK();
    }
    VLOG(3) << "Finalizing block " << _path;
    if (_state == DIRTY && BlockManager::block_manager_preflush_control == "finalize") {
        // Previously this status was silently discarded.
        WARN_IF_ERROR(flush_data_async(),
                      Substitute("Failed to start async flush when finalizing block $0", _path));
    }
    _state = FINALIZED;
    return Status::OK();
}
size_t FileWritableBlock::bytes_appended() const {
return _bytes_appended;
}
// Current lifecycle state (CLEAN, DIRTY, FINALIZED or CLOSED).
WritableBlock::State FileWritableBlock::state() const {
    return this->_state;
}
// Closes the underlying file; calling it again after CLOSED is a no-op.
//
// With SYNC, the file data is synced first and then its parent directory
// (via FileBlockManager::_sync_metadata()). Sync failures are logged.
// With NO_SYNC (used by abort()), nothing is synced.
//
// The close is attempted even if syncing failed, and the block always ends
// up CLOSED with its writer released. Returns the close error if any,
// otherwise the sync error if any, otherwise OK.
Status FileWritableBlock::_close(SyncMode mode) {
    if (_state == CLOSED) {
        return Status::OK();
    }
    Status sync_status;
    if (mode == SYNC && (_state == CLEAN || _state == DIRTY || _state == FINALIZED)) {
        // Safer to synchronize data first, then metadata.
        VLOG(3) << "Syncing block " << _path;
        if (_block_manager->_metrics) {
            _block_manager->_metrics->total_disk_sync->increment(1);
        }
        sync_status = _writer->sync();
        if (sync_status.ok()) {
            sync_status = _block_manager->_sync_metadata(_path);
        }
        WARN_IF_ERROR(sync_status, Substitute("Failed to sync when closing block $0", _path));
    }
    Status close_status = _writer->close();
    _state = CLOSED;
    _writer.reset();
    if (_block_manager->_metrics) {
        _block_manager->_metrics->blocks_open_writing->increment(-1);
        _block_manager->_metrics->total_bytes_written->increment(_bytes_appended);
        _block_manager->_metrics->total_blocks_created->increment(1);
    }
    // Either close() or sync() could have run into an error; prefer the
    // result of close() to that of sync(). The former trailing
    // `close.ok() ? close : sync` was unreachable: both are OK by then.
    RETURN_IF_ERROR(close_status);
    RETURN_IF_ERROR(sync_status);
    return Status::OK();
}
////////////////////////////////////////////////////////////
// FileReadableBlock
////////////////////////////////////////////////////////////
// A file-backed block that has been opened for reading.
//
// There may be millions of instances of FileReadableBlock outstanding, so
// great care must be taken to keep its size small. Note that it currently
// stores the full file path; the BlockId member is unused because id() is
// not supported yet. Avoid adding further per-instance state.
class FileReadableBlock : public ReadableBlock {
public:
FileReadableBlock(FileBlockManager* block_manager,
string path,
std::shared_ptr<OpenedFileHandle<RandomAccessFile>> file_handle);
virtual ~FileReadableBlock();
virtual Status close() override;
virtual BlockManager* block_manager() const override;
virtual const BlockId& id() const override;
virtual const std::string& path() const override;
virtual Status size(uint64_t* sz) const override;
virtual Status read(uint64_t offset, Slice result) const override;
virtual Status readv(uint64_t offset, const Slice* results, size_t res_cnt) const override;
void handle_error(const Status& s) const;
private:
// Back pointer to the owning block manager.
FileBlockManager* _block_manager;
// The block's identifier.
const BlockId _block_id;
const string _path;
// The underlying opened file backing this block.
std::shared_ptr<OpenedFileHandle<RandomAccessFile>> _file_handle;
// the backing file of OpenedFileHandle, not owned.
RandomAccessFile* _file;
// Whether or not this block has been closed. Close() is thread-safe, so
// this must be an atomic primitive.
std::atomic_bool _closed;
DISALLOW_COPY_AND_ASSIGN(FileReadableBlock);
};
// Wraps an opened file handle for reading. When metrics are enabled, the
// block is counted as currently open for reading and added to the lifetime
// total of readable blocks.
//
// The handle is moved (not copied) into the member to avoid an extra atomic
// refcount increment/decrement. _file is set in the initializer list, which
// is safe because _file_handle is declared, and thus initialized, before it.
FileReadableBlock::FileReadableBlock(FileBlockManager* block_manager, string path,
                                     std::shared_ptr<OpenedFileHandle<RandomAccessFile>> file_handle)
        : _block_manager(block_manager),
          _path(std::move(path)),
          _file_handle(std::move(file_handle)),
          _file(_file_handle->file()),
          _closed(false) {
    if (_block_manager->_metrics) {
        _block_manager->_metrics->blocks_open_reading->increment(1);
        _block_manager->_metrics->total_readable_blocks->increment(1);
    }
}
// Releases the file handle if close() was not called explicitly; any error
// is logged because a destructor cannot return it.
FileReadableBlock::~FileReadableBlock() {
    const Status close_status = close();
    WARN_IF_ERROR(close_status, Substitute("Failed to close block $0", _path));
}
// Releases the file handle and decrements the open-for-reading metric.
// Only the first call does any work (atomic compare-exchange on _closed);
// every call returns OK.
Status FileReadableBlock::close() {
    bool expected_closed = false;
    if (!_closed.compare_exchange_strong(expected_closed, true)) {
        // Someone already closed this block.
        return Status::OK();
    }
    _file_handle.reset();
    auto* metrics = _block_manager->_metrics.get();
    if (metrics != nullptr) {
        metrics->blocks_open_reading->increment(-1);
    }
    return Status::OK();
}
// Returns the (non-owned) manager that opened this block.
BlockManager* FileReadableBlock::block_manager() const {
    return this->_block_manager;
}
// Not supported yet: CHECK(false) is fatal, so this never returns normally.
// _block_id is default-constructed and never assigned; the return statement
// only satisfies the function's signature.
const BlockId& FileReadableBlock::id() const {
CHECK(false) << "Not support Block.id(). (TODO)";
return _block_id;
}
// Returns the path of the file backing this block.
const string& FileReadableBlock::path() const {
    return this->_path;
}
// Stores the size of the backing file in *sz. The block must still be open
// (DCHECKed).
Status FileReadableBlock::size(uint64_t* sz) const {
    DCHECK(!_closed.load());
    return _file->size(sz);
}
// Reads into a single slice at `offset`; a one-element readv().
Status FileReadableBlock::read(uint64_t offset, Slice result) const {
    const Slice* single_slice = &result;
    return this->readv(offset, single_slice, 1);
}
// Fills `res_cnt` slices with consecutive data starting at `offset`, using
// the file's readv_at(). The block must still be open (DCHECKed). On
// success, and when metrics are enabled, the total slice size is added to
// the bytes-read metric.
Status FileReadableBlock::readv(uint64_t offset, const Slice* results, size_t res_cnt) const {
    DCHECK(!_closed.load());
    RETURN_IF_ERROR(_file->readv_at(offset, results, res_cnt));
    if (_block_manager->_metrics) {
        // Accumulate in size_t. The previous `int` lambda parameter narrowed
        // the running sum, truncating it for reads larger than INT_MAX bytes.
        const size_t bytes_read =
                accumulate(results, results + res_cnt, static_cast<size_t>(0),
                           [](size_t sum, const Slice& curr) { return sum + curr.size; });
        _block_manager->_metrics->total_bytes_read->increment(bytes_read);
    }
    return Status::OK();
}
} // namespace internal
////////////////////////////////////////////////////////////
// FileBlockManager
////////////////////////////////////////////////////////////
// Builds a manager over `env` (DCHECKed non-null) with the given options.
// It owns a mem tracker parented to opts.parent_mem_tracker and a cache of
// readable files sized by config::file_descriptor_cache_capacity. Metrics
// are created only when opts.enable_metric is set; otherwise _metrics stays
// empty and every metric update in this file is skipped.
FileBlockManager::FileBlockManager(Env* env, BlockManagerOptions opts)
        : _env(DCHECK_NOTNULL(env)),
          _opts(std::move(opts)),
          _mem_tracker(new MemTracker(-1, "file_block_manager", _opts.parent_mem_tracker.get())) {
    _file_cache.reset(new FileCache<RandomAccessFile>("Readable file cache",
                                                      config::file_descriptor_cache_capacity));
    if (!_opts.enable_metric) {
        return;
    }
    _metrics.reset(new internal::BlockManagerMetrics());
}
// Nothing to do explicitly; members release their own resources.
FileBlockManager::~FileBlockManager() = default;
// Not implemented yet; always reports NotSupported.
// TODO(lingbin): implement.
Status FileBlockManager::open() {
    const Status not_implemented = Status::NotSupported("to be implemented. (TODO)");
    return not_implemented;
}
// Creates a new file at opts.path (Env::MUST_CREATE) and returns it wrapped
// in a writable block. The manager must not be read-only (CHECKed). Errors
// from opening the file are returned and *block is left untouched.
Status FileBlockManager::create_block(const CreateBlockOptions& opts,
                                      unique_ptr<WritableBlock>* block) {
    CHECK(!_opts.read_only);
    WritableFileOptions write_options;
    write_options.mode = Env::MUST_CREATE;
    shared_ptr<WritableFile> file_writer;
    RETURN_IF_ERROR(env_util::open_file_for_write(write_options, _env, opts.path, &file_writer));
    VLOG(1) << "Creating new block at " << opts.path;
    block->reset(new internal::FileWritableBlock(this, opts.path, std::move(file_writer)));
    return Status::OK();
}
// Opens the file at `path` for reading and returns it as a readable block.
// A cached descriptor is used when the file cache has one. Otherwise the
// file is opened through the Env and ownership of it is handed to the cache.
// Errors from opening the file are returned and *block is left untouched.
Status FileBlockManager::open_block(const std::string& path, unique_ptr<ReadableBlock>* block) {
    VLOG(1) << "Opening block with path at " << path;
    auto handle = std::make_shared<OpenedFileHandle<RandomAccessFile>>();
    const bool cache_hit = _file_cache->lookup(path, handle.get());
    if (!cache_hit) {
        std::unique_ptr<RandomAccessFile> opened_file;
        RETURN_IF_ERROR(_env->new_random_access_file(path, &opened_file));
        _file_cache->insert(path, opened_file.release(), handle.get());
    }
    block->reset(new internal::FileReadableBlock(this, path, std::move(handle)));
    return Status::OK();
}
// TODO(lingbin): We should do something to ensure that deletion can only be done
// after the last reader or writer has finished
// Deletes the file backing a block. The manager must not be read-only
// (CHECKed). Returns the Env's error if deletion fails.
//
// The parent directory is intentionally not fsynced: nothing is gained by
// making the deletion durable. Even with an fsync, garbage would still need
// to be accounted for at startup (in case of a crash just before the fsync),
// and with such accounting fsync-as-you-delete is unnecessary.
//
// The block's directory hierarchy is left behind. Pruning empty directories
// would be racy, and leaving them costs little.
Status FileBlockManager::_delete_block(const string& path) {
    CHECK(!_opts.read_only);
    return _env->delete_file(path);
}
// TODO(lingbin): only one level is enough?
// Syncs the directory that directly contains `path`, counting it as a disk
// sync when metrics are enabled. Only the immediate parent is synced.
Status FileBlockManager::_sync_metadata(const string& path) {
    if (_metrics) {
        _metrics->total_disk_sync->increment(1);
    }
    return _env->sync_dir(path_util::dir_name(path));
}
} // namespace fs
} // namespace doris