blob: 69fb67a8db009c442801e32b777e5ece6e802dfc [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.distributed.near;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.cluster.ClusterTopologyException;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxMapping;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxMapping;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
import org.apache.ignite.internal.processors.tracing.MTC;
import org.apache.ignite.internal.transactions.IgniteTxOptimisticCheckedException;
import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException;
import org.apache.ignite.internal.util.future.GridCompoundFuture;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.C1;
import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiInClosure;
import org.apache.ignite.lang.IgniteReducer;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM;
import static org.apache.ignite.internal.processors.tracing.MTC.TraceSurroundings;
import static org.apache.ignite.transactions.TransactionState.PREPARED;
import static org.apache.ignite.transactions.TransactionState.PREPARING;
/**
 * Prepare future for near transactions that are both optimistic and serializable.
 */
public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptimisticTxPrepareFutureAdapter {
/**
 * Compound future created when a prepare response carries a client remap version. It collects the
 * other mini futures and is reset to {@code null} before the transaction is remapped.
 */
@GridToStringExclude
private ClientRemapFuture remapFut;
/** Counter used to assign IDs to mini futures; it is incremented before each new mini future is created. */
private int miniId;
/**
 * Creates a prepare future for the given near transaction.
 *
 * @param cctx Shared cache context.
 * @param tx Near transaction; asserted to be both optimistic and serializable.
 */
public GridNearOptimisticSerializableTxPrepareFuture(
    GridCacheSharedContext cctx,
    GridNearTxLocal tx
) {
    super(cctx, tx);

    assert tx.optimistic() : tx;
    assert tx.serializable() : tx;
}
/** {@inheritDoc} */
@Override protected boolean ignoreFailure(Throwable err) {
    // True for IgniteCheckedException itself and for any of its subclasses.
    Class<? extends Throwable> errCls = err.getClass();

    return IgniteCheckedException.class.isAssignableFrom(errCls);
}
/** {@inheritDoc} */
@Override public boolean onOwnerChanged(GridCacheEntryEx entry, GridCacheMvccCandidate owner) {
    if (log.isDebugEnabled())
        log.debug("Transaction future received owner changed callback: " + entry);

    // Only near-cache entries that have an owner are handled here.
    if (!entry.context().isNear() || owner == null)
        return false;

    IgniteTxEntry txEntry = tx.entry(entry.txKey());

    // The entry does not belong to this transaction.
    if (txEntry == null)
        return false;

    if (keyLockFut != null)
        keyLockFut.onKeyLocked(entry.txKey());

    return true;
}
/** {@inheritDoc} */
@Override public boolean onNodeLeft(UUID nodeId) {
    boolean notified = false;

    for (IgniteInternalFuture<?> fut : futures()) {
        if (!isMini(fut))
            continue;

        MiniFuture mini = (MiniFuture)fut;

        // Only mini futures whose primary is the departed node are affected.
        if (!mini.primary().id().equals(nodeId))
            continue;

        ClusterTopologyCheckedException topErr =
            new ClusterTopologyCheckedException("Remote node left grid: " + nodeId);

        topErr.retryReadyFuture(cctx.nextAffinityReadyFuture(tx.topologyVersion()));

        mini.onNodeLeft(topErr);

        notified = true;
    }

    return notified;
}
/**
 * Handles a prepare failure reported by a mini future.
 *
 * @param m Failed mapping, may be {@code null}.
 * @param e Error.
 */
private void onError(@Nullable GridDistributedTxMapping m, Throwable e) {
    try (TraceSurroundings ignored = MTC.support(span)) {
        boolean topologyErr =
            X.hasCause(e, ClusterTopologyCheckedException.class) || X.hasCause(e, ClusterTopologyException.class);

        // For a one-phase commit a topology error is not recorded; the tx is marked for backup check instead.
        if (topologyErr && tx.onePhaseCommit()) {
            tx.markForBackupCheck();

            onComplete();

            return;
        }

        // On an optimistic failure drop the mapping of the failed primary, if one was given.
        if (e instanceof IgniteTxOptimisticCheckedException && m != null)
            tx.removeMapping(m.primary().id());

        prepareError(e);
    }
}
/**
 * Records the given error (only if no error was recorded before) and completes the key lock
 * future with it, when such a future exists.
 *
 * @param e Error.
 */
private void prepareError(Throwable e) {
    // Only the first error is kept.
    ERR_UPD.compareAndSet(this, null, e);

    if (keyLockFut != null) {
        keyLockFut.onDone(e);
    }
}
/** {@inheritDoc} */
@Override public void onResult(UUID nodeId, GridNearTxPrepareResponse res) {
    // Responses arriving after completion are dropped.
    if (isDone())
        return;

    MiniFuture target = miniFuture(res.miniId());

    if (target == null)
        return;

    target.onResult(res, true);
}
/** {@inheritDoc} */
@Override public boolean onDone(IgniteInternalTx t, Throwable err) {
    try (TraceSurroundings ignored = MTC.support(span)) {
        if (!isDone()) {
            if (err != null) {
                // Keep only the first error and propagate it to the key lock future.
                ERR_UPD.compareAndSet(this, null, err);

                if (keyLockFut != null)
                    keyLockFut.onDone(err);
            }

            return onComplete();
        }

        return false;
    }
}
/**
 * Looks up a mini future that is still pending by its mini ID.
 *
 * @param miniId Mini ID to find.
 * @return Matching mini future, or {@code null} if there is none or the match is already done.
 */
private MiniFuture miniFuture(int miniId) {
    // Walk the underlying futures by index under the read lock: no copy and no iterator.
    compoundsReadLock();

    try {
        for (int idx = 0, cnt = futuresCountNoLock(); idx < cnt; idx++) {
            IgniteInternalFuture fut = future(idx);

            if (isMini(fut)) {
                MiniFuture candidate = (MiniFuture)fut;

                if (candidate.futureId() == miniId)
                    return candidate.isDone() ? null : candidate;
            }
        }

        return null;
    }
    finally {
        compoundsReadUnlock();
    }
}
/**
 * Checks whether the future is exactly a {@link MiniFuture} (subclasses do not match).
 *
 * @param f Future.
 * @return {@code True} if mini-future.
 */
private boolean isMini(IgniteInternalFuture<?> f) {
    Class<?> futCls = f.getClass();

    return MiniFuture.class.equals(futCls);
}
/**
 * Completes this future with the recorded error (if any) and cleans up.
 *
 * @return {@code True} if future was finished by this call.
 */
private boolean onComplete() {
    Throwable failure = err;

    if (!tx.onePhaseCommit() || tx.mappings().get(cctx.localNodeId()) == null) {
        if (failure == null || tx.needCheckBackup())
            tx.state(PREPARED);
    }

    if (!super.onDone(tx, failure))
        return false;

    if (failure != null)
        tx.setRollbackOnly();

    // Don't forget to clean up.
    cctx.mvcc().removeVersionedFuture(this);

    return true;
}
/**
 * Checks the transaction state and, if it is valid, maps and prepares all transaction entries.
 *
 * @param remap Remap flag. On remap the transaction must already be in {@code PREPARING} state,
 *      otherwise this method tries to switch it to that state.
 * @param topLocked Topology locked flag.
 */
@Override protected void prepare0(boolean remap, boolean topLocked) {
    boolean preparing;

    if (remap)
        preparing = tx.state() == PREPARING;
    else
        preparing = tx.state(PREPARING);

    if (!preparing) {
        if (!tx.isRollbackOnly() && !tx.setRollbackOnly()) {
            onDone(null, new IgniteCheckedException("Invalid transaction state for " +
                "prepare [state=" + tx.state() + ", tx=" + this + ']'));
        }
        else if (tx.timedOut())
            onDone(null, tx.timeoutException());
        else
            onDone(null, tx.rollbackException());

        return;
    }

    boolean hintSet = cctx.tm().setTxTopologyHint(tx.topologyVersionSnapshot());

    try {
        prepare(tx.readEntries(), tx.writeEntries(), remap, topLocked);

        markInitialized();
    }
    finally {
        // Reset the hint only if this call was the one that set it.
        if (hintSet)
            cctx.tm().setTxTopologyHint(null);
    }
}
/**
 * Maps all transaction entries to their primary nodes, creates mini futures for the resulting
 * node mappings and starts the prepare for every mini future that is not skipped.
 *
 * @param reads Read entries.
 * @param writes Write entries.
 * @param remap Remap flag.
 * @param topLocked Topology locked flag.
 */
@SuppressWarnings("unchecked")
private void prepare(
Iterable<IgniteTxEntry> reads,
Iterable<IgniteTxEntry> writes,
boolean remap,
boolean topLocked
) {
AffinityTopologyVersion topVer = tx.topologyVersion();
assert topVer.topologyVersion() > 0;
GridDhtTxMapping txMapping = new GridDhtTxMapping();
// Node mappings built by this call, keyed by primary node ID.
Map<UUID, GridDistributedTxMapping> mappings = new HashMap<>();
boolean hasNearCache = false;
// Writes are mapped first, then reads; remember whether any entry belongs to a near cache.
for (IgniteTxEntry write : writes) {
map(write, topVer, mappings, txMapping, remap, topLocked);
if (write.context().isNear())
hasNearCache = true;
}
for (IgniteTxEntry read : reads) {
map(read, topVer, mappings, txMapping, remap, topLocked);
if (read.context().isNear())
hasNearCache = true;
}
// All keys have been registered with the key lock future (if one was created while mapping).
if (keyLockFut != null)
keyLockFut.onAllKeysAdded();
// Mapping may already have completed this future (e.g. when no node was found for a partition).
if (isDone()) {
if (log.isDebugEnabled())
log.debug("Abandoning (re)map because future is done: " + this);
return;
}
tx.addEntryMapping(mappings.values());
cctx.mvcc().recheckPendingLocks();
tx.transactionNodes(txMapping.transactionNodes());
// The one-phase check is performed only when no near cache entry participates.
if (!hasNearCache)
checkOnePhase(txMapping);
MiniFuture locNearEntriesFut = null;
// Create futures in advance to have all futures when process {@link GridNearTxPrepareResponse#clientRemapVersion}.
for (GridDistributedTxMapping m : mappings.values()) {
assert !m.empty();
MiniFuture fut = new MiniFuture(this, m, ++miniId);
add((IgniteInternalFuture)fut);
// A local mapping holding both near and colocated entries gets a second mini future;
// the first one is remembered as the future that prepares the near entries.
if (m.primary().isLocal() && m.hasNearCacheEntries() && m.hasColocatedCacheEntries()) {
assert locNearEntriesFut == null;
locNearEntriesFut = fut;
add((IgniteInternalFuture)new MiniFuture(this, m, ++miniId));
}
}
Collection<IgniteInternalFuture<?>> futs = (Collection)futures();
Iterator<IgniteInternalFuture<?>> it = futs.iterator();
while (it.hasNext()) {
IgniteInternalFuture<?> fut0 = it.next();
if (skipFuture(remap, fut0))
continue;
MiniFuture fut = (MiniFuture)fut0;
IgniteCheckedException err = prepare(fut, txMapping.transactionNodes(), locNearEntriesFut);
// On the first failure stop sending: every remaining (non-skipped) mini future has its
// mapping removed from the transaction and is failed with an error wrapping the original one.
if (err != null) {
while (it.hasNext()) {
fut0 = it.next();
if (skipFuture(remap, fut0))
continue;
fut = (MiniFuture)fut0;
tx.removeMapping(fut.mapping().primary().id());
fut.onResult(new IgniteCheckedException("Failed to prepare transaction.", err));
}
break;
}
}
markInitialized();
}
/**
 * Decides whether a future must be skipped when prepares are sent.
 *
 * @param remap Remap flag.
 * @param fut Future.
 * @return {@code True} if the future is not a mini future, or if this is a remap and the mini
 *      future has already processed a result.
 */
private boolean skipFuture(boolean remap, IgniteInternalFuture<?> fut) {
    if (!isMini(fut))
        return true;

    return remap && ((MiniFuture)fut).rcvRes == 1;
}
/**
 * Starts the prepare for one mini future: locally when the primary node of its mapping is local,
 * otherwise by sending a prepare request to the primary node.
 *
 * @param fut Mini future.
 * @param txNodes Tx nodes.
 * @param locNearEntriesFut Local future for near cache entries prepare.
 * @return Prepare error if any.
 */
@Nullable private IgniteCheckedException prepare(
    final MiniFuture fut,
    Map<UUID, Collection<UUID>> txNodes,
    @Nullable MiniFuture locNearEntriesFut
) {
    GridDistributedTxMapping mapping = fut.mapping();

    final ClusterNode primary = mapping.primary();

    long timeout = tx.remainingTime();

    // A remaining time of -1 is answered with the transaction timeout exception.
    if (timeout == -1) {
        IgniteCheckedException timeoutErr = tx.timeoutException();

        fut.onResult(timeoutErr);

        return timeoutErr;
    }

    // Must lock near entries separately.
    if (mapping.hasNearCacheEntries()) {
        try {
            cctx.tm().prepareTx(tx, mapping.nearCacheEntries());
        }
        catch (IgniteCheckedException e) {
            fut.onResult(e);

            return e;
        }
    }

    // Remote primary: send the request over the wire.
    if (!primary.isLocal()) {
        try {
            GridNearTxPrepareRequest rmtReq = createRequest(txNodes,
                fut,
                timeout,
                mapping.reads(),
                mapping.writes());

            cctx.io().send(primary, rmtReq, tx.ioPolicy());
        }
        catch (ClusterTopologyCheckedException e) {
            e.retryReadyFuture(cctx.nextAffinityReadyFuture(tx.topologyVersion()));

            fut.onNodeLeft(e);

            return e;
        }
        catch (IgniteCheckedException e) {
            fut.onResult(e);

            return e;
        }

        return null;
    }

    // Local primary without a dedicated near-entries future: prepare the whole mapping at once.
    if (locNearEntriesFut == null) {
        GridNearTxPrepareRequest locReq = createRequest(txNodes,
            fut,
            timeout,
            mapping.reads(),
            mapping.writes());

        prepareLocal(locReq, fut, mapping.hasNearCacheEntries());

        return null;
    }

    // Local primary with split futures: this future handles either the near or the colocated part.
    boolean nearEntries = fut == locNearEntriesFut;

    Collection<IgniteTxEntry> partReads;
    Collection<IgniteTxEntry> partWrites;

    if (nearEntries) {
        partReads = mapping.nearEntriesReads();
        partWrites = mapping.nearEntriesWrites();
    }
    else {
        partReads = mapping.colocatedEntriesReads();
        partWrites = mapping.colocatedEntriesWrites();
    }

    GridNearTxPrepareRequest partReq = createRequest(txNodes, fut, timeout, partReads, partWrites);

    prepareLocal(partReq, fut, nearEntries);

    return null;
}
/**
 * Builds the prepare request for the given mini future and tags it with the mini future ID.
 *
 * @param txNodes Tx nodes.
 * @param fut Future.
 * @param timeout Timeout.
 * @param reads Read entries.
 * @param writes Write entries.
 * @return Request.
 */
private GridNearTxPrepareRequest createRequest(
    Map<UUID, Collection<UUID>> txNodes,
    MiniFuture fut,
    long timeout,
    Collection<IgniteTxEntry> reads,
    Collection<IgniteTxEntry> writes
) {
    GridDistributedTxMapping mapping = fut.mapping();

    GridNearTxPrepareRequest prepReq = new GridNearTxPrepareRequest(
        futId,
        tx.topologyVersion(),
        tx,
        timeout,
        reads,
        writes,
        mapping.hasNearCacheEntries(),
        txNodes,
        mapping.last(),
        tx.onePhaseCommit(),
        tx.needReturnValue() && tx.implicit(),
        tx.implicitSingle(),
        mapping.explicitLock(),
        tx.taskNameHash(),
        mapping.clientFirst(),
        txNodes.size() == 1,
        tx.activeCachesDeploymentEnabled(),
        tx.txState().recovery());

    // Every TRANSFORM write is registered in the request with a null DHT version.
    for (IgniteTxEntry writeEntry : writes) {
        if (writeEntry.op() != TRANSFORM)
            continue;

        prepReq.addDhtVersion(writeEntry.txKey(), null);
    }

    prepReq.miniId(fut.futureId());

    return prepReq;
}
/**
 * Runs the prepare on the local node and forwards its outcome to the mini future.
 *
 * @param req Request.
 * @param fut Future.
 * @param nearEntries {@code True} if prepare near cache entries.
 */
private void prepareLocal(
    GridNearTxPrepareRequest req,
    final MiniFuture fut,
    final boolean nearEntries
) {
    IgniteInternalFuture<GridNearTxPrepareResponse> locPrepFut;

    if (nearEntries)
        locPrepFut = cctx.tm().txHandler().prepareNearTxLocal(tx, req);
    else
        locPrepFut = cctx.tm().txHandler().prepareColocatedTx(tx, req);

    locPrepFut.listen(new CI1<IgniteInternalFuture<GridNearTxPrepareResponse>>() {
        @Override public void apply(IgniteInternalFuture<GridNearTxPrepareResponse> doneFut) {
            try {
                GridNearTxPrepareResponse res = doneFut.get();

                fut.onResult(res, nearEntries);
            }
            catch (IgniteCheckedException e) {
                fut.onResult(e);
            }
        }
    });
}
/**
 * Maps a single transaction entry to the primary node of its partition and adds it to that
 * node's mapping, creating the mapping if it does not exist yet.
 *
 * @param entry Transaction entry.
 * @param topVer Topology version.
 * @param curMapping Current mapping (node mappings keyed by primary node ID); updated by this call.
 * @param txMapping Mapping.
 * @param remap Remap flag.
 * @param topLocked Topology locked flag.
 */
private void map(
IgniteTxEntry entry,
AffinityTopologyVersion topVer,
Map<UUID, GridDistributedTxMapping> curMapping,
GridDhtTxMapping txMapping,
boolean remap,
boolean topLocked
) {
GridCacheContext cacheCtx = entry.context();
List<ClusterNode> nodes = cacheCtx.topology().nodes(cacheCtx.affinity().partition(entry.key()), topVer);
// No node owns the partition: complete the whole future with an error and skip this entry.
if (F.isEmpty(nodes)) {
onDone(new ClusterTopologyServerNotFoundException("Failed to map keys to nodes " +
"(partition is not mapped to any node) [key=" + entry.key() +
", partition=" + cacheCtx.affinity().partition(entry.key()) + ", topVer=" + topVer + ']'));
return;
}
txMapping.addMapping(nodes);
// The first node in the list is used as the primary.
ClusterNode primary = F.first(nodes);
assert primary != null;
if (log.isDebugEnabled()) {
log.debug("Mapped key to primary node [key=" + entry.key() +
", part=" + cacheCtx.affinity().partition(entry.key()) +
", primary=" + U.toShortString(primary) + ", topVer=" + topVer + ']');
}
// Must re-initialize cached entry while holding topology lock.
if (cacheCtx.isNear())
entry.cached(cacheCtx.nearTx().entryExx(entry.key(), topVer));
else
entry.cached(cacheCtx.colocated().entryExx(entry.key(), topVer, true));
// On the initial mapping (not a remap), near-cache entries without an explicit version are
// registered with the key lock future, which is created lazily and added to this compound future.
if (!remap && cacheCtx.isNear()) {
if (entry.explicitVersion() == null) {
if (keyLockFut == null) {
keyLockFut = new KeyLockFuture();
add((IgniteInternalFuture)keyLockFut);
}
keyLockFut.addLockKey(entry.txKey());
}
}
GridDistributedTxMapping cur = curMapping.get(primary.id());
// First entry for this primary: create its mapping.
if (cur == null) {
cur = new GridDistributedTxMapping(primary);
curMapping.put(primary.id(), cur);
// Client-first is set only on client nodes when the topology is not locked.
cur.clientFirst(!topLocked && cctx.kernalContext().clientNode());
cur.last(true);
}
// Record on the transaction which kind of cache is mapped to the local node.
if (primary.isLocal()) {
if (cacheCtx.isNear())
tx.nearLocallyMapped(true);
else if (cacheCtx.isColocated())
tx.colocatedLocallyMapped(true);
}
cur.add(entry);
if (entry.explicitVersion() != null) {
tx.markExplicit(primary.id());
cur.markExplicitLock();
}
entry.nodeId(primary.id());
// For near caches store the primary node ID in the cached near entry; if that entry has been
// removed concurrently, obtain a fresh one and retry.
if (cacheCtx.isNear()) {
while (true) {
try {
GridNearCacheEntry cached = (GridNearCacheEntry)entry.cached();
cached.dhtNodeId(tx.xidVersion(), primary.id());
break;
}
catch (GridCacheEntryRemovedException ignore) {
entry.cached(cacheCtx.near().entryEx(entry.key(), topVer));
}
}
}
}
/** {@inheritDoc} */
@Override public String toString() {
    Collection<String> futs = F.viewReadOnly(futures(),
        new C1<IgniteInternalFuture<?>, String>() {
            @Override public String apply(IgniteInternalFuture<?> f) {
                // Non-mini futures use their own string form.
                if (!isMini(f))
                    return f.toString();

                MiniFuture mini = (MiniFuture)f;

                return "[node=" + mini.primary().id() +
                    ", loc=" + mini.primary().isLocal() +
                    ", done=" + f.isDone() +
                    ", err=" + f.error() + "]";
            }
        });

    return S.toString(GridNearOptimisticSerializableTxPrepareFuture.class, this,
        "innerFuts", futs,
        "remap", remapFut != null,
        "tx", tx,
        "super", super.toString());
}
/**
 * Client remap future: a compound future over prepare responses whose {@code Boolean} result is
 * produced by {@link ClientRemapFutureReducer}.
 */
private static class ClientRemapFuture extends GridCompoundFuture<GridNearTxPrepareResponse, Boolean> {
/** */
private static final long serialVersionUID = 0L;
/**
 * Creates the future with a new {@link ClientRemapFutureReducer}.
 */
ClientRemapFuture() {
super(new ClientRemapFutureReducer());
}
}
/**
 * Client remap future reducer. Reduces to {@code true} only if every collected response
 * carries a client remap version.
 */
private static class ClientRemapFutureReducer implements IgniteReducer<GridNearTxPrepareResponse, Boolean> {
    /** */
    private static final long serialVersionUID = 0L;

    /** Remap flag; cleared as soon as one response without a client remap version is collected. */
    private boolean remap = true;

    /** {@inheritDoc} */
    @Override public boolean collect(@Nullable GridNearTxPrepareResponse res) {
        assert res != null;

        boolean hasRemapVer = res.clientRemapVersion() != null;

        if (!hasRemapVer)
            remap = false;

        // Always keep collecting.
        return true;
    }

    /** {@inheritDoc} */
    @Override public Boolean reduce() {
        return remap;
    }
}
/**
 * Mini future tracking the prepare of a single node mapping.
 */
private static class MiniFuture extends GridFutureAdapter<GridNearTxPrepareResponse> {
/** Receive result flag updater; used to atomically switch {@link #rcvRes} from 0 to 1. */
private static final AtomicIntegerFieldUpdater<MiniFuture> RCV_RES_UPD =
AtomicIntegerFieldUpdater.newUpdater(MiniFuture.class, "rcvRes");
/** Mini future ID. */
private final int futId;
/** Parent future. */
private final GridNearOptimisticSerializableTxPrepareFuture parent;
/** Node mapping prepared by this mini future. */
@GridToStringInclude
private GridDistributedTxMapping m;
/**
 * Flag to signal some result being processed: set from 0 to 1 by the first response, error or
 * node-left notification that is accepted.
 */
private volatile int rcvRes;
/**
 * Creates a mini future for one node mapping.
 *
 * @param parent Parent future.
 * @param m Mapping.
 * @param futId Mini future ID.
 */
MiniFuture(GridNearOptimisticSerializableTxPrepareFuture parent, GridDistributedTxMapping m, int futId) {
    this.futId = futId;
    this.m = m;
    this.parent = parent;
}
/**
 * @return ID of this mini future.
 */
int futureId() {
    return this.futId;
}
/**
 * @return Primary node of the mapping.
 */
public ClusterNode primary() {
    GridDistributedTxMapping mapping = m;

    return mapping.primary();
}
/**
 * @return Node mapping of this mini future.
 */
public GridDistributedTxMapping mapping() {
    return this.m;
}
/**
 * Fails this mini future with the given error, unless another result was processed before.
 *
 * @param e Error.
 */
void onResult(Throwable e) {
    // Another result won the race: only report the late error.
    if (!RCV_RES_UPD.compareAndSet(this, 0, 1)) {
        U.warn(log, "Received error after another result has been processed [fut=" +
            parent + ", mini=" + this + ']', e);

        return;
    }

    parent.onError(m, e);

    if (log.isDebugEnabled())
        log.debug("Failed to get future result [fut=" + this + ", err=" + e + ']');

    // Fail.
    onDone(e);
}
/**
 * Fails this mini future because its node left, unless it is done or a result was already processed.
 *
 * @param e Node failure.
 */
void onNodeLeft(ClusterTopologyCheckedException e) {
    if (isDone() || !RCV_RES_UPD.compareAndSet(this, 0, 1))
        return;

    if (log.isDebugEnabled())
        log.debug("Remote node left grid while sending or waiting for reply (will not retry): " + this);

    // Reported to the parent without a mapping.
    parent.onError(null, e);

    onDone(e);
}
/**
 * Processes a prepare response for this mini future. Nothing happens if the future is already
 * done or another result (response, error or node-left) has been processed before.
 *
 * @param res Result callback.
 * @param updateMapping Update mapping flag.
 */
@SuppressWarnings({"unchecked"})
void onResult(final GridNearTxPrepareResponse res, boolean updateMapping) {
if (isDone())
return;
// Only the first result is processed.
if (RCV_RES_UPD.compareAndSet(this, 0, 1)) {
if (res.error() != null) {
// Fail the whole compound future.
parent.onError(m, res.error());
onDone(res.error());
}
else {
// The response carries a client remap version.
if (res.clientRemapVersion() != null) {
assert parent.cctx.kernalContext().clientNode();
assert m.clientFirst();
parent.tx.removeMapping(m.primary().id());
// Only the mini future that creates the parent's remap future gets a non-null
// remapFut0 and drives the remap decision below.
ClientRemapFuture remapFut0 = null;
synchronized (parent) {
if (parent.remapFut == null) {
parent.remapFut = new ClientRemapFuture();
remapFut0 = parent.remapFut;
}
}
if (remapFut0 != null) {
// The remap future waits for all other mini futures of the parent.
Collection<IgniteInternalFuture<?>> futs = (Collection)parent.futures();
for (IgniteInternalFuture<?> fut : futs) {
if (parent.isMini(fut) && fut != this)
remapFut0.add((MiniFuture)fut);
}
remapFut0.markInitialized();
remapFut0.listen(new CI1<IgniteInternalFuture<Boolean>>() {
@Override public void apply(IgniteInternalFuture<Boolean> remapFut0) {
try {
IgniteInternalFuture<?> affFut =
parent.cctx.exchange().affinityReadyFuture(res.clientRemapVersion());
if (affFut == null)
affFut = new GridFinishedFuture<Object>();
// A true result means remap: clear the parent's remap future, wait for the affinity
// future (bounded by the remaining tx time) and then remap.
if (parent.remapFut.get()) {
if (log.isDebugEnabled()) {
log.debug("Will remap client tx [" +
"fut=" + parent +
", topVer=" + res.topologyVersion() + ']');
}
synchronized (parent) {
assert remapFut0 == parent.remapFut;
parent.remapFut = null;
}
parent.cctx.time().waitAsync(
affFut,
parent.tx.remainingTime(),
new IgniteBiInClosure<IgniteCheckedException, Boolean>() {
@Override public void apply(IgniteCheckedException e, Boolean timedOut) {
if (parent.errorOrTimeoutOnTopologyVersion(e, timedOut))
return;
remap(res);
}
}
);
}
else {
// A false result means no remap: fail with a topology error that carries the
// affinity future as its retry-ready future.
ClusterTopologyCheckedException err0 = new ClusterTopologyCheckedException(
"Cluster topology changed while client transaction is preparing.");
err0.retryReadyFuture(affFut);
parent.prepareError(err0);
onDone(err0);
}
}
catch (IgniteCheckedException e) {
if (log.isDebugEnabled()) {
log.debug("Prepare failed, will not remap tx: " +
parent);
}
parent.prepareError(e);
onDone(e);
}
}
});
}
else
// A remap future already exists: just complete this mini future with the response.
onDone(res);
}
else {
// No client remap version: hand the response to the parent.
parent.onPrepareResponse(m, res, updateMapping);
// Finish this mini future (need result only on client node).
onDone(parent.cctx.kernalContext().clientNode() ? res : null);
}
}
}
}
/**
 * Re-runs the prepare on the parent future, or fails this mini future if the transaction is
 * already marked rollback-only.
 *
 * @param res Response this mini future is completed with once the parent's remap callback runs.
 */
private void remap(final GridNearTxPrepareResponse res) {
    if (!parent.tx.isRollbackOnly()) {
        parent.prepareOnTopology(true, new Runnable() {
            @Override public void run() {
                onDone(res);
            }
        });

        return;
    }

    onDone(new IgniteTxRollbackCheckedException(
        "Failed to prepare the transaction, due to the transaction is marked as rolled back " +
            "[tx=" + CU.txString(parent.tx) + ']'));
}
/** {@inheritDoc} */
@Override public String toString() {
    boolean done = isDone();
    boolean cancelled = isCancelled();
    Throwable failure = error();

    return S.toString(MiniFuture.class, this, "done", done, "cancelled", cancelled, "err", failure);
}
}
}