blob: e2822b74a4721bbda9d281351890b1e54ba9337b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.cache.partition;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Locale;
import java.util.Set;
import org.junit.Test;
import org.apache.geode.cache.AttributesFactory;
import org.apache.geode.cache.Cache;
import org.apache.geode.cache.CacheException;
import org.apache.geode.cache.FixedPartitionAttributes;
import org.apache.geode.cache.PartitionAttributes;
import org.apache.geode.cache.PartitionAttributesFactory;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.Region.Entry;
import org.apache.geode.cache.Scope;
import org.apache.geode.cache30.CacheSerializableRunnable;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.cache.BucketRegion;
import org.apache.geode.internal.cache.ForceReattemptException;
import org.apache.geode.internal.cache.PartitionedRegion;
import org.apache.geode.internal.cache.PartitionedRegionHelper;
import org.apache.geode.internal.cache.partitioned.fixed.QuarterPartitionResolver;
import org.apache.geode.test.dunit.Assert;
import org.apache.geode.test.dunit.AsyncInvocation;
import org.apache.geode.test.dunit.Host;
import org.apache.geode.test.dunit.LogWriterUtils;
import org.apache.geode.test.dunit.SerializableCallable;
import org.apache.geode.test.dunit.SerializableRunnable;
import org.apache.geode.test.dunit.VM;
import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase;
public class PartitionRegionHelperDUnitTest extends JUnit4CacheTestCase {
public PartitionRegionHelperDUnitTest() {
super();
}
@Test
public void testAssignBucketsToPartitions() throws Throwable {
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
VM vm2 = host.getVM(2);
SerializableRunnable createPrRegion = new SerializableRunnable("createRegion") {
@Override
public void run() {
Cache cache = getCache();
AttributesFactory attr = new AttributesFactory();
PartitionAttributesFactory paf = new PartitionAttributesFactory();
paf.setRedundantCopies(1);
paf.setTotalNumBuckets(12);
PartitionAttributes prAttr = paf.create();
attr.setPartitionAttributes(prAttr);
cache.createRegion("region1", attr.create());
}
};
vm0.invoke(createPrRegion);
vm1.invoke(createPrRegion);
vm2.invoke(createPrRegion);
SerializableRunnable assignBuckets = new SerializableRunnable("assign partitions") {
@Override
public void run() {
Cache cache = getCache();
Region region = cache.getRegion("region1");
PartitionRegionHelper.assignBucketsToPartitions(region);
}
};
AsyncInvocation future1 = vm0.invokeAsync(assignBuckets);
AsyncInvocation future2 = vm1.invokeAsync(assignBuckets);
AsyncInvocation future3 = vm2.invokeAsync(assignBuckets);
future1.join(60 * 1000);
future2.join(60 * 1000);
future3.join(60 * 1000);
if (future1.exceptionOccurred())
throw future1.getException();
if (future2.exceptionOccurred())
throw future2.getException();
if (future3.exceptionOccurred())
throw future3.getException();
SerializableRunnable checkAssignment = new SerializableRunnable("check assignment") {
@Override
public void run() {
Cache cache = getCache();
Region region = cache.getRegion("region1");
PartitionRegionInfo info = PartitionRegionHelper.getPartitionRegionInfo(region);
assertEquals(12, info.getCreatedBucketCount());
assertEquals(0, info.getLowRedundancyBucketCount());
for (PartitionMemberInfo member : info.getPartitionMemberInfo()) {
assertEquals(8, member.getBucketCount());
// TODO unfortunately, with redundancy, we can end up
// not balancing primaries in favor of balancing buckets. The problem
// is that We could create too many redundant copies on a node, which means
// when we get to the last bucket, which should be primary on that node, we
// don't even put a copy of the bucket on that node
// See bug #40470
// assertIndexDetailsEquals(4, member.getPrimaryCount());
}
}
};
vm0.invoke(checkAssignment);
}
@Test
public void testAssignBucketsToPartitions_FPR() throws Throwable {
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
VM vm2 = host.getVM(2);
VM vm3 = host.getVM(3);
SerializableRunnable createPrRegion1 = new SerializableRunnable("createRegion") {
@Override
public void run() {
Cache cache = getCache();
FixedPartitionAttributes fpa1 =
FixedPartitionAttributes.createFixedPartition("Q1", true, 3);
FixedPartitionAttributes fpa2 =
FixedPartitionAttributes.createFixedPartition("Q2", false, 3);
AttributesFactory attr = new AttributesFactory();
PartitionAttributesFactory paf = new PartitionAttributesFactory();
paf.addFixedPartitionAttributes(fpa1);
paf.addFixedPartitionAttributes(fpa2);
paf.setPartitionResolver(new QuarterPartitionResolver());
paf.setRedundantCopies(1);
paf.setTotalNumBuckets(12);
PartitionAttributes prAttr = paf.create();
attr.setPartitionAttributes(prAttr);
cache.createRegion("region1", attr.create());
}
};
vm0.invoke(createPrRegion1);
SerializableRunnable createPrRegion2 = new SerializableRunnable("createRegion") {
@Override
public void run() {
Cache cache = getCache();
FixedPartitionAttributes fpa1 =
FixedPartitionAttributes.createFixedPartition("Q2", true, 3);
FixedPartitionAttributes fpa2 =
FixedPartitionAttributes.createFixedPartition("Q3", false, 3);
AttributesFactory attr = new AttributesFactory();
PartitionAttributesFactory paf = new PartitionAttributesFactory();
paf.addFixedPartitionAttributes(fpa1);
paf.addFixedPartitionAttributes(fpa2);
paf.setPartitionResolver(new QuarterPartitionResolver());
paf.setRedundantCopies(1);
paf.setTotalNumBuckets(12);
PartitionAttributes prAttr = paf.create();
attr.setPartitionAttributes(prAttr);
cache.createRegion("region1", attr.create());
}
};
vm1.invoke(createPrRegion2);
SerializableRunnable createPrRegion3 = new SerializableRunnable("createRegion") {
@Override
public void run() {
Cache cache = getCache();
FixedPartitionAttributes fpa1 =
FixedPartitionAttributes.createFixedPartition("Q3", true, 3);
FixedPartitionAttributes fpa2 =
FixedPartitionAttributes.createFixedPartition("Q1", false, 3);
AttributesFactory attr = new AttributesFactory();
PartitionAttributesFactory paf = new PartitionAttributesFactory();
paf.addFixedPartitionAttributes(fpa1);
paf.addFixedPartitionAttributes(fpa2);
paf.setPartitionResolver(new QuarterPartitionResolver());
paf.setRedundantCopies(1);
paf.setTotalNumBuckets(12);
PartitionAttributes prAttr = paf.create();
attr.setPartitionAttributes(prAttr);
cache.createRegion("region1", attr.create());
}
};
vm2.invoke(createPrRegion3);
SerializableRunnable assignBuckets = new SerializableRunnable("assign partitions") {
@Override
public void run() {
Cache cache = getCache();
Region region = cache.getRegion("region1");
PartitionRegionHelper.assignBucketsToPartitions(region);
}
};
AsyncInvocation future1 = vm0.invokeAsync(assignBuckets);
AsyncInvocation future2 = vm1.invokeAsync(assignBuckets);
AsyncInvocation future3 = vm2.invokeAsync(assignBuckets);
future1.join();
future2.join();
future3.join();
if (future1.exceptionOccurred())
throw future1.getException();
if (future2.exceptionOccurred())
throw future2.getException();
if (future3.exceptionOccurred())
throw future3.getException();
SerializableRunnable checkAssignment = new SerializableRunnable("check assignment") {
@Override
public void run() {
Cache cache = getCache();
Region region = cache.getRegion("region1");
PartitionRegionInfo info = PartitionRegionHelper.getPartitionRegionInfo(region);
assertEquals(9, info.getCreatedBucketCount());
assertEquals(0, info.getLowRedundancyBucketCount());
for (PartitionMemberInfo member : info.getPartitionMemberInfo()) {
assertEquals(6, member.getBucketCount());
}
}
};
vm0.invoke(checkAssignment);
SerializableRunnable createPrRegion4 = new SerializableRunnable("createRegion") {
@Override
public void run() {
Cache cache = getCache();
AttributesFactory attr = new AttributesFactory();
PartitionAttributesFactory paf = new PartitionAttributesFactory();
paf.setPartitionResolver(new QuarterPartitionResolver());
paf.setLocalMaxMemory(0);
paf.setRedundantCopies(1);
paf.setTotalNumBuckets(12);
PartitionAttributes prAttr = paf.create();
attr.setPartitionAttributes(prAttr);
Region region = cache.createRegion("region1", attr.create());
for (Months_Accessor month : Months_Accessor.values()) {
String dateString = 10 + "-" + month + "-" + "2009";
String DATE_FORMAT = "dd-MMM-yyyy";
SimpleDateFormat sdf = new SimpleDateFormat(DATE_FORMAT, Locale.US);
Date date = null;
try {
date = sdf.parse(dateString);
} catch (ParseException e) {
Assert.fail("Exception Occurred while parseing date", e);
}
String value = month.toString() + 10;
region.put(date, value);
}
}
};
vm3.invoke(createPrRegion4);
SerializableRunnable checkMembers = new SerializableRunnable("createRegion") {
@Override
public void run() {
Cache cache = getCache();
Region region = cache.getRegion("region1");
for (Months_Accessor month : Months_Accessor.values()) {
String dateString = 10 + "-" + month + "-" + "2009";
String DATE_FORMAT = "dd-MMM-yyyy";
SimpleDateFormat sdf = new SimpleDateFormat(DATE_FORMAT, Locale.US);
Date date = null;
try {
date = sdf.parse(dateString);
} catch (ParseException e) {
Assert.fail("Exception Occurred while parseing date", e);
}
DistributedMember key1Pri = PartitionRegionHelper.getPrimaryMemberForKey(region, date);
assertNotNull(key1Pri);
Set<DistributedMember> buk0AllMems =
PartitionRegionHelper.getAllMembersForKey(region, date);
assertEquals(2, buk0AllMems.size());
Set<DistributedMember> buk0RedundantMems =
PartitionRegionHelper.getRedundantMembersForKey(region, date);
assertEquals(1, buk0RedundantMems.size());
}
}
};
vm3.invoke(checkMembers);
}
@Test
public void testMembersForKey() throws Exception {
Host host = Host.getHost(0);
VM accessor = host.getVM(0);
VM ds1 = host.getVM(1);
VM ds2 = host.getVM(2);
VM ds3 = host.getVM(3);
final String prName = getUniqueName();
final int tb = 11;
final int rc = 1;
accessor.invoke(new SerializableRunnable("createAccessor") {
@Override
public void run() {
Cache cache = getCache();
AttributesFactory attr = new AttributesFactory();
attr.setPartitionAttributes(new PartitionAttributesFactory().setLocalMaxMemory(0)
.setRedundantCopies(rc).setTotalNumBuckets(tb).create());
cache.createRegion(prName, attr.create());
}
});
HashMap<DistributedMember, VM> d2v = new HashMap<DistributedMember, VM>();
SerializableCallable createPrRegion = new SerializableCallable("createDataStore") {
@Override
public Object call() throws Exception {
Cache cache = getCache();
AttributesFactory attr = new AttributesFactory();
attr.setPartitionAttributes(new PartitionAttributesFactory().setRedundantCopies(rc)
.setTotalNumBuckets(tb).create());
cache.createRegion(prName, attr.create());
return cache.getDistributedSystem().getDistributedMember();
}
};
DistributedMember dm = (DistributedMember) ds1.invoke(createPrRegion);
d2v.put(dm, ds1);
dm = (DistributedMember) ds2.invoke(createPrRegion);
d2v.put(dm, ds2);
dm = (DistributedMember) ds3.invoke(createPrRegion);
d2v.put(dm, ds3);
final Integer buk0Key1 = new Integer(0);
final Integer buk0Key2 = new Integer(buk0Key1.intValue() + tb);
final Integer buk1Key1 = new Integer(1);
accessor.invoke(new CacheSerializableRunnable("nonPRcheck") {
@SuppressWarnings("unchecked")
@Override
public void run2() throws CacheException {
AttributesFactory attr = new AttributesFactory();
{
attr.setScope(Scope.LOCAL);
Region lr = getCache().createRegion(prName + "lr", attr.create());
try {
// no-pr check
nonPRMemberForKey(lr, buk0Key1);
} finally {
lr.destroyRegion();
}
}
{
attr = new AttributesFactory();
attr.setScope(Scope.DISTRIBUTED_ACK);
Region dr = getCache().createRegion(prName + "dr", attr.create());
try {
// no-pr check
nonPRMemberForKey(dr, buk0Key1);
} finally {
dr.destroyRegion();
}
}
}
private void nonPRMemberForKey(Region lr, final Object key) {
try {
PartitionRegionHelper.getPrimaryMemberForKey(lr, key);
fail();
} catch (IllegalArgumentException expected) {
}
try {
PartitionRegionHelper.getAllMembersForKey(lr, key);
fail();
} catch (IllegalArgumentException expected) {
}
try {
PartitionRegionHelper.getRedundantMembersForKey(lr, key);
fail();
} catch (IllegalArgumentException expected) {
}
}
});
Object[] noKeyThenKeyStuff =
(Object[]) accessor.invoke(new SerializableCallable("noKeyThenKey") {
@Override
public Object call() throws Exception {
Region<Integer, String> r = getCache().getRegion(prName);
// NPE check
try {
PartitionRegionHelper.getPrimaryMemberForKey(r, null);
fail();
} catch (IllegalStateException expected) {
}
try {
PartitionRegionHelper.getAllMembersForKey(r, null);
fail();
} catch (IllegalStateException expected) {
}
try {
PartitionRegionHelper.getRedundantMembersForKey(r, null);
fail();
} catch (IllegalStateException expected) {
}
// buk0
assertNull(PartitionRegionHelper.getPrimaryMemberForKey(r, buk0Key1));
assertTrue(PartitionRegionHelper.getAllMembersForKey(r, buk0Key1).isEmpty());
assertTrue(PartitionRegionHelper.getRedundantMembersForKey(r, buk0Key1).isEmpty());
// buk1
assertNull(PartitionRegionHelper.getPrimaryMemberForKey(r, buk1Key1));
assertTrue(PartitionRegionHelper.getAllMembersForKey(r, buk1Key1).isEmpty());
assertTrue(PartitionRegionHelper.getRedundantMembersForKey(r, buk1Key1).isEmpty());
r.put(buk0Key1, "zero");
// buk0, key1
DistributedMember key1Pri = PartitionRegionHelper.getPrimaryMemberForKey(r, buk0Key1);
assertNotNull(key1Pri);
Set<DistributedMember> buk0AllMems =
PartitionRegionHelper.getAllMembersForKey(r, buk0Key1);
assertEquals(rc + 1, buk0AllMems.size());
Set<DistributedMember> buk0RedundantMems =
PartitionRegionHelper.getRedundantMembersForKey(r, buk0Key1);
assertEquals(rc, buk0RedundantMems.size());
DistributedMember me = r.getCache().getDistributedSystem().getDistributedMember();
try {
buk0AllMems.add(me);
fail();
} catch (UnsupportedOperationException expected) {
}
try {
buk0AllMems.remove(me);
fail();
} catch (UnsupportedOperationException expected) {
}
try {
buk0RedundantMems.add(me);
fail();
} catch (UnsupportedOperationException expected) {
}
try {
buk0RedundantMems.remove(me);
fail();
} catch (UnsupportedOperationException expected) {
}
assertTrue(buk0AllMems.containsAll(buk0RedundantMems));
assertTrue(buk0AllMems.contains(key1Pri));
assertTrue(!buk0RedundantMems.contains(key1Pri));
// buk0, key2
DistributedMember key2Pri = PartitionRegionHelper.getPrimaryMemberForKey(r, buk0Key2);
assertNotNull(key2Pri);
buk0AllMems = PartitionRegionHelper.getAllMembersForKey(r, buk0Key2);
assertEquals(rc + 1, buk0AllMems.size());
buk0RedundantMems = PartitionRegionHelper.getRedundantMembersForKey(r, buk0Key2);
assertEquals(rc, buk0RedundantMems.size());
assertTrue(buk0AllMems.containsAll(buk0RedundantMems));
assertTrue(buk0AllMems.contains(key2Pri));
assertTrue(!buk0RedundantMems.contains(key2Pri));
// buk1
assertNull(PartitionRegionHelper.getPrimaryMemberForKey(r, buk1Key1));
assertTrue(PartitionRegionHelper.getAllMembersForKey(r, buk1Key1).isEmpty());
assertTrue(PartitionRegionHelper.getRedundantMembersForKey(r, buk1Key1).isEmpty());
return new Object[] {key1Pri, buk0AllMems, buk0RedundantMems};
}
});
final DistributedMember buk0Key1Pri = (DistributedMember) noKeyThenKeyStuff[0];
final Set<DistributedMember> buk0AllMems = (Set<DistributedMember>) noKeyThenKeyStuff[1];
final Set<DistributedMember> buk0Redundants = (Set<DistributedMember>) noKeyThenKeyStuff[2];
VM buk0Key1PriVM = d2v.get(buk0Key1Pri);
buk0Key1PriVM.invoke(new CacheSerializableRunnable("assertPrimaryness") {
@Override
public void run2() throws CacheException {
PartitionedRegion pr = (PartitionedRegion) getCache().getRegion(prName);
Integer bucketId =
new Integer(PartitionedRegionHelper.getHashKey(pr, null, buk0Key1, null, null));
try {
BucketRegion buk0 = pr.getDataStore().getInitializedBucketForId(buk0Key1, bucketId);
assertNotNull(buk0);
assertTrue(buk0.getBucketAdvisor().isPrimary());
} catch (ForceReattemptException e) {
LogWriterUtils.getLogWriter().severe(e);
fail();
}
}
});
CacheSerializableRunnable assertHasBucket =
new CacheSerializableRunnable("assertHasBucketAndKey") {
@Override
public void run2() throws CacheException {
PartitionedRegion pr = (PartitionedRegion) getCache().getRegion(prName);
Integer bucketId =
new Integer(PartitionedRegionHelper.getHashKey(pr, null, buk0Key1, null, null));
try {
BucketRegion buk0 = pr.getDataStore().getInitializedBucketForId(buk0Key1, bucketId);
assertNotNull(buk0);
Entry k1e = buk0.getEntry(buk0Key1);
assertNotNull(k1e);
} catch (ForceReattemptException e) {
LogWriterUtils.getLogWriter().severe(e);
fail();
}
}
};
for (DistributedMember bom : buk0AllMems) {
VM v = d2v.get(bom);
LogWriterUtils.getLogWriter()
.info("Visiting bucket owner member " + bom + " for key " + buk0Key1);
v.invoke(assertHasBucket);
}
CacheSerializableRunnable assertRed = new CacheSerializableRunnable("assertRedundant") {
@Override
public void run2() throws CacheException {
PartitionedRegion pr = (PartitionedRegion) getCache().getRegion(prName);
Integer bucketId =
new Integer(PartitionedRegionHelper.getHashKey(pr, null, buk0Key1, null, null));
try {
BucketRegion buk0 = pr.getDataStore().getInitializedBucketForId(buk0Key1, bucketId);
assertNotNull(buk0);
assertFalse(buk0.getBucketAdvisor().isPrimary());
} catch (ForceReattemptException e) {
LogWriterUtils.getLogWriter().severe(e);
fail();
}
}
};
for (DistributedMember redm : buk0Redundants) {
VM v = d2v.get(redm);
LogWriterUtils.getLogWriter()
.info("Visiting redundant member " + redm + " for key " + buk0Key1);
v.invoke(assertRed);
}
}
@Test
public void testMoveSingleBucket() {
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
VM vm2 = host.getVM(2);
SerializableCallable createPrRegion = new SerializableCallable("createRegion") {
@Override
public Object call() {
Cache cache = getCache();
AttributesFactory attr = new AttributesFactory();
PartitionAttributesFactory paf = new PartitionAttributesFactory();
paf.setRedundantCopies(1);
paf.setTotalNumBuckets(12);
PartitionAttributes prAttr = paf.create();
attr.setPartitionAttributes(prAttr);
cache.createRegion("region1", attr.create());
return cache.getDistributedSystem().getDistributedMember();
}
};
final DistributedMember member0 = (DistributedMember) vm0.invoke(createPrRegion);
final DistributedMember member1 = (DistributedMember) vm1.invoke(createPrRegion);
// populate the region with data so we have some buckets
vm0.invoke(new SerializableRunnable("create data") {
@Override
public void run() {
for (int i = 0; i < 8; i++) {
Region<Object, Object> region = getCache().getRegion("region1");
region.put(i, "one");
}
}
});
// Create VM 2 later so that it doesn't have any buckets
final DistributedMember member2 = (DistributedMember) vm2.invoke(createPrRegion);
// Try some explicit moves
vm0.invoke(new SerializableRunnable("create data") {
@Override
public void run() {
Region<Object, Object> region = getCache().getRegion("region1");
assertHasMembers(PartitionRegionHelper.getAllMembersForKey(region, 1), member0, member1);
// Try to move a bucket to a member that already has the bucket
try {
PartitionRegionHelper.moveBucketByKey(region, member0, member1, 1);
fail("Should have received an exception");
} catch (IllegalStateException expected) {
System.err.println(expected);
}
assertHasMembers(PartitionRegionHelper.getAllMembersForKey(region, 1), member0, member1);
// Try to move the bucket from a member that doesn't have the bucket
try {
PartitionRegionHelper.moveBucketByKey(region, member2, member2, 1);
fail("Should have received an exception");
} catch (IllegalStateException expected) {
System.err.println(expected);
}
assertHasMembers(PartitionRegionHelper.getAllMembersForKey(region, 1), member0, member1);
// Try to move the bucket from an invalid member
try {
PartitionRegionHelper.moveBucketByKey(region, member2,
new InternalDistributedMember("localhost", 5), 1);
fail("Should have received an exception");
} catch (IllegalStateException expected) {
System.err.println(expected);
}
assertHasMembers(PartitionRegionHelper.getAllMembersForKey(region, 1), member0, member1);
// Try to move the bucket that doesn't exist
try {
PartitionRegionHelper.moveBucketByKey(region, member0, member2, 10);
fail("Should have received an exception");
} catch (IllegalStateException expected) {
System.err.println(expected);
}
assertHasMembers(PartitionRegionHelper.getAllMembersForKey(region, 10));
// Do some successful moves.
PartitionRegionHelper.moveBucketByKey(region, member0, member2, 1);
assertHasMembers(PartitionRegionHelper.getAllMembersForKey(region, 1), member1, member2);
PartitionRegionHelper.moveBucketByKey(region, member2, member0, 1);
assertHasMembers(PartitionRegionHelper.getAllMembersForKey(region, 1), member0, member1);
PartitionRegionHelper.moveBucketByKey(region, member0, member2, 2);
PartitionRegionHelper.moveBucketByKey(region, member1, member2, 3);
PartitionRegionHelper.moveBucketByKey(region, member1, member2, 4);
assertHasMembers(PartitionRegionHelper.getAllMembersForKey(region, 2), member1, member2);
assertHasMembers(PartitionRegionHelper.getAllMembersForKey(region, 3), member0, member2);
assertHasMembers(PartitionRegionHelper.getAllMembersForKey(region, 4), member0, member2);
}
});
}
@Test
public void testMovePercentage() {
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
VM vm2 = host.getVM(2);
SerializableCallable createPrRegion = new SerializableCallable("createRegion") {
@Override
public Object call() {
Cache cache = getCache();
AttributesFactory attr = new AttributesFactory();
PartitionAttributesFactory paf = new PartitionAttributesFactory();
paf.setRedundantCopies(1);
paf.setTotalNumBuckets(12);
PartitionAttributes prAttr = paf.create();
attr.setPartitionAttributes(prAttr);
cache.createRegion("region1", attr.create());
return cache.getDistributedSystem().getDistributedMember();
}
};
final DistributedMember member0 = (DistributedMember) vm0.invoke(createPrRegion);
final DistributedMember member1 = (DistributedMember) vm1.invoke(createPrRegion);
// populate the region with data so we have some buckets
vm0.invoke(new SerializableRunnable("create data") {
@Override
public void run() {
for (int i = 0; i < 8; i++) {
Region<Object, Object> region = getCache().getRegion("region1");
region.put(i, "one");
}
}
});
// Create VM 2 later so that it doesn't have any buckets
final DistributedMember member2 = (DistributedMember) vm2.invoke(createPrRegion);
// Try some percentage moves
vm0.invoke(new SerializableRunnable("create data") {
@Override
public void run() {
Region<Object, Object> region = getCache().getRegion("region1");
assertHasMembers(PartitionRegionHelper.getAllMembersForKey(region, 1), member0, member1);
// Try to move the bucket to an invalid member
try {
PartitionRegionHelper.moveData(region, member1,
new InternalDistributedMember("localhost", 5), 100f);
fail("Should have received an exception");
} catch (IllegalStateException expected) {
System.err.println(expected);
}
assertHasMembers(PartitionRegionHelper.getAllMembersForKey(region, 1), member0, member1);
// Try to move the bucket from an invalid member
try {
PartitionRegionHelper.moveData(region, new InternalDistributedMember("localhost", 5),
member2, 10f);
fail("Should have received an exception");
} catch (IllegalStateException expected) {
System.err.println(expected);
}
assertHasMembers(PartitionRegionHelper.getAllMembersForKey(region, 1), member0, member1);
// Try to move the bucket to the same member.
try {
PartitionRegionHelper.moveData(region, member0, member0, 10f);
fail("Should have received an exception");
} catch (IllegalStateException expected) {
System.err.println(expected);
}
assertHasMembers(PartitionRegionHelper.getAllMembersForKey(region, 1), member0, member1);
// Try to move a really small percentage of data.
assertEquals(0, PartitionRegionHelper.moveData(region, member0, member2, .001f)
.getTotalBucketTransfersCompleted());
// Try to move data between members that have the same buckets
// There should be nothing to move
assertEquals(0, PartitionRegionHelper.moveData(region, member0, member1, 100f)
.getTotalBucketTransfersCompleted());
// Move all of the data
assertEquals(8, PartitionRegionHelper.moveData(region, member0, member2, 100f)
.getTotalBucketTransfersCompleted());
// See if the buckets have moved.
assertHasMembers(PartitionRegionHelper.getAllMembersForKey(region, 1), member1, member2);
assertHasMembers(PartitionRegionHelper.getAllMembersForKey(region, 2), member1, member2);
assertHasMembers(PartitionRegionHelper.getAllMembersForKey(region, 3), member1, member2);
assertHasMembers(PartitionRegionHelper.getAllMembersForKey(region, 4), member1, member2);
assertHasMembers(PartitionRegionHelper.getAllMembersForKey(region, 5), member1, member2);
assertHasMembers(PartitionRegionHelper.getAllMembersForKey(region, 6), member1, member2);
assertHasMembers(PartitionRegionHelper.getAllMembersForKey(region, 7), member1, member2);
// Retry the move, to show there's nothing left
assertEquals(0, PartitionRegionHelper.moveData(region, member0, member2, 50f)
.getTotalBucketTransfersCompleted());
// Move data in chunks
assertEquals(2, PartitionRegionHelper.moveData(region, member1, member0, 25f)
.getTotalBucketTransfersCompleted());
assertEquals(2, PartitionRegionHelper.moveData(region, member1, member0, 34f)
.getTotalBucketTransfersCompleted());
assertEquals(2, PartitionRegionHelper.moveData(region, member1, member0, 50f)
.getTotalBucketTransfersCompleted());
assertEquals(2, PartitionRegionHelper.moveData(region, member1, member0, 100f)
.getTotalBucketTransfersCompleted());
// Should be nothing left to move
assertEquals(0, PartitionRegionHelper.moveData(region, member1, member0, 100)
.getTotalBucketTransfersCompleted());
assertHasMembers(PartitionRegionHelper.getAllMembersForKey(region, 1), member0, member2);
assertHasMembers(PartitionRegionHelper.getAllMembersForKey(region, 2), member0, member2);
assertHasMembers(PartitionRegionHelper.getAllMembersForKey(region, 3), member0, member2);
assertHasMembers(PartitionRegionHelper.getAllMembersForKey(region, 4), member0, member2);
assertHasMembers(PartitionRegionHelper.getAllMembersForKey(region, 5), member0, member2);
assertHasMembers(PartitionRegionHelper.getAllMembersForKey(region, 6), member0, member2);
assertHasMembers(PartitionRegionHelper.getAllMembersForKey(region, 7), member0, member2);
}
});
}
public void assertHasMembers(Set<DistributedMember> got, DistributedMember... expected) {
HashSet expectedSet = new HashSet(Arrays.asList(expected));
assertEquals(expectedSet, got);
}
public enum Months_Accessor {
JAN, FEB, MAR, APR, MAY, JUN, JUL, AUG, SEP
}
}