/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal.cache.partitioned;
import static org.junit.Assert.assertEquals;
import java.io.Serializable;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.apache.geode.cache.AttributesFactory;
import org.apache.geode.cache.CacheTransactionManager;
import org.apache.geode.cache.EntryOperation;
import org.apache.geode.cache.PartitionAttributesFactory;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.Scope;
import org.apache.geode.cache.execute.Function;
import org.apache.geode.cache.execute.FunctionContext;
import org.apache.geode.cache.execute.FunctionService;
import org.apache.geode.cache.execute.RegionFunctionContext;
import org.apache.geode.cache.partition.PartitionRegionHelper;
import org.apache.geode.internal.cache.BucketRegion;
import org.apache.geode.internal.cache.GemFireCacheImpl;
import org.apache.geode.internal.cache.LocalRegion.IteratorType;
import org.apache.geode.internal.cache.PartitionedRegion;
import org.apache.geode.internal.cache.execute.CustomerIDPartitionResolver;
import org.apache.geode.internal.cache.execute.data.CustId;
import org.apache.geode.internal.cache.execute.data.Customer;
import org.apache.geode.internal.cache.execute.data.Order;
import org.apache.geode.internal.cache.execute.data.OrderId;
import org.apache.geode.test.dunit.Host;
import org.apache.geode.test.dunit.SerializableCallable;
import org.apache.geode.test.dunit.VM;
import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase;
import org.apache.geode.test.junit.categories.RegionsTest;
/**
* Verifies that the {@link org.apache.geode.cache.PartitionResolver} is called only once on a node,
* and is not called during local iteration.
*/
@Category({RegionsTest.class})
public class PartitionResolverDUnitTest extends JUnit4CacheTestCase {
// Names of the two partitioned regions created by createRegion(); ORDER is colocated with
// CUSTOMER.
private static final String CUSTOMER = "custRegion";
private static final String ORDER = "orderRegion";
// DUnit host and the three VMs used by every test; all four are assigned in postSetUp().
Host host;
// VM whose regions are created with local max memory 0 (see createRegion(true, ...)).
VM accessor;
// VMs whose regions are created with local max memory 1; datastore2 also runs populateData().
VM datastore1;
VM datastore2;
public PartitionResolverDUnitTest() {
super();
}
/**
 * Looks up DUnit host 0 and binds its VMs 0, 1 and 2 to {@code accessor}, {@code datastore1} and
 * {@code datastore2} respectively.
 */
@Override
public final void postSetUp() throws Exception {
  this.host = Host.getHost(0);
  this.accessor = this.host.getVM(0);
  this.datastore1 = this.host.getVM(1);
  this.datastore2 = this.host.getVM(2);
}
/**
 * Clears the static resolver counter in this JVM and in each of the three test VMs.
 *
 * <p>{@code CountingResolver.count} is a static field, so every JVM has its own copy. Resetting
 * only the local copy would leave a remote count behind whenever a test fails before its final
 * verification; NOTE(review): this assumes DUnit reuses the same VMs for the next test method.
 */
@Override
public final void postTearDownCacheTestCase() throws Exception {
  CountingResolver.resetResolverCount();
  for (VM vm : new VM[] {accessor, datastore1, datastore2}) {
    // The VM fields are still null if set-up never reached postSetUp().
    if (vm == null) {
      continue;
    }
    vm.invoke(new SerializableCallable() {
      @Override
      public Object call() throws Exception {
        CountingResolver.resetResolverCount();
        return null;
      }
    });
  }
}
/**
 * Creates the customer region and the order region (colocated with the customer region) in this
 * member's cache. Both regions are partitioned into 4 buckets and each is given its own
 * {@link CountingResolver}.
 *
 * @param isAccessor when {@code true} the regions are created with local max memory 0, otherwise
 *        with local max memory 1
 * @param redundantCopies number of redundant copies configured for both regions
 */
void createRegion(boolean isAccessor, int redundantCopies) {
  final int localMaxMemory = isAccessor ? 0 : 1;

  // One factory serves both regions; only its partition attributes are swapped in between.
  // No scope is configured on it.
  AttributesFactory af = new AttributesFactory();

  af.setPartitionAttributes(new PartitionAttributesFactory<CustId, Customer>()
      .setTotalNumBuckets(4).setLocalMaxMemory(localMaxMemory)
      .setPartitionResolver(new CountingResolver("CountingResolverCust"))
      .setRedundantCopies(redundantCopies).create());
  getCache().createRegion(CUSTOMER, af.create());

  af.setPartitionAttributes(new PartitionAttributesFactory<OrderId, Order>().setTotalNumBuckets(4)
      .setLocalMaxMemory(localMaxMemory)
      .setPartitionResolver(new CountingResolver("CountingResolverOrder"))
      .setRedundantCopies(redundantCopies).setColocatedWith(CUSTOMER).create());
  getCache().createRegion(ORDER, af.create());
}
/**
 * Puts five customers (ids 0 to 4) into the customer region and one order per customer into the
 * order region of this member's cache, i.e. ten puts in total.
 */
void populateData() {
  final Region customers = getCache().getRegion(CUSTOMER);
  final Region orders = getCache().getRegion(ORDER);
  int id = 0;
  while (id < 5) {
    final CustId customerKey = new CustId(id);
    final Customer customerValue = new Customer("customer" + id, "address" + id);
    final OrderId orderKey = new OrderId(id, customerKey);
    final Order orderValue = new Order("order" + id);
    customers.put(customerKey, customerValue);
    orders.put(orderKey, orderValue);
    id++;
  }
}
/**
 * A {@link CustomerIDPartitionResolver} that additionally counts, in a static counter, how many
 * times {@link #getRoutingObject(EntryOperation)} has been invoked in the current JVM.
 */
static class CountingResolver extends CustomerIDPartitionResolver {
  /** Number of {@code getRoutingObject} calls in this JVM since the last reset. */
  static AtomicInteger count = new AtomicInteger();

