blob: 70fbd1fce793bb3316dc4593ac738a2b4d08fe70 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.distributed.near;
import java.util.Collection;
import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheMessage;
import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxMapping;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxQueryEnlistFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import static org.apache.ignite.internal.processors.cache.distributed.dht.NearTxQueryEnlistResultHandler.createResponse;
/**
* Cache lock future.
*/
@SuppressWarnings("ForLoopReplaceableByForEach")
public class GridNearTxQueryEnlistFuture extends GridNearTxQueryAbstractEnlistFuture {
/** Involved cache ids. */
private final int[] cacheIds;
/** Partitions. */
private final int[] parts;
/** Schema name. */
private final String schema;
/** Query string. */
private final String qry;
/** Query parameters. */
private final Object[] params;
/** Flags. */
private final int flags;
/** Fetch page size. */
private final int pageSize;
/**
* @param cctx Cache context.
* @param tx Transaction.
* @param cacheIds Involved cache ids.
* @param parts Partitions.
* @param schema Schema name.
* @param qry Query string.
* @param params Query parameters.
* @param flags Flags.
* @param pageSize Fetch page size.
* @param timeout Timeout.
*/
protected GridNearTxQueryEnlistFuture(
GridCacheContext<?, ?> cctx, GridNearTxLocal tx, int[] cacheIds, int[] parts, String schema, String qry,
Object[] params, int flags, int pageSize, long timeout) {
super(cctx, tx, timeout);
this.cacheIds = cacheIds;
this.parts = parts;
this.schema = schema;
this.qry = qry;
this.params = params;
this.flags = flags;
this.pageSize = pageSize;
}
/**
* @param topLocked Topology locked flag.
*/
@Override protected void map(final boolean topLocked) {
MiniFuture mini = null;
try {
final AffinityAssignment assignment = cctx.affinity().assignment(topVer);
Collection<ClusterNode> primary;
if (parts != null) {
primary = U.newHashSet(parts.length);
for (int i = 0; i < parts.length; i++) {
ClusterNode pNode = assignment.get(parts[i]).get(0);
primary.add(pNode);
updateMappings(pNode);
}
}
else {
primary = assignment.primaryPartitionNodes();
for (ClusterNode pNode : primary)
updateMappings(pNode);
}
boolean locallyMapped = primary.contains(cctx.localNode());
if (locallyMapped)
add(new MiniFuture(cctx.localNode()));
int idx = locallyMapped ? 1 : 0;
boolean first = true;
boolean clientFirst = false;
// Need to unlock topology to avoid deadlock with binary descriptors registration.
if(!topLocked && cctx.topology().holdsLock())
cctx.topology().readUnlock();
for (ClusterNode node : F.view(primary, F.remoteNodes(cctx.localNodeId()))) {
add(mini = new MiniFuture(node));
if (first) {
clientFirst = cctx.localNode().isClient() && !topLocked && !tx.hasRemoteLocks();
first = false;
}
GridNearTxQueryEnlistRequest req = new GridNearTxQueryEnlistRequest(
cctx.cacheId(),
threadId,
futId,
++idx,
tx.subjectId(),
topVer,
lockVer,
mvccSnapshot,
cacheIds,
parts,
schema,
qry,
params,
flags,
pageSize,
remainingTime(),
tx.remainingTime(),
tx.taskNameHash(),
clientFirst
);
sendRequest(req, node.id(), mini);
}
if (locallyMapped) {
final MiniFuture localMini = mini = miniFuture(-1);
assert localMini != null;
GridDhtTxQueryEnlistFuture fut = new GridDhtTxQueryEnlistFuture(
cctx.localNode().id(),
lockVer,
mvccSnapshot,
threadId,
futId,
-1,
tx,
cacheIds,
parts,
schema,
qry,
params,
flags,
pageSize,
remainingTime(),
cctx);
updateLocalFuture(fut);
fut.listen(new CI1<IgniteInternalFuture<Long>>() {
@Override public void apply(IgniteInternalFuture<Long> fut) {
assert fut.error() != null || fut.result() != null : fut;
try {
clearLocalFuture((GridDhtTxQueryEnlistFuture)fut);
GridNearTxQueryEnlistResponse res = fut.error() == null ? createResponse(fut) : null;
localMini.onResult(res, fut.error());
}
catch (IgniteCheckedException e) {
localMini.onResult(null, e);
}
finally {
CU.unwindEvicts(cctx);
}
}
});
fut.init();
}
}
catch (Throwable e) {
if (mini != null)
mini.onResult(null, e);
else
onDone(e);
if (e instanceof Error)
throw (Error)e;
}
markInitialized();
}
/**
*
* @param req Request.
* @param nodeId Remote node ID.
* @param fut Result future.
* @throws IgniteCheckedException if failed to send.
*/
private void sendRequest(GridCacheMessage req, UUID nodeId, MiniFuture fut) throws IgniteCheckedException {
IgniteInternalFuture<?> txSync = cctx.tm().awaitFinishAckAsync(nodeId, tx.threadId());
if (txSync == null || txSync.isDone())
cctx.io().send(nodeId, req, cctx.ioPolicy());
else
txSync.listen(new CI1<IgniteInternalFuture<?>>() {
@Override public void apply(IgniteInternalFuture<?> f) {
try {
cctx.io().send(nodeId, req, cctx.ioPolicy());
}
catch (IgniteCheckedException e) {
fut.onResult(null, e);
}
}
});
}
/**
* @param nodeId Left node ID
* @return {@code True} if node was in the list.
*/
@Override public synchronized boolean onNodeLeft(UUID nodeId) {
for (IgniteInternalFuture<?> fut : futures()) {
MiniFuture f = (MiniFuture)fut;
if (f.node.id().equals(nodeId)) {
if (log.isDebugEnabled())
log.debug("Found mini-future for left node [nodeId=" + nodeId + ", mini=" + f + ", fut=" +
this + ']');
return f.onResult(null, newTopologyException(nodeId));
}
}
if (log.isDebugEnabled())
log.debug("Future does not have mapping for left node (ignoring) [nodeId=" + nodeId +
", fut=" + this + ']');
return false;
}
/**
* Finds pending mini future by the given mini ID.
*
* @param miniId Mini ID to find.
* @return Mini future.
*/
private MiniFuture miniFuture(int miniId) {
synchronized (this) {
int idx = Math.abs(miniId) - 1;
assert idx >= 0 && idx < futuresCountNoLock();
IgniteInternalFuture<Long> fut = future(idx);
if (!fut.isDone())
return (MiniFuture)fut;
}
return null;
}
/**
* Creates new topology exception for cases when primary node leaves grid during mapping.
*
* @param nodeId Node ID.
* @return Topology exception with user-friendly message.
*/
private ClusterTopologyCheckedException newTopologyException(UUID nodeId) {
ClusterTopologyCheckedException topEx = new ClusterTopologyCheckedException("Failed to enlist keys " +
"(primary node left grid, retry transaction if possible) [node=" + nodeId + ']');
topEx.retryReadyFuture(cctx.shared().nextAffinityReadyFuture(topVer));
return topEx;
}
/**
* @param nodeId Sender node id.
* @param res Response.
*/
public void onResult(UUID nodeId, GridNearTxQueryEnlistResponse res) {
MiniFuture mini = miniFuture(res.miniId());
if (mini != null)
mini.onResult(res, null);
}
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(GridNearTxQueryEnlistFuture.class, this, super.toString());
}
/** */
private class MiniFuture extends GridFutureAdapter<Long> {
/** */
private boolean completed;
/** Node ID. */
@GridToStringExclude
private final ClusterNode node;
/**
* @param node Cluster node.
*/
private MiniFuture(ClusterNode node) {
this.node = node;
}
/**
* @param res Response.
* @param err Exception.
* @return {@code True} if future was completed by this call.
*/
public boolean onResult(GridNearTxQueryEnlistResponse res, Throwable err) {
assert res != null || err != null : this;
if (err == null && res.error() != null)
err = res.error();
synchronized (this) {
if (completed)
return false;
completed = true;
}
if (res != null && res.removeMapping()) {
GridDistributedTxMapping m = tx.mappings().get(node.id());
assert m != null && m.empty();
tx.removeMapping(node.id());
if (node.isLocal())
tx.colocatedLocallyMapped(false);
}
else if (res != null) {
tx.mappings().get(node.id()).addBackups(res.newDhtNodes());
if (res.result() > 0 && !node.isLocal())
tx.hasRemoteLocks(true);
}
return err != null ? onDone(err) : onDone(res.result(), res.error());
}
}
}