| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "kudu/fs/log_block_manager.h" |
| |
| #include <algorithm> |
| #include <atomic> |
| #include <cerrno> |
| #include <cstddef> |
| #include <cstdint> |
| #include <functional> |
| #include <map> |
| #include <memory> |
| #include <mutex> |
| #include <numeric> |
| #include <optional> |
| #include <ostream> |
| #include <set> |
| #include <string> |
| #include <type_traits> |
| #include <unordered_map> |
| #include <unordered_set> |
| #include <vector> |
| |
| #include <gflags/gflags.h> |
| #include <glog/logging.h> |
| |
| #include "kudu/fs/block_manager_metrics.h" |
| #include "kudu/fs/data_dirs.h" |
| #include "kudu/fs/dir_manager.h" |
| #include "kudu/fs/dir_util.h" |
| #include "kudu/fs/error_manager.h" |
| #include "kudu/fs/fs.pb.h" |
| #include "kudu/fs/fs_report.h" |
| #include "kudu/gutil/casts.h" |
| #include "kudu/gutil/map-util.h" |
| #include "kudu/gutil/port.h" |
| #include "kudu/gutil/strings/numbers.h" |
| #include "kudu/gutil/strings/strcat.h" |
| #include "kudu/gutil/strings/strip.h" |
| #include "kudu/gutil/strings/substitute.h" |
| #include "kudu/gutil/strings/util.h" |
| #include "kudu/gutil/walltime.h" |
| #include "kudu/util/alignment.h" |
| #include "kudu/util/array_view.h" |
| #include "kudu/util/env.h" |
| #include "kudu/util/fault_injection.h" |
| #include "kudu/util/file_cache.h" |
| #include "kudu/util/flag_tags.h" |
| #include "kudu/util/flag_validators.h" |
| #include "kudu/util/locks.h" |
| #include "kudu/util/malloc.h" |
| #include "kudu/util/metrics.h" |
| #include "kudu/util/path_util.h" |
| #include "kudu/util/pb_util.h" |
| #include "kudu/util/random.h" |
| #include "kudu/util/random_util.h" |
| #include "kudu/util/rw_mutex.h" |
| #include "kudu/util/scoped_cleanup.h" |
| #include "kudu/util/slice.h" |
| #include "kudu/util/sorted_disjoint_interval_list.h" |
| #include "kudu/util/stopwatch.h" |
| #include "kudu/util/test_util_prod.h" |
| #include "kudu/util/threadpool.h" |
| #include "kudu/util/trace.h" |
| |
| DECLARE_bool(enable_data_block_fsync); |
| DECLARE_string(block_manager_preflush_control); |
| |
| // TODO(unknown): How should this be configured? Should provide some guidance. |
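| // The limit is soft: fullness is only evaluated between blocks, so the last |
| // block written to a container may push its data file past this size. |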
| DEFINE_uint64(log_container_max_size, 10LU * 1024 * 1024 * 1024, |
| "Maximum size (soft) of a log container"); |
| TAG_FLAG(log_container_max_size, advanced); |
| |
| DEFINE_uint64(log_container_metadata_max_size, 0, |
| "Maximum size (soft) of a log container's metadata. Use 0 for " |
| "no limit."); |
| TAG_FLAG(log_container_metadata_max_size, advanced); |
| TAG_FLAG(log_container_metadata_max_size, experimental); |
| TAG_FLAG(log_container_metadata_max_size, runtime); |
| |
| DEFINE_bool(log_container_metadata_runtime_compact, false, |
| "Whether to enable metadata file compaction at runtime."); |
| TAG_FLAG(log_container_metadata_runtime_compact, advanced); |
| TAG_FLAG(log_container_metadata_runtime_compact, experimental); |
| TAG_FLAG(log_container_metadata_runtime_compact, runtime); |
| |
| DEFINE_int64(log_container_max_blocks, -1, |
| "Maximum number of blocks (soft) of a log container. Use 0 for " |
| "no limit. Use -1 for no limit except in the case of a kernel " |
| "bug with hole punching on ext4 (see KUDU-1508 for details)."); |
| TAG_FLAG(log_container_max_blocks, advanced); |
| |
| DEFINE_uint64(log_container_preallocate_bytes, 32LU * 1024 * 1024, |
| "Number of bytes to preallocate in a log container when " |
| "creating new blocks. Set to 0 to disable preallocation"); |
| TAG_FLAG(log_container_preallocate_bytes, advanced); |
| |
| DEFINE_double(log_container_excess_space_before_cleanup_fraction, 0.10, |
| "Additional fraction of a log container's calculated size that " |
| "must be consumed on disk before the container is considered to " |
| "be inconsistent and subject to excess space cleanup at block " |
| "manager startup."); |
| TAG_FLAG(log_container_excess_space_before_cleanup_fraction, advanced); |
| |
| DEFINE_double(log_container_live_metadata_before_compact_ratio, 0.50, |
| "Desired ratio of live block metadata in log containers. If a " |
| "container's live to total block ratio dips below this value, " |
| "the container's metadata file will be compacted at startup, or at " |
| "runtime if --log_container_metadata_runtime_compact is enabled and " |
| "the threshold set by " |
| "--log_container_metadata_size_before_compact_ratio is reached."); |
| TAG_FLAG(log_container_live_metadata_before_compact_ratio, experimental); |
| |
| DEFINE_double(log_container_metadata_size_before_compact_ratio, 0.80, |
| "Desired portion of --log_container_metadata_max_size that a container's " |
| "metadata must consume before the metadata is considered for compaction. " |
| "If a container's metadata file exceeds --log_container_metadata_max_size * " |
| "--log_container_metadata_size_before_compact_ratio, and the ratio of live " |
| "to total blocks falls below " |
| "--log_container_live_metadata_before_compact_ratio, the container's " |
| "metadata will be compacted."); |
| TAG_FLAG(log_container_metadata_size_before_compact_ratio, advanced); |
| TAG_FLAG(log_container_metadata_size_before_compact_ratio, experimental); |
| |
| DEFINE_bool(log_block_manager_test_hole_punching, true, |
| "Ensure hole punching is supported by the underlying filesystem"); |
| TAG_FLAG(log_block_manager_test_hole_punching, advanced); |
| TAG_FLAG(log_block_manager_test_hole_punching, unsafe); |
| |
| DEFINE_bool(log_block_manager_delete_dead_container, true, |
| "When enabled, full and dead log block containers will be deleted " |
| "at runtime, which can potentially help improve log block manager " |
| "startup time"); |
| TAG_FLAG(log_block_manager_delete_dead_container, advanced); |
| TAG_FLAG(log_block_manager_delete_dead_container, experimental); |
| |
| DEFINE_int32(log_container_metadata_rewrite_inject_latency_ms, 0, |
| "Amount of latency in ms to inject when rewriting the metadata file. " |
| "Only for testing."); |
| TAG_FLAG(log_container_metadata_rewrite_inject_latency_ms, hidden); |
| |
| METRIC_DEFINE_gauge_uint64(server, log_block_manager_bytes_under_management, |
| "Bytes Under Management", |
| kudu::MetricUnit::kBytes, |
| "Number of bytes of data blocks currently under management", |
| kudu::MetricLevel::kInfo); |
| |
| METRIC_DEFINE_gauge_uint64(server, log_block_manager_blocks_under_management, |
| "Blocks Under Management", |
| kudu::MetricUnit::kBlocks, |
| "Number of data blocks currently under management", |
| kudu::MetricLevel::kInfo); |
| |
| METRIC_DEFINE_gauge_uint64(server, log_block_manager_containers, |
| "Number of Block Containers", |
| kudu::MetricUnit::kLogBlockContainers, |
| "Number of log block containers", |
| kudu::MetricLevel::kInfo); |
| |
| METRIC_DEFINE_gauge_uint64(server, log_block_manager_full_containers, |
| "Number of Full Block Containers", |
| kudu::MetricUnit::kLogBlockContainers, |
| "Number of full log block containers", |
| kudu::MetricLevel::kInfo); |
| |
| METRIC_DEFINE_counter(server, log_block_manager_holes_punched, |
| "Number of Holes Punched", |
| kudu::MetricUnit::kHoles, |
| "Number of holes punched since service start", |
| kudu::MetricLevel::kDebug); |
| |
| METRIC_DEFINE_counter(server, log_block_manager_dead_containers_deleted, |
| "Number of Dead Block Containers Deleted", |
| kudu::MetricUnit::kLogBlockContainers, |
| "Number of full (but dead) block containers that were deleted", |
| kudu::MetricLevel::kDebug); |
| |
| METRIC_DEFINE_gauge_uint64(server, log_block_manager_total_containers_startup, |
| "Total number of Log Block Containers during startup", |
| kudu::MetricUnit::kLogBlockContainers, |
| "Number of log block containers which were present during the server " |
| "startup", |
| kudu::MetricLevel::kInfo); |
| |
| METRIC_DEFINE_gauge_uint64(server, log_block_manager_processed_containers_startup, |
| "Number of Log Block Containers opened during startup", |
| kudu::MetricUnit::kLogBlockContainers, |
| "Number of log block containers which were opened/processed during " |
| "the server startup", |
| kudu::MetricLevel::kInfo); |
| |
| using kudu::fs::internal::LogBlock; |
| using kudu::fs::internal::LogBlockContainer; |
| using kudu::fs::internal::LogBlockDeletionTransaction; |
| using kudu::fs::internal::LogWritableBlock; |
| using kudu::pb_util::ReadablePBContainerFile; |
| using kudu::pb_util::WritablePBContainerFile; |
| using std::accumulate; |
| using std::map; |
| using std::optional; |
| using std::set; |
| using std::shared_ptr; |
| using std::string; |
| using std::unique_ptr; |
| using std::unordered_map; |
| using std::unordered_set; |
| using std::vector; |
| using strings::Substitute; |
| |
| namespace kudu { |
| |
| namespace fs { |
| |
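| // Validates the interplay of the metadata compaction flags: enabling |
| // runtime compaction requires a positive |
| // --log_container_metadata_size_before_compact_ratio. |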
| bool ValidateMetadataCompactFlags() { |
| if (FLAGS_log_container_metadata_runtime_compact && |
| FLAGS_log_container_metadata_size_before_compact_ratio <= 0) { |
| LOG(ERROR) << Substitute( |
| "--log_container_metadata_size_before_compact_ratio ($0) must be greater than 0 when " |
| "enabling --log_container_metadata_runtime_compact", |
| FLAGS_log_container_metadata_size_before_compact_ratio); |
| return false; |
| } |
| return true; |
| } |
| GROUP_FLAG_VALIDATOR(metadata_compact_flags, ValidateMetadataCompactFlags); |
| |
| namespace internal { |
| |
| //////////////////////////////////////////////////////////// |
| // LogBlockManagerMetrics |
| //////////////////////////////////////////////////////////// |
| |
| // Metrics container associated with the log block manager. |
| // |
| // Includes implementation-agnostic metrics as well as some that are |
| // specific to the log block manager. |
| struct LogBlockManagerMetrics { |
| explicit LogBlockManagerMetrics(const scoped_refptr<MetricEntity>& metric_entity); |
| |
| // Implementation-agnostic metrics. |
| BlockManagerMetrics generic_metrics; |
| |
| scoped_refptr<AtomicGauge<uint64_t>> bytes_under_management; |
| scoped_refptr<AtomicGauge<uint64_t>> blocks_under_management; |
| |
| scoped_refptr<AtomicGauge<uint64_t>> containers; |
| scoped_refptr<AtomicGauge<uint64_t>> full_containers; |
| |
| scoped_refptr<AtomicGauge<uint64_t>> total_containers_startup; |
| scoped_refptr<AtomicGauge<uint64_t>> processed_containers_startup; |
| |
| scoped_refptr<Counter> holes_punched; |
| scoped_refptr<Counter> dead_containers_deleted; |
| }; |
| |
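| // Convenience macros for the constructor's initialization list below: |
| // MINIT instantiates a counter metric, while GINIT instantiates a gauge |
| // metric with an initial value of zero. |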
| #define MINIT(x) x(METRIC_log_block_manager_##x.Instantiate(metric_entity)) |
| #define GINIT(x) x(METRIC_log_block_manager_##x.Instantiate(metric_entity, 0)) |
| LogBlockManagerMetrics::LogBlockManagerMetrics(const scoped_refptr<MetricEntity>& metric_entity) |
| : generic_metrics(metric_entity), |
| GINIT(bytes_under_management), |
| GINIT(blocks_under_management), |
| GINIT(containers), |
| GINIT(full_containers), |
| GINIT(total_containers_startup), |
| GINIT(processed_containers_startup), |
| MINIT(holes_punched), |
| MINIT(dead_containers_deleted) { |
| } |
| #undef GINIT |
| #undef MINIT |
| |
| //////////////////////////////////////////////////////////// |
| // LogBlock (declaration) |
| //////////////////////////////////////////////////////////// |
| |
| // The persistent metadata that describes a logical block. |
| // |
| // A LogBlock is created for a block when the block's data has been |
| // synchronized with the disk. At that point the block is fully immutable |
| // (i.e. none of its metadata can change), and it becomes readable and |
| // persistent. |
| // |
| // LogBlocks are reference counted to simplify support for deletion with |
| // outstanding readers. All refcount increments are performed with the |
| // block manager lock held, as are deletion-based decrements. However, |
| // no lock is held when ~LogReadableBlock decrements the refcount, thus it |
| // must be made thread safe (by extending RefCountedThreadSafe instead of |
| // the simpler RefCounted). |
| class LogBlock : public RefCountedThreadSafe<LogBlock> { |
| public: |
| LogBlock(LogBlockContainerRefPtr container, BlockId block_id, int64_t offset, |
| int64_t length); |
| ~LogBlock() = default; |
| |
| const BlockId& block_id() const { return block_id_; } |
| LogBlockContainer* container() const { return container_.get(); } |
| int64_t offset() const { return offset_; } |
| int64_t length() const { return length_; } |
| |
| // Returns a block's length aligned to the nearest filesystem block size. |
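| // For example, assuming a 4096-byte filesystem block size, a 5000-byte |
| // block has an fs-aligned length of 8192 bytes (rounded up to cover whole |
| // filesystem blocks, e.g. for hole punching). |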
| int64_t fs_aligned_length() const; |
| |
| // Registers the deletion of the block with a deletion transaction. Actual |
| // deletion will take place when the deletion transaction is destructed. |
| void RegisterDeletion(const shared_ptr<LogBlockDeletionTransaction>& transaction); |
| |
| private: |
| // The owning container. |
| LogBlockContainerRefPtr container_; |
| |
| // The block identifier. |
| const BlockId block_id_; |
| |
| // The block's offset in the container. |
| const int64_t offset_; |
| |
| // The block's length. |
| const int64_t length_; |
| |
| // The block deletion transaction with which this block has been registered. |
| shared_ptr<LogBlockDeletionTransaction> transaction_; |
| |
| DISALLOW_COPY_AND_ASSIGN(LogBlock); |
| }; |
| |
| //////////////////////////////////////////////////////////// |
| // LogWritableBlock (declaration) |
| //////////////////////////////////////////////////////////// |
| |
| // A log-backed block that has been opened for writing. |
| // |
| // There's no reference to a LogBlock as this block has yet to be |
| // persisted. |
| class LogWritableBlock : public WritableBlock { |
| public: |
| LogWritableBlock(LogBlockContainerRefPtr container, BlockId block_id, |
| int64_t block_offset); |
| |
| ~LogWritableBlock(); |
| |
| Status Close() override; |
| |
| Status Abort() override; |
| |
| const BlockId& id() const override; |
| |
| BlockManager* block_manager() const override; |
| |
| Status Append(const Slice& data) override; |
| |
| Status AppendV(ArrayView<const Slice> data) override; |
| |
| Status Finalize() override; |
| |
| size_t BytesAppended() const override; |
| |
| State state() const override; |
| |
| // Actually close the block, finalizing it if it has not yet been |
| // finalized. Also updates various metrics. |
| // |
| // This is called after synchronization of dirty data and metadata |
| // to disk. |
| void DoClose(); |
| |
| // Starts an asynchronous flush of dirty block data to disk. |
| Status FlushDataAsync(); |
| |
| // Write this block's metadata to disk. |
| // |
| // Does not synchronize the written data; that takes place in Close(). |
| Status AppendMetadata(); |
| |
| LogBlockContainer* container() const { return container_.get(); } |
| |
| private: |
| // The owning container. |
| LogBlockContainerRefPtr container_; |
| |
| // The block's identifier. |
| const BlockId block_id_; |
| |
| // The block's offset within the container. Known from the moment the |
| // block is created. |
| const int64_t block_offset_; |
| |
| // The block's length. Changes with each Append(). |
| int64_t block_length_; |
| |
| // The state of the block describing where it is in the write lifecycle, |
| // for example, has it been synchronized to disk? |
| WritableBlock::State state_; |
| |
| DISALLOW_COPY_AND_ASSIGN(LogWritableBlock); |
| }; |
| |
| //////////////////////////////////////////////////////////// |
| // LogBlockContainer |
| //////////////////////////////////////////////////////////// |
| |
| // A single block container belonging to the log-backed block manager. |
| // |
| // A container may only be used to write one WritableBlock at a given time. |
| // However, existing blocks may be deleted concurrently. As such, almost |
| // all container functions must be reentrant, even if the container itself |
| // is logically thread unsafe (i.e. multiple clients calling WriteData() |
| // concurrently will produce nonsensical container data). Thread unsafe |
| // functions are marked explicitly. |
| // |
| // Block containers are reference counted so that they can be safely removed |
| // despite concurrent access. |
| class LogBlockContainer: public RefCountedThreadSafe<LogBlockContainer> { |
| public: |
| enum SyncMode { |
| SYNC, |
| NO_SYNC |
| }; |
| |
| // Creates a new block container in 'dir'. |
| static Status Create(LogBlockManager* block_manager, |
| Dir* dir, |
| LogBlockContainerRefPtr* container); |
| |
| // Opens an existing block container in 'dir'. |
| // |
| // Every container is composed of two files: "<dir>/<id>.data" and |
| // "<dir>/<id>.metadata". Together, 'dir' and 'id' fully describe both |
| // files. |
| // |
| // Returns Status::Aborted() in the case that the metadata and data files |
| // both appear to have no data (e.g. due to a crash just after creating |
| // one of them but before writing any records). This is recorded in 'report'. |
| static Status Open(LogBlockManager* block_manager, |
| Dir* dir, |
| FsReport* report, |
| const string& id, |
| LogBlockContainerRefPtr* container); |
| |
| // The destructor deletes this container's files if the container is dead. |
| ~LogBlockContainer(); |
| |
| // Closes a set of blocks belonging to this container, possibly synchronizing |
| // the dirty data and metadata to disk. |
| // |
| // If successful, adds all blocks to the block manager's in-memory maps. |
| Status DoCloseBlocks(const vector<LogWritableBlock*>& blocks, SyncMode mode); |
| |
| // Frees the space associated with a block or a group of blocks at 'offset' |
| // and 'length'. This is a physical operation, not a logical one; a separate |
| // AppendMetadata() is required to record the deletion in container metadata. |
| // |
| // The on-disk effects of this call are made durable only after SyncData(). |
| Status PunchHole(int64_t offset, int64_t length); |
| |
| // Asynchronously executes a hole punching operation at 'offset' with the |
| // given 'length'. |
| void ContainerDeletionAsync(int64_t offset, int64_t length); |
| |
| // Preallocate enough space to ensure that an append of 'next_append_length' |
| // can be satisfied by this container. The offset of the beginning of this |
| // block must be provided in 'block_start_offset' (since container |
| // bookkeeping is only updated when a block is finished). |
| // |
| // Does nothing if preallocation is disabled. |
| Status EnsurePreallocated(int64_t block_start_offset, |
| size_t next_append_length); |
| |
| // See RWFile::Write() |
| Status WriteData(int64_t offset, const Slice& data); |
| |
| // See RWFile::WriteV() |
| Status WriteVData(int64_t offset, ArrayView<const Slice> data); |
| |
| // See RWFile::Read(). |
| Status ReadData(int64_t offset, Slice result) const; |
| |
| // See RWFile::ReadV(). |
| Status ReadVData(int64_t offset, ArrayView<Slice> results) const; |
| |
| // Appends 'pb' to this container's metadata file. |
| // |
| // The on-disk effects of this call are made durable only after SyncMetadata(). |
| Status AppendMetadata(const BlockRecordPB& pb); |
| |
| // Asynchronously flush this container's data file from 'offset' through |
| // to 'length'. |
| // |
| // Does not guarantee data durability; use SyncData() for that. |
| Status FlushData(int64_t offset, int64_t length); |
| |
| // Synchronize this container's data file with the disk. On success, |
| // guarantees that the data is made durable. |
| // |
| // TODO(unknown): Add support to synchronize just a range. |
| Status SyncData(); |
| |
| // Synchronize this container's metadata file with the disk. On success, |
| // guarantees that the metadata is made durable. |
| // |
| // TODO(unknown): Add support to synchronize just a range. |
| Status SyncMetadata(); |
| |
| // Reopen the metadata file record writer. Should be called if the underlying |
| // file was changed. |
| Status ReopenMetadataWriter(); |
| |
| // Truncates this container's data file to 'next_block_offset_' if it is |
| // full. This effectively removes any preallocated but unused space. |
| // |
| // Should be called only when 'next_block_offset_' is up-to-date with |
| // respect to the data on disk (i.e. after the container's records have |
| // been loaded), otherwise data may be lost! |
| // |
| // This function is thread unsafe. |
| Status TruncateDataToNextBlockOffset(); |
| |
| // Whether to update internal counters based on processed records. This may be |
| // useful to avoid recomputing container statistics during operations that |
| // don't change them, e.g. compacting container metadata. |
| enum class ProcessRecordType { |
| kReadOnly, // Read records only. |
| kReadAndUpdate, // Read records and update the container's statistics. |
| }; |
| // Reads the container's metadata from disk, sanity checking and processing |
| // records along the way. |
| // |
| // Malformed records and other container inconsistencies are written to |
| // 'report'. Healthy blocks are written either to 'live_blocks' or |
| // 'dead_blocks'. Live records are written to 'live_block_records'. The |
| // greatest block ID seen thus far in the container is written to 'max_block_id'. |
| // |
| // Returns an error only if there was a problem accessing the container from |
| // disk; such errors are fatal and effectively halt processing immediately. |
| Status ProcessRecords( |
| FsReport* report, |
| LogBlockManager::UntrackedBlockMap* live_blocks, |
| LogBlockManager::BlockRecordMap* live_block_records, |
| vector<LogBlockRefPtr>* dead_blocks, |
| uint64_t* max_block_id, |
| ProcessRecordType type); |
| |
| // Updates internal bookkeeping state to reflect the creation of a block. |
| void BlockCreated(const LogBlockRefPtr& block); |
| |
| // Updates internal bookkeeping state to reflect the deletion of a block. |
| // |
| // This function is thread safe because block deletions can happen concurrently |
| // with creations. |
| // |
| // Note: the container is not made "unfull"; containers remain sparse until deleted. |
| void BlockDeleted(const LogBlockRefPtr& block); |
| |
| // Finalizes a fully written block: updates the container data file's position, |
| // truncates the container if it is full, and marks the container as available. |
| void FinalizeBlock(int64_t block_offset, int64_t block_length); |
| |
| // Runs a task on this container's data directory thread pool. |
| // |
| // Normally the task is performed asynchronously. However, if submission to |
| // the pool fails, it runs synchronously on the current thread. |
| void ExecClosure(const std::function<void()>& task); |
| |
| // Produces a debug-friendly string representation of this container. |
| string ToString() const; |
| |
| // Makes the container read-only and stores the responsible error. |
| void SetReadOnly(const Status& error); |
| |
| // Handles errors if the input status is not OK. |
| void HandleError(const Status& s) const; |
| |
| // Returns whether or not the container has been marked read-only. |
| bool read_only() const { |
| return !read_only_status().ok(); |
| } |
| |
| // Returns the error that caused the container to become read-only, or OK if |
| // the container has not been marked read-only. |
| Status read_only_status() const { |
| std::lock_guard<simple_spinlock> l(read_only_lock_); |
| return read_only_status_; |
| } |
| |
| // Simple accessors. |
| LogBlockManager* block_manager() const { return block_manager_; } |
| int64_t next_block_offset() const { return next_block_offset_.Load(); } |
| int64_t total_bytes() const { return total_bytes_.Load(); } |
| int64_t total_blocks() const { return total_blocks_.Load(); } |
| int64_t live_bytes() const { return live_bytes_.Load(); } |
| int64_t live_bytes_aligned() const { return live_bytes_aligned_.Load(); } |
| int64_t live_blocks() const { return live_blocks_.Load(); } |
| int32_t blocks_being_written() const { return blocks_being_written_.Load(); } |
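| // Returns true if no more blocks may be written to this container: the data |
| // file has reached --log_container_max_size, the block count has reached the |
| // per-directory limit (if any), or the metadata file has reached |
| // --log_container_metadata_max_size (if that limit is enabled). |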
| bool full() const { |
| if (next_block_offset() >= FLAGS_log_container_max_size || |
| (max_num_blocks_ && total_blocks() >= max_num_blocks_)) { |
| return true; |
| } |
| |
| if (FLAGS_log_container_metadata_max_size <= 0) { |
| return false; |
| } |
| |
| // Try to take the lock before reading the metadata offset; if the lock |
| // cannot be acquired, consider the container not full. |
| shared_lock<RWMutex> l(metadata_compact_lock_, std::try_to_lock); |
| if (!l.owns_lock()) { |
| return false; |
| } |
| return metadata_file_->Offset() >= FLAGS_log_container_metadata_max_size; |
| } |
| |
| bool dead() const { return dead_.Load(); } |
| const LogBlockManagerMetrics* metrics() const { return metrics_; } |
| Dir* data_dir() const { return data_dir_; } |
| const DirInstanceMetadataPB* instance() const { return data_dir_->instance()->metadata(); } |
| |
| // Adjusts the number of blocks being written by 'value'. |
| // A positive value increases the count; a negative value decreases it. |
| int32_t blocks_being_written_incr(int32_t value) { |
| return blocks_being_written_.IncrementBy(value); |
| } |
| |
| // Checks whether the container meets the death condition. |
| // |
| // Although this code looks like a TOCTOU violation, it is safe because of |
| // some additional LBM invariants: |
| // 1) When a container becomes full, it stays full for the process' lifetime. |
| // 2) A full container will never accrue another live block. Meaning, losing |
| // its last live block is a terminal state for a full container. |
| // 3) The only exception to #2 is if the container currently has a finalized |
| // but not-yet-closed WritableBlock. In this case the container became full |
| // when the WritableBlock was finalized, but the live block counter only |
| // reflects the new block when it is closed. |
| bool check_death_condition() const { |
| return (full() && live_blocks() == 0 && blocks_being_written() == 0 && |
| FLAGS_log_block_manager_delete_dead_container); |
| } |
| |
| // Tries to mark the container as 'dead', which means it will be deleted |
| // when it goes out of scope. Can only be set dead once. |
| // |
| // If successful, returns true; otherwise returns false. |
| bool TrySetDead() { |
| if (dead()) return false; |
| return dead_.CompareAndSet(false, true); |
| } |
| |
| bool ShouldCompact() const { |
| shared_lock<RWMutex> l(metadata_compact_lock_); |
| return ShouldCompactUnlocked(); |
| } |
| |
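| // A worked example with hypothetical flag settings (the metadata size limit |
| // is disabled by default): with --log_container_metadata_max_size=10485760, |
| // --log_container_metadata_size_before_compact_ratio=0.8, and |
| // --log_container_live_metadata_before_compact_ratio=0.5, a container is a |
| // compaction candidate once its metadata file exceeds 8388608 bytes |
| // (10485760 * 0.8) and fewer than half of its blocks are still live. |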
| bool ShouldCompactUnlocked() const { |
| DCHECK_GT(FLAGS_log_container_metadata_max_size, 0); |
| if (live_blocks() >= |
| total_blocks() * FLAGS_log_container_live_metadata_before_compact_ratio) { |
| return false; |
| } |
| |
| return metadata_file_->Offset() >= FLAGS_log_container_metadata_max_size * |
| FLAGS_log_container_metadata_size_before_compact_ratio; |
| } |
| |
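| // Compacts the container's metadata file by rewriting it to contain only |
| // live block records. Invoked at runtime when ShouldCompact() returns true; |
| // on failure, the container is marked read-only. |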
| void CompactMetadata(); |
| |
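| // Flattens 'live_block_records' into a vector of records ordered for |
| // rewriting the container's metadata file during compaction. |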
| static std::vector<BlockRecordPB> SortRecords(LogBlockManager::BlockRecordMap live_block_records); |
| |
| private: |
| LogBlockContainer(LogBlockManager* block_manager, Dir* data_dir, |
| unique_ptr<WritablePBContainerFile> metadata_file, |
| shared_ptr<RWFile> data_file); |
| |
| // Checks whether the container's files are in a consistent state. |
| // |
| // OK: both files of the container exist; |
| // Aborted: the container will be repaired later; |
| // NotFound: one file of the container has gone missing; |
| // *-------------*------------*----------------*-------------------------*---------------------* |
| // |DATA\METADATA| NONE EXIST | EXIST && < MIN | EXIST && NO LIVE BLOCKS | EXIST && LIVE BLOCKS| |
| // *-------------*------------*----------------*-------------------------*---------------------* |
| // | NONE EXIST | | Aborted | Aborted | NotFound | |
| // *-------------*------------*----------------*-------------------------*---------------------* |
| // | EXIST && 0 | Aborted | Aborted | OK | OK | |
| // *-------------*------------*----------------*-------------------------*---------------------* |
| // | EXIST && >0 | NotFound | OK | OK | OK | |
| // *-------------*------------*----------------*-------------------------*---------------------* |
| // |
| // Note: the returned status represents only the result of the check. |
| static Status CheckContainerFiles(LogBlockManager* block_manager, |
| FsReport* report, |
| const Dir* dir, |
| const string& common_path, |
| const string& data_path, |
| const string& metadata_path); |
| |
| // Processes a single block record, performing sanity checks on it and adding |
| // it either to 'live_blocks' or 'dead_blocks'. If the record is live, it is |
| // added to 'live_block_records'. |
| // |
| // Returns an error only if there was a problem accessing the container from |
| // disk; such errors are fatal and effectively halt processing immediately. |
| // |
| // On success, 'report' is updated with any inconsistencies found in the |
| // record, 'data_file_size' may be updated with the latest size of the |
| // container's data file, and 'max_block_id' reflects the largest block ID |
| // seen thus far in the container. |
| // |
| // Note: 'record' may be swapped into 'report'; do not use it after calling |
| // this function. |
| Status ProcessRecord( |
| BlockRecordPB* record, |
| FsReport* report, |
| LogBlockManager::UntrackedBlockMap* live_blocks, |
| LogBlockManager::BlockRecordMap* live_block_records, |
| vector<LogBlockRefPtr>* dead_blocks, |
| uint64_t* data_file_size, |
| uint64_t* max_block_id, |
| ProcessRecordType type); |
| |
| // Updates this container data file's position based on the offset and length |
| // of a block, marking this container as full if needed. Should only be called |
| // when a block is fully written, or after an encryption header is written, as |
| // it will round up the container data file's position. |
| // |
| // This function is thread unsafe. |
| void UpdateNextBlockOffset(int64_t block_offset, int64_t block_length); |
| |
| // The owning block manager. Must outlive the container itself. |
| LogBlockManager* const block_manager_; |
| |
| // The data directory where the container lives. |
| Dir* data_dir_; |
| |
| const optional<int64_t> max_num_blocks_; |
| |
| // Offset up to which we have preallocated bytes. |
| int64_t preallocated_offset_ = 0; |
| |
| // Protects 'metadata_file_'. Only rewriting needs to take the write lock; |
| // appending and syncing need only the read lock, because |
| // WritablePBContainerFile serializes those operations with an internal lock. |
| mutable RWMutex metadata_compact_lock_; |
| // Opened file handles to the container's files. |
| unique_ptr<WritablePBContainerFile> metadata_file_; |
| shared_ptr<RWFile> data_file_; |
| |
| // The offset of the next block to be written to the container. |
| AtomicInt<int64_t> next_block_offset_; |
| |
| // The amount of data (post block alignment) written thus far to the container. |
| AtomicInt<int64_t> total_bytes_; |
| |
| // TODO(yingchun): add metadata bytes for metadata. |
| //AtomicInt<int64_t> metadata_bytes_; |
| |
| // The number of blocks written thus far in the container. |
| AtomicInt<int64_t> total_blocks_; |
| |
| // The amount of data present in not-yet-deleted blocks of the container. |
| AtomicInt<int64_t> live_bytes_; |
| |
| // The amount of data (post block alignment) present in not-yet-deleted |
| // blocks of the container. |
| AtomicInt<int64_t> live_bytes_aligned_; |
| |
| // The number of not-yet-deleted blocks in the container. |
| AtomicInt<int64_t> live_blocks_; |
| |
| // The number of LogWritableBlocks currently open for this container. |
| AtomicInt<int32_t> blocks_being_written_; |
| |
| // Whether or not this container has been marked as dead. |
| AtomicBool dead_; |
| |
| // The metrics. Not owned by the log container; it has the same lifespan |
| // as the block manager. |
| const LogBlockManagerMetrics* metrics_; |
| |
| // If true, only read operations are allowed. Existing blocks may |
| // not be deleted until the next restart, and new blocks may not |
| // be added. |
| mutable simple_spinlock read_only_lock_; |
| Status read_only_status_; |
| |
| DISALLOW_COPY_AND_ASSIGN(LogBlockContainer); |
| }; |
| |
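| // Evaluates 'status_expr' once, triggers the block manager's disk error |
| // notification if the status indicates a disk failure, and logs 'msg' as a |
| // warning if the status is not OK. For use inside LogBlockContainer methods |
| // ('block_manager_' and 'data_dir_' must be in scope). |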
| #define CONTAINER_DISK_FAILURE(status_expr, msg) do { \ |
| Status s_ = (status_expr); \ |
| HANDLE_DISK_FAILURE(s_, block_manager_->error_manager_->RunErrorNotificationCb( \ |
| ErrorHandlerType::DISK_ERROR, data_dir_)); \ |
| WARN_NOT_OK(s_, msg); \ |
| } while (0) |
| |
| LogBlockContainer::LogBlockContainer( |
| LogBlockManager* block_manager, |
| Dir* data_dir, |
| unique_ptr<WritablePBContainerFile> metadata_file, |
| shared_ptr<RWFile> data_file) |
| : block_manager_(block_manager), |
| data_dir_(data_dir), |
| max_num_blocks_(FindOrDie(block_manager->block_limits_by_data_dir_, |
| data_dir)), |
| metadata_compact_lock_(RWMutex::Priority::PREFER_READING), |
| metadata_file_(std::move(metadata_file)), |
| data_file_(std::move(data_file)), |
| next_block_offset_(0), |
| total_bytes_(0), |
| total_blocks_(0), |
| live_bytes_(0), |
| live_bytes_aligned_(0), |
| live_blocks_(0), |
| blocks_being_written_(0), |
| dead_(false), |
| metrics_(block_manager->metrics()) { |
| // If we have an encryption header, we need to align the next offset to the |
| // next file system block. |
| if (auto encryption_header_size = data_file_->GetEncryptionHeaderSize(); |
| encryption_header_size > 0) { |
| UpdateNextBlockOffset(0, encryption_header_size); |
| live_bytes_.Store(encryption_header_size); |
| total_bytes_.Store(next_block_offset_.Load()); |
| live_bytes_aligned_.Store(next_block_offset_.Load()); |
| } |
| } |
| |
| LogBlockContainer::~LogBlockContainer() { |
| if (dead()) { |
| CHECK(!block_manager_->opts_.read_only); |
| string data_file_name = data_file_->filename(); |
| string metadata_file_name = metadata_file_->filename(); |
| string data_failure_msg = |
| "Could not delete dead container data file " + data_file_name; |
| string metadata_failure_msg = |
| "Could not delete dead container metadata file " + metadata_file_name; |
| if (PREDICT_TRUE(block_manager_->file_cache_)) { |
| CONTAINER_DISK_FAILURE(block_manager_->file_cache_->DeleteFile(data_file_name), |
| data_failure_msg); |
| CONTAINER_DISK_FAILURE(block_manager_->file_cache_->DeleteFile(metadata_file_name), |
| metadata_failure_msg); |
| } else { |
| CONTAINER_DISK_FAILURE(block_manager_->env_->DeleteFile(data_file_name), |
| data_failure_msg); |
| CONTAINER_DISK_FAILURE(block_manager_->env_->DeleteFile(metadata_file_name), |
| metadata_failure_msg); |
| } |
| } |
| } |
| |
| void LogBlockContainer::HandleError(const Status& s) const { |
| HANDLE_DISK_FAILURE(s, |
| block_manager()->error_manager()->RunErrorNotificationCb(ErrorHandlerType::DISK_ERROR, |
| data_dir_)); |
| } |
| |
| void LogBlockContainer::CompactMetadata() { |
| SCOPED_LOG_SLOW_EXECUTION(WARNING, 5, Substitute("CompactMetadata $0", ToString())); |
| // To reduce overhead, skip compaction if the lock cannot be acquired; the |
| // metadata is either being compacted already or will be compacted next time. |
| std::unique_lock<RWMutex> l(metadata_compact_lock_, std::try_to_lock); |
| if (!l.owns_lock()) { |
| return; |
| } |
| // Check again, in case the metadata was compacted while we were waiting. |
| if (!ShouldCompactUnlocked()) { |
| return; |
| } |
| |
| FsReport report; |
| report.full_container_space_check.emplace(); |
| report.incomplete_container_check.emplace(); |
| report.malformed_record_check.emplace(); |
| report.misaligned_block_check.emplace(); |
| report.partial_record_check.emplace(); |
| |
| LogBlockManager::UntrackedBlockMap live_blocks; |
| LogBlockManager::BlockRecordMap live_block_records; |
| vector<LogBlockRefPtr> dead_blocks; |
| uint64_t max_block_id = 0; |
| |
| Status s; |
| SCOPED_CLEANUP({ |
| if (!s.ok()) { |
| // Make container read-only to forbid further writes in case of failure. |
| // Because the on-disk state may contain partial/incomplete data/metadata at |
| // this point, it is not safe to either overwrite it or append to it. |
| SetReadOnly(s); |
| } |
| }); |
| s = ProcessRecords( |
| &report, &live_blocks, &live_block_records, &dead_blocks, &max_block_id, |
| ProcessRecordType::kReadOnly); // Container statistics are already up to date; no need to update them again. |
| if (!s.ok()) { |
| WARN_NOT_OK(s, Substitute("Could not process records in container $0", ToString())); |
| return; |
| } |
| |
| vector<BlockRecordPB> records = SortRecords(std::move(live_block_records)); |
| int64_t file_bytes_delta; |
| s = block_manager_->RewriteMetadataFile(*this, records, &file_bytes_delta); |
| if (!s.ok()) { |
| WARN_NOT_OK(s, Substitute("Could not rewrite container metadata file $0", ToString())); |
| return; |
| } |
| |
| // However, we're hosed if we can't open the new metadata file. |
| s = ReopenMetadataWriter(); |
| if (!s.ok()) { |
| WARN_NOT_OK(s, Substitute("Could not reopen new metadata file $0", ToString())); |
| return; |
| } |
| VLOG(1) << "Compacted metadata file " << ToString() |
| << " (saved " << file_bytes_delta << " bytes)"; |
| |
| total_blocks_.Store(live_blocks.size()); |
| live_blocks_.Store(live_blocks.size()); |
| } |
| |
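| // Returns from the enclosing function on error, first triggering the block |
| // manager's disk error notification if the status indicates a disk failure. |
| // Assumes 'block_manager' and 'dir' are in scope at the call site. |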
| #define RETURN_NOT_OK_CONTAINER_DISK_FAILURE(status_expr) do { \ |
| RETURN_NOT_OK_HANDLE_DISK_FAILURE((status_expr), \ |
| block_manager->error_manager()->RunErrorNotificationCb(ErrorHandlerType::DISK_ERROR, dir)); \ |
| } while (0) |
| |
| Status LogBlockContainer::Create(LogBlockManager* block_manager, |
| Dir* dir, |
| LogBlockContainerRefPtr* container) { |
| string common_path; |
| string metadata_path; |
| string data_path; |
| Status metadata_status; |
| Status data_status; |
| shared_ptr<RWFile> metadata_writer; |
| shared_ptr<RWFile> data_file; |
| |
| // Repeat in the event of a container id collision (unlikely). |
| // |
| // When looping, we delete any created-and-orphaned files. |
| do { |
| common_path = JoinPathSegments(dir->dir(), |
| block_manager->oid_generator()->Next()); |
| metadata_path = StrCat(common_path, LogBlockManager::kContainerMetadataFileSuffix); |
| data_path = StrCat(common_path, LogBlockManager::kContainerDataFileSuffix); |
| |
| if (PREDICT_TRUE(block_manager->file_cache_)) { |
| if (metadata_writer) { |
| WARN_NOT_OK(block_manager->file_cache_->DeleteFile(metadata_path), |
| "could not delete orphaned metadata file through file cache"); |
| } |
| if (data_file) { |
| WARN_NOT_OK(block_manager->file_cache_->DeleteFile(data_path), |
| "could not delete orphaned data file through file cache"); |
| } |
| metadata_status = block_manager->file_cache_->OpenFile<Env::MUST_CREATE>( |
| metadata_path, &metadata_writer); |
| data_status = block_manager->file_cache_->OpenFile<Env::MUST_CREATE>( |
| data_path, &data_file); |
| } else { |
| if (metadata_writer) { |
| WARN_NOT_OK(block_manager->env()->DeleteFile(metadata_path), |
| "could not delete orphaned metadata file"); |
| } |
| if (data_file) { |
| WARN_NOT_OK(block_manager->env()->DeleteFile(data_path), |
| "could not delete orphaned data file"); |
| } |
| unique_ptr<RWFile> rwf; |
| RWFileOptions rw_opts; |
| |
| rw_opts.mode = Env::MUST_CREATE; |
| rw_opts.is_sensitive = true; |
| metadata_status = block_manager->env()->NewRWFile( |
| rw_opts, metadata_path, &rwf); |
| metadata_writer.reset(rwf.release()); |
| data_status = block_manager->env()->NewRWFile( |
| rw_opts, data_path, &rwf); |
| data_file.reset(rwf.release()); |
| } |
| } while (PREDICT_FALSE(metadata_status.IsAlreadyPresent() || |
| data_status.IsAlreadyPresent())); |
| if (metadata_status.ok() && data_status.ok()) { |
| unique_ptr<WritablePBContainerFile> metadata_file(new WritablePBContainerFile( |
| std::move(metadata_writer))); |
| RETURN_NOT_OK_CONTAINER_DISK_FAILURE(metadata_file->CreateNew(BlockRecordPB())); |
| container->reset(new LogBlockContainer(block_manager, |
| dir, |
| std::move(metadata_file), |
| std::move(data_file))); |
| VLOG(1) << "Created log block container " << (*container)->ToString(); |
| } |
| |
| // Prefer metadata status (arbitrarily). |
| FsErrorManager* em = block_manager->error_manager(); |
| HANDLE_DISK_FAILURE(metadata_status, em->RunErrorNotificationCb( |
| ErrorHandlerType::DISK_ERROR, dir)); |
| HANDLE_DISK_FAILURE(data_status, em->RunErrorNotificationCb(ErrorHandlerType::DISK_ERROR, dir)); |
| return !metadata_status.ok() ? metadata_status : data_status; |
| } |
| |
| Status LogBlockContainer::Open(LogBlockManager* block_manager, |
| Dir* dir, |
| FsReport* report, |
| const string& id, |
| LogBlockContainerRefPtr* container) { |
| string common_path = JoinPathSegments(dir->dir(), id); |
| string data_path = StrCat(common_path, LogBlockManager::kContainerDataFileSuffix); |
| string metadata_path = StrCat(common_path, LogBlockManager::kContainerMetadataFileSuffix); |
| RETURN_NOT_OK(CheckContainerFiles(block_manager, report, dir, |
| common_path, data_path, metadata_path)); |
| |
| // Open the existing metadata and data files for writing. |
| shared_ptr<RWFile> metadata_file; |
| shared_ptr<RWFile> data_file; |
| if (PREDICT_TRUE(block_manager->file_cache_)) { |
| RETURN_NOT_OK_CONTAINER_DISK_FAILURE( |
| block_manager->file_cache_->OpenFile<Env::MUST_EXIST>(metadata_path, &metadata_file)); |
| RETURN_NOT_OK_CONTAINER_DISK_FAILURE( |
| block_manager->file_cache_->OpenFile<Env::MUST_EXIST>(data_path, &data_file)); |
| } else { |
| RWFileOptions opts; |
| opts.mode = Env::MUST_EXIST; |
| opts.is_sensitive = true; |
| unique_ptr<RWFile> rwf; |
| RETURN_NOT_OK_CONTAINER_DISK_FAILURE(block_manager->env()->NewRWFile(opts, |
| metadata_path, &rwf)); |
| metadata_file.reset(rwf.release()); |
| RETURN_NOT_OK_CONTAINER_DISK_FAILURE(block_manager->env()->NewRWFile(opts, |
| data_path, &rwf)); |
| data_file.reset(rwf.release()); |
| } |
| |
| unique_ptr<WritablePBContainerFile> metadata_pb_writer; |
| metadata_pb_writer.reset(new WritablePBContainerFile(std::move(metadata_file))); |
| RETURN_NOT_OK_CONTAINER_DISK_FAILURE(metadata_pb_writer->OpenExisting()); |
| |
| uint64_t data_file_size; |
| RETURN_NOT_OK_CONTAINER_DISK_FAILURE(data_file->Size(&data_file_size)); |
| |
| // Create the in-memory container and populate it. |
| LogBlockContainerRefPtr open_container(new LogBlockContainer(block_manager, |
| dir, |
| std::move(metadata_pb_writer), |
| std::move(data_file))); |
| open_container->preallocated_offset_ = data_file_size; |
| VLOG(1) << "Opened log block container " << open_container->ToString(); |
| container->swap(open_container); |
| return Status::OK(); |
| } |
| |
| Status LogBlockContainer::CheckContainerFiles(LogBlockManager* block_manager, |
| FsReport* report, |
| const Dir* dir, |
| const string& common_path, |
| const string& data_path, |
| const string& metadata_path) { |
| Env* env = block_manager->env(); |
| uint64_t data_size = 0; |
| Status s_data = env->GetFileSize(data_path, &data_size); |
| if (!s_data.ok() && !s_data.IsNotFound()) { |
| s_data = s_data.CloneAndPrepend("unable to determine data file size"); |
| RETURN_NOT_OK_CONTAINER_DISK_FAILURE(s_data); |
| } |
| uint64_t metadata_size = 0; |
| Status s_meta = env->GetFileSize(metadata_path, &metadata_size); |
| if (!s_meta.ok() && !s_meta.IsNotFound()) { |
| s_meta = s_meta.CloneAndPrepend("unable to determine metadata file size"); |
| RETURN_NOT_OK_CONTAINER_DISK_FAILURE(s_meta); |
| } |
| |
| const auto kEncryptionHeaderSize = env->GetEncryptionHeaderSize(); |
| const auto kMinimumValidLength = pb_util::kPBContainerMinimumValidLength + kEncryptionHeaderSize; |
| |
| // Check that both the metadata and data files exist and have valid lengths. |
| // This covers a commonly seen case at startup, where the previous incarnation |
| // of the server crashed due to "too many open files" just as it was trying |
| // to create a file. This orphans an empty or invalid-length file, which we |
| // can safely delete. Another case is when both the metadata and data files |
| // exist, but their lengths are invalid. |
| if (PREDICT_FALSE(metadata_size < kMinimumValidLength && |
| data_size <= kEncryptionHeaderSize)) { |
| report->incomplete_container_check->entries.emplace_back(common_path); |
| return Status::Aborted(Substitute("orphaned empty or invalid length file $0", common_path)); |
| } |
| |
| // Handle a half-present container whose data file has gone missing and |
| // whose metadata file has no live blocks. If so, the (orphaned) metadata |
| // file will be deleted during repair. |
| // |
| // Open the metadata file and quickly check whether or not there are any live blocks. |
| if (PREDICT_FALSE(metadata_size >= kMinimumValidLength && |
| s_data.IsNotFound())) { |
| Status read_status; |
| BlockIdSet live_blocks; |
| unique_ptr<RandomAccessFile> reader; |
| RandomAccessFileOptions opts; |
| opts.is_sensitive = true; |
| RETURN_NOT_OK_CONTAINER_DISK_FAILURE(env->NewRandomAccessFile(opts, metadata_path, &reader)); |
| ReadablePBContainerFile pb_reader(std::move(reader)); |
| RETURN_NOT_OK_CONTAINER_DISK_FAILURE(pb_reader.Open()); |
| while (true) { |
| BlockRecordPB record; |
| read_status = pb_reader.ReadNextPB(&record); |
| if (!read_status.ok()) break; |
| switch (record.op_type()) { |
| case CREATE: |
| live_blocks.emplace(BlockId::FromPB(record.block_id())); |
| break; |
| case DELETE: |
| live_blocks.erase(BlockId::FromPB(record.block_id())); |
| break; |
| default: |
| LOG(WARNING) << Substitute("Found a record with unknown type $0", record.op_type()); |
| break; |
| } |
| } |
| if (read_status.IsEndOfFile() && live_blocks.empty()) { |
| report->incomplete_container_check->entries.emplace_back(common_path); |
| return Status::Aborted(Substitute("orphaned metadata file with no live blocks $0", |
| common_path)); |
| } |
| // If the read failed for some unexpected reason, propagate the error. |
| if (!read_status.IsEndOfFile() && !read_status.IsIncomplete()) { |
| RETURN_NOT_OK_CONTAINER_DISK_FAILURE(read_status); |
| } |
| } |
| |
| // Apart from the special cases above, return an error status if either file is missing. |
| if (s_data.IsNotFound()) RETURN_NOT_OK_CONTAINER_DISK_FAILURE(s_data); |
| if (s_meta.IsNotFound()) RETURN_NOT_OK_CONTAINER_DISK_FAILURE(s_meta); |
| |
| return Status::OK(); |
| } |
| |
| Status LogBlockContainer::TruncateDataToNextBlockOffset() { |
| RETURN_NOT_OK_HANDLE_ERROR(read_only_status()); |
| |
| if (full()) { |
| VLOG(2) << Substitute("Truncating container $0 to offset $1", |
| ToString(), next_block_offset()); |
| RETURN_NOT_OK_HANDLE_ERROR(data_file_->Truncate(next_block_offset())); |
| } |
| return Status::OK(); |
| } |
| |
| Status LogBlockContainer::ProcessRecords( |
| FsReport* report, |
| LogBlockManager::UntrackedBlockMap* live_blocks, |
| LogBlockManager::BlockRecordMap* live_block_records, |
| vector<LogBlockRefPtr>* dead_blocks, |
| uint64_t* max_block_id, |
| ProcessRecordType type) { |
| string metadata_path = metadata_file_->filename(); |
| unique_ptr<RandomAccessFile> metadata_reader; |
| RandomAccessFileOptions opts; |
| opts.is_sensitive = true; |
| RETURN_NOT_OK_HANDLE_ERROR(block_manager()->env()->NewRandomAccessFile( |
| opts, metadata_path, &metadata_reader)); |
| ReadablePBContainerFile pb_reader(std::move(metadata_reader)); |
| RETURN_NOT_OK_HANDLE_ERROR(pb_reader.Open()); |
| |
| uint64_t data_file_size = 0; |
| Status read_status; |
| while (true) { |
| BlockRecordPB record; |
| read_status = pb_reader.ReadNextPB(&record); |
| if (!read_status.ok()) { |
| break; |
| } |
| RETURN_NOT_OK(ProcessRecord(&record, report, |
| live_blocks, live_block_records, dead_blocks, |
| &data_file_size, max_block_id, type)); |
| } |
| |
| // NOTE: 'read_status' will never be OK here. |
| if (PREDICT_TRUE(read_status.IsEndOfFile())) { |
| // We've reached the end of the file without any problems. |
| return Status::OK(); |
| } |
| if (read_status.IsIncomplete()) { |
| // We found a partial trailing record in a version of the pb container file |
| // format that can reliably detect this. Consider this a failed partial |
| // write and truncate the metadata file to remove this partial record. |
| report->partial_record_check->entries.emplace_back(ToString(), |
| pb_reader.offset()); |
| return Status::OK(); |
| } |
| // If we've made it here, we've found (and are returning) an unrecoverable error. |
| // Handle any errors we can, e.g. disk failures. |
| HandleError(read_status); |
| return read_status; |
| } |
| |
| Status LogBlockContainer::ProcessRecord( |
| BlockRecordPB* record, |
| FsReport* report, |
| LogBlockManager::UntrackedBlockMap* live_blocks, |
| LogBlockManager::BlockRecordMap* live_block_records, |
| vector<LogBlockRefPtr>* dead_blocks, |
| uint64_t* data_file_size, |
| uint64_t* max_block_id, |
| ProcessRecordType type) { |
| const BlockId block_id(BlockId::FromPB(record->block_id())); |
| LogBlockRefPtr lb; |
| switch (record->op_type()) { |
| case CREATE: |
| // First verify that the record's offset/length aren't wildly incorrect. |
| if (PREDICT_FALSE(!record->has_offset() || |
| !record->has_length() || |
| record->offset() < 0 || |
| record->length() < 0)) { |
| report->malformed_record_check->entries.emplace_back(ToString(), record); |
| break; |
| } |
| |
| // Now it should be safe to access the record's offset/length. |
| // |
| // KUDU-1657: When opening a container in read-only mode which is actively |
| // being written to by another lbm, we must reinspect the data file's size |
| // frequently in order to account for the latest appends. Inspecting the |
| // file size is expensive, so we only do it when the metadata indicates |
| // that additional data has been written to the file. |
| if (PREDICT_FALSE(record->offset() + record->length() > *data_file_size)) { |
| RETURN_NOT_OK_HANDLE_ERROR(data_file_->Size(data_file_size)); |
| } |
| |
| // If the record still extends beyond the end of the file, it is |
| // malformed, except if the length is 0, because in this case nothing has |
| // actually been written. |
| if (PREDICT_FALSE(record->offset() + record->length() > *data_file_size && |
| record->length() > 0)) { |
| // TODO(adar): treat as a different kind of inconsistency? |
| report->malformed_record_check->entries.emplace_back(ToString(), record); |
| break; |
| } |
| |
| lb = new LogBlock(this, block_id, record->offset(), record->length()); |
| if (!InsertIfNotPresent(live_blocks, block_id, lb)) { |
| // We found a record whose ID matches that of an already created block. |
| // |
| // TODO(adar): treat as a different kind of inconsistency? |
| report->malformed_record_check->entries.emplace_back( |
| ToString(), record); |
| break; |
| } |
| |
| VLOG(2) << Substitute("Found CREATE block $0 at offset $1 with length $2", |
| block_id.ToString(), |
| record->offset(), record->length()); |
| |
| if (type == ProcessRecordType::kReadAndUpdate) { |
| // This block must be included in the container's logical size, even if |
| // it has since been deleted. This helps satisfy one of our invariants: |
| // once a container byte range has been used, it may never be reused in |
| // the future. |
| // |
| // If we ignored deleted blocks, we would end up reusing the space |
| // belonging to the last deleted block in the container. |
| UpdateNextBlockOffset(lb->offset(), lb->length()); |
| BlockCreated(lb); |
| } |
| |
| (*live_block_records)[block_id].Swap(record); |
| *max_block_id = std::max(*max_block_id, block_id.id()); |
| break; |
| case DELETE: |
| lb = EraseKeyReturnValuePtr(live_blocks, block_id); |
| if (!lb) { |
| // We found a record for which there is no already created block. |
| // |
| // TODO(adar): treat as a different kind of inconsistency? |
| report->malformed_record_check->entries.emplace_back(ToString(), record); |
| break; |
| } |
| VLOG(2) << Substitute("Found DELETE block $0", block_id.ToString()); |
| if (type == ProcessRecordType::kReadAndUpdate) { |
| BlockDeleted(lb); |
| } |
| |
| CHECK_EQ(1, live_block_records->erase(block_id)); |
| dead_blocks->emplace_back(std::move(lb)); |
| break; |
| default: |
| // We found a record with an unknown type. |
| // |
| // TODO(adar): treat as a different kind of inconsistency? |
| report->malformed_record_check->entries.emplace_back(ToString(), record); |
| break; |
| } |
| return Status::OK(); |
| } |
| |
| Status LogBlockContainer::DoCloseBlocks(const vector<LogWritableBlock*>& blocks, |
| SyncMode mode) { |
| auto sync_blocks = [&]() { |
| if (mode == SYNC) { |
| VLOG(3) << "Syncing data file " << data_file_->filename(); |
| RETURN_NOT_OK(SyncData()); |
| } |
| |
| // Append metadata only after data is synced so that there's |
| // no chance of metadata landing on the disk before the data. |
| for (auto* block : blocks) { |
| RETURN_NOT_OK_PREPEND(block->AppendMetadata(), |
| "unable to append block's metadata during close"); |
| } |
| |
| if (mode == SYNC) { |
| VLOG(3) << "Syncing metadata file " << metadata_file_->filename(); |
| RETURN_NOT_OK(SyncMetadata()); |
| } |
| |
| RETURN_NOT_OK(block_manager()->SyncContainer(*this)); |
| |
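| // When closing a batch of blocks, each block is expected to have been |
| // finalized already; closing directly from the DIRTY state is only |
| // permitted for a single block. |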
| for (LogWritableBlock* block : blocks) { |
| if (blocks.size() > 1) DCHECK_EQ(block->state(), WritableBlock::State::FINALIZED); |
| block->DoClose(); |
| } |
| return Status::OK(); |
| }; |
| |
| Status s = sync_blocks(); |
| if (!s.ok()) { |
| // Make container read-only to forbid further writes in case of failure. |
| // Because the on-disk state may contain partial/incomplete data/metadata at |
| // this point, it is not safe to either overwrite it or append to it. |
| SetReadOnly(s); |
| } |
| return s; |
| } |
| |
| Status LogBlockContainer::PunchHole(int64_t offset, int64_t length) { |
| RETURN_NOT_OK_HANDLE_ERROR(read_only_status()); |
| DCHECK_GE(offset, 0); |
| DCHECK_GE(length, 0); |
| |
| // It is invalid to punch a zero-size hole. |
| if (length) { |
| // It's OK if we exceed the file's total size; the kernel will truncate |
| // our request. |
| RETURN_NOT_OK_HANDLE_ERROR(data_file_->PunchHole(offset, length)); |
| } |
| return Status::OK(); |
| } |
| |
| Status LogBlockContainer::WriteData(int64_t offset, const Slice& data) { |
| return WriteVData(offset, ArrayView<const Slice>(&data, 1)); |
| } |
| |
| Status LogBlockContainer::WriteVData(int64_t offset, ArrayView<const Slice> data) { |
| RETURN_NOT_OK_HANDLE_ERROR(read_only_status()); |
| DCHECK_GE(offset, next_block_offset()); |
| |
| RETURN_NOT_OK_HANDLE_ERROR(data_file_->WriteV(offset, data)); |
| |
| // This append may have changed the container size if: |
| // 1. It was large enough that it blew out the preallocated space. |
| // 2. Preallocation was disabled. |
| size_t data_size = accumulate(data.begin(), data.end(), static_cast<size_t>(0), |
| [&](size_t sum, const Slice& curr) { |
| return sum + curr.size(); |
| }); |
| if (offset + data_size > preallocated_offset_) { |
| RETURN_NOT_OK_HANDLE_ERROR(data_dir_->RefreshAvailableSpace(Dir::RefreshMode::ALWAYS)); |
| } |
| return Status::OK(); |
| } |
| |
| Status LogBlockContainer::ReadData(int64_t offset, Slice result) const { |
| DCHECK_GE(offset, 0); |
| RETURN_NOT_OK_HANDLE_ERROR(data_file_->Read(offset, result)); |
| return Status::OK(); |
| } |
| Status LogBlockContainer::ReadVData(int64_t offset, ArrayView<Slice> results) const { |
| DCHECK_GE(offset, 0); |
| RETURN_NOT_OK_HANDLE_ERROR(data_file_->ReadV(offset, results)); |
| return Status::OK(); |
| } |
| |
| Status LogBlockContainer::AppendMetadata(const BlockRecordPB& pb) { |
| RETURN_NOT_OK_HANDLE_ERROR(read_only_status()); |
| // Note: We don't check for sufficient disk space for metadata writes in |
| // order to allow for block deletion on full disks. |
| shared_lock<RWMutex> l(metadata_compact_lock_); |
| RETURN_NOT_OK_HANDLE_ERROR(metadata_file_->Append(pb)); |
| return Status::OK(); |
| } |
| |
| Status LogBlockContainer::FlushData(int64_t offset, int64_t length) { |
| RETURN_NOT_OK_HANDLE_ERROR(read_only_status()); |
| DCHECK_GE(offset, 0); |
| DCHECK_GE(length, 0); |
| RETURN_NOT_OK_HANDLE_ERROR(data_file_->Flush(RWFile::FLUSH_ASYNC, offset, length)); |
| return Status::OK(); |
| } |
| |
| Status LogBlockContainer::SyncData() { |
| RETURN_NOT_OK_HANDLE_ERROR(read_only_status()); |
| if (FLAGS_enable_data_block_fsync) { |
| if (metrics_) metrics_->generic_metrics.total_disk_sync->Increment(); |
| RETURN_NOT_OK_HANDLE_ERROR(data_file_->Sync()); |
| } |
| return Status::OK(); |
| } |
| |
| Status LogBlockContainer::SyncMetadata() { |
| RETURN_NOT_OK_HANDLE_ERROR(read_only_status()); |
| if (FLAGS_enable_data_block_fsync) { |
| if (metrics_) metrics_->generic_metrics.total_disk_sync->Increment(); |
| shared_lock<RWMutex> l(metadata_compact_lock_); |
| RETURN_NOT_OK_HANDLE_ERROR(metadata_file_->Sync()); |
| } |
| return Status::OK(); |
| } |
| |
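// Reopens the metadata writer against the current on-disk metadata file.
// This is needed after the file has been replaced on disk (e.g. following a
// metadata compaction).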
| Status LogBlockContainer::ReopenMetadataWriter() { |
| shared_ptr<RWFile> f; |
| if (PREDICT_TRUE(block_manager_->file_cache_)) { |
| RETURN_NOT_OK_HANDLE_ERROR(block_manager_->file_cache_->OpenFile<Env::MUST_EXIST>( |
| metadata_file_->filename(), &f)); |
| } else { |
| unique_ptr<RWFile> f_uniq; |
| RWFileOptions opts; |
| opts.mode = Env::MUST_EXIST; |
| opts.is_sensitive = true; |
| RETURN_NOT_OK_HANDLE_ERROR(block_manager_->env_->NewRWFile(opts, |
| metadata_file_->filename(), &f_uniq)); |
| f.reset(f_uniq.release()); |
| } |
| unique_ptr<WritablePBContainerFile> w; |
| w.reset(new WritablePBContainerFile(std::move(f))); |
| RETURN_NOT_OK_HANDLE_ERROR(w->OpenExisting()); |
| |
| RETURN_NOT_OK_HANDLE_ERROR(metadata_file_->Close()); |
| metadata_file_.swap(w); |
| return Status::OK(); |
| } |
| |
| Status LogBlockContainer::EnsurePreallocated(int64_t block_start_offset, |
| size_t next_append_length) { |
| RETURN_NOT_OK_HANDLE_ERROR(read_only_status()); |
| DCHECK_GE(block_start_offset, 0); |
| |
| if (!FLAGS_log_container_preallocate_bytes) { |
| return Status::OK(); |
| } |
| |
| // If the last write blew out the preallocation window, or if the next write |
| // exceeds it, we need to preallocate another chunk. |
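  //
  // For example, if 'preallocated_offset_' is 64 MiB and the next append of
  // 8 MiB starts at offset 60 MiB, the remaining 4 MiB window is too small
  // and another chunk must be preallocated.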
| if (block_start_offset > preallocated_offset_ || |
| next_append_length > preallocated_offset_ - block_start_offset) { |
| int64_t off = std::max(preallocated_offset_, block_start_offset); |
| int64_t len = FLAGS_log_container_preallocate_bytes; |
    // If encryption is enabled, the first block starts after the encryption
    // header rather than at offset 0. Shorten the first preallocation
    // accordingly so the file size remains a multiple of
    // log_container_preallocate_bytes.
| if (block_start_offset < FLAGS_log_container_preallocate_bytes) { |
| len -= block_start_offset; |
| } |
| RETURN_NOT_OK_HANDLE_ERROR(data_file_->PreAllocate(off, len, RWFile::CHANGE_FILE_SIZE)); |
| RETURN_NOT_OK_HANDLE_ERROR(data_dir_->RefreshAvailableSpace(Dir::RefreshMode::ALWAYS)); |
| VLOG(2) << Substitute("Preallocated $0 bytes at offset $1 in container $2", |
| len, off, ToString()); |
| |
| preallocated_offset_ = off + len; |
| } |
| |
| return Status::OK(); |
| } |
| |
| void LogBlockContainer::FinalizeBlock(int64_t block_offset, int64_t block_length) { |
  // Update this container's next block offset before making the container
  // available again, so its bookkeeping is consistent by the time another
  // writer can claim it.
| UpdateNextBlockOffset(block_offset, block_length); |
| |
| // Truncate the container if it's now full; any left over preallocated space |
| // is no longer needed. |
| // |
| // Note that depending on when FinalizeBlock() is called, this can take place |
| // _after_ the container has been synced to disk. That's OK; truncation isn't |
| // needed for correctness, and in the event of a crash or error, it will be |
| // retried at startup. |
| WARN_NOT_OK(TruncateDataToNextBlockOffset(), |
| "could not truncate excess preallocated space"); |
| if (full() && block_manager_->metrics()) { |
| block_manager_->metrics()->full_containers->Increment(); |
| } |
| block_manager_->MakeContainerAvailable(this); |
| } |
| |
| void LogBlockContainer::UpdateNextBlockOffset(int64_t block_offset, int64_t block_length) { |
| DCHECK_GE(block_offset, 0); |
| |
| // The log block manager maintains block contiguity as an invariant, which |
| // means accounting for the new block should be as simple as adding its |
| // aligned length to 'next_block_offset_'. However, due to KUDU-1793, some |
| // containers may have developed gaps between blocks. We'll account for them |
| // by considering both the block's offset and its length. |
| // |
| // The number of bytes is rounded up to the nearest filesystem block so |
| // that each Kudu block is guaranteed to be on a filesystem block |
| // boundary. This guarantees that the disk space can be reclaimed when |
| // the block is deleted. |
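  //
  // For example, with a 4096-byte filesystem block, a block at offset 8192
  // with length 5000 yields KUDU_ALIGN_UP(13192, 4096) = 16384 as the new
  // next block offset.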
| int64_t new_next_block_offset = KUDU_ALIGN_UP( |
| block_offset + block_length, |
| instance()->filesystem_block_size_bytes()); |
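  // StoreMax() (rather than a plain store) keeps 'next_block_offset_'
  // monotonically increasing even if concurrent updates race.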
| next_block_offset_.StoreMax(new_next_block_offset); |
| |
| if (full()) { |
| VLOG(1) << Substitute( |
| "Container $0 with size $1 is now full, max size is $2", |
| ToString(), next_block_offset(), FLAGS_log_container_max_size); |
| } |
| } |
| |
| vector<BlockRecordPB> LogBlockContainer::SortRecords( |
| LogBlockManager::BlockRecordMap live_block_records) { |
| // TODO(adar): this should be reported as an inconsistency once |
| // container metadata compaction is also done in realtime. Until then, |
| // it would be confusing to report it as such since it'll be a natural |
| // event at startup. |
| vector<BlockRecordPB> records(live_block_records.size()); |
| int i = 0; |
| for (auto& e : live_block_records) { |
| records[i].Swap(&e.second); |
| i++; |
| } |
| |
| // Sort the records such that their ordering reflects the ordering in |
| // the pre-compacted metadata file. |
| // |
| // This is preferred to storing the records in an order-preserving |
| // container (such as std::map) because while records are temporarily |
| // retained for every container, only some containers will actually |
| // undergo metadata compaction. |
| std::sort(records.begin(), records.end(), [](const BlockRecordPB& a, const BlockRecordPB& b) { |
| // Sort by timestamp. |
| if (a.timestamp_us() != b.timestamp_us()) { |
| return a.timestamp_us() < b.timestamp_us(); |
| } |
| |
| // If the timestamps match, sort by offset. |
| // |
| // If the offsets also match (i.e. both blocks are of zero length), |
| // it doesn't matter which of the two records comes first. |
| return a.offset() < b.offset(); |
| }); |
| |
| return records; |
| } |
| |
| void LogBlockContainer::BlockCreated(const LogBlockRefPtr& block) { |
| DCHECK_GE(block->offset(), 0); |
| |
| total_bytes_.IncrementBy(block->fs_aligned_length()); |
| total_blocks_.Increment(); |
| live_bytes_.IncrementBy(block->length()); |
| live_bytes_aligned_.IncrementBy(block->fs_aligned_length()); |
| live_blocks_.Increment(); |
| } |
| |
| void LogBlockContainer::BlockDeleted(const LogBlockRefPtr& block) { |
| DCHECK_GE(block->offset(), 0); |
| |
| live_bytes_.IncrementBy(-block->length()); |
| live_bytes_aligned_.IncrementBy(-block->fs_aligned_length()); |
| live_blocks_.IncrementBy(-1); |
| } |
| |
| void LogBlockContainer::ExecClosure(const std::function<void()>& task) { |
| data_dir_->ExecClosure(task); |
| } |
| |
| string LogBlockContainer::ToString() const { |
| string s; |
| CHECK(TryStripSuffixString(data_file_->filename(), |
| LogBlockManager::kContainerDataFileSuffix, &s)); |
| return s; |
| } |
| |
| void LogBlockContainer::SetReadOnly(const Status& error) { |
| DCHECK(!error.ok()); |
| LOG(WARNING) << Substitute("Container $0 being marked read-only: $1", |
| ToString(), error.ToString()); |
| std::lock_guard<simple_spinlock> l(read_only_lock_); |
| read_only_status_ = error; |
| } |
| |
| void LogBlockContainer::ContainerDeletionAsync(int64_t offset, int64_t length) { |
| if (dead()) { |
| // Don't bother punching holes; the container's destructor will delete the |
| // container's files outright. |
| return; |
| } |
| |
| VLOG(3) << "Freeing space belonging to container " << ToString(); |
| Status s = PunchHole(offset, length); |
| if (s.ok() && metrics_) metrics_->holes_punched->Increment(); |
  WARN_NOT_OK(s, Substitute("could not delete blocks in container $0",
                            ToString()));
| } |
| |
| /////////////////////////////////////////////////////////// |
| // LogBlockCreationTransaction |
| //////////////////////////////////////////////////////////// |
| |
| class LogBlockCreationTransaction : public BlockCreationTransaction { |
| public: |
| LogBlockCreationTransaction() = default; |
| |
| ~LogBlockCreationTransaction() = default; |
| |
| void AddCreatedBlock(unique_ptr<WritableBlock> block) override; |
| |
| Status CommitCreatedBlocks() override; |
| |
| private: |
| vector<unique_ptr<LogWritableBlock>> created_blocks_; |
| }; |
| |
| void LogBlockCreationTransaction::AddCreatedBlock( |
| unique_ptr<WritableBlock> block) { |
| LogWritableBlock* lwb = down_cast<LogWritableBlock*>(block.release()); |
| created_blocks_.emplace_back(unique_ptr<LogWritableBlock>(lwb)); |
| } |
| |
| Status LogBlockCreationTransaction::CommitCreatedBlocks() { |
| if (created_blocks_.empty()) { |
| return Status::OK(); |
| } |
| |
| VLOG(3) << "Closing " << created_blocks_.size() << " blocks"; |
| unordered_map<LogBlockContainer*, vector<LogWritableBlock*>> created_block_map; |
| for (const auto& block : created_blocks_) { |
| if (FLAGS_block_manager_preflush_control == "close") { |
| // Ask the kernel to begin writing out each block's dirty data. This is |
| // done up-front to give the kernel opportunities to coalesce contiguous |
| // dirty pages. |
| RETURN_NOT_OK(block->FlushDataAsync()); |
| } |
| created_block_map[block->container()].emplace_back(block.get()); |
| } |
| |
| // Close all blocks and sync the blocks belonging to the same |
| // container together to reduce fsync() usage, waiting for them |
| // to become durable. |
| for (const auto& entry : created_block_map) { |
| RETURN_NOT_OK(entry.first->DoCloseBlocks(entry.second, |
| LogBlockContainer::SyncMode::SYNC)); |
| } |
| created_blocks_.clear(); |
| return Status::OK(); |
| } |
| |
| /////////////////////////////////////////////////////////// |
| // LogBlockDeletionTransaction |
| //////////////////////////////////////////////////////////// |
| |
| class LogBlockDeletionTransaction : public BlockDeletionTransaction, |
| public std::enable_shared_from_this<LogBlockDeletionTransaction> { |
| public: |
| explicit LogBlockDeletionTransaction(LogBlockManager* lbm) |
| : lbm_(lbm) { |
| } |
| |
| // Due to the shared ownership of LogBlockDeletionTransaction, |
| // the destructor is responsible for destroying all registered |
| // blocks. This includes: |
| // 1. Punching holes in deleted blocks, and |
| // 2. Deleting dead containers outright. |
| ~LogBlockDeletionTransaction(); |
| |
| void AddDeletedBlock(BlockId block) override; |
| |
| Status CommitDeletedBlocks(vector<BlockId>* deleted) override; |
| |
  // Adds the given to-be-deleted block to 'deleted_interval_map_', which
  // tracks, per container, the byte ranges to hole punch.
| void AddBlock(const LogBlockRefPtr& lb); |
| |
| private: |
  // A block's [offset, offset + length) byte interval.
| typedef std::pair<int64_t, int64_t> BlockInterval; |
| |
| // Map used to aggregate BlockInterval instances across containers. |
| unordered_map<LogBlockContainerRefPtr, |
| vector<BlockInterval>, |
| ScopedRefPtrHashFunctor<LogBlockContainer>, |
| ScopedRefPtrEqualToFunctor<LogBlockContainer>> deleted_interval_map_; |
| |
| // The owning LogBlockManager. Must outlive the LogBlockDeletionTransaction. |
| LogBlockManager* lbm_; |
| vector<BlockId> deleted_blocks_; |
| DISALLOW_COPY_AND_ASSIGN(LogBlockDeletionTransaction); |
| }; |
| |
| void LogBlockDeletionTransaction::AddDeletedBlock(BlockId block) { |
| deleted_blocks_.emplace_back(block); |
| } |
| |
| LogBlockDeletionTransaction::~LogBlockDeletionTransaction() { |
| for (auto& entry : deleted_interval_map_) { |
| LogBlockContainer* container = entry.first.get(); |
| |
    // For full and dead containers it is much cheaper to delete the
    // container files outright than to punch holes.
| if (container->check_death_condition()) { |
| // Mark the container as deleted and remove it from the global map. |
| // |
      // It's possible for multiple deletion transactions to end up here. For
      // example, if one transaction deletes the second-to-last block in a
      // container and another deletes its last block, the destructors of both
      // transactions could run at a time when the container's internal
      // bookkeeping already reflects its deadness.
      //
      // TrySetDead() can succeed only once; it performs a compare-and-swap
      // internally.
| if (container->TrySetDead()) { |
| lbm_->RemoveDeadContainer(container->ToString()); |
| } |
| continue; |
| } |
| |
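    // Coalesce each container's intervals into a minimal set of disjoint
    // ranges so that the container is hole punched as few times as possible.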
| CHECK_OK_PREPEND(CoalesceIntervals<int64_t>(&entry.second), |
| Substitute("could not coalesce hole punching for container: $0", |
| container->ToString())); |
| |
| scoped_refptr<LogBlockContainer> self(container); |
| for (const auto& interval : entry.second) { |
| container->ExecClosure([self, interval]() { |
| self->ContainerDeletionAsync(interval.first, interval.second - interval.first); |
| }); |
| } |
| } |
| } |
| |
| Status LogBlockDeletionTransaction::CommitDeletedBlocks(vector<BlockId>* deleted) { |
| deleted->clear(); |
| shared_ptr<LogBlockDeletionTransaction> transaction = shared_from_this(); |
| |
| vector<LogBlockRefPtr> log_blocks; |
| Status first_failure = lbm_->RemoveLogBlocks(deleted_blocks_, &log_blocks, deleted); |
| for (const auto& lb : log_blocks) { |
| // Register the block to be hole punched if metadata recording |
| // is successful. |
| lb->RegisterDeletion(transaction); |
| AddBlock(lb); |
| |
| if (lbm_->metrics_) { |
| lbm_->metrics_->generic_metrics.total_blocks_deleted->Increment(); |
| } |
| } |
| |
| if (!first_failure.ok()) { |
| first_failure = first_failure.CloneAndPrepend(Substitute("only deleted $0 blocks, " |
| "first failure", deleted->size())); |
| } |
| deleted_blocks_.clear(); |
| return first_failure; |
| } |
| |
| void LogBlockDeletionTransaction::AddBlock(const LogBlockRefPtr& lb) { |
| DCHECK_GE(lb->fs_aligned_length(), 0); |
| |
| BlockInterval block_interval(lb->offset(), |
| lb->offset() + lb->fs_aligned_length()); |
| deleted_interval_map_[lb->container()].emplace_back(block_interval); |
| } |
| |
| //////////////////////////////////////////////////////////// |
| // LogBlockContainerLoadResult |
| //////////////////////////////////////////////////////////// |
| |
| struct LogBlockContainerLoadResult { |
| Status status; |
| FsReport report; |
| |
| // Keep track of containers that have nothing but dead blocks; they will be |
| // deleted during repair. |
| vector<LogBlockContainerRefPtr> dead_containers; |
| // Keep track of containers whose live block ratio is low; their metadata |
| // files will be compacted during repair. |
| unordered_map<string, vector<BlockRecordPB>> low_live_block_containers; |
| // Keep track of deleted blocks whose space hasn't been punched; they will |
| // be repunched during repair. |
| vector<LogBlockRefPtr> need_repunching_blocks; |
| |
| LogBlockContainerLoadResult() { |
| // We are going to perform these checks. |
| // |
| // Note: this isn't necessarily the complete set of FsReport checks; there |
| // may be checks that the LBM cannot perform. |
| report.full_container_space_check.emplace(); |
| report.incomplete_container_check.emplace(); |
| report.malformed_record_check.emplace(); |
| report.misaligned_block_check.emplace(); |
| report.partial_record_check.emplace(); |
| } |
| }; |
| |
| //////////////////////////////////////////////////////////// |
| // LogBlock (definition) |
| //////////////////////////////////////////////////////////// |
| |
| LogBlock::LogBlock(LogBlockContainerRefPtr container, BlockId block_id, |
| int64_t offset, int64_t length) |
| : container_(std::move(container)), |
| block_id_(block_id), |
| offset_(offset), |
| length_(length) { |
| DCHECK_GE(offset, 0); |
| DCHECK_GE(length, 0); |
| } |
| |
| int64_t LogBlock::fs_aligned_length() const { |
| uint64_t fs_block_size = container_->instance()->filesystem_block_size_bytes(); |
| |
| // Nearly all blocks are placed on a filesystem block boundary, which means |
| // their length post-alignment is simply their length aligned up to the |
| // nearest fs block size. |
| // |
| // However, due to KUDU-1793, some blocks may start or end at misaligned |
| // offsets. We don't maintain enough state to precisely pinpoint such a |
| // block's (aligned) end offset in this case, so we'll just undercount it. |
| // This should be safe, although it may mean unreclaimed disk space (i.e. |
| // when fs_aligned_length() is used in hole punching). |
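  //
  // For example, a 100-byte block that begins on a 4096-byte filesystem
  // block boundary has an fs-aligned length of 4096.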
| if (PREDICT_TRUE(offset_ % fs_block_size == 0)) { |
| return KUDU_ALIGN_UP(length_, fs_block_size); |
| } |
| return length_; |
| } |
| |
| void LogBlock::RegisterDeletion( |
| const shared_ptr<LogBlockDeletionTransaction>& transaction) { |
| DCHECK(!transaction_); |
| DCHECK(transaction); |
| |
| transaction_ = transaction; |
| } |
| |
| //////////////////////////////////////////////////////////// |
| // LogWritableBlock (definition) |
| //////////////////////////////////////////////////////////// |
| |
| LogWritableBlock::LogWritableBlock(LogBlockContainerRefPtr container, |
| BlockId block_id, int64_t block_offset) |
| : container_(std::move(container)), |
| block_id_(block_id), |
| block_offset_(block_offset), |
| block_length_(0), |
| state_(CLEAN) { |
| DCHECK_GE(block_offset, 0); |
| DCHECK_EQ(0, block_offset % container_->instance()->filesystem_block_size_bytes()); |
| container_->blocks_being_written_incr(1); |
| if (container_->metrics()) { |
| container_->metrics()->generic_metrics.blocks_open_writing->Increment(); |
| container_->metrics()->generic_metrics.total_writable_blocks->Increment(); |
| } |
| } |
| |
| LogWritableBlock::~LogWritableBlock() { |
  // Decrementing 'blocks_being_written_' at the beginning of this function
  // helps avoid unnecessary hole punching.
| container_->blocks_being_written_incr(-1); |
| if (state_ != CLOSED) { |
| WARN_NOT_OK(Abort(), Substitute("Failed to abort block $0", |
| id().ToString())); |
| } |
| } |
| |
| Status LogWritableBlock::Close() { |
| return container_->DoCloseBlocks({ this }, LogBlockContainer::SyncMode::SYNC); |
| } |
| |
| Status LogWritableBlock::Abort() { |
  // For a read-only container, only update metrics and block state before
  // returning.
| if (container_->read_only()) { |
| if (state_ != CLOSED) { |
| state_ = CLOSED; |
| if (container_->metrics()) { |
| container_->metrics()->generic_metrics.blocks_open_writing->Decrement(); |
| container_->metrics()->generic_metrics.total_bytes_written->IncrementBy( |
| block_length_); |
| } |
| } |
| return container_->read_only_status().CloneAndPrepend( |
| Substitute("container $0 is read-only", container_->ToString())); |
| } |
| |
| // Close the block and then delete it. Theoretically, we could do nothing |
| // for Abort() other than updating metrics and block state. Here is the |
| // reasoning why it would be safe to do so. Currently, failures in block |
| // creation can happen at various stages: |
| // 1) before the block is finalized (either in CLEAN or DIRTY state). It is |
| // safe to do nothing, as the block is not yet finalized and next block |
| // will overwrite the dirty data. |
| // 2) after the block is finalized but before the metadata is successfully |
| // appended. That means the container's internal bookkeeping has been |
| // updated and the data could be durable. Doing nothing can result in |
| // data-consuming gaps in containers, but these gaps can be cleaned up |
  //    by hole repunching at startup.
| // 3) after metadata is appended. If we do nothing, this can result in |
| // orphaned blocks if the metadata is durable. But orphaned blocks can be |
| // garbage collected later. |
| // |
  // TODO(Hao): implement fine-grained abort handling to avoid large
  // data-consuming gaps and orphaned blocks. A possible approach:
  // 1) for a per-block abort, if the block state is FINALIZED, do hole
  //    punching.
  // 2) for a BlockTransaction abort, DoCloseBlocks() should append metadata
  //    for deletion and coalesce hole punching for blocks in the same
  //    container.
| |
| RETURN_NOT_OK(container_->DoCloseBlocks({ this }, LogBlockContainer::SyncMode::NO_SYNC)); |
| |
  // DoCloseBlocks() has unlocked the container, and another thread may have
  // locked it since. That's OK: 'block_manager_' is immutable, so accessing
  // it here is safe.
| shared_ptr<BlockDeletionTransaction> deletion_transaction = |
| container_->block_manager()->NewDeletionTransaction(); |
| deletion_transaction->AddDeletedBlock(id()); |
| vector<BlockId> deleted; |
| return deletion_transaction->CommitDeletedBlocks(&deleted); |
| } |
| |
| const BlockId& LogWritableBlock::id() const { |
| return block_id_; |
| } |
| |
| BlockManager* LogWritableBlock::block_manager() const { |
| return container_->block_manager(); |
| } |
| |
| Status LogWritableBlock::Append(const Slice& data) { |
| return AppendV(ArrayView<const Slice>(&data, 1)); |
| } |
| |
| Status LogWritableBlock::AppendV(ArrayView<const Slice> data) { |
| DCHECK(state_ == CLEAN || state_ == DIRTY) |
| << "Invalid state: " << state_; |
| |
| // Calculate the amount of data to write |
  size_t data_size = accumulate(data.begin(), data.end(), static_cast<size_t>(0),
                                [](size_t sum, const Slice& curr) {
                                  return sum + curr.size();
                                });
| |
| // The metadata change is deferred to Close(). We can't do |
| // it now because the block's length is still in flux. |
| int64_t cur_block_offset = block_offset_ + block_length_; |
| RETURN_NOT_OK(container_->EnsurePreallocated(cur_block_offset, data_size)); |
| |
| MicrosecondsInt64 start_time = GetMonoTimeMicros(); |
| RETURN_NOT_OK(container_->WriteVData(cur_block_offset, data)); |
| MicrosecondsInt64 end_time = GetMonoTimeMicros(); |
| |
| int64_t dur = end_time - start_time; |
| TRACE_COUNTER_INCREMENT("lbm_write_time_us", dur); |
| const char* counter = BUCKETED_COUNTER_NAME("lbm_writes", dur); |
| TRACE_COUNTER_INCREMENT(counter, 1); |
| |
| block_length_ += data_size; |
| state_ = DIRTY; |
| return Status::OK(); |
| } |
| |
| Status LogWritableBlock::FlushDataAsync() { |
| VLOG(3) << "Flushing block " << id(); |
| RETURN_NOT_OK(container_->FlushData(block_offset_, block_length_)); |
| return Status::OK(); |
| } |
| |
| Status LogWritableBlock::Finalize() { |
| DCHECK(state_ == CLEAN || state_ == DIRTY || state_ == FINALIZED) |
| << "Invalid state: " << state_; |
| |
| if (state_ == FINALIZED) { |
| return Status::OK(); |
| } |
| |
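  // The scoped cleanup runs on every exit path: even if FlushDataAsync()
  // fails below, the block still transitions to FINALIZED and the
  // container's bookkeeping is updated, making the container available again.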
| SCOPED_CLEANUP({ |
| container_->FinalizeBlock(block_offset_, block_length_); |
| state_ = FINALIZED; |
| }); |
| |
| VLOG(3) << "Finalizing block " << id(); |
| if (state_ == DIRTY && |
| FLAGS_block_manager_preflush_control == "finalize") { |
| // We do not mark the container as read-only if FlushDataAsync() fails |
| // since the corresponding metadata has not yet been appended. |
| RETURN_NOT_OK(FlushDataAsync()); |
| } |
| |
| return Status::OK(); |
| } |
| |
| size_t LogWritableBlock::BytesAppended() const { |
| return block_length_; |
| } |
| |
| WritableBlock::State LogWritableBlock::state() const { |
| return state_; |
| } |
| |
| void LogWritableBlock::DoClose() { |
| if (state_ == CLOSED) return; |
| |
| if (container_->metrics()) { |
| container_->metrics()->generic_metrics.blocks_open_writing->Decrement(); |
| container_->metrics()->generic_metrics.total_bytes_written->IncrementBy( |
| block_length_); |
| container_->metrics()->generic_metrics.total_blocks_created->Increment(); |
| } |
| |
  // If Finalize() was never called, finalize the block now.
| if (state_ == CLEAN || state_ == DIRTY) { |
| container_->FinalizeBlock(block_offset_, block_length_); |
| } |
| |
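  // CreateAndAddLogBlock() returns null only if a block with this ID already
  // exists, which would mean a block ID was reused; that must never happen.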
| LogBlockRefPtr lb = container_->block_manager()->CreateAndAddLogBlock( |
| container_, block_id_, block_offset_, block_length_); |
| CHECK(lb); |
| container_->BlockCreated(lb); |
| state_ = CLOSED; |
| } |
| |
| Status LogWritableBlock::AppendMetadata() { |
| BlockRecordPB record; |
| id().CopyToPB(record.mutable_block_id()); |
| record.set_op_type(CREATE); |
| record.set_timestamp_us(GetCurrentTimeMicros()); |
| record.set_offset(block_offset_); |
| record.set_length(block_length_); |
| return container_->AppendMetadata(record); |
| } |
| |
| //////////////////////////////////////////////////////////// |
| // LogReadableBlock |
| //////////////////////////////////////////////////////////// |
| |
| // A log-backed block that has been opened for reading. |
| // |
| // Refers to a LogBlock representing the block's persisted metadata. |
| class LogReadableBlock : public ReadableBlock { |
| public: |
| explicit LogReadableBlock(LogBlockRefPtr log_block); |
| |
| ~LogReadableBlock(); |
| |
| Status Close() override; |
| |
| const BlockId& id() const override; |
| |
| BlockManager* block_manager() const override; |
| |
| Status Size(uint64_t* sz) const override; |
| |
| Status Read(uint64_t offset, Slice result) const override; |
| |
| Status ReadV(uint64_t offset, ArrayView<Slice> results) const override; |
| |
| size_t memory_footprint() const override; |
| |
| private: |
| // A reference to this block's metadata. |
| LogBlockRefPtr log_block_; |
| |
| // Whether or not this block has been closed. Close() is thread-safe, so |
| // this must be an atomic primitive. |
| AtomicBool closed_; |
| |
| DISALLOW_COPY_AND_ASSIGN(LogReadableBlock); |
| }; |
| |
| LogReadableBlock::LogReadableBlock(LogBlockRefPtr log_block) |
| : log_block_(std::move(log_block)), |
| closed_(false) { |
| if (log_block_->container()->metrics()) { |
| log_block_->container()->metrics()->generic_metrics.blocks_open_reading->Increment(); |
| log_block_->container()->metrics()->generic_metrics.total_readable_blocks->Increment(); |
| } |
| } |
| |
| LogReadableBlock::~LogReadableBlock() { |
| WARN_NOT_OK(Close(), Substitute("Failed to close block $0", |
| id().ToString())); |
| } |
| |
| Status LogReadableBlock::Close() { |
| if (closed_.CompareAndSet(false, true)) { |
| if (log_block_->container()->metrics()) { |
| log_block_->container()->metrics()->generic_metrics.blocks_open_reading->Decrement(); |
| } |
| log_block_.reset(); |
| } |
| |
| return Status::OK(); |
| } |
| |
| BlockManager* LogReadableBlock::block_manager() const { |
| return log_block_->container()->block_manager(); |
| } |
| |
| const BlockId& LogReadableBlock::id() const { |
| return log_block_->block_id(); |
| } |
| |
| Status LogReadableBlock::Size(uint64_t* sz) const { |
| DCHECK(!closed_.Load()); |
| |
| *sz = log_block_->length(); |
| return Status::OK(); |
| } |
| |
| Status LogReadableBlock::Read(uint64_t offset, Slice result) const { |
| return ReadV(offset, ArrayView<Slice>(&result, 1)); |
| } |
| |
| Status LogReadableBlock::ReadV(uint64_t offset, ArrayView<Slice> results) const { |
| DCHECK(!closed_.Load()); |
| |
  size_t read_length = accumulate(results.begin(), results.end(), static_cast<size_t>(0),
                                  [](size_t sum, const Slice& curr) {
                                    return sum + curr.size();
                                  });
| |
| uint64_t read_offset = log_block_->offset() + offset; |
| if (log_block_->length() < offset + read_length) { |
| return Status::IOError("Out-of-bounds read", |
| Substitute("read of [$0-$1) in block [$2-$3)", |
| read_offset, |
| read_offset + read_length, |
| log_block_->offset(), |
| log_block_->offset() + log_block_->length())); |
| } |
| |
| MicrosecondsInt64 start_time = GetMonoTimeMicros(); |
| RETURN_NOT_OK(log_block_->container()->ReadVData(read_offset, results)); |
| MicrosecondsInt64 end_time = GetMonoTimeMicros(); |
| |
| int64_t dur = end_time - start_time; |
| TRACE_COUNTER_INCREMENT("lbm_read_time_us", dur); |
| |
| const char* counter = BUCKETED_COUNTER_NAME("lbm_reads", dur); |
| TRACE_COUNTER_INCREMENT(counter, 1); |
| |
| if (log_block_->container()->metrics()) { |
| log_block_->container()->metrics()->generic_metrics.total_bytes_read->IncrementBy(read_length); |
| } |
| return Status::OK(); |
| } |
| |
| size_t LogReadableBlock::memory_footprint() const { |
| return kudu_malloc_usable_size(this); |
| } |
| |
| } // namespace internal |
| |
| //////////////////////////////////////////////////////////// |
| // LogBlockManager |
| //////////////////////////////////////////////////////////// |
| |
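// The block map is sharded (kBlockMapChunk = 16 shards) to reduce lock
// contention; a block's shard is chosen by masking the low bits of its ID
// with kBlockMapMask.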
| static const uint64_t kBlockMapChunk = 1 << 4; |
| static const uint64_t kBlockMapMask = kBlockMapChunk - 1; |
| const char* LogBlockManager::kContainerMetadataFileSuffix = ".metadata"; |
| const char* LogBlockManager::kContainerDataFileSuffix = ".data"; |
| |
| // These values were arrived at via experimentation. See commit 4923a74 for |
| // more details. |
| const map<int64_t, int64_t> LogBlockManager::kPerFsBlockSizeBlockLimits({ |
| { 1024, 673 }, |
| { 2048, 1353 }, |
| { 4096, 2721 }}); |
| |
| LogBlockManager::LogBlockManager(Env* env, |
| DataDirManager* dd_manager, |
| FsErrorManager* error_manager, |
| FileCache* file_cache, |
| BlockManagerOptions opts) |
| : env_(DCHECK_NOTNULL(env)), |
| dd_manager_(DCHECK_NOTNULL(dd_manager)), |
| error_manager_(DCHECK_NOTNULL(error_manager)), |
| opts_(std::move(opts)), |
| mem_tracker_(MemTracker::CreateTracker(-1, |
| "log_block_manager", |
| opts_.parent_mem_tracker)), |
| file_cache_(file_cache), |
| buggy_el6_kernel_(IsBuggyEl6Kernel(env->GetKernelRelease())), |
| next_block_id_(1) { |
| managed_block_shards_.resize(kBlockMapChunk); |
| for (auto& mb : managed_block_shards_) { |
| mb.lock = std::unique_ptr<simple_spinlock>(new simple_spinlock()); |
| mb.blocks_by_block_id |
| = std::unique_ptr<BlockMap>(new BlockMap(10, |
| BlockMap::hasher(), |
| BlockMap::key_equal(), |
| BlockAllocator(mem_tracker_))); |
| mb.blocks_by_block_id->set_deleted_key(BlockId()); |
| } |
| |
| // HACK: when running in a test environment, we often instantiate many |
  // LogBlockManagers in the same process, e.g. corresponding to different
| // tablet servers in a minicluster, or due to running many separate test |
| // cases of some CFile-related code. In that case, we need to make it more |
| // likely that the block IDs are not reused. So, instead of starting with |
| // block ID 1, we'll start with a random block ID. A collision is still |
| // possible, but exceedingly unlikely. |
| if (IsGTest()) { |
| Random r(GetRandomSeed32()); |
| next_block_id_.Store(r.Next64()); |
| } |
| |
| if (opts_.metric_entity) { |
| metrics_.reset(new internal::LogBlockManagerMetrics(opts_.metric_entity)); |
| } |
| } |
| |
| LogBlockManager::~LogBlockManager() { |
| // Release all of the memory accounted by the blocks. |
| int64_t mem = 0; |
| for (const auto& mb : managed_block_shards_) { |
| for (const auto& entry : *mb.blocks_by_block_id) { |
| mem += kudu_malloc_usable_size(entry.second.get()); |
| } |
| } |
| mem_tracker_->Release(mem); |
| |
| // A LogBlock's destructor depends on its container, so all LogBlocks must be |
| // destroyed before their containers. |
| for (auto& mb : managed_block_shards_) { |
| mb.blocks_by_block_id->clear(); |
| } |
| |
| // Containers may have outstanding tasks running on data directories; wait |
| // for them to complete before destroying the containers. |
| dd_manager_->WaitOnClosures(); |
| } |
| |
| // Ensure that no load task failed without being handled. |
| // |
| // Currently only disk failures are handled. Reports from failed disks are |
| // unusable. |
| #define RETURN_ON_NON_DISK_FAILURE(d, s) \ |
| do { \ |
| if (PREDICT_FALSE(!(s).ok())) { \ |
| if (!(s).IsDiskFailure()) { \ |
| return s; \ |
| } \ |
| LOG(ERROR) << Substitute("Not using report from $0: $1", \ |
| (d)->dir(), (s).ToString()); \ |
| } \ |
| } while (false) |
| |
| Status LogBlockManager::Open(FsReport* report, std::atomic<int>* containers_processed, |
| std::atomic<int>* containers_total) { |
| // Establish (and log) block limits for each data directory using kernel, |
| // filesystem, and gflags information. |
| for (const auto& dd : dd_manager_->dirs()) { |
| optional<int64_t> limit; |
| if (FLAGS_log_container_max_blocks == -1) { |
| // No limit, unless this is KUDU-1508. |
| |
| // The log block manager requires hole punching and, of the ext |
| // filesystems, only ext4 supports it. Thus, if this is an ext |
| // filesystem, it's ext4 by definition. |
| if (buggy_el6_kernel_ && dd->fs_type() == FsType::EXT) { |
| uint64_t fs_block_size = |
| dd->instance()->metadata()->filesystem_block_size_bytes(); |
| bool untested_block_size = |
| !ContainsKey(kPerFsBlockSizeBlockLimits, fs_block_size); |
| string msg = Substitute( |
| "Data dir $0 is on an ext4 filesystem vulnerable to KUDU-1508 " |
| "with $1block size $2", dd->dir(), |
| untested_block_size ? "untested " : "", fs_block_size); |
| if (untested_block_size) { |
| LOG(WARNING) << msg; |
| } else { |
| LOG(INFO) << msg; |
| } |
| limit = LookupBlockLimit(fs_block_size); |
| } |
| } else if (FLAGS_log_container_max_blocks > 0) { |
| // Use the provided limit. |
| limit = FLAGS_log_container_max_blocks; |
| } |
| |
| if (limit) { |
| LOG(INFO) << Substitute( |
| "Limiting containers on data directory $0 to $1 blocks", |
| dd->dir(), *limit); |
| } |
| InsertOrDie(&block_limits_by_data_dir_, dd.get(), limit); |
| } |
| |
  // Open the containers in each data dir.
| vector<Status> statuses(dd_manager_->dirs().size()); |
| vector<vector<unique_ptr<internal::LogBlockContainerLoadResult>>> container_results( |
| dd_manager_->dirs().size()); |
| int i = -1; |
| for (const auto& dd : dd_manager_->dirs()) { |
| i++; |
| int uuid_idx; |
| CHECK(dd_manager_->FindUuidIndexByDir(dd.get(), &uuid_idx)); |
| // TODO(awong): store Statuses for each directory in the directory manager |
| // so we can avoid these artificial Statuses. |
| if (dd_manager_->IsDirFailed(uuid_idx)) { |
| statuses[i] = Status::IOError("Data directory failed", "", EIO); |
| continue; |
| } |
| |
| // Open the data dir asynchronously. |
| auto* dd_raw = dd.get(); |
| auto* results = &container_results[i]; |
| auto* s = &statuses[i]; |
| dd->ExecClosure([this, dd_raw, results, s, containers_processed, containers_total]() { |
| this->OpenDataDir(dd_raw, results, s, containers_processed, containers_total); |
| WARN_NOT_OK(*s, Substitute("failed to open dir $0", dd_raw->dir())); |
| }); |
| } |
| |
| // Wait for the opens to complete. |
| dd_manager_->WaitOnClosures(); |
| |
| // Check load errors and merge each data dir's container load results, then do repair tasks. |
| unique_ptr<ThreadPool> repair_pool; |
| RETURN_NOT_OK(ThreadPoolBuilder("repair_pool") |
| .set_max_threads(dd_manager_->dirs().size()) |
| .Build(&repair_pool)); |
| vector<unique_ptr<internal::LogBlockContainerLoadResult>> dir_results( |
| dd_manager_->dirs().size()); |
| for (int i = 0; i < dd_manager_->dirs().size(); ++i) { |
| const auto& s = statuses[i]; |
| const auto& dd = dd_manager_->dirs()[i]; |
| RETURN_ON_NON_DISK_FAILURE(dd, s); |
    // If the dir failed to open, do not try to repair.
| if (PREDICT_FALSE(!s.ok())) { |
| continue; |
| } |
| |
| unique_ptr<internal::LogBlockContainerLoadResult> dir_result( |
| new internal::LogBlockContainerLoadResult()); |
| dir_result->report.data_dirs.push_back(dd->dir()); |
| bool do_repair = true; |
| for (const auto& container_result : container_results[i]) { |
| RETURN_ON_NON_DISK_FAILURE(dd, container_result->status); |
      if (PREDICT_FALSE(!container_result->status.ok())) {
        // If a container failed to load, do not try to repair this dir.
| do_repair = false; |
| break; |
| } |
| |
| dir_result->report.MergeFrom(container_result->report); |
| dir_result->dead_containers.insert( |
| dir_result->dead_containers.end(), |
| container_result->dead_containers.begin(), |
| container_result->dead_containers.end()); |
| container_result->dead_containers.clear(); |
| dir_result->low_live_block_containers.insert( |
| container_result->low_live_block_containers.begin(), |
| container_result->low_live_block_containers.end()); |
| container_result->low_live_block_containers.clear(); |
| dir_result->need_repunching_blocks.insert( |
| dir_result->need_repunching_blocks.end(), |
| container_result->need_repunching_blocks.begin(), |
| container_result->need_repunching_blocks.end()); |
| container_result->need_repunching_blocks.clear(); |
| } |
| if (do_repair) { |
| dir_results[i] = std::move(dir_result); |
| auto* dd_raw = dd.get(); |
| auto* dr = dir_results[i].get(); |
| CHECK_OK(repair_pool->Submit([this, dd_raw, dr]() { this->RepairTask(dd_raw, dr); })); |
| } |
| } |
| |
| // Wait for the repair tasks to complete. |
| repair_pool->Wait(); |
| repair_pool->Shutdown(); |
| |
| FsReport merged_report; |
| for (int i = 0; i < dd_manager_->dirs().size(); ++i) { |
| if (PREDICT_FALSE(!dir_results[i])) { |
| continue; |
| } |
| if (PREDICT_TRUE(dir_results[i]->status.ok())) { |
| merged_report.MergeFrom(dir_results[i]->report); |
| continue; |
| } |
| RETURN_ON_NON_DISK_FAILURE(dd_manager_->dirs()[i], dir_results[i]->status); |
| } |
| |
| if (dd_manager_->AreAllDirsFailed()) { |
| return Status::IOError("All data dirs failed to open", "", EIO); |
| } |
| |
| // Either return or log the report. |
| if (report) { |
| *report = std::move(merged_report); |
| } else { |
| RETURN_NOT_OK(merged_report.LogAndCheckForFatalErrors()); |
| } |
| |
| return Status::OK(); |
| } |
| |
| Status LogBlockManager::CreateBlock(const CreateBlockOptions& opts, |
| unique_ptr<WritableBlock>* block) { |
| CHECK(!opts_.read_only); |
| |
| // Find a free container. If one cannot be found, create a new one. |
| // |
| // TODO(unknown): should we cap the number of outstanding containers and |
| // force callers to block if we've reached it? |
| LogBlockContainerRefPtr container; |
| RETURN_NOT_OK(GetOrCreateContainer(opts, &container)); |
| |
| // Generate a free block ID. |
| // We have to loop here because earlier versions used non-sequential block IDs, |
| // and thus we may have to "skip over" some block IDs that are claimed. |
| BlockId new_block_id; |
| do { |
| new_block_id.SetId(next_block_id_.Increment()); |
| } while (!TryUseBlockId(new_block_id)); |
| |
| block->reset(new LogWritableBlock(container, |
| new_block_id, |
| container->next_block_offset())); |
| VLOG(3) << "Created block " << (*block)->id() << " in container " |
| << container->ToString(); |
| return Status::OK(); |
| } |
| |
| Status LogBlockManager::OpenBlock(const BlockId& block_id, |
| unique_ptr<ReadableBlock>* block) { |
| LogBlockRefPtr lb; |
| { |
| int index = block_id.id() & kBlockMapMask; |
| std::lock_guard<simple_spinlock> l(*managed_block_shards_[index].lock); |
| lb = FindPtrOrNull(*managed_block_shards_[index].blocks_by_block_id, block_id); |
| } |
| if (!lb) { |
| return Status::NotFound("Can't find block", block_id.ToString()); |
| } |
| |
| VLOG(3) << "Opened block " << block_id |
| << " from container " << lb->container()->ToString(); |
| block->reset(new internal::LogReadableBlock(std::move(lb))); |
| return Status::OK(); |
| } |
| |
| unique_ptr<BlockCreationTransaction> LogBlockManager::NewCreationTransaction() { |
| CHECK(!opts_.read_only); |
| return std::make_unique<internal::LogBlockCreationTransaction>(); |
| } |
| |
| shared_ptr<BlockDeletionTransaction> LogBlockManager::NewDeletionTransaction() { |
| CHECK(!opts_.read_only); |
| return std::make_shared<internal::LogBlockDeletionTransaction>(this); |
| } |
| |
| Status LogBlockManager::GetAllBlockIds(vector<BlockId>* block_ids) { |
| block_ids->clear(); |
| for (const auto& mb : managed_block_shards_) { |
| std::lock_guard<simple_spinlock> l(*mb.lock); |
| AppendKeysFromMap(*mb.blocks_by_block_id, block_ids); |
| block_ids->insert(block_ids->end(), mb.open_block_ids.begin(), mb.open_block_ids.end()); |
| } |
| return Status::OK(); |
| } |
| |
| void LogBlockManager::NotifyBlockId(BlockId block_id) { |
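  // Ensure that subsequently generated block IDs are strictly greater than
  // any externally observed ID.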
| next_block_id_.StoreMax(block_id.id() + 1); |
| } |
| |
| void LogBlockManager::AddNewContainerUnlocked(const LogBlockContainerRefPtr& container) { |
| DCHECK(lock_.is_locked()); |
| InsertOrDie(&all_containers_by_name_, container->ToString(), container); |
| if (metrics()) { |
| metrics()->containers->Increment(); |
| if (container->full()) { |
| metrics()->full_containers->Increment(); |
| } |
| } |
| } |
| |
| void LogBlockManager::RemoveDeadContainerUnlocked(const string& container_name) { |
| DCHECK(lock_.is_locked()); |
| LogBlockContainerRefPtr to_delete(EraseKeyReturnValuePtr( |
| &all_containers_by_name_, container_name)); |
| CHECK(to_delete); |
| CHECK(to_delete->full()) |
| << Substitute("Container $0 is not full", container_name); |
| CHECK(to_delete->dead()) |
| << Substitute("Container $0 is not dead", container_name); |
| if (metrics()) { |
| metrics()->containers->Decrement(); |
| metrics()->full_containers->Decrement(); |
| } |
| } |
| |
| void LogBlockManager::RemoveDeadContainer(const string& container_name) { |
| // Remove the container from memory. |
| { |
| std::lock_guard<simple_spinlock> l(lock_); |
| RemoveDeadContainerUnlocked(container_name); |
| } |
| |
| // Update the metrics if necessary. |
| if (metrics()) { |
| metrics()->dead_containers_deleted->Increment(); |
| } |
| } |
| |
| Status LogBlockManager::GetOrCreateContainer(const CreateBlockOptions& opts, |
| LogBlockContainerRefPtr* container) { |
| Dir* dir; |
| RETURN_NOT_OK_EVAL(dd_manager_->GetDirAddIfNecessary(opts, &dir), |
| error_manager_->RunErrorNotificationCb(ErrorHandlerType::NO_AVAILABLE_DISKS, opts.tablet_id)); |
| |
| { |
| std::lock_guard<simple_spinlock> l(lock_); |
| auto& d = available_containers_by_data_dir_[DCHECK_NOTNULL(dir)]; |
| if (!d.empty()) { |
| *container = d.front(); |
| d.pop_front(); |
| return Status::OK(); |
| } |
| } |
| |
| // All containers are in use; create a new one. |
| LogBlockContainerRefPtr new_container; |
| Status s = LogBlockContainer::Create(this, dir, &new_container); |
| |
| // We could create a container in a different directory, but there's |
| // currently no point in doing so. On disk failure, the tablet specified by |
| // 'opts' will be shut down, so the returned container would not be used. |
| HANDLE_DISK_FAILURE(s, error_manager_->RunErrorNotificationCb(ErrorHandlerType::DISK_ERROR, dir)); |
| RETURN_NOT_OK_PREPEND(s, "Could not create new log block container at " + dir->dir()); |
| { |
| std::lock_guard<simple_spinlock> l(lock_); |
| dirty_dirs_.insert(dir->dir()); |
| AddNewContainerUnlocked(new_container); |
| } |
| *container = std::move(new_container); |
| return Status::OK(); |
| } |
| |
| void LogBlockManager::MakeContainerAvailable(LogBlockContainerRefPtr container) { |
| std::lock_guard<simple_spinlock> l(lock_); |
| MakeContainerAvailableUnlocked(std::move(container)); |
| } |
| |
| void LogBlockManager::MakeContainerAvailableUnlocked(LogBlockContainerRefPtr container) { |
| DCHECK(lock_.is_locked()); |
| if (container->full() || container->read_only()) { |
| return; |
| } |
| VLOG(3) << Substitute("container $0 being made available", container->ToString()); |
| available_containers_by_data_dir_[container->data_dir()].push_front(std::move(container)); |
| } |
| |
| Status LogBlockManager::SyncContainer(const LogBlockContainer& container) { |
| Status s; |
| bool to_sync = false; |
| { |
| std::lock_guard<simple_spinlock> l(lock_); |
| to_sync = dirty_dirs_.erase(container.data_dir()->dir()); |
| } |
| |
| if (to_sync && FLAGS_enable_data_block_fsync) { |
| if (metrics_) metrics_->generic_metrics.total_disk_sync->Increment(); |
| s = env_->SyncDir(container.data_dir()->dir()); |
| |
| // If SyncDir fails, the container directory must be restored to |
| // dirty_dirs_. Otherwise a future successful LogWritableBlock::Close() |
| // on this container won't call SyncDir again, and the container might |
| // be lost on crash. |
| // |
| // In the worst case (another block synced this container as we did), |
| // we'll sync it again needlessly. |
| if (!s.ok()) { |
| container.HandleError(s); |
| std::lock_guard<simple_spinlock> l(lock_); |
| dirty_dirs_.insert(container.data_dir()->dir()); |
| } |
| } |
| return s; |
| } |
| |
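// Attempts to reserve 'block_id' for a new writable block. Fails if the ID
// is the null sentinel, already maps to a managed block, or has already been
// claimed by another in-flight block.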
| bool LogBlockManager::TryUseBlockId(const BlockId& block_id) { |
| if (block_id.IsNull()) { |
| return false; |
| } |
| |
| int index = block_id.id() & kBlockMapMask; |
| std::lock_guard<simple_spinlock> l(*managed_block_shards_[index].lock); |
| if (ContainsKey(*managed_block_shards_[index].blocks_by_block_id, block_id)) { |
| return false; |
| } |
| |
| return InsertIfNotPresent(&managed_block_shards_[index].open_block_ids, block_id); |
| } |
| |
| LogBlockRefPtr LogBlockManager::CreateAndAddLogBlock( |
| LogBlockContainerRefPtr container, |
| const BlockId& block_id, |
| int64_t offset, |
| int64_t length) { |
| LogBlockRefPtr lb(new LogBlock(std::move(container), block_id, offset, length)); |
| mem_tracker_->Consume(kudu_malloc_usable_size(lb.get())); |
| |
| if (AddLogBlock(lb)) { |
| return lb; |
| } |
| return nullptr; |
| } |
| |
| bool LogBlockManager::AddLogBlock(LogBlockRefPtr lb) { |
| // InsertIfNotPresent doesn't use move semantics, so instead we just |
| // insert an empty scoped_refptr and assign into it down below rather |
| // than using the utility function. |
| int index = lb->block_id().id() & kBlockMapMask; |
| std::lock_guard<simple_spinlock> l(*managed_block_shards_[index].lock); |
| auto& blocks_by_block_id = *managed_block_shards_[index].blocks_by_block_id; |
| LogBlockRefPtr* entry_ptr = &blocks_by_block_id[lb->block_id()]; |
| if (*entry_ptr) { |
| // Already have an entry for this block ID. |
| return false; |
| } |
| |
| VLOG(2) << Substitute("Added block: id $0, offset $1, length $2", |
| lb->block_id().ToString(), lb->offset(), lb->length()); |
| |
  // There may already be an entry in 'open_block_ids' (e.g. we just finished
  // writing out a block).
| managed_block_shards_[index].open_block_ids.erase(lb->block_id()); |
| if (metrics()) { |
| metrics()->blocks_under_management->Increment(); |
| metrics()->bytes_under_management->IncrementBy(lb->length()); |
| } |
| |
| *entry_ptr = std::move(lb); |
| return true; |
| } |
| |
| Status LogBlockManager::RemoveLogBlocks(vector<BlockId> block_ids, |
| vector<LogBlockRefPtr>* log_blocks, |
| vector<BlockId>* deleted) { |
| Status first_failure; |
| vector<LogBlockRefPtr> lbs; |
| int64_t malloc_space = 0, blocks_length = 0; |
| for (const auto& block_id : block_ids) { |
| LogBlockRefPtr lb; |
| Status s = RemoveLogBlock(block_id, &lb); |
| if (!s.ok() && !s.IsNotFound()) { |
| if (first_failure.ok()) first_failure = s; |
| } else if (s.ok()) { |
| malloc_space += kudu_malloc_usable_size(lb.get()); |
| blocks_length += lb->length(); |
| lbs.emplace_back(std::move(lb)); |
| } else { |
| // If we get NotFound, then the block was already deleted. |
| DCHECK(s.IsNotFound()); |
| deleted->emplace_back(block_id); |
| } |
| } |
| |
| // Update various metrics. |
| mem_tracker_->Release(malloc_space); |
| if (metrics()) { |
| metrics()->blocks_under_management->DecrementBy(lbs.size()); |
| metrics()->bytes_under_management->DecrementBy(blocks_length); |
| } |
| |
| for (auto& lb : lbs) { |
| VLOG(3) << "Deleting block " << lb->block_id(); |
| lb->container()->BlockDeleted(lb); |
| |
| // Record the on-disk deletion. |
| // |
| // TODO(unknown): what if this fails? Should we restore the in-memory block? |
| BlockRecordPB record; |
| lb->block_id().CopyToPB(record.mutable_block_id()); |
| record.set_op_type(DELETE); |
| record.set_timestamp_us(GetCurrentTimeMicros()); |
| Status s = lb->container()->AppendMetadata(record); |
| |
| // We don't bother fsyncing the metadata append for deletes in order to avoid |
| // the disk overhead. Even if we did fsync it, we'd still need to account for |
| // garbage at startup time (in the event that we crashed just before the |
| // fsync). |
| // |
| // TODO(KUDU-829): Implement GC of orphaned blocks. |
| |
| if (!s.ok()) { |
| if (first_failure.ok()) { |
| first_failure = s.CloneAndPrepend( |
| "Unable to append deletion record to block metadata"); |
| } |
| } else { |
| // Metadata files of containers with very few live blocks will be compacted. |
| if (!lb->container()->read_only() && |
| FLAGS_log_container_metadata_runtime_compact && |
| lb->container()->ShouldCompact()) { |
| scoped_refptr<LogBlockContainer> self(lb->container()); |
| lb->container()->ExecClosure([self]() { |
| self->CompactMetadata(); |
| }); |
| } |
| |
| deleted->emplace_back(lb->block_id()); |
| log_blocks->emplace_back(std::move(lb)); |
| } |
| } |
| |
| return first_failure; |
| } |
| |
| Status LogBlockManager::RemoveLogBlock(const BlockId& block_id, |
| LogBlockRefPtr* lb) { |
| int index = block_id.id() & kBlockMapMask; |
| std::lock_guard<simple_spinlock> l(*managed_block_shards_[index].lock); |
| auto& blocks_by_block_id = managed_block_shards_[index].blocks_by_block_id; |
| |
| auto it = blocks_by_block_id->find(block_id); |
| if (it == blocks_by_block_id->end()) { |
| return Status::NotFound("Can't find block", block_id.ToString()); |
| } |
| |
| LogBlockContainer* container = it->second->container(); |
| HANDLE_DISK_FAILURE(container->read_only_status(), |
| error_manager_->RunErrorNotificationCb(ErrorHandlerType::DISK_ERROR, container->data_dir())); |
| |
| // Return early if deleting a block in a failed directory. |
| set<int> failed_dirs = dd_manager_->GetFailedDirs(); |
| if (PREDICT_FALSE(!failed_dirs.empty())) { |
| int uuid_idx; |
| CHECK(dd_manager_->FindUuidIndexByDir(container->data_dir(), &uuid_idx)); |
| if (ContainsKey(failed_dirs, uuid_idx)) { |
| LOG_EVERY_N(INFO, 10) << Substitute("Block $0 is in a failed directory; not deleting", |
| block_id.ToString()); |
| return Status::IOError("Block is in a failed directory"); |
| } |
| } |
| *lb = std::move(it->second); |
| blocks_by_block_id->erase(it); |
| |
| VLOG(2) << Substitute("Removed block: id $0, offset $1, length $2", |
| (*lb)->block_id().ToString(), (*lb)->offset(), (*lb)->length()); |
| return Status::OK(); |
| } |
| |
| void LogBlockManager::OpenDataDir( |
| Dir* dir, |
| vector<unique_ptr<internal::LogBlockContainerLoadResult>>* results, |
| Status* result_status, |
| std::atomic<int>* containers_processed, |
| std::atomic<int>* containers_total) { |
| vector<string> children; |
| Status s = env_->GetChildren(dir->dir(), &children); |
| if (!s.ok()) { |
| HANDLE_DISK_FAILURE(s, error_manager_->RunErrorNotificationCb( |
| ErrorHandlerType::DISK_ERROR, dir)); |
| *result_status = s.CloneAndPrepend(Substitute( |
| "Could not list children of $0", dir->dir())); |
| return; |
| } |
| |
| // Find all containers and open them. |
| unordered_set<string> containers_seen; |
| results->reserve(children.size() / 2); |
| for (const string& child : children) { |
| string container_name; |
| if (!TryStripSuffixString( |
| child, LogBlockManager::kContainerDataFileSuffix, &container_name) && |
| !TryStripSuffixString( |
| child, LogBlockManager::kContainerMetadataFileSuffix, &container_name)) { |
| continue; |
| } |
| InsertIfNotPresent(&containers_seen, container_name); |
| } |
| |
| if (containers_total) { |
| *containers_total += containers_seen.size(); |
| if (metrics_) { |
| metrics()->total_containers_startup->IncrementBy(containers_seen.size()); |
| } |
| } |
| |
| for (const string& container_name : containers_seen) { |
| // Add a new result for the container. |
| results->emplace_back(new internal::LogBlockContainerLoadResult()); |
| LogBlockContainerRefPtr container; |
| s = LogBlockContainer::Open( |
| this, dir, &results->back()->report, container_name, &container); |
| if (containers_processed) { |
| ++*containers_processed; |
| if (metrics_) { |
| metrics()->processed_containers_startup->Increment(); |
| } |
| } |
| if (!s.ok()) { |
| if (s.IsAborted()) { |
| // Skip the container. Open() added a record of it to 'results->back()->report' for us. |
| continue; |
| } |
| if (opts_.read_only && s.IsNotFound()) { |
        // In read-only mode, skip containers whose files have disappeared
        // (e.g. when the kudu CLI tool inspects a live data directory).
| continue; |
| } |
| *result_status = s.CloneAndPrepend(Substitute( |
| "Could not open container $0", container_name)); |
| return; |
| } |
| |
| // Load the container's records asynchronously. |
| auto* r = results->back().get(); |
| dir->ExecClosure([this, dir, container, r]() { |
| this->LoadContainer(dir, container, r); |
| }); |
| } |
| } |
| |
| void LogBlockManager::LoadContainer(Dir* dir, |
| LogBlockContainerRefPtr container, |
| internal::LogBlockContainerLoadResult* result) { |
| // Process the records, building a container-local map for live blocks and |
| // a list of dead blocks. |
| // |
| // It's important that we don't try to add these blocks to the global map |
| // incrementally as we see each record, since it's possible that one container |
| // has a "CREATE <b>" while another has a "CREATE <b> ; DELETE <b>" pair. |
| // If we processed those two containers in this order, then upon processing |
| // the second container, we'd think there was a duplicate block. Building |
| // the container-local map first ensures that we discount deleted blocks |
| // before checking for duplicate IDs. |
| // |
| // NOTE: Since KUDU-1538, we allocate sequential block IDs, which makes reuse |
| // exceedingly unlikely. However, we might have old data which still exhibits |
| // the above issue. |
| UntrackedBlockMap live_blocks; |
| BlockRecordMap live_block_records; |
| vector<LogBlockRefPtr> dead_blocks; |
| uint64_t max_block_id = 0; |
| Status s = container->ProcessRecords(&result->report, |
| &live_blocks, |
| &live_block_records, |
| &dead_blocks, |
| &max_block_id, |
| LogBlockContainer::ProcessRecordType::kReadAndUpdate); |
| if (!s.ok()) { |
| result->status = s.CloneAndPrepend(Substitute( |
| "Could not process records in container $0", container->ToString())); |
| return; |
| } |
| |
| // With deleted blocks out of the way, check for misaligned blocks. |
| // |
| // We could also enforce that the record's offset is aligned with the |
| // underlying filesystem's block size, an invariant maintained by the log |
| // block manager. However, due to KUDU-1793, that invariant may have been |
| // broken, so we'll note but otherwise allow it. |
| for (const auto& e : live_blocks) { |
| if (PREDICT_FALSE(e.second->offset() % |
| container->instance()->filesystem_block_size_bytes() != 0)) { |
| result->report.misaligned_block_check->entries.emplace_back( |
          container->ToString(), e.first);
    }
| } |
| |
| if (container->full()) { |
| // Full containers without any live blocks can be deleted outright. |
| // |
| // TODO(adar): this should be reported as an inconsistency once dead |
| // container deletion is also done in real time. Until then, it would be |
| // confusing to report it as such since it'll be a natural event at startup. |
| if (container->live_blocks() == 0) { |
| DCHECK(live_blocks.empty()); |
| result->dead_containers.emplace_back(container); |
| } else if (static_cast<double>(container->live_blocks()) / |
| container->total_blocks() <= FLAGS_log_container_live_metadata_before_compact_ratio) { |
| // Metadata files of containers with very few live blocks will be compacted. |
| vector<BlockRecordPB> records = LogBlockContainer::SortRecords(std::move(live_block_records)); |
| result->low_live_block_containers[container->ToString()] = std::move(records); |
| } |
| |
| // Having processed the block records, let's check whether any full |
| // containers have any extra space (left behind after a crash or from an |
| // older version of Kudu). |
| // |
| // Filesystems are unpredictable beasts and may misreport the amount of |
| // space allocated to a file in various interesting ways. Some examples: |
| // - XFS's speculative preallocation feature may artificially enlarge the |
| // container's data file without updating its file size. This makes the |
| // file size untrustworthy for the purposes of measuring allocated space. |
| // See KUDU-1856 for more details. |
| // - On el6.6/ext4 a container data file that consumed ~32K according to |
| // its extent tree was actually reported as consuming an additional fs |
| // block (2k) of disk space. A similar container data file (generated |
| // via the same workload) on Ubuntu 16.04/ext4 did not exhibit this. |
| // The suspicion is that older versions of ext4 include interior nodes |
| // of the extent tree when reporting file block usage. |
| // |
| // To deal with these issues, our extra space cleanup code (deleted block |
| // repunching and container truncation) is gated on an "actual disk space |
| // consumed" heuristic. To prevent unnecessary triggering of the |
| // heuristic, we allow for some slop in our size measurements. The exact |
| // amount of slop is configurable via |
| // log_container_excess_space_before_cleanup_fraction. |
| // |
| // Too little slop and we'll do unnecessary work at startup. Too much and |
| // more unused space may go unreclaimed. |
| string data_filename = StrCat(container->ToString(), kContainerDataFileSuffix); |
| uint64_t reported_size; |
| s = env_->GetFileSizeOnDisk(data_filename, &reported_size); |
| if (!s.ok()) { |
| HANDLE_DISK_FAILURE(s, error_manager_->RunErrorNotificationCb( |
| ErrorHandlerType::DISK_ERROR, dir)); |
| result->status = s.CloneAndPrepend(Substitute( |
| "Could not get on-disk file size of container $0", container->ToString())); |
| return; |
| } |
| int64_t cleanup_threshold_size = container->live_bytes_aligned() * |
| (1 + FLAGS_log_container_excess_space_before_cleanup_fraction); |
| if (reported_size > cleanup_threshold_size) { |
| result->report.full_container_space_check->entries.emplace_back( |
| container->ToString(), reported_size - container->live_bytes_aligned()); |
| |
| // If the container is to be deleted outright, don't bother repunching |
| // its blocks. The report entry remains, however, so it's clear that |
| // there was a space discrepancy. |
| if (container->live_blocks()) { |
| result->need_repunching_blocks.insert(result->need_repunching_blocks.end(), |
| dead_blocks.begin(), dead_blocks.end()); |
| } |
| } |
| |
| result->report.stats.lbm_full_container_count++; |
| } |
| result->report.stats.live_block_bytes += container->live_bytes(); |
| result->report.stats.live_block_bytes_aligned += container->live_bytes_aligned(); |
| result->report.stats.live_block_count += container->live_blocks(); |
| result->report.stats.lbm_container_count++; |
| |
| next_block_id_.StoreMax(max_block_id + 1); |
| |
| int64_t mem_usage |