| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.ignite.internal.processors.cache.distributed.near; |
| |
| import java.util.Collection; |
| import java.util.HashMap; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.UUID; |
| import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; |
| import org.apache.ignite.IgniteCheckedException; |
| import org.apache.ignite.cluster.ClusterNode; |
| import org.apache.ignite.cluster.ClusterTopologyException; |
| import org.apache.ignite.internal.IgniteInternalFuture; |
| import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; |
| import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException; |
| import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; |
| import org.apache.ignite.internal.processors.cache.GridCacheContext; |
| import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; |
| import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException; |
| import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate; |
| import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; |
| import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxMapping; |
| import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxMapping; |
| import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; |
| import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry; |
| import org.apache.ignite.internal.processors.tracing.MTC; |
| import org.apache.ignite.internal.transactions.IgniteTxOptimisticCheckedException; |
| import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException; |
| import org.apache.ignite.internal.util.future.GridCompoundFuture; |
| import org.apache.ignite.internal.util.future.GridFinishedFuture; |
| import org.apache.ignite.internal.util.future.GridFutureAdapter; |
| import org.apache.ignite.internal.util.tostring.GridToStringExclude; |
| import org.apache.ignite.internal.util.tostring.GridToStringInclude; |
| import org.apache.ignite.internal.util.typedef.C1; |
| import org.apache.ignite.internal.util.typedef.CI1; |
| import org.apache.ignite.internal.util.typedef.F; |
| import org.apache.ignite.internal.util.typedef.X; |
| import org.apache.ignite.internal.util.typedef.internal.CU; |
| import org.apache.ignite.internal.util.typedef.internal.S; |
| import org.apache.ignite.internal.util.typedef.internal.U; |
| import org.apache.ignite.lang.IgniteBiInClosure; |
| import org.apache.ignite.lang.IgniteReducer; |
| import org.jetbrains.annotations.Nullable; |
| import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM; |
| import static org.apache.ignite.internal.processors.tracing.MTC.TraceSurroundings; |
| import static org.apache.ignite.transactions.TransactionState.PREPARED; |
| import static org.apache.ignite.transactions.TransactionState.PREPARING; |
| |
| /** |
| * |
| */ |
| public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptimisticTxPrepareFutureAdapter { |
| /** */ |
| @GridToStringExclude |
| private ClientRemapFuture remapFut; |
| |
| /** */ |
| private int miniId; |
| |
| /** |
| * @param cctx Context. |
| * @param tx Transaction. |
| */ |
| public GridNearOptimisticSerializableTxPrepareFuture(GridCacheSharedContext cctx, |
| GridNearTxLocal tx) { |
| super(cctx, tx); |
| |
| assert tx.optimistic() && tx.serializable() : tx; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected boolean ignoreFailure(Throwable err) { |
| return IgniteCheckedException.class.isAssignableFrom(err.getClass()); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean onOwnerChanged(GridCacheEntryEx entry, GridCacheMvccCandidate owner) { |
| if (log.isDebugEnabled()) |
| log.debug("Transaction future received owner changed callback: " + entry); |
| |
| if (entry.context().isNear() && owner != null) { |
| IgniteTxEntry txEntry = tx.entry(entry.txKey()); |
| |
| if (txEntry != null) { |
| if (keyLockFut != null) |
| keyLockFut.onKeyLocked(entry.txKey()); |
| |
| return true; |
| } |
| } |
| |
| return false; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean onNodeLeft(UUID nodeId) { |
| boolean found = false; |
| |
| for (IgniteInternalFuture<?> fut : futures()) { |
| if (isMini(fut)) { |
| MiniFuture f = (MiniFuture)fut; |
| |
| if (f.primary().id().equals(nodeId)) { |
| ClusterTopologyCheckedException e = new ClusterTopologyCheckedException("Remote node left grid: " + |
| nodeId); |
| |
| e.retryReadyFuture(cctx.nextAffinityReadyFuture(tx.topologyVersion())); |
| |
| f.onNodeLeft(e); |
| |
| found = true; |
| } |
| } |
| } |
| |
| return found; |
| } |
| |
| /** |
| * @param m Failed mapping. |
| * @param e Error. |
| */ |
| private void onError(@Nullable GridDistributedTxMapping m, Throwable e) { |
| try (TraceSurroundings ignored = MTC.support(span)) { |
| if (X.hasCause(e, ClusterTopologyCheckedException.class) || X.hasCause(e, ClusterTopologyException.class)) { |
| if (tx.onePhaseCommit()) { |
| tx.markForBackupCheck(); |
| |
| onComplete(); |
| |
| return; |
| } |
| } |
| |
| if (e instanceof IgniteTxOptimisticCheckedException) { |
| if (m != null) |
| tx.removeMapping(m.primary().id()); |
| } |
| |
| prepareError(e); |
| } |
| } |
| |
| /** |
| * @param e Error. |
| */ |
| private void prepareError(Throwable e) { |
| ERR_UPD.compareAndSet(this, null, e); |
| |
| if (keyLockFut != null) |
| keyLockFut.onDone(e); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void onResult(UUID nodeId, GridNearTxPrepareResponse res) { |
| if (!isDone()) { |
| MiniFuture mini = miniFuture(res.miniId()); |
| |
| if (mini != null) |
| mini.onResult(res, true); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean onDone(IgniteInternalTx t, Throwable err) { |
| try (TraceSurroundings ignored = MTC.support(span)) { |
| if (isDone()) |
| return false; |
| |
| if (err != null) { |
| ERR_UPD.compareAndSet(this, null, err); |
| |
| if (keyLockFut != null) |
| keyLockFut.onDone(err); |
| } |
| |
| return onComplete(); |
| } |
| } |
| |
| /** |
| * Finds pending mini future by the given mini ID. |
| * |
| * @param miniId Mini ID to find. |
| * @return Mini future. |
| */ |
| private MiniFuture miniFuture(int miniId) { |
| // We iterate directly over the futs collection here to avoid copy. |
| compoundsReadLock(); |
| |
| try { |
| int size = futuresCountNoLock(); |
| |
| // Avoid iterator creation. |
| for (int i = 0; i < size; i++) { |
| IgniteInternalFuture fut = future(i); |
| |
| if (!isMini(fut)) |
| continue; |
| |
| MiniFuture mini = (MiniFuture)fut; |
| |
| if (mini.futureId() == miniId) { |
| if (!mini.isDone()) |
| return mini; |
| else |
| return null; |
| } |
| } |
| } |
| finally { |
| compoundsReadUnlock(); |
| } |
| |
| return null; |
| } |
| |
| /** |
| * @param f Future. |
| * @return {@code True} if mini-future. |
| */ |
| private boolean isMini(IgniteInternalFuture<?> f) { |
| return f.getClass().equals(MiniFuture.class); |
| } |
| |
| /** |
| * Completeness callback. |
| * |
| * @return {@code True} if future was finished by this call. |
| */ |
| private boolean onComplete() { |
| Throwable err0 = err; |
| |
| if ((!tx.onePhaseCommit() || tx.mappings().get(cctx.localNodeId()) == null) && |
| (err0 == null || tx.needCheckBackup())) |
| tx.state(PREPARED); |
| |
| if (super.onDone(tx, err0)) { |
| if (err0 != null) |
| tx.setRollbackOnly(); |
| |
| // Don't forget to clean up. |
| cctx.mvcc().removeVersionedFuture(this); |
| |
| return true; |
| } |
| |
| return false; |
| } |
| |
| /** |
| * Initializes future. |
| * |
| * @param remap Remap flag. |
| */ |
| @Override protected void prepare0(boolean remap, boolean topLocked) { |
| boolean txStateCheck = remap ? tx.state() == PREPARING : tx.state(PREPARING); |
| |
| if (!txStateCheck) { |
| if (tx.isRollbackOnly() || tx.setRollbackOnly()) { |
| if (tx.timedOut()) |
| onDone(null, tx.timeoutException()); |
| else |
| onDone(null, tx.rollbackException()); |
| } |
| else |
| onDone(null, new IgniteCheckedException("Invalid transaction state for " + |
| "prepare [state=" + tx.state() + ", tx=" + this + ']')); |
| |
| return; |
| } |
| |
| boolean set = cctx.tm().setTxTopologyHint(tx.topologyVersionSnapshot()); |
| |
| try { |
| prepare(tx.readEntries(), tx.writeEntries(), remap, topLocked); |
| |
| markInitialized(); |
| } |
| finally { |
| if (set) |
| cctx.tm().setTxTopologyHint(null); |
| } |
| } |
| |
| /** |
| * @param reads Read entries. |
| * @param writes Write entries. |
| * @param remap Remap flag. |
| * @param topLocked Topology locked flag. |
| */ |
| @SuppressWarnings("unchecked") |
| private void prepare( |
| Iterable<IgniteTxEntry> reads, |
| Iterable<IgniteTxEntry> writes, |
| boolean remap, |
| boolean topLocked |
| ) { |
| AffinityTopologyVersion topVer = tx.topologyVersion(); |
| |
| assert topVer.topologyVersion() > 0; |
| |
| GridDhtTxMapping txMapping = new GridDhtTxMapping(); |
| |
| Map<UUID, GridDistributedTxMapping> mappings = new HashMap<>(); |
| |
| boolean hasNearCache = false; |
| |
| for (IgniteTxEntry write : writes) { |
| map(write, topVer, mappings, txMapping, remap, topLocked); |
| |
| if (write.context().isNear()) |
| hasNearCache = true; |
| } |
| |
| for (IgniteTxEntry read : reads) { |
| map(read, topVer, mappings, txMapping, remap, topLocked); |
| |
| if (read.context().isNear()) |
| hasNearCache = true; |
| } |
| |
| if (keyLockFut != null) |
| keyLockFut.onAllKeysAdded(); |
| |
| if (isDone()) { |
| if (log.isDebugEnabled()) |
| log.debug("Abandoning (re)map because future is done: " + this); |
| |
| return; |
| } |
| |
| tx.addEntryMapping(mappings.values()); |
| |
| cctx.mvcc().recheckPendingLocks(); |
| |
| tx.transactionNodes(txMapping.transactionNodes()); |
| |
| if (!hasNearCache) |
| checkOnePhase(txMapping); |
| |
| MiniFuture locNearEntriesFut = null; |
| |
| // Create futures in advance to have all futures when process {@link GridNearTxPrepareResponse#clientRemapVersion}. |
| for (GridDistributedTxMapping m : mappings.values()) { |
| assert !m.empty(); |
| |
| MiniFuture fut = new MiniFuture(this, m, ++miniId); |
| |
| add((IgniteInternalFuture)fut); |
| |
| if (m.primary().isLocal() && m.hasNearCacheEntries() && m.hasColocatedCacheEntries()) { |
| assert locNearEntriesFut == null; |
| |
| locNearEntriesFut = fut; |
| |
| add((IgniteInternalFuture)new MiniFuture(this, m, ++miniId)); |
| } |
| } |
| |
| Collection<IgniteInternalFuture<?>> futs = (Collection)futures(); |
| |
| Iterator<IgniteInternalFuture<?>> it = futs.iterator(); |
| |
| while (it.hasNext()) { |
| IgniteInternalFuture<?> fut0 = it.next(); |
| |
| if (skipFuture(remap, fut0)) |
| continue; |
| |
| MiniFuture fut = (MiniFuture)fut0; |
| |
| IgniteCheckedException err = prepare(fut, txMapping.transactionNodes(), locNearEntriesFut); |
| |
| if (err != null) { |
| while (it.hasNext()) { |
| fut0 = it.next(); |
| |
| if (skipFuture(remap, fut0)) |
| continue; |
| |
| fut = (MiniFuture)fut0; |
| |
| tx.removeMapping(fut.mapping().primary().id()); |
| |
| fut.onResult(new IgniteCheckedException("Failed to prepare transaction.", err)); |
| } |
| |
| break; |
| } |
| } |
| |
| markInitialized(); |
| } |
| |
| /** |
| * @param remap Remap flag. |
| * @param fut Future. |
| * @return {@code True} if skip future during remap. |
| */ |
| private boolean skipFuture(boolean remap, IgniteInternalFuture<?> fut) { |
| return !(isMini(fut)) || (remap && (((MiniFuture)fut).rcvRes == 1)); |
| } |
| |
| /** |
| * @param fut Mini future. |
| * @param txNodes Tx nodes. |
| * @param locNearEntriesFut Local future for near cache entries prepare. |
| * @return Prepare error if any. |
| */ |
| @Nullable private IgniteCheckedException prepare(final MiniFuture fut, |
| Map<UUID, Collection<UUID>> txNodes, |
| @Nullable MiniFuture locNearEntriesFut) { |
| GridDistributedTxMapping m = fut.mapping(); |
| |
| final ClusterNode primary = m.primary(); |
| |
| long timeout = tx.remainingTime(); |
| |
| if (timeout == -1) { |
| IgniteCheckedException err = tx.timeoutException(); |
| |
| fut.onResult(err); |
| |
| return err; |
| } |
| |
| // Must lock near entries separately. |
| if (m.hasNearCacheEntries()) { |
| try { |
| cctx.tm().prepareTx(tx, m.nearCacheEntries()); |
| } |
| catch (IgniteCheckedException e) { |
| fut.onResult(e); |
| |
| return e; |
| } |
| } |
| |
| if (primary.isLocal()) { |
| if (locNearEntriesFut != null) { |
| boolean nearEntries = fut == locNearEntriesFut; |
| |
| GridNearTxPrepareRequest req = createRequest(txNodes, |
| fut, |
| timeout, |
| nearEntries ? m.nearEntriesReads() : m.colocatedEntriesReads(), |
| nearEntries ? m.nearEntriesWrites() : m.colocatedEntriesWrites()); |
| |
| prepareLocal(req, fut, nearEntries); |
| } |
| else { |
| GridNearTxPrepareRequest req = createRequest(txNodes, |
| fut, |
| timeout, |
| m.reads(), |
| m.writes()); |
| |
| prepareLocal(req, fut, m.hasNearCacheEntries()); |
| } |
| } |
| else { |
| try { |
| GridNearTxPrepareRequest req = createRequest(txNodes, |
| fut, |
| timeout, |
| m.reads(), |
| m.writes()); |
| |
| cctx.io().send(primary, req, tx.ioPolicy()); |
| } |
| catch (ClusterTopologyCheckedException e) { |
| e.retryReadyFuture(cctx.nextAffinityReadyFuture(tx.topologyVersion())); |
| |
| fut.onNodeLeft(e); |
| |
| return e; |
| } |
| catch (IgniteCheckedException e) { |
| fut.onResult(e); |
| |
| return e; |
| } |
| } |
| |
| return null; |
| } |
| |
| /** |
| * @param txNodes Tx nodes. |
| * @param fut Future. |
| * @param timeout Timeout. |
| * @param reads Read entries. |
| * @param writes Write entries. |
| * @return Request. |
| */ |
| private GridNearTxPrepareRequest createRequest( |
| Map<UUID, Collection<UUID>> txNodes, |
| MiniFuture fut, |
| long timeout, |
| Collection<IgniteTxEntry> reads, |
| Collection<IgniteTxEntry> writes) { |
| GridDistributedTxMapping m = fut.mapping(); |
| |
| GridNearTxPrepareRequest req = new GridNearTxPrepareRequest( |
| futId, |
| tx.topologyVersion(), |
| tx, |
| timeout, |
| reads, |
| writes, |
| m.hasNearCacheEntries(), |
| txNodes, |
| m.last(), |
| tx.onePhaseCommit(), |
| tx.needReturnValue() && tx.implicit(), |
| tx.implicitSingle(), |
| m.explicitLock(), |
| tx.taskNameHash(), |
| m.clientFirst(), |
| txNodes.size() == 1, |
| tx.activeCachesDeploymentEnabled(), |
| tx.txState().recovery()); |
| |
| for (IgniteTxEntry txEntry : writes) { |
| if (txEntry.op() == TRANSFORM) |
| req.addDhtVersion(txEntry.txKey(), null); |
| } |
| |
| req.miniId(fut.futureId()); |
| |
| return req; |
| } |
| |
| /** |
| * @param req Request. |
| * @param fut Future. |
| * @param nearEntries {@code True} if prepare near cache entries. |
| */ |
| private void prepareLocal(GridNearTxPrepareRequest req, |
| final MiniFuture fut, |
| final boolean nearEntries) { |
| IgniteInternalFuture<GridNearTxPrepareResponse> prepFut = nearEntries ? |
| cctx.tm().txHandler().prepareNearTxLocal(tx, req) : |
| cctx.tm().txHandler().prepareColocatedTx(tx, req); |
| |
| prepFut.listen(new CI1<IgniteInternalFuture<GridNearTxPrepareResponse>>() { |
| @Override public void apply(IgniteInternalFuture<GridNearTxPrepareResponse> prepFut) { |
| try { |
| fut.onResult(prepFut.get(), nearEntries); |
| } |
| catch (IgniteCheckedException e) { |
| fut.onResult(e); |
| } |
| } |
| }); |
| } |
| |
| /** |
| * @param entry Transaction entry. |
| * @param topVer Topology version. |
| * @param curMapping Current mapping. |
| * @param txMapping Mapping. |
| * @param remap Remap flag. |
| * @param topLocked Topology locked flag. |
| */ |
| private void map( |
| IgniteTxEntry entry, |
| AffinityTopologyVersion topVer, |
| Map<UUID, GridDistributedTxMapping> curMapping, |
| GridDhtTxMapping txMapping, |
| boolean remap, |
| boolean topLocked |
| ) { |
| GridCacheContext cacheCtx = entry.context(); |
| |
| List<ClusterNode> nodes = cacheCtx.topology().nodes(cacheCtx.affinity().partition(entry.key()), topVer); |
| |
| if (F.isEmpty(nodes)) { |
| onDone(new ClusterTopologyServerNotFoundException("Failed to map keys to nodes " + |
| "(partition is not mapped to any node) [key=" + entry.key() + |
| ", partition=" + cacheCtx.affinity().partition(entry.key()) + ", topVer=" + topVer + ']')); |
| |
| return; |
| } |
| |
| txMapping.addMapping(nodes); |
| |
| ClusterNode primary = F.first(nodes); |
| |
| assert primary != null; |
| |
| if (log.isDebugEnabled()) { |
| log.debug("Mapped key to primary node [key=" + entry.key() + |
| ", part=" + cacheCtx.affinity().partition(entry.key()) + |
| ", primary=" + U.toShortString(primary) + ", topVer=" + topVer + ']'); |
| } |
| |
| // Must re-initialize cached entry while holding topology lock. |
| if (cacheCtx.isNear()) |
| entry.cached(cacheCtx.nearTx().entryExx(entry.key(), topVer)); |
| else |
| entry.cached(cacheCtx.colocated().entryExx(entry.key(), topVer, true)); |
| |
| if (!remap && cacheCtx.isNear()) { |
| if (entry.explicitVersion() == null) { |
| if (keyLockFut == null) { |
| keyLockFut = new KeyLockFuture(); |
| |
| add((IgniteInternalFuture)keyLockFut); |
| } |
| |
| keyLockFut.addLockKey(entry.txKey()); |
| } |
| } |
| |
| GridDistributedTxMapping cur = curMapping.get(primary.id()); |
| |
| if (cur == null) { |
| cur = new GridDistributedTxMapping(primary); |
| |
| curMapping.put(primary.id(), cur); |
| |
| cur.clientFirst(!topLocked && cctx.kernalContext().clientNode()); |
| |
| cur.last(true); |
| } |
| |
| if (primary.isLocal()) { |
| if (cacheCtx.isNear()) |
| tx.nearLocallyMapped(true); |
| else if (cacheCtx.isColocated()) |
| tx.colocatedLocallyMapped(true); |
| } |
| |
| cur.add(entry); |
| |
| if (entry.explicitVersion() != null) { |
| tx.markExplicit(primary.id()); |
| |
| cur.markExplicitLock(); |
| } |
| |
| entry.nodeId(primary.id()); |
| |
| if (cacheCtx.isNear()) { |
| while (true) { |
| try { |
| GridNearCacheEntry cached = (GridNearCacheEntry)entry.cached(); |
| |
| cached.dhtNodeId(tx.xidVersion(), primary.id()); |
| |
| break; |
| } |
| catch (GridCacheEntryRemovedException ignore) { |
| entry.cached(cacheCtx.near().entryEx(entry.key(), topVer)); |
| } |
| } |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public String toString() { |
| Collection<String> futs = F.viewReadOnly(futures(), |
| new C1<IgniteInternalFuture<?>, String>() { |
| @Override public String apply(IgniteInternalFuture<?> f) { |
| if (isMini(f)) { |
| return "[node=" + ((MiniFuture)f).primary().id() + |
| ", loc=" + ((MiniFuture)f).primary().isLocal() + |
| ", done=" + f.isDone() + |
| ", err=" + f.error() + "]"; |
| } |
| else |
| return f.toString(); |
| } |
| }); |
| |
| return S.toString(GridNearOptimisticSerializableTxPrepareFuture.class, this, |
| "innerFuts", futs, |
| "remap", remapFut != null, |
| "tx", tx, |
| "super", super.toString()); |
| } |
| |
| /** |
| * Client remap future. |
| */ |
| private static class ClientRemapFuture extends GridCompoundFuture<GridNearTxPrepareResponse, Boolean> { |
| /** */ |
| private static final long serialVersionUID = 0L; |
| |
| /** |
| * Constructor. |
| */ |
| ClientRemapFuture() { |
| super(new ClientRemapFutureReducer()); |
| } |
| } |
| |
| /** |
| * Client remap future reducer. |
| */ |
| private static class ClientRemapFutureReducer implements IgniteReducer<GridNearTxPrepareResponse, Boolean> { |
| /** */ |
| private static final long serialVersionUID = 0L; |
| |
| /** Remap flag. */ |
| private boolean remap = true; |
| |
| /** {@inheritDoc} */ |
| @Override public boolean collect(@Nullable GridNearTxPrepareResponse res) { |
| assert res != null; |
| |
| if (res.clientRemapVersion() == null) |
| remap = false; |
| |
| return true; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public Boolean reduce() { |
| return remap; |
| } |
| } |
| |
| /** |
| * |
| */ |
| private static class MiniFuture extends GridFutureAdapter<GridNearTxPrepareResponse> { |
| /** Receive result flag updater. */ |
| private static final AtomicIntegerFieldUpdater<MiniFuture> RCV_RES_UPD = |
| AtomicIntegerFieldUpdater.newUpdater(MiniFuture.class, "rcvRes"); |
| |
| /** */ |
| private final int futId; |
| |
| /** Parent future. */ |
| private final GridNearOptimisticSerializableTxPrepareFuture parent; |
| |
| /** Keys. */ |
| @GridToStringInclude |
| private GridDistributedTxMapping m; |
| |
| /** Flag to signal some result being processed. */ |
| private volatile int rcvRes; |
| |
| /** |
| * @param parent Parent future. |
| * @param m Mapping. |
| * @param futId Mini future ID. |
| */ |
| MiniFuture(GridNearOptimisticSerializableTxPrepareFuture parent, GridDistributedTxMapping m, int futId) { |
| this.parent = parent; |
| this.m = m; |
| this.futId = futId; |
| } |
| |
| /** |
| * @return Future ID. |
| */ |
| int futureId() { |
| return futId; |
| } |
| |
| /** |
| * @return Primary node. |
| */ |
| public ClusterNode primary() { |
| return m.primary(); |
| } |
| |
| /** |
| * @return Keys. |
| */ |
| public GridDistributedTxMapping mapping() { |
| return m; |
| } |
| |
| /** |
| * @param e Error. |
| */ |
| void onResult(Throwable e) { |
| if (RCV_RES_UPD.compareAndSet(this, 0, 1)) { |
| parent.onError(m, e); |
| |
| if (log.isDebugEnabled()) |
| log.debug("Failed to get future result [fut=" + this + ", err=" + e + ']'); |
| |
| // Fail. |
| onDone(e); |
| } |
| else |
| U.warn(log, "Received error after another result has been processed [fut=" + |
| parent + ", mini=" + this + ']', e); |
| } |
| |
| /** |
| * @param e Node failure. |
| */ |
| void onNodeLeft(ClusterTopologyCheckedException e) { |
| if (isDone()) |
| return; |
| |
| if (RCV_RES_UPD.compareAndSet(this, 0, 1)) { |
| if (log.isDebugEnabled()) |
| log.debug("Remote node left grid while sending or waiting for reply (will not retry): " + this); |
| |
| parent.onError(null, e); |
| |
| onDone(e); |
| } |
| } |
| |
| /** |
| * @param res Result callback. |
| * @param updateMapping Update mapping flag. |
| */ |
| @SuppressWarnings({"unchecked"}) |
| void onResult(final GridNearTxPrepareResponse res, boolean updateMapping) { |
| if (isDone()) |
| return; |
| |
| if (RCV_RES_UPD.compareAndSet(this, 0, 1)) { |
| if (res.error() != null) { |
| // Fail the whole compound future. |
| parent.onError(m, res.error()); |
| |
| onDone(res.error()); |
| } |
| else { |
| if (res.clientRemapVersion() != null) { |
| assert parent.cctx.kernalContext().clientNode(); |
| assert m.clientFirst(); |
| |
| parent.tx.removeMapping(m.primary().id()); |
| |
| ClientRemapFuture remapFut0 = null; |
| |
| synchronized (parent) { |
| if (parent.remapFut == null) { |
| parent.remapFut = new ClientRemapFuture(); |
| |
| remapFut0 = parent.remapFut; |
| } |
| } |
| |
| if (remapFut0 != null) { |
| Collection<IgniteInternalFuture<?>> futs = (Collection)parent.futures(); |
| |
| for (IgniteInternalFuture<?> fut : futs) { |
| if (parent.isMini(fut) && fut != this) |
| remapFut0.add((MiniFuture)fut); |
| } |
| |
| remapFut0.markInitialized(); |
| |
| remapFut0.listen(new CI1<IgniteInternalFuture<Boolean>>() { |
| @Override public void apply(IgniteInternalFuture<Boolean> remapFut0) { |
| try { |
| IgniteInternalFuture<?> affFut = |
| parent.cctx.exchange().affinityReadyFuture(res.clientRemapVersion()); |
| |
| if (affFut == null) |
| affFut = new GridFinishedFuture<Object>(); |
| |
| if (parent.remapFut.get()) { |
| if (log.isDebugEnabled()) { |
| log.debug("Will remap client tx [" + |
| "fut=" + parent + |
| ", topVer=" + res.topologyVersion() + ']'); |
| } |
| |
| synchronized (parent) { |
| assert remapFut0 == parent.remapFut; |
| |
| parent.remapFut = null; |
| } |
| |
| parent.cctx.time().waitAsync( |
| affFut, |
| parent.tx.remainingTime(), |
| new IgniteBiInClosure<IgniteCheckedException, Boolean>() { |
| @Override public void apply(IgniteCheckedException e, Boolean timedOut) { |
| if (parent.errorOrTimeoutOnTopologyVersion(e, timedOut)) |
| return; |
| |
| remap(res); |
| } |
| } |
| ); |
| } |
| else { |
| ClusterTopologyCheckedException err0 = new ClusterTopologyCheckedException( |
| "Cluster topology changed while client transaction is preparing."); |
| |
| err0.retryReadyFuture(affFut); |
| |
| parent.prepareError(err0); |
| |
| onDone(err0); |
| } |
| } |
| catch (IgniteCheckedException e) { |
| if (log.isDebugEnabled()) { |
| log.debug("Prepare failed, will not remap tx: " + |
| parent); |
| } |
| |
| parent.prepareError(e); |
| |
| onDone(e); |
| } |
| } |
| }); |
| } |
| else |
| onDone(res); |
| } |
| else { |
| parent.onPrepareResponse(m, res, updateMapping); |
| |
| // Finish this mini future (need result only on client node). |
| onDone(parent.cctx.kernalContext().clientNode() ? res : null); |
| } |
| } |
| } |
| } |
| |
| /** |
| * @param res Response. |
| */ |
| private void remap(final GridNearTxPrepareResponse res) { |
| if (parent.tx.isRollbackOnly()) { |
| onDone(new IgniteTxRollbackCheckedException( |
| "Failed to prepare the transaction, due to the transaction is marked as rolled back " + |
| "[tx=" + CU.txString(parent.tx) + ']')); |
| |
| return; |
| } |
| |
| parent.prepareOnTopology(true, new Runnable() { |
| @Override public void run() { |
| onDone(res); |
| } |
| }); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public String toString() { |
| return S.toString(MiniFuture.class, this, "done", isDone(), "cancelled", isCancelled(), "err", error()); |
| } |
| } |
| } |