| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more contributor license |
| * agreements. See the NOTICE file distributed with this work for additional information regarding |
| * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance with the License. You may obtain a |
| * copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software distributed under the License |
| * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
| * or implied. See the License for the specific language governing permissions and limitations under |
| * the License. |
| */ |
| package org.apache.geode.internal.cache; |
| |
| import static org.apache.geode.distributed.ConfigurationProperties.LOG_LEVEL; |
| import static org.apache.geode.internal.AvailablePortHelper.getRandomAvailableTCPPort; |
| import static org.apache.geode.test.awaitility.GeodeAwaitility.await; |
| import static org.assertj.core.api.Assertions.assertThat; |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertNotNull; |
| import static org.junit.Assert.assertNull; |
| import static org.junit.Assert.assertTrue; |
| import static org.junit.Assert.fail; |
| |
| import java.util.Collections; |
| import java.util.HashSet; |
| import java.util.Set; |
| import java.util.concurrent.TimeUnit; |
| |
| import org.junit.Test; |
| import org.junit.experimental.categories.Category; |
| |
| import org.apache.geode.cache.AttributesFactory; |
| import org.apache.geode.cache.CacheEvent; |
| import org.apache.geode.cache.CacheListener; |
| import org.apache.geode.cache.CacheWriter; |
| import org.apache.geode.cache.CacheWriterException; |
| import org.apache.geode.cache.DataPolicy; |
| import org.apache.geode.cache.EntryEvent; |
| import org.apache.geode.cache.EntryNotFoundException; |
| import org.apache.geode.cache.InterestPolicy; |
| import org.apache.geode.cache.PartitionAttributesFactory; |
| import org.apache.geode.cache.Region; |
| import org.apache.geode.cache.RegionEvent; |
| import org.apache.geode.cache.Scope; |
| import org.apache.geode.cache.SubscriptionAttributes; |
| import org.apache.geode.cache.TransactionEvent; |
| import org.apache.geode.cache.TransactionListener; |
| import org.apache.geode.cache.TransactionWriter; |
| import org.apache.geode.cache.client.ClientCache; |
| import org.apache.geode.cache.client.ClientCacheFactory; |
| import org.apache.geode.cache.client.ClientRegionFactory; |
| import org.apache.geode.cache.client.ClientRegionShortcut; |
| import org.apache.geode.cache.execute.Function; |
| import org.apache.geode.cache.execute.FunctionContext; |
| import org.apache.geode.cache.query.CqAttributes; |
| import org.apache.geode.cache.query.CqAttributesFactory; |
| import org.apache.geode.cache.query.CqEvent; |
| import org.apache.geode.cache.query.CqListener; |
| import org.apache.geode.cache.query.CqQuery; |
| import org.apache.geode.cache.server.CacheServer; |
| import org.apache.geode.cache.util.CacheListenerAdapter; |
| import org.apache.geode.cache.util.CacheWriterAdapter; |
| import org.apache.geode.distributed.DistributedMember; |
| import org.apache.geode.internal.cache.execute.CustomerIDPartitionResolver; |
| import org.apache.geode.internal.cache.execute.data.CustId; |
| import org.apache.geode.internal.cache.execute.data.Customer; |
| import org.apache.geode.internal.cache.execute.data.Order; |
| import org.apache.geode.internal.cache.execute.data.OrderId; |
| import org.apache.geode.test.awaitility.GeodeAwaitility; |
| import org.apache.geode.test.dunit.Invoke; |
| import org.apache.geode.test.dunit.LogWriterUtils; |
| import org.apache.geode.test.dunit.SerializableCallable; |
| import org.apache.geode.test.dunit.VM; |
| import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase; |
| import org.apache.geode.test.junit.categories.ClientSubscriptionTest; |
| |
| @Category({ClientSubscriptionTest.class}) |
| public class RemoteCQTransactionDUnitTest extends JUnit4CacheTestCase { |
| private final VM accessor = VM.getVM(0); |
| private final VM dataStore = VM.getVM(1); |
| private final VM client = VM.getVM(2); |
| private final CustId expectedCustId = new CustId(6); |
| private final Customer expectedCustomer = new Customer("customer6", "address6"); |
| |
| private static final String CUSTOMER = "custRegion"; |
| private static final String ORDER = "orderRegion"; |
| private static final String D_REFERENCE = "distrReference"; |
| |
| private final SerializableCallable getNumberOfTXInProgress = new SerializableCallable() { |
| @Override |
| public Object call() throws Exception { |
| TXManagerImpl mgr = getGemfireCache().getTxManager(); |
| return mgr.hostedTransactionsInProgressForTest(); |
| } |
| }; |
| |
| private final SerializableCallable verifyNoTxState = new SerializableCallable() { |
| @Override |
| public Object call() throws Exception { |
| final TXManagerImpl mgr = getGemfireCache().getTxManager(); |
| await().untilAsserted(() -> assertEquals(0, mgr.hostedTransactionsInProgressForTest())); |
| return null; |
| } |
| }; |
| |
| protected enum OP { |
| PUT, GET, DESTROY, INVALIDATE, KEYS, VALUES, ENTRIES, PUTALL, GETALL, REMOVEALL |
| } |
| |
| @Override |
| public final void preTearDownCacheTestCase() throws Exception { |
| try { |
| Invoke.invokeInEveryVM(verifyNoTxState); |
| } finally { |
| closeAllCache(); |
| } |
| } |
| |
| void createRegion(boolean accessor, int redundantCopies, InterestPolicy interestPolicy) { |
| AttributesFactory af = new AttributesFactory(); |
| af.setScope(Scope.DISTRIBUTED_ACK); |
| af.setDataPolicy(DataPolicy.REPLICATE); |
| af.setConcurrencyChecksEnabled(getConcurrencyChecksEnabled()); |
| getCache().createRegion(D_REFERENCE, af.create()); |
| af = new AttributesFactory(); |
| af.setConcurrencyChecksEnabled(getConcurrencyChecksEnabled()); |
| if (interestPolicy != null) { |
| af.setSubscriptionAttributes(new SubscriptionAttributes(interestPolicy)); |
| } |
| af.setPartitionAttributes(new PartitionAttributesFactory<CustId, Customer>() |
| .setTotalNumBuckets(4).setLocalMaxMemory(accessor ? 0 : 1) |
| .setPartitionResolver(new CustomerIDPartitionResolver("resolver1")) |
| .setRedundantCopies(redundantCopies).create()); |
| getCache().createRegion(CUSTOMER, af.create()); |
| af.setPartitionAttributes(new PartitionAttributesFactory<OrderId, Order>().setTotalNumBuckets(4) |
| .setLocalMaxMemory(accessor ? 0 : 1) |
| .setPartitionResolver(new CustomerIDPartitionResolver("resolver2")) |
| .setRedundantCopies(redundantCopies).setColocatedWith(CUSTOMER).create()); |
| getCache().createRegion(ORDER, af.create()); |
| } |
| |
| protected boolean getConcurrencyChecksEnabled() { |
| return false; |
| } |
| |
| void populateData() { |
| Region custRegion = getCache().getRegion(CUSTOMER); |
| Region orderRegion = getCache().getRegion(ORDER); |
| Region refRegion = getCache().getRegion(D_REFERENCE); |
| for (int i = 0; i < 5; i++) { |
| CustId custId = new CustId(i); |
| Customer customer = new Customer("customer" + i, "address" + i); |
| OrderId orderId = new OrderId(i, custId); |
| Order order = new Order("order" + i); |
| custRegion.put(custId, customer); |
| orderRegion.put(orderId, order); |
| refRegion.put(custId, customer); |
| } |
| } |
| |
| protected void initAccessorAndDataStore(VM accessor, VM dataStore, final int redundantCopies) { |
| accessor.invoke(new SerializableCallable() { |
| @Override |
| public Object call() throws Exception { |
| createRegion(true/* accessor */, redundantCopies, null); |
| return null; |
| } |
| }); |
| |
| dataStore.invoke(new SerializableCallable() { |
| @Override |
| public Object call() throws Exception { |
| createRegion(false/* accessor */, redundantCopies, null); |
| populateData(); |
| return null; |
| } |
| }); |
| } |
| |
| protected void initAccessorAndDataStore(VM accessor, VM dataStore1, VM dataStore2, |
| final int redundantCopies) { |
| dataStore2.invoke(new SerializableCallable() { |
| @Override |
| public Object call() throws Exception { |
| createRegion(false/* accessor */, redundantCopies, null); |
| return null; |
| } |
| }); |
| |
| initAccessorAndDataStore(accessor, dataStore1, redundantCopies); |
| } |
| |
| private void initAccessorAndDataStoreWithInterestPolicy(VM accessor, VM dataStore1, VM dataStore2, |
| final int redundantCopies) { |
| dataStore2.invoke(new SerializableCallable() { |
| @Override |
| public Object call() throws Exception { |
| createRegion(false/* accessor */, redundantCopies, InterestPolicy.ALL); |
| return null; |
| } |
| }); |
| |
| initAccessorAndDataStore(accessor, dataStore1, redundantCopies); |
| } |
| |
| |
| void validateContains(CustId custId, Set<OrderId> orderId, boolean doesIt) { |
| validateContains(custId, orderId, doesIt, doesIt); |
| } |
| |
| void validateContains(CustId custId, Set<OrderId> ordersSet, boolean containsKey, |
| boolean containsValue) { |
| Region<CustId, Customer> custRegion = getCache().getRegion(CUSTOMER); |
| Region<OrderId, Order> orderRegion = getCache().getRegion(ORDER); |
| Region<CustId, Order> refRegion = getCache().getRegion(D_REFERENCE); |
| boolean rContainsKC = custRegion.containsKey(custId); |
| boolean rContainsKO = containsKey; |
| for (OrderId o : ordersSet) { |
| getGemfireCache().getLogger() |
| .fine("SWAP:rContainsKO:" + rContainsKO + " containsKey:" + orderRegion.containsKey(o)); |
| rContainsKO = rContainsKO && orderRegion.containsKey(o); |
| } |
| boolean rContainsKR = refRegion.containsKey(custId); |
| |
| boolean rContainsVC = custRegion.containsValueForKey(custId); |
| boolean rContainsVO = containsValue; |
| for (OrderId o : ordersSet) { |
| rContainsVO = rContainsVO && orderRegion.containsValueForKey(o); |
| } |
| boolean rContainsVR = refRegion.containsValueForKey(custId); |
| |
| assertEquals(containsKey, rContainsKC); |
| assertEquals(containsKey, rContainsKO); |
| assertEquals(containsKey, rContainsKR); |
| assertEquals(containsValue, rContainsVR); |
| assertEquals(containsValue, rContainsVC); |
| assertEquals(containsValue, rContainsVO); |
| |
| if (containsKey) { |
| Region.Entry eC = custRegion.getEntry(custId); |
| for (OrderId o : ordersSet) { |
| assertNotNull(orderRegion.getEntry(o)); |
| } |
| Region.Entry eR = refRegion.getEntry(custId); |
| assertNotNull(eC); |
| assertNotNull(eR); |
| |
| } else { |
| try { |
| Region.Entry eC = custRegion.getEntry(custId); |
| assertNull("should have had an EntryNotFoundException:" + eC, eC); |
| |
| } catch (EntryNotFoundException enfe) { |
| // this is what we expect |
| } |
| try { |
| for (OrderId o : ordersSet) { |
| assertNull("should have had an EntryNotFoundException:" + orderRegion.getEntry(o), |
| orderRegion.getEntry(o)); |
| } |
| |
| } catch (EntryNotFoundException enfe) { |
| // this is what we expect |
| } |
| try { |
| Region.Entry eR = refRegion.getEntry(custId); |
| assertNull("should have had an EntryNotFoundException:" + eR, eR); |
| } catch (EntryNotFoundException enfe) { |
| // this is what we expect |
| } |
| } |
| } |
| |
| void verifyAfterCommit(OP op) { |
| Region<CustId, Customer> custRegion = getCache().getRegion(CUSTOMER); |
| Region<CustId, Customer> refRegion = getCache().getRegion(D_REFERENCE); |
| Region<OrderId, Order> orderRegion = getCache().getRegion(ORDER); |
| CustId custId = new CustId(1); |
| OrderId orderId = new OrderId(1, custId); |
| OrderId orderId2 = new OrderId(2, custId); |
| OrderId orderId3 = new OrderId(3, custId); |
| Customer expectedCust; |
| Order expectedOrder; |
| Order expectedOrder2; |
| Order expectedOrder3; |
| Customer expectedRef; |
| switch (op) { |
| case PUT: |
| expectedCust = new Customer("foo", "bar"); |
| expectedOrder = new Order("fooOrder"); |
| expectedOrder2 = new Order("fooOrder2"); |
| expectedOrder3 = new Order("fooOrder3"); |
| expectedRef = expectedCust; |
| assertNotNull(custRegion.getEntry(custId)); |
| assertEquals(expectedCust, custRegion.getEntry(custId).getValue()); |
| |
| assertNotNull(refRegion.getEntry(custId)); |
| assertEquals(expectedRef, refRegion.getEntry(custId).getValue()); |
| |
| break; |
| case GET: |
| expectedCust = custRegion.get(custId); |
| expectedOrder = orderRegion.get(orderId); |
| expectedRef = refRegion.get(custId); |
| validateContains(custId, Collections.singleton(orderId), true); |
| break; |
| case DESTROY: |
| assertTrue(!custRegion.containsKey(custId)); |
| assertTrue(!orderRegion.containsKey(orderId)); |
| assertTrue(!refRegion.containsKey(custId)); |
| validateContains(custId, Collections.singleton(orderId), false); |
| break; |
| case INVALIDATE: |
| boolean validateContainsKey = true; |
| if (!((GemFireCacheImpl) custRegion.getCache()).isClient()) { |
| assertTrue(custRegion.containsKey(custId)); |
| assertTrue(orderRegion.containsKey(orderId)); |
| assertTrue(refRegion.containsKey(custId)); |
| } |
| assertNull(custRegion.get(custId)); |
| assertNull(orderRegion.get(orderId)); |
| assertNull(refRegion.get(custId)); |
| validateContains(custId, Collections.singleton(orderId), validateContainsKey, false); |
| break; |
| default: |
| throw new IllegalStateException(); |
| } |
| } |
| |
| void verifyAfterRollback(OP op) { |
| Region<CustId, Customer> custRegion = getCache().getRegion(CUSTOMER); |
| Region<OrderId, Order> orderRegion = getCache().getRegion(ORDER); |
| Region<CustId, Customer> refRegion = getCache().getRegion(D_REFERENCE); |
| assertNotNull(custRegion); |
| assertNotNull(orderRegion); |
| assertNotNull(refRegion); |
| |
| CustId custId = new CustId(1); |
| OrderId orderId = new OrderId(1, custId); |
| OrderId orderId2 = new OrderId(2, custId); |
| OrderId orderId3 = new OrderId(3, custId); |
| Customer expectedCust; |
| Order expectedOrder; |
| Customer expectedRef; |
| switch (op) { |
| case PUT: |
| expectedCust = new Customer("customer1", "address1"); |
| expectedOrder = new Order("order1"); |
| expectedRef = new Customer("customer1", "address1"); |
| assertEquals(expectedCust, custRegion.getEntry(custId).getValue()); |
| assertEquals(expectedOrder, orderRegion.getEntry(orderId).getValue()); |
| getCache().getLogger().info("SWAP:verifyRollback:" + orderRegion); |
| getCache().getLogger().info("SWAP:verifyRollback:" + orderRegion.getEntry(orderId2)); |
| assertNull(getGemfireCache().getTXMgr().getTXState()); |
| assertNull("" + orderRegion.getEntry(orderId2), orderRegion.getEntry(orderId2)); |
| assertNull(orderRegion.getEntry(orderId3)); |
| assertNull(orderRegion.get(orderId2)); |
| assertNull(orderRegion.get(orderId3)); |
| assertEquals(expectedRef, refRegion.getEntry(custId).getValue()); |
| validateContains(custId, Collections.singleton(orderId), true); |
| break; |
| case GET: |
| expectedCust = custRegion.getEntry(custId).getValue(); |
| expectedOrder = orderRegion.getEntry(orderId).getValue(); |
| expectedRef = refRegion.getEntry(custId).getValue(); |
| validateContains(custId, Collections.singleton(orderId), true); |
| break; |
| case DESTROY: |
| assertTrue(!custRegion.containsKey(custId)); |
| assertTrue(!orderRegion.containsKey(orderId)); |
| assertTrue(!refRegion.containsKey(custId)); |
| validateContains(custId, Collections.singleton(orderId), true); |
| break; |
| case INVALIDATE: |
| assertTrue(custRegion.containsKey(custId)); |
| assertTrue(orderRegion.containsKey(orderId)); |
| assertTrue(refRegion.containsKey(custId)); |
| assertNull(custRegion.get(custId)); |
| assertNull(orderRegion.get(orderId)); |
| assertNull(refRegion.get(custId)); |
| validateContains(custId, Collections.singleton(orderId), true, true); |
| break; |
| default: |
| throw new IllegalStateException(); |
| } |
| } |
| |
| abstract static class CacheCallback { |
| |
| protected boolean isAccessor; |
| protected Exception ex = null; |
| |
| protected void verifyOrigin(EntryEvent event) { |
| try { |
| assertEquals(!isAccessor, event.isOriginRemote()); |
| } catch (Exception e) { |
| ex = e; |
| } |
| } |
| |
| protected void verifyPutAll(EntryEvent event) { |
| CustId knownCustId = new CustId(1); |
| OrderId knownOrderId = new OrderId(2, knownCustId); |
| if (event.getKey().equals(knownOrderId)) { |
| try { |
| assertTrue(event.getOperation().isPutAll()); |
| assertNotNull(event.getTransactionId()); |
| } catch (Exception e) { |
| ex = e; |
| } |
| } |
| } |
| } |
| |
| static class TestCacheListener extends CacheCallback implements CacheListener { |
| TestCacheListener(boolean isAccessor) { |
| this.isAccessor = isAccessor; |
| } |
| |
| @Override |
| public void afterCreate(EntryEvent event) { |
| verifyOrigin(event); |
| verifyPutAll(event); |
| } |
| |
| @Override |
| public void afterUpdate(EntryEvent event) { |
| verifyOrigin(event); |
| verifyPutAll(event); |
| } |
| |
| @Override |
| public void afterDestroy(EntryEvent event) { |
| verifyOrigin(event); |
| } |
| |
| @Override |
| public void afterInvalidate(EntryEvent event) { |
| verifyOrigin(event); |
| } |
| |
| @Override |
| public void afterRegionClear(RegionEvent event) {} |
| |
| @Override |
| public void afterRegionCreate(RegionEvent event) {} |
| |
| @Override |
| public void afterRegionDestroy(RegionEvent event) {} |
| |
| @Override |
| public void afterRegionInvalidate(RegionEvent event) {} |
| |
| @Override |
| public void afterRegionLive(RegionEvent event) {} |
| |
| @Override |
| public void close() {} |
| } |
| |
| class TestCacheWriter extends CacheCallback implements CacheWriter { |
| |
| private volatile boolean wasFired = false; |
| |
| TestCacheWriter(boolean isAccessor) { |
| this.isAccessor = isAccessor; |
| } |
| |
| @Override |
| public void beforeCreate(EntryEvent event) throws CacheWriterException { |
| getGemfireCache().getLogger() |
| .info("SWAP:beforeCreate:" + event + " op:" + event.getOperation()); |
| verifyOrigin(event); |
| verifyPutAll(event); |
| setFired(event); |
| } |
| |
| public void setFired(EntryEvent event) { |
| wasFired = true; |
| } |
| |
| @Override |
| public void beforeUpdate(EntryEvent event) throws CacheWriterException { |
| getGemfireCache().getLogger() |
| .info("SWAP:beforeCreate:" + event + " op:" + event.getOperation()); |
| verifyOrigin(event); |
| verifyPutAll(event); |
| setFired(event); |
| } |
| |
| @Override |
| public void beforeDestroy(EntryEvent event) throws CacheWriterException { |
| verifyOrigin(event); |
| setFired(event); |
| } |
| |
| @Override |
| public void beforeRegionClear(RegionEvent event) throws CacheWriterException { |
| setFired(null); |
| } |
| |
| @Override |
| public void beforeRegionDestroy(RegionEvent event) throws CacheWriterException { |
| setFired(null); |
| } |
| |
| @Override |
| public void close() {} |
| } |
| |
| abstract static class txCallback { |
| |
| protected boolean isAccessor; |
| protected Exception ex = null; |
| |
| protected void verify(TransactionEvent txEvent) { |
| for (CacheEvent<?, ?> e : txEvent.getEvents()) { |
| verifyOrigin(e); |
| verifyPutAll(e); |
| } |
| } |
| |
| private void verifyOrigin(CacheEvent event) { |
| try { |
| assertEquals(true, event.isOriginRemote()); // change to !isAccessor after fixing #41498 |
| } catch (Exception e) { |
| ex = e; |
| } |
| } |
| |
| private void verifyPutAll(CacheEvent p_event) { |
| if (!(p_event instanceof EntryEvent)) { |
| return; |
| } |
| EntryEvent event = (EntryEvent) p_event; |
| CustId knownCustId = new CustId(1); |
| OrderId knownOrderId = new OrderId(2, knownCustId); |
| if (event.getKey().equals(knownOrderId)) { |
| try { |
| assertTrue(event.getOperation().isPutAll()); |
| assertNotNull(event.getTransactionId()); |
| } catch (Exception e) { |
| ex = e; |
| } |
| } |
| } |
| } |
| |
| static class TestTxListener extends txCallback implements TransactionListener { |
| |
| private boolean listenerInvoked; |
| |
| TestTxListener(boolean isAccessor) { |
| this.isAccessor = isAccessor; |
| } |
| |
| @Override |
| public void afterCommit(TransactionEvent event) { |
| listenerInvoked = true; |
| verify(event); |
| } |
| |
| @Override |
| public void afterFailedCommit(TransactionEvent event) { |
| verify(event); |
| } |
| |
| @Override |
| public void afterRollback(TransactionEvent event) { |
| listenerInvoked = true; |
| verify(event); |
| } |
| |
| public boolean isListenerInvoked() { |
| return this.listenerInvoked; |
| } |
| |
| @Override |
| public void close() {} |
| } |
| |
| static class TestTxWriter extends txCallback implements TransactionWriter { |
| public TestTxWriter(boolean isAccessor) { |
| this.isAccessor = isAccessor; |
| } |
| |
| @Override |
| public void beforeCommit(TransactionEvent event) { |
| verify(event); |
| } |
| |
| @Override |
| public void close() {} |
| } |
| |
| class TXFunction implements Function { |
| |
| static final String id = "TXFunction"; |
| |
| @Override |
| public void execute(FunctionContext context) { |
| Region r = null; |
| r = getGemfireCache().getRegion(CUSTOMER); |
| getGemfireCache().getLogger().fine("SWAP:callingPut"); |
| r.put(expectedCustId, expectedCustomer); |
| GemFireCacheImpl.getInstance().getLogger().warning(" XXX DOIN A PUT ", new Exception()); |
| context.getResultSender().lastResult(Boolean.TRUE); |
| } |
| |
| @Override |
| public String getId() { |
| return id; |
| } |
| |
| @Override |
| public boolean hasResult() { |
| return true; |
| } |
| |
| @Override |
| public boolean optimizeForWrite() { |
| return true; |
| } |
| |
| @Override |
| public boolean isHA() { |
| return false; |
| } |
| } |
| |
| enum Executions { |
| OnRegion, OnMember |
| } |
| |
| /** |
| * @return first key found on the given member |
| */ |
| CustId getKeyOnMember(final DistributedMember owner, PartitionedRegion pr) { |
| CustId retVal = null; |
| for (int i = 0; i < 5; i++) { |
| CustId custId = new CustId(i); |
| DistributedMember member = pr.getOwnerForKey(pr.getKeyInfo(custId)); |
| if (member.equals(owner)) { |
| retVal = custId; |
| break; |
| } |
| } |
| return retVal; |
| } |
| |
| protected Set<Customer> getCustomerSet(int size) { |
| Set<Customer> expectedSet = new HashSet<Customer>(); |
| for (int i = 0; i < size; i++) { |
| expectedSet.add(new Customer("customer" + i, "address" + i)); |
| } |
| return expectedSet; |
| } |
| |
| Set<CustId> getCustIdSet(int size) { |
| Set<CustId> expectedSet = new HashSet<CustId>(); |
| for (int i = 0; i < size; i++) { |
| expectedSet.add(new CustId(i)); |
| } |
| return expectedSet; |
| } |
| |
| static class OneUpdateCacheListener extends CacheListenerAdapter { |
| |
| boolean success = false; |
| |
| public boolean getSuccess() { |
| return success; |
| } |
| |
| @Override |
| public void afterCreate(EntryEvent event) { |
| fail("create not expected"); |
| } |
| |
| @Override |
| public void afterUpdate(EntryEvent event) { |
| if (!success) { |
| System.out.println("WE WIN!"); |
| success = true; |
| } else { |
| fail("Should have only had one update"); |
| } |
| } |
| |
| @Override |
| public void afterDestroy(EntryEvent event) { |
| fail("destroy not expected"); |
| } |
| |
| @Override |
| public void afterInvalidate(EntryEvent event) { |
| fail("invalidate not expected"); |
| } |
| } |
| |
| static class OneDestroyAndThenOneCreateCacheWriter extends CacheWriterAdapter { |
| |
| private boolean oneDestroy; |
| private boolean oneCreate; |
| |
| public void checkSuccess() throws Exception { |
| if (oneDestroy && oneCreate) { |
| // chill |
| } else { |
| fail("Didn't get both events. oneDestroy=" + oneDestroy + " oneCreate=" + oneCreate); |
| } |
| } |
| |
| @Override |
| public void beforeCreate(EntryEvent event) throws CacheWriterException { |
| if (!oneDestroy) { |
| fail("destroy should have arrived in writer before create"); |
| } else { |
| if (oneCreate) { |
| fail("more than one create detected! expecting destroy then create"); |
| } else { |
| oneCreate = true; |
| } |
| } |
| } |
| |
| @Override |
| public void beforeUpdate(EntryEvent event) throws CacheWriterException { |
| fail("update not expected"); |
| } |
| |
| @Override |
| public void beforeDestroy(EntryEvent event) throws CacheWriterException { |
| if (oneDestroy) { |
| fail("only one destroy expected"); |
| } else { |
| if (oneCreate) { |
| fail("destroy is supposed to precede create"); |
| } else { |
| oneDestroy = true; |
| } |
| } |
| } |
| |
| } |
| |
| protected Integer startServer(VM vm) { |
| return (Integer) vm.invoke(new SerializableCallable() { |
| @Override |
| public Object call() throws Exception { |
| int port = getRandomAvailableTCPPort(); |
| CacheServer s = getCache().addCacheServer(); |
| s.setPort(port); |
| s.start(); |
| return port; |
| } |
| }); |
| } |
| |
| protected void createClientRegion(VM vm, final int port, final boolean isEmpty, final boolean ri, |
| final boolean CQ) { |
| vm.invoke(new SerializableCallable() { |
| @Override |
| public Object call() throws Exception { |
| ClientCacheFactory ccf = new ClientCacheFactory(); |
| ccf.addPoolServer("localhost"/* getServerHostName(Host.getHost(0)) */, port); |
| ccf.setPoolSubscriptionEnabled(true); |
| ccf.set(LOG_LEVEL, LogWriterUtils.getDUnitLogLevel()); |
| ClientCache cCache = getClientCache(ccf); |
| ClientRegionFactory<Integer, String> crf = cCache.createClientRegionFactory( |
| isEmpty ? ClientRegionShortcut.PROXY : ClientRegionShortcut.CACHING_PROXY); |
| crf.setConcurrencyChecksEnabled(getConcurrencyChecksEnabled()); |
| crf.addCacheListener(new ClientListener()); |
| Region r = crf.create(D_REFERENCE); |
| Region cust = crf.create(CUSTOMER); |
| Region order = crf.create(ORDER); |
| if (ri) { |
| r.registerInterestRegex(".*"); |
| cust.registerInterestRegex(".*"); |
| order.registerInterestRegex(".*"); |
| } |
| if (CQ) { |
| CqAttributesFactory cqf = new CqAttributesFactory(); |
| cqf.addCqListener(new ClientCQListener()); |
| CqAttributes ca = cqf.create(); |
| cCache.getQueryService().newCq("SELECT * FROM " + cust.getFullPath(), ca).execute(); |
| } |
| return null; |
| } |
| }); |
| } |
| |
| protected static class ClientCQListener implements CqListener { |
| |
| boolean invoked = false; |
| |
| @Override |
| public void onError(CqEvent aCqEvent) {} |
| |
| @Override |
| public void onEvent(CqEvent aCqEvent) { |
| invoked = true; |
| } |
| |
| @Override |
| public void close() {} |
| } |
| |
| protected static class ClientListener extends CacheListenerAdapter { |
| |
| boolean invoked = false; |
| int invokeCount = 0; |
| int invalidateCount = 0; |
| int putCount = 0; |
| int destroyCount = 0; |
| boolean putAllOp = false; |
| boolean isOriginRemote = false; |
| int creates; |
| int updates; |
| |
| |
| @Override |
| public void afterCreate(EntryEvent event) { |
| event.getRegion().getCache().getLogger().warning("ZZZ AFTER CREATE:" + event.getKey()); |
| invoked = true; |
| invokeCount++; |
| putCount++; |
| creates++; |
| event.getRegion().getCache().getLogger().warning("ZZZ AFTER CREATE:" + event.getKey() |
| + " isPutAll:" + event.getOperation().isPutAll() + " op:" + event.getOperation()); |
| putAllOp = event.getOperation().isPutAll(); |
| isOriginRemote = event.isOriginRemote(); |
| } |
| |
| @Override |
| public void afterUpdate(EntryEvent event) { |
| event.getRegion().getCache().getLogger().warning("ZZZ AFTER UPDATE:" + event.getKey() |
| + " isPutAll:" + event.getOperation().isPutAll() + " op:" + event.getOperation()); |
| putAllOp = event.getOperation().isPutAll(); |
| invoked = true; |
| invokeCount++; |
| putCount++; |
| updates++; |
| isOriginRemote = event.isOriginRemote(); |
| } |
| |
| @Override |
| public void afterInvalidate(EntryEvent event) { |
| event.getRegion().getCache().getLogger().warning("ZZZ AFTER UPDATE:" + event.getKey()); |
| invoked = true; |
| invokeCount++; |
| invalidateCount++; |
| isOriginRemote = event.isOriginRemote(); |
| } |
| |
| @Override |
| public void afterDestroy(EntryEvent event) { |
| event.getRegion().getCache().getLogger().warning("ZZZ AFTER DESTROY:" + event.getKey()); |
| invoked = true; |
| invokeCount++; |
| destroyCount++; |
| isOriginRemote = event.isOriginRemote(); |
| } |
| |
| public void reset() { |
| invoked = false; |
| invokeCount = 0; |
| invalidateCount = 0; |
| putCount = 0; |
| isOriginRemote = false; |
| creates = 0; |
| updates = 0; |
| } |
| } |
| |
| protected static class ServerListener extends CacheListenerAdapter { |
| |
| boolean invoked = false; |
| int creates; |
| int updates; |
| |
| @Override |
| public void afterCreate(EntryEvent event) { |
| invoked = true; |
| creates++; |
| } |
| |
| @Override |
| public void afterUpdate(EntryEvent event) { |
| invoked = true; |
| updates++; |
| } |
| |
| @Override |
| public void afterDestroy(EntryEvent event) { |
| invoked = true; |
| } |
| |
| @Override |
| public void afterInvalidate(EntryEvent event) { |
| invoked = true; |
| } |
| } |
| |
| @Test |
| public void testTXWithCQCommitInDatastoreCQ() throws Exception { |
| initAccessorAndDataStore(accessor, dataStore, 0); |
| int port = startServer(dataStore); |
| |
| createClientRegion(client, port, false, true, true); |
| dataStore.invoke(new SerializableCallable() { |
| @Override |
| public Object call() throws Exception { |
| Region<CustId, Customer> custRegion = getCache().getRegion(CUSTOMER); |
| Region<OrderId, Order> orderRegion = getCache().getRegion(ORDER); |
| Region<CustId, Customer> refRegion = getCache().getRegion(D_REFERENCE); |
| CustId custId = new CustId(1); |
| OrderId orderId = new OrderId(1, custId); |
| getCache().getCacheTransactionManager().begin(); |
| custRegion.put(custId, new Customer("foo", "bar")); |
| orderRegion.put(orderId, new Order("fooOrder")); |
| refRegion.put(custId, new Customer("foo", "bar")); |
| getCache().getCacheTransactionManager().commit(); |
| return null; |
| } |
| }); |
| |
| client.invoke(this::verifyCustomerRegion); |
| } |
| |
| @Test |
| public void testTXWithCQCommitInDatastoreConnectedToAccessorCQ() throws Exception { |
| initAccessorAndDataStore(accessor, dataStore, 0); |
| int port = startServer(accessor); |
| |
| createClientRegion(client, port, false, true, true); |
| dataStore.invoke(new SerializableCallable() { |
| @Override |
| public Object call() throws Exception { |
| Region<CustId, Customer> custRegion = getCache().getRegion(CUSTOMER); |
| Region<OrderId, Order> orderRegion = getCache().getRegion(ORDER); |
| Region<CustId, Customer> refRegion = getCache().getRegion(D_REFERENCE); |
| CustId custId = new CustId(1); |
| OrderId orderId = new OrderId(1, custId); |
| getCache().getCacheTransactionManager().begin(); |
| custRegion.put(custId, new Customer("foo", "bar")); |
| getCache().getCacheTransactionManager().commit(); |
| return null; |
| } |
| }); |
| |
| client.invoke(this::verifyCustomerRegion); |
| } |
| |
| @Test |
| public void testTXWithCQCommitInDatastoreConnectedToDatastoreCQ() throws Exception { |
| initAccessorAndDataStore(accessor, dataStore, 0); |
| int port = startServer(dataStore); |
| |
| createClientRegion(client, port, false, true, true); |
| dataStore.invoke(new SerializableCallable() { |
| @Override |
| public Object call() throws Exception { |
| Region<CustId, Customer> custRegion = getCache().getRegion(CUSTOMER); |
| Region<OrderId, Order> orderRegion = getCache().getRegion(ORDER); |
| Region<CustId, Customer> refRegion = getCache().getRegion(D_REFERENCE); |
| CustId custId = new CustId(1); |
| OrderId orderId = new OrderId(1, custId); |
| getCache().getCacheTransactionManager().begin(); |
| custRegion.put(custId, new Customer("foo", "bar")); |
| getCache().getCacheTransactionManager().commit(); |
| return null; |
| } |
| }); |
| |
| client.invoke(this::verifyCustomerRegion); |
| } |
| |
| @Test |
| public void testTXWithCQCommitInAccessorConnectedToDatastoreCQ() throws Exception { |
| initAccessorAndDataStore(accessor, dataStore, 0); |
| int port = startServer(dataStore); |
| |
| createClientRegion(client, port, false, true, true); |
| accessor.invoke(new SerializableCallable() { |
| @Override |
| public Object call() throws Exception { |
| Region<CustId, Customer> custRegion = getCache().getRegion(CUSTOMER); |
| Region<OrderId, Order> orderRegion = getCache().getRegion(ORDER); |
| Region<CustId, Customer> refRegion = getCache().getRegion(D_REFERENCE); |
| CustId custId = new CustId(1); |
| OrderId orderId = new OrderId(1, custId); |
| getCache().getCacheTransactionManager().begin(); |
| custRegion.put(custId, new Customer("foo", "bar")); |
| getCache().getCacheTransactionManager().commit(); |
| return null; |
| } |
| }); |
| |
| client.invoke(this::verifyCustomerRegion); |
| } |
| |
| @Test |
| public void testTXWithCQCommitInAccessorConnectedToAccessorCQ() throws Exception { |
| initAccessorAndDataStore(accessor, dataStore, 0); |
| int port = startServer(accessor); |
| |
| createClientRegion(client, port, false, true, true); |
| accessor.invoke(new SerializableCallable() { |
| @Override |
| public Object call() throws Exception { |
| Region<CustId, Customer> custRegion = getCache().getRegion(CUSTOMER); |
| Region<OrderId, Order> orderRegion = getCache().getRegion(ORDER); |
| Region<CustId, Customer> refRegion = getCache().getRegion(D_REFERENCE); |
| CustId custId = new CustId(1); |
| OrderId orderId = new OrderId(1, custId); |
| getCache().getCacheTransactionManager().begin(); |
| custRegion.put(custId, new Customer("foo", "bar")); |
| getCache().getCacheTransactionManager().commit(); |
| return null; |
| } |
| }); |
| |
| client.invoke(this::verifyCustomerRegion); |
| } |
| |
| @Test |
| public void testCQCommitInDatastoreConnectedToAccessorCQ() throws Exception { |
| initAccessorAndDataStore(accessor, dataStore, 0); |
| int port = startServer(accessor); |
| |
| createClientRegion(client, port, false, true, true); |
| dataStore.invoke(new SerializableCallable() { |
| @Override |
| public Object call() throws Exception { |
| Region<CustId, Customer> custRegion = getCache().getRegion(CUSTOMER); |
| Region<OrderId, Order> orderRegion = getCache().getRegion(ORDER); |
| Region<CustId, Customer> refRegion = getCache().getRegion(D_REFERENCE); |
| CustId custId = new CustId(1); |
| OrderId orderId = new OrderId(1, custId); |
| getCache().getCacheTransactionManager().begin(); |
| custRegion.put(custId, new Customer("foo", "bar")); |
| getCache().getCacheTransactionManager().commit(); |
| return null; |
| } |
| }); |
| |
| client.invoke(this::verifyCustomerRegion); |
| } |
| |
| private void verifyCustomerRegion() { |
| verifyCustomerRegionListenerInvocation(); |
| verifyCQListenerInvocation(); |
| } |
| |
| private void verifyCustomerRegionListenerInvocation() { |
| Region<CustId, Customer> customerRegion = getCache().getRegion(CUSTOMER); |
| ClientListener customerRegionListener = |
| (ClientListener) customerRegion.getAttributes().getCacheListeners()[0]; |
| GeodeAwaitility.await() |
| .untilAsserted(() -> assertThat(customerRegionListener.invoked).isTrue()); |
| } |
| |
| @Test |
| public void testCQEntryDestroyCommitInDataStoreConnectedToAccessorCQ() { |
| doCQWithDelete(false, true); |
| } |
| |
| @Test |
| public void testCQEntryDestroyCommitInAccessorConnectedToAccessorCQ() { |
| doCQWithDelete(false, false); |
| } |
| |
| @Test |
| public void testCQEntryDestroyCommitInDataStoreConnectedToDataStoreCQ() { |
| doCQWithDelete(true, true); |
| } |
| |
| @Test |
| public void testCQEntryDestroyCommitInAccessorConnectedToDataStoreCQ() { |
| doCQWithDelete(true, false); |
| } |
| |
| private void doCQWithDelete(boolean startCacheServerOnDataStore, |
| boolean executeTransactionOnDataStore) { |
| initAccessorAndDataStore(accessor, dataStore, 0); |
| int port = startServer(startCacheServerOnDataStore); |
| |
| createClientRegion(client, port, false, true, true); |
| client.invoke(this::registerCQForRefRegion); |
| |
| executeDeleteTransaction(executeTransactionOnDataStore); |
| client.invoke(this::verify); |
| } |
| |
| private int startServer(boolean onDataStore) { |
| if (onDataStore) { |
| return startServer(dataStore); |
| } |
| return startServer(accessor); |
| } |
| |
| private void executeDeleteTransaction(boolean onDataStore) { |
| if (onDataStore) { |
| dataStore.invoke(this::doDeleteTransaction); |
| } else { |
| accessor.invoke(this::doDeleteTransaction); |
| } |
| } |
| |
| private void registerCQForRefRegion() throws Exception { |
| CqAttributesFactory cqf = new CqAttributesFactory(); |
| cqf.addCqListener(new ClientCQListener()); |
| CqAttributes ca = cqf.create(); |
| String refRegionPath = getCache().getRegion(D_REFERENCE).getFullPath(); |
| getCache().getQueryService().newCq("SELECT * FROM " + refRegionPath, ca).execute(); |
| } |
| |
| private void doDeleteTransaction() { |
| Region<CustId, Customer> customerRegion = getCache().getRegion(CUSTOMER); |
| Region<OrderId, Order> orderRegion = getCache().getRegion(ORDER); |
| Region<CustId, Customer> refRegion = getCache().getRegion(D_REFERENCE); |
| getCache().getCacheTransactionManager().begin(); |
| doDelete(customerRegion, orderRegion, refRegion); |
| getCache().getCacheTransactionManager().commit(); |
| } |
| |
| private void doDelete(Region<CustId, Customer> customerRegion, Region<OrderId, Order> orderRegion, |
| Region<CustId, Customer> refRegion) { |
| CustId custId = new CustId(1); |
| OrderId orderId = new OrderId(1, custId); |
| customerRegion.destroy(custId); |
| orderRegion.destroy(orderId); |
| refRegion.destroy(custId); |
| } |
| |
| private void verify() { |
| Region<CustId, Customer> customerRegion = getCache().getRegion(CUSTOMER); |
| Region<OrderId, Order> orderRegion = getCache().getRegion(ORDER); |
| Region<CustId, Customer> refRegion = getCache().getRegion(D_REFERENCE); |
| verifyListenerInvocation(customerRegion, orderRegion, refRegion); |
| verifyCQListenerInvocation(); |
| } |
| |
| private void verifyListenerInvocation(Region<CustId, Customer> customerRegion, |
| Region<OrderId, Order> orderRegion, |
| Region<CustId, Customer> refRegion) { |
| ClientListener customerRegionListener = |
| (ClientListener) customerRegion.getAttributes().getCacheListeners()[0]; |
| ClientListener orderRegionListener = |
| (ClientListener) orderRegion.getAttributes().getCacheListeners()[0]; |
| ClientListener refRegionListener = |
| (ClientListener) refRegion.getAttributes().getCacheListeners()[0]; |
| GeodeAwaitility.await() |
| .untilAsserted(() -> assertThat(customerRegionListener.invoked).isTrue()); |
| GeodeAwaitility.await().untilAsserted(() -> assertThat(orderRegionListener.invoked).isTrue()); |
| GeodeAwaitility.await().untilAsserted(() -> assertThat(refRegionListener.invoked).isTrue()); |
| } |
| |
| private void verifyCQListenerInvocation() { |
| CqQuery[] queries = getCache().getQueryService().getCqs(); |
| for (CqQuery query : queries) { |
| ClientCQListener listener = (ClientCQListener) query.getCqAttributes().getCqListener(); |
| GeodeAwaitility.await().atMost(10, TimeUnit.SECONDS) |
| .untilAsserted(() -> assertThat(listener.invoked).isTrue()); |
| } |
| } |
| } |