blob: 3d60c47ed12ee9d46df597643ea34399841f1af7 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "kudu/tablet/tablet_replica.h"
#include <cstdint>
#include <memory>
#include <ostream>
#include <string>
#include <utility>
#include <boost/optional/optional.hpp>
#include <gflags/gflags.h>
#include <glog/logging.h>
#include <gtest/gtest.h>
#include "kudu/common/common.pb.h"
#include "kudu/common/partial_row.h"
#include "kudu/common/row_operations.h"
#include "kudu/common/schema.h"
#include "kudu/common/wire_protocol.h"
#include "kudu/common/wire_protocol.pb.h"
#include "kudu/consensus/consensus.pb.h"
#include "kudu/consensus/consensus_meta.h"
#include "kudu/consensus/consensus_meta_manager.h"
#include "kudu/consensus/log.h"
#include "kudu/consensus/log_anchor_registry.h"
#include "kudu/consensus/log_reader.h"
#include "kudu/consensus/log_util.h"
#include "kudu/consensus/metadata.pb.h"
#include "kudu/consensus/opid.pb.h"
#include "kudu/consensus/opid_util.h"
#include "kudu/consensus/raft_consensus.h"
#include "kudu/fs/fs_manager.h"
#include "kudu/gutil/bind.h"
#include "kudu/gutil/bind_helpers.h"
#include "kudu/gutil/callback.h"
#include "kudu/gutil/gscoped_ptr.h"
#include "kudu/gutil/macros.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/rpc/messenger.h"
#include "kudu/rpc/result_tracker.h"
#include "kudu/tablet/tablet-test-util.h"
#include "kudu/tablet/tablet.h"
#include "kudu/tablet/tablet_bootstrap.h"
#include "kudu/tablet/tablet_metadata.h"
#include "kudu/tablet/tablet_replica_mm_ops.h"
#include "kudu/tablet/transactions/alter_schema_transaction.h"
#include "kudu/tablet/transactions/transaction.h"
#include "kudu/tablet/transactions/transaction_driver.h"
#include "kudu/tablet/transactions/transaction_tracker.h"
#include "kudu/tablet/transactions/write_transaction.h"
#include "kudu/tserver/tserver.pb.h"
#include "kudu/tserver/tserver_admin.pb.h"
#include "kudu/util/countdown_latch.h"
#include "kudu/util/maintenance_manager.h"
#include "kudu/util/metrics.h"
#include "kudu/util/monotime.h"
#include "kudu/util/net/dns_resolver.h"
#include "kudu/util/pb_util.h"
#include "kudu/util/random.h"
#include "kudu/util/status.h"
#include "kudu/util/test_macros.h"
#include "kudu/util/test_util.h"
#include "kudu/util/threadpool.h"
DECLARE_int32(flush_threshold_mb);
METRIC_DECLARE_entity(tablet);
METRIC_DECLARE_gauge_uint64(live_row_count);
using kudu::consensus::CommitMsg;
using kudu::consensus::ConsensusBootstrapInfo;
using kudu::consensus::ConsensusMetadata;
using kudu::consensus::ConsensusMetadataManager;
using kudu::consensus::OpId;
using kudu::consensus::RECEIVED_OPID;
using kudu::consensus::RaftConfigPB;
using kudu::consensus::RaftConsensus;
using kudu::consensus::RaftPeerPB;
using kudu::log::Log;
using kudu::log::LogOptions;
using kudu::pb_util::SecureDebugString;
using kudu::pb_util::SecureShortDebugString;
using kudu::rpc::Messenger;
using kudu::rpc::ResultTracker;
using kudu::tserver::AlterSchemaRequestPB;
using kudu::tserver::AlterSchemaResponsePB;
using kudu::tserver::WriteRequestPB;
using kudu::tserver::WriteResponsePB;
using std::shared_ptr;
using std::string;
using std::unique_ptr;
namespace kudu {
namespace tablet {
static Schema GetTestSchema() {
return Schema({ ColumnSchema("key", INT32) }, 1);
}
class TabletReplicaTest : public KuduTabletTest {
public:
TabletReplicaTest()
: KuduTabletTest(GetTestSchema()),
insert_counter_(0),
delete_counter_(0),
dns_resolver_(new DnsResolver) {
}
void SetUpReplica(bool new_replica = true) {
ASSERT_TRUE(tablet_replica_.get() == nullptr);
RaftConfigPB config;
config.set_opid_index(consensus::kInvalidOpIdIndex);
RaftPeerPB* config_peer = config.add_peers();
config_peer->set_permanent_uuid(tablet()->metadata()->fs_manager()->uuid());
config_peer->mutable_last_known_addr()->set_host("0.0.0.0");
config_peer->mutable_last_known_addr()->set_port(0);
config_peer->set_member_type(RaftPeerPB::VOTER);
if (new_replica) {
ASSERT_OK(cmeta_manager_->Create(tablet()->tablet_id(), config, consensus::kMinimumTerm));
}
// "Bootstrap" and start the TabletReplica.
tablet_replica_.reset(
new TabletReplica(tablet()->shared_metadata(),
cmeta_manager_,
*config_peer,
apply_pool_.get(),
Bind(&TabletReplicaTest::TabletReplicaStateChangedCallback,
Unretained(this),
tablet()->tablet_id())));
ASSERT_OK(tablet_replica_->Init({ /*quiescing*/nullptr,
/*num_leaders*/nullptr,
raft_pool_.get() }));
// Make TabletReplica use the same LogAnchorRegistry as the Tablet created by the harness.
// TODO(mpercy): Refactor TabletHarness to allow taking a
// LogAnchorRegistry, while also providing TabletMetadata for consumption
// by TabletReplica before Tablet is instantiated.
tablet_replica_->log_anchor_registry_ = tablet()->log_anchor_registry_;
}
virtual void SetUp() override {
KuduTabletTest::SetUp();
ASSERT_OK(ThreadPoolBuilder("prepare").Build(&prepare_pool_));
ASSERT_OK(ThreadPoolBuilder("apply").Build(&apply_pool_));
ASSERT_OK(ThreadPoolBuilder("raft").Build(&raft_pool_));
rpc::MessengerBuilder builder(CURRENT_TEST_NAME());
ASSERT_OK(builder.Build(&messenger_));
cmeta_manager_.reset(new ConsensusMetadataManager(fs_manager()));
metric_entity_ = METRIC_ENTITY_tablet.Instantiate(&metric_registry_, "test-tablet");
NO_FATALS(SetUpReplica());
}
Status StartReplica(const ConsensusBootstrapInfo& info) {
scoped_refptr<Log> log;
RETURN_NOT_OK(Log::Open(LogOptions(),
fs_manager(),
/*file_cache*/nullptr,
tablet()->tablet_id(),
*tablet()->schema(),
tablet()->metadata()->schema_version(),
metric_entity_.get(),
&log));
tablet_replica_->SetBootstrapping();
return tablet_replica_->Start(info,
tablet(),
clock(),
messenger_,
scoped_refptr<ResultTracker>(),
log,
prepare_pool_.get(),
dns_resolver_.get());
}
Status StartReplicaAndWaitUntilLeader(const ConsensusBootstrapInfo& info) {
RETURN_NOT_OK(StartReplica(info));
const MonoDelta kTimeout = MonoDelta::FromSeconds(10);
return tablet_replica_->consensus()->WaitUntilLeaderForTests(kTimeout);
}
void TabletReplicaStateChangedCallback(const string& tablet_id, const string& reason) {
LOG(INFO) << "Tablet replica state changed for tablet " << tablet_id << ". Reason: " << reason;
}
virtual void TearDown() override {
tablet_replica_->Shutdown();
prepare_pool_->Shutdown();
apply_pool_->Shutdown();
KuduTabletTest::TearDown();
}
void RestartReplica() {
tablet_replica_->Shutdown();
tablet_replica_.reset();
NO_FATALS(SetUpReplica(/*new_replica=*/ false));
scoped_refptr<ConsensusMetadata> cmeta;
ASSERT_OK(cmeta_manager_->Load(tablet_replica_->tablet_id(), &cmeta));
shared_ptr<Tablet> tablet;
scoped_refptr<Log> log;
ConsensusBootstrapInfo bootstrap_info;
tablet_replica_->SetBootstrapping();
ASSERT_OK(BootstrapTablet(tablet_replica_->tablet_metadata(),
cmeta->CommittedConfig(),
clock(),
/*mem_tracker*/nullptr,
/*result_tracker*/nullptr,
&metric_registry_,
/*file_cache*/nullptr,
tablet_replica_,
tablet_replica_->log_anchor_registry(),
&tablet,
&log,
&bootstrap_info));
ASSERT_OK(tablet_replica_->Start(bootstrap_info,
tablet,
clock(),
messenger_,
scoped_refptr<ResultTracker>(),
log,
prepare_pool_.get(),
dns_resolver_.get()));
// Wait for the replica to be usable.
const MonoDelta kTimeout = MonoDelta::FromSeconds(30);
ASSERT_OK(tablet_replica_->consensus()->WaitUntilLeaderForTests(kTimeout));
}
protected:
// Generate monotonic sequence of key column integers.
Status GenerateSequentialInsertRequest(const Schema& schema,
WriteRequestPB* write_req) {
write_req->set_tablet_id(tablet()->tablet_id());
RETURN_NOT_OK(SchemaToPB(schema, write_req->mutable_schema()));
KuduPartialRow row(&schema);
for (int i = 0; i < schema.num_columns(); i++) {
RETURN_NOT_OK(row.SetInt32(i, insert_counter_++));
}
RowOperationsPBEncoder enc(write_req->mutable_row_operations());
enc.Add(RowOperationsPB::INSERT, row);
return Status::OK();
}
// Generate monotonic sequence of deletions, starting with 0.
// Will assert if you try to delete more rows than you inserted.
Status GenerateSequentialDeleteRequest(WriteRequestPB* write_req) {
CHECK_LT(delete_counter_, insert_counter_);
Schema schema(GetTestSchema());
write_req->set_tablet_id(tablet()->tablet_id());
CHECK_OK(SchemaToPB(schema, write_req->mutable_schema()));
KuduPartialRow row(&schema);
CHECK_OK(row.SetInt32("key", delete_counter_++));
RowOperationsPBEncoder enc(write_req->mutable_row_operations());
enc.Add(RowOperationsPB::DELETE, row);
return Status::OK();
}
Status ExecuteWrite(TabletReplica* replica, const WriteRequestPB& req) {
unique_ptr<WriteResponsePB> resp(new WriteResponsePB());
unique_ptr<WriteTransactionState> tx_state(new WriteTransactionState(replica,
&req,
nullptr, // No RequestIdPB
resp.get()));
CountDownLatch rpc_latch(1);
tx_state->set_completion_callback(gscoped_ptr<TransactionCompletionCallback>(
new LatchTransactionCompletionCallback<WriteResponsePB>(&rpc_latch, resp.get())));
RETURN_NOT_OK(replica->SubmitWrite(std::move(tx_state)));
rpc_latch.Wait();
CHECK(!resp->has_error())
<< "\nReq:\n" << SecureDebugString(req) << "Resp:\n" << SecureDebugString(*resp);
return Status::OK();
}
Status UpdateSchema(const SchemaPB& schema, int schema_version) {
AlterSchemaRequestPB alter;
alter.set_dest_uuid(tablet()->metadata()->fs_manager()->uuid());
alter.set_tablet_id(tablet()->tablet_id());
alter.set_schema_version(schema_version);
*alter.mutable_schema() = schema;
return ExecuteAlter(tablet_replica_.get(), alter);
}
Status ExecuteAlter(TabletReplica* replica, const AlterSchemaRequestPB& req) {
unique_ptr<AlterSchemaResponsePB> resp(new AlterSchemaResponsePB());
unique_ptr<AlterSchemaTransactionState> tx_state(
new AlterSchemaTransactionState(replica, &req, resp.get()));
CountDownLatch rpc_latch(1);
tx_state->set_completion_callback(gscoped_ptr<TransactionCompletionCallback>(
new LatchTransactionCompletionCallback<AlterSchemaResponsePB>(&rpc_latch, resp.get())));
RETURN_NOT_OK(replica->SubmitAlterSchema(std::move(tx_state)));
rpc_latch.Wait();
CHECK(!resp->has_error())
<< "\nReq:\n" << SecureDebugString(req) << "Resp:\n" << SecureDebugString(*resp);
return Status::OK();
}
static Status RollLog(TabletReplica* replica) {
RETURN_NOT_OK(replica->log_->WaitUntilAllFlushed());
return replica->log_->AllocateSegmentAndRollOverForTests();
}
Status ExecuteWriteAndRollLog(TabletReplica* tablet_replica, const WriteRequestPB& req) {
RETURN_NOT_OK(ExecuteWrite(tablet_replica, req));
// Roll the log after each write.
// Usually the append thread does the roll and no additional sync is required. However in
// this test the thread that is appending is not the same thread that is rolling the log
// so we must make sure the Log's queue is flushed before we roll or we might have a race
// between the appender thread and the thread executing the test.
CHECK_OK(RollLog(tablet_replica));
return Status::OK();
}
// Execute insert requests and roll log after each one.
Status ExecuteInsertsAndRollLogs(int num_inserts) {
for (int i = 0; i < num_inserts; i++) {
gscoped_ptr<WriteRequestPB> req(new WriteRequestPB());
RETURN_NOT_OK(GenerateSequentialInsertRequest(GetTestSchema(), req.get()));
RETURN_NOT_OK(ExecuteWriteAndRollLog(tablet_replica_.get(), *req));
}
return Status::OK();
}
// Execute delete requests and roll log after each one.
Status ExecuteDeletesAndRollLogs(int num_deletes) {
for (int i = 0; i < num_deletes; i++) {
gscoped_ptr<WriteRequestPB> req(new WriteRequestPB());
CHECK_OK(GenerateSequentialDeleteRequest(req.get()));
CHECK_OK(ExecuteWriteAndRollLog(tablet_replica_.get(), *req));
}
return Status::OK();
}
// Assert that there are no log anchors held on the tablet replica.
//
// NOTE: when a transaction finishes and notifies the completion callback, it still is
// registered with the transaction tracker for a very short time before being
// destructed. So, this should always be called with an ASSERT_EVENTUALLY wrapper.
void AssertNoLogAnchors() {
// Make sure that there are no registered anchors in the registry
ASSERT_EQ(0, tablet_replica_->log_anchor_registry()->GetAnchorCountForTests());
}
// Assert that the Log GC() anchor is earlier than the latest OpId in the Log.
void AssertLogAnchorEarlierThanLogLatest() {
log::RetentionIndexes retention = tablet_replica_->GetRetentionIndexes();
boost::optional<OpId> last_log_opid = tablet_replica_->consensus()->GetLastOpId(RECEIVED_OPID);
ASSERT_NE(boost::none, last_log_opid);
ASSERT_LT(retention.for_durability, last_log_opid->index())
<< "Expected valid log anchor, got earliest opid: " << retention.for_durability
<< " (expected any value earlier than last log id: " << SecureShortDebugString(*last_log_opid)
<< ")";
}
// We disable automatic log GC. Don't leak those changes.
google::FlagSaver flag_saver_;
int32_t insert_counter_;
int32_t delete_counter_;
MetricRegistry metric_registry_;
scoped_refptr<MetricEntity> metric_entity_;
shared_ptr<Messenger> messenger_;
unique_ptr<ThreadPool> prepare_pool_;
unique_ptr<ThreadPool> apply_pool_;
unique_ptr<ThreadPool> raft_pool_;
unique_ptr<DnsResolver> dns_resolver_;
scoped_refptr<ConsensusMetadataManager> cmeta_manager_;
// Must be destroyed before thread pools.
scoped_refptr<TabletReplica> tablet_replica_;
};
// A Transaction that waits on the apply_continue latch inside of Apply().
class DelayedApplyTransaction : public WriteTransaction {
public:
DelayedApplyTransaction(CountDownLatch* apply_started,
CountDownLatch* apply_continue,
unique_ptr<WriteTransactionState> state)
: WriteTransaction(std::move(state), consensus::LEADER),
apply_started_(DCHECK_NOTNULL(apply_started)),
apply_continue_(DCHECK_NOTNULL(apply_continue)) {
}
virtual Status Apply(gscoped_ptr<CommitMsg>* commit_msg) override {
apply_started_->CountDown();
LOG(INFO) << "Delaying apply...";
apply_continue_->Wait();
LOG(INFO) << "Apply proceeding";
return WriteTransaction::Apply(commit_msg);
}
private:
CountDownLatch* apply_started_;
CountDownLatch* apply_continue_;
DISALLOW_COPY_AND_ASSIGN(DelayedApplyTransaction);
};
// Ensure that Log::GC() doesn't delete logs when the MRS has an anchor.
TEST_F(TabletReplicaTest, TestMRSAnchorPreventsLogGC) {
ConsensusBootstrapInfo info;
ASSERT_OK(StartReplicaAndWaitUntilLeader(info));
Log* log = tablet_replica_->log_.get();
int32_t num_gced;
ASSERT_EVENTUALLY([&]{ AssertNoLogAnchors(); });
log::SegmentSequence segments;
log->reader()->GetSegmentsSnapshot(&segments);
ASSERT_EQ(1, segments.size());
ASSERT_OK(ExecuteInsertsAndRollLogs(3));
log->reader()->GetSegmentsSnapshot(&segments);
ASSERT_EQ(4, segments.size());
NO_FATALS(AssertLogAnchorEarlierThanLogLatest());
ASSERT_GT(tablet_replica_->log_anchor_registry()->GetAnchorCountForTests(), 0);
// Ensure nothing gets deleted.
log::RetentionIndexes retention = tablet_replica_->GetRetentionIndexes();
ASSERT_OK(log->GC(retention, &num_gced));
ASSERT_EQ(0, num_gced) << "earliest needed: " << retention.for_durability;
// Flush MRS as needed to ensure that we don't have OpId anchors in the MRS.
tablet_replica_->tablet()->Flush();
ASSERT_EVENTUALLY([&]{ AssertNoLogAnchors(); });
// The first two segments should be deleted.
// The last is anchored due to the commit in the last segment being the last
// OpId in the log.
retention = tablet_replica_->GetRetentionIndexes();
ASSERT_OK(log->GC(retention, &num_gced));
ASSERT_EQ(2, num_gced) << "earliest needed: " << retention.for_durability;
log->reader()->GetSegmentsSnapshot(&segments);
ASSERT_EQ(2, segments.size());
}
// Ensure that Log::GC() doesn't delete logs when the DMS has an anchor.
TEST_F(TabletReplicaTest, TestDMSAnchorPreventsLogGC) {
ConsensusBootstrapInfo info;
ASSERT_OK(StartReplicaAndWaitUntilLeader(info));
Log* log = tablet_replica_->log_.get();
shared_ptr<RaftConsensus> consensus = tablet_replica_->shared_consensus();
int32_t num_gced;
ASSERT_EVENTUALLY([&]{ AssertNoLogAnchors(); });
log::SegmentSequence segments;
log->reader()->GetSegmentsSnapshot(&segments);
ASSERT_EQ(1, segments.size());
ASSERT_OK(ExecuteInsertsAndRollLogs(2));
log->reader()->GetSegmentsSnapshot(&segments);
ASSERT_EQ(3, segments.size());
// Flush MRS & GC log so the next mutation goes into a DMS.
ASSERT_OK(tablet_replica_->tablet()->Flush());
ASSERT_EVENTUALLY([&]{ AssertNoLogAnchors(); });
log::RetentionIndexes retention = tablet_replica_->GetRetentionIndexes();
ASSERT_OK(log->GC(retention, &num_gced));
// We will only GC 1, and have 1 left because the earliest needed OpId falls
// back to the latest OpId written to the Log if no anchors are set.
ASSERT_EQ(1, num_gced);
log->reader()->GetSegmentsSnapshot(&segments);
ASSERT_EQ(2, segments.size());
boost::optional<OpId> id = consensus->GetLastOpId(consensus::RECEIVED_OPID);
ASSERT_NE(boost::none, id);
LOG(INFO) << "Before: " << *id;
// We currently have no anchors and the last operation in the log is 0.3
// Before the below was ExecuteDeletesAndRollLogs(1) but that was breaking
// what I think is a wrong assertion.
// I.e. since 0.4 is the last operation that we know is in memory 0.4 is the
// last anchor we expect _and_ it's the last op in the log.
// Only if we apply two operations is the last anchored operation and the
// last operation in the log different.
// Execute a mutation.
ASSERT_OK(ExecuteDeletesAndRollLogs(2));
NO_FATALS(AssertLogAnchorEarlierThanLogLatest());
ASSERT_GT(tablet_replica_->log_anchor_registry()->GetAnchorCountForTests(), 0);
log->reader()->GetSegmentsSnapshot(&segments);
ASSERT_EQ(4, segments.size());
// Execute another couple inserts, but Flush it so it doesn't anchor.
ASSERT_OK(ExecuteInsertsAndRollLogs(2));
ASSERT_OK(tablet_replica_->tablet()->Flush());
log->reader()->GetSegmentsSnapshot(&segments);
ASSERT_EQ(6, segments.size());
// Ensure the delta and last insert remain in the logs, anchored by the delta.
// Note that this will allow GC of the 2nd insert done above.
retention = tablet_replica_->GetRetentionIndexes();
ASSERT_OK(log->GC(retention, &num_gced));
ASSERT_EQ(1, num_gced);
log->reader()->GetSegmentsSnapshot(&segments);
ASSERT_EQ(5, segments.size());
// Flush DMS to release the anchor.
tablet_replica_->tablet()->FlushBiggestDMS();
// Verify no anchors after Flush().
ASSERT_EVENTUALLY([&]{ AssertNoLogAnchors(); });
// We should only hang onto one segment due to no anchors.
// The last log OpId is the commit in the last segment, so it only anchors
// that segment, not the previous, because it's not the first OpId in the
// segment.
retention = tablet_replica_->GetRetentionIndexes();
ASSERT_OK(log->GC(retention, &num_gced));
ASSERT_EQ(3, num_gced);
log->reader()->GetSegmentsSnapshot(&segments);
ASSERT_EQ(2, segments.size());
}
// Ensure that Log::GC() doesn't compact logs with OpIds of active transactions.
TEST_F(TabletReplicaTest, TestActiveTransactionPreventsLogGC) {
ConsensusBootstrapInfo info;
ASSERT_OK(StartReplicaAndWaitUntilLeader(info));
Log* log = tablet_replica_->log_.get();
int32_t num_gced;
ASSERT_EVENTUALLY([&]{ AssertNoLogAnchors(); });
log::SegmentSequence segments;
log->reader()->GetSegmentsSnapshot(&segments);
ASSERT_EQ(1, segments.size());
ASSERT_OK(ExecuteInsertsAndRollLogs(4));
log->reader()->GetSegmentsSnapshot(&segments);
ASSERT_EQ(5, segments.size());
// Flush MRS as needed to ensure that we don't have OpId anchors in the MRS.
ASSERT_EQ(1, tablet_replica_->log_anchor_registry()->GetAnchorCountForTests());
tablet_replica_->tablet()->Flush();
// Verify no anchors after Flush().
ASSERT_EVENTUALLY([&]{ AssertNoLogAnchors(); });
// Now create a long-lived Transaction that hangs during Apply().
// Allow other transactions to go through. Logs should be populated, but the
// long-lived Transaction should prevent the log from being deleted since it
// is in-flight.
CountDownLatch rpc_latch(1);
CountDownLatch apply_started(1);
CountDownLatch apply_continue(1);
gscoped_ptr<WriteRequestPB> req(new WriteRequestPB());
gscoped_ptr<WriteResponsePB> resp(new WriteResponsePB());
{
// Long-running mutation.
ASSERT_OK(GenerateSequentialDeleteRequest(req.get()));
unique_ptr<WriteTransactionState> tx_state(new WriteTransactionState(tablet_replica_.get(),
req.get(),
nullptr, // No RequestIdPB
resp.get()));
tx_state->set_completion_callback(gscoped_ptr<TransactionCompletionCallback>(
new LatchTransactionCompletionCallback<WriteResponsePB>(&rpc_latch, resp.get())));
gscoped_ptr<DelayedApplyTransaction> transaction(
new DelayedApplyTransaction(&apply_started,
&apply_continue,
std::move(tx_state)));
scoped_refptr<TransactionDriver> driver;
ASSERT_OK(tablet_replica_->NewLeaderTransactionDriver(transaction.PassAs<Transaction>(),
&driver));
ASSERT_OK(driver->ExecuteAsync());
apply_started.Wait();
ASSERT_TRUE(driver->GetOpId().IsInitialized())
<< "By the time a transaction is applied, it should have an Opid";
// The apply will hang until we CountDown() the continue latch.
// Now, roll the log. Below, we execute a few more insertions with rolling.
ASSERT_OK(log->AllocateSegmentAndRollOverForTests());
}
ASSERT_EQ(1, tablet_replica_->txn_tracker_.GetNumPendingForTests());
// The log anchor is currently equal to the latest OpId written to the Log
// because we are delaying the Commit message with the CountDownLatch.
// GC the first four segments created by the inserts.
log::RetentionIndexes retention = tablet_replica_->GetRetentionIndexes();
ASSERT_OK(log->GC(retention, &num_gced));
ASSERT_EQ(4, num_gced);
log->reader()->GetSegmentsSnapshot(&segments);
ASSERT_EQ(2, segments.size());
// We use mutations here, since an MRS Flush() quiesces the tablet, and we
// want to ensure the only thing "anchoring" is the TransactionTracker.
ASSERT_OK(ExecuteDeletesAndRollLogs(3));
log->reader()->GetSegmentsSnapshot(&segments);
ASSERT_EQ(5, segments.size());
ASSERT_EQ(1, tablet_replica_->log_anchor_registry()->GetAnchorCountForTests());
tablet_replica_->tablet()->FlushBiggestDMS();
ASSERT_EVENTUALLY([&]{
AssertNoLogAnchors();
ASSERT_EQ(1, tablet_replica_->txn_tracker_.GetNumPendingForTests());
});
NO_FATALS(AssertLogAnchorEarlierThanLogLatest());
// Try to GC(), nothing should be deleted due to the in-flight transaction.
retention = tablet_replica_->GetRetentionIndexes();
ASSERT_OK(log->GC(retention, &num_gced));
ASSERT_EQ(0, num_gced);
log->reader()->GetSegmentsSnapshot(&segments);
ASSERT_EQ(5, segments.size());
// Now we release the transaction and wait for everything to complete.
// We fully quiesce and flush, which should release all anchors.
ASSERT_EQ(1, tablet_replica_->txn_tracker_.GetNumPendingForTests());
apply_continue.CountDown();
rpc_latch.Wait();
tablet_replica_->txn_tracker_.WaitForAllToFinish();
ASSERT_EQ(0, tablet_replica_->txn_tracker_.GetNumPendingForTests());
tablet_replica_->tablet()->FlushBiggestDMS();
ASSERT_EVENTUALLY([&]{ AssertNoLogAnchors(); });
// All should be deleted except the two last segments.
retention = tablet_replica_->GetRetentionIndexes();
ASSERT_OK(log->GC(retention, &num_gced));
ASSERT_EQ(3, num_gced);
log->reader()->GetSegmentsSnapshot(&segments);
ASSERT_EQ(2, segments.size());
}
TEST_F(TabletReplicaTest, TestGCEmptyLog) {
ConsensusBootstrapInfo info;
ASSERT_OK(StartReplica(info));
// We don't wait on consensus on purpose.
ASSERT_OK(tablet_replica_->RunLogGC());
}
TEST_F(TabletReplicaTest, TestFlushOpsPerfImprovements) {
FLAGS_flush_threshold_mb = 64;
MaintenanceOpStats stats;
// Just on the threshold and not enough time has passed for a time-based flush.
stats.set_ram_anchored(64 * 1024 * 1024);
FlushOpPerfImprovementPolicy::SetPerfImprovementForFlush(&stats, 1);
ASSERT_EQ(0.0, stats.perf_improvement());
stats.Clear();
// Just on the threshold and enough time has passed, we'll have a low improvement.
stats.set_ram_anchored(64 * 1024 * 1024);
FlushOpPerfImprovementPolicy::SetPerfImprovementForFlush(&stats, 3 * 60 * 1000);
ASSERT_GT(stats.perf_improvement(), 0.01);
stats.Clear();
// Over the threshold, we expect improvement equal to the excess MB.
stats.set_ram_anchored(128 * 1024 * 1024);
FlushOpPerfImprovementPolicy::SetPerfImprovementForFlush(&stats, 1);
ASSERT_NEAR(stats.perf_improvement(), 64, 0.01);
stats.Clear();
// Below the threshold but have been there a long time, closing in to 1.0.
stats.set_ram_anchored(30 * 1024 * 1024);
FlushOpPerfImprovementPolicy::SetPerfImprovementForFlush(&stats, 60 * 50 * 1000);
ASSERT_LT(0.7, stats.perf_improvement());
ASSERT_GT(1.0, stats.perf_improvement());
stats.Clear();
}
// Test that the schema of a tablet will be rolled forward upon replaying an
// alter schema request.
TEST_F(TabletReplicaTest, TestRollLogSegmentSchemaOnAlter) {
ConsensusBootstrapInfo info;
ASSERT_OK(StartReplicaAndWaitUntilLeader(info));
SchemaPB orig_schema_pb;
ASSERT_OK(SchemaToPB(SchemaBuilder(tablet()->metadata()->schema()).Build(), &orig_schema_pb));
const int orig_schema_version = tablet()->metadata()->schema_version();
// Add a new column.
SchemaBuilder builder(tablet()->metadata()->schema());
ASSERT_OK(builder.AddColumn("new_col", INT32));
Schema new_client_schema = builder.BuildWithoutIds();
SchemaPB new_schema;
ASSERT_OK(SchemaToPB(builder.Build(), &new_schema));
ASSERT_OK(UpdateSchema(new_schema, orig_schema_version + 1));
const auto write = [&] {
unique_ptr<WriteRequestPB> req(new WriteRequestPB());
ASSERT_OK(GenerateSequentialInsertRequest(new_client_schema, req.get()));
ASSERT_OK(ExecuteWrite(tablet_replica_.get(), *req));
};
// Upon restarting, our log segment header schema should have "new_col".
NO_FATALS(write());
NO_FATALS(RestartReplica());
// Get rid of the alter in the WALs.
NO_FATALS(write());
ASSERT_OK(RollLog(tablet_replica_.get()));
NO_FATALS(write());
tablet_replica_->tablet()->Flush();
ASSERT_OK(tablet_replica_->RunLogGC());
// Now write some more and restart. If our segment header schema previously
// didn't have "new_col", bootstrapping would fail, complaining about a
// mismatch between the segment header schema and the write request schema.
NO_FATALS(write());
NO_FATALS(RestartReplica());
}
// Regression test for KUDU-2690, wherein a alter schema request that failed
// (e.g. because of an invalid schema) would roll forward the log segment
// header schema, causing a failure or crash upon bootstrapping.
TEST_F(TabletReplicaTest, Kudu2690Test) {
ConsensusBootstrapInfo info;
ASSERT_OK(StartReplicaAndWaitUntilLeader(info));
SchemaPB orig_schema_pb;
ASSERT_OK(SchemaToPB(SchemaBuilder(tablet()->metadata()->schema()).Build(), &orig_schema_pb));
const int orig_schema_version = tablet()->metadata()->schema_version();
// First things first, add a new column.
SchemaBuilder builder(tablet()->metadata()->schema());
ASSERT_OK(builder.AddColumn("new_col", INT32));
Schema new_client_schema = builder.BuildWithoutIds();
SchemaPB new_schema;
ASSERT_OK(SchemaToPB(builder.Build(), &new_schema));
ASSERT_OK(UpdateSchema(new_schema, orig_schema_version + 1));
// Try to update the schema to an older version. Before the fix for
// KUDU-2690, this would revert the schema in the next log segment header
// upon rolling the log below.
ASSERT_OK(UpdateSchema(orig_schema_pb, orig_schema_version));
// Roll onto a new segment so we can begin filling a new segment. This allows
// us to GC the first segment.
ASSERT_OK(RollLog(tablet_replica_.get()));
{
unique_ptr<WriteRequestPB> req(new WriteRequestPB());
ASSERT_OK(GenerateSequentialInsertRequest(new_client_schema, req.get()));
ASSERT_OK(ExecuteWrite(tablet_replica_.get(), *req));
}
ASSERT_OK(tablet_replica_->RunLogGC());
// Before KUDU-2960 was fixed, bootstrapping would fail, complaining that the
// write requests contained a column that was not in the log segment header's
// schema.
NO_FATALS(RestartReplica());
}
TEST_F(TabletReplicaTest, TestLiveRowCountMetric) {
ConsensusBootstrapInfo info;
ASSERT_OK(StartReplicaAndWaitUntilLeader(info));
auto live_row_count = METRIC_live_row_count.InstantiateFunctionGauge(
tablet_replica_->tablet()->GetMetricEntity(), Callback<uint64_t(void)>());
ASSERT_EQ(0, live_row_count->value());
// Insert some rows.
Random rand(SeedRandom());
const int kNumInsert = rand.Next() % 100 + 1;
ASSERT_OK(ExecuteInsertsAndRollLogs(kNumInsert));
ASSERT_EQ(kNumInsert, live_row_count->value());
// Delete some rows.
const int kNumDelete = rand.Next() % kNumInsert;
ASSERT_OK(ExecuteDeletesAndRollLogs(kNumDelete));
ASSERT_EQ(kNumInsert - kNumDelete, live_row_count->value());
}
} // namespace tablet
} // namespace kudu