blob: 57a8a2bac79b2e3e74306ef651751e15b9e4f3bb [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.persistence.snapshot;
import java.io.File;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.UnaryOperator;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cache.QueryEntity;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.TestRecordingCommunicationSpi;
import org.apache.ignite.internal.processors.cache.GridLocalConfigManager;
import org.apache.ignite.internal.processors.cache.StoredCacheData;
import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager;
import org.apache.ignite.internal.util.distributed.DistributedProcess;
import org.apache.ignite.internal.util.distributed.SingleNodeMessage;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.testframework.GridTestUtils;
import org.junit.Test;
import static java.util.Collections.singleton;
import static java.util.Collections.singletonList;
import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.snapshotMetaFileName;
import static org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.RESTORE_CACHE_GROUP_SNAPSHOT_PRELOAD;
import static org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.RESTORE_CACHE_GROUP_SNAPSHOT_PREPARE;
import static org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.RESTORE_CACHE_GROUP_SNAPSHOT_START;
import static org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.RESTORE_INCREMENTAL_SNAPSHOT_START;
import static org.apache.ignite.testframework.GridTestUtils.assertThrows;
import static org.apache.ignite.testframework.GridTestUtils.assertThrowsWithCause;
import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
import static org.junit.Assume.assumeFalse;
/**
* Basic tests for incremental snapshots.
*/
public class IncrementalSnapshotTest extends AbstractSnapshotSelfTest {
/** */
public static final int GRID_CNT = 3;
/** */
public static final String OTHER_CACHE = "other-cache";
/** */
public static final String GROUPED_CACHE = "my-grouped-cache2";
/** @see DataStorageConfiguration#isWalCompactionEnabled() */
public boolean walCompactionEnabled = true;
/** */
private AtomicInteger cntr = new AtomicInteger();
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
if (cfg.isClientMode())
return cfg;
cfg.getDataStorageConfiguration()
.setWalCompactionEnabled(walCompactionEnabled)
.setWalSegmentSize((int)U.MB);
return cfg;
}
/** */
@Test
public void testCreation() throws Exception {
assumeFalse("https://issues.apache.org/jira/browse/IGNITE-17819", encryption);
IgniteEx srv = startGridsWithCache(
GRID_CNT,
CACHE_KEYS_RANGE,
key -> new Account(key, key),
new CacheConfiguration<>(DEFAULT_CACHE_NAME)
);
File snpDir = U.resolveWorkDirectory(U.defaultWorkDirectory(), "ex_snapshots", true);
assertTrue("Target directory is not empty: " + snpDir, F.isEmpty(snpDir.list()));
IgniteEx cli = startClientGrid(
GRID_CNT,
(UnaryOperator<IgniteConfiguration>)
cfg -> cfg.setCacheConfiguration(new CacheConfiguration<>(DEFAULT_CACHE_NAME))
);
File exSnpDir = U.resolveWorkDirectory(U.defaultWorkDirectory(), "ex_snapshots", true);
for (boolean client : new boolean[] {false, true}) {
IgniteSnapshotManager snpCreate = snp(client ? cli : srv);
for (File snpPath : new File[] {null, exSnpDir}) {
if (client && snpPath != null) // Set snapshot path not supported for snapshot from client nodes.
continue;
String snpName = SNAPSHOT_NAME + "_" + client + "_" + (snpPath == null ? "" : snpPath.getName());
snpCreate
.createSnapshot(snpName, snpPath == null ? null : snpPath.getAbsolutePath(), false, onlyPrimary)
.get(TIMEOUT);
checkSnapshot(snpName, snpPath == null ? null : snpPath.getAbsolutePath());
for (int incIdx = 1; incIdx < 3; incIdx++) {
addData(cli);
if (snpPath == null)
snpCreate.createIncrementalSnapshot(snpName).get(TIMEOUT);
else
snpCreate.createSnapshot(snpName, snpPath.getAbsolutePath(), true, false).get(TIMEOUT);
for (int gridIdx = 0; gridIdx < GRID_CNT; gridIdx++) {
assertTrue("Incremental snapshot must exists on node " + gridIdx, checkIncremental(
grid(gridIdx),
snpName,
snpPath == null ? null : snpPath.getAbsolutePath(),
incIdx
));
}
}
}
}
}
/** */
private void addData(IgniteEx node) {
IgniteCache<Integer, byte[]> cache = node.getOrCreateCache(DEFAULT_CACHE_NAME);
for (int i = 0; i < (int)(U.MB * 3 * GRID_CNT / U.KB); i++) {
byte[] bytes = new byte[(int)U.KB];
ThreadLocalRandom.current().nextBytes(bytes);
cache.put(cntr.incrementAndGet(), bytes);
}
}
/** */
@Test
public void testFailForUnknownBaseSnapshot() throws Exception {
assumeFalse("https://issues.apache.org/jira/browse/IGNITE-17819", encryption);
IgniteEx ign = startGridsWithCache(1, CACHE_KEYS_RANGE, key -> new Account(key, key),
new CacheConfiguration<>(DEFAULT_CACHE_NAME));
assertThrowsWithCause(
() -> snp(ign).createIncrementalSnapshot("unknown").get(TIMEOUT),
IgniteException.class
);
createAndCheckSnapshot(ign, SNAPSHOT_NAME, null, TIMEOUT);
assertThrowsWithCause(
() -> snp(ign).createIncrementalSnapshot("unknown").get(TIMEOUT),
IgniteException.class
);
}
/** */
@Test
public void testFailIfPreviousIncrementNotAvailable() throws Exception {
assumeFalse("https://issues.apache.org/jira/browse/IGNITE-17819", encryption);
IgniteEx srv = startGridsWithCache(
GRID_CNT,
CACHE_KEYS_RANGE,
key -> new Account(key, key),
new CacheConfiguration<>(DEFAULT_CACHE_NAME)
);
IgniteEx cli = startClientGrid(
GRID_CNT,
(UnaryOperator<IgniteConfiguration>)
cfg -> cfg.setCacheConfiguration(new CacheConfiguration<>(DEFAULT_CACHE_NAME))
);
createAndCheckSnapshot(cli, SNAPSHOT_NAME, null, TIMEOUT);
cli.snapshot().createIncrementalSnapshot(SNAPSHOT_NAME).get(TIMEOUT);
cli.snapshot().createIncrementalSnapshot(SNAPSHOT_NAME).get(TIMEOUT);
File toRmv = new File(
snp(ignite(GRID_CNT - 1)).incrementalSnapshotLocalDir(SNAPSHOT_NAME, null, 2),
snapshotMetaFileName(ignite(GRID_CNT - 1).localNode().consistentId().toString()));
assertTrue(toRmv.exists());
U.delete(toRmv);
assertThrowsWithCause(
() -> cli.snapshot().createIncrementalSnapshot(SNAPSHOT_NAME).get(TIMEOUT),
IgniteException.class
);
}
/** */
@Test
public void testFailIfSegmentNotFound() throws Exception {
assumeFalse("https://issues.apache.org/jira/browse/IGNITE-17819", encryption);
IgniteEx srv = startGridsWithCache(
1,
CACHE_KEYS_RANGE,
key -> new Account(key, key),
new CacheConfiguration<>(DEFAULT_CACHE_NAME)
);
createAndCheckSnapshot(srv, SNAPSHOT_NAME, null, TIMEOUT);
addData(srv);
FileWriteAheadLogManager wal = (FileWriteAheadLogManager)srv.context().cache().context().wal();
assertTrue(waitForCondition(() -> wal.lastCompactedSegment() >= 0, getTestTimeout()));
long segIdx = wal.lastCompactedSegment();
U.delete(wal.compactedSegment(segIdx));
assertThrowsWithCause(
() -> srv.snapshot().createIncrementalSnapshot(SNAPSHOT_NAME).get(TIMEOUT),
IgniteException.class
);
}
/** */
@Test
public void testIncrementalSnapshotFailsOnTopologyChange() throws Exception {
assumeFalse("https://issues.apache.org/jira/browse/IGNITE-17819", encryption);
IgniteEx srv = startGridsWithCache(
GRID_CNT,
CACHE_KEYS_RANGE,
key -> new Account(key, key),
new CacheConfiguration<>(DEFAULT_CACHE_NAME)
);
IgniteSnapshotManager snpCreate = snp(srv);
createAndCheckSnapshot(srv, SNAPSHOT_NAME, null, TIMEOUT);
String consId = grid(1).context().discovery().localNode().consistentId().toString();
// Stop some node.
stopGrid(1);
assertThrows(
null,
() -> snpCreate.createIncrementalSnapshot(SNAPSHOT_NAME).get(TIMEOUT),
IgniteException.class,
"Create incremental snapshot request has been rejected. " +
"Node from full snapshot offline [consistentId=" + consId + ']'
);
}
/** */
@Test
public void testIncrementalSnapshotFailsOnCacheDestroy() throws Exception {
assumeFalse("https://issues.apache.org/jira/browse/IGNITE-17819", encryption);
checkFailWhenCacheDestroyed(OTHER_CACHE, "Create incremental snapshot request has been rejected. " +
"Cache group destroyed [groupId=" + CU.cacheId(OTHER_CACHE) + ']');
}
/** */
@Test
public void testIncrementalSnapshotFailsOnGroupedCacheDestroy() throws Exception {
assumeFalse("https://issues.apache.org/jira/browse/IGNITE-17819", encryption);
checkFailWhenCacheDestroyed(GROUPED_CACHE, "Create incremental snapshot request has been rejected. " +
"Cache destroyed [cacheId=" + CU.cacheId(GROUPED_CACHE) + ", cacheName=" + GROUPED_CACHE + ']');
}
/** */
@Test
public void testIncrementalSnapshotFailsOnCacheChange() throws Exception {
assumeFalse("https://issues.apache.org/jira/browse/IGNITE-17819", encryption);
CacheConfiguration<Integer, Account> ccfg = new CacheConfiguration<>(DEFAULT_CACHE_NAME);
IgniteEx srv = startGridsWithCache(1, CACHE_KEYS_RANGE, key -> new Account(key, key), ccfg);
createAndCheckSnapshot(srv, SNAPSHOT_NAME, null, TIMEOUT);
GridLocalConfigManager locCfgMgr = srv.context().cache().configManager();
File ccfgFile = locCfgMgr.cacheConfigurationFile(ccfg);
StoredCacheData cacheData = locCfgMgr.readCacheData(ccfgFile);
assertNotNull(cacheData);
cacheData.queryEntities(singletonList(new QueryEntity(String.class, Account.class)));
locCfgMgr.writeCacheData(cacheData, ccfgFile);
assertThrows(
null,
() -> snp(srv).createIncrementalSnapshot(SNAPSHOT_NAME).get(TIMEOUT),
IgniteException.class,
"Cache changed [cacheId=" + CU.cacheId(DEFAULT_CACHE_NAME) + ", cacheName=" + DEFAULT_CACHE_NAME + ']'
);
}
/** */
@Test
public void testIncrementalSnapshotFailOnDirtyDir() throws Exception {
assumeFalse("https://issues.apache.org/jira/browse/IGNITE-17819", encryption);
IgniteEx srv = startGridsWithCache(
GRID_CNT,
CACHE_KEYS_RANGE,
key -> new Account(key, key),
new CacheConfiguration<>(DEFAULT_CACHE_NAME)
);
createAndCheckSnapshot(srv, SNAPSHOT_NAME, null, TIMEOUT);
assertTrue(snp(srv).incrementalSnapshotsLocalRootDir(SNAPSHOT_NAME, null).mkdirs());
assertTrue(snp(srv).incrementalSnapshotLocalDir(SNAPSHOT_NAME, null, 1).createNewFile());
assertThrows(
null,
() -> snp(srv).createIncrementalSnapshot(SNAPSHOT_NAME).get(TIMEOUT),
IgniteException.class,
"Failed to create snapshot WAL directory"
);
for (int i = 0; i < GRID_CNT; i++)
assertFalse(snp(grid(i)).incrementalSnapshotLocalDir(SNAPSHOT_NAME, null, 1).exists());
}
/** */
@Test
public void testFailIfWalCompactionDisabled() throws Exception {
assumeFalse("https://issues.apache.org/jira/browse/IGNITE-17819", encryption);
walCompactionEnabled = false;
try {
IgniteEx srv = startGridsWithCache(
1,
CACHE_KEYS_RANGE,
key -> new Account(key, key),
new CacheConfiguration<>(DEFAULT_CACHE_NAME)
);
createAndCheckSnapshot(srv, SNAPSHOT_NAME, null, TIMEOUT);
assertThrows(
null,
() -> srv.snapshot().createIncrementalSnapshot(SNAPSHOT_NAME).get(TIMEOUT),
IgniteException.class,
"Create incremental snapshot request has been rejected. WAL compaction must be enabled."
);
}
finally {
walCompactionEnabled = true;
}
}
/** @throws Exception if failed. */
@Test
public void testStagesFail() throws Exception {
assumeFalse("https://issues.apache.org/jira/browse/IGNITE-17819", encryption);
DistributedProcess.DistributedProcessType[] stages = new DistributedProcess.DistributedProcessType[] {
RESTORE_CACHE_GROUP_SNAPSHOT_PREPARE,
RESTORE_CACHE_GROUP_SNAPSHOT_PRELOAD,
RESTORE_CACHE_GROUP_SNAPSHOT_START,
RESTORE_INCREMENTAL_SNAPSHOT_START
};
IgniteEx srv = startGridsWithCache(
2,
CACHE_KEYS_RANGE,
valueBuilder(),
new CacheConfiguration<>(DEFAULT_CACHE_NAME)
);
createAndCheckSnapshot(srv, SNAPSHOT_NAME, null, TIMEOUT);
srv.snapshot().createIncrementalSnapshot(SNAPSHOT_NAME).get(TIMEOUT);
srv.destroyCache(DEFAULT_CACHE_NAME);
awaitPartitionMapExchange();
AtomicReference<DistributedProcess.DistributedProcessType> failStage = new AtomicReference<>();
TestRecordingCommunicationSpi.spi(grid(1)).blockMessages((node, msg) -> {
if (msg instanceof SingleNodeMessage) {
SingleNodeMessage<?> singleMsg = (SingleNodeMessage<?>)msg;
DistributedProcess.DistributedProcessType stage = failStage.get();
if (stage != null && stage.ordinal() == singleMsg.type())
GridTestUtils.setFieldValue(singleMsg, "err", new IgniteException("Test exception."));
}
return false;
});
for (DistributedProcess.DistributedProcessType stage : stages) {
failStage.set(stage);
assertThrows(log,
() -> srv.snapshot().restoreSnapshot(SNAPSHOT_NAME, singleton(DEFAULT_CACHE_NAME), 1).get(),
IgniteException.class, "Test exception.");
}
failStage.set(null);
srv.snapshot().restoreSnapshot(SNAPSHOT_NAME, singleton(DEFAULT_CACHE_NAME), 1).get();
assertCacheKeys(srv.cache(DEFAULT_CACHE_NAME), CACHE_KEYS_RANGE);
}
/** */
private void checkFailWhenCacheDestroyed(String cache2rvm, String errMsg) throws Exception {
IgniteEx srv = startGridsWithCache(
1,
CACHE_KEYS_RANGE,
key -> new Account(key, key),
new CacheConfiguration<>(DEFAULT_CACHE_NAME),
new CacheConfiguration<>(OTHER_CACHE),
new CacheConfiguration<Integer, Object>("my-grouped-cache1").setGroupName("mygroup"),
new CacheConfiguration<Integer, Object>(GROUPED_CACHE).setGroupName("mygroup")
);
IgniteSnapshotManager snpCreate = snp(srv);
createAndCheckSnapshot(srv, SNAPSHOT_NAME, null, TIMEOUT);
snpCreate.createIncrementalSnapshot(SNAPSHOT_NAME).get(TIMEOUT);
srv.destroyCache(cache2rvm);
assertThrows(
null,
() -> snpCreate.createIncrementalSnapshot(SNAPSHOT_NAME).get(TIMEOUT),
IgniteException.class,
errMsg
);
}
}