| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.ignite.internal.processors.cache.distributed.dht; |
| |
| import java.util.ArrayList; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.NoSuchElementException; |
| import java.util.Set; |
| import java.util.UUID; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.ConcurrentMap; |
| import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; |
| import javax.cache.processor.EntryProcessor; |
| import org.apache.ignite.IgniteCheckedException; |
| import org.apache.ignite.IgniteLogger; |
| import org.apache.ignite.cluster.ClusterNode; |
| import org.apache.ignite.internal.IgniteInternalFuture; |
| import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; |
| import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; |
| import org.apache.ignite.internal.processors.cache.CacheEntryInfoCollection; |
| import org.apache.ignite.internal.processors.cache.CacheEntryPredicate; |
| import org.apache.ignite.internal.processors.cache.CacheObject; |
| import org.apache.ignite.internal.processors.cache.GridCacheContext; |
| import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo; |
| import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException; |
| import org.apache.ignite.internal.processors.cache.GridCacheFutureAdapter; |
| import org.apache.ignite.internal.processors.cache.GridCacheMvccEntryInfo; |
| import org.apache.ignite.internal.processors.cache.GridCacheUpdateTxResult; |
| import org.apache.ignite.internal.processors.cache.KeyCacheObject; |
| import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxMapping; |
| import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState; |
| import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxAbstractEnlistFuture; |
| import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinator; |
| import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot; |
| import org.apache.ignite.internal.processors.cache.mvcc.MvccUtils; |
| import org.apache.ignite.internal.processors.cache.persistence.CacheDataRowAdapter; |
| import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode; |
| import org.apache.ignite.internal.processors.cache.tree.mvcc.data.MvccDataRow; |
| import org.apache.ignite.internal.processors.cache.tree.mvcc.search.MvccLinkAwareSearchRow; |
| import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; |
| import org.apache.ignite.internal.processors.query.EnlistOperation; |
| import org.apache.ignite.internal.processors.query.IgniteSQLException; |
| import org.apache.ignite.internal.processors.query.UpdateSourceIterator; |
| import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter; |
| import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException; |
| import org.apache.ignite.internal.util.tostring.GridToStringExclude; |
| import org.apache.ignite.internal.util.typedef.CI1; |
| import org.apache.ignite.internal.util.typedef.F; |
| import org.apache.ignite.internal.util.typedef.internal.S; |
| import org.apache.ignite.internal.util.typedef.internal.U; |
| import org.apache.ignite.lang.IgniteBiTuple; |
| import org.apache.ignite.lang.IgniteInClosure; |
| import org.apache.ignite.lang.IgniteUuid; |
| import org.apache.ignite.plugin.extensions.communication.Message; |
| import org.jetbrains.annotations.NotNull; |
| import org.jetbrains.annotations.Nullable; |
| |
| /** |
| * Abstract future processing transaction enlisting and locking. |
| */ |
| public abstract class GridDhtTxAbstractEnlistFuture<T> extends GridCacheFutureAdapter<T> |
| implements DhtLockFuture<T> { |
| /** Done field updater. */ |
| private static final AtomicIntegerFieldUpdater<GridDhtTxAbstractEnlistFuture> DONE_UPD = |
| AtomicIntegerFieldUpdater.newUpdater(GridDhtTxAbstractEnlistFuture.class, "done"); |
| |
| /** SkipCntr field updater. */ |
| private static final AtomicIntegerFieldUpdater<GridDhtTxAbstractEnlistFuture> SKIP_UPD = |
| AtomicIntegerFieldUpdater.newUpdater(GridDhtTxAbstractEnlistFuture.class, "skipCntr"); |
| |
| /** Marker object. */ |
| private static final Object FINISHED = new Object(); |
| |
| /** */ |
| private static final int BATCH_SIZE = 1024; |
| |
| /** In-flight batches per node limit. */ |
| private static final int BATCHES_PER_NODE = 5; |
| |
| /** */ |
| private static final int FIRST_BATCH_ID = 0; |
| |
| /** Future ID. */ |
| protected final IgniteUuid futId; |
| |
| /** Cache registry. */ |
| @GridToStringExclude |
| protected final GridCacheContext<?, ?> cctx; |
| |
| /** Logger. */ |
| @GridToStringExclude |
| protected final IgniteLogger log; |
| |
| /** Thread. */ |
| protected final long threadId; |
| |
| /** Future ID. */ |
| protected final IgniteUuid nearFutId; |
| |
| /** Future ID. */ |
| protected final int nearMiniId; |
| |
| /** Transaction. */ |
| protected final GridDhtTxLocalAdapter tx; |
| |
| /** Lock version. */ |
| protected final GridCacheVersion lockVer; |
| |
| /** */ |
| protected final MvccSnapshot mvccSnapshot; |
| |
| /** New DHT nodes. */ |
| protected Set<UUID> newDhtNodes = new HashSet<>(); |
| |
| /** Near node ID. */ |
| protected final UUID nearNodeId; |
| |
| /** Near lock version. */ |
| protected final GridCacheVersion nearLockVer; |
| |
| /** Filter. */ |
| private final CacheEntryPredicate filter; |
| |
| /** Keep binary flag. */ |
| protected boolean keepBinary; |
| |
| /** Timeout object. */ |
| @GridToStringExclude |
| protected LockTimeoutObject timeoutObj; |
| |
| /** Lock timeout. */ |
| protected final long timeout; |
| |
| /** Query iterator */ |
| private UpdateSourceIterator<?> it; |
| |
| /** Row extracted from iterator but not yet used. */ |
| private Object peek; |
| |
| /** */ |
| @GridToStringExclude |
| private volatile int skipCntr; |
| |
| /** */ |
| @SuppressWarnings("unused") |
| @GridToStringExclude |
| private volatile int done; |
| |
| /** */ |
| @GridToStringExclude |
| private int batchIdCntr; |
| |
| /** Batches for sending to remote nodes. */ |
| private Map<UUID, Batch> batches; |
| |
| /** Batches already sent to remotes, but their acks are not received yet. */ |
| private ConcurrentMap<UUID, ConcurrentMap<Integer, Batch>> pending; |
| |
| /** Do not send DHT requests to near node. */ |
| protected boolean skipNearNodeUpdates; |
| |
| /** There are keys belonging to backup partitions on near node. */ |
| protected boolean hasNearNodeUpdates; |
| |
| /** Moving partitions. */ |
| private Map<Integer, Boolean> movingParts; |
| |
| /** Map for tracking nodes to which first request was already sent in order to send smaller subsequent requests. */ |
| private final Set<ClusterNode> firstReqSent = new HashSet<>(); |
| |
| /** Deployment class loader id which will be used for deserialization of entries on a distributed task. */ |
| @GridToStringExclude |
| protected final IgniteUuid deploymentLdrId; |
| |
| /** |
| * @param nearNodeId Near node ID. |
| * @param nearLockVer Near lock version. |
| * @param mvccSnapshot Mvcc snapshot. |
| * @param threadId Thread ID. |
| * @param nearFutId Near future id. |
| * @param nearMiniId Near mini future id. |
| * @param tx Transaction. |
| * @param timeout Lock acquisition timeout. |
| * @param cctx Cache context. |
| * @param filter Filter. |
| * @param keepBinary Keep binary flag. |
| */ |
| protected GridDhtTxAbstractEnlistFuture(UUID nearNodeId, |
| GridCacheVersion nearLockVer, |
| MvccSnapshot mvccSnapshot, |
| long threadId, |
| IgniteUuid nearFutId, |
| int nearMiniId, |
| GridDhtTxLocalAdapter tx, |
| long timeout, |
| GridCacheContext<?, ?> cctx, |
| @Nullable CacheEntryPredicate filter, |
| boolean keepBinary) { |
| assert tx != null; |
| assert timeout >= 0; |
| assert nearNodeId != null; |
| assert nearLockVer != null; |
| |
| this.threadId = threadId; |
| this.cctx = cctx; |
| this.nearNodeId = nearNodeId; |
| this.nearLockVer = nearLockVer; |
| this.nearFutId = nearFutId; |
| this.nearMiniId = nearMiniId; |
| this.mvccSnapshot = mvccSnapshot; |
| this.timeout = timeout; |
| this.tx = tx; |
| this.filter = filter; |
| this.keepBinary = keepBinary; |
| this.deploymentLdrId = U.contextDeploymentClassLoaderId(cctx.kernalContext()); |
| |
| lockVer = tx.xidVersion(); |
| |
| futId = IgniteUuid.randomUuid(); |
| |
| log = cctx.logger(GridDhtTxAbstractEnlistFuture.class); |
| } |
| |
| /** |
| * Gets source to be updated iterator. |
| * |
| * @return iterator. |
| * @throws IgniteCheckedException If failed. |
| */ |
| protected abstract UpdateSourceIterator<?> createIterator() throws IgniteCheckedException; |
| |
| /** |
| * Gets query result. |
| * |
| * @return Query result. |
| */ |
| protected abstract T result0(); |
| |
| /** |
| * Gets need previous value flag. |
| * |
| * @return {@code True} if previous value is required. |
| */ |
| public boolean needResult() { |
| return false; |
| } |
| |
| /** |
| * Entry processed callback. |
| * |
| * @param key Entry key. |
| * @param res Update result. |
| */ |
| protected abstract void onEntryProcessed(KeyCacheObject key, GridCacheUpdateTxResult res); |
| |
| /** |
| * |
| */ |
| public void init() { |
| if (timeout < 0) { |
| // Time is out. |
| onDone(timeoutException()); |
| |
| return; |
| } |
| else if (timeout > 0) |
| timeoutObj = new LockTimeoutObject(); |
| |
| while (true) { |
| IgniteInternalFuture<?> fut = tx.lockFut; |
| |
| if (fut == GridDhtTxLocalAdapter.ROLLBACK_FUT) { |
| onDone(tx.timedOut() ? tx.timeoutException() : tx.rollbackException()); |
| |
| return; |
| } |
| else if (fut != null) { |
| // Wait for previous future. |
| assert fut instanceof GridNearTxAbstractEnlistFuture |
| || fut instanceof GridDhtTxAbstractEnlistFuture : fut; |
| |
| // Terminate this future if parent future is terminated by rollback. |
| if (!fut.isDone()) { |
| fut.listen(new IgniteInClosure<IgniteInternalFuture>() { |
| @Override public void apply(IgniteInternalFuture fut) { |
| if (fut.error() != null) |
| onDone(fut.error()); |
| } |
| }); |
| } |
| else if (fut.error() != null) |
| onDone(fut.error()); |
| |
| break; |
| } |
| else if (tx.updateLockFuture(null, this)) |
| break; |
| } |
| |
| boolean added = cctx.mvcc().addFuture(this, futId); |
| |
| if (isDone()) { |
| cctx.mvcc().removeFuture(futId); |
| |
| return; |
| } |
| |
| assert added; |
| |
| if (timeoutObj != null) |
| cctx.time().addTimeoutObject(timeoutObj); |
| |
| try { |
| checkCoordinatorVersion(); |
| |
| UpdateSourceIterator<?> it = createIterator(); |
| |
| if (!it.hasNext()) { |
| U.close(it, log); |
| |
| onDone(result0()); |
| |
| return; |
| } |
| |
| if (!tx.implicitSingle()) |
| tx.addActiveCache(cctx, false); |
| else // Nothing to do for single update. |
| assert tx.txState().cacheIds().contains(cctx.cacheId()) && tx.txState().cacheIds().size() == 1; |
| |
| tx.markQueryEnlisted(); |
| |
| this.it = it; |
| } |
| catch (Throwable e) { |
| onDone(e); |
| |
| if (e instanceof Error) |
| throw (Error)e; |
| |
| return; |
| } |
| |
| continueLoop(false); |
| } |
| |
| /** |
| * Clears lock future. |
| */ |
| protected void clearLockFuture() { |
| tx.clearLockFuture(this); |
| } |
| |
| /** |
| * Iterates over iterator, applies changes locally and sends it on backups. |
| * |
| * @param ignoreCntr {@code True} if need to ignore skip counter. |
| */ |
| private void continueLoop(boolean ignoreCntr) { |
| if (isDone() || (!ignoreCntr && (SKIP_UPD.getAndIncrement(this) != 0))) |
| return; |
| |
| GridDhtCacheAdapter cache = cctx.dhtCache(); |
| EnlistOperation op = it.operation(); |
| AffinityTopologyVersion topVer = tx.topologyVersionSnapshot(); |
| |
| try { |
| while (true) { |
| int curPart = -1; |
| List<ClusterNode> backups = null; |
| |
| while (hasNext0()) { |
| Object cur = next0(); |
| |
| KeyCacheObject key = toKey(op, cur); |
| |
| if (curPart != key.partition()) |
| backups = backupNodes(curPart = key.partition()); |
| |
| assert backups != null; |
| |
| if (!ensureFreeSlot(key, backups)) { |
| // Can't advance further at the moment. |
| peek = cur; |
| |
| it.beforeDetach(); |
| |
| break; |
| } |
| |
| GridDhtCacheEntry entry = cache.entryExx(key); |
| |
| if (log.isDebugEnabled()) |
| log.debug("Adding entry: " + entry); |
| |
| assert !entry.detached(); |
| |
| CacheObject val = op.isDeleteOrLock() || op.isInvoke() |
| ? null : cctx.toCacheObject(((IgniteBiTuple)cur).getValue()); |
| |
| GridInvokeValue invokeVal = null; |
| EntryProcessor entryProc = null; |
| Object[] invokeArgs = null; |
| |
| if (op.isInvoke()) { |
| assert needResult(); |
| |
| invokeVal = (GridInvokeValue)((IgniteBiTuple)cur).getValue(); |
| |
| entryProc = invokeVal.entryProcessor(); |
| invokeArgs = invokeVal.invokeArgs(); |
| } |
| |
| assert entryProc != null || !op.isInvoke(); |
| |
| boolean needOldVal = tx.txState().useMvccCaching(cctx.cacheId()); |
| |
| GridCacheUpdateTxResult res; |
| |
| while (true) { |
| cctx.shared().database().checkpointReadLock(); |
| |
| try { |
| switch (op) { |
| case DELETE: |
| res = entry.mvccRemove( |
| tx, |
| cctx.localNodeId(), |
| topVer, |
| mvccSnapshot, |
| isMoving(key.partition(), backups), |
| needOldVal, |
| filter, |
| needResult()); |
| |
| break; |
| |
| case INSERT: |
| case TRANSFORM: |
| case UPSERT: |
| case UPDATE: |
| res = entry.mvccSet( |
| tx, |
| cctx.localNodeId(), |
| val, |
| entryProc, |
| invokeArgs, |
| 0, |
| topVer, |
| mvccSnapshot, |
| op.cacheOperation(), |
| isMoving(key.partition(), backups), |
| op.noCreate(), |
| needOldVal, |
| filter, |
| needResult(), |
| keepBinary); |
| |
| break; |
| |
| case LOCK: |
| res = entry.mvccLock( |
| tx, |
| mvccSnapshot); |
| |
| break; |
| |
| default: |
| throw new IgniteSQLException("Cannot acquire lock for operation [op= " + op + "]" + |
| "Operation is unsupported at the moment ", IgniteQueryErrorCode.UNSUPPORTED_OPERATION); |
| } |
| |
| break; |
| } |
| catch (GridCacheEntryRemovedException ignored) { |
| entry = cache.entryExx(entry.key(), topVer); |
| } |
| finally { |
| cctx.shared().database().checkpointReadUnlock(); |
| } |
| } |
| |
| IgniteInternalFuture<GridCacheUpdateTxResult> updateFut = res.updateFuture(); |
| |
| final Message val0 = invokeVal != null ? invokeVal : val; |
| |
| if (updateFut != null) { |
| if (updateFut.isDone()) |
| res = updateFut.get(); |
| else { |
| GridDhtCacheEntry entry0 = entry; |
| List<ClusterNode> backups0 = backups; |
| |
| it.beforeDetach(); |
| |
| updateFut.listen(new CI1<IgniteInternalFuture<GridCacheUpdateTxResult>>() { |
| @Override public void apply(IgniteInternalFuture<GridCacheUpdateTxResult> fut) { |
| try { |
| tx.incrementLockCounter(); |
| |
| processEntry(entry0, op, fut.get(), val0, backups0); |
| |
| continueLoop(true); |
| } |
| catch (Throwable e) { |
| onDone(e); |
| } |
| } |
| }); |
| |
| // Can't move further. Exit loop without decrementing the counter. |
| return; |
| } |
| } |
| |
| tx.incrementLockCounter(); |
| |
| processEntry(entry, op, res, val0, backups); |
| } |
| |
| if (!hasNext0()) { |
| if (!F.isEmpty(batches)) { |
| // Flush incomplete batches. |
| // Need to skip batches for nodes where first request (contains tx info) is still in-flight. |
| // Otherwise, the regular enlist request (without tx info) may beat it to the primary node. |
| Iterator<Map.Entry<UUID, Batch>> it = batches.entrySet().iterator(); |
| |
| while (it.hasNext()) { |
| Map.Entry<UUID, Batch> e = it.next(); |
| |
| ConcurrentMap<Integer, Batch> pending0 = |
| pending == null ? null : pending.get(e.getKey()); |
| |
| if (pending0 == null || !pending0.containsKey(FIRST_BATCH_ID)) { |
| it.remove(); |
| |
| sendBatch(e.getValue()); |
| } |
| } |
| } |
| |
| if (noPendingRequests()) { |
| onDone(result0()); |
| |
| return; |
| } |
| } |
| |
| if (SKIP_UPD.decrementAndGet(this) == 0) |
| break; |
| |
| skipCntr = 1; |
| } |
| } |
| catch (Throwable e) { |
| onDone(e); |
| |
| if (e instanceof Error) |
| throw (Error)e; |
| } |
| } |
| |
| /** */ |
| private Object next0() { |
| if (!hasNext0()) |
| throw new NoSuchElementException(); |
| |
| Object cur; |
| |
| if ((cur = peek) != null) |
| peek = null; |
| else |
| cur = it.next(); |
| |
| return cur; |
| } |
| |
| /** */ |
| private boolean hasNext0() { |
| if (peek == null && !it.hasNext()) |
| peek = FINISHED; |
| |
| return peek != FINISHED; |
| } |
| |
| /** */ |
| private KeyCacheObject toKey(EnlistOperation op, Object cur) { |
| KeyCacheObject key = cctx.toCacheKeyObject(op.isDeleteOrLock() ? cur : ((IgniteBiTuple)cur).getKey()); |
| |
| if (key.partition() == -1) |
| key.partition(cctx.affinity().partition(key)); |
| |
| return key; |
| } |
| |
| /** |
| * @return {@code True} if in-flight batches map is empty. |
| */ |
| private boolean noPendingRequests() { |
| if (F.isEmpty(pending)) |
| return true; |
| |
| for (ConcurrentMap<Integer, Batch> e : pending.values()) { |
| if (!e.isEmpty()) |
| return false; |
| } |
| |
| return true; |
| } |
| |
| /** |
| * @param entry Cache entry. |
| * @param op Operation. |
| * @param updRes Update result. |
| * @param val New value. |
| * @param backups Backup nodes |
| * @throws IgniteCheckedException If failed. |
| */ |
| private void processEntry(GridDhtCacheEntry entry, EnlistOperation op, |
| GridCacheUpdateTxResult updRes, Message val, List<ClusterNode> backups) throws IgniteCheckedException { |
| checkCompleted(); |
| |
| assert updRes != null && updRes.updateFuture() == null; |
| |
| if (op != EnlistOperation.LOCK) |
| onEntryProcessed(entry.key(), updRes); |
| |
| if (!updRes.success() |
| || updRes.filtered() |
| || op == EnlistOperation.LOCK) |
| return; |
| |
| cctx.shared().mvccCaching().addEnlisted(entry.key(), updRes.newValue(), 0, 0, lockVer, |
| updRes.oldValue(), tx.local(), tx.topologyVersion(), mvccSnapshot, cctx.cacheId(), tx, null, -1); |
| |
| addToBatch(entry.key(), val, updRes.mvccHistory(), entry.context().cacheId(), backups); |
| } |
| |
| /** |
| * Adds row to batch. |
| * <b>IMPORTANT:</b> This method should be called from the critical section in {@link this.sendNextBatches()} |
| * @param key Key. |
| * @param val Value. |
| * @param hist History rows. |
| * @param cacheId Cache Id. |
| * @param backups Backup nodes |
| */ |
| private void addToBatch(KeyCacheObject key, Message val, List<MvccLinkAwareSearchRow> hist, |
| int cacheId, List<ClusterNode> backups) throws IgniteCheckedException { |
| int part = key.partition(); |
| |
| tx.touchPartition(cacheId, part); |
| |
| if (F.isEmpty(backups)) |
| return; |
| |
| CacheEntryInfoCollection hist0 = null; |
| |
| for (ClusterNode node : backups) { |
| assert !node.isLocal(); |
| |
| boolean moving = isMoving(node, part); |
| |
| if (skipNearLocalUpdate(node, moving)) { |
| updateMappings(node); |
| |
| if (newRemoteTx(node)) |
| addNewRemoteTxNode(node); |
| |
| hasNearNodeUpdates = true; |
| |
| continue; |
| } |
| |
| Batch batch = null; |
| |
| if (batches == null) |
| batches = new HashMap<>(); |
| else |
| batch = batches.get(node.id()); |
| |
| if (batch == null) |
| batches.put(node.id(), batch = new Batch(node)); |
| |
| if (moving && hist0 == null) { |
| assert !F.isEmpty(hist) || val == null; |
| |
| hist0 = fetchHistoryInfo(key, hist); |
| } |
| |
| batch.add(key, moving ? hist0 : val); |
| |
| if (batch.size() == BATCH_SIZE) { |
| assert batches != null; |
| |
| batches.remove(node.id()); |
| |
| sendBatch(batch); |
| } |
| } |
| } |
| |
| /** |
| * |
| * @param key Key. |
| * @param hist History rows. |
| * @return History entries. |
| * @throws IgniteCheckedException, if failed. |
| */ |
| private CacheEntryInfoCollection fetchHistoryInfo(KeyCacheObject key, List<MvccLinkAwareSearchRow> hist) { |
| List<GridCacheEntryInfo> res = new ArrayList<>(); |
| |
| for (int i = 0; i < hist.size(); i++) { |
| MvccLinkAwareSearchRow row0 = hist.get(i); |
| |
| MvccDataRow row = new MvccDataRow(cctx.group(), |
| row0.hash(), |
| row0.link(), |
| key.partition(), |
| CacheDataRowAdapter.RowData.NO_KEY_WITH_HINTS, |
| row0.mvccCoordinatorVersion(), |
| row0.mvccCounter(), |
| row0.mvccOperationCounter(), |
| false |
| ); |
| |
| GridCacheMvccEntryInfo entry = new GridCacheMvccEntryInfo(); |
| |
| entry.cacheId(cctx.cacheId()); |
| entry.version(row.version()); |
| entry.value(row.value()); |
| entry.expireTime(row.expireTime()); |
| |
| // Row should be retrieved with actual hints. |
| entry.mvccVersion(row); |
| entry.newMvccVersion(row); |
| |
| if (MvccUtils.compare(mvccSnapshot, row.mvccCoordinatorVersion(), row.mvccCounter()) != 0) |
| entry.mvccTxState(row.mvccTxState()); |
| |
| if (row.newMvccCoordinatorVersion() != MvccUtils.MVCC_CRD_COUNTER_NA |
| && MvccUtils.compare(mvccSnapshot, row.newMvccCoordinatorVersion(), row.newMvccCounter()) != 0) |
| entry.newMvccTxState(row.newMvccTxState()); |
| |
| assert mvccSnapshot.coordinatorVersion() != MvccUtils.MVCC_CRD_COUNTER_NA; |
| |
| res.add(entry); |
| } |
| |
| return new CacheEntryInfoCollection(res); |
| } |
| |
| /** */ |
| private boolean newRemoteTx(ClusterNode node) { |
| Set<ClusterNode> nodes = tx.lockTransactionNodes(); |
| |
| return nodes == null || !nodes.contains(node); |
| } |
| |
| /** |
| * Add new involved DHT node. |
| * |
| * @param node Node. |
| */ |
| private void addNewRemoteTxNode(ClusterNode node) { |
| tx.addLockTransactionNode(node); |
| |
| newDhtNodes.add(node.id()); |
| } |
| |
| /** |
| * Checks if there free space in batches or free slot in in-flight batches is available for the given key. |
| * |
| * @param key Key. |
| * @param backups Backup nodes. |
| * @return {@code True} if there is possible to add this key to batch or send ready batch. |
| */ |
| private boolean ensureFreeSlot(KeyCacheObject key, List<ClusterNode> backups) { |
| if (F.isEmpty(batches) || F.isEmpty(pending)) |
| return true; |
| |
| int part = key.partition(); |
| |
| // Check possibility of adding to batch and sending. |
| for (ClusterNode node : backups) { |
| if (skipNearLocalUpdate(node, isMoving(node, part))) |
| continue; |
| |
| Batch batch = batches.get(node.id()); |
| |
| // We can add key if batch is not full. |
| if (batch == null || batch.size() < BATCH_SIZE - 1) |
| continue; |
| |
| ConcurrentMap<Integer, Batch> pending0 = pending.get(node.id()); |
| |
| assert pending0 == null || pending0.size() <= BATCHES_PER_NODE; |
| |
| if (pending0 != null && (pending0.containsKey(FIRST_BATCH_ID) || pending0.size() == BATCHES_PER_NODE)) |
| return false; |
| } |
| |
| return true; |
| } |
| |
| /** |
| * Send batch request to remote data node. |
| * |
| * @param batch Batch. |
| */ |
| private void sendBatch(Batch batch) throws IgniteCheckedException { |
| assert batch != null && !batch.node().isLocal(); |
| |
| ClusterNode node = batch.node(); |
| |
| updateMappings(node); |
| |
| GridDhtTxQueryEnlistRequest req; |
| |
| if (newRemoteTx(node)) |
| addNewRemoteTxNode(node); |
| |
| if (firstReqSent.add(node)) { |
| // If this is a first request to this node, send full info. |
| req = new GridDhtTxQueryFirstEnlistRequest(cctx.cacheId(), |
| futId, |
| cctx.localNodeId(), |
| tx.topologyVersionSnapshot(), |
| lockVer, |
| mvccSnapshot.withoutActiveTransactions(), |
| tx.remainingTime(), |
| tx.taskNameHash(), |
| nearNodeId, |
| nearLockVer, |
| it.operation(), |
| FIRST_BATCH_ID, |
| batch.keys(), |
| batch.values() |
| ); |
| } |
| else { |
| // Send only keys, values, LockVersion and batchId if this is not a first request to this backup. |
| req = new GridDhtTxQueryEnlistRequest(cctx.cacheId(), |
| futId, |
| lockVer, |
| it.operation(), |
| ++batchIdCntr, |
| mvccSnapshot.operationCounter(), |
| batch.keys(), |
| batch.values() |
| ); |
| } |
| |
| ConcurrentMap<Integer, Batch> pending0 = null; |
| |
| if (pending == null) |
| pending = new ConcurrentHashMap<>(); |
| else |
| pending0 = pending.get(node.id()); |
| |
| if (pending0 == null) |
| pending.put(node.id(), pending0 = new ConcurrentHashMap<>()); |
| |
| Batch prev = pending0.put(req.batchId(), batch); |
| |
| assert prev == null; |
| |
| try { |
| cctx.io().send(node, req, cctx.ioPolicy()); |
| } |
| catch (ClusterTopologyCheckedException e) { |
| // backup node left the grid, will continue. |
| onNodeLeft(node.id()); |
| } |
| } |
| |
| /** */ |
| private synchronized void updateMappings(ClusterNode node) throws IgniteCheckedException { |
| checkCompleted(); |
| |
| Map<UUID, GridDistributedTxMapping> m = tx.dhtMap; |
| |
| GridDistributedTxMapping mapping = m.get(node.id()); |
| |
| if (mapping == null) |
| m.put(node.id(), mapping = new GridDistributedTxMapping(node)); |
| |
| mapping.markQueryUpdate(); |
| |
| checkCompleted(); |
| } |
| |
| /** */ |
| private boolean skipNearLocalUpdate(ClusterNode node, boolean moving) { |
| return skipNearNodeUpdates && node.id().equals(nearNodeId) && !moving; |
| } |
| |
| /** |
| * @param part Partition. |
| * @return Backup nodes for the given partition. |
| */ |
| @NotNull private List<ClusterNode> backupNodes(int part) { |
| List<ClusterNode> nodes = cctx.topology().nodes(part, tx.topologyVersion()); |
| |
| assert !nodes.isEmpty() && nodes.get(0).isLocal(); |
| |
| return nodes.subList(1, nodes.size()); |
| } |
| |
| /** |
| * Checks whether new coordinator was initialized after the snapshot is acquired. |
| * |
| * Need to fit invariant that all updates are finished before a new coordinator is initialized. |
| * |
| * @throws ClusterTopologyCheckedException If failed. |
| */ |
| private void checkCoordinatorVersion() throws ClusterTopologyCheckedException { |
| MvccCoordinator crd = cctx.shared().coordinators().currentCoordinator(); |
| |
| if (!crd.initialized() || crd.version() != mvccSnapshot.coordinatorVersion()) |
| throw new ClusterTopologyCheckedException("Cannot perform update, coordinator was changed: " + |
| "[currentCoordinator=" + crd + ", mvccSnapshot=" + mvccSnapshot + "]."); |
| } |
| |
| /** |
| * @param part Partition. |
| * @param backups Backup nodes. |
| * @return {@code true} if the given partition is rebalancing to any backup node. |
| */ |
| private boolean isMoving(int part, List<ClusterNode> backups) { |
| Boolean res; |
| |
| if (movingParts == null) |
| movingParts = new HashMap<>(); |
| |
| if ((res = movingParts.get(part)) == null) |
| movingParts.put(part, res = isMoving0(part, backups)); |
| |
| return res == Boolean.TRUE; |
| } |
| |
| /** |
| * @param part Partition. |
| * @param backups Backup nodes. |
| * @return {@code true} if the given partition is rebalancing to any backup node. |
| */ |
| private Boolean isMoving0(int part, List<ClusterNode> backups) { |
| for (ClusterNode node : backups) { |
| if (isMoving(node, part)) |
| return Boolean.TRUE; |
| } |
| |
| return Boolean.FALSE; |
| } |
| |
| /** |
| * @param node Cluster node. |
| * @param part Partition. |
| * @return {@code true} if the given partition is rebalancing to the given node. |
| */ |
| private boolean isMoving(ClusterNode node, int part) { |
| return cctx.topology().partitionState(node.id(), part) == GridDhtPartitionState.MOVING; |
| } |
| |
| /** */ |
| private void checkCompleted() throws IgniteCheckedException { |
| if (isDone()) |
| throw new IgniteCheckedException("Future is done."); |
| } |
| |
| /** |
| * Callback on backup response. |
| * |
| * @param nodeId Backup node. |
| * @param res Response. |
| */ |
| public void onResult(UUID nodeId, GridDhtTxQueryEnlistResponse res) { |
| if (res.error() != null) { |
| onDone(new IgniteCheckedException("Failed to update backup node: [localNodeId=" + cctx.localNodeId() + |
| ", remoteNodeId=" + nodeId + ']', res.error())); |
| |
| return; |
| } |
| |
| assert pending != null; |
| |
| ConcurrentMap<Integer, Batch> pending0 = pending.get(nodeId); |
| |
| assert pending0 != null; |
| |
| Batch rmv = pending0.remove(res.batchId()); |
| |
| assert rmv != null; |
| |
| continueLoop(false); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean trackable() { |
| return true; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void markNotTrackable() { |
| // No-op. |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public IgniteUuid futureId() { |
| return futId; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean onNodeLeft(UUID nodeId) { |
| try { |
| if (nearNodeId.equals(nodeId)) |
| onDone(new ClusterTopologyCheckedException("Requesting node left the grid [nodeId=" + nodeId + ']')); |
| else if (pending != null && pending.remove(nodeId) != null) |
| cctx.kernalContext().closure().runLocalSafe(() -> continueLoop(false)); |
| } |
| catch (Exception e) { |
| onDone(e); |
| } |
| |
| return false; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean onDone(@Nullable T res, @Nullable Throwable err) { |
| assert res != null || err != null; |
| |
| if (!DONE_UPD.compareAndSet(this, 0, 1)) |
| return false; |
| |
| if (err == null) |
| clearLockFuture(); |
| |
| // To prevent new remote transactions creation |
| // after future is cancelled by rollback. |
| synchronized (this) { |
| boolean done = super.onDone(res, err); |
| |
| assert done; |
| |
| if (log.isDebugEnabled()) |
| log.debug("Completing future: " + this); |
| |
| // Clean up. |
| cctx.mvcc().removeFuture(futId); |
| |
| if (timeoutObj != null) |
| cctx.time().removeTimeoutObject(timeoutObj); |
| |
| U.close(it, log); |
| |
| return true; |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void onError(Throwable error) { |
| onDone(error); |
| } |
| |
| /** |
| * @return Timeout exception. |
| */ |
| @NotNull protected IgniteTxTimeoutCheckedException timeoutException() { |
| return new IgniteTxTimeoutCheckedException("Failed to acquire lock within provided timeout for " + |
| "transaction [timeout=" + timeout + ", tx=" + tx + ']'); |
| } |
| |
| /** |
| * A batch of rows |
| */ |
| private static class Batch { |
| /** Node ID. */ |
| @GridToStringExclude |
| private final ClusterNode node; |
| |
| /** */ |
| private List<KeyCacheObject> keys; |
| |
| /** |
| * Values collection. |
| * Items can be either {@link CacheObject} or preload entries collection {@link CacheEntryInfoCollection}. |
| */ |
| private List<Message> vals; |
| |
| /** |
| * @param node Cluster node. |
| */ |
| private Batch(ClusterNode node) { |
| this.node = node; |
| } |
| |
| /** |
| * @return Node. |
| */ |
| public ClusterNode node() { |
| return node; |
| } |
| |
| /** |
| * Adds a row to batch. |
| * |
| * @param key Key. |
| * @param val Value or preload entries collection. |
| */ |
| public void add(KeyCacheObject key, Message val) { |
| assert val == null || val instanceof GridInvokeValue || val instanceof CacheObject |
| || val instanceof CacheEntryInfoCollection; |
| |
| if (keys == null) |
| keys = new ArrayList<>(); |
| |
| if (vals == null && val != null) { |
| vals = new ArrayList<>(U.ceilPow2(keys.size() + 1)); |
| |
| while (vals.size() != keys.size()) |
| vals.add(null); // Init vals with missed 'nulls'. |
| } |
| |
| keys.add(key); |
| |
| if (vals != null) |
| vals.add(val); |
| } |
| |
| /** |
| * @return number of rows. |
| */ |
| public int size() { |
| return keys == null ? 0 : keys.size(); |
| } |
| |
| /** |
| * @return Collection of row keys. |
| */ |
| public List<KeyCacheObject> keys() { |
| return keys; |
| } |
| |
| /** |
| * @return Collection of row values. |
| */ |
| public List<Message> values() { |
| return vals; |
| } |
| } |
| |
| /** |
| * Lock request timeout object. |
| */ |
| protected class LockTimeoutObject extends GridTimeoutObjectAdapter { |
| /** |
| * Default constructor. |
| */ |
| LockTimeoutObject() { |
| super(timeout); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void onTimeout() { |
| if (log.isDebugEnabled()) |
| log.debug("Timed out waiting for lock response: " + this); |
| |
| onDone(timeoutException()); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public String toString() { |
| return S.toString(LockTimeoutObject.class, this); |
| } |
| } |
| } |