blob: da1ccc2772ef24ee5df70a3d527e4016b9198a76 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.events.Event;
import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.NodeStoppingException;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.distributed.GridCacheMappedVersion;
import org.apache.ignite.internal.processors.cache.distributed.GridDistributedCacheEntry;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishFuture;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashSet;
import org.apache.ignite.internal.util.GridConcurrentFactory;
import org.apache.ignite.internal.util.GridConcurrentHashSet;
import org.apache.ignite.internal.util.future.GridCompoundFuture;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.lang.GridPlainRunnable;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.P1;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.util.deque.FastSizeDeque;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_MAX_NESTED_LISTENER_CALLS;
import static org.apache.ignite.IgniteSystemProperties.getInteger;
import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
import static org.apache.ignite.internal.util.GridConcurrentFactory.newMap;
import static org.jsr166.ConcurrentLinkedHashMap.QueuePolicy.PER_SEGMENT_Q;
/**
* Manages lock order within a thread.
*/
public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
/** Maximum number of removed locks; used to size {@link #rmvLocks}. */
private static final int MAX_REMOVED_LOCKS = 10240;
/** Maximum number of atomic IDs reserved for a thread at a time. Must be power of two! */
protected static final int THREAD_RESERVE_SIZE = 0x4000;
/** @see IgniteSystemProperties#IGNITE_MAX_NESTED_LISTENER_CALLS */
public static final int DFLT_MAX_NESTED_LISTENER_CALLS = 5;
/**
* Limit of nested owner-changed listener calls, read from the
* {@link IgniteSystemProperties#IGNITE_MAX_NESTED_LISTENER_CALLS} property with
* {@link #DFLT_MAX_NESTED_LISTENER_CALLS} as the default.
*/
private static final int MAX_NESTED_LSNR_CALLS =
getInteger(IGNITE_MAX_NESTED_LISTENER_CALLS, DFLT_MAX_NESTED_LISTENER_CALLS);
/** Pending locks per thread. The deque is created lazily when the first candidate is linked. */
private final ThreadLocal<Deque<GridCacheMvccCandidate>> pending = new ThreadLocal<>();
/** Pending near local locks and topology version per thread, keyed by thread ID. Assigned on start. */
private ConcurrentMap<Long, GridCacheExplicitLockSpan> pendingExplicit;
/** Set of removed lock versions. */
private GridBoundedConcurrentLinkedHashSet<GridCacheVersion> rmvLocks =
new GridBoundedConcurrentLinkedHashSet<>(MAX_REMOVED_LOCKS, MAX_REMOVED_LOCKS, 0.75f, 16, PER_SEGMENT_Q);
/** Locked keys. */
@GridToStringExclude
private final ConcurrentMap<IgniteTxKey, GridDistributedCacheEntry> locked = newMap();
/** Near locked keys. Need separate map because mvcc manager is shared between caches. */
@GridToStringExclude
private final ConcurrentMap<IgniteTxKey, GridDistributedCacheEntry> nearLocked = newMap();
/** Active futures mapped by version ID. Each per-version collection is accessed under its own monitor. */
@GridToStringExclude
private final ConcurrentMap<GridCacheVersion, Collection<GridCacheVersionedFuture<?>>> verFuts = newMap();
/** Pending atomic futures mapped by future ID. */
private final ConcurrentHashMap<Long, GridCacheAtomicFuture<?>> atomicFuts = new ConcurrentHashMap<>();
/** Pending data streamer futures. */
private final GridConcurrentHashSet<DataStreamerFuture> dataStreamerFuts = new GridConcurrentHashSet<>();
/** Active (non-versioned) futures mapped by future ID. */
private final ConcurrentMap<IgniteUuid, GridCacheFuture<?>> futs = new ConcurrentHashMap<>();
/** Near to DHT version mapping. */
private final ConcurrentMap<GridCacheVersion, GridCacheVersion> near2dht = newMap();
/** Finish futures. */
private final FastSizeDeque<FinishLockFuture> finishFuts = new FastSizeDeque<>(new ConcurrentLinkedDeque<>());
/** Nested listener calls: current nesting depth of owner-changed callbacks on the calling thread. */
private final ThreadLocal<Integer> nestedLsnrCalls = new ThreadLocal<Integer>() {
@Override protected Integer initialValue() {
return 0;
}
};
/** Logger used for exchange-related messages. */
@SuppressWarnings( {"FieldAccessedSynchronizedAndUnsynchronized"})
private IgniteLogger exchLog;
/** Stopping flag; once set, newly added futures are completed with a node-stopping error. */
private volatile boolean stopping;
/** Global atomic id counter. */
protected final AtomicLong globalAtomicCnt = new AtomicLong();
/**
* Per thread atomic id counter. Each thread starts from a range of {@link #THREAD_RESERVE_SIZE}
* IDs reserved from {@link #globalAtomicCnt}.
*/
private final ThreadLocal<LongWrapper> threadAtomicCnt = new ThreadLocal<LongWrapper>() {
@Override protected LongWrapper initialValue() {
return new LongWrapper(globalAtomicCnt.getAndAdd(THREAD_RESERVE_SIZE));
}
};
/**
 * Lock callback. Forwards owner changes to {@link #notifyOwnerChanged} and keeps the maps of
 * locked and near-locked entries up to date.
 */
@GridToStringExclude
private final GridCacheLockCallback cb = new GridCacheLockCallback() {
    /** {@inheritDoc} */
    @Override public void onOwnerChanged(final GridCacheEntryEx entry, final GridCacheMvccCandidate owner) {
        int depth = nestedLsnrCalls.get();

        // Nesting limit reached: hand the notification over to the closure processor
        // instead of going one level deeper on this thread.
        if (depth >= MAX_NESTED_LSNR_CALLS) {
            cctx.kernalContext().closure().runLocalSafe(new GridPlainRunnable() {
                @Override public void run() {
                    notifyOwnerChanged(entry, owner);
                }
            }, true);

            return;
        }

        nestedLsnrCalls.set(depth + 1);

        try {
            notifyOwnerChanged(entry, owner);
        }
        finally {
            // Restore the depth observed on entry.
            nestedLsnrCalls.set(depth);
        }
    }

    /** {@inheritDoc} */
    @Override public void onLocked(GridDistributedCacheEntry entry) {
        ConcurrentMap<IgniteTxKey, GridDistributedCacheEntry> target = entry.isNear() ? nearLocked : locked;

        target.put(entry.txKey(), entry);
    }

    /** {@inheritDoc} */
    @Override public void onFreed(GridDistributedCacheEntry entry) {
        ConcurrentMap<IgniteTxKey, GridDistributedCacheEntry> target = entry.isNear() ? nearLocked : locked;

        target.remove(entry.txKey());
    }
};
/**
 * Delivers an owner-changed notification for the given entry.
 * <p>
 * When the owner is local or near-local, the versioned futures registered for the owner's lock
 * version are notified first, and processing stops as soon as one of the not-yet-done futures
 * returns {@code true} from its {@code onOwnerChanged} call. Otherwise the notification is passed
 * to the transaction manager, after which every pending finish-lock future rechecks the entry.
 *
 * @param entry Entry to notify callback for.
 * @param owner Current lock owner, possibly {@code null}.
 */
