blob: 4a63fe33ff52f7da9fd2d28517b896177abd0134 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "kudu/tablet/transactions/transaction_driver.h"
#include "kudu/consensus/consensus.h"
#include "kudu/gutil/strings/strcat.h"
#include "kudu/tablet/tablet_peer.h"
#include "kudu/tablet/transactions/transaction_tracker.h"
#include "kudu/util/debug-util.h"
#include "kudu/util/debug/trace_event.h"
#include "kudu/util/logging.h"
#include "kudu/util/threadpool.h"
#include "kudu/util/trace.h"
namespace kudu {
namespace tablet {
using consensus::CommitMsg;
using consensus::Consensus;
using consensus::ConsensusRound;
using consensus::ReplicateMsg;
using consensus::CommitMsg;
using consensus::DriverType;
using log::Log;
using std::shared_ptr;
static const char* kTimestampFieldName = "timestamp";
////////////////////////////////////////////////////////////
// TransactionDriver
////////////////////////////////////////////////////////////
TransactionDriver::TransactionDriver(TransactionTracker *txn_tracker,
Consensus* consensus,
Log* log,
ThreadPool* prepare_pool,
ThreadPool* apply_pool,
TransactionOrderVerifier* order_verifier)
: txn_tracker_(txn_tracker),
consensus_(consensus),
log_(log),
prepare_pool_(prepare_pool),
apply_pool_(apply_pool),
order_verifier_(order_verifier),
trace_(new Trace()),
start_time_(MonoTime::Now(MonoTime::FINE)),
replication_state_(NOT_REPLICATING),
prepare_state_(NOT_PREPARED) {
if (Trace::CurrentTrace()) {
Trace::CurrentTrace()->AddChildTrace(trace_.get());
}
}
Status TransactionDriver::Init(gscoped_ptr<Transaction> transaction,
DriverType type) {
transaction_ = transaction.Pass();
if (type == consensus::REPLICA) {
boost::lock_guard<simple_spinlock> lock(opid_lock_);
op_id_copy_ = transaction_->state()->op_id();
DCHECK(op_id_copy_.IsInitialized());
replication_state_ = REPLICATING;
} else {
DCHECK_EQ(type, consensus::LEADER);
gscoped_ptr<ReplicateMsg> replicate_msg;
transaction_->NewReplicateMsg(&replicate_msg);
if (consensus_) { // sometimes NULL in tests
// Unretained is required to avoid a refcount cycle.
mutable_state()->set_consensus_round(
consensus_->NewRound(replicate_msg.Pass(),
Bind(&TransactionDriver::ReplicationFinished, Unretained(this))));
}
}
RETURN_NOT_OK(txn_tracker_->Add(this));
return Status::OK();
}
consensus::OpId TransactionDriver::GetOpId() {
boost::lock_guard<simple_spinlock> lock(opid_lock_);
return op_id_copy_;
}
const TransactionState* TransactionDriver::state() const {
return transaction_ != nullptr ? transaction_->state() : nullptr;
}
TransactionState* TransactionDriver::mutable_state() {
return transaction_ != nullptr ? transaction_->state() : nullptr;
}
Transaction::TransactionType TransactionDriver::tx_type() const {
return transaction_->tx_type();
}
string TransactionDriver::ToString() const {
boost::lock_guard<simple_spinlock> lock(lock_);
return ToStringUnlocked();
}
string TransactionDriver::ToStringUnlocked() const {
string ret = StateString(replication_state_, prepare_state_);
if (transaction_ != nullptr) {
ret += " " + transaction_->ToString();
} else {
ret += "[unknown txn]";
}
return ret;
}
Status TransactionDriver::ExecuteAsync() {
VLOG_WITH_PREFIX(4) << "ExecuteAsync()";
TRACE_EVENT_FLOW_BEGIN0("txn", "ExecuteAsync", this);
ADOPT_TRACE(trace());
Status s;
if (replication_state_ == NOT_REPLICATING) {
// We're a leader transaction. Before submitting, check that we are the leader and
// determine the current term.
s = consensus_->CheckLeadershipAndBindTerm(mutable_state()->consensus_round());
}
if (s.ok()) {
s = prepare_pool_->SubmitClosure(
Bind(&TransactionDriver::PrepareAndStartTask, Unretained(this)));
}
if (!s.ok()) {
HandleFailure(s);
}
// TODO: make this return void
return Status::OK();
}
void TransactionDriver::PrepareAndStartTask() {
TRACE_EVENT_FLOW_END0("txn", "PrepareAndStartTask", this);
Status prepare_status = PrepareAndStart();
if (PREDICT_FALSE(!prepare_status.ok())) {
HandleFailure(prepare_status);
}
}
Status TransactionDriver::PrepareAndStart() {
TRACE_EVENT1("txn", "PrepareAndStart", "txn", this);
VLOG_WITH_PREFIX(4) << "PrepareAndStart()";
// Actually prepare and start the transaction.
prepare_physical_timestamp_ = GetMonoTimeMicros();
RETURN_NOT_OK(transaction_->Prepare());
RETURN_NOT_OK(transaction_->Start());
// Only take the lock long enough to take a local copy of the
// replication state and set our prepare state. This ensures that
// exactly one of Replicate/Prepare callbacks will trigger the apply
// phase.
ReplicationState repl_state_copy;
{
boost::lock_guard<simple_spinlock> lock(lock_);
CHECK_EQ(prepare_state_, NOT_PREPARED);
prepare_state_ = PREPARED;
repl_state_copy = replication_state_;
}
switch (repl_state_copy) {
case NOT_REPLICATING:
{
// Set the timestamp in the message, now that it's prepared.
transaction_->state()->consensus_round()->replicate_msg()->set_timestamp(
transaction_->state()->timestamp().ToUint64());
VLOG_WITH_PREFIX(4) << "Triggering consensus repl";
// Trigger the consensus replication.
{
boost::lock_guard<simple_spinlock> lock(lock_);
replication_state_ = REPLICATING;
}
Status s = consensus_->Replicate(mutable_state()->consensus_round());
if (PREDICT_FALSE(!s.ok())) {
boost::lock_guard<simple_spinlock> lock(lock_);
CHECK_EQ(replication_state_, REPLICATING);
transaction_status_ = s;
replication_state_ = REPLICATION_FAILED;
return s;
}
break;
}
case REPLICATING:
{
// Already replicating - nothing to trigger
break;
}
case REPLICATION_FAILED:
DCHECK(!transaction_status_.ok());
FALLTHROUGH_INTENDED;
case REPLICATED:
{
// We can move on to apply.
// Note that ApplyAsync() will handle the error status in the
// REPLICATION_FAILED case.
return ApplyAsync();
}
}
return Status::OK();
}
void TransactionDriver::HandleFailure(const Status& s) {
VLOG_WITH_PREFIX(2) << "Failed transaction: " << s.ToString();
CHECK(!s.ok());
TRACE("HandleFailure($0)", s.ToString());
ReplicationState repl_state_copy;
{
boost::lock_guard<simple_spinlock> lock(lock_);
transaction_status_ = s;
repl_state_copy = replication_state_;
}
switch (repl_state_copy) {
case NOT_REPLICATING:
case REPLICATION_FAILED:
{
VLOG_WITH_PREFIX(1) << "Transaction " << ToString() << " failed prior to "
"replication success: " << s.ToString();
transaction_->Finish(Transaction::ABORTED);
mutable_state()->completion_callback()->set_error(transaction_status_);
mutable_state()->completion_callback()->TransactionCompleted();
txn_tracker_->Release(this);
return;
}
case REPLICATING:
case REPLICATED:
{
LOG_WITH_PREFIX(FATAL) << "Cannot cancel transactions that have already replicated"
<< ": " << transaction_status_.ToString()
<< " transaction:" << ToString();
}
}
}
void TransactionDriver::ReplicationFinished(const Status& status) {
{
boost::lock_guard<simple_spinlock> op_id_lock(opid_lock_);
// TODO: it's a bit silly that we have three copies of the opid:
// one here, one in ConsensusRound, and one in TransactionState.
op_id_copy_ = DCHECK_NOTNULL(mutable_state()->consensus_round())->id();
DCHECK(op_id_copy_.IsInitialized());
mutable_state()->mutable_op_id()->CopyFrom(op_id_copy_);
}
PrepareState prepare_state_copy;
{
boost::lock_guard<simple_spinlock> lock(lock_);
CHECK_EQ(replication_state_, REPLICATING);
if (status.ok()) {
replication_state_ = REPLICATED;
} else {
replication_state_ = REPLICATION_FAILED;
transaction_status_ = status;
}
prepare_state_copy = prepare_state_;
}
// If we have prepared and replicated, we're ready
// to move ahead and apply this operation.
// Note that if we set the state to REPLICATION_FAILED above,
// ApplyAsync() will actually abort the transaction, i.e.
// ApplyTask() will never be called and the transaction will never
// be applied to the tablet.
if (prepare_state_copy == PREPARED) {
// We likely need to do cleanup if this fails so for now just
// CHECK_OK
CHECK_OK(ApplyAsync());
}
}
void TransactionDriver::Abort(const Status& status) {
CHECK(!status.ok());
ReplicationState repl_state_copy;
{
boost::lock_guard<simple_spinlock> lock(lock_);
repl_state_copy = replication_state_;
transaction_status_ = status;
}
// If the state is not NOT_REPLICATING we abort immediately and the transaction
// will never be replicated.
// In any other state we just set the transaction status, if the transaction's
// Apply hasn't started yet this prevents it from starting, but if it has then
// the transaction runs to completion.
if (repl_state_copy == NOT_REPLICATING) {
HandleFailure(status);
}
}
Status TransactionDriver::ApplyAsync() {
{
boost::unique_lock<simple_spinlock> lock(lock_);
DCHECK_EQ(prepare_state_, PREPARED);
if (transaction_status_.ok()) {
DCHECK_EQ(replication_state_, REPLICATED);
order_verifier_->CheckApply(op_id_copy_.index(),
prepare_physical_timestamp_);
// Now that the transaction is committed in consensus advance the safe time.
if (transaction_->state()->external_consistency_mode() != COMMIT_WAIT) {
transaction_->state()->tablet_peer()->tablet()->mvcc_manager()->
OfflineAdjustSafeTime(transaction_->state()->timestamp());
}
} else {
DCHECK_EQ(replication_state_, REPLICATION_FAILED);
DCHECK(!transaction_status_.ok());
lock.unlock();
HandleFailure(transaction_status_);
return Status::OK();
}
}
TRACE_EVENT_FLOW_BEGIN0("txn", "ApplyTask", this);
return apply_pool_->SubmitClosure(Bind(&TransactionDriver::ApplyTask, Unretained(this)));
}
void TransactionDriver::ApplyTask() {
TRACE_EVENT_FLOW_END0("txn", "ApplyTask", this);
ADOPT_TRACE(trace());
{
boost::lock_guard<simple_spinlock> lock(lock_);
DCHECK_EQ(replication_state_, REPLICATED);
DCHECK_EQ(prepare_state_, PREPARED);
}
// We need to ref-count ourself, since Commit() may run very quickly
// and end up calling Finalize() while we're still in this code.
scoped_refptr<TransactionDriver> ref(this);
{
gscoped_ptr<CommitMsg> commit_msg;
CHECK_OK(transaction_->Apply(&commit_msg));
commit_msg->mutable_commited_op_id()->CopyFrom(op_id_copy_);
SetResponseTimestamp(transaction_->state(), transaction_->state()->timestamp());
// If the client requested COMMIT_WAIT as the external consistency mode
// calculate the latest that the prepare timestamp could be and wait
// until now.earliest > prepare_latest. Only after this are the locks
// released.
if (mutable_state()->external_consistency_mode() == COMMIT_WAIT) {
// TODO: only do this on the leader side
TRACE("APPLY: Commit Wait.");
// If we can't commit wait and have already applied we might have consistency
// issues if we still reply to the client that the operation was a success.
// On the other hand we don't have rollbacks as of yet thus we can't undo the
// the apply either, so we just CHECK_OK for now.
CHECK_OK(CommitWait());
}
transaction_->PreCommit();
{
TRACE_EVENT1("txn", "AsyncAppendCommit", "txn", this);
CHECK_OK(log_->AsyncAppendCommit(commit_msg.Pass(), Bind(DoNothingStatusCB)));
}
Finalize();
}
}
void TransactionDriver::SetResponseTimestamp(TransactionState* transaction_state,
const Timestamp& timestamp) {
google::protobuf::Message* response = transaction_state->response();
if (response) {
const google::protobuf::FieldDescriptor* ts_field =
response->GetDescriptor()->FindFieldByName(kTimestampFieldName);
response->GetReflection()->SetUInt64(response, ts_field, timestamp.ToUint64());
}
}
Status TransactionDriver::CommitWait() {
MonoTime before = MonoTime::Now(MonoTime::FINE);
DCHECK(mutable_state()->external_consistency_mode() == COMMIT_WAIT);
// TODO: we could plumb the RPC deadline in here, and not bother commit-waiting
// if the deadline is already expired.
RETURN_NOT_OK(
mutable_state()->tablet_peer()->clock()->WaitUntilAfter(mutable_state()->timestamp(),
MonoTime::Max()));
mutable_state()->mutable_metrics()->commit_wait_duration_usec =
MonoTime::Now(MonoTime::FINE).GetDeltaSince(before).ToMicroseconds();
return Status::OK();
}
void TransactionDriver::Finalize() {
ADOPT_TRACE(trace());
// TODO: this is an ugly hack so that the Release() call doesn't delete the
// object while we still hold the lock.
scoped_refptr<TransactionDriver> ref(this);
boost::lock_guard<simple_spinlock> lock(lock_);
transaction_->Finish(Transaction::COMMITTED);
mutable_state()->completion_callback()->TransactionCompleted();
txn_tracker_->Release(this);
}
std::string TransactionDriver::StateString(ReplicationState repl_state,
PrepareState prep_state) {
string state_str;
switch (repl_state) {
case NOT_REPLICATING:
StrAppend(&state_str, "NR-"); // For Not Replicating
break;
case REPLICATING:
StrAppend(&state_str, "R-"); // For Replicating
break;
case REPLICATION_FAILED:
StrAppend(&state_str, "RF-"); // For Replication Failed
break;
case REPLICATED:
StrAppend(&state_str, "RD-"); // For Replication Done
break;
default:
LOG(DFATAL) << "Unexpected replication state: " << repl_state;
}
switch (prep_state) {
case PREPARED:
StrAppend(&state_str, "P");
break;
case NOT_PREPARED:
StrAppend(&state_str, "NP");
break;
default:
LOG(DFATAL) << "Unexpected prepare state: " << prep_state;
}
return state_str;
}
std::string TransactionDriver::LogPrefix() const {
ReplicationState repl_state_copy;
PrepareState prep_state_copy;
string ts_string;
{
boost::lock_guard<simple_spinlock> lock(lock_);
repl_state_copy = replication_state_;
prep_state_copy = prepare_state_;
ts_string = state()->has_timestamp() ? state()->timestamp().ToString() : "No timestamp";
}
string state_str = StateString(repl_state_copy, prep_state_copy);
// We use the tablet and the peer (T, P) to identify ts and tablet and the timestamp (Ts) to
// (help) identify the transaction. The state string (S) describes the state of the transaction.
return strings::Substitute("T $0 P $1 S $2 Ts $3: ",
// consensus_ is NULL in some unit tests.
PREDICT_TRUE(consensus_) ? consensus_->tablet_id() : "(unknown)",
PREDICT_TRUE(consensus_) ? consensus_->peer_uuid() : "(unknown)",
state_str,
ts_string);
}
} // namespace tablet
} // namespace kudu