// blob: 1695cf0cec94f71d1a07035a9f1c27f90ae7dd3a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal.cache;
import static java.lang.Boolean.FALSE;
import static java.lang.Boolean.TRUE;
import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
import static org.apache.geode.test.dunit.Host.getHost;
import static org.apache.geode.test.dunit.Invoke.invokeInEveryVM;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertTrue;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.junit.Before;
import org.junit.Test;
import org.apache.geode.cache.CacheListener;
import org.apache.geode.cache.PartitionAttributesFactory;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionFactory;
import org.apache.geode.cache.RegionShortcut;
import org.apache.geode.cache30.CertifiableTestCacheListener;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.test.dunit.VM;
import org.apache.geode.test.dunit.cache.CacheTestCase;
/**
 * Tests for PartitionedRegion cleanup on Node Failure through Membership listener, and for
 * recovery of bucket redundancy after a member departs.<br>
 *
 * (1) testMetaDataCleanupOnSinglePRNodeFail - Test for PartitionedRegion metadata cleanup for
 * single failed node.<br>
 *
 * (2) testMetaDataCleanupOnMultiplePRNodeFail - Test for PartitionedRegion metadata cleanup for
 * multiple failed nodes.<br>
 *
 * (3) testRecoveryOfSingleMemberFailure - Test for peer recovery of bucket redundancy when a
 * single member is removed from the distributed system.
 */
public class PartitionedRegionHAFailureAndRecoveryDUnitTest extends CacheTestCase {

  /** Number of dunit VMs that host the partitioned regions under test. */
  private static final int VM_COUNT = 4;

  private VM[] vms;
  private String uniqueName;
  private int localMaxMemory;
  private int redundancy;
  private int recoveryDelay;

  /**
   * Looks up the {@link #VM_COUNT} VMs used by every test and resets the partition attribute
   * values that individual tests may override.
   */
  @Before
  public void setUp() {
    vms = new VM[VM_COUNT];
    for (int i = 0; i < VM_COUNT; i++) {
      vms[i] = getHost(0).getVM(i);
    }

    uniqueName = getUniqueName();
    localMaxMemory = 200;
    redundancy = 1;
    recoveryDelay = -1;
  }

  /**
   * Test for PartitionedRegion metadata cleanup for single node failure.<br>
   *
   * (1) Uses 4 VMs<br>
   *
   * (2) Creates 4 PartitionedRegions on each of the 4 VMs<br>
   *
   * (3) Closes the cache on vm0<br>
   *
   * (4) Validate Failed node's config metadata<br>
   *
   * (5) Validate Failed node's bucket2Node Region metadata.
   */
  @Test
  public void testMetaDataCleanupOnSinglePRNodeFail() throws Exception {
    createPartitionRegionsInAllVMs(uniqueName, 4, localMaxMemory, recoveryDelay, redundancy);

    // Add a listener to the config meta data
    addConfigListenerInAllVMs();

    // disconnect vm0.
    InternalDistributedMember member = vms[0].invoke(this::disconnect);

    // validate that the metadata clean up is done at all the VM's.
    vms[1].invoke(() -> validateNodeFailMetaDataCleanUp(member));
    vms[2].invoke(() -> validateNodeFailMetaDataCleanUp(member));
    vms[3].invoke(() -> validateNodeFailMetaDataCleanUp(member));

    // validate that bucket2Node clean up is done at all the VM's.
    vms[1].invoke(() -> validateNodeFailBucket2NodeCleanUp(member));
    vms[2].invoke(() -> validateNodeFailBucket2NodeCleanUp(member));
    vms[3].invoke(() -> validateNodeFailBucket2NodeCleanUp(member));
  }

