blob: a6eba35b17866693485bdb666aa88b9d17635530 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <atomic>
#include <cstdint>
#include <deque>
#include <map>
#include <memory>
#include <optional>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>
#include <gtest/gtest_prod.h>
#include <sparsepp/spp.h>
#include "kudu/fs/block_id.h"
#include "kudu/fs/block_manager.h"
#include "kudu/fs/fs.pb.h"
#include "kudu/gutil/macros.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/util/atomic.h"
#include "kudu/util/locks.h"
#include "kudu/util/mem_tracker.h"
#include "kudu/util/oid_generator.h"
#include "kudu/util/status.h"
namespace kudu {
class Env;
class FileCache;
namespace fs {
class DataDirManager;
class Dir;
class FsErrorManager;
struct FsReport;
namespace internal {
class LogBlock;
class LogBlockContainer;
class LogBlockDeletionTransaction;
class LogWritableBlock;
struct LogBlockContainerLoadResult;
struct LogBlockManagerMetrics;
} // namespace internal
typedef scoped_refptr<internal::LogBlock> LogBlockRefPtr;
typedef scoped_refptr<internal::LogBlockContainer> LogBlockContainerRefPtr;
// A log-backed (i.e. sequentially allocated file) block storage
// implementation.
//
// This is a block storage implementation that attempts to reduce the
// number of files used by clustering blocks into large files known
// henceforth as containers. A container begins empty and is written to
// sequentially, block by block. When a container becomes full, it is set
// aside and a new container is created.
//
// Implementation details
// ----------------------
//
// A container is comprised of two files, one for metadata and one for
// data. Both are written to sequentially. During a write, the block's data
// is written as-is to the data file. After the block has been
// synchronized, a small record is written to the metadata file containing
// the block's ID and its location within the data file.
//
// Block deletions are handled similarly. When a block is deleted, a record
// is written describing the deletion, orphaning the old block data. The
// orphaned data can be reclaimed instantaneously via hole punching, or
// later via garbage collection. The latter is used when hole punching is
// not supported on the filesystem, or on next boot if there's a crash
// after deletion but before hole punching. The metadata file itself is not
// compacted, as it is expected to remain quite small even after a great
// many create/delete cycles.
//
// Data and metadata operations are carefully ordered to ensure the
// correctness of the persistent representation at all times. During the
// writable block lifecycle (i.e. when a block is being created), data
// operations come before metadata operations. In the event that a metadata
// operation fails, the result is an orphaned block that is detected and
// pruned in the next garbage collection cycle. Conversely, metadata
// operations precede the data operations when deleting a block. In the
// worst case, a failure in the latter yields more garbage data that can be
// deleted in a GC.
//
// Care is taken to keep the in-memory representation of the block manager
// in sync with its persistent representation. To wit, a block is only made
// available in memory if _all_ on-disk operations (including any necessary
// synchronization calls) are successful.
//
// Writes to containers are batched together through the use of block
// transactions: each writer will take ownership of an "available" container,
// write a block to the container, and release ownership of the container once
// the writer "finalizes" the block, making the container available to other
// writers. This can happen concurrently; multiple transactions can interleave
// writes to single container, provided each writer finalizes its block before
// the next writer reaches for a container. Once any of the writers is
// completely done with its IO, it can commit its transaction, syncing its
// blocks and the container to disk (potentially as others are writing!).
//
// In order to maintain on-disk consistency, if the above commit fails, the
// entire container is marked read-only, and any future writes to the container
// will fail. There is a tradeoff here to note--having concurrent writers
// grants better utilization for each container; however a failure to sync by
// any of the writers will cause the others to fail and potentially corrupt the
// underlying container.
//
// When a new block is created, a container is selected from the data
// directory group appropriate for the block, as indicated by hints in
// provided CreateBlockOptions (i.e. blocks for diskrowsets should be placed
// within its tablet's data directory group).
//
// All log block manager metadata requests are served from memory. When an
// existing block manager is opened, all on-disk container metadata is
// parsed to build a single in-memory map describing the existence and
// locations of various blocks. Each entry in the map consumes ~64 bytes,
// putting the memory overhead at ~610 MB for 10 million blocks.
//
// New blocks are placed on a filesystem block boundary, and the size of
// hole punch requests is rounded up to the nearest filesystem block size.
// Taken together, this guarantees that hole punching can actually reclaim
// disk space (instead of just zeroing the block's bytes on disk).
//
// Design trade-offs
// -----------------
// In general, log-backed block storage is optimized for sustained reads
// and writes. The idea is that all blocks in a given container contain
// related data and are generally read at once, reducing seeks for
// sustained scans. This comes at a cost: the containers need to be garbage
// collected every now and then, though newer systems can take advantage of
// filesystem hole punching (as described above) to reclaim space.
//
// The on-disk container metadata design favors simplicity and contiguous
// access over space consumption and scalability to a very large number of
// blocks. To be more specific, the separation of metadata from data allows
// for high performance sustained reads at block manager open time at a
// manageability cost: a container is not a single file, and needs multiple
// open fds to be of use. Moreover, the log-structured nature of the
// metadata is simple and performant at open time.
//
// Likewise, the default container placement policy favors simplicity over
// performance. In the future, locality hints will ensure that blocks
// pertaining to similar data are colocated, improving scan performance.
//
// The choice to serve all metadata requests from memory favors simplicity
// over memory consumption. With a very large number of blocks, the
// in-memory map may balloon in size and some sort of "spilling" behavior
// may be beneficial.
// TODO
// ----
// - Implement garbage collection fallback for hole punching.
// - Implement locality hints so that specific containers can be used for
// groups of blocks (i.e. an entire column).
// - Implement failure recovery (i.e. metadata truncation and other
// similarly recoverable errors).
// - Evaluate and implement a solution for data integrity (e.g. per-block
// checksum).
// - Change the availability semantics to only mark a container as available if
// the current writer has committed and synced its transaction.
// The log-backed block manager.
class LogBlockManager : public BlockManager {
public:
static const char* kContainerMetadataFileSuffix;
static const char* kContainerDataFileSuffix;
// Note: all objects passed as pointers should remain alive for the lifetime
// of the block manager.
LogBlockManager(Env* env,
DataDirManager* dd_manager,
FsErrorManager* error_manager,
FileCache* file_cache,
BlockManagerOptions opts);
~LogBlockManager();
Status Open(FsReport* report, std::atomic<int>* containers_processed = nullptr,
std::atomic<int>* containers_total = nullptr) override;
Status CreateBlock(const CreateBlockOptions& opts,
std::unique_ptr<WritableBlock>* block) override;
Status OpenBlock(const BlockId& block_id,
std::unique_ptr<ReadableBlock>* block) override;
std::unique_ptr<BlockCreationTransaction> NewCreationTransaction() override;
std::shared_ptr<BlockDeletionTransaction> NewDeletionTransaction() override;
Status GetAllBlockIds(std::vector<BlockId>* block_ids) override;
void NotifyBlockId(BlockId block_id) override;
FsErrorManager* error_manager() override { return error_manager_; }
private:
FRIEND_TEST(LogBlockManagerTest, TestAbortBlock);
FRIEND_TEST(LogBlockManagerTest, TestCloseFinalizedBlock);
FRIEND_TEST(LogBlockManagerTest, TestCompactFullContainerMetadataAtStartup);
FRIEND_TEST(LogBlockManagerTest, TestFinalizeBlock);
FRIEND_TEST(LogBlockManagerTest, TestLIFOContainerSelection);
FRIEND_TEST(LogBlockManagerTest, TestLookupBlockLimit);
FRIEND_TEST(LogBlockManagerTest, TestMetadataTruncation);
FRIEND_TEST(LogBlockManagerTest, TestParseKernelRelease);
FRIEND_TEST(LogBlockManagerTest, TestBumpBlockIds);
FRIEND_TEST(LogBlockManagerTest, TestReuseBlockIds);
FRIEND_TEST(LogBlockManagerTest, TestFailMultipleTransactionsPerContainer);
friend class internal::LogBlockContainer;
friend class internal::LogBlockDeletionTransaction;
friend class internal::LogWritableBlock;
// Type for the actual block map used to store all live blocks.
// We use sparse_hash_map<> here to reduce memory overhead.
typedef MemTrackerAllocator<
std::pair<const BlockId, LogBlockRefPtr>> BlockAllocator;
typedef spp::sparse_hash_map<
BlockId,
LogBlockRefPtr,
BlockIdHash,
BlockIdEqual,
BlockAllocator> BlockMap;
// Simpler typedef for a block map which isn't tracked in the memory tracker.
//
// Only used during startup.
typedef std::unordered_map<
const BlockId,
LogBlockRefPtr,
BlockIdHash,
BlockIdEqual> UntrackedBlockMap;
// Map used to store live block records during container metadata processing.
//
// Only used during startup.
typedef std::unordered_map<
const BlockId,
BlockRecordPB,
BlockIdHash,
BlockIdEqual> BlockRecordMap;
// Adds an as of yet unseen container to this block manager.
//
// Must be called with 'lock_' held.
void AddNewContainerUnlocked(const LogBlockContainerRefPtr& container);
// Removes a previously added container from this block manager. The
// container must be dead (i.e. full and without any live blocks).
//
// Must be called with 'lock_' held.
void RemoveDeadContainerUnlocked(const std::string& container_name);
// Variant of RemoveDeadContainerUnlocked that acquires 'lock_'.
void RemoveDeadContainer(const std::string& container_name);
// Returns a container appropriate for the given CreateBlockOptions, creating
// a new container if necessary.
//
// After returning, the container is considered to be in use. When
// writing is finished, call MakeContainerAvailable() to make it
// available to other writers.
Status GetOrCreateContainer(const CreateBlockOptions& opts,
LogBlockContainerRefPtr* container);
// Indicate that this container is no longer in use and can be handed out
// to other writers.
void MakeContainerAvailable(LogBlockContainerRefPtr container);
void MakeContainerAvailableUnlocked(LogBlockContainerRefPtr container);
// Synchronizes a container's dirty metadata to disk, taking care not to
// sync more than is necessary (using 'dirty_dirs_').
Status SyncContainer(const internal::LogBlockContainer& container);
// Attempts to claim 'block_id' for use in a new WritableBlock.
//
// Returns true if the given block ID was not in use (and marks it as in
// use), false otherwise.
bool TryUseBlockId(const BlockId& block_id);
// Creates and adds a LogBlock to in-memory data structures.
//
// Returns the created LogBlock if it was successfully added or nullptr if a
// block with that ID was already present.
LogBlockRefPtr CreateAndAddLogBlock(
LogBlockContainerRefPtr container,
const BlockId& block_id,
int64_t offset,
int64_t length);
// Adds a LogBlock for an already-constructed LogBlock object.
//
// Returns true if the LogBlock was successfully added, false if it was already present.
bool AddLogBlock(LogBlockRefPtr lb);
// Removes the given set of LogBlocks from in-memory data structures, and
// appends the block deletion metadata to record the on-disk deletion.
// The 'log_blocks' out parameter will be set with the LogBlocks that were
// successfully removed. The 'deleted' out parameter will be set with the
// blocks were already deleted, e.g encountered 'NotFound' error during removal.
//
// Returns the first deletion failure that was seen, if any.
Status RemoveLogBlocks(std::vector<BlockId> block_ids,
std::vector<LogBlockRefPtr>* log_blocks,
std::vector<BlockId>* deleted);
// Removes a LogBlock from in-memory data structures.
// The 'lb' out parameter will be set with the successfully deleted LogBlock.
//
// Returns an error of LogBlock cannot be successfully removed.
Status RemoveLogBlock(const BlockId& block_id,
LogBlockRefPtr* lb);
// Simple wrapper of Repair(), used as a runnable function in thread.
void RepairTask(Dir* dir, internal::LogBlockContainerLoadResult* result);
// Repairs any inconsistencies for 'dir' described in 'report'.
//
// The following additional repairs will be performed:
// 1. Blocks in 'need_repunching' will be punched out again.
// 2. Containers in 'dead_containers' will be deleted from disk.
// 3. Containers in 'low_live_block_containers' will have their metadata
// files compacted.
//
// Returns an error if repairing a fatal inconsistency failed.
Status Repair(Dir* dir,
FsReport* report,
std::vector<LogBlockRefPtr> need_repunching,
std::vector<LogBlockContainerRefPtr> dead_containers,
std::unordered_map<
std::string,
std::vector<BlockRecordPB>> low_live_block_containers);
// Rewrites a container metadata file, appending all entries in 'records'.
// The new metadata file is created as a temporary file and renamed over the
// existing file after it is fully written.
//
// On success, writes the difference in file sizes to 'file_bytes_delta'. On
// failure, an effort is made to delete the temporary file.
//
// Note: the new file is synced but its parent directory is not.
Status RewriteMetadataFile(const internal::LogBlockContainer& container,
const std::vector<BlockRecordPB>& records,
int64_t* file_bytes_delta);
// Opens a particular data directory belonging to the block manager. The
// results of consistency checking are written to 'results'.
//
// Success or failure is set in 'result_status'.
//
// If 'containers_processed' and 'containers_total' are not nullptr, they will
// be populated with total containers attempted to be opened/processed and
// total containers present respectively in the subsequent calls made to
// the block manager.
// TODO(achennaka): Implement a cleaner way to pass the atomic values to startup page
// probably by using metrics.
void OpenDataDir(Dir* dir,
std::vector<std::unique_ptr<internal::LogBlockContainerLoadResult>>* results,
Status* result_status,
std::atomic<int>* containers_processed = nullptr,
std::atomic<int>* containers_total = nullptr);
// Reads records from one log block container in the data directory.
// The result details will be collected into 'result'.
void LoadContainer(Dir* dir,
LogBlockContainerRefPtr container,
internal::LogBlockContainerLoadResult* result);
ObjectIdGenerator* oid_generator() { return &oid_generator_; }
Env* env() const { return env_; }
// Returns the path of the given container. Only for use by tests.
static std::string ContainerPathForTests(internal::LogBlockContainer* container);
// Returns whether the given kernel release is vulnerable to KUDU-1508.
static bool IsBuggyEl6Kernel(const std::string& kernel_release);
// Finds an appropriate block limit from 'kPerFsBlockSizeBlockLimits'
// using the given filesystem block size.
static int64_t LookupBlockLimit(int64_t fs_block_size);
const internal::LogBlockManagerMetrics* metrics() const { return metrics_.get(); }
// For kernels affected by KUDU-1508, tracks a known good upper bound on the
// number of blocks per container, given a particular filesystem block size.
static const std::map<int64_t, int64_t> kPerFsBlockSizeBlockLimits;
// For manipulating files.
Env* env_;
// Manages and owns the data directories in which the block manager will
// place its blocks.
DataDirManager* dd_manager_;
// Manages callbacks used to handle disk failure.
FsErrorManager* error_manager_;
// The options that the LogBlockManager was created with.
const BlockManagerOptions opts_;
// Tracks memory consumption of any allocations numerous enough to be
// interesting (e.g. LogBlocks).
std::shared_ptr<MemTracker> mem_tracker_;
// Block IDs container used to prevent collisions when creating new anonymous blocks.
struct ManagedBlockShard {
// Protects 'blocks_by_block_id' and 'open_block_ids'.
std::unique_ptr<simple_spinlock> lock;
// Maps block IDs to blocks that are now readable, either because they
// already existed on disk when the block manager was opened, or because
// they're WritableBlocks that were closed.
std::unique_ptr<BlockMap> blocks_by_block_id;
// Contains block IDs for WritableBlocks that are still open for writing.
// When a WritableBlock is closed, its ID is moved to 'blocks_by_block_id'.
BlockIdSet open_block_ids;
};
// Sharding block IDs containers.
std::vector<ManagedBlockShard> managed_block_shards_;
// Protects 'all_containers_by_name_', 'available_containers_by_data_dir_' and 'dirty_dirs'.
mutable simple_spinlock lock_;
// Maps a data directory to an upper bound on the number of blocks that a
// container residing in that directory should observe, if one is necessary.
std::unordered_map<const Dir*,
std::optional<int64_t>> block_limits_by_data_dir_;
// Manages files opened for reading.
FileCache* file_cache_;
// Holds (and owns) all containers loaded from disk.
std::unordered_map<std::string,
LogBlockContainerRefPtr> all_containers_by_name_;
// Holds only those containers that are currently available for writing,
// excluding containers that are either in use or full.
//
// Does not own the containers.
std::unordered_map<const Dir*,
std::deque<LogBlockContainerRefPtr>> available_containers_by_data_dir_;
// Tracks dirty container directories.
//
// Synced and cleared by SyncMetadata().
std::unordered_set<std::string> dirty_dirs_;
// If true, the kernel is vulnerable to KUDU-1508.
const bool buggy_el6_kernel_;
// For generating container names.
ObjectIdGenerator oid_generator_;
// For generating block IDs.
AtomicInt<uint64_t> next_block_id_;
// Metrics for the block manager.
//
// May be null if instantiated without metrics.
std::unique_ptr<internal::LogBlockManagerMetrics> metrics_;
DISALLOW_COPY_AND_ASSIGN(LogBlockManager);
};
} // namespace fs
} // namespace kudu