blob: 38e853a76758bae73d392b23d5c5183ced5cccfc [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "kudu/fs/log_block_manager.h"
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <errno.h>
#include <map>
#include <memory>
#include <mutex>
#include <numeric>
#include <ostream>
#include <set>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <vector>
#include <boost/optional/optional.hpp>
#include <gflags/gflags.h>
#include <gflags/gflags_declare.h>
#include <glog/logging.h>
#include "kudu/fs/block_manager_metrics.h"
#include "kudu/fs/block_manager_util.h"
#include "kudu/fs/data_dirs.h"
#include "kudu/fs/error_manager.h"
#include "kudu/fs/fs.pb.h"
#include "kudu/fs/fs_report.h"
#include "kudu/gutil/bind.h"
#include "kudu/gutil/bind_helpers.h"
#include "kudu/gutil/callback.h"
#include "kudu/gutil/casts.h"
#include "kudu/gutil/map-util.h"
#include "kudu/gutil/port.h"
#include "kudu/gutil/stl_util.h"
#include "kudu/gutil/strings/numbers.h"
#include "kudu/gutil/strings/strcat.h"
#include "kudu/gutil/strings/strip.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/gutil/strings/util.h"
#include "kudu/gutil/walltime.h"
#include "kudu/util/alignment.h"
#include "kudu/util/atomic.h"
#include "kudu/util/env.h"
#include "kudu/util/env_util.h"
#include "kudu/util/file_cache.h"
#include "kudu/util/flag_tags.h"
#include "kudu/util/locks.h"
#include "kudu/util/malloc.h"
#include "kudu/util/metrics.h"
#include "kudu/util/monotime.h"
#include "kudu/util/path_util.h"
#include "kudu/util/pb_util.h"
#include "kudu/util/random.h"
#include "kudu/util/random_util.h"
#include "kudu/util/scoped_cleanup.h"
#include "kudu/util/slice.h"
#include "kudu/util/test_util_prod.h"
#include "kudu/util/trace.h"
// Flags declared here; their definitions live in other translation units.
DECLARE_bool(block_manager_lock_dirs);
DECLARE_bool(enable_data_block_fsync);
DECLARE_string(block_manager_preflush_control);
// Tunables for log containers. Each flag's help string below is the
// authoritative description of its meaning.
//
// TODO(unknown): Provide guidance on how log_container_max_size should be
// configured. The default is 10 GiB.
DEFINE_uint64(log_container_max_size, 10LU * 1024 * 1024 * 1024,
"Maximum size (soft) of a log container");
TAG_FLAG(log_container_max_size, advanced);
DEFINE_int64(log_container_max_blocks, -1,
"Maximum number of blocks (soft) of a log container. Use 0 for "
"no limit. Use -1 for no limit except in the case of a kernel "
"bug with hole punching on ext4 (see KUDU-1508 for details).");
TAG_FLAG(log_container_max_blocks, advanced);
// The default preallocation size is 32 MiB.
DEFINE_uint64(log_container_preallocate_bytes, 32LU * 1024 * 1024,
"Number of bytes to preallocate in a log container when "
"creating new blocks. Set to 0 to disable preallocation");
TAG_FLAG(log_container_preallocate_bytes, advanced);
DEFINE_double(log_container_excess_space_before_cleanup_fraction, 0.10,
"Additional fraction of a log container's calculated size that "
"must be consumed on disk before the container is considered to "
"be inconsistent and subject to excess space cleanup at block "
"manager startup.");
TAG_FLAG(log_container_excess_space_before_cleanup_fraction, advanced);
DEFINE_double(log_container_live_metadata_before_compact_ratio, 0.50,
"Desired ratio of live block metadata in log containers. If a "
"container's live to total block ratio dips below this value, "
"the container's metadata file will be compacted at startup.");
TAG_FLAG(log_container_live_metadata_before_compact_ratio, experimental);
DEFINE_bool(log_block_manager_test_hole_punching, true,
"Ensure hole punching is supported by the underlying filesystem");
TAG_FLAG(log_block_manager_test_hole_punching, advanced);
TAG_FLAG(log_block_manager_test_hole_punching, unsafe);
// Gauge prototypes for the log block manager. Each one is instantiated
// against a metric entity by the LogBlockManagerMetrics constructor.
METRIC_DEFINE_gauge_uint64(server, log_block_manager_bytes_under_management,
"Bytes Under Management",
kudu::MetricUnit::kBytes,
"Number of bytes of data blocks currently under management");
METRIC_DEFINE_gauge_uint64(server, log_block_manager_blocks_under_management,
"Blocks Under Management",
kudu::MetricUnit::kBlocks,
"Number of data blocks currently under management");
METRIC_DEFINE_gauge_uint64(server, log_block_manager_containers,
"Number of Block Containers",
kudu::MetricUnit::kLogBlockContainers,
"Number of log block containers");
METRIC_DEFINE_gauge_uint64(server, log_block_manager_full_containers,
"Number of Full Block Containers",
kudu::MetricUnit::kLogBlockContainers,
"Number of full log block containers");
namespace kudu {
namespace fs {
using internal::LogBlock;
using internal::LogBlockContainer;
using internal::LogWritableBlock;
using pb_util::ReadablePBContainerFile;
using pb_util::WritablePBContainerFile;
using std::map;
using std::set;
using std::shared_ptr;
using std::string;
using std::unique_ptr;
using std::unordered_map;
using std::unordered_set;
using std::vector;
using strings::Substitute;
namespace internal {
////////////////////////////////////////////////////////////
// LogBlockManagerMetrics
////////////////////////////////////////////////////////////
// Bundle of metric handles used by the log block manager.
//
// Combines the metrics shared by every block manager implementation with
// the gauges that only apply to the log-backed implementation.
struct LogBlockManagerMetrics {
  // Instantiates every metric in this bundle against 'metric_entity'.
  explicit LogBlockManagerMetrics(const scoped_refptr<MetricEntity>& metric_entity);

  // Metrics that do not depend on the block manager implementation.
  BlockManagerMetrics generic_metrics;

  // Gauges specific to the log block manager.
  scoped_refptr<AtomicGauge<uint64_t>> bytes_under_management;
  scoped_refptr<AtomicGauge<uint64_t>> blocks_under_management;
  scoped_refptr<AtomicGauge<uint64_t>> containers;
  scoped_refptr<AtomicGauge<uint64_t>> full_containers;
};
// Instantiates the generic metrics and then each log block manager gauge
// against 'metric_entity'. Every gauge starts with an initial value of 0.
LogBlockManagerMetrics::LogBlockManagerMetrics(const scoped_refptr<MetricEntity>& metric_entity)
    : generic_metrics(metric_entity),
      bytes_under_management(
          METRIC_log_block_manager_bytes_under_management.Instantiate(metric_entity, 0)),
      blocks_under_management(
          METRIC_log_block_manager_blocks_under_management.Instantiate(metric_entity, 0)),
      containers(
          METRIC_log_block_manager_containers.Instantiate(metric_entity, 0)),
      full_containers(
          METRIC_log_block_manager_full_containers.Instantiate(metric_entity, 0)) {
}
////////////////////////////////////////////////////////////
// LogBlock (declaration)
////////////////////////////////////////////////////////////
// Persistent metadata describing one logical block stored in a container.
//
// A block acquires a LogBlock once its data has been synchronized with the
// disk. From then on none of its metadata can change, and the block is both
// readable and persistent.
//
// LogBlocks are reference counted so that a block can be deleted while
// readers are still outstanding. Refcount increments, and the decrements
// caused by deletion, happen with the block manager lock held. The decrement
// in ~LogReadableBlock happens without any lock, which is why this class
// extends RefCountedThreadSafe rather than the simpler RefCounted.
class LogBlock : public RefCountedThreadSafe<LogBlock> {
 public:
  LogBlock(LogBlockContainer* container, BlockId block_id, int64_t offset,
           int64_t length);
  ~LogBlock();

  // Simple accessors.
  const BlockId& block_id() const { return block_id_; }
  LogBlockContainer* container() const { return container_; }
  int64_t offset() const { return offset_; }
  int64_t length() const { return length_; }

  // Returns the block's length aligned to the nearest filesystem block size.
  int64_t fs_aligned_length() const;

  // Marks the block for deletion. The deletion itself is carried out when
  // the block is destructed.
  void Delete();

 private:
  // Container that owns this block. Must outlive the LogBlock.
  LogBlockContainer* container_;

  // Identifier of the block.
  const BlockId block_id_;

  // Offset of the block within its container.
  const int64_t offset_;

  // Length of the block.
  const int64_t length_;

  // True once the block has been marked for deletion.
  bool deleted_;

  DISALLOW_COPY_AND_ASSIGN(LogBlock);
};
////////////////////////////////////////////////////////////
// LogWritableBlock (declaration)
////////////////////////////////////////////////////////////
// A block backed by a log container that is currently open for writing.
//
// It holds no reference to a LogBlock because the block has not been
// persisted yet.
class LogWritableBlock : public WritableBlock {
 public:
  LogWritableBlock(LogBlockContainer* container, BlockId block_id,
                   int64_t block_offset);
  virtual ~LogWritableBlock();

  // WritableBlock interface.
  virtual Status Close() OVERRIDE;
  virtual Status Abort() OVERRIDE;
  virtual const BlockId& id() const OVERRIDE;
  virtual BlockManager* block_manager() const OVERRIDE;
  virtual Status Append(const Slice& data) OVERRIDE;
  virtual Status AppendV(const vector<Slice>& data) OVERRIDE;
  virtual Status Finalize() OVERRIDE;
  virtual size_t BytesAppended() const OVERRIDE;
  virtual State state() const OVERRIDE;

  // Performs the actual close of the block, finalizing it first if that has
  // not happened yet, and updates various metrics.
  //
  // Called once dirty data and metadata have been synchronized to disk.
  void DoClose();

  // Kicks off an asynchronous flush of the block's dirty data to disk.
  Status FlushDataAsync();

  // Writes this block's metadata to disk.
  //
  // The written metadata is not synchronized here; Close() takes care of it.
  Status AppendMetadata();

  LogBlockContainer* container() const { return container_; }

 private:
  // Container that owns this block. Must outlive the block.
  LogBlockContainer* container_;

  // Identifier of the block.
  const BlockId block_id_;

  // Offset of the block within the container, known as soon as the block
  // is created.
  const int64_t block_offset_;

  // Length of the block. Changes with every Append().
  int64_t block_length_;

  // Where the block currently is in the write lifecycle (for example,
  // whether it has been synchronized to disk).
  WritableBlock::State state_;

  DISALLOW_COPY_AND_ASSIGN(LogWritableBlock);
};
////////////////////////////////////////////////////////////
// LogBlockContainer
////////////////////////////////////////////////////////////
// A single block container belonging to the log-backed block manager.
//
// A container may only be used to write one WritableBlock at a given time.
// However, existing blocks may be deleted concurrently. As such, almost
// all container functions must be reentrant, even if the container itself
// is logically thread unsafe (i.e. multiple clients calling WriteData()
// concurrently will produce nonsensical container data). Thread unsafe
// functions are marked explicitly.
class LogBlockContainer {
public:
// Controls whether DoCloseBlocks() synchronizes the data and metadata files
// to disk (SYNC) or skips those syncs (NO_SYNC).
enum SyncMode {
SYNC,
NO_SYNC
};
// NOTE(review): defined outside this chunk; presumably the magic string for
// container files -- confirm against the definition.
static const char* kMagic;
// Creates a new block container in 'dir'.
static Status Create(LogBlockManager* block_manager,
DataDir* dir,
unique_ptr<LogBlockContainer>* container);
// Opens an existing block container in 'dir'.
//
// Every container is composed of two files: "<dir>/<id>.data" and
// "<dir>/<id>.metadata". Together, 'dir' and 'id' fully describe both
// files.
//
// Returns Status::Aborted() in the case that the metadata and data files
// both appear to have no data (e.g. due to a crash just after creating
// one of them but before writing any records). This is recorded in 'report'.
static Status Open(LogBlockManager* block_manager,
DataDir* dir,
FsReport* report,
const std::string& id,
unique_ptr<LogBlockContainer>* container);
// Closes a set of blocks belonging to this container, possibly synchronizing
// the dirty data and metadata to disk.
//
// If successful, adds all blocks to the block manager's in-memory maps.
// On failure, the container is marked read-only.
Status DoCloseBlocks(const vector<LogWritableBlock*>& blocks, SyncMode mode);
// Frees the space associated with a block at 'offset' and 'length'. This
// is a physical operation, not a logical one; a separate AppendMetadata()
// is required to record the deletion in container metadata.
//
// A zero 'length' is a no-op.
//
// The on-disk effects of this call are made durable only after SyncData().
Status PunchHole(int64_t offset, int64_t length);
// Preallocate enough space to ensure that an append of 'next_append_length'
// can be satisfied by this container. The offset of the beginning of this
// block must be provided in 'block_start_offset' (since container
// bookkeeping is only updated when a block is finished).
//
// Does nothing if preallocation is disabled.
Status EnsurePreallocated(int64_t block_start_offset,
size_t next_append_length);
// See RWFile::Write()
Status WriteData(int64_t offset, const Slice& data);
// See RWFile::WriteV()
Status WriteVData(int64_t offset, const vector<Slice>& data);
// See RWFile::Read().
Status ReadData(int64_t offset, Slice* result) const;
// See RWFile::ReadV().
Status ReadVData(int64_t offset, vector<Slice>* results) const;
// Appends 'pb' to this container's metadata file.
//
// The on-disk effects of this call are made durable only after SyncMetadata().
Status AppendMetadata(const BlockRecordPB& pb);
// Asynchronously flush a range of this container's data file, described
// by 'offset' and 'length'.
//
// Does not guarantee data durability; use SyncData() for that.
Status FlushData(int64_t offset, int64_t length);
// Asynchronously flush this container's metadata file (all dirty bits).
//
// Does not guarantee metadata durability; use SyncMetadata() for that.
//
// TODO(unknown): Add support to just flush a range.
Status FlushMetadata();
// Synchronize this container's data file with the disk. On success,
// guarantees that the data is made durable.
//
// TODO(unknown): Add support to synchronize just a range.
Status SyncData();
// Synchronize this container's metadata file with the disk. On success,
// guarantees that the metadata is made durable.
//
// TODO(unknown): Add support to synchronize just a range.
Status SyncMetadata();
// Reopen the metadata file record writer. Should be called if the underlying
// file was changed.
Status ReopenMetadataWriter();
// Truncates this container's data file to 'next_block_offset_' if it is
// full. This effectively removes any preallocated but unused space.
//
// Should be called only when 'next_block_offset_' is up-to-date with
// respect to the data on disk (i.e. after the container's records have
// been loaded), otherwise data may be lost!
//
// This function is thread unsafe.
Status TruncateDataToNextBlockOffset();
// Reads the container's metadata from disk, sanity checking and processing
// records along the way.
//
// Malformed records and other container inconsistencies are written to
// 'report'. Healthy blocks are written either to 'live_blocks' or
// 'dead_blocks'. Live records are written to 'live_block_records'. The
// greatest block ID seen thus far in the container is written to 'max_block_id'.
//
// Returns an error only if there was a problem accessing the container from
// disk; such errors are fatal and effectively halt processing immediately.
Status ProcessRecords(
FsReport* report,
LogBlockManager::UntrackedBlockMap* live_blocks,
LogBlockManager::BlockRecordMap* live_block_records,
std::vector<scoped_refptr<internal::LogBlock>>* dead_blocks,
uint64_t* max_block_id);
// Updates internal bookkeeping state to reflect the creation of a block.
void BlockCreated(const scoped_refptr<LogBlock>& block);
// Updates internal bookkeeping state to reflect the deletion of a block.
//
// This function is thread safe because block deletions can happen concurrently
// with creations.
//
// Note: the container is not made "unfull"; containers remain sparse until deleted.
void BlockDeleted(const scoped_refptr<LogBlock>& block);
// Finalizes a fully written block. It updates the container data file's position,
// truncates the container if full and marks the container as available.
void FinalizeBlock(int64_t block_offset, int64_t block_length);
// Run a task on this container's data directory thread pool.
//
// Normally the task is performed asynchronously. However, if submission to
// the pool fails, it runs synchronously on the current thread.
void ExecClosure(const Closure& task);
// Mark the container to be read-only.
void SetReadOnly();
// Produces a debug-friendly string representation of this container.
string ToString() const;
// Handles errors if the input status is not OK.
void HandleError(const Status& s) const;
// Simple accessors.
LogBlockManager* block_manager() const { return block_manager_; }
int64_t next_block_offset() const { return next_block_offset_.Load(); }
int64_t total_bytes() const { return total_bytes_.Load(); }
int64_t total_blocks() const { return total_blocks_.Load(); }
int64_t live_bytes() const { return live_bytes_.Load(); }
int64_t live_bytes_aligned() const { return live_bytes_aligned_.Load(); }
int64_t live_blocks() const { return live_blocks_.Load(); }
// Returns true if the next block offset has reached
// FLAGS_log_container_max_size, or if a block-count limit is set for this
// container and the total number of blocks has reached it.
bool full() const {
return next_block_offset() >= FLAGS_log_container_max_size ||
(max_num_blocks_ && (total_blocks() >= max_num_blocks_));
}
const LogBlockManagerMetrics* metrics() const { return metrics_; }
DataDir* data_dir() const { return data_dir_; }
const PathInstanceMetadataPB* instance() const { return data_dir_->instance()->metadata(); }
bool read_only() const { return read_only_.Load(); }
private:
// Wraps already-opened metadata and data files; used by Create() and Open().
LogBlockContainer(LogBlockManager* block_manager, DataDir* data_dir,
unique_ptr<WritablePBContainerFile> metadata_file,
shared_ptr<RWFile> data_file);
// Processes a single block record, performing sanity checks on it and adding
// it either to 'live_blocks' or 'dead_blocks'. If the record is live, it is
// added to 'live_block_records'.
//
// Returns an error only if there was a problem accessing the container from
// disk; such errors are fatal and effectively halt processing immediately.
//
// On success, 'report' is updated with any inconsistencies found in the
// record, 'data_file_size' may be updated with the latest size of the
// container's data file, and 'max_block_id' reflects the largest block ID
// seen thus far in the container.
//
// Note: 'record' may be swapped into 'report'; do not use it after calling
// this function.
Status ProcessRecord(
BlockRecordPB* record,
FsReport* report,
LogBlockManager::UntrackedBlockMap* live_blocks,
LogBlockManager::BlockRecordMap* live_block_records,
std::vector<scoped_refptr<internal::LogBlock>>* dead_blocks,
uint64_t* data_file_size,
uint64_t* max_block_id);
// Updates this container data file's position based on the offset and length
// of a block, marking this container as full if needed. Should only be called
// when a block is fully written, as it will round up the container data file's
// position.
//
// This function is thread unsafe.
void UpdateNextBlockOffset(int64_t block_offset, int64_t block_length);
// The owning block manager. Must outlive the container itself.
LogBlockManager* const block_manager_;
// The data directory where the container lives.
DataDir* data_dir_;
// Block-count limit for this container, looked up in the constructor from
// the block manager's per-data-directory limits. When unset, full() applies
// no block-count limit.
const boost::optional<int64_t> max_num_blocks_;
// Offset up to which we have preallocated bytes.
int64_t preallocated_offset_ = 0;
// Opened file handles to the container's files.
unique_ptr<WritablePBContainerFile> metadata_file_;
shared_ptr<RWFile> data_file_;
// The offset of the next block to be written to the container.
AtomicInt<int64_t> next_block_offset_;
// The amount of data (post block alignment) written thus far to the container.
AtomicInt<int64_t> total_bytes_;
// The number of blocks written thus far in the container.
AtomicInt<int64_t> total_blocks_;
// The amount of data present in not-yet-deleted blocks of the container.
AtomicInt<int64_t> live_bytes_;
// The amount of data (post block alignment) present in not-yet-deleted
// blocks of the container.
AtomicInt<int64_t> live_bytes_aligned_;
// The number of not-yet-deleted blocks in the container.
AtomicInt<int64_t> live_blocks_;
// The metrics. Not owned by the log container; it has the same lifespan
// as the block manager.
const LogBlockManagerMetrics* metrics_;
// If true, only read operations are allowed. Existing blocks may
// not be deleted until the next restart, and new blocks may not
// be added.
AtomicBool read_only_;
DISALLOW_COPY_AND_ASSIGN(LogBlockContainer);
};
// Builds a container around already-opened metadata and data files.
//
// The block-count limit is looked up from the block manager's per-data-dir
// limits; the lookup dies if 'data_dir' has no entry. All byte and block
// counters start at zero and the container starts out writable.
LogBlockContainer::LogBlockContainer(
    LogBlockManager* block_manager,
    DataDir* data_dir,
    unique_ptr<WritablePBContainerFile> metadata_file,
    shared_ptr<RWFile> data_file)
    : block_manager_(block_manager),
      data_dir_(data_dir),
      max_num_blocks_(
          FindOrDie(block_manager->block_limits_by_data_dir_, data_dir)),
      metadata_file_(std::move(metadata_file)),
      data_file_(std::move(data_file)),
      next_block_offset_(0),
      total_bytes_(0),
      total_blocks_(0),
      live_bytes_(0),
      live_bytes_aligned_(0),
      live_blocks_(0),
      metrics_(block_manager->metrics()),
      read_only_(false) {
}
void LogBlockContainer::HandleError(const Status& s) const {
HANDLE_DISK_FAILURE(
s, block_manager()->error_manager()->RunErrorNotificationCb(data_dir_));
}
// Evaluates 'status_expr' via RETURN_NOT_OK_HANDLE_DISK_FAILURE (defined
// elsewhere), supplying the data directory error notification callback as
// the failure action.
//
// Requires variables named 'block_manager' and 'dir' at the expansion site.
//
// The definition deliberately does not end in a semicolon: the caller's own
// ';' completes the do/while statement, which keeps the macro usable as the
// sole statement of an unbraced if/else.
#define RETURN_NOT_OK_CONTAINER_DISK_FAILURE(status_expr) do { \
  RETURN_NOT_OK_HANDLE_DISK_FAILURE((status_expr), \
    block_manager->error_manager()->RunErrorNotificationCb(dir)); \
} while (0)
// Creates a brand new container (a metadata file plus a data file) in 'dir'
// and, on success, stores it in 'container'.
//
// Both files are created with CREATE_NON_EXISTING. If either creation
// reports that the file already exists, a fresh container id is generated
// and the attempt is repeated. Once both files exist they are reopened
// through the block manager's file cache and the metadata file is
// initialized.
//
// Returns the metadata file's creation status if it failed, otherwise the
// data file's creation status.
Status LogBlockContainer::Create(LogBlockManager* block_manager,
                                 DataDir* dir,
                                 unique_ptr<LogBlockContainer>* container) {
  Env* const env = block_manager->env();

  string base_path;
  string metadata_path;
  string data_path;
  Status metadata_status;
  Status data_status;
  unique_ptr<RWFile> metadata_writer;
  unique_ptr<RWFile> data_file;

  RWFileOptions metadata_opts;
  metadata_opts.mode = Env::CREATE_NON_EXISTING;
  RWFileOptions data_opts;
  data_opts.mode = Env::CREATE_NON_EXISTING;

  // Retry on a container id collision (unlikely). Files created by an
  // earlier iteration and then orphaned are deleted before retrying.
  do {
    if (metadata_writer) {
      env->DeleteFile(metadata_path);
    }
    base_path = JoinPathSegments(dir->dir(),
                                 block_manager->oid_generator()->Next());
    metadata_path = StrCat(base_path, LogBlockManager::kContainerMetadataFileSuffix);
    metadata_status = env->NewRWFile(metadata_opts, metadata_path, &metadata_writer);

    if (data_file) {
      env->DeleteFile(data_path);
    }
    data_path = StrCat(base_path, LogBlockManager::kContainerDataFileSuffix);
    data_status = env->NewRWFile(data_opts, data_path, &data_file);
  } while (PREDICT_FALSE(metadata_status.IsAlreadyPresent() ||
                         data_status.IsAlreadyPresent()));

  if (metadata_status.ok() && data_status.ok()) {
    // Drop the directly-opened handles and reopen both files through the
    // file cache.
    metadata_writer.reset();
    shared_ptr<RWFile> cached_metadata_writer;
    RETURN_NOT_OK_CONTAINER_DISK_FAILURE(block_manager->file_cache_.OpenExistingFile(
        metadata_path, &cached_metadata_writer));
    unique_ptr<WritablePBContainerFile> metadata_file(
        new WritablePBContainerFile(std::move(cached_metadata_writer)));

    data_file.reset();
    shared_ptr<RWFile> cached_data_file;
    RETURN_NOT_OK_CONTAINER_DISK_FAILURE(block_manager->file_cache_.OpenExistingFile(
        data_path, &cached_data_file));

    RETURN_NOT_OK_CONTAINER_DISK_FAILURE(metadata_file->CreateNew(BlockRecordPB()));
    container->reset(new LogBlockContainer(block_manager,
                                           dir,
                                           std::move(metadata_file),
                                           std::move(cached_data_file)));
    VLOG(1) << "Created log block container " << (*container)->ToString();
  }

  // Give the error manager a chance to react to either failure.
  HANDLE_DISK_FAILURE(metadata_status, block_manager->error_manager()->RunErrorNotificationCb(dir));
  HANDLE_DISK_FAILURE(data_status, block_manager->error_manager()->RunErrorNotificationCb(dir));

  // The metadata status wins (an arbitrary choice) when both failed.
  if (!metadata_status.ok()) {
    return metadata_status;
  }
  return data_status;
}
// Opens the existing container named 'id' in 'dir' and, on success, stores
// it in 'container'.
//
// The container's files are "<dir>/<id>" plus the metadata and data file
// suffixes. If the metadata file is shorter than the minimum valid PB
// container length and the data file is empty (a missing file counts as
// size 0), the container is recorded in 'report' as incomplete and
// Status::Aborted() is returned with the container's path in the message.
//
// Otherwise both files are opened through the block manager's file cache
// and the new container's preallocated offset is set to the current size
// of the data file.
Status LogBlockContainer::Open(LogBlockManager* block_manager,
                               DataDir* dir,
                               FsReport* report,
                               const string& id,
                               unique_ptr<LogBlockContainer>* container) {
  Env* env = block_manager->env();
  string common_path = JoinPathSegments(dir->dir(), id);
  string metadata_path = StrCat(common_path, LogBlockManager::kContainerMetadataFileSuffix);
  string data_path = StrCat(common_path, LogBlockManager::kContainerDataFileSuffix);

  // Check that both the metadata and data files exist and have valid lengths.
  // This covers a commonly seen case at startup, where the previous incarnation
  // of the server crashed due to "too many open files" just as it was trying
  // to create a data file. This orphans an empty metadata file, which we can
  // safely delete.
  {
    uint64_t metadata_size = 0;
    uint64_t data_size = 0;
    // A NotFound result is tolerated: the size simply stays at 0.
    Status s = env->GetFileSize(metadata_path, &metadata_size);
    if (!s.IsNotFound()) {
      s = s.CloneAndPrepend("unable to determine metadata file size");
      RETURN_NOT_OK_CONTAINER_DISK_FAILURE(s);
    }
    s = env->GetFileSize(data_path, &data_size);
    if (!s.IsNotFound()) {
      s = s.CloneAndPrepend("unable to determine data file size");
      RETURN_NOT_OK_CONTAINER_DISK_FAILURE(s);
    }
    if (metadata_size < pb_util::kPBContainerMinimumValidLength &&
        data_size == 0) {
      report->incomplete_container_check->entries.emplace_back(common_path);
      // Substitute the path into the message; a bare "$0" would otherwise be
      // emitted verbatim.
      return Status::Aborted(Substitute("orphaned empty metadata and data files $0",
                                        common_path));
    }
  }

  // Open the existing metadata and data files for writing.
  shared_ptr<RWFile> metadata_file;
  RETURN_NOT_OK_CONTAINER_DISK_FAILURE(block_manager->file_cache_.OpenExistingFile(
      metadata_path, &metadata_file));
  unique_ptr<WritablePBContainerFile> metadata_pb_writer;
  metadata_pb_writer.reset(new WritablePBContainerFile(std::move(metadata_file)));
  RETURN_NOT_OK_CONTAINER_DISK_FAILURE(metadata_pb_writer->OpenExisting());

  shared_ptr<RWFile> data_file;
  RETURN_NOT_OK_CONTAINER_DISK_FAILURE(block_manager->file_cache_.OpenExistingFile(
      data_path, &data_file));
  uint64_t data_file_size = 0;
  RETURN_NOT_OK_CONTAINER_DISK_FAILURE(data_file->Size(&data_file_size));

  // Create the in-memory container and populate it.
  unique_ptr<LogBlockContainer> open_container(new LogBlockContainer(block_manager,
                                                                     dir,
                                                                     std::move(metadata_pb_writer),
                                                                     std::move(data_file)));
  open_container->preallocated_offset_ = data_file_size;
  VLOG(1) << "Opened log block container " << open_container->ToString();
  container->swap(open_container);
  return Status::OK();
}
// Truncates the data file to the next block offset, but only when the
// container is full; otherwise does nothing. A truncation failure is passed
// through the container's error handling and returned.
Status LogBlockContainer::TruncateDataToNextBlockOffset() {
  if (!full()) {
    return Status::OK();
  }
  VLOG(2) << Substitute("Truncating container $0 to offset $1",
                        ToString(), next_block_offset());
  RETURN_NOT_OK_HANDLE_ERROR(data_file_->Truncate(next_block_offset()));
  return Status::OK();
}
// Reads every record from this container's metadata file and feeds each one
// to ProcessRecord().
//
// Returns OK when the file is read to its end, or when reading stops on an
// incomplete trailing record (which is noted in 'report' along with the
// reader's offset). Any other read failure is passed to HandleError() and
// returned, as is any failure from ProcessRecord().
Status LogBlockContainer::ProcessRecords(
    FsReport* report,
    LogBlockManager::UntrackedBlockMap* live_blocks,
    LogBlockManager::BlockRecordMap* live_block_records,
    vector<scoped_refptr<internal::LogBlock>>* dead_blocks,
    uint64_t* max_block_id) {
  // Read the metadata through a separate random-access handle.
  unique_ptr<RandomAccessFile> raw_reader;
  RETURN_NOT_OK_HANDLE_ERROR(block_manager()->env()->NewRandomAccessFile(
      metadata_file_->filename(), &raw_reader));
  ReadablePBContainerFile pb_reader(std::move(raw_reader));
  RETURN_NOT_OK_HANDLE_ERROR(pb_reader.Open());

  uint64_t data_file_size = 0;
  Status read_status;
  for (;;) {
    // A fresh record per iteration: ProcessRecord() may swap its contents
    // away.
    BlockRecordPB record;
    read_status = pb_reader.ReadNextPB(&record);
    if (!read_status.ok()) {
      break;
    }
    RETURN_NOT_OK(ProcessRecord(&record, report,
                                live_blocks, live_block_records, dead_blocks,
                                &data_file_size, max_block_id));
  }

  // The loop only exits with a non-OK 'read_status'.
  if (PREDICT_FALSE(!read_status.IsEndOfFile())) {
    if (!read_status.IsIncomplete()) {
      // Unrecoverable read error: handle what we can (e.g. disk failures)
      // and report it to the caller.
      HandleError(read_status);
      return read_status;
    }
    // A partial trailing record, found in a version of the pb container file
    // format that can reliably detect it. Treat it as a failed partial write
    // and note it in the report; no truncation is performed here.
    report->partial_record_check->entries.emplace_back(ToString(),
                                                       pb_reader.offset());
  }
  return Status::OK();
}
// Processes one metadata record read from this container.
//
// CREATE records are sanity checked (offset/length present, non-negative and
// within the data file), turned into a LogBlock that is inserted into
// 'live_blocks', accounted for in the container's bookkeeping, and their
// record is swapped into 'live_block_records'. DELETE records move the
// matching block from 'live_blocks' to 'dead_blocks' and drop its live
// record. Anything that fails a check, or has an unknown type, is added to
// the report's malformed record entries and otherwise ignored.
//
// Returns a non-OK status only if querying the data file's size fails.
Status LogBlockContainer::ProcessRecord(
BlockRecordPB* record,
FsReport* report,
LogBlockManager::UntrackedBlockMap* live_blocks,
LogBlockManager::BlockRecordMap* live_block_records,
vector<scoped_refptr<internal::LogBlock>>* dead_blocks,
uint64_t* data_file_size,
uint64_t* max_block_id) {
const BlockId block_id(BlockId::FromPB(record->block_id()));
scoped_refptr<LogBlock> lb;
switch (record->op_type()) {
case CREATE:
// First verify that the record's offset/length aren't wildly incorrect.
if (PREDICT_FALSE(!record->has_offset() ||
!record->has_length() ||
record->offset() < 0 ||
record->length() < 0)) {
report->malformed_record_check->entries.emplace_back(ToString(), record);
break;
}
// Now it should be safe to access the record's offset/length.
//
// KUDU-1657: When opening a container in read-only mode which is actively
// being written to by another lbm, we must reinspect the data file's size
// frequently in order to account for the latest appends. Inspecting the
// file size is expensive, so we only do it when the metadata indicates
// that additional data has been written to the file.
//
// NOTE(review): offset() + length() is compared against an unsigned size.
// Both operands were checked to be non-negative above, but very large
// values could overflow the sum -- confirm whether that needs guarding.
if (PREDICT_FALSE(record->offset() + record->length() > *data_file_size)) {
RETURN_NOT_OK_HANDLE_ERROR(data_file_->Size(data_file_size));
}
// If the record still extends beyond the end of the file, it is malformed.
if (PREDICT_FALSE(record->offset() + record->length() > *data_file_size)) {
// TODO(adar): treat as a different kind of inconsistency?
report->malformed_record_check->entries.emplace_back(ToString(), record);
break;
}
lb = new LogBlock(this, block_id, record->offset(), record->length());
if (!InsertIfNotPresent(live_blocks, block_id, lb)) {
// We found a record whose ID matches that of an already created block.
//
// TODO(adar): treat as a different kind of inconsistency?
report->malformed_record_check->entries.emplace_back(
ToString(), record);
break;
}
VLOG(2) << Substitute("Found CREATE block $0 at offset $1 with length $2",
block_id.ToString(),
record->offset(), record->length());
// This block must be included in the container's logical size, even if
// it has since been deleted. This helps satisfy one of our invariants:
// once a container byte range has been used, it may never be reused in
// the future.
//
// If we ignored deleted blocks, we would end up reusing the space
// belonging to the last deleted block in the container.
UpdateNextBlockOffset(lb->offset(), lb->length());
BlockCreated(lb);
// Hand the record's contents over to the live record map. 'record' must
// not be read after this point; the offset/length were consumed above.
(*live_block_records)[block_id].Swap(record);
*max_block_id = std::max(*max_block_id, block_id.id());
break;
case DELETE:
// Remove the block from the live set; a null result means no matching
// CREATE was seen.
lb = EraseKeyReturnValuePtr(live_blocks, block_id);
if (!lb) {
// We found a record for which there is no already created block.
//
// TODO(adar): treat as a different kind of inconsistency?
report->malformed_record_check->entries.emplace_back(ToString(), record);
break;
}
VLOG(2) << Substitute("Found DELETE block $0", block_id.ToString());
BlockDeleted(lb);
// Every live block must have exactly one live record.
CHECK_EQ(1, live_block_records->erase(block_id));
dead_blocks->emplace_back(std::move(lb));
break;
default:
// We found a record with an unknown type.
//
// TODO(adar): treat as a different kind of inconsistency?
report->malformed_record_check->entries.emplace_back(ToString(), record);
break;
}
return Status::OK();
}
// Closes 'blocks', all of which belong to this container.
//
// In SYNC mode the data file is synced before any metadata is appended, and
// the metadata file is synced afterwards. The container is then synced via
// the block manager and each block's DoClose() is invoked. If any step
// fails, the container is marked read-only and the failure is returned.
Status LogBlockContainer::DoCloseBlocks(const vector<LogWritableBlock*>& blocks,
                                        SyncMode mode) {
  DCHECK(!read_only());
  const bool do_sync = (mode == SYNC);

  const auto close_all = [this, &blocks, do_sync]() -> Status {
    if (do_sync) {
      VLOG(3) << "Syncing data file " << data_file_->filename();
      RETURN_NOT_OK(SyncData());
    }

    // Metadata is appended only once the data has been synced, so the
    // metadata cannot land on disk ahead of the data it describes.
    for (LogWritableBlock* wb : blocks) {
      RETURN_NOT_OK_PREPEND(wb->AppendMetadata(),
                            "unable to append block's metadata during close");
    }

    if (do_sync) {
      VLOG(3) << "Syncing metadata file " << metadata_file_->filename();
      RETURN_NOT_OK(SyncMetadata());
    }

    RETURN_NOT_OK(block_manager()->SyncContainer(*this));

    // When several blocks are closed together, each is expected to have
    // been finalized already.
    const bool closing_many = blocks.size() > 1;
    for (LogWritableBlock* wb : blocks) {
      if (closing_many) {
        DCHECK_EQ(wb->state(), WritableBlock::State::FINALIZED);
      }
      wb->DoClose();
    }
    return Status::OK();
  };

  Status s = close_all();
  if (!s.ok()) {
    // The on-disk state may now hold partial or complete data/metadata, so
    // it is unsafe to overwrite it or append to it. Forbid further writes.
    SetReadOnly();
  }
  return s;
}
// Punches a hole of 'length' bytes at 'offset' in the container's data file.
//
// A zero-length request is a no-op because punching a zero-size hole is
// invalid. Failures go through RETURN_NOT_OK_HANDLE_ERROR.
Status LogBlockContainer::PunchHole(int64_t offset, int64_t length) {
  DCHECK_GE(offset, 0);
  DCHECK_GE(length, 0);

  if (length == 0) {
    return Status::OK();
  }

  // Exceeding the file's total size is fine; the kernel truncates the
  // request.
  RETURN_NOT_OK_HANDLE_ERROR(data_file_->PunchHole(offset, length));
  return Status::OK();
}
// Writes a single slice at 'offset' by delegating to WriteVData() with a
// one-element vector.
Status LogBlockContainer::WriteData(int64_t offset, const Slice& data) {
  const vector<Slice> single_slice(1, data);
  return WriteVData(offset, single_slice);
}
// Writes the slices in 'data' contiguously to the data file, starting at
// 'offset'.
//
// After a successful write, if the written range extends past the
// preallocated region, the data directory's fullness is refreshed. Failures
// go through RETURN_NOT_OK_HANDLE_ERROR.
Status LogBlockContainer::WriteVData(int64_t offset, const vector<Slice>& data) {
  DCHECK(!read_only());
  DCHECK_GE(offset, next_block_offset());

  RETURN_NOT_OK_HANDLE_ERROR(data_file_->WriteV(offset, data));

  // This append may have changed the container size if:
  // 1. It was large enough that it blew out the preallocated space.
  // 2. Preallocation was disabled.
  //
  // The running total is kept as size_t throughout; accumulating through an
  // 'int' parameter would narrow the sum on every step.
  const size_t data_size = accumulate(data.begin(), data.end(), static_cast<size_t>(0),
                                      [](size_t sum, const Slice& curr) {
                                        return sum + curr.size();
                                      });
  if (offset + data_size > preallocated_offset_) {
    RETURN_NOT_OK_HANDLE_ERROR(data_dir_->RefreshIsFull(DataDir::RefreshMode::ALWAYS));
  }
  return Status::OK();
}
// Reads from the container's data file at 'offset' into 'result'.
// Failures go through RETURN_NOT_OK_HANDLE_ERROR.
Status LogBlockContainer::ReadData(int64_t offset, Slice* result) const {
  DCHECK_GE(offset, 0);
  const Status read_status = data_file_->Read(offset, result);
  RETURN_NOT_OK_HANDLE_ERROR(read_status);
  return Status::OK();
}
// Vectored read from the container's data file at 'offset' into 'results'.
// Failures go through RETURN_NOT_OK_HANDLE_ERROR.
Status LogBlockContainer::ReadVData(int64_t offset, vector<Slice>* results) const {
  DCHECK_GE(offset, 0);
  const Status read_status = data_file_->ReadV(offset, results);
  RETURN_NOT_OK_HANDLE_ERROR(read_status);
  return Status::OK();
}
// Appends the record 'pb' to the container's metadata file.
//
// Disk space is deliberately not checked for metadata writes, so that blocks
// can still be deleted when the disk is full.
Status LogBlockContainer::AppendMetadata(const BlockRecordPB& pb) {
  DCHECK(!read_only());
  const Status append_status = metadata_file_->Append(pb);
  RETURN_NOT_OK_HANDLE_ERROR(append_status);
  return Status::OK();
}
// Requests an asynchronous flush (RWFile::FLUSH_ASYNC) of 'length' bytes of
// the data file starting at 'offset'.
Status LogBlockContainer::FlushData(int64_t offset, int64_t length) {
  DCHECK(!read_only());
  DCHECK_GE(offset, 0);
  DCHECK_GE(length, 0);
  const Status flush_status = data_file_->Flush(RWFile::FLUSH_ASYNC, offset, length);
  RETURN_NOT_OK_HANDLE_ERROR(flush_status);
  return Status::OK();
}
// Flushes the container's metadata file.
Status LogBlockContainer::FlushMetadata() {
  DCHECK(!read_only());
  const Status flush_status = metadata_file_->Flush();
  RETURN_NOT_OK_HANDLE_ERROR(flush_status);
  return Status::OK();
}
// Syncs the container's data file, unless --enable_data_block_fsync is off,
// in which case this is a no-op. Each sync performed is counted in the
// total_disk_sync metric when metrics are enabled.
Status LogBlockContainer::SyncData() {
  DCHECK(!read_only());
  if (!FLAGS_enable_data_block_fsync) {
    return Status::OK();
  }
  if (metrics_) {
    metrics_->generic_metrics.total_disk_sync->Increment();
  }
  RETURN_NOT_OK_HANDLE_ERROR(data_file_->Sync());
  return Status::OK();
}
// Syncs the container's metadata file, unless --enable_data_block_fsync is
// off, in which case this is a no-op. Each sync performed is counted in the
// total_disk_sync metric when metrics are enabled.
Status LogBlockContainer::SyncMetadata() {
  DCHECK(!read_only());
  if (!FLAGS_enable_data_block_fsync) {
    return Status::OK();
  }
  if (metrics_) {
    metrics_->generic_metrics.total_disk_sync->Increment();
  }
  RETURN_NOT_OK_HANDLE_ERROR(metadata_file_->Sync());
  return Status::OK();
}
// Replaces the metadata writer with a fresh one opened on the same file.
//
// The new writer is fully opened before the current one is closed; the swap
// only happens once both steps have succeeded.
Status LogBlockContainer::ReopenMetadataWriter() {
  shared_ptr<RWFile> reopened_file;
  RETURN_NOT_OK_HANDLE_ERROR(block_manager_->file_cache_.OpenExistingFile(
      metadata_file_->filename(), &reopened_file));

  unique_ptr<WritablePBContainerFile> new_writer(
      new WritablePBContainerFile(std::move(reopened_file)));
  RETURN_NOT_OK_HANDLE_ERROR(new_writer->OpenExisting());

  RETURN_NOT_OK_HANDLE_ERROR(metadata_file_->Close());
  metadata_file_.swap(new_writer);
  return Status::OK();
}
// Makes sure there is preallocated space for an append of
// 'next_append_length' bytes at 'block_start_offset'.
//
// Does nothing when --log_container_preallocate_bytes is zero, or when the
// current preallocation window still covers the upcoming append. Otherwise
// preallocates another chunk of --log_container_preallocate_bytes bytes and
// refreshes the data directory's fullness.
Status LogBlockContainer::EnsurePreallocated(int64_t block_start_offset,
                                             size_t next_append_length) {
  DCHECK(!read_only());
  DCHECK_GE(block_start_offset, 0);

  if (!FLAGS_log_container_preallocate_bytes) {
    return Status::OK();
  }

  // Nothing to do if the last write stayed within the preallocation window
  // and the next write still fits inside it.
  if (block_start_offset <= preallocated_offset_ &&
      next_append_length <= preallocated_offset_ - block_start_offset) {
    return Status::OK();
  }

  const int64_t chunk_start = std::max(preallocated_offset_, block_start_offset);
  const int64_t chunk_len = FLAGS_log_container_preallocate_bytes;
  RETURN_NOT_OK_HANDLE_ERROR(
      data_file_->PreAllocate(chunk_start, chunk_len, RWFile::CHANGE_FILE_SIZE));
  RETURN_NOT_OK_HANDLE_ERROR(data_dir_->RefreshIsFull(DataDir::RefreshMode::ALWAYS));
  VLOG(2) << Substitute("Preallocated $0 bytes at offset $1 in container $2",
                        chunk_len, chunk_start, ToString());
  preallocated_offset_ = chunk_start + chunk_len;
  return Status::OK();
}
// Accounts for a finished block and hands the container back to the block
// manager.
void LogBlockContainer::FinalizeBlock(int64_t block_offset, int64_t block_length) {
  // The next block offset must be updated before the container is marked
  // available, to keep the internal bookkeeping thread-safe.
  UpdateNextBlockOffset(block_offset, block_length);

  // If the container is now full, leftover preallocated space is no longer
  // needed, so try to truncate it away.
  //
  // Depending on when FinalizeBlock() is called, this may happen _after_ the
  // container has been synced to disk. That is fine: truncation is not needed
  // for correctness, and after a crash or error it is retried at startup.
  WARN_NOT_OK(TruncateDataToNextBlockOffset(),
              "could not truncate excess preallocated space");

  if (full()) {
    if (block_manager_->metrics()) {
      block_manager_->metrics()->full_containers->Increment();
    }
  }

  block_manager_->MakeContainerAvailable(this);
}
void LogBlockContainer::UpdateNextBlockOffset(int64_t block_offset, int64_t block_length) {
DCHECK(!read_only());
DCHECK_GE(block_offset, 0);
// The log block manager maintains block contiguity as an invariant, which
// means accounting for the new block should be as simple as adding its
// aligned length to 'next_block_offset_'. However, due to KUDU-1793, some
// containers may have developed gaps between blocks. We'll account for them
// by considering both the block's offset and its length.
//
// The number of bytes is rounded up to the nearest filesystem block so
// that each Kudu block is guaranteed to be on a filesystem block
// boundary. This guarantees that the disk space can be reclaimed when
// the block is deleted.
int64_t new_next_block_offset = KUDU_ALIGN_UP(
block_offset + block_length,
instance()->filesystem_block_size_bytes());
next_block_offset_.StoreMax(new_next_block_offset);
if (full()) {
VLOG(1) << Substitute(
"Container $0 with size $1 is now full, max size is $2",
ToString(), next_block_offset(), FLAGS_log_container_max_size);
}
}
// Updates the container's counters for a newly created block: the totals
// and the live counters both grow.
void LogBlockContainer::BlockCreated(const scoped_refptr<LogBlock>& block) {
  DCHECK(!read_only());
  DCHECK_GE(block->offset(), 0);

  const int64_t aligned_len = block->fs_aligned_length();
  total_bytes_.IncrementBy(aligned_len);
  total_blocks_.Increment();
  live_bytes_.IncrementBy(block->length());
  live_bytes_aligned_.IncrementBy(aligned_len);
  live_blocks_.Increment();
}
// Updates the container's counters for a deleted block: only the live
// counters shrink, the totals are left untouched.
void LogBlockContainer::BlockDeleted(const scoped_refptr<LogBlock>& block) {
  DCHECK(!read_only());
  DCHECK_GE(block->offset(), 0);

  const int64_t aligned_len = block->fs_aligned_length();
  live_bytes_.IncrementBy(-block->length());
  live_bytes_aligned_.IncrementBy(-aligned_len);
  live_blocks_.IncrementBy(-1);
}
// Forwards 'task' to this container's data directory for execution.
void LogBlockContainer::ExecClosure(const Closure& task) {
data_dir_->ExecClosure(task);
}
// Returns the data file's name with the container data file suffix
// (LogBlockManager::kContainerDataFileSuffix) stripped off.
//
// Crashes (CHECK) if the data file's name does not end with that suffix.
string LogBlockContainer::ToString() const {
string s;
CHECK(TryStripSuffixString(data_file_->filename(),
LogBlockManager::kContainerDataFileSuffix, &s));
return s;
}
// Marks this container as read-only. Nothing in this file sets the flag
// back to false after this call.
void LogBlockContainer::SetReadOnly() {
read_only_.Store(true);
}
////////////////////////////////////////////////////////////
// LogBlock (definition)
////////////////////////////////////////////////////////////
// Constructs the in-memory record of a block that lives in 'container' at
// ['offset', 'offset' + 'length'). The block starts out not deleted.
//
// Both 'offset' and 'length' must be non-negative (checked in debug builds).
LogBlock::LogBlock(LogBlockContainer* container, BlockId block_id,
int64_t offset, int64_t length)
: container_(container),
block_id_(block_id),
offset_(offset),
length_(length),
deleted_(false) {
DCHECK_GE(offset, 0);
DCHECK_GE(length, 0);
}
// Frees the disk space of a deleted block by punching a hole of
// 'fs_aligned_length' bytes at 'offset' in 'container'. A failure is only
// logged as a warning.
static void DeleteBlockAsync(LogBlockContainer* container,
                             BlockId block_id,
                             int64_t offset,
                             int64_t fs_aligned_length) {
  // SyncData() is intentionally not called to make the deletion durable: it
  // is expensive, and the worst case is orphaned data that gets cleaned up
  // in the next GC.
  VLOG(3) << "Freeing space belonging to block " << block_id;
  const Status punch_status = container->PunchHole(offset, fs_aligned_length);
  WARN_NOT_OK(punch_status,
              Substitute("Could not delete block $0", block_id.ToString()));
}
// If the block was marked deleted, schedules the reclamation of its disk
// space through the container; otherwise does nothing.
LogBlock::~LogBlock() {
  if (!deleted_) {
    return;
  }
  // The aligned length is used so that the filesystem can reclaim as much
  // disk space as possible.
  container_->ExecClosure(Bind(&DeleteBlockAsync, container_, block_id_,
                               offset_, fs_aligned_length()));
}
int64_t LogBlock::fs_aligned_length() const {
uint64_t fs_block_size = container_->instance()->filesystem_block_size_bytes();
// Nearly all blocks are placed on a filesystem block boundary, which means
// their length post-alignment is simply their length aligned up to the
// nearest fs block size.
//
// However, due to KUDU-1793, some blocks may start or end at misaligned
// offsets. We don't maintain enough state to precisely pinpoint such a
// block's (aligned) end offset in this case, so we'll just undercount it.
// This should be safe, although it may mean unreclaimed disk space (i.e.
// when fs_aligned_length() is used in hole punching).
if (PREDICT_TRUE(offset_ % fs_block_size == 0)) {
return KUDU_ALIGN_UP(length_, fs_block_size);
}
return length_;
}
// Marks the block as deleted. The destructor checks this flag to decide
// whether to schedule space reclamation. Must be called at most once
// (checked in debug builds).
void LogBlock::Delete() {
DCHECK(!deleted_);
deleted_ = true;
}
////////////////////////////////////////////////////////////
// LogWritableBlock (definition)
////////////////////////////////////////////////////////////
// Constructs an empty writable block (state CLEAN, length 0) that will be
// written into 'container' starting at 'block_offset'.
//
// 'block_offset' must be non-negative and a multiple of the container's
// filesystem block size (both checked in debug builds). When the container
// has metrics, the open-for-writing and total-writable-block counters are
// incremented.
LogWritableBlock::LogWritableBlock(LogBlockContainer* container,
BlockId block_id, int64_t block_offset)
: container_(container),
block_id_(block_id),
block_offset_(block_offset),
block_length_(0),
state_(CLEAN) {
DCHECK_GE(block_offset, 0);
DCHECK_EQ(0, block_offset % container->instance()->filesystem_block_size_bytes());
if (container->metrics()) {
container->metrics()->generic_metrics.blocks_open_writing->Increment();
container->metrics()->generic_metrics.total_writable_blocks->Increment();
}
}
// Aborts the block if it is destroyed without having been closed. A failed
// abort is only logged as a warning.
LogWritableBlock::~LogWritableBlock() {
  if (state_ == CLOSED) {
    return;
  }
  WARN_NOT_OK(Abort(), Substitute("Failed to abort block $0",
                                  id().ToString()));
}
// Closes just this block through its container, in SYNC mode.
Status LogWritableBlock::Close() {
return container_->DoCloseBlocks({ this }, LogBlockContainer::SyncMode::SYNC);
}
// Aborts this block.
//
// If the container is read-only, only the metrics and the block state are
// updated and an Aborted status naming the container is returned. Otherwise
// the block is closed without syncing and then deleted through the block
// manager; the status of whichever step fails first is returned.
Status LogWritableBlock::Abort() {
  // Only updates metrics and block state for read-only container.
  if (container_->read_only()) {
    if (state_ != CLOSED) {
      state_ = CLOSED;
      if (container_->metrics()) {
        container_->metrics()->generic_metrics.blocks_open_writing->Decrement();
        container_->metrics()->generic_metrics.total_bytes_written->IncrementBy(
            block_length_);
      }
    }
    // The "$0" placeholder is only expanded by Substitute(); passing the
    // format string straight to Status::Aborted() would leave a literal "$0"
    // in the message.
    return Status::Aborted(Substitute("container $0 is read-only",
                                      container_->ToString()));
  }

  // Close the block and then delete it. Theoretically, we could do nothing
  // for Abort() other than updating metrics and block state. Here is the
  // reasoning why it would be safe to do so. Currently, failures in block
  // creation can happen at various stages:
  // 1) before the block is finalized (either in CLEAN or DIRTY state). It is
  //    safe to do nothing, as the block is not yet finalized and next block
  //    will overwrite the dirty data.
  // 2) after the block is finalized but before the metadata is successfully
  //    appended. That means the container's internal bookkeeping has been
  //    updated and the data could be durable. Doing nothing can result in
  //    data-consuming gaps in containers, but these gaps can be cleaned up
  //    by hole repunching at start up.
  // 3) after metadata is appended. If we do nothing, this can result in
  //    orphaned blocks if the metadata is durable. But orphaned blocks can be
  //    garbage collected later.
  //
  // TODO(Hao): implement a fine-grained abort handling.
  // However it is better to provide a fine-grained abort handling to
  // avoid large chunks of data-consuming gaps and orphaned blocks. A
  // possible way to do so is,
  // 1) for per block abort, if the block state is FINALIZED, do hole
  //    punching.
  // 2) for BlockTransaction abort, DoCloseBlock() should append metadata
  //    for deletion and coalescing hole punching for blocks in the same
  //    container.
  RETURN_NOT_OK(container_->DoCloseBlocks({ this }, LogBlockContainer::SyncMode::NO_SYNC));

  // DoCloseBlocks() has unlocked the container; it may be locked by someone else.
  // But block_manager_ is immutable, so this is safe.
  return container_->block_manager()->DeleteBlock(id());
}
// Returns the ID assigned to this block at construction.
const BlockId& LogWritableBlock::id() const {
return block_id_;
}
// Returns the block manager of the container this block is written to.
BlockManager* LogWritableBlock::block_manager() const {
return container_->block_manager();
}
// Appends a single slice by delegating to AppendV() with a one-element
// vector.
Status LogWritableBlock::Append(const Slice& data) {
return AppendV({ data });
}
// Appends the slices in 'data' to the end of this block.
//
// The block must be CLEAN or DIRTY (checked in debug builds). Returns
// IllegalState if the container is read-only. On success the block's length
// grows by the total size of 'data' and the block becomes DIRTY; on failure
// neither the length nor the state is changed.
Status LogWritableBlock::AppendV(const vector<Slice>& data) {
  DCHECK(state_ == CLEAN || state_ == DIRTY)
      << "Invalid state: " << state_;

  // This could be true if Finalize() is successful but DoCloseBlocks() is not.
  // That means the container is read-only but marked as available. Another
  // block can actually try to write to it.
  if (container_->read_only()) {
    return Status::IllegalState(Substitute("container $0 is read-only",
                                           container_->ToString()));
  }

  // Calculate the amount of data to write. The running total is kept as
  // size_t throughout; accumulating through an 'int' parameter would narrow
  // the sum on every step.
  const size_t data_size = accumulate(data.begin(), data.end(), static_cast<size_t>(0),
                                      [](size_t sum, const Slice& curr) {
                                        return sum + curr.size();
                                      });

  // The metadata change is deferred to Close(). We can't do
  // it now because the block's length is still in flux.
  int64_t cur_block_offset = block_offset_ + block_length_;
  RETURN_NOT_OK(container_->EnsurePreallocated(cur_block_offset, data_size));

  // Time the write and record it in the trace counters.
  MicrosecondsInt64 start_time = GetMonoTimeMicros();
  RETURN_NOT_OK(container_->WriteVData(cur_block_offset, data));
  MicrosecondsInt64 end_time = GetMonoTimeMicros();
  int64_t dur = end_time - start_time;
  TRACE_COUNTER_INCREMENT("lbm_write_time_us", dur);
  const char* counter = BUCKETED_COUNTER_NAME("lbm_writes", dur);
  TRACE_COUNTER_INCREMENT(counter, 1);

  block_length_ += data_size;
  state_ = DIRTY;
  return Status::OK();
}
// Asks the container to flush the bytes written to this block so far and
// returns the resulting status.
Status LogWritableBlock::FlushDataAsync() {
  VLOG(3) << "Flushing block " << id();
  return container_->FlushData(block_offset_, block_length_);
}
// Finalizes the block: no more data will be appended to it.
//
// Calling this on an already FINALIZED block is a no-op. Otherwise the
// container is told about the block's final extent and the block becomes
// FINALIZED on the way out, regardless of whether the optional pre-flush
// succeeded. The pre-flush only runs for DIRTY blocks when
// --block_manager_preflush_control is "finalize"; its status is returned.
Status LogWritableBlock::Finalize() {
  DCHECK(state_ == CLEAN || state_ == DIRTY || state_ == FINALIZED)
      << "Invalid state: " << state_;

  if (state_ == FINALIZED) {
    return Status::OK();
  }

  // Runs when this function returns, on both the success and failure paths.
  auto finalize_on_exit = MakeScopedCleanup([&]() {
    container_->FinalizeBlock(block_offset_, block_length_);
    state_ = FINALIZED;
  });

  VLOG(3) << "Finalizing block " << id();

  const bool should_preflush =
      state_ == DIRTY && FLAGS_block_manager_preflush_control == "finalize";
  if (!should_preflush) {
    return Status::OK();
  }

  // The container is not marked read-only if FlushDataAsync() fails, since
  // the corresponding metadata has not been appended yet.
  return FlushDataAsync();
}
// Returns the block's current length, i.e. the number of bytes successfully
// appended so far.
size_t LogWritableBlock::BytesAppended() const {
return block_length_;
}
// Returns the block's current lifecycle state.
WritableBlock::State LogWritableBlock::state() const {
return state_;
}
// Performs the in-memory part of closing a block: updates metrics,
// finalizes the block if that has not happened yet, registers the block
// with the block manager and the container, and marks it CLOSED.
//
// A no-op if the block is already CLOSED. Crashes (CHECK) if the block
// manager refuses to register the block.
void LogWritableBlock::DoClose() {
  if (state_ == CLOSED) {
    return;
  }

  if (container_->metrics()) {
    container_->metrics()->generic_metrics.blocks_open_writing->Decrement();
    container_->metrics()->generic_metrics.total_bytes_written->IncrementBy(
        block_length_);
  }

  // A block still in CLEAN or DIRTY state never went through Finalize(), so
  // finalize it here.
  const bool never_finalized = (state_ == CLEAN || state_ == DIRTY);
  if (never_finalized) {
    container_->FinalizeBlock(block_offset_, block_length_);
  }

  scoped_refptr<LogBlock> lb = container_->block_manager()->AddLogBlock(
      container_, block_id_, block_offset_, block_length_);
  CHECK(lb);
  container_->BlockCreated(lb);
  state_ = CLOSED;
}
// Builds a CREATE record describing this block (ID, offset, length and the
// current time) and appends it to the container's metadata.
Status LogWritableBlock::AppendMetadata() {
  BlockRecordPB create_record;
  create_record.set_op_type(CREATE);
  create_record.set_offset(block_offset_);
  create_record.set_length(block_length_);
  create_record.set_timestamp_us(GetCurrentTimeMicros());
  id().CopyToPB(create_record.mutable_block_id());
  return container_->AppendMetadata(create_record);
}
////////////////////////////////////////////////////////////
// LogReadableBlock
////////////////////////////////////////////////////////////
// A log-backed block that has been opened for reading.
//
// Refers to a LogBlock representing the block's persisted metadata.
class LogReadableBlock : public ReadableBlock {
public:
// Opens 'log_block', which lives in 'container', for reading.
LogReadableBlock(LogBlockContainer* container,
const scoped_refptr<LogBlock>& log_block);
// Closes the block if it is still open.
virtual ~LogReadableBlock();
// Drops the reference to the underlying LogBlock. Only the first call has
// any effect; every call returns OK.
virtual Status Close() OVERRIDE;
// Returns the block's ID. Dereferences the underlying LogBlock, so it must
// not be called after Close().
virtual const BlockId& id() const OVERRIDE;
// Stores the block's length in bytes in 'sz'.
virtual Status Size(uint64_t* sz) const OVERRIDE;
// Reads into 'result' starting 'offset' bytes into the block; implemented
// in terms of ReadV().
virtual Status Read(uint64_t offset, Slice* result) const OVERRIDE;
// Vectored read starting 'offset' bytes into the block. Returns IOError if
// the requested range extends past the end of the block.
virtual Status ReadV(uint64_t offset, vector<Slice>* results) const OVERRIDE;
// Returns the heap footprint of this object as reported by
// kudu_malloc_usable_size().
virtual size_t memory_footprint() const OVERRIDE;
private:
// The owning container. Must outlive this block.
LogBlockContainer* container_;
// A reference to this block's metadata.
scoped_refptr<internal::LogBlock> log_block_;
// Whether or not this block has been closed. Close() is thread-safe, so
// this must be an atomic primitive.
AtomicBool closed_;
DISALLOW_COPY_AND_ASSIGN(LogReadableBlock);
};
// Takes a reference to 'log_block' and starts out open. When the container
// has metrics, the open-for-reading and total-readable-block counters are
// incremented.
LogReadableBlock::LogReadableBlock(LogBlockContainer* container,
const scoped_refptr<LogBlock>& log_block)
: container_(container),
log_block_(log_block),
closed_(false) {
if (container_->metrics()) {
container_->metrics()->generic_metrics.blocks_open_reading->Increment();
container_->metrics()->generic_metrics.total_readable_blocks->Increment();
}
}
// Closes the block on destruction; a failure to close would only be logged
// as a warning.
LogReadableBlock::~LogReadableBlock() {
WARN_NOT_OK(Close(), Substitute("Failed to close block $0",
id().ToString()));
}
// Closes the block. Only the call that flips 'closed_' from false to true
// releases the LogBlock reference and updates the metric; any other call
// does nothing. Always returns OK.
Status LogReadableBlock::Close() {
  if (!closed_.CompareAndSet(false, true)) {
    return Status::OK();
  }

  log_block_.reset();
  if (container_->metrics()) {
    container_->metrics()->generic_metrics.blocks_open_reading->Decrement();
  }
  return Status::OK();
}
// Returns the ID of the underlying LogBlock. Dereferences 'log_block_',
// which Close() resets, so this must not be called on a closed block.
const BlockId& LogReadableBlock::id() const {
return log_block_->block_id();
}
// Stores the block's length in '*sz' and returns OK. The block must not be
// closed (checked in debug builds).
Status LogReadableBlock::Size(uint64_t* sz) const {
DCHECK(!closed_.Load());
*sz = log_block_->length();
return Status::OK();
}
// Single-slice read, implemented as a ReadV() over a one-element vector
// holding a copy of '*result'.
Status LogReadableBlock::Read(uint64_t offset, Slice* result) const {
vector<Slice> results = { *result };
return ReadV(offset, &results);
}
// Vectored read of the slices in 'results', starting 'offset' bytes into
// this block.
//
// Returns IOError if the requested range does not lie entirely within the
// block. The block must not be closed (checked in debug builds). On success
// the trace counters and, when metrics are enabled, the total_bytes_read
// metric are updated.
Status LogReadableBlock::ReadV(uint64_t offset, vector<Slice>* results) const {
  DCHECK(!closed_.Load());

  // The running total is kept as size_t throughout; accumulating through an
  // 'int' parameter would narrow the sum on every step.
  const size_t read_length = accumulate(results->begin(), results->end(),
                                        static_cast<size_t>(0),
                                        [](size_t sum, const Slice& curr) {
                                          return sum + curr.size();
                                        });

  uint64_t read_offset = log_block_->offset() + offset;

  // Bounds check written so that it cannot wrap around: comparing against
  // 'offset + read_length' directly would overflow for a huge 'offset' and
  // let an out-of-bounds read through.
  const uint64_t block_length = log_block_->length();
  if (offset > block_length || read_length > block_length - offset) {
    return Status::IOError("Out-of-bounds read",
                           Substitute("read of [$0-$1) in block [$2-$3)",
                                      read_offset,
                                      read_offset + read_length,
                                      log_block_->offset(),
                                      log_block_->offset() + log_block_->length()));
  }

  // Time the read and record it in the trace counters.
  MicrosecondsInt64 start_time = GetMonoTimeMicros();
  RETURN_NOT_OK(container_->ReadVData(read_offset, results));
  MicrosecondsInt64 end_time = GetMonoTimeMicros();
  int64_t dur = end_time - start_time;
  TRACE_COUNTER_INCREMENT("lbm_read_time_us", dur);
  const char* counter = BUCKETED_COUNTER_NAME("lbm_reads", dur);
  TRACE_COUNTER_INCREMENT(counter, 1);

  if (container_->metrics()) {
    container_->metrics()->generic_metrics.total_bytes_read->IncrementBy(read_length);
  }
  return Status::OK();
}
// Returns the size reported by kudu_malloc_usable_size() for this object.
size_t LogReadableBlock::memory_footprint() const {
return kudu_malloc_usable_size(this);
}
} // namespace internal
////////////////////////////////////////////////////////////
// LogBlockManager
////////////////////////////////////////////////////////////
// Filename suffix of a container's metadata file (presumably appended to the
// container name; the naming code is outside this section).
const char* LogBlockManager::kContainerMetadataFileSuffix = ".metadata";
// Filename suffix of a container's data file. LogBlockContainer::ToString()
// strips it from the data file's name.
const char* LogBlockManager::kContainerDataFileSuffix = ".data";
// Per-container block limits keyed by filesystem block size in bytes; Open()
// consults them for data directories affected by KUDU-1508.
//
// These values were arrived at via experimentation. See commit 4923a74 for
// more details.
const map<int64_t, int64_t> LogBlockManager::kPerFsBlockSizeBlockLimits({
{ 1024, 673 },
{ 2048, 1353 },
{ 4096, 2721 }});
// Constructs a block manager on top of 'env', 'dd_manager' and
// 'error_manager', all of which must be non-null (DCHECK_NOTNULL).
//
// Memory is tracked by a "log_block_manager" tracker parented to
// 'opts.parent_mem_tracker', with -1 passed as the tracker's limit argument.
// Metrics are only created when 'opts.metric_entity' is set. No on-disk state
// is touched here; that happens in Open().
LogBlockManager::LogBlockManager(Env* env,
DataDirManager* dd_manager,
FsErrorManager* error_manager,
const BlockManagerOptions& opts)
: mem_tracker_(MemTracker::CreateTracker(-1,
"log_block_manager",
opts.parent_mem_tracker)),
dd_manager_(DCHECK_NOTNULL(dd_manager)),
error_manager_(DCHECK_NOTNULL(error_manager)),
file_cache_("lbm", env, GetFileCacheCapacityForBlockManager(env),
opts.metric_entity),
blocks_by_block_id_(10,
BlockMap::hasher(),
BlockMap::key_equal(),
BlockAllocator(mem_tracker_)),
env_(DCHECK_NOTNULL(env)),
read_only_(opts.read_only),
buggy_el6_kernel_(IsBuggyEl6Kernel(env->GetKernelRelease())),
next_block_id_(1) {
// A default-constructed BlockId is registered as the map's "deleted key".
blocks_by_block_id_.set_deleted_key(BlockId());
// HACK: when running in a test environment, we often instantiate many
// LogBlockManagers in the same process, eg corresponding to different
// tablet servers in a minicluster, or due to running many separate test
// cases of some CFile-related code. In that case, we need to make it more
// likely that the block IDs are not reused. So, instead of starting with
// block ID 1, we'll start with a random block ID. A collision is still
// possible, but exceedingly unlikely.
if (IsGTest()) {
Random r(GetRandomSeed32());
next_block_id_.Store(r.Next64());
}
if (opts.metric_entity) {
metrics_.reset(new internal::LogBlockManagerMetrics(opts.metric_entity));
}
}
// Tears the block manager down in dependency order: release the tracked
// memory, destroy the blocks, wait for outstanding data directory tasks,
// then destroy the containers.
LogBlockManager::~LogBlockManager() {
  // Give back all of the memory accounted to the managed blocks.
  int64_t tracked_bytes = 0;
  for (const auto& id_and_block : blocks_by_block_id_) {
    tracked_bytes += kudu_malloc_usable_size(id_and_block.second.get());
  }
  mem_tracker_->Release(tracked_bytes);

  // A LogBlock's destructor uses its container, so every LogBlock has to be
  // gone before any container is destroyed.
  blocks_by_block_id_.clear();

  // Containers may still have tasks running on the data directories; let
  // those finish before the containers are destroyed.
  dd_manager_->WaitOnClosures();

  STLDeleteValues(&all_containers_by_name_);
}
// Opens the block manager: initializes the file cache, establishes the
// per-directory block limits, and opens every non-failed data directory in
// parallel.
//
// If 'report' is non-null, the merged report of all successfully opened
// directories is moved into it. Otherwise the merged report is logged and
// checked for fatal errors.
//
// Returns IOError if every data directory has failed, and otherwise the
// first per-directory status that is neither OK nor a disk failure.
Status LogBlockManager::Open(FsReport* report) {
RETURN_NOT_OK(file_cache_.Init());
// Establish (and log) block limits for each data directory using kernel,
// filesystem, and gflags information.
for (const auto& dd : dd_manager_->data_dirs()) {
boost::optional<int64_t> limit;
if (FLAGS_log_container_max_blocks == -1) {
// No limit, unless this is KUDU-1508.
// The log block manager requires hole punching and, of the ext
// filesystems, only ext4 supports it. Thus, if this is an ext
// filesystem, it's ext4 by definition.
if (buggy_el6_kernel_ && dd->fs_type() == DataDirFsType::EXT) {
uint64_t fs_block_size =
dd->instance()->metadata()->filesystem_block_size_bytes();
bool untested_block_size =
!ContainsKey(kPerFsBlockSizeBlockLimits, fs_block_size);
string msg = Substitute(
"Data dir $0 is on an ext4 filesystem vulnerable to KUDU-1508 "
"with $1block size $2", dd->dir(),
untested_block_size ? "untested " : "", fs_block_size);
if (untested_block_size) {
LOG(WARNING) << msg;
} else {
LOG(INFO) << msg;
}
limit = LookupBlockLimit(fs_block_size);
}
} else if (FLAGS_log_container_max_blocks > 0) {
// Use the provided limit.
limit = FLAGS_log_container_max_blocks;
}
if (limit) {
LOG(INFO) << Substitute(
"Limiting containers on data directory $0 to $1 blocks",
dd->dir(), *limit);
}
// An entry is recorded for every directory, even when 'limit' is unset.
InsertOrDie(&block_limits_by_data_dir_, dd.get(), limit);
}
// One report and one status slot per data directory, indexed by the
// directory's position in data_dirs(). Both vectors are sized up front and
// never resized, so the pointers handed to the closures below stay valid.
vector<FsReport> reports(dd_manager_->data_dirs().size());
vector<Status> statuses(dd_manager_->data_dirs().size());
int i = -1;
for (const auto& dd : dd_manager_->data_dirs()) {
i++;
uint16_t uuid_idx;
CHECK(dd_manager_->FindUuidIndexByDataDir(dd.get(), &uuid_idx));
// TODO(awong): store Statuses for each directory in the directory manager
// so we can avoid these artificial Statuses.
if (dd_manager_->IsDataDirFailed(uuid_idx)) {
statuses[i] = Status::IOError("Data directory failed", "", EIO);
continue;
}
// Open the data dir asynchronously.
dd->ExecClosure(
Bind(&LogBlockManager::OpenDataDir,
Unretained(this),
dd.get(),
&reports[i],
&statuses[i]));
}
// Wait for the opens to complete.
for (const auto& dd : dd_manager_->data_dirs()) {
dd->WaitOnClosures();
}
if (dd_manager_->GetFailedDataDirs().size() == dd_manager_->data_dirs().size()) {
return Status::IOError("All data dirs failed to open", "", EIO);
}
// Ensure that no open failed without being handled.
//
// Currently only disk failures are handled. Reports from failed disks are
// unusable.
FsReport merged_report;
for (i = 0; i < statuses.size(); i++) {
const Status& s = statuses[i];
if (PREDICT_TRUE(s.ok())) {
merged_report.MergeFrom(reports[i]);
continue;
}
if (!s.IsDiskFailure()) {
return s;
}
LOG(ERROR) << Substitute("Not using report from $0: $1",
dd_manager_->data_dirs()[i]->dir(), s.ToString());
}
// Either return or log the report.
if (report) {
*report = std::move(merged_report);
} else {
RETURN_NOT_OK(merged_report.LogAndCheckForFatalErrors());
}
return Status::OK();
}
// Creates a new writable block and stores it in 'block'.
//
// The block is placed at the next block offset of a container obtained from
// GetOrCreateContainer(), and receives a freshly generated block ID. Must
// not be called on a read-only block manager (CHECK).
Status LogBlockManager::CreateBlock(const CreateBlockOptions& opts,
                                    unique_ptr<WritableBlock>* block) {
  CHECK(!read_only_);

  // Find a free container. If one cannot be found, create a new one.
  //
  // TODO(unknown): should we cap the number of outstanding containers and
  // force callers to block if we've reached it?
  LogBlockContainer* container;
  RETURN_NOT_OK(GetOrCreateContainer(opts, &container));

  // Generate a free block ID. Earlier versions used non-sequential block
  // IDs, so some IDs may already be claimed; keep drawing the next ID until
  // one is accepted.
  BlockId new_block_id;
  for (;;) {
    new_block_id.SetId(next_block_id_.Increment());
    if (TryUseBlockId(new_block_id)) {
      break;
    }
  }

  block->reset(new LogWritableBlock(container,
                                    new_block_id,
                                    container->next_block_offset()));
  VLOG(3) << "Created block " << (*block)->id() << " in container "
          << container->ToString();
  return Status::OK();
}
// Opens the block identified by 'block_id' for reading and stores it in
// 'block'. Returns NotFound if no such block is under management.
Status LogBlockManager::OpenBlock(const BlockId& block_id,
                                  unique_ptr<ReadableBlock>* block) {
  // Only the lookup happens under the lock; the reference taken here keeps
  // the LogBlock alive afterwards.
  scoped_refptr<LogBlock> found;
  {
    std::lock_guard<simple_spinlock> l(lock_);
    found = FindPtrOrNull(blocks_by_block_id_, block_id);
  }

  if (found) {
    block->reset(new internal::LogReadableBlock(found->container(),
                                                found.get()));
    VLOG(3) << "Opened block " << (*block)->id()
            << " from container " << found->container()->ToString();
    return Status::OK();
  }
  return Status::NotFound("Can't find block", block_id.ToString());
}
// Deletes the block identified by 'block_id'.
//
// Under the lock, the block is looked up and removed from the in-memory map.
// Outside the lock, it is marked deleted, the container's counters are
// updated, and a DELETE record is appended to the container's metadata
// (without an fsync).
//
// Returns NotFound if the block is unknown, IllegalState if its container is
// read-only, and IOError if it lives in a failed data directory; in all three
// cases the block stays in the map. Must not be called on a read-only block
// manager (CHECK).
Status LogBlockManager::DeleteBlock(const BlockId& block_id) {
CHECK(!read_only_);
// Deletion is forbidden for read-only container.
scoped_refptr<LogBlock> lb;
{
std::lock_guard<simple_spinlock> l(lock_);
auto it = blocks_by_block_id_.find(block_id);
if (it == blocks_by_block_id_.end()) {
return Status::NotFound("Can't find block", block_id.ToString());
}
lb = it->second;
if (lb->container()->read_only()) {
return Status::IllegalState(Substitute("container $0 is read-only",
lb->container()->ToString()));
}
// Return early if deleting a block in a failed directory.
set<uint16_t> failed_dirs = dd_manager_->GetFailedDataDirs();
if (PREDICT_FALSE(!failed_dirs.empty())) {
uint16_t uuid_idx;
CHECK(dd_manager_->FindUuidIndexByDataDir(lb->container()->data_dir(), &uuid_idx));
if (ContainsKey(failed_dirs, uuid_idx)) {
LOG_EVERY_N(INFO, 10) << Substitute("Block $0 is in a failed directory; not deleting",
block_id.ToString());
return Status::IOError("Block is in a failed directory");
}
}
// All checks passed: drop the block from the in-memory map. 'lb' still holds
// a reference, so the LogBlock outlives the critical section.
RemoveLogBlockUnlocked(it);
}
VLOG(3) << "Deleting block " << block_id;
lb->Delete();
lb->container()->BlockDeleted(lb);
// Record the on-disk deletion.
//
// TODO(unknown): what if this fails? Should we restore the in-memory block?
BlockRecordPB record;
block_id.CopyToPB(record.mutable_block_id());
record.set_op_type(DELETE);
record.set_timestamp_us(GetCurrentTimeMicros());
RETURN_NOT_OK_PREPEND(lb->container()->AppendMetadata(record),
"Unable to append deletion record to block metadata");
// We don't bother fsyncing the metadata append for deletes in order to avoid
// the disk overhead. Even if we did fsync it, we'd still need to account for
// garbage at startup time (in the event that we crashed just before the
// fsync).
//
// TODO(KUDU-829): Implement GC of orphaned blocks.
return Status::OK();
}
// Closes all of 'blocks', grouping them by container so that the blocks of
// one container are synced together.
//
// When --block_manager_preflush_control is "close", an asynchronous flush is
// requested for each block first. Returns the first failure encountered;
// containers not yet processed at that point are left untouched.
Status LogBlockManager::CloseBlocks(const std::vector<unique_ptr<WritableBlock>>& blocks) {
  VLOG(3) << "Closing " << blocks.size() << " blocks";

  unordered_map<LogBlockContainer*, vector<LogWritableBlock*>> blocks_by_container;
  for (const auto& writable : blocks) {
    LogWritableBlock* lwb = down_cast<LogWritableBlock*>(writable.get());

    if (FLAGS_block_manager_preflush_control == "close") {
      // Ask the kernel to start writing out this block's dirty data. Doing
      // so up front gives the kernel the chance to coalesce contiguous dirty
      // pages.
      RETURN_NOT_OK(lwb->FlushDataAsync());
    }
    blocks_by_container[lwb->container()].push_back(lwb);
  }

  // Close every group in SYNC mode. Syncing the blocks of a container
  // together cuts down on fsync() calls while still waiting for them to
  // become durable.
  for (const auto& container_and_blocks : blocks_by_container) {
    LogBlockContainer* container = container_and_blocks.first;
    RETURN_NOT_OK(container->DoCloseBlocks(container_and_blocks.second,
                                           LogBlockContainer::SyncMode::SYNC));
  }
  return Status::OK();
}
// Replaces the contents of 'block_ids' with the IDs in 'open_block_ids_'
// followed by the keys of 'blocks_by_block_id_'. Both are read under the
// lock. Always returns OK.
Status LogBlockManager::GetAllBlockIds(vector<BlockId>* block_ids) {
std::lock_guard<simple_spinlock> l(lock_);
block_ids->assign(open_block_ids_.begin(), open_block_ids_.end());
AppendKeysFromMap(blocks_by_block_id_, block_ids);
return Status::OK();
}
// Registers 'container' under its name and updates the container metrics.
// The caller must hold 'lock_'. Dies if a container with the same name is
// already registered (InsertOrDie).
void LogBlockManager::AddNewContainerUnlocked(LogBlockContainer* container) {
  DCHECK(lock_.is_locked());
  InsertOrDie(&all_containers_by_name_, container->ToString(), container);

  if (!metrics()) {
    return;
  }
  metrics()->containers->Increment();
  if (container->full()) {
    metrics()->full_containers->Increment();
  }
}
// Unregisters and destroys the full container named 'container_name', and
// updates the container metrics. The caller must hold 'lock_'.
//
// Crashes (CHECK) if no such container exists or if it is not full.
void LogBlockManager::RemoveFullContainerUnlocked(const string& container_name) {
  DCHECK(lock_.is_locked());

  // Taking ownership here means the container is destroyed when this
  // function returns.
  unique_ptr<LogBlockContainer> to_delete(EraseKeyReturnValuePtr(
      &all_containers_by_name_, container_name));
  CHECK(to_delete);
  CHECK(to_delete->full())
      << Substitute("Container $0 is not full", container_name);

  if (!metrics()) {
    return;
  }
  metrics()->containers->Decrement();
  metrics()->full_containers->Decrement();
}
// Hands out a container in the data directory chosen for 'opts'.
//
// An available container of that directory is reused when there is one;
// otherwise a new container is created, registered, and its directory is
// recorded as dirty. The result is stored in 'container'.
Status LogBlockManager::GetOrCreateContainer(const CreateBlockOptions& opts,
                                             LogBlockContainer** container) {
  DataDir* dir;
  RETURN_NOT_OK(dd_manager_->GetNextDataDir(opts, &dir));

  {
    std::lock_guard<simple_spinlock> l(lock_);
    auto& candidates = available_containers_by_data_dir_[DCHECK_NOTNULL(dir)];
    if (!candidates.empty()) {
      *container = candidates.front();
      candidates.pop_front();
      return Status::OK();
    }
  }

  // Every container of this directory is in use; create a new one.
  unique_ptr<LogBlockContainer> new_container;
  Status create_status = LogBlockContainer::Create(this, dir, &new_container);

  // A container could be created in a different directory instead, but there
  // is currently no point: on disk failure the tablet specified by 'opts' is
  // shut down, so the returned container would never be used.
  HANDLE_DISK_FAILURE(create_status, error_manager_->RunErrorNotificationCb(dir));
  RETURN_NOT_OK_PREPEND(create_status,
                        "Could not create new log block container at " + dir->dir());

  {
    std::lock_guard<simple_spinlock> l(lock_);
    dirty_dirs_.insert(dir->dir());
    AddNewContainerUnlocked(new_container.get());
  }

  // The container was registered in AddNewContainerUnlocked() above; give up
  // the unique_ptr's ownership and hand the raw pointer to the caller.
  *container = new_container.release();
  return Status::OK();
}
// Locking wrapper around MakeContainerAvailableUnlocked().
void LogBlockManager::MakeContainerAvailable(LogBlockContainer* container) {
std::lock_guard<simple_spinlock> l(lock_);
MakeContainerAvailableUnlocked(container);
}
// Puts 'container' at the front of its data directory's list of available
// containers, unless it is full or read-only. The caller must hold 'lock_'.
void LogBlockManager::MakeContainerAvailableUnlocked(LogBlockContainer* container) {
  DCHECK(lock_.is_locked());
  if (!container->full() && !container->read_only()) {
    available_containers_by_data_dir_[container->data_dir()].push_front(container);
  }
}
// Syncs the directory of 'container' if it is currently recorded as dirty
// and --enable_data_block_fsync is on; otherwise returns OK without syncing.
//
// The directory is removed from 'dirty_dirs_' up front and put back if the
// sync fails.
Status LogBlockManager::SyncContainer(const LogBlockContainer& container) {
  bool was_dirty = false;
  {
    std::lock_guard<simple_spinlock> l(lock_);
    was_dirty = dirty_dirs_.erase(container.data_dir()->dir());
  }

  if (!was_dirty || !FLAGS_enable_data_block_fsync) {
    return Status::OK();
  }

  if (metrics_) {
    metrics_->generic_metrics.total_disk_sync->Increment();
  }
  Status sync_status = env_->SyncDir(container.data_dir()->dir());
  if (sync_status.ok()) {
    return sync_status;
  }

  // The sync failed, so the directory has to go back into 'dirty_dirs_'.
  // Otherwise a later successful LogWritableBlock::Close() on this container
  // would not call SyncDir again, and the container might be lost on crash.
  //
  // In the worst case (another block synced this container as we did), it is
  // simply synced again needlessly.
  container.HandleError(sync_status);
  std::lock_guard<simple_spinlock> l(lock_);
  dirty_dirs_.insert(container.data_dir()->dir());
  return sync_status;
}
// Tries to reserve 'block_id' for a block that is about to be written.
//
// Returns false for a null ID, for an ID already under management, and for
// an ID already reserved; otherwise records the ID in 'open_block_ids_' and
// returns true.
bool LogBlockManager::TryUseBlockId(const BlockId& block_id) {
  if (block_id.IsNull()) {
    return false;
  }

  std::lock_guard<simple_spinlock> l(lock_);
  const bool already_managed = ContainsKey(blocks_by_block_id_, block_id);
  return !already_managed && InsertIfNotPresent(&open_block_ids_, block_id);
}
// Creates a LogBlock for the given container, ID, offset and length, and
// adds it to the set of managed blocks.
//
// Returns the new block, or a null pointer if a block with the same ID is
// already under management.
scoped_refptr<LogBlock> LogBlockManager::AddLogBlock(
    LogBlockContainer* container,
    const BlockId& block_id,
    int64_t offset,
    int64_t length) {
  std::lock_guard<simple_spinlock> l(lock_);
  scoped_refptr<LogBlock> lb(new LogBlock(container, block_id, offset, length));

  if (!AddLogBlockUnlocked(lb)) {
    // The block was rejected and is destroyed when this function returns, so
    // its memory must not be charged to the tracker: nothing would ever
    // release it again.
    return nullptr;
  }

  // Only a block that actually made it into the map is accounted for; the
  // matching Release() happens when the block is removed from the map.
  mem_tracker_->Consume(kudu_malloc_usable_size(lb.get()));
  return lb;
}
// Registers 'lb' in 'blocks_by_block_id_', keyed by its block ID.
//
// Returns false (and changes nothing else) if the ID is already present.
// On success the ID is also dropped from 'open_block_ids_' and, when metrics
// are enabled, the under-management gauges are bumped.
//
// The caller must already hold 'lock_' (asserted in debug builds).
bool LogBlockManager::AddLogBlockUnlocked(const scoped_refptr<LogBlock>& lb) {
  DCHECK(lock_.is_locked());

  const bool inserted = InsertIfNotPresent(&blocks_by_block_id_, lb->block_id(), lb);
  if (inserted) {
    VLOG(2) << Substitute("Added block: offset $0, length $1",
                          lb->offset(), lb->length());

    // The ID may still be listed as "open", e.g. when the block has only just
    // been written out.
    open_block_ids_.erase(lb->block_id());

    if (metrics()) {
      metrics()->blocks_under_management->Increment();
      metrics()->bytes_under_management->IncrementBy(lb->length());
    }
  }
  return inserted;
}
// Removes the entry referenced by 'it' from 'blocks_by_block_id_', returns
// the block's tracked memory to 'mem_tracker_' and, when metrics are enabled,
// lowers the under-management gauges.
//
// NOTE(review): going by the "Unlocked" suffix the caller presumably holds
// 'lock_'; this is not asserted here -- confirm against callers.
void LogBlockManager::RemoveLogBlockUnlocked(const BlockMap::iterator& it) {
  // Move the map's reference into a local first: the block is still needed
  // below, after its map entry is gone.
  scoped_refptr<LogBlock> removed(std::move(it->second));
  blocks_by_block_id_.erase(it);

  VLOG(2) << Substitute("Removed block: offset $0, length $1",
                        removed->offset(), removed->length());

  mem_tracker_->Release(kudu_malloc_usable_size(removed.get()));

  if (metrics()) {
    metrics()->blocks_under_management->Decrement();
    metrics()->bytes_under_management->DecrementBy(removed->length());
  }
}
// Opens every log block container found in data directory 'dir' and registers
// it (and its live blocks) with this block manager, then runs Repair() on the
// directory.
//
// For each container the function: opens it, processes its metadata records
// into container-local maps, checks live blocks for misalignment, classifies
// full containers (dead / low live-block ratio / excess disk space), updates
// the directory-local report statistics, and finally merges the live blocks
// into the manager-wide block map under 'lock_'.
//
// 'dir':           the data directory to scan.
// 'report':        out-parameter; receives the directory's FsReport, but only
//                  if the whole operation succeeded. It is left untouched on
//                  every failure path visible here.
// 'result_status': out-parameter; set to OK on success and to a descriptive
//                  error on each early-return path in this function.
void LogBlockManager::OpenDataDir(DataDir* dir,
FsReport* report,
Status* result_status) {
// All findings are collected in a local report and only published to
// '*report' at the very end.
FsReport local_report;
local_report.data_dirs.push_back(dir->dir());
// We are going to perform these checks.
//
// Note: this isn't necessarily the complete set of FsReport checks; there
// may be checks that the LBM cannot perform.
local_report.full_container_space_check.emplace();
local_report.incomplete_container_check.emplace();
local_report.malformed_record_check.emplace();
local_report.misaligned_block_check.emplace();
local_report.partial_record_check.emplace();
// Keep track of deleted blocks whose space hasn't been punched; they will
// be repunched during repair.
vector<scoped_refptr<internal::LogBlock>> need_repunching;
// Keep track of containers that have nothing but dead blocks; they will be
// deleted during repair.
vector<string> dead_containers;
// Keep track of containers whose live block ratio is low; their metadata
// files will be compacted during repair.
unordered_map<string, vector<BlockRecordPB>> low_live_block_containers;
// Find all containers and open them.
unordered_set<string> containers_seen;
vector<string> children;
Status s = env_->GetChildren(dir->dir(), &children);
if (!s.ok()) {
HANDLE_DISK_FAILURE(s, error_manager_->RunErrorNotificationCb(dir));
*result_status = s.CloneAndPrepend(Substitute(
"Could not list children of $0", dir->dir()));
return;
}
MonoTime last_opened_container_log_time = MonoTime::Now();
for (const string& child : children) {
// Only files carrying the container data or metadata suffix are considered;
// the suffix-stripped name identifies the container.
string container_name;
if (!TryStripSuffixString(
child, LogBlockManager::kContainerDataFileSuffix, &container_name) &&
!TryStripSuffixString(
child, LogBlockManager::kContainerMetadataFileSuffix, &container_name)) {
continue;
}
// Both the data file and the metadata file map to the same container name;
// open the container only the first time its name is seen.
if (!InsertIfNotPresent(&containers_seen, container_name)) {
continue;
}
unique_ptr<LogBlockContainer> container;
s = LogBlockContainer::Open(
this, dir, &local_report, container_name, &container);
if (s.IsAborted()) {
// Skip the container. Open() added a record of it to 'local_report' for us.
continue;
}
if (!s.ok()) {
*result_status = s.CloneAndPrepend(Substitute(
"Could not open container $0", container_name));
return;
}
// Process the records, building a container-local map for live blocks and
// a list of dead blocks.
//
// It's important that we don't try to add these blocks to the global map
// incrementally as we see each record, since it's possible that one container
// has a "CREATE <b>" while another has a "CREATE <b> ; DELETE <b>" pair.
// If we processed those two containers in this order, then upon processing
// the second container, we'd think there was a duplicate block. Building
// the container-local map first ensures that we discount deleted blocks
// before checking for duplicate IDs.
//
// NOTE: Since KUDU-1538, we allocate sequential block IDs, which makes reuse
// exceedingly unlikely. However, we might have old data which still exhibits
// the above issue.
UntrackedBlockMap live_blocks;
BlockRecordMap live_block_records;
vector<scoped_refptr<internal::LogBlock>> dead_blocks;
uint64_t max_block_id = 0;
s = container->ProcessRecords(&local_report,
&live_blocks,
&live_block_records,
&dead_blocks,
&max_block_id);
if (!s.ok()) {
*result_status = s.CloneAndPrepend(Substitute(
"Could not process records in container $0", container->ToString()));
return;
}
// With deleted blocks out of the way, check for misaligned blocks.
//
// We could also enforce that the record's offset is aligned with the
// underlying filesystem's block size, an invariant maintained by the log
// block manager. However, due to KUDU-1793, that invariant may have been
// broken, so we'll note but otherwise allow it.
for (const auto& e : live_blocks) {
if (PREDICT_FALSE(e.second->offset() %
container->instance()->filesystem_block_size_bytes() != 0)) {
local_report.misaligned_block_check->entries.emplace_back(
container->ToString(), e.first);
}
}
if (container->full()) {
// Full containers without any live blocks can be deleted outright.
//
// TODO(adar): this should be reported as an inconsistency once dead
// container deletion is also done in real time. Until then, it would be
// confusing to report it as such since it'll be a natural event at startup.
if (container->live_blocks() == 0) {
DCHECK(live_blocks.empty());
dead_containers.emplace_back(container->ToString());
} else if (static_cast<double>(container->live_blocks()) /
container->total_blocks() <= FLAGS_log_container_live_metadata_before_compact_ratio) {
// Metadata files of containers with very few live blocks will be compacted.
//
// TODO(adar): this should be reported as an inconsistency once
// container metadata compaction is also done in realtime. Until then,
// it would be confusing to report it as such since it'll be a natural
// event at startup.
// The live records are swapped (not copied) out of 'live_block_records',
// which is not read again in this iteration.
vector<BlockRecordPB> records(live_block_records.size());
int i = 0;
for (auto& e : live_block_records) {
records[i].Swap(&e.second);
i++;
}
// Sort the records such that their ordering reflects the ordering in
// the pre-compacted metadata file.
//
// This is preferred to storing the records in an order-preserving
// container (such as std::map) because while records are temporarily
// retained for every container, only some containers will actually
// undergo metadata compaction.
std::sort(records.begin(), records.end(),
[](const BlockRecordPB& a, const BlockRecordPB& b) {
// Sort by timestamp.
if (a.timestamp_us() != b.timestamp_us()) {
return a.timestamp_us() < b.timestamp_us();
}
// If the timestamps match, sort by offset.
//
// If the offsets also match (i.e. both blocks are of zero length),
// it doesn't matter which of the two records comes first.
return a.offset() < b.offset();
});
low_live_block_containers[container->ToString()] = std::move(records);
}
// Having processed the block records, let's check whether any full
// containers have any extra space (left behind after a crash or from an
// older version of Kudu).
//
// Filesystems are unpredictable beasts and may misreport the amount of
// space allocated to a file in various interesting ways. Some examples:
// - XFS's speculative preallocation feature may artificially enlarge the
// container's data file without updating its file size. This makes the
// file size untrustworthy for the purposes of measuring allocated space.
// See KUDU-1856 for more details.
// - On el6.6/ext4 a container data file that consumed ~32K according to
// its extent tree was actually reported as consuming an additional fs
// block (2k) of disk space. A similar container data file (generated
// via the same workload) on Ubuntu 16.04/ext4 did not exhibit this.
// The suspicion is that older versions of ext4 include interior nodes
// of the extent tree when reporting file block usage.
//
// To deal with these issues, our extra space cleanup code (deleted block
// repunching and container truncation) is gated on an "actual disk space
// consumed" heuristic. To prevent unnecessary triggering of the
// heuristic, we allow for some slop in our size measurements. The exact
// amount of slop is configurable via
// log_container_excess_space_before_cleanup_fraction.
//
// Too little slop and we'll do unnecessary work at startup. Too much and
// more unused space may go unreclaimed.
string data_filename = StrCat(container->ToString(), kContainerDataFileSuffix);
uint64_t reported_size;
s = env_->GetFileSizeOnDisk(data_filename, &reported_size);
if (!s.ok()) {
HANDLE_DISK_FAILURE(s, error_manager_->RunErrorNotificationCb(dir));
*result_status = s.CloneAndPrepend(Substitute(
"Could not get on-disk file size of container $0", container->ToString()));
return;
}
// NOTE(review): 'reported_size' is unsigned while the threshold is signed, so
// the comparison below relies on the threshold being non-negative.
int64_t cleanup_threshold_size = container->live_bytes_aligned() *
(1 + FLAGS_log_container_excess_space_before_cleanup_fraction);
if (reported_size > cleanup_threshold_size) {
local_report.full_container_space_check->entries.emplace_back(
container->ToString(), reported_size - container->live_bytes_aligned());
// If the container is to be deleted outright, don't bother repunching
// its blocks. The report entry remains, however, so it's clear that
// there was a space discrepancy.
if (container->live_blocks()) {
need_repunching.insert(need_repunching.end(),
dead_blocks.begin(), dead_blocks.end());
}
}
local_report.stats.lbm_full_container_count++;
}
local_report.stats.live_block_bytes += container->live_bytes();
local_report.stats.live_block_bytes_aligned += container->live_bytes_aligned();
local_report.stats.live_block_count += container->live_blocks();
local_report.stats.lbm_container_count++;
// Log number of containers opened every 10 seconds
MonoTime now = MonoTime::Now();
if ((now - last_opened_container_log_time).ToSeconds() > 10) {
LOG(INFO) << Substitute("Opened $0 log block containers in $1",
local_report.stats.lbm_container_count, dir->dir());
last_opened_container_log_time = now;
}
// Make sure newly allocated block IDs start above the largest ID seen in
// this container.
next_block_id_.StoreMax(max_block_id + 1);
// Under the lock, merge this map into the main block map and add
// the container.
{
std::lock_guard<simple_spinlock> l(lock_);
// To avoid cacheline contention during startup, we aggregate all of the
// memory in a local and add it to the mem-tracker in a single increment
// at the end of this loop.
int64_t mem_usage = 0;
for (const UntrackedBlockMap::value_type& e : live_blocks) {
if (!AddLogBlockUnlocked(e.second)) {
// TODO(adar): track as an inconsistency?
LOG(FATAL) << "Found duplicate CREATE record for block " << e.first
<< " which already is alive from another container when "
<< " processing container " << container->ToString();
}
mem_usage += kudu_malloc_usable_size(e.second.get());
}
mem_tracker_->Consume(mem_usage);
AddNewContainerUnlocked(container.get());
// The unique_ptr gives up ownership here; presumably the manager's container
// bookkeeping owns the container from this point on -- confirm.
MakeContainerAvailableUnlocked(container.release());
}
}
// Like the rest of Open(), repairs are performed per data directory to take
// advantage of parallelism.
s = Repair(dir,
&local_report,
std::move(need_repunching),
std::move(dead_containers),
std::move(low_live_block_containers));
if (!s.ok()) {
*result_status = s.CloneAndPrepend(Substitute(
"fatal error while repairing inconsistencies in data directory $0",
dir->dir()));
return;
}
// Publish the report only on full success.
*report = std::move(local_report);
*result_status = Status::OK();
}
// Evaluates 'status_expr' once, prepends 'msg' to the resulting status, and
// passes it to RETURN_NOT_OK_HANDLE_DISK_FAILURE together with an error
// notification call for 'dir' (presumably returning the status from the
// enclosing function when it is not OK -- see that macro's definition).
//
// The expansion site must have 'error_manager_' and a variable named 'dir' in
// scope.
//
// The definition intentionally does not end in a semicolon: the caller's own
// ';' completes the do/while(0), so the macro behaves as exactly one statement
// and stays safe inside an unbraced if/else.
#define RETURN_NOT_OK_LBM_DISK_FAILURE_PREPEND(status_expr, msg) do { \
  Status s_ = (status_expr); \
  s_ = s_.CloneAndPrepend(msg); \
  RETURN_NOT_OK_HANDLE_DISK_FAILURE(s_, error_manager_->RunErrorNotificationCb(dir)); \
} while (0)
// Evaluates 'status_expr' once, passes the resulting status to
// HANDLE_DISK_FAILURE together with an error notification call for 'dir', and
// then hands it to WARN_NOT_OK with 'msg'. Unlike the RETURN_* variant, this
// macro contains no return of its own.
//
// The expansion site must have 'error_manager_' and a variable named 'dir' in
// scope.
//
// The definition intentionally does not end in a semicolon: the caller's own
// ';' completes the do/while(0), so the macro behaves as exactly one statement
// and stays safe inside an unbraced if/else.
#define WARN_NOT_OK_LBM_DISK_FAILURE(status_expr, msg) do { \
  Status s_ = (status_expr); \
  HANDLE_DISK_FAILURE(s_, error_manager_->RunErrorNotificationCb(dir)); \
  WARN_NOT_OK(s_, msg); \
} while (0)
// Repairs the inconsistencies recorded for data directory 'dir'.
//
// The steps, in order:
//   1. Remove dead containers from the block manager and delete their files.
//   2. Truncate partial metadata records (failure here is returned as an error).
//   3. Delete incomplete container files (failures only warn).
//   4. Truncate excess preallocated space in full containers (failures only warn).
//   5. Repunch holes for the blocks in 'need_repunching'.
//   6. Rewrite ("compact") the metadata files of containers with few live
//      blocks, then sync the data directory.
//
// 'dir':                       the data directory being repaired; also picked
//                              up by name by the *_LBM_DISK_FAILURE macros.
// 'report':                    the directory's report; entries are marked
//                              'repaired' as the corresponding step succeeds.
// 'need_repunching':           deleted blocks whose space should be punched again.
// 'dead_containers':           names of containers to delete outright.
// 'low_live_block_containers': container name -> live records to keep when
//                              rewriting that container's metadata file.
//
// Returns OK without repairing anything if the block manager is read-only or
// if the report already contains fatal errors.
Status LogBlockManager::Repair(
DataDir* dir,
FsReport* report,
vector<scoped_refptr<internal::LogBlock>> need_repunching,
vector<string> dead_containers,
unordered_map<string, vector<BlockRecordPB>> low_live_block_containers) {
if (read_only_) {
LOG(INFO) << "Read-only block manager, skipping repair";
return Status::OK();
}
if (report->HasFatalErrors()) {
LOG(WARNING) << "Found fatal and irreparable errors, skipping repair";
return Status::OK();
}
// From here on out we're committed to repairing.
// Fetch all the containers we're going to need.
unordered_map<std::string, internal::LogBlockContainer*> containers_by_name;
{
std::lock_guard<simple_spinlock> l(lock_);
// Remove all of the dead containers from the block manager. They will be
// deleted from disk shortly thereafter, outside of the lock.
for (const auto& d : dead_containers) {
RemoveFullContainerUnlocked(d);
}
// Fetch all the containers we're going to need.
//
// Containers no longer found in 'all_containers_by_name_' are simply left out
// of 'containers_by_name'; the steps below treat a missing entry as "the
// container was deleted outright".
if (report->partial_record_check) {
for (const auto& pr : report->partial_record_check->entries) {
LogBlockContainer* c = FindPtrOrNull(all_containers_by_name_,
pr.container);
if (c) {
containers_by_name[pr.container] = c;
}
}
}
if (report->full_container_space_check) {
for (const auto& fcp : report->full_container_space_check->entries) {
LogBlockContainer* c = FindPtrOrNull(all_containers_by_name_,
fcp.container);
if (c) {
containers_by_name[fcp.container] = c;
}
}
}
for (const auto& e : low_live_block_containers) {
LogBlockContainer* c = FindPtrOrNull(all_containers_by_name_,
e.first);
if (c) {
containers_by_name[e.first] = c;
}
}
}
// Delete all dead containers.
//
// After the deletions, the data directory is sync'ed to reduce the chance
// of a data file existing without its corresponding metadata file (or vice
// versa) in the event of a crash. The block manager would treat such a case
// as corruption and require manual intervention.
//
// TODO(adar) the above is not fool-proof; a crash could manifest in between
// any pair of deletions. That said, the odds of it happening are incredibly
// rare, and manual resolution isn't hard (just delete the existing file).
int64_t deleted_metadata_bytes = 0;
for (const auto& d : dead_containers) {
string data_file_name = StrCat(d, kContainerDataFileSuffix);
string metadata_file_name = StrCat(d, kContainerMetadataFileSuffix);
// The metadata size is gathered purely for the summary log line below; a
// failure to read it only produces a warning.
uint64_t metadata_size;
Status s = env_->GetFileSize(metadata_file_name, &metadata_size);
if (s.ok()) {
deleted_metadata_bytes += metadata_size;
} else {
WARN_NOT_OK_LBM_DISK_FAILURE(s,
"Could not get size of dead container metadata file " + metadata_file_name);
}
WARN_NOT_OK_LBM_DISK_FAILURE(file_cache_.DeleteFile(data_file_name),
"Could not delete dead container data file " + data_file_name);
WARN_NOT_OK_LBM_DISK_FAILURE(file_cache_.DeleteFile(metadata_file_name),
"Could not delete dead container metadata file " + metadata_file_name);
}
if (!dead_containers.empty()) {
WARN_NOT_OK_LBM_DISK_FAILURE(env_->SyncDir(dir->dir()), "Could not sync data directory");
LOG(INFO) << Substitute("Deleted $0 dead containers ($1 metadata bytes)",
dead_containers.size(), deleted_metadata_bytes);
}
// Truncate partial metadata records.
//
// This is a fatal inconsistency; if the repair fails, we cannot proceed.
if (report->partial_record_check) {
for (auto& pr : report->partial_record_check->entries) {
unique_ptr<RWFile> file;
RWFileOptions opts;
opts.mode = Env::OPEN_EXISTING;
internal::LogBlockContainer* container = FindPtrOrNull(containers_by_name,
pr.container);
if (!container) {
// The container was deleted outright.
pr.repaired = true;
continue;
}
RETURN_NOT_OK_LBM_DISK_FAILURE_PREPEND(
env_->NewRWFile(opts,
StrCat(pr.container, kContainerMetadataFileSuffix),
&file),
"could not reopen container to truncate partial metadata record");
RETURN_NOT_OK_LBM_DISK_FAILURE_PREPEND(file->Truncate(pr.offset),
"could not truncate partial metadata record");
// Technically we've "repaired" the inconsistency if the truncation
// succeeded, even if the following logic fails.
pr.repaired = true;
RETURN_NOT_OK_LBM_DISK_FAILURE_PREPEND(file->Close(),
"could not close container after truncating partial metadata record");
// Reopen the PB writer so that it will refresh its metadata about the
// underlying file and resume appending to the new end of the file.
RETURN_NOT_OK_LBM_DISK_FAILURE_PREPEND(container->ReopenMetadataWriter(),
"could not reopen container metadata file");
}
}
// Delete any incomplete container files.
//
// This is a non-fatal inconsistency; we can just as easily ignore the
// leftover container files.
if (report->incomplete_container_check) {
for (auto& ic : report->incomplete_container_check->entries) {
// A NotFound result is tolerated silently for either file; the entry is
// marked repaired regardless of whether the deletions succeeded.
Status s = env_->DeleteFile(
StrCat(ic.container, kContainerMetadataFileSuffix));
if (!s.ok() && !s.IsNotFound()) {
WARN_NOT_OK_LBM_DISK_FAILURE(s, "could not delete incomplete container metadata file");
}
s = env_->DeleteFile(StrCat(ic.container, kContainerDataFileSuffix));
if (!s.ok() && !s.IsNotFound()) {
WARN_NOT_OK_LBM_DISK_FAILURE(s, "could not delete incomplete container data file");
}
ic.repaired = true;
}
}
// Truncate any excess preallocated space in full containers.
//
// This is a non-fatal inconsistency; we can just as easily ignore the extra
// disk space consumption.
if (report->full_container_space_check) {
for (auto& fcp : report->full_container_space_check->entries) {
internal::LogBlockContainer* container = FindPtrOrNull(containers_by_name,
fcp.container);
if (!container) {
// The container was deleted outright.
fcp.repaired = true;
continue;
}
Status s = container->TruncateDataToNextBlockOffset();
if (s.ok()) {
fcp.repaired = true;
}
// NOTE(review): this uses plain WARN_NOT_OK rather than the disk-failure
// macro used elsewhere; presumably TruncateDataToNextBlockOffset() deals with
// disk failures itself -- confirm.
WARN_NOT_OK(s, "could not truncate excess preallocated space");
}
}
// Repunch all requested holes. Any excess space reclaimed was already
// tracked by LBMFullContainerSpaceCheck.
//
// TODO(adar): can be more efficient (less fs work and more space reclamation
// in case of misaligned blocks) via hole coalescing first, but this is easy.
for (const auto& b : need_repunching) {
b->Delete();
}
// Clearing this vector drops the last references to the LogBlocks within,
// triggering the repunching operations.
need_repunching.clear();
// "Compact" metadata files with few live blocks by rewriting them with only
// the live block records.
int64_t metadata_files_compacted = 0;
int64_t metadata_bytes_delta = 0;
for (const auto& e : low_live_block_containers) {
internal::LogBlockContainer* container = FindPtrOrNull(containers_by_name,
e.first);
if (!container) {
// The container was deleted outright.
continue;
}
// Rewrite this metadata file. Failures are non-fatal.
// 'file_bytes_delta' is only assigned by a successful RewriteMetadataFile()
// and is not read on the failure path below.
int64_t file_bytes_delta;
const auto& meta_path = StrCat(e.first, kContainerMetadataFileSuffix);
Status s = RewriteMetadataFile(*container, e.second, &file_bytes_delta);
if (!s.ok()) {
WARN_NOT_OK(s, "could not rewrite metadata file");
continue;
}
// However, we're hosed if we can't open the new metadata file.
RETURN_NOT_OK_PREPEND(container->ReopenMetadataWriter(),
"could not reopen new metadata file");
metadata_files_compacted++;
metadata_bytes_delta += file_bytes_delta;
VLOG(1) << "Compacted metadata file " << meta_path
<< " (saved " << file_bytes_delta << " bytes)";
}
// The data directory can be synchronized once for all of the new metadata files.
//
// Non-disk failures are fatal: if a new metadata file doesn't durably exist
// in the data directory, it would be unsafe to append new block records to
// it. This is because after a crash the old metadata file may appear
// instead, and that file lacks the newly appended block records.
//
// TODO(awong): The below will only be true with persistent disk states.
// Disk failures do not suffer from this issue because, on the next startup,
// the entire directory will not be used.
if (metadata_files_compacted > 0) {
Status s = env_->SyncDir(dir->dir());
RETURN_NOT_OK_LBM_DISK_FAILURE_PREPEND(s, "Could not sync data directory");
LOG(INFO) << Substitute("Compacted $0 metadata files ($1 metadata bytes)",
metadata_files_compacted, metadata_bytes_delta);
}
return Status::OK();
}
// Replaces the metadata file of 'container' with a freshly written file that
// holds exactly 'records', in the order given.
//
// The new contents are written to a temporary file which is then renamed over
// the original, so a failure at any step leaves the original file in place.
// On success '*file_bytes_delta' is set to (old file size - new file size).
// On failure the first error is returned (via the disk-failure macro) and
// '*file_bytes_delta' is not written.
Status LogBlockManager::RewriteMetadataFile(const LogBlockContainer& container,
                                            const vector<BlockRecordPB>& records,
                                            int64_t* file_bytes_delta) {
  // The error-handling macro used throughout this function refers to a
  // variable literally named 'dir', so this local must keep that name.
  //
  // NOTE(review): this holds the data directory's path as returned by dir();
  // an earlier comment here described it as the directory's UUID. Confirm
  // which of the two RunErrorNotificationCb() actually expects.
  const string dir = container.data_dir()->dir();
  const string metadata_file_name = StrCat(container.ToString(), kContainerMetadataFileSuffix);

  uint64_t size_before;
  RETURN_NOT_OK_LBM_DISK_FAILURE_PREPEND(env_->GetFileSize(metadata_file_name, &size_before),
                                         "could not get size of old metadata file");

  // Write the live records into a temporary file next to the original. The
  // scoped deleter removes the temporary file on any early return below;
  // temporary files that still get left behind are cleaned up by the
  // FsManager at startup.
  string tmp_template = metadata_file_name + kTmpInfix + ".XXXXXX";
  string tmp_file_name;
  unique_ptr<RWFile> tmp_file;
  RETURN_NOT_OK_LBM_DISK_FAILURE_PREPEND(env_->NewTempRWFile(RWFileOptions(), tmp_template,
                                                             &tmp_file_name, &tmp_file),
                                         "could not create temporary metadata file");
  env_util::ScopedFileDeleter tmp_deleter(env_, tmp_file_name);

  WritablePBContainerFile writer(std::move(tmp_file));
  RETURN_NOT_OK_LBM_DISK_FAILURE_PREPEND(writer.CreateNew(BlockRecordPB()),
                                         "could not initialize temporary metadata file");
  for (const auto& record : records) {
    RETURN_NOT_OK_LBM_DISK_FAILURE_PREPEND(writer.Append(record),
                                           "could not append to temporary metadata file");
  }
  RETURN_NOT_OK_LBM_DISK_FAILURE_PREPEND(writer.Sync(),
                                         "could not sync temporary metadata file");
  RETURN_NOT_OK_LBM_DISK_FAILURE_PREPEND(writer.Close(),
                                         "could not close temporary metadata file");

  uint64_t size_after;
  RETURN_NOT_OK_LBM_DISK_FAILURE_PREPEND(env_->GetFileSize(tmp_file_name, &size_after),
                                         "could not get file size of temporary metadata file");

  // Swap the new file into place.
  RETURN_NOT_OK_LBM_DISK_FAILURE_PREPEND(env_->RenameFile(tmp_file_name, metadata_file_name),
                                         "could not rename temporary metadata file");

  // Drop the old path from the file cache so that reopening the metadata file
  // for write cannot hit a cached descriptor that still points at the old,
  // now-deleted version.
  file_cache_.Invalidate(metadata_file_name);

  // The temporary name no longer needs cleaning up.
  tmp_deleter.Cancel();

  *file_bytes_delta = (static_cast<int64_t>(size_before) - size_after);
  return Status::OK();
}
// Test-only accessor: returns whatever 'container' reports via ToString().
std::string LogBlockManager::ContainerPathForTests(internal::LogBlockContainer* container) {
  std::string container_path(container->ToString());
  return container_path;
}
bool LogBlockManager::IsBuggyEl6Kernel(const string& kernel_release) {
autodigit_less lt;
// Only el6 is buggy.
if (kernel_release.find("el6") == string::npos) return false;
// Kernels in the 6.8 update stream (2.6.32-642.a.b) are fixed
// for a >= 15.
//
// https://rhn.redhat.com/errata/RHSA-2017-0307.html
if (MatchPattern(kernel_release, "2.6.32-642.*.el6.*") &&
lt("2.6.32-642.15.0", kernel_release)) {
return false;
}
// If the kernel older is than 2.6.32-674 (el6.9), it's buggy.
return lt(kernel_release, "2.6.32-674");
}
// Looks up the per-container block limit for a filesystem block size of
// 'fs_block_size' in kPerFsBlockSizeBlockLimits, using the entry found by
// FindFloorOrNull().
//
// If there is no such entry, the table's first entry is returned instead.
int64_t LogBlockManager::LookupBlockLimit(int64_t fs_block_size) {
  const int64_t* floor_limit = FindFloorOrNull(kPerFsBlockSizeBlockLimits,
                                               fs_block_size);
  if (floor_limit == nullptr) {
    // The block size must have been smaller than the very first key. Fall back
    // to the first recorded entry and hope for the best.
    return kPerFsBlockSizeBlockLimits.begin()->second;
  }
  return *floor_limit;
}
} // namespace fs
} // namespace kudu