| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.ignite.internal.processors.cache.distributed.dht; |
| |
| import java.io.Externalizable; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.LinkedList; |
| import java.util.List; |
| import java.util.ListIterator; |
| import java.util.Map; |
| import java.util.UUID; |
| import org.apache.ignite.IgniteCheckedException; |
| import org.apache.ignite.IgniteException; |
| import org.apache.ignite.cluster.ClusterNode; |
| import org.apache.ignite.cluster.ClusterTopologyException; |
| import org.apache.ignite.internal.IgniteInternalFuture; |
| import org.apache.ignite.internal.NodeStoppingException; |
| import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; |
| import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; |
| import org.apache.ignite.internal.processors.cache.CacheEntryPredicate; |
| import org.apache.ignite.internal.processors.cache.CacheInvalidStateException; |
| import org.apache.ignite.internal.processors.cache.CacheObject; |
| import org.apache.ignite.internal.processors.cache.CacheOperationContext; |
| import org.apache.ignite.internal.processors.cache.GridCacheConcurrentMap; |
| import org.apache.ignite.internal.processors.cache.GridCacheContext; |
| import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; |
| import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo; |
| import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException; |
| import org.apache.ignite.internal.processors.cache.GridCacheLockTimeoutException; |
| import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate; |
| import org.apache.ignite.internal.processors.cache.GridCacheReturn; |
| import org.apache.ignite.internal.processors.cache.KeyCacheObject; |
| import org.apache.ignite.internal.processors.cache.distributed.GridDistributedCacheEntry; |
| import org.apache.ignite.internal.processors.cache.distributed.GridDistributedLockCancelledException; |
| import org.apache.ignite.internal.processors.cache.distributed.GridDistributedUnlockRequest; |
| import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysRequest; |
| import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysResponse; |
| import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture; |
| import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtInvalidPartitionException; |
| import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition; |
| import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology; |
| import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetRequest; |
| import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockRequest; |
| import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockResponse; |
| import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetRequest; |
| import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTransactionalCache; |
| import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxEnlistFuture; |
| import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxEnlistRequest; |
| import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxEnlistResponse; |
| import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxQueryEnlistFuture; |
| import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxQueryEnlistRequest; |
| import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxQueryEnlistResponse; |
| import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxQueryResultsEnlistFuture; |
| import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxQueryResultsEnlistRequest; |
| import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxQueryResultsEnlistResponse; |
| import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxRemote; |
| import org.apache.ignite.internal.processors.cache.distributed.near.GridNearUnlockRequest; |
| import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot; |
| import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshotWithoutTxs; |
| import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry; |
| import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey; |
| import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx; |
| import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; |
| import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException; |
| import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException; |
| import org.apache.ignite.internal.util.F0; |
| import org.apache.ignite.internal.util.GridLeanSet; |
| import org.apache.ignite.internal.util.future.GridFinishedFuture; |
| import org.apache.ignite.internal.util.lang.GridClosureException; |
| import org.apache.ignite.internal.util.lang.IgnitePair; |
| import org.apache.ignite.internal.util.typedef.C1; |
| import org.apache.ignite.internal.util.typedef.C2; |
| import org.apache.ignite.internal.util.typedef.CI1; |
| import org.apache.ignite.internal.util.typedef.CI2; |
| import org.apache.ignite.internal.util.typedef.F; |
| import org.apache.ignite.internal.util.typedef.X; |
| import org.apache.ignite.internal.util.typedef.internal.CU; |
| import org.apache.ignite.internal.util.typedef.internal.U; |
| import org.apache.ignite.lang.IgniteUuid; |
| import org.apache.ignite.thread.IgniteThread; |
| import org.apache.ignite.transactions.TransactionIsolation; |
| import org.jetbrains.annotations.Nullable; |
| |
| import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; |
| import static org.apache.ignite.internal.processors.cache.GridCacheOperation.NOOP; |
| import static org.apache.ignite.internal.processors.cache.GridCacheUtils.isNearEnabled; |
| import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.MVCC_OP_COUNTER_NA; |
| import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; |
| import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; |
| import static org.apache.ignite.transactions.TransactionState.COMMITTING; |
| |
| /** |
| * Base class for transactional DHT caches. Registers handlers for near and DHT lock, |
| * unlock, get and transaction enlist messages and implements lock acquisition and |
| * release on the DHT side. |
| */ |
| @SuppressWarnings("unchecked") |
| public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCacheAdapter<K, V> { |
| /** */ |
| private static final long serialVersionUID = 0L; |
| |
| /** |
| * Empty constructor required for {@link Externalizable}. |
| */ |
| protected GridDhtTransactionalCacheAdapter() { |
| // No-op. |
| } |
| |
| /** |
| * @param ctx Context. |
| */ |
| protected GridDhtTransactionalCacheAdapter(GridCacheContext<K, V> ctx) { |
| super(ctx); |
| } |
| |
| /** |
| * Constructor used for near-only cache. |
| * |
| * @param ctx Cache context. |
| * @param map Cache map. |
| */ |
| protected GridDhtTransactionalCacheAdapter(GridCacheContext<K, V> ctx, GridCacheConcurrentMap map) { |
| super(ctx, map); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void start() throws IgniteCheckedException { |
| super.start(); |
| |
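| // Register handlers for all near and DHT lock, unlock, get and transaction enlist messages. |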
| ctx.io().addCacheHandler(ctx.cacheId(), GridNearGetRequest.class, new CI2<UUID, GridNearGetRequest>() { |
| @Override public void apply(UUID nodeId, GridNearGetRequest req) { |
| processNearGetRequest(nodeId, req); |
| } |
| }); |
| |
| ctx.io().addCacheHandler(ctx.cacheId(), GridNearSingleGetRequest.class, new CI2<UUID, GridNearSingleGetRequest>() { |
| @Override public void apply(UUID nodeId, GridNearSingleGetRequest req) { |
| processNearSingleGetRequest(nodeId, req); |
| } |
| }); |
| |
| ctx.io().addCacheHandler(ctx.cacheId(), GridNearLockRequest.class, new CI2<UUID, GridNearLockRequest>() { |
| @Override public void apply(UUID nodeId, GridNearLockRequest req) { |
| processNearLockRequest(nodeId, req); |
| } |
| }); |
| |
| ctx.io().addCacheHandler(ctx.cacheId(), GridDhtLockRequest.class, new CI2<UUID, GridDhtLockRequest>() { |
| @Override public void apply(UUID nodeId, GridDhtLockRequest req) { |
| processDhtLockRequest(nodeId, req); |
| } |
| }); |
| |
| ctx.io().addCacheHandler(ctx.cacheId(), GridDhtLockResponse.class, new CI2<UUID, GridDhtLockResponse>() { |
| @Override public void apply(UUID nodeId, GridDhtLockResponse req) { |
| processDhtLockResponse(nodeId, req); |
| } |
| }); |
| |
| ctx.io().addCacheHandler(ctx.cacheId(), GridNearUnlockRequest.class, new CI2<UUID, GridNearUnlockRequest>() { |
| @Override public void apply(UUID nodeId, GridNearUnlockRequest req) { |
| processNearUnlockRequest(nodeId, req); |
| } |
| }); |
| |
| ctx.io().addCacheHandler(ctx.cacheId(), GridDhtUnlockRequest.class, new CI2<UUID, GridDhtUnlockRequest>() { |
| @Override public void apply(UUID nodeId, GridDhtUnlockRequest req) { |
| processDhtUnlockRequest(nodeId, req); |
| } |
| }); |
| |
| ctx.io().addCacheHandler(ctx.cacheId(), GridNearTxQueryEnlistRequest.class, new CI2<UUID, GridNearTxQueryEnlistRequest>() { |
| @Override public void apply(UUID nodeId, GridNearTxQueryEnlistRequest req) { |
| processNearTxQueryEnlistRequest(nodeId, req); |
| } |
| }); |
| |
| ctx.io().addCacheHandler(ctx.cacheId(), GridNearTxQueryEnlistResponse.class, new CI2<UUID, GridNearTxQueryEnlistResponse>() { |
| @Override public void apply(UUID nodeId, GridNearTxQueryEnlistResponse req) { |
| processNearTxQueryEnlistResponse(nodeId, req); |
| } |
| }); |
| |
| ctx.io().addCacheHandler(ctx.cacheId(), GridDhtForceKeysRequest.class, |
| new MessageHandler<GridDhtForceKeysRequest>() { |
| @Override public void onMessage(ClusterNode node, GridDhtForceKeysRequest msg) { |
| processForceKeysRequest(node, msg); |
| } |
| }); |
| |
| ctx.io().addCacheHandler(ctx.cacheId(), GridDhtForceKeysResponse.class, |
| new MessageHandler<GridDhtForceKeysResponse>() { |
| @Override public void onMessage(ClusterNode node, GridDhtForceKeysResponse msg) { |
| processForceKeyResponse(node, msg); |
| } |
| }); |
| |
| ctx.io().addCacheHandler(ctx.cacheId(), GridNearTxQueryResultsEnlistRequest.class, |
| new CI2<UUID, GridNearTxQueryResultsEnlistRequest>() { |
| @Override public void apply(UUID nodeId, GridNearTxQueryResultsEnlistRequest req) { |
| processNearTxQueryResultsEnlistRequest(nodeId, req); |
| } |
| }); |
| |
| ctx.io().addCacheHandler(ctx.cacheId(), GridNearTxQueryResultsEnlistResponse.class, |
| new CI2<UUID, GridNearTxQueryResultsEnlistResponse>() { |
| @Override public void apply(UUID nodeId, GridNearTxQueryResultsEnlistResponse req) { |
| processNearTxQueryResultsEnlistResponse(nodeId, req); |
| } |
| }); |
| |
| ctx.io().addCacheHandler(ctx.cacheId(), GridNearTxEnlistRequest.class, |
| new CI2<UUID, GridNearTxEnlistRequest>() { |
| @Override public void apply(UUID nodeId, GridNearTxEnlistRequest req) { |
| processNearTxEnlistRequest(nodeId, req); |
| } |
| }); |
| |
| ctx.io().addCacheHandler(ctx.cacheId(), GridNearTxEnlistResponse.class, |
| new CI2<UUID, GridNearTxEnlistResponse>() { |
| @Override public void apply(UUID nodeId, GridNearTxEnlistResponse msg) { |
| processNearTxEnlistResponse(nodeId, msg); |
| } |
| }); |
| |
| ctx.io().addCacheHandler(ctx.cacheId(), GridDhtTxQueryEnlistRequest.class, |
| new CI2<UUID, GridDhtTxQueryEnlistRequest>() { |
| @Override public void apply(UUID nodeId, GridDhtTxQueryEnlistRequest msg) { |
| processDhtTxQueryEnlistRequest(nodeId, msg, false); |
| } |
| }); |
| |
| ctx.io().addCacheHandler(ctx.cacheId(), GridDhtTxQueryFirstEnlistRequest.class, |
| new CI2<UUID, GridDhtTxQueryEnlistRequest>() { |
| @Override public void apply(UUID nodeId, GridDhtTxQueryEnlistRequest msg) { |
| processDhtTxQueryEnlistRequest(nodeId, msg, true); |
| } |
| }); |
| |
| ctx.io().addCacheHandler(ctx.cacheId(), GridDhtTxQueryEnlistResponse.class, |
| new CI2<UUID, GridDhtTxQueryEnlistResponse>() { |
| @Override public void apply(UUID nodeId, GridDhtTxQueryEnlistResponse msg) { |
| processDhtTxQueryEnlistResponse(nodeId, msg); |
| } |
| }); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public abstract GridNearTransactionalCache<K, V> near(); |
| |
| /** |
| * @param nodeId Primary node ID. |
| * @param req Request. |
| * @param res Response. |
| * @return Remote transaction. |
| * @throws IgniteCheckedException If failed. |
| * @throws GridDistributedLockCancelledException If lock has been cancelled. |
| */ |
| @Nullable private GridDhtTxRemote startRemoteTx(UUID nodeId, |
| GridDhtLockRequest req, |
| GridDhtLockResponse res) |
| throws IgniteCheckedException, GridDistributedLockCancelledException { |
| List<KeyCacheObject> keys = req.keys(); |
| GridDhtTxRemote tx = null; |
| |
| int size = F.size(keys); |
| |
| for (int i = 0; i < size; i++) { |
| KeyCacheObject key = keys.get(i); |
| |
| if (key == null) |
| continue; |
| |
| IgniteTxKey txKey = ctx.txKey(key); |
| |
| if (log.isDebugEnabled()) |
| log.debug("Unmarshalled key: " + key); |
| |
| GridDistributedCacheEntry entry = null; |
| |
| while (true) { |
| try { |
| int part = ctx.affinity().partition(key); |
| |
| GridDhtLocalPartition locPart = ctx.topology().localPartition(part, req.topologyVersion(), |
| false); |
| |
| if (locPart == null || !locPart.reserve()) { |
| if (log.isDebugEnabled()) |
| log.debug("Local partition for given key is already evicted (will add to invalid " + |
| "partition list) [key=" + key + ", part=" + part + ", locPart=" + locPart + ']'); |
| |
| res.addInvalidPartition(part); |
| |
| // Invalidate key in near cache, if any. |
| if (isNearEnabled(cacheCfg)) |
| obsoleteNearEntry(key); |
| |
| break; |
| } |
| |
| try { |
| // Handle implicit locks for pessimistic transactions. |
| if (req.inTx()) { |
| if (tx == null) |
| tx = ctx.tm().tx(req.version()); |
| |
| if (tx == null) { |
| tx = new GridDhtTxRemote( |
| ctx.shared(), |
| req.nodeId(), |
| req.futureId(), |
| nodeId, |
| req.nearXidVersion(), |
| req.topologyVersion(), |
| req.version(), |
| /*commitVer*/null, |
| ctx.systemTx(), |
| ctx.ioPolicy(), |
| PESSIMISTIC, |
| req.isolation(), |
| req.isInvalidate(), |
| req.timeout(), |
| req.txSize(), |
| req.subjectId(), |
| req.taskNameHash(), |
| !req.skipStore() && req.storeUsed(), |
| req.txLabel()); |
| |
| tx = ctx.tm().onCreated(null, tx); |
| |
| if (tx == null || !ctx.tm().onStarted(tx)) |
| throw new IgniteTxRollbackCheckedException("Failed to acquire lock (transaction " + |
| "has been completed) [ver=" + req.version() + ", tx=" + tx + ']'); |
| } |
| |
| tx.addWrite( |
| ctx, |
| NOOP, |
| txKey, |
| null, |
| null, |
| req.accessTtl(), |
| req.skipStore(), |
| req.keepBinary()); |
| } |
| |
| entry = entryExx(key, req.topologyVersion()); |
| |
| // Add remote candidate before reordering. |
| entry.addRemote( |
| req.nodeId(), |
| nodeId, |
| req.threadId(), |
| req.version(), |
| tx != null, |
| tx != null && tx.implicitSingle(), |
| null |
| ); |
| |
| // Invalidate key in near cache, if any. |
| if (isNearEnabled(cacheCfg) && req.invalidateNearEntry(i)) |
| invalidateNearEntry(key, req.version()); |
| |
| // Get entry info after candidate is added. |
| if (req.needPreloadKey(i)) { |
| entry.unswap(); |
| |
| GridCacheEntryInfo info = entry.info(); |
| |
| if (info != null && !info.isNew() && !info.isDeleted()) |
| res.addPreloadEntry(info); |
| } |
| |
| // Double-check in case the sender node left the grid. |
| if (ctx.discovery().node(req.nodeId()) == null) { |
| if (log.isDebugEnabled()) |
| log.debug("Node requesting lock left grid (lock request will be ignored): " + req); |
| |
| entry.removeLock(req.version()); |
| |
| if (tx != null) { |
| tx.clearEntry(txKey); |
| |
| // If there is a concurrent salvage, there could be a case when tx is moved to |
| // COMMITTING state, but this lock is never acquired. |
| if (tx.state() == COMMITTING) |
| tx.forceCommit(); |
| else |
| tx.rollbackRemoteTx(); |
| } |
| |
| return null; |
| } |
| |
| // Entry is legit. |
| break; |
| } |
| finally { |
| locPart.release(); |
| } |
| } |
| catch (GridDhtInvalidPartitionException e) { |
| if (log.isDebugEnabled()) |
| log.debug("Received invalid partition exception [e=" + e + ", req=" + req + ']'); |
| |
| res.addInvalidPartition(e.partition()); |
| |
| // Invalidate key in near cache, if any. |
| if (isNearEnabled(cacheCfg)) |
| obsoleteNearEntry(key); |
| |
| if (tx != null) { |
| tx.clearEntry(txKey); |
| |
| if (log.isDebugEnabled()) |
| log.debug("Cleared invalid entry from remote transaction (will skip) [entry=" + |
| entry + ", tx=" + tx + ']'); |
| } |
| |
| break; |
| } |
| catch (GridCacheEntryRemovedException ignored) { |
| assert entry.obsoleteVersion() != null : "Obsolete flag not set on removed entry: " + |
| entry; |
| |
| if (log.isDebugEnabled()) |
| log.debug("Received entry removed exception (will retry on renewed entry): " + entry); |
| |
| if (tx != null) { |
| tx.clearEntry(txKey); |
| |
| if (log.isDebugEnabled()) |
| log.debug("Cleared removed entry from remote transaction (will retry) [entry=" + |
| entry + ", tx=" + tx + ']'); |
| } |
| } |
| } |
| } |
| |
| if (tx != null && tx.empty()) { |
| if (log.isDebugEnabled()) |
| log.debug("Rolling back remote DHT transaction because it is empty [req=" + req + ", res=" + res + ']'); |
| |
| tx.rollbackRemoteTx(); |
| |
| tx = null; |
| } |
| |
| return tx; |
| } |
| |
| /** |
| * @param nodeId Node ID. |
| * @param req Request. |
| */ |
| private void processDhtLockRequest(final UUID nodeId, final GridDhtLockRequest req) { |
| if (txLockMsgLog.isDebugEnabled()) { |
| txLockMsgLog.debug("Received dht lock request [txId=" + req.nearXidVersion() + |
| ", dhtTxId=" + req.version() + |
| ", inTx=" + req.inTx() + |
| ", node=" + nodeId + ']'); |
| } |
| |
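| // Make sure the requested keys are preloaded locally; some may still be rebalancing to this node. |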
| IgniteInternalFuture<Object> keyFut = F.isEmpty(req.keys()) ? null : |
| ctx.group().preloader().request(ctx, req.keys(), req.topologyVersion()); |
| |
| if (keyFut == null || keyFut.isDone()) { |
| if (keyFut != null) { |
| try { |
| keyFut.get(); |
| } |
| catch (NodeStoppingException ignored) { |
| return; |
| } |
| catch (IgniteCheckedException e) { |
| onForceKeysError(nodeId, req, e); |
| |
| return; |
| } |
| } |
| |
| processDhtLockRequest0(nodeId, req); |
| } |
| else { |
| keyFut.listen(new CI1<IgniteInternalFuture<Object>>() { |
| @Override public void apply(IgniteInternalFuture<Object> fut) { |
| try { |
| fut.get(); |
| } |
| catch (NodeStoppingException ignored) { |
| return; |
| } |
| catch (IgniteCheckedException e) { |
| onForceKeysError(nodeId, req, e); |
| |
| return; |
| } |
| |
| processDhtLockRequest0(nodeId, req); |
| } |
| }); |
| } |
| } |
| |
| /** |
| * @param nodeId Node ID. |
| * @param req Request. |
| * @param e Error. |
| */ |
| private void onForceKeysError(UUID nodeId, GridDhtLockRequest req, IgniteCheckedException e) { |
| GridDhtLockResponse res = new GridDhtLockResponse(ctx.cacheId(), |
| req.version(), |
| req.futureId(), |
| req.miniId(), |
| e, |
| ctx.deploymentEnabled()); |
| |
| try { |
| ctx.io().send(nodeId, res, ctx.ioPolicy()); |
| } |
| catch (ClusterTopologyCheckedException ignored) { |
| if (log.isDebugEnabled()) |
| log.debug("Failed to send lock reply to remote node because it left grid: " + nodeId); |
| } |
| catch (IgniteCheckedException ignored) { |
| U.error(log, "Failed to send lock reply to node: " + nodeId, e); |
| } |
| } |
| |
| /** |
| * @param nodeId Node ID. |
| * @param req Request. |
| */ |
| private void processDhtLockRequest0(UUID nodeId, GridDhtLockRequest req) { |
| assert nodeId != null; |
| assert req != null; |
| assert !nodeId.equals(locNodeId); |
| |
| int cnt = F.size(req.keys()); |
| |
| GridDhtLockResponse res; |
| |
| GridDhtTxRemote dhtTx = null; |
| GridNearTxRemote nearTx = null; |
| |
| boolean fail = false; |
| boolean cancelled = false; |
| |
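| // Start the remote DHT (and, if near cache is enabled, near) transaction and build the lock response. |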
| try { |
| res = new GridDhtLockResponse(ctx.cacheId(), req.version(), req.futureId(), req.miniId(), cnt, |
| ctx.deploymentEnabled()); |
| |
| dhtTx = startRemoteTx(nodeId, req, res); |
| nearTx = isNearEnabled(cacheCfg) ? near().startRemoteTx(nodeId, req) : null; |
| |
| if (nearTx != null && !nearTx.empty()) |
| res.nearEvicted(nearTx.evicted()); |
| else { |
| if (!F.isEmpty(req.nearKeys())) { |
| Collection<IgniteTxKey> nearEvicted = new ArrayList<>(req.nearKeys().size()); |
| |
| nearEvicted.addAll(F.viewReadOnly(req.nearKeys(), new C1<KeyCacheObject, IgniteTxKey>() { |
| @Override public IgniteTxKey apply(KeyCacheObject k) { |
| return ctx.txKey(k); |
| } |
| })); |
| |
| res.nearEvicted(nearEvicted); |
| } |
| } |
| } |
| catch (IgniteTxRollbackCheckedException e) { |
| String err = "Failed processing DHT lock request (transaction has been completed): " + req; |
| |
| U.error(log, err, e); |
| |
| res = new GridDhtLockResponse(ctx.cacheId(), req.version(), req.futureId(), req.miniId(), |
| new IgniteTxRollbackCheckedException(err, e), ctx.deploymentEnabled()); |
| |
| fail = true; |
| } |
| catch (IgniteCheckedException e) { |
| String err = "Failed processing DHT lock request: " + req; |
| |
| U.error(log, err, e); |
| |
| res = new GridDhtLockResponse(ctx.cacheId(), |
| req.version(), |
| req.futureId(), |
| req.miniId(), |
| new IgniteCheckedException(err, e), ctx.deploymentEnabled()); |
| |
| fail = true; |
| } |
| catch (GridDistributedLockCancelledException ignored) { |
| // Received lock request for cancelled lock. |
| if (log.isDebugEnabled()) |
| log.debug("Received lock request for canceled lock (will ignore): " + req); |
| |
| res = null; |
| |
| fail = true; |
| cancelled = true; |
| } |
| |
| boolean releaseAll = false; |
| |
| if (res != null) { |
| try { |
| // Reply back to sender. |
| ctx.io().send(nodeId, res, ctx.ioPolicy()); |
| |
| if (txLockMsgLog.isDebugEnabled()) { |
| txLockMsgLog.debug("Sent dht lock response [txId=" + req.nearXidVersion() + |
| ", dhtTxId=" + req.version() + |
| ", inTx=" + req.inTx() + |
| ", node=" + nodeId + ']'); |
| } |
| } |
| catch (ClusterTopologyCheckedException ignored) { |
| U.warn(txLockMsgLog, "Failed to send dht lock response, node failed [" + |
| "txId=" + req.nearXidVersion() + |
| ", dhtTxId=" + req.version() + |
| ", inTx=" + req.inTx() + |
| ", node=" + nodeId + ']'); |
| |
| fail = true; |
| releaseAll = true; |
| } |
| catch (IgniteCheckedException e) { |
| U.error(txLockMsgLog, "Failed to send dht lock response (lock will not be acquired) " + |
| "txId=" + req.nearXidVersion() + |
| ", dhtTxId=" + req.version() + |
| ", inTx=" + req.inTx() + |
| ", node=" + nodeId + ']', e); |
| |
| fail = true; |
| } |
| } |
| |
| if (fail) { |
| if (dhtTx != null) |
| dhtTx.rollbackRemoteTx(); |
| |
| if (nearTx != null) // Even though this should never happen, we leave this check for consistency. |
| nearTx.rollbackRemoteTx(); |
| |
| List<KeyCacheObject> keys = req.keys(); |
| |
| if (keys != null) { |
| for (KeyCacheObject key : keys) { |
| while (true) { |
| GridDistributedCacheEntry entry = peekExx(key); |
| |
| try { |
| if (entry != null) { |
| // Release all locks because sender node left grid. |
| if (releaseAll) |
| entry.removeExplicitNodeLocks(req.nodeId()); |
| else |
| entry.removeLock(req.version()); |
| } |
| |
| break; |
| } |
| catch (GridCacheEntryRemovedException ignore) { |
| if (log.isDebugEnabled()) |
| log.debug("Attempted to remove lock on removed entity during during failure " + |
| "handling for dht lock request (will retry): " + entry); |
| } |
| } |
| } |
| } |
| |
| if (releaseAll && !cancelled) |
| U.warn(log, "Sender node left grid in the midst of lock acquisition (locks have been released)."); |
| } |
| } |
| |
| /** |
| * @param nodeId Node ID. |
| * @param req Request. |
| */ |
| private void processDhtUnlockRequest(UUID nodeId, GridDhtUnlockRequest req) { |
| clearLocks(nodeId, req); |
| |
| if (isNearEnabled(cacheCfg)) |
| near().clearLocks(nodeId, req); |
| } |
| |
| /** |
| * @param nodeId Node ID. |
| * @param req Request. |
| */ |
| private void processNearTxQueryEnlistRequest(UUID nodeId, final GridNearTxQueryEnlistRequest req) { |
| assert nodeId != null; |
| assert req != null; |
| |
| ClusterNode nearNode = ctx.discovery().node(nodeId); |
| |
| GridDhtTxLocal tx; |
| |
| try { |
| tx = initTxTopologyVersion(nodeId, |
| nearNode, |
| req.version(), |
| req.futureId(), |
| req.miniId(), |
| req.firstClientRequest(), |
| req.topologyVersion(), |
| req.threadId(), |
| req.txTimeout(), |
| req.subjectId(), |
| req.taskNameHash(), |
| req.mvccSnapshot()); |
| } |
| catch (IgniteCheckedException | IgniteException ex) { |
| GridNearTxQueryEnlistResponse res = new GridNearTxQueryEnlistResponse(req.cacheId(), |
| req.futureId(), |
| req.miniId(), |
| req.version(), |
| ex); |
| |
| try { |
| ctx.io().send(nearNode, res, ctx.ioPolicy()); |
| } |
| catch (IgniteCheckedException e) { |
| U.error(log, "Failed to send near enlist response [" + |
| "txId=" + req.version() + |
| ", node=" + nodeId + |
| ", res=" + res + ']', e); |
| } |
| |
| return; |
| } |
| |
| GridDhtTxQueryEnlistFuture fut = new GridDhtTxQueryEnlistFuture( |
| nodeId, |
| req.version(), |
| req.mvccSnapshot(), |
| req.threadId(), |
| req.futureId(), |
| req.miniId(), |
| tx, |
| req.cacheIds(), |
| req.partitions(), |
| req.schemaName(), |
| req.query(), |
| req.parameters(), |
| req.flags(), |
| req.pageSize(), |
| req.timeout(), |
| ctx); |
| |
| fut.listen(NearTxQueryEnlistResultHandler.instance()); |
| |
| fut.init(); |
| } |
| |
| /** |
| * @param nodeId Node ID. |
| * @param req Request. |
| */ |
| private void processNearLockRequest(UUID nodeId, GridNearLockRequest req) { |
| assert ctx.affinityNode(); |
| assert nodeId != null; |
| assert req != null; |
| |
| if (txLockMsgLog.isDebugEnabled()) { |
| txLockMsgLog.debug("Received near lock request [txId=" + req.version() + |
| ", inTx=" + req.inTx() + |
| ", node=" + nodeId + ']'); |
| } |
| |
| ClusterNode nearNode = ctx.discovery().node(nodeId); |
| |
| if (nearNode == null) { |
| U.warn(txLockMsgLog, "Received near lock request from unknown node (will ignore) [txId=" + req.version() + |
| ", inTx=" + req.inTx() + |
| ", node=" + nodeId + ']'); |
| |
| return; |
| } |
| |
| processNearLockRequest0(nearNode, req); |
| } |
| |
| /** |
| * @param nearNode Near node that sent the lock request. |
| * @param req Lock request. |
| */ |
| private void processNearLockRequest0(ClusterNode nearNode, GridNearLockRequest req) { |
| IgniteInternalFuture<?> f; |
| |
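| // A client's first lock request must wait for the pending topology exchange to complete. |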
| if (req.firstClientRequest()) { |
| while (true) { |
| if (waitForExchangeFuture(nearNode, req)) |
| return; |
| |
| f = lockAllAsync(ctx, nearNode, req, null); |
| |
| if (f != null) |
| break; |
| } |
| } |
| else |
| f = lockAllAsync(ctx, nearNode, req, null); |
| |
| // Register listener just so we print out errors. |
| // Exclude lock timeout and rollback exceptions since they are not fatal. |
| f.listen(CU.errorLogger(log, GridCacheLockTimeoutException.class, |
| GridDistributedLockCancelledException.class, IgniteTxTimeoutCheckedException.class, |
| IgniteTxRollbackCheckedException.class)); |
| } |
| |
| /** |
| * @param node Node. |
| * @param req Request. |
| */ |
| private boolean waitForExchangeFuture(final ClusterNode node, final GridNearLockRequest req) { |
| assert req.firstClientRequest() : req; |
| |
| GridDhtTopologyFuture topFut = ctx.shared().exchange().lastTopologyFuture(); |
| |
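| // Cache pool threads must not block on the exchange future; re-dispatch processing via a listener instead. |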
| if (!topFut.isDone()) { |
| Thread curThread = Thread.currentThread(); |
| |
| if (curThread instanceof IgniteThread) { |
| final IgniteThread thread = (IgniteThread)curThread; |
| |
| if (thread.cachePoolThread()) { |
| // A near transaction's finish on timeout will unlock topFut if it is held for too long, |
| // so we need to listen with a timeout. This is not true for optimistic transactions. |
| topFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { |
| @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) { |
| ctx.kernalContext().closure().runLocalWithThreadPolicy(thread, new Runnable() { |
| @Override public void run() { |
| try { |
| processNearLockRequest0(node, req); |
| } |
| finally { |
| ctx.io().onMessageProcessed(req); |
| } |
| } |
| }); |
| } |
| }); |
| |
| return true; |
| } |
| } |
| |
| try { |
| topFut.get(); |
| } |
| catch (IgniteCheckedException e) { |
| U.error(log, "Topology future failed: " + e, e); |
| } |
| } |
| |
| return false; |
| } |
| |
| /** |
| * @param nodeId Node ID. |
| * @param res Response. |
| */ |
| private void processDhtLockResponse(UUID nodeId, GridDhtLockResponse res) { |
| assert nodeId != null; |
| assert res != null; |
| GridDhtLockFuture fut = (GridDhtLockFuture)ctx.mvcc().<Boolean>versionedFuture(res.version(), res.futureId()); |
| |
| if (fut == null) { |
| if (txLockMsgLog.isDebugEnabled()) |
| txLockMsgLog.debug("Received dht lock response for unknown future [txId=null" + |
| ", dhtTxId=" + res.version() + |
| ", node=" + nodeId + ']'); |
| |
| return; |
| } |
| else if (txLockMsgLog.isDebugEnabled()) { |
| txLockMsgLog.debug("Received dht lock response [txId=" + fut.nearLockVersion() + |
| ", dhtTxId=" + res.version() + |
| ", node=" + nodeId + ']'); |
| } |
| |
| fut.onResult(nodeId, res); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public IgniteInternalFuture<Boolean> lockAllAsync( |
| @Nullable Collection<KeyCacheObject> keys, |
| long timeout, |
| IgniteTxLocalEx txx, |
| boolean isInvalidate, |
| boolean isRead, |
| boolean retval, |
| TransactionIsolation isolation, |
| long createTtl, |
| long accessTtl) { |
| CacheOperationContext opCtx = ctx.operationContextPerCall(); |
| |
| return lockAllAsyncInternal( |
| keys, |
| timeout, |
| txx, |
| isInvalidate, |
| isRead, |
| retval, |
| isolation, |
| createTtl, |
| accessTtl, |
| CU.empty0(), |
| opCtx != null && opCtx.skipStore(), |
| opCtx != null && opCtx.isKeepBinary()); |
| } |
| |
| /** |
| * Acquires locks in partitioned cache. |
| * |
| * @param keys Keys to lock. |
| * @param timeout Lock timeout. |
| * @param txx Transaction. |
| * @param isInvalidate Invalidate flag. |
| * @param isRead Read flag. |
| * @param retval Return value flag. |
| * @param isolation Transaction isolation. |
| * @param createTtl TTL for create operation. |
| * @param accessTtl TTL for read operation. |
| * @param filter Optional filter. |
| * @param skipStore Skip store flag. |
| * @param keepBinary Keep binary flag. |
| * @return Lock future. |
| */ |
| public GridDhtFuture<Boolean> lockAllAsyncInternal(@Nullable Collection<KeyCacheObject> keys, |
| long timeout, |
| IgniteTxLocalEx txx, |
| boolean isInvalidate, |
| boolean isRead, |
| boolean retval, |
| TransactionIsolation isolation, |
| long createTtl, |
| long accessTtl, |
| CacheEntryPredicate[] filter, |
| boolean skipStore, |
| boolean keepBinary) { |
| if (keys == null || keys.isEmpty()) |
| return new GridDhtFinishedFuture<>(true); |
| |
| GridDhtTxLocalAdapter tx = (GridDhtTxLocalAdapter)txx; |
| |
| assert tx != null; |
| |
| GridDhtLockFuture fut = new GridDhtLockFuture( |
| ctx, |
| tx.nearNodeId(), |
| tx.nearXidVersion(), |
| tx.topologyVersion(), |
| keys.size(), |
| isRead, |
| retval, |
| timeout, |
| tx, |
| tx.threadId(), |
| createTtl, |
| accessTtl, |
| filter, |
| skipStore, |
| keepBinary); |
| |
| if (fut.isDone()) // Possible in case of cancellation or timeout or rollback. |
| return fut; |
| |
| for (KeyCacheObject key : keys) { |
| try { |
| while (true) { |
| GridDhtCacheEntry entry = entryExx(key, tx.topologyVersion()); |
| |
| try { |
| fut.addEntry(entry); |
| |
| // Possible in case of cancellation or timeout or rollback. |
| if (fut.isDone()) |
| return fut; |
| |
| break; |
| } |
| catch (GridCacheEntryRemovedException ignore) { |
| if (log.isDebugEnabled()) |
| log.debug("Got removed entry when adding lock (will retry): " + entry); |
| } |
| catch (GridDistributedLockCancelledException e) { |
| if (log.isDebugEnabled()) |
| log.debug("Failed to add entry [err=" + e + ", entry=" + entry + ']'); |
| |
| return new GridDhtFinishedFuture<>(e); |
| } |
| } |
| } |
| catch (GridDhtInvalidPartitionException e) { |
| fut.addInvalidPartition(ctx, e.partition()); |
| |
| if (log.isDebugEnabled()) |
| log.debug("Added invalid partition to DHT lock future [part=" + e.partition() + ", fut=" + |
| fut + ']'); |
| } |
| } |
| |
| if (!fut.isDone()) { |
| ctx.mvcc().addFuture(fut); |
| |
| fut.map(); |
| } |
| |
| return fut; |
| } |
| |
| /** |
| * @param cacheCtx Cache context. |
| * @param nearNode Near node. |
| * @param req Request. |
| * @param filter0 Filter. |
| * @return Future. |
| */ |
| public IgniteInternalFuture<GridNearLockResponse> lockAllAsync( |
| final GridCacheContext<?, ?> cacheCtx, |
| final ClusterNode nearNode, |
| final GridNearLockRequest req, |
| @Nullable final CacheEntryPredicate[] filter0) { |
| final List<KeyCacheObject> keys = req.keys(); |
| |
| CacheEntryPredicate[] filter = filter0; |
| |
| // Set message into thread context. |
| GridDhtTxLocal tx = null; |
| |
| try { |
| int cnt = keys.size(); |
| |
| if (req.inTx()) { |
| GridCacheVersion dhtVer = ctx.tm().mappedVersion(req.version()); |
| |
| if (dhtVer != null) |
| tx = ctx.tm().tx(dhtVer); |
| } |
| |
| final List<GridCacheEntryEx> entries = new ArrayList<>(cnt); |
| |
| // Unmarshal filter first. |
| if (filter == null) |
| filter = req.filter(); |
| |
| GridDhtLockFuture fut = null; |
| |
| GridDhtPartitionTopology top = null; |
| |
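| // For a client's first request, lock the topology to check whether the request needs to be remapped. |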
| if (req.firstClientRequest()) { |
| assert nearNode.isClient(); |
| |
| top = topology(); |
| |
| top.readLock(); |
| |
| if (!top.topologyVersionFuture().isDone()) { |
| top.readUnlock(); |
| |
| return null; |
| } |
| } |
| |
| try { |
| if (top != null && needRemap(req.topologyVersion(), top.readyTopologyVersion())) { |
| if (log.isDebugEnabled()) { |
| log.debug("Client topology version mismatch, need remap lock request [" + |
| "reqTopVer=" + req.topologyVersion() + |
| ", locTopVer=" + top.readyTopologyVersion() + |
| ", req=" + req + ']'); |
| } |
| |
| GridNearLockResponse res = sendClientLockRemapResponse(nearNode, |
| req, |
| top.lastTopologyChangeVersion()); |
| |
| return new GridFinishedFuture<>(res); |
| } |
| |
| if (req.inTx()) { |
| if (tx == null) { |
| tx = new GridDhtTxLocal( |
| ctx.shared(), |
| topology().readyTopologyVersion(), |
| nearNode.id(), |
| req.version(), |
| req.futureId(), |
| req.miniId(), |
| req.threadId(), |
| /*implicitTx*/false, |
| /*implicitSingleTx*/false, |
| ctx.systemTx(), |
| false, |
| ctx.ioPolicy(), |
| PESSIMISTIC, |
| req.isolation(), |
| req.timeout(), |
| req.isInvalidate(), |
| !req.skipStore(), |
| false, |
| req.txSize(), |
| null, |
| req.subjectId(), |
| req.taskNameHash(), |
| req.txLabel(), |
| null); |
| |
| if (req.syncCommit()) |
| tx.syncMode(FULL_SYNC); |
| |
| tx = ctx.tm().onCreated(null, tx); |
| |
| if (tx == null || !tx.init()) { |
| String msg = "Failed to acquire lock (transaction has been completed): " + |
| req.version(); |
| |
| U.warn(log, msg); |
| |
| if (tx != null) |
| tx.rollbackDhtLocal(); |
| |
| return new GridDhtFinishedFuture<>(new IgniteTxRollbackCheckedException(msg)); |
| } |
| |
| tx.topologyVersion(req.topologyVersion()); |
| } |
| |
| GridDhtPartitionsExchangeFuture lastFinishedFut = ctx.shared().exchange().lastFinishedFuture(); |
| |
| CacheOperationContext opCtx = ctx.operationContextPerCall(); |
| |
| CacheInvalidStateException validateCacheE = lastFinishedFut |
| .validateCache(ctx, opCtx != null && opCtx.recovery(), req.txRead(), null, keys); |
| |
| if (validateCacheE != null) |
| throw validateCacheE; |
| } |
| else { |
| fut = new GridDhtLockFuture(ctx, |
| nearNode.id(), |
| req.version(), |
| req.topologyVersion(), |
| cnt, |
| req.txRead(), |
| req.needReturnValue(), |
| req.timeout(), |
| tx, |
| req.threadId(), |
| req.createTtl(), |
| req.accessTtl(), |
| filter, |
| req.skipStore(), |
| req.keepBinary()); |
| |
| // Add before mapping. |
| if (!ctx.mvcc().addFuture(fut)) |
| throw new IllegalStateException("Duplicate future ID: " + fut); |
| } |
| } |
| finally { |
| if (top != null) |
| top.readUnlock(); |
| } |
| |
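| // Add local lock candidates for all requested keys, retrying on concurrently removed entries. |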
| boolean timedOut = false; |
| |
| for (KeyCacheObject key : keys) { |
| if (timedOut) |
| break; |
| |
| while (true) { |
| // Specify topology version to make sure containment is checked |
| // based on the requested version, not the latest. |
| GridDhtCacheEntry entry = entryExx(key, req.topologyVersion()); |
| |
| try { |
| if (fut != null) { |
| // This method will add local candidate. |
| // Entry cannot become obsolete after this method succeeded. |
| fut.addEntry(key == null ? null : entry); |
| |
| if (fut.isDone()) { |
| timedOut = true; |
| |
| break; |
| } |
| } |
| |
| entries.add(entry); |
| |
| break; |
| } |
| catch (GridCacheEntryRemovedException ignore) { |
| if (log.isDebugEnabled()) |
| log.debug("Got removed entry when adding lock (will retry): " + entry); |
| } |
| catch (GridDistributedLockCancelledException e) { |
| if (log.isDebugEnabled()) |
| log.debug("Got lock request for cancelled lock (will ignore): " + |
| entry); |
| |
| fut.onError(e); |
| |
| return new GridDhtFinishedFuture<>(e); |
| } |
| } |
| } |
| |
| // Handle implicit locks for pessimistic transactions. |
| if (req.inTx()) { |
| ctx.tm().txContext(tx); |
| |
| if (log.isDebugEnabled()) |
| log.debug("Performing DHT lock [tx=" + tx + ", entries=" + entries + ']'); |
| |
| IgniteInternalFuture<GridCacheReturn> txFut = tx.lockAllAsync( |
| cacheCtx, |
| entries, |
| req.messageId(), |
| req.txRead(), |
| req.needReturnValue(), |
| req.createTtl(), |
| req.accessTtl(), |
| req.skipStore(), |
| req.keepBinary(), |
| req.nearCache()); |
| |
| final GridDhtTxLocal t = tx; |
| |
| return new GridDhtEmbeddedFuture( |
| txFut, |
| new C2<GridCacheReturn, Exception, IgniteInternalFuture<GridNearLockResponse>>() { |
| @Override public IgniteInternalFuture<GridNearLockResponse> apply( |
| GridCacheReturn o, Exception e) { |
| if (e != null) |
| e = U.unwrap(e); |
| |
| // Transaction can be emptied by asynchronous rollback. |
| assert e != null || !t.empty(); |
| |
| // Create response while holding locks. |
| final GridNearLockResponse resp = createLockReply(nearNode, |
| entries, |
| req, |
| t, |
| t.xidVersion(), |
| e); |
| |
| assert !t.implicit() : t; |
| assert !t.onePhaseCommit() : t; |
| |
| sendLockReply(nearNode, t, req, resp); |
| |
| return new GridFinishedFuture<>(resp); |
| } |
| } |
| ); |
| } |
| else { |
| assert fut != null; |
| |
| // This will send remote messages. |
| fut.map(); |
| |
| final GridCacheVersion mappedVer = fut.version(); |
| |
| return new GridDhtEmbeddedFuture<>( |
| new C2<Boolean, Exception, GridNearLockResponse>() { |
| @Override public GridNearLockResponse apply(Boolean b, Exception e) { |
| if (e != null) |
| e = U.unwrap(e); |
| else if (!b) |
| e = new GridCacheLockTimeoutException(req.version()); |
| |
| GridNearLockResponse res = createLockReply(nearNode, |
| entries, |
| req, |
| null, |
| mappedVer, |
| e); |
| |
| sendLockReply(nearNode, null, req, res); |
| |
| return res; |
| } |
| }, |
| fut); |
| } |
| } |
| catch (IgniteCheckedException | RuntimeException e) { |
| U.error(log, req, e); |
| |
| if (tx != null) { |
| try { |
| tx.rollbackDhtLocal(); |
| } |
| catch (IgniteCheckedException ex) { |
| U.error(log, "Failed to rollback the transaction: " + tx, ex); |
| } |
| } |
| |
| try { |
| GridNearLockResponse res = createLockReply(nearNode, |
| Collections.emptyList(), |
| req, |
| tx, |
| tx != null ? tx.xidVersion() : req.version(), |
| e); |
| |
| sendLockReply(nearNode, null, req, res); |
| } |
| catch (Exception ex) { |
| U.error(log, "Failed to send response for request message: " + req, ex); |
| } |
| |
| return new GridDhtFinishedFuture<>( |
| new IgniteCheckedException(e)); |
| } |
| } |
| |
| /** |
| * @param nearNode Client node. |
| * @param req Request. |
| * @param topVer Remap version. |
| * @return Response. |
| */ |
| private GridNearLockResponse sendClientLockRemapResponse(ClusterNode nearNode, |
| GridNearLockRequest req, |
| AffinityTopologyVersion topVer) { |
| assert topVer != null; |
| |
| GridNearLockResponse res = new GridNearLockResponse( |
| ctx.cacheId(), |
| req.version(), |
| req.futureId(), |
| req.miniId(), |
| false, |
| 0, |
| null, |
| topVer, |
| ctx.deploymentEnabled(), |
| false); |
| |
| try { |
| ctx.io().send(nearNode, res, ctx.ioPolicy()); |
| } |
| catch (ClusterTopologyCheckedException ignored) { |
| if (log.isDebugEnabled()) |
| log.debug("Failed to send client lock remap response, client node failed " + |
| "[node=" + nearNode + ", req=" + req + ']'); |
| } |
| catch (IgniteCheckedException e) { |
| U.error(log, "Failed to send client lock remap response [node=" + nearNode + ", req=" + req + ']', e); |
| } |
| |
| return res; |
| } |
| |
| /** |
| * @param nearNode Near node. |
| * @param entries Entries. |
| * @param req Lock request. |
| * @param tx Transaction. |
| * @param mappedVer Mapped version. |
| * @param err Error. |
| * @return Response. |
| */ |
| private GridNearLockResponse createLockReply( |
| ClusterNode nearNode, |
| List<GridCacheEntryEx> entries, |
| GridNearLockRequest req, |
| @Nullable GridDhtTxLocalAdapter tx, |
| GridCacheVersion mappedVer, |
| Throwable err) { |
| assert mappedVer != null; |
| assert tx == null || tx.xidVersion().equals(mappedVer); |
| |
| try { |
| // All subsequent lock requests must use actual topology version to avoid mapping on invalid primaries. |
| AffinityTopologyVersion clientRemapVer = req.firstClientRequest() && |
| tx != null && |
| topology().readyTopologyVersion().after(req.topologyVersion()) ? |
| topology().readyTopologyVersion() : null; |
| |
| // Send reply back to originating near node. |
| GridNearLockResponse res = new GridNearLockResponse(ctx.cacheId(), |
| req.version(), |
| req.futureId(), |
| req.miniId(), |
| tx != null && tx.onePhaseCommit(), |
| entries.size(), |
| err, |
| clientRemapVer, |
| ctx.deploymentEnabled(), |
| clientRemapVer != null); |
| |
| if (err == null) { |
| res.pending(localDhtPendingVersions(entries, mappedVer)); |
| |
| // We have to add completed versions for cases when nearLocal and remote transactions |
| // execute concurrently. |
| IgnitePair<Collection<GridCacheVersion>> versPair = ctx.tm().versions(req.version()); |
| |
| res.completedVersions(versPair.get1(), versPair.get2()); |
| |
| int i = 0; |
| |
| for (ListIterator<GridCacheEntryEx> it = entries.listIterator(); it.hasNext(); ) { |
| GridCacheEntryEx e = it.next(); |
| |
| assert e != null; |
| |
| while (true) { |
| try { |
| // Don't return anything for invalid partitions. |
| if (tx == null || !tx.isRollbackOnly()) { |
| GridCacheVersion dhtVer = req.dhtVersion(i); |
| |
| GridCacheVersion ver = e.version(); |
| |
| boolean ret = req.returnValue(i) || dhtVer == null || !dhtVer.equals(ver); |
| |
| CacheObject val = null; |
| |
| if (ret) { |
| val = e.innerGet( |
| null, |
| tx, |
| /*read-through*/false, |
| /*update-metrics*/true, |
| /*event notification*/req.returnValue(i), |
| CU.subjectId(tx, ctx.shared()), |
| null, |
| tx != null ? tx.resolveTaskName() : null, |
| null, |
| req.keepBinary()); |
| } |
| |
| assert e.lockedBy(mappedVer) || |
| ctx.mvcc().isRemoved(e.context(), mappedVer) || |
| tx != null && tx.isRollbackOnly() : |
| "Entry does not own lock for tx [locNodeId=" + ctx.localNodeId() + |
| ", entry=" + e + |
| ", mappedVer=" + mappedVer + ", ver=" + ver + |
| ", tx=" + CU.txString(tx) + ", req=" + req + ']'; |
| |
| boolean filterPassed = false; |
| |
| if (tx != null && tx.onePhaseCommit()) { |
| IgniteTxEntry writeEntry = tx.entry(ctx.txKey(e.key())); |
| |
| assert writeEntry != null : |
| "Missing tx entry for locked cache entry: " + e; |
| |
| filterPassed = writeEntry.filtersPassed(); |
| } |
| |
| if (ret && val == null) |
| val = e.valueBytes(null); |
| |
| // We include values into response since they are required for local |
| // calls and won't be serialized. We are also including DHT version. |
| res.addValueBytes( |
| ret ? val : null, |
| filterPassed, |
| ver, |
| mappedVer); |
| } |
| else { |
| // We include values into response since they are required for local |
| // calls and won't be serialized. We are also including DHT version. |
| res.addValueBytes(null, false, e.version(), mappedVer); |
| } |
| |
| break; |
| } |
| catch (GridCacheEntryRemovedException ignore) { |
| if (log.isDebugEnabled()) |
| log.debug("Got removed entry when sending reply to DHT lock request " + |
| "(will retry): " + e); |
| |
| e = entryExx(e.key()); |
| |
| it.set(e); |
| } |
| } |
| |
| i++; |
| } |
| } |
| |
| return res; |
| } |
| catch (IgniteCheckedException e) { |
| U.error(log, "Failed to get value for lock reply message for node [node=" + |
| U.toShortString(nearNode) + ", req=" + req + ']', e); |
| |
| return new GridNearLockResponse(ctx.cacheId(), |
| req.version(), |
| req.futureId(), |
| req.miniId(), |
| false, |
| entries.size(), |
| e, |
| null, |
| ctx.deploymentEnabled(), |
| false); |
| } |
| } |
| |
| /** |
| * Send lock reply back to near node. |
| * |
| * @param nearNode Near node. |
| * @param tx Transaction. |
| * @param req Lock request. |
| * @param res Lock response. |
| */ |
| private void sendLockReply( |
| ClusterNode nearNode, |
| @Nullable GridDhtTxLocal tx, |
| GridNearLockRequest req, |
| GridNearLockResponse res |
| ) { |
| Throwable err = res.error(); |
| |
| // Log error before sending reply. |
| if (err != null && !(err instanceof GridCacheLockTimeoutException) && |
| !(err instanceof IgniteTxRollbackCheckedException) && !ctx.kernalContext().isStopping()) |
| U.error(log, "Failed to acquire lock for request: " + req, err); |
| |
| try { |
| // TODO Async rollback |
| // Don't send reply message to this node or if lock was cancelled or tx was rolled back asynchronously. |
| if (!nearNode.id().equals(ctx.nodeId()) && !X.hasCause(err, GridDistributedLockCancelledException.class) && |
| !X.hasCause(err, IgniteTxRollbackCheckedException.class)) { |
| ctx.io().send(nearNode, res, ctx.ioPolicy()); |
| |
| if (txLockMsgLog.isDebugEnabled()) { |
| txLockMsgLog.debug("Sent near lock response [txId=" + req.version() + |
| ", inTx=" + req.inTx() + |
| ", node=" + nearNode.id() + ']'); |
| } |
| } |
| else { |
| if (txLockMsgLog.isDebugEnabled() && !nearNode.id().equals(ctx.nodeId())) { |
| txLockMsgLog.debug("Skip send near lock response [txId=" + req.version() + |
| ", inTx=" + req.inTx() + |
| ", node=" + nearNode.id() + |
| ", err=" + err + ']'); |
| } |
| } |
| } |
| catch (IgniteCheckedException e) { |
| U.error(txLockMsgLog, "Failed to send near lock response (will rollback transaction) [" + |
| "txId=" + req.version() + |
| ", inTx=" + req.inTx() + |
| ", node=" + nearNode.id() + |
| ", res=" + res + ']', e); |
| |
| if (tx != null) |
| try { |
| tx.rollbackDhtLocalAsync(); |
| } |
| catch (Throwable e1) { |
| e.addSuppressed(e1); |
| } |
| |
| // Convert to closure exception as this method is only called from closures. |
| throw new GridClosureException(e); |
| } |
| } |
| |
| /** |
| * Collects versions of pending candidates that are less than the base version. |
| * |
| * @param entries Tx entries to process. |
| * @param baseVer Base version. |
| * @return Collection of pending candidate versions. |
| */ |
| private Collection<GridCacheVersion> localDhtPendingVersions(Iterable<GridCacheEntryEx> entries, |
| GridCacheVersion baseVer) { |
| Collection<GridCacheVersion> lessPending = new GridLeanSet<>(5); |
| |
| for (GridCacheEntryEx entry : entries) { |
| // Since entries were collected before locks are added, some of them may become obsolete. |
| while (true) { |
| try { |
| for (GridCacheMvccCandidate cand : entry.localCandidates()) { |
| if (cand.version().isLess(baseVer)) |
| lessPending.add(cand.version()); |
| } |
| |
| break; // While. |
| } |
| catch (GridCacheEntryRemovedException ignored) { |
| if (log.isDebugEnabled()) |
| log.debug("Got removed entry is localDhtPendingVersions (will retry): " + entry); |
| |
| entry = entryExx(entry.key()); |
| } |
| } |
| } |
| |
| return lessPending; |
| } |
| |
| /** |
| * @param nodeId Node ID. |
| * @param req Request. |
| */ |
| private void clearLocks(UUID nodeId, GridDistributedUnlockRequest req) { |
| assert nodeId != null; |
| |
| List<KeyCacheObject> keys = req.keys(); |
| |
| if (keys != null) { |
| for (KeyCacheObject key : keys) { |
| while (true) { |
| GridDistributedCacheEntry entry = peekExx(key); |
| |
| if (entry == null) |
| // Nothing to unlock. |
| break; |
| |
| try { |
| entry.doneRemote( |
| req.version(), |
| req.version(), |
| null, |
| null, |
| null, |
| /*system invalidate*/false); |
| |
| // Note that we don't reorder completed versions here, |
| // as there is no point to reorder relative to the version |
| // we are about to remove. |
| if (entry.removeLock(req.version())) { |
| if (log.isDebugEnabled()) |
| log.debug("Removed lock [lockId=" + req.version() + ", key=" + key + ']'); |
| } |
| else { |
| if (log.isDebugEnabled()) |
| log.debug("Received unlock request for unknown candidate " + |
| "(added to cancelled locks set): " + req); |
| } |
| |
| entry.touch(); |
| |
| break; |
| } |
| catch (GridCacheEntryRemovedException ignored) { |
| if (log.isDebugEnabled()) |
| log.debug("Received remove lock request for removed entry (will retry) [entry=" + |
| entry + ", req=" + req + ']'); |
| } |
| } |
| } |
| } |
| } |
| |
| /** |
| * @param nodeId Sender ID. |
| * @param req Request. |
| */ |
| private void processNearUnlockRequest(UUID nodeId, GridNearUnlockRequest req) { |
| assert ctx.affinityNode(); |
| assert nodeId != null; |
| |
| removeLocks(nodeId, req.version(), req.keys(), true); |
| } |
| |
| /** |
| * @param nodeId Sender node ID. |
| * @param topVer Topology version. |
| * @param cached Entry. |
| * @param readers Readers for this entry. |
| * @param dhtMap DHT map. |
| * @param nearMap Near map. |
| */ |
| private void map(UUID nodeId, |
| AffinityTopologyVersion topVer, |
| GridCacheEntryEx cached, |
| Collection<UUID> readers, |
| Map<ClusterNode, List<KeyCacheObject>> dhtMap, |
| Map<ClusterNode, List<KeyCacheObject>> nearMap |
| ) { |
| List<ClusterNode> dhtNodes = ctx.dht().topology().nodes(cached.partition(), topVer); |
| |
| ClusterNode primary = dhtNodes.get(0); |
| |
| assert primary != null; |
| |
| if (!primary.id().equals(ctx.nodeId())) { |
| if (log.isDebugEnabled()) |
| log.debug("Primary node mismatch for unlock [entry=" + cached + ", expected=" + ctx.nodeId() + |
| ", actual=" + U.toShortString(primary) + ']'); |
| |
| return; |
| } |
| |
| if (log.isDebugEnabled()) |
| log.debug("Mapping entry to DHT nodes [nodes=" + U.toShortString(dhtNodes) + ", entry=" + cached + ']'); |
| |
| Collection<ClusterNode> nearNodes = null; |
| |
| if (!F.isEmpty(readers)) { |
| nearNodes = ctx.discovery().nodes(readers, F0.not(F.idForNodeId(nodeId))); |
| |
| if (log.isDebugEnabled()) |
| log.debug("Mapping entry to near nodes [nodes=" + U.toShortString(nearNodes) + ", entry=" + cached + |
| ']'); |
| } |
| else { |
| if (log.isDebugEnabled()) |
| log.debug("Entry has no near readers: " + cached); |
| } |
| |
| map(cached, F.view(dhtNodes, F.remoteNodes(ctx.nodeId())), dhtMap); // Exclude local node. |
| map(cached, nearNodes, nearMap); |
| } |
| |
| /** |
| * @param entry Entry. |
| * @param nodes Nodes. |
| * @param map Map. |
| */ |
| private void map(GridCacheEntryEx entry, |
| @Nullable Iterable<? extends ClusterNode> nodes, |
| Map<ClusterNode, List<KeyCacheObject>> map) { |
| if (nodes != null) { |
| for (ClusterNode n : nodes) { |
| List<KeyCacheObject> keys = map.get(n); |
| |
| if (keys == null) |
| map.put(n, keys = new LinkedList<>()); |
| |
| keys.add(entry.key()); |
| } |
| } |
| } |
| |
| /** |
| * @param nodeId Node ID. |
| * @param ver Version. |
| * @param keys Keys. |
| * @param unmap Flag for un-mapping version. |
| */ |
| public void removeLocks(UUID nodeId, GridCacheVersion ver, Iterable<KeyCacheObject> keys, boolean unmap) { |
| assert nodeId != null; |
| assert ver != null; |
| |
| if (F.isEmpty(keys)) |
| return; |
| |
| // Remove mapped versions. |
| GridCacheVersion dhtVer = unmap ? ctx.mvcc().unmapVersion(ver) : ver; |
| |
| ctx.mvcc().addRemoved(ctx, ver); |
| |
| Map<ClusterNode, List<KeyCacheObject>> dhtMap = new HashMap<>(); |
| Map<ClusterNode, List<KeyCacheObject>> nearMap = new HashMap<>(); |
| |
| GridCacheVersion obsoleteVer = null; |
| |
| for (KeyCacheObject key : keys) { |
| while (true) { |
| boolean created = false; |
| |
| GridDhtCacheEntry entry = peekExx(key); |
| |
| if (entry == null) { |
| entry = entryExx(key); |
| |
| created = true; |
| } |
| |
| try { |
| GridCacheMvccCandidate cand = null; |
| |
| if (dhtVer == null) { |
| cand = entry.localCandidateByNearVersion(ver, true); |
| |
| if (cand != null) |
| dhtVer = cand.version(); |
| else { |
| if (log.isDebugEnabled()) |
| log.debug("Failed to locate lock candidate based on dht or near versions [nodeId=" + |
| nodeId + ", ver=" + ver + ", unmap=" + unmap + ", keys=" + keys + ']'); |
| |
| entry.removeLock(ver); |
| |
| if (created) { |
| if (obsoleteVer == null) |
| obsoleteVer = nextVersion(); |
| |
| if (entry.markObsolete(obsoleteVer)) |
| removeEntry(entry); |
| } |
| |
| break; |
| } |
| } |
| |
| if (cand == null) |
| cand = entry.candidate(dhtVer); |
| |
| AffinityTopologyVersion topVer = cand == null |
| ? AffinityTopologyVersion.NONE |
| : cand.topologyVersion(); |
| |
| // Note that we obtain readers before lock is removed. |
| // Even in case if entry would be removed just after lock is removed, |
| // we must send release messages to backups and readers. |
| Collection<UUID> readers = entry.readers(); |
| |
| // Note that we don't reorder completed versions here, |
| // as there is no point to reorder relative to the version |
| // we are about to remove. |
| if (entry.removeLock(dhtVer)) { |
| // Map to backups and near readers. |
| map(nodeId, topVer, entry, readers, dhtMap, nearMap); |
| |
| if (log.isDebugEnabled()) |
| log.debug("Removed lock [lockId=" + ver + ", key=" + key + ']'); |
| } |
| else if (log.isDebugEnabled()) |
| log.debug("Received unlock request for unknown candidate " + |
| "(added to cancelled locks set) [ver=" + ver + ", entry=" + entry + ']'); |
| |
| if (created && entry.markObsolete(dhtVer)) |
| removeEntry(entry); |
| |
| entry.touch(); |
| |
| break; |
| } |
| catch (GridCacheEntryRemovedException ignored) { |
| if (log.isDebugEnabled()) |
| log.debug("Received remove lock request for removed entry (will retry): " + entry); |
| } |
| } |
| } |
| |
| IgnitePair<Collection<GridCacheVersion>> versPair = ctx.tm().versions(ver); |
| |
| Collection<GridCacheVersion> committed = versPair.get1(); |
| Collection<GridCacheVersion> rolledback = versPair.get2(); |
| |
| // Backups. |
| for (Map.Entry<ClusterNode, List<KeyCacheObject>> entry : dhtMap.entrySet()) { |
| ClusterNode n = entry.getKey(); |
| |
| List<KeyCacheObject> keyBytes = entry.getValue(); |
| |
| GridDhtUnlockRequest req = new GridDhtUnlockRequest(ctx.cacheId(), keyBytes.size(), |
| ctx.deploymentEnabled()); |
| |
| req.version(dhtVer); |
| |
| try { |
| for (KeyCacheObject key : keyBytes) |
| req.addKey(key, ctx); |
| |
| keyBytes = nearMap.get(n); |
| |
| if (keyBytes != null) |
| for (KeyCacheObject key : keyBytes) |
| req.addNearKey(key); |
| |
| req.completedVersions(committed, rolledback); |
| |
| ctx.io().send(n, req, ctx.ioPolicy()); |
| } |
| catch (ClusterTopologyCheckedException ignore) { |
| if (log.isDebugEnabled()) |
| log.debug("Node left while sending unlock request: " + n); |
| } |
| catch (IgniteCheckedException e) { |
| U.error(log, "Failed to send unlock request to node (will make best effort to complete): " + n, e); |
| } |
| } |
| |
        // Readers (near-only nodes). Nodes that are also backups have already been
        // sent their near keys along with the DHT unlock request above.
        for (Map.Entry<ClusterNode, List<KeyCacheObject>> entry : nearMap.entrySet()) {
            ClusterNode n = entry.getKey();

            if (!dhtMap.containsKey(n)) {
                List<KeyCacheObject> nearKeys = entry.getValue();

                GridDhtUnlockRequest req = new GridDhtUnlockRequest(ctx.cacheId(), nearKeys.size(),
                    ctx.deploymentEnabled());

                req.version(dhtVer);

                try {
                    for (KeyCacheObject key : nearKeys)
                        req.addNearKey(key);
| |
| req.completedVersions(committed, rolledback); |
| |
| ctx.io().send(n, req, ctx.ioPolicy()); |
| } |
| catch (ClusterTopologyCheckedException ignore) { |
| if (log.isDebugEnabled()) |
| log.debug("Node left while sending unlock request: " + n); |
| } |
| catch (IgniteCheckedException e) { |
| U.error(log, "Failed to send unlock request to node (will make best effort to complete): " + n, e); |
| } |
| } |
| } |
| } |
| |
| /** |
     * @param key Key.
| * @param ver Version. |
| * @throws IgniteCheckedException If invalidate failed. |
| */ |
| private void invalidateNearEntry(KeyCacheObject key, GridCacheVersion ver) throws IgniteCheckedException { |
| GridCacheEntryEx nearEntry = near().peekEx(key); |
| |
| if (nearEntry != null) |
| nearEntry.invalidate(ver); |
| } |
| |
| /** |
     * @param key Key.
| */ |
| private void obsoleteNearEntry(KeyCacheObject key) { |
| GridCacheEntryEx nearEntry = near().peekEx(key); |
| |
| if (nearEntry != null) |
| nearEntry.markObsolete(nextVersion()); |
| } |
| |
| /** |
| * @param nodeId Node ID. |
| * @param req Request. |
| */ |
| private void processNearTxQueryResultsEnlistRequest(UUID nodeId, final GridNearTxQueryResultsEnlistRequest req) { |
| assert nodeId != null; |
| assert req != null; |
| |
| ClusterNode nearNode = ctx.discovery().node(nodeId); |
| |
| GridDhtTxLocal tx; |
| |
| try { |
| tx = initTxTopologyVersion(nodeId, |
| nearNode, |
| req.version(), |
| req.futureId(), |
| req.miniId(), |
| req.firstClientRequest(), |
| req.topologyVersion(), |
| req.threadId(), |
| req.txTimeout(), |
| req.subjectId(), |
| req.taskNameHash(), |
| req.mvccSnapshot()); |
| } |
| catch (Throwable e) { |
| GridNearTxQueryResultsEnlistResponse res = new GridNearTxQueryResultsEnlistResponse(req.cacheId(), |
| req.futureId(), |
| req.miniId(), |
| req.version(), |
| e); |
| |
| try { |
| ctx.io().send(nearNode, res, ctx.ioPolicy()); |
| } |
| catch (IgniteCheckedException ioEx) { |
| U.error(log, "Failed to send near enlist response " + |
| "[txId=" + req.version() + ", node=" + nodeId + ", res=" + res + ']', ioEx); |
| } |
| |
| if (e instanceof Error) |
                throw (Error)e;
| |
| return; |
| } |
| |
| GridDhtTxQueryResultsEnlistFuture fut = new GridDhtTxQueryResultsEnlistFuture( |
| nodeId, |
| req.version(), |
| req.mvccSnapshot(), |
| req.threadId(), |
| req.futureId(), |
| req.miniId(), |
| tx, |
| req.timeout(), |
| ctx, |
| req.rows(), |
| req.operation()); |
| |
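        // The result handler sends the enlist response back to the near node
        // once the future completes.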
| fut.listen(NearTxQueryEnlistResultHandler.instance()); |
| |
| fut.init(); |
| } |
| |
| /** |
| * @param nodeId Node ID. |
| * @param req Request. |
| */ |
| private void processNearTxEnlistRequest(UUID nodeId, final GridNearTxEnlistRequest req) { |
| assert nodeId != null; |
| assert req != null; |
| |
| ClusterNode nearNode = ctx.discovery().node(nodeId); |
| |
| GridDhtTxLocal tx; |
| |
| try { |
| tx = initTxTopologyVersion(nodeId, |
| nearNode, |
| req.version(), |
| req.futureId(), |
| req.miniId(), |
| req.firstClientRequest(), |
| req.topologyVersion(), |
| req.threadId(), |
| req.txTimeout(), |
| req.subjectId(), |
| req.taskNameHash(), |
| req.mvccSnapshot()); |
| } |
| catch (IgniteCheckedException | IgniteException ex) { |
| GridNearTxEnlistResponse res = new GridNearTxEnlistResponse(req.cacheId(), |
| req.futureId(), |
| req.miniId(), |
| req.version(), |
| ex); |
| |
| try { |
| ctx.io().send(nearNode, res, ctx.ioPolicy()); |
| } |
| catch (IgniteCheckedException e) { |
| U.error(log, "Failed to send near enlist response [" + |
| "txId=" + req.version() + |
| ", node=" + nodeId + |
| ", res=" + res + ']', e); |
| } |
| |
| return; |
| } |
| |
| GridDhtTxEnlistFuture fut = new GridDhtTxEnlistFuture( |
| nodeId, |
| req.version(), |
| req.mvccSnapshot(), |
| req.threadId(), |
| req.futureId(), |
| req.miniId(), |
| tx, |
| req.timeout(), |
| ctx, |
| req.rows(), |
| req.operation(), |
| req.filter(), |
| req.needRes(), |
| req.keepBinary()); |
| |
| fut.listen(NearTxResultHandler.instance()); |
| |
| fut.init(); |
| } |
| |
| /** |
| * @param nodeId Near node id. |
| * @param nearNode Near node. |
| * @param nearLockVer Near lock version. |
| * @param nearFutId Near future id. |
| * @param nearMiniId Near mini-future id. |
| * @param firstClientReq First client request flag. |
| * @param topVer Topology version. |
| * @param nearThreadId Near node thread id. |
| * @param timeout Timeout. |
| * @param txSubjectId Transaction subject id. |
| * @param txTaskNameHash Transaction task name hash. |
     * @param snapshot Mvcc snapshot.
     * @return Transaction, or {@code null} if the near node has already left the grid.
     * @throws IgniteCheckedException If the transaction could not be initialized.
| */ |
| public GridDhtTxLocal initTxTopologyVersion(UUID nodeId, |
| ClusterNode nearNode, |
| GridCacheVersion nearLockVer, |
| IgniteUuid nearFutId, |
| int nearMiniId, |
| boolean firstClientReq, |
| AffinityTopologyVersion topVer, |
| long nearThreadId, |
| long timeout, |
| UUID txSubjectId, |
| int txTaskNameHash, |
| MvccSnapshot snapshot |
| ) throws IgniteException, IgniteCheckedException { |
| |
| assert ctx.affinityNode(); |
| |
| if (txLockMsgLog.isDebugEnabled()) { |
| txLockMsgLog.debug("Received near enlist request [txId=" + nearLockVer + |
| ", node=" + nodeId + ']'); |
| } |
| |
| if (nearNode == null) { |
| U.warn(txLockMsgLog, "Received near enlist request from unknown node (will ignore) [txId=" + nearLockVer + |
| ", node=" + nodeId + ']'); |
| |
| return null; |
| } |
| |
| GridDhtTxLocal tx = null; |
| |
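        // Reuse the DHT transaction if the near version has already been mapped
        // by a previous request of the same transaction.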
| GridCacheVersion dhtVer = ctx.tm().mappedVersion(nearLockVer); |
| |
| if (dhtVer != null) |
| tx = ctx.tm().tx(dhtVer); |
| |
| GridDhtPartitionTopology top = null; |
| |
| if (tx == null) { |
| if (firstClientReq) { |
| assert nearNode.isClient(); |
| |
| top = topology(); |
| |
| top.readLock(); |
| |
| GridDhtTopologyFuture topFut = top.topologyVersionFuture(); |
| |
| boolean done = topFut.isDone(); |
| |
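                // For the first request from a client node make sure, under the topology read lock,
                // that the last finished exchange covers the requested topology version and that
                // affinity has not changed since; otherwise fail and let the client retry.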
| if (!done || !(topFut.topologyVersion().compareTo(topVer) >= 0 |
| && ctx.shared().exchange().lastAffinityChangedTopologyVersion(topFut.initialVersion()).compareTo(topVer) <= 0)) { |
| // TODO IGNITE-7164 Wait for topology change, remap client TX in case affinity was changed. |
| top.readUnlock(); |
| |
| throw new ClusterTopologyException("Topology was changed. Please retry on stable topology."); |
| } |
| } |
| |
| try { |
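                // Create a new pessimistic REPEATABLE_READ DHT-local transaction
                // on behalf of the near node.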
| tx = new GridDhtTxLocal( |
| ctx.shared(), |
| topVer, |
| nearNode.id(), |
| nearLockVer, |
| nearFutId, |
| nearMiniId, |
| nearThreadId, |
| false, |
| false, |
| ctx.systemTx(), |
| false, |
| ctx.ioPolicy(), |
| PESSIMISTIC, |
| REPEATABLE_READ, |
| timeout, |
| false, |
| false, |
| false, |
| -1, |
| null, |
| txSubjectId, |
| txTaskNameHash, |
| null, |
| null); |
| |
                // Sync mode is always FULL_SYNC for transactions created on enlist requests.
                tx.syncMode(FULL_SYNC);
| |
| tx = ctx.tm().onCreated(null, tx); |
| |
| if (tx == null || !tx.init()) { |
| String msg = "Failed to acquire lock (transaction has been completed): " + |
| nearLockVer; |
| |
| U.warn(log, msg); |
| |
| try { |
| if (tx != null) |
| tx.rollbackDhtLocal(); |
| } |
| catch (IgniteCheckedException ex) { |
| U.error(log, "Failed to rollback the transaction: " + tx, ex); |
| } |
| |
| throw new IgniteCheckedException(msg); |
| } |
| |
| tx.mvccSnapshot(snapshot); |
| tx.topologyVersion(topVer); |
| } |
| finally { |
| if (top != null) |
| top.readUnlock(); |
| } |
| } |
| |
| ctx.tm().txContext(tx); |
| |
| return tx; |
| } |
| |
| /** |
| * @param nodeId Node ID. |
| * @param res Response. |
| */ |
| private void processNearTxEnlistResponse(UUID nodeId, final GridNearTxEnlistResponse res) { |
| GridNearTxEnlistFuture fut = (GridNearTxEnlistFuture) |
| ctx.mvcc().versionedFuture(res.version(), res.futureId()); |
| |
| if (fut != null) |
| fut.onResult(nodeId, res); |
| } |
| |
| /** |
| * @param nodeId Node ID. |
| * @param res Response. |
| */ |
| private void processNearTxQueryEnlistResponse(UUID nodeId, final GridNearTxQueryEnlistResponse res) { |
        GridNearTxQueryEnlistFuture fut = (GridNearTxQueryEnlistFuture)
            ctx.mvcc().versionedFuture(res.version(), res.futureId());
| |
| if (fut != null) |
| fut.onResult(nodeId, res); |
| } |
| |
| /** |
| * @param nodeId Node ID. |
| * @param res Response. |
| */ |
| private void processNearTxQueryResultsEnlistResponse(UUID nodeId, final GridNearTxQueryResultsEnlistResponse res) { |
| GridNearTxQueryResultsEnlistFuture fut = (GridNearTxQueryResultsEnlistFuture) |
| ctx.mvcc().versionedFuture(res.version(), res.futureId()); |
| |
| if (fut != null) |
| fut.onResult(nodeId, res); |
| } |
| |
| /** |
     * @param primary Primary node ID.
     * @param req Request.
     * @param first {@code True} if this is the first request in the current operation.
| */ |
| private void processDhtTxQueryEnlistRequest(UUID primary, GridDhtTxQueryEnlistRequest req, boolean first) { |
| try { |
| assert req.version() != null && req.op() != null; |
| |
| GridDhtTxRemote tx = ctx.tm().tx(req.version()); |
| |
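            // Only the first request of an enlist operation carries enough information
            // to create the backup transaction lazily; for any other request the
            // transaction must already exist.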
| if (tx == null) { |
| if (!first) |
| throw new IgniteCheckedException("Can not find a transaction for version [version=" |
| + req.version() + ']'); |
| |
| GridDhtTxQueryFirstEnlistRequest req0 = (GridDhtTxQueryFirstEnlistRequest)req; |
| |
| tx = new GridDhtTxRemote(ctx.shared(), |
| req0.nearNodeId(), |
| req0.dhtFutureId(), |
| primary, |
| req0.nearXidVersion(), |
| req0.topologyVersion(), |
| req0.version(), |
| null, |
| ctx.systemTx(), |
| ctx.ioPolicy(), |
| PESSIMISTIC, |
| REPEATABLE_READ, |
| false, |
| req0.timeout(), |
| -1, |
| req0.subjectId(), |
| req0.taskNameHash(), |
| false, |
| null); |
| |
| tx.mvccSnapshot(new MvccSnapshotWithoutTxs(req0.coordinatorVersion(), req0.counter(), |
| MVCC_OP_COUNTER_NA, req0.cleanupVersion())); |
| |
| tx = ctx.tm().onCreated(null, tx); |
| |
| if (tx == null || !ctx.tm().onStarted(tx)) { |
| throw new IgniteTxRollbackCheckedException("Failed to update backup " + |
| "(transaction has been completed): " + req0.version()); |
| } |
| } |
| |
| assert tx != null; |
| |
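            // Use the transaction's MVCC snapshot, but substitute the per-batch
            // operation counter supplied with the request.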
| MvccSnapshot s0 = tx.mvccSnapshot(); |
| |
| MvccSnapshot snapshot = new MvccSnapshotWithoutTxs(s0.coordinatorVersion(), s0.counter(), |
| req.operationCounter(), s0.cleanupVersion()); |
| |
| ctx.tm().txHandler().mvccEnlistBatch(tx, ctx, req.op(), req.keys(), req.values(), snapshot, |
| req.dhtFutureId(), req.batchId()); |
| |
| GridDhtTxQueryEnlistResponse res = new GridDhtTxQueryEnlistResponse(req.cacheId(), |
| req.dhtFutureId(), |
| req.batchId(), |
| null); |
| |
| try { |
| ctx.io().send(primary, res, ctx.ioPolicy()); |
| } |
| catch (IgniteCheckedException ioEx) { |
| U.error(log, "Failed to send DHT enlist reply to primary node [node: " + primary + ", req=" + |
| req + ']', ioEx); |
| } |
| } |
| catch (Throwable e) { |
| GridDhtTxQueryEnlistResponse res = new GridDhtTxQueryEnlistResponse(ctx.cacheId(), |
| req.dhtFutureId(), |
| req.batchId(), |
| e); |
| |
| try { |
| ctx.io().send(primary, res, ctx.ioPolicy()); |
| } |
| catch (IgniteCheckedException ioEx) { |
| U.error(log, "Failed to send DHT enlist reply to primary node " + |
| "[node: " + primary + ", req=" + req + ']', ioEx); |
| } |
| |
| if (e instanceof Error) |
                throw (Error)e;
| } |
| } |
| |
| /** |
     * @param backup Backup node ID.
| * @param res Response message. |
| */ |
| private void processDhtTxQueryEnlistResponse(UUID backup, GridDhtTxQueryEnlistResponse res) { |
| GridDhtTxAbstractEnlistFuture fut = (GridDhtTxAbstractEnlistFuture) |
| ctx.mvcc().future(res.futureId()); |
| |
| if (fut == null) { |
| U.warn(log, "Received dht enlist response for unknown future [futId=" + res.futureId() + |
| ", batchId=" + res.batchId() + |
| ", node=" + backup + ']'); |
| |
| return; |
| } |
| |
| fut.onResult(backup, res); |
| } |
| |
| } |