  /**
   * Test for PartitionedRegion metadata cleanup for multiple node failure.<br>
   *
   * (1) Uses 4 VMs<br>
   *
   * (2) Creates 4 PartitionedRegions on each of the 4 VMs<br>
   *
   * (3) Closes the cache on vm0 and then on vm1, validating after each one<br>
   *
   * (4) Validate all Failed node's config metadata<br>
   *
   * (5) Validate all Failed node's bucket2Node Region metadata.
   */
  @Test
  public void testMetaDataCleanupOnMultiplePRNodeFail() throws Exception {
    createPartitionRegionsInAllVMs(uniqueName, 4, localMaxMemory, recoveryDelay, redundancy);
    addConfigListenerInAllVMs();

    // disconnect vm0
    InternalDistributedMember member0 = vms[0].invoke(this::disconnect);

    // validate that the metadata clean up is done at all the VM's for first failed node.
    vms[1].invoke(() -> validateNodeFailMetaDataCleanUp(member0));
    vms[2].invoke(() -> validateNodeFailMetaDataCleanUp(member0));
    vms[3].invoke(() -> validateNodeFailMetaDataCleanUp(member0));

    // validate that bucket2Node clean up is done at all the VM's for first failed node.
    vms[1].invoke(() -> validateNodeFailBucket2NodeCleanUp(member0));
    vms[2].invoke(() -> validateNodeFailBucket2NodeCleanUp(member0));
    vms[3].invoke(() -> validateNodeFailBucket2NodeCleanUp(member0));

    // Clear state of listener, skipping the vms[0] which was disconnected
    vms[1].invoke(this::clearConfigListenerState);
    vms[2].invoke(this::clearConfigListenerState);
    vms[3].invoke(this::clearConfigListenerState);

    // disconnect vm1
    InternalDistributedMember member1 = vms[1].invoke(this::disconnect);

    // validate that the metadata clean up is done at all the VM's for first failed node.
    vms[2].invoke(() -> validateNodeFailMetaDataCleanUp(member0));
    vms[3].invoke(() -> validateNodeFailMetaDataCleanUp(member0));

    // validate that the metadata clean up is done at all the VM's for second failed node.
    vms[2].invoke(() -> validateNodeFailMetaDataCleanUp(member1));
    vms[3].invoke(() -> validateNodeFailMetaDataCleanUp(member1));

    // validate that bucket2Node clean up is done at all the VM's for second failed node.
    vms[2].invoke(() -> validateNodeFailBucket2NodeCleanUp(member1));
    vms[3].invoke(() -> validateNodeFailBucket2NodeCleanUp(member1));
  }

  /**
   * Test for peer recovery of buckets when a member is removed from the distributed system
   */
  @Test
  public void testRecoveryOfSingleMemberFailure() throws Exception {
    localMaxMemory = 20;
    redundancy = 2;
    recoveryDelay = 0;

    // Create PR in all VMs
    createPartitionRegionsInAllVMs(uniqueName, 1, localMaxMemory, recoveryDelay, redundancy);

    // Create some buckets, pick one and get one of the members hosting it
    DistributedMember bucketHost = vms[0].invoke(() -> {
      PartitionedRegion region = (PartitionedRegion) getCache().getRegion(uniqueName + "0");

      // Create some buckets; the assertion bounds the loop by the region's total bucket count
      for (int k = 0; region.getRegionAdvisor().getBucketSet().size() < 2; k++) {
        assertThat(k).isLessThanOrEqualTo(region.getTotalNumberOfBuckets());
        region.put(k, k);
      }

      // Grab a bucket id
      int bucketId = region.getRegionAdvisor().getBucketSet().iterator().next();

      // Find a host for the bucket
      Set<InternalDistributedMember> bucketOwners =
          region.getRegionAdvisor().getBucketOwners(bucketId);
      assertThat(bucketOwners).hasSize(redundancy + 1);

      InternalDistributedMember bucketOwner = bucketOwners.iterator().next();
      assertThat(bucketOwner).isNotNull();
      return bucketOwner;
    });

    assertThat(bucketHost).isNotNull();

    // Disconnect the selected host; the map records which VMs are still connected
    Map<VM, Boolean> stillHasDS = invokeInEveryVM(() -> {
      if (getSystem().getDistributedMember().equals(bucketHost)) {
        disconnectFromDS();
        return FALSE;
      }
      return TRUE;
    });

    // Wait for each member to finish recovery of redundancy for the selected bucket
    int count = 0;
    for (VM vm : vms) {
      if (awaitRedundancyRecovery(stillHasDS, vm)) {
        count++;
      }
    }
    // exactly one of the VMs was disconnected above
    assertThat(count).isEqualTo(VM_COUNT - 1);

    // Validate all buckets have proper redundancy
    for (VM vm : vms) {
      validateBucketRedundancy(stillHasDS, vm);
    }
  }

