blob: b4ca9dac2141cbe51586bba2112051e81e54f6a0 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.distributed.dht;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.ShutdownPolicy;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.TestRecordingCommunicationSpi;
import org.apache.ignite.internal.processors.cache.CacheGroupContext;
import org.apache.ignite.internal.processors.cache.GridCacheUtils;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.ListeningTestLogger;
import org.apache.ignite.testframework.LogListener;
import org.apache.ignite.testframework.junits.WithSystemProperty;
import org.junit.Test;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_WAIT_FOR_BACKUPS_ON_SHUTDOWN;
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_IGNITE_INSTANCE_NAME;
/**
* Test checks various cluster shutdown and initiated policy.
*/
@WithSystemProperty(key = IGNITE_WAIT_FOR_BACKUPS_ON_SHUTDOWN, value = "false")
public class GracefulShutdownTest extends GridCacheDhtPreloadWaitForBackupsWithPersistenceTest {
/** Shutdown policy of static configuration. */
public ShutdownPolicy policy = ShutdownPolicy.GRACEFUL;
/** Listening test logger. */
ListeningTestLogger listeningLog;
/** {@inheritDoc} */
@Override protected void beforeTest() throws Exception {
super.beforeTest();
listeningLog = new ListeningTestLogger(log);
}
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
return super.getConfiguration(igniteInstanceName)
.setGridLogger(listeningLog)
.setCommunicationSpi(new TestRecordingCommunicationSpi())
.setShutdownPolicy(policy);
}
/**
* Check static configuration of shutdown policy.
*
* @throws Exception If failed.
*/
@Test
public void testRestartWithStaticConfiguredPolicy() throws Exception {
Ignite ignite0 = startGrid(0);
assertSame(ignite0.cluster().shutdownPolicy(), ignite0.configuration().getShutdownPolicy());
ignite0.close();
policy = ShutdownPolicy.IMMEDIATE;
ignite0 = startGrid(0);
assertSame(ignite0.cluster().shutdownPolicy(), ignite0.configuration().getShutdownPolicy());
assertSame(ignite0.cluster().shutdownPolicy(), ShutdownPolicy.IMMEDIATE);
}
/**
* Test checked exception which is thrown when configuration of nodes different.
*
* @throws Exception If failed.
*/
@Test
public void testTwoNodesWithDifferentConfuguration() throws Exception {
Ignite ignite0 = startGrid(0);
ignite0.cluster().active(true);
assertSame(ignite0.configuration().getShutdownPolicy(), ShutdownPolicy.GRACEFUL);
policy = ShutdownPolicy.IMMEDIATE;
GridTestUtils.assertThrowsAnyCause(log, () -> startGrid(1), IgniteCheckedException.class,
"Remote node has shutdoun policy different from local local");
}
/**
* Check dynamic configuration of shutdown policy.
*
* @throws Exception If failed.
*/
@Test
public void testRestartWithDynamicConfiguredPolicy() throws Exception {
Ignite ignite0 = startGrid(0);
ignite0.cluster().active(true);
assertSame(ignite0.cluster().shutdownPolicy(), ignite0.configuration().getShutdownPolicy());
ShutdownPolicy configuredPolicy = ignite0.cluster().shutdownPolicy();
ShutdownPolicy policyToChange = null;
for (ShutdownPolicy policy : ShutdownPolicy.values()) {
if (policy != ignite0.cluster().shutdownPolicy())
policyToChange = policy;
}
assertNotNull(policyToChange);
ignite0.cluster().shutdownPolicy(policyToChange);
forceCheckpoint();
info("Policy to change: " + policyToChange);
ignite0.close();
ignite0 = startGrid(0);
info("Policy after restart: " + ignite0.cluster().shutdownPolicy());
assertNotSame(ignite0.cluster().shutdownPolicy(), ignite0.configuration().getShutdownPolicy());
assertSame(ignite0.cluster().shutdownPolicy(), policyToChange);
}
/**
* Try to stop node when not all backups are matching of ideal assignment.
*
* @throws Exception If failed.
*/
@Test
public void testNotIdealOwners() throws Exception {
backups = 1;
Ignite ignite0 = startGrid(0);
ignite0.cluster().active(true);
for (int i = 1; i <= 3; i++) {
IgniteCache cache = ignite0.cache("cache" + i);
assertNotNull(cache);
try (IgniteDataStreamer streamer = ignite0.dataStreamer("cache" + i)) {
for (int j = 0; j < 100; j++)
streamer.addData(j, j);
}
}
TestRecordingCommunicationSpi spi = TestRecordingCommunicationSpi.spi(ignite0);
spi.blockMessages((node, msg) -> {
String nodeName = (String)node.attributes().get(ATTR_IGNITE_INSTANCE_NAME);
if (msg instanceof GridDhtPartitionSupplyMessage) {
GridDhtPartitionSupplyMessage supplyMsg = (GridDhtPartitionSupplyMessage)msg;
if (supplyMsg.groupId() != CU.cacheId(GridCacheUtils.UTILITY_CACHE_NAME) &&
getTestIgniteInstanceName(1).equals(nodeName))
return true;
}
return false;
});
startGrid(1);
Ignite ignite2 = startGrid(2);
resetBaselineTopology();
spi.waitForBlocked();
for (CacheGroupContext grp: ((IgniteEx)ignite2).context().cache().cacheGroups()) {
GridTestUtils.waitForCondition(() ->
!grp.topology().partitionMap(false).get(((IgniteEx)ignite2).localNode().id()).hasMovingPartitions(), 30_000);
}
LogListener lnsr = LogListener.matches("This node is waiting for backups of local partitions for group")
.build();
listeningLog.registerListener(lnsr);
IgniteInternalFuture fut = GridTestUtils.runAsync(() -> {
ignite2.close();
});
assertTrue(GridTestUtils.waitForCondition(lnsr::check, 30_000));
assertFalse(fut.isDone());
spi.stopBlock();
assertTrue(GridTestUtils.waitForCondition(fut::isDone, 30_000));
}
/**
* Stopping node and start cache which does not allow it.
*
* @throws Exception If failed.
*/
@Test
public void tesStartCacheWhenNodeStopping() throws Exception {
backups = 2;
Ignite ignite0 = startGrid(0);
ignite0.cluster().active(true);
for (int i = 1; i <= 3; i++) {
IgniteCache cache = ignite0.cache("cache" + i);
assertNotNull(cache);
try (IgniteDataStreamer streamer = ignite0.dataStreamer("cache" + i)) {
for (int j = 0; j < 100; j++)
streamer.addData(j, j);
}
}
TestRecordingCommunicationSpi spi = TestRecordingCommunicationSpi.spi(ignite0);
spi.blockMessages((node, msg) -> {
String nodeName = (String)node.attributes().get(ATTR_IGNITE_INSTANCE_NAME);
if (msg instanceof GridDhtPartitionSupplyMessage) {
GridDhtPartitionSupplyMessage supplyMsg = (GridDhtPartitionSupplyMessage)msg;
if (supplyMsg.groupId() != CU.cacheId(GridCacheUtils.UTILITY_CACHE_NAME) &&
getTestIgniteInstanceName(1).equals(nodeName))
return true;
}
return false;
});
Ignite ignite1 = startGrid(1);
Ignite ignite2 = startGrid(2);
resetBaselineTopology();
spi.waitForBlocked();
for (CacheGroupContext grp: ((IgniteEx)ignite2).context().cache().cacheGroups()) {
grp.preloader().rebalanceFuture().get();
}
ignite2.close();
LogListener lnsr = LogListener.matches("This node is waiting for completion of rebalance for group")
.build();
listeningLog.registerListener(lnsr);
IgniteInternalFuture fut = GridTestUtils.runAsync(() -> {
ignite1.close();
});
assertTrue(GridTestUtils.waitForCondition(lnsr::check, 30_000));
listeningLog.unregisterListener(lnsr);
assertFalse(fut.isDone());
ignite0.getOrCreateCache(new CacheConfiguration(DEFAULT_CACHE_NAME).setBackups(1));
spi.stopBlock();
lnsr = LogListener.matches("This node is waiting for backups of local partitions for group")
.build();
listeningLog.registerListener(lnsr);
assertTrue(GridTestUtils.waitForCondition(lnsr::check, 30_000));
}
}