  public CountingResolver(String resolverID) {
    super(resolverID);
  }

  /**
   * Increments the counter, logs the call at fine level (with a fresh {@link Throwable} so the
   * log shows the calling stack) and then delegates to the superclass.
   */
  @Override
  public Serializable getRoutingObject(EntryOperation opDetails) {
    count.incrementAndGet();
    final Region region = opDetails.getRegion();
    final GemFireCacheImpl cache = (GemFireCacheImpl) region.getCache();
    final String message = "Resolver called key:" + opDetails.getKey() + " Region "
        + region.getName() + " id:" + cache.getMyId();
    cache.getLogger().fine(message, new Throwable());
    return super.getRoutingObject(opDetails);
  }

  /** Sets the counter of this JVM back to zero. */
  public static void resetResolverCount() {
    count.set(0);
  }
}
/**
 * Creates both regions in the accessor and in the two datastores, loads the initial data from
 * datastore2 and then verifies (and thereby resets) the resolver counter in every VM.
 *
 * <p>Expected counts after the load: 0 on the accessor; on datastore1 the value returned by
 * {@link #getNumberOfKeysOwnedByVM(VM)}; on datastore2, which performs all puts, 10 (five
 * customers plus five orders).
 *
 * @param redundantCopies number of redundant copies for both regions
 */
private void initAccessorAndDataStore(final int redundantCopies) {
  createRegionsInVM(accessor, true, redundantCopies, false);
  createRegionsInVM(datastore1, false, redundantCopies, false);
  createRegionsInVM(datastore2, false, redundantCopies, true);

  verifyResolverCountInVM(accessor, 0);
  final int expectedOnDatastore1 = getNumberOfKeysOwnedByVM(datastore1);
  verifyResolverCountInVM(datastore1, expectedOnDatastore1);
  verifyResolverCountInVM(datastore2, 10);
  getCache().getLogger().fine("Reset resolver count");
}

/** Runs createRegion in the given VM and, when {@code populate} is set, populateData too. */
private void createRegionsInVM(final VM vm, final boolean isAccessor, final int redundantCopies,
    final boolean populate) {
  vm.invoke(new SerializableCallable() {
    @Override
    public Object call() throws Exception {
      createRegion(isAccessor, redundantCopies);
      if (populate) {
        populateData();
      }
      return null;
    }
  });
}
/**
 * Sums the sizes of all local primary buckets of the customer region in the given VM and returns
 * twice that sum.
 *
 * @param datastore12 the datastore VM to inspect
 * @return two times the number of customer entries held in primary buckets of that VM
 */
private int getNumberOfKeysOwnedByVM(VM datastore12) {
  final SerializableCallable countPrimaryCustomerEntries = new SerializableCallable() {
    @Override
    public Object call() throws Exception {
      final PartitionedRegion customers =
          (PartitionedRegion) getGemfireCache().getRegion(CUSTOMER);
      final Iterator<BucketRegion> primaries =
          customers.getDataStore().getAllLocalPrimaryBucketRegions().iterator();
      int total = 0;
      while (primaries.hasNext()) {
        total += primaries.next().size();
      }
      return total;
    }
  };
  final Integer customerEntries = (Integer) datastore12.invoke(countPrimaryCustomerEntries);
  // Only the customer region is counted; the factor 2 presumably accounts for the colocated
  // order region, which populateData() fills with one order per customer.
  return customerEntries * 2;
}
/**
 * Asserts, inside the given VM, that the resolver counter of that VM equals the expected value,
 * and resets the counter afterwards.
 *
 * <p>The reset sits in a {@code finally} block so that a failed assertion does not leave a stale
 * count in that VM's static counter.
 *
 * @param vm the VM whose counter is checked
 * @param i the expected number of resolver invocations since the last reset
 */
private void verifyResolverCountInVM(final VM vm, final int i) {
  vm.invoke(new SerializableCallable() {
    @Override
    public Object call() throws Exception {
      try {
        assertEquals("My id: " + getGemfireCache().getMyId(), i, CountingResolver.count.get());
      } finally {
        CountingResolver.resetResolverCount();
      }
      return null;
    }
  });
}
/** Iterating the key sets from the accessor is expected to invoke the resolver in no VM. */
@Test
public void testKeysInIterationOnAccessor() {
  resolverInIteration(IteratorType.KEYS, accessor);
  for (VM member : new VM[] {accessor, datastore1, datastore2}) {
    verifyResolverCountInVM(member, 0);
  }
}
/**
 * Iterating the values from the accessor is expected to leave the accessor's counter at 0 and
 * to produce, on each datastore, the count given by getNumberOfKeysOwnedByVM for that datastore.
 */
@Test
public void testValuesInIterationOnAccessor() {
  resolverInIteration(IteratorType.VALUES, accessor);
  verifyResolverCountInVM(accessor, 0);
  for (VM store : new VM[] {datastore1, datastore2}) {
    verifyResolverCountInVM(store, getNumberOfKeysOwnedByVM(store));
  }
}
/**
 * Iterating the entries from the accessor is expected to leave the accessor's counter at 0 and
 * to produce, on each datastore, the count given by getNumberOfKeysOwnedByVM for that datastore.
 */
@Test
public void testEntriesInIterationOnAccessor() {
  resolverInIteration(IteratorType.ENTRIES, accessor);
  verifyResolverCountInVM(accessor, 0);
  final int expectedOnDatastore1 = getNumberOfKeysOwnedByVM(datastore1);
  verifyResolverCountInVM(datastore1, expectedOnDatastore1);
  final int expectedOnDatastore2 = getNumberOfKeysOwnedByVM(datastore2);
  verifyResolverCountInVM(datastore2, expectedOnDatastore2);
}
/** Iterating the key sets from datastore1 is expected to invoke the resolver in no VM. */
@Test
public void testKeysInIterationOnDataStore() {
  resolverInIteration(IteratorType.KEYS, datastore1);
  for (VM member : new VM[] {accessor, datastore1, datastore2}) {
    verifyResolverCountInVM(member, 0);
  }
}
/**
 * Iterating the values from datastore1 is expected to leave the counters of the accessor and of
 * datastore1 at 0, while datastore2 reports the count given by getNumberOfKeysOwnedByVM.
 */
@Test
public void testValuesInIterationOnDataStore() {
  resolverInIteration(IteratorType.VALUES, datastore1);
  for (VM untouched : new VM[] {accessor, datastore1}) {
    verifyResolverCountInVM(untouched, 0);
  }
  final int expectedOnDatastore2 = getNumberOfKeysOwnedByVM(datastore2);
  verifyResolverCountInVM(datastore2, expectedOnDatastore2);
}
/**
 * Iterating the entries from datastore1 is expected to leave the counters of the accessor and of
 * datastore1 at 0, while datastore2 reports the count given by getNumberOfKeysOwnedByVM.
 */
@Test
public void testEntriesInIterationOnDataStore() {
  resolverInIteration(IteratorType.ENTRIES, datastore1);
  for (VM untouched : new VM[] {accessor, datastore1}) {
    verifyResolverCountInVM(untouched, 0);
  }
  final int expectedOnDatastore2 = getNumberOfKeysOwnedByVM(datastore2);
  verifyResolverCountInVM(datastore2, expectedOnDatastore2);
}
/**
 * Sets up the regions and initial data, then fully iterates the customer region and the order
 * region inside the given VM using the requested kind of iterator.
 *
 * @param type which view of the regions to iterate: keys, values or entries
 * @param vm the VM in which the iteration is performed
 */
private void resolverInIteration(final IteratorType type, final VM vm) {
  initAccessorAndDataStore(0);
  SerializableCallable doIteration = new SerializableCallable() {
    @Override
    public Object call() throws Exception {
      Region custRegion = getGemfireCache().getRegion(CUSTOMER);
      Region orderRegion = getGemfireCache().getRegion(ORDER);
      final Iterator custIterator;
      final Iterator orderIterator;
      switch (type) {
        case ENTRIES:
          custIterator = custRegion.entrySet().iterator();
          orderIterator = orderRegion.entrySet().iterator();
          break;
        case KEYS:
          custIterator = custRegion.keySet().iterator();
          orderIterator = orderRegion.keySet().iterator();
          break;
        case VALUES:
          custIterator = custRegion.values().iterator();
          orderIterator = orderRegion.values().iterator();
          break;
        default:
          // Fail with a clear message instead of dereferencing unassigned iterators below.
          throw new IllegalArgumentException("Unsupported iterator type: " + type);
      }
      // The elements themselves are irrelevant; only the side effect of walking them matters.
      while (custIterator.hasNext()) {
        custIterator.next();
      }
      while (orderIterator.hasNext()) {
        orderIterator.next();
      }
      return null;
    }
  };
  vm.invoke(doIteration);
}
/**
 * Iterating the key sets inside IteratorFunction is expected to invoke the resolver in no VM.
 */
@Test
public void testKeysIterationInFunctionExection() {
  doIterationInFunction(IteratorType.KEYS);
  for (VM member : new VM[] {accessor, datastore1, datastore2}) {
    verifyResolverCountInVM(member, 0);
  }
}
/**
 * Iterating the values inside IteratorFunction is expected to invoke the resolver in no VM.
 */
@Test
public void testValuesIterationInFunctionExection() {
  doIterationInFunction(IteratorType.VALUES);
  for (VM member : new VM[] {accessor, datastore1, datastore2}) {
    verifyResolverCountInVM(member, 0);
  }
}
/**
 * Iterating the entries inside IteratorFunction is expected to invoke the resolver in no VM.
 */
@Test
public void testEntriesIterationInFunctionExection() {
  doIterationInFunction(IteratorType.ENTRIES);
  for (VM member : new VM[] {accessor, datastore1, datastore2}) {
    verifyResolverCountInVM(member, 0);
  }
}
/**
 * Sets up the regions and initial data, registers {@link IteratorFunction} in all three VMs and
 * then executes it from the accessor on the customer region, waiting for its result.
 *
 * @param type the iterator kind handed to the function as its argument
 */
private void doIterationInFunction(final IteratorType type) {
  initAccessorAndDataStore(0);

  final SerializableCallable register = new SerializableCallable() {
    @Override
    public Object call() throws Exception {
      FunctionService.registerFunction(new IteratorFunction());
      return null;
    }
  };
  for (VM member : new VM[] {accessor, datastore1, datastore2}) {
    member.invoke(register);
  }

  final SerializableCallable runFunction = new SerializableCallable() {
    @Override
    public Object call() throws Exception {
      // getResult() is called so the invocation does not return before the function has
      // delivered its result.
      FunctionService.onRegion(getGemfireCache().getRegion(CUSTOMER)).setArguments(type)
          .execute(IteratorFunction.id).getResult();
      return null;
    }
  };
  accessor.invoke(runFunction);
}
/**
 * Function that fully iterates the local data of the customer region (as given by the function
 * context) and the local data of the order region, using the iterator kind passed as the
 * function argument, and then sends {@code Boolean.TRUE} as its only result.
 */
static class IteratorFunction implements Function {
  private static final String id = "IteratorFunction";

