| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "kudu/fs/log_block_manager.h" |
| |
| #include <algorithm> |
| #include <cstdint> |
| #include <cstdlib> |
| #include <cstring> |
| #include <deque> |
| #include <functional> |
| #include <initializer_list> |
| #include <memory> |
| #include <optional> |
| #include <ostream> |
| #include <random> |
| #include <set> |
| #include <string> |
| #include <type_traits> |
| #include <unordered_map> |
| #include <unordered_set> |
| #include <utility> |
| #include <vector> |
| |
| #include <gflags/gflags.h> |
| #include <glog/logging.h> |
| #include <gtest/gtest.h> |
| |
| #include "kudu/fs/block_id.h" |
| #include "kudu/fs/block_manager.h" |
| #include "kudu/fs/data_dirs.h" |
| #include "kudu/fs/dir_manager.h" |
| #include "kudu/fs/error_manager.h" |
| #include "kudu/fs/fs.pb.h" |
| #include "kudu/fs/fs_report.h" |
| #include "kudu/fs/log_block_manager-test-util.h" |
| #include "kudu/gutil/casts.h" |
| #include "kudu/gutil/map-util.h" |
| #include "kudu/gutil/ref_counted.h" |
| #include "kudu/gutil/strings/strip.h" |
| #include "kudu/gutil/strings/substitute.h" |
| #include "kudu/gutil/strings/util.h" |
| #include "kudu/util/atomic.h" |
| #include "kudu/util/env.h" |
| #include "kudu/util/file_cache.h" |
| #include "kudu/util/metrics.h" |
| #include "kudu/util/path_util.h" |
| #include "kudu/util/pb_util.h" |
| #include "kudu/util/random.h" |
| #include "kudu/util/slice.h" |
| #include "kudu/util/status.h" |
| #include "kudu/util/stopwatch.h" // IWYU pragma: keep |
| #include "kudu/util/test_macros.h" |
| #include "kudu/util/test_util.h" |
| #include "kudu/util/threadpool.h" |
| |
| using kudu::pb_util::ReadablePBContainerFile; |
| using std::set; |
| using std::string; |
| using std::shared_ptr; |
| using std::unique_ptr; |
| using std::unordered_map; |
| using std::unordered_set; |
| using std::vector; |
| using strings::Substitute; |
| |
| // Flags declared (not defined) in this file. The tests read or override them |
| // through the corresponding FLAGS_<name> variables, e.g. |
| // FLAGS_log_container_max_size and FLAGS_log_container_preallocate_bytes. |
| DECLARE_bool(cache_force_single_shard); |
| DECLARE_bool(crash_on_eio); |
| DECLARE_double(env_inject_eio); |
| DECLARE_double(log_container_excess_space_before_cleanup_fraction); |
| DECLARE_double(log_container_live_metadata_before_compact_ratio); |
| DECLARE_int32(fs_target_data_dirs_per_tablet); |
| DECLARE_int64(log_container_max_blocks); |
| DECLARE_string(block_manager_preflush_control); |
| DECLARE_string(env_inject_eio_globs); |
| DECLARE_uint64(log_container_preallocate_bytes); |
| DECLARE_uint64(log_container_max_size); |
| DECLARE_uint64(log_container_metadata_max_size); |
| DECLARE_bool(log_container_metadata_runtime_compact); |
| DECLARE_double(log_container_metadata_size_before_compact_ratio); |
| // Flags defined by this test file itself. |
| // NOTE(review): judging by their names and help strings these parameterize a |
| // startup benchmark; the code consuming them is not in this part of the file. |
| DEFINE_int32(startup_benchmark_block_count_for_testing, 1000000, |
| "Block count to do startup benchmark."); |
| DEFINE_int32(startup_benchmark_data_dir_count_for_testing, 8, |
| "Data directories to do startup benchmark."); |
| DEFINE_int32(startup_benchmark_reopen_times, 10, |
| "Block manager reopen times."); |
| DEFINE_int32(startup_benchmark_deleted_block_percentage, 90, |
| "Percentage of deleted blocks in containers."); |
| // Only accept percentage values within the inclusive range [0, 100]. |
| DEFINE_validator(startup_benchmark_deleted_block_percentage, |
| [](const char* /*n*/, int32_t v) { return 0 <= v && v <= 100; }); |
| DECLARE_bool(encrypt_data_at_rest); |
| |
| // Metric prototypes declared here are referenced by the tests below as |
| // METRIC_<name> when checking gauge and counter values. |
| // |
| // Block manager metrics. |
| METRIC_DECLARE_counter(block_manager_total_blocks_deleted); |
| |
| // Log block manager metrics. |
| METRIC_DECLARE_gauge_uint64(log_block_manager_bytes_under_management); |
| METRIC_DECLARE_gauge_uint64(log_block_manager_blocks_under_management); |
| METRIC_DECLARE_counter(log_block_manager_holes_punched); |
| METRIC_DECLARE_gauge_uint64(log_block_manager_containers); |
| METRIC_DECLARE_gauge_uint64(log_block_manager_full_containers); |
| METRIC_DECLARE_gauge_uint64(log_block_manager_dead_containers_deleted); |
| |
| namespace kudu { |
| namespace fs { |
| |
| // Forward declaration of kudu::fs::internal::LogBlockContainer. |
| // NOTE(review): no use of this type is visible in this part of the file; |
| // presumably later tests refer to it by pointer or reference -- confirm. |
| namespace internal { |
| class LogBlockContainer; |
| } // namespace internal |
| |
| // Test fixture for LogBlockManager. |
| // |
| // Owns the DataDirManager, FsErrorManager and FileCache that the block manager |
| // under test is constructed with, and provides helpers for reopening the block |
| // manager and for listing the container files found in the data directories. |
| // The bool test parameter is what the tests hand to SetEncryptionFlags(). |
| class LogBlockManagerTest : public KuduTest, public ::testing::WithParamInterface<bool> { |
| public: |
| LogBlockManagerTest() : |
| test_tablet_name_("test_tablet"), |
| test_block_opts_({ test_tablet_name_ }), |
| // Use a small file cache (smaller than the number of containers). |
| // |
| // Not strictly necessary except for TestDeleteFromContainerAfterMetadataCompaction. |
| file_cache_("test_cache", env_, 50, scoped_refptr<MetricEntity>()), |
| // 'bm_' is declared after 'dd_manager_', 'error_manager_' and |
| // 'file_cache_', so those members already exist when |
| // CreateBlockManager() takes their addresses. |
| bm_(CreateBlockManager(scoped_refptr<MetricEntity>())) { |
| CHECK_OK(file_cache_.Init()); |
| } |
| |
| // Opens the block manager created by the constructor, creates the data dir |
| // group for the test tablet and remembers its protobuf in 'test_group_pb_' |
| // so that ReopenBlockManager() can reload it. |
| void SetUp() override { |
| // Pass in a report to prevent the block manager from logging unnecessarily. |
| FsReport report; |
| ASSERT_OK(bm_->Open(&report)); |
| ASSERT_OK(dd_manager_->CreateDataDirGroup(test_tablet_name_)); |
| ASSERT_OK(dd_manager_->GetDataDirGroupPB(test_tablet_name_, &test_group_pb_)); |
| } |
| |
| protected: |
| // Allocates a new LogBlockManager wired to this fixture's environment, |
| // directory manager, error manager and file cache. |
| // |
| // 'metric_entity' is stored in the block manager's options. |
| // 'test_data_dirs' lists the data directories to use; when empty, the |
| // fixture's 'test_dir_' is used. The directories are created if missing, and |
| // 'dd_manager_' is created over them if it does not exist yet. |
| // |
| // The returned block manager has not been opened; the caller owns it. |
| LogBlockManager* CreateBlockManager(const scoped_refptr<MetricEntity>& metric_entity, |
| std::vector<std::string> test_data_dirs = {}) { |
| PrepareDataDirs(&test_data_dirs); |
| |
| if (!dd_manager_) { |
| // Ensure the directory manager is initialized. |
| CHECK_OK(DataDirManager::CreateNewForTests(env_, test_data_dirs, |
| DataDirManagerOptions(), &dd_manager_)); |
| } |
| |
| BlockManagerOptions opts; |
| opts.metric_entity = metric_entity; |
| return new LogBlockManager(env_, dd_manager_.get(), &error_manager_, |
| &file_cache_, std::move(opts)); |
| } |
| |
| // Destroys the current block manager, then rebuilds the directory manager |
| // and a fresh block manager over 'test_data_dirs' and opens it. |
| // |
| // 'metric_entity' is handed to the new block manager. |
| // 'report' is passed through to LogBlockManager::Open(). |
| // 'test_data_dirs' defaults to the fixture's 'test_dir_' when empty. |
| // 'force' selects how the directory manager is rebuilt: when true a brand |
| // new one is created along with a new data dir group for the test tablet; |
| // when false the existing directories are reopened and the group is |
| // reloaded from 'test_group_pb_'. |
| // |
| // Returns the first non-OK status encountered, or OK. |
| Status ReopenBlockManager(const scoped_refptr<MetricEntity>& metric_entity = nullptr, |
| FsReport* report = nullptr, |
| std::vector<std::string> test_data_dirs = {}, |
| bool force = false) { |
| PrepareDataDirs(&test_data_dirs); |
| |
| // The directory manager must outlive the block manager. Destroy the block |
| // manager first to enforce this. |
| bm_.reset(); |
| |
| if (force) { |
| // Start over with a brand new directory manager and a new data dir group |
| // for the test tablet. A failure is returned to the caller, like every |
| // other step of the reopen, rather than aborting the whole test binary. |
| RETURN_NOT_OK(DataDirManager::CreateNewForTests(env_, test_data_dirs, |
| DataDirManagerOptions(), &dd_manager_)); |
| RETURN_NOT_OK(dd_manager_->CreateDataDirGroup(test_tablet_name_)); |
| RETURN_NOT_OK(dd_manager_->GetDataDirGroupPB(test_tablet_name_, &test_group_pb_)); |
| } else { |
| // Re-open the directory manager first to clear any in-memory maps. |
| RETURN_NOT_OK(DataDirManager::OpenExistingForTests(env_, test_data_dirs, |
| DataDirManagerOptions(), &dd_manager_)); |
| RETURN_NOT_OK(dd_manager_->LoadDataDirGroupFromPB(test_tablet_name_, test_group_pb_)); |
| } |
| |
| bm_.reset(CreateBlockManager(metric_entity, test_data_dirs)); |
| RETURN_NOT_OK(bm_->Open(report)); |
| return Status::OK(); |
| } |
| |
| // Returns the only container data file in the test directory. Yields an |
| // assert failure if more than one is found. |
| void GetOnlyContainerDataFile(string* data_file) { |
| vector<string> data_files; |
| DoGetContainers(DATA_FILES, &data_files); |
| ASSERT_EQ(1, data_files.size()); |
| *data_file = data_files[0]; |
| } |
| |
| // Returns the only container metadata file in the test directory. Yields an |
| // assert failure if more than one is found. |
| void GetOnlyContainerMetadataFile(string* metadata_file) { |
| vector<string> metadata_files; |
| DoGetContainers(METADATA_FILES, &metadata_files); |
| ASSERT_EQ(1, metadata_files.size()); |
| *metadata_file = metadata_files[0]; |
| } |
| |
| // Returns the paths of all container metadata files found in the data |
| // directories. |
| void GetContainerMetadataFiles(vector<string>* metadata_files) { |
| DoGetContainers(METADATA_FILES, metadata_files); |
| } |
| |
| // Like GetOnlyContainerDataFile(), but returns a container name (i.e. data |
| // or metadata file with the file suffix removed). |
| void GetOnlyContainer(string* container) { |
| vector<string> containers; |
| DoGetContainers(CONTAINER_NAMES, &containers); |
| ASSERT_EQ(1, containers.size()); |
| *container = containers[0]; |
| } |
| |
| // Returns the names of all of the containers found in the test directory. |
| void GetContainerNames(vector<string>* container_names) { |
| DoGetContainers(CONTAINER_NAMES, container_names); |
| } |
| |
| // Asserts that 'expected_num_containers' are found in the test directory. |
| void AssertNumContainers(int expected_num_containers) { |
| vector<string> containers; |
| DoGetContainers(CONTAINER_NAMES, &containers); |
| ASSERT_EQ(expected_num_containers, containers.size()); |
| } |
| |
| // Asserts that 'report' contains no inconsistencies. |
| void AssertEmptyReport(const FsReport& report) { |
| ASSERT_TRUE(report.full_container_space_check->entries.empty()); |
| ASSERT_TRUE(report.incomplete_container_check->entries.empty()); |
| ASSERT_TRUE(report.malformed_record_check->entries.empty()); |
| ASSERT_TRUE(report.misaligned_block_check->entries.empty()); |
| ASSERT_TRUE(report.partial_record_check->entries.empty()); |
| } |
| |
| // Data dir group of the test tablet, captured when the group is created and |
| // reloaded on a non-forced reopen. |
| DataDirGroupPB test_group_pb_; |
| string test_tablet_name_; |
| CreateBlockOptions test_block_opts_; |
| |
| // Declaration order matters: the block manager is constructed from raw |
| // pointers to the three members declared before it. |
| unique_ptr<DataDirManager> dd_manager_; |
| FsErrorManager error_manager_; |
| FileCache file_cache_; |
| unique_ptr<LogBlockManager> bm_; |
| |
| private: |
| // Selects what DoGetContainers() returns. |
| enum GetMode { |
| DATA_FILES, |
| METADATA_FILES, |
| CONTAINER_NAMES, |
| }; |
| // Scans every data directory known to 'dd_manager_' and stores in 'out', |
| // depending on 'mode', the full paths of the container data files, the full |
| // paths of the container metadata files, or the de-duplicated container |
| // names (either kind of path with its suffix stripped). |
| void DoGetContainers(GetMode mode, vector<string>* out) { |
| // Populate 'data_files' and 'metadata_files'. |
| vector<string> data_files; |
| vector<string> metadata_files; |
| for (const string& data_dir : dd_manager_->GetDirs()) { |
| vector<string> children; |
| ASSERT_OK(env_->GetChildren(data_dir, &children)); |
| for (const string& child : children) { |
| if (HasSuffixString(child, LogBlockManager::kContainerDataFileSuffix)) { |
| data_files.push_back(JoinPathSegments(data_dir, child)); |
| } else if (HasSuffixString(child, LogBlockManager::kContainerMetadataFileSuffix)) { |
| metadata_files.push_back(JoinPathSegments(data_dir, child)); |
| } |
| } |
| } |
| |
| switch (mode) { |
| case DATA_FILES: |
| *out = std::move(data_files); |
| break; |
| case METADATA_FILES: |
| *out = std::move(metadata_files); |
| break; |
| case CONTAINER_NAMES: |
| // Build the union of 'data_files' and 'metadata_files' with suffixes |
| // stripped. |
| unordered_set<string> container_names; |
| for (const auto& df : data_files) { |
| string c; |
| ASSERT_TRUE(TryStripSuffixString( |
| df, LogBlockManager::kContainerDataFileSuffix, &c)); |
| container_names.emplace(std::move(c)); |
| } |
| for (const auto& mdf : metadata_files) { |
| string c; |
| ASSERT_TRUE(TryStripSuffixString( |
| mdf, LogBlockManager::kContainerMetadataFileSuffix, &c)); |
| container_names.emplace(std::move(c)); |
| } |
| out->assign(container_names.begin(), container_names.end()); |
| break; |
| } |
| } |
| // Substitutes the fixture's 'test_dir_' when 'test_data_dirs' is empty, then |
| // creates each listed directory. An already existing directory is fine; any |
| // other creation error aborts the process. |
| void PrepareDataDirs(std::vector<std::string>* test_data_dirs) { |
| if (test_data_dirs->empty()) { |
| *test_data_dirs = { test_dir_ }; |
| } |
| for (const auto& test_data_dir : *test_data_dirs) { |
| Status s = Env::Default()->CreateDir(test_data_dir); |
| CHECK(s.IsAlreadyPresent() || s.ok()) |
| << "Could not create directory " << test_data_dir << ": " << s.ToString(); |
| } |
| } |
| }; |
| |
| // Asserts that the uint64 gauge found on 'entity' for 'prototype' has the value |
| // 'expected_value'. |
| // |
| // A gauge that cannot be found is reported as a fatal test failure in every |
| // build type, rather than being dereferenced as a null pointer when debug |
| // checks are compiled out. Callers should wrap the call in NO_FATALS() so a |
| // failure stops the calling test as well. |
| static void CheckGaugeMetric(const scoped_refptr<MetricEntity>& entity, |
| int expected_value, const MetricPrototype* prototype) { |
| AtomicGauge<uint64_t>* gauge = down_cast<AtomicGauge<uint64_t>*>( |
| entity->FindOrNull(*prototype).get()); |
| ASSERT_TRUE(gauge != nullptr) << prototype->name(); |
| ASSERT_EQ(expected_value, gauge->value()) << prototype->name(); |
| } |
| |
| // Asserts that the counter found on 'entity' for 'prototype' has the value |
| // 'expected_value'. |
| // |
| // A counter that cannot be found is reported as a fatal test failure in every |
| // build type, rather than being dereferenced as a null pointer when debug |
| // checks are compiled out. Callers should wrap the call in NO_FATALS() so a |
| // failure stops the calling test as well. |
| static void CheckCounterMetric(const scoped_refptr<MetricEntity>& entity, |
| int expected_value, const MetricPrototype* prototype) { |
| Counter* counter = down_cast<Counter*>(entity->FindOrNull(*prototype).get()); |
| ASSERT_TRUE(counter != nullptr) << prototype->name(); |
| ASSERT_EQ(expected_value, counter->value()) << prototype->name(); |
| } |
| |
| // Verifies a batch of metrics on 'entity'. Each element of 'gauge_values' and |
| // 'counter_values' pairs an expected value with the prototype of the gauge or |
| // counter to check. Gauges are checked first, then counters; the first fatal |
| // failure ends the verification. |
| static void CheckLogMetrics(const scoped_refptr<MetricEntity>& entity, |
| const vector<std::pair<int, const MetricPrototype*>> gauge_values, |
| const vector<std::pair<int, const MetricPrototype*>> counter_values) { |
| for (const auto& expected_gauge : gauge_values) { |
| const int want = expected_gauge.first; |
| const MetricPrototype* proto = expected_gauge.second; |
| NO_FATALS(CheckGaugeMetric(entity, want, proto)); |
| } |
| for (const auto& expected_counter : counter_values) { |
| const int want = expected_counter.first; |
| const MetricPrototype* proto = expected_counter.second; |
| NO_FATALS(CheckCounterMetric(entity, want, proto)); |
| } |
| } |
| |
| // Instantiate the TEST_P cases of LogBlockManagerTest with the bool parameter |
| // set to false and to true. The tests visible in this file pass GetParam() to |
| // SetEncryptionFlags(), so each of them runs once per parameter value. |
| INSTANTIATE_TEST_SUITE_P(EncryptionEnabled, LogBlockManagerTest, ::testing::Values(false, true)); |
| |
| // Walks the block manager through block creation, commit, reopen and deletion |
| // and checks the log block manager gauges and counters after every step. |
| TEST_P(LogBlockManagerTest, MetricsTest) { |
| SetEncryptionFlags(GetParam()); |
| MetricRegistry registry; |
| scoped_refptr<MetricEntity> entity = METRIC_ENTITY_server.Instantiate(&registry, "test"); |
| ASSERT_OK(ReopenBlockManager(entity)); |
| NO_FATALS(CheckLogMetrics(entity, |
| { {0, &METRIC_log_block_manager_bytes_under_management}, |
| {0, &METRIC_log_block_manager_blocks_under_management}, |
| {0, &METRIC_log_block_manager_containers}, |
| {0, &METRIC_log_block_manager_full_containers} }, |
| { {0, &METRIC_log_block_manager_holes_punched}, |
| {0, &METRIC_block_manager_total_blocks_deleted}, |
| {0, &METRIC_log_block_manager_dead_containers_deleted} })); |
| |
| // Lower the max container size so that we can more easily test full |
| // container metrics. |
| // TODO(abukor): If this is 1024, this becomes full when writing the first |
| // block because of alignments. If it is over 4k, it fails with encryption |
| // disabled due to having only 5 containers instead of 10. Investigate this. |
| FLAGS_log_container_max_size = GetParam() ? 8192 : 1024; |
| |
| // One block --> one container. |
| unique_ptr<WritableBlock> writer; |
| ASSERT_OK(bm_->CreateBlock(test_block_opts_, &writer)); |
| NO_FATALS(CheckLogMetrics(entity, |
| { {0, &METRIC_log_block_manager_bytes_under_management}, |
| {0, &METRIC_log_block_manager_blocks_under_management}, |
| {1, &METRIC_log_block_manager_containers}, |
| {0, &METRIC_log_block_manager_full_containers} }, |
| { {0, &METRIC_log_block_manager_holes_punched}, |
| {0, &METRIC_block_manager_total_blocks_deleted}, |
| {0, &METRIC_log_block_manager_dead_containers_deleted} })); |
| |
| // And when the block is closed, it becomes "under management". |
| ASSERT_OK(writer->Close()); |
| NO_FATALS(CheckLogMetrics(entity, |
| { {0, &METRIC_log_block_manager_bytes_under_management}, |
| {1, &METRIC_log_block_manager_blocks_under_management}, |
| {1, &METRIC_log_block_manager_containers}, |
| {0, &METRIC_log_block_manager_full_containers} }, |
| { {0, &METRIC_log_block_manager_holes_punched}, |
| {0, &METRIC_block_manager_total_blocks_deleted}, |
| {0, &METRIC_log_block_manager_dead_containers_deleted} })); |
| |
| // Create 10 blocks concurrently. We reuse the existing container and |
| // create 9 new ones. All of them get filled. |
| BlockId saved_id; |
| { |
| uint8_t data[1024]; |
| Random rand(SeedRandom()); |
| unique_ptr<BlockCreationTransaction> transaction = bm_->NewCreationTransaction(); |
| for (int i = 0; i < 10; i++) { |
| unique_ptr<WritableBlock> b; |
| ASSERT_OK(bm_->CreateBlock(test_block_opts_, &b)); |
| if (saved_id.IsNull()) { |
| saved_id = b->id(); |
| } |
| // Fill the whole buffer with random bytes, one 32-bit word at a time, so |
| // that no uninitialized memory is handed to Append(). |
| for (size_t j = 0; j < sizeof(data); j += sizeof(uint32_t)) { |
| const uint32_t word = rand.Next(); |
| std::memcpy(&data[j], &word, sizeof(word)); |
| } |
| ASSERT_OK(b->Append(Slice(data, sizeof(data)))); |
| ASSERT_OK(b->Finalize()); |
| transaction->AddCreatedBlock(std::move(b)); |
| } |
| // Metrics for full containers are updated after Finalize(). |
| NO_FATALS(CheckLogMetrics(entity, |
| { {0, &METRIC_log_block_manager_bytes_under_management}, |
| {1, &METRIC_log_block_manager_blocks_under_management}, |
| {10, &METRIC_log_block_manager_containers}, |
| {10, &METRIC_log_block_manager_full_containers} }, |
| { {0, &METRIC_log_block_manager_holes_punched}, |
| {0, &METRIC_block_manager_total_blocks_deleted}, |
| {0, &METRIC_log_block_manager_dead_containers_deleted} })); |
| |
| ASSERT_OK(transaction->CommitCreatedBlocks()); |
| NO_FATALS(CheckLogMetrics(entity, |
| { {10 * 1024, &METRIC_log_block_manager_bytes_under_management}, |
| {11, &METRIC_log_block_manager_blocks_under_management}, |
| {10, &METRIC_log_block_manager_containers}, |
| {10, &METRIC_log_block_manager_full_containers} }, |
| { {0, &METRIC_log_block_manager_holes_punched}, |
| {0, &METRIC_block_manager_total_blocks_deleted}, |
| {0, &METRIC_log_block_manager_dead_containers_deleted} })); |
| } |
| |
| // Reopen the block manager and test the metrics. They're all based on |
| // persistent information so they should be the same. |
| MetricRegistry new_registry; |
| scoped_refptr<MetricEntity> new_entity = METRIC_ENTITY_server.Instantiate(&new_registry, "test"); |
| ASSERT_OK(ReopenBlockManager(new_entity)); |
| NO_FATALS(CheckLogMetrics(new_entity, |
| { {10 * 1024, &METRIC_log_block_manager_bytes_under_management}, |
| {11, &METRIC_log_block_manager_blocks_under_management}, |
| {10, &METRIC_log_block_manager_containers}, |
| {10, &METRIC_log_block_manager_full_containers} }, |
| { {0, &METRIC_log_block_manager_holes_punched}, |
| {0, &METRIC_block_manager_total_blocks_deleted}, |
| {0, &METRIC_log_block_manager_dead_containers_deleted} })); |
| |
| // Delete a block. Its contents should no longer be under management. |
| { |
| shared_ptr<BlockDeletionTransaction> deletion_transaction = |
| bm_->NewDeletionTransaction(); |
| deletion_transaction->AddDeletedBlock(saved_id); |
| vector<BlockId> deleted; |
| ASSERT_OK(deletion_transaction->CommitDeletedBlocks(&deleted)); |
| NO_FATALS(CheckLogMetrics(new_entity, |
| { {9 * 1024, &METRIC_log_block_manager_bytes_under_management}, |
| {10, &METRIC_log_block_manager_blocks_under_management}, |
| {10, &METRIC_log_block_manager_containers}, |
| {10, &METRIC_log_block_manager_full_containers} }, |
| { {0, &METRIC_log_block_manager_holes_punched}, |
| {1, &METRIC_block_manager_total_blocks_deleted}, |
| {0, &METRIC_log_block_manager_dead_containers_deleted} })); |
| } |
| // The hole punch counter only moves once the deletion transaction has gone |
| // out of scope and the pending closures have been waited on. |
| dd_manager_->WaitOnClosures(); |
| NO_FATALS(CheckLogMetrics(new_entity, |
| { {9 * 1024, &METRIC_log_block_manager_bytes_under_management}, |
| {10, &METRIC_log_block_manager_blocks_under_management}, |
| {10, &METRIC_log_block_manager_containers}, |
| {10, &METRIC_log_block_manager_full_containers} }, |
| { {1, &METRIC_log_block_manager_holes_punched}, |
| {1, &METRIC_block_manager_total_blocks_deleted}, |
| {0, &METRIC_log_block_manager_dead_containers_deleted} })); |
| |
| // Set the max container size to default so that we can create a bunch of blocks |
| // in the same container. Delete those created blocks afterwards to verify only |
| // one hole punch operation is executed since the blocks are contiguous. |
| FLAGS_log_container_max_size = 10LU * 1024 * 1024 * 1024; |
| { |
| vector<BlockId> blocks; |
| unique_ptr<BlockCreationTransaction> transaction = bm_->NewCreationTransaction(); |
| for (int i = 0; i < 10; i++) { |
| unique_ptr<WritableBlock> b; |
| ASSERT_OK(bm_->CreateBlock(test_block_opts_, &b)); |
| blocks.emplace_back(b->id()); |
| ASSERT_OK(b->Append("test data")); |
| ASSERT_OK(b->Finalize()); |
| transaction->AddCreatedBlock(std::move(b)); |
| } |
| ASSERT_OK(transaction->CommitCreatedBlocks()); |
| |
| shared_ptr<BlockDeletionTransaction> deletion_transaction = |
| bm_->NewDeletionTransaction(); |
| for (const auto& block : blocks) { |
| deletion_transaction->AddDeletedBlock(block); |
| } |
| vector<BlockId> deleted; |
| ASSERT_OK(deletion_transaction->CommitDeletedBlocks(&deleted)); |
| ASSERT_EQ(blocks.size(), deleted.size()); |
| NO_FATALS(CheckLogMetrics(new_entity, |
| { {9 * 1024, &METRIC_log_block_manager_bytes_under_management}, |
| {10, &METRIC_log_block_manager_blocks_under_management}, |
| {11, &METRIC_log_block_manager_containers}, |
| {10, &METRIC_log_block_manager_full_containers} }, |
| { {1, &METRIC_log_block_manager_holes_punched}, |
| {11, &METRIC_block_manager_total_blocks_deleted}, |
| {0, &METRIC_log_block_manager_dead_containers_deleted} })); |
| } |
| // Wait for the actual hole punching to take place. |
| for (const auto& data_dir : dd_manager_->dirs()) { |
| data_dir->WaitOnClosures(); |
| } |
| NO_FATALS(CheckLogMetrics(new_entity, |
| { {9 * 1024, &METRIC_log_block_manager_bytes_under_management}, |
| {10, &METRIC_log_block_manager_blocks_under_management}, |
| {11, &METRIC_log_block_manager_containers}, |
| {10, &METRIC_log_block_manager_full_containers} }, |
| { {2, &METRIC_log_block_manager_holes_punched}, |
| {11, &METRIC_block_manager_total_blocks_deleted}, |
| {0, &METRIC_log_block_manager_dead_containers_deleted} })); |
| } |
| |
| // Checks that the on-disk size of the single container data file stays equal |
| // to the preallocation amount as blocks are appended, both before and after |
| // the block manager is reopened. |
| TEST_P(LogBlockManagerTest, ContainerPreallocationTest) { |
| SetEncryptionFlags(GetParam()); |
| string kTestData = "test data"; |
| |
| // Three blocks of test data get written below, so the preallocation window |
| // must be able to hold at least three copies of it. |
| ASSERT_GE(FLAGS_log_container_preallocate_bytes, kTestData.size() * 3); |
| |
| unique_ptr<WritableBlock> written_block; |
| string container_data_filename; |
| uint64_t size; |
| |
| // Writes one block of test data, then checks that the only container data |
| // file occupies exactly the preallocation amount on disk. |
| const auto write_block_and_check_size = [&]() { |
| ASSERT_OK(bm_->CreateBlock(test_block_opts_, &written_block)); |
| ASSERT_OK(written_block->Append(kTestData)); |
| ASSERT_OK(written_block->Close()); |
| NO_FATALS(GetOnlyContainerDataFile(&container_data_filename)); |
| ASSERT_OK(env_->GetFileSizeOnDisk(container_data_filename, &size)); |
| ASSERT_EQ(FLAGS_log_container_preallocate_bytes, size); |
| }; |
| |
| // The first block should trigger preallocation of the container, provided |
| // the kernel supports it; the container is then as large as the |
| // preallocation amount, which is known to exceed the test data size. |
| NO_FATALS(write_block_and_check_size()); |
| |
| // A second block fits in the preallocated space, so the size is unchanged. |
| NO_FATALS(write_block_and_check_size()); |
| |
| // After a reopen the block manager should be smart enough to keep using the |
| // space that was preallocated earlier. |
| ASSERT_OK(ReopenBlockManager()); |
| NO_FATALS(write_block_and_check_size()); |
| } |
| |
| // KUDU-2202: a block ID that the block manager has been told about must never |
| // be handed out again. |
| TEST_P(LogBlockManagerTest, TestBumpBlockIds) { |
| SetEncryptionFlags(GetParam()); |
| const int kNumBlocks = 10; |
| vector<BlockId> block_ids; |
| unique_ptr<WritableBlock> writer; |
| for (int created = 0; created != kNumBlocks; ++created) { |
| ASSERT_OK(bm_->CreateBlock(test_block_opts_, &writer)); |
| block_ids.push_back(writer->id()); |
| } |
| const auto largest = std::max_element(block_ids.begin(), block_ids.end()); |
| BlockId max_so_far = *largest; |
| |
| // Pretend the block manager lost its record of block IDs entirely, as if it |
| // had restarted with every block gone. |
| bm_->next_block_id_.Store(1); |
| |
| // Another component (e.g. tablet metadata) now reports a block ID that it |
| // knows about. |
| const BlockId reported(max_so_far); |
| bm_->NotifyBlockId(reported); |
| |
| // From here on, newly created blocks must get IDs above the reported one. |
| ASSERT_OK(bm_->CreateBlock(test_block_opts_, &writer)); |
| ASSERT_LT(max_so_far, writer->id()); |
| max_so_far = writer->id(); |
| |
| // Being told about a smaller ID or an invalid one must not disturb the |
| // ordering. |
| bm_->NotifyBlockId(BlockId(1)); |
| bm_->NotifyBlockId(BlockId()); |
| ASSERT_OK(bm_->CreateBlock(test_block_opts_, &writer)); |
| ASSERT_LT(max_so_far, writer->id()); |
| } |
| |
| // KUDU-1190 regression test: the block manager used to crash at startup when |
| // a block ID had been reused. |
| TEST_P(LogBlockManagerTest, TestReuseBlockIds) { |
| SetEncryptionFlags(GetParam()); |
| // Under gtest the LBM normally begins at a random block ID. Pin the sequence |
| // so that this test controls which IDs are handed out. |
| bm_->next_block_id_.Store(1); |
| |
| vector<BlockId> block_ids; |
| |
| // Open four blocks inside one transaction: this yields four containers, |
| // holding the first four block IDs of the sequence. |
| { |
| unique_ptr<BlockCreationTransaction> creation = bm_->NewCreationTransaction(); |
| for (int n = 0; n < 4; ++n) { |
| unique_ptr<WritableBlock> block; |
| ASSERT_OK(bm_->CreateBlock(test_block_opts_, &block)); |
| block_ids.push_back(block->id()); |
| creation->AddCreatedBlock(std::move(block)); |
| } |
| ASSERT_OK(creation->CommitCreatedBlocks()); |
| } |
| |
| // A fifth block is expected to go into the first container again. |
| { |
| unique_ptr<WritableBlock> block; |
| ASSERT_OK(bm_->CreateBlock(test_block_opts_, &block)); |
| ASSERT_OK(block->Close()); |
| } |
| |
| ASSERT_EQ(4, bm_->all_containers_by_name_.size()); |
| |
| // Remove the four blocks created first. |
| { |
| shared_ptr<BlockDeletionTransaction> deletion = bm_->NewDeletionTransaction(); |
| for (const BlockId& id : block_ids) { |
| deletion->AddDeletedBlock(id); |
| } |
| vector<BlockId> deleted; |
| ASSERT_OK(deletion->CommitDeletedBlocks(&deleted)); |
| } |
| |
| // Rewind the block ID sequence and create blocks again so that they take the |
| // very same IDs. Current versions of Kudu don't allow this, but older ones |
| // could end up in this state, and startup still has to cope with it. |
| bm_->next_block_id_.Store(1); |
| for (const BlockId& expected_id : block_ids) { |
| unique_ptr<WritableBlock> block; |
| ASSERT_OK(bm_->CreateBlock(test_block_opts_, &block)); |
| ASSERT_EQ(block->id(), expected_id); |
| ASSERT_OK(block->Close()); |
| } |
| |
| // At this point the four containers carry this metadata: |
| // 1: CREATE(1) CREATE (5) DELETE(1) CREATE(4) |
| // 2: CREATE(2) DELETE(2) CREATE(1) |
| // 3: CREATE(3) DELETE(3) CREATE(2) |
| // 4: CREATE(4) DELETE(4) CREATE(3) |
| |
| // Reopening the block manager must succeed even though block IDs were |
| // reused. |
| ASSERT_OK(ReopenBlockManager()); |
| } |
| |
| // Test partial record at end of metadata file. See KUDU-1377. |
| // The idea behind this test is that we should tolerate one partial record at |
| // the end of a given container metadata file, since we actively append a |
| // record to a container metadata file when a new block is created or deleted. |
| // A system crash or disk-full event can result in a partially-written metadata |
| // record. Ignoring a trailing, partial (not corrupt) record is safe, so long |
| // as we only consider a container valid if there is at most one trailing |
| // partial record. If any other metadata record is somehow incomplete or |
| // corrupt, we consider that an error and the entire container is considered |
| // corrupted. |
| // |
| // Note that we rely on filesystem integrity to ensure that we do not lose |
| // trailing, fsync()ed metadata. |
| TEST_P(LogBlockManagerTest, TestMetadataTruncation) { |
| SetEncryptionFlags(GetParam()); |
| // Create several blocks. |
| vector<BlockId> created_blocks; |
| BlockId last_block_id; |
| for (int i = 0; i < 4; i++) { |
| unique_ptr<WritableBlock> writer; |
| ASSERT_OK(bm_->CreateBlock(test_block_opts_, &writer)); |
| last_block_id = writer->id(); |
| created_blocks.push_back(last_block_id); |
| ASSERT_OK(writer->Close()); |
| } |
| vector<BlockId> block_ids; |
| ASSERT_OK(bm_->GetAllBlockIds(&block_ids)); |
| ASSERT_EQ(4, block_ids.size()); |
| unique_ptr<ReadableBlock> block; |
| ASSERT_OK(bm_->OpenBlock(last_block_id, &block)); |
| ASSERT_OK(block->Close()); |
| |
| // Start corrupting the metadata file in different ways. |
| |
| string path = LogBlockManager::ContainerPathForTests( |
| bm_->all_containers_by_name_.begin()->second.get()); |
| string metadata_path = path + LogBlockManager::kContainerMetadataFileSuffix; |
| string data_path = path + LogBlockManager::kContainerDataFileSuffix; |
| |
| uint64_t good_meta_size; |
| ASSERT_OK(env_->GetFileSize(metadata_path, &good_meta_size)); |
| |
| // First, add extra null bytes to the end of the metadata file. This makes |
| // the trailing "record" of the metadata file corrupt, but doesn't cause data |
| // loss. The result is that the container will automatically truncate the |
| // metadata file back to its correct size. |
| // We'll do this with 1, 8, and 128 extra bytes-- the first case is too few |
| // bytes to be a valid record, while the second is too few but is enough for |
| // a data length and its checksum, and the third is too long for a record. |
| // The 8- and 128-byte cases are regression tests for KUDU-2260. |
| uint64_t cur_meta_size; |
| for (const auto num_bytes : {1, 8, 128}) { |
| { |
| RWFileOptions opts; |
| opts.mode = Env::MUST_EXIST; |
| opts.is_sensitive = true; |
| unique_ptr<RWFile> file; |
| ASSERT_OK(env_->NewRWFile(opts, metadata_path, &file)); |
| ASSERT_OK(file->Truncate(good_meta_size + num_bytes)); |
| } |
| |
| ASSERT_OK(env_->GetFileSize(metadata_path, &cur_meta_size)); |
| ASSERT_EQ(good_meta_size + num_bytes, cur_meta_size); |
| |
| // Reopen the metadata file. We will still see all of our blocks. The size of |
| // the metadata file will be restored back to its previous value. |
| ASSERT_OK(ReopenBlockManager()); |
| ASSERT_OK(bm_->GetAllBlockIds(&block_ids)); |
| ASSERT_EQ(4, block_ids.size()); |
| ASSERT_OK(bm_->OpenBlock(last_block_id, &block)); |
| ASSERT_OK(block->Close()); |
| |
| // Check that the file was truncated back to its previous size by the system. |
| ASSERT_OK(env_->GetFileSize(metadata_path, &cur_meta_size)); |
| ASSERT_EQ(good_meta_size, cur_meta_size); |
| } |
| |
| // Delete the first block we created. This necessitates writing to the |
| // metadata file of the originally-written container, since we append a |
| // delete record to the metadata. |
| { |
| shared_ptr<BlockDeletionTransaction> deletion_transaction = |
| bm_->NewDeletionTransaction(); |
| deletion_transaction->AddDeletedBlock(created_blocks[0]); |
| vector<BlockId> deleted; |
| ASSERT_OK(deletion_transaction->CommitDeletedBlocks(&deleted)); |
| } |
| ASSERT_OK(bm_->GetAllBlockIds(&block_ids)); |
| ASSERT_EQ(3, block_ids.size()); |
| |
| ASSERT_OK(env_->GetFileSize(metadata_path, &cur_meta_size)); |
| good_meta_size = cur_meta_size; |
| |
| // Add a new block, increasing the size of the container metadata file. |
| { |
| unique_ptr<WritableBlock> writer; |
| ASSERT_OK(bm_->CreateBlock(test_block_opts_, &writer)); |
| last_block_id = writer->id(); |
| created_blocks.push_back(last_block_id); |
| ASSERT_OK(writer->Close()); |
| } |
| ASSERT_OK(bm_->GetAllBlockIds(&block_ids)); |
| ASSERT_EQ(4, block_ids.size()); |
| ASSERT_OK(env_->GetFileSize(metadata_path, &cur_meta_size)); |
| ASSERT_GT(cur_meta_size, good_meta_size); |
| uint64_t prev_good_meta_size = good_meta_size; // Store previous size. |
| good_meta_size = cur_meta_size; |
| |
| // Now, truncate the metadata file so that we lose the last valid record. |
| // This will result in the loss of a block record, therefore we will observe |
| // data loss, however it will look like a failed partial write. |
| { |
| RWFileOptions opts; |
| opts.mode = Env::MUST_EXIST; |
| opts.is_sensitive = true; |
| unique_ptr<RWFile> file; |
| ASSERT_OK(env_->NewRWFile(opts, metadata_path, &file)); |
| ASSERT_OK(file->Truncate(good_meta_size - 1)); |
| } |
| |
| // Reopen the truncated metadata file. We will not find all of our blocks. |
| ASSERT_OK(ReopenBlockManager()); |
| |
| // Because the last record was a partial record on disk, the system should |
| // have assumed that it was an incomplete write and truncated the metadata |
| // file back to the previous valid record. Let's verify that that's the case. |
| good_meta_size = prev_good_meta_size; |
| ASSERT_OK(env_->GetFileSize(metadata_path, &cur_meta_size)); |
| ASSERT_EQ(good_meta_size, cur_meta_size); |
| |
| ASSERT_OK(bm_->GetAllBlockIds(&block_ids)); |
| ASSERT_EQ(3, block_ids.size()); |
| Status s = bm_->OpenBlock(last_block_id, &block); |
| ASSERT_TRUE(s.IsNotFound()) << s.ToString(); |
| ASSERT_STR_CONTAINS(s.ToString(), "Can't find block"); |
| |
| // Add a new block, increasing the size of the container metadata file. |
| { |
| unique_ptr<WritableBlock> writer; |
| ASSERT_OK(bm_->CreateBlock(test_block_opts_, &writer)); |
| last_block_id = writer->id(); |
| created_blocks.push_back(last_block_id); |
| ASSERT_OK(writer->Close()); |
| } |
| |
| ASSERT_OK(bm_->GetAllBlockIds(&block_ids)); |
| ASSERT_EQ(4, block_ids.size()); |
| ASSERT_OK(bm_->OpenBlock(last_block_id, &block)); |
| ASSERT_OK(block->Close()); |
| |
| ASSERT_OK(env_->GetFileSize(metadata_path, &cur_meta_size)); |
| ASSERT_GT(cur_meta_size, good_meta_size); |
| good_meta_size = cur_meta_size; |
| |
| // Ensure that we only ever created a single container. |
| ASSERT_EQ(1, bm_->all_containers_by_name_.size()); |
| ASSERT_EQ(1, bm_->available_containers_by_data_dir_.size()); |
| ASSERT_EQ(1, bm_->available_containers_by_data_dir_.begin()->second.size()); |
| |
| // Find location of 2nd record in metadata file and corrupt it. |
| // This is an unrecoverable error because it's in the middle of the file. |
| unique_ptr<RandomAccessFile> meta_file; |
| RandomAccessFileOptions raf_opts; |
| raf_opts.is_sensitive = true; |
| ASSERT_OK(env_->NewRandomAccessFile(raf_opts, metadata_path, &meta_file)); |
| ReadablePBContainerFile pb_reader(std::move(meta_file)); |
| ASSERT_OK(pb_reader.Open()); |
| BlockRecordPB record; |
| ASSERT_OK(pb_reader.ReadNextPB(&record)); |
| uint64_t offset = pb_reader.offset(); |
| |
| uint64_t latest_meta_size; |
| ASSERT_OK(env_->GetFileSize(metadata_path, &latest_meta_size)); |
| ASSERT_OK(env_->NewRandomAccessFile(raf_opts, metadata_path, &meta_file)); |
| latest_meta_size -= meta_file->GetEncryptionHeaderSize(); |
| unique_ptr<uint8_t[]> scratch(new uint8_t[latest_meta_size]); |
| Slice result(scratch.get(), latest_meta_size); |
| ASSERT_OK(meta_file->Read(meta_file->GetEncryptionHeaderSize(), result)); |
| string data = result.ToString(); |
| // Flip the high bit of the length field, which is a 4-byte little endian |
| // unsigned integer. This will cause the length field to represent a large |
| // value and also cause the length checksum not to validate. |
| data[offset + 3] ^= 1 << 7; |
| unique_ptr<WritableFile> writable_file; |
| WritableFileOptions wf_opts; |
| wf_opts.is_sensitive = true; |
| ASSERT_OK(env_->NewWritableFile(wf_opts, metadata_path, &writable_file)); |
| ASSERT_OK(writable_file->Append(data)); |
| ASSERT_OK(writable_file->Close()); |
| |
| // Now try to reopen the container. |
| // This should look like a bad checksum, and it's not recoverable. |
| s = ReopenBlockManager(); |
| ASSERT_TRUE(s.IsCorruption()) << s.ToString(); |
| ASSERT_STR_CONTAINS(s.ToString(), "Incorrect checksum"); |
| |
| // Now truncate both the data and metadata files. |
| // This should be recoverable. See KUDU-668. |
| ASSERT_OK(env_->NewWritableFile(wf_opts, metadata_path, &writable_file)); |
| ASSERT_OK(writable_file->Close()); |
| ASSERT_OK(env_->NewWritableFile(wf_opts, data_path, &writable_file)); |
| ASSERT_OK(writable_file->Close()); |
| |
| ASSERT_OK(ReopenBlockManager()); |
| } |
| |
| // Regression test for a crash when a container's append offset exceeded its |
| // preallocation offset. |
| TEST_P(LogBlockManagerTest, TestAppendExceedsPreallocation) { |
| SetEncryptionFlags(GetParam()); |
| FLAGS_log_container_preallocate_bytes = 1; |
| |
| // Create a container, preallocate it by one byte, and append more than one. |
| unique_ptr<WritableBlock> writer; |
| ASSERT_OK(bm_->CreateBlock(test_block_opts_, &writer)); |
| ASSERT_OK(writer->Append("hello world")); |
| ASSERT_OK(writer->Close()); |
| |
| // On second append, don't crash just because the append offset is ahead of |
| // the preallocation offset! |
| ASSERT_OK(bm_->CreateBlock(test_block_opts_, &writer)); |
| ASSERT_OK(writer->Append("hello world")); |
| } |
| |
| TEST_P(LogBlockManagerTest, TestPreallocationAndTruncation) { |
| SetEncryptionFlags(GetParam()); |
| // Ensure preallocation window is greater than the container size itself. |
| FLAGS_log_container_max_size = 1024 * 1024; |
| FLAGS_log_container_preallocate_bytes = 32 * 1024 * 1024; |
| |
| // Fill up one container. |
| unique_ptr<WritableBlock> writer; |
| ASSERT_OK(bm_->CreateBlock(test_block_opts_, &writer)); |
| unique_ptr<uint8_t[]> data(new uint8_t[FLAGS_log_container_max_size]); |
| memset(data.get(), 0, FLAGS_log_container_max_size); |
| ASSERT_OK(writer->Append({ data.get(), FLAGS_log_container_max_size } )); |
| string fname; |
| NO_FATALS(GetOnlyContainerDataFile(&fname)); |
| uint64_t size_after_append; |
| ASSERT_OK(env_->GetFileSizeOnDisk(fname, &size_after_append)); |
| ASSERT_EQ(FLAGS_log_container_preallocate_bytes, size_after_append); |
| |
| // Close it. The extra preallocated space should be truncated off the file. |
| ASSERT_OK(writer->Close()); |
| uint64_t size_after_close; |
| ASSERT_OK(env_->GetFileSizeOnDisk(fname, &size_after_close)); |
| ASSERT_GE(size_after_close, FLAGS_log_container_max_size); |
| ASSERT_LT(size_after_close, size_after_append); |
| |
| // Now test the same startup behavior by artificially growing the file |
| // and reopening the block manager. |
| // |
| // Try preallocating in two ways: once with a change to the file size and |
| // once without. The second way serves as a proxy for XFS's speculative |
| // preallocation behavior, described in KUDU-1856. |
| for (RWFile::PreAllocateMode mode : {RWFile::CHANGE_FILE_SIZE, |
| RWFile::DONT_CHANGE_FILE_SIZE}) { |
| LOG(INFO) << "Pass " << mode; |
| unique_ptr<RWFile> data_file; |
| RWFileOptions opts; |
| opts.mode = Env::MUST_EXIST; |
| ASSERT_OK(env_->NewRWFile(opts, fname, &data_file)); |
| opts.is_sensitive = true; |
| ASSERT_OK(data_file->PreAllocate(size_after_close, size_after_close, mode)); |
| uint64_t size_after_preallocate; |
| ASSERT_OK(env_->GetFileSizeOnDisk(fname, &size_after_preallocate)); |
| ASSERT_EQ(size_after_close * 2, size_after_preallocate); |
| |
| if (mode == RWFile::DONT_CHANGE_FILE_SIZE) { |
| // Some older versions of ext4 (such as on el6) do not appear to truncate |
| // unwritten preallocated space that extends beyond the file size. Let's |
| // coax them by writing a single byte into that space. |
| // |
| // Note: this doesn't invalidate the usefulness of this test, as it's |
| // quite possible for us to have written a little bit of data into XFS's |
| // speculative preallocated area. |
| ASSERT_OK(data_file->Write(size_after_close, "a")); |
| } |
| |
| // Now reopen the block manager. It should notice that the container grew |
| // and truncate the extra preallocated space off again. |
| ASSERT_OK(ReopenBlockManager()); |
| uint64_t size_after_reopen; |
| ASSERT_OK(env_->GetFileSizeOnDisk(fname, &size_after_reopen)); |
| ASSERT_EQ(size_after_close, size_after_reopen); |
| } |
| } |
| |
| TEST_P(LogBlockManagerTest, TestContainerWithManyHoles) { |
| SetEncryptionFlags(GetParam()); |
| // This is a regression test of sorts for KUDU-1508, though it doesn't |
| // actually fail if the fix is missing; it just corrupts the filesystem. |
| |
| static unordered_map<int, int> block_size_to_last_interior_node_block_number = |
| {{1024, 168}, |
| {2048, 338}, |
| {4096, 680}}; |
| |
| const int kNumBlocks = 16 * 1024; |
| |
| uint64_t fs_block_size; |
| ASSERT_OK(env_->GetBlockSize(test_dir_, &fs_block_size)); |
| if (!ContainsKey(block_size_to_last_interior_node_block_number, |
| fs_block_size)) { |
| LOG(INFO) << Substitute("Filesystem block size is $0, skipping test", |
| fs_block_size); |
| return; |
| } |
| int last_interior_node_block_number = FindOrDie( |
| block_size_to_last_interior_node_block_number, fs_block_size); |
| |
| ASSERT_GE(kNumBlocks, last_interior_node_block_number); |
| |
| // Create a bunch of blocks. They should all go in one container (unless |
| // the container becomes full). |
| LOG(INFO) << Substitute("Creating $0 blocks", kNumBlocks); |
| vector<BlockId> ids; |
| for (int i = 0; i < kNumBlocks; i++) { |
| unique_ptr<WritableBlock> block; |
| ASSERT_OK(bm_->CreateBlock(test_block_opts_, &block)); |
| ASSERT_OK(block->Append("aaaa")); |
| ASSERT_OK(block->Close()); |
| ids.push_back(block->id()); |
| } |
| |
| // Delete every other block. In effect, this maximizes the number of extents |
| // in the container by forcing the filesystem to alternate every hole with |
| // a live extent. |
| LOG(INFO) << "Deleting every other block"; |
| shared_ptr<BlockDeletionTransaction> deletion_transaction = |
| this->bm_->NewDeletionTransaction(); |
| for (int i = 0; i < ids.size(); i += 2) { |
| deletion_transaction->AddDeletedBlock(ids[i]); |
| } |
| vector<BlockId> deleted; |
| ASSERT_OK(deletion_transaction->CommitDeletedBlocks(&deleted)); |
| |
| // Delete all of the blocks belonging to the interior node. If KUDU-1508 |
| // applies, this should corrupt the filesystem. |
| LOG(INFO) << Substitute("Deleting remaining blocks up to block number $0", |
| last_interior_node_block_number); |
| for (int i = 1; i < last_interior_node_block_number; i += 2) { |
| deletion_transaction->AddDeletedBlock(ids[i]); |
| } |
| ASSERT_OK(deletion_transaction->CommitDeletedBlocks(&deleted)); |
| } |
| |
| TEST_P(LogBlockManagerTest, TestParseKernelRelease) { |
| SetEncryptionFlags(GetParam()); |
| ASSERT_TRUE(LogBlockManager::IsBuggyEl6Kernel("1.7.0.0.el6.x86_64")); |
| |
| // no el6 infix |
| ASSERT_FALSE(LogBlockManager::IsBuggyEl6Kernel("2.6.32")); |
| |
| ASSERT_TRUE(LogBlockManager::IsBuggyEl6Kernel("2.6.32-1.0.0.el6.x86_64")); |
| ASSERT_FALSE(LogBlockManager::IsBuggyEl6Kernel("2.6.33-1.0.0.el6.x86_64")); |
| |
| // Make sure it's a numeric sort, not a lexicographic one. |
| ASSERT_FALSE(LogBlockManager::IsBuggyEl6Kernel("2.6.32-1000.0.0.el6.x86_64")); |
| ASSERT_FALSE(LogBlockManager::IsBuggyEl6Kernel("2.6.100-1.0.0.el6.x86_64")); |
| ASSERT_FALSE(LogBlockManager::IsBuggyEl6Kernel("2.10.0-1.0.0.el6.x86_64")); |
| ASSERT_FALSE(LogBlockManager::IsBuggyEl6Kernel("10.0.0-1.0.0.el6.x86_64")); |
| |
| // Kernels from el6.6, el6.7: buggy |
| ASSERT_TRUE(LogBlockManager::IsBuggyEl6Kernel("2.6.32-504.30.3.el6.x86_64")); |
| ASSERT_TRUE(LogBlockManager::IsBuggyEl6Kernel("2.6.32-573.el6.x86_64")); |
| ASSERT_TRUE(LogBlockManager::IsBuggyEl6Kernel("2.6.32-573.1.1.el6.x86_64")); |
| |
| // Kernel from el6.8: buggy |
| ASSERT_TRUE(LogBlockManager::IsBuggyEl6Kernel("2.6.32-642.el6.x86_64")); |
| |
| // Kernels from el6.8 update stream before a fix was applied: buggy. |
| ASSERT_TRUE(LogBlockManager::IsBuggyEl6Kernel("2.6.32-642.11.1.el6.x86_64")); |
| ASSERT_TRUE(LogBlockManager::IsBuggyEl6Kernel("2.6.32-642.14.1.el6.x86_64")); |
| ASSERT_TRUE(LogBlockManager::IsBuggyEl6Kernel("2.6.32-642.14.2.el6.x86_64")); |
| |
| // Kernels from el6.8 update stream after a fix was applied: not buggy. |
| ASSERT_FALSE(LogBlockManager::IsBuggyEl6Kernel("2.6.32-642.15.1.el6.x86_64")); |
| ASSERT_FALSE(LogBlockManager::IsBuggyEl6Kernel("2.6.32-642.18.1.el6.x86_64")); |
| |
| // Kernel from el6.9 development prior to fix: buggy. |
| ASSERT_TRUE(LogBlockManager::IsBuggyEl6Kernel("2.6.32-673.0.0.el6.x86_64")); |
| |
| // Kernel from el6.9 development post-fix: not buggy. |
| ASSERT_FALSE(LogBlockManager::IsBuggyEl6Kernel("2.6.32-674.0.0.el6.x86_64")); |
| } |
| |
#ifdef NDEBUG

// Micro-benchmark, compiled only when NDEBUG is defined: it writes a large
// number of blocks and then times how long the LBM takes to start up.
//
// Compared with typical workloads it is simplistic in two ways:
// 1. It uses a minimal number of containers, each completely full and free
//    of deleted blocks.
//    (Typical workloads write to several containers at once because of
//    concurrent write operations, such as multiple MM threads flushing.)
// 2. It uses a minimal number of containers, each completely full with about
//    --startup_benchmark_deleted_block_percentage percent of deleted blocks.
//    (Typical workloads mix writes, alter operations and background MM
//    threads that have been running for a long time since the last
//    bootstrap.)
//
// Even so, it is good enough for micro-optimizing the startup path.
//
// The boolean test parameter selects whether a share of the blocks is
// deleted before the timed reopens.
class LogBlockManagerStartupBenchmarkTest: public LogBlockManagerTest {};
INSTANTIATE_TEST_SUITE_P(StartupBenchmarkSuite, LogBlockManagerStartupBenchmarkTest,
                         ::testing::Values(false, true));

TEST_P(LogBlockManagerStartupBenchmarkTest, StartupBenchmark) {
  bool delete_blocks = GetParam();

  // Spread the data across several directories underneath the test dir and
  // reopen the block manager on top of them.
  std::vector<std::string> test_dirs;
  for (int dir_idx = 0;
       dir_idx < FLAGS_startup_benchmark_data_dir_count_for_testing;
       ++dir_idx) {
    test_dirs.emplace_back(test_dir_ + "/" + std::to_string(dir_idx));
  }
  ASSERT_OK(ReopenBlockManager(nullptr, nullptr, test_dirs, /* force= */ true));

  // Turn off preflushing, which can slow these writes down. The blocks are so
  // small that each one most likely starts on the same 4KB page as its
  // predecessor, and with the "stable page writes" feature every block would
  // end up waiting for the writeback of the previous one.
  //
  // See http://yoshinorimatsunobu.blogspot.com/2014/03/how-syncfilerange-really-works.html
  // for details.
  FLAGS_block_manager_preflush_control = "never";
  const int kNumBlocks = AllowSlowTests() ? FLAGS_startup_benchmark_block_count_for_testing : 1000;

  // Write 'kNumBlocks' one-byte blocks within a single creation transaction.
  vector<BlockId> block_ids;
  {
    unique_ptr<BlockCreationTransaction> creation_txn = bm_->NewCreationTransaction();
    for (int n = 0; n < kNumBlocks; n++) {
      unique_ptr<WritableBlock> writer;
      ASSERT_OK_FAST(bm_->CreateBlock(test_block_opts_, &writer));
      ASSERT_OK_FAST(writer->Append("x"));
      ASSERT_OK_FAST(writer->Finalize());
      block_ids.emplace_back(writer->id());
      creation_txn->AddCreatedBlock(std::move(writer));
    }
    ASSERT_OK(creation_txn->CommitCreatedBlocks());
  }

  if (delete_blocks) {
    // Shuffle the IDs so that the deleted blocks are picked at random, then
    // delete from the front of the shuffled list.
    std::mt19937 gen(SeedRandom());
    std::shuffle(block_ids.begin(), block_ids.end(), gen);
    {
      int remaining_to_delete =
          block_ids.size() * FLAGS_startup_benchmark_deleted_block_percentage / 100;
      shared_ptr<BlockDeletionTransaction> deletion_txn =
          this->bm_->NewDeletionTransaction();
      for (const BlockId& id : block_ids) {
        deletion_txn->AddDeletedBlock(id);
        // The counter is decremented after the block has been queued, so the
        // loop stops once the quota has been reached.
        if (--remaining_to_delete <= 0) {
          break;
        }
      }
      vector<BlockId> deleted;
      ASSERT_OK(deletion_txn->CommitDeletedBlocks(&deleted));
    }
  }

  // The measured part: reopen the block manager repeatedly, timing each pass.
  for (int pass = 0; pass < FLAGS_startup_benchmark_reopen_times; pass++) {
    LOG_TIMING(INFO, "reopening block manager") {
      ASSERT_OK(ReopenBlockManager(nullptr, nullptr, test_dirs));
    }
  }
}
#endif
| |
| TEST_P(LogBlockManagerTest, TestFailMultipleTransactionsPerContainer) { |
| SetEncryptionFlags(GetParam()); |
| // Create multiple transactions that will share a container. |
| const int kNumTransactions = 3; |
| vector<unique_ptr<BlockCreationTransaction>> block_transactions; |
| for (int i = 0; i < kNumTransactions; i++) { |
| block_transactions.emplace_back(bm_->NewCreationTransaction()); |
| } |
| |
| // Repeatedly add new blocks for the transactions. Finalizing each block |
| // makes the block's container available, allowing the same container to be |
| // reused by the next block. |
| const int kNumBlocks = 10; |
| for (int i = 0; i < kNumBlocks; i++) { |
| unique_ptr<WritableBlock> block; |
| ASSERT_OK_FAST(bm_->CreateBlock(test_block_opts_, &block)); |
| ASSERT_OK_FAST(block->Append("x")); |
| ASSERT_OK_FAST(block->Finalize()); |
| block_transactions[i % kNumTransactions]->AddCreatedBlock(std::move(block)); |
| } |
| ASSERT_EQ(1, bm_->all_containers_by_name_.size()); |
| |
| // Briefly inject an error while committing one of the transactions. This |
| // should make the container read-only, preventing the remaining transactions |
| // from proceeding. |
| { |
| google::FlagSaver saver; |
| FLAGS_crash_on_eio = false; |
| FLAGS_env_inject_eio = 1.0; |
| Status s = block_transactions[0]->CommitCreatedBlocks(); |
| ASSERT_TRUE(s.IsIOError()) << s.ToString(); |
| } |
| |
| // Now try to add some more blocks. |
| for (int i = 0; i < kNumTransactions; i++) { |
| unique_ptr<WritableBlock> block; |
| ASSERT_OK_FAST(bm_->CreateBlock(test_block_opts_, &block)); |
| |
| // The first write will fail, as the container has been marked read-only. |
| // This will leave the container unavailable and force the creation of a |
| // new container. |
| Status s = block->Append("x"); |
| if (i == 0) { |
| ASSERT_TRUE(s.IsIOError()) << s.ToString(); |
| } else { |
| ASSERT_OK_FAST(s); |
| ASSERT_OK_FAST(block->Finalize()); |
| } |
| block_transactions[i]->AddCreatedBlock(std::move(block)); |
| } |
| ASSERT_EQ(2, bm_->all_containers_by_name_.size()); |
| |
| // At this point, all of the transactions have blocks in read-only containers |
| // and, thus, will be unable to commit. |
| for (const auto& block_transaction : block_transactions) { |
| ASSERT_TRUE(block_transaction->CommitCreatedBlocks().IsIOError()); |
| } |
| } |
| |
| TEST_P(LogBlockManagerTest, TestLookupBlockLimit) { |
| SetEncryptionFlags(GetParam()); |
| int64_t limit_1024 = LogBlockManager::LookupBlockLimit(1024); |
| int64_t limit_2048 = LogBlockManager::LookupBlockLimit(2048); |
| int64_t limit_4096 = LogBlockManager::LookupBlockLimit(4096); |
| |
| // Test the floor behavior in LookupBlockLimit(). |
| for (int i = 0; i < 16384; i++) { |
| if (i < 2048) { |
| ASSERT_EQ(limit_1024, LogBlockManager::LookupBlockLimit(i)); |
| } else if (i < 4096) { |
| ASSERT_EQ(limit_2048, LogBlockManager::LookupBlockLimit(i)); |
| } else { |
| ASSERT_EQ(limit_4096, LogBlockManager::LookupBlockLimit(i)); |
| } |
| } |
| } |
| |
| TEST_P(LogBlockManagerTest, TestContainerBlockLimitingByBlockNum) { |
| SetEncryptionFlags(GetParam()); |
| const int kNumBlocks = 1000; |
| |
| // Creates 'kNumBlocks' blocks with minimal data. |
| auto create_some_blocks = [&]() { |
| for (int i = 0; i < kNumBlocks; i++) { |
| unique_ptr<WritableBlock> block; |
| RETURN_NOT_OK(bm_->CreateBlock(test_block_opts_, &block)); |
| RETURN_NOT_OK(block->Append("aaaa")); |
| RETURN_NOT_OK(block->Close()); |
| } |
| return Status::OK(); |
| }; |
| |
| // All of these blocks should fit into one container. |
| ASSERT_OK(create_some_blocks()); |
| NO_FATALS(AssertNumContainers(1)); |
| |
| // With a limit imposed, the existing container is immediately full, and we |
| // need a few more to satisfy another 'kNumBlocks' blocks. |
| FLAGS_log_container_max_blocks = 400; |
| ASSERT_OK(ReopenBlockManager()); |
| ASSERT_OK(create_some_blocks()); |
| NO_FATALS(AssertNumContainers(4)); |
| |
| // Now remove the limit and create more blocks. They should go into existing |
| // containers, which are now no longer full. |
| FLAGS_log_container_max_blocks = -1; |
| ASSERT_OK(ReopenBlockManager()); |
| |
| ASSERT_OK(create_some_blocks()); |
| NO_FATALS(AssertNumContainers(4)); |
| } |
| |
| TEST_P(LogBlockManagerTest, TestContainerBlockLimitingByMetadataSize) { |
| SetEncryptionFlags(GetParam()); |
| const int kNumBlocks = 1000; |
| |
| // Creates 'kNumBlocks' blocks with minimal data. |
| auto create_some_blocks = [&]() { |
| for (int i = 0; i < kNumBlocks; i++) { |
| unique_ptr<WritableBlock> block; |
| RETURN_NOT_OK(bm_->CreateBlock(test_block_opts_, &block)); |
| RETURN_NOT_OK(block->Append("aaaa")); |
| RETURN_NOT_OK(block->Close()); |
| } |
| return Status::OK(); |
| }; |
| |
| // All of these blocks should fit into one container. |
| ASSERT_OK(create_some_blocks()); |
| NO_FATALS(AssertNumContainers(1)); |
| |
| // With a limit imposed, the existing container is immediately full, and we |
| // need a few more to satisfy another metadata file size. |
| // Each CREATE type entry in metadata protobuf file is 39 bytes, so 400 of |
| // such entries added by 'create_some_blocks' will make the container full. |
| FLAGS_log_container_metadata_max_size = 400 * 39; |
| ASSERT_OK(ReopenBlockManager()); |
| ASSERT_OK(create_some_blocks()); |
| NO_FATALS(AssertNumContainers(4)); |
| |
| // Now remove the limit and create more blocks. They should go into existing |
| // containers, which are now no longer full. |
| FLAGS_log_container_metadata_max_size = 0; |
| ASSERT_OK(ReopenBlockManager()); |
| |
| ASSERT_OK(create_some_blocks()); |
| NO_FATALS(AssertNumContainers(4)); |
| } |
| |
| TEST_F(LogBlockManagerTest, TestContainerBlockLimitingByMetadataSizeWithCompaction) { |
| const int kNumBlocks = 2000; |
| const int kNumThreads = 10; |
| const double kLiveBlockRatio = 0.1; |
| |
| // Creates and deletes some blocks. |
| auto create_and_delete_blocks = [&]() { |
| vector<BlockId> ids; |
| // Creates 'kNumBlocks' blocks. |
| for (int i = 0; i < kNumBlocks; i++) { |
| unique_ptr<WritableBlock> block; |
| RETURN_NOT_OK(bm_->CreateBlock(test_block_opts_, &block)); |
| RETURN_NOT_OK(block->Append("aaaa")); |
| RETURN_NOT_OK(block->Close()); |
| ids.push_back(block->id()); |
| } |
| |
| // Deletes 'kNumBlocks * (1 - kLiveBlockRatio)' blocks. |
| shared_ptr<BlockDeletionTransaction> deletion_transaction = |
| bm_->NewDeletionTransaction(); |
| for (const auto& id : ids) { |
| if (rand() % 100 < 100 * kLiveBlockRatio) { |
| continue; |
| } |
| deletion_transaction->AddDeletedBlock(id); |
| } |
| vector<BlockId> deleted; |
| RETURN_NOT_OK(deletion_transaction->CommitDeletedBlocks(&deleted)); |
| |
| return Status::OK(); |
| }; |
| |
| // Create a thread pool to create and delete blocks. |
| unique_ptr<ThreadPool> pool; |
| ASSERT_OK(ThreadPoolBuilder("test-metadata-compact-pool") |
| .set_max_threads(kNumThreads) |
| .Build(&pool)); |
| auto mt_create_and_delete_blocks = [&]() { |
| for (int i = 0; i < kNumThreads; ++i) { |
| ASSERT_OK(pool->Submit(create_and_delete_blocks)); |
| } |
| pool->Wait(); |
| dd_manager_->WaitOnClosures(); |
| }; |
| |
| FLAGS_log_container_metadata_runtime_compact = true; |
| // Define a small value to make metadata easy to be full. |
| FLAGS_log_container_metadata_max_size = 32 * 1024; |
| NO_FATALS(mt_create_and_delete_blocks()); |
| vector<string> metadata_files; |
| NO_FATALS(GetContainerMetadataFiles(&metadata_files)); |
| for (const auto& metadata_file : metadata_files) { |
| uint64_t file_size; |
| NO_FATALS(env_->GetFileSize(metadata_file, &file_size)); |
| ASSERT_GE(FLAGS_log_container_metadata_max_size * |
| FLAGS_log_container_metadata_size_before_compact_ratio, |
| file_size); |
| } |
| |
| // Reopen and test again. |
| ASSERT_OK(ReopenBlockManager()); |
| NO_FATALS(mt_create_and_delete_blocks()); |
| NO_FATALS(GetContainerMetadataFiles(&metadata_files)); |
| for (const auto& metadata_file : metadata_files) { |
| uint64_t file_size; |
| NO_FATALS(env_->GetFileSize(metadata_file, &file_size)); |
| ASSERT_GE(FLAGS_log_container_metadata_max_size * |
| FLAGS_log_container_metadata_size_before_compact_ratio, |
| file_size); |
| } |
| |
| // Now remove the limit and create more blocks. They should go into existing |
| // containers, which are now no longer full. |
| FLAGS_log_container_metadata_runtime_compact = false; |
| ASSERT_OK(ReopenBlockManager()); |
| NO_FATALS(mt_create_and_delete_blocks()); |
| NO_FATALS(GetContainerMetadataFiles(&metadata_files)); |
| bool exist_larger_one = false; |
| for (const auto& metadata_file : metadata_files) { |
| uint64_t file_size; |
| NO_FATALS(env_->GetFileSize(metadata_file, &file_size)); |
| if (file_size > FLAGS_log_container_metadata_max_size * |
| FLAGS_log_container_metadata_size_before_compact_ratio) { |
| exist_larger_one = true; |
| break; |
| } |
| } |
| ASSERT_TRUE(exist_larger_one); |
| } |
| |
| TEST_P(LogBlockManagerTest, TestMisalignedBlocksFuzz) { |
| SetEncryptionFlags(GetParam()); |
| FLAGS_log_container_preallocate_bytes = 0; |
| const int kNumBlocks = 100; |
| |
| // Create one container. |
| unique_ptr<WritableBlock> block; |
| ASSERT_OK(bm_->CreateBlock(test_block_opts_, &block)); |
| ASSERT_OK(block->Close()); |
| string container_name; |
| NO_FATALS(GetOnlyContainer(&container_name)); |
| |
| // Add a mixture of regular and misaligned blocks to it. |
| LBMCorruptor corruptor(env_, dd_manager_->GetDirs(), SeedRandom()); |
| ASSERT_OK(corruptor.Init()); |
| int num_misaligned_blocks = 0; |
| for (int i = 0; i < kNumBlocks; i++) { |
| if (rand() % 2) { |
| ASSERT_OK(corruptor.AddMisalignedBlockToContainer()); |
| |
| // Need to reopen the block manager after each corruption because the |
| // container metadata writers do not expect the metadata files to have |
| // been changed underneath them. |
| FsReport report; |
| ASSERT_OK(ReopenBlockManager(nullptr, &report)); |
| ASSERT_FALSE(report.HasFatalErrors()); |
| num_misaligned_blocks++; |
| } else { |
| unique_ptr<WritableBlock> block; |
| ASSERT_OK(bm_->CreateBlock(test_block_opts_, &block)); |
| |
| // Append at least once to ensure that the data file grows. |
| // |
| // The LBM considers the last record of a container to be malformed if |
| // it's zero-length and if the file hasn't grown enough to catch up it. |
| // This combination (zero-length block at the end of a full container |
| // without any remaining preallocated space) is nearly impossible in real |
| // life, so we avoid it in testing too. |
| int num_appends = (rand() % 8) + 1; |
| uint64_t raw_block_id = block->id().id(); |
| Slice s(reinterpret_cast<const uint8_t*>(&raw_block_id), |
| sizeof(raw_block_id)); |
| for (int j = 0; j < num_appends; j++) { |
| // The corruptor writes the block ID repeatedly into each misaligned |
| // block, so we'll make our regular blocks do the same thing. |
| ASSERT_OK(block->Append(s)); |
| } |
| ASSERT_OK(block->Close()); |
| } |
| } |
| FsReport report; |
| ASSERT_OK(ReopenBlockManager(nullptr, &report)); |
| ASSERT_FALSE(report.HasFatalErrors()) << report.ToString(); |
| ASSERT_EQ(num_misaligned_blocks, report.misaligned_block_check->entries.size()); |
| for (const auto& mb : report.misaligned_block_check->entries) { |
| ASSERT_EQ(container_name, mb.container); |
| } |
| |
| // Delete about half of them, chosen randomly. |
| vector<BlockId> block_ids; |
| { |
| shared_ptr<BlockDeletionTransaction> deletion_transaction = |
| this->bm_->NewDeletionTransaction(); |
| ASSERT_OK(bm_->GetAllBlockIds(&block_ids)); |
| for (const auto& id : block_ids) { |
| if (rand() % 2) { |
| deletion_transaction->AddDeletedBlock(id); |
| } |
| } |
| vector<BlockId> deleted; |
| ASSERT_OK(deletion_transaction->CommitDeletedBlocks(&deleted)); |
| } |
| |
| // Wait for the block manager to punch out all of the holes. It's easiest to |
| // do this by reopening it; shutdown will wait for outstanding hole punches. |
| // |
| // On reopen, some misaligned blocks should be gone from the report. |
| ASSERT_OK(ReopenBlockManager(nullptr, &report)); |
| ASSERT_FALSE(report.HasFatalErrors()); |
| ASSERT_GT(report.misaligned_block_check->entries.size(), 0); |
| ASSERT_LT(report.misaligned_block_check->entries.size(), num_misaligned_blocks); |
| for (const auto& mb : report.misaligned_block_check->entries) { |
| ASSERT_EQ(container_name, mb.container); |
| } |
| |
| // Read and verify the contents of each remaining block. |
| ASSERT_OK(bm_->GetAllBlockIds(&block_ids)); |
| for (const auto& id : block_ids) { |
| uint64_t raw_block_id = id.id(); |
| unique_ptr<ReadableBlock> b; |
| ASSERT_OK(bm_->OpenBlock(id, &b)); |
| uint64_t size; |
| ASSERT_OK(b->Size(&size)); |
| ASSERT_EQ(0, size % sizeof(raw_block_id)); |
| uint8_t buf[size]; |
| ASSERT_OK(b->Read(0, Slice(buf, size))); |
| for (int i = 0; i < size; i += sizeof(raw_block_id)) { |
| ASSERT_EQ(raw_block_id, *reinterpret_cast<uint64_t*>(buf + i)); |
| } |
| ASSERT_OK(b->Close()); |
| } |
| } |
| |
| TEST_P(LogBlockManagerTest, TestRepairPreallocateExcessSpace) { |
| SetEncryptionFlags(GetParam()); |
| // Enforce that the container's actual size is strictly upper-bounded by the |
| // calculated size so we can more easily trigger repairs. |
| FLAGS_log_container_excess_space_before_cleanup_fraction = 0.0; |
| |
| // Disable preallocation so we can more easily control it. |
| FLAGS_log_container_preallocate_bytes = 0; |
| |
| // Make it easy to create a full container. |
| FLAGS_log_container_max_size = 1; |
| |
| const int kNumContainers = 10; |
| |
| // Create several full containers. |
| { |
| unique_ptr<BlockCreationTransaction> transaction = bm_->NewCreationTransaction(); |
| for (int i = 0; i < kNumContainers; i++) { |
| unique_ptr<WritableBlock> block; |
| ASSERT_OK(bm_->CreateBlock(test_block_opts_, &block)); |
| ASSERT_OK(block->Append("a")); |
| transaction->AddCreatedBlock(std::move(block)); |
| } |
| ASSERT_OK(transaction->CommitCreatedBlocks()); |
| } |
| vector<string> container_names; |
| NO_FATALS(GetContainerNames(&container_names)); |
| |
| // Corrupt one container. |
| LBMCorruptor corruptor(env_, dd_manager_->GetDirs(), SeedRandom()); |
| ASSERT_OK(corruptor.Init()); |
| ASSERT_OK(corruptor.PreallocateFullContainer()); |
| |
| // Check the report. |
| FsReport report; |
| ASSERT_OK(ReopenBlockManager(nullptr, &report)); |
| ASSERT_FALSE(report.HasFatalErrors()); |
| ASSERT_EQ(1, report.full_container_space_check->entries.size()); |
| const LBMFullContainerSpaceCheck::Entry& fcs = |
| report.full_container_space_check->entries[0]; |
| unordered_set<string> container_name_set(container_names.begin(), |
| container_names.end()); |
| ASSERT_TRUE(ContainsKey(container_name_set, fcs.container)); |
| ASSERT_GT(fcs.excess_bytes, 0); |
| ASSERT_TRUE(fcs.repaired); |
| report.full_container_space_check->entries.clear(); |
| NO_FATALS(AssertEmptyReport(report)); |
| } |
| |
| TEST_P(LogBlockManagerTest, TestRepairUnpunchedBlocks) { |
| SetEncryptionFlags(GetParam()); |
| const int kNumBlocks = 100; |
| |
| // Enforce that the container's actual size is strictly upper-bounded by the |
| // calculated size so we can more easily trigger repairs. |
| FLAGS_log_container_excess_space_before_cleanup_fraction = 0.0; |
| |
| // Force our single container to become full once created. |
| FLAGS_log_container_max_size = GetParam() ? 4096 : 0; |
| |
| // Force the test to measure extra space in unpunched holes, not in the |
| // preallocation buffer. |
| FLAGS_log_container_preallocate_bytes = 0; |
| |
| // Create one container. |
| unique_ptr<WritableBlock> block; |
| ASSERT_OK(bm_->CreateBlock(test_block_opts_, &block)); |
| ASSERT_OK(block->Close()); |
| string data_file; |
| NO_FATALS(GetOnlyContainerDataFile(&data_file)); |
| uint64_t initial_file_size_on_disk; |
| ASSERT_OK(env_->GetFileSizeOnDisk(data_file, &initial_file_size_on_disk)); |
| |
| // Add some "unpunched blocks" to the container. |
| LBMCorruptor corruptor(env_, dd_manager_->GetDirs(), SeedRandom()); |
| ASSERT_OK(corruptor.Init()); |
| for (int i = 0; i < kNumBlocks; i++) { |
| ASSERT_OK(corruptor.AddUnpunchedBlockToFullContainer()); |
| } |
| |
| uint64_t file_size_on_disk; |
| ASSERT_OK(env_->GetFileSizeOnDisk(data_file, &file_size_on_disk)); |
| ASSERT_GT(file_size_on_disk, initial_file_size_on_disk); |
| |
| // Check the report. |
| FsReport report; |
| ASSERT_OK(ReopenBlockManager(nullptr, &report)); |
| ASSERT_FALSE(report.HasFatalErrors()); |
| ASSERT_EQ(1, report.full_container_space_check->entries.size()); |
| const LBMFullContainerSpaceCheck::Entry& fcs = |
| report.full_container_space_check->entries[0]; |
| string container; |
| NO_FATALS(GetOnlyContainer(&container)); |
| ASSERT_EQ(container, fcs.container); |
| ASSERT_EQ(file_size_on_disk, fcs.excess_bytes + initial_file_size_on_disk); |
| ASSERT_TRUE(fcs.repaired); |
| report.full_container_space_check->entries.clear(); |
| NO_FATALS(AssertEmptyReport(report)); |
| |
| // Wait for the block manager to punch out all of the holes (done as part of |
| // repair at startup). It's easiest to do this by reopening it; shutdown will |
| // wait for outstanding hole punches. |
| ASSERT_OK(ReopenBlockManager(nullptr, &report)); |
| NO_FATALS(AssertEmptyReport(report)); |
| |
| // File size should be 0 post-repair. |
| ASSERT_OK(env_->GetFileSizeOnDisk(data_file, &file_size_on_disk)); |
| ASSERT_EQ(initial_file_size_on_disk, file_size_on_disk); |
| } |
| |
| TEST_P(LogBlockManagerTest, TestRepairIncompleteContainer) { |
| SetEncryptionFlags(GetParam()); |
| const int kNumContainers = 20; |
| |
| // Create some incomplete containers. The corruptor will select between |
| // several variants of "incompleteness" at random (see |
| // LBMCorruptor::CreateIncompleteContainer() for details). |
| LBMCorruptor corruptor(env_, dd_manager_->GetDirs(), SeedRandom()); |
| ASSERT_OK(corruptor.Init()); |
| for (int i = 0; i < kNumContainers; i++) { |
| ASSERT_OK(corruptor.CreateIncompleteContainer()); |
| } |
| vector<string> container_names; |
| NO_FATALS(GetContainerNames(&container_names)); |
| ASSERT_EQ(kNumContainers, container_names.size()); |
| |
| // Check the report. |
| FsReport report; |
| ASSERT_OK(ReopenBlockManager(nullptr, &report)); |
| ASSERT_FALSE(report.HasFatalErrors()); |
| ASSERT_EQ(kNumContainers, report.incomplete_container_check->entries.size()); |
| unordered_set<string> container_name_set(container_names.begin(), |
| container_names.end()); |
| for (const auto& ic : report.incomplete_container_check->entries) { |
| ASSERT_TRUE(ContainsKey(container_name_set, ic.container)); |
| ASSERT_TRUE(ic.repaired); |
| } |
| report.incomplete_container_check->entries.clear(); |
| NO_FATALS(AssertEmptyReport(report)); |
| } |
| |
| TEST_P(LogBlockManagerTest, TestDetectMalformedRecords) { |
| SetEncryptionFlags(GetParam()); |
| const int kNumRecords = 50; |
| |
| // Create one container. |
| unique_ptr<WritableBlock> block; |
| ASSERT_OK(bm_->CreateBlock(test_block_opts_, &block)); |
| ASSERT_OK(block->Append("a")); |
| ASSERT_OK(block->Close()); |
| string container_name; |
| NO_FATALS(GetOnlyContainer(&container_name)); |
| |
| // Add some malformed records. The corruptor will select between |
| // several variants of "malformedness" at random (see |
| // LBMCorruptor::AddMalformedRecordToContainer for details). |
| LBMCorruptor corruptor(env_, dd_manager_->GetDirs(), SeedRandom()); |
| ASSERT_OK(corruptor.Init()); |
| for (int i = 0; i < kNumRecords; i++) { |
| ASSERT_OK(corruptor.AddMalformedRecordToContainer()); |
| } |
| |
| // Check the report. |
| FsReport report; |
| ASSERT_OK(ReopenBlockManager(nullptr, &report)); |
| ASSERT_TRUE(report.HasFatalErrors()); |
| ASSERT_EQ(kNumRecords, report.malformed_record_check->entries.size()); |
| for (const auto& mr : report.malformed_record_check->entries) { |
| ASSERT_EQ(container_name, mr.container); |
| } |
| report.malformed_record_check->entries.clear(); |
| NO_FATALS(AssertEmptyReport(report)); |
| } |
| |
| TEST_P(LogBlockManagerTest, TestDetectMisalignedBlocks) { |
| SetEncryptionFlags(GetParam()); |
| const int kNumBlocks = 50; |
| |
| // Create one container. |
| unique_ptr<WritableBlock> block; |
| ASSERT_OK(bm_->CreateBlock(test_block_opts_, &block)); |
| ASSERT_OK(block->Append("a")); |
| ASSERT_OK(block->Close()); |
| string container_name; |
| NO_FATALS(GetOnlyContainer(&container_name)); |
| |
| // Add some misaligned blocks. |
| LBMCorruptor corruptor(env_, dd_manager_->GetDirs(), SeedRandom()); |
| ASSERT_OK(corruptor.Init()); |
| for (int i = 0; i < kNumBlocks; i++) { |
| ASSERT_OK(corruptor.AddMisalignedBlockToContainer()); |
| } |
| |
| // Check the report. |
| FsReport report; |
| ASSERT_OK(ReopenBlockManager(nullptr, &report)); |
| ASSERT_FALSE(report.HasFatalErrors()); |
| ASSERT_EQ(kNumBlocks, report.misaligned_block_check->entries.size()); |
| uint64_t fs_block_size; |
| ASSERT_OK(env_->GetBlockSize(test_dir_, &fs_block_size)); |
| for (const auto& mb : report.misaligned_block_check->entries) { |
| ASSERT_EQ(container_name, mb.container); |
| } |
| report.misaligned_block_check->entries.clear(); |
| NO_FATALS(AssertEmptyReport(report)); |
| } |
| |
| TEST_P(LogBlockManagerTest, TestRepairPartialRecords) { |
| SetEncryptionFlags(GetParam()); |
| const int kNumContainers = 50; |
| const int kNumRecords = 10; |
| |
| // Create some containers. |
| { |
| unique_ptr<BlockCreationTransaction> transaction = bm_->NewCreationTransaction(); |
| for (int i = 0; i < kNumContainers; i++) { |
| unique_ptr<WritableBlock> block; |
| ASSERT_OK(bm_->CreateBlock(test_block_opts_, &block)); |
| ASSERT_OK(block->Append("a")); |
| transaction->AddCreatedBlock(std::move(block)); |
| } |
| } |
| vector<string> container_names; |
| NO_FATALS(GetContainerNames(&container_names)); |
| ASSERT_EQ(kNumContainers, container_names.size()); |
| |
| // Add some partial records. |
| LBMCorruptor corruptor(env_, dd_manager_->GetDirs(), SeedRandom()); |
| ASSERT_OK(corruptor.Init()); |
| for (int i = 0; i < kNumRecords; i++) { |
| ASSERT_OK(corruptor.AddPartialRecordToContainer()); |
| } |
| |
| // Check the report. |
| FsReport report; |
| ASSERT_OK(ReopenBlockManager(nullptr, &report)); |
| ASSERT_FALSE(report.HasFatalErrors()); |
| ASSERT_EQ(kNumRecords, report.partial_record_check->entries.size()); |
| unordered_set<string> container_name_set(container_names.begin(), |
| container_names.end()); |
| for (const auto& pr : report.partial_record_check->entries) { |
| ASSERT_TRUE(ContainsKey(container_name_set, pr.container)); |
| ASSERT_GT(pr.offset, 0); |
| ASSERT_TRUE(pr.repaired); |
| } |
| report.partial_record_check->entries.clear(); |
| NO_FATALS(AssertEmptyReport(report)); |
| } |
| |
| TEST_P(LogBlockManagerTest, TestDeleteDeadContainersAtStartup) { |
| SetEncryptionFlags(GetParam()); |
| // Force our single container to become full once created. |
| FLAGS_log_container_max_size = 0; |
| |
| // Create one container. |
| BlockId block_id; |
| { |
| unique_ptr<WritableBlock> block; |
| ASSERT_OK(bm_->CreateBlock(test_block_opts_, &block)); |
| ASSERT_OK(block->Append("a")); |
| ASSERT_OK(block->Close()); |
| block_id = block->id(); |
| } |
| string data_file_name; |
| string metadata_file_name; |
| NO_FATALS(GetOnlyContainerDataFile(&data_file_name)); |
| NO_FATALS(GetOnlyContainerDataFile(&metadata_file_name)); |
| |
| // Reopen the block manager. The container files should still be there. |
| ASSERT_OK(ReopenBlockManager()); |
| ASSERT_TRUE(env_->FileExists(data_file_name)); |
| ASSERT_TRUE(env_->FileExists(metadata_file_name)); |
| |
| // Delete the one block and reopen it again. The container files should have |
| // been deleted. |
| { |
| shared_ptr<BlockDeletionTransaction> deletion_transaction = |
| this->bm_->NewDeletionTransaction(); |
| deletion_transaction->AddDeletedBlock(block_id); |
| vector<BlockId> deleted; |
| ASSERT_OK(deletion_transaction->CommitDeletedBlocks(&deleted)); |
| } |
| ASSERT_OK(ReopenBlockManager()); |
| ASSERT_FALSE(env_->FileExists(data_file_name)); |
| ASSERT_FALSE(env_->FileExists(metadata_file_name)); |
| } |
| |
| TEST_P(LogBlockManagerTest, TestCompactFullContainerMetadataAtStartup) { |
| SetEncryptionFlags(GetParam()); |
| // With this ratio, the metadata of a full container comprised of half dead |
| // blocks will be compacted at startup. |
| FLAGS_log_container_live_metadata_before_compact_ratio = 0.50; |
| |
| // Set an easy-to-test upper bound on container size. |
| FLAGS_log_container_max_blocks = 10; |
| |
| // Create one full container and store the initial size of its metadata file. |
| vector<BlockId> block_ids; |
| for (int i = 0; i < FLAGS_log_container_max_blocks; i++) { |
| unique_ptr<WritableBlock> block; |
| ASSERT_OK(bm_->CreateBlock(test_block_opts_, &block)); |
| ASSERT_OK(block->Append("a")); |
| ASSERT_OK(block->Close()); |
| block_ids.emplace_back(block->id()); |
| } |
| string metadata_file_name; |
| NO_FATALS(GetOnlyContainerMetadataFile(&metadata_file_name)); |
| uint64_t pre_compaction_file_size; |
| ASSERT_OK(env_->GetFileSize(metadata_file_name, &pre_compaction_file_size)); |
| |
| // Delete a block and reopen the block manager. Eventually, the container's |
| // metadata file should get compacted at startup (we look for this by testing |
| // its file size). |
| uint64_t post_compaction_file_size; |
| int64_t last_live_aligned_bytes; |
| int num_blocks_deleted = 0; |
| for (const auto& id : block_ids) { |
| { |
| shared_ptr<BlockDeletionTransaction> deletion_transaction = |
| bm_->NewDeletionTransaction(); |
| deletion_transaction->AddDeletedBlock(id); |
| vector<BlockId> deleted; |
| ASSERT_OK(deletion_transaction->CommitDeletedBlocks(&deleted)); |
| } |
| num_blocks_deleted++; |
| FsReport report; |
| ASSERT_OK(ReopenBlockManager(nullptr, &report)); |
| last_live_aligned_bytes = report.stats.live_block_bytes_aligned; |
| |
| ASSERT_OK(env_->GetFileSize(metadata_file_name, &post_compaction_file_size)); |
| if (post_compaction_file_size < pre_compaction_file_size) { |
| break; |
| } |
| } |
| |
| // We should be able to anticipate precisely when the compaction occurred. |
| ASSERT_EQ(FLAGS_log_container_max_blocks * |
| FLAGS_log_container_live_metadata_before_compact_ratio, |
| num_blocks_deleted); |
| |
| // The "gap" in the compacted container's block records (corresponding to |
| // dead blocks that were removed) shouldn't affect the number of live bytes |
| // post-alignment. |
| FsReport report; |
| ASSERT_OK(ReopenBlockManager(nullptr, &report)); |
| ASSERT_EQ(last_live_aligned_bytes, report.stats.live_block_bytes_aligned); |
| } |
| |
| // Regression test for a bug in which, after a metadata file was compacted, |
| // we would not properly handle appending to the new (post-compaction) metadata. |
| // |
| // The bug was related to a stale file descriptor left in the file_cache, so |
| // this test explicitly targets that scenario. |
| TEST_P(LogBlockManagerTest, TestDeleteFromContainerAfterMetadataCompaction) { |
| SetEncryptionFlags(GetParam()); |
| // Compact aggressively. |
| FLAGS_log_container_live_metadata_before_compact_ratio = 0.99; |
| // Use a single shard so that we have an accurate max cache capacity |
| // regardless of the number of cores on the machine. |
| FLAGS_cache_force_single_shard = true; |
| // Use very small containers, so that we generate a lot of them (and thus |
| // consume a lot of file descriptors). |
| FLAGS_log_container_max_blocks = 4; |
| // Reopen so the flags take effect. |
| ASSERT_OK(ReopenBlockManager()); |
| |
| // Create many container with a bunch of blocks, half of which are deleted. |
| vector<BlockId> block_ids; |
| { |
| shared_ptr<BlockDeletionTransaction> deletion_transaction = |
| this->bm_->NewDeletionTransaction(); |
| for (int i = 0; i < 1000; i++) { |
| unique_ptr<WritableBlock> block; |
| ASSERT_OK(bm_->CreateBlock(test_block_opts_, &block)); |
| ASSERT_OK(block->Close()); |
| if (i % 2 == 1) { |
| deletion_transaction->AddDeletedBlock(block->id()); |
| } else { |
| block_ids.emplace_back(block->id()); |
| } |
| } |
| vector<BlockId> deleted; |
| ASSERT_OK(deletion_transaction->CommitDeletedBlocks(&deleted)); |
| } |
| |
| // Reopen the block manager. This will cause it to compact all of the metadata |
| // files, since we've deleted half the blocks in every container and the |
| // threshold is set high above. |
| FsReport report; |
| ASSERT_OK(ReopenBlockManager(nullptr, &report)); |
| |
| // Delete the remaining blocks in a random order. This will append to metadata |
| // files which have just been compacted. Since we have more metadata files than |
| // we have file_cache capacity, this will also generate a mix of cache hits, |
| // misses, and re-insertions. |
| std::mt19937 gen(SeedRandom()); |
| std::shuffle(block_ids.begin(), block_ids.end(), gen); |
| { |
| shared_ptr<BlockDeletionTransaction> deletion_transaction = |
| this->bm_->NewDeletionTransaction(); |
| for (const BlockId &b : block_ids) { |
| deletion_transaction->AddDeletedBlock(b); |
| } |
| vector<BlockId> deleted; |
| ASSERT_OK(deletion_transaction->CommitDeletedBlocks(&deleted)); |
| } |
| |
| // Reopen to make sure that the metadata can be properly loaded and |
| // that the resulting block manager is empty. |
| ASSERT_OK(ReopenBlockManager(nullptr, &report)); |
| ASSERT_EQ(0, report.stats.live_block_count); |
| ASSERT_EQ(0, report.stats.live_block_bytes_aligned); |
| } |
| |
| // Test to ensure that if a directory cannot be read from, its startup process |
| // will run smoothly. The directory manager will note the failed directories |
| // and only healthy ones are reported. |
| TEST_P(LogBlockManagerTest, TestOpenWithFailedDirectories) { |
| SetEncryptionFlags(GetParam()); |
| // Initialize a new directory manager with multiple directories. |
| bm_.reset(); |
| vector<string> test_dirs; |
| const int kNumDirs = 5; |
| for (int i = 0; i < kNumDirs; i++) { |
| string dir = GetTestPath(Substitute("test_dir_$0", i)); |
| ASSERT_OK(env_->CreateDir(dir)); |
| test_dirs.emplace_back(std::move(dir)); |
| } |
| ASSERT_OK(DataDirManager::CreateNewForTests(env_, test_dirs, |
| DataDirManagerOptions(), &dd_manager_)); |
| |
| // Open the directory manager successfully. |
| ASSERT_OK(DataDirManager::OpenExistingForTests(env_, test_dirs, |
| DataDirManagerOptions(), &dd_manager_)); |
| |
| // Wire in a callback to fail data directories. |
| error_manager_.SetErrorNotificationCb( |
| ErrorHandlerType::DISK_ERROR, [this](const string& uuid) { |
| this->dd_manager_->MarkDirFailedByUuid(uuid); |
| }); |
| bm_.reset(CreateBlockManager(nullptr)); |
| |
| // Fail one of the directories, chosen randomly. |
| FLAGS_crash_on_eio = false; |
| FLAGS_env_inject_eio = 1; |
| int failed_idx = Random(SeedRandom()).Next() % kNumDirs; |
| FLAGS_env_inject_eio_globs = JoinPathSegments(test_dirs[failed_idx], "**"); |
| |
| // Check the report, ensuring the correct directory has failed. |
| FsReport report; |
| ASSERT_OK(bm_->Open(&report)); |
| ASSERT_EQ(kNumDirs - 1, report.data_dirs.size()); |
| for (const string& data_dir : report.data_dirs) { |
| ASSERT_NE(data_dir, test_dirs[failed_idx]); |
| } |
| const set<int>& failed_dirs = dd_manager_->GetFailedDirs(); |
| ASSERT_EQ(1, failed_dirs.size()); |
| |
| int uuid_idx; |
| dd_manager_->FindUuidIndexByRoot(test_dirs[failed_idx], &uuid_idx); |
| ASSERT_TRUE(ContainsKey(failed_dirs, uuid_idx)); |
| } |
| |
| // Test Close() a FINALIZED block. Including, |
| // 1) a container can be reused when the block is finalized. |
| // 2) the block cannot be opened/found until close it. |
| // 3) the same container is not marked as available twice. |
| TEST_P(LogBlockManagerTest, TestFinalizeBlock) { |
| SetEncryptionFlags(GetParam()); |
| // Create 4 blocks. |
| vector<unique_ptr<WritableBlock>> blocks; |
| for (int i = 0; i < 4; i++) { |
| unique_ptr<WritableBlock> writer; |
| ASSERT_OK(bm_->CreateBlock(test_block_opts_, &writer)); |
| ASSERT_OK(writer->Append("test data")); |
| ASSERT_OK(writer->Finalize()); |
| blocks.emplace_back(std::move(writer)); |
| } |
| ASSERT_EQ(1, bm_->all_containers_by_name_.size()); |
| |
| for (const auto& block : blocks) { |
| // Open the block and verify they cannot be found. |
| ASSERT_TRUE(bm_->OpenBlock(block->id(), nullptr).IsNotFound()); |
| ASSERT_OK(block->Close()); |
| } |
| |
| ASSERT_EQ(1, bm_->all_containers_by_name_.size()); |
| // Ensure the same container has not been marked as available twice. |
| ASSERT_EQ(1, bm_->available_containers_by_data_dir_.begin()->second.size()); |
| } |
| |
| // Test available log container selection is LIFO. |
| TEST_P(LogBlockManagerTest, TestLIFOContainerSelection) { |
| SetEncryptionFlags(GetParam()); |
| // Create 4 blocks and 4 opened containers that are not full. |
| vector<unique_ptr<WritableBlock>> blocks; |
| for (int i = 0; i < 4; i++) { |
| unique_ptr<WritableBlock> writer; |
| ASSERT_OK(bm_->CreateBlock(test_block_opts_, &writer)); |
| writer->Append("test data"); |
| blocks.emplace_back(std::move(writer)); |
| } |
| for (const auto& block : blocks) { |
| ASSERT_OK(block->Close()); |
| } |
| ASSERT_EQ(4, bm_->all_containers_by_name_.size()); |
| |
| blocks.clear(); |
| // Create some other blocks, and finalize each block after write. |
| // The first available container in the queue will be reused every time. |
| internal::LogBlockContainer* container = |
| bm_->available_containers_by_data_dir_.begin()->second.front().get(); |
| for (int i = 0; i < 4; i++) { |
| unique_ptr<WritableBlock> writer; |
| ASSERT_OK(bm_->CreateBlock(test_block_opts_, &writer)); |
| writer->Append("test data"); |
| ASSERT_OK(writer->Finalize()); |
| // After finalizing the written block, the used container will be |
| // available again and can be reused for the following created block. |
| ASSERT_EQ(container, |
| bm_->available_containers_by_data_dir_.begin()->second.front().get()); |
| blocks.emplace_back(std::move(writer)); |
| } |
| for (const auto& block : blocks) { |
| ASSERT_OK(block->Close()); |
| } |
| ASSERT_EQ(4, bm_->all_containers_by_name_.size()); |
| } |
| |
| TEST_P(LogBlockManagerTest, TestAbortBlock) { |
| unique_ptr<WritableBlock> writer; |
| ASSERT_OK(bm_->CreateBlock(test_block_opts_, &writer)); |
| ASSERT_OK(writer->Append("test data")); |
| ASSERT_OK(writer->Abort()); |
| // Ensures the container is available after block's Abort(). |
| ASSERT_EQ(1, bm_->available_containers_by_data_dir_.begin()->second.size()); |
| |
| ASSERT_OK(bm_->CreateBlock(test_block_opts_, &writer)); |
| ASSERT_OK(writer->Append("test data")); |
| ASSERT_OK(writer->Finalize()); |
| ASSERT_OK(writer->Abort()); |
| // Ensures the container is available after block's Abort(). |
| ASSERT_EQ(1, bm_->available_containers_by_data_dir_.begin()->second.size()); |
| } |
| |
| TEST_P(LogBlockManagerTest, TestDeleteDeadContainersByDeletionTransaction) { |
| SetEncryptionFlags(GetParam()); |
| const auto TestProcess = [&] (int block_num) { |
| ASSERT_GT(block_num, 0); |
| MetricRegistry registry; |
| scoped_refptr<MetricEntity> entity = METRIC_ENTITY_server.Instantiate( |
| ®istry, Substitute("test-$0", block_num)); |
| |
| ASSERT_OK(ReopenBlockManager(entity)); |
| NO_FATALS(CheckLogMetrics(entity, |
| { {0, &METRIC_log_block_manager_bytes_under_management}, |
| {0, &METRIC_log_block_manager_blocks_under_management}, |
| {0, &METRIC_log_block_manager_containers}, |
| {0, &METRIC_log_block_manager_full_containers} }, |
| { {0, &METRIC_log_block_manager_holes_punched}, |
| {0, &METRIC_block_manager_total_blocks_deleted}, |
| {0, &METRIC_log_block_manager_dead_containers_deleted} })); |
| |
| // Create a bunch of blocks -> one container. |
| vector<BlockId> blocks; |
| for (int i = 0; i < block_num - 1; ++i) { |
| unique_ptr<WritableBlock> writer; |
| ASSERT_OK(bm_->CreateBlock(test_block_opts_, &writer)); |
| blocks.emplace_back(writer->id()); |
| ASSERT_OK(writer->Finalize()); |
| ASSERT_OK(writer->Close()); |
| NO_FATALS(CheckLogMetrics(entity, |
| { {0, &METRIC_log_block_manager_bytes_under_management}, |
| {i + 1, &METRIC_log_block_manager_blocks_under_management}, |
| {1, &METRIC_log_block_manager_containers}, |
| {0, &METRIC_log_block_manager_full_containers} }, |
| { {0, &METRIC_log_block_manager_holes_punched}, |
| {0, &METRIC_block_manager_total_blocks_deleted}, |
| {0, &METRIC_log_block_manager_dead_containers_deleted} })); |
| } |
| { |
| // The last block makes a full container. |
| FLAGS_log_container_max_size = GetParam() ? 4097 : 1; |
| unique_ptr<WritableBlock> writer; |
| ASSERT_OK(bm_->CreateBlock(test_block_opts_, &writer)); |
| blocks.emplace_back(writer->id()); |
| ASSERT_OK(writer->Append("a")); |
| ASSERT_OK(writer->Finalize()); |
| ASSERT_OK(writer->Close()); |
| NO_FATALS(CheckLogMetrics(entity, |
| { {1, &METRIC_log_block_manager_bytes_under_management}, |
| {block_num, &METRIC_log_block_manager_blocks_under_management}, |
| {1, &METRIC_log_block_manager_containers}, |
| {1, &METRIC_log_block_manager_full_containers} }, |
| { {0, &METRIC_log_block_manager_holes_punched}, |
| {0, &METRIC_block_manager_total_blocks_deleted}, |
| {0, &METRIC_log_block_manager_dead_containers_deleted} })); |
| } |
| ASSERT_EQ(block_num, blocks.size()); |
| |
| // Check the container files. |
| string data_file_name; |
| string metadata_file_name; |
| NO_FATALS(GetOnlyContainerDataFile(&data_file_name)); |
| NO_FATALS(GetOnlyContainerMetadataFile(&metadata_file_name)); |
| |
| // Open the last block for reading. |
| unique_ptr<ReadableBlock> reader; |
| ASSERT_OK(bm_->OpenBlock(blocks[block_num-1], &reader)); |
| uint64_t size; |
| ASSERT_OK(reader->Size(&size)); |
| ASSERT_EQ(1, size); |
| |
| // Delete all of the blocks, which makes a dead container. |
| { |
| vector<BlockId> deleted; |
| shared_ptr<BlockDeletionTransaction> deletion_transaction = |
| this->bm_->NewDeletionTransaction(); |
| for (const auto& block : blocks) { |
| deletion_transaction->AddDeletedBlock(block); |
| } |
| ASSERT_OK(deletion_transaction->CommitDeletedBlocks(&deleted)); |
| ASSERT_EQ(block_num, deleted.size()); |
| NO_FATALS(CheckLogMetrics(entity, |
| { {0, &METRIC_log_block_manager_bytes_under_management}, |
| {0, &METRIC_log_block_manager_blocks_under_management}, |
| {1, &METRIC_log_block_manager_containers}, |
| {1, &METRIC_log_block_manager_full_containers} }, |
| { {0, &METRIC_log_block_manager_holes_punched}, |
| {block_num, &METRIC_block_manager_total_blocks_deleted}, |
| {0, &METRIC_log_block_manager_dead_containers_deleted} })); |
| } |
| // The container is still alive, because there is a opened block previously. |
| NO_FATALS(CheckLogMetrics(entity, |
| { {0, &METRIC_log_block_manager_bytes_under_management}, |
| {0, &METRIC_log_block_manager_blocks_under_management}, |
| {1, &METRIC_log_block_manager_containers}, |
| {1, &METRIC_log_block_manager_full_containers} }, |
| { {0, &METRIC_log_block_manager_holes_punched}, |
| {block_num, &METRIC_block_manager_total_blocks_deleted}, |
| {0, &METRIC_log_block_manager_dead_containers_deleted} })); |
| |
| // After the reader is closed, the container is actually deleted. |
| reader->Close(); |
| NO_FATALS(CheckLogMetrics(entity, |
| { {0, &METRIC_log_block_manager_bytes_under_management}, |
| {0, &METRIC_log_block_manager_blocks_under_management}, |
| {0, &METRIC_log_block_manager_containers}, |
| {0, &METRIC_log_block_manager_full_containers} }, |
| { {0, &METRIC_log_block_manager_holes_punched}, |
| {block_num, &METRIC_block_manager_total_blocks_deleted}, |
| {1, &METRIC_log_block_manager_dead_containers_deleted} })); |
| |
| // The container files should have been deleted. |
| ASSERT_FALSE(env_->FileExists(data_file_name)); |
| ASSERT_FALSE(env_->FileExists(metadata_file_name)); |
| }; |
| |
| for (int i = 1; i < 4; ++i) { |
| NO_FATALS(TestProcess(i)); |
| } |
| } |
| |
| // Test for KUDU-2665 to ensure that once the container is full and has no live |
| // blocks but with a reference by WritableBlock, it will not be deleted. |
| TEST_P(LogBlockManagerTest, TestDoNotDeleteFakeDeadContainer) { |
| SetEncryptionFlags(GetParam()); |
| // Lower the max container size. |
| FLAGS_log_container_max_size = 64 * 1024; |
| |
| const auto Process = [&] (bool close_block) { |
| // Create a bunch of blocks on the same container. |
| vector<BlockId> blocks; |
| for (int i = 0; i < 10; ++i) { |
| unique_ptr<BlockCreationTransaction> transaction = bm_->NewCreationTransaction(); |
| unique_ptr<WritableBlock> writer; |
| ASSERT_OK(bm_->CreateBlock(test_block_opts_, &writer)); |
| blocks.emplace_back(writer->id()); |
| ASSERT_OK(writer->Append("a")); |
| ASSERT_OK(writer->Finalize()); |
| transaction->AddCreatedBlock(std::move(writer)); |
| ASSERT_OK(transaction->CommitCreatedBlocks()); |
| } |
| |
| // Create a special block. |
| unique_ptr<WritableBlock> writer; |
| ASSERT_OK(bm_->CreateBlock(test_block_opts_, &writer)); |
| BlockId block_id = writer->id(); |
| unique_ptr<uint8_t[]> data(new uint8_t[FLAGS_log_container_max_size]); |
| ASSERT_OK(writer->Append({ data.get(), FLAGS_log_container_max_size })); |
| ASSERT_OK(writer->Finalize()); |
| // Do not close and reset the writer. |
| // Now the container is full and has no live blocks. |
| |
| // Delete the bunch of blocks. |
| { |
| vector<BlockId> deleted; |
| shared_ptr<BlockDeletionTransaction> transaction = bm_->NewDeletionTransaction(); |
| for (const auto& e : blocks) { |
| transaction->AddDeletedBlock(e); |
| } |
| ASSERT_OK(transaction->CommitDeletedBlocks(&deleted)); |
| transaction.reset(); |
| for (const auto& data_dir : dd_manager_->dirs()) { |
| data_dir->WaitOnClosures(); |
| } |
| } |
| |
| // Close and reset the writer. |
| // It's going to test Abort() when 'close_block' is false. |
| if (close_block) { |
| ASSERT_OK(writer->Close()); |
| } |
| writer.reset(); |
| |
| // Open the special block after restart. |
| ASSERT_OK(ReopenBlockManager()); |
| unique_ptr<ReadableBlock> block; |
| if (close_block) { |
| ASSERT_OK(bm_->OpenBlock(block_id, &block)); |
| } else { |
| ASSERT_TRUE(bm_->OpenBlock(block_id, &block).IsNotFound()); |
| } |
| }; |
| |
| Process(true); |
| Process(false); |
| } |
| |
| TEST_P(LogBlockManagerTest, TestHalfPresentContainer) { |
| SetEncryptionFlags(GetParam()); |
| BlockId block_id; |
| string data_file_name; |
| string metadata_file_name; |
| MetricRegistry registry; |
| scoped_refptr<MetricEntity> entity = METRIC_ENTITY_server.Instantiate(®istry, "test"); |
| |
| const auto CreateContainer = [&] (bool create_block = false) { |
| ASSERT_OK(ReopenBlockManager(entity)); |
| unique_ptr<WritableBlock> writer; |
| ASSERT_OK(bm_->CreateBlock(test_block_opts_, &writer)); |
| block_id = writer->id(); |
| if (create_block) { |
| ASSERT_OK(writer->Append("a")); |
| } |
| ASSERT_OK(writer->Finalize()); |
| ASSERT_OK(writer->Close()); |
| NO_FATALS(GetOnlyContainerDataFile(&data_file_name)); |
| NO_FATALS(GetOnlyContainerMetadataFile(&metadata_file_name)); |
| }; |
| |
| const auto CreateMetadataFile = [&] () { |
| // We're often recreating an existing file, so we must invalidate any |
| // entry in the file cache first. |
| file_cache_.Invalidate(metadata_file_name); |
| |
| unique_ptr<WritableFile> metadata_file_writer; |
| WritableFileOptions opts; |
| opts.is_sensitive = true; |
| ASSERT_OK(env_->NewWritableFile(opts, metadata_file_name, &metadata_file_writer)); |
| ASSERT_OK(metadata_file_writer->Append(Slice("a"))); |
| metadata_file_writer->Close(); |
| }; |
| |
| const auto CreateDataFile = [&] () { |
| // We're often recreating an existing file, so we must invalidate any |
| // entry in the file cache first. |
| file_cache_.Invalidate(data_file_name); |
| |
| unique_ptr<WritableFile> data_file_writer; |
| WritableFileOptions opts; |
| opts.is_sensitive = true; |
| ASSERT_OK(env_->NewWritableFile(opts, data_file_name, &data_file_writer)); |
| data_file_writer->Close(); |
| }; |
| |
| const auto DeleteBlock = [&] () { |
| vector<BlockId> deleted; |
| shared_ptr<BlockDeletionTransaction> transaction = bm_->NewDeletionTransaction(); |
| transaction->AddDeletedBlock(block_id); |
| ASSERT_OK(transaction->CommitDeletedBlocks(&deleted)); |
| transaction.reset(); |
| for (const auto& data_dir : dd_manager_->dirs()) { |
| data_dir->WaitOnClosures(); |
| } |
| }; |
| |
| const auto CheckOK = [&] () { |
| FsReport report; |
| ASSERT_OK(ReopenBlockManager(entity, &report)); |
| ASSERT_FALSE(report.HasFatalErrors()); |
| NO_FATALS(AssertEmptyReport(report)); |
| }; |
| |
| const auto CheckFailed = [&] (const Status& expect) { |
| Status s = ReopenBlockManager(entity); |
| ASSERT_EQ(s.CodeAsString(), expect.CodeAsString()); |
| }; |
| |
| const auto CheckRepaired = [&] () { |
| FsReport report; |
| ASSERT_OK(ReopenBlockManager(entity, &report)); |
| ASSERT_FALSE(report.HasFatalErrors()); |
| ASSERT_EQ(1, report.incomplete_container_check->entries.size()); |
| report.incomplete_container_check->entries.clear(); |
| NO_FATALS(AssertEmptyReport(report)); |
| }; |
| |
| // Case1: the metadata file has gone missing and |
| // the size of the existing data file is 0. |
| { |
| // Create a container. |
| NO_FATALS(CreateContainer()); |
| |
| // Delete the metadata file. |
| ASSERT_OK(env_->DeleteFile(metadata_file_name)); |
| |
| // The container has been repaired. |
| NO_FATALS(CheckRepaired()); |
| } |
| |
| // Case2: the metadata file has gone missing and |
| // the size of the existing data file is >0. |
| { |
| // Create a container. |
| NO_FATALS(CreateContainer(true)); |
| |
| // Delete the metadata file. |
| ASSERT_OK(env_->DeleteFile(metadata_file_name)); |
| |
| // The metadata file has gone missing. |
| NO_FATALS(CheckFailed(Status::NotFound(""))); |
| |
| // Delete the data file to keep path clean. |
| ASSERT_OK(env_->DeleteFile(data_file_name)); |
| } |
| |
| // Case3: the size of the existing metadata file is <MIN and |
| // the data file has gone missing. |
| { |
| // Create a container. |
| NO_FATALS(CreateContainer()); |
| |
| // Delete the data file&metadata file, and keep the path. |
| ASSERT_OK(env_->DeleteFile(data_file_name)); |
| ASSERT_OK(env_->DeleteFile(metadata_file_name)); |
| |
| // Create a metadata file whose size is <MIN. |
| NO_FATALS(CreateMetadataFile()); |
| |
| // The container has been repaired. |
| NO_FATALS(CheckRepaired()); |
| } |
| |
| // Case4: the size of the existing metadata file is <MIN and |
| // the size of the existing data file is 0. |
| { |
| // Create a container. |
| NO_FATALS(CreateContainer()); |
| |
| // Delete the metadata file. |
| ASSERT_OK(env_->DeleteFile(metadata_file_name)); |
| |
| // Create a metadata file whose size is <MIN. |
| NO_FATALS(CreateMetadataFile()); |
| |
| // The container has been repaired. |
| NO_FATALS(CheckRepaired()); |
| } |
| |
| // Case5: the size of the existing metadata file is <MIN and |
| // the size of the existing data file is >0. |
| { |
| // Create a container. |
| NO_FATALS(CreateContainer(true)); |
| |
| // Delete the metadata file. |
| ASSERT_OK(env_->DeleteFile(metadata_file_name)); |
| |
| // Create a metadata file whose size is <MIN. |
| NO_FATALS(CreateMetadataFile()); |
| |
| // Check passed, but open metadata file failed at last. |
| NO_FATALS(CheckFailed(Status::Incomplete(""))); |
| |
| // Delete the data file and metadata file to keep path clean. |
| ASSERT_OK(env_->DeleteFile(data_file_name)); |
| ASSERT_OK(env_->DeleteFile(metadata_file_name)); |
| } |
| |
| // Case6: the existing metadata file has no live blocks and |
| // the data file has gone missing. |
| { |
| NO_FATALS(CreateContainer(true)); |
| |
| // Delete the only block. |
| NO_FATALS(DeleteBlock()); |
| |
| // Delete the data file. |
| ASSERT_OK(env_->DeleteFile(data_file_name)); |
| |
| // The container has been repaired. |
| NO_FATALS(CheckRepaired()); |
| } |
| |
| // Case7: the existing metadata file has no live blocks and |
| // the size of the existing data file is 0. |
| { |
| NO_FATALS(CreateContainer(true)); |
| |
| // Delete the only block. |
| NO_FATALS(DeleteBlock()); |
| |
| // Delete the data file. |
| ASSERT_OK(env_->DeleteFile(data_file_name)); |
| |
| // Create an empty data file. |
| NO_FATALS(CreateDataFile()); |
| |
| // Check passed, but verify records failed at last(malformed records). |
| NO_FATALS(CheckFailed(Status::Corruption(""))); |
| |
| // Delete the data file and metadata file to keep path clean. |
| ASSERT_OK(env_->DeleteFile(data_file_name)); |
| ASSERT_OK(env_->DeleteFile(metadata_file_name)); |
| } |
| |
| // Case8: the existing metadata file has no live blocks and |
| // the size of the existing data file is >0. |
| { |
| NO_FATALS(CreateContainer(true)); |
| |
| // Delete the only block. |
| NO_FATALS(DeleteBlock()); |
| |
| // The container is ok. |
| NO_FATALS(CheckOK()); |
| } |
| |
| // Case9: the existing metadata file has live blocks and |
| // the data file has gone missing. |
| { |
| // Create a container. |
| NO_FATALS(CreateContainer(true)); |
| |
| // Delete the data file. |
| ASSERT_OK(env_->DeleteFile(data_file_name)); |
| |
| // The data file has gone missing. |
| NO_FATALS(CheckFailed(Status::NotFound(""))); |
| |
| // Delete the metadata file to keep path clean. |
| ASSERT_OK(env_->DeleteFile(metadata_file_name)); |
| } |
| |
| // Case10: the existing metadata file has live blocks and |
| // the size of the existing data file is 0. |
| { |
| // Create a container. |
| NO_FATALS(CreateContainer(true)); |
| |
| // Delete the data file. |
| ASSERT_OK(env_->DeleteFile(data_file_name)); |
| |
| // Create an empty data file. |
| NO_FATALS(CreateDataFile()); |
| |
| // Check passed, but verify records failed at last(malformed records). |
| NO_FATALS(CheckFailed(Status::Corruption(""))); |
| |
| // Delete the data file and metadata file to keep path clean. |
| ASSERT_OK(env_->DeleteFile(data_file_name)); |
| ASSERT_OK(env_->DeleteFile(metadata_file_name)); |
| } |
| |
| // Case11: the existing metadata file has live blocks and |
| // the size of the existing data file is >0. |
| { |
| // Create a container. |
| NO_FATALS(CreateContainer(true)); |
| |
| // The container is ok. |
| NO_FATALS(CheckOK()); |
| } |
| } |
| |
| } // namespace fs |
| } // namespace kudu |