blob: 620e90d785636862f796ca5560db50585bc46a7f [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <cstdint>
#include <memory>
#include <string>
#include <vector>
#include <gtest/gtest_prod.h>
#include "kudu/consensus/log_util.h"
#include "kudu/gutil/macros.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/util/locks.h"
#include "kudu/util/make_shared.h"
#include "kudu/util/status.h"
namespace kudu {
// Forward declarations: this header refers to the types below only through
// pointers, references, or scoped_refptr<> handles, so their full definitions
// are not needed here.
class Counter;
class Env;
class FileCache;
class FsManager;
class Histogram;
class MetricEntity;
class faststring;
namespace consensus {
// Consensus types that appear only as pointer parameters in LogReader's API.
class OpId;
class ReplicateMsg;
} // namespace consensus
namespace log {
// Forward declarations of log-layer types that LogReader's declaration uses
// only by pointer, reference, or scoped_refptr<>.
class LogEntryBatchPB;
class LogIndex;
struct LogIndexEntry;
// Reads a set of segments from a given path. Segment headers and footers are
// read and parsed, but entries are not.
//
// Once initialized, care should be taken to avoid mutating the segments in a
// non-thread-safe manner. E.g. segment mutation should be made thread-safe with
// locking primitives, or done via a copy-then-replace access pattern.
// UpdateLastSegmentOffset and ReplaceLastSegment are examples of each of
// these patterns, respectively.
//
// This class is thread safe.
class LogReader : public enable_make_shared<LogReader> {
 public:
  ~LogReader();

  // Creates a LogReader over the tablet log directory 'tablet_wal_dir' and
  // stores the new instance in 'reader'.
  //
  // A null 'index' is accepted, but ReadReplicatesInRange() may not be used
  // on a reader opened that way.
  static Status Open(Env* env,
                     const std::string& tablet_wal_dir,
                     const scoped_refptr<LogIndex>& index,
                     const std::string& tablet_id,
                     const scoped_refptr<MetricEntity>& metric_entity,
                     FileCache* file_cache,
                     std::shared_ptr<LogReader>* reader);

  // Variant of Open() that asks 'fs_manager' for the tablet's default WAL
  // directory instead of taking the directory as an argument.
  static Status Open(FsManager* fs_manager,
                     const scoped_refptr<LogIndex>& index,
                     const std::string& tablet_id,
                     const scoped_refptr<MetricEntity>& metric_entity,
                     FileCache* file_cache,
                     std::shared_ptr<LogReader>* reader);

  // Smallest replicate index still retained in the logs that are currently
  // available. The result may be -1 when no replicates have been logged.
  int64_t GetMinReplicateIndex() const;

  // Looks up the readable segment whose sequence number is 'seq'. Yields null
  // when no such segment can be found (e.g. it has already been GCed).
  scoped_refptr<ReadableLogSegment> GetSegmentBySequenceNumber(int64_t seq) const;

  // Clears 'segments', then fills it with a snapshot copy of the current
  // segment sequence.
  void GetSegmentsSnapshot(SegmentSequence* segments) const;

  // Reads every ReplicateMsg in the index range ['starting_at', 'up_to'],
  // both ends inclusive, into 'replicates'. Ownership of the returned
  // ReplicateMsg objects passes to the caller.
  //
  // At most 'max_bytes_to_read' bytes are attempted unless that argument is
  // LogReader::kNoSizeLimit. When the limit would rule out reading even a
  // single operation, exactly one operation is read.
  //
  // Precondition: a LogIndex was supplied to LogReader::Open().
  Status ReadReplicatesInRange(
      int64_t starting_at,
      int64_t up_to,
      int64_t max_bytes_to_read,
      std::vector<consensus::ReplicateMsg*>* replicates) const;

  // Value for 'max_bytes_to_read' that disables the size limit in
  // ReadReplicatesInRange().
  static const int64_t kNoSizeLimit;

  // Finds the OpId of the operation at 'op_index' and stores it in 'op_id'.
  // A bad Status is returned if the log index fails to load (e.g. because of
  // an IO error).
  Status LookupOpId(int64_t op_index, consensus::OpId* op_id) const;

  // Number of segments.
  int num_segments() const;

  std::string ToString() const;

 protected:
  // Not publicly constructible; Open() is the public way to obtain a
  // LogReader.
  LogReader(Env* env,
            scoped_refptr<LogIndex> index,
            std::string tablet_id,
            const scoped_refptr<MetricEntity>& metric_entity,
            FileCache* file_cache);

 private:
  FRIEND_TEST(LogTestOptionalCompression, TestLogReader);
  FRIEND_TEST(LogTestOptionalCompression, TestReadLogWithReplacedReplicates);
  friend class Log;
  friend class LogTest;
  friend class LogTestOptionalCompression;

  // Values held by 'state_'.
  enum State {
    kLogReaderInitialized,
    kLogReaderReading,
    kLogReaderClosed
  };

  // Adds 'segment' to the set of segments this reader can read, and adds the
  // index entries found in the segment's footer to the index.
  // A segment lacking a footer gets scanned, so this should not be used for
  // new segments.
  Status AppendSegment(scoped_refptr<ReadableLogSegment> segment);

  // Counterpart of AppendSegment() for segments that hold no entries.
  // The Log uses it to add "empty" segments.
  void AppendEmptySegment(scoped_refptr<ReadableLogSegment> segment);

  // Drops from this reader every segment whose sequence number is less than
  // or equal to 'segment_sequence_number'.
  void TrimSegmentsUpToAndIncluding(int64_t segment_sequence_number);

  // Swaps the reader's last segment for 'segment'.
  // This is how a segment that was still being written gets replaced with its
  // complete version, which has a footer and index entries.
  // Preconditions:
  //  - the last segment in 'segments_' and 'segment' share the same sequence
  //    number;
  //  - 'segment' has been properly closed and has a footer.
  void ReplaceLastSegment(scoped_refptr<ReadableLogSegment> segment);

  // Adds 'segment' to the segment sequence.
  // If the segment had no footer, it is assumed to have been scanned already.
  // Internal use only: clients of this class with private access (i.e.
  // friends) should call the thread safe AppendSegment() instead, which also
  // scans the segment when no footer is present.
  void AppendSegmentUnlocked(scoped_refptr<ReadableLogSegment> segment);

  // Lets the Log tell its LogReader how far into the current segment it is
  // possible to read. The reader must hold at least one segment, and the last
  // segment must have no footer, i.e. it is currently being written to.
  void UpdateLastSegmentOffset(int64_t readable_to_offset);

  // Reads into 'batch' the LogEntryBatchPB that 'index_entry' points to.
  // 'tmp_buf' serves as scratch space so that no extra allocation is needed.
  Status ReadBatchUsingIndexEntry(const LogIndexEntry& index_entry,
                                  faststring* tmp_buf,
                                  LogEntryBatchPB* batch) const;

  // Reads the headers of every segment in 'tablet_wal_path'.
  Status Init(const std::string& tablet_wal_path);

  // Sets up an 'empty' reader for tests: no path is scanned for segments.
  void InitEmptyReaderForTests();

  Env* env_;
  FileCache* file_cache_;
  const scoped_refptr<LogIndex> log_index_;
  const std::string tablet_id_;

  // Metrics.
  scoped_refptr<Counter> bytes_read_;
  scoped_refptr<Counter> entries_read_;
  scoped_refptr<Histogram> read_batch_latency_;

  // All current log segments, ordered by increasing sequence number.
  SegmentSequence segments_;

  // NOTE(review): presumably guards 'segments_' and 'state_' -- confirm
  // against the implementation file.
  mutable simple_spinlock lock_;

  State state_;

  DISALLOW_COPY_AND_ASSIGN(LogReader);
};
} // namespace log
} // namespace kudu