blob: 285481d021e5d198238e58daa3750022e06fdd17 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.distributed.near;
import java.util.Collection;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import javax.cache.expiry.ExpiryPolicy;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.processors.cache.GridCacheCompoundFuture;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.GridCacheVersionedFuture;
import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxMapping;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxMapping;
import org.apache.ignite.internal.processors.cache.distributed.dht.colocated.GridDhtDetachedCacheEntry;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteReducer;
import org.apache.ignite.lang.IgniteUuid;
import static org.apache.ignite.internal.processors.cache.GridCacheOperation.NOOP;
/**
* Common code for tx prepare in optimistic and pessimistic modes.
*/
public abstract class GridNearTxPrepareFutureAdapter extends
GridCacheCompoundFuture<Object, IgniteInternalTx> implements GridCacheVersionedFuture<IgniteInternalTx> {
/** Logger reference. */
protected static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>();
/** Error updater. */
protected static final AtomicReferenceFieldUpdater<GridNearTxPrepareFutureAdapter, Throwable> ERR_UPD =
AtomicReferenceFieldUpdater.newUpdater(GridNearTxPrepareFutureAdapter.class, Throwable.class, "err");
/** */
private static final IgniteReducer<Object, IgniteInternalTx> REDUCER =
new IgniteReducer<Object, IgniteInternalTx>() {
@Override public boolean collect(Object e) {
return true;
}
@Override public IgniteInternalTx reduce() {
// Nothing to aggregate.
return null;
}
};
/** Logger. */
protected static IgniteLogger log;
/** Logger. */
protected static IgniteLogger msgLog;
/** Context. */
protected GridCacheSharedContext<?, ?> cctx;
/** Future ID. */
@GridToStringInclude
protected IgniteUuid futId;
/** Transaction. */
@GridToStringInclude
protected GridNearTxLocal tx;
/** Error. */
@GridToStringExclude
protected volatile Throwable err;
/** Trackable flag. */
protected boolean trackable = true;
/**
* @param cctx Context.
* @param tx Transaction.
*/
public GridNearTxPrepareFutureAdapter(GridCacheSharedContext cctx, final GridNearTxLocal tx) {
super(REDUCER);
assert cctx != null;
assert tx != null;
this.cctx = cctx;
this.tx = tx;
futId = IgniteUuid.randomUuid();
if (log == null) {
msgLog = cctx.txFinishMessageLogger();
log = U.logger(cctx.kernalContext(), logRef, GridNearTxPrepareFutureAdapter.class);
}
}
/** {@inheritDoc} */
@Override public IgniteUuid futureId() {
return futId;
}
/** {@inheritDoc} */
@Override public GridCacheVersion version() {
return tx.xidVersion();
}
/** {@inheritDoc} */
@Override public void markNotTrackable() {
trackable = false;
}
/** {@inheritDoc} */
@Override public boolean trackable() {
return trackable;
}
/**
* Called when related {@link GridNearTxLocal} is completed asynchronously on timeout,
*/
public abstract void onNearTxLocalTimeout();
/**
* @return Transaction.
*/
public IgniteInternalTx tx() {
return tx;
}
/**
* Prepares transaction.
*/
public abstract void prepare();
/**
* @param nodeId Sender.
* @param res Result.
*/
public abstract void onResult(UUID nodeId, GridNearTxPrepareResponse res);
/**
* Checks if mapped transaction can be committed on one phase.
* One-phase commit can be done if transaction maps to one primary node and not more than one backup.
*
* @param txMapping Transaction mapping.
*/
final void checkOnePhase(GridDhtTxMapping txMapping) {
checkOnePhase(tx.transactionNodes());
}
/**
* Checks if mapped transaction can be committed on one phase.
* One-phase commit can be done if transaction maps to one primary node and not more than one backup.
*
* @param txNodes Primary to backups node map.
*/
final void checkOnePhase(Map<UUID, Collection<UUID>> txNodes) {
if (tx.storeWriteThrough() || tx.txState().mvccEnabled()) // TODO IGNITE-3479 (onePhase + mvcc)
return;
if (txNodes.size() == 1) {
Map.Entry<UUID, Collection<UUID>> entry = txNodes.entrySet().iterator().next();
assert entry != null;
Collection<UUID> backups = entry.getValue();
if (backups.size() <= 1)
tx.onePhaseCommit(true);
}
}
/**
* @param m Mapping.
* @param res Response.
* @param updateMapping Update mapping flag.
*/
@SuppressWarnings("ThrowableResultOfMethodCallIgnored")
final void onPrepareResponse(GridDistributedTxMapping m,
GridNearTxPrepareResponse res,
boolean updateMapping) {
if (res == null)
return;
assert res.error() == null : res;
if (tx.onePhaseCommit() && !res.onePhaseCommit())
tx.onePhaseCommit(false);
UUID nodeId = m.primary().id();
for (Map.Entry<IgniteTxKey, CacheVersionedValue> entry : res.ownedValues().entrySet()) {
IgniteTxEntry txEntry = tx.entry(entry.getKey());
assert txEntry != null;
GridCacheContext cacheCtx = txEntry.context();
while (true) {
try {
if (cacheCtx.isNear()) {
GridNearCacheEntry nearEntry = (GridNearCacheEntry)txEntry.cached();
CacheVersionedValue tup = entry.getValue();
nearEntry.resetFromPrimary(tup.value(),
tx.xidVersion(),
tup.version(),
nodeId,
tx.topologyVersion());
}
else if (txEntry.cached().detached()) {
GridDhtDetachedCacheEntry detachedEntry = (GridDhtDetachedCacheEntry)txEntry.cached();
CacheVersionedValue tup = entry.getValue();
detachedEntry.resetFromPrimary(tup.value(), tx.xidVersion());
}
break;
}
catch (GridCacheEntryRemovedException ignored) {
// Retry.
txEntry.cached(cacheCtx.cache().entryEx(txEntry.key(), tx.topologyVersion()));
}
}
}
tx.implicitSingleResult(res.returnValue());
for (IgniteTxKey key : res.filterFailedKeys()) {
IgniteTxEntry txEntry = tx.entry(key);
assert txEntry != null : "Missing tx entry for write key: " + key;
txEntry.op(NOOP);
assert txEntry.context() != null;
ExpiryPolicy expiry = txEntry.context().expiryForTxEntry(txEntry);
if (expiry != null)
txEntry.ttl(CU.toTtl(expiry.getExpiryForAccess()));
}
if (m.queryUpdate() || !m.empty()) {
// This step is very important as near and DHT versions grow separately.
cctx.versions().onReceived(nodeId, res.dhtVersion());
if (updateMapping && m.hasNearCacheEntries()) {
GridCacheVersion writeVer = res.writeVersion();
if (writeVer == null)
writeVer = res.dhtVersion();
// Register DHT version.
m.dhtVersion(res.dhtVersion(), writeVer);
GridDistributedTxMapping map = tx.mappings().get(nodeId);
if (map != null)
map.dhtVersion(res.dhtVersion(), writeVer);
tx.readyNearLocks(m, res.pending(), res.committedVersions(), res.rolledbackVersions());
}
}
}
}