blob: 67cf6f4a36787200e7846ba4bbd6c372a0b61b5b [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "kudu/consensus/log_cache.h"
#include <functional>
#include <map>
#include <mutex>
#include <ostream>
#include <string>
#include <utility>
#include <vector>
#include <gflags/gflags.h>
#include <glog/logging.h>
#include <google/protobuf/wire_format_lite.h>
#include "kudu/consensus/consensus.pb.h"
#include "kudu/consensus/log.h"
#include "kudu/consensus/log_reader.h"
#include "kudu/consensus/opid.pb.h"
#include "kudu/consensus/opid_util.h"
#include "kudu/consensus/ref_counted_replicate.h"
#include "kudu/gutil/map-util.h"
#include "kudu/gutil/mathlimits.h"
#include "kudu/gutil/strings/human_readable.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/util/flag_tags.h"
#include "kudu/util/locks.h"
#include "kudu/util/logging.h"
#include "kudu/util/mem_tracker.h"
#include "kudu/util/metrics.h"
#include "kudu/util/pb_util.h"
// Per-tablet memory cap (in MB) for cached consensus entries. The LogCache
// constructor converts this to bytes and uses it as the limit of the
// per-tablet MemTracker.
DEFINE_int32(log_cache_size_limit_mb, 128,
"The total per-tablet size of consensus entries which may be kept in memory. "
"The log cache attempts to keep all entries which have not yet been replicated "
"to all followers in memory, but if the total size of those entries exceeds "
"this limit within an individual tablet, the oldest will be evicted.");
TAG_FLAG(log_cache_size_limit_mb, advanced);
// Server-wide memory cap (in MB) shared by all tablets' log caches. The LogCache
// constructor converts this to bytes for the limit of the global "log_cache"
// parent MemTracker.
DEFINE_int32(global_log_cache_size_limit_mb, 1024,
"Server-wide version of 'log_cache_size_limit_mb'. The total memory used for "
"caching log entries across all tablets is kept under this threshold.");
TAG_FLAG(global_log_cache_size_limit_mb, advanced);
using kudu::pb_util::SecureShortDebugString;
using std::string;
using std::vector;
using strings::Substitute;
namespace kudu {
namespace consensus {
// Gauge of how many ops are currently held in the log cache. It is incremented
// by AppendOperations() and decremented once per removed entry in
// AccountForMessageRemovalUnlocked(). The index-0 placeholder inserted by the
// constructor is not counted.
METRIC_DEFINE_gauge_int64(tablet, log_cache_num_ops, "Log Cache Operation Count",
MetricUnit::kOperations,
"Number of operations in the log cache.",
kudu::MetricLevel::kDebug);
// Gauge of the bytes held by cached ops, as measured by SpaceUsedLong() at
// append time. It is updated alongside log_cache_num_ops.
METRIC_DEFINE_gauge_int64(tablet, log_cache_size, "Log Cache Memory Usage",
MetricUnit::kBytes,
"Amount of memory in use for caching the local log.",
kudu::MetricLevel::kDebug);
// Id of the server-wide parent MemTracker shared by every tablet's log cache.
// Per-tablet child trackers are named "log_cache:<local_uuid>:<tablet_id>".
static const char kParentMemTrackerId[] = "log_cache";
// NOTE(review): MsgIter does not appear to be used anywhere in this file.
// Confirm against the header before removing it.
typedef vector<const ReplicateMsg*>::const_iterator MsgIter;
// Builds the cache for one tablet replica.
//
// Memory is accounted with two trackers:
//  - a server-wide parent tracker (kParentMemTrackerId), limited by
//    --global_log_cache_size_limit_mb;
//  - a per-tablet child tracker, limited by --log_cache_size_limit_mb.
//
// The cache is seeded with a placeholder entry at key 0 holding MinimumOpId().
// EvictSomeUnlocked() never removes it. Its memory is not charged to the
// tracker and it is not counted in the metrics.
LogCache::LogCache(const scoped_refptr<MetricEntity>& metric_entity,
                   scoped_refptr<log::Log> log,
                   string local_uuid,
                   string tablet_id)
  : log_(std::move(log)),
    local_uuid_(std::move(local_uuid)),
    tablet_id_(std::move(tablet_id)),
    next_sequential_op_index_(0),
    min_pinned_op_index_(0),
    metrics_(metric_entity) {
  // Widen to int64_t before multiplying. The flags are int32 megabytes, and
  // relying on 'long' being 64 bits is not portable: it is 32 bits on LLP64
  // platforms, where any limit >= 2048 MB would overflow.
  constexpr int64_t kBytesPerMb = 1024 * 1024;
  const int64_t max_ops_size_bytes =
      static_cast<int64_t>(FLAGS_log_cache_size_limit_mb) * kBytesPerMb;
  const int64_t global_max_ops_size_bytes =
      static_cast<int64_t>(FLAGS_global_log_cache_size_limit_mb) * kBytesPerMb;

  // Set up (or reuse) a tracker with the global limit. It is parented directly
  // to the root tracker so that it's always global.
  parent_tracker_ = MemTracker::FindOrCreateGlobalTracker(global_max_ops_size_bytes,
                                                          kParentMemTrackerId);

  // And create a child tracker with the per-tablet limit.
  tracker_ = MemTracker::CreateTracker(
      max_ops_size_bytes, Substitute("$0:$1:$2", kParentMemTrackerId,
                                     local_uuid_, tablet_id_),
      parent_tracker_);

  // Put a fake message at index 0, since this simplifies a lot of our
  // code paths elsewhere.
  auto zero_op = new ReplicateMsg();
  *zero_op->mutable_id() = MinimumOpId();
  InsertOrDie(&cache_, 0, { make_scoped_refptr_replicate(zero_op), zero_op->SpaceUsedLong() });
}
// Hands back to the memory tracker everything it still has charged for this
// cache, then drops every cached entry, including the index-0 placeholder.
LogCache::~LogCache() {
  const auto still_consumed = tracker_->consumption();
  tracker_->Release(still_consumed);
  cache_.clear();
}
// Prepares the cache to receive the ops that follow 'preceding_op'.
// CHECK-fails unless the cache still holds only the index-0 placeholder.
// Both the next expected index and the minimum pinned index start immediately
// after 'preceding_op'.
void LogCache::Init(const OpId& preceding_op) {
  std::lock_guard<simple_spinlock> guard(lock_);
  CHECK_EQ(cache_.size(), 1) << "Cache should have only our special '0' op";
  const int64_t first_new_index = preceding_op.index() + 1;
  next_sequential_op_index_ = first_new_index;
  min_pinned_op_index_ = first_new_index;
}
// Locked entry point for TruncateOpsAfterUnlocked(). It discards every cached
// op whose index is greater than 'index'.
void LogCache::TruncateOpsAfter(int64_t index) {
  std::lock_guard<simple_spinlock> guard(lock_);
  TruncateOpsAfterUnlocked(index);
}
// Removes every cached op with index in (index, next_sequential_op_index_),
// undoing its memory/metric accounting, then rewinds next_sequential_op_index_
// to 'index + 1'. The caller must hold lock_.
//
// CHECK-fails if 'index + 1' lies beyond the next expected index: truncation
// may only rewind the log, never leave a gap.
void LogCache::TruncateOpsAfterUnlocked(int64_t index) {
  const int64_t first_to_truncate = index + 1;
  // If the index is not consecutive then it must be lower than or equal
  // to the last index, i.e. we're overwriting.
  CHECK_LE(first_to_truncate, next_sequential_op_index_);

  // Walk the overwritten range in key order. Entries in that range that were
  // already evicted are simply absent from the map.
  auto it = cache_.lower_bound(first_to_truncate);
  while (it != cache_.end() && it->first < next_sequential_op_index_) {
    AccountForMessageRemovalUnlocked(it->second);
    it = cache_.erase(it);
  }
  next_sequential_op_index_ = first_to_truncate;
}
// Adds the non-empty batch 'msgs' to the cache and hands it to the WAL for an
// asynchronous append. 'callback' is invoked (through LogCallback()) when that
// append completes.
//
// The batch is assumed to be ordered by index: the first and last indexes are
// read from front()/back(). If the first index does not directly follow the
// last cached op, the cache is first truncated so that the batch overwrites
// everything from that index on.
//
// The batch's memory is always charged to the tracker. If a limit would be
// exceeded, unpinned ops are evicted first and the charge is then forced
// through anyway.
//
// Returns the WAL's error if the asynchronous append could not be started.
// In that case the entries remain in the cache.
Status LogCache::AppendOperations(vector<ReplicateRefPtr> msgs,
                                  const StatusCallback& callback) {
  CHECK_GT(msgs.size(), 0);

  // SpaceUsed is relatively expensive, so do calculations outside the lock
  // and cache the result with each message.
  int64_t mem_required = 0;
  vector<CacheEntry> entries_to_insert;
  entries_to_insert.reserve(msgs.size());
  for (const auto& msg : msgs) {
    CacheEntry e = { msg, msg->get()->SpaceUsedLong() };
    mem_required += e.mem_usage;
    entries_to_insert.emplace_back(std::move(e));
  }

  int64_t first_idx_in_batch = msgs.front()->get()->id().index();
  int64_t last_idx_in_batch = msgs.back()->get()->id().index();

  std::unique_lock<simple_spinlock> l(lock_);
  // If we're not appending a consecutive op we're likely overwriting and
  // need to replace operations in the cache.
  if (first_idx_in_batch != next_sequential_op_index_) {
    TruncateOpsAfterUnlocked(first_idx_in_batch - 1);
  }

  // Try to consume the memory. If it can't be consumed, we may need to evict.
  bool borrowed_memory = false;
  if (!tracker_->TryConsume(mem_required)) {
    // Keep these byte counts 64-bit. Narrowing SpareCapacity() or the
    // difference to 'int' silently truncates once limits exceed 2 GiB.
    const int64_t spare = tracker_->SpareCapacity();
    const int64_t need_to_free = mem_required - spare;
    VLOG_WITH_PREFIX_UNLOCKED(1) << "Memory limit would be exceeded trying to append "
                        << HumanReadableNumBytes::ToString(mem_required)
                        << " to log cache (available="
                        << HumanReadableNumBytes::ToString(spare)
                        << "): attempting to evict some operations...";

    // TODO: we should also try to evict from other tablets - probably better to
    // evict really old ops from another tablet than evict recent ops from this one.
    EvictSomeUnlocked(min_pinned_op_index_, need_to_free);

    // Force consuming, so that we don't refuse appending data. We might
    // blow past our limit a little bit (as much as the number of tablets times
    // the amount of in-flight data in the log), but until implementing the above TODO,
    // it's difficult to solve this issue.
    tracker_->Consume(mem_required);

    // Remember whether the forced charge pushed the server-wide tracker over
    // its limit, so LogCallback() can shed the excess once this batch is unpinned.
    borrowed_memory = parent_tracker_->LimitExceeded();
  }

  for (auto& e : entries_to_insert) {
    auto index = e.msg->get()->id().index();
    EmplaceOrDie(&cache_, index, std::move(e));
    next_sequential_op_index_ = index + 1;
  }

  // We drop the lock during the AsyncAppendReplicates call, since it may block
  // if the queue is full, and the queue might not drain if it's trying to call
  // our callback and blocked on this lock.
  l.unlock();

  metrics_.log_cache_size->IncrementBy(mem_required);
  metrics_.log_cache_num_ops->IncrementBy(msgs.size());

  Status log_status = log_->AsyncAppendReplicates(
      std::move(msgs), [this, last_idx_in_batch, borrowed_memory, callback](const Status& s) {
        this->LogCallback(last_idx_in_batch, borrowed_memory, callback, s);
      });

  if (!log_status.ok()) {
    LOG_WITH_PREFIX_UNLOCKED(ERROR) << "Couldn't append to log: " << log_status.ToString();
    // Deliberately do NOT release 'mem_required' here. The batch's entries
    // are still in cache_ (and still counted in the metrics), and each one
    // releases its own mem_usage when it is later truncated or evicted via
    // AccountForMessageRemovalUnlocked(). Releasing here as well would
    // double-release the memory and drive the tracker below what the cache
    // actually holds.
    return log_status;
  }

  return Status::OK();
}
// Completion hook for the WAL append started by AppendOperations().
//
// On success it does two things under lock_:
//  - unpins the batch by moving min_pinned_op_index_ past 'last_idx_in_batch'
//    (the index is never moved backwards);
//  - if the append had to force memory past the global limit
//    ('borrowed_memory'), evicts unpinned ops to shed the overage.
//
// 'user_callback' always receives 'log_status', and is called with lock_
// released.
void LogCache::LogCallback(int64_t last_idx_in_batch,
                           bool borrowed_memory,
                           const StatusCallback& user_callback,
                           const Status& log_status) {
  if (!log_status.ok()) {
    user_callback(log_status);
    return;
  }

  {
    std::lock_guard<simple_spinlock> guard(lock_);
    const bool batch_still_pinned = min_pinned_op_index_ <= last_idx_in_batch;
    if (batch_still_pinned) {
      VLOG_WITH_PREFIX_UNLOCKED(1) << "Updating pinned index to " << (last_idx_in_batch + 1);
      min_pinned_op_index_ = last_idx_in_batch + 1;
    }

    // If we went over the global limit in order to log this batch, evict some to
    // get back down under the limit.
    const int64_t global_overage =
        borrowed_memory ? -parent_tracker_->SpareCapacity() : 0;
    if (global_overage > 0) {
      EvictSomeUnlocked(min_pinned_op_index_, global_overage);
    }
  }

  user_callback(log_status);
}
// Returns true if an op with 'index' has already been appended to this cache,
// i.e. 'index' is below the next index the cache expects. This does not imply
// that the WAL append has completed.
bool LogCache::HasOpBeenWritten(int64_t index) const {
  std::lock_guard<simple_spinlock> guard(lock_);
  const bool already_appended = index < next_sequential_op_index_;
  return already_appended;
}
// Stores the OpId of the op at 'op_index' into '*op_id'.
//
// Lookup order:
//  1. Indexes at or beyond the next expected index yield Status::Incomplete
//     without touching disk.
//  2. Ops still in the cache are answered from memory.
//  3. Anything else is delegated to the log reader, outside the lock.
Status LogCache::LookupOpId(int64_t op_index, OpId* op_id) const {
  {
    std::lock_guard<simple_spinlock> guard(lock_);

    // We sometimes try to look up OpIds that have never been written
    // on the local node. In that case, don't try to read the op from
    // the log reader, since it might actually race against the writing
    // of the op.
    if (op_index >= next_sequential_op_index_) {
      return Status::Incomplete(Substitute("Op with index $0 is ahead of the local log "
                                           "(next sequential op: $1)",
                                           op_index, next_sequential_op_index_));
    }

    const auto cached = cache_.find(op_index);
    if (cached != cache_.end()) {
      *op_id = cached->second.msg->get()->id();
      return Status::OK();
    }
  }

  // Cache miss: fall back to the on-disk log.
  return log_->reader()->LookupOpId(op_index, op_id);
}
namespace {
// Calculate the total byte size that will be used on the wire to replicate
// this message as part of a consensus update request. This accounts for the
// length delimiting and tagging of the message.
int64_t TotalByteSizeForMessage(const ReplicateMsg& msg) {
int64_t msg_size = google::protobuf::internal::WireFormatLite::LengthDelimitedSize(
msg.ByteSizeLong());
msg_size += 1; // for the type tag
return msg_size;
}
} // anonymous namespace
// Collects the ops following 'after_op_index' into 'messages' and sets
// '*preceding_op' to the OpId at 'after_op_index'.
//
// Size budget: reading stops once roughly 'max_size_bytes' of wire size
// (TotalByteSizeForMessage) has been gathered. When 'max_size_bytes' > 0 and
// an op is available, at least one op is always returned, even if it alone
// exceeds the budget.
//
// Sources: ops present in the cache are served from memory. Gaps are filled
// from the on-disk log with lock_ dropped, so the cache may change
// concurrently during those reads.
//
// Errors: propagates LookupOpId() errors (e.g. Incomplete when
// 'after_op_index' is ahead of the local log) and log-reader failures.
Status LogCache::ReadOps(int64_t after_op_index,
                         int64_t max_size_bytes,
                         std::vector<ReplicateRefPtr>* messages,
                         OpId* preceding_op) {
  DCHECK_GE(after_op_index, 0);
  RETURN_NOT_OK(LookupOpId(after_op_index, preceding_op));

  std::unique_lock<simple_spinlock> l(lock_);
  int64_t next_index = after_op_index + 1;

  // Return as many operations as we can, up to the limit
  int64_t remaining_space = max_size_bytes;
  while (remaining_space > 0 && next_index < next_sequential_op_index_) {

    // If the messages the peer needs haven't been loaded into the queue yet,
    // load them.
    MessageCache::const_iterator iter = cache_.lower_bound(next_index);
    if (iter == cache_.end() || iter->first != next_index) {
      int64_t up_to;
      if (iter == cache_.end()) {
        // Read all the way to the current op
        up_to = next_sequential_op_index_ - 1;
      } else {
        // Read up to the next entry that's in the cache
        up_to = iter->first - 1;
      }

      l.unlock();

      vector<ReplicateMsg*> raw_replicate_ptrs;
      RETURN_NOT_OK_PREPEND(
          log_->reader()->ReadReplicatesInRange(
              next_index, up_to, remaining_space, &raw_replicate_ptrs),
          Substitute("Failed to read ops $0..$1", next_index, up_to));
      l.lock();
      VLOG_WITH_PREFIX_UNLOCKED(2)
          << "Successfully read " << raw_replicate_ptrs.size() << " ops "
          << "from disk (" << next_index << ".."
          << (next_index + raw_replicate_ptrs.size() - 1) << ")";

      // The reader must return a contiguous run starting at 'next_index'.
      // Track the expected index separately: 'next_index' only advances for
      // ops we keep, so once the budget is exhausted and ops start being
      // dropped it no longer matches the following message. Checking against
      // it would CHECK-fail whenever two or more ops from one read are dropped.
      int64_t expected_index = next_index;
      for (ReplicateMsg* msg : raw_replicate_ptrs) {
        CHECK_EQ(expected_index, msg->id().index());
        expected_index++;

        remaining_space -= TotalByteSizeForMessage(*msg);
        if (remaining_space > 0 || messages->empty()) {
          messages->push_back(make_scoped_refptr_replicate(msg));
          next_index++;
        } else {
          delete msg;
        }
      }

    } else {
      // Pull contiguous messages from the cache until the size limit is achieved.
      for (; iter != cache_.end(); ++iter) {
        const ReplicateRefPtr& msg = iter->second.msg;
        int64_t index = msg->get()->id().index();
        // Keys are sorted, so once an entry is not the next contiguous index,
        // no later entry can be either. Stop here and let the outer loop fill
        // the gap from disk, rather than scanning the rest of the cache.
        if (index != next_index) {
          break;
        }

        remaining_space -= TotalByteSizeForMessage(*msg->get());
        if (remaining_space < 0 && !messages->empty()) {
          break;
        }

        messages->push_back(msg);
        next_index++;
      }
    }
  }
  return Status::OK();
}
void LogCache::EvictThroughOp(int64_t index) {
std::lock_guard<simple_spinlock> lock(lock_);
EvictSomeUnlocked(index, MathLimits<int64_t>::kMax);
}
void LogCache::EvictSomeUnlocked(int64_t stop_after_index, int64_t bytes_to_evict) {
DCHECK(lock_.is_locked());
VLOG_WITH_PREFIX_UNLOCKED(2) << "Evicting log cache index <= "
<< stop_after_index
<< " or " << HumanReadableNumBytes::ToString(bytes_to_evict)
<< ": before state: " << ToStringUnlocked();
int64_t bytes_evicted = 0;
for (auto iter = cache_.begin(); iter != cache_.end();) {
const CacheEntry& entry = (*iter).second;
const ReplicateRefPtr& msg = entry.msg;
VLOG_WITH_PREFIX_UNLOCKED(2) << "considering for eviction: " << msg->get()->id();
int64_t msg_index = msg->get()->id().index();
if (msg_index == 0) {
// Always keep our special '0' op.
++iter;
continue;
}
if (msg_index > stop_after_index || msg_index >= min_pinned_op_index_) {
break;
}
if (!msg->HasOneRef()) {
VLOG_WITH_PREFIX_UNLOCKED(2) << "Evicting cache: cannot remove " << msg->get()->id()
<< " because it is in-use by a peer.";
++iter;
continue;
}
VLOG_WITH_PREFIX_UNLOCKED(2) << "Evicting cache. Removing: " << msg->get()->id();
AccountForMessageRemovalUnlocked(entry);
bytes_evicted += entry.mem_usage;
cache_.erase(iter++);
if (bytes_evicted >= bytes_to_evict) {
break;
}
}
VLOG_WITH_PREFIX_UNLOCKED(1) << "Evicting log cache: after state: " << ToStringUnlocked();
}
// Reverses the memory-tracker and metric charges made for one cache entry.
// It does not erase the entry from cache_; callers do that themselves.
void LogCache::AccountForMessageRemovalUnlocked(const LogCache::CacheEntry& entry) {
  const auto freed_bytes = entry.mem_usage;
  tracker_->Release(freed_bytes);
  metrics_.log_cache_size->DecrementBy(freed_bytes);
  metrics_.log_cache_num_ops->Decrement();
}
// Returns the bytes currently charged to this tablet's memory tracker.
// It reads the tracker directly, without taking lock_.
int64_t LogCache::BytesUsed() const {
  const int64_t charged_bytes = tracker_->consumption();
  return charged_bytes;
}
// Locked wrapper around StatsStringUnlocked().
string LogCache::StatsString() const {
  std::lock_guard<simple_spinlock> guard(lock_);
  string stats = StatsStringUnlocked();
  return stats;
}
// Formats the op-count and byte-size gauges as
// "LogCacheStats(num_ops=<n>, bytes=<b>)".
string LogCache::StatsStringUnlocked() const {
  const auto cached_ops = metrics_.log_cache_num_ops->value();
  const auto cached_bytes = metrics_.log_cache_size->value();
  return Substitute("LogCacheStats(num_ops=$0, bytes=$1)", cached_ops, cached_bytes);
}
std::string LogCache::ToString() const {
std::lock_guard<simple_spinlock> lock(lock_);
return ToStringUnlocked();
}
// Describes the cache as its minimum pinned index followed by the stats
// string. The caller is expected to hold lock_.
std::string LogCache::ToStringUnlocked() const {
  const std::string stats = StatsStringUnlocked();
  return Substitute("Pinned index: $0, $1", min_pinned_op_index_, stats);
}
// Prefix for this cache's log lines: "T <tablet_id> P <local_uuid>: ".
// It reads only fields set at construction, so no lock is taken.
std::string LogCache::LogPrefixUnlocked() const {
  return Substitute("T $0 P $1: ", tablet_id_, local_uuid_);
}
void LogCache::DumpToLog() const {
vector<string> strings;
DumpToStrings(&strings);
for (const string& s : strings) {
LOG_WITH_PREFIX_UNLOCKED(INFO) << s;
}
}
// Appends a snapshot of the cache to '*lines':
//  - a summary line (ToStringUnlocked());
//  - a "Messages:" header;
//  - one line per cached entry, in index order.
void LogCache::DumpToStrings(vector<string>* lines) const {
  std::lock_guard<simple_spinlock> guard(lock_);
  lines->push_back(ToStringUnlocked());
  lines->push_back("Messages:");
  int position = 0;
  for (const auto& kv : cache_) {
    const ReplicateMsg& msg = *kv.second.msg->get();
    lines->push_back(
        Substitute("Message[$0] $1.$2 : REPLICATE. Type: $3, Size: $4",
                   position, msg.id().term(), msg.id().index(),
                   OperationType_Name(msg.op_type()),
                   msg.ByteSizeLong()));
    ++position;
  }
}
void LogCache::DumpToHtml(std::ostream& out) const {
using std::endl;
std::lock_guard<simple_spinlock> lock(lock_);
out << "<h3>Messages:</h3>" << endl;
out << "<table>" << endl;
out << "<tr><th>Entry</th><th>OpId</th><th>Type</th><th>Size</th><th>Status</th></tr>" << endl;
int counter = 0;
for (const auto& entry : cache_) {
const ReplicateMsg* msg = entry.second.msg->get();
out << Substitute("<tr><th>$0</th><th>$1.$2</th><td>REPLICATE $3</td>"
"<td>$4</td><td>$5</td></tr>",
counter++, msg->id().term(), msg->id().index(),
OperationType_Name(msg->op_type()),
msg->ByteSizeLong(), SecureShortDebugString(msg->id())) << endl;
}
out << "</table>";
}
// Instantiates this cache's gauges on 'metric_entity', each starting at 0.
LogCache::Metrics::Metrics(const scoped_refptr<MetricEntity>& metric_entity)
  : log_cache_num_ops(METRIC_log_cache_num_ops.Instantiate(metric_entity, 0)),
    log_cache_size(METRIC_log_cache_size.Instantiate(metric_entity, 0)) {
}
} // namespace consensus
} // namespace kudu