private void notifyOwnerChanged(final GridCacheEntryEx entry, final GridCacheMvccCandidate owner) {
    assert entry != null;

    if (log.isDebugEnabled())
        log.debug("Received owner changed callback [key=" + entry.key() + ", owner=" + owner + ']');

    if (owner != null && (owner.local() || owner.nearLocal())) {
        Collection<GridCacheVersionedFuture<?>> futCol = verFuts.get(owner.version());

        if (futCol != null) {
            ArrayList<GridCacheVersionedFuture<?>> futColCp;

            // Snapshot under the collection monitor; the futures themselves are notified outside of it.
            synchronized (futCol) {
                futColCp = new ArrayList<>(futCol);
            }

            // Must invoke onOwnerChanged outside of synchronization block.
            for (GridCacheVersionedFuture<?> fut : futColCp) {
                if (!fut.isDone()) {
                    final GridCacheVersionedFuture<Boolean> mvccFut = (GridCacheVersionedFuture<Boolean>)fut;

                    // Since this method is called outside of entry synchronization,
                    // we can safely invoke any method on the future.
                    // Also note that we don't remove future here if it is done.
                    // The removal is initiated from within future itself.
                    if (mvccFut.onOwnerChanged(entry, owner))
                        return;
                }
            }
        }
    }

    if (log.isDebugEnabled())
        log.debug("Lock future not found for owner change callback (will try transaction futures) [owner=" +
            owner + ", entry=" + entry + ']');

    // If no future was found, delegate to transaction manager.
    if (cctx.tm().onOwnerChanged(entry, owner)) {
        if (log.isDebugEnabled())
            log.debug("Found transaction for changed owner: " + owner);
    }
    else if (log.isDebugEnabled())
        log.debug("Failed to find transaction for changed owner: " + owner);

    // Let pending finish-lock futures re-evaluate this entry.
    if (!finishFuts.isEmptyx()) {
        for (FinishLockFuture f : finishFuts)
            f.recheck(entry);
    }
}
/**
 * Discovery listener. On a node-left or node-failed event it removes explicit locks of the
 * departed node and notifies all active and atomic futures.
 */
@GridToStringExclude private final GridLocalEventListener discoLsnr = new GridLocalEventListener() {
    @Override public void onEvent(Event evt) {
        assert evt instanceof DiscoveryEvent;
        assert evt.type() == EVT_NODE_FAILED || evt.type() == EVT_NODE_LEFT;

        // ID of the node the event is about; reused for every notification below.
        final UUID leftNodeId = ((DiscoveryEvent)evt).eventNode().id();

        if (log.isDebugEnabled())
            log.debug("Processing node left [nodeId=" + leftNodeId + "]");

        removeExplicitNodeLocks(leftNodeId);

        for (GridCacheFuture<?> activeFut : activeFutures())
            activeFut.onNodeLeft(leftNodeId);

        for (GridCacheAtomicFuture<?> atomicFut : atomicFuts.values())
            atomicFut.onNodeLeft(leftNodeId);
    }
};
/**
* {@inheritDoc}
* <p>
* Creates the exchange logger and the explicit lock span map, then subscribes the discovery
* listener to {@code EVT_NODE_FAILED} and {@code EVT_NODE_LEFT} events.
*/
@Override protected void start0() throws IgniteCheckedException {
exchLog = cctx.logger(getClass().getName() + ".exchange");
pendingExplicit = GridConcurrentFactory.newMap();
cctx.gridEvents().addLocalEventListener(discoLsnr, EVT_NODE_FAILED, EVT_NODE_LEFT);
}
/**
* {@inheritDoc}
* <p>
* Unsubscribes the discovery listener registered on start.
*/
@Override public void onKernalStop0(boolean cancel) {
cctx.gridEvents().removeLocalEventListener(discoLsnr);
}
/**
* Gets the lock callback owned by this manager.
*
* @return MVCC callback.
*/
public GridCacheLockCallback callback() {
return cb;
}
/**
* Gets the values view of the per-thread explicit lock span map (not a copy).
*
* @return Collection of pending explicit locks.
*/
public Collection<GridCacheExplicitLockSpan> activeExplicitLocks() {
return pendingExplicit.values();
}
/**
 * Builds a new list holding all versioned futures followed by all futures registered by ID.
 *
 * @return Collection of active futures.
 */
public Collection<GridCacheFuture<?>> activeFutures() {
    ArrayList<GridCacheFuture<?>> res = new ArrayList<>();

    for (Collection<GridCacheVersionedFuture<?>> verCol : verFuts.values()) {
        // Per-version collections are only touched under their own monitor.
        synchronized (verCol) {
            res.addAll(verCol);
        }
    }

    res.addAll(futs.values());

    return res;
}
/**
 * Creates a compound future over the registered {@link GridDhtTxFinishFuture} futures whose
 * transactions the transaction manager reports as needing to be waited for on {@code topVer}.
 * Errors of the underlying futures are ignored.
 *
 * @param topVer Topology version.
 * @return Compound future of all {@link GridDhtTxFinishFuture} futures.
 */
public IgniteInternalFuture<?> finishRemoteTxs(AffinityTopologyVersion topVer) {
    GridCompoundFuture<?, ?> res = new CacheObjectsReleaseFuture<>("RemoteTx", topVer);

    for (GridCacheFuture<?> fut : futs.values()) {
        if (!(fut instanceof GridDhtTxFinishFuture))
            continue;

        GridDhtTxFinishFuture txFinishFut = (GridDhtTxFinishFuture)fut;

        if (cctx.tm().needWaitTransaction(txFinishFut.tx(), topVer))
            res.add(ignoreErrors(txFinishFut));
    }

    res.markInitialized();

    return res;
}
/**
* Future wrapper which ignores any underlying future errors: the wrapper is completed through the
* no-argument {@code onDone()} whenever the underlying future notifies its listener, so the
* result or error of the underlying future is not propagated.
*
* @param f Underlying future.
* @return Future wrapper which ignore any underlying future errors.
*/
private IgniteInternalFuture ignoreErrors(IgniteInternalFuture<?> f) {
GridFutureAdapter<?> wrapper = new GridFutureAdapter();
f.listen(future -> wrapper.onDone());
return wrapper;
}
/**
 * Submits a task to the closure processor that removes explicit locks of the given node from
 * every locked and near-locked entry and then touches the entry.
 *
 * @param leftNodeId Left node ID.
 */
