blob: 921ffbe744ae215cab088e4c5dc9bfd6adce5e42 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal.cache.execute;
import java.util.ArrayList;
import java.util.Iterator;
import util.TestException;
import org.apache.geode.cache.CacheTransactionManager;
import org.apache.geode.cache.CommitConflictException;
import org.apache.geode.cache.EntryNotFoundException;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.TransactionDataNotColocatedException;
import org.apache.geode.cache.TransactionDataRebalancedException;
import org.apache.geode.cache.TransactionId;
import org.apache.geode.cache.execute.Function;
import org.apache.geode.cache.execute.FunctionContext;
import org.apache.geode.cache.execute.RegionFunctionContext;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.internal.Assert;
import org.apache.geode.internal.cache.BucketRegion;
import org.apache.geode.internal.cache.LocalRegion;
import org.apache.geode.internal.cache.PartitionedRegion;
import org.apache.geode.internal.cache.PartitionedRegionHelper;
import org.apache.geode.internal.cache.TXEntryState;
import org.apache.geode.internal.cache.TXManagerImpl;
import org.apache.geode.internal.cache.TXRegionState;
import org.apache.geode.internal.cache.TXStateProxy;
import org.apache.geode.internal.cache.execute.PRTransactionDUnitTest.TransactionListener2;
import org.apache.geode.internal.cache.execute.data.CustId;
import org.apache.geode.internal.cache.execute.data.Customer;
import org.apache.geode.internal.cache.execute.data.Order;
import org.apache.geode.internal.cache.execute.data.OrderId;
public class MyTransactionFunction implements Function {
@Override
public void execute(FunctionContext context) {
RegionFunctionContext ctx = (RegionFunctionContext) context;
verifyExecutionOnPrimary(ctx);
ArrayList args = (ArrayList) ctx.getArguments();
Integer testOperation = (Integer) args.get(0);
int op = testOperation.intValue();
switch (op) {
case PRTransactionDUnitTest.VERIFY_TX:
verifyTransactionExecution(ctx);
ctx.getDataSet().getCache().getLogger().info("verifyTransactionExecution Passed");
break;
case PRTransactionDUnitTest.VERIFY_ROLLBACK:
verifyTransactionRollback(ctx);
ctx.getDataSet().getCache().getLogger().info("verifyTransactionRollback Passed");
break;
case PRTransactionDUnitTest.VERIFY_DESTROY:
verifyDestroyOperation(ctx);
ctx.getDataSet().getCache().getLogger().info("verifyDestroy Passed");
break;
case PRTransactionDUnitTest.VERIFY_INVALIDATE:
verifyInvalidateOperation(ctx);
ctx.getDataSet().getCache().getLogger().info("verifyInvalidate Passed");
break;
case PRTransactionDUnitTest.VERIFY_NON_COLOCATION:
verifyNonCoLocatedOpsRejection(ctx);
ctx.getDataSet().getCache().getLogger().info("verifyNonCoLocatedOpsRejection Passed");
break;
case PRTransactionDUnitTest.VERIFY_LISTENER_CALLBACK:
verifyListenerCallback(ctx);
break;
case PRTransactionDUnitTest.VERIFY_TXSTATE_CONFLICT:
verifyTxStateAndConflicts(ctx);
break;
case PRTransactionDUnitTest.VERIFY_REP_READ:
verifyRepeatableRead(ctx);
break;
case PRTransactionDUnitTest.UPDATE_NON_COLOCATION:
updateNonColocation(ctx);
break;
}
context.getResultSender().lastResult(null);
}
@Override
public String getId() {
return "txFuntion";
}
private void verifyTransactionExecution(RegionFunctionContext ctx) {
Region custPR = ctx.getDataSet();
Region orderPR = custPR.getCache().getRegion(PRTransactionDUnitTest.OrderPartitionedRegionName);
CacheTransactionManager mgr = custPR.getCache().getCacheTransactionManager();
ArrayList args = (ArrayList) ctx.getArguments();
CustId custId = (CustId) args.get(1);
Customer newCus = (Customer) args.get(2);
OrderId orderId = (OrderId) args.get(3);
Order order = (Order) args.get(4);
mgr.begin();
custPR.put(custId, newCus);
Assert.assertTrue(custPR.containsKey(custId));
Assert.assertTrue(custPR.containsValueForKey(custId));
orderPR.put(orderId, order);
Assert.assertTrue(orderPR.containsKey(orderId));
Assert.assertTrue(orderPR.containsValueForKey(orderId));
mgr.commit();
Customer commitedCust = (Customer) custPR.get(custId);
Assert.assertTrue(newCus.equals(commitedCust),
"Expected Customer to be:" + newCus + " but was:" + commitedCust);
Order commitedOrder = (Order) orderPR.get(orderId);
Assert.assertTrue(order.equals(commitedOrder),
"Expected Order to be:" + order + " but was:" + commitedOrder);
// put a never before put key
OrderId newOrderId = new OrderId(4000, custId);
Order newOrder = new Order("NewOrder");
mgr.begin();
custPR.put(custId, newCus);
orderPR.put(newOrderId, newOrder);
mgr.commit();
commitedOrder = (Order) orderPR.get(newOrderId);
Assert.assertTrue(newOrder.equals(commitedOrder),
"Expected Order to be:" + order + " but was:" + commitedOrder);
}
private void verifyDestroyOperation(RegionFunctionContext ctx) {
Region custPR = ctx.getDataSet();
Region orderPR = custPR.getCache().getRegion(PRColocationDUnitTest.OrderPartitionedRegionName);
CacheTransactionManager mgr = custPR.getCache().getCacheTransactionManager();
ArrayList args = (ArrayList) ctx.getArguments();
CustId custId = (CustId) args.get(1);
Customer newCus = (Customer) args.get(2);
OrderId orderId = (OrderId) args.get(3);
Order order = (Order) args.get(4);
Customer oldCustomer = (Customer) custPR.get(custId);
Customer commitedCust = null;
// test destroy rollback
mgr.begin();
custPR.put(custId, newCus);
custPR.destroy(custId);
orderPR.put(orderId, order);
mgr.rollback();
commitedCust = (Customer) custPR.get(custId);
Assert.assertTrue(oldCustomer.equals(commitedCust),
"Expected customer to rollback to:" + oldCustomer + " but was:" + commitedCust);
// test destroy rollback on unmodified entry
mgr.begin();
custPR.destroy(custId);
orderPR.put(orderId, order);
mgr.rollback();
commitedCust = (Customer) custPR.get(custId);
Assert.assertTrue(oldCustomer.equals(commitedCust),
"Expected customer to rollback to:" + oldCustomer + " but was:" + commitedCust);
// test remote destroy
boolean caughtEx = false;
try {
mgr.begin();
Customer cust = new Customer("foo", "bar");
custPR.put(custId, cust);
custPR.destroy(custId);
custPR.putIfAbsent(custId, cust);
custPR.remove(custId, cust);
custPR.destroy(new CustId(1));
custPR.destroy(new CustId(3));
custPR.destroy(new CustId(7));
mgr.commit();
} catch (Exception e) {
mgr.rollback();
if (e instanceof TransactionDataNotColocatedException) {
caughtEx = true;
} else if (e instanceof TransactionDataRebalancedException) {
caughtEx = true;
} else if (e instanceof EntryNotFoundException
&& e.getMessage().matches("Entry not found for key.*1")) {
caughtEx = true;
} else {
throw new TestException(
"Expected to catch PR remote destroy exception, but caught:" + e.getMessage(), e);
}
}
if (!caughtEx) {
throw new TestException("An Expected exception was not thrown");
}
// test destroy on unmodified entry
mgr.begin();
custPR.destroy(custId);
orderPR.put(orderId, order);
mgr.commit();
commitedCust = (Customer) custPR.get(custId);
Assert.assertTrue(commitedCust == null, "Expected Customer to be null but was:" + commitedCust);
Order commitedOrder = (Order) orderPR.get(orderId);
Assert.assertTrue(order.equals(commitedOrder),
"Expected Order to be:" + order + " but was:" + commitedOrder);
// put the customer again for invalidate verification
mgr.begin();
custPR.putIfAbsent(custId, newCus);
mgr.commit();
// test destroy on new entry
// TODO: This throws EntryNotFound
OrderId newOrderId = new OrderId(5000, custId);
mgr.begin();
Order newOrder = new Order("New Order to be destroyed");
orderPR.put(newOrderId, newOrder);
orderPR.destroy(newOrderId);
mgr.commit();
Assert.assertTrue(orderPR.get(newOrderId) == null, "Did not expect orderId to be present");
// test ConcurrentMap operations
mgr.begin();
Order order1 = new Order("New Order to be replaced");
Order order2 = new Order("New Order to be destroyed");
orderPR.putIfAbsent(newOrderId, order1);
Assert.assertTrue(order1.equals(orderPR.replace(newOrderId, order2)));
mgr.commit(); // value is order2
Assert.assertTrue(order2.equals(orderPR.get(newOrderId)));
mgr.begin();
Assert.assertTrue(orderPR.replace(newOrderId, order2, order1));
mgr.commit(); // value is order1
Assert.assertTrue(orderPR.get(newOrderId).equals(order1));
mgr.begin();
// this should return false since the value is order1
Assert.assertTrue(!orderPR.remove(newOrderId, new Object()));
mgr.commit();
Assert.assertTrue(orderPR.get(newOrderId).equals(order1));
mgr.begin();
Assert.assertTrue(orderPR.remove(newOrderId, order1));
mgr.commit(); // gone now
Assert.assertTrue(orderPR.get(newOrderId) == null);
}
private void verifyInvalidateOperation(RegionFunctionContext ctx) {
Region custPR = ctx.getDataSet();
Region orderPR = custPR.getCache().getRegion(PRTransactionDUnitTest.OrderPartitionedRegionName);
CacheTransactionManager mgr = custPR.getCache().getCacheTransactionManager();
ArrayList args = (ArrayList) ctx.getArguments();
CustId custId = (CustId) args.get(1);
Customer newCus = (Customer) args.get(2);
OrderId orderId = (OrderId) args.get(3);
Order order = (Order) args.get(4);
Customer oldCustomer = (Customer) custPR.get(custId);
Customer commitedCust = null;
// test destroy rollback
mgr.begin();
custPR.put(custId, newCus);
custPR.invalidate(custId);
orderPR.put(orderId, order);
mgr.rollback();
commitedCust = (Customer) custPR.get(custId);
Assert.assertTrue(oldCustomer.equals(commitedCust),
"Expected customer to rollback to:" + oldCustomer + " but was:" + commitedCust);
// test destroy rollback on unmodified entry
mgr.begin();
custPR.invalidate(custId);
orderPR.put(orderId, order);
mgr.rollback();
commitedCust = (Customer) custPR.get(custId);
Assert.assertTrue(oldCustomer.equals(commitedCust),
"Expected customer to rollback to:" + oldCustomer + " but was:" + commitedCust);
// test remote destroy
boolean caughtEx = false;
try {
mgr.begin();
custPR.put(custId, new Customer("foo", "bar"));
custPR.invalidate(custId);
custPR.invalidate(new CustId(1));
custPR.invalidate(new CustId(3));
custPR.invalidate(new CustId(7));
mgr.commit();
} catch (Exception e) {
mgr.rollback();
if ((e instanceof TransactionDataNotColocatedException)
|| (e instanceof TransactionDataRebalancedException)) {
caughtEx = true;
} else if (e instanceof EntryNotFoundException
&& e.getMessage().matches("Entry not found for key.*1")) {
caughtEx = true;
} else {
throw new TestException(
"Expected to catch PR remote destroy exception, but caught:" + e.getMessage(), e);
}
}
if (!caughtEx) {
throw new TestException("An Expected exception was not thrown");
}
// test destroy on unmodified entry
mgr.begin();
custPR.invalidate(custId);
orderPR.put(orderId, order);
mgr.commit();
commitedCust = (Customer) custPR.get(custId);
Assert.assertTrue(commitedCust == null, "Expected Customer to be null but was:" + commitedCust);
Order commitedOrder = (Order) orderPR.get(orderId);
Assert.assertTrue(order.equals(commitedOrder),
"Expected Order to be:" + order + " but was:" + commitedOrder);
// test destroy on new entry
// TODO: This throws EntryNotFound
/*
* OrderId newOrderId = new OrderId(5000,custId); mgr.begin(); orderPR.put(newOrderId, new
* Order("New Order to be destroyed")); orderPR.invalidate(newOrderId); mgr.commit();
* Assert.assertTrue(orderPR.get(newOrderId)==null,"Did not expect orderId to be present");
*/
}
private void verifyTransactionRollback(RegionFunctionContext ctx) {
Region custPR = ctx.getDataSet();
Region orderPR = custPR.getCache().getRegion(PRTransactionDUnitTest.OrderPartitionedRegionName);
CacheTransactionManager mgr = custPR.getCache().getCacheTransactionManager();
ArrayList args = (ArrayList) ctx.getArguments();
CustId custId = (CustId) args.get(1);
Customer newCus = (Customer) args.get(2);
OrderId orderId = (OrderId) args.get(3);
Order order = (Order) args.get(4);
Customer oldCustomer = (Customer) custPR.get(custId);
Order oldOrder = (Order) orderPR.get(orderId);
mgr.begin();
custPR.put(custId, newCus);
Customer txCust = (Customer) custPR.get(custId);
orderPR.put(orderId, order);
Order txOrder = (Order) orderPR.get(orderId);
Assert.assertTrue(newCus.equals(txCust),
"Expected Customer to be:" + newCus + " but was:" + txCust);
Assert.assertTrue(txOrder.equals(order),
"Expected Order to be:" + order + " but was:" + txOrder);
mgr.rollback();
Customer commitedCust = (Customer) custPR.get(custId);
Assert.assertTrue(oldCustomer.equals(commitedCust),
"Expected Customer to be:" + oldCustomer + " but was:" + commitedCust);
Order commitedOrder = (Order) orderPR.get(orderId);
Assert.assertTrue(oldOrder.equals(commitedOrder),
"Expected Order to be:" + oldOrder + " but was:" + commitedOrder);
mgr.begin();
Assert.assertTrue(custPR.remove(custId, oldCustomer));
orderPR.replace(orderId, order);
mgr.rollback();
Assert.assertTrue(oldCustomer.equals(custPR.get(custId)));
Assert.assertTrue(oldOrder.equals(orderPR.get(orderId)));
mgr.begin();
Assert.assertTrue(custPR.replace(custId, oldCustomer, newCus));
orderPR.remove(orderId, oldOrder);
Assert.assertTrue(null == orderPR.putIfAbsent(orderId, order));
mgr.rollback();
Assert.assertTrue(oldCustomer.equals(custPR.get(custId)));
Assert.assertTrue(oldOrder.equals(orderPR.get(orderId)));
}
private void verifyNonCoLocatedOpsRejection(RegionFunctionContext ctx) {
Region custPR = ctx.getDataSet();
Region orderPR = custPR.getCache().getRegion(PRTransactionDUnitTest.OrderPartitionedRegionName);
CacheTransactionManager mgr = custPR.getCache().getCacheTransactionManager();
ArrayList args = (ArrayList) ctx.getArguments();
CustId custId = (CustId) args.get(1);
Customer newCus = (Customer) args.get(2);
OrderId orderId = (OrderId) args.get(3);
Order order = (Order) args.get(4);
mgr.begin();
try {
custPR.put(custId, newCus);
custPR.put(new CustId(4), "foo4");
custPR.put(new CustId(5), "foo5");
custPR.put(new CustId(6), "foo6");
orderPR.put(orderId, order);
Assert.assertTrue(false);
} finally {
mgr.rollback();
}
}
private void verifyListenerCallback(RegionFunctionContext ctx) {
verifyTransactionExecution(ctx);
TransactionListener2 listener =
(TransactionListener2) ctx.getDataSet().getAttributes().getCacheListeners()[0];
Assert.assertTrue(listener.getNumberOfPutCallbacks() == 2,
"Expected 2 put callback, but " + "got " + listener.getNumberOfPutCallbacks());
verifyDestroyOperation(ctx);
Assert.assertTrue(listener.getNumberOfDestroyCallbacks() == 1,
"Expected 1 destroy callbacks, but " + "got " + listener.getNumberOfDestroyCallbacks());
verifyInvalidateOperation(ctx);
Assert.assertTrue(listener.getNumberOfInvalidateCallbacks() == 1,
"Expected 1 invalidate callbacks, but " + "got "
+ listener.getNumberOfInvalidateCallbacks());
}
private void verifyExecutionOnPrimary(RegionFunctionContext ctx) {
PartitionedRegion pr = (PartitionedRegion) ctx.getDataSet();
ArrayList args = (ArrayList) ctx.getArguments();
CustId custId = (CustId) args.get(1);
int bucketId = PartitionedRegionHelper.getHashKey(pr, null, custId, null, null);
DistributedMember primary = pr.getRegionAdvisor().getPrimaryMemberForBucket(bucketId);
DistributedMember me = pr.getCache().getDistributedSystem().getDistributedMember();
Assert.assertTrue(me.equals(primary), "Function should have been executed on primary:" + primary
+ " but was executed on member:" + me);
}
private void verifyTxStateAndConflicts(RegionFunctionContext ctx) {
Region custPR = ctx.getDataSet();
Region orderPR = custPR.getCache().getRegion(PRTransactionDUnitTest.OrderPartitionedRegionName);
ArrayList args = (ArrayList) ctx.getArguments();
CustId custId = (CustId) args.get(1);
CacheTransactionManager mgr = custPR.getCache().getCacheTransactionManager();
OrderId vOrderId = new OrderId(3000, custId);
Order vOrder = new Order("vOrder");
TXManagerImpl mImp = (TXManagerImpl) mgr;
mImp.begin();
orderPR.put(vOrderId, vOrder);
TXStateProxy txState = mImp.pauseTransaction();
Iterator it = txState.getRegions().iterator();
Assert.assertTrue(txState.getRegions().size() == 1,
"Expected 1 region; " + "found:" + txState.getRegions().size());
LocalRegion lr = (LocalRegion) it.next();
Assert.assertTrue(lr instanceof BucketRegion);
TXRegionState txRegion = txState.readRegion(lr);
TXEntryState txEntry = txRegion.readEntry(txRegion.getEntryKeys().iterator().next());
mImp.unpauseTransaction(txState);
orderPR.put(vOrderId, new Order("foo"));
TransactionId txId = null;
txId = mImp.suspend();
// since both puts were on same key, verify that
// TxRegionState and TXEntryState are same
LocalRegion lr1 = (LocalRegion) txState.getRegions().iterator().next();
Assert.assertTrue(lr == lr1);
TXRegionState txRegion1 = txState.readRegion(lr);
TXEntryState txEntry1 = txRegion1.readEntry(txRegion.getEntryKeys().iterator().next());
Assert.assertTrue(txEntry == txEntry1);
// to check for conflicts, start a new transaction, operate on same key,
// commit the second and expect the first to fail
mImp.begin();
orderPR.put(vOrderId, new Order("foobar"));
mImp.commit();
// now begin the first
mImp.resume(txId);
boolean caughtException = false;
try {
mImp.commit();
} catch (CommitConflictException e) {
caughtException = true;
}
if (!caughtException) {
throw new TestException("An expected exception was not thrown");
}
}
private void verifyRepeatableRead(RegionFunctionContext ctx) {
Region custPR = ctx.getDataSet();
Region orderPR = custPR.getCache().getRegion(PRColocationDUnitTest.OrderPartitionedRegionName);
ArrayList args = (ArrayList) ctx.getArguments();
CustId custId = (CustId) args.get(1);
Customer cust = (Customer) args.get(2);
Assert.assertTrue(custPR.get(custId) == null);
CacheTransactionManager mgr = custPR.getCache().getCacheTransactionManager();
TXManagerImpl mImp = (TXManagerImpl) mgr;
mImp.begin();
custPR.put(custId, cust);
Assert.assertTrue(cust.equals(custPR.get(custId)));
TXStateProxy txState = mImp.pauseTransaction();
Assert.assertTrue(custPR.get(custId) == null);
mImp.unpauseTransaction(txState);
mImp.commit();
// change value
mImp.begin();
Customer oldCust = (Customer) custPR.get(custId);
Assert.assertTrue(oldCust.equals(cust));
txState = mImp.pauseTransaction();
Customer newCust = new Customer("fooNew", "barNew");
custPR.put(custId, newCust);
mImp.unpauseTransaction(txState);
Assert.assertTrue(oldCust.equals(custPR.get(custId)));
mImp.commit();
}
private void updateNonColocation(RegionFunctionContext ctx) {
Region custPR = ctx.getDataSet();
ArrayList args = (ArrayList) ctx.getArguments();
CustId custId = (CustId) args.get(1);
Customer newCus = (Customer) args.get(2);
custPR.put(custId, newCus);
Assert.assertTrue(custPR.containsKey(custId));
Assert.assertTrue(custPR.containsValueForKey(custId));
}
@Override
public boolean hasResult() {
return true;
}
@Override
public boolean optimizeForWrite() {
return true;
}
@Override
public boolean isHA() {
return false;
}
}