  /**
   * @param context expected to be a {@link RegionFunctionContext} whose argument is an
   *        {@link IteratorType}
   * @throws IllegalArgumentException if the argument is an iterator type this function does not
   *         handle
   */
  @Override
  public void execute(FunctionContext context) {
    Region custRegion =
        PartitionRegionHelper.getLocalDataForContext((RegionFunctionContext) context);
    Region orderRegion =
        PartitionRegionHelper.getLocalData(custRegion.getCache().getRegion(ORDER));
    IteratorType type = (IteratorType) context.getArguments();
    final Iterator custIterator;
    final Iterator orderIterator;
    switch (type) {
      case ENTRIES:
        custIterator = custRegion.entrySet().iterator();
        orderIterator = orderRegion.entrySet().iterator();
        break;
      case KEYS:
        custIterator = custRegion.keySet().iterator();
        orderIterator = orderRegion.keySet().iterator();
        break;
      case VALUES:
        custIterator = custRegion.values().iterator();
        orderIterator = orderRegion.values().iterator();
        break;
      default:
        // Fail with a clear message instead of dereferencing unassigned iterators below.
        throw new IllegalArgumentException("Unsupported iterator type: " + type);
    }
    drain(custIterator);
    drain(orderIterator);
    context.getResultSender().lastResult(Boolean.TRUE);
  }