public void removeExplicitNodeLocks(UUID leftNodeId) {
    GridPlainRunnable cleanupTask = new GridPlainRunnable() {
        @Override public void run() {
            for (GridDistributedCacheEntry lockedEntry : locked()) {
                try {
                    lockedEntry.removeExplicitNodeLocks(leftNodeId);

                    lockedEntry.touch();
                }
                catch (GridCacheEntryRemovedException ignored) {
                    // Entry is gone already, so there is nothing left to release on it.
                    if (log.isDebugEnabled())
                        log.debug("Attempted to remove node locks from removed entry in cache lock manager " +
                            "disco callback (will ignore): " + lockedEntry);
                }
            }
        }
    };

    cctx.kernalContext().closure().runLocalSafe(cleanupTask, true);
}
/**
 * Records a mapping between two versions in the near-to-DHT map.
 *
 * @param from From version.
 * @param to To version.
 */
public void mapVersion(GridCacheVersion from, GridCacheVersion to) {
    assert from != null;
    assert to != null;

    GridCacheVersion prev = near2dht.put(from, to);

    // An existing mapping is tolerated only when it points to the same version.
    assert prev == null || prev == to || prev.equals(to);

    if (log.isDebugEnabled())
        log.debug("Added version mapping [from=" + from + ", to=" + to + ']');
}
/**
 * Looks up the version mapped to the given near version.
 *
 * @param from Near version.
 * @return DHT version, or {@code null} if there is no mapping.
 */
public GridCacheVersion mappedVersion(GridCacheVersion from) {
    assert from != null;

    GridCacheVersion dhtVer = near2dht.get(from);

    if (log.isDebugEnabled())
        log.debug("Retrieved mapped version [from=" + from + ", to=" + dhtVer + ']');

    return dhtVer;
}
/**
* Cancels all client futures: raises the stopping flag and completes all active and atomic
* futures with a node-stopping error.
*/
public void onStop() {
stopping = true;
cancelClientFutures(stopError());
}
/**
 * {@inheritDoc}
 * <p>
 * Completes all active and atomic futures with a client-disconnected error.
 */
@Override public void onDisconnected(IgniteFuture reconnectFut) {
    cancelClientFutures(disconnectedError(reconnectFut));
}
/**
* Completes all active futures and all pending atomic futures with the given error.
*
* @param err Error.
*/
private void cancelClientFutures(IgniteCheckedException err) {
cancelFuturesWithException(err, activeFutures());
cancelFuturesWithException(err, atomicFuts.values());
}
/**
 * Completes every given future with the error. A failure to complete one future is logged as
 * a warning and does not prevent the remaining futures from being processed.
 *
 * @param err Error to complete future with.
 * @param futures Collection of futures.
 */
private void cancelFuturesWithException(
    IgniteCheckedException err,
    Collection<? extends IgniteInternalFuture<?>> futures
) {
    for (IgniteInternalFuture<?> target : futures) {
        try {
            ((GridFutureAdapter)target).onDone(err);
        }
        catch (Exception ex) {
            U.warn(log, "Failed to complete future on node stop (will ignore): " + target, ex);
        }
    }
}
/**
 * Creates a client-disconnected error. When no reconnect future is supplied, the one provided
 * by the cluster processor is used.
 *
 * @param reconnectFut Reconnect future.
 * @return Client disconnected exception.
 */
private IgniteClientDisconnectedCheckedException disconnectedError(@Nullable IgniteFuture<?> reconnectFut) {
    IgniteFuture<?> effectiveFut = reconnectFut != null
        ? reconnectFut
        : cctx.kernalContext().cluster().clientReconnectFuture();

    return new IgniteClientDisconnectedCheckedException(effectiveFut,
        "Operation has been cancelled (client node disconnected).");
}
/**
* Creates a new error used to complete futures when the node is stopping.
*
* @return Node stop exception.
*/
private IgniteCheckedException stopError() {
return new NodeStoppingException("Operation has been cancelled (node is stopping).");
}
/**
 * Removes the mapping for the given version from the near-to-DHT map.
 *
 * @param from From version.
 * @return To version, or {@code null} if there was no mapping.
 */
public GridCacheVersion unmapVersion(GridCacheVersion from) {
    assert from != null;

    GridCacheVersion removed = near2dht.remove(from);

    if (log.isDebugEnabled())
        log.debug("Removed mapped version [from=" + from + ", to=" + removed + ']');

    return removed;
}
/**
 * Registers an atomic future under the given ID. No other future is expected to be registered
 * under that ID.
 *
 * @param futId Future ID.
 * @param fut Future.
 * @return {@code False} if future was forcibly completed with error.
 */
public boolean addAtomicFuture(long futId, GridCacheAtomicFuture<?> fut) {
    IgniteInternalFuture<?> prev = atomicFuts.put(futId, fut);

    assert prev == null : "Old future is not null [futId=" + futId + ", fut=" + fut + ", old=" + prev + ']';

    return onFutureAdded(fut);
}
/**
* Gets the values view of the pending atomic futures map (not a copy).
*
* @return Collection of pending atomic futures.
*/
public Collection<GridCacheAtomicFuture<?>> atomicFutures() {
return atomicFuts.values();
}
/**
* Gets the current size of the pending atomic futures map.
*
* @return Number of pending atomic futures.
*/
public int atomicFuturesCount() {
return atomicFuts.size();
}
/**
* Gets the set of pending data streamer futures itself (not a copy).
*
* @return Collection of pending data streamer futures.
*/
public Collection<DataStreamerFuture> dataStreamerFutures() {
return dataStreamerFuts;
}
/**
* Gets future by given future ID.
*
* @param futId Future ID.
* @return Future, or {@code null} if no atomic future is registered under this ID.
*/
@Nullable public IgniteInternalFuture<?> atomicFuture(long futId) {
return atomicFuts.get(futId);
}
/**
* Removes the atomic future registered under the given ID.
*
* @param futId Future ID.
* @return Removed future, or {@code null} if no atomic future was registered under this ID.
*/
@Nullable public IgniteInternalFuture<?> removeAtomicFuture(long futId) {
return atomicFuts.remove(futId);
}
/**
 * Registers a future under the given ID. No other future is expected to be registered under
 * that ID.
 *
 * @param fut Future.
 * @param futId Future ID.
 * @return {@code True} if added, {@code false} if the future was forcibly completed with error.
 */
