// blob: ce650680267f89cae7bb2ed020da00660997c1e9
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "kudu/tablet/mvcc.h"
#include <algorithm>
#include <mutex>
#include <ostream>
#include <utility>
#include <gflags/gflags.h>
#include <glog/logging.h>
#include "kudu/gutil/map-util.h"
#include "kudu/gutil/port.h"
#include "kudu/gutil/strings/strcat.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/util/countdown_latch.h"
#include "kudu/util/debug/trace_event.h"
#include "kudu/util/fault_injection.h"
#include "kudu/util/flag_tags.h"
#include "kudu/util/logging.h"
#include "kudu/util/monotime.h"
// Fault-injection knob read by MvccManager::StartTransaction() through
// MAYBE_INJECT_RANDOM_LATENCY, before the MVCC lock is taken. Defaults to 0;
// presumably 0 disables the injection (confirm against util/fault_injection.h).
// Tagged hidden/advanced: intended for tests, not for production tuning.
DEFINE_int32(inject_latency_ms_before_starting_txn, 0,
"Amount of latency in ms to inject before registering "
"a transaction with MVCC.");
TAG_FLAG(inject_latency_ms_before_starting_txn, advanced);
TAG_FLAG(inject_latency_ms_before_starting_txn, hidden);
namespace kudu {
namespace tablet {
using strings::Substitute;
// Builds an empty manager: nothing is in flight ('earliest_in_flight_' parked
// at kMax), safe time sits at kMin, and the current snapshot's clean time and
// upper commit bound both sit at kInitialTimestamp. CheckIsSafeTimeInitialized()
// treats that last state as "not yet initialized".
MvccManager::MvccManager()
    : safe_time_(Timestamp::kMin),
      earliest_in_flight_(Timestamp::kMax),
      open_(true) {
  // A single-timestamp snapshot sets both watermarks to that timestamp and has
  // an empty committed set.
  cur_snap_ = MvccSnapshot(Timestamp::kInitialTimestamp);
}
// Returns Status::Uninitialized while the clean time is still
// kInitialTimestamp, and OK otherwise. Safe time and clean time are
// initialized together, so an untouched clean time implies an untouched
// safe time.
Status MvccManager::CheckIsSafeTimeInitialized() const {
  const bool uninitialized = GetCleanTimestamp() == Timestamp::kInitialTimestamp;
  return uninitialized ? Status::Uninitialized("safe time has not yet been initialized")
                       : Status::OK();
}
// Registers a new in-flight transaction at 'timestamp' (in state RESERVED, via
// InitTransactionUnlocked()).
//
// Crashes if 'timestamp' is already committed in the current snapshot, if a
// transaction with that timestamp is already in flight, or if 'timestamp' is at
// or below the current safe time.
void MvccManager::StartTransaction(Timestamp timestamp) {
// Optional test-only delay (--inject_latency_ms_before_starting_txn), applied
// before the lock is acquired.
MAYBE_INJECT_RANDOM_LATENCY(FLAGS_inject_latency_ms_before_starting_txn);
std::lock_guard<LockType> l(lock_);
CHECK(!cur_snap_.IsCommitted(timestamp)) << "Trying to start a new txn at an already-committed"
<< " timestamp: " << timestamp.ToString()
<< " cur_snap_: " << cur_snap_.ToString();
// InitTransactionUnlocked() is evaluated inside CHECK for its side effect of
// inserting the timestamp into the in-flight map.
CHECK(InitTransactionUnlocked(timestamp)) << "There is already a transaction with timestamp: "
<< timestamp.value() << " in flight or this timestamp "
<< "is before than or equal to \"safe\" time."
<< " Current Snapshot: " << cur_snap_.ToString()
<< " Current safe time: " << safe_time_;
}
// Moves the in-flight transaction at 'timestamp' from RESERVED to APPLYING.
// It is fatal if the timestamp is not in flight or is in any state other than
// RESERVED.
void MvccManager::StartApplyingTransaction(Timestamp timestamp) {
  std::lock_guard<LockType> guard(lock_);
  auto entry = timestamps_in_flight_.find(timestamp.value());
  const bool found = entry != timestamps_in_flight_.end();
  if (PREDICT_FALSE(!found)) {
    LOG(FATAL) << "Cannot mark timestamp " << timestamp.ToString() << " as APPLYING: "
               << "not in the in-flight map.";
  }
  TxnState& state = entry->second;
  if (PREDICT_FALSE(state != RESERVED)) {
    LOG(FATAL) << "Cannot mark timestamp " << timestamp.ToString() << " as APPLYING: "
               << "wrong state: " << state;
  }
  state = APPLYING;
}
// Reserves 'timestamp' in the in-flight map (state RESERVED) and lowers the
// cached 'earliest_in_flight_' when 'timestamp' precedes it.
//
// Returns false without touching any state if 'timestamp' is at or below
// 'safe_time_'. Otherwise it returns whether the timestamp was newly inserted,
// which is false if it was already in flight. Called with 'lock_' held (see
// StartTransaction()).
bool MvccManager::InitTransactionUnlocked(const Timestamp& timestamp) {
  // Nothing may start at or below the safe time.
  if (PREDICT_FALSE(timestamp <= safe_time_)) {
    return false;
  }
  earliest_in_flight_ = std::min(earliest_in_flight_, timestamp);
  return InsertIfNotPresent(&timestamps_in_flight_, timestamp.value(), RESERVED);
}
// Aborts the transaction at 'timestamp' and removes it from the in-flight set.
//
// While MVCC is open, the transaction must still be RESERVED; any other state
// is fatal. Once MVCC is closed, the state is only reported in a warning, and
// the cached earliest in-flight timestamp is left as is.
void MvccManager::AbortTransaction(Timestamp timestamp) {
  std::lock_guard<LockType> guard(lock_);

  // The entry is removed unconditionally. Its prior state decides the rest.
  TxnState old_state = RemoveInFlightAndGetStateUnlocked(timestamp);

  if (PREDICT_TRUE(is_open())) {
    CHECK_EQ(old_state, RESERVED) << "transaction with timestamp " << timestamp.ToString()
                                  << " cannot be aborted in state " << old_state;
    // Removing the earliest in-flight entry invalidates the cached minimum.
    if (earliest_in_flight_ == timestamp) {
      AdvanceEarliestInFlightTimestamp();
    }
    return;
  }

  // The tablet is shutting down, so transaction states no longer matter.
  LOG(WARNING) << "aborting transaction with timestamp " << timestamp.ToString()
               << " in state " << old_state << "; MVCC is closed";
}
// Commits the APPLYING transaction at 'timestamp' into the current snapshot.
//
// The clean time is recomputed here only if this transaction was the earliest
// one in flight and safe time has already reached 'timestamp'. Otherwise a
// later AdjustSafeTime() call, which also runs AdjustCleanTime(), can advance it.
void MvccManager::CommitTransaction(Timestamp timestamp) {
  std::lock_guard<LockType> guard(lock_);
  bool was_earliest = false;
  CommitTransactionUnlocked(timestamp, &was_earliest);

  const bool may_advance_clean_time = was_earliest && safe_time_ >= timestamp;
  if (may_advance_clean_time) {
    AdjustCleanTime();
  }
}
// Erases 'ts' from the in-flight map and returns the state it had. It is fatal
// if 'ts' is not in flight. The caller must hold 'lock_' (checked in debug builds).
MvccManager::TxnState MvccManager::RemoveInFlightAndGetStateUnlocked(Timestamp ts) {
  DCHECK(lock_.is_locked());
  const auto pos = timestamps_in_flight_.find(ts.value());
  if (pos == timestamps_in_flight_.end()) {
    LOG(FATAL) << "Trying to remove timestamp which isn't in the in-flight set: "
               << ts.ToString();
  }
  const TxnState prior = pos->second;
  timestamps_in_flight_.erase(pos);
  return prior;
}
// Removes the APPLYING transaction at 'timestamp' from the in-flight set and
// records it as committed in 'cur_snap_'. It does not touch the clean time.
//
// '*was_earliest_in_flight' is set to whether 'timestamp' was the cached
// earliest in-flight timestamp before removal. When it was, the cache is
// recomputed. It is fatal if the transaction was not APPLYING.
void MvccManager::CommitTransactionUnlocked(Timestamp timestamp,
                                            bool* was_earliest_in_flight) {
  // Sample before the removal below changes the in-flight set.
  const bool was_earliest = (earliest_in_flight_ == timestamp);
  *was_earliest_in_flight = was_earliest;

  TxnState old_state = RemoveInFlightAndGetStateUnlocked(timestamp);
  CHECK_EQ(old_state, APPLYING)
      << "Trying to commit a transaction which never entered APPLYING state: "
      << timestamp.ToString() << " state=" << old_state;

  cur_snap_.AddCommittedTimestamp(timestamp);

  if (was_earliest) {
    AdvanceEarliestInFlightTimestamp();
  }
}
void MvccManager::AdvanceEarliestInFlightTimestamp() {
if (timestamps_in_flight_.empty()) {
earliest_in_flight_ = Timestamp::kMax;
} else {
earliest_in_flight_ = Timestamp(std::min_element(timestamps_in_flight_.begin(),
timestamps_in_flight_.end())->first);
}
}
// Raises the safe time to 'safe_time' (no transaction may start at or below it
// afterwards), then recomputes the clean time.
//
// A request that would move safe time backwards is logged (via KLOG_EVERY_N,
// so only periodically) and otherwise ignored.
void MvccManager::AdjustSafeTime(Timestamp safe_time) {
  std::lock_guard<LockType> guard(lock_);
  if (PREDICT_FALSE(safe_time < safe_time_)) {
    // Getting here means a transaction is about to be applied out of order.
    // That is only safe because concurrent transactions are guaranteed not to
    // touch the same state, thanks to locks taken before they start (e.g. row
    // locks, schema locks).
    KLOG_EVERY_N(INFO, 10) << Substitute("Tried to move safe_time back from $0 to $1. "
                                         "Current Snapshot: $2", safe_time_.ToString(),
                                         safe_time.ToString(), cur_snap_.ToString());
    return;
  }
  DVLOG(4) << "Adjusting safe time to: " << safe_time;
  safe_time_ = safe_time;
  AdjustCleanTime();
}
// Removes every element of 'v' that is strictly below 'watermark'. The
// remaining elements keep their original relative order.
//
// Uses the erase/remove_if idiom instead of a hand-rolled compaction loop. The
// old loop used an 'int' write index against a size_t-sized vector, which mixes
// signed and unsigned arithmetic and overflows past INT_MAX elements.
static void FilterTimestamps(std::vector<Timestamp::val_type>* v,
                             Timestamp::val_type watermark) {
  v->erase(std::remove_if(v->begin(), v->end(),
                          [watermark](Timestamp::val_type ts) { return ts < watermark; }),
           v->end());
}
void MvccManager::Close() {
open_.store(false);
std::lock_guard<LockType> l(lock_);
auto iter = waiters_.begin();
while (iter != waiters_.end()) {
(*iter)->latch->CountDown();
iter = waiters_.erase(iter);
}
}
// Recomputes the clean time ('cur_snap_.all_committed_before_') as the lower of
// 'earliest_in_flight_' and 'safe_time_'. It then drops committed timestamps
// the new watermark already covers and releases any waiters whose condition now
// holds. Called with 'lock_' held (from CommitTransaction() and AdjustSafeTime()).
void MvccManager::AdjustCleanTime() {
  // An in-flight transaction below safe time pins the clean time at its own
  // timestamp. Otherwise the clean time can move all the way to 'safe_time_',
  // since no new transaction can start at or below it. Commit-wait transactions
  // with future timestamps may still be in flight above that point.
  cur_snap_.all_committed_before_ = std::min(earliest_in_flight_, safe_time_);
  DVLOG(4) << "Adjusted clean time to: " << cur_snap_.all_committed_before_;

  // Committed timestamps below the watermark are now implied by it.
  FilterTimestamps(&cur_snap_.committed_timestamps_, cur_snap_.all_committed_before_.value());

  // With no explicit committed timestamps left, pull the upper bound up to the
  // clean time so it never falls below it.
  if (cur_snap_.committed_timestamps_.empty()) {
    cur_snap_.none_committed_at_or_after_ = cur_snap_.all_committed_before_;
  }

  // Moving the clean time may have satisfied some waiters; release those.
  if (PREDICT_FALSE(!waiters_.empty())) {
    for (auto it = waiters_.begin(); it != waiters_.end();) {
      WaitingState* waiter = *it;
      if (!IsDoneWaitingUnlocked(*waiter)) {
        ++it;
        continue;
      }
      it = waiters_.erase(it);
      // After CountDown() the waiter may return from WaitUntil() and destroy its
      // WaitingState, so 'waiter' must not be used past this point.
      waiter->latch->CountDown();
    }
  }
}
// Blocks until the condition selected by 'wait_for' holds for 'ts', MVCC is
// closed, or 'deadline' passes.
//
// ALL_COMMITTED waits for AreAllTransactionsCommittedUnlocked(ts).
// NONE_APPLYING waits until AnyApplyingAtOrBeforeUnlocked(ts) is false.
//
// Returns OK once the condition holds, Status::Aborted if MVCC is closed before
// or while waiting, and Status::TimedOut if the deadline expires first.
Status MvccManager::WaitUntil(WaitFor wait_for, Timestamp ts, const MonoTime& deadline) const {
  // Terminate the macro invocation explicitly rather than relying on its expansion.
  TRACE_EVENT2("tablet", "MvccManager::WaitUntil",
               "wait_for", wait_for == ALL_COMMITTED ? "all_committed" : "none_applying",
               "ts", ts.ToUint64());

  // If MVCC is closed, there's no point in waiting.
  RETURN_NOT_OK(CheckOpen());

  // 'waiting_state' lives on this stack frame. A pointer to it is registered in
  // 'waiters_' and must be unregistered (by us or by the notifier) before we return.
  CountDownLatch latch(1);
  WaitingState waiting_state;
  waiting_state.timestamp = ts;
  waiting_state.latch = &latch;
  waiting_state.wait_for = wait_for;
  {
    std::lock_guard<LockType> l(lock_);
    if (IsDoneWaitingUnlocked(waiting_state)) return Status::OK();
    waiters_.push_back(&waiting_state);
  }

  if (waiting_state.latch->WaitUntil(deadline)) {
    // Woken by AdjustCleanTime() or Close(). If it was Close(), report it.
    return CheckOpen();
  }

  // We timed out. We need to clean up our entry in 'waiters_'.
  std::lock_guard<LockType> l(lock_);
  // A notifier may have counted us down (and already removed our entry) while
  // we were re-acquiring the lock. In that case there is nothing to clean up.
  if (waiting_state.latch->count() == 0) {
    return CheckOpen();
  }
  // Every notifier removes an entry under 'lock_' before counting it down, so a
  // nonzero count means we must still be registered. Guard anyway: erasing
  // end() would be undefined behavior.
  auto it = std::find(waiters_.begin(), waiters_.end(), &waiting_state);
  DCHECK(it != waiters_.end()) << "timed-out waiter missing from waiters_";
  if (it != waiters_.end()) {
    waiters_.erase(it);
  }
  return Status::TimedOut(Substitute("Timed out waiting for all transactions with ts < $0 to $1",
                                     ts.ToString(),
                                     wait_for == ALL_COMMITTED ? "commit" : "finish applying"));
}
bool MvccManager::IsDoneWaitingUnlocked(const WaitingState& waiter) const {
switch (waiter.wait_for) {
case ALL_COMMITTED:
return AreAllTransactionsCommittedUnlocked(waiter.timestamp);
case NONE_APPLYING:
return !AnyApplyingAtOrBeforeUnlocked(waiter.timestamp);
}
LOG(FATAL); // unreachable
}
// Returns OK while is_open() holds, and Status::Aborted after Close() has
// cleared 'open_'.
Status MvccManager::CheckOpen() const {
  if (PREDICT_FALSE(!is_open())) {
    return Status::Aborted("MVCC is closed");
  }
  return Status::OK();
}
// True when 'ts' is below the snapshot's clean time. It is also true when the
// clean time has not caught up yet but 'ts' still precedes every in-flight
// transaction. Called with 'lock_' held.
bool MvccManager::AreAllTransactionsCommittedUnlocked(Timestamp ts) const {
  return ts < cur_snap_.all_committed_before_ || ts < earliest_in_flight_;
}
// Returns true if any in-flight transaction has a timestamp at or before 'ts'.
//
// TODO(todd) this is not actually checking on the applying txns, it's checking on
// _all in-flight_. Is this a bug?
bool MvccManager::AnyApplyingAtOrBeforeUnlocked(Timestamp ts) const {
  const Timestamp::val_type limit = ts.value();
  return std::any_of(timestamps_in_flight_.begin(), timestamps_in_flight_.end(),
                     [limit](const InFlightMap::value_type& entry) {
                       return entry.first <= limit;
                     });
}
// Copies the current snapshot into '*snap' while holding the lock.
void MvccManager::TakeSnapshot(MvccSnapshot* snap) const {
  std::lock_guard<LockType> guard(lock_);
  *snap = cur_snap_;
}
// Waits (until 'deadline') for the ALL_COMMITTED condition at 'timestamp'. On
// success, stores a plain snapshot at 'timestamp' in '*snapshot'. Errors from
// WaitUntil() are returned unchanged, and '*snapshot' is not modified.
Status MvccManager::WaitForSnapshotWithAllCommitted(Timestamp timestamp,
                                                    MvccSnapshot* snapshot,
                                                    const MonoTime& deadline) const {
  TRACE_EVENT0("tablet", "MvccManager::WaitForSnapshotWithAllCommitted");
  RETURN_NOT_OK(WaitUntil(ALL_COMMITTED, timestamp, deadline));
  // Everything the snapshot should include is now below 'timestamp', so no
  // explicit committed set is needed.
  *snapshot = MvccSnapshot(timestamp);
  return Status::OK();
}
// Waits, with no deadline, until no in-flight transaction is at or below the
// highest timestamp that was APPLYING when this call started. Returns
// immediately if MVCC is closed (Aborted) or nothing was APPLYING (OK).
Status MvccManager::WaitForApplyingTransactionsToCommit() const {
  TRACE_EVENT0("tablet", "MvccManager::WaitForApplyingTransactionsToCommit");
  RETURN_NOT_OK(CheckOpen());

  // Highest APPLYING timestamp. kMin doubles as "none found".
  Timestamp::val_type highest = Timestamp::kMin.value();
  {
    std::lock_guard<LockType> guard(lock_);
    for (const auto& kv : timestamps_in_flight_) {
      if (kv.second == APPLYING && kv.first > highest) {
        highest = kv.first;
      }
    }
  }

  const Timestamp wait_for(highest);
  if (wait_for == Timestamp::kMin) {
    // None were APPLYING: we can just return.
    return Status::OK();
  }
  // This may be conservative: more transactions can enter APPLYING while we
  // wait, but the wait still eventually succeeds.
  return WaitUntil(NONE_APPLYING, wait_for, MonoTime::Max());
}
// Locking wrapper around AreAllTransactionsCommittedUnlocked().
bool MvccManager::AreAllTransactionsCommitted(Timestamp ts) const {
  std::lock_guard<LockType> guard(lock_);
  const bool committed = AreAllTransactionsCommittedUnlocked(ts);
  return committed;
}
// Returns the number of in-flight transactions, whether RESERVED or APPLYING.
int MvccManager::CountTransactionsInFlight() const {
  std::lock_guard<LockType> guard(lock_);
  return static_cast<int>(timestamps_in_flight_.size());
}
// Returns the current clean time: every transaction below it is committed.
Timestamp MvccManager::GetCleanTimestamp() const {
  std::lock_guard<LockType> guard(lock_);
  const Timestamp clean = cur_snap_.all_committed_before_;
  return clean;
}
void MvccManager::GetApplyingTransactionsTimestamps(std::vector<Timestamp>* timestamps) const {
std::lock_guard<LockType> l(lock_);
timestamps->reserve(timestamps_in_flight_.size());
for (const InFlightMap::value_type entry : timestamps_in_flight_) {
if (entry.second == APPLYING) {
timestamps->push_back(Timestamp(entry.first));
}
}
}
// Fatal if any WaitUntil() caller is still registered in 'waiters_' at
// destruction time. Such an entry points at another thread's stack-allocated
// WaitingState, and that thread is still waiting on this manager.
MvccManager::~MvccManager() {
CHECK(waiters_.empty());
}
////////////////////////////////////////////////////////////
// MvccSnapshot
////////////////////////////////////////////////////////////
// Default snapshot: both watermarks at kInitialTimestamp, nothing explicitly
// committed.
MvccSnapshot::MvccSnapshot()
    : MvccSnapshot(Timestamp::kInitialTimestamp) {
}
// Captures 'manager''s current snapshot. The copy is made under the manager's
// lock by MvccManager::TakeSnapshot().
MvccSnapshot::MvccSnapshot(const MvccManager& manager) {
  MvccSnapshot* const self = this;
  manager.TakeSnapshot(self);
}
// Snapshot in which exactly the transactions below 'timestamp' are committed.
// Both watermarks sit at 'timestamp' and the explicit committed set is empty.
MvccSnapshot::MvccSnapshot(const Timestamp& timestamp)
    : all_committed_before_(timestamp),
      none_committed_at_or_after_(timestamp) {}
// Snapshot whose watermarks are both at Timestamp::kMax.
MvccSnapshot MvccSnapshot::CreateSnapshotIncludingAllTransactions() {
  const Timestamp upper = Timestamp::kMax;
  return MvccSnapshot(upper);
}
// Snapshot whose watermarks are both at Timestamp::kMin.
MvccSnapshot MvccSnapshot::CreateSnapshotIncludingNoTransactions() {
  const Timestamp lower = Timestamp::kMin;
  return MvccSnapshot(lower);
}
bool MvccSnapshot::IsCommittedFallback(const Timestamp& timestamp) const {
for (const Timestamp::val_type& v : committed_timestamps_) {
if (v == timestamp.value()) return true;
}
return false;
}
// False only when 'timestamp' is at or above the upper commit bound, i.e.
// when nothing at or after it can be committed in this snapshot.
bool MvccSnapshot::MayHaveCommittedTransactionsAtOrAfter(const Timestamp& timestamp) const {
  return none_committed_at_or_after_ > timestamp;
}
// The snapshot may have uncommitted transactions at or before 'timestamp' when
// 'timestamp' is past the clean time. It may also have them when 'timestamp'
// is exactly the clean time but is not in the explicit committed set.
bool MvccSnapshot::MayHaveUncommittedTransactionsAtOrBefore(const Timestamp& timestamp) const {
  if (timestamp > all_committed_before_) {
    return true;
  }
  if (timestamp == all_committed_before_) {
    return !IsCommittedFallback(timestamp);
  }
  return false;
}
// Renders the snapshot in one of two forms:
//   MvccSnapshot[committed={T|T < <clean>}]                       (no explicit set)
//   MvccSnapshot[committed={T|T < <clean> or (T in {a,b,...})}]   (otherwise)
std::string MvccSnapshot::ToString() const {
  std::string out("MvccSnapshot[committed={T|");
  StrAppend(&out, "T < ", all_committed_before_.ToString());
  if (committed_timestamps_.empty()) {
    out.append("}]");
    return out;
  }
  out.append(" or (T in {");
  const char* sep = "";
  for (Timestamp::val_type t : committed_timestamps_) {
    StrAppend(&out, sep, t);
    sep = ",";
  }
  out.append("})}]");
  return out;
}
void MvccSnapshot::AddCommittedTimestamps(const std::vector<Timestamp>& timestamps) {
for (const Timestamp& ts : timestamps) {
AddCommittedTimestamp(ts);
}
}
// Adds 'timestamp' to the explicit committed set unless IsCommitted() already
// reports it. If it reaches the current upper commit bound, the bound moves to
// just past it.
void MvccSnapshot::AddCommittedTimestamp(Timestamp timestamp) {
  if (!IsCommitted(timestamp)) {
    committed_timestamps_.push_back(timestamp.value());
    if (none_committed_at_or_after_ <= timestamp) {
      // NOTE(review): '+ 1' wraps if 'timestamp' holds the largest
      // representable value; confirm callers never commit that value.
      none_committed_at_or_after_ = Timestamp(timestamp.value() + 1);
    }
  }
}
////////////////////////////////////////////////////////////
// ScopedTransaction
////////////////////////////////////////////////////////////
// Starts a transaction at 'timestamp' on 'manager' via
// MvccManager::StartTransaction(), which crashes on an invalid timestamp.
// 'manager' must be non-null (checked in debug builds). If neither Commit() nor
// Abort() is called, the destructor aborts the transaction.
ScopedTransaction::ScopedTransaction(MvccManager* manager, Timestamp timestamp)
: done_(false),
manager_(DCHECK_NOTNULL(manager)),
timestamp_(timestamp) {
manager_->StartTransaction(timestamp);
}
// Aborts the transaction if it was never committed or aborted explicitly. While
// MVCC is open, MvccManager::AbortTransaction() requires the RESERVED state, so
// this is fatal after StartApplying().
ScopedTransaction::~ScopedTransaction() {
  if (done_) {
    return;
  }
  Abort();
}
void ScopedTransaction::StartApplying() {
manager_->StartApplyingTransaction(timestamp_);
}
void ScopedTransaction::Commit() {
manager_->CommitTransaction(timestamp_);
done_ = true;
}
void ScopedTransaction::Abort() {
manager_->AbortTransaction(timestamp_);
done_ = true;
}
} // namespace tablet
} // namespace kudu