/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache;
import java.io.File;
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.internal.IgniteClientReconnectAbstractTest;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.cache.persistence.CheckpointState;
import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.GridTestUtils.SF;
import org.junit.Ignore;
import org.junit.Test;
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
import static org.apache.ignite.cluster.ClusterState.ACTIVE;
import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.CACHE_DATA_FILENAME;
import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.CORRUPTED_DATA_FILES_MNTC_TASK_NAME;
/**
* Concurrent and advanced tests for WAL state change.
*/
@SuppressWarnings("unchecked")
public class WalModeChangeAdvancedSelfTest extends WalModeChangeCommonAbstractSelfTest {
/**
* Constructor.
*/
public WalModeChangeAdvancedSelfTest() {
super(false);
}
/** {@inheritDoc} */
@Override protected void beforeTestsStarted() throws Exception {
cleanPersistenceDir();
}
/** {@inheritDoc} */
@Override protected void afterTest() throws Exception {
super.afterTest();
stopAllGrids();
cleanPersistenceDir();
}
/**
* Verifies that a node with consistent partitions (fully synchronized with disk on a previous checkpoint)
* starts successfully even if WAL for that cache group is globally disabled.
*
* <p>
* Test scenario:
* </p>
* <ol>
* <li>
* Start a cluster from one server node, activate cluster.
* </li>
* <li>
* Create new cache. Disable WAL for the cache and put some data to it.
* </li>
* <li>
* Trigger checkpoint and wait for it to finish. Restart the node.
* </li>
* <li>
* Verify that the node starts successfully and the data is present in the cache.
* </li>
* </ol>
*
* @throws Exception If failed.
*/
@Test
public void testConsistentDataPreserved() throws Exception {
Ignite srv = startGrid(config(SRV_1, false, false));
srv.cluster().state(ACTIVE);
IgniteCache cache1 = srv.getOrCreateCache(cacheConfig(CACHE_NAME, PARTITIONED, TRANSACTIONAL));
srv.cluster().disableWal(CACHE_NAME);
for (int i = 0; i < 10; i++)
cache1.put(i, i);
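// Force a checkpoint so the dirty pages of the WAL-disabled cache are flushed to disk,
// making its partitions consistent on disk even though the updates were not logged to WAL.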
GridCacheDatabaseSharedManager dbMgr0 = (GridCacheDatabaseSharedManager)((IgniteEx)srv).context().cache().context().database();
dbMgr0.forceCheckpoint("cp").futureFor(CheckpointState.FINISHED).get();
stopGrid(SRV_1);
srv = startGrid(config(SRV_1, false, false));
assertForAllNodes(CACHE_NAME, false);
cache1 = srv.cache(CACHE_NAME);
for (int i = 0; i < 10; i++)
assertNotNull(cache1.get(i));
}
/**
* If the user manually cleans corrupted files while the node is down, the node detects this on startup
* and the maintenance task is completed automatically (although one more restart is still needed
* to get back to normal operation).
*
* <p>
* Test scenario:
* </p>
* <ol>
* <li>
* Start a server node, create a cache, disable WAL for the cache, put some keys to it.
* </li>
* <li>
* Stop the server node and remove checkpoint end markers from the cp directory
* to make the node think it failed in the middle of a checkpoint.
* </li>
* <li>
* Start the node, verify it fails to start because of the corrupted PDS of the cache.
* </li>
* <li>
* Clean the data directory of the cache and start the node again; verify the maintenance task
* is resolved automatically, without user action.
* </li>
* <li>
* Restart the node and verify it is in normal operation mode.
* </li>
* </ol>
*
* @throws Exception If failed.
*/
@Test
public void testMaintenanceIsSkippedIfWasFixedManuallyOnDowntime() throws Exception {
IgniteEx srv = startGrid(config(SRV_1, false, false));
File cacheToClean = cacheDir(srv, CACHE_NAME);
String ig0Folder = srv.context().pdsFolderResolver().resolveFolders().folderName();
File dbDir = U.resolveWorkDirectory(srv.configuration().getWorkDirectory(), "db", false);
File ig0LfsDir = new File(dbDir, ig0Folder);
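// The 'cp' subfolder of the node's PDS directory keeps checkpoint start/end marker files.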
File ig0CpDir = new File(ig0LfsDir, "cp");
srv.cluster().state(ACTIVE);
IgniteCache cache1 = srv.getOrCreateCache(cacheConfig(CACHE_NAME, PARTITIONED, TRANSACTIONAL));
srv.cluster().disableWal(CACHE_NAME);
for (int i = 0; i < 10; i++)
cache1.put(i, i);
stopAllGrids(true);
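// Delete checkpoint END markers so that on restart the node believes it crashed
// in the middle of a checkpoint.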
File[] cpMarkers = ig0CpDir.listFiles();
for (File cpMark : cpMarkers) {
if (cpMark.getName().contains("-END"))
cpMark.delete();
}
// Node should fail to start as its PDS may be corrupted because of the disabled WAL.
GridTestUtils.assertThrows(null,
() -> startGrid(config(SRV_1, false, false)),
Exception.class,
null);
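// Simulate the user manually cleaning the corrupted cache directory while the node is down.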
cleanCacheDir(cacheToClean);
// Node should start successfully and enter maintenance mode. The maintenance record will be
// cleaned automatically because the corrupted PDS was deleted during downtime.
srv = startGrid(config(SRV_1, false, false));
assertTrue(srv.context().maintenanceRegistry().isMaintenanceMode());
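// actionsForMaintenanceTask() throws if no task with the given name is registered,
// so reaching fail() below would mean the corrupted-files task is still pending.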
try {
srv.context().maintenanceRegistry().actionsForMaintenanceTask(CORRUPTED_DATA_FILES_MNTC_TASK_NAME);
fail("Maintenance task is not completed yet for some reason.");
}
catch (Exception ignore) {
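// Expected: the maintenance task was already cleaned up.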
}
stopAllGrids(false);
// After restart the node works in normal mode even without executing the maintenance action to clear the corrupted PDS.
srv = startGrid(config(SRV_1, false, false));
assertFalse(srv.context().maintenanceRegistry().isMaintenanceMode());
srv.cluster().state(ACTIVE);
cache1 = srv.getOrCreateCache(CACHE_NAME);
assertEquals(0, cache1.size());
}
/**
* Test cache cleanup on restart.
*
* @throws Exception If failed.
*/
@Test
public void testCacheCleanup() throws Exception {
Ignite srv = startGrid(config(SRV_1, false, false));
File cacheToClean = cacheDir(srv, CACHE_NAME_2);
srv.cluster().state(ACTIVE);
IgniteCache cache1 = srv.getOrCreateCache(cacheConfig(CACHE_NAME, PARTITIONED, TRANSACTIONAL));
IgniteCache cache2 = srv.getOrCreateCache(cacheConfig(CACHE_NAME_2, PARTITIONED, TRANSACTIONAL));
assertForAllNodes(CACHE_NAME, true);
assertForAllNodes(CACHE_NAME_2, true);
for (int i = 0; i < 10; i++) {
cache1.put(i, i);
cache2.put(i, i);
}
srv.cluster().disableWal(CACHE_NAME);
assertForAllNodes(CACHE_NAME, false);
assertForAllNodes(CACHE_NAME_2, true);
for (int i = 10; i < 20; i++) {
cache1.put(i, i);
cache2.put(i, i);
}
srv.cluster().disableWal(CACHE_NAME_2);
assertForAllNodes(CACHE_NAME, false);
assertForAllNodes(CACHE_NAME_2, false);
for (int i = 20; i < 30; i++) {
cache1.put(i, i);
cache2.put(i, i);
}
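// WAL mode does not affect in-memory visibility of updates, so all 30 entries are observable
// in both caches regardless of the current WAL state.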
assertEquals(30, cache1.size());
assertEquals(30, cache2.size());
srv.cluster().enableWal(CACHE_NAME);
assertForAllNodes(CACHE_NAME, true);
assertForAllNodes(CACHE_NAME_2, false);
assertEquals(30, cache1.size());
assertEquals(30, cache2.size());
stopAllGrids(true);
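// Wipe CACHE_NAME_2's data files on disk (the cache configuration file is kept)
// while the cluster is down.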
cleanCacheDir(cacheToClean);
srv = startGrid(config(SRV_1, false, false));
srv.cluster().state(ACTIVE);
cache1 = srv.cache(CACHE_NAME);
cache2 = srv.cache(CACHE_NAME_2);
assertForAllNodes(CACHE_NAME, true);
assertForAllNodes(CACHE_NAME_2, false);
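// CACHE_NAME kept its data because WAL was re-enabled before shutdown, while CACHE_NAME_2
// had its data files wiped and restarts empty.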
assertEquals(30, cache1.size());
assertEquals(0, cache2.size());
}
/** Resolves the persistence directory of the given cache on the given node. */
private File cacheDir(Ignite ig, String cacheName) throws IgniteCheckedException {
String igFolder = ((IgniteEx)ig).context().pdsFolderResolver().resolveFolders().folderName();
File dbDir = U.resolveWorkDirectory(ig.configuration().getWorkDirectory(), "db", false);
File igPdsFolder = new File(dbDir, igFolder);
return new File(igPdsFolder, "cache-" + cacheName);
}
/** Removes all files from the cache directory except the cache configuration file. */
private void cleanCacheDir(File cacheDir) {
for (File f : cacheDir.listFiles()) {
if (!f.getName().equals(CACHE_DATA_FILENAME))
f.delete();
}
}
/**
* Test simple node join.
*
* @throws Exception If failed.
*/
@Test
public void testJoin() throws Exception {
checkJoin(false);
}
/**
* Test simple node join when operations are performed from the coordinator.
*
* @throws Exception If failed.
*/
@Test
public void testJoinCoordinator() throws Exception {
checkJoin(true);
}
/**
* Check node join behavior.
*
* @param crdFiltered {@code True} if the first (coordinator) node is filtered out from the cache data nodes.
* @throws Exception If failed.
*/
private void checkJoin(boolean crdFiltered) throws Exception {
// Start node and disable WAL.
Ignite srv = startGrid(config(SRV_1, false, crdFiltered));
srv.cluster().state(ACTIVE);
srv.getOrCreateCache(cacheConfig(PARTITIONED));
assertForAllNodes(CACHE_NAME, true);
if (!crdFiltered) {
srv.cluster().disableWal(CACHE_NAME);
assertForAllNodes(CACHE_NAME, false);
}
// Start other nodes.
IgniteEx ig2 = startGrid(config(SRV_2, false, false));
File ig2CacheDir = cacheDir(ig2, CACHE_NAME);
if (crdFiltered)
srv.cluster().disableWal(CACHE_NAME);
assertForAllNodes(CACHE_NAME, false);
startGrid(config(SRV_3, false, !crdFiltered));
assertForAllNodes(CACHE_NAME, false);
startGrid(config(CLI, true, false));
assertForAllNodes(CACHE_NAME, false);
// Stop nodes and restore WAL state on the first node.
stopGrid(SRV_2, true);
stopGrid(SRV_3, true);
stopGrid(CLI, true);
if (!crdFiltered) {
srv.cluster().enableWal(CACHE_NAME);
assertForAllNodes(CACHE_NAME, true);
}
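// Clear SRV_2's on-disk cache data before it rejoins, since it may be inconsistent
// after the period with WAL disabled.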
cleanCacheDir(ig2CacheDir);
// Start other nodes again.
startGrid(config(SRV_2, false, false));
if (crdFiltered)
srv.cluster().enableWal(CACHE_NAME);
assertForAllNodes(CACHE_NAME, true);
startGrid(config(SRV_3, false, !crdFiltered));
assertForAllNodes(CACHE_NAME, true);
startGrid(config(CLI, true, false));
assertForAllNodes(CACHE_NAME, true);
}
/**
* Test server restart (non-coordinator).
*
* @throws Exception If failed.
*/
@Test
public void testServerRestartNonCoordinator() throws Exception {
checkNodeRestart(false);
}
/**
* Test server restart (coordinator).
*
* @throws Exception If failed.
*/
@Ignore("https://issues.apache.org/jira/browse/IGNITE-7472")
@Test
public void testServerRestartCoordinator() throws Exception {
checkNodeRestart(true);
}
/**
* Check node restart behavior under concurrent WAL mode changes.
*
* @param failCrd Whether to fail coordinator nodes.
* @throws Exception If failed.
*/
public void checkNodeRestart(boolean failCrd) throws Exception {
startGrid(config(SRV_1, false, false));
startGrid(config(SRV_2, false, false));
Ignite cli = startGrid(config(CLI, true, false));
cli.cluster().state(ACTIVE);
cli.getOrCreateCache(cacheConfig(PARTITIONED));
final AtomicInteger restartCnt = new AtomicInteger();
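// SF.applyLB(...) scales the restart count for the configured test scale factor, with a lower bound for quick runs.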
final int restarts = SF.applyLB(5, 3);
Thread t = new Thread(new Runnable() {
@Override public void run() {
boolean firstOrSecond = true;
while (restartCnt.get() < restarts) {
String victimName;
if (failCrd) {
victimName = firstOrSecond ? SRV_1 : SRV_2;
firstOrSecond = !firstOrSecond;
}
else
victimName = SRV_2;
try {
File cacheDir = cacheDir(grid(victimName), CACHE_NAME);
stopGrid(victimName);
cleanCacheDir(cacheDir);
startGrid(config(victimName, false, false));
Thread.sleep(200);
}
catch (Exception e) {
// Preserve the original cause so restart failures are diagnosable.
throw new RuntimeException(e);
}
restartCnt.incrementAndGet();
log.info(">>> Finished restart: " + restartCnt.get());
}
}
});
t.start();
boolean state = true;
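// Concurrently toggle the WAL mode from the client while server nodes restart.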
while (restartCnt.get() < restarts && !Thread.currentThread().isInterrupted()) {
try {
if (state)
cli.cluster().disableWal(CACHE_NAME);
else
cli.cluster().enableWal(CACHE_NAME);
state = !state;
}
catch (IgniteException ignore) {
// Possible disconnect, re-try.
}
}
// Wait for the restarter thread to finish before the test returns and grids are torn down.
t.join();
}
/**
* Test client re-connect.
*
* @throws Exception If failed.
*/
@Test
public void testClientReconnect() throws Exception {
final Ignite srv = startGrid(config(SRV_1, false, false));
Ignite cli = startGrid(config(CLI, true, false));
cli.cluster().state(ACTIVE);
cli.getOrCreateCache(cacheConfig(PARTITIONED));
final AtomicBoolean done = new AtomicBoolean();
// Start load.
IgniteInternalFuture<?> fut = GridTestUtils.runAsync(() -> {
boolean state = false;
while (!done.get()) {
try {
if (state)
cli.cluster().enableWal(CACHE_NAME);
else
cli.cluster().disableWal(CACHE_NAME);
}
catch (IgniteException e) {
String msg = e.getMessage();
assert msg.startsWith("Client node disconnected") ||
msg.startsWith("Client node was disconnected") ||
msg.contains("client is disconnected") : e.getMessage();
}
finally {
state = !state;
}
}
}, "wal-load-" + cli.name());
// Now perform multiple client reconnects.
try {
for (int i = 1; i <= 10; i++) {
Thread.sleep(ThreadLocalRandom.current().nextLong(200, 1000));
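// The helper forcibly disconnects the client from the server and waits for it to reconnect;
// the Runnable executes while the client is disconnected.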
IgniteClientReconnectAbstractTest.reconnectClientNode(log, cli, srv, new Runnable() {
@Override public void run() {
// No-op.
}
});
log.info(">>> Finished iteration: " + i);
}
}
finally {
done.set(true);
}
fut.get();
}
/**
* Test cache destroy and re-create while WAL mode is being changed concurrently.
*
* @throws Exception If failed.
*/
@Test
public void testCacheDestroy() throws Exception {
final Ignite srv = startGrid(config(SRV_1, false, false));
Ignite cli = startGrid(config(CLI, true, false));
cli.cluster().state(ACTIVE);
srv.createCache(cacheConfig(PARTITIONED));
final AtomicBoolean done = new AtomicBoolean();
// Start load.
IgniteInternalFuture<?> fut = GridTestUtils.runAsync(() -> {
boolean state = false;
while (!done.get()) {
try {
if (state)
cli.cluster().enableWal(CACHE_NAME);
else
cli.cluster().disableWal(CACHE_NAME);
}
catch (IgniteException e) {
String msg = e.getMessage();
assert msg.startsWith("Cache doesn't exist") ||
msg.startsWith("Failed to change WAL mode because some caches no longer exist") :
e.getMessage();
}
finally {
state = !state;
}
}
}, "wal-load-" + cli.name());
try {
// Now perform multiple cache destroy/create cycles.
for (int i = 1; i <= 20; i++) {
Thread.sleep(ThreadLocalRandom.current().nextLong(200, 1000));
srv.destroyCache(CACHE_NAME);
Thread.sleep(100);
srv.createCache(cacheConfig(PARTITIONED));
log.info(">>> Finished iteration: " + i);
}
}
finally {
done.set(true);
}
fut.get();
}
/**
* Test that concurrent enable/disable operations don't lead to hangs.
*
* @throws Exception If failed.
*/
@Test
public void testConcurrentOperations() throws Exception {
final Ignite srv1 = startGrid(config(SRV_1, false, false));
final Ignite srv2 = startGrid(config(SRV_2, false, false));
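// SRV_3 is started with the 'filtered' flag, so the cache's node filter excludes it from holding data.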
final Ignite srv3 = startGrid(config(SRV_3, false, true));
final Ignite cli = startGrid(config(CLI, true, false));
final Ignite cacheCli = startGrid(config(CLI_2, true, false));
cacheCli.cluster().state(ACTIVE);
final IgniteCache cache = cacheCli.getOrCreateCache(cacheConfig(PARTITIONED));
for (int i = 1; i <= SF.applyLB(3, 2); i++) {
// Start pushing requests.
Collection<Ignite> walNodes = new ArrayList<>();
walNodes.add(srv1);
walNodes.add(srv2);
walNodes.add(srv3);
walNodes.add(cli);
final AtomicBoolean done = new AtomicBoolean();
final CountDownLatch latch = new CountDownLatch(walNodes.size() + 1);
for (Ignite node : walNodes) {
final Ignite node0 = node;
Thread t = new Thread(new Runnable() {
@Override public void run() {
checkConcurrentOperations(done, node0);
latch.countDown();
}
});
t.setName("wal-load-" + node0.name());
t.start();
}
// Do some cache loading in the meantime.
Thread t = new Thread(new Runnable() {
@Override public void run() {
int i = 0;
while (!done.get())
cache.put(i, i++);
latch.countDown();
}
});
t.setName("cache-load");
t.start();
Thread.sleep(SF.applyLB(20_000, 2_000));
done.set(true);
log.info(">>> Stopping iteration: " + i);
latch.await();
log.info(">>> Iteration finished: " + i);
}
}
/**
* Check concurrent operations.
*
* @param done Done flag.
* @param node Node.
*/
private static void checkConcurrentOperations(AtomicBoolean done, Ignite node) {
ThreadLocalRandom rnd = ThreadLocalRandom.current();
boolean state = rnd.nextBoolean();
while (!done.get()) {
if (state)
node.cluster().enableWal(CACHE_NAME);
else
node.cluster().disableWal(CACHE_NAME);
state = !state;
}
try {
Thread.sleep(rnd.nextLong(200, 1000));
}
catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}