blob: 790a155190b6b257910c2f25f963aa0ae8030a7b [file] [log] [blame]
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
 * agreements. See the NOTICE file distributed with this work for additional information regarding
 * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License. You may obtain a
 * copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */
package org.apache.geode.internal.cache;
import static java.util.concurrent.TimeUnit.MINUTES;
import static org.apache.geode.internal.cache.PartitionedRegion.RETRY_TIMEOUT_PROPERTY;
import static org.apache.geode.test.dunit.Host.getHost;
import static org.apache.geode.test.dunit.IgnoredException.addIgnoredException;
import static org.assertj.core.api.Assertions.assertThat;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.apache.geode.cache.Cache;
import org.apache.geode.cache.DataPolicy;
import org.apache.geode.cache.PartitionAttributesFactory;
import org.apache.geode.cache.PartitionedRegionStorageException;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionFactory;
import org.apache.geode.cache.RegionShortcut;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.cache.control.InternalResourceManager;
import org.apache.geode.internal.cache.control.InternalResourceManager.ResourceObserver;
import org.apache.geode.internal.cache.control.InternalResourceManager.ResourceObserverAdapter;
import org.apache.geode.internal.cache.partitioned.RegionAdvisor;
import org.apache.geode.test.dunit.IgnoredException;
import org.apache.geode.test.dunit.VM;
import org.apache.geode.test.dunit.cache.CacheTestCase;
import org.apache.geode.test.dunit.rules.DistributedRestoreSystemProperties;
/**
 * DUnit tests for partitioned-region high availability. They cover three areas:
 * <ul>
 * <li>bucket failover with no data loss across sequential cache closes;</li>
 * <li>pickup of redundant buckets by a newly started datastore;</li>
 * <li>the scope of bucket regions.</li>
 * </ul>
 */
