blob: 0744a35435b55acbb49cf44864f37a0dc014cd98 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal.cache;
import static org.apache.geode.cache.Region.SEPARATOR;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.same;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.util.HashSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import junitparams.JUnitParamsRunner;
import junitparams.Parameters;
import junitparams.naming.TestCaseName;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.junit.MockitoJUnit;
import org.mockito.junit.MockitoRule;
import org.mockito.quality.Strictness;
import org.mockito.stubbing.Answer;
import org.apache.geode.CancelCriterion;
import org.apache.geode.cache.CacheClosedException;
import org.apache.geode.cache.DataPolicy;
import org.apache.geode.cache.PartitionAttributes;
import org.apache.geode.cache.RegionDestroyedException;
import org.apache.geode.distributed.DistributedSystem;
import org.apache.geode.internal.cache.control.InternalResourceManager;
import org.apache.geode.internal.cache.partitioned.InternalPRInfo;
import org.apache.geode.internal.cache.partitioned.LoadProbe;
import org.apache.geode.internal.cache.partitioned.PartitionedRegionRebalanceOp;
import org.apache.geode.internal.cache.partitioned.PersistentBucketRecoverer;
import org.apache.geode.internal.cache.partitioned.RegionAdvisor;
import org.apache.geode.internal.cache.partitioned.rebalance.RebalanceDirector;
/**
 * Unit tests for {@link PRHARedundancyProvider}, run against Mockito mocks of its collaborators.
 *
 * <p>
 * The Mockito rule is configured with {@link Strictness#STRICT_STUBS}, so a stubbing that is
 * declared but never exercised fails the test. Each test therefore stubs only what it needs.
 */
@RunWith(JUnitParamsRunner.class)
public class PRHARedundancyProviderTest {

  private InternalCache cache;
  private PartitionedRegion partitionedRegion;
  private InternalResourceManager resourceManager;
  private PRHARedundancyProvider prHaRedundancyProvider;

  @Rule
  public MockitoRule mockitoRule = MockitoJUnit.rule().strictness(Strictness.STRICT_STUBS);

  /** Creates fresh, unstubbed mocks of the provider's collaborators before every test. */
  @Before
  public void setUp() {
    cache = mock(InternalCache.class);
    partitionedRegion = mock(PartitionedRegion.class);
    resourceManager = mock(InternalResourceManager.class);
  }

  /**
   * Calls {@code waitForPersistentBucketRecovery()} on a provider for which
   * {@code createPersistentBucketRecoverer} was never invoked.
   */
  @Test
  public void waitForPersistentBucketRecoveryProceedsWhenPersistentBucketRecovererLatchIsNotSet() {
    prHaRedundancyProvider = new PRHARedundancyProvider(partitionedRegion, resourceManager,
        (a, b) -> mock(PersistentBucketRecoverer.class));

    // No explicit assertion: the test passes when the call returns normally.
    prHaRedundancyProvider.waitForPersistentBucketRecovery();
  }

  /**
   * Creates a recoverer for a single proxy bucket, counts it down once, and verifies that
   * {@code waitForPersistentBucketRecovery()} calls {@code await()} on that recoverer.
   */
  @Test
  public void waitForPersistentBucketRecoveryProceedsAfterLatchCountDown() {
    when(cache.getRegion(PartitionedRegionHelper.PR_ROOT_REGION_NAME, true))
        .thenReturn(mock(DistributedRegion.class));
    when(partitionedRegion.getCache()).thenReturn(cache);
    when(partitionedRegion.getDataPolicy()).thenReturn(DataPolicy.PARTITION);
    when(partitionedRegion.getPartitionAttributes()).thenReturn(mock(PartitionAttributes.class));
    // Spy on a real recoverer (with its logging thread disabled) so await() can be verified.
    prHaRedundancyProvider = new PRHARedundancyProvider(partitionedRegion, resourceManager,
        (a, b) -> spy(new ThreadlessPersistentBucketRecoverer(a, b)));
    prHaRedundancyProvider.createPersistentBucketRecoverer(1);
    prHaRedundancyProvider.getPersistentBucketRecoverer().countDown();

    prHaRedundancyProvider.waitForPersistentBucketRecovery();

    verify(prHaRedundancyProvider.getPersistentBucketRecoverer()).await();
  }

