| /*========================================================================= |
| * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved. |
| * This product is protected by U.S. and international copyright |
| * and intellectual property laws. Pivotal products are covered by |
| * one or more patents listed at http://www.pivotal.io/patents. |
| *========================================================================= |
| */ |
| package com.gemstone.gemfire.cache30; |
| |
| import com.gemstone.gemfire.CopyHelper; |
| import com.gemstone.gemfire.cache.*; |
| import com.gemstone.gemfire.cache.query.IndexType; |
| import com.gemstone.gemfire.cache.query.Query; |
| import com.gemstone.gemfire.cache.query.QueryService; |
| import com.gemstone.gemfire.cache.query.SelectResults; |
| import com.gemstone.gemfire.cache.query.transaction.Person; |
| import com.gemstone.gemfire.cache.util.*; |
| import com.gemstone.gemfire.distributed.*; |
| import com.gemstone.gemfire.distributed.internal.*; |
| import java.util.*; |
| |
| import javax.naming.Context; |
| import javax.transaction.UserTransaction; |
| |
| import dunit.*; |
| /** |
| * Test the order of operations done on the farside of a tx. |
| * |
| * @author darrel |
| * @since 5.0 |
| */ |
| public class TXOrderDUnitTest extends CacheTestCase { |
| |
| private transient Region r; |
| private transient DistributedMember otherId; |
| protected transient int invokeCount; |
| |
| public TXOrderDUnitTest(String name) { |
| super(name); |
| } |
| |
| private VM getOtherVm() { |
| Host host = Host.getHost(0); |
| return host.getVM(0); |
| } |
| |
| private void initOtherId() { |
| VM vm = getOtherVm(); |
| vm.invoke(new CacheSerializableRunnable("Connect") { |
| public void run2() throws CacheException { |
| getCache(); |
| } |
| }); |
| this.otherId = (DistributedMember)vm.invoke(TXOrderDUnitTest.class, "getVMDistributedMember"); |
| } |
| private void doCommitOtherVm() { |
| VM vm = getOtherVm(); |
| vm.invoke(new CacheSerializableRunnable("create root") { |
| public void run2() throws CacheException { |
| AttributesFactory af = new AttributesFactory(); |
| af.setScope(Scope.DISTRIBUTED_ACK); |
| Region r1 = createRootRegion("r1", af.create()); |
| Region r2 = r1.createSubregion("r2", af.create()); |
| Region r3 = r2.createSubregion("r3", af.create()); |
| CacheTransactionManager ctm = getCache().getCacheTransactionManager(); |
| ctm.begin(); |
| r2.put("b", "value1"); |
| r3.put("c", "value2"); |
| r1.put("a", "value3"); |
| r1.put("a2", "value4"); |
| r3.put("c2", "value5"); |
| r2.put("b2", "value6"); |
| ctm.commit(); |
| } |
| }); |
| } |
| |
| public static DistributedMember getVMDistributedMember() { |
| return InternalDistributedSystem.getAnyInstance().getDistributedMember(); |
| } |
| |
| ////////////////////// Test Methods ////////////////////// |
| |
| List expectedKeys; |
| int clCount = 0; |
| |
| Object getCurrentExpectedKey() { |
| Object result = this.expectedKeys.get(this.clCount); |
| this.clCount += 1; |
| return result; |
| } |
| /** |
| * make sure listeners get invoked in correct order on far side of tx |
| */ |
| public void testFarSideOrder() throws CacheException { |
| initOtherId(); |
| AttributesFactory af = new AttributesFactory(); |
| af.setDataPolicy(DataPolicy.REPLICATE); |
| af.setScope(Scope.DISTRIBUTED_ACK); |
| CacheListener cl1 = new CacheListenerAdapter() { |
| public void afterCreate(EntryEvent e) { |
| assertEquals(getCurrentExpectedKey(), e.getKey()); |
| } |
| }; |
| af.addCacheListener(cl1); |
| Region r1 = createRootRegion("r1", af.create()); |
| Region r2 = r1.createSubregion("r2", af.create()); |
| r2.createSubregion("r3", af.create()); |
| |
| TransactionListener tl1 = new TransactionListenerAdapter() { |
| public void afterCommit(TransactionEvent e) { |
| assertEquals(6, e.getEvents().size()); |
| ArrayList keys = new ArrayList(); |
| Iterator it = e.getEvents().iterator(); |
| while (it.hasNext()) { |
| EntryEvent ee = (EntryEvent)it.next(); |
| keys.add(ee.getKey()); |
| assertEquals(null, ee.getCallbackArgument()); |
| assertEquals(true, ee.isCallbackArgumentAvailable()); |
| } |
| assertEquals(TXOrderDUnitTest.this.expectedKeys, keys); |
| TXOrderDUnitTest.this.invokeCount = 1; |
| } |
| }; |
| CacheTransactionManager ctm = getCache().getCacheTransactionManager(); |
| ctm.addListener(tl1); |
| |
| this.invokeCount = 0; |
| this.clCount = 0; |
| this.expectedKeys = Arrays.asList(new String[]{"b", "c", "a", "a2", "c2", "b2"}); |
| doCommitOtherVm(); |
| assertEquals(1, this.invokeCount); |
| assertEquals(6, this.clCount); |
| } |
| |
| /** |
| * test bug#40870 |
| * @throws Exception |
| */ |
| public void _testFarSideOpForLoad() throws Exception { |
| Host host = Host.getHost(0); |
| VM vm1 = host.getVM(0); |
| VM vm2 = host.getVM(1); |
| vm1.invoke(new SerializableCallable() { |
| public Object call() throws Exception { |
| AttributesFactory af = new AttributesFactory(); |
| af.setDataPolicy(DataPolicy.REPLICATE); |
| af.setScope(Scope.DISTRIBUTED_ACK); |
| CacheListener cl1 = new CacheListenerAdapter() { |
| public void afterCreate(EntryEvent e) { |
| assertTrue(e.getOperation().isLocalLoad()); |
| } |
| }; |
| af.addCacheListener(cl1); |
| CacheLoader cl = new CacheLoader() { |
| public Object load(LoaderHelper helper) throws CacheLoaderException { |
| getLogWriter().info("Loading value:"+helper.getKey()+"_value"); |
| return helper.getKey()+"_value"; |
| } |
| public void close() { |
| } |
| }; |
| af.setCacheLoader(cl); |
| createRootRegion("r1", af.create()); |
| return null; |
| } |
| }); |
| |
| vm2.invoke(new SerializableCallable() { |
| public Object call() throws Exception { |
| AttributesFactory af = new AttributesFactory(); |
| af.setDataPolicy(DataPolicy.REPLICATE); |
| af.setScope(Scope.DISTRIBUTED_ACK); |
| CacheListener cl1 = new CacheListenerAdapter() { |
| public void afterCreate(EntryEvent e) { |
| getLogWriter().info("op:"+e.getOperation().toString()); |
| assertTrue(!e.getOperation().isLocalLoad()); |
| } |
| }; |
| af.addCacheListener(cl1); |
| createRootRegion("r1", af.create()); |
| return null; |
| } |
| }); |
| |
| vm1.invoke(new SerializableCallable() { |
| public Object call() throws Exception { |
| Region r = getRootRegion("r1"); |
| getCache().getCacheTransactionManager().begin(); |
| r.get("obj_2"); |
| getCache().getCacheTransactionManager().commit(); |
| return null; |
| } |
| }); |
| } |
| |
| public void testInternalRegionNotExposed() { |
| Host host = Host.getHost(0); |
| VM vm1 = host.getVM(0); |
| VM vm2 = host.getVM(1); |
| SerializableCallable createRegion = new SerializableCallable() { |
| public Object call() throws Exception { |
| ExposedRegionTransactionListener tl = new ExposedRegionTransactionListener(); |
| CacheTransactionManager ctm = getCache().getCacheTransactionManager(); |
| ctm.addListener(tl); |
| ExposedRegionCacheListener cl = new ExposedRegionCacheListener(); |
| AttributesFactory af = new AttributesFactory(); |
| PartitionAttributes pa = new PartitionAttributesFactory() |
| .setRedundantCopies(1) |
| .setTotalNumBuckets(1) |
| .create(); |
| af.setPartitionAttributes(pa); |
| af.addCacheListener(cl); |
| Region pr = createRootRegion("testTxEventForRegion", af.create()); |
| return null; |
| } |
| }; |
| vm1.invoke(createRegion); |
| vm2.invoke(createRegion); |
| vm1.invoke(new SerializableCallable() { |
| public Object call() throws Exception { |
| Region pr = getRootRegion("testTxEventForRegion"); |
| CacheTransactionManager ctm = getCache().getCacheTransactionManager(); |
| pr.put(2, "tw"); |
| pr.put(3, "three"); |
| pr.put(4, "four"); |
| ctm.begin(); |
| pr.put(1, "one"); |
| pr.put(2, "two"); |
| pr.invalidate(3); |
| pr.destroy(4); |
| ctm.commit(); |
| return null; |
| } |
| }); |
| SerializableCallable verifyListener = new SerializableCallable() { |
| public Object call() throws Exception { |
| Region pr = getRootRegion("testTxEventForRegion"); |
| CacheTransactionManager ctm = getCache().getCacheTransactionManager(); |
| ExposedRegionTransactionListener tl = (ExposedRegionTransactionListener)ctm.getListeners()[0]; |
| ExposedRegionCacheListener cl = (ExposedRegionCacheListener)pr.getAttributes().getCacheListeners()[0]; |
| assertFalse(tl.exceptionOccurred); |
| assertFalse(cl.exceptionOccurred); |
| return null; |
| } |
| }; |
| vm1.invoke(verifyListener); |
| vm2.invoke(verifyListener); |
| } |
| |
| class ExposedRegionTransactionListener extends TransactionListenerAdapter { |
| private boolean exceptionOccurred = false; |
| @Override |
| public void afterCommit(TransactionEvent event) { |
| List<CacheEvent<?, ?>> events = event.getEvents(); |
| for (CacheEvent<?, ?>e : events) { |
| if (!"/testTxEventForRegion".equals(e.getRegion().getFullPath())) { |
| exceptionOccurred = true; |
| } |
| } |
| } |
| } |
| class ExposedRegionCacheListener extends CacheListenerAdapter { |
| private boolean exceptionOccurred = false; |
| @Override |
| public void afterCreate(EntryEvent event) { |
| verifyRegion(event); |
| } |
| @Override |
| public void afterUpdate(EntryEvent event) { |
| verifyRegion(event); |
| } |
| private void verifyRegion(EntryEvent event) { |
| if (!"/testTxEventForRegion".equals(event.getRegion().getFullPath())) { |
| exceptionOccurred = true; |
| } |
| } |
| } |
| |
| private final int TEST_PUT = 0; |
| private final int TEST_INVALIDATE = 1; |
| private final int TEST_DESTROY = 2; |
| /** |
| * verify that queries on indexes work with transaction |
| * @see bug#40842 |
| * @throws Exception |
| */ |
| public void testFarSideIndexOnPut() throws Exception { |
| doTest(TEST_PUT); |
| } |
| |
| public void testFarSideIndexOnInvalidate() throws Exception { |
| doTest(TEST_INVALIDATE); |
| } |
| |
| public void testFarSideIndexOnDestroy() throws Exception { |
| doTest(TEST_DESTROY); |
| } |
| |
| private void doTest(final int op) throws Exception { |
| Host host = Host.getHost(0); |
| VM vm1 = host.getVM(0); |
| VM vm2 = host.getVM(1); |
| SerializableCallable createRegionAndIndex = new SerializableCallable() { |
| public Object call() throws Exception { |
| AttributesFactory af = new AttributesFactory(); |
| af.setDataPolicy(DataPolicy.REPLICATE); |
| af.setScope(Scope.DISTRIBUTED_ACK); |
| Region region = createRootRegion("sample", af.create()); |
| QueryService qs = getCache().getQueryService(); |
| qs.createIndex("foo", IndexType.FUNCTIONAL, "age", "/sample"); |
| return null; |
| } |
| }; |
| vm1.invoke(createRegionAndIndex); |
| vm2.invoke(createRegionAndIndex); |
| |
| //do transactional puts in vm1 |
| vm1.invoke(new SerializableCallable() { |
| public Object call() throws Exception { |
| Context ctx = getCache().getJNDIContext(); |
| UserTransaction utx = (UserTransaction)ctx.lookup("java:/UserTransaction"); |
| Region region = getRootRegion("sample"); |
| Integer x = new Integer(0); |
| utx.begin(); |
| region.create(x, new Person("xyz", 45)); |
| utx.commit(); |
| QueryService qs = getCache().getQueryService(); |
| Query q = qs.newQuery("select * from /sample where age < 50"); |
| assertEquals(1, ((SelectResults)q.execute()).size()); |
| Person dsample = (Person)CopyHelper.copy(region.get(x)); |
| dsample.setAge(55); |
| utx.begin(); |
| switch (op) { |
| case TEST_PUT: |
| region.put(x, dsample); |
| break; |
| case TEST_INVALIDATE: |
| region.invalidate(x); |
| break; |
| case TEST_DESTROY: |
| region.destroy(x); |
| break; |
| default: |
| fail("unknown op"); |
| } |
| utx.commit(); |
| assertEquals(0, ((SelectResults)q.execute()).size()); |
| return null; |
| } |
| }); |
| |
| //run query and verify results in other vm |
| vm2.invoke(new SerializableCallable() { |
| public Object call() throws Exception { |
| QueryService qs = getCache().getQueryService(); |
| Query q = qs.newQuery("select * from /sample where age < 50"); |
| assertEquals(0, ((SelectResults)q.execute()).size()); |
| return null; |
| } |
| }); |
| } |
| |
| public void testBug43353() { |
| Host host = Host.getHost(0); |
| VM vm1 = host.getVM(0); |
| VM vm2 = host.getVM(1); |
| |
| SerializableCallable createRegion = new SerializableCallable() { |
| public Object call() throws Exception { |
| getCache().createRegionFactory(RegionShortcut.REPLICATE).create(testName); |
| return null; |
| } |
| }; |
| |
| vm1.invoke(createRegion); |
| vm2.invoke(createRegion); |
| |
| vm1.invoke(new SerializableCallable() { |
| public Object call() throws Exception { |
| Region r = getCache().getRegion(testName); |
| r.put("ikey", "value"); |
| getCache().getCacheTransactionManager().begin(); |
| r.put("key1", new byte[20]); |
| r.invalidate("ikey"); |
| getCache().getCacheTransactionManager().commit(); |
| return null; |
| } |
| }); |
| |
| vm2.invoke(new SerializableCallable() { |
| public Object call() throws Exception { |
| Region r = getCache().getRegion(testName); |
| Object v = r.get("key1"); |
| assertNotNull(v); |
| assertTrue(v instanceof byte[]); |
| assertNull(r.get("ikey")); |
| return null; |
| } |
| }); |
| } |
| } |