public class PartitionedRegionHADUnitTest extends CacheTestCase {

/** Name of the partitioned region created with zero redundant copies. */
private static final String PR_ZERO_REDUNDANCY = "PR_ZeroRedundancy";

/** Name of the partitioned region created with one redundant copy. */
private static final String PR_ONE_REDUNDANCY = "PR_SingleRedundancy";

/**
 * Value temporarily assigned to {@code RETRY_TIMEOUT_PROPERTY} while partitioned regions are
 * created. NOTE(review): presumably milliseconds; confirm against PartitionedRegion.
 */
private static final String RETRY_TIMEOUT_VALUE = "20000";

/** Region name used by the failover test; set to the unique test name in setUp. */
private String regionName;

/** Prefix of the region names; the regions are named prNamePrefix + index. */
private String prNamePrefix;

/** Number of prefixed regions created per member. */
private int numberOfRegions;

/** Total number of buckets configured on each created partitioned region. */
private int totalNumBuckets;

/** Redundant copies used for the prefixed regions (tests may override it). */
private int redundantCopies;

private VM vm0;
private VM vm1;
private VM vm2;
private VM vm3;

/** Role aliases for the VMs, as used by testGrabBackupBuckets. */
private VM datastoreVM0;
private VM datastoreVM2;
private VM accessorVM3;

/**
 * Restores system properties in every VM after each test. It must be annotated with {@code @Rule};
 * otherwise JUnit never applies it.
 */
@Rule
public DistributedRestoreSystemProperties restoreSystemProperties =
    new DistributedRestoreSystemProperties();
public void setUp() throws Exception {
regionName = getUniqueName();
prNamePrefix = "PR-";
numberOfRegions = 1;
totalNumBuckets = 5;
redundantCopies = 0;
vm0 = getHost(0).getVM(0);
vm1 = getHost(0).getVM(1);
vm2 = getHost(0).getVM(2);
vm3 = getHost(0).getVM(3);
datastoreVM0 = vm0;
datastoreVM2 = vm2;
accessorVM3 = vm3;
* Test to ensure that we have proper bucket failover, with no data loss, in the face of
* sequential cache.close() events.
public void testBucketFailOverDuringCacheClose() throws Exception {
Boolean value = Boolean.TRUE;
vm2.invoke(() -> createPartitionedRegion());
vm3.invoke(() -> createPartitionedRegion());
vm3.invoke(() -> {
Cache cache = getCache();
PartitionedRegion partitionedRegion = (PartitionedRegion) cache.getRegion(regionName);
// Create keys such that all buckets are created, Integer works well
// assuming buckets are allocated on the mod of the key hashCode, x 2 just to be safe
int numEntries = partitionedRegion.getTotalNumberOfBuckets() * 2;
for (int i = numEntries; i >= 0; --i) {
partitionedRegion.put(i, value);
assertThat(partitionedRegion).hasSize(numEntries + 1);
vm3.invoke(() -> validateEntries(value));
vm2.invoke(() -> validateEntries(value));
// origin VM down!
vm2.invoke(() -> getCache().close());
// origin down, but no data loss
vm3.invoke(() -> validateEntries(value));
// get back to the desired redundancy
vm0.invoke(() -> createPartitionedRegion());
// verify no data loss
vm0.invoke(() -> validateEntries(value));
// 2nd oldest VM down!
vm3.invoke(() -> getCache().close());
// 2nd down, but no data loss
vm0.invoke(() -> validateEntries(value));
// get back (for 2nd time) to desired redundancy
vm1.invoke(() -> createPartitionedRegion());
// verify no data loss
vm1.invoke(() -> validateEntries(value));
vm0.invoke(() -> validateEntries(value));
public void testGrabBackupBuckets() throws Exception {
redundantCopies = 1;
datastoreVM0.invoke(() -> createPRsAndAwaitRecovery(200, redundantCopies, totalNumBuckets));
// Do put operations on these 2 PRs asynchronously (this test does not currently do this)
try (IgnoredException ie = addIgnoredException(PartitionedRegionStorageException.class)) {
// At this point redundancy criterion is not meet.
// now if we create PRs on more VMs, it should create those "supposed to
// be redundant" buckets on these nodes, if it can accommodate the data
// (localMaxMemory>0).
datastoreVM2.invoke(() -> createPRsAndAwaitRecovery(200, redundantCopies, totalNumBuckets));
accessorVM3.invoke(() -> {
Cache cache = getCache();
for (int i = 0; i < numberOfRegions; i++) {
String regionName = prNamePrefix + i;
createPartitionedRegion(cache, regionName, 0, redundantCopies, totalNumBuckets);
for (int i = 0; i < numberOfRegions; i++) {
final int whichRegion = i;
int vm2LBRsize =
(Integer) datastoreVM2.invoke(() -> validateLocalBucket2RegionMapSize(whichRegion));
int vm3LBRsize =
(Integer) accessorVM3.invoke(() -> validateLocalBucket2RegionMapSize(whichRegion));
// This would mean that up coming node didn't pick up any buckets
// This accessor should NOT have picked up any buckets.
int vm2B2Nsize = (Integer) datastoreVM2.invoke(() -> validateBucketsOnNode(whichRegion));
* This verifies the Bucket Regions on the basis of redundantCopies set in RegionAttributes.
public void testBucketsScope() throws Exception {
// Create PRs on only 2 VMs
vm0.invoke(() -> createPRs(PR_ZERO_REDUNDANCY, PR_ONE_REDUNDANCY));
vm1.invoke(() -> createPRs(PR_ZERO_REDUNDANCY, PR_ONE_REDUNDANCY));
// Do put operations on these 2 PRs asynchronously (test does not currently do this)
vm0.invoke(() -> {
Cache cache = getCache();
Region<Integer, Integer> regionZeroRedundancy = cache.getRegion(PR_ZERO_REDUNDANCY);
for (int k = 0; k < 10; k++) {
regionZeroRedundancy.put(k, k);
Region<Integer, Integer> regionOneRedundancy = cache.getRegion(PR_ONE_REDUNDANCY);
for (int k = 0; k < 10; k++) {
regionOneRedundancy.put(k, k);
vm0.invoke(() -> validateBucketScope(PR_ZERO_REDUNDANCY, PR_ONE_REDUNDANCY));
vm1.invoke(() -> validateBucketScope(PR_ZERO_REDUNDANCY, PR_ONE_REDUNDANCY));
private void validateBucketScope(String prZeroRedundancy, String prSingleRedundancy) {
Cache cache = getCache();
PartitionedRegion regionZeroRedundancy = (PartitionedRegion) cache.getRegion(prZeroRedundancy);
for (BucketRegion bucket : regionZeroRedundancy.getDataStore().getLocalBucket2RegionMap()
.values()) {
PartitionedRegion regionOneRedundancy = (PartitionedRegion) cache.getRegion(prSingleRedundancy);
for (Region bucket : regionOneRedundancy.getDataStore().getLocalBucket2RegionMap().values()) {
private void createPRs(String prZeroRedundancy, String prOneRedundancy) {
Cache cache = getCache();
// RedundantCopies = 0 , Scope = DISTRIBUTED_ACK
createPartitionedRegion(cache, prZeroRedundancy, 200, 0, totalNumBuckets);
// RedundantCopies > 0 , Scope = DISTRIBUTED_ACK
createPartitionedRegion(cache, prOneRedundancy, 200, 1, totalNumBuckets);
private Object validateBucketsOnNode(int whichRegion) {
int containsNode = 0;
Cache cache = getCache();
PartitionedRegion partitionedRegion =
(PartitionedRegion) cache.getRegion(prNamePrefix + whichRegion);
RegionAdvisor regionAdvisor = partitionedRegion.getRegionAdvisor();
try {
for (int bucketId : regionAdvisor.getBucketSet()) {
Set<InternalDistributedMember> nodeList = regionAdvisor.getBucketOwners(bucketId);
if (nodeList != null && nodeList.contains(partitionedRegion.getMyId())) {
} catch (NoSuchElementException ignored) {
return containsNode;
private Object validateLocalBucket2RegionMapSize(int whichRegion) {
int size = 0;
Cache cache = getCache();
PartitionedRegion partitionedRegion =
(PartitionedRegion) cache.getRegion(prNamePrefix + whichRegion);
PartitionedRegionDataStore dataStore = partitionedRegion.getDataStore();
if (dataStore != null) {
size = dataStore.getBucketsManaged();
return size;
private void putsInDatastoreVM0() {
Cache cache = getCache();
for (int i = 0; i < numberOfRegions; i++) {
Region<String, String> region = cache.getRegion(prNamePrefix + i);
for (int k = 0; k < 10; k++) {
region.put(i + prNamePrefix + k, prNamePrefix + k);
private void createPRsAndAwaitRecovery(int localMaxMemory, int redundancy, int totalNumBuckets)
throws InterruptedException {
CountDownLatch recoveryDone = new CountDownLatch(numberOfRegions);
ResourceObserver waitForRecovery = new ResourceObserverAdapter() {
public void rebalancingOrRecoveryFinished(Region region) {
String originalValue = setSystemProperty(RETRY_TIMEOUT_PROPERTY, RETRY_TIMEOUT_VALUE);
try {
Cache cache = getCache();
for (int i = 0; i < numberOfRegions; i++) {
String regionName = prNamePrefix + i;
createPartitionedRegion(cache, regionName, localMaxMemory, redundancy, totalNumBuckets);
assertThat(recoveryDone.await(1, MINUTES)).isTrue();
} finally {
setSystemProperty(RETRY_TIMEOUT_PROPERTY, originalValue);
private String setSystemProperty(String property, String value) {
if (value == null) {
return System.clearProperty(property);
} else {
return System.setProperty(property, value);
private void validateEntries(boolean value) {
Cache cache = getCache();
PartitionedRegion region = (PartitionedRegion) cache.getRegion(regionName);
for (int i = region.getTotalNumberOfBuckets() * 2; i >= 0; --i) {
private void createPartitionedRegion() throws InterruptedException {
CountDownLatch rebalancingFinished = new CountDownLatch(1);
ResourceObserver waitForRebalancing = new ResourceObserverAdapter() {
public void rebalancingOrRecoveryFinished(Region region) {
try {
Cache cache = getCache();
Region partitionedRegion = createPartitionedRegion(cache, regionName, 20, 1, totalNumBuckets);
assertThat(rebalancingFinished.await(1, MINUTES)).isTrue();
} finally {
private PartitionedRegion createPartitionedRegion(Cache cache, String regionName,
int localMaxMemory, int redundancy, int totalNumBuckets) {
PartitionAttributesFactory partitionAttributesFactory = new PartitionAttributesFactory();
RegionFactory regionFactory = cache.createRegionFactory(RegionShortcut.PARTITION);
return (PartitionedRegion) regionFactory.create(regionName);