| /*========================================================================= |
| * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved. |
| * This product is protected by U.S. and international copyright |
| * and intellectual property laws. Pivotal products are covered by |
| * one or more patents listed at http://www.pivotal.io/patents. |
| *========================================================================= |
| */ |
| package com.gemstone.gemfire.internal.cache; |
| |
| import java.util.Arrays; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.concurrent.CountDownLatch; |
| |
| import javax.naming.Context; |
| import javax.naming.NamingException; |
| import javax.transaction.RollbackException; |
| import javax.transaction.Status; |
| import javax.transaction.UserTransaction; |
| |
| import com.gemstone.gemfire.cache.AttributesFactory; |
| import com.gemstone.gemfire.cache.AttributesMutator; |
| import com.gemstone.gemfire.cache.CacheEvent; |
| import com.gemstone.gemfire.cache.CacheListener; |
| import com.gemstone.gemfire.cache.CacheLoader; |
| import com.gemstone.gemfire.cache.CacheLoaderException; |
| import com.gemstone.gemfire.cache.CacheTransactionManager; |
| import com.gemstone.gemfire.cache.CacheWriter; |
| import com.gemstone.gemfire.cache.CacheWriterException; |
| import com.gemstone.gemfire.cache.CommitConflictException; |
| import com.gemstone.gemfire.cache.DataPolicy; |
| import com.gemstone.gemfire.cache.EntryEvent; |
| import com.gemstone.gemfire.cache.EntryNotFoundException; |
| import com.gemstone.gemfire.cache.ExpirationAction; |
| import com.gemstone.gemfire.cache.ExpirationAttributes; |
| import com.gemstone.gemfire.cache.InterestPolicy; |
| import com.gemstone.gemfire.cache.InterestResultPolicy; |
| import com.gemstone.gemfire.cache.LoaderHelper; |
| import com.gemstone.gemfire.cache.PartitionAttributesFactory; |
| import com.gemstone.gemfire.cache.Region; |
| import com.gemstone.gemfire.cache.Region.Entry; |
| import com.gemstone.gemfire.cache.RegionEvent; |
| import com.gemstone.gemfire.cache.RegionFactory; |
| import com.gemstone.gemfire.cache.RegionShortcut; |
| import com.gemstone.gemfire.cache.Scope; |
| import com.gemstone.gemfire.cache.SubscriptionAttributes; |
| import com.gemstone.gemfire.cache.TransactionDataNotColocatedException; |
| import com.gemstone.gemfire.cache.TransactionDataRebalancedException; |
| import com.gemstone.gemfire.cache.TransactionEvent; |
| import com.gemstone.gemfire.cache.TransactionException; |
| import com.gemstone.gemfire.cache.TransactionId; |
| import com.gemstone.gemfire.cache.TransactionListener; |
| import com.gemstone.gemfire.cache.TransactionWriter; |
| import com.gemstone.gemfire.cache.TransactionWriterException; |
| import com.gemstone.gemfire.cache.UnsupportedOperationInTransactionException; |
| import com.gemstone.gemfire.cache.client.ClientCache; |
| import com.gemstone.gemfire.cache.client.ClientCacheFactory; |
| import com.gemstone.gemfire.cache.client.ClientRegionFactory; |
| import com.gemstone.gemfire.cache.client.ClientRegionShortcut; |
| import com.gemstone.gemfire.cache.execute.Function; |
| import com.gemstone.gemfire.cache.execute.FunctionAdapter; |
| import com.gemstone.gemfire.cache.execute.FunctionContext; |
| import com.gemstone.gemfire.cache.execute.FunctionService; |
| import com.gemstone.gemfire.cache.execute.RegionFunctionContext; |
| import com.gemstone.gemfire.cache.partition.PartitionRegionHelper; |
| import com.gemstone.gemfire.cache.query.CqAttributes; |
| import com.gemstone.gemfire.cache.query.CqAttributesFactory; |
| import com.gemstone.gemfire.cache.query.CqEvent; |
| import com.gemstone.gemfire.cache.query.CqListener; |
| import com.gemstone.gemfire.cache.server.CacheServer; |
| import com.gemstone.gemfire.cache.util.CacheListenerAdapter; |
| import com.gemstone.gemfire.cache.util.CacheWriterAdapter; |
| import com.gemstone.gemfire.cache30.CacheTestCase; |
| import com.gemstone.gemfire.distributed.DistributedMember; |
| import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember; |
| import com.gemstone.gemfire.internal.AvailablePort; |
| import com.gemstone.gemfire.internal.cache.execute.CustomerIDPartitionResolver; |
| import com.gemstone.gemfire.internal.cache.execute.InternalFunctionService; |
| import com.gemstone.gemfire.internal.cache.execute.data.CustId; |
| import com.gemstone.gemfire.internal.cache.execute.data.Customer; |
| import com.gemstone.gemfire.internal.cache.execute.data.Order; |
| import com.gemstone.gemfire.internal.cache.execute.data.OrderId; |
| import com.gemstone.gemfire.internal.cache.versions.VersionTag; |
| |
| import dunit.DistributedTestCase; |
| import dunit.Host; |
| import dunit.SerializableCallable; |
| import dunit.SerializableRunnable; |
| import dunit.VM; |
| |
| /** |
| * @author sbawaska |
| * |
| */ |
| public class RemoteTransactionDUnitTest extends CacheTestCase { |
| final protected String CUSTOMER = "custRegion"; |
| final protected String ORDER = "orderRegion"; |
| final protected String D_REFERENCE = "distrReference"; |
| |
| private final SerializableCallable getNumberOfTXInProgress = new SerializableCallable() { |
| public Object call() throws Exception { |
| TXManagerImpl mgr = getGemfireCache().getTxManager(); |
| return mgr.hostedTransactionsInProgressForTest(); |
| } |
| }; |
| private final SerializableCallable verifyNoTxState = new SerializableCallable() { |
| public Object call() throws Exception { |
| //TXManagerImpl mgr = getGemfireCache().getTxManager(); |
| //assertEquals(0, mgr.hostedTransactionsInProgressForTest()); |
| final TXManagerImpl mgr = getGemfireCache().getTxManager(); |
| waitForCriterion(new WaitCriterion() { |
| @Override |
| public boolean done() { |
| return mgr.hostedTransactionsInProgressForTest() == 0; |
| } |
| |
| @Override |
| public String description() { |
| return ""; |
| } |
| }, 30 * 1000, 500, true/*throwOnTimeout*/); |
| return null; |
| } |
| }; |
| /** |
| * @param name |
| */ |
| public RemoteTransactionDUnitTest(String name) { |
| super(name); |
| } |
| |
| protected enum OP { |
| PUT, GET, DESTROY, INVALIDATE, KEYS, VALUES, ENTRIES, PUTALL, GETALL, REMOVEALL |
| } |
| |
| @Override |
| public void tearDown2() throws Exception { |
| // try { Thread.sleep(5000); } catch (InterruptedException e) { } // FOR MANUAL TESTING OF STATS - DON"T KEEP THIS |
| try { |
| invokeInEveryVM(verifyNoTxState); |
| } finally { |
| closeAllCache(); |
| super.tearDown2(); |
| } |
| } |
| |
| void createRegion(boolean accessor, int redundantCopies, InterestPolicy interestPolicy) { |
| AttributesFactory af = new AttributesFactory(); |
| af.setScope(Scope.DISTRIBUTED_ACK); |
| af.setDataPolicy(DataPolicy.REPLICATE); |
| af.setConcurrencyChecksEnabled(getConcurrencyChecksEnabled()); |
| getCache().createRegion(D_REFERENCE,af.create()); |
| af = new AttributesFactory(); |
| af.setConcurrencyChecksEnabled(getConcurrencyChecksEnabled()); |
| if (interestPolicy != null) { |
| af.setSubscriptionAttributes(new SubscriptionAttributes(interestPolicy)); |
| } |
| af.setPartitionAttributes(new PartitionAttributesFactory<CustId, Customer>() |
| .setTotalNumBuckets(4).setLocalMaxMemory(accessor ? 0 : 1) |
| .setPartitionResolver(new CustomerIDPartitionResolver("resolver1")) |
| .setRedundantCopies(redundantCopies).create()); |
| getCache().createRegion(CUSTOMER, af.create()); |
| af.setPartitionAttributes(new PartitionAttributesFactory<OrderId, Order>() |
| .setTotalNumBuckets(4).setLocalMaxMemory(accessor ? 0 : 1) |
| .setPartitionResolver(new CustomerIDPartitionResolver("resolver2")) |
| .setRedundantCopies(redundantCopies).setColocatedWith(CUSTOMER).create()); |
| getCache().createRegion(ORDER, af.create()); |
| } |
| |
| protected boolean getConcurrencyChecksEnabled() { |
| return false; |
| } |
| |
| void populateData() { |
| Region custRegion = getCache().getRegion(CUSTOMER); |
| Region orderRegion = getCache().getRegion(ORDER); |
| Region refRegion = getCache().getRegion(D_REFERENCE); |
| for (int i=0; i<5; i++) { |
| CustId custId = new CustId(i); |
| Customer customer = new Customer("customer"+i, "address"+i); |
| OrderId orderId = new OrderId(i, custId); |
| Order order = new Order("order"+i); |
| custRegion.put(custId, customer); |
| orderRegion.put(orderId, order); |
| refRegion.put(custId,customer); |
| } |
| } |
| |
| protected void initAccessorAndDataStore(VM accessor, VM datastore, final int redundantCopies) { |
| accessor.invoke(new SerializableCallable() { |
| public Object call() throws Exception { |
| createRegion(true/*accessor*/, redundantCopies, null); |
| return null; |
| } |
| }); |
| |
| datastore.invoke(new SerializableCallable() { |
| public Object call() throws Exception { |
| createRegion(false/*accessor*/, redundantCopies, null); |
| populateData(); |
| return null; |
| } |
| }); |
| } |
| |
| protected void initAccessorAndDataStore(VM accessor, VM datastore1, VM datastore2, final int redundantCopies) { |
| datastore2.invoke(new SerializableCallable() { |
| public Object call() throws Exception { |
| createRegion(false/*accessor*/, redundantCopies, null); |
| return null; |
| } |
| }); |
| |
| initAccessorAndDataStore(accessor, datastore1, redundantCopies); |
| } |
| |
| private void initAccessorAndDataStoreWithInterestPolicy(VM accessor, VM datastore1, VM datastore2, final int redundantCopies) { |
| datastore2.invoke(new SerializableCallable() { |
| public Object call() throws Exception { |
| createRegion(false/*accessor*/, redundantCopies, InterestPolicy.ALL); |
| return null; |
| } |
| }); |
| |
| initAccessorAndDataStore(accessor, datastore1, redundantCopies); |
| } |
| |
| protected class DoOpsInTX extends SerializableCallable { |
| private final OP op; |
| Customer expectedCust; |
| Customer expectedRefCust = null; |
| Order expectedOrder; |
| Order expectedOrder2; |
| Order expectedOrder3; |
| DoOpsInTX(OP op) { |
| this.op = op; |
| } |
| public Object call() throws Exception { |
| CacheTransactionManager mgr = getGemfireCache().getTxManager(); |
| getLogWriter().fine("testTXPut starting tx"); |
| mgr.begin(); |
| Region<CustId, Customer> custRegion = getCache().getRegion(CUSTOMER); |
| Region<OrderId, Order> orderRegion = getCache().getRegion(ORDER); |
| Region<CustId,Customer> refRegion = getCache().getRegion(D_REFERENCE); |
| CustId custId = new CustId(1); |
| OrderId orderId = new OrderId(1, custId); |
| OrderId orderId2 = new OrderId(2, custId); |
| OrderId orderId3 = new OrderId(3, custId); |
| switch (this.op) { |
| case PUT: |
| expectedCust = new Customer("foo", "bar"); |
| expectedOrder = new Order("fooOrder"); |
| expectedOrder2 = new Order("fooOrder2"); |
| expectedOrder3 = new Order("fooOrder3"); |
| custRegion.put(custId, expectedCust); |
| orderRegion.put(orderId, expectedOrder); |
| Map orders = new HashMap(); |
| orders.put(orderId2, expectedOrder2); |
| orders.put(orderId3, expectedOrder3); |
| getGemfireCache().getLoggerI18n().fine("SWAP:doingPutAll"); |
| //orderRegion.putAll(orders); |
| refRegion.put(custId,expectedCust); |
| Set<OrderId> ordersSet = new HashSet<OrderId>(); |
| ordersSet.add(orderId);ordersSet.add(orderId2);ordersSet.add(orderId3); |
| //validateContains(custId, ordersSet, true); |
| break; |
| case GET: |
| expectedCust = custRegion.get(custId); |
| expectedOrder = orderRegion.get(orderId); |
| expectedRefCust = refRegion.get(custId); |
| assertNotNull(expectedCust); |
| assertNotNull(expectedOrder); |
| assertNotNull(expectedRefCust); |
| validateContains(custId, Collections.singleton(orderId), true,true); |
| break; |
| case DESTROY: |
| validateContains(custId, Collections.singleton(orderId), true); |
| custRegion.destroy(custId); |
| orderRegion.destroy(orderId); |
| refRegion.destroy(custId); |
| validateContains(custId, Collections.singleton(orderId), false); |
| break; |
| case REMOVEALL: |
| validateContains(custId, Collections.singleton(orderId), true); |
| custRegion.removeAll(Collections.singleton(custId)); |
| orderRegion.removeAll(Collections.singleton(orderId)); |
| refRegion.removeAll(Collections.singleton(custId)); |
| validateContains(custId, Collections.singleton(orderId), false); |
| break; |
| case INVALIDATE: |
| validateContains(custId, Collections.singleton(orderId), true); |
| custRegion.invalidate(custId); |
| orderRegion.invalidate(orderId); |
| refRegion.invalidate(custId); |
| validateContains(custId,Collections.singleton(orderId),true,false); |
| break; |
| default: |
| throw new IllegalStateException(); |
| } |
| return mgr.getTransactionId(); |
| } |
| }; |
| |
| void validateContains(CustId custId, Set<OrderId> orderId, boolean doesIt) { |
| validateContains(custId,orderId,doesIt,doesIt); |
| } |
| |
| void validateContains(CustId custId, Set<OrderId> ordersSet, boolean containsKey,boolean containsValue) { |
| Region<CustId, Customer> custRegion = getCache().getRegion(CUSTOMER); |
| Region<OrderId, Order> orderRegion = getCache().getRegion(ORDER); |
| Region<CustId, Order> refRegion = getCache().getRegion(D_REFERENCE); |
| boolean rContainsKC = custRegion.containsKey(custId); |
| boolean rContainsKO = containsKey; |
| for (OrderId o : ordersSet) { |
| getGemfireCache().getLoggerI18n().fine("SWAP:rContainsKO:"+rContainsKO+" containsKey:"+orderRegion.containsKey(o)); |
| rContainsKO = rContainsKO && orderRegion.containsKey(o); |
| } |
| boolean rContainsKR = refRegion.containsKey(custId); |
| |
| boolean rContainsVC = custRegion.containsValueForKey(custId); |
| boolean rContainsVO = containsValue; |
| for (OrderId o: ordersSet) { |
| rContainsVO = rContainsVO && orderRegion.containsValueForKey(o); |
| } |
| boolean rContainsVR = refRegion.containsValueForKey(custId); |
| |
| |
| assertEquals(containsKey,rContainsKC); |
| assertEquals(containsKey,rContainsKO); |
| assertEquals(containsKey,rContainsKR); |
| assertEquals(containsValue,rContainsVR); |
| assertEquals(containsValue,rContainsVC); |
| assertEquals(containsValue,rContainsVO); |
| |
| |
| if(containsKey) { |
| Region.Entry eC = custRegion.getEntry(custId); |
| for (OrderId o : ordersSet) { |
| assertNotNull(orderRegion.getEntry(o)); |
| } |
| Region.Entry eR = refRegion.getEntry(custId); |
| assertNotNull(eC); |
| assertNotNull(eR); |
| // assertEquals(1,custRegion.size()); |
| // assertEquals(1,orderRegion.size()); |
| // assertEquals(1,refRegion.size()); |
| |
| } else { |
| //assertEquals(0,custRegion.size()); |
| //assertEquals(0,orderRegion.size()); |
| //assertEquals(0,refRegion.size()); |
| try { |
| Region.Entry eC = custRegion.getEntry(custId); |
| assertNull("should have had an EntryNotFoundException:"+eC,eC); |
| |
| } catch(EntryNotFoundException enfe) { |
| // this is what we expect |
| } |
| try { |
| for (OrderId o : ordersSet) { |
| assertNull("should have had an EntryNotFoundException:"+orderRegion.getEntry(o),orderRegion.getEntry(o)); |
| } |
| |
| } catch(EntryNotFoundException enfe) { |
| // this is what we expect |
| } |
| try { |
| Region.Entry eR = refRegion.getEntry(custId); |
| assertNull("should have had an EntryNotFoundException:"+eR,eR); |
| } catch(EntryNotFoundException enfe) { |
| // this is what we expect |
| } |
| |
| } |
| |
| } |
| |
| |
| void verifyAfterCommit(OP op) { |
| Region<CustId, Customer> custRegion = getCache().getRegion(CUSTOMER); |
| Region<CustId, Customer> refRegion = getCache().getRegion(D_REFERENCE); |
| Region<OrderId, Order> orderRegion = getCache().getRegion(ORDER); |
| CustId custId = new CustId(1); |
| OrderId orderId = new OrderId(1, custId); |
| OrderId orderId2 = new OrderId(2, custId); |
| OrderId orderId3 = new OrderId(3, custId); |
| Customer expectedCust; |
| Order expectedOrder; |
| Order expectedOrder2; |
| Order expectedOrder3; |
| Customer expectedRef; |
| switch (op) { |
| case PUT: |
| expectedCust = new Customer("foo", "bar"); |
| expectedOrder = new Order("fooOrder"); |
| expectedOrder2 = new Order("fooOrder2"); |
| expectedOrder3 = new Order("fooOrder3"); |
| expectedRef = expectedCust; |
| assertNotNull(custRegion.getEntry(custId)); |
| assertEquals(expectedCust, custRegion.getEntry(custId).getValue()); |
| /* |
| assertNotNull(orderRegion.getEntry(orderId)); |
| assertEquals(expectedOrder, orderRegion.getEntry(orderId).getValue()); |
| |
| assertNotNull(orderRegion.getEntry(orderId2)); |
| assertEquals(expectedOrder2, orderRegion.getEntry(orderId2).getValue()); |
| |
| assertNotNull(orderRegion.getEntry(orderId3)); |
| assertEquals(expectedOrder3, orderRegion.getEntry(orderId3).getValue()); |
| */ |
| assertNotNull(refRegion.getEntry(custId)); |
| assertEquals(expectedRef, refRegion.getEntry(custId).getValue()); |
| |
| //Set<OrderId> ordersSet = new HashSet<OrderId>(); |
| //ordersSet.add(orderId);ordersSet.add(orderId2);ordersSet.add(orderId3); |
| //validateContains(custId, ordersSet, true); |
| break; |
| case GET: |
| expectedCust = custRegion.get(custId); |
| expectedOrder = orderRegion.get(orderId); |
| expectedRef = refRegion.get(custId); |
| validateContains(custId, Collections.singleton(orderId), true); |
| break; |
| case DESTROY: |
| assertTrue(!custRegion.containsKey(custId)); |
| assertTrue(!orderRegion.containsKey(orderId)); |
| assertTrue(!refRegion.containsKey(custId)); |
| validateContains(custId, Collections.singleton(orderId), false); |
| break; |
| case INVALIDATE: |
| boolean validateContainsKey = true; |
| if (!((GemFireCacheImpl)custRegion.getCache()).isClient()) { |
| assertTrue(custRegion.containsKey(custId)); |
| assertTrue(orderRegion.containsKey(orderId)); |
| assertTrue(refRegion.containsKey(custId)); |
| } |
| assertNull(custRegion.get(custId)); |
| assertNull(orderRegion.get(orderId)); |
| assertNull(refRegion.get(custId)); |
| validateContains(custId,Collections.singleton(orderId),validateContainsKey,false); |
| break; |
| default: |
| throw new IllegalStateException(); |
| } |
| } |
| |
| |
| void verifyAfterRollback(OP op) { |
| Region<CustId, Customer> custRegion = getCache().getRegion(CUSTOMER); |
| Region<OrderId, Order> orderRegion = getCache().getRegion(ORDER); |
| Region<CustId, Customer> refRegion = getCache().getRegion(D_REFERENCE); |
| assertNotNull(custRegion); |
| assertNotNull(orderRegion); |
| assertNotNull(refRegion); |
| |
| CustId custId = new CustId(1); |
| OrderId orderId = new OrderId(1, custId); |
| OrderId orderId2 = new OrderId(2, custId); |
| OrderId orderId3 = new OrderId(3, custId); |
| Customer expectedCust; |
| Order expectedOrder; |
| Customer expectedRef; |
| switch (op) { |
| case PUT: |
| expectedCust = new Customer("customer1", "address1"); |
| expectedOrder = new Order("order1"); |
| expectedRef = new Customer("customer1", "address1"); |
| assertEquals(expectedCust, custRegion.getEntry(custId).getValue()); |
| assertEquals(expectedOrder, orderRegion.getEntry(orderId).getValue()); |
| getCache().getLogger().info("SWAP:verifyRollback:"+orderRegion); |
| getCache().getLogger().info("SWAP:verifyRollback:"+orderRegion.getEntry(orderId2)); |
| assertNull(getGemfireCache().getTXMgr().getTXState()); |
| assertNull(""+orderRegion.getEntry(orderId2),orderRegion.getEntry(orderId2)); |
| assertNull(orderRegion.getEntry(orderId3)); |
| assertNull(orderRegion.get(orderId2)); |
| assertNull(orderRegion.get(orderId3)); |
| assertEquals(expectedRef, refRegion.getEntry(custId).getValue()); |
| validateContains(custId, Collections.singleton(orderId), true); |
| break; |
| case GET: |
| expectedCust = custRegion.getEntry(custId).getValue(); |
| expectedOrder = orderRegion.getEntry(orderId).getValue(); |
| expectedRef = refRegion.getEntry(custId).getValue(); |
| validateContains(custId, Collections.singleton(orderId), true); |
| break; |
| case DESTROY: |
| assertTrue(!custRegion.containsKey(custId)); |
| assertTrue(!orderRegion.containsKey(orderId)); |
| assertTrue(!refRegion.containsKey(custId)); |
| validateContains(custId, Collections.singleton(orderId), true); |
| break; |
| case INVALIDATE: |
| assertTrue(custRegion.containsKey(custId)); |
| assertTrue(orderRegion.containsKey(orderId)); |
| assertTrue(refRegion.containsKey(custId)); |
| assertNull(custRegion.get(custId)); |
| assertNull(orderRegion.get(orderId)); |
| assertNull(refRegion.get(custId)); |
| validateContains(custId,Collections.singleton(orderId),true,true); |
| break; |
| default: |
| throw new IllegalStateException(); |
| } |
| } |
| |
| |
| |
| |
| public void testTXCreationAndCleanupAtCommit() throws Exception { |
| doBasicChecks(true); |
| } |
| |
| public void testTXCreationAndCleanupAtRollback() throws Exception { |
| doBasicChecks(false); |
| } |
| |
| private void doBasicChecks(final boolean commit) throws Exception { |
| Host host = Host.getHost(0); |
| VM accessor = host.getVM(0); |
| VM datastore = host.getVM(1); |
| initAccessorAndDataStore(accessor, datastore, 0); |
| |
| final TXId txId = (TXId)accessor.invoke(new DoOpsInTX(OP.PUT)); |
| |
| datastore.invoke(new SerializableCallable("verify tx") { |
| public Object call() throws Exception { |
| TXManagerImpl mgr = getGemfireCache().getTxManager(); |
| assertTrue(mgr.isHostedTxInProgress(txId)); |
| return null; |
| } |
| }); |
| |
| accessor.invoke(new SerializableCallable() { |
| public Object call() throws Exception { |
| TXManagerImpl mgr = getGemfireCache().getTxManager(); |
| TXStateProxy tx = mgr.internalSuspend(); |
| assertNotNull(tx); |
| mgr.resume(tx); |
| if (commit) { |
| mgr.commit(); |
| } else { |
| mgr.rollback(); |
| } |
| return null; |
| } |
| }); |
| |
| datastore.invoke(new SerializableCallable() { |
| public Object call() throws Exception { |
| TXManagerImpl mgr = getGemfireCache().getTxManager(); |
| assertFalse(mgr.isHostedTxInProgress(txId)); |
| return null; |
| } |
| }); |
| if (commit) { |
| accessor.invoke(new SerializableCallable() { |
| public Object call() throws Exception { |
| verifyAfterCommit(OP.PUT); |
| return null; |
| } |
| }); |
| } else { |
| accessor.invoke(new SerializableCallable() { |
| public Object call() throws Exception { |
| verifyAfterRollback(OP.PUT); |
| return null; |
| } |
| }); |
| } |
| } |
| |
| public void testPRTXGet() { |
| Host host = Host.getHost(0); |
| VM accessor = host.getVM(0); |
| VM datastore = host.getVM(1); |
| initAccessorAndDataStore(accessor, datastore, 0); |
| |
| final TXId txId = (TXId)accessor.invoke(new DoOpsInTX(OP.GET)); |
| |
| datastore.invoke(new SerializableCallable() { |
| public Object call() throws Exception { |
| TXManagerImpl mgr = getGemfireCache().getTxManager(); |
| assertTrue(mgr.isHostedTxInProgress(txId)); |
| TXStateProxy tx = mgr.getHostedTXState(txId); |
| System.out.println("TXRS:"+tx.getRegions()); |
| assertEquals(3, tx.getRegions().size());// 2 buckets for the two puts we |
| // did in the accessor |
| // plus the dist. region |
| for (LocalRegion r : tx.getRegions()) { |
| assertTrue(r instanceof BucketRegion || r instanceof DistributedRegion); |
| TXRegionState rs = tx.readRegion(r); |
| for (Object key : rs.getEntryKeys()) { |
| TXEntryState es = rs.readEntry(key); |
| assertNotNull(es.getValue(key, r, false)); |
| assertFalse(es.isDirty()); |
| } |
| } |
| return null; |
| } |
| }); |
| |
| accessor.invoke(new SerializableCallable() { |
| public Object call() throws Exception { |
| CacheTransactionManager mgr = getGemfireCache().getTxManager(); |
| mgr.commit(); |
| verifyAfterCommit(OP.GET); |
| return null; |
| } |
| }); |
| } |
| |
| public void testPRTXGetOnRemoteWithLoader() { |
| Host host = Host.getHost(0); |
| VM accessor = host.getVM(0); |
| VM datastore = host.getVM(1); |
| initAccessorAndDataStore(accessor, datastore, 0); |
| |
| |
| datastore.invoke(new SerializableCallable() { |
| public Object call() throws Exception { |
| AttributesMutator am = getCache().getRegion(CUSTOMER).getAttributesMutator(); |
| am.setCacheLoader(new CacheLoader() { |
| public Object load(LoaderHelper helper) |
| throws CacheLoaderException { |
| return new Customer("sup dawg", "add"); |
| } |
| |
| public void close() { } |
| }); |
| return null; |
| } |
| }); |
| |
| accessor.invoke(new SerializableCallable() { |
| public Object call() throws Exception { |
| CacheTransactionManager mgr = getGemfireCache().getTxManager(); |
| mgr.begin(); |
| Region cust = getCache().getRegion(CUSTOMER); |
| Customer s = (Customer)cust.get(new CustId(8)); |
| assertEquals(new Customer("sup dawg", "add"), s); |
| assertTrue(cust.containsKey(new CustId(8))); |
| TXStateProxy tx = ((TXManagerImpl)mgr).internalSuspend(); |
| assertFalse(cust.containsKey(new CustId(8))); |
| ((TXManagerImpl)mgr).resume(tx); |
| mgr.commit(); |
| Customer s2 = (Customer)cust.get(new CustId(8)); |
| Customer ex = new Customer("sup dawg", "add"); |
| assertEquals(ex,s); |
| assertEquals(ex,s2); |
| return null; |
| } |
| }); |
| } |
| |
| /** |
| * Make sure that getEntry returns null properly and values when it should |
| */ |
| public void testPRTXGetEntryOnRemoteSide() { |
| Host host = Host.getHost(0); |
| VM accessor = host.getVM(0); |
| VM datastore = host.getVM(1); |
| initAccessorAndDataStore(accessor, datastore, 0); |
| |
| |
| datastore.invoke(new SerializableCallable() { |
| public Object call() throws Exception { |
| return null; |
| } |
| }); |
| |
| accessor.invoke(new SerializableCallable() { |
| public Object call() throws Exception { |
| Region cust = getCache().getRegion(CUSTOMER); |
| CustId sup = new CustId(7); |
| Region.Entry e = cust.getEntry(sup); |
| assertNull(e); |
| CustId custId = new CustId(5); |
| cust.put(custId, new Customer("customer5", "address5")); |
| |
| Region.Entry ee = cust.getEntry(custId); |
| assertNotNull(ee); |
| |
| CacheTransactionManager mgr = getGemfireCache().getTxManager(); |
| mgr.begin(); |
| Region.Entry e2 = cust.getEntry(sup); |
| assertNull(e2); |
| mgr.commit(); |
| Region.Entry e3 = cust.getEntry(sup); |
| assertNull(e3); |
| |
| |
| mgr.begin(); |
| Customer dawg = new Customer("dawg", "dawgaddr"); |
| cust.put(sup, dawg); |
| Region.Entry e4 = cust.getEntry(sup); |
| assertNotNull(e4); |
| assertEquals(dawg,e4.getValue()); |
| mgr.commit(); |
| |
| Region.Entry e5 = cust.getEntry(sup); |
| assertNotNull(e5); |
| assertEquals(dawg,e5.getValue()); |
| return null; |
| } |
| }); |
| } |
| |
| |
| |
| public void testPRTXGetOnLocalWithLoader() { |
| Host host = Host.getHost(0); |
| VM accessor = host.getVM(0); |
| VM datastore = host.getVM(1); |
| initAccessorAndDataStore(accessor, datastore, 0); |
| |
| |
| datastore.invoke(new SerializableCallable() { |
| public Object call() throws Exception { |
| AttributesMutator am = getCache().getRegion(CUSTOMER).getAttributesMutator(); |
| am.setCacheLoader(new CacheLoader() { |
| public Object load(LoaderHelper helper) |
| throws CacheLoaderException { |
| return new Customer("sup dawg", "addr"); |
| } |
| |
| public void close() { } |
| }); |
| CacheTransactionManager mgr = getGemfireCache().getTxManager(); |
| mgr.begin(); |
| Region cust = getCache().getRegion(CUSTOMER); |
| CustId custId = new CustId(6); |
| Customer s = (Customer)cust.get(custId); |
| mgr.commit(); |
| Customer s2 = (Customer)cust.get(custId); |
| Customer expectedCust = new Customer("sup dawg", "addr"); |
| assertEquals(s, expectedCust); |
| assertEquals(s2, expectedCust); |
| return null; |
| } |
| }); |
| |
| accessor.invoke(new SerializableCallable() { |
| public Object call() throws Exception { |
| return null; |
| } |
| }); |
| } |
| |
| |
| |
| public void testTXPut() { |
| Host host = Host.getHost(0); |
| VM acc = host.getVM(0); |
| VM datastore = host.getVM(1); |
| initAccessorAndDataStore(acc, datastore, 0); |
| VM accessor = getVMForTransactions(acc, datastore); |
| |
| |
| final TXId txId = (TXId)accessor.invoke(new DoOpsInTX(OP.PUT)); |
| |
| datastore.invoke(new SerializableCallable() { |
| public Object call() throws Exception { |
| TXManagerImpl mgr = getGemfireCache().getTxManager(); |
| assertTrue(mgr.isHostedTxInProgress(txId)); |
| TXStateProxy tx = mgr.getHostedTXState(txId); |
| assertEquals(3, tx.getRegions().size());// 2 buckets for the two puts we |
| // did in the accessor |
| // +1 for the dist_region |
| for (LocalRegion r : tx.getRegions()) { |
| assertTrue(r instanceof BucketRegion || r instanceof DistributedRegion); |
| TXRegionState rs = tx.readRegion(r); |
| for (Object key : rs.getEntryKeys()) { |
| TXEntryState es = rs.readEntry(key); |
| assertNotNull(es.getValue(key, r, false)); |
| assertTrue(es.isDirty()); |
| } |
| } |
| return null; |
| } |
| }); |
| accessor.invoke(new SerializableCallable() { |
| public Object call() throws Exception { |
| TXManagerImpl mgr = getGemfireCache().getTxManager(); |
| assertNotNull(mgr.getTXState()); |
| getCache().getLogger().fine("SWAP:accessorTXState:"+mgr.getTXState()); |
| mgr.commit(); |
| verifyAfterCommit(OP.PUT); |
| assertNull(mgr.getTXState()); |
| return null; |
| } |
| }); |
| } |
| |
| public void testTXInvalidate() { |
| Host host = Host.getHost(0); |
| VM acc = host.getVM(0); |
| VM datastore = host.getVM(1); |
| initAccessorAndDataStore(acc, datastore, 0); |
| VM accessor = getVMForTransactions(acc, datastore); |
| |
| |
| final TXId txId = (TXId)accessor.invoke(new DoOpsInTX(OP.INVALIDATE)); |
| |
| datastore.invoke(new SerializableCallable() { |
| public Object call() throws Exception { |
| TXManagerImpl mgr = getGemfireCache().getTxManager(); |
| assertTrue(mgr.isHostedTxInProgress(txId)); |
| TXStateProxy tx = mgr.getHostedTXState(txId); |
| assertEquals(3, tx.getRegions().size());// 2 buckets for the two puts we |
| // did in the accessor |
| // plus the dist. region |
| for (LocalRegion r : tx.getRegions()) { |
| assertTrue(r instanceof BucketRegion || r instanceof DistributedRegion); |
| TXRegionState rs = tx.readRegion(r); |
| for (Object key : rs.getEntryKeys()) { |
| TXEntryState es = rs.readEntry(key); |
| assertNotNull(es.getValue(key, r, false)); |
| assertTrue(es.isDirty()); |
| } |
| } |
| return null; |
| } |
| }); |
| accessor.invoke(new SerializableCallable() { |
| public Object call() throws Exception { |
| CacheTransactionManager mgr = getGemfireCache().getTxManager(); |
| mgr.commit(); |
| verifyAfterCommit(OP.INVALIDATE); |
| return null; |
| } |
| }); |
| } |
| |
| |
| public void testTXDestroy() { |
| Host host = Host.getHost(0); |
| VM acc = host.getVM(0); |
| VM datastore = host.getVM(1); |
| initAccessorAndDataStore(acc, datastore, 0); |
| VM accessor = getVMForTransactions(acc, datastore); |
| |
| |
| final TXId txId = (TXId)accessor.invoke(new DoOpsInTX(OP.DESTROY)); |
| |
| datastore.invoke(new SerializableCallable() { |
| public Object call() throws Exception { |
| TXManagerImpl mgr = getGemfireCache().getTxManager(); |
| assertTrue(mgr.isHostedTxInProgress(txId)); |
| TXStateProxy tx = mgr.getHostedTXState(txId); |
| assertEquals(3, tx.getRegions().size());// 2 buckets for the two puts we |
| // did in the accessor |
| // plus the dist. region |
| for (LocalRegion r : tx.getRegions()) { |
| assertTrue(r instanceof BucketRegion || r instanceof DistributedRegion); |
| TXRegionState rs = tx.readRegion(r); |
| for (Object key : rs.getEntryKeys()) { |
| TXEntryState es = rs.readEntry(key); |
| assertNull(es.getValue(key, r, false)); |
| assertTrue(es.isDirty()); |
| } |
| } |
| return null; |
| } |
| }); |
| accessor.invoke(new SerializableCallable() { |
| public Object call() throws Exception { |
| CacheTransactionManager mgr = getGemfireCache().getTxManager(); |
| mgr.commit(); |
| verifyAfterCommit(OP.DESTROY); |
| return null; |
| } |
| }); |
| } |
| |
| public void testTxPutIfAbsent() { |
| Host host = Host.getHost(0); |
| VM acc = host.getVM(0); |
| VM datastore = host.getVM(1); |
| initAccessorAndDataStore(acc, datastore, 0); |
| VM accessor = getVMForTransactions(acc, datastore); |
| |
| final CustId newCustId = new CustId(10); |
| final Customer updateCust = new Customer("customer10", "address10"); |
| final TXId txId = (TXId)accessor.invoke(new SerializableCallable() { |
| public Object call() throws Exception { |
| TXManagerImpl mgr = getGemfireCache().getTxManager(); |
| mgr.begin(); |
| Region<CustId, Customer> cust = getGemfireCache().getRegion(CUSTOMER); |
| Region<CustId, Customer> rr = getGemfireCache().getRegion(D_REFERENCE); |
| Customer expectedCust = new Customer("customer"+1, "address"+1); |
| getGemfireCache().getLoggerI18n().fine("SWAP:doingPutIfAbsent"); |
| CustId oldCustId = new CustId(1); |
| Customer old = cust.putIfAbsent(oldCustId, updateCust); |
| assertTrue("expected:"+expectedCust+" but was "+old, expectedCust.equals(old)); |
| //transaction should be bootstrapped |
| old = rr.putIfAbsent(oldCustId, updateCust); |
| assertTrue("expected:"+expectedCust+" but was "+old, expectedCust.equals(old)); |
| //now a key that does not exist |
| old = cust.putIfAbsent(newCustId, updateCust); |
| assertNull(old); |
| old = rr.putIfAbsent(newCustId, updateCust); |
| assertNull(old); |
| Region<OrderId, Order> order = getGemfireCache().getRegion(ORDER); |
| Order oldOrder = order.putIfAbsent(new OrderId(10, newCustId), new Order("order10")); |
| assertNull(old); |
| assertNull(oldOrder); |
| assertNotNull(cust.get(newCustId)); |
| assertNotNull(rr.get(newCustId)); |
| TXStateProxy tx = mgr.internalSuspend(); |
| assertNull(cust.get(newCustId)); |
| assertNull(rr.get(newCustId)); |
| mgr.resume(tx); |
| cust.put(oldCustId, new Customer("foo", "bar")); |
| rr.put(oldCustId, new Customer("foo", "bar")); |
| return mgr.getTransactionId(); |
| } |
| }); |
| |
| datastore.invoke(new SerializableCallable() { |
| public Object call() throws Exception { |
| Region cust = getGemfireCache().getRegion(CUSTOMER); |
| int hash1 = PartitionedRegionHelper.getHashKey((PartitionedRegion)cust, new CustId(1)); |
| int hash10 = PartitionedRegionHelper.getHashKey((PartitionedRegion)cust, new CustId(10)); |
| TXManagerImpl mgr = getGemfireCache().getTxManager(); |
| assertTrue(mgr.isHostedTxInProgress(txId)); |
| TXStateProxy tx = mgr.getHostedTXState(txId); |
| assertEquals(3 + (hash1 == hash10 ? 0 : 1), tx.getRegions().size());// 2 buckets for the two puts we |
| // did in the accessor one dist. region, and one more bucket if Cust1 and Cust10 resolve to different buckets |
| for (LocalRegion r : tx.getRegions()) { |
| assertTrue(r instanceof BucketRegion || r instanceof DistributedRegion); |
| TXRegionState rs = tx.readRegion(r); |
| for (Object key : rs.getEntryKeys()) { |
| TXEntryState es = rs.readEntry(key); |
| assertNotNull(es.getValue(key, r, false)); |
| assertTrue("key:"+key+" r:"+r.getFullPath(), es.isDirty()); |
| } |
| } |
| return null; |
| } |
| }); |
| accessor.invoke(new SerializableCallable() { |
| public Object call() throws Exception { |
| TXManagerImpl mgr = getGemfireCache().getTxManager(); |
| mgr.commit(); |
| Region<CustId, Customer> cust = getGemfireCache().getRegion(CUSTOMER); |
| Region<CustId, Customer> rr = getGemfireCache().getRegion(D_REFERENCE); |
| assertEquals(updateCust, cust.get(newCustId)); |
| assertEquals(updateCust, rr.get(newCustId)); |
| //test conflict |
| mgr.begin(); |
| CustId conflictCust = new CustId(11); |
| cust.putIfAbsent(conflictCust, new Customer("name11", "address11")); |
| TXStateProxy tx = mgr.internalSuspend(); |
| cust.put(conflictCust, new Customer("foo", "bar")); |
| mgr.resume(tx); |
| try { |
| mgr.commit(); |
| fail("expected exception not thrown"); |
| } catch (CommitConflictException cce) { |
| } |
| return null; |
| } |
| }); |
| |
| } |
| |
| public VM getVMForTransactions(VM accessor, VM datastore) { |
| return accessor; |
| } |
| |
| public void testTxRemove() { |
| Host host = Host.getHost(0); |
| VM acc = host.getVM(0); |
| VM datastore = host.getVM(1); |
| initAccessorAndDataStore(acc, datastore, 0); |
| VM accessor = getVMForTransactions(acc, datastore); |
| |
| final CustId custId = new CustId(1); |
| final TXId txId = (TXId)accessor.invoke(new SerializableCallable() { |
| public Object call() throws Exception { |
| Region<CustId, Customer> cust = getGemfireCache().getRegion(CUSTOMER); |
| Region<CustId, Customer> ref = getGemfireCache().getRegion(D_REFERENCE); |
| TXManagerImpl mgr = getGemfireCache().getTxManager(); |
| mgr.begin(); |
| Customer customer = new Customer("customer1", "address1"); |
| Customer fakeCust = new Customer("foo", "bar"); |
| assertFalse(cust.remove(custId, fakeCust)); |
| assertTrue(cust.remove(custId, customer)); |
| assertFalse(ref.remove(custId, fakeCust)); |
| assertTrue(ref.remove(custId, customer)); |
| TXStateProxy tx = mgr.internalSuspend(); |
| assertNotNull(cust.get(custId)); |
| assertNotNull(ref.get(custId)); |
| mgr.resume(tx); |
| return mgr.getTransactionId(); |
| } |
| }); |
| datastore.invoke(new SerializableCallable() { |
| public Object call() throws Exception { |
| TXManagerImpl mgr = getGemfireCache().getTxManager(); |
| assertTrue(mgr.isHostedTxInProgress(txId)); |
| TXStateProxy tx = mgr.getHostedTXState(txId); |
| assertEquals(2, tx.getRegions().size());// 2 buckets for the two puts we |
| // did in the accessor one dist. region, and one more bucket if Cust1 and Cust10 resolve to different buckets |
| for (LocalRegion r : tx.getRegions()) { |
| assertTrue(r instanceof BucketRegion || r instanceof DistributedRegion); |
| TXRegionState rs = tx.readRegion(r); |
| for (Object key : rs.getEntryKeys()) { |
| TXEntryState es = rs.readEntry(key); |
| assertNull(es.getValue(key, r, false)); |
| assertTrue("key:"+key+" r:"+r.getFullPath(), es.isDirty()); |
| } |
| } |
| return null; |
| } |
| }); |
| accessor.invoke(new SerializableCallable() { |
| public Object call() throws Exception { |
| TXManagerImpl mgr = getGemfireCache().getTxManager(); |
| mgr.commit(); |
| Region<CustId, Customer> cust = getGemfireCache().getRegion(CUSTOMER); |
| Region<CustId, Customer> rr = getGemfireCache().getRegion(D_REFERENCE); |
| assertNull(cust.get(custId)); |
| assertNull(rr.get(custId)); |
| //check conflict |
| mgr.begin(); |
| CustId conflictCust = new CustId(2); |
| Customer customer = new Customer("customer2", "address2"); |
| getGemfireCache().getLoggerI18n().fine("SWAP:removeConflict"); |
| assertTrue(cust.remove(conflictCust, customer)); |
| TXStateProxy tx = mgr.internalSuspend(); |
| cust.put(conflictCust, new Customer("foo", "bar")); |
| mgr.resume(tx); |
| try { |
| mgr.commit(); |
| fail("expected exception not thrown"); |
| } catch (CommitConflictException e) { |
| } |
| return null; |
| } |
| }); |
| } |
| |
| public void testTxRemoveAll() { |
| Host host = Host.getHost(0); |
| VM acc = host.getVM(0); |
| VM datastore = host.getVM(1); |
| initAccessorAndDataStore(acc, datastore, 0); |
| VM accessor = getVMForTransactions(acc, datastore); |
| |
| final CustId custId1 = new CustId(1); |
| final CustId custId2 = new CustId(2); |
| final CustId custId20 = new CustId(20); |
| final TXId txId = (TXId)accessor.invoke(new SerializableCallable() { |
| public Object call() throws Exception { |
| Region<CustId, Customer> cust = getGemfireCache().getRegion(CUSTOMER); |
| Region<CustId, Customer> ref = getGemfireCache().getRegion(D_REFERENCE); |
| TXManagerImpl mgr = getGemfireCache().getTxManager(); |
| mgr.begin(); |
| Customer customer = new Customer("customer1", "address1"); |
| Customer customer2 = new Customer("customer2", "address2"); |
| Customer fakeCust = new Customer("foo2", "bar2"); |
| cust.removeAll(Arrays.asList(custId1, custId2, custId20)); |
| ref.removeAll(Arrays.asList(custId1, custId2, custId20)); |
| TXStateProxy tx = mgr.internalSuspend(); |
| assertNotNull(cust.get(custId1)); |
| assertNotNull(ref.get(custId2)); |
| mgr.resume(tx); |
| return mgr.getTransactionId(); |
| } |
| }); |
| datastore.invoke(new SerializableCallable() { |
| public Object call() throws Exception { |
| TXManagerImpl mgr = getGemfireCache().getTxManager(); |
| assertTrue(mgr.isHostedTxInProgress(txId)); |
| TXStateProxy tx = mgr.getHostedTXState(txId); |
| assertEquals(4, tx.getRegions().size());// 2 buckets for the two puts we |
| // did in the accessor one dist. region, and one more bucket if Cust1 and Cust10 resolve to different buckets |
| for (LocalRegion r : tx.getRegions()) { |
| assertTrue(r instanceof BucketRegion || r instanceof DistributedRegion); |
| TXRegionState rs = tx.readRegion(r); |
| for (Object key : rs.getEntryKeys()) { |
| TXEntryState es = rs.readEntry(key); |
| assertNull(es.getValue(key, r, false)); |
| //custId20 won't be dirty because it doesn't exist. |
| assertTrue("key:"+key+" r:"+r.getFullPath(), key.equals(custId20) || es.isDirty()); |
| } |
| } |
| return null; |
| } |
| }); |
| accessor.invoke(new SerializableCallable() { |
| public Object call() throws Exception { |
| TXManagerImpl mgr = getGemfireCache().getTxManager(); |
| mgr.commit(); |
| Region<CustId, Customer> cust = getGemfireCache().getRegion(CUSTOMER); |
| Region<CustId, Customer> rr = getGemfireCache().getRegion(D_REFERENCE); |
| assertNull(cust.get(custId1)); |
| assertNull(rr.get(custId2)); |
| //check conflict |
| mgr.begin(); |
| CustId custId3 = new CustId(3); |
| CustId custId4 = new CustId(4); |
| getGemfireCache().getLoggerI18n().fine("SWAP:removeConflict"); |
| cust.removeAll(Arrays.asList(custId3, custId20, custId4)); |
| TXStateProxy tx = mgr.internalSuspend(); |
| // cust.put(custId3, new Customer("foo", "bar")); |
| cust.put(custId20, new Customer("foo", "bar")); |
| assertNotNull(cust.get(custId20)); |
| cust.put(custId4, new Customer("foo", "bar")); |
| mgr.resume(tx); |
| try { |
| mgr.commit(); |
| fail("expected exception not thrown"); |
| } catch (CommitConflictException e) { |
| } |
| assertNotNull(cust.get(custId3)); |
| assertNotNull(cust.get(custId4)); |
| assertNotNull(cust.get(custId20)); |
| |
| //Test a removeall an already missing key. |
| //custId2 has already been removed |
| mgr.begin(); |
| getGemfireCache().getLoggerI18n().fine("SWAP:removeConflict"); |
| cust.removeAll(Arrays.asList(custId2, custId3)); |
| tx = mgr.internalSuspend(); |
| cust.put(custId2, new Customer("foo", "bar")); |
| mgr.resume(tx); |
| mgr.commit(); |
| assertNotNull(cust.get(custId2)); |
| assertNull(cust.get(custId3)); |
| return null; |
| } |
| }); |
| } |
| |
| public void testTxRemoveAllNotColocated() { |
| Host host = Host.getHost(0); |
| VM acc = host.getVM(0); |
| VM datastore1 = host.getVM(1); |
| VM datastore2 = host.getVM(3); |
| datastore2.invoke(new SerializableCallable() { |
| public Object call() throws Exception { |
| createRegion(false/*accessor*/, 0, null); |
| return null; |
| } |
| }); |
| initAccessorAndDataStore(acc, datastore1, 0); |
| VM accessor = getVMForTransactions(acc, datastore1); |
| |
| final CustId custId0 = new CustId(0); |
| final CustId custId1 = new CustId(1); |
| final CustId custId2 = new CustId(2); |
| final CustId custId3 = new CustId(3); |
| final CustId custId4 = new CustId(4); |
| final CustId custId20 = new CustId(20); |
| final TXId txId = (TXId)accessor.invoke(new SerializableCallable() { |
| public Object call() throws Exception { |
| Region<CustId, Customer> cust = getGemfireCache().getRegion(CUSTOMER); |
| Region<CustId, Customer> ref = getGemfireCache().getRegion(D_REFERENCE); |
| TXManagerImpl mgr = getGemfireCache().getTxManager(); |
| mgr.begin(); |
| Customer customer = new Customer("customer1", "address1"); |
| Customer customer2 = new Customer("customer2", "address2"); |
| Customer fakeCust = new Customer("foo2", "bar2"); |
| try { |
| cust.removeAll(Arrays.asList(custId0, custId4, custId1, custId2, custId3, custId20)); |
| fail("expected exception not thrown"); |
| } catch (TransactionDataNotColocatedException e) { |
| mgr.rollback(); |
| } |
| assertNotNull(cust.get(custId0)); |
| assertNotNull(cust.get(custId1)); |
| assertNotNull(cust.get(custId2)); |
| assertNotNull(cust.get(custId3)); |
| assertNotNull(cust.get(custId4)); |
| return mgr.getTransactionId(); |
| } |
| }); |
| } |
| |
| public void testTxRemoveAllWithRedundancy() { |
| Host host = Host.getHost(0); |
| VM acc = host.getVM(0); |
| VM datastore1 = host.getVM(1); |
| VM datastore2 = host.getVM(3); |
| |
| //Create a second data store. |
| datastore2.invoke(new SerializableCallable() { |
| public Object call() throws Exception { |
| createRegion(false/*accessor*/, 1, null); |
| return null; |
| } |
| }); |
| |
| initAccessorAndDataStore(acc, datastore1, 1); |
| VM accessor = getVMForTransactions(acc, datastore1); |
| |
| //There are 4 buckets, so 0, 4, and 20 are all colocated |
| final CustId custId0 = new CustId(0); |
| final CustId custId4 = new CustId(4); |
| final CustId custId20 = new CustId(20); |
| final TXId txId = (TXId)accessor.invoke(new SerializableCallable() { |
| public Object call() throws Exception { |
| Region<CustId, Customer> cust = getGemfireCache().getRegion(CUSTOMER); |
| Region<CustId, Customer> ref = getGemfireCache().getRegion(D_REFERENCE); |
| TXManagerImpl mgr = getGemfireCache().getTxManager(); |
| mgr.begin(); |
| cust.removeAll(Arrays.asList(custId0, custId4)); |
| mgr.commit(); |
| assertNull(cust.get(custId0)); |
| assertNull(cust.get(custId4)); |
| return mgr.getTransactionId(); |
| } |
| }); |
| |
| SerializableCallable checkArtifacts = new SerializableCallable() { |
| public Object call() throws Exception { |
| PartitionedRegion cust = (PartitionedRegion) getGemfireCache().getRegion(CUSTOMER); |
| assertNull(cust.get(custId0)); |
| assertNull(cust.get(custId4)); |
| return null; |
| } |
| }; |
| datastore1.invoke(checkArtifacts); |
| datastore2.invoke(checkArtifacts); |
| } |
| |
| public void testTxReplace() { |
| Host host = Host.getHost(0); |
| VM acc = host.getVM(0); |
| VM datastore = host.getVM(1); |
| initAccessorAndDataStore(acc, datastore, 0); |
| VM accessor = getVMForTransactions(acc, datastore); |
| |
| final CustId custId = new CustId(1); |
| final Customer updatedCust = new Customer("updated", "updated"); |
| final TXId txId = (TXId)accessor.invoke(new SerializableCallable() { |
| public Object call() throws Exception { |
| Region<CustId, Customer> cust = getGemfireCache().getRegion(CUSTOMER); |
| Region<CustId, Customer> ref = getGemfireCache().getRegion(D_REFERENCE); |
| TXManagerImpl mgr = getGemfireCache().getTxManager(); |
| mgr.begin(); |
| Customer customer = new Customer("customer1", "address1"); |
| Customer fakeCust = new Customer("foo", "bar"); |
| assertFalse(cust.replace(custId, fakeCust, updatedCust)); |
| assertTrue(cust.replace(custId, customer, updatedCust)); |
| assertFalse(ref.replace(custId, fakeCust, updatedCust)); |
| assertTrue(ref.replace(custId, customer, updatedCust)); |
| TXStateProxy tx = mgr.internalSuspend(); |
| assertEquals(cust.get(custId), customer); |
| assertEquals(ref.get(custId), customer); |
| mgr.resume(tx); |
| return mgr.getTransactionId(); |
| } |
| }); |
| datastore.invoke(new SerializableCallable() { |
| public Object call() throws Exception { |
| TXManagerImpl mgr = getGemfireCache().getTxManager(); |
| assertTrue(mgr.isHostedTxInProgress(txId)); |
| TXStateProxy tx = mgr.getHostedTXState(txId); |
| assertEquals(2, tx.getRegions().size());// 2 buckets for the two puts we |
| // did in the accessor one dist. region, and one more bucket if Cust1 and Cust10 resolve to different buckets |
| for (LocalRegion r : tx.getRegions()) { |
| assertTrue(r instanceof BucketRegion || r instanceof DistributedRegion); |
| TXRegionState rs = tx.readRegion(r); |
| for (Object key : rs.getEntryKeys()) { |
| TXEntryState es = rs.readEntry(key); |
| assertNotNull(es.getValue(key, r, false)); |
| assertTrue("key:"+key+" r:"+r.getFullPath(), es.isDirty()); |
| } |
| } |
| return null; |
| } |
| }); |
| accessor.invoke(new SerializableCallable() { |
| public Object call() throws Exception { |
| TXManagerImpl mgr = getGemfireCache().getTxManager(); |
| mgr.commit(); |
| Region<CustId, Customer> cust = getGemfireCache().getRegion(CUSTOMER); |
| Region<CustId, Customer> rr = getGemfireCache().getRegion(D_REFERENCE); |
| assertEquals(updatedCust, cust.get(custId)); |
| assertEquals(updatedCust, rr.get(custId)); |
| //check conflict |
| mgr.begin(); |
| CustId conflictCust = new CustId(2); |
| Customer customer = new Customer("customer2", "address2"); |
| getGemfireCache().getLoggerI18n().fine("SWAP:removeConflict"); |
| assertTrue(cust.replace(conflictCust, customer, new Customer("conflict", "conflict"))); |
| TXStateProxy tx = mgr.internalSuspend(); |
| cust.put(conflictCust, new Customer("foo", "bar")); |
| mgr.resume(tx); |
| try { |
| mgr.commit(); |
| fail("expected exception not thrown"); |
| } catch (CommitConflictException e) { |
| } |
| return null; |
| } |
| }); |
| } |
| |
| |
| /** |
| * When we have narrowed down on a target node for a transaction, test that |
| * we throw an exception if that node does not host primary for subsequent entries |
| */ |
| public void testNonColocatedTX() { |
| Host host = Host.getHost(0); |
| VM accessor = host.getVM(0); |
| VM datastore1 = host.getVM(1); |
| VM datastore2 = host.getVM(2); |
| |
| datastore2.invoke(new SerializableCallable() { |
| public Object call() throws Exception { |
| createRegion(false, 1, null); |
| return null; |
| } |
| }); |
| |
| initAccessorAndDataStore(accessor, datastore1, 1); |
| |
| accessor.invoke(new SerializableCallable() { |
| public Object call() throws Exception { |
| Region<CustId, Customer> custRegion = getCache().getRegion(CUSTOMER); |
| Region<OrderId, Order> orderRegion = getCache().getRegion(ORDER); |
| CacheTransactionManager mgr = getGemfireCache().getTxManager(); |
| mgr.begin(); |
| try { |
| put10Entries(custRegion, orderRegion); |
| fail("Expected TransactionDataNotColocatedException not thrown"); |
| } catch (TransactionDataNotColocatedException e) { |
| } |
| mgr.rollback(); |
| put10Entries(custRegion, orderRegion); |
| |
| mgr.begin(); |
| try { |
| put10Entries(custRegion, orderRegion); |
| fail("Expected TransactionDataNotColocatedException not thrown"); |
| } catch (TransactionDataNotColocatedException e) { |
| } |
| mgr.rollback(); |
| return null; |
| } |
| private void put10Entries(Region custRegion, Region orderRegion) { |
| for (int i=0; i<10; i++) { |
| CustId custId = new CustId(i); |
| Customer customer = new Customer("customer"+i, "address"+i); |
| OrderId orderId = new OrderId(i, custId); |
| Order order = new Order("order"+i); |
| custRegion.put(custId, customer); |
| orderRegion.put(orderId, order); |
| } |
| } |
| }); |
| } |
| |
| public void testListenersForPut() { |
| doTestListeners(OP.PUT); |
| } |
| |
| public void testListenersForDestroy() { |
| doTestListeners(OP.DESTROY); |
| } |
| |
| public void testListenersForInvalidate() { |
| doTestListeners(OP.INVALIDATE); |
| } |
| |
| public void testListenersForRemoveAll() { |
| doTestListeners(OP.REMOVEALL); |
| } |
| |
| private void doTestListeners(final OP op) { |
| Host host = Host.getHost(0); |
| VM acc = host.getVM(0); |
| VM datastore = host.getVM(1); |
| initAccessorAndDataStore(acc, datastore, 0); |
| VM accessor = getVMForTransactions(acc, datastore); |
| |
| accessor.invoke(new SerializableCallable() { |
| public Object call() throws Exception { |
| Region ref = getCache().getRegion(D_REFERENCE); |
| ref.getAttributesMutator().addCacheListener(new TestCacheListener(true)); |
| ref.getAttributesMutator().setCacheWriter(new TestCacheWriter(true)); |
| Region cust = getCache().getRegion(CUSTOMER); |
| cust.getAttributesMutator().addCacheListener(new TestCacheListener(true)); |
| cust.getAttributesMutator().setCacheWriter(new TestCacheWriter(true)); |
| Region order = getCache().getRegion(ORDER); |
| order.getAttributesMutator().addCacheListener(new TestCacheListener(true)); |
| order.getAttributesMutator().setCacheWriter(new TestCacheWriter(true)); |
| getGemfireCache().getTxManager().addListener(new TestTxListener(true)); |
| if (!getGemfireCache().isClient()) { |
| getGemfireCache().getTxManager().setWriter(new TestTxWriter(true)); |
| } |
| return null; |
| } |
| }); |
| |
| SerializableCallable addListenersToDataStore = new SerializableCallable() { |
| public Object call() throws Exception { |
| Region ref = getCache().getRegion(D_REFERENCE); |
| ref.getAttributesMutator().addCacheListener(new TestCacheListener(false)); |
| ref.getAttributesMutator().setCacheWriter(new TestCacheWriter(false)); |
| Region cust = getCache().getRegion(CUSTOMER); |
| cust.getAttributesMutator().addCacheListener(new TestCacheListener(false)); |
| cust.getAttributesMutator().setCacheWriter(new TestCacheWriter(false)); |
| Region order = getCache().getRegion(ORDER); |
| order.getAttributesMutator().addCacheListener(new TestCacheListener(false)); |
| order.getAttributesMutator().setCacheWriter(new TestCacheWriter(false)); |
| getGemfireCache().getTxManager().addListener(new TestTxListener(false)); |
| if (!getGemfireCache().isClient()) { |
| getGemfireCache().getTxManager().setWriter(new TestTxWriter(false)); |
| } |
| return null; |
| } |
| }; |
| datastore.invoke(addListenersToDataStore); |
| |
| accessor.invoke(new DoOpsInTX(op)); |
| |
| //Invalidate operations don't fire cache writers, so don't assert they were fired. |
| if(op != OP.INVALIDATE) { |
| //Ensure the cache writer was not fired in accessor |
| accessor.invoke(new SerializableCallable() { |
| public Object call() throws Exception { |
| Region cust = getCache().getRegion(CUSTOMER); |
| assertFalse(((TestCacheWriter) cust.getAttributes().getCacheWriter()).wasFired); |
| Region order = getCache().getRegion(ORDER); |
| assertFalse(((TestCacheWriter) order.getAttributes().getCacheWriter()).wasFired); |
| Region ref = getCache().getRegion(D_REFERENCE); |
| assertFalse(((TestCacheWriter) ref.getAttributes().getCacheWriter()).wasFired); |
| return null; |
| } |
| }); |
| |
| //Ensure the cache writer was fired in the primary |
| datastore.invoke(new SerializableCallable() { |
| public Object call() throws Exception { |
| Region cust = getCache().getRegion(CUSTOMER); |
| assertTrue(((TestCacheWriter) cust.getAttributes().getCacheWriter()).wasFired); |
| Region order = getCache().getRegion(ORDER); |
| assertTrue(((TestCacheWriter) order.getAttributes().getCacheWriter()).wasFired); |
| Region ref = getCache().getRegion(D_REFERENCE); |
| assertTrue(((TestCacheWriter) ref.getAttributes().getCacheWriter()).wasFired); |
| return null; |
| } |
| }); |
| } |
| |
| accessor.invoke(new SerializableCallable() { |
| public Object call() throws Exception { |
| CacheTransactionManager mgr = getGemfireCache().getTxManager(); |
| mgr.commit(); |
| return null; |
| } |
| }); |
| |
| datastore.invoke(new SerializableCallable() { |
| public Object call() throws Exception { |
| TestTxListener l = (TestTxListener)getGemfireCache().getTxManager().getListener(); |
| assertTrue(l.isListenerInvoked()); |
| return null; |
| } |
| }); |
| SerializableCallable verifyListeners = new SerializableCallable() { |
| public Object call() throws Exception { |
| Region cust = getCache().getRegion(CUSTOMER); |
| Region order = getCache().getRegion(ORDER); |
| throwListenerException(cust); |
| throwListenerException(order); |
| throwWriterException(cust); |
| throwWriterException(order); |
| if (!getGemfireCache().isClient()) { |
| throwTransactionCallbackException(); |
| } |
| return null; |
| } |
| private void throwTransactionCallbackException() throws Exception { |
| TestTxListener l = (TestTxListener)getGemfireCache().getTxManager().getListener(); |
| if (l.ex != null) { |
| throw l.ex; |
| } |
| TestTxWriter w = (TestTxWriter)getGemfireCache().getTxManager().getWriter(); |
| if (w.ex != null) { |
| throw w.ex; |
| } |
| } |
| private void throwListenerException(Region r) throws Exception { |
| Exception e = null; |
| CacheListener listener = r.getAttributes().getCacheListeners()[0]; |
| if (listener instanceof TestCacheListener) { |
| e = ((TestCacheListener)listener).ex; |
| } else { |
| // e = ((ClientListener)listener).??? |
| } |
| if (e != null) { |
| throw e; |
| } |
| } |
| private void throwWriterException(Region r) throws Exception { |
| Exception e = null; |
| CacheListener listener = r.getAttributes().getCacheListeners()[0]; |
| if (listener instanceof TestCacheListener) { |
| e = ((TestCacheListener)listener).ex; |
| } else { |
| // e = ((ClientListener)listener).??? |
| } |
| if (e != null) { |
| throw e; |
| } |
| } |
| }; |
| accessor.invoke(verifyListeners); |
| datastore.invoke(verifyListeners); |
| } |
| |
| abstract class CacheCallback { |
| protected boolean isAccessor; |
| protected Exception ex = null; |
| protected void verifyOrigin(EntryEvent event) { |
| try { |
| assertEquals(!isAccessor, event.isOriginRemote()); |
| } catch (Exception e) { |
| ex = e; |
| } |
| } |
| protected void verifyPutAll(EntryEvent event) { |
| CustId knownCustId = new CustId(1); |
| OrderId knownOrderId = new OrderId(2, knownCustId); |
| if (event.getKey().equals(knownOrderId)) { |
| try { |
| assertTrue(event.getOperation().isPutAll()); |
| assertNotNull(event.getTransactionId()); |
| } catch (Exception e) { |
| ex = e; |
| } |
| } |
| } |
| |
| } |
| |
| class TestCacheListener extends CacheCallback implements CacheListener { |
| TestCacheListener(boolean isAccessor) { |
| this.isAccessor = isAccessor; |
| } |
| public void afterCreate(EntryEvent event) { |
| verifyOrigin(event); |
| verifyPutAll(event); |
| } |
| public void afterUpdate(EntryEvent event) { |
| verifyOrigin(event); |
| verifyPutAll(event); |
| } |
| public void afterDestroy(EntryEvent event) { |
| verifyOrigin(event); |
| } |
| public void afterInvalidate(EntryEvent event) { |
| verifyOrigin(event); |
| } |
| public void afterRegionClear(RegionEvent event) { |
| } |
| public void afterRegionCreate(RegionEvent event) { |
| } |
| public void afterRegionDestroy(RegionEvent event) { |
| } |
| public void afterRegionInvalidate(RegionEvent event) { |
| } |
| public void afterRegionLive(RegionEvent event) { |
| } |
| public void close() { |
| } |
| } |
| |
| class TestCacheWriter extends CacheCallback implements CacheWriter { |
| |
| private volatile boolean wasFired = false; |
| |
| TestCacheWriter(boolean isAccessor) { |
| this.isAccessor = isAccessor; |
| } |
| public void beforeCreate(EntryEvent event) throws CacheWriterException { |
| getGemfireCache().getLogger().info("SWAP:beforeCreate:"+event+" op:"+event.getOperation()); |
| verifyOrigin(event); |
| verifyPutAll(event); |
| setFired(event); |
| } |
| public void setFired(EntryEvent event) { |
| wasFired = true; |
| } |
| public void beforeUpdate(EntryEvent event) throws CacheWriterException { |
| getGemfireCache().getLogger().info("SWAP:beforeCreate:"+event+" op:"+event.getOperation()); |
| verifyOrigin(event); |
| verifyPutAll(event); |
| setFired(event); |
| } |
| public void beforeDestroy(EntryEvent event) throws CacheWriterException { |
| verifyOrigin(event); |
| setFired(event); |
| } |
| public void beforeRegionClear(RegionEvent event) |
| throws CacheWriterException { |
| setFired(null); |
| } |
| public void beforeRegionDestroy(RegionEvent event) |
| throws CacheWriterException { |
| setFired(null); |
| } |
| public void close() { |
| } |
| } |
| |
| abstract class txCallback { |
| protected boolean isAccessor; |
| protected Exception ex = null; |
| protected void verify(TransactionEvent txEvent) { |
| for (CacheEvent e : txEvent.getEvents()) { |
| verifyOrigin(e); |
| verifyPutAll(e); |
| } |
| } |
| private void verifyOrigin(CacheEvent event) { |
| try { |
| assertEquals(true, event.isOriginRemote()); //change to !isAccessor after fixing #41498 |
| } catch (Exception e) { |
| ex = e; |
| } |
| } |
| private void verifyPutAll(CacheEvent p_event) { |
| if (!(p_event instanceof EntryEvent)) { |
| return; |
| } |
| EntryEvent event = (EntryEvent)p_event; |
| CustId knownCustId = new CustId(1); |
| OrderId knownOrderId = new OrderId(2, knownCustId); |
| if (event.getKey().equals(knownOrderId)) { |
| try { |
| assertTrue(event.getOperation().isPutAll()); |
| assertNotNull(event.getTransactionId()); |
| } catch (Exception e) { |
| ex = e; |
| } |
| } |
| } |
| } |
| |
| class TestTxListener extends txCallback implements TransactionListener { |
| private boolean listenerInvoked; |
| TestTxListener(boolean isAccessor) { |
| this.isAccessor = isAccessor; |
| } |
| public void afterCommit(TransactionEvent event) { |
| listenerInvoked = true; |
| verify(event); |
| } |
| public void afterFailedCommit(TransactionEvent event) { |
| verify(event); |
| } |
| public void afterRollback(TransactionEvent event) { |
| listenerInvoked = true; |
| verify(event); |
| } |
| public boolean isListenerInvoked() { |
| return this.listenerInvoked; |
| } |
| public void close() { |
| } |
| } |
| |
| class TestTxWriter extends txCallback implements TransactionWriter { |
| public TestTxWriter(boolean isAccessor) { |
| this.isAccessor = isAccessor; |
| } |
| public void beforeCommit(TransactionEvent event) { |
| verify(event); |
| } |
| public void close() { |
| } |
| } |
| |
| public void testRemoteExceptionThrown() { |
| Host host = Host.getHost(0); |
| VM acc = host.getVM(0); |
| VM datastore = host.getVM(1); |
| initAccessorAndDataStore(acc, datastore, 0); |
| VM accessor = getVMForTransactions(acc, datastore); |
| |
| datastore.invoke(new SerializableCallable() { |
| public Object call() throws Exception { |
| getGemfireCache().getTxManager().setWriter(new TransactionWriter() { |
| public void close() { |
| } |
| public void beforeCommit(TransactionEvent event) |
| throws TransactionWriterException { |
| throw new TransactionWriterException("TestException"); |
| } |
| }); |
| return null; |
| } |
| }); |
| |
| accessor.invoke(new SerializableCallable() { |
| public Object call() throws Exception { |
| getGemfireCache().getTxManager().begin(); |
| Region r = getCache().getRegion(CUSTOMER); |
| r.put(new CustId(8), new Customer("name8", "address8")); |
| try { |
| getGemfireCache().getTxManager().commit(); |
| fail("Expected exception not thrown"); |
| } catch (Exception e) { |
| assertEquals("TestException", e.getCause().getMessage()); |
| } |
| return null; |
| } |
| }); |
| } |
| |
| public void testSize() { |
| Host host = Host.getHost(0); |
| VM accessor = host.getVM(0); |
| VM datastore1 = host.getVM(1); |
| VM datastore2 = host.getVM(2); |
| initAccessorAndDataStore(accessor, datastore1, datastore2, 0); |
| |
| accessor.invoke(new SerializableCallable() { |
| public Object call() throws Exception { |
| Region custRegion = getCache().getRegion(CUSTOMER); |
| TXManagerImpl mgr = getGemfireCache().getTxManager(); |
| mgr.begin(); |
| assertEquals(5, custRegion.size()); |
| assertNotNull(mgr.getTXState()); |
| return null; |
| } |
| }); |
| datastore1.invoke(verifyNoTxState); |
| datastore2.invoke(verifyNoTxState); |
| |
| accessor.invoke(new SerializableCallable() { |
| public Object call() throws Exception { |
| Region custRegion = getCache().getRegion(CUSTOMER); |
| Region orderRegion = getCache().getRegion(ORDER); |
| TXManagerImpl mgr = getGemfireCache().getTxManager(); |
| assertNotNull(mgr.getTXState()); |
| CustId custId = new CustId(5); |
| OrderId orderId = new OrderId(5, custId); |
| custRegion.put(custId, new Customer("customer5", "address5")); |
| orderRegion.put(orderId, new Order("order5")); |
| assertEquals(6, custRegion.size()); |
| return mgr.getTransactionId(); |
| } |
| }); |
| final Integer txOnDatastore1 = (Integer)datastore1.invoke(getNumberOfTXInProgress); |
| final Integer txOnDatastore2 = (Integer)datastore2.invoke(getNumberOfTXInProgress); |
| assertEquals(1, txOnDatastore1+txOnDatastore2); |
| |
| accessor.invoke(new SerializableCallable() { |
| public Object call() throws Exception { |
| CacheTransactionManager mgr = getGemfireCache().getTxManager(); |
| mgr.commit(); |
| return null; |
| } |
| }); |
| |
| datastore1.invoke(verifyNoTxState); |
| datastore2.invoke(verifyNoTxState); |
| |
| |
| final Integer txOnDatastore1_1 = (Integer)datastore1.invoke(getNumberOfTXInProgress); |
| final Integer txOnDatastore2_2 = (Integer)datastore2.invoke(getNumberOfTXInProgress); |
| assertEquals(0, txOnDatastore1_1.intValue()); |
| assertEquals(0, txOnDatastore2_2.intValue()); |
| } |
| |
| public void testKeysIterator() { |
| doTestIterator(OP.KEYS, 0, OP.PUT); |
| } |
| |
| public void testValuesIterator() { |
| doTestIterator(OP.VALUES, 0, OP.PUT); |
| } |
| |
| public void testEntriesIterator() { |
| doTestIterator(OP.ENTRIES, 0, OP.PUT); |
| } |
| |
| public void testKeysIterator1() { |
| doTestIterator(OP.KEYS, 1, OP.PUT); |
| } |
| |
| public void testValuesIterator1() { |
| doTestIterator(OP.VALUES, 1, OP.PUT); |
| } |
| |
| public void testEntriesIterator1() { |
| doTestIterator(OP.ENTRIES, 1, OP.PUT); |
| } |
| |
| public void testKeysIteratorOnDestroy() { |
| doTestIterator(OP.KEYS, 0, OP.DESTROY); |
| } |
| |
| public void testValuesIteratorOnDestroy() { |
| doTestIterator(OP.VALUES, 0, OP.DESTROY); |
| } |
| |
| public void testEntriesIteratorOnDestroy() { |
| doTestIterator(OP.ENTRIES, 0, OP.DESTROY); |
| } |
| |
| public void testKeysIterator1OnDestroy() { |
| doTestIterator(OP.KEYS, 1, OP.DESTROY); |
| } |
| |
| public void testValuesIterator1OnDestroy() { |
| doTestIterator(OP.VALUES, 1, OP.DESTROY); |
| } |
| |
| public void testEntriesIterator1OnDestroy() { |
| doTestIterator(OP.ENTRIES, 1, OP.DESTROY); |
| } |
| |
| private void doTestIterator(final OP iteratorType, final int redundancy, final OP op) { |
| Host host = Host.getHost(0); |
| VM accessor = host.getVM(0); |
| VM datastore1 = host.getVM(1); |
| VM datastore2 = host.getVM(2); |
| initAccessorAndDataStore(accessor, datastore1, datastore2, redundancy); |
| |
| accessor.invoke(new SerializableCallable() { |
| public Object call() throws Exception { |
| Region custRegion = getCache().getRegion(CUSTOMER); |
| Set originalSet; |
| TXManagerImpl mgr = getGemfireCache().getTxManager(); |
| mgr.begin(); |
| switch (iteratorType) { |
| case KEYS: |
| originalSet = getCustIdSet(5); |
| assertTrue(originalSet.containsAll(custRegion.keySet())); |
| assertEquals(5, custRegion.keySet().size()); |
| break; |
| case VALUES: |
| originalSet = getCustomerSet(5); |
| assertTrue(originalSet.containsAll(custRegion.values())); |
| assertEquals(5, custRegion.values().size()); |
| break; |
| case ENTRIES: |
| Set originalKeySet = getCustIdSet(5); |
| Set originalValueSet = getCustomerSet(5); |
| Set entrySet = new HashSet(); |
| Region.Entry entry; |
| for (Iterator it = custRegion.entrySet().iterator(); it.hasNext();) { |
| entrySet.add(it.next()); |
| } |
| for (Iterator it = entrySet.iterator(); it.hasNext();) { |
| entry = (Entry)it.next(); |
| assertTrue(originalKeySet.contains(entry.getKey())); |
| assertTrue(originalValueSet.contains(entry.getValue())); |
| } |
| assertEquals(5, custRegion.entrySet().size()); |
| break; |
| default: |
| throw new IllegalArgumentException(); |
| } |
| assertNotNull(mgr.getTXState()); |
| return null; |
| } |
| }); |
| datastore1.invoke(verifyNoTxState); |
| datastore2.invoke(verifyNoTxState); |
| |
| accessor.invoke(new SerializableCallable() { |
| public Object call() throws Exception { |
| Region custRegion = getCache().getRegion(CUSTOMER); |
| Region orderRegion = getCache().getRegion(ORDER); |
| |
| TXManagerImpl mgr = getGemfireCache().getTxManager(); |
| assertNotNull(mgr.getTXState()); |
| int expectedSetSize = 0; |
| switch (op) { |
| case PUT: |
| CustId custId = new CustId(5); |
| OrderId orderId = new OrderId(5, custId); |
| custRegion.put(custId, new Customer("customer5", "address5")); |
| orderRegion.put(orderId, new Order("order5")); |
| expectedSetSize = 6; |
| break; |
| case DESTROY: |
| CustId custId1 = new CustId(4); |
| OrderId orderId1 = new OrderId(4, custId1); |
| custRegion.destroy(custId1); |
| orderRegion.destroy(orderId1); |
| expectedSetSize = 4; |
| break; |
| default: |
| throw new IllegalStateException(); |
| } |
| |
| Set expectedSet; |
| switch (iteratorType) { |
| case KEYS: |
| expectedSet = getCustIdSet(expectedSetSize); |
| assertTrue(expectedSet.containsAll(custRegion.keySet())); |
| assertEquals(expectedSetSize, custRegion.keySet().size()); |
| break; |
| case VALUES: |
| expectedSet = getCustomerSet(expectedSetSize); |
| assertTrue(expectedSet.containsAll(custRegion.values())); |
| assertEquals(expectedSetSize, custRegion.values().size()); |
| break; |
| case ENTRIES: |
| Set originalKeySet = getCustIdSet(expectedSetSize); |
| Set originalValueSet = getCustomerSet(expectedSetSize); |
| Set entrySet = new HashSet(); |
| Region.Entry entry; |
| for (Iterator it = custRegion.entrySet().iterator(); it.hasNext();) { |
| entrySet.add(it.next()); |
| } |
| for (Iterator it = entrySet.iterator(); it.hasNext();) { |
| entry = (Entry)it.next(); |
| assertTrue(originalKeySet.contains(entry.getKey())); |
| assertTrue(originalValueSet.contains(entry.getValue())); |
| } |
| assertEquals(expectedSetSize, custRegion.entrySet().size()); |
| break; |
| default: |
| throw new IllegalArgumentException(); |
| } |
| |
| return null; |
| } |
| }); |
| final Integer txOnDatastore1 = (Integer)datastore1.invoke(getNumberOfTXInProgress); |
| final Integer txOnDatastore2 = (Integer)datastore2.invoke(getNumberOfTXInProgress); |
| assertEquals(1, txOnDatastore1+txOnDatastore2); |
| |
| accessor.invoke(new SerializableCallable() { |
| public Object call() throws Exception { |
| CacheTransactionManager mgr = getGemfireCache().getTxManager(); |
| mgr.commit(); |
| return null; |
| } |
| }); |
| final Integer txOnDatastore1_1 = (Integer)datastore1.invoke(getNumberOfTXInProgress); |
| final Integer txOnDatastore2_2 = (Integer)datastore2.invoke(getNumberOfTXInProgress); |
| assertEquals(0, txOnDatastore1_1+txOnDatastore2_2); |
| |
| datastore1.invoke(new SerializableCallable() { |
| CustId custId; |
| Customer customer; |
| PartitionedRegion custRegion; |
| int originalSetSize; |
| int expectedSetSize; |
| |
| public Object call() throws Exception { |
| TXManagerImpl mgr = getGemfireCache().getTXMgr(); |
| custRegion = (PartitionedRegion)getGemfireCache().getRegion(CUSTOMER); |
| mgr.begin(); |
| doLocalOp(); |
| Set expectedSet; |
| switch (iteratorType) { |
| case KEYS: |
| expectedSet = getExpectedCustIdSet(); |
| assertEquals(expectedSet, custRegion.keySet()); |
| assertEquals(expectedSetSize, custRegion.keySet().size()); |
| break; |
| case VALUES: |
| expectedSet = getExpectedCustomerSet(); |
| assertEquals(expectedSet, custRegion.values()); |
| assertEquals(expectedSetSize, custRegion.values().size()); |
| break; |
| case ENTRIES: |
| Set originalKeySet = getExpectedCustIdSet(); |
| Set originalValueSet = getExpectedCustomerSet(); |
| Set entrySet = new HashSet(); |
| Region.Entry entry; |
| for (Iterator it = custRegion.entrySet().iterator(); it.hasNext();) { |
| entrySet.add(it.next()); |
| } |
| for (Iterator it = entrySet.iterator(); it.hasNext();) { |
| entry = (Entry)it.next(); |
| assertTrue(originalKeySet.contains(entry.getKey())); |
| assertTrue(originalValueSet.contains(entry.getValue())); |
| } |
| assertEquals(expectedSetSize, custRegion.entrySet().size()); |
| break; |
| default: |
| throw new IllegalArgumentException(); |
| } |
| mgr.commit(); |
| return null; |
| } |
| |
| private void doLocalOp() { |
| switch (op) { |
| case PUT: |
| for (int i=6;;i++) { |
| custId = new CustId(i); |
| customer = new Customer("customer"+i, "address"+i); |
| int bucketId = PartitionedRegionHelper.getHashKey(custRegion, custId); |
| InternalDistributedMember primary = custRegion.getBucketPrimary(bucketId); |
| if (primary.equals(getGemfireCache().getMyId())) { |
| custRegion.put(custId, customer); |
| break; |
| } |
| } |
| originalSetSize = 6; |
| expectedSetSize = 7; |
| break; |
| case DESTROY: |
| for (int i=3;; i--) { |
| custId = new CustId(i); |
| customer = new Customer("customer"+i, "address"+i); |
| int bucketId = PartitionedRegionHelper.getHashKey(custRegion, custId); |
| InternalDistributedMember primary = custRegion.getBucketPrimary(bucketId); |
| if (primary.equals(getGemfireCache().getMyId())) { |
| custRegion.destroy(custId); |
| break; |
| } |
| } |
| originalSetSize = 4; |
| expectedSetSize = 3; |
| break; |
| default: |
| throw new IllegalStateException(); |
| } |
| } |
| |
| private Set getExpectedCustIdSet() { |
| Set retVal = getCustIdSet(originalSetSize); |
| switch (op) { |
| case PUT: |
| retVal.add(custId); |
| break; |
| case DESTROY: |
| retVal.remove(custId); |
| break; |
| default: |
| throw new IllegalStateException(); |
| } |
| return retVal; |
| } |
| |
| private Set getExpectedCustomerSet() { |
| Set retVal = getCustomerSet(originalSetSize); |
| switch (op) { |
| case PUT: |
| retVal.add(customer); |
| break; |
| case DESTROY: |
| retVal.remove(customer); |
| break; |
| default: |
| throw new IllegalStateException(); |
| } |
| return retVal; |
| } |
| |
| }); |
| } |
| |
| public void testKeyIterationOnRR() { |
| Host host = Host.getHost(0); |
| VM accessor = host.getVM(0); |
| VM datastore1 = host.getVM(1); |
| VM datastore2 = host.getVM(2); |
| initAccessorAndDataStore(accessor, datastore1, datastore2, 0); |
| |
| accessor.invoke(new SerializableCallable() { |
| public Object call() throws Exception { |
| Region custRegion = getGemfireCache().getRegion(CUSTOMER); |
| Region rr = getGemfireCache().getRegion(D_REFERENCE); |
| TXManagerImpl mgr = getGemfireCache().getTxManager(); |
| mgr.begin(); |
| CustId custId = new CustId(5); |
| Customer customer = new Customer("customer5", "address5"); |
| custRegion.put(custId, customer); |
| Set set = rr.keySet(); |
| Iterator it = set.iterator(); |
| int i=0; |
| while (it.hasNext()) { |
| i++; |
| it.next(); |
| } |
| assertEquals(5, i); |
| assertTrue(getCustIdSet(5).equals(set)); |
| assertEquals(5, rr.keySet().size()); |
| rr.put(custId, customer); |
| set = rr.keySet(); |
| assertTrue(getCustIdSet(6).equals(set)); |
| it = set.iterator(); |
| i=0; |
| while (it.hasNext()) { |
| i++; |
| it.next(); |
| } |
| assertEquals(6, i); |
| assertEquals(6, rr.keySet().size()); |
| assertNotNull(rr.get(custId)); |
| TXStateProxy tx = mgr.internalSuspend(); |
| assertEquals(getCustIdSet(5), rr.keySet()); |
| assertEquals(5, rr.keySet().size()); |
| assertNull(rr.get(custId)); |
| mgr.resume(tx); |
| mgr.commit(); |
| return null; |
| } |
| }); |
| } |
| |
| public void testValuesIterationOnRR() { |
| Host host = Host.getHost(0); |
| VM accessor = host.getVM(0); |
| VM datastore1 = host.getVM(1); |
| VM datastore2 = host.getVM(2); |
| initAccessorAndDataStore(accessor, datastore1, datastore2, 0); |
| |
| accessor.invoke(new SerializableCallable() { |
| public Object call() throws Exception { |
| Region custRegion = getGemfireCache().getRegion(CUSTOMER); |
| Region rr = getGemfireCache().getRegion(D_REFERENCE); |
| TXManagerImpl mgr = getGemfireCache().getTxManager(); |
| mgr.begin(); |
| CustId custId = new CustId(5); |
| Customer customer = new Customer("customer5", "address5"); |
| custRegion.put(custId, customer); |
| Set set = (Set)rr.values(); |
| Iterator it = set.iterator(); |
| int i=0; |
| while (it.hasNext()) { |
| i++; |
| it.next(); |
| } |
| assertEquals(5, i); |
| assertTrue(getCustomerSet(5).equals(set)); |
| assertEquals(5, rr.values().size()); |
| rr.put(custId, customer); |
| set = (Set)rr.values(); |
| assertTrue(getCustomerSet(6).equals(set)); |
| it = set.iterator(); |
| i=0; |
| while (it.hasNext()) { |
| i++; |
| it.next(); |
| } |
| assertEquals(6, i); |
| assertEquals(6, rr.values().size()); |
| assertNotNull(rr.get(custId)); |
| TXStateProxy tx = mgr.internalSuspend(); |
| assertEquals(getCustomerSet(5), rr.values()); |
| assertEquals(5, rr.values().size()); |
| assertNull(rr.get(custId)); |
| mgr.resume(tx); |
| mgr.commit(); |
| return null; |
| } |
| }); |
| } |
| |
| public void testEntriesIterationOnRR() { |
| Host host = Host.getHost(0); |
| VM accessor = host.getVM(0); |
| VM datastore1 = host.getVM(1); |
| VM datastore2 = host.getVM(2); |
| initAccessorAndDataStore(accessor, datastore1, datastore2, 0); |
| |
| accessor.invoke(new SerializableCallable() { |
| public Object call() throws Exception { |
| Region custRegion = getGemfireCache().getRegion(CUSTOMER); |
| Region rr = getGemfireCache().getRegion(D_REFERENCE); |
| TXManagerImpl mgr = getGemfireCache().getTxManager(); |
| mgr.begin(); |
| CustId custId = new CustId(5); |
| Customer customer = new Customer("customer5", "address5"); |
| custRegion.put(custId, customer); |
| Set set = rr.entrySet(); |
| Iterator it = set.iterator(); |
| int i=0; |
| while (it.hasNext()) { |
| i++; |
| it.next(); |
| } |
| assertEquals(5, i); |
| //assertTrue(getCustIdSet(5).equals(set)); |
| assertEquals(5, rr.entrySet().size()); |
| rr.put(custId, customer); |
| set = rr.entrySet(); |
| //assertTrue(getCustIdSet(6).equals(set)); |
| it = set.iterator(); |
| i=0; |
| while (it.hasNext()) { |
| i++; |
| it.next(); |
| } |
| assertEquals(6, i); |
| assertEquals(6, rr.entrySet().size()); |
| assertNotNull(rr.get(custId)); |
| TXStateProxy tx = mgr.internalSuspend(); |
| //assertEquals(getCustIdSet(5), rr.entrySet()); |
| assertEquals(5, rr.entrySet().size()); |
| assertNull(rr.get(custId)); |
| mgr.resume(tx); |
| mgr.commit(); |
| return null; |
| } |
| }); |
| } |
| |
| public void testIllegalIteration() { |
| Host host = Host.getHost(0); |
| VM accessor = host.getVM(0); |
| VM datastore1 = host.getVM(1); |
| VM datastore2 = host.getVM(2); |
| initAccessorAndDataStore(accessor, datastore1, datastore2, 0); |
| |
| SerializableCallable doIllegalIteration = new SerializableCallable() { |
| public Object call() throws Exception { |
| Region r = getGemfireCache().getRegion(CUSTOMER); |
| Set keySet = r.keySet(); |
| Set entrySet = r.entrySet(); |
| Set valueSet = (Set)r.values(); |
| CacheTransactionManager mgr = getGemfireCache().getTXMgr(); |
| mgr.begin(); |
| // now we allow for using non-TX iterators in TX context |
| try { |
| keySet.size(); |
| fail("Expected exception not thrown"); |
| } catch (IllegalStateException expected) { |
| //ignore |
| } |
| try { |
| entrySet.size(); |
| fail("Expected exception not thrown"); |
| } catch (IllegalStateException expected) { |
| //ignore |
| } |
| try { |
| valueSet.size(); |
| fail("Expected exception not thrown"); |
| } catch (IllegalStateException expected) { |
| //ignore |
| } |
| try { |
| keySet.iterator(); |
| fail("Expected exception not thrown"); |
| } catch (IllegalStateException expected) { |
| //ignore |
| } |
| try { |
| entrySet.iterator(); |
| fail("Expected exception not thrown"); |
| } catch (IllegalStateException expected) { |
| //ignore |
| } |
| try { |
| valueSet.iterator(); |
| fail("Expected exception not thrown"); |
| } catch (IllegalStateException expected) { |
| //ignore |
| } |
| |
| // TX iterators |
| keySet = r.keySet(); |
| entrySet = r.entrySet(); |
| valueSet = (Set)r.values(); |
| mgr.commit(); |
| // don't allow for TX iterator after TX has committed |
| try { |
| keySet.size(); |
| fail("Expected exception not thrown"); |
| } catch (IllegalStateException expected) { |
| //ignore |
| } |
| try { |
| entrySet.size(); |
| fail("Expected exception not thrown"); |
| } catch (IllegalStateException expected) { |
| //ignore |
| } |
| try { |
| valueSet.size(); |
| fail("Expected exception not thrown"); |
| } catch (IllegalStateException expected) { |
| //ignore |
| } |
| try { |
| keySet.iterator(); |
| fail("Expected exception not thrown"); |
| } catch (IllegalStateException expected) { |
| //ignore |
| } |
| try { |
| entrySet.iterator(); |
| fail("Expected exception not thrown"); |
| } catch (IllegalStateException expected) { |
| //ignore |
| } |
| try { |
| valueSet.iterator(); |
| fail("Expected exception not thrown"); |
| } catch (IllegalStateException expected) { |
| //ignore |
| } |
| return null; |
| } |
| }; |
| |
| accessor.invoke(doIllegalIteration); |
| datastore1.invoke(doIllegalIteration); |
| } |
| |
| final CustId expectedCustId = new CustId(6); |
| final Customer expectedCustomer = new Customer("customer6", "address6"); |
| class TXFunction implements Function { |
| static final String id = "TXFunction"; |
| public void execute(FunctionContext context) { |
| Region r = null; |
| r = getGemfireCache().getRegion(CUSTOMER); |
| getGemfireCache().getLogger().fine("SWAP:callingPut"); |
| r.put(expectedCustId, expectedCustomer); |
| GemFireCacheImpl.getInstance().getLogger().warning(" XXX DOIN A PUT ",new Exception()); |
| context.getResultSender().lastResult(Boolean.TRUE); |
| } |
| public String getId() { |
| return id; |
| } |
| public boolean hasResult() { |
| return true; |
| } |
| public boolean optimizeForWrite() { |
| return true; |
| } |
| public boolean isHA() { |
| return false; |
| } |
| } |
| |
| enum Executions { |
| OnRegion, |
| OnMember |
| } |
| |
| public void testTxFunctionOnRegion() { |
| doTestTxFunction(Executions.OnRegion); |
| } |
| |
| public void testTxFunctionOnMember() { |
| doTestTxFunction(Executions.OnMember); |
| } |
| |
| private void doTestTxFunction(final Executions e) { |
| Host host = Host.getHost(0); |
| VM accessor = host.getVM(0); |
| VM datastore1 = host.getVM(1); |
| VM datastore2 = host.getVM(2); |
| initAccessorAndDataStore(accessor, datastore1, datastore2, 0); |
| |
| SerializableCallable registerFunction = new SerializableCallable() { |
| public Object call() throws Exception { |
| FunctionService.registerFunction(new TXFunction()); |
| return null; |
| } |
| }; |
| |
| accessor.invoke(registerFunction); |
| datastore1.invoke(registerFunction); |
| datastore2.invoke(registerFunction); |
| |
| accessor.invoke(new SerializableCallable() { |
| public Object call() throws Exception { |
| PartitionedRegion custRegion = (PartitionedRegion)getGemfireCache().getRegion(CUSTOMER); |
| TXManagerImpl mgr = getGemfireCache().getTXMgr(); |
| Set regions = new HashSet(); |
| regions.add(custRegion); |
| regions.add(getGemfireCache().getRegion(ORDER)); |
| mgr.begin(); |
| try { |
| switch (e) { |
| case OnRegion: |
| FunctionService.onRegion(custRegion).execute(TXFunction.id).getResult(); |
| break; |
| case OnMember: |
| FunctionService.onMembers(system).execute(TXFunction.id).getResult(); |
| break; |
| } |
| fail("Expected exception not thrown"); |
| } catch (TransactionException expected) { |
| } |
| try { |
| InternalFunctionService.onRegions(regions).execute(TXFunction.id).getResult(); |
| fail("Expected exception not thrown"); |
| } catch (TransactionException expected) { |
| } |
| Set filter = new HashSet(); |
| filter.add(expectedCustId); |
| switch (e) { |
| case OnRegion: |
| FunctionService.onRegion(custRegion).withFilter(filter).execute(TXFunction.id).getResult(); |
| break; |
| case OnMember: |
| DistributedMember owner = custRegion.getOwnerForKey(custRegion.getKeyInfo(expectedCustId)); |
| FunctionService.onMember(system, owner).execute(TXFunction.id).getResult(); |
| break; |
| } |
| TXStateProxy tx = mgr.internalSuspend(); |
| GemFireCacheImpl.getInstance().getLogger().warning("TX SUSPENDO:"+tx); |
| assertNull(custRegion.get(expectedCustId)); |
| mgr.resume(tx); |
| return null; |
| } |
| }); |
| |
| final Integer txOnDatastore1 = (Integer)datastore1.invoke(getNumberOfTXInProgress); |
| final Integer txOnDatastore2 = (Integer)datastore2.invoke(getNumberOfTXInProgress); |
| assertEquals(1, txOnDatastore1+txOnDatastore2); |
| |
| accessor.invoke(new SerializableCallable() { |
| public Object call() throws Exception { |
| CacheTransactionManager mgr = getGemfireCache().getTXMgr(); |
| mgr.commit(); |
| return null; |
| } |
| }); |
| |
| datastore1.invoke(new SerializableCallable() { |
| public Object call() throws Exception { |
| Region custRegion = getGemfireCache().getRegion(CUSTOMER); |
| assertEquals(expectedCustomer, custRegion.get(expectedCustId)); |
| return null; |
| } |
| }); |
| |
| accessor.invoke(new SerializableCallable() { |
| public Object call() throws Exception { |
| TXManagerImpl mgr = getGemfireCache().getTXMgr(); |
| mgr.begin(); |
| PartitionedRegion custRegion = (PartitionedRegion)getGemfireCache().getRegion(CUSTOMER); |
| Set filter = new HashSet(); |
| filter.add(expectedCustId); |
| switch (e) { |
| case OnRegion: |
| FunctionService.onRegion(custRegion).withFilter(filter).execute(TXFunction.id).getResult(); |
| break; |
| case OnMember: |
| DistributedMember owner = custRegion.getOwnerForKey(custRegion.getKeyInfo(expectedCustId)); |
| FunctionService.onMember(system, owner).execute(TXFunction.id).getResult(); |
| break; |
| } |
| TXStateProxy tx = mgr.internalSuspend(); |
| custRegion.put(expectedCustId, new Customer("Cust6", "updated6")); |
| mgr.resume(tx); |
| try { |
| mgr.commit(); |
| fail("expected commit conflict not thrown"); |
| } catch (CommitConflictException expected) { |
| } |
| return null; |
| } |
| }); |
| } |
| |
| public void testNestedTxFunction() { |
| Host host = Host.getHost(0); |
| VM accessor = host.getVM(0); |
| VM datastore1 = host.getVM(1); |
| VM datastore2 = host.getVM(2); |
| initAccessorAndDataStore(accessor, datastore1, datastore2, 0); |
| |
| class NestedTxFunction2 extends FunctionAdapter { |
| static final String id = "NestedTXFunction2"; |
| @Override |
| public void execute(FunctionContext context) { |
| TXManagerImpl mgr = getGemfireCache().getTxManager(); |
| assertNotNull(mgr.getTXState()); |
| try { |
| mgr.commit(); |
| fail("expected exceptio not thrown"); |
| } catch (UnsupportedOperationInTransactionException e) { |
| } |
| context.getResultSender().lastResult(Boolean.TRUE); |
| } |
| @Override |
| public String getId() { |
| return id; |
| } |
| } |
| class NestedTxFunction extends FunctionAdapter { |
| static final String id = "NestedTXFunction"; |
| @Override |
| public void execute(FunctionContext context) { |
| Region r = null; |
| if (context instanceof RegionFunctionContext) { |
| r = PartitionRegionHelper.getLocalDataForContext((RegionFunctionContext)context); |
| } else { |
| r = getGemfireCache().getRegion(CUSTOMER); |
| } |
| assertNotNull(getGemfireCache().getTxManager().getTXState()); |
| PartitionedRegion pr = (PartitionedRegion)getGemfireCache().getRegion(CUSTOMER); |
| Set filter = new HashSet(); |
| filter.add(expectedCustId); |
| getLogWriter().info("SWAP:inside NestedTxFunc calling func2:"); |
| r.put(expectedCustId, expectedCustomer); |
| FunctionService.onRegion(pr).withFilter(filter).execute(new NestedTxFunction2()).getResult(); |
| assertNotNull(getGemfireCache().getTxManager().getTXState()); |
| context.getResultSender().lastResult(Boolean.TRUE); |
| } |
| @Override |
| public boolean optimizeForWrite() { |
| return true; |
| } |
| @Override |
| public String getId() { |
| return id; |
| } |
| } |
| |
| |
| accessor.invoke(new SerializableCallable() { |
| public Object call() throws Exception { |
| TXManagerImpl mgr = getGemfireCache().getTxManager(); |
| PartitionedRegion pr = (PartitionedRegion)getGemfireCache().getRegion(CUSTOMER); |
| mgr.begin(); |
| Set filter = new HashSet(); |
| filter.add(expectedCustId); |
| FunctionService.onRegion(pr).withFilter(filter).execute(new NestedTxFunction()).getResult(); |
| assertNotNull(getGemfireCache().getTxManager().getTXState()); |
| mgr.commit(); |
| assertEquals(expectedCustomer, pr.get(expectedCustId)); |
| return null; |
| } |
| }); |
| } |
| |
| public void testDRFunctionExecution() { |
| Host host = Host.getHost(0); |
| VM accessor = host.getVM(0); |
| VM datastore1 = host.getVM(1); |
| VM datastore2 = host.getVM(2); |
| |
| class CreateDR extends SerializableCallable { |
| private final boolean isAccessor; |
| public CreateDR(boolean isAccessor) { |
| this.isAccessor = isAccessor; |
| } |
| public Object call() throws Exception { |
| AttributesFactory af = new AttributesFactory(); |
| af.setDataPolicy(isAccessor? DataPolicy.EMPTY : DataPolicy.REPLICATE); |
| af.setScope(Scope.DISTRIBUTED_ACK); |
| af.setConcurrencyChecksEnabled(getConcurrencyChecksEnabled()); |
| getCache().createRegion(CUSTOMER, af.create()); |
| if (isAccessor) { |
| Region custRegion = getCache().getRegion(CUSTOMER); |
| for (int i=0; i<5; i++) { |
| CustId custId = new CustId(i); |
| Customer customer = new Customer("customer"+i, "address"+i); |
| custRegion.put(custId, customer); |
| } |
| } |
| return null; |
| } |
| } |
| |
| datastore1.invoke(new CreateDR(false)); |
| datastore2.invoke(new CreateDR(false)); |
| accessor.invoke(new CreateDR(true)); |
| |
| SerializableCallable registerFunction = new SerializableCallable() { |
| public Object call() throws Exception { |
| FunctionService.registerFunction(new TXFunction()); |
| return null; |
| } |
| }; |
| |
| accessor.invoke(registerFunction); |
| datastore1.invoke(registerFunction); |
| datastore2.invoke(registerFunction); |
| |
| accessor.invoke(new SerializableCallable() { |
| public Object call() throws Exception { |
| Region custRegion = getGemfireCache().getRegion(CUSTOMER); |
| TXManagerImpl mgr = getGemfireCache().getTXMgr(); |
| mgr.begin(); |
| FunctionService.onRegion(custRegion).execute(TXFunction.id).getResult(); |
| assertNotNull(mgr.getTXState()); |
| TXStateProxy tx= mgr.internalSuspend(); |
| assertNull(mgr.getTXState()); |
| getGemfireCache().getLogger().fine("SWAP:callingget"); |
| assertNull("expected null but was:"+custRegion.get(expectedCustId), custRegion.get(expectedCustId)); |
| mgr.resume(tx); |
| mgr.commit(); |
| assertEquals(expectedCustomer, custRegion.get(expectedCustId)); |
| return null; |
| } |
| }); |
| |
| datastore1.invoke(new SerializableCallable() { |
| public Object call() throws Exception { |
| final Region custRegion = getGemfireCache().getRegion(CUSTOMER); |
| TXManagerImpl mgr = getGemfireCache().getTXMgr(); |
| mgr.begin(); |
| FunctionService.onRegion(custRegion).execute(new FunctionAdapter() { |
| @Override |
| public String getId() { |
| return "LocalDS"; |
| } |
| @Override |
| public void execute(FunctionContext context) { |
| assertNotNull(getGemfireCache().getTxManager().getTXState()); |
| custRegion.destroy(expectedCustId); |
| context.getResultSender().lastResult(Boolean.TRUE); |
| } |
| }).getResult(); |
| TXStateProxy tx = mgr.internalSuspend(); |
| assertEquals(custRegion.get(expectedCustId), expectedCustomer); |
| mgr.resume(tx); |
| mgr.commit(); |
| assertNull(custRegion.get(expectedCustId)); |
| return null; |
| } |
| }); |
| } |
| |
| public void testTxFunctionWithOtherOps() { |
| Host host = Host.getHost(0); |
| VM accessor = host.getVM(0); |
| VM datastore1 = host.getVM(1); |
| VM datastore2 = host.getVM(2); |
| initAccessorAndDataStore(accessor, datastore1, datastore2, 0); |
| |
| SerializableCallable registerFunction = new SerializableCallable() { |
| public Object call() throws Exception { |
| FunctionService.registerFunction(new TXFunction()); |
| return null; |
| } |
| }; |
| |
| accessor.invoke(registerFunction); |
| datastore1.invoke(registerFunction); |
| datastore2.invoke(registerFunction); |
| |
| accessor.invoke(new SerializableCallable() { |
| public Object call() throws Exception { |
| Region custRegion = getGemfireCache().getRegion(CUSTOMER); |
| TXManagerImpl mgr = getGemfireCache().getTXMgr(); |
| mgr.begin(); |
| try { |
| FunctionService.onRegion(custRegion).execute(TXFunction.id).getResult(); |
| fail("Expected exception not thrown"); |
| } catch (TransactionException expected) { |
| } |
| Set filter = new HashSet(); |
| filter.add(expectedCustId); |
| FunctionService.onRegion(custRegion).withFilter(filter).execute(TXFunction.id).getResult(); |
| assertEquals(expectedCustomer, custRegion.get(expectedCustId)); |
| TXStateProxy tx = mgr.internalSuspend(); |
| assertNull(custRegion.get(expectedCustId)); |
| mgr.resume(tx); |
| return null; |
| } |
| }); |
| |
| final Integer txOnDatastore1 = (Integer)datastore1.invoke(getNumberOfTXInProgress); |
| final Integer txOnDatastore2 = (Integer)datastore2.invoke(getNumberOfTXInProgress); |
| assertEquals(1, txOnDatastore1+txOnDatastore2); |
| |
| accessor.invoke(new SerializableCallable() { |
| public Object call() throws Exception { |
| Region custRegion = getGemfireCache().getRegion(CUSTOMER); |
| CacheTransactionManager mgr = getGemfireCache().getTXMgr(); |
| mgr.commit(); |
| assertEquals(expectedCustomer, custRegion.get(expectedCustId)); |
| custRegion.destroy(expectedCustId); |
| return null; |
| } |
| }); |
| //test onMembers |
| SerializableCallable getMember = new SerializableCallable() { |
| public Object call() throws Exception { |
| return getGemfireCache().getMyId(); |
| } |
| }; |
| final InternalDistributedMember ds1 = (InternalDistributedMember)datastore1.invoke(getMember); |
| final InternalDistributedMember ds2 = (InternalDistributedMember)datastore2.invoke(getMember); |
| accessor.invoke(new SerializableCallable() { |
| public Object call() throws Exception { |
| PartitionedRegion pr = (PartitionedRegion)getGemfireCache().getRegion(CUSTOMER); |
| //get owner for expectedKey |
| DistributedMember owner = pr.getOwnerForKey(pr.getKeyInfo(expectedCustId)); |
| //get key on datastore1 |
| CustId keyOnOwner = null; |
| keyOnOwner = getKeyOnMember(owner, pr); |
| |
| TXManagerImpl mgr = getGemfireCache().getTXMgr(); |
| mgr.begin(); |
| //bootstrap tx on owner |
| pr.get(keyOnOwner); |
| Set<DistributedMember> members = new HashSet<DistributedMember>(); |
| members.add(ds1);members.add(ds2); |
| try { |
| FunctionService.onMembers(system, members).execute(TXFunction.id).getResult(); |
| fail("expected exception not thrown"); |
| } catch (TransactionException expected) { |
| } |
| FunctionService.onMember(system, owner).execute(TXFunction.id).getResult(); |
| assertEquals(expectedCustomer, pr.get(expectedCustId)); |
| TXStateProxy tx = mgr.internalSuspend(); |
| assertNull(pr.get(expectedCustId)); |
| mgr.resume(tx); |
| return null; |
| } |
| }); |
| final Integer txOnDatastore1_1 = (Integer)datastore1.invoke(getNumberOfTXInProgress); |
| final Integer txOnDatastore2_1 = (Integer)datastore2.invoke(getNumberOfTXInProgress); |
| assertEquals(1, txOnDatastore1_1+txOnDatastore2_1); |
| |
| accessor.invoke(new SerializableCallable() { |
| public Object call() throws Exception { |
| Region custRegion = getGemfireCache().getRegion(CUSTOMER); |
| CacheTransactionManager mgr = getGemfireCache().getTXMgr(); |
| mgr.commit(); |
| assertEquals(expectedCustomer, custRegion.get(expectedCustId)); |
| custRegion.destroy(expectedCustId); |
| return null; |
| } |
| }); |
| //test function execution on data store |
| final DistributedMember owner = (DistributedMember)accessor.invoke(new SerializableCallable() { |
| public Object call() throws Exception { |
| PartitionedRegion pr = (PartitionedRegion)getGemfireCache().getRegion(CUSTOMER); |
| return pr.getOwnerForKey(pr.getKeyInfo(expectedCustId)); |
| } |
| }); |
| |
| SerializableCallable testFnOnDs = new SerializableCallable() { |
| public Object call() throws Exception { |
| TXManagerImpl mgr = getGemfireCache().getTXMgr(); |
| PartitionedRegion pr = (PartitionedRegion)getGemfireCache().getRegion(CUSTOMER); |
| CustId keyOnDs = getKeyOnMember(pr.getMyId(), pr); |
| mgr.begin(); |
| pr.get(keyOnDs); |
| Set filter = new HashSet(); |
| filter.add(keyOnDs); |
| FunctionService.onRegion(pr).withFilter(filter).execute(TXFunction.id).getResult(); |
| assertEquals(expectedCustomer, pr.get(expectedCustId)); |
| TXStateProxy tx = mgr.internalSuspend(); |
| assertNull(pr.get(expectedCustId)); |
| mgr.resume(tx); |
| return null; |
| } |
| }; |
| SerializableCallable closeTx = new SerializableCallable() { |
| public Object call() throws Exception { |
| Region custRegion = getGemfireCache().getRegion(CUSTOMER); |
| CacheTransactionManager mgr = getGemfireCache().getTXMgr(); |
| mgr.commit(); |
| assertEquals(expectedCustomer, custRegion.get(expectedCustId)); |
| custRegion.destroy(expectedCustId); |
| return null; |
| } |
| }; |
| |
| if (owner.equals(ds1)) { |
| datastore1.invoke(testFnOnDs); |
| final Integer txOnDatastore1_2 = (Integer)datastore1.invoke(getNumberOfTXInProgress); |
| final Integer txOnDatastore2_2 = (Integer)datastore2.invoke(getNumberOfTXInProgress); |
| assertEquals(0, txOnDatastore1_2+txOnDatastore2_2);//ds1 has a local transaction, not remote |
| datastore1.invoke(closeTx); |
| } else { |
| datastore2.invoke(testFnOnDs); |
| final Integer txOnDatastore1_2 = (Integer)datastore1.invoke(getNumberOfTXInProgress); |
| final Integer txOnDatastore2_2 = (Integer)datastore2.invoke(getNumberOfTXInProgress); |
| assertEquals(0, txOnDatastore1_2+txOnDatastore2_2);//ds1 has a local transaction, not remote |
| datastore2.invoke(closeTx); |
| } |
| |
| //test that function is rejected if function target is not same as txState target |
| accessor.invoke(new SerializableCallable() { |
| public Object call() throws Exception { |
| CacheTransactionManager mgr = getGemfireCache().getTXMgr(); |
| PartitionedRegion pr = (PartitionedRegion)getGemfireCache().getRegion(CUSTOMER); |
| CustId keyOnDs1 = getKeyOnMember(ds1, pr); |
| CustId keyOnDs2 = getKeyOnMember(ds2, pr); |
| mgr.begin(); |
| pr.get(keyOnDs1);//bootstrap txState |
| Set filter = new HashSet(); |
| filter.add(keyOnDs2); |
| try { |
| FunctionService.onRegion(pr).withFilter(filter).execute(TXFunction.id).getResult(); |
| fail("expected Exception not thrown"); |
| } catch (TransactionDataRebalancedException expected) { |
| } |
| try { |
| FunctionService.onMember(system, ds2).execute(TXFunction.id).getResult(); |
| fail("expected exception not thrown"); |
| } catch (TransactionDataNotColocatedException expected) { |
| } |
| mgr.commit(); |
| return null; |
| } |
| }); |
| } |
| |
| /** |
| * @param ds1 |
| * @param pr |
| * @return first key found on the given member |
| */ |
| CustId getKeyOnMember(final DistributedMember owner, |
| PartitionedRegion pr) { |
| CustId retVal = null; |
| for (int i=0; i<5; i++) { |
| CustId custId = new CustId(i); |
| DistributedMember member = pr.getOwnerForKey(pr.getKeyInfo(custId)); |
| if (member.equals(owner)) { |
| retVal = custId; |
| break; |
| } |
| } |
| return retVal; |
| } |
| /** |
| * @param i |
| * @return |
| */ |
| protected Set<Customer> getCustomerSet(int size) { |
| Set<Customer> expectedSet = new HashSet<Customer>(); |
| for (int i=0; i<size; i++) { |
| expectedSet.add(new Customer("customer"+i, "address"+i)); |
| } |
| return expectedSet; |
| } |
| |
| Set<CustId> getCustIdSet(int size) { |
| Set<CustId> expectedSet = new HashSet<CustId>(); |
| for (int i=0; i<size; i++) { |
| expectedSet.add(new CustId(i)); |
| } |
| return expectedSet; |
| } |
| |
| public void testRemoteJTACommit() { |
| doRemoteJTA(true); |
| } |
| |
| public void testRemoteJTARollback() { |
| doRemoteJTA(false); |
| } |
| |
| private void doRemoteJTA(final boolean isCommit) { |
| Host host = Host.getHost(0); |
| VM acc = host.getVM(0); |
| VM datastore = host.getVM(1); |
| initAccessorAndDataStore(acc, datastore, 0); |
| VM accessor = getVMForTransactions(acc, datastore); |
| |
| datastore.invoke(new SerializableCallable() { |
| public Object call() throws Exception { |
| getGemfireCache().getTxManager().addListener(new TestTxListener(false)); |
| return null; |
| } |
| }); |
| final CustId expectedCustId = new CustId(6); |
| final Customer expectedCustomer = new Customer("customer6", "address6"); |
| accessor.invoke(new SerializableCallable() { |
| public Object call() throws Exception { |
| getGemfireCache().getTxManager().addListener(new TestTxListener(true)); |
| Region custRegion = getCache().getRegion(CUSTOMER); |
| Context ctx = getCache().getJNDIContext(); |
| UserTransaction tx = (UserTransaction)ctx.lookup("java:/UserTransaction"); |
| assertEquals(Status.STATUS_NO_TRANSACTION, tx.getStatus()); |
| tx.begin(); |
| assertEquals(Status.STATUS_ACTIVE, tx.getStatus()); |
| custRegion.put(expectedCustId, expectedCustomer); |
| assertEquals(expectedCustomer, custRegion.get(expectedCustId)); |
| return null; |
| } |
| }); |
| datastore.invoke(new SerializableCallable() { |
| public Object call() throws Exception { |
| Region custRegion = getCache().getRegion(CUSTOMER); |
| assertNull(custRegion.get(expectedCustId)); |
| return null; |
| } |
| }); |
| accessor.invoke(new SerializableCallable() { |
| public Object call() throws Exception { |
| Region custRegion = getCache().getRegion(CUSTOMER); |
| Context ctx = getCache().getJNDIContext(); |
| UserTransaction tx = (UserTransaction)ctx.lookup("java:/UserTransaction"); |
| if (isCommit) { |
| tx.commit(); |
| assertEquals(expectedCustomer, custRegion.get(expectedCustId)); |
| } else { |
| tx.rollback(); |
| assertNull(custRegion.get(expectedCustId)); |
| } |
| return null; |
| } |
| }); |
| datastore.invoke(new SerializableCallable() { |
| public Object call() throws Exception { |
| TestTxListener l = (TestTxListener)getGemfireCache().getTXMgr().getListener(); |
| assertTrue(l.isListenerInvoked()); |
| return null; |
| } |
| }); |
| } |
| |
| |
| public void testOriginRemoteIsTrueForRemoteReplicatedRegions() { |
| Host host = Host.getHost(0); |
| VM accessor = host.getVM(0); |
| VM datastore = host.getVM(1); |
| initAccessorAndDataStore(accessor, datastore, 0); |
| |
| class OriginRemoteRRWriter extends CacheWriterAdapter { |
| int fireC =0 ; |
| int fireD =0 ; |
| int fireU =0 ; |
| public void beforeCreate(EntryEvent event) |
| throws CacheWriterException { |
| if (!event.isOriginRemote()) { |
| throw new CacheWriterException( |
| "SUP?? This CREATE is supposed to be isOriginRemote"); |
| } |
| fireC++; |
| } |
| |
| public void beforeDestroy(EntryEvent event) throws CacheWriterException { |
| getGemfireCache().getLoggerI18n().fine( |
| "SWAP:writer:createEvent:" + event); |
| if (!event.isOriginRemote()) { |
| throw new CacheWriterException( |
| "SUP?? This DESTROY is supposed to be isOriginRemote"); |
| } |
| fireD++; |
| } |
| |
| public void beforeUpdate(EntryEvent event) throws CacheWriterException { |
| if (!event.isOriginRemote()) { |
| throw new CacheWriterException( |
| "SUP?? This UPDATE is supposed to be isOriginRemote"); |
| } |
| fireU++; |
| } |
| } |
| |
| datastore.invoke(new SerializableCallable() { |
| |
| public Object call() throws Exception { |
| Region refRegion = getCache().getRegion(D_REFERENCE); |
| refRegion.getAttributesMutator().setCacheWriter(new OriginRemoteRRWriter()); |
| return null; |
| } |
| |
| }); |
| |
| |
| accessor.invoke(new DoOpsInTX(OP.PUT)); |
| |
| accessor.invoke(new SerializableCallable() { |
| public Object call() throws Exception { |
| TXManagerImpl mgr = getGemfireCache().getTxManager(); |
| TXStateProxy tx = mgr.internalSuspend(); |
| assertNotNull(tx); |
| mgr.resume(tx); |
| mgr.commit(); |
| return null; |
| } |
| }); |
| |
| accessor.invoke(new DoOpsInTX(OP.DESTROY)); |
| |
| accessor.invoke(new SerializableCallable() { |
| public Object call() throws Exception { |
| TXManagerImpl mgr = getGemfireCache().getTxManager(); |
| TXStateProxy tx = mgr.internalSuspend(); |
| assertNotNull(tx); |
| mgr.resume(tx); |
| mgr.commit(); |
| return null; |
| } |
| }); |
| |
| |
| accessor.invoke(new DoOpsInTX(OP.PUT)); |
| |
| accessor.invoke(new SerializableCallable() { |
| public Object call() throws Exception { |
| TXManagerImpl mgr = getGemfireCache().getTxManager(); |
| TXStateProxy tx = mgr.internalSuspend(); |
| assertNotNull(tx); |
| mgr.resume(tx); |
| mgr.commit(); |
| return null; |
| } |
| }); |
| |
| datastore.invoke(new SerializableCallable() { |
| |
| public Object call() throws Exception { |
| Region refRegion = getCache().getRegion(D_REFERENCE); |
| OriginRemoteRRWriter w = (OriginRemoteRRWriter) refRegion.getAttributes().getCacheWriter(); |
| assertEquals(1, w.fireC); |
| assertEquals(1, w.fireD); |
| assertEquals(1, w.fireU); |
| return null; |
| } |
| }); |
| } |
| |
| |
| |
| public void testRemoteCreateInReplicatedRegion() { |
| Host host = Host.getHost(0); |
| VM accessor = host.getVM(0); |
| VM datastore = host.getVM(1); |
| initAccessorAndDataStore(accessor, datastore, 0); |
| |
| accessor.invoke(new DoOpsInTX(OP.PUT)); |
| |
| accessor.invoke(new SerializableCallable() { |
| |
| public Object call() throws Exception { |
| Region refRegion = getCache().getRegion(D_REFERENCE); |
| refRegion.create("sup","dawg"); |
| return null; |
| } |
| }); |
| |
| accessor.invoke(new SerializableCallable() { |
| public Object call() throws Exception { |
| TXManagerImpl mgr = getGemfireCache().getTxManager(); |
| TXStateProxy tx = mgr.internalSuspend(); |
| assertNotNull(tx); |
| mgr.resume(tx); |
| mgr.commit(); |
| return null; |
| } |
| }); |
| |
| accessor.invoke(new SerializableCallable() { |
| |
| public Object call() throws Exception { |
| Region refRegion = getCache().getRegion(D_REFERENCE); |
| assertEquals("dawg",refRegion.get("sup")); |
| return null; |
| } |
| }); |
| } |
| |
| public void testRemoteTxCleanupOnCrash() { |
| Host host = Host.getHost(0); |
| VM accessor = host.getVM(0); |
| VM datastore = host.getVM(1); |
| initAccessorAndDataStore(accessor, datastore, 0); |
| |
| accessor.invoke(new SerializableCallable() { |
| public Object call() throws Exception { |
| Region cust = getGemfireCache().getRegion(CUSTOMER); |
| TXManagerImpl mgr = getGemfireCache().getTxManager(); |
| mgr.begin(); |
| cust.put(new CustId(6), new Customer("customer6", "address6")); |
| return null; |
| } |
| }); |
| final InternalDistributedMember member = (InternalDistributedMember)accessor.invoke(new SerializableCallable() { |
| public Object call() throws Exception { |
| return getGemfireCache().getMyId(); |
| } |
| }); |
| datastore.invoke(new SerializableCallable() { |
| public Object call() throws Exception { |
| TXManagerImpl mgr = getGemfireCache().getTxManager(); |
| assertEquals(1, mgr.hostedTransactionsInProgressForTest()); |
| mgr.memberDeparted(member, true); |
| assertEquals(0, mgr.hostedTransactionsInProgressForTest()); |
| return null; |
| } |
| }); |
| } |
| |
| public void testNonColocatedPutAll() { |
| doNonColocatedbulkOp(OP.PUTALL); |
| } |
| |
| /** |
| * disabled because rather than throwing an exception, |
| * getAll catches all exceptions and logs a warning |
| * message |
| */ |
| public void _SWAP_testNonColocatedGetAll() { |
| doNonColocatedbulkOp(OP.GETALL); |
| } |
| |
| private void doNonColocatedbulkOp(final OP op) { |
| Host host = Host.getHost(0); |
| VM accessor = host.getVM(0); |
| VM datastore1 = host.getVM(1); |
| VM datastore2 = host.getVM(2); |
| initAccessorAndDataStore(accessor, datastore1, datastore2, 0); |
| |
| accessor.invoke(new SerializableCallable() { |
| public Object call() throws Exception { |
| Map custMap = new HashMap(); |
| for (int i=0; i<10; i++) { |
| CustId cId = new CustId(i); |
| Customer c = new Customer("name"+i, "addr"+i); |
| custMap.put(cId, c); |
| } |
| GemFireCacheImpl cache = getGemfireCache(); |
| cache.getCacheTransactionManager().begin(); |
| Region r = cache.getRegion(CUSTOMER); |
| try { |
| switch (op) { |
| case PUTALL: |
| r.putAll(custMap); |
| break; |
| case GETALL: |
| r.put(new CustId(1), new Customer("cust1", "addr1")); |
| r.getAll(custMap.keySet()); |
| break; |
| default: |
| break; |
| } |
| fail("expected exception not thrown"); |
| } catch (TransactionDataNotColocatedException e) { |
| } |
| cache.getCacheTransactionManager().rollback(); |
| return null; |
| } |
| }); |
| } |
| |
| public void testBasicPutAll() { |
| doTestBasicBulkOP(OP.PUTALL); |
| } |
| |
| public void testBasicRemoveAll() { |
| doTestBasicBulkOP(OP.REMOVEALL); |
| } |
| |
| private void doTestBasicBulkOP(final OP op) { |
| Host host = Host.getHost(0); |
| VM accessor = host.getVM(0); |
| VM datastore1 = host.getVM(1); |
| VM datastore2 = host.getVM(2); |
| |
| initAccessorAndDataStore(accessor, datastore1, datastore2, 1); |
| |
| if (op.equals(OP.REMOVEALL)) { |
| // for remove all populate more data |
| accessor.invoke(new SerializableCallable() { |
| @Override |
| public Object call() throws Exception { |
| Region<CustId, Customer> custRegion = getCache().getRegion(CUSTOMER); |
| for (int i=0; i<50; i++) { |
| custRegion.put(new CustId(i), new Customer("name"+i, "address"+i)); |
| } |
| return null; |
| } |
| }); |
| } |
| |
| final List ds1Buckets = (List) datastore1.invoke(new SerializableCallable() { |
| @Override |
| public Object call() throws Exception { |
| // do local operations with rollback and then commit |
| Map<CustId, Customer> custMap = new HashMap<>(); |
| Region<CustId, Customer> custRegion = getCache().getRegion(CUSTOMER); |
| PartitionedRegion pr = ((PartitionedRegion)custRegion); |
| List localBuckets = pr.getLocalPrimaryBucketsListTestOnly(); |
| System.out.println("localBuckets:"+localBuckets); |
| for (int i=10; i<20; i++) { |
| int hash = PartitionedRegionHelper.getHashKey(i, pr.getPartitionAttributes().getTotalNumBuckets()); |
| if (localBuckets.contains(hash)) { |
| custMap.put(new CustId(i), new Customer("name"+i, "address"+i)); |
| } |
| } |
| System.out.println("SWAP:custMap:"+custMap); |
| int regionSize = custRegion.size(); |
| getCache().getCacheTransactionManager().begin(); |
| if (op.equals(OP.PUTALL)) { |
| custRegion.putAll(custMap); |
| } else { |
| custRegion.removeAll(custMap.keySet()); |
| } |
| getCache().getCacheTransactionManager().rollback(); |
| assertEquals(regionSize, custRegion.size()); |
| // now commit |
| getCache().getCacheTransactionManager().begin(); |
| if (op.equals(OP.PUTALL)) { |
| custRegion.putAll(custMap); |
| } else { |
| custRegion.removeAll(custMap.keySet()); |
| } |
| getCache().getCacheTransactionManager().commit(); |
| assertEquals(getExpectedSize(custMap, regionSize), custRegion.size()); |
| |
| // bulk op on other member |
| custMap.clear(); |
| for (int i=10; i<20; i++) { |
| int hash = PartitionedRegionHelper.getHashKey(i, pr.getPartitionAttributes().getTotalNumBuckets()); |
| if (!localBuckets.contains(hash)) { // not on local member |
| custMap.put(new CustId(i), new Customer("name"+i, "address"+i)); |
| } |
| } |
| System.out.println("SWAP:custMap:"+custMap); |
| regionSize = custRegion.size(); |
| getCache().getCacheTransactionManager().begin(); |
| if (op.equals(OP.PUTALL)) { |
| custRegion.putAll(custMap); |
| } else { |
| custRegion.removeAll(custMap.keySet()); |
| } |
| getCache().getCacheTransactionManager().rollback(); |
| assertEquals(regionSize, custRegion.size()); |
| // now commit |
| getCache().getCacheTransactionManager().begin(); |
| if (op.equals(OP.PUTALL)) { |
| custRegion.putAll(custMap); |
| } else { |
| custRegion.removeAll(custMap.keySet()); |
| } |
| getCache().getCacheTransactionManager().commit(); |
| assertEquals(getExpectedSize(custMap, regionSize), custRegion.size()); |
| return localBuckets; |
| } |
| |
| private int getExpectedSize(Map<CustId, Customer> custMap, int regionSize) { |
| if (op.equals(OP.REMOVEALL)) { |
| return regionSize - custMap.size(); |
| } |
| return regionSize + custMap.size(); |
| } |
| }); |
| |
| accessor.invoke(new SerializableCallable() { |
| @Override |
| public Object call() throws Exception { |
| // do a transaction on one of the nodes |
| Map<CustId, Customer> custMap = new HashMap<>(); |
| Region<CustId, Customer> custRegion = getCache().getRegion(CUSTOMER); |
| for (int i=20; i<30; i++) { |
| int hash = PartitionedRegionHelper.getHashKey(i, custRegion.getAttributes().getPartitionAttributes().getTotalNumBuckets()); |
| if (ds1Buckets.contains(hash)) { |
| custMap.put(new CustId(i), new Customer("name"+i, "address"+i)); |
| } |
| } |
| System.out.println("SWAP:custMap:"+custMap); |
| int regionSize = custRegion.size(); |
| getCache().getCacheTransactionManager().begin(); |
| if (op.equals(OP.PUTALL)) { |
| custRegion.putAll(custMap); |
| } else { |
| custRegion.removeAll(custMap.keySet()); |
| } |
| getCache().getCacheTransactionManager().rollback(); |
| assertEquals(regionSize, custRegion.size()); |
| // now commit |
| getCache().getCacheTransactionManager().begin(); |
| if (op.equals(OP.PUTALL)) { |
| custRegion.putAll(custMap); |
| } else { |
| custRegion.removeAll(custMap.keySet()); |
| } |
| getCache().getCacheTransactionManager().commit(); |
| assertEquals(getExpectedSize(custMap, regionSize), custRegion.size()); |
| return null; |
| } |
| private int getExpectedSize(Map<CustId, Customer> custMap, int regionSize) { |
| if (op.equals(OP.REMOVEALL)) { |
| return regionSize - custMap.size(); |
| } |
| return regionSize + custMap.size(); |
| } |
| }); |
| } |
| |
| public void testDestroyCreateConflation() { |
| Host host = Host.getHost(0); |
| VM accessor = host.getVM(0); |
| VM datastore = host.getVM(1); |
| initAccessorAndDataStore(accessor, datastore, 0); |
| |
| datastore.invoke(new SerializableCallable() { |
| public Object call() throws Exception { |
| Region cust = getCache().getRegion(D_REFERENCE); |
| cust.put("meow","this is a meow, deal with it"); |
| cust.getAttributesMutator().addCacheListener(new OneUpdateCacheListener()); |
| cust.getAttributesMutator().setCacheWriter(new OneDestroyAndThenOneCreateCacheWriter()); |
| |
| return null; |
| } |
| }); |
| |
| accessor.invoke(new SerializableCallable() { |
| public Object call() throws Exception { |
| Region cust = getCache().getRegion(D_REFERENCE); |
| cust.getAttributesMutator().addCacheListener(new OneUpdateCacheListener()); |
| return null; |
| } |
| }); |
| |
| |
| |
| datastore.invoke(new SerializableCallable() { |
| public Object call() throws Exception { |
| CacheTransactionManager mgr = getGemfireCache().getTxManager(); |
| mgr.begin(); |
| Region cust = getCache().getRegion(D_REFERENCE); |
| cust.destroy("meow"); |
| cust.create("meow","this is the new meow, not the old meow"); |
| mgr.commit(); |
| return null; |
| } |
| }); |
| |
| accessor.invoke(new SerializableCallable() { |
| public Object call() throws Exception { |
| Region cust = getCache().getRegion(D_REFERENCE); |
| OneUpdateCacheListener rat = (OneUpdateCacheListener)cust.getAttributes().getCacheListener(); |
| if(!rat.getSuccess()) { |
| fail("The OneUpdateCacheListener didnt get an update"); |
| } |
| return null; |
| } |
| }); |
| |
| datastore.invoke(new SerializableCallable() { |
| public Object call() throws Exception { |
| Region cust = getCache().getRegion(D_REFERENCE); |
| OneDestroyAndThenOneCreateCacheWriter wri = (OneDestroyAndThenOneCreateCacheWriter)cust.getAttributes().getCacheWriter(); |
| wri.checkSuccess(); |
| return null; |
| } |
| }); |
| |
| |
| } |
| |
| class OneUpdateCacheListener extends CacheListenerAdapter { |
| boolean success = false; |
| |
| public boolean getSuccess() { |
| return success; |
| } |
| |
| @Override |
| public void afterCreate(EntryEvent event) { |
| fail("create not expected"); |
| } |
| @Override |
| public void afterUpdate(EntryEvent event) { |
| if(!success) { |
| System.out.println("WE WIN!"); |
| success = true; |
| } else { |
| fail("Should have only had one update"); |
| } |
| } |
| @Override |
| public void afterDestroy(EntryEvent event) { |
| fail("destroy not expected"); |
| } |
| @Override |
| public void afterInvalidate(EntryEvent event) { |
| fail("invalidate not expected"); |
| } |
| } |
| |
| class OneDestroyAndThenOneCreateCacheWriter extends CacheWriterAdapter { |
| private boolean oneDestroy; |
| private boolean oneCreate; |
| |
| public void checkSuccess() throws Exception { |
| if(oneDestroy && oneCreate) { |
| // chill |
| } else { |
| fail("Didn't get both events. oneDestroy="+oneDestroy+" oneCreate="+oneCreate); |
| } |
| } |
| |
| @Override |
| public void beforeCreate(EntryEvent event) throws CacheWriterException { |
| if(!oneDestroy) { |
| fail("destroy should have arrived in writer before create"); |
| } else { |
| if(oneCreate) { |
| fail("more than one create detected! expecting destroy then create"); |
| } else { |
| oneCreate = true; |
| } |
| } |
| } |
| @Override |
| public void beforeUpdate(EntryEvent event) throws CacheWriterException { |
| fail("update not expected"); |
| } |
| @Override |
| public void beforeDestroy(EntryEvent event) throws CacheWriterException { |
| if(oneDestroy) { |
| fail("only one destroy expected"); |
| } else { |
| if(oneCreate) { |
| fail("destroy is supposed to precede create"); |
| } else { |
| oneDestroy = true; |
| } |
| } |
| } |
| |
| } |
| |
| protected Integer startServer(VM vm) { |
| return (Integer) vm.invoke(new SerializableCallable() { |
| public Object call() throws Exception { |
| int port = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET); |
| CacheServer s = getCache().addCacheServer(); |
| s.setPort(port); |
| s.start(); |
| return port; |
| } |
| }); |
| } |
| protected void createClientRegion(VM vm, final int port, final boolean isEmpty, final boolean ri) { |
| vm.invoke(new SerializableCallable() { |
| public Object call() throws Exception { |
| ClientCacheFactory ccf = new ClientCacheFactory(); |
| ccf.addPoolServer("localhost"/*getServerHostName(Host.getHost(0))*/, port); |
| ccf.setPoolSubscriptionEnabled(true); |
| ccf.set("log-level", getDUnitLogLevel()); |
| ClientCache cCache = getClientCache(ccf); |
| ClientRegionFactory<Integer, String> crf = cCache |
| .createClientRegionFactory(isEmpty ? ClientRegionShortcut.PROXY |
| : ClientRegionShortcut.CACHING_PROXY); |
| crf.setConcurrencyChecksEnabled(getConcurrencyChecksEnabled()); |
| crf.addCacheListener(new ClientListener()); |
| Region r = crf.create(D_REFERENCE); |
| Region cust = crf.create(CUSTOMER); |
| Region order = crf.create(ORDER); |
| if (ri) { |
| r.registerInterestRegex(".*"); |
| cust.registerInterestRegex(".*"); |
| order.registerInterestRegex(".*"); |
| } |
| return null; |
| } |
| }); |
| } |
| |
| protected class ClientCQListener implements CqListener { |
| |
| boolean invoked = false; |
| public void onError(CqEvent aCqEvent) { |
| // TODO Auto-generated method stub |
| |
| } |
| |
| public void onEvent(CqEvent aCqEvent) { |
| // TODO Auto-generated method stub |
| invoked =true; |
| |
| } |
| |
| public void close() { |
| // TODO Auto-generated method stub |
| |
| } |
| |
| } |
| |
| protected static class ClientListener extends CacheListenerAdapter { |
| boolean invoked = false; |
| int invokeCount = 0; |
| int invalidateCount = 0; |
| int putCount = 0; |
| boolean putAllOp = false; |
| boolean isOriginRemote = false; |
| int creates; |
| int updates; |
| |
| @Override |
| public void afterCreate(EntryEvent event) { |
| event.getRegion().getCache().getLogger().warning("ZZZ AFTER CREATE:"+event.getKey()); |
| invoked = true; |
| invokeCount++; |
| putCount++; |
| creates++; |
| event.getRegion().getCache().getLogger().warning("ZZZ AFTER CREATE:"+event.getKey()+" isPutAll:"+event.getOperation().isPutAll()+" op:"+event.getOperation()); |
| putAllOp = event.getOperation().isPutAll(); |
| isOriginRemote = event.isOriginRemote(); |
| } |
| @Override |
| public void afterUpdate(EntryEvent event) { |
| event.getRegion().getCache().getLogger().warning("ZZZ AFTER UPDATE:"+event.getKey()+" isPutAll:"+event.getOperation().isPutAll()+" op:"+event.getOperation()); |
| putAllOp = event.getOperation().isPutAll(); |
| invoked = true; |
| invokeCount++; |
| putCount++; |
| updates++; |
| isOriginRemote = event.isOriginRemote(); |
| } |
| |
| @Override |
| public void afterInvalidate(EntryEvent event) { |
| event.getRegion().getCache().getLogger().warning("ZZZ AFTER UPDATE:"+event.getKey()); |
| invoked = true; |
| invokeCount++; |
| invalidateCount++; |
| isOriginRemote = event.isOriginRemote(); |
| } |
| |
| public void reset() { |
| invoked = false; |
| invokeCount = 0; |
| invalidateCount = 0; |
| putCount = 0; |
| isOriginRemote = false; |
| creates = 0; |
| updates = 0; |
| } |
| } |
| |
| protected static class ServerListener extends CacheListenerAdapter { |
| boolean invoked = false; |
| int creates; |
| int updates; |
| @Override |
| public void afterCreate(EntryEvent event) { |
| invoked = true; |
| creates++; |
| } |
| @Override |
| public void afterUpdate(EntryEvent event) { |
| invoked = true; |
| updates++; |
| } |
| @Override |
| public void afterDestroy(EntryEvent event) { |
| invoked = true; |
| } |
| @Override |
| public void afterInvalidate(EntryEvent event) { |
| invoked = true; |
| } |
| } |
| |
| protected static class ServerWriter extends CacheWriterAdapter { |
| boolean invoked = false; |
| @Override |
| public void beforeCreate(EntryEvent event) throws CacheWriterException { |
| invoked = true; |
| event.getRegion().getCache().getLogger().info("SWAP:writer:"+event); |
| assertTrue(event.isOriginRemote()); |
| } |
| @Override |
| public void beforeUpdate(EntryEvent event) throws CacheWriterException { |
| invoked = true; |
| event.getRegion().getCache().getLogger().info("SWAP:writer:"+event); |
| assertTrue(event.isOriginRemote()); |
| } |
| @Override |
| public void beforeDestroy(EntryEvent event) throws CacheWriterException { |
| invoked = true; |
| event.getRegion().getCache().getLogger().info("SWAP:writer:"+event); |
| assertTrue(event.isOriginRemote()); |
| } |
| } |
| |
| |
| public void testTXWithRI() throws Exception { |
| Host host = Host.getHost(0); |
| VM accessor = host.getVM(0); |
| VM datastore = host.getVM(1); |
| VM client = host.getVM(2); |
| |
| initAccessorAndDataStore(accessor, datastore, 0); |
| int port = startServer(datastore); |
| |
| createClientRegion(client, port, false, true); |
| accessor.invoke(new SerializableCallable() { |
| public Object call() throws Exception { |
| Region<CustId, Customer> custRegion = getCache().getRegion(CUSTOMER); |
| Region<OrderId, Order> orderRegion = getCache().getRegion(ORDER); |
| Region<CustId,Customer> refRegion = getCache().getRegion(D_REFERENCE); |
| CustId custId = new CustId(1); |
| OrderId orderId = new OrderId(1, custId); |
| getCache().getCacheTransactionManager().begin(); |
| custRegion.put(custId, new Customer("foo", "bar")); |
| orderRegion.put(orderId, new Order("fooOrder")); |
| refRegion.put(custId, new Customer("foo", "bar")); |
| getCache().getCacheTransactionManager().commit(); |
| return null; |
| } |
| }); |
| |
| Thread.sleep(10000); |
| client.invoke(new SerializableCallable() { |
| public Object call() throws Exception { |
| Region<CustId, Customer> custRegion = getCache().getRegion(CUSTOMER); |
| Region<OrderId, Order> orderRegion = getCache().getRegion(ORDER); |
| Region<CustId,Customer> refRegion = getCache().getRegion(D_REFERENCE); |
| ClientListener cl = (ClientListener) custRegion.getAttributes().getCacheListeners()[0]; |
| getCache().getLogger().info("SWAP:CLIENTinvoked:"+cl.invoked); |
| assertTrue(cl.invoked); |
| return null; |
| } |
| }); |
| } |
| |
| private static final String EMPTY_REGION = "emptyRegionName"; |
| |
| public void testBug43176() { |
| Host host = Host.getHost(0); |
| VM datastore = host.getVM(0); |
| VM client = host.getVM(1); |
| |
| datastore.invoke(new SerializableCallable() { |
| public Object call() throws Exception { |
| AttributesFactory<Integer, String> af = new AttributesFactory<Integer, String>(); |
| af.setScope(Scope.DISTRIBUTED_ACK); |
| af.setDataPolicy(DataPolicy.EMPTY); |
| af.setConcurrencyChecksEnabled(getConcurrencyChecksEnabled()); |
| getCache().createRegionFactory(af.create()).create(EMPTY_REGION); |
| af.setDataPolicy(DataPolicy.REPLICATE); |
| getCache().createRegionFactory(af.create()).create(D_REFERENCE); |
| return null; |
| } |
| }); |
| |
| final int port = startServer(datastore); |
| client.invoke(new SerializableCallable() { |
| public Object call() throws Exception { |
| ClientCacheFactory ccf = new ClientCacheFactory(); |
| ccf.addPoolServer("localhost"/*getServerHostName(Host.getHost(0))*/, port); |
| ccf.setPoolSubscriptionEnabled(true); |
| ccf.set("log-level", getDUnitLogLevel()); |
| ClientCache cCache = getClientCache(ccf); |
| ClientRegionFactory<Integer, String> crf = cCache |
| .createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY); |
| crf.addCacheListener(new ClientListener()); |
| crf.setConcurrencyChecksEnabled(getConcurrencyChecksEnabled()); |
| Region r = crf.create(D_REFERENCE); |
| Region empty = crf.create(EMPTY_REGION); |
| r.registerInterest("ALL_KEYS", InterestResultPolicy.KEYS_VALUES); |
| empty.registerInterest("ALL_KEYS", InterestResultPolicy.KEYS_VALUES); |
| return null; |
| } |
| }); |
| |
| datastore.invoke(new SerializableCallable() { |
| public Object call() throws Exception { |
| Region ref = getCache().getRegion(D_REFERENCE); |
| Region empty = getCache().getRegion(EMPTY_REGION); |
| getGemfireCache().getCacheTransactionManager().begin(); |
| ref.put("one", "value1"); |
| empty.put("eone", "valueOne"); |
| getCache().getLogger().info("SWAP:callingCommit"); |
| getGemfireCache().getCacheTransactionManager().commit(); |
| assertTrue(ref.containsKey("one")); |
| assertEquals("value1", ref.get("one")); |
| assertFalse(empty.containsKey("eone")); |
| assertNull(empty.get("eone")); |
| return null; |
| } |
| }); |
| |
| client.invoke(new SerializableCallable() { |
| public Object call() throws Exception { |
| Region empty = getCache().getRegion(EMPTY_REGION); |
| final ClientListener l = (ClientListener) empty.getAttributes().getCacheListeners()[0]; |
| WaitCriterion wc = new WaitCriterion() { |
| public boolean done() { |
| return l.invoked; |
| } |
| public String description() { |
| return "listener invoked:"+l.invoked; |
| } |
| }; |
| DistributedTestCase.waitForCriterion(wc, 10*1000, 200, true); |
| return null; |
| } |
| }); |
| } |
| |
| public void testTXWithRICommitInDatastore() throws Exception { |
| Host host = Host.getHost(0); |
| VM accessor = host.getVM(0); |
| VM datastore = host.getVM(1); |
| VM client = host.getVM(2); |
| |
| initAccessorAndDataStore(accessor, datastore, 0); |
| int port = startServer(datastore); |
| |
| createClientRegion(client, port, false, true); |
| datastore.invoke(new SerializableCallable() { |
| public Object call() throws Exception { |
| Region<CustId, Customer> custRegion = getCache().getRegion(CUSTOMER); |
| Region<OrderId, Order> orderRegion = getCache().getRegion(ORDER); |
| Region<CustId,Customer> refRegion = getCache().getRegion(D_REFERENCE); |
| CustId custId = new CustId(1); |
| OrderId orderId = new OrderId(1, custId); |
| getCache().getCacheTransactionManager().begin(); |
| custRegion.put(custId, new Customer("foo", "bar")); |
| orderRegion.put(orderId, new Order("fooOrder")); |
| refRegion.put(custId, new Customer("foo", "bar")); |
| getCache().getCacheTransactionManager().commit(); |
| return null; |
| } |
| }); |
| |
| Thread.sleep(10000); |
| client.invoke(new SerializableCallable() { |
| public Object call() throws Exception { |
| Region<CustId, Customer> custRegion = getCache().getRegion(CUSTOMER); |
| Region<OrderId, Order> orderRegion = getCache().getRegion(ORDER); |
| Region<CustId,Customer> refRegion = getCache().getRegion(D_REFERENCE); |
| ClientListener cl = (ClientListener) custRegion.getAttributes().getCacheListeners()[0]; |
| getCache().getLogger().info("SWAP:CLIENTinvoked:"+cl.invoked); |
| assertTrue(cl.invoked); |
| return null; |
| } |
| }); |
| } |
| |
| |
| public void testListenersNotInvokedOnSecondary() { |
| Host host = Host.getHost(0); |
| VM accessor = host.getVM(0); |
| VM datastore1 = host.getVM(1); |
| VM datastore2 = host.getVM(2); |
| |
| initAccessorAndDataStoreWithInterestPolicy(accessor, datastore1, datastore2, 1); |
| SerializableCallable registerListener = new SerializableCallable() { |
| public Object call() throws Exception { |
| Region<CustId, Customer> custRegion = getCache().getRegion(CUSTOMER); |
| custRegion.getAttributesMutator().addCacheListener(new ListenerInvocationCounter()); |
| return null; |
| } |
| }; |
| datastore1.invoke(registerListener); |
| datastore2.invoke(registerListener); |
| |
| datastore1.invoke(new SerializableCallable() { |
| public Object call() throws Exception { |
| Region<CustId, Customer> custRegion = getCache().getRegion(CUSTOMER); |
| getCache().getCacheTransactionManager().begin(); |
| CustId custId = new CustId(1); |
| Customer customer = new Customer("customerNew", "addressNew"); |
| custRegion.put(custId, customer); |
| getCache().getCacheTransactionManager().commit(); |
| return null; |
| } |
| }); |
| |
| SerializableCallable getListenerCount = new SerializableCallable() { |
| public Object call() throws Exception { |
| Region<CustId, Customer> custRegion = getCache().getRegion(CUSTOMER); |
| ListenerInvocationCounter l = (ListenerInvocationCounter) custRegion.getAttributes().getCacheListeners()[0]; |
| getCache().getLogger().info("SWAP:listenerCount:"+l.invocationCount); |
| return l.invocationCount; |
| } |
| }; |
| |
| int totalInvocation = (Integer) datastore1.invoke(getListenerCount) + (Integer)datastore2.invoke(getListenerCount); |
| assertEquals(1, totalInvocation); |
| } |
| |
| private class ListenerInvocationCounter extends CacheListenerAdapter { |
| private int invocationCount = 0; |
| @Override |
| public void afterUpdate(EntryEvent event) { |
| invocationCount++; |
| } |
| } |
| |
| public void testBug33073() { |
| Host host = Host.getHost(0); |
| VM accessor = host.getVM(0); |
| VM datastore1 = host.getVM(1); |
| VM datastore2 = host.getVM(2); |
| |
| initAccessorAndDataStore(accessor, datastore1, datastore2, 0); |
| final CustId custId = new CustId(19); |
| |
| datastore1.invoke(new SerializableCallable() { |
| public Object call() throws Exception { |
| Region<CustId,Customer> refRegion = getCache().getRegion(D_REFERENCE); |
| assertNull(refRegion.get(custId)); |
| getCache().getCacheTransactionManager().begin(); |
| refRegion.put(custId, new Customer("name1", "address1")); |
| return null; |
| } |
| }); |
| datastore2.invoke(new SerializableCallable() { |
| public Object call() throws Exception { |
| Region<CustId,Customer> refRegion = getCache().getRegion(D_REFERENCE); |
| assertNull(refRegion.get(custId)); |
| refRegion.put(custId, new Customer("nameNew", "addressNew")); |
| return null; |
| } |
| }); |
| datastore1.invoke(new SerializableCallable() { |
| public Object call() throws Exception { |
| try { |
| getCache().getCacheTransactionManager().commit(); |
| fail("expected commit conflict not thrown"); |
| } catch (CommitConflictException cc) { |
| } |
| return null; |
| } |
| }); |
| } |
| |
| public void testBug43081() throws Exception { |
| createRegion(false, 0, null); |
| Context ctx = getCache().getJNDIContext(); |
| UserTransaction tx = (UserTransaction)ctx.lookup("java:/UserTransaction"); |
| assertEquals(Status.STATUS_NO_TRANSACTION, tx.getStatus()); |
| Region pr = getCache().getRegion(CUSTOMER); |
| Region rr = getCache().getRegion(D_REFERENCE); |
| // test all ops |
| for (int i=0; i<6; i++) { |
| pr.put(new CustId(1), new Customer("name1", "address1")); |
| rr.put("key1", "value1"); |
| tx.begin(); |
| switch (i) { |
| case 0: |
| pr.get(new CustId(1)); |
| rr.get("key1"); |
| break; |
| case 1: |
| pr.put(new CustId(1), new Customer("nameNew", "addressNew")); |
| rr.put("key1", "valueNew"); |
| break; |
| case 2: |
| pr.invalidate(new CustId(1)); |
| rr.invalidate("key1"); |
| break; |
| case 3: |
| pr.destroy(new CustId(1)); |
| rr.destroy("key1"); |
| break; |
| case 4: |
| Map m = new HashMap(); |
| m.put(new CustId(1), new Customer("nameNew", "addressNew")); |
| pr.putAll(m); |
| m = new HashMap(); |
| m.put("key1", "valueNew"); |
| rr.putAll(m); |
| break; |
| case 5: |
| Set s = new HashSet(); |
| s.add(new CustId(1)); |
| pr.getAll(s); |
| s = new HashSet(); |
| s.add("key1"); |
| pr.getAll(s); |
| break; |
| case 6: |
| pr.getEntry(new CustId(1)); |
| rr.getEntry("key1"); |
| break; |
| default: |
| break; |
| } |
| |
| //Putting a string key causes this, the partition resolver |
| //doesn't handle it. |
| addExpectedException("IllegalStateException"); |
| assertEquals(Status.STATUS_ACTIVE, tx.getStatus()); |
| final CountDownLatch latch = new CountDownLatch(1); |
| Thread t = new Thread(new Runnable() { |
| public void run() { |
| Context ctx = getCache().getJNDIContext(); |
| try { |
| UserTransaction tx = (UserTransaction)ctx.lookup("java:/UserTransaction"); |
| } catch (NamingException e) { |
| e.printStackTrace(); |
| } |
| Region pr = getCache().getRegion(CUSTOMER); |
| Region rr = getCache().getRegion(D_REFERENCE); |
| pr.put(new CustId(1), new Customer("name11", "address11")); |
| rr.put("key1", "value1"); |
| latch.countDown(); |
| } |
| }); |
| t.start(); |
| latch.await(); |
| try { |
| pr.put(new CustId(1), new Customer("name11", "address11")); |
| tx.commit(); |
| fail("expected exception not thrown"); |
| } catch (RollbackException e) { |
| } |
| } |
| } |
| |
| public void testBug45556() { |
| Host host = Host.getHost(0); |
| VM accessor = host.getVM(0); |
| VM datastore = host.getVM(1); |
| final String name = getName(); |
| |
| class CountingListener extends CacheListenerAdapter { |
| private int count; |
| @Override |
| public void afterCreate(EntryEvent event) { |
| getLogWriter().info("afterCreate invoked for " + event); |
| count++; |
| } |
| @Override |
| public void afterUpdate(EntryEvent event) { |
| getLogWriter().info("afterUpdate invoked for " + event); |
| count++; |
| } |
| } |
| |
| accessor.invoke(new SerializableCallable() { |
| @Override |
| public Object call() throws Exception { |
| Region r = getCache().createRegionFactory(RegionShortcut.REPLICATE_PROXY).create(name); |
| r.getAttributesMutator().addCacheListener(new CountingListener()); |
| return null; |
| } |
| }); |
| datastore.invoke(new SerializableCallable() { |
| @Override |
| public Object call() throws Exception { |
| Region r = getCache().createRegionFactory(RegionShortcut.REPLICATE).create(name); |
| r.getAttributesMutator().addCacheListener(new CountingListener()); |
| r.put("key1", "value1"); |
| return null; |
| } |
| }); |
| final TransactionId txid = (TransactionId) accessor.invoke(new SerializableCallable() { |
| @Override |
| public Object call() throws Exception { |
| Region r = getCache().getRegion(name); |
| CacheTransactionManager tm = getCache().getCacheTransactionManager(); |
| getCache().getLogger().fine("SWAP:BeginTX"); |
| tm.begin(); |
| r.put("txkey", "txvalue"); |
| return tm.suspend(); |
| } |
| }); |
| datastore.invoke(new SerializableCallable() { |
| @Override |
| public Object call() throws Exception { |
| Region rgn = getCache().getRegion(name); |
| assertNull(rgn.get("txkey")); |
| TXManagerImpl txMgr = getGemfireCache().getTxManager(); |
| TXStateProxy tx = txMgr.getHostedTXState((TXId) txid); |
| assertEquals(1, tx.getRegions().size()); |
| for (LocalRegion r : tx.getRegions()) { |
| assertTrue(r instanceof DistributedRegion); |
| TXRegionState rs = tx.readRegion(r); |
| for (Object key : rs.getEntryKeys()) { |
| TXEntryState es = rs.readEntry(key); |
| assertEquals("txkey", key); |
| assertNotNull(es.getValue(key, r, false)); |
| if (key.equals("txkey")) assertTrue(es.isDirty()); |
| } |
| } |
| return null; |
| } |
| }); |
| accessor.invoke(new SerializableCallable() { |
| @Override |
| public Object call() throws Exception { |
| Region rgn = getCache().getRegion(name); |
| assertNull(rgn.get("txkey")); |
| CacheTransactionManager mgr = getCache().getCacheTransactionManager(); |
| mgr.resume(txid); |
| mgr.commit(); |
| CountingListener cl = (CountingListener) rgn.getAttributes().getCacheListeners()[0]; |
| assertEquals(0, cl.count); |
| assertEquals("txvalue", rgn.get("txkey")); |
| return null; |
| } |
| }); |
| datastore.invoke(new SerializableCallable() { |
| @Override |
| public Object call() throws Exception { |
| Region rgn = getCache().getRegion(name); |
| CountingListener cl = (CountingListener) rgn.getAttributes().getCacheListeners()[0]; |
| assertEquals(2, cl.count); |
| return null; |
| } |
| }); |
| } |
| |
| public void testExpirySuspend_bug45984() { |
| Host host = Host.getHost(0); |
| VM vm1 = host.getVM(0); |
| VM vm2 = host.getVM(1); |
| final String regionName = getName(); |
| |
| //create region with expiration |
| vm1.invoke(new SerializableCallable() { |
| @Override |
| public Object call() throws Exception { |
| RegionFactory<String, String> rf = getCache().createRegionFactory(); |
| rf.setEntryTimeToLive(new ExpirationAttributes(1, ExpirationAction.LOCAL_DESTROY)); |
| rf.setScope(Scope.DISTRIBUTED_ACK); |
| rf.create(regionName); |
| return null; |
| } |
| }); |
| |
| //create replicate region |
| vm2.invoke(new SerializableCallable() { |
| @Override |
| public Object call() throws Exception { |
| getCache().createRegionFactory(RegionShortcut.REPLICATE).create(regionName); |
| return null; |
| } |
| }); |
| |
| vm1.invoke(new SerializableCallable() { |
| @Override |
| public Object call() throws Exception { |
| final Region<String, String> r = getCache().getRegion(regionName); |
| r.put("key", "value"); |
| r.put("nonTXkey", "nonTXvalue"); |
| getCache().getCacheTransactionManager().begin(); |
| r.put("key", "newvalue"); |
| // wait for entry to expire |
| DistributedTestCase.pause(5000); |
| TransactionId tx = getCache().getCacheTransactionManager().suspend(); |
| // A remote tx will allow expiration to happen on the side that |
| // is not hosting the tx. But it will not allow an expiration |
| // initiated on the hosting jvm. |
| assertFalse(r.containsKey("key")); |
| assertFalse(r.containsKey("nonTXkey")); |
| getCache().getCacheTransactionManager().resume(tx); |
| getCache().getCacheTransactionManager().commit(); |
| WaitCriterion wc2 = new WaitCriterion() { |
| @Override |
| public boolean done() { |
| return !r.containsKey("key") && !r.containsKey("nonTXKey"); |
| } |
| |
| @Override |
| public String description() { |
| return "did not expire"; |
| } |
| }; |
| DistributedTestCase.waitForCriterion(wc2, 30000, 5, true); |
| return null; |
| } |
| }); |
| |
| } |
| |
| public void testRemoteFetchVersionMessage() { |
| Host host = Host.getHost(0); |
| VM vm0 = host.getVM(0); |
| VM vm1 = host.getVM(1); |
| final String regionName = getName(); |
| |
| final VersionTag tag = (VersionTag) vm0.invoke(new SerializableCallable() { |
| @Override |
| public Object call() throws Exception { |
| LocalRegion r = (LocalRegion) getCache().createRegionFactory(RegionShortcut.REPLICATE).create(regionName); |
| r.put("key", "value"); |
| return r.getRegionEntry("key").getVersionStamp().asVersionTag(); |
| } |
| }); |
| |
| vm1.invoke(new SerializableCallable() { |
| @Override |
| public Object call() throws Exception { |
| AttributesFactory af = new AttributesFactory(); |
| af.setDataPolicy(DataPolicy.EMPTY); |
| af.setScope(Scope.DISTRIBUTED_ACK); |
| DistributedRegion r = (DistributedRegion) getCache().createRegion(regionName, af.create()); |
| r.cache.getLogger().info("SWAP:sending:remoteTagRequest"); |
| VersionTag remote = r.fetchRemoteVersionTag("key"); |
| r.cache.getLogger().info("SWAP:remoteTag:"+remote); |
| try { |
| remote = r.fetchRemoteVersionTag("nonExistentKey"); |
| fail("expected exception not thrown"); |
| } catch (EntryNotFoundException e) { |
| } |
| assertEquals(tag, remote); |
| return null; |
| } |
| }); |
| } |
| |
| public void testTransactionWithRemoteVersionFetch() { |
| Host host = Host.getHost(0); |
| VM vm0 = host.getVM(0); |
| VM vm1 = host.getVM(1); |
| final String regionNameNormal = getName() + "_normal"; |
| final String regionName = getName(); |
| |
| vm0.invoke(new SerializableCallable() { |
| @Override |
| public Object call() throws Exception { |
| Region r = getCache().createRegionFactory(RegionShortcut.REPLICATE).create(regionName); |
| Region n = getCache().createRegionFactory(RegionShortcut.REPLICATE).create(regionNameNormal); |
| n.put("key", "value"); |
| n.put("key", "value1"); |
| n.put("key", "value2"); |
| return null; |
| } |
| }); |
| |
| final VersionTag tag = (VersionTag) vm1.invoke(new SerializableCallable() { |
| @Override |
| public Object call() throws Exception { |
| Region r = getCache().createRegionFactory(RegionShortcut.REPLICATE).create(regionName); |
| AttributesFactory af = new AttributesFactory(); |
| af.setDataPolicy(DataPolicy.NORMAL); |
| af.setScope(Scope.DISTRIBUTED_ACK); |
| Region n = getCache().createRegion(regionNameNormal, af.create()); |
| TXManagerImpl mgr = getGemfireCache().getTxManager(); |
| mgr.begin(); |
| r.put("key", "value"); |
| assertTrue(mgr.getTXState().isRealDealLocal()); |
| getCache().getLogger().fine("SWAP:doingPutInNormalRegion"); |
| n.put("key", "value"); |
| getCache().getLogger().fine("SWAP:commiting"); |
| mgr.commit(); |
| return ((LocalRegion)n).getRegionEntry("key").getVersionStamp().asVersionTag(); |
| } |
| }); |
| |
| vm0.invoke(new SerializableCallable() { |
| @Override |
| public Object call() throws Exception { |
| Region n = getCache().getRegion(regionNameNormal); |
| VersionTag localTag = ((LocalRegion)n).getRegionEntry("key").getVersionStamp().asVersionTag(); |
| assertEquals(tag.getEntryVersion(), localTag.getEntryVersion()); |
| assertEquals(tag.getRegionVersion(), localTag.getRegionVersion()); |
| return null; |
| } |
| }); |
| } |
| |
| public void testBug49398() { |
| disconnectAllFromDS(); |
| Host host = Host.getHost(0); |
| VM vm1 = host.getVM(0); |
| VM vm2 = host.getVM(1); |
| final String lrName = getName() + "_lr"; |
| |
| SerializableCallable createRegion = new SerializableCallable() { |
| @Override |
| public Object call() throws Exception { |
| createRegion(false, 1, null); |
| getCache().createRegionFactory(RegionShortcut.LOCAL).create(lrName); |
| return null; |
| } |
| }; |
| |
| vm1.invoke(createRegion); |
| vm2.invoke(createRegion); |
| |
| vm1.invoke(new SerializableCallable() { |
| @Override |
| public Object call() throws Exception { |
| CacheTransactionManager txMgr = getCache().getCacheTransactionManager(); |
| Region ref = getCache().getRegion(D_REFERENCE); |
| Region lr = getCache().getRegion(lrName); |
| txMgr.begin(); |
| ref.put(new CustId(1), new Customer("name1", "address1")); |
| lr.put("key", "value"); |
| txMgr.commit(); |
| return null; |
| } |
| }); |
| |
| // make sure local region changes are not reflected in the other vm |
| vm2.invoke(new SerializableCallable() { |
| @Override |
| public Object call() throws Exception { |
| Region lr = getCache().getRegion(lrName); |
| assertNull(lr.get("key")); |
| return null; |
| } |
| }); |
| } |
| |
| public Object getEntryValue(final CustId custId0, PartitionedRegion cust) { |
| RegionEntry entry = cust.getBucketRegion(custId0).getRegionEntry(custId0); |
| Object value = entry._getValue(); |
| return value; |
| } |
| } |