blob: 11b2c040a622a7849d6b8b486580f2afd546d199 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.transactions;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import javax.cache.expiry.ExpiryPolicy;
import javax.cache.processor.EntryProcessor;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cache.store.CacheStore;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.events.TransactionStateChangedEvent;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.managers.discovery.ConsistentIdMapper;
import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheInvokeEntry;
import org.apache.ignite.internal.processors.cache.CacheLazyEntry;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.CacheOperationContext;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
import org.apache.ignite.internal.processors.cache.GridCacheOperation;
import org.apache.ignite.internal.processors.cache.GridCacheReturn;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtInvalidPartitionException;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheEntry;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
import org.apache.ignite.internal.processors.cache.store.CacheStoreManager;
import org.apache.ignite.internal.processors.cache.version.GridCacheLazyPlainVersionedEntry;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersionConflictContext;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersionedEntryEx;
import org.apache.ignite.internal.transactions.IgniteTxHeuristicCheckedException;
import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException;
import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException;
import org.apache.ignite.internal.util.GridSetWrapper;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.lang.GridMetadataAwareAdapter;
import org.apache.ignite.internal.util.lang.GridTuple;
import org.apache.ignite.internal.util.tostring.GridToStringBuilder;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.thread.IgniteThread;
import org.apache.ignite.transactions.TransactionConcurrency;
import org.apache.ignite.transactions.TransactionIsolation;
import org.apache.ignite.transactions.TransactionState;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT;
import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_READ;
import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_REMOVED;
import static org.apache.ignite.events.EventType.EVT_TX_COMMITTED;
import static org.apache.ignite.events.EventType.EVT_TX_RESUMED;
import static org.apache.ignite.events.EventType.EVT_TX_ROLLED_BACK;
import static org.apache.ignite.events.EventType.EVT_TX_SUSPENDED;
import static org.apache.ignite.internal.processors.cache.GridCacheOperation.CREATE;
import static org.apache.ignite.internal.processors.cache.GridCacheOperation.DELETE;
import static org.apache.ignite.internal.processors.cache.GridCacheOperation.NOOP;
import static org.apache.ignite.internal.processors.cache.GridCacheOperation.RELOAD;
import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM;
import static org.apache.ignite.internal.processors.cache.GridCacheOperation.UPDATE;
import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;
import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED;
import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE;
import static org.apache.ignite.transactions.TransactionState.ACTIVE;
import static org.apache.ignite.transactions.TransactionState.COMMITTED;
import static org.apache.ignite.transactions.TransactionState.COMMITTING;
import static org.apache.ignite.transactions.TransactionState.MARKED_ROLLBACK;
import static org.apache.ignite.transactions.TransactionState.PREPARED;
import static org.apache.ignite.transactions.TransactionState.PREPARING;
import static org.apache.ignite.transactions.TransactionState.ROLLED_BACK;
import static org.apache.ignite.transactions.TransactionState.ROLLING_BACK;
import static org.apache.ignite.transactions.TransactionState.SUSPENDED;
/**
* Managed transaction adapter.
*/
public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implements IgniteInternalTx {
/** Static logger to avoid re-creation. */
private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>();
/** Finalizing status updater. */
private static final AtomicReferenceFieldUpdater<IgniteTxAdapter, FinalizationStatus> FINALIZING_UPD =
AtomicReferenceFieldUpdater.newUpdater(IgniteTxAdapter.class, FinalizationStatus.class, "finalizing");
/** */
private static final AtomicReferenceFieldUpdater<IgniteTxAdapter, TxCounters> TX_COUNTERS_UPD =
AtomicReferenceFieldUpdater.newUpdater(IgniteTxAdapter.class, TxCounters.class, "txCounters");
/** Logger. */
protected static IgniteLogger log;
/** Transaction ID. */
@GridToStringInclude
protected GridCacheVersion xidVer;
/** Entries write version. */
@GridToStringInclude
protected GridCacheVersion writeVer;
/** Implicit flag. */
@GridToStringInclude
protected boolean implicit;
/** Local flag. */
@GridToStringInclude
protected boolean loc;
/** Thread ID. */
@GridToStringInclude
protected long threadId;
/** Transaction start time. */
@GridToStringInclude
protected long startTime = U.currentTimeMillis();
/** Transaction start time in nanoseconds to measure duration. */
protected long startTimeNanos;
/** Node ID. */
@GridToStringInclude
protected UUID nodeId;
/** Cache registry. */
@GridToStringExclude
protected GridCacheSharedContext<?, ?> cctx;
/** Need return value. */
protected boolean needRetVal;
/** Isolation. */
@GridToStringInclude
protected TransactionIsolation isolation = READ_COMMITTED;
/** Concurrency. */
@GridToStringInclude
protected TransactionConcurrency concurrency = PESSIMISTIC;
/** Transaction timeout. */
@GridToStringInclude
protected long timeout;
/** Deployment class loader id which will be used for deserialization of entries on a distributed task. */
@GridToStringExclude
protected IgniteUuid deploymentLdrId;
/** Invalidate flag. */
protected volatile boolean invalidate;
/** Invalidation flag for system invalidations (not user-based ones). */
private boolean sysInvalidate;
/** Internal flag. */
protected boolean internal;
/** System transaction flag. */
private boolean sys;
/** IO policy. */
private byte plc;
/** One phase commit flag. */
protected boolean onePhaseCommit;
/** Commit version. */
private volatile GridCacheVersion commitVer;
/** Finalizing status. */
private volatile FinalizationStatus finalizing = FinalizationStatus.NONE;
/** Done marker. */
protected volatile boolean isDone;
/** */
@GridToStringInclude
private Map<Integer, Set<Integer>> invalidParts;
/**
* Transaction state. Note that state is not protected, as we want to
* always use {@link #state()} and {@link #state(TransactionState)}
* methods.
*/
@GridToStringInclude
private volatile TransactionState state = ACTIVE;
/** Timed out flag. */
private volatile boolean timedOut;
/** */
protected int txSize;
/** */
@GridToStringExclude
private volatile GridFutureAdapter<IgniteInternalTx> finFut;
/** Topology version. */
@GridToStringInclude
protected volatile AffinityTopologyVersion topVer = AffinityTopologyVersion.NONE;
/** */
protected Map<UUID, Collection<UUID>> txNodes;
/** Subject ID initiated this transaction. */
protected UUID subjId;
/** Task name hash code. */
protected int taskNameHash;
/** Task name. */
protected final String taskName;
/** Store used flag. */
protected boolean storeEnabled = true;
/** UUID to consistent id mapper. */
protected ConsistentIdMapper consistentIdMapper;
/** Mvcc tx update snapshot. */
@GridToStringInclude
protected volatile MvccSnapshot mvccSnapshot;
/** {@code True} if tx should skip adding itself to completed version map on finish. */
private boolean skipCompletedVers;
/** Rollback finish future. */
@GridToStringExclude
private volatile IgniteInternalFuture rollbackFut;
/** */
@SuppressWarnings("unused")
@GridToStringExclude
private volatile TxCounters txCounters;
/** Transaction from which this transaction was copied by(if it was). */
private GridNearTxLocal parentTx;
/**
* @param cctx Cache registry.
* @param xidVer Transaction ID.
* @param implicit Implicit flag.
* @param loc Local flag.
* @param sys System transaction flag.
* @param plc IO policy.
* @param concurrency Concurrency.
* @param isolation Isolation.
* @param timeout Timeout.
* @param txSize Transaction size.
*/
protected IgniteTxAdapter(
GridCacheSharedContext<?, ?> cctx,
GridCacheVersion xidVer,
boolean implicit,
boolean loc,
boolean sys,
byte plc,
TransactionConcurrency concurrency,
TransactionIsolation isolation,
long timeout,
boolean invalidate,
boolean storeEnabled,
boolean onePhaseCommit,
int txSize,
@Nullable UUID subjId,
int taskNameHash
) {
assert xidVer != null;
assert cctx != null;
this.cctx = cctx;
this.xidVer = xidVer;
this.implicit = implicit;
this.loc = loc;
this.sys = sys;
this.plc = plc;
this.concurrency = concurrency;
this.isolation = isolation;
this.timeout = timeout;
this.invalidate = invalidate;
this.storeEnabled = storeEnabled;
this.onePhaseCommit = onePhaseCommit;
this.txSize = txSize;
this.subjId = subjId;
this.taskNameHash = taskNameHash;
this.deploymentLdrId = U.contextDeploymentClassLoaderId(cctx.kernalContext());
nodeId = cctx.discovery().localNode().id();
threadId = Thread.currentThread().getId();
if (log == null)
log = U.logger(cctx.kernalContext(), logRef, this);
consistentIdMapper = new ConsistentIdMapper(cctx.discovery());
boolean needTaskName = cctx.gridEvents().isRecordable(EVT_CACHE_OBJECT_READ) ||
cctx.gridEvents().isRecordable(EVT_CACHE_OBJECT_PUT) ||
cctx.gridEvents().isRecordable(EVT_CACHE_OBJECT_REMOVED);
taskName = needTaskName ? cctx.kernalContext().task().resolveTaskName(taskNameHash) : null;
if (cctx.kernalContext().performanceStatistics().enabled())
startTimeNanos = System.nanoTime();
}
/**
* @param cctx Cache registry.
* @param nodeId Node ID.
* @param xidVer Transaction ID.
* @param startVer Start version mark.
* @param threadId Thread ID.
* @param sys System transaction flag.
* @param plc IO policy.
* @param concurrency Concurrency.
* @param isolation Isolation.
* @param timeout Timeout.
* @param txSize Transaction size.
*/
protected IgniteTxAdapter(
GridCacheSharedContext<?, ?> cctx,
UUID nodeId,
GridCacheVersion xidVer,
GridCacheVersion startVer,
long threadId,
boolean sys,
byte plc,
TransactionConcurrency concurrency,
TransactionIsolation isolation,
long timeout,
int txSize,
@Nullable UUID subjId,
int taskNameHash
) {
this.cctx = cctx;
this.nodeId = nodeId;
this.threadId = threadId;
this.xidVer = xidVer;
this.sys = sys;
this.plc = plc;
this.concurrency = concurrency;
this.isolation = isolation;
this.timeout = timeout;
this.txSize = txSize;
this.subjId = subjId;
this.taskNameHash = taskNameHash;
implicit = false;
loc = false;
if (log == null)
log = U.logger(cctx.kernalContext(), logRef, this);
consistentIdMapper = new ConsistentIdMapper(cctx.discovery());
boolean needTaskName = cctx.gridEvents().isRecordable(EVT_CACHE_OBJECT_READ) ||
cctx.gridEvents().isRecordable(EVT_CACHE_OBJECT_PUT) ||
cctx.gridEvents().isRecordable(EVT_CACHE_OBJECT_REMOVED);
taskName = needTaskName ? cctx.kernalContext().task().resolveTaskName(taskNameHash) : null;
if (cctx.kernalContext().performanceStatistics().enabled())
startTimeNanos = System.nanoTime();
}
/**
* @param parentTx Transaction from which this transaction was copied by.
*/
public void setParentTx(GridNearTxLocal parentTx) {
this.parentTx = parentTx;
}
/**
* @return Mvcc info.
*/
@Override @Nullable public MvccSnapshot mvccSnapshot() {
return mvccSnapshot;
}
/** {@inheritDoc} */
@Override public void mvccSnapshot(MvccSnapshot mvccSnapshot) {
this.mvccSnapshot = mvccSnapshot;
}
/**
* @return {@code True} if tx should skip adding itself to completed version map on finish.
*/
public boolean skipCompletedVersions() {
return skipCompletedVers;
}
/**
* @param skipCompletedVers {@code True} if tx should skip adding itself to completed version map on finish.
*/
public void skipCompletedVersions(boolean skipCompletedVers) {
this.skipCompletedVers = skipCompletedVers;
}
/**
* @return Shared cache context.
*/
public GridCacheSharedContext<?, ?> context() {
return cctx;
}
/** {@inheritDoc} */
@Override public boolean localResult() {
assert originatingNodeId() != null;
return cctx.localNodeId().equals(originatingNodeId());
}
/**
* Checks whether near cache should be updated.
*
* @return Flag indicating whether near cache should be updated.
*/
protected boolean updateNearCache(
GridCacheContext<?, ?> cacheCtx,
KeyCacheObject key,
AffinityTopologyVersion topVer
) {
return false;
}
/** {@inheritDoc} */
@Override public Collection<IgniteTxEntry> optimisticLockEntries() {
if (serializable() && optimistic())
return F.concat(false, writeEntries(), readEntries());
return writeEntries();
}
/** {@inheritDoc} */
@Override public boolean storeEnabled() {
return storeEnabled;
}
/**
* @param storeEnabled Store enabled flag.
*/
public void storeEnabled(boolean storeEnabled) {
this.storeEnabled = storeEnabled;
}
/** {@inheritDoc} */
@Override public boolean system() {
return sys;
}
/** {@inheritDoc} */
@Override public byte ioPolicy() {
return plc;
}
/** {@inheritDoc} */
@Override public boolean storeWriteThrough() {
return storeEnabled() && txState().storeWriteThrough(cctx);
}
/**
* Uncommits transaction by invalidating all of its entries. Courtesy to minimize inconsistency.
*/
protected void uncommit() {
for (IgniteTxEntry e : writeMap().values()) {
try {
GridCacheEntryEx entry = e.cached();
if (e.op() != NOOP)
entry.invalidate(xidVer);
}
catch (Throwable t) {
U.error(log, "Failed to invalidate transaction entries while reverting a commit.", t);
if (t instanceof Error)
throw (Error)t;
break;
}
}
cctx.tm().uncommitTx(this);
}
/** {@inheritDoc} */
@Override public UUID otherNodeId() {
return null;
}
/** {@inheritDoc} */
@Override public UUID subjectId() {
return subjId;
}
/** {@inheritDoc} */
@Override public int taskNameHash() {
return taskNameHash;
}
/** {@inheritDoc} */
@Override public AffinityTopologyVersion topologyVersion() {
AffinityTopologyVersion res = topVer;
if (res == null || res.equals(AffinityTopologyVersion.NONE)) {
if (system()) {
AffinityTopologyVersion topVer = cctx.tm().lockedTopologyVersion(Thread.currentThread().getId(), this);
if (topVer != null)
return topVer;
}
return cctx.exchange().readyAffinityVersion();
}
return res;
}
/** {@inheritDoc} */
@Override public final AffinityTopologyVersion topologyVersionSnapshot() {
AffinityTopologyVersion ret = topVer;
return AffinityTopologyVersion.NONE.equals(ret) ? null : ret;
}
/** {@inheritDoc} */
@Override public final AffinityTopologyVersion topologyVersion(AffinityTopologyVersion topVer) {
AffinityTopologyVersion topVer0 = this.topVer;
if (!AffinityTopologyVersion.NONE.equals(topVer0))
return topVer0;
synchronized (this) {
topVer0 = this.topVer;
if (AffinityTopologyVersion.NONE.equals(topVer0)) {
this.topVer = topVer;
return topVer;
}
return topVer0;
}
}
/**
* @return {@code True} if marked.
*/
@Override public final boolean markFinalizing(FinalizationStatus status) {
boolean res;
switch (status) {
case USER_FINISH:
res = FINALIZING_UPD.compareAndSet(this, FinalizationStatus.NONE, FinalizationStatus.USER_FINISH);
break;
case RECOVERY_FINISH:
FinalizationStatus old = finalizing;
res = old != FinalizationStatus.USER_FINISH && FINALIZING_UPD.compareAndSet(this, old, status);
break;
default:
throw new IllegalArgumentException("Cannot set finalization status: " + status);
}
if (res) {
if (log.isDebugEnabled())
log.debug("Marked transaction as finalized: " + this);
}
else {
if (log.isDebugEnabled())
log.debug("Transaction was not marked finalized: " + this);
}
return res;
}
/**
* @return Finalization status.
*/
@Override @Nullable public FinalizationStatus finalizationStatus() {
return finalizing;
}
/**
* @return {@code True} if transaction has at least one key enlisted.
*/
public abstract boolean isStarted();
/** {@inheritDoc} */
@Override public int size() {
return txSize;
}
/**
* @return Logger.
*/
protected IgniteLogger log() {
return log;
}
/**
* @return True if transaction reflects changes in primary -> backup direction.
*/
public boolean remote() {
return false;
}
/** {@inheritDoc} */
@Override public boolean near() {
return false;
}
/** {@inheritDoc} */
@Override public boolean implicit() {
return implicit;
}
/** {@inheritDoc} */
@Override public boolean implicitSingle() {
return txState().implicitSingle();
}
/** {@inheritDoc} */
@Override public boolean local() {
return loc;
}
/** {@inheritDoc} */
@Override public final boolean user() {
return !implicit() && local() && !dht() && !internal();
}
/** {@inheritDoc} */
@Override public boolean dht() {
return false;
}
/** {@inheritDoc} */
@Override public boolean colocated() {
return false;
}
/** {@inheritDoc} */
@Override public IgniteUuid xid() {
return xidVer.asIgniteUuid();
}
/** {@inheritDoc} */
@Override public Map<Integer, Set<Integer>> invalidPartitions() {
return invalidParts == null ? Collections.<Integer, Set<Integer>>emptyMap() : invalidParts;
}
/** {@inheritDoc} */
@Override public void addInvalidPartition(int cacheId, int part) {
if (invalidParts == null)
invalidParts = new HashMap<>();
Set<Integer> parts = invalidParts.get(cacheId);
if (parts == null) {
parts = new HashSet<>();
invalidParts.put(cacheId, parts);
}
parts.add(part);
if (log.isDebugEnabled())
log.debug("Added invalid partition for transaction [cacheId=" + cacheId + ", part=" + part +
", tx=" + this + ']');
}
/** {@inheritDoc} */
@Override public GridCacheVersion ownedVersion(IgniteTxKey key) {
return null;
}
/** {@inheritDoc} */
@Override public long startTime() {
return startTime;
}
/** {@inheritDoc} */
@Override public long startTimeNanos() {
return startTimeNanos;
}
/**
* @return Flag indicating whether transaction needs return value.
*/
public boolean needReturnValue() {
return needRetVal;
}
/**
* @param needRetVal Need return value flag.
*/
public void needReturnValue(boolean needRetVal) {
this.needRetVal = needRetVal;
}
/**
* @return Rollback future.
*/
public IgniteInternalFuture rollbackFuture() {
return rollbackFut;
}
/**
* @param fut Rollback future.
*/
public void rollbackFuture(IgniteInternalFuture fut) {
rollbackFut = fut;
}
/**
* Gets remaining allowed transaction time.
*
* @return Remaining transaction time. {@code 0} if timeout isn't specified. {@code -1} if time is out.
*/
@Override public long remainingTime() {
if (timeout() <= 0)
return 0;
long timeLeft = timeout() - (U.currentTimeMillis() - startTime());
return timeLeft <= 0 ? -1 : timeLeft;
}
/**
* @return Transaction timeout exception.
*/
public final IgniteCheckedException timeoutException() {
return new IgniteTxTimeoutCheckedException("Failed to acquire lock within provided timeout " +
"for transaction [timeout=" + timeout() + ", tx=" + CU.txString(this) + ']');
}
/**
* @return Rollback exception.
*/
public final IgniteCheckedException rollbackException() {
return new IgniteTxRollbackCheckedException("Failed to finish transaction because it has been rolled back " +
"[timeout=" + timeout() + ", tx=" + CU.txString(this) + ']');
}
/**
* @param ex Root cause.
*/
public final IgniteCheckedException heuristicException(Throwable ex) {
return new IgniteTxHeuristicCheckedException("Committing a transaction has produced runtime exception", ex);
}
/**
* @param log Log.
* @param commit Commit.
* @param e Exception.
*/
public void logTxFinishErrorSafe(@Nullable IgniteLogger log, boolean commit, Throwable e) {
assert e != null : "Exception is expected";
final String fmt = "Failed completing the transaction: [commit=%s, tx=%s]";
try {
// First try printing a full transaction. This is error prone.
U.error(log, String.format(fmt, commit, this), e);
}
catch (Throwable e0) {
e.addSuppressed(e0);
U.error(log, String.format(fmt, commit, CU.txString(this)), e);
}
}
/** {@inheritDoc} */
@Override public GridCacheVersion xidVersion() {
return xidVer;
}
/** {@inheritDoc} */
@Override public long threadId() {
return threadId;
}
/** {@inheritDoc} */
@Override public UUID nodeId() {
return nodeId;
}
/** {@inheritDoc} */
@Override public TransactionIsolation isolation() {
return isolation;
}
/** {@inheritDoc} */
@Override public TransactionConcurrency concurrency() {
return concurrency;
}
/** {@inheritDoc} */
@Override public long timeout() {
return timeout;
}
/** {@inheritDoc} */
@Override public long timeout(long timeout) {
if (isStarted())
throw new IllegalStateException("Cannot change timeout after transaction has started: " + this);
long old = this.timeout;
this.timeout = timeout;
return old;
}
/** {@inheritDoc} */
@Override public boolean ownsLock(GridCacheEntryEx entry) throws GridCacheEntryRemovedException {
GridCacheContext<?, ?> cacheCtx = entry.context();
IgniteTxEntry txEntry = entry(entry.txKey());
GridCacheVersion explicit = txEntry == null ? null : txEntry.explicitVersion();
return local() && !cacheCtx.isDht() ?
entry.lockedBy(xidVersion()) || (explicit != null && entry.lockedBy(explicit)) :
// If candidate is not there, then lock was explicit.
// Otherwise, check if entry is owned by version.
!entry.hasLockCandidate(xidVersion()) || entry.lockedBy(xidVersion());
}
/** {@inheritDoc} */
@Override public boolean ownsLockUnsafe(GridCacheEntryEx entry) {
GridCacheContext cacheCtx = entry.context();
IgniteTxEntry txEntry = entry(entry.txKey());
GridCacheVersion explicit = txEntry == null ? null : txEntry.explicitVersion();
return local() && !cacheCtx.isDht() ?
entry.lockedByUnsafe(xidVersion()) || (explicit != null && entry.lockedByUnsafe(explicit)) :
// If candidate is not there, then lock was explicit.
// Otherwise, check if entry is owned by version.
!entry.hasLockCandidateUnsafe(xidVersion()) || entry.lockedByUnsafe(xidVersion());
}
/** {@inheritDoc} */
@Override public TransactionState state() {
return state;
}
/** {@inheritDoc} */
@Override public final void errorWhenCommitting() {
synchronized (this) {
TransactionState prev = state;
assert prev == COMMITTING : prev;
state = MARKED_ROLLBACK;
if (log.isDebugEnabled())
log.debug("Changed transaction state [prev=" + prev + ", new=" + this.state + ", tx=" + this + ']');
notifyAll();
}
}
/** {@inheritDoc} */
@Override public boolean setRollbackOnly() {
return state(MARKED_ROLLBACK);
}
/**
* @return {@code True} if rollback only flag is set.
*/
@Override public boolean isRollbackOnly() {
return state == MARKED_ROLLBACK || state == ROLLING_BACK || state == ROLLED_BACK;
}
/** {@inheritDoc} */
@Override public boolean done() {
return isDone;
}
/**
* @return {@code True} if done flag has been set by this call.
*/
private boolean setDone() {
boolean isDone0 = isDone;
if (isDone0)
return false;
synchronized (this) {
isDone0 = isDone;
if (isDone0)
return false;
isDone = true;
return true;
}
}
/**
* @return Commit version.
*/
@Override public GridCacheVersion commitVersion() {
GridCacheVersion commitVer0 = commitVer;
if (commitVer0 != null)
return commitVer0;
synchronized (this) {
commitVer0 = commitVer;
if (commitVer0 != null)
return commitVer0;
commitVer = commitVer0 = xidVer;
return commitVer0;
}
}
/**
* @param commitVer Commit version.
*/
@Override public void commitVersion(GridCacheVersion commitVer) {
if (commitVer == null)
return;
GridCacheVersion commitVer0 = this.commitVer;
if (commitVer0 != null)
return;
synchronized (this) {
commitVer0 = this.commitVer;
if (commitVer0 != null)
return;
this.commitVer = commitVer;
}
}
/** {@inheritDoc} */
@Override public boolean needsCompletedVersions() {
return false;
}
/** {@inheritDoc} */
@Override public void completedVersions(GridCacheVersion base, Collection<GridCacheVersion> committed,
Collection<GridCacheVersion> txs) {
/* No-op. */
}
/** {@inheritDoc} */
@Override public boolean internal() {
return internal;
}
/**
* @param key Key.
* @return {@code True} if key is internal.
*/
protected boolean checkInternal(IgniteTxKey key) {
if (key.key().internal()) {
internal = true;
return true;
}
return false;
}
/**
* @param onePhaseCommit {@code True} if transaction commit should be performed in short-path way.
*/
public void onePhaseCommit(boolean onePhaseCommit) {
this.onePhaseCommit = onePhaseCommit;
}
/**
* @return Fast commit flag.
*/
@Override public boolean onePhaseCommit() {
return onePhaseCommit;
}
/** {@inheritDoc} */
@Override public boolean optimistic() {
return concurrency == OPTIMISTIC;
}
/** {@inheritDoc} */
@Override public boolean pessimistic() {
return concurrency == PESSIMISTIC;
}
/** {@inheritDoc} */
@Override public boolean serializable() {
return isolation == SERIALIZABLE;
}
/** {@inheritDoc} */
@Override public boolean repeatableRead() {
return isolation == REPEATABLE_READ;
}
/** {@inheritDoc} */
@Override public boolean readCommitted() {
return isolation == READ_COMMITTED;
}
/** {@inheritDoc} */
@Override public boolean state(TransactionState state) {
return state(state, false);
}
/**
* Changing state for this transaction as well as chained(parent) transactions.
*
* @param state Transaction state.
* @return {@code True} if transition was valid, {@code false} otherwise.
*/
public boolean chainState(TransactionState state) {
if (parentTx != null)
parentTx.state(state);
return state(state);
}
/** {@inheritDoc} */
@Override public IgniteInternalFuture<IgniteInternalTx> finishFuture() {
GridFutureAdapter<IgniteInternalTx> fut = finFut;
if (fut == null) {
synchronized (this) {
fut = finFut;
if (fut == null) {
fut = new TxFinishFuture(this);
finFut = fut;
}
}
}
assert fut != null;
if (isDone)
fut.onDone(this);
return fut;
}
/** {@inheritDoc} */
@Nullable @Override public IgniteInternalFuture<?> currentPrepareFuture() {
return null;
}
/**
*
* @param state State to set.
* @param timedOut Timeout flag.
*
* @return {@code True} if state changed.
*/
protected final boolean state(TransactionState state, boolean timedOut) {
boolean valid = false;
TransactionState prev;
boolean notify = false;
WALPointer ptr = null;
synchronized (this) {
prev = this.state;
switch (state) {
case ACTIVE: {
valid = prev == SUSPENDED;
break;
}
case PREPARING: {
valid = prev == ACTIVE;
break;
}
case PREPARED: {
valid = prev == PREPARING;
break;
}
case COMMITTING: {
valid = prev == PREPARED;
break;
}
case UNKNOWN: {
if (setDone())
notify = true;
valid = prev == ROLLING_BACK || prev == COMMITTING;
break;
}
case COMMITTED: {
if (setDone())
notify = true;
valid = prev == COMMITTING;
break;
}
case ROLLED_BACK: {
if (setDone())
notify = true;
valid = prev == ROLLING_BACK;
break;
}
case MARKED_ROLLBACK: {
valid = prev == ACTIVE || prev == PREPARING || prev == PREPARED || prev == SUSPENDED;
break;
}
case ROLLING_BACK: {
valid = prev == ACTIVE || prev == MARKED_ROLLBACK || prev == PREPARING ||
prev == PREPARED || prev == SUSPENDED || (prev == COMMITTING && local() && !dht());
break;
}
case SUSPENDED: {
valid = prev == ACTIVE;
break;
}
}
if (valid) {
if (timedOut)
this.timedOut = true;
this.state = state;
if (log.isDebugEnabled())
log.debug("Changed transaction state [prev=" + prev + ", new=" + this.state + ", tx=" + this + ']');
recordStateChangedEvent(state);
notifyAll();
}
else {
if (log.isDebugEnabled())
log.debug("Invalid transaction state transition [invalid=" + state + ", cur=" + this.state +
", tx=" + this + ']');
}
if (valid) {
// Seal transactions maps.
if (state != ACTIVE && state != SUSPENDED)
seal();
if (state == PREPARED || state == COMMITTED || state == ROLLED_BACK) {
cctx.tm().setMvccState(this, state);
ptr = cctx.tm().logTxRecord(this);
}
}
}
if (valid) {
if (ptr != null && (state == COMMITTED || state == ROLLED_BACK))
try {
cctx.wal().flush(ptr, false);
}
catch (IgniteCheckedException e) {
String msg = "Failed to fsync ptr: " + ptr;
U.error(log, msg, e);
throw new IgniteException(msg, e);
}
}
if (notify) {
GridFutureAdapter<IgniteInternalTx> fut = finFut;
if (fut != null)
fut.onDone(this);
}
return valid;
}
/** */
private void recordStateChangedEvent(TransactionState state) {
if (!near() || !local()) // Covers only GridNearTxLocal's state changes.
return;
switch (state) {
case ACTIVE: {
recordStateChangedEvent(EVT_TX_RESUMED);
break;
}
case COMMITTED: {
recordStateChangedEvent(EVT_TX_COMMITTED);
break;
}
case ROLLED_BACK: {
recordStateChangedEvent(EVT_TX_ROLLED_BACK);
break;
}
case SUSPENDED: {
recordStateChangedEvent(EVT_TX_SUSPENDED);
break;
}
}
}
/**
* @param type Event type.
*/
protected void recordStateChangedEvent(int type) {
assert near() && local();
GridEventStorageManager evtMgr = cctx.gridEvents();
if (!system() /* ignoring system tx */ && evtMgr.isRecordable(type))
evtMgr.record(new TransactionStateChangedEvent(
cctx.discovery().localNode(),
"Transaction state changed.",
type,
new TransactionEventProxyImpl((GridNearTxLocal)this)));
}
/** {@inheritDoc} */
@Override public GridCacheVersion writeVersion() {
return writeVer == null ? commitVersion() : writeVer;
}
/** {@inheritDoc} */
@Override public void writeVersion(GridCacheVersion writeVer) {
this.writeVer = writeVer;
}
/** {@inheritDoc} */
@Override public boolean timedOut() {
return timedOut;
}
/** {@inheritDoc} */
@Override public void invalidate(boolean invalidate) {
if (isStarted() && !dht())
throw new IllegalStateException("Cannot change invalidation flag after transaction has started: " + this);
this.invalidate = invalidate;
}
/** {@inheritDoc} */
@Override public boolean isInvalidate() {
return invalidate;
}
/** {@inheritDoc} */
@Override public final boolean isSystemInvalidate() {
return sysInvalidate;
}
/** {@inheritDoc} */
@Override public final void systemInvalidate(boolean sysInvalidate) {
this.sysInvalidate = sysInvalidate;
}
/** {@inheritDoc} */
@Nullable @Override public Map<UUID, Collection<UUID>> transactionNodes() {
return txNodes;
}
/**
* @param txNodes Transaction nodes.
*/
public void transactionNodes(Map<UUID, Collection<UUID>> txNodes) {
this.txNodes = txNodes;
}
/** {@inheritDoc} */
@Nullable @Override public GridCacheVersion nearXidVersion() {
return null;
}
/**
* @param stores Store managers.
* @return If {@code isWriteToStoreFromDht} value same for all stores.
*/
protected boolean isWriteToStoreFromDhtValid(Collection<CacheStoreManager> stores) {
if (stores != null && !stores.isEmpty()) {
boolean exp = F.first(stores).isWriteToStoreFromDht();
for (CacheStoreManager store : stores) {
if (store.isWriteToStoreFromDht() != exp)
return false;
}
}
return true;
}
/**
* @param stores Store managers.
* @param commit Commit flag.
* @throws IgniteCheckedException In case of error.
*/
protected void sessionEnd(final Collection<CacheStoreManager> stores, boolean commit) throws IgniteCheckedException {
Iterator<CacheStoreManager> it = stores.iterator();
Set<CacheStore> visited = new GridSetWrapper<>(new IdentityHashMap<CacheStore, Object>());
while (it.hasNext()) {
CacheStoreManager store = it.next();
store.sessionEnd(this, commit, !it.hasNext(), !visited.add(store.store()));
}
}
/**
* Performs batch database operations. This commit must be called
* before cache update. This way if there is a DB failure,
* cache transaction can still be rolled back.
*
* @param writeEntries Transaction write set.
* @throws IgniteCheckedException If batch update failed.
*/
protected final void batchStoreCommit(Iterable<IgniteTxEntry> writeEntries) throws IgniteCheckedException {
if (!storeEnabled() || internal() ||
(!local() && near())) // No need to work with local store at GridNearTxRemote.
return;
Collection<CacheStoreManager> stores = txState().stores(cctx);
if (stores == null || stores.isEmpty())
return;
assert isWriteToStoreFromDhtValid(stores) : "isWriteToStoreFromDht can't be different within one transaction";
CacheStoreManager first = F.first(stores);
boolean isWriteToStoreFromDht = first.isWriteToStoreFromDht();
if ((local() || first.isLocal()) && (near() || isWriteToStoreFromDht)) {
try {
if (writeEntries != null) {
Map<KeyCacheObject, IgniteBiTuple<? extends CacheObject, GridCacheVersion>> putMap = null;
List<KeyCacheObject> rmvCol = null;
CacheStoreManager writeStore = null;
boolean skipNonPrimary = near() && isWriteToStoreFromDht;
for (IgniteTxEntry e : writeEntries) {
boolean skip = e.skipStore();
if (!skip && skipNonPrimary) {
skip = e.cached().isNear() ||
e.cached().detached() ||
!e.context().affinity().primaryByPartition(e.cached().partition(), topologyVersion()).isLocal();
}
if (!skip && !local() && // Update local store at backups only if needed.
cctx.localStorePrimaryOnly())
skip = true;
if (skip)
continue;
boolean intercept = e.context().config().getInterceptor() != null;
if (intercept || !F.isEmpty(e.entryProcessors()))
e.cached().unswap(false);
IgniteBiTuple<GridCacheOperation, CacheObject> res = applyTransformClosures(e, false, null);
GridCacheContext cacheCtx = e.context();
GridCacheOperation op = res.get1();
KeyCacheObject key = e.key();
CacheObject val = res.get2();
GridCacheVersion ver = writeVersion();
if (op == CREATE || op == UPDATE) {
// Batch-process all removes if needed.
if (rmvCol != null && !rmvCol.isEmpty()) {
assert writeStore != null;
writeStore.removeAll(this, rmvCol);
// Reset.
rmvCol.clear();
writeStore = null;
}
// Batch-process puts if cache ID has changed.
if (writeStore != null && writeStore != cacheCtx.store()) {
if (putMap != null && !putMap.isEmpty()) {
writeStore.putAll(this, putMap);
// Reset.
putMap.clear();
}
writeStore = null;
}
if (intercept) {
Object interceptorVal = cacheCtx.config().getInterceptor().onBeforePut(
new CacheLazyEntry(
cacheCtx,
key,
e.cached().rawGet(),
e.keepBinary()),
cacheCtx.cacheObjectContext().unwrapBinaryIfNeeded(val, e.keepBinary(), false, null));
if (interceptorVal == null)
continue;
val = cacheCtx.toCacheObject(cacheCtx.unwrapTemporary(interceptorVal));
}
if (writeStore == null)
writeStore = cacheCtx.store();
if (writeStore.isWriteThrough()) {
if (putMap == null)
putMap = new LinkedHashMap<>(writeMap().size(), 1.0f);
putMap.put(key, F.t(val, ver));
}
}
else if (op == DELETE) {
// Batch-process all puts if needed.
if (putMap != null && !putMap.isEmpty()) {
assert writeStore != null;
writeStore.putAll(this, putMap);
// Reset.
putMap.clear();
writeStore = null;
}
if (writeStore != null && writeStore != cacheCtx.store()) {
if (rmvCol != null && !rmvCol.isEmpty()) {
writeStore.removeAll(this, rmvCol);
// Reset.
rmvCol.clear();
}
writeStore = null;
}
if (intercept) {
IgniteBiTuple<Boolean, Object> t = cacheCtx.config().getInterceptor().onBeforeRemove(
new CacheLazyEntry(cacheCtx, key, e.cached().rawGet(), e.keepBinary()));
if (cacheCtx.cancelRemove(t))
continue;
}
if (writeStore == null)
writeStore = cacheCtx.store();
if (writeStore.isWriteThrough()) {
if (rmvCol == null)
rmvCol = new ArrayList<>();
rmvCol.add(key);
}
}
else if (log.isDebugEnabled())
log.debug("Ignoring NOOP entry for batch store commit: " + e);
}
if (putMap != null && !putMap.isEmpty()) {
assert rmvCol == null || rmvCol.isEmpty();
assert writeStore != null;
// Batch put at the end of transaction.
writeStore.putAll(this, putMap);
}
if (rmvCol != null && !rmvCol.isEmpty()) {
assert putMap == null || putMap.isEmpty();
assert writeStore != null;
// Batch remove at the end of transaction.
writeStore.removeAll(this, rmvCol);
}
}
// Commit while locks are held.
sessionEnd(stores, true);
}
catch (IgniteCheckedException ex) {
commitError(ex);
errorWhenCommitting();
// Safe to remove transaction from committed tx list because nothing was committed yet.
cctx.tm().removeCommittedTx(this);
throw ex;
}
catch (Throwable ex) {
commitError(ex);
errorWhenCommitting();
// Safe to remove transaction from committed tx list because nothing was committed yet.
cctx.tm().removeCommittedTx(this);
if (ex instanceof Error)
throw (Error)ex;
throw new IgniteCheckedException("Failed to commit transaction to database: " + this, ex);
}
finally {
if (isRollbackOnly())
sessionEnd(stores, false);
}
}
else
sessionEnd(stores, true);
}
/**
* @param txEntry Entry to process.
* @param metrics {@code True} if metrics should be updated.
* @param ret Optional return value to initialize.
* @return Tuple containing transformation results.
* @throws IgniteCheckedException If failed to get previous value for transform.
* @throws GridCacheEntryRemovedException If entry was concurrently deleted.
*/
protected IgniteBiTuple<GridCacheOperation, CacheObject> applyTransformClosures(
IgniteTxEntry txEntry,
boolean metrics,
@Nullable GridCacheReturn ret) throws GridCacheEntryRemovedException, IgniteCheckedException {
assert txEntry.op() != TRANSFORM || !F.isEmpty(txEntry.entryProcessors()) : txEntry;
GridCacheContext cacheCtx = txEntry.context();
assert cacheCtx != null;
if (isSystemInvalidate())
return F.t(cacheCtx.writeThrough() ? RELOAD : DELETE, null);
if (F.isEmpty(txEntry.entryProcessors())) {
if (ret != null) {
ret.value(
cacheCtx,
txEntry.value(),
txEntry.keepBinary(),
U.deploymentClassLoader(cctx.kernalContext(), deploymentLdrId)
);
}
return F.t(txEntry.op(), txEntry.value());
}
else {
T2<GridCacheOperation, CacheObject> calcVal = txEntry.entryProcessorCalculatedValue();
if (calcVal != null)
return calcVal;
boolean recordEvt = cctx.gridEvents().isRecordable(EVT_CACHE_OBJECT_READ);
final boolean keepBinary = txEntry.keepBinary();
CacheObject cacheVal;
if (txEntry.hasValue())
cacheVal = txEntry.value();
else if (txEntry.hasOldValue())
cacheVal = txEntry.oldValue();
else {
cacheVal = txEntry.cached().innerGet(
null,
this,
/*read through*/false,
/*metrics*/metrics,
/*event*/recordEvt,
/*closure name */recordEvt ? F.first(txEntry.entryProcessors()).get1() : null,
resolveTaskName(),
null,
keepBinary);
}
boolean modified = false;
Object val = null;
Object key = null;
GridCacheVersion ver;
try {
ver = txEntry.cached().version();
}
catch (GridCacheEntryRemovedException e) {
assert optimistic() : txEntry;
if (log.isDebugEnabled())
log.debug("Failed to get entry version: [msg=" + e.getMessage() + ']');
ver = null;
}
for (T2<EntryProcessor<Object, Object, Object>, Object[]> t : txEntry.entryProcessors()) {
CacheInvokeEntry<Object, Object> invokeEntry = new CacheInvokeEntry<>(
txEntry.key(), key, cacheVal, val, ver, keepBinary, txEntry.cached());
Object procRes = null;
Exception err = null;
IgniteThread.onEntryProcessorEntered(true);
try {
EntryProcessor<Object, Object, Object> processor = t.get1();
procRes = processor.process(invokeEntry, t.get2());
val = invokeEntry.getValue();
key = invokeEntry.key();
}
catch (Exception e) {
err = e;
}
finally {
IgniteThread.onEntryProcessorLeft();
}
if (ret != null) {
if (err != null || procRes != null)
ret.addEntryProcessResult(txEntry.context(), txEntry.key(), null, procRes, err, keepBinary);
else
ret.invokeResult(true);
}
modified |= invokeEntry.modified();
}
if (modified)
cacheVal = cacheCtx.toCacheObject(cacheCtx.unwrapTemporary(val));
GridCacheOperation op = modified ? (cacheVal == null ? DELETE : UPDATE) : NOOP;
txEntry.entryProcessorCalculatedValue(new T2<>(op, op == NOOP ? null : cacheVal));
if (op == NOOP) {
ExpiryPolicy expiry = cacheCtx.expiryForTxEntry(txEntry);
if (expiry != null) {
long ttl = CU.toTtl(expiry.getExpiryForAccess());
txEntry.ttl(ttl);
if (ttl == CU.TTL_ZERO)
op = DELETE;
}
}
return F.t(op, cacheVal);
}
}
/**
* @return Resolves task name.
*/
public String resolveTaskName() {
return taskName;
}
/**
* Resolve DR conflict.
*
* @param op Initially proposed operation.
* @param txEntry TX entry being updated.
* @param newVal New value.
* @param newVer New version.
* @param old Old entry.
* @return Tuple with adjusted operation type and conflict context.
* @throws IgniteCheckedException In case of eny exception.
* @throws GridCacheEntryRemovedException If entry got removed.
*/
@SuppressWarnings({"unchecked"})
protected IgniteBiTuple<GridCacheOperation, GridCacheVersionConflictContext> conflictResolve(
GridCacheOperation op,
IgniteTxEntry txEntry,
CacheObject newVal,
GridCacheVersion newVer,
GridCacheEntryEx old)
throws IgniteCheckedException, GridCacheEntryRemovedException {
assert newVer != null;
// 1. Calculate TTL and expire time.
long newTtl = txEntry.ttl();
long newExpireTime = txEntry.conflictExpireTime();
// 1.1. If TTL is not changed, then calculate it based on expiry.
if (newTtl == CU.TTL_NOT_CHANGED) {
ExpiryPolicy expiry = txEntry.context().expiryForTxEntry(txEntry);
if (expiry != null) {
if (op == CREATE)
newTtl = CU.toTtl(expiry.getExpiryForCreation());
else if (op == UPDATE)
newTtl = CU.toTtl(expiry.getExpiryForUpdate());
}
}
// 1.2. If TTL is set to zero, then mark operation as "DELETE".
if (newTtl == CU.TTL_ZERO) {
op = DELETE;
newTtl = CU.TTL_ETERNAL;
}
// 1.3. If TTL is still not changed, then either use old entry TTL or set it to "ETERNAL".
if (newTtl == CU.TTL_NOT_CHANGED) {
if (old.isNewLocked())
newTtl = CU.TTL_ETERNAL;
else {
newTtl = old.rawTtl();
newExpireTime = old.rawExpireTime();
}
}
// TTL must be resolved at this point.
assert newTtl != CU.TTL_ZERO && newTtl != CU.TTL_NOT_CHANGED;
// 1.4 If expire time was not set explicitly, then calculate it.
if (newExpireTime == CU.EXPIRE_TIME_CALCULATE)
newExpireTime = CU.toExpireTime(newTtl);
// Expire time must be resolved at this point.
assert newExpireTime != CU.EXPIRE_TIME_CALCULATE;
// Construct old entry info.
GridCacheVersionedEntryEx oldEntry = old.versionedEntry(txEntry.keepBinary());
// Construct new entry info.
GridCacheContext entryCtx = txEntry.context();
GridCacheVersionedEntryEx newEntry = new GridCacheLazyPlainVersionedEntry(
entryCtx,
txEntry.key(),
newVal,
newTtl,
newExpireTime,
newVer,
false,
txEntry.keepBinary());
GridCacheVersionConflictContext ctx = old.context().conflictResolve(oldEntry, newEntry, false);
if (ctx.isMerge()) {
Object resVal = ctx.mergeValue();
if ((op == CREATE || op == UPDATE) && resVal == null)
op = DELETE;
else if (op == DELETE && resVal != null)
op = old.isNewLocked() ? CREATE : UPDATE;
}
return F.t(op, ctx);
}
/**
* @param e Transaction entry.
* @param primaryOnly Flag to include backups into check or not.
* @return {@code True} if entry is locally mapped as a primary or back up node.
*/
protected boolean isNearLocallyMapped(IgniteTxEntry e, boolean primaryOnly) {
GridCacheContext cacheCtx = e.context();
if (!cacheCtx.isNear())
return false;
// Try to take either entry-recorded primary node ID,
// or transaction node ID from near-local transactions.
UUID nodeId = e.nodeId() == null ? local() ? this.nodeId : null : e.nodeId();
if (nodeId != null && nodeId.equals(cctx.localNodeId()))
return true;
GridCacheEntryEx cached = e.cached();
int part = cached != null ? cached.partition() : cacheCtx.affinity().partition(e.key());
List<ClusterNode> affNodes = cacheCtx.affinity().nodesByPartition(part, topologyVersion());
e.locallyMapped(F.contains(affNodes, cctx.localNode()));
if (primaryOnly) {
ClusterNode primary = F.first(affNodes);
if (primary == null && !cacheCtx.affinityNode())
return false;
assert primary != null : "Primary node is null for affinity nodes: " + affNodes;
return primary.isLocal();
}
else
return e.locallyMapped();
}
/**
* @param e Entry to evict if it qualifies for eviction.
* @param primaryOnly Flag to try to evict only on primary node.
* @return {@code True} if attempt was made to evict the entry.
*/
protected boolean evictNearEntry(IgniteTxEntry e, boolean primaryOnly) {
assert e != null;
if (isNearLocallyMapped(e, primaryOnly)) {
GridCacheEntryEx cached = e.cached();
assert cached instanceof GridNearCacheEntry : "Invalid cache entry: " + e;
if (log.isDebugEnabled())
log.debug("Evicting dht-local entry from near cache [entry=" + cached + ", tx=" + this + ']');
if (cached != null && cached.markObsolete(xidVer))
return true;
}
return false;
}
/** {@inheritDoc} */
@Override public boolean equals(Object o) {
return o == this || (o instanceof IgniteTxAdapter && xidVer.equals(((IgniteTxAdapter)o).xidVer));
}
/** {@inheritDoc} */
@Override public int hashCode() {
return xidVer.hashCode();
}
/**
* Adds cache to the list of active caches in transaction.
*
* @param cacheCtx Cache context to add.
* @param recovery Recovery flag. See {@link CacheOperationContext#setRecovery(boolean)}.
* @throws IgniteCheckedException If caches already enlisted in this transaction are not compatible with given
* cache (e.g. they have different stores).
*/
public abstract void addActiveCache(GridCacheContext cacheCtx, boolean recovery) throws IgniteCheckedException;
/** {@inheritDoc} */
@Override public TxCounters txCounters(boolean createIfAbsent) {
if (createIfAbsent && txCounters == null)
TX_COUNTERS_UPD.compareAndSet(this, null, new TxCounters());
return txCounters;
}
/**
* Makes cache sizes changes accumulated during transaction visible outside of transaction.
*/
protected void applyTxSizes() {
TxCounters txCntrs = txCounters(false);
if (txCntrs == null)
return;
Map<Integer, ? extends Map<Integer, AtomicLong>> sizeDeltas = txCntrs.sizeDeltas();
for (Map.Entry<Integer, ? extends Map<Integer, AtomicLong>> entry : sizeDeltas.entrySet()) {
Integer cacheId = entry.getKey();
Map<Integer, AtomicLong> deltas = entry.getValue();
assert !F.isEmpty(deltas);
GridDhtPartitionTopology top = cctx.cacheContext(cacheId).topology();
// Need to reserve on backups only
boolean reserve = dht() && remote();
for (Map.Entry<Integer, AtomicLong> e : deltas.entrySet()) {
boolean invalid = false;
int p = e.getKey();
long delta = e.getValue().get();
try {
GridDhtLocalPartition part = top.localPartition(p);
if (!reserve || part != null && part.reserve()) {
assert part != null;
try {
if (part.state() != GridDhtPartitionState.RENTING)
part.dataStore().updateSize(cacheId, delta);
else
invalid = true;
}
finally {
if (reserve)
part.release();
}
}
else
invalid = true;
}
catch (GridDhtInvalidPartitionException e1) {
invalid = true;
}
if (invalid) {
assert reserve;
if (log.isDebugEnabled())
log.debug("Trying to apply size delta for invalid partition: " +
"[cacheId=" + cacheId + ", part=" + p + "]");
}
}
}
}
/** {@inheritDoc} */
@Override public String toString() {
return GridToStringBuilder.toString(IgniteTxAdapter.class, this,
"duration", (U.currentTimeMillis() - startTime) + "ms",
"onePhaseCommit", onePhaseCommit);
}
/**
* Transaction shadow class to be used for deserialization.
*/
private static class TxShadow implements IgniteInternalTx {
/** Xid. */
private final IgniteUuid xid;
/** Node ID. */
private final UUID nodeId;
/** Thread ID. */
private final long threadId;
/** Start time. */
private final long startTime;
/** Start time in nanoseconds. */
private final long startTimeNanos;
/** Transaction isolation. */
private final TransactionIsolation isolation;
/** Concurrency. */
private final TransactionConcurrency concurrency;
/** Invalidate flag. */
private final boolean invalidate;
/** Timeout. */
private final long timeout;
/** State. */
private final TransactionState state;
/** Rollback only flag. */
private final boolean rollbackOnly;
/** Implicit flag. */
private final boolean implicit;
/**
* @param xid Xid.
* @param nodeId Node ID.
* @param threadId Thread ID.
* @param startTime Start time.
* @param isolation Isolation.
* @param concurrency Concurrency.
* @param invalidate Invalidate flag.
* @param implicit Implicit flag.
* @param timeout Transaction timeout.
* @param state Transaction state.
* @param rollbackOnly Rollback-only flag.
*/
TxShadow(IgniteUuid xid, UUID nodeId, long threadId, long startTime, long startTimeNanos,
TransactionIsolation isolation, TransactionConcurrency concurrency, boolean invalidate, boolean implicit,
long timeout, TransactionState state, boolean rollbackOnly) {
this.xid = xid;
this.nodeId = nodeId;
this.threadId = threadId;
this.startTime = startTime;
this.startTimeNanos = startTimeNanos;
this.isolation = isolation;
this.concurrency = concurrency;
this.invalidate = invalidate;
this.implicit = implicit;
this.timeout = timeout;
this.state = state;
this.rollbackOnly = rollbackOnly;
}
/** {@inheritDoc} */
@Override public void mvccSnapshot(MvccSnapshot mvccSnapshot) {
// No-op.
}
/** {@inheritDoc} */
@Override public MvccSnapshot mvccSnapshot() {
return null;
}
/** {@inheritDoc} */
@Override public boolean localResult() {
return false;
}
/** {@inheritDoc} */
@Override public IgniteUuid xid() {
return xid;
}
/** {@inheritDoc} */
@Override public UUID nodeId() {
return nodeId;
}
/** {@inheritDoc} */
@Override public long threadId() {
return threadId;
}
/** {@inheritDoc} */
@Override public long startTime() {
return startTime;
}
/** {@inheritDoc} */
@Override public long startTimeNanos() {
return startTimeNanos;
}
/** {@inheritDoc} */
@Override public TransactionIsolation isolation() {
return isolation;
}
/** {@inheritDoc} */
@Override public TransactionConcurrency concurrency() {
return concurrency;
}
/** {@inheritDoc} */
@Override public boolean isInvalidate() {
return invalidate;
}
/** {@inheritDoc} */
@Override public boolean implicit() {
return implicit;
}
/** {@inheritDoc} */
@Override public long timeout() {
return timeout;
}
/** {@inheritDoc} */
@Override public TransactionState state() {
return state;
}
/** {@inheritDoc} */
@Override public boolean isRollbackOnly() {
return rollbackOnly;
}
/** {@inheritDoc} */
@Override public long timeout(long timeout) {
throw new IllegalStateException("Deserialized transaction can only be used as read-only.");
}
/** {@inheritDoc} */
@Override public boolean setRollbackOnly() {
throw new IllegalStateException("Deserialized transaction can only be used as read-only.");
}
/** {@inheritDoc} */
@Override public void errorWhenCommitting() {
throw new IllegalStateException("Deserialized transaction can only be used as read-only.");
}
/** {@inheritDoc} */
@Override public boolean activeCachesDeploymentEnabled() {
return false;
}
/** {@inheritDoc} */
@Override public void activeCachesDeploymentEnabled(boolean depEnabled) {
throw new IllegalStateException("Deserialized transaction can only be used as read-only.");
}
/** {@inheritDoc} */
@Nullable @Override public Object addMeta(int key, Object val) {
throw new IllegalStateException("Deserialized transaction can only be used as read-only.");
}
/** {@inheritDoc} */
@Nullable @Override public Object removeMeta(int key) {
throw new IllegalStateException("Deserialized transaction can only be used as read-only.");
}
/** {@inheritDoc} */
@Nullable @Override public Object meta(int key) {
throw new IllegalStateException("Deserialized transaction can only be used as read-only.");
}
/** {@inheritDoc} */
@Override public int size() {
throw new IllegalStateException("Deserialized transaction can only be used as read-only.");
}
/** {@inheritDoc} */
@Override public boolean storeEnabled() {
throw new IllegalStateException("Deserialized transaction can only be used as read-only.");
}
/** {@inheritDoc} */
@Override public boolean storeWriteThrough() {
throw new IllegalStateException("Deserialized transaction can only be used as read-only.");
}
/** {@inheritDoc} */
@Override public boolean system() {
throw new IllegalStateException("Deserialized transaction can only be used as read-only.");
}
/** {@inheritDoc} */
@Override public byte ioPolicy() {
throw new IllegalStateException("Deserialized transaction can only be used as read-only.");
}
/** {@inheritDoc} */
@Override public AffinityTopologyVersion topologyVersion() {
throw new IllegalStateException("Deserialized transaction can only be used as read-only.");
}
/** {@inheritDoc} */
@Override public AffinityTopologyVersion topologyVersionSnapshot() {
throw new IllegalStateException("Deserialized transaction can only be used as read-only.");
}
/** {@inheritDoc} */
@Override public boolean implicitSingle() {
throw new IllegalStateException("Deserialized transaction can only be used as read-only.");
}
/** {@inheritDoc} */
@Override public AffinityTopologyVersion topologyVersion(AffinityTopologyVersion topVer) {
throw new IllegalStateException("Deserialized transaction can only be used as read-only.");
}
/** {@inheritDoc} */
@Override public void commitError(Throwable e) {
throw new IllegalStateException("Deserialized transaction can only be used as read-only.");
}
/** {@inheritDoc} */
@Nullable @Override public String label() {
throw new IllegalStateException("Deserialized transaction can only be used as read-only.");
}
/** {@inheritDoc} */
@Override public boolean empty() {
throw new IllegalStateException("Deserialized transaction can only be used as read-only.");
}
/** {@inheritDoc} */
@Override public boolean markFinalizing(FinalizationStatus status) {
throw new IllegalStateException("Deserialized transaction can only be used as read-only.");
}
/** {@inheritDoc} */
@Nullable @Override public FinalizationStatus finalizationStatus() {
return null;
}
/** {@inheritDoc} */
@Override public void addInvalidPartition(int cacheId, int part) {
throw new IllegalStateException("Deserialized transaction can only be used as read-only.");
}
/** {@inheritDoc} */
@Override public Map<Integer, Set<Integer>> invalidPartitions() {
throw new IllegalStateException("Deserialized transaction can only be used as read-only.");
}
/** {@inheritDoc} */
@Nullable @Override public GridCacheVersion ownedVersion(IgniteTxKey key) {
throw new IllegalStateException("Deserialized transaction can only be used as read-only.");
}
/** {@inheritDoc} */
@Nullable @Override public UUID otherNodeId() {
throw new IllegalStateException("Deserialized transaction can only be used as read-only.");
}
/** {@inheritDoc} */
@Override public UUID eventNodeId() {
throw new IllegalStateException("Deserialized transaction can only be used as read-only.");
}
/** {@inheritDoc} */
@Override public UUID originatingNodeId() {
throw new IllegalStateException("Deserialized transaction can only be used as read-only.");
}
/** {@inheritDoc} */
@Nullable @Override public TxCounters txCounters(boolean createIfAbsent) {
return null;
}
/** {@inheritDoc} */
@Override public IgniteTxState txState() {
return null;
}
/** {@inheritDoc} */
@Override public Collection<UUID> masterNodeIds() {
return null;
}
/** {@inheritDoc} */
@Nullable @Override public GridCacheVersion nearXidVersion() {
return null;
}
/** {@inheritDoc} */
@Nullable @Override public Map<UUID, Collection<UUID>> transactionNodes() {
return null;
}
/** {@inheritDoc} */
@Override public boolean ownsLock(GridCacheEntryEx entry) throws GridCacheEntryRemovedException {
return false;
}
/** {@inheritDoc} */
@Override public boolean ownsLockUnsafe(GridCacheEntryEx entry) {
return false;
}
/** {@inheritDoc} */
@Override public boolean near() {
return false;
}
/** {@inheritDoc} */
@Override public boolean dht() {
return false;
}
/** {@inheritDoc} */
@Override public boolean colocated() {
return false;
}
/** {@inheritDoc} */
@Override public boolean local() {
return false;
}
/** {@inheritDoc} */
@Override public UUID subjectId() {
return null;
}
/** {@inheritDoc} */
@Override public int taskNameHash() {
return 0;
}
/** {@inheritDoc} */
@Override public boolean user() {
return false;
}
/** {@inheritDoc} */
@Override public boolean hasWriteKey(IgniteTxKey key) {
return false;
}
/** {@inheritDoc} */
@Override public Set<IgniteTxKey> readSet() {
return null;
}
/** {@inheritDoc} */
@Override public Set<IgniteTxKey> writeSet() {
return null;
}
/** {@inheritDoc} */
@Override public Collection<IgniteTxEntry> allEntries() {
return null;
}
/** {@inheritDoc} */
@Override public Collection<IgniteTxEntry> writeEntries() {
return null;
}
/** {@inheritDoc} */
@Override public Collection<IgniteTxEntry> readEntries() {
return null;
}
/** {@inheritDoc} */
@Override public Map<IgniteTxKey, IgniteTxEntry> writeMap() {
return null;
}
/** {@inheritDoc} */
@Override public Map<IgniteTxKey, IgniteTxEntry> readMap() {
return null;
}
/** {@inheritDoc} */
@Override public Collection<IgniteTxEntry> optimisticLockEntries() {
return null;
}
/** {@inheritDoc} */
@Override public void seal() {
}
/** {@inheritDoc} */
@Nullable @Override public IgniteTxEntry entry(IgniteTxKey key) {
return null;
}
/** {@inheritDoc} */
@Nullable @Override public GridTuple<CacheObject> peek(GridCacheContext ctx,
boolean failFast,
KeyCacheObject key) {
return null;
}
/** {@inheritDoc} */
@Override public GridCacheVersion xidVersion() {
return null;
}
/** {@inheritDoc} */
@Override public GridCacheVersion commitVersion() {
return null;
}
/** {@inheritDoc} */
@Override public void commitVersion(GridCacheVersion commitVer) {
// No-op.
}
/** {@inheritDoc} */
@Override public IgniteInternalFuture<?> salvageTx() {
return null;
}
/** {@inheritDoc} */
@Override public GridCacheVersion writeVersion() {
return null;
}
/** {@inheritDoc} */
@Override public void writeVersion(GridCacheVersion ver) {
// No-op.
}
/** {@inheritDoc} */
@Override public IgniteInternalFuture<IgniteInternalTx> finishFuture() {
return null;
}
/** {@inheritDoc} */
@Nullable @Override public IgniteInternalFuture<IgniteInternalTx> currentPrepareFuture() {
return null;
}
/** {@inheritDoc} */
@Override public boolean state(TransactionState state) {
return false;
}
/** {@inheritDoc} */
@Override public void invalidate(boolean invalidate) {
// No-op.
}
/** {@inheritDoc} */
@Override public void systemInvalidate(boolean sysInvalidate) {
// No-op.
}
/** {@inheritDoc} */
@Override public boolean isSystemInvalidate() {
return false;
}
/** {@inheritDoc} */
@Override public IgniteInternalFuture<IgniteInternalTx> rollbackAsync() {
return null;
}
/** {@inheritDoc} */
@Override public IgniteInternalFuture<IgniteInternalTx> commitAsync() {
return null;
}
/** {@inheritDoc} */
@Override public boolean onOwnerChanged(GridCacheEntryEx entry, GridCacheMvccCandidate owner) {
return false;
}
/** {@inheritDoc} */
@Override public boolean timedOut() {
return false;
}
/** {@inheritDoc} */
@Override public boolean done() {
return false;
}
/** {@inheritDoc} */
@Override public boolean optimistic() {
return false;
}
/** {@inheritDoc} */
@Override public boolean pessimistic() {
return false;
}
/** {@inheritDoc} */
@Override public boolean readCommitted() {
return false;
}
/** {@inheritDoc} */
@Override public boolean repeatableRead() {
return false;
}
/** {@inheritDoc} */
@Override public boolean serializable() {
return false;
}
/** {@inheritDoc} */
@Override public long remainingTime() throws IgniteTxTimeoutCheckedException {
return 0;
}
/** {@inheritDoc} */
@Override public Collection<GridCacheVersion> alternateVersions() {
return null;
}
/** {@inheritDoc} */
@Override public boolean needsCompletedVersions() {
return false;
}
/** {@inheritDoc} */
@Override public void completedVersions(GridCacheVersion base, Collection committed, Collection rolledback) {
// No-op.
}
/** {@inheritDoc} */
@Override public boolean internal() {
return false;
}
/** {@inheritDoc} */
@Override public boolean onePhaseCommit() {
return false;
}
/** {@inheritDoc} */
@Override public boolean equals(Object o) {
return this == o || o instanceof IgniteInternalTx && xid.equals(((IgniteInternalTx)o).xid());
}
/** {@inheritDoc} */
@Override public int hashCode() {
return xid.hashCode();
}
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(TxShadow.class, this);
}
}
/**
*
*/
private static class TxFinishFuture extends GridFutureAdapter<IgniteInternalTx> {
/** */
@GridToStringInclude
private IgniteTxAdapter tx;
/** */
private volatile long completionTime;
/**
* @param tx Transaction being awaited.
*/
private TxFinishFuture(IgniteTxAdapter tx) {
this.tx = tx;
}
/** {@inheritDoc} */
@Override public boolean onDone(@Nullable IgniteInternalTx res, @Nullable Throwable err) {
completionTime = U.currentTimeMillis();
return super.onDone(res, err);
}
/** {@inheritDoc} */
@Override public String toString() {
long ct = completionTime;
if (ct == 0)
ct = U.currentTimeMillis();
long duration = ct - tx.startTime();
return S.toString(TxFinishFuture.class, this, "duration", duration);
}
}
}