  /**
   * Waits in the given VM until its partitioned region reports no low-redundancy buckets.
   *
   * @param stillHasDS map from VM to whether that VM is still connected
   * @param vm the VM to wait in
   * @return {@code true} if the VM is still connected and was waited on, {@code false} if it was
   *         skipped
   */
  private boolean awaitRedundancyRecovery(Map<VM, Boolean> stillHasDS, VM vm) {
    // only wait on the remaining VMs; TRUE.equals also treats a VM without a map entry as
    // disconnected instead of failing with a NullPointerException on unboxing
    if (!TRUE.equals(stillHasDS.get(vm))) {
      return false;
    }

    vm.invoke(() -> {
      Region<?, ?> region = getCache().getRegion(uniqueName + "0");
      assertTrue(region instanceof PartitionedRegion);

      PartitionedRegion partitionedRegion = (PartitionedRegion) region;
      PartitionedRegionStats prs = partitionedRegion.getPrStats();

      // Wait for recovery
      await()
          .untilAsserted(() -> assertThat(prs.getLowRedundancyBucketCount()).isEqualTo(0));
    });
    return true;
  }

  /**
   * Asserts in the given VM that every bucket of the partitioned region has the configured number
   * of copies. VMs that are no longer connected are skipped.
   *
   * @param stillHasDS map from VM to whether that VM is still connected
   * @param vm the VM to validate in
   */
  private void validateBucketRedundancy(Map<VM, Boolean> stillHasDS, VM vm) {
    // only validate buckets on remaining VMs
    if (!TRUE.equals(stillHasDS.get(vm))) {
      return;
    }

    vm.invoke(() -> {
      PartitionedRegion pr = (PartitionedRegion) getCache().getRegion(uniqueName + "0");
      for (int bucketId : pr.getRegionAdvisor().getBucketSet()) {
        assertThatBucketHasRedundantCopies(pr, bucketId);
      }
    });
  }

  /**
   * Asserts that the bucket has {@code redundantCopies + 1} owners. The lookup is repeated for as
   * long as it throws {@link ForceReattemptException}; there is no upper bound on the retries.
   *
   * @param pr the partitioned region that owns the bucket
   * @param bucketId the id of the bucket to check
   */
  private void assertThatBucketHasRedundantCopies(PartitionedRegion pr, int bucketId) {
    boolean forceReattempt;
    do {
      forceReattempt = false;
      try {
        List<?> owners = pr.getBucketOwnersForValidation(bucketId);
        assertThat(owners).hasSize(pr.getRedundantCopies() + 1);
      } catch (ForceReattemptException ignored) {
        // the lookup asked to be reattempted, so try again
        forceReattempt = true;
      }
    } while (forceReattempt);
  }

  /**
   * Clears the recorded state of the test listener installed on the PR root region.
   */
  private void clearConfigListenerState() {
    Region<?, ?> rootReg = PartitionedRegionHelper.getPRRoot(getCache());
    getConfigListener(rootReg).clearState();
  }

  /**
   * Returns the test listener registered on the PR root region by {@link #addConfigListener()}.
   * Asserts that the region has exactly two listeners and returns the second one.
   *
   * @param rootReg the PR root region
   * @return the listener at index 1
   */
  private CertifiableTestCacheListener getConfigListener(Region<?, ?> rootReg) {
    CacheListener<?, ?>[] cls = rootReg.getAttributes().getCacheListeners();
    // NOTE(review): presumably index 0 is a listener the PR root already had before the test
    // added its own - confirm
    assertThat(cls).hasSize(2);
    return (CertifiableTestCacheListener) cls[1];
  }

  /**
   * Waits for each entry of the PR root region to be updated and then asserts that no
   * PartitionRegionConfig still lists the given member among its nodes.
   *
   * @param expectedMember the member that is expected to have been removed from the metadata
   */
  private void validateNodeFailMetaDataCleanUp(final DistributedMember expectedMember) {
    Region<?, ?> rootReg = PartitionedRegionHelper.getPRRoot(getCache());
    CertifiableTestCacheListener ctcl = getConfigListener(rootReg);

    for (Object regionNameObject : rootReg.keySet()) {
      String regionName = (String) regionNameObject;
      ctcl.waitForUpdated(regionName);

      Object prConfigObject = rootReg.get(regionName);
      if (prConfigObject == null) {
        continue;
      }

      PartitionRegionConfig prConfig = (PartitionRegionConfig) prConfigObject;
      Set<Node> nodes = prConfig.getNodes();
      for (Node node : nodes) {
        DistributedMember member = node.getMemberId();
        assertThat(member).isNotEqualTo(expectedMember);
      }
    }
  }