  /**
   * Verifies that {@code buildPartitionedRegionInfo} copies the bucket and redundancy figures
   * reported by the region, its redundancy tracker and its region advisor into the result.
   */
  @Test
  public void buildPartitionedRegionInfo() {
    prHaRedundancyProvider = new PRHARedundancyProvider(partitionedRegion, resourceManager,
        (a, b) -> mock(PersistentBucketRecoverer.class));

    PartitionedRegionRedundancyTracker redundancyTracker =
        mock(PartitionedRegionRedundancyTracker.class);
    when(redundancyTracker.getActualRedundancy()).thenReturn(33);
    when(redundancyTracker.getLowRedundancyBuckets()).thenReturn(3);
    when(partitionedRegion.getRedundancyTracker()).thenReturn(redundancyTracker);
    when(partitionedRegion.getRedundantCopies()).thenReturn(12);

    RegionAdvisor regionAdvisor = mock(RegionAdvisor.class);
    when(regionAdvisor.adviseDataStore()).thenReturn(new HashSet<>());
    when(regionAdvisor.getCreatedBucketsCount()).thenReturn(17);
    when(partitionedRegion.getRegionAdvisor()).thenReturn(regionAdvisor);
    when(partitionedRegion.getTotalNumberOfBuckets()).thenReturn(42);

    InternalPRInfo internalPRInfo =
        prHaRedundancyProvider.buildPartitionedRegionInfo(false, mock(LoadProbe.class));

    assertThat(internalPRInfo.getConfiguredBucketCount()).isEqualTo(42);
    assertThat(internalPRInfo.getCreatedBucketCount()).isEqualTo(17);
    assertThat(internalPRInfo.getLowRedundancyBucketCount()).isEqualTo(3);
    assertThat(internalPRInfo.getConfiguredRedundantCopies()).isEqualTo(12);
    assertThat(internalPRInfo.getActualRedundantCopies()).isEqualTo(33);
  }

  /**
   * Verifies that starting redundancy recovery on a data store hands the provider's startup task
   * (the very same instance) to the resource manager.
   */
  @Test
  public void reportsStartupTaskToResourceManager() {
    @SuppressWarnings("unchecked")
    CompletableFuture<Void> providerStartupTask = mock(CompletableFuture.class);
    when(partitionedRegion.getRegionAdvisor()).thenReturn(mock(RegionAdvisor.class));
    when(partitionedRegion.getPartitionAttributes()).thenReturn(mock(PartitionAttributes.class));
    when(partitionedRegion.isDataStore()).thenReturn(true);
    when(resourceManager.getExecutor()).thenReturn(mock(ScheduledExecutorService.class));
    prHaRedundancyProvider = new PRHARedundancyProvider(partitionedRegion, resourceManager,
        (a, b) -> mock(PersistentBucketRecoverer.class),
        PRHARedundancyProviderTest::createRebalanceOp, providerStartupTask);

    prHaRedundancyProvider.startRedundancyRecovery();

    verify(resourceManager).addStartupTask(same(providerStartupTask));
  }

  /**
   * Verifies that no startup task is added to the resource manager when the partition attributes
   * report a startup recovery delay of {@code -1}.
   */
  @Test
  public void doNotNotReportStartupTaskIfRecoveryDelayIsNegative() {
    when(partitionedRegion.getRegionAdvisor()).thenReturn(mock(RegionAdvisor.class));
    PartitionAttributes partitionAttributes = mock(PartitionAttributes.class);
    when(partitionAttributes.getStartupRecoveryDelay()).thenReturn(-1L);
    when(partitionedRegion.getPartitionAttributes()).thenReturn(partitionAttributes);
    when(resourceManager.getExecutor()).thenReturn(mock(ScheduledExecutorService.class));
    prHaRedundancyProvider = new PRHARedundancyProvider(partitionedRegion, resourceManager,
        (a, b) -> mock(PersistentBucketRecoverer.class),
        PRHARedundancyProviderTest::createRebalanceOp, null);

    prHaRedundancyProvider.startRedundancyRecovery();

    verify(resourceManager, never()).addStartupTask(any());
  }

