| // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
| // This source code is licensed under both the GPLv2 (found in the |
| // COPYING file in the root directory) and Apache 2.0 License |
| // (found in the LICENSE.Apache file in the root directory). |
| |
| #ifndef ROCKSDB_LITE |
| |
| #include <map> |
| #include <string> |
| |
| #include "rocksdb/cache.h" |
| #include "rocksdb/write_batch.h" |
| #include "rocksdb/write_buffer_manager.h" |
| |
| #include "db/column_family.h" |
| #include "db/db_impl.h" |
| #include "db/log_writer.h" |
| #include "db/version_set.h" |
| #include "db/wal_manager.h" |
| #include "env/mock_env.h" |
| #include "table/mock_table.h" |
| #include "util/file_reader_writer.h" |
| #include "util/string_util.h" |
| #include "util/testharness.h" |
| #include "util/testutil.h" |
| |
| namespace rocksdb { |
| |
| // TODO(icanadi) mock out VersionSet |
| // TODO(icanadi) move other WalManager-specific tests from db_test here |
| class WalManagerTest : public testing::Test { |
| public: |
| WalManagerTest() |
| : env_(new MockEnv(Env::Default())), |
| dbname_(test::TmpDir() + "/wal_manager_test"), |
| db_options_(), |
| table_cache_(NewLRUCache(50000, 16)), |
| write_buffer_manager_(db_options_.db_write_buffer_size), |
| current_log_number_(0) { |
| DestroyDB(dbname_, Options()); |
| } |
| |
| void Init() { |
| ASSERT_OK(env_->CreateDirIfMissing(dbname_)); |
| ASSERT_OK(env_->CreateDirIfMissing(ArchivalDirectory(dbname_))); |
| db_options_.db_paths.emplace_back(dbname_, |
| std::numeric_limits<uint64_t>::max()); |
| db_options_.wal_dir = dbname_; |
| db_options_.env = env_.get(); |
| |
| versions_.reset(new VersionSet(dbname_, &db_options_, env_options_, |
| table_cache_.get(), &write_buffer_manager_, |
| &write_controller_)); |
| |
| wal_manager_.reset(new WalManager(db_options_, env_options_)); |
| } |
| |
| void Reopen() { |
| wal_manager_.reset(new WalManager(db_options_, env_options_)); |
| } |
| |
| // NOT thread safe |
| void Put(const std::string& key, const std::string& value) { |
| assert(current_log_writer_.get() != nullptr); |
| uint64_t seq = versions_->LastSequence() + 1; |
| WriteBatch batch; |
| batch.Put(key, value); |
| WriteBatchInternal::SetSequence(&batch, seq); |
| current_log_writer_->AddRecord(WriteBatchInternal::Contents(&batch)); |
| versions_->SetLastToBeWrittenSequence(seq); |
| versions_->SetLastSequence(seq); |
| } |
| |
| // NOT thread safe |
| void RollTheLog(bool archived) { |
| current_log_number_++; |
| std::string fname = ArchivedLogFileName(dbname_, current_log_number_); |
| unique_ptr<WritableFile> file; |
| ASSERT_OK(env_->NewWritableFile(fname, &file, env_options_)); |
| unique_ptr<WritableFileWriter> file_writer( |
| new WritableFileWriter(std::move(file), env_options_)); |
| current_log_writer_.reset(new log::Writer(std::move(file_writer), 0, false)); |
| } |
| |
| void CreateArchiveLogs(int num_logs, int entries_per_log) { |
| for (int i = 1; i <= num_logs; ++i) { |
| RollTheLog(true); |
| for (int k = 0; k < entries_per_log; ++k) { |
| Put(ToString(k), std::string(1024, 'a')); |
| } |
| } |
| } |
| |
| std::unique_ptr<TransactionLogIterator> OpenTransactionLogIter( |
| const SequenceNumber seq) { |
| unique_ptr<TransactionLogIterator> iter; |
| Status status = wal_manager_->GetUpdatesSince( |
| seq, &iter, TransactionLogIterator::ReadOptions(), versions_.get()); |
| EXPECT_OK(status); |
| return iter; |
| } |
| |
| std::unique_ptr<MockEnv> env_; |
| std::string dbname_; |
| ImmutableDBOptions db_options_; |
| WriteController write_controller_; |
| EnvOptions env_options_; |
| std::shared_ptr<Cache> table_cache_; |
| WriteBufferManager write_buffer_manager_; |
| std::unique_ptr<VersionSet> versions_; |
| std::unique_ptr<WalManager> wal_manager_; |
| |
| std::unique_ptr<log::Writer> current_log_writer_; |
| uint64_t current_log_number_; |
| }; |
| |
| TEST_F(WalManagerTest, ReadFirstRecordCache) { |
| Init(); |
| std::string path = dbname_ + "/000001.log"; |
| unique_ptr<WritableFile> file; |
| ASSERT_OK(env_->NewWritableFile(path, &file, EnvOptions())); |
| |
| SequenceNumber s; |
| ASSERT_OK(wal_manager_->TEST_ReadFirstLine(path, 1 /* number */, &s)); |
| ASSERT_EQ(s, 0U); |
| |
| ASSERT_OK( |
| wal_manager_->TEST_ReadFirstRecord(kAliveLogFile, 1 /* number */, &s)); |
| ASSERT_EQ(s, 0U); |
| |
| unique_ptr<WritableFileWriter> file_writer( |
| new WritableFileWriter(std::move(file), EnvOptions())); |
| log::Writer writer(std::move(file_writer), 1, |
| db_options_.recycle_log_file_num > 0); |
| WriteBatch batch; |
| batch.Put("foo", "bar"); |
| WriteBatchInternal::SetSequence(&batch, 10); |
| writer.AddRecord(WriteBatchInternal::Contents(&batch)); |
| |
| // TODO(icanadi) move SpecialEnv outside of db_test, so we can reuse it here. |
| // Waiting for lei to finish with db_test |
| // env_->count_sequential_reads_ = true; |
| // sequential_read_counter_ sanity test |
| // ASSERT_EQ(env_->sequential_read_counter_.Read(), 0); |
| |
| ASSERT_OK(wal_manager_->TEST_ReadFirstRecord(kAliveLogFile, 1, &s)); |
| ASSERT_EQ(s, 10U); |
| // did a read |
| // TODO(icanadi) move SpecialEnv outside of db_test, so we can reuse it here |
| // ASSERT_EQ(env_->sequential_read_counter_.Read(), 1); |
| |
| ASSERT_OK(wal_manager_->TEST_ReadFirstRecord(kAliveLogFile, 1, &s)); |
| ASSERT_EQ(s, 10U); |
| // no new reads since the value is cached |
| // TODO(icanadi) move SpecialEnv outside of db_test, so we can reuse it here |
| // ASSERT_EQ(env_->sequential_read_counter_.Read(), 1); |
| } |
| |
| namespace { |
| uint64_t GetLogDirSize(std::string dir_path, Env* env) { |
| uint64_t dir_size = 0; |
| std::vector<std::string> files; |
| env->GetChildren(dir_path, &files); |
| for (auto& f : files) { |
| uint64_t number; |
| FileType type; |
| if (ParseFileName(f, &number, &type) && type == kLogFile) { |
| std::string const file_path = dir_path + "/" + f; |
| uint64_t file_size; |
| env->GetFileSize(file_path, &file_size); |
| dir_size += file_size; |
| } |
| } |
| return dir_size; |
| } |
| std::vector<std::uint64_t> ListSpecificFiles( |
| Env* env, const std::string& path, const FileType expected_file_type) { |
| std::vector<std::string> files; |
| std::vector<uint64_t> file_numbers; |
| env->GetChildren(path, &files); |
| uint64_t number; |
| FileType type; |
| for (size_t i = 0; i < files.size(); ++i) { |
| if (ParseFileName(files[i], &number, &type)) { |
| if (type == expected_file_type) { |
| file_numbers.push_back(number); |
| } |
| } |
| } |
| return file_numbers; |
| } |
| |
| int CountRecords(TransactionLogIterator* iter) { |
| int count = 0; |
| SequenceNumber lastSequence = 0; |
| BatchResult res; |
| while (iter->Valid()) { |
| res = iter->GetBatch(); |
| EXPECT_TRUE(res.sequence > lastSequence); |
| ++count; |
| lastSequence = res.sequence; |
| EXPECT_OK(iter->status()); |
| iter->Next(); |
| } |
| return count; |
| } |
| } // namespace |
| |
| TEST_F(WalManagerTest, WALArchivalSizeLimit) { |
| db_options_.wal_ttl_seconds = 0; |
| db_options_.wal_size_limit_mb = 1000; |
| Init(); |
| |
| // TEST : Create WalManager with huge size limit and no ttl. |
| // Create some archived files and call PurgeObsoleteWALFiles(). |
| // Count the archived log files that survived. |
| // Assert that all of them did. |
| // Change size limit. Re-open WalManager. |
| // Assert that archive is not greater than wal_size_limit_mb after |
| // PurgeObsoleteWALFiles() |
| // Set ttl and time_to_check_ to small values. Re-open db. |
| // Assert that there are no archived logs left. |
| |
| std::string archive_dir = ArchivalDirectory(dbname_); |
| CreateArchiveLogs(20, 5000); |
| |
| std::vector<std::uint64_t> log_files = |
| ListSpecificFiles(env_.get(), archive_dir, kLogFile); |
| ASSERT_EQ(log_files.size(), 20U); |
| |
| db_options_.wal_size_limit_mb = 8; |
| Reopen(); |
| wal_manager_->PurgeObsoleteWALFiles(); |
| |
| uint64_t archive_size = GetLogDirSize(archive_dir, env_.get()); |
| ASSERT_TRUE(archive_size <= db_options_.wal_size_limit_mb * 1024 * 1024); |
| |
| db_options_.wal_ttl_seconds = 1; |
| env_->FakeSleepForMicroseconds(2 * 1000 * 1000); |
| Reopen(); |
| wal_manager_->PurgeObsoleteWALFiles(); |
| |
| log_files = ListSpecificFiles(env_.get(), archive_dir, kLogFile); |
| ASSERT_TRUE(log_files.empty()); |
| } |
| |
| TEST_F(WalManagerTest, WALArchivalTtl) { |
| db_options_.wal_ttl_seconds = 1000; |
| Init(); |
| |
| // TEST : Create WalManager with a ttl and no size limit. |
| // Create some archived log files and call PurgeObsoleteWALFiles(). |
| // Assert that files are not deleted |
| // Reopen db with small ttl. |
| // Assert that all archived logs was removed. |
| |
| std::string archive_dir = ArchivalDirectory(dbname_); |
| CreateArchiveLogs(20, 5000); |
| |
| std::vector<uint64_t> log_files = |
| ListSpecificFiles(env_.get(), archive_dir, kLogFile); |
| ASSERT_GT(log_files.size(), 0U); |
| |
| db_options_.wal_ttl_seconds = 1; |
| env_->FakeSleepForMicroseconds(3 * 1000 * 1000); |
| Reopen(); |
| wal_manager_->PurgeObsoleteWALFiles(); |
| |
| log_files = ListSpecificFiles(env_.get(), archive_dir, kLogFile); |
| ASSERT_TRUE(log_files.empty()); |
| } |
| |
| TEST_F(WalManagerTest, TransactionLogIteratorMoveOverZeroFiles) { |
| Init(); |
| RollTheLog(false); |
| Put("key1", std::string(1024, 'a')); |
| // Create a zero record WAL file. |
| RollTheLog(false); |
| RollTheLog(false); |
| |
| Put("key2", std::string(1024, 'a')); |
| |
| auto iter = OpenTransactionLogIter(0); |
| ASSERT_EQ(2, CountRecords(iter.get())); |
| } |
| |
| TEST_F(WalManagerTest, TransactionLogIteratorJustEmptyFile) { |
| Init(); |
| RollTheLog(false); |
| auto iter = OpenTransactionLogIter(0); |
| // Check that an empty iterator is returned |
| ASSERT_TRUE(!iter->Valid()); |
| } |
| |
| } // namespace rocksdb |
| |
| int main(int argc, char** argv) { |
| ::testing::InitGoogleTest(&argc, argv); |
| return RUN_ALL_TESTS(); |
| } |
| |
| #else |
| #include <stdio.h> |
| |
| int main(int argc, char** argv) { |
| fprintf(stderr, "SKIPPED as WalManager is not supported in ROCKSDB_LITE\n"); |
| return 0; |
| } |
| |
| #endif // !ROCKSDB_LITE |