blob: 4a392c027f9d53f80d455121a44ce084aa558315 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.distributed;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.TestRecordingCommunicationSpi;
import org.apache.ignite.internal.processors.cache.transactions.TransactionProxyImpl;
import org.apache.ignite.internal.util.GridConcurrentHashSet;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.transactions.Transaction;
import org.apache.ignite.transactions.TransactionConcurrency;
import org.apache.ignite.transactions.TransactionIsolation;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
/**
*
*/
@RunWith(Parameterized.class)
public class GridExchangeFreeCellularSwitchComplexOperationsTest extends GridExchangeFreeCellularSwitchAbstractTest {
/** Concurrency. */
@Parameterized.Parameter(0)
public TransactionConcurrency concurrency;
/** Isolation. */
@Parameterized.Parameter(1)
public TransactionIsolation isolation;
/** Start from. */
@Parameterized.Parameter(2)
public TransactionCoordinatorNode startFrom;
/**
*
*/
@Parameterized.Parameters(name = "Isolation = {0}, Concurrency = {1}, Started from = {2}")
public static Collection<Object[]> runConfig() {
ArrayList<Object[]> params = new ArrayList<>();
for (TransactionConcurrency concurrency : TransactionConcurrency.values())
for (TransactionIsolation isolation : TransactionIsolation.values())
for (TransactionCoordinatorNode from : TransactionCoordinatorNode.values())
params.add(new Object[] {concurrency, isolation, from});
return params;
}
/**
* Test checks that txs will be recovered on Cellular switch if prepared, regardless of their content,
* as well as upcoming txs will be committed.
*/
@Test
public void testComplexOperationsRecoveryOnCellularSwitch() throws Exception {
int nodes = 6;
startGridsMultiThreaded(nodes);
blockRecoveryMessages();
CellularCluster cluster = resolveCellularCluster(nodes, startFrom);
Ignite orig = cluster.orig;
Ignite failed = cluster.failed;
List<Ignite> brokenCellNodes = cluster.brokenCellNodes;
List<Ignite> aliveCellNodes = cluster.aliveCellNodes;
int recFutsCnt = 7;
CountDownLatch prepLatch = new CountDownLatch(recFutsCnt);
CountDownLatch commitLatch = new CountDownLatch(1);
Set<Integer> partSet = new GridConcurrentHashSet<>();
Set<Integer> replSet = new GridConcurrentHashSet<>();
List<IgniteInternalFuture<?>> futs = new ArrayList<>();
AtomicInteger cnt = new AtomicInteger();
BiFunction<Ignite, String, Integer> nextPrimaryKey = (ignite, cacheName) -> {
int idx = cnt.getAndIncrement();
return primaryKeys(ignite.getOrCreateCache(cacheName), idx + 1).get(idx);
};
BiConsumer<String, Set<Integer>> singlePutEverywhere = (cacheName, globSet) -> {
try {
Transaction tx = orig.transactions().txStart(concurrency, isolation);
Set<Integer> set = new HashSet<>();
for (Ignite ignite : G.allGrids()) {
if (ignite.configuration().isClientMode())
continue;
set.add(nextPrimaryKey.apply(ignite, cacheName));
}
globSet.addAll(set);
IgniteCache<Integer, Integer> cache = orig.getOrCreateCache(cacheName);
for (Integer key : set)
cache.put(key, key);
((TransactionProxyImpl<?, ?>)tx).tx().prepare(true);
prepLatch.countDown();
commitLatch.await();
if (orig != failed)
((TransactionProxyImpl<?, ?>)tx).commit();
}
catch (Exception e) {
fail("Should not happen [exception=" + e + "]");
}
};
futs.add(multithreadedAsync(() -> singlePutEverywhere.accept(PART_CACHE_NAME, partSet), 1));
futs.add(multithreadedAsync(() -> singlePutEverywhere.accept(REPL_CACHE_NAME, replSet), 1));
Consumer<Integer> putEverywhereToBoth = (putPerTx) -> {
try {
Transaction tx = orig.transactions().txStart(concurrency, isolation);
Set<Integer> pSet = new HashSet<>();
Set<Integer> rSet = new HashSet<>();
for (int i = 0; i < putPerTx; i++)
for (Ignite ignite : G.allGrids()) {
if (ignite.configuration().isClientMode())
continue;
pSet.add(nextPrimaryKey.apply(ignite, PART_CACHE_NAME));
rSet.add(nextPrimaryKey.apply(ignite, REPL_CACHE_NAME));
}
partSet.addAll(pSet);
replSet.addAll(rSet);
IgniteCache<Integer, Integer> pCache = orig.getOrCreateCache(PART_CACHE_NAME);
IgniteCache<Integer, Integer> rCache = orig.getOrCreateCache(REPL_CACHE_NAME);
for (Integer key : pSet)
pCache.put(key, key);
for (Integer key : rSet)
rCache.put(key, key);
((TransactionProxyImpl<?, ?>)tx).tx().prepare(true);
prepLatch.countDown();
commitLatch.await();
if (orig != failed)
((TransactionProxyImpl<?, ?>)tx).commit();
}
catch (Exception e) {
fail("Should not happen [exception=" + e + "]");
}
};
futs.add(multithreadedAsync(() -> putEverywhereToBoth.accept(1), 1));
futs.add(multithreadedAsync(() -> putEverywhereToBoth.accept(2), 1));
futs.add(multithreadedAsync(() -> putEverywhereToBoth.accept(10), 1));
Consumer<Boolean> singleTxPerCell = (partAtBrokenCell) -> {
try {
Transaction tx = orig.transactions().txStart(concurrency, isolation);
Integer pKey = partAtBrokenCell ? nextPrimaryKey.apply(failed, PART_CACHE_NAME) :
nextPrimaryKey.apply(aliveCellNodes.get(0), PART_CACHE_NAME);
Integer rKey = partAtBrokenCell ? nextPrimaryKey.apply(aliveCellNodes.get(0), REPL_CACHE_NAME) :
nextPrimaryKey.apply(failed, REPL_CACHE_NAME);
IgniteCache<Integer, Integer> pCache = orig.getOrCreateCache(PART_CACHE_NAME);
IgniteCache<Integer, Integer> rCache = orig.getOrCreateCache(REPL_CACHE_NAME);
pCache.put(pKey, pKey);
rCache.put(rKey, rKey);
partSet.add(pKey);
replSet.add((rKey));
((TransactionProxyImpl<?, ?>)tx).tx().prepare(true);
prepLatch.countDown();
commitLatch.await();
if (orig != failed)
((TransactionProxyImpl<?, ?>)tx).commit();
}
catch (Exception e) {
fail("Should not happen [exception=" + e + "]");
}
};
futs.add(multithreadedAsync(() -> singleTxPerCell.accept(true), 1));
futs.add(multithreadedAsync(() -> singleTxPerCell.accept(false), 1));
prepLatch.await();
assertEquals(futs.size(), recFutsCnt);
failed.close(); // Stopping node.
awaitForSwitchOnNodeLeft(failed);
Consumer<Ignite> partTxRun = (ignite) -> {
try {
IgniteCache<Integer, Integer> cache = ignite.getOrCreateCache(PART_CACHE_NAME);
try (Transaction tx = ignite.transactions().txStart(concurrency, isolation)) {
Integer key = nextPrimaryKey.apply(ignite, PART_CACHE_NAME);
partSet.add(key);
cache.put(key, key);
tx.commit();
}
}
catch (Exception e) {
fail("Should not happen [exception=" + e + "]");
}
};
Consumer<Ignite> replTxRun = (ignite) -> {
try {
IgniteCache<Integer, Integer> cache = ignite.getOrCreateCache(REPL_CACHE_NAME);
try (Transaction tx = ignite.transactions().txStart(concurrency, isolation)) {
Integer key = nextPrimaryKey.apply(ignite, REPL_CACHE_NAME);
replSet.add(key);
cache.put(key, key);
tx.commit();
}
}
catch (Exception e) {
fail("Should not happen [exception=" + e + "]");
}
};
for (Ignite brokenCellNode : brokenCellNodes) {
futs.add(multithreadedAsync(() -> partTxRun.accept(brokenCellNode), 1));
futs.add(multithreadedAsync(() -> replTxRun.accept(brokenCellNode), 1));
}
for (Ignite aliveCellNode : aliveCellNodes) {
futs.add(multithreadedAsync(() -> partTxRun.accept(aliveCellNode), 1));
futs.add(multithreadedAsync(() -> replTxRun.accept(aliveCellNode), 1));
}
// Allowing recovery.
for (Ignite ignite : G.allGrids()) {
TestRecordingCommunicationSpi spi =
(TestRecordingCommunicationSpi)ignite.configuration().getCommunicationSpi();
spi.stopBlock(true, blockedMsg -> true);
}
commitLatch.countDown();
for (IgniteInternalFuture<?> fut : futs)
fut.get();
for (Ignite node : G.allGrids()) {
IgniteCache<Integer, Integer> partCache = node.getOrCreateCache(PART_CACHE_NAME);
IgniteCache<Integer, Integer> replCache = node.getOrCreateCache(REPL_CACHE_NAME);
for (Integer key : partSet)
assertEquals(key, partCache.get(key));
for (Integer key : replSet)
assertEquals(key, replCache.get(key));
}
// Final check that any transactions are absent.
checkTransactionsCount(
null, 0,
brokenCellNodes, 0,
aliveCellNodes, 0,
null /*any*/);
}
}