IGNITE-11650 Fixed flaky TcpCommunicationSpiFreezingClientTest test (#9975)
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteDevOnlyLogTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteDevOnlyLogTest.java
index 0dd792f..b939771 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteDevOnlyLogTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteDevOnlyLogTest.java
@@ -62,9 +62,9 @@
public void testDevOnlyQuietMessage() throws Exception {
additionalArgs = Collections.singletonList("-D" + IgniteSystemProperties.IGNITE_QUIET + "=true");
- log = new GridStringLogger(false, grid(0).log());
+ GridStringLogger log = new GridStringLogger(false, grid(0).log());
- Ignite ignite = startGrid(1);
+ Ignite ignite = startGrid(optimize(getConfiguration(getTestIgniteInstanceName(1)).setGridLogger(log)));
String msg = getMessage(ignite);
@@ -78,9 +78,9 @@
public void testDevOnlyVerboseMessage() throws Exception {
additionalArgs = Collections.singletonList("-D" + IgniteSystemProperties.IGNITE_QUIET + "=false");
- log = new GridStringLogger(false, grid(0).log());
+ GridStringLogger log = new GridStringLogger(false, grid(0).log());
- Ignite ignite = startGrid(1);
+ Ignite ignite = startGrid(optimize(getConfiguration(getTestIgniteInstanceName(1)).setGridLogger(log)));
String msg = getMessage(ignite);
@@ -99,9 +99,9 @@
additionalArgs = Collections.singletonList("-D" +
IgniteSystemProperties.IGNITE_DEV_ONLY_LOGGING_DISABLED + "=true");
- log = new GridStringLogger(false, grid(0).log());
+ GridStringLogger log = new GridStringLogger(false, grid(0).log());
- Ignite ignite = startGrid(1);
+ Ignite ignite = startGrid(optimize(getConfiguration(getTestIgniteInstanceName(1)).setGridLogger(log)));
String msg = getMessage(ignite);
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiFreezingClientTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiFreezingClientTest.java
index 1e32a0a..c81a701 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiFreezingClientTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiFreezingClientTest.java
@@ -17,170 +17,152 @@
package org.apache.ignite.spi.communication.tcp;
-import java.lang.management.ManagementFactory;
-import java.util.Iterator;
-import javax.cache.Cache;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.ignite.Ignite;
-import org.apache.ignite.IgniteDataStreamer;
-import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.IgniteSystemProperties;
-import org.apache.ignite.cache.query.ScanQuery;
+import org.apache.ignite.IgniteCompute;
+import org.apache.ignite.Ignition;
import org.apache.ignite.cluster.ClusterTopologyException;
-import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.internal.IgniteEx;
-import org.apache.ignite.lang.IgniteCallable;
-import org.apache.ignite.resources.IgniteInstanceResource;
-import org.apache.ignite.resources.LoggerResource;
-import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.util.nio.GridCommunicationClient;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.ListeningTestLogger;
+import org.apache.ignite.testframework.junits.GridAbstractTest;
+import org.apache.ignite.testframework.junits.WithSystemProperty;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;
-import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
-import static org.apache.ignite.cache.CacheMode.PARTITIONED;
-import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_ENABLE_FORCIBLE_NODE_KILL;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
/**
* Tests that freezing due to JVM STW client will be failed if connection can't be established.
*/
+@WithSystemProperty(key = IGNITE_ENABLE_FORCIBLE_NODE_KILL, value = "true")
public class TcpCommunicationSpiFreezingClientTest extends GridCommonAbstractTest {
- /** */
- private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+ /** Message to catch GC start on a client. */
+ private static final String GC_START_MSG = "Try to start GC.";
+
+ /** Last GC start time. */
+ private final AtomicLong lastGC = new AtomicLong(Long.MAX_VALUE);
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(gridName);
- cfg.setFailureDetectionTimeout(120000);
- cfg.setClientFailureDetectionTimeout(120000);
+ cfg.setFailureDetectionTimeout(getTestTimeout());
+ cfg.setClientFailureDetectionTimeout(getTestTimeout());
TcpCommunicationSpi spi = new TcpCommunicationSpi();
- spi.setConnectTimeout(3000);
- spi.setMaxConnectTimeout(6000);
- spi.setReconnectCount(3);
+ spi.setConnectTimeout(1000);
+ spi.setMaxConnectTimeout(1000);
spi.setIdleConnectionTimeout(100);
spi.setSharedMemoryPort(-1);
- TcpDiscoverySpi discoSpi = (TcpDiscoverySpi)cfg.getDiscoverySpi();
-
- discoSpi.setIpFinder(IP_FINDER);
-
cfg.setCommunicationSpi(spi);
- cfg.setDiscoverySpi(discoSpi);
- cfg.setCacheConfiguration(new CacheConfiguration(DEFAULT_CACHE_NAME).setWriteSynchronizationMode(FULL_SYNC).
- setCacheMode(PARTITIONED).setAtomicityMode(ATOMIC));
+ ListeningTestLogger log = new ListeningTestLogger(GridAbstractTest.log);
+
+ log.registerListener((s) -> {
+ if (s.contains(GC_START_MSG))
+ lastGC.set(System.currentTimeMillis());
+ });
+
+ cfg.setGridLogger(log);
return cfg;
}
/** {@inheritDoc} */
- @Override protected void beforeTestsStarted() throws Exception {
- super.beforeTestsStarted();
-
- System.setProperty(IgniteSystemProperties.IGNITE_ENABLE_FORCIBLE_NODE_KILL, "true");
- }
-
- /** {@inheritDoc} */
- @Override protected void afterTestsStopped() throws Exception {
- super.afterTestsStopped();
-
- System.clearProperty(IgniteSystemProperties.IGNITE_ENABLE_FORCIBLE_NODE_KILL);
- }
-
- /** {@inheritDoc} */
@Override protected boolean isMultiJvm() {
return true;
}
- /**
- * @throws Exception If failed.
- */
+ /** @throws Exception If failed. */
@Test
public void testFreezingClient() throws Exception {
- try {
- final IgniteEx srv = startGrids(2);
+ Ignite srv = startGrid(0);
+ Ignite client = startClientGrid("client");
+ IgniteCompute compute = srv.compute(srv.cluster().forNode(client.cluster().localNode())).withNoFailover();
- final IgniteEx client = startClientGrid(3);
+ // Close communication connections by idle and trigger STW on the client.
+ compute.runAsync(() -> {
+ waitConnectionsClosed(Ignition.localIgnite());
- final int keysCnt = 100_000;
+ triggerSTW();
+ });
- try (IgniteDataStreamer<Integer, byte[]> streamer = srv.dataStreamer(DEFAULT_CACHE_NAME)) {
- for (int i = 0; i < keysCnt; i++)
- streamer.addData(i, new byte[512]);
+ while (!Thread.interrupted()) {
+ // Make sure connections closed on the server.
+ waitConnectionsClosed(srv);
+
+ // Make sure that the client is freezed by STW.
+ assertTrue(waitForCondition(() -> System.currentTimeMillis() - lastGC.get() > 1000, getTestTimeout()));
+
+ // Open new connection to the freezed client. Retry if client has completed GC and was not freezed.
+ try {
+ compute.run(() -> {});
}
-
- // Wait for connections go idle.
- doSleep(1000);
-
- srv.compute(srv.cluster().forNode(client.localNode())).withNoFailover().call(new ClientClosure());
-
- fail("Client node must be kicked from topology");
+ catch (ClusterTopologyException ignored) {
+ break;
+ }
}
- catch (ClusterTopologyException e) {
- // Expected.
- e.printStackTrace();
+ assertEquals(1, srv.cluster().nodes().size());
+ }
- System.out.println(e);
- }
- finally {
- stopAllGrids();
+ /** Triggers STW. */
+ private void triggerSTW() {
+ long end = System.currentTimeMillis() + getTestTimeout();
+
+ while (!Thread.interrupted() && (System.currentTimeMillis() < end)) {
+ IgniteInternalFuture<?> fut = GridTestUtils.runAsync(this::simulateLoad);
+
+ while (!fut.isDone()) {
+ System.out.println(GC_START_MSG);
+
+ GridTestUtils.runGC();
+ }
}
}
- /** */
- public static class ClientClosure implements IgniteCallable<Integer> {
- /** */
- private static final long serialVersionUID = 0L;
+ /** Simulate load without safepoints to block GC. */
+ public double simulateLoad() {
+ double d = 0;
- /** */
- @IgniteInstanceResource
- private transient Ignite ignite;
+ for (int i = 0; i < Integer.MAX_VALUE; i++)
+ d += Math.log(Math.PI * i);
- /** */
- @LoggerResource
- private IgniteLogger log;
+ return d;
+ }
- /** {@inheritDoc} */
- @Override public Integer call() throws Exception {
- Thread loadThread = new Thread(() -> log.info("result = " + simulateLoad()));
+ /** Waits for all communication connections closed by idle. */
+ private void waitConnectionsClosed(Ignite node) {
+ TcpCommunicationSpi spi = (TcpCommunicationSpi)node.configuration().getCommunicationSpi();
+ Map<UUID, GridCommunicationClient[]> clientsMap = GridTestUtils.getFieldValue(spi, "clientPool", "clients");
- loadThread.setName("load-thread");
- loadThread.start();
+ try {
+ assertTrue(waitForCondition(() -> {
+ for (GridCommunicationClient[] clients : clientsMap.values()) {
+ if (clients == null)
+ continue;
- int cnt = 0;
+ for (GridCommunicationClient client : clients) {
+ if (client != null)
+ return false;
+ }
+ }
- final Iterator<Cache.Entry<Integer, byte[]>> it = ignite.cache(DEFAULT_CACHE_NAME).
- query(new ScanQuery<Integer, byte[]>().setPageSize(100000)).iterator();
-
- while (it.hasNext()) {
- Cache.Entry<Integer, byte[]> entry = it.next();
-
- // Trigger STW.
- final long[] tids = ManagementFactory.getThreadMXBean().findDeadlockedThreads();
-
- cnt++;
- }
-
- loadThread.join();
-
- return cnt;
+ return true;
+ }, getTestTimeout()));
}
-
- /**
- *
- */
- public static double simulateLoad() {
- double d = 0;
-
- for (int i = 0; i < 1000000000; i++)
- d += Math.log(Math.PI * i);
-
- return d;
+ catch (IgniteInterruptedCheckedException e) {
+ throw U.convertException(e);
}
}
}
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
index 615b9ad..b895ab6 100755
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
@@ -1409,7 +1409,7 @@
}
}
- return new IgniteProcessProxy(cfg, log, (x) -> grid(0), resetDiscovery, additionalRemoteJvmArgs());
+ return new IgniteProcessProxy(cfg, cfg.getGridLogger(), (x) -> grid(0), resetDiscovery, additionalRemoteJvmArgs());
}
/**