| // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
| // This source code is licensed under both the GPLv2 (found in the |
| // COPYING file in the root directory) and Apache 2.0 License |
| // (found in the LICENSE.Apache file in the root directory). |
| |
| #ifndef ROCKSDB_LITE |
| |
| #ifndef __STDC_FORMAT_MACROS |
| #define __STDC_FORMAT_MACROS |
| #endif |
| |
| #include "utilities/transactions/transaction_lock_mgr.h" |
| |
| #include <inttypes.h> |
| |
| #include <algorithm> |
| #include <condition_variable> |
| #include <functional> |
| #include <mutex> |
| #include <string> |
| #include <vector> |
| |
| #include "rocksdb/slice.h" |
| #include "rocksdb/utilities/transaction_db_mutex.h" |
| #include "util/cast_util.h" |
| #include "util/murmurhash.h" |
| #include "util/sync_point.h" |
| #include "util/thread_local.h" |
| #include "utilities/transactions/pessimistic_transaction_db.h" |
| |
| namespace rocksdb { |
| |
| struct LockInfo { |
| bool exclusive; |
| autovector<TransactionID> txn_ids; |
| |
| // Transaction locks are not valid after this time in us |
| uint64_t expiration_time; |
| |
| LockInfo(TransactionID id, uint64_t time, bool ex) |
| : exclusive(ex), expiration_time(time) { |
| txn_ids.push_back(id); |
| } |
| LockInfo(const LockInfo& lock_info) |
| : exclusive(lock_info.exclusive), |
| txn_ids(lock_info.txn_ids), |
| expiration_time(lock_info.expiration_time) {} |
| }; |
| |
| struct LockMapStripe { |
| explicit LockMapStripe(std::shared_ptr<TransactionDBMutexFactory> factory) { |
| stripe_mutex = factory->AllocateMutex(); |
| stripe_cv = factory->AllocateCondVar(); |
| assert(stripe_mutex); |
| assert(stripe_cv); |
| } |
| |
| // Mutex must be held before modifying keys map |
| std::shared_ptr<TransactionDBMutex> stripe_mutex; |
| |
| // Condition Variable per stripe for waiting on a lock |
| std::shared_ptr<TransactionDBCondVar> stripe_cv; |
| |
| // Locked keys mapped to the info about the transactions that locked them. |
| // TODO(agiardullo): Explore performance of other data structures. |
| std::unordered_map<std::string, LockInfo> keys; |
| }; |
| |
| // Map of #num_stripes LockMapStripes |
| struct LockMap { |
| explicit LockMap(size_t num_stripes, |
| std::shared_ptr<TransactionDBMutexFactory> factory) |
| : num_stripes_(num_stripes) { |
| lock_map_stripes_.reserve(num_stripes); |
| for (size_t i = 0; i < num_stripes; i++) { |
| LockMapStripe* stripe = new LockMapStripe(factory); |
| lock_map_stripes_.push_back(stripe); |
| } |
| } |
| |
| ~LockMap() { |
| for (auto stripe : lock_map_stripes_) { |
| delete stripe; |
| } |
| } |
| |
| // Number of sepearate LockMapStripes to create, each with their own Mutex |
| const size_t num_stripes_; |
| |
| // Count of keys that are currently locked in this column family. |
| // (Only maintained if TransactionLockMgr::max_num_locks_ is positive.) |
| std::atomic<int64_t> lock_cnt{0}; |
| |
| std::vector<LockMapStripe*> lock_map_stripes_; |
| |
| size_t GetStripe(const std::string& key) const; |
| }; |
| |
| void DeadlockInfoBuffer::AddNewPath(DeadlockPath path) { |
| std::lock_guard<std::mutex> lock(paths_buffer_mutex_); |
| |
| if (paths_buffer_.empty()) { |
| return; |
| } |
| |
| paths_buffer_[buffer_idx_] = path; |
| buffer_idx_ = (buffer_idx_ + 1) % paths_buffer_.size(); |
| } |
| |
| void DeadlockInfoBuffer::Resize(uint32_t target_size) { |
| std::lock_guard<std::mutex> lock(paths_buffer_mutex_); |
| |
| paths_buffer_ = Normalize(); |
| |
| // Drop the deadlocks that will no longer be needed ater the normalize |
| if (target_size < paths_buffer_.size()) { |
| paths_buffer_.erase( |
| paths_buffer_.begin(), |
| paths_buffer_.begin() + (paths_buffer_.size() - target_size)); |
| buffer_idx_ = 0; |
| } |
| // Resize the buffer to the target size and restore the buffer's idx |
| else { |
| auto prev_size = paths_buffer_.size(); |
| paths_buffer_.resize(target_size); |
| buffer_idx_ = (uint32_t)prev_size; |
| } |
| } |
| |
| std::vector<DeadlockPath> DeadlockInfoBuffer::Normalize() { |
| auto working = paths_buffer_; |
| |
| if (working.empty()) { |
| return working; |
| } |
| |
| // Next write occurs at a nonexistent path's slot |
| if (paths_buffer_[buffer_idx_].empty()) { |
| working.resize(buffer_idx_); |
| } else { |
| std::rotate(working.begin(), working.begin() + buffer_idx_, working.end()); |
| } |
| |
| return working; |
| } |
| |
| std::vector<DeadlockPath> DeadlockInfoBuffer::PrepareBuffer() { |
| std::lock_guard<std::mutex> lock(paths_buffer_mutex_); |
| |
| // Reversing the normalized vector returns the latest deadlocks first |
| auto working = Normalize(); |
| std::reverse(working.begin(), working.end()); |
| |
| return working; |
| } |
| |
| namespace { |
| void UnrefLockMapsCache(void* ptr) { |
| // Called when a thread exits or a ThreadLocalPtr gets destroyed. |
| auto lock_maps_cache = |
| static_cast<std::unordered_map<uint32_t, std::shared_ptr<LockMap>>*>(ptr); |
| delete lock_maps_cache; |
| } |
| } // anonymous namespace |
| |
| TransactionLockMgr::TransactionLockMgr( |
| TransactionDB* txn_db, size_t default_num_stripes, int64_t max_num_locks, |
| uint32_t max_num_deadlocks, |
| std::shared_ptr<TransactionDBMutexFactory> mutex_factory) |
| : txn_db_impl_(nullptr), |
| default_num_stripes_(default_num_stripes), |
| max_num_locks_(max_num_locks), |
| lock_maps_cache_(new ThreadLocalPtr(&UnrefLockMapsCache)), |
| dlock_buffer_(max_num_deadlocks), |
| mutex_factory_(mutex_factory) { |
| assert(txn_db); |
| txn_db_impl_ = |
| static_cast_with_check<PessimisticTransactionDB, TransactionDB>(txn_db); |
| } |
| |
| TransactionLockMgr::~TransactionLockMgr() {} |
| |
| size_t LockMap::GetStripe(const std::string& key) const { |
| assert(num_stripes_ > 0); |
| static murmur_hash hash; |
| size_t stripe = hash(key) % num_stripes_; |
| return stripe; |
| } |
| |
| void TransactionLockMgr::AddColumnFamily(uint32_t column_family_id) { |
| InstrumentedMutexLock l(&lock_map_mutex_); |
| |
| if (lock_maps_.find(column_family_id) == lock_maps_.end()) { |
| lock_maps_.emplace(column_family_id, |
| std::shared_ptr<LockMap>( |
| new LockMap(default_num_stripes_, mutex_factory_))); |
| } else { |
| // column_family already exists in lock map |
| assert(false); |
| } |
| } |
| |
| void TransactionLockMgr::RemoveColumnFamily(uint32_t column_family_id) { |
| // Remove lock_map for this column family. Since the lock map is stored |
| // as a shared ptr, concurrent transactions can still keep using it |
| // until they release their references to it. |
| { |
| InstrumentedMutexLock l(&lock_map_mutex_); |
| |
| auto lock_maps_iter = lock_maps_.find(column_family_id); |
| assert(lock_maps_iter != lock_maps_.end()); |
| |
| lock_maps_.erase(lock_maps_iter); |
| } // lock_map_mutex_ |
| |
| // Clear all thread-local caches |
| autovector<void*> local_caches; |
| lock_maps_cache_->Scrape(&local_caches, nullptr); |
| for (auto cache : local_caches) { |
| delete static_cast<LockMaps*>(cache); |
| } |
| } |
| |
| // Look up the LockMap shared_ptr for a given column_family_id. |
| // Note: The LockMap is only valid as long as the caller is still holding on |
| // to the returned shared_ptr. |
| std::shared_ptr<LockMap> TransactionLockMgr::GetLockMap( |
| uint32_t column_family_id) { |
| // First check thread-local cache |
| if (lock_maps_cache_->Get() == nullptr) { |
| lock_maps_cache_->Reset(new LockMaps()); |
| } |
| |
| auto lock_maps_cache = static_cast<LockMaps*>(lock_maps_cache_->Get()); |
| |
| auto lock_map_iter = lock_maps_cache->find(column_family_id); |
| if (lock_map_iter != lock_maps_cache->end()) { |
| // Found lock map for this column family. |
| return lock_map_iter->second; |
| } |
| |
| // Not found in local cache, grab mutex and check shared LockMaps |
| InstrumentedMutexLock l(&lock_map_mutex_); |
| |
| lock_map_iter = lock_maps_.find(column_family_id); |
| if (lock_map_iter == lock_maps_.end()) { |
| return std::shared_ptr<LockMap>(nullptr); |
| } else { |
| // Found lock map. Store in thread-local cache and return. |
| std::shared_ptr<LockMap>& lock_map = lock_map_iter->second; |
| lock_maps_cache->insert({column_family_id, lock_map}); |
| |
| return lock_map; |
| } |
| } |
| |
// Decides whether `lock_info` may be taken over by transaction `txn_id`.
//
// - If the lock has an expiration time that is still in the future, returns
//   false and sets *expire_time to that expiration time (microseconds, in
//   Env::NowMicros() units).
// - Otherwise (the lock has already expired, or has no expiration time at
//   all), calls TryStealingExpiredTransactionLocks() for every holder other
//   than `txn_id`. The first failed attempt makes the result false and stops
//   the loop; every successful attempt sets *expire_time to 0.
//
// Returns true only if the lock had expired and every steal attempt
// succeeded. A lock with expiration_time == 0 is never reported as expired,
// although steal attempts are still made for it. *expire_time may be left
// unmodified (e.g. when the first steal attempt fails).
bool TransactionLockMgr::IsLockExpired(TransactionID txn_id,
                                       const LockInfo& lock_info, Env* env,
                                       uint64_t* expire_time) {
  auto now = env->NowMicros();

  // expiration_time == 0 means "no expiration".
  bool expired =
      (lock_info.expiration_time > 0 && lock_info.expiration_time <= now);

  if (!expired && lock_info.expiration_time > 0) {
    // return how many microseconds until lock will be expired
    // (actually the absolute expiration timestamp, not a duration).
    *expire_time = lock_info.expiration_time;
  } else {
    for (auto id : lock_info.txn_ids) {
      // Never try to steal from ourselves (we may be one of several shared
      // holders).
      if (txn_id == id) {
        continue;
      }

      bool success = txn_db_impl_->TryStealingExpiredTransactionLocks(id);
      if (!success) {
        expired = false;
        break;
      }
      *expire_time = 0;
    }
  }

  return expired;
}
| |
| Status TransactionLockMgr::TryLock(PessimisticTransaction* txn, |
| uint32_t column_family_id, |
| const std::string& key, Env* env, |
| bool exclusive) { |
| // Lookup lock map for this column family id |
| std::shared_ptr<LockMap> lock_map_ptr = GetLockMap(column_family_id); |
| LockMap* lock_map = lock_map_ptr.get(); |
| if (lock_map == nullptr) { |
| char msg[255]; |
| snprintf(msg, sizeof(msg), "Column family id not found: %" PRIu32, |
| column_family_id); |
| |
| return Status::InvalidArgument(msg); |
| } |
| |
| // Need to lock the mutex for the stripe that this key hashes to |
| size_t stripe_num = lock_map->GetStripe(key); |
| assert(lock_map->lock_map_stripes_.size() > stripe_num); |
| LockMapStripe* stripe = lock_map->lock_map_stripes_.at(stripe_num); |
| |
| LockInfo lock_info(txn->GetID(), txn->GetExpirationTime(), exclusive); |
| int64_t timeout = txn->GetLockTimeout(); |
| |
| return AcquireWithTimeout(txn, lock_map, stripe, column_family_id, key, env, |
| timeout, lock_info); |
| } |
| |
| // Helper function for TryLock(). |
| Status TransactionLockMgr::AcquireWithTimeout( |
| PessimisticTransaction* txn, LockMap* lock_map, LockMapStripe* stripe, |
| uint32_t column_family_id, const std::string& key, Env* env, |
| int64_t timeout, const LockInfo& lock_info) { |
| Status result; |
| uint64_t start_time = 0; |
| uint64_t end_time = 0; |
| |
| if (timeout > 0) { |
| start_time = env->NowMicros(); |
| end_time = start_time + timeout; |
| } |
| |
| if (timeout < 0) { |
| // If timeout is negative, we wait indefinitely to acquire the lock |
| result = stripe->stripe_mutex->Lock(); |
| } else { |
| result = stripe->stripe_mutex->TryLockFor(timeout); |
| } |
| |
| if (!result.ok()) { |
| // failed to acquire mutex |
| return result; |
| } |
| |
| // Acquire lock if we are able to |
| uint64_t expire_time_hint = 0; |
| autovector<TransactionID> wait_ids; |
| result = AcquireLocked(lock_map, stripe, key, env, lock_info, |
| &expire_time_hint, &wait_ids); |
| |
| if (!result.ok() && timeout != 0) { |
| // If we weren't able to acquire the lock, we will keep retrying as long |
| // as the timeout allows. |
| bool timed_out = false; |
| do { |
| // Decide how long to wait |
| int64_t cv_end_time = -1; |
| |
| // Check if held lock's expiration time is sooner than our timeout |
| if (expire_time_hint > 0 && |
| (timeout < 0 || (timeout > 0 && expire_time_hint < end_time))) { |
| // expiration time is sooner than our timeout |
| cv_end_time = expire_time_hint; |
| } else if (timeout >= 0) { |
| cv_end_time = end_time; |
| } |
| |
| assert(result.IsBusy() || wait_ids.size() != 0); |
| |
| // We are dependent on a transaction to finish, so perform deadlock |
| // detection. |
| if (wait_ids.size() != 0) { |
| if (txn->IsDeadlockDetect()) { |
| if (IncrementWaiters(txn, wait_ids, key, column_family_id, |
| lock_info.exclusive)) { |
| result = Status::Busy(Status::SubCode::kDeadlock); |
| stripe->stripe_mutex->UnLock(); |
| return result; |
| } |
| } |
| txn->SetWaitingTxn(wait_ids, column_family_id, &key); |
| } |
| |
| TEST_SYNC_POINT("TransactionLockMgr::AcquireWithTimeout:WaitingTxn"); |
| if (cv_end_time < 0) { |
| // Wait indefinitely |
| result = stripe->stripe_cv->Wait(stripe->stripe_mutex); |
| } else { |
| uint64_t now = env->NowMicros(); |
| if (static_cast<uint64_t>(cv_end_time) > now) { |
| result = stripe->stripe_cv->WaitFor(stripe->stripe_mutex, |
| cv_end_time - now); |
| } |
| } |
| |
| if (wait_ids.size() != 0) { |
| txn->ClearWaitingTxn(); |
| if (txn->IsDeadlockDetect()) { |
| DecrementWaiters(txn, wait_ids); |
| } |
| } |
| |
| if (result.IsTimedOut()) { |
| timed_out = true; |
| // Even though we timed out, we will still make one more attempt to |
| // acquire lock below (it is possible the lock expired and we |
| // were never signaled). |
| } |
| |
| if (result.ok() || result.IsTimedOut()) { |
| result = AcquireLocked(lock_map, stripe, key, env, lock_info, |
| &expire_time_hint, &wait_ids); |
| } |
| } while (!result.ok() && !timed_out); |
| } |
| |
| stripe->stripe_mutex->UnLock(); |
| |
| return result; |
| } |
| |
| void TransactionLockMgr::DecrementWaiters( |
| const PessimisticTransaction* txn, |
| const autovector<TransactionID>& wait_ids) { |
| std::lock_guard<std::mutex> lock(wait_txn_map_mutex_); |
| DecrementWaitersImpl(txn, wait_ids); |
| } |
| |
| void TransactionLockMgr::DecrementWaitersImpl( |
| const PessimisticTransaction* txn, |
| const autovector<TransactionID>& wait_ids) { |
| auto id = txn->GetID(); |
| assert(wait_txn_map_.Contains(id)); |
| wait_txn_map_.Delete(id); |
| |
| for (auto wait_id : wait_ids) { |
| rev_wait_txn_map_.Get(wait_id)--; |
| if (rev_wait_txn_map_.Get(wait_id) == 0) { |
| rev_wait_txn_map_.Delete(wait_id); |
| } |
| } |
| } |
| |
// Registers `txn` as waiting on `wait_ids` (for `key` in column family `cf_id`,
// in the given mode) in the wait-for graph, then runs a breadth-first search
// from `wait_ids`, bounded by txn->GetDeadlockDetectDepth() queue slots,
// looking for a path back to `txn`.
//
// Returns true if a deadlock is assumed: either a cycle back to `txn` was
// found (the path is recorded in dlock_buffer_), or the search ran out of
// depth (DeadlockPath(true) is recorded, presumably a "limit exceeded"
// marker). In both cases txn's registration is rolled back via
// DecrementWaitersImpl(). Returns false when no cycle exists, leaving txn
// registered; the caller later removes it with DecrementWaiters().
bool TransactionLockMgr::IncrementWaiters(
    const PessimisticTransaction* txn,
    const autovector<TransactionID>& wait_ids, const std::string& key,
    const uint32_t& cf_id, const bool& exclusive) {
  auto id = txn->GetID();
  // Fixed-size BFS queue: queue_values holds the visited transaction ids and
  // queue_parents the queue index of the entry that led to each one (-1 for
  // txn's direct wait_ids), used to rebuild the cycle path.
  std::vector<int> queue_parents(txn->GetDeadlockDetectDepth());
  std::vector<TransactionID> queue_values(txn->GetDeadlockDetectDepth());
  std::lock_guard<std::mutex> lock(wait_txn_map_mutex_);
  assert(!wait_txn_map_.Contains(id));

  // Record txn's outgoing edges (txn -> each of wait_ids)...
  wait_txn_map_.Insert(id, {wait_ids, cf_id, key, exclusive});

  // ...and bump the count of waiters for each transaction being waited on.
  for (auto wait_id : wait_ids) {
    if (rev_wait_txn_map_.Contains(wait_id)) {
      rev_wait_txn_map_.Get(wait_id)++;
    } else {
      rev_wait_txn_map_.Insert(wait_id, 1);
    }
  }

  // No deadlock if nobody is waiting on self.
  if (!rev_wait_txn_map_.Contains(id)) {
    return false;
  }

  // BFS: `head` is the queue slot being visited, `tail` one past the last
  // filled slot. `next_ids` are the neighbors of the previously visited node
  // still to be enqueued (nullptr if it has none); `parent` is that node's
  // queue index.
  const auto* next_ids = &wait_ids;
  int parent = -1;
  for (int tail = 0, head = 0; head < txn->GetDeadlockDetectDepth(); head++) {
    int i = 0;
    if (next_ids) {
      // Enqueue as many neighbors as fit within the depth limit.
      for (; i < static_cast<int>(next_ids->size()) &&
             tail + i < txn->GetDeadlockDetectDepth();
           i++) {
        queue_values[tail + i] = (*next_ids)[i];
        queue_parents[tail + i] = parent;
      }
      tail += i;
    }

    // No more items in the list, meaning no deadlock.
    if (tail == head) {
      return false;
    }

    auto next = queue_values[head];
    if (next == id) {
      // Reached txn again: cycle. Follow parent links from txn's slot back to
      // a root (parent == -1), collecting each hop's waiting info, then
      // reverse so the path starts at one of txn's direct wait_ids and ends
      // at txn.
      std::vector<DeadlockInfo> path;
      while (head != -1) {
        assert(wait_txn_map_.Contains(queue_values[head]));

        auto extracted_info = wait_txn_map_.Get(queue_values[head]);
        path.push_back({queue_values[head], extracted_info.m_cf_id,
                        extracted_info.m_waiting_key,
                        extracted_info.m_exclusive});
        head = queue_parents[head];
      }
      std::reverse(path.begin(), path.end());
      dlock_buffer_.AddNewPath(DeadlockPath(path));
      DecrementWaitersImpl(txn, wait_ids);
      return true;
    } else if (!wait_txn_map_.Contains(next)) {
      // `next` is not waiting on anyone, so it has no outgoing edges.
      next_ids = nullptr;
      continue;
    } else {
      parent = head;
      next_ids = &(wait_txn_map_.Get(next).m_neighbors);
    }
  }

  // Wait cycle too big, just assume deadlock.
  dlock_buffer_.AddNewPath(DeadlockPath(true));
  DecrementWaitersImpl(txn, wait_ids);
  return true;
}
| |
| // Try to lock this key after we have acquired the mutex. |
| // Sets *expire_time to the expiration time in microseconds |
| // or 0 if no expiration. |
| // REQUIRED: Stripe mutex must be held. |
| Status TransactionLockMgr::AcquireLocked(LockMap* lock_map, |
| LockMapStripe* stripe, |
| const std::string& key, Env* env, |
| const LockInfo& txn_lock_info, |
| uint64_t* expire_time, |
| autovector<TransactionID>* txn_ids) { |
| assert(txn_lock_info.txn_ids.size() == 1); |
| |
| Status result; |
| // Check if this key is already locked |
| if (stripe->keys.find(key) != stripe->keys.end()) { |
| // Lock already held |
| LockInfo& lock_info = stripe->keys.at(key); |
| assert(lock_info.txn_ids.size() == 1 || !lock_info.exclusive); |
| |
| if (lock_info.exclusive || txn_lock_info.exclusive) { |
| if (lock_info.txn_ids.size() == 1 && |
| lock_info.txn_ids[0] == txn_lock_info.txn_ids[0]) { |
| // The list contains one txn and we're it, so just take it. |
| lock_info.exclusive = txn_lock_info.exclusive; |
| lock_info.expiration_time = txn_lock_info.expiration_time; |
| } else { |
| // Check if it's expired. Skips over txn_lock_info.txn_ids[0] in case |
| // it's there for a shared lock with multiple holders which was not |
| // caught in the first case. |
| if (IsLockExpired(txn_lock_info.txn_ids[0], lock_info, env, |
| expire_time)) { |
| // lock is expired, can steal it |
| lock_info.txn_ids = txn_lock_info.txn_ids; |
| lock_info.exclusive = txn_lock_info.exclusive; |
| lock_info.expiration_time = txn_lock_info.expiration_time; |
| // lock_cnt does not change |
| } else { |
| result = Status::TimedOut(Status::SubCode::kLockTimeout); |
| *txn_ids = lock_info.txn_ids; |
| } |
| } |
| } else { |
| // We are requesting shared access to a shared lock, so just grant it. |
| lock_info.txn_ids.push_back(txn_lock_info.txn_ids[0]); |
| // Using std::max means that expiration time never goes down even when |
| // a transaction is removed from the list. The correct solution would be |
| // to track expiry for every transaction, but this would also work for |
| // now. |
| lock_info.expiration_time = |
| std::max(lock_info.expiration_time, txn_lock_info.expiration_time); |
| } |
| } else { // Lock not held. |
| // Check lock limit |
| if (max_num_locks_ > 0 && |
| lock_map->lock_cnt.load(std::memory_order_acquire) >= max_num_locks_) { |
| result = Status::Busy(Status::SubCode::kLockLimit); |
| } else { |
| // acquire lock |
| stripe->keys.insert({key, txn_lock_info}); |
| |
| // Maintain lock count if there is a limit on the number of locks |
| if (max_num_locks_) { |
| lock_map->lock_cnt++; |
| } |
| } |
| } |
| |
| return result; |
| } |
| |
| void TransactionLockMgr::UnLockKey(const PessimisticTransaction* txn, |
| const std::string& key, |
| LockMapStripe* stripe, LockMap* lock_map, |
| Env* env) { |
| TransactionID txn_id = txn->GetID(); |
| |
| auto stripe_iter = stripe->keys.find(key); |
| if (stripe_iter != stripe->keys.end()) { |
| auto& txns = stripe_iter->second.txn_ids; |
| auto txn_it = std::find(txns.begin(), txns.end(), txn_id); |
| // Found the key we locked. unlock it. |
| if (txn_it != txns.end()) { |
| if (txns.size() == 1) { |
| stripe->keys.erase(stripe_iter); |
| } else { |
| auto last_it = txns.end() - 1; |
| if (txn_it != last_it) { |
| *txn_it = *last_it; |
| } |
| txns.pop_back(); |
| } |
| |
| if (max_num_locks_ > 0) { |
| // Maintain lock count if there is a limit on the number of locks. |
| assert(lock_map->lock_cnt.load(std::memory_order_relaxed) > 0); |
| lock_map->lock_cnt--; |
| } |
| } |
| } else { |
| // This key is either not locked or locked by someone else. This should |
| // only happen if the unlocking transaction has expired. |
| assert(txn->GetExpirationTime() > 0 && |
| txn->GetExpirationTime() < env->NowMicros()); |
| } |
| } |
| |
| void TransactionLockMgr::UnLock(PessimisticTransaction* txn, |
| uint32_t column_family_id, |
| const std::string& key, Env* env) { |
| std::shared_ptr<LockMap> lock_map_ptr = GetLockMap(column_family_id); |
| LockMap* lock_map = lock_map_ptr.get(); |
| if (lock_map == nullptr) { |
| // Column Family must have been dropped. |
| return; |
| } |
| |
| // Lock the mutex for the stripe that this key hashes to |
| size_t stripe_num = lock_map->GetStripe(key); |
| assert(lock_map->lock_map_stripes_.size() > stripe_num); |
| LockMapStripe* stripe = lock_map->lock_map_stripes_.at(stripe_num); |
| |
| stripe->stripe_mutex->Lock(); |
| UnLockKey(txn, key, stripe, lock_map, env); |
| stripe->stripe_mutex->UnLock(); |
| |
| // Signal waiting threads to retry locking |
| stripe->stripe_cv->NotifyAll(); |
| } |
| |
| void TransactionLockMgr::UnLock(const PessimisticTransaction* txn, |
| const TransactionKeyMap* key_map, Env* env) { |
| for (auto& key_map_iter : *key_map) { |
| uint32_t column_family_id = key_map_iter.first; |
| auto& keys = key_map_iter.second; |
| |
| std::shared_ptr<LockMap> lock_map_ptr = GetLockMap(column_family_id); |
| LockMap* lock_map = lock_map_ptr.get(); |
| |
| if (lock_map == nullptr) { |
| // Column Family must have been dropped. |
| return; |
| } |
| |
| // Bucket keys by lock_map_ stripe |
| std::unordered_map<size_t, std::vector<const std::string*>> keys_by_stripe( |
| std::max(keys.size(), lock_map->num_stripes_)); |
| |
| for (auto& key_iter : keys) { |
| const std::string& key = key_iter.first; |
| |
| size_t stripe_num = lock_map->GetStripe(key); |
| keys_by_stripe[stripe_num].push_back(&key); |
| } |
| |
| // For each stripe, grab the stripe mutex and unlock all keys in this stripe |
| for (auto& stripe_iter : keys_by_stripe) { |
| size_t stripe_num = stripe_iter.first; |
| auto& stripe_keys = stripe_iter.second; |
| |
| assert(lock_map->lock_map_stripes_.size() > stripe_num); |
| LockMapStripe* stripe = lock_map->lock_map_stripes_.at(stripe_num); |
| |
| stripe->stripe_mutex->Lock(); |
| |
| for (const std::string* key : stripe_keys) { |
| UnLockKey(txn, *key, stripe, lock_map, env); |
| } |
| |
| stripe->stripe_mutex->UnLock(); |
| |
| // Signal waiting threads to retry locking |
| stripe->stripe_cv->NotifyAll(); |
| } |
| } |
| } |
| |
| TransactionLockMgr::LockStatusData TransactionLockMgr::GetLockStatusData() { |
| LockStatusData data; |
| // Lock order here is important. The correct order is lock_map_mutex_, then |
| // for every column family ID in ascending order lock every stripe in |
| // ascending order. |
| InstrumentedMutexLock l(&lock_map_mutex_); |
| |
| std::vector<uint32_t> cf_ids; |
| for (const auto& map : lock_maps_) { |
| cf_ids.push_back(map.first); |
| } |
| std::sort(cf_ids.begin(), cf_ids.end()); |
| |
| for (auto i : cf_ids) { |
| const auto& stripes = lock_maps_[i]->lock_map_stripes_; |
| // Iterate and lock all stripes in ascending order. |
| for (const auto& j : stripes) { |
| j->stripe_mutex->Lock(); |
| for (const auto& it : j->keys) { |
| struct KeyLockInfo info; |
| info.exclusive = it.second.exclusive; |
| info.key = it.first; |
| for (const auto& id : it.second.txn_ids) { |
| info.ids.push_back(id); |
| } |
| data.insert({i, info}); |
| } |
| } |
| } |
| |
| // Unlock everything. Unlocking order is not important. |
| for (auto i : cf_ids) { |
| const auto& stripes = lock_maps_[i]->lock_map_stripes_; |
| for (const auto& j : stripes) { |
| j->stripe_mutex->UnLock(); |
| } |
| } |
| |
| return data; |
| } |
| std::vector<DeadlockPath> TransactionLockMgr::GetDeadlockInfoBuffer() { |
| return dlock_buffer_.PrepareBuffer(); |
| } |
| |
| void TransactionLockMgr::Resize(uint32_t target_size) { |
| dlock_buffer_.Resize(target_size); |
| } |
| |
| } // namespace rocksdb |
| #endif // ROCKSDB_LITE |