blob: 909130870aeedf70cb1b5740d9d039ab5f71ce92 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#ifndef KUDU_CONSENSUS_LOG_CACHE_H
#define KUDU_CONSENSUS_LOG_CACHE_H
#include <cstddef>
#include <cstdint>
#include <iosfwd>
#include <map>
#include <memory>
#include <string>
#include <vector>
#include <gtest/gtest_prod.h>
#include "kudu/consensus/ref_counted_replicate.h"
#include "kudu/gutil/macros.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/util/locks.h"
#include "kudu/util/metrics.h"
#include "kudu/util/status.h"
#include "kudu/util/status_callback.h"
namespace kudu {
class MemTracker;
namespace log {
class Log;
} // namespace log
namespace consensus {
class OpId;
// Write-through cache for the log.
//
// This stores a set of log messages by their index. New operations
// can be appended to the end as they are written to the log. Readers
// fetch entries that were explicitly appended, or they can fetch older
// entries which are asynchronously fetched from the disk.
class LogCache {
public:
LogCache(const scoped_refptr<MetricEntity>& metric_entity,
scoped_refptr<log::Log> log,
std::string local_uuid,
std::string tablet_id);
~LogCache();
// Initialize the cache.
//
// 'preceding_op' is the current latest op. The next AppendOperation() call
// must follow this op.
//
// Requires that the cache is empty.
void Init(const OpId& preceding_op);
// Read operations from the log, following 'after_op_index'.
// If such an op exists in the log, an OK result will always include at least one
// operation.
//
// The result will be limited such that the total ByteSizeLong() of the
// returned ops is less than max_size_bytes, unless that would result in an
// empty result, in which case exactly one op is returned.
//
// The OpId which precedes the returned ops is returned in *preceding_op.
// The index of this OpId will match 'after_op_index'.
//
// If the ops being requested are not available in the log, this will synchronously
// read these ops from disk. Therefore, this function may take a substantial amount
// of time and should not be called with important locks held, etc.
Status ReadOps(int64_t after_op_index,
int64_t max_size_bytes,
std::vector<ReplicateRefPtr>* messages,
OpId* preceding_op);
// Append the operations into the log and the cache.
// When the messages have completed writing into the on-disk log, fires 'callback'.
//
// If the cache memory limit is exceeded, the entries may no longer be in the cache
// when the callback fires.
//
// Returns non-OK if the Log append itself fails.
Status AppendOperations(std::vector<ReplicateRefPtr> msgs,
const StatusCallback& callback);
// Truncate any operations with index > 'index'.
//
// Following this, reads of truncated indexes using ReadOps(), LookupOpId(),
// HasOpBeenWritten(), etc, will return as if the operations were never appended.
//
// NOTE: unless a new operation is appended followig 'index', this truncation does
// not persist across server restarts.
void TruncateOpsAfter(int64_t index);
// Return true if an operation with the given index has been written through
// the cache. The operation may not necessarily be durable yet -- it could still be
// en route to the log.
bool HasOpBeenWritten(int64_t index) const;
// Evict any operations with op index <= 'index'.
void EvictThroughOp(int64_t index);
// Return the number of bytes of memory currently in use by the cache.
int64_t BytesUsed() const;
int64_t num_cached_ops() const {
return metrics_.log_cache_num_ops->value();
}
// Dump the current contents of the cache to the log.
void DumpToLog() const;
// Dumps the contents of the cache to the provided string vector.
void DumpToStrings(std::vector<std::string>* lines) const;
void DumpToHtml(std::ostream& out) const;
std::string StatsString() const;
std::string ToString() const;
// Look up the OpId for the given operation index.
// If it is not in the cache, this consults the on-disk log index and thus
// may take a non-trivial amount of time due to IO.
//
// Returns "Incomplete" if the op has not yet been written.
// Returns "NotFound" if the op has been GCed.
// Returns another bad Status if the log index fails to load (eg. due to an IO error).
Status LookupOpId(int64_t op_index, OpId* op_id) const;
private:
FRIEND_TEST(LogCacheTest, TestAppendAndGetMessages);
FRIEND_TEST(LogCacheTest, TestGlobalMemoryLimit);
FRIEND_TEST(LogCacheTest, TestReplaceMessages);
FRIEND_TEST(LogCacheTest, TestTruncation);
friend class LogCacheTest;
// An entry in the cache.
struct CacheEntry {
ReplicateRefPtr msg;
// The cached value of msg->SpaceUsedLong(). This method is expensive
// to compute, so we compute it only once upon insertion.
size_t mem_usage;
};
// Try to evict the oldest operations from the queue, stopping either when
// 'bytes_to_evict' bytes have been evicted, or the op with index
// 'stop_after_index' has been evicted, whichever comes first.
void EvictSomeUnlocked(int64_t stop_after_index, int64_t bytes_to_evict);
// Update metrics and MemTracker to account for the removal of the
// given message.
void AccountForMessageRemovalUnlocked(const CacheEntry& entry);
void TruncateOpsAfterUnlocked(int64_t index);
// Return a string with stats
std::string StatsStringUnlocked() const;
std::string ToStringUnlocked() const;
std::string LogPrefixUnlocked() const;
void LogCallback(int64_t last_idx_in_batch,
bool borrowed_memory,
const StatusCallback& user_callback,
const Status& log_status);
scoped_refptr<log::Log> const log_;
// The UUID of the local peer.
const std::string local_uuid_;
// The id of the tablet.
const std::string tablet_id_;
mutable simple_spinlock lock_;
// An ordered map that serves as the buffer for the cached messages.
// Maps from log index -> ReplicateMsg
typedef std::map<uint64_t, CacheEntry> MessageCache;
MessageCache cache_;
// The next log index to append. Each append operation must either
// start with this log index, or go backward (but never skip forward).
int64_t next_sequential_op_index_;
// Any operation with an index >= min_pinned_op_ may not be
// evicted from the cache. This is used to prevent ops from being evicted
// until they successfully have been appended to the underlying log.
// Protected by lock_.
int64_t min_pinned_op_index_;
// Pointer to a parent memtracker for all log caches. This
// exists to compute server-wide cache size and enforce a
// server-wide memory limit. When the first instance of a log
// cache is created, a new entry is added to MemTracker's static
// map; subsequent entries merely increment the refcount, so that
// the parent tracker can be deleted if all log caches are
// deleted (e.g., if all tablets are deleted from a server, or if
// the server is shutdown).
std::shared_ptr<MemTracker> parent_tracker_;
// A MemTracker for this instance.
std::shared_ptr<MemTracker> tracker_;
struct Metrics {
explicit Metrics(const scoped_refptr<MetricEntity>& metric_entity);
// Keeps track of the total number of operations in the cache.
scoped_refptr<AtomicGauge<int64_t> > log_cache_num_ops;
// Keeps track of the memory consumed by the cache, in bytes.
scoped_refptr<AtomicGauge<int64_t> > log_cache_size;
};
Metrics metrics_;
DISALLOW_COPY_AND_ASSIGN(LogCache);
};
} // namespace consensus
} // namespace kudu
#endif /* KUDU_CONSENSUS_LOG_CACHE_H */