blob: d717c00c10fefd3725fbb7598facda7477137296 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "kudu/tablet/ops/op_driver.h"
#include <functional>
#include <memory>
#include <mutex>
#include <ostream>
#include <utility>
#include <glog/logging.h>
#include <google/protobuf/descriptor.h>
#include <google/protobuf/message.h>
#include "kudu/clock/clock.h"
#include "kudu/common/common.pb.h"
#include "kudu/common/timestamp.h"
#include "kudu/consensus/log.h"
#include "kudu/consensus/raft_consensus.h"
#include "kudu/consensus/time_manager.h"
#include "kudu/gutil/port.h"
#include "kudu/gutil/strings/strcat.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/rpc/result_tracker.h"
#include "kudu/rpc/rpc_header.pb.h"
#include "kudu/tablet/mvcc.h"
#include "kudu/tablet/tablet.h"
#include "kudu/tablet/tablet_replica.h"
#include "kudu/tablet/op_order_verifier.h"
#include "kudu/tablet/ops/op_tracker.h"
#include "kudu/util/debug/trace_event.h"
#include "kudu/util/logging.h"
#include "kudu/util/pb_util.h"
#include "kudu/util/status_callback.h"
#include "kudu/util/threadpool.h"
#include "kudu/util/trace.h"
using kudu::consensus::CommitMsg;
using kudu::consensus::DriverType;
using kudu::consensus::RaftConsensus;
using kudu::consensus::ReplicateMsg;
using kudu::log::Log;
using kudu::pb_util::SecureShortDebugString;
using kudu::rpc::RequestIdPB;
using kudu::rpc::ResultTracker;
using std::string;
using std::unique_ptr;
using strings::Substitute;
namespace kudu {
namespace tablet {
static const char* kTimestampFieldName = "timestamp";
class FollowerOpCompletionCallback : public OpCompletionCallback {
public:
FollowerOpCompletionCallback(const RequestIdPB& request_id,
const google::protobuf::Message* response,
scoped_refptr<ResultTracker> result_tracker)
: request_id_(request_id),
response_(response),
result_tracker_(std::move(result_tracker)) {}
virtual void OpCompleted() {
if (status_.ok()) {
result_tracker_->RecordCompletionAndRespond(request_id_, response_);
} else {
// For now we always respond with TOO_BUSY, meaning the client will retry (even if
// this is an unretryable failure), that works as the client-driven version of this
// op will get the right error.
result_tracker_->FailAndRespond(request_id_,
rpc::ErrorStatusPB::ERROR_SERVER_TOO_BUSY,
status_);
}
}
virtual ~FollowerOpCompletionCallback() {}
private:
const RequestIdPB& request_id_;
const google::protobuf::Message* response_;
scoped_refptr<ResultTracker> result_tracker_;
};
////////////////////////////////////////////////////////////
// OpDriver
////////////////////////////////////////////////////////////
OpDriver::OpDriver(OpTracker *op_tracker,
RaftConsensus* consensus,
Log* log,
ThreadPoolToken* prepare_pool_token,
ThreadPool* apply_pool,
OpOrderVerifier* order_verifier)
: op_tracker_(op_tracker),
consensus_(consensus),
log_(log),
prepare_pool_token_(prepare_pool_token),
apply_pool_(apply_pool),
order_verifier_(order_verifier),
trace_(new Trace()),
start_time_(MonoTime::Now()),
replication_state_(NOT_REPLICATING),
prepare_state_(NOT_PREPARED) {
if (Trace::CurrentTrace()) {
Trace::CurrentTrace()->AddChildTrace("op", trace_.get());
}
}
Status OpDriver::Init(unique_ptr<Op> op,
DriverType type) {
// If the tablet has been stopped, the replica is likely shutting down soon.
// Prevent further ops from starting.
// Note: Some tests may not have a replica.
TabletReplica* replica = op->state()->tablet_replica();
{
std::shared_ptr<Tablet> tablet = replica ? replica->shared_tablet() : nullptr;
if (PREDICT_FALSE(tablet && tablet->HasBeenStopped())) {
return Status::IllegalState("Not initializing new op; the tablet is stopped");
}
}
op_ = std::move(op);
if (type == consensus::FOLLOWER) {
std::lock_guard<simple_spinlock> lock(opid_lock_);
op_id_copy_ = op_->state()->op_id();
DCHECK(op_id_copy_.IsInitialized());
replication_state_ = REPLICATING;
replication_start_time_ = MonoTime::Now();
// Start the replica op in the thread that is updating consensus, for
// non-leader ops.
// Replica ops were already assigned a timestamp so we don't need to
// acquire locks before calling Start(). Starting the the op here gives a
// strong guarantee to consensus that the op is on mvcc when it moves
// "safe" time so that we don't risk marking a timestamp "safe" before all
// ops before it are in-flight are on mvcc.
RETURN_NOT_OK(op_->Start());
if (state()->are_results_tracked()) {
// If this is a follower op, make sure to set the op completion callback
// before the op has a chance to fail.
const rpc::RequestIdPB& request_id = state()->request_id();
const google::protobuf::Message* response = state()->response();
unique_ptr<OpCompletionCallback> callback(
new FollowerOpCompletionCallback(request_id,
response,
state()->result_tracker()));
mutable_state()->set_completion_callback(std::move(callback));
}
} else {
DCHECK_EQ(type, consensus::LEADER);
unique_ptr<ReplicateMsg> replicate_msg;
op_->NewReplicateMsg(&replicate_msg);
if (consensus_) { // sometimes NULL in tests
// A raw pointer is required to avoid a refcount cycle.
mutable_state()->set_consensus_round(
consensus_->NewRound(std::move(replicate_msg),
[this](const Status& s) { this->ReplicationFinished(s); }));
}
}
return op_tracker_->Add(this);
}
consensus::OpId OpDriver::GetOpId() {
std::lock_guard<simple_spinlock> lock(opid_lock_);
return op_id_copy_;
}
const OpState* OpDriver::state() const {
return op_ != nullptr ? op_->state() : nullptr;
}
OpState* OpDriver::mutable_state() {
return op_ != nullptr ? op_->state() : nullptr;
}
Op::OpType OpDriver::op_type() const {
return op_->op_type();
}
string OpDriver::ToString() const {
std::lock_guard<simple_spinlock> lock(lock_);
return ToStringUnlocked();
}
string OpDriver::ToStringUnlocked() const {
string ret = StateString(replication_state_, prepare_state_);
if (op_ != nullptr) {
ret += " " + op_->ToString();
} else {
ret += "[unknown op]";
}
return ret;
}
void OpDriver::ExecuteAsync() {
VLOG_WITH_PREFIX(4) << "ExecuteAsync()";
TRACE_EVENT_FLOW_BEGIN0("op", "ExecuteAsync", this);
ADOPT_TRACE(trace());
Status s;
if (replication_state_ == NOT_REPLICATING) {
// We're a leader op. Before submitting, check that we are the leader and
// determine the current term.
s = consensus_->CheckLeadershipAndBindTerm(mutable_state()->consensus_round());
}
if (s.ok()) {
s = prepare_pool_token_->Submit([this]() { this->PrepareTask(); });
}
if (PREDICT_FALSE(!s.ok())) {
HandleFailure(s);
}
}
void OpDriver::PrepareTask() {
TRACE_EVENT_FLOW_END0("op", "PrepareTask", this);
Status prepare_status = Prepare();
if (PREDICT_FALSE(!prepare_status.ok())) {
HandleFailure(prepare_status);
}
}
void OpDriver::RegisterFollowerOpOnResultTracker() {
// If this is an op being executed by a follower and its result is being
// tracked, make sure that we're the driver of the op.
if (!state()->are_results_tracked()) return;
ResultTracker::RpcState rpc_state = state()->result_tracker()->TrackRpcOrChangeDriver(
state()->request_id());
switch (rpc_state) {
case ResultTracker::RpcState::NEW:
// We're the only ones trying to execute the op (normal case). Proceed.
return;
// If this RPC was previously completed or is already stale (like if the same tablet was
// bootstrapped twice) stop tracking the result. Only follower ops can observe these
// states so we simply reset the callback and the result will not be tracked anymore.
case ResultTracker::RpcState::STALE:
case ResultTracker::RpcState::COMPLETED: {
mutable_state()->set_completion_callback(
unique_ptr<OpCompletionCallback>(new OpCompletionCallback()));
VLOG(2) << state()->result_tracker() << " Follower Rpc was already COMPLETED or STALE: "
<< rpc_state << " OpId: " << SecureShortDebugString(state()->op_id())
<< " RequestId: " << SecureShortDebugString(state()->request_id());
return;
}
default:
LOG(FATAL) << "Unexpected state: " << rpc_state;
}
}
Status OpDriver::Prepare() {
TRACE_EVENT1("op", "Prepare", "op", this);
VLOG_WITH_PREFIX(4) << "Prepare()";
// Actually prepare and start the op.
prepare_physical_timestamp_ = GetMonoTimeMicros();
RETURN_NOT_OK(op_->Prepare());
// Only take the lock long enough to take a local copy of the
// replication state and set our prepare state. This ensures that
// exactly one of Replicate/Prepare callbacks will trigger the apply
// phase.
ReplicationState repl_state_copy;
{
std::lock_guard<simple_spinlock> lock(lock_);
CHECK_EQ(prepare_state_, NOT_PREPARED);
prepare_state_ = PREPARED;
repl_state_copy = replication_state_;
// If this is a follower op we need to register the op on the tracker here,
// atomically with the change of the prepared state. Otherwise if the
// prepare thread gets preempted after the state is prepared apply can be
// triggered by another thread without the rpc being registered.
if (op_->type() == consensus::FOLLOWER) {
RegisterFollowerOpOnResultTracker();
// ... else we're a client-started op. Make sure we're still the driver of the
// RPC and give up if we aren't.
} else {
if (state()->are_results_tracked()
&& !state()->result_tracker()->IsCurrentDriver(state()->request_id())) {
op_status_ = Status::AlreadyPresent(Substitute(
"There's already an attempt of the same operation on the server for request id: $0",
SecureShortDebugString(state()->request_id())));
replication_state_ = REPLICATION_FAILED;
return op_status_;
}
}
}
switch (repl_state_copy) {
case NOT_REPLICATING:
{
// Assign a timestamp to the op before we Start() it.
RETURN_NOT_OK(consensus_->time_manager()->AssignTimestamp(
mutable_state()->consensus_round()->replicate_msg()));
RETURN_NOT_OK(op_->Start());
VLOG_WITH_PREFIX(4) << "Triggering consensus replication.";
TRACE("REPLICATION: Starting.");
// Trigger consensus replication.
{
std::lock_guard<simple_spinlock> lock(lock_);
replication_state_ = REPLICATING;
replication_start_time_ = MonoTime::Now();
}
Status s = consensus_->Replicate(mutable_state()->consensus_round());
if (PREDICT_FALSE(!s.ok())) {
std::lock_guard<simple_spinlock> lock(lock_);
CHECK_EQ(replication_state_, REPLICATING);
op_status_ = s;
replication_state_ = REPLICATION_FAILED;
return s;
}
break;
}
case REPLICATING:
{
// Already replicating - nothing to trigger
break;
}
case REPLICATION_FAILED:
DCHECK(!op_status_.ok());
[[fallthrough]];
case REPLICATED:
{
// We can move on to apply.
// Note that ApplyAsync() will handle the error status in the
// REPLICATION_FAILED case.
return ApplyAsync();
}
}
return Status::OK();
}
void OpDriver::HandleFailure(const Status& s) {
VLOG_WITH_PREFIX(2) << "Failed op: " << s.ToString();
CHECK(!s.ok());
TRACE("HandleFailure($0)", s.ToString());
ReplicationState repl_state_copy;
{
std::lock_guard<simple_spinlock> lock(lock_);
op_status_ = s;
repl_state_copy = replication_state_;
}
switch (repl_state_copy) {
case REPLICATING:
case REPLICATED:
{
// Replicated ops are only allowed to fail if the tablet has
// been stopped.
if (!state()->tablet_replica()->tablet()->HasBeenStopped()) {
LOG_WITH_PREFIX(FATAL) << "Cannot cancel ops that have already replicated"
<< ": " << op_status_.ToString()
<< " op:" << ToString();
}
[[fallthrough]];
}
case NOT_REPLICATING:
case REPLICATION_FAILED:
{
VLOG_WITH_PREFIX(1) << Substitute("Op $0 failed: $1", ToString(), s.ToString());
op_->Finish(Op::ABORTED);
mutable_state()->completion_callback()->set_error(op_status_);
mutable_state()->completion_callback()->OpCompleted();
op_tracker_->Release(this);
}
}
}
void OpDriver::ReplicationFinished(const Status& status) {
MonoTime replication_finished_time = MonoTime::Now();
ADOPT_TRACE(trace());
{
std::lock_guard<simple_spinlock> op_id_lock(opid_lock_);
// TODO: it's a bit silly that we have three copies of the opid:
// one here, one in ConsensusRound, and one in OpState.
op_id_copy_ = DCHECK_NOTNULL(mutable_state()->consensus_round())->id();
DCHECK(op_id_copy_.IsInitialized());
mutable_state()->mutable_op_id()->CopyFrom(op_id_copy_);
}
// If we're going to abort, do so before changing the state below. This avoids a race with
// the prepare thread, which would race with the thread calling this method to release the driver
// while we're aborting, if we were to do it afterwards.
if (!status.ok()) {
op_->AbortPrepare();
}
MonoDelta replication_duration;
PrepareState prepare_state_copy;
{
std::lock_guard<simple_spinlock> lock(lock_);
CHECK_EQ(replication_state_, REPLICATING);
if (status.ok()) {
replication_state_ = REPLICATED;
} else {
replication_state_ = REPLICATION_FAILED;
op_status_ = status;
}
prepare_state_copy = prepare_state_;
replication_duration = replication_finished_time - replication_start_time_;
}
TRACE_COUNTER_INCREMENT("replication_time_us", replication_duration.ToMicroseconds());
TRACE("REPLICATION: Finished.");
// If we have prepared and replicated, we're ready
// to move ahead and apply this operation.
// Note that if we set the state to REPLICATION_FAILED above, ApplyAsync()
// will actually abort the op, i.e. ApplyTask() will never be called and the
// op will never be applied to the tablet.
if (prepare_state_copy == PREPARED) {
// We likely need to do cleanup if this fails so for now just
// CHECK_OK
CHECK_OK(ApplyAsync());
}
}
void OpDriver::Abort(const Status& status) {
CHECK(!status.ok());
ReplicationState repl_state_copy;
{
std::lock_guard<simple_spinlock> lock(lock_);
repl_state_copy = replication_state_;
op_status_ = status;
}
// If the state is not NOT_REPLICATING we abort immediately and the op will
// never be replicated.
// In any other state we just set the op status, if the op's Apply
// hasn't started yet this prevents it from starting, but if it has then the
// op runs to completion.
if (repl_state_copy == NOT_REPLICATING) {
HandleFailure(status);
}
}
Status OpDriver::ApplyAsync() {
{
std::unique_lock<simple_spinlock> lock(lock_);
DCHECK_EQ(prepare_state_, PREPARED);
if (op_status_.ok()) {
DCHECK_EQ(replication_state_, REPLICATED);
order_verifier_->CheckApply(op_id_copy_.index(), prepare_physical_timestamp_);
// Now that the op is committed in consensus advance the lower bound on
// new op timestamps.
if (op_->state()->external_consistency_mode() != COMMIT_WAIT) {
op_->state()->tablet_replica()->tablet()->mvcc_manager()->
AdjustNewOpLowerBound(op_->state()->timestamp());
}
} else {
DCHECK_EQ(replication_state_, REPLICATION_FAILED);
DCHECK(!op_status_.ok());
lock.unlock();
HandleFailure(op_status_);
return Status::OK();
}
}
TRACE_EVENT_FLOW_BEGIN0("op", "ApplyTask", this);
return apply_pool_->Submit([this]() { this->ApplyTask(); });
}
void OpDriver::ApplyTask() {
TRACE_EVENT_FLOW_END0("op", "ApplyTask", this);
ADOPT_TRACE(trace());
Tablet* tablet = state()->tablet_replica()->tablet();
if (tablet->HasBeenStopped()) {
HandleFailure(Status::IllegalState("Not Applying op; the tablet is stopped"));
return;
}
#if DCHECK_IS_ON()
{
std::lock_guard<simple_spinlock> lock(lock_);
DCHECK_EQ(replication_state_, REPLICATED);
DCHECK_EQ(prepare_state_, PREPARED);
}
#endif // #if DCHECK_IS_ON() ...
// We need to ref-count ourself, since FinishApplying() may run very quickly
// and end up calling Finalize() while we're still in this code.
scoped_refptr<OpDriver> ref(this);
{
CommitMsg* commit_msg;
Status s = op_->Apply(&commit_msg);
if (PREDICT_FALSE(!s.ok())) {
LOG(WARNING) << Substitute("Did not Apply op $0: $1",
op_->ToString(), s.ToString());
HandleFailure(s);
return;
}
commit_msg->mutable_commited_op_id()->CopyFrom(op_id_copy_);
SetResponseTimestamp(op_->state(), op_->state()->timestamp());
{
TRACE_EVENT1("op", "AsyncAppendCommit", "op", this);
CHECK_OK(log_->AsyncAppendCommit(
*commit_msg, [](const Status& s) {
CrashIfNotOkStatusCB("Enqueued commit operation failed to write to WAL", s);
}));
}
// If the client requested COMMIT_WAIT as the external consistency mode
// calculate the latest that the prepare timestamp could be and wait
// until now.earliest > prepare_latest. Only after this are the locks
// released.
if (mutable_state()->external_consistency_mode() == COMMIT_WAIT) {
TRACE("APPLY: Commit Wait.");
// If we can't commit wait and have already applied we might have consistency
// issues if we still reply to the client that the operation was a success.
// On the other hand we don't have rollbacks as of yet thus we can't undo the
// the apply either, so we just CHECK_OK for now.
CHECK_OK(CommitWait());
}
Finalize();
}
}
void OpDriver::SetResponseTimestamp(OpState* op_state,
const Timestamp& timestamp) {
google::protobuf::Message* response = op_state->response();
if (response) {
const google::protobuf::FieldDescriptor* ts_field =
response->GetDescriptor()->FindFieldByName(kTimestampFieldName);
response->GetReflection()->SetUInt64(response, ts_field, timestamp.ToUint64());
}
}
Status OpDriver::CommitWait() {
MonoTime before = MonoTime::Now();
DCHECK(mutable_state()->external_consistency_mode() == COMMIT_WAIT);
RETURN_NOT_OK(mutable_state()->tablet_replica()->clock()->WaitUntilAfter(
mutable_state()->timestamp(), MonoTime::Max()));
mutable_state()->mutable_metrics()->commit_wait_duration_usec =
(MonoTime::Now() - before).ToMicroseconds();
return Status::OK();
}
void OpDriver::Finalize() {
ADOPT_TRACE(trace());
// TODO: this is an ugly hack so that the Release() call doesn't delete the
// object while we still hold the lock.
scoped_refptr<OpDriver> ref(this);
std::lock_guard<simple_spinlock> lock(lock_);
op_->Finish(Op::APPLIED);
mutable_state()->completion_callback()->OpCompleted();
op_tracker_->Release(this);
}
std::string OpDriver::StateString(ReplicationState repl_state,
PrepareState prep_state) {
string state_str;
switch (repl_state) {
case NOT_REPLICATING:
StrAppend(&state_str, "NR-"); // For Not Replicating
break;
case REPLICATING:
StrAppend(&state_str, "R-"); // For Replicating
break;
case REPLICATION_FAILED:
StrAppend(&state_str, "RF-"); // For Replication Failed
break;
case REPLICATED:
StrAppend(&state_str, "RD-"); // For Replication Done
break;
default:
LOG(DFATAL) << "Unexpected replication state: " << repl_state;
}
switch (prep_state) {
case PREPARED:
StrAppend(&state_str, "P");
break;
case NOT_PREPARED:
StrAppend(&state_str, "NP");
break;
default:
LOG(DFATAL) << "Unexpected prepare state: " << prep_state;
}
return state_str;
}
std::string OpDriver::LogPrefix() const {
ReplicationState repl_state_copy;
PrepareState prep_state_copy;
string ts_string;
{
std::lock_guard<simple_spinlock> lock(lock_);
repl_state_copy = replication_state_;
prep_state_copy = prepare_state_;
ts_string = state()->has_timestamp() ? state()->timestamp().ToString() : "No timestamp";
}
string state_str = StateString(repl_state_copy, prep_state_copy);
// We use the tablet and the peer (T, P) to identify ts and tablet and the timestamp (Ts) to
// (help) identify the op. The state string (S) describes the state of the op.
return Substitute("T $0 P $1 S $2 Ts $3: ",
// consensus_ is NULL in some unit tests.
PREDICT_TRUE(consensus_) ? consensus_->tablet_id() : "(unknown)",
PREDICT_TRUE(consensus_) ? consensus_->peer_uuid() : "(unknown)",
state_str,
ts_string);
}
} // namespace tablet
} // namespace kudu