// blob: 7bb53f08c0c8dee43763345ed3a4ef7def165395 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <memory>
#include "kudu/fs/file_block_manager.h"
#include "kudu/fs/fs.pb.h"
#include "kudu/fs/log_block_manager.h"
#include "kudu/gutil/map-util.h"
#include "kudu/gutil/stl_util.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/gutil/strings/util.h"
#include "kudu/util/env_util.h"
#include "kudu/util/mem_tracker.h"
#include "kudu/util/metrics.h"
#include "kudu/util/path_util.h"
#include "kudu/util/pb_util.h"
#include "kudu/util/random.h"
#include "kudu/util/stopwatch.h"
#include "kudu/util/test_util.h"
#include "kudu/util/thread.h"
using kudu::env_util::ReadFully;
using kudu::pb_util::ReadablePBContainerFile;
using std::shared_ptr;
using std::string;
using std::vector;
using strings::Substitute;
// LogBlockManager opens two files per container, and CloseManyBlocksTest
// uses one container for each block. To simplify testing (i.e. no need to
// raise the ulimit on open files), the default is kept low.
DEFINE_int32(num_blocks_close, 500,
"Number of blocks to simultaneously close in CloseManyBlocksTest");
// Flags defined elsewhere. The tests below override most of them
// (preallocation size, max container size, reserved-space and disk-fullness
// knobs); --block_manager is read by RETURN_NOT_LOG_BLOCK_MANAGER().
DECLARE_uint64(log_container_preallocate_bytes);
DECLARE_uint64(log_container_max_size);
DECLARE_int64(fs_data_dirs_reserved_bytes);
DECLARE_int64(disk_reserved_bytes_free_for_testing);
DECLARE_int32(log_block_manager_full_disk_cache_seconds);
DECLARE_string(block_manager);
// Generic block manager metrics.
// (Read back by CheckMetrics() below.)
METRIC_DECLARE_gauge_uint64(block_manager_blocks_open_reading);
METRIC_DECLARE_gauge_uint64(block_manager_blocks_open_writing);
METRIC_DECLARE_counter(block_manager_total_writable_blocks);
METRIC_DECLARE_counter(block_manager_total_readable_blocks);
METRIC_DECLARE_counter(block_manager_total_bytes_written);
METRIC_DECLARE_counter(block_manager_total_bytes_read);
// Log block manager metrics.
// (Read back by CheckLogMetrics() below.)
METRIC_DECLARE_gauge_uint64(log_block_manager_bytes_under_management);
METRIC_DECLARE_gauge_uint64(log_block_manager_blocks_under_management);
METRIC_DECLARE_counter(log_block_manager_containers);
METRIC_DECLARE_counter(log_block_manager_full_containers);
// Returns early from the enclosing void function (typically a test body or
// SetUp()) unless --block_manager is "log". The LogBlockManager is only
// instantiated for the typed tests on Linux (see the BlockManagers type list),
// since it requires hole punching.
#define RETURN_NOT_LOG_BLOCK_MANAGER()                                          \
  do {                                                                          \
    if (!(FLAGS_block_manager == "log")) {                                      \
      LOG(INFO) << "This platform does not use the log block manager by default. Skipping test."; \
      return;                                                                   \
    }                                                                           \
  } while (false)
