// blob: b9a71f1aaff6ba0ff00cde47aa9b82c9fdcabe49 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <gtest/gtest.h>
#include <memory>
#include <string>
#include "kudu/common/wire_protocol-test-util.h"
#include "kudu/consensus/consensus-test-util.h"
#include "kudu/consensus/log.h"
#include "kudu/consensus/log_cache.h"
#include "kudu/fs/fs_manager.h"
#include "kudu/gutil/bind_helpers.h"
#include "kudu/gutil/stl_util.h"
#include "kudu/server/hybrid_clock.h"
#include "kudu/util/mem_tracker.h"
#include "kudu/util/metrics.h"
#include "kudu/util/test_util.h"
using std::shared_ptr;
DECLARE_int32(log_cache_size_limit_mb);
DECLARE_int32(global_log_cache_size_limit_mb);
METRIC_DECLARE_entity(tablet);
namespace kudu {
namespace consensus {
// Peer UUID handed to the LogCache under test.
// The pointer itself is const so the constant cannot be re-pointed by accident.
static const char* const kPeerUuid = "leader";
// Tablet id used both when opening the Log and when constructing the LogCache.
static const char* const kTestTablet = "test-tablet";
// Test fixture for LogCache.
//
// SetUp() builds, in order: an FsManager rooted under the test directory, a
// Log for kTestTablet on top of it, a LogCache wrapping that log, and a
// HybridClock used to timestamp the dummy replicate messages appended by
// AppendReplicateMessagesToCache().
class LogCacheTest : public KuduTest {
 public:
  LogCacheTest()
    : schema_(GetSimpleTestSchema()),
      metric_entity_(METRIC_ENTITY_tablet.Instantiate(&metric_registry_, "LogCacheTest")) {
  }

  virtual void SetUp() OVERRIDE {
    KuduTest::SetUp();
    fs_manager_.reset(new FsManager(env_.get(), GetTestPath("fs_root")));
    ASSERT_OK(fs_manager_->CreateInitialFileSystemLayout());
    ASSERT_OK(fs_manager_->Open());
    // Use ASSERT_OK like the surrounding calls, so that a failure to open the
    // log fails this test instead of aborting the whole test binary.
    ASSERT_OK(log::Log::Open(log::LogOptions(),
                             fs_manager_.get(),
                             kTestTablet,
                             schema_,
                             0, // schema_version
                             NULL,
                             &log_));
    CloseAndReopenCache(MinimumOpId());
    clock_.reset(new server::HybridClock());
    ASSERT_OK(clock_->Init());
  }

  virtual void TearDown() OVERRIDE {
    // gtest still runs TearDown() when SetUp() returned early on a fatal
    // failure, in which case the log may never have been opened.
    if (log_.get() != NULL) {
      log_->WaitUntilAllFlushed();
    }
  }

  // Destroys the current cache (if any) and creates a new one on top of the
  // same log, initialized with 'preceding_id'.
  void CloseAndReopenCache(const OpId& preceding_id) {
    // Blow away the memtrackers before creating the new cache.
    cache_.reset();
    cache_.reset(new LogCache(metric_entity_,
                              log_.get(),
                              kPeerUuid,
                              kTestTablet));
    cache_->Init(preceding_id);
  }

 protected:
  // Callback handed to LogCache::AppendOperations(); CHECK-fails on a bad status.
  static void FatalOnError(const Status& s) {
    CHECK_OK(s);
  }

  // Appends 'count' dummy replicate messages to the cache, one
  // AppendOperations() call per message. Message i (for i in
  // [first, first + count)) gets index i and term i / 7, and carries a payload
  // of 'payload_size' bytes.
  //
  // Returns the first non-OK status from AppendOperations(), or OK if every
  // append succeeded.
  Status AppendReplicateMessagesToCache(
    int first,
    int count,
    int payload_size = 0) {

    for (int i = first; i < first + count; i++) {
      int term = i / 7;
      int index = i;
      vector<ReplicateRefPtr> msgs;
      msgs.push_back(make_scoped_refptr_replicate(
          CreateDummyReplicate(term, index, clock_->Now(), payload_size).release()));
      RETURN_NOT_OK(cache_->AppendOperations(msgs, Bind(&FatalOnError)));
    }
    return Status::OK();
  }

