// blob: ba6739c0b3e1b4abbb27bb4c145da8b27e5ed5f6 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#ifndef KUDU_CONSENSUS_LOG_CACHE_H
#define KUDU_CONSENSUS_LOG_CACHE_H
#include <cstdint>
#include <iosfwd>
#include <map>
#include <memory>
#include <string>
#include <vector>
#include "kudu/consensus/consensus.pb.h"
#include "kudu/consensus/opid_util.h"
#include "kudu/consensus/ref_counted_replicate.h"
#include "kudu/gutil/gscoped_ptr.h"
#include "kudu/gutil/macros.h"
#include "kudu/util/async_util.h"
#include "kudu/util/locks.h"
#include "kudu/util/metrics.h"
#include "kudu/util/status.h"
namespace kudu {
// Forward declarations. Within this header MetricEntity, MemTracker and
// log::Log are only named as template arguments of smart pointers
// (scoped_refptr / std::shared_ptr), so a declaration is sufficient here.
// log::LogReader is declared but not referenced by any declaration in this
// header.
class MetricEntity;
class MemTracker;
namespace log {
class Log;
class LogReader;
} // namespace log
namespace consensus {
class ReplicateMsg;
// Write-through cache for the log.
//
// This stores a set of log messages by their index. New operations
// can be appended to the end as they are written to the log. Readers
// fetch entries that were explicitly appended, or they can fetch older
// entries, which are read back from the disk.
//
// NOTE(review): this comment used to say older entries are "asynchronously"
// fetched from disk, but ReadOps() below documents the disk read as
// synchronous. The wording here follows ReadOps(); confirm against the
// implementation.
//
// The class is neither copyable nor assignable (DISALLOW_COPY_AND_ASSIGN).
// Private methods carry an "Unlocked" suffix; presumably they expect the
// caller to already hold lock_ -- confirm against the implementation.
class LogCache {
public:
// Construct a cache.
//
// 'metric_entity' is the entity the cache's gauges (see Metrics below) are
// attached to. 'log', 'local_uuid' and 'tablet_id' have like-named members
// (log_, local_uuid_, tablet_id_) and are presumably stored there; the
// constructor definition is not part of this header.
LogCache(const scoped_refptr<MetricEntity>& metric_entity,
const scoped_refptr<log::Log>& log,
const std::string& local_uuid,
const std::string& tablet_id);
~LogCache();
// Initialize the cache.
//
// 'preceding_op' is the current latest op. The next AppendOperations() call
// must follow this op.
//
// Requires that the cache is empty.
void Init(const OpId& preceding_op);
// Read operations from the log, following 'after_op_index'.
// If such an op exists in the log, an OK result will always include at least one
// operation.
//
// The result will be limited such that the total ByteSize() of the returned ops
// is less than max_size_bytes, unless that would result in an empty result, in
// which case exactly one op is returned.
//
// The returned ops are placed in *messages.
// The OpId which precedes the returned ops is returned in *preceding_op.
// The index of this OpId will match 'after_op_index'.
//
// If the ops being requested are not available in the cache, this will synchronously
// read these ops from disk. Therefore, this function may take a substantial amount
// of time and should not be called with important locks held, etc.
Status ReadOps(int64_t after_op_index,
int max_size_bytes,
std::vector<ReplicateRefPtr>* messages,
OpId* preceding_op);
// Append the operations into the log and the cache.
// When the messages have completed writing into the on-disk log, fires 'callback'.
//
// If the cache memory limit is exceeded, the entries may no longer be in the cache
// when the callback fires.
//
// Returns non-OK if the Log append itself fails.
Status AppendOperations(const std::vector<ReplicateRefPtr>& msgs,
const StatusCallback& callback);
// Return true if an operation with the given index has been written through
// the cache. The operation may not necessarily be durable yet -- it could still be
// en route to the log.
bool HasOpBeenWritten(int64_t log_index) const;
// Evict any operations with op index <= 'index'.
//
// NOTE(review): min_pinned_op_index_ (below) is documented as preventing
// eviction of ops that are not yet in the underlying log; presumably such
// ops are skipped here even if their index is <= 'index' -- confirm.
void EvictThroughOp(int64_t index);
// Return the number of bytes of memory currently in use by the cache.
int64_t BytesUsed() const;
// Return the number of operations in the cache, as reported by the
// log_cache_num_ops gauge. This reads only the gauge; lock_ is not taken.
int64_t num_cached_ops() const {
return metrics_.log_cache_num_ops->value();
}
// Dump the current contents of the cache to the log.
void DumpToLog() const;
// Dumps the contents of the cache to the provided string vector.
void DumpToStrings(std::vector<std::string>* lines) const;
// Dumps the contents of the cache to the provided stream.
// NOTE(review): judging by the name, the output is formatted as HTML -- confirm.
void DumpToHtml(std::ostream& out) const;
// Return a string with stats. Presumably the locking counterpart of
// StatsStringUnlocked() -- confirm.
std::string StatsString() const;
// Return a string representation of the cache. Presumably the locking
// counterpart of ToStringUnlocked() -- confirm.
std::string ToString() const;
// Look up the OpId for the given operation index.
// If it is not in the cache, this consults the on-disk log index and thus
// may take a non-trivial amount of time due to IO.
//
// On success the result is stored in *op_id.
//
// Returns "Incomplete" if the op has not yet been written.
// Returns "NotFound" if the op has been GCed.
// Returns another bad Status if the log index fails to load (eg. due to an IO error).
Status LookupOpId(int64_t op_index, OpId* op_id) const;
private:
// Grant the LogCacheTest fixture and the listed test cases access to
// private members.
FRIEND_TEST(LogCacheTest, TestAppendAndGetMessages);
FRIEND_TEST(LogCacheTest, TestGlobalMemoryLimit);
FRIEND_TEST(LogCacheTest, TestReplaceMessages);
friend class LogCacheTest;
// Try to evict the oldest operations from the queue, stopping either when
// 'bytes_to_evict' bytes have been evicted, or the op with index
// 'stop_after_index' has been evicted, whichever comes first.
void EvictSomeUnlocked(int64_t stop_after_index, int64_t bytes_to_evict);
// Update metrics and MemTracker to account for the removal of the
// given message.
void AccountForMessageRemovalUnlocked(const ReplicateRefPtr& msg);
// Return a string with stats
std::string StatsStringUnlocked() const;
// Return a string representation of the cache.
std::string ToStringUnlocked() const;
// Return a prefix for log messages. Presumably built from tablet_id_ and
// local_uuid_ -- confirm.
std::string LogPrefixUnlocked() const;
// Presumably the completion callback handed to the Log by
// AppendOperations(): 'log_status' would be the result of the log write and
// 'user_callback' the callback supplied by the caller of AppendOperations().
// 'last_idx_in_batch' presumably is the index of the last op in the appended
// batch. NOTE(review): the meaning of 'borrowed_memory' is not evident from
// this header -- confirm all of the above against the implementation.
void LogCallback(int64_t last_idx_in_batch,
bool borrowed_memory,
const StatusCallback& user_callback,
const Status& log_status);
// The log that this cache writes through to.
scoped_refptr<log::Log> const log_;
// The UUID of the local peer.
const std::string local_uuid_;
// The id of the tablet.
const std::string tablet_id_;
// Spinlock documented below as protecting min_pinned_op_index_.
// NOTE(review): presumably also guards cache_ and next_sequential_op_index_
// -- confirm. Declared mutable so that const methods can acquire it.
mutable simple_spinlock lock_;
// An ordered map that serves as the buffer for the cached messages.
// Maps from log index -> ReplicateRefPtr
//
// NOTE(review): the key type is uint64_t, whereas every other op index in
// this class is int64_t. Comparisons between the two are subject to
// signed/unsigned conversion; consider unifying on int64_t.
typedef std::map<uint64_t, ReplicateRefPtr> MessageCache;
MessageCache cache_;
// The next log index to append. Each append operation must either
// start with this log index, or go backward (but never skip forward).
int64_t next_sequential_op_index_;
// Any operation with an index >= min_pinned_op_index_ may not be
// evicted from the cache. This is used to prevent ops from being evicted
// until they successfully have been appended to the underlying log.
// Protected by lock_.
int64_t min_pinned_op_index_;
// Pointer to a parent memtracker for all log caches. This
// exists to compute server-wide cache size and enforce a
// server-wide memory limit. When the first instance of a log
// cache is created, a new entry is added to MemTracker's static
// map; subsequent entries merely increment the refcount, so that
// the parent tracker can be deleted if all log caches are
// deleted (e.g., if all tablets are deleted from a server, or if
// the server is shutdown).
std::shared_ptr<MemTracker> parent_tracker_;
// A MemTracker for this instance.
std::shared_ptr<MemTracker> tracker_;
// The gauges exported by the cache.
struct Metrics {
// 'metric_entity' is the entity the gauges below are attached to.
explicit Metrics(const scoped_refptr<MetricEntity>& metric_entity);
// Keeps track of the total number of operations in the cache.
scoped_refptr<AtomicGauge<int64_t> > log_cache_num_ops;
// Keeps track of the memory consumed by the cache, in bytes.
scoped_refptr<AtomicGauge<int64_t> > log_cache_size;
};
Metrics metrics_;
// Not copyable or assignable.
DISALLOW_COPY_AND_ASSIGN(LogCache);
};
} // namespace consensus
} // namespace kudu
#endif /* KUDU_CONSENSUS_LOG_CACHE_H */