blob: 3159c919399a7a7fa662c2bcd62fab0e00dd8fd1 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.table.distributed.gc;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.runRace;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrowFast;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willTimeoutFast;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedFast;
import static org.apache.ignite.internal.util.IgniteUtils.closeAllManually;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.lowwatermark.TestLowWatermark;
import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.schema.configuration.GcConfiguration;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
import org.apache.ignite.internal.util.PendingComparableValuesTracker;
import org.apache.ignite.lang.ErrorGroups.GarbageCollector;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.function.Executable;
/**
* For testing {@link MvGc}.
*/
@ExtendWith(ConfigurationExtension.class)
public class MvGcTest extends BaseIgniteAbstractTest {
private static final int PARTITION_ID = 0;
private MvGc gc;
private final AtomicInteger nextTableId = new AtomicInteger(1001);
@InjectConfiguration("mock.threads = 1")
private GcConfiguration gcConfig;
private final TestLowWatermark lowWatermark = new TestLowWatermark();
@BeforeEach
void setUp() {
gc = new MvGc("test", gcConfig, lowWatermark);
gc.start();
}
@AfterEach
void tearDown() throws Exception {
closeAllManually(gc);
}
@Test
void testAddStorageWithoutLowWatermark() {
CompletableFuture<Void> invokeVacuumMethodFuture = new CompletableFuture<>();
gc.addStorage(createTablePartitionId(), createWithCompleteFutureOnVacuum(invokeVacuumMethodFuture, null));
// We expect that GcUpdateHandler#vacuum will not be called.
assertThat(invokeVacuumMethodFuture, willTimeoutFast());
}
@Test
void testAddStorageWithLowWatermark() {
HybridTimestamp lowWatermark = new HybridTimestamp(1, 1);
this.lowWatermark.updateAndNotify(lowWatermark);
CompletableFuture<Void> invokeVacuumMethodFuture = new CompletableFuture<>();
gc.addStorage(createTablePartitionId(), createWithCompleteFutureOnVacuum(invokeVacuumMethodFuture, lowWatermark));
// We expect GcUpdateHandler#vacuum to be called with the set low watermark.
assertThat(invokeVacuumMethodFuture, willCompleteSuccessfully());
}
@Test
void testStartVacuumOnSuccessfulUpdateLowWatermark() {
CompletableFuture<Void> invokeVacuumMethodFuture0 = new CompletableFuture<>();
CompletableFuture<Void> invokeVacuumMethodFuture1 = new CompletableFuture<>();
HybridTimestamp lowWatermark0 = new HybridTimestamp(1, 1);
GcUpdateHandler gcUpdateHandler0 = createWithCompleteFutureOnVacuum(invokeVacuumMethodFuture0, lowWatermark0);
GcUpdateHandler gcUpdateHandler1 = createWithCompleteFutureOnVacuum(invokeVacuumMethodFuture1, lowWatermark0);
gc.addStorage(createTablePartitionId(), gcUpdateHandler0);
gc.addStorage(createTablePartitionId(), gcUpdateHandler1);
lowWatermark.updateAndNotify(lowWatermark0);
// We expect GcUpdateHandler#vacuum to be called with the set lowWatermark0.
assertThat(invokeVacuumMethodFuture0, willCompleteSuccessfully());
assertThat(invokeVacuumMethodFuture1, willCompleteSuccessfully());
// What happens if we increase low watermark ?
CompletableFuture<Void> invokeVacuumMethodFuture2 = new CompletableFuture<>();
CompletableFuture<Void> invokeVacuumMethodFuture3 = new CompletableFuture<>();
HybridTimestamp lowWatermark1 = new HybridTimestamp(2, 2);
completeFutureOnVacuum(gcUpdateHandler0, invokeVacuumMethodFuture2, lowWatermark1);
completeFutureOnVacuum(gcUpdateHandler1, invokeVacuumMethodFuture3, lowWatermark1);
lowWatermark.updateAndNotify(lowWatermark1);
// We expect GcUpdateHandler#vacuum to be called with the set lowWatermark0.
assertThat(invokeVacuumMethodFuture2, willCompleteSuccessfully());
assertThat(invokeVacuumMethodFuture3, willCompleteSuccessfully());
}
@Test
void testStartVacuumOnFailUpdateLowWatermark() {
HybridTimestamp firstLowWatermark = new HybridTimestamp(2, 2);
CompletableFuture<Void> invokeVacuumMethodFuture0 = new CompletableFuture<>();
CompletableFuture<Void> invokeVacuumMethodFuture1 = new CompletableFuture<>();
GcUpdateHandler gcUpdateHandler0 = createWithCompleteFutureOnVacuum(invokeVacuumMethodFuture0, firstLowWatermark);
GcUpdateHandler gcUpdateHandler1 = createWithCompleteFutureOnVacuum(invokeVacuumMethodFuture1, firstLowWatermark);
gc.addStorage(createTablePartitionId(), gcUpdateHandler0);
gc.addStorage(createTablePartitionId(), gcUpdateHandler1);
lowWatermark.updateAndNotify(firstLowWatermark);
// We expect GcUpdateHandler#vacuum to be called with the set lowWatermark0.
assertThat(invokeVacuumMethodFuture0, willCompleteSuccessfully());
assertThat(invokeVacuumMethodFuture1, willCompleteSuccessfully());
// What happens if we try set same low watermark ?
HybridTimestamp sameLowWatermark = new HybridTimestamp(2, 2);
CompletableFuture<Void> invokeVacuumMethodFutureForSame0 = new CompletableFuture<>();
CompletableFuture<Void> invokeVacuumMethodFutureForSame1 = new CompletableFuture<>();
completeFutureOnVacuum(gcUpdateHandler0, invokeVacuumMethodFutureForSame0, sameLowWatermark);
completeFutureOnVacuum(gcUpdateHandler1, invokeVacuumMethodFutureForSame1, sameLowWatermark);
lowWatermark.updateAndNotify(sameLowWatermark);
// We expect that GcUpdateHandler#vacuum will not be called.
assertThat(invokeVacuumMethodFutureForSame0, willTimeoutFast());
assertThat(invokeVacuumMethodFutureForSame1, willTimeoutFast());
// What happens if we try set same lower watermark ?
HybridTimestamp lowerLowWatermark = new HybridTimestamp(1, 1);
CompletableFuture<Void> invokeVacuumMethodFutureForLower0 = new CompletableFuture<>();
CompletableFuture<Void> invokeVacuumMethodFutureForLower1 = new CompletableFuture<>();
completeFutureOnVacuum(gcUpdateHandler0, invokeVacuumMethodFutureForLower0, lowerLowWatermark);
completeFutureOnVacuum(gcUpdateHandler1, invokeVacuumMethodFutureForLower1, lowerLowWatermark);
lowWatermark.updateAndNotify(lowerLowWatermark);
// We expect that GcUpdateHandler#vacuum will not be called.
assertThat(invokeVacuumMethodFutureForSame0, willTimeoutFast());
assertThat(invokeVacuumMethodFutureForSame1, willTimeoutFast());
assertThat(invokeVacuumMethodFutureForLower0, willTimeoutFast());
assertThat(invokeVacuumMethodFutureForLower1, willTimeoutFast());
}
@Test
void testCountInvokeVacuum() throws Exception {
CountDownLatch latch = new CountDownLatch(gcConfig.value().batchSize() + 2);
GcUpdateHandler gcUpdateHandler = createWithCountDownOnVacuum(latch);
gc.addStorage(createTablePartitionId(), gcUpdateHandler);
lowWatermark.updateAndNotify(new HybridTimestamp(2, 2));
assertTrue(latch.await(200, TimeUnit.MILLISECONDS));
}
@Test
void testRemoveStorageNotExist() {
assertThat(gc.removeStorage(createTablePartitionId()), willCompleteSuccessfully());
}
@Test
void testRemoveStorageForCompletedGc() {
CompletableFuture<Void> invokeVacuumMethodFuture = new CompletableFuture<>();
TablePartitionId tablePartitionId = createTablePartitionId();
gc.addStorage(tablePartitionId, createWithCompleteFutureOnVacuum(invokeVacuumMethodFuture, null));
lowWatermark.updateAndNotify(new HybridTimestamp(1, 1));
assertThat(invokeVacuumMethodFuture, willCompleteSuccessfully());
assertThat(gc.removeStorage(tablePartitionId), willCompleteSuccessfully());
// What happens if we delete it again?
assertThat(gc.removeStorage(tablePartitionId), willCompleteSuccessfully());
}
@Test
void testRemoveStorageInMiddleGc() {
CompletableFuture<Void> startInvokeVacuumMethodFuture = new CompletableFuture<>();
CompletableFuture<Void> finishInvokeVacuumMethodFuture = new CompletableFuture<>();
TablePartitionId tablePartitionId = createTablePartitionId();
gc.addStorage(tablePartitionId, createWithWaitFinishVacuum(startInvokeVacuumMethodFuture, finishInvokeVacuumMethodFuture));
lowWatermark.updateAndNotify(new HybridTimestamp(1, 1));
assertThat(startInvokeVacuumMethodFuture, willCompleteSuccessfully());
CompletableFuture<Void> removeStorageFuture = gc.removeStorage(tablePartitionId);
assertThat(removeStorageFuture, willTimeoutFast());
finishInvokeVacuumMethodFuture.complete(null);
assertThat(removeStorageFuture, willCompleteSuccessfully());
}
@Test
void testRemoveStorageWithError() {
CompletableFuture<Void> startInvokeVacuumMethodFuture = new CompletableFuture<>();
CompletableFuture<Void> finishInvokeVacuumMethodFuture = new CompletableFuture<>();
TablePartitionId tablePartitionId = createTablePartitionId();
gc.addStorage(tablePartitionId, createWithWaitFinishVacuum(startInvokeVacuumMethodFuture, finishInvokeVacuumMethodFuture));
lowWatermark.updateAndNotify(new HybridTimestamp(1, 1));
assertThat(startInvokeVacuumMethodFuture, willCompleteSuccessfully());
CompletableFuture<Void> removeStorageFuture = gc.removeStorage(tablePartitionId);
assertThat(removeStorageFuture, willTimeoutFast());
finishInvokeVacuumMethodFuture.completeExceptionally(new IllegalStateException("from test"));
assertThat(removeStorageFuture, willThrowFast(IllegalStateException.class));
}
@Test
void testRemoveStorage() {
CompletableFuture<Void> invokeVacuumMethodFuture0 = new CompletableFuture<>();
TablePartitionId tablePartitionId = createTablePartitionId();
GcUpdateHandler gcUpdateHandler = createWithCompleteFutureOnVacuum(invokeVacuumMethodFuture0, null);
gc.addStorage(tablePartitionId, gcUpdateHandler);
lowWatermark.updateAndNotify(new HybridTimestamp(1, 1));
assertThat(invokeVacuumMethodFuture0, willCompleteSuccessfully());
assertThat(gc.removeStorage(tablePartitionId), willCompleteSuccessfully());
// What happens if we update the low watermark?
CompletableFuture<Void> invokeVacuumMethodFuture1 = new CompletableFuture<>();
completeFutureOnVacuum(gcUpdateHandler, invokeVacuumMethodFuture1, null);
assertThat(invokeVacuumMethodFuture1, willTimeoutFast());
}
@Test
void testClose() throws Exception {
gc.close();
assertThrowsClosed(() -> gc.addStorage(createTablePartitionId(), createGcUpdateHandler()));
assertThrowsClosed(() -> gc.removeStorage(createTablePartitionId()));
assertDoesNotThrow(gc::close);
}
@Test
void testParallelUpdateLowWatermark(@InjectConfiguration GcConfiguration gcConfig) throws Exception {
// By default, in the tests we work in one thread, we don’t have enough this, we will add more.
assertThat(gcConfig.threads().update(Runtime.getRuntime().availableProcessors()), willCompleteSuccessfully());
gc.close();
gc = new MvGc("test", gcConfig, lowWatermark);
gc.start();
lowWatermark.updateAndNotify(new HybridTimestamp(1, 1));
for (int i = 0; i < 100; i++) {
CountDownLatch latch = new CountDownLatch(5);
TablePartitionId tablePartitionId = createTablePartitionId();
gc.addStorage(tablePartitionId, createWithCountDownOnVacuumWithoutNextBatch(latch));
runRace(
() -> gc.scheduleGcForAllStorages(),
() -> gc.scheduleGcForAllStorages(),
() -> gc.scheduleGcForAllStorages(),
() -> gc.scheduleGcForAllStorages()
);
// We will check that we will call the vacuum on each update of the low watermark.
assertTrue(latch.await(200, TimeUnit.MILLISECONDS), "remaining=" + latch.getCount());
assertThat(gc.removeStorage(tablePartitionId), willCompleteSuccessfully());
}
}
@Test
void testInvokeVacuumOnlyAfterReachSafeTime() {
CompletableFuture<Void> invokeVacuumMethodFuture = new CompletableFuture<>();
HybridTimestamp lvm = new HybridTimestamp(10, 10);
GcUpdateHandler gcUpdateHandler = createWithCompleteFutureOnVacuum(invokeVacuumMethodFuture, lvm);
PendingComparableValuesTracker<HybridTimestamp, Void> safeTimeTracker = spy(new PendingComparableValuesTracker<>(
HybridTimestamp.MIN_VALUE
));
when(gcUpdateHandler.getSafeTimeTracker()).thenReturn(safeTimeTracker);
gc.addStorage(createTablePartitionId(), gcUpdateHandler);
// Let's update the low watermark and see that we didn't start the garbage collection because we didn't reach the safe time.
lowWatermark.updateAndNotify(lvm);
assertThat(invokeVacuumMethodFuture, willTimeoutFast());
verify(safeTimeTracker).waitFor(lvm);
// Update the safe time to be equal to the low watermark and make sure the garbage collection starts.
safeTimeTracker.update(lvm, null);
assertThat(invokeVacuumMethodFuture, willSucceedFast());
}
private TablePartitionId createTablePartitionId() {
return new TablePartitionId(nextTableId.getAndIncrement(), PARTITION_ID);
}
private GcUpdateHandler createWithCompleteFutureOnVacuum(CompletableFuture<Void> future, @Nullable HybridTimestamp exp) {
GcUpdateHandler gcUpdateHandler = createGcUpdateHandler();
completeFutureOnVacuum(gcUpdateHandler, future, exp);
return gcUpdateHandler;
}
private void completeFutureOnVacuum(
GcUpdateHandler gcUpdateHandler,
CompletableFuture<Void> future,
@Nullable HybridTimestamp exp
) {
when(gcUpdateHandler.vacuumBatch(any(HybridTimestamp.class), anyInt(), eq(true))).then(invocation -> {
if (exp != null) {
try {
assertEquals(exp, invocation.getArgument(0));
future.complete(null);
} catch (Throwable t) {
future.completeExceptionally(t);
}
} else {
future.complete(null);
}
return false;
});
}
private GcUpdateHandler createWithCountDownOnVacuum(CountDownLatch latch) {
GcUpdateHandler gcUpdateHandler = createGcUpdateHandler();
when(gcUpdateHandler.vacuumBatch(any(HybridTimestamp.class), anyInt(), eq(true))).then(invocation -> {
latch.countDown();
return latch.getCount() > 0;
});
return gcUpdateHandler;
}
private GcUpdateHandler createWithWaitFinishVacuum(CompletableFuture<Void> startFuture, CompletableFuture<Void> finishFuture) {
GcUpdateHandler gcUpdateHandler = createGcUpdateHandler();
when(gcUpdateHandler.vacuumBatch(any(HybridTimestamp.class), anyInt(), eq(true))).then(invocation -> {
startFuture.complete(null);
finishFuture.get(1, TimeUnit.SECONDS);
return false;
});
return gcUpdateHandler;
}
private static void assertThrowsClosed(Executable executable) {
IgniteInternalException exception = assertThrows(IgniteInternalException.class, executable);
assertEquals(GarbageCollector.CLOSED_ERR, exception.code());
}
private GcUpdateHandler createWithCountDownOnVacuumWithoutNextBatch(CountDownLatch latch) {
GcUpdateHandler gcUpdateHandler = createGcUpdateHandler();
when(gcUpdateHandler.vacuumBatch(any(HybridTimestamp.class), anyInt(), eq(true))).then(invocation -> {
latch.countDown();
// So that there is no processing of the next batch.
return false;
});
return gcUpdateHandler;
}
private static GcUpdateHandler createGcUpdateHandler() {
GcUpdateHandler gcUpdateHandler = mock(GcUpdateHandler.class);
when(gcUpdateHandler.getSafeTimeTracker()).thenReturn(new PendingComparableValuesTracker<>(HybridTimestamp.MAX_VALUE));
return gcUpdateHandler;
}
}