| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.ignite.internal.processors.cache.distributed.dht; |
| |
| import java.io.Externalizable; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.UUID; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; |
| import org.apache.ignite.IgniteCheckedException; |
| import org.apache.ignite.cluster.ClusterNode; |
| import org.apache.ignite.internal.IgniteInternalFuture; |
| import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; |
| import org.apache.ignite.internal.processors.cache.GridCacheContext; |
| import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; |
| import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException; |
| import org.apache.ignite.internal.processors.cache.GridCacheReturn; |
| import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; |
| import org.apache.ignite.internal.processors.cache.KeyCacheObject; |
| import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxMapping; |
| import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtInvalidPartitionException; |
| import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareResponse; |
| import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; |
| import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry; |
| import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalAdapter; |
| import org.apache.ignite.internal.processors.cache.transactions.TxCounters; |
| import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; |
| import org.apache.ignite.internal.util.F0; |
| import org.apache.ignite.internal.util.GridLeanMap; |
| import org.apache.ignite.internal.util.GridLeanSet; |
| import org.apache.ignite.internal.util.future.GridEmbeddedFuture; |
| import org.apache.ignite.internal.util.future.GridFinishedFuture; |
| import org.apache.ignite.internal.util.future.GridFutureAdapter; |
| import org.apache.ignite.internal.util.tostring.GridToStringBuilder; |
| import org.apache.ignite.internal.util.tostring.GridToStringExclude; |
| import org.apache.ignite.internal.util.typedef.CX1; |
| import org.apache.ignite.internal.util.typedef.F; |
| import org.apache.ignite.internal.util.typedef.internal.CU; |
| import org.apache.ignite.internal.util.typedef.internal.U; |
| import org.apache.ignite.lang.IgniteUuid; |
| import org.apache.ignite.transactions.TransactionConcurrency; |
| import org.apache.ignite.transactions.TransactionIsolation; |
| import org.apache.ignite.transactions.TransactionState; |
| import org.jetbrains.annotations.Nullable; |
| |
| import static org.apache.ignite.internal.processors.cache.GridCacheOperation.NOOP; |
| import static org.apache.ignite.transactions.TransactionState.COMMITTED; |
| import static org.apache.ignite.transactions.TransactionState.COMMITTING; |
| import static org.apache.ignite.transactions.TransactionState.PREPARED; |
| import static org.apache.ignite.transactions.TransactionState.PREPARING; |
| import static org.apache.ignite.transactions.TransactionState.ROLLED_BACK; |
| import static org.apache.ignite.transactions.TransactionState.ROLLING_BACK; |
| import static org.apache.ignite.transactions.TransactionState.UNKNOWN; |
| |
| /** |
| * Replicated user transaction. |
| */ |
| public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { |
| /** */ |
| private static final long serialVersionUID = 0L; |
| |
| /** Asynchronous rollback marker for lock futures. */ |
| public static final IgniteInternalFuture<Boolean> ROLLBACK_FUT = new GridFutureAdapter<>(); |
| |
| /** Lock future updater. */ |
| private static final AtomicReferenceFieldUpdater<GridDhtTxLocalAdapter, IgniteInternalFuture> LOCK_FUT_UPD = |
| AtomicReferenceFieldUpdater.newUpdater(GridDhtTxLocalAdapter.class, IgniteInternalFuture.class, "lockFut"); |
| |
| /** Near mappings. */ |
| protected Map<UUID, GridDistributedTxMapping> nearMap = new ConcurrentHashMap<>(); |
| |
| /** DHT mappings. */ |
| protected Map<UUID, GridDistributedTxMapping> dhtMap = new ConcurrentHashMap<>(); |
| |
| /** Mapped flag. */ |
| protected volatile boolean mapped; |
| |
| /** Explicit lock flag. */ |
| protected boolean explicitLock; |
| |
| /** Versions of pending locks for entries of this tx. */ |
| private Collection<GridCacheVersion> pendingVers; |
| |
| /** Flag indicating that originating node has near cache. */ |
| private boolean nearOnOriginatingNode; |
| |
| /** Nodes where transactions were started on lock step. */ |
| private Set<ClusterNode> lockTxNodes; |
| |
| /** Enlist or lock future that is currently in progress. */ |
| @GridToStringExclude |
| protected volatile IgniteInternalFuture<?> lockFut; |
| |
| /** |
| * Empty constructor required for {@link Externalizable}. |
| * The body is intentionally empty. |
| */ |
| protected GridDhtTxLocalAdapter() { |
| // No-op. |
| } |
| |
| /** |
| * Creates the transaction, forwarding everything except {@code explicitLock} to the superclass constructor, |
| * and records the ID of the calling thread as the transaction thread ID. |
| * |
| * @param cctx Cache context (must not be {@code null}). |
| * @param xidVer Transaction version. |
| * @param implicit Implicit flag. |
| * @param implicitSingle Implicit-with-single-key flag. |
| * @param sys System flag. |
| * @param explicitLock Explicit lock flag. |
| * @param plc Policy value passed to the superclass constructor. |
| * @param concurrency Concurrency. |
| * @param isolation Isolation. |
| * @param timeout Timeout. |
| * @param invalidate Invalidate flag. |
| * @param storeEnabled Store enabled flag. |
| * @param onePhaseCommit One-phase commit flag. |
| * @param txSize Expected transaction size. |
| * @param subjId Subject ID, may be {@code null}. |
| * @param taskNameHash Task name hash. |
| */ |
| protected GridDhtTxLocalAdapter( |
| GridCacheSharedContext cctx, |
| GridCacheVersion xidVer, |
| boolean implicit, |
| boolean implicitSingle, |
| boolean sys, |
| boolean explicitLock, |
| byte plc, |
| TransactionConcurrency concurrency, |
| TransactionIsolation isolation, |
| long timeout, |
| boolean invalidate, |
| boolean storeEnabled, |
| boolean onePhaseCommit, |
| int txSize, |
| @Nullable UUID subjId, |
| int taskNameHash |
| ) { |
| super( |
| cctx, |
| xidVer, |
| implicit, |
| implicitSingle, |
| sys, |
| plc, |
| concurrency, |
| isolation, |
| timeout, |
| invalidate, |
| storeEnabled, |
| onePhaseCommit, |
| txSize, |
| subjId, |
| taskNameHash |
| ); |
| |
| assert cctx != null; |
| |
| this.explicitLock = explicitLock; |
| |
| threadId = Thread.currentThread().getId(); |
| } |
| |
| /** |
| * Remembers a remote node on which a transaction was started on lock step. |
| * The backing set is created on first use. |
| * |
| * @param node Remote node (must be neither {@code null} nor local). |
| */ |
| void addLockTransactionNode(ClusterNode node) { |
| assert node != null; |
| assert !node.isLocal(); |
| |
| Set<ClusterNode> nodes = lockTxNodes; |
| |
| if (nodes == null) { |
| nodes = new HashSet<>(); |
| |
| lockTxNodes = nodes; |
| } |
| |
| nodes.add(node); |
| } |
| |
| /** |
| * Sets flag that indicates that originating node has a near cache that participates in this transaction. |
| * The same flag is reported by {@link #needsCompletedVersions()} and is consulted by {@link #commitOnPrepare()}. |
| * |
| * @param hasNear Has near cache flag. |
| */ |
| public void nearOnOriginatingNode(boolean hasNear) { |
| nearOnOriginatingNode = hasNear; |
| } |
| |
| /** |
| * Gets flag that indicates that originating node has a near cache that participates in this transaction. |
| * |
| * @return Has near cache flag, as last set via {@link #nearOnOriginatingNode(boolean)}. |
| */ |
| boolean nearOnOriginatingNode() { |
| return nearOnOriginatingNode; |
| } |
| |
| /** |
| * Gets the explicit lock flag passed to the constructor or set via {@link #explicitLock(boolean)}. |
| * |
| * @return {@code True} if explicit lock transaction. |
| */ |
| public boolean explicitLock() { |
| return explicitLock; |
| } |
| |
| /** |
| * Overrides the explicit lock flag. |
| * |
| * @param explicitLock Explicit lock flag. |
| */ |
| public void explicitLock(boolean explicitLock) { |
| this.explicitLock = explicitLock; |
| } |
| |
| /** |
| * Returns the internal (not copied) set of nodes collected by {@link #addLockTransactionNode(ClusterNode)}. |
| * |
| * @return Nodes where transactions were started on lock step, or {@code null} if no node was ever added. |
| */ |
| @Nullable Set<ClusterNode> lockTransactionNodes() { |
| return lockTxNodes; |
| } |
| |
| /** |
| * Implemented by subclasses. |
| * |
| * @return Near node id. |
| */ |
| protected abstract UUID nearNodeId(); |
| |
| /** |
| * Implemented by subclasses. |
| * |
| * @return Near future ID. |
| */ |
| protected abstract IgniteUuid nearFutureId(); |
| |
| /** |
| * Adds reader to cached entry. |
| * |
| * @param msgId Message ID. |
| * @param cached Cached entry. |
| * @param entry Transaction entry. |
| * @param topVer Topology version. |
| * @return Future for the reader registration, or {@code null}; presumably the future's result is |
| * {@code true} if the reader was added as a result of this call (NOTE(review): confirm against |
| * implementations, including when {@code null} is returned). |
| */ |
| @Nullable protected abstract IgniteInternalFuture<Boolean> addReader(long msgId, |
| GridDhtCacheEntry cached, |
| IgniteTxEntry entry, |
| AffinityTopologyVersion topVer); |
| |
| /** |
| * Sends the finish reply; implemented by subclasses. |
| * |
| * @param err Error, if any ({@code null} otherwise). |
| */ |
| protected abstract void sendFinishReply(@Nullable Throwable err); |
| |
| /** |
| * {@inheritDoc} |
| * <p> |
| * For this transaction the answer is the "near cache on originating node" flag. |
| */ |
| @Override public boolean needsCompletedVersions() { |
| return nearOnOriginatingNode; |
| } |
| |
| /** |
| * Gets pending lock versions, never {@code null}. |
| * |
| * @return Versions for all pending locks that were in queue before tx locks were released, |
| * or an empty list if none were set. |
| */ |
| Collection<GridCacheVersion> pendingVersions() { |
| Collection<GridCacheVersion> vers = pendingVers; |
| |
| if (vers != null) |
| return vers; |
| |
| return Collections.<GridCacheVersion>emptyList(); |
| } |
| |
| /** |
| * Stores the given collection as is (no copy is made). |
| * |
| * @param pendingVers Versions for all pending locks that were in queue before tx locks were released. |
| */ |
| public void pendingVersions(Collection<GridCacheVersion> pendingVers) { |
| this.pendingVers = pendingVers; |
| } |
| |
| /** |
| * Maps entries that hold an explicit lock version different from this transaction's version |
| * to DHT and near nodes. Does nothing if the mapping was already done; sets {@link #mapped} when finished. |
| */ |
| protected void mapExplicitLocks() { |
| if (mapped) |
| return; |
| |
| // Explicit locks may participate in implicit transactions only. |
| if (!implicit()) { |
| mapped = true; |
| |
| return; |
| } |
| |
| Map<ClusterNode, List<GridDhtCacheEntry>> dhtEntries = null; |
| Map<ClusterNode, List<GridDhtCacheEntry>> nearEntries = null; |
| |
| for (IgniteTxEntry txEntry : allEntries()) { |
| assert txEntry.cached() != null; |
| |
| GridCacheContext cacheCtx = txEntry.cached().context(); |
| |
| if (cacheCtx.isNear()) |
| continue; |
| |
| // Swap an obsolete cached entry for a fresh one before inspecting it. |
| if (txEntry.cached().obsolete()) { |
| GridCacheEntryEx fresh = cacheCtx.cache().entryEx(txEntry.key(), topologyVersion()); |
| |
| txEntry.cached(fresh); |
| } |
| |
| if (txEntry.cached().detached() || txEntry.cached().isLocal()) |
| continue; |
| |
| // Retry with a re-created entry for as long as the cached one turns out to be removed. |
| for (;;) { |
| try { |
| GridCacheVersion explicitVer = txEntry.explicitVersion(); |
| |
| // Map explicit locks. |
| if (explicitVer != null && !explicitVer.equals(xidVer)) { |
| if (dhtEntries == null) |
| dhtEntries = new GridLeanMap<>(); |
| |
| if (nearEntries == null) |
| nearEntries = new GridLeanMap<>(); |
| |
| cacheCtx.dhtMap((GridDhtCacheEntry)txEntry.cached(), explicitVer, log, dhtEntries, nearEntries); |
| } |
| |
| break; |
| } |
| catch (GridCacheEntryRemovedException ignore) { |
| GridCacheEntryEx fresh = cacheCtx.cache().entryEx(txEntry.key(), topologyVersion()); |
| |
| txEntry.cached(fresh); |
| } |
| } |
| } |
| |
| if (!F.isEmpty(dhtEntries)) |
| addDhtNodeEntryMapping(dhtEntries); |
| |
| if (!F.isEmpty(nearEntries)) |
| addNearNodeEntryMapping(nearEntries); |
| |
| mapped = true; |
| } |
| |
| /** |
| * Returns DHT mappings, calling {@link #mapExplicitLocks()} first so that explicit locks are mapped. |
| * |
| * @return DHT map (the internal map, keyed by node ID). |
| */ |
| Map<UUID, GridDistributedTxMapping> dhtMap() { |
| mapExplicitLocks(); |
| |
| return dhtMap; |
| } |
| |
| /** |
| * Returns near mappings, calling {@link #mapExplicitLocks()} first so that explicit locks are mapped. |
| * |
| * @return Near map (the internal map, keyed by node ID). |
| */ |
| Map<UUID, GridDistributedTxMapping> nearMap() { |
| mapExplicitLocks(); |
| |
| return nearMap; |
| } |
| |
| /** |
| * Adds the given per-node entries to the DHT mappings. |
| * |
| * @param mappings Mappings to add. |
| */ |
| private void addDhtNodeEntryMapping(Map<ClusterNode, List<GridDhtCacheEntry>> mappings) { |
| addMapping(mappings, dhtMap); |
| } |
| |
| /** |
| * Adds the given per-node entries to the near mappings. |
| * |
| * @param mappings Mappings to add. |
| */ |
| private void addNearNodeEntryMapping(Map<ClusterNode, List<GridDhtCacheEntry>> mappings) { |
| addMapping(mappings, nearMap); |
| } |
| |
| /** |
| * @param nodeId Node ID. |
| * @return {@code True} if mapping was removed. |
| */ |
| public boolean removeMapping(UUID nodeId) { |
| return removeMapping(nodeId, null, dhtMap) | removeMapping(nodeId, null, nearMap); |
| } |
| |
| /** |
| * Removes an entry from the DHT mapping of the given node. |
| * |
| * @param nodeId Node ID. |
| * @param entry Entry to remove. |
| * @return {@code True} if was removed. |
| */ |
| boolean removeDhtMapping(UUID nodeId, GridCacheEntryEx entry) { |
| return removeMapping(nodeId, entry, dhtMap); |
| } |
| |
| /** |
| * Removes an entry from the near mapping of the given node. |
| * |
| * @param nodeId Node ID. |
| * @param entry Entry to remove. |
| * @return {@code True} if was removed. |
| */ |
| boolean removeNearMapping(UUID nodeId, GridCacheEntryEx entry) { |
| return removeMapping(nodeId, entry, nearMap); |
| } |
| |
| /** |
| * Removes either a single entry from a node mapping or, when no entry is given, the whole node mapping. |
| * A node mapping that is empty after the lookup is dropped from the map. |
| * |
| * @param nodeId Node ID. |
| * @param entry Entry to remove, or {@code null} to remove the whole mapping of the node. |
| * @param map Map to remove from. |
| * @return {@code True} if was removed. |
| */ |
| private boolean removeMapping(UUID nodeId, @Nullable GridCacheEntryEx entry, |
| Map<UUID, GridDistributedTxMapping> map) { |
| // No entry given: drop everything mapped to the node. |
| if (entry == null) |
| return map.remove(nodeId) != null; |
| |
| if (log.isDebugEnabled()) |
| log.debug("Removing mapping for entry [nodeId=" + nodeId + ", entry=" + entry + ']'); |
| |
| IgniteTxEntry txEntry = entry(entry.txKey()); |
| |
| if (txEntry == null) |
| return false; |
| |
| GridDistributedTxMapping nodeMapping = map.get(nodeId); |
| |
| if (nodeMapping == null) |
| return false; |
| |
| boolean rmvd = nodeMapping.removeEntry(txEntry); |
| |
| if (nodeMapping.empty()) |
| map.remove(nodeId); |
| |
| return rmvd; |
| } |
| |
| /** |
| * Adds the transaction entries that correspond to the given cache entries to per-node mappings. |
| * Cache entries without a transaction entry are ignored; a node mapping is created only when |
| * the first entry has to be added to it. |
| * |
| * @param mappings Entry mappings. |
| * @param dst Transaction mappings. |
| */ |
| private void addMapping( |
| Map<ClusterNode, List<GridDhtCacheEntry>> mappings, |
| Map<UUID, GridDistributedTxMapping> dst |
| ) { |
| for (Map.Entry<ClusterNode, List<GridDhtCacheEntry>> nodeEntries : mappings.entrySet()) { |
| ClusterNode node = nodeEntries.getKey(); |
| |
| GridDistributedTxMapping nodeMapping = dst.get(node.id()); |
| |
| for (GridDhtCacheEntry cached : nodeEntries.getValue()) { |
| IgniteTxEntry txEntry = entry(cached.txKey()); |
| |
| if (txEntry == null) |
| continue; |
| |
| if (nodeMapping == null) { |
| nodeMapping = new GridDistributedTxMapping(node); |
| |
| dst.put(node.id(), nodeMapping); |
| } |
| |
| nodeMapping.add(txEntry); |
| } |
| } |
| } |
| |
| /** |
| * {@inheritDoc} |
| * <p> |
| * Not expected to be called for this transaction type: the implementation only fails an assertion |
| * (and therefore does nothing when assertions are disabled). |
| */ |
| @Override public void addInvalidPartition(GridCacheContext ctx, int part) { |
| assert false : "DHT transaction encountered invalid partition [part=" + part + ", tx=" + this + ']'; |
| } |
| |
| /** |
| * Adds an entry to this transaction. If the key is already enlisted, the existing transaction entry |
| * is updated from the given one; otherwise the given entry is bound to a DHT cache entry and enlisted. |
| * The transaction is expected to be in {@code PREPARING} state. |
| * |
| * @param e Entry to add. |
| * @throws IgniteCheckedException If failed, including when no DHT mapping exists for the entry's |
| * explicit version or when the partition is invalid. |
| */ |
| public void addEntry(IgniteTxEntry e) throws IgniteCheckedException { |
| init(); |
| |
| TransactionState curState = state(); |
| |
| assert curState == PREPARING : "Invalid tx state for adding entry [e=" + e + ", tx=" + this + ']'; |
| |
| e.unmarshal(cctx, false, cctx.deploy().globalLoader()); |
| |
| checkInternal(e.txKey()); |
| |
| GridCacheContext cacheCtx = e.context(); |
| |
| GridDhtCacheAdapter dhtCache; |
| |
| if (cacheCtx.isNear()) |
| dhtCache = cacheCtx.near().dht(); |
| else |
| dhtCache = cacheCtx.dht(); |
| |
| try { |
| IgniteTxEntry enlisted = entry(e.txKey()); |
| |
| if (enlisted == null) { |
| // First time this key is seen: bind the entry to a DHT cache entry and enlist it. |
| addActiveCache(dhtCache.context(), false); |
| |
| GridDhtCacheEntry cached = dhtCache.entryExx(e.key(), topologyVersion()); |
| |
| e.cached(cached); |
| |
| GridCacheVersion explicitVer = e.explicitVersion(); |
| |
| if (explicitVer != null) { |
| GridCacheVersion dhtVer = cctx.mvcc().mappedVersion(explicitVer); |
| |
| if (dhtVer == null) |
| throw new IgniteCheckedException("Failed to find dht mapping for explicit entry version: " + e); |
| |
| e.explicitVersion(dhtVer); |
| } |
| |
| txState.addEntry(e); |
| |
| if (log.isDebugEnabled()) |
| log.debug("Added entry to transaction: " + e); |
| |
| return; |
| } |
| |
| // Key is already enlisted: refresh the existing entry from the incoming one. |
| enlisted.op(e.op()); // Absolutely must set operation, as default is DELETE. |
| |
| enlisted.value(e.value(), e.hasWriteValue(), e.hasReadValue()); |
| enlisted.entryProcessors(e.entryProcessors()); |
| enlisted.ttl(e.ttl()); |
| enlisted.filters(e.filters()); |
| enlisted.expiry(e.expiry()); |
| |
| enlisted.conflictExpireTime(e.conflictExpireTime()); |
| enlisted.conflictVersion(e.conflictVersion()); |
| } |
| catch (GridDhtInvalidPartitionException ex) { |
| throw new IgniteCheckedException(ex); |
| } |
| } |
| |
| /** |
| * Enlists the given entries into this transaction and then acquires locks on them asynchronously. |
| * <p> |
| * Entries whose keys are already enlisted in this transaction are skipped and not locked again. |
| * While entries are being enlisted, an enlist future is installed as the current lock future; |
| * it is cleared and completed before lock acquisition starts. If the lock future cannot be installed, |
| * the returned future is completed with a timeout or rollback exception. |
| * |
| * @param cacheCtx Cache context. |
| * @param entries Entries to lock. |
| * @param msgId Message ID. |
| * @param read Read flag. |
| * @param needRetVal Return value flag. |
| * @param createTtl TTL for create operation. |
| * @param accessTtl TTL for read operation. |
| * @param skipStore Skip store flag. |
| * @param keepBinary Keep binary flag. |
| * @param nearCache {@code True} if near cache enabled on originating node. |
| * @return Lock future. |
| */ |
| @SuppressWarnings("ForLoopReplaceableByForEach") |
| IgniteInternalFuture<GridCacheReturn> lockAllAsync( |
| GridCacheContext cacheCtx, |
| List<GridCacheEntryEx> entries, |
| long msgId, |
| final boolean read, |
| final boolean needRetVal, |
| long createTtl, |
| long accessTtl, |
| boolean skipStore, |
| boolean keepBinary, |
| boolean nearCache |
| ) { |
| try { |
| checkValid(); |
| } |
| catch (IgniteCheckedException e) { |
| return new GridFinishedFuture<>(e); |
| } |
| |
| final GridCacheReturn ret = new GridCacheReturn(localResult(), false); |
| |
| if (F.isEmpty(entries)) |
| return new GridFinishedFuture<>(ret); |
| |
| init(); |
| |
| onePhaseCommit(onePhaseCommit); |
| |
| try { |
| GridFutureAdapter<GridCacheReturn> enlistFut = new GridFutureAdapter<>(); |
| |
| if (!updateLockFuture(null, enlistFut)) |
| return finishFuture(enlistFut, timedOut() ? timeoutException() : rollbackException(), false); |
| |
| Set<KeyCacheObject> skipped = null; |
| |
| try { |
| AffinityTopologyVersion topVer = topologyVersion(); |
| |
| GridDhtCacheAdapter dhtCache = cacheCtx.isNear() ? cacheCtx.near().dht() : cacheCtx.dht(); |
| |
| // Enlist locks into transaction. |
| for (int i = 0; i < entries.size(); i++) { |
| GridCacheEntryEx entry = entries.get(i); |
| |
| KeyCacheObject key = entry.key(); |
| |
| IgniteTxEntry txEntry = entry(entry.txKey()); |
| |
| // First time access. |
| if (txEntry == null) { |
| GridDhtCacheEntry cached; |
| |
| while (true) { |
| try { |
| cached = dhtCache.entryExx(key, topVer); |
| |
| cached.unswap(read); |
| |
| break; |
| } |
| catch (GridCacheEntryRemovedException ignore) { |
| if (log.isDebugEnabled()) |
| log.debug("Get removed entry: " + key); |
| } |
| } |
| |
| addActiveCache(dhtCache.context(), false); |
| |
| txEntry = addEntry(NOOP, |
| null, |
| null, |
| null, |
| cached, |
| null, |
| CU.empty0(), |
| false, |
| -1L, |
| -1L, |
| null, |
| skipStore, |
| keepBinary, |
| nearCache); |
| |
| if (read) |
| txEntry.ttl(accessTtl); |
| |
| txEntry.cached(cached); |
| |
| addReader(msgId, cached, txEntry, topVer); |
| } |
| else { |
| if (skipped == null) |
| skipped = new GridLeanSet<>(); |
| |
| skipped.add(key); |
| } |
| } |
| } |
| finally { |
| finishFuture(enlistFut, null, true); |
| } |
| |
| assert pessimistic(); |
| |
| Collection<KeyCacheObject> keys = F.viewReadOnly(entries, CU.entry2Key()); |
| |
| // Acquire locks only after having added operation to the write set. |
| // Otherwise, during rollback we will not know whether locks need |
| // to be rolled back. |
| // Loose all skipped and previously locked (we cannot reenter locks here). |
| final Collection<KeyCacheObject> passedKeys = skipped != null ? F.view(keys, F0.notIn(skipped)) : keys; |
| |
| if (log.isDebugEnabled()) |
| log.debug("Lock keys: " + passedKeys); |
| |
| return obtainLockAsync(cacheCtx, |
| ret, |
| passedKeys, |
| read, |
| needRetVal, |
| createTtl, |
| accessTtl, |
| skipStore, |
| keepBinary); |
| } |
| catch (IgniteCheckedException e) { |
| setRollbackOnly(); |
| |
| return new GridFinishedFuture<>(e); |
| } |
| } |
| |
| /** |
| * Acquires transaction locks on the given keys and, once they are acquired, runs post-lock processing. |
| * Returns an already finished future when there is nothing to lock, when the transaction has no |
| * remaining time, or when it is marked rollback-only. |
| * |
| * @param cacheCtx Context. |
| * @param ret Return value. |
| * @param passedKeys Passed keys. |
| * @param read {@code True} if read. |
| * @param needRetVal Return value flag. |
| * @param createTtl TTL for create operation. |
| * @param accessTtl TTL for read operation. |
| * @param skipStore Skip store flag. |
| * @param keepBinary Keep binary flag. |
| * @return Future for lock acquisition. |
| */ |
| private IgniteInternalFuture<GridCacheReturn> obtainLockAsync( |
| final GridCacheContext cacheCtx, |
| GridCacheReturn ret, |
| final Collection<KeyCacheObject> passedKeys, |
| final boolean read, |
| final boolean needRetVal, |
| final long createTtl, |
| final long accessTtl, |
| boolean skipStore, |
| boolean keepBinary) { |
| if (log.isDebugEnabled()) |
| log.debug("Before acquiring transaction lock on keys [keys=" + passedKeys + ']'); |
| |
| // Nothing to lock. |
| if (passedKeys.isEmpty()) |
| return new GridFinishedFuture<>(ret); |
| |
| GridDhtTransactionalCacheAdapter<?, ?> txCache; |
| |
| if (cacheCtx.isNear()) |
| txCache = cacheCtx.nearTx().dht(); |
| else |
| txCache = cacheCtx.dhtTx(); |
| |
| long remaining = remainingTime(); |
| |
| if (remaining == -1) |
| return new GridFinishedFuture<>(timeoutException()); |
| |
| if (isRollbackOnly()) |
| return new GridFinishedFuture<>(rollbackException()); |
| |
| IgniteInternalFuture<Boolean> acquireFut = txCache.lockAllAsyncInternal( |
| passedKeys, remaining, this, isInvalidate(), read, needRetVal, isolation, |
| createTtl, accessTtl, CU.empty0(), skipStore, keepBinary); |
| |
| // Runs after the locks are acquired and yields the return value given to this method. |
| PLC1<GridCacheReturn> postLockClo = new PLC1<GridCacheReturn>(ret) { |
| @Override protected GridCacheReturn postLock(GridCacheReturn res) throws IgniteCheckedException { |
| if (log.isDebugEnabled()) |
| log.debug("Acquired transaction lock on keys: " + passedKeys); |
| |
| postLockWrite(cacheCtx, passedKeys, res, /*remove*/false, /*retval*/false, /*read*/read, |
| accessTtl, CU.empty0(), /*computeInvoke*/false); |
| |
| return res; |
| } |
| }; |
| |
| return new GridEmbeddedFuture<>(acquireFut, postLockClo); |
| } |
| |
| /** |
| * {@inheritDoc} |
| * <p> |
| * Moves the transaction to {@code COMMITTING} or {@code ROLLING_BACK}, performs the user commit or |
| * rollback and then moves it to the final state. A commit of a rollback-only transaction is executed |
| * as a rollback. For one-phase commit the {@code COMMITTED} state is not set here. |
| * |
| * @return {@code False} if the state transition was refused because the transaction is already being |
| * finished, {@code true} otherwise. |
| * @throws IgniteCheckedException If the transaction is in an invalid state or the user commit or |
| * rollback failed; in the latter case the state is set to {@code UNKNOWN}. |
| */ |
| @Override public boolean localFinish(boolean commit, boolean clearThreadMap) throws IgniteCheckedException { |
| if (log.isDebugEnabled()) |
| log.debug("Finishing dht local tx [tx=" + this + ", commit=" + commit + "]"); |
| |
| if (optimistic()) |
| state(PREPARED); |
| |
| if (commit) { |
| if (!state(COMMITTING)) { |
| TransactionState state = state(); |
| |
| // Report the state that was actually checked rather than re-reading it, |
| // so that the message cannot disagree with the condition that failed. |
| if (state != COMMITTING && state != COMMITTED) |
| throw new IgniteCheckedException("Invalid transaction state for commit [state=" + state + |
| ", tx=" + this + ']'); |
| else { |
| if (log.isDebugEnabled()) |
| log.debug("Invalid transaction state for commit (another thread is committing): " + this); |
| |
| return false; |
| } |
| } |
| } |
| else { |
| if (!state(ROLLING_BACK)) { |
| if (log.isDebugEnabled()) |
| log.debug("Invalid transaction state for rollback [state=" + state() + ", tx=" + this + ']'); |
| |
| return false; |
| } |
| } |
| |
| IgniteCheckedException err = null; |
| |
| // Commit to DB first. This way if there is a failure, transaction |
| // won't be committed. |
| try { |
| if (commit && !isRollbackOnly()) |
| userCommit(); |
| else |
| userRollback(clearThreadMap); |
| } |
| catch (IgniteCheckedException e) { |
| err = e; |
| |
| // From here on the transaction is treated as not committed. |
| commit = false; |
| |
| // If heuristic error. |
| if (!isRollbackOnly()) { |
| systemInvalidate(true); |
| |
| U.warn(log, "Set transaction invalidation flag to true due to error [tx=" + CU.txString(this) + |
| ", err=" + err + ']'); |
| } |
| } |
| |
| if (err != null) { |
| state(UNKNOWN); |
| |
| throw err; |
| } |
| else { |
| // Committed state will be set in finish future onDone callback. |
| if (commit) { |
| if (!onePhaseCommit()) { |
| if (!state(COMMITTED)) { |
| state(UNKNOWN); |
| |
| throw new IgniteCheckedException("Invalid transaction state for commit: " + this); |
| } |
| } |
| } |
| else { |
| if (!state(ROLLED_BACK)) { |
| state(UNKNOWN); |
| |
| throw new IgniteCheckedException("Invalid transaction state for rollback: " + this); |
| } |
| } |
| } |
| |
| return true; |
| } |
| |
| /** |
| * Removes previously created prepare future from atomic reference. |
| * Implemented by subclasses. |
| * |
| * @param fut Expected future. |
| */ |
| protected abstract void clearPrepareFuture(GridDhtTxPrepareFuture fut); |
| |
| /** |
| * Checks whether this transaction finishes on prepare step: it must be a one-phase commit, |
| * not a near transaction, and the originating node must have no near cache participating in it. |
| * |
| * @return {@code True} if transaction is finished on prepare step. |
| */ |
| public final boolean commitOnPrepare() { |
| return onePhaseCommit() && !near() && !nearOnOriginatingNode; |
| } |
| |
| /** |
| * Gets the current value of the lock future reference. |
| * |
| * @return Lock future; may be {@code null} or the {@link #ROLLBACK_FUT} marker. |
| */ |
| public IgniteInternalFuture<?> lockFuture() { |
| return lockFut; |
| } |
| |
| /** |
| * Atomically updates lock future: the new future is set only if the current one is {@code oldFut} |
| * (compared by reference). |
| * |
| * @param oldFut Old future. |
| * @param newFut New future. |
| * @return {@code true} If future was changed. |
| */ |
| public boolean updateLockFuture(IgniteInternalFuture<?> oldFut, IgniteInternalFuture<?> newFut) { |
| return LOCK_FUT_UPD.compareAndSet(this, oldFut, newFut); |
| } |
| |
| /** |
| * Resets the lock future to {@code null}. Nothing is changed if there is no lock future, if the |
| * rollback marker is installed, or if a condition is given and the current future is a different one. |
| * |
| * @param cond Future that must currently be installed for the clear to happen, |
| * or {@code null} to clear unconditionally. |
| */ |
| public void clearLockFuture(@Nullable IgniteInternalFuture cond) { |
| for (;;) { |
| IgniteInternalFuture cur = lockFut; |
| |
| // Nothing to clear, or rollback marker must stay in place. |
| if (cur == null || cur == ROLLBACK_FUT) |
| return; |
| |
| // Somebody else's future is installed. |
| if (cond != null && cur != cond) |
| return; |
| |
| // Retry if the reference changed concurrently. |
| if (updateLockFuture(cur, null)) |
| return; |
| } |
| } |
| |
| /** |
| * Optionally clears the lock future (unconditionally, see {@link #clearLockFuture(IgniteInternalFuture)}) |
| * and then completes the given future with the given error. |
| * |
| * @param f Future to finish. |
| * @param err Error; callers in this class also pass {@code null}. |
| * @param clearLockFut {@code True} if need to clear lock future. |
| * @return Finished future (the same instance as {@code f}). |
| */ |
| public <T> GridFutureAdapter<T> finishFuture(GridFutureAdapter<T> f, Throwable err, boolean clearLockFut) { |
| if (clearLockFut) |
| clearLockFuture(null); |
| |
| f.onDone(err); |
| |
| return f; |
| } |
| |
| /** |
| * Prepare async rollback. |
| * |
| * @return Current lock future or null if it's safe to roll back. |
| */ |
| @Nullable public IgniteInternalFuture<?> tryRollbackAsync() { |
| while (true) { |
| final IgniteInternalFuture fut = lockFut; |
| |
| if (fut == ROLLBACK_FUT) |
| return null; |
| else if (updateLockFuture(fut, ROLLBACK_FUT)) |
| return fut; |
| } |
| } |
| |
| /** |
| * Chains the prepare result after the transaction finish when the transaction is finished on prepare step. |
| * |
| * @param prepFut Prepare future. |
| * @return If transaction is finished on prepare step, a future which is completed after transaction finish |
| * and yields the prepare result; otherwise the prepare future itself. |
| */ |
| protected final IgniteInternalFuture<GridNearTxPrepareResponse> chainOnePhasePrepare( |
| final GridDhtTxPrepareFuture prepFut) { |
| if (!commitOnPrepare()) |
| return prepFut; |
| |
| CX1<IgniteInternalFuture<IgniteInternalTx>, GridNearTxPrepareResponse> prepRes = |
| new CX1<IgniteInternalFuture<IgniteInternalTx>, GridNearTxPrepareResponse>() { |
| @Override public GridNearTxPrepareResponse applyx(IgniteInternalFuture<IgniteInternalTx> finishFut) |
| throws IgniteCheckedException { |
| return prepFut.get(); |
| } |
| }; |
| |
| return finishFuture().chain(prepRes); |
| } |
| |
| /** |
| * {@inheritDoc} |
| * <p> |
| * Includes the IDs of mapped near and DHT nodes, the explicit lock flag and the superclass representation. |
| * The mapping fields are read directly, so {@link #mapExplicitLocks()} is not triggered from here. |
| */ |
| @Override public String toString() { |
| return GridToStringBuilder.toString(GridDhtTxLocalAdapter.class, this, "nearNodes", nearMap.keySet(), |
| "dhtNodes", dhtMap.keySet(), "explicitLock", explicitLock, "super", super.toString()); |
| } |
| |
| /** |
| * Increments lock counter. |
| * Counters are requested with {@code txCounters(true)}; presumably this creates them if absent |
| * (NOTE(review): confirm against the superclass). |
| */ |
| public void incrementLockCounter() { |
| txCounters(true).incrementLockCounter(); |
| } |
| |
| /** |
| * Reads the lock counter without forcing counters to be available. |
| * |
| * @return Current value of lock counter, or {@code 0} if the transaction has no counters. |
| */ |
| public int lockCounter() { |
| TxCounters cntrs = txCounters(false); |
| |
| if (cntrs == null) |
| return 0; |
| |
| return cntrs.lockCounter(); |
| } |
| } |