// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "kudu/tablet/tablet_replica.h"

#include <cstdint>
#include <functional>
#include <memory>
#include <optional>
#include <ostream>
#include <string>
#include <thread>
#include <type_traits>
#include <utility>

#include <gflags/gflags.h>
#include <glog/logging.h>
#include <gtest/gtest.h>

#include "kudu/common/common.pb.h"
#include "kudu/common/partial_row.h"
#include "kudu/common/row_operations.h"
#include "kudu/common/row_operations.pb.h"
#include "kudu/common/schema.h"
#include "kudu/common/wire_protocol.h"
#include "kudu/consensus/consensus.pb.h"
#include "kudu/consensus/log.h"
#include "kudu/consensus/log_anchor_registry.h"
#include "kudu/consensus/log_reader.h"
#include "kudu/consensus/log_util.h"
#include "kudu/consensus/opid.pb.h"
#include "kudu/consensus/opid_util.h"
#include "kudu/consensus/raft_consensus.h"
#include "kudu/fs/fs_manager.h"
#include "kudu/gutil/macros.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/tablet/lock_manager.h"
#include "kudu/tablet/ops/alter_schema_op.h"
#include "kudu/tablet/ops/op.h"
#include "kudu/tablet/ops/op_driver.h"
#include "kudu/tablet/ops/op_tracker.h"
#include "kudu/tablet/ops/write_op.h"
#include "kudu/tablet/tablet.h"
#include "kudu/tablet/tablet_metadata.h"
#include "kudu/tablet/tablet_replica-test-base.h"
#include "kudu/tablet/tablet_replica_mm_ops.h"
#include "kudu/tserver/tserver.pb.h"
#include "kudu/tserver/tserver_admin.pb.h"
#include "kudu/util/array_view.h"
#include "kudu/util/countdown_latch.h"
#include "kudu/util/maintenance_manager.h"
#include "kudu/util/metrics.h"
#include "kudu/util/monotime.h"
#include "kudu/util/pb_util.h"
#include "kudu/util/random.h"
#include "kudu/util/slice.h"
#include "kudu/util/status.h"
#include "kudu/util/test_macros.h"
#include "kudu/util/test_util.h"
DECLARE_bool(enable_maintenance_manager);
DECLARE_int32(flush_threshold_mb);
DECLARE_int32(tablet_history_max_age_sec);
METRIC_DECLARE_entity(tablet);
METRIC_DECLARE_gauge_uint64(live_row_count);

using kudu::consensus::CommitMsg;
using kudu::consensus::ConsensusBootstrapInfo;
using kudu::consensus::OpId;
using kudu::consensus::RECEIVED_OPID;
using kudu::consensus::RaftConsensus;
using kudu::log::Log;
using kudu::pb_util::SecureDebugString;
using kudu::pb_util::SecureShortDebugString;
using kudu::tserver::AlterSchemaRequestPB;
using kudu::tserver::AlterSchemaResponsePB;
using kudu::tserver::WriteRequestPB;
using kudu::tserver::WriteResponsePB;
using std::shared_ptr;
using std::string;
using std::thread;
using std::unique_ptr;

namespace kudu {
namespace tablet {

static Schema GetTestSchema() {
return Schema({ ColumnSchema("key", INT32) }, 1);
}

class TabletReplicaTest : public TabletReplicaTestBase {
public:
TabletReplicaTest()
: TabletReplicaTestBase(GetTestSchema()),
insert_counter_(0),
delete_counter_(0) {
}

protected:
// Generate a monotonic sequence of key column integers.
Status GenerateSequentialInsertRequest(const Schema& schema,
WriteRequestPB* write_req) {
write_req->set_tablet_id(tablet()->tablet_id());
RETURN_NOT_OK(SchemaToPB(schema, write_req->mutable_schema()));
KuduPartialRow row(&schema);
for (int i = 0; i < schema.num_columns(); i++) {
RETURN_NOT_OK(row.SetInt32(i, insert_counter_++));
}
RowOperationsPBEncoder enc(write_req->mutable_row_operations());
enc.Add(RowOperationsPB::INSERT, row);
return Status::OK();
}

// Generate a monotonic sequence of deletions, starting with key 0.
// Will assert if you try to delete more rows than you inserted.
Status GenerateSequentialDeleteRequest(WriteRequestPB* write_req) {
CHECK_LT(delete_counter_, insert_counter_);
Schema schema(GetTestSchema());
write_req->set_tablet_id(tablet()->tablet_id());
CHECK_OK(SchemaToPB(schema, write_req->mutable_schema()));
KuduPartialRow row(&schema);
CHECK_OK(row.SetInt32("key", delete_counter_++));
RowOperationsPBEncoder enc(write_req->mutable_row_operations());
enc.Add(RowOperationsPB::DELETE, row);
return Status::OK();
}
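
// Build and execute an alter-schema request that applies 'schema' with the
// given 'schema_version' to this tablet, waiting for it to complete.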
Status UpdateSchema(const SchemaPB& schema, int schema_version) {
AlterSchemaRequestPB alter;
alter.set_dest_uuid(tablet()->metadata()->fs_manager()->uuid());
alter.set_tablet_id(tablet()->tablet_id());
alter.set_schema_version(schema_version);
*alter.mutable_schema() = schema;
return ExecuteAlter(tablet_replica_.get(), alter);
}
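
// Submit the given alter-schema request to 'replica' and block until the op
// completes, CHECK-failing if the response carries an error.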
Status ExecuteAlter(TabletReplica* replica, const AlterSchemaRequestPB& req) {
unique_ptr<AlterSchemaResponsePB> resp(new AlterSchemaResponsePB());
unique_ptr<AlterSchemaOpState> op_state(
new AlterSchemaOpState(replica, &req, resp.get()));
CountDownLatch rpc_latch(1);
op_state->set_completion_callback(unique_ptr<OpCompletionCallback>(
new LatchOpCompletionCallback<AlterSchemaResponsePB>(&rpc_latch, resp.get())));
RETURN_NOT_OK(replica->SubmitAlterSchema(std::move(op_state)));
rpc_latch.Wait();
CHECK(!resp->has_error())
<< "\nReq:\n" << SecureDebugString(req) << "Resp:\n" << SecureDebugString(*resp);
return Status::OK();
}
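
// Wait until all appended entries have been flushed, then force the log to
// roll over onto a new segment.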
static Status RollLog(TabletReplica* replica) {
RETURN_NOT_OK(replica->log_->WaitUntilAllFlushed());
return replica->log_->AllocateSegmentAndRollOverForTests();
}
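
// Execute a single write request, then roll onto a new log segment.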
Status ExecuteWriteAndRollLog(TabletReplica* tablet_replica, const WriteRequestPB& req) {
RETURN_NOT_OK(ExecuteWrite(tablet_replica, req));
// Roll the log after each write.
// Usually the append thread does the roll and no additional sync is required.
// However, in this test the thread that is appending is not the same thread
// that is rolling the log, so we must make sure the Log's queue is flushed
// before we roll, or we might race with the appender thread.
CHECK_OK(RollLog(tablet_replica));
return Status::OK();
}

// Execute insert requests and roll log after each one.
Status ExecuteInsertsAndRollLogs(int num_inserts) {
for (int i = 0; i < num_inserts; i++) {
WriteRequestPB req;
RETURN_NOT_OK(GenerateSequentialInsertRequest(GetTestSchema(), &req));
RETURN_NOT_OK(ExecuteWriteAndRollLog(tablet_replica_.get(), req));
}
return Status::OK();
}

// Execute delete requests and roll log after each one.
Status ExecuteDeletesAndRollLogs(int num_deletes) {
for (int i = 0; i < num_deletes; i++) {
WriteRequestPB req;
RETURN_NOT_OK(GenerateSequentialDeleteRequest(&req));
RETURN_NOT_OK(ExecuteWriteAndRollLog(tablet_replica_.get(), req));
}
return Status::OK();
}

// Assert that there are no log anchors held on the tablet replica.
//
// NOTE: when an op finishes and notifies the completion callback, it is still
// registered with the op tracker for a very short time before being
// destroyed. So, this should always be called inside an ASSERT_EVENTUALLY wrapper.
void AssertNoLogAnchors() {
// Make sure that there are no registered anchors in the registry
ASSERT_EQ(0, tablet_replica()->log_anchor_registry()->GetAnchorCountForTests());
}

// Assert that the log GC anchor (the earliest index retained for durability)
// is earlier than the latest OpId in the Log.
void AssertLogAnchorEarlierThanLogLatest() {
log::RetentionIndexes retention = tablet_replica_->GetRetentionIndexes();
std::optional<OpId> last_log_opid = tablet_replica_->consensus()->GetLastOpId(RECEIVED_OPID);
ASSERT_TRUE(last_log_opid);
ASSERT_LT(retention.for_durability, last_log_opid->index())
<< "Expected valid log anchor, got earliest opid: " << retention.for_durability
<< " (expected any value earlier than last log id: " << SecureShortDebugString(*last_log_opid)
<< ")";
}

// We disable automatic log GC. Don't leak those changes.
google::FlagSaver flag_saver_;
int32_t insert_counter_;
int32_t delete_counter_;
};

// An op that waits on the apply_continue latch inside of Apply().
class DelayedApplyOp : public WriteOp {
public:
DelayedApplyOp(CountDownLatch* apply_started,
CountDownLatch* apply_continue,
unique_ptr<WriteOpState> state)
: WriteOp(std::move(state), consensus::LEADER),
apply_started_(DCHECK_NOTNULL(apply_started)),
apply_continue_(DCHECK_NOTNULL(apply_continue)) {
}
virtual Status Apply(CommitMsg** commit_msg) override {
apply_started_->CountDown();
LOG(INFO) << "Delaying apply...";
apply_continue_->Wait();
LOG(INFO) << "Apply proceeding";
return WriteOp::Apply(commit_msg);
}
private:
CountDownLatch* apply_started_;
CountDownLatch* apply_continue_;
DISALLOW_COPY_AND_ASSIGN(DelayedApplyOp);
};

// Ensure that Log::GC() doesn't delete logs when the MRS has an anchor.
TEST_F(TabletReplicaTest, TestMRSAnchorPreventsLogGC) {
ConsensusBootstrapInfo info;
ASSERT_OK(StartReplicaAndWaitUntilLeader(info));
Log* log = tablet_replica_->log_.get();
int32_t num_gced;
ASSERT_EVENTUALLY([&]{ AssertNoLogAnchors(); });
log::SegmentSequence segments;
log->reader()->GetSegmentsSnapshot(&segments);
ASSERT_EQ(1, segments.size());
ASSERT_OK(ExecuteInsertsAndRollLogs(3));
log->reader()->GetSegmentsSnapshot(&segments);
ASSERT_EQ(4, segments.size());
NO_FATALS(AssertLogAnchorEarlierThanLogLatest());
ASSERT_GT(tablet_replica_->log_anchor_registry()->GetAnchorCountForTests(), 0);
// Ensure nothing gets deleted.
log::RetentionIndexes retention = tablet_replica_->GetRetentionIndexes();
ASSERT_OK(log->GC(retention, &num_gced));
ASSERT_EQ(0, num_gced) << "earliest needed: " << retention.for_durability;
// Flush MRS as needed to ensure that we don't have OpId anchors in the MRS.
ASSERT_OK(tablet_replica_->tablet()->Flush());
ASSERT_EVENTUALLY([&]{ AssertNoLogAnchors(); });
// The first two segments should be deleted.
// The last is anchored due to the commit in the last segment being the last
// OpId in the log.
retention = tablet_replica_->GetRetentionIndexes();
ASSERT_OK(log->GC(retention, &num_gced));
ASSERT_EQ(2, num_gced) << "earliest needed: " << retention.for_durability;
log->reader()->GetSegmentsSnapshot(&segments);
ASSERT_EQ(2, segments.size());
}

// Ensure that Log::GC() doesn't delete logs when the DMS has an anchor.
TEST_F(TabletReplicaTest, TestDMSAnchorPreventsLogGC) {
ConsensusBootstrapInfo info;
ASSERT_OK(StartReplicaAndWaitUntilLeader(info));
Log* log = tablet_replica_->log_.get();
shared_ptr<RaftConsensus> consensus = tablet_replica_->shared_consensus();
int32_t num_gced;
ASSERT_EVENTUALLY([&]{ AssertNoLogAnchors(); });
log::SegmentSequence segments;
log->reader()->GetSegmentsSnapshot(&segments);
ASSERT_EQ(1, segments.size());
ASSERT_OK(ExecuteInsertsAndRollLogs(2));
log->reader()->GetSegmentsSnapshot(&segments);
ASSERT_EQ(3, segments.size());
// Flush MRS & GC log so the next mutation goes into a DMS.
ASSERT_OK(tablet_replica_->tablet()->Flush());
ASSERT_EVENTUALLY([&]{ AssertNoLogAnchors(); });
log::RetentionIndexes retention = tablet_replica_->GetRetentionIndexes();
ASSERT_OK(log->GC(retention, &num_gced));
// We will only GC 1 segment, leaving 1 closed segment (plus the active one),
// because the earliest needed OpId falls back to the latest OpId written to
// the Log if no anchors are set.
ASSERT_EQ(1, num_gced);
log->reader()->GetSegmentsSnapshot(&segments);
ASSERT_EQ(2, segments.size());
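// Note the last-received OpId; it's logged below for context.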
std::optional<OpId> id = consensus->GetLastOpId(consensus::RECEIVED_OPID);
ASSERT_TRUE(id);
LOG(INFO) << "Before: " << *id;
// We currently have no anchors, and the last operation in the log is 0.3.
// If we executed a single delete, the anchored op (0.4) would also be the
// last op in the log; only after applying two operations do the last
// anchored op and the last logged op differ, which is what the assertion
// below relies on. So, execute two mutations.
ASSERT_OK(ExecuteDeletesAndRollLogs(2));
NO_FATALS(AssertLogAnchorEarlierThanLogLatest());
ASSERT_GT(tablet_replica_->log_anchor_registry()->GetAnchorCountForTests(), 0);
log->reader()->GetSegmentsSnapshot(&segments);
ASSERT_EQ(4, segments.size());
// Execute another couple of inserts, then flush the MRS so they don't anchor.
ASSERT_OK(ExecuteInsertsAndRollLogs(2));
ASSERT_OK(tablet_replica_->tablet()->Flush());
log->reader()->GetSegmentsSnapshot(&segments);
ASSERT_EQ(6, segments.size());
// Ensure the delta and last insert remain in the logs, anchored by the delta.
// Note that this will allow GC of the 2nd insert done above.
retention = tablet_replica_->GetRetentionIndexes();
ASSERT_OK(log->GC(retention, &num_gced));
ASSERT_EQ(1, num_gced);
log->reader()->GetSegmentsSnapshot(&segments);
ASSERT_EQ(5, segments.size());
// Flush DMS to release the anchor.
ASSERT_OK(tablet_replica_->tablet()->FlushBiggestDMS());
// Verify no anchors after Flush().
ASSERT_EVENTUALLY([&]{ AssertNoLogAnchors(); });
// We should only hang onto one segment due to no anchors.
// The last log OpId is the commit in the last segment, so it only anchors
// that segment, not the previous, because it's not the first OpId in the
// segment.
retention = tablet_replica_->GetRetentionIndexes();
ASSERT_OK(log->GC(retention, &num_gced));
ASSERT_EQ(3, num_gced);
log->reader()->GetSegmentsSnapshot(&segments);
ASSERT_EQ(2, segments.size());
}

// Ensure that Log::GC() doesn't delete log segments containing OpIds of
// active ops.
TEST_F(TabletReplicaTest, TestActiveOpPreventsLogGC) {
ConsensusBootstrapInfo info;
ASSERT_OK(StartReplicaAndWaitUntilLeader(info));
Log* log = tablet_replica_->log_.get();
int32_t num_gced;
ASSERT_EVENTUALLY([&]{ AssertNoLogAnchors(); });
log::SegmentSequence segments;
log->reader()->GetSegmentsSnapshot(&segments);
ASSERT_EQ(1, segments.size());
ASSERT_OK(ExecuteInsertsAndRollLogs(4));
log->reader()->GetSegmentsSnapshot(&segments);
ASSERT_EQ(5, segments.size());
// Flush MRS as needed to ensure that we don't have OpId anchors in the MRS.
ASSERT_EQ(1, tablet_replica_->log_anchor_registry()->GetAnchorCountForTests());
ASSERT_OK(tablet_replica_->tablet()->Flush());
// Verify no anchors after Flush().
ASSERT_EVENTUALLY([&]{ AssertNoLogAnchors(); });
// Now create a long-lived op that hangs during Apply().
// Allow other ops to go through. Logs should be populated, but the
// long-lived op should prevent the log from being deleted since it
// is in-flight.
CountDownLatch rpc_latch(1);
CountDownLatch apply_started(1);
CountDownLatch apply_continue(1);
unique_ptr<WriteRequestPB> req(new WriteRequestPB());
unique_ptr<WriteResponsePB> resp(new WriteResponsePB());
{
// Long-running mutation.
ASSERT_OK(GenerateSequentialDeleteRequest(req.get()));
unique_ptr<WriteOpState> op_state(new WriteOpState(tablet_replica_.get(),
req.get(),
nullptr, // No RequestIdPB
resp.get()));
op_state->set_completion_callback(unique_ptr<OpCompletionCallback>(
new LatchOpCompletionCallback<WriteResponsePB>(&rpc_latch, resp.get())));
unique_ptr<DelayedApplyOp> op(
new DelayedApplyOp(&apply_started,
&apply_continue,
std::move(op_state)));
scoped_refptr<OpDriver> driver;
ASSERT_OK(tablet_replica_->NewLeaderOpDriver(std::move(op),
&driver));
driver->ExecuteAsync();
apply_started.Wait();
ASSERT_TRUE(driver->GetOpId().IsInitialized())
<< "By the time an op is applied, it should have an OpId";
// The apply will hang until we CountDown() the continue latch.
// Now, roll the log. Below, we execute a few more insertions with rolling.
ASSERT_OK(log->AllocateSegmentAndRollOverForTests());
}
ASSERT_EQ(1, tablet_replica_->op_tracker_.GetNumPendingForTests());
// The log anchor is currently equal to the latest OpId written to the Log
// because we are delaying the Commit message with the CountDownLatch.
// GC the first four segments created by the inserts.
log::RetentionIndexes retention = tablet_replica_->GetRetentionIndexes();
ASSERT_OK(log->GC(retention, &num_gced));
ASSERT_EQ(4, num_gced);
log->reader()->GetSegmentsSnapshot(&segments);
ASSERT_EQ(2, segments.size());
// We use mutations here, since an MRS Flush() quiesces the tablet, and we
// want to ensure the only thing "anchoring" is the OpTracker.
ASSERT_OK(ExecuteDeletesAndRollLogs(3));
log->reader()->GetSegmentsSnapshot(&segments);
ASSERT_EQ(5, segments.size());
ASSERT_EQ(1, tablet_replica_->log_anchor_registry()->GetAnchorCountForTests());
ASSERT_OK(tablet_replica_->tablet()->FlushBiggestDMS());
ASSERT_EVENTUALLY([&]{
AssertNoLogAnchors();
ASSERT_EQ(1, tablet_replica_->op_tracker_.GetNumPendingForTests());
});
NO_FATALS(AssertLogAnchorEarlierThanLogLatest());
// Try to GC(), nothing should be deleted due to the in-flight op.
retention = tablet_replica_->GetRetentionIndexes();
ASSERT_OK(log->GC(retention, &num_gced));
ASSERT_EQ(0, num_gced);
log->reader()->GetSegmentsSnapshot(&segments);
ASSERT_EQ(5, segments.size());
// Now we release the op and wait for everything to complete.
// We fully quiesce and flush, which should release all anchors.
ASSERT_EQ(1, tablet_replica_->op_tracker_.GetNumPendingForTests());
apply_continue.CountDown();
rpc_latch.Wait();
tablet_replica_->op_tracker_.WaitForAllToFinish();
ASSERT_EQ(0, tablet_replica_->op_tracker_.GetNumPendingForTests());
ASSERT_OK(tablet_replica_->tablet()->FlushBiggestDMS());
ASSERT_EVENTUALLY([&]{ AssertNoLogAnchors(); });
// All should be deleted except the last two segments.
retention = tablet_replica_->GetRetentionIndexes();
ASSERT_OK(log->GC(retention, &num_gced));
ASSERT_EQ(3, num_gced);
log->reader()->GetSegmentsSnapshot(&segments);
ASSERT_EQ(2, segments.size());
}
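
// Ensure that running log GC on a replica with an effectively empty log is a
// safe no-op.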
TEST_F(TabletReplicaTest, TestGCEmptyLog) {
ConsensusBootstrapInfo info;
ASSERT_OK(StartReplica(info));
// We don't wait on consensus on purpose.
ASSERT_OK(tablet_replica_->RunLogGC());
}
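
// Exercise the perf-improvement score that the maintenance manager uses to
// prioritize flush ops, across combinations of anchored memory and elapsed
// time since the last flush.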
TEST_F(TabletReplicaTest, TestFlushOpsPerfImprovements) {
FLAGS_flush_threshold_mb = 64;
MaintenanceOpStats stats;
// Right at the threshold, when not enough time has passed for a time-based
// flush, we expect an improvement score of exactly 1.
stats.set_ram_anchored(64 * 1024 * 1024);
FlushOpPerfImprovementPolicy::SetPerfImprovementForFlush(&stats, 1);
ASSERT_EQ(1.0, stats.perf_improvement());
stats.Clear();
// Below the threshold and enough time has passed, we'll have a low improvement.
stats.set_ram_anchored(2 * 1024 * 1024);
FlushOpPerfImprovementPolicy::SetPerfImprovementForFlush(&stats, 3 * 60 * 1000);
ASSERT_LT(0.01, stats.perf_improvement());
ASSERT_GT(0.1, stats.perf_improvement());
stats.Clear();
// Over the threshold, we expect improvement equal to the excess MB.
stats.set_ram_anchored(128 * 1024 * 1024);
FlushOpPerfImprovementPolicy::SetPerfImprovementForFlush(&stats, 1);
ASSERT_NEAR(stats.perf_improvement(), 64, 0.01);
stats.Clear();
// Below the threshold, but anchored for a long time: the score approaches 1.0.
stats.set_ram_anchored(1);
FlushOpPerfImprovementPolicy::SetPerfImprovementForFlush(&stats, 60 * 50 * 1000);
ASSERT_LT(0.7, stats.perf_improvement());
ASSERT_GT(1.0, stats.perf_improvement());
stats.Clear();
// Approaching the threshold: enough time has passed for a time-based flush,
// but not a long time, so the score again approaches 1.0.
stats.set_ram_anchored(63 * 1024 * 1024);
FlushOpPerfImprovementPolicy::SetPerfImprovementForFlush(&stats, 3 * 60 * 1000);
ASSERT_LT(0.9, stats.perf_improvement());
ASSERT_GT(1.0, stats.perf_improvement());
stats.Clear();
}

// Test that the schema of a tablet will be rolled forward upon replaying an
// alter schema request.
TEST_F(TabletReplicaTest, TestRollLogSegmentSchemaOnAlter) {
ConsensusBootstrapInfo info;
ASSERT_OK(StartReplicaAndWaitUntilLeader(info));
SchemaPB orig_schema_pb;
ASSERT_OK(SchemaToPB(SchemaBuilder(*tablet()->metadata()->schema()).Build(), &orig_schema_pb));
const int orig_schema_version = tablet()->metadata()->schema_version();
// Add a new column.
SchemaBuilder builder(*tablet()->metadata()->schema());
ASSERT_OK(builder.AddColumn("new_col", INT32));
Schema new_client_schema = builder.BuildWithoutIds();
SchemaPB new_schema;
ASSERT_OK(SchemaToPB(builder.Build(), &new_schema));
ASSERT_OK(UpdateSchema(new_schema, orig_schema_version + 1));
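
// Writes a single row using the new client schema.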
const auto write = [&] {
unique_ptr<WriteRequestPB> req(new WriteRequestPB());
ASSERT_OK(GenerateSequentialInsertRequest(new_client_schema, req.get()));
ASSERT_OK(ExecuteWrite(tablet_replica_.get(), *req));
};
// Upon restarting, our log segment header schema should have "new_col".
NO_FATALS(write());
ASSERT_OK(RestartReplica());
// Get rid of the alter in the WALs.
NO_FATALS(write());
ASSERT_OK(RollLog(tablet_replica_.get()));
NO_FATALS(write());
ASSERT_OK(tablet_replica_->tablet()->Flush());
ASSERT_OK(tablet_replica_->RunLogGC());
// Now write some more and restart. If our segment header schema previously
// didn't have "new_col", bootstrapping would fail, complaining about a
// mismatch between the segment header schema and the write request schema.
NO_FATALS(write());
ASSERT_OK(RestartReplica());
}

// Regression test for KUDU-2690, wherein an alter schema request that failed
// (e.g. because of an invalid schema) would roll forward the log segment
// header schema, causing a failure or crash upon bootstrapping.
TEST_F(TabletReplicaTest, Kudu2690Test) {
ConsensusBootstrapInfo info;
ASSERT_OK(StartReplicaAndWaitUntilLeader(info));
SchemaPB orig_schema_pb;
ASSERT_OK(SchemaToPB(SchemaBuilder(*tablet()->metadata()->schema()).Build(), &orig_schema_pb));
const int orig_schema_version = tablet()->metadata()->schema_version();
// First things first, add a new column.
SchemaBuilder builder(*tablet()->metadata()->schema());
ASSERT_OK(builder.AddColumn("new_col", INT32));
Schema new_client_schema = builder.BuildWithoutIds();
SchemaPB new_schema;
ASSERT_OK(SchemaToPB(builder.Build(), &new_schema));
ASSERT_OK(UpdateSchema(new_schema, orig_schema_version + 1));
// Try to update the schema to an older version. Before the fix for
// KUDU-2690, this would revert the schema in the next log segment header
// upon rolling the log below.
ASSERT_OK(UpdateSchema(orig_schema_pb, orig_schema_version));
// Roll onto a new segment and begin filling it; this allows us to GC the
// first segment.
ASSERT_OK(RollLog(tablet_replica_.get()));
{
unique_ptr<WriteRequestPB> req(new WriteRequestPB());
ASSERT_OK(GenerateSequentialInsertRequest(new_client_schema, req.get()));
ASSERT_OK(ExecuteWrite(tablet_replica_.get(), *req));
}
ASSERT_OK(tablet_replica_->RunLogGC());
// Before KUDU-2690 was fixed, bootstrapping would fail, complaining that the
// write requests contained a column that was not in the log segment header's
// schema.
ASSERT_OK(RestartReplica());
}
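
// Verify that the tablet's 'live_row_count' metric tracks the number of
// inserted rows minus the number of deleted rows.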
TEST_F(TabletReplicaTest, TestLiveRowCountMetric) {
ConsensusBootstrapInfo info;
ASSERT_OK(StartReplicaAndWaitUntilLeader(info));
// We don't care what the function is, since the metric is already instantiated.
auto live_row_count = METRIC_live_row_count.InstantiateFunctionGauge(
tablet_replica_->tablet()->GetMetricEntity(), [](){ return 0; });
ASSERT_EQ(0, live_row_count->value());
// Insert some rows.
Random rand(SeedRandom());
const int kNumInsert = rand.Next() % 100 + 1;
ASSERT_OK(ExecuteInsertsAndRollLogs(kNumInsert));
ASSERT_EQ(kNumInsert, live_row_count->value());
// Delete some rows.
const int kNumDelete = rand.Next() % kNumInsert;
ASSERT_OK(ExecuteDeletesAndRollLogs(kNumDelete));
ASSERT_EQ(kNumInsert - kNumDelete, live_row_count->value());
}
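
// Verify that a replica restarts cleanly after ancient, fully deleted
// rowsets have been GCed, both with an in-memory DMS and with flushed
// delta files.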
TEST_F(TabletReplicaTest, TestRestartAfterGCDeletedRowsets) {
FLAGS_enable_maintenance_manager = false;
FLAGS_tablet_history_max_age_sec = 1;
const int kNumRows = 10;
ConsensusBootstrapInfo info;
ASSERT_OK(StartReplicaAndWaitUntilLeader(info));
auto* tablet = tablet_replica_->tablet();
// Metrics are already registered so pass a dummy lambda.
auto live_row_count = METRIC_live_row_count.InstantiateFunctionGauge(
tablet->GetMetricEntity(), [] () { return 0; });
// Insert some rows and flush so we get a DRS, and then delete them so we
// have an ancient, fully deleted DRS.
ASSERT_OK(ExecuteInsertsAndRollLogs(kNumRows));
ASSERT_OK(tablet->Flush());
ASSERT_OK(ExecuteDeletesAndRollLogs(kNumRows));
ASSERT_EQ(1, tablet->num_rowsets());
ASSERT_EQ(0, live_row_count->value());
SleepFor(MonoDelta::FromSeconds(FLAGS_tablet_history_max_age_sec));
// Insert some fresh rows so we can validate that we don't GC everything.
ASSERT_OK(ExecuteInsertsAndRollLogs(kNumRows));
ASSERT_OK(tablet->Flush());
ASSERT_EQ(2, tablet->num_rowsets());
ASSERT_EQ(kNumRows, live_row_count->value());
// Now GC what we can. The first rowset should be gone.
ASSERT_OK(tablet->DeleteAncientDeletedRowsets());
ASSERT_EQ(1, tablet->num_rowsets());
ASSERT_EQ(kNumRows, live_row_count->value());
ASSERT_OK(ExecuteDeletesAndRollLogs(kNumRows));
ASSERT_EQ(0, live_row_count->value());
// Restart and ensure we can rebuild our DMS okay.
ASSERT_OK(RestartReplica());
tablet = tablet_replica_->tablet();
ASSERT_EQ(1, tablet->num_rowsets());
live_row_count = METRIC_live_row_count.InstantiateFunctionGauge(
tablet->GetMetricEntity(), [] () { return 0; });
ASSERT_EQ(0, live_row_count->value());
// Now do that again but with deltafiles.
ASSERT_OK(tablet->FlushBiggestDMS());
ASSERT_OK(RestartReplica());
tablet = tablet_replica_->tablet();
ASSERT_EQ(1, tablet->num_rowsets());
// Wait for our deleted rowset to become ancient. Since we just started up,
// we shouldn't have read any delta stats, so running the GC won't pick up
// our deleted DRS.
SleepFor(MonoDelta::FromSeconds(FLAGS_tablet_history_max_age_sec));
ASSERT_OK(tablet->DeleteAncientDeletedRowsets());
ASSERT_EQ(1, tablet->num_rowsets());
}

// This is a trivial test scenario to check how row locking works in case of
// concurrent attempts to lock the same row with relatively long waiting times.
// The thread that waits a long time for the row lock should eventually be
// able to acquire it, logging about its attempts to acquire the lock.
// The logging part isn't covered by any special assertions, though.
// An alternative place to add this scenario could be lock_manager-test.cc, but
// for proper logging a real WriteOpState backed by a tablet is necessary.
TEST_F(TabletReplicaTest, RowLocksLongWaitAndLogging) {
SKIP_IF_SLOW_NOT_ALLOWED();
constexpr const char* const kKey = "key";
constexpr int32_t kValue = 0;
const Schema schema(GetTestSchema());
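// Both threads contend on the lock for this single row key.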
Slice key[]{kKey};
unique_ptr<WriteRequestPB> req(new WriteRequestPB);
req->set_tablet_id(tablet()->tablet_id());
CHECK_OK(SchemaToPB(schema, req->mutable_schema()));
KuduPartialRow row(&schema);
CHECK_OK(row.SetInt32(kKey, kValue));
{
RowOperationsPBEncoder enc(req->mutable_row_operations());
enc.Add(RowOperationsPB::DELETE, row);
}
unique_ptr<WriteResponsePB> resp(new WriteResponsePB);
LockManager lock_manager;
thread t0([&]{
unique_ptr<WriteOpState> op_state(new WriteOpState(
tablet_replica_.get(), req.get(), nullptr, resp.get()));
ScopedRowLock row_lock(
&lock_manager, op_state.get(), key, LockManager::LOCK_EXCLUSIVE);
CHECK(row_lock.acquired());
// Hold the lock for a while so the other thread has to wait,
// logging about its attempts to acquire the row lock.
SleepFor(MonoDelta::FromMilliseconds(3000));
});
thread t1([&]{
// Let the other thread acquire the lock first.
SleepFor(MonoDelta::FromMilliseconds(500));
unique_ptr<WriteOpState> op_state(new WriteOpState(
tablet_replica_.get(), req.get(), nullptr, resp.get()));
ScopedRowLock row_lock(
&lock_manager, op_state.get(), key, LockManager::LOCK_EXCLUSIVE);
CHECK(row_lock.acquired());
});
t0.join();
t1.join();
}

} // namespace tablet
} // namespace kudu