  const Schema schema_;
  MetricRegistry metric_registry_;
  scoped_refptr<MetricEntity> metric_entity_;
  gscoped_ptr<FsManager> fs_manager_;
  gscoped_ptr<LogCache> cache_;
  scoped_refptr<log::Log> log_;
  scoped_refptr<server::Clock> clock_;
};
// Exercises the basic append / read / evict cycle: appends ops 1..100, reads
// them back starting from several indexes, then evicts part of the cache and
// checks that reads spanning the evicted range still succeed.
TEST_F(LogCacheTest, TestAppendAndGetMessages) {
  // Byte budget passed to every ReadOps() call in this test.
  const int kMaxBatchBytes = 8 * 1024 * 1024;

  ASSERT_EQ(0, cache_->metrics_.log_cache_num_ops->value());
  ASSERT_EQ(0, cache_->metrics_.log_cache_size->value());
  ASSERT_OK(AppendReplicateMessagesToCache(1, 100));
  ASSERT_EQ(100, cache_->metrics_.log_cache_num_ops->value());
  ASSERT_GE(cache_->metrics_.log_cache_size->value(), 500);
  log_->WaitUntilAllFlushed();

  vector<ReplicateRefPtr> messages;
  OpId preceding;
  ASSERT_OK(cache_->ReadOps(0, kMaxBatchBytes, &messages, &preceding));
  EXPECT_EQ(100, messages.size());
  EXPECT_EQ("0.0", OpIdToString(preceding));

  // Get starting in the middle of the cache.
  messages.clear();
  ASSERT_OK(cache_->ReadOps(70, kMaxBatchBytes, &messages, &preceding));
  // Fatal assertion: messages[0] is dereferenced below, so an empty result
  // must stop the test here rather than index out of bounds.
  ASSERT_EQ(30, messages.size());
  EXPECT_EQ("10.70", OpIdToString(preceding));
  EXPECT_EQ("10.71", OpIdToString(messages[0]->get()->id()));

  // Get at the end of the cache
  messages.clear();
  ASSERT_OK(cache_->ReadOps(100, kMaxBatchBytes, &messages, &preceding));
  EXPECT_EQ(0, messages.size());
  EXPECT_EQ("14.100", OpIdToString(preceding));

  // Evict some and verify that the eviction took effect.
  cache_->EvictThroughOp(50);
  ASSERT_EQ(50, cache_->metrics_.log_cache_num_ops->value());

  // Can still read data that was evicted, since it got written through.
  messages.clear();
  ASSERT_OK(cache_->ReadOps(20, kMaxBatchBytes, &messages, &preceding));
  // Fatal for the same reason as above: messages[0] is accessed next.
  ASSERT_EQ(80, messages.size());
  EXPECT_EQ("2.20", OpIdToString(preceding));
  EXPECT_EQ("3.21", OpIdToString(messages[0]->get()->id()));
}
// Guards against a reader getting "stuck" behind an oversized message: even
// when a single message is larger than the requested batch size, ReadOps()
// must still hand back one message.
TEST_F(LogCacheTest, TestAlwaysYieldsAtLeastOneMessage) {
  // Each op carries a 2MB dummy payload.
  const int kPayloadSize = 2 * 1024 * 1024;
  const int kNumLargeOps = 4;
  // Far smaller than any single op appended below.
  const int kTinyBatchBytes = 100;

  // Fill the cache with several large ops and make sure they reached the log.
  ASSERT_OK(AppendReplicateMessagesToCache(1, kNumLargeOps, kPayloadSize));
  log_->WaitUntilAllFlushed();

  // Asking for only 100 bytes must still yield exactly one op.
  OpId preceding_id;
  vector<ReplicateRefPtr> ops;
  ASSERT_OK(cache_->ReadOps(0, kTinyBatchBytes, &ops, &preceding_id));
  ASSERT_EQ(1, ops.size());
}
// Tests the boundaries of ReadOps(): it returns all messages when queried for
// MinimumOpId(), an empty set of messages when queried for the last index in
// the cache, and an Incomplete status when queried for an index higher than
// its latest one.
TEST_F(LogCacheTest, TestCacheEdgeCases) {
  const int kMaxBytes = 100;

  // A single op (index 1) is all this test needs.
  ASSERT_OK(AppendReplicateMessagesToCache(1, 1));
  log_->WaitUntilAllFlushed();

  std::vector<ReplicateRefPtr> ops;
  OpId preceding_id;

  // Case 1: read from MinimumOpId().index().
  ASSERT_OK(cache_->ReadOps(0, kMaxBytes, &ops, &preceding_id));
  ASSERT_EQ(1, ops.size());
  ASSERT_OPID_EQ(MakeOpId(0, 0), preceding_id);
  ops.clear();
  preceding_id.Clear();

  // Case 2: 'after_op_index' is exactly the last index in the cache.
  ASSERT_OK(cache_->ReadOps(1, kMaxBytes, &ops, &preceding_id));
  ASSERT_EQ(0, ops.size());
  ASSERT_OPID_EQ(MakeOpId(0, 1), preceding_id);
  ops.clear();
  preceding_id.Clear();

  // Case 3: 'after_op_index' lies beyond the last index in the cache.
  Status status = cache_->ReadOps(2, kMaxBytes, &ops, &preceding_id);
  ASSERT_TRUE(status.IsIncomplete()) << "unexpected status: " << status.ToString();
  ASSERT_EQ(0, ops.size());
  ASSERT_FALSE(preceding_id.IsInitialized());
  ops.clear();
  preceding_id.Clear();

  // Case 4: after evicting from the cache, entries at the beginning of the
  // log must still be readable.
  cache_->EvictThroughOp(50);
  ASSERT_OK(cache_->ReadOps(0, kMaxBytes, &ops, &preceding_id));
  ASSERT_EQ(1, ops.size());
  ASSERT_OPID_EQ(MakeOpId(0, 0), preceding_id);
}
// Verifies the per-cache size limit, set here to 1MB through
// FLAGS_log_cache_size_limit_mb: appending past the limit trims the cache
// instead of failing, and explicit eviction gives the bytes back.
TEST_F(LogCacheTest, TestMemoryLimit) {
  FLAGS_log_cache_size_limit_mb = 1;
  // Recreate the cache after changing the flag.
  CloseAndReopenCache(MinimumOpId());
  const int kPayloadSize = 400 * 1024;

  // Limit should not be violated.
  ASSERT_OK(AppendReplicateMessagesToCache(1, 1, kPayloadSize));
  log_->WaitUntilAllFlushed();
  ASSERT_EQ(1, cache_->num_cached_ops());

  // Verify the size is right. It's not exactly kPayloadSize because of in-memory
  // overhead, etc.
  int size_with_one_msg = cache_->BytesUsed();
  ASSERT_GT(size_with_one_msg, 300 * 1024);
  ASSERT_LT(size_with_one_msg, 500 * 1024);

  // Add another operation which fits under the 1MB limit.
  ASSERT_OK(AppendReplicateMessagesToCache(2, 1, kPayloadSize));
  log_->WaitUntilAllFlushed();
  ASSERT_EQ(2, cache_->num_cached_ops());
  int size_with_two_msgs = cache_->BytesUsed();
  ASSERT_GT(size_with_two_msgs, 2 * 300 * 1024);
  ASSERT_LT(size_with_two_msgs, 2 * 500 * 1024);

  // Append a third operation, which would push the cache size above the 1MB
  // limit. The append must still succeed, and the cache is expected to have
  // trimmed itself back down to two ops of the same total size.
  LOG(INFO) << "appending op 3";
  ASSERT_OK(AppendReplicateMessagesToCache(3, 1, kPayloadSize));
  log_->WaitUntilAllFlushed();
  ASSERT_EQ(2, cache_->num_cached_ops());
  ASSERT_EQ(size_with_two_msgs, cache_->BytesUsed());

  // Test explicitly evicting one of the ops.
  cache_->EvictThroughOp(2);
  ASSERT_EQ(1, cache_->num_cached_ops());
  ASSERT_EQ(size_with_one_msg, cache_->BytesUsed());

  // Explicitly evict the last op.
  cache_->EvictThroughOp(3);
  ASSERT_EQ(0, cache_->num_cached_ops());
  // (expected, actual) order, matching every other assertion in this file.
  ASSERT_EQ(0, cache_->BytesUsed());
}
// Verifies that consumption charged to the cache's parent tracker counts
// against the global limit (FLAGS_global_log_cache_size_limit_mb): with most
// of the limit already consumed, only part of what is appended stays cached.
TEST_F(LogCacheTest, TestGlobalMemoryLimit) {
  const int kPayloadSize = 768 * 1024;
  const int kPreConsumedBytes = 3 * 1024 * 1024;
  const int kOneMegabyte = 1024 * 1024;

  FLAGS_global_log_cache_size_limit_mb = 4;
  CloseAndReopenCache(MinimumOpId());

  // Charge 3MB to the parent tracker for the rest of this scope, leaving
  // roughly 1MB of headroom under the 4MB limit set above.
  ScopedTrackedConsumption consumption(cache_->parent_tracker_, kPreConsumedBytes);

  // Both appends should succeed, but only one of the two ops should end up
  // cached because of the global limit.
  ASSERT_OK(AppendReplicateMessagesToCache(1, 2, kPayloadSize));
  log_->WaitUntilAllFlushed();
  ASSERT_EQ(1, cache_->num_cached_ops());
  ASSERT_LE(cache_->BytesUsed(), kOneMegabyte);
}
// Regression test: when an index is reused, the cache must replace the old
// message and keep the memtracker's consumption consistent. A previous bug
// mis-managed the tracked consumption when messages were replaced.
TEST_F(LogCacheTest, TestReplaceMessages) {
  const int kPayloadSize = 128 * 1024;
  const int kNumReplacements = 10;

  shared_ptr<MemTracker> mem_tracker = cache_->tracker_;
  ASSERT_EQ(0, mem_tracker->consumption());

  // Baseline: tracked consumption with a single op at index 1.
  ASSERT_OK(AppendReplicateMessagesToCache(1, 1, kPayloadSize));
  int bytes_for_one_op = mem_tracker->consumption();

  // Re-append at the same index repeatedly.
  for (int attempt = 0; attempt < kNumReplacements; attempt++) {
    ASSERT_OK(AppendReplicateMessagesToCache(1, 1, kPayloadSize));
  }
  log_->WaitUntilAllFlushed();

  // Consumption must match the single-op baseline, and the cache must report
  // exactly one op.
  EXPECT_EQ(bytes_for_one_op, mem_tracker->consumption());
  const std::string expected_summary =
      Substitute("Pinned index: 2, LogCacheStats(num_ops=1, bytes=$0)",
                 bytes_for_one_op);
  EXPECT_EQ(expected_summary, cache_->ToString());
}
} // namespace consensus
} // namespace kudu