public boolean addFuture(final GridCacheFuture<?> fut, final IgniteUuid futId) {
    GridCacheFuture<?> prev = futs.put(futId, fut);

    assert prev == null : prev;

    return onFutureAdded(fut);
}
/**
 * Creates a data streamer future for the given topology version and registers it in the set
 * of pending data streamer futures.
 *
 * @param topVer Topology version.
 * @return Future.
 */
public GridFutureAdapter addDataStreamerFuture(AffinityTopologyVersion topVer) {
    final DataStreamerFuture streamerFut = new DataStreamerFuture(topVer);

    boolean added = dataStreamerFuts.add(streamerFut);

    assert added;

    return streamerFut;
}
/**
* Adds future to the map of futures tracked by lock version.
* <p>
* A future that is already done is marked as not trackable and is not registered; a future
* that is not trackable is not registered either. {@code True} is returned in both cases.
*
* @param fut Future.
* @return {@code True} if added (or registration was not needed), {@code false} if the same
*      future is already present in the collection for its version.
*/
@SuppressWarnings({"SynchronizationOnLocalVariableOrMethodParameter"})
public boolean addFuture(final GridCacheVersionedFuture<?> fut) {
if (fut.isDone()) {
fut.markNotTrackable();
return true;
}
if (!fut.trackable())
return true;
// Retry until the future lands in a collection that is still mapped for its version.
while (true) {
Collection<GridCacheVersionedFuture<?>> old = verFuts.get(fut.version());
if (old == null) {
// Identity-based equals/hashCode: value comparisons such as remove(key, value)
// match this exact collection instance rather than comparing set contents.
Collection<GridCacheVersionedFuture<?>> col = new HashSet<GridCacheVersionedFuture<?>>(U.capacity(1), 0.75f) {
{
// Make sure that we add future to queue before
// adding queue to the map of futures.
add(fut);
}
@Override public int hashCode() {
return System.identityHashCode(this);
}
@Override public boolean equals(Object obj) {
return obj == this;
}
};
// Null result means our new collection (already holding the future) was installed.
old = verFuts.putIfAbsent(fut.version(), col);
}
if (old != null) {
boolean empty, dup = false;
synchronized (old) {
empty = old.isEmpty();
// Only add to a non-empty collection; an empty one is handled below.
if (!empty)
dup = !old.add(fut);
}
// Future is being removed, so we force-remove here and try again.
if (empty) {
if (verFuts.remove(fut.version(), old)) {
if (log.isDebugEnabled())
log.debug("Removed future list from futures map for lock version: " + fut.version());
}
continue;
}
if (dup) {
if (log.isDebugEnabled())
log.debug("Found duplicate future in futures map (will not add): " + fut);
return false;
}
}
// Handle version mappings.
if (fut instanceof GridCacheMappedVersion) {
GridCacheVersion from = ((GridCacheMappedVersion)fut).mappedVersion();
if (from != null)
mapVersion(from, fut.version());
}
if (log.isDebugEnabled())
log.debug("Added future to future map: " + fut);
break;
}
// Just in case if future was completed before it was added.
if (fut.isDone())
removeVersionedFuture(fut);
else
// Result is ignored here: the method still returns true if the future got completed with error.
onFutureAdded(fut);
return true;
}
/**
 * Checks the manager state after a future has been registered. If the node is stopping or the
 * client is disconnected, the future is completed with the corresponding error.
 *
 * @param fut Future.
 * @return {@code False} if future was forcibly completed with error.
 */
private boolean onFutureAdded(IgniteInternalFuture<?> fut) {
    if (stopping) {
        ((GridFutureAdapter)fut).onDone(stopError());

        return false;
    }

    if (cctx.kernalContext().clientDisconnected()) {
        ((GridFutureAdapter)fut).onDone(disconnectedError(null));

        return false;
    }

    return true;
}
/**
* Removes the future registered under the given ID, if any.
*
* @param futId Future ID.
*/
public void removeFuture(IgniteUuid futId) {
futs.remove(futId);
}
/**
 * Removes a versioned future from the collection registered for its version. When that
 * collection becomes empty, it is removed from the map of versioned futures as well.
 *
 * @param fut Future to remove.
 * @return {@code True} if removed (or the future is not trackable), {@code false} otherwise.
 */
@SuppressWarnings({"SynchronizationOnLocalVariableOrMethodParameter"})
public boolean removeVersionedFuture(GridCacheVersionedFuture<?> fut) {
    if (!fut.trackable())
        return true;

    Collection<GridCacheVersionedFuture<?>> verCol = verFuts.get(fut.version());

    if (verCol == null)
        return false;

    boolean removed;
    boolean drained;

    synchronized (verCol) {
        removed = verCol.remove(fut);
        drained = verCol.isEmpty();
    }

    if (log.isDebugEnabled()) {
        if (removed)
            log.debug("Removed future from future map: " + fut);
        else
            log.debug("Attempted to remove a non-registered future (has it been already removed?): " + fut);
    }

    // Drop the empty collection; the conditional remove only succeeds if it is still the mapped one.
    if (drained && verFuts.remove(fut.version(), verCol) && log.isDebugEnabled())
        log.debug("Removed future list from futures map for lock version: " + fut.version());

    return removed;
}
/**
 * Finds the versioned future with the given future ID among the futures registered for the
 * given lock version.
 *
 * @param ver Lock ID.
 * @param futId Future ID.
 * @return Future, or {@code null} if not found.
 */
@Nullable public GridCacheVersionedFuture<?> versionedFuture(GridCacheVersion ver, IgniteUuid futId) {
    Collection<GridCacheVersionedFuture<?>> verCol = verFuts.get(ver);

    if (verCol != null) {
        synchronized (verCol) {
            for (GridCacheVersionedFuture<?> candidate : verCol) {
                if (!candidate.futureId().equals(futId))
                    continue;

                if (log.isDebugEnabled())
                    log.debug("Found future in futures map: " + candidate);

                return candidate;
            }
        }
    }

    if (log.isDebugEnabled())
        log.debug("Failed to find future in futures map [ver=" + ver + ", futId=" + futId + ']');

    return null;
}
/**
 * Gets a copy of the futures registered for the given lock ID.
 *
 * @param ver Lock ID.
 * @return Futures, or {@code null} if nothing is registered for this version.
 */