namespace kudu {
namespace fs {
template <typename T>
class BlockManagerTest : public KuduTest {
public:
BlockManagerTest() :
bm_(CreateBlockManager(scoped_refptr<MetricEntity>(),
shared_ptr<MemTracker>(),
{ GetTestDataDirectory() })) {
}
virtual void SetUp() OVERRIDE {
CHECK_OK(bm_->Create());
CHECK_OK(bm_->Open());
}
protected:
T* CreateBlockManager(const scoped_refptr<MetricEntity>& metric_entity,
const shared_ptr<MemTracker>& parent_mem_tracker,
const vector<string>& paths) {
BlockManagerOptions opts;
opts.metric_entity = metric_entity;
opts.parent_mem_tracker = parent_mem_tracker;
opts.root_paths = paths;
return new T(env_.get(), opts);
}
Status ReopenBlockManager(const scoped_refptr<MetricEntity>& metric_entity,
const shared_ptr<MemTracker>& parent_mem_tracker,
const vector<string>& paths,
bool create) {
// Blow away old memtrackers first.
bm_.reset();
bm_.reset(CreateBlockManager(metric_entity, parent_mem_tracker, paths));
if (create) {
RETURN_NOT_OK(bm_->Create());
}
return bm_->Open();
}
void RunMultipathTest(const vector<string>& paths);
void RunLogMetricsTest();
void RunLogContainerPreallocationTest();
void RunMemTrackerTest();
gscoped_ptr<T> bm_;
};
// Log-block-manager fixture setup: formats and opens bm_, but only when
// --block_manager is "log"; otherwise bm_ is left untouched.
// NOTE(review): the early return only skips SetUp(), not the typed test body
// that follows it — confirm callers never run with a non-"log" flag here.
template <>
void BlockManagerTest<LogBlockManager>::SetUp() {
  RETURN_NOT_LOG_BLOCK_MANAGER();
  LogBlockManager* lbm = bm_.get();
  CHECK_OK(lbm->Create());
  CHECK_OK(lbm->Open());
}
// File block manager multipath check. Each root must start with exactly
// ".", ".." and a parseable instance file. After ten blocks are written,
// each root must have gained additional entries (block subdirectories).
template <>
void BlockManagerTest<FileBlockManager>::RunMultipathTest(const vector<string>& paths) {
  for (const string& root : paths) {
    vector<string> entries;
    ASSERT_OK(env_->GetChildren(root, &entries));
    ASSERT_EQ(3, entries.size());
    for (const string& entry : entries) {
      if (entry != "." && entry != "..") {
        // The only non-dot entry must be a well-formed instance file.
        PathInstanceMetadataPB metadata;
        ASSERT_OK(pb_util::ReadPBContainerFromPath(env_.get(),
                                                   JoinPathSegments(root, entry),
                                                   &metadata));
      }
    }
  }

  // Write ten small blocks, closing each before starting the next.
  const char* kTestData = "test data";
  for (int n = 0; n < 10; ++n) {
    gscoped_ptr<WritableBlock> block;
    ASSERT_OK(bm_->CreateBlock(&block));
    ASSERT_OK(block->Append(kTestData));
    ASSERT_OK(block->Close());
  }

  // How many subdirectories appear depends on the generated block IDs, so
  // only require that every root grew beyond its initial three entries.
  for (const string& root : paths) {
    vector<string> entries;
    ASSERT_OK(env_->GetChildren(root, &entries));
    ASSERT_GT(entries.size(), 3);
  }
}
// Log block manager multipath check. It runs three rounds, each holding
// (2 * number of paths) blocks open at once and closing them together, which
// should leave two containers under every path.
template <>
void BlockManagerTest<LogBlockManager>::RunMultipathTest(const vector<string>& paths) {
  const char* kTestData = "test data";
  const size_t blocks_per_round = paths.size() * 2;
  for (int round = 0; round < 3; ++round) {
    ScopedWritableBlockCloser closer;
    for (size_t n = 0; n < blocks_per_round; ++n) {
      gscoped_ptr<WritableBlock> block;
      ASSERT_OK(bm_->CreateBlock(&block));
      ASSERT_OK(block->Append(kTestData));
      closer.AddBlock(std::move(block));
    }
    ASSERT_OK(closer.CloseBlocks());
  }

  // Expect 7 entries per path: ".", "..", the instance file, and two
  // containers of two files each.
  for (const string& root : paths) {
    vector<string> entries;
    ASSERT_OK(env_->GetChildren(root, &entries));
    ASSERT_EQ(entries.size(), 7);
  }
}
// The log-block-manager metrics checks do not apply to the file block
// manager; this specialization only logs that the test was skipped.
template <>
void BlockManagerTest<FileBlockManager>::RunLogMetricsTest() {
LOG(INFO) << "Test skipped; wrong block manager";
}
// Asserts that the four log block manager metrics registered on 'entity'
// (bytes and blocks under management, container count, full-container count)
// equal the expected values, checked in that order. Each metric is fetched
// with FindOrNull() and dereferenced without a null check, so all four must
// be registered on 'entity'.
static void CheckLogMetrics(const scoped_refptr<MetricEntity>& entity,
                            int bytes_under_management, int blocks_under_management,
                            int containers, int full_containers) {
  const auto actual_bytes = down_cast<AtomicGauge<uint64_t>*>(
      entity->FindOrNull(METRIC_log_block_manager_bytes_under_management).get())->value();
  ASSERT_EQ(bytes_under_management, actual_bytes);

  const auto actual_blocks = down_cast<AtomicGauge<uint64_t>*>(
      entity->FindOrNull(METRIC_log_block_manager_blocks_under_management).get())->value();
  ASSERT_EQ(blocks_under_management, actual_blocks);

  const auto actual_containers = down_cast<Counter*>(
      entity->FindOrNull(METRIC_log_block_manager_containers).get())->value();
  ASSERT_EQ(containers, actual_containers);

  const auto actual_full_containers = down_cast<Counter*>(
      entity->FindOrNull(METRIC_log_block_manager_full_containers).get())->value();
  ASSERT_EQ(full_containers, actual_full_containers);
}
// Exercises the log-block-manager-specific metrics across block creation,
// container filling, a manager restart, and block deletion.
template <>
void BlockManagerTest<LogBlockManager>::RunLogMetricsTest() {
  MetricRegistry registry;
  scoped_refptr<MetricEntity> entity = METRIC_ENTITY_server.Instantiate(&registry, "test");
  ASSERT_OK(this->ReopenBlockManager(entity,
                                     shared_ptr<MemTracker>(),
                                     { GetTestDataDirectory() },
                                     false));
  ASSERT_NO_FATAL_FAILURE(CheckLogMetrics(entity, 0, 0, 0, 0));

  // Lower the max container size so that we can more easily test full
  // container metrics.
  FLAGS_log_container_max_size = 1024;

  // Scoped so the writer is destroyed while the manager that created it is
  // still alive (ReopenBlockManager() below destroys that manager).
  {
    // One block --> one container.
    gscoped_ptr<WritableBlock> writer;
    ASSERT_OK(this->bm_->CreateBlock(&writer));
    ASSERT_NO_FATAL_FAILURE(CheckLogMetrics(entity, 0, 0, 1, 0));

    // And when the block is closed, it becomes "under management".
    ASSERT_OK(writer->Close());
    ASSERT_NO_FATAL_FAILURE(CheckLogMetrics(entity, 0, 1, 1, 0));
  }

  // Create 10 blocks concurrently. We reuse the existing container and
  // create 9 new ones. All of them get filled.
  BlockId saved_id;
  {
    Random rand(SeedRandom());
    ScopedWritableBlockCloser closer;
    for (int i = 0; i < 10; i++) {
      gscoped_ptr<WritableBlock> b;
      ASSERT_OK(this->bm_->CreateBlock(&b));
      if (saved_id.IsNull()) {
        saved_id = b->id();
      }
      // Initialize every byte so no indeterminate stack memory is written
      // into the block.
      uint8_t data[1024];
      for (uint8_t& byte : data) {
        byte = static_cast<uint8_t>(rand.Next());
      }
      ASSERT_OK(b->Append(Slice(data, sizeof(data))));
      closer.AddBlock(std::move(b));
    }
    ASSERT_NO_FATAL_FAILURE(CheckLogMetrics(entity, 0, 1, 10, 0));

    // Only when the blocks are closed are the containers considered full.
    ASSERT_OK(closer.CloseBlocks());
    ASSERT_NO_FATAL_FAILURE(CheckLogMetrics(entity, 10 * 1024, 11, 10, 10));
  }

  // Reopen the block manager and test the metrics. They're all based on
  // persistent information so they should be the same.
  MetricRegistry new_registry;
  scoped_refptr<MetricEntity> new_entity = METRIC_ENTITY_server.Instantiate(&new_registry, "test");
  ASSERT_OK(this->ReopenBlockManager(new_entity,
                                     shared_ptr<MemTracker>(),
                                     { GetTestDataDirectory() },
                                     false));
  ASSERT_NO_FATAL_FAILURE(CheckLogMetrics(new_entity, 10 * 1024, 11, 10, 10));

  // Delete a block. Its contents should no longer be under management.
  ASSERT_OK(this->bm_->DeleteBlock(saved_id));
  ASSERT_NO_FATAL_FAILURE(CheckLogMetrics(new_entity, 9 * 1024, 10, 10, 10));
}
// Container preallocation is a log block manager concept; for the file block
// manager this specialization only logs that the test was skipped.
template <>
void BlockManagerTest<FileBlockManager>::RunLogContainerPreallocationTest() {
LOG(INFO) << "Test skipped; wrong block manager";
}
// Verifies that container preallocation after a restart starts at the end of
// the last block, not at the end of the (already preallocated) file.
template <>
void BlockManagerTest<LogBlockManager>::RunLogContainerPreallocationTest() {
  // Create an empty block. This should also trigger preallocation of the
  // container, provided it's supported by the kernel. The block is scoped so
  // it is destroyed before ReopenBlockManager() destroys its manager.
  {
    gscoped_ptr<WritableBlock> written_block;
    ASSERT_OK(this->bm_->CreateBlock(&written_block));
    ASSERT_OK(written_block->Close());
  }

  // Now reopen the block manager and create another block. More
  // preallocation, but it should be from the end of the previous block,
  // not from the end of the file.
  ASSERT_OK(this->ReopenBlockManager(scoped_refptr<MetricEntity>(),
                                     shared_ptr<MemTracker>(),
                                     { GetTestDataDirectory() },
                                     false));
  {
    gscoped_ptr<WritableBlock> written_block;
    ASSERT_OK(this->bm_->CreateBlock(&written_block));
    ASSERT_OK(written_block->Close());
  }

  // dot, dotdot, test metadata, instance file, and one container file pair.
  vector<string> children;
  ASSERT_OK(this->env_->GetChildren(GetTestDataDirectory(), &children));
  ASSERT_EQ(6, children.size());

  // If preallocation was done from the end of the file (rather than the
  // end of the previous block), the file's size would be twice the
  // preallocation amount. That would be wrong.
  //
  // Instead, we expect the size to either be 0 (preallocation isn't
  // supported) or equal to the preallocation amount.
  bool found = false;
  for (const string& child : children) {
    if (!HasSuffixString(child, ".data")) {
      continue;
    }
    found = true;
    uint64_t size;
    ASSERT_OK(this->env_->GetFileSizeOnDisk(
        JoinPathSegments(GetTestDataDirectory(), child), &size));
    ASSERT_TRUE(size == 0 || size == FLAGS_log_container_preallocate_bytes)
        << child << " has on-disk size " << size << "; expected 0 or "
        << FLAGS_log_container_preallocate_bytes;
  }
  ASSERT_TRUE(found);
}
// Reopens the file block manager under a fresh MemTracker and checks that
// its consumption starts at zero and is unchanged by persisting a block,
// since the file block manager does not allocate memory for persistent data.
template <>
void BlockManagerTest<FileBlockManager>::RunMemTrackerTest() {
  shared_ptr<MemTracker> mem_tracker = MemTracker::CreateTracker(-1, "test tracker");
  ASSERT_OK(this->ReopenBlockManager(scoped_refptr<MetricEntity>(), mem_tracker,
                                     { GetTestDataDirectory() }, false));

  const int64_t baseline = mem_tracker->consumption();
  ASSERT_EQ(baseline, 0);

  gscoped_ptr<WritableBlock> block;
  ASSERT_OK(this->bm_->CreateBlock(&block));
  ASSERT_OK(block->Close());
  ASSERT_EQ(mem_tracker->consumption(), baseline);
}
// Reopens the log block manager under a fresh MemTracker. Consumption is
// already positive (the block map is charged to it), and persisting one more
// block must increase it.
template <>
void BlockManagerTest<LogBlockManager>::RunMemTrackerTest() {
  shared_ptr<MemTracker> mem_tracker = MemTracker::CreateTracker(-1, "test tracker");
  ASSERT_OK(this->ReopenBlockManager(scoped_refptr<MetricEntity>(), mem_tracker,
                                     { GetTestDataDirectory() }, false));

  const int64_t baseline = mem_tracker->consumption();
  ASSERT_GT(baseline, 0);

  gscoped_ptr<WritableBlock> block;
  ASSERT_OK(this->bm_->CreateBlock(&block));
  ASSERT_OK(block->Close());
  ASSERT_GT(mem_tracker->consumption(), baseline);
}
// Block manager implementations the typed tests run against. The log block
// manager is only included on Linux.
#if defined(__linux__)
using BlockManagers = ::testing::Types<FileBlockManager, LogBlockManager>;
#else
using BlockManagers = ::testing::Types<FileBlockManager>;
#endif
TYPED_TEST_CASE(BlockManagerTest, BlockManagers);
// Full block lifecycle: create, write, close, reopen and read back, delete,
// and confirm the deleted block can no longer be opened.
TYPED_TEST(BlockManagerTest, EndToEndTest) {
  const string payload = "test data";

  gscoped_ptr<WritableBlock> writer;
  ASSERT_OK(this->bm_->CreateBlock(&writer));
  ASSERT_OK(writer->Append(payload));
  ASSERT_OK(writer->Close());
  const BlockId id = writer->id();

  // Read back: size and contents must match what was written.
  gscoped_ptr<ReadableBlock> reader;
  ASSERT_OK(this->bm_->OpenBlock(id, &reader));
  uint64_t block_size;
  ASSERT_OK(reader->Size(&block_size));
  ASSERT_EQ(payload.length(), block_size);
  Slice contents;
  gscoped_ptr<uint8_t[]> buf(new uint8_t[payload.length()]);
  ASSERT_OK(reader->Read(0, payload.length(), &contents, buf.get()));
  ASSERT_EQ(payload, contents);

  ASSERT_OK(this->bm_->DeleteBlock(id));
  ASSERT_TRUE(this->bm_->OpenBlock(id, nullptr).IsNotFound());
}
// A reader opened before DeleteBlock() keeps working afterwards, even though
// new OpenBlock() calls for the same ID fail with NotFound.
TYPED_TEST(BlockManagerTest, ReadAfterDeleteTest) {
  const string payload = "test data";

  gscoped_ptr<WritableBlock> writer;
  ASSERT_OK(this->bm_->CreateBlock(&writer));
  ASSERT_OK(writer->Append(payload));
  ASSERT_OK(writer->Close());
  const BlockId id = writer->id();

  gscoped_ptr<ReadableBlock> reader;
  ASSERT_OK(this->bm_->OpenBlock(id, &reader));
  ASSERT_OK(this->bm_->DeleteBlock(id));
  ASSERT_TRUE(this->bm_->OpenBlock(id, nullptr).IsNotFound());

  // The already-open reader still sees the original contents.
  Slice contents;
  gscoped_ptr<uint8_t[]> buf(new uint8_t[payload.length()]);
  ASSERT_OK(reader->Read(0, payload.length(), &contents, buf.get()));
  ASSERT_EQ(payload, contents);
}
// Close() must succeed when called repeatedly, on both writable and
// readable blocks.
TYPED_TEST(BlockManagerTest, CloseTwiceTest) {
  gscoped_ptr<WritableBlock> writer;
  ASSERT_OK(this->bm_->CreateBlock(&writer));
  for (int attempt = 0; attempt < 2; ++attempt) {
    ASSERT_OK(writer->Close());
  }

  gscoped_ptr<ReadableBlock> reader;
  ASSERT_OK(this->bm_->OpenBlock(writer->id(), &reader));
  for (int attempt = 0; attempt < 2; ++attempt) {
    ASSERT_OK(reader->Close());
  }
}
// Slow test: creates FLAGS_num_blocks_close dirty 64 KiB blocks and closes
// them all with a single CloseBlocks() call, logging how long that takes.
TYPED_TEST(BlockManagerTest, CloseManyBlocksTest) {
  if (!AllowSlowTests()) {
    LOG(INFO) << "Not running in slow-tests mode";
    return;
  }

  // Disable preallocation for this test as it creates many containers.
  FLAGS_log_container_preallocate_bytes = 0;

  Random rand(SeedRandom());
  vector<WritableBlock*> dirty_blocks;
  // Frees every block in 'dirty_blocks' when the test exits.
  ElementDeleter deleter(&dirty_blocks);
  LOG(INFO) << "Creating " << FLAGS_num_blocks_close << " blocks";
  for (int i = 0; i < FLAGS_num_blocks_close; i++) {
    // Create a block.
    gscoped_ptr<WritableBlock> written_block;
    ASSERT_OK(this->bm_->CreateBlock(&written_block));

    // Write 64k bytes of random data into it. Every byte is initialized so
    // no indeterminate stack memory ends up on disk.
    uint8_t data[65536];
    for (uint8_t& byte : data) {
      byte = static_cast<uint8_t>(rand.Next());
    }
    ASSERT_OK(written_block->Append(Slice(data, sizeof(data))));
    dirty_blocks.push_back(written_block.release());
  }

  LOG_TIMING(INFO, Substitute("closing $0 blocks", FLAGS_num_blocks_close)) {
    ASSERT_OK(this->bm_->CloseBlocks(dirty_blocks));
  }
}
// FlushDataAsync() has no directly observable effect here, so this only
// checks that requesting it on a dirty block does not return an error.
TYPED_TEST(BlockManagerTest, FlushDataAsyncTest) {
  const string payload = "test data";
  gscoped_ptr<WritableBlock> block;
  ASSERT_OK(this->bm_->CreateBlock(&block));
  ASSERT_OK(block->Append(payload));
  ASSERT_OK(block->FlushDataAsync());
}
// Walks WritableBlock through its state machine: CLEAN, DIRTY, FLUSHING and
// CLOSED, covering each transition exercised below.
TYPED_TEST(BlockManagerTest, WritableBlockStateTest) {
  const string payload = "test data";
  gscoped_ptr<WritableBlock> block;

  // CLEAN -> DIRTY -> CLOSED; a second Append() keeps the block DIRTY.
  ASSERT_OK(this->bm_->CreateBlock(&block));
  ASSERT_EQ(WritableBlock::CLEAN, block->state());
  ASSERT_OK(block->Append(payload));
  ASSERT_EQ(WritableBlock::DIRTY, block->state());
  ASSERT_OK(block->Append(payload));
  ASSERT_EQ(WritableBlock::DIRTY, block->state());
  ASSERT_OK(block->Close());
  ASSERT_EQ(WritableBlock::CLOSED, block->state());

  // DIRTY -> FLUSHING -> CLOSED.
  ASSERT_OK(this->bm_->CreateBlock(&block));
  ASSERT_OK(block->Append(payload));
  ASSERT_OK(block->FlushDataAsync());
  ASSERT_EQ(WritableBlock::FLUSHING, block->state());
  ASSERT_OK(block->Close());
  ASSERT_EQ(WritableBlock::CLOSED, block->state());

  // CLEAN -> CLOSED.
  ASSERT_OK(this->bm_->CreateBlock(&block));
  ASSERT_OK(block->Close());
  ASSERT_EQ(WritableBlock::CLOSED, block->state());

  // FlushDataAsync() with nothing written still moves the block to FLUSHING.
  ASSERT_OK(this->bm_->CreateBlock(&block));
  ASSERT_OK(block->FlushDataAsync());
  ASSERT_EQ(WritableBlock::FLUSHING, block->state());

  // DIRTY -> CLOSED.
  ASSERT_OK(this->bm_->CreateBlock(&block));
  ASSERT_OK(block->Append(payload));
  ASSERT_OK(block->Close());
  ASSERT_EQ(WritableBlock::CLOSED, block->state());
}
// Abort() closes a block and leaves nothing that can be opened afterwards,
// whether or not FlushDataAsync() was requested first.
TYPED_TEST(BlockManagerTest, AbortTest) {
  const string payload = "test data";
  gscoped_ptr<WritableBlock> block;

  // Abort a dirty block.
  ASSERT_OK(this->bm_->CreateBlock(&block));
  ASSERT_OK(block->Append(payload));
  ASSERT_OK(block->Abort());
  ASSERT_EQ(WritableBlock::CLOSED, block->state());
  ASSERT_TRUE(this->bm_->OpenBlock(block->id(), nullptr).IsNotFound());

  // Abort a block after requesting an asynchronous flush.
  ASSERT_OK(this->bm_->CreateBlock(&block));
  ASSERT_OK(block->Append(payload));
  ASSERT_OK(block->FlushDataAsync());
  ASSERT_OK(block->Abort());
  ASSERT_EQ(WritableBlock::CLOSED, block->state());
  ASSERT_TRUE(this->bm_->OpenBlock(block->id(), nullptr).IsNotFound());
}
// Writes an empty block, a non-empty block, and a deleted block. It then
// checks that a second manager opened over the same directory sees exactly
// that state.
TYPED_TEST(BlockManagerTest, PersistenceTest) {
  const string payload = "test data";

  // 1. Empty.
  gscoped_ptr<WritableBlock> empty_block;
  ASSERT_OK(this->bm_->CreateBlock(&empty_block));
  ASSERT_OK(empty_block->Close());

  // 2. Non-empty.
  gscoped_ptr<WritableBlock> full_block;
  ASSERT_OK(this->bm_->CreateBlock(&full_block));
  ASSERT_OK(full_block->Append(payload));
  ASSERT_OK(full_block->Close());

  // 3. Deleted.
  gscoped_ptr<WritableBlock> deleted_block;
  ASSERT_OK(this->bm_->CreateBlock(&deleted_block));
  ASSERT_OK(deleted_block->Append(payload));
  ASSERT_OK(deleted_block->Close());
  ASSERT_OK(this->bm_->DeleteBlock(deleted_block->id()));

  // Open a second manager over the same directory while bm_ stays open. This
  // stands in for a process that crashed without a clean shutdown; the
  // on-disk metadata must still be consistent.
  gscoped_ptr<BlockManager> new_bm(this->CreateBlockManager(
      scoped_refptr<MetricEntity>(),
      MemTracker::CreateTracker(-1, "other tracker"),
      { GetTestDataDirectory() }));
  ASSERT_OK(new_bm->Open());

  gscoped_ptr<ReadableBlock> reader;
  uint64_t block_size;

  // Block 1 is still there and still empty.
  ASSERT_OK(new_bm->OpenBlock(empty_block->id(), &reader));
  ASSERT_OK(reader->Size(&block_size));
  ASSERT_EQ(0, block_size);
  ASSERT_OK(reader->Close());

  // Block 2 is still there with its contents intact.
  ASSERT_OK(new_bm->OpenBlock(full_block->id(), &reader));
  ASSERT_OK(reader->Size(&block_size));
  ASSERT_EQ(payload.length(), block_size);
  Slice contents;
  gscoped_ptr<uint8_t[]> buf(new uint8_t[payload.length()]);
  ASSERT_OK(reader->Read(0, payload.length(), &contents, buf.get()));
  ASSERT_EQ(payload, contents);
  ASSERT_OK(reader->Close());

  // Block 3 stays deleted.
  ASSERT_TRUE(new_bm->OpenBlock(deleted_block->id(), nullptr).IsNotFound());
}
// Rebuilds (and formats) the block manager over three fresh root paths, then
// runs the implementation-specific multipath checks.
TYPED_TEST(BlockManagerTest, MultiPathTest) {
  vector<string> paths;
  for (int n = 0; n < 3; ++n) {
    paths.push_back(this->GetTestPath(Substitute("path$0", n)));
  }
  ASSERT_OK(this->ReopenBlockManager(scoped_refptr<MetricEntity>(),
                                     shared_ptr<MemTracker>(),
                                     paths, true));
  ASSERT_NO_FATAL_FAILURE(this->RunMultipathTest(paths));
}
// Thread body for ConcurrentCloseReadableBlockTest: closes 'block' and
// CHECK-fails if Close() reports an error.
static void CloseHelper(ReadableBlock* block) {
  const Status close_status = block->Close();
  CHECK_OK(close_status);
}
// ReadableBlock::Close() must tolerate being called concurrently, and more
// than once, on the same reader: 100 threads all close one shared reader.
TYPED_TEST(BlockManagerTest, ConcurrentCloseReadableBlockTest) {
  gscoped_ptr<WritableBlock> writer;
  ASSERT_OK(this->bm_->CreateBlock(&writer));
  ASSERT_OK(writer->Close());

  gscoped_ptr<ReadableBlock> reader;
  ASSERT_OK(this->bm_->OpenBlock(writer->id(), &reader));

  const int kNumThreads = 100;
  vector<scoped_refptr<Thread>> threads;
  for (int n = 0; n < kNumThreads; ++n) {
    scoped_refptr<Thread> t;
    ASSERT_OK(Thread::Create("test", Substitute("t$0", n),
                             &CloseHelper, reader.get(), &t));
    threads.push_back(t);
  }
  for (const auto& t : threads) {
    t->Join();
  }
}
// Asserts that the six generic block manager metrics registered on 'metrics'
// equal the expected values, checked in parameter order. Each metric is
// fetched with FindOrNull() and dereferenced without a null check, so all of
// them must be registered.
static void CheckMetrics(const scoped_refptr<MetricEntity>& metrics,
                         int blocks_open_reading, int blocks_open_writing,
                         int total_readable_blocks, int total_writable_blocks,
                         int total_bytes_read, int total_bytes_written) {
  const auto open_reading = down_cast<AtomicGauge<uint64_t>*>(
      metrics->FindOrNull(METRIC_block_manager_blocks_open_reading).get())->value();
  ASSERT_EQ(blocks_open_reading, open_reading);

  const auto open_writing = down_cast<AtomicGauge<uint64_t>*>(
      metrics->FindOrNull(METRIC_block_manager_blocks_open_writing).get())->value();
  ASSERT_EQ(blocks_open_writing, open_writing);

  const auto readable = down_cast<Counter*>(
      metrics->FindOrNull(METRIC_block_manager_total_readable_blocks).get())->value();
  ASSERT_EQ(total_readable_blocks, readable);

  const auto writable = down_cast<Counter*>(
      metrics->FindOrNull(METRIC_block_manager_total_writable_blocks).get())->value();
  ASSERT_EQ(total_writable_blocks, writable);

  const auto bytes_read = down_cast<Counter*>(
      metrics->FindOrNull(METRIC_block_manager_total_bytes_read).get())->value();
  ASSERT_EQ(total_bytes_read, bytes_read);

  const auto bytes_written = down_cast<Counter*>(
      metrics->FindOrNull(METRIC_block_manager_total_bytes_written).get())->value();
  ASSERT_EQ(total_bytes_written, bytes_written);
}
// Tracks the generic block manager metrics through three rounds. Each round
// opens a writer, writes and closes it, opens a reader, reads, and closes it.
TYPED_TEST(BlockManagerTest, MetricsTest) {
  const string kTestData = "test data";
  const size_t kDataLen = kTestData.length();

  MetricRegistry registry;
  scoped_refptr<MetricEntity> entity = METRIC_ENTITY_server.Instantiate(&registry, "test");
  ASSERT_OK(this->ReopenBlockManager(entity, shared_ptr<MemTracker>(),
                                     { GetTestDataDirectory() }, false));
  ASSERT_NO_FATAL_FAILURE(CheckMetrics(entity, 0, 0, 0, 0, 0, 0));

  for (int i = 0; i < 3; i++) {
    const int prior = i;        // blocks completed in earlier rounds
    const int current = i + 1;  // ...plus this round's block
    gscoped_ptr<WritableBlock> writer;
    gscoped_ptr<ReadableBlock> reader;

    // An open writer, also counted in total_writable_blocks.
    ASSERT_OK(this->bm_->CreateBlock(&writer));
    ASSERT_NO_FATAL_FAILURE(CheckMetrics(
        entity, 0, 1, prior, current, prior * kDataLen, prior * kDataLen));

    // Closed writer: no longer open, and its bytes count as written.
    ASSERT_OK(writer->Append(kTestData));
    ASSERT_OK(writer->Close());
    ASSERT_NO_FATAL_FAILURE(CheckMetrics(
        entity, 0, 0, prior, current, prior * kDataLen, current * kDataLen));

    // An open reader, also counted in total_readable_blocks.
    ASSERT_OK(this->bm_->OpenBlock(writer->id(), &reader));
    ASSERT_NO_FATAL_FAILURE(CheckMetrics(
        entity, 1, 0, current, current, prior * kDataLen, current * kDataLen));

    // Reading counts toward total_bytes_read.
    Slice data;
    gscoped_ptr<uint8_t[]> scratch(new uint8_t[kDataLen]);
    ASSERT_OK(reader->Read(0, kDataLen, &data, scratch.get()));
    ASSERT_NO_FATAL_FAILURE(CheckMetrics(
        entity, 1, 0, current, current, current * kDataLen, current * kDataLen));

    // Closed reader: no longer open.
    ASSERT_OK(reader->Close());
    ASSERT_NO_FATAL_FAILURE(CheckMetrics(
        entity, 0, 0, current, current, current * kDataLen, current * kDataLen));
  }
}
// The following typed tests delegate to the per-implementation Run*Test()
// specializations defined above. The FileBlockManager specializations of
// RunLogMetricsTest() and RunLogContainerPreallocationTest() only log a skip
// message.
TYPED_TEST(BlockManagerTest, LogMetricsTest) {
ASSERT_NO_FATAL_FAILURE(this->RunLogMetricsTest());
}
TYPED_TEST(BlockManagerTest, LogContainerPreallocationTest) {
ASSERT_NO_FATAL_FAILURE(this->RunLogContainerPreallocationTest());
}
TYPED_TEST(BlockManagerTest, MemTrackerTest) {
ASSERT_NO_FATAL_FAILURE(this->RunMemTrackerTest());
}
// LogBlockManager-specific tests.
// Concrete (non-template) fixture for the TEST_F cases below. It adds no
// members of its own; SetUp() is the LogBlockManager specialization of
// BlockManagerTest::SetUp().
class LogBlockManagerTest : public BlockManagerTest<LogBlockManager> {
};
// Regression test for KUDU-1190, a crash at startup when a block ID has been
// reused.
TEST_F(LogBlockManagerTest, TestReuseBlockIds) {
  RETURN_NOT_LOG_BLOCK_MANAGER();

  // Typically, the LBM starts with a random block ID when running as a
  // gtest. Pin the sequence so the IDs below are predictable.
  bm_->next_block_id_.Store(1);

  // Hold four blocks open at once so each gets its own container; they take
  // the first four IDs of the sequence.
  vector<BlockId> original_ids;
  {
    ScopedWritableBlockCloser closer;
    for (int n = 0; n < 4; ++n) {
      gscoped_ptr<WritableBlock> block;
      ASSERT_OK(bm_->CreateBlock(&block));
      original_ids.push_back(block->id());
      closer.AddBlock(std::move(block));
    }
    ASSERT_OK(closer.CloseBlocks());
  }

  // One more block, which should reuse the first container.
  {
    gscoped_ptr<WritableBlock> block;
    ASSERT_OK(bm_->CreateBlock(&block));
    ASSERT_OK(block->Close());
  }
  ASSERT_EQ(4, bm_->available_containers_.size());

  for (const BlockId& id : original_ids) {
    ASSERT_OK(bm_->DeleteBlock(id));
  }

  // Rewind the sequence and recreate blocks with the same IDs. Current Kudu
  // never does this, but older versions could, so startup must cope.
  bm_->next_block_id_.Store(1);
  for (int n = 0; n < 4; ++n) {
    gscoped_ptr<WritableBlock> block;
    ASSERT_OK(bm_->CreateBlock(&block));
    ASSERT_EQ(block->id(), original_ids[n]);
    ASSERT_OK(block->Close());
  }

  // Now we have 4 containers with the following metadata:
  //   1: CREATE(1) CREATE (5) DELETE(1) CREATE(4)
  //   2: CREATE(2) DELETE(2) CREATE(1)
  //   3: CREATE(3) DELETE(3) CREATE(2)
  //   4: CREATE(4) DELETE(4) CREATE(3)
  // Reopening must handle these reused block IDs.
  ASSERT_OK(ReopenBlockManager(scoped_refptr<MetricEntity>(),
                               shared_ptr<MemTracker>(),
                               { GetTestDataDirectory() },
                               false));
}
// Test partial record at end of metadata file. See KUDU-1377.
// The idea behind this test is that we should tolerate one partial record at
// the end of a given container metadata file, since we actively append a
// record to a container metadata file when a new block is created or deleted.
// A system crash or disk-full event can result in a partially-written metadata
// record. Ignoring a trailing, partial (not corrupt) record is safe, so long
// as we only consider a container valid if there is at most one trailing
// partial record. If any other metadata record is somehow incomplete or
// corrupt, we consider that an error and the entire container is considered
// corrupted.
//
// Note that we rely on filesystem integrity to ensure that we do not lose
// trailing, fsync()ed metadata.
//
// Scenarios covered, in order:
//   1. One extra trailing byte: tolerated, file truncated back on reopen.
//   2. Last record cut short by one byte: tolerated, that block is lost and
//      the file is truncated back to the previous record.
//   3. Corrupted length field of the 2nd record: reopen fails with Corruption.
TEST_F(LogBlockManagerTest, TestMetadataTruncation) {
RETURN_NOT_LOG_BLOCK_MANAGER();
// Create several blocks.
// Each is created and closed before the next, so they share one container
// (asserted further down via all_containers_/available_containers_).
vector<BlockId> created_blocks;
BlockId last_block_id;
for (int i = 0; i < 4; i++) {
gscoped_ptr<WritableBlock> writer;
ASSERT_OK(bm_->CreateBlock(&writer));
last_block_id = writer->id();
created_blocks.push_back(last_block_id);
ASSERT_OK(writer->Close());
}
ASSERT_EQ(4, bm_->CountBlocksForTests());
gscoped_ptr<ReadableBlock> block;
ASSERT_OK(bm_->OpenBlock(last_block_id, &block));
ASSERT_OK(block->Close());
// Start corrupting the metadata file in different ways.
// The metadata file of the container at the front of the available list is
// the one corrupted throughout this test.
string path = LogBlockManager::ContainerPathForTests(bm_->available_containers_.front());
string metadata_path = path + LogBlockManager::kContainerMetadataFileSuffix;
uint64_t good_meta_size;
ASSERT_OK(env_->GetFileSize(metadata_path, &good_meta_size));
// First, add an extra byte to the end of the metadata file. This makes the
// trailing "record" of the metadata file corrupt, but doesn't cause data
// loss. The result is that the container will automatically truncate the
// metadata file back to its correct size.
{
RWFileOptions opts;
opts.mode = Env::OPEN_EXISTING;
gscoped_ptr<RWFile> file;
ASSERT_OK(env_->NewRWFile(opts, metadata_path, &file));
// Truncate() to a larger size extends the file by one byte.
ASSERT_OK(file->Truncate(good_meta_size + 1));
}
uint64_t cur_meta_size;
ASSERT_OK(env_->GetFileSize(metadata_path, &cur_meta_size));
ASSERT_EQ(good_meta_size + 1, cur_meta_size);
// Reopen the block manager, which re-reads the metadata file. We will still
// see all of our blocks. The size of the metadata file will be restored back
// to its previous value.
ASSERT_OK(this->ReopenBlockManager(scoped_refptr<MetricEntity>(),
shared_ptr<MemTracker>(),
{ GetTestDataDirectory() },
false));
ASSERT_EQ(4, bm_->CountBlocksForTests());
ASSERT_OK(bm_->OpenBlock(last_block_id, &block));
ASSERT_OK(block->Close());
// Check that the file was truncated back to its previous size by the system.
ASSERT_OK(env_->GetFileSize(metadata_path, &cur_meta_size));
ASSERT_EQ(good_meta_size, cur_meta_size);
// Delete the first block we created. This necessitates writing to the
// metadata file of the originally-written container, since we append a
// delete record to the metadata.
ASSERT_OK(bm_->DeleteBlock(created_blocks[0]));
ASSERT_EQ(3, bm_->CountBlocksForTests());
ASSERT_OK(env_->GetFileSize(metadata_path, &cur_meta_size));
good_meta_size = cur_meta_size;
// Add a new block, increasing the size of the container metadata file.
{
gscoped_ptr<WritableBlock> writer;
ASSERT_OK(bm_->CreateBlock(&writer));
last_block_id = writer->id();
created_blocks.push_back(last_block_id);
ASSERT_OK(writer->Close());
}
ASSERT_EQ(4, bm_->CountBlocksForTests());
ASSERT_OK(env_->GetFileSize(metadata_path, &cur_meta_size));
ASSERT_GT(cur_meta_size, good_meta_size);
uint64_t prev_good_meta_size = good_meta_size; // Store previous size.
good_meta_size = cur_meta_size;
// Now, truncate the metadata file so that we lose the last valid record.
// This will result in the loss of a block record, therefore we will observe
// data loss, however it will look like a failed partial write.
{
RWFileOptions opts;
opts.mode = Env::OPEN_EXISTING;
gscoped_ptr<RWFile> file;
ASSERT_OK(env_->NewRWFile(opts, metadata_path, &file));
// Chop one byte off the end, leaving the newest CREATE record partial.
ASSERT_OK(file->Truncate(good_meta_size - 1));
}
// Reopen the truncated metadata file. We will not find all of our blocks.
ASSERT_OK(this->ReopenBlockManager(scoped_refptr<MetricEntity>(),
shared_ptr<MemTracker>(),
{ GetTestDataDirectory() },
false));
// Because the last record was a partial record on disk, the system should
// have assumed that it was an incomplete write and truncated the metadata
// file back to the previous valid record. Let's verify that that's the case.
good_meta_size = prev_good_meta_size;
ASSERT_OK(env_->GetFileSize(metadata_path, &cur_meta_size));
ASSERT_EQ(good_meta_size, cur_meta_size);
ASSERT_EQ(3, bm_->CountBlocksForTests());
// The block whose CREATE record was cut short is gone.
Status s = bm_->OpenBlock(last_block_id, &block);
ASSERT_TRUE(s.IsNotFound()) << s.ToString();
ASSERT_STR_CONTAINS(s.ToString(), "Can't find block");
// Add a new block, increasing the size of the container metadata file.
{
gscoped_ptr<WritableBlock> writer;
ASSERT_OK(bm_->CreateBlock(&writer));
last_block_id = writer->id();
created_blocks.push_back(last_block_id);
ASSERT_OK(writer->Close());
}
ASSERT_EQ(4, bm_->CountBlocksForTests());
ASSERT_OK(bm_->OpenBlock(last_block_id, &block));
ASSERT_OK(block->Close());
ASSERT_OK(env_->GetFileSize(metadata_path, &cur_meta_size));
ASSERT_GT(cur_meta_size, good_meta_size);
good_meta_size = cur_meta_size;
// Ensure that we only ever created a single container.
ASSERT_EQ(1, bm_->all_containers_.size());
ASSERT_EQ(1, bm_->available_containers_.size());
// Find location of 2nd record in metadata file and corrupt it.
// This is an unrecoverable error because it's in the middle of the file.
gscoped_ptr<RandomAccessFile> meta_file;
ASSERT_OK(env_->NewRandomAccessFile(metadata_path, &meta_file));
ReadablePBContainerFile pb_reader(std::move(meta_file));
ASSERT_OK(pb_reader.Open());
BlockRecordPB record;
ASSERT_OK(pb_reader.ReadNextPB(&record));
// Reader position after the first record; presumably the start of the 2nd
// record's length field.
uint64_t offset = pb_reader.offset();
uint64_t latest_meta_size;
ASSERT_OK(env_->GetFileSize(metadata_path, &latest_meta_size));
// 'meta_file' was moved into pb_reader above, so open the file again to read
// its full contents.
ASSERT_OK(env_->NewRandomAccessFile(metadata_path, &meta_file));
Slice result;
gscoped_ptr<uint8_t[]> scratch(new uint8_t[latest_meta_size]);
ASSERT_OK(ReadFully(meta_file.get(), 0, latest_meta_size, &result, scratch.get()));
string data = result.ToString();
// Flip the high bit of the length field, which is a 4-byte little endian
// unsigned integer. This will cause the length field to represent a large
// value and also cause the length checksum not to validate.
data[offset + 3] ^= 1 << 7;
// Overwrite the metadata file with the corrupted copy.
gscoped_ptr<WritableFile> writable_meta;
ASSERT_OK(env_->NewWritableFile(metadata_path, &writable_meta));
ASSERT_OK(writable_meta->Append(data));
ASSERT_OK(writable_meta->Close());
// Now try to reopen the container.
// This should look like a bad checksum, and it's not recoverable.
s = this->ReopenBlockManager(scoped_refptr<MetricEntity>(),
shared_ptr<MemTracker>(),
{ GetTestDataDirectory() },
false);
ASSERT_TRUE(s.IsCorruption());
ASSERT_STR_CONTAINS(s.ToString(), "Incorrect checksum");
}
// Checks that block creation respects the reserved-space setting, using the
// test hook that overrides the reported free bytes. With 0 bytes free,
// CreateBlock() fails with an IOError. With 101 bytes free (1 reserved + 100
// preallocated; presumably the minimum that suffices) it succeeds.
TEST_F(LogBlockManagerTest, TestDiskSpaceCheck) {
  RETURN_NOT_LOG_BLOCK_MANAGER();

  FLAGS_log_block_manager_full_disk_cache_seconds = 0; // Don't cache device fullness.
  FLAGS_fs_data_dirs_reserved_bytes = 1; // Keep at least 1 byte reserved in the FS.
  FLAGS_disk_reserved_bytes_free_for_testing = 0;
  FLAGS_log_container_preallocate_bytes = 100;

  gscoped_ptr<WritableBlock> writer;
  Status s = bm_->CreateBlock(&writer);
  ASSERT_TRUE(s.IsIOError()) << s.ToString();
  ASSERT_STR_CONTAINS(s.ToString(), "All data directories are full");

  FLAGS_disk_reserved_bytes_free_for_testing = 101;
  ASSERT_OK(bm_->CreateBlock(&writer));

  FLAGS_disk_reserved_bytes_free_for_testing = 0;
  s = bm_->CreateBlock(&writer);
  ASSERT_TRUE(s.IsIOError()) << s.ToString();

  // This relies on the failed CreateBlock() above leaving 'writer' pointing
  // at the block created while space was available.
  ASSERT_OK(writer->Close());
}
} // namespace fs
} // namespace kudu