blob: c15f3d922d1d82fdf64d0ac3c739397df09ba58f [file] [log] [blame]
/*=========================================================================
* Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
* This product is protected by U.S. and international copyright
* and intellectual property laws. Pivotal products are covered by
* one or more patents listed at http://www.pivotal.io/patents.
*=========================================================================
*/
package com.gemstone.gemfire.internal.cache.execute;
/**
* This is a dunit test for PartitionedRegion creation and Region API's
* for put and get functionality in case of Custom Partitioning.
*/
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Date;
import java.util.Iterator;
import java.util.Properties;
import java.util.Set;
import com.gemstone.gemfire.DataSerializable;
import com.gemstone.gemfire.cache.AttributesFactory;
import com.gemstone.gemfire.cache.Cache;
import com.gemstone.gemfire.cache.CacheException;
import com.gemstone.gemfire.cache.CacheFactory;
import com.gemstone.gemfire.cache.EntryOperation;
import com.gemstone.gemfire.cache.PartitionAttributes;
import com.gemstone.gemfire.cache.PartitionAttributesFactory;
import com.gemstone.gemfire.cache.PartitionResolver;
import com.gemstone.gemfire.cache.Region;
import com.gemstone.gemfire.cache.RegionAttributes;
import com.gemstone.gemfire.cache30.CacheSerializableRunnable;
import com.gemstone.gemfire.distributed.DistributedSystem;
import com.gemstone.gemfire.distributed.internal.ReplyException;
import com.gemstone.gemfire.internal.cache.EntryOperationImpl;
import com.gemstone.gemfire.internal.cache.PartitionedRegion;
import com.gemstone.gemfire.internal.cache.PartitionedRegionDUnitTestCase;
import com.gemstone.gemfire.internal.cache.PartitionedRegionDataStore.BucketVisitor;
import com.gemstone.gemfire.internal.cache.xmlcache.Declarable2;
import dunit.Host;
import dunit.SerializableRunnable;
import dunit.VM;
public class PRCustomPartitioningDUnitTest extends
PartitionedRegionDUnitTestCase {
public PRCustomPartitioningDUnitTest(String name) {
super(name);
}
protected static Cache cache = null;
Properties props = new Properties();
VM vm0 = null;
VM vm1 = null;
VM vm2 = null;
VM vm3 = null;
static final int totalNumBuckets = 7;
static ArrayList listOfKeys1 = new ArrayList();
static ArrayList listOfKeys2 = new ArrayList();
static ArrayList listOfKeys3 = new ArrayList();
static ArrayList listOfKeys4 = new ArrayList();
public static void createCacheInVm() throws Exception{
Properties props = new Properties();
//props.setProperty(DistributionConfig.MCAST_PORT_NAME, "0");
//props.setProperty(DistributionConfig.LOCATORS_NAME, "");
new PRCustomPartitioningDUnitTest("temp").createCache(props);
}
private void createCache(Properties props) throws Exception
{
DistributedSystem ds = getSystem(props);
assertNotNull(ds);
ds.disconnect();
ds = getSystem(props);
cache = CacheFactory.create(ds);
assertNotNull(cache);
}
/* SerializableRunnable object to create PR with a partition resolver */
SerializableRunnable createPrRegionWithPartitionResolver = new CacheSerializableRunnable(
"createPrRegionWithDS") {
public void run2() throws CacheException {
AttributesFactory attr = new AttributesFactory();
PartitionResolver resolver = MonthBasedPartitionResolver.getInstance();
PartitionAttributesFactory paf = new PartitionAttributesFactory();
paf.setTotalNumBuckets(totalNumBuckets);
paf.setPartitionResolver(resolver);
paf.setRedundantCopies(0);
PartitionAttributes prAttr = paf.create();
attr.setPartitionAttributes(prAttr);
RegionAttributes regionAttribs = attr.create();
cache.createRegion("PR1", regionAttribs);
}
};
/* SerializableRunnable object to create PR with a partition resolver */
SerializableRunnable createPrRegionOnlyAccessorWithPartitionResolver = new CacheSerializableRunnable(
"createPrRegionOnlyAccessor") {
public void run2() throws CacheException {
AttributesFactory attr = new AttributesFactory();
PartitionAttributesFactory paf = new PartitionAttributesFactory();
PartitionResolver resolver = MonthBasedPartitionResolver.getInstance();
PartitionAttributes prAttr = paf.setLocalMaxMemory(0).setTotalNumBuckets(
totalNumBuckets).setPartitionResolver(resolver).setRedundantCopies(0)
.create();
attr.setPartitionAttributes(prAttr);
RegionAttributes regionAttribs = attr.create();
cache.createRegion("PR1", regionAttribs);
}
};
/**
* Search the entires PartitionedRegion for the key, to validate that indeed
* it doesn't exist
*
* @returns true if it does exist
* @param par
* @param key
*/
public static boolean searchForKey(PartitionedRegion par, Date key) {
// Check to make super sure that the key exists
boolean foundIt = false;
final int numBucks = par.getTotalNumberOfBuckets();
for (int b = 0; b < numBucks; b++) {
if (par.getBucketKeys(b).contains(key)) {
foundIt = true;
getLogWriter().info("Key " + key + " found in bucket " + b);
break;
}
}
if (!foundIt) {
getLogWriter().severe("Key " + key + " not found in any bucket");
}
return foundIt;
}
public void partitionedRegionTest(final String prName) {
/*
* Do put() operations through VM with PR having
* both Accessor and Datastore
*/
vm0
.invoke(new CacheSerializableRunnable(
"doPutCreateInvalidateOperations1") {
public void run2() throws CacheException {
Calendar cal = Calendar.getInstance();
final Region pr = cache.getRegion(prName);
if (pr == null) {
fail(prName + " not created");
}
int size = 0;
size = pr.size();
assertEquals("Size doesnt return expected value", 0, size);
assertEquals(
"isEmpty doesnt return proper state of the PartitionedRegion",
true, pr.isEmpty());
assertEquals(0, pr.keySet().size());
for (int i = 0; i <= 11; i++) {
int yr = (new Integer((int)(Math.random() * 2100))).intValue();
int month = i;
int date = (new Integer((int)(Math.random() * 30))).intValue();
cal.set(yr, month, date);
Object key = cal.getTime();
listOfKeys1.add(key);
assertNotNull(pr);
pr.put(key, Integer.toString(i));
assertEquals(Integer.toString(i), pr.get(key));
}
PartitionedRegion ppr = (PartitionedRegion)pr;
try {
ppr.dumpAllBuckets(false);
}
catch (ReplyException re) {
fail("dumpAllBuckets", re);
}
}
});
vm1
.invoke(new CacheSerializableRunnable(
"doPutCreateInvalidateOperations2") {
public void run2() throws CacheException {
Calendar cal = Calendar.getInstance();
final Region pr = cache.getRegion(prName);
if (pr == null) {
fail(prName + " not created");
}
for (int i = 0; i <= 11; i++) {
int yr = (new Integer((int)(Math.random() * 2200))).intValue();
int month = i;
int date = (new Integer((int)(Math.random() * 30))).intValue();
cal.set(yr, month, date);
Object key = cal.getTime();
listOfKeys2.add(key);
assertNotNull(pr);
pr.put(key, Integer.toString(i));
assertEquals(Integer.toString(i), pr.get(key));
}
PartitionedRegion ppr = (PartitionedRegion)pr;
try {
ppr.dumpAllBuckets(false);
}
catch (ReplyException re) {
fail("dumpAllBuckets", re);
}
}
});
vm2
.invoke(new CacheSerializableRunnable(
"doPutCreateInvalidateOperations2") {
public void run2() throws CacheException {
Calendar cal = Calendar.getInstance();
final Region pr = cache.getRegion(prName);
if (pr == null) {
fail(prName + " not created");
}
for (int i = 0; i <= 11; i++) {
int yr = (new Integer((int)(Math.random() * 2300))).intValue();
int month = i;
int date = (new Integer((int)(Math.random() * 30))).intValue();
cal.set(yr, month, date);
Object key = cal.getTime();
listOfKeys3.add(key);
assertNotNull(pr);
pr.put(key, Integer.toString(i));
assertEquals(Integer.toString(i), pr.get(key));
}
PartitionedRegion ppr = (PartitionedRegion)pr;
try {
ppr.dumpAllBuckets(false);
}
catch (ReplyException re) {
fail("dumpAllBuckets", re);
}
}
});
vm3
.invoke(new CacheSerializableRunnable(
"doPutCreateInvalidateOperations3") {
public void run2() throws CacheException {
Calendar cal = Calendar.getInstance();
final Region pr = cache.getRegion(prName);
if (pr == null) {
fail(prName + " not created");
}
for (int i = 0; i <= 11; i++) {
int yr = (new Integer((int)(Math.random() * 2400))).intValue();
int month = i;
int date = (new Integer((int)(Math.random() * 30))).intValue();
cal.set(yr, month, date);
Object key = cal.getTime();
listOfKeys4.add(key);
assertNotNull(pr);
pr.put(key, Integer.toString(i));
assertEquals(Integer.toString(i), pr.get(key));
}
PartitionedRegion ppr = (PartitionedRegion)pr;
try {
ppr.dumpAllBuckets(false);
}
catch (ReplyException re) {
fail("dumpAllBuckets", re);
}
}
});
vm0.invoke(new CacheSerializableRunnable(
"verifyKeysonVM0") {
public void run2() throws CacheException {
// Calendar cal = Calendar.getInstance();
final PartitionedRegion pr = (PartitionedRegion)cache.getRegion(prName);
if (pr == null) {
fail(prName + " not created");
}
Iterator itr = listOfKeys1.iterator();
while (itr.hasNext()) {
assertTrue(searchForKey(pr, (Date)itr.next()));
}
pr.getDataStore().visitBuckets(new BucketVisitor() {
public void visit(Integer bucketId, Region r) {
Set s = pr.getBucketKeys(bucketId.intValue());
Iterator it = s.iterator();
while(it.hasNext()){
EntryOperation eo = new EntryOperationImpl(pr,null,it.next(),null,null);
PartitionResolver rr = pr.getPartitionResolver();
Object o = rr.getRoutingObject(eo);
Integer i = new Integer(o.hashCode()% totalNumBuckets);
assertEquals(bucketId, i);
} //getLogWriter().severe("Key " + key + " found in bucket " + b);
}
});
}
});
vm1.invoke(new CacheSerializableRunnable(
"verifyKeysonVM1") {
public void run2() throws CacheException {
// Calendar cal = Calendar.getInstance();
final PartitionedRegion pr = (PartitionedRegion)cache.getRegion(prName);
if (pr == null) {
fail(prName + " not created");
}
Iterator itr = listOfKeys2.iterator();
while(itr.hasNext()) {
assertTrue(searchForKey(pr,(Date)itr.next()));
}
pr.getDataStore().visitBuckets(new BucketVisitor() {
public void visit(Integer bucketId, Region r)
{
Set s = pr.getBucketKeys(bucketId.intValue());
Iterator it = s.iterator();
while(it.hasNext()){
EntryOperation eo = new EntryOperationImpl(pr,null,it.next(),null,null);
PartitionResolver rr = pr.getPartitionResolver();
Object o = rr.getRoutingObject(eo);
Integer i = new Integer(o.hashCode()% totalNumBuckets);
assertEquals(bucketId, i);
} //getLogWriter().severe("Key " + key + " found in bucket " + b);
}}
);
}
});
vm2.invoke(new CacheSerializableRunnable(
"verifyKeysonVM2") {
public void run2() throws CacheException {
// Calendar cal = Calendar.getInstance();
final PartitionedRegion pr = (PartitionedRegion)cache.getRegion(prName);
if (pr == null) {
fail(prName + " not created");
}
Iterator itr = listOfKeys3.iterator();
itr = listOfKeys3.iterator();
while (itr.hasNext()) {
assertTrue(searchForKey(pr, (Date)itr.next()));
}
pr.getDataStore().visitBuckets(new BucketVisitor() {
public void visit(Integer bucketId, Region r) {
Set s = pr.getBucketKeys(bucketId.intValue());
Iterator it = s.iterator();
while(it.hasNext()){
EntryOperation eo = new EntryOperationImpl(pr,null,it.next(),null,null);
PartitionResolver rr = pr.getPartitionResolver();
Object o = rr.getRoutingObject(eo);
Integer i = new Integer(o.hashCode()% totalNumBuckets);
//assertEquals(bucketId, bucketId);
assertEquals(bucketId, i);
} //getLogWriter().severe("Key " + key + " found in bucket " + b);
}
});
}
});
vm3.invoke(new CacheSerializableRunnable(
"verifyKeysonVM3") {
public void run2() throws CacheException {
// Calendar cal = Calendar.getInstance();
final PartitionedRegion pr = (PartitionedRegion)cache.getRegion(prName);
if (pr == null) {
fail(prName + " not created");
}
Iterator itr = listOfKeys4.iterator();
itr = listOfKeys4.iterator();
while (itr.hasNext()) {
assertTrue(searchForKey(pr, (Date)itr.next()));
}
assertEquals(pr.getDataStore(), null);
}
});
}
/**
* This is a PartitionedRegion test for Custom Partitioning . 4 VMs are used to create the PR with
* and without(Only Accessor) the DataStore.
*/
public void testPartitionedRegionOperationsCustomPartitioning()
throws Exception {
Host host = Host.getHost(0);
// create the VM(0 - 4)
vm0 = host.getVM(0);
vm1 = host.getVM(1);
vm2 = host.getVM(2);
vm3 = host.getVM(3);
final VM accessor = vm3;
//create cache in all vms
vm0.invoke(PRCustomPartitioningDUnitTest.class, "createCacheInVm");
vm1.invoke(PRCustomPartitioningDUnitTest.class, "createCacheInVm");
vm2.invoke(PRCustomPartitioningDUnitTest.class, "createCacheInVm");
accessor.invoke(PRCustomPartitioningDUnitTest.class, "createCacheInVm");
// Create PR;s in different VM's
vm0.invoke(createPrRegionWithPartitionResolver);
vm1.invoke(createPrRegionWithPartitionResolver);
vm2.invoke(createPrRegionWithPartitionResolver);
accessor.invoke(createPrRegionOnlyAccessorWithPartitionResolver);
partitionedRegionTest("/PR1");
/*
* destroy the Region.
*/
destroyTheRegion("/PR1");
}
public void destroyTheRegion(final String name) {
/*
* destroy the Region.
*/
vm0.invoke(new CacheSerializableRunnable("destroyRegionOp") {
public void run2() throws CacheException {
Region pr = cache.getRegion(name);
if (pr == null) {
fail(name + " not created");
}
pr.destroyRegion();
}
});
}
}
/**
* Example implementation of a Partition Resolver which uses part of the value
* for custom partitioning. This example is a simplification of what SQLFabric
* may do when the DDL specifies "partition by"
*/
class MonthBasedPartitionResolver implements PartitionResolver, Declarable2 {
private static MonthBasedPartitionResolver mbrResolver = null;
final static String id = "MonthBasedPartitionResolverid1";
private Properties properties;
private String resolverName;
public MonthBasedPartitionResolver ()
{ }
public static MonthBasedPartitionResolver getInstance() {
if(mbrResolver == null) {
mbrResolver = new MonthBasedPartitionResolver();
}
return mbrResolver;
}
public Serializable getRoutingObject(EntryOperation opDetails) {
Serializable routingObj = (Serializable)opDetails.getKey();
Calendar cal = Calendar.getInstance();
cal.setTime((Date)routingObj);
return new SerializableMonth(cal.get(Calendar.MONTH));
}
public void close() {
// Close internal state when Region closes
}
public void init(Properties props) {
this.properties = props;
}
// public Properties getProperties(){
//return this.properties;
// }
public boolean equals(Object obj) {
if (obj == this) {
return true;
}
if (obj instanceof MonthBasedPartitionResolver) {
// MonthBasedPartitionResolver epc = (MonthBasedPartitionResolver) obj;
return id.equals(MonthBasedPartitionResolver.id);
} else {
return false;
}
}
public String getName()
{
return this.resolverName;
}
/* (non-Javadoc)
* @see com.gemstone.gemfire.internal.cache.xmlcache.Declarable2#getConfig()
*/
public Properties getConfig() {
return this.properties;
}
}
class SerializableMonth implements DataSerializable{
private int month;
public SerializableMonth(int month) {
this.month = month;
}
public void fromData(DataInput in) throws IOException, ClassNotFoundException {
this.month = in.readInt();
}
public void toData(DataOutput out) throws IOException {
out.writeInt(this.month);
}
public int hashCode() {
if(this.month<4)
return 1;
else if(this.month >= 4 && this.month < 8)
return 2;
else
return 3;
}
}