@Nullable public Collection<GridCacheVersionedFuture<?>> futuresForVersion(GridCacheVersion ver) {
    Collection<GridCacheVersionedFuture<?>> verCol = verFuts.get(ver);

    if (verCol == null)
        return null;

    // Copy under the collection monitor so that the caller gets a stable snapshot.
    synchronized (verCol) {
        return new ArrayList<>(verCol);
    }
}
/**
* Gets the future registered under the given ID.
*
* @param futId Future ID.
* @return Found future, or {@code null} if there is none.
*/
@Nullable public GridCacheFuture future(IgniteUuid futId) {
return futs.get(futId);
}
/**
 * Checks whether the given lock version is present in the set of removed lock versions.
 * Always {@code false} for near and local caches and for a {@code null} version.
 *
 * @param cacheCtx Cache context.
 * @param ver Lock version to check.
 * @return {@code True} if lock had been removed.
 */
public boolean isRemoved(GridCacheContext cacheCtx, GridCacheVersion ver) {
    if (cacheCtx.isNear() || cacheCtx.isLocal() || ver == null)
        return false;

    return rmvLocks.contains(ver);
}
/**
 * Adds the given version to the set of removed lock versions. Nothing is recorded for near
 * and local caches, for which {@code true} is returned right away.
 *
 * @param cacheCtx Cache context.
 * @param ver Obsolete entry version.
 * @return {@code True} if added.
 */
public boolean addRemoved(GridCacheContext cacheCtx, GridCacheVersion ver) {
    if (cacheCtx.isNear() || cacheCtx.isLocal())
        return true;

    boolean added = rmvLocks.add(ver);

    if (log.isDebugEnabled())
        log.debug("Added removed lock version: " + ver);

    return added;
}
/**
* Concatenates the locked and the near-locked entries.
*
* @return Collection of all locked entries.
*/
private Collection<GridDistributedCacheEntry> locked() {
return F.concat(false, locked.values(), nearLocked.values());
}
/**
* Gets the key set view of the locked entries map (not a copy); near-locked keys are not included.
*
* @return Locked keys.
*/
public Collection<IgniteTxKey> lockedKeys() {
return locked.keySet();
}
/**
* Gets the key set view of the near-locked entries map (not a copy).
*
* @return Locked near keys.
*/
public Collection<IgniteTxKey> nearLockedKeys() {
return nearLocked.keySet();
}
/**
 * Collects remote candidates of all locked and near-locked entries.
 * This method has poor performance, so use with care. It is currently only used by {@code DGC}.
 *
 * @return Remote candidates.
 */
public Collection<GridCacheMvccCandidate> remoteCandidates() {
    Collection<GridCacheMvccCandidate> res = new ArrayList<>();

    for (GridDistributedCacheEntry lockedEntry : locked())
        res.addAll(lockedEntry.remoteMvccSnapshot());

    return res;
}
/**
 * Collects local candidates of all locked and near-locked entries; removed entries are skipped.
 * This method has poor performance, so use with care. It is currently only used by {@code DGC}.
 *
 * @return Local candidates.
 */
public Collection<GridCacheMvccCandidate> localCandidates() {
    Collection<GridCacheMvccCandidate> res = new ArrayList<>();

    for (GridDistributedCacheEntry lockedEntry : locked()) {
        try {
            res.addAll(lockedEntry.localCandidates());
        }
        catch (GridCacheEntryRemovedException ignored) {
            // No-op: a removed entry contributes no candidates.
        }
    }

    return res;
}
/**
 * Appends the candidate to the calling thread's queue of pending candidates and links it with
 * the candidate that was last in the queue. Candidates of near caches and single implicit
 * candidates are not queued.
 *
 * @param cacheCtx Cache context.
 * @param cand Cache lock candidate to add.
 * @return {@code True} if added as a result of this operation,
 *      {@code false} if was previously added.
 */
public boolean addNext(GridCacheContext cacheCtx, GridCacheMvccCandidate cand) {
    assert cand != null;
    assert !cand.reentry() : "Lock reentries should not be linked: " + cand;

    // Don't order near candidates by thread as they will be ordered on
    // DHT node. Also, if candidate is implicit, no point to order him.
    if (cacheCtx.isNear() || cand.singleImplicit())
        return true;

    Deque<GridCacheMvccCandidate> threadQueue = pending.get();

    if (threadQueue == null) {
        threadQueue = new ArrayDeque<>();

        pending.set(threadQueue);
    }

    // Current tail (null for an empty queue) becomes the predecessor of the new candidate.
    GridCacheMvccCandidate tail = threadQueue.peekLast();

    threadQueue.add(cand);

    if (tail != null) {
        tail.next(cand);

        cand.previous(tail);
    }

    if (log.isDebugEnabled())
        log.debug("Linked new candidate: " + cand);

    return true;
}
/**
* Reset MVCC context: clears the calling thread's queue of pending candidates.
*/
public void contextReset() {
pending.set(null);
}
/**
* Adds candidate to the list of near local candidates.
* <p>
* The candidate goes into the explicit lock span registered for the thread; a new span is
* created when none is registered. The operation is retried until the candidate is accepted.
*
* @param threadId Thread ID.
* @param cand Candidate to add.
* @param topVer Topology version.
*/
public void addExplicitLock(long threadId, GridCacheMvccCandidate cand, AffinityTopologyVersion topVer) {
while (true) {
GridCacheExplicitLockSpan span = pendingExplicit.get(threadId);
if (span == null) {
// The new span is constructed with the candidate, so a successful put completes the operation.
span = new GridCacheExplicitLockSpan(topVer, cand);
GridCacheExplicitLockSpan old = pendingExplicit.putIfAbsent(threadId, span);
if (old == null)
break;
else
span = old;
}
// Either span was not empty, or concurrent put did not succeed.
if (span.addCandidate(topVer, cand))
break;
else
// Span rejected the candidate: unmap this span (if still mapped) and retry.
pendingExplicit.remove(threadId, span);
}
}
/**
 * Removes candidate from the list of near local candidates. The span of the candidate's thread
 * is unmapped when {@code removeCandidate} returns {@code true}.
 *
 * @param cand Candidate to remove.
 */
public void removeExplicitLock(GridCacheMvccCandidate cand) {
    GridCacheExplicitLockSpan threadSpan = pendingExplicit.get(cand.threadId());

    if (threadSpan != null && threadSpan.removeCandidate(cand))
        pendingExplicit.remove(cand.threadId(), threadSpan);
}
/**
 * Checks if given key is locked by thread with given id or any thread.
 *
 * @param key Key to check.
 * @param threadId Thread id. If negative (e.g. -1), all threads will be checked.
 * @return {@code True} if locked by any or given thread (depending on {@code threadId} value).
 */
