| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.ignite.internal.processors.cache.transactions; |
| |
| import java.util.ArrayList; |
| import java.util.List; |
| import java.util.UUID; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicReference; |
| import java.util.stream.Collectors; |
| import org.apache.ignite.Ignite; |
| import org.apache.ignite.IgniteCache; |
| import org.apache.ignite.cache.CacheWriteSynchronizationMode; |
| import org.apache.ignite.configuration.CacheConfiguration; |
| import org.apache.ignite.configuration.DataRegionConfiguration; |
| import org.apache.ignite.configuration.DataStorageConfiguration; |
| import org.apache.ignite.configuration.IgniteConfiguration; |
| import org.apache.ignite.failure.StopNodeFailureHandler; |
| import org.apache.ignite.internal.IgniteEx; |
| import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException; |
| import org.apache.ignite.internal.IgniteInternalFuture; |
| import org.apache.ignite.internal.TestRecordingCommunicationSpi; |
| import org.apache.ignite.internal.processors.cache.distributed.GridCacheTxRecoveryRequest; |
| import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishRequest; |
| import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxLocal; |
| import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareFuture; |
| import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishRequest; |
| import org.apache.ignite.internal.util.typedef.internal.U; |
| import org.apache.ignite.testframework.GridTestUtils; |
| import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; |
| import org.apache.ignite.transactions.Transaction; |
| import org.apache.ignite.transactions.TransactionState; |
| import org.junit.Test; |
| |
| import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; |
| import static org.apache.ignite.cache.CacheMode.PARTITIONED; |
| import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC; |
| import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; |
| import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC; |
| import static org.apache.ignite.cluster.ClusterState.ACTIVE; |
| import static org.apache.ignite.internal.TestRecordingCommunicationSpi.spi; |
| import static org.apache.ignite.testframework.GridTestUtils.runAsync; |
| import static org.apache.ignite.testframework.GridTestUtils.waitForCondition; |
| import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC; |
| import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; |
| import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED; |
| |
| /** |
| */ |
| public class TxRecoveryWithConcurrentRollbackTest extends GridCommonAbstractTest { |
| /** Backups. */ |
| private int backups; |
| |
| /** Persistence. */ |
| private boolean persistence; |
| |
| /** Sync mode. */ |
| private CacheWriteSynchronizationMode syncMode; |
| |
| /** {@inheritDoc} */ |
| @Override protected IgniteConfiguration getConfiguration(String name) throws Exception { |
| final IgniteConfiguration cfg = super.getConfiguration(name); |
| |
| cfg.setConsistentId(name); |
| |
| if (persistence) { |
| cfg.setDataStorageConfiguration( |
| new DataStorageConfiguration(). |
| setWalSegmentSize(4 * 1024 * 1024). |
| setWalHistorySize(1000). |
| setCheckpointFrequency(Integer.MAX_VALUE). |
| setDefaultDataRegionConfiguration( |
| new DataRegionConfiguration().setPersistenceEnabled(true).setMaxSize(50 * 1024 * 1024))); |
| } |
| |
| cfg.setActiveOnStart(false); |
| cfg.setClientMode(name.startsWith("client")); |
| cfg.setCommunicationSpi(new TestRecordingCommunicationSpi()); |
| |
| cfg.setFailureHandler(new StopNodeFailureHandler()); |
| |
| cfg.setCacheConfiguration(new CacheConfiguration(DEFAULT_CACHE_NAME). |
| setCacheMode(PARTITIONED). |
| setBackups(backups). |
| setAtomicityMode(TRANSACTIONAL). |
| setWriteSynchronizationMode(syncMode)); |
| |
| return cfg; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected void beforeTest() throws Exception { |
| super.beforeTest(); |
| |
| cleanPersistenceDir(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected void afterTest() throws Exception { |
| super.afterTest(); |
| |
| stopAllGrids(); |
| |
| cleanPersistenceDir(); |
| } |
| |
| /** |
| * The test enforces specific order in messages processing during concurrent tx rollback and tx recovery due to |
| * node left. |
| * <p> |
| * Expected result: both DHT transactions produces same COMMITTED state on tx finish. |
| * */ |
| @Test |
| public void testRecoveryNotBreakingTxAtomicityOnNearFail() throws Exception { |
| backups = 1; |
| persistence = false; |
| |
| final IgniteEx node0 = startGrids(3); |
| node0.cluster().state(ACTIVE); |
| |
| final Ignite client = startGrid("client"); |
| |
| final IgniteCache<Object, Object> cache = client.cache(DEFAULT_CACHE_NAME); |
| |
| final List<Integer> g0Keys = primaryKeys(grid(0).cache(DEFAULT_CACHE_NAME), 100); |
| final List<Integer> g1Keys = primaryKeys(grid(1).cache(DEFAULT_CACHE_NAME), 100); |
| |
| final List<Integer> g2BackupKeys = backupKeys(grid(2).cache(DEFAULT_CACHE_NAME), 100, 0); |
| |
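        // Select k1 with primary on grid(0) and k2 with primary on grid(1), both having grid(2) as a backup.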
| Integer k1 = null; |
| Integer k2 = null; |
| |
| for (Integer key : g2BackupKeys) { |
| if (g0Keys.contains(key)) |
| k1 = key; |
| else if (g1Keys.contains(key)) |
| k2 = key; |
| |
| if (k1 != null && k2 != null) |
| break; |
| } |
| |
| assertNotNull(k1); |
| assertNotNull(k2); |
| |
| List<IgniteInternalTx> txs0 = null; |
| List<IgniteInternalTx> txs1 = null; |
| |
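        // Latch blocking the striped-pool stripe that will process the rollback request on node1.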
| CountDownLatch stripeBlockLatch = new CountDownLatch(1); |
| |
| int[] stripeHolder = new int[1]; |
| |
| try (final Transaction tx = client.transactions().txStart(PESSIMISTIC, READ_COMMITTED)) { |
| cache.put(k1, Boolean.TRUE); |
| cache.put(k2, Boolean.TRUE); |
| |
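            // Explicitly prepare the transaction so DHT transactions exist on all primaries and backups.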
| TransactionProxyImpl p = (TransactionProxyImpl)tx; |
| p.tx().prepare(true); |
| |
| txs0 = txs(grid(0)); |
| txs1 = txs(grid(1)); |
| List<IgniteInternalTx> txs2 = txs(grid(2)); |
| |
            assertEquals(1, txs0.size());
            assertEquals(1, txs1.size());
            assertEquals(2, txs2.size());
| |
            // Prevent the recovery request for the grid1 tx branch from reaching grid0.
| spi(grid(1)).blockMessages(GridCacheTxRecoveryRequest.class, grid(0).name()); |
| // Prevent finish(false) request processing on node0. |
| spi(client).blockMessages(GridNearTxFinishRequest.class, grid(0).name()); |
| |
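            // Stripe on node1 that processes finish/rollback messages for this tx (derived from the tx version).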
| int stripe = U.safeAbs(p.tx().xidVersion().hashCode()); |
| |
| stripeHolder[0] = stripe; |
| |
| // Blocks stripe processing for rollback request on node1. |
| grid(1).context().pools().getStripedExecutorService().execute(stripe, () -> U.awaitQuiet(stripeBlockLatch)); |
| // Dummy task to ensure msg is processed. |
| grid(1).context().pools().getStripedExecutorService().execute(stripe, () -> {}); |
| |
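            // Stop the near (client) node as soon as its finish request is blocked, triggering tx recovery.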
| runAsync(() -> { |
| spi(client).waitForBlocked(); |
| |
| client.close(); |
| |
| return null; |
| }); |
| |
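            // Rollback is expected to fail: the finish request is blocked and the client node is stopped concurrently.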
| tx.rollback(); |
| |
| fail(); |
| } |
| catch (Exception ignored) { |
| // Expected. |
| } |
| |
| // Wait until tx0 is committed by recovery on node0. |
| assertNotNull(txs0); |
| try { |
| txs0.get(0).finishFuture().get(3_000); |
| } |
| catch (IgniteFutureTimeoutCheckedException e) { |
            // If the timeout happens, the recovery message from g0 to g1 was mapped to the same stripe as the near
            // finish request. Complete the latch to allow sequential processing.
| stripeBlockLatch.countDown(); |
| |
| // Wait until sequential processing is finished. |
| assertTrue("sequential processing", GridTestUtils.waitForCondition(() -> |
| grid(1).context().pools().getStripedExecutorService().queueStripeSize(stripeHolder[0]) == 0, 5_000)); |
| |
            // Unblock the recovery message from g1 to g0 because the tx is in RECOVERY_FINISH state and waits for recovery to end.
| spi(grid(1)).stopBlock(); |
| |
| txs0.get(0).finishFuture().get(); |
| txs1.get(0).finishFuture().get(); |
| |
| final TransactionState s1 = txs0.get(0).state(); |
| final TransactionState s2 = txs1.get(0).state(); |
| |
| assertEquals(s1, s2); |
| |
| return; |
| } |
| |
| // Release rollback request processing, triggering an attempt to rollback the transaction during recovery. |
| stripeBlockLatch.countDown(); |
| |
| // Wait until finish message is processed. |
| assertTrue("concurrent processing", GridTestUtils.waitForCondition(() -> |
| grid(1).context().pools().getStripedExecutorService().queueStripeSize(stripeHolder[0]) == 0, 5_000)); |
| |
        // Proceed with recovery on grid1 -> grid0. Tx0 is committed, so tx1 should also be committed.
| spi(grid(1)).stopBlock(); |
| |
| assertNotNull(txs1); |
| txs1.get(0).finishFuture().get(); |
| |
| final TransactionState s1 = txs0.get(0).state(); |
| final TransactionState s2 = txs1.get(0).state(); |
| |
| assertEquals(s1, s2); |
| } |
| |
| /** */ |
| @Test |
| public void testRecoveryNotBreakingTxAtomicityOnNearAndPrimaryFail_FULL_SYNC() throws Exception { |
| doTestRecoveryNotBreakingTxAtomicityOnNearAndPrimaryFail(FULL_SYNC); |
| } |
| |
| /** */ |
| @Test |
| public void testRecoveryNotBreakingTxAtomicityOnNearAndPrimaryFail_PRIMARY_SYNC() throws Exception { |
| doTestRecoveryNotBreakingTxAtomicityOnNearAndPrimaryFail(PRIMARY_SYNC); |
| } |
| |
| /** */ |
| @Test |
| public void testRecoveryNotBreakingTxAtomicityOnNearAndPrimaryFail_FULL_ASYNC() throws Exception { |
| doTestRecoveryNotBreakingTxAtomicityOnNearAndPrimaryFail(FULL_ASYNC); |
| } |
| |
| /** |
| * Stop near and primary node after primary tx is rolled back with enabled persistence. |
| * <p> |
| * Expected result: after restarting a primary node all partitions are consistent. |
| */ |
| private void doTestRecoveryNotBreakingTxAtomicityOnNearAndPrimaryFail(CacheWriteSynchronizationMode syncMode) |
| throws Exception { |
| backups = 2; |
| persistence = true; |
| this.syncMode = syncMode; |
| |
| final IgniteEx node0 = startGrids(3); |
| node0.cluster().state(ACTIVE); |
| |
| final Ignite client = startGrid("client"); |
| |
| final IgniteCache<Object, Object> cache = client.cache(DEFAULT_CACHE_NAME); |
| |
| final Integer pk = primaryKey(grid(1).cache(DEFAULT_CACHE_NAME)); |
| |
| IgniteInternalFuture<Void> fut = null; |
| |
| List<IgniteInternalTx> tx0 = null; |
| List<IgniteInternalTx> tx2 = null; |
| |
| try (final Transaction tx = client.transactions().txStart(PESSIMISTIC, READ_COMMITTED)) { |
| cache.put(pk, Boolean.TRUE); |
| |
| TransactionProxyImpl p = (TransactionProxyImpl)tx; |
| p.tx().prepare(true); |
| |
| tx0 = txs(grid(0)); |
| tx2 = txs(grid(2)); |
| |
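            // Block finish requests sent from the primary node (grid(1)) to the backups.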
| spi(grid(1)).blockMessages((node, msg) -> msg instanceof GridDhtTxFinishRequest); |
| |
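            // Stop the near (client) and primary (grid(1)) nodes once both finish requests are blocked.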
| fut = runAsync(() -> { |
| spi(grid(1)).waitForBlocked(2); |
| |
| client.close(); |
| grid(1).close(); |
| |
| return null; |
| }); |
| |
| tx.rollback(); |
| } |
        catch (Exception ignored) {
            // No-op.
| } |
| |
| fut.get(); |
| |
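        // Wait until tx recovery finishes the backup transactions on the surviving nodes.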
| final IgniteInternalTx tx_0 = tx0.get(0); |
| tx_0.finishFuture().get(); |
| |
| final IgniteInternalTx tx_2 = tx2.get(0); |
| tx_2.finishFuture().get(); |
| |
| assertPartitionsSame(idleVerify(grid(0), DEFAULT_CACHE_NAME)); |
| |
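        // Restart the failed primary and verify that partitions are still consistent after rebalancing.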
| startGrid(1); |
| |
| awaitPartitionMapExchange(); |
| |
| assertPartitionsSame(idleVerify(grid(0), DEFAULT_CACHE_NAME)); |
| } |
| |
| /** |
| * @param g Grid. |
| */ |
| private List<IgniteInternalTx> txs(IgniteEx g) { |
| return new ArrayList<>(g.context().cache().context().tm().activeTransactions()); |
| } |
| |
| /** |
| * Start 3 servers, |
| * start 2 clients, |
| * start two OPTIMISTIC transactions with the same key from different client nodes, |
| * trying to transfer both to PREPARED state, |
| * stop one client node. |
| */ |
| @Test |
| public void testTxDoesntBecomePreparedAfterError() throws Exception { |
| backups = 2; |
| persistence = true; |
| syncMode = FULL_ASYNC; |
| |
| final IgniteEx node0 = startGrids(3); |
| |
| node0.cluster().state(ACTIVE); |
| |
| final IgniteEx client1 = startGrid("client1"); |
| final IgniteEx client2 = startGrid("client2"); |
| |
| awaitPartitionMapExchange(); |
| |
| final IgniteCache<Object, Object> cache = client1.cache(DEFAULT_CACHE_NAME); |
| final IgniteCache<Object, Object> cache2 = client2.cache(DEFAULT_CACHE_NAME); |
| |
| final Integer pk = primaryKey(node0.cache(DEFAULT_CACHE_NAME)); |
| |
| CountDownLatch txPrepareLatch = new CountDownLatch(1); |
| |
| GridTestUtils.runMultiThreadedAsync(() -> { |
| try (final Transaction tx = client1.transactions().withLabel("tx1").txStart(OPTIMISTIC, READ_COMMITTED, 5000, 1)) { |
| cache.put(pk, Boolean.TRUE); |
| |
| TransactionProxyImpl p = (TransactionProxyImpl)tx; |
| |
                // Block the near finish request to prevent tx rollback on exit from the try-with-resources block;
                // this should cause the tx to fail by timeout instead.
| spi(client1).blockMessages((node, msg) -> msg instanceof GridNearTxFinishRequest); |
| |
| log.info("Test, preparing tx: xid=" + tx.xid() + ", tx=" + tx); |
| |
                // Only prepare is performed to try to lock the key; commit is not needed here.
| p.tx().prepareNearTxLocal(); |
| |
| p.tx().currentPrepareFuture().listen(fut -> txPrepareLatch.countDown()); |
| } catch (Exception e) { |
| // No-op. |
| } |
| }, 1, "tx1-thread"); |
| |
| try (final Transaction tx = client2.transactions().withLabel("tx2").txStart(OPTIMISTIC, READ_COMMITTED, 5000, 1)) { |
| cache2.put(pk, Boolean.TRUE); |
| |
| TransactionProxyImpl p = (TransactionProxyImpl)tx; |
| |
| log.info("Test, preparing tx: xid=" + tx.xid() + ", tx=" + tx); |
| |
| p.tx().prepareNearTxLocal(); |
| |
| p.tx().currentPrepareFuture().listen(fut -> txPrepareLatch.countDown()); |
| |
| txPrepareLatch.await(6, TimeUnit.SECONDS); |
| |
| if (txPrepareLatch.getCount() > 0) |
| fail("Failed to await for tx prepare."); |
| |
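            // Wait for the DHT transaction on node0 that is stuck in PREPARING, blocked by the lock held by the other tx.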
| AtomicReference<GridDhtTxLocal> dhtTxLocRef = new AtomicReference<>(); |
| |
| assertTrue(waitForCondition(() -> { |
| dhtTxLocRef.set((GridDhtTxLocal) txs(node0).stream() |
| .filter(t -> t.state() == TransactionState.PREPARING) |
| .findFirst() |
| .orElse(null) |
| ); |
| |
| return dhtTxLocRef.get() != null; |
| }, 6_000)); |
| |
| assertNotNull(dhtTxLocRef.get()); |
| |
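            // The near node that originated the stuck DHT transaction will be stopped below.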
| UUID clientNodeToFail = dhtTxLocRef.get().eventNodeId(); |
| |
| GridDhtTxPrepareFuture prep = GridTestUtils.getFieldValue(dhtTxLocRef.get(), "prepFut"); |
| |
| prep.get(); |
| |
| List<IgniteInternalTx> txs = txs(node0); |
| |
| String txsStr = txs.stream().map(Object::toString).collect(Collectors.joining(", ")); |
| |
| log.info("Transactions check point [count=" + txs.size() + ", txs=" + txsStr + "]"); |
| |
| if (clientNodeToFail.equals(client1.localNode().id())) |
| client1.close(); |
| else if (clientNodeToFail.equals(client2.localNode().id())) |
| client2.close(); |
| } |
| catch (Exception e) { |
| log.error(e.getMessage(), e); |
| } |
| |
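        // Give the cluster time to process the client node failure.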
| U.sleep(500); |
| |
| assertEquals(3, grid(1).context().discovery().aliveServerNodes().size()); |
| |
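        // Exactly one transaction should remain active across the client nodes.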
| assertEquals(txs(client1).toString() + ", " + txs(client2).toString(), 1, txs(client1).size() + txs(client2).size()); |
| } |
| } |