  /** Walks the iterator to its end, discarding every element. */
  private static void drain(Iterator iterator) {
    while (iterator.hasNext()) {
      iterator.next();
    }
  }

  @Override
  public String getId() {
    return id;
  }

  @Override
  public boolean hasResult() {
    return true;
  }

  @Override
  public boolean optimizeForWrite() {
    return false;
  }

  @Override
  public boolean isHA() {
    return false;
  }
}
/**
 * Runs the seven region operations of doOps without transactions. The accessor is expected to
 * call the resolver 7 times; each datastore is expected to report the value computed by
 * getResolverCountForVM.
 */
@Test
public void testOps() {
  initAccessorAndDataStore(0);
  doOps(false);
  verifyResolverCountInVM(accessor, 7);
  for (VM store : new VM[] {datastore1, datastore2}) {
    verifyResolverCountInVM(store, getResolverCountForVM(store));
  }
}
/**
 * Transactional variant of testOps: every operation of doOps runs in its own transaction. The
 * expected counts are the same as in testOps. Currently ignored.
 */
@Ignore("TODO")
@Test
public void testTxOps() {
  initAccessorAndDataStore(0);
  doOps(true);
  verifyResolverCountInVM(accessor, 7);
  for (VM store : new VM[] {datastore1, datastore2}) {
    verifyResolverCountInVM(store, getResolverCountForVM(store));
  }
}
/**
 * Performs seven operations on the customer region from the accessor, all on the key
 * {@code CustId(6)}: put, invalidate, put, destroy, put, containsKey and containsValueForKey.
 *
 * @param isTx when {@code true} each operation is wrapped in its own begin/commit pair
 */
private void doOps(final boolean isTx) {
  accessor.invoke(new SerializableCallable() {
    @Override
    public Object call() throws Exception {
      final Region customers = getGemfireCache().getRegion(CUSTOMER);
      final CacheTransactionManager txManager = getGemfireCache().getCacheTransactionManager();
      final CustId key = new CustId(6);
      final Customer value = new Customer("customer6", "address6");
      getGemfireCache().getLogger().fine("SWAP:Begin");

      beginIfTx(isTx, txManager);
      customers.put(key, value);
      commitIfTx(isTx, txManager);

      beginIfTx(isTx, txManager);
      customers.invalidate(key);
      commitIfTx(isTx, txManager);

      beginIfTx(isTx, txManager);
      customers.put(key, value);
      commitIfTx(isTx, txManager);

      beginIfTx(isTx, txManager);
      customers.destroy(key);
      commitIfTx(isTx, txManager);

      beginIfTx(isTx, txManager);
      customers.put(key, value);
      commitIfTx(isTx, txManager);

      beginIfTx(isTx, txManager);
      customers.containsKey(key);
      commitIfTx(isTx, txManager);

      beginIfTx(isTx, txManager);
      customers.containsValueForKey(key);
      commitIfTx(isTx, txManager);
      return null;
    }
  });
}

/** Begins a transaction on the given manager, but only in transactional mode. */
private static void beginIfTx(boolean isTx, CacheTransactionManager txManager) {
  if (isTx) {
    txManager.begin();
  }
}

/** Commits the current transaction of the given manager, but only in transactional mode. */
private static void commitIfTx(boolean isTx, CacheTransactionManager txManager) {
  if (isTx) {
    txManager.commit();
  }
}
/**
 * Computes the resolver count expected in a datastore after doOps: 7 if the local data of the
 * customer region in that VM contains the key {@code CustId(6)}, otherwise 0.
 *
 * @param datastore12 the datastore VM to inspect
 * @return 7 when the test key is held locally by that VM, else 0
 */
private int getResolverCountForVM(final VM datastore12) {
  final SerializableCallable findTestKey = new SerializableCallable() {
    @Override
    public Object call() throws Exception {
      final Region localCustomers =
          PartitionRegionHelper.getLocalData(getGemfireCache().getRegion(CUSTOMER));
      // The local key set is scanned by hand rather than asked via containsKey; presumably
      // so that this check cannot itself affect the resolver count - TODO confirm.
      for (Object localKey : localCustomers.keySet()) {
        if (localKey.equals(new CustId(6))) {
          return 1;
        }
      }
      return 0;
    }
  };
  final Integer holdsTestKey = (Integer) datastore12.invoke(findTestKey);
  // doOps performs seven operations on the test key.
  return holdsTestKey * 7;
}
}