blob: dc3da4c8e4a26ee3f7900e67436eab6474f16c56 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.transactions;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import javax.cache.expiry.Duration;
import javax.cache.expiry.ExpiryPolicy;
import javax.cache.processor.EntryProcessor;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.NodeStoppingException;
import org.apache.ignite.internal.pagemem.wal.record.DataEntry;
import org.apache.ignite.internal.pagemem.wal.record.DataRecord;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
import org.apache.ignite.internal.processors.cache.CacheGroupContext;
import org.apache.ignite.internal.processors.cache.CacheInvokeEntry;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.EntryProcessorResourceInjectorProxy;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
import org.apache.ignite.internal.processors.cache.GridCacheFilterFailedException;
import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
import org.apache.ignite.internal.processors.cache.GridCacheOperation;
import org.apache.ignite.internal.processors.cache.GridCacheReturn;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.GridCacheUpdateTxResult;
import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.distributed.dht.PartitionUpdateCountersMessage;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
import org.apache.ignite.internal.processors.cache.store.CacheStoreManager;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersionConflictContext;
import org.apache.ignite.internal.processors.dr.GridDrType;
import org.apache.ignite.internal.transactions.IgniteTxHeuristicCheckedException;
import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException;
import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.lang.GridClosureException;
import org.apache.ignite.internal.util.lang.GridTuple;
import org.apache.ignite.internal.util.tostring.GridToStringBuilder;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.C1;
import org.apache.ignite.internal.util.typedef.CX1;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiClosure;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.thread.IgniteThread;
import org.apache.ignite.transactions.TransactionConcurrency;
import org.apache.ignite.transactions.TransactionDeadlockException;
import org.apache.ignite.transactions.TransactionIsolation;
import org.apache.ignite.transactions.TransactionState;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.internal.processors.cache.GridCacheOperation.CREATE;
import static org.apache.ignite.internal.processors.cache.GridCacheOperation.DELETE;
import static org.apache.ignite.internal.processors.cache.GridCacheOperation.NOOP;
import static org.apache.ignite.internal.processors.cache.GridCacheOperation.READ;
import static org.apache.ignite.internal.processors.cache.GridCacheOperation.RELOAD;
import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM;
import static org.apache.ignite.internal.processors.cache.GridCacheOperation.UPDATE;
import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.LOST;
import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.OWNING;
import static org.apache.ignite.internal.processors.dr.GridDrType.DR_NONE;
import static org.apache.ignite.internal.processors.dr.GridDrType.DR_PRIMARY;
import static org.apache.ignite.transactions.TransactionState.COMMITTED;
import static org.apache.ignite.transactions.TransactionState.COMMITTING;
import static org.apache.ignite.transactions.TransactionState.MARKED_ROLLBACK;
import static org.apache.ignite.transactions.TransactionState.PREPARING;
import static org.apache.ignite.transactions.TransactionState.ROLLED_BACK;
import static org.apache.ignite.transactions.TransactionState.ROLLING_BACK;
import static org.apache.ignite.transactions.TransactionState.UNKNOWN;
/**
* Transaction adapter for cache transactions.
*/
public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements IgniteTxLocalEx {
/** Commit error updater. */
protected static final AtomicReferenceFieldUpdater<IgniteTxLocalAdapter, Throwable> COMMIT_ERR_UPD =
AtomicReferenceFieldUpdater.newUpdater(IgniteTxLocalAdapter.class, Throwable.class, "commitErr");
/** Done flag updater. */
protected static final AtomicIntegerFieldUpdater<IgniteTxLocalAdapter> DONE_FLAG_UPD =
AtomicIntegerFieldUpdater.newUpdater(IgniteTxLocalAdapter.class, "doneFlag");
/** Minimal version encountered (either explicit lock or XID of this transaction). */
protected GridCacheVersion minVer;
/** Flag indicating with TM commit happened. */
protected volatile int doneFlag;
/** Committed versions, relative to base. */
private Collection<GridCacheVersion> committedVers = Collections.emptyList();
/** Rolled back versions, relative to base. */
private Collection<GridCacheVersion> rolledbackVers = Collections.emptyList();
/** Base for completed versions. */
private GridCacheVersion completedBase;
/** Flag indicating that transformed values should be sent to remote nodes. */
private boolean sndTransformedVals;
/** Commit error. */
protected volatile Throwable commitErr;
/** Implicit transaction result. */
protected GridCacheReturn implicitRes;
/** Flag indicating whether deployment is enabled for caches from this transaction or not. */
private boolean depEnabled;
/** */
@GridToStringInclude
protected IgniteTxLocalState txState;
/** */
protected CacheWriteSynchronizationMode syncMode;
/** */
protected volatile boolean qryEnlisted;
/**
* @param cctx Cache registry.
* @param xidVer Transaction ID.
* @param implicit {@code True} if transaction was implicitly started by the system,
* {@code false} if it was started explicitly by user.
* @param implicitSingle {@code True} if transaction is implicit with only one key.
* @param sys System flag.
* @param plc IO policy.
* @param concurrency Concurrency.
* @param isolation Isolation.
* @param timeout Timeout.
* @param txSize Expected transaction size.
*/
protected IgniteTxLocalAdapter(
GridCacheSharedContext cctx,
GridCacheVersion xidVer,
boolean implicit,
boolean implicitSingle,
boolean sys,
byte plc,
TransactionConcurrency concurrency,
TransactionIsolation isolation,
long timeout,
boolean invalidate,
boolean storeEnabled,
boolean onePhaseCommit,
int txSize,
@Nullable UUID subjId,
int taskNameHash
) {
super(
cctx,
xidVer,
implicit,
/*local*/true,
sys,
plc,
concurrency,
isolation,
timeout,
invalidate,
storeEnabled,
onePhaseCommit,
txSize,
subjId,
taskNameHash
);
minVer = xidVer;
txState = implicitSingle ? new IgniteTxImplicitSingleStateImpl() : new IgniteTxStateImpl();
}
/**
* @return Transaction write synchronization mode.
*/
public final CacheWriteSynchronizationMode syncMode() {
if (syncMode != null)
return syncMode;
return txState().syncMode(cctx);
}
/**
* @param syncMode Write synchronization mode.
*/
public void syncMode(CacheWriteSynchronizationMode syncMode) {
this.syncMode = syncMode;
}
/** {@inheritDoc} */
@Override public IgniteTxLocalState txState() {
return txState;
}
/**
* Creates result instance.
*/
protected void initResult() {
implicitRes = new GridCacheReturn(localResult(), false);
}
/** {@inheritDoc} */
@Override public UUID eventNodeId() {
return cctx.localNodeId();
}
/** {@inheritDoc} */
@Override public UUID originatingNodeId() {
return cctx.localNodeId();
}
/** {@inheritDoc} */
@Override public boolean empty() {
return txState.empty();
}
/** {@inheritDoc} */
@Override public Collection<UUID> masterNodeIds() {
return Collections.singleton(nodeId);
}
/** {@inheritDoc} */
@Override public Throwable commitError() {
return commitErr;
}
/** {@inheritDoc} */
@Override public void commitError(Throwable e) {
COMMIT_ERR_UPD.compareAndSet(this, null, e);
}
/** {@inheritDoc} */
@Override public boolean onOwnerChanged(GridCacheEntryEx entry, GridCacheMvccCandidate owner) {
assert false;
return false;
}
/** {@inheritDoc} */
@Override public boolean activeCachesDeploymentEnabled() {
return depEnabled;
}
/** {@inheritDoc} */
@Override public void activeCachesDeploymentEnabled(boolean depEnabled) {
this.depEnabled = depEnabled;
}
/** {@inheritDoc} */
@Override public boolean isStarted() {
return txState.initialized();
}
/** {@inheritDoc} */
@Override public boolean hasWriteKey(IgniteTxKey key) {
return txState.hasWriteKey(key);
}
/**
* @return Transaction read set.
*/
@Override public Set<IgniteTxKey> readSet() {
return txState.readSet();
}
/**
* @return Transaction write set.
*/
@Override public Set<IgniteTxKey> writeSet() {
return txState.writeSet();
}
/** {@inheritDoc} */
@Override public Map<IgniteTxKey, IgniteTxEntry> readMap() {
return txState.readMap();
}
/** {@inheritDoc} */
@Override public Map<IgniteTxKey, IgniteTxEntry> writeMap() {
return txState.writeMap();
}
/** {@inheritDoc} */
@Override public Collection<IgniteTxEntry> allEntries() {
return txState.allEntries();
}
/** {@inheritDoc} */
@Override public Collection<IgniteTxEntry> readEntries() {
return txState.readEntries();
}
/** {@inheritDoc} */
@Override public Collection<IgniteTxEntry> writeEntries() {
return txState.writeEntries();
}
/** {@inheritDoc} */
@Nullable @Override public IgniteTxEntry entry(IgniteTxKey key) {
return txState.entry(key);
}
/** {@inheritDoc} */
@Override public void seal() {
txState.seal();
}
/**
* @param ret Result.
*/
public void implicitSingleResult(GridCacheReturn ret) {
assert ret != null;
if (ret.invokeResult())
implicitRes.mergeEntryProcessResults(ret);
else
implicitRes = ret;
}
/**
* @return {@code True} if transaction participates in a cache that has an interceptor configured.
*/
public boolean hasInterceptor() {
return txState().hasInterceptor(cctx);
}
/**
* @param snd {@code True} if values in tx entries should be replaced with transformed values and sent
* to remote nodes.
*/
public void sendTransformedValues(boolean snd) {
sndTransformedVals = snd;
}
/**
* @return {@code True} if should be committed after lock is acquired.
*/
protected boolean commitAfterLock() {
return implicit() && (!dht() || colocated());
}
/** {@inheritDoc} */
@Nullable @Override public GridTuple<CacheObject> peek(
GridCacheContext cacheCtx,
boolean failFast,
KeyCacheObject key
) throws GridCacheFilterFailedException {
IgniteTxEntry e = entry(cacheCtx.txKey(key));
if (e != null)
return e.hasPreviousValue() ? F.t(e.previousValue()) : null;
return null;
}
/**
* Gets minimum version present in transaction.
*
* @return Minimum versions.
*/
@Override public GridCacheVersion minVersion() {
return minVer;
}
/**
* @param entries Entries to lock or {@code null} if use default {@link IgniteInternalTx#optimisticLockEntries()}.
* @throws IgniteCheckedException If prepare step failed.
*/
public void userPrepare(@Nullable Collection<IgniteTxEntry> entries) throws IgniteCheckedException {
if (state() != PREPARING) {
if (remainingTime() == -1)
throw new IgniteTxTimeoutCheckedException("Transaction timed out: " + this);
TransactionState state = state();
setRollbackOnly();
throw new IgniteCheckedException("Invalid transaction state for prepare [state=" +
state + ", tx=" + this + ']');
}
checkValid();
try {
cctx.tm().prepareTx(this, entries);
if (txState().mvccEnabled())
calculatePartitionUpdateCounters();
}
catch (IgniteCheckedException e) {
throw e;
}
catch (Throwable e) {
setRollbackOnly();
if (e instanceof Error)
throw e;
throw new IgniteCheckedException("Transaction validation produced a runtime exception: " + this, e);
}
}
/**
* Calculates partition update counters for current transaction. Each partition will be supplied with
* pair (init, delta) values, where init - initial update counter, and delta - updates count made
* by current transaction for a given partition.
*/
public void calculatePartitionUpdateCounters() throws IgniteTxRollbackCheckedException {
TxCounters counters = txCounters(false);
if (counters != null && F.isEmpty(counters.updateCounters())) {
List<PartitionUpdateCountersMessage> cntrMsgs = new ArrayList<>();
for (Map.Entry<Integer, Map<Integer, AtomicLong>> record : counters.accumulatedUpdateCounters().entrySet()) {
int cacheId = record.getKey();
Map<Integer, AtomicLong> partToCntrs = record.getValue();
assert partToCntrs != null;
if (F.isEmpty(partToCntrs))
continue;
PartitionUpdateCountersMessage msg = new PartitionUpdateCountersMessage(cacheId, partToCntrs.size());
GridCacheContext ctx0 = cctx.cacheContext(cacheId);
GridDhtPartitionTopology top = ctx0.topology();
assert top != null;
for (Map.Entry<Integer, AtomicLong> e : partToCntrs.entrySet()) {
AtomicLong acc = e.getValue();
assert acc != null;
long cntr = acc.get();
assert cntr >= 0;
if (cntr != 0) {
int p = e.getKey();
GridDhtLocalPartition part = top.localPartition(p);
// Verify primary tx mapping.
// LOST state is possible if tx is started over LOST partition.
boolean valid = part != null &&
(part.state() == OWNING || part.state() == LOST) &&
part.primary(top.readyTopologyVersion());
if (!valid) {
// Local node is no longer primary for the partition, need to rollback a transaction.
if (part != null && !part.primary(top.readyTopologyVersion())) {
log.warning("Failed to prepare a transaction on outdated topology, rolling back " +
"[tx=" + CU.txString(this) +
", readyTopVer=" + top.readyTopologyVersion() +
", lostParts=" + top.lostPartitions() +
", part=" + part.toString() + ']');
throw new IgniteTxRollbackCheckedException("Failed to prepare a transaction on outdated " +
"topology, please try again [timeout=" + timeout() + ", tx=" + CU.txString(this) + ']');
}
// Trigger error.
throw new AssertionError("Invalid primary mapping [tx=" + CU.txString(this) +
", readyTopVer=" + top.readyTopologyVersion() +
", lostParts=" + top.lostPartitions() +
", part=" + (part == null ? "NULL" : part.toString()) + ']');
}
msg.add(p, part.getAndIncrementUpdateCounter(cntr), cntr);
}
}
if (msg.size() > 0)
cntrMsgs.add(msg);
}
counters.updateCounters(cntrMsgs);
}
}
/**
* Checks that locks are in proper state for commit.
*
* @param entry Cache entry to check.
*/
private void checkCommitLocks(GridCacheEntryEx entry) {
assert ownsLockUnsafe(entry) : "Lock is not owned for commit [entry=" + entry +
", tx=" + this + ']';
}
/**
* Gets cache entry for given key.
*
* @param cacheCtx Cache context.
* @param key Key.
* @return Cache entry.
*/
protected GridCacheEntryEx entryEx(GridCacheContext cacheCtx, IgniteTxKey key) {
return cacheCtx.cache().entryEx(key.key());
}
/**
* Gets cache entry for given key and topology version.
*
* @param cacheCtx Cache context.
* @param key Key.
* @param topVer Topology version.
* @return Cache entry.
*/
protected GridCacheEntryEx entryEx(GridCacheContext cacheCtx, IgniteTxKey key, AffinityTopologyVersion topVer) {
return cacheCtx.cache().entryEx(key.key(), topVer);
}
/** {@inheritDoc} */
@Override public void userCommit() throws IgniteCheckedException {
TransactionState state = state();
if (state != COMMITTING) {
if (remainingTime() == -1)
throw new IgniteTxTimeoutCheckedException("Transaction timed out: " + this);
setRollbackOnly();
throw new IgniteCheckedException("Invalid transaction state for commit [state=" + state + ", tx=" + this + ']');
}
checkValid();
Collection<IgniteTxEntry> commitEntries = (near() || cctx.snapshot().needTxReadLogging()) ? allEntries() : writeEntries();
boolean empty = F.isEmpty(commitEntries) && !queryEnlisted();
// Register this transaction as completed prior to write-phase to
// ensure proper lock ordering for removed entries.
// We add colocated transaction to committed set even if it is empty to correctly order
// locks on backup nodes.
if (!empty || colocated())
cctx.tm().addCommittedTx(this);
if (!empty) {
batchStoreCommit(writeEntries());
WALPointer ptr = null;
IgniteCheckedException err = null;
cctx.database().checkpointReadLock();
try {
cctx.tm().txContext(this);
AffinityTopologyVersion topVer = topologyVersion();
TxCounters txCounters = txCounters(false);
/*
* Commit to cache. Note that for 'near' transaction we loop through all the entries.
*/
for (IgniteTxEntry txEntry : commitEntries) {
GridCacheContext cacheCtx = txEntry.context();
GridDrType drType = cacheCtx.isDrEnabled() ? DR_PRIMARY : DR_NONE;
UUID nodeId = txEntry.nodeId() == null ? this.nodeId : txEntry.nodeId();
while (true) {
try {
GridCacheEntryEx cached = txEntry.cached();
// Must try to evict near entries before committing from
// transaction manager to make sure locks are held.
if (!evictNearEntry(txEntry, false)) {
if (cacheCtx.isNear() && cacheCtx.dr().receiveEnabled()) {
cached.markObsolete(xidVer);
break;
}
if (cached.detached())
break;
boolean updateNearCache = updateNearCache(cacheCtx, txEntry.key(), topVer);
boolean metrics = true;
if (!updateNearCache && cacheCtx.isNear() && txEntry.locallyMapped())
metrics = false;
boolean evt = !isNearLocallyMapped(txEntry, false);
if (!F.isEmpty(txEntry.entryProcessors()) || !F.isEmpty(txEntry.filters()))
txEntry.cached().unswap(false);
IgniteBiTuple<GridCacheOperation, CacheObject> res = applyTransformClosures(txEntry,
true, null);
GridCacheVersion dhtVer = null;
// For near local transactions we must record DHT version
// in order to keep near entries on backup nodes until
// backup remote transaction completes.
if (cacheCtx.isNear()) {
if (txEntry.op() == CREATE || txEntry.op() == UPDATE ||
txEntry.op() == DELETE || txEntry.op() == TRANSFORM)
dhtVer = txEntry.dhtVersion();
if ((txEntry.op() == CREATE || txEntry.op() == UPDATE) &&
txEntry.conflictExpireTime() == CU.EXPIRE_TIME_CALCULATE) {
ExpiryPolicy expiry = cacheCtx.expiryForTxEntry(txEntry);
if (expiry != null) {
txEntry.cached().unswap(false);
Duration duration = cached.hasValue() ?
expiry.getExpiryForUpdate() : expiry.getExpiryForCreation();
txEntry.ttl(CU.toTtl(duration));
}
}
}
GridCacheOperation op = res.get1();
CacheObject val = res.get2();
// Deal with conflicts.
GridCacheVersion explicitVer = txEntry.conflictVersion() != null ?
txEntry.conflictVersion() : writeVersion();
if ((op == CREATE || op == UPDATE) &&
txEntry.conflictExpireTime() == CU.EXPIRE_TIME_CALCULATE) {
ExpiryPolicy expiry = cacheCtx.expiryForTxEntry(txEntry);
if (expiry != null) {
Duration duration = cached.hasValue() ?
expiry.getExpiryForUpdate() : expiry.getExpiryForCreation();
long ttl = CU.toTtl(duration);
txEntry.ttl(ttl);
if (ttl == CU.TTL_ZERO)
op = DELETE;
}
}
boolean conflictNeedResolve = cacheCtx.conflictNeedResolve();
GridCacheVersionConflictContext<?, ?> conflictCtx = null;
if (conflictNeedResolve) {
IgniteBiTuple<GridCacheOperation, GridCacheVersionConflictContext> conflictRes =
conflictResolve(op, txEntry, val, explicitVer, cached);
assert conflictRes != null;
conflictCtx = conflictRes.get2();
if (conflictCtx.isUseOld())
op = NOOP;
else if (conflictCtx.isUseNew()) {
txEntry.ttl(conflictCtx.ttl());
txEntry.conflictExpireTime(conflictCtx.expireTime());
}
else {
assert conflictCtx.isMerge();
op = conflictRes.get1();
val = txEntry.context().toCacheObject(conflictCtx.mergeValue());
explicitVer = writeVersion();
txEntry.ttl(conflictCtx.ttl());
txEntry.conflictExpireTime(conflictCtx.expireTime());
}
}
else
// Nullify explicit version so that innerSet/innerRemove will work as usual.
explicitVer = null;
if (sndTransformedVals || conflictNeedResolve) {
assert sndTransformedVals && cacheCtx.isReplicated() || conflictNeedResolve;
txEntry.value(val, true, false);
txEntry.op(op);
txEntry.entryProcessors(null);
txEntry.conflictVersion(explicitVer);
}
if (dhtVer == null)
dhtVer = explicitVer != null ? explicitVer : writeVersion();
if (op == CREATE || op == UPDATE) {
assert val != null : txEntry;
GridCacheUpdateTxResult updRes = cached.innerSet(
this,
eventNodeId(),
txEntry.nodeId(),
val,
false,
false,
txEntry.ttl(),
evt,
metrics,
txEntry.keepBinary(),
txEntry.hasOldValue(),
txEntry.oldValue(),
topVer,
null,
cached.detached() ? DR_NONE : drType,
txEntry.conflictExpireTime(),
cached.isNear() ? null : explicitVer,
resolveTaskName(),
dhtVer,
null);
if (updRes.success())
txEntry.updateCounter(updRes.updateCounter());
if (updRes.loggedPointer() != null)
ptr = updRes.loggedPointer();
if (updRes.success() && updateNearCache) {
final CacheObject val0 = val;
final boolean metrics0 = metrics;
final GridCacheVersion dhtVer0 = dhtVer;
updateNearEntrySafely(cacheCtx, txEntry.key(), entry -> entry.innerSet(
null,
eventNodeId(),
nodeId,
val0,
false,
false,
txEntry.ttl(),
false,
metrics0,
txEntry.keepBinary(),
txEntry.hasOldValue(),
txEntry.oldValue(),
topVer,
CU.empty0(),
DR_NONE,
txEntry.conflictExpireTime(),
null,
resolveTaskName(),
dhtVer0,
null)
);
}
}
else if (op == DELETE) {
GridCacheUpdateTxResult updRes = cached.innerRemove(
this,
eventNodeId(),
txEntry.nodeId(),
false,
evt,
metrics,
txEntry.keepBinary(),
txEntry.hasOldValue(),
txEntry.oldValue(),
topVer,
null,
cached.detached() ? DR_NONE : drType,
cached.isNear() ? null : explicitVer,
resolveTaskName(),
dhtVer,
null);
if (updRes.success())
txEntry.updateCounter(updRes.updateCounter());
if (updRes.loggedPointer() != null)
ptr = updRes.loggedPointer();
if (updRes.success() && updateNearCache) {
final boolean metrics0 = metrics;
final GridCacheVersion dhtVer0 = dhtVer;
updateNearEntrySafely(cacheCtx, txEntry.key(), entry -> entry.innerRemove(
null,
eventNodeId(),
nodeId,
false,
false,
metrics0,
txEntry.keepBinary(),
txEntry.hasOldValue(),
txEntry.oldValue(),
topVer,
CU.empty0(),
DR_NONE,
null,
resolveTaskName(),
dhtVer0,
null)
);
}
}
else if (op == RELOAD) {
cached.innerReload();
if (updateNearCache)
updateNearEntrySafely(cacheCtx, txEntry.key(), entry -> entry.innerReload());
}
else if (op == READ) {
CacheGroupContext grp = cacheCtx.group();
if (grp.logDataRecords() && cctx.snapshot().needTxReadLogging()) {
ptr = grp.wal().log(new DataRecord(new DataEntry(
cacheCtx.cacheId(),
txEntry.key(),
val,
op,
nearXidVersion(),
writeVersion(),
0,
txEntry.key().partition(),
txEntry.updateCounter(),
DataEntry.flags(CU.txOnPrimary(this)))));
}
ExpiryPolicy expiry = cacheCtx.expiryForTxEntry(txEntry);
if (expiry != null) {
Duration duration = expiry.getExpiryForAccess();
if (duration != null)
cached.updateTtl(null, CU.toTtl(duration));
}
if (log.isDebugEnabled())
log.debug("Ignoring READ entry when committing: " + txEntry);
}
else {
assert ownsLock(txEntry.cached()) :
"Transaction does not own lock for group lock entry during commit [tx=" +
this + ", txEntry=" + txEntry + ']';
if (conflictCtx == null || !conflictCtx.isUseOld()) {
if (txEntry.ttl() != CU.TTL_NOT_CHANGED)
cached.updateTtl(null, txEntry.ttl());
}
if (log.isDebugEnabled())
log.debug("Ignoring NOOP entry when committing: " + txEntry);
}
}
// Check commit locks after set, to make sure that
// we are not changing obsolete entries.
// (innerSet and innerRemove will throw an exception
// if an entry is obsolete).
if (txEntry.op() != READ)
checkCommitLocks(cached);
// Break out of while loop.
break;
}
// If entry cached within transaction got removed.
catch (GridCacheEntryRemovedException ignored) {
if (log.isDebugEnabled())
log.debug("Got removed entry during transaction commit (will retry): " + txEntry);
txEntry.cached(entryEx(cacheCtx, txEntry.txKey(), topologyVersion()));
}
}
}
if (!txState.mvccEnabled() && txCounters != null) {
cctx.tm().txHandler().applyPartitionsUpdatesCounters(txCounters.updateCounters());
for (IgniteTxEntry entry : commitEntries) {
if (entry.cqNotifyClosure() != null)
entry.cqNotifyClosure().applyx();
}
}
// Apply cache sizes only for primary nodes. Update counters were applied on prepare state.
applyTxSizes();
cctx.mvccCaching().onTxFinished(this, true);
if (ptr != null)
cctx.wal(true).flush(ptr, false);
}
catch (Throwable ex) {
// We are about to initiate transaction rollback when tx has started to committing.
// Need to remove version from committed list.
cctx.tm().removeCommittedTx(this);
if (X.hasCause(ex, NodeStoppingException.class)) {
U.warn(log, "Failed to commit transaction, node is stopping [tx=" + CU.txString(this) +
", err=" + ex + ']');
boolean persistenceEnabled = CU.isPersistenceEnabled(cctx.kernalContext().config());
if (persistenceEnabled) {
GridCacheDatabaseSharedManager dbManager = (GridCacheDatabaseSharedManager)cctx.database();
dbManager.getCheckpointer().skipCheckpointOnNodeStop(true);
}
throw ex;
}
err = heuristicException(ex);
COMMIT_ERR_UPD.compareAndSet(this, null, err);
state(UNKNOWN);
try {
uncommit();
}
catch (Throwable e) {
err.addSuppressed(e);
}
throw err;
}
finally {
cctx.database().checkpointReadUnlock();
cctx.tm().resetContext();
}
}
// Do not unlock transaction entries if one-phase commit.
if (!onePhaseCommit()) {
if (DONE_FLAG_UPD.compareAndSet(this, 0, 1)) {
// Unlock all locks.
cctx.tm().commitTx(this);
boolean needsCompletedVersions = needsCompletedVersions();
assert !needsCompletedVersions || completedBase != null;
assert !needsCompletedVersions || committedVers != null;
assert !needsCompletedVersions || rolledbackVers != null;
}
}
}
/**
* Safely performs {@code updateClojure} operation on near cache entry with given {@code entryKey}.
* In case of {@link GridCacheEntryRemovedException} operation will be retried.
*
* @param cacheCtx Cache context.
* @param entryKey Entry key.
* @param updateClojure Near entry update clojure.
* @throws IgniteCheckedException If update is failed.
*/
private void updateNearEntrySafely(
GridCacheContext cacheCtx,
KeyCacheObject entryKey,
NearEntryUpdateClojure<GridCacheEntryEx> updateClojure
) throws IgniteCheckedException {
while (true) {
GridCacheEntryEx nearCached = cacheCtx.dht().near().peekEx(entryKey);
if (nearCached == null)
break;
try {
updateClojure.apply(nearCached);
break;
}
catch (GridCacheEntryRemovedException ignored) {
if (log.isDebugEnabled())
log.debug("Got removed entry during transaction commit (will retry): " + nearCached);
cacheCtx.dht().near().removeEntry(nearCached);
}
}
}
/**
* Commits transaction to transaction manager. Used for one-phase commit transactions only.
*
* @param commit If {@code true} commits transaction, otherwise rollbacks.
* @param clearThreadMap If {@code true} removes {@link GridNearTxLocal} from thread map.
* @param nodeStop If {@code true} tx is cancelled on node stop.
* @throws IgniteCheckedException If failed.
*/
public void tmFinish(boolean commit, boolean nodeStop, boolean clearThreadMap) throws IgniteCheckedException {
assert onePhaseCommit();
if (DONE_FLAG_UPD.compareAndSet(this, 0, 1)) {
if (!nodeStop) {
// Unlock all locks.
if (commit)
cctx.tm().commitTx(this);
else
cctx.tm().rollbackTx(this, clearThreadMap, false);
}
state(commit ? COMMITTED : ROLLED_BACK);
if (commit) {
boolean needsCompletedVersions = needsCompletedVersions();
assert !needsCompletedVersions || completedBase != null : "Missing completed base for transaction: " + this;
assert !needsCompletedVersions || committedVers != null : "Missing committed versions for transaction: " + this;
assert !needsCompletedVersions || rolledbackVers != null : "Missing rolledback versions for transaction: " + this;
}
cctx.mvccCaching().onTxFinished(this, commit);
}
}
/** {@inheritDoc} */
@Override public void completedVersions(
GridCacheVersion completedBase,
Collection<GridCacheVersion> committedVers,
Collection<GridCacheVersion> rolledbackVers) {
this.completedBase = completedBase;
this.committedVers = committedVers;
this.rolledbackVers = rolledbackVers;
}
/**
* @return Completed base for ordering.
*/
public GridCacheVersion completedBase() {
return completedBase;
}
/**
* @return Committed versions.
*/
public Collection<GridCacheVersion> committedVersions() {
return committedVers;
}
/**
* @return Rolledback versions.
*/
public Collection<GridCacheVersion> rolledbackVersions() {
return rolledbackVers;
}
/** {@inheritDoc} */
@Override public void userRollback(boolean clearThreadMap) throws IgniteCheckedException {
TransactionState state = state();
if (state != ROLLING_BACK && state != ROLLED_BACK) {
setRollbackOnly();
throw new IgniteCheckedException("Invalid transaction state for rollback [state=" + state +
", tx=" + this + ']');
}
if (near()) {
// Must evict near entries before rolling back from
// transaction manager, so they will be removed from cache.
for (IgniteTxEntry e : allEntries())
evictNearEntry(e, false);
}
if (DONE_FLAG_UPD.compareAndSet(this, 0, 1)) {
cctx.tm().rollbackTx(this, clearThreadMap, skipCompletedVersions());
cctx.mvccCaching().onTxFinished(this, false);
if (!internal()) {
Collection<CacheStoreManager> stores = txState.stores(cctx);
if (stores != null && !stores.isEmpty()) {
assert isWriteToStoreFromDhtValid(stores) :
"isWriteToStoreFromDht can't be different within one transaction";
boolean isWriteToStoreFromDht = F.first(stores).isWriteToStoreFromDht();
if (!stores.isEmpty() && (near() || isWriteToStoreFromDht))
sessionEnd(stores, false);
}
}
}
}
/**
* @param ctx Cache context.
* @param key Key.
* @param expiryPlc Expiry policy.
* @return Expiry policy wrapper for entries accessed locally in optimistic transaction.
*/
protected IgniteCacheExpiryPolicy accessPolicy(
GridCacheContext ctx,
IgniteTxKey key,
@Nullable ExpiryPolicy expiryPlc
) {
return null;
}
/**
* @param cacheCtx Cache context.
* @param keys Keys.
* @return Expiry policy.
*/
protected IgniteCacheExpiryPolicy accessPolicy(GridCacheContext cacheCtx, Collection<KeyCacheObject> keys) {
return null;
}
/**
* Post lock processing for put or remove.
*
* @param cacheCtx Context.
* @param keys Keys.
* @param ret Return value.
* @param rmv {@code True} if remove.
* @param retval Flag to return value or not.
* @param read {@code True} if read.
* @param accessTtl TTL for read operation.
* @param filter Filter to check entries.
* @param computeInvoke If {@code true} computes return value for invoke operation.
* @throws IgniteCheckedException If error.
*/
protected final void postLockWrite(
GridCacheContext cacheCtx,
Iterable<KeyCacheObject> keys,
GridCacheReturn ret,
boolean rmv,
boolean retval,
boolean read,
long accessTtl,
CacheEntryPredicate[] filter,
boolean computeInvoke
) throws IgniteCheckedException {
for (KeyCacheObject k : keys) {
IgniteTxEntry txEntry = entry(cacheCtx.txKey(k));
if (txEntry == null)
throw new IgniteCheckedException("Transaction entry is null (most likely collection of keys passed into cache " +
"operation was changed before operation completed) [missingKey=" + k + ", tx=" + this + ']');
while (true) {
GridCacheEntryEx cached = txEntry.cached();
try {
assert cached.detached() || cached.lockedLocally(xidVersion()) || isRollbackOnly() :
"Transaction lock is not acquired [entry=" + cached + ", tx=" + this +
", nodeId=" + cctx.localNodeId() + ", threadId=" + threadId + ']';
if (log.isDebugEnabled())
log.debug("Post lock write entry: " + cached);
CacheObject v = txEntry.previousValue();
boolean hasPrevVal = txEntry.hasPreviousValue();
if (onePhaseCommit())
filter = txEntry.filters();
// If we have user-passed filter, we must read value into entry for peek().
if (!F.isEmptyOrNulls(filter) && !F.isAlwaysTrue(filter))
retval = true;
boolean invoke = txEntry.op() == TRANSFORM;
if (retval || invoke) {
if (!cacheCtx.isNear()) {
if (!hasPrevVal) {
// For non-local cache should read from store after lock on primary.
boolean readThrough = cacheCtx.isLocal() &&
(invoke || cacheCtx.loadPreviousValue()) &&
!txEntry.skipStore();
v = cached.innerGet(
null,
this,
readThrough,
/*metrics*/!invoke,
/*event*/!invoke && !dht(),
null,
resolveTaskName(),
null,
txEntry.keepBinary());
}
}
else {
if (!hasPrevVal)
v = cached.rawGet();
}
if (txEntry.op() == TRANSFORM) {
if (computeInvoke) {
txEntry.readValue(v);
GridCacheVersion ver;
try {
ver = cached.version();
}
catch (GridCacheEntryRemovedException e) {
assert optimistic() : txEntry;
if (log.isDebugEnabled())
log.debug("Failed to get entry version: [msg=" + e.getMessage() + ']');
ver = null;
}
addInvokeResult(txEntry, v, ret, ver);
}
}
else {
ret.value(
cacheCtx,
v,
txEntry.keepBinary(),
U.deploymentClassLoader(cctx.kernalContext(), deploymentLdrId)
);
}
}
boolean pass = F.isEmpty(filter) || cacheCtx.isAll(cached, filter);
// For remove operation we return true only if we are removing s/t,
// i.e. cached value is not null.
ret.success(pass && (!retval ? !rmv || cached.hasValue() || v != null : !rmv || v != null));
if (onePhaseCommit())
txEntry.filtersPassed(pass);
boolean updateTtl = read;
if (pass) {
txEntry.markValid();
if (log.isDebugEnabled())
log.debug("Filter passed in post lock for key: " + k);
}
else {
// Revert operation to previous. (if no - NOOP, so entry will be unlocked).
txEntry.setAndMarkValid(txEntry.previousOperation(), cacheCtx.toCacheObject(ret.value()));
txEntry.filters(CU.empty0());
txEntry.filtersSet(false);
updateTtl = !cacheCtx.putIfAbsentFilter(filter);
}
if (updateTtl) {
if (!read) {
ExpiryPolicy expiryPlc = cacheCtx.expiryForTxEntry(txEntry);
if (expiryPlc != null)
txEntry.ttl(CU.toTtl(expiryPlc.getExpiryForAccess()));
}
else
txEntry.ttl(accessTtl);
}
break; // While.
}
// If entry cached within transaction got removed before lock.
catch (GridCacheEntryRemovedException ignore) {
if (log.isDebugEnabled())
log.debug("Got removed entry in putAllAsync method (will retry): " + cached);
txEntry.cached(entryEx(cached.context(), txEntry.txKey(), topologyVersion()));
}
}
}
}
/**
* @param txEntry Entry.
* @param cacheVal Value.
* @param ret Return value to update.
* @param ver Entry version.
*/
protected final void addInvokeResult(IgniteTxEntry txEntry,
CacheObject cacheVal,
GridCacheReturn ret,
GridCacheVersion ver
) {
GridCacheContext ctx = txEntry.context();
Object key0 = null;
Object val0 = null;
IgniteThread.onEntryProcessorEntered(true);
if (cctx.kernalContext().deploy().enabled() && deploymentLdrId != null)
U.restoreDeploymentContext(cctx.kernalContext(), deploymentLdrId);
try {
Object res = null;
for (T2<EntryProcessor<Object, Object, Object>, Object[]> t : txEntry.entryProcessors()) {
CacheInvokeEntry<Object, Object> invokeEntry = new CacheInvokeEntry<>(txEntry.key(), key0, cacheVal,
val0, ver, txEntry.keepBinary(), txEntry.cached());
EntryProcessor<Object, Object, ?> entryProcessor = t.get1();
res = entryProcessor.process(invokeEntry, t.get2());
val0 = invokeEntry.getValue(txEntry.keepBinary());
key0 = invokeEntry.key();
}
if (val0 != null) // no validation for remove case
ctx.validateKeyAndValue(txEntry.key(), ctx.toCacheObject(val0));
if (res != null)
ret.addEntryProcessResult(ctx, txEntry.key(), key0, res, null, txEntry.keepBinary());
}
catch (Exception e) {
ret.addEntryProcessResult(ctx, txEntry.key(), key0, null, e, txEntry.keepBinary());
}
finally {
IgniteThread.onEntryProcessorLeft();
}
}
/**
* Initializes read map.
*
* @return {@code True} if transaction was successfully started.
*/
public boolean init() {
return !txState.init(txSize) || cctx.tm().onStarted(this);
}
/** {@inheritDoc} */
@Override public final void addActiveCache(GridCacheContext cacheCtx, boolean recovery) throws IgniteCheckedException {
txState.addActiveCache(cacheCtx, recovery, this);
}
/**
* Checks transaction expiration.
*
* @throws IgniteCheckedException If transaction check failed.
*/
protected void checkValid() throws IgniteCheckedException {
checkValid(true);
}
/**
* Checks transaction expiration.
*
* @param checkTimeout Whether timeout should be checked.
* @throws IgniteCheckedException If transaction check failed.
*/
protected void checkValid(boolean checkTimeout) throws IgniteCheckedException {
if (local() && !dht() && remainingTime() == -1 && checkTimeout)
state(MARKED_ROLLBACK, true);
if (isRollbackOnly()) {
if (remainingTime() == -1)
throw new IgniteTxTimeoutCheckedException("Cache transaction timed out: " + CU.txString(this));
TransactionState state = state();
if (state == ROLLING_BACK || state == ROLLED_BACK)
throw new IgniteTxRollbackCheckedException("Cache transaction is marked as rollback-only " +
"(will be rolled back automatically): " + this);
if (state == UNKNOWN)
throw new IgniteTxHeuristicCheckedException("Cache transaction is in unknown state " +
"(remote transactions will be invalidated): " + this);
throw rollbackException();
}
}
/** {@inheritDoc} */
@Override public Collection<GridCacheVersion> alternateVersions() {
return Collections.emptyList();
}
/**
* @param op Cache operation.
* @param val Value.
* @param expiryPlc Explicitly specified expiry policy.
* @param invokeArgs Optional arguments for EntryProcessor.
* @param entryProcessor Entry processor.
* @param entry Cache entry.
* @param filter Filter.
* @param filtersSet {@code True} if filter should be marked as set.
* @param drTtl DR TTL (if any).
* @param drExpireTime DR expire time (if any).
* @param drVer DR version.
* @param skipStore Skip store flag.
* @return Transaction entry.
*/
public final IgniteTxEntry addEntry(GridCacheOperation op,
@Nullable CacheObject val,
@Nullable EntryProcessor entryProcessor,
Object[] invokeArgs,
GridCacheEntryEx entry,
@Nullable ExpiryPolicy expiryPlc,
CacheEntryPredicate[] filter,
boolean filtersSet,
long drTtl,
long drExpireTime,
@Nullable GridCacheVersion drVer,
boolean skipStore,
boolean keepBinary,
boolean addReader
) {
assert invokeArgs == null || op == TRANSFORM;
IgniteTxKey key = entry.txKey();
checkInternal(key);
IgniteTxEntry old = entry(key);
// Keep old filter if already have one (empty filter is always overridden).
if (!filtersSet || !F.isEmptyOrNulls(filter)) {
// Replace filter if previous filter failed.
if (old != null && old.filtersSet())
filter = old.filters();
}
IgniteTxEntry txEntry;
if (old != null) {
if (entryProcessor != null) {
assert val == null;
assert op == TRANSFORM;
// Will change the op.
old.addEntryProcessor(entryProcessor, invokeArgs);
}
else {
assert old.op() != TRANSFORM;
old.op(op);
old.value(val, op == CREATE || op == UPDATE || op == DELETE, op == READ);
}
// Keep old ttl value.
old.cached(entry);
old.filters(filter);
// Keep old skipStore and keepBinary flags.
old.skipStore(skipStore);
old.keepBinary(keepBinary);
// Update ttl if specified.
if (drTtl >= 0L) {
assert drExpireTime >= 0L;
entryTtlDr(key, drTtl, drExpireTime);
}
else
entryExpiry(key, expiryPlc);
txEntry = old;
if (log.isDebugEnabled())
log.debug("Updated transaction entry: " + txEntry);
}
else {
boolean hasDrTtl = drTtl >= 0;
txEntry = new IgniteTxEntry(entry.context(),
this,
op,
val,
EntryProcessorResourceInjectorProxy.wrap(cctx.kernalContext(), entryProcessor),
invokeArgs,
hasDrTtl ? drTtl : -1L,
entry,
filter,
drVer,
skipStore,
keepBinary,
addReader);
txEntry.conflictExpireTime(drExpireTime);
if (!hasDrTtl)
txEntry.expiry(expiryPlc);
txState.addEntry(txEntry);
if (log.isDebugEnabled())
log.debug("Created transaction entry: " + txEntry);
}
txEntry.filtersSet(filtersSet);
while (true) {
try {
updateExplicitVersion(txEntry, entry);
return txEntry;
}
catch (GridCacheEntryRemovedException ignore) {
if (log.isDebugEnabled())
log.debug("Got removed entry in transaction newEntry method (will retry): " + entry);
entry = entryEx(entry.context(), txEntry.txKey(), topologyVersion());
txEntry.cached(entry);
}
}
}
/**
* Updates explicit version for tx entry based on current entry lock owner.
*
* @param txEntry Tx entry to update.
* @param entry Entry.
* @throws GridCacheEntryRemovedException If entry was concurrently removed.
*/
protected void updateExplicitVersion(IgniteTxEntry txEntry, GridCacheEntryEx entry)
throws GridCacheEntryRemovedException {
if (!entry.context().isDht()) {
// All put operations must wait for async locks to complete,
// so it is safe to get acquired locks.
GridCacheMvccCandidate explicitCand = entry.localOwner();
if (explicitCand == null)
explicitCand = cctx.mvcc().explicitLock(threadId(), entry.txKey());
if (explicitCand != null) {
GridCacheVersion explicitVer = explicitCand.version();
boolean locCand = false;
if (explicitCand.nearLocal() || explicitCand.local())
locCand = cctx.localNodeId().equals(explicitCand.nodeId());
else if (explicitCand.dhtLocal())
locCand = cctx.localNodeId().equals(explicitCand.otherNodeId());
if (!explicitVer.equals(xidVer) && explicitCand.isHeldByThread(threadId) && !explicitCand.tx() && locCand) {
txEntry.explicitVersion(explicitVer);
if (explicitVer.isLess(minVer))
minVer = explicitVer;
}
}
}
}
/**
* @return Map of affected partitions: cacheId -> partId.
*/
public Map<Integer, Set<Integer>> partsMap() {
return null;
}
/** {@inheritDoc} */
@Override public void touchPartition(int cacheId, int partId) {
txState.touchPartition(cacheId, partId);
}
/** {@inheritDoc} */
@Override public String toString() {
return GridToStringBuilder.toString(IgniteTxLocalAdapter.class, this, "super", super.toString(),
"size", allEntries().size());
}
/**
* @param key Key.
* @param expiryPlc Expiry policy.
*/
void entryExpiry(IgniteTxKey key, @Nullable ExpiryPolicy expiryPlc) {
assert key != null;
IgniteTxEntry e = entry(key);
if (e != null) {
e.expiry(expiryPlc);
e.conflictExpireTime(CU.EXPIRE_TIME_CALCULATE);
}
}
/**
* @param key Key.
* @param ttl TTL.
* @param expireTime Expire time.
* @return {@code true} if tx entry exists for this key, {@code false} otherwise.
*/
boolean entryTtlDr(IgniteTxKey key, long ttl, long expireTime) {
assert key != null;
assert ttl >= 0;
IgniteTxEntry e = entry(key);
if (e != null) {
e.ttl(ttl);
e.conflictExpireTime(expireTime);
e.expiry(null);
}
return e != null;
}
/**
* @param key Key.
* @return Tx entry time to live.
*/
public long entryTtl(IgniteTxKey key) {
assert key != null;
IgniteTxEntry e = entry(key);
return e != null ? e.ttl() : 0;
}
/**
* @param key Key.
* @return Tx entry expire time.
*/
public long entryExpireTime(IgniteTxKey key) {
assert key != null;
IgniteTxEntry e = entry(key);
if (e != null) {
long ttl = e.ttl();
assert ttl != -1;
if (ttl > 0) {
long expireTime = U.currentTimeMillis() + ttl;
if (expireTime > 0)
return expireTime;
}
}
return 0;
}
/**
* @return {@code True} if there are entries, enlisted by query.
*/
public boolean queryEnlisted() {
return qryEnlisted;
}
/**
* Marks that there are entries, enlisted by query.
*/
public void markQueryEnlisted() {
assert mvccSnapshot != null && txState.mvccEnabled();
if (!qryEnlisted) {
qryEnlisted = true;
if (!cctx.localNode().isClient())
cctx.coordinators().registerLocalTransaction(mvccSnapshot.coordinatorVersion(), mvccSnapshot.counter());
}
}
/**
* Post-lock closure alias.
*
* @param <T> Return type.
*/
protected abstract class PLC1<T> extends PostLockClosure1<T> {
/** */
private static final long serialVersionUID = 0L;
/**
* @param arg Argument.
*/
protected PLC1(T arg) {
super(arg);
}
/**
* @param arg Argument.
* @param commit Commit flag.
*/
protected PLC1(T arg, boolean commit) {
super(arg, commit);
}
}
/**
* Post-lock closure alias.
*
* @param <T> Return type.
*/
protected abstract class PLC2<T> extends PostLockClosure2<T> {
/** */
private static final long serialVersionUID = 0L;
// No-op.
}
/**
* Post-lock closure alias.
*
* @param <T> Return type.
*/
protected abstract class PMC<T> extends PostMissClosure<T> {
/** */
private static final long serialVersionUID = 0L;
// No-op.
}
/**
* Post-lock closure.
*
* @param <T> Return type.
*/
protected abstract class PostLockClosure1<T> implements IgniteBiClosure<Boolean, Exception, IgniteInternalFuture<T>> {
/** */
private static final long serialVersionUID = 0L;
/** Closure argument. */
private T arg;
/** Commit flag. */
private boolean commit;
/**
* Creates a Post-Lock closure that will pass the argument given to the {@code postLock} method.
*
* @param arg Argument for {@code postLock}.
*/
protected PostLockClosure1(T arg) {
this(arg, true);
}
/**
* Creates a Post-Lock closure that will pass the argument given to the {@code postLock} method.
*
* @param arg Argument for {@code postLock}.
* @param commit Flag indicating whether commit should be done after postLock.
*/
protected PostLockClosure1(T arg, boolean commit) {
this.arg = arg;
this.commit = commit;
}
/** {@inheritDoc} */
@Override public final IgniteInternalFuture<T> apply(Boolean locked, @Nullable final Exception e) {
TransactionDeadlockException deadlockErr = X.cause(e, TransactionDeadlockException.class);
if (e != null && deadlockErr == null) {
setRollbackOnly();
if (commit && commitAfterLock())
return rollbackAsync().chain(new C1<IgniteInternalFuture<IgniteInternalTx>, T>() {
@Override public T apply(IgniteInternalFuture<IgniteInternalTx> f) {
throw new GridClosureException(e);
}
});
throw new GridClosureException(e);
}
if (deadlockErr != null || !locked) {
setRollbackOnly();
final GridClosureException ex = new GridClosureException(
new IgniteTxTimeoutCheckedException("Failed to acquire lock within provided timeout " +
"for transaction [timeout=" + timeout() + ", tx=" + CU.txString(IgniteTxLocalAdapter.this) +
']', deadlockErr)
);
if (commit && commitAfterLock())
return rollbackAsync().chain(new C1<IgniteInternalFuture<IgniteInternalTx>, T>() {
@Override public T apply(IgniteInternalFuture<IgniteInternalTx> f) {
throw ex;
}
});
throw ex;
}
boolean rollback = true;
try {
final T r = postLock(arg);
// Commit implicit transactions.
if (commit && commitAfterLock()) {
rollback = false;
return commitAsync().chain(new CX1<IgniteInternalFuture<IgniteInternalTx>, T>() {
@Override public T applyx(IgniteInternalFuture<IgniteInternalTx> f) throws IgniteCheckedException {
f.get();
return r;
}
});
}
rollback = false;
return new GridFinishedFuture<>(r);
}
catch (final IgniteCheckedException ex) {
if (commit && commitAfterLock())
return rollbackAsync().chain(new C1<IgniteInternalFuture<IgniteInternalTx>, T>() {
@Override public T apply(IgniteInternalFuture<IgniteInternalTx> f) {
throw new GridClosureException(ex);
}
});
throw new GridClosureException(ex);
}
finally {
if (rollback)
setRollbackOnly();
}
}
/**
* Post lock callback.
*
* @param val Argument.
* @return Future return value.
* @throws IgniteCheckedException If operation failed.
*/
protected abstract T postLock(T val) throws IgniteCheckedException;
}
/**
* Post-lock closure.
*
* @param <T> Return type.
*/
protected abstract class PostLockClosure2<T> implements IgniteBiClosure<Boolean, Exception, IgniteInternalFuture<T>> {
/** */
private static final long serialVersionUID = 0L;
/** {@inheritDoc} */
@Override public final IgniteInternalFuture<T> apply(Boolean locked, @Nullable Exception e) {
boolean rollback = true;
try {
if (e != null)
throw new GridClosureException(e);
if (!locked)
throw new GridClosureException(new IgniteTxTimeoutCheckedException("Failed to acquire lock " +
"within provided timeout for transaction [timeout=" + timeout() +
", tx=" + CU.txString(IgniteTxLocalAdapter.this) + ']'));
IgniteInternalFuture<T> fut = postLock();
rollback = false;
return fut;
}
catch (IgniteCheckedException ex) {
throw new GridClosureException(ex);
}
finally {
if (rollback)
setRollbackOnly();
}
}
/**
* Post lock callback.
*
* @return Future return value.
* @throws IgniteCheckedException If operation failed.
*/
protected abstract IgniteInternalFuture<T> postLock() throws IgniteCheckedException;
}
/**
* Post-lock closure.
*
* @param <T> Return type.
*/
protected abstract class PostMissClosure<T> implements IgniteBiClosure<T, Exception, IgniteInternalFuture<T>> {
/** */
private static final long serialVersionUID = 0L;
/** {@inheritDoc} */
@Override public final IgniteInternalFuture<T> apply(T t, Exception e) {
boolean rollback = true;
try {
if (e != null)
throw new GridClosureException(e);
IgniteInternalFuture<T> fut = postMiss(t);
rollback = false;
return fut;
}
catch (IgniteCheckedException ex) {
throw new GridClosureException(ex);
}
finally {
if (rollback)
setRollbackOnly();
}
}
/**
* Post lock callback.
*
* @param t Post-miss parameter.
* @return Future return value.
* @throws IgniteCheckedException If operation failed.
*/
protected abstract IgniteInternalFuture<T> postMiss(T t) throws IgniteCheckedException;
}
/**
* Clojure to perform operations with near cache entries.
*/
@FunctionalInterface
private interface NearEntryUpdateClojure<E extends GridCacheEntryEx> {
/**
* Apply clojure to given {@code entry}.
*
* @param entry Entry.
* @throws IgniteCheckedException If operation is failed.
* @throws GridCacheEntryRemovedException If entry is removed.
*/
public void apply(E entry) throws IgniteCheckedException, GridCacheEntryRemovedException;
}
}