  /**
   * Verifies that no startup task is added to the resource manager when the region reports that
   * it is not a data store.
   */
  @Test
  public void doNotNotReportStartupTaskIfPartitionIsNotDataStore() {
    when(partitionedRegion.getRegionAdvisor()).thenReturn(mock(RegionAdvisor.class));
    when(partitionedRegion.getPartitionAttributes()).thenReturn(mock(PartitionAttributes.class));
    when(partitionedRegion.isDataStore()).thenReturn(false);
    when(resourceManager.getExecutor()).thenReturn(mock(ScheduledExecutorService.class));
    prHaRedundancyProvider = new PRHARedundancyProvider(partitionedRegion, resourceManager,
        (a, b) -> mock(PersistentBucketRecoverer.class),
        PRHARedundancyProviderTest::createRebalanceOp, null);

    prHaRedundancyProvider.startRedundancyRecovery();

    verify(resourceManager, never()).addStartupTask(any());
  }

  /**
   * Verifies that no startup task is added to the resource manager when the executor throws
   * {@link RejectedExecutionException} from {@code schedule}.
   */
  @Test
  public void doNotNotReportStartupTaskIfExecutorRejectsRebalanceTask() {
    when(partitionedRegion.getRegionAdvisor()).thenReturn(mock(RegionAdvisor.class));
    when(partitionedRegion.getPartitionAttributes()).thenReturn(mock(PartitionAttributes.class));
    when(partitionedRegion.isDataStore()).thenReturn(true);
    ScheduledExecutorService executor = mock(ScheduledExecutorService.class);
    when(executor.schedule(any(Runnable.class), anyLong(), any()))
        .thenThrow(new RejectedExecutionException("Rejected for the test"));
    when(resourceManager.getExecutor()).thenReturn(executor);
    prHaRedundancyProvider = new PRHARedundancyProvider(partitionedRegion, resourceManager,
        (a, b) -> mock(PersistentBucketRecoverer.class),
        PRHARedundancyProviderTest::createRebalanceOp, null);

    prHaRedundancyProvider.startRedundancyRecovery();

    verify(resourceManager, never()).addStartupTask(any());
  }

  /**
   * Verifies that no startup task is added to the resource manager when {@code shutdown()} was
   * called on the provider before redundancy recovery is started.
   */
  @Test
  public void doNotNotReportStartupTaskIfIsHasShutDown() {
    when(partitionedRegion.getRegionAdvisor()).thenReturn(mock(RegionAdvisor.class));
    when(partitionedRegion.getPartitionAttributes()).thenReturn(mock(PartitionAttributes.class));
    when(partitionedRegion.isDataStore()).thenReturn(true);
    when(resourceManager.getExecutor()).thenReturn(mock(ScheduledExecutorService.class));
    prHaRedundancyProvider = new PRHARedundancyProvider(partitionedRegion, resourceManager,
        (a, b) -> mock(PersistentBucketRecoverer.class),
        PRHARedundancyProviderTest::createRebalanceOp, null);
    prHaRedundancyProvider.shutdown();

    prHaRedundancyProvider.startRedundancyRecovery();

    verify(resourceManager, never()).addStartupTask(any());
  }

  /**
   * Verifies that the provider's startup task is completed once the scheduled recovery task has
   * run with a rebalance operation that does not throw.
   */
  @Test
  public void completesStartupTaskWhenRedundancyRecovered() {
    stubCollaboratorsForInlineRecovery();
    @SuppressWarnings("unchecked")
    CompletableFuture<Void> providerStartupTask = mock(CompletableFuture.class);
    prHaRedundancyProvider = new PRHARedundancyProvider(partitionedRegion, resourceManager,
        (a, b) -> mock(PersistentBucketRecoverer.class),
        PRHARedundancyProviderTest::createRebalanceOp, providerStartupTask);

    prHaRedundancyProvider.startRedundancyRecovery();

    verify(providerStartupTask).complete(any());
  }

