| // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
| // This source code is licensed under both the GPLv2 (found in the |
| // COPYING file in the root directory) and Apache 2.0 License |
| // (found in the LICENSE.Apache file in the root directory). |
| // |
| // Copyright (c) 2011 The LevelDB Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. See the AUTHORS file for names of contributors. |
| |
| #include "db/db_test_util.h" |
| #include "port/stack_trace.h" |
| #include "util/fault_injection_test_env.h" |
| #include "util/sync_point.h" |
| |
| namespace rocksdb { |
| |
| class DBFlushTest : public DBTestBase { |
| public: |
| DBFlushTest() : DBTestBase("/db_flush_test") {} |
| }; |
| |
| class DBFlushDirectIOTest : public DBFlushTest, |
| public ::testing::WithParamInterface<bool> { |
| public: |
| DBFlushDirectIOTest() : DBFlushTest() {} |
| }; |
| |
| // We had issue when two background threads trying to flush at the same time, |
| // only one of them get committed. The test verifies the issue is fixed. |
| TEST_F(DBFlushTest, FlushWhileWritingManifest) { |
| Options options; |
| options.disable_auto_compactions = true; |
| options.max_background_flushes = 2; |
| options.env = env_; |
| Reopen(options); |
| FlushOptions no_wait; |
| no_wait.wait = false; |
| |
| SyncPoint::GetInstance()->LoadDependency( |
| {{"VersionSet::LogAndApply:WriteManifest", |
| "DBFlushTest::FlushWhileWritingManifest:1"}, |
| {"MemTableList::InstallMemtableFlushResults:InProgress", |
| "VersionSet::LogAndApply:WriteManifestDone"}}); |
| SyncPoint::GetInstance()->EnableProcessing(); |
| |
| ASSERT_OK(Put("foo", "v")); |
| ASSERT_OK(dbfull()->Flush(no_wait)); |
| TEST_SYNC_POINT("DBFlushTest::FlushWhileWritingManifest:1"); |
| ASSERT_OK(Put("bar", "v")); |
| ASSERT_OK(dbfull()->Flush(no_wait)); |
| // If the issue is hit we will wait here forever. |
| dbfull()->TEST_WaitForFlushMemTable(); |
| #ifndef ROCKSDB_LITE |
| ASSERT_EQ(2, TotalTableFiles()); |
| #endif // ROCKSDB_LITE |
| } |
| |
| TEST_F(DBFlushTest, SyncFail) { |
| std::unique_ptr<FaultInjectionTestEnv> fault_injection_env( |
| new FaultInjectionTestEnv(env_)); |
| Options options; |
| options.disable_auto_compactions = true; |
| options.env = fault_injection_env.get(); |
| |
| SyncPoint::GetInstance()->LoadDependency( |
| {{"DBFlushTest::SyncFail:1", "DBImpl::SyncClosedLogs:Start"}, |
| {"DBImpl::SyncClosedLogs:Failed", "DBFlushTest::SyncFail:2"}}); |
| SyncPoint::GetInstance()->EnableProcessing(); |
| |
| Reopen(options); |
| Put("key", "value"); |
| auto* cfd = |
| reinterpret_cast<ColumnFamilyHandleImpl*>(db_->DefaultColumnFamily()) |
| ->cfd(); |
| int refs_before = cfd->current()->TEST_refs(); |
| FlushOptions flush_options; |
| flush_options.wait = false; |
| ASSERT_OK(dbfull()->Flush(flush_options)); |
| fault_injection_env->SetFilesystemActive(false); |
| TEST_SYNC_POINT("DBFlushTest::SyncFail:1"); |
| TEST_SYNC_POINT("DBFlushTest::SyncFail:2"); |
| fault_injection_env->SetFilesystemActive(true); |
| dbfull()->TEST_WaitForFlushMemTable(); |
| #ifndef ROCKSDB_LITE |
| ASSERT_EQ("", FilesPerLevel()); // flush failed. |
| #endif // ROCKSDB_LITE |
| // Flush job should release ref count to current version. |
| ASSERT_EQ(refs_before, cfd->current()->TEST_refs()); |
| Destroy(options); |
| } |
| |
| TEST_F(DBFlushTest, FlushInLowPriThreadPool) { |
| // Verify setting an empty high-pri (flush) thread pool causes flushes to be |
| // scheduled in the low-pri (compaction) thread pool. |
| Options options = CurrentOptions(); |
| options.level0_file_num_compaction_trigger = 4; |
| options.memtable_factory.reset(new SpecialSkipListFactory(1)); |
| Reopen(options); |
| env_->SetBackgroundThreads(0, Env::HIGH); |
| |
| std::thread::id tid; |
| int num_flushes = 0, num_compactions = 0; |
| SyncPoint::GetInstance()->SetCallBack( |
| "DBImpl::BGWorkFlush", [&](void* arg) { |
| if (tid == std::thread::id()) { |
| tid = std::this_thread::get_id(); |
| } else { |
| ASSERT_EQ(tid, std::this_thread::get_id()); |
| } |
| ++num_flushes; |
| }); |
| SyncPoint::GetInstance()->SetCallBack( |
| "DBImpl::BGWorkCompaction", [&](void* arg) { |
| ASSERT_EQ(tid, std::this_thread::get_id()); |
| ++num_compactions; |
| }); |
| SyncPoint::GetInstance()->EnableProcessing(); |
| |
| ASSERT_OK(Put("key", "val")); |
| for (int i = 0; i < 4; ++i) { |
| ASSERT_OK(Put("key", "val")); |
| dbfull()->TEST_WaitForFlushMemTable(); |
| } |
| dbfull()->TEST_WaitForCompact(); |
| ASSERT_EQ(4, num_flushes); |
| ASSERT_EQ(1, num_compactions); |
| } |
| |
| TEST_P(DBFlushDirectIOTest, DirectIO) { |
| Options options; |
| options.create_if_missing = true; |
| options.disable_auto_compactions = true; |
| options.max_background_flushes = 2; |
| options.use_direct_io_for_flush_and_compaction = GetParam(); |
| options.env = new MockEnv(Env::Default()); |
| SyncPoint::GetInstance()->SetCallBack( |
| "BuildTable:create_file", [&](void* arg) { |
| bool* use_direct_writes = static_cast<bool*>(arg); |
| ASSERT_EQ(*use_direct_writes, |
| options.use_direct_io_for_flush_and_compaction); |
| }); |
| |
| SyncPoint::GetInstance()->EnableProcessing(); |
| Reopen(options); |
| ASSERT_OK(Put("foo", "v")); |
| FlushOptions flush_options; |
| flush_options.wait = true; |
| ASSERT_OK(dbfull()->Flush(flush_options)); |
| Destroy(options); |
| delete options.env; |
| } |
| |
| INSTANTIATE_TEST_CASE_P(DBFlushDirectIOTest, DBFlushDirectIOTest, |
| testing::Bool()); |
| |
| } // namespace rocksdb |
| |
| int main(int argc, char** argv) { |
| rocksdb::port::InstallStackTraceHandler(); |
| ::testing::InitGoogleTest(&argc, argv); |
| return RUN_ALL_TESTS(); |
| } |