// blob: 42a4fb2767ef79ddf9b06cccd5d97bc71566d61a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.distributed;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.TestRecordingCommunicationSpi;
import org.apache.ignite.internal.processors.cache.transactions.TransactionProxyImpl;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.GridConcurrentHashSet;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.T3;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.testframework.LogListener;
import org.apache.ignite.transactions.Transaction;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
import static org.apache.ignite.testframework.LogListener.matches;
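/**
 * Tests that, when a node fails, only the nodes affected by the failure wait for transaction recovery
 * during the exchange-free cellular switch. Parameterized by the kind of node the transactions are started from.
 */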
@RunWith(Parameterized.class)
public class GridExchangeFreeCellularSwitchIsolationTest extends GridExchangeFreeCellularSwitchAbstractTest {
/** Kind of node (primary, backup, near or client) the test transactions are started from. */
@Parameterized.Parameter(0)
public TransactionCoordinatorNode startFrom;
/**
 * Builds the parameter sets for the parameterized runner: one single-element set
 * per {@link TransactionCoordinatorNode} value.
 *
 * @return Test parameters, one {@code Object[]} per coordinator node kind.
 */
@Parameterized.Parameters(name = "Started from = {0}")
public static Collection<Object[]> runConfig() {
    TransactionCoordinatorNode[] origins = TransactionCoordinatorNode.values();

    List<Object[]> cfg = new ArrayList<>(origins.length);

    for (TransactionCoordinatorNode origin : origins)
        cfg.add(new Object[] {origin});

    return cfg;
}
/**
 * Checks that after a node failure only the affected nodes keep waiting for tx recovery.
 * <p>
 * Scenario:
 * <ol>
 *     <li>Starts 6 nodes, calls {@code blockRecoveryMessages()} and picks a random node to fail.</li>
 *     <li>From the node selected by {@link #startFrom}, prepares (but does not commit) two transactions on keys
 *     that are primary on the node to fail: one on the partitioned cache and one on the replicated cache.</li>
 *     <li>Stops the selected node and starts new transactions on every backup and near node for both caches,
 *     tracking their create/put/commit stages with latches.</li>
 *     <li>Unblocks the recovery requests of the replicated transaction only, then all remaining blocked
 *     messages, checking after each step how far the new transactions were able to progress and how many
 *     pre-failure transactions are still present.</li>
 *     <li>Finally checks that both pre-failure values are visible on all nodes and no transactions remain.</li>
 * </ol>
 *
 * @throws Exception If failed.
 */
@Test
public void testOnlyAffectedNodesWaitForRecovery() throws Exception {
    int nodes = 6;

    String recoveryStatusMsg = "TxRecovery Status and Timings [txs=";

    LogListener lsnrAny = matches(recoveryStatusMsg).build(); // Any.
    LogListener lsnrBackup = matches(recoveryStatusMsg).times((nodes / 2) - 1).build(); // Cell 1 (backups).
    LogListener lsnrNear = matches(recoveryStatusMsg).times((nodes / 2)).build(); // Cell 2 (near).

    listeningLog.registerListener(lsnrAny);

    startGridsMultiThreaded(nodes);

    blockRecoveryMessages();

    // Node to be stopped; both test keys are primary on it.
    Ignite failed = G.allGrids().get(new Random().nextInt(nodes));

    Integer partKey = primaryKey(failed.getOrCreateCache(PART_CACHE_NAME));
    Integer replKey = primaryKey(failed.getOrCreateCache(REPL_CACHE_NAME));

    List<Ignite> backupNodes = backupNodes(partKey, PART_CACHE_NAME);
    List<Ignite> nearNodes = new ArrayList<>(G.allGrids());

    // Near nodes are all nodes except the one to fail and the partitioned key's backups.
    nearNodes.remove(failed);
    nearNodes.removeAll(backupNodes);

    assertTrue(Collections.disjoint(backupNodes, nearNodes));
    assertEquals(nodes / 2 - 1, backupNodes.size()); // Cell 1.
    assertEquals(nodes / 2, nearNodes.size()); // Cell 2.

    // Node the pre-failure transactions are started from.
    Ignite orig;

    switch (startFrom) {
        case PRIMARY:
            orig = failed;

            break;

        case BACKUP:
            orig = backupNodes.get(0);

            break;

        case NEAR:
            orig = nearNodes.get(0);

            break;

        case CLIENT:
            orig = startClientGrid();

            break;

        default:
            throw new UnsupportedOperationException();
    }

    // Near xid versions of the pre-failure transactions.
    Set<GridCacheVersion> vers = new GridConcurrentHashSet<>();

    CountDownLatch partPreparedLatch = new CountDownLatch(1);
    CountDownLatch partCommitLatch = new CountDownLatch(1);
    CountDownLatch replPreparedLatch = new CountDownLatch(1);
    CountDownLatch replCommitLatch = new CountDownLatch(1);

    // Partitioned cache transaction: prepared here, committed only once partCommitLatch is released.
    IgniteInternalFuture<?> partFut = multithreadedAsync(() -> {
        try {
            checkTransactionsCount(
                orig, 0,
                failed, 0,
                backupNodes, 0,
                nearNodes, 0,
                vers);

            Transaction tx = orig.transactions().txStart();

            vers.add(((TransactionProxyImpl<?, ?>)tx).tx().nearXidVersion());

            checkTransactionsCount(
                orig, 1,
                failed, 0,
                backupNodes, 0,
                nearNodes, 0,
                vers);

            orig.getOrCreateCache(PART_CACHE_NAME).put(partKey, 42);

            checkTransactionsCount(
                orig, 1,
                failed, 1,
                backupNodes, 0,
                nearNodes, 0,
                vers);

            ((TransactionProxyImpl<?, ?>)tx).tx().prepare(true);

            checkTransactionsCount(
                orig, 1,
                failed, 1,
                backupNodes, 1,
                nearNodes, 0,
                vers);

            partPreparedLatch.countDown();

            partCommitLatch.await();

            // When the originating node is the stopped one, there is nothing left to commit from.
            if (orig != failed)
                ((TransactionProxyImpl<?, ?>)tx).commit();
        }
        catch (Exception e) {
            // Keep the original exception as the cause so that its stack trace is not lost.
            throw new AssertionError("Should not happen [exception=" + e + "]", e);
        }
    }, 1);

    AtomicReference<GridCacheVersion> replTxVer = new AtomicReference<>();

    // Replicated cache transaction: started after the partitioned one is prepared.
    IgniteInternalFuture<?> replFut = multithreadedAsync(() -> {
        try {
            partPreparedLatch.await(); // Waiting for partitioned cache tx preparation.

            checkTransactionsCount(
                orig, 1,
                failed, 1,
                backupNodes, 1,
                nearNodes, 0,
                vers);

            Transaction tx = orig.transactions().txStart();

            replTxVer.set(((TransactionProxyImpl<?, ?>)tx).tx().nearXidVersion());

            vers.add(replTxVer.get());

            checkTransactionsCount(
                orig, 2,
                failed, 1,
                backupNodes, 1,
                nearNodes, 0,
                vers);

            orig.getOrCreateCache(REPL_CACHE_NAME).put(replKey, 43);

            checkTransactionsCount(
                orig, 2,
                failed, 2,
                backupNodes, 1,
                nearNodes, 0,
                vers);

            ((TransactionProxyImpl<?, ?>)tx).tx().prepare(true);

            checkTransactionsCount(
                orig, 2,
                failed, 2,
                backupNodes, 2,
                nearNodes, 1,
                vers);

            replPreparedLatch.countDown();

            replCommitLatch.await();

            // When the originating node is the stopped one, there is nothing left to commit from.
            if (orig != failed)
                ((TransactionProxyImpl<?, ?>)tx).commit();
        }
        catch (Exception e) {
            // Keep the original exception as the cause so that its stack trace is not lost.
            throw new AssertionError("Should not happen [exception=" + e + "]", e);
        }
    }, 1);

    partPreparedLatch.await();
    replPreparedLatch.await();

    checkTransactionsCount(
        orig, 2,
        failed, 2,
        backupNodes, 2,
        nearNodes, 1,
        vers);

    // The recovery status message must not be logged before the failure.
    assertFalse(lsnrAny.check());

    listeningLog.registerListener(lsnrNear);

    failed.close(); // Stopping node.

    awaitForSwitchOnNodeLeft(failed);

    checkTransactionsCount(
        orig != failed ? orig : null /*stopped*/, 2 /* replicated + partitioned */,
        null /*stopped*/, 0,
        backupNodes, 2 /* replicated + partitioned */,
        nearNodes, 1 /* replicated */,
        vers);

    BiConsumer<T2<Ignite, String>, T3<CountDownLatch, CountDownLatch, CountDownLatch>> txRun = // Counts tx's creations and preparations.
        (T2<Ignite, String> pair, T3</*create*/CountDownLatch, /*put*/CountDownLatch, /*commit*/CountDownLatch> latches) -> {
            try {
                Ignite ignite = pair.get1();
                String cacheName = pair.get2();

                IgniteCache<Integer, Integer> cache = ignite.getOrCreateCache(cacheName);

                try (Transaction tx = ignite.transactions().txStart()) {
                    latches.get1().countDown(); // Create.

                    // Uses the 100th key that is primary on this node.
                    cache.put(primaryKeys(cache, 100).get(99), 2);

                    latches.get2().countDown(); // Put.

                    tx.commit();

                    latches.get3().countDown(); // Commit.
                }
            }
            catch (Exception e) {
                // Keep the original exception as the cause so that its stack trace is not lost.
                throw new AssertionError("Should not happen [exception=" + e + "]", e);
            }
        };

    // One latch per stage (create, put, commit), per cache, per node group; each expects one count-down per node.
    CountDownLatch replBackupCreateLatch = new CountDownLatch(backupNodes.size());
    CountDownLatch replBackupPutLatch = new CountDownLatch(backupNodes.size());
    CountDownLatch replBackupCommitLatch = new CountDownLatch(backupNodes.size());
    CountDownLatch replNearCreateLatch = new CountDownLatch(nearNodes.size());
    CountDownLatch replNearPutLatch = new CountDownLatch(nearNodes.size());
    CountDownLatch replNearCommitLatch = new CountDownLatch(nearNodes.size());

    CountDownLatch partBackupCreateLatch = new CountDownLatch(backupNodes.size());
    CountDownLatch partBackupPutLatch = new CountDownLatch(backupNodes.size());
    CountDownLatch partBackupCommitLatch = new CountDownLatch(backupNodes.size());
    CountDownLatch partNearCreateLatch = new CountDownLatch(nearNodes.size());
    CountDownLatch partNearPutLatch = new CountDownLatch(nearNodes.size());
    CountDownLatch partNearCommitLatch = new CountDownLatch(nearNodes.size());

    List<IgniteInternalFuture<?>> futs = new ArrayList<>();

    for (Ignite backup : backupNodes) {
        futs.add(multithreadedAsync(() ->
            txRun.accept(new T2<>(backup, REPL_CACHE_NAME),
                new T3<>(replBackupCreateLatch, replBackupPutLatch, replBackupCommitLatch)), 1));

        futs.add(multithreadedAsync(() ->
            txRun.accept(new T2<>(backup, PART_CACHE_NAME),
                new T3<>(partBackupCreateLatch, partBackupPutLatch, partBackupCommitLatch)), 1));
    }

    for (Ignite near : nearNodes) {
        futs.add(multithreadedAsync(() ->
            txRun.accept(new T2<>(near, REPL_CACHE_NAME),
                new T3<>(replNearCreateLatch, replNearPutLatch, replNearCommitLatch)), 1));

        futs.add(multithreadedAsync(() ->
            txRun.accept(new T2<>(near, PART_CACHE_NAME),
                new T3<>(partNearCreateLatch, partNearPutLatch, partNearCommitLatch)), 1));
    }

    // Switch in progress cluster-wide.
    checkUpcomingTransactionsState(
        replBackupCreateLatch, 0, // Started.
        replBackupPutLatch, backupNodes.size(),
        replBackupCommitLatch, backupNodes.size(),
        replNearCreateLatch, 0, // Started.
        replNearPutLatch, nearNodes.size(),
        replNearCommitLatch, nearNodes.size());

    checkUpcomingTransactionsState(
        partBackupCreateLatch, 0, // Started.
        partBackupPutLatch, backupNodes.size(),
        partBackupCommitLatch, backupNodes.size(),
        partNearCreateLatch, 0, // Started.
        partNearPutLatch, nearNodes.size(),
        partNearCommitLatch, nearNodes.size());

    checkTransactionsCount(
        orig != failed ? orig : null, 2 /* replicated + partitioned */,
        null, 0,
        backupNodes, 2 /* replicated + partitioned */,
        nearNodes, 1 /* replicated */,
        vers);

    // Replicated recovery.
    for (Ignite ignite : G.allGrids()) {
        TestRecordingCommunicationSpi spi =
            (TestRecordingCommunicationSpi)ignite.configuration().getCommunicationSpi();

        // Release only the blocked recovery requests that belong to the replicated transaction.
        spi.stopBlock(true, t -> {
            Message msg = t.get2().message();

            return ((GridCacheTxRecoveryRequest)msg).nearXidVersion().equals(replTxVer.get());
        });
    }

    replCommitLatch.countDown();

    replFut.get();

    // Switch partially finished.
    // Cell 1 (backups) still in switch.
    // Cell 2 (near nodes) finished the switch.
    checkUpcomingTransactionsState(
        replBackupCreateLatch, 0, // Started.
        replBackupPutLatch, backupNodes.size(),
        replBackupCommitLatch, backupNodes.size(),
        replNearCreateLatch, 0, // Started.
        replNearPutLatch, 0, // Near nodes able to start transactions on primaries (Cell 2),
        replNearCommitLatch, nearNodes.size()); // But not able to commit, since some backups (Cell 1) still in switch.

    checkUpcomingTransactionsState(
        partBackupCreateLatch, 0, // Started.
        partBackupPutLatch, backupNodes.size(),
        partBackupCommitLatch, backupNodes.size(),
        partNearCreateLatch, 0, // Started.
        partNearPutLatch, 0, // Near nodes able to start transactions on primaries (Cell 2),
        partNearCommitLatch, 0); // Able to commit, since all primaries and backups are in Cell 2.

    checkTransactionsCount(
        orig != failed ? orig : null, 1 /* partitioned */,
        null, 0,
        backupNodes, 1 /* partitioned */,
        nearNodes, 0,
        vers);

    assertTrue(waitForCondition(lsnrNear::check, 5000));

    listeningLog.registerListener(lsnrBackup);

    // Partitioned recovery.
    for (Ignite ignite : G.allGrids()) {
        TestRecordingCommunicationSpi spi =
            (TestRecordingCommunicationSpi)ignite.configuration().getCommunicationSpi();

        // Release everything that is still blocked.
        spi.stopBlock(true, (t) -> true);
    }

    partCommitLatch.countDown();

    partFut.get();

    // Switches finished cluster-wide, all transactions can be committed.
    checkUpcomingTransactionsState(
        replBackupCreateLatch, 0,
        replBackupPutLatch, 0,
        replBackupCommitLatch, 0,
        replNearCreateLatch, 0,
        replNearPutLatch, 0,
        replNearCommitLatch, 0);

    checkUpcomingTransactionsState(
        partBackupCreateLatch, 0,
        partBackupPutLatch, 0,
        partBackupCommitLatch, 0,
        partNearCreateLatch, 0,
        partNearPutLatch, 0,
        partNearCommitLatch, 0);

    // Check that pre-failure transactions are absent.
    checkTransactionsCount(
        orig != failed ? orig : null, 0,
        null, 0,
        backupNodes, 0,
        nearNodes, 0,
        vers);

    assertTrue(waitForCondition(lsnrBackup::check, 5000));

    for (IgniteInternalFuture<?> fut : futs)
        fut.get();

    // Values written by the pre-failure transactions must be readable from every remaining node.
    for (Ignite node : G.allGrids()) {
        assertEquals(42, node.getOrCreateCache(PART_CACHE_NAME).get(partKey));
        assertEquals(43, node.getOrCreateCache(REPL_CACHE_NAME).get(replKey));
    }

    // Final check that any transactions are absent.
    checkTransactionsCount(
        null, 0,
        null, 0,
        backupNodes, 0,
        nearNodes, 0,
        null);
}
/**
 * Verifies the progress of transactions through the create, put and commit stages,
 * first for the backup-node latches and then for the near-node latches.
 * Every latch/count pair is passed to {@link #checkTransactionsState(CountDownLatch, int)} in that order.
 *
 * @param backupCreateLatch Create-stage latch of the backup nodes.
 * @param backupCreateCnt Expected count for {@code backupCreateLatch}.
 * @param backupPutLatch Put-stage latch of the backup nodes.
 * @param backupPutCnt Expected count for {@code backupPutLatch}.
 * @param backupCommitLatch Commit-stage latch of the backup nodes.
 * @param backupCommitCnt Expected count for {@code backupCommitLatch}.
 * @param nearCreateLatch Create-stage latch of the near nodes.
 * @param nearCreateCnt Expected count for {@code nearCreateLatch}.
 * @param nearPutLatch Put-stage latch of the near nodes.
 * @param nearPutCnt Expected count for {@code nearPutLatch}.
 * @param nearCommitLatch Commit-stage latch of the near nodes.
 * @param nearCommitCnt Expected count for {@code nearCommitLatch}.
 * @throws InterruptedException If interrupted while waiting on a latch.
 */
private void checkUpcomingTransactionsState(
    CountDownLatch backupCreateLatch,
    int backupCreateCnt,
    CountDownLatch backupPutLatch,
    int backupPutCnt,
    CountDownLatch backupCommitLatch,
    int backupCommitCnt,
    CountDownLatch nearCreateLatch,
    int nearCreateCnt,
    CountDownLatch nearPutLatch,
    int nearPutCnt,
    CountDownLatch nearCommitLatch,
    int nearCommitCnt) throws InterruptedException {
    // Stages in verification order: backups first, then near nodes.
    CountDownLatch[] stages = {
        backupCreateLatch, backupPutLatch, backupCommitLatch,
        nearCreateLatch, nearPutLatch, nearCommitLatch
    };

    int[] expected = {
        backupCreateCnt, backupPutCnt, backupCommitCnt,
        nearCreateCnt, nearPutCnt, nearCommitCnt
    };

    for (int i = 0; i < stages.length; i++)
        checkTransactionsState(stages[i], expected[i]);
}
/**
 * Checks a single stage latch: with a non-positive expected count waits until the latch is fully
 * counted down, otherwise asserts that exactly {@code cnt} count-downs are still outstanding.
 *
 * @param latch Latch to check.
 * @param cnt Expected remaining count; a value of zero or less means "wait for the latch".
 * @throws InterruptedException If interrupted while waiting on the latch.
 */
private void checkTransactionsState(CountDownLatch latch, int cnt) throws InterruptedException {
    if (cnt <= 0) {
        latch.await(); // Switch finished (finishing).

        return;
    }

    assertEquals(cnt, latch.getCount()); // Switch in progress.
}
}