  /**
   * Verifies that the provider's startup task is completed exceptionally, with the exact
   * exception instance thrown by the rebalance operation, and is never completed normally.
   *
   * @param exceptionToThrow selects the exception that the rebalance operation's
   *        {@code execute()} throws; one case per constant named in {@code @Parameters}
   */
  @Test
  @Parameters({"RUNTIME", "CANCEL", "REGION_DESTROYED"})
  @TestCaseName("{method}[{index}]: {params}")
  public void startTaskCompletesExceptionallyIfExceptionIsThrown(
      ExceptionToThrow exceptionToThrow) {
    stubCollaboratorsForInlineRecovery();
    @SuppressWarnings("unchecked")
    CompletableFuture<Void> providerStartupTask = mock(CompletableFuture.class);
    Exception exception = exceptionToThrow.getException();
    prHaRedundancyProvider = new PRHARedundancyProvider(partitionedRegion, resourceManager,
        (a, b) -> mock(PersistentBucketRecoverer.class),
        (a, b, c, d, e) -> createThrowingRebalanceOp(exception), providerStartupTask);

    prHaRedundancyProvider.startRedundancyRecovery();

    verify(providerStartupTask, never()).complete(any());
    verify(providerStartupTask).completeExceptionally(exception);
  }

  /**
   * Shared fixture for the tests that let the recovery task actually run: stubs the region as a
   * data store backed by the {@link #cache} mock, and makes the resource manager's executor run
   * every scheduled {@link Runnable} inline on the calling thread (see {@link #runTheRunnable()}).
   */
  private void stubCollaboratorsForInlineRecovery() {
    DistributedSystem distributedSystem = mock(DistributedSystem.class);
    when(distributedSystem.getCancelCriterion()).thenReturn(mock(CancelCriterion.class));
    when(cache.getDistributedSystem()).thenReturn(distributedSystem);

    when(partitionedRegion.getGemFireCache()).thenReturn(cache);
    when(partitionedRegion.getRegionAdvisor()).thenReturn(mock(RegionAdvisor.class));
    when(partitionedRegion.getPartitionAttributes()).thenReturn(mock(PartitionAttributes.class));
    when(partitionedRegion.isDataStore()).thenReturn(true);
    when(partitionedRegion.getPrStats()).thenReturn(mock(PartitionedRegionStats.class));

    ScheduledExecutorService executorService = mock(ScheduledExecutorService.class);
    when(resourceManager.getExecutor()).thenReturn(executorService);
    when(executorService.schedule(any(Runnable.class), anyLong(), any()))
        .thenAnswer(runTheRunnable());
  }

  /**
   * Rebalance-operation factory used via method reference; ignores all of its arguments and
   * returns an unstubbed mock.
   */
  private static PartitionedRegionRebalanceOp createRebalanceOp(PartitionedRegion region,
      boolean simulate, RebalanceDirector director, boolean replaceOfflineData,
      boolean isRebalance) {
    return mock(PartitionedRegionRebalanceOp.class);
  }

  /**
   * Returns a mock rebalance operation whose {@code execute()} throws the given exception.
   *
   * @param exception the exception to throw from {@code execute()}
   */
  private static PartitionedRegionRebalanceOp createThrowingRebalanceOp(Exception exception) {
    PartitionedRegionRebalanceOp rebalanceOp = mock(PartitionedRegionRebalanceOp.class);
    when(rebalanceOp.execute()).thenThrow(exception);
    return rebalanceOp;
  }

  /**
   * The exceptions exercised by {@link #startTaskCompletesExceptionallyIfExceptionIsThrown}. Each
   * constant holds a single exception instance that is created when the enum is initialized.
   */
  private enum ExceptionToThrow {
    RUNTIME(new RuntimeException("Runtime error")),
    CANCEL(new CacheClosedException("Cache closed")),
    REGION_DESTROYED(new RegionDestroyedException("Region destroyed", SEPARATOR + "Region"));

    private final Exception exception;

    ExceptionToThrow(Exception e) {
      exception = e;
    }

    Exception getException() {
      return exception;
    }
  }

  /**
   * A {@link PersistentBucketRecoverer} whose {@code startLoggingThread()} is overridden to do
   * nothing.
   */
  private static class ThreadlessPersistentBucketRecoverer extends PersistentBucketRecoverer {

    ThreadlessPersistentBucketRecoverer(
        PRHARedundancyProvider prhaRedundancyProvider, int proxyBuckets) {
      super(prhaRedundancyProvider, proxyBuckets);
    }

    @Override
    public void startLoggingThread() {
      // do nothing
    }
  }

  /**
   * Returns an {@link Answer} that treats the invocation's first argument as a {@link Runnable},
   * runs it synchronously, and then returns {@code null} to the caller.
   */
  private static Answer<?> runTheRunnable() {
    return (Answer<Object>) invocation -> {
      ((Runnable) invocation.getArgument(0)).run();
      return null;
    };
  }
}