  /**
   * Asserts that the given member no longer owns any bucket of any non-destroyed PartitionedRegion
   * known to this VM.
   *
   * @param member the member that is expected to have been removed from all bucket owner sets
   */
  private void validateNodeFailBucket2NodeCleanUp(final InternalDistributedMember member) {
    // result intentionally unused; NOTE(review): presumably called to make sure the cache exists
    // in this VM before the static PR map is read - confirm
    getCache();

    Map<?, ?> prIdToPR = PartitionedRegion.getPrIdToPR();
    for (Object prRegionObject : prIdToPR.values()) {
      // skip the marker that stands in for destroyed regions
      if (prRegionObject == PartitionedRegion.PRIdMap.DESTROYED) {
        continue;
      }

      PartitionedRegion prRegion = (PartitionedRegion) prRegionObject;
      for (int bucketId : prRegion.getRegionAdvisor().getBucketSet()) {
        Set<InternalDistributedMember> bucketOwners =
            prRegion.getRegionAdvisor().getBucketOwners(bucketId);
        assertThat(bucketOwners).doesNotContain(member);
      }
    }
  }

  /**
   * Closes the cache of the VM this runs in.
   *
   * @return the id this member had before its cache was closed
   */
  private InternalDistributedMember disconnect() {
    InternalDistributedMember member = getCache().getMyId();
    getCache().close();
    return member;
  }

  /**
   * Creates the same set of partitioned regions in every VM.
   *
   * @param regionNamePrefix prefix of each region name; the region index is appended to it
   * @param numberOfRegions how many regions to create in each VM
   * @param localMaxMemory value passed to the partition attributes' local max memory
   * @param recoveryDelay value passed to the partition attributes' recovery delay
   * @param redundancy value passed to the partition attributes' redundant copies
   */
  private void createPartitionRegionsInAllVMs(final String regionNamePrefix,
      final int numberOfRegions, final int localMaxMemory, final int recoveryDelay,
      final int redundancy) {
    for (VM vm : vms) {
      vm.invoke(() -> createPartitionRegions(regionNamePrefix, numberOfRegions, localMaxMemory,
          recoveryDelay, redundancy));
    }
  }

  /**
   * Creates {@code numberOfRegions} partitioned regions named {@code regionNamePrefix + i} in the
   * VM this runs in.
   *
   * @param regionNamePrefix prefix of each region name; the region index is appended to it
   * @param numberOfRegions how many regions to create
   * @param localMaxMemory value passed to the partition attributes' local max memory
   * @param recoveryDelay value passed to the partition attributes' recovery delay
   * @param redundancy value passed to the partition attributes' redundant copies
   */
  private void createPartitionRegions(final String regionNamePrefix, final int numberOfRegions,
      final int localMaxMemory, final int recoveryDelay, final int redundancy) {
    for (int i = 0; i < numberOfRegions; i++) {
      PartitionAttributesFactory<Object, Object> partitionAttributesFactory =
          new PartitionAttributesFactory<>();
      partitionAttributesFactory.setLocalMaxMemory(localMaxMemory);
      partitionAttributesFactory.setRedundantCopies(redundancy);
      partitionAttributesFactory.setRecoveryDelay(recoveryDelay);

      RegionFactory<Object, Object> regionFactory =
          getCache().createRegionFactory(RegionShortcut.PARTITION);
      regionFactory.setPartitionAttributes(partitionAttributesFactory.create());

      regionFactory.create(regionNamePrefix + i);
    }
  }

  /**
   * Installs the test listener on the PR root region of every VM.
   */
  private void addConfigListenerInAllVMs() {
    for (VM vm : vms) {
      vm.invoke(this::addConfigListener);
    }
  }

  /**
   * Adds a new {@link CertifiableTestCacheListener} to the PR root region of the VM this runs in.
   */
  private void addConfigListener() {
    // kept as a raw type on purpose: a wildcard-typed mutator would not accept the listener
    // without an unchecked conversion
    Region partitionedRootRegion = PartitionedRegionHelper.getPRRoot(getCache());
    partitionedRootRegion.getAttributesMutator()
        .addCacheListener(new CertifiableTestCacheListener());
  }
}