| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.ignite.internal.processors.cache.distributed; |
| |
| import java.util.Collection; |
| import java.util.HashSet; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.UUID; |
| import java.util.concurrent.atomic.AtomicReference; |
| import org.apache.ignite.IgniteCheckedException; |
| import org.apache.ignite.IgniteLogger; |
| import org.apache.ignite.cluster.ClusterNode; |
| import org.apache.ignite.internal.IgniteInternalFuture; |
| import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; |
| import org.apache.ignite.internal.processors.cache.GridCacheCompoundIdentityFuture; |
| import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; |
| import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; |
| import org.apache.ignite.internal.util.GridLeanMap; |
| import org.apache.ignite.internal.util.future.GridFutureAdapter; |
| import org.apache.ignite.internal.util.tostring.GridToStringInclude; |
| import org.apache.ignite.internal.util.typedef.C1; |
| import org.apache.ignite.internal.util.typedef.CI1; |
| import org.apache.ignite.internal.util.typedef.F; |
| import org.apache.ignite.internal.util.typedef.internal.CU; |
| import org.apache.ignite.internal.util.typedef.internal.S; |
| import org.apache.ignite.internal.util.typedef.internal.U; |
| import org.apache.ignite.lang.IgniteUuid; |
| import org.jetbrains.annotations.Nullable; |
| |
| import static org.apache.ignite.transactions.TransactionState.PREPARED; |
| |
| /** |
| * Future verifying that all remote transactions related to transaction were prepared or committed. |
| */ |
| public class GridCacheTxRecoveryFuture extends GridCacheCompoundIdentityFuture<Boolean> { |
| /** */ |
| private static final long serialVersionUID = 0L; |
| |
| /** Logger reference. */ |
| private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>(); |
| |
| /** Logger. */ |
| private static IgniteLogger log; |
| |
| /** Logger. */ |
| private static IgniteLogger msgLog; |
| |
| /** Trackable flag. */ |
| private boolean trackable = true; |
| |
| /** Context. */ |
| private final GridCacheSharedContext<?, ?> cctx; |
| |
| /** Future ID. */ |
| private final IgniteUuid futId = IgniteUuid.randomUuid(); |
| |
| /** Transaction. */ |
| private final IgniteInternalTx tx; |
| |
| /** All involved nodes. */ |
| private final Map<UUID, ClusterNode> nodes; |
| |
| /** ID of failed nodes started transaction. */ |
| @GridToStringInclude |
| private final Set<UUID> failedNodeIds; |
| |
| /** Transaction nodes mapping. */ |
| private final Map<UUID, Collection<UUID>> txNodes; |
| |
| /** */ |
| private final boolean nearTxCheck; |
| |
| /** |
| * @param cctx Context. |
| * @param tx Transaction. |
| * @param failedNodeIds IDs of failed nodes started transaction. |
| * @param txNodes Transaction mapping. |
| */ |
| @SuppressWarnings("ConstantConditions") |
| public GridCacheTxRecoveryFuture(GridCacheSharedContext<?, ?> cctx, |
| IgniteInternalTx tx, |
| Set<UUID> failedNodeIds, |
| Map<UUID, Collection<UUID>> txNodes) |
| { |
| super(CU.boolReducer()); |
| |
| this.cctx = cctx; |
| this.tx = tx; |
| this.txNodes = txNodes; |
| this.failedNodeIds = failedNodeIds; |
| |
| if (log == null) { |
| msgLog = cctx.txRecoveryMessageLogger(); |
| log = U.logger(cctx.kernalContext(), logRef, GridCacheTxRecoveryFuture.class); |
| } |
| |
| nodes = new GridLeanMap<>(); |
| |
| UUID locNodeId = cctx.localNodeId(); |
| |
| for (Map.Entry<UUID, Collection<UUID>> e : tx.transactionNodes().entrySet()) { |
| if (!locNodeId.equals(e.getKey()) && !failedNodeIds.contains(e.getKey()) && !nodes.containsKey(e.getKey())) { |
| ClusterNode node = cctx.discovery().node(e.getKey()); |
| |
| if (node != null) |
| nodes.put(node.id(), node); |
| else if (log.isInfoEnabled()) |
| log.info("Transaction node left (will ignore) " + e.getKey()); |
| } |
| |
| for (UUID nodeId : e.getValue()) { |
| if (!locNodeId.equals(nodeId) && !failedNodeIds.contains(nodeId) && !nodes.containsKey(nodeId)) { |
| ClusterNode node = cctx.discovery().node(nodeId); |
| |
| if (node != null) |
| nodes.put(node.id(), node); |
| else if (log.isInfoEnabled()) |
| log.info("Transaction node left (will ignore) " + e.getKey()); |
| } |
| } |
| } |
| |
| UUID nearNodeId = tx.eventNodeId(); |
| |
| nearTxCheck = !failedNodeIds.contains(nearNodeId) && cctx.discovery().alive(nearNodeId); |
| } |
| |
| /** |
| * Initializes future. |
| */ |
| @SuppressWarnings("ConstantConditions") |
| public void prepare() { |
| if (nearTxCheck) { |
| UUID nearNodeId = tx.eventNodeId(); |
| |
| if (cctx.localNodeId().equals(nearNodeId)) { |
| IgniteInternalFuture<Boolean> fut = cctx.tm().txCommitted(tx.nearXidVersion()); |
| |
| fut.listen(new CI1<IgniteInternalFuture<Boolean>>() { |
| @Override public void apply(IgniteInternalFuture<Boolean> fut) { |
| try { |
| onDone(fut.get()); |
| } |
| catch (IgniteCheckedException e) { |
| onDone(e); |
| } |
| } |
| }); |
| } |
| else { |
| MiniFuture fut = new MiniFuture(tx.eventNodeId()); |
| |
| add(fut); |
| |
| GridCacheTxRecoveryRequest req = new GridCacheTxRecoveryRequest( |
| tx, |
| 0, |
| true, |
| futureId(), |
| fut.futureId(), |
| tx.activeCachesDeploymentEnabled()); |
| |
| try { |
| cctx.io().send(nearNodeId, req, tx.ioPolicy()); |
| |
| if (msgLog.isInfoEnabled()) { |
| msgLog.info("Recovery fut, sent request near tx [txId=" + tx.nearXidVersion() + |
| ", dhtTxId=" + tx.xidVersion() + |
| ", node=" + nearNodeId + ']'); |
| } |
| } |
| catch (ClusterTopologyCheckedException ignore) { |
| fut.onNodeLeft(nearNodeId); |
| } |
| catch (IgniteCheckedException e) { |
| if (msgLog.isInfoEnabled()) { |
| msgLog.info("Recovery fut, failed to send request near tx [txId=" + tx.nearXidVersion() + |
| ", dhtTxId=" + tx.xidVersion() + |
| ", node=" + nearNodeId + |
| ", err=" + e + ']'); |
| } |
| |
| fut.onError(e); |
| } |
| |
| markInitialized(); |
| } |
| |
| return; |
| } |
| |
| // First check transactions on local node. |
| int locTxNum = nodeTransactions(cctx.localNodeId()); |
| |
| if (locTxNum > 1) { |
| IgniteInternalFuture<Boolean> fut = cctx.tm().txsPreparedOrCommitted(tx.nearXidVersion(), locTxNum); |
| |
| if (fut == null || fut.isDone()) { |
| boolean prepared; |
| |
| try { |
| prepared = fut == null ? true : fut.get(); |
| } |
| catch (IgniteCheckedException e) { |
| U.error(log, "Check prepared transaction future failed: " + e, e); |
| |
| prepared = false; |
| } |
| |
| if (!prepared) { |
| onDone(false); |
| |
| markInitialized(); |
| |
| return; |
| } |
| } |
| else { |
| fut.listen(new CI1<IgniteInternalFuture<Boolean>>() { |
| @Override public void apply(IgniteInternalFuture<Boolean> fut) { |
| boolean prepared; |
| |
| try { |
| prepared = fut.get(); |
| } |
| catch (IgniteCheckedException e) { |
| U.error(log, "Check prepared transaction future failed: " + e, e); |
| |
| prepared = false; |
| } |
| |
| if (!prepared) { |
| onDone(false); |
| |
| markInitialized(); |
| } |
| else |
| proceedPrepare(); |
| } |
| }); |
| |
| return; |
| } |
| } |
| |
| proceedPrepare(); |
| } |
| |
| /** |
| * Process prepare after local check. |
| */ |
| private void proceedPrepare() { |
| for (Map.Entry<UUID, Collection<UUID>> entry : txNodes.entrySet()) { |
| UUID nodeId = entry.getKey(); |
| |
| // Skip left nodes and local node. |
| if (!nodes.containsKey(nodeId) && nodeId.equals(cctx.localNodeId())) |
| continue; |
| |
| /* |
| * If primary node failed then send message to all backups, otherwise |
| * send message only to primary node. |
| */ |
| |
| if (failedNodeIds.contains(nodeId)) { |
| for (UUID id : entry.getValue()) { |
| // Skip backup node if it is local node or if it is also was mapped as primary. |
| if (txNodes.containsKey(id) || id.equals(cctx.localNodeId())) |
| continue; |
| |
| MiniFuture fut = new MiniFuture(id); |
| |
| add(fut); |
| |
| GridCacheTxRecoveryRequest req = new GridCacheTxRecoveryRequest(tx, |
| nodeTransactions(id), |
| false, |
| futureId(), |
| fut.futureId(), |
| tx.activeCachesDeploymentEnabled()); |
| |
| try { |
| cctx.io().send(id, req, tx.ioPolicy()); |
| |
| if (msgLog.isInfoEnabled()) { |
| msgLog.info("Recovery fut, sent request to backup [txId=" + tx.nearXidVersion() + |
| ", dhtTxId=" + tx.xidVersion() + |
| ", node=" + id + ']'); |
| } |
| } |
| catch (ClusterTopologyCheckedException ignored) { |
| fut.onNodeLeft(id); |
| } |
| catch (IgniteCheckedException e) { |
| if (msgLog.isInfoEnabled()) { |
| msgLog.info("Recovery fut, failed to send request to backup [txId=" + tx.nearXidVersion() + |
| ", dhtTxId=" + tx.xidVersion() + |
| ", node=" + id + |
| ", err=" + e + ']'); |
| } |
| |
| fut.onError(e); |
| |
| break; |
| } |
| } |
| } |
| else { |
| MiniFuture fut = new MiniFuture(nodeId); |
| |
| add(fut); |
| |
| GridCacheTxRecoveryRequest req = new GridCacheTxRecoveryRequest( |
| tx, |
| nodeTransactions(nodeId), |
| false, |
| futureId(), |
| fut.futureId(), |
| tx.activeCachesDeploymentEnabled()); |
| |
| try { |
| cctx.io().send(nodeId, req, tx.ioPolicy()); |
| |
| if (msgLog.isInfoEnabled()) { |
| msgLog.info("Recovery fut, sent request to primary [txId=" + tx.nearXidVersion() + |
| ", dhtTxId=" + tx.xidVersion() + |
| ", node=" + nodeId + ']'); |
| } |
| } |
| catch (ClusterTopologyCheckedException ignored) { |
| fut.onNodeLeft(nodeId); |
| } |
| catch (IgniteCheckedException e) { |
| if (msgLog.isInfoEnabled()) { |
| msgLog.info("Recovery fut, failed to send request to primary [txId=" + tx.nearXidVersion() + |
| ", dhtTxId=" + tx.xidVersion() + |
| ", node=" + nodeId + |
| ", err=" + e + ']'); |
| } |
| |
| fut.onError(e); |
| |
| break; |
| } |
| } |
| } |
| |
| markInitialized(); |
| } |
| |
| /** |
| * @param nodeId Node ID. |
| * @return Number of transactions on node. |
| */ |
| private int nodeTransactions(UUID nodeId) { |
| int cnt = txNodes.containsKey(nodeId) ? 1 : 0; // +1 if node is primary. |
| |
| for (Collection<UUID> backups : txNodes.values()) { |
| for (UUID backup : backups) { |
| if (backup.equals(nodeId)) { |
| cnt++; // +1 if node is backup. |
| |
| break; |
| } |
| } |
| } |
| |
| return cnt; |
| } |
| |
| /** |
| * @param nodeId Node ID. |
| * @param res Response. |
| */ |
| public void onResult(UUID nodeId, GridCacheTxRecoveryResponse res) { |
| if (!isDone()) { |
| MiniFuture mini = miniFuture(res.miniId()); |
| |
| if (mini != null) { |
| assert mini.nodeId().equals(nodeId); |
| |
| mini.onResult(res); |
| } |
| else { |
| if (msgLog.isInfoEnabled()) { |
| msgLog.info("Tx recovery fut, failed to find mini future [txId=" + tx.nearXidVersion() + |
| ", dhtTxId=" + tx.xidVersion() + |
| ", node=" + nodeId + |
| ", res=" + res + |
| ", fut=" + this + ']'); |
| } |
| } |
| } |
| else { |
| if (msgLog.isInfoEnabled()) { |
| msgLog.info("Tx recovery fut, response for finished future [txId=" + tx.nearXidVersion() + |
| ", dhtTxId=" + tx.xidVersion() + |
| ", node=" + nodeId + |
| ", res=" + res + |
| ", fut=" + this + ']'); |
| } |
| } |
| } |
| |
| /** |
| * Finds pending mini future by the given mini ID. |
| * |
| * @param miniId Mini ID to find. |
| * @return Mini future. |
| */ |
| @SuppressWarnings("ForLoopReplaceableByForEach") |
| private MiniFuture miniFuture(IgniteUuid miniId) { |
| // We iterate directly over the futs collection here to avoid copy. |
| synchronized (this) { |
| int size = futuresCountNoLock(); |
| |
| // Avoid iterator creation. |
| for (int i = 0; i < size; i++) { |
| IgniteInternalFuture<Boolean> fut = future(i); |
| |
| if (!isMini(fut)) |
| continue; |
| |
| MiniFuture mini = (MiniFuture)fut; |
| |
| if (mini.futureId().equals(miniId)) { |
| if (!mini.isDone()) |
| return mini; |
| else |
| return null; |
| } |
| } |
| } |
| |
| return null; |
| } |
| |
| /** |
| * @return Transaction. |
| */ |
| public IgniteInternalTx tx() { |
| return tx; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public IgniteUuid futureId() { |
| return futId; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean onNodeLeft(final UUID nodeId) { |
| for (IgniteInternalFuture<?> fut : futures()) { |
| if (isMini(fut)) { |
| final MiniFuture f = (MiniFuture)fut; |
| |
| if (f.nodeId().equals(nodeId)) { |
| cctx.kernalContext().closure().runLocalSafe(new Runnable() { |
| @Override public void run() { |
| f.onNodeLeft(nodeId); |
| } |
| }); |
| } |
| } |
| } |
| |
| return true; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean trackable() { |
| return trackable; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void markNotTrackable() { |
| trackable = false; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean onDone(@Nullable Boolean res, @Nullable Throwable err) { |
| if (super.onDone(res, err)) { |
| cctx.mvcc().removeFuture(futId); |
| |
| if (err == null) { |
| assert res != null; |
| |
| cctx.tm().finishTxOnRecovery(tx, res); |
| } |
| else { |
| if (err instanceof ClusterTopologyCheckedException && nearTxCheck) { |
| if (log.isInfoEnabled()) { |
| log.info("Failed to check transaction on near node, " + |
| "ignoring [err=" + err + ", tx=" + tx + ']'); |
| } |
| } |
| else { |
| if (log.isInfoEnabled()) { |
| log.info("Failed to check prepared transactions, " + |
| "invalidating transaction [err=" + err + ", tx=" + tx + ']'); |
| } |
| |
| cctx.tm().salvageTx(tx); |
| } |
| } |
| } |
| |
| return false; |
| } |
| |
| /** |
| * @param f Future. |
| * @return {@code True} if mini-future. |
| */ |
| private boolean isMini(IgniteInternalFuture<?> f) { |
| return f.getClass().equals(MiniFuture.class); |
| } |
| |
| |
| /** {@inheritDoc} */ |
| @Override public String toString() { |
| Collection<String> futs = F.viewReadOnly(futures(), new C1<IgniteInternalFuture<?>, String>() { |
| @Override public String apply(IgniteInternalFuture<?> f) { |
| return "[node=" + ((MiniFuture)f).nodeId + |
| ", done=" + f.isDone() + "]"; |
| } |
| }); |
| |
| return S.toString(GridCacheTxRecoveryFuture.class, this, |
| "innerFuts", futs, |
| "super", super.toString()); |
| } |
| |
| /** |
| * |
| */ |
| private class MiniFuture extends GridFutureAdapter<Boolean> { |
| /** Mini future ID. */ |
| private final IgniteUuid futId = IgniteUuid.randomUuid(); |
| |
| /** Node ID. */ |
| private UUID nodeId; |
| |
| /** |
| * @param nodeId Node ID. |
| */ |
| private MiniFuture(UUID nodeId) { |
| this.nodeId = nodeId; |
| } |
| |
| /** |
| * @return Node ID. |
| */ |
| private UUID nodeId() { |
| return nodeId; |
| } |
| |
| /** |
| * @return Future ID. |
| */ |
| private IgniteUuid futureId() { |
| return futId; |
| } |
| |
| /** |
| * @param e Error. |
| */ |
| private void onError(Throwable e) { |
| if (log.isInfoEnabled()) |
| log.info("Failed to get future result [fut=" + this + ", err=" + e + ']'); |
| |
| onDone(e); |
| } |
| |
| /** |
| * @param nodeId Failed node ID. |
| */ |
| private void onNodeLeft(UUID nodeId) { |
| if (msgLog.isInfoEnabled()) { |
| msgLog.info("Tx recovery fut, mini future node left [txId=" + tx.nearXidVersion() + |
| ", dhtTxId=" + tx.xidVersion() + |
| ", node=" + nodeId + |
| ", nearTxCheck=" + nearTxCheck + ']'); |
| } |
| |
| if (nearTxCheck) { |
| if (tx.state() == PREPARED) { |
| Set<UUID> failedNodeIds0 = new HashSet<>(failedNodeIds); |
| failedNodeIds0.add(nodeId); |
| |
| // Near and originating nodes left, need initiate tx check. |
| cctx.tm().commitIfPrepared(tx, failedNodeIds0); |
| } |
| |
| onDone(new ClusterTopologyCheckedException("Transaction node left grid (will ignore).")); |
| } |
| else |
| onDone(true); |
| } |
| |
| /** |
| * @param res Result callback. |
| */ |
| private void onResult(GridCacheTxRecoveryResponse res) { |
| onDone(res.success()); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public String toString() { |
| return S.toString(MiniFuture.class, this, "done", isDone(), "err", error()); |
| } |
| } |
| } |