public boolean isLockedByThread(IgniteTxKey key, long threadId) {
    if (threadId >= 0) {
        GridCacheExplicitLockSpan threadSpan = pendingExplicit.get(threadId);

        if (threadSpan == null)
            return false;

        GridCacheMvccCandidate threadCand = threadSpan.candidate(key, null);

        return threadCand != null && threadCand.owner();
    }

    // No particular thread requested: look for an owner candidate in every span.
    for (GridCacheExplicitLockSpan anySpan : pendingExplicit.values()) {
        GridCacheMvccCandidate anyCand = anySpan.candidate(key, null);

        if (anyCand != null && anyCand.owner())
            return true;
    }

    return false;
}
/**
* Marks candidates for given thread and given key as owned. Does nothing if no explicit lock
* span is registered for the thread.
*
* @param key Key.
* @param threadId Thread id, expected to be positive.
*/
public void markExplicitOwner(IgniteTxKey key, long threadId) {
assert threadId > 0;
GridCacheExplicitLockSpan span = pendingExplicit.get(threadId);
if (span != null)
span.markOwned(key);
}
/**
 * Removes explicit lock for given thread id, key and optional version. The thread's span is
 * unmapped if it is empty after the removal.
 *
 * @param threadId Thread id, expected to be positive.
 * @param key Key.
 * @param ver Optional version.
 * @return Candidate, or {@code null} if the thread has no span or the span returned none.
 */
public GridCacheMvccCandidate removeExplicitLock(long threadId,
    IgniteTxKey key,
    @Nullable GridCacheVersion ver
) {
    assert threadId > 0;

    GridCacheExplicitLockSpan threadSpan = pendingExplicit.get(threadId);

    if (threadSpan == null)
        return null;

    GridCacheMvccCandidate removed = threadSpan.removeCandidate(key, ver);

    if (removed != null && threadSpan.isEmpty())
        pendingExplicit.remove(threadId, threadSpan);

    return removed;
}
/**
 * Gets last added explicit lock candidate by thread id and key. For a negative thread id the
 * spans of all threads are searched.
 *
 * @param threadId Thread id.
 * @param key Key to look up.
 * @return Last added explicit lock candidate for given thread id and key or {@code null} if
 *      no such candidate.
 */
@Nullable public GridCacheMvccCandidate explicitLock(long threadId, IgniteTxKey key) {
    if (threadId < 0)
        return explicitLock(key, null);

    GridCacheExplicitLockSpan threadSpan = pendingExplicit.get(threadId);

    if (threadSpan == null)
        return null;

    return threadSpan.candidate(key, null);
}
/**
 * Gets explicit lock candidate added by any thread by given key and lock version. The first
 * candidate found while iterating the spans is returned.
 *
 * @param key Key.
 * @param ver Version.
 * @return Lock candidate that satisfies given criteria or {@code null} if no such candidate.
 */
@Nullable public GridCacheMvccCandidate explicitLock(IgniteTxKey key, @Nullable GridCacheVersion ver) {
    for (GridCacheExplicitLockSpan threadSpan : pendingExplicit.values()) {
        GridCacheMvccCandidate found = threadSpan.candidate(key, ver);

        if (found != null)
            return found;
    }

    return null;
}
/**
 * Gets the topology version of the explicit lock span registered for the given thread.
 *
 * @param threadId Thread ID.
 * @return Topology snapshot for last acquired and not released lock, or {@code null} if the
 *      thread has no span.
 */
@Nullable public AffinityTopologyVersion lastExplicitLockTopologyVersion(long threadId) {
    GridCacheExplicitLockSpan threadSpan = pendingExplicit.get(threadId);

    if (threadSpan == null)
        return null;

    return threadSpan.topologyVersion();
}
/**
* {@inheritDoc}
* <p>
* Prints the sizes of the internal collections of this manager.
*/
@Override public void printMemoryStats() {
X.println(">>> ");
X.println(">>> Mvcc manager memory stats [igniteInstanceName=" + cctx.igniteInstanceName() + ']');
X.println(">>> rmvLocksSize: " + rmvLocks.sizex());
// Only the 'locked' map is reported here; near-locked entries are not counted.
X.println(">>> lockedSize: " + locked.size());
// Number of versions with registered futures plus number of futures registered by ID.
X.println(">>> futsSize: " + (verFuts.size() + futs.size()));
X.println(">>> near2dhtSize: " + near2dht.size());
X.println(">>> finishFutsSize: " + finishFuts.sizex());
}
/**
* Creates a finish-lock future over all locked and near-locked entries (no entry filter).
*
* @param topVer Topology version, expected to be greater than {@code AffinityTopologyVersion.ZERO}.
* @return Future that signals when all locks for given partitions are released.
*/
public IgniteInternalFuture<?> finishLocks(AffinityTopologyVersion topVer) {
assert topVer.compareTo(AffinityTopologyVersion.ZERO) > 0;
return finishLocks(null, topVer);
}
/**
 * Collects pending locks of all finish-lock futures created for exactly the given topology version.
 *
 * @param topVer Topology version.
 * @return Locked keys mapped to their pending candidates; empty if there are none.
 */
public Map<IgniteTxKey, Collection<GridCacheMvccCandidate>> unfinishedLocks(AffinityTopologyVersion topVer) {
    Map<IgniteTxKey, Collection<GridCacheMvccCandidate>> res = new HashMap<>();

    if (finishFuts.isEmptyx())
        return res;

    for (FinishLockFuture finishFut : finishFuts) {
        if (finishFut.topologyVersion().equals(topVer))
            res.putAll(finishFut.pendingLocks());
    }

    return res;
}
/**
 * Creates a future that will wait for the release of all explicit lock spans whose topology
 * version is less than the given one.
 *
 * @param topVer Topology version to wait for.
 * @return Explicit locks release future.
 */
public IgniteInternalFuture<?> finishExplicitLocks(AffinityTopologyVersion topVer) {
    GridCompoundFuture<Object, Object> res = new CacheObjectsReleaseFuture<>("ExplicitLock", topVer);

    for (GridCacheExplicitLockSpan threadSpan : pendingExplicit.values()) {
        AffinityTopologyVersion spanTopVer = threadSpan.topologyVersion();

        // Skip spans without a topology version and spans not older than the requested version.
        if (spanTopVer == null || spanTopVer.compareTo(topVer) >= 0)
            continue;

        res.add(threadSpan.releaseFuture());
    }

    res.markInitialized();

    return res;
}
/**
 * Creates a compound future over the completion futures that pending atomic futures provide
 * for the given topology version. Atomic futures that provide none are skipped.
 *
 * @param topVer Topology version to finish.
 * @return Finish update future.
 */
