blob: c2ea3c21923022e09070f8e3c5ed73b99f9f9730 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.util;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.function.UnaryOperator;
import java.util.logging.Formatter;
import java.util.logging.LogRecord;
import java.util.logging.Logger;
import java.util.logging.StreamHandler;
import java.util.regex.Pattern;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cluster.ClusterState;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteNodeAttributes;
import org.apache.ignite.internal.commandline.CommandHandler;
import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
import org.apache.ignite.internal.processors.cache.persistence.defragmentation.maintenance.DefragmentationParameters;
import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
import org.apache.ignite.maintenance.MaintenanceTask;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.ListeningTestLogger;
import org.apache.ignite.testframework.LogListener;
import org.junit.Test;
import static org.apache.ignite.cluster.ClusterState.ACTIVE;
import static org.apache.ignite.internal.commandline.CommandHandler.EXIT_CODE_INVALID_ARGUMENTS;
import static org.apache.ignite.internal.commandline.CommandHandler.EXIT_CODE_OK;
/** */
public class GridCommandHandlerDefragmentationTest extends GridCommandHandlerClusterPerMethodAbstractTest {
/** */
private static CountDownLatch blockCdl;
/** */
private static CountDownLatch waitCdl;
/** {@inheritDoc} */
@Override protected void beforeTest() throws Exception {
super.beforeTest();
stopAllGrids();
cleanPersistenceDir();
}
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
cfg.getDataStorageConfiguration().setWalSegmentSize(512 * 1024).setWalSegments(3);
return cfg;
}
/**
* @throws Exception If failed.
*/
@Test
public void testDefragmentationSchedule() throws Exception {
Ignite ignite = startGrids(2);
ignite.cluster().state(ACTIVE);
assertEquals(EXIT_CODE_INVALID_ARGUMENTS, execute("--defragmentation", "schedule"));
String grid0ConsId = grid(0).configuration().getConsistentId().toString();
String grid1ConsId = grid(1).configuration().getConsistentId().toString();
ListeningTestLogger testLog = new ListeningTestLogger();
CommandHandler cmd = createCommandHandler(testLog);
LogListener logLsnr = LogListener.matches("Scheduling completed successfully.").build();
testLog.registerListener(logLsnr);
assertEquals(EXIT_CODE_OK, execute(
cmd,
"--defragmentation",
"schedule",
"--nodes",
grid0ConsId
));
assertTrue(logLsnr.check());
MaintenanceTask mntcTask = DefragmentationParameters.toStore(Collections.emptyList());
assertNotNull(grid(0).context().maintenanceRegistry().registerMaintenanceTask(mntcTask));
assertNull(grid(1).context().maintenanceRegistry().registerMaintenanceTask(mntcTask));
stopGrid(0);
startGrid(0);
logLsnr = LogListener.matches("Node is already in Maintenance Mode").build();
testLog.clearListeners();
testLog.registerListener(logLsnr);
assertEquals(EXIT_CODE_OK, execute(
cmd,
"--defragmentation",
"schedule",
"--nodes",
grid0ConsId
));
assertTrue(logLsnr.check());
stopGrid(0);
startGrid(0);
stopGrid(1);
startGrid(1);
stopAllGrids();
startGrids(2);
logLsnr = LogListener.matches("Scheduling completed successfully.").times(2).build();
testLog.clearListeners();
testLog.registerListener(logLsnr);
assertEquals(EXIT_CODE_OK, execute(
cmd,
"--defragmentation",
"schedule",
"--nodes",
String.join(",", grid0ConsId, grid1ConsId)
));
assertTrue(logLsnr.check());
}
/**
* @throws Exception If failed.
*/
@Test
public void testDefragmentationCancel() throws Exception {
Ignite ignite = startGrids(2);
ignite.cluster().state(ACTIVE);
String grid0ConsId = grid(0).configuration().getConsistentId().toString();
ListeningTestLogger testLog = new ListeningTestLogger();
CommandHandler cmd = createCommandHandler(testLog);
assertEquals(EXIT_CODE_OK, execute(
cmd,
"--defragmentation",
"schedule",
"--nodes",
grid0ConsId
));
LogListener logLsnr = LogListener.matches("Scheduled defragmentation task cancelled successfully.").atLeast(1).build();
testLog.registerListener(logLsnr);
assertEquals(EXIT_CODE_OK, execute(
cmd,
"--port",
grid(0).localNode().attribute(IgniteNodeAttributes.ATTR_REST_TCP_PORT).toString(),
"--defragmentation",
"cancel"
));
assertTrue(logLsnr.check());
testLog.clearListeners();
logLsnr = LogListener.matches("Scheduled defragmentation task is not found.").build();
testLog.registerListener(logLsnr);
assertEquals(EXIT_CODE_OK, execute(
cmd,
"--port",
grid(1).localNode().attribute(IgniteNodeAttributes.ATTR_REST_TCP_PORT).toString(),
"--defragmentation",
"cancel"
));
assertTrue(logLsnr.check());
}
/**
* @throws Exception If failed.
*/
@Test
public void testDefragmentationCancelInProgress() throws Exception {
IgniteEx ig = startGrid(0);
ig.cluster().state(ClusterState.ACTIVE);
IgniteCache<Object, Object> cache = ig.getOrCreateCache(DEFAULT_CACHE_NAME);
for (int i = 0; i < 1024; i++)
cache.put(i, i);
forceCheckpoint(ig);
String grid0ConsId = ig.configuration().getConsistentId().toString();
ListeningTestLogger testLog = new ListeningTestLogger();
CommandHandler cmd = createCommandHandler(testLog);
assertEquals(EXIT_CODE_OK, execute(
cmd,
"--defragmentation",
"schedule",
"--nodes",
grid0ConsId
));
String port = grid(0).localNode().attribute(IgniteNodeAttributes.ATTR_REST_TCP_PORT).toString();
stopGrid(0);
blockCdl = new CountDownLatch(128);
UnaryOperator<IgniteConfiguration> cfgOp = cfg -> {
DataStorageConfiguration dsCfg = cfg.getDataStorageConfiguration();
FileIOFactory delegate = dsCfg.getFileIOFactory();
dsCfg.setFileIOFactory((file, modes) -> {
if (file.getName().contains("dfrg")) {
if (blockCdl.getCount() == 0) {
try {
// Slow down defragmentation process.
// This'll be enough for the test since we have, like, 900 partitions left.
Thread.sleep(100);
}
catch (InterruptedException ignore) {
// No-op.
}
}
else
blockCdl.countDown();
}
return delegate.create(file, modes);
});
return cfg;
};
IgniteInternalFuture<?> fut = GridTestUtils.runAsync(() -> {
try {
startGrid(0, cfgOp);
}
catch (Exception e) {
// No-op.
throw new RuntimeException(e);
}
});
blockCdl.await();
LogListener logLsnr = LogListener.matches("Defragmentation cancelled successfully.").build();
testLog.registerListener(logLsnr);
assertEquals(EXIT_CODE_OK, execute(
cmd,
"--port",
port,
"--defragmentation",
"cancel"
));
assertTrue(logLsnr.check());
fut.get();
testLog.clearListeners();
logLsnr = LogListener.matches("Defragmentation is already completed or has been cancelled previously.").build();
testLog.registerListener(logLsnr);
assertEquals(EXIT_CODE_OK, execute(
cmd,
"--port",
port,
"--defragmentation",
"cancel"
));
assertTrue(logLsnr.check());
}
/**
* @throws Exception If failed.
*/
@Test
public void testDefragmentationStatus() throws Exception {
IgniteEx ig = startGrid(0);
ig.cluster().state(ClusterState.ACTIVE);
ig.getOrCreateCache(DEFAULT_CACHE_NAME + "1");
IgniteCache<Object, Object> cache = ig.getOrCreateCache(DEFAULT_CACHE_NAME + "2");
ig.getOrCreateCache(DEFAULT_CACHE_NAME + "3");
for (int i = 0; i < 1024; i++)
cache.put(i, i);
forceCheckpoint(ig);
String grid0ConsId = ig.configuration().getConsistentId().toString();
ListeningTestLogger testLog = new ListeningTestLogger();
CommandHandler cmd = createCommandHandler(testLog);
assertEquals(EXIT_CODE_OK, execute(
cmd,
"--defragmentation",
"schedule",
"--nodes",
grid0ConsId
));
String port = grid(0).localNode().attribute(IgniteNodeAttributes.ATTR_REST_TCP_PORT).toString();
stopGrid(0);
blockCdl = new CountDownLatch(128);
waitCdl = new CountDownLatch(1);
UnaryOperator<IgniteConfiguration> cfgOp = cfg -> {
DataStorageConfiguration dsCfg = cfg.getDataStorageConfiguration();
FileIOFactory delegate = dsCfg.getFileIOFactory();
dsCfg.setFileIOFactory((file, modes) -> {
if (file.getName().contains("dfrg")) {
if (blockCdl.getCount() == 0) {
try {
waitCdl.await();
}
catch (InterruptedException ignore) {
// No-op.
}
}
else
blockCdl.countDown();
}
return delegate.create(file, modes);
});
return cfg;
};
IgniteInternalFuture<?> fut = GridTestUtils.runAsync(() -> {
try {
startGrid(0, cfgOp);
}
catch (Exception e) {
// No-op.
throw new RuntimeException(e);
}
});
blockCdl.await();
List<LogListener> logLsnrs = Arrays.asList(
LogListener.matches("default1 - size before/after: 0MB/0MB").build(),
LogListener.matches("default2 - partitions processed/all:").build(),
LogListener.matches("Awaiting defragmentation: default3").build()
);
for (LogListener logLsnr : logLsnrs)
testLog.registerListener(logLsnr);
assertEquals(EXIT_CODE_OK, execute(
cmd,
"--port",
port,
"--defragmentation",
"status"
));
waitCdl.countDown();
for (LogListener logLsnr : logLsnrs)
assertTrue(logLsnr.check());
fut.get();
((GridCacheDatabaseSharedManager)grid(0).context().cache().context().database())
.defragmentationManager()
.completionFuture()
.get();
testLog.clearListeners();
logLsnrs = Arrays.asList(
LogListener.matches("default1 - size before/after: 0MB/0MB").build(),
LogListener.matches(Pattern.compile("default2 - size before/after: (\\S+)/\\1")).build(),
LogListener.matches("default3 - size before/after: 0MB/0MB").build()
);
for (LogListener logLsnr : logLsnrs)
testLog.registerListener(logLsnr);
assertEquals(EXIT_CODE_OK, execute(
cmd,
"--port",
port,
"--defragmentation",
"status"
));
for (LogListener logLsnr : logLsnrs)
assertTrue(logLsnr.check());
}
/** */
private CommandHandler createCommandHandler(ListeningTestLogger testLog) {
Logger log = CommandHandler.initLogger(null);
log.addHandler(new StreamHandler(System.out, new Formatter() {
/** {@inheritDoc} */
@Override public String format(LogRecord record) {
String msg = record.getMessage();
testLog.info(msg);
return msg + "\n";
}
}));
return new CommandHandler(log);
}
}