blob: 6a3c02140969ac87435a9d0d5246d6bda0dfd636 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.TestRecordingCommunicationSpi;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.testframework.GridTestUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
/**
*
*/
public class ClientSlowDiscoveryTopologyChangeTest extends ClientSlowDiscoveryAbstractTest {
/**
*
*/
@Before
public void before() throws Exception {
stopAllGrids();
cleanPersistenceDir();
}
/**
*
*/
@After
public void after() throws Exception {
stopAllGrids();
cleanPersistenceDir();
}
/**
* Test check that client join works well if cache configured on it stopped on server nodes
* but discovery event about cache stop is not delivered to client node immediately.
* When client node joins to cluster it sends SingleMessage to coordinator.
* During this time topology on server nodes can be changed,
* because client exchange doesn't require acknowledgement for SingleMessage on coordinator.
* Delay is simulated by blocking sending this SingleMessage and resume sending after topology is changed.
*/
@Test
public void testClientJoinAndCacheStop() throws Exception {
IgniteEx crd = (IgniteEx)startGridsMultiThreaded(3);
awaitPartitionMapExchange();
for (int k = 0; k < 64; k++)
crd.cache(CACHE_NAME).put(k, k);
TestRecordingCommunicationSpi clientCommSpi = new TestRecordingCommunicationSpi();
// Delay client join process.
clientCommSpi.blockMessages((node, msg) -> {
if (!(msg instanceof GridDhtPartitionsSingleMessage))
return false;
GridDhtPartitionsSingleMessage singleMsg = (GridDhtPartitionsSingleMessage)msg;
return Optional.ofNullable(singleMsg.exchangeId())
.map(GridDhtPartitionExchangeId::topologyVersion)
.filter(topVer -> topVer.equals(new AffinityTopologyVersion(4, 0)))
.isPresent();
});
communicationSpiSupplier = () -> clientCommSpi;
CustomMessageInterceptingDiscoverySpi clientDiscoSpi = new CustomMessageInterceptingDiscoverySpi();
CountDownLatch clientDiscoSpiBlock = new CountDownLatch(1);
// Delay cache destroying on client node.
clientDiscoSpi.interceptor = (msg) -> {
if (!(msg instanceof DynamicCacheChangeBatch))
return;
DynamicCacheChangeBatch cacheChangeBatch = (DynamicCacheChangeBatch)msg;
boolean hasCacheStopReq = cacheChangeBatch.requests().stream()
.anyMatch(req -> req.stop() && req.cacheName().equals(CACHE_NAME));
if (hasCacheStopReq)
U.awaitQuiet(clientDiscoSpiBlock);
};
discoverySpiSupplier = () -> clientDiscoSpi;
IgniteInternalFuture<IgniteEx> clientStartFut = GridTestUtils.runAsync(() -> startClientGrid(3));
// Wait till client node starts join process.
clientCommSpi.waitForBlocked();
// Destroy cache on server nodes.
crd.destroyCache(CACHE_NAME);
// Resume client join.
clientCommSpi.stopBlock();
// Client join should succeed.
IgniteEx client = clientStartFut.get();
IgniteCache<Object, Object> clientCache = client.cache(CACHE_NAME);
Assert.assertNotNull("Cache should exists on client node", clientCache);
IgniteInternalFuture<?> cacheGet = GridTestUtils.runAsync(() -> clientCache.get(0));
try {
cacheGet.get(5_000); // Reasonable timeout.
fail("Cache get operation should throw " + CacheStoppedException.class);
}
catch (Exception e) {
assertTrue("Got unexpected exception during cache get " + e,
X.hasCause(e, CacheStoppedException.class));
}
finally {
// Resume processing cache destroy on client node.
clientDiscoSpiBlock.countDown();
}
// Wait till cache destroyed on client node.
GridTestUtils.waitForCondition(() -> {
AffinityTopologyVersion topVer = client.context().cache().context().exchange().lastFinishedFuture()
.topologyVersion();
// Cache destroy version.
return topVer.equals(new AffinityTopologyVersion(4, 1));
}, 5_000); // Reasonable timeout.
Assert.assertNull("Cache should be destroyed on client node", client.cache(CACHE_NAME));
}
}