blob: 6f87fd502f9a878e4ee06678eb2232570cf398f9 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.persistence.snapshot;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.marshaller.MarshallerUtils;
import org.apache.ignite.plugin.AbstractTestPluginProvider;
import org.apache.ignite.plugin.ExtensionRegistry;
import org.apache.ignite.plugin.PluginConfiguration;
import org.apache.ignite.plugin.PluginContext;
import org.apache.ignite.plugin.PluginProvider;
import org.apache.ignite.testframework.GridTestUtils;
import org.jetbrains.annotations.Nullable;
import org.junit.Test;
import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.SNAPSHOT_METAFILE_EXT;
/**
* Snapshot custom handlers test.
*/
public class IgniteClusterSnapshotHandlerTest extends IgniteClusterSnapshotRestoreBaseTest {
    /**
     * Custom snapshot handlers. The list is read by the plugin provider below, so each test fills it
     * before starting the node(s) that should see those handlers.
     */
    private final List<SnapshotHandler<?>> handlers = new ArrayList<>();

    /** Plugin provider that registers every element of {@link #handlers} as a {@link SnapshotHandler} extension. */
    private final PluginProvider<PluginConfiguration> pluginProvider = new AbstractTestPluginProvider() {
        /** {@inheritDoc} */
        @Override public String name() {
            return "SnapshotVerifier";
        }

        /** {@inheritDoc} */
        @Override public void initExtensions(PluginContext ctx, ExtensionRegistry registry) {
            for (SnapshotHandler<?> hnd : handlers)
                registry.registerExtension(SnapshotHandler.class, hnd);
        }
    };

