blob: 471ca58bc2e52f1b63a372d73f46caaeb821c86f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.distributed;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import java.util.function.Predicate;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteInterruptedException;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.events.EventType;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.TestRecordingCommunicationSpi;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.events.DiscoveryCustomEvent;
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch;
import org.apache.ignite.internal.processors.cache.ExchangeActions;
import org.apache.ignite.internal.processors.cache.ExchangeActions.CacheActionData;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateMessage;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.spi.discovery.tcp.TestTcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddFinishedMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeLeftMessage;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.GridTestUtils.RunnableX;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.jetbrains.annotations.NotNull;
import org.junit.Test;
import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
import static org.apache.ignite.cache.CacheMode.REPLICATED;
/**
*
*/
public abstract class CacheBlockOnReadAbstractTest extends GridCommonAbstractTest {
/** Default cache entries count. */
private static final int DFLT_CACHE_ENTRIES_CNT = 2 * 1024;
/** Ip finder. */
private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
/** List of baseline nodes started at the beginning of the test. */
protected final List<IgniteEx> baseline = new CopyOnWriteArrayList<>();
/** List of server nodes started at the beginning of the test. */
protected final List<IgniteEx> srvs = new CopyOnWriteArrayList<>();
/** List of client nodes started at the beginning of the test. */
protected final List<IgniteEx> clients = new CopyOnWriteArrayList<>();
/** Start node in client mode. */
private volatile boolean startNodesInClientMode;
/** Latch that is used to wait until all required messages are blocked. */
private volatile CountDownLatch cntFinishedReadOperations;
/** Custom ip finder. Replaces {@link #IP_FINDER} if present at the moment of node starting. */
private volatile TcpDiscoveryIpFinder customIpFinder;
/** Discovery message processor. Used in every started node. */
private volatile BiConsumer<TcpDiscoveryAbstractMessage, String> discoveryMsgProcessor;
/**
* Number of baseline servers to start before test.
*
* @see Params#baseline()
*/
protected int baselineServersCount() {
return currentTestParams().baseline();
}
/**
* Number of non-baseline servers to start before test.
*
* @see Params#servers()
*/
protected int serversCount() {
return currentTestParams().servers();
}
/**
* Number of clients to start before test.
*
* @see Params#clients()
*/
protected int clientsCount() {
return currentTestParams().clients();
}
/**
* Number of backups to configure in caches by default.
*/
protected int backupsCount() {
return Math.min(3, baselineServersCount() - 1);
}
/**
* Number of milliseconds to warmup reading process. Used to lower fluctuations in run time. Might be 0.
*
* @see Params#warmup()
*/
protected long warmup() {
return currentTestParams().warmup();
}
/**
* Number of milliseconds to wait on the potentially blocking operation.
*
* @see Params#timeout()
*/
protected long timeout() {
return currentTestParams().timeout();
}
/**
* Cache atomicity mode.
*
* @see Params#atomicityMode()
*/
protected CacheAtomicityMode atomicityMode() {
return currentTestParams().atomicityMode();
}
/**
* Cache mode.
*
* @see Params#cacheMode()
*/
protected CacheMode cacheMode() {
return currentTestParams().cacheMode();
}
/**
* Whether allowing {@link ClusterTopologyCheckedException} as the valid reading result or not.
*
* @see Params#allowException()
*/
protected boolean allowException() {
return currentTestParams().allowException();
}
/**
* @param startNodesInClientMode Start nodes on client mode.
*/
public void startNodesInClientMode(boolean startNodesInClientMode) {
this.startNodesInClientMode = startNodesInClientMode;
}
/** List of baseline nodes started at the beginning of the test. */
public List<? extends IgniteEx> baseline() {
return baseline;
}
/** List of server nodes started at the beginning of the test. */
public List<? extends IgniteEx> servers() {
return srvs;
}
/** List of client nodes started at the beginning of the test. */
public List<? extends IgniteEx> clients() {
return clients;
}
/**
* Annotation to configure test methods in {@link CacheBlockOnReadAbstractTest}. Its values are used throughout
* test implementation.
*/
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface Params {
/**
* Number of baseline servers to start before test.
*/
int baseline() default 3;
/**
* Number of non-baseline servers to start before test.
*/
int servers() default 1;
/**
* Number of clients to start before test.
*/
int clients() default 1;
/**
* Number of milliseconds to warmup reading process. Used to lower fluctuations in run time. Might be 0.
*/
long warmup() default 2000L;
/**
* Number of milliseconds to wait on the potentially blocking operation.
*/
long timeout() default 3000L;
/**
* Cache atomicity mode.
*/
CacheAtomicityMode atomicityMode();
/**
* Cache mode.
*/
CacheMode cacheMode();
/**
* Whether allowing {@link ClusterTopologyCheckedException} as the valid reading result or not.
*/
boolean allowException() default false;
}
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
cfg.setConsistentId(igniteInstanceName);
cfg.setCommunicationSpi(new TestRecordingCommunicationSpi());
cfg.setDiscoverySpi(new TestTcpDiscoverySpi() {
/** {@inheritDoc} */
@Override protected void startMessageProcess(TcpDiscoveryAbstractMessage msg) {
if (discoveryMsgProcessor != null)
discoveryMsgProcessor.accept(msg, igniteInstanceName);
}
}.setIpFinder(customIpFinder == null ? IP_FINDER : customIpFinder));
cfg.setDataStorageConfiguration(
new DataStorageConfiguration()
.setDefaultDataRegionConfiguration(
new DataRegionConfiguration()
.setPersistenceEnabled(true)
)
);
cfg.setClientMode(startNodesInClientMode);
return cfg;
}
/** {@inheritDoc} */
@Override public void beforeTest() throws Exception {
stopAllGrids();
cleanPersistenceDir();
// Checking prerequisites.
assertTrue("Positive timeout is required for the test.", timeout() > 0);
assertTrue("No baseline servers were requested.", baselineServersCount() > 0);
int idx = 0;
// Start baseline nodes.
for (int i = 0; i < baselineServersCount(); i++)
baseline.add(startGrid(idx++));
// Activate cluster.
baseline.get(0).cluster().active(true);
// Start server nodes in activated cluster.
for (int i = 0; i < serversCount(); i++)
srvs.add(startGrid(idx++));
// Start client nodes.
startNodesInClientMode(true);
customIpFinder = new TcpDiscoveryVmIpFinder(false)
.setAddresses(
Collections.singletonList("127.0.0.1:47500")
);
for (int i = 0; i < clientsCount(); i++)
clients.add(startGrid(idx++));
customIpFinder = null;
}
/** {@inheritDoc} */
@Override public void afterTest() throws Exception {
baseline.clear();
srvs.clear();
clients.clear();
grid(0).cluster().active(false);
stopAllGrids();
cleanPersistenceDir();
}
/**
* @throws Exception If failed.
*/
@Params(atomicityMode = ATOMIC, cacheMode = PARTITIONED)
@Test
public void testCreateCacheAtomicPartitioned() throws Exception {
testCreateCacheTransactionalReplicated();
}
/**
* @throws Exception If failed.
*/
@Params(atomicityMode = ATOMIC, cacheMode = REPLICATED)
@Test
public void testCreateCacheAtomicReplicated() throws Exception {
testCreateCacheTransactionalReplicated();
}
/**
* @throws Exception If failed.
*/
@Params(atomicityMode = TRANSACTIONAL, cacheMode = PARTITIONED)
@Test
public void testCreateCacheTransactionalPartitioned() throws Exception {
testCreateCacheTransactionalReplicated();
}
/**
* @throws Exception If failed.
*/
@Params(atomicityMode = TRANSACTIONAL, cacheMode = REPLICATED)
@Test
public void testCreateCacheTransactionalReplicated() throws Exception {
doTest(
asMessagePredicate(CacheBlockOnReadAbstractTest::createCachePredicate),
() -> baseline.get(0).createCache(UUID.randomUUID().toString())
);
}
/**
* @throws Exception If failed.
*/
@Params(timeout = 5000L, atomicityMode = ATOMIC, cacheMode = PARTITIONED)
@Test
public void testDestroyCacheAtomicPartitioned() throws Exception {
testDestroyCacheTransactionalReplicated();
}
/**
* @throws Exception If failed.
*/
@Params(timeout = 5000L, atomicityMode = ATOMIC, cacheMode = REPLICATED)
@Test
public void testDestroyCacheAtomicReplicated() throws Exception {
testDestroyCacheTransactionalReplicated();
}
/**
* @throws Exception If failed.
*/
@Params(timeout = 5000L, atomicityMode = TRANSACTIONAL, cacheMode = PARTITIONED)
@Test
public void testDestroyCacheTransactionalPartitioned() throws Exception {
testDestroyCacheTransactionalReplicated();
}
/**
* @throws Exception If failed.
*/
@Params(timeout = 5000L, atomicityMode = TRANSACTIONAL, cacheMode = REPLICATED)
@Test
public void testDestroyCacheTransactionalReplicated() throws Exception {
List<String> cacheNames = new ArrayList<>(Arrays.asList(
UUID.randomUUID().toString(),
UUID.randomUUID().toString(),
UUID.randomUUID().toString())
);
for (String cacheName : cacheNames)
baseline.get(0).createCache(cacheName);
doTest(
asMessagePredicate(CacheBlockOnReadAbstractTest::destroyCachePredicate),
() -> baseline.get(0).destroyCache(cacheNames.remove(0))
);
}
/**
* @throws Exception If failed.
*/
@Params(atomicityMode = ATOMIC, cacheMode = PARTITIONED)
@Test
public void testStartServerAtomicPartitioned() throws Exception {
testStartServerTransactionalReplicated();
}
/**
* @throws Exception If failed.
*/
@Params(atomicityMode = ATOMIC, cacheMode = REPLICATED)
@Test
public void testStartServerAtomicReplicated() throws Exception {
testStartServerTransactionalReplicated();
}
/**
* @throws Exception If failed.
*/
@Params(atomicityMode = TRANSACTIONAL, cacheMode = PARTITIONED)
@Test
public void testStartServerTransactionalPartitioned() throws Exception {
testStartServerTransactionalReplicated();
}
/**
* @throws Exception If failed.
*/
@Params(atomicityMode = TRANSACTIONAL, cacheMode = REPLICATED)
@Test
public void testStartServerTransactionalReplicated() throws Exception {
startNodesInClientMode(false);
doTest(
asMessagePredicate(discoEvt -> discoEvt.type() == EventType.EVT_NODE_JOINED),
() -> startGrid(UUID.randomUUID().toString())
);
}
/**
* @throws Exception If failed.
*/
@Params(servers = 4, atomicityMode = ATOMIC, cacheMode = PARTITIONED)
@Test
public void testStopServerAtomicPartitioned() throws Exception {
testStopServerTransactionalReplicated();
}
/**
* @throws Exception If failed.
*/
@Params(servers = 4, atomicityMode = ATOMIC, cacheMode = REPLICATED)
@Test
public void testStopServerAtomicReplicated() throws Exception {
testStopServerTransactionalReplicated();
}
/**
* @throws Exception If failed.
*/
@Params(servers = 4, atomicityMode = TRANSACTIONAL, cacheMode = PARTITIONED)
@Test
public void testStopServerTransactionalPartitioned() throws Exception {
testStopServerTransactionalReplicated();
}
/**
* @throws Exception If failed.
*/
@Params(servers = 4, atomicityMode = TRANSACTIONAL, cacheMode = REPLICATED)
@Test
public void testStopServerTransactionalReplicated() throws Exception {
doTest(
asMessagePredicate(discoEvt -> discoEvt.type() == EventType.EVT_NODE_LEFT),
() -> stopGrid(srvs.remove(srvs.size() - 1).name())
);
}
/**
* @throws Exception If failed.
*/
@Params(baseline = 4, atomicityMode = ATOMIC, cacheMode = PARTITIONED)
@Test
public void testRestartBaselineAtomicPartitioned() throws Exception {
testRestartBaselineTransactionalReplicated();
}
/**
* @throws Exception If failed.
*/
@Params(baseline = 4, atomicityMode = ATOMIC, cacheMode = REPLICATED)
@Test
public void testRestartBaselineAtomicReplicated() throws Exception {
testRestartBaselineTransactionalReplicated();
}
/**
* @throws Exception If failed.
*/
@Params(baseline = 4, atomicityMode = TRANSACTIONAL, cacheMode = PARTITIONED)
@Test
public void testRestartBaselineTransactionalPartitioned() throws Exception {
testRestartBaselineTransactionalReplicated();
}
/**
* @throws Exception If failed.
*/
@Params(baseline = 4, atomicityMode = TRANSACTIONAL, cacheMode = REPLICATED)
@Test
public void testRestartBaselineTransactionalReplicated() throws Exception {
doTest(
asMessagePredicate(discoEvt -> discoEvt.type() == EventType.EVT_NODE_JOINED),
() -> {
IgniteEx node = baseline.get(baseline.size() - 1);
TestRecordingCommunicationSpi.spi(node).stopBlock();
stopGrid(node.name());
for (int i = 0; i < baselineServersCount() - 2; i++)
cntFinishedReadOperations.countDown();
startGrid(node.name());
}
);
}
/**
* @throws Exception If failed.
*/
@Params(timeout = 5000L, atomicityMode = ATOMIC, cacheMode = PARTITIONED)
@Test
public void testUpdateBaselineTopologyAtomicPartitioned() throws Exception {
testUpdateBaselineTopologyTransactionalReplicated();
}
/**
* @throws Exception If failed.
*/
@Params(timeout = 5000L, atomicityMode = ATOMIC, cacheMode = REPLICATED)
@Test
public void testUpdateBaselineTopologyAtomicReplicated() throws Exception {
testUpdateBaselineTopologyTransactionalReplicated();
}
/**
* @throws Exception If failed.
*/
@Params(timeout = 5000L, atomicityMode = TRANSACTIONAL, cacheMode = PARTITIONED)
@Test
public void testUpdateBaselineTopologyTransactionalPartitioned() throws Exception {
testUpdateBaselineTopologyTransactionalReplicated();
}
/**
* @throws Exception If failed.
*/
@Params(timeout = 5000L, atomicityMode = TRANSACTIONAL, cacheMode = REPLICATED)
@Test
public void testUpdateBaselineTopologyTransactionalReplicated() throws Exception {
doTest(
asMessagePredicate(discoEvt -> {
if (discoEvt instanceof DiscoveryCustomEvent) {
DiscoveryCustomEvent discoCustomEvt = (DiscoveryCustomEvent)discoEvt;
DiscoveryCustomMessage customMsg = discoCustomEvt.customMessage();
return customMsg instanceof ChangeGlobalStateMessage;
}
return false;
}),
() -> {
startNodesInClientMode(false);
IgniteEx ignite = startGrid(UUID.randomUUID().toString());
baseline.get(0).cluster().setBaselineTopology(baseline.get(0).context().discovery().topologyVersion());
baseline.add(ignite);
}
);
}
/**
* @throws Exception If failed.
*/
@Params(baseline = 9, atomicityMode = ATOMIC, cacheMode = PARTITIONED)
@Test
public void testStopBaselineAtomicPartitioned() throws Exception {
testStopBaselineTransactionalReplicated();
}
/**
* @throws Exception If failed.
*/
@Params(baseline = 9, atomicityMode = ATOMIC, cacheMode = REPLICATED)
@Test
public void testStopBaselineAtomicReplicated() throws Exception {
testStopBaselineTransactionalReplicated();
}
/**
* @throws Exception If failed.
*/
@Params(baseline = 9, atomicityMode = TRANSACTIONAL, cacheMode = PARTITIONED)
@Test
public void testStopBaselineTransactionalPartitioned() throws Exception {
testStopBaselineTransactionalReplicated();
}
/**
* @throws Exception If failed.
*/
@Params(baseline = 9, atomicityMode = TRANSACTIONAL, cacheMode = REPLICATED)
@Test
public void testStopBaselineTransactionalReplicated() throws Exception {
AtomicInteger cntDownCntr = new AtomicInteger(0);
doTest(
asMessagePredicate(discoEvt -> discoEvt.type() == EventType.EVT_NODE_LEFT),
() -> {
IgniteEx node = baseline.get(baseline.size() - cntDownCntr.get() - 1);
TestRecordingCommunicationSpi.spi(node).stopBlock();
cntDownCntr.incrementAndGet();
for (int i = 0; i < cntDownCntr.get(); i++)
cntFinishedReadOperations.countDown(); // This node and previously stopped nodes as well.
stopGrid(node.name());
}
);
}
/**
* @throws Exception If failed.
*/
@Params(atomicityMode = ATOMIC, cacheMode = PARTITIONED)
@Test
public void testStartClientAtomicPartitioned() throws Exception {
testStartClientTransactionalReplicated();
}
/**
* @throws Exception If failed.
*/
@Params(atomicityMode = ATOMIC, cacheMode = REPLICATED)
@Test
public void testStartClientAtomicReplicated() throws Exception {
testStartClientTransactionalReplicated();
}
/**
* @throws Exception If failed.
*/
@Params(atomicityMode = TRANSACTIONAL, cacheMode = PARTITIONED)
@Test
public void testStartClientTransactionalPartitioned() throws Exception {
testStartClientTransactionalReplicated();
}
/**
* @throws Exception If failed.
*/
@Params(atomicityMode = TRANSACTIONAL, cacheMode = REPLICATED)
@Test
public void testStartClientTransactionalReplicated() throws Exception {
doTest(
TcpDiscoveryNodeAddFinishedMessage.class,
() -> {
startNodesInClientMode(true);
customIpFinder = new TcpDiscoveryVmIpFinder(false)
.setAddresses(
Collections.singletonList("127.0.0.1:47502")
);
try {
startGrid(UUID.randomUUID().toString());
}
finally {
customIpFinder = null;
}
}
);
}
/**
* @throws Exception If failed.
*/
@Params(atomicityMode = ATOMIC, cacheMode = PARTITIONED)
@Test
public void testStopClientAtomicPartitioned() throws Exception {
testStopClientTransactionalReplicated();
}
/**
* @throws Exception If failed.
*/
@Params(atomicityMode = ATOMIC, cacheMode = REPLICATED)
@Test
public void testStopClientAtomicReplicated() throws Exception {
testStopClientTransactionalReplicated();
}
/**
* @throws Exception If failed.
*/
@Params(atomicityMode = TRANSACTIONAL, cacheMode = PARTITIONED)
@Test
public void testStopClientTransactionalPartitioned() throws Exception {
testStopClientTransactionalReplicated();
}
/**
* @throws Exception If failed.
*/
@Params(atomicityMode = TRANSACTIONAL, cacheMode = REPLICATED, timeout = 5_000L)
@Test
public void testStopClientTransactionalReplicated() throws Exception {
startNodesInClientMode(true);
customIpFinder = new TcpDiscoveryVmIpFinder(false)
.setAddresses(
Collections.singletonList("127.0.0.1:47502")
);
for (int i = 0; i < 3; i++)
clients.add(startGrid(UUID.randomUUID().toString()));
customIpFinder = null;
doTest(
TcpDiscoveryNodeLeftMessage.class,
() -> stopGrid(clients.remove(clients.size() - 1).name())
);
}
/**
* Checks that given discovery event is from "Create cache" operation.
*
* @param discoEvt Discovery event.
*/
private static boolean createCachePredicate(DiscoveryEvent discoEvt) {
if (discoEvt instanceof DiscoveryCustomEvent) {
DiscoveryCustomEvent discoCustomEvt = (DiscoveryCustomEvent)discoEvt;
DiscoveryCustomMessage customMsg = discoCustomEvt.customMessage();
if (customMsg instanceof DynamicCacheChangeBatch) {
DynamicCacheChangeBatch cacheChangeBatch = (DynamicCacheChangeBatch)customMsg;
ExchangeActions exchangeActions = U.field(cacheChangeBatch, "exchangeActions");
Collection<CacheActionData> startRequests = exchangeActions.cacheStartRequests();
return !startRequests.isEmpty();
}
}
return false;
}
/**
* Checks that given discovery event is from "Destroy cache" operation.
*
* @param discoEvt Discovery event.
*/
private static boolean destroyCachePredicate(DiscoveryEvent discoEvt) {
if (discoEvt instanceof DiscoveryCustomEvent) {
DiscoveryCustomEvent discoCustomEvt = (DiscoveryCustomEvent)discoEvt;
DiscoveryCustomMessage customMsg = discoCustomEvt.customMessage();
if (customMsg instanceof DynamicCacheChangeBatch) {
DynamicCacheChangeBatch cacheChangeBatch = (DynamicCacheChangeBatch)customMsg;
ExchangeActions exchangeActions = U.field(cacheChangeBatch, "exchangeActions");
Collection<CacheActionData> stopRequests = exchangeActions.cacheStopRequests();
return !stopRequests.isEmpty();
}
}
return false;
}
/**
* Read operation tat is going to be executed during blocking operation.
*/
@NotNull protected abstract CacheReadBackgroundOperation getReadOperation();
/**
* Checks that {@code block} closure doesn't block read operation.
* Does it for client, baseline and regular server node.
*
* @param blockMsgPred Predicate that check whether the message corresponds to the {@code block} or not.
* @param block Blocking operation.
* @throws Exception If failed.
*/
public void doTest(Predicate<Message> blockMsgPred, RunnableX block) throws Exception {
BackgroundOperation backgroundOperation = new BlockMessageOnBaselineBackgroundOperation(
block,
blockMsgPred
);
doTest(backgroundOperation);
}
/**
* Checks that {@code block} closure doesn't block read operation.
* Does it for client, baseline and regular server node.
*
* @param blockMsgCls Class of discovery message to block.
* @param block Blocking operation.
* @throws Exception If failed.
*/
public void doTest(Class<? extends TcpDiscoveryAbstractMessage> blockMsgCls, RunnableX block) throws Exception {
BlockDiscoveryMessageBackgroundOperation backgroundOperation = new BlockDiscoveryMessageBackgroundOperation(
block,
blockMsgCls
);
doTest(backgroundOperation);
}
/**
* Checks that {@code block} closure doesn't block read operation.
* Does it for client, baseline and regular server node.
*
* @param backgroundOperation Background operation.
* @throws Exception If failed.
*/
public void doTest(BackgroundOperation backgroundOperation) throws Exception {
CacheReadBackgroundOperation<?, ?> readOperation = getReadOperation();
readOperation.initCache(baseline.get(0), true);
// Warmup.
if (warmup() > 0) {
try (AutoCloseable read = readOperation.start()) {
Thread.sleep(warmup());
}
assertEquals(
readOperation.readOperationsFailed() + " read operations failed during warmup.",
0,
readOperation.readOperationsFailed()
);
assertTrue(
"No read operations were finished during warmup.",
readOperation.readOperationsFinishedUnderBlock() > 0
);
}
doTest0(clients.get(0), readOperation, backgroundOperation);
doTest0(srvs.get(0), readOperation, backgroundOperation);
doTest0(baseline.get(0), readOperation, backgroundOperation);
try (AutoCloseable read = readOperation.start()) {
Thread.sleep(500L);
}
assertEquals(
readOperation.readOperationsFailed() + " read operations failed during finish stage.",
0,
readOperation.readOperationsFailed()
);
assertTrue(
"No read operations were finished during finish stage.",
readOperation.readOperationsFinishedUnderBlock() > 0
);
}
/**
* Internal part for {@link CacheBlockOnReadAbstractTest#doTest(Predicate, RunnableX)}.
*
* @param ignite Ignite instance. Client / baseline / server node.
* @param readOperation Read operation.
* @param backgroundOperation Background operation.
*/
private void doTest0(
IgniteEx ignite,
CacheReadBackgroundOperation<?, ?> readOperation,
BackgroundOperation backgroundOperation
) throws Exception {
// Reinit internal cache state with given ignite instance.
readOperation.initCache(ignite, false);
cntFinishedReadOperations = new CountDownLatch(baseline.size() - 1);
// Read while potentially blocking operation is executing.
try (AutoCloseable block = backgroundOperation.start()) {
cntFinishedReadOperations.await(5 * timeout(), TimeUnit.MILLISECONDS);
// Possible if test itself is wrong.
assertEquals("Messages weren't blocked in time", 0, cntFinishedReadOperations.getCount());
try (AutoCloseable read = readOperation.start()) {
Thread.sleep(timeout());
}
}
finally {
cntFinishedReadOperations = null;
}
log.info("Operations finished: " + readOperation.readOperationsFinishedUnderBlock());
log.info("Longest operation took " + readOperation.maxReadDuration() + "ms");
// None of read operations should fail.
assertEquals(
readOperation.readOperationsFailed() + " read operations failed.",
0,
readOperation.readOperationsFailed()
);
assertTrue(
"No read operations were finished during timeout.",
readOperation.readOperationsFinishedUnderBlock() > 0
);
// There were no operations as long as blocking timeout.
assertNotAlmostEqual(timeout(), readOperation.maxReadDuration());
// On average every read operation was much faster then blocking timeout.
double avgDuration = (double)timeout() / readOperation.readOperationsFinishedUnderBlock();
assertTrue("Avarage duration was too long.",avgDuration < timeout() * 0.25);
}
/**
* Utility class that allows to start and stop some background operation many times.
*/
protected abstract static class BackgroundOperation {
/** */
private IgniteInternalFuture<?> fut;
/**
* Invoked strictly before background thread is started.
*/
protected void init() {
// No-op.
}
/**
* Operation itself. Will be executed in separate thread. Thread interruption has to be considered as a valid
* way to stop operation.
*/
protected abstract void execute();
/**
* @return Allowed time to wait in {@link BackgroundOperation#stop()} method before canceling background thread.
*/
protected abstract long stopTimeout();
/**
* Start separate thread and execute method {@link BackgroundOperation#execute()} in it.
*
* @return {@link AutoCloseable} that invokes {@link BackgroundOperation#stop()} on closing.
*/
AutoCloseable start() {
if (fut != null)
throw new UnsupportedOperationException("Only one simultanious operation is allowed");
init();
CountDownLatch threadStarted = new CountDownLatch(1);
fut = GridTestUtils.runAsync(() -> {
try {
threadStarted.countDown();
execute();
}
catch (Exception e) {
throw new IgniteException("Unexpected exception in background operation thread", e);
}
});
try {
threadStarted.await();
}
catch (InterruptedException e) {
try {
fut.cancel();
}
catch (IgniteCheckedException e1) {
e.addSuppressed(e1);
}
throw new IgniteException(e);
}
return this::stop;
}
/**
* Interrupt the operation started in {@link BackgroundOperation#start()} method and join interrupted thread.
*/
void stop() throws Exception {
if (fut == null)
return;
try {
fut.get(stopTimeout());
}
catch (IgniteFutureTimeoutCheckedException e) {
fut.cancel();
fut.get();
}
finally {
fut = null;
}
}
}
/**
* @param discoEvtPred Predicate that tests discovery events.
* @return New predicate that test any message based on {@code discoEvtPred} predicate.
*/
public static Predicate<Message> asMessagePredicate(Predicate<DiscoveryEvent> discoEvtPred) {
return msg -> {
if (msg instanceof GridDhtPartitionsFullMessage) {
GridDhtPartitionsFullMessage fullMsg = (GridDhtPartitionsFullMessage)msg;
GridDhtPartitionExchangeId exchangeId = fullMsg.exchangeId();
if (exchangeId != null)
return discoEvtPred.test(U.field(exchangeId, "discoEvt"));
}
return false;
};
}
/**
* Background operation that executes some node request and doesn't allow its exchange messages to be fully
* processed until operation is stopped.
*/
protected class BlockMessageOnBaselineBackgroundOperation extends BackgroundOperation {
/** */
private final RunnableX block;
/** */
private final Predicate<Message> blockMsg;
/**
* @param block Blocking operation.
* @param blockMsgPred Predicate that checks whether to block message or not.
*
* @see BlockMessageOnBaselineBackgroundOperation#blockMessage(ClusterNode, Message)
*/
protected BlockMessageOnBaselineBackgroundOperation(
RunnableX block,
Predicate<Message> blockMsgPred
) {
this.block = block;
blockMsg = blockMsgPred;
}
/** {@inheritDoc} */
@Override protected void execute() {
for (IgniteEx server : baseline) {
TestRecordingCommunicationSpi spi = TestRecordingCommunicationSpi.spi(server);
spi.blockMessages(this::blockMessage);
}
block.run();
}
/**
* Function to pass into {@link TestRecordingCommunicationSpi#blockMessages(IgniteBiPredicate)}.
*
* @param node Node that receives message.
* @param msg Message.
* @return Whether the given message should be blocked or not.
*/
private boolean blockMessage(ClusterNode node, Message msg) {
boolean block = blockMsg.test(msg)
&& baseline.stream().map(IgniteEx::name).anyMatch(node.consistentId()::equals);
if (block)
cntFinishedReadOperations.countDown();
return block;
}
/** {@inheritDoc} */
@Override protected long stopTimeout() {
// Should be big enough so thread will stop by it's own. Otherwise test will fail, but that's fine.
return 30_000L;
}
/** {@inheritDoc} */
@Override void stop() throws Exception {
for (IgniteEx server : baseline) {
TestRecordingCommunicationSpi spi = TestRecordingCommunicationSpi.spi(server);
spi.stopBlock();
}
super.stop();
}
}
/**
* Background operation that executes some node request and doesn't allow its discovery messages to be fully
* processed until operation is stopped.
*/
protected class BlockDiscoveryMessageBackgroundOperation extends BackgroundOperation {
/** */
private final RunnableX block;
/** */
private final Class<? extends TcpDiscoveryAbstractMessage> blockMsgCls;
/** */
private volatile CountDownLatch blockLatch;
/**
* @param block Blocking operation.
* @param blockMsgCls Class of message to block.
*
* @see BlockMessageOnBaselineBackgroundOperation#blockMessage(ClusterNode, Message)
*/
protected BlockDiscoveryMessageBackgroundOperation(
RunnableX block,
Class<? extends TcpDiscoveryAbstractMessage> blockMsgCls
) {
this.block = block;
this.blockMsgCls = blockMsgCls;
}
/** {@inheritDoc} */
@Override protected void execute() {
try {
blockLatch = new CountDownLatch(1);
discoveryMsgProcessor = this::processMessage;
for (int i = 0; i < baselineServersCount() - 2; i++)
cntFinishedReadOperations.countDown();
block.run();
}
finally {
discoveryMsgProcessor = null;
}
}
/**
* Process discovery spi message.
*
* @param msg Message.
* @param receiverConsistentId Consistent ID of message receiver.
*/
private void processMessage(TcpDiscoveryAbstractMessage msg, String receiverConsistentId) {
if (!blockMsgCls.isInstance(msg))
return;
boolean baselineSnd = Objects.equals(
baseline.get(1).localNode().consistentId(),
receiverConsistentId
);
if (baselineSnd) {
cntFinishedReadOperations.countDown();
try {
blockLatch.await();
}
catch (InterruptedException ignore) {
}
}
}
/** {@inheritDoc} */
@Override protected long stopTimeout() {
// Should be big enough so thread will stop by it's own. Otherwise test will fail, but that's fine.
return 30_000L;
}
/** {@inheritDoc} */
@Override void stop() throws Exception {
blockLatch.countDown();
super.stop();
}
}
/**
* {@link BackgroundOperation} implementation for cache reading operations.
*/
protected abstract class ReadBackgroundOperation extends BackgroundOperation {
/** Counter for successfully finished operations. */
private final AtomicInteger readOperationsFinishedUnderBlock = new AtomicInteger();
/** Counter for failed operations. */
private final AtomicInteger readOperationsFailed = new AtomicInteger();
/** Duration of the longest read operation. */
private final AtomicLong maxReadDuration = new AtomicLong(-1);
/**
* Do single iteration of reading operation. Will be executed in a loop.
*/
protected abstract void doRead() throws Exception;
/** {@inheritDoc} */
@Override protected void init() {
readOperationsFinishedUnderBlock.set(0);
readOperationsFailed.set(0);
maxReadDuration.set(-1);
}
/** {@inheritDoc} */
@Override protected void execute() {
Set<String> loggedMessages = new HashSet<>();
while (!Thread.currentThread().isInterrupted()) {
long prevTs = System.currentTimeMillis();
try {
doRead();
readOperationsFinishedUnderBlock.incrementAndGet();
}
catch (Exception e) {
boolean threadInterrupted = X.hasCause(e,
InterruptedException.class,
IgniteInterruptedException.class,
IgniteInterruptedCheckedException.class
);
if (threadInterrupted)
Thread.currentThread().interrupt();
else if (allowException() && X.hasCause(e, ClusterTopologyCheckedException.class))
readOperationsFinishedUnderBlock.incrementAndGet();
else {
readOperationsFailed.incrementAndGet();
if (loggedMessages.add(e.getMessage()))
log.error("Error during read operation execution", e);
continue;
}
}
maxReadDuration.set(Math.max(maxReadDuration.get(), System.currentTimeMillis() - prevTs));
}
}
/** {@inheritDoc} */
@Override protected long stopTimeout() {
return 0;
}
/**
* @return Number of successfully finished operations.
*/
public int readOperationsFinishedUnderBlock() {
return readOperationsFinishedUnderBlock.get();
}
/**
* @return Number of failed operations.
*/
public int readOperationsFailed() {
return readOperationsFailed.get();
}
/**
* @return Duration of the longest read operation.
*/
public long maxReadDuration() {
return maxReadDuration.get();
}
}
/**
*
*/
protected abstract class CacheReadBackgroundOperation<KeyType, ValueType> extends ReadBackgroundOperation {
/**
* {@link CacheReadBackgroundOperation#cache()} method backing field. Updated on each
* {@link CacheReadBackgroundOperation#initCache(IgniteEx, boolean)} invocation.
*/
private IgniteCache<KeyType, ValueType> cache;
/**
* Reinit internal cache using passed ignite instance and fill it with data if required.
*
* @param ignite Node to get or create cache from.
* @param fillData Whether the cache should be filled with new data or not.
*/
public void initCache(IgniteEx ignite, boolean fillData) {
cache = ignite.getOrCreateCache(
createCacheConfiguration()
.setAtomicityMode(atomicityMode())
.setCacheMode(cacheMode())
);
if (fillData) {
try (IgniteDataStreamer<KeyType, ValueType> dataStreamer = ignite.dataStreamer(cache.getName())) {
dataStreamer.allowOverwrite(true);
for (int i = 0; i < entriesCount(); i++)
dataStreamer.addData(createKey(i), createValue(i));
}
}
}
/**
* @return Cache configuration.
*/
protected CacheConfiguration<KeyType, ValueType> createCacheConfiguration() {
return new CacheConfiguration<KeyType, ValueType>(DEFAULT_CACHE_NAME)
.setBackups(backupsCount())
.setAffinity(
new RendezvousAffinityFunction()
.setPartitions(32)
);
}
/**
* @return Current cache.
*/
protected final IgniteCache<KeyType, ValueType> cache() {
return cache;
}
/**
* @return Count of cache entries to create in {@link CacheReadBackgroundOperation#initCache(IgniteEx, boolean)}
* method.
*/
protected int entriesCount() {
return DFLT_CACHE_ENTRIES_CNT;
}
/**
* @param idx Unique number.
* @return Key to be used for inserting into cache.
* @see CacheReadBackgroundOperation#createValue(int)
*/
protected abstract KeyType createKey(int idx);
/**
* @param idx Unique number.
* @return Value to be used for inserting into cache.
* @see CacheReadBackgroundOperation#createKey(int)
*/
protected abstract ValueType createValue(int idx);
}
/**
* {@link CacheReadBackgroundOperation} implementation for (int -> int) cache. Keys and values are equal by default.
*/
protected abstract class IntCacheReadBackgroundOperation
extends CacheReadBackgroundOperation<Integer, Integer> {
/** {@inheritDoc} */
@Override protected Integer createKey(int idx) {
return idx;
}
/** {@inheritDoc} */
@Override protected Integer createValue(int idx) {
return idx;
}
}
/**
* @return {@link Params} annotation object from the current test method.
*/
protected Params currentTestParams() {
Params params = currentTestAnnotation(Params.class);
assertNotNull("Test " + getName() + " is not annotated with @Param annotation.", params);
return params;
}
/**
* Assert that two numbers are close to each other.
*/
private static void assertAlmostEqual(long exp, long actual) {
assertTrue(String.format("Numbers differ too much [exp=%d, actual=%d]", exp, actual), almostEqual(exp, actual));
}
/**
* Assert that two numbers are not close to each other.
*/
private static void assertNotAlmostEqual(long exp, long actual) {
assertFalse(String.format("Numbers are almost equal [exp=%d, actual=%d]", exp, actual), almostEqual(exp, actual));
}
/**
* Check that two numbers are close to each other.
*/
private static boolean almostEqual(long exp, long actual) {
double rel = (double)(actual - exp) / exp;
return Math.abs(rel) < 0.05;
}
}