blob: 472aafd724d16eeae64195e22bb3037b4caf093c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.distributed.dht;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.IgniteDiagnosticAware;
import org.apache.ignite.internal.IgniteDiagnosticPrepareContext;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.InvalidEnvironmentException;
import org.apache.ignite.internal.NodeStoppingException;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.processors.cache.GridCacheCompoundIdentityFuture;
import org.apache.ignite.internal.processors.cache.GridCacheFuture;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxMapping;
import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinator;
import org.apache.ignite.internal.processors.cache.mvcc.MvccFuture;
import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.GridLongList;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.C1;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteUuid;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC;
import static org.apache.ignite.transactions.TransactionState.COMMITTING;
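/**
 * Compound future that finishes (commits or rolls back) a DHT local transaction.
 * <p>
 * It sends {@code GridDhtTxFinishRequest} messages to the nodes found in the transaction's DHT and near
 * mappings (or to the lock transaction nodes on rollback), tracks one mini-future per contacted node and
 * completes with the transaction once all tracked futures are done.
 */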
public final class GridDhtTxFinishFuture<K, V> extends GridCacheCompoundIdentityFuture<IgniteInternalTx>
implements GridCacheFuture<IgniteInternalTx>, IgniteDiagnosticAware {
/** Serial version UID. */
private static final long serialVersionUID = 0L;
/** Logger reference handed to {@code U.logger(...)} when the class logger is first created. */
private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>();
/** Atomic updater for {@link #err}; used to record only the first reported error. */
private static final AtomicReferenceFieldUpdater<GridDhtTxFinishFuture, Throwable> ERR_UPD =
AtomicReferenceFieldUpdater.newUpdater(GridDhtTxFinishFuture.class, Throwable.class, "err");
/** Class logger; assigned by the first constructor call that sees it as {@code null}. */
private static IgniteLogger log;
/** Transaction finish message logger, obtained from the shared context together with {@link #log}. */
private static IgniteLogger msgLog;
/** Shared cache context. */
private GridCacheSharedContext<K, V> cctx;
/** Future ID (random, generated in the constructor). */
private final IgniteUuid futId;
/** Transaction being finished. */
@GridToStringExclude
private GridDhtTxLocalAdapter tx;
/** Commit flag passed to the constructor. */
private boolean commit;
/** First error passed to {@code rollbackOnError}; written only through {@link #ERR_UPD}. */
@SuppressWarnings("UnusedDeclaration")
@GridToStringExclude
private volatile Throwable err;
/** DHT mappings taken from the transaction at construction time, keyed by node ID. */
private Map<UUID, GridDistributedTxMapping> dhtMap;
/** Near mappings taken from the transaction at construction time, keyed by node ID. */
private Map<UUID, GridDistributedTxMapping> nearMap;
/**
 * Creates a finish future for the given transaction. The transaction's DHT and near mappings are
 * captured here, a random future ID is generated, and the shared static loggers are initialized
 * if no earlier instance has done so.
 *
 * @param cctx Context.
 * @param tx Transaction.
 * @param commit Commit flag.
 */
public GridDhtTxFinishFuture(GridCacheSharedContext<K, V> cctx, GridDhtTxLocalAdapter tx, boolean commit) {
    super(F.<IgniteInternalTx>identityReducer(tx));

    this.futId = IgniteUuid.randomUuid();

    this.cctx = cctx;
    this.commit = commit;
    this.tx = tx;

    this.dhtMap = tx.dhtMap();
    this.nearMap = tx.nearMap();

    // The message logger is assigned first, so that 'log' is the last static written.
    if (log == null) {
        msgLog = cctx.txFinishMessageLogger();
        log = U.logger(cctx.kernalContext(), logRef, GridDhtTxFinishFuture.class);
    }
}
/**
 * Gets the transaction this future finishes.
 *
 * @return Transaction.
 */
public GridDhtTxLocalAdapter tx() {
    return this.tx;
}
/**
 * {@inheritDoc}
 *
 * @return The random ID generated for this future at construction time.
 */
@Override public IgniteUuid futureId() {
    return this.futId;
}
/**
 * {@inheritDoc}
 * <p>
 * Notifies the first mini-future bound to the departed node and stops searching.
 *
 * @return {@code True} if a mini-future for {@code nodeId} was found and notified, {@code false} otherwise.
 */
@SuppressWarnings("unchecked")
@Override public boolean onNodeLeft(UUID nodeId) {
    for (IgniteInternalFuture<?> candidate : futures()) {
        if (!isMini(candidate))
            continue;

        MiniFuture mini = (MiniFuture)candidate;

        if (!mini.node().id().equals(nodeId))
            continue;

        mini.onNodeLeft();

        return true;
    }

    return false;
}
/**
 * {@inheritDoc}
 *
 * @return Always {@code true} for this future.
 */
@Override public boolean trackable() {
return true;
}
/**
 * {@inheritDoc}
 * <p>
 * Not expected to be called on this future: the body is a bare {@code assert false}, so the call fails
 * when assertions are enabled and does nothing otherwise.
 */
@Override public void markNotTrackable() {
assert false;
}
/**
 * Records the given error and rolls the transaction back. Only the first error reported to this
 * future has any effect; later calls are ignored.
 *
 * @param e Error.
 */
public void rollbackOnError(Throwable e) {
    assert e != null;

    // Somebody else has already reported an error - nothing more to do.
    if (!ERR_UPD.compareAndSet(this, null, e))
        return;

    tx.setRollbackOnly();

    boolean envInvalid = X.hasCause(e, InvalidEnvironmentException.class, NodeStoppingException.class);

    if (envInvalid)
        // Complete right away without sending rollback requests.
        onComplete();
    else
        finish(false);
}
/**
 * Routes a finish response to every mini-future whose ID equals {@code res.miniId()}.
 * <p>
 * If this future is already done the response is ignored, and if no mini-future matches nothing is
 * notified; in both cases a debug message is written to the finish message logger.
 *
 * @param nodeId Sender.
 * @param res Result.
 */
public void onResult(UUID nodeId, GridDhtTxFinishResponse res) {
    if (isDone()) {
        // No lookup is performed here, so do not report it as a failed mini-future search.
        if (msgLog.isDebugEnabled()) {
            msgLog.debug("DHT finish fut, response for finished future [txId=" + tx.nearXidVersion() +
                ", dhtTxId=" + tx.xidVersion() +
                ", node=" + nodeId +
                ", res=" + res +
                ", fut=" + this + ']');
        }

        return;
    }

    boolean found = false;

    // The whole collection is scanned on purpose: every mini-future with a matching ID is notified.
    for (IgniteInternalFuture<IgniteInternalTx> fut : futures()) {
        if (isMini(fut)) {
            MiniFuture f = (MiniFuture)fut;

            if (f.futureId() == res.miniId()) {
                found = true;

                assert f.node().id().equals(nodeId);

                f.onResult(res);
            }
        }
    }

    if (!found && msgLog.isDebugEnabled()) {
        msgLog.debug("DHT finish fut, failed to find mini future [txId=" + tx.nearXidVersion() +
            ", dhtTxId=" + tx.xidVersion() +
            ", node=" + nodeId +
            ", res=" + res +
            ", fut=" + this + ']');
    }
}
/**
 * {@inheritDoc}
 * <p>
 * Completion is attempted only once the future is initialized or an error is supplied. The steps are:
 * <ol>
 * <li>for a one-phase-commit transaction in {@code COMMITTING} state, finish it through
 * {@code tmFinish}; a failure there is logged and becomes the resulting error unless an earlier
 * error was recorded;</li>
 * <li>on commit without an error so far, pick up the transaction's commit error;</li>
 * <li>run the local MVCC finish, which may add its own error;</li>
 * <li>complete the future and, if that succeeded, send the finish reply (unless the sync mode is
 * {@code PRIMARY_SYNC}) and remove this future from the MVCC manager.</li>
 * </ol>
 *
 * @param tx Result value to complete the future with.
 * @param err Error to complete with, or {@code null}.
 * @return {@code True} if this call completed the future.
 */
@Override public boolean onDone(IgniteInternalTx tx, Throwable err) {
    if (initialized() || err != null) {
        // Error recorded earlier via rollbackOnError(), if any.
        Throwable e = this.err;

        if (this.tx.onePhaseCommit() && (this.tx.state() == COMMITTING)) {
            try {
                boolean hasInvalidEnvironmentIssue = X.hasCause(err, InvalidEnvironmentException.class, NodeStoppingException.class);

                this.tx.tmFinish(err == null, hasInvalidEnvironmentIssue, false);
            }
            catch (IgniteCheckedException tmFinishErr) {
                // Log the exception that was actually thrown; the previously recorded error may be
                // null or unrelated, and this exception is not propagated when 'e' is already set.
                U.error(log, "Failed to finish tx: " + tx, tmFinishErr);

                if (e == null)
                    e = tmFinishErr;
            }
        }

        if (commit && e == null)
            e = this.tx.commitError();

        Throwable finishErr = mvccFinish(e != null ? e : err);

        if (super.onDone(tx, finishErr)) {
            if (finishErr == null)
                finishErr = this.tx.commitError();

            if (this.tx.syncMode() != PRIMARY_SYNC)
                this.tx.sendFinishReply(finishErr);

            // Don't forget to clean up.
            cctx.mvcc().removeFuture(futId);

            return true;
        }
    }

    return false;
}
/**
 * Checks whether the given future is exactly a {@link MiniFuture} (subclasses do not count).
 *
 * @param f Future.
 * @return {@code True} if mini-future.
 */
private boolean isMini(IgniteInternalFuture<?> f) {
    Class<?> futCls = f.getClass();

    return futCls == MiniFuture.class;
}
/**
 * Completeness callback: completes this future with the transaction and the currently recorded error
 * (which may be {@code null}).
 */
private void onComplete() {
    onDone(this.tx, this.err);
}
/**
 * Initializes the future: sends finish requests where there is somebody to send them to, optionally
 * registers a wait on MVCC transactions, marks the future initialized and completes it immediately
 * when nothing has to be awaited.
 * <p>
 * Note that the parameter shadows the {@code commit} field set in the constructor.
 *
 * @param commit Commit flag.
 */
@SuppressWarnings({"SimplifiableIfStatement", "IfMayBeConditional"})
public void finish(boolean commit) {
    assert !tx.queryEnlisted() || tx.mvccSnapshot() != null;

    // Stays false when there are no backup or near nodes to notify.
    boolean awaitRemote = false;

    if (!(F.isEmpty(dhtMap) && F.isEmpty(nearMap)))
        awaitRemote = finish(commit, dhtMap, nearMap);
    else if (!commit && !F.isEmpty(tx.lockTransactionNodes()))
        awaitRemote = rollbackLockTransactions(tx.lockTransactionNodes());

    GridLongList pendingTxs = tx.mvccWaitTransactions();

    if (pendingTxs != null) {
        MvccSnapshot snap = tx.mvccSnapshot();

        assert snap != null;

        MvccCoordinator curCrd = cctx.coordinators().currentCoordinator();

        // Wait only if the current coordinator has the same version as the one in the snapshot.
        boolean sameCrd = curCrd != null && curCrd.coordinatorVersion() == snap.coordinatorVersion();

        if (sameCrd) {
            add((IgniteInternalFuture)cctx.coordinators().waitTxsFuture(curCrd.nodeId(), pendingTxs));

            awaitRemote = true;
        }
    }

    markInitialized();

    if (awaitRemote)
        return;

    onComplete();
}
/**
 * Sends a rollback finish request to each of the given nodes, registering one mini-future per node.
 * Nothing is sent for one-phase-commit transactions.
 * <p>
 * Replies are awaited when the transaction uses {@code FULL_SYNC}, holds an explicit lock or has
 * queries enlisted; otherwise each mini-future is completed right after its request is sent.
 *
 * @param nodes Nodes.
 * @return {@code True} in case there is at least one synchronous {@code MiniFuture} to wait for.
 */
private boolean rollbackLockTransactions(Collection<ClusterNode> nodes) {
    assert !F.isEmpty(nodes);

    if (tx.onePhaseCommit())
        return false;

    boolean waitForReplies = tx.syncMode() == FULL_SYNC;

    if (tx.explicitLock() || tx.queryEnlisted())
        waitForReplies = true;

    boolean hasPending = false;

    int lastMiniId = 0;

    for (ClusterNode target : nodes) {
        assert !target.isLocal();

        MiniFuture mini = new MiniFuture(++lastMiniId, target);

        // Register the mini-future before the request leaves this node.
        add(mini);

        GridDhtTxFinishRequest finishReq = new GridDhtTxFinishRequest(
            tx.nearNodeId(),
            futId,
            mini.futureId(),
            tx.topologyVersion(),
            tx.xidVersion(),
            tx.commitVersion(),
            tx.threadId(),
            tx.isolation(),
            false,
            tx.isInvalidate(),
            tx.system(),
            tx.ioPolicy(),
            tx.isSystemInvalidate(),
            waitForReplies ? FULL_SYNC : tx.syncMode(),
            tx.completedBase(),
            tx.committedVersions(),
            tx.rolledbackVersions(),
            tx.pendingVersions(),
            tx.size(),
            tx.subjectId(),
            tx.taskNameHash(),
            tx.activeCachesDeploymentEnabled(),
            false,
            false,
            tx.mvccSnapshot(),
            cctx.tm().txHandler().filterUpdateCountersForBackupNode(tx, target));

        try {
            cctx.io().send(target, finishReq, tx.ioPolicy());

            if (msgLog.isDebugEnabled()) {
                msgLog.debug("DHT finish fut, sent request lock tx [txId=" + tx.nearXidVersion() +
                    ", dhtTxId=" + tx.xidVersion() +
                    ", node=" + target.id() + ']');
            }

            if (!waitForReplies)
                mini.onDone();
            else
                hasPending = true;
        }
        catch (IgniteCheckedException sendErr) {
            if (!(sendErr instanceof ClusterTopologyCheckedException)) {
                if (msgLog.isDebugEnabled()) {
                    msgLog.debug("DHT finish fut, failed to send request lock tx [txId=" + tx.nearXidVersion() +
                        ", dhtTxId=" + tx.xidVersion() +
                        ", node=" + target.id() +
                        ", err=" + sendErr + ']');
                }

                // Any other send failure fails the mini-future.
                mini.onResult(sendErr);
            }
            else
                // The target left the topology: handled as a node-left event.
                mini.onNodeLeft((ClusterTopologyCheckedException)sendErr);
        }
    }

    return hasPending;
}
/**
 * Sends finish requests for the given mappings and registers one mini-future per contacted node.
 * <p>
 * The first loop covers the primary node of every DHT mapping (attaching the near mapping of the same
 * node, if any); the second loop covers near mappings whose primary node has no DHT mapping.
 * Nothing is sent for one-phase-commit transactions.
 * <p>
 * Replies are awaited when the transaction uses {@code FULL_SYNC}, holds an explicit lock or has
 * queries enlisted; otherwise each mini-future is completed right after its request is sent.
 *
 * @param commit Commit flag.
 * @param dhtMap DHT map.
 * @param nearMap Near map.
 * @return {@code True} in case there is at least one synchronous {@code MiniFuture} to wait for.
 */
private boolean finish(boolean commit,
Map<UUID, GridDistributedTxMapping> dhtMap,
Map<UUID, GridDistributedTxMapping> nearMap) {
if (tx.onePhaseCommit())
return false;
assert !commit || !tx.txState().mvccEnabled() || tx.mvccSnapshot() != null || F.isEmpty(tx.writeEntries());
boolean sync = tx.syncMode() == FULL_SYNC;
// Explicit locks and enlisted queries force waiting for replies regardless of the sync mode.
if (tx.explicitLock() || tx.queryEnlisted())
sync = true;
// Becomes true once at least one request was sent whose reply must be awaited.
boolean res = false;
// Mini-future ID counter, shared by both loops below.
int miniId = 0;
// Do not need process active transactions on backups.
MvccSnapshot mvccSnapshot = tx.mvccSnapshot();
if (mvccSnapshot != null)
mvccSnapshot = mvccSnapshot.withoutActiveTransactions();
// Create mini futures.
for (GridDistributedTxMapping dhtMapping : dhtMap.values()) {
ClusterNode n = dhtMapping.primary();
assert !n.isLocal();
GridDistributedTxMapping nearMapping = nearMap.get(n.id());
// Skipped only when a near mapping exists for the node and both mappings are empty.
if (!dhtMapping.queryUpdate() && dhtMapping.empty() && nearMapping != null && nearMapping.empty())
// Nothing to send.
continue;
MiniFuture fut = new MiniFuture(++miniId, dhtMapping, nearMapping);
add(fut); // Append new future.
// Collect the update counter of every entry in the DHT mapping, in iteration order.
Collection<Long> updCntrs = new ArrayList<>(dhtMapping.entries().size());
for (IgniteTxEntry e : dhtMapping.entries())
updCntrs.add(e.updateCounter());
GridDhtTxFinishRequest req = new GridDhtTxFinishRequest(
tx.nearNodeId(),
futId,
fut.futureId(),
tx.topologyVersion(),
tx.xidVersion(),
tx.commitVersion(),
tx.threadId(),
tx.isolation(),
commit,
tx.isInvalidate(),
tx.system(),
tx.ioPolicy(),
tx.isSystemInvalidate(),
sync ? FULL_SYNC : tx.syncMode(),
tx.completedBase(),
tx.committedVersions(),
tx.rolledbackVersions(),
tx.pendingVersions(),
tx.size(),
tx.subjectId(),
tx.taskNameHash(),
tx.activeCachesDeploymentEnabled(),
updCntrs,
false,
false,
mvccSnapshot,
commit ? null : cctx.tm().txHandler().filterUpdateCountersForBackupNode(tx, n));
// Fall back to the transaction's xid version when no write version is set.
req.writeVersion(tx.writeVersion() != null ? tx.writeVersion() : tx.xidVersion());
try {
cctx.io().send(n, req, tx.ioPolicy());
if (msgLog.isDebugEnabled()) {
msgLog.debug("DHT finish fut, sent request dht [txId=" + tx.nearXidVersion() +
", dhtTxId=" + tx.xidVersion() +
", node=" + n.id() + ']');
}
if (sync)
res = true;
else
// No reply is awaited: complete the mini-future right after sending.
fut.onDone();
}
catch (IgniteCheckedException e) {
// A topology error is handled as node-left (the mini-future completes with the transaction);
// any other send error fails the mini-future.
if (e instanceof ClusterTopologyCheckedException)
fut.onNodeLeft((ClusterTopologyCheckedException)e);
else {
if (msgLog.isDebugEnabled()) {
msgLog.debug("DHT finish fut, failed to send request dht [txId=" + tx.nearXidVersion() +
", dhtTxId=" + tx.xidVersion() +
", node=" + n.id() +
", err=" + e + ']');
}
fut.onResult(e);
}
}
}
// Near mappings whose primary node was not handled by the DHT loop above.
for (GridDistributedTxMapping nearMapping : nearMap.values()) {
if (!dhtMap.containsKey(nearMapping.primary().id())) {
if (nearMapping.empty())
// Nothing to send.
continue;
MiniFuture fut = new MiniFuture(++miniId, null, nearMapping);
add(fut); // Append new future.
GridDhtTxFinishRequest req = new GridDhtTxFinishRequest(
tx.nearNodeId(),
futId,
fut.futureId(),
tx.topologyVersion(),
tx.xidVersion(),
tx.commitVersion(),
tx.threadId(),
tx.isolation(),
commit,
tx.isInvalidate(),
tx.system(),
tx.ioPolicy(),
tx.isSystemInvalidate(),
sync ? FULL_SYNC : tx.syncMode(),
tx.completedBase(),
tx.committedVersions(),
tx.rolledbackVersions(),
tx.pendingVersions(),
tx.size(),
tx.subjectId(),
tx.taskNameHash(),
tx.activeCachesDeploymentEnabled(),
false,
false,
mvccSnapshot,
null);
// NOTE(review): unlike the DHT loop there is no fallback to xidVersion() here - confirm this is intended.
req.writeVersion(tx.writeVersion());
try {
cctx.io().send(nearMapping.primary(), req, tx.ioPolicy());
if (msgLog.isDebugEnabled()) {
msgLog.debug("DHT finish fut, sent request near [txId=" + tx.nearXidVersion() +
", dhtTxId=" + tx.xidVersion() +
", node=" + nearMapping.primary().id() + ']');
}
if (sync)
res = true;
else
// No reply is awaited: complete the mini-future right after sending.
fut.onDone();
}
catch (IgniteCheckedException e) {
// A topology error is handled as node-left (the mini-future completes with the transaction);
// any other send error fails the mini-future.
if (e instanceof ClusterTopologyCheckedException)
fut.onNodeLeft((ClusterTopologyCheckedException)e);
else {
if (msgLog.isDebugEnabled()) {
msgLog.debug("DHT finish fut, failed to send request near [txId=" + tx.nearXidVersion() +
", dhtTxId=" + tx.xidVersion() +
", node=" + nearMapping.primary().id() +
", err=" + e + ']');
}
fut.onResult(e);
}
}
}
}
return res;
}
/**
 * Finishes MVCC transaction on the local node. The transaction is finished as committed only when
 * this future was created for commit and no commit error is known yet.
 *
 * @param commitError Error known so far, or {@code null}.
 * @return The resulting error: the given one (with an MVCC finish failure added as suppressed),
 *      the MVCC finish failure itself when no error was given, or {@code null} when everything succeeded.
 */
private Throwable mvccFinish(Throwable commitError) {
    boolean doCommit = commit && commitError == null;

    try {
        cctx.tm().mvccFinish(tx, doCommit);
    }
    catch (IgniteCheckedException ex) {
        if (commitError != null)
            commitError.addSuppressed(ex);
        else {
            // The MVCC failure becomes the commit error of the transaction.
            commitError = ex;

            tx.commitError(ex);
        }
    }

    return commitError;
}
/**
 * {@inheritDoc}
 * <p>
 * Registers a diagnostic request for the first inner future that is still pending on a remote node
 * (a mini-future or an MVCC coordinator future) and stops there.
 */
@SuppressWarnings("unchecked")
@Override public void addDiagnosticRequest(IgniteDiagnosticPrepareContext ctx) {
    if (isDone())
        return;

    for (IgniteInternalFuture<?> fut : futures()) {
        if (fut.isDone())
            continue;

        if (fut instanceof MiniFuture) {
            MiniFuture mini = (MiniFuture)fut;

            if (mini.node().isLocal())
                continue;

            GridCacheVersion dhtVer = tx.xidVersion();
            GridCacheVersion nearVer = tx.nearXidVersion();

            UUID rmtNodeId = mini.node().id();

            ctx.remoteTxInfo(rmtNodeId, dhtVer, nearVer, "GridDhtTxFinishFuture " +
                "waiting for response [node=" + rmtNodeId +
                ", topVer=" + tx.topologyVersion() +
                ", dhtVer=" + dhtVer +
                ", nearVer=" + nearVer +
                ", futId=" + futId +
                ", miniId=" + mini.futId +
                ", tx=" + tx + ']');

            return;
        }

        if (fut instanceof MvccFuture) {
            MvccFuture crdFut = (MvccFuture)fut;

            if (cctx.localNodeId().equals(crdFut.coordinatorNodeId()))
                continue;

            ctx.basicInfo(crdFut.coordinatorNodeId(), "GridDhtTxFinishFuture " +
                "waiting for mvcc coordinator reply [mvccCrdNode=" + crdFut.coordinatorNodeId() +
                ", loc=" + crdFut.coordinatorNodeId().equals(cctx.localNodeId()) + ']');

            return;
        }
    }
}
/**
 * {@inheritDoc}
 * <p>
 * Inner futures are rendered in a compact form: node ID, locality and done flag for mini-futures and
 * MVCC coordinator futures, and the future's own string form for anything else.
 */
@Override public String toString() {
    C1<IgniteInternalFuture<?>, String> describe = new C1<IgniteInternalFuture<?>, String>() {
        @SuppressWarnings("unchecked")
        @Override public String apply(IgniteInternalFuture<?> f) {
            if (isMini(f)) {
                MiniFuture mini = (MiniFuture)f;

                return "[node=" + mini.node().id() +
                    ", loc=" + mini.node().isLocal() +
                    ", done=" + f.isDone() + "]";
            }

            if (f instanceof MvccFuture) {
                MvccFuture crdFut = (MvccFuture)f;

                return "[mvccCrdNode=" + crdFut.coordinatorNodeId() +
                    ", loc=" + crdFut.coordinatorNodeId().equals(cctx.localNodeId()) +
                    ", done=" + f.isDone() + "]";
            }

            return f.toString();
        }
    };

    Collection<String> futs = F.viewReadOnly(futures(), describe);

    return S.toString(GridDhtTxFinishFuture.class, this,
        "xidVer", tx.xidVersion(),
        "innerFuts", futs,
        "super", super.toString());
}
/**
 * Mini-future tracking a single finish request. Each mini-future waits on exactly one node, either
 * given directly or derived from the DHT/near mapping it was created for.
 */
private class MiniFuture extends GridFutureAdapter<IgniteInternalTx> {
    /** Mini-future ID, unique among the mini-futures created by one finish call. */
    private final int futId;

    /** DHT mapping. */
    @GridToStringInclude
    private GridDistributedTxMapping dhtMapping;

    /** Near mapping. */
    @GridToStringInclude
    private GridDistributedTxMapping nearMapping;

    /** Explicitly assigned node; {@code null} when the future was created from mappings. */
    @GridToStringInclude
    private ClusterNode node;

    /**
     * Creates a mini-future bound directly to a node.
     *
     * @param futId Future ID.
     * @param node Node.
     */
    private MiniFuture(int futId, ClusterNode node) {
        this.node = node;
        this.futId = futId;
    }

    /**
     * Creates a mini-future bound to the primary node of the given mappings. When both mappings are
     * present they must share the same primary node.
     *
     * @param futId Future ID.
     * @param dhtMapping Mapping.
     * @param nearMapping nearMapping.
     */
    MiniFuture(int futId, GridDistributedTxMapping dhtMapping, GridDistributedTxMapping nearMapping) {
        assert dhtMapping == null || nearMapping == null || dhtMapping.primary().equals(nearMapping.primary());

        this.dhtMapping = dhtMapping;
        this.nearMapping = nearMapping;
        this.futId = futId;
    }

    /**
     * @return Future ID.
     */
    int futureId() {
        return futId;
    }

    /**
     * Resolves the node this mini-future waits on: the explicit node if set, otherwise the primary
     * node of the DHT mapping, otherwise the primary node of the near mapping.
     *
     * @return Node.
     */
    public ClusterNode node() {
        if (node != null)
            return node;

        GridDistributedTxMapping mapping = dhtMapping != null ? dhtMapping : nearMapping;

        return mapping.primary();
    }

    /**
     * Fails this mini-future with the given error.
     *
     * @param e Error.
     */
    void onResult(Throwable e) {
        if (log.isDebugEnabled())
            log.debug("Failed to get future result [fut=" + this + ", err=" + e + ']');

        onDone(e);
    }

    /**
     * Node-left callback for a failed send; the exception itself is not used.
     *
     * @param e Node failure.
     */
    void onNodeLeft(ClusterTopologyCheckedException e) {
        onNodeLeft();
    }

    /**
     * Node-left callback: completes this mini-future with the transaction.
     */
    void onNodeLeft() {
        if (msgLog.isDebugEnabled()) {
            msgLog.debug("DHT finish fut, mini future node left [txId=" + tx.nearXidVersion() +
                ", dhtTxId=" + tx.xidVersion() +
                ", node=" + node().id() + ']');
        }

        // If node left, then there is nothing to commit on it.
        onDone(tx);
    }

    /**
     * Completes this mini-future on receipt of a finish response.
     *
     * @param res Result callback.
     */
    void onResult(GridDhtTxFinishResponse res) {
        if (log.isDebugEnabled())
            log.debug("Transaction synchronously completed on node [node=" + node() + ", res=" + res + ']');

        onDone();
    }

    /** {@inheritDoc} */
    @Override public String toString() {
        return S.toString(MiniFuture.class, this, "done", isDone(), "cancelled", isCancelled(), "err", error());
    }
}
}