| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.ignite.internal.processors.cache.distributed.dht; |
| |
| import org.apache.ignite.*; |
| import org.apache.ignite.internal.*; |
| import org.apache.ignite.internal.cluster.*; |
| import org.apache.ignite.internal.processors.affinity.*; |
| import org.apache.ignite.internal.managers.communication.*; |
| import org.apache.ignite.internal.processors.cache.*; |
| import org.apache.ignite.internal.processors.cache.distributed.*; |
| import org.apache.ignite.internal.processors.cache.distributed.near.*; |
| import org.apache.ignite.internal.processors.cache.transactions.*; |
| import org.apache.ignite.internal.processors.cache.version.*; |
| import org.apache.ignite.internal.transactions.*; |
| import org.apache.ignite.internal.util.tostring.*; |
| import org.apache.ignite.internal.util.typedef.*; |
| import org.apache.ignite.internal.util.typedef.internal.*; |
| import org.apache.ignite.lang.*; |
| import org.apache.ignite.transactions.*; |
| import org.jetbrains.annotations.*; |
| |
| import java.io.*; |
| import java.util.*; |
| import java.util.concurrent.atomic.*; |
| |
| import static org.apache.ignite.internal.processors.cache.GridCacheUtils.*; |
| import static org.apache.ignite.transactions.TransactionState.*; |
| |
| /** |
| * Replicated user transaction. |
| */ |
| public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMappedVersion { |
| /** */ |
| private static final long serialVersionUID = 0L; |
| |
| /** */ |
| private UUID nearNodeId; |
| |
| /** Near future ID. */ |
| private IgniteUuid nearFutId; |
| |
| /** Near future ID. */ |
| private IgniteUuid nearMiniId; |
| |
| /** Near future ID. */ |
| private IgniteUuid nearFinFutId; |
| |
| /** Near future ID. */ |
| private IgniteUuid nearFinMiniId; |
| |
| /** Near XID. */ |
| private GridCacheVersion nearXidVer; |
| |
| /** Future. */ |
| @GridToStringExclude |
| private final AtomicReference<GridDhtTxPrepareFuture> prepFut = |
| new AtomicReference<>(); |
| |
| /** |
| * Empty constructor required for {@link Externalizable}. |
| */ |
| public GridDhtTxLocal() { |
| // No-op. |
| } |
| |
| /** |
| * @param nearNodeId Near node ID that initiated transaction. |
| * @param nearXidVer Near transaction ID. |
| * @param nearFutId Near future ID. |
| * @param nearMiniId Near mini future ID. |
| * @param nearThreadId Near thread ID. |
| * @param implicit Implicit flag. |
| * @param implicitSingle Implicit-with-single-key flag. |
| * @param cctx Cache context. |
| * @param concurrency Concurrency. |
| * @param isolation Isolation. |
| * @param timeout Timeout. |
| * @param storeEnabled Store enabled flag. |
| * @param txSize Expected transaction size. |
| * @param txNodes Transaction nodes mapping. |
| */ |
| public GridDhtTxLocal( |
| GridCacheSharedContext cctx, |
| UUID nearNodeId, |
| GridCacheVersion nearXidVer, |
| IgniteUuid nearFutId, |
| IgniteUuid nearMiniId, |
| long nearThreadId, |
| boolean implicit, |
| boolean implicitSingle, |
| boolean sys, |
| boolean explicitLock, |
| GridIoPolicy plc, |
| TransactionConcurrency concurrency, |
| TransactionIsolation isolation, |
| long timeout, |
| boolean invalidate, |
| boolean storeEnabled, |
| int txSize, |
| Map<UUID, Collection<UUID>> txNodes, |
| UUID subjId, |
| int taskNameHash |
| ) { |
| super( |
| cctx, |
| cctx.versions().onReceivedAndNext(nearNodeId, nearXidVer), |
| implicit, |
| implicitSingle, |
| sys, |
| explicitLock, |
| plc, |
| concurrency, |
| isolation, |
| timeout, |
| invalidate, |
| storeEnabled, |
| txSize, |
| subjId, |
| taskNameHash); |
| |
| assert cctx != null; |
| assert nearNodeId != null; |
| assert nearFutId != null; |
| assert nearMiniId != null; |
| assert nearXidVer != null; |
| |
| this.nearNodeId = nearNodeId; |
| this.nearXidVer = nearXidVer; |
| this.nearFutId = nearFutId; |
| this.nearMiniId = nearMiniId; |
| this.txNodes = txNodes; |
| |
| threadId = nearThreadId; |
| |
| assert !F.eq(xidVer, nearXidVer); |
| |
| initResult(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public UUID eventNodeId() { |
| return nearNodeId; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public Collection<UUID> masterNodeIds() { |
| assert nearNodeId != null; |
| |
| return Collections.singleton(nearNodeId); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public UUID otherNodeId() { |
| assert nearNodeId != null; |
| |
| return nearNodeId; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public UUID originatingNodeId() { |
| return nearNodeId; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected UUID nearNodeId() { |
| return nearNodeId; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public GridCacheVersion nearXidVersion() { |
| return nearXidVer; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public GridCacheVersion mappedVersion() { |
| return nearXidVer; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected IgniteUuid nearFutureId() { |
| return nearFutId; |
| } |
| |
| /** |
| * @param nearFutId Near future ID. |
| */ |
| public void nearFutureId(IgniteUuid nearFutId) { |
| this.nearFutId = nearFutId; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected IgniteUuid nearMiniId() { |
| return nearMiniId; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean dht() { |
| return true; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected boolean updateNearCache(GridCacheContext cacheCtx, KeyCacheObject key, AffinityTopologyVersion topVer) { |
| return cacheCtx.isDht() && isNearEnabled(cacheCtx) && !cctx.localNodeId().equals(nearNodeId()); |
| } |
| |
| /** |
| * @return Near future ID. |
| */ |
| public IgniteUuid nearFinishFutureId() { |
| return nearFinFutId; |
| } |
| |
| /** |
| * @param nearFinFutId Near future ID. |
| */ |
| public void nearFinishFutureId(IgniteUuid nearFinFutId) { |
| this.nearFinFutId = nearFinFutId; |
| } |
| |
| /** |
| * @return Near future mini ID. |
| */ |
| public IgniteUuid nearFinishMiniId() { |
| return nearFinMiniId; |
| } |
| |
| /** |
| * @param nearFinMiniId Near future mini ID. |
| */ |
| public void nearFinishMiniId(IgniteUuid nearFinMiniId) { |
| this.nearFinMiniId = nearFinMiniId; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override @Nullable protected IgniteInternalFuture<Boolean> addReader(long msgId, GridDhtCacheEntry cached, |
| IgniteTxEntry entry, AffinityTopologyVersion topVer) { |
| // Don't add local node as reader. |
| if (!cctx.localNodeId().equals(nearNodeId)) { |
| GridCacheContext cacheCtx = cached.context(); |
| |
| while (true) { |
| try { |
| return cached.addReader(nearNodeId, msgId, topVer); |
| } |
| catch (GridCacheEntryRemovedException ignore) { |
| if (log.isDebugEnabled()) |
| log.debug("Got removed entry when adding to DHT local transaction: " + cached); |
| |
| cached = cacheCtx.dht().entryExx(entry.key(), topVer); |
| } |
| } |
| } |
| |
| return null; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected void updateExplicitVersion(IgniteTxEntry txEntry, GridCacheEntryEx entry) |
| throws GridCacheEntryRemovedException { |
| // DHT local transactions don't have explicit locks. |
| // No-op. |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public IgniteInternalFuture<?> prepareAsync() { |
| if (optimistic()) { |
| assert isSystemInvalidate(); |
| |
| return prepareAsync( |
| null, |
| null, |
| Collections.<IgniteTxKey, GridCacheVersion>emptyMap(), |
| 0, |
| nearMiniId, |
| null, |
| true, |
| null); |
| } |
| |
| // For pessimistic mode we don't distribute prepare request. |
| GridDhtTxPrepareFuture fut = prepFut.get(); |
| |
| if (fut == null) { |
| // Future must be created before any exception can be thrown. |
| if (!prepFut.compareAndSet(null, fut = new GridDhtTxPrepareFuture( |
| cctx, |
| this, |
| nearMiniId, |
| Collections.<IgniteTxKey, GridCacheVersion>emptyMap(), |
| true, |
| needReturnValue(), |
| null))) |
| return prepFut.get(); |
| } |
| else |
| // Prepare was called explicitly. |
| return fut; |
| |
| if (!state(PREPARING)) { |
| if (setRollbackOnly()) { |
| if (timedOut()) |
| fut.onError(new IgniteTxTimeoutCheckedException("Transaction timed out and was rolled back: " + this)); |
| else |
| fut.onError(new IgniteCheckedException("Invalid transaction state for prepare [state=" + state() + |
| ", tx=" + this + ']')); |
| } |
| else |
| fut.onError(new IgniteTxRollbackCheckedException("Invalid transaction state for prepare [state=" + state() |
| + ", tx=" + this + ']')); |
| |
| return fut; |
| } |
| |
| try { |
| userPrepare(); |
| |
| if (!state(PREPARED)) { |
| setRollbackOnly(); |
| |
| fut.onError(new IgniteCheckedException("Invalid transaction state for commit [state=" + state() + |
| ", tx=" + this + ']')); |
| |
| return fut; |
| } |
| |
| fut.complete(); |
| |
| return fut; |
| } |
| catch (IgniteCheckedException e) { |
| fut.onError(e); |
| |
| return fut; |
| } |
| } |
| |
| /** |
| * Prepares next batch of entries in dht transaction. |
| * |
| * @param reads Read entries. |
| * @param writes Write entries. |
| * @param verMap Version map. |
| * @param msgId Message ID. |
| * @param nearMiniId Near mini future ID. |
| * @param txNodes Transaction nodes mapping. |
| * @param last {@code True} if this is last prepare request. |
| * @param lastBackups IDs of backup nodes receiving last prepare request. |
| * @return Future that will be completed when locks are acquired. |
| */ |
| public IgniteInternalFuture<GridNearTxPrepareResponse> prepareAsync( |
| @Nullable Iterable<IgniteTxEntry> reads, |
| @Nullable Iterable<IgniteTxEntry> writes, |
| Map<IgniteTxKey, GridCacheVersion> verMap, |
| long msgId, |
| IgniteUuid nearMiniId, |
| Map<UUID, Collection<UUID>> txNodes, |
| boolean last, |
| Collection<UUID> lastBackups |
| ) { |
| // In optimistic mode prepare still can be called explicitly from salvageTx. |
| GridDhtTxPrepareFuture fut = prepFut.get(); |
| |
| if (fut == null) { |
| init(); |
| |
| // Future must be created before any exception can be thrown. |
| if (!prepFut.compareAndSet(null, fut = new GridDhtTxPrepareFuture( |
| cctx, |
| this, |
| nearMiniId, |
| verMap, |
| last, |
| needReturnValue(), |
| lastBackups))) { |
| GridDhtTxPrepareFuture f = prepFut.get(); |
| |
| assert f.nearMiniId().equals(nearMiniId) : "Wrong near mini id on existing future " + |
| "[futMiniId=" + f.nearMiniId() + ", miniId=" + nearMiniId + ", fut=" + f + ']'; |
| |
| return chainOnePhasePrepare(f); |
| } |
| } |
| else { |
| assert fut.nearMiniId().equals(nearMiniId) : "Wrong near mini id on existing future " + |
| "[futMiniId=" + fut.nearMiniId() + ", miniId=" + nearMiniId + ", fut=" + fut + ']'; |
| |
| // Prepare was called explicitly. |
| return chainOnePhasePrepare(fut); |
| } |
| |
| if (state() != PREPARING) { |
| if (!state(PREPARING)) { |
| if (state() == PREPARED && isSystemInvalidate()) |
| fut.complete(); |
| if (setRollbackOnly()) { |
| if (timedOut()) |
| fut.onError(new IgniteTxTimeoutCheckedException("Transaction timed out and was rolled back: " + |
| this)); |
| else |
| fut.onError(new IgniteCheckedException("Invalid transaction state for prepare [state=" + state() + |
| ", tx=" + this + ']')); |
| } |
| else |
| fut.onError(new IgniteTxRollbackCheckedException("Invalid transaction state for prepare [state=" + |
| state() + ", tx=" + this + ']')); |
| |
| return fut; |
| } |
| } |
| |
| try { |
| if (reads != null) |
| for (IgniteTxEntry e : reads) |
| addEntry(msgId, e); |
| |
| if (writes != null) |
| for (IgniteTxEntry e : writes) |
| addEntry(msgId, e); |
| |
| userPrepare(); |
| |
| // Make sure to add future before calling prepare on it. |
| cctx.mvcc().addFuture(fut); |
| |
| if (isSystemInvalidate()) |
| fut.complete(); |
| else |
| fut.prepare(reads, writes, txNodes); |
| } |
| catch (IgniteTxTimeoutCheckedException | IgniteTxOptimisticCheckedException e) { |
| fut.onError(e); |
| } |
| catch (IgniteCheckedException e) { |
| setRollbackOnly(); |
| |
| fut.onError(new IgniteTxRollbackCheckedException("Failed to prepare transaction: " + this, e)); |
| |
| try { |
| rollback(); |
| } |
| catch (IgniteTxOptimisticCheckedException e1) { |
| if (log.isDebugEnabled()) |
| log.debug("Failed optimistically to prepare transaction [tx=" + this + ", e=" + e1 + ']'); |
| |
| fut.onError(e); |
| } |
| catch (IgniteCheckedException e1) { |
| U.error(log, "Failed to rollback transaction: " + this, e1); |
| } |
| } |
| |
| return chainOnePhasePrepare(fut); |
| } |
| |
| /** {@inheritDoc} */ |
| @SuppressWarnings({"ThrowableInstanceNeverThrown"}) |
| @Override public IgniteInternalFuture<IgniteInternalTx> commitAsync() { |
| if (log.isDebugEnabled()) |
| log.debug("Committing dht local tx: " + this); |
| |
| // In optimistic mode prepare was called explicitly. |
| if (pessimistic()) |
| prepareAsync(); |
| |
| final GridDhtTxFinishFuture fut = new GridDhtTxFinishFuture<>(cctx, this, /*commit*/true); |
| |
| cctx.mvcc().addFuture(fut); |
| |
| GridDhtTxPrepareFuture prep = prepFut.get(); |
| |
| if (prep != null) { |
| if (prep.isDone()) { |
| try { |
| prep.get(); // Check for errors of a parent future. |
| |
| if (finish(true)) |
| fut.finish(); |
| else |
| fut.onError(new IgniteCheckedException("Failed to commit transaction: " + CU.txString(this))); |
| } |
| catch (IgniteTxOptimisticCheckedException e) { |
| if (log.isDebugEnabled()) |
| log.debug("Failed to optimistically prepare transaction [tx=" + this + ", e=" + e + ']'); |
| |
| fut.onError(e); |
| } |
| catch (IgniteCheckedException e) { |
| U.error(log, "Failed to prepare transaction: " + this, e); |
| |
| fut.onError(e); |
| } |
| } |
| else |
| prep.listen(new CI1<IgniteInternalFuture<?>>() { |
| @Override public void apply(IgniteInternalFuture<?> f) { |
| try { |
| f.get(); // Check for errors of a parent future. |
| |
| if (finish(true)) |
| fut.finish(); |
| else |
| fut.onError(new IgniteCheckedException("Failed to commit transaction: " + |
| CU.txString(GridDhtTxLocal.this))); |
| } |
| catch (IgniteTxOptimisticCheckedException e) { |
| if (log.isDebugEnabled()) |
| log.debug("Failed optimistically to prepare transaction [tx=" + this + ", e=" + e + ']'); |
| |
| fut.onError(e); |
| } |
| catch (IgniteCheckedException e) { |
| U.error(log, "Failed to prepare transaction: " + this, e); |
| |
| fut.onError(e); |
| } |
| } |
| }); |
| } |
| else { |
| assert optimistic(); |
| |
| try { |
| if (finish(true)) |
| fut.finish(); |
| else |
| fut.onError(new IgniteCheckedException("Failed to commit transaction: " + CU.txString(this))); |
| } |
| catch (IgniteTxOptimisticCheckedException e) { |
| if (log.isDebugEnabled()) |
| log.debug("Failed optimistically to prepare transaction [tx=" + this + ", e=" + e + ']'); |
| |
| fut.onError(e); |
| } |
| catch (IgniteCheckedException e) { |
| U.error(log, "Failed to commit transaction: " + this, e); |
| |
| fut.onError(e); |
| } |
| } |
| |
| return fut; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected void clearPrepareFuture(GridDhtTxPrepareFuture fut) { |
| assert optimistic(); |
| |
| prepFut.compareAndSet(fut, null); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public IgniteInternalFuture<IgniteInternalTx> rollbackAsync() { |
| GridDhtTxPrepareFuture prepFut = this.prepFut.get(); |
| |
| final GridDhtTxFinishFuture fut = new GridDhtTxFinishFuture<>(cctx, this, /*rollback*/false); |
| |
| cctx.mvcc().addFuture(fut); |
| |
| if (prepFut == null) { |
| try { |
| if (finish(false) || state() == UNKNOWN) |
| fut.finish(); |
| else |
| fut.onError(new IgniteCheckedException("Failed to commit transaction: " + CU.txString(this))); |
| } |
| catch (IgniteTxOptimisticCheckedException e) { |
| if (log.isDebugEnabled()) |
| log.debug("Failed optimistically to prepare transaction [tx=" + this + ", e=" + e + ']'); |
| |
| fut.onError(e); |
| } |
| catch (IgniteCheckedException e) { |
| U.error(log, "Failed to rollback transaction (will make the best effort to rollback remote nodes): " + |
| this, e); |
| |
| fut.onError(e); |
| } |
| } |
| else { |
| prepFut.complete(); |
| |
| prepFut.listen(new CI1<IgniteInternalFuture<?>>() { |
| @Override public void apply(IgniteInternalFuture<?> f) { |
| try { |
| f.get(); // Check for errors of a parent future. |
| } |
| catch (IgniteCheckedException e) { |
| if (log.isDebugEnabled()) |
| log.debug("Failed to prepare or rollback transaction [tx=" + this + ", e=" + e + ']'); |
| } |
| |
| try { |
| if (finish(false) || state() == UNKNOWN) |
| fut.finish(); |
| else |
| fut.onError(new IgniteCheckedException("Failed to commit transaction: " + |
| CU.txString(GridDhtTxLocal.this))); |
| |
| } |
| catch (IgniteCheckedException e) { |
| U.error(log, "Failed to gracefully rollback transaction: " + CU.txString(GridDhtTxLocal.this), |
| e); |
| |
| fut.onError(e); |
| } |
| } |
| }); |
| } |
| |
| return fut; |
| } |
| |
| /** {@inheritDoc} */ |
| @SuppressWarnings({"CatchGenericClass", "ThrowableInstanceNeverThrown"}) |
| @Override public boolean finish(boolean commit) throws IgniteCheckedException { |
| assert nearFinFutId != null || isInvalidate() || !commit || isSystemInvalidate() |
| || onePhaseCommit() || state() == PREPARED : |
| "Invalid state [nearFinFutId=" + nearFinFutId + ", isInvalidate=" + isInvalidate() + ", commit=" + commit + |
| ", sysInvalidate=" + isSystemInvalidate() + ", state=" + state() + ']'; |
| |
| assert nearMiniId != null; |
| |
| return super.finish(commit); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected void sendFinishReply(boolean commit, @Nullable Throwable err) { |
| if (nearFinFutId != null) { |
| if (nearNodeId.equals(cctx.localNodeId())) { |
| if (log.isDebugEnabled()) |
| log.debug("Skipping response sending to local node: " + this); |
| |
| return; |
| } |
| |
| GridNearTxFinishResponse res = new GridNearTxFinishResponse(nearXidVer, threadId, nearFinFutId, |
| nearFinMiniId, err); |
| |
| try { |
| cctx.io().send(nearNodeId, res, ioPolicy()); |
| } |
| catch (ClusterTopologyCheckedException ignored) { |
| if (log.isDebugEnabled()) |
| log.debug("Node left before sending finish response (transaction was committed) [node=" + |
| nearNodeId + ", res=" + res + ']'); |
| } |
| catch (Throwable ex) { |
| U.error(log, "Failed to send finish response to node (transaction was " + |
| (commit ? "committed" : "rolledback") + ") [node=" + nearNodeId + ", res=" + res + ']', ex); |
| |
| if (ex instanceof Error) |
| throw (Error)ex; |
| } |
| } |
| else { |
| if (log.isDebugEnabled()) |
| log.debug("Will not send finish reply because sender node has not sent finish request yet: " + this); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @SuppressWarnings("unchecked") |
| @Nullable @Override public IgniteInternalFuture<?> currentPrepareFuture() { |
| return prepFut.get(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public String toString() { |
| return GridToStringBuilder.toString(GridDhtTxLocal.class, this, "super", super.toString()); |
| } |
| } |