blob: b5e38839b75b478b19e96416002c9a7dffd9fd8d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.distributed.near;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheStoppedException;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.typedef.CI1;
/**
* Future to obtain/lock topology version for SELECT FOR UPDATE.
*/
public class TxTopologyVersionFuture extends GridFutureAdapter<AffinityTopologyVersion> {
/** Transaction. */
private final GridNearTxLocal tx;
/** Target cache context. */
private final GridCacheContext<?, ?> cctx;
/** Topology locked flag. */
private boolean topLocked;
/**
* @param tx Transaction.
* @param cctx Target cache context.
*/
public TxTopologyVersionFuture(GridNearTxLocal tx, GridCacheContext cctx) {
this.tx = tx;
this.cctx = cctx;
init();
}
/** */
private void init() {
// Obtain the topology version to use.
long threadId = Thread.currentThread().getId();
AffinityTopologyVersion topVer = cctx.mvcc().lastExplicitLockTopologyVersion(threadId);
// If there is another system transaction in progress, use it's topology version to prevent deadlock.
if (topVer == null && tx.system())
topVer = cctx.tm().lockedTopologyVersion(threadId, tx);
if (topVer != null)
tx.topologyVersion(topVer);
if (topVer == null)
topVer = tx.topologyVersionSnapshot();
if (topVer != null) {
for (GridDhtTopologyFuture fut : cctx.shared().exchange().exchangeFutures()) {
if (fut.exchangeDone() && fut.topologyVersion().equals(topVer)) {
Throwable err = fut.validateCache(cctx, false, false, null, null);
if (err != null) {
onDone(err);
return;
}
break;
}
}
onDone(topVer);
topLocked = true;
return;
}
acquireTopologyVersion();
}
/**
* Acquire topology future and wait for its completion.
*/
private void acquireTopologyVersion() {
cctx.topology().readLock();
try {
if (cctx.topology().stopping()) {
onDone(new CacheStoppedException(cctx.name()));
return;
}
GridDhtTopologyFuture fut = cctx.topologyVersionFuture();
if (fut.isDone()) {
Throwable err = fut.validateCache(cctx, false, false, null, null);
if (err != null) {
onDone(err);
return;
}
AffinityTopologyVersion topVer = fut.topologyVersion();
if (tx != null)
tx.topologyVersion(topVer);
onDone(topVer);
}
else {
fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
@Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) {
try {
fut.get();
acquireTopologyVersion();
}
catch (IgniteCheckedException e) {
onDone(e);
}
finally {
cctx.shared().txContextReset();
}
}
});
}
}
finally {
cctx.topology().readUnlock();
}
}
/**
* @return Client first flag.
*/
public boolean clientFirst() {
return cctx.localNode().isClient() && !topLocked && !tx.hasRemoteLocks();
}
}