blob: a80cb9bdcbac800ab39b722a25480a59f81dd1c2 [file] [log] [blame]
/*=========================================================================
* Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
* This product is protected by U.S. and international copyright
* and intellectual property laws. Pivotal products are covered by
* more patents listed at http://www.pivotal.io/patents.
*=========================================================================
*/
package com.gemstone.gemfire.internal.cache.control;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import com.gemstone.gemfire.cache.AttributesFactory;
import com.gemstone.gemfire.cache.Cache;
import com.gemstone.gemfire.cache.CacheLoader;
import com.gemstone.gemfire.cache.CacheLoaderException;
import com.gemstone.gemfire.cache.CacheWriterException;
import com.gemstone.gemfire.cache.DataPolicy;
import com.gemstone.gemfire.cache.DiskStore;
import com.gemstone.gemfire.cache.DiskStoreFactory;
import com.gemstone.gemfire.cache.EntryEvent;
import com.gemstone.gemfire.cache.LoaderHelper;
import com.gemstone.gemfire.cache.PartitionAttributes;
import com.gemstone.gemfire.cache.PartitionAttributesFactory;
import com.gemstone.gemfire.cache.Region;
import com.gemstone.gemfire.cache.asyncqueue.AsyncEvent;
import com.gemstone.gemfire.cache.asyncqueue.AsyncEventListener;
import com.gemstone.gemfire.cache.control.RebalanceOperation;
import com.gemstone.gemfire.cache.control.RebalanceResults;
import com.gemstone.gemfire.cache.control.ResourceManager;
import com.gemstone.gemfire.cache.partition.PartitionMemberInfo;
import com.gemstone.gemfire.cache.partition.PartitionRebalanceInfo;
import com.gemstone.gemfire.cache.partition.PartitionRegionHelper;
import com.gemstone.gemfire.cache.partition.PartitionRegionInfo;
import com.gemstone.gemfire.cache.util.CacheListenerAdapter;
import com.gemstone.gemfire.cache30.CacheTestCase;
import com.gemstone.gemfire.distributed.DistributedMember;
import com.gemstone.gemfire.distributed.DistributedSystem;
import com.gemstone.gemfire.distributed.internal.DistributionConfig;
import com.gemstone.gemfire.internal.cache.BucketRegion;
import com.gemstone.gemfire.internal.cache.ColocationHelper;
import com.gemstone.gemfire.internal.cache.DiskStoreImpl;
import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
import com.gemstone.gemfire.internal.cache.PartitionedRegion;
import com.gemstone.gemfire.internal.cache.PartitionedRegionDataStore;
import com.gemstone.gemfire.internal.cache.control.InternalResourceManager.ResourceObserverAdapter;
import dunit.AsyncInvocation;
import dunit.Host;
import dunit.SerializableCallable;
import dunit.SerializableRunnable;
import dunit.VM;
/**
* @author dsmith
*
*/
@SuppressWarnings("synthetic-access")
public class RebalanceOperationDUnitTest extends CacheTestCase {
// Maximum time to wait for a rebalance to complete. NOTE(review): this value
// is passed to getResults(MAX_WAIT, TimeUnit.SECONDS) in doRebalance(), which
// makes it 6000 *seconds* (100 minutes) -- confirm seconds is really intended.
private static final long MAX_WAIT = 6000;
@Override
public void tearDown2() throws Exception {
  super.tearDown2();
  // Clear any resource observer a test may have installed, both remotely and
  // locally, so it cannot leak into subsequent tests.
  SerializableRunnable clearObserver = new SerializableRunnable() {
    public void run() {
      InternalResourceManager.setResourceObserver(null);
    }
  };
  invokeInEveryVM(clearObserver);
  InternalResourceManager.setResourceObserver(null);
}
/**
 * Creates the dunit test case.
 *
 * @param name the name of the test method to run
 */
public RebalanceOperationDUnitTest(String name) {
super(name);
}
/** Redundancy recovery, simulate-only: no buckets should actually move. */
public void testRecoverRedundancySimulation() {
recoverRedundancy(true);
}
/** Redundancy recovery with a real rebalance. */
public void testRecoverRedundancy() {
recoverRedundancy(false);
}
/**
 * Creates a redundancy-1 PR on a single member (so all 6 buckets start with
 * impaired redundancy), adds a second member, then rebalances. The rebalance
 * is expected to create a redundant copy of every bucket and split the
 * primaries 3/3 between the two members.
 *
 * @param simulate if true only simulate the rebalance and then assert that
 *        redundancy is still impaired (nothing actually moved)
 */
public void recoverRedundancy(final boolean simulate) {
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
// Recovery delays of -1 disable automatic recovery; only an explicit
// rebalance can restore redundancy.
SerializableRunnable createPrRegion = new SerializableRunnable("createRegion") {
public void run()
{
Cache cache = getCache();
AttributesFactory attr = new AttributesFactory();
PartitionAttributesFactory paf = new PartitionAttributesFactory();
paf.setRedundantCopies(1);
paf.setRecoveryDelay(-1);
paf.setStartupRecoveryDelay(-1);
PartitionAttributes prAttr = paf.create();
attr.setPartitionAttributes(prAttr);
cache.createRegion("region1", attr.create());
}
};
//Create the region in only 1 VM
vm0.invoke(createPrRegion);
//Create some buckets
vm0.invoke(new SerializableRunnable("createSomeBuckets") {
public void run() {
Cache cache = getCache();
Region region = cache.getRegion("region1");
region.put(Integer.valueOf(1), "A");
region.put(Integer.valueOf(2), "A");
region.put(Integer.valueOf(3), "A");
region.put(Integer.valueOf(4), "A");
region.put(Integer.valueOf(5), "A");
region.put(Integer.valueOf(6), "A");
}
});
// All 6 buckets exist but have no redundant copies yet.
SerializableRunnable checkLowRedundancy = new SerializableRunnable("checkLowRedundancy") {
public void run() {
Cache cache = getCache();
Region region = cache.getRegion("region1");
PartitionRegionInfo details = PartitionRegionHelper.getPartitionRegionInfo(region);
assertEquals(6, details.getCreatedBucketCount());
assertEquals(0,details.getActualRedundantCopies());
assertEquals(6,details.getLowRedundancyBucketCount());
}
};
//make sure we can tell that the buckets have low redundancy
vm0.invoke(checkLowRedundancy);
//Create the region in the other VM (should have no effect)
vm1.invoke(createPrRegion);
//Make sure we still have low redundancy
vm0.invoke(checkLowRedundancy);
//Now simulate a rebalance
vm0.invoke(new SerializableRunnable("simulateRebalance") {
public void run() {
Cache cache = getCache();
ResourceManager manager = cache.getResourceManager();
RebalanceResults results = doRebalance(simulate, manager);
// 6 redundant copies created on vm1, 3 primaries handed over; no
// bucket *transfers* because creation alone balances the load.
assertEquals(6, results.getTotalBucketCreatesCompleted());
assertEquals(3, results.getTotalPrimaryTransfersCompleted());
assertEquals(0, results.getTotalBucketTransferBytes());
assertEquals(0, results.getTotalBucketTransfersCompleted());
Set<PartitionRebalanceInfo> detailSet = results.getPartitionRebalanceDetails();
assertEquals(1, detailSet.size());
PartitionRebalanceInfo details = detailSet.iterator().next();
assertEquals(6, details.getBucketCreatesCompleted());
assertEquals(3, details.getPrimaryTransfersCompleted());
assertEquals(0, details.getBucketTransferBytes());
assertEquals(0, details.getBucketTransfersCompleted());
Set<PartitionMemberInfo> afterDetails = details.getPartitionMemberDetailsAfter();
assertEquals(2, afterDetails.size());
for(PartitionMemberInfo memberDetails: afterDetails) {
assertEquals(6, memberDetails.getBucketCount());
assertEquals(3, memberDetails.getPrimaryCount());
}
if(!simulate) {
verifyStats(manager, results);
}
}
});
if(!simulate) {
SerializableRunnable checkRedundancyFixed = new SerializableRunnable("checkLowRedundancy") {
public void run() {
Cache cache = getCache();
Region region = cache.getRegion("region1");
PartitionRegionInfo details = PartitionRegionHelper.getPartitionRegionInfo(region);
assertEquals(6, details.getCreatedBucketCount());
assertEquals(1,details.getActualRedundantCopies());
assertEquals(0,details.getLowRedundancyBucketCount());
}
};
vm0.invoke(checkRedundancyFixed);
vm1.invoke(checkRedundancyFixed);
} else {
//Make sure the simulation didn't do anything
vm0.invoke(checkLowRedundancy);
}
}
/**
 * Manual test (the z_ prefix keeps it out of the normal run): repeatedly
 * creates two extra members, rebalances a redundancy-2 PR onto them,
 * disconnects them, and prints the elapsed time per iteration. Useful for
 * eyeballing rebalance performance; the assertions are mostly disabled.
 */
public void z_testRedundancyLoop() {
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
VM vm2 = host.getVM(2);
VM vm3 = host.getVM(3);
SerializableRunnable createPrRegion = new SerializableRunnable("createRegion") {
public void run()
{
Cache cache = getCache();
AttributesFactory attr = new AttributesFactory();
PartitionAttributesFactory paf = new PartitionAttributesFactory();
paf.setRedundantCopies(2);
paf.setRecoveryDelay(-1);
paf.setStartupRecoveryDelay(-1);
PartitionAttributes prAttr = paf.create();
attr.setPartitionAttributes(prAttr);
cache.createRegion("region1", attr.create());
}
};
//Create the region in only 1 VM
vm0.invoke(createPrRegion);
vm3.invoke(createPrRegion);
//Create some buckets
vm0.invoke(new SerializableRunnable("createSomeBuckets") {
public void run() {
Cache cache = getCache();
Region region = cache.getRegion("region1");
for(int i =0; i < 500; i++) {
region.put(Integer.valueOf(i), "A");
}
}
});
// 500 keys hash into the default 113 total buckets, so all 113 are created.
SerializableRunnable checkLowRedundancy = new SerializableRunnable("checkLowRedundancy") {
public void run() {
Cache cache = getCache();
Region region = cache.getRegion("region1");
PartitionRegionInfo details = PartitionRegionHelper.getPartitionRegionInfo(region);
assertEquals(113, details.getCreatedBucketCount());
assertEquals(0,details.getActualRedundantCopies());
assertEquals(113,details.getLowRedundancyBucketCount());
}
};
//make sure we can tell that the buckets have low redundancy
vm0.invoke(checkLowRedundancy);
// Disconnecting drops the member's buckets entirely between iterations.
SerializableRunnable closePrRegion = new SerializableRunnable("createRegion") {
public void run()
{
disconnectFromDS();
// Cache cache = getCache();
// Region region = cache.getRegion("region1");
// region.localDestroyRegion();
}
};
for(int i =0; i < 50; i++) {
long start = System.nanoTime();
//Create the region in the other VM (should have no effect)
vm1.invoke(createPrRegion);
vm2.invoke(createPrRegion);
//Now simulate a rebalance
vm1.invoke(new SerializableRunnable("simulateRebalance") {
public void run() {
Cache cache = getCache();
ResourceManager manager = cache.getResourceManager();
RebalanceResults results = doRebalance(false, manager);
// assertEquals(113, results.getTotalBucketCreatesCompleted());
}
});
vm1.invoke(closePrRegion);
vm2.invoke(closePrRegion);
long end = System.nanoTime();
System.err.println("Elapsed = " + TimeUnit.NANOSECONDS.toMillis(end - start));
}
}
/** enforce-unique-host behavior with a real rebalance. */
public void testEnforceIP() {
enforceIp(false);
}
/** enforce-unique-host behavior in simulate-only mode. */
public void testEnforceIPSimulation() {
enforceIp(true);
}
/**
 * With enforce-unique-host-name set, two members on the same host may not
 * host redundant copies of the same bucket, so redundancy stays impaired even
 * after adding a second member; the rebalance should only transfer buckets to
 * even out the load, never create redundant copies.
 *
 * @param simulate if true, only simulate the rebalance
 */
public void enforceIp(final boolean simulate) {
// Reconnect every VM with the enforce-unique-host property set.
invokeInEveryVM(new SerializableRunnable() {
public void run() {
Properties props = new Properties();
props.setProperty(DistributionConfig.ENFORCE_UNIQUE_HOST_NAME, "true");
getSystem(props);
}
});
try {
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
SerializableRunnable createPrRegion = new SerializableRunnable("createRegion") {
public void run()
{
Cache cache = getCache();
AttributesFactory attr = new AttributesFactory();
PartitionAttributesFactory paf = new PartitionAttributesFactory();
paf.setRedundantCopies(1);
paf.setRecoveryDelay(-1);
paf.setStartupRecoveryDelay(-1);
PartitionAttributes prAttr = paf.create();
attr.setPartitionAttributes(prAttr);
cache.createRegion("region1", attr.create());
}
};
//Create the region in only 1 VM
vm0.invoke(createPrRegion);
//Create some buckets
vm0.invoke(new SerializableRunnable("createSomeBuckets") {
public void run() {
Cache cache = getCache();
Region region = cache.getRegion("region1");
region.put(Integer.valueOf(1), "A");
region.put(Integer.valueOf(2), "A");
region.put(Integer.valueOf(3), "A");
region.put(Integer.valueOf(4), "A");
region.put(Integer.valueOf(5), "A");
region.put(Integer.valueOf(6), "A");
}
});
SerializableRunnable checkLowRedundancy = new SerializableRunnable("checkLowRedundancy") {
public void run() {
Cache cache = getCache();
Region region = cache.getRegion("region1");
PartitionRegionInfo details = PartitionRegionHelper.getPartitionRegionInfo(region);
assertEquals(6, details.getCreatedBucketCount());
assertEquals(0,details.getActualRedundantCopies());
assertEquals(6,details.getLowRedundancyBucketCount());
}
};
//make sure we can tell that the buckets have low redundancy
vm0.invoke(checkLowRedundancy);
//Create the region in the other VM (should have no effect)
vm1.invoke(createPrRegion);
//Make sure we still have low redundancy
vm0.invoke(checkLowRedundancy);
//Now simulate a rebalance
vm0.invoke(new SerializableRunnable("simulateRebalance") {
public void run() {
Cache cache = getCache();
ResourceManager manager = cache.getResourceManager();
RebalanceResults results = doRebalance(simulate, manager);
// Both members share a host, so no redundant copies may be created.
assertEquals(0, results.getTotalBucketCreatesCompleted());
assertEquals(0, results.getTotalPrimaryTransfersCompleted());
//We actually *will* transfer buckets, because that improves
//the balance
assertEquals(3, results.getTotalBucketTransfersCompleted());
// assertEquals(0, results.getTotalBucketTransferBytes());
Set<PartitionRebalanceInfo> detailSet = results.getPartitionRebalanceDetails();
assertEquals(1, detailSet.size());
PartitionRebalanceInfo details = detailSet.iterator().next();
assertEquals(0, details.getBucketCreatesCompleted());
assertEquals(0, details.getPrimaryTransfersCompleted());
assertEquals(3, details.getBucketTransfersCompleted());
// assertEquals(0, details.getBucketTransferBytes());
if(!simulate) {
verifyStats(manager, results);
}
}
});
//Make sure we still have low redundancy
vm0.invoke(checkLowRedundancy);
vm1.invoke(checkLowRedundancy);
} finally {
// Tear down the specially-configured distributed systems so later tests
// reconnect with default properties.
disconnectFromDS();
invokeInEveryVM(new SerializableRunnable() {
public void run() {
disconnectFromDS();
}
});
}
}
/** redundancy-zone placement with a real rebalance. */
public void testEnforceZone() {
enforceZone(false);
}
/** redundancy-zone placement in simulate-only mode. */
public void testEnforceZoneSimulation() {
enforceZone(true);
}
/**
 * Test that we correctly use the redundancy-zone
 * property to determine where to place redundant copies of a bucket.
 * Two members are placed in zone A and one in zone B; every bucket's
 * redundant copy must land on the zone B member.
 * @param simulate if true, only simulate the rebalance
 */
public void enforceZone(final boolean simulate) {
try {
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
VM vm2 = host.getVM(2);
setRedundancyZone(vm0, "A");
setRedundancyZone(vm1, "A");
final DistributedMember zoneBMember = setRedundancyZone(vm2, "B");
SerializableRunnable createPrRegion = new SerializableRunnable("createRegion") {
public void run()
{
Cache cache = getCache();
AttributesFactory attr = new AttributesFactory();
PartitionAttributesFactory paf = new PartitionAttributesFactory();
paf.setRedundantCopies(1);
paf.setRecoveryDelay(-1);
paf.setStartupRecoveryDelay(-1);
PartitionAttributes prAttr = paf.create();
attr.setPartitionAttributes(prAttr);
cache.createRegion("region1", attr.create());
}
};
//Create the region in only 1 VM
vm0.invoke(createPrRegion);
//Create some buckets
vm0.invoke(new SerializableRunnable("createSomeBuckets") {
public void run() {
Cache cache = getCache();
Region region = cache.getRegion("region1");
region.put(Integer.valueOf(1), "A");
region.put(Integer.valueOf(2), "A");
region.put(Integer.valueOf(3), "A");
region.put(Integer.valueOf(4), "A");
region.put(Integer.valueOf(5), "A");
region.put(Integer.valueOf(6), "A");
}
});
SerializableRunnable checkLowRedundancy = new SerializableRunnable("checkLowRedundancy") {
public void run() {
Cache cache = getCache();
Region region = cache.getRegion("region1");
PartitionRegionInfo details = PartitionRegionHelper.getPartitionRegionInfo(region);
assertEquals(6, details.getCreatedBucketCount());
assertEquals(0,details.getActualRedundantCopies());
assertEquals(6,details.getLowRedundancyBucketCount());
}
};
//make sure we can tell that the buckets have low redundancy
vm0.invoke(checkLowRedundancy);
//Create the region in the other VMs (should have no effect)
vm1.invoke(createPrRegion);
vm2.invoke(createPrRegion);
//Make sure we still have low redundancy
vm0.invoke(checkLowRedundancy);
//Now simulate a rebalance
vm0.invoke(new SerializableRunnable("simulateRebalance") {
public void run() {
Cache cache = getCache();
ResourceManager manager = cache.getResourceManager();
RebalanceResults results = doRebalance(simulate, manager);
//We expect to satisfy redundancy with the zone B member
assertEquals(6, results.getTotalBucketCreatesCompleted());
//2 primaries will go to vm2, leaving vm0 and vm1 with 2 primaries each
assertEquals(2, results.getTotalPrimaryTransfersCompleted());
//We actually *will* transfer 3 buckets to the other member in zone A, because that improves
//the balance
assertEquals(3, results.getTotalBucketTransfersCompleted());
Set<PartitionRebalanceInfo> detailSet = results.getPartitionRebalanceDetails();
assertEquals(1, detailSet.size());
PartitionRebalanceInfo details = detailSet.iterator().next();
assertEquals(6, details.getBucketCreatesCompleted());
assertEquals(2, details.getPrimaryTransfersCompleted());
assertEquals(3, details.getBucketTransfersCompleted());
Set<PartitionMemberInfo> afterDetails = details.getPartitionMemberDetailsAfter();
for(PartitionMemberInfo info : afterDetails) {
// The zone B member must hold a copy of every bucket; the two
// zone A members split the other copy 3/3.
if(info.getDistributedMember().equals(zoneBMember)) {
assertEquals(6, info.getBucketCount());
} else {
assertEquals(3, info.getBucketCount());
}
assertEquals(2, info.getPrimaryCount());
}
// assertEquals(0, details.getBucketTransferBytes());
if(!simulate) {
verifyStats(manager, results);
}
}
});
if(!simulate) {
checkBucketCount(vm0, 3);
checkBucketCount(vm1, 3);
checkBucketCount(vm2, 6);
}
} finally {
// Tear down the zone-configured distributed systems so later tests
// reconnect with default properties.
disconnectFromDS();
invokeInEveryVM(new SerializableRunnable() {
public void run() {
disconnectFromDS();
}
});
}
}
private void checkBucketCount(VM vm0, final int numLocalBuckets) {
vm0.invoke(new SerializableRunnable("checkLowRedundancy") {
public void run() {
Cache cache = getCache();
PartitionedRegion region = (PartitionedRegion) cache.getRegion("region1");
assertEquals(numLocalBuckets, region.getLocalBucketsListTestOnly().size());
}
});
}
/**
 * Connects the given VM to the distributed system with the supplied
 * redundancy-zone setting and returns that VM's distributed member id.
 */
private DistributedMember setRedundancyZone(VM vm, final String zone) {
  SerializableCallable setZone = new SerializableCallable("set redundancy zone") {
    public Object call() {
      Properties props = new Properties();
      props.setProperty(DistributionConfig.REDUNDANCY_ZONE_NAME, zone);
      return getSystem(props).getDistributedMember();
    }
  };
  return (DistributedMember) vm.invoke(setZone);
}
/**
 * Rebalances (or simulates a rebalance of) all regions, with no
 * include/exclude filters applied.
 */
private RebalanceResults doRebalance(final boolean simulate,
ResourceManager manager) {
return doRebalance(simulate, manager, null, null);
}
/**
 * Runs (or simulates) a rebalance through the given manager, waits up to
 * {@link #MAX_WAIT} for completion, and asserts that no rebalance operations
 * remain in flight afterwards.
 *
 * @param simulate if true, call {@code simulate()} instead of {@code start()}
 * @param manager the resource manager to rebalance through
 * @param includes regions to include, or null for all
 * @param excludes regions to exclude, or null for none
 * @return the rebalance results, or null if the wait failed the test
 */
private RebalanceResults doRebalance(final boolean simulate,
    ResourceManager manager, Set<String> includes, Set<String> excludes) {
  RebalanceResults results = null;
  // The simulate and start paths previously duplicated the whole try/catch;
  // the only real difference is which terminal method launches the operation.
  RebalanceOperation operation;
  if (simulate) {
    operation = manager.createRebalanceFactory()
        .includeRegions(includes)
        .excludeRegions(excludes)
        .simulate();
  } else {
    operation = manager.createRebalanceFactory()
        .includeRegions(includes)
        .excludeRegions(excludes)
        .start();
  }
  try {
    results = operation.getResults(MAX_WAIT, TimeUnit.SECONDS);
  } catch (InterruptedException e) {
    // Restore the interrupt flag before failing, per standard practice.
    Thread.currentThread().interrupt();
    fail("Interrupted waiting on rebalance", e);
  } catch (TimeoutException e) {
    fail("Timeout waiting on rebalance", e);
  }
  // Once getResults returns, the manager must report no in-progress operations.
  assertEquals(Collections.emptySet(), manager.getRebalanceOperations());
  return results;
}
/** Redundancy recovery with uneven member capacities, simulate-only. */
public void testRecoverRedundancyBalancingSimulation() {
recoverRedundancyBalancing(true);
}
/** Redundancy recovery with uneven member capacities, real rebalance. */
public void testRecoverRedundancyBalancing() {
recoverRedundancyBalancing(false);
}
/**
 * Creates 12 buckets on one member with localMaxMemory 200, then adds two
 * members with localMaxMemory 100 each and rebalances. Redundant copies
 * should be split 6/6 between the two smaller members (proportional to their
 * capacity), with the big member keeping all 12 original buckets.
 *
 * @param simulate if true, only simulate the rebalance
 */
public void recoverRedundancyBalancing(final boolean simulate) {
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
VM vm2 = host.getVM(2);
final DistributedMember member1 = createPrRegion(vm0, "region1", 200, null);
vm0.invoke(new SerializableRunnable("createSomeBuckets") {
public void run() {
Cache cache = getCache();
Region region = cache.getRegion("region1");
for(int i = 0; i < 12; i++) {
region.put(Integer.valueOf(i), "A");
}
}
});
SerializableRunnable checkRedundancy= new SerializableRunnable("checkRedundancy") {
public void run() {
Cache cache = getCache();
Region region = cache.getRegion("region1");
PartitionRegionInfo details = PartitionRegionHelper.getPartitionRegionInfo(region);
assertEquals(12, details.getCreatedBucketCount());
assertEquals(0,details.getActualRedundantCopies());
assertEquals(12,details.getLowRedundancyBucketCount());
}
};
vm0.invoke(checkRedundancy);
//Now create the region in 2 more VMs with half the localMaxMemory
createPrRegion(vm1, "region1", 100, null);
createPrRegion(vm2, "region1", 100, null);
vm0.invoke(checkRedundancy);
//Now simulate a rebalance
vm0.invoke(new SerializableRunnable("rebalance") {
public void run() {
Cache cache = getCache();
ResourceManager manager = cache.getResourceManager();
RebalanceResults results = doRebalance(simulate, manager);
assertEquals(12, results.getTotalBucketCreatesCompleted());
assertEquals(6, results.getTotalPrimaryTransfersCompleted());
assertEquals(0, results.getTotalBucketTransferBytes());
assertEquals(0, results.getTotalBucketTransfersCompleted());
Set<PartitionRebalanceInfo> detailSet = results.getPartitionRebalanceDetails();
assertEquals(1, detailSet.size());
PartitionRebalanceInfo details = detailSet.iterator().next();
assertEquals(12, details.getBucketCreatesCompleted());
assertEquals(6, details.getPrimaryTransfersCompleted());
assertEquals(0, details.getBucketTransferBytes());
assertEquals(0, details.getBucketTransfersCompleted());
Set<PartitionMemberInfo> afterDetails = details.getPartitionMemberDetailsAfter();
assertEquals(3, afterDetails.size());
for(PartitionMemberInfo memberDetails: afterDetails) {
//We have 1 member with a size of 200 and two members with size 100
if(memberDetails.getDistributedMember().equals(member1)) {
assertEquals(12, memberDetails.getBucketCount());
assertEquals(6, memberDetails.getPrimaryCount());
} else {
assertEquals(6, memberDetails.getBucketCount());
assertEquals(3, memberDetails.getPrimaryCount());
}
}
if(!simulate) {
verifyStats(manager, results);
}
}
});
if(!simulate) {
SerializableRunnable checkRedundancyFixed = new SerializableRunnable("checkLowRedundancy") {
public void run() {
Cache cache = getCache();
Region region = cache.getRegion("region1");
PartitionRegionInfo details = PartitionRegionHelper.getPartitionRegionInfo(region);
assertEquals(12, details.getCreatedBucketCount());
assertEquals(1,details.getActualRedundantCopies());
assertEquals(0,details.getLowRedundancyBucketCount());
}
};
vm0.invoke(checkRedundancyFixed);
vm1.invoke(checkRedundancyFixed);
vm2.invoke(checkRedundancyFixed);
}
}
/**
 * Creates a redundancy-1 partitioned region named {@code region} in the given
 * VM (optionally colocated with another region) and returns that VM's member
 * id. Automatic redundancy recovery is disabled so only explicit rebalance
 * calls move buckets.
 */
private DistributedMember createPrRegion(VM vm, final String region, final int localMaxMemory, final String colocatedWith) {
  SerializableCallable createRegionTask = new SerializableCallable("createRegion") {
    public Object call() {
      Cache cache = getCache();
      PartitionAttributesFactory paf = new PartitionAttributesFactory();
      paf.setRedundantCopies(1);
      paf.setRecoveryDelay(-1);
      paf.setStartupRecoveryDelay(-1);
      paf.setLocalMaxMemory(localMaxMemory);
      if (colocatedWith != null) {
        paf.setColocatedWith(colocatedWith);
      }
      AttributesFactory attr = new AttributesFactory();
      attr.setPartitionAttributes(paf.create());
      cache.createRegion(region, attr.create());
      return cache.getDistributedSystem().getDistributedMember();
    }
  };
  return (DistributedMember) vm.invoke(createRegionTask);
}
/** Redundancy recovery across colocated regions, simulate-only. */
public void testRecoverRedundancyColocatedRegionsSimulation() {
recoverRedundancyColocatedRegions(true);
}
/** Redundancy recovery across colocated regions, real rebalance. */
public void testRecoverRedundancyColocatedRegions() {
recoverRedundancyColocatedRegions(false);
}
/**
 * Like {@link #recoverRedundancyBalancing(boolean)} but with two colocated
 * regions. The rebalance must recover redundancy for both regions and keep
 * their bucket and primary placement identical (the colocation contract).
 *
 * @param simulate if true, only simulate the rebalance
 */
public void recoverRedundancyColocatedRegions(final boolean simulate) {
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
VM vm2 = host.getVM(2);
final DistributedMember member1 = createPrRegion(vm0, "region1", 200, null);
createPrRegion(vm0, "region2", 200, "region1");
//Create some buckets.
vm0.invoke(new SerializableRunnable("createSomeBuckets") {
public void run() {
Cache cache = getCache();
Region region = cache.getRegion("region1");
Region region2 = cache.getRegion("region2");
for(int i =0; i< 12; i++) {
region.put(Integer.valueOf(i), "A");
region2.put(Integer.valueOf(i), "A");
}
}
});
//check to make sure our redundancy is impaired
SerializableRunnable checkLowRedundancy = new SerializableRunnable("checkLowRedundancy") {
public void run() {
Cache cache = getCache();
Region region = cache.getRegion("region1");
PartitionRegionInfo details = PartitionRegionHelper.getPartitionRegionInfo(region);
assertEquals(12, details.getCreatedBucketCount());
assertEquals(0, details.getActualRedundantCopies());
assertEquals(12 ,details.getLowRedundancyBucketCount());
region = cache.getRegion("region2");
details = PartitionRegionHelper.getPartitionRegionInfo(region);
assertEquals(12, details.getCreatedBucketCount());
assertEquals(0, details.getActualRedundantCopies());
assertEquals(12,details.getLowRedundancyBucketCount());
}
};
vm0.invoke(checkLowRedundancy);
//Now create the region in 2 more vms, each which
//has local max memory of 1/2 that of the original VM.
createPrRegion(vm1, "region1", 100, null);
createPrRegion(vm2, "region1", 100, null);
createPrRegion(vm1, "region2", 100, "region1");
createPrRegion(vm2, "region2", 100, "region1");
vm0.invoke(checkLowRedundancy);
//Now simulate a rebalance
vm0.invoke(new SerializableRunnable("rebalance") {
public void run() {
Cache cache = getCache();
ResourceManager manager = cache.getResourceManager();
RebalanceResults results = doRebalance(simulate, manager);
// Totals are doubled relative to the single-region test: 12 creates
// and 6 primary transfers per region, two regions.
assertEquals(24, results.getTotalBucketCreatesCompleted());
assertEquals(12, results.getTotalPrimaryTransfersCompleted());
assertEquals(0, results.getTotalBucketTransferBytes());
assertEquals(0, results.getTotalBucketTransfersCompleted());
Set<PartitionRebalanceInfo> detailSet = results.getPartitionRebalanceDetails();
assertEquals(2, detailSet.size());
for(PartitionRebalanceInfo details : detailSet) {
assertEquals(12, details.getBucketCreatesCompleted());
assertEquals(6, details.getPrimaryTransfersCompleted());
assertEquals(0, details.getBucketTransferBytes());
assertEquals(0, details.getBucketTransfersCompleted());
Set<PartitionMemberInfo> afterDetails = details.getPartitionMemberDetailsAfter();
assertEquals(3, afterDetails.size());
for(PartitionMemberInfo memberDetails: afterDetails) {
if(memberDetails.getDistributedMember().equals(member1)) {
assertEquals(12, memberDetails.getBucketCount());
assertEquals(6, memberDetails.getPrimaryCount());
} else {
assertEquals(6, memberDetails.getBucketCount());
assertEquals(3, memberDetails.getPrimaryCount());
}
}
if(!simulate) {
verifyStats(manager, results);
}
}
}
});
if(!simulate) {
SerializableRunnable checkRedundancyFixed = new SerializableRunnable("checkLowRedundancy") {
public void run() {
Cache cache = getCache();
PartitionedRegion region1 = (PartitionedRegion) cache.getRegion("region1");
PartitionedRegion region2 = (PartitionedRegion) cache.getRegion("region2");
PartitionRegionInfo details = PartitionRegionHelper.getPartitionRegionInfo(cache.getRegion("region1"));
assertEquals(12, details.getCreatedBucketCount());
assertEquals(1,details.getActualRedundantCopies());
assertEquals(0,details.getLowRedundancyBucketCount());
details = PartitionRegionHelper.getPartitionRegionInfo(cache.getRegion("region2"));
assertEquals(12, details.getCreatedBucketCount());
assertEquals(1,details.getActualRedundantCopies());
assertEquals(0,details.getLowRedundancyBucketCount());
// Colocated regions must end up with identical bucket placement.
assertEquals(region1.getLocalPrimaryBucketsListTestOnly(), region2.getLocalPrimaryBucketsListTestOnly());
assertEquals(region1.getLocalBucketsListTestOnly(), region2.getLocalBucketsListTestOnly());
}
};
vm0.invoke(checkRedundancyFixed);
vm1.invoke(checkRedundancyFixed);
vm2.invoke(checkRedundancyFixed);
}
}
/** Redundancy recovery with a colocated parallel async event queue, simulate-only. */
public void testRecoverRedundancyParallelAsyncEventQueueSimulation() {
recoverRedundancyParallelAsyncEventQueue(true);
}
/** Redundancy recovery with a colocated parallel async event queue, real rebalance. */
public void testRecoverRedundancyParallelAsyncEventQueue() {
recoverRedundancyParallelAsyncEventQueue(false);
}
/**
 * Like the colocated-regions test, but the second "region" is the internal
 * shadow PR backing a parallel async event queue. The rebalance must recover
 * redundancy for both the user region and the queue region and keep their
 * bucket placement identical.
 *
 * @param simulate if true, only simulate the rebalance
 */
public void recoverRedundancyParallelAsyncEventQueue(final boolean simulate) {
  Host host = Host.getHost(0);
  VM vm0 = host.getVM(0);
  VM vm1 = host.getVM(1);
  VM vm2 = host.getVM(2);
  final DistributedMember member1 = createPRRegionWithAsyncQueue(vm0, 200);
  //Create some buckets. Put enough data to cause the queue to overflow (more than 1 MB)
  vm0.invoke(new SerializableRunnable("createSomeBuckets") {
    public void run() {
      Cache cache = getCache();
      Region region = cache.getRegion("region1");
      // (removed an unused "region2" lookup -- no region by that name exists
      // in this test; the second PR is the queue's internal shadow region)
      for (int i = 0; i < 12; i++) {
        // NOTE(review): the byte[] is the callback argument of the 3-arg
        // put; confirm it is what drives the queue past its 1 MB memory cap.
        region.put(Integer.valueOf(i), "A", new byte[1024 * 512]);
      }
    }
  });
  //check to make sure our redundancy is impaired
  SerializableRunnable checkLowRedundancy = new SerializableRunnable("checkLowRedundancy") {
    public void run() {
      Cache cache = getCache();
      Region region = cache.getRegion("region1");
      PartitionRegionInfo details = PartitionRegionHelper.getPartitionRegionInfo(region);
      assertEquals(12, details.getCreatedBucketCount());
      assertEquals(0, details.getActualRedundantCopies());
      assertEquals(12, details.getLowRedundancyBucketCount());
      //Get the async event queue region (It's a colocated region)
      PartitionedRegion region2 = ColocationHelper.getColocatedChildRegions((PartitionedRegion) region).get(0);
      details = PartitionRegionHelper.getPartitionRegionInfo(region2);
      assertEquals(12, details.getCreatedBucketCount());
      assertEquals(0, details.getActualRedundantCopies());
      assertEquals(12, details.getLowRedundancyBucketCount());
    }
  };
  vm0.invoke(checkLowRedundancy);
  //Create the region on two more members, each with 1/2 of the memory
  createPRRegionWithAsyncQueue(vm1, 100);
  createPRRegionWithAsyncQueue(vm2, 100);
  vm0.invoke(checkLowRedundancy);
  //Now simulate a rebalance
  vm0.invoke(new SerializableRunnable("rebalance") {
    public void run() {
      Cache cache = getCache();
      ResourceManager manager = cache.getResourceManager();
      RebalanceResults results = doRebalance(simulate, manager);
      // Totals cover both the user region and the queue's shadow region.
      assertEquals(24, results.getTotalBucketCreatesCompleted());
      assertEquals(12, results.getTotalPrimaryTransfersCompleted());
      assertEquals(0, results.getTotalBucketTransferBytes());
      assertEquals(0, results.getTotalBucketTransfersCompleted());
      Set<PartitionRebalanceInfo> detailSet = results.getPartitionRebalanceDetails();
      assertEquals(2, detailSet.size());
      for (PartitionRebalanceInfo details : detailSet) {
        assertEquals(12, details.getBucketCreatesCompleted());
        assertEquals(6, details.getPrimaryTransfersCompleted());
        assertEquals(0, details.getBucketTransferBytes());
        assertEquals(0, details.getBucketTransfersCompleted());
        Set<PartitionMemberInfo> afterDetails = details.getPartitionMemberDetailsAfter();
        assertEquals(3, afterDetails.size());
        for (PartitionMemberInfo memberDetails : afterDetails) {
          if (memberDetails.getDistributedMember().equals(member1)) {
            assertEquals(12, memberDetails.getBucketCount());
            assertEquals(6, memberDetails.getPrimaryCount());
          } else {
            assertEquals(6, memberDetails.getBucketCount());
            assertEquals(3, memberDetails.getPrimaryCount());
          }
        }
        if (!simulate) {
          verifyStats(manager, results);
        }
      }
    }
  });
  if (!simulate) {
    SerializableRunnable checkRedundancyFixed = new SerializableRunnable("checkLowRedundancy") {
      public void run() {
        Cache cache = getCache();
        PartitionedRegion region1 = (PartitionedRegion) cache.getRegion("region1");
        //Get the async event queue region (It's a colocated region)
        PartitionedRegion region2 = ColocationHelper.getColocatedChildRegions(region1).get(0);
        PartitionRegionInfo details = PartitionRegionHelper.getPartitionRegionInfo(cache.getRegion("region1"));
        assertEquals(12, details.getCreatedBucketCount());
        assertEquals(1, details.getActualRedundantCopies());
        assertEquals(0, details.getLowRedundancyBucketCount());
        details = PartitionRegionHelper.getPartitionRegionInfo(region2);
        assertEquals(12, details.getCreatedBucketCount());
        assertEquals(1, details.getActualRedundantCopies());
        assertEquals(0, details.getLowRedundancyBucketCount());
        // The queue region must be colocated with the user region.
        assertEquals(region1.getLocalPrimaryBucketsListTestOnly(), region2.getLocalPrimaryBucketsListTestOnly());
        assertEquals(region1.getLocalBucketsListTestOnly(), region2.getLocalBucketsListTestOnly());
      }
    };
    vm0.invoke(checkRedundancyFixed);
    vm1.invoke(checkRedundancyFixed);
    vm2.invoke(checkRedundancyFixed);
  }
}
/**
 * Creates "region1" (redundancy 1, automatic recovery disabled) in the given
 * VM with an attached parallel async event queue whose listener never
 * dispatches, so queued events accumulate, and returns the VM's member id.
 *
 * @param vm0 the member to create the region in
 * @param localMaxMemory local max memory for the PR, in megabytes
 */
private DistributedMember createPRRegionWithAsyncQueue(VM vm0, final int localMaxMemory) {
SerializableCallable createPrRegion = new SerializableCallable("createRegion") {
public Object call()
{
Cache cache = getCache();
//Create an async event listener that doesn't dispatch anything
// Queue memory is capped at 1 MB so the queue overflows to disk quickly.
cache.createAsyncEventQueueFactory().setMaximumQueueMemory(1).setParallel(true).create("parallelQueue", new AsyncEventListener() {
@Override
public void close() {
}
@Override
public boolean processEvents(List<AsyncEvent> events) {
// Returning false leaves the events in the queue; the short sleep
// keeps the dispatcher from spinning.
try {
Thread.sleep(100);
} catch (InterruptedException e) {
return false;
}
return false;
}
});
AttributesFactory attr = new AttributesFactory();
attr.addAsyncEventQueueId("parallelQueue");
PartitionAttributesFactory paf = new PartitionAttributesFactory();
paf.setRedundantCopies(1);
paf.setRecoveryDelay(-1);
paf.setStartupRecoveryDelay(-1);
paf.setLocalMaxMemory(localMaxMemory);
PartitionAttributes prAttr = paf.create();
attr.setPartitionAttributes(prAttr);
cache.createRegion("region1", attr.create());
return cache.getDistributedSystem().getDistributedMember();
}
};
final DistributedMember member1 = (DistributedMember) vm0.invoke(createPrRegion);
return member1;
}
/**
 * Test that an in-progress rebalance can be cancelled: cancel() succeeds,
 * isCancelled()/isDone() report correctly, getResults() throws
 * CancellationException, the operation is removed from the manager's set of
 * operations, and the region keeps the same low redundancy it had before.
 */
public void testCancelOperation() {
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
SerializableRunnable createPrRegion = new SerializableRunnable("createRegion") {
public void run()
{
Cache cache = getCache();
AttributesFactory attr = new AttributesFactory();
PartitionAttributesFactory paf = new PartitionAttributesFactory();
paf.setRedundantCopies(1);
paf.setRecoveryDelay(-1);
paf.setStartupRecoveryDelay(-1);
PartitionAttributes prAttr = paf.create();
attr.setPartitionAttributes(prAttr);
cache.createRegion("region1", attr.create());
}
};
//Create the region in only 1 VM
vm0.invoke(createPrRegion);
//Create some buckets
vm0.invoke(new SerializableRunnable("createSomeBuckets") {
public void run() {
Cache cache = getCache();
Region region = cache.getRegion("region1");
region.put(Integer.valueOf(1), "A");
}
});
SerializableRunnable checkLowRedundancy = new SerializableRunnable("checkLowRedundancy") {
public void run() {
Cache cache = getCache();
Region region = cache.getRegion("region1");
PartitionRegionInfo details = PartitionRegionHelper.getPartitionRegionInfo(region);
assertEquals(1, details.getCreatedBucketCount());
assertEquals(0,details.getActualRedundantCopies());
assertEquals(1,details.getLowRedundancyBucketCount());
}
};
//make sure we can tell that the buckets have low redundancy
vm0.invoke(checkLowRedundancy);
//Create the region in the other VM (should have no effect)
vm1.invoke(createPrRegion);
//Make sure we still have low redundancy
vm0.invoke(checkLowRedundancy);
//Now do a rebalance, but cancel it in the middle
vm0.invoke(new SerializableCallable("D rebalance") {
public Object call() throws Exception {
GemFireCacheImpl cache = (GemFireCacheImpl) getCache();
InternalResourceManager manager = cache.getResourceManager();
final CountDownLatch rebalancingCancelled = new CountDownLatch(1);
final CountDownLatch rebalancingFinished = new CountDownLatch(1);
//The observer blocks the rebalance as soon as it starts, until this
//test counts down rebalancingCancelled (after calling cancel()).
InternalResourceManager.setResourceObserver(new ResourceObserverAdapter() {
@Override
public void rebalancingOrRecoveryStarted(Region region) {
try {
rebalancingCancelled.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
@Override
public void rebalancingOrRecoveryFinished(Region region) {
rebalancingFinished.countDown();
}
});
RebalanceOperation op = manager.createRebalanceFactory().start();
assertFalse(op.isCancelled());
assertFalse(op.isDone());
assertEquals(Collections.singleton(op), manager.getRebalanceOperations());
//The rebalance is blocked by the observer, so getResults must time out.
try {
op.getResults(5, TimeUnit.SECONDS);
fail("Should have received a timeout exception");
} catch(TimeoutException expected) {
}
assertTrue(op.cancel());
//Unblock the observer; the rebalance should now terminate as cancelled.
rebalancingCancelled.countDown();
assertTrue(op.isCancelled());
assertTrue(op.isDone());
rebalancingFinished.await();
try {
op.getResults(60, TimeUnit.SECONDS);
fail("Should have received a cancellation exception");
} catch(CancellationException expected) {
}
assertEquals(Collections.emptySet(), manager.getRebalanceOperations());
return null;
}
});
//We should still have low redundancy, because the rebalancing was cancelled
vm0.invoke(checkLowRedundancy);
}
/**
 * Test that the rebalancing operation picks up on
 * a concurrent membership change. A third member is started while the first
 * bucket is being moved; the rebalance should re-plan to include it, and the
 * stats should record exactly one membership change.
 */
public void testMembershipChange() {
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
final VM vm2 = host.getVM(2);
final SerializableRunnable createPrRegion = new SerializableRunnable("createRegion") {
public void run()
{
Cache cache = getCache();
AttributesFactory attr = new AttributesFactory();
PartitionAttributesFactory paf = new PartitionAttributesFactory();
paf.setRedundantCopies(0);
paf.setRecoveryDelay(-1);
paf.setStartupRecoveryDelay(-1);
PartitionAttributes prAttr = paf.create();
attr.setPartitionAttributes(prAttr);
cache.createRegion("region1", attr.create());
}
};
//Create the region in only 1 VM
vm0.invoke(createPrRegion);
//Create some buckets
vm0.invoke(new SerializableRunnable("createSomeBuckets") {
public void run() {
Cache cache = getCache();
Region region = cache.getRegion("region1");
region.put(Integer.valueOf(1), "A");
region.put(Integer.valueOf(2), "A");
region.put(Integer.valueOf(3), "A");
region.put(Integer.valueOf(4), "A");
region.put(Integer.valueOf(5), "A");
region.put(Integer.valueOf(6), "A");
}
});
//Create the region in the other VM (should have no effect)
vm1.invoke(createPrRegion);
//Now do a rebalance, but start another member in the middle
vm0.invoke(new SerializableCallable("D rebalance") {
public Object call() throws Exception {
GemFireCacheImpl cache = (GemFireCacheImpl) getCache();
InternalResourceManager manager = cache.getResourceManager();
//NOTE(review): these two latches are never used in this test —
//candidates for removal.
final CountDownLatch rebalancingStarted = new CountDownLatch(1);
final CountDownLatch memberAdded = new CountDownLatch(1);
InternalResourceManager.setResourceObserver(new ResourceObserverAdapter() {
boolean firstBucket = true;
@Override
public void movingBucket(Region region,
int bucketId,
DistributedMember source,
DistributedMember target) {
//When the first bucket moves, synchronously create the region in a
//third VM, forcing a membership change mid-rebalance.
if(firstBucket) {
firstBucket = false;
vm2.invoke(createPrRegion);
}
}
});
RebalanceResults results = doRebalance(false, manager);
assertEquals(0, results.getTotalBucketCreatesCompleted());
assertEquals(0, results.getTotalPrimaryTransfersCompleted());
assertEquals(4, results.getTotalBucketTransfersCompleted());
assertTrue(0 < results.getTotalBucketTransferBytes());
Set<PartitionRebalanceInfo> detailSet = results.getPartitionRebalanceDetails();
assertEquals(1, detailSet.size());
PartitionRebalanceInfo details = detailSet.iterator().next();
assertEquals(0, details.getBucketCreatesCompleted());
assertEquals(0, details.getPrimaryTransfersCompleted());
assertTrue(0 < details.getBucketTransferBytes());
assertEquals(4, details.getBucketTransfersCompleted());
Set<PartitionMemberInfo> beforeDetails = details.getPartitionMemberDetailsBefore();
//there should have only been 2 members when the rebalancing started.
assertEquals(2, beforeDetails.size());
//if it was done, there should now be 3 members.
Set<PartitionMemberInfo> afterDetails = details.getPartitionMemberDetailsAfter();
assertEquals(3, afterDetails.size());
//6 buckets over 3 members: each ends with 2 buckets and 2 primaries.
for(PartitionMemberInfo memberDetails: afterDetails) {
assertEquals(2, memberDetails.getBucketCount());
assertEquals(2, memberDetails.getPrimaryCount());
}
verifyStats(manager, results);
//The rebalance should have observed exactly one membership change.
InternalResourceManager mgr = (InternalResourceManager) manager;
ResourceManagerStats stats = mgr.getStats();
assertEquals(1, stats.getRebalanceMembershipChanges());
return null;
}
});
}
//Simulation only: verifies the planned moves without transferring data.
public void testMoveBucketsNoRedundancySimulation() {
moveBucketsNoRedundancy(true);
}
//Actually rebalances, moving buckets between the two members.
public void testMoveBucketsNoRedundancy() {
moveBucketsNoRedundancy(false);
}
/**
 * Check to make sure that we balance
 * buckets between two hosts with no redundancy.
 * Also verifies (bug 40228) that moving buckets does not close the region's
 * cache loader, while closing the region does.
 * @param simulate if true only simulate the rebalance
 */
public void moveBucketsNoRedundancy(final boolean simulate) {
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
SerializableRunnable createPrRegion = new SerializableRunnable("createRegion") {
public void run()
{
Cache cache = getCache();
AttributesFactory attr = new AttributesFactory();
PartitionAttributesFactory paf = new PartitionAttributesFactory();
paf.setRedundantCopies(0);
paf.setRecoveryDelay(-1);
paf.setStartupRecoveryDelay(-1);
PartitionAttributes prAttr = paf.create();
attr.setPartitionAttributes(prAttr);
attr.setCacheLoader(new Bug40228Loader());
cache.createRegion("region1", attr.create());
}
};
//Create the region in only 1 VM
vm0.invoke(createPrRegion);
//Create some buckets
vm0.invoke(new SerializableRunnable("createSomeBuckets") {
public void run() {
Cache cache = getCache();
Region region = cache.getRegion("region1");
region.put(Integer.valueOf(1), "A");
region.put(Integer.valueOf(2), "A");
region.put(Integer.valueOf(3), "A");
region.put(Integer.valueOf(4), "A");
region.put(Integer.valueOf(5), "A");
region.put(Integer.valueOf(6), "A");
}
});
//Create the region in the other VM (should have no effect)
vm1.invoke(createPrRegion);
//Now simulate a rebalance
vm0.invoke(new SerializableRunnable("simulateRebalance") {
public void run() {
Cache cache = getCache();
ResourceManager manager = cache.getResourceManager();
RebalanceResults results = doRebalance(simulate, manager);
assertEquals(0, results.getTotalBucketCreatesCompleted());
assertEquals(0, results.getTotalPrimaryTransfersCompleted());
//3 of the 6 buckets should move to the empty member.
assertEquals(3, results.getTotalBucketTransfersCompleted());
assertTrue(0 < results.getTotalBucketTransferBytes());
Set<PartitionRebalanceInfo> detailSet = results.getPartitionRebalanceDetails();
assertEquals(1, detailSet.size());
PartitionRebalanceInfo details = detailSet.iterator().next();
assertEquals(0, details.getBucketCreatesCompleted());
assertEquals(0, details.getPrimaryTransfersCompleted());
assertTrue(0 < details.getBucketTransferBytes());
assertEquals(3, details.getBucketTransfersCompleted());
Set<PartitionMemberInfo> afterDetails = details.getPartitionMemberDetailsAfter();
assertEquals(2, afterDetails.size());
for(PartitionMemberInfo memberDetails: afterDetails) {
assertEquals(3, memberDetails.getBucketCount());
assertEquals(3, memberDetails.getPrimaryCount());
}
if(!simulate) {
verifyStats(manager, results);
}
}
});
if(!simulate) {
SerializableRunnable checkRedundancyFixed = new SerializableRunnable("checkRedundancyFixed") {
public void run() {
Cache cache = getCache();
Region region = cache.getRegion("region1");
PartitionRegionInfo details = PartitionRegionHelper.getPartitionRegionInfo(region);
assertEquals(6, details.getCreatedBucketCount());
assertEquals(0,details.getActualRedundantCopies());
assertEquals(0,details.getLowRedundancyBucketCount());
assertEquals(2, details.getPartitionMemberInfo().size());
for(PartitionMemberInfo memberDetails: details.getPartitionMemberInfo()) {
assertEquals(3, memberDetails.getBucketCount());
assertEquals(3, memberDetails.getPrimaryCount());
}
//check to make sure that moving buckets didn't close the cache loader
Bug40228Loader loader = (Bug40228Loader) cache.getRegion("region1").getAttributes().getCacheLoader();
assertFalse(loader.isClosed());
}
};
vm0.invoke(checkRedundancyFixed);
vm1.invoke(checkRedundancyFixed);
SerializableRunnable checkBug40228Fixed = new SerializableRunnable("checkBug40228Fixed") {
public void run() {
Cache cache = getCache();
Bug40228Loader loader = (Bug40228Loader) cache.getRegion("region1").getAttributes().getCacheLoader();
assertFalse(loader.isClosed());
//check to make sure that closing the PR closes the cache loader
cache.getRegion("region1").close();
assertTrue(loader.isClosed());
}
};
vm0.invoke(checkBug40228Fixed);
vm1.invoke(checkBug40228Fixed);
}
}
//Simulation only: verifies the planned moves without transferring data.
public void testFilterRegionsSimulation() {
filterRegions(true);
}
//Actually rebalances the included regions.
public void testFilterRegions() {
filterRegions(false);
}
/**
 * Check that a rebalance honors the include/exclude region filters.
 * Four regions are created; only region0 and region1 are expected to be
 * rebalanced. region0 appears in both the include and the exclude set and
 * is still rebalanced, while region2 (in neither set) and region3
 * (excluded) are left untouched.
 * @param simulate if true only simulate the rebalance
 */
public void filterRegions(final boolean simulate) {
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
final int NUM_REGIONS = 4;
final Set<String> INCLUDED = new HashSet<String>();
INCLUDED.add("region0");
INCLUDED.add("region1");
final Set<String> EXCLUDED = new HashSet<String>();
EXCLUDED.add("region0");
EXCLUDED.add("region3");
final HashSet<String> EXPECTED_REBALANCED = new HashSet<String>();
EXPECTED_REBALANCED.add("/region0");
EXPECTED_REBALANCED.add("/region1");
SerializableRunnable createPrRegion = new SerializableRunnable("createRegion") {
public void run()
{
Cache cache = getCache();
AttributesFactory attr = new AttributesFactory();
PartitionAttributesFactory paf = new PartitionAttributesFactory();
paf.setRedundantCopies(0);
paf.setRecoveryDelay(-1);
paf.setStartupRecoveryDelay(-1);
PartitionAttributes prAttr = paf.create();
attr.setPartitionAttributes(prAttr);
for(int i = 0; i < NUM_REGIONS; i++) {
cache.createRegion("region" + i, attr.create());
}
}
};
//Create the region in only 1 VM
vm0.invoke(createPrRegion);
//Create some buckets
vm0.invoke(new SerializableRunnable("createSomeBuckets") {
public void run() {
Cache cache = getCache();
for(int i = 0; i < NUM_REGIONS; i++) {
Region region = cache.getRegion("region" + i);
for(int j = 0; j < 6; j++) {
region.put(Integer.valueOf(j), "A");
}
}
}
});
//Create the region in the other VM (should have no effect)
vm1.invoke(createPrRegion);
//Now simulate a rebalance
vm0.invoke(new SerializableRunnable("simulateRebalance") {
public void run() {
Cache cache = getCache();
ResourceManager manager = cache.getResourceManager();
RebalanceResults results = doRebalance(simulate, manager, INCLUDED, EXCLUDED);
Set<PartitionRebalanceInfo> detailSet = results.getPartitionRebalanceDetails();
// assertEquals(3, detailSet.size());
Set<String> names = new HashSet<String>();
for(PartitionRebalanceInfo details: detailSet) {
assertEquals(0, details.getBucketCreatesCompleted());
assertEquals(0, details.getPrimaryTransfersCompleted());
assertTrue(0 < details.getBucketTransferBytes());
assertEquals(3, details.getBucketTransfersCompleted());
names.add(details.getRegionPath());
Set<PartitionMemberInfo> afterDetails = details.getPartitionMemberDetailsAfter();
assertEquals(2, afterDetails.size());
for(PartitionMemberInfo memberDetails: afterDetails) {
assertEquals(3, memberDetails.getBucketCount());
assertEquals(3, memberDetails.getPrimaryCount());
}
}
//Only the expected regions were touched by the rebalance.
assertEquals(EXPECTED_REBALANCED, names);
assertEquals(0, results.getTotalBucketCreatesCompleted());
assertEquals(0, results.getTotalPrimaryTransfersCompleted());
assertEquals(6, results.getTotalBucketTransfersCompleted());
assertTrue(0 < results.getTotalBucketTransferBytes());
if(!simulate) {
verifyStats(manager, results);
}
}
});
if(!simulate) {
SerializableRunnable checkRedundancyFixed = new SerializableRunnable("checkRedundancyFixed") {
public void run() {
Cache cache = getCache();
for(String name: EXPECTED_REBALANCED) {
Region region = cache.getRegion(name);
PartitionRegionInfo details = PartitionRegionHelper.getPartitionRegionInfo(region);
assertEquals(6, details.getCreatedBucketCount());
assertEquals(0,details.getActualRedundantCopies());
assertEquals(0,details.getLowRedundancyBucketCount());
assertEquals(2, details.getPartitionMemberInfo().size());
for(PartitionMemberInfo memberDetails: details.getPartitionMemberInfo()) {
assertEquals(3, memberDetails.getBucketCount());
assertEquals(3, memberDetails.getPrimaryCount());
}
}
//region2 was not in the include set, so it must still be unbalanced:
//all 6 buckets/primaries on one member, none on the other.
Region region = cache.getRegion("region2");
PartitionRegionInfo details = PartitionRegionHelper.getPartitionRegionInfo(region);
assertEquals(6, details.getCreatedBucketCount());
assertEquals(0,details.getActualRedundantCopies());
assertEquals(0,details.getLowRedundancyBucketCount());
assertEquals(2, details.getPartitionMemberInfo().size());
for(PartitionMemberInfo memberDetails: details.getPartitionMemberInfo()) {
int bucketCount = memberDetails.getBucketCount();
int primaryCount = memberDetails.getPrimaryCount();
assertTrue(
"Wrong number of buckets on non rebalanced region buckets="
+ bucketCount + " primarys=" + primaryCount,
bucketCount == 6 && primaryCount == 6
|| bucketCount == 0 && primaryCount == 0);
}
}
};
vm0.invoke(checkRedundancyFixed);
vm1.invoke(checkRedundancyFixed);
}
}
//Simulation only: verifies the planned moves without transferring data.
public void testMoveBucketsWithRedundancySimulation() {
moveBucketsWithRedundancy(true);
}
//Actually rebalances, moving buckets among the three members.
public void testMoveBucketsWithRedundancy() {
moveBucketsWithRedundancy(false);
}
/**
 * Test to make sure we balance buckets between three hosts with redundancy.
 * Two members host 12 buckets (redundancy 1); after a third member joins,
 * a rebalance should leave each member with 8 buckets and 4 primaries,
 * without changing the total data size.
 * @param simulate if true only simulate the rebalance, don't move data
 */
public void moveBucketsWithRedundancy(final boolean simulate) {
  Host host = Host.getHost(0);
  VM vm0 = host.getVM(0);
  VM vm1 = host.getVM(1);
  VM vm2 = host.getVM(2);
  SerializableRunnable createPrRegion = new SerializableRunnable("createRegion") {
    public void run()
    {
      Cache cache = getCache();
      AttributesFactory attr = new AttributesFactory();
      PartitionAttributesFactory paf = new PartitionAttributesFactory();
      paf.setRedundantCopies(1);
      paf.setRecoveryDelay(-1);
      paf.setStartupRecoveryDelay(-1);
      PartitionAttributes prAttr = paf.create();
      attr.setPartitionAttributes(prAttr);
      cache.createRegion("region1", attr.create());
    }
  };
  //Create the region in two VMs
  vm0.invoke(createPrRegion);
  vm1.invoke(createPrRegion);
  //Create some buckets
  vm0.invoke(new SerializableRunnable("createSomeBuckets") {
    public void run() {
      Cache cache = getCache();
      Region region = cache.getRegion("region1");
      for(int i =0; i < 12; i++) {
        region.put(Integer.valueOf(i), "A");
      }
    }
  });
  //Create the region in one more VM.
  vm2.invoke(createPrRegion);
  //Now simulate a rebalance
  final Long totalSize = (Long) vm0.invoke(new SerializableCallable("simulateRebalance") {
    public Object call() {
      Cache cache = getCache();
      ResourceManager manager = cache.getResourceManager();
      RebalanceResults results = doRebalance(simulate, manager);
      assertEquals(0, results.getTotalBucketCreatesCompleted());
      //We don't know how many primaries will move, it depends on
      //if the move bucket code moves the primary or a redundant bucket
      //assertEquals(0, results.getTotalPrimaryTransfersCompleted());
      assertEquals(8, results.getTotalBucketTransfersCompleted());
      assertTrue(0 < results.getTotalBucketTransferBytes());
      Set<PartitionRebalanceInfo> detailSet = results.getPartitionRebalanceDetails();
      assertEquals(1, detailSet.size());
      PartitionRebalanceInfo details = detailSet.iterator().next();
      assertEquals(0, details.getBucketCreatesCompleted());
      assertTrue(0 < details.getBucketTransferBytes());
      assertEquals(8, details.getBucketTransfersCompleted());
      //Compute the total size before the rebalance. BUGFIX: this previously
      //called getPartitionMemberDetailsAfter(), which made the
      //totalSize == afterSize assertion below compare a value with itself.
      long totalSize = 0;
      Set<PartitionMemberInfo> beforeDetails = details.getPartitionMemberDetailsBefore();
      for(PartitionMemberInfo memberDetails: beforeDetails) {
        totalSize += memberDetails.getSize();
      }
      long afterSize = 0;
      Set<PartitionMemberInfo> afterDetails = details.getPartitionMemberDetailsAfter();
      assertEquals(3, afterDetails.size());
      for(PartitionMemberInfo memberDetails: afterDetails) {
        assertEquals(8, memberDetails.getBucketCount());
        assertEquals(4, memberDetails.getPrimaryCount());
        afterSize += memberDetails.getSize();
      }
      //Rebalancing must not change the total size of the data.
      assertEquals(totalSize, afterSize);
      if(!simulate) {
        verifyStats(manager, results);
      }
      return Long.valueOf(totalSize);
    }
  });
  if(!simulate) {
    SerializableRunnable checkBalance = new SerializableRunnable("checkBalance") {
      public void run() {
        Cache cache = getCache();
        Region region = cache.getRegion("region1");
        PartitionRegionInfo details = PartitionRegionHelper.getPartitionRegionInfo(region);
        assertEquals(12, details.getCreatedBucketCount());
        assertEquals(1,details.getActualRedundantCopies());
        assertEquals(0,details.getLowRedundancyBucketCount());
        getLogWriter().info("details=" + details.getPartitionMemberInfo());
        long afterSize = 0;
        for(PartitionMemberInfo memberDetails: details.getPartitionMemberInfo()) {
          assertEquals(8, memberDetails.getBucketCount());
          assertEquals(4, memberDetails.getPrimaryCount());
          afterSize += memberDetails.getSize();
        }
        assertEquals(totalSize.longValue(), afterSize);
      }
    };
    vm0.invoke(checkBalance);
    vm1.invoke(checkBalance);
    vm2.invoke(checkBalance);
  }
}
/**
 * Test to make sure we balance buckets between three hosts with redundancy,
 * when the partitioned region is a subregion of a replicated parent region.
 */
public void testMoveBucketsNestedPR() {
  Host host = Host.getHost(0);
  VM vm0 = host.getVM(0);
  VM vm1 = host.getVM(1);
  VM vm2 = host.getVM(2);
  SerializableRunnable createPrRegion = new SerializableRunnable("createRegion") {
    public void run()
    {
      Region parent;
      Cache cache = getCache();
      {
        //The parent is a plain replicated region.
        AttributesFactory attr = new AttributesFactory();
        attr.setDataPolicy(DataPolicy.REPLICATE);
        parent = cache.createRegion("parent", attr.create());
      }
      AttributesFactory attr = new AttributesFactory();
      PartitionAttributesFactory paf = new PartitionAttributesFactory();
      paf.setRedundantCopies(1);
      paf.setRecoveryDelay(-1);
      paf.setStartupRecoveryDelay(-1);
      PartitionAttributes prAttr = paf.create();
      attr.setPartitionAttributes(prAttr);
      parent.createSubregion("region1", attr.create());
    }
  };
  //Create the region in two VMs
  vm0.invoke(createPrRegion);
  vm1.invoke(createPrRegion);
  //Create some buckets
  vm0.invoke(new SerializableRunnable("createSomeBuckets") {
    public void run() {
      Cache cache = getCache();
      Region region = cache.getRegion("parent/region1");
      for(int i =0; i < 12; i++) {
        region.put(Integer.valueOf(i), "A");
      }
    }
  });
  //Create the region in one more VM.
  vm2.invoke(createPrRegion);
  //Now do a rebalance
  final Long totalSize = (Long) vm0.invoke(new SerializableCallable("simulateRebalance") {
    public Object call() {
      Cache cache = getCache();
      ResourceManager manager = cache.getResourceManager();
      RebalanceResults results = doRebalance(false, manager);
      assertEquals(0, results.getTotalBucketCreatesCompleted());
      //We don't know how many primaries will move, it depends on
      //if the move bucket code moves the primary or a redundant bucket
      //assertEquals(0, results.getTotalPrimaryTransfersCompleted());
      assertEquals(8, results.getTotalBucketTransfersCompleted());
      assertTrue(0 < results.getTotalBucketTransferBytes());
      Set<PartitionRebalanceInfo> detailSet = results.getPartitionRebalanceDetails();
      assertEquals(1, detailSet.size());
      PartitionRebalanceInfo details = detailSet.iterator().next();
      assertEquals(0, details.getBucketCreatesCompleted());
      assertTrue(0 < details.getBucketTransferBytes());
      assertEquals(8, details.getBucketTransfersCompleted());
      //Compute the total size before the rebalance. BUGFIX: this previously
      //called getPartitionMemberDetailsAfter(), which made the
      //totalSize == afterSize assertion below compare a value with itself.
      long totalSize = 0;
      Set<PartitionMemberInfo> beforeDetails = details.getPartitionMemberDetailsBefore();
      for(PartitionMemberInfo memberDetails: beforeDetails) {
        totalSize += memberDetails.getSize();
      }
      long afterSize = 0;
      Set<PartitionMemberInfo> afterDetails = details.getPartitionMemberDetailsAfter();
      assertEquals(3, afterDetails.size());
      for(PartitionMemberInfo memberDetails: afterDetails) {
        assertEquals(8, memberDetails.getBucketCount());
        assertEquals(4, memberDetails.getPrimaryCount());
        afterSize += memberDetails.getSize();
      }
      //Rebalancing must not change the total size of the data.
      assertEquals(totalSize, afterSize);
      verifyStats(manager, results);
      return Long.valueOf(totalSize);
    }
  });
  SerializableRunnable checkBalance = new SerializableRunnable("checkBalance") {
    public void run() {
      Cache cache = getCache();
      Region region = cache.getRegion("parent/region1");
      PartitionRegionInfo details = PartitionRegionHelper.getPartitionRegionInfo(region);
      assertEquals(12, details.getCreatedBucketCount());
      assertEquals(1,details.getActualRedundantCopies());
      assertEquals(0,details.getLowRedundancyBucketCount());
      getLogWriter().info("details=" + details.getPartitionMemberInfo());
      long afterSize = 0;
      for(PartitionMemberInfo memberDetails: details.getPartitionMemberInfo()) {
        assertEquals(8, memberDetails.getBucketCount());
        assertEquals(4, memberDetails.getPrimaryCount());
        afterSize += memberDetails.getSize();
      }
      assertEquals(totalSize.longValue(), afterSize);
    }
  };
  vm0.invoke(checkBalance);
  vm1.invoke(checkBalance);
  vm2.invoke(checkBalance);
}
//Simulation only: verifies the planned moves without transferring data.
public void testMoveBucketsColocatedRegionsSimulation() {
moveBucketsColocatedRegions(true);
}
//Actually rebalances the colocated regions.
public void testMoveBucketsColocatedRegions() {
moveBucketsColocatedRegions(false);
}
/**
 * Test to make sure that we move buckets to balance between
 * three hosts with a redundancy of 1 and two colocated regions.
 * Makes sure that the buckets stay colocated when we move them, and that
 * no buckets move until the colocated child region exists on all hosts.
 * @param simulate if true only simulate the rebalance
 */
public void moveBucketsColocatedRegions(final boolean simulate) {
  Host host = Host.getHost(0);
  VM vm0 = host.getVM(0);
  VM vm1 = host.getVM(1);
  VM vm2 = host.getVM(2);
  createPrRegion(vm0, "region1", 200, null);
  createPrRegion(vm0, "region2", 200, "region1");
  createPrRegion(vm1, "region1", 200, null);
  createPrRegion(vm1, "region2", 200, "region1");
  //Create some buckets.
  vm0.invoke(new SerializableRunnable("createSomeBuckets") {
    public void run() {
      Cache cache = getCache();
      Region region = cache.getRegion("region1");
      Region region2 = cache.getRegion("region2");
      for(int i =0; i< 12; i++) {
        region.put(Integer.valueOf(i), "A");
        region2.put(Integer.valueOf(i), "A");
      }
    }
  });
  //create the leader region, but not the colocated
  //region in one of the VMs.
  createPrRegion(vm2, "region1", 200, null);
  //Simulate a rebalance, and make sure we don't
  //move any buckets yet, because we don't have
  //the colocated region in the new VMs.
  vm0.invoke(new SerializableRunnable("rebalance") {
    public void run() {
      Cache cache = getCache();
      ResourceManager manager = cache.getResourceManager();
      RebalanceResults results = doRebalance(simulate, manager);
      assertEquals(0, results.getTotalBucketCreatesCompleted());
      assertEquals(0, results.getTotalPrimaryTransfersCompleted());
      assertEquals(0, results.getTotalBucketTransferBytes());
      assertEquals(0, results.getTotalBucketTransfersCompleted());
      Set<PartitionRebalanceInfo> detailSet = results.getPartitionRebalanceDetails();
      assertEquals(2, detailSet.size());
      for(PartitionRebalanceInfo details : detailSet) {
        assertEquals(0, details.getBucketCreatesCompleted());
        assertEquals(0, details.getPrimaryTransfersCompleted());
        assertEquals(0, details.getBucketTransferBytes());
        assertEquals(0, details.getBucketTransfersCompleted());
      }
    }
  });
  //now create the colocated region in the last VM.
  createPrRegion(vm2, "region2", 200, "region1");
  //Now that both regions exist on all members, the rebalance should
  //move 8 buckets per region (16 total), keeping them colocated.
  vm0.invoke(new SerializableRunnable("rebalance") {
    public void run() {
      Cache cache = getCache();
      ResourceManager manager = cache.getResourceManager();
      RebalanceResults results = doRebalance(simulate, manager);
      assertEquals(0, results.getTotalBucketCreatesCompleted());
      assertEquals(16, results.getTotalBucketTransfersCompleted());
      assertTrue(0 < results.getTotalBucketTransferBytes());
      Set<PartitionRebalanceInfo> detailSet = results.getPartitionRebalanceDetails();
      assertEquals(2, detailSet.size());
      for(PartitionRebalanceInfo details : detailSet) {
        assertEquals(0, details.getBucketCreatesCompleted());
        assertTrue(0 < details.getBucketTransferBytes());
        assertEquals(8, details.getBucketTransfersCompleted());
        Set<PartitionMemberInfo> afterDetails = details.getPartitionMemberDetailsAfter();
        assertEquals(3, afterDetails.size());
        for(PartitionMemberInfo memberDetails: afterDetails) {
          assertEquals(8, memberDetails.getBucketCount());
          assertEquals(4, memberDetails.getPrimaryCount());
        }
      }
    }
  });
  if(!simulate) {
    SerializableRunnable checkRedundancyFixed = new SerializableRunnable("checkLowRedundancy") {
      public void run() {
        Cache cache = getCache();
        PartitionedRegion region1 = (PartitionedRegion) cache.getRegion("region1");
        PartitionedRegion region2 = (PartitionedRegion) cache.getRegion("region2");
        PartitionRegionInfo details = PartitionRegionHelper.getPartitionRegionInfo(cache.getRegion("region1"));
        assertEquals(12, details.getCreatedBucketCount());
        assertEquals(1,details.getActualRedundantCopies());
        assertEquals(0,details.getLowRedundancyBucketCount());
        details = PartitionRegionHelper.getPartitionRegionInfo(cache.getRegion("region2"));
        assertEquals(12, details.getCreatedBucketCount());
        assertEquals(1,details.getActualRedundantCopies());
        assertEquals(0,details.getLowRedundancyBucketCount());
        //Colocation invariant: both regions host exactly the same buckets
        //and primaries on this member.
        assertEquals(region1.getLocalPrimaryBucketsListTestOnly(), region2.getLocalPrimaryBucketsListTestOnly());
        assertEquals(region1.getLocalBucketsListTestOnly(), region2.getLocalBucketsListTestOnly());
      }
    };
    vm0.invoke(checkRedundancyFixed);
    vm1.invoke(checkRedundancyFixed);
    vm2.invoke(checkRedundancyFixed);
  }
}
/**
 * Test to make sure that moving primaries waits for a put
 * @throws Exception
 */
public void testWaitForPut() throws Exception {
runTestWaitForOperation(new Operation() {
public void execute(Region region, Integer key) {
region.put(key, "B");
}
});
}
/**
 * Test to make sure that moving primaries waits for an invalidate
 * @throws Exception
 */
public void testWaitForInvalidate() throws Exception {
runTestWaitForOperation(new Operation() {
public void execute(Region region, Integer key) {
region.invalidate(key);
}
});
}
/**
 * Test to make sure that moving primaries waits for a destroy
 * @throws Exception
 */
public void testWaitForDestroy() throws Exception {
runTestWaitForOperation(new Operation() {
public void execute(Region region, Integer key) {
region.destroy(key);
}
});
}
/**
 * Test to make sure that moving primaries waits for an in-progress
 * cache loader invocation
 * @throws Exception
 */
public void testWaitForCacheLoader() throws Exception {
runTestWaitForOperation(new Operation() {
public void execute(Region region, Integer key) {
PartitionedRegion r = (PartitionedRegion) region;
//get a key that doesn't exist, but is in the same bucket as the given key
region.get(key + r.getPartitionAttributes().getTotalNumBuckets());
}
});
}
/**
 * Test to ensure that a rebalance waits for in-progress write operations
 * to finish before moving a primary bucket.
 *
 * The local VM creates "region1" with two buckets and blocks an operation
 * on each bucket via a BlockingCacheListener. A second VM then hosts the
 * region and a rebalance is started; it must not complete until the blocked
 * operations are released.
 *
 * @param op the write operation to hold open while the rebalance runs
 * @throws InterruptedException
 * @throws CancellationException
 * @throws TimeoutException
 */
public void runTestWaitForOperation(final Operation op) throws CancellationException, InterruptedException, TimeoutException {
  Host host = Host.getHost(0);
  VM vm1 = host.getVM(1);
  SerializableRunnable createPrRegion = new SerializableRunnable("createRegion") {
    public void run()
    {
      Cache cache = getCache();
      AttributesFactory attr = new AttributesFactory();
      attr.setCacheLoader(new CacheLoader() {
        public Object load(LoaderHelper helper) throws CacheLoaderException {
          return "anobject";
        }
        public void close() {
        }
      });
      PartitionAttributesFactory paf = new PartitionAttributesFactory();
      paf.setRedundantCopies(1);
      paf.setRecoveryDelay(-1);
      paf.setStartupRecoveryDelay(-1);
      PartitionAttributes prAttr = paf.create();
      attr.setPartitionAttributes(prAttr);
      cache.createRegion("region1", attr.create());
    }
  };
  //Create a region in this VM with a cache loader; the loader lets
  //testWaitForCacheLoader trigger a load on a missing key.
  Cache cache = getCache();
  AttributesFactory attr = new AttributesFactory();
  attr.setCacheLoader(new CacheLoader() {
    public Object load(LoaderHelper helper) throws CacheLoaderException {
      return "anobject";
    }
    public void close() {
    }
  });
  PartitionAttributesFactory paf = new PartitionAttributesFactory();
  paf.setRedundantCopies(1);
  paf.setRecoveryDelay(-1);
  paf.setStartupRecoveryDelay(-1);
  PartitionAttributes prAttr = paf.create();
  attr.setPartitionAttributes(prAttr);
  final Region region = cache.createRegion("region1", attr.create());
  //create some buckets
  region.put(Integer.valueOf(1), "A");
  region.put(Integer.valueOf(2), "A");
  //This is a cache *listener* (the previous local name "cacheWriter" was
  //misleading) that blocks the two in-flight operations until release().
  BlockingCacheListener cacheListener = new BlockingCacheListener(2);
  region.getAttributesMutator().addCacheListener(cacheListener);
  //start two threads doing operations, one on each bucket
  //the threads will block on the cache listener. The rebalance operation
  //will try to move one of these buckets, but it shouldn't
  //be able to because of the in progress operation.
  Thread thread1 = new Thread() {
    public void run() {
      op.execute(region, Integer.valueOf(1));
    }
  };
  thread1.start();
  Thread thread2 = new Thread() {
    public void run() {
      op.execute(region, Integer.valueOf(2));
    }
  };
  thread2.start();
  cacheListener.waitForOperationsStarted();
  SerializableRunnable checkLowRedundancy = new SerializableRunnable("checkLowRedundancy") {
    public void run() {
      Cache cache = getCache();
      Region region = cache.getRegion("region1");
      PartitionRegionInfo details = PartitionRegionHelper.getPartitionRegionInfo(region);
      assertEquals(2, details.getCreatedBucketCount());
      assertEquals(0,details.getActualRedundantCopies());
      assertEquals(2,details.getLowRedundancyBucketCount());
    }
  };
  //make sure we can tell that the buckets have low redundancy
  checkLowRedundancy.run();
  //Create the region in the other VM (should have no effect)
  vm1.invoke(createPrRegion);
  //Make sure we still have low redundancy
  checkLowRedundancy.run();
  ResourceManager manager = cache.getResourceManager();
  RebalanceOperation rebalance = manager.createRebalanceFactory().start();
  //The rebalance must not finish while the operations are still blocked.
  try {
    rebalance.getResults(5, TimeUnit.SECONDS);
    fail("Operation should not have completed");
  } catch(TimeoutException expected) {
    //do nothing
  }
  //Let the blocked operations finish; the rebalance can now proceed.
  cacheListener.release();
  getLogWriter().info("starting wait for rebalance. Will wait for " + MAX_WAIT + " seconds");
  RebalanceResults results = rebalance.getResults(MAX_WAIT, TimeUnit.SECONDS);
  //Clean up the operation threads; they complete once released.
  thread1.join();
  thread2.join();
  assertEquals(2, results.getTotalBucketCreatesCompleted());
  assertEquals(1, results.getTotalPrimaryTransfersCompleted());
  assertEquals(0, results.getTotalBucketTransferBytes());
  assertEquals(0, results.getTotalBucketTransfersCompleted());
  Set<PartitionRebalanceInfo> detailSet = results.getPartitionRebalanceDetails();
  assertEquals(1, detailSet.size());
  PartitionRebalanceInfo details = detailSet.iterator().next();
  assertEquals(2, details.getBucketCreatesCompleted());
  assertEquals(1, details.getPrimaryTransfersCompleted());
  assertEquals(0, details.getBucketTransferBytes());
  assertEquals(0, details.getBucketTransfersCompleted());
  Set<PartitionMemberInfo> afterDetails = details.getPartitionMemberDetailsAfter();
  assertEquals(2, afterDetails.size());
  for(PartitionMemberInfo memberDetails: afterDetails) {
    assertEquals(2, memberDetails.getBucketCount());
    assertEquals(1, memberDetails.getPrimaryCount());
  }
  //Previously this redundancy check was nested inside the loop above and
  //ran once per member; run it a single time, after the loop.
  SerializableRunnable checkRedundancyFixed = new SerializableRunnable("checkRedundancyFixed") {
    public void run() {
      Cache cache = getCache();
      Region region = cache.getRegion("region1");
      PartitionRegionInfo details = PartitionRegionHelper.getPartitionRegionInfo(region);
      assertEquals(2, details.getCreatedBucketCount());
      assertEquals(1,details.getActualRedundantCopies());
      assertEquals(0,details.getLowRedundancyBucketCount());
    }
  };
  checkRedundancyFixed.run();
  vm1.invoke(checkRedundancyFixed);
}
/**
 * Simulated rebalance of a persistent PR with an offline member; every
 * member hosts data (no accessor member).
 */
public void testRecoverRedundancyWithOfflinePersistenceSimulation() throws Throwable {
recoverRedundancyWithOfflinePersistence(true, false);
}
/**
 * Real (non-simulated) rebalance of a persistent PR with an offline member;
 * every member hosts data (no accessor member).
 */
public void testRecoverRedundancyWithOfflinePersistence() throws Throwable {
recoverRedundancyWithOfflinePersistence(false, false);
}
/**
 * Simulated rebalance of a persistent PR with an offline member, driven from
 * an accessor member (localMaxMemory == 0).
 */
public void testRecoverRedundancyWithOfflinePersistenceAccessorSimulation() throws Throwable {
recoverRedundancyWithOfflinePersistence(true, true);
}
/**
 * Real (non-simulated) rebalance of a persistent PR with an offline member,
 * driven from an accessor member (localMaxMemory == 0).
 */
public void testRecoverRedundancyWithOfflinePersistenceAccessor() throws Throwable {
recoverRedundancyWithOfflinePersistence(false, true);
}
/**
 * Verifies that an explicit rebalance restores redundancy for a persistent
 * partitioned region when one member (holding persisted bucket copies) is
 * offline, and that after the offline member restarts it does not re-create
 * the buckets that were already recovered elsewhere.
 *
 * Bug fix: the second restart wait previously re-checked {@code async0}
 * instead of {@code async2}, so a failure or hang restarting vm2 went
 * undetected.
 *
 * @param simulate    if true, only simulate the rebalance (no data movement)
 * @param useAccessor if true, drive the rebalance from an accessor member
 *                    (localMaxMemory == 0) rather than a datastore member
 */
public void recoverRedundancyWithOfflinePersistence(final boolean simulate, final boolean useAccessor) throws Throwable {
  Host host = Host.getHost(0);
  VM vm0 = host.getVM(0);
  final VM vm1 = host.getVM(1);
  VM vm2 = host.getVM(2);
  VM vm3 = host.getVM(3);
  SerializableRunnable createPrRegion = new SerializableRunnable("createRegion") {
    public void run() {
      Cache cache = getCache();
      DiskStoreFactory dsf = cache.createDiskStoreFactory();
      // The disk store only needs to exist; its reference is not used.
      dsf.setDiskDirs(getDiskDirs()).create(getUniqueName());
      AttributesFactory attr = new AttributesFactory();
      PartitionAttributesFactory paf = new PartitionAttributesFactory();
      paf.setRedundantCopies(1);
      // Disable automatic redundancy recovery so only an explicit rebalance
      // restores it.
      paf.setRecoveryDelay(-1);
      paf.setStartupRecoveryDelay(-1);
      PartitionAttributes prAttr = paf.create();
      attr.setPartitionAttributes(prAttr);
      attr.setDataPolicy(DataPolicy.PERSISTENT_PARTITION);
      attr.setDiskSynchronous(true);
      attr.setDiskStoreName(getUniqueName());
      cache.createRegion("region1", attr.create());
    }
  };
  // Create the region in only 2 VMs
  vm0.invoke(createPrRegion);
  vm1.invoke(createPrRegion);
  VM rebalanceVM;
  SerializableRunnable createAccessor = new SerializableRunnable("createAccessor") {
    public void run() {
      Cache cache = getCache();
      DiskStoreFactory dsf = cache.createDiskStoreFactory();
      dsf.setDiskDirs(getDiskDirs()).create("ds1");
      AttributesFactory attr = new AttributesFactory();
      PartitionAttributesFactory paf = new PartitionAttributesFactory();
      paf.setRedundantCopies(1);
      paf.setRecoveryDelay(-1);
      paf.setStartupRecoveryDelay(-1);
      // localMaxMemory == 0 makes this member a pure accessor (no buckets).
      paf.setLocalMaxMemory(0);
      PartitionAttributes prAttr = paf.create();
      attr.setPartitionAttributes(prAttr);
      cache.createRegion("region1", attr.create());
    }
  };
  if (useAccessor) {
    // Create an accessor and rebalance from that VM
    vm3.invoke(createAccessor);
    rebalanceVM = vm3;
  } else {
    rebalanceVM = vm0;
  }
  // Create some buckets
  vm0.invoke(new SerializableRunnable("createSomeBuckets") {
    public void run() {
      Cache cache = getCache();
      Region region = cache.getRegion("region1");
      region.put(Integer.valueOf(1), "A");
      region.put(Integer.valueOf(2), "A");
      region.put(Integer.valueOf(3), "A");
      region.put(Integer.valueOf(4), "A");
      region.put(Integer.valueOf(5), "A");
      region.put(Integer.valueOf(6), "A");
    }
  });
  SerializableRunnable closeCache = new SerializableRunnable("close cache") {
    public void run() {
      Cache cache = getCache();
      cache.getRegion("region1").close();
    }
  };
  // Close the cache in vm1, remembering which buckets it hosted so we can
  // compare after it restarts.
  final Set<Integer> vm1Buckets = getBucketList("region1", vm1);
  vm1.invoke(closeCache);
  SerializableRunnable checkLowRedundancyBeforeRebalance = new SerializableRunnable("checkLowRedundancyBeforeRebalance") {
    public void run() {
      Cache cache = getCache();
      Region region = cache.getRegion("region1");
      PartitionRegionInfo details = PartitionRegionHelper.getPartitionRegionInfo(region);
      assertEquals(6, details.getCreatedBucketCount());
      assertEquals(0, details.getActualRedundantCopies());
      assertEquals(6, details.getLowRedundancyBucketCount());
    }
  };
  SerializableRunnable checkLowRedundancyAfterRebalance = new SerializableRunnable("checkLowRedundancyAfterRebalance") {
    public void run() {
      Cache cache = getCache();
      Region region = cache.getRegion("region1");
      PartitionRegionInfo details = PartitionRegionHelper.getPartitionRegionInfo(region);
      assertEquals(6, details.getCreatedBucketCount());
      assertEquals(1, details.getActualRedundantCopies());
      assertEquals(0, details.getLowRedundancyBucketCount());
    }
  };
  // make sure we can tell that the buckets have low redundancy
  vm0.invoke(checkLowRedundancyBeforeRebalance);
  // Now create the cache in another member
  vm2.invoke(createPrRegion);
  // Make sure we still have low redundancy
  vm0.invoke(checkLowRedundancyBeforeRebalance);
  /*
   * Simulates a rebalance if the simulation flag is set.
   * Otherwise, performs a real rebalance.
   *
   * A rebalance will replace offline buckets, so this
   * should restore redundancy
   */
  rebalanceVM.invoke(new SerializableRunnable("simulateRebalance") {
    public void run() {
      Cache cache = getCache();
      ResourceManager manager = cache.getResourceManager();
      RebalanceResults results = doRebalance(simulate, manager);
      assertEquals(6, results.getTotalBucketCreatesCompleted());
      assertEquals(3, results.getTotalPrimaryTransfersCompleted());
      assertEquals(0, results.getTotalBucketTransfersCompleted());
      Set<PartitionRebalanceInfo> detailSet = results.getPartitionRebalanceDetails();
      assertEquals(1, detailSet.size());
      PartitionRebalanceInfo details = detailSet.iterator().next();
      assertEquals(6, details.getBucketCreatesCompleted());
      assertEquals(3, details.getPrimaryTransfersCompleted());
      assertEquals(0, details.getBucketTransfersCompleted());
      Set<PartitionMemberInfo> afterDetails = details.getPartitionMemberDetailsAfter();
      assertEquals(2, afterDetails.size());
      for (PartitionMemberInfo memberDetails : afterDetails) {
        assertEquals(6, memberDetails.getBucketCount());
        assertEquals(3, memberDetails.getPrimaryCount());
      }
      if (!simulate) {
        verifyStats(manager, results);
      }
    }
  });
  Set<Integer> vm0Buckets = getBucketList("region1", vm0);
  Set<Integer> vm2Buckets = getBucketList("region1", vm2);
  // Make sure redundancy is repaired if not simulated
  if (!simulate) {
    vm0.invoke(checkLowRedundancyAfterRebalance);
  } else {
    // Otherwise, we should still have broken redundancy at this point
    vm0.invoke(checkLowRedundancyBeforeRebalance);
  }
  vm2.invoke(closeCache);
  vm0.invoke(closeCache);
  if (useAccessor) {
    vm3.invoke(closeCache);
  }
  // We need to restart both VMs at the same time, because
  // they will wait for each other before allowing operations.
  AsyncInvocation async0 = vm0.invokeAsync(createPrRegion);
  AsyncInvocation async2 = vm2.invokeAsync(createPrRegion);
  async0.getResult(30000);
  // Fixed: this previously waited on async0 again, leaving vm2's restart
  // unchecked.
  async2.getResult(30000);
  if (useAccessor) {
    vm3.invoke(createAccessor);
  }
  // pause for async bucket recovery threads to finish their work. Otherwise
  // the rebalance op may think that the other member doesn't have buckets, then
  // ask it to create them and get a negative reply because it actually does
  // have the buckets, causing the test to fail
  pause(10000);
  // Try to rebalance again. This shouldn't do anything, because
  // we already recovered redundancy earlier.
  // Note that we don't check the primary balance. This rebalance
  // has a race with the redundancy recovery thread, which is also trying
  // to rebalance primaries. So this thread might end up rebalancing primaries,
  // or it might not.
  if (!simulate) {
    rebalanceVM.invoke(new SerializableRunnable("rebalance") {
      public void run() {
        Cache cache = getCache();
        ResourceManager manager = cache.getResourceManager();
        RebalanceResults results = doRebalance(simulate, manager);
        assertEquals(0, results.getTotalBucketCreatesCompleted());
        assertEquals(0, results.getTotalBucketTransfersCompleted());
        Set<PartitionRebalanceInfo> detailSet = results.getPartitionRebalanceDetails();
        assertEquals(1, detailSet.size());
        PartitionRebalanceInfo details = detailSet.iterator().next();
        assertEquals(0, details.getBucketCreatesCompleted());
        assertEquals(0, details.getBucketTransfersCompleted());
        Set<PartitionMemberInfo> afterDetails = details.getPartitionMemberDetailsAfter();
        assertEquals(2, afterDetails.size());
        for (PartitionMemberInfo memberDetails : afterDetails) {
          assertEquals(6, memberDetails.getBucketCount());
          assertEquals(3, memberDetails.getPrimaryCount());
        }
      }
    });
    // Redundancy should be repaired.
    vm0.invoke(checkLowRedundancyAfterRebalance);
  }
  vm1.invoke(createPrRegion);
  // Look at vm0 buckets.
  assertEquals(vm0Buckets, getBucketList("region1", vm0));
  /*
   * Look at vm1 buckets.
   */
  if (!simulate) {
    /*
     * vm1 should have no buckets because offline buckets were recovered
     * when vm0 and vm2 were rebalanced above.
     */
    assertEquals(0, getBucketList("region1", vm1).size());
  } else {
    /*
     * No rebalancing above because the simulation flag is on.
     * Therefore, vm1 will have recovered its buckets.
     */
    assertEquals(vm1Buckets, getBucketList("region1", vm1));
  }
  // look at vm2 buckets
  assertEquals(vm2Buckets, getBucketList("region1", vm2));
}
/**
 * Real (non-simulated) bucket-move rebalance where values have not been
 * faulted in from disk.
 */
public void testMoveBucketsWithUnrecoveredValues() {
moveBucketsWithUnrecoveredValuesRedundancy(false);
}
/**
* Check that buckets are balanced between two hosts for a region with no
* redundancy, even when the entry values have not yet been faulted in from
* disk after recovery (keys only in memory, values still on disk).
*
* @param simulate if true, only simulate the rebalance; otherwise perform it
*/
public void moveBucketsWithUnrecoveredValuesRedundancy(final boolean simulate) {
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
SerializableRunnable createPrRegion = new SerializableRunnable("createRegion") {
public void run()
{
// Recover keys but not values on startup, so a later bucket move must read
// the values straight from disk; restored to "true" in the finally block.
System.setProperty(DiskStoreImpl.RECOVER_VALUE_PROPERTY_NAME, "false");
try {
Cache cache = getCache();
if(cache.findDiskStore("store") == null) {
// Tiny oplogs (1MB) keep values on disk rather than in memory.
cache.createDiskStoreFactory()
.setDiskDirs(getDiskDirs())
.setMaxOplogSize(1)
.create("store");
}
AttributesFactory attr = new AttributesFactory();
PartitionAttributesFactory paf = new PartitionAttributesFactory();
attr.setDiskStoreName("store");
attr.setDataPolicy(DataPolicy.PERSISTENT_PARTITION);
// No redundancy: the rebalance can only move buckets, not create copies.
paf.setRedundantCopies(0);
paf.setRecoveryDelay(-1);
paf.setStartupRecoveryDelay(-1);
PartitionAttributes prAttr = paf.create();
attr.setPartitionAttributes(prAttr);
// Loader used to verify bug 40228: moving a bucket must not close it.
attr.setCacheLoader(new Bug40228Loader());
cache.createRegion("region1", attr.create());
} finally {
System.setProperty(DiskStoreImpl.RECOVER_VALUE_PROPERTY_NAME, "true");
}
}
};
//Create the region in only 1 VM
vm0.invoke(createPrRegion);
//Create some buckets
vm0.invoke(new SerializableRunnable("createSomeBuckets") {
public void run() {
Cache cache = getCache();
Region region = cache.getRegion("region1");
region.put(Integer.valueOf(1), "A");
region.put(Integer.valueOf(2), "A");
region.put(Integer.valueOf(3), "A");
region.put(Integer.valueOf(4), "A");
region.put(Integer.valueOf(5), "A");
region.put(Integer.valueOf(6), "A");
}
});
// Record per-bucket sizes while the values are still in memory, then close
// the cache so the next region creation recovers from disk.
final long[] bucketSizes = (long[]) vm0.invoke(new SerializableCallable("get sizes and close cache") {
public Object call() {
PartitionedRegion region = (PartitionedRegion) getCache().getRegion("region1");
PartitionedRegionDataStore dataStore = region.getDataStore();
long[] bucketSizes = new long[7];
for(int i =1; i <= 6; i++) {
BucketRegion bucket = dataStore.getLocalBucketById(i);
bucketSizes[i] = bucket.getTotalBytes();
// Before the restart every value is in memory, nothing overflowed.
assertEquals(0, bucket.getNumOverflowBytesOnDisk());
assertEquals(0, bucket.getNumOverflowOnDisk());
assertEquals(1, bucket.getNumEntriesInVM());
}
getCache().close();
return bucketSizes;
}
});
//now recover the region
vm0.invoke(createPrRegion);
// After recovery with RECOVER_VALUE disabled, every value should be
// on disk only (counted as overflow), with no entries in memory.
vm0.invoke(new SerializableRunnable("check sizes") {
public void run() {
PartitionedRegion region = (PartitionedRegion) getCache().getRegion("region1");
PartitionedRegionDataStore dataStore = region.getDataStore();
for(int i =1; i <= 6; i++) {
BucketRegion bucket = dataStore.getLocalBucketById(i);
assertEquals(1, bucket.getNumOverflowOnDisk());
assertEquals(0, bucket.getNumEntriesInVM());
//the size recorded on disk is not the same is the in memory size, apparently
assertTrue("Bucket size was " + bucket.getNumOverflowBytesOnDisk(), 1 < bucket.getNumOverflowBytesOnDisk());
assertEquals(bucket.getNumOverflowBytesOnDisk(), bucket.getTotalBytes());
}
}
});
//Create the region in the other VM (should have no effect)
vm1.invoke(createPrRegion);
//Now simulate a rebalance
vm0.invoke(new SerializableRunnable("simulateRebalance") {
public void run() {
Cache cache = getCache();
ResourceManager manager = cache.getResourceManager();
RebalanceResults results = doRebalance(simulate, manager);
// With redundancy 0 the only possible action is moving buckets: half of
// the six buckets should move to the new member, and bytes must have
// been transferred even though values were never faulted into memory.
assertEquals(0, results.getTotalBucketCreatesCompleted());
assertEquals(0, results.getTotalPrimaryTransfersCompleted());
assertEquals(3, results.getTotalBucketTransfersCompleted());
assertTrue("Transfered Bytes = " + results.getTotalBucketTransferBytes(), 0 < results.getTotalBucketTransferBytes());
Set<PartitionRebalanceInfo> detailSet = results.getPartitionRebalanceDetails();
assertEquals(1, detailSet.size());
PartitionRebalanceInfo details = detailSet.iterator().next();
assertEquals(0, details.getBucketCreatesCompleted());
assertEquals(0, details.getPrimaryTransfersCompleted());
assertTrue(0 < details.getBucketTransferBytes());
assertEquals(3, details.getBucketTransfersCompleted());
Set<PartitionMemberInfo> afterDetails = details.getPartitionMemberDetailsAfter();
assertEquals(2, afterDetails.size());
for(PartitionMemberInfo memberDetails: afterDetails) {
assertEquals(3, memberDetails.getBucketCount());
assertEquals(3, memberDetails.getPrimaryCount());
}
if(!simulate) {
verifyStats(manager, results);
}
}
});
if(!simulate) {
SerializableRunnable checkRedundancyFixed = new SerializableRunnable("checkRedundancyFixed") {
public void run() {
Cache cache = getCache();
Region region = cache.getRegion("region1");
PartitionRegionInfo details = PartitionRegionHelper.getPartitionRegionInfo(region);
assertEquals(6, details.getCreatedBucketCount());
assertEquals(0,details.getActualRedundantCopies());
assertEquals(0,details.getLowRedundancyBucketCount());
assertEquals(2, details.getPartitionMemberInfo().size());
for(PartitionMemberInfo memberDetails: details.getPartitionMemberInfo()) {
assertEquals(3, memberDetails.getBucketCount());
assertEquals(3, memberDetails.getPrimaryCount());
}
//check to make sure that moving buckets didn't close the cache loader
Bug40228Loader loader = (Bug40228Loader) cache.getRegion("region1").getAttributes().getCacheLoader();
assertFalse(loader.isClosed());
}
};
vm0.invoke(checkRedundancyFixed);
vm1.invoke(checkRedundancyFixed);
SerializableRunnable checkBug40228Fixed = new SerializableRunnable("checkBug40228Fixed") {
public void run() {
Cache cache = getCache();
Bug40228Loader loader = (Bug40228Loader) cache.getRegion("region1").getAttributes().getCacheLoader();
assertFalse(loader.isClosed());
//check to make sure that closing the PR closes the cache loader
cache.getRegion("region1").close();
assertTrue(loader.isClosed());
}
};
vm0.invoke(checkBug40228Fixed);
vm1.invoke(checkBug40228Fixed);
}
}
//TODO test colocated regions where buckets aren't created for all subregions
//TODO test colocated regions where members aren't consistent in which regions they have
/**
* Cross-checks ResourceManager statistics against the results of a completed
* rebalance: nothing in progress, exactly one rebalance completed, and each
* per-category counter matching the corresponding RebalanceResults total.
*/
private void verifyStats(ResourceManager manager, RebalanceResults results) {
InternalResourceManager mgr = (InternalResourceManager) manager;
ResourceManagerStats stats = mgr.getStats();
// Exactly one rebalance has run, and it is finished.
assertEquals(0, stats.getRebalancesInProgress());
assertEquals(1, stats.getRebalancesCompleted());
// Bucket-create stats.
assertEquals(0, stats.getRebalanceBucketCreatesInProgress());
assertEquals(results.getTotalBucketCreatesCompleted(), stats.getRebalanceBucketCreatesCompleted());
assertEquals(0, stats.getRebalanceBucketCreatesFailed());
// The results report milliseconds while the stat is in nanoseconds.
assertEquals(results.getTotalBucketCreateTime(), TimeUnit.NANOSECONDS.toMillis(stats.getRebalanceBucketCreateTime()));
assertEquals(results.getTotalBucketCreateBytes(), stats.getRebalanceBucketCreateBytes());
// Bucket-transfer stats.
assertEquals(0, stats.getRebalanceBucketTransfersInProgress());
assertEquals(results.getTotalBucketTransfersCompleted(), stats.getRebalanceBucketTransfersCompleted());
assertEquals(0, stats.getRebalanceBucketTransfersFailed());
//assertEquals(results.getTotalBucketTransferTime(), TimeUnit.NANOSECONDS.toMillis(stats.getRebalanceBucketTransfersTime()));
assertEquals(results.getTotalBucketTransferBytes(), stats.getRebalanceBucketTransfersBytes());
// Primary-transfer stats.
assertEquals(0, stats.getRebalancePrimaryTransfersInProgress());
assertEquals(results.getTotalPrimaryTransfersCompleted(), stats.getRebalancePrimaryTransfersCompleted());
assertEquals(0, stats.getRebalancePrimaryTransfersFailed());
assertEquals(results.getTotalPrimaryTransferTime(), TimeUnit.NANOSECONDS.toMillis(stats.getRebalancePrimaryTransferTime()));
}
/**
 * Returns the sorted set of bucket ids hosted locally by the given VM for
 * the named partitioned region.
 */
private Set<Integer> getBucketList(final String regionName, VM vm) {
  return (Set<Integer>) vm.invoke(new SerializableCallable("get buckets") {
    public Object call() throws Exception {
      PartitionedRegion pr = (PartitionedRegion) getCache().getRegion(regionName);
      // TreeSet gives a deterministic, sorted view for equality checks.
      return new TreeSet<Integer>(pr.getDataStore().getAllLocalBucketIds());
    }
  });
}
/**
 * Blocks until the set of bucket ids hosted by the given VM for the named
 * region equals the expected collection, failing after 60 seconds.
 */
private void waitForBucketList(final String regionName, VM vm, final Collection<Integer> expected) {
  vm.invoke(new SerializableCallable("get buckets") {
    public Object call() throws Exception {
      final PartitionedRegion pr = (PartitionedRegion) getCache().getRegion(regionName);
      waitForCriterion(new WaitCriterion() {
        @Override
        public boolean done() {
          return currentBuckets().equals(expected);
        }

        @Override
        public String description() {
          return "Timeout waiting for buckets to match. Expected " + expected + " but got " + currentBuckets();
        }

        // Fresh sorted snapshot of the locally hosted bucket ids.
        private TreeSet<Integer> currentBuckets() {
          return new TreeSet<Integer>(pr.getDataStore().getAllLocalBucketIds());
        }
      }, 60000, 100, true);
      return null;
    }
  });
}
/**
 * Cache listener that blocks every entry operation until released, letting a
 * test hold bucket operations in progress while a rebalance is attempted.
 * Coordination is done entirely with CountDownLatches, so it is safe to call
 * from multiple operation threads.
 *
 * Fixes: waitForOperationsStarted previously ignored the boolean result of
 * the timed await, silently proceeding on timeout; doWait now also restores
 * the thread's interrupt status before wrapping InterruptedException.
 */
private static class BlockingCacheListener extends CacheListenerAdapter {
  // Counted down once per operation that reaches the listener.
  final CountDownLatch operationStartedLatch;
  // Released by the test (via release()) to let blocked operations finish.
  final CountDownLatch resumeOperationLatch = new CountDownLatch(1);

  public BlockingCacheListener(int threads) {
    operationStartedLatch = new CountDownLatch(threads);
  }

  /**
   * Waits until the expected number of operations have reached the listener,
   * failing immediately if the wait times out rather than proceeding with
   * fewer in-flight operations than the test expects.
   */
  public void waitForOperationsStarted() throws InterruptedException {
    if (!operationStartedLatch.await(MAX_WAIT, TimeUnit.SECONDS)) {
      throw new AssertionError("Timed out waiting for operations to start");
    }
  }

  public void close() {
  }

  /** Marks this operation as started, then blocks until release() is called. */
  private void doWait() {
    operationStartedLatch.countDown();
    try {
      resumeOperationLatch.await(MAX_WAIT, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      // Restore the interrupt status before converting to an unchecked
      // exception so callers can still observe the interruption.
      Thread.currentThread().interrupt();
      throw new CacheWriterException(e);
    }
  }

  @Override
  public void afterUpdate(EntryEvent event) throws CacheWriterException {
    doWait();
  }

  @Override
  public void afterCreate(EntryEvent event) {
    doWait();
  }

  @Override
  public void afterDestroy(EntryEvent event) {
    doWait();
  }

  @Override
  public void afterInvalidate(EntryEvent event) {
    doWait();
  }

  /** Unblocks every thread currently parked in doWait(). */
  public void release() {
    resumeOperationLatch.countDown();
  }
}
private static interface Operation {
void execute(Region region, Integer key);
}
/**
 * CacheLoader that records whether close() has been invoked. Used to verify
 * that moving a bucket does not close the region's loader (bug 40228) while
 * closing the region itself does.
 */
private static class Bug40228Loader implements CacheLoader {
  // Flipped by close(); volatile so any observing thread sees the change.
  private volatile boolean closed;

  public Object load(LoaderHelper helper) throws CacheLoaderException {
    // Never supplies a value; only the close-tracking matters to the tests.
    return null;
  }

  /** @return true once close() has been called on this loader */
  public boolean isClosed() {
    return closed;
  }

  public void close() {
    this.closed = true;
  }
}
}