package org.apache.ignite.internal.processors.cache.distributed.near;
import java.util.Collection;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import javax.cache.expiry.ExpiryPolicy;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.processors.cache.GridCacheCompoundFuture;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.GridCacheVersionedFuture;
import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxMapping;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxMapping;
import org.apache.ignite.internal.processors.cache.distributed.dht.colocated.GridDhtDetachedCacheEntry;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteReducer;
import org.apache.ignite.lang.IgniteUuid;
import static org.apache.ignite.internal.processors.cache.GridCacheOperation.NOOP;
/**
 * Common code for tx prepare in optimistic and pessimistic modes.
 */
public abstract class GridNearTxPrepareFutureAdapter extends
GridCacheCompoundFuture<Object, IgniteInternalTx> implements GridCacheVersionedFuture<IgniteInternalTx> {
/** Logger reference; passed to {@code U.logger(...)} when the static logger is first created in the constructor. */
protected static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>();
/** Error updater: atomic field updater bound by name to the volatile {@code err} field of this class. */
protected static final AtomicReferenceFieldUpdater<GridNearTxPrepareFutureAdapter, Throwable> ERR_UPD =
AtomicReferenceFieldUpdater.newUpdater(GridNearTxPrepareFutureAdapter.class, Throwable.class, "err");
/**
 * Reducer whose {@code collect} always returns {@code true} and whose {@code reduce} always
 * returns {@code null}. NOTE(review): presumably handed to the superclass constructor; that call
 * is not visible in this view - confirm.
 */
private static final IgniteReducer<Object, IgniteInternalTx> REDUCER =
new IgniteReducer<Object, IgniteInternalTx>() {
// Every collected element is accepted; the element itself is not inspected.
@Override public boolean collect(Object e) {
return true;
// There is no aggregated value, so the reduced result is null.
@Override public IgniteInternalTx reduce() {
// Nothing to aggregate.
return null;
/** Logger. Static, so shared by all instances; assigned in the constructor while it is still {@code null}. */
protected static IgniteLogger log;
/** Message logger. Static; taken from {@code cctx.txFinishMessageLogger()} in the constructor. */
protected static IgniteLogger msgLog;
/** Shared cache context supplied to the constructor. */
protected GridCacheSharedContext<?, ?> cctx;
/** Future ID, generated with {@code IgniteUuid.randomUuid()} in the constructor. */
protected IgniteUuid futId;
/** Transaction this future was created for. */
protected GridNearTxLocal tx;
/** Error. Volatile; this is the field that {@code ERR_UPD} refers to by the name "err". */
protected volatile Throwable err;
/** Trackable flag. Starts as {@code true}; set to {@code false} by {@code markNotTrackable()}. */
protected boolean trackable = true;
/**
 * @param cctx Context.
 * @param tx Transaction.
 */
public GridNearTxPrepareFutureAdapter(GridCacheSharedContext cctx, final GridNearTxLocal tx) {
// Both arguments are required; this is verified only when assertions are enabled.
assert cctx != null;
assert tx != null;
this.cctx = cctx;
this.tx = tx;
// Each future instance gets its own random ID.
futId = IgniteUuid.randomUuid();
// The loggers are static, so they are set up only while 'log' is still null.
// NOTE(review): this check is not synchronized here; presumably a repeated initialisation
// by concurrent constructors is harmless - confirm.
if (log == null) {
msgLog = cctx.txFinishMessageLogger();
log = U.logger(cctx.kernalContext(), logRef, GridNearTxPrepareFutureAdapter.class);
/** {@inheritDoc} */
@Override public IgniteUuid futureId() {
// The ID assigned in the constructor.
return futId;
/** {@inheritDoc} */
@Override public GridCacheVersion version() {
// The version reported for this future is the transaction's xid version.
return tx.xidVersion();
/** {@inheritDoc} */
@Override public void markNotTrackable() {
// Clears the flag reported by trackable(); nothing in this view sets it back to true.
trackable = false;
/** {@inheritDoc} */
@Override public boolean trackable() {
// True until markNotTrackable() has been called (the field is initialised to true).
return trackable;
/**
 * Called when related {@link GridNearTxLocal} is completed asynchronously on timeout.
 */
public abstract void onNearTxLocalTimeout(); // No implementation at this level; concrete subclasses supply it.
/**
 * @return Transaction.
 */
public IgniteInternalTx tx() {
// Exposes the GridNearTxLocal field through the IgniteInternalTx return type.
return tx;
/**
 * Prepares transaction.
 */
public abstract void prepare(); // No implementation at this level; concrete subclasses supply it.
/**
 * @param nodeId Sender.
 * @param res Result.
 */
public abstract void onResult(UUID nodeId, GridNearTxPrepareResponse res); // No implementation at this level; concrete subclasses supply it.
/**
 * Checks if mapped transaction can be committed in one phase.
 * One-phase commit can be done if transaction maps to one primary node and not more than one backup.
 *
 * @param txMapping Transaction mapping.
 */
final void checkOnePhase(GridDhtTxMapping txMapping) {
// NOTE(review): the body of this overload is not visible in this view; presumably it forwards to
// checkOnePhase(Map) with the node map held by txMapping - confirm against the full file.
/**
 * Checks if mapped transaction can be committed in one phase.
 * One-phase commit can be done if transaction maps to one primary node and not more than one backup.
 *
 * @param txNodes Primary to backups node map.
 */
final void checkOnePhase(Map<UUID, Collection<UUID>> txNodes) {
// The transaction's store write-through setting is consulted first.
// NOTE(review): no dedicated body for this condition is visible in this view (as shown it would
// guard the next statement); presumably an early return belongs here - confirm against the full file.
if (tx.storeWriteThrough())
// Only a map with exactly one key (one primary node) is examined further.
if (txNodes.size() == 1) {
// Safe to take the first entry: the size check above guarantees there is one.
Map.Entry<UUID, Collection<UUID>> entry = txNodes.entrySet().iterator().next();
assert entry != null;
// The entry value holds the backup node IDs mapped to that primary.
Collection<UUID> backups = entry.getValue();
// Zero or one backup satisfies the check.
// NOTE(review): the statement executed when the check passes is not visible in this view;
// presumably it marks the transaction as one-phase commit - confirm.
if (backups.size() <= 1)
/**
 * @param m Mapping.
 * @param res Response.
 * @param updateMapping Update mapping flag.
 */
final void onPrepareResponse(GridDistributedTxMapping m,
GridNearTxPrepareResponse res,
boolean updateMapping) {
// A null response is checked for first.
// NOTE(review): the statement guarded by this condition is not visible in this view;
// presumably an early return - confirm against the full file.
if (res == null)
// The response is expected to carry no error here; the response itself is the assertion message.
assert res.error() == null : res;
// Case: the transaction is flagged one-phase commit but the response is not.
// NOTE(review): the statement guarded by this condition is not visible in this view - confirm.
if (tx.onePhaseCommit() && !res.onePhaseCommit())
// ID of the primary node of the mapping this response belongs to.
UUID nodeId = m.primary().id();
// Walk the owned values reported in the response, keyed by tx key.
for (Map.Entry<IgniteTxKey, CacheVersionedValue> entry : res.ownedValues().entrySet()) {
IgniteTxEntry txEntry = tx.entry(entry.getKey());
// Every key in the response is expected to have an entry in the transaction.
assert txEntry != null;
GridCacheContext cacheCtx = txEntry.context();
// Retry loop: the catch below swaps in a fresh cached entry when the current one was removed.
// NOTE(review): the loop exit (break) is not visible in this view - confirm.
while (true) {
try {
// Near-cache entries and detached entries are handled by separate branches.
if (cacheCtx.isNear()) {
GridNearCacheEntry nearEntry = (GridNearCacheEntry)txEntry.cached();
CacheVersionedValue tup = entry.getValue();
// NOTE(review): no use of 'nearEntry' / 'tup' is visible in this branch in this view;
// presumably the near entry is reset from the primary's value here - confirm.
else if (txEntry.cached().detached()) {
GridDhtDetachedCacheEntry detachedEntry = (GridDhtDetachedCacheEntry)txEntry.cached();
CacheVersionedValue tup = entry.getValue();
// Reset the detached entry with the value from the response and the transaction's xid version.
detachedEntry.resetFromPrimary(tup.value(), tx.xidVersion());
catch (GridCacheEntryRemovedException ignored) {
// Retry.
// Re-read the entry from the cache for the transaction's topology version and attach it to the tx entry.
txEntry.cached(cacheCtx.cache().entryEx(txEntry.key(), tx.topologyVersion()));
// Walk the keys the response reports as filter-failed.
for (IgniteTxKey key : res.filterFailedKeys()) {
IgniteTxEntry txEntry = tx.entry(key);
assert txEntry != null : "Missing tx entry for write key: " + key;
assert txEntry.context() != null;
// Expiry policy that the entry's cache context resolves for this tx entry (may be null).
ExpiryPolicy expiry = txEntry.context().expiryForTxEntry(txEntry);
// NOTE(review): the statement guarded by this condition, and any further handling of the
// filter-failed entry, is not visible in this view - confirm against the full file.
if (expiry != null)
// Only mappings flagged as query updates, or non-empty mappings, proceed to version handling.
if (m.queryUpdate() || !m.empty()) {
// This step is very important as near and DHT versions grow separately.
cctx.versions().onReceived(nodeId, res.dhtVersion());
// DHT version registration happens only when requested and the mapping has near cache entries.
if (updateMapping && m.hasNearCacheEntries()) {
// Fall back to the DHT version when the response carries no write version.
GridCacheVersion writeVer = res.writeVersion();
if (writeVer == null)
writeVer = res.dhtVersion();
// Register DHT version.
m.dhtVersion(res.dhtVersion(), writeVer);
// Apply the same versions to the mapping the transaction holds for this node, if there is one.
GridDistributedTxMapping map = tx.mappings().get(nodeId);
if (map != null)
map.dhtVersion(res.dhtVersion(), writeVer);
// Hand the response's pending, committed and rolled-back versions to the transaction for this mapping.
tx.readyNearLocks(m, res.pending(), res.committedVersions(), res.rolledbackVersions());