    /** {@inheritDoc} */
    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
        return super.getConfiguration(igniteInstanceName).setPluginProviders(pluginProvider);
    }

    /** {@inheritDoc} */
    @Override protected Function<Integer, Object> valueBuilder() {
        // The Integer(int) constructor is deprecated; valueOf() produces an equal value.
        return Integer::valueOf;
    }

    /**
     * Test the basic snapshot metadata consistency using handlers.
     * <p>
     * The CREATE handler returns the request ID found in the snapshot metadata; its {@code complete} callback
     * remembers the first ID and fails if any other result differs. The RESTORE handler compares every result
     * with the remembered ID. The test then rewrites the request ID stored on disk with a random one and
     * expects the restore to fail, and finally writes the original ID back and expects the restore to succeed.
     *
     * @throws Exception If fails.
     */
    @Test
    public void testClusterSnapshotHandlers() throws Exception {
        String expMsg = "Inconsistent data";

        // Request ID captured by the CREATE handler and later checked by the RESTORE handler.
        AtomicReference<UUID> reqIdRef = new AtomicReference<>();

        handlers.add(new SnapshotHandler<UUID>() {
            @Override public SnapshotHandlerType type() {
                return SnapshotHandlerType.CREATE;
            }

            @Override public UUID invoke(SnapshotHandlerContext ctx) {
                return ctx.metadata().requestId();
            }

            @Override public void complete(String name,
                Collection<SnapshotHandlerResult<UUID>> results) throws IgniteCheckedException {
                for (SnapshotHandlerResult<UUID> res : results) {
                    // The first result initializes the reference, every following one must be equal to it.
                    if (!reqIdRef.compareAndSet(null, res.data()) && !reqIdRef.get().equals(res.data()))
                        throw new IgniteCheckedException("The request ID must be the same on all nodes.");
                }
            }
        });

        handlers.add(new SnapshotHandler<UUID>() {
            @Override public SnapshotHandlerType type() {
                return SnapshotHandlerType.RESTORE;
            }

            @Override public UUID invoke(SnapshotHandlerContext ctx) {
                return ctx.metadata().requestId();
            }

            @Override public void complete(String name,
                Collection<SnapshotHandlerResult<UUID>> results) throws IgniteCheckedException {
                for (SnapshotHandlerResult<UUID> res : results) {
                    if (!reqIdRef.get().equals(res.data()))
                        throw new IgniteCheckedException(expMsg);
                }
            }
        });

        IgniteEx ignite = startGridsWithSnapshot(2, CACHE_KEYS_RANGE);

        assertNotNull(reqIdRef.get());

        // A request ID unknown to the CREATE handler must make the RESTORE handler fail.
        changeMetadataRequestIdOnDisk(UUID.randomUUID());

        IgniteFuture<Void> fut = ignite.snapshot().restoreSnapshot(SNAPSHOT_NAME, null);

        GridTestUtils.assertThrowsAnyCause(log, () -> fut.get(TIMEOUT), IgniteCheckedException.class, expMsg);

        // With the original request ID written back the restore is expected to pass.
        changeMetadataRequestIdOnDisk(reqIdRef.get());

        ignite.snapshot().restoreSnapshot(SNAPSHOT_NAME, null).get(TIMEOUT);

        assertCacheKeys(ignite.cache(DEFAULT_CACHE_NAME), CACHE_KEYS_RANGE);
    }

    /**
     * Rewrites the snapshot metadata file of every started grid so that it contains the given request ID.
     *
     * @param newReqId Request ID that will be stored on all nodes.
     * @throws Exception If failed.
     */
    private void changeMetadataRequestIdOnDisk(UUID newReqId) throws Exception {
        for (Ignite grid : G.allGrids()) {
            IgniteSnapshotManager snpMgr = ((IgniteEx)grid).context().cache().context().snapshotMgr();

            String constId = grid.cluster().localNode().consistentId().toString();

            SnapshotMetadata metadata = snpMgr.readSnapshotMetadata(SNAPSHOT_NAME, constId);

            // Patch the in-memory copy first: opening FileOutputStream truncates the metafile, so a failure
            // of the reflective update must not happen while the file is already empty.
            GridTestUtils.setFieldValue(metadata, "rqId", newReqId);

            File smf = new File(snpMgr.snapshotLocalDir(SNAPSHOT_NAME),
                U.maskForFileName(constId) + SNAPSHOT_METAFILE_EXT);

            try (OutputStream out = new BufferedOutputStream(new FileOutputStream(smf))) {
                U.marshal(MarshallerUtils.jdkMarshaller(grid.name()), metadata, out);
            }
        }
    }

    /**
     * Test for failures of different types of handlers.
     * <p>
     * Both a CREATE and a RESTORE handler throw from {@code invoke} while their flag is set. Each operation is
     * first expected to fail with the handler's message and then to succeed once the flag is cleared.
     *
     * @throws Exception If fails.
     */
    @Test
    public void testClusterSnapshotHandlerFailure() throws Exception {
        String expMsg = "Test verification exception message.";

        AtomicBoolean failCreateFlag = new AtomicBoolean(true);
        AtomicBoolean failRestoreFlag = new AtomicBoolean(true);

        handlers.add(new SnapshotHandler<Void>() {
            @Override public SnapshotHandlerType type() {
                return SnapshotHandlerType.CREATE;
            }

            @Override public Void invoke(SnapshotHandlerContext ctx) throws IgniteCheckedException {
                if (failCreateFlag.get())
                    throw new IgniteCheckedException(expMsg);

                return null;
            }
        });

        handlers.add(new SnapshotHandler<Void>() {
            @Override public SnapshotHandlerType type() {
                return SnapshotHandlerType.RESTORE;
            }

            @Override public Void invoke(SnapshotHandlerContext ctx) throws IgniteCheckedException {
                if (failRestoreFlag.get())
                    throw new IgniteCheckedException(expMsg);

                return null;
            }
        });

        IgniteEx ignite = startGridsWithCache(2, CACHE_KEYS_RANGE, valueBuilder(), dfltCacheCfg);

        IgniteFuture<Void> fut = ignite.snapshot().createSnapshot(SNAPSHOT_NAME);

        GridTestUtils.assertThrowsAnyCause(log, () -> fut.get(TIMEOUT), IgniteCheckedException.class, expMsg);

        failCreateFlag.set(false);

        ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get(TIMEOUT);

        // The cache is destroyed before it is restored from the snapshot.
        ignite.cache(DEFAULT_CACHE_NAME).destroy();

        awaitPartitionMapExchange();

        IgniteFuture<Void> fut0 = ignite.snapshot().restoreSnapshot(SNAPSHOT_NAME, null);

        GridTestUtils.assertThrowsAnyCause(log, () -> fut0.get(TIMEOUT), IgniteCheckedException.class, expMsg);

        failRestoreFlag.set(false);

        ignite.snapshot().restoreSnapshot(SNAPSHOT_NAME, null).get(TIMEOUT);

        assertCacheKeys(ignite.cache(DEFAULT_CACHE_NAME), CACHE_KEYS_RANGE);
    }

    /**
     * Test ensures that the snapshot operation is aborted if different handlers are loaded on different nodes.
     * <p>
     * Node 0 is started once with a single handler; node 1 is restarted with a different content of
     * {@link #handlers} for each case.
     *
     * @throws Exception If fails.
     */
    @Test
    public void testClusterSnapshotHandlerConfigurationMismatch() throws Exception {
        SnapshotHandler<Void> defHnd = new SnapshotHandler<Void>() {
            @Override public SnapshotHandlerType type() {
                return SnapshotHandlerType.CREATE;
            }

            @Override public Void invoke(SnapshotHandlerContext ctx) {
                return null;
            }
        };

        handlers.add(defHnd);

        startGridsWithCache(1, CACHE_KEYS_RANGE, valueBuilder(), dfltCacheCfg);

        handlers.clear();

        startGrid(1);

        resetBaselineTopology();

        // Case 1: handler is loaded on only one of the two nodes.
        IgniteFuture<Void> fut0 = grid(0).snapshot().createSnapshot(SNAPSHOT_NAME);

        UUID nodeId1 = grid(1).localNode().id();

        GridTestUtils.assertThrowsAnyCause(log, () -> fut0.get(TIMEOUT), IgniteCheckedException.class,
            "handler is missing on the remote node(s). The current operation will be aborted [missing=[" + nodeId1 + "]]");

        stopGrid(1);

        // Case 2: different handlers are loaded on different nodes.
        handlers.add(new SnapshotHandler<Void>() {
            @Override public SnapshotHandlerType type() {
                return SnapshotHandlerType.CREATE;
            }

            @Nullable @Override public Void invoke(SnapshotHandlerContext ctx) {
                return null;
            }
        });

        startGrid(1);

        IgniteFuture<Void> fut1 = grid(0).snapshot().createSnapshot(SNAPSHOT_NAME);

        GridTestUtils.assertThrowsAnyCause(log, () -> fut1.get(TIMEOUT), IgniteCheckedException.class,
            "Snapshot handlers configuration mismatch (number of local snapshot handlers differs from the remote one)");

        stopGrid(1);

        // Case 3: one handler is loaded on two nodes, the other only on one.
        handlers.add(defHnd);

        startGrid(1);

        IgniteFuture<Void> fut2 = grid(0).snapshot().createSnapshot(SNAPSHOT_NAME);

        GridTestUtils.assertThrowsAnyCause(log, () -> fut2.get(TIMEOUT), IgniteCheckedException.class,
            "Snapshot handlers configuration mismatch (number of local snapshot handlers differs from the remote one)");

        stopGrid(1);

        // Make sure the operation was successful with the same configuration.
        handlers.clear();
        handlers.add(defHnd);

        startGrid(1);

        grid(1).snapshot().createSnapshot(SNAPSHOT_NAME).get(TIMEOUT);
    }

    /**
     * Test ensures that the snapshot creation is aborted if node exits while the {@link
     * SnapshotHandler#complete(String, Collection)} method is executed.
     * <p>
     * After the stopped node is started again, a new snapshot with the same name is expected to be created
     * successfully.
     *
     * @throws Exception If fails.
     */
    @Test
    public void testCrdChangeDuringHandlerCompleteOnSnapshotCreate() throws Exception {
        CountDownLatch latch = new CountDownLatch(1);

        handlers.add(new SnapshotHandler<Void>() {
            @Override public SnapshotHandlerType type() {
                return SnapshotHandlerType.CREATE;
            }

            @Override public Void invoke(SnapshotHandlerContext ctx) {
                return null;
            }

            @Override public void complete(String name, Collection<SnapshotHandlerResult<Void>> results)
                throws Exception {
                // Only the first invocation signals the test and then blocks; later invocations return at once.
                if (latch.getCount() == 1) {
                    latch.countDown();

                    Thread.sleep(Long.MAX_VALUE);
                }
            }
        });

        startGridsWithCache(2, CACHE_KEYS_RANGE, valueBuilder(), dfltCacheCfg);

        IgniteFuture<Void> fut = grid(1).snapshot().createSnapshot(SNAPSHOT_NAME);

        // Bounded wait: if the handler is never reached the test fails here instead of hanging.
        // TIMEOUT is the same value that is passed to IgniteFuture#get(long), which takes milliseconds.
        assertTrue("Snapshot handler 'complete' method was not invoked within the timeout.",
            latch.await(TIMEOUT, TimeUnit.MILLISECONDS));

        UUID crdNodeId = grid(0).localNode().id();

        stopGrid(0, true);

        GridTestUtils.assertThrowsAnyCause(log, () -> fut.get(TIMEOUT), ClusterTopologyCheckedException.class,
            "Snapshot operation interrupted, because baseline node left the cluster: " + crdNodeId);

        startGrid(0);

        // Wait for the result, otherwise a failure of this snapshot would go unnoticed.
        grid(0).snapshot().createSnapshot(SNAPSHOT_NAME).get(TIMEOUT);
    }
}