@SuppressWarnings("unchecked")
public IgniteInternalFuture<?> finishAtomicUpdates(AffinityTopologyVersion topVer) {
    GridCompoundFuture<Object, Object> res = new FinishAtomicUpdateFuture("AtomicUpdate", topVer);

    for (GridCacheAtomicFuture<?> atomicFut : atomicFuts.values()) {
        IgniteInternalFuture<Void> completion = atomicFut.completeFuture(topVer);

        if (completion == null)
            continue;

        res.add((IgniteInternalFuture)completion);
    }

    res.markInitialized();

    return res;
}
/**
 * Creates a compound future over all pending data streamer futures whose topology version is
 * less than the given one.
 *
 * @param topVer Topology version.
 * @return Finish update future.
 */
public IgniteInternalFuture<?> finishDataStreamerUpdates(AffinityTopologyVersion topVer) {
    GridCompoundFuture<Void, Object> res = new CacheObjectsReleaseFuture<>("DataStreamer", topVer);

    for (DataStreamerFuture streamerFut : dataStreamerFuts) {
        boolean older = streamerFut.topVer.compareTo(topVer) < 0;

        if (older)
            res.add(streamerFut);
    }

    res.markInitialized();

    return res;
}
/**
 * Creates a finish-lock future restricted to entries of the given cache whose keys are
 * contained in {@code keys}.
 *
 * @param keys Key for which locks should be released.
 * @param cacheId Cache ID.
 * @param topVer Topology version.
 * @return Future that signals when all locks for given keys are released.
 */
public IgniteInternalFuture<?> finishKeys(Collection<KeyCacheObject> keys,
    final int cacheId,
    AffinityTopologyVersion topVer) {
    // The filter calls contains() per entry, so anything that is not a set is copied into a hash set.
    final Collection<KeyCacheObject> keySet;

    if (keys instanceof Set)
        keySet = keys;
    else
        keySet = new HashSet<>(keys);

    IgnitePredicate<GridDistributedCacheEntry> keyFilter = new P1<GridDistributedCacheEntry>() {
        @Override public boolean apply(GridDistributedCacheEntry e) {
            return e.context().cacheId() == cacheId && keySet.contains(e.key());
        }
    };

    return finishLocks(keyFilter, topVer);
}
/**
 * Creates and registers a finish-lock future over the locked and near-locked entries accepted
 * by the filter. The future unregisters itself once it completes. For
 * {@code AffinityTopologyVersion.NONE} an already finished future is returned instead.
 *
 * @param filter Entry filter, {@code null} to accept all entries.
 * @param topVer Topology version.
 * @return Future that signals when all locks for given partitions will be released.
 */
private IgniteInternalFuture<?> finishLocks(
    @Nullable final IgnitePredicate<GridDistributedCacheEntry> filter,
    AffinityTopologyVersion topVer
) {
    assert topVer.topologyVersion() != 0;

    if (topVer.equals(AffinityTopologyVersion.NONE))
        return new GridFinishedFuture();

    Collection<GridDistributedCacheEntry> entries = locked();

    if (filter != null)
        entries = F.view(entries, filter);

    final FinishLockFuture finishFut = new FinishLockFuture(entries, topVer);

    finishFuts.add(finishFut);

    // Unregister the future as soon as it is done.
    finishFut.listen(done -> finishFuts.remove(finishFut));

    // Initial check after registration and listener setup.
    finishFut.recheck();

    return finishFut;
}
/**
 * Invokes {@code recheck()} on every finish-lock future currently held in {@code finishFuts}.
 */
public void recheckPendingLocks() {
    if (exchLog.isDebugEnabled())
        exchLog.debug("Rechecking pending locks for completion.");

    // Nothing registered, nothing to recheck.
    if (finishFuts.isEmptyx())
        return;

    for (FinishLockFuture pending : finishFuts)
        pending.recheck();
}
/**
 * @return Next future ID for atomic futures.
 */
