blob: 742db8a8909990ad65f613e21a0d450799e40c9d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal.cache;
import static java.util.Collections.emptySet;
import static org.apache.geode.cache.asyncqueue.internal.AsyncEventQueueImpl.getSenderIdFromAsyncEventQueueId;
import static org.apache.geode.internal.statistics.StatisticsClockFactory.disabledClock;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatCode;
import static org.assertj.core.api.Assertions.catchThrowable;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isNull;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.mockito.quality.Strictness.STRICT_STUBS;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import junitparams.JUnitParamsRunner;
import junitparams.Parameters;
import junitparams.naming.TestCaseName;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.junit.MockitoJUnit;
import org.mockito.junit.MockitoRule;
import org.apache.geode.CancelCriterion;
import org.apache.geode.Statistics;
import org.apache.geode.cache.AttributesFactory;
import org.apache.geode.cache.CacheLoader;
import org.apache.geode.cache.CacheWriter;
import org.apache.geode.cache.Operation;
import org.apache.geode.cache.PartitionAttributesFactory;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionDestroyedException;
import org.apache.geode.cache.TransactionDataRebalancedException;
import org.apache.geode.cache.TransactionException;
import org.apache.geode.cache.asyncqueue.AsyncEventQueue;
import org.apache.geode.cache.wan.GatewaySender;
import org.apache.geode.distributed.DistributedLockService;
import org.apache.geode.distributed.internal.DSClock;
import org.apache.geode.distributed.internal.DistributionManager;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.cache.control.InternalResourceManager;
import org.apache.geode.internal.cache.partitioned.FetchKeysMessage;
import org.apache.geode.internal.cache.partitioned.colocation.ColocationLoggerFactory;
import org.apache.geode.internal.cache.tier.sockets.ServerConnection;
import org.apache.geode.internal.cache.tier.sockets.VersionedObjectList;
/**
 * Unit tests for {@link PartitionedRegion}, exercised against Mockito mocks of the cache and the
 * distributed system rather than a running member.
 */
