blob: 9bb0e8bc14a392ecb326938bd973d0f7fe70a913 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.distributed;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.TestRecordingCommunicationSpi;
import org.apache.ignite.internal.processors.cache.transactions.TransactionProxyImpl;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.GridConcurrentHashSet;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.T3;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.testframework.LogListener;
import org.apache.ignite.transactions.Transaction;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
import static org.apache.ignite.testframework.LogListener.matches;
/**
*
*/
@RunWith(Parameterized.class)
public class GridExchangeFreeCellularSwitchIsolationTest extends GridExchangeFreeCellularSwitchAbstractTest {
/** Start from. */
@Parameterized.Parameter(0)
public TransactionCoordinatorNode startFrom;
/**
*
*/
@Parameterized.Parameters(name = "Started from = {0}")
public static Collection<Object[]> runConfig() {
ArrayList<Object[]> params = new ArrayList<>();
for (TransactionCoordinatorNode from : TransactionCoordinatorNode.values())
params.add(new Object[] {from});
return params;
}
/**
* Tests checks that switch finished only when all transactions required recovery are recovered.
* Based on corner case found at TeamCity runs:
*
* We have 2 cells, the first contains partitions for k1, second for k2.
* Tx with put(k1,v1) and put(k2,v2) started and prepared.
* Then node from the first cell, which is the primary for k1, failed.
* The second cell (with key2) should NOT finish the cellular switch before tx recovered,
* otherwice stale data read is possible.
*/
@Test
public void testMutliKeyTxRecoveryHappenBeforeTheSwitchOnCellularSwitch() throws Exception {
int nodes = 6;
startGridsMultiThreaded(nodes);
blockRecoveryMessages();
CellularCluster cluster = resolveCellularCluster(nodes, startFrom);
Ignite orig = cluster.orig;
Ignite failed = cluster.failed;
List<Ignite> brokenCellNodes = cluster.brokenCellNodes;
List<Ignite> aliveCellNodes = cluster.aliveCellNodes;
CountDownLatch prepLatch = new CountDownLatch(1);
CountDownLatch commitLatch = new CountDownLatch(1);
AtomicInteger key = new AtomicInteger();
// Puts 2 entries, each on it's own cell.
IgniteInternalFuture<?> putFut = multithreadedAsync(() -> {
try {
Transaction tx = orig.transactions().txStart();
IgniteCache<Integer, Integer> cache = orig.getOrCreateCache(PART_CACHE_NAME);
cache.put(primaryKey(failed.getOrCreateCache(PART_CACHE_NAME)), 42);
key.set(primaryKey(aliveCellNodes.get(0).getOrCreateCache(PART_CACHE_NAME)));
cache.put(key.get(), key.get());
((TransactionProxyImpl<?, ?>)tx).tx().prepare(true);
prepLatch.countDown();
commitLatch.await();
if (orig != failed)
((TransactionProxyImpl<?, ?>)tx).commit();
}
catch (Exception e) {
fail("Should not happen [exception=" + e + "]");
}
}, 1);
prepLatch.await();
// Should be null white tx is uncommitted/unrecovered.
assertNull(aliveCellNodes.get(0).getOrCreateCache(PART_CACHE_NAME).get(key.get()));
failed.close(); // Stopping node.
awaitForSwitchOnNodeLeft(failed);
checkTransactionsCount( // Making sure txs still unrecovered.
null, 0,
brokenCellNodes, 1,
aliveCellNodes, 1,
null /*any*/);
CountDownLatch getLatch = new CountDownLatch(1);
IgniteInternalFuture<?> getFut = multithreadedAsync(() -> {
try {
IgniteCache<Integer, Integer> cache = aliveCellNodes.get(0).getOrCreateCache(PART_CACHE_NAME);
// Should be available for reading only after recovery happen (should be not null).
assertEquals((Integer)key.get(), cache.get(key.get()));
getLatch.countDown();
}
catch (Exception e) {
fail("Should not happen [exception=" + e + "]");
}
}, 1);
// Get should not happen while tx is not recovered.
assertFalse(getLatch.await(10, TimeUnit.SECONDS));
// Allowing recovery.
for (Ignite ignite : G.allGrids()) {
TestRecordingCommunicationSpi spi =
(TestRecordingCommunicationSpi)ignite.configuration().getCommunicationSpi();
spi.stopBlock(true, blockedMsg -> true);
}
// Allowing commit.
commitLatch.countDown();
putFut.get();
// Awaiting for get on alive cell.
getLatch.await();
// Making sure get finished with recovered value.
getFut.get();
// Final check that any transactions are absent.
checkTransactionsCount(
null, 0,
brokenCellNodes, 0,
aliveCellNodes, 0,
null /*any*/);
}
/**
* Test checks than non-affected nodes (alive cells) finishes the switch asap,
* that they wait only for the recovery related to these nodes (eg. replicated caches recovery that affects every node).
*/
@Test
public void testOnlyAffectedNodesWaitForRecovery() throws Exception {
int nodes = 6;
String recoveryStatusMsg = "TxRecovery Status and Timings [txs=";
LogListener lsnrAny = matches(recoveryStatusMsg).build(); // Any.
LogListener lsnrBrokenCell = matches(recoveryStatusMsg).times((nodes / 2) - 1 /*failed*/).build();
LogListener lsnrAliveCell = matches(recoveryStatusMsg).times((nodes / 2)).build();
listeningLog.registerListener(lsnrAny);
startGridsMultiThreaded(nodes);
blockRecoveryMessages();
CellularCluster cluster = resolveCellularCluster(nodes, startFrom);
Ignite orig = cluster.orig;
Ignite failed = cluster.failed;
List<Ignite> brokenCellNodes = cluster.brokenCellNodes;
List<Ignite> aliveCellNodes = cluster.aliveCellNodes;
List<Integer> partKeys = new ArrayList<>();
List<Integer> replKeys = new ArrayList<>();
for (Ignite node : G.allGrids()) {
if (!node.configuration().isClientMode()) {
partKeys.add(primaryKey(node.getOrCreateCache(PART_CACHE_NAME)));
replKeys.add(primaryKey(node.getOrCreateCache(REPL_CACHE_NAME)));
}
}
CountDownLatch partPreparedLatch = new CountDownLatch(nodes);
CountDownLatch replPreparedLatch = new CountDownLatch(nodes);
CountDownLatch partCommitLatch = new CountDownLatch(1);
CountDownLatch replCommitLatch = new CountDownLatch(1);
AtomicInteger partKeyIdx = new AtomicInteger();
AtomicInteger replKeyIdx = new AtomicInteger();
Set<GridCacheVersion> partTxVers = new GridConcurrentHashSet<>();
Set<GridCacheVersion> replTxVers = new GridConcurrentHashSet<>();
IgniteInternalFuture<?> partFut = multithreadedAsync(() -> {
try {
int idx = partKeyIdx.getAndIncrement();
Transaction tx = orig.transactions().txStart();
partTxVers.add(((TransactionProxyImpl<?, ?>)tx).tx().nearXidVersion());
int key = partKeys.get(idx);
orig.getOrCreateCache(PART_CACHE_NAME).put(key, key);
((TransactionProxyImpl<?, ?>)tx).tx().prepare(true);
partPreparedLatch.countDown();
partCommitLatch.await();
if (orig != failed)
((TransactionProxyImpl<?, ?>)tx).commit();
}
catch (Exception e) {
fail("Should not happen [exception=" + e + "]");
}
}, nodes);
IgniteInternalFuture<?> replFut = multithreadedAsync(() -> {
try {
int idx = replKeyIdx.getAndIncrement();
Transaction tx = orig.transactions().txStart();
replTxVers.add(((TransactionProxyImpl<?, ?>)tx).tx().nearXidVersion());
int key = replKeys.get(idx);
orig.getOrCreateCache(REPL_CACHE_NAME).put(key, key);
((TransactionProxyImpl<?, ?>)tx).tx().prepare(true);
replPreparedLatch.countDown();
replCommitLatch.await();
if (orig != failed)
((TransactionProxyImpl<?, ?>)tx).commit();
}
catch (Exception e) {
fail("Should not happen [exception=" + e + "]");
}
}, nodes);
partPreparedLatch.await();
replPreparedLatch.await();
checkTransactionsCount(
orig, nodes,
brokenCellNodes, nodes / 2,
aliveCellNodes, nodes / 2,
partTxVers);
checkTransactionsCount(
orig, nodes,
brokenCellNodes, nodes,
aliveCellNodes, nodes,
replTxVers);
assertFalse(lsnrAny.check());
listeningLog.registerListener(lsnrAliveCell);
failed.close(); // Stopping node.
awaitForSwitchOnNodeLeft(failed);
// In case of originating node failed all alive primaries will recover (commit) txs on tx cordinator falure.
// Txs with failed primary will start recovery, but can't finish it since recovery messages are blocked.
// Broken cell's nodes will have 1 unrecovered tx for partitioned cache.
checkTransactionsCount(
orig != failed ? orig : null /*stopped*/, nodes,
brokenCellNodes, orig == failed ? 1 : nodes / 2,
aliveCellNodes, orig == failed ? 0 : nodes / 2,
partTxVers);
// All cell's nodes will have 1 unrecovered tx for replicated cache.
checkTransactionsCount(
orig != failed ? orig : null /*stopped*/, nodes,
brokenCellNodes, orig == failed ? 1 : nodes,
aliveCellNodes, orig == failed ? 1 : nodes,
replTxVers);
// Counts tx's creations and preparations.
BiConsumer<T2<Ignite, String>, T3<CountDownLatch, CountDownLatch, CountDownLatch>> txRun =
(T2<Ignite, String> pair, T3</*create*/CountDownLatch, /*put*/CountDownLatch, /*commit*/CountDownLatch> latches) -> {
try {
Ignite ignite = pair.get1();
String cacheName = pair.get2();
IgniteCache<Integer, Integer> cache = ignite.getOrCreateCache(cacheName);
try (Transaction tx = ignite.transactions().txStart()) {
latches.get1().countDown(); // Create.
// Avoiding intersection with prepared keys.
cache.put(primaryKeys(cache, 1, 1_000).get(0), 42);
latches.get2().countDown(); // Put.
tx.commit();
latches.get3().countDown(); // Commit.
}
}
catch (Exception e) {
fail("Should not happen [exception=" + e + "]");
}
};
CountDownLatch partBrokenCellCreateLatch = new CountDownLatch(brokenCellNodes.size());
CountDownLatch partBrokenCellPutLatch = new CountDownLatch(brokenCellNodes.size());
CountDownLatch partBrokenCellCommitLatch = new CountDownLatch(brokenCellNodes.size());
CountDownLatch partAliveCellCreateLatch = new CountDownLatch(aliveCellNodes.size());
CountDownLatch partAliveCellPutLatch = new CountDownLatch(aliveCellNodes.size());
CountDownLatch partAliveCellCommitLatch = new CountDownLatch(aliveCellNodes.size());
CountDownLatch replBrokenCellCreateLatch = new CountDownLatch(brokenCellNodes.size());
CountDownLatch replBrokenCellPutLatch = new CountDownLatch(brokenCellNodes.size());
CountDownLatch replBrokenCellCommitLatch = new CountDownLatch(brokenCellNodes.size());
CountDownLatch replAliveCellCreateLatch = new CountDownLatch(aliveCellNodes.size());
CountDownLatch replAliveCellPutLatch = new CountDownLatch(aliveCellNodes.size());
CountDownLatch replAliveCellCommitLatch = new CountDownLatch(aliveCellNodes.size());
List<IgniteInternalFuture<?>> futs = new ArrayList<>();
for (Ignite brokenCellNode : brokenCellNodes) {
futs.add(multithreadedAsync(() ->
txRun.accept(new T2<>(brokenCellNode, REPL_CACHE_NAME),
new T3<>(replBrokenCellCreateLatch, replBrokenCellPutLatch, replBrokenCellCommitLatch)), 1));
futs.add(multithreadedAsync(() ->
txRun.accept(new T2<>(brokenCellNode, PART_CACHE_NAME),
new T3<>(partBrokenCellCreateLatch, partBrokenCellPutLatch, partBrokenCellCommitLatch)), 1));
}
for (Ignite aliveCellNode : aliveCellNodes) {
futs.add(multithreadedAsync(() ->
txRun.accept(new T2<>(aliveCellNode, REPL_CACHE_NAME),
new T3<>(replAliveCellCreateLatch, replAliveCellPutLatch, replAliveCellCommitLatch)), 1));
futs.add(multithreadedAsync(() ->
txRun.accept(new T2<>(aliveCellNode, PART_CACHE_NAME),
new T3<>(partAliveCellCreateLatch, partAliveCellPutLatch, partAliveCellCommitLatch)), 1));
}
// Switch in progress cluster-wide.
// Alive nodes switch blocked until replicated caches recovery happen.
checkUpcomingTransactionsState(
partBrokenCellCreateLatch, 0, // Started.
partBrokenCellPutLatch, brokenCellNodes.size(),
partBrokenCellCommitLatch, brokenCellNodes.size(),
partAliveCellCreateLatch, 0, // Started. Blocked by replicated cache recovery.
partAliveCellPutLatch, aliveCellNodes.size(),
partAliveCellCommitLatch, aliveCellNodes.size());
checkUpcomingTransactionsState(
replBrokenCellCreateLatch, 0, // Started.
replBrokenCellPutLatch, brokenCellNodes.size(),
replBrokenCellCommitLatch, brokenCellNodes.size(),
replAliveCellCreateLatch, 0, // Started. Blocked by replicated cache recovery.
replAliveCellPutLatch, aliveCellNodes.size(),
replAliveCellCommitLatch, aliveCellNodes.size());
checkTransactionsCount(
orig != failed ? orig : null /*stopped*/, nodes,
brokenCellNodes, orig == failed ? 1 : nodes / 2,
aliveCellNodes, orig == failed ? 0 : nodes / 2,
partTxVers);
checkTransactionsCount(
orig != failed ? orig : null /*stopped*/, nodes,
brokenCellNodes, orig == failed ? 1 : nodes,
aliveCellNodes, orig == failed ? 1 : nodes,
replTxVers);
// Replicated recovery.
for (Ignite ignite : G.allGrids()) {
TestRecordingCommunicationSpi spi =
(TestRecordingCommunicationSpi)ignite.configuration().getCommunicationSpi();
spi.stopBlock(true, blockedMsg -> {
Message msg = blockedMsg.ioMessage().message();
return replTxVers.contains(((GridCacheTxRecoveryRequest)msg).nearXidVersion());
});
}
replCommitLatch.countDown();
replFut.get();
// Switch partially finished.
// Broken cell still in switch.
// Alive cell finished the switch.
checkUpcomingTransactionsState(
partBrokenCellCreateLatch, 0, // Started.
partBrokenCellPutLatch, brokenCellNodes.size(),
partBrokenCellCommitLatch, brokenCellNodes.size(),
partAliveCellCreateLatch, 0, // Started.
partAliveCellPutLatch, 0, // Alive cell nodes's able to start transactions on primaries,
partAliveCellCommitLatch, 0); // Able to commit, since all primaries and backups are inside the alive cell.
checkUpcomingTransactionsState(
replBrokenCellCreateLatch, 0, // Started.
replBrokenCellPutLatch, brokenCellNodes.size(),
replBrokenCellCommitLatch, brokenCellNodes.size(),
replAliveCellCreateLatch, 0, // Started.
replAliveCellPutLatch, 0, // Alive cell's nodes able to start transactions on primaries,
replAliveCellCommitLatch, aliveCellNodes.size()); // But not able to commit, since broken cell's nodes still in switch.
checkTransactionsCount(
orig != failed ? orig : null /*stopped*/, nodes,
brokenCellNodes, orig == failed ? 1 : nodes / 2,
aliveCellNodes, orig == failed ? 0 : nodes / 2 /*to be committed*/, // New txs able to start while previous are in progress.
partTxVers);
checkTransactionsCount(
orig != failed ? orig : null /*stopped*/, 0,
brokenCellNodes, 0,
aliveCellNodes, 0,
replTxVers);
// Recovery finished on alive cell.
assertTrue(waitForCondition(lsnrAliveCell::check, 5000));
listeningLog.registerListener(lsnrBrokenCell);
// Partitioned recovery.
for (Ignite ignite : G.allGrids()) {
TestRecordingCommunicationSpi spi =
(TestRecordingCommunicationSpi)ignite.configuration().getCommunicationSpi();
spi.stopBlock(true, blockedMsg -> {
Message msg = blockedMsg.ioMessage().message();
return partTxVers.contains(((GridCacheTxRecoveryRequest)msg).nearXidVersion());
});
}
partCommitLatch.countDown();
partFut.get();
// Switches finished cluster-wide, all transactions can be committed.
checkUpcomingTransactionsState(
replBrokenCellCreateLatch, 0,
replBrokenCellPutLatch, 0,
replBrokenCellCommitLatch, 0,
replAliveCellCreateLatch, 0,
replAliveCellPutLatch, 0,
replAliveCellCommitLatch, 0);
checkUpcomingTransactionsState(
partBrokenCellCreateLatch, 0,
partBrokenCellPutLatch, 0,
partBrokenCellCommitLatch, 0,
partAliveCellCreateLatch, 0,
partAliveCellPutLatch, 0,
partAliveCellCommitLatch, 0);
// Check that pre-failure transactions are absent.
checkTransactionsCount(
orig != failed ? orig : null /*stopped*/, 0,
brokenCellNodes, 0,
aliveCellNodes, 0,
partTxVers);
checkTransactionsCount(
orig != failed ? orig : null /*stopped*/, 0,
brokenCellNodes, 0,
aliveCellNodes, 0,
replTxVers);
// Recovery finished on broken cell.
assertTrue(waitForCondition(lsnrBrokenCell::check, 5000));
for (IgniteInternalFuture<?> fut : futs)
fut.get();
for (Ignite node : G.allGrids()) {
for (int key : partKeys)
assertEquals(key, node.getOrCreateCache(PART_CACHE_NAME).get(key));
for (int key : replKeys)
assertEquals(key, node.getOrCreateCache(REPL_CACHE_NAME).get(key));
}
// Final check that any transactions are absent.
checkTransactionsCount(
null, 0,
brokenCellNodes, 0,
aliveCellNodes, 0,
null /*any*/);
}
/**
*
*/
private void checkUpcomingTransactionsState(
CountDownLatch brokenCellCreateLatch,
int brokenCellCreateCnt,
CountDownLatch brokenCellPutLatch,
int brokenCellPutCnt,
CountDownLatch brokenCellCommitLatch,
int brokenCellCommitCnt,
CountDownLatch aliveCellCreateLatch,
int aliveCellCreateCnt,
CountDownLatch aliveCellPutLatch,
int aliveCellPutCnt,
CountDownLatch aliveCellCommitLatch,
int aliveCellCommitCnt) throws InterruptedException {
checkTransactionsState(brokenCellCreateLatch, brokenCellCreateCnt);
checkTransactionsState(brokenCellPutLatch, brokenCellPutCnt);
checkTransactionsState(brokenCellCommitLatch, brokenCellCommitCnt);
checkTransactionsState(aliveCellCreateLatch, aliveCellCreateCnt);
checkTransactionsState(aliveCellPutLatch, aliveCellPutCnt);
checkTransactionsState(aliveCellCommitLatch, aliveCellCommitCnt);
}
/**
*
*/
private void checkTransactionsState(CountDownLatch latch, int cnt) throws InterruptedException {
if (cnt == 0)
latch.await(10, TimeUnit.SECONDS); // Switch finished (finishing).
assertEquals(cnt, latch.getCount()); // Switch in progress.
}
}