blob: a1222eb00aecfba46b82e612023a1d791578e114 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.ignite.internal.tx.impl;
import static java.util.Collections.emptyList;
import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import static org.apache.ignite.lang.ErrorGroups.Transactions.ACQUIRE_LOCK_ERR;
import static org.apache.ignite.lang.ErrorGroups.Transactions.ACQUIRE_LOCK_TIMEOUT_ERR;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import org.apache.ignite.internal.event.AbstractEventProducer;
import org.apache.ignite.internal.event.EventListener;
import org.apache.ignite.internal.lang.IgniteBiTuple;
import org.apache.ignite.internal.tostring.IgniteToStringExclude;
import org.apache.ignite.internal.tostring.S;
import org.apache.ignite.internal.tx.DeadlockPreventionPolicy;
import org.apache.ignite.internal.tx.Lock;
import org.apache.ignite.internal.tx.LockException;
import org.apache.ignite.internal.tx.LockKey;
import org.apache.ignite.internal.tx.LockManager;
import org.apache.ignite.internal.tx.LockMode;
import org.apache.ignite.internal.tx.Waiter;
import org.apache.ignite.internal.tx.event.LockEvent;
import org.apache.ignite.internal.tx.event.LockEventParameters;
import org.apache.ignite.internal.util.CollectionUtils;
import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.TestOnly;
/**
 * A {@link LockManager} implementation which stores lock queues in the heap.
 *
 * <p>Lock waiters are placed in the queue, ordered according to the comparator provided by
 * {@link HeapLockManager#deadlockPreventionPolicy}. When a new waiter is placed in the queue, it is validated against the current
 * lock owner: if there is an owner with a higher priority (as defined by the comparator), the lock request is denied.
 *
 * <p>Read lock can be upgraded to write lock (only available for the lowest read-locked entry of the queue).
 *
 * <p>Additionally limits the lock map size.
 */