public long nextAtomicId() {
    final LongWrapper locCnt = threadAtomicCnt.get();

    final long id = locCnt.getAndIncrement();

    // The mask test fires when the counter's next value is a multiple of THREAD_RESERVE_SIZE
    // (NOTE(review): this presumes THREAD_RESERVE_SIZE is a power of two - confirm).
    final boolean rangeExhausted = (locCnt.get() & (THREAD_RESERVE_SIZE - 1)) == 0;

    // Move the counter to a fresh range taken from the global counter.
    if (rangeExhausted)
        locCnt.set(globalAtomicCnt.getAndAdd(THREAD_RESERVE_SIZE));

    return id;
}
/**
* Future that tracks lock candidates still pending on a set of cache entries and completes
* once no pending candidates remain.
*/
private class FinishLockFuture extends GridFutureAdapter<Object> {
/** */
private static final long serialVersionUID = 0L;
/** Topology version; read by {@link #versionFilter()} and returned by {@link #topologyVersion()}. */
@GridToStringInclude
private final AffinityTopologyVersion topVer;
/** Candidates that are still being waited for, keyed by the transaction key of their entry. */
@GridToStringInclude
private final Map<IgniteTxKey, Collection<GridCacheMvccCandidate>> pendingLocks =
new ConcurrentHashMap<>();
/**
* Collects, for every given entry, the local candidates accepted by {@link #versionFilter()}
* and registers each non-empty candidate collection in {@link #pendingLocks}.
*
* @param entries Entries.
* @param topVer Topology version, must be greater than {@link AffinityTopologyVersion#ZERO}.
*/
FinishLockFuture(Iterable<GridDistributedCacheEntry> entries, AffinityTopologyVersion topVer) {
assert topVer.compareTo(AffinityTopologyVersion.ZERO) > 0;
// Must be assigned before versionFilter() is used below, since the filter reads this field.
this.topVer = topVer;
for (GridCacheEntryEx entry : entries) {
// Either local or near local candidates.
try {
Collection<GridCacheMvccCandidate> locs = entry.localCandidates();
if (!F.isEmpty(locs)) {
// Copy the filtered candidates into a separate queue owned by this future.
Collection<GridCacheMvccCandidate> cands = new ConcurrentLinkedQueue<>();
cands.addAll(F.view(locs, versionFilter()));
if (!F.isEmpty(cands))
pendingLocks.put(entry.txKey(), cands);
}
}
catch (GridCacheEntryRemovedException ignored) {
// A removed entry is skipped; only a debug message is logged.
if (exchLog.isDebugEnabled())
exchLog.debug("Got removed entry when adding it to finish lock future (will ignore): " + entry);
}
}
if (exchLog.isDebugEnabled())
exchLog.debug("Pending lock set [topVer=" + topVer + ", locks=" + pendingLocks + ']');
}
/**
* @return Topology version.
*/
AffinityTopologyVersion topologyVersion() {
return topVer;
}
/**
* @return Pending locks (the internal map itself, not a copy).
*/
Map<IgniteTxKey, Collection<GridCacheMvccCandidate>> pendingLocks() {
return pendingLocks;
}
/**
* @return Filter accepting candidates whose topology version equals
* {@link AffinityTopologyVersion#ZERO} or is lower than this future's topology version.
*/
private IgnitePredicate<GridCacheMvccCandidate> versionFilter() {
assert topVer.topologyVersion() > 0;
return new P1<GridCacheMvccCandidate>() {
@Override public boolean apply(GridCacheMvccCandidate c) {
assert c.nearLocal() || c.dhtLocal();
// Wait for explicit locks.
return c.topologyVersion().equals(AffinityTopologyVersion.ZERO) || c.topologyVersion().compareTo(topVer) < 0;
}
};
}
/**
* Rechecks every key in {@link #pendingLocks}: keys whose entry can no longer be peeked are
* dropped, the remaining entries are passed to {@link #recheck(GridCacheEntryEx)}. Completes
* this future when no pending locks are left.
*/
void recheck() {
for (Iterator<IgniteTxKey> it = pendingLocks.keySet().iterator(); it.hasNext(); ) {
IgniteTxKey key = it.next();
// NOTE(review): the returned cache context is dereferenced without a null check -
// confirm cacheContext() cannot return null here (e.g. for a cache that was stopped).
GridCacheContext cacheCtx = cctx.cacheContext(key.cacheId());
GridCacheEntryEx entry = cacheCtx.cache().peekEx(key.key());
// No entry for the key any more: stop waiting for it.
if (entry == null)
it.remove();
else
recheck(entry);
}
if (pendingLocks.isEmpty()) {
if (exchLog.isDebugEnabled())
exchLog.debug("Finish lock future is done: " + this);
onDone();
}
}
/**
* Removes candidates reported as removed from the pending collection of the given entry,
* drops the entry's key once its collection is empty and completes this future when no
* pending locks are left.
*
* @param entry Entry, {@code null} is ignored.
*/
@SuppressWarnings({"SynchronizationOnLocalVariableOrMethodParameter"})
void recheck(@Nullable GridCacheEntryEx entry) {
if (entry == null)
return;
if (exchLog.isDebugEnabled())
exchLog.debug("Rechecking entry for completion [entry=" + entry + ", finFut=" + this + ']');
Collection<GridCacheMvccCandidate> cands = pendingLocks.get(entry.txKey());
if (cands != null) {
// The pruning and the emptiness checks run under the candidate collection's monitor.
synchronized (cands) {
// Drop every candidate that reports itself as removed.
cands.removeIf(GridCacheMvccCandidate::removed);
if (cands.isEmpty())
pendingLocks.remove(entry.txKey());
if (pendingLocks.isEmpty()) {
onDone();
if (exchLog.isDebugEnabled())
exchLog.debug("Finish lock future is done: " + this);
}
}
}
}
/** {@inheritDoc} */
@Override public String toString() {
if (!pendingLocks.isEmpty()) {
// Map each pending candidate's version to the value returned by the transaction manager for it.
Map<GridCacheVersion, IgniteInternalTx> txs = new HashMap<>(1, 1.0f);
for (Collection<GridCacheMvccCandidate> cands : pendingLocks.values())
for (GridCacheMvccCandidate c : cands)
txs.put(c.version(), cctx.tm().tx(c.version()));
return S.toString(FinishLockFuture.class, this, "txs=" + txs + ", super=" + super.toString());
}
else
return S.toString(FinishLockFuture.class, this, super.toString());
}
}
/**
 * Finish atomic update future.
 */
private static class FinishAtomicUpdateFuture extends CacheObjectsReleaseFuture<Object, Object> {
    /** */
    private static final long serialVersionUID = 0L;

    /**
     * @param type Type.
     * @param topVer Topology version.
     */
    private FinishAtomicUpdateFuture(String type, AffinityTopologyVersion topVer) {
        super(type, topVer);
    }

    /** {@inheritDoc} */
    @Override protected boolean ignoreFailure(Throwable err) {
        Class<?> errCls = err.getClass();

        // Topology errors are ignored.
        if (ClusterTopologyCheckedException.class.isAssignableFrom(errCls))
            return true;

        // Partial update errors are ignored as well; everything else is not.
        return CachePartialUpdateCheckedException.class.isAssignableFrom(errCls);
    }
}
/**
 * Data streamer future that removes itself from {@code dataStreamerFuts} once it completes.
 */
private class DataStreamerFuture extends GridFutureAdapter<Void> {
    /** Topology version this future is associated with. */
    @GridToStringInclude
    private final AffinityTopologyVersion topVer;

    /**
     * @param topVer Topology version.
     */
    DataStreamerFuture(AffinityTopologyVersion topVer) {
        this.topVer = topVer;
    }

    /** {@inheritDoc} */
    @Override public boolean onDone(@Nullable Void res, @Nullable Throwable err) {
        boolean completedNow = super.onDone(res, err);

        // Unregister only on the call that actually completed the future.
        if (completedNow)
            dataStreamerFuts.remove(this);

        return completedNow;
    }

    /** {@inheritDoc} */
    @Override public String toString() {
        return S.toString(DataStreamerFuture.class, this, super.toString());
    }
}
/** Mutable holder for a {@code long} counter value. */
private static class LongWrapper {
    /** Current value. */
    private long val;

    /**
     * @param val Base value; the wrapper starts at {@code val + 1}, or at {@code 1} when that sum is zero.
     */
    public LongWrapper(long val) {
        long start = val + 1;

        this.val = start == 0 ? 1 : start;
    }

    /**
     * @param val Value to set.
     */
    public void set(long val) {
        this.val = val;
    }

    /**
     * @return Current value.
     */
    public long get() {
        return val;
    }

    /**
     * @return Current value (and stores incremented value).
     */
    public long getAndIncrement() {
        long cur = val;

        val = cur + 1;

        return cur;
    }
}
}