| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| #pragma once |
| |
| #include <cstdint> |
| #include <memory> |
| #include <string> |
| #include <vector> |
| |
| #include <gtest/gtest_prod.h> |
| |
| #include "kudu/consensus/log_util.h" |
| #include "kudu/gutil/macros.h" |
| #include "kudu/gutil/ref_counted.h" |
| #include "kudu/util/locks.h" |
| #include "kudu/util/make_shared.h" |
| #include "kudu/util/status.h" |
| |
| namespace kudu { |
| |
| class Counter; |
| class Env; |
| class FileCache; |
| class FsManager; |
| class Histogram; |
| class MetricEntity; |
| class faststring; |
| |
| namespace consensus { |
| class OpId; |
| class ReplicateMsg; |
| } // namespace consensus |
| |
| namespace log { |
| class LogEntryBatchPB; |
| class LogIndex; |
| struct LogIndexEntry; |
| |
| // Reads a set of segments from a given path. Segment headers and footers are |
| // read and parsed, but entries are not. |
| // |
| // Once initialized, care should be taken to avoid mutating the segments in a |
| // non-thread-safe manner. E.g. segment mutation should be made thread-safe with |
| // locking primitives, or done via a copy-then-replace access pattern. |
| // UpdateLastSegmentOffset and ReplaceLastSegment are examples of each these |
| // patterns respectively. |
| // |
| // This class is thread safe. |
| class LogReader : public enable_make_shared<LogReader> { |
| public: |
| ~LogReader(); |
| |
| // Opens a LogReader on the tablet log directory specified by |
| // 'tablet_wal_dir', and sets 'reader' to the newly created LogReader. |
| // |
| // 'index' may be NULL, but if it is, ReadReplicatesInRange() may not |
| // be used. |
| static Status Open(Env* env, |
| const std::string& tablet_wal_dir, |
| const scoped_refptr<LogIndex>& index, |
| const std::string& tablet_id, |
| const scoped_refptr<MetricEntity>& metric_entity, |
| FileCache* file_cache, |
| std::shared_ptr<LogReader>* reader); |
| |
| // Same as above, but will use `fs_manager` to determine the default WAL dir |
| // for the tablet. |
| static Status Open(FsManager* fs_manager, |
| const scoped_refptr<LogIndex>& index, |
| const std::string& tablet_id, |
| const scoped_refptr<MetricEntity>& metric_entity, |
| FileCache* file_cache, |
| std::shared_ptr<LogReader>* reader); |
| |
| // Return the minimum replicate index that is retained in the currently available |
| // logs. May return -1 if no replicates have been logged. |
| int64_t GetMinReplicateIndex() const; |
| |
| // Return a readable segment with the given sequence number, or NULL if it |
| // cannot be found (e.g. if it has already been GCed). |
| scoped_refptr<ReadableLogSegment> GetSegmentBySequenceNumber(int64_t seq) const; |
| |
| // Copies a snapshot of the current sequence of segments into 'segments'. |
| // 'segments' will be cleared first. |
| void GetSegmentsSnapshot(SegmentSequence* segments) const; |
| |
| // Reads all ReplicateMsgs from 'starting_at' to 'up_to' both inclusive. |
| // The caller takes ownership of the returned ReplicateMsg objects. |
| // |
| // Will attempt to read no more than 'max_bytes_to_read', unless it is set to |
| // LogReader::kNoSizeLimit. If the size limit would prevent reading any operations at |
| // all, then will read exactly one operation. |
| // |
| // Requires that a LogIndex was passed into LogReader::Open(). |
| Status ReadReplicatesInRange( |
| int64_t starting_at, |
| int64_t up_to, |
| int64_t max_bytes_to_read, |
| std::vector<consensus::ReplicateMsg*>* replicates) const; |
| static const int64_t kNoSizeLimit; |
| |
| // Look up the OpId for the given operation index. |
| // Returns a bad Status if the log index fails to load (eg. due to an IO error). |
| Status LookupOpId(int64_t op_index, consensus::OpId* op_id) const; |
| |
| // Returns the number of segments. |
| int num_segments() const; |
| |
| std::string ToString() const; |
| |
| protected: |
| LogReader(Env* env, |
| scoped_refptr<LogIndex> index, |
| std::string tablet_id, |
| const scoped_refptr<MetricEntity>& metric_entity, |
| FileCache* file_cache); |
| |
| private: |
| FRIEND_TEST(LogTestOptionalCompression, TestLogReader); |
| FRIEND_TEST(LogTestOptionalCompression, TestReadLogWithReplacedReplicates); |
| friend class Log; |
| friend class LogTest; |
| friend class LogTestOptionalCompression; |
| |
| enum State { |
| kLogReaderInitialized, |
| kLogReaderReading, |
| kLogReaderClosed |
| }; |
| |
| // Appends 'segment' to the segments available for read by this reader. |
| // Index entries in 'segment's footer will be added to the index. |
| // If the segment has no footer it will be scanned so this should not be used |
| // for new segments. |
| Status AppendSegment(scoped_refptr<ReadableLogSegment> segment); |
| |
| // Same as above but for segments without any entries. |
| // Used by the Log to add "empty" segments. |
| void AppendEmptySegment(scoped_refptr<ReadableLogSegment> segment); |
| |
| // Removes segments with sequence numbers less than or equal to |
| // 'segment_sequence_number' from this reader. |
| void TrimSegmentsUpToAndIncluding(int64_t segment_sequence_number); |
| |
| // Replaces the last segment in the reader with 'segment'. |
| // Used to replace a segment that was still in the process of being written |
| // with its complete version which has a footer and index entries. |
| // Requires that the last segment in 'segments_' has the same sequence |
| // number as 'segment'. |
| // Expects 'segment' to be properly closed and to have footer. |
| void ReplaceLastSegment(scoped_refptr<ReadableLogSegment> segment); |
| |
| // Appends 'segment' to the segment sequence. |
| // Assumes that the segment was scanned, if no footer was found. |
| // To be used only internally, clients of this class with private access (i.e. friends) |
| // should use the thread safe version, AppendSegment(), which will also scan the segment |
| // if no footer is present. |
| void AppendSegmentUnlocked(scoped_refptr<ReadableLogSegment> segment); |
| |
| // Used by Log to update its LogReader on how far it is possible to read |
| // the current segment. Requires that the reader has at least one segment |
| // and that the last segment has no footer, meaning it is currently being |
| // written to. |
| void UpdateLastSegmentOffset(int64_t readable_to_offset); |
| |
| // Read the LogEntryBatchPB pointed to by the provided index entry. |
| // 'tmp_buf' is used as scratch space to avoid extra allocation. |
| Status ReadBatchUsingIndexEntry(const LogIndexEntry& index_entry, |
| faststring* tmp_buf, |
| LogEntryBatchPB* batch) const; |
| |
| // Reads the headers of all segments in 'tablet_wal_path'. |
| Status Init(const std::string& tablet_wal_path); |
| |
| // Initializes an 'empty' reader for tests, i.e. does not scan a path looking for segments. |
| void InitEmptyReaderForTests(); |
| |
| Env* env_; |
| FileCache* file_cache_; |
| const scoped_refptr<LogIndex> log_index_; |
| const std::string tablet_id_; |
| |
| // Metrics |
| scoped_refptr<Counter> bytes_read_; |
| scoped_refptr<Counter> entries_read_; |
| scoped_refptr<Histogram> read_batch_latency_; |
| |
| // The sequence of all current log segments in increasing sequence number |
| // order. |
| SegmentSequence segments_; |
| |
| mutable simple_spinlock lock_; |
| |
| State state_; |
| |
| DISALLOW_COPY_AND_ASSIGN(LogReader); |
| }; |
| |
| } // namespace log |
| } // namespace kudu |