blob: a5ab7cd49825db843aece5439e9ee7d7abfa96e4 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.distributed.near;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheCompoundIdentityFuture;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
import org.apache.ignite.internal.processors.cache.GridCacheVersionedFuture;
import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxMapping;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter;
import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.lang.IgniteUuid;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
/**
* A future tracking requests for remote nodes transaction enlisting and locking
* of entries produced with complex DML queries requiring reduce step.
*/
public class GridNearTxSelectForUpdateFuture extends GridCacheCompoundIdentityFuture<Long>
implements GridCacheVersionedFuture<Long> {
/** */
private static final long serialVersionUID = 6931664882548658420L;
/** Done field updater. */
private static final AtomicIntegerFieldUpdater<GridNearTxSelectForUpdateFuture> DONE_UPD =
AtomicIntegerFieldUpdater.newUpdater(GridNearTxSelectForUpdateFuture.class, "done");
/** Done field updater. */
private static final AtomicReferenceFieldUpdater<GridNearTxSelectForUpdateFuture, Throwable> EX_UPD =
AtomicReferenceFieldUpdater.newUpdater(GridNearTxSelectForUpdateFuture.class, Throwable.class, "ex");
/** */
@SuppressWarnings("unused")
@GridToStringExclude
private volatile int done;
/** */
@SuppressWarnings("unused")
@GridToStringExclude
private volatile Throwable ex;
/** Cache context. */
@GridToStringExclude
private final GridCacheContext<?, ?> cctx;
/** Transaction. */
private final GridNearTxLocal tx;
/** Mvcc future id. */
private final IgniteUuid futId;
/** Lock version. */
private final GridCacheVersion lockVer;
/** */
private AffinityTopologyVersion topVer;
/** */
private final long timeout;
/** Logger. */
@GridToStringExclude
private final IgniteLogger log;
/** Timeout object. */
@GridToStringExclude
private LockTimeoutObject timeoutObj;
/** Ids of mini futures. */
private final Map<UUID, Integer> miniFutIds = new HashMap<>();
/**
* @param cctx Cache context.
* @param tx Transaction.
* @param timeout Timeout.
*/
public GridNearTxSelectForUpdateFuture(GridCacheContext<?, ?> cctx,
GridNearTxLocal tx,
long timeout) {
super(CU.longReducer());
this.cctx = cctx;
this.tx = tx;
this.timeout = timeout;
futId = IgniteUuid.randomUuid();
lockVer = tx.xidVersion();
log = cctx.logger(GridNearTxSelectForUpdateFuture.class);
}
/**
* @return Cache context.
*/
public GridCacheContext<?, ?> cache() {
return cctx;
}
/**
* @param node Node.
*/
private void map(ClusterNode node) {
GridDistributedTxMapping mapping = tx.mappings().get(node.id());
if (mapping == null)
tx.mappings().put(mapping = new GridDistributedTxMapping(node));
mapping.markQueryUpdate();
if (node.isLocal())
tx.colocatedLocallyMapped(true);
int futId = futuresCountNoLock();
miniFutIds.put(node.id(), futId);
add(new NodeFuture(node));
}
/**
* Process result of query execution on given
* @param nodeId Node id.
* @param cnt Total rows counter on given node.
* @param removeMapping Whether transaction mapping should be removed for node.
* @param err Error.
*/
public void onResult(UUID nodeId, Long cnt, boolean removeMapping, @Nullable Throwable err) {
NodeFuture nodeFut = mapFuture(nodeId);
if (nodeFut != null)
nodeFut.onResult(cnt, removeMapping, err);
}
/** {@inheritDoc} */
@Override protected boolean processFailure(Throwable err, IgniteInternalFuture<Long> fut) {
if (ex != null || !EX_UPD.compareAndSet(this, null, err))
ex.addSuppressed(err);
return true;
}
/** {@inheritDoc} */
@Override public boolean onDone(@Nullable Long res, @Nullable Throwable err, boolean cancelled) {
if (!DONE_UPD.compareAndSet(this, 0, 1))
return false;
cctx.tm().txContext(tx);
Throwable ex0 = ex;
if (ex0 != null) {
if (err != null)
ex0.addSuppressed(err);
err = ex0;
}
if (!cancelled && err == null)
tx.clearLockFuture(this);
else
tx.setRollbackOnly();
boolean done = super.onDone(res, err, cancelled);
assert done;
// Clean up.
cctx.mvcc().removeVersionedFuture(this);
if (timeoutObj != null)
cctx.time().removeTimeoutObject(timeoutObj);
return true;
}
/**
* Finds pending map node future by the given ID.
*
* @param nodeId Node id.
* @return Map node future.
*/
private NodeFuture mapFuture(UUID nodeId) {
synchronized (this) {
Integer idx = miniFutIds.get(nodeId);
if (idx == null)
throw new IllegalStateException("SELECT FOR UPDATE node future not found [nodeId=" + nodeId + "].");
assert idx >= 0 && idx < futuresCountNoLock();
IgniteInternalFuture<Long> fut = future(idx);
if (!fut.isDone())
return (NodeFuture)fut;
}
return null;
}
/** {@inheritDoc} */
@Override public GridCacheVersion version() {
return lockVer;
}
/** {@inheritDoc} */
@Override public boolean onOwnerChanged(GridCacheEntryEx entry, GridCacheMvccCandidate owner) {
return false;
}
/** {@inheritDoc} */
@Override public IgniteUuid futureId() {
return futId;
}
/** {@inheritDoc} */
@Override public boolean onNodeLeft(UUID nodeId) {
if (topVer == null)
return false; // Local query, do nothing.
for (IgniteInternalFuture<?> fut : futures()) {
NodeFuture f = (NodeFuture)fut;
if (f.node.id().equals(nodeId)) {
if (log.isDebugEnabled())
log.debug("Found mini-future for left node [nodeId=" + nodeId + ", mini=" + f + ", fut=" +
this + ']');
ClusterTopologyCheckedException topEx = new ClusterTopologyCheckedException("Failed to enlist keys " +
"(primary node left grid, retry transaction if possible) [node=" + nodeId + ']');
topEx.retryReadyFuture(cctx.shared().nextAffinityReadyFuture(topVer));
return f.onResult(0, false, topEx);
}
}
if (log.isDebugEnabled())
log.debug("Future does not have mapping for left node (ignoring) [nodeId=" + nodeId +
", fut=" + this + ']');
return false;
}
/** {@inheritDoc} */
@Override public boolean trackable() {
return true;
}
/** {@inheritDoc} */
@Override public void markNotTrackable() {
// No-op.
}
/** {@inheritDoc} */
@Override protected void logError(IgniteLogger log, String msg, Throwable e) {
// no-op
}
/** {@inheritDoc} */
@Override protected void logDebug(IgniteLogger log, String msg) {
// no-op
}
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(GridNearTxSelectForUpdateFuture.class, this, super.toString());
}
/**
* Initialize this future for distributed execution.
* @param topVer Topology version.
* @param nodes Nodes to run query on.
*/
public synchronized void init(AffinityTopologyVersion topVer, Collection<ClusterNode> nodes) {
doInit(topVer, nodes, false);
}
/**
* Initialize this future for local execution.
*/
public synchronized void initLocal() {
doInit(null, Collections.singletonList(cctx.localNode()), true);
}
/**
* Initialize this future for distributed or local execution.
* @param topVer Topology version ({@code null} for local case).
* @param nodes Nodes to run query on.
* @param loc Local query flag.
*/
private void doInit(@Nullable AffinityTopologyVersion topVer, Collection<ClusterNode> nodes, boolean loc) {
assert !loc || (topVer == null && nodes.size() == 1 && nodes.iterator().next().isLocal());
if (initialized())
throw new IllegalStateException("SELECT FOR UPDATE future has been initialized already.");
tx.init();
if (timeout < 0) {
// Time is out.
onDone(timeoutException());
return;
}
else if (timeout > 0)
timeoutObj = new LockTimeoutObject();
if (!tx.updateLockFuture(null, this)) {
onDone(tx.timedOut() ? tx.timeoutException() : tx.rollbackException());
return;
}
boolean added = cctx.mvcc().addFuture(this);
assert added : this;
try {
tx.addActiveCache(cctx, false);
}
catch (IgniteCheckedException e) {
onDone(e);
return;
}
if (timeoutObj != null)
cctx.time().addTimeoutObject(timeoutObj);
this.topVer = topVer;
for (ClusterNode n : nodes)
map(n);
markInitialized();
}
/**
* @return Timeout exception.
*/
@NotNull private IgniteTxTimeoutCheckedException timeoutException() {
return new IgniteTxTimeoutCheckedException("Failed to acquire lock within provided timeout for " +
"transaction [timeout=" + timeout + ", tx=" + tx + ']');
}
/**
* A future tracking a single MAP request to be enlisted in transaction and locked on data node.
*/
private class NodeFuture extends GridFutureAdapter<Long> {
/** */
private boolean completed;
/** Node ID. */
@GridToStringExclude
private final ClusterNode node;
/**
* @param node Cluster node.
*
*/
private NodeFuture(ClusterNode node) {
this.node = node;
}
/**
* @return Node.
*/
public ClusterNode node() {
return node;
}
/**
* @param cnt Total rows counter on given node.
* @param removeMapping Whether transaction mapping should be removed for node.
* @param err Exception.
* @return {@code True} if future was completed by this call.
*/
public boolean onResult(long cnt, boolean removeMapping, Throwable err) {
synchronized (this) {
if (completed)
return false;
completed = true;
}
if (X.hasCause(err, ClusterTopologyCheckedException.class) || removeMapping) {
GridDistributedTxMapping m = tx.mappings().get(node.id());
assert m != null && m.empty();
tx.removeMapping(node.id());
if (node.isLocal())
tx.colocatedLocallyMapped(false);
}
else if (err == null && cnt > 0 && !node.isLocal())
tx.hasRemoteLocks(true);
return onDone(cnt, err);
}
}
/**
* Lock request timeout object.
*/
private class LockTimeoutObject extends GridTimeoutObjectAdapter {
/**
* Default constructor.
*/
LockTimeoutObject() {
super(timeout);
}
/** {@inheritDoc} */
@Override public void onTimeout() {
if (log.isDebugEnabled())
log.debug("Timed out waiting for lock response: " + this);
onDone(timeoutException());
}
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(LockTimeoutObject.class, this);
}
}
}