blob: 80de9ab7e1348b490c8224dff7c2da6922dea4c4 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.util;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.cache.query.index.IndexProcessor;
import org.apache.ignite.internal.managers.indexing.IndexesRebuildTask;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.IgniteCacheProxy;
import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheFuture;
import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitorClosure;
import org.apache.ignite.internal.processors.query.schema.SchemaIndexOperationCancellationToken;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.SB;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.ListeningTestLogger;
import org.apache.ignite.testframework.LogListener;
import org.apache.ignite.testframework.MessageOrderLogListener;
import org.apache.ignite.util.GridCommandHandlerIndexingUtils.Person;
import org.jetbrains.annotations.Nullable;
import org.junit.Test;
import static java.lang.String.valueOf;
import static org.apache.ignite.internal.commandline.CommandHandler.EXIT_CODE_INVALID_ARGUMENTS;
import static org.apache.ignite.internal.commandline.CommandHandler.EXIT_CODE_OK;
import static org.apache.ignite.internal.commandline.CommandLogger.INDENT;
import static org.apache.ignite.testframework.GridTestUtils.assertContains;
import static org.apache.ignite.testframework.GridTestUtils.getFieldValue;
import static org.apache.ignite.testframework.GridTestUtils.runAsync;
import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
import static org.apache.ignite.util.GridCommandHandlerIndexingUtils.breakSqlIndex;
import static org.apache.ignite.util.GridCommandHandlerIndexingUtils.complexIndexEntity;
import static org.apache.ignite.util.GridCommandHandlerIndexingUtils.createAndFillCache;
import static org.apache.ignite.util.GridCommandHandlerIndexingUtils.createAndFillThreeFieldsEntryCache;
/**
* Test for --cache indexes_force_rebuild command. Uses single cluster per suite.
*/
public class GridCommandHandlerIndexForceRebuildTest extends GridCommandHandlerAbstractTest {
/** */
private static final String CACHE_NAME_1_1 = "cache_1_1";
/** */
private static final String CACHE_NAME_1_2 = "cache_1_2";
/** */
private static final String CACHE_NAME_2_1 = "cache_2_1";
/** */
private static final String CACHE_NAME_NO_GRP = "cache_no_group";
/** */
private static final String CACHE_NAME_NON_EXISTING = "non_existing_cache";
/** */
private static final String GRP_NAME_1 = "group_1";
/** */
private static final String GRP_NAME_2 = "group_2";
/** */
private static final String GRP_NAME_NON_EXISTING = "non_existing_group";
/** */
private static final int GRIDS_NUM = 3;
/** */
private static final int LAST_NODE_NUM = GRIDS_NUM - 1;
/**
* Map for blocking index rebuilds in a {@link BlockingIndexesRebuildTask}.
* To stop blocking, need to delete the entry.
* Mapping: cache name -> future start blocking rebuilding indexes.
*/
private static final Map<String, GridFutureAdapter<Void>> blockRebuildIdx = new ConcurrentHashMap<>();
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
ListeningTestLogger testLog = new ListeningTestLogger(log);
return super.getConfiguration(igniteInstanceName)
.setGridLogger(testLog);
}
/** {@inheritDoc} */
@Override protected void beforeTestsStarted() throws Exception {
super.beforeTestsStarted();
cleanPersistenceDir();
startupTestCluster();
}
/** {@inheritDoc} */
@Override protected void afterTestsStopped() throws Exception {
stopAllGrids();
cleanPersistenceDir();
super.afterTestsStopped();
}
/** {@inheritDoc} */
@Override protected void afterTest() throws Exception {
super.afterTest();
blockRebuildIdx.clear();
}
/** */
private void startupTestCluster() throws Exception {
for (int i = 0; i < GRIDS_NUM; i++ ) {
IndexProcessor.idxRebuildCls = BlockingIndexesRebuildTask.class;
startGrid(i);
}
IgniteEx ignite = grid(0);
ignite.cluster().active(true);
createAndFillCache(ignite, CACHE_NAME_1_1, GRP_NAME_1);
createAndFillCache(ignite, CACHE_NAME_1_2, GRP_NAME_1);
createAndFillCache(ignite, CACHE_NAME_2_1, GRP_NAME_2);
createAndFillThreeFieldsEntryCache(ignite, CACHE_NAME_NO_GRP, null, Collections.singletonList(complexIndexEntity()));
}
/**
* Checks error messages when trying to rebuild indexes for
* non-existent cache of group.
*/
@Test
public void testEmptyResult() {
injectTestSystemOut();
assertEquals(EXIT_CODE_OK, execute("--cache", "indexes_force_rebuild",
"--node-id", grid(LAST_NODE_NUM).localNode().id().toString(),
"--cache-names", CACHE_NAME_NON_EXISTING));
String cacheNamesOutputStr = testOut.toString();
assertTrue(cacheNamesOutputStr.contains("WARNING: Indexes rebuild was not started for any cache. Check command input."));
testOut.reset();
assertEquals(EXIT_CODE_OK, execute("--cache", "indexes_force_rebuild",
"--node-id", grid(LAST_NODE_NUM).localNode().id().toString(),
"--group-names", GRP_NAME_NON_EXISTING));
String grpNamesOutputStr = testOut.toString();
assertTrue(grpNamesOutputStr.contains("WARNING: Indexes rebuild was not started for any cache. Check command input."));
}
/**
* Checks that index on 2 fields is rebuilt correctly.
*/
@Test
public void testComplexIndexRebuild() throws IgniteInterruptedCheckedException {
injectTestSystemOut();
LogListener lsnr = installRebuildCheckListener(grid(LAST_NODE_NUM), CACHE_NAME_NO_GRP);
assertEquals(EXIT_CODE_OK, execute("--cache", "indexes_force_rebuild",
"--node-id", grid(LAST_NODE_NUM).localNode().id().toString(),
"--cache-names", CACHE_NAME_NO_GRP));
assertTrue(waitForIndexesRebuild(grid(LAST_NODE_NUM)));
assertTrue(lsnr.check());
removeLogListener(grid(LAST_NODE_NUM), lsnr);
}
/**
* Checks --node-id and --cache-names options,
* correctness of utility output and the fact that indexes were actually rebuilt.
*/
@Test
public void testCacheNamesArg() throws Exception {
blockRebuildIdx.put(CACHE_NAME_2_1, new GridFutureAdapter<>());
injectTestSystemOut();
LogListener[] cache1Listeners = new LogListener[GRIDS_NUM];
LogListener[] cache2Listeners = new LogListener[GRIDS_NUM];
try {
triggerIndexRebuild(LAST_NODE_NUM, Collections.singletonList(CACHE_NAME_2_1));
for (int i = 0; i < GRIDS_NUM; i++) {
cache1Listeners[i] = installRebuildCheckListener(grid(i), CACHE_NAME_1_1);
cache2Listeners[i] = installRebuildCheckListener(grid(i), CACHE_NAME_1_2);
}
assertEquals(EXIT_CODE_OK, execute("--cache", "indexes_force_rebuild",
"--node-id", grid(LAST_NODE_NUM).localNode().id().toString(),
"--cache-names", CACHE_NAME_1_1 + "," + CACHE_NAME_2_1 + "," + CACHE_NAME_NON_EXISTING));
blockRebuildIdx.remove(CACHE_NAME_2_1);
waitForIndexesRebuild(grid(LAST_NODE_NUM));
String outputStr = testOut.toString();
validateOutputCacheNamesNotFound(outputStr, CACHE_NAME_NON_EXISTING);
validateOutputIndicesRebuildingInProgress(outputStr, F.asMap(GRP_NAME_2, F.asList(CACHE_NAME_2_1)));
validateOutputIndicesRebuildWasStarted(outputStr, F.asMap(GRP_NAME_1, F.asList(CACHE_NAME_1_1)));
assertEquals("Unexpected number of lines in output.", 19, outputStr.split("\n").length);
// Index rebuild must be triggered only for cache1_1 and only on node3.
assertFalse(cache1Listeners[0].check());
assertFalse(cache1Listeners[1].check());
assertTrue(cache1Listeners[LAST_NODE_NUM].check());
for (LogListener cache2Lsnr: cache2Listeners)
assertFalse(cache2Lsnr.check());
}
finally {
blockRebuildIdx.remove(CACHE_NAME_2_1);
for (int i = 0; i < GRIDS_NUM; i++) {
removeLogListener(grid(i), cache1Listeners[i]);
removeLogListener(grid(i), cache2Listeners[i]);
}
assertTrue(waitForIndexesRebuild(grid(LAST_NODE_NUM)));
}
}
/**
* Checks --node-id and --group-names options,
* correctness of utility output and the fact that indexes were actually rebuilt.
*/
@Test
public void testGroupNamesArg() throws Exception {
blockRebuildIdx.put(CACHE_NAME_1_2, new GridFutureAdapter<>());
injectTestSystemOut();
LogListener[] cache1Listeners = new LogListener[GRIDS_NUM];
LogListener[] cache2Listeners = new LogListener[GRIDS_NUM];
try {
triggerIndexRebuild(LAST_NODE_NUM, Collections.singletonList(CACHE_NAME_1_2));
for (int i = 0; i < GRIDS_NUM; i++) {
cache1Listeners[i] = installRebuildCheckListener(grid(i), CACHE_NAME_1_1);
cache2Listeners[i] = installRebuildCheckListener(grid(i), CACHE_NAME_NO_GRP);
}
assertEquals(EXIT_CODE_OK, execute("--cache", "indexes_force_rebuild",
"--node-id", grid(LAST_NODE_NUM).localNode().id().toString(),
"--group-names", GRP_NAME_1 + "," + GRP_NAME_2 + "," + GRP_NAME_NON_EXISTING));
blockRebuildIdx.remove(CACHE_NAME_1_2);
waitForIndexesRebuild(grid(LAST_NODE_NUM));
String outputStr = testOut.toString();
validateOutputCacheGroupsNotFound(outputStr, GRP_NAME_NON_EXISTING);
validateOutputIndicesRebuildingInProgress(outputStr, F.asMap(GRP_NAME_1, F.asList(CACHE_NAME_1_2)));
validateOutputIndicesRebuildWasStarted(
outputStr,
F.asMap(
GRP_NAME_1, F.asList(CACHE_NAME_1_1),
GRP_NAME_2, F.asList(CACHE_NAME_2_1)
)
);
assertEquals("Unexpected number of lines in outputStr.", 20, outputStr.split("\n").length);
assertFalse(cache1Listeners[0].check());
assertFalse(cache1Listeners[1].check());
assertTrue(cache1Listeners[LAST_NODE_NUM].check());
for (LogListener cache2Lsnr: cache2Listeners)
assertFalse(cache2Lsnr.check());
}
finally {
blockRebuildIdx.remove(CACHE_NAME_1_2);
for (int i = 0; i < GRIDS_NUM; i++) {
removeLogListener(grid(i), cache1Listeners[i]);
removeLogListener(grid(i), cache2Listeners[i]);
}
assertTrue(waitForIndexesRebuild(grid(LAST_NODE_NUM)));
}
}
/**
* Checks illegal parameter after indexes_force_rebuild.
*/
@Test
public void testIllegalArgument() {
int code = execute("--cache", "indexes_force_rebuild", "--illegal_parameter");
assertEquals(1, code);
}
/**
* Checks client node id as an agrument. Command shoul
*
* @throws Exception If failed to start client node.
*/
@Test
public void testClientNodeConnection() throws Exception {
IgniteEx client = startGrid("client");
try {
assertEquals(EXIT_CODE_INVALID_ARGUMENTS, execute("--cache", "indexes_force_rebuild",
"--node-id", client.localNode().id().toString(),
"--group-names", GRP_NAME_1));
}
finally {
stopGrid("client");
}
}
/**
* Checks that 2 commands launch trigger async index rebuild.
*/
@Test
public void testAsyncIndexesRebuild() throws IgniteInterruptedCheckedException {
blockRebuildIdx.put(CACHE_NAME_1_1, new GridFutureAdapter<>());
blockRebuildIdx.put(CACHE_NAME_1_2, new GridFutureAdapter<>());
assertEquals(EXIT_CODE_OK, execute("--cache", "indexes_force_rebuild",
"--node-id", grid(0).localNode().id().toString(),
"--cache-names", CACHE_NAME_1_1));
assertTrue("Failed to wait for index rebuild start for first cache.",
GridTestUtils.waitForCondition(() -> getActiveRebuildCaches(grid(0)).size() == 1, 10_000));
assertEquals(EXIT_CODE_OK, execute("--cache", "indexes_force_rebuild",
"--node-id", grid(0).localNode().id().toString(),
"--cache-names", CACHE_NAME_1_2));
assertTrue("Failed to wait for index rebuild start for second cache.",
GridTestUtils.waitForCondition(() -> getActiveRebuildCaches(grid(0)).size() == 2, 10_000));
blockRebuildIdx.clear();
assertTrue("Failed to wait for final index rebuild.", waitForIndexesRebuild(grid(0)));
}
/**
* Checks how index force rebuild command behaves when caches are under load.
*
* @throws Exception If failed.
*/
@Test
public void testIndexRebuildUnderLoad() throws Exception {
IgniteEx n = grid(0);
AtomicBoolean stopLoad = new AtomicBoolean(false);
String cacheName1 = "tmpCache1";
String cacheName2 = "tmpCache2";
List<String> caches = F.asList(cacheName1, cacheName2);
try {
for (String c : caches)
createAndFillCache(n, c, "tmpGrp");
int cacheSize = n.cache(cacheName1).size();
for (String c : caches)
blockRebuildIdx.put(c, new GridFutureAdapter<>());
assertEquals(EXIT_CODE_OK, execute("--cache", "indexes_force_rebuild",
"--node-id", n.localNode().id().toString(),
"--cache-names", cacheName1 + "," + cacheName2));
IgniteInternalFuture<?> putCacheFut = runAsync(() -> {
ThreadLocalRandom r = ThreadLocalRandom.current();
while (!stopLoad.get())
n.cache(cacheName1).put(r.nextInt(), new Person(r.nextInt(), valueOf(r.nextLong())));
});
assertTrue(waitForCondition(() -> n.cache(cacheName1).size() > cacheSize, getTestTimeout()));
for (String c : caches) {
IgniteInternalFuture<?> rebIdxFut = n.context().query().indexRebuildFuture(CU.cacheId(c));
assertNotNull(rebIdxFut);
assertFalse(rebIdxFut.isDone());
blockRebuildIdx.get(c).get(getTestTimeout());
}
IgniteInternalFuture<Boolean> destroyCacheFut = n.context().cache()
.dynamicDestroyCache(cacheName2, false, true, false, null);
SchemaIndexCacheFuture intlRebIdxFut = schemaIndexCacheFuture(n, CU.cacheId(cacheName2));
assertNotNull(intlRebIdxFut);
assertTrue(waitForCondition(intlRebIdxFut.cancelToken()::isCancelled, getTestTimeout()));
stopLoad.set(true);
blockRebuildIdx.clear();
waitForIndexesRebuild(n);
intlRebIdxFut.get(getTestTimeout());
destroyCacheFut.get(getTestTimeout());
putCacheFut.get(getTestTimeout());
injectTestSystemOut();
assertEquals(EXIT_CODE_OK, execute("--cache", "validate_indexes", "--check-crc", cacheName1));
assertContains(log, testOut.toString(), "no issues found.");
}
finally {
stopLoad.set(true);
blockRebuildIdx.clear();
n.destroyCache(cacheName1);
n.destroyCache(cacheName2);
}
}
/**
* Checks that corrupted index is successfully rebuilt by the command.
*/
@Test
public void testCorruptedIndexRebuild() throws Exception {
IgniteEx ignite = grid(0);
final String cacheName = "tmpCache";
final String grpName = "tmpGrp";
try {
createAndFillCache(ignite, cacheName, grpName);
breakSqlIndex(ignite.cachex(cacheName), 1, null);
injectTestSystemOut();
assertEquals(EXIT_CODE_OK, execute("--cache", "validate_indexes", "--check-crc", "--check-sizes"));
assertContains(log, testOut.toString(), "issues found (listed above)");
testOut.reset();
assertEquals(EXIT_CODE_OK, execute("--cache", "indexes_force_rebuild",
"--node-id", ignite.localNode().id().toString(),
"--cache-names", cacheName));
assertTrue(waitForIndexesRebuild(ignite));
forceCheckpoint(ignite);
assertEquals(EXIT_CODE_OK, execute("--cache", "validate_indexes", "--check-crc", cacheName));
assertContains(log, testOut.toString(), "no issues found.");
}
finally {
ignite.destroyCache(cacheName);
}
}
/**
* Checking that a sequence of forced rebuild of indexes is possible
*
* @throws Exception If failed.
*/
@Test
public void testSequentialForceRebuildIndexes() throws Exception {
IgniteEx grid = grid(0);
injectTestSystemOut();
String outputStr;
forceRebuildIndices(F.asList(CACHE_NAME_1_1), grid);
outputStr = testOut.toString();
validateOutputIndicesRebuildWasStarted(outputStr, F.asMap(GRP_NAME_1, F.asList(CACHE_NAME_1_1)));
assertFalse(outputStr.contains("WARNING: These caches have indexes rebuilding in progress:"));
forceRebuildIndices(F.asList(CACHE_NAME_1_1), grid);
validateOutputIndicesRebuildWasStarted(outputStr, F.asMap(GRP_NAME_1, F.asList(CACHE_NAME_1_1)));
assertFalse(outputStr.contains("WARNING: These caches have indexes rebuilding in progress:"));
}
/**
* Validates control.sh output when caches by name not found.
*
* @param outputStr CLI {@code control.sh} utility output.
* @param cacheNames Cache names to print.
*/
private void validateOutputCacheNamesNotFound(String outputStr, String... cacheNames) {
assertContains(
log,
outputStr,
"WARNING: These caches were not found:" + U.nl() + makeStringListWithIndent(cacheNames)
);
}
/**
* Validates control.sh output when caches by group not found.
*
* @param outputStr CLI {@code control.sh} utility output.
* @param cacheGrps Cache groups to print.
*/
private void validateOutputCacheGroupsNotFound(String outputStr, String... cacheGrps) {
assertContains(
log,
outputStr,
"WARNING: These cache groups were not found:" + U.nl() + makeStringListWithIndent(cacheGrps)
);
}
/**
* Makes new-line List with indent.
* @param strings List of strings.
* @return Formated text.
*/
private String makeStringListWithIndent(String... strings) {
return INDENT + String.join(U.nl() + INDENT, strings);
}
/**
* Makes formatted text for given caches.
*
* @param cacheGroputToNames Cache groups mapping to non-existing cache names.
* @return Text for CLI print output for given caches.
*/
private String makeStringListForCacheGroupsAndNames(Map<String, List<String>> cacheGroputToNames) {
SB sb = new SB();
for (Map.Entry<String, List<String>> entry : cacheGroputToNames.entrySet()) {
String cacheGrp = entry.getKey();
for (String cacheName : entry.getValue())
sb.a(INDENT).a("groupName=").a(cacheGrp).a(", cacheName=").a(cacheName).a(U.nl());
}
return sb.toString();
}
/**
* Validates control.sh output when some indices rebuilt in progress.
*
* @param outputStr CLI {@code control.sh} utility output.
* @param cacheGroputToNames Cache groups mapping to non-existing cache names.
*/
private void validateOutputIndicesRebuildingInProgress(String outputStr, Map<String, List<String>> cacheGroputToNames) {
String caches = makeStringListForCacheGroupsAndNames(cacheGroputToNames);
assertContains(
log,
outputStr,
"WARNING: These caches have indexes rebuilding in progress:" + U.nl() + caches
);
}
/**
* Validates control.sh output when indices started to rebuild.
*
* @param outputStr CLI {@code control.sh} utility output.
* @param cacheGroputToNames Cache groups mapping to non-existing cache names.
*/
private void validateOutputIndicesRebuildWasStarted(String outputStr, Map<String, List<String>> cacheGroputToNames) {
String caches = makeStringListForCacheGroupsAndNames(cacheGroputToNames);
assertContains(
log,
outputStr,
"Indexes rebuild was started for these caches:" + U.nl() + caches
);
}
/**
* Triggers indexes rebuild for ALL caches on grid node with index {@code igniteIdx}.
*
* @param igniteIdx Node index.
* @param excludedCacheNames Collection of cache names for which
* end of index rebuilding is not awaited.
* @throws Exception if failed.
*/
private void triggerIndexRebuild(int igniteIdx, Collection<String> excludedCacheNames) throws Exception {
stopGrid(igniteIdx);
GridTestUtils.deleteIndexBin(getTestIgniteInstanceName(2));
IndexProcessor.idxRebuildCls = BlockingIndexesRebuildTask.class;
final IgniteEx ignite = startGrid(igniteIdx);
resetBaselineTopology();
awaitPartitionMapExchange();
waitForIndexesRebuild(ignite, 30_000, excludedCacheNames);
}
/** */
private boolean waitForIndexesRebuild(IgniteEx ignite) throws IgniteInterruptedCheckedException {
return waitForIndexesRebuild(ignite, 30_000, Collections.emptySet());
}
/**
* @param ignite Ignite instance.
* @param timeout timeout
* @param excludedCacheNames Collection of cache names for which
* end of index rebuilding is not awaited.
* @return {@code True} if index rebuild was completed before {@code timeout} was reached.
* @throws IgniteInterruptedCheckedException if failed.
*/
private boolean waitForIndexesRebuild(IgniteEx ignite, long timeout, Collection<String> excludedCacheNames)
throws IgniteInterruptedCheckedException
{
return GridTestUtils.waitForCondition(
() -> ignite.context().cache().publicCaches()
.stream()
.filter(c -> !excludedCacheNames.contains(c.getName()))
.allMatch(c -> c.indexReadyFuture().isDone()),
timeout);
}
/**
* @param ignite Node from which caches will be collected.
* @return {@code Set} of ignite caches that have index rebuild in process.
*/
private Set<IgniteCacheProxy<?, ?>> getActiveRebuildCaches(IgniteEx ignite) {
return ignite.context().cache().publicCaches()
.stream()
.filter(c -> !c.indexReadyFuture().isDone())
.collect(Collectors.toSet());
}
/**
* @param ignite IgniteEx instance.
* @param cacheName Name of checked cache.
* @return newly installed LogListener.
*/
private LogListener installRebuildCheckListener(IgniteEx ignite, String cacheName) {
final MessageOrderLogListener lsnr = new MessageOrderLogListener(
new MessageOrderLogListener.MessageGroup(true)
.add("Started indexes rebuilding for cache \\[name=" + cacheName + ".*")
.add("Finished indexes rebuilding for cache \\[name=" + cacheName + ".*")
);
ListeningTestLogger impl = GridTestUtils.getFieldValue(ignite.log(), "impl");
assertNotNull(impl);
impl.registerListener(lsnr);
return lsnr;
}
/** */
private void removeLogListener(IgniteEx ignite, LogListener lsnr) {
ListeningTestLogger impl = GridTestUtils.getFieldValue(ignite.log(), "impl");
assertNotNull(impl);
impl.unregisterListener(lsnr);
}
/**
* Indexing that blocks index rebuild until status request is completed.
*/
private static class BlockingIndexesRebuildTask extends IndexesRebuildTask {
/** {@inheritDoc} */
@Override protected void startRebuild(GridCacheContext cctx, GridFutureAdapter<Void> fut,
SchemaIndexCacheVisitorClosure clo, SchemaIndexOperationCancellationToken cancel) {
super.startRebuild(cctx, new BlockingRebuildIdxFuture(fut, cctx), clo, cancel);
}
}
/**
* Modified rebuild indexes future which is blocked right before finishing for specific caches.
*/
private static class BlockingRebuildIdxFuture extends GridFutureAdapter<Void> {
/** */
private final GridFutureAdapter<Void> original;
/** */
private final GridCacheContext cctx;
/** */
BlockingRebuildIdxFuture(GridFutureAdapter<Void> original, GridCacheContext cctx) {
this.original = original;
this.cctx = cctx;
}
/** {@inheritDoc} */
@Override public boolean onDone(@Nullable Void res, @Nullable Throwable err) {
try {
GridFutureAdapter<Void> fut = blockRebuildIdx.get(cctx.name());
if (fut != null) {
fut.onDone();
assertTrue("Failed to wait for indexes rebuild unblocking",
GridTestUtils.waitForCondition(() -> !blockRebuildIdx.containsKey(cctx.name()), 60_000));
}
}
catch (IgniteInterruptedCheckedException e) {
fail("Waiting for indexes rebuild unblocking was interrupted");
}
return original.onDone(res, err);
}
}
/**
* Getting internal index rebuild future for cache.
*
* @param n Node.
* @param cacheId Cache id.
* @return Internal index rebuild future.
*/
@Nullable private SchemaIndexCacheFuture schemaIndexCacheFuture(IgniteEx n, int cacheId) {
IndexesRebuildTask idxRebuild = n.context().indexProcessor().idxRebuild();
Map<Integer, SchemaIndexCacheFuture> idxRebuildFuts = getFieldValue(idxRebuild, "idxRebuildFuts");
return idxRebuildFuts.get(cacheId);
}
/**
* Force rebuilds indices for chosen caches, and waits until rebuild process is complete.
*
* @param cacheNames Cache names need indices to rebuild.
* @param grid Ignite node.
* @throws Exception If failed.
*/
private void forceRebuildIndices(Iterable<String> cacheNames, IgniteEx grid) throws Exception {
String cacheNamesArg = String.join(",", cacheNames);
assertEquals(
EXIT_CODE_OK,
execute(
"--cache", "indexes_force_rebuild",
"--node-id", grid.localNode().id().toString(),
"--cache-names", cacheNamesArg
)
);
waitForIndexesRebuild(grid, getTestTimeout(), Collections.emptyList());
}
}