| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "kudu/consensus/log.h" |
| |
| #include <algorithm> |
| #include <cerrno> |
| #include <cstddef> |
| #include <cstdint> |
| #include <functional> |
| #include <limits> |
| #include <memory> |
| #include <ostream> |
| #include <string> |
| #include <vector> |
| |
| #include <gflags/gflags.h> |
| #include <glog/logging.h> |
| #include <glog/stl_logging.h> |
| #include <gtest/gtest.h> |
| |
| #include "kudu/common/wire_protocol-test-util.h" |
| #include "kudu/common/wire_protocol.h" |
| #include "kudu/consensus/consensus.pb.h" |
| #include "kudu/consensus/log-test-base.h" |
| #include "kudu/consensus/log.pb.h" |
| #include "kudu/consensus/log_anchor_registry.h" |
| #include "kudu/consensus/log_index.h" |
| #include "kudu/consensus/log_reader.h" |
| #include "kudu/consensus/log_util.h" |
| #include "kudu/consensus/opid.pb.h" |
| #include "kudu/consensus/opid_util.h" |
| #include "kudu/gutil/ref_counted.h" |
| #include "kudu/gutil/stl_util.h" |
| #include "kudu/gutil/strings/substitute.h" |
| #include "kudu/util/async_util.h" |
| #include "kudu/util/compression/compression.pb.h" |
| #include "kudu/util/debug/sanitizer_scopes.h" |
| #include "kudu/util/env.h" |
| #include "kudu/util/faststring.h" |
| #include "kudu/util/file_cache.h" |
| #include "kudu/util/metrics.h" |
| #include "kudu/util/random.h" |
| #include "kudu/util/status.h" |
| #include "kudu/util/stopwatch.h" |
| #include "kudu/util/test_macros.h" |
| #include "kudu/util/test_util.h" |
| |
| DEFINE_int32(num_batches, 10000, |
| "Number of batches to write to/read from the Log in TestWriteManyBatches"); |
| |
| DECLARE_int32(log_min_segments_to_retain); |
| DECLARE_int32(log_max_segments_to_retain); |
| DECLARE_double(log_inject_io_error_on_preallocate_fraction); |
| DECLARE_int64(fs_wal_dir_reserved_bytes); |
| DECLARE_int64(disk_reserved_bytes_free_for_testing); |
| DECLARE_string(log_compression_codec); |
| |
| namespace kudu { |
| namespace log { |
| |
| using std::shared_ptr; |
| using std::string; |
| using std::unique_ptr; |
| using std::vector; |
| using consensus::CommitMsg; |
| using consensus::MakeOpId; |
| using consensus::NO_OP; |
| using consensus::OpId; |
| using consensus::ReplicateMsg; |
| using consensus::WRITE_OP; |
| using strings::Substitute; |
| |
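| // One element of a generated log sequence: a REPLICATE or COMMIT entry with |
| // the given OpId, or an instruction to roll over to a new log segment. |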
| struct TestLogSequenceElem { |
| enum ElemType { |
| REPLICATE, |
| COMMIT, |
| ROLL |
| }; |
| ElemType type; |
| OpId id; |
| }; |
| |
| class LogTest : public LogTestBase { |
| public: |
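| // Create a new anchor at 'log_index', register it, and append it to |
| // 'anchors' so the caller can unregister and delete it later. |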
| void CreateAndRegisterNewAnchor(int64_t log_index, vector<LogAnchor*>* anchors) { |
| anchors->push_back(new LogAnchor()); |
| log_anchor_registry_->Register(log_index, CURRENT_TEST_NAME(), anchors->back()); |
| } |
| |
| // Create a series of NO_OP entries in the log. |
| // Anchor each segment on the first OpId of each log segment, |
| // and update op_id to point to the next valid OpId. |
| Status AppendMultiSegmentSequence(int num_total_segments, int num_ops_per_segment, |
| OpId* op_id, vector<LogAnchor*>* anchors) { |
| CHECK(op_id->IsInitialized()); |
| for (int i = 0; i < num_total_segments - 1; i++) { |
| if (anchors) { |
| CreateAndRegisterNewAnchor(op_id->index(), anchors); |
| } |
| RETURN_NOT_OK(AppendNoOps(op_id, num_ops_per_segment)); |
| RETURN_NOT_OK(RollLog()); |
| } |
| |
| if (anchors) { |
| CreateAndRegisterNewAnchor(op_id->index(), anchors); |
| } |
| RETURN_NOT_OK(AppendNoOps(op_id, num_ops_per_segment)); |
| return Status::OK(); |
| } |
| |
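| // Create an empty segment file with the given sequence number and add it to |
| // 'reader', synthesizing a footer that covers replicate indexes |
| // [first_repl_index, first_repl_index + 9]. |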
| Status AppendNewEmptySegmentToReader(int sequence_number, |
| int first_repl_index, |
| LogReader* reader) { |
| string fqp = GetTestPath(strings::Substitute("wal-00000000$0", sequence_number)); |
| shared_ptr<RWFile> log_seg; |
| RETURN_NOT_OK(file_cache_->OpenFile<Env::MUST_CREATE>(fqp, &log_seg)); |
| |
| scoped_refptr<ReadableLogSegment> readable_segment( |
| new ReadableLogSegment(fqp, std::move(log_seg))); |
| |
| LogSegmentHeaderPB header; |
| header.set_sequence_number(sequence_number); |
| header.set_tablet_id(kTestTablet); |
| SchemaToPB(GetSimpleTestSchema(), header.mutable_schema()); |
| |
| LogSegmentFooterPB footer; |
| footer.set_num_entries(10); |
| footer.set_min_replicate_index(first_repl_index); |
| footer.set_max_replicate_index(first_repl_index + 9L); |
| |
| RETURN_NOT_OK(readable_segment->Init(header, footer, 0)); |
| RETURN_NOT_OK(reader->AppendSegment(readable_segment)); |
| return Status::OK(); |
| } |
| |
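| // Generate and append a randomized but plausible sequence of log elements; |
| // see the comments at the definitions below for details. |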
| void GenerateTestSequence(Random* rng, int seq_len, |
| vector<TestLogSequenceElem>* ops, |
| vector<int64_t>* terms_by_index); |
| void AppendTestSequence(const vector<TestLogSequenceElem>& seq); |
| |
| // Where to corrupt the log entry. |
| enum CorruptionPosition { |
| // Corrupt/truncate within the header. |
| IN_HEADER, |
| // Corrupt/truncate within the entry data itself. |
| IN_ENTRY |
| }; |
| |
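| // Append entries to a fresh log, corrupt or truncate the last one as |
| // specified by 'type' and 'place', then re-open the log and verify that |
| // reading yields 'expected_status' and exactly 'expected_entries' entries. |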
| void DoCorruptionTest(CorruptionType type, CorruptionPosition place, |
| const Status& expected_status, int expected_entries); |
| |
| }; |
| |
| // For cases which should run both with and without compression. |
| class LogTestOptionalCompression : public LogTest, |
| public testing::WithParamInterface<CompressionType> { |
| public: |
| LogTestOptionalCompression() { |
| const auto& name = CompressionType_Name(GetParam()); |
| LOG(INFO) << "using compression type: " << name; |
| FLAGS_log_compression_codec = name; |
| } |
| }; |
| INSTANTIATE_TEST_SUITE_P(Codecs, LogTestOptionalCompression, |
| ::testing::Values(NO_COMPRESSION, LZ4)); |
| |
| // If we write more than one entry in a batch, we should be able to |
| // read all of those entries back. |
| TEST_P(LogTestOptionalCompression, TestMultipleEntriesInABatch) { |
| ASSERT_OK(BuildLog()); |
| |
| OpId opid; |
| opid.set_term(1); |
| opid.set_index(1); |
| |
| AppendNoOpsToLogSync(clock_.get(), log_.get(), &opid, 2); |
| |
| // Roll the log over so that the closed segment gets a properly formed footer. |
| ASSERT_OK(log_->AllocateSegmentAndRollOverForTests()); |
| |
| SegmentSequence segments; |
| log_->reader()->GetSegmentsSnapshot(&segments); |
| |
| LogEntries entries; |
| ASSERT_OK(segments[0]->ReadEntries(&entries)); |
| |
| ASSERT_EQ(2, entries.size()); |
| |
| // Verify the index. |
| { |
| LogIndexEntry entry; |
| ASSERT_OK(log_->log_index_->GetEntry(1, &entry)); |
| ASSERT_EQ(1, entry.op_id.term()); |
| ASSERT_EQ(1, entry.segment_sequence_number); |
| int64_t offset = entry.offset_in_segment; |
| |
| ASSERT_OK(log_->log_index_->GetEntry(2, &entry)); |
| ASSERT_EQ(1, entry.op_id.term()); |
| ASSERT_EQ(1, entry.segment_sequence_number); |
| int64_t second_offset = entry.offset_in_segment; |
| |
| // The second entry should be at the same offset as the first entry |
| // since they were written in the same batch. |
| ASSERT_EQ(second_offset, offset); |
| } |
| |
| // Test LookupOpId |
| { |
| OpId loaded_op; |
| ASSERT_OK(log_->reader()->LookupOpId(1, &loaded_op)); |
| ASSERT_EQ("1.1", OpIdToString(loaded_op)); |
| ASSERT_OK(log_->reader()->LookupOpId(2, &loaded_op)); |
| ASSERT_EQ("1.2", OpIdToString(loaded_op)); |
| Status s = log_->reader()->LookupOpId(3, &loaded_op); |
| ASSERT_TRUE(s.IsNotFound()) << "unexpected status: " << s.ToString(); |
| } |
| |
| ASSERT_OK(log_->Close()); |
| } |
| |
| // Tests that everything works properly with fsync enabled. |
| // This also tests SyncDir() (see KUDU-261), which is called whenever |
| // a new log segment is initialized. |
| TEST_P(LogTestOptionalCompression, TestFsync) { |
| options_.force_fsync_all = true; |
| ASSERT_OK(BuildLog()); |
| |
| OpId opid; |
| opid.set_term(0); |
| opid.set_index(1); |
| |
| AppendNoOp(&opid); |
| |
| ASSERT_OK(log_->Close()); |
| } |
| |
| // Regression test for part of KUDU-735: |
| // if a log is not preallocated, we should properly track its on-disk size as we append to |
| // it. |
| TEST_P(LogTestOptionalCompression, TestSizeIsMaintained) { |
| options_.preallocate_segments = false; |
| ASSERT_OK(BuildLog()); |
| |
| OpId opid = MakeOpId(0, 1); |
| AppendNoOp(&opid); |
| |
| SegmentSequence segments; |
| log_->reader()->GetSegmentsSnapshot(&segments); |
| int64_t orig_size = segments[0]->file_size(); |
| ASSERT_GT(orig_size, 0); |
| |
| AppendNoOp(&opid); |
| |
| log_->reader()->GetSegmentsSnapshot(&segments); |
| int64_t new_size = segments[0]->file_size(); |
| ASSERT_GT(new_size, orig_size); |
| |
| ASSERT_OK(log_->Close()); |
| } |
| |
| // Test that the reader can read from the log even if it hasn't been |
| // properly closed. |
| TEST_P(LogTestOptionalCompression, TestLogNotTrimmed) { |
| ASSERT_OK(BuildLog()); |
| |
| OpId opid; |
| opid.set_term(0); |
| opid.set_index(1); |
| |
| AppendNoOp(&opid); |
| |
| SegmentSequence segments; |
| log_->reader()->GetSegmentsSnapshot(&segments); |
| |
| LogEntries entries; |
| ASSERT_OK(segments[0]->ReadEntries(&entries)); |
| // Close after testing to ensure correct shutdown |
| // TODO(unknown): put this in TearDown() with a test on log state? |
| ASSERT_OK(log_->Close()); |
| } |
| |
| // Test that the reader will not fail if a log file is completely blank. |
| // This happens when it's opened but nothing has been written. |
| // The reader should gracefully handle this situation. See KUDU-140. |
| TEST_P(LogTestOptionalCompression, TestBlankLogFile) { |
| ASSERT_OK(BuildLog()); |
| |
| // The log's reader will have a segment... |
| ASSERT_EQ(log_->reader()->num_segments(), 1); |
| |
| // ...and we're able to read from it. |
| SegmentSequence segments; |
| log_->reader()->GetSegmentsSnapshot(&segments); |
| |
| LogEntries entries; |
| ASSERT_OK(segments[0]->ReadEntries(&entries)); |
| |
| // ...It's just that it's empty. |
| ASSERT_TRUE(entries.empty()); |
| } |
| |
| void LogTest::DoCorruptionTest(CorruptionType type, CorruptionPosition place, |
| const Status& expected_status, int expected_entries) { |
| const int kNumEntries = 4; |
| ASSERT_OK(BuildLog()); |
| OpId op_id = MakeOpId(1, 1); |
| ASSERT_OK(AppendNoOps(&op_id, kNumEntries)); |
| |
| // Find the entry that we want to corrupt and get the active segment path |
| // before closing the log; both will be invalid after. |
| LogIndexEntry entry; |
| ASSERT_OK(log_->log_index_->GetEntry(4, &entry)); |
| string active_segment_path = log_->ActiveSegmentPathForTests(); |
| |
| ASSERT_OK(log_->Close()); |
| |
| // Corrupt the log as specified. |
| int offset; |
| switch (place) { |
| case IN_HEADER: |
| offset = entry.offset_in_segment + 1; |
| break; |
| case IN_ENTRY: |
| offset = entry.offset_in_segment + kEntryHeaderSizeV2 + 1; |
| break; |
| default: |
| LOG(FATAL) << "unreachable"; |
| } |
| ASSERT_OK(CorruptLogFile(env_, active_segment_path, type, offset)); |
| |
| // Open a new reader -- we don't reuse the existing LogReader from log_ |
| // because it has a cached header. |
| shared_ptr<LogReader> reader; |
| ASSERT_OK(LogReader::Open(fs_manager_.get(), |
| make_scoped_refptr(new LogIndex(env_, |
| file_cache_.get(), |
| log_->log_dir_)), |
| kTestTablet, |
| metric_entity_tablet_, |
| file_cache_.get(), |
| &reader)); |
| ASSERT_EQ(1, reader->num_segments()); |
| |
| SegmentSequence segments; |
| reader->GetSegmentsSnapshot(&segments); |
| Status s = segments[0]->ReadEntries(&entries_); |
| ASSERT_EQ(s.CodeAsString(), expected_status.CodeAsString()) |
| << "Got unexpected status: " << s.ToString(); |
| |
| // Last entry is ignored, but we should still see the previous ones. |
| ASSERT_EQ(expected_entries, entries_.size()); |
| } |
| |
| // Tests that the log reader reads up until some truncated entry is found. |
| // It should still return OK, since on a crash, it's acceptable to have |
| // a partial entry at EOF. |
| TEST_P(LogTestOptionalCompression, TestTruncateLogInEntry) { |
| DoCorruptionTest(TRUNCATE_FILE, IN_ENTRY, Status::OK(), 3); |
| } |
| |
| // Same, but truncate in the middle of the header of that entry. |
| TEST_P(LogTestOptionalCompression, TestTruncateLogInHeader) { |
| DoCorruptionTest(TRUNCATE_FILE, IN_HEADER, Status::OK(), 3); |
| } |
| |
| // Similar to the above, except flips a byte. In this case, it should return |
| // a Corruption instead of an OK, because we still have a valid footer in |
| // the file (indicating that all of the entries should be valid as well). |
| TEST_P(LogTestOptionalCompression, TestCorruptLogInEntry) { |
| DoCorruptionTest(FLIP_BYTE, IN_ENTRY, Status::Corruption(""), 3); |
| } |
| |
| // Same, but corrupt in the middle of the header of that entry. |
| TEST_P(LogTestOptionalCompression, TestCorruptLogInHeader) { |
| DoCorruptionTest(FLIP_BYTE, IN_HEADER, Status::Corruption(""), 3); |
| } |
| |
| // Tests that segments roll over when max segment size is reached |
| // and that the player plays all entries in the correct order. |
| TEST_P(LogTestOptionalCompression, TestSegmentRollover) { |
| ASSERT_OK(BuildLog()); |
| // Set a small segment size so that we have roll overs. |
| log_->SetMaxSegmentSizeForTests(990); |
| const int kNumEntriesPerBatch = 100; |
| |
| OpId op_id = MakeOpId(1, 1); |
| int num_entries = 0; |
| |
| SegmentSequence segments; |
| log_->reader()->GetSegmentsSnapshot(&segments); |
| |
| while (segments.size() < 3) { |
| ASSERT_OK(AppendNoOps(&op_id, kNumEntriesPerBatch)); |
| num_entries += kNumEntriesPerBatch; |
| // Update the segments |
| log_->reader()->GetSegmentsSnapshot(&segments); |
| } |
| |
| ASSERT_FALSE(segments.back()->HasFooter()); |
| ASSERT_OK(log_->Close()); |
| |
| shared_ptr<LogReader> reader; |
| ASSERT_OK(LogReader::Open(fs_manager_.get(), |
| /*index*/nullptr, |
| kTestTablet, |
| metric_entity_tablet_, |
| file_cache_.get(), |
| &reader)); |
| reader->GetSegmentsSnapshot(&segments); |
| |
| ASSERT_TRUE(segments.back()->HasFooter()); |
| |
| for (const scoped_refptr<ReadableLogSegment>& entry : segments) { |
| Status s = entry->ReadEntries(&entries_); |
| if (!s.ok()) { |
| FAIL() << "Failed to read entries in segment: " << entry->path() |
| << ". Status: " << s.ToString() |
| << ".\nSegments: " << DumpSegmentsToString(segments); |
| } |
| } |
| |
| ASSERT_EQ(num_entries, entries_.size()); |
| } |
| |
| TEST_F(LogTest, TestWriteAndReadToAndFromInProgressSegment) { |
| FLAGS_log_compression_codec = "no_compression"; |
| |
| const int kNumEntries = 4; |
| ASSERT_OK(BuildLog()); |
| |
| SegmentSequence segments; |
| log_->reader()->GetSegmentsSnapshot(&segments); |
| ASSERT_EQ(segments.size(), 1); |
| scoped_refptr<ReadableLogSegment> readable_segment = segments[0]; |
| |
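| // Nothing has been appended yet, so the active segment's written offset is |
| // just the size of the segment header. |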
| int header_size = log_->segment_allocator_.active_segment_->written_offset(); |
| ASSERT_GT(header_size, 0); |
| readable_segment->UpdateReadableToOffset(header_size); |
| |
| // Reading the readable segment now should return OK but yield no |
| // entries. |
| LogEntries entries; |
| ASSERT_OK(readable_segment->ReadEntries(&entries)); |
| ASSERT_TRUE(entries.empty()); |
| |
| // Dummy add_entry to help us estimate the size of what |
| // gets written to disk. |
| LogEntryBatchPB batch; |
| OpId op_id = MakeOpId(1, 1); |
| LogEntryPB* log_entry = batch.add_entry(); |
| log_entry->set_type(REPLICATE); |
| ReplicateMsg* repl = log_entry->mutable_replicate(); |
| repl->mutable_id()->CopyFrom(op_id); |
| repl->set_op_type(NO_OP); |
| repl->set_timestamp(0L); |
| |
| // Entries are prefixed with a header. |
| int64_t single_entry_size = batch.ByteSizeLong() + kEntryHeaderSizeV2; |
| |
| size_t written_entries_size = header_size; |
| ASSERT_OK(AppendNoOps(&op_id, kNumEntries, &written_entries_size)); |
| ASSERT_EQ(single_entry_size * kNumEntries + header_size, written_entries_size); |
| ASSERT_EQ(written_entries_size, log_->segment_allocator_.active_segment_->written_offset()); |
| |
| // Updating the readable segment with the offset of the first entry should |
| // make it read a single entry even though there are several in the log. |
| readable_segment->UpdateReadableToOffset(header_size + single_entry_size); |
| entries.clear(); |
| ASSERT_OK(readable_segment->ReadEntries(&entries)); |
| ASSERT_EQ(1, entries.size()); |
| |
| // Now append another entry so that the Log sets the correct readable offset |
| // on the reader. |
| ASSERT_OK(AppendNoOps(&op_id, 1, &written_entries_size)); |
| |
| // Now the reader should be able to read all 5 entries. |
| entries.clear(); |
| ASSERT_OK(readable_segment->ReadEntries(&entries)); |
| ASSERT_EQ(5, entries.size()); |
| |
| // Offset should get updated for an additional entry. |
| ASSERT_EQ(single_entry_size * (kNumEntries + 1) + header_size, |
| written_entries_size); |
| ASSERT_EQ(written_entries_size, log_->segment_allocator_.active_segment_->written_offset()); |
| |
| // When we roll it should go back to the header size. |
| ASSERT_OK(log_->AllocateSegmentAndRollOverForTests()); |
| ASSERT_EQ(header_size, log_->segment_allocator_.active_segment_->written_offset()); |
| written_entries_size = header_size; |
| |
| // Now that we've closed the original segment, getting a segment from the |
| // reader again should return one with a footer, and we should be able to |
| // read all entries. |
| log_->reader()->GetSegmentsSnapshot(&segments); |
| ASSERT_EQ(2, segments.size()); |
| readable_segment = segments[0]; |
| entries.clear(); |
| ASSERT_OK(readable_segment->ReadEntries(&entries)); |
| ASSERT_EQ(5, entries.size()); |
| |
| // Offset should get updated for an additional entry, again. |
| ASSERT_OK(AppendNoOp(&op_id, &written_entries_size)); |
| ASSERT_EQ(single_entry_size + header_size, written_entries_size); |
| ASSERT_EQ(written_entries_size, log_->segment_allocator_.active_segment_->written_offset()); |
| } |
| |
| // Tests that segments can be GC'd while the log is running. |
| TEST_P(LogTestOptionalCompression, TestGCWithLogRunning) { |
| FLAGS_log_min_segments_to_retain = 2; |
| ASSERT_OK(BuildLog()); |
| |
| vector<LogAnchor*> anchors; |
| ElementDeleter deleter(&anchors); |
| |
| SegmentSequence segments; |
| |
| const int kNumTotalSegments = 4; |
| const int kNumOpsPerSegment = 5; |
| int num_gced_segments; |
| OpId op_id = MakeOpId(1, 1); |
| |
| ASSERT_OK(AppendMultiSegmentSequence(kNumTotalSegments, kNumOpsPerSegment, |
| &op_id, &anchors)); |
| |
| // We should get 4 anchors, each pointing at the beginning of a new segment |
| ASSERT_EQ(anchors.size(), 4); |
| |
| // Anchors should prevent GC. |
| log_->reader()->GetSegmentsSnapshot(&segments); |
| ASSERT_EQ(4, segments.size()) << DumpSegmentsToString(segments); |
| RetentionIndexes retention; |
| ASSERT_OK(log_anchor_registry_->GetEarliestRegisteredLogIndex(&retention.for_durability)); |
| ASSERT_OK(log_->GC(retention, &num_gced_segments)); |
| ASSERT_EQ(0, num_gced_segments); |
| log_->reader()->GetSegmentsSnapshot(&segments); |
| ASSERT_EQ(4, segments.size()) << DumpSegmentsToString(segments); |
| |
| // Logs should be retained for durability even if this puts it above the |
| // maximum configured number of segments. |
| { |
| google::FlagSaver saver; |
| FLAGS_log_min_segments_to_retain = 1; |
| FLAGS_log_max_segments_to_retain = 1; |
| ASSERT_OK(log_->GC(retention, &num_gced_segments)); |
| ASSERT_EQ(0, num_gced_segments); |
| } |
| |
| // Freeing the first 2 anchors should allow GC of them. |
| ASSERT_OK(log_anchor_registry_->Unregister(anchors[0])); |
| ASSERT_OK(log_anchor_registry_->Unregister(anchors[1])); |
| ASSERT_OK(log_anchor_registry_->GetEarliestRegisteredLogIndex(&retention.for_durability)); |
| // We should now be anchored on op 1.11, i.e. on the third segment |
| ASSERT_EQ(anchors[2]->log_index, retention.for_durability); |
| |
| // However, first, we'll try bumping the min retention threshold and |
| // verify that we don't GC any. |
| { |
| google::FlagSaver saver; |
| FLAGS_log_min_segments_to_retain = 10; |
| ASSERT_OK(log_->GC(retention, &num_gced_segments)); |
| ASSERT_EQ(0, num_gced_segments); |
| } |
| |
| // Try again without the modified flag. |
| ASSERT_OK(log_->GC(retention, &num_gced_segments)); |
| ASSERT_EQ(2, num_gced_segments) << DumpSegmentsToString(segments); |
| log_->reader()->GetSegmentsSnapshot(&segments); |
| ASSERT_EQ(2, segments.size()) << DumpSegmentsToString(segments); |
| |
| // Release the remaining "rolled segment" anchor. GC will not delete the |
| // last rolled segment. |
| ASSERT_OK(log_anchor_registry_->Unregister(anchors[2])); |
| ASSERT_OK(log_anchor_registry_->GetEarliestRegisteredLogIndex(&retention.for_durability)); |
| ASSERT_OK(log_->GC(retention, &num_gced_segments)); |
| ASSERT_EQ(0, num_gced_segments) << DumpSegmentsToString(segments); |
| log_->reader()->GetSegmentsSnapshot(&segments); |
| ASSERT_EQ(2, segments.size()) << DumpSegmentsToString(segments); |
| |
| // Check that we get a NotFound if we try to read before the GCed point. |
| { |
| vector<ReplicateMsg*> repls; |
| ElementDeleter d(&repls); |
| Status s = log_->reader()->ReadReplicatesInRange( |
| 1, 2, LogReader::kNoSizeLimit, &repls); |
| ASSERT_TRUE(s.IsNotFound()) << s.ToString(); |
| } |
| |
| ASSERT_OK(log_->Close()); |
| NO_FATALS(CheckRightNumberOfSegmentFiles(2)); |
| |
| // We skip the first three, since we unregistered them above. |
| for (int i = 3; i < kNumTotalSegments; i++) { |
| ASSERT_OK(log_anchor_registry_->Unregister(anchors[i])); |
| } |
| } |
| |
| // Test that, when we are set to retain a given number of log segments, |
| // we also retain any relevant log index chunks, even if those operations |
| // are not necessary for recovery. |
| TEST_P(LogTestOptionalCompression, TestGCOfIndexChunks) { |
| FLAGS_log_min_segments_to_retain = 4; |
| ASSERT_OK(BuildLog()); |
| |
| // Append some segments which cross from one index chunk into another. |
| // 999990-999994 \___ the first index |
| // 999995-999999 / chunk points to these |
| // 1000000-1000004 \_ |
| // 1000005-1000009 _|- the second index chunk points to these |
| // 1000010-<still open> / |
| const int kNumTotalSegments = 5; |
| const int kNumOpsPerSegment = 5; |
| OpId op_id = MakeOpId(1, 999990); |
| ASSERT_OK(AppendMultiSegmentSequence(kNumTotalSegments, kNumOpsPerSegment, |
| &op_id, nullptr)); |
| |
| // Run a GC on an op in the second index chunk. We should remove only the |
| // earliest segment, because we are set to retain 4. |
| int num_gced_segments = 0; |
| ASSERT_OK(log_->GC(RetentionIndexes(1000006, 1000006), &num_gced_segments)); |
| ASSERT_EQ(1, num_gced_segments); |
| |
| // And we should still be able to read ops in the retained segment, even though |
| // the GC index was higher. |
| OpId loaded_op; |
| ASSERT_OK(log_->reader()->LookupOpId(999995, &loaded_op)); |
| ASSERT_EQ("1.999995", OpIdToString(loaded_op)); |
| |
| // If we drop the retention count down to 1, we can now GC, and the log index |
| // chunk should also be GCed. |
| FLAGS_log_min_segments_to_retain = 1; |
| ASSERT_OK(log_->GC(RetentionIndexes(1000003, 1000003), &num_gced_segments)); |
| ASSERT_EQ(1, num_gced_segments); |
| |
| Status s = log_->reader()->LookupOpId(999995, &loaded_op); |
| ASSERT_TRUE(s.IsNotFound()) << "unexpected status: " << s.ToString(); |
| } |
| |
| // Tests that we can append FLUSH_MARKER messages to the log queue to make sure |
| // all messages up to a certain point were fsync()ed without actually |
| // writing them to the log. |
| TEST_P(LogTestOptionalCompression, TestWaitUntilAllFlushed) { |
| ASSERT_OK(BuildLog()); |
| // Append 2 replicate/commit pairs asynchronously |
| ASSERT_OK(AppendReplicateBatchAndCommitEntryPairsToLog(2, APPEND_ASYNC)); |
| |
| ASSERT_OK(log_->WaitUntilAllFlushed()); |
| |
| // Make sure we only get 4 entries back and that no FLUSH_MARKER commit is found. |
| vector<scoped_refptr<ReadableLogSegment>> segments; |
| log_->reader()->GetSegmentsSnapshot(&segments); |
| |
| ASSERT_OK(segments[0]->ReadEntries(&entries_)); |
| ASSERT_EQ(4, entries_.size()); |
| for (int i = 0; i < 4 ; i++) { |
| if (i % 2 == 0) { |
| ASSERT_TRUE(entries_[i]->has_replicate()); |
| } else { |
| ASSERT_TRUE(entries_[i]->has_commit()); |
| ASSERT_EQ(WRITE_OP, entries_[i]->commit().op_type()); |
| } |
| } |
| } |
| |
| // Tests log reopening and that GC'ing the old log's segments works. |
| TEST_P(LogTestOptionalCompression, TestLogReopenAndGC) { |
| ASSERT_OK(BuildLog()); |
| |
| SegmentSequence segments; |
| |
| vector<LogAnchor*> anchors; |
| ElementDeleter deleter(&anchors); |
| |
| const int kNumTotalSegments = 3; |
| const int kNumOpsPerSegment = 5; |
| int num_gced_segments; |
| OpId op_id = MakeOpId(1, 1); |
| ASSERT_OK(AppendMultiSegmentSequence(kNumTotalSegments, kNumOpsPerSegment, |
| &op_id, &anchors)); |
| // Anchors should prevent GC. |
| log_->reader()->GetSegmentsSnapshot(&segments); |
| ASSERT_EQ(3, segments.size()); |
| RetentionIndexes retention; |
| ASSERT_OK(log_anchor_registry_->GetEarliestRegisteredLogIndex(&retention.for_durability)); |
| ASSERT_OK(log_->GC(retention, &num_gced_segments)); |
| log_->reader()->GetSegmentsSnapshot(&segments); |
| ASSERT_EQ(3, segments.size()); |
| |
| ASSERT_OK(log_->Close()); |
| |
| // Now reopen the log as if we had replayed the state into the in-memory |
| // stores, and do GC. |
| ASSERT_OK(BuildLog()); |
| |
| // The "old" data consists of 3 segments. We still hold anchors. |
| log_->reader()->GetSegmentsSnapshot(&segments); |
| ASSERT_EQ(4, segments.size()); |
| |
| // Write to a new log segment, as if we had taken new requests and the |
| // mem stores are holding anchors, but don't roll it. |
| CreateAndRegisterNewAnchor(op_id.index(), &anchors); |
| ASSERT_OK(AppendNoOps(&op_id, kNumOpsPerSegment)); |
| |
| // Now release the "old" anchors and GC them. |
| for (int i = 0; i < 3; i++) { |
| ASSERT_OK(log_anchor_registry_->Unregister(anchors[i])); |
| } |
| ASSERT_OK(log_anchor_registry_->GetEarliestRegisteredLogIndex(&retention.for_durability)); |
| |
| // If we set the 'for_peers' index to indicate that these log |
| // segments are needed for catchup, that will prevent GC, |
| // even though they're no longer necessary for durability. |
| retention.for_peers = 0; |
| ASSERT_OK(log_->GC(retention, &num_gced_segments)); |
| ASSERT_EQ(0, num_gced_segments); |
| NO_FATALS(CheckRightNumberOfSegmentFiles(4)); |
| |
| // Set the max segments to retain so that, even though we have peers who need |
| // the segments, we'll GC them. |
| FLAGS_log_max_segments_to_retain = 2; |
| ASSERT_OK(log_->GC(retention, &num_gced_segments)); |
| ASSERT_EQ(2, num_gced_segments); |
| |
| // After GC there should be only one left, besides the one currently being |
| // written to. That is because min_segments_to_retain defaults to 2. |
| log_->reader()->GetSegmentsSnapshot(&segments); |
| ASSERT_EQ(2, segments.size()) << DumpSegmentsToString(segments); |
| ASSERT_OK(log_->Close()); |
| |
| NO_FATALS(CheckRightNumberOfSegmentFiles(2)); |
| |
| // Unregister the final anchor. |
| ASSERT_OK(log_anchor_registry_->Unregister(anchors[3])); |
| } |
| |
| // Helper to measure the performance of the log. |
| TEST_P(LogTestOptionalCompression, TestWriteManyBatches) { |
| uint64_t num_batches = 10; |
| if (AllowSlowTests()) { |
| num_batches = FLAGS_num_batches; |
| } |
| ASSERT_OK(BuildLog()); |
| |
| LOG(INFO)<< "Starting to write " << num_batches << " to log"; |
| LOG_TIMING(INFO, "Wrote all batches to log") { |
| ASSERT_OK(AppendReplicateBatchAndCommitEntryPairsToLog(num_batches)); |
| } |
| ASSERT_OK(log_->Close()); |
| LOG(INFO) << "Done writing"; |
| |
| LOG_TIMING(INFO, "Read all entries from Log") { |
| LOG(INFO) << "Starting to read log"; |
| uint32_t num_entries = 0; |
| |
| vector<scoped_refptr<ReadableLogSegment>> segments; |
| |
| shared_ptr<LogReader> reader; |
| ASSERT_OK(LogReader::Open(fs_manager_.get(), |
| /*index*/nullptr, |
| kTestTablet, |
| metric_entity_tablet_, |
| file_cache_.get(), |
| &reader)); |
| reader->GetSegmentsSnapshot(&segments); |
| |
| for (const scoped_refptr<ReadableLogSegment>& entry : segments) { |
| entries_.clear(); |
| ASSERT_OK(entry->ReadEntries(&entries_)); |
| num_entries += entries_.size(); |
| } |
| ASSERT_EQ(num_entries, num_batches * 2); |
| LOG(INFO) << "End readfile"; |
| } |
| } |
| |
| // This tests that querying LogReader works. |
| // This sets up a reader with some segments to query which amount to the |
| // following: |
| // seg002: 0.10 through 0.19 |
| // seg003: 0.20 through 0.29 |
| // seg004: 0.30 through 0.39 |
| TEST_P(LogTestOptionalCompression, TestLogReader) { |
| LogReader reader(env_, |
| /*index*/nullptr, |
| kTestTablet, |
| metric_entity_tablet_, |
| file_cache_.get()); |
| reader.InitEmptyReaderForTests(); |
| ASSERT_OK(AppendNewEmptySegmentToReader(2, 10, &reader)); |
| ASSERT_OK(AppendNewEmptySegmentToReader(3, 20, &reader)); |
| ASSERT_OK(AppendNewEmptySegmentToReader(4, 30, &reader)); |
| |
| OpId op; |
| op.set_term(0); |
| SegmentSequence segments; |
| |
| // Queries for specific segment sequence numbers. |
| scoped_refptr<ReadableLogSegment> segment = reader.GetSegmentBySequenceNumber(2); |
| ASSERT_EQ(2, segment->header().sequence_number()); |
| segment = reader.GetSegmentBySequenceNumber(3); |
| ASSERT_EQ(3, segment->header().sequence_number()); |
| |
| segment = reader.GetSegmentBySequenceNumber(4); |
| ASSERT_EQ(4, segment->header().sequence_number()); |
| |
| segment = reader.GetSegmentBySequenceNumber(5); |
| ASSERT_TRUE(segment.get() == nullptr); |
| } |
| |
| // Test that, even if the LogReader's index is empty because no segments |
| // have been properly closed, we can still read the entries as the reader |
| // returns the current segment. |
| TEST_P(LogTestOptionalCompression, TestLogReaderReturnsLatestSegmentIfIndexEmpty) { |
| ASSERT_OK(BuildLog()); |
| |
| OpId opid = MakeOpId(1, 1); |
| ASSERT_OK(AppendCommit(opid, APPEND_ASYNC)); |
| ASSERT_OK(AppendReplicateBatch(opid, APPEND_SYNC)); |
| |
| SegmentSequence segments; |
| log_->reader()->GetSegmentsSnapshot(&segments); |
| ASSERT_EQ(segments.size(), 1); |
| |
| LogEntries entries; |
| ASSERT_OK(segments[0]->ReadEntries(&entries)); |
| ASSERT_EQ(2, entries.size()); |
| } |
| |
| TEST_F(LogTest, TestOpIdUtils) { |
| OpId id = MakeOpId(1, 2); |
| ASSERT_EQ("1.2", consensus::OpIdToString(id)); |
| ASSERT_EQ(1, id.term()); |
| ASSERT_EQ(2, id.index()); |
| } |
| |
| std::ostream& operator<<(std::ostream& os, const TestLogSequenceElem& elem) { |
| switch (elem.type) { |
| case TestLogSequenceElem::ROLL: |
| os << "ROLL"; |
| break; |
| case TestLogSequenceElem::REPLICATE: |
| os << "R" << elem.id; |
| break; |
| case TestLogSequenceElem::COMMIT: |
| os << "C" << elem.id; |
| break; |
| } |
| return os; |
| } |
| |
| // Generates a plausible sequence of items in the log, including term changes, moving the |
| // index backwards, log rolls, etc. |
| // |
| // NOTE: this log sequence may contain some aberrations which would not occur in a real |
| // consensus log, but our API supports them. In the future we may want to add assertions |
| // to the Log implementation that prevent such aberrations, in which case we'd need to |
| // modify this. |
| void LogTest::GenerateTestSequence(Random* rng, int seq_len, |
| vector<TestLogSequenceElem>* ops, |
| vector<int64_t>* terms_by_index) { |
| terms_by_index->assign(seq_len + 1L, -1L); |
| int64_t committed_index = 0; |
| int64_t max_repl_index = 0; |
| |
| OpId id = MakeOpId(1, 0); |
| for (int i = 0; i < seq_len; i++) { |
| if (rng->OneIn(5)) { |
| // Reset the term: it may stay the same or move up or down by up to 2 |
| // (Uniform(5) yields 0..4), but it never drops below 1. |
| id.set_term(std::max(static_cast<int64_t>(1), id.term() + rng->Uniform(5) - 2)); |
| } |
| |
| // Advance index by exactly one |
| id.set_index(id.index() + 1); |
| |
| if (rng->OneIn(5)) { |
| // Move index backward a bit, but not past the committed index |
| id.set_index(std::max(committed_index + 1, id.index() - rng->Uniform(5))); |
| } |
| |
| // Roll the log sometimes |
| if (i != 0 && rng->OneIn(15)) { |
| TestLogSequenceElem op; |
| op.type = TestLogSequenceElem::ROLL; |
| ops->push_back(op); |
| } |
| |
| TestLogSequenceElem op; |
| op.type = TestLogSequenceElem::REPLICATE; |
| op.id = id; |
| ops->push_back(op); |
| (*terms_by_index)[id.index()] = id.term(); |
| max_repl_index = std::max(max_repl_index, id.index()); |
| |
| // Advance the commit index sometimes |
| if (rng->OneIn(5)) { |
| while (committed_index < id.index()) { |
| committed_index++; |
| TestLogSequenceElem op; |
| op.type = TestLogSequenceElem::COMMIT; |
| op.id = MakeOpId((*terms_by_index)[committed_index], committed_index); |
| ops->push_back(op); |
| } |
| } |
| } |
| terms_by_index->resize(max_repl_index + 1); |
| } |
| |
| void LogTest::AppendTestSequence(const vector<TestLogSequenceElem>& seq) { |
| for (const TestLogSequenceElem& e : seq) { |
| VLOG(1) << "Appending: " << e; |
| switch (e.type) { |
| case TestLogSequenceElem::REPLICATE: |
| { |
| OpId id(e.id); |
| ASSERT_OK(AppendNoOp(&id)); |
| break; |
| } |
| case TestLogSequenceElem::COMMIT: |
| { |
| CommitMsg commit; |
| commit.set_op_type(NO_OP); |
| commit.mutable_commited_op_id()->CopyFrom(e.id); |
| Synchronizer s; |
| ASSERT_OK(log_->AsyncAppendCommit(commit, s.AsStatusCallback())); |
| ASSERT_OK(s.Wait()); |
| break; |
| } |
| case TestLogSequenceElem::ROLL: |
| { |
| ASSERT_OK(RollLog()); |
| } |
| } |
| } |
| } |
| |
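| // Return a uniformly random integer in the inclusive range |
| // [min_inclusive, max_inclusive]. |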
| static int RandInRange(Random* r, int min_inclusive, int max_inclusive) { |
| int width = max_inclusive - min_inclusive + 1; |
| return min_inclusive + r->Uniform(width); |
| } |
| |
| // Test that if multiple REPLICATE entries are written for the same index, |
| // that we read the latest one. |
| // |
| // This is a randomized test: we generate a plausible sequence of log messages, |
| // write it out, and then read random ranges of log indexes, making sure we |
| // always see the correct term for each REPLICATE message (i.e. whichever term |
| // was the last to append it). |
| TEST_P(LogTestOptionalCompression, TestReadLogWithReplacedReplicates) { |
| const int kSequenceLength = AllowSlowTests() ? 1000 : 50; |
| |
| Random rng(SeedRandom()); |
| vector<int64_t> terms_by_index; |
| vector<TestLogSequenceElem> seq; |
| GenerateTestSequence(&rng, kSequenceLength, &seq, &terms_by_index); |
| LOG(INFO) << "test sequence: " << seq; |
| const int64_t max_repl_index = terms_by_index.size() - 1; |
| LOG(INFO) << "max_repl_index: " << max_repl_index; |
| |
| // Write the test sequence to the log. |
| // TODO: consider batching multiple replicates here. |
| ASSERT_OK(BuildLog()); |
| AppendTestSequence(seq); |
| |
| const int kNumRandomReads = 100; |
| |
| // We'll advance 'gc_index' randomly through the log until we've gotten to |
| // the end. This ensures that, when we GC, we don't ever remove the latest |
| // version of a replicate message unintentionally. |
| shared_ptr<LogReader> reader = log_->reader(); |
| for (int gc_index = 1; gc_index < max_repl_index;) { |
| SCOPED_TRACE(Substitute("after GCing $0", gc_index)); |
| |
| // Test reading random ranges of indexes and verifying that we get back the |
| // REPLICATE messages with the correct terms |
| for (int random_read = 0; random_read < kNumRandomReads; random_read++) { |
| int start_index = RandInRange(&rng, gc_index, max_repl_index - 1); |
| int end_index = RandInRange(&rng, start_index, max_repl_index); |
| { |
| SCOPED_TRACE(Substitute("Reading $0-$1", start_index, end_index)); |
| vector<ReplicateMsg*> repls; |
| ElementDeleter d(&repls); |
| ASSERT_OK(log_->reader()->ReadReplicatesInRange( |
| start_index, end_index, LogReader::kNoSizeLimit, &repls)); |
| ASSERT_EQ(end_index - start_index + 1, repls.size()); |
| int expected_index = start_index; |
| for (const ReplicateMsg* repl : repls) { |
| ASSERT_EQ(expected_index, repl->id().index()); |
| ASSERT_EQ(terms_by_index[expected_index], repl->id().term()); |
| expected_index++; |
| } |
| } |
| |
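| // Snapshot the reader metrics so we can verify below that the size-limited |
| // read advances them further. |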
| int64_t bytes_read = reader->bytes_read_->value(); |
| int64_t entries_read = reader->entries_read_->value(); |
| int64_t read_batch_count = reader->read_batch_latency_->TotalCount(); |
| EXPECT_GT(reader->bytes_read_->value(), 0); |
| EXPECT_GT(reader->entries_read_->value(), 0); |
| EXPECT_GT(reader->read_batch_latency_->TotalCount(), 0); |
| |
| // Test a size-limited read. |
| int size_limit = RandInRange(&rng, 1, 1000); |
| { |
| SCOPED_TRACE(Substitute("Reading $0-$1 with size limit $2", |
| start_index, end_index, size_limit)); |
| vector<ReplicateMsg*> repls; |
| ElementDeleter d(&repls); |
| ASSERT_OK(reader->ReadReplicatesInRange(start_index, end_index, size_limit, &repls)); |
| ASSERT_LE(repls.size(), end_index - start_index + 1); |
| size_t total_size = 0; |
| int expected_index = start_index; |
| for (const ReplicateMsg* repl : repls) { |
| ASSERT_EQ(expected_index, repl->id().index()); |
| ASSERT_EQ(terms_by_index[expected_index], repl->id().term()); |
| expected_index++; |
| total_size += repl->SpaceUsedLong(); |
| } |
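| // A size-limited read always returns at least one message, even if that |
| // single message exceeds the limit; otherwise the returned messages must |
| // fit within the limit. |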
| if (total_size > size_limit) { |
| ASSERT_EQ(1, repls.size()); |
| } else { |
| ASSERT_LE(total_size, size_limit); |
| } |
| } |
| |
| EXPECT_GT(reader->bytes_read_->value(), bytes_read); |
| EXPECT_GT(reader->entries_read_->value(), entries_read); |
| EXPECT_GT(reader->read_batch_latency_->TotalCount(), read_batch_count); |
| } |
| |
| int num_gced = 0; |
| ASSERT_OK(log_->GC(RetentionIndexes(gc_index), &num_gced)); |
| gc_index += rng.Uniform(10); |
| } |
| } |
| |
| // Ensure that we can read replicate messages from the LogReader with a very |
| // high (> 32 bit) log index and term. Regression test for KUDU-1933. |
| TEST_P(LogTestOptionalCompression, TestReadReplicatesHighIndex) { |
| const int64_t first_log_index = std::numeric_limits<int32_t>::max() - 3; |
| const int kSequenceLength = 10; |
| |
| ASSERT_OK(BuildLog()); |
| OpId op_id; |
| op_id.set_term(first_log_index); |
| op_id.set_index(first_log_index); |
| ASSERT_OK(AppendNoOps(&op_id, kSequenceLength)); |
| |
| shared_ptr<LogReader> reader = log_->reader(); |
| vector<ReplicateMsg*> replicates; |
| ElementDeleter deleter(&replicates); |
| ASSERT_OK(reader->ReadReplicatesInRange(first_log_index, first_log_index + kSequenceLength - 1, |
| LogReader::kNoSizeLimit, &replicates)); |
| ASSERT_EQ(kSequenceLength, replicates.size()); |
| ASSERT_GT(op_id.index(), std::numeric_limits<int32_t>::max()); |
| } |
| |
| // Test various situations where we expect different segments depending on what the |
| // min log index is. |
| TEST_F(LogTest, TestGetGCableDataSize) { |
| FLAGS_log_compression_codec = "no_compression"; |
| FLAGS_log_min_segments_to_retain = 2; |
| ASSERT_OK(BuildLog()); |
| |
| const int kNumTotalSegments = 5; |
| const int kNumOpsPerSegment = 5; |
| const int kSegmentSizeBytes = 331; |
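| // Expected on-disk size of each closed segment written below; this value |
| // is tied to the exact entries appended with compression disabled. |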
| OpId op_id = MakeOpId(1, 10); |
| // Create 5 segments, starting from log index 10, with 5 ops per segment. |
| // [10-14], [15-19], [20-24], [25-29], [30-34] |
| ASSERT_OK(AppendMultiSegmentSequence(kNumTotalSegments, kNumOpsPerSegment, |
| &op_id, nullptr)); |
| |
| // GCing through the first op should not be able to remove any logs. |
| EXPECT_EQ(0, log_->GetGCableDataSize(RetentionIndexes(10))); |
| |
| // Check that even when the min index is the last index from the oldest segment, |
| // we still return the same result. |
| EXPECT_EQ(0, log_->GetGCableDataSize(RetentionIndexes(14))); |
| |
| // Check that if the first segment is GCable, we return its size. |
| EXPECT_EQ(kSegmentSizeBytes, log_->GetGCableDataSize(RetentionIndexes(15))); |
| |
| // GCable index at the end of the third segment. Should only be able to GC the first |
| // two. |
| EXPECT_EQ(kSegmentSizeBytes * 2, log_->GetGCableDataSize(RetentionIndexes(24))); |
| |
| // GCing through the first op in the fourth segment should be able to remove |
| // the first three. |
| EXPECT_EQ(kSegmentSizeBytes * 3, log_->GetGCableDataSize(RetentionIndexes(25))); |
| |
| // Even if we could GC all of the ops written, we should respect the 'log_min_segments_to_retain' |
| // setting and not GC the last two. |
| EXPECT_EQ(kSegmentSizeBytes * 3, log_->GetGCableDataSize(RetentionIndexes(35))); |
| |
| // If we change the configuration, we should be able to GC all of the closed segments. |
| // The last segment is not GCable because it is still open. |
| FLAGS_log_min_segments_to_retain = 0; |
| EXPECT_EQ(kSegmentSizeBytes * 4, log_->GetGCableDataSize(RetentionIndexes(35))); |
| } |
| |
| // Regression test. Check that failed preallocation returns an error instead of |
| // hanging. |
| TEST_F(LogTest, TestFailedLogPreAllocation) { |
| options_.async_preallocate_segments = false; |
| ASSERT_OK(BuildLog()); |
| |
| log_->SetMaxSegmentSizeForTests(1); |
| FLAGS_log_inject_io_error_on_preallocate_fraction = 1.0; |
| OpId opid = MakeOpId(1, 1); |
| Status s = AppendNoOp(&opid); |
| ASSERT_TRUE(s.IsIOError()) << s.ToString(); |
| ASSERT_STR_CONTAINS(s.ToString(), "Injected IOError"); |
| } |
| |
| // Test the enforcement of reserving disk space for the log. |
| TEST_F(LogTest, TestDiskSpaceCheck) { |
| FLAGS_fs_wal_dir_reserved_bytes = 1; // Keep at least 1 byte reserved in the FS. |
| FLAGS_disk_reserved_bytes_free_for_testing = 0; |
| options_.segment_size_mb = 1; |
| Status s = BuildLog(); |
| ASSERT_TRUE(s.IsIOError()); |
| ASSERT_EQ(ENOSPC, s.posix_code()); |
| ASSERT_STR_CONTAINS(s.ToString(), "Insufficient disk space"); |
| |
| FLAGS_disk_reserved_bytes_free_for_testing = 2 * 1024 * 1024; |
| ASSERT_OK(BuildLog()); |
| |
| // TODO: We don't currently do bookkeeping to ensure that we check if the |
| // disk is past its quota if we write beyond the preallocation limit for a |
| // single segment. If we did that, we could ensure that we check once we |
| // detect that we are past the preallocation limit. |
| } |
| |
| // Test that the append thread shuts itself down after it's idle. |
| TEST_F(LogTest, TestAutoStopIdleAppendThread) { |
| ASSERT_OK(BuildLog()); |
| OpId opid = MakeOpId(1, 1); |
| |
| // Append something to the queue and ensure that the thread starts itself. |
| // We loop here in case for some reason this thread gets de-scheduled just |
| // after the append long enough for the append thread to shut itself down |
| // again. |
| ASSERT_EVENTUALLY([&]() { |
| AppendNoOpsToLogSync(clock_.get(), log_.get(), &opid, 2); |
| ASSERT_TRUE(log_->append_thread_active_for_tests()); |
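| // The append thread may still be writing to the compression buffer, so |
| // ignore TSAN warnings while we peek at its capacity. |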
| debug::ScopedTSANIgnoreReadsAndWrites ignore_tsan; |
| ASSERT_GT(log_->segment_allocator_.active_segment_->compress_buf_.capacity(), |
| faststring::kInitialCapacity); |
| }); |
| // After some time, the append thread should shut itself down. |
| ASSERT_EVENTUALLY([&]() { |
| ASSERT_FALSE(log_->append_thread_active_for_tests()); |
| }); |
| |
| // The log should free its buffer once it is idle. |
| { |
| debug::ScopedTSANIgnoreReadsAndWrites ignore_tsan; |
| ASSERT_EQ(faststring::kInitialCapacity, |
| log_->segment_allocator_.active_segment_->compress_buf_.capacity()); |
| } |
| } |
| |
| // Test that Log::TotalSize() captures creation, addition, and deletion of log segments. |
| TEST_P(LogTestOptionalCompression, TestTotalSize) { |
| // Build a log. There is an active segment, so on-disk size should be positive. |
| ASSERT_OK(BuildLog()); |
| int64_t one_segment_size = log_->OnDiskSize(); |
| ASSERT_GT(one_segment_size, 0); |
| |
| // Append entries and roll over to new segments. |
| vector<LogAnchor*> anchors; |
| ElementDeleter deleter(&anchors); |
| SegmentSequence segments; |
| const int kNumTotalSegments = 3; |
| const int kNumOpsPerSegment = 2; |
| |
| OpId op_id = MakeOpId(1, 1); |
| ASSERT_OK(AppendMultiSegmentSequence(kNumTotalSegments, kNumOpsPerSegment, |
| &op_id, &anchors)); |
| ASSERT_EQ(3, anchors.size()); |
| |
| // Now that there are multiple segments, the total size should be larger |
| // than the one-segment size. |
| int64_t three_segment_size = log_->OnDiskSize(); |
| ASSERT_GT(three_segment_size, one_segment_size); |
| |
| // Free an anchor so we can GC the segment it points to. |
| RetentionIndexes retention; |
| ASSERT_OK(log_anchor_registry_->Unregister(anchors[0])); |
| ASSERT_OK(log_anchor_registry_->GetEarliestRegisteredLogIndex(&retention.for_durability)); |
| int num_gced_segments; |
| ASSERT_OK(log_->GC(retention, &num_gced_segments)); |
| ASSERT_EQ(1, num_gced_segments) << DumpSegmentsToString(segments); |
| log_->reader()->GetSegmentsSnapshot(&segments); |
| ASSERT_EQ(2, segments.size()) << DumpSegmentsToString(segments); |
| |
| // Now we've added two segments and GC'd one, so the total size should be |
| // between the one-segment size and the three-segment size. |
| int64_t two_segment_size = log_->OnDiskSize(); |
| ASSERT_LT(two_segment_size, three_segment_size); |
| ASSERT_GT(two_segment_size, one_segment_size); |
| |
| // Cleanup: close the log and unregister the remaining registered anchors. |
| ASSERT_OK(log_->Close()); |
| for (int i = 1; i < kNumTotalSegments; i++) { |
| ASSERT_OK(log_anchor_registry_->Unregister(anchors[i])); |
| } |
| } |
| |
| } // namespace log |
| } // namespace kudu |