public class HeapLockManager extends AbstractEventProducer<LockEvent, LockEventParameters> implements LockManager {
* Table size. TODO make it configurable IGNITE-20694
public static final int SLOTS = 131072;
* Empty slots.
private final ConcurrentLinkedQueue<LockState> empty = new ConcurrentLinkedQueue<>();
* Mapped slots.
private final ConcurrentHashMap<LockKey, LockState> locks;
* Raw slots.
private final LockState[] slots;
* The policy.
private final DeadlockPreventionPolicy deadlockPreventionPolicy;
* Executor that is used to fail waiters after timeout.
private final Executor delayedExecutor;
* Enlisted transactions.
private final ConcurrentHashMap<UUID, ConcurrentLinkedQueue<LockState>> txMap = new ConcurrentHashMap<>(1024);
* Parent lock manager.
* TODO asch Needs optimization
private final LockManager parentLockManager;
private final EventListener<LockEventParameters> parentLockConflictListener = this::parentLockConflictListener;
/**
 * Creates a lock manager with a new {@link WaitDieDeadlockPreventionPolicy}, {@link #SLOTS} raw slots, {@link #SLOTS} mapped
 * slots and a new {@link HeapUnboundedLockManager} as the parent.
 */
public HeapLockManager() {
    this(new WaitDieDeadlockPreventionPolicy(), SLOTS, SLOTS, new HeapUnboundedLockManager());
}
/**
 * Creates a lock manager with the given policy, {@link #SLOTS} raw slots, {@link #SLOTS} mapped slots and a new
 * {@link HeapUnboundedLockManager} as the parent.
 *
 * @param deadlockPreventionPolicy Deadlock prevention policy.
 */
public HeapLockManager(DeadlockPreventionPolicy deadlockPreventionPolicy) {
    this(deadlockPreventionPolicy, SLOTS, SLOTS, new HeapUnboundedLockManager());
}
/**
 * Constructor.
 *
 * @param deadlockPreventionPolicy Deadlock prevention policy.
 * @param maxSize Raw slots size.
 * @param mapSize Lock map size; must not exceed {@code maxSize}.
 * @param parentLockManager Parent lock manager that serves keys without a context id.
 * @throws IllegalArgumentException If {@code mapSize} is greater than {@code maxSize}.
 * @throws NullPointerException If {@code parentLockManager} is {@code null}.
 */
public HeapLockManager(DeadlockPreventionPolicy deadlockPreventionPolicy, int maxSize, int mapSize, LockManager parentLockManager) {
    if (mapSize > maxSize) {
        throw new IllegalArgumentException("maxSize=" + maxSize + " < mapSize=" + mapSize);
    }

    this.parentLockManager = Objects.requireNonNull(parentLockManager);
    // Must be assigned before any LockState is created: the LockState constructor reads the policy's comparator.
    this.deadlockPreventionPolicy = deadlockPreventionPolicy;
    // A timeout executor is only needed when the policy defines a positive wait timeout.
    this.delayedExecutor = deadlockPreventionPolicy.waitTimeout() > 0
            ? CompletableFuture.delayedExecutor(deadlockPreventionPolicy.waitTimeout(), TimeUnit.MILLISECONDS)
            : null;

    locks = new ConcurrentHashMap<>(mapSize);

    LockState[] tmp = new LockState[maxSize];

    for (int i = 0; i < tmp.length; i++) {
        LockState lockState = new LockState();

        // The first mapSize states seed the pool that lockState() polls when it maps a new key.
        if (i < mapSize) {
            empty.add(lockState);
        }

        // Every raw slot must hold a state: lockState() falls back to slots[index] and isEmpty() visits all slots.
        tmp[i] = lockState;
    }

    slots = tmp; // Atomic init.

    parentLockManager.listen(LockEvent.LOCK_CONFLICT, parentLockConflictListener);
}
public CompletableFuture<Lock> acquire(UUID txId, LockKey lockKey, LockMode lockMode) {
if (lockKey.contextId() == null) { // Treat this lock as a hierarchy lock.
return parentLockManager.acquire(txId, lockKey, lockMode);
while (true) {
LockState state = lockState(lockKey);
IgniteBiTuple<CompletableFuture<Void>, LockMode> futureTuple = state.tryAcquire(txId, lockMode);
if (futureTuple.get1() == null) {
continue; // State is marked for remove, need retry.
LockMode newLockMode = futureTuple.get2();
return futureTuple.get1().thenApply(res -> new Lock(lockKey, newLockMode, txId));
public void release(Lock lock) {
LockState state = lockState(lock.lockKey());
if (state.tryRelease(lock.txId())) {
locks.compute(lock.lockKey(), (k, v) -> adjustLockState(state, v));
public void release(UUID txId, LockKey lockKey, LockMode lockMode) {
// TODO: Delegation to parentLockManager might change after
if (lockKey.contextId() == null) { // Treat this lock as a hierarchy lock.
parentLockManager.release(txId, lockKey, lockMode);
LockState state = lockState(lockKey);
if (state.tryRelease(txId, lockMode)) {
locks.compute(lockKey, (k, v) -> adjustLockState(state, v));
public void releaseAll(UUID txId) {
ConcurrentLinkedQueue<LockState> states = this.txMap.remove(txId);
if (states != null) {
for (LockState state : states) {
if (state.tryRelease(txId)) {
LockKey key = state.key; // State may be already invalidated.
if (key != null) {
locks.compute(key, (k, v) -> adjustLockState(state, v));
public Iterator<Lock> locks(UUID txId) {
ConcurrentLinkedQueue<LockState> lockStates = txMap.get(txId);
// TODO: Delegation to parentLockManager might change after
if (lockStates == null) {
return parentLockManager.locks(txId);
List<Lock> result = new ArrayList<>();
for (LockState lockState : lockStates) {
Waiter waiter = lockState.waiter(txId);
if (waiter != null) {
result.add(new Lock(lockState.key, waiter.lockMode(), txId));
return CollectionUtils.concat(result.iterator(), parentLockManager.locks(txId));
* Returns the lock state for the key.
* @param key The key.
private LockState lockState(LockKey key) {
int h = spread(key.hashCode());
int index = h & (slots.length - 1);
LockState[] res = new LockState[1];
locks.compute(key, (k, v) -> {
if (v == null) {
v = empty.poll();
if (v == null) {
res[0] = slots[index];
assert !res[0].markedForRemove;
} else {
v.markedForRemove = false;
v.key = k;
res[0] = v;
} else {
res[0] = v;
return v;
return res[0];
/** {@inheritDoc} */
public Collection<UUID> queue(LockKey key) {
return lockState(key).queue();
/** {@inheritDoc} */
public Waiter waiter(LockKey key, UUID txId) {
return lockState(key).waiter(txId);
/** {@inheritDoc} */
public boolean isEmpty() {
for (LockState slot : slots) {
if (slot.waitersCount() != 0) {
return false;
return parentLockManager.isEmpty();
private CompletableFuture<Boolean> parentLockConflictListener(LockEventParameters params) {
return fireEvent(LockEvent.LOCK_CONFLICT, params).thenApply(v -> false);
private LockState adjustLockState(LockState state, LockState v) {
// Mapping may already change.
if (v != state) {
return v;
synchronized (v.waiters) {
if (v.waiters.isEmpty()) {
v.markedForRemove = true;
v.key = null;
return null;
} else {
return v;
* A lock state.
public class LockState {
/** Waiters. */
private final TreeMap<UUID, WaiterImpl> waiters;
/** Marked for removal flag. */
private volatile boolean markedForRemove = false;
/** Lock key. */
private volatile LockKey key;
LockState() {
Comparator<UUID> txComparator =
deadlockPreventionPolicy.txIdComparator() != null ? deadlockPreventionPolicy.txIdComparator() : UUID::compareTo;
this.waiters = new TreeMap<>(txComparator);
// Registers a new waiter for the transaction on this state and reports the future to wait on together with a lock mode.
// NOTE(review): this view of the method looks truncated: closing braces are absent and several branches below (the waitTimeout
// check, both `prev == null` checks, the `!waiter.locked()` check) contain no statement. Confirm against the complete source.
* Attempts to acquire a lock for the specified {@code key} in specified lock mode.
* @param txId Transaction id.
* @param lockMode Lock mode.
* @return The future or null if state is marked for removal and acquired lock mode.
@Nullable IgniteBiTuple<CompletableFuture<Void>, LockMode> tryAcquire(UUID txId, LockMode lockMode) {
WaiterImpl waiter = new WaiterImpl(txId, lockMode);
synchronized (waiters) {
// A state marked for removal must not accept waiters: the tuple carries a null future, which the caller treats as "retry".
if (markedForRemove) {
// NOTE(review): raw IgniteBiTuple here and in the reenter branch; the later returns use the diamond form.
return new IgniteBiTuple(null, lockMode);
// We always replace the previous waiter with the new one. If the previous waiter has lock intention then incomplete
// lock future is copied to the new waiter. This guarantees that, if the previous waiter was locked concurrently, then
// it doesn't have any lock intentions, and the future is not copied to the new waiter. Otherwise, if there is lock
// intention, this means that the lock future contained in previous waiter, is not going to be completed and can be
// copied safely.
WaiterImpl prev = waiters.put(txId, waiter);
// Reenter
if (prev != null) {
// The transaction already holds a mode that allows reentering with the requested one: report the held mode at once.
if (prev.locked() && prev.lockMode().allowReenter(lockMode)) {
return new IgniteBiTuple(nullCompletedFuture(), prev.lockMode());
} else {
assert prev.lockMode() == waiter.lockMode() :
"Lock modes are incorrect [prev=" + prev.lockMode() + ", new=" + waiter.lockMode() + ']';
// Not ready to notify: the waiter's own future is returned for the caller to wait on.
if (!isWaiterReadyToNotify(waiter, false)) {
if (deadlockPreventionPolicy.waitTimeout() > 0) {
// Put to wait queue, track.
if (prev == null) {
return new IgniteBiTuple<>(waiter.fut, waiter.lockMode());
if (!waiter.locked()) {
} else if (waiter.hasLockIntent()) {
waiter.refuseIntent(); // Restore old lock.
} else {
// Lock granted, track.
if (prev == null) {
// Notify outside the monitor.
return new IgniteBiTuple<>(waiter.fut, waiter.lockMode());
* Returns waiters count.
* @return waiters count.
public int waitersCount() {
synchronized (waiters) {
return waiters.size();
// Decides whether the waiter can be notified, by checking it against the other waiters that hold an incompatible mode.
// NOTE(review): this view of the method is damaged: closing braces are absent and four lines end with the fragment
// `{, tmp));`, which looks like the tail of a removed statement. Confirm against the complete source.
* Checks current waiter. It can change the internal state of the waiter.
* @param waiter Checked waiter.
* @return True if current waiter ready to notify, false otherwise.
private boolean isWaiterReadyToNotify(WaiterImpl waiter, boolean skipFail) {
// First pass: waiters ordered strictly after this transaction in the queue (tailMap, exclusive).
for (Map.Entry<UUID, WaiterImpl> entry : waiters.tailMap(waiter.txId(), false).entrySet()) {
WaiterImpl tmp = entry.getValue();
// Mode held by the other waiter, or null if it holds nothing.
LockMode mode = lockedMode(tmp);
if (mode != null && !mode.isCompatible(waiter.intendedLockMode())) {
if (conflictFound(waiter.txId(), tmp.txId())) {, tmp));
return true;
} else if (!deadlockPreventionPolicy.usePriority() && deadlockPreventionPolicy.waitTimeout() == 0) {, tmp));
return true;
return false;
// Second pass: waiters ordered strictly before this transaction in the queue (headMap).
for (Map.Entry<UUID, WaiterImpl> entry : waiters.headMap(waiter.txId()).entrySet()) {
WaiterImpl tmp = entry.getValue();
LockMode mode = lockedMode(tmp);
if (mode != null && !mode.isCompatible(waiter.intendedLockMode())) {
// With skipFail set, an incompatible holder just means "not ready".
if (skipFail) {
return false;
} else if (conflictFound(waiter.txId(), tmp.txId())) {, tmp));
return true;
} else if (deadlockPreventionPolicy.waitTimeout() == 0) {, tmp));
return true;
} else {
return false;
return true;
* Create lock exception with given parameters.
* @param locker Locker.
* @param holder Lock holder.
* @return Lock exception.
private LockException lockException(WaiterImpl locker, WaiterImpl holder) {
return new LockException(ACQUIRE_LOCK_ERR,
"Failed to acquire a lock due to a possible deadlock [locker=" + locker + ", holder=" + holder + ']');
* Create lock exception when lock holder is believed to be missing.
* @param locker Locker.
* @param holder Lock holder.
* @return Lock exception.
private LockException abandonedLockException(WaiterImpl locker, WaiterImpl holder) {
return new LockException(ACQUIRE_LOCK_ERR,
"Failed to acquire an abandoned lock due to a possible deadlock [locker=" + locker + ", holder=" + holder + ']');
* Attempts to release a lock for the specified {@code key} in exclusive mode.
* @param txId Transaction id.
* @return {@code True} if the queue is empty.
boolean tryRelease(UUID txId) {
Collection<WaiterImpl> toNotify;
synchronized (waiters) {
toNotify = release(txId);
// Notify outside the monitor.
for (WaiterImpl waiter : toNotify) {
return key != null && waitersCount() == 0;
* Releases a specific lock of the key, if a key is locked in multiple modes by the same locker.
* @param txId Transaction id.
* @param lockMode Lock mode.
* @return If the value is true, no one waits of any lock of the key, false otherwise.
boolean tryRelease(UUID txId, LockMode lockMode) {
List<WaiterImpl> toNotify = emptyList();
synchronized (waiters) {
WaiterImpl waiter = waiters.get(txId);
if (waiter != null) {
assert LockMode.supremum(lockMode, waiter.lockMode()) == waiter.lockMode() :
"The lock is not locked in specified mode [mode=" + lockMode + ", locked=" + waiter.lockMode() + ']';
LockMode modeFromDowngrade = waiter.recalculateMode(lockMode);
if (!waiter.locked() && !waiter.hasLockIntent()) {
toNotify = release(txId);
} else if (modeFromDowngrade != waiter.lockMode()) {
toNotify = unlockCompatibleWaiters();
// Notify outside the monitor.
for (WaiterImpl waiter : toNotify) {
return key != null && waitersCount() == 0;
* Releases all locks are held by a specific transaction. This method should be invoked synchronously.
* @param txId Transaction id.
* @return List of waiters to notify.
private List<WaiterImpl> release(UUID txId) {
if (waiters.isEmpty()) {
return emptyList();
return unlockCompatibleWaiters();
// Re-checks the waiters that still have a lock intent and collects those to be notified.
// NOTE(review): this view of the method looks truncated: closing braces are absent and the statements that should follow both
// asserts and fill both branches of the `w.locked()` check are not visible (nothing here adds to toNotify or toFail).
// Confirm against the complete source.
* Unlock compatible waiters.
* @return List of waiters to notify.
private List<WaiterImpl> unlockCompatibleWaiters() {
// Without priorities and with a zero wait timeout there is nobody to notify.
if (!deadlockPreventionPolicy.usePriority() && deadlockPreventionPolicy.waitTimeout() == 0) {
return emptyList();
ArrayList<WaiterImpl> toNotify = new ArrayList<>();
Set<UUID> toFail = new HashSet<>();
// First pass: isWaiterReadyToNotify is called with skipFail = true.
for (Map.Entry<UUID, WaiterImpl> entry : waiters.entrySet()) {
WaiterImpl tmp = entry.getValue();
if (tmp.hasLockIntent() && isWaiterReadyToNotify(tmp, true)) {
assert !tmp.hasLockIntent() : "This waiter in not locked for notification [waiter=" + tmp + ']';
// Second pass, only with priorities and a non-negative timeout: skipFail = false.
if (deadlockPreventionPolicy.usePriority() && deadlockPreventionPolicy.waitTimeout() >= 0) {
for (Map.Entry<UUID, WaiterImpl> entry : waiters.entrySet()) {
WaiterImpl tmp = entry.getValue();
if (tmp.hasLockIntent() && isWaiterReadyToNotify(tmp, false)) {
assert tmp.hasLockIntent() : "Only failed waiter can be notified here [waiter=" + tmp + ']';
for (UUID failTx : toFail) {
var w = waiters.get(failTx);
if (w.locked()) {
} else {
return toNotify;
* Makes the waiter fail after specified timeout (in milliseconds), if intended lock was not acquired within this timeout.
* @param waiter Waiter.
private void setWaiterTimeout(WaiterImpl waiter) {
delayedExecutor.execute(() -> {
if (!waiter.fut.isDone()) {
waiter.fut.completeExceptionally(new LockException(ACQUIRE_LOCK_TIMEOUT_ERR, "Failed to acquire a lock due to "
+ "timeout [txId=" + waiter.txId() + ", waiter=" + waiter
+ ", timeout=" + deadlockPreventionPolicy.waitTimeout() + ']'));
* Gets a lock mode for this waiter.
* @param waiter Waiter.
* @return Lock mode, which is held by the waiter or {@code null}, if the waiter holds nothing.
private LockMode lockedMode(WaiterImpl waiter) {
LockMode mode = null;
if (waiter.locked()) {
mode = waiter.lockMode();
return mode;
* Returns a collection of timestamps that is associated with the specified {@code key}.
* @return The waiters queue.
public Collection<UUID> queue() {
synchronized (waiters) {
return new ArrayList<>(waiters.keySet());
* Returns a waiter for the specified {@code key}.
* @param txId Transaction id.
* @return The waiter.
public Waiter waiter(UUID txId) {
synchronized (waiters) {
return waiters.get(txId);
private void track(UUID txId) {
txMap.compute(txId, (k, v) -> {
if (v == null) {
v = new ConcurrentLinkedQueue<>();
return v;
* Notifies about the lock conflict found between transactions.
* @param acquirerTx Transaction which tries to acquire the lock.
* @param holderTx Transaction which holds the lock.
private boolean conflictFound(UUID acquirerTx, UUID holderTx) {
CompletableFuture<Void> eventResult = fireEvent(LockEvent.LOCK_CONFLICT, new LockEventParameters(acquirerTx, holderTx));
// No async handling is expected.
// TODO:
assert eventResult.isDone() : "Async lock conflict handling is not supported";
return eventResult.isCompletedExceptionally();
// One transaction's entry in a lock state's queue: the modes it holds or intends to hold, and the future of its request.
// NOTE(review): this view of the class looks truncated: closing braces and comment delimiters are absent, and several method
// bodies below (removeLock, upgrade, refuseIntent, notifyLocked) have branches or loops without statements. Confirm against
// the complete source.
* A waiter implementation.
private static class WaiterImpl implements Comparable<WaiterImpl>, Waiter {
// Per-mode counters: addLock sums increments into them, removeLock decrements them.
* Holding locks by type.
private final Map<LockMode, Integer> locks = new EnumMap<>(LockMode.class);
* Lock modes are marked as intended, but have not taken yet. This is NOT specific to intention lock modes, such as IS and IX.
private final Set<LockMode> intendedLocks = EnumSet.noneOf(LockMode.class);
/** Locked future. */
private CompletableFuture<Void> fut;
/** Waiter transaction id. */
private final UUID txId;
/** The lock mode to intend to hold. This is NOT specific to intention lock modes, such as IS and IX. */
private LockMode intendedLockMode;
/** The lock mode. */
private LockMode lockMode;
* The filed has a value when the waiter couldn't lock a key.
private LockException ex;
// A new waiter starts with the requested mode as its intent and a counter of 1 for it; lockMode stays null, so locked()
// is false until lock() is called.
* The constructor.
* @param txId Transaction id.
* @param lockMode Lock mode.
WaiterImpl(UUID txId, LockMode lockMode) {
this.fut = new CompletableFuture<>();
this.txId = txId;
this.intendedLockMode = lockMode;
locks.put(lockMode, 1);
* Adds a lock mode.
* @param lockMode Lock mode.
* @param increment Value to increment amount.
void addLock(LockMode lockMode, int increment) {
locks.merge(lockMode, increment, Integer::sum);
* Removes a lock mode.
* @param lockMode Lock mode.
* @return True if the lock is not locked in the passed mode, false otherwise.
private boolean removeLock(LockMode lockMode) {
Integer counter = locks.get(lockMode);
// A missing counter or a counter of 1 means this was the last hold in that mode.
if (counter == null || counter < 2) {
return true;
} else {
locks.put(lockMode, counter - 1);
return false;
* Recalculates lock mode based of all locks which the waiter has taken.
* @param modeToRemove Mode without which, the recalculation will happen.
* @return Previous lock mode.
LockMode recalculateMode(LockMode modeToRemove) {
// The mode is still held at least once more: the effective mode does not change.
if (!removeLock(modeToRemove)) {
return lockMode;
return recalculate();
* Recalculates lock supremums.
* @return Previous lock mode.
private LockMode recalculate() {
LockMode newIntendedLockMode = null;
LockMode newLockMode = null;
// Fold the registered modes into two supremums: one over the intended modes, one over the remaining modes.
for (LockMode mode : locks.keySet()) {
assert locks.get(mode) > 0 : "Incorrect lock counter [txId=" + txId + ", mode=" + mode + "]";
if (intendedLocks.contains(mode)) {
newIntendedLockMode = newIntendedLockMode == null ? mode : LockMode.supremum(newIntendedLockMode, mode);
} else {
newLockMode = newLockMode == null ? mode : LockMode.supremum(newLockMode, mode);
LockMode mode = lockMode;
lockMode = newLockMode;
// When both exist, the intent is the supremum of the held and intended modes; otherwise it is the intended supremum as is.
intendedLockMode = newLockMode != null && newIntendedLockMode != null ? LockMode.supremum(newLockMode, newIntendedLockMode)
: newIntendedLockMode;
return mode;
* Merge all locks that were held by another waiter to the current one.
* @param other Other waiter.
void upgrade(WaiterImpl other) {
other.locks.entrySet().forEach(entry -> addLock(entry.getKey(), entry.getValue()));
// The other waiter's request is still pending: adopt its future.
if (other.hasLockIntent()) {
fut = other.fut;
* Removes all locks that were intended to hold.
void refuseIntent() {
for (LockMode mode : intendedLocks) {
intendedLockMode = null;
/** {@inheritDoc} */
public int compareTo(WaiterImpl o) {
return txId.compareTo(o.txId);
/** Notifies a future listeners. */
private void notifyLocked() {
if (ex != null) {
} else {
assert lockMode != null;
/** {@inheritDoc} */
public boolean locked() {
return this.lockMode != null;
* Checks is the waiter has any intended to lock a key.
* @return True if the waiter has an intended lock, false otherwise.
public boolean hasLockIntent() {
return this.intendedLockMode != null;
/** {@inheritDoc} */
public LockMode lockMode() {
return lockMode;
/** {@inheritDoc} */
public LockMode intendedLockMode() {
return intendedLockMode;
/** Grant a lock. */
private void lock() {
// The intended mode becomes the held mode and the intent is cleared.
lockMode = intendedLockMode;
intendedLockMode = null;
* Fails the lock waiter.
* @param e Lock exception.
private void fail(LockException e) {
ex = e;
/** {@inheritDoc} */
public UUID txId() {
return txId;
/** {@inheritDoc} */
public boolean equals(Object o) {
if (!(o instanceof WaiterImpl)) {
return false;
// Equality follows compareTo, i.e. it is decided by the transaction id alone (as is hashCode).
return compareTo((WaiterImpl) o) == 0;
/** {@inheritDoc} */
public int hashCode() {
return txId.hashCode();
/** {@inheritDoc} */
public String toString() {
// "granted" reports whether the future is done.
return S.toString(WaiterImpl.class, this, "granted", fut.isDone());
private static int spread(int h) {
return (h ^ (h >>> 16)) & 0x7fffffff;
public LockState[] getSlots() {
return slots;
public int available() {
return empty.size();