blob: 540f34334b49b86e9cc23e52e2033493ecd9db3e [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.ignite.internal.processors.cache;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.TestRecordingCommunicationSpi;
import org.apache.ignite.internal.managers.communication.GridIoMessage;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.spi.IgniteSpiException;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.WithSystemProperty;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Assert;
import org.junit.Test;
* Advanced coordinator failure scenarios during PME.
public class PartitionsExchangeCoordinatorFailoverTest extends GridCommonAbstractTest {
/** Name of the cache used by the test scenarios. */
private static final String CACHE_NAME = "cache";
/** Coordinator node name (the node every test starts first). */
private static final String CRD_NONE = "crd";
/**
 * Factory of communication SPI instances. Tests reassign it before starting nodes;
 * presumably it is applied to the node configuration in getConfiguration (TODO confirm).
 */
private volatile Supplier<TcpCommunicationSpi> spiFactory = TcpCommunicationSpi::new;
/** If {@code true}, an extra cache configuration is prepared for the node named {@link #CRD_NONE}. */
private boolean newCaches = true;
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
// Start from the configuration prepared by the base test class.
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
// Configuration of the main test cache with a rendezvous affinity function
// (arguments false and 32 - presumably "exclude neighbors" off and 32 partitions; confirm against the Ignite API).
new CacheConfiguration(CACHE_NAME)
.setAffinity(new RendezvousAffinityFunction(false, 32))
// Add cache that exists only on coordinator node.
if (newCaches && igniteInstanceName.equals(CRD_NONE)) {
// The filter accepts only the node whose consistent id equals the name of the instance being configured.
IgnitePredicate<ClusterNode> nodeFilter = node -> node.consistentId().equals(igniteInstanceName);
// The extra cache is named after the main cache with a "0" suffix.
new CacheConfiguration(CACHE_NAME + 0)
.setAffinity(new RendezvousAffinityFunction(false, 32))
return cfg;
/** {@inheritDoc} */
@Override protected void beforeTest() throws Exception {
// NOTE(review): per-test setup hook; its statements are not visible here - confirm what is reset before each test.
/** {@inheritDoc} */
@Override protected long getTestTimeout() {
// 60 * 1000 - presumably milliseconds, i.e. a one minute limit per test; confirm against the base class contract.
return 60 * 1000L;
* Tests that a new coordinator is able to finish old exchanges in case of incomplete coordinator initialization.
public void testNewCoordinatorCompletedExchange() throws Exception {
// Scenario: two nodes are running, two more join while full messages from the coordinator to them are held back,
// one more node starts joining, and then the coordinator is stopped.
// Switch to the recording SPI so that messages sent by the coordinator can be intercepted.
spiFactory = TestRecordingCommunicationSpi::new;
// The coordinator is started first; the second node is referred to as the new coordinator below.
IgniteEx crd = startGrid(CRD_NONE);
IgniteEx newCrd = startGrid(1);
// Topology version of the third node join.
AffinityTopologyVersion joinThirdNodeVer = new AffinityTopologyVersion(3, 0);
// Topology version of the fourth node join.
AffinityTopologyVersion joinFourNodeVer = new AffinityTopologyVersion(4, 0);
// Block FullMessage for newly joined nodes.
TestRecordingCommunicationSpi spi = TestRecordingCommunicationSpi.spi(crd);
// NOTE(review): the latch is presumably awaited inside the blocking predicate and released later;
// neither call is visible here - confirm.
final CountDownLatch sndFullMsgLatch = new CountDownLatch(1);
// Delay sending full message to newly joined nodes.
spi.blockMessages((node, msg) -> {
// Only full messages addressed to nodes with order greater than 2 are affected
// (presumably every node that joined after the two initial ones).
if (msg instanceof GridDhtPartitionsFullMessage && node.order() > 2) {
try {
// Any failure inside the try block is deliberately ignored.
catch (Throwable ignored) { }
return true;
return false;
// Start two more nodes in the background.
IgniteInternalFuture joinTwoNodesFut = GridTestUtils.runAsync(() -> startGridsMultiThreaded(2, 2));
GridCachePartitionExchangeManager exchangeMgr = newCrd.context().cache().context().exchange();
// Wait till new coordinator finishes third node join exchange.
() -> exchangeMgr.readyAffinityVersion().compareTo(joinThirdNodeVer) >= 0,
// Start one more node in the background.
IgniteInternalFuture startLastNodeFut = GridTestUtils.runAsync(() -> startGrid(5));
// Wait till new coordinator starts fourth node join exchange.
() -> exchangeMgr.lastTopologyFuture().initialVersion().compareTo(joinFourNodeVer) >= 0,
// Stop the coordinator in the background.
IgniteInternalFuture stopCrdFut = GridTestUtils.runAsync(() -> stopGrid(CRD_NONE, true, false));
// Magic sleep to make sure that coordinator stop process has started.
// Resume full messages sending to unblock coordinator stopping process.
// Coordinator stop should succeed.
// Nodes join should succeed.
// Check that all caches are operable.
for (Ignite grid : G.allGrids()) {
IgniteCache cache = grid.cache(CACHE_NAME);
// A single put per node is used as the operability probe.
cache.put(0, 0);
* Checks that delayed full messages are processed correctly when the coordinator changes.
* @throws Exception If failed.
public void testDelayedFullMessageReplacedIfCoordinatorChanged() throws Exception {
// Switch to the recording SPI; blockSendingFullMessage obtains it through TestRecordingCommunicationSpi.spi(...).
spiFactory = TestRecordingCommunicationSpi::new;
// Three initial nodes: the coordinator, the node referred to as the new coordinator,
// and the node whose full messages are going to be blocked.
IgniteEx crd = startGrid(CRD_NONE);
IgniteEx newCrd = startGrid(1);
IgniteEx problemNode = startGrid(2);
// Hold back full messages sent by the current coordinator to the problem node.
blockSendingFullMessage(crd, node -> node.equals(problemNode.localNode()));
// Join one more node in the background.
IgniteInternalFuture joinNextNodeFut = GridTestUtils.runAsync(() -> startGrid(3));
// Hold back full messages sent by the next coordinator to the problem node as well.
blockSendingFullMessage(newCrd, node -> node.equals(problemNode.localNode()));
// Stop the old coordinator in the background.
IgniteInternalFuture stopCrdFut = GridTestUtils.runAsync(() -> stopGrid(CRD_NONE));
// SPI of the new coordinator. NOTE(review): the statements that use it are not visible here.
TestRecordingCommunicationSpi spi = TestRecordingCommunicationSpi.spi(newCrd);
* Tests that the exchange coordinator is initialized correctly in case of exchanges merge and caches without affinity nodes.
* @throws Exception If failed.
public void testCoordinatorChangeAfterExchangesMerge() throws Exception {
// Delay demand messages sending to suspend late affinity assignment.
spiFactory = () -> new DynamicDelayingCommunicationSpi(msg -> {
// The function returns the delay (in milliseconds) to apply to the given message; 0 means no delay.
final int delay = 5_000;
if (msg instanceof GridDhtPartitionDemandMessage) {
GridDhtPartitionDemandMessage demandMsg = (GridDhtPartitionDemandMessage) msg;
// Demand messages of the utility cache group are not delayed.
if (demandMsg.groupId() == GridCacheUtils.cacheId(GridCacheUtils.UTILITY_CACHE_NAME))
return 0;
return delay;
return 0;
final IgniteEx crd = startGrid(CRD_NONE);
// Load keys 0..1023 with values equal to the keys; they are verified at the end of the test.
for (int k = 0; k < 1024; k++)
crd.cache(CACHE_NAME).put(k, k);
// Delay sending single messages to ensure exchanges are merged.
spiFactory = () -> new DynamicDelayingCommunicationSpi(msg -> {
final int delay = 1_000;
if (msg instanceof GridDhtPartitionsSingleMessage) {
GridDhtPartitionsSingleMessage singleMsg = (GridDhtPartitionsSingleMessage) msg;
// Only single messages that carry an exchange id are delayed.
if (singleMsg.exchangeId() != null)
return delay;
return 0;
// This should trigger exchanges merge.
startGridsMultiThreaded(2, 2);
// Delay sending single message from new node to have time to shutdown coordinator.
spiFactory = () -> new DynamicDelayingCommunicationSpi(msg -> {
final int delay = 5_000;
if (msg instanceof GridDhtPartitionsSingleMessage) {
GridDhtPartitionsSingleMessage singleMsg = (GridDhtPartitionsSingleMessage) msg;
// Only single messages that carry an exchange id are delayed.
if (singleMsg.exchangeId() != null)
return delay;
return 0;
// Trigger next exchange.
IgniteInternalFuture startNodeFut = GridTestUtils.runAsync(() -> startGrid(4));
// Wait till other nodes will send their messages to coordinator.
// And then stop coordinator node.
stopGrid(CRD_NONE, true);
// Check that all caches are operable.
for (Ignite grid : G.allGrids()) {
IgniteCache cache = grid.cache(CACHE_NAME);
// Every key loaded before the coordinator change must still map to itself.
for (int k = 0; k < 1024; k++)
Assert.assertEquals(k, cache.get(k));
// Writes must work as well.
for (int k = 0; k < 1024; k++)
cache.put(k, k);
* Checks that changing the coordinator to a node that is still joining the cluster works correctly
* in case of exchanges merge and a completed exchange on other joining nodes.
@WithSystemProperty(key = IgniteSystemProperties.IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK, value = "true")
public void testChangeCoordinatorToLocallyJoiningNode() throws Exception {
// No coordinator-only cache in this scenario.
newCaches = false;
// Switch to the recording SPI; blockSendingFullMessage obtains it through TestRecordingCommunicationSpi.spi(...).
spiFactory = TestRecordingCommunicationSpi::new;
IgniteEx crd = startGrid(CRD_NONE);
// Index of the node that is expected to become the coordinator while it is still joining.
final int newCrdNodeIdx = 1;
// A full message shouldn't be sent to new coordinator.
blockSendingFullMessage(crd, node -> node.consistentId().equals(getTestIgniteInstanceName(newCrdNodeIdx)));
// NOTE(review): presumably counted down when the joining node sends its single message and awaited below;
// neither call is visible here - confirm.
CountDownLatch joiningNodeSentSingleMsg = new CountDownLatch(1);
// For next joining node delay sending single message to emulate exchanges merge.
spiFactory = () -> new DynamicDelayingCommunicationSpi(msg -> {
// The function returns the delay (in milliseconds) to apply to the given message; 0 means no delay.
final int delay = 5_000;
if (msg instanceof GridDhtPartitionsSingleMessage) {
GridDhtPartitionsSingleMessage singleMsg = (GridDhtPartitionsSingleMessage) msg;
// Only single messages that carry an exchange id are delayed.
if (singleMsg.exchangeId() != null) {
return delay;
return 0;
// Start the future coordinator in the background.
IgniteInternalFuture<?> newCrdJoinFut = GridTestUtils.runAsync(() -> startGrid(newCrdNodeIdx));
// Wait till new coordinator node sent single message.
// Go back to the plain TCP SPI for the nodes started next.
spiFactory = TcpCommunicationSpi::new;
// Additionally start 2 new nodes. Their exchange should be merged with exchange on join new coordinator node.
startGridsMultiThreaded(2, 2);
Assert.assertFalse("New coordinator join shouldn't be happened before stopping old coordinator.",
// Stop coordinator.
// New coordinator join process should succeed after that.
// Check that affinity are equal on all nodes.
AffinityTopologyVersion affVer = ((IgniteEx) ignite(1)).cachex(CACHE_NAME)
// Reference assignment and the node it was taken from (used in the assertion message).
List<List<ClusterNode>> expAssignment = null;
IgniteEx expAssignmentNode = null;
for (Ignite node : G.allGrids()) {
IgniteEx nodeEx = (IgniteEx) node;
List<List<ClusterNode>> assignment = nodeEx.cachex(CACHE_NAME).context().affinity().assignments(affVer);
// The first node visited provides the reference assignment.
if (expAssignment == null) {
expAssignment = assignment;
expAssignmentNode = nodeEx;
// Assignments must match the reference one.
Assert.assertEquals("Affinity assignments are different " +
"[expectedNode=" + expAssignmentNode + ", actualNode=" + nodeEx + "]", expAssignment, assignment);
* Checks that changing the coordinator to a node that is still joining the cluster works correctly
* in case of a completed exchange on client nodes.
@WithSystemProperty(key = IgniteSystemProperties.IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK, value = "true")
public void testChangeCoordinatorToLocallyJoiningNode2() throws Exception {
// No coordinator-only cache in this scenario.
newCaches = false;
// Switch to the recording SPI; blockSendingFullMessage obtains it through TestRecordingCommunicationSpi.spi(...).
spiFactory = TestRecordingCommunicationSpi::new;
IgniteEx crd = startGrid(CRD_NONE);
// Start several clients.
IgniteEx clientNode = (IgniteEx)startClientGridsMultiThreaded(2, 2);
// Index of the node that is expected to become the coordinator while it is still joining.
final int newCrdNodeIdx = 1;
// A full message shouldn't be sent to new coordinator.
blockSendingFullMessage(crd, node -> node.consistentId().equals(getTestIgniteInstanceName(newCrdNodeIdx)));
// Start the future coordinator in the background.
IgniteInternalFuture<?> newCrdJoinFut = GridTestUtils.runAsync(() -> startGrid(newCrdNodeIdx));
// Wait till client node will receive full message and finish exchange on node join.
GridTestUtils.waitForCondition(() -> {
GridDhtPartitionsExchangeFuture fut = clientNode.cachex(CACHE_NAME)
// Version (4, 0) - presumably the join of the new node after the coordinator and the two clients; confirm.
return fut != null && fut.topologyVersion().equals(new AffinityTopologyVersion(4, 0));
}, 60_000
Assert.assertFalse("New coordinator join shouldn't be happened before stopping old coordinator.",
// Stop coordinator.
// New coordinator join process should succeed after that.
* Blocks sending full message from coordinator to non-coordinator node.
* @param from Coordinator node.
* @param pred Non-coordinator node predicate.
* If the predicate returns {@code true}, a full message will not be sent to that node.
private void blockSendingFullMessage(IgniteEx from, Predicate<ClusterNode> pred) {
// Recording SPI of the sending node; the blocking predicate is installed on it.
TestRecordingCommunicationSpi spi = TestRecordingCommunicationSpi.spi(from);
// Block full messages that carry an exchange id and are addressed to a node accepted by the predicate.
spi.blockMessages((node, msg) -> {
if (msg instanceof GridDhtPartitionsFullMessage) {
GridDhtPartitionsFullMessage fullMsg = (GridDhtPartitionsFullMessage) msg;
// Full messages without an exchange id are let through.
if (fullMsg.exchangeId() != null && pred.test(node)) {
log.warning("Blocked sending " + msg + " to " + node);
return true;
// Everything else is sent as usual.
return false;
* Communication SPI that can delay sending a message; the delay is computed per message by a function.
static class DynamicDelayingCommunicationSpi extends TcpCommunicationSpi {
/** Function that returns delay in milliseconds for given message. */
private final Function<Message, Integer> delayMsgFunc;
/** Creates an SPI whose delay function returns 0 for every message. */
DynamicDelayingCommunicationSpi() {
this(msg -> 0);
* @param delayMsgFunc Function to calculate delay for message.
DynamicDelayingCommunicationSpi(final Function<Message, Integer> delayMsgFunc) {
this.delayMsgFunc = delayMsgFunc;
/** {@inheritDoc} */
@Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackC)
throws IgniteSpiException {
try {
// The message is cast to GridIoMessage unconditionally; the delay function is applied to the message it wraps.
GridIoMessage ioMsg = (GridIoMessage)msg;
int delay = delayMsgFunc.apply(ioMsg.message());
// A positive delay is logged. NOTE(review): the pause itself presumably follows the log call;
// that statement is not visible here - confirm.
if (delay > 0) {
log.warning(String.format("Delay sending %s to %s", msg, node));
// An interruption is rethrown as an SPI exception that keeps the original as its cause.
catch (IgniteInterruptedCheckedException e) {
throw new IgniteSpiException(e);
// Hand the message over to the regular TCP implementation.
super.sendMessage(node, msg, ackC);