| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more contributor license |
| * agreements. See the NOTICE file distributed with this work for additional information regarding |
| * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance with the License. You may obtain a |
| * copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software distributed under the License |
| * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
| * or implied. See the License for the specific language governing permissions and limitations under |
| * the License. |
| */ |
| package org.apache.geode.internal.cache.execute; |
| |
| import static org.apache.geode.cache.Region.SEPARATOR; |
| import static org.assertj.core.api.Assertions.catchThrowable; |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertNotNull; |
| import static org.junit.Assert.assertTrue; |
| import static org.junit.Assert.fail; |
| |
| import java.util.ArrayList; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Set; |
| |
| import org.apache.logging.log4j.Logger; |
| import org.assertj.core.api.Assertions; |
| import org.junit.Ignore; |
| import org.junit.Test; |
| import util.TestException; |
| |
| import org.apache.geode.cache.CacheTransactionManager; |
| import org.apache.geode.cache.EntryEvent; |
| import org.apache.geode.cache.Region; |
| import org.apache.geode.cache.Region.Entry; |
| import org.apache.geode.cache.TransactionDataNotColocatedException; |
| import org.apache.geode.cache.TransactionDataRebalancedException; |
| import org.apache.geode.cache.TransactionException; |
| import org.apache.geode.cache.TransactionId; |
| import org.apache.geode.cache.execute.Execution; |
| import org.apache.geode.cache.execute.Function; |
| import org.apache.geode.cache.execute.FunctionException; |
| import org.apache.geode.cache.execute.FunctionService; |
| import org.apache.geode.cache.partition.PartitionRegionHelper; |
| import org.apache.geode.cache.util.CacheListenerAdapter; |
| import org.apache.geode.distributed.DistributedMember; |
| import org.apache.geode.distributed.internal.membership.InternalDistributedMember; |
| import org.apache.geode.internal.NanoTimer; |
| import org.apache.geode.internal.cache.ForceReattemptException; |
| import org.apache.geode.internal.cache.PartitionedRegion; |
| import org.apache.geode.internal.cache.TXManagerImpl; |
| import org.apache.geode.internal.cache.execute.data.CustId; |
| import org.apache.geode.internal.cache.execute.data.Customer; |
| import org.apache.geode.internal.cache.execute.data.Order; |
| import org.apache.geode.internal.cache.execute.data.OrderId; |
| import org.apache.geode.internal.cache.execute.data.Shipment; |
| import org.apache.geode.internal.cache.execute.data.ShipmentId; |
| import org.apache.geode.internal.cache.partitioned.PRLocallyDestroyedException; |
| import org.apache.geode.internal.cache.partitioned.PartitionedRegionRebalanceOp; |
| import org.apache.geode.internal.cache.partitioned.rebalance.BucketOperatorImpl; |
| import org.apache.geode.internal.cache.partitioned.rebalance.ExplicitMoveDirector; |
| import org.apache.geode.logging.internal.log4j.api.LogService; |
| import org.apache.geode.test.dunit.Assert; |
| import org.apache.geode.test.dunit.Invoke; |
| import org.apache.geode.test.dunit.LogWriterUtils; |
| import org.apache.geode.test.dunit.SerializableCallable; |
| import org.apache.geode.test.dunit.SerializableRunnable; |
| |
| /** |
| * Test for co-located PR transactions. the test creates two data hosts and uses function execution |
| * to execute transactions. |
| */ |
| public class PRTransactionDUnitTest extends PRColocationDUnitTest { |
  private static final Logger logger = LogService.getLogger();

  // Operation codes passed as the first element of the argument list to MyTransactionFunction;
  // they select which transactional scenario the function verifies on the data host.
  // (No constant with value 3 is defined here -- presumably reserved or defined elsewhere;
  // TODO confirm against MyTransactionFunction.)
  static final int VERIFY_TX = 0;

  static final int VERIFY_ROLLBACK = 1;

  static final int VERIFY_NON_COLOCATION = 2;

  static final int VERIFY_DESTROY = 4;

  static final int VERIFY_LISTENER_CALLBACK = 5;

  static final int VERIFY_INVALIDATE = 6;

  static final int VERIFY_TXSTATE_CONFLICT = 7;

  static final int VERIFY_REP_READ = 8;

  static final int UPDATE_NON_COLOCATION = 9;

  // Iteration counts / sizing used by the performance-oriented tests in this class.
  private final int totalIterations = 50;

  private final int warmupIterations = 10;

  private final int perfOrderShipmentPairs = 1;
| |
  /** Explicit no-arg constructor; equivalent to the implicit default constructor. */
  public PRTransactionDUnitTest() {
    super();
  }
| |
  // Colocated PR transactions with populated data, at redundancy 0/1/2.
  @Test
  public void testBasicPRTransactionRedundancy0() {
    basicPRTXInFunction(0, true);
  }

  @Test
  public void testBasicPRTransactionRedundancy1() {
    basicPRTXInFunction(1, true);
  }

  @Test
  public void testBasicPRTransactionRedundancy2() {
    basicPRTXInFunction(2, true);
  }

  // Same scenarios on empty colocated PRs, at redundancy 0/1/2.
  @Test
  public void testBasicPRTransactionNoDataRedundancy0() {
    basicPRTXInFunction(0, false);
  }

  @Test
  public void testBasicPRTransactionNoDataRedundancy1() {
    basicPRTXInFunction(1, false);
  }

  @Test
  public void testBasicPRTransactionNoDataRedundancy2() {
    basicPRTXInFunction(2, false);
  }

  // Two functions in one transaction targeting non-colocated buckets; the second must fail.
  @Test
  public void testBasicPRTransactionNonColocatedFunction0() {
    basicPRTXInNonColocatedFunction(0);
  }
| |
  /**
   * Test two non colocated functions in a transaction. This method invokes
   * {@link MyTransactionFunction} and tells it what to test, using different arguments.
   *
   * @param redundantBuckets redundant buckets for colocated PRs
   */
  private void basicPRTXInNonColocatedFunction(int redundantBuckets) {
    // Two data stores with two buckets total, so the two customers land on different members.
    setupColocatedRegions(redundantBuckets);

    dataStore1.invoke(this::registerFunction);
    dataStore2.invoke(this::registerFunction);

    // Runs two UPDATE_NON_COLOCATION executions in one transaction; the second must fail.
    dataStore1.invoke(this::runTXFunctions);
  }
| |
| private void registerFunction() { |
| logger.info("register Fn"); |
| Function txFunction = new MyTransactionFunction(); |
| FunctionService.registerFunction(txFunction); |
| } |
| |
| private void runFunction(final Region pr, int cust, boolean isFirstFunc) { |
| CustId custId = new CustId(cust); |
| Customer newCus = new Customer("foo", "bar"); |
| ArrayList args = new ArrayList(); |
| Execution execution = FunctionService.onRegion(pr); |
| Set filter = new HashSet(); |
| |
| args.add(UPDATE_NON_COLOCATION); |
| logger.info("UPDATE_NON_COLOCATION"); |
| args.add(custId); |
| args.add(newCus); |
| filter.add(custId); |
| try { |
| execution.withFilter(filter).setArguments(args).execute(new MyTransactionFunction().getId()) |
| .getResult(); |
| assertTrue("Expected exception was not thrown", isFirstFunc); |
| } catch (Exception exp) { |
| if (!isFirstFunc) { |
| if (exp instanceof TransactionException && exp.getMessage() |
| .startsWith("Function execution is not colocated with transaction.")) { |
| } else { |
| logger.info("Expected to catch TransactionException but caught exception " + exp, exp); |
| Assert.fail("Expected to catch TransactionException but caught exception ", exp); |
| } |
| } else { |
| logger.info("Caught unexpected exception", exp); |
| Assert.fail("Unexpected exception was thrown", exp); |
| } |
| } |
| } |
| |
| private void runTXFunctions() { |
| PartitionedRegion pr = (PartitionedRegion) basicGetCache() |
| .getRegion(SEPARATOR + CustomerPartitionedRegionName); |
| CacheTransactionManager mgr = pr.getCache().getCacheTransactionManager(); |
| mgr.begin(); |
| runFunction(pr, 1, true); |
| runFunction(pr, 2, false); |
| mgr.commit(); |
| } |
| |
| /** |
| * Test all the basic functionality of colocated transactions. This method invokes |
| * {@link MyTransactionFunction} and tells it what to test, using different arguments. |
| * |
| * @param redundantBuckets redundant buckets for colocated PRs |
| * @param populateData if false tests are carried out on empty colocated PRs |
| */ |
| private void basicPRTXInFunction(int redundantBuckets, boolean populateData) { |
| |
| if (populateData) { |
| createPopulateAndVerifyCoLocatedPRs(redundantBuckets); |
| } else { |
| createColocatedPRs(redundantBuckets); |
| } |
| |
| dataStore1.invoke(this::registerFunction); |
| dataStore2.invoke(this::registerFunction); |
| dataStore3.invoke(this::registerFunction); |
| |
| accessor.invoke(new SerializableCallable("run function") { |
| @Override |
| public Object call() { |
| PartitionedRegion pr = (PartitionedRegion) basicGetCache() |
| .getRegion(SEPARATOR + CustomerPartitionedRegionName); |
| PartitionedRegion orderpr = (PartitionedRegion) basicGetCache() |
| .getRegion(SEPARATOR + OrderPartitionedRegionName); |
| CustId custId = new CustId(2); |
| Customer newCus = new Customer("foo", "bar"); |
| Order order = new Order("fooOrder"); |
| OrderId orderId = new OrderId(22, custId); |
| ArrayList args = new ArrayList(); |
| Function txFunction = new MyTransactionFunction(); |
| FunctionService.registerFunction(txFunction); |
| Execution execution = FunctionService.onRegion(pr); |
| Set filter = new HashSet(); |
| // test transaction non-coLocated operations |
| filter.clear(); |
| args.clear(); |
| args.add(VERIFY_NON_COLOCATION); |
| LogWriterUtils.getLogWriter().info("VERIFY_NON_COLOCATION"); |
| args.add(custId); |
| args.add(newCus); |
| args.add(orderId); |
| args.add(order); |
| filter.add(custId); |
| try { |
| execution.withFilter(filter).setArguments(args).execute(txFunction.getId()).getResult(); |
| fail("Expected exception was not thrown"); |
| } catch (FunctionException fe) { |
| LogWriterUtils.getLogWriter().info("Caught Expected exception"); |
| if (fe.getCause() instanceof TransactionDataNotColocatedException) { |
| } else { |
| throw new TestException( |
| "Expected to catch FunctionException with cause TransactionDataNotColocatedException" |
| + " but cause is " + fe.getCause(), |
| fe.getCause()); |
| } |
| } |
| // verify that the transaction modifications are applied |
| args.set(0, VERIFY_TX); |
| LogWriterUtils.getLogWriter().info("VERIFY_TX"); |
| orderpr.put(orderId, order); |
| assertNotNull(orderpr.get(orderId)); |
| execution.withFilter(filter).setArguments(args).execute(txFunction.getId()).getResult(); |
| assertTrue("Unexpected customer value after commit", newCus.equals(pr.get(custId))); |
| Order commitedOrder = (Order) orderpr.get(orderId); |
| assertTrue( |
| "Unexpected order value after commit. Expected:" + order + " Found:" + commitedOrder, |
| order.equals(commitedOrder)); |
| // verify conflict detection |
| args.set(0, VERIFY_TXSTATE_CONFLICT); |
| execution.withFilter(filter).setArguments(args).execute(txFunction.getId()).getResult(); |
| // verify that the transaction is rolled back |
| args.set(0, VERIFY_ROLLBACK); |
| LogWriterUtils.getLogWriter().info("VERIFY_ROLLBACK"); |
| execution.withFilter(filter).setArguments(args).execute(txFunction.getId()).getResult(); |
| // verify destroy |
| args.set(0, VERIFY_DESTROY); |
| LogWriterUtils.getLogWriter().info("VERIFY_DESTROY"); |
| execution.withFilter(filter).setArguments(args).execute(txFunction.getId()).getResult(); |
| // verify invalidate |
| args.set(0, VERIFY_INVALIDATE); |
| LogWriterUtils.getLogWriter().info("VERIFY_INVALIDATE"); |
| execution.withFilter(filter).setArguments(args).execute(txFunction.getId()).getResult(); |
| return Boolean.TRUE; |
| } |
| }); |
| } |
| |
  /**
   * Creates the colocated Customer -> Order -> Shipment partitioned regions in all VMs with the
   * given redundancy, then validates data-store/profile counts from the accessor.
   */
  private void createColocatedPRs(int redundantBuckets) {
    createCacheInAllVms();

    // Superclass fields consumed later by setAttributes()/createPartitionedRegion().
    redundancy = redundantBuckets;
    localMaxmemory = 50;
    totalNumBuckets = 11;

    // Create Customer PartitionedRegion in All VMs
    createPRWithCoLocation(CustomerPartitionedRegionName, null);

    // Create Order PartitionedRegion in All VMs
    createPRWithCoLocation(OrderPartitionedRegionName, CustomerPartitionedRegionName);

    // Create Shipment PartitionedRegion in All VMs
    createPRWithCoLocation(ShipmentPartitionedRegionName, OrderPartitionedRegionName);

    // Initial Validation for the number of data stores and number of profiles
    accessor.invoke(() -> PRColocationDUnitTest
        .validateBeforePutCustomerPartitionedRegion(CustomerPartitionedRegionName));
  }
| |
  /** Ensures a cache exists in the three data-store VMs and the accessor VM. */
  @Override
  protected void createCacheInAllVms() {
    dataStore1.invoke(runGetCache);
    dataStore2.invoke(runGetCache);
    dataStore3.invoke(runGetCache);
    accessor.invoke(runGetCache);
  }
| |
  // Remote task that simply ensures a cache exists in the target VM.
  private final SerializableCallable runGetCache = new SerializableCallable("runGetCache") {
    @Override
    public Object call() {
      getCache();
      return null;
    }
  };
| |
  /**
   * Populates the three colocated PRs from the accessor, then verifies bucket counts on each
   * data store and that colocated Customer/Order/Shipment entries landed on the same members.
   */
  private void populateAndVerifyColocatedPRs(int redundantBuckets) {
    // Put the customer 1-10 in CustomerPartitionedRegion
    accessor.invoke(
        () -> PRColocationDUnitTest.putCustomerPartitionedRegion(CustomerPartitionedRegionName));

    // Put the order 1-10 for each Customer in OrderPartitionedRegion
    accessor
        .invoke(() -> PRColocationDUnitTest.putOrderPartitionedRegion(OrderPartitionedRegionName));

    // Put the shipment 1-10 for each order in ShipmentPartitionedRegion
    accessor.invoke(
        () -> PRColocationDUnitTest.putShipmentPartitionedRegion(ShipmentPartitionedRegionName));

    // for VM0 DataStore check the number of buckets created and the size of
    // bucket for all partitionedRegion
    Integer totalBucketsInDataStore1 = dataStore1
        .invoke(() -> PRColocationDUnitTest.validateDataStore(CustomerPartitionedRegionName,
            OrderPartitionedRegionName, ShipmentPartitionedRegionName));

    // for VM1 DataStore check the number of buckets created and the size of
    // bucket for all partitionedRegion
    Integer totalBucketsInDataStore2 = dataStore2
        .invoke(() -> PRColocationDUnitTest.validateDataStore(CustomerPartitionedRegionName,
            OrderPartitionedRegionName, ShipmentPartitionedRegionName));

    // for VM3 Datastore check the number of buckets created and the size of
    // bucket for all partitionedRegion
    Integer totalBucketsInDataStore3 = dataStore3
        .invoke(() -> PRColocationDUnitTest.validateDataStore(CustomerPartitionedRegionName,
            OrderPartitionedRegionName, ShipmentPartitionedRegionName));

    // Check the total number of buckets created in all three VMs is equal to 30
    // (10 buckets per region across the three colocated regions, times 1 + redundancy copies).
    totalNumBucketsInTest = totalBucketsInDataStore1
        + totalBucketsInDataStore2 + totalBucketsInDataStore3;
    assertEquals(30 + (redundantBuckets * 30), totalNumBucketsInTest);

    // This is the important check. Checks that the colocated Customer, Order
    // and Shipment are in the same VM

    accessor.invoke(() -> PRColocationDUnitTest.validateAfterPutPartitionedRegion(
        CustomerPartitionedRegionName, OrderPartitionedRegionName, ShipmentPartitionedRegionName));

  }
| |
  /** Creates the colocated PRs in all VMs, populates them, and verifies colocation. */
  private void createPopulateAndVerifyCoLocatedPRs(int redundantBuckets) {

    createColocatedPRs(redundantBuckets);
    populateAndVerifyColocatedPRs(redundantBuckets);
  }
| |
  /** Sets the shared PR attribute fields and creates the region in all VMs. */
  private void createPRWithCoLocation(String prName, String coLocatedWith) {
    setAttributes(prName, coLocatedWith);
    createPartitionedRegion(attributeObjects);
  }
| |
  /**
   * Populates the superclass attribute fields and packs them into {@code attributeObjects} in
   * the positional order expected by the superclass region-creation helpers.
   */
  private void setAttributes(String prName, String coLocatedWith) {
    regionName = prName;
    colocatedWith = coLocatedWith;
    isPartitionResolver = Boolean.TRUE;
    attributeObjects = new Object[] {regionName, redundancy, localMaxmemory, totalNumBuckets,
        colocatedWith, isPartitionResolver, getEnableConcurrency()};
  }
| |
  /**
   * Concurrency flag packed into the PR attributes (see {@link #setAttributes}); subclasses may
   * override to enable it.
   */
  protected boolean getEnableConcurrency() {
    return false;
  }
| |
  /**
   * This method executes a transaction with get on non colocated entries and expects the
   * transaction to fail with TransactionDataNotColocatedException.
   *
   * @param bucketRedundancy redundancy for the colocated PRs
   */
  private void basicPRTXWithNonColocatedGet(int bucketRedundancy) {
    setupColocatedRegions(bucketRedundancy);

    // Each data store must host exactly one of the two customer buckets.
    dataStore1.invoke(verifyNonColocated);
    dataStore2.invoke(verifyNonColocated);

    // Run the transactional gets (local-first and remote-first) from dataStore1.
    dataStore1.invoke(getTx);
  }
| |
  /**
   * Creates colocated Customer/Order PRs on dataStore1 and dataStore2 only, with two buckets
   * total, and populates customers 1-2 plus their associated orders.
   */
  @SuppressWarnings("unchecked")
  private void setupColocatedRegions(int bucketRedundancy) {
    dataStore1.invoke(runGetCache);
    dataStore2.invoke(runGetCache);
    // Superclass fields consumed by setAttributes()/createPR.
    redundancy = bucketRedundancy;
    localMaxmemory = 50;
    totalNumBuckets = 2;

    createColocatedRegionsInTwoNodes();

    // Put the customer 1-2 in CustomerPartitionedRegion
    dataStore1.invoke(
        () -> PRColocationDUnitTest.putCustomerPartitionedRegion(CustomerPartitionedRegionName, 2));

    // Put the associated order in colocated OrderPartitionedRegion
    dataStore1.invoke(
        () -> PRColocationDUnitTest.putOrderPartitionedRegion(OrderPartitionedRegionName, 2));
  }
| |
  /** Creates the colocated Customer and Order PRs in dataStore1 and dataStore2 only. */
  private void createColocatedRegionsInTwoNodes() {
    setAttributes(CustomerPartitionedRegionName, null);
    createPRInTwoNodes();
    setAttributes(OrderPartitionedRegionName, CustomerPartitionedRegionName);
    createPRInTwoNodes();
  }
| |
| |
  // Remote task asserting that the invoking member hosts exactly one primary bucket holding
  // exactly one of the two customer keys -- i.e. the two customers are not colocated here.
  @SuppressWarnings("serial")
  private final SerializableRunnable verifyNonColocated =
      new SerializableRunnable("verifyNonColocated") {
        @Override
        public void run() throws PRLocallyDestroyedException, ForceReattemptException {
          containsKeyLocally();
        }
      };
| |
  // Remote task running the non-colocated transactional gets (see performGetTx).
  @SuppressWarnings("serial")
  private final SerializableRunnable getTx = new SerializableRunnable("getTx") {
    @Override
    public void run() {
      performGetTx();
    }
  };
| |
| |
| @SuppressWarnings({"rawtypes"}) |
| private void containsKeyLocally() throws PRLocallyDestroyedException, ForceReattemptException { |
| PartitionedRegion pr = (PartitionedRegion) basicGetCache() |
| .getRegion(SEPARATOR + CustomerPartitionedRegionName); |
| |
| CustId cust1 = new CustId(1); |
| CustId cust2 = new CustId(2); |
| int bucketId1 = pr.getKeyInfo(cust1).getBucketId(); |
| int bucketId2 = pr.getKeyInfo(cust2).getBucketId(); |
| |
| List<Integer> localPrimaryBucketList = pr.getLocalPrimaryBucketsListTestOnly(); |
| Set localBucket1Keys; |
| Set localBucket2Keys; |
| assertTrue(localPrimaryBucketList.size() == 1); |
| for (int bucketId : localPrimaryBucketList) { |
| if (bucketId == bucketId1) { |
| // primary bucket has cust1 |
| localBucket1Keys = pr.getDataStore().getKeysLocally(bucketId1, false); |
| for (Object key : localBucket1Keys) { |
| LogService.getLogger().info("local key set contains " + key); |
| } |
| assertTrue(localBucket1Keys.size() == 1); |
| } else { |
| localBucket2Keys = pr.getDataStore().getKeysLocally(bucketId2, false); |
| for (Object key : localBucket2Keys) { |
| LogService.getLogger().info("local key set contains " + key); |
| } |
| assertTrue(localBucket2Keys.size() == 1); |
| } |
| } |
| } |
| |
| |
  /**
   * Runs two transactional gets over the non-colocated customer buckets: once touching the
   * remote bucket first (TXStateStub path) and once touching the local bucket first (TXState
   * path). Both are expected to fail with TransactionDataNotColocatedException.
   */
  private void performGetTx() {
    PartitionedRegion pr = (PartitionedRegion) basicGetCache()
        .getRegion(SEPARATOR + CustomerPartitionedRegionName);
    CacheTransactionManager mgr = pr.getCache().getCacheTransactionManager();
    CustId cust1 = new CustId(1);
    CustId cust2 = new CustId(2);
    boolean isCust1Local = isCust1Local(pr, cust1);

    // touch first get on remote node -- using TXStateStub
    Assertions.assertThatThrownBy(() -> getTx(!isCust1Local, mgr, pr, cust1, cust2))
        .isInstanceOf(TransactionDataNotColocatedException.class);

    // touch first get on local node-- using TXState
    Assertions.assertThatThrownBy(() -> getTx(isCust1Local, mgr, pr, cust1, cust2))
        .isInstanceOf(TransactionDataNotColocatedException.class);
  }
| |
| private boolean isCust1Local(PartitionedRegion pr, CustId cust1) { |
| int bucketId1 = pr.getKeyInfo(cust1).getBucketId(); |
| List<Integer> localPrimaryBucketList = pr.getLocalPrimaryBucketsListTestOnly(); |
| assertTrue(localPrimaryBucketList.size() == 1); |
| return localPrimaryBucketList.get(0) == bucketId1; |
| } |
| |
| private void getTx(boolean doCust1First, CacheTransactionManager mgr, PartitionedRegion pr, |
| CustId cust1, CustId cust2) { |
| CustId first = doCust1First ? cust1 : cust2; |
| CustId second = !doCust1First ? cust1 : cust2; |
| |
| mgr.begin(); |
| boolean doRollback = true; |
| try { |
| pr.get(first); |
| pr.get(second); |
| doRollback = false; |
| } finally { |
| if (doRollback) { |
| mgr.rollback(); |
| } else { |
| mgr.commit(); |
| } |
| } |
| } |
| |
  /**
   * This method executes a transaction with operation on a key in a moved bucket, and expects
   * transaction to fail with TransactionDataRebalancedException.
   *
   * @param op which entry op to be executed
   * @param bucketRedundancy redundancy for the colocated PRs
   */
  private void basicPRTXWithOpOnMovedBucket(Op op, int bucketRedundancy, DistributedMember dm1,
      DistributedMember dm2) {
    // First transaction.
    TransactionId txId = dataStore1.invoke(() -> beginTx(false));
    dataStore1.invoke(resumeTx(op, txId, dm1, dm2));

    // Second one. Will go through different path (using TXState or TXStateStub)
    txId = dataStore1.invoke(() -> beginTx(false));
    dataStore1.invoke(resumeTx(op, txId, dm1, dm2));
  }
| |
  /** Returns a remote task that reports the invoking member's DistributedMember id. */
  @SuppressWarnings({"rawtypes", "serial"})
  private SerializableCallable getDM() {
    return new SerializableCallable("getDM") {
      @Override
      public Object call() {
        return basicGetCache().getMyId();
      }
    };
  }
| |
  /**
   * Starts a transaction that reads (and, if {@code doPut}, updates) customer 1, then suspends
   * it and returns its id so the caller can move the bucket and resume.
   */
  private TransactionId beginTx(boolean doPut) {
    PartitionedRegion pr = (PartitionedRegion) basicGetCache()
        .getRegion(SEPARATOR + CustomerPartitionedRegionName);
    CacheTransactionManager mgr = basicGetCache().getCacheTransactionManager();
    CustId cust1 = new CustId(1);
    mgr.begin();
    Object value = pr.get(cust1);
    assertNotNull(value);
    if (doPut) {
      pr.put(cust1, "bar");
    }
    // Suspend so the bucket can be moved before the transaction is resumed.
    return mgr.suspend();
  }
| |
  /**
   * Returns a remote task that moves the order bucket between the two members and then resumes
   * the suspended transaction, asserting the resumed op fails with
   * TransactionDataRebalancedException.
   */
  @SuppressWarnings("serial")
  private SerializableRunnable resumeTx(Op op, TransactionId txId, DistributedMember dm1,
      DistributedMember dm2) {
    return new SerializableRunnable("resume tx") {
      @Override
      public void run() {
        PartitionedRegion pr = (PartitionedRegion) basicGetCache()
            .getRegion(SEPARATOR + OrderPartitionedRegionName);
        CacheTransactionManager mgr = basicGetCache().getCacheTransactionManager();

        // Move the bucket out from under the suspended transaction.
        moveBucket(op, dm1, dm2);

        Assertions.assertThatThrownBy(() -> _resumeTx(op, txId, pr, mgr))
            .isInstanceOf(TransactionDataRebalancedException.class);
      }
    };
  }
| |
  /**
   * Resumes the suspended transaction and performs the given entry op on the (moved) order
   * bucket; always rolls back afterwards, letting any exception from the op propagate.
   */
  private void _resumeTx(Op op, TransactionId txId, PartitionedRegion pr,
      CacheTransactionManager mgr) {
    CustId cust1 = new CustId(1);
    OrderId order1 = new OrderId(11, cust1);
    OrderId neworder = new OrderId(111, cust1);
    mgr.resume(txId);
    try {
      switch (op) {
        case GET:
          pr.get(order1);
          break;
        case CONTAINSVALUEFORKEY:
          pr.containsValueForKey(order1);
          break;
        case CONTAINSKEY:
          pr.containsKey(order1);
          break;
        case CREATE:
          pr.create(neworder, new Order("test"));
          break;
        case PUT:
          pr.put(order1, new Order("test"));
          break;
        case INVALIDATE:
          pr.invalidate(order1);
          break;
        case DESTROY:
          pr.destroy(order1);
          break;
        case GETENTRY:
          pr.getEntry(order1);
          break;
        default:
          throw new AssertionError("Unknown operations " + op);
      }

    } finally {
      // Roll back even when the op throws (the expected path).
      mgr.rollback();
    }
  }
| |
  /**
   * Moves the order bucket from the member currently hosting it to the other member. For GET
   * the move may be deferred into the bucket-read hook (see moveBucketForGet); for all other
   * ops the bucket is moved immediately.
   */
  @SuppressWarnings("unchecked")
  private void moveBucket(Op op, DistributedMember dm1, DistributedMember dm2) {
    PartitionedRegion pr = (PartitionedRegion) basicGetCache()
        .getRegion(SEPARATOR + CustomerPartitionedRegionName);
    CustId cust1 = new CustId(1);
    OrderId order1 = new OrderId(11, cust1);
    boolean isCust1Local = isCust1LocalSingleBucket(pr, cust1);
    // Move away from whichever member currently hosts the bucket.
    DistributedMember source = isCust1Local ? dm1 : dm2;
    DistributedMember destination = isCust1Local ? dm2 : dm1;
    PartitionedRegion prOrder = (PartitionedRegion) basicGetCache()
        .getRegion(SEPARATOR + OrderPartitionedRegionName);

    LogService.getLogger().info("source ={}, destination ={}", source, destination);

    switch (op) {
      case GET:
        moveBucketForGet(order1, isCust1Local, source, destination, prOrder);
        break;
      case CONTAINSVALUEFORKEY:
      case CONTAINSKEY:
      case CREATE:
      case PUT:
      case INVALIDATE:
      case DESTROY:
      case GETENTRY:
        PartitionRegionHelper.moveBucketByKey(prOrder, source, destination, order1);
        break;
      default:
        throw new AssertionError("Unknown operations " + op);
    }
  }
| |
| @SuppressWarnings("unchecked") |
| private void moveBucketForGet(OrderId order1, boolean isCust1Local, DistributedMember source, |
| DistributedMember destination, PartitionedRegion prOrder) { |
| if (isCust1Local && useBucketReadHook) { |
| // Use TXState |
| setBucketReadHook(order1, source, destination, prOrder); |
| } else { |
| // Use TXState and TXStateStub -- transaction data on remote node |
| PartitionRegionHelper.moveBucketByKey(prOrder, source, destination, order1); |
| } |
| } |
| |
  /**
   * Installs a data-store read hook that moves the order bucket to the other member when the
   * bucket is read, so the move happens mid-transaction.
   */
  private void setBucketReadHook(OrderId order1, DistributedMember source,
      DistributedMember destination, PartitionedRegion prOrder) {
    prOrder.getDataStore().setBucketReadHook(() -> {
      LogService.getLogger().info("In bucketReadHook");
      PartitionRegionHelper.moveBucketByKey(prOrder, source, destination, order1);
    });
  }
| |
  /** Creates the PR described by {@code attributeObjects} in dataStore1 and dataStore2 only. */
  private void createPRInTwoNodes() {
    dataStore1.invoke(PRColocationDUnitTest.class, "createPR", attributeObjects);
    dataStore2.invoke(PRColocationDUnitTest.class, "createPR", attributeObjects);
  }
| |
  /**
   * Returns true when this member hosts exactly one local primary bucket.
   *
   * <p>NOTE(review): despite the name, {@code cust1} is unused and the key's bucket id is never
   * compared. With totalNumBuckets == 1 (see setupMoveBucket) hosting the single bucket implies
   * cust1 is local, but verify before reusing this helper elsewhere.
   */
  private boolean isCust1LocalSingleBucket(PartitionedRegion pr, CustId cust1) {
    List<Integer> localPrimaryBucketList = pr.getLocalPrimaryBucketsListTestOnly();
    return localPrimaryBucketList.size() == 1;
  }
| |
  /**
   * Creates colocated Customer/Order PRs on dataStore1 and dataStore2 with a single bucket,
   * and populates customer 1 plus its associated order, ready for bucket-move tests.
   */
  @SuppressWarnings("unchecked")
  private void setupMoveBucket(int bucketRedundancy) {
    dataStore1.invoke(runGetCache);
    dataStore2.invoke(runGetCache);
    // Superclass fields consumed by setAttributes()/createPR; one bucket so moving it is simple.
    redundancy = bucketRedundancy;
    localMaxmemory = 50;
    totalNumBuckets = 1;

    createColocatedRegionsInTwoNodes();

    // Put the customer 1 in CustomerPartitionedRegion
    dataStore1.invoke(
        () -> PRColocationDUnitTest.putCustomerPartitionedRegion(CustomerPartitionedRegionName, 1));

    // Put the associated order in colocated OrderPartitionedRegion
    dataStore1.invoke(
        () -> PRColocationDUnitTest.putOrderPartitionedRegion(OrderPartitionedRegionName, 1));
  }
| |
  // Transactional gets across non-colocated buckets must fail (redundancy 0).
  @Test
  public void testTxWithNonColocatedGet() {
    basicPRTXWithNonColocatedGet(0);
  }
| |
  /**
   * This method executes a transaction with entry op on non colocated entries and expects the
   * transaction to fail with TransactionDataNotColocatedException.
   *
   * <p>Assumes the colocated regions were already created and populated by the caller
   * (see testTxWithNonColocatedOps).
   *
   * @param op which entry op to be executed
   * @param bucketRedundancy redundancy for the colocated PRs (NOTE(review): currently unused --
   *        region setup happens in the caller)
   */
  private void basicPRTXWithNonColocatedOp(Op op, int bucketRedundancy) {
    dataStore1.invoke(verifyNonColocated);
    dataStore2.invoke(verifyNonColocated);

    // TXState and TXStateStub will be executed on different nodes.
    dataStore1.invoke(doOp(op));
    dataStore2.invoke(doOp(op));
  }
| |
  /** Returns a remote task that runs {@link #basicDoOp} for the given op in the target VM. */
  @SuppressWarnings("serial")
  private SerializableRunnable doOp(Op op) {
    return new SerializableRunnable("doOp") {
      @Override
      public void run() {
        basicDoOp(op);
      }
    };
  }
| |
| private void basicDoOp(Op op) { |
| CacheTransactionManager mgr = basicGetCache().getCacheTransactionManager(); |
| |
| Assertions.assertThatThrownBy(() -> doOp(op, mgr)) |
| .isInstanceOf(TransactionDataNotColocatedException.class); |
| } |
| |
  /**
   * Begins a transaction, touches customer 1's bucket, then performs the given entry op on an
   * order keyed to customer 2 (a different, non-colocated bucket); always rolls back, letting
   * the expected TransactionDataNotColocatedException propagate to the caller.
   */
  private void doOp(Op op, CacheTransactionManager mgr) {
    PartitionedRegion cust = (PartitionedRegion) basicGetCache()
        .getRegion(SEPARATOR + CustomerPartitionedRegionName);
    PartitionedRegion order = (PartitionedRegion) basicGetCache()
        .getRegion(SEPARATOR + OrderPartitionedRegionName);

    CustId cust1 = new CustId(1);
    CustId cust2 = new CustId(2);
    OrderId order2 = new OrderId(21, cust2);
    OrderId neworder2 = new OrderId(221, cust2);

    mgr.begin();
    cust.get(cust1); // touch 1 bucket

    try {
      switch (op) {
        case GET:
          order.get(order2);
          break;
        case CONTAINSVALUEFORKEY:
          order.containsValueForKey(order2);
          break;
        case CONTAINSKEY:
          order.containsKey(order2);
          break;
        case CREATE:
          order.create(neworder2, new Order("test"));
          break;
        case PUT:
          order.put(order2, new Order("test"));
          break;
        case INVALIDATE:
          order.invalidate(order2);
          break;
        case DESTROY:
          order.destroy(order2);
          break;
        case GETENTRY:
          order.getEntry(order2);
          break;
        default:
          throw new AssertionError("Unknown operations " + op);
      }
    } finally {
      // Roll back even when the op throws (the expected path).
      mgr.rollback();
    }
  }
| |
  // Every entry op on a non-colocated bucket inside a transaction must fail.
  @Test
  public void testTxWithNonColocatedOps() {
    setupColocatedRegions(0);

    for (Op op : Op.values()) {
      LogService.getLogger().info("Testing Operation: " + op.name());
      basicPRTXWithNonColocatedOp(op, 0);
    }
  }
| |
  // When true, the GET-on-moved-bucket scenario moves the bucket from inside the data store's
  // bucket-read hook instead of before the operation (see moveBucketForGet).
  private boolean useBucketReadHook;

  // Every entry op on a bucket moved mid-transaction must fail with
  // TransactionDataRebalancedException.
  @SuppressWarnings("unchecked")
  @Test
  public void testTxWithOpsOnMovedBucket() {
    setupMoveBucket(0);

    DistributedMember dm1 = (DistributedMember) dataStore1.invoke(getDM());
    DistributedMember dm2 = (DistributedMember) dataStore2.invoke(getDM());

    for (Op op : Op.values()) {
      LogService.getLogger().info("Testing Operation: " + op.name());
      basicPRTXWithOpOnMovedBucket(op, 0, dm1, dm2);
    }
  }
| |
| @SuppressWarnings("unchecked") |
| @Test |
| public void testTxWithGetOnMovedBucketUsingBucketReadHook() { |
| setupMoveBucket(0); |
| |
| DistributedMember dm1 = (DistributedMember) dataStore1.invoke(getDM()); |
| DistributedMember dm2 = (DistributedMember) dataStore2.invoke(getDM()); |
| |
| Op op = Op.GET; |
| useBucketReadHook = true; |
| basicPRTXWithOpOnMovedBucket(op, 0, dm1, dm2); |
| useBucketReadHook = false; |
| } |
| |
  // Transactions run from inside a cache listener, at redundancy 0/1/2.
  @Test
  public void testPRTXInCacheListenerRedundancy0() {
    basicPRTXInCacheListener(0);
  }

  @Test
  public void testPRTXInCacheListenerRedundancy1() {
    basicPRTXInCacheListener(1);
  }

  @Test
  public void testPRTXInCacheListenerRedundancy2() {
    basicPRTXInCacheListener(2);
  }
| |
  /**
   * This method executes a transaction inside a cache listener: a TransactionListener is added
   * to the Customer region on every data store, customers are put from the accessor, and each
   * data store then validates the orders created by the listener's transactions.
   *
   * @param bucketRedundancy redundancy for the colocated PRs
   */
  private void basicPRTXInCacheListener(int bucketRedundancy) {
    createCacheInAllVms();
    redundancy = bucketRedundancy;
    localMaxmemory = 50;
    totalNumBuckets = 11;
    // Create Customer PartitionedRegion in All VMs
    createPRWithCoLocation(CustomerPartitionedRegionName, null);
    createPRWithCoLocation(OrderPartitionedRegionName, CustomerPartitionedRegionName);
    // register Cache Listeners
    SerializableCallable registerListeners = new SerializableCallable() {
      @Override
      public Object call() {
        Region custRegion =
            basicGetCache().getRegion(SEPARATOR + CustomerPartitionedRegionName);
        custRegion.getAttributesMutator().addCacheListener(new TransactionListener());
        return null;
      }
    };

    dataStore1.invoke(registerListeners);
    dataStore2.invoke(registerListeners);
    dataStore3.invoke(registerListeners);

    // Put the customer 1-10 in CustomerPartitionedRegion
    accessor.invoke(
        () -> PRColocationDUnitTest.putCustomerPartitionedRegion(CustomerPartitionedRegionName));

    dataStore1.invoke(PRTransactionDUnitTest::validatePRTXInCacheListener);
    dataStore2.invoke(PRTransactionDUnitTest::validatePRTXInCacheListener);
    dataStore3.invoke(PRTransactionDUnitTest::validatePRTXInCacheListener);

  }
| |
| /** |
| * verify that 10 orders are created for each customer |
| * |
| */ |
| private static void validatePRTXInCacheListener() { |
| PartitionedRegion customerPartitionedregion = null; |
| PartitionedRegion orderPartitionedregion = null; |
| try { |
| customerPartitionedregion = (PartitionedRegion) basicGetCache() |
| .getRegion(SEPARATOR + CustomerPartitionedRegionName); |
| orderPartitionedregion = (PartitionedRegion) basicGetCache() |
| .getRegion(SEPARATOR + OrderPartitionedRegionName); |
| } catch (Exception e) { |
| Assert.fail("validateAfterPutPartitionedRegion : failed while getting the region", e); |
| } |
| assertNotNull(customerPartitionedregion); |
| assertNotNull(orderPartitionedregion); |
| |
| customerPartitionedregion.getDataStore().dumpEntries(false); |
| orderPartitionedregion.getDataStore().dumpEntries(false); |
| Iterator custIterator = customerPartitionedregion.getDataStore().getEntries().iterator(); |
| LogWriterUtils.getLogWriter() |
| .info("Found " + customerPartitionedregion.getDataStore().getEntries().size() |
| + " Customer entries in the partition"); |
| Region.Entry custEntry = null; |
| while (custIterator.hasNext()) { |
| custEntry = (Entry) custIterator.next(); |
| CustId custid = (CustId) custEntry.getKey(); |
| Customer cust = (Customer) custEntry.getValue(); |
| Iterator orderIterator = orderPartitionedregion.getDataStore().getEntries().iterator(); |
| LogWriterUtils.getLogWriter() |
| .info("Found " + orderPartitionedregion.getDataStore().getEntries().size() |
| + " Order entries in the partition"); |
| int orderPerCustomer = 0; |
| Region.Entry orderEntry = null; |
| while (orderIterator.hasNext()) { |
| orderEntry = (Entry) orderIterator.next(); |
| OrderId orderId = (OrderId) orderEntry.getKey(); |
| Order order = (Order) orderEntry.getValue(); |
| if (custid.equals(orderId.getCustId())) { |
| orderPerCustomer++; |
| } |
| } |
| assertEquals(10, orderPerCustomer); |
| } |
| } |
| |
| @Ignore("BUG46661") |
| @Test |
| public void testCacheListenerCallbacks() { |
| createPopulateAndVerifyCoLocatedPRs(1); |
| |
| SerializableCallable registerListeners = new SerializableCallable() { |
| @Override |
| public Object call() { |
| Region custRegion = |
| basicGetCache().getRegion(SEPARATOR + CustomerPartitionedRegionName); |
| custRegion.getAttributesMutator().addCacheListener(new TransactionListener2()); |
| return null; |
| } |
| }; |
| |
| accessor.invoke(registerListeners); |
| dataStore1.invoke(registerListeners); |
| dataStore2.invoke(registerListeners); |
| dataStore3.invoke(registerListeners); |
| |
| accessor.invoke(new SerializableCallable("run function") { |
| @Override |
| public Object call() { |
| PartitionedRegion pr = (PartitionedRegion) basicGetCache() |
| .getRegion(SEPARATOR + CustomerPartitionedRegionName); |
| PartitionedRegion orderpr = (PartitionedRegion) basicGetCache() |
| .getRegion(SEPARATOR + OrderPartitionedRegionName); |
| CustId custId = new CustId(2); |
| Customer newCus = new Customer("foo", "bar"); |
| Order order = new Order("fooOrder"); |
| OrderId orderId = new OrderId(22, custId); |
| ArrayList args = new ArrayList(); |
| Function txFunction = new MyTransactionFunction(); |
| FunctionService.registerFunction(txFunction); |
| Execution execution = FunctionService.onRegion(pr); |
| Set filter = new HashSet(); |
| boolean caughtException = false; |
| // test transaction non-coLocated operations |
| filter.clear(); |
| args.clear(); |
| args.add(VERIFY_LISTENER_CALLBACK); |
| LogWriterUtils.getLogWriter().info("VERIFY_LISTENER_CALLBACK"); |
| args.add(custId); |
| args.add(newCus); |
| args.add(orderId); |
| args.add(order); |
| filter.add(custId); |
| caughtException = false; |
| execution.withFilter(filter).setArguments(args).execute(txFunction.getId()).getResult(); |
| return null; |
| } |
| }); |
| |
| } |
| |
| @Test |
| public void testRepeatableRead() { |
| createColocatedPRs(1); |
| |
| dataStore1.invoke(this::registerFunction); |
| dataStore2.invoke(this::registerFunction); |
| dataStore3.invoke(this::registerFunction); |
| |
| accessor.invoke(new SerializableCallable("run function") { |
| @Override |
| public Object call() { |
| PartitionedRegion pr = (PartitionedRegion) basicGetCache() |
| .getRegion(SEPARATOR + CustomerPartitionedRegionName); |
| PartitionedRegion orderpr = (PartitionedRegion) basicGetCache() |
| .getRegion(SEPARATOR + OrderPartitionedRegionName); |
| CustId custId = new CustId(2); |
| Customer newCus = new Customer("foo", "bar"); |
| Order order = new Order("fooOrder"); |
| OrderId orderId = new OrderId(22, custId); |
| ArrayList args = new ArrayList(); |
| Function txFunction = new MyTransactionFunction(); |
| FunctionService.registerFunction(txFunction); |
| Execution execution = FunctionService.onRegion(pr); |
| Set filter = new HashSet(); |
| filter.clear(); |
| args.clear(); |
| args.add(VERIFY_REP_READ); |
| LogWriterUtils.getLogWriter().info("VERIFY_REP_READ"); |
| args.add(custId); |
| args.add(newCus); |
| args.add(orderId); |
| args.add(order); |
| filter.add(custId); |
| execution.withFilter(filter).setArguments(args).execute(txFunction.getId()).getResult(); |
| |
| return null; |
| } |
| }); |
| |
| } |
| |
| @Test |
| public void testPRTXPerformance() { |
| PRColocationDistributedTestHelper.defaultStringSize = 1024; |
| |
| createPopulateAndVerifyCoLocatedPRs(1); |
| |
| // register functions |
| SerializableCallable registerPerfFunctions = new SerializableCallable("register Fn") { |
| @Override |
| public Object call() { |
| Function perfFunction = new PerfFunction(); |
| FunctionService.registerFunction(perfFunction); |
| Function perfTxFunction = new PerfTxFunction(); |
| FunctionService.registerFunction(perfTxFunction); |
| return Boolean.TRUE; |
| } |
| }; |
| |
| dataStore1.invoke(registerPerfFunctions); |
| dataStore2.invoke(registerPerfFunctions); |
| dataStore3.invoke(registerPerfFunctions); |
| accessor.invoke(registerPerfFunctions); |
| |
| SerializableCallable runPerfFunction = new SerializableCallable("runPerfFunction") { |
| @Override |
| public Object call() { |
| long perfTime = 0; |
| Region customerPR = basicGetCache().getRegion(CustomerPartitionedRegionName); |
| Execution execution = FunctionService.onRegion(customerPR); |
| // for each customer, update order and shipment |
| for (int iterations = 1; iterations <= totalIterations; iterations++) { |
| LogWriterUtils.getLogWriter().info("running perfFunction"); |
| long startTime = 0; |
| ArrayList args = new ArrayList(); |
| CustId custId = new CustId(iterations % 10); |
| for (int i = 1; i <= perfOrderShipmentPairs; i++) { |
| OrderId orderId = new OrderId(custId.getCustId() * 10 + i, custId); |
| Order order = new Order("NewOrder" + i + iterations); |
| ShipmentId shipmentId = |
| new ShipmentId(orderId.getOrderId() * 10 + i, orderId); |
| Shipment shipment = new Shipment("newShipment" + i + iterations); |
| args.add(orderId); |
| args.add(order); |
| args.add(shipmentId); |
| args.add(shipment); |
| } |
| Set filter = new HashSet(); |
| filter.add(custId); |
| if (iterations > warmupIterations) { |
| startTime = NanoTimer.getTime(); |
| } |
| execution.withFilter(filter).setArguments(args).execute("perfFunction").getResult(); |
| if (startTime > 0) { |
| perfTime += NanoTimer.getTime() - startTime; |
| } |
| } |
| return perfTime; |
| } |
| }; |
| |
| Long perfTime = (Long) accessor.invoke(runPerfFunction); |
| |
| SerializableCallable runPerfTxFunction = new SerializableCallable("runPerfTxFunction") { |
| @Override |
| public Object call() { |
| long perfTime = 0; |
| Region customerPR = basicGetCache().getRegion(CustomerPartitionedRegionName); |
| Execution execution = FunctionService.onRegion(customerPR); |
| // for each customer, update order and shipment |
| for (int iterations = 1; iterations <= totalIterations; iterations++) { |
| LogWriterUtils.getLogWriter().info("Running perfFunction"); |
| long startTime = 0; |
| ArrayList args = new ArrayList(); |
| CustId custId = new CustId(iterations % 10); |
| for (int i = 1; i <= perfOrderShipmentPairs; i++) { |
| OrderId orderId = new OrderId(custId.getCustId() * 10 + i, custId); |
| Order order = new Order("NewOrder" + i + iterations); |
| ShipmentId shipmentId = |
| new ShipmentId(orderId.getOrderId() * 10 + i, orderId); |
| Shipment shipment = new Shipment("newShipment" + i + iterations); |
| args.add(orderId); |
| args.add(order); |
| args.add(shipmentId); |
| args.add(shipment); |
| } |
| Set filter = new HashSet(); |
| filter.add(custId); |
| if (iterations > warmupIterations) { |
| startTime = NanoTimer.getTime(); |
| } |
| execution.withFilter(filter).setArguments(args).execute("perfTxFunction").getResult(); |
| if (startTime > 0) { |
| perfTime += NanoTimer.getTime() - startTime; |
| } |
| } |
| return perfTime; |
| } |
| }; |
| |
| Long perfTxTime = (Long) accessor.invoke(runPerfTxFunction); |
| |
| double diff = (perfTime - perfTxTime) * 1.0; |
| double percentDiff = (diff / perfTime) * 100; |
| |
| LogWriterUtils.getLogWriter() |
| .info((totalIterations - warmupIterations) + " iterations of function took:" |
| + +perfTime + " Nanos, and transaction function took:" |
| + perfTxTime + " Nanos, difference :" + diff + " percentDifference:" |
| + percentDiff); |
| |
| } |
| |
  @Test
  public void testCommitToFailAfterPrimaryBucketMoved() {
    // One redundant copy so the primary bucket can be moved between the two data stores.
    basicPRTXCommitToFailAfterPrimaryBucketMoved(1);
  }
| |
| /** |
| * Test commit fail after the transactional node no longer hosts the primary bucket of the |
| * operations executed in the transaction. |
| * |
| * @param redundantBuckets redundant buckets for colocated PRs |
| */ |
| private void basicPRTXCommitToFailAfterPrimaryBucketMoved(int redundantBuckets) { |
| setupColocatedRegions(redundantBuckets); |
| |
| InternalDistributedMember dm1 = (InternalDistributedMember) dataStore1.invoke(getDM()); |
| InternalDistributedMember dm2 = (InternalDistributedMember) dataStore2.invoke(getDM()); |
| |
| TransactionId txId = dataStore1.invoke(() -> beginTx(true)); |
| dataStore1.invoke(() -> movePrimaryBucket(dm1, dm2)); |
| dataStore1.invoke(() -> resumeTxAfterPrimaryMoved(txId)); |
| |
| } |
| |
| private void movePrimaryBucket(InternalDistributedMember dm1, InternalDistributedMember dm2) { |
| PartitionedRegion pr = (PartitionedRegion) basicGetCache() |
| .getRegion(SEPARATOR + CustomerPartitionedRegionName); |
| CustId cust1 = new CustId(1); |
| int bucketId = pr.getKeyInfo(cust1).getBucketId(); |
| boolean isCust1LocalPrimary = pr.getBucketRegion(cust1).getBucketAdvisor().isPrimary(); |
| InternalDistributedMember destination = isCust1LocalPrimary ? dm2 : dm1; |
| InternalDistributedMember source = isCust1LocalPrimary ? dm1 : dm2; |
| |
| ExplicitMoveDirector director = new ExplicitMoveDirector(cust1, bucketId, source, destination, |
| pr.getCache().getDistributedSystem()); |
| PartitionedRegionRebalanceOp rebalanceOp = |
| new PartitionedRegionRebalanceOp(pr, false, director, true, true); |
| BucketOperatorImpl operator = new BucketOperatorImpl(rebalanceOp); |
| boolean moved = operator.movePrimary(source, destination, bucketId); |
| if (!moved) { |
| fail("Not able to move primary bucket by invoking BucketOperatorImpl.movePrimary"); |
| } |
| } |
| |
| private void resumeTxAfterPrimaryMoved(TransactionId txId) { |
| PartitionedRegion pr = (PartitionedRegion) basicGetCache() |
| .getRegion(SEPARATOR + CustomerPartitionedRegionName); |
| CacheTransactionManager mgr = basicGetCache().getCacheTransactionManager(); |
| |
| mgr.resume(txId); |
| |
| Throwable thrown = catchThrowable(mgr::commit); |
| assertTrue(thrown instanceof TransactionDataRebalancedException); |
| } |
| |
  // These tests are inherited from the colocation superclass and already run there;
  // override them as no-ops so this class does not execute them a second time.
  @Override
  @Test
  public void testColocatedPartitionedRegion() {}

  @Override
  @Test
  public void testColocationPartitionedRegion() {}

  @Override
  @Test
  public void testColocationPartitionedRegionWithRedundancy() {}

  @Override
  @Test
  public void testPartitionResolverPartitionedRegion() {}

  @Override
  @Test
  public void testColocationPartitionedRegionWithNullColocationSpecifiedOnOneNode() {}

  @Override
  @Test
  public void testColocatedPRRedundancyRecovery() {}

  @Override
  @Test
  public void testColocatedPRWithAccessorOnDifferentNode1() {}

  @Override
  @Test
  public void testColocatedPRWithAccessorOnDifferentNode2() {}

  @Override
  @Test
  public void testColocatedPRWithDestroy() {}

  @Override
  @Test
  public void testColocatedPRWithLocalDestroy() {}

  @Override
  @Test
  public void testColocatedPRWithPROnDifferentNode1() {}
| |
  @Override
  public final void preTearDownCacheTestCase() {
    // After each test, check in every VM that no hosted transactions were leaked.
    Invoke.invokeInEveryVM(verifyNoTxState);
  }
| |
  // Asserts that this member hosts no in-progress transactions; run in every VM at teardown.
  private final SerializableCallable verifyNoTxState = new SerializableCallable() {
    @Override
    public Object call() {
      TXManagerImpl mgr = getGemfireCache().getTxManager();
      assertEquals(0, mgr.hostedTransactionsInProgressForTest());
      return null;
    }
  };
| |
  // Region operations exercised by the transactional test functions.
  enum Op {
    GET, CONTAINSVALUEFORKEY, CONTAINSKEY, CREATE, PUT, INVALIDATE, DESTROY, GETENTRY
  }
| |
| static class TransactionListener extends CacheListenerAdapter { |
| |
| @Override |
| public void afterCreate(EntryEvent event) { |
| // for each customer, put 10 orders in a tx. |
| Region custPR = event.getRegion(); |
| Region orderPR = custPR.getCache().getRegion(SEPARATOR + OrderPartitionedRegionName); |
| CacheTransactionManager mgr = custPR.getCache().getCacheTransactionManager(); |
| mgr.begin(); |
| CustId custId = (CustId) event.getKey(); |
| for (int j = 1; j <= 10; j++) { |
| int oid = (custId.getCustId() * 10) + j; |
| OrderId orderId = new OrderId(oid, custId); |
| Order order = new Order("OREDR" + oid); |
| try { |
| assertNotNull(orderPR); |
| orderPR.put(orderId, order); |
| } catch (Exception e) { |
| // mgr.rollback(); |
| Assert.fail(" failed while doing put operation in CacheListener ", e); |
| } |
| } |
| mgr.commit(); |
| LogWriterUtils.getLogWriter().info("COMMIT completed"); |
| } |
| } |
| |
| static class TransactionListener2 extends CacheListenerAdapter { |
| |
| private int numberOfPutCallbacks; |
| private int numberOfDestroyCallbacks; |
| private int numberOfInvalidateCallbacks; |
| |
| @Override |
| public void afterCreate(EntryEvent event) { |
| synchronized (this) { |
| numberOfPutCallbacks++; |
| } |
| } |
| |
| @Override |
| public void afterUpdate(EntryEvent event) { |
| afterCreate(event); |
| } |
| |
| @Override |
| public void afterDestroy(EntryEvent event) { |
| synchronized (this) { |
| numberOfDestroyCallbacks++; |
| } |
| } |
| |
| @Override |
| public void afterInvalidate(EntryEvent event) { |
| synchronized (this) { |
| numberOfInvalidateCallbacks++; |
| } |
| } |
| |
| synchronized int getNumberOfPutCallbacks() { |
| return numberOfPutCallbacks; |
| } |
| |
| synchronized int getNumberOfDestroyCallbacks() { |
| return numberOfDestroyCallbacks; |
| } |
| |
| synchronized int getNumberOfInvalidateCallbacks() { |
| return numberOfInvalidateCallbacks; |
| } |
| } |
| } |