blob: 0268f800c284784f81019d739de8151762257347 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal.cache;
import static org.apache.geode.cache.asyncqueue.internal.AsyncEventQueueImpl.getSenderIdFromAsyncEventQueueId;
import static org.apache.geode.internal.statistics.StatisticsClockFactory.disabledClock;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatCode;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isNull;
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import junitparams.JUnitParamsRunner;
import junitparams.Parameters;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.apache.geode.CancelCriterion;
import org.apache.geode.Statistics;
import org.apache.geode.cache.AttributesFactory;
import org.apache.geode.cache.CacheLoader;
import org.apache.geode.cache.CacheWriter;
import org.apache.geode.cache.Operation;
import org.apache.geode.cache.PartitionAttributesFactory;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.asyncqueue.AsyncEventQueue;
import org.apache.geode.cache.wan.GatewaySender;
import org.apache.geode.distributed.DistributedLockService;
import org.apache.geode.distributed.DistributedSystem;
import org.apache.geode.distributed.internal.DistributionManager;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.cache.control.InternalResourceManager;
import org.apache.geode.internal.util.VersionedArrayList;
import org.apache.geode.test.fake.Fakes;
@RunWith(JUnitParamsRunner.class)
public class PartitionedRegionTest {
private InternalCache internalCache;
private PartitionedRegion partitionedRegion;
@SuppressWarnings("deprecation")
private AttributesFactory attributesFactory;
@Before
@SuppressWarnings("deprecation")
public void setup() {
internalCache = Fakes.cache();
InternalResourceManager resourceManager =
mock(InternalResourceManager.class, RETURNS_DEEP_STUBS);
when(internalCache.getInternalResourceManager()).thenReturn(resourceManager);
attributesFactory = new AttributesFactory();
attributesFactory.setPartitionAttributes(
new PartitionAttributesFactory().setTotalNumBuckets(1).setRedundantCopies(1).create());
partitionedRegion = new PartitionedRegion("prTestRegion", attributesFactory.create(), null,
internalCache, mock(InternalRegionArguments.class), disabledClock());
DistributedSystem mockDistributedSystem = mock(DistributedSystem.class);
when(internalCache.getDistributedSystem()).thenReturn(mockDistributedSystem);
when(mockDistributedSystem.getProperties()).thenReturn(new Properties());
when(mockDistributedSystem.createAtomicStatistics(any(), any()))
.thenReturn(mock(Statistics.class));
}
@SuppressWarnings("unused")
private Object[] parametersToTestUpdatePRNodeInformation() {
CacheLoader mockLoader = mock(CacheLoader.class);
CacheWriter mockWriter = mock(CacheWriter.class);
return new Object[] {
new Object[] {mockLoader, null, (byte) 0x01},
new Object[] {null, mockWriter, (byte) 0x02},
new Object[] {mockLoader, mockWriter, (byte) 0x03},
new Object[] {null, null, (byte) 0x00}
};
}
@Test
@Parameters(method = "parametersToTestUpdatePRNodeInformation")
public void verifyPRConfigUpdatedAfterLoaderUpdate(CacheLoader mockLoader, CacheWriter mockWriter,
@SuppressWarnings("unused") byte configByte) {
@SuppressWarnings("unchecked")
Region<String, PartitionRegionConfig> prRoot = mock(LocalRegion.class);
PartitionRegionConfig mockConfig = mock(PartitionRegionConfig.class);
PartitionedRegion prSpy = spy(partitionedRegion);
when(prSpy.getPRRoot()).thenReturn(prRoot);
when(prRoot.get(prSpy.getRegionIdentifier())).thenReturn(mockConfig);
InternalDistributedMember ourMember = prSpy.getDistributionManager().getId();
InternalDistributedMember otherMember1 = mock(InternalDistributedMember.class);
InternalDistributedMember otherMember2 = mock(InternalDistributedMember.class);
Node ourNode = mock(Node.class);
Node otherNode1 = mock(Node.class);
Node otherNode2 = mock(Node.class);
when(ourNode.getMemberId()).thenReturn(ourMember);
when(otherNode1.getMemberId()).thenReturn(otherMember1);
when(otherNode2.getMemberId()).thenReturn(otherMember2);
when(ourNode.isCacheLoaderAttached()).thenReturn(mockLoader != null);
when(ourNode.isCacheWriterAttached()).thenReturn(mockWriter != null);
VersionedArrayList prNodes = new VersionedArrayList();
prNodes.add(otherNode1);
prNodes.add(ourNode);
prNodes.add(otherNode2);
when(mockConfig.getNodes()).thenReturn(prNodes.getListCopy());
when(mockConfig.getPartitionAttrs()).thenReturn(mock(PartitionAttributesImpl.class));
doReturn(mockLoader).when(prSpy).basicGetLoader();
doReturn(mockWriter).when(prSpy).basicGetWriter();
PartitionedRegion.RegionLock mockLock = mock(PartitionedRegion.RegionLock.class);
doReturn(mockLock).when(prSpy).getRegionLock();
prSpy.updatePRNodeInformation();
Node verifyOurNode = null;
assertThat(mockConfig.getNodes().contains(ourNode)).isTrue();
for (Node node : mockConfig.getNodes()) {
if (node.getMemberId().equals(ourMember)) {
verifyOurNode = node;
}
}
verify(prRoot).get(prSpy.getRegionIdentifier());
verify(prSpy).updatePRConfig(mockConfig, false);
verify(prRoot).put(prSpy.getRegionIdentifier(), mockConfig);
assertThat(verifyOurNode).isNotNull();
assertThat(verifyOurNode.isCacheLoaderAttached()).isEqualTo(mockLoader != null);
assertThat(verifyOurNode.isCacheWriterAttached()).isEqualTo(mockWriter != null);
}
@Test
public void getBucketNodeForReadOrWriteReturnsPrimaryNodeForRegisterInterest() {
int bucketId = 0;
InternalDistributedMember primaryMember = mock(InternalDistributedMember.class);
InternalDistributedMember secondaryMember = mock(InternalDistributedMember.class);
EntryEventImpl clientEvent = mock(EntryEventImpl.class);
when(clientEvent.getOperation()).thenReturn(Operation.GET_FOR_REGISTER_INTEREST);
PartitionedRegion spyPR = spy(partitionedRegion);
doReturn(primaryMember).when(spyPR).getNodeForBucketWrite(eq(bucketId), isNull());
doReturn(secondaryMember).when(spyPR).getNodeForBucketRead(eq(bucketId));
InternalDistributedMember memberForRegisterInterestRead =
spyPR.getBucketNodeForReadOrWrite(bucketId, clientEvent);
assertThat(memberForRegisterInterestRead).isSameAs(primaryMember);
verify(spyPR, times(1)).getNodeForBucketWrite(anyInt(), any());
}
@Test
public void getBucketNodeForReadOrWriteReturnsSecondaryNodeForNonRegisterInterest() {
int bucketId = 0;
InternalDistributedMember primaryMember = mock(InternalDistributedMember.class);
InternalDistributedMember secondaryMember = mock(InternalDistributedMember.class);
EntryEventImpl clientEvent = mock(EntryEventImpl.class);
when(clientEvent.getOperation()).thenReturn(Operation.GET);
PartitionedRegion spyPR = spy(partitionedRegion);
doReturn(primaryMember).when(spyPR).getNodeForBucketWrite(eq(bucketId), isNull());
doReturn(secondaryMember).when(spyPR).getNodeForBucketRead(eq(bucketId));
InternalDistributedMember memberForRegisterInterestRead =
spyPR.getBucketNodeForReadOrWrite(bucketId, clientEvent);
assertThat(memberForRegisterInterestRead).isSameAs(secondaryMember);
verify(spyPR, times(1)).getNodeForBucketRead(anyInt());
}
@Test
public void getBucketNodeForReadOrWriteReturnsSecondaryNodeWhenClientEventIsNotPresent() {
int bucketId = 0;
InternalDistributedMember primaryMember = mock(InternalDistributedMember.class);
InternalDistributedMember secondaryMember = mock(InternalDistributedMember.class);
PartitionedRegion spyPR = spy(partitionedRegion);
doReturn(primaryMember).when(spyPR).getNodeForBucketWrite(eq(bucketId), isNull());
doReturn(secondaryMember).when(spyPR).getNodeForBucketRead(eq(bucketId));
InternalDistributedMember memberForRegisterInterestRead =
spyPR.getBucketNodeForReadOrWrite(bucketId, null);
assertThat(memberForRegisterInterestRead).isSameAs(secondaryMember);
verify(spyPR, times(1)).getNodeForBucketRead(anyInt());
}
@Test
public void getBucketNodeForReadOrWriteReturnsSecondaryNodeWhenClientEventOperationIsNotPresent() {
int bucketId = 0;
InternalDistributedMember primaryMember = mock(InternalDistributedMember.class);
InternalDistributedMember secondaryMember = mock(InternalDistributedMember.class);
EntryEventImpl clientEvent = mock(EntryEventImpl.class);
when(clientEvent.getOperation()).thenReturn(null);
PartitionedRegion spyPR = spy(partitionedRegion);
doReturn(primaryMember).when(spyPR).getNodeForBucketWrite(eq(bucketId), isNull());
doReturn(secondaryMember).when(spyPR).getNodeForBucketRead(eq(bucketId));
InternalDistributedMember memberForRegisterInterestRead =
spyPR.getBucketNodeForReadOrWrite(bucketId, null);
assertThat(memberForRegisterInterestRead).isSameAs(secondaryMember);
verify(spyPR, times(1)).getNodeForBucketRead(anyInt());
}
@Test
public void updateBucketMapsForInterestRegistrationWithSetOfKeysFetchesPrimaryBucketsForRead() {
Integer[] bucketIds = new Integer[] {0, 1};
InternalDistributedMember primaryMember = mock(InternalDistributedMember.class);
InternalDistributedMember secondaryMember = mock(InternalDistributedMember.class);
PartitionedRegion spyPR = spy(partitionedRegion);
doReturn(primaryMember).when(spyPR).getNodeForBucketWrite(anyInt(), isNull());
doReturn(secondaryMember).when(spyPR).getNodeForBucketRead(anyInt());
HashMap<InternalDistributedMember, HashSet<Integer>> nodeToBuckets = new HashMap<>();
Set<Integer> buckets = Arrays.stream(bucketIds).collect(Collectors.toCollection(HashSet::new));
spyPR.updateNodeToBucketMap(nodeToBuckets, buckets);
verify(spyPR, times(2)).getNodeForBucketWrite(anyInt(), isNull());
}
@Test
public void updateBucketMapsForInterestRegistrationWithAllKeysFetchesPrimaryBucketsForRead() {
Integer[] bucketIds = new Integer[] {0, 1};
InternalDistributedMember primaryMember = mock(InternalDistributedMember.class);
InternalDistributedMember secondaryMember = mock(InternalDistributedMember.class);
PartitionedRegion spyPR = spy(partitionedRegion);
doReturn(primaryMember).when(spyPR).getNodeForBucketWrite(anyInt(), isNull());
doReturn(secondaryMember).when(spyPR).getNodeForBucketRead(anyInt());
HashMap<InternalDistributedMember, HashMap<Integer, HashSet>> nodeToBuckets = new HashMap<>();
HashSet buckets = Arrays.stream(bucketIds).collect(Collectors.toCollection(HashSet::new));
HashMap<Integer, HashSet> bucketKeys = new HashMap<>();
bucketKeys.put(0, buckets);
spyPR.updateNodeToBucketMap(nodeToBuckets, bucketKeys);
verify(spyPR, times(1)).getNodeForBucketWrite(anyInt(), isNull());
}
@Test
public void filterOutNonParallelGatewaySendersShouldReturnCorrectly() {
GatewaySender parallelSender = mock(GatewaySender.class);
when(parallelSender.isParallel()).thenReturn(true);
when(parallelSender.getId()).thenReturn("parallel");
GatewaySender anotherParallelSender = mock(GatewaySender.class);
when(anotherParallelSender.isParallel()).thenReturn(true);
when(anotherParallelSender.getId()).thenReturn("anotherParallel");
GatewaySender serialSender = mock(GatewaySender.class);
when(serialSender.isParallel()).thenReturn(false);
when(serialSender.getId()).thenReturn("serial");
Set<GatewaySender> mockSenders =
Stream.of(parallelSender, anotherParallelSender, serialSender).collect(Collectors.toSet());
when(internalCache.getAllGatewaySenders()).thenReturn(mockSenders);
assertThat(partitionedRegion
.filterOutNonParallelGatewaySenders(Stream.of("serial").collect(Collectors.toSet())))
.isEmpty();
assertThat(partitionedRegion
.filterOutNonParallelGatewaySenders(Stream.of("unknownSender").collect(Collectors.toSet())))
.isEmpty();
assertThat(partitionedRegion.filterOutNonParallelGatewaySenders(
Stream.of("parallel", "serial").collect(Collectors.toSet()))).isNotEmpty()
.containsExactly("parallel");
assertThat(partitionedRegion.filterOutNonParallelGatewaySenders(
Stream.of("parallel", "serial", "anotherParallel").collect(Collectors.toSet())))
.isNotEmpty().containsExactly("parallel", "anotherParallel");
}
@Test
public void filterOutNonParallelAsyncEventQueuesShouldReturnCorrectly() {
AsyncEventQueue parallelQueue = mock(AsyncEventQueue.class);
when(parallelQueue.isParallel()).thenReturn(true);
when(parallelQueue.getId()).thenReturn(getSenderIdFromAsyncEventQueueId("parallel"));
AsyncEventQueue anotherParallelQueue = mock(AsyncEventQueue.class);
when(anotherParallelQueue.isParallel()).thenReturn(true);
when(anotherParallelQueue.getId())
.thenReturn(getSenderIdFromAsyncEventQueueId("anotherParallel"));
AsyncEventQueue serialQueue = mock(AsyncEventQueue.class);
when(serialQueue.isParallel()).thenReturn(false);
when(serialQueue.getId()).thenReturn(getSenderIdFromAsyncEventQueueId("serial"));
Set<AsyncEventQueue> mockQueues =
Stream.of(parallelQueue, anotherParallelQueue, serialQueue).collect(Collectors.toSet());
when(internalCache.getAsyncEventQueues()).thenReturn(mockQueues);
assertThat(partitionedRegion
.filterOutNonParallelAsyncEventQueues(Stream.of("serial").collect(Collectors.toSet())))
.isEmpty();
assertThat(partitionedRegion.filterOutNonParallelAsyncEventQueues(
Stream.of("unknownSender").collect(Collectors.toSet()))).isEmpty();
assertThat(partitionedRegion.filterOutNonParallelAsyncEventQueues(
Stream.of("parallel", "serial").collect(Collectors.toSet()))).isNotEmpty()
.containsExactly("parallel");
assertThat(partitionedRegion.filterOutNonParallelAsyncEventQueues(
Stream.of("parallel", "serial", "anotherParallel").collect(Collectors.toSet())))
.isNotEmpty().containsExactly("parallel", "anotherParallel");
}
@Test
public void getLocalSizeDoesNotThrowIfRegionUninitialized() {
partitionedRegion = new PartitionedRegion("region", attributesFactory.create(), null,
internalCache, mock(InternalRegionArguments.class), disabledClock());
assertThatCode(partitionedRegion::getLocalSize).doesNotThrowAnyException();
}
@Test
// See GEODE-7106
public void generatePRIdShouldNotThrowNumberFormatExceptionIfAnErrorOccursWhileReleasingTheLock() {
PartitionedRegion prSpy = spy(partitionedRegion);
DistributedLockService mockLockService = mock(DistributedLockService.class);
doReturn(true).when(mockLockService).lock(any(), anyLong(), anyLong());
doThrow(new RuntimeException("Mock Exception")).when(mockLockService).unlock(any());
InternalDistributedSystem mockSystem = mock(InternalDistributedSystem.class);
when(mockSystem.getDistributionManager()).thenReturn(mock(DistributionManager.class));
when(mockSystem.getDistributionManager().getCancelCriterion())
.thenReturn(mock(CancelCriterion.class));
when(mockSystem.getDistributionManager().getOtherDistributionManagerIds())
.thenReturn(Collections.emptySet());
doReturn(mockLockService).when(prSpy).getPartitionedRegionLockService();
assertThatCode(() -> prSpy.generatePRId(mockSystem)).doesNotThrowAnyException();
}
}