@RunWith(JUnitParamsRunner.class)
@SuppressWarnings({"deprecation", "unchecked", "unused"})
public class PartitionedRegionTest {
// Mocked collaborators and fixtures shared by the tests.
private InternalCache cache;
private InternalDistributedSystem system;
private DistributionManager distributionManager;
private InternalResourceManager resourceManager;
private AttributesFactory attributesFactory;
// Region under test; several tests wrap it in a Mockito spy or replace it with a fresh instance.
private PartitionedRegion partitionedRegion;
// Applies STRICT_STUBS strictness to every test in this class.
@Rule
public MockitoRule mockitoRule = MockitoJUnit.rule().strictness(STRICT_STUBS);
/**
 * Builds the mocked cache / distributed-system graph and creates the {@link PartitionedRegion}
 * under test, configured with one bucket and one redundant copy.
 */
@Before
public void setUp() {
  system = mock(InternalDistributedSystem.class);
  distributionManager = mock(DistributionManager.class);
  InternalDistributedMember distributedMember = mock(InternalDistributedMember.class);
  // Assign the field directly: declaring a local with the same name would shadow the field and
  // leave it null for the rest of the test.
  resourceManager = mock(InternalResourceManager.class);
  cache = mock(InternalCache.class);
  attributesFactory = new AttributesFactory();
  attributesFactory.setPartitionAttributes(
      new PartitionAttributesFactory().setTotalNumBuckets(1).setRedundantCopies(1).create());

  when(cache.getDistributedSystem())
      .thenReturn(system);
  when(cache.getInternalDistributedSystem())
      .thenReturn(system);
  when(cache.getInternalResourceManager())
      .thenReturn(resourceManager);
  when(distributionManager.getId())
      .thenReturn(distributedMember);
  when(system.createAtomicStatistics(any(), any()))
      .thenReturn(mock(Statistics.class));
  when(system.getClock())
      .thenReturn(mock(DSClock.class));
  when(system.getDistributedMember())
      .thenReturn(distributedMember);
  when(system.getDistributionManager())
      .thenReturn(distributionManager);

  partitionedRegion = new PartitionedRegion("regionName", attributesFactory.create(), null,
      cache, mock(InternalRegionArguments.class), disabledClock(),
      ColocationLoggerFactory.create());
}
/**
 * Parameter source for {@link #verifyPRConfigUpdatedAfterLoaderUpdate}: every combination of a
 * present/absent {@link CacheLoader} and a present/absent {@link CacheWriter}.
 *
 * @return one {@code {loader, writer}} pair per parameterized run
 */
private Object[] cacheLoaderAndWriter() {
  CacheLoader loader = mock(CacheLoader.class);
  CacheWriter writer = mock(CacheWriter.class);

  Object[] loaderOnly = {loader, null};
  Object[] writerOnly = {null, writer};
  Object[] loaderAndWriter = {loader, writer};
  Object[] neither = {null, null};

  return new Object[] {loaderOnly, writerOnly, loaderAndWriter, neither};
}
/**
 * Runs once per pair supplied by {@code cacheLoaderAndWriter()} and checks that
 * {@code updatePRNodeInformation()} reads the region's {@code PartitionRegionConfig} from the PR
 * root, puts it back, and calls {@code updatePRConfig(config, false)}.
 *
 * @param cacheLoader loader returned by the stubbed {@code basicGetLoader()}; may be null
 * @param cacheWriter writer returned by the stubbed {@code basicGetWriter()}; may be null
 */
@Test
@Parameters(method = "cacheLoaderAndWriter")
@TestCaseName("{method}(CacheLoader={0}, CacheWriter={1})")
public void verifyPRConfigUpdatedAfterLoaderUpdate(CacheLoader cacheLoader,
CacheWriter cacheWriter) {
// ARRANGE
PartitionRegionConfig partitionRegionConfig = mock(PartitionRegionConfig.class);
Region<String, PartitionRegionConfig> partitionedRegionRoot = mock(LocalRegion.class);
PartitionedRegion.RegionLock regionLock = mock(PartitionedRegion.RegionLock.class);
PartitionedRegion spyPartitionedRegion = spy(partitionedRegion);
// Presumably the member stubbed on the DistributionManager mock in setUp() -- TODO confirm.
InternalDistributedMember ourMember = spyPartitionedRegion.getDistributionManager().getId();
InternalDistributedMember otherMember1 = mock(InternalDistributedMember.class);
InternalDistributedMember otherMember2 = mock(InternalDistributedMember.class);
Node ourNode = mock(Node.class, "ourNode");
Node otherNode1 = mock(Node.class, "otherNode1");
Node otherNode2 = mock(Node.class, "otherNode2");
when(otherNode1.getMemberId())
.thenReturn(otherMember1);
when(otherNode2.getMemberId())
.thenReturn(otherMember2);
when(ourNode.getMemberId())
.thenReturn(ourMember);
// The attached flags on our node mirror whether this run supplies a loader / writer.
when(ourNode.isCacheLoaderAttached())
.thenReturn(cacheLoader != null);
when(ourNode.isCacheWriterAttached())
.thenReturn(cacheWriter != null);
when(partitionRegionConfig.getNodes())
.thenReturn(asSet(otherNode1, ourNode, otherNode2));
when(partitionedRegionRoot.get(spyPartitionedRegion.getRegionIdentifier()))
.thenReturn(partitionRegionConfig);
// NOTE(review): when(spy.method()) invokes the real getPRRoot() once while stubbing; the
// doReturn(...).when(spy) form used below does not.
when(spyPartitionedRegion.getPRRoot())
.thenReturn(partitionedRegionRoot);
doReturn(cacheLoader)
.when(spyPartitionedRegion).basicGetLoader();
doReturn(cacheWriter)
.when(spyPartitionedRegion).basicGetWriter();
doReturn(regionLock)
.when(spyPartitionedRegion).getRegionLock();
// ACT
spyPartitionedRegion.updatePRNodeInformation();
// ASSERT
assertThat(partitionRegionConfig.getNodes())
.contains(ourNode);
// Locate the node whose member id equals ours among the configured nodes.
Node verifyOurNode = null;
for (Node node : partitionRegionConfig.getNodes()) {
if (node.getMemberId().equals(ourMember)) {
verifyOurNode = node;
}
}
assertThat(verifyOurNode)
.withFailMessage("Failed to find " + ourMember + " in " + partitionRegionConfig.getNodes())
.isNotNull();
verify(partitionedRegionRoot)
.get(spyPartitionedRegion.getRegionIdentifier());
verify(partitionedRegionRoot)
.put(spyPartitionedRegion.getRegionIdentifier(), partitionRegionConfig);
verify(spyPartitionedRegion)
.updatePRConfig(partitionRegionConfig, false);
// NOTE(review): if verifyOurNode is the ourNode mock, these read back the values stubbed in
// ARRANGE rather than state written by the region -- confirm this is intended.
assertThat(verifyOurNode.isCacheLoaderAttached())
.isEqualTo(cacheLoader != null);
assertThat(verifyOurNode.isCacheWriterAttached())
.isEqualTo(cacheWriter != null);
}
/**
 * A client event whose operation is {@code GET_FOR_REGISTER_INTEREST} must resolve to the member
 * returned by {@code getNodeForBucketWrite}.
 */
@Test
public void getBucketNodeForReadOrWriteReturnsPrimaryNodeForRegisterInterest() {
  // ARRANGE
  EntryEventImpl clientEvent = mock(EntryEventImpl.class);
  InternalDistributedMember primaryMember = mock(InternalDistributedMember.class);
  PartitionedRegion spyPartitionedRegion = spy(partitionedRegion);

  when(clientEvent.getOperation())
      .thenReturn(Operation.GET_FOR_REGISTER_INTEREST);
  int bucketId = 0;
  doReturn(primaryMember)
      .when(spyPartitionedRegion).getNodeForBucketWrite(eq(bucketId), isNull());

  // ACT
  InternalDistributedMember memberForRegisterInterestRead =
      spyPartitionedRegion.getBucketNodeForReadOrWrite(bucketId, clientEvent);

  // ASSERT
  assertThat(memberForRegisterInterestRead)
      .isSameAs(primaryMember);
  verify(spyPartitionedRegion)
      .getNodeForBucketWrite(anyInt(), any());
}
/**
 * A client event with a plain {@code GET} operation must resolve to the member returned by
 * {@code getNodeForBucketRead}.
 */
@Test
public void getBucketNodeForReadOrWriteReturnsSecondaryNodeForNonRegisterInterest() {
  // ARRANGE
  EntryEventImpl clientEvent = mock(EntryEventImpl.class);
  InternalDistributedMember secondaryMember = mock(InternalDistributedMember.class);
  PartitionedRegion spyPartitionedRegion = spy(partitionedRegion);

  when(clientEvent.getOperation())
      .thenReturn(Operation.GET);
  int bucketId = 0;
  doReturn(secondaryMember)
      .when(spyPartitionedRegion).getNodeForBucketRead(eq(bucketId));

  // ACT
  InternalDistributedMember memberForRegisterInterestRead =
      spyPartitionedRegion.getBucketNodeForReadOrWrite(bucketId, clientEvent);

  // ASSERT
  assertThat(memberForRegisterInterestRead)
      .isSameAs(secondaryMember);
  verify(spyPartitionedRegion)
      .getNodeForBucketRead(anyInt());
}
/**
 * With no client event at all ({@code null}), the lookup must resolve to the member returned by
 * {@code getNodeForBucketRead}.
 */
@Test
public void getBucketNodeForReadOrWriteReturnsSecondaryNodeWhenClientEventIsNotPresent() {
  // ARRANGE
  InternalDistributedMember secondaryMember = mock(InternalDistributedMember.class);
  PartitionedRegion spyPartitionedRegion = spy(partitionedRegion);

  int bucketId = 0;
  doReturn(secondaryMember)
      .when(spyPartitionedRegion).getNodeForBucketRead(eq(bucketId));

  // ACT
  InternalDistributedMember memberForRegisterInterestRead =
      spyPartitionedRegion.getBucketNodeForReadOrWrite(bucketId, null);

  // ASSERT
  assertThat(memberForRegisterInterestRead)
      .isSameAs(secondaryMember);
  verify(spyPartitionedRegion)
      .getNodeForBucketRead(anyInt());
}
/**
 * With a client event that is present but carries no operation, the lookup must resolve to the
 * member returned by {@code getNodeForBucketRead}.
 */
@Test
public void getBucketNodeForReadOrWriteReturnsSecondaryNodeWhenClientEventOperationIsNotPresent() {
  // ARRANGE
  // An unstubbed Mockito mock returns null from getOperation(), which models an event without
  // an operation. Passing a null event here would only repeat the "event not present" test.
  EntryEventImpl clientEvent = mock(EntryEventImpl.class);
  InternalDistributedMember secondaryMember = mock(InternalDistributedMember.class);
  PartitionedRegion spyPartitionedRegion = spy(partitionedRegion);

  int bucketId = 0;
  doReturn(secondaryMember)
      .when(spyPartitionedRegion).getNodeForBucketRead(eq(bucketId));

  // ACT
  InternalDistributedMember memberForRegisterInterestRead =
      spyPartitionedRegion.getBucketNodeForReadOrWrite(bucketId, clientEvent);

  // ASSERT
  assertThat(memberForRegisterInterestRead)
      .isSameAs(secondaryMember);
  verify(spyPartitionedRegion)
      .getNodeForBucketRead(anyInt());
}
/**
 * {@code updateNodeToBucketMap} given the bucket ids {@code {0, 1}} must ask
 * {@code getNodeForBucketWrite} once per bucket id.
 */
@Test
public void updateBucketMapsForInterestRegistrationWithSetOfKeysFetchesPrimaryBucketsForRead() {
  // ARRANGE
  InternalDistributedMember primaryMember = mock(InternalDistributedMember.class);
  PartitionedRegion spyPartitionedRegion = spy(partitionedRegion);

  doReturn(primaryMember)
      .when(spyPartitionedRegion).getNodeForBucketWrite(anyInt(), isNull());
  HashMap<InternalDistributedMember, HashSet<Integer>> nodeToBuckets = new HashMap<>();

  // ACT
  spyPartitionedRegion.updateNodeToBucketMap(nodeToBuckets, asSet(0, 1));

  // ASSERT
  verify(spyPartitionedRegion, times(2))
      .getNodeForBucketWrite(anyInt(), isNull());
}
/**
 * {@code updateNodeToBucketMap} given a bucket-to-keys map holding a single bucket must ask
 * {@code getNodeForBucketWrite} exactly once.
 */
@Test
public void updateBucketMapsForInterestRegistrationWithAllKeysFetchesPrimaryBucketsForRead() {
  // ARRANGE
  InternalDistributedMember primaryMember = mock(InternalDistributedMember.class);
  PartitionedRegion spyPartitionedRegion = spy(partitionedRegion);

  doReturn(primaryMember)
      .when(spyPartitionedRegion).getNodeForBucketWrite(anyInt(), isNull());
  HashMap<InternalDistributedMember, HashMap<Integer, HashSet>> nodeToBuckets = new HashMap<>();
  // Bucket 0 -> keys {0, 1}. The keys are passed as individual varargs elements; passing an
  // already-built set as one element would produce a set that contains a set.
  // The raw cast is safe at runtime because asMapOfSet builds a HashMap of HashSet values.
  HashMap<Integer, HashSet> bucketKeys = (HashMap) asMapOfSet(0, 0, 1);

  // ACT
  spyPartitionedRegion.updateNodeToBucketMap(nodeToBuckets, bucketKeys);

  // ASSERT
  verify(spyPartitionedRegion)
      .getNodeForBucketWrite(anyInt(), isNull());
}
/**
 * {@code filterOutNonParallelGatewaySenders} must keep only the requested ids that belong to a
 * gateway sender reporting {@code isParallel() == true}.
 */
@Test
public void filterOutNonParallelGatewaySendersShouldReturnCorrectly() {
  // ARRANGE
  GatewaySender parallelSender = mock(GatewaySender.class);
  GatewaySender anotherParallelSender = mock(GatewaySender.class);
  GatewaySender serialSender = mock(GatewaySender.class);

  when(parallelSender.isParallel())
      .thenReturn(true);
  when(parallelSender.getId())
      .thenReturn("parallel");
  when(anotherParallelSender.isParallel())
      .thenReturn(true);
  when(anotherParallelSender.getId())
      .thenReturn("anotherParallel");
  when(serialSender.isParallel())
      .thenReturn(false);
  when(cache.getAllGatewaySenders())
      .thenReturn(asSet(parallelSender, anotherParallelSender, serialSender));

  // ACT/ASSERT
  assertThat(partitionedRegion.filterOutNonParallelGatewaySenders(asSet("serial")))
      .isEmpty();

  // ACT/ASSERT
  assertThat(partitionedRegion.filterOutNonParallelGatewaySenders(asSet("unknownSender")))
      .isEmpty();

  // ACT/ASSERT
  assertThat(partitionedRegion.filterOutNonParallelGatewaySenders(asSet("parallel", "serial")))
      .containsExactly("parallel");

  // ACT/ASSERT
  // The ids are supplied through a HashSet, whose iteration order is unspecified, so the
  // expected elements are checked without asserting their order.
  assertThat(partitionedRegion
      .filterOutNonParallelGatewaySenders(asSet("parallel", "serial", "anotherParallel")))
          .containsExactlyInAnyOrder("parallel", "anotherParallel");
}
/**
 * {@code filterOutNonParallelAsyncEventQueues} must keep only the requested ids that belong to an
 * async event queue reporting {@code isParallel() == true}.
 */
@Test
public void filterOutNonParallelAsyncEventQueuesShouldReturnCorrectly() {
  // ARRANGE
  AsyncEventQueue parallelQueue = mock(AsyncEventQueue.class);
  AsyncEventQueue anotherParallelQueue = mock(AsyncEventQueue.class);
  AsyncEventQueue serialQueue = mock(AsyncEventQueue.class);

  when(parallelQueue.isParallel())
      .thenReturn(true);
  when(parallelQueue.getId())
      .thenReturn(getSenderIdFromAsyncEventQueueId("parallel"));
  when(anotherParallelQueue.isParallel())
      .thenReturn(true);
  when(anotherParallelQueue.getId())
      .thenReturn(getSenderIdFromAsyncEventQueueId("anotherParallel"));
  when(serialQueue.isParallel())
      .thenReturn(false);
  when(cache.getAsyncEventQueues())
      .thenReturn(asSet(parallelQueue, anotherParallelQueue, serialQueue));

  // ACT/ASSERT
  assertThat(partitionedRegion.filterOutNonParallelAsyncEventQueues(asSet("serial")))
      .isEmpty();

  // ACT/ASSERT
  assertThat(partitionedRegion.filterOutNonParallelAsyncEventQueues(asSet("unknownSender")))
      .isEmpty();

  // ACT/ASSERT
  assertThat(partitionedRegion.filterOutNonParallelAsyncEventQueues(asSet("parallel", "serial")))
      .containsExactly("parallel");

  // ACT/ASSERT
  // The ids are supplied through a HashSet, whose iteration order is unspecified, so the
  // expected elements are checked without asserting their order.
  assertThat(partitionedRegion
      .filterOutNonParallelAsyncEventQueues(asSet("parallel", "serial", "anotherParallel")))
          .containsExactlyInAnyOrder("parallel", "anotherParallel");
}
/** {@code getLocalSize()} on a freshly constructed region must complete without throwing. */
@Test
public void getLocalSizeDoesNotThrowIfRegionUninitialized() {
  // ARRANGE
  InternalRegionArguments regionArguments = mock(InternalRegionArguments.class);
  partitionedRegion = new PartitionedRegion("region", attributesFactory.create(), null, cache,
      regionArguments, disabledClock(), ColocationLoggerFactory.create());

  // ACT/ASSERT
  assertThatCode(() -> partitionedRegion.getLocalSize())
      .doesNotThrowAnyException();
}
/**
 * {@code generatePRId} must not propagate any exception when the lock is acquired successfully
 * but {@code unlock} on the lock service throws.
 */
@Test
public void generatePRIdShouldNotThrowNumberFormatExceptionIfAnErrorOccursWhileReleasingTheLock() {
  // ARRANGE
  DistributedLockService prLockService = mock(DistributedLockService.class);
  CancelCriterion cancelCriterion = mock(CancelCriterion.class);
  RuntimeException unlockFailure = new RuntimeException("for test");
  PartitionedRegion regionSpy = spy(partitionedRegion);

  when(system.getDistributionManager().getCancelCriterion())
      .thenReturn(cancelCriterion);
  when(distributionManager.getOtherDistributionManagerIds())
      .thenReturn(emptySet());
  when(regionSpy.getPartitionedRegionLockService())
      .thenReturn(prLockService);
  // Acquiring the lock succeeds ...
  when(prLockService.lock(any(), anyLong(), anyLong()))
      .thenReturn(true);
  // ... but releasing it blows up.
  doThrow(unlockFailure)
      .when(prLockService).unlock(any());

  // ACT/ASSERT
  assertThatCode(() -> regionSpy.generatePRId(system))
      .doesNotThrowAnyException();
}
/**
 * {@code getDataRegionForWrite} must throw a {@link TransactionException} with the "local max
 * memory zero" message when the region has no data store.
 */
@Test
public void getDataRegionForWriteThrowsTransactionExceptionIfNotDataStore() {
  KeyInfo keyInfo = mock(KeyInfo.class);
  when(keyInfo.getBucketId()).thenReturn(1);

  PartitionedRegion regionSpy = spy(partitionedRegion);
  doReturn(null).when(regionSpy).getDataStore();

  Throwable thrown = catchThrowable(() -> regionSpy.getDataRegionForWrite(keyInfo));

  assertThat(thrown)
      .isInstanceOf(TransactionException.class)
      .hasMessage(
          "PartitionedRegion Transactions cannot execute on nodes with local max memory zero");
}
/**
 * {@code getDataRegionForWrite} must throw a {@link TransactionDataRebalancedException} carrying
 * {@code DATA_MOVED_BY_REBALANCE} when the data store's bucket lookup throws a
 * {@code ForceReattemptException}.
 */
@Test
public void getDataRegionForWriteThrowsTransactionDataRebalancedExceptionIfGetInitializedBucketThrowsForceReattemptException()
    throws Exception {
  int bucketId = 1;
  Object key = new Object();

  KeyInfo keyInfo = mock(KeyInfo.class);
  when(keyInfo.getBucketId()).thenReturn(bucketId);
  when(keyInfo.getKey()).thenReturn(key);
  when(keyInfo.isCheckPrimary()).thenReturn(true);

  PartitionedRegionDataStore dataStore = mock(PartitionedRegionDataStore.class);
  doThrow(new ForceReattemptException(""))
      .when(dataStore).getInitializedBucketWithKnownPrimaryForId(key, bucketId);

  PartitionedRegion regionSpy = spy(partitionedRegion);
  doReturn(dataStore).when(regionSpy).getDataStore();
  InternalDistributedMember bucketOwner = mock(InternalDistributedMember.class);
  doReturn(bucketOwner).when(regionSpy).createBucket(bucketId, 0, null);

  Throwable thrown = catchThrowable(() -> regionSpy.getDataRegionForWrite(keyInfo));

  assertThat(thrown)
      .isInstanceOf(TransactionDataRebalancedException.class)
      .hasMessage(PartitionedRegion.DATA_MOVED_BY_REBALANCE);
}
/**
 * {@code getDataRegionForWrite} must throw a {@link TransactionDataRebalancedException} carrying
 * {@code DATA_MOVED_BY_REBALANCE} when the data store's bucket lookup throws a
 * {@link RegionDestroyedException}.
 */
@Test
public void getDataRegionForWriteThrowsTransactionDataRebalancedExceptionIfGetInitializedBucketThrowsRegionDestroyedException()
    throws Exception {
  int bucketId = 1;
  Object key = new Object();

  KeyInfo keyInfo = mock(KeyInfo.class);
  when(keyInfo.getBucketId()).thenReturn(bucketId);
  when(keyInfo.getKey()).thenReturn(key);

  PartitionedRegionDataStore dataStore = mock(PartitionedRegionDataStore.class);
  doThrow(new RegionDestroyedException("", ""))
      .when(dataStore).getInitializedBucketWithKnownPrimaryForId(key, bucketId);

  PartitionedRegion regionSpy = spy(partitionedRegion);
  doReturn(dataStore).when(regionSpy).getDataStore();

  Throwable thrown = catchThrowable(() -> regionSpy.getDataRegionForWrite(keyInfo));

  assertThat(thrown)
      .isInstanceOf(TransactionDataRebalancedException.class)
      .hasMessage(PartitionedRegion.DATA_MOVED_BY_REBALANCE);
}
/**
 * Handling a {@code BucketNotFoundException} inside a transaction must surface as a
 * {@link TransactionDataRebalancedException} carrying {@code DATA_MOVED_BY_REBALANCE}.
 */
@Test
public void transactionThrowsTransactionDataRebalancedExceptionIfBucketNotFoundException() {
  ForceReattemptException bucketNotFound = mock(BucketNotFoundException.class);
  PartitionedRegion regionSpy = spy(partitionedRegion);

  Throwable thrown = catchThrowable(
      () -> regionSpy.handleForceReattemptExceptionWithTransaction(bucketNotFound));

  assertThat(thrown)
      .isInstanceOf(TransactionDataRebalancedException.class)
      .hasMessage(PartitionedRegion.DATA_MOVED_BY_REBALANCE);
}
/**
 * When the {@code ForceReattemptException} is caused by a {@code PrimaryBucketException}, that
 * very cause instance must be what gets thrown.
 */
@Test
public void transactionThrowsPrimaryBucketExceptionIfForceReattemptExceptionIsCausedByPrimaryBucketException() {
  PrimaryBucketException cause = new PrimaryBucketException();
  ForceReattemptException reattempt = mock(ForceReattemptException.class);
  when(reattempt.getCause()).thenReturn(cause);
  PartitionedRegion regionSpy = spy(partitionedRegion);

  Throwable thrown = catchThrowable(
      () -> regionSpy.handleForceReattemptExceptionWithTransaction(reattempt));

  assertThat(thrown).isSameAs(cause);
}
/**
 * When the {@code ForceReattemptException} is caused by a
 * {@link TransactionDataRebalancedException}, that very cause instance must be what gets thrown.
 */
@Test
public void transactionThrowsTransactionDataRebalancedExceptionIfForceReattemptExceptionIsCausedByTransactionDataRebalancedException() {
  TransactionDataRebalancedException cause = new TransactionDataRebalancedException("");
  ForceReattemptException reattempt = mock(ForceReattemptException.class);
  when(reattempt.getCause()).thenReturn(cause);
  PartitionedRegion regionSpy = spy(partitionedRegion);

  Throwable thrown = catchThrowable(
      () -> regionSpy.handleForceReattemptExceptionWithTransaction(reattempt));

  assertThat(thrown).isSameAs(cause);
}
/**
 * When the {@code ForceReattemptException} is caused by a {@link RegionDestroyedException}, a
 * {@link TransactionDataRebalancedException} with {@code DATA_MOVED_BY_REBALANCE} must be thrown
 * and must keep the {@code RegionDestroyedException} as its cause.
 */
@Test
public void transactionThrowsTransactionDataRebalancedExceptionIfForceReattemptExceptionIsCausedByRegionDestroyedException() {
  RegionDestroyedException cause = new RegionDestroyedException("", "");
  ForceReattemptException reattempt = mock(ForceReattemptException.class);
  when(reattempt.getCause()).thenReturn(cause);
  PartitionedRegion regionSpy = spy(partitionedRegion);

  Throwable thrown = catchThrowable(
      () -> regionSpy.handleForceReattemptExceptionWithTransaction(reattempt));

  assertThat(thrown)
      .isInstanceOf(TransactionDataRebalancedException.class)
      .hasMessage(PartitionedRegion.DATA_MOVED_BY_REBALANCE)
      .hasCause(cause);
}
/**
 * A plain {@code ForceReattemptException} with no stubbed cause must surface as a
 * {@link TransactionDataRebalancedException} with {@code DATA_MOVED_BY_REBALANCE}, keeping the
 * original exception as its cause.
 */
@Test
public void transactionThrowsTransactionDataRebalancedExceptionIfIsAForceReattemptException() {
  ForceReattemptException reattempt = mock(ForceReattemptException.class);
  PartitionedRegion regionSpy = spy(partitionedRegion);

  Throwable thrown = catchThrowable(
      () -> regionSpy.handleForceReattemptExceptionWithTransaction(reattempt));

  assertThat(thrown)
      .isInstanceOf(TransactionDataRebalancedException.class)
      .hasMessage(PartitionedRegion.DATA_MOVED_BY_REBALANCE)
      .hasCause(reattempt);
}
/**
 * When {@code getFetchKeysResponse} throws a {@code ForceReattemptException} for a bucket,
 * {@code fetchKeysAndValues} must record that bucket id in {@code failures} and must not call
 * {@code getValuesForKeys}.
 */
@Test
public void failuresSavedIfFetchKeysThrows() throws Exception {
  PartitionedRegion spyPartitionedRegion = spy(partitionedRegion);
  VersionedObjectList values = mock(VersionedObjectList.class);
  ServerConnection serverConnection = mock(ServerConnection.class);
  Set<Integer> failures = new HashSet<>();
  InternalDistributedMember member = mock(InternalDistributedMember.class);
  Set<Integer> buckets = new HashSet<>();
  buckets.add(1);
  doThrow(new ForceReattemptException("")).when(spyPartitionedRegion).getFetchKeysResponse(member,
      1);

  spyPartitionedRegion.fetchKeysAndValues(values, serverConnection, failures, member, null,
      buckets);

  verify(spyPartitionedRegion, never()).getValuesForKeys(values, serverConnection, null);
  // Assert on the collection itself so a failure reports the actual contents of the set.
  assertThat(failures).contains(1);
}
/**
 * When the keys for a bucket are fetched successfully, {@code fetchKeysAndValues} must pass them
 * to {@code getValuesForKeys} and must not record the bucket id in {@code failures}.
 */
@Test
public void fetchKeysAndValuesInvokesGetValuesForKeys() throws Exception {
  PartitionedRegion spyPartitionedRegion = spy(partitionedRegion);
  VersionedObjectList values = mock(VersionedObjectList.class);
  ServerConnection serverConnection = mock(ServerConnection.class);
  Set<Integer> failures = new HashSet<>();
  InternalDistributedMember member = mock(InternalDistributedMember.class);
  Set<Integer> buckets = new HashSet<>();
  buckets.add(1);
  FetchKeysMessage.FetchKeysResponse fetchKeysResponse =
      mock(FetchKeysMessage.FetchKeysResponse.class);
  doReturn(fetchKeysResponse).when(spyPartitionedRegion).getFetchKeysResponse(member, 1);
  Set keys = new HashSet();
  when(fetchKeysResponse.waitForKeys()).thenReturn(keys);
  doNothing().when(spyPartitionedRegion).getValuesForKeys(values, serverConnection, keys);

  spyPartitionedRegion.fetchKeysAndValues(values, serverConnection, failures, member, null,
      buckets);

  verify(spyPartitionedRegion).getValuesForKeys(values, serverConnection, keys);
  // Assert on the collection itself so a failure reports the actual contents of the set.
  assertThat(failures).doesNotContain(1);
}
/**
 * {@code isRegionCreateNotified()} must be false on a new region and true once
 * {@code setRegionCreateNotified(true)} has been called.
 */
@Test
public void testGetRegionCreateNotification() {
  InternalRegionArguments regionArguments = mock(InternalRegionArguments.class);
  partitionedRegion = new PartitionedRegion("region", attributesFactory.create(), null, cache,
      regionArguments, disabledClock(), ColocationLoggerFactory.create());

  // Before the flag is set.
  assertThat(partitionedRegion.isRegionCreateNotified())
      .isFalse();

  partitionedRegion.setRegionCreateNotified(true);

  // After the flag is set.
  assertThat(partitionedRegion.isRegionCreateNotified())
      .isTrue();
}
/**
 * {@code isRegionCreateNotified()} must be false on a new region and true once
 * {@code notifyRegionCreated()} has been called.
 */
@Test
public void testNotifyRegionCreated() {
  InternalRegionArguments regionArguments = mock(InternalRegionArguments.class);
  partitionedRegion = new PartitionedRegion("region", attributesFactory.create(), null, cache,
      regionArguments, disabledClock(), ColocationLoggerFactory.create());

  // Before the notification.
  assertThat(partitionedRegion.isRegionCreateNotified())
      .isFalse();

  partitionedRegion.notifyRegionCreated();

  // After the notification.
  assertThat(partitionedRegion.isRegionCreateNotified())
      .isTrue();
}
/**
 * Collects the given values into a new mutable {@link HashSet}; duplicates collapse.
 *
 * @param values elements to put into the set
 * @param <K> element type
 * @return a new {@code HashSet} containing every distinct value
 */
@SafeVarargs // the varargs array is only read, never stored or exposed
private static <K> Set<K> asSet(K... values) {
  Set<K> set = new HashSet<>();
  Collections.addAll(set, values);
  return set;
}
/**
 * Builds a new mutable {@link HashMap} with a single entry that maps {@code key} to a set of the
 * given values.
 *
 * @param key the only key in the returned map
 * @param values elements of the set stored under {@code key}
 * @param <K> key type
 * @param <V> set element type
 * @return a new {@code HashMap} containing exactly one entry
 */
@SafeVarargs // the varargs array is only forwarded to asSet, which just reads it
private static <K, V> Map<K, Set<V>> asMapOfSet(K key, V... values) {
  Map<K, Set<V>> map = new HashMap<>();
  map.put(key, asSet(values));
  return map;
}
}